Настройка Kafka Connect для работы с кластером Managed Service for Apache Kafka®
Примечание
Managed Service for Apache Kafka® имеет встроенную поддержку некоторых коннекторов и позволяет управлять ими. Список доступных коннекторов приведен в разделе Коннекторы. Если вам нужны другие коннекторы, или вы хотите управлять работой Kafka Connect вручную, используйте информацию из этого руководства.
Инструмент Kafka Connect предназначен для перемещения данных между Apache Kafka® и другими хранилищами данных.
Работа с данными в Kafka Connect осуществляется с помощью процессов-исполнителей (workers). Инструмент может быть развернут как в виде распределенной системы (distributed mode) с несколькими процессами-исполнителями, так и в виде отдельной инсталляции из одного процесса-исполнителя (standalone mode).
Непосредственно перемещение данных выполняется с помощью коннекторов, которые запускаются в отдельных потоках процесса-исполнителя.
Подробнее о Kafka Connect см. в документации Apache Kafka®.
Далее будет продемонстрировано, как настроить Kafka Connect для взаимодействия с кластером Managed Service for Apache Kafka®. Инструмент будет развернут на виртуальной машине Yandex Cloud в виде отдельной инсталляции. Для защиты подключения будет использоваться SSL-шифрование.
Также будет настроен простой коннектор FileStreamSource, с помощью которого Kafka Connect прочитает данные из тестового JSON-файла и передаст их в топик кластера.
Примечание
Вы можете использовать любой другой коннектор Kafka Connect для взаимодействия c кластером Managed Service for Apache Kafka®.
Чтобы настроить Kafka Connect для работы с кластером Managed Service for Apache Kafka®:
- Настройте виртуальную машину.
- Настройте Kafka Connect.
- Запустите Kafka Connect и проверьте его работу.
Если созданные ресурсы вам больше не нужны, удалите их.
Перед началом работы
-
Создайте кластер Managed Service for Apache Kafka® любой подходящей конфигурации.
- Создайте топик с именем
messages
для обмена сообщениями между Kafka Connect и кластером Managed Service for Apache Kafka®. - Создайте учетную запись с именем
user
и выдайте ей права на топикmessages
:ACCESS_ROLE_CONSUMER
,ACCESS_ROLE_PRODUCER
.
- Создайте топик с именем
-
В той же сети, что и кластер Managed Service for Apache Kafka®, создайте виртуальную машину c Ubuntu 20.04 и публичным IP-адресом.
-
Подготовьте виртуальную машину к работе с Kafka Connect:
Настройте виртуальную машину
-
Добавьте SSL-сертификат в хранилище доверенных сертификатов Java (Java Key Store), чтобы драйвер Apache Kafka® мог использовать этот сертификат при защищенном подключении к хостам кластера. Задайте пароль в параметре
-storepass
для дополнительной защиты хранилища:sudo keytool -importcert \ -alias YandexCA -file /usr/local/share/ca-certificates/Yandex/YandexCA.crt \ -keystore ssl -storepass <пароль хранилища сертификатов, не короче 6 символов> \ --noprompt
-
Создайте каталог с настройками процесса-исполнителя и скопируйте туда хранилище:
sudo mkdir -p /etc/kafka-connect-worker && \ sudo cp ssl /etc/kafka-connect-worker/client.truststore.jks
-
Создайте файл
/var/log/sample.json
с тестовыми данными. В этом файле приведены данные от сенсоров нескольких автомобилей в формате JSON:sample.json{"device_id":"iv9a94th6rztooxh5ur2","datetime":"2020-06-05 17:27:00","latitude":55.70329032,"longitude":37.65472196,"altitude":427.5,"speed":0,"battery_voltage":23.5,"cabin_temperature":17,"fuel_level":null} {"device_id":"rhibbh3y08qmz3sdbrbu","datetime":"2020-06-06 09:49:54","latitude":55.71294467,"longitude":37.66542005,"altitude":429.13,"speed":55.5,"battery_voltage":null,"cabin_temperature":18,"fuel_level":32} {"device_id":"iv9a94th6rztooxh5ur2","datetime":"2020-06-07 15:00:10","latitude":55.70985913,"longitude":37.62141918,"altitude":417,"speed":15.7,"battery_voltage":10.3,"cabin_temperature":17,"fuel_level":null}
Настройте Kafka Connect
-
Создайте файл настроек процесса-исполнителя
/etc/kafka-connect-worker/worker.properties
:worker.properties# AdminAPI connect properties bootstrap.servers=<FQDN хоста-брокера>:9091 sasl.mechanism=SCRAM-SHA-512 security.protocol=SASL_SSL ssl.truststore.location=/etc/kafka-connect-worker/client.truststore.jks ssl.truststore.password=<пароль к хранилищу сертификата> sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="user" password="<пароль учетной записи user>"; # Producer connect properties producer.sasl.mechanism=SCRAM-SHA-512 producer.security.protocol=SASL_SSL producer.ssl.truststore.location=/etc/kafka-connect-worker/client.truststore.jks producer.ssl.truststore.password=<пароль к хранилищу сертификата> producer.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="user" password="<пароль учетной записи user>"; # Worker properties plugin.path=/etc/kafka-connect-worker/plugins key.converter=org.apache.kafka.connect.json.JsonConverter value.converter=org.apache.kafka.connect.json.JsonConverter key.converter.schemas.enable=true value.converter.schemas.enable=true offset.storage.file.filename=/etc/kafka-connect-worker/worker.offset
Kafka Connect будет подключаться к кластеру Managed Service for Apache Kafka® от имени учетной записи
user
, созданной ранее.FQDN хостов-брокеров можно запросить со списком хостов в кластере.
-
Создайте файл настроек коннектора
/etc/kafka-connect-worker/file-connector.properties
:name=local-file-source connector.class=FileStreamSource tasks.max=1 file=/var/log/sample.json topic=messages
Где:
file
— имя файла, из которого коннектор будет читать данные.topic
— имя топика в кластере Managed Service for Apache Kafka®, куда конектор будет передавать данные.
Запустите Kafka Connect и проверьте его работу
-
Чтобы отправить тестовые данные в кластер, запустите процесс-исполнитель на виртуальной машине:
cd ~/kafka_2.12-2.6.2/bin/ && \ sudo ./connect-standalone.sh \ /etc/kafka-connect-worker/worker.properties \ /etc/kafka-connect-worker/file-connector.properties
-
Подключитесь к кластеру с помощью Kafkacat и получите данные из топика кластера:
kafkacat -C \ -b <FQDN хоста-брокера>:9091 \ -t messages \ -X security.protocol=SASL_SSL \ -X sasl.mechanisms=SCRAM-SHA-512 \ -X sasl.username=user \ -X sasl.password="<пароль учетной записи user>" \ -X ssl.ca.location=/usr/local/share/ca-certificates/Yandex/YandexCA.crt -Z -K:
FQDN хостов-брокеров можно запросить со списком хостов в кластере.
В выводе команды вы увидите содержимое тестового файла
/var/log/sample.json
, переданное на предыдущем шаге.
Удалите созданные ресурсы
Если созданные ресурсы вам больше не нужны, удалите их:
-
Если вы зарезервировали для виртуальной машины публичный статический IP-адрес, удалите его.