การเรียกใช้งานจำนวนคำของ Hadoop บนคลัสเตอร์ Dataproc

1. บทนำ

เวิร์กโฟลว์เป็นกรณีการใช้งานทั่วไปในการวิเคราะห์ข้อมูล ซึ่งเกี่ยวข้องกับการนำเข้า การเปลี่ยนรูป และการวิเคราะห์ข้อมูลเพื่อค้นหาข้อมูลที่มีความหมายภายใน ใน Google Cloud Platform เครื่องมือสำหรับจัดระเบียบเวิร์กโฟลว์คือ Cloud Composer ซึ่งเป็นเครื่องมือเวิร์กโฟลว์โอเพนซอร์สยอดนิยม Apache Airflow เวอร์ชันที่โฮสต์ ในแล็บนี้ คุณจะได้ใช้ Cloud Composer เพื่อสร้างเวิร์กโฟลว์อย่างง่ายที่จะสร้างคลัสเตอร์ Cloud Dataproc วิเคราะห์คลัสเตอร์โดยใช้ Cloud Dataproc และ Apache Hadoop จากนั้นลบคลัสเตอร์ Cloud Dataproc ในภายหลัง

Cloud Composer คืออะไร

Cloud Composer เป็นบริการจัดการเวิร์กโฟลว์แบบครบวงจรที่ช่วยให้คุณเขียน กำหนดเวลา และตรวจสอบไปป์ไลน์ที่ครอบคลุมระบบคลาวด์และศูนย์ข้อมูลในองค์กรได้ Cloud Composer สร้างขึ้นจากโปรเจ็กต์โอเพนซอร์ส Apache Airflow ยอดนิยมและดำเนินการโดยใช้ภาษาโปรแกรม Python จึงไม่มีการล็อกอินและใช้งานง่าย

การใช้ Cloud Composer แทนอินสแตนซ์ Apache Airflow ในเครื่องจะช่วยให้ผู้ใช้ได้รับประโยชน์จาก Airflow อย่างเต็มที่โดยไม่ต้องมีค่าใช้จ่ายในการติดตั้งหรือการจัดการ

Apache Airflow คืออะไร

Apache Airflow เป็นเครื่องมือโอเพนซอร์สที่ใช้ในการเขียนแบบเป็นโปรแกรม จัดกำหนดการ และตรวจสอบเวิร์กโฟลว์ คำศัพท์สำคัญบางคำที่ควรทราบเกี่ยวกับ Airflow ซึ่งคุณจะเห็นตลอดทั้งแล็บมีดังนี้

  • DAG - DAG (Directed Acyclic Graph) คือชุดของงานที่จัดระเบียบแล้วซึ่งคุณต้องการกำหนดเวลาและเรียกใช้ DAG ซึ่งเรียกอีกอย่างว่าเวิร์กโฟลว์ จะกำหนดไว้ในไฟล์ Python มาตรฐาน
  • โอเปอเรเตอร์ - โอเปอเรเตอร์อธิบายงานเดียวในเวิร์กโฟลว์

Cloud Dataproc คืออะไร

Cloud Dataproc คือบริการ Apache Spark และ Apache Hadoop ที่มีการจัดการครบวงจรของ Google Cloud Platform Cloud Dataproc ผสานรวมกับบริการอื่นๆ ของ GCP ได้อย่างง่ายดาย ทำให้คุณมีแพลตฟอร์มที่ทรงประสิทธิภาพและสมบูรณ์แบบสำหรับการประมวลผลข้อมูล การวิเคราะห์ และแมชชีนเลิร์นนิง

สิ่งที่คุณต้องทำ

Codelab นี้แสดงวิธีสร้างและเรียกใช้เวิร์กโฟลว์ Apache Airflow ใน Cloud Composer ซึ่งจะทำงานต่อไปนี้ให้เสร็จสมบูรณ์

  • สร้างคลัสเตอร์ Cloud Dataproc
  • เรียกใช้งาน Wordcount ของ Apache Hadoop ในคลัสเตอร์ และส่งออกผลลัพธ์ไปยัง Cloud Storage
  • ลบคลัสเตอร์

สิ่งที่คุณจะได้เรียนรู้

  • วิธีสร้างและเรียกใช้เวิร์กโฟลว์ Apache Airflow ใน Cloud Composer
  • วิธีใช้ Cloud Composer และ Cloud Dataproc เพื่อเรียกใช้การวิเคราะห์ในชุดข้อมูล
  • วิธีเข้าถึงสภาพแวดล้อม Cloud Composer ผ่าน Google Cloud Platform Console, Cloud SDK และเว็บอินเทอร์เฟซของ Airflow

สิ่งที่คุณต้องมี

  • บัญชี GCP
  • ความรู้พื้นฐานเกี่ยวกับ CLI
  • ความเข้าใจพื้นฐานเกี่ยวกับ Python

2. การตั้งค่า GCP

สร้างโปรเจ็กต์

เลือกหรือสร้างโปรเจ็กต์ Google Cloud Platform

จดรหัสโปรเจ็กต์ไว้เพื่อใช้ในขั้นตอนต่อๆ ไป

หากคุณสร้างโปรเจ็กต์ใหม่ คุณจะเห็นรหัสโปรเจ็กต์อยู่ใต้ชื่อโปรเจ็กต์ในหน้าการสร้าง

หากสร้างโปรเจ็กต์แล้ว คุณจะดูรหัสได้ในหน้าแรกของคอนโซลในการ์ดข้อมูลโปรเจ็กต์

เปิดใช้ API

เปิดใช้ Cloud Composer, Cloud Dataproc และ Cloud Storage API เมื่อเปิดใช้แล้ว คุณสามารถข้ามปุ่มที่ระบุว่า "ไปที่ข้อมูลเข้าสู่ระบบ" และไปยังขั้นตอนถัดไปของบทแนะนำได้

สร้างสภาพแวดล้อมคอมโพสเซอร์

สร้างสภาพแวดล้อม Cloud Composer ที่มีการกำหนดค่าต่อไปนี้

  • ชื่อ: my-composer-environment
  • สถานที่ตั้ง: us-central1
  • โซน: us-central1-a

การกำหนดค่าอื่นๆ ทั้งหมดสามารถคงค่าเริ่มต้นไว้ได้ คลิก "สร้าง" ที่ด้านล่าง

สร้างที่เก็บข้อมูล Cloud Storage

สร้างที่เก็บข้อมูล Cloud Storage ในโปรเจ็กต์ด้วยการกำหนดค่าต่อไปนี้

  • ชื่อ: <your-project-id>
  • คลาสพื้นที่เก็บข้อมูลเริ่มต้น: หลายภูมิภาค
  • สถานที่: สหรัฐอเมริกา
  • โมเดลควบคุมการเข้าถึง: แบบละเอียด

กด "สร้าง" เมื่อพร้อม

3. การตั้งค่า Apache Airflow

การดูข้อมูลสภาพแวดล้อมของ Composer

ในคอนโซล GCP ให้เปิดหน้าสภาพแวดล้อม

คลิกชื่อสภาพแวดล้อมเพื่อดูรายละเอียด

หน้ารายละเอียดสภาพแวดล้อมจะแสดงข้อมูล เช่น URL ของอินเทอร์เฟซเว็บ Airflow, รหัสคลัสเตอร์ Google Kubernetes Engine, ชื่อของที่เก็บข้อมูล Cloud Storage และเส้นทางสำหรับโฟลเดอร์ /dags

ใน Airflow DAG (Directed Acyclic Graph) คือชุดของงานที่จัดระเบียบแล้วซึ่งคุณต้องการกำหนดเวลาและเรียกใช้ DAG หรือที่เรียกว่าเวิร์กโฟลว์จะกำหนดไว้ในไฟล์ Python มาตรฐาน Cloud Composer จะกำหนดเวลาเฉพาะ DAG ในโฟลเดอร์ /dags โฟลเดอร์ /dags อยู่ในที่เก็บข้อมูล Cloud Storage ที่ Cloud Composer สร้างขึ้นโดยอัตโนมัติเมื่อคุณสร้างสภาพแวดล้อม

การตั้งค่าตัวแปรสภาพแวดล้อมของ Apache Airflow

ตัวแปร Apache Airflow เป็นแนวคิดเฉพาะของ Airflow ซึ่งแตกต่างจากตัวแปรสภาพแวดล้อม ในขั้นตอนนี้ คุณจะต้องตั้งค่าตัวแปร Airflow 3 รายการต่อไปนี้ ได้แก่ gcp_project, gcs_bucket และ gce_zone

การใช้ gcloud เพื่อตั้งค่าตัวแปร

ก่อนอื่น ให้เปิด Cloud Shell ซึ่งมี Cloud SDK ติดตั้งไว้ให้คุณอย่างสะดวก

