Processamento de dados de IoT em tempo real usando o Dataproc e o serviço gerenciado do Google para Apache Kafka
Sobre este codelab
1. Introdução
Última atualização:10/06/2024
Contexto
Os dispositivos de Internet das Coisas (IoT), que vão de soluções de casa inteligente a sensores industriais, geram grandes quantidades de dados na borda da rede. Esses dados são inestimáveis para vários casos de uso, como monitoramento, rastreamento, diagnóstico, vigilância, personalização, otimização de frotas e muito mais. O serviço gerenciado do Google para Apache Kafka oferece uma maneira escalonável e durável de ingerir e armazenar esse fluxo contínuo de dados de maneira segura, compatível com OSS, fácil de usar, enquanto o Google Cloud Dataproc permite o processamento desses grandes conjuntos de dados para análises usando clusters do Apache Spark e do Hadoop.
O que você vai criar
Neste codelab, você vai criar um pipeline de processamento de dados de IoT usando o Google Managed Service para Apache Kafka, o Dataproc, o Python e o Apache Spark, que faz análises em tempo real. O pipeline vai:
- Publicar dados de dispositivos IoT em um cluster do Kafka gerenciado usando VMs do GCE
- Transmitir dados do cluster do Manage Kafka para um cluster do Dataproc
- Processar dados usando um job do Dataproc Spark Streaming
O que você vai aprender
- Como criar clusters do Kafka e do Dataproc gerenciados pelo Google
- Como executar jobs de streaming usando o Dataproc
O que é necessário
- Uma conta ativa do GCP com um projeto configurado. Se você não tiver uma, inscreva-se para fazer um teste sem custo financeiro.
- CLI gcloud instalada e configurada. Siga as instruções para instalar a CLI gcloud no seu SO.
- Ative as APIs para Google Managed Kafka e Dataproc no projeto do GCP.
2. Visão geral
Neste codelab, vamos acompanhar a história de uma empresa fictícia, a DemoIOT Solutions. A DemoIOT Solutions oferece dispositivos de sensores que medem e transmitem dados de temperatura, umidade, pressão, nível de luz e localização. Eles querem configurar pipelines que processem esses dados para mostrar estatísticas em tempo real aos clientes. Com esses pipelines, eles podem oferecer uma grande variedade de serviços aos clientes, como monitoramento, sugestões automatizadas, alertas e insights sobre os lugares em que os clientes instalaram os sensores.
Para isso, vamos usar uma VM do GCE para simular o dispositivo IoT. O dispositivo vai publicar dados em um tópico do Kafka no cluster do Kafka gerenciado pelo Google, que será lido e processado por um job de streaming do Dataproc. A configuração de pré-requisito e as páginas a seguir vão orientar você a realizar todas essas etapas.
Configuração de pré-requisito
- Encontre o nome e o número do projeto. Consulte Encontrar o nome, o número e o ID do projeto para referência.
- Sub-rede VPC. Isso permite a conectividade entre a VM do GCE, o cluster do Kafka e o cluster do Dataproc. Siga este procedimento para listar as sub-redes atuais usando a gcloud CLI. Se necessário, siga as instruções em Criar uma rede VPC de modo automático, que cria uma rede VPC com sub-rede em cada região do Google Cloud. No entanto, para os fins deste codelab, vamos usar uma sub-rede de apenas uma região.
- Nessa sub-rede, verifique se há uma regra de firewall que permita toda a entrada de tcp:22, que é necessária para SSH. Essa regra vai estar disponível para seleção na seção "Regras de firewall" ao criar uma rede.
- Bucket do GCS. Você vai precisar de acesso a um bucket do Google Cloud Storage para armazenar recursos de jobs do Dataproc e manter os dados processados. Se você não tiver um, crie um no seu projeto do GCP.
Preencher as variáveis de ambiente
No terminal em que você executa a gcloud CLI, preencha essas variáveis de ambiente para que possam ser usadas mais tarde.
export PROJECT_ID=<project-id>
export PROJECT_NUMBER=<project-number>
export REGION=<region>
export ZONE=<zone>
export SUBNET_PATH=<subnet-path>
export BUCKET=gs://<bucket-name>
Substitua:
<project-id>
com o nome do projeto do GCP que você configurou.<project-number>
pelo nome do número de projeto da etapa 1 da condição.<region>
pelo nome de uma região das Regiões e zonas disponíveis que você quer usar. Por exemplo, podemos usarus-central1
.<zone>
com o nome da zona em Regiões e zonas disponíveis na região selecionada anteriormente. Por exemplo, se você selecionouus-central1
como a região, useus-central1-f
como a zona. Essa zona será usada para criar a VM do GCE que simula dispositivos IoT. Verifique se a zona está na região escolhida.<subnet-path>
pelo caminho completo da sub-rede da etapa 2 de pré-requisito. O valor precisa estar no formatoprojects/<project-id>/regions/<region>/subnetworks/<subnet-name>
.<bucket-name>
pelo nome do bucket do GCS da etapa 3 de pré-requisito.
3. Configurar o Kafka gerenciado pelo Google
Esta seção configura um cluster do Kafka gerenciado pelo Google, que implanta o servidor do Kafka, e cria um tópico nesse servidor em que os dados de IoT podem ser publicados e lidos após a inscrição. A DemoIOT Solutions pode configurar esse cluster para que todos os dispositivos publiquem dados nele.
Criar um cluster do Kafka gerenciado
- Crie o cluster do Kafka gerenciado. Aqui, o nome do cluster é
kafka-iot
.
gcloud managed-kafka clusters create kafka-iot \
--project=$PROJECT_ID \
--location=$REGION \
--cpu=3 \
--memory=12GiB \
--subnets=$SUBNET_PATH \
--auto-rebalance
Você receberá uma resposta semelhante a esta:
Create request issued for: [kafka-iot]
Waiting for operation [projects/<project-id>/locations/<region>/operations/<operation-id>] to complete...done.
Created cluster [kafka-iot].
A criação do cluster leva de 20 a 30 minutos. Aguarde a conclusão dessa operação.
Criar um tópico
- Crie um tópico do Kafka gerenciado no cluster. Aqui, o nome do tópico é
kafka-iot-topic
.
gcloud managed-kafka topics create kafka-iot-topic \
--cluster=kafka-iot \
--location=$REGION \
--partitions=10 \
--replication-factor=3
Você vai receber um resultado semelhante a este:
Created topic [kafka-iot-topic].
4. Configurar um editor
Para publicar no cluster do Managed Kafka, configuramos uma instância de VM do Google Compute Engine que pode acessar a VPC que contém a sub-rede usada pelo cluster do Managed Kafka. Essa VM simula os dispositivos de sensores fornecidos pela DemoIOT Solutions.
Etapas
- Crie a instância de VM do Google Compute Engine. Aqui, o nome da VM do GCE é
publisher-instance
.
gcloud compute instances create publisher-instance \
--scopes=https://www.googleapis.com/auth/cloud-platform \
--subnet=$SUBNET_PATH \
--zone=$ZONE
- Conceda à conta de serviço padrão do Google Compute Engine as permissões para usar o serviço gerenciado para o Apache Kafka.
gcloud projects add-iam-policy-binding $PROJECT_ID \
--member="serviceAccount:$PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
--role=roles/managedkafka.client
- Use o SSH para se conectar à VM. Como alternativa, use o console do Google Cloud para SSH.
gcloud compute ssh publisher-instance --project=$PROJECT_ID --zone=$ZONE
- Instale o Java para executar as ferramentas de linha de comando do Kafka e faça o download do binário do Kafka usando estes comandos.
sudo apt-get install default-jre wget
wget -O kafka_2.13-3.7.2.tgz https://downloads.apache.org/kafka/3.7.2/kafka_2.13-3.7.2.tgz
tar xfz kafka_2.13-3.7.2.tgz
export KAFKA_HOME=$(pwd)/kafka_2.13-3.7.2
export PATH=$PATH:$KAFKA_HOME/bin
- Faça o download da biblioteca de autenticação do Kafka gerenciado e das dependências dela e configure as propriedades do cliente do Kafka.
wget https://github.com/googleapis/managedkafka/releases/download/v1.0.5/release-and-dependencies.zip
sudo apt-get install unzip
unzip -n release-and-dependencies.zip -d $KAFKA_HOME/libs/
find "$KAFKA_HOME/libs/release-and-dependencies" -type f -name "*.jar" -exec cp -n {} "$KAFKA_HOME/libs/" \;
cat <<EOF> client.properties
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
sasl.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
EOF
Para mais detalhes sobre a configuração da máquina do editor, consulte Configurar uma máquina cliente.
5. Publicar no Kafka gerenciado
Agora que o publisher está configurado, podemos usar a linha de comando do Kafka para publicar alguns dados fictícios da VM do GCE (simulando dispositivos IoT pela DemoIOT Solutions) no cluster do Kafka gerenciado.
- Como fizemos o SSH na instância de VM do GCE, precisamos preencher novamente a variável
PROJECT_ID
:
export PROJECT_ID=<project-id>
export REGION=<region>
Substitua:
<project-id>
com o nome do projeto do GCP que você configurou.<region>
com a região em que o cluster do Kafka foi criado
- Use o comando
managed-kafka clusters describe
para conferir o endereço IP do servidor de inicialização do Kafka. Esse endereço pode ser usado para se conectar ao cluster do Kafka.
export BOOTSTRAP=$(gcloud managed-kafka clusters describe kafka-iot --project=$PROJECT_ID --location=$REGION --format='value(bootstrapAddress)')
- Liste os tópicos no cluster:
kafka-topics.sh --list \
--bootstrap-server $BOOTSTRAP \
--command-config client.properties
Você vai ver a saída a seguir, que contém o tópico kafka-iot-topic
que criamos anteriormente.
__remote_log_metadata
kafka-iot-topic
- Copie e cole este script em um novo arquivo
publish_iot_data.sh
. Para criar um novo arquivo na VM do GCE, use uma ferramenta comovim
ounano
.
vi publish_iot_data.sh
# OR (use any one command)
nano publish_iot_data.sh
#!/bin/bash
NUM_MESSAGES_PER_DEVICE=20
NUM_IDS=10
start_time=$(date -d "2025-03-01T00:00:00Z" +%s)
message_interval=10
generate_sensor_data() {
local temperature=$(printf "%.1f" $(echo "scale=1; 20 + $RANDOM % 100 / 10.0" | bc -l))
local humidity=$(printf "%.1f" $(echo "scale=1; 50 + $RANDOM % 500 / 10.0" | bc -l))
local pressure=$(printf "%.1f" $(echo "scale=1; 1000 + $RANDOM % 500 / 10.0" | bc -l))
local light_level=$((RANDOM % 1000))
echo "\"temperature\": $temperature,"
echo "\"humidity\": $humidity,"
echo "\"pressure\": $pressure,"
echo "\"light_level\": $light_level"
}
generate_location_data() {
local latitude=$(printf "%.4f" $(echo "scale=2; 33.0 + $RANDOM % 66" | bc -l))
local longitude=$(printf "%.4f" $(echo "scale=2; -120.0 + $RANDOM % 66" | bc -l))
echo "\"latitude\": $latitude,"
echo "\"longitude\": $longitude"
}
generate_device_status() {
local battery_level=$((RANDOM % 101))
local signal_strength=$((RANDOM % 80 - 100))
local connection_types=("Wi-Fi" "Cellular" "LoRaWAN")
local connection_type="${connection_types[$((RANDOM % ${#connection_types[@]}))]}"
echo "\"battery_level\": $battery_level,"
echo "\"signal_strength\": $signal_strength,"
echo "\"connection_type\": \"$connection_type\""
}
publish_to_kafka() {
local device_index=$1
local message_index=$2
local device_id="sensor-$((device_index % NUM_IDS))"
local timestamp=$((start_time + (message_index * message_interval)))
local date=$(date -u -d "@$timestamp" +"%Y-%m-%dT%H:%M:%SZ")
local json_data=$(cat <<EOF
{
"device_id": "$device_id",
"timestamp": "$date",
"location": {
$(generate_location_data)
},
"sensor_data": {
$(generate_sensor_data)
},
"device_status": {
$(generate_device_status)
},
"metadata": {
"sensor_type": "environmental",
"unit_temperature": "Celsius",
"unit_humidity": "%" ,
"unit_pressure": "hPa",
"unit_light_level": "lux",
"firmware_version": "$((RANDOM % 3 +1)).$((RANDOM % 10)).$((RANDOM % 10))"
}
}
EOF
)
echo $json_data | jq -rc
}
for message_index in $(seq 0 $((NUM_MESSAGES_PER_DEVICE - 1))); do
for device_index in $(seq 0 $((NUM_IDS - 1))); do
publish_to_kafka "$device_index" "$message_index"
done
done | kafka-console-producer.sh \
--topic kafka-iot-topic \
--bootstrap-server $1 \
--producer.config $2
Explicação
- Esse script cria mensagens JSON com leituras de sensores simulados que têm ID do dispositivo, carimbo de data/hora, dados do sensor (temperatura, umidade, pressão, luz), informações de local (latitude, longitude), status do dispositivo (bateria, sinal, tipo de conexão) e alguns metadados.
- Ele gera um fluxo contínuo de mensagens de um número definido de dispositivos exclusivos, cada um enviando dados em um intervalo de tempo especificado, imitando dispositivos de IoT reais. Aqui, publicamos dados de 10 dispositivos que produzem 20 leituras cada, em um intervalo de 10 segundos.
- Ele também publica todos os dados gerados no tópico do Kafka usando a ferramenta de linha de comando do produtor do Kafka.
- Instale algumas dependências usadas pelo script: pacote
bc
para cálculos matemáticos e pacotejq
para processamento JSON.
sudo apt-get install bc jq
- Modifique o script para que ele seja executável e execute-o. O processo leva cerca de dois minutos.
chmod +x publish_iot_data.sh
./publish_iot_data.sh $BOOTSTRAP client.properties
Para verificar se os eventos foram publicados, execute este comando, que vai mostrar todos os eventos. Pressione <control-c>
para sair.
kafka-console-consumer.sh \
--topic kafka-iot-topic \
--from-beginning \
--bootstrap-server $BOOTSTRAP \
--consumer.config client.properties
6. Configurar o cluster do Dataproc
Esta seção cria um cluster do Dataproc na sub-rede da VPC em que o cluster do Kafka gerenciado está presente. Esse cluster será usado para executar jobs que geram as estatísticas e os insights em tempo real necessários para as soluções DemoIOT.
- Criar um cluster de Dataproc. Aqui, o nome do cluster é
dataproc-iot
.
gcloud dataproc clusters create dataproc-iot \
--project=$PROJECT_ID \
--region=$REGION \
--image-version=2.2-debian12 \
--enable-component-gateway \
--subnet=$SUBNET_PATH \
--master-machine-type=n1-standard-4 \
--worker-machine-type=n1-standard-4 \
--num-workers=2 \
--properties=spark:spark.driver.memory=6G,spark:spark.driver.executor=4G
Você receberá uma resposta semelhante a esta:
Waiting on operation [projects/<project-id>/regions/<region>/operations/<operation-id>.
Waiting for cluster creation operation...done.
Created [https://dataproc.googleapis.com/v1/projects/<project-id>/regions/<region>/clusters/dataproc-iot] Cluster placed in zone [<zone>].
A criação do cluster pode levar de 10 a 15 minutos. Aguarde a conclusão dessa operação e verifique se o cluster está no estado RUNNING
descrevendo-o.
gcloud dataproc clusters describe dataproc-iot \
--project=$PROJECT_ID \
--region=$REGION
7. Processar mensagens do Kafka usando o Dataproc
Nesta última seção, você vai enviar um job do Dataproc que processa as mensagens publicadas usando o Spark Streaming. Essa atividade gera algumas estatísticas e insights em tempo real que podem ser usados pela DemoIOT Solutions.
- Execute este comando para criar o arquivo de job do PySpark de streaming chamado
process_iot.py
localmente.
cat > process_iot.py <<EOF
#!/bin/python
import sys
import configparser
from google.cloud import storage
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json, avg
from pyspark.sql.types import FloatType, IntegerType, StringType, StructField, StructType
from urllib.parse import urlparse
from pyspark.sql.utils import AnalysisException
JSON_SCHEMA = StructType([
StructField("device_id", StringType()),
StructField("timestamp", StringType()),
StructField(
"location",
StructType([
StructField("latitude", FloatType()),
StructField("longitude", FloatType()),
]),
),
StructField(
"sensor_data",
StructType([
StructField("temperature", FloatType()),
StructField("humidity", FloatType()),
StructField("pressure", FloatType()),
StructField("light_level", IntegerType()),
]),
),
StructField(
"device_status",
StructType([
StructField("battery_level", IntegerType()),
StructField("signal_strength", IntegerType()),
StructField("connection_type", StringType()),
]),
),
StructField(
"metadata",
StructType([
StructField("sensor_type", StringType()),
StructField("unit_temperature", StringType()),
StructField("unit_humidity", StringType()),
StructField("unit_pressure", StringType()),
StructField("unit_light_level", StringType()),
StructField("firmware_version", StringType()),
]),
),
])
CLIENT_PROPERTY_KEYS = [
"security.protocol",
"sasl.mechanism",
"sasl.login.callback.handler.class",
"sasl.jaas.config",
]
def get_client_properties(client_properties_path: str):
# Parse client properties file
parsed_path = urlparse(client_properties_path)
if parsed_path.scheme != "gs":
raise ValueError("Invalid GCS path for client properties. Must start with gs://.")
bucket_name = parsed_path.netloc
blob_name = parsed_path.path.lstrip("/")
client = storage.Client()
bucket = client.bucket(bucket_name)
blob = bucket.blob(blob_name)
file_content = "[DEFAULT]\n" + blob.download_as_text()
config = configparser.ConfigParser()
config.read_string(file_content)
client_properties = dict()
for key in CLIENT_PROPERTY_KEYS:
client_properties[key] = config.get("DEFAULT", key)
print(f"Client properties: {client_properties}")
return client_properties
def process_initial_data(spark, bootstrap_server_address, client_properties, queries_to_await, store_data_gcs_path):
print("Starting initial data processing...")
initial_rows = (
spark.readStream.format("kafka")
.option("kafka.bootstrap.servers", bootstrap_server_address)
.option("startingOffsets", "earliest")
.option("subscribe", "kafka-iot-topic")
.option("kafka.security.protocol", client_properties["security.protocol"])
.option("kafka.sasl.mechanism", client_properties["sasl.mechanism"])
.option("kafka.sasl.login.callback.handler.class", client_properties["sasl.login.callback.handler.class"])
.option("kafka.sasl.jaas.config", client_properties["sasl.jaas.config"])
.load()
)
initial_rows = (
initial_rows.selectExpr("CAST(value AS STRING)")
.select(from_json(col("value"), JSON_SCHEMA).alias("data"))
.select("data.*")
)
# Print first 20 rows
def print_top_rows(batch_df, batch_id):
if batch_df.count() > 0:
print(f"Processing initial batch for printing first 20 rows: batch {batch_id}, size: {batch_df.count()}")
batch_df.limit(20).show(truncate=False)
initial_query_print = initial_rows.writeStream \
.foreachBatch(print_top_rows) \
.trigger(once=True) \
.start()
queries_to_await.append(initial_query_print)
# Calculate and print average temperatures
def process_initial_avg_temp(batch_df, batch_id):
if batch_df.count() > 0:
print(f"Processing initial batch for avg temp: batch {batch_id}, size: {batch_df.count()}")
current_averages = (
batch_df.select("device_id", "sensor_data.temperature")
.groupBy("device_id")
.agg(avg("temperature").alias("average_temperature"))
.orderBy("device_id")
)
current_averages.show(truncate=False)
initial_query_avg_temp = initial_rows.writeStream \
.foreachBatch(process_initial_avg_temp) \
.trigger(once=True) \
.start()
queries_to_await.append(initial_query_avg_temp)
# Write data to GCS
initial_data_gcs_writer = (
initial_rows.writeStream.outputMode("append")
.format("avro")
.option("path", store_data_gcs_path+"/tables/iot_avro/")
.option("checkpointLocation", store_data_gcs_path+"/chkpt/avro/")
.trigger(once=True) \
.start()
)
queries_to_await.append(initial_data_gcs_writer)
def main(bootstrap_server_address, client_properties_path, store_data_gcs_path):
client_properties = get_client_properties(client_properties_path)
# Create SparkSession
spark = SparkSession.builder.appName("IotStreamingParser").getOrCreate()
queries_to_await = []
process_initial_data(spark, bootstrap_server_address, client_properties, queries_to_await, store_data_gcs_path)
# Wait for all queries to terminate
for query in queries_to_await:
try:
query.awaitTermination()
except Exception as e:
print(f"Error awaiting query: {e}")
finally:
query.stop()
spark.stop()
if __name__ == "__main__":
if len(sys.argv) < 4:
print("Invalid number of arguments passed ", len(sys.argv))
print(
"Usage: ",
sys.argv[0],
" <bootstrap_server_address> <client_properties_path> <gcs_path_to_store_data>",
)
raise ValueError("Invalid number of arguments passed.")
parsed_data_path = urlparse(sys.argv[3])
if parsed_data_path.scheme != "gs":
raise ValueError("Invalid GCS path for storing data. Must start with gs://.")
main(sys.argv[1], sys.argv[2], sys.argv[3])
EOF
Explicação
- Esse código configura um job de streaming estruturado do PySpark para ler dados de um tópico do Kafka especificado. Ele usa o endereço de inicialização do servidor Kafka fornecido e as configurações do Kafka carregadas de um arquivo de configuração do GCS para se conectar e se autenticar com o agente Kafka.
- Primeiro, ele lê os dados brutos do Kafka como um fluxo de matrizes de bytes, converte essas matrizes em strings e aplica o
json_schema
usando o StructType do Spark para especificar a estrutura dos dados (ID do dispositivo, carimbo de data/hora, local, dados do sensor etc.). - Ele imprime as primeiras 10 linhas no console para visualização, calcula a temperatura média por sensor e grava todos os dados no bucket do GCS no formato
avro
. O Avro é um sistema de serialização de dados baseado em linha que armazena dados estruturados de maneira eficiente em um formato binário compacto definido pelo esquema, oferecendo evolução do esquema, neutralidade de linguagem e alta compactação para processamento de dados em grande escala.
- Crie o arquivo
client.properties
e preencha a variável de ambiente para o endereço de inicialização do servidor Kafka.
cat <<EOF> client.properties
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
sasl.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
EOF
export BOOTSTRAP=$(gcloud managed-kafka clusters describe kafka-iot --project=$PROJECT_ID --location=$REGION --format='value(bootstrapAddress)')
- Faça upload de arquivos
process_iot.py
eclient.properties
para o bucket do Google Cloud Storage para que eles possam ser usados pelo job do Dataproc.
gsutil cp process_iot.py client.properties $BUCKET
- Copie alguns jars de dependência do job do Dataproc para o bucket do GCS. Esse diretório contém jars necessários para executar jobs do Spark Streaming com o Kafka, além da biblioteca de autenticação do Kafka gerenciado e suas dependências, retiradas de Configurar uma máquina cliente.
gsutil -m cp -R gs://dataproc-codelabs/managed-kafka-dependencies/ $BUCKET
- Envie o job do Spark para o cluster do Dataproc.
gcloud dataproc jobs submit pyspark \
$BUCKET/process_iot.py \
--project=$PROJECT_ID \
--region=$REGION \
--cluster=dataproc-iot \
--properties=spark.jars=$BUCKET/managed-kafka-dependencies/* \
-- $BOOTSTRAP $BUCKET/client.properties $BUCKET
Os registros do driver do Spark serão impressos. Você também poderá conferir essas tabelas registradas no console e os dados armazenados no bucket do GCS.
25/06/11 05:16:51 INFO AppInfoParser: Kafka version: 3.7.1
25/06/11 05:16:51 INFO AppInfoParser: Kafka commitId: e2494e6ffb89f828
25/06/11 05:16:51 INFO AppInfoParser: Kafka startTimeMs: 1749619011472
Processing initial batch for avg temp: batch 0, size: 200
Processing initial batch for printing first 20 rows: batch 0, size: 200
+---------+--------------------+----------------+-------------------------+--------------------+--------------------------------------------+
|device_id|timestamp |location |sensor_data |device_status |metadata |
+---------+--------------------+----------------+-------------------------+--------------------+--------------------------------------------+
|sensor-6 |2025-03-01T00:03:00Z|{33.42, -119.66}|{20.9, 50.7, 1003.2, 525}|{70, -41, Wi-Fi} |{environmental, Celsius, %, hPa, lux, 3.6.0}|
|sensor-7 |2025-03-01T00:03:00Z|{33.32, -119.4} |{20.1, 51.0, 1000.9, 611}|{47, -98, LoRaWAN} |{environmental, Celsius, %, hPa, lux, 2.4.5}|
|sensor-8 |2025-03-01T00:03:00Z|{33.46, -119.74}|{20.8, 54.3, 1001.3, 734}|{70, -45, LoRaWAN} |{environmental, Celsius, %, hPa, lux, 1.1.6}|
|sensor-9 |2025-03-01T00:03:00Z|{33.04, -119.38}|{20.9, 54.5, 1002.9, 956}|{91, -99, Cellular} |{environmental, Celsius, %, hPa, lux, 2.7.4}|
|sensor-0 |2025-03-01T00:03:10Z|{33.22, -119.56}|{20.4, 53.0, 1000.2, 239}|{41, -95, LoRaWAN} |{environmental, Celsius, %, hPa, lux, 2.1.3}|
|sensor-1 |2025-03-01T00:03:10Z|{33.62, -119.4} |{20.0, 53.8, 1000.2, 907}|{97, -84, Wi-Fi} |{environmental, Celsius, %, hPa, lux, 3.3.1}|
|sensor-2 |2025-03-01T00:03:10Z|{33.56, -119.66}|{20.1, 51.6, 1004.8, 824}|{37, -36, Wi-Fi} |{environmental, Celsius, %, hPa, lux, 2.5.1}|
|sensor-3 |2025-03-01T00:03:10Z|{33.32, -120.0} |{20.6, 52.5, 1004.1, 557}|{96, -70, Cellular} |{environmental, Celsius, %, hPa, lux, 2.9.4}|
|sensor-4 |2025-03-01T00:03:10Z|{33.48, -119.36}|{20.2, 53.7, 1002.8, 818}|{22, -76, LoRaWAN} |{environmental, Celsius, %, hPa, lux, 1.1.2}|
|sensor-5 |2025-03-01T00:03:10Z|{33.48, -119.42}|{20.7, 51.5, 1002.7, 310}|{92, -44, Wi-Fi} |{environmental, Celsius, %, hPa, lux, 3.5.9}|
|sensor-6 |2025-03-01T00:03:10Z|{33.02, -119.8} |{20.4, 50.4, 1001.0, 190}|{36, -90, Cellular} |{environmental, Celsius, %, hPa, lux, 1.4.5}|
|sensor-7 |2025-03-01T00:03:10Z|{33.16, -119.72}|{20.8, 54.2, 1004.9, 509}|{19, -38, Wi-Fi} |{environmental, Celsius, %, hPa, lux, 2.9.8}|
|sensor-8 |2025-03-01T00:03:10Z|{33.52, -119.9} |{20.0, 54.5, 1003.8, 10} |{29, -31, Wi-Fi} |{environmental, Celsius, %, hPa, lux, 1.0.5}|
|sensor-9 |2025-03-01T00:03:10Z|{33.64, -119.64}|{20.4, 52.4, 1003.7, 246}|{53, -64, Wi-Fi} |{environmental, Celsius, %, hPa, lux, 1.7.3}|
|sensor-8 |2025-03-01T00:00:40Z|{33.46, -119.42}|{20.0, 52.8, 1003.1, 311}|{85, -67, Cellular} |{environmental, Celsius, %, hPa, lux, 2.2.9}|
|sensor-9 |2025-03-01T00:00:40Z|{33.62, -119.98}|{20.0, 53.5, 1004.1, 502}|{22, -26, LoRaWAN} |{environmental, Celsius, %, hPa, lux, 1.1.3}|
|sensor-0 |2025-03-01T00:00:50Z|{33.0, -119.38} |{20.1, 53.1, 1003.2, 500}|{49, -84, LoRaWAN} |{environmental, Celsius, %, hPa, lux, 3.0.2}|
|sensor-1 |2025-03-01T00:00:50Z|{33.46, -119.48}|{20.1, 51.6, 1001.3, 982}|{52, -86, Wi-Fi} |{environmental, Celsius, %, hPa, lux, 3.0.7}|
|sensor-2 |2025-03-01T00:00:50Z|{33.56, -119.74}|{20.3, 52.9, 1004.2, 367}|{29, -100, Cellular}|{environmental, Celsius, %, hPa, lux, 3.2.6}|
|sensor-3 |2025-03-01T00:00:50Z|{33.54, -119.98}|{20.5, 51.2, 1004.2, 657}|{79, -50, LoRaWAN} |{environmental, Celsius, %, hPa, lux, 1.0.0}|
+---------+--------------------+----------------+-------------------------+--------------------+--------------------------------------------+
25/06/11 05:17:10 INFO AppInfoParser: App info kafka.admin.client for adminclient-2 unregistered
25/06/11 05:17:10 INFO Metrics: Metrics scheduler closed
25/06/11 05:17:10 INFO Metrics: Closing reporter org.apache.kafka.common.metrics.JmxReporter
25/06/11 05:17:10 INFO Metrics: Metrics reporters closed
+---------+-------------------+
|device_id|average_temperature|
+---------+-------------------+
|sensor-0 |20.45999994277954 |
|sensor-1 |20.475 |
|sensor-2 |20.475 |
|sensor-3 |20.405000305175783 |
|sensor-4 |20.42000017166138 |
|sensor-5 |20.464999961853028 |
|sensor-6 |20.579999923706055 |
|sensor-7 |20.544999885559083 |
|sensor-8 |20.41999969482422 |
|sensor-9 |20.405000019073487 |
+---------+-------------------+
8. Limpar
Siga as etapas para limpar os recursos depois de concluir o codelab.
- Exclua o cluster do Kafka gerenciado, a VM do GCE do editor e o cluster do Dataproc.
gcloud managed-kafka clusters delete kafka-iot --project=$PROJECT_ID --location=$REGION
gcloud compute instances delete publisher-instance --project=$PROJECT_ID --zone=$ZONE
gcloud dataproc clusters delete dataproc-iot --project=$PROJECT_ID --region=$REGION
- Exclua a sub-rede e a rede VPC.
gcloud compute networks subnets delete $SUBNET --region=$REGION
gcloud compute networks delete <network-name>
- Exclua seu bucket do GCS se não quiser mais usar os dados.
gcloud storage rm --recursive $BUCKET
9. Parabéns
Parabéns! Você criou um pipeline de processamento de dados de IoT com o Manage Kafka e o Dataproc para ajudar a DemoIOT Solutions a ter insights em tempo real sobre os dados publicados pelos dispositivos.
Você criou um cluster do Kafka gerenciado, publicou eventos de IoT nele e executou um job do Dataproc que usou o streaming do Spark para processar esses eventos em tempo real. Agora você conhece as principais etapas necessárias para criar pipelines de dados usando o Kafka gerenciado e o Dataproc.