Xử lý trước dữ liệu BigQuery bằng PySpark trên Dataproc

1. Tổng quan

Lớp học lập trình này sẽ giới thiệu cách tạo quy trình xử lý dữ liệu bằng Apache Spark với Dataproc trên Google Cloud Platform. Đây là một trường hợp sử dụng phổ biến trong khoa học dữ liệu và kỹ thuật dữ liệu để đọc dữ liệu từ một vị trí lưu trữ, thực hiện biến đổi và ghi dữ liệu vào một vị trí lưu trữ khác. Các biến đổi phổ biến bao gồm thay đổi nội dung dữ liệu, loại bỏ thông tin không cần thiết và thay đổi loại tệp.

Trong lớp học lập trình này, bạn sẽ tìm hiểu về Apache Spark, chạy một quy trình mẫu bằng Dataproc với PySpark (API Python của Apache Spark), BigQuery, Google Cloud Storage và dữ liệu trên Reddit.

2. Giới thiệu về Apache Spark (Không bắt buộc)

Theo trang web: " Apache Spark là một công cụ phân tích hợp nhất giúp xử lý dữ liệu quy mô lớn". API này cho phép bạn phân tích và xử lý dữ liệu song song và trong bộ nhớ, cho phép thực hiện việc tính toán song song trên quy mô lớn trên nhiều máy và nút khác nhau. Được phát hành lần đầu vào năm 2014 dưới dạng bản nâng cấp của MapReduce (Giảm giá trị) truyền thống và hiện vẫn là một trong những khung phổ biến nhất để thực hiện các phép tính quy mô lớn. Apache Spark được viết bằng Scala và sau đó có các API bằng Scala, Java, Python và R. Lớp này chứa rất nhiều thư viện như Spark SQL để thực hiện truy vấn SQL trên dữ liệu, Spark Streaming để truyền trực tuyến dữ liệu, MLlib cho máy học và GraphX để xử lý biểu đồ, tất cả đều chạy trên công cụ Apache Spark.

32add0b6a47bafbc.png.

Spark có thể tự chạy hoặc có thể tận dụng một dịch vụ quản lý tài nguyên như Yarn, Mesos hoặc Kubernetes để mở rộng quy mô. Bạn sẽ dùng Dataproc cho lớp học lập trình này (lớp học lập trình dùng Yarn).

Dữ liệu trong Spark ban đầu được tải vào bộ nhớ vào bộ nhớ được gọi là RDD (tập dữ liệu phân phối có khả năng phục hồi). Kể từ đó, quá trình phát triển trên Spark bao gồm việc bổ sung hai loại dữ liệu mới kiểu cột: Tập dữ liệu (đã nhập) và Dataframe (không có kiểu). Nói một cách thẳng thì RDD rất phù hợp với mọi loại dữ liệu, trong khi Tập dữ liệu và DataFrame được tối ưu hoá cho dữ liệu dạng bảng. Vì Tập dữ liệu chỉ có sẵn với API Java và API Scala, nên chúng ta sẽ tiếp tục sử dụng PySpark Dataframe API cho lớp học lập trình này. Để biết thêm thông tin, vui lòng tham khảo tài liệu về Apache Spark.

3. Trường hợp sử dụng

Kỹ sư dữ liệu thường cần dữ liệu mà nhà khoa học dữ liệu có thể dễ dàng truy cập. Tuy nhiên, ban đầu dữ liệu thường rất bẩn (khó sử dụng cho việc phân tích ở trạng thái hiện tại) và cần phải được làm sạch trước khi có thể sử dụng được nhiều. Một ví dụ cho vấn đề này là dữ liệu đã được cóp nhặt trên web, có thể chứa cách mã hoá lạ hoặc thẻ HTML không liên quan.

Trong phòng thí nghiệm này, bạn sẽ tải một tập dữ liệu từ BigQuery dưới dạng bài đăng trên Reddit vào cụm Spark được lưu trữ trên Dataproc, trích xuất thông tin hữu ích và lưu trữ dữ liệu đã xử lý dưới dạng tệp CSV nén trong Google Cloud Storage.

be2a4551ece63bfc.png

Nhà khoa học dữ liệu trưởng tại công ty của bạn muốn các nhóm của họ giải quyết nhiều vấn đề xử lý ngôn ngữ tự nhiên. Cụ thể, họ muốn phân tích dữ liệu trong phụ đề "r/food". Bạn sẽ tạo một đường ống cho tệp kết xuất dữ liệu, bắt đầu bằng quá trình chèn lấp từ tháng 1 năm 2017 đến tháng 8 năm 2019.

4. Truy cập BigQuery thông qua BigQuery Storage API

Việc lấy dữ liệu từ BigQuery bằng phương pháp APItabledata.list có thể tốn thời gian và không hiệu quả như số lượng của quy mô dữ liệu. Phương thức này trả về danh sách các đối tượng JSON và yêu cầu đọc tuần tự từng trang một để đọc toàn bộ tập dữ liệu.

BigQuery Storage API mang đến những cải tiến đáng kể cho việc truy cập dữ liệu trong BigQuery bằng cách sử dụng một giao thức dựa trên RPC. Thư viện này hỗ trợ ghi và đọc dữ liệu song song với nhiều định dạng chuyển đổi tuần tự, chẳng hạn như Apache AvroApache Arrow. Ở cấp độ cao, điều này đồng nghĩa với việc hiệu suất được cải thiện đáng kể, đặc biệt là đối với các tập dữ liệu lớn hơn.

Trong lớp học lập trình này, bạn sẽ sử dụng spark-bigquery-connector để đọc và ghi dữ liệu giữa BigQuery và Spark.

5. Tạo dự án

Đăng nhập vào bảng điều khiển Google Cloud Platform tại console.cloud.google.com và tạo một dự án mới:

7e541d932b20c074.pngS

2deefc9295d114ea.png.

a92a49afe05008a.png

Tiếp theo, bạn sẽ cần bật tính năng thanh toán trong Cloud Console để sử dụng các tài nguyên của Google Cloud.

Bạn sẽ không mất quá vài đô la khi chạy qua lớp học lập trình này, nhưng có thể sẽ cao hơn nếu bạn quyết định sử dụng nhiều tài nguyên hơn hoặc nếu bạn để chúng chạy. Phần cuối của lớp học lập trình này sẽ hướng dẫn bạn cách dọn dẹp dự án.

Người dùng mới của Google Cloud Platform đủ điều kiện nhận 300 USD dùng thử miễn phí.

6. Thiết lập môi trường

Bây giờ, bạn sẽ tìm hiểu cách thiết lập môi trường bằng cách:

  • Bật Compute Engine, Dataproc và BigQuery Storage API
  • Định cấu hình chế độ cài đặt dự án
  • Tạo cụm Dataproc
  • Tạo bộ chứa Google Cloud Storage

Bật API và định cấu hình môi trường của bạn

Mở Cloud Shell bằng cách nhấn vào nút ở góc trên cùng bên phải của Cloud Console.

a10c47ee6ca41c54.png

Sau khi Cloud Shell tải, hãy chạy các lệnh sau để bật các API Compute Engine, Dataproc và BigQuery Storage:

gcloud services enable compute.googleapis.com \
  dataproc.googleapis.com \
  bigquerystorage.googleapis.com

Đặt project id (mã dự án) của dự án. Bạn có thể tìm thấy dự án này bằng cách chuyển đến trang chọn dự án rồi tìm dự án của mình. Tên này có thể không giống với tên dự án của bạn.

e682e8227aa3c781.png

76d45fb295728542.pngS

Chạy lệnh sau để đặt mã dự án của bạn:

gcloud config set project <project_id>

Đặt khu vực cho dự án bằng cách chọn một khu vực trong danh sách tại đây. Ví dụ như us-central1.

gcloud config set dataproc/region <region>

Chọn tên cho cụm Dataproc và tạo một biến môi trường cho cụm Dataproc đó.

CLUSTER_NAME=<cluster_name>

Tạo cụm Dataproc

Tạo một cụm Dataproc bằng cách thực thi lệnh sau:

 gcloud beta dataproc clusters create ${CLUSTER_NAME} \
     --worker-machine-type n1-standard-8 \
     --num-workers 8 \
     --image-version 1.5-debian \
     --initialization-actions gs://dataproc-initialization-actions/python/pip-install.sh \
     --metadata 'PIP_PACKAGES=google-cloud-storage' \
     --optional-components=ANACONDA \
     --enable-component-gateway

Lệnh này sẽ mất vài phút để hoàn tất. Cách chia nhỏ lệnh:

Thao tác này sẽ bắt đầu tạo cụm Dataproc với tên mà bạn đã cung cấp trước đó. Việc sử dụng beta API sẽ bật các tính năng thử nghiệm của Dataproc, chẳng hạn như Component Gateway.

gcloud beta dataproc clusters create ${CLUSTER_NAME}

Thao tác này sẽ đặt loại máy sẽ sử dụng cho nhân viên của bạn.

--worker-machine-type n1-standard-8

Thao tác này sẽ đặt số lượng nhân viên mà cụm của bạn sẽ có.

--num-workers 8

Thao tác này sẽ đặt phiên bản hình ảnh của Dataproc.

--image-version 1.5-debian

Thao tác này sẽ định cấu hình các thao tác khởi chạy sẽ được sử dụng trên cụm. Tại đây, bạn sẽ đưa vào cả thao tác khởi chạy pip.

--initialization-actions gs://dataproc-initialization-actions/python/pip-install.sh 

Đây là siêu dữ liệu cần đưa vào cụm. Ở đây, bạn sẽ cung cấp siêu dữ liệu cho thao tác khởi chạy pip.

--metadata 'PIP_PACKAGES=google-cloud-storage'

Thao tác này sẽ thiết lập để Thành phần không bắt buộc được cài đặt trên cụm.

--optional-components=ANACONDA

Thao tác này sẽ bật cổng thành phần, cho phép bạn sử dụng Cổng thành phần của Dataproc để xem các giao diện người dùng phổ biến như Zeppelin, Jupyter hoặc Lịch sử Spark

--enable-component-gateway

Để được giới thiệu sâu hơn về Dataproc, vui lòng xem lớp học lập trình này.

Tạo bộ chứa Google Cloud Storage

Bạn sẽ cần có một bộ chứa Google Cloud Storage để thực hiện công việc. Xác định tên riêng biệt cho bộ chứa của bạn và chạy lệnh sau để tạo một bộ chứa mới. Tên bộ chứa là tên duy nhất trong mọi dự án trên Google Cloud dành cho tất cả người dùng. Vì vậy, có thể bạn cần thử lại vài lần bằng tên khác. Đã tạo thành công một bộ chứa nếu bạn không nhận được ServiceException.

BUCKET_NAME=<bucket_name>
gsutil mb gs://${BUCKET_NAME}

7. Phân tích dữ liệu khám phá

Trước khi thực hiện xử lý trước, bạn nên tìm hiểu thêm về bản chất của dữ liệu mà bạn đang xử lý. Để thực hiện việc này, bạn sẽ khám phá hai phương pháp khám phá dữ liệu. Trước tiên, bạn sẽ xem một số dữ liệu thô bằng cách sử dụng giao diện người dùng web BigQuery, sau đó bạn sẽ tính toán số lượng bài đăng trên mỗi chuyên mục bằng PySpark và Dataproc.

Sử dụng giao diện người dùng web BigQuery

Hãy bắt đầu bằng cách sử dụng giao diện người dùng web BigQuery để xem dữ liệu của bạn. Từ biểu tượng trình đơn trong Cloud Console, hãy di chuyển xuống rồi nhấn vào "BigQuery" để mở giao diện người dùng web BigQuery.

242a597d7045b4da.pngS

Tiếp theo, hãy chạy lệnh sau trong Trình chỉnh sửa truy vấn giao diện người dùng web BigQuery. Thao tác này sẽ trả về 10 hàng dữ liệu đầy đủ từ tháng 1 năm 2017:

select * from fh-bigquery.reddit_posts.2017_01 limit 10;

b333c72d60ae6eb8.png

Bạn có thể cuộn qua trang để xem tất cả các cột có sẵn cũng như một số ví dụ. Cụ thể, bạn sẽ thấy hai cột thể hiện nội dung văn bản của mỗi bài đăng: "title" (tiêu đề) và "selftext", văn bản sau là phần nội dung của bài đăng. Đồng thời, hãy chú ý đến các cột khác, chẳng hạn như "created_utc" đây là thời điểm đăng một bài và là "subreddit" đây là tài khoản phụ mà bài đăng hiển thị trong đó.

Thực hiện công việc PySpark

Chạy các lệnh sau trong Cloud Shell để sao chép kho lưu trữ bằng mã mẫu và đĩa cd vào đúng thư mục:

cd
git clone https://github.com/GoogleCloudPlatform/cloud-dataproc

Bạn có thể sử dụng PySpark để xác định số lượng bài đăng tồn tại cho mỗi chuyên mục. Bạn có thể mở Cloud Editor rồi đọc tập lệnh cloud-dataproc/codelabs/spark-bigquery trước khi thực thi tập lệnh đó ở bước tiếp theo:

5d965c6fb66dbd81.pngS

797cf71de3449bdb.png.

Nhấp vào "Open Terminal" (Mở cửa sổ dòng lệnh) trong Cloud Editor để quay lại Cloud Shell và chạy lệnh sau để thực thi công việc PySpark đầu tiên của bạn:

cd ~/cloud-dataproc/codelabs/spark-bigquery
gcloud dataproc jobs submit pyspark --cluster ${CLUSTER_NAME} \
    --jars gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar \
    --driver-log-levels root=FATAL \
    counts_by_subreddit.py

Lệnh này cho phép bạn gửi công việc đến Dataproc qua API Việc làm. Ở đây, bạn chỉ rõ loại công việc là pyspark. Bạn có thể cung cấp tên cụm, các tham số không bắt buộc và tên tệp chứa công việc. Ở đây, bạn sẽ cung cấp tham số --jars để cho phép bạn thêm spark-bigquery-connector vào công việc của mình. Bạn cũng có thể đặt các cấp độ đầu ra nhật ký bằng --driver-log-levels root=FATAL để chặn mọi đầu ra nhật ký, ngoại trừ các lỗi. Nhật ký tia sáng thường khá ồn.

Bạn sẽ mất vài phút để chạy và kết quả cuối cùng sẽ có dạng như sau:

6c185228db47bb18.pngS

8. Khám phá giao diện người dùng Dataproc và Spark

Khi chạy công việc Spark trên Dataproc, bạn có quyền truy cập vào hai giao diện người dùng để kiểm tra trạng thái của các công việc / cụm của bạn. Đầu tiên là giao diện người dùng Dataproc. Bạn có thể tìm thấy giao diện này bằng cách nhấp vào biểu tượng trình đơn rồi cuộn xuống Dataproc. Tại đây, bạn có thể thấy bộ nhớ hiện tại còn trống cũng như bộ nhớ đang chờ xử lý và số lượng worker.

6f2987346d15c8e2.pngs

Bạn cũng có thể nhấp vào thẻ việc làm để xem các công việc đã hoàn tất. Bạn có thể xem chi tiết công việc, chẳng hạn như nhật ký và dữ liệu đầu ra của các công việc đó bằng cách nhấp vào Mã công việc của một công việc cụ thể. 114d90129b0e4c88.pngS

1b2160f0f484594a.pngS

Bạn cũng có thể xem giao diện người dùng Spark. Từ trang công việc, hãy nhấp vào mũi tên quay lại, sau đó nhấp vào Giao diện web. Bạn sẽ thấy một số tuỳ chọn trong cổng thành phần. Bạn có thể bật nhiều tính năng trong số này thông qua Các thành phần không bắt buộc khi thiết lập cụm. Đối với phòng thí nghiệm này, hãy nhấp vào "Spark History Server.

5da7944327d193dc.png.

6a349200289c69c1.pngS e63b36bdc90ff610.png

Thao tác này sẽ mở cửa sổ sau:

8f6786760f994fe8.pngs

Tất cả công việc đã hoàn thành sẽ xuất hiện ở đây và bạn có thể nhấp vào bất kỳ application_id nào để tìm hiểu thêm thông tin về công việc. Tương tự, bạn có thể nhấp vào "Hiển thị ứng dụng chưa hoàn tất" ở cuối trang đích để xem tất cả công việc hiện đang chạy.

9. Chạy công việc chèn lấp

Bây giờ, bạn sẽ chạy một tác vụ tải dữ liệu vào bộ nhớ, trích xuất thông tin cần thiết và kết xuất dữ liệu đầu ra vào một bộ chứa Google Cloud Storage. Bạn sẽ trích xuất "title", "body" (văn bản thô) và "dấu thời gian đã tạo" cho mỗi bình luận trên Reddit. Sau đó, bạn sẽ lấy dữ liệu này, chuyển đổi dữ liệu thành tệp csv, nén và tải dữ liệu vào một bộ chứa có URI là gs://${BUCKET_NAME}/reddit_post/YYYY/MM/food.csv.gz.

Bạn có thể tham khảo lại Cloud Editor để đọc qua cho cloud-dataproc/codelabs/spark-bigquery/backfill.sh. Đây là tập lệnh trình bao bọc dùng để thực thi trong cloud-dataproc/codelabs/spark-bigquery/backfill.py.

cd ~/cloud-dataproc/codelabs/spark-bigquery
bash backfill.sh ${CLUSTER_NAME} ${BUCKET_NAME}

Bạn sẽ sớm nhìn thấy một loạt các thông báo hoàn thành công việc. Có thể mất tối đa 15 phút để hoàn tất công việc này. Bạn cũng có thể kiểm tra kỹ bộ chứa lưu trữ để xác minh việc đầu ra dữ liệu có thành công hay không bằng cách sử dụng GCR. Sau khi hoàn tất tất cả các công việc, hãy chạy lệnh sau:

gsutil ls gs://${BUCKET_NAME}/reddit_posts/*/*/food.csv.gz

Bạn sẽ thấy kết quả sau đây:

a7c3c7b2e82f9fca.png

Xin chúc mừng! Bạn đã hoàn tất thành công việc thay thế dữ liệu bình luận trên Reddit! Nếu bạn quan tâm đến cách xây dựng các mô hình dựa trên dữ liệu này, vui lòng chuyển đến lớp học lập trình Spark-NLP.

10. Dọn dẹp

Để tránh phát sinh các khoản phí không cần thiết cho tài khoản GCP của bạn sau khi hoàn tất phần bắt đầu nhanh này, hãy làm như sau:

  1. Xoá bộ chứa Cloud Storage đối với môi trường và bộ chứa bạn đã tạo
  2. Xoá môi trường Dataproc.

Nếu đã tạo một dự án chỉ dành cho lớp học lập trình này, bạn cũng có thể xoá dự án đó (không bắt buộc):

  1. Trong Bảng điều khiển GCP, hãy chuyển đến trang Dự án.
  2. Trong danh sách dự án, hãy chọn dự án mà bạn muốn xoá rồi nhấp vào Xoá.
  3. Trong hộp này, hãy nhập mã dự án rồi nhấp vào Tắt để xoá dự án.

Giấy phép

Tác phẩm này được cấp phép theo Giấy phép Creative Commons ghi nhận tác giả 3.0 chung và giấy phép Apache 2.0.