Подключение к кластеру Apache Kafka® из приложений
В этом разделе представлены настройки для подключения к хостам кластера Managed Service for Apache Kafka® с помощью инструментов командной строки и из Docker-контейнера. О подключении из кода вашего приложения см. Примеры кода.
Вы можете подключаться к хостам кластера Apache Kafka® в публичном доступе только с использованием SSL-сертификата. В примерах ниже предполагается, что сертификат YandexInternalRootCA.crt
расположен в директории:
/usr/local/share/ca-certificates/Yandex/
для Ubuntu;$HOME\.kafka\
для Windows.
Подключение без использования SSL-сертификата поддерживается только для хостов, находящихся не в публичном доступе. В этом случае трафик внутри виртуальной сети при подключении к БД шифроваться не будет.
При необходимости перед подключением настройте группы безопасности кластера.
Примеры для Linux проверялись в следующем окружении:
- Виртуальная машина в Yandex Cloud с Ubuntu 20.04 LTS.
- Bash:
5.0.16
.
Примеры для Windows проверялись в следующем окружении:
- Виртуальная машина в Yandex Cloud с Windows Server 2019 Datacenter.
- Microsoft OpenJDK:
11.0.11
. - PowerShell:
5.1.17763.1490 Desktop
.
Инструменты командной строки
Примеры кода с заполненным FQDN хоста доступны в консоли управления
Linux (Bash)/macOS (Zsh)
Для подключения к кластеру Apache Kafka® из командной строки используйте утилиту kafkacat
— приложение с открытым исходным кодом, которое может работать как универсальный производитель или потребитель данных. Подробнее читайте в документации
Перед подключением установите зависимости:
sudo apt update && sudo apt install -y kafkacat
-
Запустите команду получения сообщений из топика:
kafkacat -C \ -b <FQDN_брокера>:9092 \ -t <имя_топика> \ -X security.protocol=SASL_PLAINTEXT \ -X sasl.mechanism=SCRAM-SHA-512 \ -X sasl.username="<логин_потребителя>" \ -X sasl.password="<пароль_потребителя>" -Z
Команда будет непрерывно считывать новые сообщения из топика.
-
В отдельном терминале запустите команду отправки сообщения в топик:
echo "test message" | kafkacat -P \ -b <FQDN_брокера>:9092 \ -t <имя_топика> \ -k key \ -X security.protocol=SASL_PLAINTEXT \ -X sasl.mechanism=SCRAM-SHA-512 \ -X sasl.username="<логин_производителя>" \ -X sasl.password="<пароль_производителя>" -Z
-
Запустите команду получения сообщений из топика:
kafkacat -C \ -b <FQDN_брокера>:9091 \ -t <имя_топика> \ -X security.protocol=SASL_SSL \ -X sasl.mechanism=SCRAM-SHA-512 \ -X sasl.username="<логин_потребителя>" \ -X sasl.password="<пароль_потребителя>" \ -X ssl.ca.location=/usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt -Z -K:
Команда будет непрерывно считывать новые сообщения из топика.
-
В отдельном терминале запустите команду отправки сообщения в топик:
echo "test message" | kafkacat -P \ -b <FQDN_брокера>:9091 \ -t <имя_топика> \ -k key \ -X security.protocol=SASL_SSL \ -X sasl.mechanism=SCRAM-SHA-512 \ -X sasl.username="<логин_производителя>" \ -X sasl.password="<пароль_производителя>" \ -X ssl.ca.location=/usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt -Z
О том, как получить FQDN хоста-брокера, см. инструкцию.
Убедитесь, что в первом терминале отобразилось сообщение key:test message
, отправленное во втором.
Windows (PowerShell)
Перед подключением:
-
Установите последнюю доступную версию Microsoft OpenJDK
. -
Загрузите архив с бинарными файлами
для версии Apache Kafka®, которая используется в кластере. Версия Scala неважна. -
Распакуйте архив.
Совет
Распаковывайте файлы Apache Kafka® в корневой каталог диска, например,
C:\kafka_2.12-2.6.0\
.Если путь к исполняемым и пакетным файлам Apache Kafka® будет слишком длинным, то при попытке запустить их возникнет ошибка
The input line is too long
.
-
Запустите команду получения сообщений из топика:
<путь_к_директории_с_файлами_Apache_Kafka>\bin\windows\kafka-console-consumer.bat ` --bootstrap-server <FQDN_брокера>:9092 ` --topic <имя_топика> ` --property print.key=true ` --property key.separator=":" ` --consumer-property security.protocol=SASL_PLAINTEXT ` --consumer-property sasl.mechanism=SCRAM-SHA-512 ` --consumer-property sasl.jaas.config="org.apache.kafka.common.security.scram.ScramLoginModule required username='<логин_потребителя>' password='<пароль_потребителя>';"
Команда будет непрерывно считывать новые сообщения из топика.
-
В отдельном терминале запустите команду отправки сообщения в топик:
echo "key:test message" | <путь_к_директории_с_файлами_Apache_Kafka>\bin\windows\kafka-console-producer.bat ` --bootstrap-server <FQDN_брокера>:9092 ` --topic <имя_топика> ` --property parse.key=true ` --property key.separator=":" ` --producer-property acks=all ` --producer-property security.protocol=SASL_PLAINTEXT ` --producer-property sasl.mechanism=SCRAM-SHA-512 ` --producer-property sasl.jaas.config="org.apache.kafka.common.security.scram.ScramLoginModule required username='<логин_производителя>' password='<пароль_производителя>';"
-
Добавьте SSL-сертификат в хранилище доверенных сертификатов Java (Java Key Store), чтобы драйвер Apache Kafka® мог использовать этот сертификат при защищенном подключении к хостам кластера. Задайте пароль в параметре
--storepass
для дополнительной защиты хранилища:keytool.exe -importcert -alias YandexCA ` --file $HOME\.kafka\YandexInternalRootCA.crt ` --keystore $HOME\.kafka\ssl ` --storepass <пароль_хранилища_сертификатов> ` --noprompt
-
Запустите команду получения сообщений из топика:
<путь_к_директории_с_файлами_Apache_Kafka>\bin\windows\kafka-console-consumer.bat ` --bootstrap-server <FQDN_брокера>:9091 ` --topic <имя_топика> ` --property print.key=true ` --property key.separator=":" ` --consumer-property security.protocol=SASL_SSL ` --consumer-property sasl.mechanism=SCRAM-SHA-512 ` --consumer-property ssl.truststore.location=$HOME\.kafka\ssl ` --consumer-property ssl.truststore.password=<пароль_хранилища_сертификатов> ` --consumer-property sasl.jaas.config="org.apache.kafka.common.security.scram.ScramLoginModule required username='<логин_потребителя>' password='<пароль_потребителя>';"
Команда будет непрерывно считывать новые сообщения из топика.
-
В отдельном терминале запустите команду отправки сообщения в топик:
echo "key:test message" | <путь_к_директории_с_файлами_Apache_Kafka>\bin\windows\kafka-console-producer.bat ` --bootstrap-server <FQDN_брокера>:9091 ` --topic <имя_топика> ` --property parse.key=true ` --property key.separator=":" ` --producer-property acks=all ` --producer-property security.protocol=SASL_SSL ` --producer-property sasl.mechanism=SCRAM-SHA-512 ` --producer-property ssl.truststore.location=$HOME\.kafka\ssl ` --producer-property ssl.truststore.password=<пароль_хранилища_сертификатов> ` --producer-property sasl.jaas.config="org.apache.kafka.common.security.scram.ScramLoginModule required username='<логин_производителя>' password='<пароль_производителя>';"
О том, как получить FQDN хоста-брокера, см. инструкцию.
Убедитесь, что в первом терминале отобразилось сообщение key:test message
, отправленное во втором.
Подготовка к подключению из Docker-контейнера
Чтобы подключаться к кластеру Managed Service for Apache Kafka® из Docker-контейнера, добавьте в Dockerfile строки:
RUN apt-get update && \
apt-get install kafkacat --yes
RUN apt-get update && \
apt-get install wget kafkacat --yes && \
mkdir --parents /usr/local/share/ca-certificates/Yandex/ && \
wget "https://storage.yandexcloud.net/cloud-certs/CA.pem" \
--output-document /usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt && \
chmod 0655 /usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt