เกี่ยวกับ Codelab นี้
1 ภาพรวม
โค้ดแล็บนี้จะอธิบายวิธีสร้างไปป์ไลน์การประมวลผลข้อมูลโดยใช้ Apache Spark กับ Dataproc ใน Google Cloud Platform การอ่านข้อมูลจากตำแหน่งพื้นที่เก็บข้อมูลหนึ่ง เปลี่ยนรูปแบบข้อมูล และเขียนลงในตำแหน่งพื้นที่เก็บข้อมูลอื่นเป็นกรณีการใช้งานที่พบได้ทั่วไปในวิทยาการข้อมูลและวิศวกรข้อมูล การเปลี่ยนรูปแบบที่พบได้ทั่วไป ได้แก่ การเปลี่ยนเนื้อหาของข้อมูล การนำข้อมูลที่ไม่จําเป็นออก และการเปลี่ยนประเภทไฟล์
ในโค้ดแล็บนี้ คุณจะได้เรียนรู้เกี่ยวกับ Apache Spark, เรียกใช้ไปป์ไลน์ตัวอย่างโดยใช้ Dataproc พร้อม PySpark (Python API ของ Apache Spark), BigQuery, Google Cloud Storage และข้อมูลจาก Reddit
2 ข้อมูลเบื้องต้นเกี่ยวกับ Apache Spark (ไม่บังคับ)
เว็บไซต์ระบุว่า "Apache Spark เป็นเครื่องมือวิเคราะห์แบบรวมสําหรับการประมวลผลข้อมูลขนาดใหญ่" ซึ่งช่วยให้คุณวิเคราะห์และประมวลผลข้อมูลแบบขนานและในหน่วยความจําได้ ซึ่งจะทําให้สามารถคํานวณแบบขนานจํานวนมากในเครื่องและโหนดต่างๆ ได้ เฟรมเวิร์กนี้เปิดตัวครั้งแรกในปี 2014 เป็นการอัปเกรด MapReduce แบบดั้งเดิม และยังคงเป็นหนึ่งในเฟรมเวิร์กยอดนิยมสําหรับการประมวลผลขนาดใหญ่ Apache Spark เขียนด้วย Scala และมี API ใน Scala, Java, Python และ R โดยประกอบด้วยไลบรารีมากมาย เช่น Spark SQL สําหรับการค้นหา SQL ในข้อมูล, Spark Streaming สําหรับสตรีมมิงข้อมูล, MLlib สําหรับแมชชีนเลิร์นนิง และ GraphX สําหรับการประมวลผลกราฟ ซึ่งทั้งหมดนี้ทํางานบนเครื่องมือ Apache Spark
Spark สามารถทํางานได้ด้วยตัวเองหรือจะใช้ประโยชน์จากบริการจัดการทรัพยากร เช่น Yarn, Mesos หรือ Kubernetes เพื่อปรับขนาดก็ได้ คุณจะใช้ Dataproc สําหรับโค้ดแล็บนี้ ซึ่งใช้ Yarn
เดิมข้อมูลใน Spark จะโหลดลงในหน่วยความจําเป็น RDD หรือชุดข้อมูลที่กระจายแบบยืดหยุ่น การพัฒนาใน Spark มีการเพิ่มประเภทข้อมูลแบบคอลัมน์ใหม่ 2 ประเภท ได้แก่ ชุดข้อมูลที่มีการจัดประเภทและ Dataframe ที่ไม่มีการจัดประเภท กล่าวโดยคร่าวๆ คือ RDD เหมาะสําหรับข้อมูลทุกประเภท ส่วนชุดข้อมูลและ DataFrame เพิ่มประสิทธิภาพสําหรับข้อมูลตาราง เนื่องจากชุดข้อมูลใช้ได้กับ Java และ Scala API เท่านั้น เราจะใช้ PySpark Dataframe API สําหรับโค้ดแล็บนี้ ดูข้อมูลเพิ่มเติมได้ที่เอกสารประกอบของ Apache Spark
3 กรณีการใช้งาน
วิศวกรข้อมูลมักต้องการให้นักวิทยาศาสตร์ข้อมูลเข้าถึงข้อมูลได้ง่ายๆ อย่างไรก็ตาม ข้อมูลมักจะไม่สะอาดในตอนแรก (ใช้งานวิเคราะห์ได้ยากในสถานะปัจจุบัน) และต้องได้รับการทำความสะอาดก่อนจึงจะมีประโยชน์มาก ตัวอย่างของกรณีนี้คือข้อมูลที่คัดลอกมาจากเว็บซึ่งอาจมีการเข้ารหัสที่แปลกประหลาดหรือแท็ก HTML ที่ไม่เกี่ยวข้อง
ในบทนี้ คุณจะโหลดชุดข้อมูลจาก BigQuery ในรูปแบบโพสต์ Reddit ไปยังคลัสเตอร์ Spark ที่โฮสต์ใน Dataproc, ดึงข้อมูลที่เป็นประโยชน์ และจัดเก็บข้อมูลที่ประมวลผลแล้วเป็นไฟล์ CSV ที่บีบอัดใน Google Cloud Storage
นักวิทยาศาสตร์ข้อมูลระดับอาวุโสของบริษัทคุณสนใจให้ทีมแก้ปัญหาการประมวลผลภาษาที่เป็นธรรมชาติ โดยเฉพาะอย่างยิ่ง ลูกค้ารายนี้สนใจวิเคราะห์ข้อมูลในฟอรัม "r/food" คุณจะต้องสร้างไปป์ไลน์สําหรับการถ่ายโอนข้อมูลโดยเริ่มจากการทดแทนข้อมูลตั้งแต่เดือนมกราคม 2017 ถึงเดือนสิงหาคม 2019
4 การเข้าถึง BigQuery ผ่าน BigQuery Storage API
การดึงข้อมูลจาก BigQuery โดยใช้เมธอด tabledata.list API อาจใช้เวลานานและไม่มีประสิทธิภาพเมื่อปริมาณข้อมูลเพิ่มขึ้น เมธอดนี้จะแสดงรายการออบเจ็กต์ JSON และต้องอ่านทีละหน้าตามลําดับเพื่ออ่านชุดข้อมูลทั้งหมด
BigQuery Storage API มีการปรับปรุงที่สำคัญในการเข้าถึงข้อมูลใน BigQuery โดยใช้โปรโตคอลที่อิงตาม RPC โดยรองรับการอ่านและเขียนข้อมูลพร้อมกัน รวมถึงรูปแบบการแปลงเป็นอนุกรมรูปแบบต่างๆ เช่น Apache Avro และ Apache Arrow ในระดับสูง การดำเนินการนี้จะช่วยให้ประสิทธิภาพดีขึ้นอย่างมาก โดยเฉพาะกับชุดข้อมูลขนาดใหญ่
ในโค้ดแล็บนี้ คุณจะใช้ spark-bigquery-connector ในการอ่านและเขียนข้อมูลระหว่าง BigQuery กับ Spark
5 การสร้างโปรเจ็กต์
ลงชื่อเข้าใช้คอนโซล Google Cloud Platform ที่ console.cloud.google.com และสร้างโปรเจ็กต์ใหม่โดยทำดังนี้
ถัดไป คุณจะต้องเปิดใช้การเรียกเก็บเงินใน Cloud Console เพื่อใช้ทรัพยากร Google Cloud
ค่าใช้จ่ายในการทํา Codelab นี้ไม่ควรเกิน 200 บาท แต่อาจมากกว่านั้นหากคุณตัดสินใจใช้ทรัพยากรเพิ่มเติมหรือปล่อยไว้ให้ทํางาน ส่วนสุดท้ายของ Codelab นี้จะแนะนำวิธีล้างข้อมูลโปรเจ็กต์
ผู้ใช้ใหม่ของ Google Cloud Platform มีสิทธิ์รับช่วงทดลองใช้ฟรีมูลค่า$300
6 การตั้งค่าสภาพแวดล้อม
ตอนนี้คุณจะต้องตั้งค่าสภาพแวดล้อมโดยทำดังนี้
- การเปิดใช้ Compute Engine, Dataproc และ BigQuery Storage API
- การกำหนดการตั้งค่าโปรเจ็กต์
- การสร้างคลัสเตอร์ Dataproc
- การสร้างที่เก็บข้อมูล Google Cloud Storage
การเปิดใช้ API และการกำหนดค่าสภาพแวดล้อม
เปิด Cloud Shell โดยกดปุ่มที่มุมขวาบนของ Cloud Console
หลังจากโหลด Cloud Shell แล้ว ให้เรียกใช้คำสั่งต่อไปนี้เพื่อเปิดใช้ Compute Engine, Dataproc และ BigQuery Storage API
gcloud services enable compute.googleapis.com \
dataproc.googleapis.com \
bigquerystorage.googleapis.com
ตั้งค่ารหัสโปรเจ็กต์ของโปรเจ็กต์ โดยไปที่หน้าการเลือกโปรเจ็กต์แล้วค้นหาโปรเจ็กต์ ซึ่งอาจไม่ใช่ชื่อเดียวกับโปรเจ็กต์
เรียกใช้คําสั่งต่อไปนี้เพื่อตั้งค่ารหัสโปรเจ็กต์
gcloud config set project <project_id>
ตั้งค่าภูมิภาคของโปรเจ็กต์โดยเลือกจากรายการที่นี่ ตัวอย่างเช่น us-central1
gcloud config set dataproc/region <region>
เลือกชื่อสำหรับคลัสเตอร์ Dataproc และสร้างตัวแปรสภาพแวดล้อมสำหรับคลัสเตอร์
CLUSTER_NAME=<cluster_name>
การสร้างคลัสเตอร์ Dataproc
สร้างคลัสเตอร์ Dataproc โดยเรียกใช้คําสั่งต่อไปนี้
gcloud beta dataproc clusters create ${CLUSTER_NAME} \
--worker-machine-type n1-standard-8 \
--num-workers 8 \
--image-version 1.5-debian \
--initialization-actions gs://dataproc-initialization-actions/python/pip-install.sh \
--metadata 'PIP_PACKAGES=google-cloud-storage' \
--optional-components=ANACONDA \
--enable-component-gateway
คำสั่งนี้จะใช้เวลาสักครู่จึงจะเสร็จสมบูรณ์ รายละเอียดของคำสั่งมีดังนี้
ซึ่งจะเป็นการเริ่มสร้างคลัสเตอร์ Dataproc ด้วยชื่อที่คุณระบุไว้ก่อนหน้านี้ การใช้ beta
API จะเปิดใช้ฟีเจอร์เบต้าของ Dataproc เช่น Component Gateway
gcloud beta dataproc clusters create ${CLUSTER_NAME}
ซึ่งจะตั้งค่าประเภทเครื่องที่จะใช้สำหรับผู้ปฏิบัติงาน
--worker-machine-type n1-standard-8
ซึ่งจะเป็นการกำหนดจำนวนผู้ปฏิบัติงานของคลัสเตอร์
--num-workers 8
ซึ่งจะเป็นการกำหนดเวอร์ชันอิมเมจของ Dataproc
--image-version 1.5-debian
ซึ่งจะกําหนดค่าการดำเนินการเริ่มต้นที่จะใช้ในคลัสเตอร์ ในส่วนนี้ คุณกําลังรวมการดําเนินการเริ่มต้น pip
--initialization-actions gs://dataproc-initialization-actions/python/pip-install.sh
นี่คือข้อมูลเมตาที่จะรวมไว้ในคลัสเตอร์ ในส่วนนี้ คุณกําลังระบุข้อมูลเมตาสําหรับpip
การดำเนินการเริ่มต้น
--metadata 'PIP_PACKAGES=google-cloud-storage'
ซึ่งจะเป็นการตั้งค่าให้ติดตั้งคอมโพเนนต์ที่ไม่บังคับในคลัสเตอร์
--optional-components=ANACONDA
ซึ่งจะเปิดใช้เกตเวย์คอมโพเนนต์ที่ช่วยให้คุณใช้เกตเวย์คอมโพเนนต์ของ Dataproc เพื่อดู UI ทั่วไป เช่น Zeppelin, Jupyter หรือประวัติ Spark ได้
--enable-component-gateway
ดูข้อมูลเบื้องต้นที่ละเอียดยิ่งขึ้นเกี่ยวกับ Dataproc ได้ที่ codelab นี้
การสร้างที่เก็บข้อมูล Google Cloud Storage
คุณจะต้องมีที่เก็บข้อมูล Google Cloud Storage สำหรับเอาต์พุตของงาน กำหนดชื่อที่ไม่ซ้ำกันสำหรับที่เก็บข้อมูล แล้วเรียกใช้คำสั่งต่อไปนี้เพื่อสร้างที่เก็บข้อมูลใหม่ ชื่อที่เก็บข้อมูลต้องไม่ซ้ำกันสำหรับผู้ใช้ทุกคนในโปรเจ็กต์ Google Cloud ทั้งหมด คุณจึงอาจต้องลองดำเนินการนี้หลายครั้งโดยใช้ชื่ออื่น ระบบจะสร้างที่เก็บข้อมูลสำเร็จหากคุณไม่ได้รับ ServiceException
BUCKET_NAME=<bucket_name>
gsutil mb gs://${BUCKET_NAME}
7 การวิเคราะห์ข้อมูลเชิงสำรวจ
ก่อนทำการเตรียมข้อมูลเบื้องต้น คุณควรศึกษาเพิ่มเติมเกี่ยวกับลักษณะของข้อมูลที่คุณจัดการ โดยคุณจะได้สํารวจวิธีการสํารวจข้อมูล 2 วิธี ก่อนอื่น คุณจะเห็นข้อมูลดิบบางส่วนโดยใช้เว็บ UI ของ BigQuery จากนั้นจะคํานวณจํานวนโพสต์ต่อแต่ละฟอรัมย่อยโดยใช้ PySpark และ Dataproc
การใช้เว็บ UI ของ BigQuery
เริ่มต้นด้วยการใช้เว็บ UI ของ BigQuery เพื่อดูข้อมูล จากไอคอนเมนูในคอนโซลระบบคลาวด์ ให้เลื่อนลงแล้วกด "BigQuery" เพื่อเปิด UI ทางเว็บของ BigQuery
จากนั้นเรียกใช้คําสั่งต่อไปนี้ในเครื่องมือแก้ไขคําค้นหาของ UI ทางเว็บ BigQuery ซึ่งจะแสดงผลข้อมูล 10 แถวเต็มๆ ตั้งแต่เดือนมกราคม 2017
select * from fh-bigquery.reddit_posts.2017_01 limit 10;
คุณสามารถเลื่อนหน้าเพื่อดูคอลัมน์ทั้งหมดที่ใช้ได้ รวมถึงดูตัวอย่างบางส่วน โดยเฉพาะอย่างยิ่ง คุณจะเห็น 2 คอลัมน์ที่แสดงเนื้อหาที่เป็นข้อความของโพสต์แต่ละรายการ ได้แก่ "title" และ "selftext" โดยคอลัมน์หลังคือเนื้อหาของโพสต์ และดูคอลัมน์อื่นๆ ด้วย เช่น "created_utc" ซึ่งเป็นเวลา UTC ที่สร้างโพสต์ และ "subreddit" ซึ่งเป็นชื่อของ subreddit ที่โพสต์อยู่
การดำเนินการกับงาน PySpark
เรียกใช้คําสั่งต่อไปนี้ใน Cloud Shell เพื่อโคลนที่เก็บที่มีโค้ดตัวอย่างและ cd ไปยังไดเรกทอรีที่ถูกต้อง
cd
git clone https://github.com/GoogleCloudPlatform/cloud-dataproc
คุณสามารถใช้ PySpark เพื่อระบุจํานวนโพสต์ที่มีอยู่สําหรับแต่ละฟอรัมย่อยได้ คุณสามารถเปิด Cloud Editor และอ่านสคริปต์ cloud-dataproc/codelabs/spark-bigquery
ก่อนที่จะเรียกใช้ในขั้นตอนถัดไป
คลิกปุ่ม "เปิดเทอร์มินัล" ในเครื่องมือแก้ไขบนระบบคลาวด์เพื่อเปลี่ยนกลับไปที่ Cloud Shell แล้วเรียกใช้คําสั่งต่อไปนี้เพื่อดําเนินการงาน PySpark แรก
cd ~/cloud-dataproc/codelabs/spark-bigquery
gcloud dataproc jobs submit pyspark --cluster ${CLUSTER_NAME} \
--jars gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar \
--driver-log-levels root=FATAL \
counts_by_subreddit.py
คำสั่งนี้ช่วยให้คุณส่งงานไปยัง Dataproc ผ่าน Jobs API ได้ ในส่วนนี้คุณกําลังระบุประเภทงานเป็น pyspark
คุณสามารถระบุชื่อคลัสเตอร์ พารามิเตอร์ที่ไม่บังคับ และชื่อไฟล์ที่มีงาน ในส่วนนี้ คุณกําลังระบุพารามิเตอร์ --jars
ซึ่งช่วยให้คุณรวม spark-bigquery-connector
ไว้ในงานได้ นอกจากนี้ คุณยังตั้งค่าระดับเอาต์พุตบันทึกได้โดยใช้ --driver-log-levels root=FATAL
ซึ่งจะระงับเอาต์พุตบันทึกทั้งหมดยกเว้นข้อผิดพลาด บันทึกของ Spark มีแนวโน้มที่จะค่อนข้างมีข้อมูลมากเกินไป
การดำเนินการนี้จะใช้เวลา 2-3 นาที และเอาต์พุตสุดท้ายควรมีลักษณะดังนี้
8 การสำรวจ UI ของ Dataproc และ Spark
เมื่อเรียกใช้งาน Spark ใน Dataproc คุณจะมีสิทธิ์เข้าถึง UI 2 แบบเพื่อตรวจสอบสถานะงาน / คลัสเตอร์ รายการแรกคือ UI ของ Dataproc ซึ่งคุณจะเห็นได้โดยคลิกไอคอนเมนูแล้วเลื่อนลงไปที่ Dataproc ในส่วนนี้ คุณจะเห็นหน่วยความจำที่ใช้ได้ในปัจจุบัน รวมถึงหน่วยความจำที่รอดำเนินการและจำนวนผู้ปฏิบัติงาน
นอกจากนี้ คุณยังคลิกแท็บงานเพื่อดูงานที่เสร็จแล้วได้ด้วย คุณดูรายละเอียดงาน เช่น บันทึกและเอาต์พุตของงานเหล่านั้นได้โดยคลิกรหัสงานของงานที่ต้องการ
นอกจากนี้ คุณยังดู UI ของ Spark ได้ด้วย จากหน้างาน ให้คลิกลูกศรย้อนกลับ แล้วคลิก "อินเทอร์เฟซเว็บ" คุณควรเห็นตัวเลือกหลายรายการในส่วนเกตเวย์คอมโพเนนต์ คุณสามารถเปิดใช้ฟีเจอร์เหล่านี้ได้หลายรายการผ่านคอมโพเนนต์ที่ไม่บังคับเมื่อตั้งค่าคลัสเตอร์ สําหรับห้องทดลองนี้ ให้คลิก "เซิร์ฟเวอร์ประวัติของ Spark
ซึ่งจะเป็นการเปิดหน้าต่างต่อไปนี้
งานที่ทำเสร็จแล้วทั้งหมดจะแสดงที่นี่ และคุณสามารถคลิก application_id ใดก็ได้เพื่อดูข้อมูลเพิ่มเติมเกี่ยวกับงาน ในทํานองเดียวกัน คุณสามารถคลิก "แสดงคําขอที่ยังไม่เสร็จสมบูรณ์" ที่ด้านล่างสุดของหน้า Landing Page เพื่อดูงานที่ทํางานอยู่ทั้งหมด
9 เรียกใช้งานทดแทน
ตอนนี้คุณจะต้องเรียกใช้งานที่โหลดข้อมูลลงในหน่วยความจํา ดึงข้อมูลที่จําเป็น และส่งออกเอาต์พุตไปยังที่เก็บข้อมูล Google Cloud Storage คุณจะดึงข้อมูล "ชื่อ" "เนื้อหา" (ข้อความดิบ) และ "การประทับเวลาที่สร้าง" สำหรับความคิดเห็น Reddit แต่ละรายการ จากนั้นให้นำข้อมูลนี้ไปแปลงเป็น CSV, ใส่ลงในไฟล์ ZIP และโหลดลงในที่เก็บข้อมูลที่มี URI ของ gs://${BUCKET_NAME}/reddit_posts/YYYY/MM/food.csv.gz
คุณสามารถไปที่ Cloud Editor อีกครั้งเพื่ออ่านโค้ดของ cloud-dataproc/codelabs/spark-bigquery/backfill.sh
ซึ่งเป็นสคริปต์ Wrapper เพื่อเรียกใช้โค้ดใน cloud-dataproc/codelabs/spark-bigquery/backfill.py
cd ~/cloud-dataproc/codelabs/spark-bigquery
bash backfill.sh ${CLUSTER_NAME} ${BUCKET_NAME}
คุณควรเห็นข้อความแจ้งว่างานเสร็จสมบูรณ์หลายรายการในไม่ช้า การดำเนินการอาจใช้เวลาถึง 15 นาทีจึงจะเสร็จสมบูรณ์ นอกจากนี้ คุณยังตรวจสอบที่เก็บข้อมูลอีกครั้งเพื่อยืนยันว่าการส่งออกข้อมูลสําเร็จโดยใช้ gsutil ได้ด้วย เมื่องานทั้งหมดเสร็จแล้ว ให้เรียกใช้คําสั่งต่อไปนี้
gsutil ls gs://${BUCKET_NAME}/reddit_posts/*/*/food.csv.gz
คุณควรเห็นเอาต์พุตต่อไปนี้
ยินดีด้วย คุณได้ทดแทนข้อมูลความคิดเห็น Reddit เรียบร้อยแล้ว หากสนใจวิธีสร้างโมเดลจากข้อมูลนี้ โปรดไปที่ Codelab Spark-NLP
10 ล้างข้อมูล
โปรดดำเนินการดังนี้เพื่อเลี่ยงไม่ให้เกิดการเรียกเก็บเงินกับบัญชี GCP โดยไม่จำเป็นหลังจากการเริ่มต้นใช้งานอย่างรวดเร็วนี้เสร็จสมบูรณ์
- ลบที่เก็บข้อมูล Cloud Storage ของสภาพแวดล้อมที่คุณสร้างขึ้น
- ลบสภาพแวดล้อม Dataproc
หากสร้างโปรเจ็กต์มาเพื่อโค้ดแล็บนี้โดยเฉพาะ คุณก็ลบโปรเจ็กต์ได้เช่นกันโดยทำดังนี้
- ในคอนโซล GCP ให้ไปที่หน้าโปรเจ็กต์
- ในรายการโปรเจ็กต์ ให้เลือกโปรเจ็กต์ที่ต้องการลบ แล้วคลิกลบ
- ในช่อง ให้พิมพ์รหัสโปรเจ็กต์ แล้วคลิกปิดเพื่อลบโปรเจ็กต์
ใบอนุญาต
ผลงานนี้ได้รับอนุญาตภายใต้สัญญาอนุญาตครีเอทีฟคอมมอนส์สำหรับยอมรับสิทธิของผู้สร้าง (Creative Commons Attribution License) 3.0 ทั่วไป และสัญญาอนุญาต Apache 2.0