ตั้งค่าตัวแปรสภาพแวดล้อม COMPOSER_INSTANCE เป็นชื่อสภาพแวดล้อมของ Composer

COMPOSER_INSTANCE=my-composer-environment

หากต้องการตั้งค่าตัวแปร Airflow โดยใช้เครื่องมือบรรทัดคำสั่ง gcloud ให้ใช้คำสั่ง gcloud composer environments run กับคำสั่งย่อย variables gcloud composer คำสั่งนี้จะเรียกใช้คำสั่งย่อยของ Airflow CLI variables คำสั่งย่อยจะส่งอาร์กิวเมนต์ไปยังเครื่องมือบรรทัดคำสั่ง gcloud

คุณจะเรียกใช้คำสั่งนี้ 3 ครั้ง โดยแทนที่ตัวแปรด้วยตัวแปรที่เกี่ยวข้องกับโปรเจ็กต์

ตั้งค่า gcp_project โดยใช้คำสั่งต่อไปนี้ โดยแทนที่ <your-project-id> ด้วยรหัสโปรเจ็กต์ที่คุณจดไว้ในขั้นตอนที่ 2

gcloud composer environments run ${COMPOSER_INSTANCE} \
    --location us-central1 variables -- --set gcp_project <your-project-id>

เอาต์พุตจะมีลักษณะดังนี้

kubeconfig entry generated for us-central1-my-composer-env-123abc-gke.
Executing within the following Kubernetes cluster namespace: composer-1-10-0-airflow-1-10-2-123abc
[2020-04-17 20:42:49,713] {settings.py:176} INFO - settings.configure_orm(): Using pool settings. pool_size=5, pool_recycle=1800, pid=449
[2020-04-17 20:42:50,123] {default_celery.py:90} WARNING - You have configured a result_backend of redis://airflow-redis-service.default.svc.cluste
r.local:6379/0, it is highly recommended to use an alternative result_backend (i.e. a database).
[2020-04-17 20:42:50,127] {__init__.py:51} INFO - Using executor CeleryExecutor
[2020-04-17 20:42:50,433] {app.py:52} WARNING - Using default Composer Environment Variables. Overrides have not been applied.
[2020-04-17 20:42:50,440] {configuration.py:522} INFO - Reading the config from /etc/airflow/airflow.cfg
[2020-04-17 20:42:50,452] {configuration.py:522} INFO - Reading the config from /etc/airflow/airflow.cfg

ตั้งค่า gcs_bucket โดยใช้คำสั่งต่อไปนี้ แทนที่ <your-bucket-name> ด้วยรหัส Bucket ที่คุณจดไว้ในขั้นตอนที่ 2 หากคุณทำตามคำแนะนำของเรา ชื่อที่เก็บข้อมูลจะเหมือนกับรหัสโปรเจ็กต์ เอาต์พุตจะคล้ายกับคำสั่งก่อนหน้า

gcloud composer environments run ${COMPOSER_INSTANCE} \
    --location us-central1 variables -- --set gcs_bucket gs://<your-bucket-name>

ตั้งค่า gce_zone โดยใช้คำสั่งต่อไปนี้ เอาต์พุตจะคล้ายกับคำสั่งก่อนหน้า

gcloud composer environments run ${COMPOSER_INSTANCE} \
    --location us-central1 variables -- --set gce_zone us-central1-a

(ไม่บังคับ) การใช้ gcloud เพื่อดูตัวแปร

หากต้องการดูค่าของตัวแปร ให้เรียกใช้คำสั่งย่อยของ Airflow CLI variables ด้วยอาร์กิวเมนต์ get หรือใช้ Airflow UI

เช่น

gcloud composer environments run ${COMPOSER_INSTANCE} \
    --location us-central1 variables -- --get gcs_bucket

คุณทำได้โดยใช้ตัวแปร 3 ตัวที่คุณเพิ่งตั้งค่า ได้แก่ gcp_project, gcs_bucket และ gce_zone

4. เวิร์กโฟลว์ตัวอย่าง

มาดูโค้ดสำหรับ DAG ที่เราจะใช้ในขั้นตอนที่ 5 กัน คุณยังไม่ต้องกังวลเรื่องการดาวน์โหลดไฟล์ เพียงทำตามขั้นตอนที่นี่

มีหลายเรื่องที่ต้องชี้แจงในกรณีนี้ เรามาเจาะลึกกันหน่อย

