Xử lý trước dữ liệu BigQuery bằng PySpark trên Dataproc

1. Tổng quan

Lớp học lập trình này sẽ hướng dẫn cách tạo một quy trình xử lý dữ liệu bằng Apache Spark với Dataproc trên Google Cloud Platform. Đây là một trường hợp sử dụng phổ biến trong khoa học dữ liệu và kỹ thuật dữ liệu để đọc dữ liệu từ một vị trí lưu trữ, thực hiện các phép biến đổi trên dữ liệu đó và ghi dữ liệu vào một vị trí lưu trữ khác. Các hoạt động chuyển đổi thường gặp bao gồm thay đổi nội dung của dữ liệu, loại bỏ thông tin không cần thiết và thay đổi loại tệp.

Trong lớp học lập trình này, bạn sẽ tìm hiểu về Apache Spark, chạy một quy trình mẫu bằng Dataproc với PySpark (API Python của Apache Spark), BigQuery, Google Cloud Storage và dữ liệu từ Reddit.

2. Giới thiệu về Apache Spark (Không bắt buộc)

Theo trang web này, "Apache Spark là một công cụ phân tích hợp nhất để xử lý dữ liệu quy mô lớn". Công cụ này cho phép bạn phân tích và xử lý dữ liệu song song và trong bộ nhớ, nhờ đó cho phép tính toán song song trên nhiều máy và nút khác nhau. Ban đầu, Spark được phát hành vào năm 2014 dưới dạng bản nâng cấp cho MapReduce truyền thống và vẫn là một trong những khung phổ biến nhất để thực hiện các phép tính quy mô lớn. Apache Spark được viết bằng Scala và sau đó có các API bằng Scala, Java, Python và R. Nền tảng này chứa rất nhiều thư viện như Spark SQL để thực hiện các truy vấn SQL trên dữ liệu, Spark Streaming để truyền dữ liệu trực tuyến, MLlib để học máy và GraphX để xử lý đồ thị, tất cả đều chạy trên công cụ Apache Spark.

32add0b6a47bafbc.png

Spark có thể chạy độc lập hoặc tận dụng một dịch vụ quản lý tài nguyên như Yarn, Mesos hoặc Kubernetes để mở rộng quy mô. Bạn sẽ sử dụng Dataproc cho lớp học lập trình này, trong đó sử dụng Yarn.

Dữ liệu trong Spark ban đầu được tải vào bộ nhớ thành một RDD (tập dữ liệu phân tán có khả năng phục hồi). Kể từ đó, quá trình phát triển trên Spark đã bao gồm việc bổ sung hai loại dữ liệu mới theo kiểu cột: Tập dữ liệu (được phân loại) và Khung dữ liệu (không được phân loại). Nói một cách đơn giản, RDD rất phù hợp với mọi loại dữ liệu, trong khi Tập dữ liệu và Khung dữ liệu được tối ưu hoá cho dữ liệu dạng bảng. Vì Tập dữ liệu chỉ có trong API Java và Scala, nên chúng ta sẽ tiếp tục sử dụng API Dataframe của PySpark cho lớp học lập trình này. Để biết thêm thông tin, vui lòng tham khảo tài liệu của Apache Spark.

3. Trường hợp sử dụng

Kỹ sư dữ liệu thường cần dữ liệu dễ dàng truy cập đối với nhà khoa học dữ liệu. Tuy nhiên, dữ liệu thường ban đầu không được chuẩn hoá (khó sử dụng cho hoạt động phân tích ở trạng thái hiện tại) và cần được chuẩn hoá trước khi có thể sử dụng. Ví dụ: dữ liệu được lấy từ web có thể chứa các mã hoá lạ hoặc thẻ HTML thừa.

Trong bài thực hành này, bạn sẽ tải một tập dữ liệu từ BigQuery dưới dạng các bài đăng trên Reddit vào một cụm Spark được lưu trữ trên Dataproc, trích xuất thông tin hữu ích và lưu trữ dữ liệu đã xử lý dưới dạng tệp CSV nén trong Google Cloud Storage.

be2a4551ece63bfc.png

Nhà khoa học dữ liệu trưởng tại công ty của bạn muốn các nhóm của mình giải quyết nhiều vấn đề về xử lý ngôn ngữ tự nhiên. Cụ thể, họ muốn phân tích dữ liệu trong subreddit "r/food". Bạn sẽ tạo một quy trình cho một bản kết xuất dữ liệu, bắt đầu bằng một quy trình bổ sung dữ liệu từ tháng 1 năm 2017 đến tháng 8 năm 2019.

4. Truy cập vào BigQuery thông qua BigQuery Storage API

Việc kéo dữ liệu từ BigQuery bằng phương thức tabledata.list API có thể tốn nhiều thời gian và không hiệu quả khi lượng dữ liệu tăng lên. Phương thức này trả về một danh sách các đối tượng JSON và yêu cầu đọc tuần tự từng trang một để đọc toàn bộ tập dữ liệu.

BigQuery Storage API mang đến những cải tiến đáng kể cho việc truy cập dữ liệu trong BigQuery bằng cách sử dụng giao thức dựa trên RPC. Nó hỗ trợ việc đọc và ghi dữ liệu song song cũng như nhiều định dạng chuyển đổi tuần tự như Apache AvroApache Arrow. Nhìn chung, điều này giúp cải thiện đáng kể hiệu suất, đặc biệt là trên các tập dữ liệu lớn hơn.

Trong lớp học lập trình này, bạn sẽ sử dụng spark-bigquery-connector để đọc và ghi dữ liệu giữa BigQuery và Spark.

5. Tạo dự án

Đăng nhập vào Bảng điều khiển Google Cloud Platform tại console.cloud.google.com rồi tạo một dự án mới:

7e541d932b20c074.png

2deefc9295d114ea.png

a92a49afe05008a.png

Tiếp theo, bạn cần bật tính năng thanh toán trong Cloud Console để sử dụng các tài nguyên của Google Cloud.

Việc thực hiện lớp học lập trình này sẽ không tốn của bạn quá vài đô la, nhưng có thể tốn nhiều hơn nếu bạn quyết định sử dụng nhiều tài nguyên hơn hoặc nếu bạn để các tài nguyên đó chạy. Phần cuối cùng của lớp học lập trình này sẽ hướng dẫn bạn cách dọn dẹp dự án.

Người dùng mới của Google Cloud Platform đủ điều kiện dùng thử miễn phí trị giá 300 USD.

6. Thiết lập môi trường

Giờ đây, bạn sẽ tiến hành thiết lập môi trường bằng cách:

  • Bật Compute Engine, Dataproc và BigQuery Storage API
  • Định cấu hình chế độ cài đặt dự án
  • Tạo một cụm Dataproc
  • Tạo bộ chứa Google Cloud Storage

Bật API và định cấu hình môi trường

Mở Cloud Shell bằng cách nhấn vào nút ở góc trên cùng bên phải của Bảng điều khiển đám mây.

a10c47ee6ca41c54.png

Sau khi Cloud Shell tải, hãy chạy các lệnh sau để bật Compute Engine, Dataproc và BigQuery Storage API:

gcloud services enable compute.googleapis.com \
  dataproc.googleapis.com \
  bigquerystorage.googleapis.com

Đặt mã dự án cho dự án của bạn. Bạn có thể tìm thấy dự án bằng cách chuyển đến trang chọn dự án rồi tìm kiếm dự án của mình. Tên này có thể không giống với tên dự án của bạn.

e682e8227aa3c781.png

76d45fb295728542.png

Chạy lệnh sau để đặt mã dự án:

gcloud config set project <project_id>

Đặt khu vực cho dự án bằng cách chọn một khu vực trong danh sách tại đây. Ví dụ: us-central1.

gcloud config set dataproc/region <region>

Chọn tên cho cụm Dataproc và tạo một biến môi trường cho cụm đó.

CLUSTER_NAME=<cluster_name>

Tạo một Cụm Dataproc

Tạo một cụm Dataproc bằng cách thực thi lệnh sau:

 gcloud beta dataproc clusters create ${CLUSTER_NAME} \
     --worker-machine-type n1-standard-8 \
     --num-workers 8 \
     --image-version 1.5-debian \
     --initialization-actions gs://dataproc-initialization-actions/python/pip-install.sh \
     --metadata 'PIP_PACKAGES=google-cloud-storage' \
     --optional-components=ANACONDA \
     --enable-component-gateway

Lệnh này sẽ mất vài phút để hoàn tất. Để phân tích lệnh này:

Thao tác này sẽ bắt đầu quá trình tạo một cụm Dataproc có tên mà bạn đã cung cấp trước đó. Việc sử dụng API beta sẽ cho phép các tính năng beta của Dataproc, chẳng hạn như Cổng thành phần.

gcloud beta dataproc clusters create ${CLUSTER_NAME}

Thao tác này sẽ đặt loại máy mà nhân viên của bạn sẽ sử dụng.

--worker-machine-type n1-standard-8

Thao tác này sẽ đặt số lượng worker mà cụm của bạn sẽ có.

--num-workers 8

Thao tác này sẽ đặt phiên bản hình ảnh của Dataproc.

--image-version 1.5-debian

Thao tác này sẽ định cấu hình các thao tác khởi chạy sẽ được dùng trên cụm. Ở đây, bạn đang thêm thao tác khởi tạo pip.

--initialization-actions gs://dataproc-initialization-actions/python/pip-install.sh 

Đây là siêu dữ liệu cần thêm vào cụm. Tại đây, bạn đang cung cấp siêu dữ liệu cho thao tác khởi tạo pip.

--metadata 'PIP_PACKAGES=google-cloud-storage'

Thao tác này sẽ đặt Các thành phần không bắt buộc để cài đặt trên cụm.

--optional-components=ANACONDA

Thao tác này sẽ bật cổng thành phần, cho phép bạn sử dụng Cổng thành phần của Dataproc để xem các giao diện người dùng phổ biến như Zeppelin, Jupyter hoặc Spark History

--enable-component-gateway

Để biết thông tin giới thiệu chi tiết hơn về Dataproc, vui lòng xem lớp học lập trình này.

Tạo bộ chứa Google Cloud Storage

Bạn sẽ cần một bộ chứa Google Cloud Storage cho đầu ra của công việc. Xác định một tên riêng biệt cho nhóm của bạn và chạy lệnh sau để tạo một nhóm mới. Tên bộ chứa là duy nhất trên tất cả các dự án của Google Cloud đối với tất cả người dùng, vì vậy, bạn có thể cần thử vài lần với các tên khác nhau. Bạn sẽ tạo thành công một nhóm nếu không nhận được ServiceException.

BUCKET_NAME=<bucket_name>
gsutil mb gs://${BUCKET_NAME}

7. Phân tích dữ liệu khám phá

Trước khi thực hiện quy trình tiền xử lý, bạn nên tìm hiểu thêm về bản chất của dữ liệu mà bạn đang xử lý. Để làm được điều này, bạn sẽ khám phá 2 phương pháp khám phá dữ liệu. Trước tiên, bạn sẽ xem một số dữ liệu thô bằng Giao diện người dùng web BigQuery, sau đó bạn sẽ tính số lượng bài đăng trên mỗi subreddit bằng PySpark và Dataproc.

Sử dụng giao diện người dùng web BigQuery

Bắt đầu bằng cách sử dụng giao diện người dùng BigQuery trên web để xem dữ liệu. Trong biểu tượng trình đơn trên Cloud Console, hãy di chuyển xuống rồi nhấn vào "BigQuery" để mở Giao diện người dùng BigQuery trên web.

242a597d7045b4da.png

Tiếp theo, hãy chạy lệnh sau trong Trình chỉnh sửa truy vấn của giao diện người dùng web BigQuery. Thao tác này sẽ trả về 10 hàng dữ liệu đầy đủ từ tháng 1 năm 2017:

select * from fh-bigquery.reddit_posts.2017_01 limit 10;

b333c72d60ae6eb8.png

Bạn có thể di chuyển trên trang để xem tất cả các cột có sẵn cũng như một số ví dụ. Cụ thể, bạn sẽ thấy 2 cột đại diện cho nội dung dạng văn bản của mỗi bài đăng: "title" (tiêu đề) và "selftext" (nội dung bài đăng). Ngoài ra, hãy lưu ý đến các cột khác như "created_utc" là thời gian utc mà bài đăng được tạo và "subreddit" là subreddit mà bài đăng tồn tại.

Thực thi một lệnh PySpark

Chạy các lệnh sau trong Cloud Shell để sao chép kho lưu trữ bằng mã mẫu và chuyển đến đúng thư mục:

cd
git clone https://github.com/GoogleCloudPlatform/cloud-dataproc

Bạn có thể dùng PySpark để xác định số lượng bài đăng cho mỗi subreddit. Bạn có thể mở Cloud Editor và đọc tập lệnh cloud-dataproc/codelabs/spark-bigquery trước khi thực thi tập lệnh đó ở bước tiếp theo:

5d965c6fb66dbd81.png

797cf71de3449bdb.png

Nhấp vào nút "Mở thiết bị đầu cuối" trong Cloud Editor để chuyển về Cloud Shell và chạy lệnh sau để thực thi công việc PySpark đầu tiên:

cd ~/cloud-dataproc/codelabs/spark-bigquery
gcloud dataproc jobs submit pyspark --cluster ${CLUSTER_NAME} \
    --jars gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar \
    --driver-log-levels root=FATAL \
    counts_by_subreddit.py

Lệnh này cho phép bạn gửi các lệnh đến Dataproc thông qua Jobs API. Tại đây, bạn đang cho biết loại công việc là pyspark. Bạn có thể cung cấp tên cụm, các tham số không bắt buộc và tên của tệp chứa công việc. Ở đây, bạn đang cung cấp tham số --jars cho phép bạn thêm spark-bigquery-connector vào công việc. Bạn cũng có thể đặt cấp độ đầu ra nhật ký bằng cách sử dụng --driver-log-levels root=FATAL. Thao tác này sẽ chặn mọi đầu ra nhật ký, ngoại trừ lỗi. Nhật ký Spark thường khá ồn ào.

Quá trình này sẽ mất vài phút để chạy và đầu ra cuối cùng của bạn sẽ có dạng như sau:

6c185228db47bb18.png

8. Khám phá giao diện người dùng Dataproc và Spark

Khi chạy các công việc Spark trên Dataproc, bạn có quyền truy cập vào 2 giao diện người dùng để kiểm tra trạng thái của các công việc / cụm. Cách đầu tiên là sử dụng giao diện người dùng Dataproc. Bạn có thể tìm thấy giao diện này bằng cách nhấp vào biểu tượng trình đơn rồi di chuyển xuống Dataproc. Tại đây, bạn có thể xem bộ nhớ hiện có, bộ nhớ đang chờ xử lý và số lượng worker.

6f2987346d15c8e2.png

Bạn cũng có thể nhấp vào thẻ công việc để xem các công việc đã hoàn thành. Bạn có thể xem thông tin chi tiết về công việc, chẳng hạn như nhật ký và đầu ra của những công việc đó bằng cách nhấp vào ID công việc của một công việc cụ thể. 114d90129b0e4c88.png

1b2160f0f484594a.png

Bạn cũng có thể xem giao diện người dùng Spark. Trên trang công việc, hãy nhấp vào mũi tên quay lại rồi nhấp vào Giao diện web. Bạn sẽ thấy một số lựa chọn trong cổng thành phần. Bạn có thể bật nhiều thành phần trong số này thông qua Các thành phần không bắt buộc khi thiết lập cụm. Đối với lớp học lập trình này, hãy nhấp vào "Spark History Server.

5da7944327d193dc.png

6a349200289c69c1.png e63b36bdc90ff610.png

Thao tác này sẽ mở cửa sổ sau:

8f6786760f994fe8.png

Tất cả các công việc đã hoàn thành sẽ xuất hiện ở đây và bạn có thể nhấp vào bất kỳ application_id nào để tìm hiểu thêm thông tin về công việc đó. Tương tự, bạn có thể nhấp vào "Show Incomplete Applications" (Hiện các ứng dụng chưa hoàn tất) ở cuối trang đích để xem tất cả các công việc hiện đang chạy.

9. Chạy lệnh Backfill Job

Giờ đây, bạn sẽ chạy một công việc tải dữ liệu vào bộ nhớ, trích xuất thông tin cần thiết và kết xuất đầu ra vào một bộ chứa Google Cloud Storage. Bạn sẽ trích xuất "tiêu đề", "nội dung" (văn bản thô) và "dấu thời gian tạo" cho từng bình luận trên Reddit. Sau đó, bạn sẽ lấy dữ liệu này, chuyển đổi thành tệp csv, nén tệp đó và tải vào một vùng chứa có URI là gs://${BUCKET_NAME}/reddit_posts/YYYY/MM/food.csv.gz.

Bạn có thể tham khảo lại Cloud Editor để đọc cho cloud-dataproc/codelabs/spark-bigquery/backfill.sh. Đây là một tập lệnh bao bọc để thực thi trong cloud-dataproc/codelabs/spark-bigquery/backfill.py.

cd ~/cloud-dataproc/codelabs/spark-bigquery
bash backfill.sh ${CLUSTER_NAME} ${BUCKET_NAME}

Bạn sẽ sớm thấy hàng loạt thông báo hoàn tất công việc. Công việc này có thể mất tối đa 15 phút để hoàn tất. Bạn cũng có thể kiểm tra kỹ bộ chứa lưu trữ để xác minh dữ liệu đầu ra thành công bằng cách sử dụng gsutil. Sau khi tất cả các lệnh đã hoàn tất, hãy chạy lệnh sau:

gsutil ls gs://${BUCKET_NAME}/reddit_posts/*/*/food.csv.gz

Bạn sẽ thấy kết quả sau đây:

a7c3c7b2e82f9fca.png

Xin chúc mừng, bạn đã hoàn tất thành công việc điền dữ liệu cũ cho dữ liệu bình luận trên Reddit! Nếu bạn quan tâm đến cách tạo mô hình dựa trên dữ liệu này, vui lòng tiếp tục tham gia lớp học lập trình Spark-NLP.

10. Dọn dẹp

Để tránh phát sinh các khoản phí không cần thiết cho tài khoản GCP sau khi hoàn tất hướng dẫn bắt đầu nhanh này, hãy làm như sau:

  1. Xoá bộ chứa Cloud Storage cho môi trường mà bạn đã tạo
  2. Xoá môi trường Dataproc.

Nếu chỉ tạo một dự án cho lớp học lập trình này, bạn cũng có thể xoá dự án (không bắt buộc):

  1. Trong Bảng điều khiển của GCP, hãy chuyển đến trang Dự án.
  2. Trong danh sách dự án, hãy chọn dự án bạn muốn xoá rồi nhấp vào Xoá.
  3. Trong hộp này, hãy nhập mã dự án rồi nhấp vào Tắt để xoá dự án.

Giấy phép

Tác phẩm này được cấp phép theo giấy phép Ghi công theo Creative Commons 3.0 và giấy phép Apache 2.0.