Бессерверная обработка данных

1. Обзор – Google Dataproc

Dataproc — это полностью управляемый и масштабируемый сервис для запуска Apache Spark, Apache Flink, Presto и многих других инструментов и платформ с открытым исходным кодом. Используйте Dataproc для модернизации озера данных, ETL/ELT и безопасной обработки данных в масштабе всей планеты. Dataproc также полностью интегрирован с несколькими сервисами Google Cloud, включая BigQuery , Cloud Storage , Vertex AI и Dataplex .

Dataproc доступен в трех вариантах:

  • Dataproc Serverless позволяет запускать задания PySpark без необходимости настройки инфраструктуры и автоматического масштабирования. Dataproc Serverless поддерживает пакетные рабочие нагрузки и сеансы/блокноты PySpark.
  • Dataproc в Google Compute Engine позволяет управлять кластером Hadoop YARN для рабочих нагрузок Spark на основе YARN в дополнение к инструментам с открытым исходным кодом, таким как Flink и Presto. Вы можете адаптировать свои облачные кластеры с любым желаемым вертикальным или горизонтальным масштабированием, включая автоматическое масштабирование.
  • Dataproc в Google Kubernetes Engine позволяет вам настраивать виртуальные кластеры Dataproc в вашей инфраструктуре GKE для отправки заданий Spark, PySpark, SparkR или Spark SQL.

В этой лабораторной работе вы узнаете несколько различных способов использования Dataproc Serverless.

Apache Spark изначально был создан для работы в кластерах Hadoop и использовал YARN в качестве менеджера ресурсов. Для обслуживания кластеров Hadoop требуется определенный набор знаний и правильная настройка множества различных регуляторов на кластерах. Это в дополнение к отдельному набору ручек, которые Spark также требует от пользователя. Это приводит ко многим сценариям, когда разработчики тратят больше времени на настройку своей инфраструктуры вместо работы над самим кодом Spark.

Dataproc Serverless устраняет необходимость вручную настраивать кластеры Hadoop или Spark. Dataproc Serverless не работает на Hadoop и использует собственное динамическое распределение ресурсов для определения требований к ресурсам, включая автоматическое масштабирование. Небольшое подмножество свойств Spark по-прежнему можно настраивать с помощью Dataproc Serverless, однако в большинстве случаев вам не потребуется их настраивать.

2. Настройка

Вы начнете с настройки среды и ресурсов, используемых в этой лаборатории кода.

Создайте проект Google Cloud. Вы можете использовать существующий.

Откройте Cloud Shell, щелкнув его на панели инструментов Cloud Console .

ba0bb17945a73543.png

Cloud Shell предоставляет готовую к использованию среду Shell, которую вы можете использовать для этой лаборатории кода.

68c4ebd2a8539764.png

Cloud Shell по умолчанию установит имя вашего проекта. Двойная проверка, запустив echo $GOOGLE_CLOUD_PROJECT . Если вы не видите идентификатор своего проекта в выходных данных, установите его.

export GOOGLE_CLOUD_PROJECT=<your-project-id>

Укажите регион Compute Engine для своих ресурсов, например us-central1 или europe-west2 .

export REGION=<your-region>

Включить API

В лаборатории кода используются следующие API:

  • Большой запрос
  • Датапрок

Включите необходимые API. Это займет около минуты, после завершения появится сообщение об успешном завершении.

gcloud services enable bigquery.googleapis.com
gcloud services enable dataproc.googleapis.com

Настройка доступа к сети

Dataproc Serverless требует, чтобы частный доступ Google был включен в регионе, где вы будете запускать задания Spark, поскольку драйверы и исполнители Spark имеют только частные IP-адреса. Выполните следующую команду, чтобы включить его в подсети default .

gcloud compute networks subnets update default \
  --region=${REGION} \
  --enable-private-ip-google-access

Вы можете проверить, включен ли частный доступ Google, с помощью следующей команды, которая выведет True или False .

gcloud compute networks subnets describe default \
  --region=${REGION} \
  --format="get(privateIpGoogleAccess)"

Создайте сегмент хранения

Создайте сегмент хранилища, который будет использоваться для хранения ресурсов, созданных в этой лаборатории кода.

Выберите имя для своего ведра. Имена сегментов должны быть глобально уникальными для всех пользователей .

export BUCKET=<your-bucket-name>

Создайте сегмент в регионе, в котором вы собираетесь выполнять задания Spark.

gsutil mb -l ${REGION} gs://${BUCKET}

Вы можете видеть, что ваша корзина доступна в консоли Cloud Storage. Вы также можете запустить gsutil ls чтобы увидеть свою корзину.

Создайте сервер постоянной истории

Пользовательский интерфейс Spark предоставляет богатый набор инструментов отладки и аналитическую информацию о заданиях Spark. Чтобы просмотреть пользовательский интерфейс Spark для завершенных бессерверных заданий Dataproc, необходимо создать кластер Dataproc с одним узлом, который будет использоваться в качестве сервера постоянной истории .

Задайте имя для вашего сервера постоянной истории.

PHS_CLUSTER_NAME=my-phs

Запустите следующее.

gcloud dataproc clusters create ${PHS_CLUSTER_NAME} \
    --region=${REGION} \
    --single-node \
    --enable-component-gateway \
    --properties=spark:spark.history.fs.logDirectory=gs://${BUCKET}/phs/*/spark-job-history

Пользовательский интерфейс Spark и сервер постоянной истории будут более подробно рассмотрены позже в лаборатории кода.

3. Запускайте бессерверные задания Spark с помощью пакетов Dataproc.

В этом примере вы будете работать с набором данных из общедоступного набора данных Citi Bike Trips города Нью-Йорка (NYC) . NYC Citi Bikes — это платная система проката велосипедов в Нью-Йорке. Вы выполните несколько простых преобразований и распечатаете десять самых популярных идентификаторов станций Citi Bike. В этом примере также используется коннектор spark-bigquery-connector с открытым исходным кодом для беспрепятственного чтения и записи данных между Spark и BigQuery.

Клонируйте следующий репозиторий Github и cd в каталог, содержащий файл citibike.py .

git clone https://github.com/GoogleCloudPlatform/devrel-demos.git
cd devrel-demos/data-analytics/next-2022-workshop/dataproc-serverless

citibike.py

import sys

from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import BooleanType

if len(sys.argv) == 1:
    print("Please provide a GCS bucket name.")

bucket = sys.argv[1]
table = "bigquery-public-data:new_york_citibike.citibike_trips"

spark = SparkSession.builder \
          .appName("pyspark-example") \
          .config("spark.jars","gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jar") \
          .getOrCreate()

df = spark.read.format("bigquery").load(table)

top_ten = df.filter(col("start_station_id") \
            .isNotNull()) \
            .groupBy("start_station_id") \
            .count() \
            .orderBy("count", ascending=False) \
            .limit(10) \
            .cache()

top_ten.show()

top_ten.write.option("header", True).csv(f"gs://{bucket}/citibikes_top_ten_start_station_ids")

Отправьте задание в Serverless Spark с помощью Cloud SDK , доступного в Cloud Shell по умолчанию. Выполните следующую команду в своей оболочке, которая использует Cloud SDK и API Dataproc Batches для отправки бессерверных заданий Spark.

gcloud dataproc batches submit pyspark citibike.py \
  --batch=citibike-job \
  --region=${REGION} \
  --deps-bucket=gs://${BUCKET} \
  --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jar \
--history-server-cluster=projects/${GOOGLE_CLOUD_PROJECT}/regions/${REGION}/clusters/${PHS_CLUSTER_NAME} \
  -- ${BUCKET}

