۱. مقدمه
آپاچی ایرفلو طوری طراحی شده است که DAGها را طبق یک برنامه منظم اجرا کند، اما شما میتوانید DAGها را در پاسخ به رویدادهایی مانند تغییر در یک باکت ذخیرهسازی ابری یا پیامی که به Cloud Pub/Sub ارسال میشود، نیز فعال کنید. برای انجام این کار، DAGهای Cloud Composer میتوانند توسط Cloud Functions فعال شوند.
مثال این آزمایش هر بار که تغییری در یک مخزن ذخیرهسازی ابری رخ میدهد، یک DAG ساده اجرا میکند. این DAG از BashOperator برای اجرای یک دستور bash استفاده میکند که اطلاعات تغییر مربوط به آنچه در مخزن ذخیرهسازی ابری آپلود شده است را چاپ میکند.
قبل از شروع این آزمایش، توصیه میشود که آزمایشگاههای کد «مقدمهای بر Cloud Composer» و «شروع به کار با توابع ابری» را تکمیل کنید. اگر در آزمایشگاه کد «مقدمهای بر Cloud Composer» یک محیط Composer ایجاد کنید، میتوانید از آن محیط در این آزمایش استفاده کنید.
آنچه خواهید ساخت
در این آزمایشگاه کد، شما:
- یک فایل را در فضای ذخیرهسازی ابری گوگل آپلود کنید، که ...
- فعال کردن یک تابع Google Cloud با استفاده از زمان اجرای Node.JS
- این تابع یک DAG را در Google Cloud Composer اجرا میکند.
- این یک دستور bash ساده را اجرا میکند که تغییر را در مخزن ذخیرهسازی ابری گوگل چاپ میکند.

