Yandex.Cloud
  • Сервисы
  • Почему Yandex.Cloud
  • Сообщество
  • Тарифы
  • Документация
  • Связаться с нами
Подключиться
Yandex Managed Service for Apache Kafka®
  • Начало работы
  • Пошаговые инструкции
    • Все инструкции
    • Информация об имеющихся кластерах
    • Создание кластера
    • Подключение к кластеру
    • Остановка и запуск кластера
    • Изменение настроек кластера
    • Работа с топиками и разделами
    • Управление учетными записями Kafka
    • Удаление кластера
  • Сценарии использования
    • Поставка данных в Managed Service for ClickHouse
  • Концепции
    • Взаимосвязь ресурсов сервиса
    • Топики и разделы
    • Брокеры
    • Производители и потребители
    • Классы хостов
    • Сеть в Managed Service for Apache Kafka®
    • Квоты и лимиты
    • Типы хранилища
  • Управление доступом
  • Правила тарификации
  • Справочник API
    • Аутентификация в API
    • gRPC
      • Обзор
      • ClusterService
      • OperationService
      • ResourcePresetService
      • TopicService
      • UserService
      • OperationService
    • REST
      • Обзор
      • Cluster
        • Обзор
        • create
        • delete
        • get
        • list
        • listHosts
        • listLogs
        • listOperations
        • move
        • start
        • stop
        • streamLogs
        • update
      • Operation
        • Обзор
        • get
      • ResourcePreset
        • Обзор
        • get
        • list
      • Topic
        • Обзор
        • create
        • delete
        • get
        • list
        • update
      • User
        • Обзор
        • create
        • delete
        • get
        • grantPermission
        • list
        • revokePermission
        • update
  • Вопросы и ответы
  1. Пошаговые инструкции
  2. Подключение к кластеру

Подключение к топикам в кластере Apache Kafka®

  • Получение SSL-сертификата
  • Примеры строк подключения

К хостам кластера Managed Service for Apache Kafka® можно подключиться:

  • Через интернет, если вы настроили публичный доступ для кластера при его создании. К такому кластеру можно подключиться только используя SSL-соединение.
  • С виртуальных машин Yandex.Cloud, расположенных в той же облачной сети. Если к кластеру нет публичного доступа, для подключения с таких ВМ SSL-соединение использовать необязательно.

К кластеру Apache Kafka® можно подключиться как с использованием шифрования (SASL_SSL) — порт 9091, так и без него (SASL_PLAINTEXT) — порт 9092.

Чтобы подключиться к кластеру Apache Kafka®:

  1. Создайте учетные записи для клиентов (производителей и потребителей) с доступами в нужные топики.
  2. Подключите клиентов к кластеру:
    • производителей с помощью Kafka Producer API;
    • потребителей с помощью Kafka Consumer API.

Для большинства популярных языков программирования существуют готовые реализации API Apache Kafka®. Примеры кода для подключения к кластеру приведены в разделе Примеры строк подключения.

Получение SSL-сертификата

Чтобы использовать шифрованное SSL-соединение, необходимо получить SSL-сертификат:

sudo mkdir -p /usr/local/share/ca-certificates/Yandex && \
sudo wget "https://storage.yandexcloud.net/cloud-certs/CA.pem" -O /usr/local/share/ca-certificates/Yandex/YandexCA.crt && \
sudo chmod 655 /usr/local/share/ca-certificates/Yandex/YandexCA.crt

Примеры строк подключения

Примеры проверялись в следующем окружении:

  • Виртуальная машина в Облаке c Ubuntu 20.04 LTS.
  • Bash: 5.0.16.
  • Python: 3.8.2; pip3: 20.0.2.
  • OpenJDK: 11.0.8; Maven: 3.6.3.
  • Go: 1.13.8.

Перед подключением к хостам кластера с использованием SSL-соединения, подготовьте сертификат. В примерах ниже предполагается, что сертификат YandexCA.crt расположен в директории /usr/local/share/ca-certificates/Yandex/.

Пример команды с заполненным FQDN хоста вы можете посмотреть в консоли управления, нажав на кнопку Подключиться на странице кластера.

Bash
Bash (SSL)
Python
Python (SSL)
Java
Java (SSL)
Go
Go (SSL)

Для подключения к кластеру Apache Kafka® из командной строки используйте утилиту kafkacat — приложение с открытым исходным кодом, которое может работать как универсальный производитель или потребитель данных. Подробнее читайте в документации.

Перед подключением установите зависимости:

sudo apt update && sudo apt install -y kafkacat

Чтобы отправить сообщение в топик, выполните команду:

echo "test message" | kafkacat -P  \
       -b <FQDN брокера>:9092 \
       -t <имя топика> \
       -k key \
       -X security.protocol=SASL_PLAINTEXT \
       -X sasl.mechanisms=SCRAM-SHA-512 \
       -X sasl.username=<логин производителя> \
       -X sasl.password=<пароль производителя> -Z

Чтобы получить сообщения из топика, выполните команду:

kafkacat -C \
        -b <FQDN брокера>:9092 \
        -t <имя топика> \
        -X security.protocol=SASL_PLAINTEXT \
        -X sasl.mechanisms=SCRAM-SHA-512 \
        -X sasl.username=<логин потребителя> \
        -X sasl.password=<пароль потребителя> -Z -K:

Для подключения к кластеру Apache Kafka® из командной строки используйте утилиту kafkacat — приложение с открытым исходным кодом, которое может работать как универсальный производитель или потребитель данных. Подробнее читайте в документации.

Перед подключением установите зависимости:

sudo apt update && sudo apt install -y kafkacat

Чтобы отправить сообщение в топик, выполните команду:

echo "test message" | kafkacat -P  \
    -b <FQDN брокера>:9091 \
    -t <имя топика> \
    -k key \
    -X security.protocol=SASL_SSL \
    -X sasl.mechanisms=SCRAM-SHA-512 \
    -X sasl.username=<логин производителя> \
    -X sasl.password=<пароль производителя> \
    -X ssl.ca.location=/usr/local/share/ca-certificates/Yandex/YandexCA.crt -Z   

Чтобы получить сообщения из топика, выполните команду:

kafkacat -C  \
    -b <FQDN брокера>:9091 \
    -t <имя топика> \
    -X security.protocol=SASL_SSL \
    -X sasl.mechanisms=SCRAM-SHA-512 \
    -X sasl.username=<логин потребителя> \
    -X sasl.password=<пароль потребителя> \
    -X ssl.ca.location=/usr/local/share/ca-certificates/Yandex/YandexCA.crt -Z -K:   

Перед подключением установите зависимости:

sudo apt update && sudo apt install -y python3 python3-pip libsnappy-dev && \
pip3 install kafka-python lz4 python-snappy crc32c

Пример кода для отправки сообщения в топик:

producer.py

from kafka import KafkaProducer

producer = KafkaProducer(
  bootstrap_servers='<FQDN хоста-брокера>:9092',
  security_protocol="SASL_PLAINTEXT",
  sasl_mechanism="SCRAM-SHA-512",
  sasl_plain_password='<пароль производителя>',
  sasl_plain_username='<имя производителя>')

producer.send('<имя топика>', b'test message', b'key')
producer.flush()
producer.close()

Пример кода для получения сообщений из топика:

consumer.py

from kafka import KafkaConsumer

consumer = KafkaConsumer('<имя топика>',
  bootstrap_servers='<FQDN брокера>:9092',
  security_protocol="SASL_PLAINTEXT",
  sasl_mechanism="SCRAM-SHA-512",
  sasl_plain_password='<пароль потребителя>',
  sasl_plain_username='<имя потребителя>')

print("ready")

for msg in consumer:
  print(msg.key.decode("utf-8") + ":" + msg.value.decode("utf-8"))

Запуск приложений:

python3 producer.py
python3 consumer.py

Перед подключением установите зависимости:

sudo apt update && sudo apt install -y python3 python3-pip libsnappy-dev && \
pip3 install kafka-python lz4 python-snappy crc32c

Пример кода для отправки сообщения в топик:

producer.py

from kafka import KafkaProducer

producer = KafkaProducer(
  bootstrap_servers='<FQDN хоста-брокера>:9091',
  security_protocol="SASL_SSL",
  sasl_mechanism="SCRAM-SHA-512",
  sasl_plain_password='<пароль производителя>',
  sasl_plain_username='<имя производителя>',
  ssl_cafile="/usr/local/share/ca-certificates/Yandex/YandexCA.crt")

producer.send('<имя топика>', b'test message', b'key')
producer.flush()
producer.close()

