Acelerar o Spark com o Serviço Gerenciado para Apache Spark e o Lightning Engine

1. Introdução

Neste codelab, você vai conhecer os benefícios de desempenho do mecanismo de execução nativo do Serviço gerenciado para Apache Spark, o Lightning Engine, e examinar como ele otimiza suas cargas de trabalho do Spark no Apache Spark gerenciado sem servidor até 4,9 vezes mais rápido.

O Lightning Engine usa o Velox e o Apache Gluten. O Velox é um mecanismo C++ de alto desempenho para processamento de dados. O Apache Gluten é uma camada intermediária responsável por converter jobs do Spark baseados em JVM em código C++ que pode ser executado pelo Velox.

Esta demonstração usa o TPC-DS, um comparativo de mercado padrão do setor projetado para avaliar a performance de sistemas de suporte a decisões. Você vai enviar um job PySpark de referência para consultar um exemplo de conjunto de dados TPC-DS usando o nível Standard sem servidor. Em seguida, vai executar exatamente o mesmo job usando o nível Premium com o Lightning Engine ativado. Por fim, vai comparar o tempo de execução e analisar a interface do Spark para visualizar a diferença nos gráficos de execução do Spark acelerada por hardware.

O custo estimado para executar este codelab é inferior a US$1,00, supondo que os recursos sejam limpos imediatamente, conforme descrito na seção Limpeza.

Atividades deste laboratório

  • Crie um bucket do Cloud Storage para armazenar seus scripts e resultados de comparativo de mercado.
  • Execute um job de processamento de dados PySpark de base usando o nível padrão do Apache Spark sem servidor gerenciado.
  • Execute o mesmo job usando o nível Premium do Apache Spark sem servidor gerenciado com o Lightning Engine
  • Comparar as métricas de tempo de execução
  • Inicie a interface do usuário do servidor de histórico do Spark para comparar os gráficos de execução física nativa.

O que é necessário

2. Antes de começar

Criar um projeto do Google Cloud

  1. No console do Google Cloud, na página do seletor de projetos, selecione ou crie um projeto na nuvem do Google Cloud.
  2. Verifique se o faturamento está ativado para seu projeto do Cloud. Saiba como verificar se o faturamento está ativado em um projeto.

Iniciar o Cloud Shell

O Cloud Shell é um ambiente de linha de comando executado no Google Cloud que vem pré-carregado com as ferramentas necessárias.

  1. Clique em Ativar o Cloud Shell na parte de cima do console do Google Cloud.
  2. Depois de se conectar ao Cloud Shell, verifique sua autenticação:
    gcloud auth list
    
  3. Confirme se o projeto está configurado:
    gcloud config get project
    
  4. Se o projeto não estiver definido como esperado, faça o seguinte:
    export PROJECT_ID=<YOUR_PROJECT_ID>
    gcloud config set project $PROJECT_ID
    

Ativar APIs

Execute este comando para ativar todas as APIs necessárias para este codelab:

gcloud services enable \
    dataproc.googleapis.com \
    storage.googleapis.com \
    compute.googleapis.com

3. Prepare o ambiente

Nesta etapa, você vai inicializar variáveis de ambiente e criar um bucket do Cloud Storage. Esse bucket vai armazenar o script PySpark que você enviar para os dois níveis do Serverless para Apache Spark.

Definir variáveis de ambiente

Execute os comandos a seguir no Cloud Shell para definir variáveis de ambiente padrão. Vamos usar a região us-central1, mas você pode mudar isso se preferir.

export PROJECT_ID=$(gcloud config get-value project)
export REGION="us-central1"
export BUCKET_NAME="spark-benchmark-${PROJECT_ID}-${REGION}"

gcloud config set dataproc/region ${REGION}

Crie um bucket do Cloud Storage

Crie o bucket para armazenar seus scripts e registros:

gcloud storage buckets create gs://${BUCKET_NAME} \
    --uniform-bucket-level-access \
    --location=${REGION}

Copiar o conjunto de dados TPC-DS para seu próprio bucket

Nesta etapa, você vai copiar o conjunto de dados TPC-DS de um bucket público para seu próprio bucket do Cloud Storage. Isso garante que seus jobs do PySpark possam ler dados localmente do seu projeto.

Defina variáveis de ambiente para escolher o tamanho e o tipo do conjunto de dados:

export DATASET_TYPE="partitioned" # Options: partitioned, nonpartitioned
export DATASET_SIZE="1GB"         # Options: 1GB, 10GB, 100GB, 1000GB (1000GB not available for partitioned)

export SRC_PATH="gs://beam-tpcds/datasets/parquet/${DATASET_TYPE}/${DATASET_SIZE}"
export DATASET_PATH="gs://${BUCKET_NAME}/tpc-ds-dataset/${DATASET_TYPE}/${DATASET_SIZE}"

Copie os dados do TPC-DS para seu próprio bucket:

gcloud storage cp -r ${SRC_PATH}/* ${DATASET_PATH}/

Criar o script de comparativo de mercado do PySpark

Vamos usar um script PySpark que registra as tabelas padrão do TPC-DS do seu bucket do Cloud Storage e executa cinco consultas padrão originadas do repositório público do Apache Spark. O script aceita o caminho para o conjunto de dados como um argumento.

Crie um arquivo chamado benchmark.py no Cloud Shell. Copie e cole o comando a seguir para gerar o arquivo:

cat << 'EOF' > benchmark.py
import argparse
import sys
from pyspark.sql import SparkSession
import time

def main():
    parser = argparse.ArgumentParser(description='TPC-DS Benchmark')
    parser.add_argument('data_path', help='GCS base path for TPC-DS tables')
    args = parser.parse_args()

    base_path = args.data_path

    # Initialize Spark Session
    spark = SparkSession.builder \
        .appName("TPC-DS Benchmark") \
        .getOrCreate()

    print(f"Spark Session created. Registering TPC-DS tables from {base_path}...")

    # List of all 24 TPC-DS tables
    tables = [
        "call_center", "catalog_page", "catalog_returns", "catalog_sales",
        "customer", "customer_address", "customer_demographics", "date_dim",
        "household_demographics", "income_band", "inventory", "item",
        "promotion", "reason", "ship_mode", "store", "store_returns",
        "store_sales", "time_dim", "warehouse", "web_page", "web_returns",
        "web_sales", "web_site"
    ]

    # Register each table as a temporary view
    # For this subset of queries, not every table is used
    for table in tables:
        path = f"{base_path}/{table}"
        try:
            df = spark.read.parquet(path)
            df.createOrReplaceTempView(table)
        except Exception as e:
            print(f"Warning: Could not load table {table} from {path}. Error: {e}")

    print("Tables registered successfully. Starting benchmark queries from Apache Spark test suite...")

    # Standard TPC-DS Queries sourced from Apache Spark public repository:
    # https://github.com/apache/spark/tree/master/sql/core/src/test/resources/tpcds
    queries = {
        "Q1": """
            WITH customer_total_return AS (
              SELECT sr_customer_sk AS ctr_customer_sk,
                     sr_store_sk AS ctr_store_sk,
                     sum(sr_return_amt) AS ctr_total_return
              FROM store_returns, date_dim
              WHERE sr_returned_date_sk = d_date_sk
                AND d_year = 2000
              GROUP BY sr_customer_sk, sr_store_sk
            )
            SELECT c_customer_id
            FROM customer_total_return ctr1, store, customer
            WHERE ctr1.ctr_total_return > (
              SELECT avg(ctr_total_return) * 1.2
              FROM customer_total_return ctr2
              WHERE ctr1.ctr_store_sk = ctr2.ctr_store_sk
            )
              AND s_store_sk = ctr1.ctr_store_sk
              AND s_state = 'TN'
              AND ctr1.ctr_customer_sk = c_customer_sk
            ORDER BY c_customer_id
            LIMIT 100
        """,
        "Q2": """
            WITH wscs AS (
              SELECT sold_date_sk, sales_price
              FROM (
                SELECT ws_sold_date_sk AS sold_date_sk, ws_ext_sales_price AS sales_price
                FROM web_sales
                UNION ALL
                SELECT cs_sold_date_sk AS sold_date_sk, cs_ext_sales_price AS sales_price
                FROM catalog_sales
              )
            ),
            wswscs AS (
              SELECT d_week_seq,
                     sum(CASE WHEN (d_day_name='Sunday') THEN sales_price ELSE null END) AS sun_sales,
                     sum(CASE WHEN (d_day_name='Monday') THEN sales_price ELSE null END) AS mon_sales,
                     sum(CASE WHEN (d_day_name='Tuesday') THEN sales_price ELSE null END) AS tue_sales,
                     sum(CASE WHEN (d_day_name='Wednesday') THEN sales_price ELSE null END) AS wed_sales,
                     sum(CASE WHEN (d_day_name='Thursday') THEN sales_price ELSE null END) AS thu_sales,
                     sum(CASE WHEN (d_day_name='Friday') THEN sales_price ELSE null END) AS fri_sales,
                     sum(CASE WHEN (d_day_name='Saturday') THEN sales_price ELSE null END) AS sat_sales
              FROM wscs, date_dim
              WHERE d_date_sk = sold_date_sk
              GROUP BY d_week_seq
            )
            SELECT d_week_seq1,
                   round(sun_sales1/sun_sales2, 2),
                   round(mon_sales1/mon_sales2, 2),
                   round(tue_sales1/tue_sales2, 2),
                   round(wed_sales1/wed_sales2, 2),
                   round(thu_sales1/thu_sales2, 2),
                   round(fri_sales1/fri_sales2, 2),
                   round(sat_sales1/sat_sales2, 2)
            FROM (
              SELECT wswscs.d_week_seq AS d_week_seq1,
                     sun_sales AS sun_sales1, mon_sales AS mon_sales1,
                     tue_sales AS tue_sales1, wed_sales AS wed_sales1,
                     thu_sales AS thu_sales1, fri_sales AS fri_sales1,
                     sat_sales AS sat_sales1
              FROM wswscs, date_dim
              WHERE date_dim.d_week_seq = wswscs.d_week_seq
                AND d_year = 2001
            ) y,
            (
              SELECT wswscs.d_week_seq AS d_week_seq2,
                     sun_sales AS sun_sales2, mon_sales AS mon_sales2,
                     tue_sales AS tue_sales2, wed_sales AS wed_sales2,
                     thu_sales AS thu_sales2, fri_sales AS fri_sales2,
                     sat_sales AS sat_sales2
              FROM wswscs, date_dim
              WHERE date_dim.d_week_seq = wswscs.d_week_seq
                AND d_year = 2001 + 1
            ) z
            WHERE d_week_seq1 = d_week_seq2 - 53
            ORDER BY d_week_seq1
        """,
        "Q3": """
            SELECT dt.d_year,
                   item.i_brand_id AS brand_id,
                   item.i_brand AS brand,
                   sum(ss_ext_sales_price) AS sum_agg
            FROM date_dim dt,
                 store_sales,
                 item
            WHERE dt.d_date_sk = store_sales.ss_sold_date_sk
              AND store_sales.ss_item_sk = item.i_item_sk
              AND item.i_manufact_id = 436
              AND dt.d_moy = 12
            GROUP BY dt.d_year,
                     item.i_brand,
                     item.i_brand_id
            ORDER BY dt.d_year,
                     sum_agg DESC,
                     brand_id
            LIMIT 100
        """,
        "Q7": """
            SELECT i_item_id,
                   avg(ss_quantity) AS agg1,
                   avg(ss_list_price) AS agg2,
                   avg(ss_coupon_amt) AS agg3,
                   avg(ss_sales_price) AS agg4
            FROM store_sales,
                 customer_demographics,
                 date_dim,
                 item,
                 promotion
            WHERE ss_sold_date_sk = d_date_sk
              AND ss_item_sk = i_item_sk
              AND ss_cdemo_sk = cd_demo_sk
              AND ss_promo_sk = p_promo_sk
              AND cd_gender = 'M'
              AND cd_marital_status = 'S'
              AND cd_education_status = 'College'
              AND (p_channel_email = 'N' OR p_channel_event = 'N')
              AND d_year = 2000
            GROUP BY i_item_id
            ORDER BY i_item_id
            LIMIT 100
        """,
        "Q19": """
            SELECT i_item_id,
                   i_brand,
                   i_category,
                   i_class,
                   i_manufact,
                   sum(ss_ext_sales_price) AS sales,
                   sum(ss_net_profit) AS profit
            FROM date_dim,
                 store_sales,
                 item,
                 customer,
                 store
            WHERE d_date_sk = ss_sold_date_sk
              AND i_item_sk = ss_item_sk
              AND d_year = 2000
              AND d_moy = 12
              AND c_customer_sk = ss_customer_sk
              AND s_store_sk = ss_store_sk
              AND i_manager_id = 9
            GROUP BY i_item_id,
                     i_brand,
                     i_category,
                     i_class,
                     i_manufact
            ORDER BY i_item_id,
                     i_brand,
                     i_category,
                     i_class,
                     i_manufact
            LIMIT 100
        """
    }

    total_start_time = time.time()

    for query_name, query_sql in queries.items():
        print(f"\nExecuting {query_name}...")
        query_start = time.time()

        # Execute query and force action using show()
        result_df = spark.sql(query_sql)
        result_df.show(5) # Show top 5 rows

        query_end = time.time()
        print(f"{query_name} completed in {query_end - query_start:.2f} seconds.")

    total_end_time = time.time()
    print(f"\nAll benchmark queries completed in {total_end_time - total_start_time:.2f} seconds.")

    spark.stop()

if __name__ == "__main__":
    main()
EOF

Copie o script para seu bucket do Cloud Storage para que o Serverless para Apache Spark possa acessá-lo:

gcloud storage cp benchmark.py gs://${BUCKET_NAME}/scripts/benchmark.py

4. Executar o job sem servidor de referência

Para fornecer uma comparação de base sem o Lightning Engine, envie o job de comparativo de mercado do PySpark que você fez upload antes para o nível Standard do Serverless para Apache Spark. Vamos transmitir o caminho para o conjunto de dados que você copiou como um argumento.

Execute o comando a seguir para executar o job em lote:

gcloud dataproc batches submit pyspark \
    gs://${BUCKET_NAME}/scripts/benchmark.py \
    --region=${REGION} \
    --version=2.3 \
    --deps-bucket=gs://${BUCKET_NAME} \
    -- ${DATASET_PATH}

Monitorar o job

Enquanto o job é executado, os registros do PySpark são transmitidos no terminal do Cloud Shell. O Serverless para Apache Spark está alocando contêineres, lendo o conjunto de dados TPC-DS Parquet do Cloud Storage e executando os planos SQL complexos.

Quando o script for concluído, observe a saída do console. Você vai ver resultados e tempos para cada consulta padrão executada, semelhantes a estes:

...
Executing Q1...
+-------------+
|c_customer_id|
+-------------+
...

Q1 completed in 18.52 seconds.
...

All benchmark queries completed in 110.94 seconds.

Anote o total de segundos que levou para concluir. Esse é o tempo de execução de referência.

5. Executar com o Serverless Premium e o Lightning Engine

Em seguida, você vai executar o mesmo job do Spark no Apache Spark gerenciado sem servidor, mas usando o nível Premium e ativando o mecanismo de consulta nativo e vetorizado do Google: o Lightning Engine.

Envie o job de comparativo de mercado para o Serverless com o Lightning Engine explicitamente ativado:

gcloud dataproc batches submit pyspark \
    gs://${BUCKET_NAME}/scripts/benchmark.py \
    --region=${REGION} \
    --version=2.3 \
    --deps-bucket=gs://${BUCKET_NAME} \
    --properties="dataproc.tier=premium,spark.dataproc.lightningEngine.runtime=native" \
    -- ${DATASET_PATH}

Comparar os resultados

Aguarde a conclusão do job e examine a saída. Você vai ver os mesmos resultados da consulta. Observe atentamente o tempo de conclusão:

...
All benchmark queries completed in 64.24 seconds.

Ao comparar o job de referência com o do Lightning Engine, você vai notar que o Lightning Engine executa o agrupamento, as agregações e as junções mais rápido usando uma camada de execução nativa em C++ e o processamento vetorizado no back-end, sem exigir mudanças no código do aplicativo PySpark.

O Lightning Engine é otimizado para aumentar o desempenho quanto maior for a carga de trabalho. Neste exemplo, estamos usando um conjunto de dados pequeno, então o aumento de desempenho não é tão significativo quanto poderia ser. Em um conjunto de dados de 10 TB, os comparativos mostram uma melhoria de performance de até 4,9 vezes em relação ao Spark de código aberto.

6. Comparar gráficos de execução na interface do Spark

A redução no tempo de execução é impressionante, mas vamos analisar o que está acontecendo para saber o que o Spark realmente faz durante a execução da consulta. Para isso, examine os gráficos de execução da interface do Spark para os dois jobs.

  1. Abra o Console do Google Cloud no seu navegador.
  2. Acesse Apache Spark gerenciado > Lotes.
  3. Você vai ver dois lotes na lista: a execução padrão de comparativo e a execução do nível Premium.
  4. Clique no lote do nível Premium que você executou e em Acessar a interface do Spark e Ver detalhes.
  5. Na interface do Spark, acesse a guia Jobs.
  6. Em Trabalhos concluídos, na caixa de pesquisa, digite Velox.
  7. Você vai encontrar muitas descrições de vagas que incluem VeloxSparkPlanExecApi. Isso se refere ao mecanismo de execução nativo do Velox usado pelo Lightning Engine.

Agora, repita esse processo para a execução do nível Standard:

  1. Volte para a página "Lotes do Serverless para Apache Spark".
  2. Clique no link do lote do nível Standard e em Acessar a interface do Spark e Ver detalhes.
  3. Na interface do Spark, acesse a guia Jobs.
  4. Em Trabalhos concluídos, na caixa de pesquisa, digite Velox.
  5. Não há menção à API Velox nas descrições das vagas.

7. Limpar

Para evitar cobranças contínuas na sua conta do Google Cloud, exclua os recursos criados durante este codelab.

No Cloud Shell, exclua o bucket do Cloud Storage e o conteúdo dele:

gcloud storage rm -r gs://${BUCKET_NAME}

Exclua a cópia local de benchmark.py:

rm benchmark.py

8. Parabéns

Parabéns! Você criou um ambiente de comparativo de mercado para o Apache Spark e comparou o Apache Spark sem servidor gerenciado Standard com o Apache Spark sem servidor gerenciado Premium.

Você viu em primeira mão como ativar o novo Lightning Engine do Managed Apache Spark sem servidor pode reduzir o tempo de execução da sua carga de trabalho do Spark e explorou a interface do Spark para ver como o gráfico de execução física é transformado em código C++ nativo usando o Native Query Engine.

O que você aprendeu

  • Como escrever um script de comparativo de dados do PySpark.
  • Como enviar jobs do Spark para o Apache Spark sem servidor gerenciado.
  • Como ativar o Lightning Engine.
  • Como comparar planos de jobs na interface do Spark.

Próximas etapas