מאיצים את Spark באמצעות Serverless for Apache Spark ו-Lightning Engine

1. מבוא

ב-codelab הזה נסביר על היתרונות של מנוע הביצוע המקורי של Google Cloud Serverless ל-Apache Spark,‏ Lightning Engine, ונבדוק איך הוא משפר את עומסי העבודה של Spark ב-Serverless ל-Apache Spark.

‫Lightning Engine משתמש ב-Velox וב-Apache Gluten. ‫Velox הוא מנוע C++‎ עתיר ביצועים לעיבוד נתונים. ‫Apache Gluten היא שכבת ביניים שאחראית להמרה של משימות Spark מבוססות JVM לקוד C++‎ שאפשר להריץ באמצעות Velox.

בהדגמה הזו נעשה שימוש ב-TPC-DS, מדד השוואה (benchmark) שהוא תקן בתעשייה ומיועד להערכת הביצועים של מערכות לתמיכה בהחלטות. תשלחו משימת PySpark בסיסית כדי לשלוח שאילתה למדגם של מערך נתונים של TPC-DS באמצעות רמת Standard Serverless. לאחר מכן, מריצים את אותה המשימה בדיוק באמצעות מסלול הפרימיום עם Lightning Engine מופעל. לבסוף, תשוו את זמן הביצוע ותעמיקו בממשק המשתמש של Spark כדי להמחיש את ההבדל בתרשימי הביצוע של Spark עם האצת חומרה.

העלות המשוערת של הפעלת ה-codelab הזה היא פחות מ-1.00$‎, בהנחה שהמשאבים ינוקו מיד כמו שמתואר בקטע ניקוי.

הפעולות שתבצעו:

  • יוצרים קטגוריה ב-Cloud Storage כדי לאחסן את הסקריפטים והתוצאות של הבנצ'מרק.
  • הפעלה של משימת עיבוד נתונים בסיסית ב-PySpark באמצעות רמת השירות Standard של Serverless for Apache Spark
  • הפעלת אותה משימה באמצעות מסלול פרימיום של Serverless for Apache Spark עם Lightning Engine
  • השוואה בין מדדי זמן הריצה
  • הפעלת ממשק המשתמש של Spark History Server כדי להשוות בין תרשימי הביצוע הפיזי המקוריים

הדרישות

‫2. לפני שמתחילים

יצירת פרויקט ב-Google Cloud

  1. במסוף Google Cloud, בדף לבחירת הפרויקט, בוחרים פרויקט ב-Google Cloud או יוצרים פרויקט.
  2. הקפידו לוודא שהחיוב מופעל בפרויקט שלכם ב-Cloud. כך בודקים אם החיוב מופעל בפרויקט

הפעלת Cloud Shell

Cloud Shell היא סביבת שורת פקודה שפועלת ב-Google Cloud ומגיעה עם כלים שנדרשים לשימוש.

  1. לוחצים על Activate Cloud Shell בחלק העליון של מסוף Google Cloud.
  2. אחרי שמתחברים ל-Cloud Shell, מאמתים את האימות:
    gcloud auth list
    
  3. מוודאים שהפרויקט מוגדר:
    gcloud config get project
    
  4. אם הפרויקט לא מוגדר כמו שציפיתם, מגדירים אותו:
    export PROJECT_ID=<YOUR_PROJECT_ID>
    gcloud config set project $PROJECT_ID
    

הפעלת ממשקי ה-API

מריצים את הפקודה הזו כדי להפעיל את כל ממשקי ה-API הנדרשים ל-codelab הזה:

gcloud services enable \
    dataproc.googleapis.com \
    storage.googleapis.com \
    compute.googleapis.com

3. הכנת הסביבה

בשלב הזה מאתחלים משתני סביבה ויוצרים קטגוריה של Cloud Storage. ה-bucket הזה יכיל את סקריפט ה-PySpark שאתם שולחים לשתי הרמות של Serverless for Apache Spark.

הגדרה של משתני סביבה

מריצים את הפקודות הבאות ב-Cloud Shell כדי להגדיר משתני סביבה כברירת מחדל. אנחנו נשתמש באזור us-central1, אבל אפשר לשנות את זה אם רוצים.

export PROJECT_ID=$(gcloud config get-value project)
export REGION="us-central1"
export BUCKET_NAME="spark-benchmark-${PROJECT_ID}-${REGION}"

