1. Einführung
Apache Airflow ist für das regelmäßige Ausführen von DAGs nach Plan konzipiert. Sie können DAGs aber auch als Reaktion auf Ereignisse auslösen lassen, z. B. bei einer Änderung in einem Cloud Storage-Bucket oder bei einer per Push an Cloud Pub/Sub übertragenen Nachricht. Zu diesem Zweck können Cloud Composer-DAGs über Cloud Functions ausgelöst werden.
Im vorliegenden Beispiel wird bei jeder Änderung an einem Cloud Storage-Bucket ein einfacher DAG ausgeführt. In diesem DAG wird mit dem BashOperator ein Bash-Befehl ausgeführt, der die Änderungsdetails zu den Inhalten ausgibt, die in den Cloud Storage-Bucket hochgeladen wurden.
Bevor Sie mit diesem Lab beginnen, sollten Sie die Codelabs Einführung in Cloud Composer und Erste Schritte mit Cloud Functions durcharbeiten. Wenn Sie im Codelab „Einführung in Cloud Composer“ eine Composer-Umgebung erstellt haben, können Sie diese Umgebung in diesem Lab verwenden.
Umfang
In diesem Codelab werden Sie:
- Laden Sie eine Datei in Google Cloud Storage hoch.
- Google Cloud Functions-Funktion mit der Node.JS-Laufzeit auslösen
- Mit dieser Funktion wird ein DAG in Google Cloud Composer ausgeführt.
- Dadurch wird ein einfacher Bash-Befehl ausgeführt, der die Änderung am Google Cloud Storage-Bucket ausgibt.

