Esegui una pipeline di elaborazione di testo di big data in Cloud Dataflow

1. Panoramica

Cloud-Dataflow.png

Cos'è Dataflow?

Dataflow è un servizio gestito per l'esecuzione di un'ampia varietà di pattern di elaborazione dati. La documentazione su questo sito mostra come eseguire il deployment delle pipeline di elaborazione dei dati in modalità flusso e batch utilizzando Dataflow, comprese le indicazioni per l'utilizzo delle funzionalità dei servizi.

L'SDK Apache Beam è un modello di programmazione open source che consente di sviluppare pipeline sia in batch che in flussi. Crei le tue pipeline con un programma Apache Beam e quindi le esegui sul servizio Dataflow. La documentazione di Apache Beam fornisce informazioni concettuali approfondite e materiale di riferimento per il modello di programmazione Apache Beam, gli SDK e altri runner.

Streaming veloce dell'analisi dei dati

Dataflow consente uno sviluppo rapido e semplificato di pipeline di dati in modalità flusso con una latenza dei dati inferiore.

Semplifica operazioni e gestione

Consenti ai team di concentrarsi sulla programmazione anziché sulla gestione dei cluster di server, poiché l'approccio serverless di Dataflow elimina l'overhead operativo dai carichi di lavoro di data engineering.

Ridurre il costo totale di proprietà

Grazie alla scalabilità automatica delle risorse e all'ottimizzazione dei costi per l'elaborazione batch, Dataflow offre una capacità praticamente illimitata per gestire i carichi di lavoro stagionali e picchi senza spendere troppo.

Funzionalità principali

Gestione automatizzata delle risorse e ribilanciamento dinamico del lavoro

Dataflow automatizza il provisioning e la gestione delle risorse di elaborazione per ridurre al minimo la latenza e massimizzare l'utilizzo, in modo da non dover avviare o prenotare le istanze manualmente. Anche il partizionamento del lavoro è automatizzato e ottimizzato per ribilanciare dinamicamente il lavoro in sospeso. Non è necessario andare alla ricerca di "tasti di scelta rapida" o pre-elaborare i dati di input.

Scalabilità automatica orizzontale

La scalabilità automatica orizzontale delle risorse worker per ottimizzare la velocità effettiva si traduce in un migliore rapporto prezzo-prestazioni complessivo.

Prezzi flessibili di pianificazione delle risorse per l'elaborazione batch

Per un'elaborazione flessibile nei tempi di pianificazione dei job, come i job notturni, la pianificazione flessibile delle risorse (FlexRS) offre un prezzo inferiore per l'elaborazione batch. Questi job flessibili vengono inseriti in una coda con la garanzia che verranno recuperati per l'esecuzione entro un lasso di tempo di sei ore.

Questo tutorial è adattato da https://cloud.google.com/dataflow/docs/quickstarts/quickstart-java-maven

Cosa imparerai a fare

  • Creare un progetto Maven con Apache Beam utilizzando l'SDK Java
  • Esegui una pipeline di esempio utilizzando la console di Google Cloud Platform
  • Come eliminare il bucket Cloud Storage associato e i relativi contenuti

Che cosa ti serve

Come utilizzerai questo tutorial?

Solo lettura Leggilo e completa gli esercizi

Come giudichi la tua esperienza di utilizzo dei servizi della piattaforma Google Cloud?

Principiante Livello intermedio Eccellente

2. Configurazione e requisiti

Configurazione dell'ambiente da seguire in modo autonomo

  1. Accedi alla console Cloud e crea un nuovo progetto o riutilizzane uno esistente. Se non hai ancora un account Gmail o G Suite, devi crearne uno.

dMbN6g9RawQj_VXCSYpdYncY-DbaRzr2GbnwoV7jFf1u3avxJtmGPmKpMYgiaMH-qu80a_NJ9p2IIXFppYk8x3wyymZXavjglNLJJhuXieCem56H30hwXtd8PvXGpXJO9gEUDu3cZw

ci9Oe6PgnbNuSYlMyvbXF1JdQyiHoEgnhl4PlV_MFagm2ppzhueRkqX4eLjJllZco_2zCp0V0bpTupUSKji9KkQyWqj11pqit1K1faS1V6aFxLGQdkuzGp4rsQTan7F01iePL5DtqQ

8-tA_Lheyo8SscAVKrGii2coplQp2_D1Iosb2ViABY0UUO1A8cimXUu6Wf1R9zJIRExL5OB2j946aIiFtyKTzxDcNnuznmR45vZ2HMoK3o67jxuoUJCAnqvEX6NgPGFjCVNgASc-lg

Ricorda l'ID progetto, un nome univoco in tutti i progetti Google Cloud (il nome precedente è già stato utilizzato e non funzionerà correttamente). Verrà indicato più avanti in questo codelab come PROJECT_ID.

  1. Successivamente, dovrai abilitare la fatturazione in Cloud Console per utilizzare le risorse Google Cloud.

Eseguire questo codelab non dovrebbe costare molto. Assicurati di seguire le istruzioni nella sezione "Pulizia" in cui viene spiegato come arrestare le risorse in modo da non incorrere in fatturazione oltre questo tutorial. I nuovi utenti di Google Cloud sono idonei al programma prova senza costi di 300$.

Abilita le API

Fai clic sull'icona del menu nella parte superiore sinistra dello schermo.

2bfc27ef9ba2ec7d.png

Seleziona API e Servizi > Dashboard dal menu a discesa.

5b65523a6cc0afa6.png

Seleziona + Abilita API e servizi.

81ed72192c0edd96.png

Cerca "Compute Engine" nella casella di ricerca. Fai clic su "API Compute Engine" nell'elenco dei risultati visualizzato.

3f201e991c7b4527.png

Nella pagina Google Compute Engine, fai clic su Abilita.

ac121653277fa7bb.png

Una volta attivato, fai clic sulla freccia per tornare indietro.

Ora cerca le API riportate di seguito e abilitale anche:

  • Cloud Dataflow
  • Stackdriver
  • Cloud Storage
  • JSON di Cloud Storage
  • BigQuery
  • Cloud Pub/Sub
  • Cloud Datastore
  • API Cloud Resource Manager

3. Crea un nuovo bucket Cloud Storage

Nella console di Google Cloud Platform, fai clic sull'icona Menu in alto a sinistra dello schermo:

2bfc27ef9ba2ec7d.png

Scorri verso il basso e seleziona Cloud Storage > Browser nella sottosezione Spazio di archiviazione:

2b6c3a2a92b47015.png

A questo punto dovresti vedere il browser Cloud Storage e, se utilizzi un progetto che al momento non include bucket Cloud Storage, vedrai un invito a creare un nuovo bucket. Premi il pulsante Crea bucket per crearne uno:

a711016d5a99dc37.png

Inserisci un nome per il bucket. Come indicato nella finestra di dialogo, i nomi dei bucket devono essere univoci in tutto Cloud Storage. Quindi, se scegli un nome ovvio, come "test", probabilmente scoprirai che qualcun altro ha già creato un bucket con quel nome e riceverà un errore.

Esistono anche alcune regole relative ai caratteri consentiti nei nomi dei bucket. Se il nome del bucket inizia e termina con una lettera o un numero e utilizza solo i trattini al centro, non è un problema. Se provi a utilizzare caratteri speciali o a iniziare o terminare il nome del bucket con qualcosa di diverso da una lettera o un numero, la finestra di dialogo ti ricorderà delle regole.

3a5458648cfe3358.png

Inserisci un nome univoco per il bucket e premi Crea. Se scegli un elemento già in uso, verrà visualizzato il messaggio di errore sopra riportato. Una volta creato un bucket, si aprirà il nuovo bucket vuoto nel browser:

3bda986ae88c4e71.png

Il nome del bucket che vedi sarà, ovviamente, diverso, dato che deve essere univoco in tutti i progetti.

4. Avvia Cloud Shell

Attiva Cloud Shell

  1. Dalla console Cloud, fai clic su Attiva Cloud Shell H7JlbhKGHITmsxhQIcLwoe5HXZMhDlYue4K-SPszMxUxDjIeWfOHBfxDHYpmLQTzUmQ7Xx8o6OJUlANnQF0iBuUyfp1RzVad_4nCa0Zz5LXZ.

zlNW0HehB_AFW1qZ4AyebSQUdWm95n7TbnOr7UVm3j9dFcg6oWApJRlC0jnU1Mvb-IQp-trP1Px8xKNwt6o3pP6fyih947sEhOFI4IRF0W7WZk6hFqZDUGXQQXrw21GuMm2ecHrbzQ

Se non hai mai avviato Cloud Shell, verrà visualizzata una schermata intermedia (below the fold) in cui viene descritto di cosa si tratta. In tal caso, fai clic su Continua (e non la vedrai più). Ecco come appare quella singola schermata:

kEPbNAo_w5C_pi9QvhFwWwky1cX8hr_xEMGWySNIoMCdi-Djx9AQRqWn-__DmEpC7vKgUtl-feTcv-wBxJ8NwzzAp7mY65-fi2LJo4twUoewT1SUjd6Y3h81RG3rKIkqhoVlFR-G7w

Il provisioning e la connessione a Cloud Shell dovrebbero richiedere solo qualche istante.

pTv5mEKzWMWp5VBrg2eGcuRPv9dLInPToS-mohlrqDASyYGWnZ_SwE-MzOWHe76ZdCSmw0kgWogSJv27lrQE8pvA5OD6P1I47nz8vrAdK7yR1NseZKJvcxAZrPb8wRxoqyTpD-gbhA

Questa macchina virtuale viene caricata con tutti gli strumenti di sviluppo necessari. Offre una home directory permanente da 5 GB e viene eseguita in Google Cloud, migliorando notevolmente le prestazioni di rete e l'autenticazione. Gran parte, se non tutto, del lavoro in questo codelab può essere svolto semplicemente con un browser o Chromebook.

Una volta eseguita la connessione a Cloud Shell, dovresti vedere che il tuo account è già autenticato e il progetto è già impostato sul tuo ID progetto.

  1. Esegui questo comando in Cloud Shell per verificare che l'account sia autenticato:
gcloud auth list

Output comando

 Credentialed Accounts
ACTIVE  ACCOUNT
*       <my_account>@<my_domain.com>

To set the active account, run:
    $ gcloud config set account `ACCOUNT`
gcloud config list project

Output comando

[core]
project = <PROJECT_ID>

In caso contrario, puoi impostarlo con questo comando:

gcloud config set project <PROJECT_ID>

Output comando

Updated property [core/project].

5. Crea un progetto Maven

Dopo l'avvio di Cloud Shell, iniziamo creando un progetto Maven utilizzando l'SDK Java per Apache Beam.

Apache Beam è un modello di programmazione open source per le pipeline di dati. Definisci queste pipeline con un programma Apache Beam e puoi scegliere un runner, come Dataflow, per eseguire la pipeline.

Esegui il comando mvn archetype:generate nella shell in questo modo:

  mvn archetype:generate \
     -DarchetypeGroupId=org.apache.beam \
     -DarchetypeArtifactId=beam-sdks-java-maven-archetypes-examples \
     -DarchetypeVersion=2.46.0 \
     -DgroupId=org.example \
     -DartifactId=first-dataflow \
     -Dversion="0.1" \
     -Dpackage=org.apache.beam.examples \
     -DinteractiveMode=false

Dopo aver eseguito il comando, dovresti vedere una nuova directory denominata first-dataflow sotto la directory attuale. first-dataflow contiene un progetto Maven che include l'SDK Cloud Dataflow per Java e pipeline di esempio.

6. Esegui una pipeline di elaborazione testo su Cloud Dataflow

Iniziamo salvando l'ID progetto e i nomi dei bucket Cloud Storage come variabili di ambiente. Puoi farlo in Cloud Shell. Assicurati di sostituire <your_project_id> con il tuo ID progetto.

 export PROJECT_ID=<your_project_id>

Ora faremo lo stesso per il bucket Cloud Storage. Ricorda di sostituire <your_bucket_name> con il nome univoco che hai utilizzato per creare il bucket in un passaggio precedente.

 export BUCKET_NAME=<your_bucket_name>

Passa alla directory first-dataflow/.

 cd first-dataflow

Eseguiremo una pipeline chiamata WordCount, che legge il testo, tokenizza le righe di testo in singole parole ed esegue un conteggio della frequenza su ciascuna di queste parole. Per prima cosa eseguiremo la pipeline, poi esamineremo cosa succede in ogni passaggio.

Avvia la pipeline eseguendo il comando mvn compile exec:java nella shell o nella finestra del terminale. Per gli argomenti --project, --stagingLocation, e --output, il comando seguente fa riferimento alle variabili di ambiente che hai configurato in precedenza in questo passaggio.

 mvn compile exec:java \
      -Pdataflow-runner compile exec:java \
      -Dexec.mainClass=org.apache.beam.examples.WordCount \
      -Dexec.args="--project=${PROJECT_ID} \
      --stagingLocation=gs://${BUCKET_NAME}/staging/ \
      --output=gs://${BUCKET_NAME}/output \
      --runner=DataflowRunner \
      --region=us-central1 \
      --gcpTempLocation=gs://${BUCKET_NAME}/temp"

Mentre il job è in esecuzione, lo cerchiamo nell'elenco dei job.

Apri l'interfaccia utente web di Cloud Dataflow nella console della piattaforma Google Cloud. Dovresti vedere il job di conteggio parole con lo stato In esecuzione:

3623be74922e3209.png

Ora diamo un'occhiata ai parametri della pipeline. Inizia facendo clic sul nome del tuo job:

816d8f59c72797d7.png

Quando selezioni un job, puoi visualizzare il grafico di esecuzione. Il grafico di esecuzione di una pipeline rappresenta ogni trasformazione nella pipeline come una casella che contiene il nome della trasformazione e alcune informazioni sullo stato. Puoi fare clic sul carato nell'angolo in alto a destra di ogni passaggio per visualizzare ulteriori dettagli:

80a972dd19a6f1eb.png

Vediamo in che modo la pipeline trasforma i dati in ogni passaggio:

  • Lettura: in questo passaggio, la pipeline legge da un'origine di input. In questo caso, si tratta di un file di testo di Cloud Storage con il testo completo dell'opera di Shakespeare King Lear. La nostra pipeline legge il file riga per riga e restituisce a ciascuno un valore PCollection, dove ogni riga del file di testo è un elemento della raccolta.
  • CountWords: il passaggio CountWords è composto da due parti. In primo luogo, utilizza una funzione parallela di obbligo (ParDo) denominata ExtractWords per tokenizzare ogni riga in singole parole. L'output di ExtractWords è una nuova PCollection in cui ogni elemento è una parola. Il passaggio successivo, Count, utilizza una trasformazione fornita dall'SDK Java che restituisce chiavi e coppie di valori in cui la chiave è una parola univoca e il valore è il numero di volte in cui si verifica. Ecco il metodo di implementazione di CountWords. Puoi controllare il file WordCount.java completo su GitHub:
 /**
   * A PTransform that converts a PCollection containing lines of text into a PCollection of
   * formatted word counts.
   *
   * <p>Concept #3: This is a custom composite transform that bundles two transforms (ParDo and
   * Count) as a reusable PTransform subclass. Using composite transforms allows for easy reuse,
   * modular testing, and an improved monitoring experience.
   */
  public static class CountWords
      extends PTransform<PCollection<String>, PCollection<KV<String, Long>>> {
    @Override
    public PCollection<KV<String, Long>> expand(PCollection<String> lines) {

      // Convert lines of text into individual words.
      PCollection<String> words = lines.apply(ParDo.of(new ExtractWordsFn()));

      // Count the number of times each word occurs.
      PCollection<KV<String, Long>> wordCounts = words.apply(Count.perElement());

      return wordCounts;
    }
  }
  • MapElements: richiama il FormatAsTextFn, copiato di seguito, che formatta ogni coppia chiave-valore in una stringa stampabile.
  /** A SimpleFunction that converts a Word and Count into a printable string. */
  public static class FormatAsTextFn extends SimpleFunction<KV<String, Long>, String> {
    @Override
    public String apply(KV<String, Long> input) {
      return input.getKey() + ": " + input.getValue();
    }
  }
  • WriteCounts: in questo passaggio scriviamo le stringhe stampabili in più file di testo con sharding.

Tra qualche minuto esamineremo l'output risultante dalla pipeline.

Ora dai un'occhiata alla pagina Informazioni job a destra del grafico, che include i parametri della pipeline che abbiamo incluso nel comando mvn compile exec:java.

9723815a1f5bf08b.png

208a7f0d6973acf6.png

Puoi anche vedere i contatori personalizzati per la pipeline, che in questo caso mostrano quante righe vuote sono state riscontrate finora durante l'esecuzione. Puoi aggiungere nuovi contatori alla pipeline per tenere traccia delle metriche specifiche dell'applicazione.

a2e2800e2c6893f8.png

Puoi fare clic sull'icona Log nella parte inferiore della console per visualizzare i messaggi di errore specifici.

23c64138a1027f8.png

Per impostazione predefinita, il riquadro mostra i messaggi di log dei job che segnalano lo stato del job nel suo complesso. Puoi utilizzare il selettore di gravità minima per filtrare i messaggi di stato e di avanzamento del job.

94ba42015fdafbe2.png

Se selezioni un passaggio della pipeline nel grafico, la visualizzazione mostrerà i log generati dal codice e il codice generato in esecuzione nel passaggio della pipeline.

Per tornare a Log dei job, deseleziona il passaggio facendo clic all'esterno del grafico o utilizzando il pulsante Chiudi nel riquadro laterale a destra.

Puoi utilizzare il pulsante Log dei worker nella scheda Log per visualizzare i log dei worker per le istanze di Compute Engine che eseguono la pipeline. I log dei worker sono costituiti da righe di log generate dal codice e dal codice generato da Dataflow che lo esegue.

Se stai tentando di eseguire il debug di un errore nella pipeline, spesso nei log dei worker sarà presente un ulteriore logging che aiuta a risolvere il problema. Tieni presente che questi log sono aggregati per tutti i worker e possono essere filtrati e cercati.

5a53c244f28d5478.png

L'interfaccia di monitoraggio di Dataflow mostra solo i messaggi di log più recenti. Puoi visualizzare tutti i log facendo clic sul link Google Cloud Observability sul lato destro del riquadro dei log.

2bc704a4d6529b31.png

Ecco un riepilogo dei diversi tipi di log disponibili per la visualizzazione nella pagina Monitoraggio→Log:

  • I log job-message contengono messaggi a livello di job generati da vari componenti di Dataflow. Alcuni esempi sono la configurazione della scalabilità automatica, l'avvio o l'arresto dei worker, l'avanzamento del passaggio del job ed errori del job. Gli errori a livello di worker, che hanno origine dall'arresto anomalo del codice utente e che sono presenti nei log worker, si propagano anche ai log di job-message.
  • I log worker vengono prodotti dai worker Dataflow. I worker svolgono la maggior parte del lavoro della pipeline (ad esempio, applicando i ParDos ai dati). I log dei lavoratori contengono i messaggi registrati dal tuo codice e da Dataflow.
  • I log worker-startup sono presenti nella maggior parte dei job Dataflow e possono acquisire messaggi relativi al processo di avvio. Il processo di avvio include il download dei jar di un job da Cloud Storage, quindi l'avvio dei worker. Se si verifica un problema all'avvio dei worker, è consigliabile consultare questi log.
  • I log shuffler contengono messaggi dei worker che consolidano i risultati delle operazioni della pipeline parallela.
  • I log docker e kubelet contengono messaggi relativi a queste tecnologie pubbliche, che vengono utilizzate sui worker Dataflow.

Nel passaggio successivo verificheremo che il job sia andato a buon fine.

7. verifica l'esito del job

Apri l'interfaccia utente web di Cloud Dataflow nella console della piattaforma Google Cloud.

Inizialmente dovresti vedere il job di conteggio parole con stato In esecuzione, quindi Riuscito:

4c408162416d03a2.png

L'esecuzione del job richiederà circa 3-4 minuti.

Ricordi quando hai eseguito la pipeline e specificato un bucket di output? Diamo un'occhiata al risultato (perché non vuoi vedere quante volte si è verificata ogni parola di King Lear?!). Torna al browser Cloud Storage nella console di Google Cloud Platform. Nel bucket dovresti vedere i file di output e di gestione temporanea creati dal job:

25a5d3d4b5d0b567.png

8. Arresta le risorse

Puoi arrestare le risorse dalla console di Google Cloud Platform.

Apri il browser Cloud Storage nella console di Google Cloud Platform.

2b6c3a2a92b47015.png

Seleziona la casella di controllo accanto al bucket che hai creato e fai clic su ELIMINA per eliminare definitivamente il bucket e i relativi contenuti.

2f7780bdf10b69ba.png

8051ef293a8e5cfe.png

9. Complimenti!

Hai imparato a creare un progetto Maven con l'SDK di Cloud Dataflow, a eseguire una pipeline di esempio utilizzando la console di Google Cloud ed eliminare il bucket Cloud Storage associato e i relativi contenuti.

Scopri di più

Licenza

Questo lavoro è concesso in licenza ai sensi di una licenza Creative Commons Attribution 3.0 Generic, e Apache 2.0.