Yandex Cloud
  • Сервисы
  • Решения
  • Почему Yandex Cloud
  • Сообщество
  • Тарифы
  • Документация
  • Связаться с нами
Подключиться
Language / Region
Проект Яндекса
© 2023 ООО «Яндекс.Облако»
Yandex Managed Service for Apache Kafka®
  • Начало работы
  • Пошаговые инструкции
    • Все инструкции
    • Информация об имеющихся кластерах
    • Создание кластера
    • Подключение к кластеру
    • Остановка и запуск кластера
    • Обновление версии Apache Kafka®
    • Изменение настроек кластера
    • Управление хостами Apache Kafka®
    • Работа с топиками и разделами
    • Управление пользователями Apache Kafka®
    • Управление коннекторами
    • Просмотр логов кластера
    • Удаление кластера
    • Мониторинг состояния кластера и хостов
  • Практические руководства
    • Все руководства
    • Настройка Kafka Connect для работы с Managed Service for Apache Kafka®
    • Использование схем формата данных с Managed Service for Apache Kafka®
      • Обзор
      • Работа с управляемым реестром схем формата данных
      • Использование Confluent Schema Registry с Managed Service for Apache Kafka®
    • Миграция базы данных из стороннего кластера Apache Kafka®
    • Перенос данных между кластерами Managed Service for Apache Kafka® с помощью Yandex Data Transfer
    • Поставка данных из Yandex Managed Service for PostgreSQL с помощью Debezium
    • Поставка данных из Yandex Managed Service for MySQL с помощью Debezium
    • Поставка данных из Yandex Managed Service for PostgreSQL с помощью Yandex Data Transfer
    • Поставка данных в Managed Service for ClickHouse
    • Поставка данных в Yandex Managed Service for ClickHouse с помощью Yandex Data Transfer
    • Поставка данных в ksqlDB
    • Поставка данных в Yandex Managed Service for YDB с помощью Yandex Data Transfer
  • Концепции
    • Взаимосвязь ресурсов сервиса
    • Топики и разделы
    • Брокеры
    • Производители и потребители
    • Управление схемами данных
    • Классы хостов
    • Сеть в Managed Service for Apache Kafka®
    • Квоты и лимиты
    • Типы дисков
    • Коннекторы
    • Техническое обслуживание
    • Настройки Apache Kafka®
  • Управление доступом
  • Правила тарификации
  • Справочник API
    • Аутентификация в API
    • gRPC (англ.)
      • Overview
      • ClusterService
      • ConnectorService
      • ResourcePresetService
      • TopicService
      • UserService
      • OperationService
    • REST (англ.)
      • Overview
      • Cluster
        • Overview
        • create
        • delete
        • get
        • list
        • listHosts
        • listLogs
        • listOperations
        • move
        • rescheduleMaintenance
        • start
        • stop
        • streamLogs
        • update
      • Connector
        • Overview
        • create
        • delete
        • get
        • list
        • pause
        • resume
        • update
      • ResourcePreset
        • Overview
        • get
        • list
      • Topic
        • Overview
        • create
        • delete
        • get
        • list
        • update
      • User
        • Overview
        • create
        • delete
        • get
        • grantPermission
        • list
        • revokePermission
        • update
      • Operation
        • Overview
        • get
  • История изменений
  • Вопросы и ответы
  1. Практические руководства
  2. Поставка данных в ksqlDB

Поставка данных в ksqlDB

Статья создана
Yandex Cloud
  • Перед началом работы
  • Настройте интеграцию с Apache Kafka® для базы ksqlDB
  • Изучите формат данных, поступающих от Managed Service for Apache Kafka®
  • Создайте в ksqlDB таблицу для записи потока данных из топика Apache Kafka®
  • Получите тестовые данные из кластера Managed Service for Apache Kafka®
  • Запишите тестовые данные в ksqlDB
  • Проверьте наличие тестовых данных в топике Apache Kafka®
  • Удалите созданные ресурсы

ksqlDB — это база данных, которая предназначена для потоковой обработки сообщений, поступающих из топиков Apache Kafka®. Работа с потоком сообщений в ksqlDB похожа на работу с таблицами в обычной базе данных. Таблица ksqlDB автоматически пополняется данными, поступающими из топика, а данные, которые вы добавите в таблицу ksqlDB, отправляются в топик Apache Kafka®. Подробнее см. в документации ksqlDB.

Чтобы настроить поставку данных из Managed Service for Apache Kafka® в ksqlDB:

  1. Настройте интеграцию с Apache Kafka® для базы ksqlDB.
  2. Изучите формат данных, поступающих от Managed Service for Apache Kafka®.
  3. Создайте в ksqlDB таблицу для записи потока данных из топика Apache Kafka®.
  4. Получите тестовые данные из кластера Managed Service for Apache Kafka®
  5. Запишите тестовые данные в ksqlDB.
  6. Проверьте наличие тестовых данных в топике Apache Kafka®.

