Запустите конвейер обработки больших данных в Cloud Dataflow.

1. Обзор

Cloud-Dataflow.png

Что такое Dataflow?

Dataflow — это управляемый сервис для выполнения самых разнообразных шаблонов обработки данных. Документация на этом сайте показывает, как развернуть конвейеры пакетной и потоковой обработки данных с помощью Dataflow, включая инструкции по использованию функций сервиса.

Apache Beam SDK — это модель программирования с открытым исходным кодом, позволяющая разрабатывать как пакетные, так и потоковые конвейеры обработки данных. Вы создаете свои конвейеры с помощью программы Apache Beam, а затем запускаете их в сервисе Dataflow. Документация Apache Beam содержит подробную концептуальную информацию и справочные материалы по модели программирования Apache Beam, SDK и другим средствам запуска.

Быстрая потоковая обработка данных для анализа.

Dataflow обеспечивает быструю и упрощенную разработку конвейеров потоковой обработки данных с меньшей задержкой.

Упростите операции и управление.

Позвольте командам сосредоточиться на программировании, а не на управлении серверными кластерами, поскольку бессерверный подход Dataflow снимает операционные издержки с задач обработки данных.

Снижение общей стоимости владения

Автоматическое масштабирование ресурсов в сочетании с оптимизированными по стоимости возможностями пакетной обработки означает, что Dataflow предлагает практически неограниченные возможности для управления сезонными и пиковыми нагрузками без перерасхода средств.

Основные характеристики

Автоматизированное управление ресурсами и динамическая перебалансировка рабочей нагрузки

Dataflow автоматизирует выделение и управление вычислительными ресурсами для минимизации задержек и максимизации их использования, так что вам не нужно запускать экземпляры или резервировать их вручную. Разделение задач также автоматизировано и оптимизировано для динамического перераспределения отстающих задач. Нет необходимости искать «горячие клавиши» или предварительно обрабатывать входные данные.

Горизонтальное автоматическое масштабирование

Горизонтальное автоматическое масштабирование трудовых ресурсов для оптимизации производительности приводит к улучшению общего соотношения цены и производительности.

Гибкое ценообразование на основе планирования ресурсов для пакетной обработки

Для обработки заданий с гибким планированием времени, например, заданий, выполняемых в ночное время, гибкое планирование ресурсов (FlexRS) предлагает более низкую стоимость пакетной обработки. Эти гибкие задания помещаются в очередь с гарантией того, что они будут извлечены для выполнения в течение шестичасового окна.

Данный учебный материал адаптирован из https://cloud.google.com/dataflow/docs/quickstarts/quickstart-java-maven

Что вы узнаете

  • Как создать проект Maven с использованием Apache Beam и Java SDK
  • Запустите пример конвейера обработки данных с помощью консоли Google Cloud Platform.
  • Как удалить связанный с Cloud Storage сегмент и его содержимое

Что вам понадобится

Как вы будете использовать этот учебный материал?

Прочитайте только от начала до конца. Прочитайте текст и выполните упражнения.

Как бы вы оценили свой опыт использования сервисов Google Cloud Platform?

Новичок Средний Профессионал

2. Настройка и требования

Настройка среды для самостоятельного обучения

  1. Войдите в Cloud Console и создайте новый проект или используйте существующий. (Если у вас еще нет учетной записи Gmail или G Suite, вам необходимо ее создать .)

dMbN6g9RawQj_VXCSYpdYncY-DbaRzr2GbnwoV7jFf1u3avxJtmGPmKpMYgiaMH-qu80a_NJ9p2IIXFppYk8x3wyymZXavjglNLJJhuXieCem56H30hwXtd8PvXGpXJO9gEUDu3cZw

ci9Oe6PgnbNuSYlMyvbXF1JdQyiHoEgnhl4PlV_MFagm2ppzhueRkqX4eLjJllZco_2zCp0V0bpTupUSKji9KkQyWqj11pqit1K1faS1V6aFxLGQdkuzGp4rsQTan7F01iePL5DtqQ

8-tA_Lheyo8SscAVKrGii2coplQp2_D1Iosb2ViABY0UUO1A8cimXUu6Wf1R9zJIRExL5OB2j946aIiFtyKTzxDcNnuznmR45vZ2HMoK3o67jxuoUJCAnqvEX6NgPGFjCVNgASc-lg

Запомните идентификатор проекта (Project ID) — уникальное имя для всех проектов Google Cloud (указанное выше имя уже занято и вам не подойдёт, извините!). В дальнейшем в этом практическом занятии оно будет обозначаться как PROJECT_ID .

  1. Далее вам потребуется включить оплату в Cloud Console, чтобы использовать ресурсы Google Cloud.

Выполнение этого практического задания не должно стоить дорого, если вообще что-либо. Обязательно следуйте инструкциям в разделе «Очистка», где указано, как отключить ресурсы, чтобы избежать дополнительных расходов после завершения этого урока. Новые пользователи Google Cloud имеют право на бесплатную пробную версию стоимостью 300 долларов США .

Включите API

Нажмите на значок меню в верхнем левом углу экрана.

2bfc27ef9ba2ec7d.png

Выберите API и сервисы > Панель управления из выпадающего списка.

5b65523a6cc0afa6.png

Выберите и включите API и сервисы.

81ed72192c0edd96.png

Введите в поле поиска "Compute Engine". В появившемся списке результатов нажмите на "Compute Engine API".

3f201e991c7b4527.png

На странице Google Compute Engine нажмите «Включить».

ac121653277fa7bb.png

После включения нажмите стрелку, чтобы вернуться назад.

Теперь найдите следующие API и включите их тоже:

  • Облачный поток данных
  • Stackdriver
  • Облачное хранилище
  • Облачное хранилище JSON
  • BigQuery
  • Облачная публикация/подписка
  • Облачное хранилище данных
  • API Cloud Resource Manager

3. Создайте новый сегмент облачного хранилища.

В консоли Google Cloud Platform нажмите значок меню в левом верхнем углу экрана:

2bfc27ef9ba2ec7d.png

Прокрутите вниз и выберите «Облачное хранилище» > «Браузер» в подразделе «Хранилище» :

2b6c3a2a92b47015.png

Теперь вы должны увидеть браузер облачного хранилища, и, если вы используете проект, в котором в данный момент нет ни одного сегмента облачного хранилища, вы увидите предложение создать новый сегмент. Нажмите кнопку «Создать сегмент» , чтобы создать его:

a711016d5a99dc37.png

Введите имя для вашего хранилища. Как указано в диалоговом окне, имена хранилищ должны быть уникальными во всем Cloud Storage. Поэтому, если вы выберете очевидное имя, например, «test», вы, вероятно, обнаружите, что кто-то уже создал хранилище с таким именем, и получите ошибку.

Существуют также некоторые правила относительно того, какие символы разрешены в названиях хранилищ. Если название вашего хранилища начинается и заканчивается буквой или цифрой, и вы используете только дефисы посередине, то все будет в порядке. Если вы попытаетесь использовать специальные символы или попытаетесь начать или закончить название хранилища чем-либо, кроме буквы или цифры, диалоговое окно напомнит вам о правилах.

3a5458648cfe3358.png

Введите уникальное имя для вашего хранилища и нажмите « Создать» . Если вы выберете уже используемое имя, вы увидите сообщение об ошибке, показанное выше. После успешного создания хранилища вы будете перенаправлены на страницу вашего нового, пустого хранилища в браузере:

3bda986ae88c4e71.png

Название ведра, которое вы увидите, разумеется, будет разным, поскольку оно должно быть уникальным для всех проектов.

4. Запустите Cloud Shell

Активировать Cloud Shell

  1. В консоли Cloud нажмите «Активировать Cloud Shell» . H7JlbhKGHITmsxhQIcLwoe5HXZMhDlYue4K-SPszMxUxDjIeWfOHBfxDHYpmLQTzUmQ7Xx8o6OJUlANnQF0iBuUyfp1RzVad_4nCa0Zz5LtwBlUZFXFCWFrmrWZLqg1MkZz2LdgUDQ .

zlNW0HehB_AFW1qZ4AyebSQUdWm95n7TbnOr7UVm3j9dFcg6oWApJRlC0jnU1Mvb-IQp-trP1Px8xKNwt6o3pP6fyih947sEhOFI4IRF0W7WZk6hFqZDUGXQQXrw21GuMm2ecHrbzQ

Если вы никогда раньше не запускали Cloud Shell, вам будет показан промежуточный экран (внизу), описывающий его назначение. В этом случае нажмите «Продолжить» (и вы больше никогда его не увидите). Вот как выглядит этот одноразовый экран:

kEPbNAo_w5C_pi9QvhFwWwky1cX8hr_xEMGWySNIoMCdi-Djx9AQRqWn-__DmEpC7vKgUtl-feTcv-wBxJ8NwzzAp7mY65-fi2LJo4twUoewT1SUjd6Y3h81RG3rKIkqhoVlFR-G7w

Подготовка и подключение к Cloud Shell займут всего несколько минут.

pTv5mEKzWMWp5VBrg2eGcuRPv9dLInPToS-mohlrqDASyYGWnZ_SwE-MzOWHe76ZdCSmw0kgWogSJv27lrQE8pvA5OD6P1I47nz8vrAdK7yR1NseZKJvcxAZrPb8wRxoqyTpD-gbhA

Эта виртуальная машина оснащена всеми необходимыми инструментами разработки. Она предоставляет постоянный домашний каталог размером 5 ГБ и работает в облаке Google, что значительно повышает производительность сети и аутентификацию. Большая часть, если не вся, работа в этом практическом задании может быть выполнена с помощью обычного браузера или вашего Chromebook.

После подключения к Cloud Shell вы увидите, что ваша аутентификация пройдена и что проект уже настроен на ваш идентификатор проекта.

  1. Выполните следующую команду в Cloud Shell, чтобы подтвердить свою аутентификацию:
gcloud auth list

вывод команды

 Credentialed Accounts
ACTIVE  ACCOUNT
*       <my_account>@<my_domain.com>

To set the active account, run:
    $ gcloud config set account `ACCOUNT`
gcloud config list project

вывод команды

[core]
project = <PROJECT_ID>

Если это не так, вы можете установить это с помощью следующей команды:

gcloud config set project <PROJECT_ID>

вывод команды

Updated property [core/project].

5. Создайте проект Maven.

После запуска Cloud Shell давайте начнём с создания проекта Maven с использованием Java SDK для Apache Beam.

Apache Beam — это модель программирования с открытым исходным кодом для конвейеров обработки данных. Вы определяете эти конвейеры с помощью программы Apache Beam и можете выбрать средство выполнения, например Dataflow, для запуска вашего конвейера.

Выполните команду mvn archetype:generate в вашей оболочке следующим образом:

  mvn archetype:generate \
     -DarchetypeGroupId=org.apache.beam \
     -DarchetypeArtifactId=beam-sdks-java-maven-archetypes-examples \
     -DarchetypeVersion=2.46.0 \
     -DgroupId=org.example \
     -DartifactId=first-dataflow \
     -Dversion="0.1" \
     -Dpackage=org.apache.beam.examples \
     -DinteractiveMode=false

После выполнения команды вы должны увидеть новую директорию с именем first-dataflow в текущей директории. first-dataflow содержит проект Maven, включающий SDK Cloud Dataflow для Java и примеры конвейеров обработки данных.

6. Запустите конвейер обработки текста в Cloud Dataflow.

Начнём с сохранения идентификатора проекта и названий сегментов Cloud Storage в качестве переменных среды. Это можно сделать в Cloud Shell. Обязательно замените <your_project_id> на идентификатор вашего проекта.

 export PROJECT_ID=<your_project_id>

Теперь мы сделаем то же самое для сегмента Cloud Storage. Не забудьте заменить <your_bucket_name> на уникальное имя, которое вы использовали при создании сегмента на предыдущем шаге.

 export BUCKET_NAME=<your_bucket_name>

Перейдите в каталог first-dataflow/ .

 cd first-dataflow

Мы собираемся запустить конвейер обработки текста под названием WordCount, который считывает текст, разбивает текстовые строки на отдельные слова и подсчитывает частоту встречаемости каждого из этих слов. Сначала мы запустим конвейер, а во время его работы посмотрим, что происходит на каждом этапе.

Запустите конвейер, выполнив команду mvn compile exec:java в командной оболочке или терминале. Для аргументов --project, --stagingLocation, и --output приведенная ниже команда использует переменные среды, которые вы настроили ранее на этом шаге.

 mvn compile exec:java \
      -Pdataflow-runner compile exec:java \
      -Dexec.mainClass=org.apache.beam.examples.WordCount \
      -Dexec.args="--project=${PROJECT_ID} \
      --stagingLocation=gs://${BUCKET_NAME}/staging/ \
      --output=gs://${BUCKET_NAME}/output \
      --runner=DataflowRunner \
      --region=us-central1 \
      --gcpTempLocation=gs://${BUCKET_NAME}/temp"

Пока выполняется задание, давайте найдем его в списке заданий.

Откройте веб-интерфейс Cloud Dataflow в консоли Google Cloud Platform . Вы должны увидеть свою задачу подсчета слов со статусом «Выполняется» :

3623be74922e3209.png

Теперь давайте рассмотрим параметры конвейера. Для начала щелкните по названию вашего задания:

816d8f59c72797d7.png

При выборе задания можно просмотреть граф выполнения . Граф выполнения конвейера отображает каждое преобразование в конвейере в виде прямоугольника, содержащего имя преобразования и некоторую информацию о состоянии. Для просмотра более подробной информации можно щелкнуть по значку каретки в правом верхнем углу каждого шага:

80a972dd19a6f1eb.png

Давайте посмотрим, как конвейер преобразует данные на каждом этапе:

  • Чтение : На этом этапе конвейер считывает данные из входного источника. В данном случае это текстовый файл из облачного хранилища, содержащий полный текст пьесы Шекспира «Король Лир» . Наш конвейер считывает файл построчно и выводит каждую строку в виде PCollection , где каждая строка в текстовом файле является элементом коллекции.
  • CountWords : Шаг CountWords состоит из двух частей. Во-первых, он использует параллельную функцию do (ParDo) с именем ExtractWords для токенизации каждой строки на отдельные слова. Результатом работы ExtractWords является новая коллекция PCollection, где каждый элемент — это слово. Следующий шаг, Count , использует преобразование, предоставляемое Java SDK, которое возвращает пары ключ-значение, где ключ — это уникальное слово, а значение — количество его вхождений. Вот метод, реализующий CountWords , а полный файл WordCount.java можно посмотреть на GitHub :
 /**
   * A PTransform that converts a PCollection containing lines of text into a PCollection of
   * formatted word counts.
   *
   * <p>Concept #3: This is a custom composite transform that bundles two transforms (ParDo and
   * Count) as a reusable PTransform subclass. Using composite transforms allows for easy reuse,
   * modular testing, and an improved monitoring experience.
   */
  public static class CountWords
      extends PTransform<PCollection<String>, PCollection<KV<String, Long>>> {
    @Override
    public PCollection<KV<String, Long>> expand(PCollection<String> lines) {

      // Convert lines of text into individual words.
      PCollection<String> words = lines.apply(ParDo.of(new ExtractWordsFn()));

      // Count the number of times each word occurs.
      PCollection<KV<String, Long>> wordCounts = words.apply(Count.perElement());

      return wordCounts;
    }
  }
  • MapElements : Эта функция вызывает функцию FormatAsTextFn , приведенную ниже, которая форматирует каждую пару ключ-значение в строку, пригодную для печати.
  /** A SimpleFunction that converts a Word and Count into a printable string. */
  public static class FormatAsTextFn extends SimpleFunction<KV<String, Long>, String> {
    @Override
    public String apply(KV<String, Long> input) {
      return input.getKey() + ": " + input.getValue();
    }
  }
  • WriteCounts : На этом шаге мы записываем строки, пригодные для печати, в несколько разделенных текстовых файлов.

Через несколько минут мы взглянем на результаты работы конвейера.

Теперь взгляните на страницу информации о задании справа от графика, которая включает параметры конвейера, которые мы указали в команде mvn compile exec:java .

9723815a1f5bf08b.png

208a7f0d6973acf6.png

Также можно просмотреть пользовательские счетчики для конвейера, которые в данном случае показывают, сколько пустых строк было обнаружено на данный момент во время выполнения. Вы можете добавить новые счетчики в свой конвейер для отслеживания метрик, специфичных для приложения.

a2e2800e2c6893f8.png

Чтобы просмотреть конкретные сообщения об ошибках, вы можете щелкнуть значок «Журналы» в нижней части консоли.

23c64138a1027f8.png

По умолчанию на панели отображаются сообщения журнала заданий, сообщающие о состоянии задания в целом. Вы можете использовать селектор «Минимальный уровень серьезности» для фильтрации сообщений о ходе выполнения и состоянии задания.

94ba42015fdafbe2.png