Lerninhalte
- Apache Airflow-DAG mit Google Cloud Functions und Node.js auslösen
Voraussetzungen
- GCP-Konto
- Grundlegendes Verständnis von JavaScript
- Grundkenntnisse von Cloud Composer/Airflow und Cloud Functions
- Vertrautheit mit CLI-Befehlen
2. GCP einrichten
Projekt auswählen oder erstellen
Wählen Sie ein Google Cloud Platform-Projekt aus oder erstellen Sie eines. Wenn Sie ein neues Projekt erstellen, folgen Sie der Anleitung.
Notieren Sie sich Ihre Projekt-ID, da Sie sie in späteren Schritten benötigen.
Wenn Sie ein neues Projekt erstellen, finden Sie die Projekt-ID direkt unter dem Projektnamen auf der Erstellungsseite. |
|
Wenn Sie bereits ein Projekt erstellt haben, finden Sie die ID auf der Console-Startseite auf der Karte „Projektinformationen“. |
|
APIs aktivieren
|
Composer-Umgebung erstellen
Erstellen Sie eine Cloud Composer-Umgebung mit der folgenden Konfiguration:
Alle anderen Konfigurationen können bei den Standardeinstellungen belassen werden. Klicken Sie unten auf „Erstellen“. Notieren Sie sich den Namen und den Standort Ihrer Composer-Umgebung, da Sie sie in späteren Schritten benötigen. |
|
Cloud Storage-Bucket erstellen
Erstellen Sie in Ihrem Projekt einen Cloud Storage-Bucket mit der folgenden Konfiguration:
Klicken Sie auf „Erstellen“, wenn Sie bereit sind. Notieren Sie sich den Namen Ihres Cloud Storage-Buckets für spätere Schritte. |
|
3. Google Cloud Functions (GCF) einrichten
Zum Einrichten von GCF führen wir Befehle in Google Cloud Shell aus.
Während Sie Google Cloud von Ihrem Laptop aus per Fernzugriff mit dem gcloud-Befehlszeilentool nutzen können, verwenden wir in diesem Codelab Google Cloud Shell, eine Befehlszeilenumgebung, die in der Cloud ausgeführt wird.
Diese Debian-basierte virtuelle Maschine verfügt über alle Entwicklungstools, die Sie benötigen. Sie bietet ein Basisverzeichnis mit 5 GB nichtflüchtigem Speicher und läuft auf der Google Cloud, wodurch Netzwerkleistung und Authentifizierung deutlich verbessert werden. Für dieses Codelab benötigen Sie also nur einen Browser (es funktioniert auch auf einem Chromebook).
Klicken Sie in der Entwicklerkonsole rechts oben auf die Schaltfläche, um Google Cloud Shell zu aktivieren. Die Bereitstellung und Verbindung mit der Umgebung sollte nur wenige Augenblicke dauern: |
|
Dem Cloud Functions-Dienstkonto Berechtigungen zur Blobsignatur gewähren
Damit GCF sich bei Cloud IAP, dem Proxy, der den Airflow-Webserver schützt, authentifizieren kann, müssen Sie dem Appspot-Dienstkonto von GCF die Rolle Service Account Token Creator zuweisen. Führen Sie dazu den folgenden Befehl in Cloud Shell aus und ersetzen Sie <your-project-id> durch den Namen Ihres Projekts.
gcloud iam service-accounts add-iam-policy-binding \ <your-project-id>@appspot.gserviceaccount.com \ --member=serviceAccount:<your-project-id>@appspot.gserviceaccount.com \ --role=roles/iam.serviceAccountTokenCreator
Wenn Ihr Projekt beispielsweise my-project heißt, lautet der Befehl
gcloud iam service-accounts add-iam-policy-binding \ my-project@appspot.gserviceaccount.com \ --member=serviceAccount:my-project@appspot.gserviceaccount.com \ --role=roles/iam.serviceAccountTokenCreator
Client-ID abrufen
Zum Erstellen eines Tokens für die Authentifizierung bei Cloud IAP benötigt die Funktion die Client-ID des Proxys, der den Airflow-Webserver schützt. Die Cloud Composer API stellt diese Informationen nicht direkt zur Verfügung. Stellen Sie stattdessen eine nicht authentifizierte Anfrage an den Airflow-Webserver und erfassen Sie die Client-ID über die Weiterleitungs-URL. Dazu führen wir eine Python-Datei mit Cloud Shell aus, um die Client-ID zu erfassen.
Laden Sie den erforderlichen Code von GitHub herunter, indem Sie den folgenden Befehl in Ihrer Cloud Shell ausführen:
cd git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git
Wenn Sie eine Fehlermeldung erhalten, weil dieses Verzeichnis bereits vorhanden ist, aktualisieren Sie es mit dem folgenden Befehl auf die neueste Version.
cd python-docs-samples/ git pull origin master
Wechseln Sie in das entsprechende Verzeichnis, indem Sie
cd python-docs-samples/composer/rest
Führen Sie den Python-Code aus, um Ihre Client-ID abzurufen. Ersetzen Sie dabei <your-project-id> durch den Namen Ihres Projekts, <your-composer-location> durch den Speicherort der zuvor erstellten Composer-Umgebung und <your-composer-environment> durch den Namen der zuvor erstellten Composer-Umgebung.
python3 get_client_id.py <your-project-id> <your-composer-location> <your-composer-environment>
Wenn Ihr Projektname beispielsweise my-project, Ihr Composer-Standort us-central1 und Ihr Umgebungsname my-composer ist, lautet der Befehl
python3 get_client_id.py my-project us-central1 my-composer
get_client_id.py führt Folgendes aus:
- Authentifizierung bei Google Cloud
- Stellt eine nicht authentifizierte HTTP-Anfrage an den Airflow-Webserver, um den Weiterleitungs-URI abzurufen
- Extrahiert den
client_id-Abfrageparameter aus dieser Weiterleitung. - Es wird für Sie ausgedruckt.
Ihre Client-ID wird in der Befehlszeile ausgegeben und sieht in etwa so aus:
12345678987654321-abc1def3ghi5jkl7mno8pqr0.apps.googleusercontent.com
4. Funktion erstellen
Klonen Sie in Cloud Shell das Repository mit dem erforderlichen Beispielcode, indem Sie
cd git clone https://github.com/GoogleCloudPlatform/nodejs-docs-samples.git
Wechseln Sie in das erforderliche Verzeichnis und lassen Sie Cloud Shell für die nächsten Schritte geöffnet.
cd nodejs-docs-samples/composer/functions/composer-storage-trigger
Rufen Sie die Google Cloud Functions-Seite auf, indem Sie auf das Navigationsmenü und dann auf „Cloud Functions“ klicken. |
|
Klicken Sie oben auf der Seite auf „FUNKTION ERSTELLEN“. |
|
Geben Sie Ihrer Funktion den Namen „my-function“ und belassen Sie den Speicher bei der Standardeinstellung von 256 MB. |
|
Legen Sie den Trigger auf „Cloud Storage“ fest, lassen Sie den Ereignistyp auf „Finalize/Create“ (Abschließen/Erstellen) und suchen Sie nach dem Bucket, den Sie im Schritt „Cloud Storage-Bucket erstellen“ erstellt haben. |
|
Lassen Sie den Quellcode auf „Inline-Editor“ eingestellt und legen Sie die Laufzeit auf „Node.js 8“ fest. |
|
Führen Sie in Cloud Shell den folgenden Befehl aus. Dadurch werden die Dateien „index.js“ und „package.json“ im Cloud Shell-Editor geöffnet.
cloudshell edit index.js package.json
Klicken Sie auf den Tab „package.json“, kopieren Sie den Code und fügen Sie ihn in den Abschnitt „package.json“ des Inline-Editors von Cloud Functions ein. |
|
Legen Sie „Auszuführende Funktion“ auf „triggerDag“ fest. |
|
Klicken Sie auf den Tab „index.js“, kopieren Sie den Code und fügen Sie ihn in den Abschnitt „index.js“ des Inline-Editors für Cloud Functions ein. |
|
Ersetzen Sie |
|
Führen Sie in Ihrer Cloud Shell den folgenden Befehl aus. Ersetzen Sie dabei <your-environment-name> durch den Namen Ihrer Composer-Umgebung und <your-composer-region> durch die Region, in der sich Ihre Composer-Umgebung befindet.
gcloud composer environments describe <your-environment-name> --location <your-composer-region>
Wenn Ihre Umgebung beispielsweise den Namen my-composer-environment hat und sich in us-central1 befindet, lautet der Befehl
gcloud composer environments describe my-composer-environment --location us-central1
Die Ausgabe sollte in etwa so aussehen:
config:
airflowUri: https://abc123efghi456k-tp.appspot.com
dagGcsPrefix: gs://narnia-north1-test-codelab-jklmno-bucket/dags
gkeCluster: projects/a-project/zones/narnia-north1-b/clusters/narnia-north1-test-codelab-jklmno-gke
nodeConfig:
diskSizeGb: 100
location: projects/a-project/zones/narnia-north1-b
machineType: projects/a-project/zones/narnia-north1-b/machineTypes/n1-standard-1
network: projects/a-project/global/networks/default
oauthScopes:
- https://www.googleapis.com/auth/cloud-platform
serviceAccount: 987665432-compute@developer.gserviceaccount.com
nodeCount: 3
softwareConfig:
imageVersion: composer-1.7.0-airflow-1.10.0
pythonVersion: '2'
createTime: '2019-05-29T09:41:27.919Z'
name: projects/a-project/locations/narnia-north1/environments/my-composer-environment
state: RUNNING
updateTime: '2019-05-29T09:56:29.969Z'
uuid: 123456-7890-9876-543-210123456
Suchen Sie in dieser Ausgabe nach der Variablen |
|
Klicken Sie auf den Drop-down-Link „Mehr“ und wählen Sie dann die Region aus, die Ihnen geografisch am nächsten liegt. |
|
„Bei Fehler noch einmal versuchen“ aktivieren |
|
Klicken Sie auf „Erstellen“, um die Cloud Functions-Funktion zu erstellen. |
|
Code schrittweise durchgehen
Der Code, den Sie aus index.js kopiert haben, sieht in etwa so aus:
// [START composer_trigger]
'use strict';
const fetch = require('node-fetch');
const FormData = require('form-data');
/**
* Triggered from a message on a Cloud Storage bucket.
*
* IAP authorization based on:
* https://stackoverflow.com/questions/45787676/how-to-authenticate-google-cloud-functions-for-access-to-secure-app-engine-endpo
* and
* https://cloud.google.com/iap/docs/authentication-howto
*
* @param {!Object} data The Cloud Functions event data.
* @returns {Promise}
*/
exports.triggerDag = async data => {
// Fill in your Composer environment information here.
// The project that holds your function
const PROJECT_ID = 'your-project-id';
// Navigate to your webserver's login page and get this from the URL
const CLIENT_ID = 'your-iap-client-id';
// This should be part of your webserver's URL:
// {tenant-project-id}.appspot.com
const WEBSERVER_ID = 'your-tenant-project-id';
// The name of the DAG you wish to trigger
const DAG_NAME = 'composer_sample_trigger_response_dag';
// Other constants
const WEBSERVER_URL = `https://${WEBSERVER_ID}.appspot.com/api/experimental/dags/${DAG_NAME}/dag_runs`;
const USER_AGENT = 'gcf-event-trigger';
const BODY = {conf: JSON.stringify(data)};
// Make the request
try {
const iap = await authorizeIap(CLIENT_ID, PROJECT_ID, USER_AGENT);
return makeIapPostRequest(
WEBSERVER_URL,
BODY,
iap.idToken,
USER_AGENT,
iap.jwt
);
} catch (err) {
throw new Error(err);
}
};
/**
* @param {string} clientId The client id associated with the Composer webserver application.
* @param {string} projectId The id for the project containing the Cloud Function.
* @param {string} userAgent The user agent string which will be provided with the webserver request.
*/
const authorizeIap = async (clientId, projectId, userAgent) => {
const SERVICE_ACCOUNT = `${projectId}@appspot.gserviceaccount.com`;
const JWT_HEADER = Buffer.from(
JSON.stringify({alg: 'RS256', typ: 'JWT'})
).toString('base64');
let jwt = '';
let jwtClaimset = '';
// Obtain an Oauth2 access token for the appspot service account
const res = await fetch(
`http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/${SERVICE_ACCOUNT}/token`,
{
headers: {'User-Agent': userAgent, 'Metadata-Flavor': 'Google'},
}
);
const tokenResponse = await res.json();
if (tokenResponse.error) {
return Promise.reject(tokenResponse.error);
}
const accessToken = tokenResponse.access_token;
const iat = Math.floor(new Date().getTime() / 1000);
const claims = {
iss: SERVICE_ACCOUNT,
aud: 'https://www.googleapis.com/oauth2/v4/token',
iat: iat,
exp: iat + 60,
target_audience: clientId,
};
jwtClaimset = Buffer.from(JSON.stringify(claims)).toString('base64');
const toSign = [JWT_HEADER, jwtClaimset].join('.');
const blob = await fetch(
`https://iam.googleapis.com/v1/projects/${projectId}/serviceAccounts/${SERVICE_ACCOUNT}:signBlob`,
{
method: 'POST',
body: JSON.stringify({
bytesToSign: Buffer.from(toSign).toString('base64'),
}),
headers: {
'User-Agent': userAgent,
Authorization: `Bearer ${accessToken}`,
},
}
);
const blobJson = await blob.json();
if (blobJson.error) {
return Promise.reject(blobJson.error);
}
// Request service account signature on header and claimset
const jwtSignature = blobJson.signature;
jwt = [JWT_HEADER, jwtClaimset, jwtSignature].join('.');
const form = new FormData();
form.append('grant_type', 'urn:ietf:params:oauth:grant-type:jwt-bearer');
form.append('assertion', jwt);
const token = await fetch('https://www.googleapis.com/oauth2/v4/token', {
method: 'POST',
body: form,
});
const tokenJson = await token.json();
if (tokenJson.error) {
return Promise.reject(tokenJson.error);
}
return {
jwt: jwt,
idToken: tokenJson.id_token,
};
};
/**
* @param {string} url The url that the post request targets.
* @param {string} body The body of the post request.
* @param {string} idToken Bearer token used to authorize the iap request.
* @param {string} userAgent The user agent to identify the requester.
*/
const makeIapPostRequest = async (url, body, idToken, userAgent) => {
const res = await fetch(url, {
method: 'POST',
headers: {
'User-Agent': userAgent,
Authorization: `Bearer ${idToken}`,
},
body: JSON.stringify(body),
});
if (!res.ok) {
const err = await res.text();
throw new Error(err);
}
};
// [END composer_trigger]
Sehen wir uns an, was passiert. Es gibt drei Funktionen: triggerDag, authorizeIap und makeIapPostRequest.
triggerDag ist die Funktion, die ausgelöst wird, wenn wir etwas in den dafür vorgesehenen Cloud Storage-Bucket hochladen. Hier konfigurieren wir wichtige Variablen, die in den anderen Anfragen verwendet werden, z. B. PROJECT_ID, CLIENT_ID, WEBSERVER_ID und DAG_NAME. Dabei werden authorizeIap und makeIapPostRequest aufgerufen.
exports.triggerDag = async data => {
// Fill in your Composer environment information here.
// The project that holds your function
const PROJECT_ID = 'your-project-id';
// Navigate to your webserver's login page and get this from the URL
const CLIENT_ID = 'your-iap-client-id';
// This should be part of your webserver's URL:
// {tenant-project-id}.appspot.com
const WEBSERVER_ID = 'your-tenant-project-id';
// The name of the DAG you wish to trigger
const DAG_NAME = 'composer_sample_trigger_response_dag';
// Other constants
const WEBSERVER_URL = `https://${WEBSERVER_ID}.appspot.com/api/experimental/dags/${DAG_NAME}/dag_runs`;
const USER_AGENT = 'gcf-event-trigger';
const BODY = {conf: JSON.stringify(data)};
// Make the request
try {
const iap = await authorizeIap(CLIENT_ID, PROJECT_ID, USER_AGENT);
return makeIapPostRequest(
WEBSERVER_URL,
BODY,
iap.idToken,
USER_AGENT,
iap.jwt
);
} catch (err) {
throw new Error(err);
}
};
authorizeIap sendet eine Anfrage an den Proxy, der den Airflow-Webserver schützt. Dabei wird ein Dienstkonto verwendet und ein JWT gegen ein ID-Token „eingetauscht“, mit dem die makeIapPostRequest authentifiziert wird.
const authorizeIap = async (clientId, projectId, userAgent) => {
const SERVICE_ACCOUNT = `${projectId}@appspot.gserviceaccount.com`;
const JWT_HEADER = Buffer.from(
JSON.stringify({alg: 'RS256', typ: 'JWT'})
).toString('base64');
let jwt = '';
let jwtClaimset = '';
// Obtain an Oauth2 access token for the appspot service account
const res = await fetch(
`http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/${SERVICE_ACCOUNT}/token`,
{
headers: {'User-Agent': userAgent, 'Metadata-Flavor': 'Google'},
}
);
const tokenResponse = await res.json();
if (tokenResponse.error) {
return Promise.reject(tokenResponse.error);
}
const accessToken = tokenResponse.access_token;
const iat = Math.floor(new Date().getTime() / 1000);
const claims = {
iss: SERVICE_ACCOUNT,
aud: 'https://www.googleapis.com/oauth2/v4/token',
iat: iat,
exp: iat + 60,
target_audience: clientId,
};
jwtClaimset = Buffer.from(JSON.stringify(claims)).toString('base64');
const toSign = [JWT_HEADER, jwtClaimset].join('.');
const blob = await fetch(
`https://iam.googleapis.com/v1/projects/${projectId}/serviceAccounts/${SERVICE_ACCOUNT}:signBlob`,
{
method: 'POST',
body: JSON.stringify({
bytesToSign: Buffer.from(toSign).toString('base64'),
}),
headers: {
'User-Agent': userAgent,
Authorization: `Bearer ${accessToken}`,
},
}
);
const blobJson = await blob.json();
if (blobJson.error) {
return Promise.reject(blobJson.error);
}
// Request service account signature on header and claimset
const jwtSignature = blobJson.signature;
jwt = [JWT_HEADER, jwtClaimset, jwtSignature].join('.');
const form = new FormData();
form.append('grant_type', 'urn:ietf:params:oauth:grant-type:jwt-bearer');
form.append('assertion', jwt);
const token = await fetch('https://www.googleapis.com/oauth2/v4/token', {
method: 'POST',
body: form,
});
const tokenJson = await token.json();
if (tokenJson.error) {
return Promise.reject(tokenJson.error);
}
return {
jwt: jwt,
idToken: tokenJson.id_token,
};
};
makeIapPostRequest ruft den Airflow-Webserver auf, um den composer_sample_trigger_response_dag. auszulösen. Der DAG-Name ist in die Airflow-Webserver-URL eingebettet, die mit dem Parameter url übergeben wird. idToken ist das Token, das wir in der authorizeIap-Anfrage erhalten haben.
const makeIapPostRequest = async (url, body, idToken, userAgent) => {
const res = await fetch(url, {
method: 'POST',
headers: {
'User-Agent': userAgent,
Authorization: `Bearer ${idToken}`,
},
body: JSON.stringify(body),
});
if (!res.ok) {
const err = await res.text();
throw new Error(err);
}
};
5. DAG einrichten
Wechseln Sie in Cloud Shell in das Verzeichnis mit den Beispielworkflows. Sie ist Teil der python-docs-samples, die Sie im Schritt „Client-ID abrufen“ von GitHub heruntergeladen haben.
cd cd python-docs-samples/composer/workflows
DAG in Composer hochladen
Laden Sie den Beispiel-DAG mit dem folgenden Befehl in den Cloud Storage-Bucket für DAGs Ihrer Composer-Umgebung hoch. Dabei ist <environment_name> der Name Ihrer Composer-Umgebung und <location> der Name der Region, in der sich die Umgebung befindet. trigger_response_dag.py ist der DAG, mit dem wir arbeiten werden.
gcloud composer environments storage dags import \
--environment <environment_name> \
--location <location> \
--source trigger_response_dag.py
Wenn Ihre Composer-Umgebung beispielsweise den Namen my-composer hat und sich in us-central1 befindet, lautet der Befehl
gcloud composer environments storage dags import \
--environment my-composer \
--location us-central1 \
--source trigger_response_dag.py
DAG durchlaufen
Der DAG-Code in trigger_response.py sieht so aus:
import datetime
import airflow
from airflow.operators import bash_operator
default_args = {
'owner': 'Composer Example',
'depends_on_past': False,
'email': [''],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': datetime.timedelta(minutes=5),
'start_date': datetime.datetime(2017, 1, 1),
}
with airflow.DAG(
'composer_sample_trigger_response_dag',
default_args=default_args,
# Not scheduled, trigger only
schedule_interval=None) as dag:
# Print the dag_run's configuration, which includes information about the
# Cloud Storage object change.
print_gcs_info = bash_operator.BashOperator(
task_id='print_gcs_info', bash_command='echo {{ dag_run.conf }}')
Der Abschnitt default_args enthält die Standardargumente, die für das BaseOperator-Modell in Apache Airflow erforderlich sind. Dieser Abschnitt mit diesen Parametern ist in jedem Apache Airflow-DAG enthalten. Derzeit ist owner auf Composer Example festgelegt. Sie können das aber bei Bedarf ändern. depends_on_past zeigt, dass diese DAG nicht von vorherigen DAGs abhängig ist. Die drei E‑Mail-Abschnitte email, email_on_failure und email_on_retry sind so eingestellt, dass keine E‑Mail-Benachrichtigungen basierend auf dem Status dieses DAG eingehen. Die DAG wird nur einmal wiederholt, da retries auf 1 gesetzt ist. Die Wiederholung erfolgt nach fünf Minuten, wie in retry_delay festgelegt. Das start_date gibt normalerweise in Verbindung mit dem schedule_interval (wird später festgelegt) an, wann ein DAG ausgeführt werden soll. Im Fall dieses DAG ist es jedoch nicht relevant. Das Datum ist auf den 1. Januar 2017 festgelegt, könnte aber auch ein anderes Datum in der Vergangenheit sein.
default_args = {
'owner': 'Composer Example',
'depends_on_past': False,
'email': [''],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': datetime.timedelta(minutes=5),
'start_date': datetime.datetime(2017, 1, 1),
}
Im Abschnitt with airflow.DAG wird der DAG konfiguriert, der ausgeführt wird. Er wird mit der Aufgaben-ID composer_sample_trigger_response_dag, den Standardargumenten aus dem Abschnitt default_args und vor allem mit einem schedule_interval von None ausgeführt. schedule_interval ist auf None festgelegt, da wir diesen bestimmten DAG mit unserer Cloud-Funktion auslösen. Daher ist die start_date in default_args nicht relevant.
Bei der Ausführung gibt der DAG seine Konfiguration gemäß der Variablen print_gcs_info aus.
with airflow.DAG(
'composer_sample_trigger_response_dag',
default_args=default_args,
# Not scheduled, trigger only
schedule_interval=None) as dag:
# Print the dag_run's configuration, which includes information about the
# Cloud Storage object change.
print_gcs_info = bash_operator.BashOperator(
task_id='print_gcs_info', bash_command='echo {{ dag_run.conf }}')
6. Funktion testen
Öffnen Sie Ihre Composer-Umgebung und klicken Sie in der Zeile mit dem Namen Ihrer Umgebung auf den Airflow-Link. |
|
Öffnen Sie |
|
Öffnen Sie einen separaten Tab und laden Sie eine beliebige Datei in den Cloud Storage-Bucket hoch, den Sie zuvor erstellt und als Trigger für Ihre Cloud-Funktion angegeben haben. Sie können dies über die Console oder mit einem gsutil-Befehl tun. |
|
Kehren Sie zum Tab mit der Airflow-Benutzeroberfläche zurück und klicken Sie auf „Graph View“ (Grafikansicht). |
|
Klicken Sie auf die Aufgabe |
|
Klicken Sie oben rechts im Menü auf „View Log“ (Protokoll ansehen). |
|
In den Logs finden Sie Informationen zur Datei, die Sie in Ihren Cloud Storage-Bucket hochgeladen haben. |
|
Glückwunsch! Sie haben gerade einen Airflow-DAG mit Node.js und Google Cloud Functions ausgelöst.
7. Bereinigen
So vermeiden Sie, dass Ihrem GCP-Konto die in dieser Kurzanleitung verwendeten Ressourcen in Rechnung gestellt werden:
- (Optional) Wenn Sie Ihre Daten speichern möchten, laden Sie die Daten aus dem Cloud Storage-Bucket für die Cloud Composer-Umgebung und dem Storage-Bucket herunter, den Sie für diese Kurzanleitung erstellt haben.
- Löschen Sie den Cloud Storage-Bucket für die Umgebung, die Sie erstellt haben.
- Löschen Sie die Cloud Composer-Umgebung. Hinweis: Wenn Sie die Umgebung löschen, bleibt der Storage-Bucket für die Umgebung erhalten.
- Optional: Bei Serverless Computing sind die ersten 2 Millionen Aufrufe pro Monat kostenlos. Wenn Sie Ihre Funktion auf null skalieren, fallen keine Gebühren an. Weitere Informationen finden Sie unter Preise. Wenn Sie Ihre Cloud-Funktion löschen möchten, klicken Sie oben rechts auf der Übersichtsseite für Ihre Funktion auf „LÖSCHEN“.

Optional können Sie das Projekt auch löschen:
- Rufen Sie in der GCP Console die Seite Projekte auf.
- Wählen Sie in der Projektliste das Projekt aus, das Sie löschen möchten, und klicken Sie auf Löschen.
- Geben Sie im Feld die Projekt-ID ein und klicken Sie auf Beenden, um das Projekt zu löschen.


























