1. مقدمة
تم تصميم Apache Airflow لتشغيل الرسوم البيانية الموجّهة غير الدورية (DAG) وفقًا لجدول زمني منتظم، ولكن يمكنك أيضًا تشغيل الرسوم البيانية الموجّهة غير الدورية (DAG) استجابةً للأحداث، مثل تغيير في حزمة Cloud Storage أو رسالة يتم إرسالها إلى Cloud Pub/Sub. لتحقيق ذلك، يمكن تشغيل مخططات DAG في Cloud Composer من خلال Cloud Functions.
يشغّل المثال في هذا المختبر مخططًا موجّهًا غير دوري بسيطًا في كل مرة يحدث فيها تغيير في حزمة Cloud Storage. يستخدم هذا الرسم البياني الموجّه غير الدوري (DAG) أداة BashOperator لتنفيذ أمر bash يعرض معلومات التغيير بشأن المحتوى الذي تم تحميله إلى حزمة Cloud Storage.
قبل بدء هذا التمرين العملي، يُنصح بإكمال التمرينَين العمليَّين مقدمة عن Cloud Composer وبدء استخدام Cloud Functions. إذا أنشأت بيئة Composer في درس تطبيقي حول الترميز Intro to Cloud Composer، يمكنك استخدام هذه البيئة في هذا المختبر.
ما ستنشئه
في هذا الدرس التطبيقي حول الترميز، ستتعرّف على ما يلي:
- حمِّل ملفًا إلى Google Cloud Storage، ما سيؤدي إلى
- تشغيل وظيفة Google Cloud باستخدام وقت تشغيل Node.JS
- ستنفّذ هذه الدالة مخططًا موجّهًا غير دوري (DAG) في Google Cloud Composer.
- يؤدي ذلك إلى تنفيذ أمر bash بسيط يعرض التغيير في حزمة Google Cloud Storage

