Pipeline zur Big-Data-Textverarbeitung in Cloud Dataflow ausführen

1. Übersicht

Cloud-Dataflow.png

Was ist Dataflow?

Dataflow ist ein verwalteter Dienst zur Ausführung eines breiten Spektrums an Datenverarbeitungsmustern. Die Dokumentation auf dieser Website zeigt Ihnen, wie Sie Ihre Pipelines für die Batch- und Streaming-Datenverarbeitung mit Dataflow bereitstellen, einschließlich Anweisungen zur Verwendung von Dienstfunktionen.

Das Apache Beam SDK ist ein Open-Source-Programmiermodell, mit dem Sie sowohl Batch- als auch Streamingpipelines entwickeln können. Sie erstellen Ihre Pipelines mit einem Apache Beam-Programm und führen sie dann im Dataflow-Dienst aus. Die Apache Beam-Dokumentation enthält detaillierte konzeptionelle Informationen und Referenzmaterial für das Apache Beam-Programmiermodell, SDKs und andere Runner.

Datenanalysen schnell streamen

Dataflow ermöglicht die schnelle, vereinfachte Entwicklung von Streamingdaten-Pipelines mit geringerer Datenlatenz.

Betrieb und Verwaltung vereinfachen

Dank des serverlosen Ansatzes von Dataflow entfällt der operative Aufwand von Data Engineering-Arbeitslasten, sodass sich Teams auf das Programmieren konzentrieren können, anstatt Servercluster zu verwalten.

Gesamtbetriebskosten senken

Durch das Autoscaling von Ressourcen und eine kostenoptimierte Batchverarbeitung bietet Dataflow praktisch unbegrenzte Kapazitäten für Ihre saisonalen Arbeitslasten und Lastspitzen, ohne dass unnötige Kosten anfallen.

Wichtige Funktionen

Automatisierte Ressourcenverwaltung und dynamischer Arbeitsausgleich

Dataflow automatisiert die Bereitstellung und Verwaltung von Verarbeitungsressourcen, um die Latenz zu minimieren und die Auslastung zu maximieren, sodass Sie Instanzen nicht manuell erstellen oder reservieren müssen. Die Arbeitsaufteilung wird ebenfalls automatisiert und optimiert, sodass Arbeitsverzögerungen dynamisch ausgeglichen werden. Keine Lust, nach den Hotkeys zu suchen oder Ihre Eingabedaten vorverarbeiten.

Horizontales Autoscaling

Durch horizontales Autoscaling von Worker-Ressourcen für einen optimalen Durchsatz wird das gesamte Preis-Leistungs-Verhältnis verbessert.

Flexible Ressourcenplanungspreise für die Batchverarbeitung

Zur flexiblen Verarbeitung bei der zeitlichen Planung von Jobs, z. B. über Nacht, bietet die flexible Ressourcenplanung (FlexRS) einen niedrigeren Preis für die Batchverarbeitung. Diese flexiblen Jobs werden in einer Warteschlange mit der Garantie platziert, dass sie innerhalb von sechs Stunden abgerufen und ausgeführt werden.

Dieses Tutorial ist angepasst von https://cloud.google.com/dataflow/docs/quickstarts/quickstart-java-maven

Aufgaben in diesem Lab

  • Maven-Projekt mit Apache Beam und dem Java SDK erstellen
  • Beispiel-Pipeline mithilfe der Google Cloud Platform Console ausführen
  • Zugehörigen Cloud Storage-Bucket und seine Inhalte löschen

Voraussetzungen

Wie möchten Sie diese Anleitung nutzen?

<ph type="x-smartling-placeholder"></ph> Nur bis zum Ende lesen Lies sie dir durch und absolviere die Übungen

Wie würden Sie Ihre Erfahrungen im Umgang mit Google Cloud Platform-Diensten bewerten?

<ph type="x-smartling-placeholder"></ph> Neuling Mittel Kompetent

2. Einrichtung und Anforderungen

Umgebung für das selbstbestimmte Lernen einrichten

  1. Melden Sie sich in der Cloud Console an und erstellen Sie ein neues Projekt oder verwenden Sie ein vorhandenes Projekt. Wenn Sie noch kein Gmail- oder G Suite-Konto haben, müssen Sie ein Konto erstellen.

dMbN6g9RawQj_VXCSYpdYncY-DbaRzr2GbnwoV7jFf1u3avxJtmGPmKpMYgiaMH-qu80a_NJ9p2IIXFppYk8x3wyymZXavjglNLJJhuXieCem56H30hwXtd8PvXGpXJO9gEUDu3cZw

ci9Oe6PgnbNuSYlMyvbXF1JdQyiHoEgnhl4PlV_MFagm2ppzhueRkqX4eLjJllZco_2zCp0V0bpTupUSKji9KkQyWqj11pqit1K1faS1V6aFxLGQdkuzGp4rsQTan7F01iePL5DtqQ

8-tA_Lheyo8SscAVKrGii2coplQp2_D1Iosb2ViABY0UUO1A8cimXUu6Wf1R9zJIRExL5OB2j946aIiFtyKTzxDcNnuznmR45vZ2HMoK3o67jxuoUJCAnqvEX6NgPGFjCVNgASc-lg

Notieren Sie sich die Projekt-ID, also den projektübergreifend nur einmal vorkommenden Namen eines Google Cloud-Projekts. Der oben angegebene Name ist bereits vergeben und kann leider nicht mehr verwendet werden. Sie wird in diesem Codelab später als PROJECT_ID bezeichnet.

  1. Als Nächstes müssen Sie in der Cloud Console die Abrechnung aktivieren, um Google Cloud-Ressourcen nutzen zu können.

Dieses Codelab sollte möglichst wenig kosten. Folgen Sie der Anleitung im Abschnitt „Bereinigen“, . Hier erfahren Sie, wie Sie Ressourcen herunterfahren, damit Ihnen über dieses Tutorial hinaus keine Kosten entstehen. Neue Google Cloud-Nutzer können an einem kostenlosen Testzeitraum mit 300$Guthaben teilnehmen.

APIs aktivieren

Klicken Sie auf das Menüsymbol oben links auf dem Bildschirm.

2bfc27ef9ba2ec7d.png

Wählen Sie APIs und Dienste > Dashboard aus.

5b65523a6cc0afa6.png

Wählen Sie + APIs und Dienste aktivieren aus.

81ed72192c0edd96.png

Suchen Sie nach „Compute Engine“. in das Suchfeld ein. Klicken Sie auf „Compute Engine API“. in der angezeigten Ergebnisliste.

3f201e991c7b4527.png

Klicken Sie auf der Seite "Google Compute Engine" auf Aktivieren.

ac121653277fa7bb.png

Klicken Sie anschließend auf den Pfeil, um zurückzugehen.

Suchen Sie jetzt nach den folgenden APIs und aktivieren Sie sie ebenfalls:

  • Cloud Dataflow
  • Stackdriver
  • Cloud Storage
  • Cloud Storage-JSON
  • BigQuery
  • Cloud Pub/Sub
  • Cloud Datastore
  • Cloud Resource Manager-APIs

3. Neuen Cloud Storage-Bucket erstellen

Klicken Sie in der Google Cloud Platform Console oben links auf das Symbol Menü:

2bfc27ef9ba2ec7d.png

Scrollen Sie nach unten und wählen Sie Cloud Storage > Browser im Unterabschnitt Speicher:

2b6c3a2a92b47015.png

Sie sollten jetzt den Cloud Storage-Browser sehen. Wenn Sie ein Projekt verwenden, das derzeit keine Cloud Storage-Buckets hat, wird eine Einladung zum Erstellen eines neuen Buckets angezeigt. Klicken Sie auf die Schaltfläche Bucket erstellen, um einen zu erstellen:

a711016d5a99dc37.png

Geben Sie einen Namen für den Bucket ein. Wie im Dialogfeld vermerkt, müssen Bucket-Namen innerhalb von Cloud Storage eindeutig sein. Wenn Sie also einen offensichtlichen Namen wie "test" wählen, werden Sie wahrscheinlich feststellen, dass eine andere Person bereits einen Bucket mit diesem Namen erstellt hat und Sie eine Fehlermeldung erhalten.

Außerdem gibt es einige Regeln bezüglich der in Bucket-Namen zulässigen Zeichen. Wenn Sie Ihren Bucket-Namen mit einem Buchstaben oder einer Ziffer beginnen und enden und nur Bindestriche in der Mitte verwenden, ist kein Problem. Wenn Sie versuchen, Sonderzeichen zu verwenden oder den Bucket-Namen mit einem anderen Namen als einem Buchstaben oder einer Ziffer zu beginnen oder zu enden, werden Sie im Dialogfeld an die Regeln erinnert.

7a5118648cfe2358.png

Geben Sie einen eindeutigen Namen für den Bucket ein und klicken Sie auf Erstellen. Wenn Sie etwas auswählen, das bereits verwendet wird, sehen Sie die oben angezeigte Fehlermeldung. Wenn Sie einen Bucket erstellt haben, wird Ihr neuer, leerer Bucket im Browser angezeigt:

2bda986ae88c4e71.png

Der Bucket-Name, den Sie sehen, wird natürlich anders sein, da er projektübergreifend eindeutig sein muss.

4. Cloud Shell starten

Cloud Shell aktivieren

  1. Klicken Sie in der Cloud Console auf Cloud Shell aktivieren H7JlbhKGHITmsxhQIcLwoe5HXZMhDlYue4K-SPszMxUxDjIeWfOHBfxDHYpmLQTzUmQ7Xx8o6OJUlANnQF0iBuUyfp1RzVad_4nCa0Zz5LtZWQFrF.

zlNW0HehB_AFW1qZ4AyebSQUdWm95n7TbnOr7UVm3j9dFcg6oWApJRlC0jnU1Mvb-IQp-trP1Px8xKNwt6o3pP6fyih947sEhOFI4IRF0W7WZk6hFqZDUGXQQXrw21GuMm2ecHrbzQ

Wenn Sie Cloud Shell zum ersten Mal verwenden, wird ein Zwischenbildschirm (below the fold) angezeigt, in dem beschrieben wird, worum es sich dabei handelt. Klicken Sie in diesem Fall auf Weiter. Der Chat wird nie wieder angezeigt. So sieht dieser einmalige Bildschirm aus:

kEPbNAo_w5C_pi9QvhFwWwky1cX8hr_xEMGWySNIoMCdi-Djx9AQRqWn-__DmEpC7vKgUtl-feTcv-wBxJ8NwzzAp7mY65-fi2LJo4twUoewT1SUjd6Y3h81RG3rKIkqhoVlFR-G7w

Die Bereitstellung und Verbindung mit Cloud Shell dauert nur einen Moment.

pTv5mEKzWMWp5VBrg2eGcuRPv9dLInPToS-mohlrqDASyYGWnZ_SwE-MzOWHe76ZdCSmw0kgWogSJv27lrQE8pvA5OD6P1I47nz8vrAdK7yR1NseZKJvcxAZrPb8wRxoqyTpD-gbhA

Diese virtuelle Maschine verfügt über sämtliche Entwicklertools, die Sie benötigen. Es bietet ein Basisverzeichnis mit 5 GB nichtflüchtigem Speicher und wird in Google Cloud ausgeführt. Dadurch werden die Netzwerkleistung und die Authentifizierung erheblich verbessert. Viele, wenn nicht sogar alle Arbeiten in diesem Codelab können Sie ganz einfach mit einem Browser oder Ihrem Chromebook erledigen.

Sobald Sie mit Cloud Shell verbunden sind, sollten Sie sehen, dass Sie bereits authentifiziert sind und dass das Projekt bereits auf Ihre Projekt-ID eingestellt ist.

  1. Führen Sie in Cloud Shell den folgenden Befehl aus, um zu prüfen, ob Sie authentifiziert sind:
gcloud auth list

Befehlsausgabe

 Credentialed Accounts
ACTIVE  ACCOUNT
*       <my_account>@<my_domain.com>

To set the active account, run:
    $ gcloud config set account `ACCOUNT`
gcloud config list project

Befehlsausgabe

[core]
project = <PROJECT_ID>

Ist dies nicht der Fall, können Sie die Einstellung mit diesem Befehl vornehmen:

gcloud config set project <PROJECT_ID>

Befehlsausgabe

Updated property [core/project].

5. Maven-Projekt erstellen

Nach dem Start von Cloud Shell können Sie mit dem Java SDK für Apache Beam ein Maven-Projekt erstellen.

Apache Beam ist ein Open-Source-Programmiermodell für Datenpipelines. Sie definieren diese Pipelines mit einem Apache Beam-Programm und können einen Runner wie Dataflow zum Ausführen Ihrer Pipeline auswählen.

Führen Sie den Befehl mvn archetype:generate in Ihrer Shell so aus:

  mvn archetype:generate \
     -DarchetypeGroupId=org.apache.beam \
     -DarchetypeArtifactId=beam-sdks-java-maven-archetypes-examples \
     -DarchetypeVersion=2.46.0 \
     -DgroupId=org.example \
     -DartifactId=first-dataflow \
     -Dversion="0.1" \
     -Dpackage=org.apache.beam.examples \
     -DinteractiveMode=false

Nachdem Sie den Befehl ausgeführt haben, sollte unter Ihrem aktuellen Verzeichnis ein neues Verzeichnis namens first-dataflow angezeigt werden. first-dataflow enthält ein Maven-Projekt, das das Cloud Dataflow SDK für Java und Beispielpipelines enthält.

6. Textverarbeitungspipeline in Cloud Dataflow ausführen

Zunächst speichern wir die Projekt-ID und die Namen der Cloud Storage-Buckets als Umgebungsvariablen. Dies ist in Cloud Shell möglich. Achten Sie darauf, <your_project_id> durch Ihre eigene Projekt-ID zu ersetzen.

 export PROJECT_ID=<your_project_id>

Wiederholen Sie den Vorgang für den Cloud Storage-Bucket. Denken Sie daran, <your_bucket_name> durch den eindeutigen Namen zu ersetzen, mit dem Sie in einem früheren Schritt den Bucket erstellt haben.

 export BUCKET_NAME=<your_bucket_name>

Wechseln Sie zum Verzeichnis first-dataflow/.

 cd first-dataflow

Wir führen eine Pipeline mit dem Namen "WordCount" aus, die Text liest, Textzeilen in einzelne Wörter tokenisiert und für jedes dieser Wörter eine Häufigkeitszählung durchführt. Zuerst führen wir die Pipeline aus und sehen uns währenddessen an, was bei jedem Schritt passiert.

Starten Sie die Pipeline, indem Sie den Befehl mvn compile exec:java in Ihrem Shell- oder Terminalfenster ausführen. Bei den Argumenten --project, --stagingLocation, und --output verweist der folgende Befehl auf die Umgebungsvariablen, die Sie zuvor in diesem Schritt eingerichtet haben.

 mvn compile exec:java \
      -Pdataflow-runner compile exec:java \
      -Dexec.mainClass=org.apache.beam.examples.WordCount \
      -Dexec.args="--project=${PROJECT_ID} \
      --stagingLocation=gs://${BUCKET_NAME}/staging/ \
      --output=gs://${BUCKET_NAME}/output \
      --runner=DataflowRunner \
      --region=us-central1 \
      --gcpTempLocation=gs://${BUCKET_NAME}/temp"

Suchen Sie den Job in der Jobliste, während der Job ausgeführt wird.

Öffnen Sie die Cloud Dataflow-Web-UI in der Google Cloud Platform Console. Für den Job zum Zählen der Wörter sollte nun der Status Wird ausgeführt angezeigt werden:

3623be74922e3209.png

Sehen wir uns nun die Pipelineparameter an. Klicken Sie zuerst auf den Namen des Jobs.

816d8f59c72797d7.png

Wenn Sie einen Job auswählen, können Sie die Ausführungsgrafik aufrufen. Darin wird jede Transformation in der Pipeline als Feld dargestellt, das den Transformationsnamen und einige Statusangaben enthält. Falls Sie weitere Details zu einem Schritt sehen möchten, klicken Sie auf das Caret-Zeichen oben rechts.

80a972dd19a6f1eb.png

So wandelt die Pipeline die Daten in den einzelnen Schritten um:

  • Lesen: In diesem Schritt liest die Pipeline aus einer Eingabequelle. In diesem Fall ist es eine Textdatei aus Cloud Storage mit dem gesamten Text des Shakespeare-Stücks König Lear. Unsere Pipeline liest die Datei Zeile für Zeile und gibt jeweils ein PCollection aus, wobei jede Zeile in unserer Textdatei ein Element in der Sammlung ist.
  • CountWords: Der Schritt CountWords besteht aus zwei Teilen. Zuerst wird eine parallele Do-Funktion (ParDo) mit dem Namen ExtractWords verwendet, um jede Zeile in einzelne Wörter zu tokenisieren. Die Ausgabe von ExtractWords ist eine neue PCollection, in der jedes Element einem Wort entspricht. Im nächsten Schritt, Count, wird eine vom Java SDK bereitgestellte Transformation verwendet, die Schlüssel/Wert-Paare zurückgibt. Der Schlüssel ist dabei ein eindeutiges Wort und der Wert gibt an, wie oft es vorkommt. Hier ist die Methode zur Implementierung von CountWords. Sie können sich auf GitHub die vollständige Datei WordCount.java ansehen:
 /**
   * A PTransform that converts a PCollection containing lines of text into a PCollection of
   * formatted word counts.
   *
   * <p>Concept #3: This is a custom composite transform that bundles two transforms (ParDo and
   * Count) as a reusable PTransform subclass. Using composite transforms allows for easy reuse,
   * modular testing, and an improved monitoring experience.
   */
  public static class CountWords
      extends PTransform<PCollection<String>, PCollection<KV<String, Long>>> {
    @Override
    public PCollection<KV<String, Long>> expand(PCollection<String> lines) {

      // Convert lines of text into individual words.
      PCollection<String> words = lines.apply(ParDo.of(new ExtractWordsFn()));

      // Count the number of times each word occurs.
      PCollection<KV<String, Long>> wordCounts = words.apply(Count.perElement());

      return wordCounts;
    }
  }
  • MapElements: Dadurch wird das unten kopierte FormatAsTextFn aufgerufen, das jedes Schlüssel/Wert-Paar als druckbarer String formatiert.
  /** A SimpleFunction that converts a Word and Count into a printable string. */
  public static class FormatAsTextFn extends SimpleFunction<KV<String, Long>, String> {
    @Override
    public String apply(KV<String, Long> input) {
      return input.getKey() + ": " + input.getValue();
    }
  }
  • WriteCounts: In diesem Schritt schreiben wir die druckbaren Strings in mehrere fragmentierte Textdateien.

Wir sehen uns die Ausgabe der Pipeline gleich an.

Werfen Sie nun einen Blick auf die Seite Jobinformationen rechts neben der Grafik. Dort finden Sie Pipelineparameter, die im Befehl mvn compile exec:java enthalten sind.

9723815a1f5bf08b.png

208a7f0d6973acf6.png

Sie sehen dort auch benutzerdefinierte Zähler für die Pipeline. In diesem Fall wird angezeigt, wie viele leere Zeilen bisher erkannt wurden. Sie können Ihrer Pipeline neue Zähler hinzufügen, um anwendungsspezifische Messwerte zu verfolgen.

a2e2800e2c6893f8.png

Klicken Sie unten in der Konsole auf das Symbol Protokolle, um die spezifischen Fehlermeldungen aufzurufen.

23c64138a1027f8.png

Im Steuerfeld werden standardmäßig Job-Log-Nachrichten angezeigt, die den Status des Jobs insgesamt angeben. Mit der Auswahl „Minimaler Schweregrad“ können Sie den Jobfortschritt und Statusmeldungen filtern.

94ba42015fdafbe2.png

Wenn Sie einen Pipelineschritt in der Grafik auswählen, werden die Logs angezeigt, die von Ihrem Code generiert wurden, und den generierten Code, der in dem Pipelineschritt ausgeführt wird.

Wenn Sie zu den Joblogs zurückkehren möchten, heben Sie die Auswahl des Schritts auf. Klicken Sie dazu außerhalb der Grafik oder verwenden Sie die Schaltfläche „Schließen“ in der rechten Seitenleiste.

Über die Schaltfläche Worker-Logs auf dem Tab "Logs" können Sie Worker-Logs für die Compute Engine-Instanzen aufrufen, auf denen Ihre Pipeline ausgeführt wird. Worker-Logs bestehen aus Log-Zeilen, die anhand Ihres Codes und des in Dataflow generierten Codes ausgegeben werden.

Wenn Sie versuchen, einen Fehler in der Pipeline zu beheben, sind oft zusätzliche Protokolle in den Worker-Logs zur Lösung des Problems erforderlich. Denken Sie daran, dass diese Logs für alle Worker aggregiert werden und gefiltert und durchsucht werden können.

5a53c244f28d5478.png

Die Dataflow Monitoring-Oberfläche zeigt nur die neuesten Lognachrichten an. Sie können alle Logs ansehen, indem Sie rechts im Logbereich auf den Link „Google Cloud Observability“ klicken.

2bc704a4d6529b31.png

Hier sehen Sie eine Zusammenfassung der verschiedenen Logtypen, die auf der Seite „Monitoring → Logs“ angezeigt werden können:

  • job-message-Logs enthalten Jobnachrichten, die von verschiedenen Komponenten von Dataflow generiert werden. Beispiele hierfür sind die Autoscaling-Konfiguration, das Starten oder Herunterfahren von Workern, der Fortschritt im Jobschritt und Jobfehler. Fehler auf Worker-Ebene, die durch einen Absturz von Nutzercode verursacht werden und in Worker-Logs vorhanden sind, werden auch in die job-message-Logs weitergegeben.
  • Worker-Logs werden von Dataflow-Workern erstellt. Worker erledigen die meisten Pipelineaufgaben (sie wenden z. B. Ihre Pardoxs auf Daten an). Worker-Logs enthalten von Ihrem Code und von Dataflow protokollierte Nachrichten.
  • worker-startup-Logs sind in den meisten Dataflow-Jobs vorhanden und erfassen mit dem Startvorgang zusammenhängende Nachrichten. Der Startvorgang umfasst das Herunterladen der JAR-Dateien eines Jobs aus Cloud Storage und das Starten der Worker. Wenn beim Starten von Workern Probleme auftreten, können Sie sich diese Logs ansehen.
  • Shuffler-Logs enthalten Nachrichten von Workern, die die Ergebnisse paralleler Pipelinevorgänge konsolidieren.
  • docker- und Kubelet-Logs enthalten Nachrichten, die mit diesen öffentlichen Technologien in Zusammenhang stehen und auf Dataflow-Workern verwendet werden.

Im nächsten Schritt prüfen wir, ob Ihr Job erfolgreich ausgeführt wurde.

7. Erfolgreiche Ausführung des Jobs überprüfen

Öffnen Sie die Cloud Dataflow-Web-UI in der Google Cloud Platform Console.

Der Job zum Zählen der Wörter sollte zuerst den Status Wird ausgeführt und dann Erfolgreich haben:

4c408162416d03a2.png

Die Ausführung des Jobs dauert etwa drei bis vier Minuten.

Wir haben zuvor die Pipeline ausgeführt und einen Bucket für die Ausgabe angegeben. Sehen wir uns nun das Ergebnis an, denn Sie möchten bestimmt wissen, wie häufig jedes einzelne Wort in König Lear vorkommt. Gehen Sie zurück zum Cloud Storage-Browser in der Google Cloud Platform Console. In Ihrem Bucket sollten Sie die Ausgabedateien und die Staging-Dateien sehen, die von Ihrem Job erstellt wurden:

25a5d3d4b5d0b567.png

8. Ressourcen herunterfahren

Sie können Ihre Ressourcen über die Google Cloud Platform Console herunterfahren.

Öffnen Sie in der Google Cloud Platform Console den Cloud Storage-Browser.

2b6c3a2a92b47015.png

Klicken Sie auf das Kästchen neben dem von Ihnen erstellten Bucket und dann auf LÖSCHEN, um den Bucket und seinen Inhalt endgültig zu löschen.

2f7780bdf10b69ba.png

8051ef293a8e5cfe.png

9. Glückwunsch!

Sie haben gelernt, wie Sie mit dem Cloud Dataflow SDK ein Maven-Projekt erstellen und mithilfe der Google Cloud Platform Console eine Beispiel-Pipeline ausführen sowie den zugehörigen Cloud Storage-Bucket und dessen Inhalte löschen.

Weitere Informationen

Lizenz

Dieses Werk ist unter einer Creative Commons Attribution 3.0 Generic License und einer Apache 2.0-Lizenz lizenziert.