1. Введение – Google Dataproc
Dataproc — это полностью управляемый и масштабируемый сервис для запуска Apache Spark, Apache Flink, Presto и многих других инструментов и платформ с открытым исходным кодом. Используйте Dataproc для модернизации озера данных, ETL/ELT и безопасной обработки данных в масштабе всей планеты. Dataproc также полностью интегрирован с несколькими сервисами Google Cloud, включая BigQuery , Cloud Storage , Vertex AI и Dataplex .
Dataproc доступен в трех вариантах:
- Dataproc Serverless позволяет запускать задания PySpark без необходимости настройки инфраструктуры и автоматического масштабирования. Dataproc Serverless поддерживает пакетные рабочие нагрузки и сеансы/блокноты PySpark.
- Dataproc в Google Compute Engine позволяет управлять кластером Hadoop YARN для рабочих нагрузок Spark на основе YARN в дополнение к инструментам с открытым исходным кодом, таким как Flink и Presto. Вы можете адаптировать свои облачные кластеры с любым желаемым вертикальным или горизонтальным масштабированием, включая автоматическое масштабирование.
- Dataproc в Google Kubernetes Engine позволяет вам настраивать виртуальные кластеры Dataproc в вашей инфраструктуре GKE для отправки заданий Spark, PySpark, SparkR или Spark SQL.
2. Создайте кластер Dataproc в Google Cloud VPC.
На этом этапе вы создадите кластер Dataproc в Google Cloud с помощью консоли Google Cloud.
В качестве первого шага включите API службы Dataproc на консоли. После включения найдите «Dataproc» в строке поиска и нажмите « Создать кластер» .
Выберите «Кластер в Compute Engine» , чтобы использовать виртуальные машины Google Compute Engine (GCE) в качестве базовой инфраструктуры для запуска кластеров Dataproc.
Теперь вы находитесь на странице создания кластера.
На этой странице:
- Укажите уникальное имя кластера.
- Выберите конкретный регион . Вы также можете выбрать зону, однако Dataproc предоставляет возможность автоматически выбрать ее для вас. Для этой кодовой лаборатории выберите «us-central1» и «us-central1-c»..
- Выберите тип кластера «Стандартный». Это гарантирует наличие одного главного узла.
- На вкладке «Настроить узлы» подтвердите, что количество созданных рабочих будет равно двум.
- В разделе «Настройка кластера» установите флажок « Включить шлюз компонентов». Это обеспечивает доступ к веб-интерфейсам в кластере, включая пользовательский интерфейс Spark, Yarn Node Manager и записные книжки Jupyter.
- В разделе «Дополнительные компоненты» выберите Jupyter Notebook. Это настраивает кластер с сервером ноутбуков Jupyter.
- Оставьте все остальное как есть и нажмите «Создать кластер».
Это запустит кластер Dataproc.
3. Запускаем кластер и подключаемся к нему по SSH.
Как только статус кластера изменится на «Выполняется» , щелкните имя кластера в консоли Dataproc.
Нажмите вкладку «Экземпляр виртуальной машины» , чтобы просмотреть главный узел и два рабочих узла кластера.
Нажмите SSH рядом с главным узлом, чтобы войти в главный узел.
Запустите команды hdfs, чтобы увидеть структуру каталогов.
hadoop_commands_example
sudo hadoop fs -ls /
sudo hadoop version
sudo hadoop fs -mkdir /test51
sudo hadoop fs -ls /
4. Веб-интерфейсы и компонентные шлюзы
В консоли кластера Dataproc щелкните имя вашего кластера, затем перейдите на вкладку ВЕБ-ИНТЕРФЕЙСЫ .
Здесь показаны доступные веб-интерфейсы, включая Jupyter . Нажмите Jupyter , чтобы открыть блокнот Jupyter. Вы можете использовать это для создания записных книжек в PySpark, хранящихся в GCS. чтобы сохранить свой блокнот в Google Cloud Storage и открыть блокнот PySpark для использования в этой лаборатории кода.
5. Мониторинг и наблюдение за работами Spark
После запуска кластера Dataproc создайте пакетное задание PySpark и отправьте его в кластер Dataproc.
Создайте корзину Google Cloud Storage (GCS) для хранения скрипта PySpark. Обязательно создайте сегмент в том же регионе, что и кластер Dataproc.
Теперь, когда сегмент GCS создан, скопируйте в него следующий файл.
https://raw.githubusercontent.com/diptimanr/spark-on-gce/main/test-spark-1.py
Этот скрипт создает образец DataFrame Spark и записывает его в виде таблицы Hive.
hive_job.py
from pyspark.sql import SparkSession
from datetime import datetime, date
from pyspark.sql import Row
spark = SparkSession.builder.master("local").enableHiveSupport().getOrCreate()
df = spark.createDataFrame([ (1, 2., 'string1', date(2000, 1, 1), datetime(2000, 1, 1, 12, 0)),
(2, 3., 'string2', date(2000, 2, 1), datetime(2000, 1, 2, 12, 0)), (3, 4., 'string3', date(2000, 3, 1), datetime(2000, 1, 3, 12, 0))
], schema='a long, b double, c string, d date, e timestamp')
print("..... Writing data .....")
df.write.mode("overwrite").saveAsTable("test_table_1")
print("..... Complete .....")
Отправьте этот сценарий как пакетное задание Spark в Dataproc. Нажмите «Вакансии» в левом навигационном меню, а затем нажмите «Отправить вакансию».
Укажите идентификатор вакансии и регион . Выберите свой кластер и укажите расположение GCS скопированного вами сценария Spark. Это задание будет выполняться как пакетное задание Spark в Dataproc.
В разделе «Свойства» добавьте ключ spark.submit.deployMode
и значение client
, чтобы драйвер работал на главном узле Dataproc, а не на рабочих узлах. Нажмите «Отправить» , чтобы отправить пакетное задание в Dataproc.
Сценарий Spark создаст Dataframe и запишет в таблицу Hive test_table_1
.
После успешного выполнения задания вы увидите операторы печати консоли на вкладке Мониторинг .
Теперь, когда таблица Hive создана, отправьте еще одно задание запроса Hive, чтобы выбрать содержимое таблицы и отобразить его на консоли.
Создайте еще одно задание со следующими свойствами:
Обратите внимание, что для параметра «Тип задания» установлено значение «Hive» , а типом источника запроса является «Текст запроса» . Это означает, что мы напишем весь оператор HiveQL в текстовом поле «Текст запроса» .
Отправьте задание, оставив остальные параметры по умолчанию.
Обратите внимание, как HiveQL выбирает все записи и отображает их на консоли.
6. Автомасштабирование
Автомасштабирование — это задача оценки «правильного» количества рабочих узлов кластера для рабочей нагрузки.
API Dataproc AutoscalingPolicies предоставляет механизм для автоматизации управления ресурсами кластера и обеспечивает автоматическое масштабирование рабочих виртуальных машин кластера. Политика автомасштабирования — это многоразовая конфигурация, описывающая, как должны масштабироваться работники кластера, использующие политику автомасштабирования. Он определяет границы масштабирования, частоту и агрессивность, чтобы обеспечить детальный контроль над ресурсами кластера на протяжении всего срока службы кластера.
Политики автомасштабирования Dataproc записываются с использованием файлов YAML, и эти файлы YAML либо передаются в команде CLI для создания кластера, либо выбираются из корзины GCS при создании кластера из облачной консоли.
Вот пример политики автомасштабирования Dataproc:
policy.yaml
workerConfig:
minInstances: 10
maxInstances: 10
secondaryWorkerConfig:
maxInstances: 50
basicAlgorithm:
cooldownPeriod: 4m
yarnConfig:
scaleUpFactor: 0.05
scaleDownFactor: 1.0
gracefulDecommissionTimeout: 1h
7. Настройте дополнительные компоненты Dataproc.
Это запустит кластер Dataproc.
При создании кластера Dataproc в кластере автоматически устанавливаются стандартные компоненты экосистемы Apache Hadoop (см. Список версий Dataproc ). При создании кластера вы можете установить в кластер дополнительные компоненты, называемые дополнительными компонентами .
При создании кластера Dataproc из консоли мы включили дополнительные компоненты и выбрали Jupyter Notebook в качестве дополнительного компонента.
8. Очистите ресурсы
Чтобы очистить кластер, нажмите «Стоп» после выбора кластера в консоли Dataproc. После остановки кластера нажмите «Удалить» , чтобы удалить кластер.
После удаления кластера Dataproc удалите сегменты GCS, в которые был скопирован код.
Чтобы очистить ресурсы и остановить нежелательное выставление счетов, кластер Dataproc необходимо сначала остановить, а затем удалить.
Прежде чем останавливать и удалять кластер, убедитесь, что все данные, записанные в хранилище HDFS, скопированы в GCS для длительного хранения.
Чтобы остановить кластер, нажмите «Стоп» .
После остановки кластера нажмите «Удалить» , чтобы удалить кластер.
В диалоговом окне подтверждения нажмите «Удалить» , чтобы удалить кластер.