from airflow import models
from airflow.contrib.operators import dataproc_operator
from airflow.utils import trigger_rule

เราจะเริ่มต้นด้วยการนำเข้า Airflow ดังนี้

  • airflow.models - ช่วยให้เราเข้าถึงและสร้างข้อมูลในฐานข้อมูล Airflow ได้
  • airflow.contrib.operators - ที่ซึ่งผู้ปฏิบัติงานจากชุมชนอาศัยอยู่ ในกรณีนี้ เราต้องใช้ dataproc_operator เพื่อเข้าถึง Cloud Dataproc API
  • airflow.utils.trigger_rule - สำหรับการเพิ่มกฎทริกเกอร์ให้กับโอเปอเรเตอร์ของเรา กฎทริกเกอร์ช่วยให้ควบคุมได้อย่างละเอียดว่าผู้ดำเนินการควรดำเนินการตามสถานะขององค์ประกอบระดับบนสุดหรือไม่
output_file = os.path.join(
    models.Variable.get('gcs_bucket'), 'wordcount',
    datetime.datetime.now().strftime('%Y%m%d-%H%M%S')) + os.sep

ซึ่งจะระบุตำแหน่งของไฟล์เอาต์พุต บรรทัดที่สำคัญในที่นี้คือ models.Variable.get('gcs_bucket') ซึ่งจะดึงค่าตัวแปร gcs_bucket จากฐานข้อมูล Airflow

WORDCOUNT_JAR = (
    'file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar'
)

input_file = 'gs://pub/shakespeare/rose.txt'

wordcount_args = ['wordcount', input_file, output_file]
  • WORDCOUNT_JAR - ตำแหน่งของไฟล์ .jar ที่เราจะเรียกใช้ในคลัสเตอร์ Cloud Dataproc ในที่สุด เราโฮสต์ไว้ใน GCP ให้คุณแล้ว
  • input_file - ตำแหน่งของไฟล์ที่มีข้อมูลที่งาน Hadoop จะคำนวณในที่สุด เราจะอัปโหลดข้อมูลไปยังตำแหน่งดังกล่าวพร้อมกันในขั้นตอนที่ 5
  • wordcount_args - อาร์กิวเมนต์ที่เราจะส่งไปยังไฟล์ JAR
yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1),
    datetime.datetime.min.time())

ซึ่งจะทำให้เราได้ออบเจ็กต์ datetime ที่เทียบเท่ากับเวลาเที่ยงคืนของวันก่อนหน้า เช่น หากดำเนินการนี้ในเวลา 11:00 น. ของวันที่ 4 มีนาคม ออบเจ็กต์ datetime จะแสดงเวลา 00:00 น. ของวันที่ 3 มีนาคม ซึ่งเกี่ยวข้องกับวิธีที่ Airflow จัดการการกำหนดเวลา ดูข้อมูลเพิ่มเติมเกี่ยวกับเรื่องนี้ได้ที่นี่

default_dag_args = {
    'start_date': yesterday,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'project_id': models.Variable.get('gcp_project')
}

คุณควรระบุตัวแปร default_dag_args ในรูปแบบของพจนานุกรมทุกครั้งที่สร้าง DAG ใหม่

  • 'email_on_failure' - ระบุว่าควรส่งการแจ้งเตือนทางอีเมลเมื่องานล้มเหลวหรือไม่
  • 'email_on_retry' - ระบุว่าควรส่งการแจ้งเตือนทางอีเมลเมื่อมีการลองทำงานอีกครั้งหรือไม่
  • 'retries' - ระบุจำนวนครั้งที่ Airflow ควรพยายามลองอีกครั้งในกรณีที่ DAG ล้มเหลว
  • 'retry_delay' - ระบุระยะเวลาที่ Airflow ควรรอก่อนที่จะลองอีกครั้ง
  • 'project_id' - บอก DAG ว่าจะเชื่อมโยงกับรหัสโปรเจ็กต์ GCP ใด ซึ่งจะต้องใช้ในภายหลังกับ Dataproc Operator
with models.DAG(
        'composer_hadoop_tutorial',
        schedule_interval=datetime.timedelta(days=1),
        default_args=default_dag_args) as dag:

