Chạy công việc đếm từ Hadoop trên cụm Dataproc

1. Giới thiệu

Quy trình làm việc là một trường hợp sử dụng phổ biến trong hoạt động phân tích dữ liệu. Quy trình này bao gồm việc nhập, chuyển đổi và phân tích dữ liệu để tìm ra thông tin có ý nghĩa. Trong Google Cloud Platform, công cụ điều phối quy trình công việc là Cloud Composer. Đây là phiên bản được lưu trữ của công cụ quy trình công việc nguồn mở phổ biến Apache Airflow. Trong phòng thí nghiệm này, bạn sẽ sử dụng Cloud Composer để tạo một quy trình công việc đơn giản nhằm tạo một cụm Cloud Dataproc, phân tích cụm đó bằng Cloud Dataproc và Apache Hadoop, sau đó xoá cụm Cloud Dataproc.

Cloud Composer là gì?

Cloud Composer là một dịch vụ điều phối quy trình làm việc được quản lý toàn diện, giúp bạn tạo, lên lịch và giám sát các pipeline trải rộng trên nhiều đám mây và trung tâm dữ liệu tại chỗ. Được xây dựng dựa trên dự án nguồn mở Apache Airflow phổ biến và hoạt động bằng ngôn ngữ lập trình Python, Cloud Composer không bị ràng buộc và dễ sử dụng.

Bằng cách sử dụng Cloud Composer thay vì một thực thể cục bộ của Apache Airflow, người dùng có thể tận dụng tối đa Airflow mà không cần phải cài đặt hoặc quản lý.

Apache Airflow là gì?

Apache Airflow là một công cụ nguồn mở được dùng để tạo, lên lịch và giám sát quy trình công việc theo phương thức lập trình. Có một số thuật ngữ chính cần nhớ liên quan đến Airflow mà bạn sẽ thấy trong suốt khoá học này:

  • DAG – DAG (Đồ thị không chu trình có hướng) là một tập hợp các tác vụ được sắp xếp mà bạn muốn lên lịch và chạy. DAG (còn gọi là quy trình làm việc) được xác định trong các tệp Python tiêu chuẩn
  • Thao tác – thao tác mô tả một tác vụ duy nhất trong quy trình làm việc

Cloud Dataproc là gì?

Cloud Dataproc là dịch vụ Apache SparkApache Hadoop được quản lý toàn diện của Google Cloud Platform. Cloud Dataproc dễ dàng tích hợp với các dịch vụ khác của GCP, mang đến cho bạn một nền tảng mạnh mẽ và toàn diện để xử lý dữ liệu, phân tích và học máy.

Những việc bạn sẽ làm

Lớp học lập trình này hướng dẫn bạn cách tạo và chạy một quy trình công việc Apache Airflow trong Cloud Composer để hoàn thành các tác vụ sau:

  • Tạo một cụm Cloud Dataproc
  • Chạy một công việc đếm từ Apache Hadoop trên cụm và xuất kết quả của công việc đó vào Cloud Storage
  • Xoá cụm

Kiến thức bạn sẽ học được

  • Cách tạo và chạy quy trình công việc Apache Airflow trong Cloud Composer
  • Cách sử dụng Cloud Composer và Cloud Dataproc để chạy một quy trình phân tích trên một tập dữ liệu
  • Cách truy cập vào môi trường Cloud Composer thông qua Bảng điều khiển Google Cloud Platform, Cloud SDK và giao diện web Airflow

Bạn cần có

  • Tài khoản GCP
  • Kiến thức cơ bản về CLI
  • Hiểu biết cơ bản về Python

2. Thiết lập GCP

Tạo dự án

Chọn hoặc tạo một dự án trên Google Cloud Platform.

Ghi lại mã dự án để sử dụng trong các bước sau.

Nếu bạn đang tạo một dự án mới, thì mã dự án sẽ nằm ngay bên dưới Tên dự án trên trang tạo

Nếu đã tạo một dự án, bạn có thể tìm thấy mã nhận dạng trên trang chủ của bảng điều khiển trong thẻ Thông tin dự án

Bật các API

