Использование ноутбуков с Google Cloud Dataflow

1. Введение

Облако-Dataflow.png

Облачный поток данных Google

Последнее обновление: 5 июля 2023 г.

Что такое поток данных?

Dataflow — это управляемая служба для выполнения самых разных шаблонов обработки данных. В документации на этом сайте показано, как развернуть конвейеры пакетной и потоковой обработки данных с помощью Dataflow, включая инструкции по использованию функций службы.

Apache Beam SDK — это модель программирования с открытым исходным кодом, которая позволяет разрабатывать как пакетные, так и потоковые конвейеры. Вы создаете свои конвейеры с помощью программы Apache Beam, а затем запускаете их в службе Dataflow. Документация Apache Beam содержит подробную концептуальную информацию и справочные материалы по модели программирования Apache Beam, пакетам SDK и другим средам выполнения.

Потоковая аналитика данных с высокой скоростью

Dataflow обеспечивает быструю и упрощенную разработку конвейера потоковых данных с меньшей задержкой данных.

Упрощение операций и управления

Позвольте командам сосредоточиться на программировании, а не на управлении кластерами серверов, поскольку бессерверный подход Dataflow устраняет операционные накладные расходы при рабочих нагрузках по обработке данных.

Снижение совокупной стоимости владения

Автоматическое масштабирование ресурсов в сочетании с возможностями пакетной обработки, оптимизированными по затратам, означает, что Dataflow предлагает практически безграничные возможности для управления сезонными и скачкообразными рабочими нагрузками без перерасхода средств.

Ключевые особенности

Автоматизированное управление ресурсами и динамическая ребалансировка работы

Dataflow автоматизирует предоставление ресурсов обработки и управление ими, чтобы минимизировать задержку и максимально повысить эффективность использования, поэтому вам не нужно разворачивать экземпляры или резервировать их вручную. Разделение работы также автоматизировано и оптимизировано для динамической балансировки отстающей работы. Нет необходимости искать «горячие клавиши» или предварительно обрабатывать входные данные.

Горизонтальное автомасштабирование

Горизонтальное автоматическое масштабирование рабочих ресурсов для оптимальной пропускной способности приводит к улучшению общего соотношения цены и производительности.

Гибкое планирование ресурсов для пакетной обработки

Для обработки с гибким временем планирования заданий, например заданий в ночное время, гибкое планирование ресурсов (FlexRS) предлагает более низкую цену за пакетную обработку. Эти гибкие задания помещаются в очередь с гарантией, что они будут извлечены для выполнения в течение шести часов.

Что вы будете проводить в рамках этого

Использование интерактивного средства выполнения Apache Beam с блокнотами JupyterLab позволяет итеративно разрабатывать конвейеры, проверять граф конвейера и анализировать отдельные PCollections в рабочем процессе чтения-оценки-печати-цикла (REPL). Эти ноутбуки Apache Beam доступны через Vertex AI Workbench — управляемую службу, в которой размещаются виртуальные машины ноутбуков, на которых предварительно установлены новейшие платформы обработки данных и машинного обучения.

В этой лаборатории кода основное внимание уделяется функциональным возможностям, представленным в блокнотах Apache Beam.

Что вы узнаете

  • Как создать экземпляр блокнота
  • Создание базового конвейера
  • Чтение данных из неограниченного источника
  • Визуализация данных
  • Запуск задания потока данных из записной книжки
  • Сохранение блокнота

Что вам понадобится

  • Проект Google Cloud Platform с включенной оплатой.
  • Google Cloud Dataflow и Google Cloud PubSub включены.

2. Приступаем к настройке

  1. В Cloud Console на странице выбора проекта выберите или создайте облачный проект.

Убедитесь, что у вас включены следующие API:

  • API потока данных
  • Cloud Pub/Sub API
  • Вычислительный двигатель
  • API для ноутбуков

Вы можете убедиться в этом, посетив страницу API и сервисы.

В этом руководстве мы будем считывать данные из подписки Pub/Sub, поэтому убедитесь, что учетная запись службы Compute Engine по умолчанию имеет роль редактора, или предоставьте ей роль редактора Pub/Sub.

3. Начало работы с ноутбуками Apache Beam

