Hadoop-Wordcount-Job in einem Dataproc-Cluster ausführen

1. Einführung

Workflows sind ein häufiger Anwendungsfall in der Datenanalyse. Sie umfassen das Erfassen, Transformieren und Analysieren von Daten, um aussagekräftige Informationen zu erhalten. Auf der Google Cloud Platform werden Workflows in Cloud Composer orchestriert, einer gehosteten Version des beliebten Open-Source-Workflowtools Apache Airflow. In diesem Lab erstellen Sie mit Cloud Composer einen einfachen Workflow, der einen Cloud Dataproc-Cluster erstellt, ihn mit Cloud Dataproc und Apache Hadoop analysiert und den Cloud Dataproc-Cluster danach löscht.

Was ist Cloud Composer?

Cloud Composer ist ein vollständig verwalteter Workflow-Orchestrierungsdienst, mit dem Sie Pipelines, die sich über Clouds und lokale Rechenzentren erstrecken, erstellen, planen und überwachen können. Der Dienst basiert auf dem Open-Source-Projekt Apache Airflow und wird in der Programmiersprache Python betrieben. Cloud Composer ist anbieterunabhängig und nutzerfreundlich.

Durch die Verwendung von Cloud Composer anstelle einer lokalen Instanz von Apache Airflow können Nutzer vom Besten von Airflow profitieren, ohne dass eine Installation oder ein hoher Verwaltungsaufwand erforderlich sind.

Was ist Apache Airflow?

Apache Airflow ist ein Open-Source-Tool zum programmgesteuerten Erstellen, Planen und Überwachen von Workflows. Im gesamten Lab werden einige wichtige Begriffe im Zusammenhang mit Airflow verwendet:

  • DAG: Ein DAG (Directed Acyclic Graph, gerichteter azyklischer Graph) ist eine Sammlung strukturierter Aufgaben, die Sie planen und ausführen möchten. DAGs, auch Workflows genannt, werden in Standard-Python-Dateien definiert.
  • Operator: Ein Operator beschreibt eine einzelne Aufgabe in einem Workflow.

Was ist Cloud Dataproc?

Cloud Dataproc ist der vollständig verwaltete Apache Spark- und Apache Hadoop -Dienst von Google Cloud Platform. Cloud Dataproc lässt sich problemlos in andere GCP-Dienste integrieren und bietet Ihnen eine leistungsstarke und umfassende Plattform für Datenverarbeitung, Analyse und maschinelles Lernen.

Vorgehensweise

In diesem Codelab erfahren Sie, wie Sie einen Apache Airflow-Workflow in Cloud Composer erstellen und ausführen, der die folgenden Aufgaben ausführt:

  • Cloud Dataproc-Cluster erstellen
  • Führt einen Apache Hadoop-Wordcount-Job im Cluster aus und gibt die Ergebnisse in Cloud Storage aus
  • Löscht den Cluster

Lerninhalte

  • Einen Apache Airflow-Workflow in Cloud Composer erstellen und ausführen
  • Cloud Composer und Cloud Dataproc zum Ausführen einer Analyse für ein Dataset verwenden
  • So greifen Sie über die Google Cloud Console, das Cloud SDK und die Airflow-Weboberfläche auf Ihre Cloud Composer-Umgebung zu

Voraussetzungen

  • GCP-Konto
  • Grundkenntnisse der CLI
  • Grundlegendes Verständnis von Python

2. GCP einrichten

Projekt erstellen

Wählen Sie ein Google Cloud Platform-Projekt aus oder erstellen Sie eines.

Notieren Sie sich Ihre Projekt-ID, da Sie sie in späteren Schritten benötigen.

Wenn Sie ein neues Projekt erstellen, finden Sie die Projekt-ID direkt unter dem Projektnamen auf der Erstellungsseite.

Wenn Sie bereits ein Projekt erstellt haben, finden Sie die ID auf der Console-Startseite auf der Karte „Projektinformationen“.

APIs aktivieren

