Pipeline zur Big-Data-Textverarbeitung in Cloud Dataflow ausführen

1. Übersicht

Cloud-Dataflow.png

Was ist Dataflow?

Dataflow ist ein verwalteter Dienst zur Ausführung eines breiten Spektrums an Datenverarbeitungsmustern. Die Dokumentation auf dieser Website zeigt Ihnen, wie Sie Ihre Batch- und Streaming-Datenverarbeitungspipelines mit Dataflow bereitstellen. Sie enthält auch Anweisungen zur Verwendung der Servicefunktionen.

Das Apache Beam SDK ist ein Open-Source-Programmiermodell, mit dem Sie sowohl Batch- als auch Streaming-Pipelines entwickeln können. Sie erstellen Ihre Pipelines mit einem Apache Beam-Programm und führen sie dann im Dataflow-Dienst aus. Die Apache Beam-Dokumentation enthält ausführliche konzeptionelle Informationen und Referenzmaterial für das Apache Beam-Programmiermodell, SDKs und andere Runner.

Analyse von Streamingdaten mit hoher Geschwindigkeit

Dataflow ermöglicht die schnelle, vereinfachte Entwicklung von Streamingdaten-Pipelines mit besonders niedriger Latenz.

Vorgänge und Verwaltung vereinfachen

Dank des serverlosen Ansatzes von Dataflow entfällt der operative Aufwand von Data Engineering-Arbeitslasten, sodass sich Teams auf das Programmieren konzentrieren können und sich nicht um die Verwaltung von Serverclustern kümmern müssen.

Gesamtbetriebskosten reduzieren

Durch das Autoscaling von Ressourcen und eine kostenoptimierte Batchverarbeitung stellt Dataflow praktisch unbegrenzte Kapazitäten für Ihre nur temporär auftretenden Arbeitslasten und Lastspitzen bereit, ohne dass übermäßige Kosten anfallen.

Wichtige Funktionen

Automatisierte Ressourcenverwaltung und dynamischer Arbeitsausgleich

Dataflow automatisiert die Bereitstellung und Verwaltung von Verarbeitungsressourcen, sodass die Latenz minimiert und die Ressourcennutzung optimiert wird. Instanzen müssen dadurch nicht mehr manuell erstellt oder reserviert werden. Die Arbeitsaufteilung wird ebenfalls automatisiert und optimiert, sodass Arbeitsverzögerungen dynamisch ausgeglichen werden. Sie müssen nicht mehr nach „Hot Keys“ suchen, bei denen es durch hohe Aufrufraten zu Verzögerungen kommt, oder Ihre Eingabedaten vorverarbeiten.

Horizontales Autoscaling

Durch horizontales Autoscaling von Worker-Ressourcen zur Durchsatzoptimierung wird das gesamte Preis-Leistungs-Verhältnis verbessert.

Flexible Ressourcenplanungspreise für die Batchverarbeitung

Zur flexiblen Verarbeitung im Rahmen der zeitlichen Planung von Jobs, zum Beispiel über Nacht, ergibt sich mit der flexiblen Ressourcenplanung (FlexRS) ein günstigerer Preis für die Batchverarbeitung. Diese flexiblen Jobs werden in einer Warteschlange mit der Garantie platziert, dass sie innerhalb von sechs Stunden abgerufen und ausgeführt werden.

Diese Anleitung basiert auf https://cloud.google.com/dataflow/docs/quickstarts/quickstart-java-maven.

Lerninhalte

  • Maven-Projekt mit Apache Beam und dem Java SDK erstellen
  • Beispiel-Pipeline mithilfe der Google Cloud Platform Console ausführen
  • Zugehörigen Cloud Storage-Bucket und seine Inhalte löschen

Voraussetzungen

Wie werden Sie diese Anleitung verwenden?

Nur lesen Lesen und Übungen durchführen

Wie würden Sie Ihre Erfahrungen mit der Verwendung von Google Cloud Platform-Diensten bewerten?

Anfänger Mittelstufe Fortgeschritten

2. Einrichtung und Anforderungen

Umgebung zum selbstbestimmten Lernen einrichten

  1. Melden Sie sich in der Cloud Console an und erstellen Sie ein neues Projekt oder verwenden Sie ein vorhandenes Projekt. Wenn Sie noch kein Gmail- oder G Suite-Konto haben, müssen Sie eines erstellen.

dMbN6g9RawQj_VXCSYpdYncY-DbaRzr2GbnwoV7jFf1u3avxJtmGPmKpMYgiaMH-qu80a_NJ9p2IIXFppYk8x3wyymZXavjglNLJJhuXieCem56H30hwXtd8PvXGpXJO9gEUDu3cZw

ci9Oe6PgnbNuSYlMyvbXF1JdQyiHoEgnhl4PlV_MFagm2ppzhueRkqX4eLjJllZco_2zCp0V0bpTupUSKji9KkQyWqj11pqit1K1faS1V6aFxLGQdkuzGp4rsQTan7F01iePL5DtqQ

8-tA_Lheyo8SscAVKrGii2coplQp2_D1Iosb2ViABY0UUO1A8cimXUu6Wf1R9zJIRExL5OB2j946aIiFtyKTzxDcNnuznmR45vZ2HMoK3o67jxuoUJCAnqvEX6NgPGFjCVNgASc-lg

Notieren Sie sich die Projekt-ID, also den projektübergreifend nur einmal vorkommenden Namen eines Google Cloud-Projekts. Der oben angegebene Name ist bereits vergeben und kann leider nicht mehr verwendet werden. Sie wird später in diesem Codelab als PROJECT_ID bezeichnet.

  1. Als Nächstes müssen Sie die Abrechnung in der Cloud Console aktivieren, um Google Cloud-Ressourcen verwenden zu können.

Die Durchführung dieses Codelabs sollte keine oder nur geringe Kosten verursachen. Folgen Sie bitte der Anleitung im Abschnitt „Bereinigen“, in der Sie erfahren, wie Sie Ressourcen herunterfahren können, damit nach Abschluss dieser Anleitung keine Gebühren anfallen. Neue Nutzer von Google Cloud kommen für das Programm für den kostenlosen Testzeitraum mit einem Guthaben von 300$ infrage.

APIs aktivieren

Klicken Sie auf das Menüsymbol oben links auf dem Bildschirm.

2bfc27ef9ba2ec7d.png

Wählen Sie im Drop-down-Menü APIs & Dienste > Dashboard aus.

5b65523a6cc0afa6.png

Wählen Sie + APIs und Dienste aktivieren aus.

81ed72192c0edd96.png

Suchen Sie im Suchfeld nach „Compute Engine“. Klicken Sie in der angezeigten Ergebnisliste auf „Compute Engine API“.

3f201e991c7b4527.png

Klicken Sie auf der Seite „Google Compute Engine“ auf Aktivieren.

ac121653277fa7bb.png

Klicken Sie nach der Aktivierung auf den Pfeil, um zurückzugehen.

Suchen Sie nun nach den folgenden APIs und aktivieren Sie sie ebenfalls:

  • Cloud Dataflow
  • Stackdriver
  • Cloud Storage
  • Cloud Storage JSON
  • BigQuery
  • Cloud Pub/Sub
  • Cloud Datastore
  • Cloud Resource Manager APIs

3. Neuen Cloud Storage-Bucket erstellen

Klicken Sie in der Google Cloud Platform Console oben links auf das Symbol Menü:

2bfc27ef9ba2ec7d.png

Scrollen Sie nach unten und wählen Sie im Unterabschnitt Speicher die Option Cloud Storage > Browser aus:

2b6c3a2a92b47015.png

Sie sollten jetzt den Cloud Storage-Browser sehen. Wenn Sie ein Projekt verwenden, in dem derzeit keine Cloud Storage-Buckets vorhanden sind, werden Sie aufgefordert, einen neuen Bucket zu erstellen. Klicken Sie auf die Schaltfläche Bucket erstellen, um einen Bucket zu erstellen:

a711016d5a99dc37.png

Geben Sie einen Namen für den Bucket ein. Wie im Dialogfeld angegeben, müssen Bucket-Namen in Cloud Storage eindeutig sein. Wenn Sie also einen offensichtlichen Namen wie „test“ auswählen, hat wahrscheinlich schon jemand einen Bucket mit diesem Namen erstellt und Sie erhalten eine Fehlermeldung.

Außerdem gibt es einige Regeln dazu, welche Zeichen in Bucket-Namen zulässig sind. Wenn Sie den Bucket-Namen mit einem Buchstaben oder einer Zahl beginnen und beenden und nur Bindestriche in der Mitte verwenden, ist alles in Ordnung. Wenn Sie versuchen, Sonderzeichen zu verwenden oder den Bucket-Namen mit etwas anderem als einem Buchstaben oder einer Ziffer beginnen oder beenden, werden Sie im Dialogfeld an die Regeln erinnert.

3a5458648cfe3358.png

Geben Sie einen eindeutigen Namen für den Bucket ein und klicken Sie auf Erstellen. Wenn Sie etwas auswählen, das bereits verwendet wird, wird die oben gezeigte Fehlermeldung angezeigt. Anschließend sehen Sie den neuen, leeren Bucket im Browser:

3bda986ae88c4e71.png

Der Bucket-Name, den Sie sehen, ist natürlich anders, da er für alle Projekte eindeutig sein muss.

4. Cloud Shell starten

Cloud Shell aktivieren

  1. Klicken Sie in der Cloud Console auf Cloud Shell aktivieren H7JlbhKGHITmsxhQIcLwoe5HXZMhDlYue4K-SPszMxUxDjIeWfOHBfxDHYpmLQTzUmQ7Xx8o6OJUlANnQF0iBuUyfp1RzVad_4nCa0Zz5LtwBlUZFXFCWFrmrWZLqg1MkZz2LdgUDQ.

zlNW0HehB_AFW1qZ4AyebSQUdWm95n7TbnOr7UVm3j9dFcg6oWApJRlC0jnU1Mvb-IQp-trP1Px8xKNwt6o3pP6fyih947sEhOFI4IRF0W7WZk6hFqZDUGXQQXrw21GuMm2ecHrbzQ

Wenn Sie Cloud Shell noch nie gestartet haben, wird ein Fenster mit einer Beschreibung eingeblendet. Klicken Sie in diesem Fall einfach auf Weiter. So sieht dieses Fenster aus:

kEPbNAo_w5C_pi9QvhFwWwky1cX8hr_xEMGWySNIoMCdi-Djx9AQRqWn-__DmEpC7vKgUtl-feTcv-wBxJ8NwzzAp7mY65-fi2LJo4twUoewT1SUjd6Y3h81RG3rKIkqhoVlFR-G7w

Das Herstellen der Verbindung mit der Cloud Shell sollte nur wenige Augenblicke dauern.

pTv5mEKzWMWp5VBrg2eGcuRPv9dLInPToS-mohlrqDASyYGWnZ_SwE-MzOWHe76ZdCSmw0kgWogSJv27lrQE8pvA5OD6P1I47nz8vrAdK7yR1NseZKJvcxAZrPb8wRxoqyTpD-gbhA

Diese virtuelle Maschine verfügt über sämtliche Entwicklertools, die Sie benötigen. Sie bietet ein Basisverzeichnis mit 5 GB nichtflüchtigem Speicher und läuft in Google Cloud, was die Netzwerkleistung und Authentifizierung erheblich verbessert. Die meisten, wenn nicht sogar alle Aufgaben in diesem Codelab können mit einem Browser oder Ihrem Chromebook erledigt werden.

Sobald die Verbindung mit der Cloud Shell hergestellt ist, sehen Sie, dass Sie bereits authentifiziert sind und für das Projekt schon Ihre Projekt-ID eingestellt ist.

  1. Führen Sie in der Cloud Shell den folgenden Befehl aus, um zu prüfen, ob Sie authentifiziert sind:
gcloud auth list

Befehlsausgabe

 Credentialed Accounts
ACTIVE  ACCOUNT
*       <my_account>@<my_domain.com>

To set the active account, run:
    $ gcloud config set account `ACCOUNT`
gcloud config list project

Befehlsausgabe

[core]
project = <PROJECT_ID>

Ist dies nicht der Fall, können Sie die Einstellung mit diesem Befehl vornehmen:

gcloud config set project <PROJECT_ID>

Befehlsausgabe

Updated property [core/project].

5. Maven-Projekt erstellen

Nachdem Cloud Shell gestartet wurde, erstellen wir zuerst ein Maven-Projekt mit dem Java SDK für Apache Beam.

Apache Beam ist ein Open-Source-Programmiermodell für Datenpipelines. Sie definieren diese Pipelines mit einem Apache Beam-Programm und können einen Runner wie Dataflow zum Ausführen Ihrer Pipeline auswählen.

Führen Sie den Befehl mvn archetype:generate in der Shell so aus:

  mvn archetype:generate \
     -DarchetypeGroupId=org.apache.beam \
     -DarchetypeArtifactId=beam-sdks-java-maven-archetypes-examples \
     -DarchetypeVersion=2.46.0 \
     -DgroupId=org.example \
     -DartifactId=first-dataflow \
     -Dversion="0.1" \
     -Dpackage=org.apache.beam.examples \
     -DinteractiveMode=false

Nachdem Sie den Befehl ausgeführt haben, sollte ein neues Verzeichnis namens first-dataflow unter dem aktuellen Verzeichnis angezeigt werden. first-dataflow enthält ein Maven-Projekt mit dem Cloud Dataflow SDK für Java und Beispielpipelines.

6. Textverarbeitungspipeline in Cloud Dataflow ausführen

Speichern Sie zuerst die Projekt-ID und die Namen der Cloud Storage-Buckets als Umgebungsvariablen. Sie können dies in Cloud Shell tun. Ersetzen Sie <your_project_id> durch Ihre eigene Projekt-ID.

 export PROJECT_ID=<your_project_id>

Wiederholen Sie den Vorgang für den Cloud Storage-Bucket. Ersetzen Sie <your_bucket_name> durch den eindeutigen Namen, den Sie zum Erstellen des Buckets in einem früheren Schritt verwendet haben.

 export BUCKET_NAME=<your_bucket_name>

Wechseln Sie zum Verzeichnis first-dataflow/.

 cd first-dataflow

Wir führen eine Pipeline mit dem Namen "WordCount" aus, die Text liest, Textzeilen in einzelne Wörter tokenisiert und für jedes dieser Wörter eine Häufigkeitszählung durchführt. Zuerst führen wir die Pipeline aus und sehen uns dann an, was bei den einzelnen Schritten passiert.

Starten Sie die Pipeline, indem Sie den Befehl mvn compile exec:java in der Shell oder im Terminalfenster ausführen. Für die Argumente --project, --stagingLocation, und --output wird im Befehl unten auf die Umgebungsvariablen verwiesen, die Sie zuvor in diesem Schritt eingerichtet haben.

 mvn compile exec:java \
      -Pdataflow-runner compile exec:java \
      -Dexec.mainClass=org.apache.beam.examples.WordCount \
      -Dexec.args="--project=${PROJECT_ID} \
      --stagingLocation=gs://${BUCKET_NAME}/staging/ \
      --output=gs://${BUCKET_NAME}/output \
      --runner=DataflowRunner \
      --region=us-central1 \
      --gcpTempLocation=gs://${BUCKET_NAME}/temp"

Suchen Sie den Job in der Jobliste, solange der Job ausgeführt wird.

Öffnen Sie die Cloud Dataflow-Web-UI in der Google Cloud Platform Console. Für den Job zum Zählen der Wörter sollte nun der Status Wird ausgeführt angezeigt werden:

3623be74922e3209.png

Sehen wir uns nun die Pipeline-Parameter an. Klicken Sie zuerst auf den Namen des Jobs.

816d8f59c72797d7.png

Wenn Sie einen Job auswählen, können Sie sich die Ausführungsgrafik ansehen. Darin wird jede Transformation in der Pipeline als Feld dargestellt, das den Transformationsnamen und einige Statusangaben enthält. Falls Sie weitere Details zu einem Schritt sehen möchten, klicken Sie auf das Caret-Zeichen oben rechts.

80a972dd19a6f1eb.png

So wandelt die Pipeline die Daten in den einzelnen Schritten um:

  • Lesen: In diesem Schritt liest die Pipeline aus einer Eingabequelle. In diesem Fall ist es eine Textdatei aus Cloud Storage mit dem gesamten Text des Shakespeare-Stücks King Lear. Unsere Pipeline liest die Datei zeilenweise und gibt für jede Zeile eine PCollection aus. Jede Zeile in unserer Textdatei ist ein Element in der Sammlung.
  • CountWords: Der Schritt CountWords zum Zählen der Wörter besteht aus zwei Teilen. Zuerst wird eine parallele Do-Funktion (ParDo) mit dem Namen ExtractWords verwendet, um jede Zeile in einzelne Wörter zu zerlegen. Die Ausgabe von ExtractWords ist eine neue PCollection, in der jedes Element ein Wort ist. Im nächsten Schritt, Count, wird eine Transformation verwendet, die vom Java SDK bereitgestellt wird und Schlüssel/Wert-Paare zurückgibt, wobei der Schlüssel ein eindeutiges Wort und der Wert die Anzahl der Vorkommen ist. Unten sehen Sie die Methode zur Implementierung von CountWords. Die vollständige Datei „WordCount.java“ finden Sie auf GitHub:
 /**
   * A PTransform that converts a PCollection containing lines of text into a PCollection of
   * formatted word counts.
   *
   * <p>Concept #3: This is a custom composite transform that bundles two transforms (ParDo and
   * Count) as a reusable PTransform subclass. Using composite transforms allows for easy reuse,
   * modular testing, and an improved monitoring experience.
   */
  public static class CountWords
      extends PTransform<PCollection<String>, PCollection<KV<String, Long>>> {
    @Override
    public PCollection<KV<String, Long>> expand(PCollection<String> lines) {

      // Convert lines of text into individual words.
      PCollection<String> words = lines.apply(ParDo.of(new ExtractWordsFn()));

      // Count the number of times each word occurs.
      PCollection<KV<String, Long>> wordCounts = words.apply(Count.perElement());

      return wordCounts;
    }
  }
  • MapElements: Hiermit wird die unten kopierte FormatAsTextFn aufgerufen, die jedes Schlüssel/Wert-Paar in einen druckbaren String formatiert.
  /** A SimpleFunction that converts a Word and Count into a printable string. */
  public static class FormatAsTextFn extends SimpleFunction<KV<String, Long>, String> {
    @Override
    public String apply(KV<String, Long> input) {
      return input.getKey() + ": " + input.getValue();
    }
  }
  • WriteCounts: In diesem Schritt werden die druckbaren Strings in mehrere partitionierte Textdateien geschrieben.

Wir sehen uns die Ausgabe der Pipeline gleich an.

Sehen Sie sich nun die Seite Jobinfo rechts neben dem Diagramm an. Sie enthält die Pipeline-Parameter, die wir in den mvn compile exec:java-Befehl aufgenommen haben.

9723815a1f5bf08b.png

208a7f0d6973acf6.png

Sie sehen dort auch benutzerdefinierte Zähler für die Pipeline. In diesem Fall wird angezeigt, wie viele leere Zeilen bisher erkannt wurden. Sie können Ihrer Pipeline neue Zähler hinzufügen, um anwendungsspezifische Messwerte zu erfassen.

a2e2800e2c6893f8.png

Sie können unten in der Konsole auf das Symbol Logs klicken, um die spezifischen Fehlermeldungen aufzurufen.

23c64138a1027f8.png

Im Bereich werden standardmäßig Job-Lognachrichten angezeigt, die den Status des Jobs insgesamt angeben. Mit der Auswahl „Minimale Wichtigkeit“ können Sie den Jobfortschritt und Statusnachrichten filtern.

94ba42015fdafbe2.png

Durch Auswählen eines Pipelineschritts in der Grafik wird die Ansicht so geändert, dass sie die durch den Code generierten Logs und den generierten Code anzeigt, der in dem Pipelineschritt ausgeführt wird.

Wenn Sie wieder zu den Joblogs wechseln möchten, schließen Sie die Ansicht des Schritts. Klicken Sie dazu außerhalb der Grafik oder verwenden Sie die Schaltfläche „Schließen“ im rechten Fensterbereich.

Über die Schaltfläche Worker-Logs auf dem Tab „Logs“ können Sie Worker-Logs für die Compute Engine-Instanzen aufrufen, die Ihre Pipeline ausführen. Worker-Logs bestehen aus Log-Zeilen, die anhand Ihres Codes und des in Dataflow generierten Codes ausgegeben werden.

Wenn Sie versuchen, einen Fehler in der Pipeline zu beheben, finden Sie häufig zusätzliche Protokolle in den Worker-Logs, die Ihnen bei der Problemlösung helfen. Diese Logs werden für alle Worker zusammengefasst und können gefiltert und durchsucht werden.

5a53c244f28d5478.png

Auf der Dataflow-Überwachungsoberfläche werden nur die neuesten Log-Nachrichten angezeigt. Sie können alle Logs aufrufen, indem Sie rechts im Logbereich auf den Link „Google Cloud Observability“ klicken.

2bc704a4d6529b31.png

Hier finden Sie eine Zusammenfassung der verschiedenen Logtypen, die auf der Seite „Monitoring“ → „Logs“ angezeigt werden können:

  • job-message-Logs enthalten Jobnachrichten, die von verschiedenen Komponenten von Dataflow generiert werden. Beispiele hierfür sind die automatische Skalierungskonfiguration beim Starten oder Herunterfahren von Workern, der Fortschritt des Jobschritts und Jobfehler. Fehler auf Worker-Ebene, die durch einen Absturz des Nutzercodes verursacht wurden und in Worker-Logs vorhanden sind, werden auch in die job-message-Logs übertragen.
  • Worker-Logs werden von Dataflow-Workern erstellt. Worker erledigen die meisten Pipelineaufgaben. Sie wenden z. B. ParDos auf Daten an. Worker-Logs enthalten von Ihrem Code und von Dataflow erfasste Nachrichten.
  • Worker-Startup-Logs sind in den meisten Dataflow-Jobs vorhanden und erfassen mit dem Startvorgang zusammenhängende Nachrichten. Dieser umfasst das Herunterladen der JAR-Dateien eines Jobs aus Cloud Storage und das anschließende Starten der Worker. Bei Problemen mit dem Worker-Start können diese Logs aufschlussreich sein.
  • Shuffler-Logs enthalten Nachrichten von Workern, die die Ergebnisse paralleler Pipelineoperationen konsolidieren.
  • Docker- und Kubelet-Logs enthalten Nachrichten, die mit diesen öffentlichen Technologien in Zusammenhang stehen und für Dataflow-Worker genutzt werden.

Im nächsten Schritt prüfen wir, ob Ihr Job erfolgreich ausgeführt wurde.

7. Erfolgreiche Ausführung des Jobs überprüfen

Öffnen Sie die Cloud Dataflow-Web-UI in der Google Cloud Platform Console.

Ihr Job zum Zählen der Wörter sollte zuerst den Status Wird ausgeführt und dann Erfolgreich haben:

4c408162416d03a2.png

Die Ausführung des Jobs dauert etwa drei bis vier Minuten.

Wir haben zuvor die Pipeline ausgeführt und einen Bucket für die Ausgabe angegeben. Sehen wir uns nun das Ergebnis an, denn Sie möchten bestimmt wissen, wie häufig jedes einzelne Wort in König Lear vorkommt. Kehren Sie in der Google Cloud Console zum Cloud Storage-Browser zurück. In Ihrem Bucket sollten Sie die Ausgabedateien und die Staging-Dateien sehen, die von Ihrem Job erstellt wurden:

25a5d3d4b5d0b567.png

8. Ressourcen herunterfahren

Sie können Ihre Ressourcen über die Google Cloud Console herunterfahren.

Öffnen Sie in der Google Cloud Platform Console den Cloud Storage-Browser.

2b6c3a2a92b47015.png

Klicken Sie auf das Kästchen neben dem von Ihnen erstellten Bucket und dann auf LÖSCHEN, um den Bucket und seinen Inhalt dauerhaft zu löschen.

2f7780bdf10b69ba.png

8051ef293a8e5cfe.png

9. Glückwunsch!

Sie haben gelernt, wie Sie mit dem Cloud Dataflow SDK ein Maven-Projekt erstellen und mithilfe der Google Cloud Platform Console eine Beispiel-Pipeline ausführen sowie den zugehörigen Cloud Storage-Bucket und dessen Inhalte löschen.

Weitere Informationen

Lizenz

Dieses Werk ist unter einer Creative Commons Attribution 3.0 Generic License und einer Apache 2.0-Lizenz lizenziert.