Uruchamianie zadania liczby słów Hadoop w klastrze Dataproc

1. Wprowadzenie

Przepływy pracy są powszechnie stosowane w analityce danych – obejmują pozyskiwanie, przekształcanie i analizowanie danych w celu znalezienia w nich istotnych informacji. W Google Cloud Platform narzędziem do zarządzania przepływami pracy jest Cloud Composer, czyli hostowana wersja popularnego narzędzia open source do zarządzania przepływami pracy Apache Airflow. W tym module dowiesz się, jak przy pomocy Cloud Composer utworzyć prosty przepływ pracy, który utworzy klaster Cloud Dataproc, przeanalizuje go za pomocą Cloud Dataproc i Apache Hadoop, a następnie usunie klaster Cloud Dataproc.

Co to jest Cloud Composer?

Cloud Composer to usługa w pełni zarządzana do orkiestracji przepływów pracy, która umożliwia tworzenie, planowanie i monitorowanie potoków obejmujących chmury i lokalne centra danych. Usługa Cloud Composer jest oparta na popularnym projekcie open source Apache Airflow i działa w języku programowania Python. Jest łatwa w użyciu i nie wiąże się z żadnymi ograniczeniami.

Korzystając z Cloud Composer zamiast lokalnej instancji Apache Airflow, użytkownicy mogą w pełni wykorzystać możliwości Airflow bez konieczności instalowania i zarządzania.

Co to jest Apache Airflow?

Apache Airflow to narzędzie open source służące do programowego tworzenia, planowania i monitorowania przepływów pracy. W tym laboratorium pojawią się te kluczowe terminy związane z Airflow:

  • DAG – DAG (skierowany graf acykliczny) to zbiór uporządkowanych zadań, które chcesz zaplanować i uruchomić. DAG-i, zwane też przepływami pracy, są zdefiniowane w standardowych plikach Pythona.
  • Operator – operator opisuje pojedyncze zadanie w przepływie pracy.

Co to jest Cloud Dataproc?

Cloud Dataproc to w pełni zarządzana usługa Apache SparkApache Hadoop na platformie Google Cloud. Cloud Dataproc łatwo integruje się z innymi usługami GCP, zapewniając wydajną i kompletną platformę do przetwarzania danych, analityki i uczenia maszynowego.

Co zrobisz

Z tego ćwiczenia dowiesz się, jak utworzyć i uruchomić w Cloud Composer przepływ pracy Apache Airflow, który wykonuje te zadania:

  • Tworzy klaster Cloud Dataproc.
  • Uruchamia w klastrze zadanie Apache Hadoop wordcount i zapisuje jego wyniki w Cloud Storage.
  • Usuwa klaster.

Czego się nauczysz

  • Jak utworzyć i uruchomić przepływ pracy Apache Airflow w Cloud Composer
  • Jak używać Cloud Composer i Cloud Dataproc do uruchamiania analizy zbioru danych
  • Jak uzyskać dostęp do środowiska Cloud Composer za pomocą konsoli Google Cloud Platform, pakietu SDK Cloud i interfejsu sieciowego Airflow

Czego potrzebujesz

  • Konto GCP
  • Podstawowa znajomość interfejsu wiersza poleceń
  • podstawowa znajomość Pythona,

2. Konfigurowanie GCP

Tworzenie projektu

Wybierz lub utwórz projekt Google Cloud Platform.

Zanotuj identyfikator projektu, który będzie Ci potrzebny w dalszych krokach.

Jeśli tworzysz nowy projekt, identyfikator projektu znajdziesz na stronie tworzenia bezpośrednio pod nazwą projektu.

Jeśli masz już utworzony projekt, jego identyfikator znajdziesz na stronie głównej konsoli na karcie Informacje o projekcie.

Włączanie interfejsów API

Włącz interfejsy Cloud Composer API, Cloud Dataproc API i Cloud Storage API.Po włączeniu tych interfejsów możesz zignorować przycisk „Przejdź do danych logowania” i przejść do następnego kroku samouczka.

Tworzenie środowiska Composer

Utwórz środowisko Cloud Composer o tej konfiguracji:

  • Nazwa: my-composer-environment
  • Lokalizacja: us-central1
  • Strefa: us-central1-a

Wszystkie pozostałe konfiguracje mogą pozostać domyślne. U dołu kliknij „Utwórz”.

Utworzenie zasobnika Cloud Storage

W projekcie utwórz zasobnik Cloud Storage o tej konfiguracji:

  • Nazwa: <your-project-id>
  • Domyślna klasa pamięci masowej: wiele regionów
  • Lokalizacja: Stany Zjednoczone
  • Model kontroli dostępu: szczegółowy

Gdy wszystko będzie gotowe, kliknij „Utwórz”.

3. Konfigurowanie Apache Airflow

Wyświetlanie informacji o środowisku Composer

W konsoli GCP otwórz stronę Środowiska.

Kliknij nazwę środowiska, aby wyświetlić jego szczegóły.

Na stronie Szczegóły środowiska znajdziesz informacje takie jak adres URL interfejsu internetowego Airflow, identyfikator klastra Google Kubernetes Engine, nazwa zasobnika Cloud Storage i ścieżka do folderu /dags.

W Airflow DAG (skierowany graf acykliczny) to zbiór uporządkowanych zadań, które chcesz zaplanować i uruchomić. DAG-i, zwane też przepływami pracy, są zdefiniowane w standardowych plikach Pythona. Cloud Composer planuje tylko DAG-i w folderze /dags. Folder /dags znajduje się w zasobniku Cloud Storage, który Cloud Composer tworzy automatycznie podczas tworzenia środowiska.

Ustawianie zmiennych środowiskowych Apache Airflow

Zmienne Apache Airflow to pojęcie specyficzne dla Airflow, które różni się od zmiennych środowiskowych. W tym kroku ustawisz te 3 zmienne Airflow: gcp_project, gcs_bucketgce_zone.

Używanie gcloud do ustawiania zmiennych

Najpierw otwórz Cloud Shell, w którym pakiet SDK Cloud jest już zainstalowany.

Ustaw zmienną środowiskową COMPOSER_INSTANCE na nazwę środowiska Composer.

COMPOSER_INSTANCE=my-composer-environment

Aby ustawić zmienne Airflow za pomocą narzędzia wiersza poleceń gcloud, użyj polecenia gcloud composer environments run z poleceniem podrzędnym variables. To polecenie gcloud composer wykonuje podpolecenie interfejsu wiersza poleceń Airflow variables. Podpolecenie przekazuje argumenty do narzędzia wiersza poleceń gcloud.

To polecenie uruchomisz 3 razy, zastępując zmienne odpowiednimi wartościami dla Twojego projektu.

Ustaw gcp_project za pomocą tego polecenia, zastępując <your-project-id> identyfikatorem projektu zanotowanym w kroku 2.

gcloud composer environments run ${COMPOSER_INSTANCE} \
    --location us-central1 variables -- --set gcp_project <your-project-id>

Dane wyjściowe będą wyglądać mniej więcej tak:

kubeconfig entry generated for us-central1-my-composer-env-123abc-gke.
Executing within the following Kubernetes cluster namespace: composer-1-10-0-airflow-1-10-2-123abc
[2020-04-17 20:42:49,713] {settings.py:176} INFO - settings.configure_orm(): Using pool settings. pool_size=5, pool_recycle=1800, pid=449
[2020-04-17 20:42:50,123] {default_celery.py:90} WARNING - You have configured a result_backend of redis://airflow-redis-service.default.svc.cluste
r.local:6379/0, it is highly recommended to use an alternative result_backend (i.e. a database).
[2020-04-17 20:42:50,127] {__init__.py:51} INFO - Using executor CeleryExecutor
[2020-04-17 20:42:50,433] {app.py:52} WARNING - Using default Composer Environment Variables. Overrides have not been applied.
[2020-04-17 20:42:50,440] {configuration.py:522} INFO - Reading the config from /etc/airflow/airflow.cfg
[2020-04-17 20:42:50,452] {configuration.py:522} INFO - Reading the config from /etc/airflow/airflow.cfg

Ustaw gcs_bucket za pomocą tego polecenia, zastępując <your-bucket-name> identyfikatorem zasobnika zanotowanym w kroku 2. Jeśli postępujesz zgodnie z naszymi zaleceniami, nazwa zasobnika jest taka sama jak identyfikator projektu. Dane wyjściowe będą podobne do tych z poprzedniego polecenia.

gcloud composer environments run ${COMPOSER_INSTANCE} \
    --location us-central1 variables -- --set gcs_bucket gs://<your-bucket-name>

Ustaw gce_zone za pomocą tego polecenia. Dane wyjściowe będą podobne do tych z poprzednich poleceń.

gcloud composer environments run ${COMPOSER_INSTANCE} \
    --location us-central1 variables -- --set gce_zone us-central1-a

(Opcjonalnie) Używanie gcloud do wyświetlania zmiennej

Aby wyświetlić wartość zmiennej, uruchom polecenie podrzędne interfejsu wiersza poleceń Airflow variables z argumentem get lub użyj interfejsu Airflow.

Na przykład:

gcloud composer environments run ${COMPOSER_INSTANCE} \
    --location us-central1 variables -- --get gcs_bucket

Możesz to zrobić za pomocą dowolnej z 3 ustawionych zmiennych: gcp_project, gcs_bucketgce_zone.

4. Przykładowy przepływ pracy

Przyjrzyjmy się kodowi DAG, którego użyjemy w kroku 5. Nie musisz jeszcze pobierać plików, po prostu postępuj zgodnie z instrukcjami.

Jest tu wiele kwestii do omówienia, więc przyjrzyjmy się im bliżej.

from airflow import models
from airflow.contrib.operators import dataproc_operator
from airflow.utils import trigger_rule

Zaczynamy od importów Airflow:

  • airflow.models – umożliwia nam dostęp do danych w bazie danych Airflow i ich tworzenie.
  • airflow.contrib.operators – miejsce, w którym mieszkają operatorzy ze społeczności. W tym przypadku potrzebujemy dataproc_operator, aby uzyskać dostęp do interfejsu Cloud Dataproc API.
  • airflow.utils.trigger_rule – do dodawania reguł aktywacji do naszych operatorów. Reguły wyzwalania umożliwiają precyzyjne określenie, czy operator powinien być wykonywany w zależności od stanu elementów nadrzędnych.
output_file = os.path.join(
    models.Variable.get('gcs_bucket'), 'wordcount',
    datetime.datetime.now().strftime('%Y%m%d-%H%M%S')) + os.sep

Określa lokalizację pliku wyjściowego. Ważny wiersz to models.Variable.get('gcs_bucket'), który pobierze wartość zmiennej gcs_bucket z bazy danych Airflow.

WORDCOUNT_JAR = (
    'file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar'
)

input_file = 'gs://pub/shakespeare/rose.txt'

wordcount_args = ['wordcount', input_file, output_file]
  • WORDCOUNT_JAR – lokalizacja pliku .jar, który ostatecznie uruchomimy w klastrze Cloud Dataproc. Jest on już hostowany w GCP.
  • input_file – lokalizacja pliku zawierającego dane, na których będzie ostatecznie wykonywane zadanie Hadoop. W kroku 5 prześlemy dane do tej lokalizacji.
  • wordcount_args – argumenty, które przekażemy do pliku JAR.
yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1),
    datetime.datetime.min.time())

W ten sposób uzyskamy obiekt datetime reprezentujący północ poprzedniego dnia. Jeśli na przykład to polecenie zostanie wykonane 4 marca o godz. 11:00, obiekt daty i godziny będzie reprezentować 3 marca o godz. 00:00. Jest to związane ze sposobem, w jaki Airflow obsługuje harmonogramowanie. Więcej informacji na ten temat znajdziesz tutaj.

default_dag_args = {
    'start_date': yesterday,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'project_id': models.Variable.get('gcp_project')
}

Zmienna default_dag_args w formie słownika powinna być podawana za każdym razem, gdy tworzony jest nowy DAG:

  • 'email_on_failure' – wskazuje, czy w przypadku niepowodzenia zadania mają być wysyłane alerty e-mail.
  • 'email_on_retry' – wskazuje, czy alerty e-mail powinny być wysyłane, gdy zadanie jest ponawiane.
  • 'retries' – oznacza liczbę ponownych prób, które Airflow powinien podjąć w przypadku niepowodzenia DAG-u.
  • 'retry_delay' – określa, jak długo Airflow ma czekać przed ponowną próbą.
  • 'project_id' – informuje DAG, z którym identyfikatorem projektu GCP ma być powiązany, co będzie później potrzebne w przypadku operatora Dataproc.
with models.DAG(
        'composer_hadoop_tutorial',
        schedule_interval=datetime.timedelta(days=1),
        default_args=default_dag_args) as dag:

Użycie with models.DAG informuje skrypt, że wszystko, co znajduje się poniżej, ma być uwzględnione w tym samym DAG-u. Widzimy też 3 argumenty:

  • Pierwszy argument to ciąg tekstowy, czyli nazwa tworzonego DAG-u. W tym przypadku używamy composer_hadoop_tutorial.
  • schedule_interval – obiekt datetime.timedelta, który w tym przypadku ustawiliśmy na 1 dzień. Oznacza to, że ten DAG będzie próbował wykonać się raz dziennie po godzinie 'start_date', która została wcześniej ustawiona w 'default_dag_args'.
  • default_args – utworzony wcześniej słownik zawierający argumenty domyślne DAG

Tworzenie klastra Dataproc

Następnie utworzymy dataproc_operator.DataprocClusterCreateOperator, który utworzy klaster Cloud Dataproc.

    create_dataproc_cluster = dataproc_operator.DataprocClusterCreateOperator(
        task_id='create_dataproc_cluster',
        cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
        num_workers=2,
        zone=models.Variable.get('gce_zone'),
        master_machine_type='n1-standard-1',
        worker_machine_type='n1-standard-1')

W tym operatorze widzimy kilka argumentów, z których wszystkie oprócz pierwszego są specyficzne dla tego operatora:

  • task_id – podobnie jak w przypadku BashOperator, jest to nazwa przypisana do operatora, którą można wyświetlić w interfejsie Airflow.
  • cluster_name – nazwa, którą przypisujemy do klastra Cloud Dataproc. W tym przypadku nazwaliśmy go composer-hadoop-tutorial-cluster-{{ ds_nodash }} (w polu informacyjnym znajdziesz opcjonalne dodatkowe informacje).
  • num_workers – liczba instancji roboczych, które przydzielamy do klastra Cloud Dataproc.
  • zone – region geograficzny, w którym ma się znajdować klaster, zapisany w bazie danych Airflow. Spowoduje to odczytanie zmiennej 'gce_zone' ustawionej w kroku 3.
  • master_machine_type – typ maszyny, którą chcemy przydzielić do węzła głównego Cloud Dataproc.
  • worker_machine_type – typ maszyny, którą chcemy przydzielić do węzła roboczego Cloud Dataproc.

Przesyłanie zadania Apache Hadoop

Symbol dataproc_operator.DataProcHadoopOperator umożliwia przesyłanie zadań do klastra Cloud Dataproc.

    run_dataproc_hadoop = dataproc_operator.DataProcHadoopOperator(
        task_id='run_dataproc_hadoop',
        main_jar=WORDCOUNT_JAR,
        cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
        arguments=wordcount_args)

Udostępniamy kilka parametrów:

  • task_id – nazwa przypisana do tego elementu DAG-a.
  • main_jar – lokalizacja pliku JAR, który chcemy uruchomić w klastrze.
  • cluster_name – nazwa klastra, w którym ma zostać uruchomione zadanie. Zauważ, że jest ona identyczna z nazwą w poprzednim operatorze.
  • arguments – argumenty przekazywane do pliku JAR, tak jak w przypadku wykonywania pliku .jar z wiersza poleceń.

Usuwanie klastra

Ostatnim operatorem, który utworzymy, jest dataproc_operator.DataprocClusterDeleteOperator.

    delete_dataproc_cluster = dataproc_operator.DataprocClusterDeleteOperator(
        task_id='delete_dataproc_cluster',
        cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
        trigger_rule=trigger_rule.TriggerRule.ALL_DONE)

Jak sugeruje nazwa, ten operator usunie dany klaster Cloud Dataproc. Widzimy tu 3 argumenty:

  • task_id – podobnie jak w przypadku BashOperator, jest to nazwa przypisana do operatora, która jest widoczna w interfejsie Airflow.
  • cluster_name – nazwa, którą przypisujemy do klastra Cloud Dataproc. W tym przykładzie nadaliśmy mu nazwę composer-hadoop-tutorial-cluster-{{ ds_nodash }} (dodatkowe informacje znajdziesz w polu informacyjnym po sekcji „Tworzenie klastra Dataproc”).
  • trigger_rule – Reguły wyzwalania zostały krótko wspomniane na początku tego kroku podczas importowania, ale tutaj widzimy je w działaniu. Domyślnie operator Airflow nie jest wykonywany, dopóki wszystkie operatory poprzedzające nie zostaną ukończone. Reguła wyzwalająca ALL_DONE wymaga tylko, aby wszystkie operatory nadrzędne zostały ukończone, niezależnie od tego, czy zakończyły się powodzeniem. W tym przypadku oznacza to, że nawet jeśli zadanie Hadoop się nie powiedzie, nadal chcemy zamknąć klaster.
  create_dataproc_cluster >> run_dataproc_hadoop >> delete_dataproc_cluster

Chcemy też, aby te operatory były wykonywane w określonej kolejności, co możemy oznaczyć za pomocą operatorów przesunięcia bitowego w Pythonie. W tym przypadku najpierw zawsze zostanie wykonana funkcja create_dataproc_cluster, potem run_dataproc_hadoop, a na końcu delete_dataproc_cluster.

Po połączeniu wszystkich elementów kod wygląda tak:

# Copyright 2018 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# [START composer_hadoop_tutorial]
"""Example Airflow DAG that creates a Cloud Dataproc cluster, runs the Hadoop
wordcount example, and deletes the cluster.

This DAG relies on three Airflow variables
https://airflow.apache.org/concepts.html#variables
* gcp_project - Google Cloud Project to use for the Cloud Dataproc cluster.
* gce_zone - Google Compute Engine zone where Cloud Dataproc cluster should be
  created.
* gcs_bucket - Google Cloud Storage bucket to use for result of Hadoop job.
  See https://cloud.google.com/storage/docs/creating-buckets for creating a
  bucket.
"""

import datetime
import os

from airflow import models
from airflow.contrib.operators import dataproc_operator
from airflow.utils import trigger_rule

# Output file for Cloud Dataproc job.
output_file = os.path.join(
    models.Variable.get('gcs_bucket'), 'wordcount',
    datetime.datetime.now().strftime('%Y%m%d-%H%M%S')) + os.sep
# Path to Hadoop wordcount example available on every Dataproc cluster.
WORDCOUNT_JAR = (
    'file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar'
)
# Arguments to pass to Cloud Dataproc job.
input_file = 'gs://pub/shakespeare/rose.txt'

wordcount_args = ['wordcount', input_file, output_file]

yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1),
    datetime.datetime.min.time())

default_dag_args = {
    # Setting start date as yesterday starts the DAG immediately when it is
    # detected in the Cloud Storage bucket.
    'start_date': yesterday,
    # To email on failure or retry set 'email' arg to your email and enable
    # emailing here.
    'email_on_failure': False,
    'email_on_retry': False,
    # If a task fails, retry it once after waiting at least 5 minutes
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'project_id': models.Variable.get('gcp_project')
}

# [START composer_hadoop_schedule]
with models.DAG(
        'composer_hadoop_tutorial',
        # Continue to run DAG once per day
        schedule_interval=datetime.timedelta(days=1),
        default_args=default_dag_args) as dag:
    # [END composer_hadoop_schedule]

    # Create a Cloud Dataproc cluster.
    create_dataproc_cluster = dataproc_operator.DataprocClusterCreateOperator(
        task_id='create_dataproc_cluster',
        # Give the cluster a unique name by appending the date scheduled.
        # See https://airflow.apache.org/code.html#default-variables
        cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
        num_workers=2,
        zone=models.Variable.get('gce_zone'),
        master_machine_type='n1-standard-1',
        worker_machine_type='n1-standard-1')

    # Run the Hadoop wordcount example installed on the Cloud Dataproc cluster
    # master node.
    run_dataproc_hadoop = dataproc_operator.DataProcHadoopOperator(
        task_id='run_dataproc_hadoop',
        main_jar=WORDCOUNT_JAR,
        cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
        arguments=wordcount_args)

    # Delete Cloud Dataproc cluster.
    delete_dataproc_cluster = dataproc_operator.DataprocClusterDeleteOperator(
        task_id='delete_dataproc_cluster',
        cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
        # Setting trigger_rule to ALL_DONE causes the cluster to be deleted
        # even if the Dataproc job fails.
        trigger_rule=trigger_rule.TriggerRule.ALL_DONE)

    # [START composer_hadoop_steps]
    # Define DAG dependencies.
    create_dataproc_cluster >> run_dataproc_hadoop >> delete_dataproc_cluster
    # [END composer_hadoop_steps]

# [END composer_hadoop]

5. Przesyłanie plików Airflow do Cloud Storage

Skopiuj DAG do folderu /dags

  1. Najpierw otwórz Cloud Shell, w którym pakiet SDK Cloud jest już zainstalowany.
  2. Skopiuj repozytorium przykładowych plików Pythona i przejdź do katalogu composer/workflows.
git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git && cd python-docs-samples/composer/workflows
  1. Uruchom to polecenie, aby ustawić nazwę folderu DAG jako zmienną środowiskową:
DAGS_FOLDER=$(gcloud composer environments describe ${COMPOSER_INSTANCE} \
--location us-central1 --format="value(config.dagGcsPrefix)")
  1. Uruchom to polecenie gsutil, aby skopiować kod samouczka do miejsca, w którym utworzony jest folder /dags.
gsutil cp hadoop_tutorial.py $DAGS_FOLDER

Dane wyjściowe będą wyglądać mniej więcej tak:

Copying file://hadoop_tutorial.py [Content-Type=text/x-python]...
/ [1 files][  4.1 KiB/  4.1 KiB]
Operation completed over 1 objects/4.1 KiB.

6. Korzystanie z interfejsu Airflow

Aby uzyskać dostęp do interfejsu internetowego Airflow za pomocą konsoli GCP:

  1. Otwórz stronę Środowiska.
  2. W kolumnie Serwer internetowy Airflow środowiska kliknij ikonę nowego okna. Interfejs internetowy Airflow otworzy się w nowym oknie przeglądarki.