Если созданные ресурсы вам больше не нужны, удалите их.

Перед началом работы

  1. Создайте кластер Managed Service for Apache Kafka® любой подходящей вам конфигурации:

    • Если сервер ksqlDB размещен в интернете, создайте кластер Managed Service for Apache Kafka® с публичным доступом.

    • Если сервер ksqlDB размещен в Yandex Cloud, создайте кластер Managed Service for Apache Kafka® в той же облачной сети, где находится ksqlDB.

  2. Создайте топики в кластере Managed Service for Apache Kafka®:

    1. Служебный топик _confluent-ksql-default__command_topic с настройками:
      • Фактор репликации — 1.
      • Количество разделов — 1.
      • Политика очистки лога — Delete.
      • Время жизни сегмента лога, мс — -1.
      • Минимальное число синхронных реплик — 1.
    2. Служебный топик default_ksql_processing_log. Настройки топика могут быть любыми.
    3. Топик для хранения данных locations. Настройки топика могут быть любыми.
  3. Создайте пользователя с именем ksql и назначьте ему роли ACCESS_ROLE_PRODUCER и ACCESS_ROLE_CONSUMER для всех созданных ранее топиков.

  4. Убедитесь, что вы можете подключиться к серверу ksqlDB.

  5. Установите утилиту kafkacat на сервер ksqlDB и убедитесь, что можете с ее помощью подключиться к кластеру Managed Service for Apache Kafka® через SSL.

  6. Установите утилиту для потоковой обработки JSON-файлов jq на сервер ksqlDB.

Настройте интеграцию с Apache Kafka® для базы ksqlDB

  1. Подключитесь к серверу ksqlDB.

  2. Добавьте SSL-сертификат в хранилище доверенных сертификатов Java (Java Key Store), чтобы ksqlDB мог использовать этот сертификат при защищенном подключении к хостам кластера. При этом задайте пароль в параметре -storepass для дополнительной защиты хранилища:

    cd /etc/ksqldb && \
    sudo keytool -importcert -alias YandexCA -file /usr/local/share/ca-certificates/Yandex/YandexCA.crt \
    -keystore ssl -storepass <пароль хранилища сертификатов> \
    --noprompt
    
  3. Укажите в файле конфигурации ksqlDB /etc/ksqldb/ksql-server.properties данные для аутентификации в кластере Managed Service for Apache Kafka®:

    bootstrap.servers=<FQDN брокера 1:9091, ..., FQDN брокера N:9091>
    sasl.mechanism=SCRAM-SHA-512
    security.protocol=SASL_SSL
    ssl.truststore.location=/etc/ksqldb/ssl
    ssl.truststore.password=<пароль хранилища сертификатов>
    sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="ksql" password="<пароль пользователя ksql>";
    

    FQDN брокеров можно запросить со списком хостов в кластере, имя кластера — со списком кластеров в каталоге.

  4. Перезапустите сервис ksqlDB командой:

    sudo systemctl restart confluent-ksqldb.service
    

Изучите формат данных, поступающих от Managed Service for Apache Kafka®

Обработка потока данных из Managed Service for Apache Kafka® зависит от формата представления в сообщении Apache Kafka®.

В примере в топик Apache Kafka® locations будут записываться геоданные в формате JSON:

  • идентификатор profileId;
  • широта latitude;
  • долгота longitude;

Эти данные будут передаваться в виде сообщений Apache Kafka®. Каждое такое сообщение будет содержать JSON-объект как строку следующего вида:

{"profileId": "c2309eec", "latitude": 37.7877, "longitude": -122.4205}

База ksqlDB будет использовать таблицу из трех столбцов, в которой хранятся значения соответствующих параметров из сообщений Apache Kafka®.

Далее выполним настройку полей потоковой таблицы в базе ksqlDB.

Создайте в ksqlDB таблицу для записи потока данных из топика Apache Kafka®

