1. Introduzione
I workflow sono un caso d'uso comune nell'analisi dei dati: implicano l'acquisizione, la trasformazione e l'analisi dei dati per individuare le informazioni significative al loro interno. In Google Cloud, lo strumento per orchestrare i workflow è Cloud Composer, una versione ospitata del popolare strumento per workflow open source Apache Airflow. In questo lab, utilizzerai Cloud Composer per creare un semplice workflow che crea un cluster Cloud Dataproc, lo analizza utilizzando Cloud Dataproc e Apache Hadoop, quindi elimina il cluster Cloud Dataproc.
Che cos'è Cloud Composer?
Cloud Composer è un servizio di orchestrazione del flusso di lavoro completamente gestito che consente di creare, programmare e monitorare pipeline su cloud e in data center on-premise. Realizzato sulla base del noto progetto open source Apache Airflow e gestito tramite il linguaggio di programmazione Python, Cloud Composer è privo di vincoli e facile da utilizzare.
Utilizzando Cloud Composer anziché un'istanza locale di Apache Airflow, gli utenti possono usufruire del meglio di Airflow senza costi di installazione o gestione.
Che cos'è Apache Airflow?
Apache Airflow è uno strumento open source utilizzato per creare, pianificare e monitorare i workflow in modo programmatico. Di seguito sono riportati alcuni termini chiave da ricordare relativi ad Airflow che vedrai in tutto il lab:
- DAG: un DAG (grafo diretto aciclico) è una raccolta di attività organizzate che vuoi pianificare ed eseguire. I DAG, chiamati anche flussi di lavoro, sono definiti in file Python standard
- Operatore: un operatore descrive una singola attività in un workflow
Che cos'è Cloud Dataproc?
Cloud Dataproc è il servizio Apache Spark e Apache Hadoop completamente gestito di Google Cloud. Cloud Dataproc si integra facilmente con altri servizi GCP, offrendo una piattaforma completa e avanzata per l'elaborazione dati, l'analisi e il machine learning.
Cosa farai
Questo codelab mostra come creare ed eseguire un flusso di lavoro Apache Airflow in Cloud Composer che completa le seguenti attività:
- Crea un cluster Cloud Dataproc
- Esegue un job di conteggio parole Apache Hadoop sul cluster e invia i risultati a Cloud Storage
- Elimina il cluster
Cosa imparerai a fare
- Come creare ed eseguire un flusso di lavoro Apache Airflow in Cloud Composer
- Come utilizzare Cloud Composer e Cloud Dataproc per eseguire un'analisi su un set di dati
- Come accedere all'ambiente Cloud Composer tramite la console Google Cloud, Cloud SDK e l'interfaccia web di Airflow
Che cosa ti serve
- Account Google Cloud
- Conoscenza di base dell'interfaccia a riga di comando
- Conoscenza di base di Python
2. Configurazione di GCP
Crea il progetto
Seleziona o crea un progetto Google Cloud.
Prendi nota dell'ID progetto, che utilizzerai nei passaggi successivi.
Se stai creando un nuovo progetto, l'ID progetto si trova subito sotto il nome del progetto nella pagina di creazione. |
|
Se hai già creato un progetto, puoi trovare l'ID nella home page della console nella scheda Informazioni sul progetto. |
|
Abilita le API
Abilita le API Cloud Composer, Cloud Dataproc e Cloud Storage.Una volta abilitate, puoi ignorare il pulsante "Vai alle credenziali" e procedere al passaggio successivo del tutorial. |
|
Crea l'ambiente Composer
Crea un ambiente Cloud Composer con la seguente configurazione:
Tutte le altre configurazioni possono rimanere invariate. Fai clic su "Crea" in basso. |
|
Crea un bucket Cloud Storage
Nel tuo progetto, crea un bucket Cloud Storage con la seguente configurazione:
Quando è tutto pronto, premi "Crea". |
|
3. Configurazione di Apache Airflow
Visualizzazione delle informazioni sull'ambiente Composer
Nella console di GCP, apri la pagina Ambienti.
Fai clic sul nome dell'ambiente per visualizzarne i dettagli.
La pagina Dettagli ambiente fornisce informazioni come l'URL dell'interfaccia web di Airflow, l'ID cluster Google Kubernetes Engine, il nome del bucket Cloud Storage e il percorso della cartella /dags.
In Airflow, un DAG (grafo diretto aciclico) è una raccolta di attività organizzate che vuoi pianificare ed eseguire. I DAG, chiamati anche flussi di lavoro, sono definiti in file Python standard. Cloud Composer pianifica solo i DAG nella cartella /dags. La cartella /dags si trova nel bucket Cloud Storage che Cloud Composer crea automaticamente quando crei l'ambiente.
Impostazione delle variabili di ambiente Apache Airflow
Le variabili Apache Airflow sono un concetto specifico di questa piattaforma e sono diverse dalle variabili di ambiente. In questo passaggio, imposterai le tre variabili Airflow seguenti: gcp_project, gcs_bucket e gce_zone.
Utilizzare gcloud per impostare le variabili
Innanzitutto, apri Cloud Shell, in cui Cloud SDK è già installato.
Imposta la variabile di ambiente COMPOSER_INSTANCE sul nome del tuo ambiente Composer
COMPOSER_INSTANCE=my-composer-environment
Per impostare le variabili Airflow utilizzando lo strumento a riga di comando gcloud, utilizza il comando gcloud composer environments run con il sottocomando variables. Questo comando gcloud composer esegue il sottocomando dell'interfaccia a riga di comando di Airflow variables. Il sottocomando passa gli argomenti allo strumento a riga di comando gcloud.
Esegui questo comando tre volte, sostituendo le variabili con quelle pertinenti al tuo progetto.
Imposta gcp_project utilizzando il comando seguente, sostituendo <your-project-id> con l'ID progetto che hai annotato nel passaggio 2.
gcloud composer environments run ${COMPOSER_INSTANCE} \
--location us-central1 variables -- --set gcp_project <your-project-id>
L'output sarà simile a questo
kubeconfig entry generated for us-central1-my-composer-env-123abc-gke.
Executing within the following Kubernetes cluster namespace: composer-1-10-0-airflow-1-10-2-123abc
[2020-04-17 20:42:49,713] {settings.py:176} INFO - settings.configure_orm(): Using pool settings. pool_size=5, pool_recycle=1800, pid=449
[2020-04-17 20:42:50,123] {default_celery.py:90} WARNING - You have configured a result_backend of redis://airflow-redis-service.default.svc.cluste
r.local:6379/0, it is highly recommended to use an alternative result_backend (i.e. a database).
[2020-04-17 20:42:50,127] {__init__.py:51} INFO - Using executor CeleryExecutor
[2020-04-17 20:42:50,433] {app.py:52} WARNING - Using default Composer Environment Variables. Overrides have not been applied.
[2020-04-17 20:42:50,440] {configuration.py:522} INFO - Reading the config from /etc/airflow/airflow.cfg
[2020-04-17 20:42:50,452] {configuration.py:522} INFO - Reading the config from /etc/airflow/airflow.cfg
Imposta gcs_bucket utilizzando il comando seguente, sostituendo <your-bucket-name> con l'ID bucket che hai annotato nel passaggio 2. Se hai seguito il nostro consiglio, il nome del bucket corrisponde all'ID progetto. L'output sarà simile a quello del comando precedente.
gcloud composer environments run ${COMPOSER_INSTANCE} \
--location us-central1 variables -- --set gcs_bucket gs://<your-bucket-name>
Imposta gce_zone utilizzando il seguente comando. L'output sarà simile a quello dei comandi precedenti.
gcloud composer environments run ${COMPOSER_INSTANCE} \
--location us-central1 variables -- --set gce_zone us-central1-a
(Facoltativo) Utilizzo di gcloud per visualizzare una variabile
Per vedere il valore di una variabile, esegui il sottocomando dell'interfaccia a riga di comando di Airflow variables con l'argomento get oppure usa la UI di Airflow.
Ad esempio:
gcloud composer environments run ${COMPOSER_INSTANCE} \
--location us-central1 variables -- --get gcs_bucket
Puoi farlo con una qualsiasi delle tre variabili che hai appena impostato: gcp_project, gcs_bucket e gce_zone.
4. Flusso di lavoro di esempio
Diamo un'occhiata al codice del DAG che utilizzeremo nel passaggio 5. Non preoccuparti ancora di scaricare i file, segui semplicemente i passaggi qui.
Ci sono molti aspetti da analizzare, quindi vediamo di fare un po' di chiarezza.
from airflow import models
from airflow.contrib.operators import dataproc_operator
from airflow.utils import trigger_rule
Iniziamo con alcuni import di Airflow:
airflow.models: ci consente di accedere e creare dati nel database Airflow.airflow.contrib.operators: dove vivono gli operatori della community. In questo caso, abbiamo bisogno didataproc_operatorper accedere all'API Dataproc.airflow.utils.trigger_rule: per aggiungere regole di attivazione ai nostri operatori. Le regole di attivazione consentono un controllo granulare sull'esecuzione di un operatore in relazione allo stato dei relativi genitori.
output_file = os.path.join(
models.Variable.get('gcs_bucket'), 'wordcount',
datetime.datetime.now().strftime('%Y%m%d-%H%M%S')) + os.sep
Specifica la posizione del file di output. La riga importante qui è models.Variable.get('gcs_bucket'), che recupererà il valore della variabile gcs_bucket dal database Airflow.
WORDCOUNT_JAR = (
'file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar'
)
input_file = 'gs://pub/shakespeare/rose.txt'
wordcount_args = ['wordcount', input_file, output_file]
WORDCOUNT_JAR: posizione del file .jar che alla fine verrà eseguito sul cluster Cloud Dataproc. È già ospitato su Google Cloud per te.input_file: percorso del file contenente i dati su cui verrà eseguito il calcolo del job Hadoop. Caricheremo i dati in questa posizione insieme nel passaggio 5.wordcount_args: argomenti che verranno passati al file jar.
yesterday = datetime.datetime.combine(
datetime.datetime.today() - datetime.timedelta(1),
datetime.datetime.min.time())
In questo modo otterremo un oggetto datetime equivalente che rappresenta la mezzanotte del giorno precedente. Ad esempio, se viene eseguito alle 11:00 del 4 marzo, l'oggetto datetime rappresenterà le ore 00:00 del 3 marzo. Questo ha a che fare con il modo in cui Airflow gestisce la pianificazione. Scopri di più qui.
default_dag_args = {
'start_date': yesterday,
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': datetime.timedelta(minutes=5),
'project_id': models.Variable.get('gcp_project')
}
La variabile default_dag_args sotto forma di dizionario deve essere fornita ogni volta che viene creato un nuovo DAG:
'email_on_failure': indica se devono essere inviati avvisi via email quando un'attività non è riuscita'email_on_retry': indica se devono essere inviati avvisi via email quando viene eseguito un nuovo tentativo di esecuzione di un'attività'retries': indica il numero di tentativi che Airflow deve effettuare in caso di errore del DAG'retry_delay': indica per quanto tempo Airflow deve attendere prima di tentare un nuovo tentativo'project_id': indica al DAG l'ID progetto GCP a cui associarlo, che sarà necessario in un secondo momento con l'operatore Dataproc
with models.DAG(
'composer_hadoop_tutorial',
schedule_interval=datetime.timedelta(days=1),
default_args=default_dag_args) as dag:
L'utilizzo di with models.DAG indica allo script di includere tutto ciò che si trova sotto all'interno dello stesso DAG. Vediamo anche tre argomenti passati:
- La prima, una stringa, è il nome da assegnare al DAG che stiamo creando. In questo caso, stiamo utilizzando
composer_hadoop_tutorial. schedule_interval: un oggettodatetime.timedelta, che qui abbiamo impostato su un giorno. Ciò significa che questo DAG tenterà di essere eseguito una volta al giorno dopo le ore'start_date'impostate in precedenza in'default_dag_args'default_args: il dizionario che abbiamo creato in precedenza contenente gli argomenti predefiniti per il DAG
Crea un cluster Dataproc
Successivamente, creeremo un dataproc_operator.DataprocClusterCreateOperator che crea un cluster Cloud Dataproc.
create_dataproc_cluster = dataproc_operator.DataprocClusterCreateOperator(
task_id='create_dataproc_cluster',
cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
num_workers=2,
zone=models.Variable.get('gce_zone'),
master_machine_type='n1-standard-1',
worker_machine_type='n1-standard-1')
All'interno di questo operatore, vediamo alcuni argomenti, tutti tranne il primo specifici per questo operatore:
task_id: proprio come in BashOperator, questo è il nome che assegniamo all'operatore, che è visibile dall'interfaccia utente di Airflowcluster_name: il nome che assegniamo al cluster Cloud Dataproc. In questo caso, l'abbiamo chiamatacomposer-hadoop-tutorial-cluster-{{ ds_nodash }}(consulta il riquadro informazioni per ulteriori informazioni facoltative).num_workers: il numero di worker che allochiamo al cluster Cloud Dataproczone: la regione geografica in cui vogliamo che risieda il cluster, come salvata nel database Airflow. Verrà letta la variabile'gce_zone'che abbiamo impostato nel passaggio 3.master_machine_type: il tipo di macchina che vogliamo allocare al master Cloud Dataprocworker_machine_type: il tipo di macchina che vogliamo allocare al worker Cloud Dataproc
Inviare un job Apache Hadoop
dataproc_operator.DataProcHadoopOperator ci consente di inviare un job a un cluster Cloud Dataproc.
run_dataproc_hadoop = dataproc_operator.DataProcHadoopOperator(
task_id='run_dataproc_hadoop',
main_jar=WORDCOUNT_JAR,
cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
arguments=wordcount_args)
Forniamo diversi parametri:
task_id: il nome che assegniamo a questa parte del DAGmain_jar- Percorso del file .jar che vogliamo eseguire sul clustercluster_name: nome del cluster su cui eseguire il job, che noterai è identico a quello che troviamo nell'operatore precedentearguments- Argomenti passati al file jar, come se eseguissi il file .jar dalla riga di comando
Elimina il cluster
L'ultimo operatore che creeremo è dataproc_operator.DataprocClusterDeleteOperator.
delete_dataproc_cluster = dataproc_operator.DataprocClusterDeleteOperator(
task_id='delete_dataproc_cluster',
cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
trigger_rule=trigger_rule.TriggerRule.ALL_DONE)
Come suggerisce il nome, questo operatore elimina un determinato cluster Cloud Dataproc. Qui vediamo tre argomentazioni:
task_id: proprio come in BashOperator, questo è il nome che assegniamo all'operatore, che è visibile dall'interfaccia utente di Airflowcluster_name: il nome che assegniamo al cluster Cloud Dataproc. In questo caso, l'abbiamo chiamatocomposer-hadoop-tutorial-cluster-{{ ds_nodash }}(per ulteriori informazioni facoltative, consulta il riquadro informazioni dopo "Crea un cluster Dataproc").trigger_rule: abbiamo menzionato brevemente le regole di attivazione durante le importazioni all'inizio di questo passaggio, ma qui ne abbiamo una in azione. Per impostazione predefinita, un operatore Airflow non viene eseguito a meno che tutti gli operatori upstream non siano stati completati correttamente. La regola di attivazioneALL_DONErichiede solo che tutti gli operatori upstream siano stati completati, indipendentemente dal fatto che abbiano avuto esito positivo o meno. In questo caso, anche se il job Hadoop non è riuscito, vogliamo comunque smantellare il cluster.
create_dataproc_cluster >> run_dataproc_hadoop >> delete_dataproc_cluster
Infine, vogliamo che questi operatori vengano eseguiti in un ordine particolare e possiamo indicarlo utilizzando gli operatori di spostamento bit Python. In questo caso, create_dataproc_cluster verrà sempre eseguito per primo, seguito da run_dataproc_hadoop e infine da delete_dataproc_cluster.
Mettendo tutto insieme, il codice ha questo aspetto:
# Copyright 2018 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# [START composer_hadoop_tutorial]
"""Example Airflow DAG that creates a Cloud Dataproc cluster, runs the Hadoop
wordcount example, and deletes the cluster.
This DAG relies on three Airflow variables
https://airflow.apache.org/concepts.html#variables
* gcp_project - Google Cloud Project to use for the Cloud Dataproc cluster.
* gce_zone - Google Compute Engine zone where Cloud Dataproc cluster should be
created.
* gcs_bucket - Google Cloud Storage bucket to use for result of Hadoop job.
See https://cloud.google.com/storage/docs/creating-buckets for creating a
bucket.
"""
import datetime
import os
from airflow import models
from airflow.contrib.operators import dataproc_operator
from airflow.utils import trigger_rule
# Output file for Cloud Dataproc job.
output_file = os.path.join(
models.Variable.get('gcs_bucket'), 'wordcount',
datetime.datetime.now().strftime('%Y%m%d-%H%M%S')) + os.sep
# Path to Hadoop wordcount example available on every Dataproc cluster.
WORDCOUNT_JAR = (
'file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar'
)
# Arguments to pass to Cloud Dataproc job.
input_file = 'gs://pub/shakespeare/rose.txt'
wordcount_args = ['wordcount', input_file, output_file]
yesterday = datetime.datetime.combine(
datetime.datetime.today() - datetime.timedelta(1),
datetime.datetime.min.time())
default_dag_args = {
# Setting start date as yesterday starts the DAG immediately when it is
# detected in the Cloud Storage bucket.
'start_date': yesterday,
# To email on failure or retry set 'email' arg to your email and enable
# emailing here.
'email_on_failure': False,
'email_on_retry': False,
# If a task fails, retry it once after waiting at least 5 minutes
'retries': 1,
'retry_delay': datetime.timedelta(minutes=5),
'project_id': models.Variable.get('gcp_project')
}
# [START composer_hadoop_schedule]
with models.DAG(
'composer_hadoop_tutorial',
# Continue to run DAG once per day
schedule_interval=datetime.timedelta(days=1),
default_args=default_dag_args) as dag:
# [END composer_hadoop_schedule]
# Create a Cloud Dataproc cluster.
create_dataproc_cluster = dataproc_operator.DataprocClusterCreateOperator(
task_id='create_dataproc_cluster',
# Give the cluster a unique name by appending the date scheduled.
# See https://airflow.apache.org/code.html#default-variables
cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
num_workers=2,
zone=models.Variable.get('gce_zone'),
master_machine_type='n1-standard-1',
worker_machine_type='n1-standard-1')
# Run the Hadoop wordcount example installed on the Cloud Dataproc cluster
# master node.
run_dataproc_hadoop = dataproc_operator.DataProcHadoopOperator(
task_id='run_dataproc_hadoop',
main_jar=WORDCOUNT_JAR,
cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
arguments=wordcount_args)
# Delete Cloud Dataproc cluster.
delete_dataproc_cluster = dataproc_operator.DataprocClusterDeleteOperator(
task_id='delete_dataproc_cluster',
cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
# Setting trigger_rule to ALL_DONE causes the cluster to be deleted
# even if the Dataproc job fails.
trigger_rule=trigger_rule.TriggerRule.ALL_DONE)
# [START composer_hadoop_steps]
# Define DAG dependencies.
create_dataproc_cluster >> run_dataproc_hadoop >> delete_dataproc_cluster
# [END composer_hadoop_steps]
# [END composer_hadoop]
5. Carica i file Airflow su Cloud Storage
Copiare il DAG nella cartella /dags
- Innanzitutto, apri Cloud Shell, in cui Cloud SDK è già installato.
- Clona il repository dei campioni Python e passa alla directory composer/workflows
git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git && cd python-docs-samples/composer/workflows
- Esegui questo comando per impostare il nome della cartella dei DAG su una variabile di ambiente
DAGS_FOLDER=$(gcloud composer environments describe ${COMPOSER_INSTANCE} \
--location us-central1 --format="value(config.dagGcsPrefix)")
- Esegui questo comando
gsutilper copiare il codice del tutorial nella posizione in cui viene creata la cartella /dags
gsutil cp hadoop_tutorial.py $DAGS_FOLDER
L'output sarà simile al seguente:
Copying file://hadoop_tutorial.py [Content-Type=text/x-python]... / [1 files][ 4.1 KiB/ 4.1 KiB] Operation completed over 1 objects/4.1 KiB.
6. usa la UI di Airflow
Per accedere all'interfaccia web di Airflow utilizzando la console Google Cloud:
|
|
Per informazioni sull'interfaccia utente di Airflow, consulta Accesso all'interfaccia web di Airflow.
Visualizza variabili
Le variabili che hai impostato in precedenza vengono mantenute nel tuo ambiente. Puoi visualizzare le variabili selezionando Admin > Variables (Amministratore > Variabili) dalla barra dei menu dell'interfaccia utente di Airflow.

