Déclencher un DAG avec Node.JS et Google Cloud Functions

1. Introduction

Apache Airflow est conçu pour exécuter des DAG de façon régulière, mais vous pouvez également déclencher des DAG en réponse à des événements, tels qu'une modification apportée à un bucket Cloud Storage ou un message envoyé à Cloud Pub/Sub. Pour ce faire, les DAG Cloud Composer peuvent être déclenchés par Cloud Functions.

L'exemple de cet atelier exécute un DAG simple à chaque modification d'un bucket Cloud Storage. Ce DAG utilise BashOperator pour exécuter une commande Bash qui affiche les informations sur les modifications apportées au contenu du bucket Cloud Storage.

Avant de commencer cet atelier, nous vous recommandons de suivre les ateliers de programmation Présentation de Cloud Composer et Premiers pas avec Cloud Functions. Si vous avez créé un environnement Composer dans l'atelier de programmation "Présentation de Cloud Composer", vous pouvez l'utiliser dans cet atelier.

Ce que vous allez faire

Au cours de cet atelier de programmation, vous apprendrez à :

  1. Importez un fichier dans Google Cloud Storage, qui
  2. Déclencher une fonction Google Cloud à l'aide de l'environnement d'exécution Node.js
  3. Cette fonction exécute un DAG dans Google Cloud Composer.
  4. Cela exécute une simple commande bash qui affiche la modification apportée au bucket Google Cloud Storage.

1d3d3736624a923f.png

Ce que vous allez apprendre

  • Déclencher un DAG Apache Airflow à l'aide de Google Cloud Functions et Node.js

Ce dont vous aurez besoin

  • Compte GCP
  • Connaissances de base sur JavaScript
  • Connaissances de base de Cloud Composer/Airflow et de Cloud Functions
  • Aisance avec les commandes CLI

2. Configurer GCP

Sélectionner ou créer le projet

Sélectionnez ou créez un projet Google Cloud Platform. Si vous créez un projet, suivez les étapes décrites ici.

Notez l'ID de votre projet, car vous en aurez besoin lors des prochaines étapes.

Si vous créez un projet, l'ID du projet se trouve juste en dessous du nom du projet sur la page de création.

Si vous avez déjà créé un projet, vous trouverez son ID sur la page d'accueil de la console, dans la fiche "Informations sur le projet".

Activer les API

Activez les API Cloud Composer, Google Cloud Functions, Cloud Identity et Google Identity and Access Management (IAM).

Créer un environnement Composer

Créez un environnement Cloud Composer avec la configuration suivante :

  • Name (Nom) : my-composer-environment
  • Emplacement : celui qui est géographiquement le plus proche de vous
  • Zone : n'importe quelle zone de cette région

Vous pouvez conserver les autres configurations par défaut. Cliquez sur "Créer" en bas de l'écran.Notez le nom et l'emplacement de votre environnement Composer, car vous en aurez besoin lors des prochaines étapes.

Créer un bucket Cloud Storage

Dans votre projet, créez un bucket Cloud Storage avec la configuration suivante :

  • Nom : <your-project-id>
  • Classe de stockage par défaut : multirégionale
  • Emplacement : l'emplacement géographiquement le plus proche de la région Cloud Composer que vous utilisez
  • Modèle de contrôle des accès : définir des autorisations au niveau de l'objet et du bucket

Appuyez sur "Créer" lorsque vous êtes prêt. Veillez à noter le nom de votre bucket Cloud Storage pour les étapes suivantes.

3. Configurer Google Cloud Functions (GCF)

Pour configurer GCF, nous allons exécuter des commandes dans Google Cloud Shell.

Bien que Google Cloud puisse être utilisé à distance depuis votre ordinateur portable à l'aide de l'outil de ligne de commande gcloud, nous allons nous servir de Google Cloud Shell pour cet atelier de programmation, un environnement de ligne de commande exécuté dans le cloud.

