Accelerare Spark con Serverless per Apache Spark e Lightning Engine

1. Introduzione

In questo codelab, esplorerai i vantaggi in termini di prestazioni del motore di esecuzione nativo di Google Cloud Serverless per Apache Spark, Lightning Engine, ed esaminerai come ottimizza i tuoi workload Spark su Serverless per Apache Spark.

Lightning Engine utilizza Velox e Apache Gluten. Velox è un motore C++ ad alte prestazioni per l'elaborazione dei dati. Apache Gluten è un livello intermedio responsabile della conversione dei job Spark basati su JVM in codice C++ che può essere eseguito da Velox.

Questa demo utilizza TPC-DS, un benchmark standard del settore progettato per valutare le prestazioni dei sistemi di supporto decisionale. Invierai un job PySpark di base per eseguire query su un set di dati TPC-DS di esempio utilizzando il livello Serverless Standard. Poi, eseguirai lo stesso job utilizzando il livello Premium con Lightning Engine abilitato. Infine, confronterai il tempo di esecuzione ed esaminerai l'interfaccia utente di Spark per visualizzare la differenza nei grafici di esecuzione di Spark con accelerazione hardware.

Il costo stimato per eseguire questo codelab è inferiore a 1,00$, presupponendo che le risorse vengano liberate tempestivamente come descritto nella sezione Liberare spazio.

In questo lab proverai a:

  • Creare un bucket Cloud Storage per archiviare gli script e i risultati dei benchmark
  • Eseguire un job di elaborazione dei dati PySpark di base utilizzando il livello Serverless per Apache Spark Standard
  • Eseguire lo stesso job utilizzando il livello Serverless per Apache Spark Premium con Lightning Engine
  • Confrontare le metriche di runtime
  • Avviare l'interfaccia utente del server di cronologia Spark per confrontare i grafici di esecuzione fisica nativa

Che cosa ti serve

2. Prima di iniziare

Crea un progetto Google Cloud

  1. Nella console Google Cloud, nella pagina di selezione del progetto, seleziona o crea un progetto Google Cloud.
  2. Verifica che la fatturazione sia attivata per il tuo progetto Cloud. Scopri come verificare se la fatturazione è abilitata per un progetto.

Avvia Cloud Shell

Cloud Shell è un ambiente a riga di comando in esecuzione in Google Cloud che viene fornito con gli strumenti necessari precaricati.

  1. Fai clic su Attiva Cloud Shell nella parte superiore della console Google Cloud.
  2. Una volta eseguita la connessione a Cloud Shell, verifica l'autenticazione:
    gcloud auth list
    
  3. Verifica che il progetto sia configurato:
    gcloud config get project
    
  4. Se il progetto non è impostato come previsto, impostalo:
    export PROJECT_ID=<YOUR_PROJECT_ID>
    gcloud config set project $PROJECT_ID
    

Abilita API

Esegui questo comando per abilitare tutte le API richieste per questo codelab:

gcloud services enable \
    dataproc.googleapis.com \
    storage.googleapis.com \
    compute.googleapis.com

3. Prepara l'ambiente

In questo passaggio, inizializzerai le variabili di ambiente e creerai un bucket Cloud Storage. Questo bucket conterrà lo script PySpark che invierai a entrambi i livelli Serverless per Apache Spark.

Imposta le variabili di ambiente

Esegui i seguenti comandi in Cloud Shell per impostare le variabili di ambiente predefinite. Utilizzeremo la regione us-central1, ma puoi modificarla se preferisci.

export PROJECT_ID=$(gcloud config get-value project)
export REGION="us-central1"
export BUCKET_NAME="spark-benchmark-${PROJECT_ID}-${REGION}"

gcloud config set dataproc/region ${REGION}

Crea un bucket Cloud Storage

Crea il bucket per contenere gli script e i log:

gcloud storage buckets create gs://${BUCKET_NAME} \
    --uniform-bucket-level-access \
    --location=${REGION}

Copia il set di dati TPC-DS nel tuo bucket

In questo passaggio, copierai il set di dati TPC-DS da un bucket pubblico al tuo bucket Cloud Storage. In questo modo, i job PySpark possono leggere i dati localmente dal tuo progetto.

Imposta le variabili di ambiente per scegliere le dimensioni e il tipo del set di dati:

export DATASET_TYPE="partitioned" # Options: partitioned, nonpartitioned
export DATASET_SIZE="1GB"         # Options: 1GB, 10GB, 100GB, 1000GB (1000GB not available for partitioned)

export SRC_PATH="gs://beam-tpcds/datasets/parquet/${DATASET_TYPE}/${DATASET_SIZE}"
export DATASET_PATH="gs://${BUCKET_NAME}/tpc-ds-dataset/${DATASET_TYPE}/${DATASET_SIZE}"

Copia i dati TPC-DS nel tuo bucket:

gcloud storage cp -r ${SRC_PATH}/* ${DATASET_PATH}/

Crea lo script di benchmark PySpark

Utilizzeremo uno script PySpark che registra le tabelle TPC-DS standard dal tuo bucket Cloud Storage ed esegue 5 query standard provenienti dal repository pubblico di Apache Spark. Lo script accetta il percorso del set di dati come argomento.

Crea un file denominato benchmark.py in Cloud Shell. Puoi copiare e incollare il seguente comando per generare il file:

cat << 'EOF' > benchmark.py
import argparse
import sys
from pyspark.sql import SparkSession
import time

def main():
    parser = argparse.ArgumentParser(description='TPC-DS Benchmark')
    parser.add_argument('data_path', help='GCS base path for TPC-DS tables')
    args = parser.parse_args()

    base_path = args.data_path

    # Initialize Spark Session
    spark = SparkSession.builder \
        .appName("TPC-DS Benchmark") \
        .getOrCreate()

    print(f"Spark Session created. Registering TPC-DS tables from {base_path}...")

    # List of all 24 TPC-DS tables
    tables = [
        "call_center", "catalog_page", "catalog_returns", "catalog_sales",
        "customer", "customer_address", "customer_demographics", "date_dim",
        "household_demographics", "income_band", "inventory", "item",
        "promotion", "reason", "ship_mode", "store", "store_returns",
        "store_sales", "time_dim", "warehouse", "web_page", "web_returns",
        "web_sales", "web_site"
    ]

    # Register each table as a temporary view
    # For this subset of queries, not every table is used
    for table in tables:
        path = f"{base_path}/{table}"
        try:
            df = spark.read.parquet(path)
            df.createOrReplaceTempView(table)
        except Exception as e:
            print(f"Warning: Could not load table {table} from {path}. Error: {e}")

    print("Tables registered successfully. Starting benchmark queries from Apache Spark test suite...")

    # Standard TPC-DS Queries sourced from Apache Spark public repository:
    # https://github.com/apache/spark/tree/master/sql/core/src/test/resources/tpcds
    queries = {
        "Q1": """
            WITH customer_total_return AS (
              SELECT sr_customer_sk AS ctr_customer_sk,
                     sr_store_sk AS ctr_store_sk,
                     sum(sr_return_amt) AS ctr_total_return
              FROM store_returns, date_dim
              WHERE sr_returned_date_sk = d_date_sk
                AND d_year = 2000
              GROUP BY sr_customer_sk, sr_store_sk
            )
            SELECT c_customer_id
            FROM customer_total_return ctr1, store, customer
            WHERE ctr1.ctr_total_return > (
              SELECT avg(ctr_total_return) * 1.2
              FROM customer_total_return ctr2
              WHERE ctr1.ctr_store_sk = ctr2.ctr_store_sk
            )
              AND s_store_sk = ctr1.ctr_store_sk
              AND s_state = 'TN'
              AND ctr1.ctr_customer_sk = c_customer_sk
            ORDER BY c_customer_id
            LIMIT 100
        """,
        "Q2": """
            WITH wscs AS (
              SELECT sold_date_sk, sales_price
              FROM (
                SELECT ws_sold_date_sk AS sold_date_sk, ws_ext_sales_price AS sales_price
                FROM web_sales
                UNION ALL
                SELECT cs_sold_date_sk AS sold_date_sk, cs_ext_sales_price AS sales_price
                FROM catalog_sales
              )
            ),
            wswscs AS (
              SELECT d_week_seq,
                     sum(CASE WHEN (d_day_name='Sunday') THEN sales_price ELSE null END) AS sun_sales,
                     sum(CASE WHEN (d_day_name='Monday') THEN sales_price ELSE null END) AS mon_sales,
                     sum(CASE WHEN (d_day_name='Tuesday') THEN sales_price ELSE null END) AS tue_sales,
                     sum(CASE WHEN (d_day_name='Wednesday') THEN sales_price ELSE null END) AS wed_sales,
                     sum(CASE WHEN (d_day_name='Thursday') THEN sales_price ELSE null END) AS thu_sales,
                     sum(CASE WHEN (d_day_name='Friday') THEN sales_price ELSE null END) AS fri_sales,
                     sum(CASE WHEN (d_day_name='Saturday') THEN sales_price ELSE null END) AS sat_sales
              FROM wscs, date_dim
              WHERE d_date_sk = sold_date_sk
              GROUP BY d_week_seq
            )
            SELECT d_week_seq1,
                   round(sun_sales1/sun_sales2, 2),
                   round(mon_sales1/mon_sales2, 2),
                   round(tue_sales1/tue_sales2, 2),
                   round(wed_sales1/wed_sales2, 2),
                   round(thu_sales1/thu_sales2, 2),
                   round(fri_sales1/fri_sales2, 2),
                   round(sat_sales1/sat_sales2, 2)
            FROM (
              SELECT wswscs.d_week_seq AS d_week_seq1,
                     sun_sales AS sun_sales1, mon_sales AS mon_sales1,
                     tue_sales AS tue_sales1, wed_sales AS wed_sales1,
                     thu_sales AS thu_sales1, fri_sales AS fri_sales1,
                     sat_sales AS sat_sales1
              FROM wswscs, date_dim
              WHERE date_dim.d_week_seq = wswscs.d_week_seq
                AND d_year = 2001
            ) y,
            (
              SELECT wswscs.d_week_seq AS d_week_seq2,
                     sun_sales AS sun_sales2, mon_sales AS mon_sales2,
                     tue_sales AS tue_sales2, wed_sales AS wed_sales2,
                     thu_sales AS thu_sales2, fri_sales AS fri_sales2,
                     sat_sales AS sat_sales2
              FROM wswscs, date_dim
              WHERE date_dim.d_week_seq = wswscs.d_week_seq
                AND d_year = 2001 + 1
            ) z
            WHERE d_week_seq1 = d_week_seq2 - 53
            ORDER BY d_week_seq1
        """,
        "Q3": """
            SELECT dt.d_year,
                   item.i_brand_id AS brand_id,
                   item.i_brand AS brand,
                   sum(ss_ext_sales_price) AS sum_agg
            FROM date_dim dt,
                 store_sales,
                 item
            WHERE dt.d_date_sk = store_sales.ss_sold_date_sk
              AND store_sales.ss_item_sk = item.i_item_sk
              AND item.i_manufact_id = 436
              AND dt.d_moy = 12
            GROUP BY dt.d_year,
                     item.i_brand,
                     item.i_brand_id
            ORDER BY dt.d_year,
                     sum_agg DESC,
                     brand_id
            LIMIT 100
        """,
        "Q7": """
            SELECT i_item_id,
                   avg(ss_quantity) AS agg1,
                   avg(ss_list_price) AS agg2,
                   avg(ss_coupon_amt) AS agg3,
                   avg(ss_sales_price) AS agg4
            FROM store_sales,
                 customer_demographics,
                 date_dim,
                 item,
                 promotion
            WHERE ss_sold_date_sk = d_date_sk
              AND ss_item_sk = i_item_sk
              AND ss_cdemo_sk = cd_demo_sk
              AND ss_promo_sk = p_promo_sk
              AND cd_gender = 'M'
              AND cd_marital_status = 'S'
              AND cd_education_status = 'College'
              AND (p_channel_email = 'N' OR p_channel_event = 'N')
              AND d_year = 2000
            GROUP BY i_item_id
            ORDER BY i_item_id
            LIMIT 100
        """,
        "Q19": """
            SELECT i_item_id,
                   i_brand,
                   i_category,
                   i_class,
                   i_manufact,
                   sum(ss_ext_sales_price) AS sales,
                   sum(ss_net_profit) AS profit
            FROM date_dim,
                 store_sales,
                 item,
                 customer,
                 store
            WHERE d_date_sk = ss_sold_date_sk
              AND i_item_sk = ss_item_sk
              AND d_year = 2000
              AND d_moy = 12
              AND c_customer_sk = ss_customer_sk
              AND s_store_sk = ss_store_sk
              AND i_manager_id = 9
            GROUP BY i_item_id,
                     i_brand,
                     i_category,
                     i_class,
                     i_manufact
            ORDER BY i_item_id,
                     i_brand,
                     i_category,
                     i_class,
                     i_manufact
            LIMIT 100
        """
    }

    total_start_time = time.time()

    for query_name, query_sql in queries.items():
        print(f"\nExecuting {query_name}...")
        query_start = time.time()

        # Execute query and force action using show()
        result_df = spark.sql(query_sql)
        result_df.show(5) # Show top 5 rows

        query_end = time.time()
        print(f"{query_name} completed in {query_end - query_start:.2f} seconds.")

    total_end_time = time.time()
    print(f"\nAll benchmark queries completed in {total_end_time - total_start_time:.2f} seconds.")

    spark.stop()

if __name__ == "__main__":
    main()
EOF

Copia lo script nel bucket Cloud Storage in modo che Serverless per Apache Spark possa accedervi:

gcloud storage cp benchmark.py gs://${BUCKET_NAME}/scripts/benchmark.py

4. Esegui il job Serverless di base

Per fornire un confronto di base senza Lightning Engine, invia il job di benchmarking PySpark che hai caricato in precedenza al livello Serverless per Apache Spark Standard. Passeremo il percorso del set di dati che hai copiato come argomento.

Esegui il comando seguente per eseguire il job batch:

gcloud dataproc batches submit pyspark \
    gs://${BUCKET_NAME}/scripts/benchmark.py \
    --region=${REGION} \
    --version=2.3 \
    --deps-bucket=gs://${BUCKET_NAME} \
    -- ${DATASET_PATH}

Monitora il job

Durante l'esecuzione del job, vedrai i log PySpark in streaming nel terminale Cloud Shell. Serverless per Apache Spark sta allocando i container, leggendo il set di dati Parquet TPC-DS da Cloud Storage ed eseguendo i piani SQL complessi.

Al termine dello script, osserva l'output della console. Dovresti vedere i risultati e i tempi per ogni query standard eseguita, simili a:

...
Executing Q1...
+-------------+
|c_customer_id|
+-------------+
...

Q1 completed in 18.52 seconds.
...

All benchmark queries completed in 110.94 seconds.

Prendi nota del numero totale di secondi necessari per il completamento. Questo è il runtime di base.

5. Esegui con Serverless Premium e Lightning Engine

Poi, eseguirai lo stesso job Spark su Serverless per Apache Spark, ma utilizzando il livello Premium e abilitando il motore di query vettorializzato nativo di Google: Lightning Engine.

Invia il job di benchmark a Serverless con Lightning Engine abilitato in modo esplicito:

gcloud dataproc batches submit pyspark \
    gs://${BUCKET_NAME}/scripts/benchmark.py \
    --region=${REGION} \
    --version=2.3 \
    --deps-bucket=gs://${BUCKET_NAME} \
    --properties="dataproc.tier=premium,spark.dataproc.lightningEngine.runtime=native" \
    -- ${DATASET_PATH}

Confronta i risultati

Attendi il completamento del job ed esamina l'output. Dovresti vedere gli stessi risultati della query. Esamina attentamente il tempo di completamento:

...
All benchmark queries completed in 64.24 seconds.

Confrontando l'esecuzione di base di Serverless con l'esecuzione di Serverless Lightning Engine, noterai che Lightning Engine esegue i raggruppamenti, le aggregazioni e i join più rapidamente utilizzando un livello di esecuzione C++ nativo e l'elaborazione vettorializzata sul backend, senza richiedere modifiche al codice dell'applicazione PySpark.

Lightning Engine è ottimizzato per aumentare le prestazioni quanto più grande è il workload. In questo esempio utilizziamo un set di dati di piccole dimensioni, quindi l'aumento delle prestazioni non è così drastico come potrebbe essere. Su un set di dati da 10 TB, i benchmark hanno dimostrato un miglioramento delle prestazioni fino a 4,3 volte rispetto a Spark open source.

6. Confronta i grafici di esecuzione nell'interfaccia utente di Spark

La riduzione del runtime è impressionante, ma esaminiamo sotto il cofano cosa fa effettivamente Spark durante l'esecuzione della query. Puoi farlo esaminando i grafici di esecuzione dell'interfaccia utente di Spark per entrambi i job.

  1. Apri la console Google Cloud nel browser.
  2. Vai a Dataproc > Batch.
  3. Nell'elenco vedrai due batch: l'esecuzione di base standard e l'esecuzione del livello Premium.
  4. Fai clic sul batch del livello Premium che hai eseguito, poi su Visualizza interfaccia utente di Spark e infine su Visualizza dettagli.
  5. Nell'interfaccia utente di Spark, vai alla scheda Job.
  6. In Job completati, nella casella di ricerca, digita Velox.
  7. Vedrai molte descrizioni dei job che includono VeloxSparkPlanExecApi. Si riferisce al motore di esecuzione nativo di Velox utilizzato da Lightning Engine.

Ora, ripeti questa procedura per l'esecuzione del livello Standard:

  1. Torna alla pagina Batch di Serverless per Apache Spark.
  2. Fai clic sul link per il batch del livello Standard, poi su Visualizza interfaccia utente di Spark e infine su Visualizza dettagli.
  3. Nell'interfaccia utente di Spark, vai alla scheda Job.
  4. In Job completati, nella casella di ricerca, digita Velox.
  5. Non vedrai alcun riferimento all'API Velox nelle descrizioni dei job.

7. Libera spazio

Per evitare addebiti continui sul tuo account Google Cloud, elimina le risorse create durante questo codelab.

In Cloud Shell, elimina il bucket Cloud Storage e i relativi contenuti:

gcloud storage rm -r gs://${BUCKET_NAME}

Elimina la copia locale di benchmark.py:

rm benchmark.py

8. Complimenti

Complimenti! Hai creato correttamente un ambiente di benchmarking per Apache Spark e hai confrontato Serverless per Apache Spark Standard con Serverless per Apache Spark Premium.

Hai visto in prima persona come l'abilitazione del nuovo Lightning Engine di Serverless per Apache Spark può ridurre il runtime del workload Spark e hai esplorato l'interfaccia utente di Spark per vedere come il grafico di esecuzione fisica viene trasformato in codice C++ nativo utilizzando il motore di query nativo.

Che cosa hai imparato

  • Come scrivere uno script di benchmarking del set di dati PySpark.
  • Come inviare job Spark a Serverless per Apache Spark.
  • Come abilitare Lightning Engine.
  • Come confrontare i piani dei job nell'interfaccia utente di Spark.

Passaggi successivi