Ноутбуки Apache Spark и Jupyter в облаке Dataproc

1. Обзор

В этой лабораторной работе будет рассказано, как настроить и использовать блокноты Apache Spark и Jupyter в Cloud Dataproc.

Блокноты Jupyter широко используются для исследовательского анализа данных и построения моделей машинного обучения, поскольку они позволяют интерактивно запускать код и сразу видеть результаты.

Однако настройка и использование Apache Spark и Jupyter Notebooks может оказаться сложной задачей.

b9ed855863c57d6.png

Cloud Dataproc делает это быстро и легко, позволяя создать кластер Dataproc с Apache Spark, компонентом Jupyter и шлюзом компонентов примерно за 90 секунд.

Что вы узнаете

В этой лаборатории вы узнаете, как:

  • Создайте сегмент Google Cloud Storage для своего кластера.
  • Создайте кластер Dataproc с помощью Jupyter и Component Gateway,
  • Доступ к веб-интерфейсу JupyterLab на Dataproc.
  • Создайте блокнот, используя коннектор Spark BigQuery Storage.
  • Запуск задания Spark и отображение результатов.

Общая стоимость запуска этой лаборатории в Google Cloud составляет около 1 доллара. Полную информацию о ценах на Cloud Dataproc можно найти здесь .

2. Создание проекта

Войдите в консоль Google Cloud Platform по адресу console.cloud.google.com и создайте новый проект:

7e541d932b20c074.png

2deefc9295d114ea.png

a92a49afe05008a.png

Далее вам необходимо включить биллинг в Cloud Console, чтобы использовать ресурсы Google Cloud.

Работа с этой кодовой лабораторией не должна стоить вам больше нескольких долларов, но она может стоить больше, если вы решите использовать больше ресурсов или оставите их включенными. Последний раздел этой кодовой лаборатории проведет вас через очистку вашего проекта.

Новые пользователи Google Cloud Platform имеют право на бесплатную пробную версию стоимостью 300 долларов США .

3. Настройка среды

Сначала откройте Cloud Shell, нажав кнопку в правом верхнем углу облачной консоли:

a10c47ee6ca41c54.png

После загрузки Cloud Shell выполните следующую команду, чтобы установить идентификатор проекта из предыдущего шага**:**

gcloud config set project <project_id>

Идентификатор проекта также можно найти, нажав на свой проект в левом верхнем углу облачной консоли:

b4b233632ce0c3c4.png

c7e39ffc6dec3765.png

Затем включите API Dataproc, Compute Engine и BigQuery Storage.

gcloud services enable dataproc.googleapis.com \
  compute.googleapis.com \
  storage-component.googleapis.com \
  bigquery.googleapis.com \
  bigquerystorage.googleapis.com

Альтернативно это можно сделать в Cloud Console . Нажмите на значок меню в левом верхнем углу экрана.

2bfc27ef9ba2ec7d.png

Выберите Менеджер API из раскрывающегося списка.

408af5f32c4b7c25.png

Нажмите «Включить API и службы» .

a9c0e84296a7ba5b.png

Найдите и включите следующие API:

  • API вычислительного движка
  • API данных
  • API BigQuery
  • API хранилища BigQuery

4. Создайте сегмент GCS.

Создайте сегмент Google Cloud Storage в регионе, ближайшем к вашим данным, и присвойте ему уникальное имя.

Это будет использоваться для кластера Dataproc.

REGION=us-central1
BUCKET_NAME=<your-bucket-name>

gsutil mb -c standard -l ${REGION} gs://${BUCKET_NAME}

Вы должны увидеть следующий вывод

Creating gs://<your-bucket-name>/...

5. Создайте кластер Dataproc с помощью Jupyter и Component Gateway.

Создание вашего кластера

Установите переменные env для вашего кластера

REGION=us-central1
ZONE=us-central1-a
CLUSTER_NAME=spark-jupyter
BUCKET_NAME=<your-bucket-name>

Затем запустите эту команду gcloud, чтобы создать кластер со всеми необходимыми компонентами для работы с Jupyter в вашем кластере.

gcloud beta dataproc clusters create ${CLUSTER_NAME} \
 --region=${REGION} \
 --image-version=1.4 \
 --master-machine-type=n1-standard-4 \
 --worker-machine-type=n1-standard-4 \
 --bucket=${BUCKET_NAME} \
 --optional-components=ANACONDA,JUPYTER \
 --enable-component-gateway 

Во время создания кластера вы должны увидеть следующий вывод:

Waiting on operation [projects/spark-jupyter/regions/us-central1/operations/abcd123456].
Waiting for cluster creation operation...

Создание кластера займет около 90 секунд, и как только он будет готов, вы сможете получить доступ к своему кластеру из пользовательского интерфейса консоли Dataproc Cloud .

Пока вы ждете, вы можете продолжить чтение ниже, чтобы узнать больше о флагах, используемых в команде gcloud.

После создания кластера вы должны получить следующий результат:

Created [https://dataproc.googleapis.com/v1beta2/projects/project-id/regions/us-central1/clusters/spark-jupyter] Cluster placed in zone [us-central1-a].

Флаги, используемые в команде создания gcloud dataproc

Вот разбивка флагов, используемых в команде создания gcloud dataproc.

--region=${REGION}

Указывает регион и зону, в которой будет создан кластер. Список доступных регионов вы можете увидеть здесь.

--image-version=1.4

Версия образа, которая будет использоваться в вашем кластере. Список доступных версий вы можете увидеть здесь .

--bucket=${BUCKET_NAME}

Укажите сегмент Google Cloud Storage, который вы создали ранее, чтобы использовать его для кластера. Если вы не предоставите сегмент GCS, он будет создан для вас.

Здесь также будут сохраняться ваши записные книжки, даже если вы удалите кластер, поскольку сегмент GCS не будет удален.

--master-machine-type=n1-standard-4
--worker-machine-type=n1-standard-4

Типы компьютеров, которые будут использоваться для вашего кластера Dataproc. Список доступных типов машин вы можете увидеть здесь .

По умолчанию создается 1 главный узел и 2 рабочих узла, если вы не установите флаг –num-workers

--optional-components=ANACONDA,JUPYTER

Установка этих значений для дополнительных компонентов приведет к установке всех необходимых библиотек для Jupyter и Anaconda (которые необходимы для ноутбуков Jupyter) в вашем кластере.

--enable-component-gateway

Включение Component Gateway создает ссылку App Engine с использованием Apache Knox и инвертирующего прокси-сервера, которая обеспечивает простой, безопасный и аутентифицированный доступ к веб-интерфейсам Jupyter и JupyterLab, что означает, что вам больше не нужно создавать туннели SSH.

Он также создаст ссылки на другие инструменты в кластере, включая диспетчер ресурсов Yarn и сервер истории Spark, которые полезны для просмотра производительности ваших заданий и моделей использования кластера.

6. Создайте блокнот Apache Spark.

Доступ к веб-интерфейсу JupyterLab

Как только кластер будет готов, вы сможете найти ссылку Component Gateway на веб-интерфейс JupyterLab, перейдя в Dataproc Clusters — Cloud console , щелкнув созданный вами кластер и перейдя на вкладку «Веб-интерфейсы».

afc40202d555de47.png

Вы заметите, что у вас есть доступ к Jupyter, который представляет собой классический интерфейс блокнота, или к JupyterLab, который описывается как пользовательский интерфейс следующего поколения для Project Jupyter.

В JupyterLab есть много замечательных новых функций пользовательского интерфейса, поэтому, если вы новичок в использовании блокнотов или ищете последние улучшения, рекомендуется использовать JupyterLab, поскольку в конечном итоге он заменит классический интерфейс Jupyter, согласно официальной документации.

Создайте блокнот с ядром Python 3.

а463623f2ebf0518.png

На вкладке средства запуска щелкните значок записной книжки Python 3, чтобы создать записную книжку с ядром Python 3 (а не ядром PySpark), которое позволит вам настроить SparkSession в записной книжке и включить в нее разъем spark-bigquery, необходимый для использования хранилища BigQuery. API .

Переименуйте блокнот

196a3276ed07e1f3.png

Щелкните правой кнопкой мыши имя записной книжки на боковой панели слева или в верхней части навигации и переименуйте записную книжку в «BigQuery Storage & Spark DataFrames.ipynb».

Запустите свой код Spark в записной книжке.

fbac38062e5bb9cf.png

В этом блокноте вы будете использовать spark-bigquery-connector , который представляет собой инструмент для чтения и записи данных между BigQuery и Spark с использованием BigQuery Storage API .

API хранилища BigQuery значительно улучшает доступ к данным в BigQuery с помощью протокола на основе RPC. Он поддерживает параллельное чтение и запись данных, а также различные форматы сериализации, такие как Apache Avro и Apache Arrow . На высоком уровне это приводит к значительному повышению производительности, особенно при работе с большими наборами данных.

В первой ячейке проверьте версию Scala вашего кластера, чтобы можно было включить правильную версию jar-коннектора spark-bigquery-connector.

Ввод [1]:

!scala -version

Выход [1]: f580e442576b8b1f.png Создайте сеанс Spark и включите пакет spark-bigquery-connector.

Если ваша версия Scala — 2.11, используйте следующий пакет.

com.google.cloud.spark:spark-bigquery-with-dependencies_2.11:0.15.1-beta

Если ваша версия Scala — 2.12, используйте следующий пакет.

com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.15.1-beta

Ввод [2]:

from pyspark.sql import SparkSession
spark = SparkSession.builder \
 .appName('BigQuery Storage & Spark DataFrames') \
 .config('spark.jars.packages', 'com.google.cloud.spark:spark-bigquery-with-dependencies_2.11:0.15.1-beta') \
 .getOrCreate()

Включить repl.eagerEval

Это позволит выводить результаты DataFrames на каждом этапе без необходимости показа df.show(), а также улучшит форматирование вывода.

Ввод [3]:

spark.conf.set("spark.sql.repl.eagerEval.enabled",True)

Чтение таблицы BigQuery в Spark DataFrame

Создайте DataFrame Spark, прочитав данные из общедоступного набора данных BigQuery. При этом используется Spark-bigquery-connector и BigQuery Storage API для загрузки данных в кластер Spark.

Создайте DataFrame Spark и загрузите данные из общедоступного набора данных BigQuery для просмотров страниц Википедии . Вы заметите, что не выполняете запрос к данным, поскольку используете разъем spark-bigquery-connector для загрузки данных в Spark, где будет происходить обработка данных. Когда этот код запускается, он фактически не загружает таблицу, поскольку это ленивая оценка в Spark, и выполнение произойдет на следующем шаге.

Ввод [4]:

table = "bigquery-public-data.wikipedia.pageviews_2020"

df_wiki_pageviews = spark.read \
  .format("bigquery") \
  .option("table", table) \
  .option("filter", "datehour >= '2020-03-01' AND datehour < '2020-03-02'") \
  .load()

df_wiki_pageviews.printSchema()

Выход [4]:

c107a33f6fc30ca.png

Выберите необходимые столбцы и примените фильтр, используяwhere where() , который является псевдонимом для filter() .

Когда этот код запускается, он запускает действие Spark, и на этом этапе данные считываются из BigQuery Storage.

Ввод [5]:

df_wiki_en = df_wiki_pageviews \
  .select("datehour", "wiki", "views") \
  .where("views > 1000 AND wiki in ('en', 'en.m')") \

df_wiki_en

Выход [5]:

ad363cbe510d625a.png

Группируйте по названию и упорядочивайте по просмотрам страниц, чтобы увидеть самые популярные страницы.

Ввод [6]:

import pyspark.sql.functions as F

df_datehour_totals = df_wiki_en \
  .groupBy("datehour") \
  .agg(F.sum('views').alias('total_views'))

df_datehour_totals.orderBy('total_views', ascending=False)

Выход [6]: f718abd05afc0f4.png

7. Используйте библиотеки построения графиков Python в блокноте.

Вы можете использовать различные библиотеки построения графиков, доступные в Python, для построения выходных данных заданий Spark.

Преобразование фрейма данных Spark в фрейм данных Pandas

Преобразуйте DataFrame Spark в DataFrame Pandas и установите дату и час в качестве индекса. Это полезно, если вы хотите работать с данными непосредственно в Python и отображать данные, используя множество доступных библиотек построения графиков Python.

Ввод [7]:

spark.conf.set("spark.sql.execution.arrow.enabled", "true")
pandas_datehour_totals = df_datehour_totals.toPandas()

pandas_datehour_totals.set_index('datehour', inplace=True)
pandas_datehour_totals.head()

Выход [7]:

3df2aaa2351f028d.png

Построение кадра данных Pandas

Импортируйте библиотеку matplotlib, необходимую для отображения графиков в блокноте.

Ввод [8]:

import matplotlib.pyplot as plt

Используйте функцию графика Pandas , чтобы создать линейную диаграмму из Pandas DataFrame.

Ввод [9]:

pandas_datehour_totals.plot(kind='line',figsize=(12,6));

Выход [9]: bade7042c3033594.png

Убедитесь, что записная книжка сохранена в GCS.

Теперь у вас должен быть запущен первый блокнот Jupyter в кластере Dataproc. Дайте своему блокноту имя, и он будет автоматически сохранен в сегменте GCS, который использовался при создании кластера.

Вы можете проверить это с помощью этой команды gsutil в облачной оболочке.

BUCKET_NAME=<your-bucket-name>
gsutil ls gs://${BUCKET_NAME}/notebooks/jupyter

Вы должны увидеть следующий вывод

gs://bucket-name/notebooks/jupyter/
gs://bucket-name/notebooks/jupyter/BigQuery Storage & Spark DataFrames.ipynb

8. Совет по оптимизации: кэшируйте данные в памяти

Могут быть сценарии, когда вам нужно, чтобы данные хранились в памяти, а не каждый раз считывались из BigQuery Storage.

Это задание прочитает данные из BigQuery и отправит фильтр в BigQuery. Затем агрегирование будет рассчитано в Apache Spark.

import pyspark.sql.functions as F

table = "bigquery-public-data.wikipedia.pageviews_2020"

df_wiki_pageviews = spark.read \
 .format("bigquery") \
 .option("table", table) \
 .option("filter", "datehour >= '2020-03-01' AND datehour < '2020-03-02'") \
 .load()

df_wiki_en = df_wiki_pageviews \
 .select("title", "wiki", "views") \
 .where("views > 10 AND wiki in ('en', 'en.m')")

df_wiki_en_totals = df_wiki_en \
.groupBy("title") \
.agg(F.sum('views').alias('total_views'))

df_wiki_en_totals.orderBy('total_views', ascending=False)

Вы можете изменить приведенное выше задание, включив в него кэш таблицы, и теперь фильтр в столбце вики будет применяться в памяти Apache Spark.

import pyspark.sql.functions as F

table = "bigquery-public-data.wikipedia.pageviews_2020"

df_wiki_pageviews = spark.read \
 .format("bigquery") \
 .option("table", table) \
 .option("filter", "datehour >= '2020-03-01' AND datehour < '2020-03-02'") \
 .load()

df_wiki_all = df_wiki_pageviews \
 .select("title", "wiki", "views") \
 .where("views > 10")

# cache the data in memory
df_wiki_all.cache()

df_wiki_en = df_wiki_all \
 .where("wiki in ('en', 'en.m')")

df_wiki_en_totals = df_wiki_en \
.groupBy("title") \
.agg(F.sum('views').alias('total_views'))

df_wiki_en_totals.orderBy('total_views', ascending=False)

Затем вы можете отфильтровать другой язык вики, используя кэшированные данные вместо повторного чтения данных из хранилища BigQuery, и, следовательно, это будет работать намного быстрее.

df_wiki_de = df_wiki_all \
 .where("wiki in ('de', 'de.m')")

df_wiki_de_totals = df_wiki_de \
.groupBy("title") \
.agg(F.sum('views').alias('total_views'))

df_wiki_de_totals.orderBy('total_views', ascending=False)

Вы можете удалить кеш, запустив

df_wiki_all.unpersist()

9. Примеры блокнотов для большего количества вариантов использования

В репозитории Cloud Dataproc на GitHub представлены блокноты Jupyter с общими шаблонами Apache Spark для загрузки данных, сохранения данных и построения графиков с помощью различных продуктов Google Cloud Platform и инструментов с открытым исходным кодом:

10. Очистка

Чтобы избежать ненужных расходов с вашего аккаунта GCP после завершения этого краткого руководства:

  1. Удалите сегмент Cloud Storage для среды и созданный вами.
  2. Удалите среду Dataproc .

Если вы создали проект только для этой лаборатории кода, вы также можете при желании удалить проект:

  1. В консоли GCP перейдите на страницу «Проекты» .
  2. В списке проектов выберите проект, который хотите удалить, и нажмите «Удалить».
  3. В поле введите идентификатор проекта и нажмите «Завершить работу» , чтобы удалить проект.

Лицензия

Эта работа распространяется под лицензией Creative Commons Attribution 3.0 Generic License и лицензией Apache 2.0.