1. Tổng quan – Google Dataproc
Dataproc là một dịch vụ được quản lý toàn diện và có khả năng mở rộng cao để chạy Apache Spark, Apache Flink, Presto và nhiều công cụ cũng như khung nguồn mở khác. Hãy sử dụng Dataproc để hiện đại hoá hồ dữ liệu, ETL / ELT và khoa học dữ liệu an toàn ở quy mô toàn cầu. Dataproc cũng được tích hợp đầy đủ với một số dịch vụ của Google Cloud, bao gồm BigQuery, Cloud Storage, Vertex AI và Dataplex.
Dataproc có 3 phiên bản:
- Dataproc Serverless cho phép bạn chạy các công việc PySpark mà không cần định cấu hình cơ sở hạ tầng và tính năng tự động mở rộng quy mô. Dataproc Serverless hỗ trợ các phiên và sổ ghi chép / tải công việc hàng loạt PySpark.
- Dataproc trên Google Compute Engine cho phép bạn quản lý một cụm Hadoop YARN cho các tải công việc Spark dựa trên YARN, ngoài các công cụ nguồn mở như Flink và Presto. Bạn có thể điều chỉnh các cụm hoạt động trên nền tảng đám mây với khả năng mở rộng quy mô theo chiều dọc hoặc chiều ngang tuỳ ý, bao gồm cả phương thức tự động cấp tài nguyên bổ sung.
- Dataproc trên Google Kubernetes Engine cho phép bạn định cấu hình các cụm ảo Dataproc trong cơ sở hạ tầng GKE để gửi các công việc Spark, PySpark, SparkR hoặc Spark SQL.
Trong lớp học lập trình này, bạn sẽ tìm hiểu một số cách để sử dụng Dataproc Serverless.
Apache Spark ban đầu được xây dựng để chạy trên các cụm Hadoop và sử dụng YARN làm trình quản lý tài nguyên. Việc duy trì các cụm Hadoop đòi hỏi một bộ kỹ năng cụ thể và đảm bảo rằng nhiều nút khác nhau trên các cụm được định cấu hình đúng cách. Đây là ngoài một bộ nút riêng biệt mà Spark cũng yêu cầu người dùng đặt. Điều này dẫn đến nhiều trường hợp nhà phát triển dành nhiều thời gian hơn để định cấu hình cơ sở hạ tầng thay vì làm việc trên chính mã Spark.
Dataproc Serverless giúp bạn không cần định cấu hình cụm Hadoop hoặc Spark theo cách thủ công. Dataproc Serverless không chạy trên Hadoop và sử dụng tính năng Phân bổ tài nguyên động của riêng mình để xác định các yêu cầu về tài nguyên, bao gồm cả phương thức tự động cấp tài nguyên bổ sung. Bạn vẫn có thể tuỳ chỉnh một nhóm nhỏ các thuộc tính Spark bằng Dataproc Serverless, tuy nhiên, trong hầu hết các trường hợp, bạn sẽ không cần điều chỉnh các thuộc tính này.
2. Thiết lập
Bạn sẽ bắt đầu bằng cách định cấu hình môi trường và tài nguyên được sử dụng trong lớp học lập trình này.
Tạo một dự án trên Google Cloud. Bạn có thể sử dụng một dự án hiện có.
Mở Cloud Shell bằng cách nhấp vào biểu tượng này trên thanh công cụ Cloud Console.

Cloud Shell cung cấp một môi trường Shell sẵn sàng sử dụng mà bạn có thể dùng cho lớp học lập trình này.

Cloud Shell sẽ đặt tên dự án của bạn theo mặc định. Kiểm tra kỹ bằng cách chạy echo $GOOGLE_CLOUD_PROJECT. Nếu bạn không thấy mã dự án trong kết quả, hãy đặt mã dự án.
export GOOGLE_CLOUD_PROJECT=<your-project-id>
Đặt một khu vực Compute Engine cho các tài nguyên của bạn, chẳng hạn như us-central1 hoặc europe-west2.
export REGION=<your-region>
Bật API
Lớp học lập trình này sử dụng các API sau:
- BigQuery
- Dataproc
Bật các API cần thiết. Quá trình này sẽ mất khoảng một phút và một thông báo thành công sẽ xuất hiện khi hoàn tất.
gcloud services enable bigquery.googleapis.com gcloud services enable dataproc.googleapis.com
Định cấu hình quyền truy cập mạng
Dataproc Serverless yêu cầu bật tính năng Truy cập riêng tư của Google trong khu vực mà bạn sẽ chạy các công việc Spark, vì trình điều khiển và trình thực thi Spark chỉ có IP riêng tư. Chạy lệnh sau để bật tính năng này trong mạng con default.
gcloud compute networks subnets update default \
--region=${REGION} \
--enable-private-ip-google-access
Bạn có thể xác minh rằng tính năng Truy cập riêng tư của Google đã được bật thông qua lệnh sau, lệnh này sẽ xuất ra True hoặc False.
gcloud compute networks subnets describe default \
--region=${REGION} \
--format="get(privateIpGoogleAccess)"
Tạo một bộ chứa lưu trữ
Tạo một bộ chứa lưu trữ sẽ được dùng để lưu trữ các thành phần được tạo trong lớp học lập trình này.
Chọn tên cho bộ chứa. Tên bộ chứa phải là duy nhất trên toàn cầu đối với tất cả người dùng.
export BUCKET=<your-bucket-name>
Tạo bộ chứa trong khu vực mà bạn dự định chạy các công việc Spark.
gsutil mb -l ${REGION} gs://${BUCKET}
Bạn có thể thấy bộ chứa của mình có trong bảng điều khiển Cloud Storage console. Bạn cũng có thể chạy gsutil ls để xem bộ chứa.
Tạo một máy chủ nhật ký liên tục
Giao diện người dùng Spark cung cấp một bộ công cụ gỡ lỗi và thông tin chi tiết phong phú về các công việc Spark. Để xem giao diện người dùng Spark cho các công việc Dataproc Serverless đã hoàn tất, bạn phải tạo một cụm Dataproc một nút để sử dụng làm máy chủ nhật ký liên tục.
Đặt tên cho máy chủ nhật ký liên tục.
PHS_CLUSTER_NAME=my-phs
Chạy lệnh sau.
gcloud dataproc clusters create ${PHS_CLUSTER_NAME} \
--region=${REGION} \
--single-node \
--enable-component-gateway \
--properties=spark:spark.history.fs.logDirectory=gs://${BUCKET}/phs/*/spark-job-history
Giao diện người dùng Spark và máy chủ nhật ký liên tục sẽ được khám phá chi tiết hơn sau trong lớp học lập trình.
3. Chạy các công việc Spark không máy chủ bằng Dataproc Batches
Trong mẫu này, bạn sẽ làm việc với một tập hợp dữ liệu từ tập dữ liệu công khai New York City (NYC) Citi Bike Trips. NYC Citi Bikes là một hệ thống chia sẻ xe đạp có trả phí trong NYC. Bạn sẽ thực hiện một số phép biến đổi đơn giản và in 10 mã trạm Citi Bike phổ biến nhất. Đáng chú ý là mẫu này cũng sử dụng trình kết nối nguồn mở spark-bigquery-connector để đọc và ghi dữ liệu một cách liền mạch giữa Spark và BigQuery.
Sao chép kho lưu trữ Github sau đây và cd vào thư mục chứa tệp citibike.py.
git clone https://github.com/GoogleCloudPlatform/devrel-demos.git cd devrel-demos/data-analytics/next-2022-workshop/dataproc-serverless
citibike.py
import sys
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import BooleanType
if len(sys.argv) == 1:
print("Please provide a GCS bucket name.")
bucket = sys.argv[1]
table = "bigquery-public-data:new_york_citibike.citibike_trips"
spark = SparkSession.builder \
.appName("pyspark-example") \
.config("spark.jars","gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jar") \
.getOrCreate()
df = spark.read.format("bigquery").load(table)
top_ten = df.filter(col("start_station_id") \
.isNotNull()) \
.groupBy("start_station_id") \
.count() \
.orderBy("count", ascending=False) \
.limit(10) \
.cache()
top_ten.show()
top_ten.write.option("header", True).csv(f"gs://{bucket}/citibikes_top_ten_start_station_ids")
Gửi công việc đến Spark không máy chủ bằng Cloud SDK (có sẵn trong Cloud Shell theo mặc định). Chạy lệnh sau trong shell của bạn. Lệnh này sử dụng Cloud SDK và Dataproc Batches API để gửi các công việc Spark không máy chủ.
gcloud dataproc batches submit pyspark citibike.py \
--batch=citibike-job \
--region=${REGION} \
--deps-bucket=gs://${BUCKET} \
--jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jar \
--history-server-cluster=projects/${GOOGLE_CLOUD_PROJECT}/regions/${REGION}/clusters/${PHS_CLUSTER_NAME} \
-- ${BUCKET}
Để phân tích điều này:
gcloud dataproc batches submittham chiếu đến Dataproc Batches API.pysparkcho biết rằng bạn đang gửi một công việc PySpark.--batchlà tên của công việc. Nếu bạn không cung cấp, hệ thống sẽ sử dụng UUID được tạo ngẫu nhiên.--region=${REGION}là khu vực địa lý mà công việc sẽ được xử lý.--deps-bucket=${BUCKET}là nơi tệp Python cục bộ của bạn được tải lên trước khi chạy trong môi trường không máy chủ.--jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jarbao gồm tệp jar cho trình kết nối spark-bigquery-connector trong môi trường thời gian chạy Spark.--history-server-cluster=projects/${GOOGLE_CLOUD_PROJECT}/regions/${REGION}/clusters/${PHS_CLUSTER}là tên đủ điều kiện của máy chủ nhật ký liên tục. Đây là nơi lưu trữ dữ liệu sự kiện Spark (tách biệt với kết quả xuất ra trên bảng điều khiển) và có thể xem được từ giao diện người dùng Spark.- Dấu
--ở cuối cho biết rằng mọi thứ vượt quá dấu này sẽ là đối số thời gian chạy cho chương trình. Trong trường hợp này, bạn đang gửi tên của bộ chứa theo yêu cầu của công việc.
Bạn sẽ thấy kết quả sau khi gửi lô.
Batch [citibike-job] submitted.
Sau vài phút, bạn sẽ thấy kết quả sau cùng với siêu dữ liệu từ công việc.
+----------------+------+ |start_station_id| count| +----------------+------+ | 519|551078| | 497|423334| | 435|403795| | 426|384116| | 293|372255| | 402|367194| | 285|344546| | 490|330378| | 151|318700| | 477|311403| +----------------+------+ Batch [citibike-job] finished.
Trong phần tiếp theo, bạn sẽ tìm hiểu cách xác định vị trí nhật ký cho công việc này.
Các tính năng khác
Với Spark Serverless, bạn có thêm các lựa chọn để chạy công việc.
- Bạn có thể tạo một hình ảnh Docker tuỳ chỉnh mà công việc của bạn chạy trên đó. Đây là một cách tuyệt vời để đưa thêm các phần phụ thuộc, bao gồm cả thư viện Python và R.
- Bạn có thể kết nối một thực thể Dataproc Metastore với công việc của mình để truy cập vào siêu dữ liệu Hive.
- Để có thêm quyền kiểm soát, Dataproc Serverless hỗ trợ việc định cấu hình một nhóm nhỏ các thuộc tính Spark.
4. Chỉ số và khả năng quan sát Dataproc
Bảng điều khiển Dataproc Batches liệt kê tất cả các công việc Dataproc Serverless của bạn. Trong bảng điều khiển, bạn sẽ thấy Mã lô, Vị trí, Trạng thái, Thời gian tạo, Thời gian đã trôi qua và Loại của mỗi công việc. Nhấp vào Mã lô của công việc để xem thêm thông tin về công việc đó.
Trên trang này, bạn sẽ thấy thông tin như Giám sát cho biết số lượng Trình thực thi Spark theo lô mà công việc của bạn đã sử dụng theo thời gian (cho biết mức độ tự động mở rộng quy mô).
Trên thẻ Chi tiết , bạn sẽ thấy thêm siêu dữ liệu về công việc, bao gồm mọi đối số và tham số đã được gửi cùng với công việc.
Bạn cũng có thể truy cập vào tất cả nhật ký từ trang này. Khi các công việc Dataproc Serverless được chạy, 3 tập hợp nhật ký khác nhau sẽ được tạo:
- Cấp dịch vụ
- Kết quả xuất ra trên bảng điều khiển
- Ghi nhật ký sự kiện Spark
Cấp dịch vụ bao gồm các nhật ký mà dịch vụ Dataproc Serverless đã tạo. Các nhật ký này bao gồm những nội dung như Dataproc không máy chủ yêu cầu thêm CPU để phương thức tự động cấp tài nguyên bổ sung. Bạn có thể xem các nhật ký này bằng cách nhấp vào Xem nhật ký để mở Cloud Logging.
Bạn có thể xem Kết quả xuất ra trên bảng điều khiển trong phần Đầu ra. Đây là kết quả do công việc tạo ra, bao gồm cả siêu dữ liệu mà Spark in khi bắt đầu một công việc hoặc bất kỳ câu lệnh in nào được kết hợp vào công việc.
Bạn có thể truy cập vào Ghi nhật ký sự kiện Spark từ giao diện người dùng Spark. Vì bạn đã cung cấp cho công việc Spark một máy chủ nhật ký liên tục, nên bạn có thể truy cập vào giao diện người dùng Spark bằng cách nhấp vào Xem máy chủ nhật ký Spark. Máy chủ này chứa thông tin cho các công việc Spark mà bạn đã chạy trước đó. Bạn có thể tìm hiểu thêm về giao diện người dùng Spark trong tài liệu chính thức về Spark.
5. Mẫu Dataproc: BQ -> GCS
Mẫu Dataproc là các công cụ nguồn mở giúp đơn giản hoá hơn nữa các tác vụ xử lý dữ liệu trong đám mây. Các mẫu này đóng vai trò là trình bao bọc cho Dataproc Serverless và bao gồm các mẫu cho nhiều tác vụ nhập dữ liệu và xuất dữ liệu, bao gồm:
BigQuerytoGCSvàGCStoBigQueryGCStoBigTableGCStoJDBCvàJDBCtoGCSHivetoBigQueryMongotoGCSvàGCStoMongo
Bạn có thể xem danh sách đầy đủ trong README.
Trong phần này, bạn sẽ sử dụng Mẫu Dataproc để xuất dữ liệu từ BigQuery sang GCS.
Nhân bản kho lưu trữ
Sao chép kho lưu trữ và chuyển sang thư mục python.
git clone https://github.com/GoogleCloudPlatform/dataproc-templates.git cd dataproc-templates/python
Định cấu hình môi trường
Bây giờ, bạn sẽ đặt các biến môi trường. Mẫu Dataproc sử dụng biến môi trường GCP_PROJECT cho mã dự án của bạn, vì vậy, hãy đặt biến này bằng GOOGLE_CLOUD_PROJECT.
export GCP_PROJECT=${GOOGLE_CLOUD_PROJECT}
Khu vực của bạn phải được đặt trong môi trường từ trước. Nếu không, hãy đặt tại đây.
export REGION=<region>
Mẫu Dataproc sử dụng trình kết nối spark-bigquery-conector để xử lý các công việc BigQuery và yêu cầu URI được đưa vào một biến môi trường JARS. Đặt biến JARS.
export JARS="gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jar"
Định cấu hình tham số mẫu
Đặt tên cho một bộ chứa dàn dựng để dịch vụ sử dụng.
export GCS_STAGING_LOCATION=gs://${BUCKET}
Tiếp theo, bạn sẽ đặt một số biến dành riêng cho công việc. Đối với bảng đầu vào, bạn sẽ tham chiếu lại tập dữ liệu BigQuery NYC Citibike.
BIGQUERY_GCS_INPUT_TABLE=bigquery-public-data.new_york_citibike.citibike_trips
Bạn có thể chọn csv, parquet, avro hoặc json. Đối với lớp học lập trình này, hãy chọn CSV – trong phần tiếp theo, bạn sẽ tìm hiểu cách sử dụng Mẫu Dataproc để chuyển đổi các loại tệp.
BIGQUERY_GCS_OUTPUT_FORMAT=csv
Đặt chế độ đầu ra thành overwrite. Bạn có thể chọn giữa overwrite, append, ignore hoặc errorifexists.
BIGQUERY_GCS_OUTPUT_MODE=overwrite
Đặt vị trí đầu ra GCS thành một đường dẫn trong bộ chứa của bạn.
BIGQUERY_GCS_OUTPUT_LOCATION=gs://${BUCKET}/BQtoGCS
Chạy mẫu
Chạy mẫu BIGQUERYTOGCS bằng cách chỉ định mẫu đó bên dưới và cung cấp các tham số đầu vào mà bạn đã đặt.
./bin/start.sh \
-- --template=BIGQUERYTOGCS \
--bigquery.gcs.input.table=${BIGQUERY_GCS_INPUT_TABLE} \
--bigquery.gcs.output.format=${BIGQUERY_GCS_OUTPUT_FORMAT} \
--bigquery.gcs.output.mode=${BIGQUERY_GCS_OUTPUT_MODE} \
--bigquery.gcs.output.location=${BIGQUERY_GCS_OUTPUT_LOCATION}
Kết quả đầu ra sẽ khá nhiều, nhưng sau khoảng một phút, bạn sẽ thấy kết quả sau.
Batch [5766411d6c78444cb5e80f305308d8f8] submitted. ... Batch [5766411d6c78444cb5e80f305308d8f8] finished.
Bạn có thể xác minh rằng các tệp đã được tạo bằng cách chạy lệnh sau.
gsutil ls ${BIGQUERY_GCS_OUTPUT_LOCATION}
Theo mặc định, Spark ghi vào nhiều tệp, tuỳ thuộc vào lượng dữ liệu. Trong trường hợp này, bạn sẽ thấy khoảng 30 tệp được tạo. Tên tệp đầu ra Spark được định dạng bằng part- theo sau là một số có 5 chữ số (cho biết số phần) và một chuỗi băm. Đối với lượng dữ liệu lớn, Spark thường sẽ ghi vào một số tệp. Ví dụ về tên tệp là part-00000-cbf69737-867d-41cc-8a33-6521a725f7a0-c000.csv.
6. Mẫu Dataproc: CSV sang parquet
Bây giờ, bạn sẽ sử dụng Mẫu Dataproc để chuyển đổi dữ liệu trong GCS từ một loại tệp sang một loại tệp khác bằng GCSTOGCS. Mẫu này sử dụng SparkSQL và cung cấp lựa chọn gửi một truy vấn SparkSQL để được xử lý trong quá trình chuyển đổi để xử lý thêm.
Xác nhận các biến môi trường
Xác nhận rằng GCP_PROJECT, REGION và GCS_STAGING_BUCKET được đặt từ phần trước.
echo ${GCP_PROJECT}
echo ${REGION}
echo ${GCS_STAGING_LOCATION}
Đặt tham số mẫu
Bây giờ, bạn sẽ đặt các tham số cấu hình cho GCStoGCS. Bắt đầu với vị trí của các tệp đầu vào. Xin lưu ý rằng đây là một thư mục chứ không phải là một tệp cụ thể vì tất cả các tệp trong thư mục sẽ được xử lý. Đặt giá trị này thành BIGQUERY_GCS_OUTPUT_LOCATION.
GCS_TO_GCS_INPUT_LOCATION=${BIGQUERY_GCS_OUTPUT_LOCATION}
Đặt định dạng của tệp đầu vào.
GCS_TO_GCS_INPUT_FORMAT=csv
Đặt định dạng đầu ra mong muốn. Bạn có thể chọn parquet, json, avro hoặc csv.
GCS_TO_GCS_OUTPUT_FORMAT=parquet
Đặt chế độ đầu ra thành overwrite. Bạn có thể chọn giữa overwrite, append, ignore hoặc errorifexists.
GCS_TO_GCS_OUTPUT_MODE=overwrite
Đặt vị trí đầu ra.
GCS_TO_GCS_OUTPUT_LOCATION=gs://${BUCKET}/GCStoGCS
Chạy mẫu
Chạy mẫu GCStoGCS.
./bin/start.sh \
-- --template=GCSTOGCS \
--gcs.to.gcs.input.location=${GCS_TO_GCS_INPUT_LOCATION} \
--gcs.to.gcs.input.format=${GCS_TO_GCS_INPUT_FORMAT} \
--gcs.to.gcs.output.format=${GCS_TO_GCS_OUTPUT_FORMAT} \
--gcs.to.gcs.output.mode=${GCS_TO_GCS_OUTPUT_MODE} \
--gcs.to.gcs.output.location=${GCS_TO_GCS_OUTPUT_LOCATION}
Kết quả đầu ra sẽ khá nhiều, nhưng sau khoảng một phút, bạn sẽ thấy một thông báo thành công như bên dưới.
Batch [c198787ba8e94abc87e2a0778c05ec8a] submitted. ... Batch [c198787ba8e94abc87e2a0778c05ec8a] finished.
Bạn có thể xác minh rằng các tệp đã được tạo bằng cách chạy lệnh sau.
gsutil ls ${GCS_TO_GCS_OUTPUT_LOCATION}
Với mẫu này, bạn cũng có thể cung cấp các truy vấn SparkSQL bằng cách truyền gcs.to.gcs.temp.view.name và gcs.to.gcs.sql.query vào mẫu, cho phép chạy một truy vấn SparkSQL trên dữ liệu trước khi ghi vào GCS.
7. Dọn dẹp tài nguyên
Để tránh phát sinh các khoản phí không cần thiết cho tài khoản GCP sau khi hoàn tất lớp học lập trình này:
- Xoá bộ chứa Cloud Storage cho môi trường mà bạn đã tạo.
gsutil rm -r gs://${BUCKET}
- Xoá cụm Dataproc dùng cho máy chủ nhật ký liên tục.
gcloud dataproc clusters delete ${PHS_CLUSTER_NAME} \
--region=${REGION}
- Xoá các công việc Dataproc Serverless. Chuyển đến Bảng điều khiển Batches, nhấp vào hộp bên cạnh mỗi công việc mà bạn muốn xoá, rồi nhấp vào XOÁ.
Nếu bạn chỉ tạo một dự án cho lớp học lập trình này, thì bạn cũng có thể xoá dự án đó (không bắt buộc):
- Trong Bảng điều khiển GCP, hãy chuyển đến trang Projects.
- Trong danh sách dự án, hãy chọn dự án mà bạn muốn xoá rồi nhấp vào Delete (Xoá).
- Trong hộp, hãy nhập mã dự án, rồi nhấp vào Shut down (Tắt) để xoá dự án.
8. Bước tiếp theo
Các tài nguyên sau đây cung cấp thêm các cách để bạn có thể tận dụng Spark không máy chủ:
- Tìm hiểu cách điều phối quy trình công việc Dataproc Serverless bằng Cloud Composer.
- Tìm hiểu cách tích hợp Dataproc Serverless với quy trình Kubeflow.