Предварительная обработка данных BigQuery с помощью PySpark в Dataproc

1. Обзор

В этой лабораторной работе будет рассмотрено, как создать конвейер обработки данных с помощью Apache Spark с Dataproc на Google Cloud Platform . В науке о данных и инженерии данных обычным вариантом использования является чтение данных из одного места хранения, выполнение их преобразований и запись в другое место хранения. Общие преобразования включают изменение содержимого данных, удаление ненужной информации и изменение типов файлов.

В этой лаборатории кода вы узнаете об Apache Spark, запустите пример конвейера с использованием Dataproc с PySpark (API Python Apache Spark), BigQuery , Google Cloud Storage и данных из Reddit.

2. Знакомство с Apache Spark (необязательно)

Согласно веб-сайту, « Apache Spark — это унифицированная аналитическая система для крупномасштабной обработки данных». Он позволяет анализировать и обрабатывать данные параллельно и в памяти, что позволяет выполнять массовые параллельные вычисления на нескольких разных машинах и узлах. Первоначально он был выпущен в 2014 году как обновление традиционного MapReduce и до сих пор остается одной из самых популярных платформ для выполнения крупномасштабных вычислений. Apache Spark написан на Scala и впоследствии имеет API-интерфейсы на Scala, Java, Python и R. Он содержит множество библиотек, таких как Spark SQL для выполнения SQL-запросов к данным, Spark Streaming для потоковой передачи данных, MLlib для машинного обучения и GraphX ​​для обработка графов, все из которых выполняются на движке Apache Spark.

32add0b6a47bafbc.png

Spark может работать сам по себе или использовать для масштабирования службу управления ресурсами, такую ​​как Yarn , Mesos или Kubernetes . Для этой лаборатории кода вы будете использовать Dataproc, в котором используется Yarn.

Данные в Spark изначально загружались в память в так называемый RDD, или устойчивый распределенный набор данных. С тех пор разработка Spark включала добавление двух новых типов данных в виде столбцов: типизированного Dataset и нетипизированного Dataframe. Грубо говоря, RDD отлично подходят для любого типа данных, тогда как наборы данных и фреймы данных оптимизированы для табличных данных. Поскольку наборы данных доступны только с API-интерфейсами Java и Scala, мы продолжим использовать API-интерфейс PySpark Dataframe для этой лаборатории кода. Для получения дополнительной информации обратитесь к документации Apache Spark.

3. Вариант использования

Инженерам по обработке данных часто необходимо, чтобы данные были легко доступны ученым, работающим с данными. Однако данные зачастую изначально «грязные» (их трудно использовать для аналитики в их текущем состоянии), и их необходимо очистить, прежде чем они смогут принести большую пользу. Примером этого являются данные, извлеченные из Интернета, которые могут содержать странные кодировки или посторонние теги HTML.

В ходе этой лабораторной работы вы загрузите набор данных из BigQuery в виде публикаций Reddit в кластер Spark, размещенный на Dataproc, извлечете полезную информацию и сохраните обработанные данные в виде сжатых файлов CSV в облачном хранилище Google.

be2a4551ece63bfc.png

Главный специалист по данным вашей компании заинтересован в том, чтобы его команды работали над различными проблемами обработки естественного языка. В частности, их интересует анализ данных в сабреддите «r/food». Вы создадите конвейер для дампа данных, начиная с обратной засыпки с января 2017 года по август 2019 года.

4. Доступ к BigQuery через BigQuery Storage API

Извлечение данных из BigQuery с помощью метода API tabledata.list может оказаться трудоемким и неэффективным, поскольку объем данных масштабируется. Этот метод возвращает список объектов JSON и требует последовательного чтения одной страницы за раз, чтобы прочитать весь набор данных.

API хранилища BigQuery значительно улучшает доступ к данным в BigQuery с помощью протокола на основе RPC. Он поддерживает параллельное чтение и запись данных, а также различные форматы сериализации, такие как Apache Avro и Apache Arrow . На высоком уровне это приводит к значительному повышению производительности, особенно при работе с большими наборами данных.

В этой лаборатории кода вы будете использовать разъем spark-bigquery-connector для чтения и записи данных между BigQuery и Spark.

5. Создание проекта

Войдите в консоль Google Cloud Platform по адресу console.cloud.google.com и создайте новый проект:

7e541d932b20c074.png

2deefc9295d114ea.png

a92a49afe05008a.png

Далее вам необходимо включить биллинг в Cloud Console, чтобы использовать ресурсы Google Cloud.

Работа с этой кодовой лабораторией не должна стоить вам больше нескольких долларов, но она может стоить больше, если вы решите использовать больше ресурсов или оставите их включенными. Последний раздел этой кодовой лаборатории проведет вас через очистку вашего проекта.

Новые пользователи Google Cloud Platform имеют право на бесплатную пробную версию стоимостью 300 долларов США .

6. Настройка среды

Теперь вы выполните настройку среды следующим образом:

  • Включение API Compute Engine, Dataproc и BigQuery Storage.
  • Настройка параметров проекта
  • Создание кластера Dataproc
  • Создание сегмента Google Cloud Storage

Включение API и настройка вашей среды

Откройте Cloud Shell, нажав кнопку в правом верхнем углу Cloud Console.

a10c47ee6ca41c54.png

После загрузки Cloud Shell выполните следующие команды, чтобы включить API Compute Engine, Dataproc и BigQuery Storage:

gcloud services enable compute.googleapis.com \
  dataproc.googleapis.com \
  bigquerystorage.googleapis.com

Установите идентификатор вашего проекта. Вы можете найти его, перейдя на страницу выбора проекта и выполнив поиск по вашему проекту. Это может не совпадать с названием вашего проекта.

e682e8227aa3c781.png

76d45fb295728542.png

Запустите следующую команду, чтобы установить идентификатор вашего проекта :

gcloud config set project <project_id>

Установите регион вашего проекта, выбрав его из списка здесь . Примером может быть us-central1 .

gcloud config set dataproc/region <region>

Выберите имя для своего кластера Dataproc и создайте для него переменную среды.

CLUSTER_NAME=<cluster_name>

Создание кластера Dataproc

Создайте кластер Dataproc, выполнив следующую команду:

 gcloud beta dataproc clusters create ${CLUSTER_NAME} \
     --worker-machine-type n1-standard-8 \
     --num-workers 8 \
     --image-version 1.5-debian \
     --initialization-actions gs://dataproc-initialization-actions/python/pip-install.sh \
     --metadata 'PIP_PACKAGES=google-cloud-storage' \
     --optional-components=ANACONDA \
     --enable-component-gateway

Выполнение этой команды займет пару минут. Чтобы разбить команду:

Это инициирует создание кластера Dataproc с именем, которое вы указали ранее. Использование beta API позволит использовать бета-функции Dataproc, такие как Component Gateway .

gcloud beta dataproc clusters create ${CLUSTER_NAME}

Это позволит установить тип машины, которую будут использовать ваши работники.

--worker-machine-type n1-standard-8

Это установит количество рабочих в вашем кластере.

--num-workers 8

Это установит версию образа Dataproc.

--image-version 1.5-debian

Это позволит настроить действия инициализации , которые будут использоваться в кластере. Здесь вы включаете действие инициализации пипа .

--initialization-actions gs://dataproc-initialization-actions/python/pip-install.sh 

Это метаданные, которые нужно включить в кластер. Здесь вы предоставляете метаданные для действия инициализации pip .

--metadata 'PIP_PACKAGES=google-cloud-storage'

Это позволит установить дополнительные компоненты для установки в кластере.

--optional-components=ANACONDA

Это включит шлюз компонентов, который позволит вам использовать шлюз компонентов Dataproc для просмотра распространенных пользовательских интерфейсов, таких как Zeppelin, Jupyter или Spark History.

--enable-component-gateway

Для более подробного ознакомления с Dataproc ознакомьтесь с этой кодовой лабораторией .

Создание сегмента облачного хранилища Google

Для вывода результатов вам понадобится хранилище Google Cloud Storage. Определите уникальное имя для своего сегмента и выполните следующую команду, чтобы создать новый сегмент. Имена сегментов уникальны во всех проектах Google Cloud для всех пользователей, поэтому вам, возможно, придется повторить попытку несколько раз с разными именами. Корзина успешно создана, если вы не получили ServiceException .

BUCKET_NAME=<bucket_name>
gsutil mb gs://${BUCKET_NAME}

7. Исследовательский анализ данных

Прежде чем выполнять предварительную обработку, вам следует больше узнать о природе данных, с которыми вы имеете дело. Для этого вы изучите два метода исследования данных. Сначала вы просмотрите некоторые необработанные данные с помощью веб-интерфейса BigQuery, а затем рассчитаете количество сообщений в каждом субреддите с помощью PySpark и Dataproc.

Использование веб-интерфейса BigQuery

Начните с использования веб-интерфейса BigQuery для просмотра данных. Прокрутите значок меню в Cloud Console вниз и нажмите «BigQuery», чтобы открыть веб-интерфейс BigQuery.

242a597d7045b4da.png

Затем выполните следующую команду в редакторе запросов BigQuery Web UI. Это вернет 10 полных строк данных за январь 2017 года:

select * from fh-bigquery.reddit_posts.2017_01 limit 10;

b333c72d60ae6eb8.png

Вы можете прокрутить страницу, чтобы увидеть все доступные столбцы, а также некоторые примеры. В частности, вы увидите два столбца, которые представляют текстовое содержимое каждого сообщения: «заголовок» и «собственный текст», причем последний является телом сообщения. Также обратите внимание на другие столбцы, такие как «create_utc», обозначающее время в формате UTC, в котором было опубликовано сообщение, и «subreddit», обозначающий субреддит, в котором находится сообщение.

Выполнение задания PySpark

Выполните следующие команды в Cloud Shell, чтобы клонировать репозиторий с примером кода, и перейдите в правильный каталог:

cd
git clone https://github.com/GoogleCloudPlatform/cloud-dataproc

Вы можете использовать PySpark, чтобы определить количество сообщений, существующих для каждого субреддита. Вы можете открыть Cloud Editor и прочитать скрипт cloud-dataproc/codelabs/spark-bigquery перед его выполнением на следующем шаге:

5d965c6fb66dbd81.png

797cf71de3449bdb.png

Нажмите кнопку «Открыть терминал» в Cloud Editor, чтобы вернуться в Cloud Shell, и выполните следующую команду, чтобы выполнить свое первое задание PySpark:

cd ~/cloud-dataproc/codelabs/spark-bigquery
gcloud dataproc jobs submit pyspark --cluster ${CLUSTER_NAME} \
    --jars gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar \
    --driver-log-levels root=FATAL \
    counts_by_subreddit.py

Эта команда позволяет отправлять задания в Dataproc через API заданий. Здесь вы указываете тип задания как pyspark . Вы можете указать имя кластера, дополнительные параметры и имя файла, содержащего задание. Здесь вы предоставляете параметр --jars , который позволяет вам включить в ваше задание spark-bigquery-connector . Вы также можете установить уровни вывода журнала, используя --driver-log-levels root=FATAL который будет подавлять весь вывод журнала, за исключением ошибок. Искровые бревна имеют тенденцию быть довольно шумными.

Это займет несколько минут, и ваш окончательный результат должен выглядеть примерно так:

6c185228db47bb18.png

8. Изучение интерфейсов Dataproc и Spark.

При запуске заданий Spark в Dataproc у вас есть доступ к двум пользовательским интерфейсам для проверки состояния ваших заданий/кластеров. Первый — это пользовательский интерфейс Dataproc, который можно найти, щелкнув значок меню и прокрутив вниз до Dataproc. Здесь вы можете увидеть текущую доступную память, а также ожидающую память и количество рабочих процессов.

6f2987346d15c8e2.png

Вы также можете нажать на вкладку «Задания», чтобы просмотреть выполненные задания. Вы можете просмотреть подробную информацию о задании, например журналы и выходные данные этих заданий, щелкнув идентификатор задания для конкретного задания. 114d90129b0e4c88.png

1b2160f0f484594a.png

Вы также можете просмотреть пользовательский интерфейс Spark. На странице вакансии щелкните стрелку назад и выберите «Веб-интерфейсы». Вы должны увидеть несколько опций в разделе «Шлюз компонента». Многие из них можно включить с помощью дополнительных компонентов при настройке кластера. Для этой лабораторной работы нажмите «Сервер истории Spark».

5da7944327d193dc.png

6a349200289c69c1.pnge63b36bdc90ff610.png

Должно открыться следующее окно:

8f6786760f994fe8.png

Здесь будут отображаться все выполненные задания, и вы можете нажать на любой application_id, чтобы узнать дополнительную информацию о задании. Аналогичным образом вы можете нажать «Показать неполные приложения» в самом низу целевой страницы, чтобы просмотреть все выполняемые в данный момент задания.

9. Запуск задания обратной засыпки

Теперь вы запустите задание, которое загружает данные в память, извлекает необходимую информацию и сбрасывает выходные данные в корзину Google Cloud Storage. Вы извлекаете «заголовок», «тело» (необработанный текст) и «метку времени создания» для каждого комментария Reddit. Затем вы возьмете эти данные, преобразуете их в CSV, заархивируете и загрузите в корзину с URI gs://${BUCKET_NAME}/reddit_posts/YYYY/MM/food.csv.gz.

Вы можете снова обратиться к облачному редактору, чтобы прочитать код cloud-dataproc/codelabs/spark-bigquery/backfill.sh который представляет собой сценарий-оболочку для выполнения кода в cloud-dataproc/codelabs/spark-bigquery/backfill.py .

cd ~/cloud-dataproc/codelabs/spark-bigquery
bash backfill.sh ${CLUSTER_NAME} ${BUCKET_NAME}

Вскоре вы должны увидеть кучу сообщений о завершении задания. Выполнение задания может занять до 15 минут. Вы также можете дважды проверить сегмент хранилища, чтобы убедиться в успешном выводе данных с помощью gsutil. Как только все задания будут выполнены, выполните следующую команду:

gsutil ls gs://${BUCKET_NAME}/reddit_posts/*/*/food.csv.gz

Вы должны увидеть следующий вывод:

a7c3c7b2e82f9fca.png

Поздравляем, вы успешно завершили резервное заполнение данных комментариев Reddit! Если вас интересует, как можно строить модели на основе этих данных, перейдите к кодовой лаборатории Spark-NLP .

10. Очистка

Чтобы избежать ненужных расходов с вашего аккаунта GCP после завершения этого краткого руководства:

  1. Удалите сегмент Cloud Storage для среды и созданный вами.
  2. Удалите среду Dataproc .

Если вы создали проект только для этой лаборатории кода, вы также можете при желании удалить проект:

  1. В консоли GCP перейдите на страницу «Проекты» .
  2. В списке проектов выберите проект, который хотите удалить, и нажмите «Удалить».
  3. В поле введите идентификатор проекта и нажмите «Завершить работу» , чтобы удалить проект.

Лицензия

Эта работа распространяется под лицензией Creative Commons Attribution 3.0 Generic License и лицензией Apache 2.0.