1. 简介
在此 Codelab 中,您将探索 Google Cloud Serverless for Apache Spark 的原生执行引擎 Lightning Engine 的性能优势,并了解它如何优化 Serverless for Apache Spark 上的 Spark 工作负载。
Lightning Engine 使用 Velox 和 Apache Gluten。Velox 是一款用于数据处理的高性能 C++ 引擎。Apache Gluten 是一个中间层,负责将基于 JVM 的 Spark 作业转换为可由 Velox 执行的 C++ 代码。
此演示使用 TPC-DS,这是一种旨在评估决策支持系统性能的行业标准基准。您将提交一个基准 PySpark 作业,以使用标准无服务器层级查询 TPC-DS 示例数据集。然后,您将使用启用 Lightning Engine 的高级层级运行完全相同的作业。最后,您将比较执行时间,并深入了解 Spark 界面,直观呈现硬件加速的 Spark 执行图表的差异。
假设您按照清理部分中的说明及时清理资源,则运行此 Codelab 的估计费用不到 1.00 美元。
您将执行的操作
- 创建一个 Cloud Storage 存储分区,用于存储基准脚本和结果
- 使用 Serverless for Apache Spark 标准层执行基准 PySpark 数据处理作业
- 使用 Serverless for Apache Spark 高级层和 Lightning Engine 执行同一作业
- 比较运行时指标
- 启动 Spark 历史记录服务器界面,比较原生物理执行图
所需条件
- 网络浏览器,例如 Chrome
- 已启用结算功能的 Google Cloud 项目
- 基本熟悉 Apache Spark 和 Linux 命令行
2. 准备工作
创建 Google Cloud 项目
- 在 Google Cloud 控制台的项目选择器页面上,选择或创建一个 Google Cloud 项目。
- 确保您的 Cloud 项目已启用结算功能。了解如何检查项目是否已启用结算功能。
启动 Cloud Shell
Cloud Shell 是在 Google Cloud 中运行的命令行环境,预加载了必要的工具。
- 点击 Google Cloud 控制台顶部的激活 Cloud Shell。
- 连接到 Cloud Shell 后,验证您的身份验证:
gcloud auth list - 确认您的项目已配置:
gcloud config get project - 如果项目未按预期设置,请进行设置:
export PROJECT_ID=<YOUR_PROJECT_ID> gcloud config set project $PROJECT_ID
启用 API
运行以下命令可为此 Codelab 启用所有必需的 API:
gcloud services enable \
dataproc.googleapis.com \
storage.googleapis.com \
compute.googleapis.com
3. 准备环境
在此步骤中,您将初始化环境变量并创建 Cloud Storage 存储分区。此存储分区将用于存放您提交到两个 Serverless for Apache Spark 层级的 PySpark 脚本。
设置环境变量
在 Cloud Shell 中运行以下命令,以设置默认环境变量。我们将使用 us-central1 区域,但您可以根据需要更改此设置。
export PROJECT_ID=$(gcloud config get-value project)
export REGION="us-central1"
export BUCKET_NAME="spark-benchmark-${PROJECT_ID}-${REGION}"
gcloud config set dataproc/region ${REGION}
创建 Cloud Storage 存储桶
创建用于存放脚本和日志的存储分区:
gcloud storage buckets create gs://${BUCKET_NAME} \
--uniform-bucket-level-access \
--location=${REGION}
将 TPC-DS 数据集复制到您自己的存储分区
在此步骤中,您将把 TPC-DS 数据集从公共存储分区复制到您自己的 Cloud Storage 存储分区。这可确保您的 PySpark 作业能够从您的项目中读取本地数据。
设置环境变量以选择数据集大小和类型:
export DATASET_TYPE="partitioned" # Options: partitioned, nonpartitioned
export DATASET_SIZE="1GB" # Options: 1GB, 10GB, 100GB, 1000GB (1000GB not available for partitioned)
export SRC_PATH="gs://beam-tpcds/datasets/parquet/${DATASET_TYPE}/${DATASET_SIZE}"
export DATASET_PATH="gs://${BUCKET_NAME}/tpc-ds-dataset/${DATASET_TYPE}/${DATASET_SIZE}"
将 TPC-DS 数据复制到您自己的存储分区中:
gcloud storage cp -r ${SRC_PATH}/* ${DATASET_PATH}/
创建 PySpark 基准脚本
我们将使用一个 PySpark 脚本,该脚本会注册 Cloud Storage 存储分区中的标准 TPC-DS 表,并执行来自 Apache Spark 公共代码库的 5 个标准查询。该脚本接受数据集的路径作为实参。
在 Cloud Shell 中创建一个名为 benchmark.py 的文件。您可以复制并粘贴以下命令来生成文件:
cat << 'EOF' > benchmark.py
import argparse
import sys
from pyspark.sql import SparkSession
import time
def main():
parser = argparse.ArgumentParser(description='TPC-DS Benchmark')
parser.add_argument('data_path', help='GCS base path for TPC-DS tables')
args = parser.parse_args()
base_path = args.data_path
# Initialize Spark Session
spark = SparkSession.builder \
.appName("TPC-DS Benchmark") \
.getOrCreate()
print(f"Spark Session created. Registering TPC-DS tables from {base_path}...")
# List of all 24 TPC-DS tables
tables = [
"call_center", "catalog_page", "catalog_returns", "catalog_sales",
"customer", "customer_address", "customer_demographics", "date_dim",
"household_demographics", "income_band", "inventory", "item",
"promotion", "reason", "ship_mode", "store", "store_returns",
"store_sales", "time_dim", "warehouse", "web_page", "web_returns",
"web_sales", "web_site"
]
# Register each table as a temporary view
# For this subset of queries, not every table is used
for table in tables:
path = f"{base_path}/{table}"
try:
df = spark.read.parquet(path)
df.createOrReplaceTempView(table)
except Exception as e:
print(f"Warning: Could not load table {table} from {path}. Error: {e}")
print("Tables registered successfully. Starting benchmark queries from Apache Spark test suite...")
# Standard TPC-DS Queries sourced from Apache Spark public repository:
# https://github.com/apache/spark/tree/master/sql/core/src/test/resources/tpcds
queries = {
"Q1": """
WITH customer_total_return AS (
SELECT sr_customer_sk AS ctr_customer_sk,
sr_store_sk AS ctr_store_sk,
sum(sr_return_amt) AS ctr_total_return
FROM store_returns, date_dim
WHERE sr_returned_date_sk = d_date_sk
AND d_year = 2000
GROUP BY sr_customer_sk, sr_store_sk
)
SELECT c_customer_id
FROM customer_total_return ctr1, store, customer
WHERE ctr1.ctr_total_return > (
SELECT avg(ctr_total_return) * 1.2
FROM customer_total_return ctr2
WHERE ctr1.ctr_store_sk = ctr2.ctr_store_sk
)
AND s_store_sk = ctr1.ctr_store_sk
AND s_state = 'TN'
AND ctr1.ctr_customer_sk = c_customer_sk
ORDER BY c_customer_id
LIMIT 100
""",
"Q2": """
WITH wscs AS (
SELECT sold_date_sk, sales_price
FROM (
SELECT ws_sold_date_sk AS sold_date_sk, ws_ext_sales_price AS sales_price
FROM web_sales
UNION ALL
SELECT cs_sold_date_sk AS sold_date_sk, cs_ext_sales_price AS sales_price
FROM catalog_sales
)
),
wswscs AS (
SELECT d_week_seq,
sum(CASE WHEN (d_day_name='Sunday') THEN sales_price ELSE null END) AS sun_sales,
sum(CASE WHEN (d_day_name='Monday') THEN sales_price ELSE null END) AS mon_sales,
sum(CASE WHEN (d_day_name='Tuesday') THEN sales_price ELSE null END) AS tue_sales,
sum(CASE WHEN (d_day_name='Wednesday') THEN sales_price ELSE null END) AS wed_sales,
sum(CASE WHEN (d_day_name='Thursday') THEN sales_price ELSE null END) AS thu_sales,
sum(CASE WHEN (d_day_name='Friday') THEN sales_price ELSE null END) AS fri_sales,
sum(CASE WHEN (d_day_name='Saturday') THEN sales_price ELSE null END) AS sat_sales
FROM wscs, date_dim
WHERE d_date_sk = sold_date_sk
GROUP BY d_week_seq
)
SELECT d_week_seq1,
round(sun_sales1/sun_sales2, 2),
round(mon_sales1/mon_sales2, 2),
round(tue_sales1/tue_sales2, 2),
round(wed_sales1/wed_sales2, 2),
round(thu_sales1/thu_sales2, 2),
round(fri_sales1/fri_sales2, 2),
round(sat_sales1/sat_sales2, 2)
FROM (
SELECT wswscs.d_week_seq AS d_week_seq1,
sun_sales AS sun_sales1, mon_sales AS mon_sales1,
tue_sales AS tue_sales1, wed_sales AS wed_sales1,
thu_sales AS thu_sales1, fri_sales AS fri_sales1,
sat_sales AS sat_sales1
FROM wswscs, date_dim
WHERE date_dim.d_week_seq = wswscs.d_week_seq
AND d_year = 2001
) y,
(
SELECT wswscs.d_week_seq AS d_week_seq2,
sun_sales AS sun_sales2, mon_sales AS mon_sales2,
tue_sales AS tue_sales2, wed_sales AS wed_sales2,
thu_sales AS thu_sales2, fri_sales AS fri_sales2,
sat_sales AS sat_sales2
FROM wswscs, date_dim
WHERE date_dim.d_week_seq = wswscs.d_week_seq
AND d_year = 2001 + 1
) z
WHERE d_week_seq1 = d_week_seq2 - 53
ORDER BY d_week_seq1
""",
"Q3": """
SELECT dt.d_year,
item.i_brand_id AS brand_id,
item.i_brand AS brand,
sum(ss_ext_sales_price) AS sum_agg
FROM date_dim dt,
store_sales,
item
WHERE dt.d_date_sk = store_sales.ss_sold_date_sk
AND store_sales.ss_item_sk = item.i_item_sk
AND item.i_manufact_id = 436
AND dt.d_moy = 12
GROUP BY dt.d_year,
item.i_brand,
item.i_brand_id
ORDER BY dt.d_year,
sum_agg DESC,
brand_id
LIMIT 100
""",
"Q7": """
SELECT i_item_id,
avg(ss_quantity) AS agg1,
avg(ss_list_price) AS agg2,
avg(ss_coupon_amt) AS agg3,
avg(ss_sales_price) AS agg4
FROM store_sales,
customer_demographics,
date_dim,
item,
promotion
WHERE ss_sold_date_sk = d_date_sk
AND ss_item_sk = i_item_sk
AND ss_cdemo_sk = cd_demo_sk
AND ss_promo_sk = p_promo_sk
AND cd_gender = 'M'
AND cd_marital_status = 'S'
AND cd_education_status = 'College'
AND (p_channel_email = 'N' OR p_channel_event = 'N')
AND d_year = 2000
GROUP BY i_item_id
ORDER BY i_item_id
LIMIT 100
""",
"Q19": """
SELECT i_item_id,
i_brand,
i_category,
i_class,
i_manufact,
sum(ss_ext_sales_price) AS sales,
sum(ss_net_profit) AS profit
FROM date_dim,
store_sales,
item,
customer,
store
WHERE d_date_sk = ss_sold_date_sk
AND i_item_sk = ss_item_sk
AND d_year = 2000
AND d_moy = 12
AND c_customer_sk = ss_customer_sk
AND s_store_sk = ss_store_sk
AND i_manager_id = 9
GROUP BY i_item_id,
i_brand,
i_category,
i_class,
i_manufact
ORDER BY i_item_id,
i_brand,
i_category,
i_class,
i_manufact
LIMIT 100
"""
}
total_start_time = time.time()
for query_name, query_sql in queries.items():
print(f"\nExecuting {query_name}...")
query_start = time.time()
# Execute query and force action using show()
result_df = spark.sql(query_sql)
result_df.show(5) # Show top 5 rows
query_end = time.time()
print(f"{query_name} completed in {query_end - query_start:.2f} seconds.")
total_end_time = time.time()
print(f"\nAll benchmark queries completed in {total_end_time - total_start_time:.2f} seconds.")
spark.stop()
if __name__ == "__main__":
main()
EOF
将脚本复制到 Cloud Storage 存储分区,以便 Serverless for Apache Spark 可以访问该脚本:
gcloud storage cp benchmark.py gs://${BUCKET_NAME}/scripts/benchmark.py
4. 运行基准无服务器作业
为了提供不使用 Lightning Engine 的基准比较,请将您之前上传的 PySpark 基准测试作业提交到 Serverless for Apache Spark 标准层。我们将以实参的形式传递您复制的数据集的路径。
运行以下命令以执行批量作业:
gcloud dataproc batches submit pyspark \
gs://${BUCKET_NAME}/scripts/benchmark.py \
--region=${REGION} \
--version=2.3 \
--deps-bucket=gs://${BUCKET_NAME} \
-- ${DATASET_PATH}
监控作业
作业执行期间,您会在 Cloud Shell 终端中看到 PySpark 日志流。Serverless for Apache Spark 正在分配容器、从 Cloud Storage 读取 TPC-DS Parquet 数据集,并执行复杂的 SQL 计划。
脚本完成后,观察控制台输出。您应该会看到每个已执行的标准查询的结果和时间,如下所示:
... Executing Q1... +-------------+ |c_customer_id| +-------------+ ... Q1 completed in 18.52 seconds. ... All benchmark queries completed in 110.94 seconds.
记下完成测试所用的总秒数。这是您的基准运行时长。
5. 使用 Serverless Premium 和 Lightning Engine 运行
接下来,您将在 Serverless for Apache Spark 上运行完全相同的 Spark 作业,但使用高级层并启用 Google 的原生矢量化查询引擎 Lightning Engine。
将基准作业提交到无服务器,并明确启用 Lightning Engine:
gcloud dataproc batches submit pyspark \
gs://${BUCKET_NAME}/scripts/benchmark.py \
--region=${REGION} \
--version=2.3 \
--deps-bucket=gs://${BUCKET_NAME} \
--properties="dataproc.tier=premium,spark.dataproc.lightningEngine.runtime=native" \
-- ${DATASET_PATH}
比较结果
等待作业完成并检查输出。您应该会看到相同的查询结果。仔细查看完成时间:
... All benchmark queries completed in 64.24 seconds.
将基准无服务器运行与无服务器 Lightning Engine 运行进行比较,您会发现 Lightning Engine 通过利用原生 C++ 执行层和后端矢量化处理,更快地执行分组、聚合和联接,而无需对 PySpark 应用代码进行任何更改。
Lightning Engine 经过优化,可随着工作负载的增加而提升性能。在此示例中,我们使用的是小型数据集,因此性能提升幅度不如使用大型数据集时那么显著。在 10 TB 的数据集上,基准测试表明,与开源 Spark 相比,性能最多可提升 4.3 倍。
6. 在 Spark 界面中比较执行图
运行时缩短效果令人印象深刻,但我们不妨深入了解 Spark 在查询执行期间实际执行的操作。您可以通过检查这两个作业的 Spark 界面执行图来实现此目的。
- 在浏览器中打开 Google Cloud 控制台。
- 前往 Dataproc > 批处理。
- 您会在列表中看到两个批次:标准基准测试和高级层级测试。
- 点击您运行的 Premium 级批处理,然后依次点击查看 Spark 界面和查看详情。
- 在 Spark 界面中,前往作业标签页。
- 在已完成的作业下方的搜索框中,输入
Velox。 - 您会看到许多包含
VeloxSparkPlanExecApi的职位说明。指 Lightning Engine 使用的 Velox 原生执行引擎。
现在,针对标准层级运行重复此流程:
- 返回到“Serverless for Apache Spark 批次”页面。
- 点击标准层级批次的链接,然后依次点击查看 Spark 界面和查看详情。
- 在 Spark 界面中,前往作业标签页。
- 在已完成的作业下方的搜索框中,输入
Velox。 - 您不会在作业说明中看到任何有关 Velox API 的内容。
7. 清理
为避免系统向您的 Google Cloud 账号持续收取费用,请删除本 Codelab 中创建的资源。
在 Cloud Shell 中,删除 Cloud Storage 存储分区及其内容:
gcloud storage rm -r gs://${BUCKET_NAME}
删除 benchmark.py 的本地副本:
rm benchmark.py
8. 恭喜
恭喜!您已成功为 Apache Spark 构建基准比较环境,并将 Serverless for Apache Spark Standard 与 Serverless for Apache Spark Premium 进行了比较。
您亲眼看到了启用 Serverless for Apache Spark 的全新 Lightning Engine 如何缩短 Spark 工作负载的运行时长,还探索了 Spark 界面,了解了如何使用 Native Query Engine 将物理执行图转换为原生 C++ 代码。
您学到的内容
- 如何编写 PySpark 数据集基准测试脚本。
- 如何将 Spark 作业提交到 Serverless for Apache Spark。
- 如何启用 Lightning Engine。
- 如何在 Spark 界面中比较作业计划。