Bật Cloud Composer, Cloud Dataproc và Cloud Storage API.Sau khi bật, bạn có thể bỏ qua nút "Go to Credentials" (Chuyển đến phần Thông tin đăng nhập) và chuyển sang bước tiếp theo của hướng dẫn.

Tạo môi trường Composer

Tạo một môi trường Cloud Composer có cấu hình sau:

  • Tên: my-composer-environment
  • Vị trí: us-central1
  • Khu vực: us-central1-a

Bạn có thể giữ nguyên tất cả các cấu hình khác theo mặc định. Nhấp vào "Tạo" ở dưới cùng.

Tạo bộ chứa Cloud Storage

Trong dự án của bạn, hãy tạo một bộ chứa Cloud Storage có cấu hình như sau:

  • Tên: <your-project-id>
  • Lớp lưu trữ mặc định: Đa khu vực
  • Địa điểm: Hoa Kỳ
  • Mô hình kiểm soát quyền truy cập: chi tiết

Nhấn vào "Tạo" khi bạn đã sẵn sàng

3. Thiết lập Apache Airflow

Xem thông tin về môi trường Composer

Trong Bảng điều khiển GCP, hãy mở trang Môi trường

Nhấp vào tên của môi trường để xem thông tin chi tiết.

Trang Thông tin chi tiết về môi trường cung cấp thông tin, chẳng hạn như URL giao diện web Airflow, mã nhận dạng cụm Google Kubernetes Engine, tên của bộ chứa Cloud Storage và đường dẫn cho thư mục /dags.

Trong Airflow, DAG (Đồ thị không chu trình có hướng) là một tập hợp các tác vụ được sắp xếp mà bạn muốn lên lịch và chạy. DAG (còn gọi là quy trình công việc) được xác định trong các tệp Python tiêu chuẩn. Cloud Composer chỉ lên lịch các DAG trong thư mục /dags. Thư mục /dags nằm trong bộ chứa Cloud Storage mà Cloud Composer tự động tạo khi bạn tạo môi trường.

Thiết lập các biến môi trường Apache Airflow

Các biến Apache Airflow là một khái niệm dành riêng cho Airflow, khác với biến môi trường. Trong bước này, bạn sẽ thiết lập 3 biến Airflow sau đây: gcp_project, gcs_bucketgce_zone.

Sử dụng gcloud để đặt biến

Trước tiên, hãy mở Cloud Shell. Cloud SDK đã được cài đặt sẵn trong Cloud Shell để thuận tiện cho bạn.

Đặt biến môi trường COMPOSER_INSTANCE thành tên của môi trường Composer

COMPOSER_INSTANCE=my-composer-environment

Để đặt các biến Airflow bằng công cụ dòng lệnh gcloud, hãy dùng lệnh gcloud composer environments run với lệnh phụ variables. Lệnh gcloud composer này thực thi lệnh phụ variables của Airflow CLI. Lệnh phụ này sẽ truyền các đối số đến công cụ dòng lệnh gcloud.

Bạn sẽ chạy lệnh này 3 lần, thay thế các biến bằng những biến liên quan đến dự án của bạn.

Đặt gcp_project bằng lệnh sau, thay thế <your-project-id> bằng mã dự án mà bạn đã ghi lại ở Bước 2.

gcloud composer environments run ${COMPOSER_INSTANCE} \
    --location us-central1 variables -- --set gcp_project <your-project-id>

Đầu ra của bạn sẽ có dạng như sau

kubeconfig entry generated for us-central1-my-composer-env-123abc-gke.
Executing within the following Kubernetes cluster namespace: composer-1-10-0-airflow-1-10-2-123abc
[2020-04-17 20:42:49,713] {settings.py:176} INFO - settings.configure_orm(): Using pool settings. pool_size=5, pool_recycle=1800, pid=449
[2020-04-17 20:42:50,123] {default_celery.py:90} WARNING - You have configured a result_backend of redis://airflow-redis-service.default.svc.cluste
r.local:6379/0, it is highly recommended to use an alternative result_backend (i.e. a database).
[2020-04-17 20:42:50,127] {__init__.py:51} INFO - Using executor CeleryExecutor
[2020-04-17 20:42:50,433] {app.py:52} WARNING - Using default Composer Environment Variables. Overrides have not been applied.
[2020-04-17 20:42:50,440] {configuration.py:522} INFO - Reading the config from /etc/airflow/airflow.cfg
[2020-04-17 20:42:50,452] {configuration.py:522} INFO - Reading the config from /etc/airflow/airflow.cfg

