Yandex Cloud
  • Сервисы
  • Решения
  • Почему Yandex Cloud
  • Сообщество
  • Тарифы
  • Документация
  • Связаться с нами
Подключиться
Language / Region
Проект Яндекса
© 2023 ООО «Яндекс.Облако»
Yandex Data Transfer
  • Доступные трансферы
  • Начало работы
  • Пошаговые инструкции
    • Все инструкции
    • Подготовка к трансферу
    • Настройка эндпоинтов
      • Управление эндпоинтами
      • Настройка эндпоинтов-источников
        • Apache Kafka®
        • AWS CloudTrail
        • BigQuery
        • ClickHouse
        • Eventhub
        • Greenplum®
        • MongoDB
        • MySQL
        • Oracle
        • PostgreSQL
        • S3
        • Yandex Data Streams
        • Yandex Managed Service for YDB
      • Настройка эндпоинтов-приемников
        • Apache Kafka®
        • ClickHouse
        • Greenplum®
        • MongoDB
        • MySQL
        • Object Storage
        • PostgreSQL
        • Yandex Managed Service for YDB
    • Управление трансфером
    • Работа с базами данных во время трансфера
    • Мониторинг состояния трансфера
  • Практические руководства
    • Все руководства
    • Перенос данных между кластерами Yandex Managed Service for Apache Kafka®
    • Поставка данных из Yandex Managed Service for Apache Kafka® в Yandex Managed Service for ClickHouse
    • Поставка данных из Yandex Managed Service for Apache Kafka® в Yandex Managed Service for YDB
    • Перенос базы данных в Yandex Managed Service for ClickHouse
    • Миграция базы данных в Yandex Managed Service for Greenplum®
    • Миграция базы данных из Greenplum® в ClickHouse
    • Миграция базы данных из Greenplum® в PostgreSQL
    • Миграция базы данных в Yandex Managed Service for MongoDB
    • Миграция базы данных в Yandex Managed Service for MySQL
    • Миграция базы данных из MySQL в ClickHouse
    • Поставка данных из Yandex Managed Service for MySQL в Yandex Managed Service for Apache Kafka®
    • Миграция базы данных из Yandex Managed Service for MySQL в MySQL
    • Миграция базы данных из Yandex Managed Service for MySQL в Yandex Object Storage
    • Миграция базы данных из Yandex Managed Service for MySQL в Yandex Managed Service for YDB
    • Миграция базы данных в Yandex Managed Service for PostgreSQL
    • Миграция базы данных из PostgreSQL в ClickHouse
    • Поставка данных из Yandex Managed Service for PostgreSQL в Yandex Managed Service for Apache Kafka®
    • Миграция базы данных из Yandex Managed Service for PostgreSQL в Yandex Object Storage
    • Поставка данных из Yandex Managed Service for PostgreSQL в Yandex Managed Service for YDB
    • Сохранение потока данных Yandex Data Streams в ClickHouse
    • Миграция данных из Yandex Data Streams в Yandex Object Storage
  • Концепции
    • Взаимосвязь ресурсов сервиса
    • Типы и жизненные циклы трансферов
    • Объекты, переносимые трансфером
    • Работа Yandex Data Transfer с источниками и приемниками
    • Операции над трансфером
    • Сеть в Yandex Data Transfer
    • Скорость копирования данных в Yandex Data Transfer
    • Захват изменения данных
    • Шардированное копирование
    • Какие задачи решает сервис
    • Квоты и лимиты
  • Решение проблем
  • Управление доступом
  • Правила тарификации
  • Справочник API
    • Аутентификация в API
    • gRPC (англ.)
      • Overview
      • EndpointService
      • TransferService
      • OperationService
    • REST (англ.)
      • Overview
      • Endpoint
        • Overview
        • create
        • delete
        • get
        • list
        • update
      • Transfer
        • Overview
        • activate
        • create
        • deactivate
        • delete
        • get
        • list
        • update
  • Вопросы и ответы
  1. Практические руководства
  2. Поставка данных из Yandex Managed Service for MySQL в Yandex Managed Service for Apache Kafka®

Поставка данных из Yandex Managed Service for MySQL в Yandex Managed Service for Apache Kafka®

Статья создана
Yandex Cloud
  • Перед началом работы
  • Подготовьте кластер-источник
  • Подготовьте кластер-приемник
  • Подготовьте и активируйте трансфер
  • Проверьте работоспособность трансфера
    • Особенности поставки данных с помощью Data Transfer
  • Удалите созданные ресурсы

Вы можете отслеживать изменения данных в кластере-источнике Managed Service for MySQL и отправлять их в кластер-приемник Managed Service for Apache Kafka® с помощью технологии Change Data Capture (CDC).

Чтобы настроить CDC с использованием сервиса Data Transfer:

  1. Подготовьте кластер-источник.
  2. Подготовьте кластер-приемник.
  3. Подготовьте и активируйте трансфер.
  4. Проверьте работоспособность трансфера.

Если созданные ресурсы вам больше не нужны, удалите их.

