Обмен данными между Yandex Managed Service for ClickHouse и Yandex Data Proc
С помощью Data Proc вы можете:
- Загрузить данные из Managed Service for ClickHouse в Spark DataFrame.
- Выгрузить данные из Spark DataFrame в Managed Service for ClickHouse.
Если созданные ресурсы вам больше не нужны, удалите их.
Перед началом работы
Подготовьте инфраструктуру:
-
Создайте сервисный аккаунт с именем
dataproc-sa
и назначьте ему рольdataproc.agent
. -
В Object Storage создайте бакеты и настройте доступ к ним:
- Создайте бакет для исходных данных и предоставьте сервисному аккаунту кластера разрешение
READ
для этого бакета. - Создайте бакет для результатов обработки и предоставьте сервисному аккаунту кластера разрешение
READ и WRITE
для этого бакета.
- Создайте бакет для исходных данных и предоставьте сервисному аккаунту кластера разрешение
-
Создайте облачную сеть с именем
dataproc-network
. -
В сети
dataproc-network
создайте подсеть в любой зоне доступности. -
Настройте NAT-шлюз для созданной подсети.
-
Если вы используете группы безопасности, создайте группу безопасности с именем
dataproc-sg
в сетиdataproc-network
и добавьте в нее следующие правила:-
По одному правилу для входящего и исходящего служебного трафика:
- Диапазон портов —
0-65535
. - Протокол —
Любой
(Any
). - Источник —
Группа безопасности
. - Группа безопасности —
Текущая
(Self
).
- Диапазон портов —
-
Правило для исходящего HTTPS-трафика:
- Диапазон портов —
443
. - Протокол —
TCP
. - Назначение —
CIDR
. - CIDR блоки —
0.0.0.0/0
.
- Диапазон портов —
-
Правило для исходящего трафика по протоколу TCP на порт 8443 для доступа к ClickHouse:
- Диапазон портов —
8443
. - Протокол —
TCP
. - Назначение —
CIDR
. - CIDR блоки —
0.0.0.0/0
.
- Диапазон портов —
Функциональность находится на стадии Preview.
-
-
Создайте кластер Data Proc с любой подходящей конфигурацией хостов и следующими настройками:
- Компоненты:
- SPARK;
- YARN;
- HDFS.
- Сервисный аккаунт —
dataproc-sa
. - Имя бакета — бакет, который вы создали для выходных данных.
- Сеть —
dataproc-network
. - Группа безопасности —
dataproc-sg
.
- Компоненты:
-
Создайте кластер Managed Service for ClickHouse любой подходящей конфигурации со следующими настройками:
- С публичным доступом к хостам кластера.
- С базой данных
db1
. - С пользователем
user1
.
-
Если вы используете группы безопасности в кластере Managed Service for ClickHouse, убедитесь, что они настроены правильно и допускают подключение к нему.
Функциональность находится на стадии Preview.
-
Если у вас еще нет Terraform, установите его.
-
Скачайте файл с настройками провайдера. Поместите его в отдельную рабочую директорию и укажите значения параметров.
-
Скачайте в ту же рабочую директорию файл конфигурации data-proc-data-exchange-with-mch.tf.
В этом файле описаны:
- сеть;
- подсеть;
- NAT-шлюз и таблица маршрутизации, необходимые для работы Data Proc;
- группы безопасности, необходимые для кластеров Data Proc и Managed Service for ClickHouse;
- сервисный аккаунт, необходимый для работы кластера Data Proc;
- сервисный аккаунт, необходимый для создания бакетов в Object Storage;
- бакеты для входных и выходных данных;
- кластер Data Proc;
- кластер Managed Service for ClickHouse.
-
Укажите в файле
data-proc-data-exchange-with-mch.tf
:folder_id
— идентификатор облачного каталога, такой же как в настройках провайдера.input_bucket
— имя бакета для входных данных.output_bucket
— имя бакета для выходных данных.dp_ssh_key
— абсолютный путь к публичному ключу для кластера Data Proc. Подробнее см. в разделе SSH-подключение к хосту Data Proc.ch_password
— пароль пользователя ClickHouse.
-
Выполните команду
terraform init
в директории с конфигурационными файлами. Эта команда инициализирует провайдер, указанный в конфигурационном файле, и позволяет работать с ресурсами и источниками данных провайдера. -
Проверьте корректность файлов конфигурации Terraform с помощью команды:
terraform validate
Если в файлах конфигурации есть ошибки, Terraform на них укажет.
-
Создайте необходимую инфраструктуру:
-
Выполните команду для просмотра планируемых изменений:
terraform plan
Если конфигурации ресурсов описаны верно, в терминале отобразится список изменяемых ресурсов и их параметров. Это проверочный этап: ресурсы не будут изменены.
-
Если вас устраивают планируемые изменения, внесите их:
-
Выполните команду:
terraform apply
-
Подтвердите изменение ресурсов.
-
Дождитесь завершения операции.
-
В указанном каталоге будут созданы все требуемые ресурсы. Проверить появление ресурсов и их настройки можно в консоли управления.
-
Загрузите данные из Managed Service for ClickHouse
Подготовьте таблицу в кластере Managed Service for ClickHouse
-
Подключитесь к базе данных
db1
кластера Managed Service for ClickHouse от имени пользователяuser1
. -
Наполните базу тестовыми данными. В качестве примера используется простая таблица с именами и возрастом людей.
-
Создайте таблицу:
CREATE TABLE persons ( `name` String, `age` UInt8) ENGINE = MergeTree () ORDER BY `name`;
-
Наполните таблицу данными:
INSERT INTO persons VALUES ('Anna', 19), ('Michael', 65), ('Alvar', 28), ('Lilith', 50), ('Max', 27), ('Jaimey', 34), ('Dmitry', 42), ('Qiang', 19), ('Augustyna', 20), ('Maria', 28);
-
Проверьте результат:
SELECT * FROM persons;
-
Перенесите таблицу из Managed Service for ClickHouse
-
Подготовьте файл скрипта:
-
Создайте локальный файл с именем
ch-to-dataproc.py
и скопируйте в него следующий скрипт:ch-to-dataproc.pyfrom pyspark.sql import SparkSession # Создание Spark-сессии spark = SparkSession.builder.appName("ClickhouseDataproc").getOrCreate() # Указание порта и параметров кластера ClickHouse jdbcPort = 8443 jdbcHostname = "c-<идентификатор кластера Managed Service for ClickHouse>.rw.mdb.yandexcloud.net" jdbcDatabase = "db1" jdbcUrl = f"jdbc:clickhouse://{jdbcHostname}:{jdbcPort}/{jdbcDatabase}?ssl=true" # Перенос таблицы persons из ClickHouse в DataFrame df = spark.read.format("jdbc") \ .option("url", jdbcUrl) \ .option("user","user1") \ .option("password","<пароль пользователя user1>") \ .option("dbtable","persons") \ .load() # Перенос DataFrame в бакет для проверки df.repartition(1).write.mode("overwrite") \ .csv(path='s3a://<имя выходного бакета>/csv', header=True, sep=',')
-
Укажите в скрипте:
- Идентификатор кластера Managed Service for ClickHouse.
- Пароль пользователя
user1
. - Имя выходного бакета.
-
Создайте в бакете для входных данных папку
scripts
и загрузите в нее файлch-to-dataproc.py
.
-
-
Создайте задание PySpark, указав в поле Main python файл путь к файлу скрипта:
s3a://<имя входного бакета>/scripts/ch-to-dataproc.py
. -
Дождитесь завершения задания и проверьте, что в папке
csv
выходного бакета появилась исходная таблица.
Выгрузите данные в Managed Service for ClickHouse
-
Подготовьте файл скрипта:
-
Создайте локальный файл с именем
dataproc-to-ch.py
и скопируйте в него следующий скрипт:dataproc-to-ch.pyfrom pyspark.sql import SparkSession from pyspark.sql.types import * # Создание Spark-сессии spark = SparkSession.builder.appName("DataprocClickhouse").getOrCreate() # Создание схемы данных schema = StructType([StructField('name', StringType(), True), StructField('age', IntegerType(), True)]) # Создание DataFrame df = spark.createDataFrame([('Alim', 19), ('Fred' ,65), ('Guanmin' , 28), ('Till', 60), ('Almagul', 27), ('Mary', 34), ('Dmitry', 42)], schema) # Указание порта и параметров кластера ClickHouse jdbcPort = 8443 jdbcHostname = "c-<идентификатор кластера Managed Service for ClickHouse>.rw.mdb.yandexcloud.net" jdbcDatabase = "db1" jdbcUrl = f"jdbc:clickhouse://{jdbcHostname}:{jdbcPort}/{jdbcDatabase}?ssl=true" # Перенос DataFrame в ClickHouse df.write.format("jdbc") \ .mode("error") \ .option("url", jdbcUrl) \ .option("dbtable", "people") \ .option("createTableOptions", "ENGINE = MergeTree() ORDER BY age") \ .option("user","user1") \ .option("password","<пароль к базе данных Managed Service for ClickHouse>") \ .save()
-
Укажите в скрипте:
- Идентификатор кластера Managed Service for ClickHouse.
- Пароль пользователя
user1
.
-
Создайте в бакете для входных данных папку
scripts
и загрузите в нее файлdataproc-to-ch.py
.
-
-
Создайте задание PySpark, указав в поле Main python файл путь к файлу скрипта:
s3a://<имя входного бакета>/scripts/dataproc-to-ch.py
. -
Дождитесь завершения задания и проверьте, что данные перенеслись в Managed Service for ClickHouse:
-
Подключитесь к базе данных
db1
кластера Managed Service for ClickHouse от имени пользователяuser1
. -
Выполните запрос:
SELECT * FROM people;
Если выгрузка прошла успешно, ответ на запрос будет содержать таблицу с данными.
-
Удалите созданные ресурсы
Некоторые ресурсы платные. Чтобы за них не списывалась плата, удалите ресурсы, которые вы больше не будете использовать:
-
Удалите объекты из бакетов.
-
В терминале перейдите в директорию с планом инфраструктуры.
-
Удалите конфигурационный файл
data-proc-data-exchange-with-mch.tf
. -
Проверьте корректность файлов конфигурации Terraform с помощью команды:
terraform validate
Если в файлах конфигурации есть ошибки, Terraform на них укажет.
-
Подтвердите изменение ресурсов.
-
Выполните команду для просмотра планируемых изменений:
terraform plan
Если конфигурации ресурсов описаны верно, в терминале отобразится список изменяемых ресурсов и их параметров. Это проверочный этап: ресурсы не будут изменены.
-
Если вас устраивают планируемые изменения, внесите их:
-
Выполните команду:
terraform apply
-
Подтвердите изменение ресурсов.
-
Дождитесь завершения операции.
-
Все ресурсы, которые были описаны в конфигурационном файле
data-proc-data-exchange-with-mch.tf
, будут удалены. -