Пример кода для получения сообщений из топика:

consumer.py

from kafka import KafkaConsumer

consumer = KafkaConsumer('<имя топика>',
  bootstrap_servers='<FQDN брокера>:9091',
  security_protocol="SASL_SSL",
  sasl_mechanism="SCRAM-SHA-512",
  sasl_plain_password='<пароль потребителя>',
  sasl_plain_username='<имя потребителя>',
  ssl_cafile="/usr/local/share/ca-certificates/Yandex/YandexCA.crt")

print("ready")

for msg in consumer:
  print(msg.key.decode("utf-8") + ":" + msg.value.decode("utf-8"))

Запуск приложений:

python3 producer.py
python3 consumer.py

Перед подключением:

  1. Установите зависимости:

    sudo apt update && sudo apt install -y default-jdk maven
    
  2. Создайте директорию для проекта Maven:

    cd ~/ && mkdir project && cd project && mkdir -p consumer/src/java/com/example producer/src/java/com/example && cd ~/project
    
  3. Создайте конфигурационный файл для Maven:

    pom.xml
    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <groupId>com.example</groupId>
        <artifactId>app</artifactId>
        <packaging>jar</packaging>
        <version>0.1.0</version>
        <properties>
            <maven.compiler.source>1.8</maven.compiler.source>
            <maven.compiler.target>1.8</maven.compiler.target>
        </properties>
        <dependencies>
            <dependency>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-simple</artifactId>
                <version>1.7.30</version>
            </dependency>
            <dependency>
                <groupId>com.fasterxml.jackson.core</groupId>
                <artifactId>jackson-databind</artifactId>
                <version>2.11.2</version>
            </dependency>
            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-clients</artifactId>
                <version>2.6.0</version>
            </dependency>
        </dependencies>
        <build>
            <finalName>${project.artifactId}-${project.version}</finalName>
            <sourceDirectory>src</sourceDirectory>
            <resources>
                <resource>
                    <directory>src</directory>
                </resource>
            </resources>
            <plugins>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-assembly-plugin</artifactId>
                    <executions>
                        <execution>
                            <goals>
                                <goal>attached</goal>
                            </goals>
                            <phase>package</phase>
                            <configuration>
                                <descriptorRefs>
                                    <descriptorRef>jar-with-dependencies</descriptorRef>
                                </descriptorRefs>
                                <archive>
                                    <manifest>
                                        <mainClass>com.example.App</mainClass>
                                    </manifest>
                                </archive>
                            </configuration>
                        </execution>
                    </executions>
                </plugin>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-jar-plugin</artifactId>
                    <version>3.1.0</version>
                    <configuration>
                        <archive>
                            <manifest>
                                <mainClass>com.example.App</mainClass>
                            </manifest>
                        </archive>
                    </configuration>
                </plugin>
            </plugins>
        </build>
    </project>
    

    Актуальные версии зависимостей для Maven:

    • kafka-clients.
    • jackson-databind.
    • slf4j-simple.
  4. Скопируйте pom.xml в директории приложения-производителя и приложения-потребителя:

    cp pom.xml producer/pom.xml && cp pom.xml consumer/pom.xml
    

Пример кода для отправки сообщений в топик:

producer/src/java/com/example/App.java

package com.example;

import java.util.*;
import org.apache.kafka.common.*;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.clients.producer.*;

public class App {

  public static void main(String[] args) {

    int MSG_COUNT = 5;

    String HOST = "<FQDN брокера>:9092";
    String TOPIC = "<имя топика>";
    String USER = "<имя производителя>";
    String PASS = "<пароль производителя>";

    String jaasTemplate = "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"%s\" password=\"%s\";";
    String jaasCfg = String.format(jaasTemplate, USER, PASS);
    String KEY = "key";

    String serializer = StringSerializer.class.getName();
    Properties props = new Properties();
    props.put("bootstrap.servers", HOST);
    props.put("acks", "all");
    props.put("key.serializer", serializer);
    props.put("value.serializer", serializer);
    props.put("security.protocol", "SASL_PLAINTEXT");
    props.put("sasl.mechanism", "SCRAM-SHA-512");
    props.put("sasl.jaas.config", jaasCfg);

    Producer<String, String> producer = new KafkaProducer<>(props);

    try {
     for (int i = 1; i <= MSG_COUNT; i++){
       producer.send(new ProducerRecord<String, String>(TOPIC, KEY, "test message")).get();
       System.out.println("Test message " + i);
      }
     producer.flush();
     producer.close();
    } catch (Exception ex) {
        System.out.println(ex);
        producer.close();
    }
  }
}

