Yandex Cloud
  • Сервисы
  • Решения
  • Почему Yandex Cloud
  • Сообщество
  • Тарифы
  • Документация
  • Связаться с нами
Подключиться
Language / Region
© 2022 ООО «Яндекс.Облако»
Yandex Data Streams
  • Начало работы
    • Подготовка окружения
    • Создание потока данных
    • Сбор и поставка данных
      • Fluentd
      • Logstash
      • AWS CLI
    • Сохранение данных в ClickHouse
    • Обработка потока изменений из Debezium
  • Пошаговые инструкции
    • Все инструкции
    • Управление потоками данных
    • Работа с AWS CLI
      • Создание потока данных
      • Получение информации о потоке
      • Отправка данных в поток
      • Чтение данных из потока
      • Удаление потока данных
    • Работа с AWS SDK
      • Подготовка окружения
      • Создание потока данных
      • Отправка данных в поток
      • Чтение данных из потока
      • Удаление потока данных
  • Концепции
    • Обзор
    • Термины и определения
    • Cегменты и ключи сегментов
    • Квоты и лимиты
    • Сравнение с Yandex Message Queue
  • Практические руководства
    • Ввод данных в системы хранения
    • Умная обработка логов
    • Передача данных в микросервисных архитектурах
  • Управление доступом
  • Правила тарификации
  • HTTP API, совместимый с Amazon Kinesis Data Streams
    • Обзор
    • Методы
      • CreateStream
      • DecreaseStreamRetentionPeriod
      • DeleteStream
      • DescribeStream
      • IncreaseStreamRetentionPeriod
      • GetRecords
      • GetShardIterator
      • ListStreams
      • PutRecord
      • PutRecords
      • UpdateShardCount
    • Общие ошибки
    • Примеры
  • Вопросы и ответы
  1. Начало работы
  2. Обработка потока изменений из Debezium

Обработка потока изменений Debezium

Статья создана
Yandex Cloud
  • Настройка
    • Создание потока данных
    • Настройка реквизитов подключения к Yandex Data Streams
    • Настройка Debezium Server
  • Настроить триггер в Cloud Functions

Debezium — это сервис для захвата изменений в базах данных (Change Data Capture) и отправки их на обработку в другие системы. С помощью Yandex Data Streams можно захватывать эти изменения и отправлять их в Cloud Functions.

Ниже приведена архитектура решения:
debezium

Настройка

Для получения потока данных необходимо:

  1. Создать поток данных Yandex Data Streams.
  2. Настроить реквизиты подключения к Yandex Data Streams
  3. Настроить и запустить Debezium Server.
  4. Настроить триггер в Cloud Functions для обработки данных.

Создание потока данных

Создайте поток данных Yandex Data Streams с именем debezium. Процедура создания потока данных подробно описана в документации Yandex Data Streams

Настройка реквизитов подключения к Yandex Data Streams

  1. Войдите в консоль управления. Если вы еще не зарегистрированы, перейдите в консоль управления и следуйте инструкциям.
  2. На странице биллинга убедитесь, что у вас подключен платежный аккаунт, и он находится в статусе ACTIVE или TRIAL_ACTIVE. Если платежного аккаунта нет, создайте его.
  3. Если у вас еще нет каталога, создайте его.
  4. Создайте сервисный аккаунт и назначьте ему роль editor на ваш каталог.
  5. Создайте статический ключ доступа.
  6. Настройте AWS CLI:
    1. Установите AWS CLI и выполните команду:

      aws configure
      
    2. Последовательно введите:

      • AWS Access Key ID [None]: — идентификатор ключа сервисного аккаунта.
      • AWS Secret Access Key [None]: — секретный ключ сервисного аккаунта.
      • Default region name [None]: — зону доступности ru-central1.

Настройка Debezium Server

  1. В данном примере рассматривается взаимодействие Debezium и PostgreSQL. Далее будем считать, что Debezium будет устанавливаться на сервере, где запущен PostgreSQL. Установите Debezium server по инструкции.

  2. Перейдите в каталог conf и создайте файл application.properties со следующим содержимым:

    debezium.sink.type=kinesis
    debezium.sink.kinesis.region=ru-central1
    debezium.sink.kinesis.endpoint=<YDS_STREAM_ENDPOINT>
    debezium.source.connector.class=io.debezium.connector.postgresql.PostgresConnector
    debezium.source.offset.storage.file.filename=data/offsets.dat
    debezium.source.offset.flush.interval.ms=0
    debezium.source.database.hostname=localhost
    debezium.source.database.port=5432
    debezium.source.database.user=<DATABASE_USER>
    debezium.source.database.password=<DATABASE_PASSWORD>
    debezium.source.database.dbname=<DATABASE_NAME>
    debezium.source.database.server.name=debezium
    debezium.source.plugin.name=pgoutput
    
    debezium.source.transforms=Reroute
    debezium.source.transforms.Reroute.type=io.debezium.transforms.ByLogicalTableRouter
    debezium.source.transforms.Reroute.topic.regex=(.*)
    debezium.source.transforms.Reroute.topic.replacement=<YDS_STREAM_NAME>
    

    Где:

    • <YDS_STREAM_ENDPOINT> - полный endpoint к потоку данных Data Streams, например, https://yds.serverless.yandexcloud.net/ru-central1/b1п89ae43m6he2ooql88r/etn01eg4rn184nemdbb. Полный endpoint доступен в UI Yandex Data Streams. Для просмотра полного endpoint'a нажмите кнопку Подключиться.
    • <YDS_STREAM_NAME> - название потока данных Data Streams.
    • <DATABASE_NAME> - название базы данных PostgreSQL.
    • <DATABASE_USER> - имя пользователя для подключения к базе данных PostgreSQL.
    • <DATABASE_PASSWORD> - пароль пользователя для подключения к базе данных PostgreSQL.
  3. Запустите Debezium следующей командой:

    JAVA_OPTS=-Daws.cborEnabled=false ./run.sh
    
  4. Выполните какие-либо изменения в базе данных PostgreSQL, например, вставьте данные в таблицу.

  5. При правильной настройке в консоли Debezium появятся сообщения вида:

    2022-02-11 07:31:12,850 INFO  [io.deb.con.com.BaseSourceTask] (pool-7-thread-1) 1 records sent during previous 00:19:59.999, last recorded offset: {transaction_id=null, lsn_proc=23576408, lsn_commit=23576120, lsn=23576408, txId=580, ts_usec=1644564672582666}
    

Настроить триггер в Cloud Functions

Создайте триггер в Cloud Functions к потоку данных Yandex Data Streams debezium, который был создан выше.

Процедура создания триггера подробно описана в документации Cloud Functions.

В триггер Cloud Functions будут отправлять нотификации обо всех изменениях в базе данных. В коде триггера вы можете обработать эти изменения, реализовав любую необходимую программную обработку.

Была ли статья полезна?

Language / Region
© 2022 ООО «Яндекс.Облако»
В этой статье:
  • Настройка
  • Создание потока данных
  • Настройка реквизитов подключения к Yandex Data Streams
  • Настройка Debezium Server
  • Настроить триггер в Cloud Functions