Spark mit Managed Service for Apache Spark und Lightning Engine beschleunigen

1. Einführung

In diesem Codelab erfahren Sie mehr über die Leistungssteigerungen, die durch die native Ausführungs-Engine Lightning Engine von Managed Service for Apache Spark erzielt werden.Außerdem wird erläutert, wie Ihre Spark-Arbeitslasten in Managed Apache Spark Serverless bis zu 4, 9-mal schneller optimiert werden.

Lightning Engine verwendet Velox und Apache Gluten. Velox ist eine leistungsstarke C++-Engine für die Datenverarbeitung. Apache Gluten ist eine Zwischenschicht, die für die Konvertierung von JVM-basierten Spark-Jobs in C++-Code verantwortlich ist, der von Velox ausgeführt werden kann.

In dieser Demo wird TPC-DS verwendet, eine branchenübliche Benchmark zur Bewertung der Leistung von Decision Support Systems. Sie senden einen PySpark-Baseline-Job, um ein TPC-DS-Beispieldataset mit dem Standard-Serverless-Tarif abzufragen. Anschließend führen Sie denselben Job mit dem Premium-Tarif und aktivierter Lightning Engine aus. Schließlich vergleichen Sie die Ausführungszeit und sehen sich die Spark-Benutzeroberfläche an, um den Unterschied in den Diagrammen für die hardwarebeschleunigte Spark-Ausführung zu visualisieren.

Die geschätzten Kosten für die Ausführung dieses Codelabs betragen weniger als 1,00 $, vorausgesetzt, die Ressourcen werden wie im Abschnitt Bereinigen beschrieben umgehend bereinigt.

Aufgaben

  • Cloud Storage-Bucket zum Speichern Ihrer Benchmark-Skripts und -Ergebnisse erstellen
  • Ausführen eines PySpark-Standardjobs zur Datenverarbeitung mit dem serverlosen Standard-Tier für verwaltetes Apache Spark
  • Diesen Job mit dem Managed Apache Spark-Serverless-Premium-Tarif mit Lightning Engine ausführen
  • Laufzeitmesswerte vergleichen
  • Starten Sie die Benutzeroberfläche des Spark-Verlaufsservers, um die nativen physischen Ausführungsgrafiken zu vergleichen.

Voraussetzungen

2. Hinweis

Google Cloud-Projekt erstellen

  1. Wählen Sie in der Google Cloud Console auf der Seite der Projektauswahl ein Google Cloud-Projekt aus oder erstellen Sie eines.
  2. Die Abrechnung für das Cloud-Projekt muss aktiviert sein. So prüfen Sie, ob die Abrechnung für ein Projekt aktiviert ist.

Cloud Shell starten

Cloud Shell ist eine Befehlszeilenumgebung, die in Google Cloud ausgeführt wird und mit den erforderlichen Tools vorinstalliert ist.

  1. Klicken Sie oben in der Google Cloud Console auf Cloud Shell aktivieren.
  2. Prüfen Sie nach der Verbindung mit Cloud Shell Ihre Authentifizierung:
    gcloud auth list
    
  3. Prüfen Sie, ob Ihr Projekt konfiguriert ist:
    gcloud config get project
    
  4. Wenn Ihr Projekt nicht wie erwartet festgelegt ist, legen Sie es fest:
    export PROJECT_ID=<YOUR_PROJECT_ID>
    gcloud config set project $PROJECT_ID
    

APIs aktivieren

Führen Sie diesen Befehl aus, um alle für dieses Codelab erforderlichen APIs zu aktivieren:

gcloud services enable \
    dataproc.googleapis.com \
    storage.googleapis.com \
    compute.googleapis.com

3. Umgebung vorbereiten

In diesem Schritt initialisieren Sie Umgebungsvariablen und erstellen einen Cloud Storage-Bucket. In diesem Bucket wird das PySpark-Script gespeichert, das Sie an beide Serverless for Apache Spark-Stufen senden.

Umgebungsvariablen festlegen

Führen Sie die folgenden Befehle in Cloud Shell aus, um Standardumgebungsvariablen festzulegen. Wir verwenden die Region us-central1. Sie können diese Einstellung jedoch ändern.

export PROJECT_ID=$(gcloud config get-value project)
export REGION="us-central1"
export BUCKET_NAME="spark-benchmark-${PROJECT_ID}-${REGION}"

gcloud config set dataproc/region ${REGION}

Cloud Storage-Bucket erstellen

Erstellen Sie den Bucket für Ihre Skripts und Logs:

gcloud storage buckets create gs://${BUCKET_NAME} \
    --uniform-bucket-level-access \
    --location=${REGION}

TPC-DS-Dataset in Ihren eigenen Bucket kopieren

In diesem Schritt kopieren Sie das TPC-DS-Dataset aus einem öffentlichen Bucket in Ihren eigenen Cloud Storage-Bucket. So können Ihre PySpark-Jobs Daten lokal aus Ihrem Projekt lesen.

Legen Sie Umgebungsvariablen fest, um die Dataset-Größe und den Dataset-Typ auszuwählen:

export DATASET_TYPE="partitioned" # Options: partitioned, nonpartitioned
export DATASET_SIZE="1GB"         # Options: 1GB, 10GB, 100GB, 1000GB (1000GB not available for partitioned)

export SRC_PATH="gs://beam-tpcds/datasets/parquet/${DATASET_TYPE}/${DATASET_SIZE}"
export DATASET_PATH="gs://${BUCKET_NAME}/tpc-ds-dataset/${DATASET_TYPE}/${DATASET_SIZE}"

Kopieren Sie die TPC-DS-Daten in Ihren eigenen Bucket:

gcloud storage cp -r ${SRC_PATH}/* ${DATASET_PATH}/

PySpark-Benchmark-Script erstellen

Wir verwenden ein PySpark-Skript, mit dem die standardmäßigen TPC-DS-Tabellen aus Ihrem Cloud Storage-Bucket registriert und fünf Standardabfragen aus dem öffentlichen Apache Spark-Repository ausgeführt werden. Das Skript akzeptiert den Pfad zu Ihrem Dataset als Argument.

Erstellen Sie in Cloud Shell eine Datei mit dem Namen benchmark.py. Sie können den folgenden Befehl kopieren und einfügen, um die Datei zu generieren:

cat << 'EOF' > benchmark.py
import argparse
import sys
from pyspark.sql import SparkSession
import time

def main():
    parser = argparse.ArgumentParser(description='TPC-DS Benchmark')
    parser.add_argument('data_path', help='GCS base path for TPC-DS tables')
    args = parser.parse_args()

    base_path = args.data_path

    # Initialize Spark Session
    spark = SparkSession.builder \
        .appName("TPC-DS Benchmark") \
        .getOrCreate()

    print(f"Spark Session created. Registering TPC-DS tables from {base_path}...")

    # List of all 24 TPC-DS tables
    tables = [
        "call_center", "catalog_page", "catalog_returns", "catalog_sales",
        "customer", "customer_address", "customer_demographics", "date_dim",
        "household_demographics", "income_band", "inventory", "item",
        "promotion", "reason", "ship_mode", "store", "store_returns",
        "store_sales", "time_dim", "warehouse", "web_page", "web_returns",
        "web_sales", "web_site"
    ]

    # Register each table as a temporary view
    # For this subset of queries, not every table is used
    for table in tables:
        path = f"{base_path}/{table}"
        try:
            df = spark.read.parquet(path)
            df.createOrReplaceTempView(table)
        except Exception as e:
            print(f"Warning: Could not load table {table} from {path}. Error: {e}")

    print("Tables registered successfully. Starting benchmark queries from Apache Spark test suite...")

    # Standard TPC-DS Queries sourced from Apache Spark public repository:
    # https://github.com/apache/spark/tree/master/sql/core/src/test/resources/tpcds
    queries = {
        "Q1": """
            WITH customer_total_return AS (
              SELECT sr_customer_sk AS ctr_customer_sk,
                     sr_store_sk AS ctr_store_sk,
                     sum(sr_return_amt) AS ctr_total_return
              FROM store_returns, date_dim
              WHERE sr_returned_date_sk = d_date_sk
                AND d_year = 2000
              GROUP BY sr_customer_sk, sr_store_sk
            )
            SELECT c_customer_id
            FROM customer_total_return ctr1, store, customer
            WHERE ctr1.ctr_total_return > (
              SELECT avg(ctr_total_return) * 1.2
              FROM customer_total_return ctr2
              WHERE ctr1.ctr_store_sk = ctr2.ctr_store_sk
            )
              AND s_store_sk = ctr1.ctr_store_sk
              AND s_state = 'TN'
              AND ctr1.ctr_customer_sk = c_customer_sk
            ORDER BY c_customer_id
            LIMIT 100
        """,
        "Q2": """
            WITH wscs AS (
              SELECT sold_date_sk, sales_price
              FROM (
                SELECT ws_sold_date_sk AS sold_date_sk, ws_ext_sales_price AS sales_price
                FROM web_sales
                UNION ALL
                SELECT cs_sold_date_sk AS sold_date_sk, cs_ext_sales_price AS sales_price
                FROM catalog_sales
              )
            ),
            wswscs AS (
              SELECT d_week_seq,
                     sum(CASE WHEN (d_day_name='Sunday') THEN sales_price ELSE null END) AS sun_sales,
                     sum(CASE WHEN (d_day_name='Monday') THEN sales_price ELSE null END) AS mon_sales,
                     sum(CASE WHEN (d_day_name='Tuesday') THEN sales_price ELSE null END) AS tue_sales,
                     sum(CASE WHEN (d_day_name='Wednesday') THEN sales_price ELSE null END) AS wed_sales,
                     sum(CASE WHEN (d_day_name='Thursday') THEN sales_price ELSE null END) AS thu_sales,
                     sum(CASE WHEN (d_day_name='Friday') THEN sales_price ELSE null END) AS fri_sales,
                     sum(CASE WHEN (d_day_name='Saturday') THEN sales_price ELSE null END) AS sat_sales
              FROM wscs, date_dim
              WHERE d_date_sk = sold_date_sk
              GROUP BY d_week_seq
            )
            SELECT d_week_seq1,
                   round(sun_sales1/sun_sales2, 2),
                   round(mon_sales1/mon_sales2, 2),
                   round(tue_sales1/tue_sales2, 2),
                   round(wed_sales1/wed_sales2, 2),
                   round(thu_sales1/thu_sales2, 2),
                   round(fri_sales1/fri_sales2, 2),
                   round(sat_sales1/sat_sales2, 2)
            FROM (
              SELECT wswscs.d_week_seq AS d_week_seq1,
                     sun_sales AS sun_sales1, mon_sales AS mon_sales1,
                     tue_sales AS tue_sales1, wed_sales AS wed_sales1,
                     thu_sales AS thu_sales1, fri_sales AS fri_sales1,
                     sat_sales AS sat_sales1
              FROM wswscs, date_dim
              WHERE date_dim.d_week_seq = wswscs.d_week_seq
                AND d_year = 2001
            ) y,
            (
              SELECT wswscs.d_week_seq AS d_week_seq2,
                     sun_sales AS sun_sales2, mon_sales AS mon_sales2,
                     tue_sales AS tue_sales2, wed_sales AS wed_sales2,
                     thu_sales AS thu_sales2, fri_sales AS fri_sales2,
                     sat_sales AS sat_sales2
              FROM wswscs, date_dim
              WHERE date_dim.d_week_seq = wswscs.d_week_seq
                AND d_year = 2001 + 1
            ) z
            WHERE d_week_seq1 = d_week_seq2 - 53
            ORDER BY d_week_seq1
        """,
        "Q3": """
            SELECT dt.d_year,
                   item.i_brand_id AS brand_id,
                   item.i_brand AS brand,
                   sum(ss_ext_sales_price) AS sum_agg
            FROM date_dim dt,
                 store_sales,
                 item
            WHERE dt.d_date_sk = store_sales.ss_sold_date_sk
              AND store_sales.ss_item_sk = item.i_item_sk
              AND item.i_manufact_id = 436
              AND dt.d_moy = 12
            GROUP BY dt.d_year,
                     item.i_brand,
                     item.i_brand_id
            ORDER BY dt.d_year,
                     sum_agg DESC,
                     brand_id
            LIMIT 100
        """,
        "Q7": """
            SELECT i_item_id,
                   avg(ss_quantity) AS agg1,
                   avg(ss_list_price) AS agg2,
                   avg(ss_coupon_amt) AS agg3,
                   avg(ss_sales_price) AS agg4
            FROM store_sales,
                 customer_demographics,
                 date_dim,
                 item,
                 promotion
            WHERE ss_sold_date_sk = d_date_sk
              AND ss_item_sk = i_item_sk
              AND ss_cdemo_sk = cd_demo_sk
              AND ss_promo_sk = p_promo_sk
              AND cd_gender = 'M'
              AND cd_marital_status = 'S'
              AND cd_education_status = 'College'
              AND (p_channel_email = 'N' OR p_channel_event = 'N')
              AND d_year = 2000
            GROUP BY i_item_id
            ORDER BY i_item_id
            LIMIT 100
        """,
        "Q19": """
            SELECT i_item_id,
                   i_brand,
                   i_category,
                   i_class,
                   i_manufact,
                   sum(ss_ext_sales_price) AS sales,
                   sum(ss_net_profit) AS profit
            FROM date_dim,
                 store_sales,
                 item,
                 customer,
                 store
            WHERE d_date_sk = ss_sold_date_sk
              AND i_item_sk = ss_item_sk
              AND d_year = 2000
              AND d_moy = 12
              AND c_customer_sk = ss_customer_sk
              AND s_store_sk = ss_store_sk
              AND i_manager_id = 9
            GROUP BY i_item_id,
                     i_brand,
                     i_category,
                     i_class,
                     i_manufact
            ORDER BY i_item_id,
                     i_brand,
                     i_category,
                     i_class,
                     i_manufact
            LIMIT 100
        """
    }

    total_start_time = time.time()

    for query_name, query_sql in queries.items():
        print(f"\nExecuting {query_name}...")
        query_start = time.time()

        # Execute query and force action using show()
        result_df = spark.sql(query_sql)
        result_df.show(5) # Show top 5 rows

        query_end = time.time()
        print(f"{query_name} completed in {query_end - query_start:.2f} seconds.")

    total_end_time = time.time()
    print(f"\nAll benchmark queries completed in {total_end_time - total_start_time:.2f} seconds.")

    spark.stop()

if __name__ == "__main__":
    main()
EOF

Kopieren Sie das Skript in Ihren Cloud Storage-Bucket, damit Serverless for Apache Spark darauf zugreifen kann:

gcloud storage cp benchmark.py gs://${BUCKET_NAME}/scripts/benchmark.py

4. Serverlosen Referenzjob ausführen

Um einen Baseline-Vergleich ohne Lightning Engine zu ermöglichen, senden Sie den PySpark-Benchmarking-Job, den Sie zuvor hochgeladen haben, an die Standard-Stufe von Serverless for Apache Spark. Wir übergeben den Pfad zum kopierten Dataset als Argument.

Führen Sie den folgenden Befehl aus, um den Batchjob auszuführen:

gcloud dataproc batches submit pyspark \
    gs://${BUCKET_NAME}/scripts/benchmark.py \
    --region=${REGION} \
    --version=2.3 \
    --deps-bucket=gs://${BUCKET_NAME} \
    -- ${DATASET_PATH}

Job überwachen

Während der Job ausgeführt wird, werden PySpark-Logs in Ihrem Cloud Shell-Terminal gestreamt. Serverless for Apache Spark weist Container zu, liest den TPC-DS-Parquet-Datensatz aus Cloud Storage und führt die komplexen SQL-Pläne aus.

Sehen Sie sich nach Abschluss des Skripts die Konsolenausgabe an. Sie sollten Ergebnisse und Zeitangaben für jede ausgeführte Standardabfrage sehen, etwa so:

...
Executing Q1...
+-------------+
|c_customer_id|
+-------------+
...

Q1 completed in 18.52 seconds.
...

All benchmark queries completed in 110.94 seconds.

Notieren Sie sich die Gesamtdauer in Sekunden. Das ist deine Baseline-Laufzeit.

5. Mit Serverless Premium und Lightning Engine ausführen

Als Nächstes führen Sie denselben Spark-Job in Managed Apache Spark Serverless aus, aber mit dem Premium-Tarif und der nativen, vektorisierten Abfrage-Engine von Google: Lightning Engine.

Senden Sie den Benchmark-Job an Serverless mit explizit aktivierter Lightning Engine:

gcloud dataproc batches submit pyspark \
    gs://${BUCKET_NAME}/scripts/benchmark.py \
    --region=${REGION} \
    --version=2.3 \
    --deps-bucket=gs://${BUCKET_NAME} \
    --properties="dataproc.tier=premium,spark.dataproc.lightningEngine.runtime=native" \
    -- ${DATASET_PATH}

Ergebnisse vergleichen

Warten Sie, bis der Job abgeschlossen ist, und sehen Sie sich die Ausgabe an. Sie sollten dieselben Abfrageergebnisse sehen. Sehen Sie sich die Fertigstellungszeit genau an:

...
All benchmark queries completed in 64.24 seconds.

Wenn Sie den Baseline-Job mit dem Lightning Engine-Job vergleichen, werden Sie feststellen, dass Lightning Engine die Gruppierung, Aggregationen und Joins schneller ausführt. Dazu werden eine native C++-Ausführungsebene und die vektorisierte Verarbeitung im Backend verwendet, ohne dass Änderungen an Ihrem PySpark-Anwendungscode erforderlich sind.

Die Lightning Engine ist für eine höhere Leistung optimiert, je größer die Arbeitslast ist. In diesem Beispiel verwenden wir einen kleinen Datensatz. Die Leistungssteigerung ist daher nicht so deutlich wie möglich. Bei einem 10‑TB-Dataset wurde in Benchmarks eine bis zu 4, 9‑mal höhere Leistung im Vergleich zu Open-Source-Spark erzielt.

6. Ausführungsgrafiken in der Spark-UI vergleichen

Die Laufzeitverkürzung ist beeindruckend. Sehen wir uns aber unter der Haube an, was Spark während der Abfrageausführung tatsächlich tut. Dazu können Sie die Ausführungsgrafiken der Spark-UI für beide Jobs untersuchen.

  1. Öffnen Sie die Google Cloud Console in Ihrem Browser.
  2. Gehen Sie zu Verwaltetes Apache Spark > Batches.
  3. In der Liste werden zwei Batches angezeigt: Ihr Standard-Baseline-Lauf und Ihr Premium-Stufen-Lauf.
  4. Klicken Sie auf den Premium-Batch, den Sie ausgeführt haben, dann auf Spark-UI ansehen und anschließend auf Details ansehen.
  5. Rufen Sie in der Spark-UI den Tab Jobs auf.
  6. Geben Sie unter Abgeschlossene Aufträge im Suchfeld Velox ein.
  7. In vielen Stellenbeschreibungen wird VeloxSparkPlanExecApi erwähnt. Dies bezieht sich auf die native Velox-Ausführungs-Engine, die von Lightning Engine verwendet wird.

Wiederholen Sie diesen Vorgang nun für den Lauf auf Standard-Ebene:

  1. Kehren Sie zur Seite „Serverless for Apache Spark-Batches“ zurück.
  2. Klicken Sie auf den Link für den Batch Standard-Tarif, dann auf Spark-UI ansehen und anschließend auf Details ansehen.
  3. Rufen Sie in der Spark-UI den Tab Jobs auf.
  4. Geben Sie unter Abgeschlossene Aufträge im Suchfeld Velox ein.
  5. Die Velox API wird in den Jobbeschreibungen nicht erwähnt.

7. Bereinigen

Löschen Sie die in diesem Codelab erstellten Ressourcen, um laufende Gebühren für Ihr Google Cloud-Konto zu vermeiden.

Löschen Sie in Cloud Shell den Cloud Storage-Bucket und seinen Inhalt:

gcloud storage rm -r gs://${BUCKET_NAME}

Löschen Sie die lokale Kopie von benchmark.py:

rm benchmark.py

8. Glückwunsch

Glückwunsch! Sie haben erfolgreich eine Benchmarking-Umgebung für Apache Spark erstellt und Managed Apache Spark Serverless Standard mit Managed Apache Spark Serverless Premium verglichen.

Sie haben gesehen, wie sich die Laufzeit Ihrer Spark-Arbeitslast durch Aktivieren der neuen Lightning Engine von Managed Apache Spark Serverless verkürzen lässt. Außerdem haben Sie die Spark-UI verwendet, um zu sehen, wie der physische Ausführungsgraph mit der Native Query Engine in nativen C++-Code umgewandelt wird.

Das haben Sie gelernt

  • So schreiben Sie ein PySpark-Dataset-Benchmarking-Script.
  • Spark-Jobs an Managed Apache Spark Serverless senden
  • So aktivieren Sie die Lightning Engine.
  • So vergleichen Sie Job-Pläne in der Spark-UI.

Nächste Schritte