การเตรียมข้อมูล BigQuery ก่อนประมวลผลด้วย PySpark ใน Dataproc

1. ภาพรวม

โค้ดแล็บนี้จะอธิบายวิธีสร้างไปป์ไลน์การประมวลผลข้อมูลโดยใช้ Apache Spark กับ Dataproc ใน Google Cloud Platform การอ่านข้อมูลจากตำแหน่งพื้นที่เก็บข้อมูลหนึ่ง เปลี่ยนรูปแบบข้อมูล และเขียนลงในตำแหน่งพื้นที่เก็บข้อมูลอื่นเป็นกรณีการใช้งานที่พบได้ทั่วไปในวิทยาการข้อมูลและวิศวกรข้อมูล การเปลี่ยนรูปแบบที่พบได้ทั่วไป ได้แก่ การเปลี่ยนเนื้อหาของข้อมูล การนำข้อมูลที่ไม่จําเป็นออก และการเปลี่ยนประเภทไฟล์

ในโค้ดแล็บนี้ คุณจะได้เรียนรู้เกี่ยวกับ Apache Spark, เรียกใช้ไปป์ไลน์ตัวอย่างโดยใช้ Dataproc พร้อม PySpark (Python API ของ Apache Spark), BigQuery, Google Cloud Storage และข้อมูลจาก Reddit

2. ข้อมูลเบื้องต้นเกี่ยวกับ Apache Spark (ไม่บังคับ)

เว็บไซต์ระบุว่า "Apache Spark เป็นเครื่องมือวิเคราะห์แบบรวมสําหรับการประมวลผลข้อมูลขนาดใหญ่" ซึ่งช่วยให้คุณวิเคราะห์และประมวลผลข้อมูลแบบขนานและในหน่วยความจําได้ ซึ่งจะทําให้สามารถคํานวณแบบขนานจํานวนมากในเครื่องและโหนดต่างๆ ได้ เฟรมเวิร์กนี้เปิดตัวครั้งแรกในปี 2014 เป็นการอัปเกรด MapReduce แบบดั้งเดิม และยังคงเป็นหนึ่งในเฟรมเวิร์กยอดนิยมสําหรับการประมวลผลขนาดใหญ่ Apache Spark เขียนด้วย Scala และมี API ใน Scala, Java, Python และ R โดยประกอบด้วยไลบรารีมากมาย เช่น Spark SQL สําหรับการค้นหา SQL ในข้อมูล, Spark Streaming สําหรับสตรีมมิงข้อมูล, MLlib สําหรับแมชชีนเลิร์นนิง และ GraphX สําหรับการประมวลผลกราฟ ซึ่งทั้งหมดนี้ทํางานบนเครื่องมือ Apache Spark

32add0b6a47bafbc.png

Spark สามารถทํางานได้ด้วยตัวเองหรือจะใช้ประโยชน์จากบริการจัดการทรัพยากร เช่น Yarn, Mesos หรือ Kubernetes เพื่อปรับขนาดก็ได้ คุณจะใช้ Dataproc สําหรับโค้ดแล็บนี้ ซึ่งใช้ Yarn

เดิมข้อมูลใน Spark จะโหลดลงในหน่วยความจําเป็น RDD หรือชุดข้อมูลที่กระจายแบบยืดหยุ่น การพัฒนาใน Spark มีการเพิ่มประเภทข้อมูลแบบคอลัมน์ใหม่ 2 ประเภท ได้แก่ ชุดข้อมูลที่มีการจัดประเภทและ Dataframe ที่ไม่มีการจัดประเภท กล่าวโดยคร่าวๆ คือ RDD เหมาะสําหรับข้อมูลทุกประเภท ส่วนชุดข้อมูลและ DataFrame เพิ่มประสิทธิภาพสําหรับข้อมูลตาราง เนื่องจากชุดข้อมูลใช้ได้กับ Java และ Scala API เท่านั้น เราจะใช้ PySpark Dataframe API สําหรับโค้ดแล็บนี้ ดูข้อมูลเพิ่มเติมได้ที่เอกสารประกอบของ Apache Spark

3. กรณีการใช้งาน

