Ускорьте работу Spark с помощью бессерверных вычислений для Apache Spark и Lightning Engine.

1. Введение

В этом практическом занятии вы изучите преимущества повышения производительности собственного механизма выполнения Apache Spark — Lightning Engineв Google Cloud Serverless for Apache Spark , а также рассмотрите, как он оптимизирует ваши рабочие нагрузки Spark на Serverless for Apache Spark .

Lightning Engine использует Velox и Apache Gluten . Velox — это высокопроизводительный движок обработки данных на C++. Apache Gluten — это промежуточный слой, отвечающий за преобразование заданий Spark на основе JVM в код C++, который может быть выполнен Velox.

В этой демонстрации используется TPC-DS , отраслевой стандартный бенчмарк, предназначенный для оценки производительности систем поддержки принятия решений. Вы запустите базовое задание PySpark для запроса к образцу набора данных TPC-DS, используя стандартный бессерверный уровень. Затем вы запустите точно такое же задание, используя уровень Premium с включенным Lightning Engine. Наконец, вы сравните время выполнения и перейдете к пользовательскому интерфейсу Spark, чтобы визуализировать разницу в графиках выполнения Spark с аппаратным ускорением.

Ориентировочная стоимость проведения этого практического занятия составляет менее 1 доллара США , при условии оперативного освобождения ресурсов, как описано в разделе «Очистка ресурсов».

Что вы будете делать

  • Создайте хранилище Cloud Storage для хранения ваших скриптов и результатов бенчмаркинга.
  • Выполните базовую задачу обработки данных PySpark, используя уровень Serverless for Apache Spark Standard.
  • Выполните ту же задачу, используя уровень Serverless for Apache Spark Premium с Lightning Engine.
  • Сравните показатели времени выполнения.
  • Запустите пользовательский интерфейс Spark History Server, чтобы сравнить графики физического выполнения, полученные непосредственно в Spark.

Что вам понадобится

  • Веб-браузер, например Chrome.
  • Проект Google Cloud с включенной функцией выставления счетов.
  • Базовые знания Apache Spark и командной строки Linux.

2. Прежде чем начать

Создайте проект в Google Cloud.

  1. В консоли Google Cloud на странице выбора проекта выберите или создайте проект Google Cloud .
  2. Убедитесь, что для вашего облачного проекта включена функция выставления счетов. Узнайте, как проверить, включена ли функция выставления счетов для проекта .

Запустить Cloud Shell

Cloud Shell — это среда командной строки, работающая в Google Cloud и поставляемая с предустановленными необходимыми инструментами.

  1. В верхней части консоли Google Cloud нажмите кнопку «Активировать Cloud Shell» .
  2. После подключения к Cloud Shell подтвердите свою аутентификацию:
    gcloud auth list
    
  3. Убедитесь, что ваш проект настроен:
    gcloud config get project
    
  4. Если параметры вашего проекта заданы не так, как ожидалось, настройте их следующим образом:
    export PROJECT_ID=<YOUR_PROJECT_ID>
    gcloud config set project $PROJECT_ID
    

Включить API

Выполните эту команду, чтобы включить все необходимые API для данного практического занятия:

gcloud services enable \
    dataproc.googleapis.com \
    storage.googleapis.com \
    compute.googleapis.com

3. Подготовьте окружающую среду

На этом шаге вы инициализируете переменные среды и создадите хранилище Cloud Storage. В этом хранилище будет храниться скрипт PySpark, который вы отправляете на оба уровня Serverless for Apache Spark.

Установка переменных среды

Выполните следующие команды в Cloud Shell, чтобы установить переменные среды по умолчанию. Мы будем использовать регион us-central1 , но вы можете изменить его по своему усмотрению.

export PROJECT_ID=$(gcloud config get-value project)
export REGION="us-central1"
export BUCKET_NAME="spark-benchmark-${PROJECT_ID}-${REGION}"

gcloud config set dataproc/region ${REGION}

Создайте корзину облачного хранилища.

Создайте хранилище для ваших скриптов и логов:

gcloud storage buckets create gs://${BUCKET_NAME} \
    --uniform-bucket-level-access \
    --location=${REGION}

Скопируйте набор данных TPC-DS в свой собственный сегмент.

На этом шаге вы скопируете набор данных TPC-DS из общедоступного хранилища в собственное хранилище Cloud Storage. Это гарантирует, что ваши задания PySpark смогут считывать данные локально из вашего проекта.

Задайте переменные среды, чтобы выбрать размер и тип набора данных:

export DATASET_TYPE="partitioned" # Options: partitioned, nonpartitioned
export DATASET_SIZE="1GB"         # Options: 1GB, 10GB, 100GB, 1000GB (1000GB not available for partitioned)

export SRC_PATH="gs://beam-tpcds/datasets/parquet/${DATASET_TYPE}/${DATASET_SIZE}"
export DATASET_PATH="gs://${BUCKET_NAME}/tpc-ds-dataset/${DATASET_TYPE}/${DATASET_SIZE}"

Скопируйте данные TPC-DS в свой собственный контейнер:

gcloud storage cp -r ${SRC_PATH}/* ${DATASET_PATH}/

Создайте скрипт для тестирования производительности PySpark.

Мы будем использовать скрипт PySpark, который регистрирует стандартные таблицы TPC-DS из вашего хранилища Cloud Storage и выполняет 5 стандартных запросов, полученных из общедоступного репозитория Apache Spark. Скрипт принимает путь к вашему набору данных в качестве аргумента.

Создайте в Cloud Shell файл с именем benchmark.py . Для создания файла можно скопировать и вставить следующую команду:

cat << 'EOF' > benchmark.py
import argparse
import sys
from pyspark.sql import SparkSession
import time

def main():
    parser = argparse.ArgumentParser(description='TPC-DS Benchmark')
    parser.add_argument('data_path', help='GCS base path for TPC-DS tables')
    args = parser.parse_args()

    base_path = args.data_path

    # Initialize Spark Session
    spark = SparkSession.builder \
        .appName("TPC-DS Benchmark") \
        .getOrCreate()

    print(f"Spark Session created. Registering TPC-DS tables from {base_path}...")

    # List of all 24 TPC-DS tables
    tables = [
        "call_center", "catalog_page", "catalog_returns", "catalog_sales",
        "customer", "customer_address", "customer_demographics", "date_dim",
        "household_demographics", "income_band", "inventory", "item",
        "promotion", "reason", "ship_mode", "store", "store_returns",
        "store_sales", "time_dim", "warehouse", "web_page", "web_returns",
        "web_sales", "web_site"
    ]

    # Register each table as a temporary view
    # For this subset of queries, not every table is used
    for table in tables:
        path = f"{base_path}/{table}"
        try:
            df = spark.read.parquet(path)
            df.createOrReplaceTempView(table)
        except Exception as e:
            print(f"Warning: Could not load table {table} from {path}. Error: {e}")

    print("Tables registered successfully. Starting benchmark queries from Apache Spark test suite...")

    # Standard TPC-DS Queries sourced from Apache Spark public repository:
    # https://github.com/apache/spark/tree/master/sql/core/src/test/resources/tpcds
    queries = {
        "Q1": """
            WITH customer_total_return AS (
              SELECT sr_customer_sk AS ctr_customer_sk,
                     sr_store_sk AS ctr_store_sk,
                     sum(sr_return_amt) AS ctr_total_return
              FROM store_returns, date_dim
              WHERE sr_returned_date_sk = d_date_sk
                AND d_year = 2000
              GROUP BY sr_customer_sk, sr_store_sk
            )
            SELECT c_customer_id
            FROM customer_total_return ctr1, store, customer
            WHERE ctr1.ctr_total_return > (
              SELECT avg(ctr_total_return) * 1.2
              FROM customer_total_return ctr2
              WHERE ctr1.ctr_store_sk = ctr2.ctr_store_sk
            )
              AND s_store_sk = ctr1.ctr_store_sk
              AND s_state = 'TN'
              AND ctr1.ctr_customer_sk = c_customer_sk
            ORDER BY c_customer_id
            LIMIT 100
        """,
        "Q2": """
            WITH wscs AS (
              SELECT sold_date_sk, sales_price
              FROM (
                SELECT ws_sold_date_sk AS sold_date_sk, ws_ext_sales_price AS sales_price
                FROM web_sales
                UNION ALL
                SELECT cs_sold_date_sk AS sold_date_sk, cs_ext_sales_price AS sales_price
                FROM catalog_sales
              )
            ),
            wswscs AS (
              SELECT d_week_seq,
                     sum(CASE WHEN (d_day_name='Sunday') THEN sales_price ELSE null END) AS sun_sales,
                     sum(CASE WHEN (d_day_name='Monday') THEN sales_price ELSE null END) AS mon_sales,
                     sum(CASE WHEN (d_day_name='Tuesday') THEN sales_price ELSE null END) AS tue_sales,
                     sum(CASE WHEN (d_day_name='Wednesday') THEN sales_price ELSE null END) AS wed_sales,
                     sum(CASE WHEN (d_day_name='Thursday') THEN sales_price ELSE null END) AS thu_sales,
                     sum(CASE WHEN (d_day_name='Friday') THEN sales_price ELSE null END) AS fri_sales,
                     sum(CASE WHEN (d_day_name='Saturday') THEN sales_price ELSE null END) AS sat_sales
              FROM wscs, date_dim
              WHERE d_date_sk = sold_date_sk
              GROUP BY d_week_seq
            )
            SELECT d_week_seq1,
                   round(sun_sales1/sun_sales2, 2),
                   round(mon_sales1/mon_sales2, 2),
                   round(tue_sales1/tue_sales2, 2),
                   round(wed_sales1/wed_sales2, 2),
                   round(thu_sales1/thu_sales2, 2),
                   round(fri_sales1/fri_sales2, 2),
                   round(sat_sales1/sat_sales2, 2)
            FROM (
              SELECT wswscs.d_week_seq AS d_week_seq1,
                     sun_sales AS sun_sales1, mon_sales AS mon_sales1,
                     tue_sales AS tue_sales1, wed_sales AS wed_sales1,
                     thu_sales AS thu_sales1, fri_sales AS fri_sales1,
                     sat_sales AS sat_sales1
              FROM wswscs, date_dim
              WHERE date_dim.d_week_seq = wswscs.d_week_seq
                AND d_year = 2001
            ) y,
            (
              SELECT wswscs.d_week_seq AS d_week_seq2,
                     sun_sales AS sun_sales2, mon_sales AS mon_sales2,
                     tue_sales AS tue_sales2, wed_sales AS wed_sales2,
                     thu_sales AS thu_sales2, fri_sales AS fri_sales2,
                     sat_sales AS sat_sales2
              FROM wswscs, date_dim
              WHERE date_dim.d_week_seq = wswscs.d_week_seq
                AND d_year = 2001 + 1
            ) z
            WHERE d_week_seq1 = d_week_seq2 - 53
            ORDER BY d_week_seq1
        """,
        "Q3": """
            SELECT dt.d_year,
                   item.i_brand_id AS brand_id,
                   item.i_brand AS brand,
                   sum(ss_ext_sales_price) AS sum_agg
            FROM date_dim dt,
                 store_sales,
                 item
            WHERE dt.d_date_sk = store_sales.ss_sold_date_sk
              AND store_sales.ss_item_sk = item.i_item_sk
              AND item.i_manufact_id = 436
              AND dt.d_moy = 12
            GROUP BY dt.d_year,
                     item.i_brand,
                     item.i_brand_id
            ORDER BY dt.d_year,
                     sum_agg DESC,
                     brand_id
            LIMIT 100
        """,
        "Q7": """
            SELECT i_item_id,
                   avg(ss_quantity) AS agg1,
                   avg(ss_list_price) AS agg2,
                   avg(ss_coupon_amt) AS agg3,
                   avg(ss_sales_price) AS agg4
            FROM store_sales,
                 customer_demographics,
                 date_dim,
                 item,
                 promotion
            WHERE ss_sold_date_sk = d_date_sk
              AND ss_item_sk = i_item_sk
              AND ss_cdemo_sk = cd_demo_sk
              AND ss_promo_sk = p_promo_sk
              AND cd_gender = 'M'
              AND cd_marital_status = 'S'
              AND cd_education_status = 'College'
              AND (p_channel_email = 'N' OR p_channel_event = 'N')
              AND d_year = 2000
            GROUP BY i_item_id
            ORDER BY i_item_id
            LIMIT 100
        """,
        "Q19": """
            SELECT i_item_id,
                   i_brand,
                   i_category,
                   i_class,
                   i_manufact,
                   sum(ss_ext_sales_price) AS sales,
                   sum(ss_net_profit) AS profit
            FROM date_dim,
                 store_sales,
                 item,
                 customer,
                 store
            WHERE d_date_sk = ss_sold_date_sk
              AND i_item_sk = ss_item_sk
              AND d_year = 2000
              AND d_moy = 12
              AND c_customer_sk = ss_customer_sk
              AND s_store_sk = ss_store_sk
              AND i_manager_id = 9
            GROUP BY i_item_id,
                     i_brand,
                     i_category,
                     i_class,
                     i_manufact
            ORDER BY i_item_id,
                     i_brand,
                     i_category,
                     i_class,
                     i_manufact
            LIMIT 100
        """
    }

    total_start_time = time.time()

    for query_name, query_sql in queries.items():
        print(f"\nExecuting {query_name}...")
        query_start = time.time()

        # Execute query and force action using show()
        result_df = spark.sql(query_sql)
        result_df.show(5) # Show top 5 rows

        query_end = time.time()
        print(f"{query_name} completed in {query_end - query_start:.2f} seconds.")

    total_end_time = time.time()
    print(f"\nAll benchmark queries completed in {total_end_time - total_start_time:.2f} seconds.")

    spark.stop()

if __name__ == "__main__":
    main()
EOF

Скопируйте скрипт в свой сегмент Cloud Storage, чтобы Serverless for Apache Spark мог получить к нему доступ:

gcloud storage cp benchmark.py gs://${BUCKET_NAME}/scripts/benchmark.py

4. Запустите базовое задание бессерверной архитектуры.

Для проведения базового сравнения без использования Lightning Engine, отправьте загруженное ранее задание PySpark для бенчмаркинга на уровень Serverless for Apache Spark Standard. В качестве аргумента мы передадим путь к скопированному вами набору данных.

Для выполнения пакетного задания выполните следующую команду:

gcloud dataproc batches submit pyspark \
    gs://${BUCKET_NAME}/scripts/benchmark.py \
    --region=${REGION} \
    --version=2.3 \
    --deps-bucket=gs://${BUCKET_NAME} \
    -- ${DATASET_PATH}

Контроль за выполнением задания

Во время выполнения задачи вы будете видеть поток логов PySpark в терминале Cloud Shell. Serverless for Apache Spark выделяет контейнеры, считывает набор данных TPC-DS Parquet из Cloud Storage и выполняет сложные SQL-запросы.

После завершения выполнения скрипта просмотрите вывод в консоли. Вы должны увидеть результаты и время выполнения каждого стандартного запроса, примерно следующее:

...
Executing Q1...
+-------------+
|c_customer_id|
+-------------+
...

Q1 completed in 18.52 seconds.
...

All benchmark queries completed in 110.94 seconds.

Обратите внимание на общее время выполнения в секундах. Это ваше базовое время выполнения .

5. Запуск с использованием Serverless Premium и Lightning Engine.

Далее вы запустите точно такое же задание Spark на Serverless для Apache Spark , но используя уровень Premium и включив собственный векторизованный механизм запросов Google: Lightning Engine .

Отправьте задание на тестирование производительности в Serverless, явно включив Lightning Engine:

gcloud dataproc batches submit pyspark \
    gs://${BUCKET_NAME}/scripts/benchmark.py \
    --region=${REGION} \
    --version=2.3 \
    --deps-bucket=gs://${BUCKET_NAME} \
    --properties="dataproc.tier=premium,spark.dataproc.lightningEngine.runtime=native" \
    -- ${DATASET_PATH}

Сравните результаты

Дождитесь завершения задания и изучите результат. Вы должны увидеть те же результаты запроса. Внимательно посмотрите на время выполнения:

...
All benchmark queries completed in 64.24 seconds.

Сравнивая базовый вариант Serverless с вариантом Serverless Lightning Engine, вы заметите, что Lightning Engine выполняет группировку, агрегацию и объединение данных быстрее, используя собственный слой выполнения на C++ и векторизованную обработку на бэкэнде, без каких-либо изменений в коде вашего приложения PySpark.

Lightning Engine оптимизирован для повышения производительности при больших объемах рабочей нагрузки. В этом примере мы используем небольшой набор данных, поэтому прирост производительности не так значителен, как мог бы быть. На наборе данных объемом 10 ТБ в бенчмарках было показано улучшение производительности до 4,3 раз по сравнению с открытым исходным кодом Spark.

6. Сравните графы выполнения в пользовательском интерфейсе Spark.

Сокращение времени выполнения впечатляет, но давайте заглянем под капот и посмотрим, что на самом деле делает Spark во время выполнения запроса. Это можно сделать, изучив графики выполнения Spark UI для обеих задач.

  1. Откройте консоль Google Cloud в своем браузере.
  2. Перейдите в Dataproc > Пакеты .
  3. В списке вы увидите две группы: стандартную базовую версию и версию для уровня Premium.
  4. Щелкните по пакету данных уровня Premium, который вы запускали, затем щелкните «Просмотреть пользовательский интерфейс Spark» , а затем «Просмотреть подробности» .
  5. В пользовательском интерфейсе Spark перейдите на вкладку «Задания» .
  6. В разделе «Выполненные задания» в поле поиска введите Velox .
  7. Вы часто будете встречать описания заданий, в которых упоминается VeloxSparkPlanExecApi . Это относится к собственному механизму выполнения Velox, используемому Lightning Engine.

Теперь повторите этот процесс для стандартного уровня:

  1. Вернитесь на страницу «Бессерверные вычисления для Apache Spark пакетной обработки».
  2. Щелкните ссылку для пакета стандартного уровня , затем щелкните «Просмотреть пользовательский интерфейс Spark» , а затем «Просмотреть подробности» .
  3. В пользовательском интерфейсе Spark перейдите на вкладку «Задания» .
  4. В разделе «Выполненные задания» в поле поиска введите Velox .
  5. В описаниях вакансий вы не найдете упоминания API Velox.

7. Уборка

Чтобы избежать дальнейших списаний средств с вашего аккаунта Google Cloud, удалите ресурсы, созданные в ходе этого практического занятия.

В Cloud Shell удалите сегмент Cloud Storage и его содержимое:

gcloud storage rm -r gs://${BUCKET_NAME}

Удалите локальную копию файла benchmark.py :

rm benchmark.py

8. Поздравляем!

Поздравляем! Вы успешно создали среду для сравнительного анализа производительности Apache Spark и сравнили Serverless для Apache Spark Standard с Serverless для Apache Spark Premium.

Вы воочию убедились, как включение бессерверной архитектуры для нового механизма Lightning Engine в Apache Spark может сократить время выполнения вашей рабочей нагрузки Spark, и изучили пользовательский интерфейс Spark, чтобы увидеть, как физический граф выполнения преобразуется в нативный код C++ с помощью механизма Native Query Engine.

Что вы узнали

  • Как написать скрипт для сравнительного анализа производительности набора данных PySpark.
  • Как отправлять задания Spark в Serverless для Apache Spark.
  • Как включить Lightning Engine.
  • Как сравнивать планы заданий в пользовательском интерфейсе Spark.

Следующие шаги