Sổ tay Apache Spark và Jupyter trên Cloud Dataproc

1. Tổng quan

Lớp học lập trình này sẽ trình bày cách thiết lập và sử dụng Apache SparkJupyter Notebook trên Cloud Dataproc.

Sổ tay Jupyter được sử dụng rộng rãi để phân tích dữ liệu khám phá và xây dựng các mô hình học máy vì chúng cho phép bạn chạy mã một cách tương tác và xem kết quả ngay lập tức.

Tuy nhiên, việc thiết lập và sử dụng Apache Spark và Jupyter Notebook có thể phức tạp.

b9ed855863c57d6.png

Cloud Dataproc giúp bạn thực hiện việc này một cách nhanh chóng và dễ dàng bằng cách cho phép bạn tạo một Cụm Dataproc với Apache Spark, thành phần JupyterCổng thành phần trong khoảng 90 giây.

Kiến thức bạn sẽ học được

Trong lớp học lập trình này, bạn sẽ tìm hiểu cách:

  • Tạo một bộ chứa Google Cloud Storage cho cụm của bạn
  • Tạo một Cụm Dataproc bằng Jupyter và Component Gateway,
  • Truy cập vào giao diện người dùng web JupyterLab trên Dataproc
  • Tạo một Sổ tay sử dụng trình kết nối Bộ nhớ BigQuery của Spark
  • Chạy một tác vụ Spark và vẽ biểu đồ kết quả.

Tổng chi phí để chạy bài tập này trên Google Cloud là khoảng 1 USD. Bạn có thể xem toàn bộ thông tin chi tiết về giá của Cloud Dataproc tại đây.

2. Tạo dự án

Đăng nhập vào bảng điều khiển Google Cloud Platform tại console.cloud.google.com rồi tạo một dự án mới:

7e541d932b20c074.png

2deefc9295d114ea.png

a92a49afe05008a.png

Tiếp theo, bạn cần bật tính năng thanh toán trong Cloud Console để sử dụng các tài nguyên của Google Cloud.

Việc thực hiện lớp học lập trình này sẽ không tốn của bạn quá vài đô la, nhưng có thể tốn nhiều hơn nếu bạn quyết định sử dụng nhiều tài nguyên hơn hoặc nếu bạn để các tài nguyên đó chạy. Phần cuối cùng của lớp học lập trình này sẽ hướng dẫn bạn cách dọn dẹp dự án.

Người dùng mới của Google Cloud Platform đủ điều kiện dùng thử miễn phí trị giá 300 USD.

3. Thiết lập môi trường

Trước tiên, hãy mở Cloud Shell bằng cách nhấp vào nút ở góc trên cùng bên phải của bảng điều khiển đám mây:

a10c47ee6ca41c54.png

Sau khi Cloud Shell tải, hãy chạy lệnh sau để đặt mã dự án từ bước trước**:**

gcloud config set project <project_id>

Bạn cũng có thể tìm thấy mã dự án bằng cách nhấp vào dự án của mình ở trên cùng bên trái của bảng điều khiển đám mây:

b4b233632ce0c3c4.png

c7e39ffc6dec3765.png

Tiếp theo, hãy bật Dataproc, Compute Engine và BigQuery Storage API.

gcloud services enable dataproc.googleapis.com \
  compute.googleapis.com \
  storage-component.googleapis.com \
  bigquery.googleapis.com \
  bigquerystorage.googleapis.com

Ngoài ra, bạn có thể thực hiện việc này trong Cloud Console. Nhấp vào biểu tượng trình đơn ở trên cùng bên trái màn hình.

2bfc27ef9ba2ec7d.png

Chọn API Manager trong trình đơn thả xuống.

408af5f32c4b7c25.png

Nhấp vào Bật API và dịch vụ.

a9c0e84296a7ba5b.png

Tìm kiếm và bật các API sau:

  • Compute Engine API
  • Dataproc API
  • API BigQuery
  • BigQuery Storage API

4. Tạo một vùng lưu trữ GCS

Tạo một bộ chứa Google Cloud Storage ở khu vực gần với dữ liệu của bạn nhất và đặt tên riêng cho bộ chứa đó.

Khoá này sẽ được dùng cho cụm Dataproc.

REGION=us-central1
BUCKET_NAME=<your-bucket-name>

gsutil mb -c standard -l ${REGION} gs://${BUCKET_NAME}

Bạn sẽ thấy kết quả sau

Creating gs://<your-bucket-name>/...

5. Tạo Cụm Dataproc bằng Jupyter và Component Gateway

Tạo cụm

Đặt các biến môi trường cho cụm

REGION=us-central1
ZONE=us-central1-a
CLUSTER_NAME=spark-jupyter
BUCKET_NAME=<your-bucket-name>

Sau đó, hãy chạy lệnh gcloud này để tạo cụm có tất cả các thành phần cần thiết để làm việc với Jupyter trên cụm của bạn.

gcloud beta dataproc clusters create ${CLUSTER_NAME} \
 --region=${REGION} \
 --image-version=1.4 \
 --master-machine-type=n1-standard-4 \
 --worker-machine-type=n1-standard-4 \
 --bucket=${BUCKET_NAME} \
 --optional-components=ANACONDA,JUPYTER \
 --enable-component-gateway 

Bạn sẽ thấy kết quả sau đây trong khi cụm của bạn đang được tạo

Waiting on operation [projects/spark-jupyter/regions/us-central1/operations/abcd123456].
Waiting for cluster creation operation...

Quá trình tạo cụm sẽ mất khoảng 90 giây và sau khi hoàn tất, bạn sẽ có thể truy cập vào cụm của mình từ giao diện người dùng của Dataproc bảng điều khiển Cloud.

Trong thời gian chờ đợi, bạn có thể tiếp tục đọc phần bên dưới để tìm hiểu thêm về các cờ được dùng trong lệnh gcloud.

Bạn sẽ thấy kết quả sau đây sau khi tạo cụm:

Created [https://dataproc.googleapis.com/v1beta2/projects/project-id/regions/us-central1/clusters/spark-jupyter] Cluster placed in zone [us-central1-a].

Các cờ được dùng trong lệnh gcloud dataproc create

Sau đây là thông tin chi tiết về các cờ được dùng trong lệnh gcloud dataproc create

--region=${REGION}

Chỉ định khu vực và vùng nơi cụm sẽ được tạo. Bạn có thể xem danh sách các khu vực được hỗ trợ tại đây.

--image-version=1.4

Phiên bản hình ảnh sẽ dùng trong cụm. Bạn có thể xem danh sách các phiên bản có sẵn tại đây.

--bucket=${BUCKET_NAME}

Chỉ định bộ chứa Google Cloud Storage mà bạn đã tạo trước đó để dùng cho cụm. Nếu bạn không cung cấp một vùng lưu trữ GCS, thì vùng lưu trữ đó sẽ được tạo cho bạn.

Đây cũng là nơi sổ tay của bạn sẽ được lưu ngay cả khi bạn xoá cụm vì vùng chứa GCS không bị xoá.

--master-machine-type=n1-standard-4
--worker-machine-type=n1-standard-4

Các loại máy dùng cho cụm Dataproc. Bạn có thể xem danh sách các loại máy có sẵn tại đây.

Theo mặc định, 1 nút chính và 2 nút worker sẽ được tạo nếu bạn không đặt cờ –num-workers

--optional-components=ANACONDA,JUPYTER

Việc đặt các giá trị này cho các thành phần không bắt buộc sẽ cài đặt tất cả các thư viện cần thiết cho Jupyter và Anaconda (bắt buộc đối với sổ tay Jupyter) trên cụm của bạn.

--enable-component-gateway

Việc bật Component Gateway (Cổng thành phần) sẽ tạo một đường liên kết App Engine bằng Apache Knox và Proxy đảo ngược, giúp bạn dễ dàng, an toàn và được xác thực khi truy cập vào giao diện web Jupyter và JupyterLab, tức là bạn không cần tạo đường hầm SSH nữa.

Thao tác này cũng sẽ tạo các đường liên kết cho những công cụ khác trên cụm, bao gồm cả Yarn Resource Manager và Spark History Server. Đây là những công cụ hữu ích để xem hiệu suất của các công việc và mẫu sử dụng cụm.

6. Tạo sổ tay Apache Spark

Truy cập vào giao diện web JupyterLab

Sau khi cụm sẵn sàng, bạn có thể tìm thấy đường liên kết Component Gateway (Cổng thành phần) đến giao diện web JupyterLab bằng cách chuyển đến Dataproc Clusters (Cụm Dataproc) – bảng điều khiển Cloud, nhấp vào cụm bạn đã tạo rồi chuyển đến thẻ Web Interfaces (Giao diện web).

afc40202d555de47.png

Bạn sẽ thấy rằng bạn có quyền truy cập vào Jupyter (giao diện sổ tay cổ điển) hoặc JupyterLab (được mô tả là giao diện người dùng thế hệ tiếp theo cho Dự án Jupyter).

JupyterLab có rất nhiều tính năng mới tuyệt vời về giao diện người dùng. Vì vậy, nếu mới sử dụng sổ ghi chép hoặc đang tìm kiếm những điểm cải tiến mới nhất, bạn nên sử dụng JupyterLab vì theo tài liệu chính thức, JupyterLab sẽ thay thế giao diện Jupyter cổ điển.

Tạo sổ tay bằng nhân Python 3

a463623f2ebf0518.png

Trên thẻ trình chạy, hãy nhấp vào biểu tượng sổ tay Python 3 để tạo một sổ tay có nhân Python 3 (không phải nhân PySpark). Thao tác này cho phép bạn định cấu hình SparkSession trong sổ tay và thêm spark-bigquery-connector cần thiết để sử dụng BigQuery Storage API.

Đổi tên sổ tay

196a3276ed07e1f3.png

Nhấp chuột phải vào tên sổ tay trong thanh bên ở bên trái hoặc thanh điều hướng trên cùng rồi đổi tên sổ tay thành "BigQuery Storage & Spark DataFrames.ipynb"

Chạy mã Spark trong sổ tay

fbac38062e5bb9cf.png

Trong sổ tay này, bạn sẽ sử dụng spark-bigquery-connector. Đây là một công cụ để đọc và ghi dữ liệu giữa BigQuery và Spark bằng cách sử dụng BigQuery Storage API.

BigQuery Storage API mang đến những cải tiến đáng kể cho việc truy cập dữ liệu trong BigQuery bằng cách sử dụng một giao thức dựa trên RPC. Nó hỗ trợ việc đọc và ghi dữ liệu song song cũng như nhiều định dạng chuyển đổi tuần tự như Apache AvroApache Arrow. Nhìn chung, điều này giúp cải thiện đáng kể hiệu suất, đặc biệt là trên các tập dữ liệu lớn hơn.

Trong ô đầu tiên, hãy kiểm tra phiên bản Scala của cụm để bạn có thể thêm phiên bản chính xác của tệp jar spark-bigquery-connector.

Đầu vào [1]:

!scala -version

Đầu ra [1]:f580e442576b8b1f.png Tạo một phiên Spark và thêm gói spark-bigquery-connector.

Nếu phiên bản Scala của bạn là 2.11, hãy sử dụng gói sau.

com.google.cloud.spark:spark-bigquery-with-dependencies_2.11:0.15.1-beta

Nếu phiên bản Scala của bạn là 2.12, hãy sử dụng gói sau.

com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.15.1-beta

Đầu vào [2]:

from pyspark.sql import SparkSession
spark = SparkSession.builder \
 .appName('BigQuery Storage & Spark DataFrames') \
 .config('spark.jars.packages', 'com.google.cloud.spark:spark-bigquery-with-dependencies_2.11:0.15.1-beta') \
 .getOrCreate()

Bật repl.eagerEval

Thao tác này sẽ xuất kết quả của DataFrame trong từng bước mà không cần hiển thị df.show() và cũng cải thiện định dạng của đầu ra.

Đầu vào [3]:

spark.conf.set("spark.sql.repl.eagerEval.enabled",True)

Đọc bảng BigQuery vào Spark DataFrame

Tạo Spark DataFrame bằng cách đọc dữ liệu từ một tập dữ liệu công khai trên BigQuery. Thao tác này sử dụng spark-bigquery-connector và BigQuery Storage API để tải dữ liệu vào cụm Spark.

Tạo Spark DataFrame và tải dữ liệu từ tập dữ liệu công khai BigQuery cho số lượt xem trang trên Wikipedia. Bạn sẽ nhận thấy rằng bạn không chạy truy vấn trên dữ liệu vì bạn đang sử dụng spark-bigquery-connector để tải dữ liệu vào Spark, nơi quá trình xử lý dữ liệu sẽ diễn ra. Khi chạy, mã này sẽ không thực sự tải bảng vì đây là một quy trình đánh giá trì hoãn trong Spark và quá trình thực thi sẽ diễn ra ở bước tiếp theo.

Đầu vào [4]:

table = "bigquery-public-data.wikipedia.pageviews_2020"

df_wiki_pageviews = spark.read \
  .format("bigquery") \
  .option("table", table) \
  .option("filter", "datehour >= '2020-03-01' AND datehour < '2020-03-02'") \
  .load()

df_wiki_pageviews.printSchema()

Đầu ra [4]:

c107a33f6fc30ca.png

Chọn các cột bắt buộc và áp dụng bộ lọc bằng cách sử dụng where() (còn gọi là filter()).

Khi mã này chạy, mã sẽ kích hoạt một thao tác Spark và dữ liệu sẽ được đọc từ BigQuery Storage tại thời điểm này.

Đầu vào [5]:

df_wiki_en = df_wiki_pageviews \
  .select("datehour", "wiki", "views") \
  .where("views > 1000 AND wiki in ('en', 'en.m')") \

df_wiki_en

Đầu ra [5]:

ad363cbe510d625a.png

Nhóm theo tiêu đề và sắp xếp theo số lượt xem trang để xem những trang hàng đầu

Đầu vào [6]:

import pyspark.sql.functions as F

df_datehour_totals = df_wiki_en \
  .groupBy("datehour") \
  .agg(F.sum('views').alias('total_views'))

df_datehour_totals.orderBy('total_views', ascending=False)

Đầu ra [6]:f718abd05afc0f4.png

7. Sử dụng các thư viện vẽ biểu đồ Python trong sổ tay

Bạn có thể sử dụng nhiều thư viện vẽ biểu đồ có trong Python để vẽ biểu đồ đầu ra của các công việc Spark.

Chuyển đổi Spark DataFrame thành Pandas DataFrame

Chuyển đổi Spark DataFrame thành Pandas DataFrame và đặt datehour làm chỉ mục. Điều này sẽ hữu ích nếu bạn muốn làm việc trực tiếp với dữ liệu trong Python và vẽ biểu đồ dữ liệu bằng nhiều thư viện vẽ biểu đồ Python hiện có.

Đầu vào [7]:

spark.conf.set("spark.sql.execution.arrow.enabled", "true")
pandas_datehour_totals = df_datehour_totals.toPandas()

pandas_datehour_totals.set_index('datehour', inplace=True)
pandas_datehour_totals.head()

Đầu ra [7]:

3df2aaa2351f028d.png

Vẽ biểu đồ Pandas Dataframe

Nhập thư viện matplotlib cần thiết để hiển thị các biểu đồ trong sổ tay

Nhập [8]:

import matplotlib.pyplot as plt

Sử dụng hàm vẽ Pandas để tạo biểu đồ dạng đường từ Pandas DataFrame.

Đầu vào [9]:

pandas_datehour_totals.plot(kind='line',figsize=(12,6));

Đầu ra [9]:bade7042c3033594.png

Kiểm tra xem sổ tay đã được lưu trong GCS hay chưa

Giờ đây, bạn đã có sổ tay Jupyter đầu tiên hoạt động trên cụm Dataproc. Đặt tên cho sổ tay của bạn và sổ tay đó sẽ tự động được lưu vào vùng lưu trữ GCS được dùng khi tạo cụm.

Bạn có thể kiểm tra bằng lệnh gsutil này trong Cloud Shell

BUCKET_NAME=<your-bucket-name>
gsutil ls gs://${BUCKET_NAME}/notebooks/jupyter

Bạn sẽ thấy kết quả sau

gs://bucket-name/notebooks/jupyter/
gs://bucket-name/notebooks/jupyter/BigQuery Storage & Spark DataFrames.ipynb

8. Mẹo tối ưu hoá – Lưu dữ liệu vào bộ nhớ đệm trong bộ nhớ

Có thể có những trường hợp bạn muốn dữ liệu trong bộ nhớ thay vì đọc từ Bộ nhớ BigQuery mỗi lần.

Tác vụ này sẽ đọc dữ liệu từ BigQuery và đẩy bộ lọc vào BigQuery. Sau đó, quá trình tổng hợp sẽ được tính toán trong Apache Spark.

import pyspark.sql.functions as F

table = "bigquery-public-data.wikipedia.pageviews_2020"

df_wiki_pageviews = spark.read \
 .format("bigquery") \
 .option("table", table) \
 .option("filter", "datehour >= '2020-03-01' AND datehour < '2020-03-02'") \
 .load()

df_wiki_en = df_wiki_pageviews \
 .select("title", "wiki", "views") \
 .where("views > 10 AND wiki in ('en', 'en.m')")

df_wiki_en_totals = df_wiki_en \
.groupBy("title") \
.agg(F.sum('views').alias('total_views'))

df_wiki_en_totals.orderBy('total_views', ascending=False)

Bạn có thể sửa đổi công việc ở trên để thêm bộ nhớ đệm cho bảng. Giờ đây, bộ lọc trên cột wiki sẽ được Apache Spark áp dụng trong bộ nhớ.

import pyspark.sql.functions as F

table = "bigquery-public-data.wikipedia.pageviews_2020"

df_wiki_pageviews = spark.read \
 .format("bigquery") \
 .option("table", table) \
 .option("filter", "datehour >= '2020-03-01' AND datehour < '2020-03-02'") \
 .load()

df_wiki_all = df_wiki_pageviews \
 .select("title", "wiki", "views") \
 .where("views > 10")

# cache the data in memory
df_wiki_all.cache()

df_wiki_en = df_wiki_all \
 .where("wiki in ('en', 'en.m')")

df_wiki_en_totals = df_wiki_en \
.groupBy("title") \
.agg(F.sum('views').alias('total_views'))

df_wiki_en_totals.orderBy('total_views', ascending=False)

Sau đó, bạn có thể lọc theo một ngôn ngữ wiki khác bằng cách sử dụng dữ liệu được lưu vào bộ nhớ đệm thay vì đọc lại dữ liệu từ bộ nhớ BigQuery. Do đó, quá trình này sẽ chạy nhanh hơn nhiều.

df_wiki_de = df_wiki_all \
 .where("wiki in ('de', 'de.m')")

df_wiki_de_totals = df_wiki_de \
.groupBy("title") \
.agg(F.sum('views').alias('total_views'))

df_wiki_de_totals.orderBy('total_views', ascending=False)

Bạn có thể xoá bộ nhớ đệm bằng cách chạy

df_wiki_all.unpersist()

9. Sổ tay mẫu cho nhiều trường hợp sử dụng khác

Kho lưu trữ Cloud Dataproc GitHub có các sổ tay Jupyter với các mẫu Apache Spark phổ biến để tải dữ liệu, lưu dữ liệu và vẽ biểu đồ dữ liệu bằng nhiều sản phẩm của Google Cloud Platform và công cụ nguồn mở:

10. Dọn dẹp

Để tránh phát sinh các khoản phí không cần thiết cho tài khoản GCP sau khi hoàn tất hướng dẫn bắt đầu nhanh này, hãy làm như sau:

  1. Xoá bộ chứa Cloud Storage cho môi trường mà bạn đã tạo
  2. Xoá môi trường Dataproc.

Nếu chỉ tạo một dự án cho lớp học lập trình này, bạn cũng có thể xoá dự án (không bắt buộc):

  1. Trong Bảng điều khiển của GCP, hãy chuyển đến trang Dự án.
  2. Trong danh sách dự án, hãy chọn dự án bạn muốn xoá rồi nhấp vào Xoá.
  3. Trong hộp này, hãy nhập mã dự án rồi nhấp vào Tắt để xoá dự án.

Giấy phép

Tác phẩm này được cấp phép theo giấy phép Ghi công theo Creative Commons 3.0 và giấy phép Apache 2.0.