การใช้ with models.DAG จะบอกให้สคริปต์รวมทุกอย่างที่อยู่ด้านล่างไว้ใน DAG เดียวกัน นอกจากนี้ เรายังเห็นอาร์กิวเมนต์ 3 รายการที่ส่งเข้ามาด้วย

  • ตัวแรกคือสตริง ซึ่งเป็นชื่อที่จะตั้งให้กับ DAG ที่เรากำลังสร้าง ในกรณีนี้ เราใช้ composer_hadoop_tutorial
  • schedule_interval - ออบเจ็กต์ datetime.timedelta ซึ่งในที่นี้เราตั้งค่าเป็น 1 วัน ซึ่งหมายความว่า DAG นี้จะพยายามดำเนินการวันละครั้งหลังจาก 'start_date' ที่ตั้งค่าไว้ก่อนหน้านี้ใน 'default_dag_args'
  • default_args - พจนานุกรมที่เราสร้างไว้ก่อนหน้านี้ซึ่งมีอาร์กิวเมนต์เริ่มต้นสำหรับ DAG

สร้างคลัสเตอร์ Dataproc

จากนั้นเราจะสร้าง dataproc_operator.DataprocClusterCreateOperator ซึ่งจะสร้างคลัสเตอร์ Cloud Dataproc

    create_dataproc_cluster = dataproc_operator.DataprocClusterCreateOperator(
        task_id='create_dataproc_cluster',
        cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
        num_workers=2,
        zone=models.Variable.get('gce_zone'),
        master_machine_type='n1-standard-1',
        worker_machine_type='n1-standard-1')

ในโอเปอเรเตอร์นี้ เราจะเห็นอาร์กิวเมนต์ 2-3 รายการ ซึ่งทั้งหมดนี้ยกเว้นรายการแรกเป็นอาร์กิวเมนต์เฉพาะสำหรับโอเปอเรเตอร์นี้

  • task_id - เช่นเดียวกับใน BashOperator นี่คือชื่อที่เรากำหนดให้กับโอเปอเรเตอร์ ซึ่งดูได้จาก UI ของ Airflow
  • cluster_name - ชื่อที่เรากำหนดให้กับคลัสเตอร์ Cloud Dataproc ในที่นี้ เราตั้งชื่อว่า composer-hadoop-tutorial-cluster-{{ ds_nodash }} (ดูข้อมูลเพิ่มเติมที่ไม่บังคับในกล่องข้อมูล)
  • num_workers - จำนวนผู้ปฏิบัติงานที่เราจัดสรรให้กับคลัสเตอร์ Cloud Dataproc
  • zone - ภูมิภาคทางภูมิศาสตร์ที่เราต้องการให้คลัสเตอร์อยู่ ซึ่งบันทึกไว้ในฐานข้อมูล Airflow ซึ่งจะอ่านตัวแปร 'gce_zone' ที่เราตั้งค่าไว้ในขั้นตอนที่ 3
  • master_machine_type - ประเภทเครื่องที่เราต้องการจัดสรรให้กับมาสเตอร์ Cloud Dataproc
  • worker_machine_type - ประเภทเครื่องที่เราต้องการจัดสรรให้กับผู้ปฏิบัติงาน Cloud Dataproc

ส่งงาน Apache Hadoop

dataproc_operator.DataProcHadoopOperator ช่วยให้เราส่งงานไปยังคลัสเตอร์ Cloud Dataproc ได้

    run_dataproc_hadoop = dataproc_operator.DataProcHadoopOperator(
        task_id='run_dataproc_hadoop',
        main_jar=WORDCOUNT_JAR,
        cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
        arguments=wordcount_args)

เรามีพารามิเตอร์หลายรายการ ดังนี้

  • task_id - ชื่อที่เรากำหนดให้กับส่วนนี้ของ DAG
  • main_jar - ตำแหน่งของไฟล์ .jar ที่เราต้องการเรียกใช้กับคลัสเตอร์
  • cluster_name - ชื่อของคลัสเตอร์ที่จะเรียกใช้งานเทียบกับงาน ซึ่งคุณจะเห็นว่าเหมือนกับที่เราพบในตัวดำเนินการก่อนหน้า
  • arguments - อาร์กิวเมนต์ที่ส่งผ่านไปยังไฟล์ jar เหมือนกับที่คุณจะทำหากเรียกใช้ไฟล์ .jar จากบรรทัดคำสั่ง

ลบคลัสเตอร์

ตัวดำเนินการสุดท้ายที่เราจะสร้างคือ dataproc_operator.DataprocClusterDeleteOperator

    delete_dataproc_cluster = dataproc_operator.DataprocClusterDeleteOperator(
        task_id='delete_dataproc_cluster',
        cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
        trigger_rule=trigger_rule.TriggerRule.ALL_DONE)

ดังที่ชื่อระบุ โอเปอเรเตอร์นี้จะลบคลัสเตอร์ Cloud Dataproc ที่ระบุ เราเห็นข้อโต้แย้ง 3 ประการที่นี่

  • task_id - เช่นเดียวกับใน BashOperator นี่คือชื่อที่เรากำหนดให้กับโอเปอเรเตอร์ ซึ่งดูได้จาก UI ของ Airflow
  • cluster_name - ชื่อที่เรากำหนดให้กับคลัสเตอร์ Cloud Dataproc ในที่นี้ เราตั้งชื่อว่า composer-hadoop-tutorial-cluster-{{ ds_nodash }} (ดูข้อมูลเพิ่มเติมที่ไม่บังคับได้ในกล่องข้อมูลหลังจาก "สร้างคลัสเตอร์ Dataproc")
  • trigger_rule - เราได้กล่าวถึงกฎทริกเกอร์สั้นๆ ในระหว่างการนำเข้าที่จุดเริ่มต้นของขั้นตอนนี้ แต่ที่นี่เรามีกฎทริกเกอร์ที่ใช้งานจริง โดยค่าเริ่มต้น โอเปอเรเตอร์ Airflow จะไม่ดำเนินการจนกว่าโอเปอเรเตอร์ต้นทางทั้งหมดจะทำงานเสร็จสมบูรณ์ ALL_DONE กฎทริกเกอร์กำหนดให้โอเปอเรเตอร์ต้นทางทั้งหมดต้องเสร็จสมบูรณ์ ไม่ว่าโอเปอเรเตอร์เหล่านั้นจะทำงานสำเร็จหรือไม่ก็ตาม ในกรณีนี้ หมายความว่าแม้ว่างาน Hadoop จะล้มเหลว เราก็ยังต้องการปิดคลัสเตอร์
  create_dataproc_cluster >> run_dataproc_hadoop >> delete_dataproc_cluster

สุดท้ายนี้ เราต้องการให้โอเปอเรเตอร์เหล่านี้ทำงานตามลำดับที่เฉพาะเจาะจง และเราสามารถระบุได้โดยใช้โอเปอเรเตอร์การเลื่อนบิตของ Python ในกรณีนี้ create_dataproc_cluster จะทํางานก่อนเสมอ ตามด้วย run_dataproc_hadoop และสุดท้ายคือ delete_dataproc_cluster

เมื่อรวมทุกอย่างเข้าด้วยกัน โค้ดจะมีลักษณะดังนี้

# Copyright 2018 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# [START composer_hadoop_tutorial]
"""Example Airflow DAG that creates a Cloud Dataproc cluster, runs the Hadoop
wordcount example, and deletes the cluster.

This DAG relies on three Airflow variables
https://airflow.apache.org/concepts.html#variables
* gcp_project - Google Cloud Project to use for the Cloud Dataproc cluster.
* gce_zone - Google Compute Engine zone where Cloud Dataproc cluster should be
  created.
* gcs_bucket - Google Cloud Storage bucket to use for result of Hadoop job.
  See https://cloud.google.com/storage/docs/creating-buckets for creating a
  bucket.
"""

import datetime
import os

from airflow import models
from airflow.contrib.operators import dataproc_operator
from airflow.utils import trigger_rule

# Output file for Cloud Dataproc job.
output_file = os.path.join(
    models.Variable.get('gcs_bucket'), 'wordcount',
    datetime.datetime.now().strftime('%Y%m%d-%H%M%S')) + os.sep
# Path to Hadoop wordcount example available on every Dataproc cluster.
WORDCOUNT_JAR = (
    'file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar'
)
# Arguments to pass to Cloud Dataproc job.
input_file = 'gs://pub/shakespeare/rose.txt'

wordcount_args = ['wordcount', input_file, output_file]

yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1),
    datetime.datetime.min.time())

default_dag_args = {
    # Setting start date as yesterday starts the DAG immediately when it is
    # detected in the Cloud Storage bucket.
    'start_date': yesterday,
    # To email on failure or retry set 'email' arg to your email and enable
    # emailing here.
    'email_on_failure': False,
    'email_on_retry': False,
    # If a task fails, retry it once after waiting at least 5 minutes
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'project_id': models.Variable.get('gcp_project')
}

