1. Introdução

Última atualização:10/06/2024
Contexto
Os dispositivos da Internet das Coisas (IoT), que vão de soluções para casas inteligentes a sensores industriais, geram grandes quantidades de dados na borda da rede. Esses dados são muito úteis para vários casos de uso, como monitoramento, rastreamento, diagnóstico, vigilância, personalização, otimização de frota e muito mais. O Google Managed Service para Apache Kafka oferece uma maneira escalonável e durável de ingerir e armazenar esse fluxo contínuo de dados de maneira compatível com OSS, fácil de usar e segura. Já o Google Cloud Dataproc permite o processamento desses grandes conjuntos de dados para análise de dados usando clusters do Apache Spark e do Hadoop.
O que você vai criar
Neste codelab, você vai criar um pipeline de tratamento de dados de IoT usando o Google Serviço Gerenciado para Apache Kafka, o Dataproc, o Python e o Apache Spark, que faz análises em tempo real. O pipeline vai:
- Publicar dados de dispositivos IoT em um cluster gerenciado do Kafka usando VMs do GCE
- Transmitir dados do cluster do Kafka gerenciado para um cluster do Dataproc
- Processar dados usando um job do Spark Streaming do Dataproc
O que você vai aprender
- Como criar clusters do Google Managed Kafka e do Dataproc
- Como executar jobs de streaming usando o Dataproc
O que é necessário
- Uma conta ativa do GCP com um projeto configurado. Se você não tiver uma, inscreva-se para um teste sem custo financeiro.
- A CLI gcloud instalada e configurada. Siga as instruções para instalar a CLI gcloud no seu SO.
- APIs ativadas para Kafka gerenciado pelo Google e Dataproc no projeto do GCP.
2. Visão geral
Neste codelab, vamos acompanhar a história de uma empresa fictícia, a DemoIOT Solutions. A DemoIOT Solutions oferece dispositivos de sensor que medem e transmitem dados de temperatura, umidade, pressão, nível de luz e localização. Eles querem configurar pipelines que processem esses dados para mostrar estatísticas em tempo real aos clientes. Com esses pipelines, eles podem oferecer uma ampla variedade de serviços aos clientes, como monitoramento, sugestões automatizadas, alertas e insights sobre os locais em que os clientes instalaram os sensores.
Para isso, vamos usar uma VM do GCE para simular o dispositivo IoT. O dispositivo vai publicar dados em um tópico do Kafka no cluster do Kafka gerenciado do Google, que será lido e processado por um job de streaming do Dataproc. A configuração de pré-requisitos e as páginas a seguir vão orientar você em todas essas etapas.
Configuração de pré-requisito
- Encontre o nome e o número do seu projeto. Consulte Encontrar o nome, o número e o ID do projeto para referência.
- Sub-rede VPC. Isso permite a conectividade entre a VM do GCE, o cluster do Kafka e o cluster do Dataproc. Siga estas instruções para listar as sub-redes atuais usando a CLI gcloud. Se necessário, siga as etapas para criar uma rede VPC de modo automático, que vai criar uma rede VPC com sub-rede em cada região do Google Cloud. No entanto, para fins deste codelab, vamos usar uma sub-rede de uma única região.
- Nessa sub-rede, verifique se há uma regra de firewall que permita toda a entrada de tcp:22, que é o SSH necessário. Essa regra estará disponível para seleção na seção "Regras de firewall" ao criar uma rede. Portanto, selecione-a.
- Bucket do GCS. Você precisa de acesso a um bucket do Cloud Storage para armazenar recursos de jobs do Dataproc e manter os dados processados. Se você não tiver uma, crie uma no seu projeto do GCP.
Preencher variáveis de ambiente
No terminal em que você executa a CLI gcloud, preencha essas variáveis de ambiente para que possam ser usadas mais tarde.
export PROJECT_ID=<project-id>
export PROJECT_NUMBER=<project-number>
export REGION=<region>
export ZONE=<zone>
export SUBNET_PATH=<subnet-path>
export BUCKET=gs://<bucket-name>
Substitua:
<project-id>com o nome do projeto do GCP que você configurou.<project-number>com o nome do número do projeto da etapa 1 dos pré-requisitos.<region>com o nome de uma região em Regiões e zonas disponíveis que você quer usar. Por exemplo, podemos usarus-central1.<zone>com o nome da zona em Regiões e zonas disponíveis na região selecionada anteriormente. Por exemplo, se você selecionouus-central1como a região, pode usarus-central1-fcomo a zona. Essa zona será usada para criar a VM do GCE que simula dispositivos IoT. Verifique se a zona está na região escolhida.<subnet-path>com o caminho completo da sub-rede da etapa 2 dos pré-requisitos. O valor precisa estar no formato:projects/<project-id>/regions/<region>/subnetworks/<subnet-name>.<bucket-name>com o nome do bucket do GCS da etapa 3 dos pré-requisitos.
3. Configurar o Kafka gerenciado pelo Google
Esta seção configura um cluster do Kafka gerenciado pelo Google, que implanta o servidor do Kafka e cria um tópico nele em que os dados de IoT podem ser publicados e lidos depois de se inscreverem nele. A DemoIOT Solutions pode configurar esse cluster para que todos os dispositivos publiquem dados nele.
Criar um cluster do Kafka gerenciado
- Crie o cluster do Kafka gerenciado. Aqui, o nome do cluster é
kafka-iot.
gcloud managed-kafka clusters create kafka-iot \
--project=$PROJECT_ID \
--location=$REGION \
--cpu=3 \
--memory=12GiB \
--subnets=$SUBNET_PATH \
--auto-rebalance
Você receberá uma resposta semelhante a esta:
Create request issued for: [kafka-iot]
Waiting for operation [projects/<project-id>/locations/<region>/operations/<operation-id>] to complete...done.
Created cluster [kafka-iot].
A criação do cluster leva de 20 a 30 minutos. Aguarde a conclusão dessa operação.
Criar um tópico
- Crie um tópico gerenciado do Kafka no cluster. Aqui, o nome do tópico é
kafka-iot-topic.
gcloud managed-kafka topics create kafka-iot-topic \
--cluster=kafka-iot \
--location=$REGION \
--partitions=10 \
--replication-factor=3
Você vai receber uma saída semelhante a esta:
Created topic [kafka-iot-topic].
4. Configurar um publisher
Para publicar no cluster do Kafka gerenciado, configuramos uma instância de VM do Google Compute Engine que pode acessar a VPC que contém a sub-rede usada pelo cluster do Kafka gerenciado. Essa VM simula os dispositivos de sensor fornecidos pela DemoIOT Solutions.
Etapas
- Crie a instância de VM do Google Compute Engine. Aqui, o nome da VM do GCE é
publisher-instance.
gcloud compute instances create publisher-instance \
--scopes=https://www.googleapis.com/auth/cloud-platform \
--subnet=$SUBNET_PATH \
--zone=$ZONE
- Conceda à conta de serviço padrão do Google Compute Engine as permissões para usar o serviço gerenciado para Apache Kafka.
gcloud projects add-iam-policy-binding $PROJECT_ID \
--member="serviceAccount:$PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
--role=roles/managedkafka.client
- Use SSH para se conectar à VM. Como alternativa, use o Console do Google Cloud para SSH.
gcloud compute ssh publisher-instance --project=$PROJECT_ID --zone=$ZONE
- Instale o Java para executar as ferramentas de linha de comando do Kafka e faça o download do binário do Kafka usando estes comandos.
sudo apt-get install default-jre wget
wget -O kafka_2.13-3.7.2.tgz https://downloads.apache.org/kafka/3.7.2/kafka_2.13-3.7.2.tgz
tar xfz kafka_2.13-3.7.2.tgz
export KAFKA_HOME=$(pwd)/kafka_2.13-3.7.2
export PATH=$PATH:$KAFKA_HOME/bin
- Faça o download da biblioteca de autenticação do Kafka gerenciado e das dependências dela e configure as propriedades do cliente Kafka.
wget https://github.com/googleapis/managedkafka/releases/download/v1.0.5/release-and-dependencies.zip
sudo apt-get install unzip
unzip -n release-and-dependencies.zip -d $KAFKA_HOME/libs/
find "$KAFKA_HOME/libs/release-and-dependencies" -type f -name "*.jar" -exec cp -n {} "$KAFKA_HOME/libs/" \;
cat <<EOF> client.properties
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
sasl.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
EOF
Para mais detalhes sobre a configuração da máquina do editor, consulte Configurar uma máquina cliente.
5. Publicar no Kafka gerenciado
Agora que o editor está configurado, podemos usar a linha de comando do Kafka nele para publicar alguns dados fictícios da VM do GCE (simulando dispositivos IoT da DemoIOT Solutions) no cluster gerenciado do Kafka.
- Como nos conectamos à instância de VM do GCE por SSH, precisamos preencher novamente a variável
PROJECT_ID:
export PROJECT_ID=<project-id>
export REGION=<region>
Substitua:
<project-id>com o nome do projeto do GCP que você configurou.<region>com a região em que o cluster do Kafka foi criado
- Use o comando
managed-kafka clusters describepara receber o endereço IP do servidor de bootstrap do Kafka. Esse endereço pode ser usado para se conectar ao cluster do Kafka.
export BOOTSTRAP=$(gcloud managed-kafka clusters describe kafka-iot --project=$PROJECT_ID --location=$REGION --format='value(bootstrapAddress)')
- Liste os tópicos no cluster:
kafka-topics.sh --list \
--bootstrap-server $BOOTSTRAP \
--command-config client.properties
Você vai encontrar a seguinte saída, que contém o tópico kafka-iot-topic criado anteriormente.
__remote_log_metadata
kafka-iot-topic
- Copie e cole este script em um novo arquivo
publish_iot_data.sh. Para criar um arquivo na VM do GCE, use uma ferramenta comovimounano.
vi publish_iot_data.sh
# OR (use any one command)
nano publish_iot_data.sh
#!/bin/bash
NUM_MESSAGES_PER_DEVICE=20
NUM_IDS=10
start_time=$(date -d "2025-03-01T00:00:00Z" +%s)
message_interval=10
generate_sensor_data() {
local temperature=$(printf "%.1f" $(echo "scale=1; 20 + $RANDOM % 100 / 10.0" | bc -l))
local humidity=$(printf "%.1f" $(echo "scale=1; 50 + $RANDOM % 500 / 10.0" | bc -l))
local pressure=$(printf "%.1f" $(echo "scale=1; 1000 + $RANDOM % 500 / 10.0" | bc -l))
local light_level=$((RANDOM % 1000))
echo "\"temperature\": $temperature,"
echo "\"humidity\": $humidity,"
echo "\"pressure\": $pressure,"
echo "\"light_level\": $light_level"
}
generate_location_data() {
local latitude=$(printf "%.4f" $(echo "scale=2; 33.0 + $RANDOM % 66" | bc -l))
local longitude=$(printf "%.4f" $(echo "scale=2; -120.0 + $RANDOM % 66" | bc -l))
echo "\"latitude\": $latitude,"
echo "\"longitude\": $longitude"
}
generate_device_status() {
local battery_level=$((RANDOM % 101))
local signal_strength=$((RANDOM % 80 - 100))
local connection_types=("Wi-Fi" "Cellular" "LoRaWAN")
local connection_type="${connection_types[$((RANDOM % ${#connection_types[@]}))]}"
echo "\"battery_level\": $battery_level,"
echo "\"signal_strength\": $signal_strength,"
echo "\"connection_type\": \"$connection_type\""
}
publish_to_kafka() {
local device_index=$1
local message_index=$2
local device_id="sensor-$((device_index % NUM_IDS))"
local timestamp=$((start_time + (message_index * message_interval)))
local date=$(date -u -d "@$timestamp" +"%Y-%m-%dT%H:%M:%SZ")
local json_data=$(cat <<EOF
{
"device_id": "$device_id",
"timestamp": "$date",
"location": {
$(generate_location_data)
},
"sensor_data": {
$(generate_sensor_data)
},
"device_status": {
$(generate_device_status)
},
"metadata": {
"sensor_type": "environmental",
"unit_temperature": "Celsius",
"unit_humidity": "%" ,
"unit_pressure": "hPa",
"unit_light_level": "lux",
"firmware_version": "$((RANDOM % 3 +1)).$((RANDOM % 10)).$((RANDOM % 10))"
}
}
EOF
)
echo $json_data | jq -rc
}
for message_index in $(seq 0 $((NUM_MESSAGES_PER_DEVICE - 1))); do
for device_index in $(seq 0 $((NUM_IDS - 1))); do
publish_to_kafka "$device_index" "$message_index"
done
done | kafka-console-producer.sh \
--topic kafka-iot-topic \
--bootstrap-server $1 \
--producer.config $2
Explicação
- Esse script cria mensagens JSON com leituras simuladas de sensores que têm ID do dispositivo, carimbo de data/hora, dados do sensor (temperatura, umidade, pressão, luz), informações de localização (latitude, longitude), status do dispositivo (bateria, sinal, tipo de conexão) e alguns metadados.
- Ele gera um fluxo contínuo de mensagens de um número definido de dispositivos exclusivos, cada um enviando dados em um intervalo de tempo especificado, imitando dispositivos de IoT do mundo real. Aqui, publicamos dados de 10 dispositivos que produzem 20 leituras cada, em um intervalo de tempo de 10 segundos.
- Ele também publica todos os dados gerados no tópico do Kafka usando a ferramenta de linha de comando do produtor do Kafka.
- Instale algumas dependências usadas pelo script: o pacote
bcpara cálculos matemáticos e o pacotejqpara processamento de JSON.
sudo apt-get install bc jq
- Modifique o script para que ele seja executável e execute-o. Ele leva cerca de 2 minutos para ser executado.
chmod +x publish_iot_data.sh
./publish_iot_data.sh $BOOTSTRAP client.properties
Para verificar se os eventos foram publicados com sucesso, execute este comando, que vai imprimir todos os eventos. Pressione <control-c> para sair.
kafka-console-consumer.sh \
--topic kafka-iot-topic \
--from-beginning \
--bootstrap-server $BOOTSTRAP \
--consumer.config client.properties
6. Configurar o cluster do Dataproc
Esta seção cria um cluster do Dataproc na sub-rede da VPC em que o cluster gerenciado do Kafka está presente. Esse cluster será usado para executar jobs que geram as estatísticas e os insights em tempo real necessários para a DemoIOT Solutions.
- Criar um cluster de Dataproc. Aqui, o nome do cluster é
dataproc-iot.
gcloud dataproc clusters create dataproc-iot \
--project=$PROJECT_ID \
--region=$REGION \
--image-version=2.2-debian12 \
--enable-component-gateway \
--subnet=$SUBNET_PATH \
--master-machine-type=n1-standard-4 \
--worker-machine-type=n1-standard-4 \
--num-workers=2 \
--properties=spark:spark.driver.memory=6G,spark:spark.driver.executor=4G
Você receberá uma resposta semelhante a esta:
Waiting on operation [projects/<project-id>/regions/<region>/operations/<operation-id>.
Waiting for cluster creation operation...done.
Created [https://dataproc.googleapis.com/v1/projects/<project-id>/regions/<region>/clusters/dataproc-iot] Cluster placed in zone [<zone>].
A criação do cluster pode levar de 10 a 15 minutos. Aguarde a conclusão da operação e verifique se o cluster está no estado RUNNING descrevendo-o.
gcloud dataproc clusters describe dataproc-iot \
--project=$PROJECT_ID \
--region=$REGION
7. Processar mensagens do Kafka usando o Dataproc
Nesta última seção, você vai enviar um job do Dataproc que processa as mensagens publicadas usando o Spark Streaming. Esse trabalho gera estatísticas e insights em tempo real que podem ser usados pela DemoIOT Solutions.
- Execute este comando para criar localmente o arquivo de job do PySpark de streaming chamado
process_iot.py.
cat > process_iot.py <<EOF
#!/bin/python
import sys
import configparser
from google.cloud import storage
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json, avg
from pyspark.sql.types import FloatType, IntegerType, StringType, StructField, StructType
from urllib.parse import urlparse
from pyspark.sql.utils import AnalysisException
JSON_SCHEMA = StructType([
StructField("device_id", StringType()),
StructField("timestamp", StringType()),
StructField(
"location",
StructType([
StructField("latitude", FloatType()),
StructField("longitude", FloatType()),
]),
),
StructField(
"sensor_data",
StructType([
StructField("temperature", FloatType()),
StructField("humidity", FloatType()),
StructField("pressure", FloatType()),
StructField("light_level", IntegerType()),
]),
),
StructField(
"device_status",
StructType([
StructField("battery_level", IntegerType()),
StructField("signal_strength", IntegerType()),
StructField("connection_type", StringType()),
]),
),
StructField(
"metadata",
StructType([
StructField("sensor_type", StringType()),
StructField("unit_temperature", StringType()),
StructField("unit_humidity", StringType()),
StructField("unit_pressure", StringType()),
StructField("unit_light_level", StringType()),
StructField("firmware_version", StringType()),
]),
),
])
CLIENT_PROPERTY_KEYS = [
"security.protocol",
"sasl.mechanism",
"sasl.login.callback.handler.class",
"sasl.jaas.config",
]
def get_client_properties(client_properties_path: str):
# Parse client properties file
parsed_path = urlparse(client_properties_path)
if parsed_path.scheme != "gs":
raise ValueError("Invalid GCS path for client properties. Must start with gs://.")
bucket_name = parsed_path.netloc
blob_name = parsed_path.path.lstrip("/")
client = storage.Client()
bucket = client.bucket(bucket_name)
blob = bucket.blob(blob_name)
file_content = "[DEFAULT]\n" + blob.download_as_text()
config = configparser.ConfigParser()
config.read_string(file_content)
client_properties = dict()
for key in CLIENT_PROPERTY_KEYS:
client_properties[key] = config.get("DEFAULT", key)
print(f"Client properties: {client_properties}")
return client_properties
def process_initial_data(spark, bootstrap_server_address, client_properties, queries_to_await, store_data_gcs_path):
print("Starting initial data processing...")
initial_rows = (
spark.readStream.format("kafka")
.option("kafka.bootstrap.servers", bootstrap_server_address)
.option("startingOffsets", "earliest")
.option("subscribe", "kafka-iot-topic")
.option("kafka.security.protocol", client_properties["security.protocol"])
.option("kafka.sasl.mechanism", client_properties["sasl.mechanism"])
.option("kafka.sasl.login.callback.handler.class", client_properties["sasl.login.callback.handler.class"])
.option("kafka.sasl.jaas.config", client_properties["sasl.jaas.config"])
.load()
)
initial_rows = (
initial_rows.selectExpr("CAST(value AS STRING)")
.select(from_json(col("value"), JSON_SCHEMA).alias("data"))
.select("data.*")
)
# Print first 20 rows
def print_top_rows(batch_df, batch_id):
if batch_df.count() > 0:
print(f"Processing initial batch for printing first 20 rows: batch {batch_id}, size: {batch_df.count()}")
batch_df.limit(20).show(truncate=False)
initial_query_print = initial_rows.writeStream \
.foreachBatch(print_top_rows) \
.trigger(once=True) \
.start()
queries_to_await.append(initial_query_print)
# Calculate and print average temperatures
def process_initial_avg_temp(batch_df, batch_id):
if batch_df.count() > 0:
print(f"Processing initial batch for avg temp: batch {batch_id}, size: {batch_df.count()}")
current_averages = (
batch_df.select("device_id", "sensor_data.temperature")
.groupBy("device_id")
.agg(avg("temperature").alias("average_temperature"))
.orderBy("device_id")
)
current_averages.show(truncate=False)
initial_query_avg_temp = initial_rows.writeStream \
.foreachBatch(process_initial_avg_temp) \
.trigger(once=True) \
.start()
queries_to_await.append(initial_query_avg_temp)
# Write data to GCS
initial_data_gcs_writer = (
initial_rows.writeStream.outputMode("append")
.format("avro")
.option("path", store_data_gcs_path+"/tables/iot_avro/")
.option("checkpointLocation", store_data_gcs_path+"/chkpt/avro/")
.trigger(once=True) \
.start()
)
queries_to_await.append(initial_data_gcs_writer)
def main(bootstrap_server_address, client_properties_path, store_data_gcs_path):
client_properties = get_client_properties(client_properties_path)
# Create SparkSession
spark = SparkSession.builder.appName("IotStreamingParser").getOrCreate()
queries_to_await = []
process_initial_data(spark, bootstrap_server_address, client_properties, queries_to_await, store_data_gcs_path)
# Wait for all queries to terminate
for query in queries_to_await:
try:
query.awaitTermination()
except Exception as e:
print(f"Error awaiting query: {e}")
finally:
query.stop()
spark.stop()
if __name__ == "__main__":
if len(sys.argv) < 4:
print("Invalid number of arguments passed ", len(sys.argv))
print(
"Usage: ",
sys.argv[0],
" <bootstrap_server_address> <client_properties_path> <gcs_path_to_store_data>",
)
raise ValueError("Invalid number of arguments passed.")
parsed_data_path = urlparse(sys.argv[3])
if parsed_data_path.scheme != "gs":
raise ValueError("Invalid GCS path for storing data. Must start with gs://.")
main(sys.argv[1], sys.argv[2], sys.argv[3])
EOF
Explicação
- Esse código configura um job de streaming estruturado do PySpark para ler dados de um tópico especificado do Kafka. Ele usa o endereço de bootstrap do servidor Kafka fornecido e as configurações do Kafka carregadas de um arquivo de configuração do GCS para se conectar e autenticar com o broker do Kafka.
- Primeiro, ele lê os dados brutos do Kafka como um fluxo de matrizes de bytes, converte essas matrizes em strings e aplica o
json_schemausando o StructType do Spark para especificar a estrutura dos dados (ID do dispositivo, carimbo de data/hora, local, dados do sensor etc.). - Ele imprime as 10 primeiras linhas no console para visualização, calcula a temperatura média por sensor e grava todos os dados no bucket do GCS no formato
avro. O Avro é um sistema de serialização de dados baseado em linhas que armazena dados estruturados de maneira eficiente em um formato binário compacto e definido por esquema, oferecendo evolução de esquema, neutralidade de linguagem e alta compactação para processamento de dados em grande escala.
- Crie o arquivo
client.propertiese preencha a variável de ambiente para o endereço de inicialização do servidor Kafka.
cat <<EOF> client.properties
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
sasl.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
EOF
export BOOTSTRAP=$(gcloud managed-kafka clusters describe kafka-iot --project=$PROJECT_ID --location=$REGION --format='value(bootstrapAddress)')
- Faça upload dos arquivos
process_iot.pyeclient.propertiespara seu bucket do Cloud Storage para que possam ser usados pelo job do Dataproc.
gsutil cp process_iot.py client.properties $BUCKET
- Copie alguns jars de dependência para o job do Dataproc no bucket do GCS. Esse diretório contém jars necessários para executar jobs do Spark Streaming com o Kafka, além da biblioteca de autenticação do Kafka gerenciado e das dependências dela, extraídas de Configurar uma máquina cliente.
gsutil -m cp -R gs://dataproc-codelabs/managed-kafka-dependencies/ $BUCKET
- Envie o job do Spark para o cluster do Dataproc.
gcloud dataproc jobs submit pyspark \
$BUCKET/process_iot.py \
--project=$PROJECT_ID \
--region=$REGION \
--cluster=dataproc-iot \
--properties=spark.jars=$BUCKET/managed-kafka-dependencies/* \
-- $BOOTSTRAP $BUCKET/client.properties $BUCKET
Os registros do driver do Spark serão impressos. Também é possível ver essas tabelas registradas no console e os dados armazenados no bucket do GCS.
25/06/11 05:16:51 INFO AppInfoParser: Kafka version: 3.7.1
25/06/11 05:16:51 INFO AppInfoParser: Kafka commitId: e2494e6ffb89f828
25/06/11 05:16:51 INFO AppInfoParser: Kafka startTimeMs: 1749619011472
Processing initial batch for avg temp: batch 0, size: 200
Processing initial batch for printing first 20 rows: batch 0, size: 200
+---------+--------------------+----------------+-------------------------+--------------------+--------------------------------------------+
|device_id|timestamp |location |sensor_data |device_status |metadata |
+---------+--------------------+----------------+-------------------------+--------------------+--------------------------------------------+
|sensor-6 |2025-03-01T00:03:00Z|{33.42, -119.66}|{20.9, 50.7, 1003.2, 525}|{70, -41, Wi-Fi} |{environmental, Celsius, %, hPa, lux, 3.6.0}|
|sensor-7 |2025-03-01T00:03:00Z|{33.32, -119.4} |{20.1, 51.0, 1000.9, 611}|{47, -98, LoRaWAN} |{environmental, Celsius, %, hPa, lux, 2.4.5}|
|sensor-8 |2025-03-01T00:03:00Z|{33.46, -119.74}|{20.8, 54.3, 1001.3, 734}|{70, -45, LoRaWAN} |{environmental, Celsius, %, hPa, lux, 1.1.6}|
|sensor-9 |2025-03-01T00:03:00Z|{33.04, -119.38}|{20.9, 54.5, 1002.9, 956}|{91, -99, Cellular} |{environmental, Celsius, %, hPa, lux, 2.7.4}|
|sensor-0 |2025-03-01T00:03:10Z|{33.22, -119.56}|{20.4, 53.0, 1000.2, 239}|{41, -95, LoRaWAN} |{environmental, Celsius, %, hPa, lux, 2.1.3}|
|sensor-1 |2025-03-01T00:03:10Z|{33.62, -119.4} |{20.0, 53.8, 1000.2, 907}|{97, -84, Wi-Fi} |{environmental, Celsius, %, hPa, lux, 3.3.1}|
|sensor-2 |2025-03-01T00:03:10Z|{33.56, -119.66}|{20.1, 51.6, 1004.8, 824}|{37, -36, Wi-Fi} |{environmental, Celsius, %, hPa, lux, 2.5.1}|
|sensor-3 |2025-03-01T00:03:10Z|{33.32, -120.0} |{20.6, 52.5, 1004.1, 557}|{96, -70, Cellular} |{environmental, Celsius, %, hPa, lux, 2.9.4}|
|sensor-4 |2025-03-01T00:03:10Z|{33.48, -119.36}|{20.2, 53.7, 1002.8, 818}|{22, -76, LoRaWAN} |{environmental, Celsius, %, hPa, lux, 1.1.2}|
|sensor-5 |2025-03-01T00:03:10Z|{33.48, -119.42}|{20.7, 51.5, 1002.7, 310}|{92, -44, Wi-Fi} |{environmental, Celsius, %, hPa, lux, 3.5.9}|
|sensor-6 |2025-03-01T00:03:10Z|{33.02, -119.8} |{20.4, 50.4, 1001.0, 190}|{36, -90, Cellular} |{environmental, Celsius, %, hPa, lux, 1.4.5}|
|sensor-7 |2025-03-01T00:03:10Z|{33.16, -119.72}|{20.8, 54.2, 1004.9, 509}|{19, -38, Wi-Fi} |{environmental, Celsius, %, hPa, lux, 2.9.8}|
|sensor-8 |2025-03-01T00:03:10Z|{33.52, -119.9} |{20.0, 54.5, 1003.8, 10} |{29, -31, Wi-Fi} |{environmental, Celsius, %, hPa, lux, 1.0.5}|
|sensor-9 |2025-03-01T00:03:10Z|{33.64, -119.64}|{20.4, 52.4, 1003.7, 246}|{53, -64, Wi-Fi} |{environmental, Celsius, %, hPa, lux, 1.7.3}|
|sensor-8 |2025-03-01T00:00:40Z|{33.46, -119.42}|{20.0, 52.8, 1003.1, 311}|{85, -67, Cellular} |{environmental, Celsius, %, hPa, lux, 2.2.9}|
|sensor-9 |2025-03-01T00:00:40Z|{33.62, -119.98}|{20.0, 53.5, 1004.1, 502}|{22, -26, LoRaWAN} |{environmental, Celsius, %, hPa, lux, 1.1.3}|
|sensor-0 |2025-03-01T00:00:50Z|{33.0, -119.38} |{20.1, 53.1, 1003.2, 500}|{49, -84, LoRaWAN} |{environmental, Celsius, %, hPa, lux, 3.0.2}|
|sensor-1 |2025-03-01T00:00:50Z|{33.46, -119.48}|{20.1, 51.6, 1001.3, 982}|{52, -86, Wi-Fi} |{environmental, Celsius, %, hPa, lux, 3.0.7}|
|sensor-2 |2025-03-01T00:00:50Z|{33.56, -119.74}|{20.3, 52.9, 1004.2, 367}|{29, -100, Cellular}|{environmental, Celsius, %, hPa, lux, 3.2.6}|
|sensor-3 |2025-03-01T00:00:50Z|{33.54, -119.98}|{20.5, 51.2, 1004.2, 657}|{79, -50, LoRaWAN} |{environmental, Celsius, %, hPa, lux, 1.0.0}|
+---------+--------------------+----------------+-------------------------+--------------------+--------------------------------------------+
25/06/11 05:17:10 INFO AppInfoParser: App info kafka.admin.client for adminclient-2 unregistered
25/06/11 05:17:10 INFO Metrics: Metrics scheduler closed
25/06/11 05:17:10 INFO Metrics: Closing reporter org.apache.kafka.common.metrics.JmxReporter
25/06/11 05:17:10 INFO Metrics: Metrics reporters closed
+---------+-------------------+
|device_id|average_temperature|
+---------+-------------------+
|sensor-0 |20.45999994277954 |
|sensor-1 |20.475 |
|sensor-2 |20.475 |
|sensor-3 |20.405000305175783 |
|sensor-4 |20.42000017166138 |
|sensor-5 |20.464999961853028 |
|sensor-6 |20.579999923706055 |
|sensor-7 |20.544999885559083 |
|sensor-8 |20.41999969482422 |
|sensor-9 |20.405000019073487 |
+---------+-------------------+
8. Limpar
Siga as etapas para liberar recursos depois de concluir o codelab.
- Exclua o cluster do Kafka gerenciado, a VM do GCE do editor e o cluster do Dataproc.
gcloud managed-kafka clusters delete kafka-iot --project=$PROJECT_ID --location=$REGION
gcloud compute instances delete publisher-instance --project=$PROJECT_ID --zone=$ZONE
gcloud dataproc clusters delete dataproc-iot --project=$PROJECT_ID --region=$REGION
- Exclua a sub-rede e a rede VPC.
gcloud compute networks subnets delete $SUBNET --region=$REGION
gcloud compute networks delete <network-name>
- Exclua o bucket do GCS se não quiser mais usar os dados.
gcloud storage rm --recursive $BUCKET
9. Parabéns
Parabéns! Você criou um pipeline de tratamento de dados de IoT com o Manage Kafka e o Dataproc, que ajuda a DemoIOT Solutions a receber insights em tempo real sobre os dados publicados pelos dispositivos.
Você criou um cluster gerenciado do Kafka, publicou eventos de IoT nele e executou um job do Dataproc que usou o streaming do Spark para processar esses eventos em tempo real. Agora você conhece as principais etapas necessárias para criar pipelines de dados usando o Kafka gerenciado e o Dataproc.