Yandex Cloud
  • Сервисы
  • Решения
  • Почему Yandex Cloud
  • Сообщество
  • Тарифы
  • Документация
  • Связаться с нами
Подключиться
Language / Region
© 2022 ООО «Яндекс.Облако»
Yandex Managed Service for Apache Kafka®
  • Начало работы
  • Пошаговые инструкции
    • Все инструкции
    • Информация об имеющихся кластерах
    • Создание кластера
    • Подключение к кластеру
    • Остановка и запуск кластера
    • Обновление версии Apache Kafka®
    • Изменение настроек кластера
    • Управление хостами Apache Kafka®
    • Работа с топиками и разделами
    • Управление учетными записями Kafka
    • Управление коннекторами
    • Просмотр логов кластера
    • Удаление кластера
    • Мониторинг состояния кластера и хостов
  • Практические руководства
    • Все руководства
    • Поставка данных в Managed Service for ClickHouse
    • Настройка Kafka Connect для работы с Managed Service for Apache Kafka®
    • Поставка данных в ksqlDB
    • Использование схем формата данных с Managed Service for Apache Kafka®
      • Обзор
      • Работа с управляемым реестром схем формата данных
      • Работа с реестром схем формата данных Confluent
    • Миграция данных в Managed Service for Apache Kafka®
    • Поставка данных с помощью Debezium
  • Концепции
    • Взаимосвязь ресурсов сервиса
    • Топики и разделы
    • Брокеры
    • Производители и потребители
    • Управление схемами данных
    • Классы хостов
    • Сеть в Managed Service for Apache Kafka®
    • Квоты и лимиты
    • Типы хранилища
    • Коннекторы
    • Техническое обслуживание
    • Настройки Apache Kafka®
  • Управление доступом
  • Правила тарификации
  • Справочник API
    • Аутентификация в API
    • gRPC (англ.)
      • Overview
      • ClusterService
      • ConnectorService
      • ResourcePresetService
      • TopicService
      • UserService
      • OperationService
    • REST (англ.)
      • Overview
      • Cluster
        • Overview
        • create
        • delete
        • get
        • list
        • listHosts
        • listLogs
        • listOperations
        • move
        • rescheduleMaintenance
        • start
        • stop
        • streamLogs
        • update
      • Connector
        • Overview
        • create
        • delete
        • get
        • list
        • pause
        • resume
        • update
      • ResourcePreset
        • Overview
        • get
        • list
      • Topic
        • Overview
        • create
        • delete
        • get
        • list
        • update
      • User
        • Overview
        • create
        • delete
        • get
        • grantPermission
        • list
        • revokePermission
        • update
      • Operation
        • Overview
        • get
  • История изменений
  • Вопросы и ответы
  1. Практические руководства
  2. Настройка Kafka Connect для работы с Managed Service for Apache Kafka®

Настройка Kafka Connect для работы с кластером Managed Service for Apache Kafka®

Статья создана
Yandex.Cloud
  • Перед началом работы
  • Настройте виртуальную машину
  • Настройте Kafka Connect
  • Запустите Kafka Connect и проверьте его работу
  • Удалите созданные ресурсы

Примечание

Managed Service for Apache Kafka® имеет встроенную поддержку некоторых коннекторов и позволяет управлять ими. Список доступных коннекторов приведен в разделе Коннекторы. Если вам нужны другие коннекторы, или вы хотите управлять работой Kafka Connect вручную, используйте информацию из этого руководства.

Инструмент Kafka Connect предназначен для перемещения данных между Apache Kafka® и другими хранилищами данных.

Работа с данными в Kafka Connect осуществляется с помощью процессов-исполнителей (workers). Инструмент может быть развернут как в виде распределенной системы (distributed mode) с несколькими процессами-исполнителями, так и в виде отдельной инсталляции из одного процесса-исполнителя (standalone mode).

Непосредственно перемещение данных выполняется с помощью коннекторов, которые запускаются в отдельных потоках процесса-исполнителя.

Подробнее о Kafka Connect см. в документации Apache Kafka®.

Далее будет продемонстрировано, как настроить Kafka Connect для взаимодействия с кластером Managed Service for Apache Kafka®. Инструмент будет развернут на виртуальной машине Yandex Cloud в виде отдельной инсталляции. Для защиты подключения будет использоваться SSL-шифрование.

Также будет настроен простой коннектор FileStreamSource, с помощью которого Kafka Connect прочитает данные из тестового JSON-файла и передаст их в топик кластера.

Примечание

Вы можете использовать любой другой коннектор Kafka Connect для взаимодействия c кластером Managed Service for Apache Kafka®.

Чтобы настроить Kafka Connect для работы с кластером Managed Service for Apache Kafka®:

  1. Настройте виртуальную машину.
  2. Настройте Kafka Connect.
  3. Запустите Kafka Connect и проверьте его работу.

Если созданные ресурсы вам больше не нужны, удалите их.

Перед началом работы

  1. Создайте кластер Managed Service for Apache Kafka® любой подходящей конфигурации.

    1. Создайте топик с именем messages для обмена сообщениями между Kafka Connect и кластером Managed Service for Apache Kafka®.
    2. Создайте учетную запись с именем user и выдайте ей права на топик messages:
      • ACCESS_ROLE_CONSUMER,
      • ACCESS_ROLE_PRODUCER.
  2. В той же сети, что и кластер Managed Service for Apache Kafka®, создайте виртуальную машину c Ubuntu 20.04 и публичным IP-адресом.

  3. Подготовьте виртуальную машину к работе с Kafka Connect:

    1. Подключитесь к виртуальной машине по SSH.

    2. Установите JDK:

      sudo apt update && \
      sudo apt install openjdk-11-jre-headless --yes
      
    3. Скачайте и распакуйте архив с Apache Kafka®:

      wget https://downloads.apache.org/kafka/2.6.2/kafka_2.12-2.6.2.tgz && \
      tar -xvf kafka_2.12-2.6.2.tgz
      
    4. Установите утилиту Kafkacat:

      sudo apt install kafkacat
      

Настройте виртуальную машину

  1. Подключитесь к виртуальной машине по SSH.

  2. Получите SSL-сертификат.

  3. Добавьте SSL-сертификат в хранилище доверенных сертификатов Java (Java Key Store), чтобы драйвер Apache Kafka® мог использовать этот сертификат при защищенном подключении к хостам кластера. Задайте пароль в параметре -storepass для дополнительной защиты хранилища:

    sudo keytool -importcert \
                 -alias YandexCA -file /usr/local/share/ca-certificates/Yandex/YandexCA.crt \
                 -keystore ssl -storepass <пароль хранилища сертификатов, не короче 6 символов> \
                 --noprompt
    
  4. Создайте каталог с настройками процесса-исполнителя и скопируйте туда хранилище:

    sudo mkdir -p /etc/kafka-connect-worker && \
    sudo cp ssl /etc/kafka-connect-worker/client.truststore.jks
    
  5. Создайте файл /var/log/sample.json с тестовыми данными. В этом файле приведены данные от сенсоров нескольких автомобилей в формате JSON:

    sample.json
    {"device_id":"iv9a94th6rztooxh5ur2","datetime":"2020-06-05 17:27:00","latitude":55.70329032,"longitude":37.65472196,"altitude":427.5,"speed":0,"battery_voltage":23.5,"cabin_temperature":17,"fuel_level":null}
    {"device_id":"rhibbh3y08qmz3sdbrbu","datetime":"2020-06-06 09:49:54","latitude":55.71294467,"longitude":37.66542005,"altitude":429.13,"speed":55.5,"battery_voltage":null,"cabin_temperature":18,"fuel_level":32}
    {"device_id":"iv9a94th6rztooxh5ur2","datetime":"2020-06-07 15:00:10","latitude":55.70985913,"longitude":37.62141918,"altitude":417,"speed":15.7,"battery_voltage":10.3,"cabin_temperature":17,"fuel_level":null}
    

