Запуск и управление приложениями для Spark и PySpark

Следуя этой инструкции вы рассчитаете статистику по воздушному трафику США за 2018-й год по данным с сайта transtats.bts.gov. Набор данных подготовлен в формате Parquet в публичном бакете Yandex Object Storage с именем yc-mdb-examples.

Для работы с Object Storage рекомендуется настроить S3cmd.

Список файлов можно получить с помощью команды:

$ s3cmd ls s3://yc-mdb-examples/dataproc/example01/set01/

2019-09-13 17:17  19327838   s3://yc-mdb-examples/dataproc/example01/set01/On_Time_Reporting_Carrier_On_Time_Performance_(1987_present)_2018_1.parquet
2019-09-13 17:17  21120204   s3://yc-mdb-examples/dataproc/example01/set01/On_Time_Reporting_Carrier_On_Time_Performance_(1987_present)_2018_10.parquet
...

Примечание

Перед настройкой доступа к сервисам Облака и интернет-ресурсам убедитесь в правильности настройки сети кластера.

Spark-приложения можно запускать с помощью:

  1. Spark Shell, доступный для Scala и Python. Более подробно с ним можно ознакомиться на странице Quick Start - Spark Documentation.
  2. Скрипта spark-submit для запуска готовых приложений на кластере. Более подробно этот способ описан можно ознакомиться на странице Submitting Applications.

Использование интерактивного Spark Shell

  1. Запустите Spark Shell на мастер-хосте:

    pyspark
    

    Количество executer'ов и ядер ограничено только конфигурацией вашего кластера Data Proc.

  2. Построчно введите следующий код:

    sc._jsc.hadoopConfiguration().set("spark.hadoop.fs.s3a.endpoint", "storage.yandexcloud.net")
    sc._jsc.hadoopConfiguration().set("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider")
    sql = SQLContext(sc)
    df = sql.read.parquet("s3a://yc-mdb-examples/dataproc/example01/set01")
    

    После этого текущая сессия будет содержать DataFrame df с прочитанными данными.

  3. Чтобы посмотреть схему этого DataFrame выполните команду:

    df.printSchema()
    

    В терминале будет выведен список столбцов с их типами.

  4. Рассчитайте статистику перелетов по месяцам и найдите топ-10 городов по количеству вылетов:

    • Количество перелетов по месяцам:

      df.groupBy("Month").count().orderBy("Month").show()
      
    • Топ-10 городов с самых большим количеством вылетов:

      df.groupBy("OriginCityName").count().orderBy("count", ascending=False).show(10)
      

Использование Spark Submit

Spark Submit позволяет запускать заранее написанные приложения через скрипт spark-submit. В качестве примера рассмотрим приложение для расчета количества перелетов по месяцам.

  1. На мастер-хосте создайте файл month_stat.py со следующим кодом:

    import sys
    
    from pyspark import SparkContext, SparkConf
    from pyspark.sql import SQLContext
    
    def main():
            conf = SparkConf().setAppName("Month Stat - Python")
            conf.set("spark.hadoop.fs.s3a.endpoint", "storage.yandexcloud.net")
            conf.set("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider")
            sc = SparkContext(conf=conf)
    
            sql = SQLContext(sc)
            df = sql.read.parquet("s3a://yc-mdb-examples/dataproc/example01/set01")
            defaultFS = sc._jsc.hadoopConfiguration().get("fs.defaultFS")
            month_stat = df.groupBy("Month").count()
            month_stat.repartition(1).write.format("csv").save(defaultFS+"/tmp/month_stat")
    
    if __name__ == "__main__":
            main()
    
  2. Запустите приложение:

    $ spark-submit month_stat.py`
    
  3. Посмотрите результат выполнения в HDFS, в списке файлов результата запуска приложения:

    $ hdfs dfs -ls /tmp/month_stat
    

В данном примере рассмотрим сборку и запуск приложения на языке программирования Scala. Рекомендуется использовать sbt для сборки приложений.

  1. Чтобы узнать необходимую версию Scala, выполните команду scala -version на мастер-хосте. Для Spark приложений рекомендуется строго соблюдать версию Scala и версии библиотек, которые развернуты в кластере Data Proc и используются в приложении. Набор библиотек по умолчанию можно посмотреть в каталоге /usr/lib/spark/jars.

  2. Чтобы собрать приложение, создайте папку, например spark-app.

  3. В созданную папку добавьте файл с путем ./src/main/scala.

  4. Скопируйте следующий код в файл app.scala:

    package com.yandex.cloud.mdb.dataproc.scala
    
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.SQLContext
    
      object Main {
        def main(args: Array[String]) {
          val conf = new SparkConf().setAppName("Month Stat - Scala App")
          val sc = new SparkContext(conf)
          sc.hadoopConfiguration.set("spark.hadoop.fs.s3a.endpoint", "storage.yandexcloud.net")
          sc.hadoopConfiguration.set("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider")
          val sqlContext = new org.apache.spark.sql.SQLContext(sc)
          val df = sqlContext.read.parquet("s3a://yc-mdb-examples/dataproc/example01/set01")
          val month_stat = df.groupBy("Month").count()
          val defaultFS = sc.hadoopConfiguration.get("fs.defaultFS")
          month_stat.repartition(1).write.format("csv").save(defaultFS+"/tmp/month_stat")
    
          sc.stop()
        }
      }
    
  5. В папке spark-app создайте файл build.sbt со следующей конфигурацией:

    scalaVersion  := "2.11.6"
    
    libraryDependencies ++= Seq(
        "org.apache.spark" %% "spark-core" % "2.2.3" % "provided",
        "org.apache.spark" %% "spark-sql" % "2.2.3" % "provided"
    )
    

    {% note info %} Версия Scala и библиотек может измениться с обновлениями компонентов Data Proc.

  6. Скомпилируйте и соберите jar-файл:

    $ sbt complie
    $ sbt pacakge
    
  7. Запустите получившееся приложение:

    $ spark-submit --class com.yandex.cloud.mdb.dataproc.scala.Main ./target/scala-2.11/scala-app_2.11-0.1-SNAPSHOT.jar
    
  8. Посмотрите результат выполнения в HDFS, в списке файлов результата запуска приложения:

    $ hdfs dfs -ls /tmp/month_stat
    

Завершение работы приложения

По умолчанию, ресурсы для запускаемого приложения управляются YARN. Если по приложение необходимо завершить или убрать из очереди, используйте утилиту yarn:

  1. Выведите список приложений:

    $ yarn application -list
    
  2. Завершите ненужное приложение:

    $ yarn application -kill <application_id>
    

Более подробно с командами YARN можно ознакомиться на странице YARN Commands.