Использование ноутбуков с Google Cloud Dataflow

1. Введение

Cloud-Dataflow.png

Google Cloud Dataflow

Последнее обновление: 5 июля 2023 г.

Что такое Dataflow?

Dataflow — это управляемый сервис для выполнения самых разнообразных шаблонов обработки данных. Документация на этом сайте показывает, как развернуть конвейеры пакетной и потоковой обработки данных с помощью Dataflow, включая инструкции по использованию функций сервиса.

Apache Beam SDK — это модель программирования с открытым исходным кодом, позволяющая разрабатывать как пакетные, так и потоковые конвейеры обработки данных. Вы создаете свои конвейеры с помощью программы Apache Beam, а затем запускаете их в сервисе Dataflow. Документация Apache Beam содержит подробную концептуальную информацию и справочные материалы по модели программирования Apache Beam, SDK и другим средствам запуска.

Быстрая потоковая обработка данных для анализа.

Dataflow обеспечивает быструю и упрощенную разработку конвейеров потоковой обработки данных с меньшей задержкой.

Упростите операции и управление.

Позвольте командам сосредоточиться на программировании, а не на управлении серверными кластерами, поскольку бессерверный подход Dataflow снимает операционные издержки с задач обработки данных.

Снижение общей стоимости владения

Автоматическое масштабирование ресурсов в сочетании с оптимизированными по стоимости возможностями пакетной обработки означает, что Dataflow предлагает практически неограниченные возможности для управления сезонными и пиковыми нагрузками без перерасхода средств.

Основные характеристики

Автоматизированное управление ресурсами и динамическая перебалансировка рабочей нагрузки

Dataflow автоматизирует выделение и управление вычислительными ресурсами для минимизации задержек и максимизации их использования, так что вам не нужно запускать экземпляры или резервировать их вручную. Разделение задач также автоматизировано и оптимизировано для динамического перераспределения отстающих задач. Нет необходимости искать «горячие клавиши» или предварительно обрабатывать входные данные.

Горизонтальное автоматическое масштабирование

Горизонтальное автоматическое масштабирование трудовых ресурсов для оптимизации производительности приводит к улучшению общего соотношения цены и производительности.

Гибкое ценообразование на основе планирования ресурсов для пакетной обработки

Для обработки заданий с гибким планированием времени, например, заданий, выполняемых в ночное время, гибкое планирование ресурсов (FlexRS) предлагает более низкую стоимость пакетной обработки. Эти гибкие задания помещаются в очередь с гарантией того, что они будут извлечены для выполнения в течение шестичасового окна.

Что вы будете выполнять в рамках этого

Использование интерактивного средства запуска Apache Beam с блокнотами JupyterLab позволяет итеративно разрабатывать конвейеры, анализировать граф конвейера и обрабатывать отдельные PCollections в цикле чтения-вычисления-печати (REPL). Эти блокноты Apache Beam предоставляются через Vertex AI Workbench — управляемый сервис, который размещает виртуальные машины с блокнотами, предварительно установленными с новейшими фреймворками для анализа данных и машинного обучения.

Данный практический урок посвящен функционалу, предоставляемому блокнотами Apache Beam.

Что вы узнаете

  • Как создать экземпляр блокнота
  • Создание базового конвейера
  • Чтение данных из неограниченного источника
  • Визуализация данных
  • Запуск задания Dataflow из блокнота.
  • Сохранение блокнота

Что вам понадобится

  • Проект на платформе Google Cloud Platform с включенной функцией выставления счетов.
  • Включены Google Cloud Dataflow и Google Cloud PubSub.

2. Настройка

  1. В консоли Cloud на странице выбора проекта выберите или создайте проект Cloud.

Убедитесь, что у вас включены следующие API:

  • API потока данных
  • API облачной публикации/подписки
  • Вычислительный движок
  • API блокнотов

Вы можете убедиться в этом, проверив информацию на странице «API и сервисы».

В этом руководстве мы будем считывать данные из подписки Pub/Sub, поэтому убедитесь, что учетная запись службы Compute Engine по умолчанию имеет роль редактора, или предоставьте ей роль редактора Pub/Sub.

3. Начало работы с блокнотами Apache Beam

Запуск экземпляра Apache Beam Notebooks

  1. Запустите Dataflow в консоли:

  1. Выберите страницу «Рабочая среда» в меню слева.
  2. Убедитесь, что вы находитесь на вкладке «Управляемые пользователем блокноты» .
  3. На панели инструментов нажмите «Создать блокнот» .
  4. Выберите Apache Beam > Без графических процессоров .
  5. На странице «Новый блокнот» выберите подсеть для виртуальной машины блокнота и нажмите «Создать» .
  6. Когда ссылка станет активной, нажмите «Открыть JupyterLab» . Vertex AI Workbench создаст новый экземпляр блокнота Apache Beam.

4. Создание конвейера

Создание экземпляра блокнота

Перейдите в меню Файл > Создать > Блокнот и выберите ядро ​​Apache Beam версии 2.47 или более поздней.

Начните добавлять код в свой блокнот.

  • Скопируйте и вставьте код из каждого раздела в новую ячейку вашей записной книжки.
  • Запустите ячейку

6bd3dd86cc7cf802.png

Использование интерактивного средства запуска Apache Beam с блокнотами JupyterLab позволяет итеративно разрабатывать конвейеры, анализировать граф конвейера и разбирать отдельные PCollections в цикле чтения-вычисления-печати (REPL).

Apache Beam установлен на вашем экземпляре ноутбука, поэтому добавьте модули interactive_runner и interactive_beam в свой ноутбук.

import apache_beam as beam
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
import apache_beam.runners.interactive.interactive_beam as ib

Если ваш блокнот использует другие сервисы Google, добавьте следующие операторы импорта:

from apache_beam.options import pipeline_options
from apache_beam.options.pipeline_options import GoogleCloudOptions
import google.auth

Настройка параметров интерактивности

Следующая настройка устанавливает продолжительность сбора данных на 60 секунд. Если вы хотите ускорить итерации, установите меньшую продолжительность, например, «10 секунд».

ib.options.recording_duration = '60s'

Дополнительные интерактивные параметры см. в классе interactive_beam.options .

Инициализируйте конвейер с помощью объекта InteractiveRunner .

# Setting up the Apache Beam pipeline options.
options = pipeline_options.PipelineOptions()

# Set the pipeline mode to stream the data from Pub/Sub.
options.view_as(pipeline_options.StandardOptions).streaming = True

# Sets the project to the default project in your current Google Cloud environment.
# The project will be used for creating a subscription to the Pub/Sub topic.
_, options.view_as(GoogleCloudOptions).project = google.auth.default()

# The Google Cloud PubSub topic for this example.
topic = "projects/pubsub-public-data/topics/shakespeare-kinglear"

p = beam.Pipeline(InteractiveRunner(), options=options)

Чтение и визуализация данных

В следующем примере показан конвейер Apache Beam, который создает подписку на заданную тему Pub/Sub и считывает данные из этой подписки.

words = p | "read" >> beam.io.ReadFromPubSub(topic=topic)

Конвейер обработки данных подсчитывает слова по окнам из источника. Он создает фиксированные окна, каждое из которых длится 10 секунд.

windowed_words = (words
   | "window" >> beam.WindowInto(beam.window.FixedWindows(10)))

После обработки данных с помощью оконного метода, количество слов подсчитывается для каждого окна.

windowed_word_counts = (windowed_words
   | "count" >> beam.combiners.Count.PerElement())

Визуализация данных

Метод show() визуализирует полученную коллекцию PCollection в блокноте.

ib.show(windowed_word_counts, include_window_info=True)

Метод show отображает объект PCollection в табличной форме.

Для отображения визуализаций ваших данных передайте visualize_data=True в метод show() . Добавьте новую ячейку:

ib.show(windowed_word_counts, include_window_info=True, visualize_data=True)

К визуализациям можно применять несколько фильтров. Следующая визуализация позволяет фильтровать данные по меткам и осям:

Метод show визуализирует PCollection как богатый набор фильтруемых элементов пользовательского интерфейса.

5. Использование DataFrame Pandas

Еще один полезный инструмент визуализации в блокнотах Apache Beam — это DataFrame Pandas . В следующем примере слова сначала преобразуются в нижний регистр, а затем вычисляется частота каждого слова.

windowed_lower_word_counts = (windowed_words
   | "to lower case" >> beam.Map(lambda word: word.lower())
   | "count lowered" >> beam.combiners.Count.PerElement())

Метод collect() выводит результат в виде DataFrame Pandas.

ib.collect(windowed_lower_word_counts, include_window_info=True)

Метод `collect` представляет собой объект `PCollection` в DataFrame Pandas.

6. (Необязательно) Запуск заданий Dataflow из вашего блокнота.

  1. Для запуска заданий в Dataflow требуются дополнительные разрешения. Убедитесь, что учетная запись службы Compute Engine по умолчанию имеет роль Editor, или предоставьте ей следующие роли IAM:
  • Администратор Dataflow
  • Рабочий поток данных
  • Администратор хранилища и
  • Пользователь служебной учетной записи (roles/iam.serviceAccountUser)

Более подробную информацию о ролях см. в документации .

  1. (Необязательно) Перед использованием ноутбука для запуска заданий Dataflow перезапустите ядро, повторно запустите все ячейки и проверьте результат.
  2. Удалите следующие операторы импорта:
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
import apache_beam.runners.interactive.interactive_beam as ib
  1. Добавьте следующую инструкцию импорта:
from apache_beam.runners import DataflowRunner
  1. Удалите следующую опцию продолжительности записи:
ib.options.recording_duration = '60s'
  1. Добавьте следующие параметры в настройки конвейера. Вам потребуется изменить местоположение облачного хранилища, указав на уже имеющийся у вас сегмент, или создать для этой цели новый сегмент . Вы также можете изменить значение региона с us-central1 .
# Set the Google Cloud region to run Dataflow.
options.view_as(GoogleCloudOptions).region = 'us-central1'

# Choose a Cloud Storage location.
dataflow_gcs_location = 'gs://<change me>/dataflow'

# Choose a location for your output files
output_gcs_location = '%s/results/' % dataflow_gcs_location


# Set the staging location. This location is used to stage the
# Dataflow pipeline and SDK binary.
options.view_as(GoogleCloudOptions).staging_location = '%s/staging' % dataflow_gcs_location

# Set the temporary location. This location is used to store temporary files
# or intermediate results before outputting to the sink.
options.view_as(GoogleCloudOptions).temp_location = '%s/temp' % dataflow_gcs_location
  1. В конструкторе метода beam.Pipeline() замените InteractiveRunner на DataflowRunner . p — это объект конвейера, полученный при создании вашего конвейера.
p = beam.Pipeline(DataflowRunner(), options=options)
  1. Удалите интерактивные вызовы из вашего кода. Например, удалите функции show() , collect() , head() , show_graph() и watch() из вашего кода.
  2. Чтобы увидеть результаты, вам потребуется добавить приемник данных. В предыдущем разделе мы визуализировали результаты в блокноте, но на этот раз мы запускаем задачу вне этого блокнота — в Dataflow. Поэтому нам нужно внешнее хранилище для результатов. В этом примере мы будем записывать результаты в текстовые файлы в GCS (Google Cloud Storage). Поскольку это потоковый конвейер с оконной обработкой данных, нам нужно будет создать один текстовый файл для каждого окна. Для этого добавьте следующие шаги в свой конвейер:
result = (windowed_lower_word_counts
    | "decode words and format" >> beam.Map(lambda x: f"{x[0].decode('utf-8')}: {x[1]}")
    | "write events to file" >> beam.io.fileio.WriteToFiles(path=output_gcs_location, shards=1, max_writers_per_bundle=0))
  1. Добавьте p.run() в конец кода конвейера.
  2. Теперь проверьте код вашего блокнота, чтобы убедиться, что вы внесли все изменения. Он должен выглядеть примерно так:
import apache_beam as beam
from apache_beam.options import pipeline_options
from apache_beam.options.pipeline_options import GoogleCloudOptions
import google.auth
from apache_beam.runners import DataflowRunner

# Setting up the Apache Beam pipeline options.
options = pipeline_options.PipelineOptions()

# Set the pipeline mode to stream the data from Pub/Sub.
options.view_as(pipeline_options.StandardOptions).streaming = True

# Sets the project to the default project in your current Google Cloud environment.
# The project will be used for creating a subscription to the Pub/Sub topic.
_, options.view_as(GoogleCloudOptions).project = google.auth.default()

# Set the Google Cloud region to run Dataflow.
options.view_as(GoogleCloudOptions).region = 'us-central1'

# Choose a Cloud Storage location.
dataflow_gcs_location = 'gs://<change me>/dataflow'

# Choose a location for your output files
output_gcs_location = '%s/results/' % dataflow_gcs_location


# Set the staging location. This location is used to stage the
# Dataflow pipeline and SDK binary.
options.view_as(GoogleCloudOptions).staging_location = '%s/staging' % dataflow_gcs_location

# Set the temporary location. This location is used to store temporary files
# or intermediate results before outputting to the sink.
options.view_as(GoogleCloudOptions).temp_location = '%s/temp' % dataflow_gcs_location



# The Google Cloud PubSub topic for this example.
topic = "projects/pubsub-public-data/topics/shakespeare-kinglear"

p = beam.Pipeline(DataflowRunner(), options=options)
words = p | "read" >> beam.io.ReadFromPubSub(topic=topic)
windowed_words = (words
   | "window" >> beam.WindowInto(beam.window.FixedWindows(10)))

windowed_words_counts = (windowed_words
   | "count" >> beam.combiners.Count.PerElement())

windowed_lower_word_counts = (windowed_words
   | "to lower case" >> beam.Map(lambda word: word.lower())
   | "count lowered" >> beam.combiners.Count.PerElement())

result = (windowed_lower_word_counts
    | "decode words and format" >> beam.Map(lambda x: f"{x[0].decode('utf-8')}: {x[1]}")
    | "write events to file" >> beam.io.fileio.WriteToFiles(path=output_gcs_location, shards=1, max_writers_per_bundle=0))

p.run()
  1. Запустите ячейки.
  2. Вы должны увидеть результат, похожий на следующий:
<DataflowPipelineResult <Job
 clientRequestId: '20230623100011457336-8998'
 createTime: '2023-06-23T10:00:33.447347Z'
 currentStateTime: '1970-01-01T00:00:00Z'
 id: '2023-06-23_03_00_33-11346237320103246437'
 location: 'us-central1'
 name: 'beamapp-root-0623075553-503897-boh4u4wb'
 projectId: 'your-project-id'
 stageStates: []
 startTime: '2023-06-23T10:00:33.447347Z'
 steps: []
 tempFiles: []
 type: TypeValueValuesEnum(JOB_TYPE_STREAMING, 2)> at 0x7fc7e4084d60>
  1. Чтобы проверить, запущено ли задание, перейдите на страницу «Задания» в Dataflow. Вы должны увидеть новое задание в списке. Запуск задания займет примерно 5-10 минут для начала обработки данных.
  2. После начала обработки данных перейдите в Cloud Storage и выберите каталог, где Dataflow хранит результаты (указанный вами output_gcs_location ). Вы увидите список текстовых файлов, по одному файлу в каждом окне. bfcc5ce9e46a8b14.png
  3. Загрузите файл и изучите его содержимое. Он должен содержать список слов с указанием их количества. В качестве альтернативы, вы можете использовать интерфейс командной строки для проверки файлов. Это можно сделать, запустив следующую команду в новой ячейке вашего блокнота:
! gsutils cat gs://<your-bucket-here>/results/<file-name>
  1. Вы увидите примерно такой результат:

Safer: 1

trust: 1

mercy: 1

harms: 1

far: 2

fear: 1

than: 1

take: 1

me: 1

goneril: 1

still: 1

away: 1

let: 1

too: 2

the: 1

  1. Вот и всё! Не забудьте очистить и остановить созданную вами задачу (см. заключительный шаг этого практического задания).

Пример выполнения этого преобразования в интерактивном блокноте см. в блокноте «Подсчет слов в Dataflow» в вашем экземпляре блокнота.

В качестве альтернативы вы можете экспортировать свой блокнот в виде исполняемого скрипта, изменить сгенерированный файл .py, используя описанные выше шаги, а затем развернуть свой конвейер в службе Dataflow.

7. Сохранение вашего блокнота

Созданные вами блокноты сохраняются локально в работающем экземпляре блокнота. Если вы перезагрузите или выключите экземпляр блокнота во время разработки, эти новые блокноты будут сохраняться до тех пор, пока они созданы в каталоге /home/jupyter . Однако, если экземпляр блокнота будет удален, эти блокноты также будут удалены.

Чтобы сохранить свои блокноты для дальнейшего использования, загрузите их локально на свой рабочий компьютер, сохраните их на GitHub или экспортируйте в другой формат файла.

8. Уборка

После завершения работы с экземпляром блокнота Apache Beam очистите созданные вами ресурсы в Google Cloud, выключив экземпляр блокнота и остановив задачу потоковой передачи , если она была запущена.

В качестве альтернативы, если вы создали проект исключительно для целей этого практического занятия, вы также можете полностью закрыть этот проект .