1. Panoramica - Google Dataproc
Dataproc è un servizio completamente gestito e a scalabilità elevata per l'esecuzione di Apache Spark, Apache Flink, Presto e molti altri strumenti e framework open source. Utilizza Dataproc per la modernizzazione dei data lake, ETL / ELT e data science sicura su scala globale. Dataproc è inoltre completamente integrato con diversi servizi Google Cloud tra cui BigQuery, Cloud Storage, Vertex AI e Dataplex.
Dataproc è disponibile in tre versioni:
- Dataproc Serverless ti consente di eseguire job PySpark senza dover configurare l'infrastruttura e la scalabilità automatica. Dataproc Serverless supporta carichi di lavoro batch e sessioni / blocchi note di PySpark.
- Dataproc su Google Compute Engine consente di gestire un cluster Hadoop YARN per carichi di lavoro Spark basati su YARN, oltre a strumenti open source come Flink e Presto. Puoi personalizzare i cluster basati su cloud con la scalabilità verticale o orizzontale che preferisci, inclusa la scalabilità automatica.
- Dataproc su Google Kubernetes Engine ti consente di configurare i cluster virtuali Dataproc nella tua infrastruttura GKE per l'invio di job Spark, PySpark, SparkR o Spark SQL.
In questo codelab imparerai diversi modi per utilizzare Dataproc Serverless.
Apache Spark è stato originariamente creato per essere eseguito su cluster Hadoop e ha utilizzato YARN come gestore delle risorse. La gestione dei cluster Hadoop richiede una serie specifica di competenze e la garanzia che molte manopole diverse sui cluster siano configurate correttamente. Ciò si aggiunge a un set separato di manopole che Spark richiede all'utente di impostare. Questo porta a molti scenari in cui gli sviluppatori dedicano più tempo alla configurazione dell'infrastruttura invece di lavorare sul codice Spark stesso.
Dataproc Serverless elimina la necessità di configurare manualmente cluster Hadoop o Spark. Dataproc Serverless non viene eseguito su Hadoop e utilizza la propria allocazione delle risorse dinamiche per determinare i requisiti delle risorse, inclusa la scalabilità automatica. Un piccolo sottoinsieme di proprietà Spark è ancora personalizzabile con Dataproc Serverless, ma nella maggior parte delle istanze non è necessario modificarle.
2. Configura
Inizierai configurando l'ambiente e le risorse utilizzati in questo codelab.
Crea un progetto Google Cloud. Puoi utilizzarne uno esistente.
Fai clic su Cloud Shell nella barra degli strumenti della console Cloud.
Cloud Shell fornisce un ambiente Shell pronto all'uso che puoi usare per questo codelab.
Cloud Shell imposterà il nome del progetto per impostazione predefinita. Ricontrolla eseguendo echo $GOOGLE_CLOUD_PROJECT
. Se non vedi il tuo ID progetto nell'output, impostalo.
export GOOGLE_CLOUD_PROJECT=<your-project-id>
Imposta una regione di Compute Engine per le tue risorse, ad esempio us-central1
o europe-west2
.
export REGION=<your-region>
Abilita API
Il codelab utilizza le seguenti API:
- BigQuery
- Dataproc
Abilita le API necessarie. L'operazione richiederà circa un minuto. Al termine verrà visualizzato un messaggio di operazione riuscita.
gcloud services enable bigquery.googleapis.com gcloud services enable dataproc.googleapis.com
Configurare l'accesso alla rete
Dataproc Serverless richiede l'abilitazione dell'accesso privato Google nella regione in cui verranno eseguiti i job Spark, poiché i driver e gli esecutori Spark hanno solo IP privati. Esegui questo comando per abilitarlo nella subnet default
.
gcloud compute networks subnets update default \ --region=${REGION} \ --enable-private-ip-google-access
Puoi verificare che l'accesso privato Google sia attivato nel seguente modo, che restituirà True
o False
.
gcloud compute networks subnets describe default \ --region=${REGION} \ --format="get(privateIpGoogleAccess)"
Crea un bucket di archiviazione
Crea un bucket di archiviazione che verrà utilizzato per archiviare gli asset creati in questo codelab.
Scegli un nome per il bucket. I nomi dei bucket devono essere univoci a livello globale per tutti gli utenti.
export BUCKET=<your-bucket-name>
Crea il bucket nella regione in cui intendi eseguire i job Spark.
gsutil mb -l ${REGION} gs://${BUCKET}
Puoi verificare che il bucket è disponibile nella console di Cloud Storage. Puoi anche eseguire gsutil ls
per visualizzare il bucket.
Crea un server di cronologia permanente
L'UI di Spark fornisce un ricco set di strumenti di debug e informazioni sui job Spark. Per visualizzare l'interfaccia utente di Spark per i job serverless Dataproc completati, devi creare un cluster Dataproc a nodo singolo da utilizzare come server di cronologia permanente.
Imposta un nome per il server di cronologia permanente.
PHS_CLUSTER_NAME=my-phs
Esegui il comando seguente.
gcloud dataproc clusters create ${PHS_CLUSTER_NAME} \ --region=${REGION} \ --single-node \ --enable-component-gateway \ --properties=spark:spark.history.fs.logDirectory=gs://${BUCKET}/phs/*/spark-job-history
La UI di Spark e il server di cronologia permanente verranno esplorati in modo più dettagliato più avanti nel codelab.
3. Esecuzione di job Spark serverless con batch Dataproc
In questo esempio lavorerai con un set di dati del set di dati pubblico Citi Bike Trips di New York (NYC). NYC Citi bikes è un sistema di bike sharing a pagamento all'interno di New York. Eseguirai alcune semplici trasformazioni e stamperai i dieci ID delle stazioni Citi bike più popolari. Inoltre, in questo esempio viene utilizzato in modo particolare il connettore open source spark-bigquery-connector per leggere e scrivere dati senza problemi tra Spark e BigQuery.
Clona il seguente repository GitHub e cd
nella directory contenente il file citibike.py
.
git clone https://github.com/GoogleCloudPlatform/devrel-demos.git cd devrel-demos/data-analytics/next-2022-workshop/dataproc-serverless
citibike.py
import sys
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import BooleanType
if len(sys.argv) == 1:
print("Please provide a GCS bucket name.")
bucket = sys.argv[1]
table = "bigquery-public-data:new_york_citibike.citibike_trips"
spark = SparkSession.builder \
.appName("pyspark-example") \
.config("spark.jars","gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jar") \
.getOrCreate()
df = spark.read.format("bigquery").load(table)
top_ten = df.filter(col("start_station_id") \
.isNotNull()) \
.groupBy("start_station_id") \
.count() \
.orderBy("count", ascending=False) \
.limit(10) \
.cache()
top_ten.show()
top_ten.write.option("header", True).csv(f"gs://{bucket}/citibikes_top_ten_start_station_ids")
Invia il job a Spark serverless utilizzando Cloud SDK, disponibile per impostazione predefinita in Cloud Shell. Esegui il comando seguente nella shell, che utilizza Cloud SDK e l'API Dataproc Batches per inviare job Spark serverless.
gcloud dataproc batches submit pyspark citibike.py \ --batch=citibike-job \ --region=${REGION} \ --deps-bucket=gs://${BUCKET} \ --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jar \ --history-server-cluster=projects/${GOOGLE_CLOUD_PROJECT}/regions/${REGION}/clusters/${PHS_CLUSTER_NAME} \ -- ${BUCKET}
Per suddividerlo:
gcloud dataproc batches submit
fa riferimento all'API Dataproc Batches.pyspark
indica che stai inviando un job PySpark.--batch
è il nome del job. Se non viene specificato, verrà utilizzato un UUID generato in modo casuale.--region=${REGION}
è la regione geografica in cui verrà elaborato il job.--deps-bucket=${BUCKET}
è il luogo in cui viene caricato il file Python locale prima dell'esecuzione nell'ambiente serverless.--jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jar
include il file jar per spark-bigquery-connector nell'ambiente di runtime Spark.--history-server-cluster=projects/${GOOGLE_CLOUD_PROJECT}/regions/${REGION}/clusters/${PHS_CLUSTER}
è il nome completo del server di cronologia permanente. Qui è dove vengono archiviati i dati sugli eventi Spark (separati dall'output della console) e sono visualizzabili dalla UI Spark.- L'elemento
--
finale indica che tutto il resto sarà argomenti di runtime per il programma. In questo caso, stai inviando il nome del bucket, come richiesto dal job.
Quando il batch viene inviato, viene visualizzato il seguente output.
Batch [citibike-job] submitted.
Dopo un paio di minuti, vedrai il seguente output insieme ai metadati del job.
+----------------+------+ |start_station_id| count| +----------------+------+ | 519|551078| | 497|423334| | 435|403795| | 426|384116| | 293|372255| | 402|367194| | 285|344546| | 490|330378| | 151|318700| | 477|311403| +----------------+------+ Batch [citibike-job] finished.
Nella sezione successiva, imparerai come individuare i log per questo job.
Altre funzionalità
Con Spark Serverless, hai a disposizione ulteriori opzioni per l'esecuzione dei tuoi job.
- Puoi creare un'immagine Docker personalizzata su cui viene eseguito il job. Questo è un ottimo modo per includere dipendenze aggiuntive, tra cui le librerie Python e R.
- Puoi connettere un'istanza Dataproc Metastore al tuo job per accedere ai metadati Hive.
- Per un maggiore controllo, Dataproc Serverless supporta la configurazione di un piccolo insieme di proprietà Spark.
4. Metriche e osservabilità di Dataproc
La console Dataproc Batches elenca tutti i tuoi job Dataproc serverless. Nella console, vedrai ID batch, Posizione, Stato, Ora di creazione, Tempo trascorso e Tipo di ogni job. Fai clic sull'ID batch del job per visualizzare ulteriori informazioni al riguardo.
In questa pagina, troverai informazioni come Monitoring, che mostrano quanti esecutori Spark batch sono stati utilizzati dal job nel tempo (indicando il livello di scalabilità automatica).
Nella scheda Dettagli saranno visualizzati più metadati sul job, inclusi eventuali argomenti e parametri inviati con il job.
Da questa pagina puoi anche accedere a tutti i log. Quando vengono eseguiti i job serverless Dataproc, vengono generati tre diversi set di log:
- Livello di servizio
- Output console
- Logging eventi Spark
A livello di servizio, include i log generati dal servizio serverless Dataproc. Sono inclusi elementi come Dataproc Serverless che richiede CPU aggiuntive per la scalabilità automatica. Per visualizzarli, fai clic su Visualizza log per aprire Cloud Logging.
Puoi visualizzare l'output della console in Output. Si tratta dell'output generato dal job, inclusi i metadati stampati da Spark all'avvio di un processo o eventuali istruzioni di stampa incorporate nel processo.
Il logging eventi di Spark è accessibile dalla UI di Spark. Poiché hai fornito al job Spark un server di cronologia permanente, puoi accedere all'interfaccia utente di Spark facendo clic su Visualizza server di cronologia Spark, che contiene informazioni sui job Spark eseguiti in precedenza. Per ulteriori informazioni sulla UI di Spark, consulta la documentazione ufficiale di Spark.
5. Modelli Dataproc: BQ -> GCS
I modelli Dataproc sono strumenti open source che aiutano a semplificare ulteriormente le attività di elaborazione dei dati nel cloud. Questi fungono da wrapper per Dataproc Serverless e includono modelli per molte attività di importazione ed esportazione di dati, tra cui:
BigQuerytoGCS
eGCStoBigQuery
GCStoBigTable
GCStoJDBC
eJDBCtoGCS
HivetoBigQuery
MongotoGCS
eGCStoMongo
L'elenco completo è disponibile README.
In questa sezione utilizzerai i modelli Dataproc per esportare i dati da BigQuery in GCS.
Clona il repository
Clona il repository e passa alla cartella python
.
git clone https://github.com/GoogleCloudPlatform/dataproc-templates.git cd dataproc-templates/python
configura l'ambiente
Ora imposta le variabili di ambiente. I modelli Dataproc utilizzano la variabile di ambiente GCP_PROJECT
per l'ID progetto, quindi impostala su un valore pari a GOOGLE_CLOUD_PROJECT.
export GCP_PROJECT=${GOOGLE_CLOUD_PROJECT}
La tua regione deve essere impostata nell'ambiente precedente. In caso contrario, impostalo qui.
export REGION=<region>
I modelli Dataproc utilizzano spark-bigquery-conector per elaborare i job BigQuery e richiedono che l'URI sia incluso in una variabile di ambiente JARS
. Imposta la variabile JARS
.
export JARS="gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jar"
Configura i parametri del modello
Imposta il nome di un bucket temporaneo da utilizzare per il servizio.
export GCS_STAGING_LOCATION=gs://${BUCKET}
Poi, imposta alcune variabili specifiche del job. Per la tabella di input, farò nuovamente riferimento al set di dati BigQuery NYC Citibike.
BIGQUERY_GCS_INPUT_TABLE=bigquery-public-data.new_york_citibike.citibike_trips
Puoi scegliere csv
, parquet
, avro
o json
. Per questo codelab, scegli CSV. Nella sezione successiva viene spiegato come utilizzare i modelli Dataproc per convertire tipi di file.
BIGQUERY_GCS_OUTPUT_FORMAT=csv
Imposta la modalità di output su overwrite
. Puoi scegliere tra overwrite
, append
, ignore
o errorifexists.
BIGQUERY_GCS_OUTPUT_MODE=overwrite
Imposta la posizione di output GCS in modo che sia un percorso nel bucket.
BIGQUERY_GCS_OUTPUT_LOCATION=gs://${BUCKET}/BQtoGCS
Esegui il modello
Esegui il modello BIGQUERYTOGCS
specificandolo di seguito e fornendo i parametri di input impostati.
./bin/start.sh \ -- --template=BIGQUERYTOGCS \ --bigquery.gcs.input.table=${BIGQUERY_GCS_INPUT_TABLE} \ --bigquery.gcs.output.format=${BIGQUERY_GCS_OUTPUT_FORMAT} \ --bigquery.gcs.output.mode=${BIGQUERY_GCS_OUTPUT_MODE} \ --bigquery.gcs.output.location=${BIGQUERY_GCS_OUTPUT_LOCATION}
L'output sarà abbastanza rumoroso, ma dopo circa un minuto vedrai quanto segue.
Batch [5766411d6c78444cb5e80f305308d8f8] submitted. ... Batch [5766411d6c78444cb5e80f305308d8f8] finished.
Puoi verificare che i file siano stati generati eseguendo questo comando.
gsutil ls ${BIGQUERY_GCS_OUTPUT_LOCATION}
Per impostazione predefinita, Spark scrive su più file, a seconda della quantità di dati. In questo caso, verranno generati circa 30 file. I nomi dei file di output di Spark sono formattati con part
seguito da un numero a cinque cifre (che indica il numero di parte) e da una stringa hash. Per grandi quantità di dati, Spark in genere scrive su diversi file. Un nome di file di esempio è part-00000-cbf69737-867d-41cc-8a33-6521a725f7a0-c000.csv
.
6. Modelli Dataproc: da CSV a parquet
Ora utilizzerai i modelli Dataproc per convertire i dati in GCS da un tipo di file a un altro utilizzando GCSTOGCS. Questo modello utilizza SparkSQL e offre la possibilità di inviare anche una query SparkSQL da elaborare durante la trasformazione per ulteriori elaborazioni.
Conferma le variabili di ambiente
Verifica che i valori GCP_PROJECT
, REGION
e GCS_STAGING_BUCKET
siano impostati nella sezione precedente.
echo ${GCP_PROJECT} echo ${REGION} echo ${GCS_STAGING_LOCATION}
Imposta parametri del modello
Ora imposta i parametri di configurazione per GCStoGCS
. Inizia con la posizione dei file di input. Tieni presente che questa è una directory e non un file specifico, poiché verranno elaborati tutti i file nella directory. Imposta questo elemento su BIGQUERY_GCS_OUTPUT_LOCATION
.
GCS_TO_GCS_INPUT_LOCATION=${BIGQUERY_GCS_OUTPUT_LOCATION}
Imposta il formato del file di input.
GCS_TO_GCS_INPUT_FORMAT=csv
Imposta il formato di output desiderato. Puoi scegliere parquet, json, avro o csv.
GCS_TO_GCS_OUTPUT_FORMAT=parquet
Imposta la modalità di output su overwrite
. Puoi scegliere tra overwrite
, append
, ignore
o errorifexists.
GCS_TO_GCS_OUTPUT_MODE=overwrite
Imposta il percorso di output.
GCS_TO_GCS_OUTPUT_LOCATION=gs://${BUCKET}/GCStoGCS
Esegui il modello
Esegui il modello GCStoGCS
.
./bin/start.sh \ -- --template=GCSTOGCS \ --gcs.to.gcs.input.location=${GCS_TO_GCS_INPUT_LOCATION} \ --gcs.to.gcs.input.format=${GCS_TO_GCS_INPUT_FORMAT} \ --gcs.to.gcs.output.format=${GCS_TO_GCS_OUTPUT_FORMAT} \ --gcs.to.gcs.output.mode=${GCS_TO_GCS_OUTPUT_MODE} \ --gcs.to.gcs.output.location=${GCS_TO_GCS_OUTPUT_LOCATION}
L'output sarà abbastanza rumoroso, ma dopo circa un minuto dovrebbe essere visualizzato un messaggio di operazione riuscita, come di seguito.
Batch [c198787ba8e94abc87e2a0778c05ec8a] submitted. ... Batch [c198787ba8e94abc87e2a0778c05ec8a] finished.
Puoi verificare che i file siano stati generati eseguendo questo comando.
gsutil ls ${GCS_TO_GCS_OUTPUT_LOCATION}
Con questo modello, hai anche la possibilità di fornire query SparkSQL passando gcs.to.gcs.temp.view.name
e gcs.to.gcs.sql.query
al modello, consentendo l'esecuzione di una query SparkSQL sui dati prima di scrivere in GCS.
7. Esegui la pulizia delle risorse
Per evitare che al tuo account Google Cloud vengano addebitati inutilmente addebiti dopo il completamento di questo codelab:
- Elimina il bucket Cloud Storage per l'ambiente che hai creato.
gsutil rm -r gs://${BUCKET}
- Elimina il cluster Dataproc utilizzato per il server di cronologia permanente.
gcloud dataproc clusters delete ${PHS_CLUSTER_NAME} \ --region=${REGION}
- Eliminare i job serverless Dataproc. Vai alla console batch, fai clic sulla casella accanto a ogni job da eliminare, quindi fai clic su ELIMINA.
Se hai creato un progetto solo per questo codelab, puoi anche eliminarlo:
- Nella console Google Cloud, vai alla pagina Progetti.
- Nell'elenco dei progetti, seleziona il progetto che vuoi eliminare e fai clic su Elimina.
- Nella casella, digita l'ID progetto, quindi fai clic su Chiudi per eliminare il progetto.
8. Passaggi successivi
Le seguenti risorse forniscono ulteriori modi per sfruttare Spark serverless:
- Scopri come orchestrare i flussi di lavoro serverless di Dataproc utilizzando Cloud Composer.
- Scopri come integrare Dataproc Serverless con le pipeline di Kubeflow.