Yandex.Cloud
  • Сервисы
  • Почему Yandex.Cloud
  • Сообщество
  • Решения
  • Тарифы
  • Документация
  • Связаться с нами
Подключиться
Yandex Data Proc
  • Сценарии использования
    • Настройка сети для кластеров Data Proc
    • Использование Apache Hive
    • Запуск Spark-приложений
    • Запуск приложений с удаленного хоста
    • Копирование файлов из Yandex Object Storage
  • Пошаговые инструкции
    • Все инструкции
    • Создание кластера
    • Подключение к кластеру
    • Изменение подкластера
    • Управление подкластерами
    • Удаление кластера
  • Концепции
    • Обзор Data Proc
    • Классы хостов
    • Версии Hadoop и компонентов
    • Интерфейсы и порты компонентов
    • Веб-интерфейсы компонентов
    • Автоматическое масштабирование
    • Декомиссия подкластеров и хостов
    • Сеть в Data Proc
    • Квоты и лимиты
  • Управление доступом
  • Правила тарификации
  • Справочник API
    • Аутентификация в API
    • gRPC
      • Обзор
      • ClusterService
      • JobService
      • ResourcePresetService
      • SubclusterService
      • OperationService
    • REST
      • Обзор
      • Cluster
        • Обзор
        • create
        • delete
        • get
        • list
        • listHosts
        • listOperations
        • start
        • stop
        • update
      • Job
        • Обзор
        • create
        • get
        • list
      • ResourcePreset
        • Обзор
        • get
        • list
      • Subcluster
        • Обзор
        • create
        • delete
        • get
        • list
        • update
  • Вопросы и ответы
  1. Сценарии использования
  2. Запуск Spark-приложений

Запуск и управление приложениями для Spark и PySpark

  • Подготовка данных
  • Использование Spark Shell
  • Использование Spark Submit
  • Завершение работы приложения
  • Запуск заданий (jobs) с помощью CLI Yandex.Cloud
    • Запуск Spark Job
    • Запустите задачу в кластере Data Proc
    • Запуск PySpark Job

Чтобы запускать Spark-приложения в кластере Data Proc, подготовьте данные для обработки, а затем выберите нужный вариант запуска:

  • Spark Shell (командная оболочка для языков программирования Scala и Python). Подробнее о ней читайте в документации Spark.
  • Скрипт spark-submit. Более подробно этот способ описан в документации Spark.
  • Команды CLI Yandex.Cloud.

Подготовка данных

Следуя этой инструкции вы рассчитаете статистику по воздушному трафику США за 2018-й год по данным с сайта transtats.bts.gov. Набор данных подготовлен в формате Parquet в публичном бакете Yandex Object Storage с именем yc-mdb-examples.

Для работы с Object Storage рекомендуется настроить S3cmd.

Список файлов можно получить с помощью команды:

$ s3cmd ls s3://yc-mdb-examples/dataproc/example01/set01/

2019-09-13 17:17  19327838   s3://yc-mdb-examples/dataproc/example01/set01/On_Time_Reporting_Carrier_On_Time_Performance_(1987_present)_2018_1.parquet
2019-09-13 17:17  21120204   s3://yc-mdb-examples/dataproc/example01/set01/On_Time_Reporting_Carrier_On_Time_Performance_(1987_present)_2018_10.parquet
...

Примечание

Перед настройкой доступа к сервисам Yandex.Cloud и интернет-ресурсам убедитесь, что сеть кластера настроена правильно.

