Использование Yandex Object Storage в Data Proc
В этом разделе рассмотрены различные способы доступа к объектам из бакетов Object Storage для процессов, запущенных на кластерах Data Proc.
Примечание
Настройте сеть кластера перед настройкой доступа к сервисам Yandex Cloud и интернет-ресурсам.
На скорость чтения и записи файлов в бакеты влияют настройки компонентов:
- Настройки, заданные при создании кластера, влияют на все запущенные в кластере задания.
- Настройки, заданные при создании заданий, переопределяют настройки уровня кластера и могут быть индивидуальными для каждого задания.
DistCp
Для копирования файлов из Object Storage в HDFS используйте утилиту DistCp. Она предназначена для копирования данных как внутри кластера, так и между кластерами и внешними хранилищами.
Для аутентификации в Object Storage можно использовать один из подходов:
- Использовать IAM-токен сервисного аккаунта кластера.
- Использовать CredentialProvider.
- Передавать параметры
access key
иsecret key
статических ключей доступа при создании задачи.
Доступ в S3 с аутентификацией через IAM-токен сервисного аккаунта кластера
-
При создании кластера укажите сервисный аккаунт. Если кластер уже создан, добавьте сервисный аккаунт с помощью кнопки Изменить кластер в консоли управления.
-
У сервисного аккаунта должен быть доступ к нужному бакету. Для этого выдайте сервисному аккаунту права в ACL бакета, либо роль
storage.viewer
илиstorage.editor
.Подробнее про эти роли см. в документации Object Storage.
Например, получите список файлов, находящихся в публичном бакете
yc-mdb-examples
по путиdataproc/example01/set01
.
- Подключитесь к кластеру.
- Выполните команду:
hadoop fs -ls s3a://yc-mdb-examples/dataproc/example01/set01
Результат:
Found 12 items -rw-rw-rw- 1 root root 19327838 2019-09-13 17:17 s3a://yc-mdb-examples/dataproc/example01/set01/On_Time_Reporting_Carrier_On_Time_Performance_(1987_present)_2018_1.parquet -rw-rw-rw- 1 root root 21120204 2019-09-13 17:17 s3a://yc-mdb-examples/dataproc/example01/set01/On_Time_Reporting_Carrier_On_Time_Performance_(1987_present)_2018_10.parquet -rw-rw-rw- 1 root root 20227757 2019-09-13 17:17 s3a://yc-mdb-examples/dataproc/example01/set01/ ...
Копирование с использованием CredentialProvider
Чтобы воспользоваться провайдером для хранения секретов, разместите эти секреты в компонентах, которым нужен доступ к Object Storage. Для этого можно воспользоваться JCEKS (Java Cryptography Extension KeyStore): в примере вы создадите файл с секретами, который затем разместите в HDFS.
-
Укажите
access key
иsecret key
, например:hadoop credential create fs.s3a.access.key -value <access key> -provider localjceks://file/home/jack/yc.jceks hadoop credential create fs.s3a.secret.key -value <secret key> -provider localjceks://file/home/jack/yc.jceks
-
Перенесите файл секрета в локальный HDFS:
hdfs dfs -put /home/jack/yc.jceks /user/root/
-
Скопируйте файл из Object Storage непосредственно в HDFS:
hadoop distcp \ -D fs.s3a.bucket.dataproc-examples.endpoint=storage.yandexcloud.net \ -D hadoop.security.credential.provider.path=jceks://hdfs/user/root/yc.jceks \ -update \ -skipcrccheck \ -numListstatusThreads 10 \ s3a://yc-mdb-examples/dataproc/example01/set01 \ hdfs://<хост HDFS>/<путь>/
<хост HDFS>
— целевой сервер HDFS, который вы используете. Сервер по умолчанию можно получить с помощью команды:hdfs getconf -confKey fs.defaultFS
Пример команды копирования файлов из бакета:
hadoop distcp \
-D fs.s3a.bucket.dataproc-examples.endpoint=storage.yandexcloud.net \
-D hadoop.security.credential.provider.path=jceks://hdfs/user/root/yc.jceks \
-update \
-skipcrccheck \
-numListstatusThreads 10 \
s3a://yc-mdb-examples/dataproc/example01/set01 \
hdfs://rc1b-dataproc-m-d31bs470ivkyrz60.mdb.yandexcloud.net/user/root/datasets/set01/
Копирование файлов с передачей ключей в аргументах
Вы можете не создавать файл секретов, а передавать ключи в аргументах команды:
hadoop distcp \
-D fs.s3a.bucket.dataproc-examples.endpoint=storage.yandexcloud.net \
-D fs.s3a.bucket.dataproc-examples.access.key=<access_key> \
-D fs.s3a.bucket.dataproc-examples.secret.key=<secret_key> \
-update \
-skipcrccheck \
-numListstatusThreads 10 \
s3a://yc-mdb-examples/dataproc/example01/set01 \
hdfs://rc1b-dataproc-m-d31bs470ivkyrz60.mdb.yandexcloud.net/user/root/datasets/set01/
Оптимизация чтения файлов из Object Storage
Способ чтения данных из бакета определяется настройкой fs.s3a.experimental.input.fadvise
. Ее значение зависит от версии используемого образа:
- В образах версий
1.0
—1.4
по умолчанию используется значениеsequential
. Оно подходит для операций последовательного чтения файлов, но для произвольного работает медленно. Если вы чаще используете произвольный доступ к файлам, добавьте в свойства компонентов кластера или укажите в настройках задания значениеrandom
. - В образе версии
2.0
по умолчанию используется значениеnormal
: работа с файлами происходит в последовательном режиме, но если приложение выполняет операции произвольного доступа, режим автоматически переключается наrandom
.
Подробнее об используемых версиях компонентов см. в разделе Среда исполнения.
Оптимизация записи файлов в Object Storage
Способ записи данных в бакет Object Storage определяется настройкой core:fs.s3a.fast.upload
. Ее значение зависит от версии используемого образа:
- В образах версий
1.0
—1.4
по умолчанию используется значениеfalse
для экономии RAM. Укажите для этой настройки значениеtrue
в свойствах компонентов кластера или настройках задания. Это ускорит запись в бакет больших файлов и предотвратит переполнение хранилищ узлов. - В образе версии
2.0
настройкаfs.s3a.fast.upload
включена по умолчанию.
При необходимости укажите значения других настроек, отвечающих за режим записи в Object Storage:
fs.s3a.fast.upload.active.blocks
— максимальное количество блоков в одном потоке вывода.fs.s3a.fast.upload.buffer
— тип буфера, используемого для временного хранения загружаемых данных:disk
— данные сохраняются в каталог, указанный в настройкеfs.s3a.buffer.dir
;array
— используются массивы в куче JVM;bytebuffer
— используется RAM вне кучи JVM.
fs.s3a.multipart.size
— размер кусков (chunk) в байтах, на которые будут разбиты данные при копировании или выгрузке в бакет.
Подробнее см. в разделе Свойства компонентов.
Использование s3fs
s3fs
позволяет монтировать бакеты Object Storage посредством Fuse. Более подробно о ее использовании можно узнать на странице s3fs.
Использование Object Storage в Spark
Реализуйте нужный вариант доступа:
-
С использованием JCEKS:
sc.hadoopConfiguration.set("fs.s3a.endpoint", "storage.yandexcloud.net"); sc.hadoopConfiguration.set("fs.s3a.signing-algorithm", ""); sc.hadoopConfiguration.set("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider"); sc.hadoopConfiguration.set("hadoop.security.credential.provider.path", "jceks://hdfs/<путь к файлу JCEKS>");
-
По ключу доступа и секрету:
sc.hadoopConfiguration.set("fs.s3a.endpoint", "storage.yandexcloud.net"); sc.hadoopConfiguration.set("fs.s3a.signing-algorithm", ""); sc.hadoopConfiguration.set("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider"); sc.hadoopConfiguration.set("fs.s3a.access.key","<ключ доступа>"); sc.hadoopConfiguration.set("fs.s3a.secret.key","<секрет бакета>");
После этого можно читать файл из Object Storage:
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val df = sqlContext.read.parquet("s3a://<имя бакета>/<путь к объекту>")
Выберите способ доступа:
-
Доступ к объектам Object Storage c использование JCEKS:
sc._jsc.hadoopConfiguration().set("fs.s3a.endpoint", "storage.yandexcloud.net") sc._jsc.hadoopConfiguration().set("fs.s3a.signing-algorithm", "") sc._jsc.hadoopConfiguration().set("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider") sc._jsc.hadoopConfiguration().set("hadoop.security.credential.provider.path", "jceks://hdfs/<путь к файлу JCEKS>")
-
Доступ по ключу доступа и секрету бакета:
sc._jsc.hadoopConfiguration().set("fs.s3a.endpoint", "storage.yandexcloud.net") sc._jsc.hadoopConfiguration().set("fs.s3a.signing-algorithm", "") sc._jsc.hadoopConfiguration().set("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider") sc._jsc.hadoopConfiguration().set("fs.s3a.access.key","<ключ доступа>") sc._jsc.hadoopConfiguration().set("fs.s3a.secret.key","<секрет бакета>")
Получив доступ, вы можете читать файл напрямую из Object Storage:
sql = SQLContext(sc)
df = sql.read.parquet("s3a://<имя бакета>/<путь к объекту>")
Подробнее см. на странице Настройки Spark для работы с Object Storage.