Yandex Cloud
  • Сервисы
  • Решения
  • Почему Yandex Cloud
  • Сообщество
  • Тарифы
  • Документация
  • Связаться с нами
Подключиться
Language / Region
© 2022 ООО «Яндекс.Облако»
Практические руководства
  • Веб-сервис
    • Все руководства
    • Cтатический сайт в Object Storage
    • Cайт на LAMP- или LEMP-стеке
    • Отказоустойчивый сайт с балансировкой нагрузки через Network Load Balancer
    • Отказоустойчивый сайт с балансировкой нагрузки через Application Load Balancer
    • Сайт на базе Joomla с БД PostgreSQL
    • Сайт на WordPress
    • Сайт на WordPress с БД MySQL
    • Перенос WordPress сайта с хостинга в Yandex Cloud
    • Веб-сайт на базе 1С-Битрикс
    • Интеграция L7-балансировщика с Cloud CDN и Object Storage
    • Сине-зеленое и канареечное развертывание версий сервиса
  • Интернет-магазины
    • Все руководства
    • Интернет-магазин на 1С-Битрикс
    • Интернет-магазин на Opencart
  • Архив данных
    • Все руководства
    • Однонодовый файловый сервер
    • Настройка SFTP-сервера на Centos 7
    • Резервное копирование в Object Storage через Acronis
    • Резервное копирование в Object Storage через CloudBerry Desktop Backup
    • Резервное копирование в Object Storage через Duplicati
    • Резервное копирование в Object Storage через Bacula
    • Резервное копирование в Object Storage через Veritas Backup Exec
    • Оцифровка архива в Yandex Vision
  • Тестовая среда
    • Все руководства
    • Тестирование приложений с помощью GitLab
    • Создание тестовых ВМ через GitLab CI
    • Высокопроизводительные вычисления на прерываемых виртуальных машинах
    • Эмуляция множества IoT-устройств
    • Нагрузочное тестирование gRPC-сервиса
    • HTTPS-тест с постоянной нагрузкой с помощью Phantom
    • HTTPS-тест со ступенчатой нагрузкой с помощью Pandora
  • Управление инфраструктурой
    • Все руководства
    • Начало работы с Terraform
    • Загрузка состояний Terraform в Object Storage
    • Начало работы с Packer
    • Сборка образа ВМ с набором инфраструктурных инструментов с помощью Packer
    • Автоматизация сборки образов ВМ с помощью Jenkins
    • Непрерывное развертывание контейнеризованных приложений с помощью GitLab
    • Создание кластера Linux-серверов «1С:Предприятия» с кластером Managed Service for PostgreSQL
    • Создание кластера Windows-серверов «1С:Предприятия» с базой данных SQL Server
    • Миграция в Yandex Cloud с помощью Hystax Acura
    • Защита от сбоев с помощью Hystax Acura
    • Настройка отказоустойчивой архитектуры в Yandex Cloud
    • Создание SAP-программы в Yandex Cloud
  • Построение Data Platform
    • Все руководства
    • Синхронизация данных из MySQL с помощью Yandex Data Transfer
    • Миграция базы данных из Yandex Managed Service for MySQL в MySQL
    • Настройка управляемой базы данных в кластере ClickHouse для Graphite
    • Обмен данными между Yandex Managed Service for ClickHouse и Yandex Data Proc
    • Импорт базы данных в Yandex Data Proc с использованием Sqoop
    • Использование Confluent Schema Registry с Yandex Managed Service for Apache Kafka®
    • Поставка данных из Yandex Managed Service for PostgreSQL в Yandex Managed Service for Apache Kafka® с помощью Debezium
    • Поставка данных из Yandex Managed Service for PostgreSQL в Yandex Managed Service for Apache Kafka® с помощью Yandex Data Transfer
    • Миграция данных в Yandex Managed Service for Apache Kafka®
    • Перенос коллекций из MongoDB в Yandex Managed Service for MongoDB
    • Миграция базы данных в Yandex Managed Service for SQL Server
    • Перенос данных из PostgreSQL в ClickHouse с помощью Yandex Data Transfer
    • Настройка Kafka Connect для работы с кластером Yandex Managed Service for Apache Kafka®
    • Настройка Yandex Cloud DNS для доступа к кластерам управляемых баз данных из других облачных сетей
    • Миграция в Yandex Managed Service for Elasticsearch с помощью Reindex API
    • Использование скриптов инициализации для настройки GeeseFS в Yandex Data Proc
  • Windows в Yandex Cloud
    • Все руководства
    • Развертывание Active Directory
    • Развертывание Microsoft Exchange
    • Развертывание Remote Desktop Services
    • Развертывание группы доступности Always On
    • Развертывание группы доступности Always On с внутренним сетевым балансировщиком
    • Развертывание Remote Desktop Gateway
  • Сетевая маршрутизация
    • Все руководства
    • Маршрутизация с помощью NAT-инстанса
    • Создание VPN-туннеля
    • Установка виртуального роутера Cisco CSR1000v
    • Установка виртуального роутера Mikrotik CHR
    • Соединение с облачной сетью при помощи OpenVPN
    • Настройка сети для Yandex Data Proc
  • Визуализация и анализ данных
    • Все руководства
    • Визуализация данных из CSV-файла
    • Создание и публикация диаграммы с картой Москвы из CSV-файла
    • Анализ продаж сети магазинов из БД ClickHouse
    • Анализ открытых данных ДТП на дорогах России
    • Анализ продаж и локаций пиццерий на данных из БД ClickHouse и Marketplace
    • Веб-аналитика с подключением к Яндекс Метрике
    • Веб-аналитика с расчетом воронок и когорт на данных Яндекс Метрики
    • Аналитика мобильного приложения на данных AppMetrica
    • Анализ статистики подкастов Яндекс Музыки (для авторов подкастов)
    • Визуализация данных с помощью SQL-чарта
    • Анализ customer journey мобильного приложения на данных AppMetrica
    • Анализ логов Object Storage при помощи DataLens
  • Интернет вещей
    • Руководства по работе с интернетом вещей
    • Мониторинг состояния географически распределенных устройств
    • Мониторинг показаний датчиков и уведомления о событиях
  • Бессерверные технологии
    • Сокращатель ссылок
    • Ввод данных в системы хранения
    • Хранение журналов работы приложения
  1. Построение Data Platform
  2. Миграция данных в Yandex Managed Service for Apache Kafka®

