1. Introduzione
Google Cloud Dataflow
Ultimo aggiornamento: 5-lug-2023
Cos'è Dataflow?
Dataflow è un servizio gestito per l'esecuzione di un'ampia varietà di pattern di elaborazione dati. La documentazione su questo sito mostra come eseguire il deployment delle pipeline di elaborazione dei dati in modalità flusso e batch utilizzando Dataflow, comprese le indicazioni per l'utilizzo delle funzionalità dei servizi.
L'SDK Apache Beam è un modello di programmazione open source che consente di sviluppare pipeline sia in batch che in flussi. Crei le tue pipeline con un programma Apache Beam e quindi le esegui sul servizio Dataflow. La documentazione di Apache Beam fornisce informazioni concettuali approfondite e materiale di riferimento per il modello di programmazione Apache Beam, gli SDK e altri runner.
Streaming veloce dell'analisi dei dati
Dataflow consente uno sviluppo rapido e semplificato di pipeline di dati in modalità flusso con una latenza dei dati inferiore.
Semplifica operazioni e gestione
Consenti ai team di concentrarsi sulla programmazione anziché sulla gestione dei cluster di server, poiché l'approccio serverless di Dataflow elimina l'overhead operativo dai carichi di lavoro di data engineering.
Ridurre il costo totale di proprietà
Grazie alla scalabilità automatica delle risorse e all'ottimizzazione dei costi per l'elaborazione batch, Dataflow offre una capacità praticamente illimitata per gestire i carichi di lavoro stagionali e picchi senza spendere troppo.
Funzionalità principali
Gestione automatizzata delle risorse e ribilanciamento dinamico del lavoro
Dataflow automatizza il provisioning e la gestione delle risorse di elaborazione per ridurre al minimo la latenza e massimizzare l'utilizzo, in modo da non dover avviare o prenotare le istanze manualmente. Anche il partizionamento del lavoro è automatizzato e ottimizzato per ribilanciare dinamicamente il lavoro in sospeso. Non è necessario andare alla ricerca di "tasti di scelta rapida" o pre-elaborare i dati di input.
Scalabilità automatica orizzontale
La scalabilità automatica orizzontale delle risorse worker per ottimizzare la velocità effettiva si traduce in un migliore rapporto prezzo-prestazioni complessivo.
Prezzi flessibili di pianificazione delle risorse per l'elaborazione batch
Per un'elaborazione flessibile nei tempi di pianificazione dei job, come i job notturni, la pianificazione flessibile delle risorse (FlexRS) offre un prezzo inferiore per l'elaborazione batch. Questi job flessibili vengono inseriti in una coda con la garanzia che verranno recuperati per l'esecuzione entro un lasso di tempo di sei ore.
Che cosa eseguirai in questa fase
L'utilizzo del runner interattivo Apache Beam con i blocchi note JupyterLab consente di sviluppare iterativamente le pipeline, ispezionare il grafico della pipeline e analizzare singole PCollection in un flusso di lavoro read-eval-print-loop (REPL). Questi blocchi note Apache Beam sono resi disponibili tramite Vertex AI Workbench, un servizio gestito che ospita macchine virtuali per blocchi note preinstallate con i più recenti framework di data science e machine learning.
Questo codelab è incentrato sulla funzionalità introdotta dai blocchi note Apache Beam.
Cosa imparerai a fare
- Come creare un'istanza del blocco note
- Creazione di una pipeline di base
- Lettura dei dati da un'origine illimitata
- Visualizzazione dei dati
- Avvio di un job Dataflow dal blocco note
- Salvataggio di un blocco note in corso...
Che cosa ti serve
- Un progetto Google Cloud con la fatturazione abilitata.
- Google Cloud Dataflow e Google Cloud PubSub abilitati.
2. Preparazione
- In Cloud Console, nella pagina del selettore dei progetti, seleziona o crea un progetto Cloud.
Assicurati di aver abilitato le seguenti API:
- API Dataflow
- API Cloud Pub/Sub
- Compute Engine
- API Notebooks
Puoi verificarlo controllando le API e Servizi.
In questa guida leggeremo i dati di una sottoscrizione Pub/Sub, perciò assicurati che l'account di servizio predefinito di Compute Engine abbia il ruolo Editor oppure concedi quest'ultimo al ruolo Editor Pub/Sub.
3. Introduzione ai blocchi note Apache Beam
Avvio di un'istanza di blocchi note Apache Beam
- Avvia Dataflow nella console:
- Seleziona la pagina Workbench utilizzando il menu a sinistra.
- Assicurati di essere nella scheda Blocchi note gestiti dall'utente.
- Fai clic su Nuovo blocco note nella barra degli strumenti.
- Seleziona Apache Beam > Senza GPU.
- Nella pagina Nuovo blocco note, seleziona una subnet per la VM del blocco note e fai clic su Crea.
- Fai clic su Apri JupyterLab quando il collegamento diventa attivo. Vertex AI Workbench crea una nuova istanza di blocco note Apache Beam.
4. crea la pipeline
Creazione di un'istanza di blocco note
Vai a File > Nuovo > Blocco note e seleziona un kernel Apache Beam 2.47 o versioni successive.
Inizia ad aggiungere codice al blocco note
- Copia e incolla il codice da ogni sezione all'interno di una nuova cella nel blocco note
- Esegui la cella
L'utilizzo del runner interattivo Apache Beam con i blocchi note JupyterLab consente di sviluppare iterativamente le pipeline, ispezionare il grafico della pipeline e analizzare singole PCollection in un flusso di lavoro read-eval-print-loop (REPL).
Apache Beam è installato sull'istanza del blocco note, quindi includi i moduli interactive_runner
e interactive_beam
nel blocco note.
import apache_beam as beam
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
import apache_beam.runners.interactive.interactive_beam as ib
Se il blocco note utilizza altri servizi Google, aggiungi le seguenti istruzioni di importazione:
from apache_beam.options import pipeline_options
from apache_beam.options.pipeline_options import GoogleCloudOptions
import google.auth
Impostare le opzioni di interattività
Di seguito viene impostata la durata dell'acquisizione dei dati su 60 secondi. Se vuoi eseguire l'iterazione più velocemente, imposta una durata inferiore, ad esempio "10 secondi".
ib.options.recording_duration = '60s'
Per ulteriori opzioni interattive, consulta la classe interactive_beam.options.
Inizializza la pipeline utilizzando un oggetto InteractiveRunner
.
# Setting up the Apache Beam pipeline options.
options = pipeline_options.PipelineOptions()
# Set the pipeline mode to stream the data from Pub/Sub.
options.view_as(pipeline_options.StandardOptions).streaming = True
# Sets the project to the default project in your current Google Cloud environment.
# The project will be used for creating a subscription to the Pub/Sub topic.
_, options.view_as(GoogleCloudOptions).project = google.auth.default()
# The Google Cloud PubSub topic for this example.
topic = "projects/pubsub-public-data/topics/shakespeare-kinglear"
p = beam.Pipeline(InteractiveRunner(), options=options)
Leggere e visualizzare i dati
L'esempio seguente mostra una pipeline Apache Beam che crea una sottoscrizione all'argomento Pub/Sub specificato e legge i dati dalla sottoscrizione.
words = p | "read" >> beam.io.ReadFromPubSub(topic=topic)
La pipeline conteggia le parole per finestre dall'origine. Crea un windowing fisso con una durata di 10 secondi.
windowed_words = (words
| "window" >> beam.WindowInto(beam.window.FixedWindows(10)))
Quando i dati vengono inseriti in una finestra, le parole vengono conteggiate per finestra.
windowed_word_counts = (windowed_words
| "count" >> beam.combiners.Count.PerElement())
Visualizzare i dati
Il metodo show()
visualizza la PCollection risultante nel blocco note.
ib.show(windowed_word_counts, include_window_info=True)
Per mostrare le visualizzazioni dei tuoi dati, passa visualize_data=True
al metodo show()
. Aggiungi una nuova cella:
ib.show(windowed_word_counts, include_window_info=True, visualize_data=True)
Puoi applicare più filtri alle visualizzazioni. La seguente visualizzazione ti consente di filtrare per etichetta e asse:
5. Utilizzo di un DataFrame Pandas
Un'altra visualizzazione utile nei blocchi note Apache Beam è un DataFrame Panda. L'esempio seguente converte prima le parole in lettere minuscole, quindi calcola la frequenza di ogni parola.
windowed_lower_word_counts = (windowed_words
| "to lower case" >> beam.Map(lambda word: word.lower())
| "count lowered" >> beam.combiners.Count.PerElement())
Il metodo collect()
fornisce l'output in un DataFrame Pandas.
ib.collect(windowed_lower_word_counts, include_window_info=True)
6. (Facoltativo) Avvio dei job Dataflow dal blocco note
- Per eseguire i job su Dataflow, hai bisogno di autorizzazioni aggiuntive. Assicurati che l'account di servizio predefinito di Compute Engine abbia il ruolo Editor oppure concedi i seguenti ruoli IAM:
- Dataflow Admin
- Dataflow Worker
- Amministratore Storage e
- Utente account di servizio (roles/iam.serviceAccountUser)
Scopri di più sui ruoli nella documentazione.
- (Facoltativo) Prima di utilizzare il blocco note per eseguire job Dataflow, riavvia il kernel, esegui nuovamente tutte le celle e verifica l'output.
- Rimuovi le seguenti istruzioni di importazione:
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
import apache_beam.runners.interactive.interactive_beam as ib
- Aggiungi la seguente istruzione di importazione:
from apache_beam.runners import DataflowRunner
- Rimuovi la seguente opzione relativa alla durata della registrazione:
ib.options.recording_duration = '60s'
- Aggiungi quanto segue alle opzioni della pipeline. Dovrai modificare la località di Cloud Storage in modo che punti a un bucket che già possiedi oppure puoi creare un nuovo bucket a questo scopo. Puoi anche modificare il valore della regione da
us-central1
.
# Set the Google Cloud region to run Dataflow.
options.view_as(GoogleCloudOptions).region = 'us-central1'
# Choose a Cloud Storage location.
dataflow_gcs_location = 'gs://<change me>/dataflow'
# Choose a location for your output files
output_gcs_location = '%s/results/' % dataflow_gcs_location
# Set the staging location. This location is used to stage the
# Dataflow pipeline and SDK binary.
options.view_as(GoogleCloudOptions).staging_location = '%s/staging' % dataflow_gcs_location
# Set the temporary location. This location is used to store temporary files
# or intermediate results before outputting to the sink.
options.view_as(GoogleCloudOptions).temp_location = '%s/temp' % dataflow_gcs_location
- Nel costruttore di
beam.Pipeline()
, sostituisciInteractiveRunner
conDataflowRunner
.p
è l'oggetto pipeline di creazione della pipeline.
p = beam.Pipeline(DataflowRunner(), options=options)
- Rimuovi le chiamate interattive dal codice. Ad esempio, rimuovi
show()
,collect()
,head()
,show_graph()
ewatch()
dal codice. - Per poter visualizzare i risultati, dovrai aggiungere un sink. Nella sezione precedente abbiamo visualizzato i risultati nel blocco note, ma questa volta stiamo eseguendo il job al di fuori di questo blocco note, in Dataflow. Di conseguenza, abbiamo bisogno di una posizione esterna per i nostri risultati. In questo esempio, scriveremo i risultati in file di testo in GCS (Google Cloud Storage). Poiché si tratta di una pipeline in modalità flusso, con windowing dei dati, vogliamo creare un file di testo per finestra. A questo scopo, aggiungi i seguenti passaggi alla pipeline:
result = (windowed_lower_word_counts
| "decode words and format" >> beam.Map(lambda x: f"{x[0].decode('utf-8')}: {x[1]}")
| "write events to file" >> beam.io.fileio.WriteToFiles(path=output_gcs_location, shards=1, max_writers_per_bundle=0))
- Aggiungi
p.run()
alla fine del codice della pipeline. - Ora rivedi il codice del blocco note per verificare di aver incorporato tutte le modifiche. Dovrebbe avere il seguente aspetto:
import apache_beam as beam
from apache_beam.options import pipeline_options
from apache_beam.options.pipeline_options import GoogleCloudOptions
import google.auth
from apache_beam.runners import DataflowRunner
# Setting up the Apache Beam pipeline options.
options = pipeline_options.PipelineOptions()
# Set the pipeline mode to stream the data from Pub/Sub.
options.view_as(pipeline_options.StandardOptions).streaming = True
# Sets the project to the default project in your current Google Cloud environment.
# The project will be used for creating a subscription to the Pub/Sub topic.
_, options.view_as(GoogleCloudOptions).project = google.auth.default()
# Set the Google Cloud region to run Dataflow.
options.view_as(GoogleCloudOptions).region = 'us-central1'
# Choose a Cloud Storage location.
dataflow_gcs_location = 'gs://<change me>/dataflow'
# Choose a location for your output files
output_gcs_location = '%s/results/' % dataflow_gcs_location
# Set the staging location. This location is used to stage the
# Dataflow pipeline and SDK binary.
options.view_as(GoogleCloudOptions).staging_location = '%s/staging' % dataflow_gcs_location
# Set the temporary location. This location is used to store temporary files
# or intermediate results before outputting to the sink.
options.view_as(GoogleCloudOptions).temp_location = '%s/temp' % dataflow_gcs_location
# The Google Cloud PubSub topic for this example.
topic = "projects/pubsub-public-data/topics/shakespeare-kinglear"
p = beam.Pipeline(DataflowRunner(), options=options)
words = p | "read" >> beam.io.ReadFromPubSub(topic=topic)
windowed_words = (words
| "window" >> beam.WindowInto(beam.window.FixedWindows(10)))
windowed_words_counts = (windowed_words
| "count" >> beam.combiners.Count.PerElement())
windowed_lower_word_counts = (windowed_words
| "to lower case" >> beam.Map(lambda word: word.lower())
| "count lowered" >> beam.combiners.Count.PerElement())
result = (windowed_lower_word_counts
| "decode words and format" >> beam.Map(lambda x: f"{x[0].decode('utf-8')}: {x[1]}")
| "write events to file" >> beam.io.fileio.WriteToFiles(path=output_gcs_location, shards=1, max_writers_per_bundle=0))
p.run()
- Esegui le celle.
- Dovresti vedere un output simile al seguente:
<DataflowPipelineResult <Job
clientRequestId: '20230623100011457336-8998'
createTime: '2023-06-23T10:00:33.447347Z'
currentStateTime: '1970-01-01T00:00:00Z'
id: '2023-06-23_03_00_33-11346237320103246437'
location: 'us-central1'
name: 'beamapp-root-0623075553-503897-boh4u4wb'
projectId: 'your-project-id'
stageStates: []
startTime: '2023-06-23T10:00:33.447347Z'
steps: []
tempFiles: []
type: TypeValueValuesEnum(JOB_TYPE_STREAMING, 2)> at 0x7fc7e4084d60>
- Per verificare se il job è in esecuzione, vai alla pagina Job per Dataflow. Dovresti vedere un nuovo job nell'elenco. L'avvio dell'elaborazione dei dati del job impiegherà circa 5-10 minuti.
- Al termine dell'elaborazione dei dati, vai a Cloud Storage e vai alla directory in cui Dataflow memorizza i risultati (il
output_gcs_location
che hai definito). Dovresti vedere un elenco di file di testo, con un file per finestra. - Scarica il file e controlla i contenuti. Deve contenere l'elenco delle parole abbinate al relativo conteggio. In alternativa, utilizza l'interfaccia a riga di comando per ispezionare i file. Puoi farlo eseguendo il comando seguente in una nuova cella del blocco note:
! gsutils cat gs://<your-bucket-here>/results/<file-name>
- Verrà visualizzato un output simile a questo:
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
- È tutto. Non dimenticare di eseguire la pulizia e di arrestare il job che hai creato (vedi il passaggio finale di questo codelab).
Per un esempio su come eseguire questa conversione in un blocco note interattivo, vedi il blocco note Conteggio parole di Dataflow nella tua istanza del blocco note.
In alternativa, puoi esportare il blocco note come script eseguibile, modificare il file .py generato seguendo i passaggi precedenti, quindi eseguire il deployment della pipeline nel servizio Dataflow.
7. Salvataggio del blocco note in corso...
Notebooks che crei vengono salvati localmente nell'istanza del blocco note in esecuzione. Se reimposti o arresti l'istanza del blocco note durante lo sviluppo, i nuovi blocchi note saranno mantenuti a condizione che vengano creati nella directory /home/jupyter
. Tuttavia, se viene eliminata un'istanza di blocco note, vengono eliminati anche i relativi blocchi note.
Per conservare i blocchi note per l'uso futuro, scaricali localmente sulla workstation, salvali su GitHub o esportali in un formato file diverso.
8. esegui la pulizia
Dopo aver completato l'utilizzo dell'istanza del blocco note Apache Beam, esegui la pulizia delle risorse che hai creato su Google Cloud arrestando l'istanza del blocco note e interrompendo il job di flusso, se ne hai eseguito uno.
In alternativa, se hai creato un progetto ai soli fini di questo codelab, puoi anche chiuderlo completamente.