Cloud Composer API, Cloud Dataproc API und Cloud Storage API aktivieren: Sobald die APIs aktiviert sind, können Sie die Schaltfläche „Zu Anmeldedaten“ ignorieren und mit dem nächsten Schritt des Tutorials fortfahren.

Composer-Umgebung erstellen

Erstellen Sie eine Cloud Composer-Umgebung mit der folgenden Konfiguration:

  • Name: my-composer-environment
  • Standort: us-central1
  • Zone: us-central1-a

Alle anderen Konfigurationen können bei den Standardeinstellungen belassen werden. Klicken Sie unten auf „Erstellen“.

Cloud Storage-Bucket erstellen

Erstellen Sie in Ihrem Projekt einen Cloud Storage-Bucket mit der folgenden Konfiguration:

  • Name: <your-project-id>
  • Standardspeicherklasse: Multiregional
  • Standort: USA
  • Zugriffssteuerungsmodell: detailliert

Klicken Sie auf „Erstellen“, wenn Sie fertig sind.

3. Apache Airflow einrichten

Informationen zur Composer-Umgebung ansehen

Öffnen Sie in der GCP Console die Seite Umgebungen.

Klicken Sie auf den Namen der Umgebung, um die Details aufzurufen.

Die Seite Umgebungsdetails enthält verschiedene Informationen, wie die URL der Airflow-Weboberfläche, die Cluster-ID der Google Kubernetes Engine, den Namen des Cloud Storage-Buckets und den Pfad für den Ordner „/dags“.

In Airflow ist ein DAG (Directed Acyclic Graph, gerichteter azyklischer Graph) eine Sammlung strukturierter Aufgaben, die Sie planen und ausführen möchten. DAGs, auch Workflows genannt, werden in Standard-Python-Dateien definiert. Cloud Composer plant nur die DAGs im Ordner „/dags“. Der Ordner „/dags“ befindet sich im Cloud Storage-Bucket, den Cloud Composer automatisch erstellt, wenn Sie Ihre Umgebung erstellen.

Apache Airflow-Umgebungsvariablen festlegen

Apache Airflow-Variablen sind ein Airflow-spezifisches Konzept, das sich von Umgebungsvariablen unterscheidet. In diesem Schritt legen Sie die folgenden drei Airflow-Variablen fest: gcp_project, gcs_bucket und gce_zone.

gcloud zum Festlegen von Variablen verwenden

Öffnen Sie zuerst Cloud Shell, in der das Cloud SDK bereits installiert ist.

Legen Sie für die Umgebungsvariable COMPOSER_INSTANCE den Namen Ihrer Composer-Umgebung fest.

COMPOSER_INSTANCE=my-composer-environment

Wenn Sie Airflow-Variablen mit dem gcloud-Befehlszeilentool festlegen möchten, verwenden Sie den Befehl gcloud composer environments run mit dem Unterbefehl variables. Mit diesem gcloud composer-Befehl wird der Unterbefehl variables der Airflow-Befehlszeile ausgeführt. Dabei werden die Argumente an das gcloud-Befehlszeilentool übergeben.

Sie führen diesen Befehl dreimal aus und ersetzen die Variablen durch die für Ihr Projekt relevanten Variablen.

Legen Sie die gcp_project mit dem folgenden Befehl fest. Ersetzen Sie dabei <your-project-id> durch die Projekt-ID, die Sie sich in Schritt 2 notiert haben.

gcloud composer environments run ${COMPOSER_INSTANCE} \
    --location us-central1 variables -- --set gcp_project <your-project-id>

Die Ausgabe sieht in etwa so aus:

kubeconfig entry generated for us-central1-my-composer-env-123abc-gke.
Executing within the following Kubernetes cluster namespace: composer-1-10-0-airflow-1-10-2-123abc
[2020-04-17 20:42:49,713] {settings.py:176} INFO - settings.configure_orm(): Using pool settings. pool_size=5, pool_recycle=1800, pid=449
[2020-04-17 20:42:50,123] {default_celery.py:90} WARNING - You have configured a result_backend of redis://airflow-redis-service.default.svc.cluste
r.local:6379/0, it is highly recommended to use an alternative result_backend (i.e. a database).
[2020-04-17 20:42:50,127] {__init__.py:51} INFO - Using executor CeleryExecutor
[2020-04-17 20:42:50,433] {app.py:52} WARNING - Using default Composer Environment Variables. Overrides have not been applied.
[2020-04-17 20:42:50,440] {configuration.py:522} INFO - Reading the config from /etc/airflow/airflow.cfg
[2020-04-17 20:42:50,452] {configuration.py:522} INFO - Reading the config from /etc/airflow/airflow.cfg

Legen Sie die gcs_bucket mit dem folgenden Befehl fest und ersetzen Sie <your-bucket-name> durch die Bucket-ID, die Sie sich in Schritt 2 notiert haben. Wenn Sie unserer Empfehlung gefolgt sind, entspricht der Bucket-Name Ihrer Projekt-ID. Die Ausgabe ist ähnlich wie beim vorherigen Befehl.

gcloud composer environments run ${COMPOSER_INSTANCE} \
    --location us-central1 variables -- --set gcs_bucket gs://<your-bucket-name>

Legen Sie gce_zone mit dem folgenden Befehl fest. Ihre Ausgabe sieht ähnlich aus wie bei den vorherigen Befehlen.

gcloud composer environments run ${COMPOSER_INSTANCE} \
    --location us-central1 variables -- --set gce_zone us-central1-a

(Optional) gcloud verwenden, um eine Variable aufzurufen

Um den Wert einer Variablen abzurufen, führen Sie den Airflow-CLI-Unterbefehl variables mit dem Argument get aus oder verwenden Sie die Airflow-Benutzeroberfläche.

Beispiel:

gcloud composer environments run ${COMPOSER_INSTANCE} \
    --location us-central1 variables -- --get gcs_bucket

Das ist mit jeder der drei Variablen möglich, die Sie gerade festgelegt haben: gcp_project, gcs_bucket und gce_zone.

4. Beispielworkflow

Sehen wir uns den Code für den DAG an, den wir in Schritt 5 verwenden. Sie müssen noch keine Dateien herunterladen. Folgen Sie einfach den Anweisungen.

Hier gibt es einiges zu beachten.

from airflow import models
from airflow.contrib.operators import dataproc_operator
from airflow.utils import trigger_rule

Wir beginnen mit einigen Airflow-Importen:

  • airflow.models: Ermöglicht uns den Zugriff auf und die Erstellung von Daten in der Airflow-Datenbank.
  • airflow.contrib.operators – Hier sind die Operatoren aus der Community zu finden. In diesem Fall benötigen wir die dataproc_operator, um auf die Cloud Dataproc API zuzugreifen.
  • airflow.utils.trigger_rule – Zum Hinzufügen von Triggerregeln zu unseren Operatoren. Mit Auslöserregeln lässt sich genau steuern, ob ein Vorgang in Bezug auf den Status seiner übergeordneten Elemente ausgeführt werden soll.
output_file = os.path.join(
    models.Variable.get('gcs_bucket'), 'wordcount',
    datetime.datetime.now().strftime('%Y%m%d-%H%M%S')) + os.sep

Damit wird der Speicherort unserer Ausgabedatei angegeben. Die wichtige Zeile hier ist models.Variable.get('gcs_bucket'), mit der der Wert der Variablen gcs_bucket aus der Airflow-Datenbank abgerufen wird.

WORDCOUNT_JAR = (
    'file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar'
)

input_file = 'gs://pub/shakespeare/rose.txt'

wordcount_args = ['wordcount', input_file, output_file]
  • WORDCOUNT_JAR: Speicherort der JAR-Datei, die wir schließlich im Cloud Dataproc-Cluster ausführen. Sie wird bereits für Sie auf GCP gehostet.
  • input_file: Speicherort der Datei mit den Daten, die von unserem Hadoop-Job verarbeitet werden. Die Daten werden in Schritt 5 an diesen Speicherort hochgeladen.
  • wordcount_args – Argumente, die an die JAR-Datei übergeben werden.
yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1),
    datetime.datetime.min.time())

So erhalten wir ein datetime-Objekt, das Mitternacht des Vortags entspricht. Wenn dieser Befehl beispielsweise am 4. März um 11:00 Uhr ausgeführt wird, stellt das datetime-Objekt den 3. März um 00:00 Uhr dar. Das hängt damit zusammen, wie Airflow die Planung handhabt. Weitere Informationen

default_dag_args = {
    'start_date': yesterday,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'project_id': models.Variable.get('gcp_project')
}

Die Variable default_dag_args in Form eines Dictionary sollte immer dann angegeben werden, wenn ein neuer DAG erstellt wird:

  • 'email_on_failure': Gibt an, ob E-Mail-Benachrichtigungen gesendet werden sollen, wenn eine Aufgabe fehlgeschlagen ist.
  • 'email_on_retry': Gibt an, ob E-Mail-Benachrichtigungen gesendet werden sollen, wenn eine Aufgabe wiederholt wird.
  • 'retries': Gibt an, wie viele Wiederholungsversuche Airflow im Falle eines DAG-Fehlers unternehmen soll.
  • 'retry_delay': Gibt an, wie lange Airflow warten soll, bevor ein Wiederholungsversuch gestartet wird.
  • 'project_id' – Gibt an, welche GCP-Projekt-ID der DAG zugeordnet werden soll. Diese wird später für den Dataproc-Operator benötigt.
with models.DAG(
        'composer_hadoop_tutorial',
        schedule_interval=datetime.timedelta(days=1),
        default_args=default_dag_args) as dag:

Mit with models.DAG wird dem Skript mitgeteilt, dass alles darunter in denselben DAG aufgenommen werden soll. Außerdem werden drei Argumente übergeben:

  • Der erste ist ein String und gibt den Namen des zu erstellenden DAG an. In diesem Fall verwenden wir composer_hadoop_tutorial.
  • schedule_interval: Ein datetime.timedelta-Objekt, das hier auf einen Tag festgelegt ist. Das bedeutet, dass diese DAG einmal täglich nach dem 'start_date' ausgeführt wird, das zuvor in 'default_dag_args' festgelegt wurde.
  • default_args: Das Wörterbuch, das wir zuvor erstellt haben und das die Standardargumente für den DAG enthält.

Dataproc-Cluster erstellen

Als Nächstes erstellen wir eine dataproc_operator.DataprocClusterCreateOperator, mit der ein Cloud Dataproc-Cluster erstellt wird.

    create_dataproc_cluster = dataproc_operator.DataprocClusterCreateOperator(
        task_id='create_dataproc_cluster',
        cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
        num_workers=2,
        zone=models.Variable.get('gce_zone'),
        master_machine_type='n1-standard-1',
        worker_machine_type='n1-standard-1')

In diesem Operator sehen wir einige Argumente, von denen alle bis auf das erste spezifisch für diesen Operator sind:

  • task_id: Wie beim BashOperator ist dies der Name, den wir dem Operator zuweisen. Er ist in der Airflow-Benutzeroberfläche sichtbar.
  • cluster_name: Der Name, den wir dem Cloud Dataproc-Cluster zuweisen. Hier haben wir sie composer-hadoop-tutorial-cluster-{{ ds_nodash }} genannt (siehe Infobox für optionale zusätzliche Informationen).
  • num_workers: Die Anzahl der Worker, die dem Cloud Dataproc-Cluster zugewiesen werden.
  • zone: Die geografische Region, in der sich der Cluster befinden soll, wie in der Airflow-Datenbank gespeichert. Dadurch wird die Variable 'gce_zone' gelesen, die wir in Schritt 3 festgelegt haben.
  • master_machine_type: Der Maschinentyp, der dem Cloud Dataproc-Master zugewiesen werden soll.
  • worker_machine_type – Der Maschinentyp, der dem Cloud Dataproc-Worker zugewiesen werden soll

Apache Hadoop-Job senden

