1. Einführung
Google Cloud Dataflow
Zuletzt aktualisiert:5. Juli 2023
Was ist Dataflow?
Dataflow ist ein verwalteter Dienst zur Ausführung eines breiten Spektrums an Datenverarbeitungsmustern. Die Dokumentation auf dieser Website zeigt Ihnen, wie Sie Ihre Pipelines für die Batch- und Streaming-Datenverarbeitung mit Dataflow bereitstellen, einschließlich Anweisungen zur Verwendung von Dienstfunktionen.
Das Apache Beam SDK ist ein Open-Source-Programmiermodell, mit dem Sie sowohl Batch- als auch Streamingpipelines entwickeln können. Sie erstellen Ihre Pipelines mit einem Apache Beam-Programm und führen sie dann im Dataflow-Dienst aus. Die Apache Beam-Dokumentation enthält detaillierte konzeptionelle Informationen und Referenzmaterial für das Apache Beam-Programmiermodell, SDKs und andere Runner.
Datenanalysen schnell streamen
Dataflow ermöglicht die schnelle, vereinfachte Entwicklung von Streamingdaten-Pipelines mit geringerer Datenlatenz.
Betrieb und Verwaltung vereinfachen
Dank des serverlosen Ansatzes von Dataflow entfällt der operative Aufwand von Data Engineering-Arbeitslasten, sodass sich Teams auf das Programmieren konzentrieren können, anstatt Servercluster zu verwalten.
Gesamtbetriebskosten senken
Durch das Autoscaling von Ressourcen und eine kostenoptimierte Batchverarbeitung bietet Dataflow praktisch unbegrenzte Kapazitäten für Ihre saisonalen Arbeitslasten und Lastspitzen, ohne dass unnötige Kosten anfallen.
Wichtige Funktionen
Automatisierte Ressourcenverwaltung und dynamischer Arbeitsausgleich
Dataflow automatisiert die Bereitstellung und Verwaltung von Verarbeitungsressourcen, um die Latenz zu minimieren und die Auslastung zu maximieren, sodass Sie Instanzen nicht manuell erstellen oder reservieren müssen. Die Arbeitsaufteilung wird ebenfalls automatisiert und optimiert, sodass Arbeitsverzögerungen dynamisch ausgeglichen werden. Keine Lust, nach den Hotkeys zu suchen oder Ihre Eingabedaten vorverarbeiten.
Horizontales Autoscaling
Durch horizontales Autoscaling von Worker-Ressourcen für einen optimalen Durchsatz wird das gesamte Preis-Leistungs-Verhältnis verbessert.
Flexible Ressourcenpreisplanung für die Batchverarbeitung
Zur flexiblen Verarbeitung bei der zeitlichen Planung von Jobs, z. B. über Nacht, bietet die flexible Ressourcenplanung (FlexRS) einen niedrigeren Preis für die Batchverarbeitung. Diese flexiblen Jobs werden in einer Warteschlange mit der Garantie platziert, dass sie innerhalb von sechs Stunden abgerufen und ausgeführt werden.
Was Sie im Rahmen dieses Kurses durchführen
Wenn Sie den interaktiven Apache Beam-Runner mit JupyterLab-Notebooks verwenden, können Sie Pipelines iterativ entwickeln, Ihre Pipelinegrafik prüfen und einzelne PCollections in einem REPL-Workflow (Read-Eval-Print-Loop) analysieren. Diese Apache Beam-Notebooks werden über Vertex AI Workbench zur Verfügung gestellt, einen verwalteten Dienst, der virtuelle Notebook-Maschinen hostet, auf denen die neuesten Data-Science- und ML-Frameworks vorinstalliert sind.
Dieses Codelab konzentriert sich auf die Funktionen, die von Apache Beam-Notebooks eingeführt werden.
Aufgaben in diesem Lab
- Notebookinstanz erstellen
- Einfache Pipeline erstellen
- Daten aus unbegrenzter Quelle lesen
- Daten visualisieren
- Dataflow-Job vom Notebook aus starten
- Notebook speichern
Voraussetzungen
- Ein Google Cloud Platform-Projekt mit aktivierter Abrechnung.
- Google Cloud Dataflow und Google Cloud PubSub sind aktiviert.
2. Einrichtung
- Wählen Sie in der Cloud Console auf der Seite für die Projektauswahl ein Cloud-Projekt aus oder erstellen Sie eines.
Achten Sie darauf, dass die folgenden APIs aktiviert sind:
- Dataflow API
- Cloud Pub/Sub API
- Compute Engine
- Notebooks API
Sie können dies überprüfen, indem Sie die & Seite „Dienste“.
In dieser Anleitung lesen wir Daten aus einem Pub/Sub-Abo. Achten Sie daher darauf, dass das Compute Engine-Standarddienstkonto die Rolle „Bearbeiter“ hat, oder gewähren Sie ihm die Rolle „Pub/Sub-Bearbeiter“.
3. Erste Schritte mit Apache Beam-Notebooks
Apache Beam-Notebookinstanz starten
- Starten Sie Dataflow in der Console:
- Wählen Sie im Menü auf der linken Seite die Seite Workbench aus.
- Achten Sie darauf, dass Sie sich auf dem Tab Nutzerverwaltete Notebooks befinden.
- Klicken Sie in der Symbolleiste auf Neues Notebook.
- Wählen Sie Apache Beam > Ohne GPUs aus.
- Wählen Sie auf der Seite Neues Notebook ein Subnetzwerk für die Notebook-VM aus und klicken Sie auf Erstellen.
- Klicken Sie auf JupyterLab öffnen, wenn der Link aktiv wird. Vertex AI Workbench erstellt eine neue Apache Beam-Notebookinstanz.
4. Pipeline erstellen
Notebookinstanz erstellen
Wählen Sie Datei > Neu > Notebook und wählen Sie einen Kernel mit Apache Beam 2.47 oder höher aus.
Mit dem Hinzufügen von Code zu Ihrem Notebook beginnen
- Kopieren Sie den Code aus jedem Abschnitt und fügen Sie ihn in eine neue Zelle in Ihrem Notebook ein
- Zelle ausführen
Wenn Sie den interaktiven Apache Beam-Runner mit JupyterLab-Notebooks verwenden, können Sie Pipelines iterativ entwickeln, Ihre Pipelinegrafik prüfen und einzelne PCollections in einem REPL-Workflow (Read-Eval-Print-Loop) analysieren.
Apache Beam ist auf Ihrer Notebookinstanz installiert. Fügen Sie daher die Module interactive_runner
und interactive_beam
bei Ihrem Notebook ein.
import apache_beam as beam
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
import apache_beam.runners.interactive.interactive_beam as ib
Wenn Ihr Notebook andere Google-Dienste verwendet, fügen Sie die folgenden Importanweisungen hinzu:
from apache_beam.options import pipeline_options
from apache_beam.options.pipeline_options import GoogleCloudOptions
import google.auth
Interaktivitätsoptionen festlegen
Im Folgenden wird die Dauer der Datenerfassung auf 60 Sekunden festgelegt. Wenn Sie die Iteration beschleunigen möchten, legen Sie eine kürzere Dauer fest, zum Beispiel „10s“.
ib.options.recording_duration = '60s'
Weitere interaktive Optionen finden Sie in der interactive_beam.options-Klasse.
Initialisieren Sie die Pipeline mit einem InteractiveRunner
-Objekt.
# Setting up the Apache Beam pipeline options.
options = pipeline_options.PipelineOptions()
# Set the pipeline mode to stream the data from Pub/Sub.
options.view_as(pipeline_options.StandardOptions).streaming = True
# Sets the project to the default project in your current Google Cloud environment.
# The project will be used for creating a subscription to the Pub/Sub topic.
_, options.view_as(GoogleCloudOptions).project = google.auth.default()
# The Google Cloud PubSub topic for this example.
topic = "projects/pubsub-public-data/topics/shakespeare-kinglear"
p = beam.Pipeline(InteractiveRunner(), options=options)
Daten lesen und visualisieren
Das folgende Beispiel zeigt eine Apache Beam-Pipeline, die ein Abo für das angegebene Pub/Sub-Thema erstellt und aus dem Abo liest.
words = p | "read" >> beam.io.ReadFromPubSub(topic=topic)
Die Pipeline zählt die Wörter nach Fenstern aus der Quelle. Sie erstellt feste Fensteraufrufe mit einer Dauer von jeweils 10 Sekunden pro Fenster.
windowed_words = (words
| "window" >> beam.WindowInto(beam.window.FixedWindows(10)))
Nachdem die Daten im Fenstermodus angezeigt werden, werden die Wörter pro Fenster gezählt.
windowed_word_counts = (windowed_words
| "count" >> beam.combiners.Count.PerElement())
Daten visualisieren
Die Methode show()
visualisiert die resultierende PCollection im Notebook.
ib.show(windowed_word_counts, include_window_info=True)
Übergeben Sie visualize_data=True
an die show()
-Methode, um Ihre Daten visuell darzustellen. So fügen Sie eine neue Zelle hinzu:
ib.show(windowed_word_counts, include_window_info=True, visualize_data=True)
Sie können mehrere Filter auf Ihre Visualisierungen anwenden. Mit der folgenden Visualisierung können Sie nach Label und Achse filtern:
5. Pandas DataFrame verwenden
Eine weitere nützliche Visualisierung in Apache Beam-Notebooks ist ein Pandas DataFrame. Im folgenden Beispiel werden die Wörter zuerst in Kleinbuchstaben umgewandelt und dann die Häufigkeit jedes Worts berechnet.
windowed_lower_word_counts = (windowed_words
| "to lower case" >> beam.Map(lambda word: word.lower())
| "count lowered" >> beam.combiners.Count.PerElement())
Die collect()
-Methode stellt die Ausgabe in einem Pandas-DataFrame bereit.
ib.collect(windowed_lower_word_counts, include_window_info=True)
6. (Optional) Dataflow-Jobs von Ihrem Notebook aus starten
- Zum Ausführen von Jobs in Dataflow benötigen Sie zusätzliche Berechtigungen. Achten Sie darauf, dass das Compute Engine-Standarddienstkonto die Rolle „Bearbeiter“ hat, oder weisen Sie ihm die folgenden IAM-Rollen zu:
- Dataflow-Administrator
- Dataflow-Worker
- Storage Admin und
- Dienstkontonutzer (roles/iam.serviceAccountUser)
Weitere Informationen zu Rollen finden Sie in der Dokumentation.
- (Optional) Starten Sie den Kernel neu, starten Sie alle Zellen neu und überprüfen Sie die Ausgabe, bevor Sie Ihr Notebook zum Ausführen von Dataflow-Jobs verwenden.
- Entfernen Sie die folgenden Importanweisungen:
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
import apache_beam.runners.interactive.interactive_beam as ib
- Fügen Sie die folgende Importanweisung hinzu:
from apache_beam.runners import DataflowRunner
- Entfernen Sie die folgende Option für die Aufnahmedauer:
ib.options.recording_duration = '60s'
- Fügen Sie den Pipelineoptionen Folgendes hinzu. Sie müssen den Cloud Storage-Speicherort so anpassen, dass er auf einen Bucket verweist, der Ihnen bereits gehört. Alternativ können Sie zu diesem Zweck einen neuen Bucket erstellen. Sie können den Regionswert auch von
us-central1
ändern.
# Set the Google Cloud region to run Dataflow.
options.view_as(GoogleCloudOptions).region = 'us-central1'
# Choose a Cloud Storage location.
dataflow_gcs_location = 'gs://<change me>/dataflow'
# Choose a location for your output files
output_gcs_location = '%s/results/' % dataflow_gcs_location
# Set the staging location. This location is used to stage the
# Dataflow pipeline and SDK binary.
options.view_as(GoogleCloudOptions).staging_location = '%s/staging' % dataflow_gcs_location
# Set the temporary location. This location is used to store temporary files
# or intermediate results before outputting to the sink.
options.view_as(GoogleCloudOptions).temp_location = '%s/temp' % dataflow_gcs_location
- Ersetzen Sie im Konstruktor von
beam.Pipeline()
InteractiveRunner
durchDataflowRunner
.p
ist das Pipelineobjekt, das beim Erstellen der Pipeline verwendet wird.
p = beam.Pipeline(DataflowRunner(), options=options)
- Entfernen Sie die interaktiven Aufrufe aus Ihrem Code. Entfernen Sie beispielsweise
show()
,collect()
,head()
,show_graph()
undwatch()
aus Ihrem Code. - Damit Ergebnisse angezeigt werden, müssen Sie eine Senke hinzufügen. Im vorherigen Abschnitt haben wir Ergebnisse im Notebook visualisiert. Dieses Mal führen wir den Job jedoch außerhalb dieses Notebooks aus – in Dataflow. Daher benötigen wir einen externen Speicherort für unsere Ergebnisse. In diesem Beispiel schreiben wir die Ergebnisse in Textdateien in GCS (Google Cloud Storage). Da dies eine Streaming-Pipeline mit Daten-Windowing ist, möchten wir eine Textdatei pro Fenster erstellen. Fügen Sie dazu die folgenden Schritte zu Ihrer Pipeline hinzu:
result = (windowed_lower_word_counts
| "decode words and format" >> beam.Map(lambda x: f"{x[0].decode('utf-8')}: {x[1]}")
| "write events to file" >> beam.io.fileio.WriteToFiles(path=output_gcs_location, shards=1, max_writers_per_bundle=0))
- Fügen Sie am Ende des Pipelinecodes
p.run()
hinzu. - Überprüfen Sie nun Ihren Notebook-Code, um sicherzustellen, dass alle Änderungen berücksichtigt wurden. Sie sollte in etwa so aussehen:
import apache_beam as beam
from apache_beam.options import pipeline_options
from apache_beam.options.pipeline_options import GoogleCloudOptions
import google.auth
from apache_beam.runners import DataflowRunner
# Setting up the Apache Beam pipeline options.
options = pipeline_options.PipelineOptions()
# Set the pipeline mode to stream the data from Pub/Sub.
options.view_as(pipeline_options.StandardOptions).streaming = True
# Sets the project to the default project in your current Google Cloud environment.
# The project will be used for creating a subscription to the Pub/Sub topic.
_, options.view_as(GoogleCloudOptions).project = google.auth.default()
# Set the Google Cloud region to run Dataflow.
options.view_as(GoogleCloudOptions).region = 'us-central1'
# Choose a Cloud Storage location.
dataflow_gcs_location = 'gs://<change me>/dataflow'
# Choose a location for your output files
output_gcs_location = '%s/results/' % dataflow_gcs_location
# Set the staging location. This location is used to stage the
# Dataflow pipeline and SDK binary.
options.view_as(GoogleCloudOptions).staging_location = '%s/staging' % dataflow_gcs_location
# Set the temporary location. This location is used to store temporary files
# or intermediate results before outputting to the sink.
options.view_as(GoogleCloudOptions).temp_location = '%s/temp' % dataflow_gcs_location
# The Google Cloud PubSub topic for this example.
topic = "projects/pubsub-public-data/topics/shakespeare-kinglear"
p = beam.Pipeline(DataflowRunner(), options=options)
words = p | "read" >> beam.io.ReadFromPubSub(topic=topic)
windowed_words = (words
| "window" >> beam.WindowInto(beam.window.FixedWindows(10)))
windowed_words_counts = (windowed_words
| "count" >> beam.combiners.Count.PerElement())
windowed_lower_word_counts = (windowed_words
| "to lower case" >> beam.Map(lambda word: word.lower())
| "count lowered" >> beam.combiners.Count.PerElement())
result = (windowed_lower_word_counts
| "decode words and format" >> beam.Map(lambda x: f"{x[0].decode('utf-8')}: {x[1]}")
| "write events to file" >> beam.io.fileio.WriteToFiles(path=output_gcs_location, shards=1, max_writers_per_bundle=0))
p.run()
- Führen Sie die Zellen aus.
- Die Ausgabe sollte in etwa so aussehen:
<DataflowPipelineResult <Job
clientRequestId: '20230623100011457336-8998'
createTime: '2023-06-23T10:00:33.447347Z'
currentStateTime: '1970-01-01T00:00:00Z'
id: '2023-06-23_03_00_33-11346237320103246437'
location: 'us-central1'
name: 'beamapp-root-0623075553-503897-boh4u4wb'
projectId: 'your-project-id'
stageStates: []
startTime: '2023-06-23T10:00:33.447347Z'
steps: []
tempFiles: []
type: TypeValueValuesEnum(JOB_TYPE_STREAMING, 2)> at 0x7fc7e4084d60>
- Rufen Sie die Seite „Jobs“ für Dataflow auf, um zu prüfen, ob der Job ausgeführt wird. In der Liste sollte nun ein neuer Job zu sehen sein. Es dauert etwa 5 bis 10 Minuten, bis der Job mit der Verarbeitung der Daten beginnt.
- Wechseln Sie nach der Datenverarbeitung zu Cloud Storage und rufen Sie das Verzeichnis auf, in dem Dataflow die Ergebnisse speichert (Ihr definierter
output_gcs_location
). Sie sollten eine Liste mit Textdateien mit einer Datei pro Fenster sehen. - Laden Sie die Datei herunter und prüfen Sie den Inhalt. Er sollte die Liste der Wörter und ihre jeweilige Anzahl enthalten. Alternativ können Sie die Dateien über die Befehlszeile prüfen. Führen Sie dazu folgenden Befehl in einer neuen Zelle in Ihrem Notebook aus:
! gsutils cat gs://<your-bucket-here>/results/<file-name>
- Die Ausgabe sollte in etwa so aussehen:
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
- Fertig! Vergessen Sie nicht, den von Ihnen erstellten Job zu bereinigen und zu stoppen (siehe letzten Schritt dieses Codelabs).
Ein Beispiel zur Durchführung dieser Konvertierung in einem interaktiven Notebook finden Sie im Dataflow-Notebook "Word Count" in Ihrer Notebookinstanz.
Alternativ können Sie Ihr Notebook als ausführbares Skript exportieren, die generierte PY-Datei mithilfe der vorherigen Schritte ändern und dann Ihre Pipeline im Dataflow-Dienst bereitstellen.
7. Notebook speichern
Von Ihnen erstellte Notebooks werden lokal in Ihrer ausgeführten Notebookinstanz gespeichert. Wenn Sie die Notebookinstanz während der Entwicklung zurücksetzen oder herunterfahren, bleiben diese neuen Notebooks bestehen, solange sie im Verzeichnis /home/jupyter
erstellt werden. Wenn eine Notebookinstanz jedoch gelöscht wird, werden diese Notebooks auch gelöscht.
Um Ihre Notebooks für die zukünftige Verwendung zu behalten, laden Sie sie lokal auf Ihre Workstation herunter, speichern Sie sie auf GitHub oder exportieren Sie sie in ein anderes Dateiformat.
8. Bereinigen
Nachdem Sie die Verwendung Ihrer Apache Beam-Notebookinstanz abgeschlossen haben, bereinigen Sie die in Google Cloud erstellten Ressourcen. Beenden Sie die Notebookinstanz und beenden Sie den Streamingjob, falls Sie eine ausgeführt haben.
Wenn Sie ein Projekt ausschließlich für dieses Codelab erstellt haben, können Sie das Projekt auch vollständig beenden.