1. Обзор

Что такое Dataflow?
Dataflow — это управляемый сервис для выполнения самых разнообразных шаблонов обработки данных. Документация на этом сайте показывает, как развернуть конвейеры пакетной и потоковой обработки данных с помощью Dataflow, включая инструкции по использованию функций сервиса.
Apache Beam SDK — это модель программирования с открытым исходным кодом, позволяющая разрабатывать как пакетные, так и потоковые конвейеры обработки данных. Вы создаете свои конвейеры с помощью программы Apache Beam, а затем запускаете их в сервисе Dataflow. Документация Apache Beam содержит подробную концептуальную информацию и справочные материалы по модели программирования Apache Beam, SDK и другим средствам запуска.
Быстрая потоковая обработка данных для анализа.
Dataflow обеспечивает быструю и упрощенную разработку конвейеров потоковой обработки данных с меньшей задержкой.
Упростите операции и управление.
Позвольте командам сосредоточиться на программировании, а не на управлении серверными кластерами, поскольку бессерверный подход Dataflow снимает операционные издержки с задач обработки данных.
Снижение общей стоимости владения
Автоматическое масштабирование ресурсов в сочетании с оптимизированными по стоимости возможностями пакетной обработки означает, что Dataflow предлагает практически неограниченные возможности для управления сезонными и пиковыми нагрузками без перерасхода средств.
Основные характеристики
Автоматизированное управление ресурсами и динамическая перебалансировка рабочей нагрузки
Dataflow автоматизирует выделение и управление вычислительными ресурсами для минимизации задержек и максимизации их использования, так что вам не нужно запускать экземпляры или резервировать их вручную. Разделение задач также автоматизировано и оптимизировано для динамического перераспределения отстающих задач. Нет необходимости искать «горячие клавиши» или предварительно обрабатывать входные данные.
Горизонтальное автоматическое масштабирование
Горизонтальное автоматическое масштабирование трудовых ресурсов для оптимизации производительности приводит к улучшению общего соотношения цены и производительности.
Гибкое ценообразование на основе планирования ресурсов для пакетной обработки
Для обработки заданий с гибким планированием времени, например, заданий, выполняемых в ночное время, гибкое планирование ресурсов (FlexRS) предлагает более низкую стоимость пакетной обработки. Эти гибкие задания помещаются в очередь с гарантией того, что они будут извлечены для выполнения в течение шестичасового окна.
Данный учебный материал адаптирован из https://cloud.google.com/dataflow/docs/quickstarts/quickstart-java-maven
Что вы узнаете
- Как создать проект Maven с использованием Apache Beam и Java SDK
- Запустите пример конвейера обработки данных с помощью консоли Google Cloud Platform.
- Как удалить связанный с Cloud Storage сегмент и его содержимое
Что вам понадобится
Как вы будете использовать этот учебный материал?
Как бы вы оценили свой опыт использования сервисов Google Cloud Platform?
2. Настройка и требования
Настройка среды для самостоятельного обучения
- Войдите в Cloud Console и создайте новый проект или используйте существующий. (Если у вас еще нет учетной записи Gmail или G Suite, вам необходимо ее создать .)
Запомните идентификатор проекта (Project ID) — уникальное имя для всех проектов Google Cloud (указанное выше имя уже занято и вам не подойдёт, извините!). В дальнейшем в этом практическом занятии оно будет обозначаться как PROJECT_ID .
- Далее вам потребуется включить оплату в Cloud Console, чтобы использовать ресурсы Google Cloud.
Выполнение этого практического задания не должно стоить дорого, если вообще что-либо. Обязательно следуйте инструкциям в разделе «Очистка», где указано, как отключить ресурсы, чтобы избежать дополнительных расходов после завершения этого урока. Новые пользователи Google Cloud имеют право на бесплатную пробную версию стоимостью 300 долларов США .
Включите API
Нажмите на значок меню в верхнем левом углу экрана.

Выберите API и сервисы > Панель управления из выпадающего списка.

Выберите и включите API и сервисы.

Введите в поле поиска "Compute Engine". В появившемся списке результатов нажмите на "Compute Engine API".

На странице Google Compute Engine нажмите «Включить».

После включения нажмите стрелку, чтобы вернуться назад.
Теперь найдите следующие API и включите их тоже:
- Облачный поток данных
- Stackdriver
- Облачное хранилище
- Облачное хранилище JSON
- BigQuery
- Облачная публикация/подписка
- Облачное хранилище данных
- API Cloud Resource Manager
3. Создайте новый сегмент облачного хранилища.
В консоли Google Cloud Platform нажмите значок меню в левом верхнем углу экрана:

Прокрутите вниз и выберите «Облачное хранилище» > «Браузер» в подразделе «Хранилище» :

Теперь вы должны увидеть браузер облачного хранилища, и, если вы используете проект, в котором в данный момент нет ни одного сегмента облачного хранилища, вы увидите предложение создать новый сегмент. Нажмите кнопку «Создать сегмент» , чтобы создать его:

Введите имя для вашего хранилища. Как указано в диалоговом окне, имена хранилищ должны быть уникальными во всем Cloud Storage. Поэтому, если вы выберете очевидное имя, например, «test», вы, вероятно, обнаружите, что кто-то уже создал хранилище с таким именем, и получите ошибку.
Существуют также некоторые правила относительно того, какие символы разрешены в названиях хранилищ. Если название вашего хранилища начинается и заканчивается буквой или цифрой, и вы используете только дефисы посередине, то все будет в порядке. Если вы попытаетесь использовать специальные символы или попытаетесь начать или закончить название хранилища чем-либо, кроме буквы или цифры, диалоговое окно напомнит вам о правилах.

Введите уникальное имя для вашего хранилища и нажмите « Создать» . Если вы выберете уже используемое имя, вы увидите сообщение об ошибке, показанное выше. После успешного создания хранилища вы будете перенаправлены на страницу вашего нового, пустого хранилища в браузере:

Название ведра, которое вы увидите, разумеется, будет разным, поскольку оно должно быть уникальным для всех проектов.
4. Запустите Cloud Shell
Активировать Cloud Shell
- В консоли Cloud нажмите «Активировать Cloud Shell» .
.
Если вы никогда раньше не запускали Cloud Shell, вам будет показан промежуточный экран (внизу), описывающий его назначение. В этом случае нажмите «Продолжить» (и вы больше никогда его не увидите). Вот как выглядит этот одноразовый экран:
Подготовка и подключение к Cloud Shell займут всего несколько минут.
Эта виртуальная машина оснащена всеми необходимыми инструментами разработки. Она предоставляет постоянный домашний каталог размером 5 ГБ и работает в облаке Google, что значительно повышает производительность сети и аутентификацию. Большая часть, если не вся, работа в этом практическом задании может быть выполнена с помощью обычного браузера или вашего Chromebook.
После подключения к Cloud Shell вы увидите, что ваша аутентификация пройдена и что проект уже настроен на ваш идентификатор проекта.
- Выполните следующую команду в Cloud Shell, чтобы подтвердить свою аутентификацию:
gcloud auth list
вывод команды
Credentialed Accounts
ACTIVE ACCOUNT
* <my_account>@<my_domain.com>
To set the active account, run:
$ gcloud config set account `ACCOUNT`
gcloud config list project
вывод команды
[core] project = <PROJECT_ID>
Если это не так, вы можете установить это с помощью следующей команды:
gcloud config set project <PROJECT_ID>
вывод команды
Updated property [core/project].
5. Создайте проект Maven.
После запуска Cloud Shell давайте начнём с создания проекта Maven с использованием Java SDK для Apache Beam.
Apache Beam — это модель программирования с открытым исходным кодом для конвейеров обработки данных. Вы определяете эти конвейеры с помощью программы Apache Beam и можете выбрать средство выполнения, например Dataflow, для запуска вашего конвейера.
Выполните команду mvn archetype:generate в вашей оболочке следующим образом:
mvn archetype:generate \
-DarchetypeGroupId=org.apache.beam \
-DarchetypeArtifactId=beam-sdks-java-maven-archetypes-examples \
-DarchetypeVersion=2.46.0 \
-DgroupId=org.example \
-DartifactId=first-dataflow \
-Dversion="0.1" \
-Dpackage=org.apache.beam.examples \
-DinteractiveMode=false
После выполнения команды вы должны увидеть новую директорию с именем first-dataflow в текущей директории. first-dataflow содержит проект Maven, включающий SDK Cloud Dataflow для Java и примеры конвейеров обработки данных.
6. Запустите конвейер обработки текста в Cloud Dataflow.
Начнём с сохранения идентификатора проекта и названий сегментов Cloud Storage в качестве переменных среды. Это можно сделать в Cloud Shell. Обязательно замените <your_project_id> на идентификатор вашего проекта.
export PROJECT_ID=<your_project_id>
Теперь мы сделаем то же самое для сегмента Cloud Storage. Не забудьте заменить <your_bucket_name> на уникальное имя, которое вы использовали при создании сегмента на предыдущем шаге.
export BUCKET_NAME=<your_bucket_name>
Перейдите в каталог first-dataflow/ .
cd first-dataflow
Мы собираемся запустить конвейер обработки текста под названием WordCount, который считывает текст, разбивает текстовые строки на отдельные слова и подсчитывает частоту встречаемости каждого из этих слов. Сначала мы запустим конвейер, а во время его работы посмотрим, что происходит на каждом этапе.
Запустите конвейер, выполнив команду mvn compile exec:java в командной оболочке или терминале. Для аргументов --project, --stagingLocation, и --output приведенная ниже команда использует переменные среды, которые вы настроили ранее на этом шаге.
mvn compile exec:java \
-Pdataflow-runner compile exec:java \
-Dexec.mainClass=org.apache.beam.examples.WordCount \
-Dexec.args="--project=${PROJECT_ID} \
--stagingLocation=gs://${BUCKET_NAME}/staging/ \
--output=gs://${BUCKET_NAME}/output \
--runner=DataflowRunner \
--region=us-central1 \
--gcpTempLocation=gs://${BUCKET_NAME}/temp"
Пока выполняется задание, давайте найдем его в списке заданий.
Откройте веб-интерфейс Cloud Dataflow в консоли Google Cloud Platform . Вы должны увидеть свою задачу подсчета слов со статусом «Выполняется» :

Теперь давайте рассмотрим параметры конвейера. Для начала щелкните по названию вашего задания:

При выборе задания можно просмотреть граф выполнения . Граф выполнения конвейера отображает каждое преобразование в конвейере в виде прямоугольника, содержащего имя преобразования и некоторую информацию о состоянии. Для просмотра более подробной информации можно щелкнуть по значку каретки в правом верхнем углу каждого шага:

Давайте посмотрим, как конвейер преобразует данные на каждом этапе:
- Чтение : На этом этапе конвейер считывает данные из входного источника. В данном случае это текстовый файл из облачного хранилища, содержащий полный текст пьесы Шекспира «Король Лир» . Наш конвейер считывает файл построчно и выводит каждую строку в виде
PCollection, где каждая строка в текстовом файле является элементом коллекции. - CountWords : Шаг
CountWordsсостоит из двух частей. Во-первых, он использует параллельную функцию do (ParDo) с именемExtractWordsдля токенизации каждой строки на отдельные слова. Результатом работы ExtractWords является новая коллекция PCollection, где каждый элемент — это слово. Следующий шаг,Count, использует преобразование, предоставляемое Java SDK, которое возвращает пары ключ-значение, где ключ — это уникальное слово, а значение — количество его вхождений. Вот метод, реализующийCountWords, а полный файл WordCount.java можно посмотреть на GitHub :
/**
* A PTransform that converts a PCollection containing lines of text into a PCollection of
* formatted word counts.
*
* <p>Concept #3: This is a custom composite transform that bundles two transforms (ParDo and
* Count) as a reusable PTransform subclass. Using composite transforms allows for easy reuse,
* modular testing, and an improved monitoring experience.
*/
public static class CountWords
extends PTransform<PCollection<String>, PCollection<KV<String, Long>>> {
@Override
public PCollection<KV<String, Long>> expand(PCollection<String> lines) {
// Convert lines of text into individual words.
PCollection<String> words = lines.apply(ParDo.of(new ExtractWordsFn()));
// Count the number of times each word occurs.
PCollection<KV<String, Long>> wordCounts = words.apply(Count.perElement());
return wordCounts;
}
}
- MapElements : Эта функция вызывает функцию
FormatAsTextFn, приведенную ниже, которая форматирует каждую пару ключ-значение в строку, пригодную для печати.
/** A SimpleFunction that converts a Word and Count into a printable string. */
public static class FormatAsTextFn extends SimpleFunction<KV<String, Long>, String> {
@Override
public String apply(KV<String, Long> input) {
return input.getKey() + ": " + input.getValue();
}
}
- WriteCounts : На этом шаге мы записываем строки, пригодные для печати, в несколько разделенных текстовых файлов.
Через несколько минут мы взглянем на результаты работы конвейера.
Теперь взгляните на страницу информации о задании справа от графика, которая включает параметры конвейера, которые мы указали в команде mvn compile exec:java .


Также можно просмотреть пользовательские счетчики для конвейера, которые в данном случае показывают, сколько пустых строк было обнаружено на данный момент во время выполнения. Вы можете добавить новые счетчики в свой конвейер для отслеживания метрик, специфичных для приложения.

