Yandex Cloud
  • Сервисы
  • Решения
  • Почему Yandex Cloud
  • Сообщество
  • Тарифы
  • Документация
  • Связаться с нами
Подключиться
Language / Region
Проект Яндекса
© 2023 ООО «Яндекс.Облако»
Практические руководства
  • Веб-сервис
    • Все руководства
    • Статический сайт в Object Storage
    • Сайт на LAMP- или LEMP-стеке
    • Отказоустойчивый сайт с балансировкой нагрузки с помощью Network Load Balancer
    • Отказоустойчивый сайт с балансировкой нагрузки с помощью Application Load Balancer
    • Сайт на базе Joomla с БД PostgreSQL
    • Создание сайта на WordPress
    • Сайт на WordPress с БД MySQL
    • Перенос WordPress сайта с хостинга в Yandex Cloud
    • Сайт на базе 1С-Битрикс
    • Организация виртуального хостинга
    • Создание балансировщика с защитой от DDoS
    • Публикация обновлений для игр с помощью Cloud CDN
    • Интеграция L7-балансировщика с Cloud CDN и Object Storage
    • Сине-зеленое и канареечное развертывание версий сервиса
    • Терминирование TLS-соединений
  • Интернет-магазины
    • Все руководства
    • Интернет-магазин на 1С-Битрикс
    • Интернет-магазин на OpenCart
  • Архив данных
    • Все руководства
    • Однонодовый файловый сервер
    • Настройка SFTP-сервера на Centos 7
    • Резервное копирование в Object Storage через Acronis
    • Резервное копирование в Object Storage с помощью CloudBerry Desktop Backup
    • Резервное копирование в Object Storage через Duplicati
    • Резервное копирование в Object Storage с помощью Bacula
    • Резервное копирование в Object Storage с помощью Veritas Backup Exec
    • Распознавание архива изображений в Vision
  • Тестовая среда
    • Все руководства
    • Тестирование приложений с помощью GitLab
    • Создание тестовых ВМ через GitLab CI
    • Высокопроизводительные вычисления на прерываемых ВМ
    • Эмуляция множества IoT-устройств
    • Нагрузочное тестирование gRPC-сервиса
    • Развертывание и нагрузочное тестирование gRPC-сервиса с масштабированием
    • HTTPS-тест с постоянной нагрузкой с помощью Phantom
    • HTTPS-тест со ступенчатой нагрузкой с помощью Pandora
    • Нагрузочное тестирование с нескольких агентов
  • Управление инфраструктурой
    • Все руководства
    • Начало работы с Terraform
    • Загрузка состояний Terraform в Object Storage
    • Начало работы с Packer
    • Сборка образа ВМ с набором инфраструктурных инструментов с помощью Packer
    • Автоматизация сборки образов с помощью Jenkins и Packer
    • Непрерывное развертывание контейнеризованных приложений с помощью GitLab
    • Создание кластера Linux-серверов «1С:Предприятия» с кластером Managed Service for PostgreSQL
    • Миграция в Yandex Cloud с помощью Hystax Acura
    • Защита от сбоев с помощью Hystax Acura
    • Настройка синхронизации часов с помощью NTP
    • Работа с группой ВМ с автомасштабированием
    • Масштабирование группы ВМ по расписанию
    • Автомасштабирование группы ВМ для обработки сообщений из очереди Message Queue
    • Обновление группы ВМ под нагрузкой
    • Передача логов с ВМ в Cloud Logging
    • Резервное копирование ВМ с помощью Hystax Acura Backup
    • Настройка отказоустойчивой архитектуры в Yandex Cloud
    • Создание SAP-программы в Yandex Cloud
    • Настройка локального кеширующего DNS-резолвера
    • Миграция DNS-зон из Яндекс 360 в Cloud DNS
    • Интеграция Cloud DNS и корпоративного сервиса DNS
    • Создание веб-хука резолвера ACME для ответов на DNS01-проверки
    • Запись логов балансировщика в PostgreSQL
    • Создание триггера для бюджетов, который вызывает функцию для остановки ВМ
  • Построение Data Platform
    • Все руководства
    • Миграция БД из стороннего кластера Apache Kafka® в Managed Service for Apache Kafka®
    • Поставка данных из Managed Service for MySQL в Managed Service for Apache Kafka® с помощью Data Transfer
    • Поставка данных из Managed Service for PostgreSQL в Managed Service for Apache Kafka® с помощью Data Transfer
    • Поставка данных из Managed Service for Apache Kafka® в Managed Service for ClickHouse с помощью Data Transfer
    • Перенос данных между кластерами Managed Service for Apache Kafka® с помощью Data Transfer
    • Поставка данных из Managed Service for Apache Kafka® в Managed Service for YDB с помощью Data Transfer
    • Поставка данных из Managed Service for MySQL в Managed Service for Apache Kafka® с помощью Debezium
    • Поставка данных из Managed Service for PostgreSQL в Managed Service for Apache Kafka® с помощью Debezium
    • Настройка Kafka Connect для работы с кластером Managed Service for Apache Kafka®
    • Управление схемами данных в Managed Service for Apache Kafka®
    • Использование Managed Schema Registry с Managed Service for Apache Kafka®
    • Использование Confluent Schema Registry с Managed Service for Apache Kafka®
    • Миграция базы данных из MySQL в ClickHouse с помощью Data Transfer
    • Асинхронная репликация данных из PostgreSQL в ClickHouse
    • Обмен данными между Managed Service for ClickHouse и Data Proc
    • Настройка Managed Service for ClickHouse для Graphite
    • Получение данных из Managed Service for Apache Kafka® в Managed Service for ClickHouse
    • Поставка данных из Managed Service for Apache Kafka® в Managed Service for ClickHouse с помощью Data Transfer
    • Получение данных из RabbitMQ в Managed Service for ClickHouse
    • Сохранение потока данных Data Streams в Managed Service for ClickHouse
    • Использование гибридного хранилища в Managed Service for ClickHouse
    • Шардирование таблиц Managed Service for ClickHouse
    • Настройка Cloud DNS для доступа к кластерам управляемых баз данных из других облачных сетей
    • Настройка Cloud DNS для доступа к кластеру Managed Service for ClickHouse из других облачных сетей
    • Обмен данными между Managed Service for ClickHouse и Data Proc
    • Импорт данных из Managed Service for MySQL в Data Proc с помощью Sqoop
    • Импорт данных из Managed Service for PostgreSQL в Data Proc с помощью Sqoop
    • Использование скриптов инициализации для настройки GeeseFS в Data Proc
    • Миграция данных из стороннего кластера Elasticsearch в Managed Service for Elasticsearch с помощью Reindex API
    • Миграция коллекций из стороннего кластера MongoDB в Managed Service for MongoDB
    • Миграция данных в Managed Service for MongoDB
    • Шардирование коллекций MongoDB
    • Анализ производительности и оптимизация MongoDB
    • Миграция БД из стороннего кластера MySQL в кластер Managed Service for MySQL
    • Анализ производительности и оптимизация Managed Service for MySQL
    • Синхронизация данных из стороннего кластера MySQL в Managed Service for MySQL с помощью Data Transfer
    • Миграция БД из Managed Service for MySQL в сторонний кластер MySQL
    • Миграция БД из Managed Service for MySQL в Object Storage с помощью Data Transfer
    • Импорт данных из Managed Service for MySQL в Data Proc с помощью Sqoop
    • Поставка данных из Managed Service for MySQL в Managed Service for Apache Kafka® с помощью Data Transfer
    • Поставка данных из Managed Service for MySQL в Managed Service for Apache Kafka® с помощью Debezium
    • Миграция БД из Managed Service for MySQL в Managed Service for YDB с помощью Data Transfer
    • Создание кластера PostgreSQL для «1С:Предприятия»
    • Анализ производительности и оптимизация Managed Service for PostgreSQL
    • Миграция БД из Managed Service for PostgreSQL
    • Миграция БД из стороннего кластера PostgreSQL в Managed Service for PostgreSQL
    • Асинхронная репликация данных из PostgreSQL в ClickHouse
    • Поставка данных из Managed Service for PostgreSQL в Managed Service for Apache Kafka® с помощью Data Transfer
    • Поставка данных из Managed Service for PostgreSQL в Managed Service for Apache Kafka® с помощью Debezium
    • Импорт данных из Managed Service for PostgreSQL в Data Proc с помощью Sqoop
    • Поставка данных из Managed Service for PostgreSQL в Managed Service for YDB с помощью Data Transfer
    • Миграция БД из Managed Service for PostgreSQL в Object Storage
    • Миграция БД из Greenplum® в ClickHouse
    • Миграция БД из Greenplum® в PostgreSQL
    • Миграция БД из стороннего кластера Redis в Managed Service for Redis
    • Использование кластера Managed Service for Redis в качестве хранилища сессий PHP
  • Продукты Microsoft в Yandex Cloud
    • Все руководства
    • Развертывание Active Directory
    • Развертывание Microsoft Exchange
    • Развертывание Remote Desktop Services
    • Развертывание группы доступности Always On с внутренним сетевым балансировщиком
    • Развертывание Remote Desktop Gateway
  • Сетевая инфраструктура
    • Все руководства
    • Архитектура и защита базового интернет-сервиса
    • Настройки DHCP для работы с корпоративным DNS-сервером
    • Маршрутизация с помощью NAT-инстанса
    • Создание туннеля IPSec VPN
    • Установка виртуального роутера Cisco CSR 1000v
    • Установка виртуального роутера Mikrotik CHR
    • Соединение с облачной сетью при помощи OpenVPN
    • Создание и настройка шлюза UserGate в режиме прокси-сервера
    • Создание и настройка шлюза UserGate в режиме межсетевого экрана
    • Настройка сети для Data Proc
  • Визуализация и анализ данных
    • Все руководства
    • Визуализация данных из файла
    • Создание и публикация диаграммы с картой Москвы из CSV-файла
    • Анализ продаж сети магазинов из БД ClickHouse
    • Анализ открытых данных ДТП на дорогах России
    • Анализ продаж и локаций пиццерий на данных из БД ClickHouse и Cloud Marketplace
    • Веб-аналитика с подключением к Яндекс Метрике
    • Веб-аналитика с расчетом воронок и когорт на данных Яндекс Метрики
    • Аналитика мобильного приложения на данных AppMetrica
    • Анализ статистики подкастов Яндекс Музыки (для авторов подкастов)
    • Визуализация данных с помощью QL-чарта
    • Анализ customer journey мобильного приложения на данных AppMetrica
    • Анализ логов Object Storage при помощи DataLens
  • Интернет вещей
    • Руководства по работе с интернетом вещей
    • Мониторинг состояния географически распределенных устройств
    • Мониторинг показаний датчиков и уведомления о событиях
  • Бессерверные технологии
    • Сокращатель ссылок
    • Ввод данных в системы хранения
    • Хранение журналов работы приложения
    • Развертывание веб-приложения с использованием Java Servlet API
    • Разработка Slack-бота
    • Разработка Telegram-бота
    • Разработка пользовательской интеграции в API Gateway
    • Разработка CRUD API для сервиса фильмов
    • Разработка навыка Алисы и сайта с авторизацией
  1. Построение Data Platform
  2. Поставка данных из Managed Service for PostgreSQL в Managed Service for Apache Kafka® с помощью Data Transfer

