1. 소개
이 Codelab에서는 Apache Spark용 Google Cloud 서버리스의 기본 실행 엔진인 Lightning Engine의 성능 이점을 살펴보고 Apache Spark용 서버리스에서 Spark 워크로드를 최적화하는 방법을 알아봅니다.
Lightning Engine은 Velox 및 Apache Gluten을 사용합니다. Velox는 데이터 처리를 위한 고성능 C++ 엔진입니다. Apache Gluten은 JVM 기반 Spark 작업을 Velox에서 실행할 수 있는 C++ 코드로 변환하는 역할을 하는 중간 계층입니다.
이 데모에서는 의사 결정 지원 시스템의 성능을 평가하기 위해 설계된 업계 표준 벤치마크인 TPC-DS를 사용합니다. 표준 서버리스 등급을 사용하여 샘플 TPC-DS 데이터 세트를 쿼리하는 기본 PySpark 작업을 제출합니다. 그런 다음 Lightning Engine이 사용 설정된 프리미엄 등급을 사용하여 동일한 작업을 실행합니다. 마지막으로 실행 시간을 비교하고 Spark UI를 살펴보고 하드웨어 가속 Spark 실행 그래프의 차이를 시각화합니다.
이 Codelab을 실행하는 데 드는 예상 비용은 삭제 섹션에 설명된 대로 리소스가 즉시 삭제된다고 가정할 때 미화 1달러 미만 입니다.
실습할 내용
- 벤치마크 스크립트와 결과를 저장할 Cloud Storage 버킷을 만듭니다.
- Apache Spark용 서버리스 표준 등급 을 사용하여 기본 PySpark 데이터 처리 작업을 실행합니다.
- Lightning Engine이 포함된 Apache Spark용 서버리스 프리미엄 등급 을 사용하여 동일한 작업을 실행합니다.
- 런타임 측정항목을 비교합니다.
- Spark 기록 서버 UI를 실행하여 기본 물리적 실행 그래프를 비교합니다.
필요한 항목
- 웹브라우저(예: Chrome)
- 결제가 사용 설정된 Google Cloud 프로젝트
- Apache Spark 및 Linux 명령줄에 대한 기본 지식
2. 시작하기 전에
Google Cloud 프로젝트 만들기
- Google Cloud 콘솔의 프로젝트 선택기 페이지에서 Google Cloud 프로젝트를 선택하거나 만듭니다.
- Cloud 프로젝트에 결제가 사용 설정되어 있는지 확인합니다. 프로젝트에 결제가 사용 설정되어 있는지 확인하는 방법을 알아보세요.
Cloud Shell 시작
Cloud Shell 은 Google Cloud에서 실행되는 명령줄 환경으로 필요한 도구가 사전 로드되어 있습니다.
- Google Cloud 콘솔 상단에서 Cloud Shell 활성화 를 클릭합니다.
- Cloud Shell에 연결되면 인증을 확인합니다.
gcloud auth list - 프로젝트가 구성되어 있는지 확인합니다.
gcloud config get project - 프로젝트가 예상대로 설정되지 않은 경우 설정합니다.
export PROJECT_ID=<YOUR_PROJECT_ID> gcloud config set project $PROJECT_ID
API 사용 설정
이 Codelab에 필요한 모든 API를 사용 설정하려면 이 명령어를 실행합니다.
gcloud services enable \
dataproc.googleapis.com \
storage.googleapis.com \
compute.googleapis.com
3. 환경 준비
이 단계에서는 환경 변수를 초기화하고 Cloud Storage 버킷을 만듭니다. 이 버킷에는 Apache Spark용 서버리스 등급에 제출하는 PySpark 스크립트가 포함됩니다.
환경 변수 설정
Cloud Shell에서 다음 명령어를 실행하여 기본 환경 변수를 설정합니다. us-central1 리전을 사용하지만 원하는 경우 변경할 수 있습니다.
export PROJECT_ID=$(gcloud config get-value project)
export REGION="us-central1"
export BUCKET_NAME="spark-benchmark-${PROJECT_ID}-${REGION}"
gcloud config set dataproc/region ${REGION}
Cloud Storage 버킷 만들기
스크립트와 로그를 저장할 버킷을 만듭니다.
gcloud storage buckets create gs://${BUCKET_NAME} \
--uniform-bucket-level-access \
--location=${REGION}
TPC-DS 데이터 세트를 자체 버킷에 복사
이 단계에서는 TPC-DS 데이터 세트를 공개 버킷에서 자체 Cloud Storage 버킷으로 복사합니다. 이렇게 하면 PySpark 작업이 프로젝트에서 로컬로 데이터를 읽을 수 있습니다.
환경 변수를 설정하여 데이터 세트 크기와 유형을 선택합니다.
export DATASET_TYPE="partitioned" # Options: partitioned, nonpartitioned
export DATASET_SIZE="1GB" # Options: 1GB, 10GB, 100GB, 1000GB (1000GB not available for partitioned)
export SRC_PATH="gs://beam-tpcds/datasets/parquet/${DATASET_TYPE}/${DATASET_SIZE}"
export DATASET_PATH="gs://${BUCKET_NAME}/tpc-ds-dataset/${DATASET_TYPE}/${DATASET_SIZE}"
TPC-DS 데이터를 자체 버킷에 복사합니다.
gcloud storage cp -r ${SRC_PATH}/* ${DATASET_PATH}/
PySpark 벤치마크 스크립트 만들기
Cloud Storage 버킷에서 표준 TPC-DS 테이블을 등록하고 Apache Spark 공개 저장소에서 가져온 5개의 표준 쿼리를 실행하는 PySpark 스크립트를 사용합니다. 스크립트는 데이터 세트의 경로를 인수로 허용합니다.
Cloud Shell에서 benchmark.py라는 파일을 만듭니다. 다음 명령어를 복사하여 붙여넣어 파일을 생성할 수 있습니다.
cat << 'EOF' > benchmark.py
import argparse
import sys
from pyspark.sql import SparkSession
import time
def main():
parser = argparse.ArgumentParser(description='TPC-DS Benchmark')
parser.add_argument('data_path', help='GCS base path for TPC-DS tables')
args = parser.parse_args()
base_path = args.data_path
# Initialize Spark Session
spark = SparkSession.builder \
.appName("TPC-DS Benchmark") \
.getOrCreate()
print(f"Spark Session created. Registering TPC-DS tables from {base_path}...")
# List of all 24 TPC-DS tables
tables = [
"call_center", "catalog_page", "catalog_returns", "catalog_sales",
"customer", "customer_address", "customer_demographics", "date_dim",
"household_demographics", "income_band", "inventory", "item",
"promotion", "reason", "ship_mode", "store", "store_returns",
"store_sales", "time_dim", "warehouse", "web_page", "web_returns",
"web_sales", "web_site"
]
# Register each table as a temporary view
# For this subset of queries, not every table is used
for table in tables:
path = f"{base_path}/{table}"
try:
df = spark.read.parquet(path)
df.createOrReplaceTempView(table)
except Exception as e:
print(f"Warning: Could not load table {table} from {path}. Error: {e}")
print("Tables registered successfully. Starting benchmark queries from Apache Spark test suite...")
# Standard TPC-DS Queries sourced from Apache Spark public repository:
# https://github.com/apache/spark/tree/master/sql/core/src/test/resources/tpcds
queries = {
"Q1": """
WITH customer_total_return AS (
SELECT sr_customer_sk AS ctr_customer_sk,
sr_store_sk AS ctr_store_sk,
sum(sr_return_amt) AS ctr_total_return
FROM store_returns, date_dim
WHERE sr_returned_date_sk = d_date_sk
AND d_year = 2000
GROUP BY sr_customer_sk, sr_store_sk
)
SELECT c_customer_id
FROM customer_total_return ctr1, store, customer
WHERE ctr1.ctr_total_return > (
SELECT avg(ctr_total_return) * 1.2
FROM customer_total_return ctr2
WHERE ctr1.ctr_store_sk = ctr2.ctr_store_sk
)
AND s_store_sk = ctr1.ctr_store_sk
AND s_state = 'TN'
AND ctr1.ctr_customer_sk = c_customer_sk
ORDER BY c_customer_id
LIMIT 100
""",
"Q2": """
WITH wscs AS (
SELECT sold_date_sk, sales_price
FROM (
SELECT ws_sold_date_sk AS sold_date_sk, ws_ext_sales_price AS sales_price
FROM web_sales
UNION ALL
SELECT cs_sold_date_sk AS sold_date_sk, cs_ext_sales_price AS sales_price
FROM catalog_sales
)
),
wswscs AS (
SELECT d_week_seq,
sum(CASE WHEN (d_day_name='Sunday') THEN sales_price ELSE null END) AS sun_sales,
sum(CASE WHEN (d_day_name='Monday') THEN sales_price ELSE null END) AS mon_sales,
sum(CASE WHEN (d_day_name='Tuesday') THEN sales_price ELSE null END) AS tue_sales,
sum(CASE WHEN (d_day_name='Wednesday') THEN sales_price ELSE null END) AS wed_sales,
sum(CASE WHEN (d_day_name='Thursday') THEN sales_price ELSE null END) AS thu_sales,
sum(CASE WHEN (d_day_name='Friday') THEN sales_price ELSE null END) AS fri_sales,
sum(CASE WHEN (d_day_name='Saturday') THEN sales_price ELSE null END) AS sat_sales
FROM wscs, date_dim
WHERE d_date_sk = sold_date_sk
GROUP BY d_week_seq
)
SELECT d_week_seq1,
round(sun_sales1/sun_sales2, 2),
round(mon_sales1/mon_sales2, 2),
round(tue_sales1/tue_sales2, 2),
round(wed_sales1/wed_sales2, 2),
round(thu_sales1/thu_sales2, 2),
round(fri_sales1/fri_sales2, 2),
round(sat_sales1/sat_sales2, 2)
FROM (
SELECT wswscs.d_week_seq AS d_week_seq1,
sun_sales AS sun_sales1, mon_sales AS mon_sales1,
tue_sales AS tue_sales1, wed_sales AS wed_sales1,
thu_sales AS thu_sales1, fri_sales AS fri_sales1,
sat_sales AS sat_sales1
FROM wswscs, date_dim
WHERE date_dim.d_week_seq = wswscs.d_week_seq
AND d_year = 2001
) y,
(
SELECT wswscs.d_week_seq AS d_week_seq2,
sun_sales AS sun_sales2, mon_sales AS mon_sales2,
tue_sales AS tue_sales2, wed_sales AS wed_sales2,
thu_sales AS thu_sales2, fri_sales AS fri_sales2,
sat_sales AS sat_sales2
FROM wswscs, date_dim
WHERE date_dim.d_week_seq = wswscs.d_week_seq
AND d_year = 2001 + 1
) z
WHERE d_week_seq1 = d_week_seq2 - 53
ORDER BY d_week_seq1
""",
"Q3": """
SELECT dt.d_year,
item.i_brand_id AS brand_id,
item.i_brand AS brand,
sum(ss_ext_sales_price) AS sum_agg
FROM date_dim dt,
store_sales,
item
WHERE dt.d_date_sk = store_sales.ss_sold_date_sk
AND store_sales.ss_item_sk = item.i_item_sk
AND item.i_manufact_id = 436
AND dt.d_moy = 12
GROUP BY dt.d_year,
item.i_brand,
item.i_brand_id
ORDER BY dt.d_year,
sum_agg DESC,
brand_id
LIMIT 100
""",
"Q7": """
SELECT i_item_id,
avg(ss_quantity) AS agg1,
avg(ss_list_price) AS agg2,
avg(ss_coupon_amt) AS agg3,
avg(ss_sales_price) AS agg4
FROM store_sales,
customer_demographics,
date_dim,
item,
promotion
WHERE ss_sold_date_sk = d_date_sk
AND ss_item_sk = i_item_sk
AND ss_cdemo_sk = cd_demo_sk
AND ss_promo_sk = p_promo_sk
AND cd_gender = 'M'
AND cd_marital_status = 'S'
AND cd_education_status = 'College'
AND (p_channel_email = 'N' OR p_channel_event = 'N')
AND d_year = 2000
GROUP BY i_item_id
ORDER BY i_item_id
LIMIT 100
""",
"Q19": """
SELECT i_item_id,
i_brand,
i_category,
i_class,
i_manufact,
sum(ss_ext_sales_price) AS sales,
sum(ss_net_profit) AS profit
FROM date_dim,
store_sales,
item,
customer,
store
WHERE d_date_sk = ss_sold_date_sk
AND i_item_sk = ss_item_sk
AND d_year = 2000
AND d_moy = 12
AND c_customer_sk = ss_customer_sk
AND s_store_sk = ss_store_sk
AND i_manager_id = 9
GROUP BY i_item_id,
i_brand,
i_category,
i_class,
i_manufact
ORDER BY i_item_id,
i_brand,
i_category,
i_class,
i_manufact
LIMIT 100
"""
}
total_start_time = time.time()
for query_name, query_sql in queries.items():
print(f"\nExecuting {query_name}...")
query_start = time.time()
# Execute query and force action using show()
result_df = spark.sql(query_sql)
result_df.show(5) # Show top 5 rows
query_end = time.time()
print(f"{query_name} completed in {query_end - query_start:.2f} seconds.")
total_end_time = time.time()
print(f"\nAll benchmark queries completed in {total_end_time - total_start_time:.2f} seconds.")
spark.stop()
if __name__ == "__main__":
main()
EOF
Apache Spark용 서버리스가 액세스할 수 있도록 스크립트를 Cloud Storage 버킷에 복사합니다.
gcloud storage cp benchmark.py gs://${BUCKET_NAME}/scripts/benchmark.py
4. 기본 서버리스 작업 실행
Lightning Engine 없이 기본 비교를 제공하려면 이전에 업로드한 PySpark 벤치마킹 작업을 Apache Spark용 서버리스 표준 등급에 제출합니다. 복사한 데이터 세트의 경로를 인수로 전달합니다.
다음 명령어를 실행하여 일괄 작업을 실행합니다.
gcloud dataproc batches submit pyspark \
gs://${BUCKET_NAME}/scripts/benchmark.py \
--region=${REGION} \
--version=2.3 \
--deps-bucket=gs://${BUCKET_NAME} \
-- ${DATASET_PATH}
작업 모니터링
작업이 실행되는 동안 Cloud Shell 터미널에 PySpark 로그가 스트리밍됩니다. Apache Spark용 서버리스는 컨테이너를 할당하고 Cloud Storage에서 TPC-DS Parquet 데이터 세트를 읽고 복잡한 SQL 계획을 실행합니다.
스크립트가 완료되면 콘솔 출력을 확인합니다. 실행된 각 표준 쿼리에 대한 결과와 타이밍이 다음과 같이 표시됩니다.
... Executing Q1... +-------------+ |c_customer_id| +-------------+ ... Q1 completed in 18.52 seconds. ... All benchmark queries completed in 110.94 seconds.
완료하는 데 걸린 총 시간을 기록해 둡니다. 이것이 기본 런타임 입니다.
5. 서버리스 프리미엄 및 Lightning Engine으로 실행
다음으로 Apache Spark용 서버리스에서 동일한 Spark 작업을 실행하지만 프리미엄 등급을 사용하고 Google의 기본 벡터화된 쿼리 엔진인 Lightning Engine을 사용 설정합니다.
Lightning Engine이 명시적으로 사용 설정된 서버리스에 벤치마크 작업을 제출합니다.
gcloud dataproc batches submit pyspark \
gs://${BUCKET_NAME}/scripts/benchmark.py \
--region=${REGION} \
--version=2.3 \
--deps-bucket=gs://${BUCKET_NAME} \
--properties="dataproc.tier=premium,spark.dataproc.lightningEngine.runtime=native" \
-- ${DATASET_PATH}
결과 비교
작업이 완료될 때까지 기다렸다가 출력을 검토합니다. 동일한 쿼리 결과가 표시됩니다. 완료 시간을 자세히 살펴보세요.
... All benchmark queries completed in 64.24 seconds.
기본 서버리스 실행과 서버리스 Lightning Engine 실행을 비교하면 Lightning Engine이 PySpark 애플리케이션 코드를 변경하지 않고도 기본 C++ 실행 레이어와 백엔드의 벡터화된 처리를 활용하여 그룹화, 집계, 조인을 더 빠르게 실행하는 것을 확인할 수 있습니다.
Lightning Engine은 워크로드가 클수록 성능이 향상되도록 최적화되어 있습니다. 이 예에서는 작은 데이터 세트를 사용하므로 성능 향상이 극적이지 않습니다. 10TB 데이터 세트에서 벤치마크에서 오픈소스 Spark보다 최대 4.3배의 성능 향상이 나타났습니다.
6. Spark UI에서 실행 그래프 비교
런타임 감소는 인상적이지만 쿼리 실행 중에 Spark가 실제로 수행하는 작업을 내부적으로 살펴보겠습니다. 두 작업의 Spark UI 실행 그래프를 검토하여 이 작업을 수행할 수 있습니다.
- 브라우저에서 Google Cloud 콘솔을 엽니다.
- Dataproc > Batches 로 이동합니다.
- 목록에 두 개의 일괄 작업이 표시됩니다. 표준 기본 실행과 프리미엄 등급 실행입니다.
- 실행한 프리미엄 등급 일괄 작업을 클릭한 다음 Spark UI 보기 를 클릭하고 세부정보 보기 를 클릭합니다.
- Spark UI에서 작업 탭으로 이동합니다.
- 완료된 작업의 검색창에
Velox를 입력합니다. VeloxSparkPlanExecApi가 포함된 작업 설명이 많이 표시됩니다. 이는 Lightning Engine에서 사용되는 Velox 기본 실행 엔진을 나타냅니다.
이제 표준 등급 실행에 대해 이 프로세스를 반복합니다.
- Apache Spark용 서버리스 일괄 작업 페이지로 돌아갑니다.
- 표준 등급 일괄 작업의 링크를 클릭한 다음 Spark UI 보기 를 클릭하고 세부정보 보기 를 클릭합니다.
- Spark UI에서 작업 탭으로 이동합니다.
- 완료된 작업의 검색창에
Velox를 입력합니다. - 작업 설명에 Velox API에 대한 언급이 없습니다.
7. 삭제
Google Cloud 계정에 지속적으로 비용이 청구되지 않도록 하려면 이 Codelab 중에 만든 리소스를 삭제합니다.
Cloud Shell에서 Cloud Storage 버킷 및 콘텐츠를 삭제합니다.
gcloud storage rm -r gs://${BUCKET_NAME}
benchmark.py의 로컬 사본을 삭제합니다.
rm benchmark.py
8. 축하합니다
축하합니다. Apache Spark용 벤치마킹 환경을 성공적으로 구축하고 Apache Spark용 서버리스 표준과 Apache Spark용 서버리스 프리미엄을 비교했습니다.
Apache Spark용 서버리스의 새로운 Lightning Engine을 사용 설정하면 Spark 워크로드의 런타임을 줄일 수 있는 방법을 직접 확인하고 Spark UI를 살펴보고 기본 쿼리 엔진을 사용하여 물리적 실행 그래프가 기본 C++ 코드로 변환되는 방법을 확인했습니다.
학습한 내용
- PySpark 데이터 세트 벤치마킹 스크립트를 작성하는 방법
- Apache Spark용 서버리스에 Spark 작업을 제출하는 방법
- Lightning Engine을 사용 설정하는 방법
- Spark UI에서 작업 계획을 비교하는 방법
다음 단계
- Apache Spark용 서버리스 문서 살펴보기
- 기본 쿼리 실행 자격 도구 보기
- GitHub에서 전체 TPC-DS 벤치마킹 쿼리 확인