Передача данных из эндпоинта-источника Apache Kafka®
С помощью сервиса Yandex Data Transfer вы можете переносить данные из очереди Apache Kafka® и реализовывать различные сценарии переноса, обработки и трансформации данных. Для реализации трансфера:
- Ознакомьтесь с возможными сценариями передачи данных.
- Подготовьте базу данных Apache Kafka® к трансферу.
- Настройте эндпоинт-источник в Yandex Data Transfer.
- Настройте один из поддерживаемых приемников данных.
- Создайте и запустите трансфер.
- Выполняйте необходимые действия по работе с базой и контролируйте трансфер.
- При возникновении проблем, воспользуйтесь готовыми решениями по их устранению.
Сценарии передачи данных из Apache Kafka®
-
Миграция — перенос данных из одного хранилища в другое, часто, перенос базы в облако из устаревших локальных баз в управляемые облачные.
Отдельной задачей миграции является зеркалирование данных между очередями:
-
Поставка данных — процесс доставки произвольных данных в целевые хранилища. Процесс поставки включает извлечение данных из очереди и их десериализацию с последующей трансформацией данных в формат целевого хранилища.
Подробное описание возможных сценариев передачи данных в Yandex Data Transfer см. в разделе Практические руководства.
Подготовка базы данных источника
- Создайте пользователя с ролью
ACCESS_ROLE_CONSUMER
на топике-источнике.
-
Убедитесь, что настройки сети, в которой размещен кластер, разрешают подключение к нему из интернета с IP-адресов, используемых сервисом Data Transfer
. -
Настройте права доступа
пользователя к нужному топику. -
Выдайте права
READ
консьюмер-группе, идентификатор которой совпадает с идентификатором трансфера.bin/kafka-acls --bootstrap-server localhost:9092 \ --command-config adminclient-configs.conf \ --add \ --allow-principal User:username \ --operation Read \ --group <идентификатор_трансфера>
-
(Опционально) Чтобы использовать авторизацию по логину и паролю, настройте SASL-аутентификацию
.
Настройка эндпоинта-источника Apache Kafka®
При создании или изменении эндпоинта вы можете задать:
- Настройки подключения к кластеру Yandex Managed Service for Apache Kafka® или пользовательской инсталляции, в т. ч. на базе виртуальных машин Yandex Compute Cloud. Эти параметры обязательные.
- Расширенные настройки.
Кластер Managed Service for Apache Kafka®
Важно
Для создания или редактирования эндпоинта управляемой базы данных вам потребуется роль managed-kafka.viewer
или примитивная роль viewer
, выданная на каталог кластера этой управляемой базы данных.
Подключение с указанием идентификатора кластера в Yandex Cloud.
-
Кластер Managed Service for Apache Kafka — выберите кластер, к которому необходимо подключиться.
-
Аутентификация — выберите тип:
SASL
илиБез аутентификации
.При выборе значения
SASL
:- Имя пользователя — укажите имя учетной записи, от имени которой сервис Data Transfer будет подключаться к топику.
- Пароль — укажите пароль учетной записи.
-
Полное имя топика — укажите имя топика, к которому необходимо подключиться.
-
Группы безопасности — выберите облачную сеть для размещения эндпоинта и группы безопасности для сетевого трафика.
Это позволит применить к ВМ и кластерам в выбранной сети указанные правила групп безопасности без изменения настроек этих ВМ и кластеров. Подробнее см. в разделе Сеть в Yandex Data Transfer.
Пользовательская инсталляция
Подключение к кластеру Apache Kafka® с явным указанием сетевых адресов и портов хостов-брокеров.
-
URL-адреса брокеров — укажите IP-адреса или FQDN хостов-брокеров.
Если номер порта Apache Kafka® отличается от стандартного, укажите его через двоеточие после имени хоста:
<IP-адрес_или_FQDN_хоста-брокера>:<номер_порта>
-
SSL — использовать шифрование для защиты соединения.
-
PEM-сертификат — если требуется шифрование передаваемых данных, например для соответствия требованиям PCI DSS
, загрузите файл сертификата или добавьте его содержимое в текстовом виде. -
Сетевой интерфейс для эндпоинта — выберите или создайте подсеть в нужной зоне доступности.
Если значение в этом поле задано для обоих эндпоинтов, то обе подсети должны быть размещены в одной зоне доступности.
-
Аутентификация — выберите тип:
SASL
илиБез аутентификации
.При выборе значения
SASL
:- Пользователь — укажите имя учетной записи, от имени которой сервис Data Transfer будет подключаться к топику.
- Пароль — укажите пароль учетной записи.
- Механизм — выберите механизм хеширования: SHA 256 или SHA 512.
-
Полное имя топика — укажите имя топика, к которому необходимо подключиться.
-
Группы безопасности — выберите облачную сеть для размещения эндпоинта и группы безопасности для сетевого трафика.
Это позволит применить к ВМ и кластерам в выбранной сети указанные правила групп безопасности без изменения настроек этих ВМ и кластеров. Подробнее см. в разделе Сеть в Yandex Data Transfer.
Расширенные настройки
В расширенных настройках вы можете задать правила трансформации и правила конвертации. Данные обрабатываются в следующем порядке:
-
Трансформация. Данные в формате JSON передаются функции Yandex Cloud Functions. В теле функции содержатся метаинформация и необработанные данные, которые переданы в очередь. С помощью функции данные обрабатываются и возвращаются в Data Transfer.
-
Конвертация. Выполняется парсинг, с помощью которого данные подготавливаются для передачи приемнику.
Если не заданы правила трансформации, то парсинг применяется к необработанным данным из очереди. Если не заданы правила конвертации, то данные сразу передаются в приемник.
-
Правила трансформации:
-
Функция обработки — выберите одну из функций, созданных в сервисе Cloud Functions.
-
Сервисный аккаунт — выберите или создайте сервисный аккаунт, от имени которого будет запускаться функция обработки.
-
Количество попыток — укажите количество попыток вызова функции обработки.
-
Размер буфера для отправки — укажите размер буфера (в байтах), при заполнении которого данные будут переданы функции обработки.
Максимальный размер буфера — 3,5 МБ. Подробнее об ограничениях, действующих при работе с функциями в сервисе Cloud Functions, читайте в соответствующем разделе.
-
Интервал отправки — укажите длительность интервала (в секундах), по истечении которого данные из потока должны быть переданы функции обработки.
Примечание
Если буфер заполнится или истечет интервал отправки, данные будут переданы функции обработки.
-
Таймаут вызова — укажите допустимое время ожидания ответа от функции обработки (в секундах).
Важно
Значения в полях Интервал отправки и Таймаут вызова указываются с постфиксом
s
, например,10s
. -
-
Правила конвертации:
- Формат данных — выберите один из доступных форматов:
-
JSON
— формат JSON. -
Парсер AuditTrails.v1
— формат логов сервиса Audit Trails. -
Парсер CloudLogging
— формат логов сервиса Cloud Logging. -
Парсер Debezium CDC
— Debezium CDC. Позволяет в настройках указать Confluent Schema Registry .Для формата JSON укажите:
- Схема данных — задайте схему в виде списка полей или загрузите файл с описанием схемы в формате JSON.
Пример задания схемы данных[ { "name": "request", "type": "string" } ]
- Использовать значение NULL в ключевых столбцах — выберите эту опцию, чтобы разрешить значение
null
в ключевых колонках. - Добавить неразмеченные столбцы — выберите эту опцию, чтобы поля, отсутствующие в схеме, попадали в колонку
_rest
. - Разэкранировать значения строк — выберите эту опцию, чтобы убрать кавычки из строковых переменных (если этого не сделать, значения строковых полей останутся без изменений).
Для Debezium CDC укажите: URL для Schema Registry, способ аутентификации (с указанием логина и пароля пользователя в случае использования аутентификации) и CA-сертификат.
-
- Формат данных — выберите один из доступных форматов:
Настройка приемника данных
Настройте один из поддерживаемых приемников данных:
- PostgreSQL;
- MySQL;
- MongoDB
- ClickHouse®;
- Greenplum®;
- Yandex Managed Service for YDB;
- Yandex Object Storage;
- Apache Kafka®;
- YDS;
- Elasticsearch;
- OpenSearch.
Полный список поддерживаемых источников и приемников в Yandex Data Transfer см. в разделе Доступные трансферы.
После настройки источника и приемника данных создайте и запустите трансфер.