Перед началом работы

  1. Создайте кластер-источник Managed Service for MySQL любой подходящей конфигурации со следующими настройками:

    • с базой данных db1;
    • с пользователем my-user;
    • с хостами в публичном доступе.
  2. Создайте кластер-приемник Managed Service for Apache Kafka® любой подходящей конфигурации с хостами в публичном доступе.

  3. Настройте группы безопасности кластеров, чтобы к ним можно было подключаться из интернета:

    • Инструкция для Managed Service for MySQL.
    • Инструкция для Managed Service for Apache Kafka®.
  4. Установите на локальный компьютер утилиту kcat (kafkacat) и утилиту командной строки MySQL. Например, в Ubuntu 20.04 выполните команду:

    sudo apt update && sudo apt install kafkacat mysql-client --yes
    

Подготовьте кластер-источник

  1. Чтобы сервис Data Transfer мог получать от кластера Managed Service for MySQL уведомления об изменениях в данных, в кластере-источнике необходимо настроить внешнюю репликацию. Чтобы пользователь my-user мог выполнять репликацию, назначьте ему роль ALL_PRIVILEGES для базы данных db1 и выдайте глобальные привилегии REPLICATION CLIENT и REPLICATION SLAVE.

  2. Подключитесь к базе данных db1 от имени пользователя my-user.

  3. Наполните базу тестовыми данными. В качестве примера используется простая таблица, содержащая информацию, поступающую от некоторых датчиков автомобиля.

    Создайте таблицу:

    CREATE TABLE db1.measurements (
        device_id varchar(200) NOT NULL,
        datetime timestamp NOT NULL,
        latitude real NOT NULL,
        longitude real NOT NULL,
        altitude real NOT NULL,
        speed real NOT NULL,
        battery_voltage real,
        cabin_temperature real NOT NULL,
        fuel_level real,
        PRIMARY KEY (device_id)
    );
    

    Наполните таблицу данными:

    INSERT INTO db1.measurements VALUES
        ('iv9a94th6rztooxh5ur2', '2022-06-05 17:27:00', 55.70329032, 37.65472196,  427.5,    0, 23.5, 17, NULL),
        ('rhibbh3y08qmz3sdbrbu', '2022-06-06 09:49:54', 55.71294467, 37.66542005, 429.13, 55.5, NULL, 18, 32);
    

Подготовьте кластер-приемник

Настройки различаются в зависимости от используемого способа управления топиками. При этом имена топиков для данных формируются по следующему принципу — <префикс топика>.<имя схемы>.<имя таблицы>. В этом руководстве в качестве примера будет использоваться префикс cdc.

Интерфейсы Yandex Cloud
Admin API

Если управление топиками осуществляется с помощью стандартных интерфейсов Yandex Cloud (Консоль управления, YC CLI, Terraform, API):

  1. Создайте топик с именем cdc.db1.measurements.

    Для отслеживания изменений в нескольких таблицах создайте для каждой из них отдельный топик.

  2. Создайте пользователя с именем kafka-user и ролями ACCESS_ROLE_CONSUMER и ACCESS_ROLE_PRODUCER, действующими для созданных топиков. Чтобы включить все такие топики, укажите в имени топика cdc.*.

Если для управления топиками используется Kafka Admin API:

  1. Создайте пользователя-администратора с именем kafka-user.

  2. Помимо роли ACCESS_ROLE_ADMIN назначьте пользователю-администратору роли ACCESS_ROLE_CONSUMER и ACCESS_ROLE_PRODUCER для топиков, имена которых начинаются с префикса cdc.

    Необходимые топики будут созданы автоматически при первом событии изменения в отслеживаемых таблицах кластера-источника. Такое решение может быть удобным для отслеживания изменений во множестве таблиц, однако, требует запас свободного места в хранилище кластера. Подробнее см. в разделе Типы дисков в Managed Service for Apache Kafka®.

Подготовьте и активируйте трансфер

  1. Создайте эндпоинт для источника MySQL с настройками:

    • Тип базы данных — MySQL.
    • Параметры эндпоинта:
      • Настройки подключения — Кластер MDB.
      • Кластер MDB — выберите созданный ранее кластер Managed Service for MySQL.
      • Имя базы данных — db1.
      • Имя пользователя — my-user.
      • Пароль — укажите пароль пользователя my-user.
      • Список включенных таблиц — db1.measurements.
  2. Создайте эндпоинт для приемника Apache Kafka® с настройками:

    • Тип базы данных — Kafka.

    • Параметры эндпоинта:

      • Подключение — Managed Kafka.

      • Managed Kafka:

        • ID кластера Managed Kafka — выберите созданный ранее кластер Managed Service for Apache Kafka®.
        • Аутентификация — укажите данные созданного ранее пользователя kafka-user.
      • Настройки топика Apache Kafka — Полное имя топика.

      • Полное имя топика — cdc.db1.measurements.

      Если необходимо отслеживать изменения в нескольких таблицах, заполните поля следующим образом:

      • Настройки топика Apache Kafka — Префикс топика.
      • Префикс топика — укажите префикс cdc, использованный при формировании имен топиков.
  3. Создайте трансфер типа Репликация с созданными эндпоинтами для источника и приемника.

  4. Активируйте трансфер и дождитесь его перехода в статус Реплицируется.

Проверьте работоспособность трансфера

  1. В отдельном терминале запустите утилиту kafkacat в режиме потребителя:

    kafkacat \
        -C \
        -b <FQDN хоста-брокера-1>:9091,...,<FQDN хоста-брокера N>:9091 \
        -t cdc.db1.measurements \
        -X security.protocol=SASL_SSL \
        -X sasl.mechanisms=SCRAM-SHA-512 \
        -X sasl.username=kafka-user \
        -X sasl.password=<пароль> \
        -X ssl.ca.location=/usr/local/share/ca-certificates/Yandex/YandexCA.crt \
        -Z \
        -K:
    

    FQDN хостов-брокеров можно получить со списком хостов в кластере Managed Service for Apache Kafka®.

  2. Подключитесь к кластеру-источнику и добавьте данные в таблицу measurements:

    INSERT INTO db1.measurements VALUES
        ('iv7b74th678tooxh5ur2', '2022-06-08 17:45:00', 53.70987913, 36.62549834, 378.0, 20.5, 5.3, 20, NULL),
        ('iv9a94th678tooxh5ur2', '2022-06-07 15:00:10', 55.70985913, 37.62141918,  417.0, 15.7, 10.3, 17, NULL);
    
  3. Убедитесь, что в терминале с запущенной утилитой kafkacat отобразились схема формата данных таблицы db1.measurements и сведения о добавленных строках.

    Пример фрагмента сообщения
    {
        "payload": {
            "device_id": "iv7b74th678tooxh5ur2"
        },
        "schema": {
            "fields": [
                {
                    "field": "device_id",
                    "optional": false,
                    "type": "string"
                }
            ],
            "name": "cdc.db1.measurements.Key",
            "optional": false,
            "type": "struct"
        }
    }: {
        "payload": {
            "after": {
                "altitude": 378,
                "battery_voltage": 5.3,
                "cabin_temperature": 20,
                "datetime": "2020-06-08T17:45:00Z",
                "device_id": "iv7b74th678tooxh5ur2",
                "fuel_level": null,
                "latitude": 53.70987913,
                "longitude": 36.62549834,
                "speed": 20.5
            },
            "before": null,
            "op": "c",
            "source": {
                "connector": "mysql",
                "db": "db1",
                "file": "mysql-log.000016",
                "gtid": "1e46a80b-2e96-11ed-adf7-d00d18378058:1-98501",
                "name": "cdc",
                "pos": 1547357,
                "query": null,
                "row": 0,
                "server_id": 0,
                "snapshot": "false",
                "table": "measurements",
                "thread": null,
                "ts_ms": 1662632515000,
                "version": "1.1.2.Final"
            },
            "transaction": null,
            "ts_ms": 1662632515000
        },
        "schema": {
            "fields": [
                {
                    "field": "before",
                    "fields": [
                        {
                            "field": "device_id",
                            "optional": false,
                            "type": "string"
                        },
                        ...
                    ],
                    "name": "cdc.db1.measurements.Value",
                    "optional": true,
                    "type": "struct"
                },
                {
                    "field": "after",
                    "fields": [
                        ...
                    ],
                    "name": "cdc.db1.measurements.Value",
                    "optional": true,
                    "type": "struct"
                },
                {
                    "field": "source",
                    "fields": [
                        {
                            "field": "version",
                            "optional": false,
                            "type": "string"
                        },
                        ...
                    ],
                    "name": "io.debezium.connector.mysql.Source",
                    "optional": false,
                    "type": "struct"
                },
                {
                    "field": "op",
                    "optional": false,
                    "type": "string"
                },
                ...
            ],
            "name": "cdc.db1.measurements.Envelope",
            "optional": false,
            "type": "struct"
        }
    }
    

Особенности поставки данных с помощью Data Transfer

  • При переносе данных из MySQL в Apache Kafka® некоторые типы данных переносятся с изменениями:

    • тип tinyint(1) переносится как boolean;
    • тип real переносится как double;
    • тип bigint unsigned переносится как int64.
  • В блоке метаинформации об источнике payload.source параметры server_id и thread не заполняются.

Удалите созданные ресурсы

Если созданные ресурсы вам больше не нужны, удалите их:

  1. Деактивируйте и удалите трансфер.

  2. Удалите эндпоинты.

  3. Удалите кластеры:

    • Managed Service for Apache Kafka®;
    • Managed Service for MySQL.
  4. Если для доступа к хостам кластеров использовались статические публичные IP-адреса, освободите и удалите их.

Была ли статья полезна?

Language / Region
Проект Яндекса
© 2023 ООО «Яндекс.Облако»
В этой статье:
  • Перед началом работы
  • Подготовьте кластер-источник
  • Подготовьте кластер-приемник
  • Подготовьте и активируйте трансфер
  • Проверьте работоспособность трансфера
  • Особенности поставки данных с помощью Data Transfer
  • Удалите созданные ресурсы