1. Введение

Google Cloud Dataflow
Последнее обновление: 5 июля 2023 г.
Что такое Dataflow?
Dataflow — это управляемый сервис для выполнения самых разнообразных шаблонов обработки данных. Документация на этом сайте показывает, как развернуть конвейеры пакетной и потоковой обработки данных с помощью Dataflow, включая инструкции по использованию функций сервиса.
Apache Beam SDK — это модель программирования с открытым исходным кодом, позволяющая разрабатывать как пакетные, так и потоковые конвейеры обработки данных. Вы создаете свои конвейеры с помощью программы Apache Beam, а затем запускаете их в сервисе Dataflow. Документация Apache Beam содержит подробную концептуальную информацию и справочные материалы по модели программирования Apache Beam, SDK и другим средствам запуска.
Быстрая потоковая обработка данных для анализа.
Dataflow обеспечивает быструю и упрощенную разработку конвейеров потоковой обработки данных с меньшей задержкой.
Упростите операции и управление.
Позвольте командам сосредоточиться на программировании, а не на управлении серверными кластерами, поскольку бессерверный подход Dataflow снимает операционные издержки с задач обработки данных.
Снижение общей стоимости владения
Автоматическое масштабирование ресурсов в сочетании с оптимизированными по стоимости возможностями пакетной обработки означает, что Dataflow предлагает практически неограниченные возможности для управления сезонными и пиковыми нагрузками без перерасхода средств.
Основные характеристики
Автоматизированное управление ресурсами и динамическая перебалансировка рабочей нагрузки
Dataflow автоматизирует выделение и управление вычислительными ресурсами для минимизации задержек и максимизации их использования, так что вам не нужно запускать экземпляры или резервировать их вручную. Разделение задач также автоматизировано и оптимизировано для динамического перераспределения отстающих задач. Нет необходимости искать «горячие клавиши» или предварительно обрабатывать входные данные.
Горизонтальное автоматическое масштабирование
Горизонтальное автоматическое масштабирование трудовых ресурсов для оптимизации производительности приводит к улучшению общего соотношения цены и производительности.
Гибкое ценообразование на основе планирования ресурсов для пакетной обработки
Для обработки заданий с гибким планированием времени, например, заданий, выполняемых в ночное время, гибкое планирование ресурсов (FlexRS) предлагает более низкую стоимость пакетной обработки. Эти гибкие задания помещаются в очередь с гарантией того, что они будут извлечены для выполнения в течение шестичасового окна.
Что вы будете выполнять в рамках этого
Использование интерактивного средства запуска Apache Beam с блокнотами JupyterLab позволяет итеративно разрабатывать конвейеры, анализировать граф конвейера и обрабатывать отдельные PCollections в цикле чтения-вычисления-печати (REPL). Эти блокноты Apache Beam предоставляются через Vertex AI Workbench — управляемый сервис, который размещает виртуальные машины с блокнотами, предварительно установленными с новейшими фреймворками для анализа данных и машинного обучения.
Данный практический урок посвящен функционалу, предоставляемому блокнотами Apache Beam.
Что вы узнаете
- Как создать экземпляр блокнота
- Создание базового конвейера
- Чтение данных из неограниченного источника
- Визуализация данных
- Запуск задания Dataflow из блокнота.
- Сохранение блокнота
Что вам понадобится
- Проект на платформе Google Cloud Platform с включенной функцией выставления счетов.
- Включены Google Cloud Dataflow и Google Cloud PubSub.
2. Настройка
- В консоли Cloud на странице выбора проекта выберите или создайте проект Cloud.
Убедитесь, что у вас включены следующие API:
- API потока данных
- API облачной публикации/подписки
- Вычислительный движок
- API блокнотов
Вы можете убедиться в этом, проверив информацию на странице «API и сервисы».
В этом руководстве мы будем считывать данные из подписки Pub/Sub, поэтому убедитесь, что учетная запись службы Compute Engine по умолчанию имеет роль редактора, или предоставьте ей роль редактора Pub/Sub.
3. Начало работы с блокнотами Apache Beam
Запуск экземпляра Apache Beam Notebooks
- Запустите Dataflow в консоли:
- Выберите страницу «Рабочая среда» в меню слева.
- Убедитесь, что вы находитесь на вкладке «Управляемые пользователем блокноты» .
- На панели инструментов нажмите «Создать блокнот» .
- Выберите Apache Beam > Без графических процессоров .
- На странице «Новый блокнот» выберите подсеть для виртуальной машины блокнота и нажмите «Создать» .
- Когда ссылка станет активной, нажмите «Открыть JupyterLab» . Vertex AI Workbench создаст новый экземпляр блокнота Apache Beam.
4. Создание конвейера
Создание экземпляра блокнота
Перейдите в меню Файл > Создать > Блокнот и выберите ядро Apache Beam версии 2.47 или более поздней.
Начните добавлять код в свой блокнот.
- Скопируйте и вставьте код из каждого раздела в новую ячейку вашей записной книжки.
- Запустите ячейку

Использование интерактивного средства запуска Apache Beam с блокнотами JupyterLab позволяет итеративно разрабатывать конвейеры, анализировать граф конвейера и разбирать отдельные PCollections в цикле чтения-вычисления-печати (REPL).
Apache Beam установлен на вашем экземпляре ноутбука, поэтому добавьте модули interactive_runner и interactive_beam в свой ноутбук.
import apache_beam as beam
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
import apache_beam.runners.interactive.interactive_beam as ib
Если ваш блокнот использует другие сервисы Google, добавьте следующие операторы импорта:
from apache_beam.options import pipeline_options
from apache_beam.options.pipeline_options import GoogleCloudOptions
import google.auth
Настройка параметров интерактивности
Следующая настройка устанавливает продолжительность сбора данных на 60 секунд. Если вы хотите ускорить итерации, установите меньшую продолжительность, например, «10 секунд».
ib.options.recording_duration = '60s'
Дополнительные интерактивные параметры см. в классе interactive_beam.options .
Инициализируйте конвейер с помощью объекта InteractiveRunner .
# Setting up the Apache Beam pipeline options.
options = pipeline_options.PipelineOptions()
# Set the pipeline mode to stream the data from Pub/Sub.
options.view_as(pipeline_options.StandardOptions).streaming = True
# Sets the project to the default project in your current Google Cloud environment.
# The project will be used for creating a subscription to the Pub/Sub topic.
_, options.view_as(GoogleCloudOptions).project = google.auth.default()
# The Google Cloud PubSub topic for this example.
topic = "projects/pubsub-public-data/topics/shakespeare-kinglear"
p = beam.Pipeline(InteractiveRunner(), options=options)
Чтение и визуализация данных
В следующем примере показан конвейер Apache Beam, который создает подписку на заданную тему Pub/Sub и считывает данные из этой подписки.
words = p | "read" >> beam.io.ReadFromPubSub(topic=topic)
Конвейер обработки данных подсчитывает слова по окнам из источника. Он создает фиксированные окна, каждое из которых длится 10 секунд.
windowed_words = (words
| "window" >> beam.WindowInto(beam.window.FixedWindows(10)))
После обработки данных с помощью оконного метода, количество слов подсчитывается для каждого окна.
windowed_word_counts = (windowed_words
| "count" >> beam.combiners.Count.PerElement())
Визуализация данных
Метод show() визуализирует полученную коллекцию PCollection в блокноте.
ib.show(windowed_word_counts, include_window_info=True)

Для отображения визуализаций ваших данных передайте visualize_data=True в метод show() . Добавьте новую ячейку:
ib.show(windowed_word_counts, include_window_info=True, visualize_data=True)
К визуализациям можно применять несколько фильтров. Следующая визуализация позволяет фильтровать данные по меткам и осям:

5. Использование DataFrame Pandas
Еще один полезный инструмент визуализации в блокнотах Apache Beam — это DataFrame Pandas . В следующем примере слова сначала преобразуются в нижний регистр, а затем вычисляется частота каждого слова.
windowed_lower_word_counts = (windowed_words
| "to lower case" >> beam.Map(lambda word: word.lower())
| "count lowered" >> beam.combiners.Count.PerElement())
Метод collect() выводит результат в виде DataFrame Pandas.
ib.collect(windowed_lower_word_counts, include_window_info=True)

6. (Необязательно) Запуск заданий Dataflow из вашего блокнота.
- Для запуска заданий в Dataflow требуются дополнительные разрешения. Убедитесь, что учетная запись службы Compute Engine по умолчанию имеет роль Editor, или предоставьте ей следующие роли IAM:
- Администратор Dataflow
- Рабочий поток данных
- Администратор хранилища и
- Пользователь служебной учетной записи (roles/iam.serviceAccountUser)
Более подробную информацию о ролях см. в документации .
- (Необязательно) Перед использованием ноутбука для запуска заданий Dataflow перезапустите ядро, повторно запустите все ячейки и проверьте результат.
- Удалите следующие операторы импорта:
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
import apache_beam.runners.interactive.interactive_beam as ib
- Добавьте следующую инструкцию импорта:
from apache_beam.runners import DataflowRunner
- Удалите следующую опцию продолжительности записи:
ib.options.recording_duration = '60s'
- Добавьте следующие параметры в настройки конвейера. Вам потребуется изменить местоположение облачного хранилища, указав на уже имеющийся у вас сегмент, или создать для этой цели новый сегмент . Вы также можете изменить значение региона с
us-central1.
# Set the Google Cloud region to run Dataflow.
options.view_as(GoogleCloudOptions).region = 'us-central1'
# Choose a Cloud Storage location.
dataflow_gcs_location = 'gs://<change me>/dataflow'
# Choose a location for your output files
output_gcs_location = '%s/results/' % dataflow_gcs_location
# Set the staging location. This location is used to stage the
# Dataflow pipeline and SDK binary.
options.view_as(GoogleCloudOptions).staging_location = '%s/staging' % dataflow_gcs_location
# Set the temporary location. This location is used to store temporary files
# or intermediate results before outputting to the sink.
options.view_as(GoogleCloudOptions).temp_location = '%s/temp' % dataflow_gcs_location
- В конструкторе метода
beam.Pipeline()заменитеInteractiveRunnerнаDataflowRunner.p— это объект конвейера, полученный при создании вашего конвейера.
p = beam.Pipeline(DataflowRunner(), options=options)
- Удалите интерактивные вызовы из вашего кода. Например, удалите функции
show(),collect(),head(),show_graph()иwatch()из вашего кода. - Чтобы увидеть результаты, вам потребуется добавить приемник данных. В предыдущем разделе мы визуализировали результаты в блокноте, но на этот раз мы запускаем задачу вне этого блокнота — в Dataflow. Поэтому нам нужно внешнее хранилище для результатов. В этом примере мы будем записывать результаты в текстовые файлы в GCS (Google Cloud Storage). Поскольку это потоковый конвейер с оконной обработкой данных, нам нужно будет создать один текстовый файл для каждого окна. Для этого добавьте следующие шаги в свой конвейер:
result = (windowed_lower_word_counts
| "decode words and format" >> beam.Map(lambda x: f"{x[0].decode('utf-8')}: {x[1]}")
| "write events to file" >> beam.io.fileio.WriteToFiles(path=output_gcs_location, shards=1, max_writers_per_bundle=0))
- Добавьте
p.run()в конец кода конвейера. - Теперь проверьте код вашего блокнота, чтобы убедиться, что вы внесли все изменения. Он должен выглядеть примерно так:
import apache_beam as beam
from apache_beam.options import pipeline_options
from apache_beam.options.pipeline_options import GoogleCloudOptions
import google.auth
from apache_beam.runners import DataflowRunner
# Setting up the Apache Beam pipeline options.
options = pipeline_options.PipelineOptions()
# Set the pipeline mode to stream the data from Pub/Sub.
options.view_as(pipeline_options.StandardOptions).streaming = True
# Sets the project to the default project in your current Google Cloud environment.
# The project will be used for creating a subscription to the Pub/Sub topic.
_, options.view_as(GoogleCloudOptions).project = google.auth.default()
# Set the Google Cloud region to run Dataflow.
options.view_as(GoogleCloudOptions).region = 'us-central1'
# Choose a Cloud Storage location.
dataflow_gcs_location = 'gs://<change me>/dataflow'
# Choose a location for your output files
output_gcs_location = '%s/results/' % dataflow_gcs_location
# Set the staging location. This location is used to stage the
# Dataflow pipeline and SDK binary.
options.view_as(GoogleCloudOptions).staging_location = '%s/staging' % dataflow_gcs_location
# Set the temporary location. This location is used to store temporary files
# or intermediate results before outputting to the sink.
options.view_as(GoogleCloudOptions).temp_location = '%s/temp' % dataflow_gcs_location
# The Google Cloud PubSub topic for this example.
topic = "projects/pubsub-public-data/topics/shakespeare-kinglear"
p = beam.Pipeline(DataflowRunner(), options=options)
words = p | "read" >> beam.io.ReadFromPubSub(topic=topic)
windowed_words = (words
| "window" >> beam.WindowInto(beam.window.FixedWindows(10)))
windowed_words_counts = (windowed_words
| "count" >> beam.combiners.Count.PerElement())
windowed_lower_word_counts = (windowed_words
| "to lower case" >> beam.Map(lambda word: word.lower())
| "count lowered" >> beam.combiners.Count.PerElement())
result = (windowed_lower_word_counts
| "decode words and format" >> beam.Map(lambda x: f"{x[0].decode('utf-8')}: {x[1]}")
| "write events to file" >> beam.io.fileio.WriteToFiles(path=output_gcs_location, shards=1, max_writers_per_bundle=0))
p.run()
- Запустите ячейки.
- Вы должны увидеть результат, похожий на следующий:
<DataflowPipelineResult <Job
clientRequestId: '20230623100011457336-8998'
createTime: '2023-06-23T10:00:33.447347Z'
currentStateTime: '1970-01-01T00:00:00Z'
id: '2023-06-23_03_00_33-11346237320103246437'
location: 'us-central1'
name: 'beamapp-root-0623075553-503897-boh4u4wb'
projectId: 'your-project-id'
stageStates: []
startTime: '2023-06-23T10:00:33.447347Z'
steps: []
tempFiles: []
type: TypeValueValuesEnum(JOB_TYPE_STREAMING, 2)> at 0x7fc7e4084d60>
- Чтобы проверить, запущено ли задание, перейдите на страницу «Задания» в Dataflow. Вы должны увидеть новое задание в списке. Запуск задания займет примерно 5-10 минут для начала обработки данных.
- После начала обработки данных перейдите в Cloud Storage и выберите каталог, где Dataflow хранит результаты (указанный вами
output_gcs_location). Вы увидите список текстовых файлов, по одному файлу в каждом окне.
- Загрузите файл и изучите его содержимое. Он должен содержать список слов с указанием их количества. В качестве альтернативы, вы можете использовать интерфейс командной строки для проверки файлов. Это можно сделать, запустив следующую команду в новой ячейке вашего блокнота:
! gsutils cat gs://<your-bucket-here>/results/<file-name>
- Вы увидите примерно такой результат:
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
- Вот и всё! Не забудьте очистить и остановить созданную вами задачу (см. заключительный шаг этого практического задания).
Пример выполнения этого преобразования в интерактивном блокноте см. в блокноте «Подсчет слов в Dataflow» в вашем экземпляре блокнота.
В качестве альтернативы вы можете экспортировать свой блокнот в виде исполняемого скрипта, изменить сгенерированный файл .py, используя описанные выше шаги, а затем развернуть свой конвейер в службе Dataflow.
7. Сохранение вашего блокнота
Созданные вами блокноты сохраняются локально в работающем экземпляре блокнота. Если вы перезагрузите или выключите экземпляр блокнота во время разработки, эти новые блокноты будут сохраняться до тех пор, пока они созданы в каталоге /home/jupyter . Однако, если экземпляр блокнота будет удален, эти блокноты также будут удалены.
Чтобы сохранить свои блокноты для дальнейшего использования, загрузите их локально на свой рабочий компьютер, сохраните их на GitHub или экспортируйте в другой формат файла.
8. Уборка
После завершения работы с экземпляром блокнота Apache Beam очистите созданные вами ресурсы в Google Cloud, выключив экземпляр блокнота и остановив задачу потоковой передачи , если она была запущена.
В качестве альтернативы, если вы создали проект исключительно для целей этого практического занятия, вы также можете полностью закрыть этот проект .