Yandex Cloud
  • Сервисы
  • Решения
  • Почему Yandex Cloud
  • Сообщество
  • Тарифы
  • Документация
  • Связаться с нами
Подключиться
Language / Region
Проект Яндекса
© 2023 ООО «Яндекс.Облако»
Yandex Managed Service for MySQL®
  • Начало работы
  • Пошаговые инструкции
    • Все инструкции
    • Информация об имеющихся кластерах
    • Создание кластера
    • Подключение к базе данных
    • Остановка и запуск кластера
    • SQL-запросы в консоли управления
    • Изменение кластера
    • Подключение к DataLens
    • Управление хостами MySQL
    • Управление базами данных
    • Управление пользователями
    • Управление правами пользователей
    • Управление резервными копиями
    • Просмотр логов кластера
    • Удаление кластера
    • Диагностика производительности
    • Мониторинг состояния кластера и хостов
  • Концепции
    • Взаимосвязь ресурсов сервиса
    • Классы хостов
      • Действующие классы хостов
      • Архив
        • До 1 июня 2020 года
      • Использование устаревших классов хостов
    • Сеть в Managed Service for MySQL
    • Квоты и лимиты
    • Типы дисков
    • Резервные копии
    • Репликация
    • Техническое обслуживание
    • Права пользователей
    • Настройки MySQL
    • Ограничения для команд SQL
  • Практические руководства
    • Все сценарии
    • Анализ производительности и оптимизация Managed Service for MySQL
    • Выгрузка базы данных в Yandex Data Proc
    • Миграция базы данных из стороннего кластера MySQL
    • Миграция базы данных из Managed Service for MySQL в MySQL
    • Поставка данных в Yandex Managed Service for Apache Kafka® с помощью Yandex Data Transfer
    • Поставка данных в Yandex Managed Service for Apache Kafka® с помощью Debezium
    • Миграция базы данных в Yandex Managed Service for YDB с помощью Yandex Data Transfer
    • Миграция базы данных из Managed Service for MySQL в Yandex Object Storage
  • Управление доступом
  • Правила тарификации
    • Действующие правила
    • Архив
      • До 1 февраля 2020 года
  • Справочник API
    • Аутентификация в API
    • gRPC (англ.)
      • Overview
      • BackupService
      • ClusterService
      • DatabaseService
      • ResourcePresetService
      • UserService
      • OperationService
    • REST (англ.)
      • Overview
      • Backup
        • Overview
        • get
        • list
      • Cluster
        • Overview
        • addHosts
        • backup
        • create
        • delete
        • deleteHosts
        • get
        • list
        • listBackups
        • listHosts
        • listLogs
        • listOperations
        • move
        • rescheduleMaintenance
        • restore
        • start
        • startFailover
        • stop
        • streamLogs
        • update
        • updateHosts
      • Database
        • Overview
        • create
        • delete
        • get
        • list
      • ResourcePreset
        • Overview
        • get
        • list
      • User
        • Overview
        • create
        • delete
        • get
        • grantPermission
        • list
        • revokePermission
        • update
      • Operation
        • Overview
        • get
  • История изменений
  • Вопросы и ответы
    • Общие вопросы
    • Вопросы о MySQL
    • Подключение
    • Проблемы с чтением/записью в кластер
    • Проблемы с производительностью
    • Изменение кластера
    • Мониторинг и логи
    • Миграция/перенос
    • Настройки параметров MySQL
    • Все вопросы на одной странице
  1. Практические руководства
  2. Поставка данных в Yandex Managed Service for Apache Kafka® с помощью Debezium

Поставка данных в Yandex Managed Service for Apache Kafka® с помощью Debezium

Статья создана
Yandex Cloud
  • Перед началом работы
  • Подготовка кластера-источника
  • Настройте коннектор Debezium
  • Подготовьте кластер-приемник
  • Запустите коннектор Debezium
  • Проверьте работоспособность Debezium
  • Удалите созданные ресурсы

Вы можете отслеживать изменения данных в Managed Service for MySQL и отправлять их в Managed Service for Apache Kafka® с помощью технологии Change Data Capture (CDC).

Из этой статьи вы узнаете, как создать в Yandex Cloud виртуальную машину и настроить на ней Debezium — программное обеспечение, используемое для CDC.

Перед началом работы

  1. Создайте кластер-источник со следующими настройками:

    • с хостами в публичном доступе;
    • с базой данных db1;
    • с пользователем user1.
  2. Создайте кластер-приемник Managed Service for Apache Kafka® любой подходящей конфигурации с хостами в публичном доступе.

  3. Создайте виртуальную машину с Ubuntu 20.04 и публичным IP-адресом.

  4. Настройте группы безопасности так, чтобы к кластерам можно было подключаться из интернета и созданной виртуальной машины, а к ней — из интернета по SSH:

    • Настройка групп безопасности кластера Managed Service for Apache Kafka®.
    • Настройка групп безопасности кластера Managed Service for MySQL.
  5. Подключитесь к виртуальной машине по SSH и выполните ее предварительную настройку:

    1. Установите зависимости:

      sudo apt update && \
          sudo apt install kafkacat openjdk-17-jre mysql-client --yes
      
    2. Создайте директорию для Apache Kafka®:

      sudo mkdir -p /opt/kafka/
      
    3. Скачайте и распакуйте в эту директорию архив с исполняемыми файлами Apache Kafka®. Например, для загрузки и распаковки Apache Kafka® версии 3.0 выполните команду:

      wget https://archive.apache.org/dist/kafka/3.0.0/kafka_2.13-3.0.0.tgz && \
      sudo tar xf kafka_2.13-3.0.0.tgz --strip 1 --directory /opt/kafka/
      

      Актуальную версию Apache Kafka® уточняйте на странице загрузок проекта.

    4. Установите на виртуальную машину сертификаты и убедитесь в доступности кластеров:

      • Managed Service for Apache Kafka® (используйте утилиту kafkacat).
      • Managed Service for MySQL (используйте утилиту mysql).
    5. Создайте директорию, в которой будут храниться файлы, необходимые для работы коннектора Debezium:

      sudo mkdir -p /etc/debezium/plugins/
      
    6. Чтобы коннектор Debezium мог подключаться к хостам-брокерам Managed Service for Apache Kafka®, добавьте SSL-сертификат в защищенное хранилище сертификатов Java (Java Key Store). Для дополнительной защиты хранилища в параметре -storepass укажите пароль длиной не меньше 6 символов:

      sudo keytool \
          -importcert \
          -alias YandexCA -file /usr/local/share/ca-certificates/Yandex/YandexCA.crt \
          -keystore /etc/debezium/keystore.jks \
          -storepass <пароль JKS> \
          --noprompt
      

Подготовка кластера-источника

  1. Назначьте пользователю user1 глобальные привилегии REPLICATION CLIENT и REPLICATION SLAVE.

  2. Подключитесь к базе данных db1 от имени пользователя user1.

  3. Наполните базу тестовыми данными. В качестве примера используется простая таблица, содержащая информацию с некоторых датчиков автомобиля.

    1. Создайте таблицу:

      CREATE TABLE measurements (
        `device_id` VARCHAR(32) PRIMARY KEY NOT NULL,
        `datetime` TIMESTAMP NOT NULL,
        `latitude` REAL NOT NULL,
        `longitude` REAL NOT NULL,
        `altitude` REAL NOT NULL,
        `speed` REAL NOT NULL,
        `battery_voltage` REAL,
        `cabin_temperature` REAL NOT NULL,
        `fuel_level` REAL
      );
      
    2. Наполните таблицу данными:

      INSERT INTO measurements VALUES
        ('iv9a94th6rztooxh5ur2', '2020-06-05 17:27:00', 55.70329032, 37.65472196,  427.5,    0, 23.5, 17, NULL),
        ('rhibbh3y08qmz3sdbrbu', '2020-06-06 09:49:54', 55.71294467, 37.66542005, 429.13, 55.5, NULL, 18, 32),
        ('iv9a94th678tooxh5ur2', '2020-06-07 15:00:10', 55.70985913, 37.62141918,  417.0, 15.7, 10.3, 17, NULL);
      

Настройте коннектор Debezium

  1. Подключитесь к виртуальной машине по SSH.

  2. Скачайте и распакуйте актуальный Debezium-коннектор в директорию /etc/debezium/plugins/.

    Актуальную версию коннектора уточняйте на странице проекта. Ниже приведены команды для версии 1.9.4.Final.

    VERSION="1.9.4.Final"
    wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/${VERSION}/debezium-connector-mysql-${VERSION}-plugin.tar.gz && \
    sudo tar -xzvf debezium-connector-mysql-${VERSION}-plugin.tar.gz -C /etc/debezium/plugins/
    
  3. Создайте файл /etc/debezium/mdb-connector.conf с настройками коннектора Debezium для подключения к кластеру-источнику:

    name=debezium-mmy
    connector.class=io.debezium.connector.mysql.MySqlConnector
    database.hostname=c-<идентификатор кластера>.rw.mdb.yandexcloud.net
    database.port=3306
    database.user=user1
    database.password=<пароль пользователя user1>
    database.dbname=db1
    database.server.name=mmy
    database.ssl.mode=required_identity
    table.include.list=db1.measurements
    heartbeat.interval.ms=15000
    heartbeat.topics.prefix=__debezium-heartbeat
    
    snapshot.mode=never
    include.schema.changes=false
    database.history.kafka.topic=dbhistory.mmy
    database.history.kafka.bootstrap.servers=<FQDN хоста-брокера 1>:9091,...,<FQDN хоста-брокера N>:9091
    
    # Producer settings
    database.history.producer.security.protocol=SSL
    database.history.producer.ssl.truststore.location=/etc/debezium/keystore.jks
    database.history.producer.ssl.truststore.password=<пароль JKS>
    database.history.producer.sasl.mechanism=SCRAM-SHA-512
    database.history.producer.security.protocol=SASL_SSL
    database.history.producer.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="debezium" password="<пароль пользователя debezium>";
    
    # Consumer settings
    database.history.consumer.security.protocol=SSL
    database.history.consumer.ssl.truststore.location=/etc/debezium/keystore.jks
    database.history.consumer.ssl.truststore.password=<пароль JKS>
    database.history.consumer.sasl.mechanism=SCRAM-SHA-512
    database.history.consumer.security.protocol=SASL_SSL
    database.history.consumer.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="debezium" password="<пароль пользователя debezium>";
    

    Здесь:

    • name — логическое имя коннектора Debezium. Используется для внутренних нужд коннектора.

    • database.hostname — особый FQDN для подключения к хосту-мастеру кластера-источника.

      Идентификатор кластера можно получить со списком кластеров в каталоге.

    • database.user — имя пользователя MySQL.

    • database.dbname — имя базы данных MySQL.

    • database.server.name — имя сервера баз данных, которое Debezium будет использовать при выборе топика для отправки сообщений.

    • table.include.list — имена таблиц, для которых Debezium должен отслеживать изменения. Укажите полные имена, включающие в себя имя базы данных (db1). Debezium будет использовать значения настроек из этого поля при выборе топика для отправки сообщений.

    • heartbeat.interval.ms и heartbeat.topics.prefix — настройки heartbeat, необходимые для работы Debezium.

    • database.history.kafka.topic — имя служебного топика, используемого коннектором для отправки уведомлений об изменениях в схеме данных кластера-источника.

Подготовьте кластер-приемник

  1. Создайте топик, в который будут помещаться данные, поступающие от кластера-источника:

    • Имя — mmy.db1.measurements.

      Имена топиков для данных конструируются по принципу <имя сервера>.<имя базы данных>.<имя таблицы>.

      Согласно файлу настроек коннектора Debezium:

      • Имя сервера mmy указано в параметре database.server.name.
      • Имя базы данных db1 указано вместе с именем таблицы measurements в параметре table.include.list.

    Если необходимо отслеживать изменения в нескольких таблицах, создайте для каждой из них отдельный топик.

  2. Создайте служебный топик для отслеживания состояния коннектора:

    • Имя — __debezium-heartbeat.mmy.

      Имена служебных топиков конструируются по принципу <префикс для heartbeat>.<имя сервера>.

      Согласно файлу настроек коннектора Debezium:

      • Префикс __debezium-heartbeat указан в параметре heartbeat.topics.prefix.
      • Имя сервера mmy указано в параметре database.server.name.
    • Политика очистки лога — Compact.

    Если необходимо получать данные из нескольких кластеров-источников, создайте для каждого из них отдельный служебный топик.

  3. Создайте служебный топик для отслеживания изменений в схеме формата данных:

    • Имя — dbhistory.mmy.
    • Политика очистки лога — Delete.
    • Количество разделов — 1.
  4. Создайте пользователя с именем debezium.

  5. Выдайте пользователю debezium права ACCESS_ROLE_CONSUMER и ACCESS_ROLE_PRODUCER на созданные топики.

