1. Tổng quan
Phòng thí nghiệm này sẽ đề cập đến cách thiết lập và sử dụng sổ tay Apache Spark và sổ tay Jupyter trên Cloud Dataproc.
Sổ tay Jupyter được sử dụng rộng rãi cho việc phân tích dữ liệu khám phá và xây dựng các mô hình học máy vì chúng cho phép bạn chạy mã tương tác và xem kết quả ngay lập tức.
Tuy nhiên, việc thiết lập và sử dụng Sổ tay Apache Spark và Jupyter có thể phức tạp.
Cloud Dataproc giúp bạn thực hiện việc này nhanh chóng và dễ dàng bằng cách cho phép bạn tạo một cụm Dataproc với Apache Spark, thành phần Jupyter và Component Gateway trong khoảng 90 giây.
Kiến thức bạn sẽ học được
Trong lớp học lập trình này, bạn sẽ tìm hiểu cách:
- Tạo một bộ chứa Google Cloud Storage cho cụm của bạn
- Tạo một cụm Dataproc bằng Jupyter và Component Gateway,
- Truy cập giao diện người dùng web của JupyterLab trên Dataproc
- Tạo Sổ tay sử dụng trình kết nối Spark BigQuery Storage
- Chạy công việc Spark và lên kế hoạch kết quả.
Tổng chi phí để chạy phòng thí nghiệm này trên Google Cloud là khoảng 1 đô la. Bạn có thể xem toàn bộ thông tin chi tiết về mức giá của Cloud Dataproc tại đây.
2. Tạo dự án
Đăng nhập vào bảng điều khiển Google Cloud Platform tại console.cloud.google.com và tạo một dự án mới:
Tiếp theo, bạn sẽ cần bật tính năng thanh toán trong Cloud Console để sử dụng các tài nguyên của Google Cloud.
Bạn sẽ không mất quá vài đô la khi chạy qua lớp học lập trình này, nhưng có thể sẽ cao hơn nếu bạn quyết định sử dụng nhiều tài nguyên hơn hoặc nếu bạn để chúng chạy. Phần cuối của lớp học lập trình này sẽ hướng dẫn bạn cách dọn dẹp dự án.
Người dùng mới của Google Cloud Platform đủ điều kiện nhận 300 USD dùng thử miễn phí.
3. Thiết lập môi trường
Trước tiên, hãy mở Cloud Shell bằng cách nhấp vào nút ở góc trên cùng bên phải của bảng điều khiển Cloud:
Sau khi Cloud Shell tải, hãy chạy lệnh sau để đặt mã dự án của bước trước**:**
gcloud config set project <project_id>
Bạn cũng có thể tìm thấy mã dự án bằng cách nhấp vào dự án ở trên cùng bên trái của Cloud Console:
Tiếp theo, hãy bật Dataproc, Compute Engine và BigQuery Storage API.
gcloud services enable dataproc.googleapis.com \
compute.googleapis.com \
storage-component.googleapis.com \
bigquery.googleapis.com \
bigquerystorage.googleapis.com
Bạn cũng có thể thực hiện việc này trong Bảng điều khiển Cloud. Nhấp vào biểu tượng trình đơn ở trên cùng bên trái màn hình.
Chọn Trình quản lý API trong trình đơn thả xuống.
Nhấp vào Enable APIs and Services (Bật API và dịch vụ).
Tìm kiếm và bật các API sau:
- API Compute Engine
- API Dataproc
- API BigQuery
- API Bộ nhớ BigQuery
4. Tạo bộ chứa GCS
Tạo một bộ chứa Google Cloud Storage ở khu vực gần với dữ liệu của bạn nhất và đặt một tên duy nhất cho bộ chứa đó.
Giá trị này sẽ được dùng cho cụm Dataproc.
REGION=us-central1
BUCKET_NAME=<your-bucket-name>
gsutil mb -c standard -l ${REGION} gs://${BUCKET_NAME}
Bạn sẽ thấy kết quả sau
Creating gs://<your-bucket-name>/...
5. Tạo cụm Dataproc bằng Jupyter & Cổng thành phần
Đang tạo cụm
Đặt biến môi trường cho cụm của bạn
REGION=us-central1
ZONE=us-central1-a
CLUSTER_NAME=spark-jupyter
BUCKET_NAME=<your-bucket-name>
Sau đó, chạy lệnh gcloud này để tạo cụm của bạn với tất cả các thành phần cần thiết nhằm hoạt động với Jupyter trên cụm của bạn.
gcloud beta dataproc clusters create ${CLUSTER_NAME} \
--region=${REGION} \
--image-version=1.4 \
--master-machine-type=n1-standard-4 \
--worker-machine-type=n1-standard-4 \
--bucket=${BUCKET_NAME} \
--optional-components=ANACONDA,JUPYTER \
--enable-component-gateway
Bạn sẽ thấy kết quả sau đây trong khi tạo cụm
Waiting on operation [projects/spark-jupyter/regions/us-central1/operations/abcd123456].
Waiting for cluster creation operation...
Quá trình tạo cụm sẽ mất khoảng 90 giây và sau khi cụm đã sẵn sàng, bạn có thể truy cập vào cụm đó qua giao diện người dùng bảng điều khiển Dataproc Cloud.
Trong khi chờ đợi, bạn có thể tiếp tục đọc phần bên dưới để tìm hiểu thêm về các cờ được sử dụng trong lệnh gcloud.
Bạn sẽ nhận được kết quả sau đây sau khi tạo cụm:
Created [https://dataproc.googleapis.com/v1beta2/projects/project-id/regions/us-central1/clusters/spark-jupyter] Cluster placed in zone [us-central1-a].
Cờ dùng trong lệnh create gcloud dataproc
Sau đây là bảng chi tiết về các cờ được dùng trong lệnh create gcloud dataproc
--region=${REGION}
Chỉ định khu vực và vùng nơi cụm sẽ được tạo. Bạn có thể xem danh sách các khu vực được hỗ trợ tại đây.
--image-version=1.4
Phiên bản hình ảnh để sử dụng trong cụm của bạn. Bạn có thể xem danh sách các phiên bản hiện có tại đây.
--bucket=${BUCKET_NAME}
Chỉ định bộ chứa Google Cloud Storage mà bạn đã tạo trước đó để dùng cho cụm. Nếu bạn không cung cấp bộ chứa GCS, hệ thống sẽ tạo bộ chứa cho bạn.
Đây cũng là nơi sổ tay của bạn sẽ được lưu ngay cả khi bạn xoá cụm vì bộ chứa GCS không bị xoá.
--master-machine-type=n1-standard-4
--worker-machine-type=n1-standard-4
Loại máy sử dụng cho cụm Dataproc. Bạn có thể xem danh sách các loại máy hiện có tại đây.
Theo mặc định, 1 nút chính và 2 nút worker sẽ được tạo nếu bạn không đặt cờ –num-Worker
--optional-components=ANACONDA,JUPYTER
Việc đặt các giá trị này cho các thành phần không bắt buộc sẽ cài đặt tất cả các thư viện cần thiết cho Jupyter và Anaconda (bắt buộc đối với sổ tay Jupyter) trên cụm của bạn.
--enable-component-gateway
Việc bật Cổng thành phần sẽ tạo một đường liên kết đến App Engine bằng cách dùng Apache Knox và Proxy đảo ngược. Nhờ đó, bạn có thể truy cập dễ dàng, an toàn và xác thực vào giao diện web của Jupyter và JupyterLab, tức là bạn không cần phải tạo đường hầm SSH nữa.
Thao tác này cũng sẽ tạo đường liên kết cho các công cụ khác trên cụm (bao gồm cả Trình quản lý tài nguyên Yarn Resource Manager) và Máy chủ nhật ký Spark. Đây là những yếu tố rất hữu ích khi bạn xem hiệu suất của các công việc cũng như mô hình sử dụng cụm.
6. Tạo sổ tay Apache Spark
Truy cập giao diện web JupyterLab
Sau khi cụm đã sẵn sàng, bạn có thể tìm thấy đường liên kết đến Cổng thành phần đến giao diện web JupyterLab bằng cách chuyển đến Dataproc Clusters – Bảng điều khiển Cloud, nhấp vào cụm bạn đã tạo rồi chuyển đến thẻ Giao diện web.
Bạn sẽ nhận thấy rằng mình có quyền truy cập vào Jupyter, giao diện sổ tay cổ điển hoặc JupyterLab được mô tả là giao diện người dùng thế hệ tiếp theo của Project Jupyter.
Có rất nhiều tính năng giao diện người dùng mới tuyệt vời trong JupyterLab và vì vậy nếu bạn mới sử dụng sổ tay hoặc tìm kiếm các cải tiến mới nhất thì bạn nên sử dụng JupyterLab vì cuối cùng, giao diện Jupyter cổ điển theo tài liệu chính thức.
Tạo sổ tay bằng nhân Python 3
Từ thẻ trình chạy, hãy nhấp vào biểu tượng sổ tay Python 3 để tạo sổ tay có nhân Python 3 (không phải nhân PySpark) cho phép bạn định cấu hình SparkSession trong sổ tay đó và thêm spark-bigquery-connector cần thiết để sử dụng API Bộ nhớ BigQuery.
Đổi tên sổ tay
Hãy nhấp chuột phải vào tên sổ tay trong thanh bên ở bên trái hoặc thanh điều hướng trên cùng, rồi đổi tên sổ tay đó thành "BigQuery Storage & Truy cập nhanh DataFrames.ipynb"
Chạy mã Spark trong sổ tay
Trong sổ tay này, bạn sẽ sử dụng spark-bigquery-connector. Đây là một công cụ dùng để đọc và ghi dữ liệu giữa BigQuery và Spark để sử dụng BigQuery Storage API.
BigQuery Storage API mang đến những cải tiến đáng kể đối với việc truy cập dữ liệu trong BigQuery bằng cách sử dụng một giao thức dựa trên RPC. Thư viện này hỗ trợ ghi và đọc dữ liệu song song với nhiều định dạng chuyển đổi tuần tự, chẳng hạn như Apache Avro và Apache Arrow. Ở cấp độ cao, điều này đồng nghĩa với việc hiệu suất được cải thiện đáng kể, đặc biệt là đối với các tập dữ liệu lớn hơn.
Trong ô đầu tiên, hãy kiểm tra phiên bản Scala của cụm để bạn có thể đưa vào đúng phiên bản của lọ spark-bigquery-connector.
Đầu vào [1]:
!scala -version
Đầu ra [1]: Tạo một phiên Spark và bao gồm gói spark-bigquery-connector.
Nếu phiên bản Scala của bạn là 2.11, hãy sử dụng gói sau.
com.google.cloud.spark:spark-bigquery-with-dependencies_2.11:0.15.1-beta
Nếu phiên bản Scala của bạn là 2.12, hãy sử dụng gói sau.
com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.15.1-beta
Đầu vào [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.appName('BigQuery Storage & Spark DataFrames') \
.config('spark.jars.packages', 'com.google.cloud.spark:spark-bigquery-with-dependencies_2.11:0.15.1-beta') \
.getOrCreate()
Bật repl.eagerEval
Thao tác này sẽ đưa ra kết quả của DataFrames trong mỗi bước mà không cần hiển thị df.show() mới, đồng thời cũng cải thiện định dạng của dữ liệu đầu ra.
Đầu vào [3]:
spark.conf.set("spark.sql.repl.eagerEval.enabled",True)
Đọc bảng BigQuery trong Spark DataFrame
Tạo một Spark DataFrame bằng cách đọc dữ liệu từ một tập dữ liệu BigQuery công khai. Việc này sử dụng spark-bigquery-connector và BigQuery Storage API để tải dữ liệu vào cụm Spark.
Tạo một Spark DataFrame và tải dữ liệu từ tập dữ liệu công khai BigQuery cho số lượt xem trang trên Wikipedia. Bạn sẽ nhận thấy rằng bạn không chạy truy vấn trên dữ liệu khi đang sử dụng spark-bigquery-connector để tải dữ liệu vào Spark nơi việc xử lý dữ liệu sẽ diễn ra. Khi được chạy, mã này sẽ không tải bảng vì đây là quá trình đánh giá từng phần trong Spark và quá trình thực thi sẽ diễn ra trong bước tiếp theo.
Đầu vào [4]:
table = "bigquery-public-data.wikipedia.pageviews_2020"
df_wiki_pageviews = spark.read \
.format("bigquery") \
.option("table", table) \
.option("filter", "datehour >= '2020-03-01' AND datehour < '2020-03-02'") \
.load()
df_wiki_pageviews.printSchema()
Kết quả [4]:
Chọn các cột bắt buộc và áp dụng bộ lọc bằng cách sử dụng where()
là bí danh của filter()
.
Khi chạy mã này, mã sẽ kích hoạt hành động Spark và dữ liệu được đọc từ Bộ nhớ BigQuery tại thời điểm này.
Đầu vào [5]:
df_wiki_en = df_wiki_pageviews \
.select("datehour", "wiki", "views") \
.where("views > 1000 AND wiki in ('en', 'en.m')") \
df_wiki_en
Kết quả [5]:
Nhóm theo tiêu đề và thứ tự theo lượt xem trang để xem các trang hàng đầu
Đầu vào [6]:
import pyspark.sql.functions as F
df_datehour_totals = df_wiki_en \
.groupBy("datehour") \
.agg(F.sum('views').alias('total_views'))
df_datehour_totals.orderBy('total_views', ascending=False)
Kết quả [6]:
7. Sử dụng các thư viện vẽ đồ thị Python trong sổ tay
Bạn có thể sử dụng nhiều thư viện vẽ đồ thị có sẵn trong Python để vẽ biểu đồ kết quả cho các công việc trong Spark.
Chuyển đổi Spark DataFrame sang Pandas DataFrame
Chuyển đổi Spark DataFrame sang Pandas DataFrame và đặt ngày giờ làm chỉ mục. Điều này rất hữu ích nếu bạn muốn làm việc trực tiếp với dữ liệu trong Python và vẽ đồ thị dữ liệu bằng nhiều thư viện vẽ đồ thị có sẵn trong Python.
Đầu vào [7]:
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
pandas_datehour_totals = df_datehour_totals.toPandas()
pandas_datehour_totals.set_index('datehour', inplace=True)
pandas_datehour_totals.head()
Kết quả [7]:
Vẽ biểu đồ cơ sở dữ liệu cho gấu trúc
Nhập thư viện matplotlib, là thư viện bắt buộc để hiển thị các biểu đồ trong sổ tay
Đầu vào [8]:
import matplotlib.pyplot as plt
Dùng hàm vẽ đồ thịPandas để tạo biểu đồ dạng đường trong Pandas DataFrame.
Đầu vào [9]:
pandas_datehour_totals.plot(kind='line',figsize=(12,6));
Kết quả [9]:
Kiểm tra xem sổ tay có được lưu trong GCS hay không
Bây giờ, bạn sẽ thiết lập và chạy sổ tay Jupyter đầu tiên trên cụm Dataproc. Đặt tên cho sổ tay của bạn và sổ tay đó sẽ tự động được lưu vào bộ chứa GCS mà bạn dùng khi tạo cụm.
Bạn có thể kiểm tra điều này bằng cách sử dụng lệnh NCMEC này trong shell đám mây
BUCKET_NAME=<your-bucket-name>
gsutil ls gs://${BUCKET_NAME}/notebooks/jupyter
Bạn sẽ thấy kết quả sau
gs://bucket-name/notebooks/jupyter/
gs://bucket-name/notebooks/jupyter/BigQuery Storage & Spark DataFrames.ipynb
8. Mẹo tối ưu hoá – Dữ liệu lưu vào bộ nhớ đệm trong bộ nhớ
Có thể có những trường hợp mà bạn muốn dữ liệu trong bộ nhớ thay vì đọc từ BigQuery Storage mỗi lần.
Công việc này sẽ đọc dữ liệu từ BigQuery và đẩy bộ lọc sang BigQuery. Sau đó, tổng hợp sẽ được tính toán trong Apache Spark.
import pyspark.sql.functions as F
table = "bigquery-public-data.wikipedia.pageviews_2020"
df_wiki_pageviews = spark.read \
.format("bigquery") \
.option("table", table) \
.option("filter", "datehour >= '2020-03-01' AND datehour < '2020-03-02'") \
.load()
df_wiki_en = df_wiki_pageviews \
.select("title", "wiki", "views") \
.where("views > 10 AND wiki in ('en', 'en.m')")
df_wiki_en_totals = df_wiki_en \
.groupBy("title") \
.agg(F.sum('views').alias('total_views'))
df_wiki_en_totals.orderBy('total_views', ascending=False)
Bạn có thể sửa đổi công việc ở trên để đưa vào bộ nhớ đệm của bảng và giờ đây, bộ lọc trên cột wiki sẽ được Apache Spark áp dụng trong bộ nhớ.
import pyspark.sql.functions as F
table = "bigquery-public-data.wikipedia.pageviews_2020"
df_wiki_pageviews = spark.read \
.format("bigquery") \
.option("table", table) \
.option("filter", "datehour >= '2020-03-01' AND datehour < '2020-03-02'") \
.load()
df_wiki_all = df_wiki_pageviews \
.select("title", "wiki", "views") \
.where("views > 10")
# cache the data in memory
df_wiki_all.cache()
df_wiki_en = df_wiki_all \
.where("wiki in ('en', 'en.m')")
df_wiki_en_totals = df_wiki_en \
.groupBy("title") \
.agg(F.sum('views').alias('total_views'))
df_wiki_en_totals.orderBy('total_views', ascending=False)
Sau đó, bạn có thể lọc ngôn ngữ khác trên wiki bằng cách sử dụng dữ liệu được lưu trong bộ nhớ đệm thay vì đọc lại dữ liệu từ bộ nhớ BigQuery và do đó sẽ chạy nhanh hơn nhiều.
df_wiki_de = df_wiki_all \
.where("wiki in ('de', 'de.m')")
df_wiki_de_totals = df_wiki_de \
.groupBy("title") \
.agg(F.sum('views').alias('total_views'))
df_wiki_de_totals.orderBy('total_views', ascending=False)
Bạn có thể xoá bộ nhớ đệm bằng cách chạy
df_wiki_all.unpersist()
9. Sổ tay mẫu cho các trường hợp sử dụng khác
Kho lưu trữ GitHub của Cloud Dataproc có các sổ tay Jupyter có các mẫu Apache Spark phổ biến để tải dữ liệu, lưu dữ liệu và lập biểu đồ dữ liệu của bạn bằng nhiều sản phẩm Google Cloud Platform và công cụ nguồn mở:
10. Dọn dẹp
Để tránh phát sinh các khoản phí không cần thiết cho tài khoản GCP của bạn sau khi hoàn tất phần bắt đầu nhanh này, hãy làm như sau:
- Xoá bộ chứa Cloud Storage đối với môi trường và bộ chứa bạn đã tạo
- Xoá môi trường Dataproc.
Nếu đã tạo một dự án chỉ dành cho lớp học lập trình này, bạn cũng có thể xoá dự án đó (không bắt buộc):
- Trong Bảng điều khiển GCP, hãy chuyển đến trang Dự án.
- Trong danh sách dự án, hãy chọn dự án mà bạn muốn xoá rồi nhấp vào Xoá.
- Trong hộp này, hãy nhập mã dự án rồi nhấp vào Tắt để xoá dự án.
Giấy phép
Tác phẩm này được cấp phép theo Giấy phép Creative Commons ghi nhận tác giả 3.0 chung và giấy phép Apache 2.0.