Yandex Cloud
  • Сервисы
  • Решения
  • Почему Yandex Cloud
  • Сообщество
  • Тарифы
  • Документация
  • Связаться с нами
Подключиться
Language / Region
© 2022 ООО «Яндекс.Облако»
Yandex Data Proc
  • Практические руководства
    • Все практические руководства
    • Работа с заданиями
      • Обзор
      • Работа с заданиями Hive
      • Работа с заданиями MapReduce
      • Работа с заданиями PySpark
      • Работа с заданиями Spark
      • Запуск заданий Apache Hive
      • Запуск Spark-приложений
      • Запуск заданий с удаленного хоста
    • Настройка сети для Data Proc
    • Использование Yandex Object Storage в Data Proc
    • Обмен данными с Managed Service for ClickHouse
    • Импорт базы данных с использованием Sqoop
    • Использование скриптов инициализации для настройки GeeseFS в Data Proc
  • Пошаговые инструкции
    • Все инструкции
    • Информация об имеющихся кластерах
    • Создание кластера
    • Подключение к кластеру
    • Изменение кластера
    • Управление подкластерами
    • Изменение подкластера
    • Подключение к интерфейсам компонентов
    • Использование Sqoop
    • Управление заданиями
      • Все задания
      • Задания Spark
      • Задания PySpark
      • Задания Hive
      • Задания MapReduce
    • Удаление кластера
    • Работа с логами
    • Мониторинг состояния кластера и хостов
  • Концепции
    • Взаимосвязь ресурсов сервиса
    • Классы хостов
    • Среда исполнения
    • Интерфейсы и порты компонентов Data Proc
    • Задания в Data Proc
    • Автоматическое масштабирование
    • Декомиссия подкластеров и хостов
    • Сеть в Data Proc
    • Техническое обслуживание
    • Квоты и лимиты
    • Хранилище в Data Proc
    • Свойства компонентов
    • Логи в Data Proc
    • Скрипты инициализации
  • Управление доступом
  • Правила тарификации
  • Справочник API
    • Аутентификация в API
    • gRPC (англ.)
      • Overview
      • ClusterService
      • JobService
      • ResourcePresetService
      • SubclusterService
      • OperationService
    • REST (англ.)
      • Overview
      • Cluster
        • Overview
        • create
        • delete
        • get
        • list
        • listHosts
        • listOperations
        • listUILinks
        • start
        • stop
        • update
      • Job
        • Overview
        • cancel
        • create
        • get
        • list
        • listLog
      • ResourcePreset
        • Overview
        • get
        • list
      • Subcluster
        • Overview
        • create
        • delete
        • get
        • list
        • update
  • История изменений
    • Изменения сервиса
    • Образы
  • Вопросы и ответы
  1. Практические руководства
  2. Использование Yandex Object Storage в Data Proc

Использование Yandex Object Storage в Data Proc

Статья создана
Yandex Cloud
  • DistCp
    • Доступ в S3 с аутентификацией через IAM-токен сервисного аккаунта кластера
    • Копирование с использованием CredentialProvider
    • Копирование файлов с передачей ключей в аргументах
  • Оптимизация чтения файлов из Object Storage
  • Оптимизация записи файлов в Object Storage
  • Использование s3fs
  • Использование Object Storage в Spark

В этом разделе рассмотрены различные способы доступа к объектам из бакетов Object Storage для процессов, запущенных на кластерах Data Proc.

Примечание

Настройте сеть кластера перед настройкой доступа к сервисам Yandex Cloud и интернет-ресурсам.

На скорость чтения и записи файлов в бакеты влияют настройки компонентов:

  • Настройки, заданные при создании кластера, влияют на все запущенные в кластере задания.
  • Настройки, заданные при создании заданий, переопределяют настройки уровня кластера и могут быть индивидуальными для каждого задания.

DistCp

Для копирования файлов из Object Storage в HDFS используйте утилиту DistCp. Она предназначена для копирования данных как внутри кластера, так и между кластерами и внешними хранилищами.

Для аутентификации в Object Storage можно использовать один из подходов:

  1. Использовать IAM-токен сервисного аккаунта кластера.
  2. Использовать CredentialProvider.
  3. Передавать параметры access key и secret key статических ключей доступа при создании задачи.

Доступ в S3 с аутентификацией через IAM-токен сервисного аккаунта кластера

  1. При создании кластера укажите сервисный аккаунт. Если кластер уже создан, добавьте сервисный аккаунт с помощью кнопки Изменить кластер в консоли управления.

  2. У сервисного аккаунта должен быть доступ к нужному бакету. Для этого выдайте сервисному аккаунту права в ACL бакета, либо роль storage.viewer или storage.editor.

    Подробнее про эти роли см. в документации Object Storage.

Например, получите список файлов, находящихся в публичном бакете yc-mdb-examples по пути dataproc/example01/set01.

  1. Подключитесь к кластеру.
  2. Выполните команду:
hadoop fs -ls s3a://yc-mdb-examples/dataproc/example01/set01

Результат:

Found 12 items
-rw-rw-rw-   1 root root   19327838 2019-09-13 17:17 s3a://yc-mdb-examples/dataproc/example01/set01/On_Time_Reporting_Carrier_On_Time_Performance_(1987_present)_2018_1.parquet
-rw-rw-rw-   1 root root   21120204 2019-09-13 17:17 s3a://yc-mdb-examples/dataproc/example01/set01/On_Time_Reporting_Carrier_On_Time_Performance_(1987_present)_2018_10.parquet
-rw-rw-rw-   1 root root   20227757 2019-09-13 17:17 s3a://yc-mdb-examples/dataproc/example01/set01/
...

Копирование с использованием CredentialProvider

Чтобы воспользоваться провайдером для хранения секретов, разместите эти секреты в компонентах, которым нужен доступ к Object Storage. Для этого можно воспользоваться JCEKS (Java Cryptography Extension KeyStore): в примере вы создадите файл с секретами, который затем разместите в HDFS.

  1. Укажите access key и secret key, например:

    hadoop credential create fs.s3a.access.key -value <access key> -provider localjceks://file/home/jack/yc.jceks
    hadoop credential create fs.s3a.secret.key -value <secret key> -provider localjceks://file/home/jack/yc.jceks
    
  2. Перенесите файл секрета в локальный HDFS:

    hdfs dfs -put /home/jack/yc.jceks /user/root/
    
  3. Скопируйте файл из Object Storage непосредственно в HDFS:

    hadoop distcp \
           -D fs.s3a.bucket.dataproc-examples.endpoint=storage.yandexcloud.net \
           -D hadoop.security.credential.provider.path=jceks://hdfs/user/root/yc.jceks \
           -update \
           -skipcrccheck \
           -numListstatusThreads 10 \
           s3a://yc-mdb-examples/dataproc/example01/set01 \
           hdfs://<хост HDFS>/<путь>/
    

    <хост HDFS> — целевой сервер HDFS, который вы используете. Сервер по умолчанию можно получить с помощью команды:

    hdfs getconf -confKey fs.defaultFS
    

Пример команды копирования файлов из бакета:

hadoop distcp \
       -D fs.s3a.bucket.dataproc-examples.endpoint=storage.yandexcloud.net \
       -D hadoop.security.credential.provider.path=jceks://hdfs/user/root/yc.jceks \
       -update \
       -skipcrccheck \
       -numListstatusThreads 10 \
       s3a://yc-mdb-examples/dataproc/example01/set01 \
       hdfs://rc1b-dataproc-m-d31bs470ivkyrz60.mdb.yandexcloud.net/user/root/datasets/set01/

Копирование файлов с передачей ключей в аргументах

Вы можете не создавать файл секретов, а передавать ключи в аргументах команды:

hadoop distcp \
       -D fs.s3a.bucket.dataproc-examples.endpoint=storage.yandexcloud.net \
       -D fs.s3a.bucket.dataproc-examples.access.key=<access_key> \
       -D fs.s3a.bucket.dataproc-examples.secret.key=<secret_key> \
       -update \
       -skipcrccheck \
       -numListstatusThreads 10 \
       s3a://yc-mdb-examples/dataproc/example01/set01 \
       hdfs://rc1b-dataproc-m-d31bs470ivkyrz60.mdb.yandexcloud.net/user/root/datasets/set01/

Оптимизация чтения файлов из Object Storage

Способ чтения данных из бакета определяется настройкой fs.s3a.experimental.input.fadvise. Ее значение зависит от версии используемого образа:

  • В образах версий 1.0—1.4 по умолчанию используется значение sequential. Оно подходит для операций последовательного чтения файлов, но для произвольного работает медленно. Если вы чаще используете произвольный доступ к файлам, добавьте в свойства компонентов кластера или укажите в настройках задания значение random.
  • В образе версии 2.0 по умолчанию используется значение normal: работа с файлами происходит в последовательном режиме, но если приложение выполняет операции произвольного доступа, режим автоматически переключается на random.

