1. Introducción
En este codelab, explorarás los beneficios de rendimiento del motor de ejecución nativo de Google Cloud Serverless for Apache Spark, Lightning Engine, y examinarás cómo optimiza tus cargas de trabajo de Spark en Serverless for Apache Spark.
Lightning Engine usa Velox y Apache Gluten. Velox es un motor de C++ de alto rendimiento para el procesamiento de datos. Apache Gluten es una capa intermedia responsable de convertir los trabajos de Spark basados en JVM en código C++ que Velox puede ejecutar.
En esta demostración, se usa TPC-DS, una comparativa estándar de la industria diseñada para evaluar el rendimiento de los sistemas de asistencia para la toma de decisiones. Enviarás un trabajo de PySpark de referencia para consultar un conjunto de datos de muestra de TPC-DS con el nivel Estándar de Serverless. Luego, ejecutarás el mismo trabajo con el nivel Premium y Lightning Engine habilitado. Por último, compararás el tiempo de ejecución y explorarás la IU de Spark para visualizar la diferencia en los gráficos de ejecución de Spark acelerados por hardware.
El costo estimado para ejecutar este codelab es inferior a USD 1.00, suponiendo que los recursos se limpien de inmediato, como se describe en la sección Limpieza.
Actividades
- Crea un bucket de Cloud Storage para almacenar tus secuencias de comandos y resultados de comparativas.
- Ejecuta un trabajo de procesamiento de datos de PySpark de referencia con el nivel estándar de Serverless for Apache Spark
- Ejecuta el mismo trabajo con el nivel Premium de Serverless for Apache Spark con Lightning Engine
- Compara las métricas de tiempo de ejecución
- Inicia la IU del servidor de historial de Spark para comparar los gráficos de ejecución física nativos
Requisitos
- Un navegador web, como Chrome
- Un proyecto de Google Cloud con la facturación habilitada
- Conocimientos básicos de Apache Spark y la línea de comandos de Linux
2. Antes de comenzar
Crea un proyecto de Google Cloud
- En la página del selector de proyectos de la consola de Google Cloud, selecciona o crea un proyecto de Google Cloud.
- Asegúrate de que la facturación esté habilitada para tu proyecto de Cloud. Obtén información para verificar si la facturación está habilitada en un proyecto.
Inicie Cloud Shell
Cloud Shell es un entorno de línea de comandos que se ejecuta en Google Cloud y que viene precargado con las herramientas necesarias.
- Haz clic en Activar Cloud Shell en la parte superior de la consola de Google Cloud.
- Una vez que te conectes a Cloud Shell, verifica tu autenticación:
gcloud auth list - Confirma que tu proyecto esté configurado:
gcloud config get project - Si tu proyecto no está configurado como se esperaba, configúralo:
export PROJECT_ID=<YOUR_PROJECT_ID> gcloud config set project $PROJECT_ID
Habilita las APIs
Ejecuta este comando para habilitar todas las APIs requeridas para este codelab:
gcloud services enable \
dataproc.googleapis.com \
storage.googleapis.com \
compute.googleapis.com
3. Prepara tu entorno
En este paso, inicializarás variables de entorno y crearás un bucket de Cloud Storage. En este bucket, se almacenará la secuencia de comandos de PySpark que envíes a ambos niveles de Serverless for Apache Spark.
Configura variables de entorno
Ejecuta los siguientes comandos en Cloud Shell para establecer variables de entorno predeterminadas. Usaremos la región us-central1, pero puedes cambiarla si lo prefieres.
export PROJECT_ID=$(gcloud config get-value project)
export REGION="us-central1"
export BUCKET_NAME="spark-benchmark-${PROJECT_ID}-${REGION}"
gcloud config set dataproc/region ${REGION}
Crea un bucket de Cloud Storage
Crea el bucket para almacenar tus secuencias de comandos y registros:
gcloud storage buckets create gs://${BUCKET_NAME} \
--uniform-bucket-level-access \
--location=${REGION}
Copia el conjunto de datos de TPC-DS en tu propio bucket
En este paso, copiarás el conjunto de datos de TPC-DS de un bucket público a tu propio bucket de Cloud Storage. Esto garantiza que tus trabajos de PySpark puedan leer datos de forma local desde tu proyecto.
Configura las variables de entorno para elegir el tamaño y el tipo del conjunto de datos:
export DATASET_TYPE="partitioned" # Options: partitioned, nonpartitioned
export DATASET_SIZE="1GB" # Options: 1GB, 10GB, 100GB, 1000GB (1000GB not available for partitioned)
export SRC_PATH="gs://beam-tpcds/datasets/parquet/${DATASET_TYPE}/${DATASET_SIZE}"
export DATASET_PATH="gs://${BUCKET_NAME}/tpc-ds-dataset/${DATASET_TYPE}/${DATASET_SIZE}"
Copia los datos de TPC-DS en tu propio bucket:
gcloud storage cp -r ${SRC_PATH}/* ${DATASET_PATH}/
Crea la secuencia de comandos de comparativa de PySpark
Usaremos una secuencia de comandos de PySpark que registra las tablas estándar de TPC-DS desde tu bucket de Cloud Storage y ejecuta 5 consultas estándar provenientes del repositorio público de Apache Spark. La secuencia de comandos acepta la ruta de acceso a tu conjunto de datos como argumento.
Crea un archivo llamado benchmark.py en Cloud Shell. Puedes copiar y pegar el siguiente comando para generar el archivo:
cat << 'EOF' > benchmark.py
import argparse
import sys
from pyspark.sql import SparkSession
import time
def main():
parser = argparse.ArgumentParser(description='TPC-DS Benchmark')
parser.add_argument('data_path', help='GCS base path for TPC-DS tables')
args = parser.parse_args()
base_path = args.data_path
# Initialize Spark Session
spark = SparkSession.builder \
.appName("TPC-DS Benchmark") \
.getOrCreate()
print(f"Spark Session created. Registering TPC-DS tables from {base_path}...")
# List of all 24 TPC-DS tables
tables = [
"call_center", "catalog_page", "catalog_returns", "catalog_sales",
"customer", "customer_address", "customer_demographics", "date_dim",
"household_demographics", "income_band", "inventory", "item",
"promotion", "reason", "ship_mode", "store", "store_returns",
"store_sales", "time_dim", "warehouse", "web_page", "web_returns",
"web_sales", "web_site"
]
# Register each table as a temporary view
# For this subset of queries, not every table is used
for table in tables:
path = f"{base_path}/{table}"
try:
df = spark.read.parquet(path)
df.createOrReplaceTempView(table)
except Exception as e:
print(f"Warning: Could not load table {table} from {path}. Error: {e}")
print("Tables registered successfully. Starting benchmark queries from Apache Spark test suite...")
# Standard TPC-DS Queries sourced from Apache Spark public repository:
# https://github.com/apache/spark/tree/master/sql/core/src/test/resources/tpcds
queries = {
"Q1": """
WITH customer_total_return AS (
SELECT sr_customer_sk AS ctr_customer_sk,
sr_store_sk AS ctr_store_sk,
sum(sr_return_amt) AS ctr_total_return
FROM store_returns, date_dim
WHERE sr_returned_date_sk = d_date_sk
AND d_year = 2000
GROUP BY sr_customer_sk, sr_store_sk
)
SELECT c_customer_id
FROM customer_total_return ctr1, store, customer
WHERE ctr1.ctr_total_return > (
SELECT avg(ctr_total_return) * 1.2
FROM customer_total_return ctr2
WHERE ctr1.ctr_store_sk = ctr2.ctr_store_sk
)
AND s_store_sk = ctr1.ctr_store_sk
AND s_state = 'TN'
AND ctr1.ctr_customer_sk = c_customer_sk
ORDER BY c_customer_id
LIMIT 100
""",
"Q2": """
WITH wscs AS (
SELECT sold_date_sk, sales_price
FROM (
SELECT ws_sold_date_sk AS sold_date_sk, ws_ext_sales_price AS sales_price
FROM web_sales
UNION ALL
SELECT cs_sold_date_sk AS sold_date_sk, cs_ext_sales_price AS sales_price
FROM catalog_sales
)
),
wswscs AS (
SELECT d_week_seq,
sum(CASE WHEN (d_day_name='Sunday') THEN sales_price ELSE null END) AS sun_sales,
sum(CASE WHEN (d_day_name='Monday') THEN sales_price ELSE null END) AS mon_sales,
sum(CASE WHEN (d_day_name='Tuesday') THEN sales_price ELSE null END) AS tue_sales,
sum(CASE WHEN (d_day_name='Wednesday') THEN sales_price ELSE null END) AS wed_sales,
sum(CASE WHEN (d_day_name='Thursday') THEN sales_price ELSE null END) AS thu_sales,
sum(CASE WHEN (d_day_name='Friday') THEN sales_price ELSE null END) AS fri_sales,
sum(CASE WHEN (d_day_name='Saturday') THEN sales_price ELSE null END) AS sat_sales
FROM wscs, date_dim
WHERE d_date_sk = sold_date_sk
GROUP BY d_week_seq
)
SELECT d_week_seq1,
round(sun_sales1/sun_sales2, 2),
round(mon_sales1/mon_sales2, 2),
round(tue_sales1/tue_sales2, 2),
round(wed_sales1/wed_sales2, 2),
round(thu_sales1/thu_sales2, 2),
round(fri_sales1/fri_sales2, 2),
round(sat_sales1/sat_sales2, 2)
FROM (
SELECT wswscs.d_week_seq AS d_week_seq1,
sun_sales AS sun_sales1, mon_sales AS mon_sales1,
tue_sales AS tue_sales1, wed_sales AS wed_sales1,
thu_sales AS thu_sales1, fri_sales AS fri_sales1,
sat_sales AS sat_sales1
FROM wswscs, date_dim
WHERE date_dim.d_week_seq = wswscs.d_week_seq
AND d_year = 2001
) y,
(
SELECT wswscs.d_week_seq AS d_week_seq2,
sun_sales AS sun_sales2, mon_sales AS mon_sales2,
tue_sales AS tue_sales2, wed_sales AS wed_sales2,
thu_sales AS thu_sales2, fri_sales AS fri_sales2,
sat_sales AS sat_sales2
FROM wswscs, date_dim
WHERE date_dim.d_week_seq = wswscs.d_week_seq
AND d_year = 2001 + 1
) z
WHERE d_week_seq1 = d_week_seq2 - 53
ORDER BY d_week_seq1
""",
"Q3": """
SELECT dt.d_year,
item.i_brand_id AS brand_id,
item.i_brand AS brand,
sum(ss_ext_sales_price) AS sum_agg
FROM date_dim dt,
store_sales,
item
WHERE dt.d_date_sk = store_sales.ss_sold_date_sk
AND store_sales.ss_item_sk = item.i_item_sk
AND item.i_manufact_id = 436
AND dt.d_moy = 12
GROUP BY dt.d_year,
item.i_brand,
item.i_brand_id
ORDER BY dt.d_year,
sum_agg DESC,
brand_id
LIMIT 100
""",
"Q7": """
SELECT i_item_id,
avg(ss_quantity) AS agg1,
avg(ss_list_price) AS agg2,
avg(ss_coupon_amt) AS agg3,
avg(ss_sales_price) AS agg4
FROM store_sales,
customer_demographics,
date_dim,
item,
promotion
WHERE ss_sold_date_sk = d_date_sk
AND ss_item_sk = i_item_sk
AND ss_cdemo_sk = cd_demo_sk
AND ss_promo_sk = p_promo_sk
AND cd_gender = 'M'
AND cd_marital_status = 'S'
AND cd_education_status = 'College'
AND (p_channel_email = 'N' OR p_channel_event = 'N')
AND d_year = 2000
GROUP BY i_item_id
ORDER BY i_item_id
LIMIT 100
""",
"Q19": """
SELECT i_item_id,
i_brand,
i_category,
i_class,
i_manufact,
sum(ss_ext_sales_price) AS sales,
sum(ss_net_profit) AS profit
FROM date_dim,
store_sales,
item,
customer,
store
WHERE d_date_sk = ss_sold_date_sk
AND i_item_sk = ss_item_sk
AND d_year = 2000
AND d_moy = 12
AND c_customer_sk = ss_customer_sk
AND s_store_sk = ss_store_sk
AND i_manager_id = 9
GROUP BY i_item_id,
i_brand,
i_category,
i_class,
i_manufact
ORDER BY i_item_id,
i_brand,
i_category,
i_class,
i_manufact
LIMIT 100
"""
}
total_start_time = time.time()
for query_name, query_sql in queries.items():
print(f"\nExecuting {query_name}...")
query_start = time.time()
# Execute query and force action using show()
result_df = spark.sql(query_sql)
result_df.show(5) # Show top 5 rows
query_end = time.time()
print(f"{query_name} completed in {query_end - query_start:.2f} seconds.")
total_end_time = time.time()
print(f"\nAll benchmark queries completed in {total_end_time - total_start_time:.2f} seconds.")
spark.stop()
if __name__ == "__main__":
main()
EOF
Copia la secuencia de comandos en tu bucket de Cloud Storage para que Serverless for Apache Spark pueda acceder a ella:
gcloud storage cp benchmark.py gs://${BUCKET_NAME}/scripts/benchmark.py
4. Ejecuta el trabajo sin servidores de referencia
Para proporcionar una comparación de referencia sin Lightning Engine, envía el trabajo de comparativas de PySpark que subiste antes al nivel estándar de Serverless for Apache Spark. Pasaremos la ruta de acceso al conjunto de datos que copiaste como un argumento.
Ejecuta el siguiente comando para ejecutar el trabajo por lotes:
gcloud dataproc batches submit pyspark \
gs://${BUCKET_NAME}/scripts/benchmark.py \
--region=${REGION} \
--version=2.3 \
--deps-bucket=gs://${BUCKET_NAME} \
-- ${DATASET_PATH}
Supervisa el trabajo
Mientras se ejecuta el trabajo, verás registros de PySpark transmitiéndose en tu terminal de Cloud Shell. Serverless para Apache Spark asigna contenedores, lee el conjunto de datos de TPC-DS en formato Parquet desde Cloud Storage y ejecuta los planes de SQL complejos.
Una vez que se complete la secuencia de comandos, observa el resultado de la consola. Deberías ver los resultados y los tiempos de cada consulta estándar ejecutada, de manera similar a lo siguiente:
... Executing Q1... +-------------+ |c_customer_id| +-------------+ ... Q1 completed in 18.52 seconds. ... All benchmark queries completed in 110.94 seconds.
Toma nota de la cantidad total de segundos que tardaste en completar la tarea. Este es tu tiempo de ejecución de referencia.
5. Ejecuta con Serverless Premium y Lightning Engine
A continuación, ejecutarás el mismo trabajo de Spark en Serverless para Apache Spark, pero con el nivel Premium y habilitando el motor de consultas vectorizado nativo de Google: Lightning Engine.
Envía el trabajo comparativo a Serverless con Lightning Engine habilitado de forma explícita:
gcloud dataproc batches submit pyspark \
gs://${BUCKET_NAME}/scripts/benchmark.py \
--region=${REGION} \
--version=2.3 \
--deps-bucket=gs://${BUCKET_NAME} \
--properties="dataproc.tier=premium,spark.dataproc.lightningEngine.runtime=native" \
-- ${DATASET_PATH}
Cómo comparar los resultados
Espera a que se complete el trabajo y examina el resultado. Deberías ver los mismos resultados de la búsqueda. Observa con atención el tiempo de finalización:
... All benchmark queries completed in 64.24 seconds.
Si comparas la ejecución de referencia de Serverless con la ejecución de Serverless Lightning Engine, notarás que Lightning Engine ejecuta las agrupaciones, las agregaciones y las uniones más rápido, ya que utiliza una capa de ejecución nativa de C++ y un procesamiento vectorizado en el backend, sin necesidad de realizar ningún cambio en el código de tu aplicación de PySpark.
Lightning Engine está optimizado para aumentar el rendimiento cuanto mayor sea la carga de trabajo. En este ejemplo, usamos un conjunto de datos pequeño, por lo que el aumento en el rendimiento no es tan drástico como podría ser. En un conjunto de datos de 10 TB, las comparativas mostraron una mejora del rendimiento de hasta 4.3 veces en comparación con Spark de código abierto.
6. Cómo comparar gráficos de ejecución en la IU de Spark
La reducción del tiempo de ejecución es impresionante, pero veamos qué sucede en segundo plano cuando Spark ejecuta la consulta. Para ello, examina los gráficos de ejecución de la IU de Spark para ambos trabajos.
- Abre la consola de Google Cloud en tu navegador.
- Navega a Dataproc > Lotes.
- Verás dos lotes en la lista: tu ejecución de referencia estándar y tu ejecución del nivel Premium.
- Haz clic en el lote de nivel Premium que ejecutaste y, luego, en Ver IU de Spark y, luego, en Ver detalles.
- En la IU de Spark, navega a la pestaña Trabajos.
- En Completed Jobs, escribe
Veloxen el cuadro de búsqueda. - Verás muchas descripciones de empleos que incluyen
VeloxSparkPlanExecApi. Esto se refiere al motor de ejecución nativo de Velox que usa Lightning Engine.
Ahora, repite este proceso para la ejecución del nivel estándar:
- Regresa a la página Lotes de Serverless for Apache Spark.
- Haz clic en el vínculo del lote de nivel estándar y, luego, en Ver IU de Spark y, por último, en Ver detalles.
- En la IU de Spark, navega a la pestaña Trabajos.
- En Completed Jobs, escribe
Veloxen el cuadro de búsqueda. - No verás ninguna mención de la API de Velox en las descripciones de los trabajos.
7. Limpia
Para evitar que se apliquen cargos a tu cuenta de Google Cloud, borra los recursos que creaste durante este codelab.
En Cloud Shell, borra el bucket de Cloud Storage y su contenido:
gcloud storage rm -r gs://${BUCKET_NAME}
Borra la copia local de benchmark.py:
rm benchmark.py
8. Felicitaciones
¡Felicitaciones! Creaste correctamente un entorno de comparativas para Apache Spark y comparaste Serverless para Apache Spark Standard con Serverless para Apache Spark Premium.
Viste de primera mano cómo habilitar el nuevo Lightning Engine de Serverless for Apache Spark puede reducir el tiempo de ejecución de tu carga de trabajo de Spark y exploraste la IU de Spark para ver cómo el gráfico de ejecución física se transforma en código C++ nativo con el motor de consultas nativo.
Qué aprendiste
- Cómo escribir una secuencia de comandos de comparativas de conjuntos de datos de PySpark
- Cómo enviar trabajos de Spark a Serverless for Apache Spark
- Cómo habilitar Lightning Engine
- Cómo comparar planes de trabajos en la IU de Spark