Serverless for Apache Spark ve Lightning Engine ile Spark'ı hızlandırma

1. Giriş

Bu codelab'de, Google Cloud Serverless for Apache Spark'ın yerel yürütme motoru Lightning Engine'in performans avantajlarını keşfedecek ve Serverless for Apache Spark'taki Spark iş yüklerinizi nasıl optimize ettiğini inceleyeceksiniz.

Lightning Engine, Velox ve Apache Gluten'i kullanır. Velox, veri işleme için yüksek performanslı bir C++ motorudur. Apache Gluten, JVM tabanlı Spark işlerini Velox tarafından yürütülebilen C++ koduna dönüştürmekten sorumlu bir ara katmandır.

Bu demoda, karar destek sistemlerinin performansını değerlendirmek için tasarlanmış bir endüstri standardı kıyaslama olan TPC-DS kullanılır. Standart sunucusuz katmanını kullanarak örnek bir TPC-DS veri kümesini sorgulamak için temel bir PySpark işi göndereceksiniz. Ardından, Lightning Engine'in etkin olduğu Premium katmanı kullanarak aynı işi çalıştıracaksınız. Son olarak, yürütme süresini karşılaştıracak ve donanım hızlandırmalı Spark yürütme grafiklerindeki farkı görselleştirmek için Spark kullanıcı arayüzünü inceleyeceksiniz.

Kaynakların Temizleme bölümünde açıklandığı gibi zamanında temizlendiği varsayıldığında bu codelab'i çalıştırmanın tahmini maliyeti 1, 00 ABD dolarından azdır.

Yapacaklarınız

  • Karşılaştırma komut dosyalarınızı ve sonuçlarınızı depolamak için bir Cloud Storage paketi oluşturun.
  • Apache Spark için Sunucusuz Standart katmanını kullanarak temel bir PySpark veri işleme işi yürütme
  • Aynı işi Lightning Engine ile Serverless for Apache Spark Premium katmanını kullanarak yürütme
  • Çalışma zamanı metriklerini karşılaştırma
  • Yerel fiziksel yürütme grafiklerini karşılaştırmak için Spark History Server kullanıcı arayüzünü başlatın.

İhtiyacınız olanlar

2. Başlamadan önce

Google Cloud projesi oluşturma

  1. Google Cloud Console'daki proje seçici sayfasında bir Google Cloud projesi seçin veya oluşturun.
  2. Cloud projeniz için faturalandırmanın etkinleştirildiğinden emin olun. Bir projede faturalandırmanın etkin olup olmadığını kontrol etmeyi öğrenin.

Cloud Shell'i Başlatma

Cloud Shell, Google Cloud'da çalışan ve gerekli araçların önceden yüklendiği bir komut satırı ortamıdır.

  1. Google Cloud Console'un üst kısmından Cloud Shell'i etkinleştir'i tıklayın.
  2. Cloud Shell'e bağlandıktan sonra kimlik doğrulamanızı onaylayın:
    gcloud auth list
    
  3. Projenizin yapılandırıldığını onaylayın:
    gcloud config get project
    
  4. Projeniz beklendiği gibi ayarlanmamışsa şu şekilde ayarlayın:
    export PROJECT_ID=<YOUR_PROJECT_ID>
    gcloud config set project $PROJECT_ID
    

API'leri etkinleştir

Bu codelab için gerekli tüm API'leri etkinleştirmek üzere şu komutu çalıştırın:

gcloud services enable \
    dataproc.googleapis.com \
    storage.googleapis.com \
    compute.googleapis.com

3. Ortamınızı Hazırlama

Bu adımda, ortam değişkenlerini başlatacak ve bir Cloud Storage paketi oluşturacaksınız. Bu paket, Apache Spark için sunucusuz katmanların her ikisine de gönderdiğiniz PySpark komut dosyasını içerir.

Ortam değişkenlerini ayarlama

Varsayılan ortam değişkenlerini ayarlamak için Cloud Shell'de aşağıdaki komutları çalıştırın. us-central1 bölgesi kullanılır ancak isterseniz bunu değiştirebilirsiniz.

export PROJECT_ID=$(gcloud config get-value project)
export REGION="us-central1"
export BUCKET_NAME="spark-benchmark-${PROJECT_ID}-${REGION}"

gcloud config set dataproc/region ${REGION}

Cloud Storage paketi oluşturma

Komut dosyalarınızı ve günlüklerinizi barındıracak paketi oluşturun:

gcloud storage buckets create gs://${BUCKET_NAME} \
    --uniform-bucket-level-access \
    --location=${REGION}

TPC-DS veri kümesini kendi paketinize kopyalama

Bu adımda, TPC-DS veri kümesini herkese açık bir paketten kendi Cloud Storage paketinize kopyalayacaksınız. Bu sayede PySpark işleriniz, projenizdeki verileri yerel olarak okuyabilir.

Veri kümesi boyutunu ve türünü seçmek için ortam değişkenlerini ayarlayın:

export DATASET_TYPE="partitioned" # Options: partitioned, nonpartitioned
export DATASET_SIZE="1GB"         # Options: 1GB, 10GB, 100GB, 1000GB (1000GB not available for partitioned)

export SRC_PATH="gs://beam-tpcds/datasets/parquet/${DATASET_TYPE}/${DATASET_SIZE}"
export DATASET_PATH="gs://${BUCKET_NAME}/tpc-ds-dataset/${DATASET_TYPE}/${DATASET_SIZE}"

TPC-DS verilerini kendi paketinize kopyalayın:

gcloud storage cp -r ${SRC_PATH}/* ${DATASET_PATH}/

PySpark Benchmark komut dosyasını oluşturma

Cloud Storage paketinize standart TPC-DS tablolarını kaydeden ve Apache Spark herkese açık deposundan alınan 5 standart sorguyu yürüten bir PySpark komut dosyası kullanacağız. Komut dosyası, veri kümenizin yolunu bağımsız değişken olarak kabul eder.

Cloud Shell'de benchmark.py adlı bir dosya oluşturun. Dosyayı oluşturmak için aşağıdaki komutu kopyalayıp yapıştırabilirsiniz:

cat << 'EOF' > benchmark.py
import argparse
import sys
from pyspark.sql import SparkSession
import time

def main():
    parser = argparse.ArgumentParser(description='TPC-DS Benchmark')
    parser.add_argument('data_path', help='GCS base path for TPC-DS tables')
    args = parser.parse_args()

    base_path = args.data_path

    # Initialize Spark Session
    spark = SparkSession.builder \
        .appName("TPC-DS Benchmark") \
        .getOrCreate()

    print(f"Spark Session created. Registering TPC-DS tables from {base_path}...")

    # List of all 24 TPC-DS tables
    tables = [
        "call_center", "catalog_page", "catalog_returns", "catalog_sales",
        "customer", "customer_address", "customer_demographics", "date_dim",
        "household_demographics", "income_band", "inventory", "item",
        "promotion", "reason", "ship_mode", "store", "store_returns",
        "store_sales", "time_dim", "warehouse", "web_page", "web_returns",
        "web_sales", "web_site"
    ]

    # Register each table as a temporary view
    # For this subset of queries, not every table is used
    for table in tables:
        path = f"{base_path}/{table}"
        try:
            df = spark.read.parquet(path)
            df.createOrReplaceTempView(table)
        except Exception as e:
            print(f"Warning: Could not load table {table} from {path}. Error: {e}")

    print("Tables registered successfully. Starting benchmark queries from Apache Spark test suite...")

    # Standard TPC-DS Queries sourced from Apache Spark public repository:
    # https://github.com/apache/spark/tree/master/sql/core/src/test/resources/tpcds
    queries = {
        "Q1": """
            WITH customer_total_return AS (
              SELECT sr_customer_sk AS ctr_customer_sk,
                     sr_store_sk AS ctr_store_sk,
                     sum(sr_return_amt) AS ctr_total_return
              FROM store_returns, date_dim
              WHERE sr_returned_date_sk = d_date_sk
                AND d_year = 2000
              GROUP BY sr_customer_sk, sr_store_sk
            )
            SELECT c_customer_id
            FROM customer_total_return ctr1, store, customer
            WHERE ctr1.ctr_total_return > (
              SELECT avg(ctr_total_return) * 1.2
              FROM customer_total_return ctr2
              WHERE ctr1.ctr_store_sk = ctr2.ctr_store_sk
            )
              AND s_store_sk = ctr1.ctr_store_sk
              AND s_state = 'TN'
              AND ctr1.ctr_customer_sk = c_customer_sk
            ORDER BY c_customer_id
            LIMIT 100
        """,
        "Q2": """
            WITH wscs AS (
              SELECT sold_date_sk, sales_price
              FROM (
                SELECT ws_sold_date_sk AS sold_date_sk, ws_ext_sales_price AS sales_price
                FROM web_sales
                UNION ALL
                SELECT cs_sold_date_sk AS sold_date_sk, cs_ext_sales_price AS sales_price
                FROM catalog_sales
              )
            ),
            wswscs AS (
              SELECT d_week_seq,
                     sum(CASE WHEN (d_day_name='Sunday') THEN sales_price ELSE null END) AS sun_sales,
                     sum(CASE WHEN (d_day_name='Monday') THEN sales_price ELSE null END) AS mon_sales,
                     sum(CASE WHEN (d_day_name='Tuesday') THEN sales_price ELSE null END) AS tue_sales,
                     sum(CASE WHEN (d_day_name='Wednesday') THEN sales_price ELSE null END) AS wed_sales,
                     sum(CASE WHEN (d_day_name='Thursday') THEN sales_price ELSE null END) AS thu_sales,
                     sum(CASE WHEN (d_day_name='Friday') THEN sales_price ELSE null END) AS fri_sales,
                     sum(CASE WHEN (d_day_name='Saturday') THEN sales_price ELSE null END) AS sat_sales
              FROM wscs, date_dim
              WHERE d_date_sk = sold_date_sk
              GROUP BY d_week_seq
            )
            SELECT d_week_seq1,
                   round(sun_sales1/sun_sales2, 2),
                   round(mon_sales1/mon_sales2, 2),
                   round(tue_sales1/tue_sales2, 2),
                   round(wed_sales1/wed_sales2, 2),
                   round(thu_sales1/thu_sales2, 2),
                   round(fri_sales1/fri_sales2, 2),
                   round(sat_sales1/sat_sales2, 2)
            FROM (
              SELECT wswscs.d_week_seq AS d_week_seq1,
                     sun_sales AS sun_sales1, mon_sales AS mon_sales1,
                     tue_sales AS tue_sales1, wed_sales AS wed_sales1,
                     thu_sales AS thu_sales1, fri_sales AS fri_sales1,
                     sat_sales AS sat_sales1
              FROM wswscs, date_dim
              WHERE date_dim.d_week_seq = wswscs.d_week_seq
                AND d_year = 2001
            ) y,
            (
              SELECT wswscs.d_week_seq AS d_week_seq2,
                     sun_sales AS sun_sales2, mon_sales AS mon_sales2,
                     tue_sales AS tue_sales2, wed_sales AS wed_sales2,
                     thu_sales AS thu_sales2, fri_sales AS fri_sales2,
                     sat_sales AS sat_sales2
              FROM wswscs, date_dim
              WHERE date_dim.d_week_seq = wswscs.d_week_seq
                AND d_year = 2001 + 1
            ) z
            WHERE d_week_seq1 = d_week_seq2 - 53
            ORDER BY d_week_seq1
        """,
        "Q3": """
            SELECT dt.d_year,
                   item.i_brand_id AS brand_id,
                   item.i_brand AS brand,
                   sum(ss_ext_sales_price) AS sum_agg
            FROM date_dim dt,
                 store_sales,
                 item
            WHERE dt.d_date_sk = store_sales.ss_sold_date_sk
              AND store_sales.ss_item_sk = item.i_item_sk
              AND item.i_manufact_id = 436
              AND dt.d_moy = 12
            GROUP BY dt.d_year,
                     item.i_brand,
                     item.i_brand_id
            ORDER BY dt.d_year,
                     sum_agg DESC,
                     brand_id
            LIMIT 100
        """,
        "Q7": """
            SELECT i_item_id,
                   avg(ss_quantity) AS agg1,
                   avg(ss_list_price) AS agg2,
                   avg(ss_coupon_amt) AS agg3,
                   avg(ss_sales_price) AS agg4
            FROM store_sales,
                 customer_demographics,
                 date_dim,
                 item,
                 promotion
            WHERE ss_sold_date_sk = d_date_sk
              AND ss_item_sk = i_item_sk
              AND ss_cdemo_sk = cd_demo_sk
              AND ss_promo_sk = p_promo_sk
              AND cd_gender = 'M'
              AND cd_marital_status = 'S'
              AND cd_education_status = 'College'
              AND (p_channel_email = 'N' OR p_channel_event = 'N')
              AND d_year = 2000
            GROUP BY i_item_id
            ORDER BY i_item_id
            LIMIT 100
        """,
        "Q19": """
            SELECT i_item_id,
                   i_brand,
                   i_category,
                   i_class,
                   i_manufact,
                   sum(ss_ext_sales_price) AS sales,
                   sum(ss_net_profit) AS profit
            FROM date_dim,
                 store_sales,
                 item,
                 customer,
                 store
            WHERE d_date_sk = ss_sold_date_sk
              AND i_item_sk = ss_item_sk
              AND d_year = 2000
              AND d_moy = 12
              AND c_customer_sk = ss_customer_sk
              AND s_store_sk = ss_store_sk
              AND i_manager_id = 9
            GROUP BY i_item_id,
                     i_brand,
                     i_category,
                     i_class,
                     i_manufact
            ORDER BY i_item_id,
                     i_brand,
                     i_category,
                     i_class,
                     i_manufact
            LIMIT 100
        """
    }

    total_start_time = time.time()

    for query_name, query_sql in queries.items():
        print(f"\nExecuting {query_name}...")
        query_start = time.time()

        # Execute query and force action using show()
        result_df = spark.sql(query_sql)
        result_df.show(5) # Show top 5 rows

        query_end = time.time()
        print(f"{query_name} completed in {query_end - query_start:.2f} seconds.")

    total_end_time = time.time()
    print(f"\nAll benchmark queries completed in {total_end_time - total_start_time:.2f} seconds.")

    spark.stop()

if __name__ == "__main__":
    main()
EOF

Serverless for Apache Spark'ın erişebilmesi için komut dosyasını Cloud Storage paketinize kopyalayın:

gcloud storage cp benchmark.py gs://${BUCKET_NAME}/scripts/benchmark.py

4. Temel Sunucusuz İş'i çalıştırma

Lightning Engine olmadan temel bir karşılaştırma sağlamak için daha önce yüklediğiniz PySpark karşılaştırma işini Apache Spark Standard katmanı için sunucusuz'a gönderin. Kopyaladığınız veri kümesinin yolunu bağımsız değişken olarak iletiriz.

Toplu işi yürütmek için aşağıdaki komutu çalıştırın:

gcloud dataproc batches submit pyspark \
    gs://${BUCKET_NAME}/scripts/benchmark.py \
    --region=${REGION} \
    --version=2.3 \
    --deps-bucket=gs://${BUCKET_NAME} \
    -- ${DATASET_PATH}

İşi izleme

İş yürütülürken Cloud Shell terminalinizde PySpark günlüklerinin yayınlandığını görürsünüz. Apache Spark için sunucusuz, kapsayıcıları ayırıyor, TPC-DS Parquet veri kümesini Cloud Storage'dan okuyor ve karmaşık SQL planlarını yürütüyor.

Komut dosyası tamamlandıktan sonra konsol çıkışını inceleyin. Şuna benzer şekilde, yürütülen her standart sorgu için sonuçlar ve zamanlamalar görmeniz gerekir:

...
Executing Q1...
+-------------+
|c_customer_id|
+-------------+
...

Q1 completed in 18.52 seconds.
...

All benchmark queries completed in 110.94 seconds.

Tamamlanması için geçen toplam süreyi not edin. Bu, temel çalışma zamanınızdır.

5. Serverless Premium ve Lightning Engine ile çalıştırma

Ardından, Apache Spark için sunucusuz'da aynı Spark işini çalıştıracaksınız ancak Premium katmanını kullanacak ve Google'ın yerel, vektörel sorgu motoru Lightning Engine'i etkinleştireceksiniz.

Lightning Engine açıkça etkinleştirilmiş şekilde karşılaştırma işini Sunucusuz'a gönderin:

gcloud dataproc batches submit pyspark \
    gs://${BUCKET_NAME}/scripts/benchmark.py \
    --region=${REGION} \
    --version=2.3 \
    --deps-bucket=gs://${BUCKET_NAME} \
    --properties="dataproc.tier=premium,spark.dataproc.lightningEngine.runtime=native" \
    -- ${DATASET_PATH}

Sonuçları Karşılaştırma

İşin tamamlanmasını bekleyin ve çıkışı inceleyin. Aynı sorgu sonuçlarını görmeniz gerekir. Tamamlanma süresine dikkat edin:

...
All benchmark queries completed in 64.24 seconds.

Temel sunucusuz çalıştırma ile Sunucusuz Lightning Engine çalıştırmasını karşılaştırdığınızda, Lightning Engine'in arka uçta yerel bir C++ yürütme katmanı ve vektörel işleme kullanarak gruplandırma, toplama ve birleştirme işlemlerini daha hızlı yürüttüğünü fark edeceksiniz. Bu işlemler için PySpark uygulama kodunuzda herhangi bir değişiklik yapmanız gerekmez.

Lightning Engine, iş yükü arttıkça performansı artıracak şekilde optimize edilmiştir. Bu örnekte küçük bir veri kümesi kullandığımız için performans artışı, olabileceği kadar belirgin değildir. 10 TB'lık bir veri kümesinde, açık kaynaklı Spark'a kıyasla 4,3 kat daha iyi performans sağladığı karşılaştırma testlerinde gösterilmiştir.

6. Spark kullanıcı arayüzünde yürütme grafiklerini karşılaştırma

Çalışma süresindeki azalma etkileyici olsa da Spark'ın sorgu yürütme sırasında aslında ne yaptığına yakından bakalım. Bunu, her iki iş için Spark kullanıcı arayüzü yürütme grafiklerini inceleyerek yapabilirsiniz.

  1. Tarayıcınızda Google Cloud Console'u açın.
  2. Dataproc > Batches'e (Dataproc > Toplu İşler) gidin.
  3. Listede iki grup görürsünüz: standart temel çalıştırmanız ve Premium katman çalıştırmanız.
  4. Çalıştırdığınız Premium katmanlı grubu, ardından View Spark UI (Spark kullanıcı arayüzünü görüntüle) ve View Details'ı (Ayrıntıları görüntüle) tıklayın.
  5. Spark kullanıcı arayüzünde Jobs (İşler) sekmesine gidin.
  6. Tamamlanan İşler bölümündeki arama kutusuna Velox yazın.
  7. VeloxSparkPlanExecApi simgesini içeren birçok iş tanımı görürsünüz. Bu, Lightning Engine tarafından kullanılan Velox yerel yürütme motorunu ifade eder.

Şimdi bu işlemi Standart katman çalıştırması için tekrarlayın:

  1. Serverless for Apache Spark Batches sayfasına geri dönün.
  2. Standart katman toplu işinin bağlantısını, ardından View Spark UI (Spark kullanıcı arayüzünü görüntüle) ve View Details'ı (Ayrıntıları göster) tıklayın.
  3. Spark kullanıcı arayüzünde Jobs (İşler) sekmesine gidin.
  4. Tamamlanan İşler bölümündeki arama kutusuna Velox yazın.
  5. İş açıklamalarında Velox API'den bahsedilmez.

7. Temizleme

Google Cloud hesabınızın sürekli olarak ücretlendirilmesini önlemek için bu codelab sırasında oluşturulan kaynakları silin.

Cloud Shell'de Cloud Storage paketini ve içeriğini silin:

gcloud storage rm -r gs://${BUCKET_NAME}

benchmark.py uygulamasının yerel kopyasını silme:

rm benchmark.py

8. Tebrikler

Tebrikler! Apache Spark için başarılı bir şekilde karşılaştırma testi ortamı oluşturdunuz ve Apache Spark Standard için sunucusuz ile Apache Spark Premium için sunucusuz'u karşılaştırdınız.

Apache Spark'ın yeni Lightning Engine'i için sunucusuz özelliği etkinleştirmenin Spark iş yükünüzün çalışma zamanını nasıl azaltabileceğini ilk elden gördünüz ve fiziksel yürütme grafiğinin Native Query Engine kullanılarak yerel C++ koduna nasıl dönüştürüldüğünü görmek için Spark kullanıcı arayüzünü incelediniz.

Öğrendikleriniz

  • PySpark veri kümesi karşılaştırma komut dosyası yazma
  • Apache Spark için Sunucusuz'a Spark işleri gönderme
  • Lightning Engine'i etkinleştirme
  • Spark kullanıcı arayüzünde iş planlarını karşılaştırma

Sonraki adımlar