การเตรียมข้อมูล BigQuery ก่อนประมวลผลด้วย PySpark ใน Dataproc

การเตรียมข้อมูล BigQuery ก่อนประมวลผลด้วย PySpark ใน Dataproc

เกี่ยวกับ Codelab นี้

subjectอัปเดตล่าสุดเมื่อ ม.ค. 24, 2022
account_circleเขียนโดย bmiro

1 ภาพรวม

โค้ดแล็บนี้จะอธิบายวิธีสร้างไปป์ไลน์การประมวลผลข้อมูลโดยใช้ Apache Spark กับ Dataproc ใน Google Cloud Platform การอ่านข้อมูลจากตำแหน่งพื้นที่เก็บข้อมูลหนึ่ง เปลี่ยนรูปแบบข้อมูล และเขียนลงในตำแหน่งพื้นที่เก็บข้อมูลอื่นเป็นกรณีการใช้งานที่พบได้ทั่วไปในวิทยาการข้อมูลและวิศวกรข้อมูล การเปลี่ยนรูปแบบที่พบได้ทั่วไป ได้แก่ การเปลี่ยนเนื้อหาของข้อมูล การนำข้อมูลที่ไม่จําเป็นออก และการเปลี่ยนประเภทไฟล์

ในโค้ดแล็บนี้ คุณจะได้เรียนรู้เกี่ยวกับ Apache Spark, เรียกใช้ไปป์ไลน์ตัวอย่างโดยใช้ Dataproc พร้อม PySpark (Python API ของ Apache Spark), BigQuery, Google Cloud Storage และข้อมูลจาก Reddit

2 ข้อมูลเบื้องต้นเกี่ยวกับ Apache Spark (ไม่บังคับ)

เว็บไซต์ระบุว่า "Apache Spark เป็นเครื่องมือวิเคราะห์แบบรวมสําหรับการประมวลผลข้อมูลขนาดใหญ่" ซึ่งช่วยให้คุณวิเคราะห์และประมวลผลข้อมูลแบบขนานและในหน่วยความจําได้ ซึ่งจะทําให้สามารถคํานวณแบบขนานจํานวนมากในเครื่องและโหนดต่างๆ ได้ เฟรมเวิร์กนี้เปิดตัวครั้งแรกในปี 2014 เป็นการอัปเกรด MapReduce แบบดั้งเดิม และยังคงเป็นหนึ่งในเฟรมเวิร์กยอดนิยมสําหรับการประมวลผลขนาดใหญ่ Apache Spark เขียนด้วย Scala และมี API ใน Scala, Java, Python และ R โดยประกอบด้วยไลบรารีมากมาย เช่น Spark SQL สําหรับการค้นหา SQL ในข้อมูล, Spark Streaming สําหรับสตรีมมิงข้อมูล, MLlib สําหรับแมชชีนเลิร์นนิง และ GraphX สําหรับการประมวลผลกราฟ ซึ่งทั้งหมดนี้ทํางานบนเครื่องมือ Apache Spark

32add0b6a47bafbc.png

Spark สามารถทํางานได้ด้วยตัวเองหรือจะใช้ประโยชน์จากบริการจัดการทรัพยากร เช่น Yarn, Mesos หรือ Kubernetes เพื่อปรับขนาดก็ได้ คุณจะใช้ Dataproc สําหรับโค้ดแล็บนี้ ซึ่งใช้ Yarn

เดิมข้อมูลใน Spark จะโหลดลงในหน่วยความจําเป็น RDD หรือชุดข้อมูลที่กระจายแบบยืดหยุ่น การพัฒนาใน Spark มีการเพิ่มประเภทข้อมูลแบบคอลัมน์ใหม่ 2 ประเภท ได้แก่ ชุดข้อมูลที่มีการจัดประเภทและ Dataframe ที่ไม่มีการจัดประเภท กล่าวโดยคร่าวๆ คือ RDD เหมาะสําหรับข้อมูลทุกประเภท ส่วนชุดข้อมูลและ DataFrame เพิ่มประสิทธิภาพสําหรับข้อมูลตาราง เนื่องจากชุดข้อมูลใช้ได้กับ Java และ Scala API เท่านั้น เราจะใช้ PySpark Dataframe API สําหรับโค้ดแล็บนี้ ดูข้อมูลเพิ่มเติมได้ที่เอกสารประกอบของ Apache Spark

3 กรณีการใช้งาน

วิศวกรข้อมูลมักต้องการให้นักวิทยาศาสตร์ข้อมูลเข้าถึงข้อมูลได้ง่ายๆ อย่างไรก็ตาม ข้อมูลมักจะไม่สะอาดในตอนแรก (ใช้งานวิเคราะห์ได้ยากในสถานะปัจจุบัน) และต้องได้รับการทำความสะอาดก่อนจึงจะมีประโยชน์มาก ตัวอย่างของกรณีนี้คือข้อมูลที่คัดลอกมาจากเว็บซึ่งอาจมีการเข้ารหัสที่แปลกประหลาดหรือแท็ก HTML ที่ไม่เกี่ยวข้อง

ในบทนี้ คุณจะโหลดชุดข้อมูลจาก BigQuery ในรูปแบบโพสต์ Reddit ไปยังคลัสเตอร์ Spark ที่โฮสต์ใน Dataproc, ดึงข้อมูลที่เป็นประโยชน์ และจัดเก็บข้อมูลที่ประมวลผลแล้วเป็นไฟล์ CSV ที่บีบอัดใน Google Cloud Storage

be2a4551ece63bfc.png

นักวิทยาศาสตร์ข้อมูลระดับอาวุโสของบริษัทคุณสนใจให้ทีมแก้ปัญหาการประมวลผลภาษาที่เป็นธรรมชาติ โดยเฉพาะอย่างยิ่ง ลูกค้ารายนี้สนใจวิเคราะห์ข้อมูลในฟอรัม "r/food" คุณจะต้องสร้างไปป์ไลน์สําหรับการถ่ายโอนข้อมูลโดยเริ่มจากการทดแทนข้อมูลตั้งแต่เดือนมกราคม 2017 ถึงเดือนสิงหาคม 2019

4 การเข้าถึง BigQuery ผ่าน BigQuery Storage API

การดึงข้อมูลจาก BigQuery โดยใช้เมธอด tabledata.list API อาจใช้เวลานานและไม่มีประสิทธิภาพเมื่อปริมาณข้อมูลเพิ่มขึ้น เมธอดนี้จะแสดงรายการออบเจ็กต์ JSON และต้องอ่านทีละหน้าตามลําดับเพื่ออ่านชุดข้อมูลทั้งหมด

BigQuery Storage API มีการปรับปรุงที่สำคัญในการเข้าถึงข้อมูลใน BigQuery โดยใช้โปรโตคอลที่อิงตาม RPC โดยรองรับการอ่านและเขียนข้อมูลพร้อมกัน รวมถึงรูปแบบการแปลงเป็นอนุกรมรูปแบบต่างๆ เช่น Apache Avro และ Apache Arrow ในระดับสูง การดำเนินการนี้จะช่วยให้ประสิทธิภาพดีขึ้นอย่างมาก โดยเฉพาะกับชุดข้อมูลขนาดใหญ่

ในโค้ดแล็บนี้ คุณจะใช้ spark-bigquery-connector ในการอ่านและเขียนข้อมูลระหว่าง BigQuery กับ Spark

5 การสร้างโปรเจ็กต์

ลงชื่อเข้าใช้คอนโซล Google Cloud Platform ที่ console.cloud.google.com และสร้างโปรเจ็กต์ใหม่โดยทำดังนี้

7e541d932b20c074.png

2deefc9295d114ea.png

a92a49afe05008a.png

ถัดไป คุณจะต้องเปิดใช้การเรียกเก็บเงินใน Cloud Console เพื่อใช้ทรัพยากร Google Cloud

ค่าใช้จ่ายในการทํา Codelab นี้ไม่ควรเกิน 200 บาท แต่อาจมากกว่านั้นหากคุณตัดสินใจใช้ทรัพยากรเพิ่มเติมหรือปล่อยไว้ให้ทํางาน ส่วนสุดท้ายของ Codelab นี้จะแนะนำวิธีล้างข้อมูลโปรเจ็กต์

ผู้ใช้ใหม่ของ Google Cloud Platform มีสิทธิ์รับช่วงทดลองใช้ฟรีมูลค่า$300

6 การตั้งค่าสภาพแวดล้อม

ตอนนี้คุณจะต้องตั้งค่าสภาพแวดล้อมโดยทำดังนี้

  • การเปิดใช้ Compute Engine, Dataproc และ BigQuery Storage API
  • การกำหนดการตั้งค่าโปรเจ็กต์
  • การสร้างคลัสเตอร์ Dataproc
  • การสร้างที่เก็บข้อมูล Google Cloud Storage

การเปิดใช้ API และการกำหนดค่าสภาพแวดล้อม

เปิด Cloud Shell โดยกดปุ่มที่มุมขวาบนของ Cloud Console

a10c47ee6ca41c54.png

หลังจากโหลด Cloud Shell แล้ว ให้เรียกใช้คำสั่งต่อไปนี้เพื่อเปิดใช้ Compute Engine, Dataproc และ BigQuery Storage API

gcloud services enable compute.googleapis.com \
  dataproc
.googleapis.com \
  bigquerystorage
.googleapis.com

ตั้งค่ารหัสโปรเจ็กต์ของโปรเจ็กต์ โดยไปที่หน้าการเลือกโปรเจ็กต์แล้วค้นหาโปรเจ็กต์ ซึ่งอาจไม่ใช่ชื่อเดียวกับโปรเจ็กต์

e682e8227aa3c781.png

76d45fb295728542.png

เรียกใช้คําสั่งต่อไปนี้เพื่อตั้งค่ารหัสโปรเจ็กต์

gcloud config set project <project_id>

ตั้งค่าภูมิภาคของโปรเจ็กต์โดยเลือกจากรายการที่นี่ ตัวอย่างเช่น us-central1

gcloud config set dataproc/region <region>

เลือกชื่อสำหรับคลัสเตอร์ Dataproc และสร้างตัวแปรสภาพแวดล้อมสำหรับคลัสเตอร์

CLUSTER_NAME=<cluster_name>

การสร้างคลัสเตอร์ Dataproc

สร้างคลัสเตอร์ Dataproc โดยเรียกใช้คําสั่งต่อไปนี้

 gcloud beta dataproc clusters create ${CLUSTER_NAME} \
     --worker-machine-type n1-standard-8 \
     --num-workers 8 \
     --image-version 1.5-debian \
     --initialization-actions gs://dataproc-initialization-actions/python/pip-install.sh \
     --metadata 'PIP_PACKAGES=google-cloud-storage' \
     --optional-components=ANACONDA \
     --enable-component-gateway

คำสั่งนี้จะใช้เวลาสักครู่จึงจะเสร็จสมบูรณ์ รายละเอียดของคำสั่งมีดังนี้

ซึ่งจะเป็นการเริ่มสร้างคลัสเตอร์ Dataproc ด้วยชื่อที่คุณระบุไว้ก่อนหน้านี้ การใช้ beta API จะเปิดใช้ฟีเจอร์เบต้าของ Dataproc เช่น Component Gateway

gcloud beta dataproc clusters create ${CLUSTER_NAME}

ซึ่งจะตั้งค่าประเภทเครื่องที่จะใช้สำหรับผู้ปฏิบัติงาน

--worker-machine-type n1-standard-8

ซึ่งจะเป็นการกำหนดจำนวนผู้ปฏิบัติงานของคลัสเตอร์

--num-workers 8

ซึ่งจะเป็นการกำหนดเวอร์ชันอิมเมจของ Dataproc

--image-version 1.5-debian

ซึ่งจะกําหนดค่าการดำเนินการเริ่มต้นที่จะใช้ในคลัสเตอร์ ในส่วนนี้ คุณกําลังรวมการดําเนินการเริ่มต้น pip

--initialization-actions gs://dataproc-initialization-actions/python/pip-install.sh 

นี่คือข้อมูลเมตาที่จะรวมไว้ในคลัสเตอร์ ในส่วนนี้ คุณกําลังระบุข้อมูลเมตาสําหรับpipการดำเนินการเริ่มต้น

--metadata 'PIP_PACKAGES=google-cloud-storage'

ซึ่งจะเป็นการตั้งค่าให้ติดตั้งคอมโพเนนต์ที่ไม่บังคับในคลัสเตอร์

--optional-components=ANACONDA

ซึ่งจะเปิดใช้เกตเวย์คอมโพเนนต์ที่ช่วยให้คุณใช้เกตเวย์คอมโพเนนต์ของ Dataproc เพื่อดู UI ทั่วไป เช่น Zeppelin, Jupyter หรือประวัติ Spark ได้

--enable-component-gateway

ดูข้อมูลเบื้องต้นที่ละเอียดยิ่งขึ้นเกี่ยวกับ Dataproc ได้ที่ codelab นี้

การสร้างที่เก็บข้อมูล Google Cloud Storage

คุณจะต้องมีที่เก็บข้อมูล Google Cloud Storage สำหรับเอาต์พุตของงาน กำหนดชื่อที่ไม่ซ้ำกันสำหรับที่เก็บข้อมูล แล้วเรียกใช้คำสั่งต่อไปนี้เพื่อสร้างที่เก็บข้อมูลใหม่ ชื่อที่เก็บข้อมูลต้องไม่ซ้ำกันสำหรับผู้ใช้ทุกคนในโปรเจ็กต์ Google Cloud ทั้งหมด คุณจึงอาจต้องลองดำเนินการนี้หลายครั้งโดยใช้ชื่ออื่น ระบบจะสร้างที่เก็บข้อมูลสำเร็จหากคุณไม่ได้รับ ServiceException

BUCKET_NAME=<bucket_name>
gsutil
mb gs://${BUCKET_NAME}

7 การวิเคราะห์ข้อมูลเชิงสำรวจ

ก่อนทำการเตรียมข้อมูลเบื้องต้น คุณควรศึกษาเพิ่มเติมเกี่ยวกับลักษณะของข้อมูลที่คุณจัดการ โดยคุณจะได้สํารวจวิธีการสํารวจข้อมูล 2 วิธี ก่อนอื่น คุณจะเห็นข้อมูลดิบบางส่วนโดยใช้เว็บ UI ของ BigQuery จากนั้นจะคํานวณจํานวนโพสต์ต่อแต่ละฟอรัมย่อยโดยใช้ PySpark และ Dataproc

การใช้เว็บ UI ของ BigQuery

เริ่มต้นด้วยการใช้เว็บ UI ของ BigQuery เพื่อดูข้อมูล จากไอคอนเมนูในคอนโซลระบบคลาวด์ ให้เลื่อนลงแล้วกด "BigQuery" เพื่อเปิด UI ทางเว็บของ BigQuery

242a597d7045b4da.png

จากนั้นเรียกใช้คําสั่งต่อไปนี้ในเครื่องมือแก้ไขคําค้นหาของ UI ทางเว็บ BigQuery ซึ่งจะแสดงผลข้อมูล 10 แถวเต็มๆ ตั้งแต่เดือนมกราคม 2017

select * from fh-bigquery.reddit_posts.2017_01 limit 10;

b333c72d60ae6eb8.png

คุณสามารถเลื่อนหน้าเพื่อดูคอลัมน์ทั้งหมดที่ใช้ได้ รวมถึงดูตัวอย่างบางส่วน โดยเฉพาะอย่างยิ่ง คุณจะเห็น 2 คอลัมน์ที่แสดงเนื้อหาที่เป็นข้อความของโพสต์แต่ละรายการ ได้แก่ "title" และ "selftext" โดยคอลัมน์หลังคือเนื้อหาของโพสต์ และดูคอลัมน์อื่นๆ ด้วย เช่น "created_utc" ซึ่งเป็นเวลา UTC ที่สร้างโพสต์ และ "subreddit" ซึ่งเป็นชื่อของ subreddit ที่โพสต์อยู่

การดำเนินการกับงาน PySpark

เรียกใช้คําสั่งต่อไปนี้ใน Cloud Shell เพื่อโคลนที่เก็บที่มีโค้ดตัวอย่างและ cd ไปยังไดเรกทอรีที่ถูกต้อง

cd
git clone https
://github.com/GoogleCloudPlatform/cloud-dataproc

คุณสามารถใช้ PySpark เพื่อระบุจํานวนโพสต์ที่มีอยู่สําหรับแต่ละฟอรัมย่อยได้ คุณสามารถเปิด Cloud Editor และอ่านสคริปต์ cloud-dataproc/codelabs/spark-bigquery ก่อนที่จะเรียกใช้ในขั้นตอนถัดไป

5d965c6fb66dbd81.png

797cf71de3449bdb.png

คลิกปุ่ม "เปิดเทอร์มินัล" ในเครื่องมือแก้ไขบนระบบคลาวด์เพื่อเปลี่ยนกลับไปที่ Cloud Shell แล้วเรียกใช้คําสั่งต่อไปนี้เพื่อดําเนินการงาน PySpark แรก

cd ~/cloud-dataproc/codelabs/spark-bigquery
gcloud
dataproc jobs submit pyspark --cluster ${CLUSTER_NAME} \
    --jars gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar \
    --driver-log-levels root=FATAL \
    counts_by_subreddit.py

คำสั่งนี้ช่วยให้คุณส่งงานไปยัง Dataproc ผ่าน Jobs API ได้ ในส่วนนี้คุณกําลังระบุประเภทงานเป็น pyspark คุณสามารถระบุชื่อคลัสเตอร์ พารามิเตอร์ที่ไม่บังคับ และชื่อไฟล์ที่มีงาน ในส่วนนี้ คุณกําลังระบุพารามิเตอร์ --jars ซึ่งช่วยให้คุณรวม spark-bigquery-connector ไว้ในงานได้ นอกจากนี้ คุณยังตั้งค่าระดับเอาต์พุตบันทึกได้โดยใช้ --driver-log-levels root=FATAL ซึ่งจะระงับเอาต์พุตบันทึกทั้งหมดยกเว้นข้อผิดพลาด บันทึกของ Spark มีแนวโน้มที่จะค่อนข้างมีข้อมูลมากเกินไป

การดำเนินการนี้จะใช้เวลา 2-3 นาที และเอาต์พุตสุดท้ายควรมีลักษณะดังนี้

6c185228db47bb18.png

8 การสำรวจ UI ของ Dataproc และ Spark

เมื่อเรียกใช้งาน Spark ใน Dataproc คุณจะมีสิทธิ์เข้าถึง UI 2 แบบเพื่อตรวจสอบสถานะงาน / คลัสเตอร์ รายการแรกคือ UI ของ Dataproc ซึ่งคุณจะเห็นได้โดยคลิกไอคอนเมนูแล้วเลื่อนลงไปที่ Dataproc ในส่วนนี้ คุณจะเห็นหน่วยความจำที่ใช้ได้ในปัจจุบัน รวมถึงหน่วยความจำที่รอดำเนินการและจำนวนผู้ปฏิบัติงาน

6f2987346d15c8e2.png

นอกจากนี้ คุณยังคลิกแท็บงานเพื่อดูงานที่เสร็จแล้วได้ด้วย คุณดูรายละเอียดงาน เช่น บันทึกและเอาต์พุตของงานเหล่านั้นได้โดยคลิกรหัสงานของงานที่ต้องการ 114d90129b0e4c88.png

1b2160f0f484594a.png

นอกจากนี้ คุณยังดู UI ของ Spark ได้ด้วย จากหน้างาน ให้คลิกลูกศรย้อนกลับ แล้วคลิก "อินเทอร์เฟซเว็บ" คุณควรเห็นตัวเลือกหลายรายการในส่วนเกตเวย์คอมโพเนนต์ คุณสามารถเปิดใช้ฟีเจอร์เหล่านี้ได้หลายรายการผ่านคอมโพเนนต์ที่ไม่บังคับเมื่อตั้งค่าคลัสเตอร์ สําหรับห้องทดลองนี้ ให้คลิก "เซิร์ฟเวอร์ประวัติของ Spark

5da7944327d193dc.png

6a349200289c69c1.png e63b36bdc90ff610.png

ซึ่งจะเป็นการเปิดหน้าต่างต่อไปนี้

8f6786760f994fe8.png

งานที่ทำเสร็จแล้วทั้งหมดจะแสดงที่นี่ และคุณสามารถคลิก application_id ใดก็ได้เพื่อดูข้อมูลเพิ่มเติมเกี่ยวกับงาน ในทํานองเดียวกัน คุณสามารถคลิก "แสดงคําขอที่ยังไม่เสร็จสมบูรณ์" ที่ด้านล่างสุดของหน้า Landing Page เพื่อดูงานที่ทํางานอยู่ทั้งหมด

9 เรียกใช้งานทดแทน

ตอนนี้คุณจะต้องเรียกใช้งานที่โหลดข้อมูลลงในหน่วยความจํา ดึงข้อมูลที่จําเป็น และส่งออกเอาต์พุตไปยังที่เก็บข้อมูล Google Cloud Storage คุณจะดึงข้อมูล "ชื่อ" "เนื้อหา" (ข้อความดิบ) และ "การประทับเวลาที่สร้าง" สำหรับความคิดเห็น Reddit แต่ละรายการ จากนั้นให้นำข้อมูลนี้ไปแปลงเป็น CSV, ใส่ลงในไฟล์ ZIP และโหลดลงในที่เก็บข้อมูลที่มี URI ของ gs://${BUCKET_NAME}/reddit_posts/YYYY/MM/food.csv.gz

คุณสามารถไปที่ Cloud Editor อีกครั้งเพื่ออ่านโค้ดของ cloud-dataproc/codelabs/spark-bigquery/backfill.sh ซึ่งเป็นสคริปต์ Wrapper เพื่อเรียกใช้โค้ดใน cloud-dataproc/codelabs/spark-bigquery/backfill.py

cd ~/cloud-dataproc/codelabs/spark-bigquery
bash
backfill.sh ${CLUSTER_NAME} ${BUCKET_NAME}

คุณควรเห็นข้อความแจ้งว่างานเสร็จสมบูรณ์หลายรายการในไม่ช้า การดำเนินการอาจใช้เวลาถึง 15 นาทีจึงจะเสร็จสมบูรณ์ นอกจากนี้ คุณยังตรวจสอบที่เก็บข้อมูลอีกครั้งเพื่อยืนยันว่าการส่งออกข้อมูลสําเร็จโดยใช้ gsutil ได้ด้วย เมื่องานทั้งหมดเสร็จแล้ว ให้เรียกใช้คําสั่งต่อไปนี้

gsutil ls gs://${BUCKET_NAME}/reddit_posts/*/*/food.csv.gz

คุณควรเห็นเอาต์พุตต่อไปนี้

a7c3c7b2e82f9fca.png

ยินดีด้วย คุณได้ทดแทนข้อมูลความคิดเห็น Reddit เรียบร้อยแล้ว หากสนใจวิธีสร้างโมเดลจากข้อมูลนี้ โปรดไปที่ Codelab Spark-NLP

10 ล้างข้อมูล

โปรดดำเนินการดังนี้เพื่อเลี่ยงไม่ให้เกิดการเรียกเก็บเงินกับบัญชี GCP โดยไม่จำเป็นหลังจากการเริ่มต้นใช้งานอย่างรวดเร็วนี้เสร็จสมบูรณ์

  1. ลบที่เก็บข้อมูล Cloud Storage ของสภาพแวดล้อมที่คุณสร้างขึ้น
  2. ลบสภาพแวดล้อม Dataproc

หากสร้างโปรเจ็กต์มาเพื่อโค้ดแล็บนี้โดยเฉพาะ คุณก็ลบโปรเจ็กต์ได้เช่นกันโดยทำดังนี้

  1. ในคอนโซล GCP ให้ไปที่หน้าโปรเจ็กต์
  2. ในรายการโปรเจ็กต์ ให้เลือกโปรเจ็กต์ที่ต้องการลบ แล้วคลิกลบ
  3. ในช่อง ให้พิมพ์รหัสโปรเจ็กต์ แล้วคลิกปิดเพื่อลบโปรเจ็กต์

ใบอนุญาต

ผลงานนี้ได้รับอนุญาตภายใต้สัญญาอนุญาตครีเอทีฟคอมมอนส์สำหรับยอมรับสิทธิของผู้สร้าง (Creative Commons Attribution License) 3.0 ทั่วไป และสัญญาอนุญาต Apache 2.0