Чтобы разобрать это:

  • gcloud dataproc batches submit ссылки на Dataproc Batches API .
  • pyspark означает, что вы отправляете задание PySpark.
  • --batch — это имя задания. Если он не указан, будет использоваться случайно сгенерированный UUID.
  • --region=${REGION} — географический регион, в котором будет обрабатываться задание.
  • --deps-bucket=${BUCKET} — это место, куда загружается ваш локальный файл Python перед запуском в бессерверной среде.
  • --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jar включает jar для соединителя spark-bigquery-connector в среде выполнения Spark.
  • --history-server-cluster=projects/${GOOGLE_CLOUD_PROJECT}/regions/${REGION}/clusters/${PHS_CLUSTER} — полное имя сервера постоянной истории. Здесь хранятся данные событий Spark (отдельно от вывода консоли), которые можно просмотреть в пользовательском интерфейсе Spark.
  • Завершающий -- означает, что все, что выходит за рамки этого, будет аргументами времени выполнения программы. В этом случае вы указываете имя своего сегмента, как того требует задание.

При отправке пакета вы увидите следующий вывод.

Batch [citibike-job] submitted.

Через пару минут вы увидите следующий вывод вместе с метаданными задания.

+----------------+------+
|start_station_id| count|
+----------------+------+
|             519|551078|
|             497|423334|
|             435|403795|
|             426|384116|
|             293|372255|
|             402|367194|
|             285|344546|
|             490|330378|
|             151|318700|
|             477|311403|
+----------------+------+

Batch [citibike-job] finished.

В следующем разделе вы узнаете, как найти журналы этого задания.

Дополнительные возможности

Благодаря Spark Serverless у вас есть дополнительные возможности для выполнения ваших заданий.

  • Вы можете создать собственный образ Docker , на котором будет выполняться ваше задание. Это отличный способ включить дополнительные зависимости, включая библиотеки Python и R.
  • Вы можете подключить экземпляр Dataproc Metastore к своему заданию, чтобы получить доступ к метаданным Hive.
  • Для дополнительного контроля Dataproc Serverless поддерживает настройку небольшого набора свойств Spark .

4. Метрики Dataproc и наблюдаемость

В консоли Dataproc Batches перечислены все ваши бессерверные задания Dataproc. В консоли вы увидите идентификатор пакета каждого задания, местоположение, статус , время создания, прошедшее время и тип . Нажмите на идентификатор пакета вашего задания, чтобы просмотреть дополнительную информацию о нем.

На этой странице вы увидите такую ​​информацию, как «Мониторинг» , которая показывает, сколько Batch Spark Executors использовалось вашим заданием с течением времени (с указанием степени автомасштабирования).

На вкладке «Сведения» вы увидите дополнительные метаданные о задании, включая любые аргументы и параметры, отправленные вместе с заданием.

Вы также можете получить доступ ко всем журналам с этой страницы. При запуске бессерверных заданий Dataproc создаются три разных набора журналов:

  • Уровень обслуживания
  • Консольный вывод
  • Регистрация событий Spark

Уровень обслуживания включает журналы, созданные бессерверной службой Dataproc. К ним относятся такие вещи, как Dataproc Serverless, запрашивающий дополнительные процессоры для автоматического масштабирования. Вы можете просмотреть их, нажав «Просмотреть журналы» , после чего откроется Cloud Logging .

Вывод консоли можно просмотреть в разделе «Вывод». Это выходные данные, генерируемые заданием, включая метаданные, которые Spark печатает при запуске задания, или любые операторы печати, включенные в задание.

Журнал событий Spark доступен из пользовательского интерфейса Spark. Поскольку вы предоставили своему заданию Spark постоянный сервер истории, вы можете получить доступ к пользовательскому интерфейсу Spark, щелкнув «Просмотреть сервер истории Spark» , который содержит информацию о ранее запущенных вами заданиях Spark. Подробнее о пользовательском интерфейсе Spark можно узнать из официальной документации Spark .

5. Шаблоны данных: BQ -> GCS.

Шаблоны Dataproc — это инструменты с открытым исходным кодом, которые помогают еще больше упростить задачи обработки данных в облаке. Они служат оболочкой для Dataproc Serverless и включают шаблоны для многих задач импорта и экспорта данных, в том числе:

  • BigQuerytoGCS и GCStoBigQuery
  • GCStoBigTable
  • GCStoJDBC и JDBCtoGCS
  • HivetoBigQuery
  • MongotoGCS и GCStoMongo

Полный список доступен README .

В этом разделе вы будете использовать шаблоны Dataproc для экспорта данных из BigQuery в GCS .

Клонировать репозиторий

Клонируйте репо и перейдите в папку python .

git clone https://github.com/GoogleCloudPlatform/dataproc-templates.git
cd dataproc-templates/python

Настройка среды

Теперь вы установите переменные среды. Шаблоны Dataproc используют переменную среды GCP_PROJECT для идентификатора вашего проекта, поэтому установите ее равной GOOGLE_CLOUD_PROJECT.

export GCP_PROJECT=${GOOGLE_CLOUD_PROJECT}

Ваш регион должен быть установлен в среде, указанной ранее. Если нет, установите это здесь.

export REGION=<region>

Шаблоны Dataproc используют Spark-bigquery-conector для обработки заданий BigQuery и требуют, чтобы URI был включен в переменную среды JARS . Установите переменную JARS .

export JARS="gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jar"

Настройка параметров шаблона

Задайте имя промежуточного сегмента, который будет использовать служба.

export GCS_STAGING_LOCATION=gs://${BUCKET}

Далее вы установите некоторые переменные, специфичные для задания. Для входной таблицы вы снова будете использовать набор данных BigQuery NYC Citibike.

BIGQUERY_GCS_INPUT_TABLE=bigquery-public-data.new_york_citibike.citibike_trips

Вы можете выбрать csv , parquet , avro или json . Для этой лаборатории кода выберите CSV — в следующем разделе описано, как использовать шаблоны Dataproc для преобразования типов файлов.

BIGQUERY_GCS_OUTPUT_FORMAT=csv

Установите режим вывода на overwrite . Вы можете выбирать между overwrite , append , ignore или errorifexists.

BIGQUERY_GCS_OUTPUT_MODE=overwrite

Задайте выходное местоположение GCS как путь в вашем сегменте.

BIGQUERY_GCS_OUTPUT_LOCATION=gs://${BUCKET}/BQtoGCS

Запустите шаблон

Запустите шаблон BIGQUERYTOGCS указав его ниже и указав заданные вами входные параметры.

./bin/start.sh \
-- --template=BIGQUERYTOGCS \
        --bigquery.gcs.input.table=${BIGQUERY_GCS_INPUT_TABLE} \
        --bigquery.gcs.output.format=${BIGQUERY_GCS_OUTPUT_FORMAT} \
        --bigquery.gcs.output.mode=${BIGQUERY_GCS_OUTPUT_MODE} \
        --bigquery.gcs.output.location=${BIGQUERY_GCS_OUTPUT_LOCATION}

Вывод будет довольно шумным, но примерно через минуту вы увидите следующее.

Batch [5766411d6c78444cb5e80f305308d8f8] submitted.
...
Batch [5766411d6c78444cb5e80f305308d8f8] finished.

Вы можете убедиться, что файлы были созданы, выполнив следующую команду.

gsutil ls ${BIGQUERY_GCS_OUTPUT_LOCATION}

Spark по умолчанию записывает в несколько файлов, в зависимости от объема данных. В этом случае вы увидите примерно 30 сгенерированных файлов. Имена выходных файлов Spark форматируются с использованием part -, за которой следует пятизначное число (обозначающее номер детали) и хэш-строка. Для больших объемов данных Spark обычно записывает в несколько файлов. Пример имени файла — part-00000-cbf69737-867d-41cc-8a33-6521a725f7a0-c000.csv .

6. Шаблоны Dataproc: CSV в паркет

