เร่งความเร็ว Spark ด้วย Serverless สำหรับ Apache Spark และ Lightning Engine

1. บทนำ

ใน Codelab นี้ คุณจะได้สำรวจประโยชน์ด้านประสิทธิภาพของเครื่องมือดำเนินการดั้งเดิมของ Google Cloud Serverless สำหรับ Apache Spark ซึ่งก็คือ Lightning Engine และดูวิธีที่เครื่องมือนี้เพิ่มประสิทธิภาพเวิร์กโหลด Spark ใน Serverless สำหรับ Apache Spark

Lightning Engine ใช้ Velox และ Apache Gluten Velox เป็นเครื่องมือ C++ ประสิทธิภาพสูงสำหรับการประมวลผลข้อมูล Apache Gluten เป็นเลเยอร์กลางที่รับผิดชอบในการแปลงงาน Spark ที่ใช้ JVM เป็นโค้ด C++ ที่ Velox สามารถเรียกใช้ได้

การสาธิตนี้ใช้ TPC-DS ซึ่งเป็นเกณฑ์มาตรฐานระดับอุตสาหกรรมที่ออกแบบมาเพื่อประเมินประสิทธิภาพของระบบสนับสนุนการตัดสินใจ คุณจะส่งงาน PySpark พื้นฐานเพื่อค้นหาชุดข้อมูล TPC-DS ตัวอย่างโดยใช้ระดับ Standard Serverless จากนั้นคุณจะเรียกใช้ชิ้นงานเดียวกันโดยใช้ระดับพรีเมียมที่เปิดใช้ Lightning Engine สุดท้าย คุณจะเปรียบเทียบเวลาในการดำเนินการและเจาะลึก UI ของ Spark เพื่อแสดงภาพความแตกต่างในกราฟการดำเนินการ Spark ที่เร่งด้วยฮาร์ดแวร์

ค่าใช้จ่ายโดยประมาณในการเรียกใช้ Codelab นี้จะน้อยกว่า $1.00 USD โดยสมมติว่ามีการล้างข้อมูลทรัพยากรทันทีตามที่อธิบายไว้ในส่วนล้างข้อมูล

สิ่งที่คุณต้องดำเนินการ

  • สร้าง Bucket ของ Cloud Storage เพื่อจัดเก็บสคริปต์และผลลัพธ์การทดสอบประสิทธิภาพ
  • เรียกใช้งานการประมวลผลข้อมูล PySpark พื้นฐานโดยใช้ระดับมาตรฐานของ Serverless สำหรับ Apache Spark
  • เรียกใช้งานเดียวกันโดยใช้ Serverless สำหรับ Apache Spark ระดับพรีเมียมด้วย Lightning Engine
  • เปรียบเทียบเมตริกเวลาทำงาน
  • เปิด UI ของ Spark History Server เพื่อเปรียบเทียบกราฟการดำเนินการจริงแบบเนทีฟ

สิ่งที่คุณต้องมี

  • เว็บเบราว์เซอร์ เช่น Chrome
  • โปรเจ็กต์ Google Cloud ที่เปิดใช้การเรียกเก็บเงิน
  • มีความคุ้นเคยพื้นฐานกับ Apache Spark และบรรทัดคำสั่ง Linux

2. ก่อนเริ่มต้น

สร้างโปรเจ็กต์ Google Cloud

  1. ในคอนโซล Google Cloud ในหน้าตัวเลือกโปรเจ็กต์ ให้เลือกหรือสร้างโปรเจ็กต์ Google Cloud
  2. ตรวจสอบว่าได้เปิดใช้การเรียกเก็บเงินสำหรับโปรเจ็กต์ Cloud แล้ว ดูวิธีตรวจสอบว่าได้เปิดใช้การเรียกเก็บเงินในโปรเจ็กต์แล้วหรือไม่

เริ่มต้น Cloud Shell

Cloud Shell คือสภาพแวดล้อมบรรทัดคำสั่งที่ทำงานใน Google Cloud ซึ่งโหลดเครื่องมือที่จำเป็นไว้ล่วงหน้า

  1. คลิกเปิดใช้งาน Cloud Shell ที่ด้านบนของคอนโซล Google Cloud
  2. เมื่อเชื่อมต่อกับ Cloud Shell แล้ว ให้ยืนยันการตรวจสอบสิทธิ์โดยทำดังนี้
    gcloud auth list
    
  3. ตรวจสอบว่าได้กำหนดค่าโปรเจ็กต์แล้ว
    gcloud config get project
    
  4. หากไม่ได้ตั้งค่าโปรเจ็กต์ตามที่คาดไว้ ให้ตั้งค่าดังนี้
    export PROJECT_ID=<YOUR_PROJECT_ID>
    gcloud config set project $PROJECT_ID
    

เปิดใช้ API

เรียกใช้คำสั่งนี้เพื่อเปิดใช้ API ที่จำเป็นทั้งหมดสำหรับ Codelab นี้

gcloud services enable \
    dataproc.googleapis.com \
    storage.googleapis.com \
    compute.googleapis.com

3. เตรียมสภาพแวดล้อม

ในขั้นตอนนี้ คุณจะเริ่มต้นตัวแปรสภาพแวดล้อมและสร้างที่เก็บข้อมูล Cloud Storage ที่เก็บข้อมูลนี้จะเก็บสคริปต์ PySpark ที่คุณส่งไปยังทั้ง 2 ระดับของ Serverless สำหรับ Apache Spark

ตั้งค่าตัวแปรสภาพแวดล้อม

เรียกใช้คำสั่งต่อไปนี้ใน Cloud Shell เพื่อตั้งค่าตัวแปรสภาพแวดล้อมเริ่มต้น เราจะใช้ภูมิภาคus-central1 แต่คุณสามารถเปลี่ยนได้หากต้องการ

export PROJECT_ID=$(gcloud config get-value project)
export REGION="us-central1"
export BUCKET_NAME="spark-benchmark-${PROJECT_ID}-${REGION}"

gcloud config set dataproc/region ${REGION}

สร้าง Bucket ของ Cloud Storage

สร้าง Bucket เพื่อเก็บสคริปต์และบันทึก

gcloud storage buckets create gs://${BUCKET_NAME} \
    --uniform-bucket-level-access \
    --location=${REGION}

คัดลอกชุดข้อมูล TPC-DS ไปยัง Bucket ของคุณเอง

ในขั้นตอนนี้ คุณจะคัดลอกชุดข้อมูล TPC-DS จาก Bucket สาธารณะไปยัง Bucket ของ Cloud Storage ของคุณเอง ซึ่งจะช่วยให้งาน PySpark อ่านข้อมูลจากโปรเจ็กต์ในเครื่องได้

ตั้งค่าตัวแปรสภาพแวดล้อมเพื่อเลือกขนาดและประเภทชุดข้อมูล

export DATASET_TYPE="partitioned" # Options: partitioned, nonpartitioned
export DATASET_SIZE="1GB"         # Options: 1GB, 10GB, 100GB, 1000GB (1000GB not available for partitioned)

export SRC_PATH="gs://beam-tpcds/datasets/parquet/${DATASET_TYPE}/${DATASET_SIZE}"
export DATASET_PATH="gs://${BUCKET_NAME}/tpc-ds-dataset/${DATASET_TYPE}/${DATASET_SIZE}"

คัดลอกข้อมูล TPC-DS ลงใน Bucket ของคุณเองโดยทำดังนี้

gcloud storage cp -r ${SRC_PATH}/* ${DATASET_PATH}/

สร้างสคริปต์การทดสอบประสิทธิภาพ PySpark

เราจะใช้สคริปต์ PySpark ที่ลงทะเบียนตาราง TPC-DS มาตรฐานจากที่เก็บข้อมูล Cloud Storage และเรียกใช้การค้นหามาตรฐาน 5 รายการที่มาจากที่เก็บข้อมูลสาธารณะของ Apache Spark สคริปต์จะยอมรับเส้นทางไปยังชุดข้อมูลเป็นอาร์กิวเมนต์

สร้างไฟล์ชื่อ benchmark.py ใน Cloud Shell คุณสามารถคัดลอกและวางคำสั่งต่อไปนี้เพื่อสร้างไฟล์

cat << 'EOF' > benchmark.py
import argparse
import sys
from pyspark.sql import SparkSession
import time

def main():
    parser = argparse.ArgumentParser(description='TPC-DS Benchmark')
    parser.add_argument('data_path', help='GCS base path for TPC-DS tables')
    args = parser.parse_args()

    base_path = args.data_path

    # Initialize Spark Session
    spark = SparkSession.builder \
        .appName("TPC-DS Benchmark") \
        .getOrCreate()

    print(f"Spark Session created. Registering TPC-DS tables from {base_path}...")

    # List of all 24 TPC-DS tables
    tables = [
        "call_center", "catalog_page", "catalog_returns", "catalog_sales",
        "customer", "customer_address", "customer_demographics", "date_dim",
        "household_demographics", "income_band", "inventory", "item",
        "promotion", "reason", "ship_mode", "store", "store_returns",
        "store_sales", "time_dim", "warehouse", "web_page", "web_returns",
        "web_sales", "web_site"
    ]

    # Register each table as a temporary view
    # For this subset of queries, not every table is used
    for table in tables:
        path = f"{base_path}/{table}"
        try:
            df = spark.read.parquet(path)
            df.createOrReplaceTempView(table)
        except Exception as e:
            print(f"Warning: Could not load table {table} from {path}. Error: {e}")

    print("Tables registered successfully. Starting benchmark queries from Apache Spark test suite...")

    # Standard TPC-DS Queries sourced from Apache Spark public repository:
    # https://github.com/apache/spark/tree/master/sql/core/src/test/resources/tpcds
    queries = {
        "Q1": """
            WITH customer_total_return AS (
              SELECT sr_customer_sk AS ctr_customer_sk,
                     sr_store_sk AS ctr_store_sk,
                     sum(sr_return_amt) AS ctr_total_return
              FROM store_returns, date_dim
              WHERE sr_returned_date_sk = d_date_sk
                AND d_year = 2000
              GROUP BY sr_customer_sk, sr_store_sk
            )
            SELECT c_customer_id
            FROM customer_total_return ctr1, store, customer
            WHERE ctr1.ctr_total_return > (
              SELECT avg(ctr_total_return) * 1.2
              FROM customer_total_return ctr2
              WHERE ctr1.ctr_store_sk = ctr2.ctr_store_sk
            )
              AND s_store_sk = ctr1.ctr_store_sk
              AND s_state = 'TN'
              AND ctr1.ctr_customer_sk = c_customer_sk
            ORDER BY c_customer_id
            LIMIT 100
        """,
        "Q2": """
            WITH wscs AS (
              SELECT sold_date_sk, sales_price
              FROM (
                SELECT ws_sold_date_sk AS sold_date_sk, ws_ext_sales_price AS sales_price
                FROM web_sales
                UNION ALL
                SELECT cs_sold_date_sk AS sold_date_sk, cs_ext_sales_price AS sales_price
                FROM catalog_sales
              )
            ),
            wswscs AS (
              SELECT d_week_seq,
                     sum(CASE WHEN (d_day_name='Sunday') THEN sales_price ELSE null END) AS sun_sales,
                     sum(CASE WHEN (d_day_name='Monday') THEN sales_price ELSE null END) AS mon_sales,
                     sum(CASE WHEN (d_day_name='Tuesday') THEN sales_price ELSE null END) AS tue_sales,
                     sum(CASE WHEN (d_day_name='Wednesday') THEN sales_price ELSE null END) AS wed_sales,
                     sum(CASE WHEN (d_day_name='Thursday') THEN sales_price ELSE null END) AS thu_sales,
                     sum(CASE WHEN (d_day_name='Friday') THEN sales_price ELSE null END) AS fri_sales,
                     sum(CASE WHEN (d_day_name='Saturday') THEN sales_price ELSE null END) AS sat_sales
              FROM wscs, date_dim
              WHERE d_date_sk = sold_date_sk
              GROUP BY d_week_seq
            )
            SELECT d_week_seq1,
                   round(sun_sales1/sun_sales2, 2),
                   round(mon_sales1/mon_sales2, 2),
                   round(tue_sales1/tue_sales2, 2),
                   round(wed_sales1/wed_sales2, 2),
                   round(thu_sales1/thu_sales2, 2),
                   round(fri_sales1/fri_sales2, 2),
                   round(sat_sales1/sat_sales2, 2)
            FROM (
              SELECT wswscs.d_week_seq AS d_week_seq1,
                     sun_sales AS sun_sales1, mon_sales AS mon_sales1,
                     tue_sales AS tue_sales1, wed_sales AS wed_sales1,
                     thu_sales AS thu_sales1, fri_sales AS fri_sales1,
                     sat_sales AS sat_sales1
              FROM wswscs, date_dim
              WHERE date_dim.d_week_seq = wswscs.d_week_seq
                AND d_year = 2001
            ) y,
            (
              SELECT wswscs.d_week_seq AS d_week_seq2,
                     sun_sales AS sun_sales2, mon_sales AS mon_sales2,
                     tue_sales AS tue_sales2, wed_sales AS wed_sales2,
                     thu_sales AS thu_sales2, fri_sales AS fri_sales2,
                     sat_sales AS sat_sales2
              FROM wswscs, date_dim
              WHERE date_dim.d_week_seq = wswscs.d_week_seq
                AND d_year = 2001 + 1
            ) z
            WHERE d_week_seq1 = d_week_seq2 - 53
            ORDER BY d_week_seq1
        """,
        "Q3": """
            SELECT dt.d_year,
                   item.i_brand_id AS brand_id,
                   item.i_brand AS brand,
                   sum(ss_ext_sales_price) AS sum_agg
            FROM date_dim dt,
                 store_sales,
                 item
            WHERE dt.d_date_sk = store_sales.ss_sold_date_sk
              AND store_sales.ss_item_sk = item.i_item_sk
              AND item.i_manufact_id = 436
              AND dt.d_moy = 12
            GROUP BY dt.d_year,
                     item.i_brand,
                     item.i_brand_id
            ORDER BY dt.d_year,
                     sum_agg DESC,
                     brand_id
            LIMIT 100
        """,
        "Q7": """
            SELECT i_item_id,
                   avg(ss_quantity) AS agg1,
                   avg(ss_list_price) AS agg2,
                   avg(ss_coupon_amt) AS agg3,
                   avg(ss_sales_price) AS agg4
            FROM store_sales,
                 customer_demographics,
                 date_dim,
                 item,
                 promotion
            WHERE ss_sold_date_sk = d_date_sk
              AND ss_item_sk = i_item_sk
              AND ss_cdemo_sk = cd_demo_sk
              AND ss_promo_sk = p_promo_sk
              AND cd_gender = 'M'
              AND cd_marital_status = 'S'
              AND cd_education_status = 'College'
              AND (p_channel_email = 'N' OR p_channel_event = 'N')
              AND d_year = 2000
            GROUP BY i_item_id
            ORDER BY i_item_id
            LIMIT 100
        """,
        "Q19": """
            SELECT i_item_id,
                   i_brand,
                   i_category,
                   i_class,
                   i_manufact,
                   sum(ss_ext_sales_price) AS sales,
                   sum(ss_net_profit) AS profit
            FROM date_dim,
                 store_sales,
                 item,
                 customer,
                 store
            WHERE d_date_sk = ss_sold_date_sk
              AND i_item_sk = ss_item_sk
              AND d_year = 2000
              AND d_moy = 12
              AND c_customer_sk = ss_customer_sk
              AND s_store_sk = ss_store_sk
              AND i_manager_id = 9
            GROUP BY i_item_id,
                     i_brand,
                     i_category,
                     i_class,
                     i_manufact
            ORDER BY i_item_id,
                     i_brand,
                     i_category,
                     i_class,
                     i_manufact
            LIMIT 100
        """
    }

    total_start_time = time.time()

    for query_name, query_sql in queries.items():
        print(f"\nExecuting {query_name}...")
        query_start = time.time()

        # Execute query and force action using show()
        result_df = spark.sql(query_sql)
        result_df.show(5) # Show top 5 rows

        query_end = time.time()
        print(f"{query_name} completed in {query_end - query_start:.2f} seconds.")

    total_end_time = time.time()
    print(f"\nAll benchmark queries completed in {total_end_time - total_start_time:.2f} seconds.")

    spark.stop()

if __name__ == "__main__":
    main()
EOF

คัดลอกสคริปต์ไปยัง Bucket ของ Cloud Storage เพื่อให้ Serverless สำหรับ Apache Spark เข้าถึงได้

gcloud storage cp benchmark.py gs://${BUCKET_NAME}/scripts/benchmark.py

4. เรียกใช้ Baseline Serverless Job

หากต้องการเปรียบเทียบพื้นฐานโดยไม่มี Lightning Engine ให้ส่งงานการเปรียบเทียบ PySpark ที่คุณอัปโหลดไว้ก่อนหน้านี้ไปยังระดับมาตรฐานแบบ Serverless สำหรับ Apache Spark เราจะส่งเส้นทางไปยังชุดข้อมูลที่คุณคัดลอกเป็นอาร์กิวเมนต์

เรียกใช้คำสั่งต่อไปนี้เพื่อเรียกใช้ชื่องานแบบกลุ่ม

gcloud dataproc batches submit pyspark \
    gs://${BUCKET_NAME}/scripts/benchmark.py \
    --region=${REGION} \
    --version=2.3 \
    --deps-bucket=gs://${BUCKET_NAME} \
    -- ${DATASET_PATH}

ตรวจสอบงาน

ขณะที่งานกำลังดำเนินการ คุณจะเห็นบันทึก PySpark สตรีมในเทอร์มินัล Cloud Shell Serverless สำหรับ Apache Spark จะจัดสรรคอนเทนเนอร์ อ่านชุดข้อมูล TPC-DS Parquet จาก Cloud Storage และเรียกใช้แผน SQL ที่ซับซ้อน

เมื่อสคริปต์ทำงานเสร็จแล้ว ให้สังเกตเอาต์พุตของคอนโซล คุณควรเห็นผลลัพธ์และเวลาที่ใช้สำหรับคำค้นหามาตรฐานที่ดำเนินการแต่ละรายการ ซึ่งคล้ายกับตัวอย่างต่อไปนี้

...
Executing Q1...
+-------------+
|c_customer_id|
+-------------+
...

Q1 completed in 18.52 seconds.
...

All benchmark queries completed in 110.94 seconds.

จดบันทึกวินาทีทั้งหมดที่ใช้ในการดำเนินการให้เสร็จสมบูรณ์ นี่คือรันไทม์พื้นฐาน

5. เรียกใช้ด้วย Serverless Premium และ Lightning Engine

จากนั้นคุณจะเรียกใช้งาน Spark Job เดียวกันใน Serverless สำหรับ Apache Spark แต่จะใช้ระดับพรีเมียมและเปิดใช้เครื่องมือค้นหาแบบเวกเตอร์ดั้งเดิมของ Google ซึ่งก็คือ Lightning Engine

ส่งงานการวัดประสิทธิภาพไปยัง Serverless โดยเปิดใช้ Lightning Engine อย่างชัดเจน

gcloud dataproc batches submit pyspark \
    gs://${BUCKET_NAME}/scripts/benchmark.py \
    --region=${REGION} \
    --version=2.3 \
    --deps-bucket=gs://${BUCKET_NAME} \
    --properties="dataproc.tier=premium,spark.dataproc.lightningEngine.runtime=native" \
    -- ${DATASET_PATH}

เปรียบเทียบผลลัพธ์

รอให้งานเสร็จสมบูรณ์แล้วตรวจสอบเอาต์พุต คุณควรเห็นผลการค้นหาเดียวกัน ดูเวลาที่ดำเนินการเสร็จสมบูรณ์อย่างละเอียด

...
All benchmark queries completed in 64.24 seconds.

เมื่อเปรียบเทียบการเรียกใช้แบบ Serverless พื้นฐานกับการเรียกใช้ Lightning Engine แบบ Serverless คุณจะเห็นว่า Lightning Engine ดำเนินการจัดกลุ่ม การรวมข้อมูล และการรวมได้เร็วขึ้นโดยใช้เลเยอร์การดำเนินการ C++ ดั้งเดิมและการประมวลผลแบบเวกเตอร์ในแบ็กเอนด์ โดยไม่ต้องเปลี่ยนแปลงโค้ดของแอปพลิเคชัน PySpark เลย

Lightning Engine ได้รับการเพิ่มประสิทธิภาพเพื่อให้มีประสิทธิภาพสูงขึ้นเมื่อปริมาณงานมากขึ้น ในตัวอย่างนี้ เราใช้ชุดข้อมูลขนาดเล็ก ดังนั้นประสิทธิภาพที่เพิ่มขึ้นจึงไม่มากเท่าที่ควร ในการทดสอบเปรียบเทียบพบว่าชุดข้อมูลขนาด 10 TB มีการปรับปรุงประสิทธิภาพสูงสุด 4.3 เท่าเมื่อเทียบกับ Spark แบบโอเพนซอร์ส

6. เปรียบเทียบกราฟการดำเนินการใน Spark UI

การลดเวลาการทำงานนั้นน่าประทับใจ แต่เรามาดูเบื้องหลังสิ่งที่ Spark ทำจริงๆ ระหว่างการดำเนินการค้นหากัน คุณทำได้โดยตรวจสอบกราฟการดำเนินการของ Spark UI สำหรับทั้ง 2 งาน

  1. เปิด คอนโซล Google Cloud ในเบราว์เซอร์
  2. ไปที่ Dataproc > แบตช์
  3. คุณจะเห็น 2 ชุดในรายการ ได้แก่ การเรียกใช้ค่าพื้นฐานมาตรฐานและการเรียกใช้ระดับพรีเมียม
  4. คลิกชุดงานระดับพรีเมียมที่คุณเรียกใช้ จากนั้นคลิกดู UI ของ Spark แล้วคลิกดูรายละเอียด
  5. ใน UI ของ Spark ให้ไปที่แท็บ Jobs
  6. ในช่องค้นหาใต้งานที่เสร็จสมบูรณ์ ให้พิมพ์ Velox
  7. คุณจะเห็นคำบรรยายลักษณะงานจำนวนมากที่มี VeloxSparkPlanExecApi ซึ่งหมายถึง Lightning Engine ใช้เครื่องมือดำเนินการดั้งเดิมของ Velox

ตอนนี้ให้ทำกระบวนการนี้ซ้ำสำหรับการเรียกใช้ระดับ Standard ดังนี้

  1. กลับไปที่หน้า Serverless สำหรับ Apache Spark Batches
  2. คลิกลิงก์สำหรับกลุ่ม Standard Tier แล้วคลิกดู UI ของ Spark จากนั้นคลิกดูรายละเอียด
  3. ใน UI ของ Spark ให้ไปที่แท็บ Jobs
  4. ในช่องค้นหาใต้งานที่เสร็จสมบูรณ์ ให้พิมพ์ Velox
  5. คุณจะไม่เห็นการกล่าวถึง Velox API ในคำอธิบายงาน

7. ล้างข้อมูล

โปรดลบทรัพยากรที่สร้างขึ้นระหว่างการทำ Codelab นี้เพื่อหลีกเลี่ยงการเรียกเก็บเงินอย่างต่อเนื่องในบัญชี Google Cloud

ใน Cloud Shell ให้ลบที่เก็บข้อมูล Cloud Storage และเนื้อหาของที่เก็บข้อมูลโดยทำดังนี้

gcloud storage rm -r gs://${BUCKET_NAME}

วิธีลบสำเนาของ benchmark.py ในเครื่อง

rm benchmark.py

8. ขอแสดงความยินดี

ยินดีด้วย คุณสร้างสภาพแวดล้อมการเปรียบเทียบสำหรับ Apache Spark และเปรียบเทียบ Serverless สำหรับ Apache Spark Standard กับ Serverless สำหรับ Apache Spark Premium ได้สำเร็จแล้ว

คุณได้เห็นด้วยตาตนเองว่าการเปิดใช้ Serverless สำหรับ Lightning Engine ใหม่ของ Apache Spark ช่วยลดรันไทม์ของภาระงาน Spark ได้อย่างไร และคุณได้สำรวจ UI ของ Spark เพื่อดูว่ากราฟการดำเนินการจริงเปลี่ยนเป็นโค้ด C++ ดั้งเดิมโดยใช้ Native Query Engine ได้อย่างไร

สิ่งที่คุณได้เรียนรู้

  • วิธีเขียนสคริปต์การเปรียบเทียบประสิทธิภาพชุดข้อมูล PySpark
  • วิธีส่งงาน Spark ไปยัง Serverless สำหรับ Apache Spark
  • วิธีเปิดใช้ Lightning Engine
  • วิธีเปรียบเทียบแผนงานใน UI ของ Spark

ขั้นตอนถัดไป