Yandex Cloud
  • Сервисы
  • Решения
  • Почему Yandex Cloud
  • Сообщество
  • Тарифы
  • Документация
  • Связаться с нами
Подключиться
Language / Region
Проект Яндекса
© 2023 ООО «Яндекс.Облако»
Практические руководства
  • Веб-сервис
    • Все руководства
    • Статический сайт в Object Storage
    • Сайт на LAMP- или LEMP-стеке
    • Отказоустойчивый сайт с балансировкой нагрузки с помощью Network Load Balancer
    • Отказоустойчивый сайт с балансировкой нагрузки с помощью Application Load Balancer
    • Сайт на базе Joomla с БД PostgreSQL
    • Создание сайта на WordPress
    • Сайт на WordPress с БД MySQL
    • Перенос WordPress сайта с хостинга в Yandex Cloud
    • Сайт на базе 1С-Битрикс
    • Организация виртуального хостинга
    • Создание балансировщика с защитой от DDoS
    • Публикация обновлений для игр с помощью Cloud CDN
    • Интеграция L7-балансировщика с Cloud CDN и Object Storage
    • Сине-зеленое и канареечное развертывание версий сервиса
    • Терминирование TLS-соединений
  • Интернет-магазины
    • Все руководства
    • Интернет-магазин на 1С-Битрикс
    • Интернет-магазин на OpenCart
  • Архив данных
    • Все руководства
    • Однонодовый файловый сервер
    • Настройка SFTP-сервера на Centos 7
    • Резервное копирование в Object Storage через Acronis
    • Резервное копирование в Object Storage с помощью CloudBerry Desktop Backup
    • Резервное копирование в Object Storage через Duplicati
    • Резервное копирование в Object Storage с помощью Bacula
    • Резервное копирование в Object Storage с помощью Veritas Backup Exec
    • Распознавание архива изображений в Vision
  • Тестовая среда
    • Все руководства
    • Тестирование приложений с помощью GitLab
    • Создание тестовых ВМ через GitLab CI
    • Высокопроизводительные вычисления на прерываемых ВМ
    • Эмуляция множества IoT-устройств
    • Нагрузочное тестирование gRPC-сервиса
    • Развертывание и нагрузочное тестирование gRPC-сервиса с масштабированием
    • HTTPS-тест с постоянной нагрузкой с помощью Phantom
    • HTTPS-тест со ступенчатой нагрузкой с помощью Pandora
    • Нагрузочное тестирование с нескольких агентов
  • Управление инфраструктурой
    • Все руководства
    • Начало работы с Terraform
    • Загрузка состояний Terraform в Object Storage
    • Начало работы с Packer
    • Сборка образа ВМ с набором инфраструктурных инструментов с помощью Packer
    • Автоматизация сборки образов с помощью Jenkins и Packer
    • Непрерывное развертывание контейнеризованных приложений с помощью GitLab
    • Создание кластера Linux-серверов «1С:Предприятия» с кластером Managed Service for PostgreSQL
    • Миграция в Yandex Cloud с помощью Hystax Acura
    • Защита от сбоев с помощью Hystax Acura
    • Настройка синхронизации часов с помощью NTP
    • Работа с группой ВМ с автомасштабированием
    • Масштабирование группы ВМ по расписанию
    • Автомасштабирование группы ВМ для обработки сообщений из очереди Message Queue
    • Обновление группы ВМ под нагрузкой
    • Передача логов с ВМ в Cloud Logging
    • Резервное копирование ВМ с помощью Hystax Acura Backup
    • Настройка отказоустойчивой архитектуры в Yandex Cloud
    • Создание SAP-программы в Yandex Cloud
    • Настройка локального кеширующего DNS-резолвера
    • Миграция DNS-зон из Яндекс 360 в Cloud DNS
    • Интеграция Cloud DNS и корпоративного сервиса DNS
    • Создание веб-хука резолвера ACME для ответов на DNS01-проверки
    • Запись логов балансировщика в PostgreSQL
    • Создание триггера для бюджетов, который вызывает функцию для остановки ВМ
  • Построение Data Platform
    • Все руководства
    • Миграция БД из стороннего кластера Apache Kafka® в Managed Service for Apache Kafka®
    • Поставка данных из Managed Service for MySQL в Managed Service for Apache Kafka® с помощью Data Transfer
    • Поставка данных из Managed Service for PostgreSQL в Managed Service for Apache Kafka® с помощью Data Transfer
    • Поставка данных из Managed Service for Apache Kafka® в Managed Service for ClickHouse с помощью Data Transfer
    • Перенос данных между кластерами Managed Service for Apache Kafka® с помощью Data Transfer
    • Поставка данных из Managed Service for Apache Kafka® в Managed Service for YDB с помощью Data Transfer
    • Поставка данных из Managed Service for MySQL в Managed Service for Apache Kafka® с помощью Debezium
    • Поставка данных из Managed Service for PostgreSQL в Managed Service for Apache Kafka® с помощью Debezium
    • Настройка Kafka Connect для работы с кластером Managed Service for Apache Kafka®
    • Управление схемами данных в Managed Service for Apache Kafka®
    • Использование Managed Schema Registry с Managed Service for Apache Kafka®
    • Использование Confluent Schema Registry с Managed Service for Apache Kafka®
    • Миграция базы данных из MySQL в ClickHouse с помощью Data Transfer
    • Асинхронная репликация данных из PostgreSQL в ClickHouse
    • Обмен данными между Managed Service for ClickHouse и Data Proc
    • Настройка Managed Service for ClickHouse для Graphite
    • Получение данных из Managed Service for Apache Kafka® в Managed Service for ClickHouse
    • Поставка данных из Managed Service for Apache Kafka® в Managed Service for ClickHouse с помощью Data Transfer
    • Получение данных из RabbitMQ в Managed Service for ClickHouse
    • Сохранение потока данных Data Streams в Managed Service for ClickHouse
    • Использование гибридного хранилища в Managed Service for ClickHouse
    • Шардирование таблиц Managed Service for ClickHouse
    • Настройка Cloud DNS для доступа к кластерам управляемых баз данных из других облачных сетей
    • Настройка Cloud DNS для доступа к кластеру Managed Service for ClickHouse из других облачных сетей
    • Обмен данными между Managed Service for ClickHouse и Data Proc
    • Импорт данных из Managed Service for MySQL в Data Proc с помощью Sqoop
    • Импорт данных из Managed Service for PostgreSQL в Data Proc с помощью Sqoop
    • Использование скриптов инициализации для настройки GeeseFS в Data Proc
    • Миграция данных из стороннего кластера Elasticsearch в Managed Service for Elasticsearch с помощью Reindex API
    • Миграция коллекций из стороннего кластера MongoDB в Managed Service for MongoDB
    • Миграция данных в Managed Service for MongoDB
    • Шардирование коллекций MongoDB
    • Анализ производительности и оптимизация MongoDB
    • Миграция БД из стороннего кластера MySQL в кластер Managed Service for MySQL
    • Анализ производительности и оптимизация Managed Service for MySQL
    • Синхронизация данных из стороннего кластера MySQL в Managed Service for MySQL с помощью Data Transfer
    • Миграция БД из Managed Service for MySQL в сторонний кластер MySQL
    • Миграция БД из Managed Service for MySQL в Object Storage с помощью Data Transfer
    • Импорт данных из Managed Service for MySQL в Data Proc с помощью Sqoop
    • Поставка данных из Managed Service for MySQL в Managed Service for Apache Kafka® с помощью Data Transfer
    • Поставка данных из Managed Service for MySQL в Managed Service for Apache Kafka® с помощью Debezium
    • Миграция БД из Managed Service for MySQL в Managed Service for YDB с помощью Data Transfer
    • Создание кластера PostgreSQL для «1С:Предприятия»
    • Анализ производительности и оптимизация Managed Service for PostgreSQL
    • Миграция БД из Managed Service for PostgreSQL
    • Миграция БД из стороннего кластера PostgreSQL в Managed Service for PostgreSQL
    • Асинхронная репликация данных из PostgreSQL в ClickHouse
    • Поставка данных из Managed Service for PostgreSQL в Managed Service for Apache Kafka® с помощью Data Transfer
    • Поставка данных из Managed Service for PostgreSQL в Managed Service for Apache Kafka® с помощью Debezium
    • Импорт данных из Managed Service for PostgreSQL в Data Proc с помощью Sqoop
    • Поставка данных из Managed Service for PostgreSQL в Managed Service for YDB с помощью Data Transfer
    • Миграция БД из Managed Service for PostgreSQL в Object Storage
    • Миграция БД из Greenplum® в ClickHouse
    • Миграция БД из Greenplum® в PostgreSQL
    • Миграция БД из стороннего кластера Redis в Managed Service for Redis
    • Использование кластера Managed Service for Redis в качестве хранилища сессий PHP
  • Продукты Microsoft в Yandex Cloud
    • Все руководства
    • Развертывание Active Directory
    • Развертывание Microsoft Exchange
    • Развертывание Remote Desktop Services
    • Развертывание группы доступности Always On с внутренним сетевым балансировщиком
    • Развертывание Remote Desktop Gateway
  • Сетевая инфраструктура
    • Все руководства
    • Архитектура и защита базового интернет-сервиса
    • Настройки DHCP для работы с корпоративным DNS-сервером
    • Маршрутизация с помощью NAT-инстанса
    • Создание туннеля IPSec VPN
    • Установка виртуального роутера Cisco CSR 1000v
    • Установка виртуального роутера Mikrotik CHR
    • Соединение с облачной сетью при помощи OpenVPN
    • Создание и настройка шлюза UserGate в режиме прокси-сервера
    • Создание и настройка шлюза UserGate в режиме межсетевого экрана
    • Настройка сети для Data Proc
  • Визуализация и анализ данных
    • Все руководства
    • Визуализация данных из файла
    • Создание и публикация диаграммы с картой Москвы из CSV-файла
    • Анализ продаж сети магазинов из БД ClickHouse
    • Анализ открытых данных ДТП на дорогах России
    • Анализ продаж и локаций пиццерий на данных из БД ClickHouse и Cloud Marketplace
    • Веб-аналитика с подключением к Яндекс Метрике
    • Веб-аналитика с расчетом воронок и когорт на данных Яндекс Метрики
    • Аналитика мобильного приложения на данных AppMetrica
    • Анализ статистики подкастов Яндекс Музыки (для авторов подкастов)
    • Визуализация данных с помощью QL-чарта
    • Анализ customer journey мобильного приложения на данных AppMetrica
    • Анализ логов Object Storage при помощи DataLens
  • Интернет вещей
    • Руководства по работе с интернетом вещей
    • Мониторинг состояния географически распределенных устройств
    • Мониторинг показаний датчиков и уведомления о событиях
  • Бессерверные технологии
    • Сокращатель ссылок
    • Ввод данных в системы хранения
    • Хранение журналов работы приложения
    • Развертывание веб-приложения с использованием Java Servlet API
    • Разработка Slack-бота
    • Разработка Telegram-бота
    • Разработка пользовательской интеграции в API Gateway
    • Разработка CRUD API для сервиса фильмов
    • Разработка навыка Алисы и сайта с авторизацией
  1. Построение Data Platform
  2. Использование Confluent Schema Registry с Managed Service for Apache Kafka®

Использование Confluent Schema Registry с Yandex Managed Service for Apache Kafka®

Статья создана
Yandex Cloud
,
улучшена
Dmitry A.
  • Перед началом работы
  • Создайте топик для уведомлений об изменении схем форматов данных
  • Установите и настройте Confluent Schema Registry на виртуальной машине
  • Создайте скрипты производителя и потребителя
  • Проверьте правильность работы Confluent Schema Registry
  • Удалите созданные ресурсы

Чтобы использовать Confluent Schema Registry совместно с Managed Service for Apache Kafka®:

  1. Создайте топик для уведомлений об изменении схем форматов данных.
  2. Установите и настройте Confluent Schema Registry на виртуальной машине.
  3. Создайте скрипты производителя и потребителя.
  4. Проверьте правильность работы Confluent Schema Registry.
  5. Удалите созданные ресурсы.

Перед началом работы

  1. Создайте кластер Managed Service for Apache Kafka® любой подходящей конфигурации.

    1. Создайте топик с именем messages для обмена сообщениями между производителем и потребителем.
    2. Создайте учетную запись с именем user и выдайте ей права на топик messages:
      • ACCESS_ROLE_CONSUMER,
      • ACCESS_ROLE_PRODUCER.
  2. В той же сети, что и кластер Managed Service for Apache Kafka®, создайте виртуальную машину с Ubuntu 20.04 LTS из Cloud Marketplace и публичным IP-адресом.

  3. Чтобы разрешить прохождение трафика между кластером Managed Service for Apache Kafka® и виртуальной машиной, настройте группы безопасности.

  4. В группах безопасности виртуальной машины создайте правило для входящего трафика, разрешающее подключение через порт 8081 — через него производитель и потребитель будут обращаться к реестру схем:

    • Диапазон портов: 8081.
    • Протокол: TCP.
    • Назначение: CIDR.
    • CIDR блоки: 0.0.0.0/0 или диапазоны адресов подсетей, в которых работают производитель и потребитель.

Создайте топик для уведомлений об изменении схем форматов данных

  1. Создайте служебный топик с именем _schemas со следующими настройками:

    • Количество разделов — 1.
    • Политика очистки лога — Compact.

    Важно

    Указанные значения настроек Количество разделов и Политика очистки лога необходимы для работы Confluent Schema Registry.

  2. Создайте учетную запись с именем registry и выдайте ей права на топик _schemas:

    • ACCESS_ROLE_CONSUMER,
    • ACCESS_ROLE_PRODUCER.

    От имени этой учетной записи Confluent Schema Registry будет работать со служебным топиком _schemas.

Установите и настройте Confluent Schema Registry на виртуальной машине

  1. Подключитесь к виртуальной машине по SSH.

  2. Подключите репозиторий Confluent Schema Registry:

    wget -qO - https://packages.confluent.io/deb/6.2/archive.key | sudo apt-key add - && \
    sudo add-apt-repository "deb [arch=amd64] https://packages.confluent.io/deb/6.2 stable main"
    
  3. Установите пакеты:

    sudo apt-get update && \
    sudo apt-get install \
         confluent-schema-registry \
         openjdk-11-jre-headless \
         python3-pip --yes
    
  4. Получите SSL-сертификат.

  5. Создайте для сертификата защищенное хранилище:

    sudo keytool \
         -keystore /etc/schema-registry/client.truststore.jks \
         -alias CARoot \
         -import -file /usr/local/share/ca-certificates/Yandex/YandexCA.crt \
         -storepass <пароль защищенного хранилища сертификатов> \
         --noprompt
    
  6. Создайте файл /etc/schema-registry/jaas.conf с настройками для подключения к кластеру:

    KafkaClient {
      org.apache.kafka.common.security.scram.ScramLoginModule required
      username="registry"
      password="<пароль учетной записи registry>";
    };
    
  7. Измените файл /etc/schema-registry/schema-registry.properties, отвечающий за настройки Confluent Schema Registry:

    1. Закомментируйте строку:

      kafkastore.bootstrap.servers=PLAINTEXT://localhost:9092
      
    2. Раскомментируйте строку с параметром listeners. Она отвечает за сетевой адрес и порт, которые будет слушать Confluent Schema Registry. По умолчанию используется порт 8081 на всех сетевых интерфейсах:

      listeners=http://0.0.0.0:8081
      
    3. Добавьте в конец файла строки:

      kafkastore.bootstrap.servers=SASL_SSL://<FQDN 1-го хоста-брокера:9091>,<FQDN 2-го хоста-брокера:9091>,...,<FQDN N-го хоста-брокера:9091>
      kafkastore.ssl.truststore.location=/etc/schema-registry/client.truststore.jks
      kafkastore.ssl.truststore.password=<пароль защищенного хранилища сертификатов>
      kafkastore.sasl.mechanism=SCRAM-SHA-512
      kafkastore.security.protocol=SASL_SSL
      

      Список хостов-брокеров можно получить со списком хостов кластера.

  8. Измените файл с описанием модуля systemd /lib/systemd/system/confluent-schema-registry.service.

    1. Перейдите в блок [Service].

    2. Добавьте параметр Environment с настройками Java:

      ...
      
      [Service]
      Type=simple
      User=cp-schema-registry
      Group=confluent
      Environment="LOG_DIR=/var/log/confluent/schema-registry"
      Environment="_JAVA_OPTIONS='-Djava.security.auth.login.config=/etc/schema-registry/jaas.conf'"
      ...
      
  9. Обновите сведения о модулях systemd:

    sudo systemctl daemon-reload
    
  10. Запустите сервис Confluent Schema Registry:

    sudo systemctl start confluent-schema-registry.service
    
  11. Включите автоматический запуск Confluent Schema Registry после перезагрузки ОС:

    sudo systemctl enable confluent-schema-registry.service
    

Создайте скрипты производителя и потребителя

Приведенные скрипты отправляют и принимают сообщения в топике messages в виде пары ключ:значение. В качестве примера схемы форматов данных описаны в формате Avro.

Примечание

Скрипты на Python приведены в демонстрационных целях. Вы можете подготовить и передать схемы форматов данных и сами данные, создав аналогичный скрипт на другом языке программирования.

  1. Установите необходимые пакеты Python:

    sudo pip3 install avro confluent_kafka
    
  2. Создайте Python-скрипт потребителя.

    Алгоритм работы скрипта:

    1. Подключиться к топику messages и реестру схем Confluent Schema Registry.
    2. В непрерывном цикле считывать поступающие в топик messages сообщения.
    3. При получении сообщения запросить в реестре схем Confluent Schema Registry нужные схемы для разбора сообщения.
    4. Разобрать бинарные данные из сообщения в соответствии со схемами для ключа и значения и вывести результат на экран.

    consumer.py

    #!/usr/bin/python3
    
    from confluent_kafka.avro import AvroConsumer
    from confluent_kafka.avro.serializer import SerializerError
    
    
    c = AvroConsumer(
        {
            "bootstrap.servers": ','.join([
                "<FQDN 1-го хоста-брокера>:9091",
                ...
                "<FQDN N-го хоста-брокера>:9091",
            ]),
            "group.id": "avro-consumer",
            "security.protocol": "SASL_SSL",
            "ssl.ca.location": "/usr/local/share/ca-certificates/Yandex/YandexCA.crt",
            "sasl.mechanism": "SCRAM-SHA-512",
            "sasl.username": "user",
            "sasl.password": "<пароль учетной записи user>",
            "schema.registry.url": "http://<FQDN или IP-адрес сервера Confluent Schema Registry>:8081",
        }
    )
    
    c.subscribe(["messages"])
    
    while True:
        try:
            msg = c.poll(10)
    
        except SerializerError as e:
            print("Message deserialization failed for {}: {}".format(msg, e))
            break
    
        if msg is None:
            continue
    
        if msg.error():
            print("AvroConsumer error: {}".format(msg.error()))
            continue
    
        print(msg.value())
    
    c.close()
    
  3. Создайте Python-скрипт производителя.

    Алгоритм работы скрипта:

    1. Подключиться к реестру схем и передать ему схемы форматов данных для ключа и значения.
    2. Сформировать на основе переданных схем ключ и значение.
    3. Отправить в топик messages сообщение, состоящее из пары ключ:значение. Номера версий схем будут добавлены в сообщение автоматически.

    producer.py

    #!/usr/bin/python3
    
    from confluent_kafka import avro
    from confluent_kafka.avro import AvroProducer
    
    
    value_schema_str = """
    {
        "namespace": "my.test",
        "name": "value",
        "type": "record",
        "fields": [
            {
                "name": "name",
                "type": "string"
            }
        ]
    }
    """
    
    key_schema_str = """
    {
        "namespace": "my.test",
        "name": "key",
        "type": "record",
        "fields": [
            {
                "name": "name",
                "type": "string"
            }
        ]
    }
    """
    
    value_schema = avro.loads(value_schema_str)
    key_schema = avro.loads(key_schema_str)
    value = {"name": "Value"}
    key = {"name": "Key"}
    
    
    def delivery_report(err, msg):
        """Called once for each message produced to indicate delivery result.
        Triggered by poll() or flush()."""
        if err is not None:
            print("Message delivery failed: {}".format(err))
        else:
            print("Message delivered to {} [{}]".format(msg.topic(), msg.partition()))
    
    
    avroProducer = AvroProducer(
        {
            "bootstrap.servers": ','.join([
                "<FQDN 1-го хоста-брокера>:9091",
                ...
                "<FQDN N-го хоста-брокера>:9091",
            ]),
            "security.protocol": "SASL_SSL",
            "ssl.ca.location": "/usr/local/share/ca-certificates/Yandex/YandexCA.crt",
            "sasl.mechanism": "SCRAM-SHA-512",
            "sasl.username": "user",
            "sasl.password": "<пароль учетной записи user>",
            "on_delivery": delivery_report,
            "schema.registry.url": "http://<FQDN или IP-адрес сервера Schema Registry>:8081",
        },
        default_key_schema=key_schema,
        default_value_schema=value_schema,
    )
    
    avroProducer.produce(topic="messages", key=key, value=value)
    avroProducer.flush()
    

Проверьте правильность работы Confluent Schema Registry

  1. Запустите потребитель:

    python3 ./consumer.py
    
  2. В отдельном терминале запустите производитель:

    python3 ./producer.py
    
  3. Убедитесь, что данные, отправленные производителем, получены и правильно интерпретированы потребителем:

    {'name': 'Value'}
    

Удалите созданные ресурсы

Если вам больше не нужны созданные ресурсы, удалите виртуальную машину и кластер Managed Service for Apache Kafka®.

Если вы зарезервировали для созданной виртуальной машины публичный статический IP-адрес, удалите его.

Была ли статья полезна?

Language / Region
Проект Яндекса
© 2023 ООО «Яндекс.Облако»
В этой статье:
  • Перед началом работы
  • Создайте топик для уведомлений об изменении схем форматов данных
  • Установите и настройте Confluent Schema Registry на виртуальной машине
  • Создайте скрипты производителя и потребителя
  • Проверьте правильность работы Confluent Schema Registry
  • Удалите созданные ресурсы