Поставка данных в ksqlDB
- Перед началом работы
- Настройте интеграцию с Apache Kafka® для базы ksqlDB
- Изучите формат данных, поступающих от Managed Service for Apache Kafka®
- Создайте в ksqlDB таблицу для записи потока данных из топика Apache Kafka®
- Получите тестовые данные из кластера Managed Service for Apache Kafka®
- Запишите тестовые данные в ksqlDB
- Проверьте наличие тестовых данных в топике Apache Kafka®
- Удалите созданные ресурсы
ksqlDB — это база данных, которая предназначена для потоковой обработки сообщений, поступающих из топиков Apache Kafka®. Работа с потоком сообщений в ksqlDB похожа на работу с таблицами в обычной базе данных. Таблица ksqlDB автоматически пополняется данными, поступающими из топика, а данные, которые вы добавите в таблицу ksqlDB, отправляются в топик Apache Kafka®. Подробнее см. в документации ksqlDB.
Чтобы настроить поставку данных из Managed Service for Apache Kafka® в ksqlDB:
- Настройте интеграцию с Apache Kafka® для базы ksqlDB.
- Изучите формат данных, поступающих от Managed Service for Apache Kafka®.
- Создайте в ksqlDB таблицу для записи потока данных из топика Apache Kafka®.
- Получите тестовые данные из кластера Managed Service for Apache Kafka®
- Запишите тестовые данные в ksqlDB.
- Проверьте наличие тестовых данных в топике Apache Kafka®.
Если созданные ресурсы вам больше не нужны, удалите их.
Перед началом работы
-
Создайте кластер Managed Service for Apache Kafka® любой подходящей вам конфигурации:
-
Если сервер ksqlDB размещен в интернете, создайте кластер Managed Service for Apache Kafka® с публичным доступом.
-
Если сервер ksqlDB размещен в Yandex Cloud, создайте кластер Managed Service for Apache Kafka® в той же облачной сети, где находится ksqlDB.
-
-
Создайте топики в кластере Managed Service for Apache Kafka®:
- Служебный топик
_confluent-ksql-default__command_topic
с настройками:- Фактор репликации —
1
. - Количество разделов —
1
. - Политика очистки лога —
Delete
. - Время жизни сегмента лога, мс —
-1
. - Минимальное число синхронных реплик —
1
.
- Фактор репликации —
- Служебный топик
default_ksql_processing_log
. Настройки топика могут быть любыми. - Топик для хранения данных
locations
. Настройки топика могут быть любыми.
- Служебный топик
-
Создайте пользователя с именем
ksql
и назначьте ему ролиACCESS_ROLE_PRODUCER
иACCESS_ROLE_CONSUMER
для всех созданных ранее топиков. -
Убедитесь, что вы можете подключиться к серверу ksqlDB.
-
Установите утилиту
kafkacat
на сервер ksqlDB и убедитесь, что можете с ее помощью подключиться к кластеру Managed Service for Apache Kafka® через SSL. -
Установите утилиту для потоковой обработки JSON-файлов jq на сервер ksqlDB.
Настройте интеграцию с Apache Kafka® для базы ksqlDB
-
Подключитесь к серверу ksqlDB.
-
Добавьте SSL-сертификат в хранилище доверенных сертификатов Java (Java Key Store), чтобы ksqlDB мог использовать этот сертификат при защищенном подключении к хостам кластера. При этом задайте пароль в параметре
-storepass
для дополнительной защиты хранилища:cd /etc/ksqldb && \ sudo keytool -importcert -alias YandexCA -file /usr/local/share/ca-certificates/Yandex/YandexCA.crt \ -keystore ssl -storepass <пароль хранилища сертификатов> \ --noprompt
-
Укажите в файле конфигурации ksqlDB
/etc/ksqldb/ksql-server.properties
данные для аутентификации в кластере Managed Service for Apache Kafka®:bootstrap.servers=<FQDN брокера 1:9091, ..., FQDN брокера N:9091> sasl.mechanism=SCRAM-SHA-512 security.protocol=SASL_SSL ssl.truststore.location=/etc/ksqldb/ssl ssl.truststore.password=<пароль хранилища сертификатов> sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="ksql" password="<пароль пользователя ksql>";
FQDN брокеров можно запросить со списком хостов в кластере, имя кластера — со списком кластеров в каталоге.
-
Перезапустите сервис ksqlDB командой:
sudo systemctl restart confluent-ksqldb.service
Изучите формат данных, поступающих от Managed Service for Apache Kafka®
Обработка потока данных из Managed Service for Apache Kafka® зависит от формата представления в сообщении Apache Kafka®.
В примере в топик Apache Kafka® locations
будут записываться геоданные в формате JSON:
- идентификатор
profileId
; - широта
latitude
; - долгота
longitude
;
Эти данные будут передаваться в виде сообщений Apache Kafka®. Каждое такое сообщение будет содержать JSON-объект как строку следующего вида:
{"profileId": "c2309eec", "latitude": 37.7877, "longitude": -122.4205}
База ksqlDB будет использовать таблицу из трех столбцов, в которой хранятся значения соответствующих параметров из сообщений Apache Kafka®.
Далее выполним настройку полей потоковой таблицы в базе ksqlDB.
Создайте в ksqlDB таблицу для записи потока данных из топика Apache Kafka®
Чтобы записывать информацию из топика Apache Kafka®, создайте в базе ksqlDB таблицу. Структура таблицы соответствует формату данных, которые поступают из Managed Service for Apache Kafka®:
-
Подключитесь к серверу ksqlDB.
-
Запустите клиент
ksql
командой:ksql http://0.0.0.0:8088
-
Выполните запрос:
CREATE STREAM riderLocations ( profileId VARCHAR, latitude DOUBLE, longitude DOUBLE ) WITH ( kafka_topic='locations', value_format='json', partitions=<количество разделов топика "locations"> );
Эта потоковая таблица будет автоматически наполняться сообщениями из топика
locations
кластера Managed Service for Apache Kafka®. Для чтения сообщений ksqlDB использует настройки пользователяksql
.Подробнее о создании потоковой таблицы на движке ksqlDB см. в документации ksqlDB.
-
Выполните запрос:
SELECT * FROM riderLocations WHERE GEO_DISTANCE(latitude, longitude, 37.4133, -122.1162) <= 5 EMIT CHANGES;
Запрос ожидает появления данных в таблице в реальном времени.
Получите тестовые данные из кластера Managed Service for Apache Kafka®
-
Подключитесь к серверу ksqlDB.
-
Создайте файл
sample.json
со следующими тестовыми данными:{ "profileId": "c2309eec", "latitude": 37.7877, "longitude": -122.4205 } { "profileId": "4ab5cbad", "latitude": 37.3952, "longitude": -122.0813 } { "profileId": "4a7c7b41", "latitude": 37.4049, "longitude": -122.0822 }
-
Отправьте файл
sample.json
в топикlocations
кластера Managed Service for Apache Kafka® с помощьюjq
иkafkacat
:jq -rc . sample.json | kafkacat -P \ -b <FQDN брокера 1:9091, ..., FQDN брокера N:9091> \ -t locations \ -X security.protocol=SASL_SSL \ -X sasl.mechanisms=SCRAM-SHA-512 \ -X sasl.username=ksql \ -X sasl.password="<пароль пользователя ksql>" \ -X ssl.ca.location=/usr/local/share/ca-certificates/Yandex/YandexCA.crt -Z
Информация отправляется с помощью пользователя
ksql
. Подробнее о настройке SSL-сертификата и работе сkafkacat
см. в разделе Подключение к топикам в кластере Apache Kafka®. -
Убедитесь, что в сессии отобразились данные, которые были отправлены в топик:
+--------------------------+--------------------------+------------------------+ |PROFILEID |LATITUDE |LONGITUDE | +--------------------------+--------------------------+------------------------+ |4ab5cbad |37.3952 |-122.0813 | |4a7c7b41 |37.4049 |-122.0822 |
Данные считываются с помощью пользователя ksql
.
Запишите тестовые данные в ksqlDB
-
Подключитесь к серверу ksqlDB.
-
Запустите клиент
ksql
командой:ksql http://0.0.0.0:8088
-
Вставьте тестовые данные в таблицу
riderLocations
:INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('18f4ea86', 37.3903, -122.0643); INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('8b6eae59', 37.3944, -122.0813); INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('4ddad000', 37.7857, -122.4011);
Эти данные синхронно отправляются в топик Apache Kafka®
locations
с помощью пользователяksql
.
Проверьте наличие тестовых данных в топике Apache Kafka®
-
Подключитесь к серверу ksqlDB.
-
Проверьте сообщения в топике
locations
кластера Managed Service for Apache Kafka® с помощьюkafkacat
и пользователяksql
:kafkacat -C \ -b <FQDN брокера 1:9091, ..., FQDN брокера N:9091> \ -t locations \ -X security.protocol=SASL_SSL \ -X sasl.mechanisms=SCRAM-SHA-512 \ -X sasl.username=ksql \ -X sasl.password="<пароль пользователя ksql>" \ -X ssl.ca.location=/usr/local/share/ca-certificates/Yandex/YandexCA.crt -Z -K:
-
Убедитесь, что в консоли отображаются сообщения, которые вы записали в таблицу.
Удалите созданные ресурсы
Если созданные ресурсы вам больше не нужны, удалите их:
- Удалите виртуальную машину.
- Если вы зарезервировали для виртуальной машины публичный статический IP-адрес, удалите его.
- Удалите кластер Managed Service for Apache Kafka®.