Xử lý trước dữ liệu BigQuery bằng PySpark trên Dataproc

1. Tổng quan

Lớp học lập trình này sẽ trình bày cách tạo quy trình xử lý dữ liệu bằng Apache Spark với Dataproc trên Google Cloud Platform. Đây là một trường hợp sử dụng phổ biến trong khoa học dữ liệu và kỹ thuật dữ liệu để đọc dữ liệu từ một vị trí lưu trữ, thực hiện các phép biến đổi trên dữ liệu đó và ghi vào một vị trí lưu trữ khác. Các phép biến đổi phổ biến bao gồm thay đổi nội dung của dữ liệu, loại bỏ thông tin không cần thiết và thay đổi loại tệp.

Trong lớp học lập trình này, bạn sẽ tìm hiểu về Apache Spark, chạy một quy trình mẫu bằng Dataproc với PySpark (API Python của Apache Spark), BigQuery, Google Cloud Storage và dữ liệu từ Reddit.

2. Giới thiệu về Apache Spark (Không bắt buộc)

Theo trang web, "Apache Spark là một công cụ phân tích hợp nhất để xử lý dữ liệu quy mô lớn." Công cụ này cho phép bạn phân tích và xử lý dữ liệu song song và trong bộ nhớ, cho phép tính toán song song trên nhiều máy và nút khác nhau. MapReduce Apache Spark được viết bằng Scala và sau đó có các API bằng Scala, Java, Python và R. Công cụ này chứa vô số thư viện như Spark SQL để thực hiện các truy vấn SQL trên dữ liệu, Spark Streaming để truyền trực tuyến dữ liệu, MLlib cho hoạt động học máy và GraphX để xử lý đồ thị, tất cả đều chạy trên công cụ Apache Spark.

32add0b6a47bafbc.png

Spark có thể chạy độc lập hoặc có thể tận dụng một dịch vụ quản lý tài nguyên như Yarn, Mesos hoặc Kubernetes để mở rộng quy mô. Bạn sẽ sử dụng Dataproc cho lớp học lập trình này, sử dụng Yarn.

Dữ liệu trong Spark ban đầu được tải vào bộ nhớ trong cái gọi là RDD hoặc tập dữ liệu phân tán có khả năng phục hồi. Kể từ đó, quá trình phát triển trên Spark đã bao gồm việc bổ sung hai loại dữ liệu mới theo kiểu cột: Tập dữ liệu (được nhập) và Dataframe (không được nhập). Nói một cách đơn giản, RDD rất phù hợp với mọi loại dữ liệu, trong khi Tập dữ liệu và Dataframe được tối ưu hoá cho dữ liệu dạng bảng. Vì Tập dữ liệu chỉ có sẵn với các API Java và Scala, nên chúng ta sẽ tiếp tục sử dụng API Dataframe PySpark cho lớp học lập trình này. Để biết thêm thông tin, vui lòng tham khảo tài liệu về Apache Spark documentation.

3. Trường hợp sử dụng

Kỹ sư dữ liệu thường cần dữ liệu để nhà khoa học dữ liệu có thể dễ dàng truy cập. Tuy nhiên, dữ liệu thường ban đầu bị lỗi (khó sử dụng cho số liệu phân tích ở trạng thái hiện tại) và cần được làm sạch trước khi có thể sử dụng. Ví dụ về trường hợp này là dữ liệu đã được thu thập từ web có thể chứa các mã hoá kỳ lạ hoặc thẻ HTML không cần thiết.

Trong phòng thí nghiệm này, bạn sẽ tải một tập hợp dữ liệu từ BigQuery ở dạng bài đăng trên Reddit vào một cụm Spark được lưu trữ trên Dataproc, trích xuất thông tin hữu ích và lưu trữ dữ liệu đã xử lý dưới dạng tệp CSV được nén trong Google Cloud Storage.

be2a4551ece63bfc.png

Nhà khoa học dữ liệu trưởng tại công ty của bạn quan tâm đến việc các nhóm của họ giải quyết các vấn đề khác nhau về xử lý ngôn ngữ tự nhiên. Cụ thể, họ quan tâm đến việc phân tích dữ liệu trong subreddit "r/food". Bạn sẽ tạo một pipeline cho một kết xuất dữ liệu bắt đầu bằng một quy trình bổ sung dữ liệu cũ từ tháng 1 năm 2017 đến tháng 8 năm 2019.

4. Truy cập vào BigQuery thông qua BigQuery Storage API

Việc kéo dữ liệu từ BigQuery bằng phương thức API tabledata.list có thể tốn thời gian và không hiệu quả khi lượng dữ liệu tăng lên. Phương thức này trả về một danh sách các đối tượng JSON và yêu cầu đọc tuần tự từng trang một để đọc toàn bộ tập dữ liệu.

BigQuery Storage API mang đến những cải tiến đáng kể cho việc truy cập dữ liệu trong BigQuery bằng cách sử dụng giao thức dựa trên RPC. API này hỗ trợ đọc và ghi dữ liệu song song cũng như các định dạng chuyển đổi tuần tự khác nhau như Apache AvroApache Arrow. Ở cấp độ cao, điều này giúp cải thiện đáng kể hiệu suất, đặc biệt là trên các tập dữ liệu lớn hơn.

Trong lớp học lập trình này, bạn sẽ sử dụng spark-bigquery-connector để đọc và ghi dữ liệu giữa BigQuery và Spark.

5. Tạo dự án

Đăng nhập vào Bảng điều khiển Google Cloud Platform tại console.cloud.google.com rồi tạo một dự án mới:

7e541d932b20c074.png

2deefc9295d114ea.png

a92a49afe05008a.png

Tiếp theo, bạn cần bật tính năng thanh toán trong Cloud Console để sử dụng các tài nguyên của Google Cloud.

Việc chạy lớp học lập trình này sẽ không tốn quá vài đô la, nhưng có thể tốn nhiều hơn nếu bạn quyết định sử dụng nhiều tài nguyên hơn hoặc nếu bạn để các tài nguyên đó chạy. Phần cuối cùng của lớp học lập trình này sẽ hướng dẫn bạn cách dọn dẹp dự án.

Người dùng mới của Google Cloud Platform đủ điều kiện dùng thử miễn phí trị giá 300 USD.

6. Thiết lập môi trường

Bây giờ, bạn sẽ tiến hành thiết lập môi trường bằng cách:

  • Bật API Compute Engine, Dataproc và BigQuery Storage
  • Định cấu hình chế độ cài đặt dự án
  • Tạo một cụm Dataproc
  • Tạo một bộ chứa Google Cloud Storage

Bật API và định cấu hình môi trường

Mở Cloud Shell bằng cách nhấn vào nút ở góc trên bên phải của Cloud Console.

a10c47ee6ca41c54.png

Sau khi Cloud Shell tải, hãy chạy các lệnh sau để bật API Compute Engine, Dataproc và BigQuery Storage:

gcloud services enable compute.googleapis.com \
  dataproc.googleapis.com \
  bigquerystorage.googleapis.com

Đặt mã dự án của dự án. Bạn có thể tìm thấy mã này bằng cách chuyển đến trang chọn dự án và tìm kiếm dự án của mình. Mã này có thể không giống với tên dự án.

e682e8227aa3c781.png

76d45fb295728542.png

Chạy lệnh sau để đặt mã dự án:

gcloud config set project <project_id>

Đặt khu vực của dự án bằng cách chọn một khu vực trong danh sách tại đây. Ví dụ: us-central1.

gcloud config set dataproc/region <region>

Chọn tên cho cụm Dataproc và tạo một biến môi trường cho cụm đó.

CLUSTER_NAME=<cluster_name>

Tạo một cụm Dataproc

Tạo một cụm Dataproc bằng cách thực thi lệnh sau:

 gcloud beta dataproc clusters create ${CLUSTER_NAME} \
     --worker-machine-type n1-standard-8 \
     --num-workers 8 \
     --image-version 1.5-debian \
     --initialization-actions gs://dataproc-initialization-actions/python/pip-install.sh \
     --metadata 'PIP_PACKAGES=google-cloud-storage' \
     --optional-components=ANACONDA \
     --enable-component-gateway

Lệnh này sẽ mất vài phút để hoàn tất. Để phân tích lệnh:

Lệnh này sẽ bắt đầu quá trình tạo một cụm Dataproc có tên mà bạn đã cung cấp trước đó. Việc sử dụng API beta sẽ bật các tính năng beta của Dataproc, chẳng hạn như Cổng thành phần.

gcloud beta dataproc clusters create ${CLUSTER_NAME}

Lệnh này sẽ đặt loại máy để sử dụng cho các trình chạy.

--worker-machine-type n1-standard-8

Lệnh này sẽ đặt số lượng trình chạy mà cụm của bạn sẽ có.

--num-workers 8

Lệnh này sẽ đặt phiên bản hình ảnh của Dataproc.

--image-version 1.5-debian

Lệnh này sẽ định cấu hình các hành động khởi chạy để sử dụng trên cụm. Tại đây, bạn đang thêm hành động khởi chạy pip.

--initialization-actions gs://dataproc-initialization-actions/python/pip-install.sh 

Đây là siêu dữ liệu để thêm vào cụm. Tại đây, bạn đang cung cấp siêu dữ liệu cho hành động khởi chạy pip.

--metadata 'PIP_PACKAGES=google-cloud-storage'

Lệnh này sẽ đặt các Thành phần không bắt buộc để cài đặt trên cụm.

--optional-components=ANACONDA

Lệnh này sẽ bật cổng thành phần cho phép bạn sử dụng Cổng thành phần của Dataproc để xem các giao diện người dùng phổ biến như Zeppelin, Jupyter hoặc Lịch sử Spark

--enable-component-gateway

Để biết thông tin giới thiệu chi tiết hơn về Dataproc, vui lòng xem lớp học lập trình này.

Tạo một bộ chứa Google Cloud Storage

Bạn sẽ cần một bộ chứa Google Cloud Storage cho đầu ra công việc. Xác định một tên riêng biệt cho bộ chứa của bạn và chạy lệnh sau để tạo một bộ chứa mới. Tên bộ chứa là duy nhất trên tất cả các dự án của Google Cloud cho tất cả người dùng, vì vậy, bạn có thể cần thử vài lần với các tên khác nhau. Bộ chứa được tạo thành công nếu bạn không nhận được ServiceException.

BUCKET_NAME=<bucket_name>
gsutil mb gs://${BUCKET_NAME}

7. Phân tích dữ liệu thăm dò

Trước khi thực hiện quá trình xử lý trước, bạn nên tìm hiểu thêm về bản chất của dữ liệu mà bạn đang xử lý. Để thực hiện việc này, bạn sẽ khám phá hai phương thức khám phá dữ liệu. Đầu tiên, bạn sẽ xem một số dữ liệu thô bằng Giao diện người dùng web BigQuery, sau đó bạn sẽ tính toán số lượng bài đăng trên mỗi subreddit bằng PySpark và Dataproc.

Sử dụng Giao diện người dùng web BigQuery

Bắt đầu bằng cách sử dụng Giao diện người dùng web BigQuery để xem dữ liệu. Trên biểu tượng trình đơn trong Cloud Console, hãy di chuyển xuống và nhấn vào "BigQuery" để mở Giao diện người dùng web BigQuery.

242a597d7045b4da.png

Tiếp theo, hãy chạy lệnh sau trong Trình chỉnh sửa truy vấn của Giao diện người dùng web BigQuery. Lệnh này sẽ trả về 10 hàng đầy đủ của dữ liệu từ tháng 1 năm 2017:

select * from fh-bigquery.reddit_posts.2017_01 limit 10;

b333c72d60ae6eb8.png

Bạn có thể di chuyển trên trang để xem tất cả các cột có sẵn cũng như một số ví dụ. Cụ thể, bạn sẽ thấy hai cột đại diện cho nội dung văn bản của mỗi bài đăng: "title" (tiêu đề) và "selftext" (nội dung tự viết), cột sau là nội dung của bài đăng. Ngoài ra, hãy lưu ý các cột khác như "created_utc" là thời gian utc mà bài đăng được tạo và "subreddit" là subreddit mà bài đăng tồn tại.

Thực thi một công việc PySpark

Chạy các lệnh sau trong Cloud Shell để sao chép kho lưu trữ bằng mã mẫu và cd vào thư mục chính xác:

cd
git clone https://github.com/GoogleCloudPlatform/cloud-dataproc

Bạn có thể sử dụng PySpark để xác định số lượng bài đăng tồn tại cho mỗi subreddit. Bạn có thể mở Cloud Editor và đọc tập lệnh cloud-dataproc/codelabs/spark-bigquery trước khi thực thi tập lệnh đó trong bước tiếp theo:

5d965c6fb66dbd81.png

797cf71de3449bdb.png

Nhấp vào nút "Open Terminal" (Mở cửa sổ dòng lệnh) trong Cloud Editor để chuyển lại Cloud Shell và chạy lệnh sau để thực thi công việc PySpark đầu tiên:

cd ~/cloud-dataproc/codelabs/spark-bigquery
gcloud dataproc jobs submit pyspark --cluster ${CLUSTER_NAME} \
    --jars gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar \
    --driver-log-levels root=FATAL \
    counts_by_subreddit.py

Lệnh này cho phép bạn gửi công việc đến Dataproc thông qua Jobs API. Tại đây, bạn đang chỉ định loại công việc là pyspark. Bạn có thể cung cấp tên cụm, các tham số không bắt buộc và tên của tệp chứa công việc. Tại đây, bạn đang cung cấp tham số --jars cho phép bạn thêm spark-bigquery-connector vào công việc của mình. Bạn cũng có thể đặt các cấp đầu ra nhật ký bằng cách sử dụng --driver-log-levels root=FATAL để chặn tất cả đầu ra nhật ký ngoại trừ lỗi. Nhật ký Spark có xu hướng khá ồn.

Quá trình này sẽ mất vài phút để chạy và đầu ra cuối cùng sẽ có dạng như sau:

6c185228db47bb18.png

8. Khám phá giao diện người dùng Dataproc và Spark

Khi chạy các công việc Spark trên Dataproc, bạn có quyền truy cập vào hai giao diện người dùng để kiểm tra trạng thái của các công việc / cụm. Giao diện người dùng đầu tiên là giao diện người dùng Dataproc. Bạn có thể tìm thấy giao diện này bằng cách nhấp vào biểu tượng trình đơn và di chuyển xuống Dataproc. Tại đây, bạn có thể thấy bộ nhớ hiện có cũng như bộ nhớ đang chờ xử lý và số lượng trình chạy.

6f2987346d15c8e2.png

Bạn cũng có thể nhấp vào thẻ công việc để xem các công việc đã hoàn tất. Bạn có thể xem thông tin chi tiết về công việc như nhật ký và đầu ra của các công việc đó bằng cách nhấp vào ID công việc cho một công việc cụ thể. 114d90129b0e4c88.png

1b2160f0f484594a.png

Bạn cũng có thể xem giao diện người dùng Spark. Trên trang công việc, hãy nhấp vào mũi tên quay lại rồi nhấp vào Giao diện web. Bạn sẽ thấy một số lựa chọn trong cổng thành phần. Bạn có thể bật nhiều lựa chọn trong số này thông qua Thành phần không bắt buộc khi thiết lập cụm. Đối với phòng thí nghiệm này, hãy nhấp vào "Spark History Server".

5da7944327d193dc.png

6a349200289c69c1.png e63b36bdc90ff610.png

Thao tác này sẽ mở cửa sổ sau:

8f6786760f994fe8.png

Tất cả các công việc đã hoàn tất sẽ xuất hiện tại đây và bạn có thể nhấp vào bất kỳ application_id nào để tìm hiểu thêm thông tin về công việc đó. Tương tự, bạn có thể nhấp vào "Show Incomplete Applications" (Hiển thị các ứng dụng chưa hoàn tất) ở cuối trang đích để xem tất cả các công việc hiện đang chạy.

9. Chạy công việc điền ngược

Bây giờ, bạn sẽ chạy một công việc tải dữ liệu vào bộ nhớ, trích xuất thông tin cần thiết và kết xuất đầu ra vào một bộ chứa Google Cloud Storage. Bạn sẽ trích xuất "title" (tiêu đề), "body" (nội dung thô) và "timestamp created" (dấu thời gian được tạo) cho mỗi bình luận trên Reddit. Sau đó, bạn sẽ lấy dữ liệu này, chuyển đổi thành tệp csv, nén tệp đó và tải vào một bộ chứa có URI là gs://${BUCKET_NAME}/reddit_posts/YYYY/MM/food.csv.gz.

Bạn có thể tham khảo lại Cloud Editor để đọc cho cloud-dataproc/codelabs/spark-bigquery/backfill.sh, đây là một tập lệnh trình bao bọc để thực thi trong cloud-dataproc/codelabs/spark-bigquery/backfill.py.

cd ~/cloud-dataproc/codelabs/spark-bigquery
bash backfill.sh ${CLUSTER_NAME} ${BUCKET_NAME}

Bạn sẽ sớm thấy một loạt thông báo hoàn tất công việc. Công việc có thể mất đến 15 phút để hoàn tất. Bạn cũng có thể kiểm tra kỹ bộ chứa lưu trữ để xác minh đầu ra dữ liệu thành công bằng cách sử dụng gsutil. Sau khi tất cả các công việc hoàn tất, hãy chạy lệnh sau:

gsutil ls gs://${BUCKET_NAME}/reddit_posts/*/*/food.csv.gz

Bạn sẽ thấy kết quả sau đây:

a7c3c7b2e82f9fca.png

Xin chúc mừng, bạn đã hoàn tất thành công quá trình điền ngược cho dữ liệu bình luận trên Reddit! Nếu bạn quan tâm đến cách xây dựng mô hình dựa trên dữ liệu này, vui lòng tiếp tục xem lớp học lập trình Spark-NLP.

10. Dọn dẹp

Để tránh phát sinh các khoản phí không cần thiết cho tài khoản GCP sau khi hoàn tất hướng dẫn bắt đầu nhanh này:

  1. Xoá bộ chứa Cloud Storage cho môi trường và bộ chứa mà bạn đã tạo
  2. Xoá môi trường Dataproc.

Nếu bạn chỉ tạo một dự án cho lớp học lập trình này, bạn cũng có thể xoá dự án đó (không bắt buộc):

  1. Trong Bảng điều khiển GCP, hãy chuyển đến trang Projects (Dự án).
  2. Trong danh sách dự án, hãy chọn dự án mà bạn muốn xoá rồi nhấp vào Delete (Xoá).
  3. Trong hộp, hãy nhập mã dự án, sau đó nhấp vào Shut down (Tắt) để xoá dự án.

Giấy phép

Tác phẩm này được cấp phép theo giấy phép Ghi công theo Creative Commons 3.0 Chung và giấy phép Apache 2.0.