1. परिचय
इस कोडलैब में, आपको Google Cloud Serverless for Apache Spark के नेटिव एक्ज़ीक्यूशन इंजन, Lightning Engine की परफ़ॉर्मेंस के फ़ायदों के बारे में पता चलेगा. साथ ही, यह भी पता चलेगा कि यह Serverless for Apache Spark पर आपके Spark वर्कलोड को कैसे ऑप्टिमाइज़ करता है.
Lightning Engine, Velox और Apache Gluten का इस्तेमाल करता है. Velox, डेटा प्रोसेसिंग के लिए एक हाई-परफ़ॉर्मेंस C++ इंजन है. Apache Gluten एक मिडल लेयर है. यह JVM पर आधारित Spark जॉब को C++ कोड में बदलती है. इस कोड को Velox की मदद से एक्ज़ीक्यूट किया जा सकता है.
इस डेमो में, इंडस्ट्री के स्टैंडर्ड बेंचमार्क TPC-DS का इस्तेमाल किया गया है. इसे फ़ैसले लेने में मदद करने वाले सिस्टम की परफ़ॉर्मेंस का आकलन करने के लिए डिज़ाइन किया गया है. आपको स्टैंडर्ड सर्वरलेस टियर का इस्तेमाल करके, टीपीसी-डीएस के सैंपल डेटासेट को क्वेरी करने के लिए, एक बेसलाइन PySpark जॉब सबमिट करनी होगी. इसके बाद, Lightning Engine की सुविधा चालू करके, Premium टियर का इस्तेमाल करके वही जॉब चलाएं. आखिर में, आपको एक्ज़ीक्यूशन टाइम की तुलना करनी होगी. साथ ही, हार्डवेयर की मदद से तेज़ी से प्रोसेस किए गए Spark एक्ज़ीक्यूशन ग्राफ़ के अंतर को देखने के लिए, Spark UI का इस्तेमाल करना होगा.
इस कोडलैब को चलाने का अनुमानित खर्च 1.00 अमेरिकी डॉलर से कम है. हालांकि, यह तब होगा, जब हटाएं सेक्शन में बताए गए तरीके से संसाधनों को तुरंत हटा दिया जाए.
आपको क्या करना होगा
- अपने बेंचमार्क स्क्रिप्ट और नतीजों को सेव करने के लिए, Cloud Storage बकेट बनाएं
- Serverless for Apache Spark के स्टैंडर्ड टियर का इस्तेमाल करके, PySpark की मदद से डेटा प्रोसेस करने का बेसिक जॉब पूरा करना
- Lightning Engine के साथ Serverless for Apache Spark Premium tier का इस्तेमाल करके, एक ही जॉब को एक्ज़ीक्यूट करना
- रनटाइम मेट्रिक की तुलना करना
- नेटिव फ़िज़िकल एक्ज़ीक्यूशन ग्राफ़ की तुलना करने के लिए, Spark History Server का यूज़र इंटरफ़ेस (यूआई) लॉन्च करें
आपको किन चीज़ों की ज़रूरत होगी
- कोई वेब ब्राउज़र, जैसे कि Chrome
- बिलिंग की सुविधा वाला Google Cloud प्रोजेक्ट
- Apache Spark और Linux कमांड लाइन के बारे में बुनियादी जानकारी
2. शुरू करने से पहले
Google Cloud प्रोजेक्ट बनाना
- Google Cloud Console में, प्रोजेक्ट चुनने वाले पेज पर, Google Cloud प्रोजेक्ट चुनें या बनाएं.
- पक्का करें कि आपके Cloud प्रोजेक्ट के लिए बिलिंग चालू हो. किसी प्रोजेक्ट के लिए बिलिंग चालू है या नहीं, यह देखने का तरीका जानें.
Cloud Shell शुरू करना
Cloud Shell, Google Cloud में चलने वाला एक कमांड-लाइन एनवायरमेंट है. इसमें ज़रूरी टूल पहले से लोड होते हैं.
- Google Cloud कंसोल में सबसे ऊपर मौजूद, Cloud Shell चालू करें पर क्लिक करें.
- Cloud Shell से कनेक्ट होने के बाद, अपने क्रेडेंशियल की पुष्टि करें:
gcloud auth list - पुष्टि करें कि आपका प्रोजेक्ट कॉन्फ़िगर किया गया है:
gcloud config get project - अगर आपका प्रोजेक्ट उम्मीद के मुताबिक सेट नहीं है, तो इसे सेट करें:
export PROJECT_ID=<YOUR_PROJECT_ID> gcloud config set project $PROJECT_ID
एपीआई चालू करें
इस कोडलैब के लिए ज़रूरी सभी एपीआई चालू करने के लिए, यह निर्देश चलाएं:
gcloud services enable \
dataproc.googleapis.com \
storage.googleapis.com \
compute.googleapis.com
3. अपना एनवायरमेंट तैयार करना
इस चरण में, एनवायरमेंट वैरिएबल को शुरू किया जाएगा और Cloud Storage बकेट बनाया जाएगा. इस बकेट में, PySpark स्क्रिप्ट सेव की जाएगी. इसे Serverless for Apache Spark के दोनों टियर में सबमिट किया जाता है.
एनवायरमेंट वैरिएबल सेट करना
डिफ़ॉल्ट एनवायरमेंट वैरिएबल सेट करने के लिए, Cloud Shell में यहां दिए गए कमांड चलाएं. हम us-central1 क्षेत्र का इस्तेमाल करेंगे. हालांकि, आपके पास इसे बदलने का विकल्प है.
export PROJECT_ID=$(gcloud config get-value project)
export REGION="us-central1"
export BUCKET_NAME="spark-benchmark-${PROJECT_ID}-${REGION}"
gcloud config set dataproc/region ${REGION}
Cloud Storage बकेट बनाना
अपनी स्क्रिप्ट और लॉग सेव करने के लिए बकेट बनाएं:
gcloud storage buckets create gs://${BUCKET_NAME} \
--uniform-bucket-level-access \
--location=${REGION}
TPC-DS डेटासेट को अपने बकेट में कॉपी करना
इस चरण में, आपको टीपीसी-डीएस डेटासेट को किसी सार्वजनिक बकेट से अपने Cloud Storage बकेट में कॉपी करना होगा. इससे यह पक्का होता है कि आपके PySpark जॉब, आपके प्रोजेक्ट से डेटा को स्थानीय तौर पर पढ़ सकें.
डेटासेट का साइज़ और टाइप चुनने के लिए, एनवायरमेंट वैरिएबल सेट करें:
export DATASET_TYPE="partitioned" # Options: partitioned, nonpartitioned
export DATASET_SIZE="1GB" # Options: 1GB, 10GB, 100GB, 1000GB (1000GB not available for partitioned)
export SRC_PATH="gs://beam-tpcds/datasets/parquet/${DATASET_TYPE}/${DATASET_SIZE}"
export DATASET_PATH="gs://${BUCKET_NAME}/tpc-ds-dataset/${DATASET_TYPE}/${DATASET_SIZE}"
TPC-DS डेटा को अपने बकेट में कॉपी करें:
gcloud storage cp -r ${SRC_PATH}/* ${DATASET_PATH}/
PySpark की परफ़ॉर्मेंस की जांच करने वाली स्क्रिप्ट बनाना
हम PySpark स्क्रिप्ट का इस्तेमाल करेंगे. यह स्क्रिप्ट, आपके Cloud Storage बकेट से स्टैंडर्ड टीपीसी-डीएस टेबल रजिस्टर करती है. साथ ही, Apache Spark की सार्वजनिक रिपॉज़िटरी से ली गई पांच स्टैंडर्ड क्वेरी को एक्ज़ीक्यूट करती है. यह स्क्रिप्ट, आपके डेटासेट के पाथ को आर्ग्युमेंट के तौर पर स्वीकार करती है.
Cloud Shell में benchmark.py नाम की फ़ाइल बनाएं. फ़ाइल जनरेट करने के लिए, इस निर्देश को कॉपी करके चिपकाया जा सकता है:
cat << 'EOF' > benchmark.py
import argparse
import sys
from pyspark.sql import SparkSession
import time
def main():
parser = argparse.ArgumentParser(description='TPC-DS Benchmark')
parser.add_argument('data_path', help='GCS base path for TPC-DS tables')
args = parser.parse_args()
base_path = args.data_path
# Initialize Spark Session
spark = SparkSession.builder \
.appName("TPC-DS Benchmark") \
.getOrCreate()
print(f"Spark Session created. Registering TPC-DS tables from {base_path}...")
# List of all 24 TPC-DS tables
tables = [
"call_center", "catalog_page", "catalog_returns", "catalog_sales",
"customer", "customer_address", "customer_demographics", "date_dim",
"household_demographics", "income_band", "inventory", "item",
"promotion", "reason", "ship_mode", "store", "store_returns",
"store_sales", "time_dim", "warehouse", "web_page", "web_returns",
"web_sales", "web_site"
]
# Register each table as a temporary view
# For this subset of queries, not every table is used
for table in tables:
path = f"{base_path}/{table}"
try:
df = spark.read.parquet(path)
df.createOrReplaceTempView(table)
except Exception as e:
print(f"Warning: Could not load table {table} from {path}. Error: {e}")
print("Tables registered successfully. Starting benchmark queries from Apache Spark test suite...")
# Standard TPC-DS Queries sourced from Apache Spark public repository:
# https://github.com/apache/spark/tree/master/sql/core/src/test/resources/tpcds
queries = {
"Q1": """
WITH customer_total_return AS (
SELECT sr_customer_sk AS ctr_customer_sk,
sr_store_sk AS ctr_store_sk,
sum(sr_return_amt) AS ctr_total_return
FROM store_returns, date_dim
WHERE sr_returned_date_sk = d_date_sk
AND d_year = 2000
GROUP BY sr_customer_sk, sr_store_sk
)
SELECT c_customer_id
FROM customer_total_return ctr1, store, customer
WHERE ctr1.ctr_total_return > (
SELECT avg(ctr_total_return) * 1.2
FROM customer_total_return ctr2
WHERE ctr1.ctr_store_sk = ctr2.ctr_store_sk
)
AND s_store_sk = ctr1.ctr_store_sk
AND s_state = 'TN'
AND ctr1.ctr_customer_sk = c_customer_sk
ORDER BY c_customer_id
LIMIT 100
""",
"Q2": """
WITH wscs AS (
SELECT sold_date_sk, sales_price
FROM (
SELECT ws_sold_date_sk AS sold_date_sk, ws_ext_sales_price AS sales_price
FROM web_sales
UNION ALL
SELECT cs_sold_date_sk AS sold_date_sk, cs_ext_sales_price AS sales_price
FROM catalog_sales
)
),
wswscs AS (
SELECT d_week_seq,
sum(CASE WHEN (d_day_name='Sunday') THEN sales_price ELSE null END) AS sun_sales,
sum(CASE WHEN (d_day_name='Monday') THEN sales_price ELSE null END) AS mon_sales,
sum(CASE WHEN (d_day_name='Tuesday') THEN sales_price ELSE null END) AS tue_sales,
sum(CASE WHEN (d_day_name='Wednesday') THEN sales_price ELSE null END) AS wed_sales,
sum(CASE WHEN (d_day_name='Thursday') THEN sales_price ELSE null END) AS thu_sales,
sum(CASE WHEN (d_day_name='Friday') THEN sales_price ELSE null END) AS fri_sales,
sum(CASE WHEN (d_day_name='Saturday') THEN sales_price ELSE null END) AS sat_sales
FROM wscs, date_dim
WHERE d_date_sk = sold_date_sk
GROUP BY d_week_seq
)
SELECT d_week_seq1,
round(sun_sales1/sun_sales2, 2),
round(mon_sales1/mon_sales2, 2),
round(tue_sales1/tue_sales2, 2),
round(wed_sales1/wed_sales2, 2),
round(thu_sales1/thu_sales2, 2),
round(fri_sales1/fri_sales2, 2),
round(sat_sales1/sat_sales2, 2)
FROM (
SELECT wswscs.d_week_seq AS d_week_seq1,
sun_sales AS sun_sales1, mon_sales AS mon_sales1,
tue_sales AS tue_sales1, wed_sales AS wed_sales1,
thu_sales AS thu_sales1, fri_sales AS fri_sales1,
sat_sales AS sat_sales1
FROM wswscs, date_dim
WHERE date_dim.d_week_seq = wswscs.d_week_seq
AND d_year = 2001
) y,
(
SELECT wswscs.d_week_seq AS d_week_seq2,
sun_sales AS sun_sales2, mon_sales AS mon_sales2,
tue_sales AS tue_sales2, wed_sales AS wed_sales2,
thu_sales AS thu_sales2, fri_sales AS fri_sales2,
sat_sales AS sat_sales2
FROM wswscs, date_dim
WHERE date_dim.d_week_seq = wswscs.d_week_seq
AND d_year = 2001 + 1
) z
WHERE d_week_seq1 = d_week_seq2 - 53
ORDER BY d_week_seq1
""",
"Q3": """
SELECT dt.d_year,
item.i_brand_id AS brand_id,
item.i_brand AS brand,
sum(ss_ext_sales_price) AS sum_agg
FROM date_dim dt,
store_sales,
item
WHERE dt.d_date_sk = store_sales.ss_sold_date_sk
AND store_sales.ss_item_sk = item.i_item_sk
AND item.i_manufact_id = 436
AND dt.d_moy = 12
GROUP BY dt.d_year,
item.i_brand,
item.i_brand_id
ORDER BY dt.d_year,
sum_agg DESC,
brand_id
LIMIT 100
""",
"Q7": """
SELECT i_item_id,
avg(ss_quantity) AS agg1,
avg(ss_list_price) AS agg2,
avg(ss_coupon_amt) AS agg3,
avg(ss_sales_price) AS agg4
FROM store_sales,
customer_demographics,
date_dim,
item,
promotion
WHERE ss_sold_date_sk = d_date_sk
AND ss_item_sk = i_item_sk
AND ss_cdemo_sk = cd_demo_sk
AND ss_promo_sk = p_promo_sk
AND cd_gender = 'M'
AND cd_marital_status = 'S'
AND cd_education_status = 'College'
AND (p_channel_email = 'N' OR p_channel_event = 'N')
AND d_year = 2000
GROUP BY i_item_id
ORDER BY i_item_id
LIMIT 100
""",
"Q19": """
SELECT i_item_id,
i_brand,
i_category,
i_class,
i_manufact,
sum(ss_ext_sales_price) AS sales,
sum(ss_net_profit) AS profit
FROM date_dim,
store_sales,
item,
customer,
store
WHERE d_date_sk = ss_sold_date_sk
AND i_item_sk = ss_item_sk
AND d_year = 2000
AND d_moy = 12
AND c_customer_sk = ss_customer_sk
AND s_store_sk = ss_store_sk
AND i_manager_id = 9
GROUP BY i_item_id,
i_brand,
i_category,
i_class,
i_manufact
ORDER BY i_item_id,
i_brand,
i_category,
i_class,
i_manufact
LIMIT 100
"""
}
total_start_time = time.time()
for query_name, query_sql in queries.items():
print(f"\nExecuting {query_name}...")
query_start = time.time()
# Execute query and force action using show()
result_df = spark.sql(query_sql)
result_df.show(5) # Show top 5 rows
query_end = time.time()
print(f"{query_name} completed in {query_end - query_start:.2f} seconds.")
total_end_time = time.time()
print(f"\nAll benchmark queries completed in {total_end_time - total_start_time:.2f} seconds.")
spark.stop()
if __name__ == "__main__":
main()
EOF
स्क्रिप्ट को Cloud Storage बकेट में कॉपी करें, ताकि Serverless for Apache Spark इसे ऐक्सेस कर सके:
gcloud storage cp benchmark.py gs://${BUCKET_NAME}/scripts/benchmark.py
4. बिना सर्वर वाला बेसलाइन जॉब चलाना
Lightning Engine के बिना तुलना करने के लिए, Serverless for Apache Spark के स्टैंडर्ड टियर में, पहले अपलोड किया गया PySpark बेंचमार्किंग जॉब सबमिट करें. हम कॉपी किए गए डेटासेट के पाथ को आर्ग्युमेंट के तौर पर पास करेंगे.
बैच जॉब को लागू करने के लिए, यह कमांड चलाएं:
gcloud dataproc batches submit pyspark \
gs://${BUCKET_NAME}/scripts/benchmark.py \
--region=${REGION} \
--version=2.3 \
--deps-bucket=gs://${BUCKET_NAME} \
-- ${DATASET_PATH}
जॉब को मॉनिटर करना
जॉब के चालू रहने के दौरान, आपको Cloud Shell टर्मिनल में PySpark लॉग स्ट्रीम होते हुए दिखेंगे. Serverless for Apache Spark, कंटेनर असाइन कर रहा है. साथ ही, Cloud Storage से TPC-DS Parquet डेटासेट पढ़ रहा है और जटिल SQL प्लान लागू कर रहा है.
स्क्रिप्ट पूरी होने के बाद, कंसोल आउटपुट देखें. आपको हर स्टैंडर्ड क्वेरी के लिए, नतीजे और समय दिखना चाहिए. जैसे:
... Executing Q1... +-------------+ |c_customer_id| +-------------+ ... Q1 completed in 18.52 seconds. ... All benchmark queries completed in 110.94 seconds.
ध्यान दें कि इसे पूरा होने में कुल कितने सेकंड लगे. यह आपका बेसलाइन रनटाइम है.
5. Serverless Premium और Lightning Engine का इस्तेमाल करना
इसके बाद, आपको Serverless for Apache Spark पर ठीक वही Spark जॉब चलाना होगा. हालांकि, इसमें प्रीमियम टियर का इस्तेमाल करना होगा. साथ ही, Google के नेटिव, वेक्टर वाले क्वेरी इंजन: Lightning Engine को चालू करना होगा.
बेंचमार्क जॉब को सर्वरलेस पर सबमिट करें. इसके लिए, Lightning Engine को साफ़ तौर पर चालू करें:
gcloud dataproc batches submit pyspark \
gs://${BUCKET_NAME}/scripts/benchmark.py \
--region=${REGION} \
--version=2.3 \
--deps-bucket=gs://${BUCKET_NAME} \
--properties="dataproc.tier=premium,spark.dataproc.lightningEngine.runtime=native" \
-- ${DATASET_PATH}
नतीजों की तुलना करना
जॉब पूरा होने का इंतज़ार करें और आउटपुट की जांच करें. आपको क्वेरी के नतीजे एक जैसे दिखेंगे. समीक्षा पूरी होने के समय पर ध्यान दें:
... All benchmark queries completed in 64.24 seconds.
बिना सर्वर वाली कंप्यूटिंग की सुविधा के साथ चलने वाले ऐप्लिकेशन की तुलना, बिना सर्वर वाली कंप्यूटिंग की सुविधा के साथ चलने वाले Lightning Engine से करने पर, आपको पता चलेगा कि Lightning Engine, ग्रुपिंग, एग्रीगेशन, और जॉइन को तेज़ी से पूरा करता है. इसके लिए, वह नेटिव C++ एक्ज़ीक्यूशन लेयर और बैकएंड पर वेक्टर प्रोसेस करने की सुविधा का इस्तेमाल करता है. इसके लिए, आपको अपने PySpark ऐप्लिकेशन कोड में कोई बदलाव करने की ज़रूरत नहीं होती.
Lightning Engine को इस तरह से ऑप्टिमाइज़ किया गया है कि वर्कलोड बढ़ने पर, इसकी परफ़ॉर्मेंस बेहतर हो जाती है. इस उदाहरण में, हम छोटे डेटासेट का इस्तेमाल कर रहे हैं. इसलिए, परफ़ॉर्मेंस में उतनी बढ़ोतरी नहीं हुई है जितनी हो सकती थी. बेंचमार्क में, 10 टीबी के डेटासेट पर, ओपन सोर्स स्पार्क की तुलना में 4.3 गुना बेहतर परफ़ॉर्मेंस दिखाई गई है.
6. Spark UI में एक्ज़ीक्यूशन ग्राफ़ की तुलना करना
रनटाइम में कमी काफ़ी अच्छी है. हालांकि, आइए अंदरूनी तौर पर देखते हैं कि क्वेरी एक्ज़ीक्यूट करने के दौरान Spark असल में क्या कर रहा है. इसके लिए, दोनों जॉब के लिए Spark UI के एक्ज़ीक्यूशन ग्राफ़ की जांच करें.
- अपने ब्राउज़र में Google Cloud Console खोलें.
- Dataproc > बैचेज़ पर जाएं.
- आपको सूची में दो बैच दिखेंगे: स्टैंडर्ड बेसलाइन रन और Premium टियर रन.
- आपने जिस प्रीमियम टियर बैच को चलाया था उस पर क्लिक करें. इसके बाद, Spark UI देखें और फिर जानकारी देखें पर क्लिक करें.
- Spark यूज़र इंटरफ़ेस (यूआई) में, Jobs टैब पर जाएं.
- पूरे हो चुके जॉब में जाकर, खोज बॉक्स में
Veloxटाइप करें. - आपको नौकरी की कई ऐसी जानकारी दिखेगी जिनमें
VeloxSparkPlanExecApiशामिल है. इसका मतलब है कि Lightning Engine, Velox के नेटिव एक्ज़ीक्यूशन इंजन का इस्तेमाल कर रहा है.
अब, स्टैंडर्ड टियर के रन के लिए यह प्रोसेस दोहराएं:
- Serverless for Apache Spark Batches पेज पर वापस जाएं.
- स्टैंडर्ड टियर बैच के लिंक पर क्लिक करें. इसके बाद, Spark UI देखें पर क्लिक करें. इसके बाद, जानकारी देखें पर क्लिक करें.
- Spark यूज़र इंटरफ़ेस (यूआई) में, Jobs टैब पर जाएं.
- पूरे हो चुके जॉब में जाकर, खोज बॉक्स में
Veloxटाइप करें. - आपको नौकरी के ब्यौरे में Velox API के बारे में कोई जानकारी नहीं दिखेगी.
7. व्यवस्थित करें
अपने Google Cloud खाते से लगातार शुल्क लिए जाने से बचने के लिए, इस कोडलैब के दौरान बनाई गई संसाधन मिटाएं.
Cloud Shell में, Cloud Storage बकेट और उसका कॉन्टेंट मिटाएं:
gcloud storage rm -r gs://${BUCKET_NAME}
benchmark.py की लोकल कॉपी मिटाने के लिए:
rm benchmark.py
8. बधाई हो
बधाई हो! आपने Apache Spark के लिए, बेंचमार्किंग एनवायरमेंट को सफलतापूर्वक बनाया है. साथ ही, Serverless for Apache Spark Standard की तुलना Serverless for Apache Spark Premium से की है.
आपने देखा कि Apache Spark के नए Lightning Engine के लिए Serverless की सुविधा चालू करने से, Spark के वर्कलोड का रनटाइम कैसे कम किया जा सकता है. साथ ही, आपने Spark UI को एक्सप्लोर करके देखा कि Native Query Engine का इस्तेमाल करके, फ़िज़िकल एक्ज़ीक्यूशन ग्राफ़ को नेटिव C++ कोड में कैसे बदला जाता है.
आपको क्या सीखने को मिला
- PySpark डेटासेट की परफ़ॉर्मेंस की तुलना करने वाली स्क्रिप्ट लिखने का तरीका.
- Serverless for Apache Spark में Spark जॉब कैसे सबमिट करें.
- Lightning Engine को चालू करने का तरीका.
- Spark UI में जॉब प्लान की तुलना कैसे करें.