Przyspieszanie Sparka za pomocą Serverless for Apache Spark i Lightning Engine

1. Wprowadzenie

W tym ćwiczeniu poznasz korzyści z wydajności, jakie daje natywny mechanizm wykonawczy Google Cloud Serverless for Apache Spark, czyli Lightning Engine, oraz dowiesz się, jak optymalizuje on zbiory zadań Spark w usłudze Serverless for Apache Spark.

Lightning Engine korzysta z Velox i Apache Gluten. Velox to wysokowydajny mechanizm C++ do przetwarzania danych. Apache Gluten to warstwa pośrednia odpowiedzialna za konwertowanie zadań Spark opartych na JVM na kod C++, który może być wykonywany przez Velox.

W tej wersji demonstracyjnej używamy TPC-DS, czyli standardowego w branży testu porównawczego, który służy do oceny wydajności systemów wspomagania decyzji. Prześlesz podstawowe zadanie PySpark, aby wysłać zapytanie do przykładowego zbioru danych TPC-DS przy użyciu standardowej warstwy bezserwerowej. Następnie uruchomisz dokładnie to samo zadanie przy użyciu warstwy Premium z włączonym Lightning Engine. Na koniec porównasz czas wykonania i przejdziesz do interfejsu Spark, aby zobaczyć różnicę na wykresach wykonania Spark z akceleracją sprzętową.

Szacowany koszt wykonania tego ćwiczenia jest mniejszy niż 1 USD, przy założeniu, że zasoby zostaną szybko zwolnione zgodnie z opisem w sekcji Zwalnianie miejsca.

Jakie zadania wykonasz

  • Utworzysz zasobnik Cloud Storage do przechowywania skryptów i wyników testów porównawczych.
  • Uruchomisz podstawowe zadanie przetwarzania danych PySpark przy użyciu standardowej warstwy Serverless for Apache Spark.
  • Uruchomisz to samo zadanie przy użyciu warstwy Premium Serverless for Apache Spark z Lightning Engine.
  • Porównasz dane środowiska wykonawczego.
  • Uruchomisz interfejs serwera historii Spark, aby porównać natywne wykresy wykonania fizycznego.

Czego potrzebujesz

  • Przeglądarka internetowa, np. Chrome.
  • Projekt Google Cloud z włączonymi płatnościami.
  • Podstawowa znajomość Apache Spark i wiersza poleceń Linuxa.

2. Zanim zaczniesz

Utwórz projekt Google Cloud

  1. W konsoli Google Cloud na stronie wyboru projektu wybierz lub utwórz projekt w chmurze Google Cloud.
  2. Sprawdź, czy w projekcie w chmurze włączone są płatności. Dowiedz się, jak sprawdzić, czy w projekcie są włączone płatności.

Uruchamianie Cloud Shell

Cloud Shell to środowisko wiersza poleceń działające w Google Cloud, które jest wstępnie załadowane niezbędnymi narzędziami.

  1. U góry konsoli Google Cloud kliknij Aktywuj Cloud Shell.
  2. Po połączeniu z Cloud Shell sprawdź uwierzytelnianie:
    gcloud auth list
    
  3. Potwierdź, że projekt jest skonfigurowany:
    gcloud config get project
    
  4. Jeśli projekt nie jest ustawiony zgodnie z oczekiwaniami, ustaw go:
    export PROJECT_ID=<YOUR_PROJECT_ID>
    gcloud config set project $PROJECT_ID
    

Włącz interfejsy API

Aby włączyć wszystkie interfejsy API wymagane w tym ćwiczeniu, uruchom to polecenie:

gcloud services enable \
    dataproc.googleapis.com \
    storage.googleapis.com \
    compute.googleapis.com

3. Przygotowanie środowiska

W tym kroku zainicjujesz zmienne środowiskowe i utworzysz zasobnik Cloud Storage. Ten zasobnik będzie zawierać skrypt PySpark, który prześlesz do obu warstw Serverless for Apache Spark.

Ustawianie zmiennych środowiskowych

Aby ustawić domyślne zmienne środowiskowe, uruchom w Cloud Shell te polecenia. Użyjemy regionu us-central1, ale możesz go zmienić.

export PROJECT_ID=$(gcloud config get-value project)
export REGION="us-central1"
export BUCKET_NAME="spark-benchmark-${PROJECT_ID}-${REGION}"

gcloud config set dataproc/region ${REGION}

Tworzenie zasobnika Cloud Storage

Utwórz zasobnik do przechowywania skryptów i logów:

gcloud storage buckets create gs://${BUCKET_NAME} \
    --uniform-bucket-level-access \
    --location=${REGION}

Kopiowanie zbioru danych TPC-DS do własnego zasobnika

W tym kroku skopiujesz zbiór danych TPC-DS z publicznego zasobnika do własnego zasobnika Cloud Storage. Dzięki temu zadania PySpark będą mogły odczytywać dane lokalnie z Twojego projektu.

Ustaw zmienne środowiskowe, aby wybrać rozmiar i typ zbioru danych:

export DATASET_TYPE="partitioned" # Options: partitioned, nonpartitioned
export DATASET_SIZE="1GB"         # Options: 1GB, 10GB, 100GB, 1000GB (1000GB not available for partitioned)

export SRC_PATH="gs://beam-tpcds/datasets/parquet/${DATASET_TYPE}/${DATASET_SIZE}"
export DATASET_PATH="gs://${BUCKET_NAME}/tpc-ds-dataset/${DATASET_TYPE}/${DATASET_SIZE}"

Skopiuj dane TPC-DS do własnego zasobnika:

gcloud storage cp -r ${SRC_PATH}/* ${DATASET_PATH}/

Tworzenie skryptu testu porównawczego PySpark

Użyjemy skryptu PySpark, który rejestruje standardowe tabele TPC-DS z Twojego zasobnika Cloud Storage i wykonuje 5 standardowych zapytań pochodzących z publicznego repozytorium Apache Spark. Skrypt przyjmuje ścieżkę do zbioru danych jako argument.

W Cloud Shell utwórz plik o nazwie benchmark.py. Aby wygenerować plik, możesz skopiować i wkleić to polecenie:

cat << 'EOF' > benchmark.py
import argparse
import sys
from pyspark.sql import SparkSession
import time

def main():
    parser = argparse.ArgumentParser(description='TPC-DS Benchmark')
    parser.add_argument('data_path', help='GCS base path for TPC-DS tables')
    args = parser.parse_args()

    base_path = args.data_path

    # Initialize Spark Session
    spark = SparkSession.builder \
        .appName("TPC-DS Benchmark") \
        .getOrCreate()

    print(f"Spark Session created. Registering TPC-DS tables from {base_path}...")

    # List of all 24 TPC-DS tables
    tables = [
        "call_center", "catalog_page", "catalog_returns", "catalog_sales",
        "customer", "customer_address", "customer_demographics", "date_dim",
        "household_demographics", "income_band", "inventory", "item",
        "promotion", "reason", "ship_mode", "store", "store_returns",
        "store_sales", "time_dim", "warehouse", "web_page", "web_returns",
        "web_sales", "web_site"
    ]

    # Register each table as a temporary view
    # For this subset of queries, not every table is used
    for table in tables:
        path = f"{base_path}/{table}"
        try:
            df = spark.read.parquet(path)
            df.createOrReplaceTempView(table)
        except Exception as e:
            print(f"Warning: Could not load table {table} from {path}. Error: {e}")

    print("Tables registered successfully. Starting benchmark queries from Apache Spark test suite...")

    # Standard TPC-DS Queries sourced from Apache Spark public repository:
    # https://github.com/apache/spark/tree/master/sql/core/src/test/resources/tpcds
    queries = {
        "Q1": """
            WITH customer_total_return AS (
              SELECT sr_customer_sk AS ctr_customer_sk,
                     sr_store_sk AS ctr_store_sk,
                     sum(sr_return_amt) AS ctr_total_return
              FROM store_returns, date_dim
              WHERE sr_returned_date_sk = d_date_sk
                AND d_year = 2000
              GROUP BY sr_customer_sk, sr_store_sk
            )
            SELECT c_customer_id
            FROM customer_total_return ctr1, store, customer
            WHERE ctr1.ctr_total_return > (
              SELECT avg(ctr_total_return) * 1.2
              FROM customer_total_return ctr2
              WHERE ctr1.ctr_store_sk = ctr2.ctr_store_sk
            )
              AND s_store_sk = ctr1.ctr_store_sk
              AND s_state = 'TN'
              AND ctr1.ctr_customer_sk = c_customer_sk
            ORDER BY c_customer_id
            LIMIT 100
        """,
        "Q2": """
            WITH wscs AS (
              SELECT sold_date_sk, sales_price
              FROM (
                SELECT ws_sold_date_sk AS sold_date_sk, ws_ext_sales_price AS sales_price
                FROM web_sales
                UNION ALL
                SELECT cs_sold_date_sk AS sold_date_sk, cs_ext_sales_price AS sales_price
                FROM catalog_sales
              )
            ),
            wswscs AS (
              SELECT d_week_seq,
                     sum(CASE WHEN (d_day_name='Sunday') THEN sales_price ELSE null END) AS sun_sales,
                     sum(CASE WHEN (d_day_name='Monday') THEN sales_price ELSE null END) AS mon_sales,
                     sum(CASE WHEN (d_day_name='Tuesday') THEN sales_price ELSE null END) AS tue_sales,
                     sum(CASE WHEN (d_day_name='Wednesday') THEN sales_price ELSE null END) AS wed_sales,
                     sum(CASE WHEN (d_day_name='Thursday') THEN sales_price ELSE null END) AS thu_sales,
                     sum(CASE WHEN (d_day_name='Friday') THEN sales_price ELSE null END) AS fri_sales,
                     sum(CASE WHEN (d_day_name='Saturday') THEN sales_price ELSE null END) AS sat_sales
              FROM wscs, date_dim
              WHERE d_date_sk = sold_date_sk
              GROUP BY d_week_seq
            )
            SELECT d_week_seq1,
                   round(sun_sales1/sun_sales2, 2),
                   round(mon_sales1/mon_sales2, 2),
                   round(tue_sales1/tue_sales2, 2),
                   round(wed_sales1/wed_sales2, 2),
                   round(thu_sales1/thu_sales2, 2),
                   round(fri_sales1/fri_sales2, 2),
                   round(sat_sales1/sat_sales2, 2)
            FROM (
              SELECT wswscs.d_week_seq AS d_week_seq1,
                     sun_sales AS sun_sales1, mon_sales AS mon_sales1,
                     tue_sales AS tue_sales1, wed_sales AS wed_sales1,
                     thu_sales AS thu_sales1, fri_sales AS fri_sales1,
                     sat_sales AS sat_sales1
              FROM wswscs, date_dim
              WHERE date_dim.d_week_seq = wswscs.d_week_seq
                AND d_year = 2001
            ) y,
            (
              SELECT wswscs.d_week_seq AS d_week_seq2,
                     sun_sales AS sun_sales2, mon_sales AS mon_sales2,
                     tue_sales AS tue_sales2, wed_sales AS wed_sales2,
                     thu_sales AS thu_sales2, fri_sales AS fri_sales2,
                     sat_sales AS sat_sales2
              FROM wswscs, date_dim
              WHERE date_dim.d_week_seq = wswscs.d_week_seq
                AND d_year = 2001 + 1
            ) z
            WHERE d_week_seq1 = d_week_seq2 - 53
            ORDER BY d_week_seq1
        """,
        "Q3": """
            SELECT dt.d_year,
                   item.i_brand_id AS brand_id,
                   item.i_brand AS brand,
                   sum(ss_ext_sales_price) AS sum_agg
            FROM date_dim dt,
                 store_sales,
                 item
            WHERE dt.d_date_sk = store_sales.ss_sold_date_sk
              AND store_sales.ss_item_sk = item.i_item_sk
              AND item.i_manufact_id = 436
              AND dt.d_moy = 12
            GROUP BY dt.d_year,
                     item.i_brand,
                     item.i_brand_id
            ORDER BY dt.d_year,
                     sum_agg DESC,
                     brand_id
            LIMIT 100
        """,
        "Q7": """
            SELECT i_item_id,
                   avg(ss_quantity) AS agg1,
                   avg(ss_list_price) AS agg2,
                   avg(ss_coupon_amt) AS agg3,
                   avg(ss_sales_price) AS agg4
            FROM store_sales,
                 customer_demographics,
                 date_dim,
                 item,
                 promotion
            WHERE ss_sold_date_sk = d_date_sk
              AND ss_item_sk = i_item_sk
              AND ss_cdemo_sk = cd_demo_sk
              AND ss_promo_sk = p_promo_sk
              AND cd_gender = 'M'
              AND cd_marital_status = 'S'
              AND cd_education_status = 'College'
              AND (p_channel_email = 'N' OR p_channel_event = 'N')
              AND d_year = 2000
            GROUP BY i_item_id
            ORDER BY i_item_id
            LIMIT 100
        """,
        "Q19": """
            SELECT i_item_id,
                   i_brand,
                   i_category,
                   i_class,
                   i_manufact,
                   sum(ss_ext_sales_price) AS sales,
                   sum(ss_net_profit) AS profit
            FROM date_dim,
                 store_sales,
                 item,
                 customer,
                 store
            WHERE d_date_sk = ss_sold_date_sk
              AND i_item_sk = ss_item_sk
              AND d_year = 2000
              AND d_moy = 12
              AND c_customer_sk = ss_customer_sk
              AND s_store_sk = ss_store_sk
              AND i_manager_id = 9
            GROUP BY i_item_id,
                     i_brand,
                     i_category,
                     i_class,
                     i_manufact
            ORDER BY i_item_id,
                     i_brand,
                     i_category,
                     i_class,
                     i_manufact
            LIMIT 100
        """
    }

    total_start_time = time.time()

    for query_name, query_sql in queries.items():
        print(f"\nExecuting {query_name}...")
        query_start = time.time()

        # Execute query and force action using show()
        result_df = spark.sql(query_sql)
        result_df.show(5) # Show top 5 rows

        query_end = time.time()
        print(f"{query_name} completed in {query_end - query_start:.2f} seconds.")

    total_end_time = time.time()
    print(f"\nAll benchmark queries completed in {total_end_time - total_start_time:.2f} seconds.")

    spark.stop()

if __name__ == "__main__":
    main()
EOF

Skopiuj skrypt do zasobnika Cloud Storage, aby usługa Serverless for Apache Spark mogła uzyskać do niego dostęp:

gcloud storage cp benchmark.py gs://${BUCKET_NAME}/scripts/benchmark.py

4. Uruchamianie podstawowego zadania bezserwerowego

Aby zapewnić podstawowe porównanie bez Lightning Engine, prześlij zadanie testu porównawczego PySpark, które zostało wcześniej przesłane do standardowej warstwy Serverless for Apache Spark. Jako argument przekażemy ścieżkę do skopiowanego zbioru danych.

Aby uruchomić zadanie wsadowe, uruchom następujące polecenie:

gcloud dataproc batches submit pyspark \
    gs://${BUCKET_NAME}/scripts/benchmark.py \
    --region=${REGION} \
    --version=2.3 \
    --deps-bucket=gs://${BUCKET_NAME} \
    -- ${DATASET_PATH}

Monitorowanie zadania

Podczas wykonywania zadania w terminalu Cloud Shell zobaczysz strumieniowe przesyłanie logów PySpark. Serverless for Apache Spark przydziela kontenery, odczytuje zbiór danych TPC-DS Parquet z Cloud Storage i wykonuje złożone plany SQL.

Po zakończeniu skryptu sprawdź dane wyjściowe w konsoli. Powinny się wyświetlić wyniki i czasy każdego wykonanego standardowego zapytania, podobnie jak w tym przykładzie:

...
Executing Q1...
+-------------+
|c_customer_id|
+-------------+
...

Q1 completed in 18.52 seconds.
...

All benchmark queries completed in 110.94 seconds.

Zanotuj łączny czas wykonania w sekundach. To jest podstawowy czas działania.

5. Uruchamianie z Serverless Premium i Lightning Engine

Następnie uruchomisz dokładnie to samo zadanie Spark w usłudze Serverless for Apache Spark, ale przy użyciu warstwy Premium i włączeniu natywnego, wektorowego mechanizmu zapytań Google: Lightning Engine.

Prześlij zadanie testu porównawczego do Serverless z wyraźnie włączonym Lightning Engine:

gcloud dataproc batches submit pyspark \
    gs://${BUCKET_NAME}/scripts/benchmark.py \
    --region=${REGION} \
    --version=2.3 \
    --deps-bucket=gs://${BUCKET_NAME} \
    --properties="dataproc.tier=premium,spark.dataproc.lightningEngine.runtime=native" \
    -- ${DATASET_PATH}

Porównywanie wyników

Poczekaj na zakończenie zadania i sprawdź dane wyjściowe. Powinny się wyświetlić te same wyniki zapytania. Przyjrzyj się dokładnie czasowi ukończenia:

...
All benchmark queries completed in 64.24 seconds.

Porównując podstawowe uruchomienie bezserwerowe z uruchomieniem bezserwerowym Lightning Engine, zauważysz, że Lightning Engine szybciej wykonuje grupowanie, agregacje i łączenia, korzystając z natywnej warstwy wykonawczej C++ i przetwarzania wektorowego w backendzie, bez konieczności wprowadzania jakichkolwiek zmian w kodzie aplikacji PySpark.

Lightning Engine jest zoptymalizowany pod kątem zwiększania wydajności wraz ze wzrostem obciążenia. W tym przykładzie używamy małego zbioru danych, więc wzrost wydajności nie jest tak duży, jak mógłby być. W przypadku zbioru danych o rozmiarze 10 TB testy porównawcze wykazały nawet 4, 3-krotny wzrost wydajności w porównaniu z Sparkiem typu open source.

6. Porównywanie wykresów wykonania w interfejsie Spark

Skrócenie czasu działania jest imponujące, ale przyjrzyjmy się pod maską , co Spark faktycznie robi podczas wykonywania zapytania. Możesz to zrobić, sprawdzając wykresy wykonania w interfejsie Spark dla obu zadań.

  1. W przeglądarce otwórz konsolę Google Cloud.
  2. Otwórz Dataproc > Grupy.
  3. Na liście zobaczysz 2 grupy: podstawowe uruchomienie standardowe i uruchomienie w warstwie Premium.
  4. Kliknij uruchomioną grupę w warstwie Premium, a potem kliknij kolejno Wyświetl interfejs usługi Spark i Wyświetl szczegóły.
  5. W interfejsie Spark otwórz kartę Zadania.
  6. W sekcji Ukończone zadania w polu wyszukiwania wpisz Velox.
  7. Zobaczysz wiele opisów zadań, które zawierają VeloxSparkPlanExecApi. Odwołuje się to do natywnego mechanizmu wykonawczego Velox używanego przez Lightning Engine.

Teraz powtórz ten proces w przypadku uruchomienia w warstwie Standard:

  1. Wróć na stronę Grupy w Serverless for Apache Spark.
  2. Kliknij link do grupy Standard, a potem kliknij kolejno Wyświetl interfejs usługi Spark i Wyświetl szczegóły.
  3. W interfejsie Spark otwórz kartę Zadania.
  4. W sekcji Ukończone zadania w polu wyszukiwania wpisz Velox.
  5. W opisach zadań nie zobaczysz żadnej wzmianki o interfejsie Velox API.

7. Zwalnianie miejsca

Aby uniknąć obciążenia konta Google Cloud bieżącymi opłatami, usuń zasoby utworzone podczas tego ćwiczenia.

W Cloud Shell usuń zasobnik Cloud Storage i jego zawartość:

gcloud storage rm -r gs://${BUCKET_NAME}

Usuń lokalną kopię pliku benchmark.py:

rm benchmark.py

8. Gratulacje

Gratulacje! Udało Ci się utworzyć środowisko testów porównawczych dla Apache Spark i porównać Serverless for Apache Spark Standard z Serverless for Apache Spark Premium.

Na własne oczy przekonałeś się, jak włączenie nowego Lightning Engine w Serverless for Apache Spark może skrócić czas działania zbioru zadań Spark, oraz sprawdziłeś interfejs Spark, aby zobaczyć, jak wykres wykonania fizycznego jest przekształcany w natywny kod C++ za pomocą natywnego mechanizmu zapytań.

Czego się dowiesz

  • Jak napisać skrypt testu porównawczego zbioru danych PySpark.
  • Jak przesyłać zadania Spark do Serverless for Apache Spark.
  • Jak włączyć Lightning Engine.
  • Jak porównywać plany zadań w interfejsie Spark.

Dalsze kroki