Запуск и управление приложениями для Spark и PySpark
Следуя этой инструкции вы рассчитаете статистику по воздушному трафику США за 2018-й год по данным с сайта transtats.bts.gov. Набор данных подготовлен в формате Parquet в публичном бакете Yandex Object Storage с именем yc-mdb-examples
.
Для работы с Object Storage рекомендуется настроить S3cmd.
Список файлов можно получить с помощью команды:
$ s3cmd ls s3://yc-mdb-examples/dataproc/example01/set01/
2019-09-13 17:17 19327838 s3://yc-mdb-examples/dataproc/example01/set01/On_Time_Reporting_Carrier_On_Time_Performance_(1987_present)_2018_1.parquet
2019-09-13 17:17 21120204 s3://yc-mdb-examples/dataproc/example01/set01/On_Time_Reporting_Carrier_On_Time_Performance_(1987_present)_2018_10.parquet
...
Примечание
Перед настройкой доступа к сервисам Облака и интернет-ресурсам убедитесь в правильности настройки сети кластера.
Spark-приложения можно запускать с помощью:
- Spark Shell, доступный для Scala и Python. Более подробно с ним можно ознакомиться на странице Quick Start - Spark Documentation.
- Скрипта
spark-submit
для запуска готовых приложений на кластере. Более подробно этот способ описан можно ознакомиться на странице Submitting Applications.
Использование интерактивного Spark Shell
-
Запустите Spark Shell на мастер-хосте:
pyspark
Количество executer'ов и ядер ограничено только конфигурацией вашего кластера Data Proc.
-
Построчно введите следующий код:
sc._jsc.hadoopConfiguration().set("spark.hadoop.fs.s3a.endpoint", "storage.yandexcloud.net") sc._jsc.hadoopConfiguration().set("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider") sql = SQLContext(sc) df = sql.read.parquet("s3a://yc-mdb-examples/dataproc/example01/set01")
После этого текущая сессия будет содержать DataFrame
df
с прочитанными данными. -
Чтобы посмотреть схему этого DataFrame выполните команду:
df.printSchema()
В терминале будет выведен список столбцов с их типами.
-
Рассчитайте статистику перелетов по месяцам и найдите топ-10 городов по количеству вылетов:
-
Количество перелетов по месяцам:
df.groupBy("Month").count().orderBy("Month").show()
-
Топ-10 городов с самых большим количеством вылетов:
df.groupBy("OriginCityName").count().orderBy("count", ascending=False).show(10)
-
Использование Spark Submit
Spark Submit позволяет запускать заранее написанные приложения через скрипт spark-submit
. В качестве примера рассмотрим приложение для расчета количества перелетов по месяцам.
-
На мастер-хосте создайте файл
month_stat.py
со следующим кодом:import sys from pyspark import SparkContext, SparkConf from pyspark.sql import SQLContext def main(): conf = SparkConf().setAppName("Month Stat - Python") conf.set("spark.hadoop.fs.s3a.endpoint", "storage.yandexcloud.net") conf.set("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider") sc = SparkContext(conf=conf) sql = SQLContext(sc) df = sql.read.parquet("s3a://yc-mdb-examples/dataproc/example01/set01") defaultFS = sc._jsc.hadoopConfiguration().get("fs.defaultFS") month_stat = df.groupBy("Month").count() month_stat.repartition(1).write.format("csv").save(defaultFS+"/tmp/month_stat") if __name__ == "__main__": main()
-
Запустите приложение:
$ spark-submit month_stat.py`
-
Посмотрите результат выполнения в HDFS, в списке файлов результата запуска приложения:
$ hdfs dfs -ls /tmp/month_stat
В данном примере рассмотрим сборку и запуск приложения на языке программирования Scala. Рекомендуется использовать sbt для сборки приложений.
-
Чтобы узнать необходимую версию Scala, выполните команду
scala -version
на мастер-хосте. Для Spark приложений рекомендуется строго соблюдать версию Scala и версии библиотек, которые развернуты в кластере Data Proc и используются в приложении. Набор библиотек по умолчанию можно посмотреть в каталоге/usr/lib/spark/jars
. -
Чтобы собрать приложение, создайте папку, например
spark-app
. -
В созданную папку добавьте файл с путем
./src/main/scala
. -
Скопируйте следующий код в файл
app.scala
:package com.yandex.cloud.mdb.dataproc.scala import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.sql.SQLContext object Main { def main(args: Array[String]) { val conf = new SparkConf().setAppName("Month Stat - Scala App") val sc = new SparkContext(conf) sc.hadoopConfiguration.set("spark.hadoop.fs.s3a.endpoint", "storage.yandexcloud.net") sc.hadoopConfiguration.set("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider") val sqlContext = new org.apache.spark.sql.SQLContext(sc) val df = sqlContext.read.parquet("s3a://yc-mdb-examples/dataproc/example01/set01") val month_stat = df.groupBy("Month").count() val defaultFS = sc.hadoopConfiguration.get("fs.defaultFS") month_stat.repartition(1).write.format("csv").save(defaultFS+"/tmp/month_stat") sc.stop() } }
-
В папке
spark-app
создайте файлbuild.sbt
со следующей конфигурацией:scalaVersion := "2.11.6" libraryDependencies ++= Seq( "org.apache.spark" %% "spark-core" % "2.2.3" % "provided", "org.apache.spark" %% "spark-sql" % "2.2.3" % "provided" )
{% note info %} Версия Scala и библиотек может измениться с обновлениями компонентов Data Proc.
-
Скомпилируйте и соберите jar-файл:
$ sbt complie $ sbt pacakge
-
Запустите получившееся приложение:
$ spark-submit --class com.yandex.cloud.mdb.dataproc.scala.Main ./target/scala-2.11/scala-app_2.11-0.1-SNAPSHOT.jar
-
Посмотрите результат выполнения в HDFS, в списке файлов результата запуска приложения:
$ hdfs dfs -ls /tmp/month_stat
Завершение работы приложения
По умолчанию, ресурсы для запускаемого приложения управляются YARN. Если по приложение необходимо завершить или убрать из очереди, используйте утилиту yarn
:
-
Выведите список приложений:
$ yarn application -list
-
Завершите ненужное приложение:
$ yarn application -kill <application_id>
Более подробно с командами YARN можно ознакомиться на странице YARN Commands.