Запуск экземпляра блокнотов Apache Beam

  1. Запустите Dataflow на консоли:

  1. Выберите страницу Workbench с помощью меню слева.
  2. Убедитесь, что вы находитесь на вкладке «Блокноты, управляемые пользователями» .
  3. На панели инструментов нажмите «Новый блокнот» .
  4. Выберите Apache Beam > Без графических процессоров .
  5. На странице «Новый блокнот» выберите подсеть для виртуальной машины блокнота и нажмите « Создать» .
  6. Нажмите «Открыть JupyterLab» , когда ссылка станет активной. Vertex AI Workbench создает новый экземпляр блокнота Apache Beam.

4. Создание конвейера

Создание экземпляра блокнота

Перейдите в «Файл» > «Создать» > «Блокнот» и выберите ядро ​​Apache Beam 2.47 или новее.

Начните добавлять код в свой блокнот

  • Скопируйте и вставьте код из каждого раздела в новую ячейку записной книжки.
  • Запустить ячейку

6bd3dd86cc7cf802.png

Использование интерактивного средства выполнения Apache Beam с блокнотами JupyterLab позволяет итеративно разрабатывать конвейеры, проверять граф конвейера и анализировать отдельные PCollections в рабочем процессе чтения-оценки-печати-цикла (REPL).

Apache Beam установлен на вашем экземпляре записной книжки, поэтому включите в свою записную книжку модули interactive_runner и interactive_beam .

import apache_beam as beam
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
import apache_beam.runners.interactive.interactive_beam as ib

Если ваш блокнот использует другие службы Google, добавьте следующие операторы импорта:

from apache_beam.options import pipeline_options
from apache_beam.options.pipeline_options import GoogleCloudOptions
import google.auth

Настройка параметров интерактивности

Следующее устанавливает продолжительность сбора данных на 60 секунд. Если вы хотите выполнять итерацию быстрее, установите меньшую продолжительность, например «10 секунд».

ib.options.recording_duration = '60s'

Дополнительные интерактивные параметры см. в классе active_beam.options .

Инициализируйте конвейер с помощью объекта InteractiveRunner .

# Setting up the Apache Beam pipeline options.
options = pipeline_options.PipelineOptions()

# Set the pipeline mode to stream the data from Pub/Sub.
options.view_as(pipeline_options.StandardOptions).streaming = True

# Sets the project to the default project in your current Google Cloud environment.
# The project will be used for creating a subscription to the Pub/Sub topic.
_, options.view_as(GoogleCloudOptions).project = google.auth.default()

# The Google Cloud PubSub topic for this example.
topic = "projects/pubsub-public-data/topics/shakespeare-kinglear"

p = beam.Pipeline(InteractiveRunner(), options=options)

Чтение и визуализация данных

В следующем примере показан конвейер Apache Beam, который создает подписку на данную тему Pub/Sub и считывает данные из подписки.

words = p | "read" >> beam.io.ReadFromPubSub(topic=topic)

Конвейер считает слова по окнам из источника. Он создает фиксированное окно, длительность каждого окна составляет 10 секунд.

windowed_words = (words
   | "window" >> beam.WindowInto(beam.window.FixedWindows(10)))

После того как данные разбиты на окна, слова подсчитываются по окну.

windowed_word_counts = (windowed_words
   | "count" >> beam.combiners.Count.PerElement())

Визуализация данных

Метод show() визуализирует результирующую PCollection в блокноте.

ib.show(windowed_word_counts, include_window_info=True)

Метод show, визуализирующий PCollection в табличной форме.

Чтобы отобразить визуализацию ваших данных, передайте visualize_data=True в метод show() . Добавьте новую ячейку:

ib.show(windowed_word_counts, include_window_info=True, visualize_data=True)

К визуализациям можно применить несколько фильтров. Следующая визуализация позволяет фильтровать по метке и оси:

Метод show, визуализирующий PCollection как богатый набор фильтруемых элементов пользовательского интерфейса.

5. Использование кадра данных Pandas

Еще одна полезная визуализация в блокнотах Apache Beam — Pandas DataFrame . В следующем примере слова сначала преобразуются в нижний регистр, а затем вычисляет частоту каждого слова.

windowed_lower_word_counts = (windowed_words
   | "to lower case" >> beam.Map(lambda word: word.lower())
   | "count lowered" >> beam.combiners.Count.PerElement())

Метод collect() предоставляет выходные данные в DataFrame Pandas.

ib.collect(windowed_lower_word_counts, include_window_info=True)

Метод сбора, представляющий PCollection в кадре данных Pandas.