วิศวกรข้อมูลมักต้องการให้นักวิทยาศาสตร์ข้อมูลเข้าถึงข้อมูลได้ง่ายๆ อย่างไรก็ตาม ข้อมูลมักจะไม่สะอาดในตอนแรก (ใช้งานวิเคราะห์ได้ยากในสถานะปัจจุบัน) และต้องได้รับการทำความสะอาดก่อนจึงจะมีประโยชน์มาก ตัวอย่างของกรณีนี้คือข้อมูลที่คัดลอกมาจากเว็บซึ่งอาจมีการเข้ารหัสที่แปลกประหลาดหรือแท็ก HTML ที่ไม่เกี่ยวข้อง

ในบทนี้ คุณจะโหลดชุดข้อมูลจาก BigQuery ในรูปแบบโพสต์ Reddit ไปยังคลัสเตอร์ Spark ที่โฮสต์ใน Dataproc, ดึงข้อมูลที่เป็นประโยชน์ และจัดเก็บข้อมูลที่ประมวลผลแล้วเป็นไฟล์ CSV ที่บีบอัดใน Google Cloud Storage

be2a4551ece63bfc.png

นักวิทยาศาสตร์ข้อมูลระดับอาวุโสของบริษัทคุณสนใจให้ทีมแก้ปัญหาการประมวลผลภาษาที่เป็นธรรมชาติ โดยเฉพาะอย่างยิ่ง ลูกค้ารายนี้สนใจวิเคราะห์ข้อมูลในฟอรัม "r/food" คุณจะต้องสร้างไปป์ไลน์สําหรับการถ่ายโอนข้อมูลโดยเริ่มจากการทดแทนข้อมูลตั้งแต่เดือนมกราคม 2017 ถึงเดือนสิงหาคม 2019

4. การเข้าถึง BigQuery ผ่าน BigQuery Storage API

การดึงข้อมูลจาก BigQuery โดยใช้เมธอด tabledata.list API อาจใช้เวลานานและไม่มีประสิทธิภาพเมื่อปริมาณข้อมูลเพิ่มขึ้น เมธอดนี้จะแสดงรายการออบเจ็กต์ JSON และต้องอ่านทีละหน้าตามลําดับเพื่ออ่านชุดข้อมูลทั้งหมด

BigQuery Storage API มีการปรับปรุงที่สำคัญในการเข้าถึงข้อมูลใน BigQuery โดยใช้โปรโตคอลที่อิงตาม RPC โดยรองรับการอ่านและเขียนข้อมูลพร้อมกัน รวมถึงรูปแบบการแปลงเป็นอนุกรมรูปแบบต่างๆ เช่น Apache Avro และ Apache Arrow ในระดับสูง การดำเนินการนี้จะช่วยให้ประสิทธิภาพดีขึ้นอย่างมาก โดยเฉพาะกับชุดข้อมูลขนาดใหญ่

ในโค้ดแล็บนี้ คุณจะใช้ spark-bigquery-connector ในการอ่านและเขียนข้อมูลระหว่าง BigQuery กับ Spark

5. การสร้างโปรเจ็กต์

ลงชื่อเข้าใช้คอนโซล Google Cloud Platform ที่ console.cloud.google.com และสร้างโปรเจ็กต์ใหม่โดยทำดังนี้

7e541d932b20c074.png

2deefc9295d114ea.png

a92a49afe05008a.png

ถัดไป คุณจะต้องเปิดใช้การเรียกเก็บเงินใน Cloud Console เพื่อใช้ทรัพยากร Google Cloud

ค่าใช้จ่ายในการทํา Codelab นี้ไม่ควรเกิน 200 บาท แต่อาจมากกว่านั้นหากคุณตัดสินใจใช้ทรัพยากรเพิ่มเติมหรือปล่อยไว้ให้ทํางาน ส่วนสุดท้ายของ Codelab นี้จะแนะนำวิธีล้างข้อมูลโปรเจ็กต์

ผู้ใช้ใหม่ของ Google Cloud Platform มีสิทธิ์รับช่วงทดลองใช้ฟรีมูลค่า$300

6. การตั้งค่าสภาพแวดล้อม

