Accelerate Spark with Serverless for Apache Spark and Lightning Engine

1. Introduction

In this codelab, you will explore the performance benefits of Lightning Engine, the native execution engine in Google Cloud Serverless for Apache Spark, and examine how it accelerates your Spark workloads.

Lightning Engine builds on Velox and Apache Gluten. Velox is a high-performance C++ engine for data processing. Apache Gluten is a middle layer that offloads the execution of JVM-based Spark jobs to native engines such as Velox.

This demo uses TPC-DS, an industry-standard benchmark designed to evaluate the performance of decision support systems. You will submit a baseline PySpark job that queries a sample TPC-DS dataset using the Standard Serverless tier. Then, you will run the exact same job using the Premium tier with Lightning Engine enabled. Finally, you will compare the execution times and use the Spark UI to see where the native engine was used during execution.

The estimated cost to run this codelab is less than $1.00 USD, assuming resources are cleaned up promptly as described in the Clean up section.

What you'll do

  • Create a Cloud Storage bucket to store your benchmark script and dataset
  • Execute a baseline PySpark data processing job using the Serverless for Apache Spark Standard tier
  • Execute the same job using the Serverless for Apache Spark Premium tier with Lightning Engine
  • Compare the runtime metrics
  • Launch the Spark History Server UI to compare the native physical execution graphs

2. Before you begin

Create a Google Cloud Project

  1. In the Google Cloud Console, on the project selector page, select or create a Google Cloud project.
  2. Make sure that billing is enabled for your Cloud project. Learn how to check if billing is enabled on a project.

Start Cloud Shell

Cloud Shell is a command-line environment running in Google Cloud that comes preloaded with necessary tools.

  1. Click Activate Cloud Shell at the top of the Google Cloud console.
  2. Once connected to Cloud Shell, verify your authentication:
    gcloud auth list
    
  3. Confirm your project is configured:
    gcloud config get project
    
  4. If your project is not set as expected, set it:
    export PROJECT_ID=<YOUR_PROJECT_ID>
    gcloud config set project $PROJECT_ID
    

Enable APIs

Run this command to enable all the required APIs for this codelab:

gcloud services enable \
    dataproc.googleapis.com \
    storage.googleapis.com \
    compute.googleapis.com

3. Prepare Your Environment

In this step, you will initialize environment variables and create a Cloud Storage bucket. This bucket will hold the PySpark script you submit to both Serverless for Apache Spark tiers.

Set Environment Variables

Run the following commands in Cloud Shell to set default environment variables. We will use the us-central1 region, but you can change this if you prefer.

export PROJECT_ID=$(gcloud config get-value project)
export REGION="us-central1"
export BUCKET_NAME="spark-benchmark-${PROJECT_ID}-${REGION}"

gcloud config set dataproc/region ${REGION}
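
Because the bucket name is assembled from your project ID and region, a long project ID combined with a long region name can exceed the 63-character limit for bucket names. The following is a minimal sketch for checking a candidate name before you create the bucket. The helper names are hypothetical, and the pattern is a simplified summary of the Cloud Storage naming rules for names without dots, not a complete validator.

```python
import re

# Assumption: a simplified summary of the Cloud Storage naming rules for
# names without dots (3-63 characters; lowercase letters, digits, dashes,
# and underscores; starting and ending with a letter or digit).
_BUCKET_NAME = re.compile(r"^[a-z0-9][a-z0-9_-]{1,61}[a-z0-9]$")


def benchmark_bucket_name(project_id: str, region: str) -> str:
    """Mirror BUCKET_NAME="spark-benchmark-${PROJECT_ID}-${REGION}"."""
    return f"spark-benchmark-{project_id}-{region}"


def is_valid_bucket_name(name: str) -> bool:
    """Return True if the name fits the simplified rules above."""
    return _BUCKET_NAME.fullmatch(name) is not None
```

If the check fails for your project, choose a shorter BUCKET_NAME by hand before continuing.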

Create a Cloud Storage Bucket

Create the bucket to hold your benchmark script, the TPC-DS dataset, and staged job dependencies:

gcloud storage buckets create gs://${BUCKET_NAME} \
    --uniform-bucket-level-access \
    --location=${REGION}

Copy the TPC-DS Dataset to Your Own Bucket

In this step, you will copy the TPC-DS dataset from a public bucket to your own Cloud Storage bucket. This ensures your PySpark jobs read the data from a bucket in your own project.

Set environment variables to choose the dataset size and type:

export DATASET_TYPE="partitioned" # Options: partitioned, nonpartitioned
export DATASET_SIZE="1GB"         # Options: 1GB, 10GB, 100GB, 1000GB (1000GB not available for partitioned)

export SRC_PATH="gs://beam-tpcds/datasets/parquet/${DATASET_TYPE}/${DATASET_SIZE}"
export DATASET_PATH="gs://${BUCKET_NAME}/tpc-ds-dataset/${DATASET_TYPE}/${DATASET_SIZE}"

Copy the TPC-DS data into your own bucket:

gcloud storage cp -r "${SRC_PATH}/*" "${DATASET_PATH}/"
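
The path variables above follow a fixed pattern, and one combination (partitioned with 1000GB) is not available. As a sketch of the same logic in Python, the hypothetical helper below builds both paths and rejects unsupported combinations. The option lists are taken from the comments on the export lines.

```python
# Options listed in the comments on the export lines.
DATASET_TYPES = ("partitioned", "nonpartitioned")
DATASET_SIZES = ("1GB", "10GB", "100GB", "1000GB")
SRC_ROOT = "gs://beam-tpcds/datasets/parquet"


def dataset_paths(bucket_name, dataset_type="partitioned", dataset_size="1GB"):
    """Return (src_path, dataset_path), built like SRC_PATH and DATASET_PATH."""
    if dataset_type not in DATASET_TYPES:
        raise ValueError(f"Unknown dataset type: {dataset_type}")
    if dataset_size not in DATASET_SIZES:
        raise ValueError(f"Unknown dataset size: {dataset_size}")
    if dataset_type == "partitioned" and dataset_size == "1000GB":
        raise ValueError("1000GB is not available for the partitioned dataset")

    suffix = f"{dataset_type}/{dataset_size}"
    src_path = f"{SRC_ROOT}/{suffix}"
    dataset_path = f"gs://{bucket_name}/tpc-ds-dataset/{suffix}"
    return src_path, dataset_path
```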

Create the PySpark Benchmark Script

We will use a PySpark script that registers the standard TPC-DS tables from your Cloud Storage bucket and executes 5 standard queries sourced from the Apache Spark public repository. The script accepts the path to your dataset as an argument.

Create a file named benchmark.py in Cloud Shell. You can copy and paste the following command to generate the file:

cat << 'EOF' > benchmark.py
import argparse
import time

from pyspark.sql import SparkSession

def main():
    parser = argparse.ArgumentParser(description='TPC-DS Benchmark')
    parser.add_argument('data_path', help='GCS base path for TPC-DS tables')
    args = parser.parse_args()

    base_path = args.data_path

    # Initialize Spark Session
    spark = SparkSession.builder \
        .appName("TPC-DS Benchmark") \
        .getOrCreate()

    print(f"Spark Session created. Registering TPC-DS tables from {base_path}...")

    # List of all 24 TPC-DS tables
    tables = [
        "call_center", "catalog_page", "catalog_returns", "catalog_sales",
        "customer", "customer_address", "customer_demographics", "date_dim",
        "household_demographics", "income_band", "inventory", "item",
        "promotion", "reason", "ship_mode", "store", "store_returns",
        "store_sales", "time_dim", "warehouse", "web_page", "web_returns",
        "web_sales", "web_site"
    ]

    # Register each table as a temporary view.
    # Not every table is used by this subset of queries.
    failed_tables = []
    for table in tables:
        path = f"{base_path}/{table}"
        try:
            df = spark.read.parquet(path)
            df.createOrReplaceTempView(table)
        except Exception as e:
            failed_tables.append(table)
            print(f"Warning: Could not load table {table} from {path}. Error: {e}")

    print(f"Registered {len(tables) - len(failed_tables)} of {len(tables)} tables. "
          "Starting benchmark queries from Apache Spark test suite...")

    # Standard TPC-DS Queries sourced from Apache Spark public repository:
    # https://github.com/apache/spark/tree/master/sql/core/src/test/resources/tpcds
    queries = {
        "Q1": """
            WITH customer_total_return AS (
              SELECT sr_customer_sk AS ctr_customer_sk,
                     sr_store_sk AS ctr_store_sk,
                     sum(sr_return_amt) AS ctr_total_return
              FROM store_returns, date_dim
              WHERE sr_returned_date_sk = d_date_sk
                AND d_year = 2000
              GROUP BY sr_customer_sk, sr_store_sk
            )
            SELECT c_customer_id
            FROM customer_total_return ctr1, store, customer
            WHERE ctr1.ctr_total_return > (
              SELECT avg(ctr_total_return) * 1.2
              FROM customer_total_return ctr2
              WHERE ctr1.ctr_store_sk = ctr2.ctr_store_sk
            )
              AND s_store_sk = ctr1.ctr_store_sk
              AND s_state = 'TN'
              AND ctr1.ctr_customer_sk = c_customer_sk
            ORDER BY c_customer_id
            LIMIT 100
        """,
        "Q2": """
            WITH wscs AS (
              SELECT sold_date_sk, sales_price
              FROM (
                SELECT ws_sold_date_sk AS sold_date_sk, ws_ext_sales_price AS sales_price
                FROM web_sales
                UNION ALL
                SELECT cs_sold_date_sk AS sold_date_sk, cs_ext_sales_price AS sales_price
                FROM catalog_sales
              )
            ),
            wswscs AS (
              SELECT d_week_seq,
                     sum(CASE WHEN (d_day_name='Sunday') THEN sales_price ELSE null END) AS sun_sales,
                     sum(CASE WHEN (d_day_name='Monday') THEN sales_price ELSE null END) AS mon_sales,
                     sum(CASE WHEN (d_day_name='Tuesday') THEN sales_price ELSE null END) AS tue_sales,
                     sum(CASE WHEN (d_day_name='Wednesday') THEN sales_price ELSE null END) AS wed_sales,
                     sum(CASE WHEN (d_day_name='Thursday') THEN sales_price ELSE null END) AS thu_sales,
                     sum(CASE WHEN (d_day_name='Friday') THEN sales_price ELSE null END) AS fri_sales,
                     sum(CASE WHEN (d_day_name='Saturday') THEN sales_price ELSE null END) AS sat_sales
              FROM wscs, date_dim
              WHERE d_date_sk = sold_date_sk
              GROUP BY d_week_seq
            )
            SELECT d_week_seq1,
                   round(sun_sales1/sun_sales2, 2),
                   round(mon_sales1/mon_sales2, 2),
                   round(tue_sales1/tue_sales2, 2),
                   round(wed_sales1/wed_sales2, 2),
                   round(thu_sales1/thu_sales2, 2),
                   round(fri_sales1/fri_sales2, 2),
                   round(sat_sales1/sat_sales2, 2)
            FROM (
              SELECT wswscs.d_week_seq AS d_week_seq1,
                     sun_sales AS sun_sales1, mon_sales AS mon_sales1,
                     tue_sales AS tue_sales1, wed_sales AS wed_sales1,
                     thu_sales AS thu_sales1, fri_sales AS fri_sales1,
                     sat_sales AS sat_sales1
              FROM wswscs, date_dim
              WHERE date_dim.d_week_seq = wswscs.d_week_seq
                AND d_year = 2001
            ) y,
            (
              SELECT wswscs.d_week_seq AS d_week_seq2,
                     sun_sales AS sun_sales2, mon_sales AS mon_sales2,
                     tue_sales AS tue_sales2, wed_sales AS wed_sales2,
                     thu_sales AS thu_sales2, fri_sales AS fri_sales2,
                     sat_sales AS sat_sales2
              FROM wswscs, date_dim
              WHERE date_dim.d_week_seq = wswscs.d_week_seq
                AND d_year = 2001 + 1
            ) z
            WHERE d_week_seq1 = d_week_seq2 - 53
            ORDER BY d_week_seq1
        """,
        "Q3": """
            SELECT dt.d_year,
                   item.i_brand_id AS brand_id,
                   item.i_brand AS brand,
                   sum(ss_ext_sales_price) AS sum_agg
            FROM date_dim dt,
                 store_sales,
                 item
            WHERE dt.d_date_sk = store_sales.ss_sold_date_sk
              AND store_sales.ss_item_sk = item.i_item_sk
              AND item.i_manufact_id = 436
              AND dt.d_moy = 12
            GROUP BY dt.d_year,
                     item.i_brand,
                     item.i_brand_id
            ORDER BY dt.d_year,
                     sum_agg DESC,
                     brand_id
            LIMIT 100
        """,
        "Q7": """
            SELECT i_item_id,
                   avg(ss_quantity) AS agg1,
                   avg(ss_list_price) AS agg2,
                   avg(ss_coupon_amt) AS agg3,
                   avg(ss_sales_price) AS agg4
            FROM store_sales,
                 customer_demographics,
                 date_dim,
                 item,
                 promotion
            WHERE ss_sold_date_sk = d_date_sk
              AND ss_item_sk = i_item_sk
              AND ss_cdemo_sk = cd_demo_sk
              AND ss_promo_sk = p_promo_sk
              AND cd_gender = 'M'
              AND cd_marital_status = 'S'
              AND cd_education_status = 'College'
              AND (p_channel_email = 'N' OR p_channel_event = 'N')
              AND d_year = 2000
            GROUP BY i_item_id
            ORDER BY i_item_id
            LIMIT 100
        """,
        "Q19": """
            SELECT i_item_id,
                   i_brand,
                   i_category,
                   i_class,
                   i_manufact,
                   sum(ss_ext_sales_price) AS sales,
                   sum(ss_net_profit) AS profit
            FROM date_dim,
                 store_sales,
                 item,
                 customer,
                 store
            WHERE d_date_sk = ss_sold_date_sk
              AND i_item_sk = ss_item_sk
              AND d_year = 2000
              AND d_moy = 12
              AND c_customer_sk = ss_customer_sk
              AND s_store_sk = ss_store_sk
              AND i_manager_id = 9
            GROUP BY i_item_id,
                     i_brand,
                     i_category,
                     i_class,
                     i_manufact
            ORDER BY i_item_id,
                     i_brand,
                     i_category,
                     i_class,
                     i_manufact
            LIMIT 100
        """
    }

    total_start_time = time.time()

    for query_name, query_sql in queries.items():
        print(f"\nExecuting {query_name}...")
        query_start = time.time()

        # Execute query and force action using show()
        result_df = spark.sql(query_sql)
        result_df.show(5) # Show top 5 rows

        query_end = time.time()
        print(f"{query_name} completed in {query_end - query_start:.2f} seconds.")

    total_end_time = time.time()
    print(f"\nAll benchmark queries completed in {total_end_time - total_start_time:.2f} seconds.")

    spark.stop()

if __name__ == "__main__":
    main()
EOF

Copy the script up to your Cloud Storage bucket so Serverless for Apache Spark can access it:

gcloud storage cp benchmark.py gs://${BUCKET_NAME}/scripts/benchmark.py

4. Run the Baseline Serverless Job

To provide a baseline comparison without Lightning Engine, submit the PySpark benchmarking job you uploaded earlier to the Serverless for Apache Spark Standard tier. We will pass the path to the dataset you copied as an argument.

Run the following command to execute the batch job:

gcloud dataproc batches submit pyspark \
    gs://${BUCKET_NAME}/scripts/benchmark.py \
    --region=${REGION} \
    --version=2.3 \
    --deps-bucket=gs://${BUCKET_NAME} \
    -- ${DATASET_PATH}

Monitor the Job

While the job is executing, you will see PySpark logs streaming in your Cloud Shell terminal. Serverless for Apache Spark is allocating containers, reading the TPC-DS Parquet dataset from Cloud Storage, and executing the complex SQL plans.

Once the script completes, observe the console output. You should see results and timings for each executed standard query, similar to:

...
Executing Q1...
+-------------+
|c_customer_id|
+-------------+
...

Q1 completed in 18.52 seconds.
...

All benchmark queries completed in 110.94 seconds.

Note the total runtime in seconds. This is your baseline runtime.
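
If you save the console output to a file, you can extract the timings instead of reading them by eye. The sketch below assumes the log lines match the print statements in benchmark.py exactly. The function name and the sample text are illustrative only.

```python
import re

# Patterns that mirror the print() calls in benchmark.py.
_QUERY_LINE = re.compile(r"^(Q\d+) completed in (\d+(?:\.\d+)?) seconds\.$")
_TOTAL_LINE = re.compile(
    r"^All benchmark queries completed in (\d+(?:\.\d+)?) seconds\.$"
)

# Sample text copied from the example output in this section.
SAMPLE_LOG = """
Executing Q1...
Q1 completed in 18.52 seconds.
All benchmark queries completed in 110.94 seconds.
"""


def parse_timings(log_text):
    """Return ({query_name: seconds}, total_seconds or None)."""
    per_query = {}
    total = None
    for raw_line in log_text.splitlines():
        line = raw_line.strip()
        query_match = _QUERY_LINE.match(line)
        if query_match:
            per_query[query_match.group(1)] = float(query_match.group(2))
            continue
        total_match = _TOTAL_LINE.match(line)
        if total_match:
            total = float(total_match.group(1))
    return per_query, total
```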

5. Run with Serverless Premium and Lightning Engine

Next, you will run the exact same Spark job on Serverless for Apache Spark, but using the Premium tier and enabling Google's native, vectorized query engine: Lightning Engine.

Submit the benchmark job to Serverless for Apache Spark with Lightning Engine explicitly enabled:

gcloud dataproc batches submit pyspark \
    gs://${BUCKET_NAME}/scripts/benchmark.py \
    --region=${REGION} \
    --version=2.3 \
    --deps-bucket=gs://${BUCKET_NAME} \
    --properties="dataproc.tier=premium,spark.dataproc.lightningEngine.runtime=native" \
    -- ${DATASET_PATH}
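
The two submit commands differ only in the --properties flag. As a sketch, the hypothetical helper below assembles either command as an argument list from the same inputs, using only the flags shown in this codelab. It builds the list and does not run anything.

```python
LIGHTNING_PROPERTIES = (
    "dataproc.tier=premium,spark.dataproc.lightningEngine.runtime=native"
)


def build_submit_command(bucket_name, region, dataset_path, lightning_engine=False):
    """Return the gcloud argument list for the baseline or Lightning Engine run."""
    command = [
        "gcloud", "dataproc", "batches", "submit", "pyspark",
        f"gs://{bucket_name}/scripts/benchmark.py",
        f"--region={region}",
        "--version=2.3",
        f"--deps-bucket=gs://{bucket_name}",
    ]
    if lightning_engine:
        # The only difference between the two runs.
        command.append(f"--properties={LIGHTNING_PROPERTIES}")
    # Everything after "--" is passed to benchmark.py as script arguments.
    command += ["--", dataset_path]
    return command
```

You could pass the returned list to subprocess.run(command, check=True) in an environment where gcloud is installed and authenticated.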

Compare the Results

Wait for the job to complete and examine the output. You should see the same query results. Look closely at the completion time:

...
All benchmark queries completed in 64.24 seconds.

Comparing the two runs, you'll notice that Lightning Engine executes the grouping, aggregations, and joins faster. It does this by using a native C++ execution layer with vectorized processing, and it requires no changes to your PySpark application code.

Lightning Engine's advantage tends to grow with the size of the workload. This example uses a small dataset, so the improvement is less dramatic than it could be. On a 10TB dataset, benchmarks have shown up to a 4.3x performance improvement over open source Spark.
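
To put a number on the comparison, you can compute the speedup and the runtime reduction from the two totals. The sketch below uses the sample timings shown in this codelab (110.94 and 64.24 seconds). Your own numbers will differ from run to run.

```python
def speedup(baseline_seconds, accelerated_seconds):
    """Return how many times faster the accelerated run was."""
    if baseline_seconds <= 0 or accelerated_seconds <= 0:
        raise ValueError("Runtimes must be positive")
    return baseline_seconds / accelerated_seconds


def runtime_reduction_pct(baseline_seconds, accelerated_seconds):
    """Return the runtime reduction as a percentage of the baseline."""
    if baseline_seconds <= 0 or accelerated_seconds <= 0:
        raise ValueError("Runtimes must be positive")
    return 100.0 * (baseline_seconds - accelerated_seconds) / baseline_seconds


# Sample totals from the example output in this codelab.
print(f"Speedup: {speedup(110.94, 64.24):.2f}x")                  # Speedup: 1.73x
print(f"Reduction: {runtime_reduction_pct(110.94, 64.24):.1f}%")  # Reduction: 42.1%
```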

6. Compare Execution Graphs in Spark UI

The runtime reduction is impressive, but let's look under the hood at what Spark is actually doing during query execution. You can do this by examining the Spark UI execution graphs for both jobs.

  1. Open the Google Cloud console in your browser.
  2. Navigate to Dataproc > Batches.
  3. You will see two batches in the list: your Standard tier baseline run and your Premium tier run.
  4. Click the Premium tier batch you ran, then click View Spark UI and then View Details.
  5. In the Spark UI, navigate to the Jobs tab.
  6. Under Completed Jobs, in the search box, type Velox.
  7. You'll see many job descriptions that include VeloxSparkPlanExecApi. This refers to the Velox native execution engine being used by Lightning Engine.

Now, repeat this process for the Standard tier run:

  1. Go back to the Serverless for Apache Spark Batches page.
  2. Click the link for the Standard tier batch, then click View Spark UI and then View Details.
  3. In the Spark UI, navigate to the Jobs tab.
  4. Under Completed Jobs, in the search box, type Velox.
  5. You'll see no mention of the Velox API in the job descriptions.
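
The search you ran in the Spark UI is a text filter over job names and descriptions. As a sketch, the same check can be expressed over a list of job records you have already exported. The record shape here (dictionaries with "name" and an optional "description") is an assumption modeled on the job listing in Spark's monitoring REST API, and the sample records are hypothetical.

```python
def jobs_mentioning(jobs, keyword="Velox"):
    """Return the jobs whose name or description contains the keyword."""
    needle = keyword.lower()
    matches = []
    for job in jobs:
        # Either field may be missing or None, so fall back to "".
        text = f"{job.get('name') or ''} {job.get('description') or ''}"
        if needle in text.lower():
            matches.append(job)
    return matches


# Hypothetical records for illustration; real names and descriptions vary.
PREMIUM_JOBS = [
    {"jobId": 0, "name": "parquet at NativeMethodAccessorImpl.java:0"},
    {"jobId": 1, "name": "showString", "description": "VeloxSparkPlanExecApi"},
]
STANDARD_JOBS = [
    {"jobId": 0, "name": "parquet at NativeMethodAccessorImpl.java:0"},
    {"jobId": 1, "name": "showString", "description": None},
]
```

With these sample records, the Premium list yields one match and the Standard list yields none, which mirrors what you see in the two Spark UIs.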

7. Clean up

To avoid ongoing charges to your Google Cloud account, delete the resources created during this codelab.

In Cloud Shell, delete the Cloud Storage bucket and its contents:

gcloud storage rm -r gs://${BUCKET_NAME}

Delete your local copy of benchmark.py:

rm benchmark.py

8. Congratulations

Congratulations! You've successfully built a benchmarking environment for Apache Spark and compared Serverless for Apache Spark Standard against Serverless for Apache Spark Premium.

You saw firsthand how enabling Lightning Engine in Serverless for Apache Spark can reduce your Spark workload's runtime, and you used the Spark UI to confirm that the Premium tier run executed through the Velox native engine while the Standard tier run did not.

What you've learned

  • How to write a PySpark benchmarking script for a TPC-DS dataset.
  • How to submit Spark jobs to Serverless for Apache Spark.
  • How to enable Lightning Engine.
  • How to compare job plans in the Spark UI.
