Notebooks mit Google Cloud Dataflow verwenden

1. Einführung

Cloud-Dataflow.png

Google Cloud Dataflow

Zuletzt aktualisiert:5. Juli 2023

Was ist Dataflow?

Dataflow ist ein verwalteter Dienst zur Ausführung eines breiten Spektrums an Datenverarbeitungsmustern. Die Dokumentation auf dieser Website zeigt Ihnen, wie Sie Ihre Pipelines für die Batch- und Streaming-Datenverarbeitung mit Dataflow bereitstellen, einschließlich Anweisungen zur Verwendung von Dienstfunktionen.

Das Apache Beam SDK ist ein Open-Source-Programmiermodell, mit dem Sie sowohl Batch- als auch Streamingpipelines entwickeln können. Sie erstellen Ihre Pipelines mit einem Apache Beam-Programm und führen sie dann im Dataflow-Dienst aus. Die Apache Beam-Dokumentation enthält detaillierte konzeptionelle Informationen und Referenzmaterial für das Apache Beam-Programmiermodell, SDKs und andere Runner.

Datenanalysen schnell streamen

Dataflow ermöglicht die schnelle, vereinfachte Entwicklung von Streamingdaten-Pipelines mit geringerer Datenlatenz.

Betrieb und Verwaltung vereinfachen

Dank des serverlosen Ansatzes von Dataflow entfällt der operative Aufwand von Data Engineering-Arbeitslasten, sodass sich Teams auf das Programmieren konzentrieren können, anstatt Servercluster zu verwalten.

Gesamtbetriebskosten senken

Durch das Autoscaling von Ressourcen und eine kostenoptimierte Batchverarbeitung bietet Dataflow praktisch unbegrenzte Kapazitäten für Ihre saisonalen Arbeitslasten und Lastspitzen, ohne dass unnötige Kosten anfallen.

Wichtige Funktionen

Automatisierte Ressourcenverwaltung und dynamischer Arbeitsausgleich

Dataflow automatisiert die Bereitstellung und Verwaltung von Verarbeitungsressourcen, um die Latenz zu minimieren und die Auslastung zu maximieren, sodass Sie Instanzen nicht manuell erstellen oder reservieren müssen. Die Arbeitsaufteilung wird ebenfalls automatisiert und optimiert, sodass Arbeitsverzögerungen dynamisch ausgeglichen werden. Keine Lust, nach den Hotkeys zu suchen oder Ihre Eingabedaten vorverarbeiten.

Horizontales Autoscaling

Durch horizontales Autoscaling von Worker-Ressourcen für einen optimalen Durchsatz wird das gesamte Preis-Leistungs-Verhältnis verbessert.

Flexible Ressourcenpreisplanung für die Batchverarbeitung

Zur flexiblen Verarbeitung bei der zeitlichen Planung von Jobs, z. B. über Nacht, bietet die flexible Ressourcenplanung (FlexRS) einen niedrigeren Preis für die Batchverarbeitung. Diese flexiblen Jobs werden in einer Warteschlange mit der Garantie platziert, dass sie innerhalb von sechs Stunden abgerufen und ausgeführt werden.

Was Sie im Rahmen dieses Kurses durchführen

Wenn Sie den interaktiven Apache Beam-Runner mit JupyterLab-Notebooks verwenden, können Sie Pipelines iterativ entwickeln, Ihre Pipelinegrafik prüfen und einzelne PCollections in einem REPL-Workflow (Read-Eval-Print-Loop) analysieren. Diese Apache Beam-Notebooks werden über Vertex AI Workbench zur Verfügung gestellt, einen verwalteten Dienst, der virtuelle Notebook-Maschinen hostet, auf denen die neuesten Data-Science- und ML-Frameworks vorinstalliert sind.

Dieses Codelab konzentriert sich auf die Funktionen, die von Apache Beam-Notebooks eingeführt werden.

Aufgaben in diesem Lab

  • Notebookinstanz erstellen
  • Einfache Pipeline erstellen
  • Daten aus unbegrenzter Quelle lesen
  • Daten visualisieren
  • Dataflow-Job vom Notebook aus starten
  • Notebook speichern

Voraussetzungen

  • Ein Google Cloud Platform-Projekt mit aktivierter Abrechnung.
  • Google Cloud Dataflow und Google Cloud PubSub sind aktiviert.

2. Einrichtung

  1. Wählen Sie in der Cloud Console auf der Seite für die Projektauswahl ein Cloud-Projekt aus oder erstellen Sie eines.

Achten Sie darauf, dass die folgenden APIs aktiviert sind:

  • Dataflow API
  • Cloud Pub/Sub API
  • Compute Engine
  • Notebooks API

Sie können dies überprüfen, indem Sie die & Seite „Dienste“.

In dieser Anleitung lesen wir Daten aus einem Pub/Sub-Abo. Achten Sie daher darauf, dass das Compute Engine-Standarddienstkonto die Rolle „Bearbeiter“ hat, oder gewähren Sie ihm die Rolle „Pub/Sub-Bearbeiter“.

3. Erste Schritte mit Apache Beam-Notebooks

Apache Beam-Notebookinstanz starten

  1. Starten Sie Dataflow in der Console:

  1. Wählen Sie im Menü auf der linken Seite die Seite Workbench aus.
  2. Achten Sie darauf, dass Sie sich auf dem Tab Nutzerverwaltete Notebooks befinden.
  3. Klicken Sie in der Symbolleiste auf Neues Notebook.
  4. Wählen Sie Apache Beam > Ohne GPUs aus.
  5. Wählen Sie auf der Seite Neues Notebook ein Subnetzwerk für die Notebook-VM aus und klicken Sie auf Erstellen.
  6. Klicken Sie auf JupyterLab öffnen, wenn der Link aktiv wird. Vertex AI Workbench erstellt eine neue Apache Beam-Notebookinstanz.

4. Pipeline erstellen

Notebookinstanz erstellen

Wählen Sie Datei > Neu > Notebook und wählen Sie einen Kernel mit Apache Beam 2.47 oder höher aus.

Mit dem Hinzufügen von Code zu Ihrem Notebook beginnen

  • Kopieren Sie den Code aus jedem Abschnitt und fügen Sie ihn in eine neue Zelle in Ihrem Notebook ein
  • Zelle ausführen

6bd3dd86cc7cf802.png

Wenn Sie den interaktiven Apache Beam-Runner mit JupyterLab-Notebooks verwenden, können Sie Pipelines iterativ entwickeln, Ihre Pipelinegrafik prüfen und einzelne PCollections in einem REPL-Workflow (Read-Eval-Print-Loop) analysieren.

Apache Beam ist auf Ihrer Notebookinstanz installiert. Fügen Sie daher die Module interactive_runner und interactive_beam bei Ihrem Notebook ein.

import apache_beam as beam
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
import apache_beam.runners.interactive.interactive_beam as ib

Wenn Ihr Notebook andere Google-Dienste verwendet, fügen Sie die folgenden Importanweisungen hinzu:

from apache_beam.options import pipeline_options
from apache_beam.options.pipeline_options import GoogleCloudOptions
import google.auth

Interaktivitätsoptionen festlegen

Im Folgenden wird die Dauer der Datenerfassung auf 60 Sekunden festgelegt. Wenn Sie die Iteration beschleunigen möchten, legen Sie eine kürzere Dauer fest, zum Beispiel „10s“.

ib.options.recording_duration = '60s'

Weitere interaktive Optionen finden Sie in der interactive_beam.options-Klasse.

Initialisieren Sie die Pipeline mit einem InteractiveRunner-Objekt.

# Setting up the Apache Beam pipeline options.
options = pipeline_options.PipelineOptions()

# Set the pipeline mode to stream the data from Pub/Sub.
options.view_as(pipeline_options.StandardOptions).streaming = True

# Sets the project to the default project in your current Google Cloud environment.
# The project will be used for creating a subscription to the Pub/Sub topic.
_, options.view_as(GoogleCloudOptions).project = google.auth.default()

# The Google Cloud PubSub topic for this example.
topic = "projects/pubsub-public-data/topics/shakespeare-kinglear"

p = beam.Pipeline(InteractiveRunner(), options=options)

Daten lesen und visualisieren

Das folgende Beispiel zeigt eine Apache Beam-Pipeline, die ein Abo für das angegebene Pub/Sub-Thema erstellt und aus dem Abo liest.

words = p | "read" >> beam.io.ReadFromPubSub(topic=topic)

Die Pipeline zählt die Wörter nach Fenstern aus der Quelle. Sie erstellt feste Fensteraufrufe mit einer Dauer von jeweils 10 Sekunden pro Fenster.

windowed_words = (words
   | "window" >> beam.WindowInto(beam.window.FixedWindows(10)))

Nachdem die Daten im Fenstermodus angezeigt werden, werden die Wörter pro Fenster gezählt.

windowed_word_counts = (windowed_words
   | "count" >> beam.combiners.Count.PerElement())

Daten visualisieren

Die Methode show() visualisiert die resultierende PCollection im Notebook.

ib.show(windowed_word_counts, include_window_info=True)

Die show-Methode, die eine PCollection in Tabellenform visualisiert.

Übergeben Sie visualize_data=True an die show()-Methode, um Ihre Daten visuell darzustellen. So fügen Sie eine neue Zelle hinzu:

ib.show(windowed_word_counts, include_window_info=True, visualize_data=True)

Sie können mehrere Filter auf Ihre Visualisierungen anwenden. Mit der folgenden Visualisierung können Sie nach Label und Achse filtern:

Die show-Methode zur Visualisierung einer PCollection als umfangreiche Gruppe von filterbaren UI-Elementen.

5. Pandas DataFrame verwenden

Eine weitere nützliche Visualisierung in Apache Beam-Notebooks ist ein Pandas DataFrame. Im folgenden Beispiel werden die Wörter zuerst in Kleinbuchstaben umgewandelt und dann die Häufigkeit jedes Worts berechnet.

windowed_lower_word_counts = (windowed_words
   | "to lower case" >> beam.Map(lambda word: word.lower())
   | "count lowered" >> beam.combiners.Count.PerElement())

Die collect()-Methode stellt die Ausgabe in einem Pandas-DataFrame bereit.

ib.collect(windowed_lower_word_counts, include_window_info=True)

Die collect-Methode, die eine PCollection in einem Pandas DataFrame darstellt.

6. (Optional) Dataflow-Jobs von Ihrem Notebook aus starten

  1. Zum Ausführen von Jobs in Dataflow benötigen Sie zusätzliche Berechtigungen. Achten Sie darauf, dass das Compute Engine-Standarddienstkonto die Rolle „Bearbeiter“ hat, oder weisen Sie ihm die folgenden IAM-Rollen zu:
  • Dataflow-Administrator
  • Dataflow-Worker
  • Storage Admin und
  • Dienstkontonutzer (roles/iam.serviceAccountUser)

Weitere Informationen zu Rollen finden Sie in der Dokumentation.

  1. (Optional) Starten Sie den Kernel neu, starten Sie alle Zellen neu und überprüfen Sie die Ausgabe, bevor Sie Ihr Notebook zum Ausführen von Dataflow-Jobs verwenden.
  2. Entfernen Sie die folgenden Importanweisungen:
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
import apache_beam.runners.interactive.interactive_beam as ib
  1. Fügen Sie die folgende Importanweisung hinzu:
from apache_beam.runners import DataflowRunner
  1. Entfernen Sie die folgende Option für die Aufnahmedauer:
ib.options.recording_duration = '60s'
  1. Fügen Sie den Pipelineoptionen Folgendes hinzu. Sie müssen den Cloud Storage-Speicherort so anpassen, dass er auf einen Bucket verweist, der Ihnen bereits gehört. Alternativ können Sie zu diesem Zweck einen neuen Bucket erstellen. Sie können den Regionswert auch von us-central1 ändern.
# Set the Google Cloud region to run Dataflow.
options.view_as(GoogleCloudOptions).region = 'us-central1'

# Choose a Cloud Storage location.
dataflow_gcs_location = 'gs://<change me>/dataflow'

# Choose a location for your output files
output_gcs_location = '%s/results/' % dataflow_gcs_location


# Set the staging location. This location is used to stage the
# Dataflow pipeline and SDK binary.
options.view_as(GoogleCloudOptions).staging_location = '%s/staging' % dataflow_gcs_location

# Set the temporary location. This location is used to store temporary files
# or intermediate results before outputting to the sink.
options.view_as(GoogleCloudOptions).temp_location = '%s/temp' % dataflow_gcs_location
  1. Ersetzen Sie im Konstruktor von beam.Pipeline() InteractiveRunner durch DataflowRunner. p ist das Pipelineobjekt, das beim Erstellen der Pipeline verwendet wird.
p = beam.Pipeline(DataflowRunner(), options=options)
  1. Entfernen Sie die interaktiven Aufrufe aus Ihrem Code. Entfernen Sie beispielsweise show(), collect(), head(), show_graph() und watch() aus Ihrem Code.
  2. Damit Ergebnisse angezeigt werden, müssen Sie eine Senke hinzufügen. Im vorherigen Abschnitt haben wir Ergebnisse im Notebook visualisiert. Dieses Mal führen wir den Job jedoch außerhalb dieses Notebooks aus – in Dataflow. Daher benötigen wir einen externen Speicherort für unsere Ergebnisse. In diesem Beispiel schreiben wir die Ergebnisse in Textdateien in GCS (Google Cloud Storage). Da dies eine Streaming-Pipeline mit Daten-Windowing ist, möchten wir eine Textdatei pro Fenster erstellen. Fügen Sie dazu die folgenden Schritte zu Ihrer Pipeline hinzu:
result = (windowed_lower_word_counts
    | "decode words and format" >> beam.Map(lambda x: f"{x[0].decode('utf-8')}: {x[1]}")
    | "write events to file" >> beam.io.fileio.WriteToFiles(path=output_gcs_location, shards=1, max_writers_per_bundle=0))
  1. Fügen Sie am Ende des Pipelinecodes p.run() hinzu.
  2. Überprüfen Sie nun Ihren Notebook-Code, um sicherzustellen, dass alle Änderungen berücksichtigt wurden. Sie sollte in etwa so aussehen:
import apache_beam as beam
from apache_beam.options import pipeline_options
from apache_beam.options.pipeline_options import GoogleCloudOptions
import google.auth
from apache_beam.runners import DataflowRunner

# Setting up the Apache Beam pipeline options.
options = pipeline_options.PipelineOptions()

# Set the pipeline mode to stream the data from Pub/Sub.
options.view_as(pipeline_options.StandardOptions).streaming = True

# Sets the project to the default project in your current Google Cloud environment.
# The project will be used for creating a subscription to the Pub/Sub topic.
_, options.view_as(GoogleCloudOptions).project = google.auth.default()

# Set the Google Cloud region to run Dataflow.
options.view_as(GoogleCloudOptions).region = 'us-central1'

# Choose a Cloud Storage location.
dataflow_gcs_location = 'gs://<change me>/dataflow'

# Choose a location for your output files
output_gcs_location = '%s/results/' % dataflow_gcs_location


# Set the staging location. This location is used to stage the
# Dataflow pipeline and SDK binary.
options.view_as(GoogleCloudOptions).staging_location = '%s/staging' % dataflow_gcs_location

# Set the temporary location. This location is used to store temporary files
# or intermediate results before outputting to the sink.
options.view_as(GoogleCloudOptions).temp_location = '%s/temp' % dataflow_gcs_location



# The Google Cloud PubSub topic for this example.
topic = "projects/pubsub-public-data/topics/shakespeare-kinglear"

p = beam.Pipeline(DataflowRunner(), options=options)
words = p | "read" >> beam.io.ReadFromPubSub(topic=topic)
windowed_words = (words
   | "window" >> beam.WindowInto(beam.window.FixedWindows(10)))

windowed_words_counts = (windowed_words
   | "count" >> beam.combiners.Count.PerElement())

windowed_lower_word_counts = (windowed_words
   | "to lower case" >> beam.Map(lambda word: word.lower())
   | "count lowered" >> beam.combiners.Count.PerElement())

result = (windowed_lower_word_counts
    | "decode words and format" >> beam.Map(lambda x: f"{x[0].decode('utf-8')}: {x[1]}")
    | "write events to file" >> beam.io.fileio.WriteToFiles(path=output_gcs_location, shards=1, max_writers_per_bundle=0))

p.run()
  1. Führen Sie die Zellen aus.
  2. Die Ausgabe sollte in etwa so aussehen:
<DataflowPipelineResult <Job
 clientRequestId: '20230623100011457336-8998'
 createTime: '2023-06-23T10:00:33.447347Z'
 currentStateTime: '1970-01-01T00:00:00Z'
 id: '2023-06-23_03_00_33-11346237320103246437'
 location: 'us-central1'
 name: 'beamapp-root-0623075553-503897-boh4u4wb'
 projectId: 'your-project-id'
 stageStates: []
 startTime: '2023-06-23T10:00:33.447347Z'
 steps: []
 tempFiles: []
 type: TypeValueValuesEnum(JOB_TYPE_STREAMING, 2)> at 0x7fc7e4084d60>
  1. Rufen Sie die Seite „Jobs“ für Dataflow auf, um zu prüfen, ob der Job ausgeführt wird. In der Liste sollte nun ein neuer Job zu sehen sein. Es dauert etwa 5 bis 10 Minuten, bis der Job mit der Verarbeitung der Daten beginnt.
  2. Wechseln Sie nach der Datenverarbeitung zu Cloud Storage und rufen Sie das Verzeichnis auf, in dem Dataflow die Ergebnisse speichert (Ihr definierter output_gcs_location). Sie sollten eine Liste mit Textdateien mit einer Datei pro Fenster sehen. bfcc5ce9e46a8b14.png
  3. Laden Sie die Datei herunter und prüfen Sie den Inhalt. Er sollte die Liste der Wörter und ihre jeweilige Anzahl enthalten. Alternativ können Sie die Dateien über die Befehlszeile prüfen. Führen Sie dazu folgenden Befehl in einer neuen Zelle in Ihrem Notebook aus:
! gsutils cat gs://<your-bucket-here>/results/<file-name>
  1. Die Ausgabe sollte in etwa so aussehen:

Safer: 1

trust: 1

mercy: 1

harms: 1

far: 2

fear: 1

than: 1

take: 1

me: 1

goneril: 1

still: 1

away: 1

let: 1

too: 2

the: 1

  1. Fertig! Vergessen Sie nicht, den von Ihnen erstellten Job zu bereinigen und zu stoppen (siehe letzten Schritt dieses Codelabs).

Ein Beispiel zur Durchführung dieser Konvertierung in einem interaktiven Notebook finden Sie im Dataflow-Notebook "Word Count" in Ihrer Notebookinstanz.

Alternativ können Sie Ihr Notebook als ausführbares Skript exportieren, die generierte PY-Datei mithilfe der vorherigen Schritte ändern und dann Ihre Pipeline im Dataflow-Dienst bereitstellen.

7. Notebook speichern

Von Ihnen erstellte Notebooks werden lokal in Ihrer ausgeführten Notebookinstanz gespeichert. Wenn Sie die Notebookinstanz während der Entwicklung zurücksetzen oder herunterfahren, bleiben diese neuen Notebooks bestehen, solange sie im Verzeichnis /home/jupyter erstellt werden. Wenn eine Notebookinstanz jedoch gelöscht wird, werden diese Notebooks auch gelöscht.

Um Ihre Notebooks für die zukünftige Verwendung zu behalten, laden Sie sie lokal auf Ihre Workstation herunter, speichern Sie sie auf GitHub oder exportieren Sie sie in ein anderes Dateiformat.

8. Bereinigen

Nachdem Sie die Verwendung Ihrer Apache Beam-Notebookinstanz abgeschlossen haben, bereinigen Sie die in Google Cloud erstellten Ressourcen. Beenden Sie die Notebookinstanz und beenden Sie den Streamingjob, falls Sie eine ausgeführt haben.

Wenn Sie ein Projekt ausschließlich für dieses Codelab erstellt haben, können Sie das Projekt auch vollständig beenden.