ตอนนี้คุณจะต้องตั้งค่าสภาพแวดล้อมโดยทำดังนี้

  • การเปิดใช้ Compute Engine, Dataproc และ BigQuery Storage API
  • การกำหนดการตั้งค่าโปรเจ็กต์
  • การสร้างคลัสเตอร์ Dataproc
  • การสร้างที่เก็บข้อมูล Google Cloud Storage

การเปิดใช้ API และการกำหนดค่าสภาพแวดล้อม

เปิด Cloud Shell โดยกดปุ่มที่มุมขวาบนของ Cloud Console

a10c47ee6ca41c54.png

หลังจากโหลด Cloud Shell แล้ว ให้เรียกใช้คำสั่งต่อไปนี้เพื่อเปิดใช้ Compute Engine, Dataproc และ BigQuery Storage API

gcloud services enable compute.googleapis.com \
  dataproc.googleapis.com \
  bigquerystorage.googleapis.com

ตั้งค่ารหัสโปรเจ็กต์ของโปรเจ็กต์ โดยไปที่หน้าการเลือกโปรเจ็กต์แล้วค้นหาโปรเจ็กต์ ซึ่งอาจไม่ใช่ชื่อเดียวกับโปรเจ็กต์

e682e8227aa3c781.png

76d45fb295728542.png

เรียกใช้คําสั่งต่อไปนี้เพื่อตั้งค่ารหัสโปรเจ็กต์

gcloud config set project <project_id>

ตั้งค่าภูมิภาคของโปรเจ็กต์โดยเลือกจากรายการที่นี่ ตัวอย่างเช่น us-central1

gcloud config set dataproc/region <region>

เลือกชื่อสำหรับคลัสเตอร์ Dataproc และสร้างตัวแปรสภาพแวดล้อมสำหรับคลัสเตอร์

CLUSTER_NAME=<cluster_name>

การสร้างคลัสเตอร์ Dataproc

สร้างคลัสเตอร์ Dataproc โดยเรียกใช้คําสั่งต่อไปนี้

 gcloud beta dataproc clusters create ${CLUSTER_NAME} \
     --worker-machine-type n1-standard-8 \
     --num-workers 8 \
     --image-version 1.5-debian \
     --initialization-actions gs://dataproc-initialization-actions/python/pip-install.sh \
     --metadata 'PIP_PACKAGES=google-cloud-storage' \
     --optional-components=ANACONDA \
     --enable-component-gateway

คำสั่งนี้จะใช้เวลาสักครู่จึงจะเสร็จสมบูรณ์ รายละเอียดของคำสั่งมีดังนี้

ซึ่งจะเป็นการเริ่มสร้างคลัสเตอร์ Dataproc ด้วยชื่อที่คุณระบุไว้ก่อนหน้านี้ การใช้ beta API จะเปิดใช้ฟีเจอร์เบต้าของ Dataproc เช่น Component Gateway

gcloud beta dataproc clusters create ${CLUSTER_NAME}

ซึ่งจะตั้งค่าประเภทเครื่องที่จะใช้สำหรับผู้ปฏิบัติงาน

--worker-machine-type n1-standard-8

ซึ่งจะเป็นการกำหนดจำนวนผู้ปฏิบัติงานของคลัสเตอร์

--num-workers 8

ซึ่งจะเป็นการกำหนดเวอร์ชันอิมเมจของ Dataproc

--image-version 1.5-debian

ซึ่งจะกําหนดค่าการดำเนินการเริ่มต้นที่จะใช้ในคลัสเตอร์ ในส่วนนี้ คุณกําลังรวมการดําเนินการเริ่มต้น pip

--initialization-actions gs://dataproc-initialization-actions/python/pip-install.sh 

นี่คือข้อมูลเมตาที่จะรวมไว้ในคลัสเตอร์ ในส่วนนี้ คุณกําลังระบุข้อมูลเมตาสําหรับpipการดำเนินการเริ่มต้น

--metadata 'PIP_PACKAGES=google-cloud-storage'

ซึ่งจะเป็นการตั้งค่าให้ติดตั้งคอมโพเนนต์ที่ไม่บังคับในคลัสเตอร์

--optional-components=ANACONDA

ซึ่งจะเปิดใช้เกตเวย์คอมโพเนนต์ที่ช่วยให้คุณใช้เกตเวย์คอมโพเนนต์ของ Dataproc เพื่อดู UI ทั่วไป เช่น Zeppelin, Jupyter หรือประวัติ Spark ได้

--enable-component-gateway

ดูข้อมูลเบื้องต้นที่ละเอียดยิ่งขึ้นเกี่ยวกับ Dataproc ได้ที่ codelab นี้

การสร้างที่เก็บข้อมูล Google Cloud Storage

คุณจะต้องมีที่เก็บข้อมูล Google Cloud Storage สำหรับเอาต์พุตของงาน กำหนดชื่อที่ไม่ซ้ำกันสำหรับที่เก็บข้อมูล แล้วเรียกใช้คำสั่งต่อไปนี้เพื่อสร้างที่เก็บข้อมูลใหม่ ชื่อที่เก็บข้อมูลต้องไม่ซ้ำกันสำหรับผู้ใช้ทุกคนในโปรเจ็กต์ Google Cloud ทั้งหมด คุณจึงอาจต้องลองดำเนินการนี้หลายครั้งโดยใช้ชื่ออื่น ระบบจะสร้างที่เก็บข้อมูลสำเร็จหากคุณไม่ได้รับ ServiceException

BUCKET_NAME=<bucket_name>
gsutil mb gs://${BUCKET_NAME}

7. การวิเคราะห์ข้อมูลเชิงสำรวจ

ก่อนทำการเตรียมข้อมูลเบื้องต้น คุณควรศึกษาเพิ่มเติมเกี่ยวกับลักษณะของข้อมูลที่คุณจัดการ โดยคุณจะได้สํารวจวิธีการสํารวจข้อมูล 2 วิธี ก่อนอื่น คุณจะเห็นข้อมูลดิบบางส่วนโดยใช้เว็บ UI ของ BigQuery จากนั้นจะคํานวณจํานวนโพสต์ต่อแต่ละฟอรัมย่อยโดยใช้ PySpark และ Dataproc

การใช้เว็บ UI ของ BigQuery

เริ่มต้นด้วยการใช้เว็บ UI ของ BigQuery เพื่อดูข้อมูล จากไอคอนเมนูในคอนโซลระบบคลาวด์ ให้เลื่อนลงแล้วกด "BigQuery" เพื่อเปิด UI ทางเว็บของ BigQuery

242a597d7045b4da.png

จากนั้นเรียกใช้คําสั่งต่อไปนี้ในเครื่องมือแก้ไขคําค้นหาของ UI ทางเว็บ BigQuery ซึ่งจะแสดงผลข้อมูล 10 แถวเต็มๆ ตั้งแต่เดือนมกราคม 2017

select * from fh-bigquery.reddit_posts.2017_01 limit 10;

b333c72d60ae6eb8.png

คุณสามารถเลื่อนหน้าเพื่อดูคอลัมน์ทั้งหมดที่ใช้ได้ รวมถึงดูตัวอย่างบางส่วน โดยเฉพาะอย่างยิ่ง คุณจะเห็น 2 คอลัมน์ที่แสดงเนื้อหาที่เป็นข้อความของโพสต์แต่ละรายการ ได้แก่ "title" และ "selftext" โดยคอลัมน์หลังคือเนื้อหาของโพสต์ และดูคอลัมน์อื่นๆ ด้วย เช่น "created_utc" ซึ่งเป็นเวลา UTC ที่สร้างโพสต์ และ "subreddit" ซึ่งเป็นชื่อของ subreddit ที่โพสต์อยู่

การดำเนินการกับงาน PySpark

เรียกใช้คําสั่งต่อไปนี้ใน Cloud Shell เพื่อโคลนที่เก็บที่มีโค้ดตัวอย่างและ cd ไปยังไดเรกทอรีที่ถูกต้อง

cd
git clone https://github.com/GoogleCloudPlatform/cloud-dataproc

คุณสามารถใช้ PySpark เพื่อระบุจํานวนโพสต์ที่มีอยู่สําหรับแต่ละฟอรัมย่อยได้ คุณสามารถเปิด Cloud Editor และอ่านสคริปต์ cloud-dataproc/codelabs/spark-bigquery ก่อนที่จะเรียกใช้ในขั้นตอนถัดไป

5d965c6fb66dbd81.png

797cf71de3449bdb.png

คลิกปุ่ม "เปิดเทอร์มินัล" ในเครื่องมือแก้ไขบนระบบคลาวด์เพื่อเปลี่ยนกลับไปที่ Cloud Shell แล้วเรียกใช้คําสั่งต่อไปนี้เพื่อดําเนินการงาน PySpark แรก

cd ~/cloud-dataproc/codelabs/spark-bigquery
gcloud dataproc jobs submit pyspark --cluster ${CLUSTER_NAME} \
    --jars gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar \
    --driver-log-levels root=FATAL \
    counts_by_subreddit.py

คำสั่งนี้ช่วยให้คุณส่งงานไปยัง Dataproc ผ่าน Jobs API ได้ ในส่วนนี้คุณกําลังระบุประเภทงานเป็น pyspark คุณสามารถระบุชื่อคลัสเตอร์ พารามิเตอร์ที่ไม่บังคับ และชื่อไฟล์ที่มีงาน ในส่วนนี้ คุณกําลังระบุพารามิเตอร์ --jars ซึ่งช่วยให้คุณรวม spark-bigquery-connector ไว้ในงานได้ นอกจากนี้ คุณยังตั้งค่าระดับเอาต์พุตบันทึกได้โดยใช้ --driver-log-levels root=FATAL ซึ่งจะระงับเอาต์พุตบันทึกทั้งหมดยกเว้นข้อผิดพลาด บันทึกของ Spark มีแนวโน้มที่จะค่อนข้างมีข้อมูลมากเกินไป

การดำเนินการนี้จะใช้เวลา 2-3 นาที และเอาต์พุตสุดท้ายควรมีลักษณะดังนี้

6c185228db47bb18.png

8. การสำรวจ UI ของ Dataproc และ Spark

เมื่อเรียกใช้งาน Spark ใน Dataproc คุณจะมีสิทธิ์เข้าถึง UI 2 แบบเพื่อตรวจสอบสถานะงาน / คลัสเตอร์ รายการแรกคือ UI ของ Dataproc ซึ่งคุณจะเห็นได้โดยคลิกไอคอนเมนูแล้วเลื่อนลงไปที่ Dataproc ในส่วนนี้ คุณจะเห็นหน่วยความจำที่ใช้ได้ในปัจจุบัน รวมถึงหน่วยความจำที่รอดำเนินการและจำนวนผู้ปฏิบัติงาน

6f2987346d15c8e2.png

นอกจากนี้ คุณยังคลิกแท็บงานเพื่อดูงานที่เสร็จแล้วได้ด้วย คุณดูรายละเอียดงาน เช่น บันทึกและเอาต์พุตของงานเหล่านั้นได้โดยคลิกรหัสงานของงานที่ต้องการ 114d90129b0e4c88.png

1b2160f0f484594a.png

นอกจากนี้ คุณยังดู UI ของ Spark ได้ด้วย จากหน้างาน ให้คลิกลูกศรย้อนกลับ แล้วคลิก "อินเทอร์เฟซเว็บ" คุณควรเห็นตัวเลือกหลายรายการในส่วนเกตเวย์คอมโพเนนต์ คุณสามารถเปิดใช้ฟีเจอร์เหล่านี้ได้หลายรายการผ่านคอมโพเนนต์ที่ไม่บังคับเมื่อตั้งค่าคลัสเตอร์ สําหรับห้องทดลองนี้ ให้คลิก "เซิร์ฟเวอร์ประวัติของ Spark

5da7944327d193dc.png

6a349200289c69c1.png e63b36bdc90ff610.png

ซึ่งจะเป็นการเปิดหน้าต่างต่อไปนี้

8f6786760f994fe8.png

งานที่ทำเสร็จแล้วทั้งหมดจะแสดงที่นี่ และคุณสามารถคลิก application_id ใดก็ได้เพื่อดูข้อมูลเพิ่มเติมเกี่ยวกับงาน ในทํานองเดียวกัน คุณสามารถคลิก "แสดงคําขอที่ยังไม่เสร็จสมบูรณ์" ที่ด้านล่างสุดของหน้า Landing Page เพื่อดูงานที่ทํางานอยู่ทั้งหมด

9. เรียกใช้งานทดแทน

ตอนนี้คุณจะต้องเรียกใช้งานที่โหลดข้อมูลลงในหน่วยความจํา ดึงข้อมูลที่จําเป็น และส่งออกเอาต์พุตไปยังที่เก็บข้อมูล Google Cloud Storage คุณจะดึงข้อมูล "ชื่อ" "เนื้อหา" (ข้อความดิบ) และ "การประทับเวลาที่สร้าง" สำหรับความคิดเห็น Reddit แต่ละรายการ จากนั้นให้นำข้อมูลนี้ไปแปลงเป็น CSV, ใส่ลงในไฟล์ ZIP และโหลดลงในที่เก็บข้อมูลที่มี URI ของ gs://${BUCKET_NAME}/reddit_posts/YYYY/MM/food.csv.gz

คุณสามารถไปที่ Cloud Editor อีกครั้งเพื่ออ่านโค้ดของ cloud-dataproc/codelabs/spark-bigquery/backfill.sh ซึ่งเป็นสคริปต์ Wrapper เพื่อเรียกใช้โค้ดใน cloud-dataproc/codelabs/spark-bigquery/backfill.py

cd ~/cloud-dataproc/codelabs/spark-bigquery
bash backfill.sh ${CLUSTER_NAME} ${BUCKET_NAME}

คุณควรเห็นข้อความแจ้งว่างานเสร็จสมบูรณ์หลายรายการในไม่ช้า การดำเนินการอาจใช้เวลาถึง 15 นาทีจึงจะเสร็จสมบูรณ์ นอกจากนี้ คุณยังตรวจสอบที่เก็บข้อมูลอีกครั้งเพื่อยืนยันว่าการส่งออกข้อมูลสําเร็จโดยใช้ gsutil ได้ด้วย เมื่องานทั้งหมดเสร็จแล้ว ให้เรียกใช้คําสั่งต่อไปนี้

gsutil ls gs://${BUCKET_NAME}/reddit_posts/*/*/food.csv.gz

คุณควรเห็นเอาต์พุตต่อไปนี้

a7c3c7b2e82f9fca.png

ยินดีด้วย คุณได้ทดแทนข้อมูลความคิดเห็น Reddit เรียบร้อยแล้ว หากสนใจวิธีสร้างโมเดลจากข้อมูลนี้ โปรดไปที่ Codelab Spark-NLP

10. ล้างข้อมูล

โปรดดำเนินการดังนี้เพื่อเลี่ยงไม่ให้เกิดการเรียกเก็บเงินกับบัญชี GCP โดยไม่จำเป็นหลังจากการเริ่มต้นใช้งานอย่างรวดเร็วนี้เสร็จสมบูรณ์

  1. ลบที่เก็บข้อมูล Cloud Storage ของสภาพแวดล้อมที่คุณสร้างขึ้น
  2. ลบสภาพแวดล้อม Dataproc

หากสร้างโปรเจ็กต์มาเพื่อโค้ดแล็บนี้โดยเฉพาะ คุณก็ลบโปรเจ็กต์ได้เช่นกันโดยทำดังนี้

  1. ในคอนโซล GCP ให้ไปที่หน้าโปรเจ็กต์
  2. ในรายการโปรเจ็กต์ ให้เลือกโปรเจ็กต์ที่ต้องการลบ แล้วคลิกลบ
  3. ในช่อง ให้พิมพ์รหัสโปรเจ็กต์ แล้วคลิกปิดเพื่อลบโปรเจ็กต์

ใบอนุญาต

ผลงานนี้ได้รับอนุญาตภายใต้สัญญาอนุญาตครีเอทีฟคอมมอนส์สำหรับยอมรับสิทธิของผู้สร้าง (Creative Commons Attribution License) 3.0 ทั่วไป และสัญญาอนุญาต Apache 2.0