Запуск и управление приложениями для Spark и PySpark
Чтобы запускать Spark-приложения в кластере Data Proc, подготовьте данные для обработки, а затем выберите нужный вариант запуска:
- Spark Shell (командная оболочка для языков программирования Scala и Python). Подробнее о ней читайте в документации Spark.
- Скрипт spark-submit. Более подробно этот способ описан в документации Spark.
- Команды CLI Yandex.Cloud.
Подготовка данных
Следуя этой инструкции вы рассчитаете статистику по воздушному трафику США за 2018-й год по данным с сайта transtats.bts.gov. Набор данных подготовлен в формате Parquet в публичном бакете Yandex Object Storage с именем yc-mdb-examples
.
Для работы с Object Storage рекомендуется настроить S3cmd.
Список файлов можно получить с помощью команды:
$ s3cmd ls s3://yc-mdb-examples/dataproc/example01/set01/
2019-09-13 17:17 19327838 s3://yc-mdb-examples/dataproc/example01/set01/On_Time_Reporting_Carrier_On_Time_Performance_(1987_present)_2018_1.parquet
2019-09-13 17:17 21120204 s3://yc-mdb-examples/dataproc/example01/set01/On_Time_Reporting_Carrier_On_Time_Performance_(1987_present)_2018_10.parquet
...
Примечание
Перед настройкой доступа к сервисам Yandex.Cloud и интернет-ресурсам убедитесь, что сеть кластера настроена правильно.
Использование Spark Shell
-
Запустите Spark Shell на мастер-хосте:
/usr/bin/pyspark
Количество ядер и процессов выполнения задач (executor) ограничено только конфигурацией вашего кластера Data Proc.
-
Построчно введите следующий код:
sql = SQLContext(sc) df = sql.read.parquet("s3a://yc-mdb-examples/dataproc/example01/set01")
Последняя строка — чтение данных из публичного бакета с набором данных для примера. После выполнения этой строки в текущей сессии будет доступен организованный набор данных (DataFrame)
df
с прочитанными данными. -
Чтобы увидеть схему полученного DataFrame, выполните команду:
df.printSchema()
В терминале будет выведен список столбцов с их типами.
-
Рассчитайте статистику перелетов по месяцам и найдите первую десятку городов по количеству вылетов:
-
Количество перелетов по месяцам:
df.groupBy("Month").count().orderBy("Month").show()
-
Первая десятка городов по количеству вылетов:
df.groupBy("OriginCityName").count().orderBy("count", ascending=False).show(10)
-
Использование Spark Submit
Spark Submit позволяет запускать заранее написанные приложения через скрипт spark-submit
. В качестве примера рассмотрим приложение для расчета количества перелетов по месяцам.
-
На мастер-хосте создайте файл
month_stat.py
со следующим кодом:import sys from pyspark import SparkContext, SparkConf from pyspark.sql import SQLContext def main(): conf = SparkConf().setAppName("Month Stat - Python") conf.set("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider") sc = SparkContext(conf=conf) sql = SQLContext(sc) df = sql.read.parquet("s3a://yc-mdb-examples/dataproc/example01/set01") defaultFS = sc._jsc.hadoopConfiguration().get("fs.defaultFS") month_stat = df.groupBy("Month").count() month_stat.repartition(1).write.format("csv").save(defaultFS+"/tmp/month_stat") if __name__ == "__main__": main()
-
Запустите приложение:
$ /usr/bin/spark-submit month_stat.py
-
Результат работы приложения будет выгружен в HDFS. Список получившихся файлов можно вывести командой:
$ hdfs dfs -ls /tmp/month_stat
В примере рассматривается сборка и запуск приложения на языке программирования Scala. Для сборки приложений используется sbt, стандартная утилита сборки Scala.
Чтобы создать и запустить Spark-приложение:
-
Выполните команду
scala -version
на хосте в подкластере MASTERNODE, чтобы узнать необходимую версию Scala.Следите за тем, чтобы версия Scala соответствовала версиям библиотек, которые развернуты в кластере Data Proc, и библиотек, которые используются в приложении. Набор библиотек по умолчанию можно найти в каталоге
/usr/lib/spark/jars
на хосте-мастере Data Proc. -
Создайте папку, например
spark-app
. -
В созданную папку добавьте файл с путем
./src/main/scala/app.scala
. -
Скопируйте следующий код в файл
app.scala
:package com.yandex.cloud.dataproc.scala import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.sql.SQLContext object Main { def main(args: Array[String]) { val conf = new SparkConf().setAppName("Month Stat - Scala App") val sc = new SparkContext(conf) sc.hadoopConfiguration.set("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider") val sqlContext = new org.apache.spark.sql.SQLContext(sc) val df = sqlContext.read.parquet("s3a://yc-mdb-examples/dataproc/example01/set01") val month_stat = df.groupBy("Month").count() val defaultFS = sc.hadoopConfiguration.get("fs.defaultFS") month_stat.repartition(1).write.format("csv").save(defaultFS+"/tmp/month_stat") sc.stop() } }
-
В папке
spark-app
создайте файлbuild.sbt
со следующей конфигурацией:scalaVersion := "2.11.6" libraryDependencies ++= Seq( "org.apache.spark" %% "spark-core" % "2.2.3" % "provided", "org.apache.spark" %% "spark-sql" % "2.2.3" % "provided" )
Примечание
Версия Scala и библиотек может измениться с обновлениями компонентов Data Proc.
-
Скомпилируйте и соберите jar-файл:
$ sbt compile $ sbt package
-
Запустите получившееся приложение:
/usr/bin/spark-submit --class com.yandex.cloud.dataproc.scala.Main ./target/scala-2.11/scala-app_2.11-0.1-SNAPSHOT.jar
-
Результат работы приложения будет выгружен в HDFS. Список получившихся файлов можно вывести командой:
hdfs dfs -ls /tmp/month_stat
Завершение работы приложения
По умолчанию ресурсы запускаемого приложения управляются компонентом YARN. Если приложение необходимо завершить или убрать из очереди, используйте утилиту yarn
:
-
Выведите список приложений:
yarn application -list
-
Завершите ненужное приложение:
yarn application -kill <application_id>
Более подробно с командами YARN можно ознакомиться на странице YARN Commands.
Запуск заданий (jobs) с помощью CLI Yandex.Cloud
Если у вас еще нет интерфейса командной строки Yandex.Cloud, установите и инициализируйте его.
Запуск заданий с помощью CLI Yandex.Cloud происходит посредством агента Data Proc, установленного на мастер-хосте кластера. Параметры заданий передаются агенту через Data Proc API.
Исполняемый файл и его зависимости должны находиться в хранилище, к которому есть доступ у сервисного аккаунта кластера Data Proc. У самого запускаемого приложения должен быть доступ к хранилищам, в которых хранятся исходный набор данных и результаты запуска.
Результат расчета приложение сохраняет либо с помощью компонента HDFS в кластере Data Proc, либо в указанный вами бакет Object Storage.
Служебная и отладочная информация сохраняется в бакете Object Storage, который был указан при создании кластера Data Proc. Для каждого задания агент Data Proc создает отдельную папку с путем вида dataproc/clusters/<ID кластера>/jobs/{job_id}
. Перед первым запуском следует назначить права WRITE
на бакет для сервисного аккаунта, под которым будут запускаться задания.
Ниже приведены два варианта приложения — для Scala и Python.
Запуск Spark Job
Основные шаги:
- Соберите Scala-приложение в единый JAR-файл с помощью SBT.
- Загрузите JAR-файл в бакет Object Storage, к которому есть доступ у сервисного аккаунта кластера.
- Запустите задание в кластере Data Proc.
Соберите Scala-приложение
Чтобы упростить управление зависимостями, соберите приложение в один JAR-файл (fat JAR) с помощью плагина sbt-assembly.
Необходимую версию Scala можно узнать исполнив команду scala -version
на мастер-хосте. Для примера используется 2.11.12
из образа Data Proc версии 1.1. Для приложений Spark рекомендуется строго соблюдать версию Scala и версии библиотек, которые развернуты в кластере Data Proc. Набор библиотек по умолчанию можно посмотреть в каталоге /usr/lib/spark/jars
.
Структура приложения:
spark-app
|-project
| |-plugins.sbt
|-src
| |-main
| |-scala
| |-app.scala
|-build.sbt
Чтобы собрать приложение:
-
Создайте папку
spark-app
, в ней — папкиproject
иsrc/main/scala
. -
Создайте файл
project/plugins.sbt
, который описывает подключение плагинаsbt-assembly
для сборки единого JAR-файла:addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.6")
-
Создайте файл
build.sbt
с описанием версии Scala, зависимостей и стратегии их слияния в одном JAR-файле:scalaVersion := "2.11.12" libraryDependencies ++= Seq( "org.apache.spark" %% "spark-core" % "2.4.4", "org.apache.spark" %% "spark-sql" % "2.4.4", ) assemblyMergeStrategy in assembly := { case PathList("org","aopalliance", xs @ _*) => MergeStrategy.last case PathList("javax", "inject", xs @ _*) => MergeStrategy.last case PathList("javax", "servlet", xs @ _*) => MergeStrategy.last case PathList("javax", "activation", xs @ _*) => MergeStrategy.last case PathList("org", "apache", xs @ _*) => MergeStrategy.last case PathList("com", "google", xs @ _*) => MergeStrategy.last case PathList("com", "esotericsoftware", xs @ _*) => MergeStrategy.last case PathList("com", "codahale", xs @ _*) => MergeStrategy.last case PathList("com", "yammer", xs @ _*) => MergeStrategy.last case "about.html" => MergeStrategy.rename case "overview.html" => MergeStrategy.last case "META-INF/ECLIPSEF.RSA" => MergeStrategy.last case "META-INF/mailcap" => MergeStrategy.last case "META-INF/mimetypes.default" => MergeStrategy.last case "plugin.properties" => MergeStrategy.last case "log4j.properties" => MergeStrategy.last case x => val oldStrategy = (assemblyMergeStrategy in assembly).value oldStrategy(x) }
-
Создайте файл
src/main/scalaapp.scala
с кодом приложения:package com.yandex.cloud.dataproc.scala import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.sql.SQLContext object Main { def main(args: Array[String]) { if (args.length != 2){ //проверяем аргумент System.err.println("Usage spark-app.jar <input_dir> <output_dir>"); System.exit(-1); } val inDir = args(0); //URI на исходные данные val outDir = args(1); //URI на директорию куда записать результат val conf = new SparkConf().setAppName("Month Stat - Scala App") val sc = new SparkContext(conf) val sqlContext = new org.apache.spark.sql.SQLContext(sc) val df = sqlContext.read.parquet(inDir) val monthStat = df.groupBy("Month").count() val defaultFS = sc.hadoopConfiguration.get("fs.defaultFS") //получить эндпоинт сервера HDFS val jobId = conf.get("spark.yarn.tags").replace("dataproc_job_", ""); //получить идентификатор задания if (outDir.toLowerCase().startsWith("s3a://")) { monthStat.repartition(1).write.format("csv").save(outDir + jobId) } else { monthStat.repartition(1).write.format("csv").save(defaultFS + "/" + outDir + jobId) } sc.stop() } }
-
Запустите сборку приложения в папке
spark-app
:sbt clean && sbt compile && sbt assembly
Файл будет будет доступен по следующему пути: ./target/scala-2.11/spark-app-assembly-0.1.0-SNAPSHOT.jar
Загрузите JAR-файл в Object Storage
Чтобы Spark имел доступ к собранному JAR-файлу, загрузите файл в бакет Object Storage, к которому есть доступ у сервисного аккаунта кластера Data Proc. Загрузить файл можно с помощью s3cmd:
s3cmd put ./target/scala-2.11/spark-app_2.11-0.1.0-SNAPSHOT.jar s3://<ваш бакет>/bin/
Для текущего примера файл загружается по адресу s3://<ваш бакет>/bin/spark-app_2.11-0.1.0-SNAPSHOT.jar
.
Запустите задачу в кластере Data Proc
Чтобы Data Proc Agent смог забрать задачу из подсети пользователя необходимо включить NAT в интернет, или настроить NAT-инстанс. О том, как это сделать, читайте в разделе Настройка сети для Data Proc.
Ниже приведены два шаблона команды CLI для запуска Spark-задания — c выводом результата в Object Storage и в HDFS.
yc dataproc job create-spark --cluster-id <ID кластера> --name <имя задачи> --main-class "com.yandex.cloud.dataproc.scala.Main" --main-jar-file-uri "s3a://<ваш бакет>/bin/spark-app_2.11-0.1.0-SNAPSHOT.jar" --args "s3a://yc-mdb-examples/dataproc/example01/set01" --args "s3a://{your_target_bucket}/jobs_results/"
CSV-файл с результатом создается в папке /tmp/jobs/<идентификатор задачи>/
в HDFS.
yc dataproc job create-spark --cluster-id <ID кластера> --name <имя задачи> --main-class "com.yandex.cloud.dataproc.scala.Main" --main-jar-file-uri "s3a://<ваш бакет>/bin/spark-app_2.11-0.1.0-SNAPSHOT.jar" --args "s3a://yc-mdb-examples/dataproc/example01/set01" --args "tmp/jobs/"
Пример сообщения об успешном запуске задачи:
done (1m2s)
id: {your_job_id}
cluster_id: {your_cluster_id}
name: test02
status: DONE
spark_job:
args:
- s3a://yc-mdb-examples/dataproc/example01/set01
- s3a://<бакет для результатов>/jobs_results/
main_jar_file_uri: s3a://<ваш бакет>/bin/spark-app-assembly-0.1.0-SNAPSHOT.jar
main_class: com.yandex.cloud.dataproc.scala.Main
Запуск PySpark Job
Основные шаги:
- Подготовьте код Python-приложения.
- Загрузите файл с кодом в бакет Object Storage, к которому есть доступ у сервисного аккаунта кластера.
- Запустите задачу в кластере Data Proc.
Версия Python-приложения должна совпадать с версией, доступной из образа. Проверить версию можно на странице Среда исполнения на подкластерах. Для версии образа 1.1 следует использовать Python 3.7.
Чтобы запустить приложение:
-
Создайте файл
job.py
со следующим кодом:import sys from pyspark import SparkContext, SparkConf from pyspark.sql import SQLContext def main(): if len(sys.argv) != 3: print('Usage job.py <input_dir> <output_dir>') sys.exit(1) in_dir = sys.argv[1] out_dir = sys.argv[2] conf = SparkConf().setAppName('Month Stat - Python') sc = SparkContext(conf=conf) sql = SQLContext(sc) df = sql.read.parquet(in_dir) month_stat = df.groupBy('Month').count() job_id = dict(sc._conf.getAll())['spark.yarn.tags'].replace('dataproc_job_', '') if out_dir.startswith('s3a://'): month_stat.repartition(1).write.format('csv').save(out_dir + job_id) else: default_fs = sc._jsc.hadoopConfiguration().get('fs.defaultFS') month_stat.repartition(1).write.format('csv').save(default_fs + out_dir + job_id) if __name__ == '__main__': main()
-
Чтобы PySpark имел доступ к вашему коду, загрузите файл
job.py
в бакет Object Storage, к которому есть доступ у сервисного аккаунта кластера Data Proc. Загрузить файл можно с помощью s3cmd:s3cmd put ./job.py s3://<ваш бакет>/bin/
-
Запустите команду CLI c записью результата:
-
В бакет Object Storage:
yc dataproc job create-pyspark --cluster-id <ID кластера> --name <имя задачи> --main-python-file-uri "s3a://<ваш бакет>/bin/job.py" --args "s3a://yc-mdb-examples/dataproc/example01/set01" --args "s3a://<ваш бакет>/jobs_results/"
-
В HDFS:
yc dataproc job create-pyspark --cluster-id <ID кластера> --name <имя задачи> --main-python-file-uri "s3a://<ваш бакет>/bin/job.py" --args "s3a://yc-mdb-examples/dataproc/example01/set01" --args "tmp/jobs/"
CSV-файл с результатом создается в папке
/tmp/jobs/<идентификатор задачи>/
в HDFS.
-