Xử lý trước dữ liệu BigQuery bằng PySpark trên Dataproc

1. Tổng quan

Lớp học lập trình này sẽ trình bày cách tạo quy trình xử lý dữ liệu bằng Apache Spark với Dataproc trên Google Cloud Platform. Đây là trường hợp sử dụng phổ biến trong khoa học dữ liệu và kỹ thuật dữ liệu để đọc dữ liệu từ một vị trí lưu trữ, thực hiện các phép biến đổi trên dữ liệu đó và ghi dữ liệu vào một vị trí lưu trữ khác. Các phép biến đổi phổ biến bao gồm thay đổi nội dung của dữ liệu, loại bỏ thông tin không cần thiết và thay đổi loại tệp.

Trong lớp học lập trình này, bạn sẽ tìm hiểu về Apache Spark, chạy một quy trình mẫu bằng Dataproc với PySpark (API Python của Apache Spark), BigQuery, Google Cloud Storage và dữ liệu từ Reddit.

2. Giới thiệu về Apache Spark (Không bắt buộc)

Theo trang web này, "Apache Spark là một công cụ phân tích hợp nhất để xử lý dữ liệu trên quy mô lớn". Công cụ này cho phép bạn phân tích và xử lý dữ liệu song song và trong bộ nhớ, cho phép tính toán song song trên nhiều máy và nút khác nhau. Phiên bản này được phát hành lần đầu vào năm 2014 dưới dạng bản nâng cấp cho MapReduce truyền thống và vẫn là một trong những khung phổ biến nhất để thực hiện các phép tính trên quy mô lớn. Apache Spark được viết bằng Scala và sau đó có các API trong Scala, Java, Python và R. Thư viện này chứa rất nhiều thư viện như Spark SQL để thực hiện các truy vấn SQL trên dữ liệu, Spark Streaming để truyền trực tuyến dữ liệu, MLlib để học máy và GraphX để xử lý biểu đồ, tất cả đều chạy trên công cụ Apache Spark.

32add0b6a47bafbc.png

Spark có thể tự chạy hoặc tận dụng một dịch vụ quản lý tài nguyên như Yarn, Mesos hoặc Kubernetes để mở rộng quy mô. Bạn sẽ sử dụng Dataproc cho lớp học lập trình này. Dataproc sử dụng Yarn.

Dữ liệu trong Spark ban đầu được tải vào bộ nhớ vào một vùng gọi là RDD (tập dữ liệu phân tán có khả năng phục hồi). Kể từ đó, quá trình phát triển trên Spark đã thêm hai loại dữ liệu mới, kiểu cột: Tập dữ liệu (được nhập) và Khung dữ liệu (không được nhập). Nói một cách đơn giản, RDD rất phù hợp với mọi loại dữ liệu, trong khi Tập dữ liệu và Khung dữ liệu được tối ưu hoá cho dữ liệu dạng bảng. Vì Tập dữ liệu chỉ có sẵn với API Java và Scala, nên chúng ta sẽ tiếp tục sử dụng API Dataframe PySpark cho lớp học lập trình này. Để biết thêm thông tin, vui lòng tham khảo tài liệu về Apache Spark.

3. Trường hợp sử dụng

Kỹ sư dữ liệu thường cần nhà khoa học dữ liệu dễ dàng truy cập vào dữ liệu. Tuy nhiên, dữ liệu ban đầu thường không sạch (khó sử dụng cho mục đích phân tích ở trạng thái hiện tại) và cần được làm sạch trước khi có thể sử dụng được. Ví dụ: dữ liệu được thu thập từ web có thể chứa các mã hoá lạ hoặc thẻ HTML không liên quan.

Trong lớp học này, bạn sẽ tải một tập dữ liệu từ BigQuery ở dạng bài đăng trên Reddit vào một cụm Spark được lưu trữ trên Dataproc, trích xuất thông tin hữu ích và lưu trữ dữ liệu đã xử lý dưới dạng tệp CSV nén trong Google Cloud Storage.

be2a4551ece63bfc.png

Nhà khoa học dữ liệu chính tại công ty của bạn muốn các nhóm của họ giải quyết nhiều vấn đề về xử lý ngôn ngữ tự nhiên. Cụ thể, họ muốn phân tích dữ liệu trong subreddit "r/food". Bạn sẽ tạo một quy trình để kết xuất dữ liệu bắt đầu bằng dữ liệu bổ sung từ tháng 1 năm 2017 đến tháng 8 năm 2019.

4. Truy cập vào BigQuery thông qua BigQuery Storage API

Việc lấy dữ liệu từ BigQuery bằng phương thức API tabledata.list có thể tốn nhiều thời gian và không hiệu quả khi lượng dữ liệu tăng lên. Phương thức này trả về danh sách đối tượng JSON và yêu cầu đọc tuần tự từng trang một để đọc toàn bộ tập dữ liệu.

BigQuery Storage API mang lại những điểm cải tiến đáng kể cho việc truy cập dữ liệu trong BigQuery bằng cách sử dụng giao thức dựa trên RPC. Thư viện này hỗ trợ đọc và ghi dữ liệu song song cũng như nhiều định dạng chuyển đổi tuần tự, chẳng hạn như Apache AvroApache Arrow. Nhìn chung, điều này giúp cải thiện đáng kể hiệu suất, đặc biệt là trên các tập dữ liệu lớn hơn.

Trong lớp học lập trình này, bạn sẽ sử dụng spark-bigquery-connector để đọc và ghi dữ liệu giữa BigQuery và Spark.

5. Tạo dự án

Đăng nhập vào bảng điều khiển Google Cloud Platform tại console.cloud.google.com và tạo một dự án mới:

7e541d932b20c074.png

2deefc9295d114ea.png

a92a49afe05008a.png

Tiếp theo, bạn cần bật tính năng thanh toán trong Cloud Console để sử dụng các tài nguyên của Google Cloud.

Việc tham gia lớp học lập trình này sẽ không tốn quá vài đô la, nhưng có thể tốn nhiều hơn nếu bạn quyết định sử dụng nhiều tài nguyên hơn hoặc nếu bạn để các tài nguyên đó chạy. Phần cuối cùng của lớp học lập trình này sẽ hướng dẫn bạn cách dọn dẹp dự án.

Người dùng mới của Google Cloud Platform đủ điều kiện dùng thử miễn phí 300 USD.

6. Thiết lập môi trường

Bây giờ, bạn sẽ thiết lập môi trường của mình bằng cách:

  • Bật Compute Engine, Dataproc và BigQuery Storage API
  • Định cấu hình chế độ cài đặt dự án
  • Tạo cụm Dataproc
  • Tạo bộ chứa Google Cloud Storage

Bật API và định cấu hình môi trường

Mở Cloud Shell bằng cách nhấn vào nút ở góc trên cùng bên phải của Cloud Console.

a10c47ee6ca41c54.png

Sau khi Cloud Shell tải, hãy chạy các lệnh sau để bật Compute Engine, Dataproc và BigQuery Storage API:

gcloud services enable compute.googleapis.com \
  dataproc.googleapis.com \
  bigquerystorage.googleapis.com

Đặt project id (mã dự án) của dự án. Bạn có thể tìm thấy dự án đó bằng cách chuyển đến trang lựa chọn dự án và tìm dự án của mình. Tên này có thể không giống với tên dự án của bạn.

e682e8227aa3c781.png

76d45fb295728542.png

Chạy lệnh sau để đặt mã dự án:

gcloud config set project <project_id>

Đặt khu vực của dự án bằng cách chọn một khu vực trong danh sách tại đây. Ví dụ: us-central1.

gcloud config set dataproc/region <region>

Chọn tên cho cụm Dataproc và tạo biến môi trường cho cụm đó.

CLUSTER_NAME=<cluster_name>

Tạo cụm Dataproc

Tạo cụm Dataproc bằng cách thực thi lệnh sau:

 gcloud beta dataproc clusters create ${CLUSTER_NAME} \
     --worker-machine-type n1-standard-8 \
     --num-workers 8 \
     --image-version 1.5-debian \
     --initialization-actions gs://dataproc-initialization-actions/python/pip-install.sh \
     --metadata 'PIP_PACKAGES=google-cloud-storage' \
     --optional-components=ANACONDA \
     --enable-component-gateway

Quá trình thực thi lệnh này sẽ mất vài phút. Cách phân tích lệnh:

Thao tác này sẽ bắt đầu tạo một cụm Dataproc có tên mà bạn đã cung cấp trước đó. Việc sử dụng API beta sẽ bật các tính năng beta của Dataproc, chẳng hạn như Cổng thành phần.

gcloud beta dataproc clusters create ${CLUSTER_NAME}

Thao tác này sẽ đặt loại máy để sử dụng cho worker.

--worker-machine-type n1-standard-8

Thao tác này sẽ thiết lập số lượng worker cho cụm của bạn.

--num-workers 8

Thao tác này sẽ thiết lập phiên bản hình ảnh của Dataproc.

--image-version 1.5-debian

Thao tác này sẽ định cấu hình các thao tác khởi chạy để sử dụng trên cụm. Tại đây, bạn sẽ thêm thao tác khởi chạy pip.

--initialization-actions gs://dataproc-initialization-actions/python/pip-install.sh 

Đây là siêu dữ liệu cần đưa vào cụm. Tại đây, bạn đang cung cấp siêu dữ liệu cho thao tác khởi chạy pip.

--metadata 'PIP_PACKAGES=google-cloud-storage'

Thao tác này sẽ thiết lập Thành phần không bắt buộc để cài đặt trên cụm.

--optional-components=ANACONDA

Thao tác này sẽ bật cổng thành phần cho phép bạn sử dụng Cổng thành phần của Dataproc để xem các giao diện người dùng phổ biến như Zeppelin, Jupyter hoặc Nhật ký Spark

--enable-component-gateway

Để biết thêm thông tin giới thiệu chuyên sâu về Dataproc, vui lòng xem codelab này.

Tạo bộ chứa Google Cloud Storage

Bạn cần có một bộ chứa Google Cloud Storage để lưu kết quả công việc. Xác định tên riêng biệt cho bộ chứa và chạy lệnh sau để tạo bộ chứa mới. Tên bộ chứa phải là duy nhất trên tất cả các dự án Google Cloud cho tất cả người dùng. Vì vậy, bạn có thể phải thử một vài lần với các tên khác nhau. Bạn sẽ tạo thành công một bộ chứa nếu không nhận được ServiceException.

BUCKET_NAME=<bucket_name>
gsutil mb gs://${BUCKET_NAME}

7. Phân tích dữ liệu khám phá

Trước khi thực hiện quá trình xử lý trước, bạn nên tìm hiểu thêm về bản chất của dữ liệu mà bạn đang xử lý. Để làm việc này, bạn sẽ khám phá hai phương pháp khám phá dữ liệu. Trước tiên, bạn sẽ xem một số dữ liệu thô bằng giao diện người dùng BigQuery trên web, sau đó tính số lượng bài đăng trên mỗi subreddit bằng PySpark và Dataproc.

Sử dụng giao diện người dùng web của BigQuery

Bắt đầu bằng cách sử dụng Giao diện người dùng BigQuery trên web để xem dữ liệu. Trên biểu tượng trình đơn trong Cloud Console, hãy di chuyển xuống rồi nhấn vào "BigQuery" để mở giao diện người dùng BigQuery trên web.

242a597d7045b4da.png

Tiếp theo, hãy chạy lệnh sau trong Trình chỉnh sửa truy vấn trên giao diện người dùng web của BigQuery. Thao tác này sẽ trả về 10 hàng đầy đủ dữ liệu từ tháng 1 năm 2017:

select * from fh-bigquery.reddit_posts.2017_01 limit 10;

b333c72d60ae6eb8.png

Bạn có thể di chuyển trên trang để xem tất cả các cột có sẵn cũng như một số ví dụ. Cụ thể, bạn sẽ thấy hai cột đại diện cho nội dung văn bản của mỗi bài đăng: "title" (tiêu đề) và "selftext" (nội dung chính), trong đó cột sau là nội dung chính của bài đăng. Ngoài ra, hãy lưu ý các cột khác như "created_utc" (giờ tạo theo giờ UTC) là thời gian tạo bài đăng và "subreddit" (nhóm nhỏ) là nhóm nhỏ chứa bài đăng.

Thực thi công việc PySpark

Chạy các lệnh sau trong Cloud Shell để sao chép kho lưu trữ bằng mã mẫu và cd vào thư mục chính xác:

cd
git clone https://github.com/GoogleCloudPlatform/cloud-dataproc

Bạn có thể sử dụng PySpark để xác định số lượng bài đăng hiện có cho mỗi subreddit. Bạn có thể mở Cloud Editor và đọc tập lệnh cloud-dataproc/codelabs/spark-bigquery trước khi thực thi tập lệnh đó ở bước tiếp theo:

5d965c6fb66dbd81.png

797cf71de3449bdb.png

Nhấp vào nút "Mở dòng lệnh" trong Trình chỉnh sửa trên đám mây để chuyển về Cloud Shell và chạy lệnh sau để thực thi công việc PySpark đầu tiên:

cd ~/cloud-dataproc/codelabs/spark-bigquery
gcloud dataproc jobs submit pyspark --cluster ${CLUSTER_NAME} \
    --jars gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar \
    --driver-log-levels root=FATAL \
    counts_by_subreddit.py

Lệnh này cho phép bạn gửi công việc đến Dataproc thông qua API Công việc. Tại đây, bạn đang chỉ định loại công việc là pyspark. Bạn có thể cung cấp tên cụm, các tham số không bắt buộc và tên của tệp chứa công việc. Tại đây, bạn đang cung cấp tham số --jars cho phép bạn đưa spark-bigquery-connector vào công việc của mình. Bạn cũng có thể đặt cấp độ đầu ra nhật ký bằng --driver-log-levels root=FATAL. Thao tác này sẽ ngăn tất cả đầu ra nhật ký ngoại trừ lỗi. Nhật ký Spark thường khá lộn xộn.

Quá trình này sẽ mất vài phút để chạy và kết quả cuối cùng sẽ có dạng như sau:

6c185228db47bb18.png

8. Khám phá giao diện người dùng Dataproc và Spark

Khi chạy công việc Spark trên Dataproc, bạn có quyền truy cập vào hai giao diện người dùng để kiểm tra trạng thái của công việc / kluster. Giao diện người dùng Dataproc là giao diện đầu tiên. Bạn có thể tìm thấy giao diện này bằng cách nhấp vào biểu tượng trình đơn rồi di chuyển xuống Dataproc. Tại đây, bạn có thể xem bộ nhớ hiện có cũng như bộ nhớ đang chờ xử lý và số lượng worker.

6f2987346d15c8e2.png

Bạn cũng có thể nhấp vào thẻ công việc để xem các công việc đã hoàn thành. Bạn có thể xem thông tin chi tiết về công việc, chẳng hạn như nhật ký và đầu ra của các công việc đó bằng cách nhấp vào Mã công việc của một công việc cụ thể. 114d90129b0e4c88.png

1b2160f0f484594a.png

Bạn cũng có thể xem giao diện người dùng Spark. Trên trang công việc, hãy nhấp vào mũi tên quay lại, rồi nhấp vào Giao diện web. Bạn sẽ thấy một số tuỳ chọn trong cổng thành phần. Bạn có thể bật nhiều thành phần trong số này thông qua Thành phần không bắt buộc khi thiết lập cụm. Đối với lớp học lập trình này, hãy nhấp vào "Máy chủ nhật ký Spark".

5da7944327d193dc.png

6a349200289c69c1.png e63b36bdc90ff610.png

Thao tác này sẽ mở ra cửa sổ sau:

8f6786760f994fe8.png

Tất cả công việc đã hoàn thành sẽ xuất hiện ở đây và bạn có thể nhấp vào bất kỳ application_id nào để tìm hiểu thêm thông tin về công việc đó. Tương tự, bạn có thể nhấp vào "Hiển thị ứng dụng chưa hoàn tất" ở cuối trang đích để xem tất cả công việc đang chạy.

9. Chạy công việc Điền sẵn

Bây giờ, bạn sẽ chạy một công việc tải dữ liệu vào bộ nhớ, trích xuất thông tin cần thiết và kết xuất đầu ra vào một bộ chứa Google Cloud Storage. Bạn sẽ trích xuất "title" (tiêu đề), "body" (nội dung) (văn bản thô) và "timestamp created" (dấu thời gian tạo) cho mỗi bình luận trên Reddit. Sau đó, bạn sẽ lấy dữ liệu này, chuyển đổi thành csv, nén và tải dữ liệu đó vào một bộ chứa có URI là gs://${BUCKET_NAME}/reddit_posts/YYYY/MM/food.csv.gz.

Bạn có thể tham khảo lại Trình chỉnh sửa trên đám mây để đọc cho cloud-dataproc/codelabs/spark-bigquery/backfill.sh. Đây là một tập lệnh trình bao bọc để thực thi trong cloud-dataproc/codelabs/spark-bigquery/backfill.py.

cd ~/cloud-dataproc/codelabs/spark-bigquery
bash backfill.sh ${CLUSTER_NAME} ${BUCKET_NAME}

Bạn sẽ sớm thấy một loạt thông báo hoàn tất công việc. Quá trình này có thể mất tối đa 15 phút để hoàn tất. Bạn cũng có thể kiểm tra kỹ bộ chứa bộ nhớ để xác minh dữ liệu đầu ra thành công bằng cách sử dụng gsutil. Sau khi tất cả công việc hoàn tất, hãy chạy lệnh sau:

gsutil ls gs://${BUCKET_NAME}/reddit_posts/*/*/food.csv.gz

Bạn sẽ thấy kết quả sau đây:

a7c3c7b2e82f9fca.png

Xin chúc mừng! Bạn đã hoàn tất việc điền lại dữ liệu bình luận trên Reddit! Nếu bạn quan tâm đến cách tạo mô hình dựa trên dữ liệu này, vui lòng tiếp tục tham gia lớp học lập trình Spark-NLP.

10. Dọn dẹp

Để tránh bị tính phí không cần thiết cho tài khoản GCP sau khi hoàn tất hướng dẫn nhanh này, hãy làm như sau:

  1. Xoá bộ chứa trên Cloud Storage cho môi trường và bộ chứa mà bạn đã tạo
  2. Xoá môi trường Dataproc.

Nếu chỉ tạo một dự án cho lớp học lập trình này, bạn cũng có thể xoá dự án đó nếu muốn:

  1. Trong Bảng điều khiển GCP, hãy chuyển đến trang Projects (Dự án).
  2. Trong danh sách dự án, hãy chọn dự án bạn muốn xoá rồi nhấp vào Xoá.
  3. Trong hộp, hãy nhập mã dự án rồi nhấp vào Tắt để xoá dự án.

Giấy phép

Tác phẩm này được cấp phép theo Giấy phép chung Ghi công theo Creative Commons 3.0 và giấy phép Apache 2.0.