Więcej informacji o interfejsie Airflow znajdziesz w artykule Uzyskiwanie dostępu do interfejsu internetowego.

Wyświetlanie zmiennych

Ustawione wcześniej zmienne są zachowywane w środowisku. Zmienne możesz wyświetlić, wybierając Admin > Variables (Administracja > Zmienne) na pasku menu interfejsu Airflow.

Wybrana jest karta Lista, która zawiera tabelę z tymi kluczami i wartościami: klucz: gcp_project, wartość: project-id; klucz: gcs_bucket, wartość: gs://bucket-name; klucz: gce_zone, wartość: zone.

Przeglądanie uruchomień DAG-ów

Gdy prześlesz plik DAG do folderu dags w Cloud Storage, Cloud Composer go przeanalizuje. Jeśli nie zostaną znalezione żadne błędy, nazwa przepływu pracy pojawi się na liście DAG, a przepływ pracy zostanie umieszczony w kolejce do natychmiastowego uruchomienia. Aby wyświetlić DAG-i, u góry strony kliknij DAG-i.

84a29c71f20bff98.png

Kliknij composer_hadoop_tutorial, aby otworzyć stronę z informacjami o DAG. Ta strona zawiera graficzne przedstawienie zadań przepływu pracy i zależności.

f4f1663c7a37f47c.png

Na pasku narzędzi kliknij Widok wykresu, a następnie najedź kursorem na grafikę każdego zadania, aby zobaczyć jego stan. Pamiętaj, że obramowanie każdego zadania również wskazuje jego stan (zielone obramowanie = uruchomione, czerwone = nieudane itp.).

4c5a0c6fa9f88513.png

Aby ponownie uruchomić przepływ pracy w widoku wykresu:

  1. W widoku wykresu interfejsu Airflow kliknij ikonę create_dataproc_cluster.
  2. Kliknij Wyczyść, aby zresetować 3 zadania, a następnie kliknij OK, aby potwierdzić.

fd1b23b462748f47.png

Stan i wyniki przepływu pracy composer-hadoop-tutorial możesz też sprawdzić na tych stronach w konsoli GCP:

  • Klastry Cloud Dataproc, aby monitorować tworzenie i usuwanie klastrów. Pamiętaj, że klaster utworzony przez przepływ pracy jest tymczasowy: istnieje tylko przez czas trwania przepływu pracy i jest usuwany w ramach ostatniego zadania przepływu pracy.
  • Zadania Cloud Dataproc, aby wyświetlić lub monitorować zadanie Apache Hadoop wordcount. Kliknij identyfikator zadania, aby wyświetlić dane wyjściowe logu zadania.
  • Przeglądarka Cloud Storage, aby wyświetlić wyniki zliczania słów w folderze wordcount w zasobniku Cloud Storage utworzonym na potrzeby tego ćwiczenia.

7. Czyszczenie

Aby uniknąć obciążenia konta Google Cloud Platform opłatami za zasoby zużyte w tym ćwiczeniu:

  1. (Opcjonalnie) Aby zapisać dane, pobierz je z zasobnika Cloud Storage w środowisku Cloud Composer i z utworzonego przez Ciebie na potrzeby tego ćwiczenia zasobnika pamięci.
  2. Usuń zasobnik Cloud Storage utworzony na potrzeby tego ćwiczenia.
  3. Usuń zasobnik Cloud Storage dla środowiska.
  4. Usuń środowisko Cloud Composer. Pamiętaj, że usunięcie środowiska nie powoduje usunięcia zasobnika pamięci masowej środowiska.

Opcjonalnie możesz też usunąć projekt:

  1. W konsoli GCP otwórz stronę Projekty.
  2. Na liście projektów wybierz projekt, który chcesz usunąć, i kliknij Usuń.
  3. W polu wpisz identyfikator projektu i kliknij Wyłącz, aby usunąć projekt.