Получение данных из Managed Service for Apache Kafka®
- Перед началом работы
- Настройте интеграцию с Apache Kafka® для кластера Managed Service for ClickHouse
- Изучите формат данных, поступающих от Managed Service for Apache Kafka®
- Создайте в кластере Managed Service for ClickHouse таблицу на движке Apache Kafka®
- Отправьте тестовые данные в кластер Managed Service for Apache Kafka®
- Проверьте наличие тестовых данных в таблице кластера Managed Service for ClickHouse
В кластер Managed Service for ClickHouse можно поставлять данные из кластера Managed Service for Apache Kafka® в реальном времени. Managed Service for ClickHouse будет автоматически вставлять в таблицу на движке Apache Kafka® данные, поступающие в определенные топики Apache Kafka®.
Чтобы настроить поставку данных из Managed Service for Apache Kafka® в Managed Service for ClickHouse:
- Настройте интеграцию с Apache Kafka® для кластера Managed Service for ClickHouse.
- Изучите формат данных, поступающих от Managed Service for Apache Kafka®.
- Создайте в кластере Managed Service for ClickHouse таблицу на движке Apache Kafka®.
- Отправьте тестовые данные в кластер Managed Service for Apache Kafka®.
- Проверьте наличие тестовых данных в таблице кластера Managed Service for ClickHouse.
Перед началом работы
-
Создайте кластер Managed Service for Apache Kafka® любой подходящей вам конфигурации.
-
Создайте кластер Managed Service for ClickHouse любой подходящей вам конфигурации с базой данных
db1
.Примечание
Интеграцию с Apache Kafka® можно настроить уже на этапе создания кластера. В этом сценарии использования интеграция будет настроена позже.
-
Создайте топик
datastore
в кластере Managed Service for Apache Kafka®. -
Создайте учетные записи в кластере Managed Service for Apache Kafka® для работы с топиком
datastore
:- учетную запись производителя
writer
; - учетную запись потребителя
reader
.
- учетную запись производителя
-
Установите утилиту
kafkacat
и убедитесь, что можете с ее помощью подключиться к кластеру Managed Service for Apache Kafka® через SSL. -
Установите утилиту
clickhouse-client
и убедитесь, что можете с ее помощью подключиться к кластеру Managed Service for ClickHouse через SSL. -
Установите утилиту для потоковой обработки JSON-файлов jq.
Настройте интеграцию с Apache Kafka® для кластера Managed Service for ClickHouse
Укажите в настройках кластера Managed Service for ClickHouse данные для аутентификации в кластере Managed Service for Apache Kafka® (секция Настройки СУБД → Kafka):
- Sasl mechanism —
SCRAM-SHA-512
. - Sasl password — пароль учетной записи потребителя
reader
. - Sasl username — имя учетной записи потребителя (в этой пошаговой инструкции —
reader
). - Security protocol —
SASL_SSL
.
Кластер Managed Service for ClickHouse будет использовать эти данные для аутентификации при обращении к любому топику Apache Kafka®.
Изучите формат данных, поступающих от Managed Service for Apache Kafka®
Чтобы обработать данные, поступающие от Managed Service for Apache Kafka®, нужно понимать, каким образом они представлены в сообщении Apache Kafka®, и соответствующим образом настроить движок Apache Kafka® при создании таблицы в кластере Managed Service for ClickHouse.
В качестве примера в топик Apache Kafka® datastore
будут помещены данные от сенсоров автомобиля в формате JSON:
- строковый идентификатор устройства
device_id
; - дата и время формирования данных
datetime
в форматеYYYY-MM-DD HH:MM:SS
; - координаты автомобиля:
- широта
latitude
; - долгота
longitude
; - высота над уровнем моря
altitude
:
- широта
- текущая скорость
speed
; - напряжение батарей
battery_voltage
(для электромобиля, для автомобиля с с ДВС значение этого параметра —null
); - температура в салоне
cabin_temperature
; - уровень топлива
fuel_level
(для автомобиля с ДВС, для электромобиля значение этого параметра —null
).
Эти данные будут передаваться в виде сообщений Apache Kafka®. Каждое такое сообщение будет содержать JSON-объект как строку следующего вида:
{"device_id":"iv9a94th6rztooxh5ur2","datetime":"2020-06-05 17:27:00","latitude":"55.70329032","longitude":"37.65472196","altitude":"427.5","speed":"0","battery_voltage":"23.5","cabin_temperature":"17","fuel_level":null}
Кластер Managed Service for ClickHouse будет использовать при вставке в таблицу формат данных JSONEachRow, который позволяет преобразовать строковое представление JSON-объекта из сообщения Apache Kafka® в нужный набор значений столбцов.
Создайте в кластере Managed Service for ClickHouse таблицу на движке Apache Kafka®
Исходя из формата данных, приведенного выше, создайте в кластере Managed Service for ClickHouse таблицу, в которую будут заноситься поступающие от Managed Service for Apache Kafka® данные:
-
Подключитесь к базе данных
db1
кластера Managed Service for ClickHouse с помощьюclickhouse-client
. -
Выполните запрос:
CREATE TABLE IF NOT EXISTS db1.kafka ( device_id String, datetime DateTime, latitude Float32, longitude Float32, altitude Float32, speed Float32, battery_voltage Nullable(Float32), cabin_temperature Float32, fuel_level Nullable(Float32) ) ENGINE = Kafka() SETTINGS kafka_broker_list = '<FQDN хоста-брокера Kafka>:9091', kafka_topic_list = 'datastore', kafka_group_name = 'sample_group', kafka_format = 'JSONEachRow';
Эта таблица будет автоматически наполняться сообщениями, считываемыми из топика datastore
кластера Managed Service for Apache Kafka®. При чтении данных Managed Service for ClickHouse использует указанные ранее настройки для учетной записи потребителя reader
.
Подробнее о создании таблицы на движке Apache Kafka® см. в документации ClickHouse.
Отправьте тестовые данные в кластер Managed Service for Apache Kafka®
-
Создайте файл
sample.json
со следующими тестовыми данными:{ "device_id": "iv9a94th6rztooxh5ur2", "datetime": "2020-06-05 17:27:00", "latitude": 55.70329032, "longitude": 37.65472196, "altitude": 427.5, "speed": 0, "battery_voltage": 23.5, "cabin_temperature": 17, "fuel_level": null } { "device_id": "rhibbh3y08qmz3sdbrbu", "datetime": "2020-06-06 09:49:54", "latitude": 55.71294467, "longitude": 37.66542005, "altitude": 429.13, "speed": 55.5, "battery_voltage": null, "cabin_temperature": 18, "fuel_level": 32 } { "device_id": "iv9a94th6rztooxh5ur2", "datetime": "2020-06-07 15:00:10", "latitude": 55.70985913, "longitude": 37.62141918, "altitude": 417.0, "speed": 15.7, "battery_voltage": 10.3, "cabin_temperature": 17, "fuel_level": null }
-
Отправьте данные из файла
sample.json
в топикdatastore
кластера Managed Service for Apache Kafka® с помощьюjq
иkafkacat
:jq -rc . sample.json | kafkacat kafkacat -P \ -b rc1a-gjnru23feni92o5a.mdb.yandexcloud.net:9091 \ -t datastore \ -k key \ -X security.protocol=SASL_SSL \ -X sasl.mechanisms=SCRAM-SHA-512 \ -X sasl.username=writer \ -X sasl.password=<пароль производителя> \ -X ssl.ca.location=<путь к корневому сертификату Yandex> -Z
Данные отправляются с помощью учетной записи производителя writer
. Подробнее о настройке SSL-сертификата и работе с kafkacat
см. в разделе Подключение к топикам в кластере Apache Kafka®.
Проверьте наличие тестовых данных в таблице кластера Managed Service for ClickHouse
Хотя можно напрямую считать данные из таблицы db1.kafka
, это не рекомендуется, т.к. каждое сообщение из топика может быть прочитано ClickHouse только один раз.
Практичнее создать материализованное представление (MATERIALIZED VIEW
) и использовать его для доступа к данным. Когда к таблице на движке Apache Kafka® присоединяется материализованное представление, оно начинает в фоновом режиме собирать данные. Это позволяет непрерывно получать сообщения от Apache Kafka® и преобразовывать их в необходимый формат с помощью SELECT
.
Чтобы создать такое представление для таблицы db1.kafka
:
-
Подключитесь к базе данных
db1
кластера Managed Service for ClickHouse с помощьюclickhouse-client
. -
Выполните запросы:
CREATE TABLE db1.kafka_data_source ( device_id String, datetime DateTime, latitude Float32, longitude Float32, altitude Float32, speed Float32, battery_voltage Nullable(Float32), cabin_temperature Float32, fuel_level Nullable(Float32) ) ENGINE = MergeTree() ORDER BY device_id; CREATE MATERIALIZED VIEW db1.data_view TO db1.kafka_data_source AS SELECT * FROM db1.kafka;
Чтобы получить все данные из материализованного представления db1.data_view
:
-
Подключитесь к базе данных
db1
кластера Managed Service for ClickHouse с помощьюclickhouse-client
. -
Выполните запрос:
SELECT * FROM db1.data_view;
После выполнения запроса вы должны получить отправленные в Managed Service for Apache Kafka® данные в табличном виде.
Подробнее о работе с данными, поставляемыми из Apache Kafka®, см. в документации ClickHouse.