Yandex Cloud
  • Сервисы
  • Решения
  • Почему Yandex Cloud
  • Сообщество
  • Тарифы
  • Документация
  • Связаться с нами
Подключиться
Language / Region
Проект Яндекса
© 2023 ООО «Яндекс.Облако»
Yandex Managed Service for Apache Kafka®
  • Начало работы
  • Пошаговые инструкции
    • Все инструкции
    • Информация об имеющихся кластерах
    • Создание кластера
    • Подключение к кластеру
    • Остановка и запуск кластера
    • Обновление версии Apache Kafka®
    • Изменение настроек кластера
    • Управление хостами Apache Kafka®
    • Работа с топиками и разделами
    • Управление пользователями Apache Kafka®
    • Управление коннекторами
    • Просмотр логов кластера
    • Удаление кластера
    • Мониторинг состояния кластера и хостов
  • Практические руководства
    • Все руководства
    • Настройка Kafka Connect для работы с Managed Service for Apache Kafka®
    • Использование схем формата данных с Managed Service for Apache Kafka®
      • Обзор
      • Работа с управляемым реестром схем формата данных
      • Использование Confluent Schema Registry с Managed Service for Apache Kafka®
    • Миграция базы данных из стороннего кластера Apache Kafka®
    • Перенос данных между кластерами Managed Service for Apache Kafka® с помощью Yandex Data Transfer
    • Поставка данных из Yandex Managed Service for PostgreSQL с помощью Debezium
    • Поставка данных из Yandex Managed Service for MySQL с помощью Debezium
    • Поставка данных из Yandex Managed Service for PostgreSQL с помощью Yandex Data Transfer
    • Поставка данных в Managed Service for ClickHouse
    • Поставка данных в Yandex Managed Service for ClickHouse с помощью Yandex Data Transfer
    • Поставка данных в ksqlDB
    • Поставка данных в Yandex Managed Service for YDB с помощью Yandex Data Transfer
  • Концепции
    • Взаимосвязь ресурсов сервиса
    • Топики и разделы
    • Брокеры
    • Производители и потребители
    • Управление схемами данных
    • Классы хостов
    • Сеть в Managed Service for Apache Kafka®
    • Квоты и лимиты
    • Типы дисков
    • Коннекторы
    • Техническое обслуживание
    • Настройки Apache Kafka®
  • Управление доступом
  • Правила тарификации
  • Справочник API
    • Аутентификация в API
    • gRPC (англ.)
      • Overview
      • ClusterService
      • ConnectorService
      • ResourcePresetService
      • TopicService
      • UserService
      • OperationService
    • REST (англ.)
      • Overview
      • Cluster
        • Overview
        • create
        • delete
        • get
        • list
        • listHosts
        • listLogs
        • listOperations
        • move
        • rescheduleMaintenance
        • start
        • stop
        • streamLogs
        • update
      • Connector
        • Overview
        • create
        • delete
        • get
        • list
        • pause
        • resume
        • update
      • ResourcePreset
        • Overview
        • get
        • list
      • Topic
        • Overview
        • create
        • delete
        • get
        • list
        • update
      • User
        • Overview
        • create
        • delete
        • get
        • grantPermission
        • list
        • revokePermission
        • update
      • Operation
        • Overview
        • get
  • История изменений
  • Вопросы и ответы
  1. Практические руководства
  2. Настройка Kafka Connect для работы с Managed Service for Apache Kafka®

Настройка Kafka Connect для работы с кластером Managed Service for Apache Kafka®

Статья создана
Yandex Cloud
,
улучшена
Dmitry A.
  • Перед началом работы
  • Настройте виртуальную машину
  • Подготовьте тестовые данные
  • Настройте Kafka Connect
  • Запустите Kafka Connect и проверьте его работу
  • Удалите созданные ресурсы

Примечание

Managed Service for Apache Kafka® имеет встроенную поддержку некоторых коннекторов и позволяет управлять ими. Список доступных коннекторов приведен в разделе Коннекторы. Если вам нужны другие коннекторы, или вы хотите управлять работой Kafka Connect вручную, используйте информацию из этого руководства.

Инструмент Kafka Connect предназначен для перемещения данных между Apache Kafka® и другими хранилищами данных.

Работа с данными в Kafka Connect осуществляется с помощью процессов-исполнителей (workers). Инструмент может быть развернут как в виде распределенной системы (distributed mode) с несколькими процессами-исполнителями, так и в виде отдельной инсталляции из одного процесса-исполнителя (standalone mode).

Непосредственно перемещение данных выполняется с помощью коннекторов, которые запускаются в отдельных потоках процесса-исполнителя.

Подробнее о Kafka Connect см. в документации Apache Kafka®.

Далее будет продемонстрировано, как настроить Kafka Connect для взаимодействия с кластером Managed Service for Apache Kafka®. Инструмент будет развернут на виртуальной машине Yandex Cloud в виде отдельной инсталляции. Для защиты подключения будет использоваться SSL-шифрование.

Также будет настроен простой коннектор FileStreamSource, с помощью которого Kafka Connect прочитает данные из тестового JSON-файла и передаст их в топик кластера.

