1. Обзор - Google Dataproc
Dataproc — это полностью управляемый и масштабируемый сервис для запуска Apache Spark, Apache Flink, Presto и многих других инструментов и фреймворков с открытым исходным кодом. Используйте Dataproc для модернизации озер данных, ETL/ELT и безопасной обработки данных в глобальном масштабе. Dataproc также полностью интегрирован с несколькими сервисами Google Cloud, включая BigQuery , Cloud Storage , Vertex AI и Dataplex .
Dataproc доступен в трех вариантах:
- Dataproc Serverless позволяет запускать задания PySpark без необходимости настройки инфраструктуры и автомасштабирования. Dataproc Serverless поддерживает пакетные рабочие нагрузки PySpark, а также сессии/блокноты.
- Dataproc на Google Compute Engine позволяет управлять кластером Hadoop YARN для рабочих нагрузок Spark на основе YARN, а также использовать инструменты с открытым исходным кодом, такие как Flink и Presto. Вы можете настраивать свои облачные кластеры с любым желаемым вертикальным или горизонтальным масштабированием, включая автомасштабирование.
- Dataproc в Google Kubernetes Engine позволяет настраивать виртуальные кластеры Dataproc в вашей инфраструктуре GKE для отправки заданий Spark, PySpark, SparkR или Spark SQL.
В этом практическом занятии вы узнаете несколько различных способов использования Dataproc Serverless.
Apache Spark изначально создавался для работы на кластерах Hadoop и использовал YARN в качестве менеджера ресурсов. Поддержание кластеров Hadoop требует определённых знаний и обеспечения правильной настройки множества различных параметров кластера. Помимо этого, Spark также требует от пользователя самостоятельной настройки ещё нескольких параметров. Это приводит к тому, что разработчики тратят больше времени на настройку своей инфраструктуры, чем на работу над самим кодом Spark.
Dataproc Serverless избавляет от необходимости вручную настраивать кластеры Hadoop или Spark. Dataproc Serverless не работает на Hadoop и использует собственное динамическое распределение ресурсов для определения своих потребностей в ресурсах, включая автомасштабирование. Небольшое количество свойств Spark по-прежнему можно настраивать в Dataproc Serverless, однако в большинстве случаев вам не потребуется их изменять.
2. Настройка
Для начала вам потребуется настроить среду и ресурсы, используемые в этом практическом занятии.
Создайте проект в Google Cloud. Вы можете использовать уже существующий.
Откройте Cloud Shell, щелкнув по нему на панели инструментов Cloud Console .

Cloud Shell предоставляет готовую к использованию среду командной строки, которую вы можете использовать для выполнения этого практического задания.

