Yandex Cloud
  • Сервисы
  • Решения
  • Почему Yandex Cloud
  • Сообщество
  • Тарифы
  • Документация
  • Связаться с нами
Подключиться
Language / Region
© 2022 ООО «Яндекс.Облако»
Практические руководства
  • Веб-сервис
    • Все руководства
    • Cтатический сайт в Object Storage
    • Cайт на LAMP- или LEMP-стеке
    • Отказоустойчивый сайт с балансировкой нагрузки через Network Load Balancer
    • Отказоустойчивый сайт с балансировкой нагрузки через Application Load Balancer
    • Сайт на базе Joomla с БД PostgreSQL
    • Сайт на WordPress
    • Сайт на WordPress с БД MySQL
    • Перенос WordPress сайта с хостинга в Yandex Cloud
    • Веб-сайт на базе 1С-Битрикс
    • Интеграция L7-балансировщика с Cloud CDN и Object Storage
    • Сине-зеленое и канареечное развертывание версий сервиса
  • Интернет-магазины
    • Все руководства
    • Интернет-магазин на 1С-Битрикс
    • Интернет-магазин на Opencart
  • Архив данных
    • Все руководства
    • Однонодовый файловый сервер
    • Настройка SFTP-сервера на Centos 7
    • Резервное копирование в Object Storage через Acronis
    • Резервное копирование в Object Storage через CloudBerry Desktop Backup
    • Резервное копирование в Object Storage через Duplicati
    • Резервное копирование в Object Storage через Bacula
    • Резервное копирование в Object Storage через Veritas Backup Exec
    • Оцифровка архива в Yandex Vision
  • Тестовая среда
    • Все руководства
    • Тестирование приложений с помощью GitLab
    • Создание тестовых ВМ через GitLab CI
    • Высокопроизводительные вычисления на прерываемых виртуальных машинах
    • Эмуляция множества IoT-устройств
    • Нагрузочное тестирование gRPC-сервиса
    • HTTPS-тест с постоянной нагрузкой с помощью Phantom
    • HTTPS-тест со ступенчатой нагрузкой с помощью Pandora
  • Управление инфраструктурой
    • Все руководства
    • Начало работы с Terraform
    • Загрузка состояний Terraform в Object Storage
    • Начало работы с Packer
    • Сборка образа ВМ с набором инфраструктурных инструментов с помощью Packer
    • Автоматизация сборки образов ВМ с помощью Jenkins
    • Непрерывное развертывание контейнеризованных приложений с помощью GitLab
    • Создание кластера Linux-серверов «1С:Предприятия» с кластером Managed Service for PostgreSQL
    • Создание кластера Windows-серверов «1С:Предприятия» с базой данных SQL Server
    • Миграция в Yandex Cloud с помощью Hystax Acura
    • Защита от сбоев с помощью Hystax Acura
    • Настройка отказоустойчивой архитектуры в Yandex Cloud
    • Создание SAP-программы в Yandex Cloud
  • Построение Data Platform
    • Все руководства
    • Синхронизация данных из MySQL с помощью Yandex Data Transfer
    • Миграция базы данных из Yandex Managed Service for MySQL в MySQL
    • Настройка управляемой базы данных в кластере ClickHouse для Graphite
    • Обмен данными между Yandex Managed Service for ClickHouse и Yandex Data Proc
    • Импорт базы данных в Yandex Data Proc с использованием Sqoop
    • Использование Confluent Schema Registry с Yandex Managed Service for Apache Kafka®
    • Поставка данных из Yandex Managed Service for PostgreSQL в Yandex Managed Service for Apache Kafka® с помощью Debezium
    • Поставка данных из Yandex Managed Service for PostgreSQL в Yandex Managed Service for Apache Kafka® с помощью Yandex Data Transfer
    • Миграция данных в Yandex Managed Service for Apache Kafka®
    • Перенос коллекций из MongoDB в Yandex Managed Service for MongoDB
    • Миграция базы данных в Yandex Managed Service for SQL Server
    • Перенос данных из PostgreSQL в ClickHouse с помощью Yandex Data Transfer
    • Настройка Kafka Connect для работы с кластером Yandex Managed Service for Apache Kafka®
    • Настройка Yandex Cloud DNS для доступа к кластерам управляемых баз данных из других облачных сетей
    • Миграция в Yandex Managed Service for Elasticsearch с помощью Reindex API
    • Использование скриптов инициализации для настройки GeeseFS в Yandex Data Proc
  • Windows в Yandex Cloud
    • Все руководства
    • Развертывание Active Directory
    • Развертывание Microsoft Exchange
    • Развертывание Remote Desktop Services
    • Развертывание группы доступности Always On
    • Развертывание группы доступности Always On с внутренним сетевым балансировщиком
    • Развертывание Remote Desktop Gateway
  • Сетевая маршрутизация
    • Все руководства
    • Маршрутизация с помощью NAT-инстанса
    • Создание VPN-туннеля
    • Установка виртуального роутера Cisco CSR1000v
    • Установка виртуального роутера Mikrotik CHR
    • Соединение с облачной сетью при помощи OpenVPN
    • Настройка сети для Yandex Data Proc
  • Визуализация и анализ данных
    • Все руководства
    • Визуализация данных из CSV-файла
    • Создание и публикация диаграммы с картой Москвы из CSV-файла
    • Анализ продаж сети магазинов из БД ClickHouse
    • Анализ открытых данных ДТП на дорогах России
    • Анализ продаж и локаций пиццерий на данных из БД ClickHouse и Marketplace
    • Веб-аналитика с подключением к Яндекс Метрике
    • Веб-аналитика с расчетом воронок и когорт на данных Яндекс Метрики
    • Аналитика мобильного приложения на данных AppMetrica
    • Анализ статистики подкастов Яндекс Музыки (для авторов подкастов)
    • Визуализация данных с помощью SQL-чарта
    • Анализ customer journey мобильного приложения на данных AppMetrica
    • Анализ логов Object Storage при помощи DataLens
  • Интернет вещей
    • Руководства по работе с интернетом вещей
    • Мониторинг состояния географически распределенных устройств
    • Мониторинг показаний датчиков и уведомления о событиях
  • Бессерверные технологии
    • Сокращатель ссылок
    • Ввод данных в системы хранения
    • Хранение журналов работы приложения
  1. Построение Data Platform
  2. Поставка данных из Yandex Managed Service for PostgreSQL в Yandex Managed Service for Apache Kafka® с помощью Debezium

Поставка данных из Yandex Managed Service for PostgreSQL в Yandex Managed Service for Apache Kafka® с помощью Debezium

Статья создана
Yandex Cloud
  • Перед началом работы
  • Подготовьте кластер-источник
  • Настройте Debezium
  • Подготовьте кластер-приемник
  • Запустите Debezium
  • Проверьте работоспособность Debezium
  • Удалите созданные ресурсы

Вы можете отслеживать изменения данных в Managed Service for PostgreSQL и отправлять их в Managed Service for Apache Kafka® с помощью технологии Change Data Capture (CDC).

Далее будет продемонстрировано на примере, как настроить CDC с помощью Debezium.

Перед началом работы

  1. Создайте кластер-источник Managed Service for PostgreSQL:

    • с хостами в публичном доступе;
    • с базой данных db1;
    • с пользователем user1.
  2. Создайте кластер-приемник Managed Service for Apache Kafka® любой подходящей вам конфигурации с хостами в публичном доступе.

  3. Создайте виртуальную машину для Debezium с Ubuntu 20.04 и публичным IP-адресом.

  4. Настройте группы безопасности так, чтобы к кластерам можно было подключиться из созданной виртуальной машины:

    • Managed Service for PostgreSQL;
    • Managed Service for Apache Kafka®.
  5. Подключитесь к виртуальной машине по SSH и установите зависимости:

    sudo apt update && \
    sudo apt install -y git docker.io kafkacat postgresql-client
    
  6. Подключитесь к виртуальной машине по SSH и проверьте, что доступны кластеры:

    • Managed Service for PostgreSQL (используйте утилиту psql);
    • Managed Service for Apache Kafka® (используйте утилиту kafkacat).

Подготовьте кластер-источник

Managed Service for PostgreSQL
  1. Назначьте пользователю user1 роль mdb_replication.

    Это нужно для создания публикации (publication), с помощью которой Debezium будет отслеживать изменения в кластере Managed Service for PostgreSQL.

  2. Подключитесь к базе данных db1 кластера Managed Service for PostgreSQL от имени пользователя user1.

  3. Наполните базу тестовыми данными. В качестве примера используется простая таблица, содержащая информацию с некоторых датчиков автомобиля.

    Создайте таблицу:

    CREATE TABLE public.measurements(
      "device_id" TEXT PRIMARY KEY NOT NULL,
      "datetime" TIMESTAMP NOT NULL,
      "latitude" REAL NOT NULL,
      "longitude" REAL NOT NULL,
      "altitude" REAL NOT NULL,
      "speed" REAL NOT NULL,
      "battery_voltage" REAL,
      "cabin_temperature" REAL NOT NULL,
      "fuel_level" REAL
    );
    

    Наполните таблицу данными:

    INSERT INTO public.measurements VALUES
      ('iv9a94th6rztooxh5ur2', '2020-06-05 17:27:00', 55.70329032, 37.65472196,  427.5,    0, 23.5, 17, NULL),
      ('rhibbh3y08qmz3sdbrbu', '2020-06-06 09:49:54', 55.71294467, 37.66542005, 429.13, 55.5, NULL, 18, 32),
      ('iv9a94th678tooxh5ur2', '2020-06-07 15:00:10', 55.70985913, 37.62141918,  417.0, 15.7, 10.3, 17, NULL);
    
  4. Создайте публикацию для добавленной таблицы:

    CREATE PUBLICATION mpg_publication FOR TABLE public.measurements;
    

Настройте Debezium

  1. Подключитесь к виртуальной машине по SSH.

  2. Склонируйте репозиторий:

    cd ~/ && \
    git clone https://github.com/yandex-cloud/examples.git
    
  3. Скачайте и распакуйте актуальный Debezium-коннектор в директорию ~/examples/mdb/managed-kafka/debezium-cdc/plugins/.

    Ниже приведен пример для версии 1.6.0. Нужные команды зависят от типа кластера-источника:

    Managed Service for PostgreSQL
    mkdir  ~/examples/mdb/managed-kafka/debezium-cdc/plugins/ && \
    wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-postgres/1.6.0.Final/debezium-connector-postgres-1.6.0.Final-plugin.tar.gz && \
    tar -xzvf debezium-connector-postgres-1.6.0.Final-plugin.tar.gz -C ~/examples/mdb/managed-kafka/debezium-cdc/plugins/
    
  4. Соберите образ Docker:

    cd ~/examples/mdb/managed-kafka/debezium-cdc && \
    sudo docker build --tag debezium ./
    
  5. Создайте файл ~/examples/mdb/managed-kafka/debezium-cdc/mdb-connector.properties с настройками Debezium для подключения к кластеру-источнику:

    Managed Service for PostgreSQL
    name=debezium-mpg
    connector.class=io.debezium.connector.postgresql.PostgresConnector
    plugin.name=pgoutput
    database.hostname=c-<идентификатор кластера>.rw.mdb.yandexcloud.net
    database.port=6432
    database.user=user1
    database.password=<пароль пользователя user1>
    database.dbname=db1
    database.server.name=mpg
    table.include.list=public.measurements
    publication.name=mpg_publication
    slot.name=debezium_slot
    heartbeat.interval.ms=15000
    heartbeat.topics.prefix=__debezium-heartbeat
    

    Идентификатор кластера можно запросить со списком кластеров в каталоге.

    Здесь:

    • name — имя коннектора Debezium.
    • database.hostname — особый FQDN для подключения к хосту-мастеру кластера-источника.
    • database.user — имя пользователя PostgreSQL.
    • database.dbname — имя базы данных PostgreSQL.
    • database.server.name — произвольное имя сервера баз данных, которое Debezium будет использовать при выборе топика для отправки сообщений.
    • table.include.list — список имен таблиц, для которых Debezium должен отслеживать изменения. Укажите полные имена, включающие в себя имя схемы (по умолчанию public). Debezium будет использовать значения настроек из этого поля при выборе топика для отправки сообщений.
    • publication.name — имя публикации, созданной на кластере-источнике.
    • slot.name — имя слота репликации, который будет создан Debezium при работе с публикацией.
    • heartbeat.interval.ms и heartbeat.topics.prefix — настройки heartbeat, необходимые для работы Debezium.

Подготовьте кластер-приемник

Настройки кластера-приемника зависят от типа кластера-источника:

Managed Service for PostgreSQL
  1. Создайте топик mpg.public.measurements, в который будут помещаться данные, поступающие от кластера-источника.

    Имена топиков для данных конструируются по принципу: <имя сервера>.<имя схемы>.<имя таблицы>.

    Согласно файлу настроек Debezium:

    • Имя сервера mpg указано в параметре database.server.name.
    • Имя схемы public указано вместе с именем таблицы measurements в параметре table.include.list.

    Если необходимо получить данные из нескольких таблиц, создайте для каждой из них отдельный топик.

  2. Создайте служебный топик __debezium-heartbeat.mpg с политикой очистки лога compact.

    Имена служебных топиков конструируются по принципу: <префикс для heartbeat>.<имя сервера>.

    Согласно файлу настроек Debezium:

    • Префикс __debezium-heartbeat указан в параметре heartbeat.topics.prefix.
    • Имя сервера mpg указано в параметре database.server.name.

    Если необходимо получить данные из нескольких кластеров-источников, создайте для каждого из них отдельный служебный топик.

  3. Создайте учетную запись debezium.

    Выдайте ей следующие права на созданные топики:

    • ACCESS_ROLE_CONSUMER
    • ACCESS_ROLE_PRODUCER

Запустите Debezium

  1. Укажите в файле ~/examples/mdb/managed-kafka/debezium-cdc/.env переменные окружения, используемые для доступа к кластеру-приемнику:

    BROKERS=<FQDN хоста-брокера>:9091
    USER=debezium
    PASSWORD=<пароль пользователя debezium>
    

    Список FQDN хостов-брокеров можно запросить со списком хостов в кластере.

  2. Запустите контейнер с собранным Docker-образом:

    cd ~/examples/mdb/managed-kafka/debezium-cdc/ && \
    sudo docker run --name debezium --rm --env-file .env \
      -v ~/examples/mdb/managed-kafka/debezium-cdc/plugins:/home/appuser/plugins \
      -v ~/examples/mdb/managed-kafka/debezium-cdc/mdb-connector.properties:/home/appuser/config/connector.properties \
      debezium:latest
    

    Контейнер будет непрерывно передавать новые данные из кластера-источника в кластер-приемник.

Проверьте работоспособность Debezium

  1. Подключитесь к кластеру-приемнику, например, с помощью kafkacat, и проверьте, что данные поступают.

    В сообщениях топика mpg.public.measurements должны присутствовать данные и схема их формата.

    Пример фрагмента сообщения
    {
    "schema": {
        ...
    },
    "payload": {
        "before": null,
        "after": {
            "device_id": "iv9a94th6rztooxh5ur2",
            "datetime": 1591378020000000,
            "latitude": 55.70329,
            "longitude": 37.65472,
            "altitude": 427.5,
            "speed": 0.0,
            "battery_voltage": 23.5,
            "cabin_temperature": 17.0,
            "fuel_level": null
        },
        "source": {
            "version": "1.6.0.Final",
            "connector": "postgresql",
            "name": "mpg",
            "ts_ms": 1628245046882,
            "snapshot": "true",
            "db": "db1",
            "sequence": "[null,\"4328525512\"]",
            "schema": "public",
            "table": "measurements",
            "txId": 8861,
            "lsn": 4328525328,
            "xmin": null
        },
        "op": "r",
        "ts_ms": 1628245046893,
        "transaction": null
      }
    }
    
  2. Добавьте данные в таблицу на кластере-источнике:

    Managed Service for PostgreSQL
    1. Подключитесь к кластеру.

    2. Добавьте еще одну строку в таблицу measurements:

      INSERT INTO public.measurements VALUES ('iv7b74th678tooxh5ur2', '2020-06-08 17:45:00', 53.70987913, 36.62549834, 378.0, 20.5, 5.3, 20, NULL);
      
  3. Подключитесь к кластеру-приемнику и убедитесь, что в топик mpg.public.measurements попали новые данные.

Удалите созданные ресурсы

Если созданные ресурсы вам больше не нужны, удалите их:

  1. Удалите виртуальную машину.

  2. Если вы зарезервировали для виртуальной машины публичный статический IP-адрес, удалите его.

  3. Удалите кластеры:

    • Managed Service for PostgreSQL;
    • Managed Service for Apache Kafka®.

Была ли статья полезна?

Language / Region
© 2022 ООО «Яндекс.Облако»
В этой статье:
  • Перед началом работы
  • Подготовьте кластер-источник
  • Настройте Debezium
  • Подготовьте кластер-приемник
  • Запустите Debezium
  • Проверьте работоспособность Debezium
  • Удалите созданные ресурсы