Поставка данных из Yandex Managed Service for PostgreSQL в Yandex Managed Service for Apache Kafka® с помощью Yandex Data Transfer

Статья создана
Yandex Cloud
,
улучшена
Dmitry A.
  • Перед началом работы
  • Подготовьте кластер-источник
  • Подготовьте кластер-приемник
  • Подготовьте и активируйте трансфер
  • Проверьте работоспособность трансфера
  • Удалите созданные ресурсы

Вы можете отслеживать изменения данных в кластере-источнике Managed Service for PostgreSQL и отправлять их в кластер-приемник Managed Service for Apache Kafka® с помощью технологии Change Data Capture (CDC).

Чтобы настроить CDC с использованием сервиса Data Transfer:

  1. Подготовьте кластер-источник.
  2. Подготовьте кластер-приемник.
  3. Подготовьте и активируйте трансфер.
  4. Проверьте работоспособность трансфера.

Если созданные ресурсы вам больше не нужны, удалите их.

Перед началом работы

  1. Создайте кластер-источник Managed Service for PostgreSQL любой подходящей конфигурации со следующими настройками:

    • с базой данных db1;
    • с пользователем pg-user;
    • с хостами в публичном доступе.
  2. Создайте кластер-приемник Managed Service for Apache Kafka® любой подходящей конфигурации с хостами в публичном доступе.

  3. Настройте группы безопасности кластеров, чтобы к ним можно было подключаться из интернета:

    • Инструкция для Managed Service for PostgreSQL.
    • Инструкция для Managed Service for Apache Kafka®.
  4. Установите на локальный компьютер утилиту kcat (kafkacat) и клиент командной строки PostgreSQL. Например, в Ubuntu 20.04 выполните команду:

    sudo apt update && sudo apt install kafkacat postgresql-client --yes
    