Пример кода для получения сообщений из топика:

consumer/src/java/com/example/App.java

package com.example;

import java.util.*;
import org.apache.kafka.common.*;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.clients.consumer.*;

public class App {

  public static void main(String[] args) {

    String HOST = "<FQDN брокера>:9092";
    String TOPIC = "<имя топика>";
    String USER = "<имя потребителя>";
    String PASS = "<пароль потребителя>";

    String jaasTemplate = "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"%s\" password=\"%s\";";
    String jaasCfg = String.format(jaasTemplate, USER, PASS);
    String GROUP = "demo";

    String deserializer = StringDeserializer.class.getName();
    Properties props = new Properties();
    props.put("bootstrap.servers", HOST);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.put("group.id", GROUP);
    props.put("key.deserializer", deserializer);
    props.put("value.deserializer", deserializer);
    props.put("security.protocol", "SASL_PLAINTEXT");
    props.put("sasl.mechanism", "SCRAM-SHA-512");
    props.put("sasl.jaas.config", jaasCfg);

    Consumer<String, String> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Arrays.asList(new String[] {TOPIC}));

    while(true) {
      ConsumerRecords<String, String> records = consumer.poll(100);
      for (ConsumerRecord<String, String> record : records) {
        System.out.println(record.key() + ":" + record.value());
      }
    }
  }
}

Сборка приложений:

cd ~/project/producer && mvn clean package && \
cd ~/project/consumer && mvn clean package 

Запуск приложений:

java -jar ~/project/producer/target/app-0.1.0-jar-with-dependencies.jar
java -jar ~/project/consumer/target/app-0.1.0-jar-with-dependencies.jar

Перед подключением:

  1. Установите зависимости:

    sudo apt update && sudo apt install -y default-jdk maven
    
  2. Добавьте SSL-сертификат в хранилище доверенных сертификатов Java (Java Key Store), чтобы драйвер Apache Kafka® мог использовать этот сертификат при защищенном подключении к хостам кластера. При этом задайте пароль в параметре -storepass для дополнительной защиты хранилища:

    cd /etc/security && \
    sudo keytool -importcert -alias YandexCA -file /usr/local/share/ca-certificates/Yandex/YandexCA.crt \
           -keystore ssl -storepass <пароль хранилища сертификатов> \
           --noprompt
    
  3. Создайте директорию для проекта Maven:

    cd ~/ && mkdir project && cd project && mkdir -p consumer/src/java/com/example producer/src/java/com/example && cd ~/project
    
  4. Создайте конфигурационный файл для Maven:

    pom.xml
    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <groupId>com.example</groupId>
        <artifactId>app</artifactId>
        <packaging>jar</packaging>
        <version>0.1.0</version>
        <properties>
            <maven.compiler.source>1.8</maven.compiler.source>
            <maven.compiler.target>1.8</maven.compiler.target>
        </properties>
        <dependencies>
            <dependency>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-simple</artifactId>
                <version>1.7.30</version>
            </dependency>
            <dependency>
                <groupId>com.fasterxml.jackson.core</groupId>
                <artifactId>jackson-databind</artifactId>
                <version>2.11.2</version>
            </dependency>
            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-clients</artifactId>
                <version>2.6.0</version>
            </dependency>
        </dependencies>
        <build>
            <finalName>${project.artifactId}-${project.version}</finalName>
            <sourceDirectory>src</sourceDirectory>
            <resources>
                <resource>
                    <directory>src</directory>
                </resource>
            </resources>
            <plugins>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-assembly-plugin</artifactId>
                    <executions>
                        <execution>
                            <goals>
                                <goal>attached</goal>
                            </goals>
                            <phase>package</phase>
                            <configuration>
                                <descriptorRefs>
                                    <descriptorRef>jar-with-dependencies</descriptorRef>
                                </descriptorRefs>
                                <archive>
                                    <manifest>
                                        <mainClass>com.example.App</mainClass>
                                    </manifest>
                                </archive>
                            </configuration>
                        </execution>
                    </executions>
                </plugin>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-jar-plugin</artifactId>
                    <version>3.1.0</version>
                    <configuration>
                        <archive>
                            <manifest>
                                <mainClass>com.example.App</mainClass>
                            </manifest>
                        </archive>
                    </configuration>
                </plugin>
            </plugins>
        </build>
    </project>
    

    Актуальные версии зависимостей для Maven:

    • kafka-clients;
    • jackson-databind;
    • slf4j-simple.
  5. Скопируйте pom.xml в директории приложения-производителя и приложения-потребителя:

    cp pom.xml producer/pom.xml && cp pom.xml consumer/pom.xml
    

Пример кода для отправки сообщений в топик:

producer/src/java/com/example/App.java

package com.example;

import java.util.*;
import org.apache.kafka.common.*;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.clients.producer.*;

public class App {

  public static void main(String[] args) {

    int MSG_COUNT = 5;

    String HOST = "<FQDN брокера>:9091";
    String TOPIC = "<имя топика>";
    String USER = "<имя производителя>";
    String PASS = "<пароль производителя>";
    String TS_FILE = "/etc/security/ssl";
    String TS_PASS = "<пароль от хранилища сертификатов>";

    String jaasTemplate = "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"%s\" password=\"%s\";";
    String jaasCfg = String.format(jaasTemplate, USER, PASS);
    String KEY = "key";

    String serializer = StringSerializer.class.getName();
    Properties props = new Properties();
    props.put("bootstrap.servers", HOST);
    props.put("acks", "all");
    props.put("key.serializer", serializer);
    props.put("value.serializer", serializer);
    props.put("security.protocol", "SASL_SSL");
    props.put("sasl.mechanism", "SCRAM-SHA-512");
    props.put("sasl.jaas.config", jaasCfg);
    props.put("ssl.truststore.location", TS_FILE);
    props.put("ssl.truststore.password", TS_PASS);

    Producer<String, String> producer = new KafkaProducer<>(props);

    try {
     for (int i = 1; i <= MSG_COUNT; i++){
       producer.send(new ProducerRecord<String, String>(TOPIC, KEY, "test message")).get();
       System.out.println("Test message " + i);
      }
     producer.flush();
     producer.close();
    } catch (Exception ex) {
        System.out.println(ex);
        producer.close();
    }
  }
}

Пример кода для получения сообщений из топика:

consumer/src/java/com/example/App.java

package com.example;

import java.util.*;
import org.apache.kafka.common.*;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.clients.consumer.*;

public class App {

  public static void main(String[] args) {

    String HOST = "<FQDN брокера>:9091";
    String TOPIC = "<имя топика>";
    String USER = "<имя потребителя>";
    String PASS = "<пароль потребителя>";
    String TS_FILE = "/etc/security/ssl"; 
    String TS_PASS = "<пароль от хранилища сертификатов>";

    String jaasTemplate = "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"%s\" password=\"%s\";";
    String jaasCfg = String.format(jaasTemplate, USER, PASS);
    String GROUP = "demo";

    String deserializer = StringDeserializer.class.getName();
    Properties props = new Properties();
    props.put("bootstrap.servers", HOST);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.put("group.id", GROUP);
    props.put("key.deserializer", deserializer);
    props.put("value.deserializer", deserializer);
    props.put("security.protocol", "SASL_SSL");
    props.put("sasl.mechanism", "SCRAM-SHA-512");
    props.put("sasl.jaas.config", jaasCfg);
    props.put("ssl.truststore.location", TS_FILE);
    props.put("ssl.truststore.password", TS_PASS);

    Consumer<String, String> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Arrays.asList(new String[] {TOPIC}));

    while(true) {
      ConsumerRecords<String, String> records = consumer.poll(100);
      for (ConsumerRecord<String, String> record : records) {
        System.out.println(record.key() + ":" + record.value());
      }
    }
  }
}

Сборка приложений:

cd ~/project/producer && mvn clean package && \
cd ~/project/consumer && mvn clean package 

Запуск приложений:

java -jar ~/project/producer/target/app-0.1.0-jar-with-dependencies.jar
java -jar ~/project/consumer/target/app-0.1.0-jar-with-dependencies.jar