Миграция данных в Yandex Managed Service for Apache Kafka®

Статья создана
Yandex Cloud
  • Перенос данных с использованием сервиса Yandex Managed Service for Apache Kafka® Connector
    • Перед началом работы
    • Создайте коннектор
    • Проверьте наличие данных в топике кластера-приемника
  • Перенос данных с использованием утилиты MirrorMaker
    • Перед началом работы
    • Настройте конфигурацию MirrorMaker
    • Запустите репликацию
    • Проверьте наличие данных в топике кластера-приемника
    • Удалите созданные ресурсы

Перенести топики из кластера-источника Apache Kafka® в кластер-приемник Managed Service for Apache Kafka® можно двумя способами:

  • С помощью встроенного в Yandex Managed Service for Apache Kafka® MirrorMaker-коннектора.

    Этот способ прост в настройке и не требует создания промежуточной виртуальной машины.

  • С помощью утилиты MirrorMaker 2.0.

    Для этого потребуется самостоятельная инсталляция и настройка утилиты на промежуточной виртуальной машине. Используйте этот способ только в том случае, если перенос данных с помощью встроенного MirrorMaker-коннектора по каким-либо причинам невозможен.

Перенос данных с использованием сервиса Yandex Managed Service for Apache Kafka® Connector

  1. Создайте коннектор.
  2. Проверьте наличие данных в топике кластера-приемника.

Перед началом работы

  1. Подготовьте кластер-приемник:

    • Включите управление топиками через Admin API.
    • Создайте учетную запись администратора с именем admin-cloud.
    • Включите настройку Auto create topics enable.
    • Настройте группы безопасности, если это требуется для подключения к кластеру-приемнику.
  2. Создайте в кластере-источнике учетную запись admin-source с правом управления топиками через Admin API.

  3. Настройте в кластере-источнике межсетевой экран (firewall), если это требуется для подключения извне к кластеру.

Создайте коннектор