Запустите коннектор Debezium

  1. Создайте файл с настройками воркера Debezium:

    /etc/debezium/worker.conf

    # AdminAPI connect properties
    bootstrap.servers=<FQDN хоста-брокера 1>:9091,...,<FQDN хоста-брокера N>:9091
    sasl.mechanism=SCRAM-SHA-512
    security.protocol=SASL_SSL
    ssl.truststore.location=/etc/debezium/keystore.jks
    ssl.truststore.password=<пароль JKS>
    sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="debezium" password="<пароль пользователя debezium>";
    
    # Producer connect properties
    producer.sasl.mechanism=SCRAM-SHA-512
    producer.security.protocol=SASL_SSL
    producer.ssl.truststore.location=/etc/debezium/keystore.jks
    producer.ssl.truststore.password=<пароль JKS>
    producer.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="debezium" password="<пароль пользователя debezium>";
    
    # Worker properties
    plugin.path=/etc/debezium/plugins/
    key.converter=org.apache.kafka.connect.json.JsonConverter
    value.converter=org.apache.kafka.connect.json.JsonConverter
    key.converter.schemas.enable=true
    value.converter.schemas.enable=true
    offset.storage.file.filename=/etc/debezium/worker.offset
    
  2. В отдельном терминале запустите коннектор:

    sudo /opt/kafka/bin/connect-standalone.sh \
        /etc/debezium/worker.conf \
        /etc/debezium/mdb-connector.properties
    

Проверьте работоспособность Debezium

  1. В отдельном терминале запустите утилиту kafkacat в режиме потребителя:

    kafkacat \
        -C \
        -b <FQDN хоста-брокера-1>:9091,...,<FQDN хоста-брокера N>:9091 \
        -t mmy.db1.measurements \
        -X security.protocol=SASL_SSL \
        -X sasl.mechanisms=SCRAM-SHA-512 \
        -X sasl.username=debezium \
        -X sasl.password=<пароль> \
        -X ssl.ca.location=/usr/local/share/ca-certificates/Yandex/YandexCA.crt \
        -Z \
        -K:
    

    Будет выведена схема формата данных таблицы db1.measurements и данные о добавленных в нее ранее строках.

    Пример фрагмента сообщения
    {
    "schema": {
        ...
    },
    "payload": {
        "before": null,
        "after": {
            "device_id": "iv9a94th6rztooxh5ur2",
            "datetime": 1591378020000000,
            "latitude": 55.70329,
            "longitude": 37.65472,
            "altitude": 427.5,
            "speed": 0.0,
            "battery_voltage": 23.5,
            "cabin_temperature": 17.0,
            "fuel_level": null
        },
        "source": {
            "version": "1.8.1.Final",
            "connector": "mysql",
            "name": "mmy",
            "ts_ms": 1628245046882,
            "snapshot": "true",
            "db": "db1",
            "sequence": "[null,\"4328525512\"]",
            "table": "measurements",
            "txId": 8861,
            "lsn": 4328525328,
            "xmin": null
        },
        "op": "r",
        "ts_ms": 1628245046893,
        "transaction": null
      }
    }
    
  2. Подключитесь к кластеру-источнику и добавьте еще одну строку в таблицу measurements:

    INSERT INTO measurements VALUES ('iv7b74th678tooxh5ur2', '2020-06-08 17:45:00', 53.70987913, 36.62549834, 378.0, 20.5, 5.3, 20, NULL);
    
  3. Убедитесь, что в терминале с запущенной утилитой kafkacat отобразились сведения о добавленной строке.

Удалите созданные ресурсы

Если созданные ресурсы вам больше не нужны, удалите их:

  1. Удалите виртуальную машину.

    Если вы зарезервировали для виртуальной машины публичный статический IP-адрес, освободите и удалите его.

  2. Удалите кластеры:

    • Managed Service for Apache Kafka®.
    • Managed Service for MySQL.

Была ли статья полезна?

Language / Region
Проект Яндекса
© 2023 ООО «Яндекс.Облако»
В этой статье:
  • Перед началом работы
  • Подготовка кластера-источника
  • Настройте коннектор Debezium
  • Подготовьте кластер-приемник
  • Запустите коннектор Debezium
  • Проверьте работоспособность Debezium
  • Удалите созданные ресурсы