Перед подключением:

  1. Установите зависимости:

    sudo apt update && sudo apt install -y golang git && \
    go get github.com/Shopify/sarama && \
    go get github.com/xdg/scram
    
  2. Создайте директорию для проекта:

    cd ~/ && mkdir go-project && cd go-project && mkdir -p consumer producer
    
  3. Создайте файл scram.go, содержащий код для использования SCRAM, общий для приложения-производителя и приложения-потребителя:

    scram.go
    package main
    
    import (
     	"crypto/sha256"
     	"crypto/sha512"
     	"hash"
    
     	"github.com/xdg/scram"
    )
    
    var SHA256 scram.HashGeneratorFcn = func() hash.Hash { return sha256.New() }
    var SHA512 scram.HashGeneratorFcn = func() hash.Hash { return sha512.New() }
    
    type XDGSCRAMClient struct {
       *scram.Client
     	*scram.ClientConversation
     	scram.HashGeneratorFcn
    }
    
    func (x *XDGSCRAMClient) Begin(userName, password, authzID string) (err error) {
     	x.Client, err = x.HashGeneratorFcn.NewClient(userName, password, authzID)
     	if err != nil {
     		return err
     	}
     	x.ClientConversation = x.Client.NewConversation()
     	return nil
    }
    
    func (x *XDGSCRAMClient) Step(challenge string) (response string, err error) {
     	response, err = x.ClientConversation.Step(challenge)
     	return
    }
    
    func (x *XDGSCRAMClient) Done() bool {
     	return x.ClientConversation.Done()
    }
    
  4. Скопируйте scram.go в директории приложения-производителя и приложения-потребителя:

    cp scram.go producer/scram.go && cp scram.go consumer/scram.go
    

Пример кода для отправки сообщения в топик:

producer/main.go

package main

import (
      "fmt"
      "os"
      "strings"

      "github.com/Shopify/sarama"
)

func main() {
      brokers := "<FQDN хоста-брокера>:9092"
      splitBrokers := strings.Split(brokers, ",")
      conf := sarama.NewConfig()
      conf.Producer.RequiredAcks = sarama.WaitForAll
      conf.Producer.Return.Successes = true
      conf.Version = sarama.V0_10_0_0
      conf.ClientID = "sasl_scram_client"
      conf.Net.SASL.Enable = true
      conf.Net.SASL.Handshake = true
      conf.Net.SASL.User = "<имя производителя>"
      conf.Net.SASL.Password = "<пароль производителя>"
      conf.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { return &XDGSCRAMClient{HashGeneratorFcn: SHA512} }
      conf.Net.SASL.Mechanism = sarama.SASLMechanism(sarama.SASLTypeSCRAMSHA512)

      syncProducer, err := sarama.NewSyncProducer(splitBrokers, conf)
      if err != nil {
              fmt.Println("Couldn't create producer: ", err.Error())
              os.Exit(0)
      }
      publish("test message", syncProducer)

}

func publish(message string, producer sarama.SyncProducer) {
  // publish sync
  msg := &sarama.ProducerMessage {
      Topic: "<имя топика>",
      Value: sarama.StringEncoder(message),
  }
  p, o, err := producer.SendMessage(msg)
  if err != nil {
      fmt.Println("Error publish: ", err.Error())
  }

  fmt.Println("Partition: ", p)
  fmt.Println("Offset: ", o)
}

Пример кода для получения сообщений из топика:

consumer/main.go

package main

import (
      "fmt"
      "os"
      "os/signal"
      "strings"

      "github.com/Shopify/sarama"
)