gcloud config set dataproc/region ${REGION}

יצירת קטגוריה של Cloud Storage

יוצרים את הקטגוריה לאחסון הסקריפטים והיומנים:

gcloud storage buckets create gs://${BUCKET_NAME} \
    --uniform-bucket-level-access \
    --location=${REGION}

העתקת מערך הנתונים TPC-DS לקטגוריה משלכם

בשלב הזה, מעתיקים את מערך הנתונים TPC-DS מקטגוריה ציבורית לקטגוריה של Cloud Storage משלכם. כך תוכלו לוודא שעבודות PySpark יוכלו לקרוא נתונים באופן מקומי מהפרויקט.

מגדירים משתני סביבה כדי לבחור את הגודל והסוג של מערך הנתונים:

export DATASET_TYPE="partitioned" # Options: partitioned, nonpartitioned
export DATASET_SIZE="1GB"         # Options: 1GB, 10GB, 100GB, 1000GB (1000GB not available for partitioned)

export SRC_PATH="gs://beam-tpcds/datasets/parquet/${DATASET_TYPE}/${DATASET_SIZE}"
export DATASET_PATH="gs://${BUCKET_NAME}/tpc-ds-dataset/${DATASET_TYPE}/${DATASET_SIZE}"

מעתיקים את הנתונים של TPC-DS לקטגוריה משלכם:

gcloud storage cp -r ${SRC_PATH}/* ${DATASET_PATH}/

יצירת סקריפט PySpark להשוואה

אנחנו נשתמש בסקריפט PySpark שרושם את טבלאות TPC-DS הרגילות מהקטגוריה שלכם ב-Cloud Storage ומבצע 5 שאילתות רגילות שמקורן במאגר הציבורי של Apache Spark. הסקריפט מקבל את הנתיב למערך הנתונים כארגומנט.

יוצרים קובץ בשם benchmark.py ב-Cloud Shell. כדי ליצור את הקובץ, מעתיקים ומדביקים את הפקודה הבאה:

cat << 'EOF' > benchmark.py
import argparse
import sys
from pyspark.sql import SparkSession
import time

def main():
    parser = argparse.ArgumentParser(description='TPC-DS Benchmark')
    parser.add_argument('data_path', help='GCS base path for TPC-DS tables')
    args = parser.parse_args()

    base_path = args.data_path

    # Initialize Spark Session
    spark = SparkSession.builder \
        .appName("TPC-DS Benchmark") \
        .getOrCreate()

    print(f"Spark Session created. Registering TPC-DS tables from {base_path}...")

    # List of all 24 TPC-DS tables
    tables = [
        "call_center", "catalog_page", "catalog_returns", "catalog_sales",
        "customer", "customer_address", "customer_demographics", "date_dim",
        "household_demographics", "income_band", "inventory", "item",
        "promotion", "reason", "ship_mode", "store", "store_returns",
        "store_sales", "time_dim", "warehouse", "web_page", "web_returns",
        "web_sales", "web_site"
    ]

    # Register each table as a temporary view
    # For this subset of queries, not every table is used
    for table in tables:
        path = f"{base_path}/{table}"
        try:
            df = spark.read.parquet(path)
            df.createOrReplaceTempView(table)
        except Exception as e:
            print(f"Warning: Could not load table {table} from {path}. Error: {e}")

    print("Tables registered successfully. Starting benchmark queries from Apache Spark test suite...")

    # Standard TPC-DS Queries sourced from Apache Spark public repository:
    # https://github.com/apache/spark/tree/master/sql/core/src/test/resources/tpcds
    queries = {
        "Q1": """
            WITH customer_total_return AS (
              SELECT sr_customer_sk AS ctr_customer_sk,
                     sr_store_sk AS ctr_store_sk,
                     sum(sr_return_amt) AS ctr_total_return
              FROM store_returns, date_dim
              WHERE sr_returned_date_sk = d_date_sk
                AND d_year = 2000
              GROUP BY sr_customer_sk, sr_store_sk
            )
            SELECT c_customer_id
            FROM customer_total_return ctr1, store, customer
            WHERE ctr1.ctr_total_return > (
              SELECT avg(ctr_total_return) * 1.2
              FROM customer_total_return ctr2
              WHERE ctr1.ctr_store_sk = ctr2.ctr_store_sk
            )
              AND s_store_sk = ctr1.ctr_store_sk
              AND s_state = 'TN'
              AND ctr1.ctr_customer_sk = c_customer_sk
            ORDER BY c_customer_id
            LIMIT 100
        """,
        "Q2": """
            WITH wscs AS (
              SELECT sold_date_sk, sales_price
              FROM (
                SELECT ws_sold_date_sk AS sold_date_sk, ws_ext_sales_price AS sales_price
                FROM web_sales
                UNION ALL
                SELECT cs_sold_date_sk AS sold_date_sk, cs_ext_sales_price AS sales_price
                FROM catalog_sales
              )
            ),
            wswscs AS (
              SELECT d_week_seq,
                     sum(CASE WHEN (d_day_name='Sunday') THEN sales_price ELSE null END) AS sun_sales,
                     sum(CASE WHEN (d_day_name='Monday') THEN sales_price ELSE null END) AS mon_sales,
                     sum(CASE WHEN (d_day_name='Tuesday') THEN sales_price ELSE null END) AS tue_sales,
                     sum(CASE WHEN (d_day_name='Wednesday') THEN sales_price ELSE null END) AS wed_sales,
                     sum(CASE WHEN (d_day_name='Thursday') THEN sales_price ELSE null END) AS thu_sales,
                     sum(CASE WHEN (d_day_name='Friday') THEN sales_price ELSE null END) AS fri_sales,
                     sum(CASE WHEN (d_day_name='Saturday') THEN sales_price ELSE null END) AS sat_sales
              FROM wscs, date_dim
              WHERE d_date_sk = sold_date_sk
              GROUP BY d_week_seq
            )
            SELECT d_week_seq1,
                   round(sun_sales1/sun_sales2, 2),
                   round(mon_sales1/mon_sales2, 2),
                   round(tue_sales1/tue_sales2, 2),
                   round(wed_sales1/wed_sales2, 2),
                   round(thu_sales1/thu_sales2, 2),
                   round(fri_sales1/fri_sales2, 2),
                   round(sat_sales1/sat_sales2, 2)
            FROM (
              SELECT wswscs.d_week_seq AS d_week_seq1,
                     sun_sales AS sun_sales1, mon_sales AS mon_sales1,
                     tue_sales AS tue_sales1, wed_sales AS wed_sales1,
                     thu_sales AS thu_sales1, fri_sales AS fri_sales1,
                     sat_sales AS sat_sales1
              FROM wswscs, date_dim
              WHERE date_dim.d_week_seq = wswscs.d_week_seq
                AND d_year = 2001
            ) y,
            (
              SELECT wswscs.d_week_seq AS d_week_seq2,
                     sun_sales AS sun_sales2, mon_sales AS mon_sales2,
                     tue_sales AS tue_sales2, wed_sales AS wed_sales2,
                     thu_sales AS thu_sales2, fri_sales AS fri_sales2,
                     sat_sales AS sat_sales2
              FROM wswscs, date_dim
              WHERE date_dim.d_week_seq = wswscs.d_week_seq
                AND d_year = 2001 + 1
            ) z
            WHERE d_week_seq1 = d_week_seq2 - 53
            ORDER BY d_week_seq1
        """,
        "Q3": """
            SELECT dt.d_year,
                   item.i_brand_id AS brand_id,
                   item.i_brand AS brand,
                   sum(ss_ext_sales_price) AS sum_agg
            FROM date_dim dt,
                 store_sales,
                 item
            WHERE dt.d_date_sk = store_sales.ss_sold_date_sk
              AND store_sales.ss_item_sk = item.i_item_sk
              AND item.i_manufact_id = 436
              AND dt.d_moy = 12
            GROUP BY dt.d_year,
                     item.i_brand,
                     item.i_brand_id
            ORDER BY dt.d_year,
                     sum_agg DESC,
                     brand_id
            LIMIT 100
        """,
        "Q7": """
            SELECT i_item_id,
                   avg(ss_quantity) AS agg1,
                   avg(ss_list_price) AS agg2,
                   avg(ss_coupon_amt) AS agg3,
                   avg(ss_sales_price) AS agg4
            FROM store_sales,
                 customer_demographics,
                 date_dim,
                 item,
                 promotion
            WHERE ss_sold_date_sk = d_date_sk
              AND ss_item_sk = i_item_sk
              AND ss_cdemo_sk = cd_demo_sk
              AND ss_promo_sk = p_promo_sk
              AND cd_gender = 'M'
              AND cd_marital_status = 'S'
              AND cd_education_status = 'College'
              AND (p_channel_email = 'N' OR p_channel_event = 'N')
              AND d_year = 2000
            GROUP BY i_item_id
            ORDER BY i_item_id
            LIMIT 100
        """,
        "Q19": """
            SELECT i_item_id,
                   i_brand,
                   i_category,
                   i_class,
                   i_manufact,
                   sum(ss_ext_sales_price) AS sales,
                   sum(ss_net_profit) AS profit
            FROM date_dim,
                 store_sales,
                 item,
                 customer,
                 store
            WHERE d_date_sk = ss_sold_date_sk
              AND i_item_sk = ss_item_sk
              AND d_year = 2000
              AND d_moy = 12
              AND c_customer_sk = ss_customer_sk
              AND s_store_sk = ss_store_sk
              AND i_manager_id = 9
            GROUP BY i_item_id,
                     i_brand,
                     i_category,
                     i_class,
                     i_manufact
            ORDER BY i_item_id,
                     i_brand,
                     i_category,
                     i_class,
                     i_manufact
            LIMIT 100
        """
    }

    total_start_time = time.time()

    for query_name, query_sql in queries.items():
        print(f"\nExecuting {query_name}...")
        query_start = time.time()

        # Execute query and force action using show()
        result_df = spark.sql(query_sql)
        result_df.show(5) # Show top 5 rows

        query_end = time.time()
        print(f"{query_name} completed in {query_end - query_start:.2f} seconds.")

    total_end_time = time.time()
    print(f"\nAll benchmark queries completed in {total_end_time - total_start_time:.2f} seconds.")

    spark.stop()

if __name__ == "__main__":
    main()
EOF

מעתיקים את הסקריפט לקטגוריה של Cloud Storage כדי ש-Serverless for Apache Spark יוכל לגשת אליו:

gcloud storage cp benchmark.py gs://${BUCKET_NAME}/scripts/benchmark.py

4. הרצת משימה ראשונית ללא שרת

כדי לספק השוואה בסיסית בלי Lightning Engine, שולחים את משימת ההשוואה של PySpark שהעליתם קודם למסלול הרגיל של Serverless for Apache Spark. נעביר את הנתיב למערך הנתונים שהעתקתם כארגומנט.

מריצים את הפקודה הבאה כדי להפעיל את משימת האצווה:

gcloud dataproc batches submit pyspark \
    gs://${BUCKET_NAME}/scripts/benchmark.py \
    --region=${REGION} \
    --version=2.3 \
    --deps-bucket=gs://${BUCKET_NAME} \
    -- ${DATASET_PATH}

מעקב אחרי המשימה

במהלך ההרצה של המשימה, יומני PySpark יוזרמו לטרמינל של Cloud Shell. ‫Serverless for Apache Spark מקצה קונטיינרים, קורא את מערך הנתונים של TPC-DS Parquet מ-Cloud Storage ומבצע את תוכניות ה-SQL המורכבות.

אחרי שהסקריפט יסיים לפעול, בודקים את הפלט של המסוף. אמורים להופיע תוצאות וזמנים לכל שאילתה רגילה שהופעלה, כמו בדוגמה הבאה:

...
Executing Q1...
+-------------+
|c_customer_id|
+-------------+
...

Q1 completed in 18.52 seconds.
...

All benchmark queries completed in 110.94 seconds.

שימו לב למשך הזמן הכולל בשניות שנדרש להשלמת הפעולה. זהו זמן הריצה הבסיסי.

5. Run with Serverless Premium and Lightning Engine

לאחר מכן, תריצו את אותה משימת Spark בדיוק ב-Serverless for Apache Spark, אבל באמצעות רמת Premium והפעלת מנוע השאילתות המקורי והווקטורי של Google: Lightning Engine.

שולחים את משימת הבנצ'מרק ל-Serverless עם הפעלה מפורשת של Lightning Engine:

gcloud dataproc batches submit pyspark \
    gs://${BUCKET_NAME}/scripts/benchmark.py \
    --region=${REGION} \
    --version=2.3 \
    --deps-bucket=gs://${BUCKET_NAME} \
    --properties="dataproc.tier=premium,spark.dataproc.lightningEngine.runtime=native" \
    -- ${DATASET_PATH}

השוואה בין התוצאות

מחכים שהעבודה תסתיים ובודקים את הפלט. אמורות להופיע אותן תוצאות של השאילתה. בודקים את זמן ההשלמה:

...
All benchmark queries completed in 64.24 seconds.

אם משווים בין הפעלה בסיסית של Serverless לבין הפעלה של Serverless Lightning Engine, אפשר לראות ש-Lightning Engine מבצע את הקיבוץ, צבירת הנתונים והאיחוד מהר יותר. זאת בזכות שימוש בשכבת ביצוע מקורית של C++ ועיבוד וקטורי בבק-אנד, בלי שנדרשים שינויים בקוד אפליקציה של אפליקציית PySpark.

מנוע Lightning מותאם לשיפור הביצועים ככל שעומס העבודה גדול יותר. בדוגמה הזו אנחנו משתמשים במערך נתונים קטן, ולכן השיפור בביצועים לא דרמטי כמו שהוא יכול להיות. במחקר השוואתי נמצא שיפור של עד 4.3x בביצועים בהשוואה ל-Spark בקוד פתוח, בקבוצת נתונים של 10TB.

6. השוואה בין תרשימי ביצוע בממשק המשתמש של Spark

הקיצור של זמן הריצה מרשים, אבל בואו נבדוק מה קורה מאחורי הקלעים בזמן ש-Spark מריץ את השאילתה. כדי לעשות את זה, בודקים את תרשימי הביצוע בממשק המשתמש של Spark בשני הג'ובים.

  1. פותחים את מסוף Google Cloud בדפדפן.
  2. עוברים אל Dataproc > Batches.
  3. ברשימה יוצגו שתי קבוצות: הריצה של קו הבסיס הרגיל והריצה של מסלול פרימיום.
  4. לוחצים על חבילת הפרימיום שהפעלתם, ואז לוחצים על View Spark UI ואז על View Details.
  5. בממשק המשתמש של Spark, עוברים לכרטיסייה Jobs (משימות).
  6. בקטע משימות שהושלמו, בתיבת החיפוש, מקלידים Velox.
  7. תראו הרבה תיאורי משרות שכוללים את VeloxSparkPlanExecApi. הכוונה היא למנוע ההפעלה המקורי של Velox שמשמש את Lightning Engine.

עכשיו חוזרים על התהליך הזה להרצת מסלול רגיל:

  1. חוזרים לדף Serverless for Apache Spark Batches.
  2. לוחצים על הקישור של חבילת Standard tier, ואז על View Spark UI ואז על View Details.
  3. בממשק המשתמש של Spark, עוברים לכרטיסייה Jobs (משימות).
  4. בקטע משימות שהושלמו, בתיבת החיפוש, מקלידים Velox.
  5. לא תראו בתיאורי המשרות שום אזכור של Velox API.

7. הסרת המשאבים

כדי להימנע מחיובים שוטפים בחשבון Google Cloud, מוחקים את המשאבים שנוצרו במהלך ה-codelab הזה.

ב-Cloud Shell, מוחקים את הקטגוריה של Cloud Storage ואת התוכן שלה:

gcloud storage rm -r gs://${BUCKET_NAME}

מחיקת העותק המקומי של benchmark.py:

rm benchmark.py

8. מזל טוב

מעולה! הצלחתם ליצור סביבת השוואה ל-Apache Spark ולהשוות בין Serverless for Apache Spark Standard לבין Serverless for Apache Spark Premium.

ראיתם בעצמכם איך הפעלת Serverless במנוע Lightning החדש של Apache Spark יכולה לקצר את זמן הריצה של עומס העבודה של Spark, וראיתם בממשק המשתמש של Spark איך תרשים הביצוע הפיזי הופך לקוד C++‎ מקורי באמצעות מנוע השאילתות המקורי.

מה למדתם

  • איך כותבים סקריפט להשוואה בין מערכי נתונים ב-PySpark.
  • איך שולחים משימות Spark ל-Serverless for Apache Spark.
  • איך מפעילים את Lightning Engine
  • איך משווים בין תוכניות עבודה בממשק המשתמש של Spark.

השלבים הבאים