ما ستتعلمه
- كيفية تشغيل DAG في Apache Airflow باستخدام Google Cloud Functions وNode.js
المتطلبات
- حساب Google Cloud Platform
- فهم أساسي للغة JavaScript
- معرفة أساسية بـ Cloud Composer/Airflow وCloud Functions
- الراحة في استخدام أوامر واجهة سطر الأوامر
2. إعداد GCP
اختيار المشروع أو إنشاؤه
اختَر مشروعًا على Google Cloud Platform أو أنشئ مشروعًا. إذا كنت بصدد إنشاء مشروع جديد، اتّبِع الخطوات الموضّحة هنا.
سجِّل رقم تعريف مشروعك الذي ستستخدمه في الخطوات اللاحقة.
إذا كنت بصدد إنشاء مشروع جديد، يمكنك العثور على رقم تعريف المشروع أسفل "اسم المشروع" مباشرةً في صفحة الإنشاء. |
|
إذا سبق لك إنشاء مشروع، يمكنك العثور على رقم التعريف في الصفحة الرئيسية لوحدة التحكّم ضمن بطاقة "معلومات المشروع". |
|
تفعيل واجهات برمجة التطبيقات
|
إنشاء بيئة Composer
أنشئ بيئة Cloud Composer باستخدام الإعدادات التالية:
يمكن أن تبقى جميع الإعدادات الأخرى على قيمتها التلقائية. انقر على "إنشاء" في أسفل الصفحة.سجِّل اسم بيئة Composer وموقعها الجغرافي، لأنّك ستحتاج إليهما في خطوات لاحقة. |
|
إنشاء حزمة Cloud Storage
في مشروعك، أنشئ حزمة Cloud Storage بالإعدادات التالية:
انقر على "إنشاء" عندما تكون جاهزًا. احرص على تدوين اسم حزمة Cloud Storage لاستخدامه في الخطوات اللاحقة. |
|
3- إعداد Google Cloud Functions (GCF)
لإعداد GCF، سننفّذ أوامر في Google Cloud Shell.
على الرغم من إمكانية تشغيل Google Cloud عن بُعد من الكمبيوتر المحمول باستخدام أداة سطر الأوامر gcloud، سنستخدم في هذا الدرس التطبيقي حول الترميز Google Cloud Shell، وهي بيئة سطر أوامر تعمل في السحابة الإلكترونية.
يتم تحميل هذا الجهاز الافتراضي المستند إلى Debian بجميع أدوات التطوير التي تحتاج إليها. وتوفِّر هذه الآلة دليلاً رئيسيًا دائمًا بسعة 5 غيغابايت ويتمّ تشغيله على Google Cloud، ما يحسّن كثيرًا أداء الشبكة والمصادقة. يعني ذلك أنّ كل ما تحتاج إليه لهذا الدرس التطبيقي حول الترميز هو متصفّح (نعم، يمكن استخدامه على جهاز Chromebook).
لتفعيل Google Cloud Shell، انقر على الزرّ في أعلى يسار صفحة "وحدة تحكّم المطوّرين" (يستغرق توفير البيئة والاتصال بها بضع لحظات فقط): |
|
منح أذونات توقيع الكائن الثنائي الكبير إلى حساب خدمة Cloud Functions
لكي تتمكّن "وظائف السحابة الإلكترونية من Google" من المصادقة على Cloud IAP، وهو الوكيل الذي يحمي خادم الويب Airflow، عليك منح حساب خدمة Appspot دور Service Account Token Creator في "وظائف السحابة الإلكترونية من Google". يمكنك إجراء ذلك من خلال تنفيذ الأمر التالي في Cloud Shell، مع استبدال اسم مشروعك بـ <your-project-id>.
gcloud iam service-accounts add-iam-policy-binding \ <your-project-id>@appspot.gserviceaccount.com \ --member=serviceAccount:<your-project-id>@appspot.gserviceaccount.com \ --role=roles/iam.serviceAccountTokenCreator
على سبيل المثال، إذا كان اسم مشروعك هو my-project، سيكون الأمر
gcloud iam service-accounts add-iam-policy-binding \ my-project@appspot.gserviceaccount.com \ --member=serviceAccount:my-project@appspot.gserviceaccount.com \ --role=roles/iam.serviceAccountTokenCreator
الحصول على معرّف العميل
لإنشاء رمز مميّز للمصادقة على Cloud IAP، تتطلّب الدالة معرّف العميل للوكيل الذي يحمي خادم الويب Airflow. لا يقدّم Cloud Composer API هذه المعلومات مباشرةً. بدلاً من ذلك، يمكنك إرسال طلب غير مصادق عليه إلى خادم الويب Airflow وتحديد معرّف العميل من عنوان URL لإعادة التوجيه. سننفّذ ذلك من خلال تشغيل ملف Python باستخدام Cloud Shell للحصول على معرّف العميل.
نزِّل الرمز البرمجي اللازم من GitHub عن طريق تنفيذ الأمر التالي في Cloud Shell
cd git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git
إذا ظهر لك خطأ لأنّ هذا الدليل متوفّر من قبل، يمكنك تعديله إلى أحدث إصدار من خلال تنفيذ الأمر التالي
cd python-docs-samples/ git pull origin master
غيِّر إلى الدليل المناسب عن طريق تنفيذ
cd python-docs-samples/composer/rest
نفِّذ رمز Python للحصول على رقم تعريف العميل، مع استبدال اسم مشروعك بـ <your-project-id> وموقع بيئة Composer التي أنشأتها سابقًا بـ <your-composer-location> واسم بيئة Composer التي أنشأتها سابقًا بـ <your-composer-environment>.
python3 get_client_id.py <your-project-id> <your-composer-location> <your-composer-environment>
على سبيل المثال، إذا كان اسم مشروعك هو my-project، وكان موقع Composer الجغرافي هو us-central1، وكان اسم بيئتك هو my-composer، سيكون الأمر الذي عليك تنفيذه هو
python3 get_client_id.py my-project us-central1 my-composer
تنفِّذ get_client_id.py ما يلي:
- المصادقة باستخدام Google Cloud
- إجراء طلب HTTP غير مصادق عليه إلى خادم الويب Airflow من أجل الحصول على معرّف الموارد المنتظم لإعادة التوجيه
- يستخرج مَعلمة طلب البحث
client_idمن عملية إعادة التوجيه هذه - طباعتها لتتمكّن من استخدامها
سيتم عرض معرّف العميل في سطر الأوامر وسيبدو على النحو التالي:
12345678987654321-abc1def3ghi5jkl7mno8pqr0.apps.googleusercontent.com
4. إنشاء الدالة
في Cloud Shell، استنسِخ المستودع الذي يتضمّن رمزًا نموذجيًا لازمًا عن طريق تنفيذ
cd git clone https://github.com/GoogleCloudPlatform/nodejs-docs-samples.git
انتقِل إلى الدليل المطلوب واترك Cloud Shell مفتوحًا أثناء إكمال الخطوات القليلة التالية.
cd nodejs-docs-samples/composer/functions/composer-storage-trigger
انتقِل إلى صفحة Google Cloud Functions من خلال النقر على قائمة التنقّل ثم على "Cloud Functions". |
|
انقر على "إنشاء دالة" (CREATE FUNCTION) في أعلى الصفحة. |
|
أطلِق على الدالة اسم "my-function" واترك الذاكرة على القيمة التلقائية، أي 256 ميغابايت. |
|
اضبط "المشغّل" على Cloud Storage، واترك "نوع الحدث" على "إنهاء/إنشاء"، ثم استعرض الحزمة التي أنشأتها في خطوة إنشاء حزمة في Cloud Storage. |
|
اترك رمز المصدر مضبوطًا على "المحرّر المضمّن" واضبط وقت التشغيل على "Node.js 8". |
|
في Cloud Shell، نفِّذ الأمر التالي. سيؤدي ذلك إلى فتح index.js وpackage.json في محرِّر Cloud Shell.
cloudshell edit index.js package.json
انقر على علامة التبويب package.json، وانسخ الرمز والصقه في قسم package.json في المحرّر المضمّن في Cloud Functions. |
|
اضبط "الدالة المطلوب تنفيذها" على triggerDag |
|
انقر على علامة التبويب index.js، وانسخ الرمز، وألصقه في قسم index.js في المحرّر المضمّن في Cloud Functions |
|
غيِّر |
|
في Cloud Shell، شغِّل الأمر التالي، مع استبدال <your-environment-name> باسم بيئة Composer و <your-composer-region> بالمنطقة التي تقع فيها بيئة Composer.
gcloud composer environments describe <your-environment-name> --location <your-composer-region>
على سبيل المثال، إذا كان اسم بيئتك هو my-composer-environment وكانت تقع في us-central1، سيكون الأمر الذي تكتبه هو
gcloud composer environments describe my-composer-environment --location us-central1
يجب أن تبدو المخرجات على النحو التالي:
config:
airflowUri: https://abc123efghi456k-tp.appspot.com
dagGcsPrefix: gs://narnia-north1-test-codelab-jklmno-bucket/dags
gkeCluster: projects/a-project/zones/narnia-north1-b/clusters/narnia-north1-test-codelab-jklmno-gke
nodeConfig:
diskSizeGb: 100
location: projects/a-project/zones/narnia-north1-b
machineType: projects/a-project/zones/narnia-north1-b/machineTypes/n1-standard-1
network: projects/a-project/global/networks/default
oauthScopes:
- https://www.googleapis.com/auth/cloud-platform
serviceAccount: 987665432-compute@developer.gserviceaccount.com
nodeCount: 3
softwareConfig:
imageVersion: composer-1.7.0-airflow-1.10.0
pythonVersion: '2'
createTime: '2019-05-29T09:41:27.919Z'
name: projects/a-project/locations/narnia-north1/environments/my-composer-environment
state: RUNNING
updateTime: '2019-05-29T09:56:29.969Z'
uuid: 123456-7890-9876-543-210123456
في هذه النتيجة، ابحث عن المتغيّر الذي يحمل الاسم |
|
انقر على رابط القائمة المنسدلة "المزيد"، ثم اختَر المنطقة الأقرب إليك جغرافيًا. |
|
ضَع علامة في المربّع بجانب "إعادة المحاولة عند حدوث خطأ". |
|
انقر على "إنشاء" لإنشاء Cloud Function. |
|
التنقّل بين أسطر الرمز
سيبدو الرمز الذي نسخته من index.js على النحو التالي:
// [START composer_trigger]
'use strict';
const fetch = require('node-fetch');
const FormData = require('form-data');
/**
* Triggered from a message on a Cloud Storage bucket.
*
* IAP authorization based on:
* https://stackoverflow.com/questions/45787676/how-to-authenticate-google-cloud-functions-for-access-to-secure-app-engine-endpo
* and
* https://cloud.google.com/iap/docs/authentication-howto
*
* @param {!Object} data The Cloud Functions event data.
* @returns {Promise}
*/
exports.triggerDag = async data => {
// Fill in your Composer environment information here.
// The project that holds your function
const PROJECT_ID = 'your-project-id';
// Navigate to your webserver's login page and get this from the URL
const CLIENT_ID = 'your-iap-client-id';
// This should be part of your webserver's URL:
// {tenant-project-id}.appspot.com
const WEBSERVER_ID = 'your-tenant-project-id';
// The name of the DAG you wish to trigger
const DAG_NAME = 'composer_sample_trigger_response_dag';
// Other constants
const WEBSERVER_URL = `https://${WEBSERVER_ID}.appspot.com/api/experimental/dags/${DAG_NAME}/dag_runs`;
const USER_AGENT = 'gcf-event-trigger';
const BODY = {conf: JSON.stringify(data)};
// Make the request
try {
const iap = await authorizeIap(CLIENT_ID, PROJECT_ID, USER_AGENT);
return makeIapPostRequest(
WEBSERVER_URL,
BODY,
iap.idToken,
USER_AGENT,
iap.jwt
);
} catch (err) {
throw new Error(err);
}
};
/**
* @param {string} clientId The client id associated with the Composer webserver application.
* @param {string} projectId The id for the project containing the Cloud Function.
* @param {string} userAgent The user agent string which will be provided with the webserver request.
*/
const authorizeIap = async (clientId, projectId, userAgent) => {
const SERVICE_ACCOUNT = `${projectId}@appspot.gserviceaccount.com`;
const JWT_HEADER = Buffer.from(
JSON.stringify({alg: 'RS256', typ: 'JWT'})
).toString('base64');
let jwt = '';
let jwtClaimset = '';
// Obtain an Oauth2 access token for the appspot service account
const res = await fetch(
`http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/${SERVICE_ACCOUNT}/token`,
{
headers: {'User-Agent': userAgent, 'Metadata-Flavor': 'Google'},
}
);
const tokenResponse = await res.json();
if (tokenResponse.error) {
return Promise.reject(tokenResponse.error);
}
const accessToken = tokenResponse.access_token;
const iat = Math.floor(new Date().getTime() / 1000);
const claims = {
iss: SERVICE_ACCOUNT,
aud: 'https://www.googleapis.com/oauth2/v4/token',
iat: iat,
exp: iat + 60,
target_audience: clientId,
};
jwtClaimset = Buffer.from(JSON.stringify(claims)).toString('base64');
const toSign = [JWT_HEADER, jwtClaimset].join('.');
const blob = await fetch(
`https://iam.googleapis.com/v1/projects/${projectId}/serviceAccounts/${SERVICE_ACCOUNT}:signBlob`,
{
method: 'POST',
body: JSON.stringify({
bytesToSign: Buffer.from(toSign).toString('base64'),
}),
headers: {
'User-Agent': userAgent,
Authorization: `Bearer ${accessToken}`,
},
}
);
const blobJson = await blob.json();
if (blobJson.error) {
return Promise.reject(blobJson.error);
}
// Request service account signature on header and claimset
const jwtSignature = blobJson.signature;
jwt = [JWT_HEADER, jwtClaimset, jwtSignature].join('.');
const form = new FormData();
form.append('grant_type', 'urn:ietf:params:oauth:grant-type:jwt-bearer');
form.append('assertion', jwt);
const token = await fetch('https://www.googleapis.com/oauth2/v4/token', {
method: 'POST',
body: form,
});
const tokenJson = await token.json();
if (tokenJson.error) {
return Promise.reject(tokenJson.error);
}
return {
jwt: jwt,
idToken: tokenJson.id_token,
};
};
/**
* @param {string} url The url that the post request targets.
* @param {string} body The body of the post request.
* @param {string} idToken Bearer token used to authorize the iap request.
* @param {string} userAgent The user agent to identify the requester.
*/
const makeIapPostRequest = async (url, body, idToken, userAgent) => {
const res = await fetch(url, {
method: 'POST',
headers: {
'User-Agent': userAgent,
Authorization: `Bearer ${idToken}`,
},
body: JSON.stringify(body),
});
if (!res.ok) {
const err = await res.text();
throw new Error(err);
}
};
// [END composer_trigger]
لنلقِ نظرة على ما يحدث. تتوفّر ثلاث دوال هنا: triggerDag وauthorizeIap وmakeIapPostRequest
triggerDag هي الدالة التي يتم تشغيلها عند تحميل ملف إلى حزمة Cloud Storage المحدّدة. وهي المكان الذي نضبط فيه المتغيّرات المهمة المستخدَمة في الطلبات الأخرى، مثل PROJECT_ID وCLIENT_ID وWEBSERVER_ID وDAG_NAME. يتصل بالرقمين authorizeIap وmakeIapPostRequest.
exports.triggerDag = async data => {
// Fill in your Composer environment information here.
// The project that holds your function
const PROJECT_ID = 'your-project-id';
// Navigate to your webserver's login page and get this from the URL
const CLIENT_ID = 'your-iap-client-id';
// This should be part of your webserver's URL:
// {tenant-project-id}.appspot.com
const WEBSERVER_ID = 'your-tenant-project-id';
// The name of the DAG you wish to trigger
const DAG_NAME = 'composer_sample_trigger_response_dag';
// Other constants
const WEBSERVER_URL = `https://${WEBSERVER_ID}.appspot.com/api/experimental/dags/${DAG_NAME}/dag_runs`;
const USER_AGENT = 'gcf-event-trigger';
const BODY = {conf: JSON.stringify(data)};
// Make the request
try {
const iap = await authorizeIap(CLIENT_ID, PROJECT_ID, USER_AGENT);
return makeIapPostRequest(
WEBSERVER_URL,
BODY,
iap.idToken,
USER_AGENT,
iap.jwt
);
} catch (err) {
throw new Error(err);
}
};
يرسل authorizeIap طلبًا إلى الخادم الوكيل الذي يحمي خادم الويب Airflow، وذلك باستخدام حساب خدمة و "تبديل" رمز JWT برمز مميّز للمعرّف سيتم استخدامه لمصادقة makeIapPostRequest.
const authorizeIap = async (clientId, projectId, userAgent) => {
const SERVICE_ACCOUNT = `${projectId}@appspot.gserviceaccount.com`;
const JWT_HEADER = Buffer.from(
JSON.stringify({alg: 'RS256', typ: 'JWT'})
).toString('base64');
let jwt = '';
let jwtClaimset = '';
// Obtain an Oauth2 access token for the appspot service account
const res = await fetch(
`http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/${SERVICE_ACCOUNT}/token`,
{
headers: {'User-Agent': userAgent, 'Metadata-Flavor': 'Google'},
}
);
const tokenResponse = await res.json();
if (tokenResponse.error) {
return Promise.reject(tokenResponse.error);
}
const accessToken = tokenResponse.access_token;
const iat = Math.floor(new Date().getTime() / 1000);
const claims = {
iss: SERVICE_ACCOUNT,
aud: 'https://www.googleapis.com/oauth2/v4/token',
iat: iat,
exp: iat + 60,
target_audience: clientId,
};
jwtClaimset = Buffer.from(JSON.stringify(claims)).toString('base64');
const toSign = [JWT_HEADER, jwtClaimset].join('.');
const blob = await fetch(
`https://iam.googleapis.com/v1/projects/${projectId}/serviceAccounts/${SERVICE_ACCOUNT}:signBlob`,
{
method: 'POST',
body: JSON.stringify({
bytesToSign: Buffer.from(toSign).toString('base64'),
}),
headers: {
'User-Agent': userAgent,
Authorization: `Bearer ${accessToken}`,
},
}
);
const blobJson = await blob.json();
if (blobJson.error) {
return Promise.reject(blobJson.error);
}
// Request service account signature on header and claimset
const jwtSignature = blobJson.signature;
jwt = [JWT_HEADER, jwtClaimset, jwtSignature].join('.');
const form = new FormData();
form.append('grant_type', 'urn:ietf:params:oauth:grant-type:jwt-bearer');
form.append('assertion', jwt);
const token = await fetch('https://www.googleapis.com/oauth2/v4/token', {
method: 'POST',
body: form,
});
const tokenJson = await token.json();
if (tokenJson.error) {
return Promise.reject(tokenJson.error);
}
return {
jwt: jwt,
idToken: tokenJson.id_token,
};
};
يُجري makeIapPostRequest طلبًا إلى خادم الويب Airflow لتشغيل composer_sample_trigger_response_dag.. يتم تضمين اسم DAG في عنوان URL لخادم الويب Airflow الذي تم إدخاله مع المَعلمة url، وidToken هو الرمز المميز الذي حصلنا عليه في الطلب authorizeIap.
const makeIapPostRequest = async (url, body, idToken, userAgent) => {
const res = await fetch(url, {
method: 'POST',
headers: {
'User-Agent': userAgent,
Authorization: `Bearer ${idToken}`,
},
body: JSON.stringify(body),
});
if (!res.ok) {
const err = await res.text();
throw new Error(err);
}
};
5- إعداد الرسم البياني الدوري الموجّه
في Cloud Shell، انتقِل إلى الدليل الذي يتضمّن نماذج سير العمل. وهي جزء من python-docs-samples التي نزّلتها من GitHub في خطوة الحصول على رقم تعريف العميل.
cd cd python-docs-samples/composer/workflows
تحميل الرسم البياني الدوري الموجّه إلى Composer
حمِّل نموذج DAG إلى حزمة تخزين DAG في بيئة Composer باستخدام الأمر التالي، حيث يمثّل <environment_name> اسم بيئة Composer و<location> اسم المنطقة التي تقع فيها. trigger_response_dag.py هو الرسم البياني الموجّه غير الدوري الذي سنعمل معه.
gcloud composer environments storage dags import \
--environment <environment_name> \
--location <location> \
--source trigger_response_dag.py
على سبيل المثال، إذا كان اسم بيئة Composer هو my-composer وكانت تقع في us-central1، سيكون الأمر كما يلي:
gcloud composer environments storage dags import \
--environment my-composer \
--location us-central1 \
--source trigger_response_dag.py
التنقّل بين مهام الرسم البياني الدوري الموجّه
يبدو رمز DAG في trigger_response.py على النحو التالي
import datetime
import airflow
from airflow.operators import bash_operator
default_args = {
'owner': 'Composer Example',
'depends_on_past': False,
'email': [''],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': datetime.timedelta(minutes=5),
'start_date': datetime.datetime(2017, 1, 1),
}
with airflow.DAG(
'composer_sample_trigger_response_dag',
default_args=default_args,
# Not scheduled, trigger only
schedule_interval=None) as dag:
# Print the dag_run's configuration, which includes information about the
# Cloud Storage object change.
print_gcs_info = bash_operator.BashOperator(
task_id='print_gcs_info', bash_command='echo {{ dag_run.conf }}')
يحتوي القسم default_args على الوسيطات التلقائية على النحو المطلوب من خلال نموذج BaseOperator في Apache Airflow. سيظهر هذا القسم مع هذه المَعلمات في أي رسم بياني موجّه غير دوري (DAG) في Apache Airflow. تم ضبط owner حاليًا على Composer Example، ولكن يمكنك تغيير ذلك إلى اسمك إذا أردت. يُظهر لنا depends_on_past أنّ هذا الرسم البياني الموجّه غير الحلقي لا يعتمد على أي رسومات بيانية موجّهة غير حلقية سابقة. تم ضبط أقسام البريد الإلكتروني الثلاثة، email وemail_on_failure وemail_on_retry، بحيث لا تصل أي إشعارات عبر البريد الإلكتروني استنادًا إلى حالة هذا الرسم البياني الموجّه غير الدوري. ستتم إعادة محاولة تنفيذ DAG مرة واحدة فقط، لأنّ قيمة retries مضبوطة على 1، وسيتم ذلك بعد خمس دقائق، وفقًا لـ retry_delay. يحدّد start_date عادةً وقت تشغيل الرسم البياني الموجّه غير الدوري (DAG)، وذلك بالتزامن مع schedule_interval (الذي يتم ضبطه لاحقًا)، ولكن في حالة هذا الرسم البياني الموجّه غير الدوري، لا يكون ذلك مهمًا. تم ضبطه على 1 يناير 2017، ولكن يمكن ضبطه على أي تاريخ سابق.
default_args = {
'owner': 'Composer Example',
'depends_on_past': False,
'email': [''],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': datetime.timedelta(minutes=5),
'start_date': datetime.datetime(2017, 1, 1),
}
يضبط قسم with airflow.DAG الرسم البياني الموجّه غير الدوري (DAG) الذي سيتم تنفيذه. سيتم تشغيلها باستخدام معرّف المهمة composer_sample_trigger_response_dag، والوسيطات التلقائية من القسم default_args، والأهم من ذلك، باستخدام schedule_interval بقيمة None. تم ضبط schedule_interval على None لأنّنا نشغّل هذا الرسم البياني الموجّه غير الدوري (DAG) باستخدام Cloud Function. لهذا السبب، لا صلة بين start_date وdefault_args.
عند تنفيذ DAG، يتم عرض إعداداته، كما هو محدّد في المتغيّر print_gcs_info.
with airflow.DAG(
'composer_sample_trigger_response_dag',
default_args=default_args,
# Not scheduled, trigger only
schedule_interval=None) as dag:
# Print the dag_run's configuration, which includes information about the
# Cloud Storage object change.
print_gcs_info = bash_operator.BashOperator(
task_id='print_gcs_info', bash_command='echo {{ dag_run.conf }}')
6. اختبار الدالة
افتح بيئة Composer، وفي الصف الذي يتضمّن اسم البيئة، انقر على رابط Airflow. |
|
افتح |
|
افتح علامة تبويب منفصلة وحمِّل أي ملف إلى حزمة Cloud Storage التي أنشأتها سابقًا وحدّدتها كمشغّل لـ Cloud Function. يمكنك إجراء ذلك من خلال وحدة التحكّم أو باستخدام أمر gsutil. |
|
انتقِل مجددًا إلى علامة التبويب التي تتضمّن واجهة مستخدم Airflow وانقر على "عرض الرسم البياني" (Graph View). |
|
انقر على المهمة |
|
انقر على "عرض السجلّ" في أعلى يسار القائمة |
|
في السجلات، ستظهر لك معلومات عن الملف الذي حمّلته إلى حزمة Cloud Storage. |
|
تهانينا! لقد شغّلت للتوّ مخططًا موجّهًا غير دوري (DAG) في Airflow باستخدام Node.js وGoogle Cloud Functions.
7. تنظيف
لتجنُّب تحمّل رسوم في حسابك على GCP مقابل الموارد المستخدَمة في هذا الدليل السريع، اتّبِع الخطوات التالية:
- (اختياري) لحفظ بياناتك، نزِّل البيانات من حزمة Cloud Storage لبيئة Cloud Composer وحزمة التخزين التي أنشأتها لهذا التشغيل السريع.
- احذف حزمة Cloud Storage الخاصة بالبيئة التي أنشأتها.
- احذف بيئة Cloud Composer. يُرجى العِلم أنّ حذف بيئتك لا يؤدي إلى حذف حزمة التخزين الخاصة بها.
- (اختياري) باستخدام الحوسبة بدون خادم، تكون أول مليوني عملية استدعاء شهريًا مجانية، وعندما تقلّل حجم وظيفتك إلى صفر، لن يتم تحصيل أي رسوم منك (راجِع الأسعار لمزيد من التفاصيل). ومع ذلك، إذا أردت حذف Cloud Function، يمكنك إجراء ذلك من خلال النقر على "حذف" في أعلى يسار صفحة النظرة العامة للدالة.

يمكنك أيضًا حذف المشروع بشكل اختياري:
- في وحدة تحكّم Google Cloud Platform، انتقِل إلى صفحة المشاريع.
- في قائمة المشاريع، اختَر المشروع الذي تريد حذفه وانقر على حذف.
- في المربّع، اكتب رقم تعريف المشروع، ثم انقر على إيقاف لحذف المشروع.


