# [START composer_hadoop_schedule]
with models.DAG(
        'composer_hadoop_tutorial',
        # Continue to run DAG once per day
        schedule_interval=datetime.timedelta(days=1),
        default_args=default_dag_args) as dag:
    # [END composer_hadoop_schedule]

    # Create a Cloud Dataproc cluster.
    create_dataproc_cluster = dataproc_operator.DataprocClusterCreateOperator(
        task_id='create_dataproc_cluster',
        # Give the cluster a unique name by appending the date scheduled.
        # See https://airflow.apache.org/code.html#default-variables
        cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
        num_workers=2,
        zone=models.Variable.get('gce_zone'),
        master_machine_type='n1-standard-1',
        worker_machine_type='n1-standard-1')

    # Run the Hadoop wordcount example installed on the Cloud Dataproc cluster
    # master node.
    run_dataproc_hadoop = dataproc_operator.DataProcHadoopOperator(
        task_id='run_dataproc_hadoop',
        main_jar=WORDCOUNT_JAR,
        cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
        arguments=wordcount_args)

    # Delete Cloud Dataproc cluster.
    delete_dataproc_cluster = dataproc_operator.DataprocClusterDeleteOperator(
        task_id='delete_dataproc_cluster',
        cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
        # Setting trigger_rule to ALL_DONE causes the cluster to be deleted
        # even if the Dataproc job fails.
        trigger_rule=trigger_rule.TriggerRule.ALL_DONE)

    # [START composer_hadoop_steps]
    # Define DAG dependencies.
    create_dataproc_cluster >> run_dataproc_hadoop >> delete_dataproc_cluster
    # [END composer_hadoop_steps]

# [END composer_hadoop]

5. อัปโหลดไฟล์ Airflow ไปยัง Cloud Storage

คัดลอก DAG ไปยังโฟลเดอร์ /dags

  1. ก่อนอื่น ให้เปิด Cloud Shell ซึ่งมี Cloud SDK ติดตั้งไว้ให้คุณอย่างสะดวก
  2. โคลนที่เก็บตัวอย่าง Python และเปลี่ยนเป็นไดเรกทอรี composer/workflows
git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git && cd python-docs-samples/composer/workflows
  1. เรียกใช้คำสั่งต่อไปนี้เพื่อตั้งชื่อโฟลเดอร์ DAG เป็นตัวแปรสภาพแวดล้อม
DAGS_FOLDER=$(gcloud composer environments describe ${COMPOSER_INSTANCE} \
--location us-central1 --format="value(config.dagGcsPrefix)")
  1. เรียกใช้gsutilคำสั่งต่อไปนี้เพื่อคัดลอกโค้ดบทแนะนำไปยังตำแหน่งที่สร้างโฟลเดอร์ /dags
gsutil cp hadoop_tutorial.py $DAGS_FOLDER

เอาต์พุตจะมีลักษณะดังนี้

Copying file://hadoop_tutorial.py [Content-Type=text/x-python]...
/ [1 files][  4.1 KiB/  4.1 KiB]
Operation completed over 1 objects/4.1 KiB.

6. การใช้ UI ของ Airflow

วิธีเข้าถึงอินเทอร์เฟซเว็บของ Airflow โดยใช้คอนโซล GCP

  1. เปิดหน้าสภาพแวดล้อม
  2. ในคอลัมน์ Airflow webserver ของสภาพแวดล้อม ให้คลิกไอคอนหน้าต่างใหม่ UI เว็บของ Airflow จะเปิดขึ้นในหน้าต่างเบราว์เซอร์ใหม่

ดูข้อมูลเกี่ยวกับ UI ของ Airflow ได้ที่การเข้าถึงอินเทอร์เฟซเว็บ

ดูตัวแปร

ระบบจะเก็บตัวแปรที่คุณตั้งค่าไว้ก่อนหน้านี้ไว้ในสภาพแวดล้อม คุณดูตัวแปรได้โดยเลือกผู้ดูแลระบบ > ตัวแปรจากแถบเมนู UI ของ Airflow

เลือกแท็บรายการและแสดงตารางที่มีคีย์และค่าต่อไปนี้ คีย์: gcp_project, ค่า: project-id คีย์: gcs_bucket, ค่า: gs://bucket-name คีย์: gce_zone, ค่า: zone

สำรวจการเรียกใช้ DAG