Создайте для кластера-приемника коннектор с типом MirrorMaker и настройками:

  • Топики — список топиков, которые нужно перенести. Также можно указать регулярное выражение для выбора топиков. Для переноса всех топиков укажите .*.

  • В блоке Кластер-источник укажите параметры для подключения к кластеру-источнику:

    • Псевдоним — префикс для обозначения кластера-источника в настройках коннектора. По умолчанию source. Топики в кластере-приемнике будут созданы с указанным префиксом.
    • Бутстрап-серверы — список FQDN хостов-брокеров кластера-источника с номерами портов для подключения, разделенный запятыми.
    • SASL имя пользователя, SASL пароль — имя и пароль созданной ранее учетной записи admin-source.
    • SASL механизм — механизм шифрования имени и пароля SCRAM-SHA-512.
    • Протокол безопасности — выберите протокол подключения коннектора:
      • SASL_PLAINTEXT — если к кластеру-источнику подключаются без SSL;
      • SASL_SSL – если к кластеру-источнику подключаются с SSL.
  • В блоке Кластер-приемник выберите опцию Использовать этот кластер.

После создания коннектор автоматически активируется и начнется перенос данных.

Проверьте наличие данных в топике кластера-приемника

  1. Подключитесь к топику кластера-приемника с помощью утилиты kafkacat. К имени топика кластера-источника добавьте префикс source: например, топик mytopic будет перенесен на кластер-приемник как source.mytopic.
  2. Убедитесь, что в консоли отображаются сообщения из топика кластера-источника.

Перенос данных с использованием утилиты MirrorMaker

  1. Настройте конфигурацию MirrorMaker.
  2. Запустите репликацию.
  3. Проверьте наличие данных в топике кластера-приемника.

Если созданные ресурсы вам больше не нужны, удалите их.

Перед началом работы

Подготовьте инфраструктуру

Вручную
С помощью Terraform
  1. Создайте кластер-приемник Managed Service for Apache Kafka®:

    • С включенным управлением топиками через Admin API.
    • С учетной записью администратора с именем admin-cloud.
    • С включенной настройкой Auto create topics enable.
  2. Создайте новую ВМ Linux для MirrorMaker в той же сети, к которой подключен кластер-приемник. Для подключения к виртуальной машине с локальной машины пользователя, а не из облачной сети Yandex Cloud, включите публичный доступ при ее создании.

  1. Если у вас еще нет Terraform, установите его.

  2. Скачайте файл с настройками провайдера. Поместите его в отдельную рабочую директорию и укажите значения параметров.

  3. Скачайте в ту же рабочую директорию файл конфигурации kafka-mirror-maker.tf.

    В этом файле описаны:

    • сеть;
    • подсеть;
    • группа безопасности по умолчанию и правила, необходимые для подключения к кластеру и виртуальной машине из интернета;
    • кластер Managed Service for Apache Kafka® с включенным управлением топиками через Admin API, с учетной записью администратора admin-cloud и с включенной настройкой Auto create topics enable;
    • виртуальная машина с публичным доступом из интернета.
  4. Укажите в файле kafka-mirror-maker.tf:

    • Пароль учетной записи администратора Managed Service for Apache Kafka®.
    • Идентификатор публичного образа с Ubuntu без GPU. Например, для Ubuntu 20.04 LTS.
    • Логин и путь к файлу открытого ключа, которые будут использоваться для доступа к виртуальной машине. По умолчанию в используемом образе указанный логин игнорируется, вместо него создается пользователь с логином ubuntu. Используйте его для подключения к виртуальной машине.
  5. Выполните команду terraform init в директории с конфигурационным файлом. Эта команда инициализирует провайдеров, указанных в конфигурационных файлах, и позволяет работать с ресурсами и источниками данных провайдера.

  6. Проверьте корректность файлов конфигурации Terraform с помощью команды:

    terraform validate
    

    Если в файлах конфигурации есть ошибки, Terraform на них укажет.

  7. Создайте необходимую инфраструктуру:

    1. Выполните команду для просмотра планируемых изменений:

      terraform plan
      

      Если конфигурации ресурсов описаны верно, в терминале отобразится список изменяемых ресурсов и их параметров. Это проверочный этап: ресурсы не будут изменены.

    2. Если вас устраивают планируемые изменения, внесите их:

      1. Выполните команду:

        terraform apply
        
      2. Подтвердите изменение ресурсов.

      3. Дождитесь завершения операции.

    В указанном каталоге будут созданы все требуемые ресурсы. Проверить появление ресурсов и их настройки можно в консоли управления.

Выполните дополнительные настройки

  1. Создайте в кластере-источнике учетную запись admin-source с правом управления топиками через Admin API.

  2. Подключитесь к виртуальной машине по SSH.

    1. Установите JDK:

      sudo apt update && sudo apt install --yes default-jdk
      
    2. Скачайте и распакуйте архив Apache Kafka® той же версии, которая установлена на кластере-приемнике. Например, для версии 2.8:

      wget https://archive.apache.org/dist/kafka/2.8.0/kafka_2.12-2.8.0.tgz && \
      tar -xvf kafka_2.12-2.8.0.tgz
      
    3. Установите утилиту kafkacat:

      sudo apt update && sudo apt install --yes kafkacat
      

      Убедитесь, что можете с ее помощью подключиться к кластеру-источнику и кластеру-приемнику через SSL.

  3. Настройте межсетевой экран (firewall) и группы безопасности, если это требуется для подключения MirrorMaker к кластеру-приемнику и кластеру-источнику.

Настройте конфигурацию MirrorMaker

  1. Подключитесь к ВМ с MirrorMaker по SSH.

  2. Скачайте SSL-сертификат для подключения к кластеру Managed Service for Apache Kafka®.

  3. В домашней директории создайте каталог mirror-maker для хранения сертификатов Java Keystore и конфигураций MirrorMaker:

    mkdir --parents /home/<домашняя директория>/mirror-maker
    
  4. Выберите пароль для хранилища сертификатов, создайте хранилище и добавьте в него SSL-сертификат для подключения к кластеру:

    sudo keytool --noprompt -importcert -alias YandexCA \
       -file /usr/local/share/ca-certificates/Yandex/YandexCA.crt \
       -keystore /home/<домашняя директория>/mirror-maker/keystore \
       -storepass <пароль хранилища сертификатов, не короче 6 символов>
    
  5. Создайте в каталоге mirror-maker файл конфигурации MirrorMaker mm2.properties:

    # Kafka clusters
    clusters=cloud, source
    source.bootstrap.servers=<FQDN брокера кластера-источника>:9092
    cloud.bootstrap.servers=<FQDN брокера 1 кластера-приемника>:9091, ..., <FQDN брокера N кластера-приемника>:9091
    
    # Source and target cluster settings
    source->cloud.enabled=true
    cloud->source.enabled=false
    source.cluster.alias=source
    cloud.cluster.alias=cloud
    
    # Internal topics settings
    source.config.storage.replication.factor=<R>
    source.status.storage.replication.factor=<R>
    source.offset.storage.replication.factor=<R>
    source.offsets.topic.replication.factor=<R>
    source.errors.deadletterqueue.topic.replication.factor=<R>
    source.offset-syncs.topic.replication.factor=<R>
    source.heartbeats.topic.replication.factor=<R>
    source.checkpoints.topic.replication.factor=<R>
    source.transaction.state.log.replication.factor=<R>
    cloud.config.storage.replication.factor=<R>
    cloud.status.storage.replication.factor=<R>
    cloud.offset.storage.replication.factor=<R>
    cloud.offsets.topic.replication.factor=<R>
    cloud.errors.deadletterqueue.topic.replication.factor=<R>
    cloud.offset-syncs.topic.replication.factor=<R>
    cloud.heartbeats.topic.replication.factor=<R>
    cloud.checkpoints.topic.replication.factor=<R>
    cloud.transaction.state.log.replication.factor=<R>
    
    # Topics
    topics=.*
    groups=.*
    topics.blacklist=.*[\-\.]internal, .*\replica, __consumer_offsets
    groups.blacklist=console-consumer-.*, connect-.*, __.*
    replication.factor=<M>
    refresh.topics.enable=true
    sync.topic.configs.enabled=true
    refresh.topics.interval.seconds=10
    
    # Tasks
    tasks.max=<T>
    
    # Source cluster authentication parameters. Comment out if no authentication required
    source.client.id=mm2_consumer_test
    source.group.id=mm2_consumer_group
    source.security.protocol=SASL_PLAINTEXT
    source.sasl.mechanism=SCRAM-SHA-512
    source.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="admin-source" password="<пароль>";
    
    # Target cluster authentication parameters
    cloud.client.id=mm2_producer_test
    cloud.group.id=mm2_producer_group
    cloud.ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1
    cloud.ssl.truststore.location=/home/<домашняя директория>/mirror-maker/keystore
    cloud.ssl.truststore.password=<пароль хранилища сертификатов>
    cloud.ssl.protocol=TLS
    cloud.security.protocol=SASL_SSL
    cloud.sasl.mechanism=SCRAM-SHA-512
    cloud.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="admin-cloud" password="<пароль>";
    
    # Enable heartbeats and checkpoints
    source->target.emit.heartbeats.enabled=true
    source->target.emit.checkpoints.enabled=true
    

    Пояснения к конфигурации MirrorMaker:

    • Выполняется односторонняя репликация (параметры source->cloud.enabled = true, cloud->source.enabled = false).
    • В параметре topics указывается список топиков, которые нужно перенести. Также можно указать регулярное выражение для выбора топиков. Для переноса всех топиков укажите .*. В данной конфигурации реплицируются все топики.
    • Названия топиков в кластере-приемнике совпадают с их названиями в кластере-источнике.
    • Параметр <R> — фактор репликации служебных топиков MirrorMaker. Значение этого параметра не должно превышать меньшего между количеством брокеров в кластере-источнике и количеством брокеров в кластере-приемнике.
    • Параметр <M> — фактор репликации по умолчанию, установленный для топиков в кластере-приемнике.
    • Параметр <T> — количество одновременно работающих процессов MirrorMaker. Рекомендуется указывать не менее 2 для равномерного распределения нагрузки репликации. Подробнее см. в документации Apache Kafka®.

    FQDN брокеров Managed Service for Apache Kafka® можно запросить со списком хостов в кластере.