Теперь вы будете использовать шаблоны Dataproc для преобразования данных в GCS из одного типа файлов в другой с помощью GCSTOGCS . Этот шаблон использует SparkSQL и предоставляет возможность также отправить запрос SparkSQL для дополнительной обработки во время преобразования.

Подтвердите переменные среды

Убедитесь, что GCP_PROJECT , REGION и GCS_STAGING_BUCKET установлены из предыдущего раздела.

echo ${GCP_PROJECT}
echo ${REGION}
echo ${GCS_STAGING_LOCATION}

Установить параметры шаблона

Теперь вы установите параметры конфигурации для GCStoGCS . Начните с расположения входных файлов. Обратите внимание, что это каталог, а не конкретный файл, поскольку будут обработаны все файлы в каталоге. Установите для этого значения BIGQUERY_GCS_OUTPUT_LOCATION .

GCS_TO_GCS_INPUT_LOCATION=${BIGQUERY_GCS_OUTPUT_LOCATION}

Установите формат входного файла.

GCS_TO_GCS_INPUT_FORMAT=csv

Установите желаемый формат вывода. Вы можете выбрать паркет, json, avro или csv.

GCS_TO_GCS_OUTPUT_FORMAT=parquet

Установите режим вывода на overwrite . Вы можете выбирать между overwrite , append , ignore или errorifexists.

GCS_TO_GCS_OUTPUT_MODE=overwrite

Установите место вывода.

GCS_TO_GCS_OUTPUT_LOCATION=gs://${BUCKET}/GCStoGCS

Запустите шаблон

Запустите шаблон GCStoGCS .

./bin/start.sh \
-- --template=GCSTOGCS \
        --gcs.to.gcs.input.location=${GCS_TO_GCS_INPUT_LOCATION} \
        --gcs.to.gcs.input.format=${GCS_TO_GCS_INPUT_FORMAT} \
        --gcs.to.gcs.output.format=${GCS_TO_GCS_OUTPUT_FORMAT} \
        --gcs.to.gcs.output.mode=${GCS_TO_GCS_OUTPUT_MODE} \
        --gcs.to.gcs.output.location=${GCS_TO_GCS_OUTPUT_LOCATION}

Вывод будет довольно шумным, но примерно через минуту вы должны увидеть сообщение об успехе, как показано ниже.

Batch [c198787ba8e94abc87e2a0778c05ec8a] submitted.
...
Batch [c198787ba8e94abc87e2a0778c05ec8a] finished.

Вы можете убедиться, что файлы были созданы, выполнив следующую команду.

gsutil ls ${GCS_TO_GCS_OUTPUT_LOCATION}

С помощью этого шаблона у вас также есть возможность предоставлять запросы SparkSQL, передавая gcs.to.gcs.temp.view.name и gcs.to.gcs.sql.query в шаблон, что позволяет выполнить запрос SparkSQL для данных перед пишу в ГКС.

7. Очистите ресурсы

Чтобы избежать ненужных расходов на вашу учетную запись GCP после завершения этой лаборатории кода:

  1. Удалите сегмент Cloud Storage для созданной вами среды.
gsutil rm -r gs://${BUCKET}
  1. Удалите кластер Dataproc, используемый для вашего сервера постоянной истории.
gcloud dataproc clusters delete ${PHS_CLUSTER_NAME} \
  --region=${REGION}
  1. Удалите бессерверные задания Dataproc. Перейдите в консоль пакетной обработки , установите флажок рядом с каждым заданием, которое вы хотите удалить, и нажмите УДАЛИТЬ .

Если вы создали проект только для этой лаборатории кода, вы также можете при желании удалить проект:

  1. В консоли GCP перейдите на страницу «Проекты» .
  2. В списке проектов выберите проект, который хотите удалить, и нажмите «Удалить».
  3. В поле введите идентификатор проекта и нажмите «Завершить работу», чтобы удалить проект.

8. Что дальше

Следующие ресурсы предоставляют дополнительные способы использования Serverless Spark: