1. Panoramica - Google Dataproc
Dataproc è un servizio completamente gestito e a scalabilità elevata per l'esecuzione di Apache Spark, Apache Flink, Presto e molti altri strumenti e framework open source. Utilizza Dataproc per la modernizzazione dei data lake, l'ETL / ELT e la data science sicura su scala planetaria. Dataproc è anche completamente integrato con diversi servizi Google Cloud, tra cui BigQuery, Cloud Storage, Vertex AI e Dataplex.
Dataproc è disponibile in tre versioni:
- Dataproc Serverless consente di eseguire job PySpark senza dover configurare l'infrastruttura e la scalabilità automatica. Dataproc Serverless supporta le sessioni e i carichi di lavoro batch PySpark (notebook).
- Dataproc su Google Compute Engine consente di gestire un cluster Hadoop YARN per carichi di lavoro Spark basati su YARN, oltre a strumenti open source come Flink e Presto. Puoi personalizzare i cluster basati su cloud con la scalabilità verticale o orizzontale che preferisci, inclusa la scalabilità automatica.
- Dataproc su Google Kubernetes Engine consente di configurare cluster virtuali Dataproc nella tua infrastruttura GKE per l'invio di job Spark, PySpark, SparkR o Spark SQL.
In questo codelab imparerai diversi modi per utilizzare Dataproc Serverless.
Apache Spark è stato creato in origine per essere eseguito su cluster Hadoop e ha utilizzato YARN come gestore di risorse. La manutenzione dei cluster Hadoop richiede un insieme specifico di competenze e la configurazione corretta di molti parametri diversi sui cluster. Questo si aggiunge a un insieme separato di parametri che anche Spark richiede all'utente di impostare. Ciò porta a molti scenari in cui gli sviluppatori dedicano più tempo alla configurazione dell'infrastruttura anziché a lavorare sul codice Spark stesso.
Dataproc Serverless elimina la necessità di configurare manualmente i cluster Hadoop o Spark. Dataproc Serverless non viene eseguito su Hadoop e utilizza la propria allocazione dinamica delle risorse per determinare i requisiti delle risorse, inclusa la scalabilità automatica. Un piccolo sottoinsieme di proprietà Spark è ancora personalizzabile con Dataproc Serverless, ma nella maggior parte dei casi non sarà necessario modificarle.
2. Configura
Inizia configurando l'ambiente e le risorse utilizzate in questo codelab.
Crea un progetto Google Cloud. Puoi utilizzarne uno esistente.
Apri Cloud Shell facendo clic sull'icona nella barra degli strumenti della console Cloud.

Cloud Shell fornisce un ambiente shell pronto all'uso che puoi utilizzare per questo codelab.

Cloud Shell imposta il nome del progetto per impostazione predefinita. Verifica eseguendo echo $GOOGLE_CLOUD_PROJECT. Se non vedi l'ID progetto nell'output, impostalo.
export GOOGLE_CLOUD_PROJECT=<your-project-id>
Imposta una regione di Compute Engine per le tue risorse, ad esempio us-central1 o europe-west2.
export REGION=<your-region>
Abilita API
Il codelab utilizza le seguenti API:
- BigQuery
- Dataproc
Abilita le API necessarie. L'operazione richiede circa un minuto e, al termine, viene visualizzato un messaggio di conferma.
gcloud services enable bigquery.googleapis.com gcloud services enable dataproc.googleapis.com
Configura l'accesso alla rete
Dataproc Serverless richiede l'abilitazione dell'accesso privato Google nella regione in cui verranno eseguiti i job Spark, poiché i driver e gli esecutori Spark hanno solo IP privati. Esegui il comando seguente per abilitarlo nella subnet default.
gcloud compute networks subnets update default \
--region=${REGION} \
--enable-private-ip-google-access
Puoi verificare che l'accesso privato Google sia abilitato tramite il comando seguente, che restituirà True o False.
gcloud compute networks subnets describe default \
--region=${REGION} \
--format="get(privateIpGoogleAccess)"
Crea un bucket di archiviazione
Crea un bucket di archiviazione che verrà utilizzato per archiviare gli asset creati in questo codelab.
Scegli un nome per il bucket. I nomi dei bucket devono essere univoci a livello globale per tutti gli utenti.
export BUCKET=<your-bucket-name>
Crea il bucket nella regione in cui intendi eseguire i job Spark.
gsutil mb -l ${REGION} gs://${BUCKET}
Puoi vedere che il bucket è disponibile nella console Cloud Storage console. Puoi anche eseguire gsutil ls per visualizzare il bucket.
Crea un server di cronologia permanente
L'interfaccia utente di Spark fornisce un ricco set di strumenti di debug e insight sui job Spark. Per visualizzare l'interfaccia utente di Spark per i job Dataproc Serverless completati, devi creare un cluster Dataproc a nodo singolo da utilizzare come server di cronologia permanente.
Imposta un nome per il server di cronologia permanente.
PHS_CLUSTER_NAME=my-phs
Esegui il comando seguente.
gcloud dataproc clusters create ${PHS_CLUSTER_NAME} \
--region=${REGION} \
--single-node \
--enable-component-gateway \
--properties=spark:spark.history.fs.logDirectory=gs://${BUCKET}/phs/*/spark-job-history
L'interfaccia utente di Spark e il server di cronologia permanente verranno esplorati in modo più dettagliato più avanti nel codelab.
3. Esegui job Spark serverless con batch Dataproc
In questo esempio, utilizzerai un set di dati del set di dati pubblico Citi Bike Trips di New York (NYC). NYC Citi Bikes è un sistema di bike sharing a pagamento all'interno di NYC. Eseguirai alcune semplici trasformazioni e stamperai i primi dieci ID delle stazioni Citi Bike più utilizzati. Questo esempio utilizza anche il connettore open source spark-bigquery-connector per leggere e scrivere dati senza problemi tra Spark e BigQuery.
Clona il seguente repository GitHub e cd nella directory contenente il file citibike.py.
git clone https://github.com/GoogleCloudPlatform/devrel-demos.git cd devrel-demos/data-analytics/next-2022-workshop/dataproc-serverless
citibike.py
import sys
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import BooleanType
if len(sys.argv) == 1:
print("Please provide a GCS bucket name.")
bucket = sys.argv[1]
table = "bigquery-public-data:new_york_citibike.citibike_trips"
spark = SparkSession.builder \
.appName("pyspark-example") \
.config("spark.jars","gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jar") \
.getOrCreate()
df = spark.read.format("bigquery").load(table)
top_ten = df.filter(col("start_station_id") \
.isNotNull()) \
.groupBy("start_station_id") \
.count() \
.orderBy("count", ascending=False) \
.limit(10) \
.cache()
top_ten.show()
top_ten.write.option("header", True).csv(f"gs://{bucket}/citibikes_top_ten_start_station_ids")
Invia il job a Spark serverless utilizzando il Cloud SDK, disponibile in Cloud Shell per impostazione predefinita. Esegui il comando seguente nella shell, che utilizza Cloud SDK e l'API Dataproc Batches per inviare job Spark serverless.
gcloud dataproc batches submit pyspark citibike.py \
--batch=citibike-job \
--region=${REGION} \
--deps-bucket=gs://${BUCKET} \
--jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jar \
--history-server-cluster=projects/${GOOGLE_CLOUD_PROJECT}/regions/${REGION}/clusters/${PHS_CLUSTER_NAME} \
-- ${BUCKET}
Per analizzare questo comando:
gcloud dataproc batches submitfa riferimento all'API Dataproc Batches.pysparkindica che stai inviando un job PySpark.--batchè il nome del job. Se non viene fornito, verrà utilizzato un UUID generato casualmente.--region=${REGION}è la regione geografica in cui verrà elaborato il job.--deps-bucket=${BUCKET}è la posizione in cui viene caricato il file Python locale prima dell'esecuzione nell'ambiente serverless.--jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jarinclude il file JAR per il connettore spark-bigquery-connector nell'ambiente di runtime Spark.--history-server-cluster=projects/${GOOGLE_CLOUD_PROJECT}/regions/${REGION}/clusters/${PHS_CLUSTER}è il nome completo del server di cronologia permanente. Qui vengono archiviati i dati sugli eventi di Spark (separati dall'output della console) e sono visualizzabili dall'interfaccia utente di Spark.- Il
--finale indica che tutto ciò che segue saranno argomenti di runtime per il programma. In questo caso, stai inviando il nome del bucket, come richiesto dal job.
Quando il batch viene inviato, viene visualizzato il seguente output.
Batch [citibike-job] submitted.
Dopo un paio di minuti, verrà visualizzato il seguente output insieme ai metadati del job.
+----------------+------+ |start_station_id| count| +----------------+------+ | 519|551078| | 497|423334| | 435|403795| | 426|384116| | 293|372255| | 402|367194| | 285|344546| | 490|330378| | 151|318700| | 477|311403| +----------------+------+ Batch [citibike-job] finished.
Nella prossima sezione, imparerai a individuare i log di questo job.
Altre funzionalità
Con Spark serverless, hai altre opzioni per l'esecuzione dei job.
- Puoi creare un'immagine Docker personalizzata su cui viene eseguito il job. Questo è un ottimo modo per includere ulteriori dipendenze, tra cui le librerie Python e R.
- Puoi connettere un'istanza Dataproc Metastore al job per accedere ai metadati Hive.
- Per un maggiore controllo, Dataproc Serverless supporta la configurazione di un piccolo insieme di proprietà Spark.
4. Metriche e osservabilità di Dataproc
La console Batch di Dataproc elenca tutti i job Dataproc Serverless. Nella console, vedrai ID batch, Posizione, Stato, Ora di creazione, Tempo trascorso e Tipo di ogni job. Fai clic su ID batch del job per visualizzare ulteriori informazioni.
In questa pagina vedrai informazioni come Monitoraggio, che mostrano quanti esecutori Spark batch sono stati utilizzati nel tempo dal tuo job (indicando la quantità di scalabilità automatica).
Nella scheda Dettagli vedrai altri metadati sul job, inclusi gli argomenti e i parametri inviati con il job.
Puoi anche accedere a tutti i log da questa pagina. Quando vengono eseguiti i job Dataproc Serverless, vengono generati tre diversi set di log:
- A livello di servizio
- Output console
- Logging degli eventi Spark
A livello di servizio, include i log generati dal servizio Dataproc Serverless. Questi includono elementi come Dataproc Serverless che richiede CPU aggiuntive per la scalabilità automatica. Puoi visualizzarli facendo clic su Visualizza log , che aprirà Cloud Logging.
L'output della console può essere visualizzato in Output. Questo è l'output generato dal job, inclusi i metadati stampati da Spark all'inizio di un job o le istruzioni di stampa incorporate nel job.
Il logging degli eventi Spark è accessibile dall'interfaccia utente di Spark. Poiché hai fornito al job Spark un server di cronologia permanente, puoi accedere all'interfaccia utente di Spark facendo clic su Visualizza server di cronologia Spark, che contiene informazioni sui job Spark eseguiti in precedenza. Puoi scoprire di più sull'interfaccia utente di Spark nella documentazione ufficiale di Spark.
5. Modelli Dataproc: BQ -> GCS
I modelli Dataproc sono strumenti open source che semplificano ulteriormente le attività di elaborazione dei dati nel cloud. Fungono da wrapper per Dataproc Serverless e includono modelli per molte attività di importazione ed esportazione dei dati, tra cui:
BigQuerytoGCSeGCStoBigQueryGCStoBigTableGCStoJDBCeJDBCtoGCSHivetoBigQueryMongotoGCSeGCStoMongo
L'elenco completo è disponibile nel file README.
In questa sezione utilizzerai i modelli Dataproc per esportare i dati da BigQuery a GCS.
Clona il repository
Clona il repository e passa alla cartella python.
git clone https://github.com/GoogleCloudPlatform/dataproc-templates.git cd dataproc-templates/python
Configura l'ambiente
Ora imposterai le variabili di ambiente. I modelli Dataproc utilizzano la variabile di ambiente GCP_PROJECT per l'ID progetto, quindi impostala su GOOGLE_CLOUD_PROJECT..
export GCP_PROJECT=${GOOGLE_CLOUD_PROJECT}
La regione dovrebbe essere impostata nell'ambiente in precedenza. In caso contrario, impostala qui.
export REGION=<region>
I modelli Dataproc utilizzano il connettore spark-bigquery-connector per l'elaborazione dei job BigQuery e richiedono che l'URI sia incluso in una variabile di ambiente JARS. Imposta la variabile JARS.
export JARS="gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jar"
Configura i parametri del modello
Imposta il nome di un bucket di gestione temporanea da utilizzare per il servizio.
export GCS_STAGING_LOCATION=gs://${BUCKET}
Poi imposterai alcune variabili specifiche del job. Per la tabella di input, farai di nuovo riferimento al set di dati BigQuery NYC Citibike.
BIGQUERY_GCS_INPUT_TABLE=bigquery-public-data.new_york_citibike.citibike_trips
Puoi scegliere tra csv, parquet, avro o json. Per questo codelab, scegli CSV. Nella prossima sezione vedrai come utilizzare i modelli Dataproc per convertire i tipi di file.
BIGQUERY_GCS_OUTPUT_FORMAT=csv
Imposta la modalità di output su overwrite. Puoi scegliere tra overwrite, append, ignore o errorifexists.
BIGQUERY_GCS_OUTPUT_MODE=overwrite
Imposta la posizione di output GCS su un percorso nel bucket.
BIGQUERY_GCS_OUTPUT_LOCATION=gs://${BUCKET}/BQtoGCS
Esegui il modello
Esegui il modello BIGQUERYTOGCS specificandolo di seguito e fornendo i parametri di input impostati.
./bin/start.sh \
-- --template=BIGQUERYTOGCS \
--bigquery.gcs.input.table=${BIGQUERY_GCS_INPUT_TABLE} \
--bigquery.gcs.output.format=${BIGQUERY_GCS_OUTPUT_FORMAT} \
--bigquery.gcs.output.mode=${BIGQUERY_GCS_OUTPUT_MODE} \
--bigquery.gcs.output.location=${BIGQUERY_GCS_OUTPUT_LOCATION}
L'output sarà piuttosto rumoroso, ma dopo circa un minuto vedrai quanto segue.
Batch [5766411d6c78444cb5e80f305308d8f8] submitted. ... Batch [5766411d6c78444cb5e80f305308d8f8] finished.
Puoi verificare che i file siano stati generati eseguendo il comando seguente.
gsutil ls ${BIGQUERY_GCS_OUTPUT_LOCATION}
Per impostazione predefinita, Spark scrive in più file, a seconda della quantità di dati. In questo caso, vedrai circa 30 file generati. I nomi dei file di output di Spark sono formattati con part- seguito da un numero di cinque cifre (che indica il numero di parte) e da una stringa hash. Per grandi quantità di dati, Spark in genere scrive in più file. Un esempio di nome file è part-00000-cbf69737-867d-41cc-8a33-6521a725f7a0-c000.csv.
6. Modelli Dataproc: CSV in Parquet
Ora utilizzerai i modelli Dataproc per convertire i dati in GCS da un tipo di file a un altro utilizzando il GCSTOGCS. Questo modello utilizza SparkSQL e offre la possibilità di inviare anche una query SparkSQL da elaborare durante la trasformazione per un'ulteriore elaborazione.
Conferma le variabili di ambiente
Verifica che GCP_PROJECT, REGION e GCS_STAGING_BUCKET siano impostati nella sezione precedente.
echo ${GCP_PROJECT}
echo ${REGION}
echo ${GCS_STAGING_LOCATION}
Imposta parametri del modello
Ora imposterai i parametri di configurazione per GCStoGCS. Inizia con la posizione dei file di input. Tieni presente che si tratta di una directory e non di un file specifico, poiché verranno elaborati tutti i file nella directory. Imposta questo valore su BIGQUERY_GCS_OUTPUT_LOCATION.
GCS_TO_GCS_INPUT_LOCATION=${BIGQUERY_GCS_OUTPUT_LOCATION}
Imposta il formato del file di input.
GCS_TO_GCS_INPUT_FORMAT=csv
Imposta il formato di output desiderato. Puoi scegliere tra Parquet, JSON, Avro o CSV.
GCS_TO_GCS_OUTPUT_FORMAT=parquet
Imposta la modalità di output su overwrite. Puoi scegliere tra overwrite, append, ignore o errorifexists.
GCS_TO_GCS_OUTPUT_MODE=overwrite
Imposta la posizione di output.
GCS_TO_GCS_OUTPUT_LOCATION=gs://${BUCKET}/GCStoGCS
Esegui il modello
Esegui il modello GCStoGCS.
./bin/start.sh \
-- --template=GCSTOGCS \
--gcs.to.gcs.input.location=${GCS_TO_GCS_INPUT_LOCATION} \
--gcs.to.gcs.input.format=${GCS_TO_GCS_INPUT_FORMAT} \
--gcs.to.gcs.output.format=${GCS_TO_GCS_OUTPUT_FORMAT} \
--gcs.to.gcs.output.mode=${GCS_TO_GCS_OUTPUT_MODE} \
--gcs.to.gcs.output.location=${GCS_TO_GCS_OUTPUT_LOCATION}
L'output sarà piuttosto rumoroso, ma dopo circa un minuto dovresti visualizzare un messaggio di conferma simile al seguente.
Batch [c198787ba8e94abc87e2a0778c05ec8a] submitted. ... Batch [c198787ba8e94abc87e2a0778c05ec8a] finished.
Puoi verificare che i file siano stati generati eseguendo il comando seguente.
gsutil ls ${GCS_TO_GCS_OUTPUT_LOCATION}
Con questo modello, hai anche la possibilità di fornire query SparkSQL passando gcs.to.gcs.temp.view.name e gcs.to.gcs.sql.query al modello, consentendo l'esecuzione di una query SparkSQL sui dati prima della scrittura in GCS.
7. Libera spazio
Per evitare addebiti non necessari sul tuo account GCP al termine di questo codelab:
- Elimina il bucket Cloud Storage per l'ambiente che hai creato.
gsutil rm -r gs://${BUCKET}
- Elimina il cluster Dataproc utilizzato per il server di cronologia permanente.
gcloud dataproc clusters delete ${PHS_CLUSTER_NAME} \
--region=${REGION}
- Elimina i job Dataproc Serverless. Vai alla console Batch, fai clic sulla casella accanto a ogni job che vuoi eliminare e fai clic su ELIMINA.
Se hai creato un progetto solo per questo codelab, puoi anche eliminare il progetto (facoltativo):
- Nella console di GCP, vai alla pagina Progetti.
- Nell'elenco dei progetti, seleziona quello da eliminare e fai clic su Elimina.
- Nella casella, digita l'ID progetto, quindi fai clic su Chiudi per eliminare il progetto.
8. Passaggi successivi
Le seguenti risorse forniscono altri modi per sfruttare Spark serverless:
- Scopri come orchestrare i flussi di lavoro Dataproc Serverless utilizzando Cloud Composer.
- Scopri come integrare Dataproc Serverless con le pipeline Kubeflow.