Получение данных из RabbitMQ
- Перед началом работы
- Настройте интеграцию RabbitMQ для кластера Managed Service for ClickHouse
- Изучите формат данных, поступающих от RabbitMQ
- Создайте в кластере Managed Service for ClickHouse таблицу на движке RabbitMQ
- Отправьте тестовые данные в очередь RabbitMQ
- Проверьте наличие тестовых данных в таблице кластера Managed Service for ClickHouse
В кластер Managed Service for ClickHouse можно поставлять данные из RabbitMQ в реальном времени. Managed Service for ClickHouse будет автоматически вставлять в таблицу на движке RabbitMQ данные, поступающие на определенные точки обмена указанных очередей RabbitMQ.
Чтобы настроить поставку данных из RabbitMQ в Managed Service for ClickHouse:
- Настройте интеграцию с RabbitMQ для кластера Managed Service for ClickHouse.
- Изучите формат данных, поступающих от RabbitMQ.
- Создайте в кластере Managed Service for ClickHouse таблицу на движке RabbitMQ.
- Отправьте тестовые данные в очередь RabbitMQ.
- Проверьте наличие тестовых данных в таблице кластера Managed Service for ClickHouse.
Перед началом работы
-
Создайте кластер Managed Service for ClickHouse любой подходящей вам конфигурации с базой данных
db1
и публичным доступом ко всем его хостам.Примечание
Интеграцию с RabbitMQ можно настроить уже на этапе создания кластера. В этом сценарии использования интеграция будет настроена позже.
-
Установите утилиты
amqp-publish
иamqp-declare-queue
для работы с RabbitMQ. Убедитесь, что можете создать в RabbitMQ очередьcars
с помощьюamqp-declare-queue
:amqp-declare-queue \ --url=amqp://<имя пользователя RabbitMQ>:<пароль>@<FQDN хоста RabbitMQ>:5672 \ --queue=cars
-
Установите утилиту
clickhouse-client
и убедитесь, что можете с ее помощью подключиться к кластеру ClickHouse через SSL. -
Установите утилиту для потоковой обработки JSON-файлов jq.
Настройте интеграцию RabbitMQ для кластера Managed Service for ClickHouse
Укажите в настройках кластера Managed Service for ClickHouse имя пользователя и пароль для аутентификации в RabbitMQ (секция Настройки СУБД → Rabbitmq).
Изучите формат данных, поступающих от RabbitMQ
Чтобы обработать данные, поступающие от RabbitMQ, нужно понимать, каким образом они представлены в сообщениях, и соответствующим образом настроить движок RabbitMQ при создании таблицы в кластере Managed Service for ClickHouse.
В качестве примера в очередь cars
на точке обмена exchange
в RabbitMQ будут помещены данные от сенсоров автомобиля в формате JSON:
- строковый идентификатор устройства
device_id
; - дата и время формирования данных
datetime
в форматеYYYY-MM-DD HH:MM:SS
; - координаты автомобиля:
- широта
latitude
; - долгота
longitude
; - высота над уровнем моря
altitude
;
- широта
- текущая скорость
speed
; - напряжение батарей
battery_voltage
(для электромобиля, для автомобиля с ДВС значение этого параметра —null
); - температура в салоне
cabin_temperature
; - уровень топлива
fuel_level
(для автомобиля с ДВС, для электромобиля значение этого параметра —null
).
Эти данные будут передаваться в виде сообщений RabbitMQ. Каждое такое сообщение будет содержать JSON-объект как строку следующего вида:
{"device_id":"iv9a94th6rztooxh5ur2","datetime":"2020-06-05 17:27:00","latitude":"55.70329032","longitude":"37.65472196","altitude":"427.5","speed":"0","battery_voltage":"23.5","cabin_temperature":"17","fuel_level":null}
Кластер Managed Service for ClickHouse будет использовать при вставке в таблицу формат данных JSONEachRow, который позволяет преобразовать строковое представление JSON-объекта из сообщения RabbitMQ в нужный набор значений столбцов.
Создайте в кластере Managed Service for ClickHouse таблицу на движке RabbitMQ
Исходя из формата данных, приведенного выше, создайте в кластере Managed Service for ClickHouse таблицу, в которую будут заноситься поступающие от RabbitMQ данные:
-
Подключитесь к базе данных
db1
кластера Managed Service for ClickHouse с помощьюclickhouse-client
. -
Выполните запрос:
CREATE TABLE IF NOT EXISTS db1.cars ( device_id String, datetime DateTime, latitude Float32, longitude Float32, altitude Float32, speed Float32, battery_voltage Nullable(Float32), cabin_temperature Float32, fuel_level Nullable(Float32) ) ENGINE = RabbitMQ SETTINGS rabbitmq_host_port = '<IP-адрес или FQDN сервера RabbitMQ>:5672', rabbitmq_routing_key_list='cars', rabbitmq_exchange_name = 'exchange', rabbitmq_format = 'JSONEachRow';
Эта таблица будет автоматически наполняться сообщениями, считываемыми из очереди cars
в точке обмена exchange
RabbitMQ. При чтении данных Managed Service for ClickHouse использует указанные ранее данные для аутентификации.
Подробнее о создании таблицы на движке RabbitMQ см. в документации ClickHouse.
Отправьте тестовые данные в очередь RabbitMQ
-
Создайте файл
sample.json
со следующими тестовыми данными:{ "device_id": "iv9a94th6rztooxh5ur2", "datetime": "2020-06-05 17:27:00", "latitude": 55.70329032, "longitude": 37.65472196, "altitude": 427.5, "speed": 0, "battery_voltage": 23.5, "cabin_temperature": 17, "fuel_level": null } { "device_id": "rhibbh3y08qmz3sdbrbu", "datetime": "2020-06-06 09:49:54", "latitude": 55.71294467, "longitude": 37.66542005, "altitude": 429.13, "speed": 55.5, "battery_voltage": null, "cabin_temperature": 18, "fuel_level": 32 } { "device_id": "iv9a94th6rztooxh5ur2", "datetime": "2020-06-07 15:00:10", "latitude": 55.70985913, "longitude": 37.62141918, "altitude": 417.0, "speed": 15.7, "battery_voltage": 10.3, "cabin_temperature": 17, "fuel_level": null }
-
Отправьте в созданную ранее очередь
cars
в точку обменаexchange
данные из файлаsample.json
с помощьюjq
иamqp-publish
.jq --raw-output --compact-output . ./sample.json | amqp-publish \ --url=amqp://<имя пользователя RabbitMQ>:<пароль>@<FQDN хоста RabbitMQ>:5672 \ --routing-key=cars \ --exchange=exchange
Проверьте наличие тестовых данных в таблице кластера Managed Service for ClickHouse
Создайте материализованное представление (MATERIALIZED VIEW
) и используйте его для доступа к данным. Когда к таблице на движке RabbitMQ присоединяется материализованное представление, оно начинает в фоновом режиме собирать данные. Это позволяет непрерывно получать сообщения от RabbitMQ и преобразовывать их в необходимый формат с помощью SELECT
.
Примечание
Хотя можно напрямую считать данные из таблицы db1.cars
, это не рекомендуется, т. к. каждое сообщение из очереди может быть прочитано ClickHouse только один раз.
Чтобы создать материализованное представление для таблицы db1.cars
:
-
Подключитесь к базе данных
db1
кластера Managed Service for ClickHouse с помощьюclickhouse-client
. -
Выполните запросы:
CREATE TABLE IF NOT EXISTS db1.cars_data_source ( device_id String, datetime DateTime, latitude Float32, longitude Float32, altitude Float32, speed Float32, battery_voltage Nullable(Float32), cabin_temperature Float32, fuel_level Nullable(Float32) ) ENGINE MergeTree() ORDER BY device_id; CREATE MATERIALIZED VIEW db1.cars_view TO db1.cars_data_source AS SELECT * FROM db1.cars;
Чтобы получить все данные из материализованного представления db1.cars_view
:
-
Подключитесь к базе данных
db1
кластера Managed Service for ClickHouse с помощьюclickhouse-client
. -
Выполните запрос:
SELECT * FROM db1.cars_view;
После выполнения запроса вы должны получить отправленные в RabbitMQ данные в табличном виде.
Подробнее о работе с данными, поставляемыми из RabbitMQ, см. в документации ClickHouse.