Cloud Shell по умолчанию задаст имя вашего проекта. Проверьте это, выполнив echo $GOOGLE_CLOUD_PROJECT . Если в выводе не отображается идентификатор вашего проекта, задайте его.
export GOOGLE_CLOUD_PROJECT=<your-project-id>
Укажите регион Compute Engine для ваших ресурсов, например, us-central1 или europe-west2 .
export REGION=<your-region>
Включить API
В практическом занятии используются следующие API:
- BigQuery
- Dataproc
Включите необходимые API. Это займет около минуты, и по завершении появится сообщение об успешном выполнении.
gcloud services enable bigquery.googleapis.com gcloud services enable dataproc.googleapis.com
Настройка доступа к сети
Для работы Dataproc Serverless требуется включить Google Private Access в регионе, где будут запускаться задания Spark, поскольку драйверы и исполнители Spark имеют только частные IP-адреса. Выполните следующую команду, чтобы включить его в подсети default .
gcloud compute networks subnets update default \
--region=${REGION} \
--enable-private-ip-google-access
Проверить, включен ли Google Private Access, можно следующим способом, который выдаст результат True или False .
gcloud compute networks subnets describe default \
--region=${REGION} \
--format="get(privateIpGoogleAccess)"
Создайте хранилище (сумку для хранения).
Создайте хранилище (storage bucket), которое будет использоваться для хранения ресурсов, созданных в ходе этого практического занятия.
Выберите название для своего хранилища. Названия хранилищ должны быть уникальными для всех пользователей .
export BUCKET=<your-bucket-name>
Создайте хранилище (bucket) в регионе, где вы планируете запускать задания Spark.
gsutil mb -l ${REGION} gs://${BUCKET}
Вы можете убедиться, что ваш бакет доступен в консоли Cloud Storage. Также вы можете запустить gsutil ls , чтобы увидеть свой бакет.
Создайте постоянный сервер истории.
Пользовательский интерфейс Spark предоставляет богатый набор инструментов отладки и аналитической информации о заданиях Spark. Чтобы просмотреть пользовательский интерфейс Spark для завершенных заданий Dataproc Serverless, необходимо создать одноузловой кластер Dataproc, который будет использоваться в качестве сервера постоянной истории .
Задайте имя для сервера сохранения истории.
PHS_CLUSTER_NAME=my-phs
Выполните следующие действия.
gcloud dataproc clusters create ${PHS_CLUSTER_NAME} \
--region=${REGION} \
--single-node \
--enable-component-gateway \
--properties=spark:spark.history.fs.logDirectory=gs://${BUCKET}/phs/*/spark-job-history
Более подробно интерфейс пользователя Spark и сервер постоянной истории будут рассмотрены позже в ходе практического занятия.
3. Запуск бессерверных заданий Spark с помощью Dataproc Packages
В этом примере вы будете работать с набором данных из общедоступного набора данных о поездках на велосипедах Citi Bike в Нью-Йорке (NYC) . Citi Bikes — это платная система проката велосипедов в Нью-Йорке. Вы выполните несколько простых преобразований и выведете десять самых популярных идентификаторов станций Citi Bike. В этом примере также используется открытый исходный код spark-bigquery-connector для бесперебойного чтения и записи данных между Spark и BigQuery.
Клонируйте следующий репозиторий Github и cd в директорию, содержащую файл citibike.py .
git clone https://github.com/GoogleCloudPlatform/devrel-demos.git cd devrel-demos/data-analytics/next-2022-workshop/dataproc-serverless
citibike.py
import sys
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import BooleanType
if len(sys.argv) == 1:
print("Please provide a GCS bucket name.")
bucket = sys.argv[1]
table = "bigquery-public-data:new_york_citibike.citibike_trips"
spark = SparkSession.builder \
.appName("pyspark-example") \
.config("spark.jars","gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jar") \
.getOrCreate()
df = spark.read.format("bigquery").load(table)
top_ten = df.filter(col("start_station_id") \
.isNotNull()) \
.groupBy("start_station_id") \
.count() \
.orderBy("count", ascending=False) \
.limit(10) \
.cache()
top_ten.show()
top_ten.write.option("header", True).csv(f"gs://{bucket}/citibikes_top_ten_start_station_ids")
Отправьте задание в Serverless Spark, используя Cloud SDK , доступный по умолчанию в Cloud Shell. Выполните следующую команду в вашей оболочке, которая использует Cloud SDK и API Dataproc Batches для отправки заданий Serverless Spark.
gcloud dataproc batches submit pyspark citibike.py \
--batch=citibike-job \
--region=${REGION} \
--deps-bucket=gs://${BUCKET} \
--jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jar \
--history-server-cluster=projects/${GOOGLE_CLOUD_PROJECT}/regions/${REGION}/clusters/${PHS_CLUSTER_NAME} \
-- ${BUCKET}
Чтобы разобрать это по пунктам:
-
gcloud dataproc batches submitиспользует API Dataproc Batches . -
pysparkозначает, что вы отправляете задание PySpark. -
--batch— это имя задания. Если оно не указано, будет использован случайно сгенерированный UUID. -
--region=${REGION}— это географический регион, в котором будет обрабатываться задание. -
--deps-bucket=${BUCKET}— это путь, куда загружается ваш локальный файл Python перед запуском в бессерверной среде. -
--jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jarвключает JAR-файл для spark-bigquery-connector в среде выполнения Spark. -
--history-server-cluster=projects/${GOOGLE_CLOUD_PROJECT}/regions/${REGION}/clusters/${PHS_CLUSTER}— это полное имя сервера постоянной истории. Здесь хранятся данные событий Spark (отдельно от вывода в консоль), которые можно просмотреть в пользовательском интерфейсе Spark. - Символ
--в конце означает, что все, что находится за ним, будет аргументами программы во время выполнения. В данном случае вы указываете имя вашего хранилища, как того требует задание.
После отправки пакета вы увидите следующий результат.
Batch [citibike-job] submitted.
Через пару минут вы увидите следующий результат вместе с метаданными задания.
+----------------+------+ |start_station_id| count| +----------------+------+ | 519|551078| | 497|423334| | 435|403795| | 426|384116| | 293|372255| | 402|367194| | 285|344546| | 490|330378| | 151|318700| | 477|311403| +----------------+------+ Batch [citibike-job] finished.
В следующем разделе вы узнаете, как найти журналы выполнения этой задачи.
Дополнительные функции
С помощью Spark Serverless у вас появляются дополнительные возможности для запуска заданий.
- Вы можете создать собственный образ Docker , на котором будет выполняться ваша задача. Это отличный способ добавить дополнительные зависимости, включая библиотеки Python и R.
- Для доступа к метаданным Hive вы можете подключить экземпляр Dataproc Metastore к своему заданию.
- Для дополнительного контроля Dataproc Serverless поддерживает настройку небольшого набора свойств Spark .
4. Метрики и наблюдаемость Dataproc
В консоли Dataproc Batches отображаются все ваши задания Dataproc Serverless. В консоли вы увидите идентификатор пакета (Batch ID), местоположение (Location), статус (Status) , время создания (Creation time), затраченное время (Elapsed time) и тип (Type ) каждого задания. Щелкните идентификатор пакета вашего задания, чтобы просмотреть дополнительную информацию о нем.
На этой странице вы увидите такую информацию, как «Мониторинг» , которая показывает, сколько исполнителей Batch Spark использовало ваше задание с течением времени (указывая на степень его автоматического масштабирования).
На вкладке «Подробности» вы увидите дополнительные метаданные о задании, включая любые аргументы и параметры, которые были переданы вместе с заданием.
На этой странице вы также можете получить доступ ко всем журналам. При выполнении заданий Dataproc Serverless генерируются три разных набора журналов:
- Уровень обслуживания
- Вывод консоли
- Журналирование событий Spark
Журналы на уровне сервиса включают записи, созданные службой Dataproc Serverless. К ним относятся, например, запросы Dataproc Serverless на дополнительные ЦП для автомасштабирования. Просмотреть их можно, нажав кнопку «Просмотреть журналы» , которая откроет раздел «Облачное логирование» .
Вывод консоли можно просмотреть в разделе «Вывод». Это вывод, сгенерированный заданием, включая метаданные, которые Spark выводит при запуске задания, или любые операторы печати, включенные в задание.
Журналирование событий Spark доступно через пользовательский интерфейс Spark. Поскольку вы указали для своего задания Spark постоянный сервер истории, вы можете получить доступ к пользовательскому интерфейсу Spark, нажав кнопку «Просмотреть сервер истории Spark» , которая содержит информацию о ранее запущенных заданиях Spark. Более подробную информацию о пользовательском интерфейсе Spark можно найти в официальной документации Spark .
5. Шаблоны Dataproc: BQ -> GCS
Шаблоны Dataproc — это инструменты с открытым исходным кодом, которые помогают еще больше упростить задачи обработки данных в облаке. Они служат оболочкой для Dataproc Serverless и включают шаблоны для множества задач импорта и экспорта данных, в том числе:
-
BigQuerytoGCSиGCStoBigQuery -
GCStoBigTable -
GCStoJDBCиJDBCtoGCS -
HivetoBigQuery -
MongotoGCSиGCStoMongo
Полный список доступен в файле README .
В этом разделе вы будете использовать шаблоны Dataproc для экспорта данных из BigQuery в GCS .
Клонируйте репозиторий
Клонируйте репозиторий и перейдите в папку python .
git clone https://github.com/GoogleCloudPlatform/dataproc-templates.git cd dataproc-templates/python
Настройте среду
Теперь вам нужно будет установить переменные среды. Шаблоны Dataproc используют переменную среды GCP_PROJECT для идентификатора вашего проекта, поэтому установите для нее значение GOOGLE_CLOUD_PROJECT.
export GCP_PROJECT=${GOOGLE_CLOUD_PROJECT}
Ваш регион должен быть указан в настройках среды, заданных ранее. Если нет, укажите его здесь.
export REGION=<region>
Шаблоны Dataproc используют spark-bigquery-conector для обработки заданий BigQuery и требуют, чтобы URI был включен в переменную среды JARS . Установите переменную JARS .
export JARS="gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jar"
Настройка параметров шаблона
Укажите имя временного хранилища (staging bucket), которое будет использоваться сервисом.
export GCS_STAGING_LOCATION=gs://${BUCKET}
Далее вам нужно будет задать несколько переменных, специфичных для данной задачи. В качестве входной таблицы вы снова будете использовать набор данных BigQuery NYC Citibike.
BIGQUERY_GCS_INPUT_TABLE=bigquery-public-data.new_york_citibike.citibike_trips
Вы можете выбрать формат csv , parquet , avro или json . Для этого практического занятия выберите CSV — в следующем разделе мы расскажем, как использовать шаблоны Dataproc для преобразования типов файлов.
BIGQUERY_GCS_OUTPUT_FORMAT=csv
Установите режим вывода на overwrite . Вы можете выбрать один из следующих вариантов: overwrite , append , ignore или errorifexists.
BIGQUERY_GCS_OUTPUT_MODE=overwrite
Укажите путь к выходным данным GCS в вашем хранилище.
BIGQUERY_GCS_OUTPUT_LOCATION=gs://${BUCKET}/BQtoGCS
Запустите шаблон
Запустите шаблон BIGQUERYTOGCS , указав его ниже и предоставив заданные вами входные параметры.
./bin/start.sh \
-- --template=BIGQUERYTOGCS \
--bigquery.gcs.input.table=${BIGQUERY_GCS_INPUT_TABLE} \
--bigquery.gcs.output.format=${BIGQUERY_GCS_OUTPUT_FORMAT} \
--bigquery.gcs.output.mode=${BIGQUERY_GCS_OUTPUT_MODE} \
--bigquery.gcs.output.location=${BIGQUERY_GCS_OUTPUT_LOCATION}
Выходной сигнал будет довольно шумным, но примерно через минуту вы увидите следующее.
Batch [5766411d6c78444cb5e80f305308d8f8] submitted. ... Batch [5766411d6c78444cb5e80f305308d8f8] finished.
Вы можете убедиться в том, что файлы были сгенерированы, выполнив следующую команду.
gsutil ls ${BIGQUERY_GCS_OUTPUT_LOCATION}
По умолчанию Spark записывает данные в несколько файлов, в зависимости от объема данных. В данном случае вы увидите примерно 30 сгенерированных файлов. Имена выходных файлов Spark форматируются следующим образом: part -, за которым следует пятизначное число (указывающее номер детали) и строка хеша. Для больших объемов данных Spark обычно записывает данные в несколько файлов. Пример имени файла: part-00000-cbf69737-867d-41cc-8a33-6521a725f7a0-c000.csv .
6. Шаблоны Dataproc: преобразование CSV в parquet.
Теперь вы будете использовать шаблоны Dataproc для преобразования данных в GCS из одного типа файла в другой с помощью GCSTOGCS . Этот шаблон использует SparkSQL и предоставляет возможность также отправить запрос SparkSQL для обработки во время преобразования для дополнительной обработки.
Подтвердите переменные среды.
Убедитесь, что GCP_PROJECT , REGION и GCS_STAGING_BUCKET установлены в предыдущем разделе.
echo ${GCP_PROJECT}
echo ${REGION}
echo ${GCS_STAGING_LOCATION}
Задайте параметры шаблона
Теперь вам нужно настроить параметры конфигурации для GCStoGCS . Начните с указания расположения входных файлов. Обратите внимание, что это каталог, а не конкретный файл, поскольку будут обрабатываться все файлы в этом каталоге. Установите значение параметра BIGQUERY_GCS_OUTPUT_LOCATION .
GCS_TO_GCS_INPUT_LOCATION=${BIGQUERY_GCS_OUTPUT_LOCATION}
Укажите формат входного файла.
GCS_TO_GCS_INPUT_FORMAT=csv
Укажите желаемый формат вывода. Вы можете выбрать parquet, json, avro или csv.
GCS_TO_GCS_OUTPUT_FORMAT=parquet
Установите режим вывода на overwrite . Вы можете выбрать один из следующих вариантов: overwrite , append , ignore или errorifexists.
GCS_TO_GCS_OUTPUT_MODE=overwrite
Укажите место сохранения файла.
GCS_TO_GCS_OUTPUT_LOCATION=gs://${BUCKET}/GCStoGCS
Запустите шаблон
Запустите шаблон GCStoGCS .
./bin/start.sh \
-- --template=GCSTOGCS \
--gcs.to.gcs.input.location=${GCS_TO_GCS_INPUT_LOCATION} \
--gcs.to.gcs.input.format=${GCS_TO_GCS_INPUT_FORMAT} \
--gcs.to.gcs.output.format=${GCS_TO_GCS_OUTPUT_FORMAT} \
--gcs.to.gcs.output.mode=${GCS_TO_GCS_OUTPUT_MODE} \
--gcs.to.gcs.output.location=${GCS_TO_GCS_OUTPUT_LOCATION}
Выходные данные будут довольно шумными, но примерно через минуту вы должны увидеть сообщение об успешном завершении, как показано ниже.
Batch [c198787ba8e94abc87e2a0778c05ec8a] submitted. ... Batch [c198787ba8e94abc87e2a0778c05ec8a] finished.
Вы можете убедиться в том, что файлы были сгенерированы, выполнив следующую команду.
gsutil ls ${GCS_TO_GCS_OUTPUT_LOCATION}
С помощью этого шаблона у вас также есть возможность задавать запросы SparkSQL, передавая в шаблон gcs.to.gcs.temp.view.name и gcs.to.gcs.sql.query , что позволяет выполнить запрос SparkSQL к данным перед записью в GCS.
7. Очистка ресурсов
Чтобы избежать ненужных расходов на ваш счет GCP после завершения этого практического занятия:
- Удалите сегмент облачного хранилища для созданной вами среды.
gsutil rm -r gs://${BUCKET}
- Удалите кластер Dataproc, используемый в качестве сервера постоянной истории.
gcloud dataproc clusters delete ${PHS_CLUSTER_NAME} \
--region=${REGION}
- Удалите задания Dataproc Serverless. Перейдите в консоль пакетной обработки , установите флажок рядом с каждым заданием, которое вы хотите удалить, и нажмите кнопку УДАЛИТЬ .
Если вы создали проект специально для этого практического занятия, вы также можете при желании удалить этот проект:
- В консоли GCP перейдите на страницу «Проекты» .
- В списке проектов выберите проект, который хотите удалить, и нажмите «Удалить».
- В поле введите идентификатор проекта, а затем нажмите «Завершить», чтобы удалить проект.
8. Что дальше?
Следующие ресурсы предоставляют дополнительные возможности для использования Serverless Spark:
- Узнайте, как управлять бессерверными рабочими процессами Dataproc с помощью Cloud Composer .
- Узнайте, как интегрировать Dataproc Serverless с конвейерами Kubeflow .