Mit dataproc_operator.DataProcHadoopOperator können wir einen Job an einen Cloud Dataproc-Cluster senden.

    run_dataproc_hadoop = dataproc_operator.DataProcHadoopOperator(
        task_id='run_dataproc_hadoop',
        main_jar=WORDCOUNT_JAR,
        cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
        arguments=wordcount_args)

Wir stellen mehrere Parameter zur Verfügung:

  • task_id: Der Name, den wir diesem Teil des DAG zuweisen.
  • main_jar – Speicherort der JAR-Datei, die für den Cluster ausgeführt werden soll
  • cluster_name: Name des Clusters, für den der Job ausgeführt werden soll. Dieser ist identisch mit dem Namen im vorherigen Operator.
  • arguments: Argumente, die an die JAR-Datei übergeben werden, so als würden Sie die .jar-Datei über die Befehlszeile ausführen.

Cluster löschen

Der letzte Operator, den wir erstellen, ist dataproc_operator.DataprocClusterDeleteOperator.

    delete_dataproc_cluster = dataproc_operator.DataprocClusterDeleteOperator(
        task_id='delete_dataproc_cluster',
        cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
        trigger_rule=trigger_rule.TriggerRule.ALL_DONE)

Wie der Name schon sagt, löscht dieser Operator einen angegebenen Cloud Dataproc-Cluster. Wir sehen hier drei Argumente:

  • task_id: Wie beim BashOperator ist dies der Name, den wir dem Operator zuweisen. Er ist in der Airflow-Benutzeroberfläche sichtbar.
  • cluster_name: Der Name, den wir dem Cloud Dataproc-Cluster zuweisen. In diesem Beispiel haben wir den Namen composer-hadoop-tutorial-cluster-{{ ds_nodash }} verwendet. Optionale zusätzliche Informationen finden Sie im Infofeld nach „Dataproc-Cluster erstellen“.
  • trigger_rule: Triggerregeln wurden zu Beginn dieses Schritts kurz erwähnt. Hier sehen Sie eine in Aktion. Standardmäßig wird ein Airflow-Operator erst ausgeführt, wenn alle Upstream-Operatoren erfolgreich abgeschlossen wurden. Für die Triggerregel ALL_DONE muss nur sichergestellt sein, dass alle Upstream-Operatoren abgeschlossen sind, unabhängig davon, ob sie erfolgreich waren. Das bedeutet, dass der Cluster auch dann heruntergefahren werden soll, wenn der Hadoop-Job fehlgeschlagen ist.
  create_dataproc_cluster >> run_dataproc_hadoop >> delete_dataproc_cluster

Schließlich sollen diese Operatoren in einer bestimmten Reihenfolge ausgeführt werden. Das können wir mit Python-Bitshift-Operatoren angeben. In diesem Fall wird immer zuerst create_dataproc_cluster, dann run_dataproc_hadoop und schließlich delete_dataproc_cluster ausgeführt.

Der vollständige Code sieht so aus:

# Copyright 2018 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# [START composer_hadoop_tutorial]
"""Example Airflow DAG that creates a Cloud Dataproc cluster, runs the Hadoop
wordcount example, and deletes the cluster.

This DAG relies on three Airflow variables
https://airflow.apache.org/concepts.html#variables
* gcp_project - Google Cloud Project to use for the Cloud Dataproc cluster.
* gce_zone - Google Compute Engine zone where Cloud Dataproc cluster should be
  created.
* gcs_bucket - Google Cloud Storage bucket to use for result of Hadoop job.
  See https://cloud.google.com/storage/docs/creating-buckets for creating a
  bucket.
"""

import datetime
import os

from airflow import models
from airflow.contrib.operators import dataproc_operator
from airflow.utils import trigger_rule

# Output file for Cloud Dataproc job.
output_file = os.path.join(
    models.Variable.get('gcs_bucket'), 'wordcount',
    datetime.datetime.now().strftime('%Y%m%d-%H%M%S')) + os.sep
# Path to Hadoop wordcount example available on every Dataproc cluster.
WORDCOUNT_JAR = (
    'file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar'
)
# Arguments to pass to Cloud Dataproc job.
input_file = 'gs://pub/shakespeare/rose.txt'

