Notebooks mit Google Cloud Dataflow verwenden

1. Einführung

Cloud-Dataflow.png

Google Cloud Dataflow

Letzte Aktualisierung:05.07.2023

Was ist Dataflow?

Dataflow ist ein verwalteter Dienst zur Ausführung eines breiten Spektrums an Datenverarbeitungsmustern. Die Dokumentation auf dieser Website zeigt Ihnen, wie Sie Ihre Batch- und Streaming-Datenverarbeitungspipelines mit Dataflow bereitstellen. Sie enthält auch Anweisungen zur Verwendung der Servicefunktionen.

Das Apache Beam SDK ist ein Open-Source-Programmiermodell, mit dem Sie sowohl Batch- als auch Streaming-Pipelines entwickeln können. Sie erstellen Ihre Pipelines mit einem Apache Beam-Programm und führen sie dann im Dataflow-Dienst aus. Die Apache Beam-Dokumentation enthält ausführliche konzeptionelle Informationen und Referenzmaterial für das Apache Beam-Programmiermodell, SDKs und andere Runner.

Analyse von Streamingdaten mit hoher Geschwindigkeit

Dataflow ermöglicht die schnelle, vereinfachte Entwicklung von Streamingdaten-Pipelines mit besonders niedriger Latenz.

Vorgänge und Verwaltung vereinfachen

Dank des serverlosen Ansatzes von Dataflow entfällt der operative Aufwand von Data Engineering-Arbeitslasten, sodass sich Teams auf das Programmieren konzentrieren können und sich nicht um die Verwaltung von Serverclustern kümmern müssen.

Gesamtbetriebskosten reduzieren

Durch das Autoscaling von Ressourcen und eine kostenoptimierte Batchverarbeitung stellt Dataflow praktisch unbegrenzte Kapazitäten für Ihre nur temporär auftretenden Arbeitslasten und Lastspitzen bereit, ohne dass übermäßige Kosten anfallen.

Wichtige Funktionen

Automatisierte Ressourcenverwaltung und dynamischer Arbeitsausgleich

Dataflow automatisiert die Bereitstellung und Verwaltung von Verarbeitungsressourcen, sodass die Latenz minimiert und die Ressourcennutzung optimiert wird. Instanzen müssen dadurch nicht mehr manuell erstellt oder reserviert werden. Die Arbeitsaufteilung wird ebenfalls automatisiert und optimiert, sodass Arbeitsverzögerungen dynamisch ausgeglichen werden. Sie müssen nicht mehr nach „Hot Keys“ suchen, bei denen es durch hohe Aufrufraten zu Verzögerungen kommt, oder Ihre Eingabedaten vorverarbeiten.

Horizontales Autoscaling

Durch horizontales Autoscaling von Worker-Ressourcen zur Durchsatzoptimierung wird das gesamte Preis-Leistungs-Verhältnis verbessert.

Flexible Ressourcenpreisplanung für die Batchverarbeitung

Zur flexiblen Verarbeitung im Rahmen der zeitlichen Planung von Jobs, zum Beispiel über Nacht, ergibt sich mit der flexiblen Ressourcenplanung (FlexRS) ein günstigerer Preis für die Batchverarbeitung. Diese flexiblen Jobs werden in einer Warteschlange mit der Garantie platziert, dass sie innerhalb von sechs Stunden abgerufen und ausgeführt werden.

Was Sie im Rahmen dieser Kampagne ausführen

Wenn Sie den interaktiven Apache Beam-Runner mit JupyterLab-Notebooks verwenden, können Sie Pipelines iterativ entwickeln, Ihre Pipelinegrafik prüfen und einzelne PCollections in einem REPL-Workflow (Read-Eval-Print-Loop) analysieren. Apache Beam-Notebooks werden über Vertex AI Workbench bereitgestellt, einem verwalteten Dienst, der virtuelle Notebookmaschinen hostet, auf denen die neuesten Data-Science- und ML-Frameworks vorinstalliert sind.

In diesem Codelab geht es um die von Apache Beam-Notebooks eingeführte Funktionalität.

Lerninhalte

  • Notebook-Instanz erstellen
  • Einfache Pipeline erstellen
  • Daten aus einer unbegrenzten Quelle lesen
  • Daten visualisieren
  • Dataflow-Job über das Notebook starten
  • Notebook speichern

Voraussetzungen

  • Ein Google Cloud-Projekt mit aktivierter Abrechnung.
  • Google Cloud Dataflow und Google Cloud Pub/Sub sind aktiviert.

2. Einrichtung

  1. Wählen Sie in der Cloud Console auf der Seite für die Projektauswahl ein Cloud-Projekt aus oder erstellen Sie eines.

Prüfen Sie, ob die folgenden APIs aktiviert sind:

  • Dataflow API
  • Cloud Pub/Sub API
  • Compute Engine
  • Notebooks API

Sie können dies auf der Seite „APIs & Dienste“ überprüfen.

In dieser Anleitung lesen wir Daten aus einem Pub/Sub-Abo. Achten Sie daher darauf, dass das Compute Engine-Standarddienstkonto die Rolle „Bearbeiter“ hat oder weisen Sie ihm die Rolle „Pub/Sub-Bearbeiter“ zu.

3. Erste Schritte mit Apache Beam-Notebooks

Apache Beam-Notebookinstanz starten

  1. Dataflow in der Console starten:

  1. Wählen Sie im Menü auf der linken Seite die Seite Workbench aus.
  2. Achten Sie darauf, dass Sie sich auf dem Tab Nutzerverwaltete Notebooks befinden.
  3. Klicken Sie in der Symbolleiste auf Neues Notebook.
  4. Wählen Sie Apache Beam > Ohne GPUs aus.
  5. Wählen Sie auf der Seite Neues Notebook ein Subnetzwerk für die Notebook-VM aus und klicken Sie auf Erstellen.
  6. Klicken Sie auf JupyterLab öffnen, wenn der Link aktiv wird. Vertex AI Workbench erstellt eine neue Apache Beam-Notebookinstanz.

4. Pipeline erstellen

Notebookinstanz erstellen

Wählen Sie unter Datei > Neu > Notebook einen Kernel mit Apache Beam 2.47 oder höher aus.

Code in Ihr Notebook einfügen

  • Kopieren Sie den Code aus jedem Abschnitt und fügen Sie ihn in eine neue Zelle in Ihrem Notebook ein.
  • Zelle ausführen

6bd3dd86cc7cf802.png

Wenn Sie den interaktiven Apache Beam-Runner mit JupyterLab-Notebooks verwenden, können Sie Pipelines iterativ entwickeln, Ihre Pipelinegrafik prüfen und einzelne PCollections in einem REPL-Workflow (Read-Eval-Print-Loop) analysieren.

Apache Beam ist auf Ihrer Notebookinstanz installiert. Fügen Sie daher die Module interactive_runner und interactive_beam bei Ihrem Notebook ein.

import apache_beam as beam
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
import apache_beam.runners.interactive.interactive_beam as ib

Wenn Ihr Notebook andere Google-Dienste verwendet, fügen Sie die folgenden Importanweisungen hinzu:

from apache_beam.options import pipeline_options
from apache_beam.options.pipeline_options import GoogleCloudOptions
import google.auth

Interaktivitätsoptionen festlegen

Im Folgenden wird die Dauer der Datenerfassung auf 60 Sekunden festgelegt. Wenn Sie schneller iterieren möchten, legen Sie eine kürzere Dauer fest, z. B. „10s“.

ib.options.recording_duration = '60s'

Weitere Interaktivitätsoptionen finden Sie in der interactive_beam.options-Klasse.

Initialisieren Sie die Pipeline mit einem InteractiveRunner-Objekt.

# Setting up the Apache Beam pipeline options.
options = pipeline_options.PipelineOptions()

# Set the pipeline mode to stream the data from Pub/Sub.
options.view_as(pipeline_options.StandardOptions).streaming = True

# Sets the project to the default project in your current Google Cloud environment.
# The project will be used for creating a subscription to the Pub/Sub topic.
_, options.view_as(GoogleCloudOptions).project = google.auth.default()

# The Google Cloud PubSub topic for this example.
topic = "projects/pubsub-public-data/topics/shakespeare-kinglear"

p = beam.Pipeline(InteractiveRunner(), options=options)

Daten lesen und visualisieren

Das folgende Beispiel zeigt eine Apache Beam-Pipeline, die ein Abo für das angegebene Pub/Sub-Thema erstellt und aus dem Abo liest.

words = p | "read" >> beam.io.ReadFromPubSub(topic=topic)

Die Pipeline zählt die Wörter nach Fenstern aus der Quelle. Sie erstellt feste Fensteraufrufe mit einer Dauer von jeweils 10 Sekunden pro Fenster.

windowed_words = (words
   | "window" >> beam.WindowInto(beam.window.FixedWindows(10)))

Nachdem die Daten im Fenstermodus angezeigt werden, werden die Wörter pro Fenster gezählt.

windowed_word_counts = (windowed_words
   | "count" >> beam.combiners.Count.PerElement())

Daten visualisieren

Die Methode show() visualisiert die resultierende PCollection im Notebook.

ib.show(windowed_word_counts, include_window_info=True)

Die show-Methode, die eine PCollection in Tabellenform visualisiert.

Zum Aufrufen von Visualisierungen Ihrer Daten übergeben Sie visualize_data=True an die show()-Methode. So fügen Sie eine neue Zelle hinzu:

ib.show(windowed_word_counts, include_window_info=True, visualize_data=True)

Sie können mehrere Filter auf Ihre Visualisierungen anwenden. Mit der folgenden Visualisierung können Sie nach Label und Achse filtern:

Die show-Methode zur Visualisierung einer PCollection als umfangreiche Gruppe von filterbaren UI-Elementen.

5. Pandas-DataFrame verwenden

Eine weitere hilfreiche Visualisierung in Apache Beam-Notebooks ist ein Pandas DataFrame. Im folgenden Beispiel werden die Wörter zuerst in Kleinbuchstaben umgewandelt und dann die Häufigkeit jedes Worts berechnet.

windowed_lower_word_counts = (windowed_words
   | "to lower case" >> beam.Map(lambda word: word.lower())
   | "count lowered" >> beam.combiners.Count.PerElement())

Die collect()-Methode stellt die Ausgabe in einem Pandas-DataFrame bereit.

ib.collect(windowed_lower_word_counts, include_window_info=True)

Die collect-Methode, die eine PCollection in einem Pandas DataFrame darstellt.

6. (Optional) Dataflow-Jobs von Ihrem Notebook aus starten

  1. Zum Ausführen von Jobs in Dataflow benötigen Sie zusätzliche Berechtigungen. Achten Sie darauf, dass das Compute Engine-Standarddienstkonto die Rolle „Bearbeiter“ hat, oder weisen Sie ihm die folgenden IAM-Rollen zu:
  • Dataflow-Administrator
  • Dataflow-Worker
  • Storage-Administrator und
  • Dienstkontonutzer (roles/iam.serviceAccountUser)

Weitere Informationen zu Rollen finden Sie in der Dokumentation.

  1. (Optional) Starten Sie den Kernel neu, starten Sie alle Zellen neu und überprüfen Sie die Ausgabe, bevor Sie Ihr Notebook zum Ausführen von Dataflow-Jobs verwenden.
  2. Entfernen Sie die folgenden Importanweisungen:
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
import apache_beam.runners.interactive.interactive_beam as ib
  1. Fügen Sie die folgende Importanweisung hinzu:
from apache_beam.runners import DataflowRunner
  1. Entfernen Sie die folgende Option für die Aufnahmedauer:
ib.options.recording_duration = '60s'
  1. Fügen Sie den Pipelineoptionen Folgendes hinzu. Sie müssen den Cloud Storage-Speicherort so anpassen, dass er auf einen Bucket verweist, der Ihnen bereits gehört. Alternativ können Sie einen neuen Bucket erstellen. Sie können den Wert für die Region auch über us-central1 ändern.
# Set the Google Cloud region to run Dataflow.
options.view_as(GoogleCloudOptions).region = 'us-central1'

# Choose a Cloud Storage location.
dataflow_gcs_location = 'gs://<change me>/dataflow'

# Choose a location for your output files
output_gcs_location = '%s/results/' % dataflow_gcs_location


# Set the staging location. This location is used to stage the
# Dataflow pipeline and SDK binary.
options.view_as(GoogleCloudOptions).staging_location = '%s/staging' % dataflow_gcs_location

# Set the temporary location. This location is used to store temporary files
# or intermediate results before outputting to the sink.
options.view_as(GoogleCloudOptions).temp_location = '%s/temp' % dataflow_gcs_location
  1. Ersetzen Sie im Konstruktor von beam.Pipeline() InteractiveRunner durch DataflowRunner. p ist das Pipelineobjekt, das Sie beim Erstellen der Pipeline erstellt haben.
p = beam.Pipeline(DataflowRunner(), options=options)
  1. Entfernen Sie die interaktiven Aufrufe aus Ihrem Code. Entfernen Sie beispielsweise show(), collect(), head(), show_graph() und watch() aus Ihrem Code.
  2. Damit Sie Ergebnisse sehen können, müssen Sie ein Ziel hinzufügen. Im vorherigen Abschnitt haben wir die Ergebnisse im Notebook visualisiert. Dieses Mal führen wir den Job jedoch außerhalb dieses Notebooks aus, nämlich in Dataflow. Daher benötigen wir einen externen Speicherort für unsere Ergebnisse. In diesem Beispiel schreiben wir die Ergebnisse in Textdateien in GCS (Google Cloud Storage). Da es sich um eine Streamingpipeline mit Datenfenstern handelt, möchten wir eine Textdatei pro Fenster erstellen. Fügen Sie dazu Ihrer Pipeline die folgenden Schritte hinzu:
result = (windowed_lower_word_counts
    | "decode words and format" >> beam.Map(lambda x: f"{x[0].decode('utf-8')}: {x[1]}")
    | "write events to file" >> beam.io.fileio.WriteToFiles(path=output_gcs_location, shards=1, max_writers_per_bundle=0))
  1. Fügen Sie am Ende des Pipelinecodes p.run() hinzu.
  2. Sehen Sie sich nun den Notebook-Code an, um zu prüfen, ob Sie alle Änderungen vorgenommen haben. Sie sollte in etwa so aussehen:
import apache_beam as beam
from apache_beam.options import pipeline_options
from apache_beam.options.pipeline_options import GoogleCloudOptions
import google.auth
from apache_beam.runners import DataflowRunner

# Setting up the Apache Beam pipeline options.
options = pipeline_options.PipelineOptions()

# Set the pipeline mode to stream the data from Pub/Sub.
options.view_as(pipeline_options.StandardOptions).streaming = True

# Sets the project to the default project in your current Google Cloud environment.
# The project will be used for creating a subscription to the Pub/Sub topic.
_, options.view_as(GoogleCloudOptions).project = google.auth.default()

# Set the Google Cloud region to run Dataflow.
options.view_as(GoogleCloudOptions).region = 'us-central1'

# Choose a Cloud Storage location.
dataflow_gcs_location = 'gs://<change me>/dataflow'

# Choose a location for your output files
output_gcs_location = '%s/results/' % dataflow_gcs_location


# Set the staging location. This location is used to stage the
# Dataflow pipeline and SDK binary.
options.view_as(GoogleCloudOptions).staging_location = '%s/staging' % dataflow_gcs_location

# Set the temporary location. This location is used to store temporary files
# or intermediate results before outputting to the sink.
options.view_as(GoogleCloudOptions).temp_location = '%s/temp' % dataflow_gcs_location



# The Google Cloud PubSub topic for this example.
topic = "projects/pubsub-public-data/topics/shakespeare-kinglear"

p = beam.Pipeline(DataflowRunner(), options=options)
words = p | "read" >> beam.io.ReadFromPubSub(topic=topic)
windowed_words = (words
   | "window" >> beam.WindowInto(beam.window.FixedWindows(10)))

windowed_words_counts = (windowed_words
   | "count" >> beam.combiners.Count.PerElement())

windowed_lower_word_counts = (windowed_words
   | "to lower case" >> beam.Map(lambda word: word.lower())
   | "count lowered" >> beam.combiners.Count.PerElement())

result = (windowed_lower_word_counts
    | "decode words and format" >> beam.Map(lambda x: f"{x[0].decode('utf-8')}: {x[1]}")
    | "write events to file" >> beam.io.fileio.WriteToFiles(path=output_gcs_location, shards=1, max_writers_per_bundle=0))

p.run()
  1. Führen Sie die Zellen aus.
  2. Die Ausgabe sollte in etwa so aussehen:
<DataflowPipelineResult <Job
 clientRequestId: '20230623100011457336-8998'
 createTime: '2023-06-23T10:00:33.447347Z'
 currentStateTime: '1970-01-01T00:00:00Z'
 id: '2023-06-23_03_00_33-11346237320103246437'
 location: 'us-central1'
 name: 'beamapp-root-0623075553-503897-boh4u4wb'
 projectId: 'your-project-id'
 stageStates: []
 startTime: '2023-06-23T10:00:33.447347Z'
 steps: []
 tempFiles: []
 type: TypeValueValuesEnum(JOB_TYPE_STREAMING, 2)> at 0x7fc7e4084d60>
  1. Rufen Sie die Seite „Jobs“ für Dataflow auf, um zu prüfen, ob der Job ausgeführt wird. In der Liste sollte ein neuer Job angezeigt werden. Es dauert etwa 5 bis 10 Minuten, bis mit der Verarbeitung der Daten begonnen wird.
  2. Sobald die Daten verarbeitet werden, rufen Sie Cloud Storage auf und navigieren Sie zu dem Verzeichnis, in dem Dataflow die Ergebnisse speichert (Ihr definierter output_gcs_location). Dort sollte eine Liste von Textdateien angezeigt werden, mit einer Datei pro Fenster. bfcc5ce9e46a8b14.png
  3. Laden Sie die Datei herunter und prüfen Sie den Inhalt. Sie sollte die Liste der Wörter zusammen mit ihrer Anzahl enthalten. Alternativ können Sie die Dateien über die Befehlszeile prüfen. Führen Sie dazu den folgenden Code in einer neuen Zelle in Ihrem Notebook aus:
! gsutils cat gs://<your-bucket-here>/results/<file-name>
  1. Die Ausgabe sollte in etwa so aussehen:

Safer: 1

trust: 1

mercy: 1

harms: 1

far: 2

fear: 1

than: 1

take: 1

me: 1

goneril: 1

still: 1

away: 1

let: 1

too: 2

the: 1

  1. Geschafft! Vergessen Sie nicht, den erstellten Job zu bereinigen und zu beenden (siehe letzten Schritt dieses Codelabs).

Ein Beispiel zur Durchführung dieser Konvertierung in einem interaktiven Notebook finden Sie im Dataflow-Notebook "Word Count" in Ihrer Notebookinstanz.

Alternativ können Sie Ihr Notebook als ausführbares Skript exportieren, die generierte .py-Datei mithilfe der vorherigen Schritte ändern und dann Ihre Pipeline im Dataflow-Dienst bereitstellen.

7. Notebook speichern

Von Ihnen erstellte Notebooks werden lokal in Ihrer ausgeführten Notebookinstanz gespeichert. Wenn Sie die Notebookinstanz zurücksetzen oder während der Entwicklung herunterfahren, werden diese neuen Notebooks beibehalten, solange sie im Verzeichnis /home/jupyter erstellt werden. Wenn eine Notebookinstanz jedoch gelöscht wird, werden diese Notebooks auch gelöscht.

Zur Beibehaltung der Notebooks für die zukünftige Verwendung laden Sie sie lokal auf Ihre Workstation herunter, speichern Sie sie in GitHub oder exportieren Sie sie in ein anderes Dateiformat.

8. Bereinigen

Nachdem Sie die Verwendung der Apache Beam-Notebookinstanz abgeschlossen haben, bereinigen Sie die in Google Cloud erstellten Ressourcen, indem Sie die Notebookinstanz herunterfahren und den Streamingjob beenden, falls Sie einen ausgeführt haben.

Wenn Sie ein Projekt nur für dieses Codelab erstellt haben, können Sie es auch vollständig schließen.