1. Übersicht

Was ist Dataflow?
Dataflow ist ein verwalteter Dienst zur Ausführung eines breiten Spektrums an Datenverarbeitungsmustern. Die Dokumentation auf dieser Website zeigt Ihnen, wie Sie Ihre Batch- und Streaming-Datenverarbeitungspipelines mit Dataflow bereitstellen. Sie enthält auch Anweisungen zur Verwendung der Servicefunktionen.
Das Apache Beam SDK ist ein Open-Source-Programmiermodell, mit dem Sie sowohl Batch- als auch Streaming-Pipelines entwickeln können. Sie erstellen Ihre Pipelines mit einem Apache Beam-Programm und führen sie dann im Dataflow-Dienst aus. Die Apache Beam-Dokumentation enthält ausführliche konzeptionelle Informationen und Referenzmaterial für das Apache Beam-Programmiermodell, SDKs und andere Runner.
Analyse von Streamingdaten mit hoher Geschwindigkeit
Dataflow ermöglicht die schnelle, vereinfachte Entwicklung von Streamingdaten-Pipelines mit besonders niedriger Latenz.
Vorgänge und Verwaltung vereinfachen
Dank des serverlosen Ansatzes von Dataflow entfällt der operative Aufwand von Data Engineering-Arbeitslasten, sodass sich Teams auf das Programmieren konzentrieren können und sich nicht um die Verwaltung von Serverclustern kümmern müssen.
Gesamtbetriebskosten reduzieren
Durch das Autoscaling von Ressourcen und eine kostenoptimierte Batchverarbeitung stellt Dataflow praktisch unbegrenzte Kapazitäten für Ihre nur temporär auftretenden Arbeitslasten und Lastspitzen bereit, ohne dass übermäßige Kosten anfallen.
Wichtige Funktionen
Automatisierte Ressourcenverwaltung und dynamischer Arbeitsausgleich
Dataflow automatisiert die Bereitstellung und Verwaltung von Verarbeitungsressourcen, sodass die Latenz minimiert und die Ressourcennutzung optimiert wird. Instanzen müssen dadurch nicht mehr manuell erstellt oder reserviert werden. Die Arbeitsaufteilung wird ebenfalls automatisiert und optimiert, sodass Arbeitsverzögerungen dynamisch ausgeglichen werden. Sie müssen nicht mehr nach „Hot Keys“ suchen, bei denen es durch hohe Aufrufraten zu Verzögerungen kommt, oder Ihre Eingabedaten vorverarbeiten.
Horizontales Autoscaling
Durch horizontales Autoscaling von Worker-Ressourcen zur Durchsatzoptimierung wird das gesamte Preis-Leistungs-Verhältnis verbessert.
Flexible Ressourcenplanungspreise für die Batchverarbeitung
Zur flexiblen Verarbeitung im Rahmen der zeitlichen Planung von Jobs, zum Beispiel über Nacht, ergibt sich mit der flexiblen Ressourcenplanung (FlexRS) ein günstigerer Preis für die Batchverarbeitung. Diese flexiblen Jobs werden in einer Warteschlange mit der Garantie platziert, dass sie innerhalb von sechs Stunden abgerufen und ausgeführt werden.
Diese Anleitung basiert auf https://cloud.google.com/dataflow/docs/quickstarts/quickstart-java-maven.
Lerninhalte
- Maven-Projekt mit Apache Beam und dem Java SDK erstellen
- Beispiel-Pipeline mithilfe der Google Cloud Platform Console ausführen
- Zugehörigen Cloud Storage-Bucket und seine Inhalte löschen
Voraussetzungen
Wie werden Sie diese Anleitung verwenden?
Wie würden Sie Ihre Erfahrungen mit der Verwendung von Google Cloud Platform-Diensten bewerten?
2. Einrichtung und Anforderungen
Umgebung zum selbstbestimmten Lernen einrichten
- Melden Sie sich in der Cloud Console an und erstellen Sie ein neues Projekt oder verwenden Sie ein vorhandenes Projekt. Wenn Sie noch kein Gmail- oder G Suite-Konto haben, müssen Sie eines erstellen.
Notieren Sie sich die Projekt-ID, also den projektübergreifend nur einmal vorkommenden Namen eines Google Cloud-Projekts. Der oben angegebene Name ist bereits vergeben und kann leider nicht mehr verwendet werden. Sie wird später in diesem Codelab als PROJECT_ID bezeichnet.
- Als Nächstes müssen Sie die Abrechnung in der Cloud Console aktivieren, um Google Cloud-Ressourcen verwenden zu können.
Die Durchführung dieses Codelabs sollte keine oder nur geringe Kosten verursachen. Folgen Sie bitte der Anleitung im Abschnitt „Bereinigen“, in der Sie erfahren, wie Sie Ressourcen herunterfahren können, damit nach Abschluss dieser Anleitung keine Gebühren anfallen. Neue Nutzer von Google Cloud kommen für das Programm für den kostenlosen Testzeitraum mit einem Guthaben von 300$ infrage.
APIs aktivieren
Klicken Sie auf das Menüsymbol oben links auf dem Bildschirm.