wordcount_args = ['wordcount', input_file, output_file]

yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1),
    datetime.datetime.min.time())

default_dag_args = {
    # Setting start date as yesterday starts the DAG immediately when it is
    # detected in the Cloud Storage bucket.
    'start_date': yesterday,
    # To email on failure or retry set 'email' arg to your email and enable
    # emailing here.
    'email_on_failure': False,
    'email_on_retry': False,
    # If a task fails, retry it once after waiting at least 5 minutes
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'project_id': models.Variable.get('gcp_project')
}

# [START composer_hadoop_schedule]
with models.DAG(
        'composer_hadoop_tutorial',
        # Continue to run DAG once per day
        schedule_interval=datetime.timedelta(days=1),
        default_args=default_dag_args) as dag:
    # [END composer_hadoop_schedule]

    # Create a Cloud Dataproc cluster.
    create_dataproc_cluster = dataproc_operator.DataprocClusterCreateOperator(
        task_id='create_dataproc_cluster',
        # Give the cluster a unique name by appending the date scheduled.
        # See https://airflow.apache.org/code.html#default-variables
        cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
        num_workers=2,
        zone=models.Variable.get('gce_zone'),
        master_machine_type='n1-standard-1',
        worker_machine_type='n1-standard-1')

    # Run the Hadoop wordcount example installed on the Cloud Dataproc cluster
    # master node.
    run_dataproc_hadoop = dataproc_operator.DataProcHadoopOperator(
        task_id='run_dataproc_hadoop',
        main_jar=WORDCOUNT_JAR,
        cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
        arguments=wordcount_args)

    # Delete Cloud Dataproc cluster.
    delete_dataproc_cluster = dataproc_operator.DataprocClusterDeleteOperator(
        task_id='delete_dataproc_cluster',
        cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
        # Setting trigger_rule to ALL_DONE causes the cluster to be deleted
        # even if the Dataproc job fails.
        trigger_rule=trigger_rule.TriggerRule.ALL_DONE)

    # [START composer_hadoop_steps]
    # Define DAG dependencies.
    create_dataproc_cluster >> run_dataproc_hadoop >> delete_dataproc_cluster
    # [END composer_hadoop_steps]

# [END composer_hadoop]

5. Airflow-Dateien in Cloud Storage hochladen

DAG in den Ordner „/dags“ kopieren

  1. Öffnen Sie zuerst Cloud Shell, in der das Cloud SDK bereits installiert ist.
  2. Klonen Sie das Repository mit den Python-Beispielen und wechseln Sie in das Verzeichnis „composer/workflows“.
git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git && cd python-docs-samples/composer/workflows
  1. Führen Sie den folgenden Befehl aus, um den Namen Ihres DAGs-Ordners als Umgebungsvariable festzulegen.
DAGS_FOLDER=$(gcloud composer environments describe ${COMPOSER_INSTANCE} \
--location us-central1 --format="value(config.dagGcsPrefix)")
  1. Führen Sie den folgenden gsutil-Befehl aus, um den Tutorial-Code in das Verzeichnis zu kopieren, in dem der Ordner „/dags“ erstellt wird.
gsutil cp hadoop_tutorial.py $DAGS_FOLDER

Die Ausgabe sieht in etwa so aus:

Copying file://hadoop_tutorial.py [Content-Type=text/x-python]...
/ [1 files][  4.1 KiB/  4.1 KiB]
Operation completed over 1 objects/4.1 KiB.

6. Airflow-UI verwenden

So greifen Sie mit der GCP Console auf die Airflow-Weboberfläche zu:

  1. Öffnen Sie die Seite Umgebungen.
  2. Klicken Sie in der Spalte Airflow-Webserver für die Umgebung auf das Symbol für ein neues Fenster. Die Airflow-Weboberfläche wird in einem neuen Browserfenster geöffnet.

So greifen Sie auf die Airflow-Weboberfläche zu.

Variablen ansehen