آنچه یاد خواهید گرفت
- نحوهی راهاندازی DAG آپاچی ایرفلو با استفاده از توابع گوگل کلود + نود جی اس
آنچه نیاز دارید
- حساب GCP
- درک اولیه از جاوا اسکریپت
- دانش پایه در مورد Cloud Composer/Airflow و توابع ابری
- راحتی در استفاده از دستورات CLI
۲. راهاندازی GCP
انتخاب یا ایجاد پروژه
یک پروژه پلتفرم ابری گوگل (Google Cloud Platform Project) انتخاب یا ایجاد کنید. اگر در حال ایجاد یک پروژه جدید هستید، مراحل موجود در اینجا را دنبال کنید.
شناسه پروژه خود را یادداشت کنید، که در مراحل بعدی از آن استفاده خواهید کرد.
اگر در حال ایجاد یک پروژه جدید هستید، شناسه پروژه درست زیر نام پروژه در صفحه ایجاد پروژه قرار دارد. |
|
اگر قبلاً پروژهای ایجاد کردهاید، میتوانید شناسه آن را در صفحه اصلی کنسول در کارت اطلاعات پروژه پیدا کنید. |
|
فعال کردن APIها
|
ایجاد محیط کامپوزر
یک محیط Cloud Composer با پیکربندی زیر ایجاد کنید :
تمام تنظیمات دیگر میتوانند به صورت پیشفرض باقی بمانند. روی «ایجاد» در پایین کلیک کنید. نام و مکان محیط کامپوزر خود را یادداشت کنید - در مراحل بعدی به آنها نیاز خواهید داشت. |
|
ایجاد سطل ذخیرهسازی ابری
در پروژه خود، یک مخزن ذخیرهسازی ابری با پیکربندی زیر ایجاد کنید :
وقتی آماده بودید، روی «ایجاد» کلیک کنید. حتماً نام مخزن ذخیرهسازی ابری خود را برای مراحل بعدی یادداشت کنید. |
|
۳. راهاندازی توابع ابری گوگل (GCF)
برای راهاندازی GCF، دستورات را در Google Cloud Shell اجرا خواهیم کرد.
اگرچه میتوان گوگل کلود را از راه دور و با استفاده از ابزار خط فرمان gcloud از لپتاپ خود مدیریت کرد، اما در این آزمایشگاه کد، ما از Google Cloud Shell ، یک محیط خط فرمان که در فضای ابری اجرا میشود، استفاده خواهیم کرد.
این ماشین مجازی مبتنی بر دبیان، تمام ابزارهای توسعه مورد نیاز شما را در خود جای داده است. این ماشین مجازی یک دایرکتوری خانگی دائمی ۵ گیگابایتی ارائه میدهد و روی فضای ابری گوگل اجرا میشود که عملکرد شبکه و احراز هویت را تا حد زیادی بهبود میبخشد. این بدان معناست که تنها چیزی که برای این آزمایشگاه کد نیاز دارید یک مرورگر است (بله، روی کرومبوک هم کار میکند).
برای فعال کردن Google Cloud Shell، از کنسول توسعهدهندگان روی دکمهای که در سمت راست بالا قرار دارد کلیک کنید (فعالسازی و اتصال به محیط فقط چند لحظه طول میکشد): |
|
مجوزهای امضای blob را به حساب سرویس توابع ابری اعطا کنید
برای اینکه GCF بتواند در Cloud IAP ، پروکسی که از وبسرور Airflow محافظت میکند، احراز هویت شود، باید به Appspot Service Account GCF نقش Service Account Token Creator را اعطا کنید. این کار را با اجرای دستور زیر در Cloud Shell خود انجام دهید و نام پروژه خود را به جای <your-project-id> قرار دهید.
gcloud iam service-accounts add-iam-policy-binding \ <your-project-id>@appspot.gserviceaccount.com \ --member=serviceAccount:<your-project-id>@appspot.gserviceaccount.com \ --role=roles/iam.serviceAccountTokenCreator
برای مثال، اگر پروژه شما my-project نام دارد، دستور شما به صورت زیر خواهد بود:
gcloud iam service-accounts add-iam-policy-binding \ my-project@appspot.gserviceaccount.com \ --member=serviceAccount:my-project@appspot.gserviceaccount.com \ --role=roles/iam.serviceAccountTokenCreator
دریافت شناسه مشتری
برای ساخت یک توکن جهت احراز هویت در Cloud IAP ، این تابع به شناسه کلاینت پروکسی که از وبسرور Airflow محافظت میکند، نیاز دارد. API مربوط به Cloud Composer این اطلاعات را مستقیماً ارائه نمیدهد. در عوض، یک درخواست احراز هویت نشده به وبسرور Airflow ارسال کنید و شناسه کلاینت را از URL ریدایرکت دریافت کنید. ما این کار را با اجرای یک فایل پایتون با استفاده از Cloud Shell برای دریافت شناسه کلاینت انجام خواهیم داد.
با اجرای دستور زیر در Cloud Shell خود، کد لازم را از GitHub دانلود کنید.
cd git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git
اگر به دلیل وجود این دایرکتوری خطایی دریافت کردید، با اجرای دستور زیر آن را به آخرین نسخه بهروزرسانی کنید.
cd python-docs-samples/ git pull origin master
با اجرای دستور زیر به دایرکتوری مناسب تغییر دهید.
cd python-docs-samples/composer/rest
کد پایتون را اجرا کنید تا شناسه کلاینت خود را دریافت کنید، نام پروژه خود را به جای <your-project-id> ، مکان محیط Composer که قبلاً ایجاد کردهاید را به جای <your-composer-location> و نام محیط Composer که قبلاً ایجاد کردهاید را به جای <your-composer-environment> جایگزین کنید.
python3 get_client_id.py <your-project-id> <your-composer-location> <your-composer-environment>
برای مثال، اگر نام پروژه شما my-project ، مکان Composer شما us-central1 و نام محیط شما my-composer باشد، دستور شما به صورت زیر خواهد بود:
python3 get_client_id.py my-project us-central1 my-composer
get_client_id.py کارهای زیر را انجام میدهد:
- با Google Cloud احراز هویت میشود
- یک درخواست HTTP احراز هویت نشده به وب سرور Airflow ارسال میکند تا URI مربوط به تغییر مسیر را دریافت کند.
- پارامتر کوئری
client_idرا از آن ریدایرکت استخراج میکند. - چاپ میکند تا بتوانید از آن استفاده کنید
شناسه کلاینت شما در خط فرمان چاپ میشود و چیزی شبیه به این خواهد بود:
12345678987654321-abc1def3ghi5jkl7mno8pqr0.apps.googleusercontent.com
۴. تابع خود را ایجاد کنید
در Cloud Shell خود، با اجرای دستور زیر، مخزن را با کد نمونه لازم کلون کنید:
cd git clone https://github.com/GoogleCloudPlatform/nodejs-docs-samples.git
به دایرکتوری مورد نظر بروید و Cloud Shell خود را تا زمانی که مراحل بعدی را انجام میدهید، باز بگذارید.
cd nodejs-docs-samples/composer/functions/composer-storage-trigger
با کلیک روی منوی ناوبری و سپس کلیک روی «توابع ابری» به صفحه توابع ابری گوگل بروید. |
|
روی «ایجاد تابع» در بالای صفحه کلیک کنید |
|
نام تابع خود را "my-function" بگذارید و حافظه را به طور پیشفرض، ۲۵۶ مگابایت، رها کنید. |
|
تریگر را روی «فضای ابری» تنظیم کنید، نوع رویداد را «نهایی/ایجاد» بگذارید و به باکتی که در مرحله «ایجاد یک باکت فضای ابری» ایجاد کردهاید، بروید. |
|
کد منبع را روی «ویرایشگر درونخطی» (Inline Editor) بگذارید و زمان اجرا را روی «Node.js 8» تنظیم کنید. |
|
در Cloud Shell خود، دستور زیر را اجرا کنید. این دستور فایلهای index.js و package.json را در ویرایشگر Cloud Shell باز میکند.
cloudshell edit index.js package.json
روی تب package.json کلیک کنید، آن کد را کپی کرده و در بخش package.json ویرایشگر درونخطی Cloud Functions قرار دهید. |
|
«تابع اجرا» را روی triggerDag تنظیم کنید |
|
روی تب index.js کلیک کنید، کد را کپی کنید و آن را در بخش index.js ویرایشگر درونخطی Cloud Functions قرار دهید. |
|
|
|
در Cloud Shell خود، دستور زیر را اجرا کنید و <your-environment-name> را با نام محیط Composer خود و <your-composer-region> را با منطقهای که محیط Composer شما در آن قرار دارد، جایگزین کنید.
gcloud composer environments describe <your-environment-name> --location <your-composer-region>
برای مثال، اگر محیط شما my-composer-environment نام دارد و در us-central1 واقع شده است، دستور شما به صورت زیر خواهد بود:
gcloud composer environments describe my-composer-environment --location us-central1
خروجی باید چیزی شبیه به این باشد:
config:
airflowUri: https://abc123efghi456k-tp.appspot.com
dagGcsPrefix: gs://narnia-north1-test-codelab-jklmno-bucket/dags
gkeCluster: projects/a-project/zones/narnia-north1-b/clusters/narnia-north1-test-codelab-jklmno-gke
nodeConfig:
diskSizeGb: 100
location: projects/a-project/zones/narnia-north1-b
machineType: projects/a-project/zones/narnia-north1-b/machineTypes/n1-standard-1
network: projects/a-project/global/networks/default
oauthScopes:
- https://www.googleapis.com/auth/cloud-platform
serviceAccount: 987665432-compute@developer.gserviceaccount.com
nodeCount: 3
softwareConfig:
imageVersion: composer-1.7.0-airflow-1.10.0
pythonVersion: '2'
createTime: '2019-05-29T09:41:27.919Z'
name: projects/a-project/locations/narnia-north1/environments/my-composer-environment
state: RUNNING
updateTime: '2019-05-29T09:56:29.969Z'
uuid: 123456-7890-9876-543-210123456
در آن خروجی، به دنبال متغیری به نام |
|
روی لینک کشویی «بیشتر» کلیک کنید، سپس منطقهای را که از نظر جغرافیایی به شما نزدیکتر است انتخاب کنید. |
|
تیک گزینه "تلاش مجدد در صورت عدم موفقیت" را بزنید |
|
برای ایجاد تابع ابری خود، روی «ایجاد» کلیک کنید |
|
قدم به قدم کد را بررسی کنید
کدی که از index.js کپی کردهاید چیزی شبیه به این خواهد بود:
// [START composer_trigger]
'use strict';
const fetch = require('node-fetch');
const FormData = require('form-data');
/**
* Triggered from a message on a Cloud Storage bucket.
*
* IAP authorization based on:
* https://stackoverflow.com/questions/45787676/how-to-authenticate-google-cloud-functions-for-access-to-secure-app-engine-endpo
* and
* https://cloud.google.com/iap/docs/authentication-howto
*
* @param {!Object} data The Cloud Functions event data.
* @returns {Promise}
*/
exports.triggerDag = async data => {
// Fill in your Composer environment information here.
// The project that holds your function
const PROJECT_ID = 'your-project-id';
// Navigate to your webserver's login page and get this from the URL
const CLIENT_ID = 'your-iap-client-id';
// This should be part of your webserver's URL:
// {tenant-project-id}.appspot.com
const WEBSERVER_ID = 'your-tenant-project-id';
// The name of the DAG you wish to trigger
const DAG_NAME = 'composer_sample_trigger_response_dag';
// Other constants
const WEBSERVER_URL = `https://${WEBSERVER_ID}.appspot.com/api/experimental/dags/${DAG_NAME}/dag_runs`;
const USER_AGENT = 'gcf-event-trigger';
const BODY = {conf: JSON.stringify(data)};
// Make the request
try {
const iap = await authorizeIap(CLIENT_ID, PROJECT_ID, USER_AGENT);
return makeIapPostRequest(
WEBSERVER_URL,
BODY,
iap.idToken,
USER_AGENT,
iap.jwt
);
} catch (err) {
throw new Error(err);
}
};
/**
* @param {string} clientId The client id associated with the Composer webserver application.
* @param {string} projectId The id for the project containing the Cloud Function.
* @param {string} userAgent The user agent string which will be provided with the webserver request.
*/
const authorizeIap = async (clientId, projectId, userAgent) => {
const SERVICE_ACCOUNT = `${projectId}@appspot.gserviceaccount.com`;
const JWT_HEADER = Buffer.from(
JSON.stringify({alg: 'RS256', typ: 'JWT'})
).toString('base64');
let jwt = '';
let jwtClaimset = '';
// Obtain an Oauth2 access token for the appspot service account
const res = await fetch(
`http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/${SERVICE_ACCOUNT}/token`,
{
headers: {'User-Agent': userAgent, 'Metadata-Flavor': 'Google'},
}
);
const tokenResponse = await res.json();
if (tokenResponse.error) {
return Promise.reject(tokenResponse.error);
}
const accessToken = tokenResponse.access_token;
const iat = Math.floor(new Date().getTime() / 1000);
const claims = {
iss: SERVICE_ACCOUNT,
aud: 'https://www.googleapis.com/oauth2/v4/token',
iat: iat,
exp: iat + 60,
target_audience: clientId,
};
jwtClaimset = Buffer.from(JSON.stringify(claims)).toString('base64');
const toSign = [JWT_HEADER, jwtClaimset].join('.');
const blob = await fetch(
`https://iam.googleapis.com/v1/projects/${projectId}/serviceAccounts/${SERVICE_ACCOUNT}:signBlob`,
{
method: 'POST',
body: JSON.stringify({
bytesToSign: Buffer.from(toSign).toString('base64'),
}),
headers: {
'User-Agent': userAgent,
Authorization: `Bearer ${accessToken}`,
},
}
);
const blobJson = await blob.json();
if (blobJson.error) {
return Promise.reject(blobJson.error);
}
// Request service account signature on header and claimset
const jwtSignature = blobJson.signature;
jwt = [JWT_HEADER, jwtClaimset, jwtSignature].join('.');
const form = new FormData();
form.append('grant_type', 'urn:ietf:params:oauth:grant-type:jwt-bearer');
form.append('assertion', jwt);
const token = await fetch('https://www.googleapis.com/oauth2/v4/token', {
method: 'POST',
body: form,
});
const tokenJson = await token.json();
if (tokenJson.error) {
return Promise.reject(tokenJson.error);
}
return {
jwt: jwt,
idToken: tokenJson.id_token,
};
};
/**
* @param {string} url The url that the post request targets.
* @param {string} body The body of the post request.
* @param {string} idToken Bearer token used to authorize the iap request.
* @param {string} userAgent The user agent to identify the requester.
*/
const makeIapPostRequest = async (url, body, idToken, userAgent) => {
const res = await fetch(url, {
method: 'POST',
headers: {
'User-Agent': userAgent,
Authorization: `Bearer ${idToken}`,
},
body: JSON.stringify(body),
});
if (!res.ok) {
const err = await res.text();
throw new Error(err);
}
};
// [END composer_trigger]
بیایید نگاهی به آنچه اتفاق میافتد بیندازیم. در اینجا سه تابع وجود دارد: triggerDag ، authorizeIap و makeIapPostRequest
triggerDag تابعی است که وقتی چیزی را در مخزن ذخیرهسازی ابری تعیینشده آپلود میکنیم، فعال میشود. این تابع جایی است که متغیرهای مهم مورد استفاده در سایر درخواستها، مانند PROJECT_ID ، CLIENT_ID ، WEBSERVER_ID و DAG_NAME را پیکربندی میکنیم. این تابع authorizeIap و makeIapPostRequest را فراخوانی میکند.
exports.triggerDag = async data => {
// Fill in your Composer environment information here.
// The project that holds your function
const PROJECT_ID = 'your-project-id';
// Navigate to your webserver's login page and get this from the URL
const CLIENT_ID = 'your-iap-client-id';
// This should be part of your webserver's URL:
// {tenant-project-id}.appspot.com
const WEBSERVER_ID = 'your-tenant-project-id';
// The name of the DAG you wish to trigger
const DAG_NAME = 'composer_sample_trigger_response_dag';
// Other constants
const WEBSERVER_URL = `https://${WEBSERVER_ID}.appspot.com/api/experimental/dags/${DAG_NAME}/dag_runs`;
const USER_AGENT = 'gcf-event-trigger';
const BODY = {conf: JSON.stringify(data)};
// Make the request
try {
const iap = await authorizeIap(CLIENT_ID, PROJECT_ID, USER_AGENT);
return makeIapPostRequest(
WEBSERVER_URL,
BODY,
iap.idToken,
USER_AGENT,
iap.jwt
);
} catch (err) {
throw new Error(err);
}
};
authorizeIap با استفاده از یک حساب کاربری سرویس و "مبادله" یک JWT برای یک توکن شناسه که برای احراز هویت makeIapPostRequest استفاده خواهد شد، درخواستی را به پروکسی که از وبسرور Airflow محافظت میکند، ارسال میکند.
const authorizeIap = async (clientId, projectId, userAgent) => {
const SERVICE_ACCOUNT = `${projectId}@appspot.gserviceaccount.com`;
const JWT_HEADER = Buffer.from(
JSON.stringify({alg: 'RS256', typ: 'JWT'})
).toString('base64');
let jwt = '';
let jwtClaimset = '';
// Obtain an Oauth2 access token for the appspot service account
const res = await fetch(
`http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/${SERVICE_ACCOUNT}/token`,
{
headers: {'User-Agent': userAgent, 'Metadata-Flavor': 'Google'},
}
);
const tokenResponse = await res.json();
if (tokenResponse.error) {
return Promise.reject(tokenResponse.error);
}
const accessToken = tokenResponse.access_token;
const iat = Math.floor(new Date().getTime() / 1000);
const claims = {
iss: SERVICE_ACCOUNT,
aud: 'https://www.googleapis.com/oauth2/v4/token',
iat: iat,
exp: iat + 60,
target_audience: clientId,
};
jwtClaimset = Buffer.from(JSON.stringify(claims)).toString('base64');
const toSign = [JWT_HEADER, jwtClaimset].join('.');
const blob = await fetch(
`https://iam.googleapis.com/v1/projects/${projectId}/serviceAccounts/${SERVICE_ACCOUNT}:signBlob`,
{
method: 'POST',
body: JSON.stringify({
bytesToSign: Buffer.from(toSign).toString('base64'),
}),
headers: {
'User-Agent': userAgent,
Authorization: `Bearer ${accessToken}`,
},
}
);
const blobJson = await blob.json();
if (blobJson.error) {
return Promise.reject(blobJson.error);
}
// Request service account signature on header and claimset
const jwtSignature = blobJson.signature;
jwt = [JWT_HEADER, jwtClaimset, jwtSignature].join('.');
const form = new FormData();
form.append('grant_type', 'urn:ietf:params:oauth:grant-type:jwt-bearer');
form.append('assertion', jwt);
const token = await fetch('https://www.googleapis.com/oauth2/v4/token', {
method: 'POST',
body: form,
});
const tokenJson = await token.json();
if (tokenJson.error) {
return Promise.reject(tokenJson.error);
}
return {
jwt: jwt,
idToken: tokenJson.id_token,
};
};
makeIapPostRequest یک فراخوانی به وبسرور Airflow انجام میدهد تا composer_sample_trigger_response_dag. نام DAG در URL وبسرور Airflow که با پارامتر url ارسال شده است، تعبیه شده است و idToken توکنی است که ما در درخواست authorizeIap به دست آوردهایم.
const makeIapPostRequest = async (url, body, idToken, userAgent) => {
const res = await fetch(url, {
method: 'POST',
headers: {
'User-Agent': userAgent,
Authorization: `Bearer ${idToken}`,
},
body: JSON.stringify(body),
});
if (!res.ok) {
const err = await res.text();
throw new Error(err);
}
};
۵. DAG خود را تنظیم کنید
در Cloud Shell خود، به دایرکتوری حاوی گردشهای کاری نمونه بروید. این دایرکتوری بخشی از python-docs-samples است که در مرحلهی «دریافت شناسهی کلاینت» از گیتهاب دانلود کردهاید.
cd cd python-docs-samples/composer/workflows
DAG را در Composer آپلود کنید
نمونه DAG را با دستور زیر در مخزن ذخیرهسازی DAG محیط Composer خود آپلود کنید، که در آن <environment_name> نام محیط Composer شما و <location> نام منطقهای است که در آن قرار دارد. trigger_response_dag.py DAG است که با آن کار خواهیم کرد.
gcloud composer environments storage dags import \
--environment <environment_name> \
--location <location> \
--source trigger_response_dag.py
برای مثال، اگر محیط Composer شما my-composer نام داشت و در us-central1 قرار داشت، دستور شما به صورت زیر خواهد بود:
gcloud composer environments storage dags import \
--environment my-composer \
--location us-central1 \
--source trigger_response_dag.py
گام برداشتن در DAG
کد DAG در trigger_response.py به این شکل است:
import datetime
import airflow
from airflow.operators import bash_operator
default_args = {
'owner': 'Composer Example',
'depends_on_past': False,
'email': [''],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': datetime.timedelta(minutes=5),
'start_date': datetime.datetime(2017, 1, 1),
}
with airflow.DAG(
'composer_sample_trigger_response_dag',
default_args=default_args,
# Not scheduled, trigger only
schedule_interval=None) as dag:
# Print the dag_run's configuration, which includes information about the
# Cloud Storage object change.
print_gcs_info = bash_operator.BashOperator(
task_id='print_gcs_info', bash_command='echo {{ dag_run.conf }}')
بخش default_args شامل آرگومانهای پیشفرض مورد نیاز مدل BaseOperator در Apache Airflow است. این بخش را با این پارامترها در هر DAG آپاچی ایرفلو مشاهده خواهید کرد. owner در حال حاضر روی Composer Example تنظیم شده است، اما در صورت تمایل میتوانید آن را به نام خود تغییر دهید. depends_on_past به ما نشان میدهد که این DAG به هیچ DAG قبلی وابسته نیست. سه بخش ایمیل، email ، email_on_failure و email_on_retry طوری تنظیم شدهاند که هیچ اعلان ایمیلی بر اساس وضعیت این DAG ارسال نشود. DAG فقط یک بار دوباره تلاش میکند، زیرا retries روی ۱ تنظیم شده است و این کار را پس از پنج دقیقه، به ازای هر retry_delay انجام میدهد. start_date معمولاً زمان اجرای یک DAG را همراه با schedule_interval آن (که بعداً تنظیم میشود) تعیین میکند، اما در مورد این DAG، اهمیتی ندارد. روی ۱ ژانویه ۲۰۱۷ تنظیم شده است، اما میتواند روی هر تاریخ گذشتهای تنظیم شود.
default_args = {
'owner': 'Composer Example',
'depends_on_past': False,
'email': [''],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': datetime.timedelta(minutes=5),
'start_date': datetime.datetime(2017, 1, 1),
}
بخش with airflow.DAG DAG مورد نظر را پیکربندی میکند. این DAG با شناسه وظیفه composer_sample_trigger_response_dag ، آرگومانهای پیشفرض از بخش default_args و از همه مهمتر، با مقدار schedule_interval برابر با None اجرا خواهد شد. مقدار schedule_interval برابر با None تنظیم شده است زیرا ما این DAG خاص را با تابع Cloud خود فعال میکنیم. به همین دلیل است که start_date در default_args اهمیتی ندارد.
هنگام اجرا، DAG پیکربندی خود را، همانطور که در متغیر print_gcs_info دیکته شده است، چاپ میکند.
with airflow.DAG(
'composer_sample_trigger_response_dag',
default_args=default_args,
# Not scheduled, trigger only
schedule_interval=None) as dag:
# Print the dag_run's configuration, which includes information about the
# Cloud Storage object change.
print_gcs_info = bash_operator.BashOperator(
task_id='print_gcs_info', bash_command='echo {{ dag_run.conf }}')
۶. عملکرد خود را آزمایش کنید
محیط کامپوزر خود را باز کنید و در ردیفی که نام محیط شما قرار دارد، روی لینک Airflow کلیک کنید. |
|
با کلیک روی نام |
|
یک تب جداگانه باز کنید و هر فایلی را که قبلاً ایجاد کردهاید و به عنوان تریگر برای تابع ابری خود مشخص کردهاید، در آن آپلود کنید. میتوانید این کار را از طریق کنسول یا با استفاده از دستور gsutil انجام دهید. |
|
به برگه رابط کاربری Airflow خود برگردید و روی Graph View کلیک کنید. |
|
روی وظیفه |
|
روی «مشاهده گزارش» در بالا سمت راست منو کلیک کنید |
|
در گزارشها، اطلاعاتی درباره فایلی که در فضای ذخیرهسازی ابری خود آپلود کردهاید، مشاهده خواهید کرد. |
|
تبریک! شما همین الان یک DAG از Airflow را با استفاده از Node.js و توابع Google Cloud راهاندازی کردید!
۷. پاکسازی
برای جلوگیری از تحمیل هزینه به حساب GCP خود برای منابع استفاده شده در این راهنمای سریع:
- (اختیاری) برای ذخیره دادههایتان، دادهها را از مخزن ذخیرهسازی ابری برای محیط Cloud Composer و مخزن ذخیرهسازی که برای این شروع سریع ایجاد کردهاید، دانلود کنید .
- سطل ذخیرهسازی ابری مربوط به محیط و آنچه ایجاد کردهاید را حذف کنید.
- محیط Cloud Composer را حذف کنید . توجه داشته باشید که حذف محیط، فضای ذخیرهسازی آن محیط را حذف نمیکند.
- (اختیاری) با محاسبات بدون سرور، ۲ میلیون فراخوانی اول در هر ماه رایگان است و وقتی عملکرد خود را به صفر میرسانید، هزینهای از شما دریافت نمیشود (برای جزئیات بیشتر به قیمتگذاری مراجعه کنید). با این حال، اگر میخواهید عملکرد ابری خود را حذف کنید، با کلیک روی «حذف» در سمت راست بالای صفحه نمای کلی عملکرد خود، این کار را انجام دهید.

همچنین میتوانید به صورت اختیاری پروژه را حذف کنید:
- در کنسول GCP، به صفحه پروژهها بروید.
- در لیست پروژهها، پروژهای را که میخواهید حذف کنید انتخاب کرده و روی حذف کلیک کنید.
- در کادر، شناسه پروژه را تایپ کنید و سپس برای حذف پروژه، روی خاموش کردن کلیک کنید.


