Чтобы просмотреть конкретные сообщения об ошибках, вы можете щелкнуть значок «Журналы» в нижней части консоли.

По умолчанию на панели отображаются сообщения журнала заданий, сообщающие о состоянии задания в целом. Вы можете использовать селектор «Минимальный уровень серьезности» для фильтрации сообщений о ходе выполнения и состоянии задания.

Выбор этапа конвейера на графике изменяет представление на журналы, сгенерированные вашим кодом, и на журналы кода, выполняющегося на этом этапе конвейера.
Чтобы вернуться к журналу заданий, снимите выделение с шага, щелкнув за пределами графика или используя кнопку «Закрыть» на правой боковой панели.
На вкладке «Журналы» вы можете использовать кнопку «Журналы рабочих процессов» , чтобы просмотреть журналы рабочих процессов для экземпляров Compute Engine, на которых выполняется ваш конвейер. Журналы рабочих процессов состоят из строк логов, сгенерированных вашим кодом и кодом, сгенерированным Dataflow для его выполнения.
Если вы пытаетесь отладить сбой в конвейере, зачастую в журналах рабочих процессов (Worker Logs) содержится дополнительная информация, которая помогает решить проблему. Имейте в виду, что эти журналы агрегируются по всем рабочим процессам и могут быть отфильтрованы и найдены.

Интерфейс мониторинга потоков данных отображает только самые последние сообщения журнала. Вы можете просмотреть все журналы, щелкнув ссылку Google Cloud Observability в правой части панели журналов.

Ниже приведено краткое описание различных типов журналов, доступных для просмотра на странице «Мониторинг → Журналы»:
- Журналы сообщений заданий содержат сообщения уровня задания, генерируемые различными компонентами Dataflow. Примеры включают конфигурацию автомасштабирования, запуск или остановку рабочих процессов, ход выполнения шага задания и ошибки задания. Ошибки уровня рабочих процессов, возникающие из-за сбоев пользовательского кода и присутствующие в журналах рабочих процессов , также распространяются в журналы сообщений заданий .
- Журналы рабочих процессов создаются рабочими процессами Dataflow. Рабочие процессы выполняют большую часть работы в конвейере (например, применяют ваши ParDos к данным). Журналы рабочих процессов содержат сообщения, регистрируемые вашим кодом и Dataflow.
- Журналы запуска рабочих процессов присутствуют в большинстве заданий Dataflow и могут содержать сообщения, связанные с процессом запуска. Процесс запуска включает в себя загрузку JAR-файлов задания из облачного хранилища, а затем запуск рабочих процессов. Если возникают проблемы с запуском рабочих процессов, эти журналы — хорошее место для поиска информации.
- Журналы перемешивания содержат сообщения от рабочих процессов, которые объединяют результаты параллельных операций конвейера.
- В логах Docker и Kubelet содержатся сообщения, относящиеся к этим общедоступным технологиям, которые используются на рабочих процессах Dataflow.
На следующем этапе мы проверим, успешно ли выполнена ваша задача.
7. Убедитесь, что ваша работа выполнена успешно.
Откройте веб-интерфейс Cloud Dataflow в консоли Google Cloud Platform .
Вначале задание подсчета слов должно отображаться со статусом « Выполняется» , а затем — «Успешно завершено» .

Выполнение задачи займет приблизительно 3-4 минуты.
Помните, когда вы запускали конвейер и указывали выходной сегмент? Давайте посмотрим на результат (ведь вам наверняка интересно, сколько раз встречалось каждое слово в «Короле Лире» ?!). Вернитесь в браузер Cloud Storage в консоли Google Cloud Platform. В вашем сегменте вы должны увидеть выходные файлы и файлы промежуточного хранения, созданные вашим заданием:

8. Отключите свои ресурсы.
Вы можете отключить свои ресурсы через консоль Google Cloud Platform .
Откройте браузер Cloud Storage в консоли Google Cloud Platform.

Установите флажок рядом с созданным вами сегментом и нажмите кнопку УДАЛИТЬ , чтобы навсегда удалить сегмент и его содержимое.


9. Поздравляем!
Вы научились создавать проект Maven с помощью Cloud Dataflow SDK, запускать пример конвейера с использованием консоли Google Cloud Platform и удалять связанный сегмент Cloud Storage и его содержимое.
Узнать больше
- Документация по Dataflow: https://cloud.google.com/dataflow/docs/
Лицензия
Данная работа распространяется под лицензией Creative Commons Attribution 3.0 Generic License и лицензией Apache 2.0.