1. Visão geral: Google Dataproc
Dataproc é um serviço totalmente gerenciado e altamente escalonável para executar o Apache Spark, o Apache Flink, o Presto, além de ferramentas e frameworks de código aberto. Use o Dataproc para modernização do data lake, ETL / ELT e ciência de dados segura em escala planetária. O Dataproc também é totalmente integrado a vários serviços do Google Cloud, incluindo BigQuery, Cloud Storage, Vertex AI e Dataplex.
O Dataproc está disponível em três versões:
- O Dataproc sem servidor permite executar jobs do PySpark sem precisar configurar a infraestrutura e o escalonamento automático. O Dataproc sem servidor oferece suporte a cargas de trabalho em lote e sessões / notebooks do PySpark.
- O Dataproc no Google Compute Engine permite gerenciar um cluster Hadoop Hadoop para cargas de trabalho do Spark baseadas em YARN, além de ferramentas de código aberto, como Flink e Presto. É possível personalizar seus clusters baseados na nuvem com o escalonamento vertical ou horizontal que você quiser, incluindo o escalonamento automático.
- O Dataproc no Google Kubernetes Engine permite configurar clusters virtuais do Dataproc na infraestrutura do GKE para o envio de jobs do Spark, PySpark, SparkR ou Spark SQL.
Neste codelab, você vai aprender várias maneiras diferentes de consumir o Dataproc sem servidor.
O Apache Spark foi originalmente criado para ser executado em clusters do Hadoop e usou o YARN como o gerenciador de recursos. A manutenção de clusters Hadoop requer um conjunto específico de conhecimentos e a garantia de que muitos botões diferentes nos clusters estejam configurados corretamente. Isso é além de um conjunto separado de botões que o Spark também exige que o usuário defina. Isso leva a muitos cenários em que os desenvolvedores gastam mais tempo configurando a infraestrutura do que trabalhando no código do Spark.
O Dataproc sem servidor elimina a necessidade de configurar manualmente clusters Hadoop ou Spark. O Dataproc sem servidor não é executado no Hadoop e usa a própria alocação de recursos dinâmicos para determinar os requisitos de recursos, incluindo o escalonamento automático. Um pequeno subconjunto de propriedades do Spark ainda é personalizável com o Dataproc sem servidor, mas na maioria dos casos não é necessário ajustá-las.
2. Configurar
Comece configurando seu ambiente e os recursos usados neste codelab.
Crie um projeto do Google Cloud. Você pode usar um projeto atual.
Clique no Cloud Shell na barra de ferramentas do console do Cloud para abrir.

O Cloud Shell oferece um ambiente de shell pronto para uso que pode ser usado neste codelab.

O Cloud Shell vai definir o nome do projeto por padrão. Verifique executando echo $GOOGLE_CLOUD_PROJECT. Se o ID do projeto não aparecer na saída, defina-o.
export GOOGLE_CLOUD_PROJECT=<your-project-id>
Defina uma região do Compute Engine para seus recursos, como us-central1 ou europe-west2.
export REGION=<your-region>
Ativar APIs
O codelab usa as seguintes APIs:
- BigQuery
- Dataproc
Ative as APIs necessárias. Isso leva cerca de um minuto, e uma mensagem de sucesso aparece quando concluída.
gcloud services enable bigquery.googleapis.com gcloud services enable dataproc.googleapis.com
Configurar o acesso à rede
O Dataproc sem servidor requer que o Acesso particular do Google esteja ativado na região em que os jobs do Spark serão executados, já que os drivers e executores do Spark só têm IPs particulares. Execute o comando a seguir para ativá-lo na sub-rede default.
gcloud compute networks subnets update default \
--region=${REGION} \
--enable-private-ip-google-access
É possível verificar se o Acesso particular do Google está ativado usando o comando a seguir, que vai gerar True ou False.
gcloud compute networks subnets describe default \
--region=${REGION} \
--format="get(privateIpGoogleAccess)"
Criar um bucket de armazenamento
Crie um bucket de armazenamento que será usado para armazenar os recursos criados neste codelab.
Escolha um nome para o bucket. Os nomes dos buckets precisam ser exclusivos globalmente para todos os usuários.
export BUCKET=<your-bucket-name>
Crie o bucket na região em que você pretende executar os jobs do Spark.
gsutil mb -l ${REGION} gs://${BUCKET}
É possível conferir se o bucket está disponível no Cloud Storage console. Você também pode executar gsutil ls para conferir o bucket.
Crie um servidor de histórico permanente
A interface do Spark oferece um conjunto avançado de ferramentas de depuração e insights sobre jobs do Spark. Para conferir a interface do Spark para jobs concluídos do Dataproc sem servidor, crie um cluster do Dataproc de nó único para usar como um servidor de histórico permanente.
Defina um nome para o servidor do histórico persistente.
PHS_CLUSTER_NAME=my-phs
Execute o comando a seguir.
gcloud dataproc clusters create ${PHS_CLUSTER_NAME} \
--region=${REGION} \
--single-node \
--enable-component-gateway \
--properties=spark:spark.history.fs.logDirectory=gs://${BUCKET}/phs/*/spark-job-history
A interface do Spark e o servidor de histórico permanente serão explicados com mais detalhes mais adiante no codelab.
3. Executar jobs do Spark sem servidor com lotes do Dataproc
Neste exemplo, você vai trabalhar com um conjunto de dados do conjunto de dados públicos do Citi Bike de Nova York (NYC). O NYC Citi Bikes é um sistema de aluguel de bicicletas pago em Nova York. Você vai executar algumas transformações simples e imprimir os 10 IDs de estação mais famosos do Citi Bike. Este exemplo também usa o conector de código aberto spark-bigquery-connector para ler e gravar dados perfeitamente entre o Spark e o BigQuery.
Clone o repositório do GitHub a seguir e cd no diretório que contém o arquivo citibike.py.
git clone https://github.com/GoogleCloudPlatform/devrel-demos.git cd devrel-demos/data-analytics/next-2022-workshop/dataproc-serverless
citibike.py
import sys
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import BooleanType
if len(sys.argv) == 1:
print("Please provide a GCS bucket name.")
bucket = sys.argv[1]
table = "bigquery-public-data:new_york_citibike.citibike_trips"
spark = SparkSession.builder \
.appName("pyspark-example") \
.config("spark.jars","gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jar") \
.getOrCreate()
df = spark.read.format("bigquery").load(table)
top_ten = df.filter(col("start_station_id") \
.isNotNull()) \
.groupBy("start_station_id") \
.count() \
.orderBy("count", ascending=False) \
.limit(10) \
.cache()
top_ten.show()
top_ten.write.option("header", True).csv(f"gs://{bucket}/citibikes_top_ten_start_station_ids")
Envie o job para o Spark sem servidor usando o SDK do Cloud, disponível no Cloud Shell por padrão. Execute o comando a seguir no shell, que usa o SDK Cloud e a API Dataproc Batches para enviar jobs do Spark sem servidor.
gcloud dataproc batches submit pyspark citibike.py \
--batch=citibike-job \
--region=${REGION} \
--deps-bucket=gs://${BUCKET} \
--jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jar \
--history-server-cluster=projects/${GOOGLE_CLOUD_PROJECT}/regions/${REGION}/clusters/${PHS_CLUSTER_NAME} \
-- ${BUCKET}
Para detalhar:
gcloud dataproc batches submitfaz referência à API Dataproc Batches.pysparkindica que você está enviando um job do PySpark.--batché o nome do job. Se não for informado, um UUID gerado aleatoriamente será usado.--region=${REGION}é a região geográfica em que o job será processado.--deps-bucket=${BUCKET}é o local em que o arquivo Python local é enviado antes de ser executado no ambiente sem servidor.--jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jarinclui o jar do conector spark-bigquery-connector no ambiente de execução do Spark.--history-server-cluster=projects/${GOOGLE_CLOUD_PROJECT}/regions/${REGION}/clusters/${PHS_CLUSTER}é o nome totalmente qualificado do servidor de histórico permanente. É aqui que os dados de eventos do Spark (separados da saída do console) são armazenados e podem ser visualizados na interface do Spark.- O
--à direita indica que tudo além disso será argumentos de execução para o programa. Nesse caso, você está enviando o nome do bucket, conforme exigido pelo job.
A saída a seguir vai aparecer quando o lote for enviado.
Batch [citibike-job] submitted.
Depois de alguns minutos, você verá a saída a seguir com os metadados do job.
+----------------+------+ |start_station_id| count| +----------------+------+ | 519|551078| | 497|423334| | 435|403795| | 426|384116| | 293|372255| | 402|367194| | 285|344546| | 490|330378| | 151|318700| | 477|311403| +----------------+------+ Batch [citibike-job] finished.
Na próxima seção, você vai aprender a localizar os registros desse job.
Recursos adicionais
Com o Spark sem servidor, você tem outras opções para executar seus jobs.
- É possível criar uma imagem do Docker personalizada em que o job é executado. Essa é uma ótima maneira de incluir dependências adicionais, incluindo bibliotecas Python e R.
- É possível conectar uma instância do Metastore do Dataproc ao job para acessar os metadados do Hive.
- Para ter mais controle, o Dataproc sem servidor oferece suporte à configuração de um pequeno conjunto de propriedades do Spark.
4. Métricas e observabilidade do Dataproc
O console de lotes do Dataproc lista todos os jobs do Dataproc sem servidor. No console, você verá o ID do lote, o local, o status, o horário de criação, o tempo decorrido e o tipo de cada job. Clique no ID do lote do job para conferir mais informações sobre ele.
Nesta página, você verá informações como o Monitoramento , que mostra quantos executores do Spark em lote o job usou ao longo do tempo (indicando o quanto ele foi escalonado automaticamente).
Na guia Detalhes , você verá mais metadados sobre o job, incluindo argumentos e parâmetros enviados com ele.
Também é possível acessar todos os registros nessa página. Quando os jobs do Dataproc sem servidor são executados, três conjuntos diferentes de registros são gerados:
- Nível de serviço
- Saída do console
- Registro de eventos do Spark
Nível de serviço inclui registros gerados pelo serviço Dataproc sem servidor. Isso inclui coisas como o Dataproc sem servidor solicitando CPUs extras para escalonamento automático. Para conferir esses registros, clique em Ver registros , que vai abrir o Cloud Logging.
A saída do console pode ser visualizada em Saída. Essa é a saída gerada pelo job, incluindo metadados que o Spark imprime ao iniciar um job ou qualquer instrução de impressão incorporada ao job.
O registro de eventos do Spark pode ser acessado na interface do Spark. Como você forneceu um servidor de histórico permanente para o job do Spark, é possível acessar a interface do Spark clicando em Acessar o servidor de histórico do Spark, que contém informações sobre os jobs do Spark executados anteriormente. Saiba mais sobre a interface do Spark na documentação oficial do Spark.
5. Modelos do Dataproc: BQ -> GCS
Os modelos do Dataproc são ferramentas de código aberto que ajudam a simplificar ainda mais as tarefas de processamento de dados na nuvem. Eles servem como um wrapper para o Dataproc sem servidor e incluem modelos para muitas tarefas de importação e exportação de dados, incluindo:
BigQuerytoGCSeGCStoBigQueryGCStoBigTableGCStoJDBCeJDBCtoGCSHivetoBigQueryMongotoGCSeGCStoMongo
A lista completa está disponível no README.
Nesta seção, você vai usar os modelos do Dataproc para exportar dados do BigQuery para o GCS.
Clone o repositório
Clone o repositório e mude para a pasta python.
git clone https://github.com/GoogleCloudPlatform/dataproc-templates.git cd dataproc-templates/python
Configurar o ambiente
Agora, defina as variáveis de ambiente. Os modelos do Dataproc usam a variável de ambiente GCP_PROJECT para o ID do projeto. Portanto, defina-a como GOOGLE_CLOUD_PROJECT..
export GCP_PROJECT=${GOOGLE_CLOUD_PROJECT}
Sua região já deve estar definida no ambiente. Caso contrário, defina-a aqui.
export REGION=<region>
Os modelos do Dataproc usam o conector spark-bigquery-conector para processar jobs do BigQuery e exigem que o URI seja incluído em uma variável de ambiente JARS. Defina a variável JARS.
export JARS="gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jar"
Configurar parâmetros do modelo
Defina o nome de um bucket de preparo para o serviço usar.
export GCS_STAGING_LOCATION=gs://${BUCKET}
Em seguida, defina algumas variáveis específicas do job. Para a tabela de entrada, você vai referenciar novamente o conjunto de dados do BigQuery NYC Citibike.
BIGQUERY_GCS_INPUT_TABLE=bigquery-public-data.new_york_citibike.citibike_trips
É possível escolher csv, parquet, avro ou json. Para este codelab, escolha CSV. Na próxima seção, mostramos como usar os modelos do Dataproc para converter tipos de arquivo.
BIGQUERY_GCS_OUTPUT_FORMAT=csv
Defina o modo de saída como overwrite. É possível escolher entre overwrite, append, ignore ou errorifexists.
BIGQUERY_GCS_OUTPUT_MODE=overwrite
Defina o local de saída do GCS como um caminho no bucket.
BIGQUERY_GCS_OUTPUT_LOCATION=gs://${BUCKET}/BQtoGCS
Executar o modelo
Execute o modelo BIGQUERYTOGCS especificando-o abaixo e fornecendo os parâmetros de entrada definidos.
./bin/start.sh \
-- --template=BIGQUERYTOGCS \
--bigquery.gcs.input.table=${BIGQUERY_GCS_INPUT_TABLE} \
--bigquery.gcs.output.format=${BIGQUERY_GCS_OUTPUT_FORMAT} \
--bigquery.gcs.output.mode=${BIGQUERY_GCS_OUTPUT_MODE} \
--bigquery.gcs.output.location=${BIGQUERY_GCS_OUTPUT_LOCATION}
A saída será bastante ruidosa, mas depois de cerca de um minuto você verá o seguinte.
Batch [5766411d6c78444cb5e80f305308d8f8] submitted. ... Batch [5766411d6c78444cb5e80f305308d8f8] finished.
Para verificar se os arquivos foram gerados, execute o comando a seguir.
gsutil ls ${BIGQUERY_GCS_OUTPUT_LOCATION}
O Spark grava em vários arquivos por padrão, dependendo da quantidade de dados. Nesse caso, você verá aproximadamente 30 arquivos gerados. Os nomes dos arquivos de saída do Spark são formatados com part- seguido por um número de cinco dígitos (indicando o número da peça) e uma string hash. Para grandes quantidades de dados, o Spark geralmente grava em vários arquivos. Um exemplo de nome de arquivo é part-00000-cbf69737-867d-41cc-8a33-6521a725f7a0-c000.csv.
6. Modelos do Dataproc: CSV para Parquet
Agora, você vai usar os modelos do Dataproc para converter dados no GCS de um tipo de arquivo para outro usando o GCSTOGCS. Esse modelo usa o SparkSQL e oferece a opção de enviar uma consulta do SparkSQL para ser processada durante a transformação para processamento adicional.
Confirmar variáveis de ambiente
Confirme se GCP_PROJECT, REGION e GCS_STAGING_BUCKET estão definidos na seção anterior.
echo ${GCP_PROJECT}
echo ${REGION}
echo ${GCS_STAGING_LOCATION}
Definir parâmetros do modelo
Agora, defina os parâmetros de configuração para GCStoGCS. Comece com o local dos arquivos de entrada. Observe que esse é um diretório e não um arquivo específico, já que todos os arquivos no diretório serão processados. Defina isso como BIGQUERY_GCS_OUTPUT_LOCATION.
GCS_TO_GCS_INPUT_LOCATION=${BIGQUERY_GCS_OUTPUT_LOCATION}
Defina o formato do arquivo de entrada.
GCS_TO_GCS_INPUT_FORMAT=csv
Defina o formato de saída desejado. É possível escolher Parquet, JSON, Avro ou CSV.
GCS_TO_GCS_OUTPUT_FORMAT=parquet
Defina o modo de saída como overwrite. É possível escolher entre overwrite, append, ignore ou errorifexists.
GCS_TO_GCS_OUTPUT_MODE=overwrite
Defina o local de saída.
GCS_TO_GCS_OUTPUT_LOCATION=gs://${BUCKET}/GCStoGCS
Executar o modelo
Execute o modelo GCStoGCS.
./bin/start.sh \
-- --template=GCSTOGCS \
--gcs.to.gcs.input.location=${GCS_TO_GCS_INPUT_LOCATION} \
--gcs.to.gcs.input.format=${GCS_TO_GCS_INPUT_FORMAT} \
--gcs.to.gcs.output.format=${GCS_TO_GCS_OUTPUT_FORMAT} \
--gcs.to.gcs.output.mode=${GCS_TO_GCS_OUTPUT_MODE} \
--gcs.to.gcs.output.location=${GCS_TO_GCS_OUTPUT_LOCATION}
A saída será bastante ruidosa, mas depois de cerca de um minuto você verá uma mensagem de sucesso como a abaixo.
Batch [c198787ba8e94abc87e2a0778c05ec8a] submitted. ... Batch [c198787ba8e94abc87e2a0778c05ec8a] finished.
Para verificar se os arquivos foram gerados, execute o comando a seguir.
gsutil ls ${GCS_TO_GCS_OUTPUT_LOCATION}
Com esse modelo, você também tem a opção de fornecer consultas do SparkSQL transmitindo gcs.to.gcs.temp.view.name e gcs.to.gcs.sql.query para o modelo, permitindo que uma consulta do SparkSQL seja executada nos dados antes de gravar no GCS.
7. Limpar recursos
Para evitar cobranças desnecessárias na sua conta do GCP após a conclusão deste codelab:
- Exclua o bucket do Cloud Storage do ambiente que você criou.
gsutil rm -r gs://${BUCKET}
- Exclua o cluster do Dataproc usado para o servidor de histórico permanente.
gcloud dataproc clusters delete ${PHS_CLUSTER_NAME} \
--region=${REGION}
- Exclua os jobs do Dataproc sem servidor. Acesse o console de lotes, clique na caixa ao lado de cada job que você quer excluir e clique em EXCLUIR.
Se você criou um projeto apenas para este codelab, também é possível excluir o projeto:
- No Console do GCP, acesse a página Projetos.
- Na lista de projetos, selecione um e clique em Excluir.
- Na caixa, digite o ID do projeto e clique em desligar para excluir o projeto.
8. A seguir
Os recursos a seguir oferecem outras maneiras de aproveitar o Spark sem servidor:
- Saiba como orquestrar fluxos de trabalho do Dataproc sem servidor usando o Cloud Composer.
- Saiba como integrar o Dataproc sem servidor aos pipelines do Kubeflow.