เมื่ออัปโหลดไฟล์ DAG ไปยังโฟลเดอร์ dags ใน Cloud Storage แล้ว Cloud Composer จะแยกวิเคราะห์ไฟล์ หากไม่พบข้อผิดพลาด ชื่อของเวิร์กโฟลว์จะปรากฏในรายการ DAG และเวิร์กโฟลว์จะเข้าคิวเพื่อเรียกใช้ทันที หากต้องการดู DAG ให้คลิก DAG ที่ด้านบนของหน้า

84a29c71f20bff98.png

คลิก composer_hadoop_tutorial เพื่อเปิดหน้ารายละเอียด DAG หน้านี้มีภาพกราฟิกที่แสดงถึงงานในเวิร์กโฟลว์และสิ่งที่ต้องใช้

f4f1663c7a37f47c.png

ตอนนี้ในแถบเครื่องมือ ให้คลิกมุมมองกราฟ แล้ววางเมาส์เหนือกราฟิกของแต่ละงานเพื่อดูสถานะ โปรดทราบว่าเส้นขอบรอบๆ แต่ละงานจะระบุสถานะด้วย (เส้นขอบสีเขียว = กำลังทำงาน, สีแดง = ไม่สำเร็จ ฯลฯ)

4c5a0c6fa9f88513.png

วิธีเรียกใช้เวิร์กโฟลว์อีกครั้งจากมุมมองกราฟ

  1. ในมุมมองกราฟของ UI ของ Airflow ให้คลิกกราฟิก create_dataproc_cluster
  2. คลิกล้างเพื่อรีเซ็ตงานทั้ง 3 รายการ แล้วคลิกตกลงเพื่อยืนยัน

fd1b23b462748f47.png

นอกจากนี้ คุณยังตรวจสอบสถานะและผลลัพธ์ของcomposer-hadoop-tutorialเวิร์กโฟลว์ได้โดยไปที่หน้า GCP Console ต่อไปนี้

  • คลัสเตอร์ Cloud Dataproc เพื่อตรวจสอบการสร้างและการลบคลัสเตอร์ โปรดทราบว่าคลัสเตอร์ที่สร้างโดยเวิร์กโฟลว์เป็นคลัสเตอร์ชั่วคราว ซึ่งจะอยู่เฉพาะในช่วงระยะเวลาของเวิร์กโฟลว์และจะถูกลบเป็นส่วนหนึ่งของงานเวิร์กโฟลว์สุดท้าย
  • งาน Cloud Dataproc เพื่อดูหรือตรวจสอบงาน Wordcount ของ Apache Hadoop คลิกรหัสงานเพื่อดูเอาต์พุตบันทึกของงาน
  • เบราว์เซอร์ Cloud Storage เพื่อดูผลลัพธ์ของ Wordcount ในโฟลเดอร์ wordcount ใน Bucket ของ Cloud Storage ที่คุณสร้างขึ้นสำหรับ Codelab นี้

7. ล้างข้อมูล

โปรดดำเนินการดังนี้เพื่อเลี่ยงไม่ให้เกิดการเรียกเก็บเงินกับบัญชี GCP สำหรับทรัพยากรที่ใช้ใน Codelab นี้

  1. (ไม่บังคับ) หากต้องการบันทึกข้อมูล ให้ดาวน์โหลดข้อมูลจาก Bucket ของ Cloud Storage สำหรับสภาพแวดล้อม Cloud Composer และ Bucket ที่เก็บข้อมูลที่คุณสร้างขึ้นสำหรับ Codelab นี้
  2. ลบ Bucket ของ Cloud Storage ที่คุณสร้างขึ้นสำหรับ Codelab นี้
  3. ลบที่เก็บข้อมูล Cloud Storage สำหรับสภาพแวดล้อม
  4. ลบสภาพแวดล้อม Cloud Composer โปรดทราบว่าการลบสภาพแวดล้อมจะไม่ลบบัคเก็ตพื้นที่เก็บข้อมูลสำหรับสภาพแวดล้อม

นอกจากนี้ คุณยังเลือกที่จะลบโปรเจกต์ได้ด้วย โดยทำดังนี้

  1. ในคอนโซล GCP ให้ไปที่หน้าโปรเจ็กต์
  2. ในรายการโปรเจ็กต์ ให้เลือกโปรเจ็กต์ที่ต้องการลบ แล้วคลิกลบ
  3. พิมพ์รหัสโปรเจ็กต์ในช่อง แล้วคลิกปิดเพื่อลบโปรเจ็กต์