func main() {
      brokers := "<FQDN хоста-брокера>:9092"
      splitBrokers := strings.Split(brokers, ",")
      conf := sarama.NewConfig()
      conf.Producer.RequiredAcks = sarama.WaitForAll
      conf.Version = sarama.V0_10_0_0
      conf.Consumer.Return.Errors = true
      conf.ClientID = "sasl_scram_client"
      conf.Metadata.Full = true
      conf.Net.SASL.Enable = true
      conf.Net.SASL.User =  "<имя потребителя>"
      conf.Net.SASL.Password = "<пароль потребителя>"
      conf.Net.SASL.Handshake = true
      conf.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { return &XDGSCRAMClient{HashGeneratorFcn: SHA512} }
      conf.Net.SASL.Mechanism = sarama.SASLMechanism(sarama.SASLTypeSCRAMSHA512)

      master, err := sarama.NewConsumer(splitBrokers, conf)
      if err != nil {
              fmt.Println("Coulnd't create consumer: ", err.Error())
              os.Exit(1)
      }

      defer func() {
              if err := master.Close(); err != nil {
                      panic(err)
              }
      }()

      topic := "<имя топика>"

      consumer, err := master.ConsumePartition(topic, 0, sarama.OffsetOldest)
      if err != nil {
              panic(err)
      }

      signals := make(chan os.Signal, 1)
      signal.Notify(signals, os.Interrupt)

      // Count how many message processed
      msgCount := 0

      // Get signal for finish
      doneCh := make(chan struct{})
      go func() {
              for {
                      select {
                      case err := <-consumer.Errors():
                              fmt.Println(err)
                      case msg := <-consumer.Messages():
                              msgCount++
                              fmt.Println("Received messages", string(msg.Key), string(msg.Value))
                      case <-signals:
                              fmt.Println("Interrupt is detected")
                              doneCh <- struct{}{}
                      }
              }
      }()

      <-doneCh
      fmt.Println("Processed", msgCount, "messages")
}

Сборка приложений:

cd ~/go-project/producer && go build && \
cd ~/go-project/consumer && go build 

Запуск приложений:

~/go-project/producer/producer
~/go-project/consumer/consumer

Перед подключением:

  1. Установите зависимости:

    sudo apt update && sudo apt install -y golang git && \
    go get github.com/Shopify/sarama && \
    go get github.com/xdg/scram
    
  2. Создайте директорию для проекта:

    cd ~/ && mkdir go-project && cd go-project && mkdir -p consumer producer
    
  3. Создайте файл scram.go, содержащий код для использования SCRAM, общий для приложения-производителя и приложения-потребителя:

    scram.go
    package main
    
    import (
     	"crypto/sha256"
     	"crypto/sha512"
     	"hash"
    
     	"github.com/xdg/scram"
    )
    
    var SHA256 scram.HashGeneratorFcn = func() hash.Hash { return sha256.New() }
    var SHA512 scram.HashGeneratorFcn = func() hash.Hash { return sha512.New() }
    
    type XDGSCRAMClient struct {
       *scram.Client
     	*scram.ClientConversation
     	scram.HashGeneratorFcn
    }
    
    func (x *XDGSCRAMClient) Begin(userName, password, authzID string) (err error) {
     	x.Client, err = x.HashGeneratorFcn.NewClient(userName, password, authzID)
     	if err != nil {
     		return err
     	}
     	x.ClientConversation = x.Client.NewConversation()
     	return nil
    }
    
    func (x *XDGSCRAMClient) Step(challenge string) (response string, err error) {
     	response, err = x.ClientConversation.Step(challenge)
     	return
    }
    
    func (x *XDGSCRAMClient) Done() bool {
     	return x.ClientConversation.Done()
    }
    
  4. Скопируйте scram.go в директории приложения-производителя и приложения-потребителя:

    cp scram.go producer/scram.go && cp scram.go consumer/scram.go
    

Пример кода для отправки сообщения в топик:

producer/main.go

package main

import (
      "fmt"
      "crypto/tls"
      "crypto/x509"
      "io/ioutil"
      "os"
      "strings"

      "github.com/Shopify/sarama"
)

func main() {
      brokers := "<FQDN хоста-брокера>:9091"
      splitBrokers := strings.Split(brokers, ",")
      conf := sarama.NewConfig()
      conf.Producer.RequiredAcks = sarama.WaitForAll
      conf.Producer.Return.Successes = true
      conf.Version = sarama.V0_10_0_0
      conf.ClientID = "sasl_scram_client"
      conf.Net.SASL.Enable = true
      conf.Net.SASL.Handshake = true
      conf.Net.SASL.User = "<имя производителя>"
      conf.Net.SASL.Password = "<пароль производителя>"
      conf.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { return &XDGSCRAMClient{HashGeneratorFcn: SHA512} }
      conf.Net.SASL.Mechanism = sarama.SASLMechanism(sarama.SASLTypeSCRAMSHA512)

      certs := x509.NewCertPool()
      pemPath := "/usr/local/share/ca-certificates/Yandex/YandexCA.crt"
      pemData, err := ioutil.ReadFile(pemPath)
      if err != nil {
              fmt.Println("Couldn't load cert: ", err.Error())
          // handle the error
      }
      certs.AppendCertsFromPEM(pemData)

      conf.Net.TLS.Enable = true
      conf.Net.TLS.Config = &tls.Config{
        InsecureSkipVerify: true,
        RootCAs: certs,
      }

      syncProducer, err := sarama.NewSyncProducer(splitBrokers, conf)
      if err != nil {
              fmt.Println("Couldn't create producer: ", err.Error())
              os.Exit(0)
      }
      publish("test message", syncProducer)

}

func publish(message string, producer sarama.SyncProducer) {
  // publish sync
  msg := &sarama.ProducerMessage {
      Topic: "<имя топика>",
      Value: sarama.StringEncoder(message),
  }
  p, o, err := producer.SendMessage(msg)
  if err != nil {
      fmt.Println("Error publish: ", err.Error())
  }

  fmt.Println("Partition: ", p)
  fmt.Println("Offset: ", o)
}

Пример кода для получения сообщений из топика:

consumer/main.go

package main

import (
      "fmt"
      "crypto/tls"
      "crypto/x509"
      "io/ioutil"
      "os"
      "os/signal"
      "strings"

      "github.com/Shopify/sarama"
)

func main() {
      brokers := "<FQDN хоста-брокера>:9091"
      splitBrokers := strings.Split(brokers, ",")
      conf := sarama.NewConfig()
      conf.Producer.RequiredAcks = sarama.WaitForAll
      conf.Version = sarama.V0_10_0_0
      conf.Consumer.Return.Errors = true
      conf.ClientID = "sasl_scram_client"
      conf.Metadata.Full = true
      conf.Net.SASL.Enable = true
      conf.Net.SASL.User =  "<имя потребителя>"
      conf.Net.SASL.Password = "<пароль потребителя>"
      conf.Net.SASL.Handshake = true
      conf.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { return &XDGSCRAMClient{HashGeneratorFcn: SHA512} }
      conf.Net.SASL.Mechanism = sarama.SASLMechanism(sarama.SASLTypeSCRAMSHA512)

      certs := x509.NewCertPool()
      pemPath := "/usr/local/share/ca-certificates/Yandex/YandexCA.crt"
      pemData, err := ioutil.ReadFile(pemPath)
      if err != nil {
          fmt.Println("Couldn't load cert: ", err.Error())
              // handle the error
      }
      certs.AppendCertsFromPEM(pemData)

      conf.Net.TLS.Enable = true
      conf.Net.TLS.Config = &tls.Config{
                InsecureSkipVerify: true,
                  RootCAs: certs,
      }

      master, err := sarama.NewConsumer(splitBrokers, conf)
      if err != nil {
              fmt.Println("Coulnd't create consumer: ", err.Error())
              os.Exit(1)
      }

      defer func() {
              if err := master.Close(); err != nil {
                      panic(err)
              }
      }()

      topic := "<имя топика>"

      consumer, err := master.ConsumePartition(topic, 0, sarama.OffsetOldest)
      if err != nil {
              panic(err)
      }

      signals := make(chan os.Signal, 1)
      signal.Notify(signals, os.Interrupt)

      // Count how many message processed
      msgCount := 0

      // Get signal for finish
      doneCh := make(chan struct{})
      go func() {
              for {
                      select {
                      case err := <-consumer.Errors():
                              fmt.Println(err)
                      case msg := <-consumer.Messages():
                              msgCount++
                              fmt.Println("Received messages", string(msg.Key), string(msg.Value))
                      case <-signals:
                              fmt.Println("Interrupt is detected")
                              doneCh <- struct{}{}
                      }
              }
      }()

      <-doneCh
      fmt.Println("Processed", msgCount, "messages")
}

Сборка приложений:

cd ~/go-project/producer && go build && \
cd ~/go-project/consumer && go build 

Запуск приложений:

~/go-project/producer/producer
~/go-project/consumer/consumer

Сначала запустите приложение-потребитель, которое будет непрерывно считывать новые сообщения из топика. Затем запустите приложение-производитель, которое отправит в топик Apache Kafka® одно или несколько сообщений test message с ключом key. Приложение-потребитель отобразит сообщения, отправленные в топик.

В этой статье:
  • Получение SSL-сертификата
  • Примеры строк подключения
Language
Вакансии
Политика конфиденциальности
Условия использования
© 2021 ООО «Яндекс.Облако»