Подготовьте кластер-источник

  1. Чтобы сервис Data Transfer мог получать от кластера Managed Service for PostgreSQL уведомления об изменениях в данных, в кластере-источнике необходимо создать публикацию (publication). Чтобы пользователь pg-user мог создать публикацию, назначьте ему роль mdb_replication.

  2. Подключитесь к базе данных db1 от имени пользователя pg-user.

  3. Наполните базу тестовыми данными. В качестве примера используется простая таблица, содержащая информацию с некоторых датчиков автомобиля.

    Создайте таблицу:

    CREATE TABLE public.measurements (
        "device_id" text PRIMARY KEY NOT NULL,
        "datetime" timestamp NOT NULL,
        "latitude" real NOT NULL,
        "longitude" real NOT NULL,
        "altitude" real NOT NULL,
        "speed" real NOT NULL,
        "battery_voltage" real,
        "cabin_temperature" real NOT NULL,
        "fuel_level" real
    );
    

    Наполните таблицу данными:

    INSERT INTO public.measurements VALUES
        ('iv9a94th6rztooxh5ur2', '2020-06-05 17:27:00', 55.70329032, 37.65472196,  427.5,    0, 23.5, 17, NULL),
        ('rhibbh3y08qmz3sdbrbu', '2020-06-06 09:49:54', 55.71294467, 37.66542005, 429.13, 55.5, NULL, 18, 32),
        ('iv9a94th678tooxh5ur2', '2020-06-07 15:00:10', 55.70985913, 37.62141918,  417.0, 15.7, 10.3, 17, NULL);
    

Подготовьте кластер-приемник

Настройки различаются в зависимости от используемого способа управления топиками. При этом имена топиков для данных конструируются по тому же принципу, что и в Debezium — <префикс топика>.<имя схемы>.<имя таблицы>. В этом руководстве в качестве примера будет использоваться префикс cdc.

Интерфейсы Yandex Cloud
Admin API

Если управление топиками осуществляется с помощью стандартных интерфейсов Yandex Cloud (Консоль управления, YC CLI, Terraform, API):

  1. Создайте топик с именем cdc.public.measurements.

    Если необходимо отслеживать изменения в нескольких таблицах, создайте для каждой из них отдельный топик.

  2. Создайте пользователя с именем kafka-user и ролями ACCESS_ROLE_CONSUMER и ACCESS_ROLE_PRODUCER, действующими на созданные топики.

Если для управления топиками используется Kafka Admin API:

  1. Создайте пользователя-администратора с именем kafka-user.

  2. Помимо роли ACCESS_ROLE_ADMIN назначьте пользователю-администратору роли ACCESS_ROLE_CONSUMER и ACCESS_ROLE_PRODUCER на топики, имена которых начинаются с префикса cdc.

    Необходимые топики будут созданы автоматически при первом событии изменения в отслеживаемых таблицах кластера-источника. Такое решение может быть удобным для отслеживания изменений во множестве таблиц, однако, требует запас свободного места в хранилище кластера. Подробнее см. в разделе Типы дисков в Managed Service for Apache Kafka®.

Подготовьте и активируйте трансфер

  1. Создайте эндпоинты.

    • Эндпоинт для источника:

      • Тип базы данных — PostgreSQL.
      • Параметры эндпоинта:
        • Настройки подключения — Кластер MDB.
        • Кластер MDB — выберите созданный ранее кластер Managed Service for PostgreSQL.
        • Имя базы данных — db1.
        • Имя пользователя — pg-user.
        • Пароль — укажите пароль пользователя pg-user.
        • Список включенных таблиц — public.measurements.
    • Эндпоинт для приемника:

      • Тип базы данных — Kafka.

      • Параметры эндпоинта:

        • Подключение — Managed Kafka.

        • Managed Kafka:

          • ID кластера Managed Kafka — выберите кластер-приемник.
          • Аутентификация — укажите данные созданного ранее пользователя kafka-user.
        • Настройки кафка-топика — Полное имя топика.

        • Полное имя топика — cdc.public.measurements.

        Если необходимо отслеживать изменения в нескольких таблицах, заполните поля следующим образом:

        • Настройки кафка-топика — Префикс топика.
        • Префикс топика — укажите префикс cdc, использованный при формировании имен топиков.
  2. Создайте трансфер со следующими настройками:

    • Эндпоинты:
      • Источник — созданный ранее эндпоинт для источника.
      • Приемник — созданный ранее эндпоинт для приемника.
    • Тип трансфера — Репликация.
  3. Активируйте трансфер и дождитесь его перехода в статус Реплицируется.

Проверьте работоспособность трансфера

  1. В отдельном терминале запустите утилиту kafkacat в режиме потребителя:

    kafkacat \
        -C \
        -b <FQDN хоста-брокера-1>:9091,...,<FQDN хоста-брокера N>:9091 \
        -t cdc.public.measurements \
        -X security.protocol=SASL_SSL \
        -X sasl.mechanisms=SCRAM-SHA-512 \
        -X sasl.username=kafka-user \
        -X sasl.password=<пароль> \
        -X ssl.ca.location=/usr/local/share/ca-certificates/Yandex/YandexCA.crt \
        -Z \
        -K:
    

    FQDN хостов-брокеров можно получить со списком хостов в кластере Managed Service for Apache Kafka®.

    Будет выведена схема формата данных таблицы public.measurements и данные о добавленных в нее ранее строках.

    Пример фрагмента сообщения
    {
      "payload": {
        "consumer":"dttuhfpp97l30jaka3ql"
      },
      "schema": {
        "fields": [
          {
            "field": "consumer",
            "optional":false,
            "type":"string"
          }
        ],
        "name": "__data_transfer_stub.public.__consumer_keeper.Key",
        "optional":false,
        "type":"struct"
      }
    }:{
      "payload": {
        "after": {
          "consumer":"dttuhfpp97l30jaka3ql",
          "locked_by":"dttuhfpp97l30jaka3ql-1",
          "locked_till":"2022-05-15T09:55:18Z"
        },
      "before": null,
      "op":"u",
      "source": {
        "connector":"postgresql",
        "db":"db1",
        "lsn":85865797008,
        "name":"__data_transfer_stub",
        "schema":"public",
        "snapshot":"false",
        "table":"__consumer_keeper",
        "ts_ms":1652608518883,
        "txId":245165,
        "version":"1.1.2.Final",
        "xmin":null
      },
    ...
    
  2. Подключитесь к кластеру-источнику и добавьте данные в таблицу measurements:

    INSERT INTO public.measurements VALUES ('iv7b74th678tooxh5ur2', '2020-06-08 17:45:00', 53.70987913, 36.62549834, 378.0, 20.5, 5.3, 20, NULL);
    
  3. Убедитесь, что в терминале с запущенной утилитой kafkacat отобразились сведения о добавленной строке.

Удалите созданные ресурсы

Если созданные ресурсы вам больше не нужны, удалите их:

  1. Деактивируйте и удалите трансфер.

  2. Удалите эндпоинты.

  3. Удалите кластеры:

    • Managed Service for Apache Kafka®.
    • Managed Service for PostgreSQL.
  4. Если для доступа к хостам кластеров использовались статические публичные IP-адреса, освободите и удалите их.

Была ли статья полезна?

Language / Region
Проект Яндекса
© 2023 ООО «Яндекс.Облако»
В этой статье:
  • Перед началом работы
  • Подготовьте кластер-источник
  • Подготовьте кластер-приемник
  • Подготовьте и активируйте трансфер
  • Проверьте работоспособность трансфера
  • Удалите созданные ресурсы