Die zuvor erstellten Variablen sind dauerhaft in Ihrer Umgebung vorhanden. Sie können sie aufrufen, indem Sie in der Airflow-Menüleiste Admin > Variablen auswählen.

Der Tab „Liste“ ist ausgewählt und zeigt eine Tabelle mit den folgenden Schlüsseln und Werten: Schlüssel: „gcp_project“, Wert: „project-id“ Schlüssel: „gcs_bucket“, Wert: „gs://bucket-name“ Schlüssel: „gce_zone“, Wert: „zone“

DAG-Ausführungen ansehen

Wenn Sie Ihre DAG-Datei in den Ordner dags in Cloud Storage hochladen, parst Cloud Composer die Datei. Werden keine Fehler gefunden, ist der Name des Workflows in der DAG-Liste zu sehen und der Workflow wird zur sofortigen Ausführung in die Warteschlange gestellt. Wenn Sie sich Ihre DAGs ansehen möchten, klicken Sie oben auf der Seite auf DAGs.

84a29c71f20bff98.png

Klicken Sie auf composer_hadoop_tutorial, um die DAG-Detailseite zu öffnen. Sie enthält eine grafische Darstellung der Workflowaufgaben und ‑abhängigkeiten.

f4f1663c7a37f47c.png

Klicken Sie in der Symbolleiste auf Graph View und bewegen Sie den Mauszeiger über die Grafik, um den Status der Aufgaben zu sehen. Der Rahmen um jede Aufgabe gibt auch deren Status an (grün = läuft; rot = fehlgeschlagen usw.).

4c5a0c6fa9f88513.png

So führen Sie den Workflow über die Grafikansicht noch einmal aus:

  1. Klicken Sie in der Grafikansicht der Airflow-Benutzeroberfläche auf die Grafik create_dataproc_cluster.
  2. Klicken Sie auf Clear, um die drei Aufgaben zurückzusetzen, und dann zur Bestätigung auf OK.

fd1b23b462748f47.png

Sie können den Status und die Ergebnisse des Workflows composer-hadoop-tutorial auch auf den folgenden GCP Console-Seiten prüfen:

  • Cloud Dataproc-Cluster zum Monitoring des Erstellens und Löschens von Clustern. Beachten Sie, dass der vom Workflow erstellte Cluster sitzungsspezifisch ist. Er ist nur für die Dauer des Workflows vorhanden und wird mit der letzten Workflowaufgabe gelöscht.
  • Cloud Dataproc-Jobs zur Anzeige oder zum Monitoring eines Apache Hadoop-Wordcount-Jobs. Klicken Sie auf die Job-ID, um die Ausgabe des Job-Logs aufzurufen.
  • Cloud Storage-Browser zur Anzeige der Ergebnisse der Wortzählung im Ordner wordcount des Cloud Storage-Buckets, den Sie für dieses Codelab erstellt haben.

7. Bereinigen

So vermeiden Sie, dass Ihrem GCP-Konto die in diesem Codelab verwendeten Ressourcen in Rechnung gestellt werden:

  1. (Optional) Wenn Sie Ihre Daten speichern möchten, laden Sie die Daten aus dem Cloud Storage-Bucket für die Cloud Composer-Umgebung und dem Speicher-Bucket herunter, den Sie für dieses Codelab erstellt haben.
  2. Löschen Sie den Cloud Storage-Bucket, den Sie für dieses Codelab erstellt haben.
  3. Löschen Sie den Cloud Storage-Bucket für die Umgebung.
  4. Löschen Sie die Cloud Composer-Umgebung. Hinweis: Wenn Sie die Umgebung löschen, bleibt der Storage-Bucket für die Umgebung erhalten.

Optional können Sie das Projekt auch löschen:

  1. Rufen Sie in der GCP Console die Seite Projekte auf.
  2. Wählen Sie in der Projektliste das Projekt aus, das Sie löschen möchten, und klicken Sie auf Löschen.
  3. Geben Sie im Feld die Projekt-ID ein und klicken Sie auf Beenden, um das Projekt zu löschen.