Чтобы записывать информацию из топика Apache Kafka®, создайте в базе ksqlDB таблицу. Структура таблицы соответствует формату данных, которые поступают из Managed Service for Apache Kafka®:

  1. Подключитесь к серверу ksqlDB.

  2. Запустите клиент ksql командой:

    ksql http://0.0.0.0:8088
    
  3. Выполните запрос:

    CREATE STREAM riderLocations 
    (
      profileId VARCHAR,
      latitude DOUBLE,
      longitude DOUBLE
    ) WITH 
    (
      kafka_topic='locations', 
      value_format='json', 
      partitions=<количество разделов топика "locations">
    );
    

    Эта потоковая таблица будет автоматически наполняться сообщениями из топика locations кластера Managed Service for Apache Kafka®. Для чтения сообщений ksqlDB использует настройки пользователя ksql.

    Подробнее о создании потоковой таблицы на движке ksqlDB см. в документации ksqlDB.

  4. Выполните запрос:

    SELECT * FROM riderLocations WHERE 
             GEO_DISTANCE(latitude, longitude, 37.4133, -122.1162) <= 5 
             EMIT CHANGES;
    

    Запрос ожидает появления данных в таблице в реальном времени.

Получите тестовые данные из кластера Managed Service for Apache Kafka®

  1. Подключитесь к серверу ksqlDB.

  2. Создайте файл sample.json со следующими тестовыми данными:

    {
      "profileId": "c2309eec", 
      "latitude": 37.7877,
      "longitude": -122.4205
    }
    
    {
      "profileId": "4ab5cbad", 
      "latitude": 37.3952,
      "longitude": -122.0813
    }
    
    {
      "profileId": "4a7c7b41", 
      "latitude": 37.4049,
      "longitude": -122.0822
    }   
    
  3. Отправьте файл sample.json в топик locations кластера Managed Service for Apache Kafka® с помощью jq и kafkacat:

    jq -rc . sample.json | kafkacat -P  \
       -b <FQDN брокера 1:9091, ..., FQDN брокера N:9091> \
       -t locations \
       -X security.protocol=SASL_SSL \
       -X sasl.mechanisms=SCRAM-SHA-512 \
       -X sasl.username=ksql \
       -X sasl.password="<пароль пользователя ksql>" \
       -X ssl.ca.location=/usr/local/share/ca-certificates/Yandex/YandexCA.crt -Z
    

    Информация отправляется с помощью пользователя ksql. Подробнее о настройке SSL-сертификата и работе с kafkacat см. в разделе Подключение к топикам в кластере Apache Kafka®.

  4. Убедитесь, что в сессии отобразились данные, которые были отправлены в топик:

    +--------------------------+--------------------------+------------------------+
    |PROFILEID                 |LATITUDE                  |LONGITUDE               |
    +--------------------------+--------------------------+------------------------+
    |4ab5cbad                  |37.3952                   |-122.0813               | 
    |4a7c7b41                  |37.4049                   |-122.0822               |
    

Данные считываются с помощью пользователя ksql.

Запишите тестовые данные в ksqlDB

  1. Подключитесь к серверу ksqlDB.

  2. Запустите клиент ksql командой:

    ksql http://0.0.0.0:8088
    
  3. Вставьте тестовые данные в таблицу riderLocations:

    INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('18f4ea86', 37.3903, -122.0643);
    INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('8b6eae59', 37.3944, -122.0813);
    INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('4ddad000', 37.7857, -122.4011);
    

    Эти данные синхронно отправляются в топик Apache Kafka® locations с помощью пользователя ksql.

Проверьте наличие тестовых данных в топике Apache Kafka®

  1. Подключитесь к серверу ksqlDB.

  2. Проверьте сообщения в топике locations кластера Managed Service for Apache Kafka® с помощью kafkacat и пользователя ksql:

    kafkacat -C  \
     -b <FQDN брокера 1:9091, ..., FQDN брокера N:9091> \
     -t locations \
     -X security.protocol=SASL_SSL \
     -X sasl.mechanisms=SCRAM-SHA-512 \
     -X sasl.username=ksql \
     -X sasl.password="<пароль пользователя ksql>" \
     -X ssl.ca.location=/usr/local/share/ca-certificates/Yandex/YandexCA.crt -Z -K:   
    
  3. Убедитесь, что в консоли отображаются сообщения, которые вы записали в таблицу.

Удалите созданные ресурсы

Если созданные ресурсы вам больше не нужны, удалите их:

  • Удалите виртуальную машину.
  • Если вы зарезервировали для виртуальной машины публичный статический IP-адрес, удалите его.
  • Удалите кластер Managed Service for Apache Kafka®.

Была ли статья полезна?

Language / Region
Проект Яндекса
© 2023 ООО «Яндекс.Облако»
В этой статье:
  • Перед началом работы
  • Настройте интеграцию с Apache Kafka® для базы ksqlDB
  • Изучите формат данных, поступающих от Managed Service for Apache Kafka®
  • Создайте в ksqlDB таблицу для записи потока данных из топика Apache Kafka®
  • Получите тестовые данные из кластера Managed Service for Apache Kafka®
  • Запишите тестовые данные в ksqlDB
  • Проверьте наличие тестовых данных в топике Apache Kafka®
  • Удалите созданные ресурсы