Wählen Sie im Drop-down-Menü APIs & Dienste > Dashboard aus.

Wählen Sie + APIs und Dienste aktivieren aus.

Suchen Sie im Suchfeld nach „Compute Engine“. Klicken Sie in der angezeigten Ergebnisliste auf „Compute Engine API“.

Klicken Sie auf der Seite „Google Compute Engine“ auf Aktivieren.

Klicken Sie nach der Aktivierung auf den Pfeil, um zurückzugehen.
Suchen Sie nun nach den folgenden APIs und aktivieren Sie sie ebenfalls:
- Cloud Dataflow
- Stackdriver
- Cloud Storage
- Cloud Storage JSON
- BigQuery
- Cloud Pub/Sub
- Cloud Datastore
- Cloud Resource Manager APIs
3. Neuen Cloud Storage-Bucket erstellen
Klicken Sie in der Google Cloud Platform Console oben links auf das Symbol Menü:

Scrollen Sie nach unten und wählen Sie im Unterabschnitt Speicher die Option Cloud Storage > Browser aus:

Sie sollten jetzt den Cloud Storage-Browser sehen. Wenn Sie ein Projekt verwenden, in dem derzeit keine Cloud Storage-Buckets vorhanden sind, werden Sie aufgefordert, einen neuen Bucket zu erstellen. Klicken Sie auf die Schaltfläche Bucket erstellen, um einen Bucket zu erstellen:

Geben Sie einen Namen für den Bucket ein. Wie im Dialogfeld angegeben, müssen Bucket-Namen in Cloud Storage eindeutig sein. Wenn Sie also einen offensichtlichen Namen wie „test“ auswählen, hat wahrscheinlich schon jemand einen Bucket mit diesem Namen erstellt und Sie erhalten eine Fehlermeldung.
Außerdem gibt es einige Regeln dazu, welche Zeichen in Bucket-Namen zulässig sind. Wenn Sie den Bucket-Namen mit einem Buchstaben oder einer Zahl beginnen und beenden und nur Bindestriche in der Mitte verwenden, ist alles in Ordnung. Wenn Sie versuchen, Sonderzeichen zu verwenden oder den Bucket-Namen mit etwas anderem als einem Buchstaben oder einer Ziffer beginnen oder beenden, werden Sie im Dialogfeld an die Regeln erinnert.

Geben Sie einen eindeutigen Namen für den Bucket ein und klicken Sie auf Erstellen. Wenn Sie etwas auswählen, das bereits verwendet wird, wird die oben gezeigte Fehlermeldung angezeigt. Anschließend sehen Sie den neuen, leeren Bucket im Browser:

Der Bucket-Name, den Sie sehen, ist natürlich anders, da er für alle Projekte eindeutig sein muss.
4. Cloud Shell starten
Cloud Shell aktivieren
- Klicken Sie in der Cloud Console auf Cloud Shell aktivieren
.
Wenn Sie Cloud Shell noch nie gestartet haben, wird ein Fenster mit einer Beschreibung eingeblendet. Klicken Sie in diesem Fall einfach auf Weiter. So sieht dieses Fenster aus:
Das Herstellen der Verbindung mit der Cloud Shell sollte nur wenige Augenblicke dauern.
Diese virtuelle Maschine verfügt über sämtliche Entwicklertools, die Sie benötigen. Sie bietet ein Basisverzeichnis mit 5 GB nichtflüchtigem Speicher und läuft in Google Cloud, was die Netzwerkleistung und Authentifizierung erheblich verbessert. Die meisten, wenn nicht sogar alle Aufgaben in diesem Codelab können mit einem Browser oder Ihrem Chromebook erledigt werden.
Sobald die Verbindung mit der Cloud Shell hergestellt ist, sehen Sie, dass Sie bereits authentifiziert sind und für das Projekt schon Ihre Projekt-ID eingestellt ist.
- Führen Sie in der Cloud Shell den folgenden Befehl aus, um zu prüfen, ob Sie authentifiziert sind:
gcloud auth list
Befehlsausgabe
Credentialed Accounts
ACTIVE ACCOUNT
* <my_account>@<my_domain.com>
To set the active account, run:
$ gcloud config set account `ACCOUNT`
gcloud config list project
Befehlsausgabe
[core] project = <PROJECT_ID>
Ist dies nicht der Fall, können Sie die Einstellung mit diesem Befehl vornehmen:
gcloud config set project <PROJECT_ID>
Befehlsausgabe
Updated property [core/project].
5. Maven-Projekt erstellen
Nachdem Cloud Shell gestartet wurde, erstellen wir zuerst ein Maven-Projekt mit dem Java SDK für Apache Beam.
Apache Beam ist ein Open-Source-Programmiermodell für Datenpipelines. Sie definieren diese Pipelines mit einem Apache Beam-Programm und können einen Runner wie Dataflow zum Ausführen Ihrer Pipeline auswählen.
Führen Sie den Befehl mvn archetype:generate in der Shell so aus:
mvn archetype:generate \
-DarchetypeGroupId=org.apache.beam \
-DarchetypeArtifactId=beam-sdks-java-maven-archetypes-examples \
-DarchetypeVersion=2.46.0 \
-DgroupId=org.example \
-DartifactId=first-dataflow \
-Dversion="0.1" \
-Dpackage=org.apache.beam.examples \
-DinteractiveMode=false
Nachdem Sie den Befehl ausgeführt haben, sollte ein neues Verzeichnis namens first-dataflow unter dem aktuellen Verzeichnis angezeigt werden. first-dataflow enthält ein Maven-Projekt mit dem Cloud Dataflow SDK für Java und Beispielpipelines.
6. Textverarbeitungspipeline in Cloud Dataflow ausführen
Speichern Sie zuerst die Projekt-ID und die Namen der Cloud Storage-Buckets als Umgebungsvariablen. Sie können dies in Cloud Shell tun. Ersetzen Sie <your_project_id> durch Ihre eigene Projekt-ID.
export PROJECT_ID=<your_project_id>
Wiederholen Sie den Vorgang für den Cloud Storage-Bucket. Ersetzen Sie <your_bucket_name> durch den eindeutigen Namen, den Sie zum Erstellen des Buckets in einem früheren Schritt verwendet haben.
export BUCKET_NAME=<your_bucket_name>
Wechseln Sie zum Verzeichnis first-dataflow/.
cd first-dataflow
Wir führen eine Pipeline mit dem Namen "WordCount" aus, die Text liest, Textzeilen in einzelne Wörter tokenisiert und für jedes dieser Wörter eine Häufigkeitszählung durchführt. Zuerst führen wir die Pipeline aus und sehen uns dann an, was bei den einzelnen Schritten passiert.
Starten Sie die Pipeline, indem Sie den Befehl mvn compile exec:java in der Shell oder im Terminalfenster ausführen. Für die Argumente --project, --stagingLocation, und --output wird im Befehl unten auf die Umgebungsvariablen verwiesen, die Sie zuvor in diesem Schritt eingerichtet haben.
mvn compile exec:java \
-Pdataflow-runner compile exec:java \
-Dexec.mainClass=org.apache.beam.examples.WordCount \
-Dexec.args="--project=${PROJECT_ID} \
--stagingLocation=gs://${BUCKET_NAME}/staging/ \
--output=gs://${BUCKET_NAME}/output \
--runner=DataflowRunner \
--region=us-central1 \
--gcpTempLocation=gs://${BUCKET_NAME}/temp"
Suchen Sie den Job in der Jobliste, solange der Job ausgeführt wird.
Öffnen Sie die Cloud Dataflow-Web-UI in der Google Cloud Platform Console. Für den Job zum Zählen der Wörter sollte nun der Status Wird ausgeführt angezeigt werden:

Sehen wir uns nun die Pipeline-Parameter an. Klicken Sie zuerst auf den Namen des Jobs.

Wenn Sie einen Job auswählen, können Sie sich die Ausführungsgrafik ansehen. Darin wird jede Transformation in der Pipeline als Feld dargestellt, das den Transformationsnamen und einige Statusangaben enthält. Falls Sie weitere Details zu einem Schritt sehen möchten, klicken Sie auf das Caret-Zeichen oben rechts.

So wandelt die Pipeline die Daten in den einzelnen Schritten um:
- Lesen: In diesem Schritt liest die Pipeline aus einer Eingabequelle. In diesem Fall ist es eine Textdatei aus Cloud Storage mit dem gesamten Text des Shakespeare-Stücks King Lear. Unsere Pipeline liest die Datei zeilenweise und gibt für jede Zeile eine
PCollectionaus. Jede Zeile in unserer Textdatei ist ein Element in der Sammlung. - CountWords: Der Schritt
CountWordszum Zählen der Wörter besteht aus zwei Teilen. Zuerst wird eine parallele Do-Funktion (ParDo) mit dem NamenExtractWordsverwendet, um jede Zeile in einzelne Wörter zu zerlegen. Die Ausgabe von ExtractWords ist eine neue PCollection, in der jedes Element ein Wort ist. Im nächsten Schritt,Count, wird eine Transformation verwendet, die vom Java SDK bereitgestellt wird und Schlüssel/Wert-Paare zurückgibt, wobei der Schlüssel ein eindeutiges Wort und der Wert die Anzahl der Vorkommen ist. Unten sehen Sie die Methode zur Implementierung vonCountWords. Die vollständige Datei „WordCount.java“ finden Sie auf GitHub:
/**
* A PTransform that converts a PCollection containing lines of text into a PCollection of
* formatted word counts.
*
* <p>Concept #3: This is a custom composite transform that bundles two transforms (ParDo and
* Count) as a reusable PTransform subclass. Using composite transforms allows for easy reuse,
* modular testing, and an improved monitoring experience.
*/
public static class CountWords
extends PTransform<PCollection<String>, PCollection<KV<String, Long>>> {
@Override
public PCollection<KV<String, Long>> expand(PCollection<String> lines) {
// Convert lines of text into individual words.
PCollection<String> words = lines.apply(ParDo.of(new ExtractWordsFn()));
// Count the number of times each word occurs.
PCollection<KV<String, Long>> wordCounts = words.apply(Count.perElement());
return wordCounts;
}
}
- MapElements: Hiermit wird die unten kopierte
FormatAsTextFnaufgerufen, die jedes Schlüssel/Wert-Paar in einen druckbaren String formatiert.
/** A SimpleFunction that converts a Word and Count into a printable string. */
public static class FormatAsTextFn extends SimpleFunction<KV<String, Long>, String> {
@Override
public String apply(KV<String, Long> input) {
return input.getKey() + ": " + input.getValue();
}
}
- WriteCounts: In diesem Schritt werden die druckbaren Strings in mehrere partitionierte Textdateien geschrieben.
Wir sehen uns die Ausgabe der Pipeline gleich an.
Sehen Sie sich nun die Seite Jobinfo rechts neben dem Diagramm an. Sie enthält die Pipeline-Parameter, die wir in den mvn compile exec:java-Befehl aufgenommen haben.


Sie sehen dort auch benutzerdefinierte Zähler für die Pipeline. In diesem Fall wird angezeigt, wie viele leere Zeilen bisher erkannt wurden. Sie können Ihrer Pipeline neue Zähler hinzufügen, um anwendungsspezifische Messwerte zu erfassen.

Sie können unten in der Konsole auf das Symbol Logs klicken, um die spezifischen Fehlermeldungen aufzurufen.

Im Bereich werden standardmäßig Job-Lognachrichten angezeigt, die den Status des Jobs insgesamt angeben. Mit der Auswahl „Minimale Wichtigkeit“ können Sie den Jobfortschritt und Statusnachrichten filtern.

Durch Auswählen eines Pipelineschritts in der Grafik wird die Ansicht so geändert, dass sie die durch den Code generierten Logs und den generierten Code anzeigt, der in dem Pipelineschritt ausgeführt wird.
Wenn Sie wieder zu den Joblogs wechseln möchten, schließen Sie die Ansicht des Schritts. Klicken Sie dazu außerhalb der Grafik oder verwenden Sie die Schaltfläche „Schließen“ im rechten Fensterbereich.
Über die Schaltfläche Worker-Logs auf dem Tab „Logs“ können Sie Worker-Logs für die Compute Engine-Instanzen aufrufen, die Ihre Pipeline ausführen. Worker-Logs bestehen aus Log-Zeilen, die anhand Ihres Codes und des in Dataflow generierten Codes ausgegeben werden.
Wenn Sie versuchen, einen Fehler in der Pipeline zu beheben, finden Sie häufig zusätzliche Protokolle in den Worker-Logs, die Ihnen bei der Problemlösung helfen. Diese Logs werden für alle Worker zusammengefasst und können gefiltert und durchsucht werden.

Auf der Dataflow-Überwachungsoberfläche werden nur die neuesten Log-Nachrichten angezeigt. Sie können alle Logs aufrufen, indem Sie rechts im Logbereich auf den Link „Google Cloud Observability“ klicken.

Hier finden Sie eine Zusammenfassung der verschiedenen Logtypen, die auf der Seite „Monitoring“ → „Logs“ angezeigt werden können:
- job-message-Logs enthalten Jobnachrichten, die von verschiedenen Komponenten von Dataflow generiert werden. Beispiele hierfür sind die automatische Skalierungskonfiguration beim Starten oder Herunterfahren von Workern, der Fortschritt des Jobschritts und Jobfehler. Fehler auf Worker-Ebene, die durch einen Absturz des Nutzercodes verursacht wurden und in Worker-Logs vorhanden sind, werden auch in die job-message-Logs übertragen.
- Worker-Logs werden von Dataflow-Workern erstellt. Worker erledigen die meisten Pipelineaufgaben. Sie wenden z. B. ParDos auf Daten an. Worker-Logs enthalten von Ihrem Code und von Dataflow erfasste Nachrichten.
- Worker-Startup-Logs sind in den meisten Dataflow-Jobs vorhanden und erfassen mit dem Startvorgang zusammenhängende Nachrichten. Dieser umfasst das Herunterladen der JAR-Dateien eines Jobs aus Cloud Storage und das anschließende Starten der Worker. Bei Problemen mit dem Worker-Start können diese Logs aufschlussreich sein.
- Shuffler-Logs enthalten Nachrichten von Workern, die die Ergebnisse paralleler Pipelineoperationen konsolidieren.
- Docker- und Kubelet-Logs enthalten Nachrichten, die mit diesen öffentlichen Technologien in Zusammenhang stehen und für Dataflow-Worker genutzt werden.
Im nächsten Schritt prüfen wir, ob Ihr Job erfolgreich ausgeführt wurde.
7. Erfolgreiche Ausführung des Jobs überprüfen
Öffnen Sie die Cloud Dataflow-Web-UI in der Google Cloud Platform Console.
Ihr Job zum Zählen der Wörter sollte zuerst den Status Wird ausgeführt und dann Erfolgreich haben:

Die Ausführung des Jobs dauert etwa drei bis vier Minuten.
Wir haben zuvor die Pipeline ausgeführt und einen Bucket für die Ausgabe angegeben. Sehen wir uns nun das Ergebnis an, denn Sie möchten bestimmt wissen, wie häufig jedes einzelne Wort in König Lear vorkommt. Kehren Sie in der Google Cloud Console zum Cloud Storage-Browser zurück. In Ihrem Bucket sollten Sie die Ausgabedateien und die Staging-Dateien sehen, die von Ihrem Job erstellt wurden:

8. Ressourcen herunterfahren
Sie können Ihre Ressourcen über die Google Cloud Console herunterfahren.
Öffnen Sie in der Google Cloud Platform Console den Cloud Storage-Browser.

Klicken Sie auf das Kästchen neben dem von Ihnen erstellten Bucket und dann auf LÖSCHEN, um den Bucket und seinen Inhalt dauerhaft zu löschen.


9. Glückwunsch!
Sie haben gelernt, wie Sie mit dem Cloud Dataflow SDK ein Maven-Projekt erstellen und mithilfe der Google Cloud Platform Console eine Beispiel-Pipeline ausführen sowie den zugehörigen Cloud Storage-Bucket und dessen Inhalte löschen.
Weitere Informationen
Lizenz
Dieses Werk ist unter einer Creative Commons Attribution 3.0 Generic License und einer Apache 2.0-Lizenz lizenziert.