Esplorare le esecuzioni di DAG
Quando carichi il file DAG nella cartella dags di Cloud Storage, Cloud Composer analizza il file. Se non vengono rilevati errori, il nome del workflow viene visualizzato nell'elenco di DAG e il workflow viene messo in coda per essere eseguito immediatamente. Per esaminare i DAG, fai clic su DAG nella parte superiore della pagina.

Fai clic su composer_hadoop_tutorial per aprire la pagina dei dettagli del DAG. Questa pagina include una rappresentazione grafica delle attività e delle dipendenze del workflow.

Ora, nella barra degli strumenti, fai clic su Visualizzazione grafo e passa il mouse sopra il grafico di ogni attività per visualizzarne lo stato. Puoi notare che il bordo attorno a ciascuna attività indica anche lo stato (bordo verde = in esecuzione; rosso = non riuscita e così via).

Per eseguire di nuovo il workflow dalla visualizzazione Grafico:
- Nella visualizzazione a grafo della UI di Airflow, fai clic sull'immagine
create_dataproc_cluster. - Fai clic su Cancella per reimpostare le tre attività, quindi fai clic su Ok per confermare.

Puoi anche controllare lo stato e i risultati del flusso di lavoro composer-hadoop-tutorial accedendo alle seguenti pagine della console GCP:
- Cluster Cloud Dataproc per monitorare la creazione e l'eliminazione del cluster. Tieni presente che il cluster creato dal workflow è temporaneo. Esiste solo per la durata del workflow e viene eliminato dall'ultima attività del workflow.
- Job Cloud Dataproc per visualizzare o monitorare il job di conteggio parole di Apache Hadoop. Fai clic sull'ID job per visualizzarne l'output del log.
- Browser Cloud Storage per visualizzare i risultati del conteggio delle parole nella cartella
wordcountdel bucket Cloud Storage che hai creato per questo codelab.
7. Esegui la pulizia
Per evitare che al tuo account GCP vengano addebitate le risorse utilizzate in questo codelab:
- (Facoltativo) Per salvare i dati, scaricali dal bucket Cloud Storage per l'ambiente Cloud Composer e dal bucket di archiviazione che hai creato per questo codelab.
- Elimina il bucket Cloud Storage che hai creato per questo codelab.
- Elimina il bucket Cloud Storage per l'ambiente.
- Elimina l'ambiente Cloud Composer. Tieni presente che l'eliminazione dell'ambiente non comporta l'eliminazione del bucket di archiviazione per l'ambiente.
Puoi anche eliminare facoltativamente il progetto:
- Nella console di GCP, vai alla pagina Progetti.
- Nell'elenco dei progetti, seleziona quello da eliminare e fai clic su Elimina.
- Nella casella, digita l'ID progetto, quindi fai clic su Chiudi per eliminare il progetto.







