1. Visão geral: Google Dataproc
O Dataproc é um serviço totalmente gerenciado e altamente escalonável para executar o Apache Spark, o Apache Flink, o Presto e muitas outras ferramentas e frameworks de código aberto. Use o Dataproc para modernização de data lakes, ETL / ELT e ciência de dados segura em escala global. O Dataproc também está totalmente integrado a vários serviços do Google Cloud, incluindo BigQuery, Cloud Storage, Vertex AI e Dataplex.
O Dataproc está disponível em três tipos:
- O Dataproc sem servidor permite que você execute jobs do PySpark sem precisar configurar a infraestrutura e o escalonamento automático. O Dataproc sem servidor oferece suporte a cargas de trabalho e sessões / notebooks do PySpark.
- O Dataproc no Google Compute Engine permite gerenciar um cluster Hadoop YARN para cargas de trabalho do Spark baseadas em YARN, além de ferramentas de código aberto, como Flink e Presto. É possível personalizar clusters baseados na nuvem com o escalonamento vertical ou horizontal que você quiser, incluindo escalonamento automático.
- O Dataproc no Google Kubernetes Engine permite configurar clusters virtuais do Dataproc na sua infraestrutura do GKE para enviar jobs do Spark, PySpark, SparkR ou Spark SQL.
Neste codelab, você vai aprender várias maneiras de consumir o Dataproc sem servidor.
O Apache Spark foi originalmente criado para ser executado em clusters do Hadoop e usava o YARN como gerenciador de recursos. A manutenção de clusters Hadoop requer um conjunto específico de experiência e a garantia de que muitos botões diferentes nos clusters sejam configurados corretamente. Isso é feito além de um conjunto separado de botões que o Spark também exige que o usuário defina. Isso leva a muitos cenários em que os desenvolvedores passam mais tempo configurando a infraestrutura em vez de trabalhar no código Spark em si.
O Dataproc Serverless elimina a necessidade de configurar manualmente os clusters do Hadoop ou do Spark. O Dataproc sem servidor não é executado no Hadoop e usa a própria alocação dinâmica de recursos para determinar os requisitos de recursos, incluindo o escalonamento automático. Um pequeno subconjunto de propriedades do Spark ainda pode ser personalizada com o Dataproc sem servidor. No entanto, na maioria das instâncias, você não precisará ajustá-las.
2. Configurar
Para começar, você vai configurar o ambiente e os recursos usados neste codelab.
Crie um projeto do Google Cloud. Você pode usar uma que já existe.
Para abrir o Cloud Shell, clique nele na barra de ferramentas do console do Cloud.
O Cloud Shell oferece um ambiente Shell pronto para uso que pode ser utilizado neste codelab.
O Cloud Shell vai definir o nome do projeto por padrão. Para verificar, execute echo $GOOGLE_CLOUD_PROJECT
. Se você não vir o ID do projeto na saída, configure-o.
export GOOGLE_CLOUD_PROJECT=<your-project-id>
Defina uma região do Compute Engine para os recursos, como us-central1
ou europe-west2
.
export REGION=<your-region>
Ativar APIs
O codelab usa as seguintes APIs:
- BigQuery
- Dataproc
Ative as APIs necessárias. Isso vai levar cerca de um minuto, e uma mensagem de êxito vai aparecer quando o processo for concluído.
gcloud services enable bigquery.googleapis.com gcloud services enable dataproc.googleapis.com
Configurar o acesso à rede
O Dataproc sem servidor requer que o Acesso privado do Google esteja ativado na região onde você vai executar os jobs do Spark, já que os drivers e executores do Spark têm apenas IPs particulares. Execute o comando a seguir para ativá-lo na sub-rede default
.
gcloud compute networks subnets update default \ --region=${REGION} \ --enable-private-ip-google-access
Use o comando a seguir para verificar se o Acesso privado do Google está ativado, o que vai gerar True
ou False
.
gcloud compute networks subnets describe default \ --region=${REGION} \ --format="get(privateIpGoogleAccess)"
Criar um bucket de armazenamento
Crie um bucket de armazenamento que será usado para armazenar os recursos criados neste codelab.
Escolha um nome para o bucket. Os nomes de bucket precisam ser globalmente exclusivos para todos os usuários.
export BUCKET=<your-bucket-name>
Crie o bucket na região em que você pretende executar os jobs do Spark.
gsutil mb -l ${REGION} gs://${BUCKET}
Confira se o bucket está disponível no console do Cloud Storage. Também é possível executar gsutil ls
para ver seu bucket.
Crie um servidor de histórico permanente
A IU do Spark oferece um conjunto avançado de ferramentas de depuração e insights sobre jobs do Spark. Para visualizar a interface do Spark dos jobs concluídos do Dataproc sem servidor, crie um cluster do Dataproc de nó único para usar como servidor de histórico permanente.
Defina um nome para o servidor do histórico persistente.
PHS_CLUSTER_NAME=my-phs
Execute o comando a seguir.
gcloud dataproc clusters create ${PHS_CLUSTER_NAME} \ --region=${REGION} \ --single-node \ --enable-component-gateway \ --properties=spark:spark.history.fs.logDirectory=gs://${BUCKET}/phs/*/spark-job-history
A interface do Spark e o servidor de histórico persistente serão explorados com mais detalhes posteriormente no codelab.
3. Executar jobs do Spark sem servidor com lotes do Dataproc
Nesta amostra, você trabalhará com um conjunto de dados do conjunto de dados públicos de viagens da Cidade de Nova York (NYC). O NYC City Bikes é um sistema pago de compartilhamento de bicicletas em Nova York. Você vai realizar algumas transformações simples e imprimir os dez principais IDs de estações do Citrix Bike Este exemplo também usa o spark-bigquery-connector de código aberto para ler e gravar dados facilmente entre o Spark e o BigQuery.
Clone o seguinte repositório do GitHub e cd
no diretório que contém o arquivo citibike.py
.
git clone https://github.com/GoogleCloudPlatform/devrel-demos.git cd devrel-demos/data-analytics/next-2022-workshop/dataproc-serverless
citibike.py
import sys
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import BooleanType
if len(sys.argv) == 1:
print("Please provide a GCS bucket name.")
bucket = sys.argv[1]
table = "bigquery-public-data:new_york_citibike.citibike_trips"
spark = SparkSession.builder \
.appName("pyspark-example") \
.config("spark.jars","gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jar") \
.getOrCreate()
df = spark.read.format("bigquery").load(table)
top_ten = df.filter(col("start_station_id") \
.isNotNull()) \
.groupBy("start_station_id") \
.count() \
.orderBy("count", ascending=False) \
.limit(10) \
.cache()
top_ten.show()
top_ten.write.option("header", True).csv(f"gs://{bucket}/citibikes_top_ten_start_station_ids")
Envie o job para o Spark sem servidor usando o SDK Cloud, disponível por padrão no Cloud Shell. Execute o seguinte comando no seu shell, que utiliza o SDK Cloud e a API Dataproc Batches para enviar jobs do Spark sem servidor.
gcloud dataproc batches submit pyspark citibike.py \ --batch=citibike-job \ --region=${REGION} \ --deps-bucket=gs://${BUCKET} \ --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jar \ --history-server-cluster=projects/${GOOGLE_CLOUD_PROJECT}/regions/${REGION}/clusters/${PHS_CLUSTER_NAME} \ -- ${BUCKET}
Para detalhar isso:
gcloud dataproc batches submit
faz referência à API Dataproc Batches.pyspark
indica que você está enviando um job do PySpark.--batch
é o nome do job. Se não for informado, um UUID gerado aleatoriamente será usado.--region=${REGION}
é a região geográfica em que o job será processado.--deps-bucket=${BUCKET}
é o local para onde o arquivo Python local é enviado antes de ser executado no ambiente sem servidor.--jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jar
inclui o jar para spark-bigquery-connector no ambiente de execução do Spark.--history-server-cluster=projects/${GOOGLE_CLOUD_PROJECT}/regions/${REGION}/clusters/${PHS_CLUSTER}
é o nome totalmente qualificado do servidor de histórico persistente. É aqui que os dados de eventos do Spark (separados da saída do console) são armazenados e mostrados na interface do Spark.- O
--
à direita indica que o que for além disso será args de tempo de execução para o programa. Nesse caso, você está enviando o nome do bucket, conforme exigido pelo job.
A saída a seguir vai aparecer quando o lote for enviado.
Batch [citibike-job] submitted.
Depois de alguns minutos, você verá a saída a seguir junto com os metadados do job.
+----------------+------+ |start_station_id| count| +----------------+------+ | 519|551078| | 497|423334| | 435|403795| | 426|384116| | 293|372255| | 402|367194| | 285|344546| | 490|330378| | 151|318700| | 477|311403| +----------------+------+ Batch [citibike-job] finished.
Na próxima seção, você vai aprender a localizar os registros deste job.
Outros recursos
Com o Spark Serverless, você tem mais opções para executar seus jobs.
- É possível criar uma imagem do Docker personalizada em que o job será executado. Essa é uma ótima maneira de incluir outras dependências, incluindo bibliotecas Python e R.
- Conecte uma instância do Metastore do Dataproc ao seu job para acessar os metadados do Hive.
- Para ter mais controle, o Dataproc sem servidor é compatível com a configuração de um pequeno conjunto de propriedades do Spark.
4. Métricas e observabilidade do Dataproc
O Console de lotes do Dataproc lista todos os jobs do Dataproc sem servidor. No console, você verá ID do lote, local, status, Horário de criação, Tempo decorrido e Tipo de cada job. Clique no ID do lote do seu job para ver mais informações sobre ele.
Nessa página, você verá informações como o Monitoramento, que mostra quantos Executores de lote do Spark o job usou ao longo do tempo (indicando o quanto ele foi escalonado automaticamente).
Na guia Detalhes, você verá mais metadados sobre o job, incluindo argumentos e parâmetros que foram enviados com ele.
Também é possível acessar todos os registros nessa página. Quando jobs do Dataproc sem servidor são executados, três conjuntos diferentes de registros são gerados:
- Nível de serviço
- Saída do console
- Geração de registros de eventos do Spark
Nível de serviço, inclui registros gerados pelo serviço Dataproc sem servidor. Isso inclui, por exemplo, o Dataproc sem servidor solicitar CPUs extras para escalonamento automático. Clique em Ver registros para abrir o Cloud Logging.
A saída do console pode ser vista em Saída. Essa é a saída gerada pelo trabalho, incluindo metadados que o Spark imprime ao iniciar um trabalho ou quaisquer instruções de impressão incorporadas a ele.
A geração de registros de eventos do Spark pode ser acessada na IU do Spark. Como você forneceu ao seu job do Spark um servidor de histórico persistente, você pode acessar a UI do Spark clicando em Exibir servidor de histórico do Spark, que contém informações sobre os jobs do Spark executados anteriormente. Saiba mais sobre a interface do Spark na documentação oficial do Spark.
5. Modelos do Dataproc: BQ -> GCS
Os modelos do Dataproc são ferramentas de código aberto que ajudam a simplificar ainda mais as tarefas de processamento de dados na nuvem. Eles servem como um wrapper para o Dataproc sem servidor e incluem modelos para muitas tarefas de importação e exportação de dados, incluindo:
BigQuerytoGCS
eGCStoBigQuery
GCStoBigTable
GCStoJDBC
eJDBCtoGCS
HivetoBigQuery
MongotoGCS
eGCStoMongo
A lista completa está disponível em README.
Nesta seção, você vai usar modelos do Dataproc para exportar dados do BigQuery para o GCS.
Clone o repositório
Clone o repositório e mude para a pasta python
.
git clone https://github.com/GoogleCloudPlatform/dataproc-templates.git cd dataproc-templates/python
Configurar o ambiente
Agora você definirá as variáveis de ambiente. Os modelos do Dataproc usam a variável de ambiente GCP_PROJECT
como ID do projeto. Portanto, defina-o como GOOGLE_CLOUD_PROJECT.
.
export GCP_PROJECT=${GOOGLE_CLOUD_PROJECT}
Defina a região no ambiente anterior. Caso contrário, defina-a aqui.
export REGION=<region>
Os modelos do Dataproc usam o spark-bigquery-conector para processar jobs do BigQuery e exigem que o URI seja incluído em uma variável de ambiente JARS
. Defina a variável JARS
.
export JARS="gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jar"
Configurar parâmetros do modelo
Defina o nome de um bucket de preparo que será usado pelo serviço.
export GCS_STAGING_LOCATION=gs://${BUCKET}
Em seguida, você definirá algumas variáveis específicas do job. Para a tabela de entrada, você fará referência novamente ao conjunto de dados do BigQuery NYC NYC.
BIGQUERY_GCS_INPUT_TABLE=bigquery-public-data.new_york_citibike.citibike_trips
É possível escolher csv
, parquet
, avro
ou json
. Para este codelab, escolha CSV. Na próxima seção, saiba como usar modelos do Dataproc para converter tipos de arquivo.
BIGQUERY_GCS_OUTPUT_FORMAT=csv
Defina o modo de saída como overwrite
. É possível escolher entre overwrite
, append
, ignore
ou errorifexists.
BIGQUERY_GCS_OUTPUT_MODE=overwrite
Defina o local de saída do GCS como um caminho no bucket.
BIGQUERY_GCS_OUTPUT_LOCATION=gs://${BUCKET}/BQtoGCS
Executar o modelo
Execute o modelo BIGQUERYTOGCS
especificando-o abaixo e fornecendo os parâmetros de entrada definidos.
./bin/start.sh \ -- --template=BIGQUERYTOGCS \ --bigquery.gcs.input.table=${BIGQUERY_GCS_INPUT_TABLE} \ --bigquery.gcs.output.format=${BIGQUERY_GCS_OUTPUT_FORMAT} \ --bigquery.gcs.output.mode=${BIGQUERY_GCS_OUTPUT_MODE} \ --bigquery.gcs.output.location=${BIGQUERY_GCS_OUTPUT_LOCATION}
A saída terá bastante ruído, mas após cerca de um minuto você verá o seguinte.
Batch [5766411d6c78444cb5e80f305308d8f8] submitted. ... Batch [5766411d6c78444cb5e80f305308d8f8] finished.
Execute o comando a seguir para confirmar se os arquivos foram gerados.
gsutil ls ${BIGQUERY_GCS_OUTPUT_LOCATION}
Por padrão, o Spark grava em vários arquivos, dependendo da quantidade de dados. Nesse caso, serão exibidos aproximadamente 30 arquivos gerados. Os nomes dos arquivos de saída do Spark são formatados com part
- seguido por um número de cinco dígitos (indicando o número da peça) e uma string de hash. Para grandes quantidades de dados, o Spark normalmente grava em vários arquivos. Um exemplo de nome de arquivo é part-00000-cbf69737-867d-41cc-8a33-6521a725f7a0-c000.csv
.
6. Modelos do Dataproc: CSV para parquet
Agora você vai usar os modelos do Dataproc para converter dados no GCS de um tipo de arquivo para outro usando o GCSTOGCS. Esse modelo usa o SparkSQL e oferece a opção de enviar uma consulta SparkSQL a ser processada durante a transformação para processamento adicional.
Confirmar variáveis de ambiente
Confirme se GCP_PROJECT
, REGION
e GCS_STAGING_BUCKET
estão definidos na seção anterior.
echo ${GCP_PROJECT} echo ${REGION} echo ${GCS_STAGING_LOCATION}
Definir parâmetros do modelo
Agora você vai definir os parâmetros de configuração do GCStoGCS
. Comece pelo local dos arquivos de entrada. Esse é um diretório, e não um arquivo específico, porque todos os arquivos dele serão processados. Defina como BIGQUERY_GCS_OUTPUT_LOCATION
.
GCS_TO_GCS_INPUT_LOCATION=${BIGQUERY_GCS_OUTPUT_LOCATION}
Define o formato do arquivo de entrada.
GCS_TO_GCS_INPUT_FORMAT=csv
Defina o formato de saída desejado. As opções são parquet, json, avro ou csv.
GCS_TO_GCS_OUTPUT_FORMAT=parquet
Defina o modo de saída como overwrite
. É possível escolher entre overwrite
, append
, ignore
ou errorifexists.
GCS_TO_GCS_OUTPUT_MODE=overwrite
Define o local de saída.
GCS_TO_GCS_OUTPUT_LOCATION=gs://${BUCKET}/GCStoGCS
Executar o modelo
Execute o modelo GCStoGCS
.
./bin/start.sh \ -- --template=GCSTOGCS \ --gcs.to.gcs.input.location=${GCS_TO_GCS_INPUT_LOCATION} \ --gcs.to.gcs.input.format=${GCS_TO_GCS_INPUT_FORMAT} \ --gcs.to.gcs.output.format=${GCS_TO_GCS_OUTPUT_FORMAT} \ --gcs.to.gcs.output.mode=${GCS_TO_GCS_OUTPUT_MODE} \ --gcs.to.gcs.output.location=${GCS_TO_GCS_OUTPUT_LOCATION}
A saída será um pouco com ruído, mas, após cerca de um minuto, você vai ver uma mensagem de sucesso como abaixo.
Batch [c198787ba8e94abc87e2a0778c05ec8a] submitted. ... Batch [c198787ba8e94abc87e2a0778c05ec8a] finished.
Execute o comando a seguir para confirmar se os arquivos foram gerados.
gsutil ls ${GCS_TO_GCS_OUTPUT_LOCATION}
Com esse modelo, também é possível fornecer consultas SparkSQL transmitindo gcs.to.gcs.temp.view.name
e gcs.to.gcs.sql.query
para o modelo. Isso permite que uma consulta SparkSQL seja executada nos dados antes de gravar no GCS.
7. Limpar recursos
Siga as etapas abaixo para evitar cobranças desnecessárias na sua conta do GCP após a conclusão deste codelab:
- Exclua o bucket do Cloud Storage do ambiente que você criou.
gsutil rm -r gs://${BUCKET}
- Exclua o cluster do Dataproc usado no servidor de histórico permanente.
gcloud dataproc clusters delete ${PHS_CLUSTER_NAME} \ --region=${REGION}
- Exclua os jobs do Dataproc sem servidor. Vá para o Console de lotes, clique na caixa ao lado de cada job que gostaria de excluir e clique em EXCLUIR.
Se você criou um projeto apenas para este codelab, também é possível excluí-lo:
- No console do GCP, acesse a página Projetos.
- Na lista de projetos, selecione o projeto que você quer excluir e clique em "Excluir".
- Na caixa, digite o ID do projeto e clique em "Encerrar" para excluí-lo.
8. A seguir
Nos recursos a seguir, você encontra mais maneiras de aproveitar o Spark sem servidor:
- Saiba como orquestrar fluxos de trabalho do Dataproc sem servidor usando o Cloud Composer.
- Saiba como integrar o Dataproc sem servidor com os pipelines do Kubeflow.