Настройте Kafka Connect

  1. Создайте файл настроек процесса-исполнителя /etc/kafka-connect-worker/worker.properties:

    worker.properties
    # AdminAPI connect properties
    bootstrap.servers=<FQDN хоста-брокера>:9091
    sasl.mechanism=SCRAM-SHA-512
    security.protocol=SASL_SSL
    ssl.truststore.location=/etc/kafka-connect-worker/client.truststore.jks
    ssl.truststore.password=<пароль к хранилищу сертификата>
    sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="user" password="<пароль учетной записи user>";
    
    # Producer connect properties
    producer.sasl.mechanism=SCRAM-SHA-512
    producer.security.protocol=SASL_SSL
    producer.ssl.truststore.location=/etc/kafka-connect-worker/client.truststore.jks
    producer.ssl.truststore.password=<пароль к хранилищу сертификата>
    producer.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="user" password="<пароль учетной записи user>";
    
    # Worker properties
    plugin.path=/etc/kafka-connect-worker/plugins
    key.converter=org.apache.kafka.connect.json.JsonConverter
    value.converter=org.apache.kafka.connect.json.JsonConverter
    key.converter.schemas.enable=true
    value.converter.schemas.enable=true
    offset.storage.file.filename=/etc/kafka-connect-worker/worker.offset
    

    Kafka Connect будет подключаться к кластеру Managed Service for Apache Kafka® от имени учетной записи user, созданной ранее.

    FQDN хостов-брокеров можно запросить со списком хостов в кластере.

  2. Создайте файл настроек коннектора /etc/kafka-connect-worker/file-connector.properties:

    name=local-file-source
    connector.class=FileStreamSource
    tasks.max=1
    file=/var/log/sample.json
    topic=messages
    

    Где:

    • file — имя файла, из которого коннектор будет читать данные.
    • topic — имя топика в кластере Managed Service for Apache Kafka®, куда конектор будет передавать данные.

Запустите Kafka Connect и проверьте его работу

  1. Чтобы отправить тестовые данные в кластер, запустите процесс-исполнитель на виртуальной машине:

    cd ~/kafka_2.12-2.6.2/bin/ && \
    sudo ./connect-standalone.sh \
         /etc/kafka-connect-worker/worker.properties \
         /etc/kafka-connect-worker/file-connector.properties
    
  2. Подключитесь к кластеру с помощью Kafkacat и получите данные из топика кластера:

    kafkacat -C \
        -b <FQDN хоста-брокера>:9091 \
        -t messages \
        -X security.protocol=SASL_SSL \
        -X sasl.mechanisms=SCRAM-SHA-512 \
        -X sasl.username=user \
        -X sasl.password="<пароль учетной записи user>" \
        -X ssl.ca.location=/usr/local/share/ca-certificates/Yandex/YandexCA.crt -Z -K:
    

    FQDN хостов-брокеров можно запросить со списком хостов в кластере.

    В выводе команды вы увидите содержимое тестового файла /var/log/sample.json, переданное на предыдущем шаге.

Удалите созданные ресурсы

Если созданные ресурсы вам больше не нужны, удалите их:

  • Удалите виртуальную машину.

  • Если вы зарезервировали для виртуальной машины публичный статический IP-адрес, удалите его.

  • Удалите кластер Managed Service for Apache Kafka®.

Была ли статья полезна?

Language / Region
© 2022 ООО «Яндекс.Облако»
В этой статье:
  • Перед началом работы
  • Настройте виртуальную машину
  • Настройте Kafka Connect
  • Запустите Kafka Connect и проверьте его работу
  • Удалите созданные ресурсы