1. Panoramica

Che cos'è Dataflow?
Dataflow è un servizio gestito per l'esecuzione di un'ampia varietà di pattern di elaborazione dati. La documentazione di questo sito mostra come eseguire il deployment delle pipeline di elaborazione dei dati batch e di streaming utilizzando Dataflow, incluse le istruzioni per l'utilizzo delle funzionalità del servizio.
L'SDK Apache Beam è un modello di programmazione open source che consente di sviluppare pipeline batch e di streaming. Crea le pipeline con un programma Apache Beam e poi le esegui sul servizio Dataflow. La documentazione di Apache Beam fornisce informazioni concettuali approfondite e materiale di riferimento per il modello di programmazione Apache Beam, gli SDK e altri runner.
Analizza rapidamente i flussi di dati
Dataflow velocizza e semplifica lo sviluppo di pipeline di dati in modalità flusso garantendo una latenza dei dati minore.
Semplifica operazioni e gestione
Puoi consentire ai team di concentrarsi sulla programmazione invece che sulla gestione dei cluster di server grazie all'approccio serverless di Dataflow, che elimina i problemi di sovraccarico operativo dai carichi di lavoro di data engineering.
Ridurre il costo totale di proprietà
Grazie alla scalabilità automatica delle risorse e all'ottimizzazione dei costi per l'elaborazione batch, Dataflow offre una capacità praticamente illimitata per gestire i carichi di lavoro durante i picchi e i periodi di punta stagionali senza spendere troppo.
Funzionalità principali
Gestione automatizzata delle risorse e ridistribuzione dinamica del lavoro
Dataflow automatizza il provisioning e la gestione delle risorse di elaborazione per ridurre al minimo i tempi di latenza e ottimizzare l'utilizzo, evitando la necessità di avviare o prenotare le istanze manualmente. Anche il partizionamento del lavoro è automatizzato e ottimizzato per ridistribuire dinamicamente il lavoro in sospeso. Non è necessario andare alla ricerca di tasti di scelta rapida o pre-elaborare i dati di input.
Scalabilità automatica orizzontale
La scalabilità automatica orizzontale delle risorse worker per ottimizzare la velocità effettiva si traduce in un migliore rapporto prezzo-prestazioni complessivo.
Prezzi flessibili di pianificazione delle risorse per l'elaborazione batch
Per consentire un'elaborazione flessibile nei tempi di pianificazione dei job, come i job notturni, la pianificazione flessibile delle risorse (FlexRS) offre prezzi inferiori per l'elaborazione batch. Questi job flessibili sono inseriti in una coda con la garanzia che verranno recuperati per l'esecuzione entro un lasso di tempo di sei ore.
Questo tutorial è adattato da https://cloud.google.com/dataflow/docs/quickstarts/quickstart-java-maven
Cosa imparerai a fare
- Come creare un progetto Maven con Apache Beam utilizzando l'SDK Java
- Esegui una pipeline di esempio utilizzando la console Google Cloud
- Come eliminare il bucket Cloud Storage associato e i relativi contenuti
Che cosa ti serve
Come utilizzerai questo tutorial?
Come valuti la tua esperienza di utilizzo dei servizi Google Cloud Platform?
2. Configurazione e requisiti
Configurazione dell'ambiente autonomo
- Accedi alla console Cloud e crea un nuovo progetto o riutilizzane uno esistente. Se non hai già un account Gmail o G Suite, devi crearne uno.
Ricorda l'ID progetto, un nome univoco tra tutti i progetti Google Cloud (il nome sopra è già stato utilizzato e non funzionerà per te, mi dispiace). In questo codelab verrà chiamato PROJECT_ID.
- Successivamente, dovrai abilitare la fatturazione in Cloud Console per utilizzare le risorse Google Cloud.
L'esecuzione di questo codelab non dovrebbe costare molto, se non nulla. Assicurati di seguire le istruzioni riportate nella sezione "Pulizia", che ti consiglia come arrestare le risorse in modo da non incorrere in addebiti oltre questo tutorial. I nuovi utenti di Google Cloud possono beneficiare del programma prova senza costi di 300$.
Abilita le API
Fai clic sull'icona del menu nella parte superiore sinistra dello schermo.

Seleziona API e servizi > Dashboard dal menu a discesa.

Seleziona + Abilita API e servizi.

Cerca "Compute Engine" nella casella di ricerca. Fai clic su "API Compute Engine" nell'elenco dei risultati visualizzato.

Nella pagina di Google Compute Engine, fai clic su Attiva.

Una volta attivata, fai clic sulla freccia per tornare indietro.
Ora cerca le seguenti API e abilita anche queste:
- Cloud Dataflow
- Stackdriver
- Cloud Storage
- Cloud Storage JSON
- BigQuery
- Cloud Pub/Sub
- Cloud Datastore
- API Cloud Resource Manager
3. Crea un nuovo bucket Cloud Storage
Nella console di Google Cloud Platform, fai clic sull'icona Menu in alto a sinistra dello schermo:

Scorri verso il basso e seleziona Cloud Storage > Browser nella sezione secondaria Storage:

Ora dovresti vedere il browser Cloud Storage e, se utilizzi un progetto che attualmente non ha bucket Cloud Storage, vedrai un invito a creare un nuovo bucket. Premi il pulsante Crea bucket per crearne uno:

Inserisci un nome per il bucket. Come indicato nella finestra di dialogo, i nomi dei bucket devono essere univoci in tutto Cloud Storage. Pertanto, se scegli un nome ovvio, ad esempio "test", probabilmente scoprirai che qualcun altro ha già creato un bucket con quel nome e riceverai un errore.
Esistono anche alcune regole relative ai caratteri consentiti nei nomi dei bucket. Se il nome del bucket inizia e termina con una lettera o un numero e contiene solo trattini al centro, non avrai problemi. Se tenti di utilizzare caratteri speciali o di iniziare o terminare il nome del bucket con un carattere diverso da una lettera o un numero, la finestra di dialogo ti ricorderà le regole.

Inserisci un nome univoco per il bucket e premi Crea. Se scegli un nome già in uso, verrà visualizzato il messaggio di errore mostrato sopra. Una volta creato correttamente un bucket, si aprirà il nuovo bucket vuoto nel browser:

Il nome del bucket che vedi sarà ovviamente diverso, poiché deve essere univoco in tutti i progetti.
4. Avvia Cloud Shell
Attiva Cloud Shell
- Nella console Cloud, fai clic su Attiva Cloud Shell
.
Se non hai mai avviato Cloud Shell, viene visualizzata una schermata intermedia (sotto la piega) che ne descrive le funzionalità. In questo caso, fai clic su Continua e non comparirà più. Ecco come si presenta la schermata intermedia:
Bastano pochi istanti per eseguire il provisioning e connettersi a Cloud Shell.
Questa macchina virtuale è caricata con tutti gli strumenti per sviluppatori di cui avrai bisogno. Offre una home directory permanente da 5 GB e viene eseguita in Google Cloud, migliorando notevolmente le prestazioni e l'autenticazione della rete. Gran parte del lavoro per questo codelab, se non tutto, può essere svolto semplicemente con un browser o con Chromebook.
Una volta eseguita la connessione a Cloud Shell, dovresti vedere che il tuo account è già autenticato e il progetto è già impostato sul tuo ID progetto.
- Esegui questo comando in Cloud Shell per verificare che l'account sia autenticato:
gcloud auth list
Output comando
Credentialed Accounts
ACTIVE ACCOUNT
* <my_account>@<my_domain.com>
To set the active account, run:
$ gcloud config set account `ACCOUNT`
gcloud config list project
Output comando
[core] project = <PROJECT_ID>
In caso contrario, puoi impostarlo con questo comando:
gcloud config set project <PROJECT_ID>
Output comando
Updated property [core/project].
5. Crea un progetto Maven
Dopo l'avvio di Cloud Shell, iniziamo creando un progetto Maven utilizzando l'SDK Java per Apache Beam.
Apache Beam è un modello di programmazione open source per pipeline di dati. Definisci queste pipeline con un programma Apache Beam e puoi scegliere un runner, ad esempio Dataflow, per eseguire la pipeline.
Esegui il comando mvn archetype:generate nella shell nel seguente modo:
mvn archetype:generate \
-DarchetypeGroupId=org.apache.beam \
-DarchetypeArtifactId=beam-sdks-java-maven-archetypes-examples \
-DarchetypeVersion=2.46.0 \
-DgroupId=org.example \
-DartifactId=first-dataflow \
-Dversion="0.1" \
-Dpackage=org.apache.beam.examples \
-DinteractiveMode=false
Dopo aver eseguito il comando, dovresti visualizzare una nuova directory denominata first-dataflow nella directory corrente. first-dataflow contiene un progetto Maven che include l'SDK Cloud Dataflow per Java e pipeline di esempio.
6. Esegui una pipeline di elaborazione del testo su Cloud Dataflow
Iniziamo salvando l'ID progetto e i nomi dei bucket Cloud Storage come variabili di ambiente. Puoi farlo in Cloud Shell. Assicurati di sostituire <your_project_id> con l'ID del tuo progetto.
export PROJECT_ID=<your_project_id>
Ora faremo lo stesso per il bucket Cloud Storage. Ricorda di sostituire <your_bucket_name> con il nome univoco che hai utilizzato per creare il bucket in un passaggio precedente.
export BUCKET_NAME=<your_bucket_name>
Passa alla directory first-dataflow/.
cd first-dataflow
Eseguiremo una pipeline chiamata WordCount, che legge il testo, tokenizza le righe di testo in parole singole ed esegue un conteggio della frequenza per ciascuna parola. Innanzitutto, eseguiamo la pipeline e, mentre è in esecuzione, diamo un'occhiata a cosa succede in ogni passaggio.
Avvia la pipeline eseguendo il comando mvn compile exec:java nella finestra della shell o del terminale. Per gli argomenti --project, --stagingLocation, e --output, il comando riportato di seguito fa riferimento alle variabili di ambiente che hai configurato in precedenza in questo passaggio.
mvn compile exec:java \
-Pdataflow-runner compile exec:java \
-Dexec.mainClass=org.apache.beam.examples.WordCount \
-Dexec.args="--project=${PROJECT_ID} \
--stagingLocation=gs://${BUCKET_NAME}/staging/ \
--output=gs://${BUCKET_NAME}/output \
--runner=DataflowRunner \
--region=us-central1 \
--gcpTempLocation=gs://${BUCKET_NAME}/temp"
Mentre il job è in esecuzione, cerchiamolo nell'elenco dei job.
Apri la UI web di Cloud Dataflow nella console Google Cloud. Dovresti vedere il tuo job wordcount con lo stato In esecuzione:

Ora esaminiamo i parametri della pipeline. Inizia facendo clic sul nome del job:

Quando selezioni un job, puoi visualizzare il grafico di esecuzione. Il grafico di esecuzione di una pipeline rappresenta ogni trasformazione nella pipeline come una casella che contiene il nome della trasformazione e alcune informazioni sullo stato. Puoi fare clic sul simbolo a V nell'angolo in alto a destra di ogni passaggio per visualizzare ulteriori dettagli:

Vediamo come la pipeline trasforma i dati in ogni passaggio:
- Lettura: in questo passaggio, la pipeline legge da un'origine di input. In questo caso, si tratta di un file di testo di Cloud Storage con l'intero testo dell'opera teatrale di Shakespeare King Lear. La nostra pipeline legge il file riga per riga e restituisce un
PCollection, in cui ogni riga del file di testo è un elemento della raccolta. - CountWords: il passaggio
CountWordsè composto da due parti. Innanzitutto, utilizza una funzione do parallela (ParDo) denominataExtractWordsper tokenizzare ogni riga in singole parole. L'output di ExtractWords è un nuovo PCollection in cui ogni elemento è una parola. Il passaggio successivo,Count, utilizza una trasformazione fornita dall'SDK Java che restituisce coppie chiave-valore in cui la chiave è una parola univoca e il valore è il numero di volte in cui si verifica. Ecco il metodo che implementaCountWords. Puoi consultare il file WordCount.java completo su GitHub:
/**
* A PTransform that converts a PCollection containing lines of text into a PCollection of
* formatted word counts.
*
* <p>Concept #3: This is a custom composite transform that bundles two transforms (ParDo and
* Count) as a reusable PTransform subclass. Using composite transforms allows for easy reuse,
* modular testing, and an improved monitoring experience.
*/
public static class CountWords
extends PTransform<PCollection<String>, PCollection<KV<String, Long>>> {
@Override
public PCollection<KV<String, Long>> expand(PCollection<String> lines) {
// Convert lines of text into individual words.
PCollection<String> words = lines.apply(ParDo.of(new ExtractWordsFn()));
// Count the number of times each word occurs.
PCollection<KV<String, Long>> wordCounts = words.apply(Count.perElement());
return wordCounts;
}
}
- MapElements: richiama
FormatAsTextFn, copiato di seguito, che formatta ogni coppia chiave-valore in una stringa stampabile.
/** A SimpleFunction that converts a Word and Count into a printable string. */
public static class FormatAsTextFn extends SimpleFunction<KV<String, Long>, String> {
@Override
public String apply(KV<String, Long> input) {
return input.getKey() + ": " + input.getValue();
}
}
- WriteCounts: in questo passaggio scriviamo le stringhe stampabili in più file di testo suddivisi in shard.
Esamineremo l'output risultante della pipeline tra qualche minuto.
Ora dai un'occhiata alla pagina Informazioni sul job a destra del grafico, che include i parametri della pipeline che abbiamo incluso nel comando mvn compile exec:java.


Puoi anche visualizzare i contatori personalizzati per la pipeline, che in questo caso mostrano quante righe vuote sono state rilevate finora durante l'esecuzione. Puoi aggiungere nuovi contatori alla pipeline per monitorare le metriche specifiche dell'applicazione.

Puoi fare clic sull'icona Log nella parte inferiore della console per visualizzare i messaggi di errore specifici.

Per impostazione predefinita, il riquadro mostra i messaggi del log dei job che segnalano lo stato del job nel suo complesso. Puoi utilizzare il selettore Gravità minima per filtrare i messaggi sullo stato e sull'avanzamento del job.

Se selezioni un passaggio della pipeline nel grafico, la visualizzazione cambia e mostra i log generati dal codice e il codice generato in esecuzione nel passaggio della pipeline.
Per tornare a Registri attività, deseleziona il passaggio facendo clic all'esterno del grafico o utilizzando il pulsante Chiudi nel riquadro a destra.
Puoi utilizzare il pulsante Log worker nella scheda Log per visualizzare i log worker per le istanze di Compute Engine che eseguono la pipeline. I log dei worker sono costituiti da righe di log generate dal tuo codice e dal codice generato da Dataflow che lo esegue.
Se stai tentando di eseguire il debug di un errore nella pipeline, spesso nei log del worker sono presenti log aggiuntivi che aiutano a risolvere il problema. Tieni presente che questi log vengono aggregati in tutti i worker e possono essere filtrati e cercati.

L'interfaccia di monitoraggio di Dataflow mostra solo i messaggi di log più recenti. Puoi visualizzare tutti i log facendo clic sul link Google Cloud Observability sul lato destro del riquadro dei log.

Di seguito è riportato un riepilogo dei diversi tipi di log disponibili per la visualizzazione dalla pagina Monitoraggio→Log:
- I log job-message contengono messaggi a livello di job generati da vari componenti di Dataflow. Gli esempi includono la configurazione della scalabilità automatica, l'avvio o l'arresto dei worker, l'avanzamento del passaggio del job e gli errori del job. Gli errori a livello di worker che hanno origine dal codice utente in arresto anomalo e che sono presenti nei log worker vengono propagati anche ai log job-message.
- I log worker vengono generati dai worker Dataflow. I worker svolgono la maggior parte del lavoro della pipeline (ad esempio, applicano i ParDo ai dati). I log worker contengono i messaggi registrati dal codice e da Dataflow.
- I log worker-startup sono presenti nella maggior parte dei job Dataflow e possono acquisire messaggi relativi al processo di avvio. Il processo di avvio include il download dei file JAR di un job da Cloud Storage, quindi l'avvio dei worker. Se si verifica un problema durante l'avvio dei worker, questi log sono un buon punto di partenza.
- I log di shuffler contengono messaggi dei worker che consolidano i risultati delle operazioni della pipeline parallela.
- I log di docker e kubelet contengono messaggi relativi a queste tecnologie pubbliche, utilizzate sui worker Dataflow.
Nel passaggio successivo, verificheremo che il job sia stato eseguito correttamente.
7. verifica l'esito del job
Apri la UI web di Cloud Dataflow nella console Google Cloud.
Inizialmente dovresti vedere il tuo job wordcount con stato In esecuzione, poi Riuscito:

L'esecuzione del job richiederà circa 3-4 minuti.
Ricordi quando hai eseguito la pipeline e hai specificato un bucket di output? Diamo un'occhiata al risultato (perché non vuoi vedere quante volte compare ogni parola in Re Lear?). Torna al browser Cloud Storage nella console Google Cloud. Nel bucket dovresti vedere i file di output e di staging creati dal job:

8. Chiudere le risorse
Puoi arrestare le risorse dalla console di Google Cloud Platform.
Apri il browser di Cloud Storage nella console Google Cloud.

Seleziona la casella di controllo accanto al bucket che hai creato e fai clic su ELIMINA per eliminare definitivamente il bucket e i relativi contenuti.


9. Complimenti!
Hai imparato a creare un progetto Maven con Cloud Dataflow SDK, a eseguire una pipeline di esempio utilizzando la console Google Cloud e a eliminare il bucket Cloud Storage associato e i relativi contenuti.
Scopri di più
- Documentazione di Dataflow: https://cloud.google.com/dataflow/docs/
Licenza
Questo lavoro è concesso in licenza ai sensi di una licenza Creative Commons Attribution 3.0 Generic e di una licenza Apache 2.0.