Примечание

Вы можете использовать любой другой коннектор Kafka Connect для взаимодействия с кластером Managed Service for Apache Kafka®.

Чтобы настроить Kafka Connect для работы с кластером Managed Service for Apache Kafka®:

  1. Настройте виртуальную машину.
  2. Подготовьте тестовые данные.
  3. Настройте Kafka Connect.
  4. Запустите Kafka Connect и проверьте его работу.

Если созданные ресурсы вам больше не нужны, удалите их.

Перед началом работы

Вручную
С помощью Terraform
  1. Создайте кластер Managed Service for Apache Kafka® любой подходящей конфигурации.

  2. Создайте топик с именем messages для обмена сообщениями между Kafka Connect и кластером Managed Service for Apache Kafka®.

  3. Создайте пользователя с именем user и выдайте ему права на топик messages:

    • ACCESS_ROLE_CONSUMER,
    • ACCESS_ROLE_PRODUCER.
  4. В той же сети, что и кластер Managed Service for Apache Kafka®, создайте виртуальную машину с Ubuntu 20.04 и публичным IP-адресом.

  1. Если у вас еще нет Terraform, установите его.

  2. Скачайте файл с настройками провайдера. Поместите его в отдельную рабочую директорию и укажите значения параметров.

  3. Скачайте в ту же рабочую директорию файл конфигурации kafka-connect.tf.

    В этом файле описаны:

    • сеть;
    • подсеть;
      * группа безопасности по умолчанию и правила, необходимые для подключения к кластеру и виртуальной машине из интернета;
    • виртуальная машина с Ubuntu 20.04;
    • кластер Managed Service for Apache Kafka® с необходимыми настройками.
  4. Укажите в файле пароль для пользователя user, который будет использоваться для доступа к кластеру Managed Service for Apache Kafka®, а также имя пользователя и публичную часть SSH-ключа для виртуальной машины. Если на виртуальную машину будет установлена Ubuntu 20.04 из рекомендованного списка образов, то указанное здесь имя пользователя игнорируется. В таком случае при подключении используйте имя пользователя ubuntu.

  5. Выполните команду terraform init в директории с конфигурационным файлом. Эта команда инициализирует провайдеров, указанных в конфигурационных файлах, и позволяет работать с ресурсами и источниками данных провайдера.

  6. Проверьте корректность файлов конфигурации Terraform с помощью команды:

    terraform validate
    

    Если в файлах конфигурации есть ошибки, Terraform на них укажет.

  7. Создайте необходимую инфраструктуру:

    1. Выполните команду для просмотра планируемых изменений:

      terraform plan
      

      Если конфигурации ресурсов описаны верно, в терминале отобразится список изменяемых ресурсов и их параметров. Это проверочный этап: ресурсы не будут изменены.

    2. Если вас устраивают планируемые изменения, внесите их:

      1. Выполните команду:

        terraform apply
        
      2. Подтвердите изменение ресурсов.

      3. Дождитесь завершения операции.

    В указанном каталоге будут созданы все требуемые ресурсы. Проверить появление ресурсов и их настройки можно в консоли управления.

Настройте виртуальную машину

  1. Подключитесь к виртуальной машине по SSH.

  2. Установите JDK и утилиту kcat:

    sudo apt update && \
    sudo apt install default-jdk --yes && \
    sudo apt install kafkacat
    
  3. Скачайте и распакуйте архив с Apache Kafka®:

    wget https://downloads.apache.org/kafka/3.1.0/kafka_2.12-3.1.0.tgz && tar -xvf kafka_2.12-3.1.0.tgz --strip 1 --directory /opt/kafka/
    

    В данном примере используется Apache Kafka® версии 3.1.0.

  4. Получите SSL-сертификат.

  5. Добавьте SSL-сертификат в хранилище доверенных сертификатов Java (Java Key Store), чтобы драйвер Apache Kafka® мог использовать этот сертификат при защищенном подключении к хостам кластера. Задайте пароль в параметре -storepass для дополнительной защиты хранилища:

    sudo keytool -importcert \
                 -alias YandexCA -file /usr/local/share/ca-certificates/Yandex/YandexCA.crt \
                 -keystore ssl -storepass <пароль хранилища сертификатов, не короче 6 символов> \
                 --noprompt
    
  6. Создайте каталог с настройками процесса-исполнителя и скопируйте туда хранилище:

    sudo mkdir --parents /etc/kafka-connect-worker && \
    sudo cp ssl /etc/kafka-connect-worker/client.truststore.jks
    

Подготовьте тестовые данные

Создайте файл /var/log/sample.json с тестовыми данными. В этом файле приведены данные от сенсоров нескольких автомобилей в формате JSON:

sample.json
{"device_id":"iv9a94th6rztooxh5ur2","datetime":"2020-06-05 17:27:00","latitude":55.70329032,"longitude":37.65472196,"altitude":427.5,"speed":0,"battery_voltage":23.5,"cabin_temperature":17,"fuel_level":null}
{"device_id":"rhibbh3y08qmz3sdbrbu","datetime":"2020-06-06 09:49:54","latitude":55.71294467,"longitude":37.66542005,"altitude":429.13,"speed":55.5,"battery_voltage":null,"cabin_temperature":18,"fuel_level":32}
{"device_id":"iv9a94th6rztooxh5ur2","datetime":"2020-06-07 15:00:10","latitude":55.70985913,"longitude":37.62141918,"altitude":417,"speed":15.7,"battery_voltage":10.3,"cabin_temperature":17,"fuel_level":null}

Настройте Kafka Connect

  1. Создайте файл настроек процесса-исполнителя /etc/kafka-connect-worker/worker.properties:

    # AdminAPI connect properties
    bootstrap.servers=<FQDN хоста-брокера>:9091
    sasl.mechanism=SCRAM-SHA-512
    security.protocol=SASL_SSL
    ssl.truststore.location=/etc/kafka-connect-worker/client.truststore.jks
    ssl.truststore.password=<пароль к хранилищу сертификата>
    sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="user" password="<пароль пользователя user>";
    
    # Producer connect properties
    producer.sasl.mechanism=SCRAM-SHA-512
    producer.security.protocol=SASL_SSL
    producer.ssl.truststore.location=/etc/kafka-connect-worker/client.truststore.jks
    producer.ssl.truststore.password=<пароль к хранилищу сертификата>
    producer.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="user" password="<пароль пользователя user>";
    
    # Worker properties
    plugin.path=/etc/kafka-connect-worker/plugins
    key.converter=org.apache.kafka.connect.json.JsonConverter
    value.converter=org.apache.kafka.connect.json.JsonConverter
    key.converter.schemas.enable=true
    value.converter.schemas.enable=true
    offset.storage.file.filename=/etc/kafka-connect-worker/worker.offset
    

    Kafka Connect будет подключаться к кластеру Managed Service for Apache Kafka® от имени пользователя user, созданного ранее.

    FQDN хостов-брокеров можно запросить со списком хостов в кластере.

  2. Создайте файл настроек коннектора /etc/kafka-connect-worker/file-connector.properties:

    name=local-file-source
    connector.class=FileStreamSource
    tasks.max=1
    file=/var/log/sample.json
    topic=messages
    

    Где:

    • file — имя файла, из которого коннектор будет читать данные.
    • topic — имя топика в кластере Managed Service for Apache Kafka®, куда коннектор будет передавать данные.

Запустите Kafka Connect и проверьте его работу

  1. Чтобы отправить тестовые данные в кластер, запустите процесс-исполнитель на виртуальной машине:

    cd ~/opt/kafka/bin/ && \
    sudo ./connect-standalone.sh \
         /etc/kafka-connect-worker/worker.properties \
         /etc/kafka-connect-worker/file-connector.properties
    
  2. Подключитесь к кластеру с помощью kcat и получите данные из топика кластера:

    kafkacat -C \
        -b <FQDN хоста-брокера>:9091 \
        -t messages \
        -X security.protocol=SASL_SSL \
        -X sasl.mechanisms=SCRAM-SHA-512 \
        -X sasl.username=user \
        -X sasl.password="<пароль учетной записи user>" \
        -X ssl.ca.location=/usr/local/share/ca-certificates/Yandex/YandexCA.crt -Z -K:
    

    FQDN хостов-брокеров можно запросить со списком хостов в кластере.

    В выводе команды вы увидите содержимое тестового файла /var/log/sample.json, переданное на предыдущем шаге.

Удалите созданные ресурсы

Если созданные ресурсы вам больше не нужны, удалите их:

Вручную
С помощью Terraform
  1. Удалите виртуальную машину.
  2. Если вы зарезервировали для виртуальной машины публичный статический IP-адрес, удалите его.
  3. Удалите кластер Managed Service for Apache Kafka®.

Чтобы удалить инфраструктуру, созданную с помощью Terraform:

  1. В терминале перейдите в директорию с планом инфраструктуры.

  2. Удалите конфигурационный файл kafka-connect.tf.

  3. Проверьте корректность файлов конфигурации Terraform с помощью команды:

    terraform validate
    

    Если в файлах конфигурации есть ошибки, Terraform на них укажет.

  4. Подтвердите изменение ресурсов.

    1. Выполните команду для просмотра планируемых изменений:

      terraform plan
      

      Если конфигурации ресурсов описаны верно, в терминале отобразится список изменяемых ресурсов и их параметров. Это проверочный этап: ресурсы не будут изменены.

    2. Если вас устраивают планируемые изменения, внесите их:

      1. Выполните команду:

        terraform apply
        
      2. Подтвердите изменение ресурсов.

      3. Дождитесь завершения операции.

    Все ресурсы, которые были описаны в конфигурационном файле, будут удалены.

Была ли статья полезна?

Language / Region
Проект Яндекса
© 2023 ООО «Яндекс.Облако»
В этой статье:
  • Перед началом работы
  • Настройте виртуальную машину
  • Подготовьте тестовые данные
  • Настройте Kafka Connect
  • Запустите Kafka Connect и проверьте его работу
  • Удалите созданные ресурсы