使用 Managed Service for Apache Spark 和 Lightning Engine 加速 Spark

1. 简介

在本 Codelab 中,您将探索 Managed Service for Apache Spark 的原生执行引擎 Lightning Engine 的性能优势,并了解它如何将 Managed Apache Spark 无服务器 上的 Spark 工作负载优化到高达 4.9 倍的速度。

Lightning Engine 使用 VeloxApache Gluten。Velox 是一款用于数据处理的高性能 C++ 引擎。Apache Gluten 是一个中间层,负责将基于 JVM 的 Spark 作业转换为可由 Velox 执行的 C++ 代码。

此演示使用 TPC-DS,这是一个旨在评估决策支持系统性能的行业标准基准。您将提交基准 PySpark 作业,以使用标准无服务器层级查询示例 TPC-DS 数据集。然后,您将使用启用了 Lightning Engine 的高级层级运行完全相同的作业。最后,您将比较执行时间,并深入了解 Spark 界面,以直观呈现硬件加速的 Spark 执行图之间的差异。

假设您按照清理 部分中的说明及时清理资源,运行此 Codelab 的预计费用将低于 1.00 美元

您将执行的操作

  • 创建 Cloud Storage 存储分区以存储基准脚本和结果
  • 使用 Managed Apache Spark 无服务器标准层级 执行基准 PySpark 数据处理作业
  • 使用 Managed Apache Spark 无服务器高级层级 和 Lightning Engine 执行同一作业
  • 比较运行时指标
  • 启动 Spark 历史记录服务器界面以比较原生物理执行图

所需条件

2. 准备工作

创建 Google Cloud 项目

  1. Google Cloud 控制台 的项目选择器页面上,选择或创建一个 Google Cloud 项目
  2. 确保您的云项目已启用结算功能。了解如何检查项目是否已启用结算功能

启动 Cloud Shell

Cloud Shell 是在 Google Cloud 中运行的一个命令行环境,其中预加载了必要的工具。

  1. 点击 Google Cloud 控制台顶部的激活 Cloud Shell
  2. 连接到 Cloud Shell 后,验证您的身份验证:
    gcloud auth list
    
  3. 确认您的项目已配置:
    gcloud config get project
    
  4. 如果您的项目未按预期设置,请进行设置:
    export PROJECT_ID=<YOUR_PROJECT_ID>
    gcloud config set project $PROJECT_ID
    

启用 API

运行以下命令可启用此 Codelab 所需的所有 API:

gcloud services enable \
    dataproc.googleapis.com \
    storage.googleapis.com \
    compute.googleapis.com

3. 准备环境

在此步骤中,您将初始化环境变量并创建 Cloud Storage 存储分区。此存储分区将保存您提交给 Serverless for Apache Spark 的两个层级的 PySpark 脚本。

设置环境变量

在 Cloud Shell 中运行以下命令,以设置默认环境变量。我们将使用 us-central1 区域,但您可以根据需要更改此区域。

export PROJECT_ID=$(gcloud config get-value project)
export REGION="us-central1"
export BUCKET_NAME="spark-benchmark-${PROJECT_ID}-${REGION}"

gcloud config set dataproc/region ${REGION}

创建 Cloud Storage 存储分区

创建存储分区以保存脚本和日志:

gcloud storage buckets create gs://${BUCKET_NAME} \
    --uniform-bucket-level-access \
    --location=${REGION}

将 TPC-DS 数据集复制到您自己的存储分区

在此步骤中,您将 TPC-DS 数据集从公共存储分区复制到您自己的 Cloud Storage 存储分区。这可确保您的 PySpark 作业可以从您的项目本地读取数据。

设置环境变量以选择数据集大小和类型:

export DATASET_TYPE="partitioned" # Options: partitioned, nonpartitioned
export DATASET_SIZE="1GB"         # Options: 1GB, 10GB, 100GB, 1000GB (1000GB not available for partitioned)

export SRC_PATH="gs://beam-tpcds/datasets/parquet/${DATASET_TYPE}/${DATASET_SIZE}"
export DATASET_PATH="gs://${BUCKET_NAME}/tpc-ds-dataset/${DATASET_TYPE}/${DATASET_SIZE}"

将 TPC-DS 数据复制到您自己的存储分区:

gcloud storage cp -r ${SRC_PATH}/* ${DATASET_PATH}/

创建 PySpark 基准脚本

我们将使用一个 PySpark 脚本,该脚本会注册 Cloud Storage 存储分区中的标准 TPC-DS 表,并执行来自 Apache Spark 公共代码库的 5 个标准查询。该脚本接受数据集的路径作为实参。

在 Cloud Shell 中创建一个名为 benchmark.py 的文件。您可以复制并粘贴以下命令来生成该文件:

cat << 'EOF' > benchmark.py
import argparse
import sys
from pyspark.sql import SparkSession
import time

def main():
    parser = argparse.ArgumentParser(description='TPC-DS Benchmark')
    parser.add_argument('data_path', help='GCS base path for TPC-DS tables')
    args = parser.parse_args()

    base_path = args.data_path

    # Initialize Spark Session
    spark = SparkSession.builder \
        .appName("TPC-DS Benchmark") \
        .getOrCreate()

    print(f"Spark Session created. Registering TPC-DS tables from {base_path}...")

    # List of all 24 TPC-DS tables
    tables = [
        "call_center", "catalog_page", "catalog_returns", "catalog_sales",
        "customer", "customer_address", "customer_demographics", "date_dim",
        "household_demographics", "income_band", "inventory", "item",
        "promotion", "reason", "ship_mode", "store", "store_returns",
        "store_sales", "time_dim", "warehouse", "web_page", "web_returns",
        "web_sales", "web_site"
    ]

    # Register each table as a temporary view
    # For this subset of queries, not every table is used
    for table in tables:
        path = f"{base_path}/{table}"
        try:
            df = spark.read.parquet(path)
            df.createOrReplaceTempView(table)
        except Exception as e:
            print(f"Warning: Could not load table {table} from {path}. Error: {e}")

    print("Tables registered successfully. Starting benchmark queries from Apache Spark test suite...")

    # Standard TPC-DS Queries sourced from Apache Spark public repository:
    # https://github.com/apache/spark/tree/master/sql/core/src/test/resources/tpcds
    queries = {
        "Q1": """
            WITH customer_total_return AS (
              SELECT sr_customer_sk AS ctr_customer_sk,
                     sr_store_sk AS ctr_store_sk,
                     sum(sr_return_amt) AS ctr_total_return
              FROM store_returns, date_dim
              WHERE sr_returned_date_sk = d_date_sk
                AND d_year = 2000
              GROUP BY sr_customer_sk, sr_store_sk
            )
            SELECT c_customer_id
            FROM customer_total_return ctr1, store, customer
            WHERE ctr1.ctr_total_return > (
              SELECT avg(ctr_total_return) * 1.2
              FROM customer_total_return ctr2
              WHERE ctr1.ctr_store_sk = ctr2.ctr_store_sk
            )
              AND s_store_sk = ctr1.ctr_store_sk
              AND s_state = 'TN'
              AND ctr1.ctr_customer_sk = c_customer_sk
            ORDER BY c_customer_id
            LIMIT 100
        """,
        "Q2": """
            WITH wscs AS (
              SELECT sold_date_sk, sales_price
              FROM (
                SELECT ws_sold_date_sk AS sold_date_sk, ws_ext_sales_price AS sales_price
                FROM web_sales
                UNION ALL
                SELECT cs_sold_date_sk AS sold_date_sk, cs_ext_sales_price AS sales_price
                FROM catalog_sales
              )
            ),
            wswscs AS (
              SELECT d_week_seq,
                     sum(CASE WHEN (d_day_name='Sunday') THEN sales_price ELSE null END) AS sun_sales,
                     sum(CASE WHEN (d_day_name='Monday') THEN sales_price ELSE null END) AS mon_sales,
                     sum(CASE WHEN (d_day_name='Tuesday') THEN sales_price ELSE null END) AS tue_sales,
                     sum(CASE WHEN (d_day_name='Wednesday') THEN sales_price ELSE null END) AS wed_sales,
                     sum(CASE WHEN (d_day_name='Thursday') THEN sales_price ELSE null END) AS thu_sales,
                     sum(CASE WHEN (d_day_name='Friday') THEN sales_price ELSE null END) AS fri_sales,
                     sum(CASE WHEN (d_day_name='Saturday') THEN sales_price ELSE null END) AS sat_sales
              FROM wscs, date_dim
              WHERE d_date_sk = sold_date_sk
              GROUP BY d_week_seq
            )
            SELECT d_week_seq1,
                   round(sun_sales1/sun_sales2, 2),
                   round(mon_sales1/mon_sales2, 2),
                   round(tue_sales1/tue_sales2, 2),
                   round(wed_sales1/wed_sales2, 2),
                   round(thu_sales1/thu_sales2, 2),
                   round(fri_sales1/fri_sales2, 2),
                   round(sat_sales1/sat_sales2, 2)
            FROM (
              SELECT wswscs.d_week_seq AS d_week_seq1,
                     sun_sales AS sun_sales1, mon_sales AS mon_sales1,
                     tue_sales AS tue_sales1, wed_sales AS wed_sales1,
                     thu_sales AS thu_sales1, fri_sales AS fri_sales1,
                     sat_sales AS sat_sales1
              FROM wswscs, date_dim
              WHERE date_dim.d_week_seq = wswscs.d_week_seq
                AND d_year = 2001
            ) y,
            (
              SELECT wswscs.d_week_seq AS d_week_seq2,
                     sun_sales AS sun_sales2, mon_sales AS mon_sales2,
                     tue_sales AS tue_sales2, wed_sales AS wed_sales2,
                     thu_sales AS thu_sales2, fri_sales AS fri_sales2,
                     sat_sales AS sat_sales2
              FROM wswscs, date_dim
              WHERE date_dim.d_week_seq = wswscs.d_week_seq
                AND d_year = 2001 + 1
            ) z
            WHERE d_week_seq1 = d_week_seq2 - 53
            ORDER BY d_week_seq1
        """,
        "Q3": """
            SELECT dt.d_year,
                   item.i_brand_id AS brand_id,
                   item.i_brand AS brand,
                   sum(ss_ext_sales_price) AS sum_agg
            FROM date_dim dt,
                 store_sales,
                 item
            WHERE dt.d_date_sk = store_sales.ss_sold_date_sk
              AND store_sales.ss_item_sk = item.i_item_sk
              AND item.i_manufact_id = 436
              AND dt.d_moy = 12
            GROUP BY dt.d_year,
                     item.i_brand,
                     item.i_brand_id
            ORDER BY dt.d_year,
                     sum_agg DESC,
                     brand_id
            LIMIT 100
        """,
        "Q7": """
            SELECT i_item_id,
                   avg(ss_quantity) AS agg1,
                   avg(ss_list_price) AS agg2,
                   avg(ss_coupon_amt) AS agg3,
                   avg(ss_sales_price) AS agg4
            FROM store_sales,
                 customer_demographics,
                 date_dim,
                 item,
                 promotion
            WHERE ss_sold_date_sk = d_date_sk
              AND ss_item_sk = i_item_sk
              AND ss_cdemo_sk = cd_demo_sk
              AND ss_promo_sk = p_promo_sk
              AND cd_gender = 'M'
              AND cd_marital_status = 'S'
              AND cd_education_status = 'College'
              AND (p_channel_email = 'N' OR p_channel_event = 'N')
              AND d_year = 2000
            GROUP BY i_item_id
            ORDER BY i_item_id
            LIMIT 100
        """,
        "Q19": """
            SELECT i_item_id,
                   i_brand,
                   i_category,
                   i_class,
                   i_manufact,
                   sum(ss_ext_sales_price) AS sales,
                   sum(ss_net_profit) AS profit
            FROM date_dim,
                 store_sales,
                 item,
                 customer,
                 store
            WHERE d_date_sk = ss_sold_date_sk
              AND i_item_sk = ss_item_sk
              AND d_year = 2000
              AND d_moy = 12
              AND c_customer_sk = ss_customer_sk
              AND s_store_sk = ss_store_sk
              AND i_manager_id = 9
            GROUP BY i_item_id,
                     i_brand,
                     i_category,
                     i_class,
                     i_manufact
            ORDER BY i_item_id,
                     i_brand,
                     i_category,
                     i_class,
                     i_manufact
            LIMIT 100
        """
    }

    total_start_time = time.time()

    for query_name, query_sql in queries.items():
        print(f"\nExecuting {query_name}...")
        query_start = time.time()

        # Execute query and force action using show()
        result_df = spark.sql(query_sql)
        result_df.show(5) # Show top 5 rows

        query_end = time.time()
        print(f"{query_name} completed in {query_end - query_start:.2f} seconds.")

    total_end_time = time.time()
    print(f"\nAll benchmark queries completed in {total_end_time - total_start_time:.2f} seconds.")

    spark.stop()

if __name__ == "__main__":
    main()
EOF

将脚本复制到 Cloud Storage 存储分区,以便 Serverless for Apache Spark 可以访问该脚本:

gcloud storage cp benchmark.py gs://${BUCKET_NAME}/scripts/benchmark.py

4. 运行基准无服务器作业

如需提供不使用 Lightning Engine 的基准比较,请将您之前上传的 PySpark 基准作业提交到 Serverless for Apache Spark 标准层级。我们将传递您复制的数据集的路径作为实参。

运行以下命令以执行批量作业:

gcloud dataproc batches submit pyspark \
    gs://${BUCKET_NAME}/scripts/benchmark.py \
    --region=${REGION} \
    --version=2.3 \
    --deps-bucket=gs://${BUCKET_NAME} \
    -- ${DATASET_PATH}

监控作业

作业执行期间,您会在 Cloud Shell 终端中看到 PySpark 日志流。Serverless for Apache Spark 正在分配容器、从 Cloud Storage 读取 TPC-DS Parquet 数据集,并执行复杂的 SQL 计划。

脚本完成后,观察控制台输出。您应该会看到每个执行的标准查询的结果和时间,类似于:

...
Executing Q1...
+-------------+
|c_customer_id|
+-------------+
...

Q1 completed in 18.52 seconds.
...

All benchmark queries completed in 110.94 seconds.

记下完成所需总秒数。这是您的基准运行时

5. 使用 Serverless 高级版和 Lightning Engine 运行

接下来,您将在 Managed Apache Spark 无服务器 上运行完全相同的 Spark 作业,但使用 高级层级 并启用 Google 的原生矢量化查询引擎:Lightning Engine

提交基准作业到 Serverless,并明确启用 Lightning Engine:

gcloud dataproc batches submit pyspark \
    gs://${BUCKET_NAME}/scripts/benchmark.py \
    --region=${REGION} \
    --version=2.3 \
    --deps-bucket=gs://${BUCKET_NAME} \
    --properties="dataproc.tier=premium,spark.dataproc.lightningEngine.runtime=native" \
    -- ${DATASET_PATH}

比较结果

等待作业完成并检查输出。您应该会看到相同的查询结果。仔细查看完成时间:

...
All benchmark queries completed in 64.24 seconds.

将基准作业与 Lightning Engine 作业进行比较,您会注意到 Lightning Engine 通过利用原生 C++ 执行层和后端矢量化处理来更快地执行分组、聚合和联接,而无需对 PySpark 应用代码进行任何更改。

Lightning Engine 经过优化,工作负载越大,性能提升越高。在此示例中,我们使用的是小型数据集,因此性能提升不如预期那么显著。在 10TB 数据集上,基准测试显示,与开源 Spark 相比,性能提升高达 4.9 倍

6. 在 Spark 界面中比较执行图

运行时缩短令人印象深刻,但让我们深入了解一下 Spark 在查询执行期间实际执行的操作。 您可以通过检查这两个作业的 Spark 界面执行图来完成此操作。

  1. 在浏览器中打开 Google Cloud 控制台
  2. 依次前往 Managed Apache Spark > 批量
  3. 您会在列表中看到两个批次:标准基准运行和高级层级运行。
  4. 点击您运行的高级层级批次,然后依次点击查看 Spark 界面查看详情
  5. 在 Spark 界面中,前往作业 标签页。
  6. 已完成的作业 下方的搜索框中,输入 Velox
  7. 您会看到许多作业说明,其中包含 VeloxSparkPlanExecApi。这指的是 Lightning Engine 使用的 Velox 原生执行引擎。

现在,对标准层级运行重复此过程:

  1. 返回到 Serverless for Apache Spark 批量页面。
  2. 点击标准层级 批次的链接,然后依次点击查看 Spark 界面查看详情
  3. 在 Spark 界面中,前往作业 标签页。
  4. 已完成的作业 下方的搜索框中,输入 Velox
  5. 您会在作业说明中看到没有提及 Velox API。

7. 清理

为避免系统持续向您的 Google Cloud 账号收取费用,请删除在此 Codelab 期间创建的资源。

在 Cloud Shell 中删除 Cloud Storage 存储分区及其内容:

gcloud storage rm -r gs://${BUCKET_NAME}

删除 benchmark.py 的本地副本:

rm benchmark.py

8. 恭喜

恭喜!您已成功为 Apache Spark 构建基准测试环境,并比较了 Managed Apache Spark 无服务器标准版和 Managed Apache Spark 无服务器高级版。

您亲眼目睹了启用 Managed Apache Spark 无服务器的新 Lightning Engine 如何缩短 Spark 工作负载的运行时,并探索了 Spark 界面,了解如何使用原生查询引擎将物理执行图转换为原生 C++ 代码。

您学到的内容

  • 如何编写 PySpark 数据集基准脚本。
  • 如何将 Spark 作业提交到 Managed Apache Spark 无服务器。
  • 如何启用 Lightning Engine。
  • 如何在 Spark 界面中比较作业计划。

后续步骤