Không máy chủ Dataproc

1. Tổng quan – Google Dataproc

Dataproc là một dịch vụ được quản lý hoàn toàn và có khả năng mở rộng cao để chạy Apache Spark, Apache Flink, Presto cũng như nhiều công cụ và khung nguồn mở khác. Sử dụng Dataproc để hiện đại hoá hồ dữ liệu, ETL / ELT và khoa học dữ liệu bảo mật trên quy mô hành tinh. Dataproc cũng được tích hợp đầy đủ với một số dịch vụ của Google Cloud, trong đó có BigQuery, Cloud Storage, Vertex AIDataplex.

Dataproc có 3 phiên bản:

  • Dataproc Serverless cho phép bạn chạy các công việc PySpark mà không cần định cấu hình cơ sở hạ tầng và phương thức tự động cấp tài nguyên bổ sung. Dataproc Serverless hỗ trợ các tải công việc theo lô và các phiên / sổ tay PySpark.
  • Dataproc trên Google Compute Engine cho phép bạn quản lý một cụm Hadoop YARN cho các tải công việc Spark dựa trên YARN, bên cạnh các công cụ nguồn mở như Flink và Presto. Bạn có thể điều chỉnh các cụm trên đám mây với tỷ lệ tuỳ ý theo chiều dọc hoặc chiều ngang, bao gồm cả tính năng tự động cấp tài nguyên bổ sung.
  • Dataproc trên Google Kubernetes Engine cho phép bạn định cấu hình các cụm ảo Dataproc trong cơ sở hạ tầng GKE của bạn để gửi các công việc Spark, PySpark, SparkR hoặc Spark SQL.

Trong lớp học lập trình này, bạn sẽ tìm hiểu một số cách sử dụng Dataproc Serverless.

Apache Spark ban đầu được xây dựng để chạy trên các cụm Hadoop và sử dụng YARN làm trình quản lý tài nguyên. Việc duy trì các cụm Hadoop đòi hỏi một nhóm chuyên môn cụ thể và đảm bảo nhiều nút bấm trên các cụm được định cấu hình đúng cách. Đây là sản phẩm bổ sung cho một bộ nút riêng biệt mà Spark cũng yêu cầu người dùng đặt. Điều này dẫn đến nhiều tình huống trong đó nhà phát triển mất nhiều thời gian hơn để định cấu hình cơ sở hạ tầng thay vì tự xử lý mã Spark.

Dataproc Serverless giúp bạn không cần phải định cấu hình các cụm Hadoop hoặc Spark theo cách thủ công. Dataproc Serverless không chạy trên Hadoop mà sử dụng chức năng Phân bổ tài nguyên động của riêng mình để xác định các yêu cầu về tài nguyên, bao gồm cả việc tự động cấp tài nguyên bổ sung. Một nhóm nhỏ các thuộc tính Spark vẫn có thể tuỳ chỉnh được bằng Dataproc Serverless. Tuy nhiên, trong hầu hết các trường hợp, bạn sẽ không cần phải tinh chỉnh các thuộc tính này.

2. Thiết lập

Bạn sẽ bắt đầu bằng cách định cấu hình môi trường và tài nguyên dùng trong lớp học lập trình này.

Tạo một dự án trên Google Cloud. Bạn có thể sử dụng mã hiện có.

Mở Cloud Shell bằng cách nhấp vào biểu tượng này trong thanh công cụ Cloud Console.

ba0bb17945a73543.png

Cloud Shell cung cấp một môi trường Shell sẵn sàng sử dụng mà bạn có thể sử dụng cho lớp học lập trình này.

68c4ebd2a8539764.pngs

Theo mặc định, Cloud Shell sẽ đặt tên dự án của bạn. Hãy kiểm tra kỹ bằng cách chạy echo $GOOGLE_CLOUD_PROJECT. Nếu bạn không thấy mã dự án trong kết quả, hãy đặt mã đó.

export GOOGLE_CLOUD_PROJECT=<your-project-id>

Thiết lập một khu vực trong Compute Engine cho tài nguyên của bạn, chẳng hạn như us-central1 hoặc europe-west2.

export REGION=<your-region>

Bật API

Lớp học lập trình sử dụng các API sau:

  • BigQuery
  • Dataproc

Bật các API cần thiết. Quá trình này sẽ mất khoảng một phút và thông báo thành công sẽ xuất hiện khi hoàn tất.

gcloud services enable bigquery.googleapis.com
gcloud services enable dataproc.googleapis.com

Định cấu hình quyền truy cập mạng

Dataproc Serverless yêu cầu bật Google Private Access (Quyền truy cập riêng tư của Google) ở khu vực mà bạn sẽ chạy các công việc Spark của mình vì trình điều khiển và người thực thi của Spark chỉ có IP riêng tư. Chạy lệnh sau để bật trong mạng con default.

gcloud compute networks subnets update default \
  --region=${REGION} \
  --enable-private-ip-google-access

Bạn có thể xác minh rằng bạn đã bật Quyền truy cập riêng tư của Google thông qua các lệnh sau đây để xuất ra True hoặc False.

gcloud compute networks subnets describe default \
  --region=${REGION} \
  --format="get(privateIpGoogleAccess)"

Tạo bộ chứa lưu trữ

Tạo một bộ chứa lưu trữ dùng để lưu trữ tài sản được tạo trong lớp học lập trình này.

Chọn tên cho bộ chứa của bạn. Tên bộ chứa phải là tên duy nhất trên toàn cầu cho tất cả người dùng.

export BUCKET=<your-bucket-name>

Tạo bộ chứa trong khu vực bạn dự định chạy công việc Spark của mình.

gsutil mb -l ${REGION} gs://${BUCKET}

Bạn có thể thấy rằng bộ chứa của mình đã có trong bảng điều khiển Cloud Storage. Bạn cũng có thể chạy gsutil ls để xem bộ chứa của mình.

Tạo máy chủ nhật ký liên tục

Giao diện người dùng Spark cung cấp một bộ công cụ gỡ lỗi phong phú và thông tin chi tiết về công việc Spark. Để xem giao diện người dùng Spark cho các công việc Dataproc Serverless đã hoàn tất, bạn phải tạo một cụm Dataproc nút duy nhất để sử dụng làm máy chủ lịch sử ổn định.

Đặt tên cho máy chủ lịch sử cố định.

PHS_CLUSTER_NAME=my-phs

Chạy lệnh sau.

gcloud dataproc clusters create ${PHS_CLUSTER_NAME} \
    --region=${REGION} \
    --single-node \
    --enable-component-gateway \
    --properties=spark:spark.history.fs.logDirectory=gs://${BUCKET}/phs/*/spark-job-history

Giao diện người dùng Spark và máy chủ nhật ký cố định sẽ được tìm hiểu chi tiết hơn ở phần sau của lớp học lập trình này.

3. Chạy công việc Spark không máy chủ bằng Lô Dataproc

Trong mẫu này, bạn sẽ làm việc với tập dữ liệu từ tập dữ liệu công khai của danh sách chuyến đi xe đạp thành phố New York (NYC). NYC trích dẫn xe đạp có tính phí tại thành phố New York. Bạn sẽ thực hiện một số thao tác biến đổi đơn giản và in 10 mã trạm xe đạp phổ biến nhất tại Trung Quốc. Một cách đáng chú ý là mẫu này sử dụng spark-bigquery-connector nguồn mở để đọc và ghi dữ liệu một cách liền mạch giữa Spark và BigQuery.

Sao chép kho lưu trữ GitHub và cd vào thư mục chứa tệp citibike.py.

git clone https://github.com/GoogleCloudPlatform/devrel-demos.git
cd devrel-demos/data-analytics/next-2022-workshop/dataproc-serverless

citibike.py

import sys

from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import BooleanType

if len(sys.argv) == 1:
    print("Please provide a GCS bucket name.")

bucket = sys.argv[1]
table = "bigquery-public-data:new_york_citibike.citibike_trips"

spark = SparkSession.builder \
          .appName("pyspark-example") \
          .config("spark.jars","gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jar") \
          .getOrCreate()

df = spark.read.format("bigquery").load(table)

top_ten = df.filter(col("start_station_id") \
            .isNotNull()) \
            .groupBy("start_station_id") \
            .count() \
            .orderBy("count", ascending=False) \
            .limit(10) \
            .cache()

top_ten.show()

top_ten.write.option("header", True).csv(f"gs://{bucket}/citibikes_top_ten_start_station_ids")

Gửi công việc đến Serverless Spark bằng Cloud SDK có trong Cloud Shell theo mặc định. Chạy lệnh sau trong shell sử dụng Cloud SDK và Dataproc Batches API để gửi các công việc Serverless Spark.

gcloud dataproc batches submit pyspark citibike.py \
  --batch=citibike-job \
  --region=${REGION} \
  --deps-bucket=gs://${BUCKET} \
  --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jar \
--history-server-cluster=projects/${GOOGLE_CLOUD_PROJECT}/regions/${REGION}/clusters/${PHS_CLUSTER_NAME} \
  -- ${BUCKET}

Để phân tích chi tiết, hãy làm như sau:

  • gcloud dataproc batches submit tham chiếu đến Dataproc Batches API.
  • pyspark biểu thị rằng bạn đang gửi một công việc trên PySpark.
  • --batch là tên của công việc. Nếu không được cung cấp, mã nhận dạng duy nhất (UUID) được tạo ngẫu nhiên sẽ được sử dụng.
  • --region=${REGION} là khu vực địa lý nơi công việc sẽ được xử lý.
  • --deps-bucket=${BUCKET} là nơi bạn tải tệp Python cục bộ lên trước khi chạy trong môi trường Serverless.
  • --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jar bao gồm tệp jar cho spark-bigquery-connector trong môi trường thời gian chạy Spark.
  • --history-server-cluster=projects/${GOOGLE_CLOUD_PROJECT}/regions/${REGION}/clusters/${PHS_CLUSTER} là tên đủ điều kiện của máy chủ nhật ký cố định. Đây là nơi dữ liệu sự kiện Spark (tách biệt với kết quả của bảng điều khiển) được lưu trữ và có thể xem được từ giao diện người dùng Spark.
  • -- ở cuối biểu thị mọi nội dung vượt ra ngoài phạm vi này sẽ là đối số thời gian chạy cho chương trình. Trong trường hợp này, bạn đang gửi tên bộ chứa của mình theo yêu cầu của công việc.

Bạn sẽ thấy kết quả sau đây khi gửi lô.

Batch [citibike-job] submitted.

Sau vài phút, bạn sẽ thấy kết quả sau đây cùng với siêu dữ liệu của công việc.

+----------------+------+
|start_station_id| count|
+----------------+------+
|             519|551078|
|             497|423334|
|             435|403795|
|             426|384116|
|             293|372255|
|             402|367194|
|             285|344546|
|             490|330378|
|             151|318700|
|             477|311403|
+----------------+------+

Batch [citibike-job] finished.

Trong phần tiếp theo, bạn sẽ tìm hiểu cách tìm nhật ký cho công việc này.

Các tính năng khác

Với Spark Serverless, bạn có thêm lựa chọn để chạy các công việc của mình.

  • Bạn có thể tạo một hình ảnh Docker tuỳ chỉnh để thực hiện công việc của mình. Đây là một cách hay để đưa các phần phụ thuộc khác vào, trong đó có thư viện Python và R.
  • Bạn có thể kết nối một thực thể Dataproc Metastore với công việc của mình để truy cập vào siêu dữ liệu Hive.
  • Để có thêm quyền kiểm soát, Dataproc Serverless hỗ trợ cấu hình của một tập hợp nhỏ các thuộc tính Spark.

4. Các chỉ số và khả năng quan sát của Dataproc

Bảng điều khiển Dataproc Batches liệt kê tất cả công việc Dataproc Serverless. Trong bảng điều khiển, bạn sẽ thấy Mã lô, Vị trí, Trạng thái, Thời gian tạo, Thời gian đã trôi quaLoại của từng công việc. Nhấp vào Mã lô của công việc để xem thêm thông tin về công việc đó.

Trên trang này, bạn sẽ thấy những thông tin như Giám sát, cho biết số lượng Bộ thực thi luồng tia lửa mà công việc của bạn đã sử dụng theo thời gian (cho biết mức độ tự động điều chỉnh theo tỷ lệ).

Trên thẻ Details (Chi tiết), bạn sẽ thấy thêm siêu dữ liệu về công việc, bao gồm mọi đối số và tham số đã được gửi trong công việc.

Bạn cũng có thể truy cập vào mọi nhật ký trên trang này. Khi chạy công việc Dataproc Serverless, 3 bộ nhật ký khác nhau sẽ được tạo:

  • Cấp dịch vụ
  • Kết quả xuất ra trên bảng điều khiển
  • Ghi nhật ký sự kiện Spark

Cấp dịch vụ, bao gồm nhật ký mà dịch vụ Dataproc Serverless đã tạo. Chẳng hạn như Dataproc Serverless yêu cầu thêm CPU để tự động cấp tài nguyên bổ sung. Bạn có thể xem chúng bằng cách nhấp vào phần Xem nhật ký. Thao tác này sẽ mở Cloud Logging.

Bạn có thể xem kết quả trên bảng điều khiển trong phần Kết quả. Đây là kết quả do lệnh tạo ra, bao gồm cả siêu dữ liệu mà Spark in ra khi bắt đầu một lệnh hoặc bất kỳ câu lệnh in nào được kết hợp với lệnh đó.

Bạn có thể truy cập tính năng Ghi nhật ký sự kiện Spark từ giao diện người dùng Spark. Vì bạn đã cung cấp cho công việc Spark của mình một máy chủ lịch sử cố định, bạn có thể truy cập vào giao diện người dùng Spark bằng cách nhấp vào Xem máy chủ nhật ký Spark, nơi chứa thông tin về các công việc Spark đã chạy trước đó của bạn. Bạn có thể tìm hiểu thêm về giao diện người dùng Spark trong tài liệu chính thức về Spark.

5. Mẫu Dataproc: BQ -> GCS

Mẫu Dataproc là các công cụ nguồn mở giúp đơn giản hoá hơn nữa các công việc xử lý dữ liệu trong Cloud. Các tệp này đóng vai trò là một trình bao bọc cho Dataproc Serverless và bao gồm các mẫu cho nhiều tác vụ nhập và xuất dữ liệu, bao gồm:

  • BigQuerytoGCSGCStoBigQuery
  • GCStoBigTable
  • GCStoJDBCJDBCtoGCS
  • HivetoBigQuery
  • MongotoGCSGCStoMongo

Danh sách đầy đủ có sẵn README.

Trong phần này, bạn sẽ sử dụng Mẫu Dataproc để xuất dữ liệu từ BigQuery sang GCS.

Sao chép kho lưu trữ

Sao chép kho lưu trữ rồi thay đổi vào thư mục python.

git clone https://github.com/GoogleCloudPlatform/dataproc-templates.git
cd dataproc-templates/python

Định cấu hình môi trường

Bây giờ, bạn sẽ đặt các biến môi trường. Mẫu Dataproc sử dụng biến môi trường GCP_PROJECT cho mã dự án của bạn, vì vậy, hãy đặt giá trị này bằng GOOGLE_CLOUD_PROJECT.

export GCP_PROJECT=${GOOGLE_CLOUD_PROJECT}

Bạn phải thiết lập khu vực của mình trong môi trường trước đó. Nếu chưa, hãy thiết lập ở đây.

export REGION=<region>

Mẫu Dataproc sử dụng spark-bigquery-conector để xử lý công việc trong BigQuery và yêu cầu URI phải được đưa vào biến môi trường JARS. Đặt biến JARS.

export JARS="gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jar"

Định cấu hình thông số mẫu

Đặt tên của một bộ chứa thử nghiệm cho dịch vụ sẽ sử dụng.

export GCS_STAGING_LOCATION=gs://${BUCKET}

Tiếp theo, bạn sẽ đặt một số biến dành riêng cho từng công việc. Đối với bảng dữ liệu đầu vào, bạn sẽ lại tham chiếu đến tập dữ liệu BigQuery NYC Cibike.

BIGQUERY_GCS_INPUT_TABLE=bigquery-public-data.new_york_citibike.citibike_trips

Bạn có thể chọn csv, parquet, avro hoặc json. Đối với lớp học lập trình này, hãy chọn CSV – trong phần tiếp theo về cách sử dụng Mẫu Dataproc để chuyển đổi các loại tệp.

BIGQUERY_GCS_OUTPUT_FORMAT=csv

Đặt chế độ đầu ra thành overwrite. Bạn có thể chọn overwrite, append, ignore hoặc errorifexists.

BIGQUERY_GCS_OUTPUT_MODE=overwrite

Thiết lập vị trí đầu ra GCS thành một đường dẫn trong bộ chứa.

BIGQUERY_GCS_OUTPUT_LOCATION=gs://${BUCKET}/BQtoGCS

Chạy mẫu

Chạy mẫu BIGQUERYTOGCS bằng cách chỉ định mẫu đó ở bên dưới và cung cấp các tham số đầu vào mà bạn đã đặt.

./bin/start.sh \
-- --template=BIGQUERYTOGCS \
        --bigquery.gcs.input.table=${BIGQUERY_GCS_INPUT_TABLE} \
        --bigquery.gcs.output.format=${BIGQUERY_GCS_OUTPUT_FORMAT} \
        --bigquery.gcs.output.mode=${BIGQUERY_GCS_OUTPUT_MODE} \
        --bigquery.gcs.output.location=${BIGQUERY_GCS_OUTPUT_LOCATION}

Đầu ra sẽ khá ồn nhưng sau khoảng một phút, bạn sẽ thấy như sau.

Batch [5766411d6c78444cb5e80f305308d8f8] submitted.
...
Batch [5766411d6c78444cb5e80f305308d8f8] finished.

Bạn có thể xác minh rằng các tệp đã được tạo bằng cách chạy lệnh sau.

gsutil ls ${BIGQUERY_GCS_OUTPUT_LOCATION}

Theo mặc định, Spark sẽ ghi vào nhiều tệp, tuỳ thuộc vào lượng dữ liệu. Trong trường hợp này, bạn sẽ thấy khoảng 30 tệp đã tạo. Tên tệp đầu ra Spark được định dạng bằng part, theo sau là một số gồm năm chữ số (cho biết số bộ phận) và một chuỗi băm. Đối với một lượng lớn dữ liệu, Spark thường sẽ ghi vào một số tệp. Tên tệp ví dụ là part-00000-cbf69737-867d-41cc-8a33-6521a725f7a0-c000.csv.

6. Mẫu Dataproc: CSV vào parquet

Bây giờ, bạn sẽ sử dụng Mẫu Dataproc để chuyển đổi dữ liệu trong GCS từ loại tệp này sang loại tệp khác bằng cách sử dụng GCSTOGCS. Mẫu này sử dụng SparkSQL và cung cấp lựa chọn gửi một truy vấn SparkSQL cần được xử lý trong quá trình chuyển đổi để xử lý thêm.

Xác nhận biến môi trường

Xác nhận rằng GCP_PROJECT, REGIONGCS_STAGING_BUCKET đã được đặt từ phần trước.

echo ${GCP_PROJECT}
echo ${REGION}
echo ${GCS_STAGING_LOCATION}

Đặt thông số mẫu

Bây giờ, bạn sẽ đặt các tham số cấu hình cho GCStoGCS. Bắt đầu với vị trí của các tệp nhập. Xin lưu ý rằng đây là một thư mục chứ không phải một tệp cụ thể vì tất cả các tệp trong thư mục đó sẽ được xử lý. Đặt giá trị này là BIGQUERY_GCS_OUTPUT_LOCATION.

GCS_TO_GCS_INPUT_LOCATION=${BIGQUERY_GCS_OUTPUT_LOCATION}

Đặt định dạng của tệp nhập.

GCS_TO_GCS_INPUT_FORMAT=csv

Đặt định dạng đầu ra mong muốn. Bạn có thể chọn parquet, json, avro hoặc csv.

GCS_TO_GCS_OUTPUT_FORMAT=parquet

Đặt chế độ đầu ra thành overwrite. Bạn có thể chọn overwrite, append, ignore hoặc errorifexists.

GCS_TO_GCS_OUTPUT_MODE=overwrite

Đặt vị trí đầu ra.

GCS_TO_GCS_OUTPUT_LOCATION=gs://${BUCKET}/GCStoGCS

Chạy mẫu

Chạy mẫu GCStoGCS.

./bin/start.sh \
-- --template=GCSTOGCS \
        --gcs.to.gcs.input.location=${GCS_TO_GCS_INPUT_LOCATION} \
        --gcs.to.gcs.input.format=${GCS_TO_GCS_INPUT_FORMAT} \
        --gcs.to.gcs.output.format=${GCS_TO_GCS_OUTPUT_FORMAT} \
        --gcs.to.gcs.output.mode=${GCS_TO_GCS_OUTPUT_MODE} \
        --gcs.to.gcs.output.location=${GCS_TO_GCS_OUTPUT_LOCATION}

Đầu ra sẽ khá ồn nhưng sau khoảng một phút, bạn sẽ thấy thông báo thành công như bên dưới.

Batch [c198787ba8e94abc87e2a0778c05ec8a] submitted.
...
Batch [c198787ba8e94abc87e2a0778c05ec8a] finished.

Bạn có thể xác minh rằng các tệp đã được tạo bằng cách chạy lệnh sau.

gsutil ls ${GCS_TO_GCS_OUTPUT_LOCATION}

Với mẫu này, bạn cũng có thể cung cấp các truy vấn SparkSQL bằng cách truyền gcs.to.gcs.temp.view.namegcs.to.gcs.sql.query vào mẫu, cho phép truy vấn SparkSQL chạy trên dữ liệu trước khi ghi vào GCS.

7. Dọn dẹp tài nguyên

Cách tránh phát sinh các khoản phí không cần thiết cho tài khoản GCP sau khi bạn hoàn thành lớp học lập trình này:

  1. Xoá bộ chứa Cloud Storage đối với môi trường bạn đã tạo.
gsutil rm -r gs://${BUCKET}
  1. Xoá cụm Dataproc dùng cho máy chủ lưu nhật ký cố định.
gcloud dataproc clusters delete ${PHS_CLUSTER_NAME} \
  --region=${REGION}
  1. Xoá các công việc Dataproc Serverless. Chuyển đến Batches Console (Bảng điều khiển lô), nhấp vào hộp bên cạnh từng công việc mà bạn muốn xoá rồi nhấp vào XOÁ.

Nếu đã tạo một dự án chỉ dành cho lớp học lập trình này, bạn cũng có thể xoá dự án đó (không bắt buộc):

  1. Trong Bảng điều khiển GCP, hãy chuyển đến trang Dự án.
  2. Trong danh sách dự án, hãy chọn dự án mà bạn muốn xoá rồi nhấp vào Xoá.
  3. Trong hộp này, hãy nhập mã dự án, sau đó nhấp vào Tắt để xoá dự án.

8. Các bước tiếp theo

Các tài nguyên sau đây cung cấp thêm một số cách giúp bạn có thể tận dụng Serverless Spark: