Serverless for Apache Spark と Lightning Engine で Spark を高速化する

1. はじめに

この Codelab では、Google Cloud Serverless for Apache Spark のネイティブ実行エンジンである Lightning Engine のパフォーマンス上のメリットについて説明し、Serverless for Apache Spark で Spark ワークロードを最適化する方法について説明します。

Lightning Engine は VeloxApache Gluten を使用します。Velox は、データ処理用の高性能 C++ エンジンです。Apache Gluten は、JVM ベースの Spark ジョブを Velox で実行できる C++ コードに変換する中間レイヤです。

このデモでは、意思決定支援システムのパフォーマンスを評価するために設計された業界標準のベンチマークである TPC-DS を使用します。ベースラインの PySpark ジョブを送信して、スタンダード Serverless ティアを使用してサンプルの TPC-DS データセットにクエリを実行します。次に、Lightning Engine を有効にして、プレミアム ティアを使用してまったく同じジョブを実行します。最後に、実行時間を比較し、Spark UI を掘り下げて、ハードウェア アクセラレーションによる Spark 実行グラフの違いを視覚化します。

この Codelab の実行にかかる費用は、クリーンアップ セクションの説明に従ってリソースを速やかにクリーンアップした場合、1.00 米ドル 未満です。

演習内容

  • ベンチマーク スクリプトと結果を保存するCloud Storageバケットを作成する
  • Serverless for Apache Spark スタンダード ティア を使用して、ベースラインの PySpark データ処理ジョブを実行する
  • Lightning Engine を使用して、Serverless for Apache Spark プレミアム ティア で同じジョブを実行する
  • ランタイム指標を比較する
  • Spark History Server UI を起動して、ネイティブ物理実行グラフを比較する

必要なもの

2. 始める前に

Google Cloud プロジェクトの作成

  1. Google Cloud コンソールのプロジェクト選択ページで、Google Cloud プロジェクトを選択または作成します。
  2. Cloud プロジェクトに対して課金が有効になっていることを確認します。プロジェクトで課金が有効になっているかどうかを確認する方法をご覧ください。

Cloud Shell の起動

Cloud Shell は、必要なツールがプリロードされた Google Cloud 上で動作するコマンドライン環境です。

  1. Google Cloud コンソールの上部にある「Cloud Shell をアクティブにする 」アイコンをクリックします。
  2. Cloud Shell に接続したら、認証を確認します。
    gcloud auth list
    
  3. プロジェクトが構成されていることを確認します。
    gcloud config get project
    
  4. プロジェクトが想定どおりに設定されていない場合は、設定します。
    export PROJECT_ID=<YOUR_PROJECT_ID>
    gcloud config set project $PROJECT_ID
    

API を有効にする

この Codelab に必要なすべての API を有効にするには、次のコマンドを実行します。

gcloud services enable \
    dataproc.googleapis.com \
    storage.googleapis.com \
    compute.googleapis.com

3. 環境を準備する

このステップでは、環境変数を初期化し、Cloud Storage バケットを作成します。このバケットには、Serverless for Apache Spark の両方のティアに送信する PySpark スクリプトが格納されます。

環境変数の設定

Cloud Shell で次のコマンドを実行して、デフォルトの環境変数を設定します。us-central1 リージョンを使用しますが、必要に応じて変更できます。

export PROJECT_ID=$(gcloud config get-value project)
export REGION="us-central1"
export BUCKET_NAME="spark-benchmark-${PROJECT_ID}-${REGION}"

gcloud config set dataproc/region ${REGION}

Cloud Storage バケットを作成する

スクリプトとログを保持するバケットを作成します。

gcloud storage buckets create gs://${BUCKET_NAME} \
    --uniform-bucket-level-access \
    --location=${REGION}

TPC-DS データセットを自分のバケットにコピーする

このステップでは、TPC-DS データセットを公開バケットから自分の Cloud Storage バケットにコピーします。これにより、PySpark ジョブでプロジェクトからローカルにデータを読み取ることができます。

データセットのサイズとタイプを選択する環境変数を設定します。

export DATASET_TYPE="partitioned" # Options: partitioned, nonpartitioned
export DATASET_SIZE="1GB"         # Options: 1GB, 10GB, 100GB, 1000GB (1000GB not available for partitioned)

export SRC_PATH="gs://beam-tpcds/datasets/parquet/${DATASET_TYPE}/${DATASET_SIZE}"
export DATASET_PATH="gs://${BUCKET_NAME}/tpc-ds-dataset/${DATASET_TYPE}/${DATASET_SIZE}"

TPC-DS データを自分のバケットにコピーします。

gcloud storage cp -r ${SRC_PATH}/* ${DATASET_PATH}/

PySpark ベンチマーク スクリプトを作成する

Cloud Storage バケットから標準の TPC-DS テーブルを登録し、Apache Spark 公開リポジトリから取得した 5 つの標準クエリを実行する PySpark スクリプトを使用します。このスクリプトは、データセットのパスを引数として受け取ります。

Cloud Shell で benchmark.py という名前のファイルを作成します。次のコマンドをコピーして貼り付けると、ファイルが生成されます。

cat << 'EOF' > benchmark.py
import argparse
import sys
from pyspark.sql import SparkSession
import time

def main():
    parser = argparse.ArgumentParser(description='TPC-DS Benchmark')
    parser.add_argument('data_path', help='GCS base path for TPC-DS tables')
    args = parser.parse_args()

    base_path = args.data_path

    # Initialize Spark Session
    spark = SparkSession.builder \
        .appName("TPC-DS Benchmark") \
        .getOrCreate()

    print(f"Spark Session created. Registering TPC-DS tables from {base_path}...")

    # List of all 24 TPC-DS tables
    tables = [
        "call_center", "catalog_page", "catalog_returns", "catalog_sales",
        "customer", "customer_address", "customer_demographics", "date_dim",
        "household_demographics", "income_band", "inventory", "item",
        "promotion", "reason", "ship_mode", "store", "store_returns",
        "store_sales", "time_dim", "warehouse", "web_page", "web_returns",
        "web_sales", "web_site"
    ]

    # Register each table as a temporary view
    # For this subset of queries, not every table is used
    for table in tables:
        path = f"{base_path}/{table}"
        try:
            df = spark.read.parquet(path)
            df.createOrReplaceTempView(table)
        except Exception as e:
            print(f"Warning: Could not load table {table} from {path}. Error: {e}")

    print("Tables registered successfully. Starting benchmark queries from Apache Spark test suite...")

    # Standard TPC-DS Queries sourced from Apache Spark public repository:
    # https://github.com/apache/spark/tree/master/sql/core/src/test/resources/tpcds
    queries = {
        "Q1": """
            WITH customer_total_return AS (
              SELECT sr_customer_sk AS ctr_customer_sk,
                     sr_store_sk AS ctr_store_sk,
                     sum(sr_return_amt) AS ctr_total_return
              FROM store_returns, date_dim
              WHERE sr_returned_date_sk = d_date_sk
                AND d_year = 2000
              GROUP BY sr_customer_sk, sr_store_sk
            )
            SELECT c_customer_id
            FROM customer_total_return ctr1, store, customer
            WHERE ctr1.ctr_total_return > (
              SELECT avg(ctr_total_return) * 1.2
              FROM customer_total_return ctr2
              WHERE ctr1.ctr_store_sk = ctr2.ctr_store_sk
            )
              AND s_store_sk = ctr1.ctr_store_sk
              AND s_state = 'TN'
              AND ctr1.ctr_customer_sk = c_customer_sk
            ORDER BY c_customer_id
            LIMIT 100
        """,
        "Q2": """
            WITH wscs AS (
              SELECT sold_date_sk, sales_price
              FROM (
                SELECT ws_sold_date_sk AS sold_date_sk, ws_ext_sales_price AS sales_price
                FROM web_sales
                UNION ALL
                SELECT cs_sold_date_sk AS sold_date_sk, cs_ext_sales_price AS sales_price
                FROM catalog_sales
              )
            ),
            wswscs AS (
              SELECT d_week_seq,
                     sum(CASE WHEN (d_day_name='Sunday') THEN sales_price ELSE null END) AS sun_sales,
                     sum(CASE WHEN (d_day_name='Monday') THEN sales_price ELSE null END) AS mon_sales,
                     sum(CASE WHEN (d_day_name='Tuesday') THEN sales_price ELSE null END) AS tue_sales,
                     sum(CASE WHEN (d_day_name='Wednesday') THEN sales_price ELSE null END) AS wed_sales,
                     sum(CASE WHEN (d_day_name='Thursday') THEN sales_price ELSE null END) AS thu_sales,
                     sum(CASE WHEN (d_day_name='Friday') THEN sales_price ELSE null END) AS fri_sales,
                     sum(CASE WHEN (d_day_name='Saturday') THEN sales_price ELSE null END) AS sat_sales
              FROM wscs, date_dim
              WHERE d_date_sk = sold_date_sk
              GROUP BY d_week_seq
            )
            SELECT d_week_seq1,
                   round(sun_sales1/sun_sales2, 2),
                   round(mon_sales1/mon_sales2, 2),
                   round(tue_sales1/tue_sales2, 2),
                   round(wed_sales1/wed_sales2, 2),
                   round(thu_sales1/thu_sales2, 2),
                   round(fri_sales1/fri_sales2, 2),
                   round(sat_sales1/sat_sales2, 2)
            FROM (
              SELECT wswscs.d_week_seq AS d_week_seq1,
                     sun_sales AS sun_sales1, mon_sales AS mon_sales1,
                     tue_sales AS tue_sales1, wed_sales AS wed_sales1,
                     thu_sales AS thu_sales1, fri_sales AS fri_sales1,
                     sat_sales AS sat_sales1
              FROM wswscs, date_dim
              WHERE date_dim.d_week_seq = wswscs.d_week_seq
                AND d_year = 2001
            ) y,
            (
              SELECT wswscs.d_week_seq AS d_week_seq2,
                     sun_sales AS sun_sales2, mon_sales AS mon_sales2,
                     tue_sales AS tue_sales2, wed_sales AS wed_sales2,
                     thu_sales AS thu_sales2, fri_sales AS fri_sales2,
                     sat_sales AS sat_sales2
              FROM wswscs, date_dim
              WHERE date_dim.d_week_seq = wswscs.d_week_seq
                AND d_year = 2001 + 1
            ) z
            WHERE d_week_seq1 = d_week_seq2 - 53
            ORDER BY d_week_seq1
        """,
        "Q3": """
            SELECT dt.d_year,
                   item.i_brand_id AS brand_id,
                   item.i_brand AS brand,
                   sum(ss_ext_sales_price) AS sum_agg
            FROM date_dim dt,
                 store_sales,
                 item
            WHERE dt.d_date_sk = store_sales.ss_sold_date_sk
              AND store_sales.ss_item_sk = item.i_item_sk
              AND item.i_manufact_id = 436
              AND dt.d_moy = 12
            GROUP BY dt.d_year,
                     item.i_brand,
                     item.i_brand_id
            ORDER BY dt.d_year,
                     sum_agg DESC,
                     brand_id
            LIMIT 100
        """,
        "Q7": """
            SELECT i_item_id,
                   avg(ss_quantity) AS agg1,
                   avg(ss_list_price) AS agg2,
                   avg(ss_coupon_amt) AS agg3,
                   avg(ss_sales_price) AS agg4
            FROM store_sales,
                 customer_demographics,
                 date_dim,
                 item,
                 promotion
            WHERE ss_sold_date_sk = d_date_sk
              AND ss_item_sk = i_item_sk
              AND ss_cdemo_sk = cd_demo_sk
              AND ss_promo_sk = p_promo_sk
              AND cd_gender = 'M'
              AND cd_marital_status = 'S'
              AND cd_education_status = 'College'
              AND (p_channel_email = 'N' OR p_channel_event = 'N')
              AND d_year = 2000
            GROUP BY i_item_id
            ORDER BY i_item_id
            LIMIT 100
        """,
        "Q19": """
            SELECT i_item_id,
                   i_brand,
                   i_category,
                   i_class,
                   i_manufact,
                   sum(ss_ext_sales_price) AS sales,
                   sum(ss_net_profit) AS profit
            FROM date_dim,
                 store_sales,
                 item,
                 customer,
                 store
            WHERE d_date_sk = ss_sold_date_sk
              AND i_item_sk = ss_item_sk
              AND d_year = 2000
              AND d_moy = 12
              AND c_customer_sk = ss_customer_sk
              AND s_store_sk = ss_store_sk
              AND i_manager_id = 9
            GROUP BY i_item_id,
                     i_brand,
                     i_category,
                     i_class,
                     i_manufact
            ORDER BY i_item_id,
                     i_brand,
                     i_category,
                     i_class,
                     i_manufact
            LIMIT 100
        """
    }

    total_start_time = time.time()

    for query_name, query_sql in queries.items():
        print(f"\nExecuting {query_name}...")
        query_start = time.time()

        # Execute query and force action using show()
        result_df = spark.sql(query_sql)
        result_df.show(5) # Show top 5 rows

        query_end = time.time()
        print(f"{query_name} completed in {query_end - query_start:.2f} seconds.")

    total_end_time = time.time()
    print(f"\nAll benchmark queries completed in {total_end_time - total_start_time:.2f} seconds.")

    spark.stop()

if __name__ == "__main__":
    main()
EOF

スクリプトを Cloud Storage バケットにコピーして、Serverless for Apache Spark がアクセスできるようにします。

gcloud storage cp benchmark.py gs://${BUCKET_NAME}/scripts/benchmark.py

4. ベースラインの Serverless ジョブを実行する

Lightning Engine を使用せずにベースラインの比較を行うには、先ほどアップロードした PySpark ベンチマーク ジョブを Serverless for Apache Spark スタンダード ティアに送信します。コピーしたデータセットのパスを引数として渡します。

次のコマンドを実行して、バッチジョブを実行します。

gcloud dataproc batches submit pyspark \
    gs://${BUCKET_NAME}/scripts/benchmark.py \
    --region=${REGION} \
    --version=2.3 \
    --deps-bucket=gs://${BUCKET_NAME} \
    -- ${DATASET_PATH}

ジョブをモニタリングする

ジョブの実行中、Cloud Shell ターミナルに PySpark ログがストリーミングされます。Serverless for Apache Spark は、コンテナを割り当て、Cloud Storage から TPC-DS Parquet データセットを読み取り、複雑な SQL プランを実行します。

スクリプトが完了したら、コンソール出力を確認します。次のように、実行された標準クエリごとに結果とタイミングが表示されます。

...
Executing Q1...
+-------------+
|c_customer_id|
+-------------+
...

Q1 completed in 18.52 seconds.
...

All benchmark queries completed in 110.94 seconds.

完了までの合計秒数をメモします。これがベースライン ランタイム です。

5. Serverless Premium と Lightning Engine で実行する

次に、Serverless for Apache Spark でまったく同じ Spark ジョブを実行しますが、プレミアム ティア を使用し、Google のネイティブ ベクトル化クエリエンジンであるLightning Engine を有効にします。

Lightning Engine を明示的に有効にして、ベンチマーク ジョブを Serverless に送信します。

gcloud dataproc batches submit pyspark \
    gs://${BUCKET_NAME}/scripts/benchmark.py \
    --region=${REGION} \
    --version=2.3 \
    --deps-bucket=gs://${BUCKET_NAME} \
    --properties="dataproc.tier=premium,spark.dataproc.lightningEngine.runtime=native" \
    -- ${DATASET_PATH}

結果を比較する

ジョブが完了するまで待って、出力を確認します。同じクエリ結果が表示されます。完了時間をよく見てください。

...
All benchmark queries completed in 64.24 seconds.

ベースラインの Serverless 実行と Serverless Lightning Engine 実行を比較すると、Lightning Engine は、ネイティブ C++ 実行レイヤとバックエンドでのベクトル化処理を利用して、グループ化、集計、結合を高速に実行していることがわかります。PySpark アプリケーション コードを変更する必要はありません。

Lightning Engine は、ワークロードが大きいほどパフォーマンスが向上するように最適化されています。この例では小さなデータセットを使用しているため、パフォーマンスの向上はそれほど大きくありません。10 TB のデータセットでは、オープンソースの Spark と比較して最大 4.3 倍のパフォーマンス向上がベンチマークで示されています。

6. Spark UI で実行グラフを比較する

ランタイムの短縮は目覚ましいものですが、クエリの実行中に Spark が実際に何を行っているのかを詳しく見てみましょう。 これを行うには、両方のジョブの Spark UI 実行グラフを確認します。

  1. ブラウザで Google Cloud コンソール を開きます。
  2. [Dataproc] > [バッチ] に移動します。
  3. リストに 2 つのバッチが表示されます。標準のベースライン実行とプレミアム ティアの実行です。
  4. 実行したプレミアム ティアのバッチをクリックし、[Spark UI を表示]、[詳細を表示] の順にクリックします。
  5. Spark UI で、[ジョブ] タブに移動します。
  6. [完了したジョブ] の検索ボックスに「Velox」と入力します。
  7. VeloxSparkPlanExecApi を含むジョブの説明が多数表示されます。これは、Lightning Engine で使用されている Velox ネイティブ実行エンジンを指します。

次に、スタンダード ティアの実行でこのプロセスを繰り返します。

  1. Serverless for Apache Spark の [バッチ] ページに戻ります。
  2. [スタンダード ティア] バッチのリンクをクリックし、[Spark UI を表示]、[詳細を表示] の順にクリックします。
  3. Spark UI で、[ジョブ] タブに移動します。
  4. [完了したジョブ] の検索ボックスに「Velox」と入力します。
  5. ジョブの説明に Velox API の記述はありません。

7. クリーンアップ

Google Cloud アカウントに継続的に課金されないようにするには、この Codelab で作成したリソースを削除します。

Cloud Shell で、Cloud Storage バケットとその内容を削除します。

gcloud storage rm -r gs://${BUCKET_NAME}

benchmark.py のローカルコピーを削除します。

rm benchmark.py

8. 完了

おめでとうございます!Apache Spark のベンチマーク環境を構築し、Serverless for Apache Spark スタンダードと Serverless for Apache Spark プレミアムを比較できました。

Serverless for Apache Spark の新しい Lightning Engine を有効にすると、Spark ワークロードのランタイムを短縮できることを確認しました。また、Spark UI を使用して、物理実行グラフがネイティブ クエリエンジンを使用してネイティブ C++ コードに変換される方法を確認しました。

学習した内容

  • PySpark データセット ベンチマーク スクリプトを作成する方法。
  • Spark ジョブを Serverless for Apache Spark に送信する方法。
  • Lightning Engine を有効にする方法。
  • Spark UI でジョブプランを比較する方法。

次のステップ