Использование Spark Shell

  1. Запустите Spark Shell на мастер-хосте:

    /usr/bin/pyspark
    

    Количество ядер и процессов выполнения задач (executor) ограничено только конфигурацией вашего кластера Data Proc.

  2. Построчно введите следующий код:

    sql = SQLContext(sc)
    df = sql.read.parquet("s3a://yc-mdb-examples/dataproc/example01/set01")
    

    Последняя строка — чтение данных из публичного бакета с набором данных для примера. После выполнения этой строки в текущей сессии будет доступен организованный набор данных (DataFrame) df с прочитанными данными.

  3. Чтобы увидеть схему полученного DataFrame, выполните команду:

    df.printSchema()
    

    В терминале будет выведен список столбцов с их типами.

  4. Рассчитайте статистику перелетов по месяцам и найдите первую десятку городов по количеству вылетов:

    • Количество перелетов по месяцам:

      df.groupBy("Month").count().orderBy("Month").show()
      
    • Первая десятка городов по количеству вылетов:

      df.groupBy("OriginCityName").count().orderBy("count", ascending=False).show(10)
      

Использование Spark Submit

Spark Submit позволяет запускать заранее написанные приложения через скрипт spark-submit. В качестве примера рассмотрим приложение для расчета количества перелетов по месяцам.

PySpark Submit
Spark Submit
  1. На мастер-хосте создайте файл month_stat.py со следующим кодом:

     import sys
     
     from pyspark import SparkContext, SparkConf
     from pyspark.sql import SQLContext
     
     def main():
             conf = SparkConf().setAppName("Month Stat - Python")
             conf.set("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider")
             sc = SparkContext(conf=conf)
     
             sql = SQLContext(sc)
             df = sql.read.parquet("s3a://yc-mdb-examples/dataproc/example01/set01")
             defaultFS = sc._jsc.hadoopConfiguration().get("fs.defaultFS")
             month_stat = df.groupBy("Month").count()
             month_stat.repartition(1).write.format("csv").save(defaultFS+"/tmp/month_stat")
     
     if __name__ == "__main__":
             main()
    
  2. Запустите приложение:

    $ /usr/bin/spark-submit month_stat.py
    
  3. Результат работы приложения будет выгружен в HDFS. Список получившихся файлов можно вывести командой:

    $ hdfs dfs -ls /tmp/month_stat
    

В примере рассматривается сборка и запуск приложения на языке программирования Scala. Для сборки приложений используется sbt, стандартная утилита сборки Scala.

Чтобы создать и запустить Spark-приложение:

  1. Выполните команду scala -version на хосте в подкластере MASTERNODE, чтобы узнать необходимую версию Scala.

    Следите за тем, чтобы версия Scala соответствовала версиям библиотек, которые развернуты в кластере Data Proc, и библиотек, которые используются в приложении. Набор библиотек по умолчанию можно найти в каталоге /usr/lib/spark/jars на хосте-мастере Data Proc.

  2. Создайте папку, например spark-app.

  3. В созданную папку добавьте файл с путем ./src/main/scala/app.scala.

  4. Скопируйте следующий код в файл app.scala:

    package com.yandex.cloud.dataproc.scala
    
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.SQLContext
    
      object Main {
        def main(args: Array[String]) {
          val conf = new SparkConf().setAppName("Month Stat - Scala App")
          val sc = new SparkContext(conf)
          sc.hadoopConfiguration.set("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider")
          val sqlContext = new org.apache.spark.sql.SQLContext(sc)
          val df = sqlContext.read.parquet("s3a://yc-mdb-examples/dataproc/example01/set01")
          val month_stat = df.groupBy("Month").count()
          val defaultFS = sc.hadoopConfiguration.get("fs.defaultFS")
          month_stat.repartition(1).write.format("csv").save(defaultFS+"/tmp/month_stat")
    
          sc.stop()
        }
      }
    
  5. В папке spark-app создайте файл build.sbt со следующей конфигурацией:

    scalaVersion  := "2.11.6"
    
    libraryDependencies ++= Seq(
        "org.apache.spark" %% "spark-core" % "2.2.3" % "provided",
        "org.apache.spark" %% "spark-sql" % "2.2.3" % "provided"
    )
    

    Примечание

    Версия Scala и библиотек может измениться с обновлениями компонентов Data Proc.

  6. Скомпилируйте и соберите jar-файл:

    $ sbt compile
    $ sbt package
    
  7. Запустите получившееся приложение:

    /usr/bin/spark-submit --class com.yandex.cloud.dataproc.scala.Main ./target/scala-2.11/scala-app_2.11-0.1-SNAPSHOT.jar
    
  8. Результат работы приложения будет выгружен в HDFS. Список получившихся файлов можно вывести командой:

    hdfs dfs -ls /tmp/month_stat
    

Завершение работы приложения

По умолчанию ресурсы запускаемого приложения управляются компонентом YARN. Если приложение необходимо завершить или убрать из очереди, используйте утилиту yarn:

  1. Выведите список приложений:

    yarn application -list
    
  2. Завершите ненужное приложение:

    yarn application -kill <application_id>
    

Более подробно с командами YARN можно ознакомиться на странице YARN Commands.

Запуск заданий (jobs) с помощью CLI Yandex.Cloud

Если у вас еще нет интерфейса командной строки Yandex.Cloud, установите и инициализируйте его.

Запуск заданий с помощью CLI Yandex.Cloud происходит посредством агента Data Proc, установленного на мастер-хосте кластера. Параметры заданий передаются агенту через Data Proc API.

Исполняемый файл и его зависимости должны находиться в хранилище, к которому есть доступ у сервисного аккаунта кластера Data Proc. У самого запускаемого приложения должен быть доступ к хранилищам, в которых хранятся исходный набор данных и результаты запуска.

Результат расчета приложение сохраняет либо с помощью компонента HDFS в кластере Data Proc, либо в указанный вами бакет Object Storage.

Служебная и отладочная информация сохраняется в бакете Object Storage, который был указан при создании кластера Data Proc. Для каждого задания агент Data Proc создает отдельную папку с путем вида dataproc/clusters/<ID кластера>/jobs/{job_id}. Перед первым запуском следует назначить права WRITE на бакет для сервисного аккаунта, под которым будут запускаться задания.

Ниже приведены два варианта приложения — для Scala и Python.

Запуск Spark Job

Основные шаги:

  1. Соберите Scala-приложение в единый JAR-файл с помощью SBT.
  2. Загрузите JAR-файл в бакет Object Storage, к которому есть доступ у сервисного аккаунта кластера.
  3. Запустите задание в кластере Data Proc.

Соберите Scala-приложение

Чтобы упростить управление зависимостями, соберите приложение в один JAR-файл (fat JAR) с помощью плагина sbt-assembly.

Необходимую версию Scala можно узнать исполнив команду scala -version на мастер-хосте. Для примера используется 2.11.12 из образа Data Proc версии 1.1. Для приложений Spark рекомендуется строго соблюдать версию Scala и версии библиотек, которые развернуты в кластере Data Proc. Набор библиотек по умолчанию можно посмотреть в каталоге /usr/lib/spark/jars.

Структура приложения:

spark-app
|-project
|  |-plugins.sbt
|-src
|  |-main
|    |-scala
|      |-app.scala
|-build.sbt