Запустите репликацию

Запустите MirrorMaker на ВМ командой:

<путь установки Kafka>/bin/connect-mirror-maker.sh /home/<домашняя директория>/mirror-maker/mm2.properties

Проверьте наличие данных в топике кластера-приемника

  1. Подключитесь к топику кластера-приемника с помощью утилиты kafkacat. К имени топика кластера-источника добавьте префикс source: например, топик mytopic будет перенесен на кластер-приемник как source.mytopic.
  2. Убедитесь, что в консоли отображаются сообщения из топика кластера-источника.

Подробнее о работе с сервисом MirrorMaker 2.0 см. в документации Apache Kafka®.

Удалите созданные ресурсы

Вручную
С помощью Terraform

Если созданные ресурсы вам больше не нужны, удалите их:

  • Удалите кластер Yandex Managed Service for Apache Kafka®.
  • Удалите виртуальную машину.
  • Если вы зарезервировали публичные статические IP-адреса, освободите и удалите их

Чтобы удалить инфраструктуру, созданную с помощью Terraform:

  1. В терминале перейдите в директорию с планом инфраструктуры.

  2. Удалите конфигурационный файл kafka-mirror-maker.tf.

  3. Проверьте корректность файлов конфигурации Terraform с помощью команды:

    terraform validate
    

    Если в файлах конфигурации есть ошибки, Terraform на них укажет.

  4. Подтвердите изменение ресурсов.

    1. Выполните команду для просмотра планируемых изменений:

      terraform plan
      

      Если конфигурации ресурсов описаны верно, в терминале отобразится список изменяемых ресурсов и их параметров. Это проверочный этап: ресурсы не будут изменены.

    2. Если вас устраивают планируемые изменения, внесите их:

      1. Выполните команду:

        terraform apply
        
      2. Подтвердите изменение ресурсов.

      3. Дождитесь завершения операции.

    Все ресурсы, которые были описаны в конфигурационном файле kafka-mirror-maker.tf, будут удалены.

Была ли статья полезна?

Language / Region
© 2022 ООО «Яндекс.Облако»
В этой статье:
  • Перенос данных с использованием сервиса Yandex Managed Service for Apache Kafka® Connector
  • Перед началом работы
  • Создайте коннектор
  • Проверьте наличие данных в топике кластера-приемника
  • Перенос данных с использованием утилиты MirrorMaker
  • Перед началом работы
  • Настройте конфигурацию MirrorMaker
  • Запустите репликацию
  • Проверьте наличие данных в топике кластера-приемника
  • Удалите созданные ресурсы