Подробнее об используемых версиях компонентов см. в разделе Среда исполнения.

Оптимизация записи файлов в Object Storage

Способ записи данных в бакет Object Storage определяется настройкой core:fs.s3a.fast.upload. Ее значение зависит от версии используемого образа:

  • В образах версий 1.0—1.4 по умолчанию используется значение false для экономии RAM. Укажите для этой настройки значение true в свойствах компонентов кластера или настройках задания. Это ускорит запись в бакет больших файлов и предотвратит переполнение хранилищ узлов.
  • В образе версии 2.0 настройка fs.s3a.fast.upload включена по умолчанию.

При необходимости укажите значения других настроек, отвечающих за режим записи в Object Storage:

  • fs.s3a.fast.upload.active.blocks — максимальное количество блоков в одном потоке вывода.
  • fs.s3a.fast.upload.buffer — тип буфера, используемого для временного хранения загружаемых данных:
    • disk — данные сохраняются в каталог, указанный в настройке fs.s3a.buffer.dir;
    • array — используются массивы в куче JVM;
    • bytebuffer — используется RAM вне кучи JVM.
  • fs.s3a.multipart.size — размер кусков (chunk) в байтах, на которые будут разбиты данные при копировании или выгрузке в бакет.

Подробнее см. в разделе Свойства компонентов.

Использование s3fs

s3fs позволяет монтировать бакеты Object Storage посредством Fuse. Более подробно о ее использовании можно узнать на странице s3fs.

Использование Object Storage в Spark

Spark Shell
PySpark Shell

Реализуйте нужный вариант доступа:

  • С использованием JCEKS:

    sc.hadoopConfiguration.set("fs.s3a.endpoint", "storage.yandexcloud.net");
    sc.hadoopConfiguration.set("fs.s3a.signing-algorithm", "");
    sc.hadoopConfiguration.set("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider");
    sc.hadoopConfiguration.set("hadoop.security.credential.provider.path", "jceks://hdfs/<путь к файлу JCEKS>");
    
  • По ключу доступа и секрету:

    sc.hadoopConfiguration.set("fs.s3a.endpoint", "storage.yandexcloud.net");
    sc.hadoopConfiguration.set("fs.s3a.signing-algorithm", "");
    sc.hadoopConfiguration.set("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider");
    sc.hadoopConfiguration.set("fs.s3a.access.key","<ключ доступа>");
    sc.hadoopConfiguration.set("fs.s3a.secret.key","<секрет бакета>");
    

После этого можно читать файл из Object Storage:

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val df = sqlContext.read.parquet("s3a://<имя бакета>/<путь к объекту>")

Выберите способ доступа:

  • Доступ к объектам Object Storage c использование JCEKS:

    sc._jsc.hadoopConfiguration().set("fs.s3a.endpoint", "storage.yandexcloud.net")
    sc._jsc.hadoopConfiguration().set("fs.s3a.signing-algorithm", "")
    sc._jsc.hadoopConfiguration().set("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider")
    sc._jsc.hadoopConfiguration().set("hadoop.security.credential.provider.path", "jceks://hdfs/<путь к файлу JCEKS>")
    
  • Доступ по ключу доступа и секрету бакета:

    sc._jsc.hadoopConfiguration().set("fs.s3a.endpoint", "storage.yandexcloud.net")
    sc._jsc.hadoopConfiguration().set("fs.s3a.signing-algorithm", "")
    sc._jsc.hadoopConfiguration().set("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider")
    sc._jsc.hadoopConfiguration().set("fs.s3a.access.key","<ключ доступа>")
    sc._jsc.hadoopConfiguration().set("fs.s3a.secret.key","<секрет бакета>")
    

Получив доступ, вы можете читать файл напрямую из Object Storage:

sql = SQLContext(sc)
df = sql.read.parquet("s3a://<имя бакета>/<путь к объекту>")

Подробнее см. на странице Настройки Spark для работы с Object Storage.

Была ли статья полезна?

Language / Region
© 2022 ООО «Яндекс.Облако»
В этой статье:
  • DistCp
  • Доступ в S3 с аутентификацией через IAM-токен сервисного аккаунта кластера
  • Копирование с использованием CredentialProvider
  • Копирование файлов с передачей ключей в аргументах
  • Оптимизация чтения файлов из Object Storage
  • Оптимизация записи файлов в Object Storage
  • Использование s3fs
  • Использование Object Storage в Spark