1. Introdução: Google Dataproc
O Dataproc é um serviço totalmente gerenciado e altamente escalonável para executar o Apache Spark, o Apache Flink, o Presto e muitas outras ferramentas e frameworks de código aberto. Use o Dataproc para modernização de data lakes, ETL / ELT e ciência de dados segura em escala global. O Dataproc também está totalmente integrado a vários serviços do Google Cloud, incluindo BigQuery, Cloud Storage, Vertex AI e Dataplex.
O Dataproc está disponível em três tipos:
- O Dataproc sem servidor permite que você execute jobs do PySpark sem precisar configurar a infraestrutura e o escalonamento automático. O Dataproc sem servidor oferece suporte a cargas de trabalho e sessões / notebooks do PySpark.
- O Dataproc no Google Compute Engine permite gerenciar um cluster Hadoop YARN para cargas de trabalho do Spark baseadas em YARN, além de ferramentas de código aberto, como Flink e Presto. É possível personalizar clusters baseados na nuvem com o escalonamento vertical ou horizontal que você quiser, incluindo escalonamento automático.
- O Dataproc no Google Kubernetes Engine permite configurar clusters virtuais do Dataproc na sua infraestrutura do GKE para enviar jobs do Spark, PySpark, SparkR ou Spark SQL.
2. Crie um cluster do Dataproc em uma VPC do Google Cloud
Nesta etapa, você vai criar um cluster do Dataproc no Google Cloud usando o console do Google Cloud.
Para começar, ative a API de serviço Dataproc no console. Depois de ativar, pesquise "Dataproc" na barra de pesquisa e clique em Criar cluster.
Selecione Cluster no Compute Engine para usar as VMs do Google Compute Engine(GCE) como a infraestrutura subjacente para executar clusters do Dataproc.
Agora você está na página de criação de clusters.
Nesta página:
- Forneça um nome exclusivo para o cluster.
- Selecione a região específica. Você também pode selecionar uma zona, mas o Dataproc permite escolher uma automaticamente para você. Neste codelab, selecione "us-central1". e "us-central1-c".
- Selecione a opção "Padrão" tipo de cluster. Isso garante que haja um nó mestre.
- Na guia Configurar nós, confirme se o número de workers criados será dois.
- Na seção Personalizar cluster, marque a caixa ao lado de Ativar Gateway de Componentes. Isso permite o acesso às interfaces da Web no cluster, incluindo a UI do Spark, o gerenciador de nós do Yarn e os notebooks do Jupyter.
- Em Componentes opcionais, selecione Notebook do Jupyter. Isso configura o cluster com um servidor de notebooks do Jupyter.
- Deixe tudo como está e clique em Criar cluster.
Isso ativará um cluster do Dataproc.
3. Iniciar o cluster e usar SSH nele
Quando o status do cluster mudar para Em execução, clique no nome do cluster no console do Dataproc.
Clique na guia Instância de VM para ver o nó mestre e os dois nós de trabalho do cluster.
Clique em SSH ao lado do nó mestre para fazer login nele.
Execute os comandos do hdfs para ver a estrutura do diretório.
hadoop_commands_example
sudo hadoop fs -ls /
sudo hadoop version
sudo hadoop fs -mkdir /test51
sudo hadoop fs -ls /
4. Interfaces da Web e gateways de componentes
No console do cluster do Dataproc, clique no nome do seu cluster e depois na guia INTERFACES DA WEB.
Ela mostra as interfaces da Web disponíveis, incluindo o Jupyter. Clique em Jupyter para abrir um notebook do Jupyter. É possível usá-lo para criar, no PySpark, notebooks armazenados no GCS. para armazenar seu notebook no Google Cloud Storage e abrir um notebook PySpark para usar neste codelab.
5. Monitorar e observar jobs do Spark
Com o cluster do Dataproc em execução, crie um job em lote do PySpark e envie-o para o cluster do Dataproc.
Crie um bucket do Google Cloud Storage (GCS) para armazenar o script PySpark. Crie o bucket na mesma região do cluster do Dataproc.
Agora que o bucket do GCS foi criado, copie o arquivo a seguir para ele.
https://raw.githubusercontent.com/diptimanr/spark-on-gce/main/test-spark-1.py
Esse script cria um DataFrame de amostra do Spark e o grava como uma tabela do Hive.
hive_job.py
from pyspark.sql import SparkSession
from datetime import datetime, date
from pyspark.sql import Row
spark = SparkSession.builder.master("local").enableHiveSupport().getOrCreate()
df = spark.createDataFrame([ (1, 2., 'string1', date(2000, 1, 1), datetime(2000, 1, 1, 12, 0)),
(2, 3., 'string2', date(2000, 2, 1), datetime(2000, 1, 2, 12, 0)), (3, 4., 'string3', date(2000, 3, 1), datetime(2000, 1, 3, 12, 0))
], schema='a long, b double, c string, d date, e timestamp')
print("..... Writing data .....")
df.write.mode("overwrite").saveAsTable("test_table_1")
print("..... Complete .....")
Envie esse script como um job em lote do Spark no Dataproc. Clique em Jobs no menu de navegação à esquerda e depois em Enviar job.
Forneça um ID do job e uma região. Selecione seu cluster e forneça o local do GCS do script Spark que você copiou. Esse job será executado como um job em lote do Spark no Dataproc.
Em Propriedades, adicione a chave spark.submit.deployMode
e o valor client
para garantir que o driver seja executado no nó mestre do Dataproc e não nos nós de trabalho. Clique em Enviar para encaminhar o job em lote ao Dataproc.
O script do Spark criará um DataFrame e gravará em uma tabela do Hive test_table_1
.
Depois que o job for executado, as instruções de exibição do console vão aparecer na guia Monitoramento.
Agora que a tabela Hive foi criada, envie outro job de consulta do Hive para selecionar o conteúdo da tabela e exibi-la no console.
Crie outro job com as seguintes propriedades:
Observe que Job Type está definido como Hive e o tipo de origem da consulta é Query Text, o que significa que vamos escrever toda a instrução do HiveQL na caixa de texto Query Text.
Envie o job, mantendo o restante dos parâmetros como padrão.
Observe como o HiveQL seleciona todos os registros e exibe-o no console.
6. Escalonamento automático
O escalonamento automático é a tarefa de estimar o valor "certo" número de nós de trabalho do cluster para uma carga de trabalho.
A API Dataproc autoscalingPolicies oferece um mecanismo para automatizar o gerenciamento de recursos do cluster e permite o escalonamento automático de VM de worker do cluster. Uma política de escalonamento automático é uma configuração reutilizável que descreve o escalonamento dos workers do cluster que usam essa política. Ela define os limites, a frequência e a intensidade do escalonamento para fornecer controle detalhado sobre os recursos do cluster durante todo o ciclo de vida dele.
As políticas de escalonamento automático do Dataproc são escritas usando arquivos YAML, que são transmitidos no comando da CLI para criar o cluster ou selecionados de um bucket do GCS quando um cluster é criado no console do Cloud.
Confira um exemplo de política de escalonamento automático do Dataproc :
policy.yaml
workerConfig:
minInstances: 10
maxInstances: 10
secondaryWorkerConfig:
maxInstances: 50
basicAlgorithm:
cooldownPeriod: 4m
yarnConfig:
scaleUpFactor: 0.05
scaleDownFactor: 1.0
gracefulDecommissionTimeout: 1h
7. Configurar componentes opcionais do Dataproc
Isso ativará um cluster do Dataproc.
Quando você cria um cluster do Dataproc, os componentes padrão do ecossistema Apache Hadoop são instalados automaticamente. Consulte a Lista de versões do Dataproc. Ao criá-lo, você pode instalar outros componentes, chamados componentes opcionais.
Ao criar o cluster do Dataproc no console, ativamos componentes opcionais e selecionamos Notebook do Jupyter como o componente opcional.
8. Limpar recursos
Para limpar o cluster, clique em Parar depois de selecioná-lo no console do Dataproc. Quando o cluster for interrompido, clique em Excluir para excluí-lo.
Após a exclusão do cluster do Dataproc, exclua os buckets do GCS em que o código foi copiado.
Para limpar os recursos e interromper cobranças indesejadas, o cluster do Dataproc precisa ser interrompido primeiro e depois excluído.
Antes de parar e excluir o cluster, garanta que todos os dados gravados no armazenamento HDFS sejam copiados para o GCS para armazenamento durável.
Para interromper o cluster, clique em Parar.
Quando o cluster for interrompido, clique em Excluir para excluí-lo.
Na caixa de diálogo de confirmação, clique em Excluir para excluir o cluster.