Выбор этапа конвейера на графике изменяет представление на журналы, сгенерированные вашим кодом, и на журналы кода, выполняющегося на этом этапе конвейера.

Чтобы вернуться к журналу заданий, снимите выделение с шага, щелкнув за пределами графика или используя кнопку «Закрыть» на правой боковой панели.

На вкладке «Журналы» вы можете использовать кнопку «Журналы рабочих процессов» , чтобы просмотреть журналы рабочих процессов для экземпляров Compute Engine, на которых выполняется ваш конвейер. Журналы рабочих процессов состоят из строк логов, сгенерированных вашим кодом и кодом, сгенерированным Dataflow для его выполнения.

Если вы пытаетесь отладить сбой в конвейере, зачастую в журналах рабочих процессов (Worker Logs) содержится дополнительная информация, которая помогает решить проблему. Имейте в виду, что эти журналы агрегируются по всем рабочим процессам и могут быть отфильтрованы и найдены.

5a53c244f28d5478.png

Интерфейс мониторинга потоков данных отображает только самые последние сообщения журнала. Вы можете просмотреть все журналы, щелкнув ссылку Google Cloud Observability в правой части панели журналов.

2bc704a4d6529b31.png

Ниже приведено краткое описание различных типов журналов, доступных для просмотра на странице «Мониторинг → Журналы»:

  • Журналы сообщений заданий содержат сообщения уровня задания, генерируемые различными компонентами Dataflow. Примеры включают конфигурацию автомасштабирования, запуск или остановку рабочих процессов, ход выполнения шага задания и ошибки задания. Ошибки уровня рабочих процессов, возникающие из-за сбоев пользовательского кода и присутствующие в журналах рабочих процессов , также распространяются в журналы сообщений заданий .
  • Журналы рабочих процессов создаются рабочими процессами Dataflow. Рабочие процессы выполняют большую часть работы в конвейере (например, применяют ваши ParDos к данным). Журналы рабочих процессов содержат сообщения, регистрируемые вашим кодом и Dataflow.
  • Журналы запуска рабочих процессов присутствуют в большинстве заданий Dataflow и могут содержать сообщения, связанные с процессом запуска. Процесс запуска включает в себя загрузку JAR-файлов задания из облачного хранилища, а затем запуск рабочих процессов. Если возникают проблемы с запуском рабочих процессов, эти журналы — хорошее место для поиска информации.
  • Журналы перемешивания содержат сообщения от рабочих процессов, которые объединяют результаты параллельных операций конвейера.
  • В логах Docker и Kubelet содержатся сообщения, относящиеся к этим общедоступным технологиям, которые используются на рабочих процессах Dataflow.

На следующем этапе мы проверим, успешно ли выполнена ваша задача.

7. Убедитесь, что ваша работа выполнена успешно.

Откройте веб-интерфейс Cloud Dataflow в консоли Google Cloud Platform .

Вначале задание подсчета слов должно отображаться со статусом « Выполняется» , а затем — «Успешно завершено» .

4c408162416d03a2.png

Выполнение задачи займет приблизительно 3-4 минуты.

Помните, когда вы запускали конвейер и указывали выходной сегмент? Давайте посмотрим на результат (ведь вам наверняка интересно, сколько раз встречалось каждое слово в «Короле Лире» ?!). Вернитесь в браузер Cloud Storage в консоли Google Cloud Platform. В вашем сегменте вы должны увидеть выходные файлы и файлы промежуточного хранения, созданные вашим заданием:

25a5d3d4b5d0b567.png

8. Отключите свои ресурсы.

Вы можете отключить свои ресурсы через консоль Google Cloud Platform .

Откройте браузер Cloud Storage в консоли Google Cloud Platform.

2b6c3a2a92b47015.png

Установите флажок рядом с созданным вами сегментом и нажмите кнопку УДАЛИТЬ , чтобы навсегда удалить сегмент и его содержимое.

2f7780bdf10b69ba.png

8051ef293a8e5cfe.png

9. Поздравляем!

Вы научились создавать проект Maven с помощью Cloud Dataflow SDK, запускать пример конвейера с использованием консоли Google Cloud Platform и удалять связанный сегмент Cloud Storage и его содержимое.

Узнать больше

Лицензия

Данная работа распространяется под лицензией Creative Commons Attribution 3.0 Generic License и лицензией Apache 2.0.