Поставка данных из Yandex Managed Service for PostgreSQL в Yandex Managed Service for Apache Kafka® с помощью Debezium
Вы можете отслеживать изменения данных в Managed Service for PostgreSQL и отправлять их в Managed Service for Apache Kafka® с помощью технологии Change Data Capture (CDC).
Далее будет продемонстрировано на примере, как настроить CDC с помощью Debezium.
Перед началом работы
-
Создайте кластер-источник Managed Service for PostgreSQL:
- с хостами в публичном доступе;
- с базой данных
db1
; - с пользователем
user1
.
-
Создайте кластер-приемник Managed Service for Apache Kafka® любой подходящей вам конфигурации с хостами в публичном доступе.
-
Создайте виртуальную машину для Debezium с Ubuntu 20.04 и публичным IP-адресом.
-
Настройте группы безопасности так, чтобы к кластерам можно было подключиться из созданной виртуальной машины:
-
Подключитесь к виртуальной машине по SSH и установите зависимости:
sudo apt update && \ sudo apt install -y git docker.io kafkacat postgresql-client
-
Подключитесь к виртуальной машине по SSH и проверьте, что доступны кластеры:
- Managed Service for PostgreSQL (используйте утилиту
psql
); - Managed Service for Apache Kafka® (используйте утилиту
kafkacat
).
- Managed Service for PostgreSQL (используйте утилиту
Подготовьте кластер-источник
-
Назначьте пользователю
user1
рольmdb_replication
.Это нужно для создания публикации (publication), с помощью которой Debezium будет отслеживать изменения в кластере Managed Service for PostgreSQL.
-
Подключитесь к базе данных
db1
кластера Managed Service for PostgreSQL от имени пользователяuser1
. -
Наполните базу тестовыми данными. В качестве примера используется простая таблица, содержащая информацию с некоторых датчиков автомобиля.
Создайте таблицу:
CREATE TABLE public.measurements( "device_id" TEXT PRIMARY KEY NOT NULL, "datetime" TIMESTAMP NOT NULL, "latitude" REAL NOT NULL, "longitude" REAL NOT NULL, "altitude" REAL NOT NULL, "speed" REAL NOT NULL, "battery_voltage" REAL, "cabin_temperature" REAL NOT NULL, "fuel_level" REAL );
Наполните таблицу данными:
INSERT INTO public.measurements VALUES ('iv9a94th6rztooxh5ur2', '2020-06-05 17:27:00', 55.70329032, 37.65472196, 427.5, 0, 23.5, 17, NULL), ('rhibbh3y08qmz3sdbrbu', '2020-06-06 09:49:54', 55.71294467, 37.66542005, 429.13, 55.5, NULL, 18, 32), ('iv9a94th678tooxh5ur2', '2020-06-07 15:00:10', 55.70985913, 37.62141918, 417.0, 15.7, 10.3, 17, NULL);
-
Создайте публикацию для добавленной таблицы:
CREATE PUBLICATION mpg_publication FOR TABLE public.measurements;
Настройте Debezium
-
Склонируйте репозиторий:
cd ~/ && \ git clone https://github.com/yandex-cloud/examples.git
-
Скачайте и распакуйте актуальный Debezium-коннектор в директорию
~/examples/mdb/managed-kafka/debezium-cdc/plugins/
.Ниже приведен пример для версии
1.6.0
. Нужные команды зависят от типа кластера-источника:Managed Service for PostgreSQLmkdir ~/examples/mdb/managed-kafka/debezium-cdc/plugins/ && \ wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-postgres/1.6.0.Final/debezium-connector-postgres-1.6.0.Final-plugin.tar.gz && \ tar -xzvf debezium-connector-postgres-1.6.0.Final-plugin.tar.gz -C ~/examples/mdb/managed-kafka/debezium-cdc/plugins/
-
Соберите образ Docker:
cd ~/examples/mdb/managed-kafka/debezium-cdc && \ sudo docker build --tag debezium ./
-
Создайте файл
~/examples/mdb/managed-kafka/debezium-cdc/mdb-connector.properties
с настройками Debezium для подключения к кластеру-источнику:Managed Service for PostgreSQLname=debezium-mpg connector.class=io.debezium.connector.postgresql.PostgresConnector plugin.name=pgoutput database.hostname=c-<идентификатор кластера>.rw.mdb.yandexcloud.net database.port=6432 database.user=user1 database.password=<пароль пользователя user1> database.dbname=db1 database.server.name=mpg table.include.list=public.measurements publication.name=mpg_publication slot.name=debezium_slot heartbeat.interval.ms=15000 heartbeat.topics.prefix=__debezium-heartbeat
Идентификатор кластера можно запросить со списком кластеров в каталоге.
Здесь:
name
— имя коннектора Debezium.database.hostname
— особый FQDN для подключения к хосту-мастеру кластера-источника.database.user
— имя пользователя PostgreSQL.database.dbname
— имя базы данных PostgreSQL.database.server.name
— произвольное имя сервера баз данных, которое Debezium будет использовать при выборе топика для отправки сообщений.table.include.list
— список имен таблиц, для которых Debezium должен отслеживать изменения. Укажите полные имена, включающие в себя имя схемы (по умолчаниюpublic
). Debezium будет использовать значения настроек из этого поля при выборе топика для отправки сообщений.publication.name
— имя публикации, созданной на кластере-источнике.slot.name
— имя слота репликации, который будет создан Debezium при работе с публикацией.heartbeat.interval.ms
иheartbeat.topics.prefix
— настройки heartbeat, необходимые для работы Debezium.
Подготовьте кластер-приемник
Настройки кластера-приемника зависят от типа кластера-источника:
-
Создайте топик
mpg.public.measurements
, в который будут помещаться данные, поступающие от кластера-источника.Имена топиков для данных конструируются по принципу:
<имя сервера>.<имя схемы>.<имя таблицы>
.Согласно файлу настроек Debezium:
- Имя сервера
mpg
указано в параметреdatabase.server.name
. - Имя схемы
public
указано вместе с именем таблицыmeasurements
в параметреtable.include.list
.
Если необходимо получить данные из нескольких таблиц, создайте для каждой из них отдельный топик.
- Имя сервера
-
Создайте служебный топик
__debezium-heartbeat.mpg
с политикой очистки логаcompact
.Имена служебных топиков конструируются по принципу:
<префикс для heartbeat>.<имя сервера>
.Согласно файлу настроек Debezium:
- Префикс
__debezium-heartbeat
указан в параметреheartbeat.topics.prefix
. - Имя сервера
mpg
указано в параметреdatabase.server.name
.
Если необходимо получить данные из нескольких кластеров-источников, создайте для каждого из них отдельный служебный топик.
- Префикс
-
Создайте учетную запись
debezium
.Выдайте ей следующие права на созданные топики:
ACCESS_ROLE_CONSUMER
ACCESS_ROLE_PRODUCER
Запустите Debezium
-
Укажите в файле
~/examples/mdb/managed-kafka/debezium-cdc/.env
переменные окружения, используемые для доступа к кластеру-приемнику:BROKERS=<FQDN хоста-брокера>:9091 USER=debezium PASSWORD=<пароль пользователя debezium>
Список FQDN хостов-брокеров можно запросить со списком хостов в кластере.
-
Запустите контейнер с собранным Docker-образом:
cd ~/examples/mdb/managed-kafka/debezium-cdc/ && \ sudo docker run --name debezium --rm --env-file .env \ -v ~/examples/mdb/managed-kafka/debezium-cdc/plugins:/home/appuser/plugins \ -v ~/examples/mdb/managed-kafka/debezium-cdc/mdb-connector.properties:/home/appuser/config/connector.properties \ debezium:latest
Контейнер будет непрерывно передавать новые данные из кластера-источника в кластер-приемник.
Проверьте работоспособность Debezium
-
Подключитесь к кластеру-приемнику, например, с помощью kafkacat, и проверьте, что данные поступают.
В сообщениях топика
mpg.public.measurements
должны присутствовать данные и схема их формата.Пример фрагмента сообщения{ "schema": { ... }, "payload": { "before": null, "after": { "device_id": "iv9a94th6rztooxh5ur2", "datetime": 1591378020000000, "latitude": 55.70329, "longitude": 37.65472, "altitude": 427.5, "speed": 0.0, "battery_voltage": 23.5, "cabin_temperature": 17.0, "fuel_level": null }, "source": { "version": "1.6.0.Final", "connector": "postgresql", "name": "mpg", "ts_ms": 1628245046882, "snapshot": "true", "db": "db1", "sequence": "[null,\"4328525512\"]", "schema": "public", "table": "measurements", "txId": 8861, "lsn": 4328525328, "xmin": null }, "op": "r", "ts_ms": 1628245046893, "transaction": null } }
-
Добавьте данные в таблицу на кластере-источнике:
Managed Service for PostgreSQL-
Добавьте еще одну строку в таблицу
measurements
:INSERT INTO public.measurements VALUES ('iv7b74th678tooxh5ur2', '2020-06-08 17:45:00', 53.70987913, 36.62549834, 378.0, 20.5, 5.3, 20, NULL);
-
Подключитесь к кластеру-приемнику и убедитесь, что в топик
mpg.public.measurements
попали новые данные.
Удалите созданные ресурсы
Если созданные ресурсы вам больше не нужны, удалите их:
-
Если вы зарезервировали для виртуальной машины публичный статический IP-адрес, удалите его.
-
Удалите кластеры: