Ноутбуки Apache Spark и Jupyter в облаке Dataproc

1. Обзор

В этой лабораторной работе мы рассмотрим, как настроить и использовать Apache Spark и блокноты Jupyter в Cloud Dataproc.

Блокноты Jupyter широко используются для разведочного анализа данных и построения моделей машинного обучения, поскольку они позволяют интерактивно запускать код и сразу же видеть результаты.

Однако настройка и использование Apache Spark и Jupyter Notebooks могут быть сложными.

b9ed855863c57d6.png

Cloud Dataproc делает это быстро и легко, позволяя создать кластер Dataproc с Apache Spark, компонентом Jupyter и Component Gateway примерно за 90 секунд.

Что вы узнаете

В этом практическом занятии вы научитесь:

  • Создайте сегмент Google Cloud Storage для вашего кластера.
  • Создайте кластер Dataproc с использованием Jupyter и Component Gateway.
  • Получите доступ к веб-интерфейсу JupyterLab на платформе Dataproc.
  • Создайте блокнот, используя коннектор Spark BigQuery Storage.
  • Запуск задания Spark и построение графика результатов.

Общая стоимость запуска этой лабораторной работы в Google Cloud составляет около 1 доллара. Подробную информацию о ценах на Cloud Dataproc можно найти здесь .

2. Создание проекта

Войдите в консоль Google Cloud Platform по адресу console.cloud.google.com и создайте новый проект:

7e541d932b20c074.png

2deefc9295d114ea.png

a92a49afe05008a.png

Далее вам потребуется включить оплату в консоли Cloud, чтобы использовать ресурсы Google Cloud.

Выполнение этого практического задания не должно обойтись вам дороже нескольких долларов, но может стоить больше, если вы решите использовать дополнительные ресурсы или оставите их запущенными. В последнем разделе этого практического задания мы расскажем вам, как привести ваш проект в порядок.

Новые пользователи Google Cloud Platform могут получить бесплатную пробную версию стоимостью 300 долларов .

3. Настройка среды

Для начала откройте Cloud Shell, нажав кнопку в правом верхнем углу консоли облака:

a10c47ee6ca41c54.png

После загрузки Cloud Shell выполните следующую команду, чтобы установить идентификатор проекта , заданный на предыдущем шаге:

gcloud config set project <project_id>

Идентификатор проекта также можно найти, щелкнув по своему проекту в верхнем левом углу консоли облака:

b4b233632ce0c3c4.png

c7e39ffc6dec3765.png

Далее включите API-интерфейсы Dataproc, Compute Engine и BigQuery Storage.

gcloud services enable dataproc.googleapis.com \
  compute.googleapis.com \
  storage-component.googleapis.com \
  bigquery.googleapis.com \
  bigquerystorage.googleapis.com

В качестве альтернативы это можно сделать в облачной консоли . Щелкните значок меню в верхнем левом углу экрана.

2bfc27ef9ba2ec7d.png

Выберите «Менеджер API» из выпадающего списка.

408af5f32c4b7c25.png

Нажмите «Включить API и сервисы» .

a9c0e84296a7ba5b.png

Найдите и активируйте следующие API:

  • API вычислительного движка
  • API Dataproc
  • API BigQuery
  • API хранилища BigQuery

4. Создайте корзину GCS.

Создайте хранилище Google Cloud Storage в регионе, наиболее близком к вашим данным, и дайте ему уникальное имя.

Это будет использоваться для кластера Dataproc.

REGION=us-central1
BUCKET_NAME=<your-bucket-name>

gsutil mb -c standard -l ${REGION} gs://${BUCKET_NAME}

Вы должны увидеть следующий результат.

Creating gs://<your-bucket-name>/...

5. Создайте кластер Dataproc с помощью Jupyter и Component Gateway.

Создание кластера

Настройте переменные окружения для вашего кластера.

REGION=us-central1
ZONE=us-central1-a
CLUSTER_NAME=spark-jupyter
BUCKET_NAME=<your-bucket-name>

Затем выполните эту команду gcloud, чтобы создать кластер со всеми необходимыми компонентами для работы с Jupyter на вашем кластере.

gcloud beta dataproc clusters create ${CLUSTER_NAME} \
 --region=${REGION} \
 --image-version=1.4 \
 --master-machine-type=n1-standard-4 \
 --worker-machine-type=n1-standard-4 \
 --bucket=${BUCKET_NAME} \
 --optional-components=ANACONDA,JUPYTER \
 --enable-component-gateway 

В процессе создания кластера вы должны увидеть следующий вывод.

Waiting on operation [projects/spark-jupyter/regions/us-central1/operations/abcd123456].
Waiting for cluster creation operation...

Создание кластера займет около 90 секунд, и как только он будет готов, вы сможете получить к нему доступ через пользовательский интерфейс консоли Dataproc Cloud .

Пока вы ждете, вы можете продолжить чтение ниже, чтобы узнать больше о флагах, используемых в команде gcloud.

После создания кластера вы должны увидеть следующий результат:

Created [https://dataproc.googleapis.com/v1beta2/projects/project-id/regions/us-central1/clusters/spark-jupyter] Cluster placed in zone [us-central1-a].

Флаги, используемые в команде gcloud dataproc create

Ниже приведено описание флагов, используемых в команде gcloud dataproc create.

--region=${REGION}

Указывает регион и зону, где будет создан кластер. Список доступных регионов можно посмотреть здесь.

--image-version=1.4

Версия образа, которую следует использовать в вашем кластере. Список доступных версий можно посмотреть здесь .

--bucket=${BUCKET_NAME}

Укажите созданный ранее сегмент Google Cloud Storage, который будет использоваться для кластера. Если вы не укажете сегмент GCS, он будет создан автоматически.

Здесь же будут сохраняться ваши ноутбуки, даже если вы удалите кластер, поскольку хранилище GCS при этом не удаляется.

--master-machine-type=n1-standard-4
--worker-machine-type=n1-standard-4

Типы машин, которые можно использовать для вашего кластера Dataproc. Список доступных типов машин можно посмотреть здесь .

По умолчанию создается 1 главный узел и 2 рабочих узла, если не задан флаг –num-workers.

--optional-components=ANACONDA,JUPYTER

Установка этих значений для необязательных компонентов установит все необходимые библиотеки для Jupyter и Anaconda (которая требуется для работы с блокнотами Jupyter) в вашем кластере.

--enable-component-gateway

Включение Component Gateway создает связь с App Engine с использованием Apache Knox и инвертирующего прокси, что обеспечивает простой, безопасный и аутентифицированный доступ к веб-интерфейсам Jupyter и JupyterLab, а значит, вам больше не нужно создавать SSH-туннели.

Это также создаст ссылки на другие инструменты в кластере, включая менеджер ресурсов Yarn и сервер истории Spark, которые полезны для отслеживания производительности ваших заданий и закономерностей использования кластера.

6. Создайте блокнот Apache Spark.

Доступ к веб-интерфейсу JupyterLab

После того как кластер будет готов, вы сможете найти ссылку на веб-интерфейс JupyterLab через Component Gateway, перейдя в раздел Dataproc Clusters - Cloud console , щелкнув по созданному вами кластеру и перейдя на вкладку Web Interfaces.

afc40202d555de47.png

Вы заметите, что вам доступен Jupyter, представляющий собой классический интерфейс блокнота, или JupyterLab, который описывается как пользовательский интерфейс нового поколения для проекта Jupyter.

В JupyterLab появилось множество замечательных новых функций пользовательского интерфейса, поэтому, если вы новичок в использовании ноутбуков или ищете последние улучшения, рекомендуется использовать JupyterLab, поскольку, согласно официальной документации, он в конечном итоге заменит классический интерфейс Jupyter.

Создайте ноутбук с ядром Python 3.

a463623f2ebf0518.png

На вкладке запуска щелкните значок блокнота Python 3, чтобы создать блокнот с ядром Python 3 (не ядром PySpark), что позволит вам настроить SparkSession в блокноте и включить spark-bigquery-connector, необходимый для использования API хранилища BigQuery .

Переименуйте блокнот.

196a3276ed07e1f3.png

Щелкните правой кнопкой мыши по названию блокнота на боковой панели слева или в верхней панели навигации и переименуйте блокнот в "BigQuery Storage & Spark DataFrames.ipynb".

Запустите свой код Spark в блокноте.

fbac38062e5bb9cf.png

В этом блокноте вы будете использовать spark-bigquery-connector — инструмент для чтения и записи данных между BigQuery и Spark с помощью API хранилища BigQuery .

API хранилища BigQuery значительно улучшает доступ к данным в BigQuery, используя протокол на основе RPC. Он поддерживает параллельное чтение и запись данных, а также различные форматы сериализации, такие как Apache Avro и Apache Arrow . В целом, это приводит к значительному повышению производительности, особенно при работе с большими наборами данных.

В первой ячейке проверьте версию Scala вашего кластера, чтобы включить правильную версию jar-файла spark-bigquery-connector.

Ввод [1]:

!scala -version

Вывод [1]: f580e442576b8b1f.png Создайте сессию Spark и подключите пакет spark-bigquery-connector.

Если у вас версия Scala 2.11, используйте следующий пакет.

com.google.cloud.spark:spark-bigquery-with-dependencies_2.11:0.15.1-beta

Если у вас версия Scala 2.12, используйте следующий пакет.

com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.15.1-beta

Ввод [2]:

from pyspark.sql import SparkSession
spark = SparkSession.builder \
 .appName('BigQuery Storage & Spark DataFrames') \
 .config('spark.jars.packages', 'com.google.cloud.spark:spark-bigquery-with-dependencies_2.11:0.15.1-beta') \
 .getOrCreate()

Включить repl.eagerEval

Это позволит выводить результаты DataFrames на каждом шаге без необходимости использования метода `show()` для отображения данных, а также улучшит форматирование вывода.

Ввод [3]:

spark.conf.set("spark.sql.repl.eagerEval.enabled",True)

Считывание данных из таблицы BigQuery в DataFrame Spark.

Создайте Spark DataFrame, считав данные из общедоступного набора данных BigQuery. Для этого используются spark-bigquery-connector и API хранилища BigQuery для загрузки данных в кластер Spark.

Создайте Spark DataFrame и загрузите данные из общедоступного набора данных BigQuery для просмотра страниц Википедии . Обратите внимание, что вы не выполняете запрос к данным, поскольку используете spark-bigquery-connector для загрузки данных в Spark, где будет происходить их обработка. При выполнении этого кода таблица фактически не будет загружена, так как это ленивая оценка в Spark, и выполнение произойдет на следующем шаге.

Вход [4]:

table = "bigquery-public-data.wikipedia.pageviews_2020"

df_wiki_pageviews = spark.read \
  .format("bigquery") \
  .option("table", table) \
  .option("filter", "datehour >= '2020-03-01' AND datehour < '2020-03-02'") \
  .load()

df_wiki_pageviews.printSchema()

Вывод [4]:

c107a33f6fc30ca.png

Выберите необходимые столбцы и примените фильтр, используя функцию where() , которая является псевдонимом для filter() .

При выполнении этого кода запускается действие Spark, и в этот момент данные считываются из хранилища BigQuery.

Ввод [5]:

df_wiki_en = df_wiki_pageviews \
  .select("datehour", "wiki", "views") \
  .where("views > 1000 AND wiki in ('en', 'en.m')") \

df_wiki_en

Вывод [5]:

ad363cbe510d625a.png

Чтобы увидеть страницы, занимающие первые места в результатах поиска, сгруппируйте их по названию и отсортируйте по количеству просмотров.

Ввод [6]:

import pyspark.sql.functions as F

df_datehour_totals = df_wiki_en \
  .groupBy("datehour") \
  .agg(F.sum('views').alias('total_views'))

df_datehour_totals.orderBy('total_views', ascending=False)

Вывод [6]: f718abd05afc0f4.png

7. Используйте библиотеки Python для построения графиков в блокноте.

Для построения графиков результатов выполнения заданий Spark можно использовать различные библиотеки для построения графиков, доступные в Python.

Преобразование DataFrame Spark в DataFrame Pandas

Преобразуйте DataFrame Spark в DataFrame Pandas и установите datehour в качестве индекса. Это полезно, если вы хотите работать с данными непосредственно в Python и строить графики, используя множество доступных библиотек для построения графиков в Python.

Ввод [7]:

spark.conf.set("spark.sql.execution.arrow.enabled", "true")
pandas_datehour_totals = df_datehour_totals.toPandas()

pandas_datehour_totals.set_index('datehour', inplace=True)
pandas_datehour_totals.head()

Вывод [7]:

3df2aaa2351f028d.png

Построение графика с помощью DataFrame Pandas

Для отображения графиков в блокноте необходимо импортировать библиотеку matplotlib.

Вход [8]:

import matplotlib.pyplot as plt

Используйте функцию plot библиотеки Pandas для создания линейного графика на основе DataFrame Pandas.

Ввод [9]:

pandas_datehour_totals.plot(kind='line',figsize=(12,6));

Вывод [9]: bade7042c3033594.png

Проверьте, сохранен ли блокнот в GCS.

Теперь ваш первый Jupyter-блокнот должен быть запущен и работать в кластере Dataproc. Присвойте блокноту имя, и он будет автоматически сохранен в хранилище GCS, использованном при создании кластера.

Это можно проверить с помощью команды gsutil в облачной оболочке.

BUCKET_NAME=<your-bucket-name>
gsutil ls gs://${BUCKET_NAME}/notebooks/jupyter

Вы должны увидеть следующий результат.

gs://bucket-name/notebooks/jupyter/
gs://bucket-name/notebooks/jupyter/BigQuery Storage & Spark DataFrames.ipynb

8. Совет по оптимизации — кэшируйте данные в памяти.

В некоторых ситуациях вам может потребоваться хранить данные в оперативной памяти, а не каждый раз считывать их из хранилища BigQuery.

Эта задача будет считывать данные из BigQuery и передавать фильтр в BigQuery. Затем агрегация будет вычислена в Apache Spark.

import pyspark.sql.functions as F

table = "bigquery-public-data.wikipedia.pageviews_2020"

df_wiki_pageviews = spark.read \
 .format("bigquery") \
 .option("table", table) \
 .option("filter", "datehour >= '2020-03-01' AND datehour < '2020-03-02'") \
 .load()

df_wiki_en = df_wiki_pageviews \
 .select("title", "wiki", "views") \
 .where("views > 10 AND wiki in ('en', 'en.m')")

df_wiki_en_totals = df_wiki_en \
.groupBy("title") \
.agg(F.sum('views').alias('total_views'))

df_wiki_en_totals.orderBy('total_views', ascending=False)

Вы можете изменить приведенное выше задание, добавив кэш таблицы, и теперь фильтр по столбцу wiki будет применяться в памяти Apache Spark.

import pyspark.sql.functions as F

table = "bigquery-public-data.wikipedia.pageviews_2020"

df_wiki_pageviews = spark.read \
 .format("bigquery") \
 .option("table", table) \
 .option("filter", "datehour >= '2020-03-01' AND datehour < '2020-03-02'") \
 .load()

df_wiki_all = df_wiki_pageviews \
 .select("title", "wiki", "views") \
 .where("views > 10")

# cache the data in memory
df_wiki_all.cache()

df_wiki_en = df_wiki_all \
 .where("wiki in ('en', 'en.m')")

df_wiki_en_totals = df_wiki_en \
.groupBy("title") \
.agg(F.sum('views').alias('total_views'))

df_wiki_en_totals.orderBy('total_views', ascending=False)

Затем вы можете отфильтровать данные по другому языку вики, используя кэшированные данные, вместо повторного чтения данных из хранилища BigQuery, и, следовательно, это будет работать намного быстрее.

df_wiki_de = df_wiki_all \
 .where("wiki in ('de', 'de.m')")

df_wiki_de_totals = df_wiki_de \
.groupBy("title") \
.agg(F.sum('views').alias('total_views'))

df_wiki_de_totals.orderBy('total_views', ascending=False)

Вы можете очистить кэш, выполнив команду...

df_wiki_all.unpersist()

9. Примеры блокнотов для различных вариантов использования.

В репозитории Cloud Dataproc на GitHub представлены блокноты Jupyter, использующие распространенные шаблоны Apache Spark для загрузки, сохранения и построения графиков данных с помощью различных продуктов Google Cloud Platform и инструментов с открытым исходным кодом:

10. Уборка

Чтобы избежать ненужных расходов на ваш счет GCP после завершения этого краткого руководства:

  1. Удалите созданный вами сегмент облачного хранилища для данной среды.
  2. Удалите среду Dataproc .

Если вы создали проект специально для этого практического занятия, вы также можете при желании удалить этот проект:

  1. В консоли GCP перейдите на страницу «Проекты» .
  2. В списке проектов выберите проект, который хотите удалить, и нажмите «Удалить».
  3. В поле введите идентификатор проекта, а затем нажмите «Завершить» , чтобы удалить проект.

Лицензия

Данная работа распространяется под лицензией Creative Commons Attribution 3.0 Generic License и лицензией Apache 2.0.