افزایش سرعت اسپارک با Serverless برای آپاچی اسپارک و موتور لایتنینگ

۱. مقدمه

در این آزمایشگاه کد، مزایای عملکرد Google Cloud Serverless را برای موتور اجرای بومی Apache Spark ، یعنی Lightning Engine ، بررسی خواهید کرد و بررسی خواهید کرد که چگونه بار کاری Spark شما را روی Serverless برای Apache Spark بهینه می‌کند.

موتور لایتنینگ از Velox و Apache Gluten استفاده می‌کند. Velox یک موتور C++ با کارایی بالا برای پردازش داده‌ها است. Apache Gluten یک لایه میانی است که مسئول تبدیل کارهای Spark مبتنی بر JVM به کد C++ است که می‌تواند توسط Velox اجرا شود.

این نسخه آزمایشی از TPC-DS ، یک معیار استاندارد صنعتی که برای ارزیابی عملکرد سیستم‌های پشتیبانی تصمیم‌گیری طراحی شده است، استفاده می‌کند. شما یک کار پایه PySpark را برای پرس‌وجو از یک مجموعه داده نمونه TPC-DS با استفاده از لایه استاندارد بدون سرور ارسال خواهید کرد. سپس، دقیقاً همان کار را با استفاده از لایه Premium با موتور Lightning فعال اجرا خواهید کرد. در نهایت، زمان اجرا را مقایسه کرده و به رابط کاربری Spark خواهید پرداخت تا تفاوت در نمودارهای اجرای Spark با شتاب‌دهنده سخت‌افزاری را تجسم کنید.

هزینه تخمینی برای اجرای این آزمایشگاه کد کمتر از ۱.۰۰ دلار آمریکا است، با فرض اینکه منابع به سرعت طبق توضیحات بخش پاکسازی، پاکسازی شوند.

کاری که انجام خواهید داد

  • یک فضای ذخیره‌سازی ابری برای ذخیره اسکریپت‌ها و نتایج بنچمارک خود ایجاد کنید
  • اجرای یک کار پردازش داده پایه PySpark با استفاده از Serverless برای لایه استاندارد Apache Spark
  • اجرای همان کار با استفاده از Serverless برای لایه Apache Spark Premium با موتور لایتنینگ
  • مقایسه معیارهای زمان اجرا
  • رابط کاربری Spark History Server را برای مقایسه نمودارهای اجرای فیزیکی بومی اجرا کنید.

آنچه نیاز دارید

۲. قبل از شروع

ایجاد یک پروژه ابری گوگل

  1. در کنسول گوگل کلود ، در صفحه انتخاب پروژه، یک پروژه گوگل کلود را انتخاب یا ایجاد کنید .
  2. مطمئن شوید که صورتحساب برای پروژه ابری شما فعال است. یاد بگیرید که چگونه بررسی کنید که آیا صورتحساب در یک پروژه فعال است یا خیر .

شروع پوسته ابری

Cloud Shell یک محیط خط فرمان است که در Google Cloud اجرا می‌شود و ابزارهای لازم از قبل روی آن بارگذاری شده‌اند.

  1. روی فعال کردن Cloud Shell در بالای کنسول Google Cloud کلیک کنید.
  2. پس از اتصال به Cloud Shell، احراز هویت خود را تأیید کنید:
    gcloud auth list
    
  3. تأیید کنید که پروژه شما پیکربندی شده است:
    gcloud config get project
    
  4. اگر پروژه شما مطابق انتظار تنظیم نشده است، آن را تنظیم کنید:
    export PROJECT_ID=<YOUR_PROJECT_ID>
    gcloud config set project $PROJECT_ID
    

فعال کردن APIها

برای فعال کردن تمام API های مورد نیاز برای این codelab، این دستور را اجرا کنید:

gcloud services enable \
    dataproc.googleapis.com \
    storage.googleapis.com \
    compute.googleapis.com

۳. محیط خود را آماده کنید

در این مرحله، متغیرهای محیطی را مقداردهی اولیه کرده و یک مخزن ذخیره‌سازی ابری ایجاد خواهید کرد. این مخزن، اسکریپت PySpark ارسالی شما به هر دو لایه Serverless برای Apache Spark را در خود جای می‌دهد.

تنظیم متغیرهای محیطی

دستورات زیر را در Cloud Shell اجرا کنید تا متغیرهای محیطی پیش‌فرض تنظیم شوند. ما از ناحیه us-central1 استفاده خواهیم کرد، اما در صورت تمایل می‌توانید آن را تغییر دهید.

export PROJECT_ID=$(gcloud config get-value project)
export REGION="us-central1"
export BUCKET_NAME="spark-benchmark-${PROJECT_ID}-${REGION}"

gcloud config set dataproc/region ${REGION}

یک سطل ذخیره‌سازی ابری ایجاد کنید

یک سطل برای نگهداری اسکریپت‌ها و لاگ‌های خود ایجاد کنید:

gcloud storage buckets create gs://${BUCKET_NAME} \
    --uniform-bucket-level-access \
    --location=${REGION}

مجموعه داده TPC-DS را در سطل خود کپی کنید

در این مرحله، مجموعه داده‌های TPC-DS را از یک باکت عمومی به باکت ذخیره‌سازی ابری خود کپی خواهید کرد. این کار تضمین می‌کند که وظایف PySpark شما می‌توانند داده‌ها را به صورت محلی از پروژه شما بخوانند.

متغیرهای محیطی را برای انتخاب اندازه و نوع مجموعه داده تنظیم کنید:

export DATASET_TYPE="partitioned" # Options: partitioned, nonpartitioned
export DATASET_SIZE="1GB"         # Options: 1GB, 10GB, 100GB, 1000GB (1000GB not available for partitioned)

export SRC_PATH="gs://beam-tpcds/datasets/parquet/${DATASET_TYPE}/${DATASET_SIZE}"
export DATASET_PATH="gs://${BUCKET_NAME}/tpc-ds-dataset/${DATASET_TYPE}/${DATASET_SIZE}"

داده‌های TPC-DS را در سطل خودتان کپی کنید:

gcloud storage cp -r ${SRC_PATH}/* ${DATASET_PATH}/

ایجاد اسکریپت معیار PySpark

ما از یک اسکریپت PySpark استفاده خواهیم کرد که جداول استاندارد TPC-DS را از مخزن ذخیره‌سازی ابری شما ثبت می‌کند و 5 کوئری استاندارد را که از مخزن عمومی آپاچی اسپارک گرفته شده‌اند، اجرا می‌کند. این اسکریپت مسیر مجموعه داده‌های شما را به عنوان آرگومان می‌پذیرد.

یک فایل با نام benchmark.py در Cloud Shell ایجاد کنید. می‌توانید دستور زیر را برای تولید فایل کپی و جایگذاری کنید:

cat << 'EOF' > benchmark.py
import argparse
import sys
from pyspark.sql import SparkSession
import time

def main():
    parser = argparse.ArgumentParser(description='TPC-DS Benchmark')
    parser.add_argument('data_path', help='GCS base path for TPC-DS tables')
    args = parser.parse_args()

    base_path = args.data_path

    # Initialize Spark Session
    spark = SparkSession.builder \
        .appName("TPC-DS Benchmark") \
        .getOrCreate()

    print(f"Spark Session created. Registering TPC-DS tables from {base_path}...")

    # List of all 24 TPC-DS tables
    tables = [
        "call_center", "catalog_page", "catalog_returns", "catalog_sales",
        "customer", "customer_address", "customer_demographics", "date_dim",
        "household_demographics", "income_band", "inventory", "item",
        "promotion", "reason", "ship_mode", "store", "store_returns",
        "store_sales", "time_dim", "warehouse", "web_page", "web_returns",
        "web_sales", "web_site"
    ]

    # Register each table as a temporary view
    # For this subset of queries, not every table is used
    for table in tables:
        path = f"{base_path}/{table}"
        try:
            df = spark.read.parquet(path)
            df.createOrReplaceTempView(table)
        except Exception as e:
            print(f"Warning: Could not load table {table} from {path}. Error: {e}")

    print("Tables registered successfully. Starting benchmark queries from Apache Spark test suite...")

    # Standard TPC-DS Queries sourced from Apache Spark public repository:
    # https://github.com/apache/spark/tree/master/sql/core/src/test/resources/tpcds
    queries = {
        "Q1": """
            WITH customer_total_return AS (
              SELECT sr_customer_sk AS ctr_customer_sk,
                     sr_store_sk AS ctr_store_sk,
                     sum(sr_return_amt) AS ctr_total_return
              FROM store_returns, date_dim
              WHERE sr_returned_date_sk = d_date_sk
                AND d_year = 2000
              GROUP BY sr_customer_sk, sr_store_sk
            )
            SELECT c_customer_id
            FROM customer_total_return ctr1, store, customer
            WHERE ctr1.ctr_total_return > (
              SELECT avg(ctr_total_return) * 1.2
              FROM customer_total_return ctr2
              WHERE ctr1.ctr_store_sk = ctr2.ctr_store_sk
            )
              AND s_store_sk = ctr1.ctr_store_sk
              AND s_state = 'TN'
              AND ctr1.ctr_customer_sk = c_customer_sk
            ORDER BY c_customer_id
            LIMIT 100
        """,
        "Q2": """
            WITH wscs AS (
              SELECT sold_date_sk, sales_price
              FROM (
                SELECT ws_sold_date_sk AS sold_date_sk, ws_ext_sales_price AS sales_price
                FROM web_sales
                UNION ALL
                SELECT cs_sold_date_sk AS sold_date_sk, cs_ext_sales_price AS sales_price
                FROM catalog_sales
              )
            ),
            wswscs AS (
              SELECT d_week_seq,
                     sum(CASE WHEN (d_day_name='Sunday') THEN sales_price ELSE null END) AS sun_sales,
                     sum(CASE WHEN (d_day_name='Monday') THEN sales_price ELSE null END) AS mon_sales,
                     sum(CASE WHEN (d_day_name='Tuesday') THEN sales_price ELSE null END) AS tue_sales,
                     sum(CASE WHEN (d_day_name='Wednesday') THEN sales_price ELSE null END) AS wed_sales,
                     sum(CASE WHEN (d_day_name='Thursday') THEN sales_price ELSE null END) AS thu_sales,
                     sum(CASE WHEN (d_day_name='Friday') THEN sales_price ELSE null END) AS fri_sales,
                     sum(CASE WHEN (d_day_name='Saturday') THEN sales_price ELSE null END) AS sat_sales
              FROM wscs, date_dim
              WHERE d_date_sk = sold_date_sk
              GROUP BY d_week_seq
            )
            SELECT d_week_seq1,
                   round(sun_sales1/sun_sales2, 2),
                   round(mon_sales1/mon_sales2, 2),
                   round(tue_sales1/tue_sales2, 2),
                   round(wed_sales1/wed_sales2, 2),
                   round(thu_sales1/thu_sales2, 2),
                   round(fri_sales1/fri_sales2, 2),
                   round(sat_sales1/sat_sales2, 2)
            FROM (
              SELECT wswscs.d_week_seq AS d_week_seq1,
                     sun_sales AS sun_sales1, mon_sales AS mon_sales1,
                     tue_sales AS tue_sales1, wed_sales AS wed_sales1,
                     thu_sales AS thu_sales1, fri_sales AS fri_sales1,
                     sat_sales AS sat_sales1
              FROM wswscs, date_dim
              WHERE date_dim.d_week_seq = wswscs.d_week_seq
                AND d_year = 2001
            ) y,
            (
              SELECT wswscs.d_week_seq AS d_week_seq2,
                     sun_sales AS sun_sales2, mon_sales AS mon_sales2,
                     tue_sales AS tue_sales2, wed_sales AS wed_sales2,
                     thu_sales AS thu_sales2, fri_sales AS fri_sales2,
                     sat_sales AS sat_sales2
              FROM wswscs, date_dim
              WHERE date_dim.d_week_seq = wswscs.d_week_seq
                AND d_year = 2001 + 1
            ) z
            WHERE d_week_seq1 = d_week_seq2 - 53
            ORDER BY d_week_seq1
        """,
        "Q3": """
            SELECT dt.d_year,
                   item.i_brand_id AS brand_id,
                   item.i_brand AS brand,
                   sum(ss_ext_sales_price) AS sum_agg
            FROM date_dim dt,
                 store_sales,
                 item
            WHERE dt.d_date_sk = store_sales.ss_sold_date_sk
              AND store_sales.ss_item_sk = item.i_item_sk
              AND item.i_manufact_id = 436
              AND dt.d_moy = 12
            GROUP BY dt.d_year,
                     item.i_brand,
                     item.i_brand_id
            ORDER BY dt.d_year,
                     sum_agg DESC,
                     brand_id
            LIMIT 100
        """,
        "Q7": """
            SELECT i_item_id,
                   avg(ss_quantity) AS agg1,
                   avg(ss_list_price) AS agg2,
                   avg(ss_coupon_amt) AS agg3,
                   avg(ss_sales_price) AS agg4
            FROM store_sales,
                 customer_demographics,
                 date_dim,
                 item,
                 promotion
            WHERE ss_sold_date_sk = d_date_sk
              AND ss_item_sk = i_item_sk
              AND ss_cdemo_sk = cd_demo_sk
              AND ss_promo_sk = p_promo_sk
              AND cd_gender = 'M'
              AND cd_marital_status = 'S'
              AND cd_education_status = 'College'
              AND (p_channel_email = 'N' OR p_channel_event = 'N')
              AND d_year = 2000
            GROUP BY i_item_id
            ORDER BY i_item_id
            LIMIT 100
        """,
        "Q19": """
            SELECT i_item_id,
                   i_brand,
                   i_category,
                   i_class,
                   i_manufact,
                   sum(ss_ext_sales_price) AS sales,
                   sum(ss_net_profit) AS profit
            FROM date_dim,
                 store_sales,
                 item,
                 customer,
                 store
            WHERE d_date_sk = ss_sold_date_sk
              AND i_item_sk = ss_item_sk
              AND d_year = 2000
              AND d_moy = 12
              AND c_customer_sk = ss_customer_sk
              AND s_store_sk = ss_store_sk
              AND i_manager_id = 9
            GROUP BY i_item_id,
                     i_brand,
                     i_category,
                     i_class,
                     i_manufact
            ORDER BY i_item_id,
                     i_brand,
                     i_category,
                     i_class,
                     i_manufact
            LIMIT 100
        """
    }

    total_start_time = time.time()

    for query_name, query_sql in queries.items():
        print(f"\nExecuting {query_name}...")
        query_start = time.time()

        # Execute query and force action using show()
        result_df = spark.sql(query_sql)
        result_df.show(5) # Show top 5 rows

        query_end = time.time()
        print(f"{query_name} completed in {query_end - query_start:.2f} seconds.")

    total_end_time = time.time()
    print(f"\nAll benchmark queries completed in {total_end_time - total_start_time:.2f} seconds.")

    spark.stop()

if __name__ == "__main__":
    main()
EOF

اسکریپت را در مخزن ذخیره‌سازی ابری خود کپی کنید تا Serverless برای Apache Spark بتواند به آن دسترسی داشته باشد:

gcloud storage cp benchmark.py gs://${BUCKET_NAME}/scripts/benchmark.py

۴. اجرای کار Baseline Serverless

برای ارائه یک مقایسه پایه بدون موتور لایتنینگ، کار معیارسنجی PySpark را که قبلاً آپلود کرده‌اید، به لایه Serverless for Apache Spark Standard ارسال کنید. ما مسیر مجموعه داده‌ای را که کپی کرده‌اید به عنوان آرگومان ارسال خواهیم کرد.

برای اجرای کار دسته‌ای، دستور زیر را اجرا کنید:

gcloud dataproc batches submit pyspark \
    gs://${BUCKET_NAME}/scripts/benchmark.py \
    --region=${REGION} \
    --version=2.3 \
    --deps-bucket=gs://${BUCKET_NAME} \
    -- ${DATASET_PATH}

نظارت بر کار

در حین اجرای کار، گزارش‌های PySpark را در ترمینال Cloud Shell خود مشاهده خواهید کرد. Serverless برای Apache Spark در حال تخصیص کانتینرها، خواندن مجموعه داده‌های TPC-DS Parquet از Cloud Storage و اجرای طرح‌های پیچیده SQL است.

پس از اتمام اسکریپت، خروجی کنسول را مشاهده کنید. باید نتایج و زمان‌بندی‌های هر پرس‌وجوی استاندارد اجرا شده را مشاهده کنید، مشابه:

...
Executing Q1...
+-------------+
|c_customer_id|
+-------------+
...

Q1 completed in 18.52 seconds.
...

All benchmark queries completed in 110.94 seconds.

به کل ثانیه‌هایی که طول کشید تا تکمیل شود توجه کنید. این زمان اجرای پایه شماست.

۵. با Serverless Premium و Lightning Engine اجرا کنید

در مرحله بعد، دقیقاً همان کار Spark را روی Serverless برای Apache Spark اجرا خواهید کرد، اما با استفاده از سطح Premium و فعال کردن موتور جستجوی برداری بومی گوگل: Lightning Engine .

کار معیار را با فعال بودن صریح موتور لایتنینگ به Serverless ارسال کنید:

gcloud dataproc batches submit pyspark \
    gs://${BUCKET_NAME}/scripts/benchmark.py \
    --region=${REGION} \
    --version=2.3 \
    --deps-bucket=gs://${BUCKET_NAME} \
    --properties="dataproc.tier=premium,spark.dataproc.lightningEngine.runtime=native" \
    -- ${DATASET_PATH}

نتایج را مقایسه کنید

منتظر بمانید تا کار تکمیل شود و خروجی را بررسی کنید. باید همان نتایج پرس و جو را ببینید. به زمان تکمیل با دقت نگاه کنید:

...
All benchmark queries completed in 64.24 seconds.

با مقایسه اجرای اولیه Serverless با اجرای Serverless Lightning Engine، متوجه خواهید شد که Lightning Engine با استفاده از یک لایه اجرایی بومی C++ و پردازش برداری در backend، بدون نیاز به هیچ تغییری در کد برنامه PySpark شما، گروه‌بندی، تجمیع و اتصال را سریع‌تر انجام می‌دهد.

موتور لایتنینگ برای افزایش عملکرد در حجم کاری بیشتر بهینه شده است. در این مثال ما از یک مجموعه داده کوچک استفاده می‌کنیم، بنابراین افزایش عملکرد به آن اندازه که باید چشمگیر نیست. در یک مجموعه داده 10 ترابایتی، بهبود عملکرد تا 4.3 برابر نسبت به اسپارک متن‌باز در معیارها نشان داده شده است.

۶. مقایسه نمودارهای اجرا در رابط کاربری Spark

کاهش زمان اجرا چشمگیر است، اما بیایید نگاهی عمیق‌تر به آنچه اسپارک در حین اجرای کوئری انجام می‌دهد، بیندازیم. می‌توانید این کار را با بررسی نمودارهای اجرای رابط کاربری اسپارک برای هر دو کار انجام دهید.

  1. کنسول گوگل کلود را در مرورگر خود باز کنید.
  2. به Dataproc > batches بروید.
  3. دو دسته در لیست مشاهده خواهید کرد: اجرای پایه استاندارد و اجرای سطح ویژه.
  4. روی دسته‌ی Premium tier که اجرا کرده‌اید کلیک کنید، سپس روی View Spark UI و بعد View Details کلیک کنید.
  5. در رابط کاربری Spark، به تب Jobs بروید.
  6. در قسمت «کارهای تکمیل‌شده» ، در کادر جستجو، عبارت Velox را تایپ کنید.
  7. شما شرح وظایف زیادی را خواهید دید که شامل VeloxSparkPlanExecApi می‌شود. این به موتور اجرایی بومی Velox که توسط Lightning Engine استفاده می‌شود، اشاره دارد.

حالا، این فرآیند را برای اجرای ردیف استاندارد تکرار کنید:

  1. به صفحه Serverless برای دسته‌های آپاچی اسپارک برگردید.
  2. روی لینک مربوط به دسته استاندارد کلیک کنید، سپس روی View Spark UI و بعد View Details کلیک کنید.
  3. در رابط کاربری Spark، به تب Jobs بروید.
  4. در قسمت «کارهای تکمیل‌شده» ، در کادر جستجو، عبارت Velox را تایپ کنید.
  5. در توضیحات شغلی هیچ اشاره‌ای به API ولوکس نخواهید دید.

۷. تمیز کردن

برای جلوگیری از هزینه‌های مداوم برای حساب Google Cloud خود، منابع ایجاد شده در طول این codelab را حذف کنید.

در Cloud Shell، مخزن Cloud Storage و محتویات آن را حذف کنید:

gcloud storage rm -r gs://${BUCKET_NAME}

کپی محلی benchmark.py خود را حذف کنید:

rm benchmark.py

۸. تبریک

تبریک! شما با موفقیت یک محیط بنچمارک برای آپاچی اسپارک ساختید و Serverless برای آپاچی اسپارک استاندارد را با Serverless برای آپاچی اسپارک پریمیوم مقایسه کردید.

شما از نزدیک دیدید که چگونه فعال کردن Serverless برای موتور جدید Lightning آپاچی اسپارک می‌تواند زمان اجرای بار کاری اسپارک شما را کاهش دهد، و رابط کاربری اسپارک را بررسی کردید تا ببینید چگونه نمودار اجرای فیزیکی با استفاده از موتور جستجوی بومی به کد ++C بومی تبدیل می‌شود.

آنچه آموخته‌اید

  • چگونه یک اسکریپت بنچمارک‌گیری از مجموعه داده‌های PySpark بنویسیم؟
  • نحوه ارسال کارهای Spark به Serverless برای Apache Spark.
  • نحوه فعال کردن موتور لایتنینگ.
  • نحوه مقایسه طرح‌های شغلی در رابط کاربری Spark.

مراحل بعدی