6. (Необязательно) Запуск заданий Dataflow из записной книжки.

  1. Для запуска заданий в Dataflow вам потребуются дополнительные разрешения. Убедитесь, что учетной записи службы Compute Engine по умолчанию назначена роль редактора, или предоставьте ей следующие роли IAM:
  • Администратор потока данных
  • Рабочий потока данных
  • Администратор хранилища и
  • Пользователь учетной записи службы (roles/iam.serviceAccountUser)

Подробнее о ролях смотрите в документации .

  1. (Необязательно) Прежде чем использовать записную книжку для запуска заданий потока данных, перезапустите ядро, повторно запустите все ячейки и проверьте выходные данные.
  2. Удалите следующие операторы импорта:
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
import apache_beam.runners.interactive.interactive_beam as ib
  1. Добавьте следующий оператор импорта:
from apache_beam.runners import DataflowRunner
  1. Удалите следующую опцию продолжительности записи:
ib.options.recording_duration = '60s'
  1. Добавьте следующее в параметры конвейера. Вам нужно будет настроить местоположение облачного хранилища так, чтобы оно указывало на уже имеющуюся у вас корзину, или вы можете создать для этой цели новую корзину . Вы также можете изменить значение региона с us-central1 .
# Set the Google Cloud region to run Dataflow.
options.view_as(GoogleCloudOptions).region = 'us-central1'

# Choose a Cloud Storage location.
dataflow_gcs_location = 'gs://<change me>/dataflow'

# Choose a location for your output files
output_gcs_location = '%s/results/' % dataflow_gcs_location


# Set the staging location. This location is used to stage the
# Dataflow pipeline and SDK binary.
options.view_as(GoogleCloudOptions).staging_location = '%s/staging' % dataflow_gcs_location

# Set the temporary location. This location is used to store temporary files
# or intermediate results before outputting to the sink.
options.view_as(GoogleCloudOptions).temp_location = '%s/temp' % dataflow_gcs_location
  1. В конструкторе beam.Pipeline() замените InteractiveRunner на DataflowRunner . p — объект конвейера, полученный при создании вашего конвейера.
p = beam.Pipeline(DataflowRunner(), options=options)
  1. Удалите интерактивные вызовы из вашего кода. Например, удалите show() , collect() , head() , show_graph() и watch() из вашего кода.
  2. Чтобы увидеть результаты, вам необходимо добавить раковину. В предыдущем разделе мы визуализировали результаты в блокноте, но на этот раз мы выполняем задание за пределами этого блокнота — в потоке данных. Поэтому нам нужно внешнее местоположение для наших результатов. В этом примере мы запишем результаты в текстовые файлы в GCS (Google Cloud Storage). Поскольку это потоковый конвейер с оконным управлением данными, нам нужно создать по одному текстовому файлу для каждого окна. Для этого добавьте в свой конвейер следующие шаги:
result = (windowed_lower_word_counts
    | "decode words and format" >> beam.Map(lambda x: f"{x[0].decode('utf-8')}: {x[1]}")
    | "write events to file" >> beam.io.fileio.WriteToFiles(path=output_gcs_location, shards=1, max_writers_per_bundle=0))
  1. Добавьте p.run() в конец кода вашего конвейера.
  2. Теперь проверьте код своего блокнота, чтобы убедиться, что вы включили все изменения. Это должно выглядеть примерно так:
import apache_beam as beam
from apache_beam.options import pipeline_options
from apache_beam.options.pipeline_options import GoogleCloudOptions
import google.auth
from apache_beam.runners import DataflowRunner

# Setting up the Apache Beam pipeline options.
options = pipeline_options.PipelineOptions()

# Set the pipeline mode to stream the data from Pub/Sub.
options.view_as(pipeline_options.StandardOptions).streaming = True

# Sets the project to the default project in your current Google Cloud environment.
# The project will be used for creating a subscription to the Pub/Sub topic.
_, options.view_as(GoogleCloudOptions).project = google.auth.default()

# Set the Google Cloud region to run Dataflow.
options.view_as(GoogleCloudOptions).region = 'us-central1'

# Choose a Cloud Storage location.
dataflow_gcs_location = 'gs://<change me>/dataflow'

# Choose a location for your output files
output_gcs_location = '%s/results/' % dataflow_gcs_location


# Set the staging location. This location is used to stage the
# Dataflow pipeline and SDK binary.
options.view_as(GoogleCloudOptions).staging_location = '%s/staging' % dataflow_gcs_location

# Set the temporary location. This location is used to store temporary files
# or intermediate results before outputting to the sink.
options.view_as(GoogleCloudOptions).temp_location = '%s/temp' % dataflow_gcs_location



# The Google Cloud PubSub topic for this example.
topic = "projects/pubsub-public-data/topics/shakespeare-kinglear"

p = beam.Pipeline(DataflowRunner(), options=options)
words = p | "read" >> beam.io.ReadFromPubSub(topic=topic)
windowed_words = (words
   | "window" >> beam.WindowInto(beam.window.FixedWindows(10)))

windowed_words_counts = (windowed_words
   | "count" >> beam.combiners.Count.PerElement())

windowed_lower_word_counts = (windowed_words
   | "to lower case" >> beam.Map(lambda word: word.lower())
   | "count lowered" >> beam.combiners.Count.PerElement())

result = (windowed_lower_word_counts
    | "decode words and format" >> beam.Map(lambda x: f"{x[0].decode('utf-8')}: {x[1]}")
    | "write events to file" >> beam.io.fileio.WriteToFiles(path=output_gcs_location, shards=1, max_writers_per_bundle=0))

p.run()
  1. Запустите клетки.
  2. Вы должны увидеть вывод, аналогичный следующему:
<DataflowPipelineResult <Job
 clientRequestId: '20230623100011457336-8998'
 createTime: '2023-06-23T10:00:33.447347Z'
 currentStateTime: '1970-01-01T00:00:00Z'
 id: '2023-06-23_03_00_33-11346237320103246437'
 location: 'us-central1'
 name: 'beamapp-root-0623075553-503897-boh4u4wb'
 projectId: 'your-project-id'
 stageStates: []
 startTime: '2023-06-23T10:00:33.447347Z'
 steps: []
 tempFiles: []
 type: TypeValueValuesEnum(JOB_TYPE_STREAMING, 2)> at 0x7fc7e4084d60>
  1. Чтобы проверить, выполняется ли задание, перейдите на страницу «Задания» для потока данных. Вы должны увидеть новую работу в списке. Для начала обработки данных потребуется около 5–10 минут.
  2. После обработки данных перейдите в Cloud Storage и перейдите в каталог, в котором Dataflow хранит результаты (определенный вами output_gcs_location ). Вы должны увидеть список текстовых файлов, по одному файлу в каждом окне. bfcc5ce9e46a8b14.png
  3. Загрузите файл и проверьте содержимое. Он должен содержать список слов с указанием их количества. Альтернативно, используйте интерфейс командной строки для проверки файлов. Вы можете сделать это, выполнив следующую команду в новой ячейке записной книжки:
! gsutils cat gs://<your-bucket-here>/results/<file-name>
  1. Вы увидите аналогичный вывод:

Safer: 1

trust: 1

mercy: 1

harms: 1

far: 2

fear: 1

than: 1

take: 1

me: 1

goneril: 1

still: 1

away: 1

let: 1

too: 2

the: 1

  1. Вот и все! Не забудьте очистить и остановить созданное вами задание (см. последний шаг этой лаборатории кода).

Пример выполнения этого преобразования в интерактивной записной книжке см. в записной книжке «Подсчет слов потока данных» в экземпляре записной книжки.

Альтернативно вы можете экспортировать записную книжку в виде исполняемого сценария, изменить созданный файл .py, выполнив предыдущие шаги, а затем развернуть свой конвейер в службе Dataflow.

7. Сохранение записной книжки

Создаваемые вами записные книжки сохраняются локально в работающем экземпляре записной книжки. Если вы сбросите или закроете экземпляр блокнота во время разработки, эти новые блокноты будут сохраняться до тех пор, пока они создаются в каталоге /home/jupyter . Однако если экземпляр записной книжки удаляется, эти записные книжки также удаляются.

Чтобы сохранить свои записные книжки для будущего использования, загрузите их локально на свою рабочую станцию, сохраните на GitHub или экспортируйте в другой формат файла.

8. Уборка

Завершив использование экземпляра блокнота Apache Beam, очистите ресурсы, созданные вами в Google Cloud, завершив экземпляр блокнота и остановив задание потоковой передачи , если вы его запускали.

Альтернативно, если вы создали проект исключительно для этой лаборатории кода, вы также можете полностью закрыть проект .