1. Introduzione - Google Dataproc
Dataproc è un servizio completamente gestito e a scalabilità elevata per l'esecuzione di Apache Spark, Apache Flink, Presto e molti altri strumenti e framework open source. Utilizza Dataproc per la modernizzazione dei data lake, ETL / ELT e data science sicura su scala globale. Dataproc è inoltre completamente integrato con diversi servizi Google Cloud tra cui BigQuery, Cloud Storage, Vertex AI e Dataplex.
Dataproc è disponibile in tre versioni:
- Dataproc Serverless ti consente di eseguire job PySpark senza dover configurare l'infrastruttura e la scalabilità automatica. Dataproc Serverless supporta carichi di lavoro batch e sessioni / blocchi note di PySpark.
- Dataproc su Google Compute Engine consente di gestire un cluster Hadoop YARN per carichi di lavoro Spark basati su YARN, oltre a strumenti open source come Flink e Presto. Puoi personalizzare i cluster basati su cloud con la scalabilità verticale o orizzontale che preferisci, inclusa la scalabilità automatica.
- Dataproc su Google Kubernetes Engine ti consente di configurare i cluster virtuali Dataproc nella tua infrastruttura GKE per l'invio di job Spark, PySpark, SparkR o Spark SQL.
2. Creazione di un cluster Dataproc su un VPC di Google Cloud
In questo passaggio creerai un cluster Dataproc su Google Cloud utilizzando la console Google Cloud.
Come primo passaggio, abilita l'API di servizio Dataproc nella console. Una volta attivato, cerca "Dataproc" nella barra di ricerca e fai clic su Crea cluster.
Seleziona Cluster su Compute Engine per utilizzare le VM di Google Compute Engine(GCE) come infrastruttura sottostante per eseguire i cluster Dataproc.
Ora ti trovi nella pagina Creazione del cluster.
Su questa pagina:
- Fornisci un nome univoco per il cluster.
- Seleziona la regione specifica. Puoi anche selezionare una zona, ma Dataproc offre la possibilità di sceglierne automaticamente una. Per questo codelab, seleziona "us-central1" e "us-central1-c".
- Seleziona lo stato "Standard" un tipo di cluster. Ciò garantisce la presenza di un nodo master.
- Nella scheda Configura nodi, verifica che il numero di worker creati sia due.
- Nella sezione Personalizza cluster, seleziona la casella accanto ad Abilita gateway dei componenti. Ciò consente l'accesso alle interfacce web sul cluster, tra cui la UI Spark, Yarn Node Manager e i blocchi note Jupyter.
- Nella sezione Componenti facoltativi, seleziona Blocco note Jupyter. Questa operazione configura il cluster con un server di blocchi note Jupyter.
- Lascia tutto invariato e fai clic su Crea cluster.
Verrà avviato un cluster Dataproc.
3. Avvia il cluster e accedi tramite SSH
Quando lo stato del cluster diventa In esecuzione, fai clic sul nome del cluster nella console Dataproc.
Fai clic sulla scheda Istanza VM per visualizzare il nodo master e i due nodi worker del cluster.
Fai clic su SSH accanto al nodo master per accedere al nodo master.
Esegui i comandi hdfs per visualizzare la struttura della directory.
hadoop_commands_example
sudo hadoop fs -ls /
sudo hadoop version
sudo hadoop fs -mkdir /test51
sudo hadoop fs -ls /
4. Interfacce web e gateway dei componenti
Nella console del cluster Dataproc, fai clic sul nome del cluster, quindi sulla scheda INTERFACCE WEB.
Mostra le interfacce web disponibili, tra cui Jupyter. Fai clic su Jupyter per aprire un blocco note Jupyter. Puoi utilizzarlo per creare blocchi note in PySpark archiviati su GCS. per archiviare il tuo blocco note su Google Cloud Storage e aprire un blocco note PySpark da utilizzare in questo codelab.
5. Monitora e osserva i job Spark
Quando il cluster Dataproc è in esecuzione, crea un job batch PySpark e invialo al cluster Dataproc.
Crea un bucket Google Cloud Storage (GCS) per archiviare lo script PySpark. Assicurati di creare il bucket nella stessa regione del cluster Dataproc.
Ora che il bucket GCS è stato creato, copia il file seguente in questo bucket.
https://raw.githubusercontent.com/diptimanr/spark-on-gce/main/test-spark-1.py
Questo script crea un DataFrame Spark di esempio e lo scrive come tabella Hive.
hive_job.py
from pyspark.sql import SparkSession
from datetime import datetime, date
from pyspark.sql import Row
spark = SparkSession.builder.master("local").enableHiveSupport().getOrCreate()
df = spark.createDataFrame([ (1, 2., 'string1', date(2000, 1, 1), datetime(2000, 1, 1, 12, 0)),
(2, 3., 'string2', date(2000, 2, 1), datetime(2000, 1, 2, 12, 0)), (3, 4., 'string3', date(2000, 3, 1), datetime(2000, 1, 3, 12, 0))
], schema='a long, b double, c string, d date, e timestamp')
print("..... Writing data .....")
df.write.mode("overwrite").saveAsTable("test_table_1")
print("..... Complete .....")
Invia questo script come job batch Spark in Dataproc. Fai clic su Job nel menu di navigazione a sinistra, quindi su Invia job.
Fornisci un ID job e una regione. Seleziona il cluster e fornisci il percorso GCS dello script Spark che hai copiato. Questo job verrà eseguito come job batch Spark su Dataproc.
In Proprietà, aggiungi la chiave spark.submit.deployMode
e il valore client
per assicurarti che il driver venga eseguito nel nodo master di Dataproc e non nei nodi worker. Fai clic su Invia per inviare il job batch a Dataproc.
Lo script Spark crea un DataFrame e scrive su una tabella Hive test_table_1
.
Una volta eseguito il job, potrai visualizzare le istruzioni di stampa della console nella scheda Monitoring.
Ora che la tabella Hive è stata creata, invia un altro job di query Hive per selezionare i contenuti della tabella e visualizzarli nella console.
Crea un altro job con le seguenti proprietà:
Nota che Tipo di job è impostato su Hive e il tipo di origine della query è Testo query, il che significa che scriveremo l'intera istruzione HiveQL all'interno della casella di testo Testo query.
Invia il job, mantenendo gli altri parametri come predefiniti.
Nota come HiveQL seleziona tutti i record e i display sulla console.
6. Scalabilità automatica
La scalabilità automatica è l'attività di stimare il "giusto" di nodi worker del cluster per un carico di lavoro.
L'API Dataproc responsabile della scalabilità automatica fornisce un meccanismo per automatizzare la gestione delle risorse del cluster e consente la scalabilità automatica delle VM dei worker del cluster. Un criterio di scalabilità automatica è una configurazione riutilizzabile che descrive in che modo i worker del cluster che utilizzano il criterio di scalabilità automatica dovrebbero scalare. Definisce i limiti, la frequenza e l'aggressività della scalabilità per fornire un controllo granulare sulle risorse del cluster nel corso del suo ciclo di vita.
I criteri di scalabilità automatica di Dataproc vengono scritti utilizzando file YAML e questi file vengono passati nel comando dell'interfaccia a riga di comando per la creazione del cluster o selezionati da un bucket GCS quando viene creato un cluster dalla console Cloud.
Ecco un esempio di criterio di scalabilità automatica di Dataproc :
policy.yaml
workerConfig:
minInstances: 10
maxInstances: 10
secondaryWorkerConfig:
maxInstances: 50
basicAlgorithm:
cooldownPeriod: 4m
yarnConfig:
scaleUpFactor: 0.05
scaleDownFactor: 1.0
gracefulDecommissionTimeout: 1h
7. Configurazione dei componenti facoltativi di Dataproc
Verrà avviato un cluster Dataproc.
Quando crei un cluster Dataproc, i componenti standard dell'ecosistema Apache Hadoop vengono installati automaticamente nel cluster (vedi Elenco delle versioni di Dataproc). Puoi installare sul cluster componenti aggiuntivi, denominati Componenti facoltativi, durante la sua creazione.
Durante la creazione del cluster Dataproc dalla console, abbiamo abilitato i componenti facoltativi e selezionato Blocco note Jupyter come componente facoltativo.
8. Esegui la pulizia delle risorse
Per eseguire la pulizia del cluster, fai clic su Arresta dopo aver selezionato il cluster dalla console Dataproc. Una volta arrestato il cluster, fai clic su Elimina per eliminarlo.
Dopo aver eliminato il cluster Dataproc, elimina i bucket GCS in cui è stato copiato il codice.
Per eseguire la pulizia delle risorse e interrompere qualsiasi fatturazione indesiderata, il cluster Dataproc deve essere prima arrestato e poi eliminato.
Prima di arrestare ed eliminare il cluster, assicurati che tutti i dati scritti nello spazio di archiviazione HDFS vengano copiati in GCS per un'archiviazione durevole.
Per arrestare il cluster, fai clic su Arresta.
Una volta arrestato il cluster, fai clic su Elimina per eliminarlo.
Nella finestra di dialogo di conferma, fai clic su Elimina per eliminare il cluster.