Managed Service for Apache Spark と Lightning Engine で Spark を高速化する

1. はじめに

この Codelab では、Managed Service for Apache Spark のネイティブ実行エンジンである Lightning Engine のパフォーマンス上のメリットについて説明し、Managed Apache Spark サーバーレスで Spark ワークロードを最大 4.9 倍高速に最適化する方法について説明します。

Lightning Engine は VeloxApache Gluten を使用します。Velox はデータ処理用の高性能 C++ エンジンです。Apache Gluten は、JVM ベースの Spark ジョブを Velox で実行できる C++ コードに変換する中間レイヤです。

このデモでは、意思決定支援システムのパフォーマンスを評価するために設計された業界標準のベンチマークである TPC-DS を使用します。ベースラインの PySpark ジョブを送信して、Standard サーバーレス ティアを使用してサンプル TPC-DS データセットをクエリします。次に、Lightning Engine を有効にして Premium 階層を使用して、まったく同じジョブを実行します。最後に、実行時間を比較し、Spark UI を詳しく調べて、ハードウェア アクセラレーションされた Spark 実行グラフの違いを可視化します。

この Codelab の実行にかかる費用は、クリーンアップ セクションの説明に従ってリソースを速やかにクリーンアップした場合、$1.00 USD 未満です。

演習内容

  • ベンチマーク スクリプトと結果を保存する Cloud Storage バケットを作成する
  • マネージド Apache Spark サーバーレス Standard 階層を使用して、ベースライン PySpark データ処理ジョブを実行する
  • Lightning Engine を使用して Managed Apache Spark サーバーレス プレミアム ティアで同じジョブを実行する
  • ランタイム指標を比較する
  • Spark History Server UI を起動して、ネイティブの物理実行グラフを比較する

必要なもの

2. 始める前に

Google Cloud プロジェクトの作成

  1. Google Cloud コンソールのプロジェクト セレクタ ページで、Google Cloud プロジェクトを選択または作成します。
  2. Cloud プロジェクトに対して課金が有効になっていることを確認します。プロジェクトで課金が有効になっているかどうかを確認する方法をご覧ください。

Cloud Shell の起動

Cloud Shell は、必要なツールがプリロードされた Google Cloud で動作するコマンドライン環境です。

  1. Google Cloud コンソールの上部にある [Cloud Shell をアクティブにする] をクリックします。
  2. Cloud Shell に接続したら、認証を確認します。
    gcloud auth list
    
  3. プロジェクトが構成されていることを確認します。
    gcloud config get project
    
  4. プロジェクトが想定どおりに設定されていない場合は、設定します。
    export PROJECT_ID=<YOUR_PROJECT_ID>
    gcloud config set project $PROJECT_ID
    

API を有効にする

次のコマンドを実行して、この Codelab に必要なすべての API を有効にします。

gcloud services enable \
    dataproc.googleapis.com \
    storage.googleapis.com \
    compute.googleapis.com

3. 環境を準備する

このステップでは、環境変数を初期化し、Cloud Storage バケットを作成します。このバケットには、Serverless for Apache Spark の両方の階層に送信する PySpark スクリプトが格納されます。

環境変数の設定

Cloud Shell で次のコマンドを実行して、デフォルトの環境変数を設定します。ここでは us-central1 リージョンを使用しますが、必要に応じて変更できます。

export PROJECT_ID=$(gcloud config get-value project)
export REGION="us-central1"
export BUCKET_NAME="spark-benchmark-${PROJECT_ID}-${REGION}"

gcloud config set dataproc/region ${REGION}

Cloud Storage バケットを作成する

スクリプトとログを保持するバケットを作成します。

gcloud storage buckets create gs://${BUCKET_NAME} \
    --uniform-bucket-level-access \
    --location=${REGION}

TPC-DS データセットを独自のバケットにコピーする

このステップでは、一般公開バケットから独自の Cloud Storage バケットに TPC-DS データセットをコピーします。これにより、PySpark ジョブがプロジェクトからローカルでデータを読み取ることができます。

環境変数を設定して、データセットのサイズとタイプを選択します。

export DATASET_TYPE="partitioned" # Options: partitioned, nonpartitioned
export DATASET_SIZE="1GB"         # Options: 1GB, 10GB, 100GB, 1000GB (1000GB not available for partitioned)

export SRC_PATH="gs://beam-tpcds/datasets/parquet/${DATASET_TYPE}/${DATASET_SIZE}"
export DATASET_PATH="gs://${BUCKET_NAME}/tpc-ds-dataset/${DATASET_TYPE}/${DATASET_SIZE}"

TPC-DS データを独自のバケットにコピーします。

gcloud storage cp -r ${SRC_PATH}/* ${DATASET_PATH}/

PySpark ベンチマーク スクリプトを作成する

Cloud Storage バケットから標準の TPC-DS テーブルを登録し、Apache Spark 公開リポジトリから取得した 5 つの標準クエリを実行する PySpark スクリプトを使用します。このスクリプトは、データセットへのパスを引数として受け取ります。

Cloud Shell で benchmark.py という名前のファイルを作成します。次のコマンドをコピーして貼り付けると、ファイルを生成できます。

cat << 'EOF' > benchmark.py
import argparse
import sys
from pyspark.sql import SparkSession
import time

def main():
    parser = argparse.ArgumentParser(description='TPC-DS Benchmark')
    parser.add_argument('data_path', help='GCS base path for TPC-DS tables')
    args = parser.parse_args()

    base_path = args.data_path

    # Initialize Spark Session
    spark = SparkSession.builder \
        .appName("TPC-DS Benchmark") \
        .getOrCreate()

    print(f"Spark Session created. Registering TPC-DS tables from {base_path}...")

    # List of all 24 TPC-DS tables
    tables = [
        "call_center", "catalog_page", "catalog_returns", "catalog_sales",
        "customer", "customer_address", "customer_demographics", "date_dim",
        "household_demographics", "income_band", "inventory", "item",
        "promotion", "reason", "ship_mode", "store", "store_returns",
        "store_sales", "time_dim", "warehouse", "web_page", "web_returns",
        "web_sales", "web_site"
    ]

    # Register each table as a temporary view
    # For this subset of queries, not every table is used
    for table in tables:
        path = f"{base_path}/{table}"
        try:
            df = spark.read.parquet(path)
            df.createOrReplaceTempView(table)
        except Exception as e:
            print(f"Warning: Could not load table {table} from {path}. Error: {e}")

    print("Tables registered successfully. Starting benchmark queries from Apache Spark test suite...")

    # Standard TPC-DS Queries sourced from Apache Spark public repository:
    # https://github.com/apache/spark/tree/master/sql/core/src/test/resources/tpcds
    queries = {
        "Q1": """
            WITH customer_total_return AS (
              SELECT sr_customer_sk AS ctr_customer_sk,
                     sr_store_sk AS ctr_store_sk,
                     sum(sr_return_amt) AS ctr_total_return
              FROM store_returns, date_dim
              WHERE sr_returned_date_sk = d_date_sk
                AND d_year = 2000
              GROUP BY sr_customer_sk, sr_store_sk
            )
            SELECT c_customer_id
            FROM customer_total_return ctr1, store, customer
            WHERE ctr1.ctr_total_return > (
              SELECT avg(ctr_total_return) * 1.2
              FROM customer_total_return ctr2
              WHERE ctr1.ctr_store_sk = ctr2.ctr_store_sk
            )
              AND s_store_sk = ctr1.ctr_store_sk
              AND s_state = 'TN'
              AND ctr1.ctr_customer_sk = c_customer_sk
            ORDER BY c_customer_id
            LIMIT 100
        """,
        "Q2": """
            WITH wscs AS (
              SELECT sold_date_sk, sales_price
              FROM (
                SELECT ws_sold_date_sk AS sold_date_sk, ws_ext_sales_price AS sales_price
                FROM web_sales
                UNION ALL
                SELECT cs_sold_date_sk AS sold_date_sk, cs_ext_sales_price AS sales_price
                FROM catalog_sales
              )
            ),
            wswscs AS (
              SELECT d_week_seq,
                     sum(CASE WHEN (d_day_name='Sunday') THEN sales_price ELSE null END) AS sun_sales,
                     sum(CASE WHEN (d_day_name='Monday') THEN sales_price ELSE null END) AS mon_sales,
                     sum(CASE WHEN (d_day_name='Tuesday') THEN sales_price ELSE null END) AS tue_sales,
                     sum(CASE WHEN (d_day_name='Wednesday') THEN sales_price ELSE null END) AS wed_sales,
                     sum(CASE WHEN (d_day_name='Thursday') THEN sales_price ELSE null END) AS thu_sales,
                     sum(CASE WHEN (d_day_name='Friday') THEN sales_price ELSE null END) AS fri_sales,
                     sum(CASE WHEN (d_day_name='Saturday') THEN sales_price ELSE null END) AS sat_sales
              FROM wscs, date_dim
              WHERE d_date_sk = sold_date_sk
              GROUP BY d_week_seq
            )
            SELECT d_week_seq1,
                   round(sun_sales1/sun_sales2, 2),
                   round(mon_sales1/mon_sales2, 2),
                   round(tue_sales1/tue_sales2, 2),
                   round(wed_sales1/wed_sales2, 2),
                   round(thu_sales1/thu_sales2, 2),
                   round(fri_sales1/fri_sales2, 2),
                   round(sat_sales1/sat_sales2, 2)
            FROM (
              SELECT wswscs.d_week_seq AS d_week_seq1,
                     sun_sales AS sun_sales1, mon_sales AS mon_sales1,
                     tue_sales AS tue_sales1, wed_sales AS wed_sales1,
                     thu_sales AS thu_sales1, fri_sales AS fri_sales1,
                     sat_sales AS sat_sales1
              FROM wswscs, date_dim
              WHERE date_dim.d_week_seq = wswscs.d_week_seq
                AND d_year = 2001
            ) y,
            (
              SELECT wswscs.d_week_seq AS d_week_seq2,
                     sun_sales AS sun_sales2, mon_sales AS mon_sales2,
                     tue_sales AS tue_sales2, wed_sales AS wed_sales2,
                     thu_sales AS thu_sales2, fri_sales AS fri_sales2,
                     sat_sales AS sat_sales2
              FROM wswscs, date_dim
              WHERE date_dim.d_week_seq = wswscs.d_week_seq
                AND d_year = 2001 + 1
            ) z
            WHERE d_week_seq1 = d_week_seq2 - 53
            ORDER BY d_week_seq1
        """,
        "Q3": """
            SELECT dt.d_year,
                   item.i_brand_id AS brand_id,
                   item.i_brand AS brand,
                   sum(ss_ext_sales_price) AS sum_agg
            FROM date_dim dt,
                 store_sales,
                 item
            WHERE dt.d_date_sk = store_sales.ss_sold_date_sk
              AND store_sales.ss_item_sk = item.i_item_sk
              AND item.i_manufact_id = 436
              AND dt.d_moy = 12
            GROUP BY dt.d_year,
                     item.i_brand,
                     item.i_brand_id
            ORDER BY dt.d_year,
                     sum_agg DESC,
                     brand_id
            LIMIT 100
        """,
        "Q7": """
            SELECT i_item_id,
                   avg(ss_quantity) AS agg1,
                   avg(ss_list_price) AS agg2,
                   avg(ss_coupon_amt) AS agg3,
                   avg(ss_sales_price) AS agg4
            FROM store_sales,
                 customer_demographics,
                 date_dim,
                 item,
                 promotion
            WHERE ss_sold_date_sk = d_date_sk
              AND ss_item_sk = i_item_sk
              AND ss_cdemo_sk = cd_demo_sk
              AND ss_promo_sk = p_promo_sk
              AND cd_gender = 'M'
              AND cd_marital_status = 'S'
              AND cd_education_status = 'College'
              AND (p_channel_email = 'N' OR p_channel_event = 'N')
              AND d_year = 2000
            GROUP BY i_item_id
            ORDER BY i_item_id
            LIMIT 100
        """,
        "Q19": """
            SELECT i_item_id,
                   i_brand,
                   i_category,
                   i_class,
                   i_manufact,
                   sum(ss_ext_sales_price) AS sales,
                   sum(ss_net_profit) AS profit
            FROM date_dim,
                 store_sales,
                 item,
                 customer,
                 store
            WHERE d_date_sk = ss_sold_date_sk
              AND i_item_sk = ss_item_sk
              AND d_year = 2000
              AND d_moy = 12
              AND c_customer_sk = ss_customer_sk
              AND s_store_sk = ss_store_sk
              AND i_manager_id = 9
            GROUP BY i_item_id,
                     i_brand,
                     i_category,
                     i_class,
                     i_manufact
            ORDER BY i_item_id,
                     i_brand,
                     i_category,
                     i_class,
                     i_manufact
            LIMIT 100
        """
    }

    total_start_time = time.time()

    for query_name, query_sql in queries.items():
        print(f"\nExecuting {query_name}...")
        query_start = time.time()

        # Execute query and force action using show()
        result_df = spark.sql(query_sql)
        result_df.show(5) # Show top 5 rows

        query_end = time.time()
        print(f"{query_name} completed in {query_end - query_start:.2f} seconds.")

    total_end_time = time.time()
    print(f"\nAll benchmark queries completed in {total_end_time - total_start_time:.2f} seconds.")

    spark.stop()

if __name__ == "__main__":
    main()
EOF

Apache Spark 用 Serverless がアクセスできるように、スクリプトを Cloud Storage バケットにコピーします。

gcloud storage cp benchmark.py gs://${BUCKET_NAME}/scripts/benchmark.py

4. ベースライン サーバーレス ジョブを実行する

Lightning Engine を使用しないベースライン比較を行うには、先ほどアップロードした PySpark ベンチマーク ジョブを Apache Spark 向け Serverless の Standard 階層に送信します。コピーしたデータセットのパスを引数として渡します。

次のコマンドを実行して、バッチジョブを実行します。

gcloud dataproc batches submit pyspark \
    gs://${BUCKET_NAME}/scripts/benchmark.py \
    --region=${REGION} \
    --version=2.3 \
    --deps-bucket=gs://${BUCKET_NAME} \
    -- ${DATASET_PATH}

ジョブをモニタリングする

ジョブの実行中は、Cloud Shell ターミナルに PySpark ログがストリーミングされます。Serverless for Apache Spark は、コンテナを割り当て、Cloud Storage から TPC-DS Parquet データセットを読み取り、複雑な SQL プランを実行します。

スクリプトが完了したら、コンソール出力を確認します。次のように、実行された各標準クエリの結果とタイミングが表示されます。

...
Executing Q1...
+-------------+
|c_customer_id|
+-------------+
...

Q1 completed in 18.52 seconds.
...

All benchmark queries completed in 110.94 seconds.

完了までの合計秒数をメモします。これが基準ランタイムです。

5. Serverless Premium と Lightning Engine で実行する

次に、Managed Apache Spark サーバーレスでまったく同じ Spark ジョブを実行しますが、プレミアム ティアを使用して、Google のネイティブのベクトル化されたクエリ エンジンである Lightning Engine を有効にします。

Lightning Engine を明示的に有効にして、ベンチマーク ジョブをサーバーレスに送信します。

gcloud dataproc batches submit pyspark \
    gs://${BUCKET_NAME}/scripts/benchmark.py \
    --region=${REGION} \
    --version=2.3 \
    --deps-bucket=gs://${BUCKET_NAME} \
    --properties="dataproc.tier=premium,spark.dataproc.lightningEngine.runtime=native" \
    -- ${DATASET_PATH}

結果を比較する

ジョブが完了するまで待って、出力を確認します。同じクエリ結果が表示されます。完了時間をよく確認します。

...
All benchmark queries completed in 64.24 seconds.

ベースライン ジョブと Lightning Engine ジョブを比較すると、Lightning Engine は、PySpark アプリケーション コードを一切変更することなく、ネイティブの C++ 実行レイヤとバックエンドのベクトル化された処理を利用して、グループ化、集計、結合を高速に実行していることがわかります。

Lightning Engine は、ワークロードが大きいほどパフォーマンスが向上するように最適化されています。この例では小さなデータセットを使用しているため、パフォーマンスの向上はそれほど大きくありません。10 TB のデータセットでは、ベンチマークでオープンソースの Spark と比較して最大 4.9 倍のパフォーマンス向上が示されています。

6. Spark UI で実行グラフを比較する

実行時間の短縮は目覚ましいものですが、クエリ実行中に Spark が実際に何を行っているのかを詳しく見てみましょう。これを行うには、両方のジョブの Spark UI 実行グラフを調べます。

  1. ブラウザで Google Cloud コンソールを開きます。
  2. [Managed Apache Spark] > [バッチ] に移動します。
  3. リストには、標準ベースライン実行とプレミアム ティア実行の 2 つのバッチが表示されます。
  4. 実行した Premium 階層のバッチをクリックし、[Spark UI を表示]、[詳細を表示] の順にクリックします。
  5. Spark UI で、[Jobs] タブに移動します。
  6. [完了したジョブ] の検索ボックスに「Velox」と入力します。
  7. VeloxSparkPlanExecApi を含む求人情報が多数表示されます。これは、Lightning Engine で使用されている Velox ネイティブ実行エンジンを指します。

次に、スタンダード ティアの実行についてこのプロセスを繰り返します。

  1. Serverless for Apache Spark の [バッチ] ページに戻ります。
  2. [Standard 階層] バッチのリンクをクリックし、[Spark UI を表示]、[詳細を表示] の順にクリックします。
  3. Spark UI で、[Jobs] タブに移動します。
  4. [完了したジョブ] の検索ボックスに「Velox」と入力します。
  5. 求人情報には Velox API についての記載はありません。

7. クリーンアップ

Google Cloud アカウントに継続的に課金されないようにするには、この Codelab で作成したリソースを削除します。

Cloud Shell で、Cloud Storage バケットとその内容を削除します。

gcloud storage rm -r gs://${BUCKET_NAME}

benchmark.py のローカルコピーを削除します。

rm benchmark.py

8. 完了

おめでとうございます!Apache Spark のベンチマーク環境を構築し、Managed Apache Spark サーバーレス Standard と Managed Apache Spark サーバーレス Premium を比較しました。

Managed Apache Spark サーバーレスの新しい Lightning Engine を有効にすると、Spark ワークロードの実行時間を短縮できることを確認しました。また、Spark UI を使用して、物理実行グラフが Native Query Engine を使用してネイティブ C++ コードに変換される様子を確認しました。

学習した内容

  • PySpark データセットのベンチマーク スクリプトの作成方法。
  • Managed Apache Spark サーバーレスに Spark ジョブを送信する方法。
  • Lightning Engine を有効にする方法。
  • Spark UI でジョブプランを比較する方法。

次のステップ