Attivazione di un DAG con Node.JS e Google Cloud Functions

1. Introduzione

Apache Airflow è progettato per eseguire i DAG in base a una pianificazione regolare, ma puoi anche attivarli in risposta a eventi, ad esempio una modifica in un bucket Cloud Storage o un messaggio inviato a Cloud Pub/Sub. Per farlo, i DAG di Cloud Composer possono essere attivati da Cloud Functions.

L'esempio in questo lab esegue un DAG semplice ogni volta che si verifica una modifica in un bucket Cloud Storage. Questo DAG utilizza BashOperator per eseguire un comando bash che stampa le informazioni sulle modifiche relative a ciò che è stato caricato nel bucket Cloud Storage.

Prima di iniziare questo lab, ti consigliamo di completare i codelab Introduzione a Cloud Composer e Introduzione a Cloud Functions. Se crei un ambiente Composer nel codelab Introduzione a Cloud Composer, puoi utilizzarlo in questo lab.

Cosa creerai

In questo codelab imparerai a:

  1. Carica un file su Google Cloud Storage, che
  2. Attiva una funzione Cloud Functions utilizzando il runtime Node.JS
  3. Questa funzione eseguirà un DAG in Google Cloud Composer
  4. che esegue un semplice comando Bash che stampa la modifica nel bucket Cloud Storage di Google Cloud Storage

1d3d3736624a923f.png

Obiettivi didattici

  • Come attivare un DAG Apache Airflow utilizzando Google Cloud Functions + Node.js

Cosa ti servirà

  • Account Google Cloud
  • Conoscenza di base di JavaScript
  • Conoscenza di base di Cloud Composer/Airflow e Cloud Functions
  • Familiarità con l'utilizzo dei comandi della CLI

2. Configurazione di GCP

Seleziona o crea il progetto

Seleziona o crea un progetto Google Cloud. Se stai creando un nuovo progetto, segui i passaggi descritti qui.

Prendi nota dell'ID progetto, che utilizzerai nei passaggi successivi.

Se stai creando un nuovo progetto, l'ID progetto si trova subito sotto il nome del progetto nella pagina di creazione.

Se hai già creato un progetto, puoi trovare l'ID nella home page della console nella scheda Informazioni sul progetto.

Abilita le API

Abilita le API Cloud Composer, Google Cloud Functions e Cloud Identity e Google Identity and Access Management (IAM).

Crea l'ambiente Composer

Crea un ambiente Cloud Composer con la seguente configurazione:

  • Nome: my-composer-environment
  • Posizione: la posizione geograficamente più vicina a te
  • Zona: qualsiasi zona della regione

Tutte le altre configurazioni possono rimanere invariate. Fai clic su "Crea" in basso.Prendi nota del nome e della posizione dell'ambiente Composer, ti serviranno nei passaggi successivi.

Crea un bucket Cloud Storage

Nel tuo progetto, crea un bucket Cloud Storage con la seguente configurazione:

  • Nome: <your-project-id>
  • Classe di archiviazione predefinita: multi-regionale
  • Posizione: qualsiasi posizione geograficamente più vicina alla regione Cloud Composer che utilizzi
  • Modello di controllo dell'accesso: imposta le autorizzazioni a livello di oggetto e bucket

Premi "Crea" quando è tutto pronto. Assicurati di annotare il nome del bucket Cloud Storage per i passaggi successivi.

3. Configurazione di Google Cloud Functions (GCF)

Per configurare GCF, eseguiremo i comandi in Google Cloud Shell.

Sebbene Google Cloud possa essere gestito da remoto dal tuo laptop utilizzando lo strumento a riga di comando gcloud, in questo codelab utilizzeremo Google Cloud Shell, un ambiente a riga di comando in esecuzione nel cloud.

Questa macchina virtuale basata su Debian viene caricata con tutti gli strumenti di sviluppo di cui avrai bisogno. Offre una home directory permanente da 5 GB e viene eseguita su Google Cloud, migliorando notevolmente le prestazioni e l'autenticazione della rete. Ciò significa che per questo codelab ti servirà solo un browser (sì, funziona su Chromebook).

Per attivare Google Cloud Shell, fai clic sul pulsante in alto a destra nella console per gli sviluppatori (bastano pochi istanti per eseguire il provisioning e connettersi all'ambiente):

Concedi le autorizzazioni di firma dei blob al service account Cloud Functions

Affinché GCF possa autenticarsi in Cloud IAP, il proxy che protegge il web server Airflow, devi concedere al service account GCF Appspot il ruolo Service Account Token Creator. Per farlo, esegui il comando seguente in Cloud Shell, sostituendo il nome del tuo progetto a <your-project-id>.

gcloud iam service-accounts add-iam-policy-binding \
<your-project-id>@appspot.gserviceaccount.com \
--member=serviceAccount:<your-project-id>@appspot.gserviceaccount.com \
--role=roles/iam.serviceAccountTokenCreator

Ad esempio, se il tuo progetto si chiama my-project, il comando sarà

gcloud iam service-accounts add-iam-policy-binding \
my-project@appspot.gserviceaccount.com \
--member=serviceAccount:my-project@appspot.gserviceaccount.com \
--role=roles/iam.serviceAccountTokenCreator

Recuperare l'ID client

Per creare un token per l'autenticazione a Cloud IAP, la funzione richiede l'ID client del proxy che protegge il web server Airflow. L'API Cloud Composer non fornisce direttamente queste informazioni. In alternativa, invia una richiesta non autenticata al web server Airflow e acquisisci l'ID client dall'URL di reindirizzamento. Lo faremo eseguendo un file Python utilizzando Cloud Shell per acquisire l'ID client.

Scarica il codice necessario da GitHub eseguendo questo comando in Cloud Shell:

cd
git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git

Se hai ricevuto un errore perché questa directory esiste già, aggiornala all'ultima versione eseguendo il seguente comando

cd python-docs-samples/
git pull origin master

Passa alla directory appropriata eseguendo

cd python-docs-samples/composer/rest

Esegui il codice Python per ottenere l'ID client, sostituendo il nome del tuo progetto con <your-project-id>, la posizione dell'ambiente Composer che hai creato in precedenza con <your-composer-location> e il nome dell'ambiente Composer che hai creato in precedenza con <your-composer-environment>

python3 get_client_id.py <your-project-id> <your-composer-location> <your-composer-environment>

Ad esempio, se il nome del progetto è my-project, la posizione di Composer è us-central1 e il nome dell'ambiente è my-composer, il comando sarà

python3 get_client_id.py my-project us-central1 my-composer

get_client_id.py esegue le seguenti operazioni:

  • Esegue l'autenticazione con Google Cloud
  • Esegue una richiesta HTTP non autenticata al web server Airflow per ottenere l'URI di reindirizzamento
  • Estrae il parametro di query client_id da questo reindirizzamento
  • Lo stampa per consentirti di utilizzarlo

L'ID client verrà stampato sulla riga di comando e avrà un aspetto simile a questo:

12345678987654321-abc1def3ghi5jkl7mno8pqr0.apps.googleusercontent.com

4. Crea la funzione

In Cloud Shell, clona il repository con il codice campione necessario eseguendo

cd
git clone https://github.com/GoogleCloudPlatform/nodejs-docs-samples.git

Passa alla directory necessaria e lascia aperto Cloud Shell mentre completi i passaggi successivi.

cd nodejs-docs-samples/composer/functions/composer-storage-trigger

Vai alla pagina Google Cloud Functions facendo clic sul menu di navigazione e poi su "Cloud Functions".

Fai clic su "CREA FUNZIONE" nella parte superiore della pagina.

Assegna alla funzione il nome "my-function" e lascia la memoria al valore predefinito di 256 MB.

Imposta il trigger su "Cloud Storage", lascia il tipo di evento su "Finalizza/Crea" e vai al bucket che hai creato nel passaggio Crea un bucket Cloud Storage.

Lascia il codice sorgente impostato su "Editor incorporato" e imposta il runtime su "Node.js 8".

In Cloud Shell, esegui questo comando. Verranno aperti index.js e package.json nell'editor di Cloud Shell.

cloudshell edit index.js package.json

Fai clic sulla scheda package.json, copia il codice e incollalo nella sezione package.json dell'editor in linea di Cloud Functions.

Imposta "Funzione da eseguire" su triggerDag

Fai clic sulla scheda index.js, copia il codice e incollalo nella sezione index.js dell'editor incorporato di Cloud Functions.

Modifica PROJECT_ID con il tuo ID progetto e CLIENT_ID con l'ID client che hai salvato nel passaggio Ottenere l'ID client. NON fare ancora clic su "Crea", perché devi compilare ancora alcune informazioni.

In Cloud Shell, esegui il comando seguente, sostituendo <your-environment-name> con il nome del tuo ambiente Composer e <your-composer-region> con la regione in cui si trova il tuo ambiente Composer.

gcloud composer environments describe <your-environment-name> --location <your-composer-region>

Ad esempio, se il tuo ambiente si chiama my-composer-environment e si trova in us-central1, il comando sarà

gcloud composer environments describe my-composer-environment --location us-central1

L'output dovrebbe essere simile al seguente:

config:
 airflowUri: https://abc123efghi456k-tp.appspot.com
 dagGcsPrefix: gs://narnia-north1-test-codelab-jklmno-bucket/dags
 gkeCluster: projects/a-project/zones/narnia-north1-b/clusters/narnia-north1-test-codelab-jklmno-gke
 nodeConfig:
   diskSizeGb: 100
   location: projects/a-project/zones/narnia-north1-b
   machineType: projects/a-project/zones/narnia-north1-b/machineTypes/n1-standard-1
   network: projects/a-project/global/networks/default
   oauthScopes:
   - https://www.googleapis.com/auth/cloud-platform
   serviceAccount: 987665432-compute@developer.gserviceaccount.com
 nodeCount: 3
 softwareConfig:
   imageVersion: composer-1.7.0-airflow-1.10.0
   pythonVersion: '2'
createTime: '2019-05-29T09:41:27.919Z'
name: projects/a-project/locations/narnia-north1/environments/my-composer-environment
state: RUNNING
updateTime: '2019-05-29T09:56:29.969Z'
uuid: 123456-7890-9876-543-210123456

In questo output, cerca la variabile denominata airflowUri. Nel codice index.js, modifica WEBSERVER_ID in modo che corrisponda all'ID del web server Airflow. Si tratta della parte della variabile airflowUri che termina con "-tp", ad esempio abc123efghi456k-tp.

Fai clic sul link del menu a discesa "Altro", quindi scegli la regione geograficamente più vicina a te.

Seleziona "Riprova in caso di errore"

Fai clic su "Crea" per creare la funzione Cloud Functions.

Esecuzione passo passo del codice

Il codice copiato da index.js avrà un aspetto simile al seguente:

// [START composer_trigger]
'use strict';

const fetch = require('node-fetch');
const FormData = require('form-data');

/**
 * Triggered from a message on a Cloud Storage bucket.
 *
 * IAP authorization based on:
 * https://stackoverflow.com/questions/45787676/how-to-authenticate-google-cloud-functions-for-access-to-secure-app-engine-endpo
 * and
 * https://cloud.google.com/iap/docs/authentication-howto
 *
 * @param {!Object} data The Cloud Functions event data.
 * @returns {Promise}
 */
exports.triggerDag = async data => {
  // Fill in your Composer environment information here.

  // The project that holds your function
  const PROJECT_ID = 'your-project-id';
  // Navigate to your webserver's login page and get this from the URL
  const CLIENT_ID = 'your-iap-client-id';
  // This should be part of your webserver's URL:
  // {tenant-project-id}.appspot.com
  const WEBSERVER_ID = 'your-tenant-project-id';
  // The name of the DAG you wish to trigger
  const DAG_NAME = 'composer_sample_trigger_response_dag';

  // Other constants
  const WEBSERVER_URL = `https://${WEBSERVER_ID}.appspot.com/api/experimental/dags/${DAG_NAME}/dag_runs`;
  const USER_AGENT = 'gcf-event-trigger';
  const BODY = {conf: JSON.stringify(data)};

  // Make the request
  try {
    const iap = await authorizeIap(CLIENT_ID, PROJECT_ID, USER_AGENT);

    return makeIapPostRequest(
      WEBSERVER_URL,
      BODY,
      iap.idToken,
      USER_AGENT,
      iap.jwt
    );
  } catch (err) {
    throw new Error(err);
  }
};

/**
 * @param {string} clientId The client id associated with the Composer webserver application.
 * @param {string} projectId The id for the project containing the Cloud Function.
 * @param {string} userAgent The user agent string which will be provided with the webserver request.
 */
const authorizeIap = async (clientId, projectId, userAgent) => {
  const SERVICE_ACCOUNT = `${projectId}@appspot.gserviceaccount.com`;
  const JWT_HEADER = Buffer.from(
    JSON.stringify({alg: 'RS256', typ: 'JWT'})
  ).toString('base64');

  let jwt = '';
  let jwtClaimset = '';

  // Obtain an Oauth2 access token for the appspot service account
  const res = await fetch(
    `http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/${SERVICE_ACCOUNT}/token`,
    {
      headers: {'User-Agent': userAgent, 'Metadata-Flavor': 'Google'},
    }
  );
  const tokenResponse = await res.json();
  if (tokenResponse.error) {
    return Promise.reject(tokenResponse.error);
  }

  const accessToken = tokenResponse.access_token;
  const iat = Math.floor(new Date().getTime() / 1000);
  const claims = {
    iss: SERVICE_ACCOUNT,
    aud: 'https://www.googleapis.com/oauth2/v4/token',
    iat: iat,
    exp: iat + 60,
    target_audience: clientId,
  };
  jwtClaimset = Buffer.from(JSON.stringify(claims)).toString('base64');
  const toSign = [JWT_HEADER, jwtClaimset].join('.');

  const blob = await fetch(
    `https://iam.googleapis.com/v1/projects/${projectId}/serviceAccounts/${SERVICE_ACCOUNT}:signBlob`,
    {
      method: 'POST',
      body: JSON.stringify({
        bytesToSign: Buffer.from(toSign).toString('base64'),
      }),
      headers: {
        'User-Agent': userAgent,
        Authorization: `Bearer ${accessToken}`,
      },
    }
  );
  const blobJson = await blob.json();
  if (blobJson.error) {
    return Promise.reject(blobJson.error);
  }

  // Request service account signature on header and claimset
  const jwtSignature = blobJson.signature;
  jwt = [JWT_HEADER, jwtClaimset, jwtSignature].join('.');
  const form = new FormData();
  form.append('grant_type', 'urn:ietf:params:oauth:grant-type:jwt-bearer');
  form.append('assertion', jwt);

  const token = await fetch('https://www.googleapis.com/oauth2/v4/token', {
    method: 'POST',
    body: form,
  });
  const tokenJson = await token.json();
  if (tokenJson.error) {
    return Promise.reject(tokenJson.error);
  }

  return {
    jwt: jwt,
    idToken: tokenJson.id_token,
  };
};

/**
 * @param {string} url The url that the post request targets.
 * @param {string} body The body of the post request.
 * @param {string} idToken Bearer token used to authorize the iap request.
 * @param {string} userAgent The user agent to identify the requester.
 */
const makeIapPostRequest = async (url, body, idToken, userAgent) => {
  const res = await fetch(url, {
    method: 'POST',
    headers: {
      'User-Agent': userAgent,
      Authorization: `Bearer ${idToken}`,
    },
    body: JSON.stringify(body),
  });

  if (!res.ok) {
    const err = await res.text();
    throw new Error(err);
  }
};
// [END composer_trigger]

Vediamo cosa sta succedendo. Qui sono presenti tre funzioni: triggerDag, authorizeIap e makeIapPostRequest

triggerDag è la funzione che viene attivata quando carichiamo qualcosa nel bucket Cloud Storage designato. È qui che configuriamo le variabili importanti utilizzate nelle altre richieste, come PROJECT_ID, CLIENT_ID, WEBSERVER_ID e DAG_NAME. Chiama authorizeIap e makeIapPostRequest.

exports.triggerDag = async data => {
  // Fill in your Composer environment information here.

  // The project that holds your function
  const PROJECT_ID = 'your-project-id';
  // Navigate to your webserver's login page and get this from the URL
  const CLIENT_ID = 'your-iap-client-id';
  // This should be part of your webserver's URL:
  // {tenant-project-id}.appspot.com
  const WEBSERVER_ID = 'your-tenant-project-id';
  // The name of the DAG you wish to trigger
  const DAG_NAME = 'composer_sample_trigger_response_dag';

  // Other constants
  const WEBSERVER_URL = `https://${WEBSERVER_ID}.appspot.com/api/experimental/dags/${DAG_NAME}/dag_runs`;
  const USER_AGENT = 'gcf-event-trigger';
  const BODY = {conf: JSON.stringify(data)};

  // Make the request
  try {
    const iap = await authorizeIap(CLIENT_ID, PROJECT_ID, USER_AGENT);

    return makeIapPostRequest(
      WEBSERVER_URL,
      BODY,
      iap.idToken,
      USER_AGENT,
      iap.jwt
    );
  } catch (err) {
    throw new Error(err);
  }
};

authorizeIap invia una richiesta al proxy che protegge il web server Airflow, utilizzando un service account e "scambiando" un JWT con un token ID che verrà utilizzato per autenticare makeIapPostRequest.

const authorizeIap = async (clientId, projectId, userAgent) => {
  const SERVICE_ACCOUNT = `${projectId}@appspot.gserviceaccount.com`;
  const JWT_HEADER = Buffer.from(
    JSON.stringify({alg: 'RS256', typ: 'JWT'})
  ).toString('base64');

  let jwt = '';
  let jwtClaimset = '';

  // Obtain an Oauth2 access token for the appspot service account
  const res = await fetch(
    `http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/${SERVICE_ACCOUNT}/token`,
    {
      headers: {'User-Agent': userAgent, 'Metadata-Flavor': 'Google'},
    }
  );
  const tokenResponse = await res.json();
  if (tokenResponse.error) {
    return Promise.reject(tokenResponse.error);
  }

  const accessToken = tokenResponse.access_token;
  const iat = Math.floor(new Date().getTime() / 1000);
  const claims = {
    iss: SERVICE_ACCOUNT,
    aud: 'https://www.googleapis.com/oauth2/v4/token',
    iat: iat,
    exp: iat + 60,
    target_audience: clientId,
  };
  jwtClaimset = Buffer.from(JSON.stringify(claims)).toString('base64');
  const toSign = [JWT_HEADER, jwtClaimset].join('.');

  const blob = await fetch(
    `https://iam.googleapis.com/v1/projects/${projectId}/serviceAccounts/${SERVICE_ACCOUNT}:signBlob`,
    {
      method: 'POST',
      body: JSON.stringify({
        bytesToSign: Buffer.from(toSign).toString('base64'),
      }),
      headers: {
        'User-Agent': userAgent,
        Authorization: `Bearer ${accessToken}`,
      },
    }
  );
  const blobJson = await blob.json();
  if (blobJson.error) {
    return Promise.reject(blobJson.error);
  }

  // Request service account signature on header and claimset
  const jwtSignature = blobJson.signature;
  jwt = [JWT_HEADER, jwtClaimset, jwtSignature].join('.');
  const form = new FormData();
  form.append('grant_type', 'urn:ietf:params:oauth:grant-type:jwt-bearer');
  form.append('assertion', jwt);

  const token = await fetch('https://www.googleapis.com/oauth2/v4/token', {
    method: 'POST',
    body: form,
  });
  const tokenJson = await token.json();
  if (tokenJson.error) {
    return Promise.reject(tokenJson.error);
  }

  return {
    jwt: jwt,
    idToken: tokenJson.id_token,
  };
};

makeIapPostRequest effettua una chiamata al server web Airflow per attivare composer_sample_trigger_response_dag.. Il nome del DAG è incorporato nell'URL del server web Airflow passato con il parametro url e idToken è il token ottenuto nella richiesta authorizeIap.

const makeIapPostRequest = async (url, body, idToken, userAgent) => {
  const res = await fetch(url, {
    method: 'POST',
    headers: {
      'User-Agent': userAgent,
      Authorization: `Bearer ${idToken}`,
    },
    body: JSON.stringify(body),
  });

  if (!res.ok) {
    const err = await res.text();
    throw new Error(err);
  }
};

5. Configurare il DAG

In Cloud Shell, passa alla directory con i flussi di lavoro di esempio. Fa parte di python-docs-samples che hai scaricato da GitHub nel passaggio Recupero dell'ID client.

cd
cd python-docs-samples/composer/workflows

Carica il DAG in Composer

Carica il DAG di esempio nel bucket di archiviazione DAG del tuo ambiente Composer con il seguente comando, dove <environment_name> è il nome del tuo ambiente Composer e <location> è il nome della regione in cui si trova. trigger_response_dag.py è il DAG con cui lavoreremo.

gcloud composer environments storage dags import \
    --environment <environment_name> \
    --location <location> \
    --source trigger_response_dag.py

Ad esempio, se l'ambiente Composer si chiamava my-composer e si trovava in us-central1, il comando sarebbe

gcloud composer environments storage dags import \
    --environment my-composer \
    --location us-central1 \
    --source trigger_response_dag.py

Scorrere il DAG

Il codice DAG in trigger_response.py ha il seguente aspetto

import datetime
import airflow
from airflow.operators import bash_operator


default_args = {
    'owner': 'Composer Example',
    'depends_on_past': False,
    'email': [''],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'start_date': datetime.datetime(2017, 1, 1),
}

with airflow.DAG(
        'composer_sample_trigger_response_dag',
        default_args=default_args,
        # Not scheduled, trigger only
        schedule_interval=None) as dag:

    # Print the dag_run's configuration, which includes information about the
    # Cloud Storage object change.
    print_gcs_info = bash_operator.BashOperator(
        task_id='print_gcs_info', bash_command='echo {{ dag_run.conf }}')

La sezione default_args contiene gli argomenti predefiniti richiesti dal modello BaseOperator in Apache Airflow. Vedrai questa sezione con questi parametri in qualsiasi DAG di Apache Airflow. Il owner è attualmente impostato su Composer Example, ma se vuoi puoi modificarlo e impostare il tuo nome. depends_on_past ci mostra che questo DAG non dipende da DAG precedenti. Le tre sezioni email, email, email_on_failure e email_on_retry, sono impostate in modo che non vengano inviate notifiche email in base allo stato di questo DAG. Il DAG verrà ritentato una sola volta, poiché retries è impostato su 1, e ciò avverrà dopo cinque minuti, in base a retry_delay. L'start_date normalmente determina quando deve essere eseguito un DAG, insieme al relativo schedule_interval (impostato in un secondo momento), ma nel caso di questo DAG non è pertinente. È impostata sul 1° gennaio 2017, ma potrebbe essere impostata su qualsiasi data passata.

default_args = {
    'owner': 'Composer Example',
    'depends_on_past': False,
    'email': [''],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'start_date': datetime.datetime(2017, 1, 1),
}

La sezione with airflow.DAG configura il DAG che verrà eseguito. Verrà eseguito con l'ID attività composer_sample_trigger_response_dag, gli argomenti predefiniti della sezione default_args e, soprattutto, con un schedule_interval di None. schedule_interval è impostato su None perché stiamo attivando questo DAG specifico con la nostra funzione Cloud Functions. Per questo motivo, il start_date in default_args non è pertinente.

Quando viene eseguito, il DAG stampa la sua configurazione, come indicato nella variabile print_gcs_info.

with airflow.DAG(
        'composer_sample_trigger_response_dag',
        default_args=default_args,
        # Not scheduled, trigger only
        schedule_interval=None) as dag:

    # Print the dag_run's configuration, which includes information about the
    # Cloud Storage object change.
    print_gcs_info = bash_operator.BashOperator(
        task_id='print_gcs_info', bash_command='echo {{ dag_run.conf }}')

6. Testare la funzione

Apri l'ambiente Composer e nella riga con il nome dell'ambiente, fai clic sul link Airflow.

Apri composer_sample_trigger_response_dag facendo clic sul nome. Al momento non sono presenti prove di esecuzioni DAG, perché non abbiamo ancora attivato l'esecuzione del DAG.Se questo DAG non è visibile o non è possibile farci clic, attendi un minuto e aggiorna la pagina.

Apri una scheda separata e carica un file nel bucket Cloud Storage che hai creato in precedenza e specificato come trigger per la tua funzione Cloud Functions. Puoi farlo tramite la console o utilizzando un comando gsutil.

Torna alla scheda con la UI di Airflow e fai clic su Visualizzazione grafico.

Fai clic sull'attività print_gcs_info, che dovrebbe essere evidenziata in verde.

Fai clic su "Visualizza log" in alto a destra nel menu.

Nei log, vedrai informazioni sul file che hai caricato nel bucket Cloud Storage.

Complimenti! Hai appena attivato un DAG Airflow utilizzando Node.js e Google Cloud Functions.

7. Esegui la pulizia

Per evitare che al tuo account GCP vengano addebitate le risorse utilizzate in questa guida rapida:

  1. (Facoltativo) Per salvare i dati, scaricali dal bucket Cloud Storage per l'ambiente Cloud Composer e dal bucket di archiviazione che hai creato per questa guida rapida.
  2. Elimina il bucket Cloud Storage per l'ambiente che hai creato
  3. Elimina l'ambiente Cloud Composer. Tieni presente che l'eliminazione dell'ambiente non comporta l'eliminazione del bucket di archiviazione per l'ambiente.
  4. (Facoltativo) Con il computing serverless, i primi 2 milioni di chiamate al mese sono senza costi e, quando riduci la scalabilità della funzione a zero, non ti vengono addebitati costi (per maggiori dettagli, consulta i prezzi). Tuttavia, se vuoi eliminare la tua funzione Cloud Functions, fai clic su "ELIMINA" in alto a destra nella pagina di riepilogo della funzione.

4fe11e1b41b32ba2.png

Puoi anche eliminare facoltativamente il progetto:

  1. Nella console di GCP, vai alla pagina Progetti.
  2. Nell'elenco dei progetti, seleziona quello da eliminare e fai clic su Elimina.
  3. Nella casella, digita l'ID progetto, quindi fai clic su Chiudi per eliminare il progetto.