Cette machine virtuelle basée sur Debian contient tous les outils de développement dont vous aurez besoin. Elle intègre un répertoire d'accueil persistant de 5 Go et s'exécute sur Google Cloud, ce qui améliore nettement les performances du réseau et l'authentification. Cela signifie que tout ce dont vous avez besoin pour cet atelier de programmation est un navigateur (oui, tout fonctionne sur un Chromebook).

Pour activer Google Cloud Shell, cliquez sur le bouton en haut à droite de la console de développement (le provisionnement de l'environnement et la connexion ne devraient prendre que quelques minutes) :

Accorder des autorisations de signature d'objet blob au compte de service Cloud Functions

Pour que GCF puisse s'authentifier auprès de Cloud IAP, le proxy qui protège le serveur Web Airflow, vous devez attribuer le rôle Service Account Token Creator au compte de service Appspot GCF. Pour ce faire, exécutez la commande suivante dans Cloud Shell, en remplaçant <your-project-id> par le nom de votre projet.

gcloud iam service-accounts add-iam-policy-binding \
<your-project-id>@appspot.gserviceaccount.com \
--member=serviceAccount:<your-project-id>@appspot.gserviceaccount.com \
--role=roles/iam.serviceAccountTokenCreator

Par exemple, si votre projet s'appelle my-project, votre commande sera

gcloud iam service-accounts add-iam-policy-binding \
my-project@appspot.gserviceaccount.com \
--member=serviceAccount:my-project@appspot.gserviceaccount.com \
--role=roles/iam.serviceAccountTokenCreator

Obtenir l'ID client

Pour créer un jeton permettant de s'authentifier auprès de Cloud IAP, la fonction requiert l'ID client du proxy qui protège le serveur Web Airflow. L'API Cloud Composer ne fournit pas ces informations directement. À la place, envoyez une requête non authentifiée au serveur Web Airflow et récupérez l'ID client à partir de l'URL de redirection. Pour ce faire, nous allons exécuter un fichier Python à l'aide de Cloud Shell afin de capturer l'ID client.

Téléchargez le code nécessaire depuis GitHub en exécutant la commande suivante dans Cloud Shell.

cd
git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git

Si vous avez reçu une erreur indiquant que ce répertoire existe déjà, mettez-le à jour vers la dernière version en exécutant la commande suivante :

cd python-docs-samples/
git pull origin master

Accédez au répertoire approprié en exécutant

cd python-docs-samples/composer/rest

Exécutez le code Python pour obtenir votre ID client, en remplaçant le nom de votre projet par <your-project-id>, l'emplacement de l'environnement Composer que vous avez créé précédemment par <your-composer-location> et le nom de l'environnement Composer que vous avez créé précédemment par <your-composer-environment>.

python3 get_client_id.py <your-project-id> <your-composer-location> <your-composer-environment>

Par exemple, si le nom de votre projet est my-project, votre emplacement Composer est us-central1 et le nom de votre environnement est my-composer, votre commande sera

python3 get_client_id.py my-project us-central1 my-composer

La fonction get_client_id.py effectue les opérations suivantes :

  • S'authentifie auprès de Google Cloud
  • Envoie une requête HTTP non authentifiée au serveur Web Airflow pour obtenir l'URI de redirection
  • Extrait le paramètre de requête client_id de cette redirection.
  • l'imprime pour que vous puissiez l'utiliser.

Votre ID client sera affiché sur la ligne de commande et ressemblera à ceci :

12345678987654321-abc1def3ghi5jkl7mno8pqr0.apps.googleusercontent.com

4. Créer votre fonction

Dans Cloud Shell, clonez le dépôt contenant l'exemple de code nécessaire en exécutant la commande suivante :

cd
git clone https://github.com/GoogleCloudPlatform/nodejs-docs-samples.git

Accédez au répertoire nécessaire et laissez Cloud Shell ouvert pendant que vous effectuez les prochaines étapes.

cd nodejs-docs-samples/composer/functions/composer-storage-trigger

Accédez à la page Google Cloud Functions en cliquant sur le menu de navigation, puis sur "Cloud Functions".

Cliquez sur "CRÉER UNE FONCTION" en haut de la page.

Nommez votre fonction "my-function" et laissez la mémoire à la valeur par défaut de 256 Mo.

Définissez le déclencheur sur "Cloud Storage", laissez le type d'événement sur "Finaliser/Créer", puis accédez au bucket que vous avez créé à l'étape "Créer un bucket Cloud Storage".

Laissez le code source défini sur "Éditeur intégré" et définissez l'environnement d'exécution sur "Node.js 8".

Dans Cloud Shell, exécutez la commande suivante. Les fichiers index.js et package.json s'ouvrent dans l'éditeur Cloud Shell.

cloudshell edit index.js package.json

Cliquez sur l'onglet "package.json", copiez ce code et collez-le dans la section "package.json" de l'éditeur intégré Cloud Functions.

Définissez "Fonction à exécuter" sur triggerDag.

Cliquez sur l'onglet index.js, copiez le code et collez-le dans la section index.js de l'éditeur intégré Cloud Functions.

Remplacez PROJECT_ID par l'ID de votre projet et CLIENT_ID par l'ID client que vous avez enregistré à l'étape "Obtenir l'ID client". Ne cliquez pas encore sur "Créer", car il reste encore quelques informations à saisir.

Dans Cloud Shell, exécutez la commande suivante en remplaçant <your-environment-name> par le nom de votre environnement Composer et <your-composer-region> par la région dans laquelle se trouve votre environnement Composer.

gcloud composer environments describe <your-environment-name> --location <your-composer-region>

Par exemple, si votre environnement s'appelle my-composer-environment et se trouve dans us-central1, votre commande sera

gcloud composer environments describe my-composer-environment --location us-central1

Le résultat devrait ressembler à ceci :

config:
 airflowUri: https://abc123efghi456k-tp.appspot.com
 dagGcsPrefix: gs://narnia-north1-test-codelab-jklmno-bucket/dags
 gkeCluster: projects/a-project/zones/narnia-north1-b/clusters/narnia-north1-test-codelab-jklmno-gke
 nodeConfig:
   diskSizeGb: 100
   location: projects/a-project/zones/narnia-north1-b
   machineType: projects/a-project/zones/narnia-north1-b/machineTypes/n1-standard-1
   network: projects/a-project/global/networks/default
   oauthScopes:
   - https://www.googleapis.com/auth/cloud-platform
   serviceAccount: 987665432-compute@developer.gserviceaccount.com
 nodeCount: 3
 softwareConfig:
   imageVersion: composer-1.7.0-airflow-1.10.0
   pythonVersion: '2'
createTime: '2019-05-29T09:41:27.919Z'
name: projects/a-project/locations/narnia-north1/environments/my-composer-environment
state: RUNNING
updateTime: '2019-05-29T09:56:29.969Z'
uuid: 123456-7890-9876-543-210123456

Dans ce résultat, recherchez la variable appelée airflowUri. Dans votre code index.js, remplacez WEBSERVER_ID par l'ID du serveur Web Airflow. Il s'agit de la partie de la variable airflowUri qui se termine par "-tp", par exemple abc123efghi456k-tp.

Cliquez sur le lien du menu déroulant "Plus", puis sélectionnez la région géographiquement la plus proche de vous.

Cochez "Réessayer après échec".

Cliquez sur "Créer" pour créer votre fonction Cloud.

Examiner le code

Le code que vous avez copié à partir d'index.js doit ressembler à ceci :

// [START composer_trigger]
'use strict';

const fetch = require('node-fetch');
const FormData = require('form-data');

/**
 * Triggered from a message on a Cloud Storage bucket.
 *
 * IAP authorization based on:
 * https://stackoverflow.com/questions/45787676/how-to-authenticate-google-cloud-functions-for-access-to-secure-app-engine-endpo
 * and
 * https://cloud.google.com/iap/docs/authentication-howto
 *
 * @param {!Object} data The Cloud Functions event data.
 * @returns {Promise}
 */
exports.triggerDag = async data => {
  // Fill in your Composer environment information here.

  // The project that holds your function
  const PROJECT_ID = 'your-project-id';
  // Navigate to your webserver's login page and get this from the URL
  const CLIENT_ID = 'your-iap-client-id';
  // This should be part of your webserver's URL:
  // {tenant-project-id}.appspot.com
  const WEBSERVER_ID = 'your-tenant-project-id';
  // The name of the DAG you wish to trigger
  const DAG_NAME = 'composer_sample_trigger_response_dag';

  // Other constants
  const WEBSERVER_URL = `https://${WEBSERVER_ID}.appspot.com/api/experimental/dags/${DAG_NAME}/dag_runs`;
  const USER_AGENT = 'gcf-event-trigger';
  const BODY = {conf: JSON.stringify(data)};

  // Make the request
  try {
    const iap = await authorizeIap(CLIENT_ID, PROJECT_ID, USER_AGENT);

    return makeIapPostRequest(
      WEBSERVER_URL,
      BODY,
      iap.idToken,
      USER_AGENT,
      iap.jwt
    );
  } catch (err) {
    throw new Error(err);
  }
};

/**
 * @param {string} clientId The client id associated with the Composer webserver application.
 * @param {string} projectId The id for the project containing the Cloud Function.
 * @param {string} userAgent The user agent string which will be provided with the webserver request.
 */
const authorizeIap = async (clientId, projectId, userAgent) => {
  const SERVICE_ACCOUNT = `${projectId}@appspot.gserviceaccount.com`;
  const JWT_HEADER = Buffer.from(
    JSON.stringify({alg: 'RS256', typ: 'JWT'})
  ).toString('base64');

  let jwt = '';
  let jwtClaimset = '';

  // Obtain an Oauth2 access token for the appspot service account
  const res = await fetch(
    `http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/${SERVICE_ACCOUNT}/token`,
    {
      headers: {'User-Agent': userAgent, 'Metadata-Flavor': 'Google'},
    }
  );
  const tokenResponse = await res.json();
  if (tokenResponse.error) {
    return Promise.reject(tokenResponse.error);
  }

  const accessToken = tokenResponse.access_token;
  const iat = Math.floor(new Date().getTime() / 1000);
  const claims = {
    iss: SERVICE_ACCOUNT,
    aud: 'https://www.googleapis.com/oauth2/v4/token',
    iat: iat,
    exp: iat + 60,
    target_audience: clientId,
  };
  jwtClaimset = Buffer.from(JSON.stringify(claims)).toString('base64');
  const toSign = [JWT_HEADER, jwtClaimset].join('.');

  const blob = await fetch(
    `https://iam.googleapis.com/v1/projects/${projectId}/serviceAccounts/${SERVICE_ACCOUNT}:signBlob`,
    {
      method: 'POST',
      body: JSON.stringify({
        bytesToSign: Buffer.from(toSign).toString('base64'),
      }),
      headers: {
        'User-Agent': userAgent,
        Authorization: `Bearer ${accessToken}`,
      },
    }
  );
  const blobJson = await blob.json();
  if (blobJson.error) {
    return Promise.reject(blobJson.error);
  }

  // Request service account signature on header and claimset
  const jwtSignature = blobJson.signature;
  jwt = [JWT_HEADER, jwtClaimset, jwtSignature].join('.');
  const form = new FormData();
  form.append('grant_type', 'urn:ietf:params:oauth:grant-type:jwt-bearer');
  form.append('assertion', jwt);

  const token = await fetch('https://www.googleapis.com/oauth2/v4/token', {
    method: 'POST',
    body: form,
  });
  const tokenJson = await token.json();
  if (tokenJson.error) {
    return Promise.reject(tokenJson.error);
  }

  return {
    jwt: jwt,
    idToken: tokenJson.id_token,
  };
};

/**
 * @param {string} url The url that the post request targets.
 * @param {string} body The body of the post request.
 * @param {string} idToken Bearer token used to authorize the iap request.
 * @param {string} userAgent The user agent to identify the requester.
 */
const makeIapPostRequest = async (url, body, idToken, userAgent) => {
  const res = await fetch(url, {
    method: 'POST',
    headers: {
      'User-Agent': userAgent,
      Authorization: `Bearer ${idToken}`,
    },
    body: JSON.stringify(body),
  });

  if (!res.ok) {
    const err = await res.text();
    throw new Error(err);
  }
};
// [END composer_trigger]

Voyons ce qui se passe. Il existe trois fonctions : triggerDag, authorizeIap et makeIapPostRequest.

triggerDag est la fonction qui se déclenche lorsque nous importons un élément dans le bucket Cloud Storage désigné. C'est là que nous configurons les variables importantes utilisées dans les autres requêtes, comme PROJECT_ID, CLIENT_ID, WEBSERVER_ID et DAG_NAME. Il appelle authorizeIap et makeIapPostRequest.

exports.triggerDag = async data => {
  // Fill in your Composer environment information here.

  // The project that holds your function
  const PROJECT_ID = 'your-project-id';
  // Navigate to your webserver's login page and get this from the URL
  const CLIENT_ID = 'your-iap-client-id';
  // This should be part of your webserver's URL:
  // {tenant-project-id}.appspot.com
  const WEBSERVER_ID = 'your-tenant-project-id';
  // The name of the DAG you wish to trigger
  const DAG_NAME = 'composer_sample_trigger_response_dag';

  // Other constants
  const WEBSERVER_URL = `https://${WEBSERVER_ID}.appspot.com/api/experimental/dags/${DAG_NAME}/dag_runs`;
  const USER_AGENT = 'gcf-event-trigger';
  const BODY = {conf: JSON.stringify(data)};

  // Make the request
  try {
    const iap = await authorizeIap(CLIENT_ID, PROJECT_ID, USER_AGENT);

    return makeIapPostRequest(
      WEBSERVER_URL,
      BODY,
      iap.idToken,
      USER_AGENT,
      iap.jwt
    );
  } catch (err) {
    throw new Error(err);
  }
};

authorizeIap envoie une requête au proxy qui protège le serveur Web Airflow, à l'aide d'un compte de service et en "échangeant" un jeton JWT contre un jeton d'identité qui sera utilisé pour authentifier makeIapPostRequest.

const authorizeIap = async (clientId, projectId, userAgent) => {
  const SERVICE_ACCOUNT = `${projectId}@appspot.gserviceaccount.com`;
  const JWT_HEADER = Buffer.from(
    JSON.stringify({alg: 'RS256', typ: 'JWT'})
  ).toString('base64');

  let jwt = '';
  let jwtClaimset = '';

  // Obtain an Oauth2 access token for the appspot service account
  const res = await fetch(
    `http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/${SERVICE_ACCOUNT}/token`,
    {
      headers: {'User-Agent': userAgent, 'Metadata-Flavor': 'Google'},
    }
  );
  const tokenResponse = await res.json();
  if (tokenResponse.error) {
    return Promise.reject(tokenResponse.error);
  }

  const accessToken = tokenResponse.access_token;
  const iat = Math.floor(new Date().getTime() / 1000);
  const claims = {
    iss: SERVICE_ACCOUNT,
    aud: 'https://www.googleapis.com/oauth2/v4/token',
    iat: iat,
    exp: iat + 60,
    target_audience: clientId,
  };
  jwtClaimset = Buffer.from(JSON.stringify(claims)).toString('base64');
  const toSign = [JWT_HEADER, jwtClaimset].join('.');

  const blob = await fetch(
    `https://iam.googleapis.com/v1/projects/${projectId}/serviceAccounts/${SERVICE_ACCOUNT}:signBlob`,
    {
      method: 'POST',
      body: JSON.stringify({
        bytesToSign: Buffer.from(toSign).toString('base64'),
      }),
      headers: {
        'User-Agent': userAgent,
        Authorization: `Bearer ${accessToken}`,
      },
    }
  );
  const blobJson = await blob.json();
  if (blobJson.error) {
    return Promise.reject(blobJson.error);
  }

  // Request service account signature on header and claimset
  const jwtSignature = blobJson.signature;
  jwt = [JWT_HEADER, jwtClaimset, jwtSignature].join('.');
  const form = new FormData();
  form.append('grant_type', 'urn:ietf:params:oauth:grant-type:jwt-bearer');
  form.append('assertion', jwt);

  const token = await fetch('https://www.googleapis.com/oauth2/v4/token', {
    method: 'POST',
    body: form,
  });
  const tokenJson = await token.json();
  if (tokenJson.error) {
    return Promise.reject(tokenJson.error);
  }

  return {
    jwt: jwt,
    idToken: tokenJson.id_token,
  };
};

makeIapPostRequest appelle le serveur Web Airflow pour déclencher le composer_sample_trigger_response_dag.. Le nom du DAG est intégré à l'URL du serveur Web Airflow transmise avec le paramètre url, et idToken correspond au jeton que nous avons obtenu dans la requête authorizeIap.

const makeIapPostRequest = async (url, body, idToken, userAgent) => {
  const res = await fetch(url, {
    method: 'POST',
    headers: {
      'User-Agent': userAgent,
      Authorization: `Bearer ${idToken}`,
    },
    body: JSON.stringify(body),
  });

  if (!res.ok) {
    const err = await res.text();
    throw new Error(err);
  }
};

5. Configurer votre DAG

Dans Cloud Shell, accédez au répertoire contenant les exemples de workflows. Il fait partie des exemples de documentation Python que vous avez téléchargés depuis GitHub lors de l'étape "Obtenir l'ID client".

cd
cd python-docs-samples/composer/workflows

Importer le DAG dans Composer

Importez l'exemple de DAG dans le bucket de stockage des DAG de votre environnement Composer à l'aide de la commande suivante, où <environment_name> est le nom de votre environnement Composer et <location> est le nom de la région dans laquelle il se trouve. trigger_response_dag.py est le DAG avec lequel nous allons travailler.

gcloud composer environments storage dags import \
    --environment <environment_name> \
    --location <location> \
    --source trigger_response_dag.py

Par exemple, si votre environnement Composer s'appelle my-composer et se trouve dans us-central1, votre commande sera

gcloud composer environments storage dags import \
    --environment my-composer \
    --location us-central1 \
    --source trigger_response_dag.py

Parcourir le DAG

Le code DAG dans trigger_response.py se présente comme suit :

import datetime
import airflow
from airflow.operators import bash_operator


default_args = {
    'owner': 'Composer Example',
    'depends_on_past': False,
    'email': [''],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'start_date': datetime.datetime(2017, 1, 1),
}

with airflow.DAG(
        'composer_sample_trigger_response_dag',
        default_args=default_args,
        # Not scheduled, trigger only
        schedule_interval=None) as dag:

    # Print the dag_run's configuration, which includes information about the
    # Cloud Storage object change.
    print_gcs_info = bash_operator.BashOperator(
        task_id='print_gcs_info', bash_command='echo {{ dag_run.conf }}')

La section default_args contient les arguments par défaut requis par le modèle BaseOperator dans Apache Airflow. Vous verrez cette section avec ces paramètres dans n'importe quel DAG Apache Airflow. Le owner est actuellement défini sur Composer Example, mais vous pouvez le remplacer par votre nom si vous le souhaitez. depends_on_past nous montre que ce DAG ne dépend d'aucun DAG précédent. Les trois sections d'e-mails (email, email_on_failure et email_on_retry) sont configurées de sorte qu'aucune notification par e-mail ne soit envoyée en fonction de l'état de ce DAG. Le DAG n'effectuera qu'une seule nouvelle tentative, car retries est défini sur 1, et ce, au bout de cinq minutes, conformément à retry_delay. Le start_date dicte normalement le moment où un DAG doit s'exécuter, en même temps que son schedule_interval (défini ultérieurement), mais dans le cas de ce DAG, il n'est pas pertinent. Elle est définie sur le 1er janvier 2017, mais peut être définie sur n'importe quelle date antérieure.

default_args = {
    'owner': 'Composer Example',
    'depends_on_past': False,
    'email': [''],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'start_date': datetime.datetime(2017, 1, 1),
}

La section with airflow.DAG configure le DAG qui sera exécuté. Il sera exécuté avec l'ID de tâche composer_sample_trigger_response_dag, les arguments par défaut de la section default_args et, surtout, avec un schedule_interval de None. La valeur de schedule_interval est définie sur None, car nous déclenchons ce DAG spécifique avec notre fonction Cloud. C'est pourquoi le start_date dans default_args n'est pas pertinent.

Lors de son exécution, le DAG affiche sa configuration, comme indiqué dans la variable print_gcs_info.

with airflow.DAG(
        'composer_sample_trigger_response_dag',
        default_args=default_args,
        # Not scheduled, trigger only
        schedule_interval=None) as dag:

    # Print the dag_run's configuration, which includes information about the
    # Cloud Storage object change.
    print_gcs_info = bash_operator.BashOperator(
        task_id='print_gcs_info', bash_command='echo {{ dag_run.conf }}')

6. Tester votre fonction

Ouvrez votre environnement Composer et, sur la ligne portant le nom de votre environnement, cliquez sur le lien Airflow.

Ouvrez le composer_sample_trigger_response_dag en cliquant sur son nom. Pour le moment, aucune exécution DAG ne sera visible, car nous n'avons pas encore déclenché l'exécution du DAG.Si ce DAG n'est pas visible ou cliquable, patientez une minute et actualisez la page.

Ouvrez un autre onglet et importez un fichier dans le bucket Cloud Storage que vous avez créé précédemment et spécifié comme déclencheur pour votre fonction Cloud. Pour ce faire, utilisez la console ou une commande gsutil.

Revenez à l'onglet de votre interface utilisateur Airflow et cliquez sur "Graph View" (Vue graphique).

Cliquez sur la tâche print_gcs_info, qui devrait être entourée en vert.

Cliquez sur "Afficher le journal" en haut à droite du menu.

Dans les journaux, vous trouverez des informations sur le fichier que vous avez importé dans votre bucket Cloud Storage.

Félicitations ! Vous venez de déclencher un DAG Airflow à l'aide de Node.js et de Google Cloud Functions.

7. Nettoyage

Afin d'éviter que des frais ne soient facturés sur votre compte GCP pour les ressources utilisées dans ce démarrage rapide, procédez comme suit :

  1. (Facultatif) Pour enregistrer vos données, téléchargez-les depuis le bucket Cloud Storage de l'environnement Cloud Composer et le bucket de stockage que vous avez créé pour ce démarrage rapide.
  2. Supprimez le bucket Cloud Storage pour l'environnement que vous avez créé.
  3. Supprimez l'environnement Cloud Composer. Notez que la suppression de votre environnement n'entraîne pas celle de son bucket de stockage.
  4. (Facultatif) Avec le calcul sans serveur, les deux premiers millions d'invocations par mois sont sans frais. De plus, lorsque vous mettez votre fonction à zéro, vous n'êtes pas facturé (pour en savoir plus, consultez les tarifs). Toutefois, si vous souhaitez supprimer votre fonction Cloud, cliquez sur "SUPPRIMER" en haut à droite de la page de présentation de votre fonction.

4fe11e1b41b32ba2.png

Vous pouvez également supprimer le projet (facultatif) :

  1. Dans la console GCP, accédez à la page Projets.
  2. Dans la liste des projets, sélectionnez celui que vous souhaitez supprimer, puis cliquez sur Supprimer.
  3. Dans la boîte de dialogue, saisissez l'ID du projet, puis cliquez sur Arrêter pour supprimer le projet.