1. Pengantar
Dalam codelab ini, Anda akan mempelajari manfaat performa mesin eksekusi native Google Cloud Serverless untuk Apache Spark, Lightning Engine, dan memeriksa cara Lightning Engine mengoptimalkan workload Spark Anda di Serverless untuk Apache Spark.
Lightning Engine menggunakan Velox dan Apache Gluten. Velox adalah mesin C++ berperforma tinggi untuk pemrosesan data. Apache Gluten adalah lapisan tengah yang bertanggung jawab untuk mengonversi tugas Spark berbasis JVM menjadi kode C++ yang dapat dieksekusi oleh Velox.
Demo ini menggunakan TPC-DS, tolok ukur standar industri yang dirancang untuk mengevaluasi performa sistem pendukung keputusan. Anda akan mengirimkan tugas PySpark dasar untuk membuat kueri set data TPC-DS sampel menggunakan tingkat Serverless Standar. Kemudian, Anda akan menjalankan tugas yang sama persis menggunakan tingkat Premium dengan Lightning Engine diaktifkan. Terakhir, Anda akan membandingkan waktu eksekusi dan mempelajari Spark UI untuk memvisualisasikan perbedaan dalam grafik eksekusi Spark yang dipercepat hardware.
Perkiraan biaya untuk menjalankan codelab ini kurang dari $1,00 USD, dengan asumsi resource dibersihkan dengan cepat seperti yang dijelaskan di bagian Pembersihan.
Yang akan Anda lakukan
- Membuat bucket Cloud Storage untuk menyimpan skrip dan hasil tolok ukur
- Menjalankan tugas pemrosesan data PySpark dasar menggunakan tingkat Serverless untuk Apache Spark Standar
- Menjalankan tugas yang sama menggunakan tingkat Serverless untuk Apache Spark Premium dengan Lightning Engine
- Membandingkan metrik runtime
- Meluncurkan Spark History Server UI untuk membandingkan grafik eksekusi fisik native
Yang akan Anda butuhkan
- Browser web seperti Chrome
- Project Google Cloud yang mengaktifkan penagihan
- Pengetahuan dasar tentang Apache Spark dan command line Linux
2. Sebelum memulai
Membuat Project Google Cloud
- Di Konsol Google Cloud, di halaman pemilih project, pilih atau buat project Google Cloud.
- Pastikan penagihan diaktifkan untuk project Cloud Anda. Pelajari cara memeriksa apakah penagihan diaktifkan di project.
Mulai Cloud Shell
Cloud Shell adalah lingkungan command line yang berjalan di Google Cloud dan sudah dilengkapi dengan alat yang diperlukan.
- Klik Activate Cloud Shell di bagian atas konsol Google Cloud.
- Setelah terhubung ke Cloud Shell, verifikasi autentikasi Anda:
gcloud auth list - Konfirmasi bahwa project Anda telah dikonfigurasi:
gcloud config get project - Jika project Anda tidak ditetapkan seperti yang diharapkan, tetapkan project:
export PROJECT_ID=<YOUR_PROJECT_ID> gcloud config set project $PROJECT_ID
Mengaktifkan API
Jalankan perintah ini untuk mengaktifkan semua API yang diperlukan untuk codelab ini:
gcloud services enable \
dataproc.googleapis.com \
storage.googleapis.com \
compute.googleapis.com
3. Menyiapkan Lingkungan
Pada langkah ini, Anda akan menginisialisasi variabel lingkungan dan membuat bucket Cloud Storage. Bucket ini akan menyimpan skrip PySpark yang Anda kirimkan ke kedua tingkat Serverless untuk Apache Spark.
Menetapkan Variabel Lingkungan
Jalankan perintah berikut di Cloud Shell untuk menetapkan variabel lingkungan default. Kita akan menggunakan region us-central1, tetapi Anda dapat mengubahnya jika mau.
export PROJECT_ID=$(gcloud config get-value project)
export REGION="us-central1"
export BUCKET_NAME="spark-benchmark-${PROJECT_ID}-${REGION}"
gcloud config set dataproc/region ${REGION}
Membuat Bucket Cloud Storage
Buat bucket untuk menyimpan skrip dan log Anda:
gcloud storage buckets create gs://${BUCKET_NAME} \
--uniform-bucket-level-access \
--location=${REGION}
Menyalin Set Data TPC-DS ke Bucket Anda Sendiri
Pada langkah ini, Anda akan menyalin set data TPC-DS dari bucket publik ke bucket Cloud Storage Anda sendiri. Hal ini memastikan tugas PySpark Anda dapat membaca data secara lokal dari project Anda.
Tetapkan variabel lingkungan untuk memilih ukuran dan jenis set data:
export DATASET_TYPE="partitioned" # Options: partitioned, nonpartitioned
export DATASET_SIZE="1GB" # Options: 1GB, 10GB, 100GB, 1000GB (1000GB not available for partitioned)
export SRC_PATH="gs://beam-tpcds/datasets/parquet/${DATASET_TYPE}/${DATASET_SIZE}"
export DATASET_PATH="gs://${BUCKET_NAME}/tpc-ds-dataset/${DATASET_TYPE}/${DATASET_SIZE}"
Salin data TPC-DS ke bucket Anda sendiri:
gcloud storage cp -r ${SRC_PATH}/* ${DATASET_PATH}/
Membuat Skrip Tolok Ukur PySpark
Kita akan menggunakan skrip PySpark yang mendaftarkan tabel TPC-DS standar dari bucket Cloud Storage Anda dan menjalankan 5 kueri standar yang bersumber dari repositori publik Apache Spark. Skrip ini menerima jalur ke set data Anda sebagai argumen.
Buat file bernama benchmark.py di Cloud Shell. Anda dapat menyalin dan menempelkan perintah berikut untuk membuat file:
cat << 'EOF' > benchmark.py
import argparse
import sys
from pyspark.sql import SparkSession
import time
def main():
parser = argparse.ArgumentParser(description='TPC-DS Benchmark')
parser.add_argument('data_path', help='GCS base path for TPC-DS tables')
args = parser.parse_args()
base_path = args.data_path
# Initialize Spark Session
spark = SparkSession.builder \
.appName("TPC-DS Benchmark") \
.getOrCreate()
print(f"Spark Session created. Registering TPC-DS tables from {base_path}...")
# List of all 24 TPC-DS tables
tables = [
"call_center", "catalog_page", "catalog_returns", "catalog_sales",
"customer", "customer_address", "customer_demographics", "date_dim",
"household_demographics", "income_band", "inventory", "item",
"promotion", "reason", "ship_mode", "store", "store_returns",
"store_sales", "time_dim", "warehouse", "web_page", "web_returns",
"web_sales", "web_site"
]
# Register each table as a temporary view
# For this subset of queries, not every table is used
for table in tables:
path = f"{base_path}/{table}"
try:
df = spark.read.parquet(path)
df.createOrReplaceTempView(table)
except Exception as e:
print(f"Warning: Could not load table {table} from {path}. Error: {e}")
print("Tables registered successfully. Starting benchmark queries from Apache Spark test suite...")
# Standard TPC-DS Queries sourced from Apache Spark public repository:
# https://github.com/apache/spark/tree/master/sql/core/src/test/resources/tpcds
queries = {
"Q1": """
WITH customer_total_return AS (
SELECT sr_customer_sk AS ctr_customer_sk,
sr_store_sk AS ctr_store_sk,
sum(sr_return_amt) AS ctr_total_return
FROM store_returns, date_dim
WHERE sr_returned_date_sk = d_date_sk
AND d_year = 2000
GROUP BY sr_customer_sk, sr_store_sk
)
SELECT c_customer_id
FROM customer_total_return ctr1, store, customer
WHERE ctr1.ctr_total_return > (
SELECT avg(ctr_total_return) * 1.2
FROM customer_total_return ctr2
WHERE ctr1.ctr_store_sk = ctr2.ctr_store_sk
)
AND s_store_sk = ctr1.ctr_store_sk
AND s_state = 'TN'
AND ctr1.ctr_customer_sk = c_customer_sk
ORDER BY c_customer_id
LIMIT 100
""",
"Q2": """
WITH wscs AS (
SELECT sold_date_sk, sales_price
FROM (
SELECT ws_sold_date_sk AS sold_date_sk, ws_ext_sales_price AS sales_price
FROM web_sales
UNION ALL
SELECT cs_sold_date_sk AS sold_date_sk, cs_ext_sales_price AS sales_price
FROM catalog_sales
)
),
wswscs AS (
SELECT d_week_seq,
sum(CASE WHEN (d_day_name='Sunday') THEN sales_price ELSE null END) AS sun_sales,
sum(CASE WHEN (d_day_name='Monday') THEN sales_price ELSE null END) AS mon_sales,
sum(CASE WHEN (d_day_name='Tuesday') THEN sales_price ELSE null END) AS tue_sales,
sum(CASE WHEN (d_day_name='Wednesday') THEN sales_price ELSE null END) AS wed_sales,
sum(CASE WHEN (d_day_name='Thursday') THEN sales_price ELSE null END) AS thu_sales,
sum(CASE WHEN (d_day_name='Friday') THEN sales_price ELSE null END) AS fri_sales,
sum(CASE WHEN (d_day_name='Saturday') THEN sales_price ELSE null END) AS sat_sales
FROM wscs, date_dim
WHERE d_date_sk = sold_date_sk
GROUP BY d_week_seq
)
SELECT d_week_seq1,
round(sun_sales1/sun_sales2, 2),
round(mon_sales1/mon_sales2, 2),
round(tue_sales1/tue_sales2, 2),
round(wed_sales1/wed_sales2, 2),
round(thu_sales1/thu_sales2, 2),
round(fri_sales1/fri_sales2, 2),
round(sat_sales1/sat_sales2, 2)
FROM (
SELECT wswscs.d_week_seq AS d_week_seq1,
sun_sales AS sun_sales1, mon_sales AS mon_sales1,
tue_sales AS tue_sales1, wed_sales AS wed_sales1,
thu_sales AS thu_sales1, fri_sales AS fri_sales1,
sat_sales AS sat_sales1
FROM wswscs, date_dim
WHERE date_dim.d_week_seq = wswscs.d_week_seq
AND d_year = 2001
) y,
(
SELECT wswscs.d_week_seq AS d_week_seq2,
sun_sales AS sun_sales2, mon_sales AS mon_sales2,
tue_sales AS tue_sales2, wed_sales AS wed_sales2,
thu_sales AS thu_sales2, fri_sales AS fri_sales2,
sat_sales AS sat_sales2
FROM wswscs, date_dim
WHERE date_dim.d_week_seq = wswscs.d_week_seq
AND d_year = 2001 + 1
) z
WHERE d_week_seq1 = d_week_seq2 - 53
ORDER BY d_week_seq1
""",
"Q3": """
SELECT dt.d_year,
item.i_brand_id AS brand_id,
item.i_brand AS brand,
sum(ss_ext_sales_price) AS sum_agg
FROM date_dim dt,
store_sales,
item
WHERE dt.d_date_sk = store_sales.ss_sold_date_sk
AND store_sales.ss_item_sk = item.i_item_sk
AND item.i_manufact_id = 436
AND dt.d_moy = 12
GROUP BY dt.d_year,
item.i_brand,
item.i_brand_id
ORDER BY dt.d_year,
sum_agg DESC,
brand_id
LIMIT 100
""",
"Q7": """
SELECT i_item_id,
avg(ss_quantity) AS agg1,
avg(ss_list_price) AS agg2,
avg(ss_coupon_amt) AS agg3,
avg(ss_sales_price) AS agg4
FROM store_sales,
customer_demographics,
date_dim,
item,
promotion
WHERE ss_sold_date_sk = d_date_sk
AND ss_item_sk = i_item_sk
AND ss_cdemo_sk = cd_demo_sk
AND ss_promo_sk = p_promo_sk
AND cd_gender = 'M'
AND cd_marital_status = 'S'
AND cd_education_status = 'College'
AND (p_channel_email = 'N' OR p_channel_event = 'N')
AND d_year = 2000
GROUP BY i_item_id
ORDER BY i_item_id
LIMIT 100
""",
"Q19": """
SELECT i_item_id,
i_brand,
i_category,
i_class,
i_manufact,
sum(ss_ext_sales_price) AS sales,
sum(ss_net_profit) AS profit
FROM date_dim,
store_sales,
item,
customer,
store
WHERE d_date_sk = ss_sold_date_sk
AND i_item_sk = ss_item_sk
AND d_year = 2000
AND d_moy = 12
AND c_customer_sk = ss_customer_sk
AND s_store_sk = ss_store_sk
AND i_manager_id = 9
GROUP BY i_item_id,
i_brand,
i_category,
i_class,
i_manufact
ORDER BY i_item_id,
i_brand,
i_category,
i_class,
i_manufact
LIMIT 100
"""
}
total_start_time = time.time()
for query_name, query_sql in queries.items():
print(f"\nExecuting {query_name}...")
query_start = time.time()
# Execute query and force action using show()
result_df = spark.sql(query_sql)
result_df.show(5) # Show top 5 rows
query_end = time.time()
print(f"{query_name} completed in {query_end - query_start:.2f} seconds.")
total_end_time = time.time()
print(f"\nAll benchmark queries completed in {total_end_time - total_start_time:.2f} seconds.")
spark.stop()
if __name__ == "__main__":
main()
EOF
Salin skrip ke bucket Cloud Storage Anda sehingga Serverless untuk Apache Spark dapat mengaksesnya:
gcloud storage cp benchmark.py gs://${BUCKET_NAME}/scripts/benchmark.py
4. Menjalankan Tugas Serverless Dasar
Untuk memberikan perbandingan dasar tanpa Lightning Engine, kirimkan tugas tolok ukur PySpark yang Anda upload sebelumnya ke tingkat Serverless untuk Apache Spark Standar. Kita akan meneruskan jalur ke set data yang Anda salin sebagai argumen.
Jalankan perintah berikut untuk menjalankan tugas batch:
gcloud dataproc batches submit pyspark \
gs://${BUCKET_NAME}/scripts/benchmark.py \
--region=${REGION} \
--version=2.3 \
--deps-bucket=gs://${BUCKET_NAME} \
-- ${DATASET_PATH}
Memantau Tugas
Saat tugas sedang dijalankan, Anda akan melihat log PySpark yang di-streaming di terminal Cloud Shell. Serverless untuk Apache Spark mengalokasikan container, membaca set data TPC-DS Parquet dari Cloud Storage, dan menjalankan rencana SQL yang kompleks.
Setelah skrip selesai, amati output konsol. Anda akan melihat hasil dan waktu untuk setiap kueri standar yang dijalankan, mirip dengan:
... Executing Q1... +-------------+ |c_customer_id| +-------------+ ... Q1 completed in 18.52 seconds. ... All benchmark queries completed in 110.94 seconds.
Perhatikan total detik yang diperlukan untuk menyelesaikan tugas. Ini adalah runtime dasar Anda.
5. Menjalankan dengan Serverless Premium dan Lightning Engine
Selanjutnya, Anda akan menjalankan tugas Spark yang sama persis di Serverless untuk Apache Spark, tetapi menggunakan tingkat Premium dan mengaktifkan mesin kueri vektor native Google: Lightning Engine.
Kirimkan tugas tolok ukur ke Serverless dengan Lightning Engine yang diaktifkan secara eksplisit:
gcloud dataproc batches submit pyspark \
gs://${BUCKET_NAME}/scripts/benchmark.py \
--region=${REGION} \
--version=2.3 \
--deps-bucket=gs://${BUCKET_NAME} \
--properties="dataproc.tier=premium,spark.dataproc.lightningEngine.runtime=native" \
-- ${DATASET_PATH}
Membandingkan Hasil
Tunggu hingga tugas selesai dan periksa output. Anda akan melihat hasil kueri yang sama. Perhatikan baik-baik waktu penyelesaian:
... All benchmark queries completed in 64.24 seconds.
Dengan membandingkan eksekusi Serverless dasar dengan eksekusi Serverless Lightning Engine, Anda akan melihat bahwa Lightning Engine menjalankan pengelompokan, agregasi, dan gabungan lebih cepat dengan memanfaatkan lapisan eksekusi C++ native dan pemrosesan vektor di backend, tanpa memerlukan perubahan apa pun pada kode aplikasi PySpark Anda.
Lightning Engine dioptimalkan untuk meningkatkan performa seiring dengan bertambahnya workload. Dalam contoh ini, kita menggunakan set data kecil, sehingga peningkatan performanya tidak sedramatis yang seharusnya. Pada set data 10 TB, peningkatan performa hingga 4,3x lipat dibandingkan Spark open source telah ditunjukkan dalam tolok ukur.
6. Membandingkan Grafik Eksekusi di Spark UI
Pengurangan runtime sangat mengesankan, tetapi mari kita lihat di balik layar apa yang sebenarnya dilakukan Spark selama eksekusi kueri. Anda dapat melakukannya dengan memeriksa grafik eksekusi Spark UI untuk kedua tugas.
- Buka konsol Google Cloud di browser Anda.
- Buka Dataproc > Batches.
- Anda akan melihat dua batch dalam daftar: eksekusi dasar standar dan eksekusi tingkat Premium.
- Klik batch tingkat Premium yang Anda jalankan, lalu klik View Spark UI , lalu View Details.
- Di Spark UI, buka tab Jobs.
- Di bagian Completed Jobs, di kotak penelusuran, ketik
Velox. - Anda akan melihat banyak deskripsi tugas yang menyertakan
VeloxSparkPlanExecApi. Hal ini mengacu pada mesin eksekusi native Velox yang digunakan oleh Lightning Engine.
Sekarang, ulangi proses ini untuk eksekusi tingkat Standar:
- Kembali ke halaman Serverless untuk Apache Spark Batches.
- Klik link untuk batch tingkat Standar , lalu klik View Spark UI , lalu View Details.
- Di Spark UI, buka tab Jobs.
- Di bagian Completed Jobs, di kotak penelusuran, ketik
Velox. - Anda tidak akan melihat penyebutan Velox API dalam deskripsi tugas.
7. Pembersihan
Untuk menghindari biaya berkelanjutan ke akun Google Cloud Anda, hapus resource yang dibuat selama codelab ini.
Di Cloud Shell, hapus bucket Cloud Storage dan kontennya:
gcloud storage rm -r gs://${BUCKET_NAME}
Hapus salinan lokal benchmark.py:
rm benchmark.py
8. Selamat
Selamat! Anda telah berhasil membuat lingkungan tolok ukur untuk Apache Spark dan membandingkan Serverless untuk Apache Spark Standar dengan Serverless untuk Apache Spark Premium.
Anda melihat langsung bagaimana mengaktifkan Lightning Engine baru Serverless untuk Apache Spark dapat mengurangi runtime workload Spark Anda, dan Anda mempelajari Spark UI untuk melihat bagaimana grafik eksekusi fisik diubah menjadi kode C++ native menggunakan Native Query Engine.
Yang telah Anda pelajari
- Cara menulis skrip tolok ukur set data PySpark.
- Cara mengirimkan tugas Spark ke Serverless untuk Apache Spark.
- Cara mengaktifkan Lightning Engine.
- Cara membandingkan rencana tugas di Spark UI.
Langkah berikutnya
- Mempelajari Dokumentasi Serverless untuk Apache Spark
- Melihat alat kualifikasi Native Query Execution
- Melihat kueri tolok ukur TPC-DS lengkap di GitHub