Đặt gcs_bucket bằng lệnh sau, thay thế <your-bucket-name> bằng mã nhận dạng bộ chứa mà bạn đã ghi lại ở Bước 2. Nếu bạn làm theo đề xuất của chúng tôi, tên nhóm của bạn sẽ giống với mã dự án. Kết quả của bạn sẽ tương tự như lệnh trước.

gcloud composer environments run ${COMPOSER_INSTANCE} \
    --location us-central1 variables -- --set gcs_bucket gs://<your-bucket-name>

Đặt gce_zone bằng lệnh sau. Kết quả của bạn sẽ tương tự như các lệnh trước.

gcloud composer environments run ${COMPOSER_INSTANCE} \
    --location us-central1 variables -- --set gce_zone us-central1-a

(Không bắt buộc) Sử dụng gcloud để xem một biến

Để xem giá trị của một biến, hãy chạy lệnh phụ variables Airflow CLI bằng đối số get hoặc sử dụng giao diện người dùng Airflow.

Ví dụ:

gcloud composer environments run ${COMPOSER_INSTANCE} \
    --location us-central1 variables -- --get gcs_bucket

Bạn có thể thực hiện việc này với bất kỳ biến nào trong số 3 biến mà bạn vừa đặt: gcp_project, gcs_bucketgce_zone.

4. Quy trình công việc mẫu

Hãy xem mã cho DAG mà chúng ta sẽ dùng trong bước 5. Bạn chưa cần tải tệp xuống, chỉ cần làm theo hướng dẫn tại đây.

Có rất nhiều điều cần giải thích ở đây, vì vậy, hãy phân tích từng phần.

from airflow import models
from airflow.contrib.operators import dataproc_operator
from airflow.utils import trigger_rule

Chúng ta bắt đầu bằng một số lệnh nhập Airflow:

  • airflow.models – Cho phép chúng tôi truy cập và tạo dữ liệu trong cơ sở dữ liệu Airflow.
  • airflow.contrib.operators – Nơi các nhà khai thác trong cộng đồng sinh sống. Trong trường hợp này, chúng ta cần dataproc_operator để truy cập vào Cloud Dataproc API.
  • airflow.utils.trigger_rule – Để thêm quy tắc kích hoạt cho toán tử. Quy tắc kích hoạt cho phép kiểm soát chi tiết việc một toán tử có nên thực thi liên quan đến trạng thái của các toán tử mẹ hay không.
output_file = os.path.join(
    models.Variable.get('gcs_bucket'), 'wordcount',
    datetime.datetime.now().strftime('%Y%m%d-%H%M%S')) + os.sep

Thao tác này chỉ định vị trí của tệp đầu ra. Dòng đáng chú ý ở đây là models.Variable.get('gcs_bucket'). Dòng này sẽ lấy giá trị biến gcs_bucket từ cơ sở dữ liệu Airflow.

WORDCOUNT_JAR = (
    'file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar'
)

input_file = 'gs://pub/shakespeare/rose.txt'

wordcount_args = ['wordcount', input_file, output_file]
  • WORDCOUNT_JAR – Vị trí của tệp .jar mà cuối cùng chúng ta sẽ chạy trên cụm Cloud Dataproc. Nền tảng này đã được lưu trữ trên GCP cho bạn.
  • input_file – Vị trí của tệp chứa dữ liệu mà công việc Hadoop của chúng ta sẽ tính toán. Chúng tôi sẽ tải dữ liệu lên vị trí đó cùng nhau ở Bước 5.
  • wordcount_args – Các đối số mà chúng ta sẽ truyền vào tệp jar.
yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1),
    datetime.datetime.min.time())

Thao tác này sẽ cung cấp cho chúng ta một đối tượng datetime tương đương biểu thị nửa đêm của ngày hôm trước. Ví dụ: nếu được thực thi lúc 11:00 ngày 4 tháng 3, đối tượng ngày giờ sẽ biểu thị 00:00 ngày 3 tháng 3. Điều này liên quan đến cách Airflow xử lý việc lập lịch. Bạn có thể xem thêm thông tin về vấn đề này tại đây.

default_dag_args = {
    'start_date': yesterday,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'project_id': models.Variable.get('gcp_project')
}

Bạn nên cung cấp biến default_dag_args dưới dạng từ điển bất cứ khi nào tạo một DAG mới:

  • 'email_on_failure' – Cho biết có nên gửi cảnh báo qua email khi một tác vụ không thành công hay không
  • 'email_on_retry' – Cho biết liệu có nên gửi cảnh báo qua email khi một tác vụ được thử lại hay không
  • 'retries' – Cho biết số lần thử lại mà Airflow nên thực hiện trong trường hợp DAG gặp lỗi
  • 'retry_delay' – Cho biết khoảng thời gian Airflow sẽ đợi trước khi thử lại
  • 'project_id' – Cho DAG biết Mã dự án GCP cần liên kết với DAG. Mã này sẽ cần thiết sau này với Dataproc Operator
with models.DAG(
        'composer_hadoop_tutorial',
        schedule_interval=datetime.timedelta(days=1),
        default_args=default_dag_args) as dag:

Việc sử dụng with models.DAG sẽ yêu cầu tập lệnh đưa mọi thứ bên dưới vào cùng một DAG. Chúng ta cũng thấy 3 đối số được truyền vào:

  • Tham số đầu tiên là một chuỗi, đây là tên mà bạn sẽ đặt cho DAG mà bạn đang tạo. Trong trường hợp này, chúng ta sẽ sử dụng composer_hadoop_tutorial.
  • schedule_interval – Một đối tượng datetime.timedelta, ở đây chúng ta đã đặt thành một ngày. Điều này có nghĩa là DAG này sẽ cố gắng thực thi mỗi ngày một lần sau 'start_date' được đặt trước đó trong 'default_dag_args'
  • default_args – Từ điển mà chúng ta đã tạo trước đó, chứa các đối số mặc định cho DAG

Tạo một Cụm Dataproc

Tiếp theo, chúng ta sẽ tạo một dataproc_operator.DataprocClusterCreateOperator để tạo Cụm Cloud Dataproc.

    create_dataproc_cluster = dataproc_operator.DataprocClusterCreateOperator(
        task_id='create_dataproc_cluster',
        cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
        num_workers=2,
        zone=models.Variable.get('gce_zone'),
        master_machine_type='n1-standard-1',
        worker_machine_type='n1-standard-1')

Trong toán tử này, chúng ta thấy một số đối số, tất cả trừ đối số đầu tiên đều dành riêng cho toán tử này:

  • task_id – Giống như trong BashOperator, đây là tên mà chúng ta chỉ định cho toán tử, có thể xem được từ giao diện người dùng Airflow
  • cluster_name – Tên mà chúng tôi chỉ định cho cụm Cloud Dataproc. Ở đây, chúng tôi đặt tên là composer-hadoop-tutorial-cluster-{{ ds_nodash }} (xem hộp thông tin để biết thêm thông tin bổ sung không bắt buộc)
  • num_workers – Số lượng worker mà chúng tôi phân bổ cho cụm Cloud Dataproc
  • zone – Khu vực địa lý nơi chúng ta muốn cụm lưu trú, được lưu trong cơ sở dữ liệu Airflow. Thao tác này sẽ đọc biến 'gce_zone' mà chúng ta đã đặt ở Bước 3
  • master_machine_type – Loại máy mà chúng ta muốn phân bổ cho máy chủ chính Cloud Dataproc
  • worker_machine_type – Loại máy mà chúng ta muốn phân bổ cho worker Cloud Dataproc

Gửi một công việc Apache Hadoop

dataproc_operator.DataProcHadoopOperator cho phép chúng ta gửi một công việc đến một cụm Cloud Dataproc.

    run_dataproc_hadoop = dataproc_operator.DataProcHadoopOperator(
        task_id='run_dataproc_hadoop',
        main_jar=WORDCOUNT_JAR,
        cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
        arguments=wordcount_args)

Chúng tôi cung cấp một số tham số:

  • task_id – Tên mà chúng tôi chỉ định cho phần này của DAG
  • main_jar – Vị trí của tệp .jar mà chúng ta muốn chạy trên cụm
  • cluster_name – Tên của cụm để chạy công việc, bạn sẽ nhận thấy tên này giống hệt với tên mà chúng ta tìm thấy trong toán tử trước đó
  • arguments – Các đối số được truyền vào tệp jar, như khi bạn thực thi tệp .jar từ dòng lệnh

Xoá Cụm

Thao tác cuối cùng mà chúng ta sẽ tạo là dataproc_operator.DataprocClusterDeleteOperator.

    delete_dataproc_cluster = dataproc_operator.DataprocClusterDeleteOperator(
        task_id='delete_dataproc_cluster',
        cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
        trigger_rule=trigger_rule.TriggerRule.ALL_DONE)

Đúng như tên gọi, toán tử này sẽ xoá một cụm Cloud Dataproc nhất định. Chúng ta thấy 3 đối số ở đây:

  • task_id – Giống như trong BashOperator, đây là tên mà chúng ta chỉ định cho toán tử, có thể xem được từ giao diện người dùng Airflow
  • cluster_name – Tên mà chúng tôi chỉ định cho cụm Cloud Dataproc. Ở đây, chúng ta đặt tên là composer-hadoop-tutorial-cluster-{{ ds_nodash }} (xem hộp thông tin sau phần "Tạo một cụm Dataproc" để biết thêm thông tin bổ sung không bắt buộc)
  • trigger_rule – Chúng ta đã đề cập ngắn gọn đến Quy tắc kích hoạt trong quá trình nhập ở đầu bước này, nhưng ở đây chúng ta có một quy tắc đang hoạt động. Theo mặc định, một toán tử Airflow sẽ không thực thi trừ phi tất cả các toán tử nguồn của nó đã hoàn tất thành công. Quy tắc kích hoạt ALL_DONE chỉ yêu cầu tất cả các toán tử nguồn đã hoàn tất, bất kể có thành công hay không. Ở đây, điều này có nghĩa là ngay cả khi công việc Hadoop không thành công, chúng ta vẫn muốn huỷ cụm.
  create_dataproc_cluster >> run_dataproc_hadoop >> delete_dataproc_cluster

Cuối cùng, chúng ta muốn các toán tử này thực thi theo một thứ tự cụ thể và chúng ta có thể biểu thị điều này bằng cách sử dụng các toán tử dịch bit của Python. Trong trường hợp này, create_dataproc_cluster sẽ luôn thực thi trước, sau đó là run_dataproc_hadoop và cuối cùng là delete_dataproc_cluster.

Kết hợp tất cả lại với nhau, mã sẽ có dạng như sau:

# Copyright 2018 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# [START composer_hadoop_tutorial]
"""Example Airflow DAG that creates a Cloud Dataproc cluster, runs the Hadoop
wordcount example, and deletes the cluster.

This DAG relies on three Airflow variables
https://airflow.apache.org/concepts.html#variables
* gcp_project - Google Cloud Project to use for the Cloud Dataproc cluster.
* gce_zone - Google Compute Engine zone where Cloud Dataproc cluster should be
  created.
* gcs_bucket - Google Cloud Storage bucket to use for result of Hadoop job.
  See https://cloud.google.com/storage/docs/creating-buckets for creating a
  bucket.
"""

import datetime
import os

from airflow import models
from airflow.contrib.operators import dataproc_operator
from airflow.utils import trigger_rule

# Output file for Cloud Dataproc job.
output_file = os.path.join(
    models.Variable.get('gcs_bucket'), 'wordcount',
    datetime.datetime.now().strftime('%Y%m%d-%H%M%S')) + os.sep
# Path to Hadoop wordcount example available on every Dataproc cluster.
WORDCOUNT_JAR = (
    'file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar'
)
# Arguments to pass to Cloud Dataproc job.
input_file = 'gs://pub/shakespeare/rose.txt'

wordcount_args = ['wordcount', input_file, output_file]

yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1),
    datetime.datetime.min.time())

default_dag_args = {
    # Setting start date as yesterday starts the DAG immediately when it is
    # detected in the Cloud Storage bucket.
    'start_date': yesterday,
    # To email on failure or retry set 'email' arg to your email and enable
    # emailing here.
    'email_on_failure': False,
    'email_on_retry': False,
    # If a task fails, retry it once after waiting at least 5 minutes
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'project_id': models.Variable.get('gcp_project')
}

# [START composer_hadoop_schedule]
with models.DAG(
        'composer_hadoop_tutorial',
        # Continue to run DAG once per day
        schedule_interval=datetime.timedelta(days=1),
        default_args=default_dag_args) as dag:
    # [END composer_hadoop_schedule]

    # Create a Cloud Dataproc cluster.
    create_dataproc_cluster = dataproc_operator.DataprocClusterCreateOperator(
        task_id='create_dataproc_cluster',
        # Give the cluster a unique name by appending the date scheduled.
        # See https://airflow.apache.org/code.html#default-variables
        cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
        num_workers=2,
        zone=models.Variable.get('gce_zone'),
        master_machine_type='n1-standard-1',
        worker_machine_type='n1-standard-1')

    # Run the Hadoop wordcount example installed on the Cloud Dataproc cluster
    # master node.
    run_dataproc_hadoop = dataproc_operator.DataProcHadoopOperator(
        task_id='run_dataproc_hadoop',
        main_jar=WORDCOUNT_JAR,
        cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
        arguments=wordcount_args)

    # Delete Cloud Dataproc cluster.
    delete_dataproc_cluster = dataproc_operator.DataprocClusterDeleteOperator(
        task_id='delete_dataproc_cluster',
        cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
        # Setting trigger_rule to ALL_DONE causes the cluster to be deleted
        # even if the Dataproc job fails.
        trigger_rule=trigger_rule.TriggerRule.ALL_DONE)

    # [START composer_hadoop_steps]
    # Define DAG dependencies.
    create_dataproc_cluster >> run_dataproc_hadoop >> delete_dataproc_cluster
    # [END composer_hadoop_steps]

# [END composer_hadoop]

5. Tải tệp Airflow lên Cloud Storage

Sao chép DAG vào thư mục /dags

  1. Trước tiên, hãy mở Cloud Shell. Cloud SDK đã được cài đặt sẵn trong Cloud Shell để thuận tiện cho bạn.
  2. Tạo bản sao kho lưu trữ mẫu python và thay đổi thành thư mục composer/workflows
git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git && cd python-docs-samples/composer/workflows
  1. Chạy lệnh sau để đặt tên thư mục DAG thành một biến môi trường
DAGS_FOLDER=$(gcloud composer environments describe ${COMPOSER_INSTANCE} \
--location us-central1 --format="value(config.dagGcsPrefix)")
  1. Chạy lệnh gsutil sau đây để sao chép mã hướng dẫn vào nơi thư mục /dags được tạo
gsutil cp hadoop_tutorial.py $DAGS_FOLDER

Đầu ra của bạn sẽ có dạng như sau:

Copying file://hadoop_tutorial.py [Content-Type=text/x-python]...
/ [1 files][  4.1 KiB/  4.1 KiB]
Operation completed over 1 objects/4.1 KiB.

6. Sử dụng giao diện người dùng Airflow

Cách truy cập vào giao diện web Airflow bằng bảng điều khiển GCP:

  1. Mở trang Environments (Môi trường).
  2. Trong cột Máy chủ web Airflow cho môi trường, hãy nhấp vào biểu tượng cửa sổ mới. Giao diện người dùng web Airflow sẽ mở trong một cửa sổ trình duyệt mới.

Để biết thông tin về giao diện người dùng Airflow, hãy xem phần Truy cập vào giao diện web.

Xem các biến

Các biến mà bạn đã đặt trước đó sẽ được duy trì trong môi trường của bạn. Bạn có thể xem các biến bằng cách chọn Admin > Variables (Quản trị > Biến) trên thanh trình đơn của giao diện người dùng Airflow.

Thẻ List (Danh sách) được chọn và cho thấy một bảng có các khoá và giá trị sau: khoá: gcp_project, giá trị: project-id khoá: gcs_bucket, giá trị: gs://bucket-name khoá: gce_zone, giá trị: zone

Khám phá các lần chạy DAG

Khi bạn tải tệp DAG lên thư mục dags trong Cloud Storage, Cloud Composer sẽ phân tích cú pháp tệp đó. Nếu không có lỗi, tên của quy trình sẽ xuất hiện trong danh sách DAG và quy trình sẽ được đưa vào hàng đợi để chạy ngay lập tức. Để xem DAG, hãy nhấp vào DAG ở đầu trang.

84a29c71f20bff98.png

Nhấp vào composer_hadoop_tutorial để mở trang thông tin chi tiết DAG. Trang này có hình ảnh minh hoạ các nhiệm vụ và phần phụ thuộc trong quy trình làm việc.

f4f1663c7a37f47c.png

Giờ đây, trong thanh công cụ, hãy nhấp vào Chế độ xem biểu đồ rồi di chuột lên hình ảnh của từng việc để xem trạng thái của việc đó. Xin lưu ý rằng đường viền xung quanh mỗi tác vụ cũng cho biết trạng thái (đường viền màu xanh lục = đang chạy; màu đỏ = không thành công, v.v.).

4c5a0c6fa9f88513.png

Cách chạy lại quy trình công việc từ Chế độ xem biểu đồ:

  1. Trong chế độ xem biểu đồ của giao diện người dùng Airflow, hãy nhấp vào hình ảnh create_dataproc_cluster.
  2. Nhấp vào Xoá để đặt lại 3 việc cần làm, rồi nhấp vào OK để xác nhận.

fd1b23b462748f47.png

Bạn cũng có thể kiểm tra trạng thái và kết quả của quy trình composer-hadoop-tutorial bằng cách chuyển đến các trang sau trên GCP Console:

  • Cụm Cloud Dataproc để giám sát quá trình tạo và xoá cụm. Xin lưu ý rằng cụm do quy trình làm việc tạo ra chỉ tồn tại trong thời gian diễn ra quy trình làm việc và sẽ bị xoá trong quá trình thực hiện tác vụ cuối cùng của quy trình làm việc.
  • Công việc Cloud Dataproc để xem hoặc giám sát công việc đếm từ của Apache Hadoop. Nhấp vào ID công việc để xem đầu ra của nhật ký công việc.
  • Cloud Storage Browser để xem kết quả của số lượng từ trong thư mục wordcount trong bộ chứa Cloud Storage mà bạn đã tạo cho lớp học lập trình này.

7. Dọn dẹp

Để tránh phát sinh các khoản phí cho tài khoản GCP của bạn đối với các tài nguyên được dùng trong lớp học lập trình này, hãy làm như sau:

  1. (Không bắt buộc) Để lưu dữ liệu, hãy tải dữ liệu xuống từ bộ chứa Cloud Storage cho môi trường Cloud Composer và bộ chứa lưu trữ mà bạn đã tạo cho lớp học lập trình này.
  2. Xoá bộ chứa Cloud Storage mà bạn đã tạo cho lớp học lập trình này.
  3. Xoá bộ chứa Cloud Storage cho môi trường.
  4. Xoá môi trường Cloud Composer. Xin lưu ý rằng việc xoá môi trường sẽ không xoá bộ chứa lưu trữ cho môi trường đó.

Bạn cũng có thể xoá dự án (không bắt buộc):

  1. Trong Bảng điều khiển của GCP, hãy chuyển đến trang Dự án.
  2. Trong danh sách dự án, hãy chọn dự án bạn muốn xoá rồi nhấp vào Xoá.
  3. Trong hộp này, hãy nhập mã dự án rồi nhấp vào Tắt để xoá dự án.