Чтобы собрать приложение:

  1. Создайте папку spark-app, в ней — папки project и src/main/scala.

  2. Создайте файл project/plugins.sbt, который описывает подключение плагина sbt-assembly для сборки единого JAR-файла:

    addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.6")
    
  3. Создайте файл build.sbt с описанием версии Scala, зависимостей и стратегии их слияния в одном JAR-файле:

    scalaVersion  := "2.11.12"
    
    libraryDependencies ++= Seq(
        "org.apache.spark" %% "spark-core" % "2.4.4",
        "org.apache.spark" %% "spark-sql" % "2.4.4",
    
    )
    
    
    assemblyMergeStrategy in assembly := {
      case PathList("org","aopalliance", xs @ _*) => MergeStrategy.last
      case PathList("javax", "inject", xs @ _*) => MergeStrategy.last
      case PathList("javax", "servlet", xs @ _*) => MergeStrategy.last
      case PathList("javax", "activation", xs @ _*) => MergeStrategy.last
      case PathList("org", "apache", xs @ _*) => MergeStrategy.last
      case PathList("com", "google", xs @ _*) => MergeStrategy.last
      case PathList("com", "esotericsoftware", xs @ _*) => MergeStrategy.last
      case PathList("com", "codahale", xs @ _*) => MergeStrategy.last
      case PathList("com", "yammer", xs @ _*) => MergeStrategy.last
      case "about.html" => MergeStrategy.rename
      case "overview.html" => MergeStrategy.last
      case "META-INF/ECLIPSEF.RSA" => MergeStrategy.last
      case "META-INF/mailcap" => MergeStrategy.last
      case "META-INF/mimetypes.default" => MergeStrategy.last
      case "plugin.properties" => MergeStrategy.last
      case "log4j.properties" => MergeStrategy.last
      case x =>
        val oldStrategy = (assemblyMergeStrategy in assembly).value
        oldStrategy(x)
    }
    
  4. Создайте файл src/main/scalaapp.scala с кодом приложения:

    package com.yandex.cloud.dataproc.scala
    
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.SQLContext
    
      object Main {
        def main(args: Array[String]) {
          if (args.length != 2){ //проверяем аргумент
            System.err.println("Usage spark-app.jar <input_dir> <output_dir>");
            System.exit(-1);
          }
          val inDir = args(0); //URI на исходные данные
          val outDir = args(1); //URI на директорию куда записать результат
          val conf = new SparkConf().setAppName("Month Stat - Scala App")
          val sc = new SparkContext(conf)
          val sqlContext = new org.apache.spark.sql.SQLContext(sc)
          val df = sqlContext.read.parquet(inDir)
          val monthStat = df.groupBy("Month").count()
          val defaultFS = sc.hadoopConfiguration.get("fs.defaultFS") //получить эндпоинт сервера HDFS
          val jobId = conf.get("spark.yarn.tags").replace("dataproc_job_", ""); //получить идентификатор задания
          if (outDir.toLowerCase().startsWith("s3a://")) {
            monthStat.repartition(1).write.format("csv").save(outDir + jobId)
          } else {
            monthStat.repartition(1).write.format("csv").save(defaultFS + "/" + outDir + jobId)
          }
    
          sc.stop()
        }
      }
    
  5. Запустите сборку приложения в папке spark-app:

    sbt clean && sbt compile && sbt assembly
    

Файл будет будет доступен по следующему пути: ./target/scala-2.11/spark-app-assembly-0.1.0-SNAPSHOT.jar

Загрузите JAR-файл в Object Storage

Чтобы Spark имел доступ к собранному JAR-файлу, загрузите файл в бакет Object Storage, к которому есть доступ у сервисного аккаунта кластера Data Proc. Загрузить файл можно с помощью s3cmd:

s3cmd put ./target/scala-2.11/spark-app_2.11-0.1.0-SNAPSHOT.jar s3://<ваш бакет>/bin/

Для текущего примера файл загружается по адресу s3://<ваш бакет>/bin/spark-app_2.11-0.1.0-SNAPSHOT.jar.

Запустите задачу в кластере Data Proc

Чтобы Data Proc Agent смог забрать задачу из подсети пользователя необходимо включить NAT в интернет, или настроить NAT-инстанс. О том, как это сделать, читайте в разделе Настройка сети для Data Proc.

Ниже приведены два шаблона команды CLI для запуска Spark-задания — c выводом результата в Object Storage и в HDFS.

Object Storage
HDFS
yc dataproc job create-spark --cluster-id <ID кластера> --name <имя задачи> --main-class "com.yandex.cloud.dataproc.scala.Main" --main-jar-file-uri "s3a://<ваш бакет>/bin/spark-app_2.11-0.1.0-SNAPSHOT.jar" --args "s3a://yc-mdb-examples/dataproc/example01/set01" --args "s3a://{your_target_bucket}/jobs_results/"

CSV-файл с результатом создается в папке /tmp/jobs/<идентификатор задачи>/ в HDFS.

yc dataproc job create-spark --cluster-id <ID кластера> --name <имя задачи> --main-class "com.yandex.cloud.dataproc.scala.Main" --main-jar-file-uri "s3a://<ваш бакет>/bin/spark-app_2.11-0.1.0-SNAPSHOT.jar" --args "s3a://yc-mdb-examples/dataproc/example01/set01" --args "tmp/jobs/"

Пример сообщения об успешном запуске задачи:

done (1m2s)
id: {your_job_id}
cluster_id: {your_cluster_id}
name: test02
status: DONE
spark_job:
  args:
  - s3a://yc-mdb-examples/dataproc/example01/set01
  - s3a://<бакет для результатов>/jobs_results/
  main_jar_file_uri: s3a://<ваш бакет>/bin/spark-app-assembly-0.1.0-SNAPSHOT.jar
  main_class: com.yandex.cloud.dataproc.scala.Main

Запуск PySpark Job

Основные шаги:

  1. Подготовьте код Python-приложения.
  2. Загрузите файл с кодом в бакет Object Storage, к которому есть доступ у сервисного аккаунта кластера.
  3. Запустите задачу в кластере Data Proc.

Версия Python-приложения должна совпадать с версией, доступной из образа. Проверить версию можно на странице Среда исполнения на подкластерах. Для версии образа 1.1 следует использовать Python 3.7.

Чтобы запустить приложение:

  1. Создайте файл job.py со следующим кодом:

    import sys
    
    from pyspark import SparkContext, SparkConf
    from pyspark.sql import SQLContext
    
    
    def main():
    
        if len(sys.argv) != 3:
            print('Usage job.py <input_dir> <output_dir>')
            sys.exit(1)
    
        in_dir = sys.argv[1]
        out_dir = sys.argv[2]
    
        conf = SparkConf().setAppName('Month Stat - Python')
        sc = SparkContext(conf=conf)
        sql = SQLContext(sc)
        df = sql.read.parquet(in_dir)
        month_stat = df.groupBy('Month').count()
        job_id = dict(sc._conf.getAll())['spark.yarn.tags'].replace('dataproc_job_', '')
        if out_dir.startswith('s3a://'):
            month_stat.repartition(1).write.format('csv').save(out_dir + job_id)
        else:
            default_fs = sc._jsc.hadoopConfiguration().get('fs.defaultFS')
            month_stat.repartition(1).write.format('csv').save(default_fs + out_dir + job_id)
    
    
    if __name__ == '__main__':
        main()
    
  2. Чтобы PySpark имел доступ к вашему коду, загрузите файл job.py в бакет Object Storage, к которому есть доступ у сервисного аккаунта кластера Data Proc. Загрузить файл можно с помощью s3cmd:

    s3cmd put ./job.py s3://<ваш бакет>/bin/
    
  3. Запустите команду CLI c записью результата:

    • В бакет Object Storage:

      yc dataproc job create-pyspark  --cluster-id <ID кластера> --name <имя задачи>  --main-python-file-uri "s3a://<ваш бакет>/bin/job.py" --args "s3a://yc-mdb-examples/dataproc/example01/set01" --args "s3a://<ваш бакет>/jobs_results/"
      
    • В HDFS:

      yc dataproc job create-pyspark  --cluster-id <ID кластера> --name <имя задачи>  --main-python-file-uri "s3a://<ваш бакет>/bin/job.py" --args "s3a://yc-mdb-examples/dataproc/example01/set01" --args "tmp/jobs/"
      

      CSV-файл с результатом создается в папке /tmp/jobs/<идентификатор задачи>/ в HDFS.

В этой статье:
  • Подготовка данных
  • Использование Spark Shell
  • Использование Spark Submit
  • Завершение работы приложения
  • Запуск заданий (jobs) с помощью CLI Yandex.Cloud
  • Запуск Spark Job
  • Запустите задачу в кластере Data Proc
  • Запуск PySpark Job
Language
Вакансии
Политика конфиденциальности
Условия использования
© 2021 ООО «Яндекс.Облако»