Работа с заданиями MapReduce
MapReduce — инструмент параллельной обработки больших (порядка нескольких десятков ТБ) наборов данных на кластерах в экосистеме Hadoop. Позволяет работать с данными в разных форматах. Ввод и вывод задания хранится в Yandex Object Storage.
В этой статье на простом примере показывается, как в Data Proc использовать MapReduce. При помощи MapReduce подсчитывается количество жителей 500 самых населенных городов мира из набора данных о городах.
Чтобы запустить MapReduce на Hadoop, используется интерфейс Streaming. При этом для этапов предобработки данных (map) и вычисления финальных данных (reduce) используются программы, читающие из стандартного программного ввода (stdin) и выдающие результат в стандартный вывод (stdout).
Перед началом работы
-
Создайте сервисный аккаунт с ролью
mdb.dataproc.agent
. -
В Object Storage создайте бакеты и настройте доступ к ним:
- Создайте бакет для исходных данных и предоставьте сервисному аккаунту кластера разрешение
READ
для этого бакета. - Создайте бакет для результатов обработки и предоставьте сервисному аккаунту кластера разрешение
READ и WRITE
для этого бакета.
- Создайте бакет для исходных данных и предоставьте сервисному аккаунту кластера разрешение
-
Создайте кластер Data Proc со следующими настройками:
- Сервисы:
HDFS
MAPREDUCE
YARN
- Сервисный аккаунт: выберите созданный ранее сервисный аккаунт с ролью
mdb.dataproc.agent
. - Имя бакета: выберите бакет для результатов обработки.
- Сервисы:
Создайте задание MapReduce
-
Скачайте и загрузите в бакет для исходных данных архив CSV-файла с набором данных о городах.
-
Загрузите в бакет для исходных данных файлы на языке Python с кодом программ предобработки данных (этап map)
mapper.py
и вычисления финальных данных (этап reduce)reducer.py
:mapper.py
import sys population = sum(int(line.split('\t')[14]) for line in sys.stdin) print(population)
reducer.py
import sys population = sum(int(value) for value in sys.stdin) print(population)
-
Создайте задание MapReduce с параметрами:
- Основной класс:
org.apache.hadoop.streaming.HadoopStreaming
- Аргументы задания:
-mapper
mapper.py
-reducer
reducer.py
-numReduceTasks
1
-input
s3a://<имя бакета для исходных данных>/cities500.txt.bz2
-output
s3a://<имя бакета для результатов обработки>/<папка для результатов>
- Файлы:
s3a://<имя бакета для исходных данных>/mapper.py
s3a://<имя бакета для исходных данных>/reducer.py
- Настройки:
mapreduce.job.maps: 6
yarn.app.mapreduce.am.resource.mb: 2048
yarn.app.mapreduce.am.command-opts: -Xmx2048m
- Основной класс:
-
Подождите, пока статус задания изменится на
Done
. -
Скачайте из бакета и просмотрите файл с результатом обработки:
part-00000
3157107417
Удалите созданные ресурсы
Если созданные ресурсы вам больше не нужны, удалите их: