이 Codelab 정보
1. 소개
최종 업데이트: 2024년 6월 10일
배경
스마트 홈 솔루션에서 산업용 센서에 이르기까지 IoT (사물 인터넷) 기기는 네트워크 에지에서 방대한 양의 데이터를 생성합니다. 이 데이터는 기기 모니터링, 추적, 진단, 감시, 맞춤설정, 차량 최적화 등 다양한 사용 사례에 매우 유용합니다. Apache Kafka용 Google 관리형 서비스는 OSS와 호환되고 사용하기 쉽고 안전한 방식으로 이러한 연속 데이터 스트림을 처리하고 저장할 수 있는 확장 가능하고 내구성 있는 방법을 제공하며, Google Cloud Dataproc을 사용하면 Apache Spark 및 Hadoop 클러스터를 사용하여 데이터 분석을 위해 이러한 대규모 데이터 세트를 처리할 수 있습니다.
빌드할 항목
이 Codelab에서는 실시간 분석을 실행하는 Apache Kafka용 Google 관리형 서비스, Dataproc, Python, Apache Spark를 사용하여 IoT 데이터 처리 파이프라인을 빌드합니다. 파이프라인은 다음을 실행합니다.
- GCE VM을 사용하여 IoT 기기의 데이터를 관리형 Kafka 클러스터에 게시
- 관리 Kafka 클러스터에서 Dataproc 클러스터로 데이터 스트리밍
- Dataproc Spark 스트리밍 작업을 사용하여 데이터 처리
학습할 내용
- Google 관리형 Kafka 및 Dataproc 클러스터를 만드는 방법
- Dataproc을 사용하여 스트리밍 작업을 실행하는 방법
필요한 항목
- 프로젝트가 설정된 활성 GCP 계정 계정이 없는 경우 무료 체험을 신청할 수 있습니다.
- gcloud CLI가 설치되고 구성되었습니다. OS에 gcloud CLI를 설치하는 안내를 따르세요.
- GCP 프로젝트에서 Google 관리 Kafka 및 Dataproc에 API를 사용 설정했습니다.
2. 개요
이 Codelab에서는 가상 회사인 DemoIOT Solutions의 이야기를 따라가 보겠습니다. DemoIOT Solutions는 온도, 습도, 압력, 밝기 수준, 위치 데이터를 측정하고 전송하는 센서 기기를 제공합니다. 이 데이터를 처리하여 고객에게 실시간 통계를 표시하는 파이프라인을 설정하려 합니다. 이러한 파이프라인을 사용하면 고객이 센서를 설치한 위치에 대한 모니터링, 자동 추천, 알림, 통계 등 다양한 서비스를 고객에게 제공할 수 있습니다.
이를 위해 GCE VM을 사용하여 IoT 기기를 시뮬레이션합니다. 기기는 Google 관리형 Kafka 클러스터의 Kafka 주제에 데이터를 게시하며, 이 데이터는 Dataproc 스트리밍 작업에서 읽고 처리합니다. 기본 요건 설정 및 다음 페이지에서 이러한 모든 단계를 수행할 수 있습니다.
기본 요건 설정
- 프로젝트의 프로젝트 이름과 프로젝트 번호를 찾습니다. 참고로 프로젝트 이름, 번호, ID 찾기를 참고하세요.
- VPC 서브네트워크 이렇게 하면 GCE VM, Kafka 클러스터, Dataproc 클러스터 간에 연결할 수 있습니다. 이 방법에 따라 gcloud CLI를 사용하여 기존 서브넷을 나열합니다. 필요한 경우 자동 모드 VPC 네트워크 만들기에 따라 각 Google Cloud 리전에 서브넷이 있는 VPC 네트워크를 만듭니다. 하지만 이 Codelab에서는 단일 리전의 하위 네트워크만 사용합니다.
- 이 하위 네트워크에서 SSH에 필요한 tcp:22의 모든 인그레스를 허용하는 방화벽 규칙이 있는지 확인합니다. 이 규칙은 네트워크를 만들 때 방화벽 규칙 섹션에서 선택할 수 있으므로 선택해야 합니다.
- GCS 버킷 Dataproc 작업 리소스를 저장하고 처리된 데이터를 유지하려면 Google Cloud Storage 버킷에 액세스해야 합니다. 계정이 없는 경우 GCP 프로젝트에서 만들 수 있습니다.
환경 변수 채우기
gcloud CLI를 실행하는 터미널에서 이러한 환경 변수를 채워 나중에 사용할 수 있도록 합니다.
export PROJECT_ID=<project-id>
export PROJECT_NUMBER=<project-number>
export REGION=<region>
export ZONE=<zone>
export SUBNET_PATH=<subnet-path>
export BUCKET=gs://<bucket-name>
다음을 바꿉니다.
<project-id>
를 설정한 GCP 프로젝트의 이름으로 바꿉니다.<project-number>
를 기본 요건 1단계의 프로젝트 번호 이름으로 바꿉니다.<region>
을 사용하려는 사용 가능한 리전 및 영역의 리전 이름으로 바꿉니다. 예를 들어us-central1
를 사용할 수 있습니다.<zone>
을 이전에 선택한 리전 아래의 사용 가능한 리전 및 영역에서 영역 이름으로 바꿉니다. 예를 들어us-central1
를 리전으로 선택한 경우us-central1-f
를 영역으로 사용할 수 있습니다. 이 영역은 IoT 기기를 시뮬레이션하는 GCE VM을 만드는 데 사용됩니다. 선택한 영역에 영역이 있는지 확인합니다.<subnet-path>
를 기본 요건 2단계의 서브넷 전체 경로로 바꿉니다. 이 값은projects/<project-id>/regions/<region>/subnetworks/<subnet-name>
형식이어야 합니다.<bucket-name>
을 기본 요건 3단계의 GCS 버킷 이름으로 바꿉니다.
3. Google 관리형 Kafka 설정
이 섹션에서는 Kafka 서버를 배포하는 Google 관리 Kafka 클러스터를 설정하고 이 서버에 IoT 데이터를 구독한 후 게시하고 읽을 수 있는 주제를 만듭니다. DemoIOT Solutions는 모든 기기가 이 클러스터에 데이터를 게시하도록 이 클러스터를 설정할 수 있습니다.
관리형 Kafka 클러스터 만들기
- 관리형 Kafka 클러스터를 만듭니다. 여기서 클러스터 이름은
kafka-iot
입니다.
gcloud managed-kafka clusters create kafka-iot \
--project=$PROJECT_ID \
--location=$REGION \
--cpu=3 \
--memory=12GiB \
--subnets=$SUBNET_PATH \
--auto-rebalance
다음과 비슷한 응답이 표시됩니다.
Create request issued for: [kafka-iot]
Waiting for operation [projects/<project-id>/locations/<region>/operations/<operation-id>] to complete...done.
Created cluster [kafka-iot].
클러스터를 만드는 데 20~30분 정도 걸립니다. 이 작업이 완료될 때까지 기다립니다.
주제 만들기
- 클러스터에서 관리형 Kafka 주제를 만듭니다. 여기서 주제 이름은
kafka-iot-topic
입니다.
gcloud managed-kafka topics create kafka-iot-topic \
--cluster=kafka-iot \
--location=$REGION \
--partitions=10 \
--replication-factor=3
다음과 비슷한 출력이 표시됩니다.
Created topic [kafka-iot-topic].
4. 게시자 설정
관리형 Kafka 클러스터에 게시하기 위해 관리형 Kafka 클러스터에서 사용하는 서브넷이 포함된 VPC에 액세스할 수 있는 Google Compute Engine VM 인스턴스를 설정합니다. 이 VM은 DemoIOT Solutions에서 제공하는 센서 기기를 시뮬레이션합니다.
단계
- Google Compute Engine VM 인스턴스를 만듭니다. 여기서 GCE VM의 이름은
publisher-instance
입니다.
gcloud compute instances create publisher-instance \
--scopes=https://www.googleapis.com/auth/cloud-platform \
--subnet=$SUBNET_PATH \
--zone=$ZONE
- Google Compute Engine 기본 서비스 계정에 Apache Kafka용 관리형 서비스를 사용할 권한을 부여합니다.
gcloud projects add-iam-policy-binding $PROJECT_ID \
--member="serviceAccount:$PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
--role=roles/managedkafka.client
- SSH를 사용하여 VM에 연결합니다. 또는 Google Cloud 콘솔을 사용하여 SSH를 실행합니다.
gcloud compute ssh publisher-instance --project=$PROJECT_ID --zone=$ZONE
- Kafka 명령줄 도구를 실행할 Java를 설치하고 다음 명령어를 사용하여 Kafka 바이너리를 다운로드합니다.
sudo apt-get install default-jre wget
wget -O kafka_2.13-3.7.2.tgz https://downloads.apache.org/kafka/3.7.2/kafka_2.13-3.7.2.tgz
tar xfz kafka_2.13-3.7.2.tgz
export KAFKA_HOME=$(pwd)/kafka_2.13-3.7.2
export PATH=$PATH:$KAFKA_HOME/bin
- 관리형 Kafka 인증 라이브러리 및 종속 항목을 다운로드하고 Kafka 클라이언트 속성을 구성합니다.
wget https://github.com/googleapis/managedkafka/releases/download/v1.0.5/release-and-dependencies.zip
sudo apt-get install unzip
unzip -n release-and-dependencies.zip -d $KAFKA_HOME/libs/
find "$KAFKA_HOME/libs/release-and-dependencies" -type f -name "*.jar" -exec cp -n {} "$KAFKA_HOME/libs/" \;
cat <<EOF> client.properties
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
sasl.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
EOF
게시자 머신 설정에 관한 자세한 내용은 클라이언트 머신 설정을 참고하세요.
5. 관리형 Kafka에 게시
이제 게시자가 설정되었으므로 Kafka 명령줄을 사용하여 GCE VM (DemoIOT Solutions의 IoT 기기 시뮬레이션)에서 관리형 Kafka 클러스터로 일부 더미 데이터를 게시할 수 있습니다.
- GCE VM 인스턴스에 SSH로 연결했으므로
PROJECT_ID
변수를 다시 채워야 합니다.
export PROJECT_ID=<project-id>
export REGION=<region>
다음을 바꿉니다.
<project-id>
를 설정한 GCP 프로젝트의 이름으로 바꿉니다.<region>
을 Kafka 클러스터가 생성된 리전으로 바꿉니다.
managed-kafka clusters describe
명령어를 사용하여 Kafka 부트스트랩 서버의 IP 주소를 가져옵니다. 이 주소는 Kafka 클러스터에 연결하는 데 사용할 수 있습니다.
export BOOTSTRAP=$(gcloud managed-kafka clusters describe kafka-iot --project=$PROJECT_ID --location=$REGION --format='value(bootstrapAddress)')
- 클러스터의 주제를 나열합니다.
kafka-topics.sh --list \
--bootstrap-server $BOOTSTRAP \
--command-config client.properties
앞서 만든 kafka-iot-topic
주제가 포함된 다음 출력이 표시됩니다.
__remote_log_metadata
kafka-iot-topic
- 이 스크립트를 복사하여 새 파일
publish_iot_data.sh
에 붙여넣습니다. GCE VM에서 새 파일을 만들려면vim
또는nano
와 같은 도구를 사용하면 됩니다.
vi publish_iot_data.sh
# OR (use any one command)
nano publish_iot_data.sh
#!/bin/bash
NUM_MESSAGES_PER_DEVICE=20
NUM_IDS=10
start_time=$(date -d "2025-03-01T00:00:00Z" +%s)
message_interval=10
generate_sensor_data() {
local temperature=$(printf "%.1f" $(echo "scale=1; 20 + $RANDOM % 100 / 10.0" | bc -l))
local humidity=$(printf "%.1f" $(echo "scale=1; 50 + $RANDOM % 500 / 10.0" | bc -l))
local pressure=$(printf "%.1f" $(echo "scale=1; 1000 + $RANDOM % 500 / 10.0" | bc -l))
local light_level=$((RANDOM % 1000))
echo "\"temperature\": $temperature,"
echo "\"humidity\": $humidity,"
echo "\"pressure\": $pressure,"
echo "\"light_level\": $light_level"
}
generate_location_data() {
local latitude=$(printf "%.4f" $(echo "scale=2; 33.0 + $RANDOM % 66" | bc -l))
local longitude=$(printf "%.4f" $(echo "scale=2; -120.0 + $RANDOM % 66" | bc -l))
echo "\"latitude\": $latitude,"
echo "\"longitude\": $longitude"
}
generate_device_status() {
local battery_level=$((RANDOM % 101))
local signal_strength=$((RANDOM % 80 - 100))
local connection_types=("Wi-Fi" "Cellular" "LoRaWAN")
local connection_type="${connection_types[$((RANDOM % ${#connection_types[@]}))]}"
echo "\"battery_level\": $battery_level,"
echo "\"signal_strength\": $signal_strength,"
echo "\"connection_type\": \"$connection_type\""
}
publish_to_kafka() {
local device_index=$1
local message_index=$2
local device_id="sensor-$((device_index % NUM_IDS))"
local timestamp=$((start_time + (message_index * message_interval)))
local date=$(date -u -d "@$timestamp" +"%Y-%m-%dT%H:%M:%SZ")
local json_data=$(cat <<EOF
{
"device_id": "$device_id",
"timestamp": "$date",
"location": {
$(generate_location_data)
},
"sensor_data": {
$(generate_sensor_data)
},
"device_status": {
$(generate_device_status)
},
"metadata": {
"sensor_type": "environmental",
"unit_temperature": "Celsius",
"unit_humidity": "%" ,
"unit_pressure": "hPa",
"unit_light_level": "lux",
"firmware_version": "$((RANDOM % 3 +1)).$((RANDOM % 10)).$((RANDOM % 10))"
}
}
EOF
)
echo $json_data | jq -rc
}
for message_index in $(seq 0 $((NUM_MESSAGES_PER_DEVICE - 1))); do
for device_index in $(seq 0 $((NUM_IDS - 1))); do
publish_to_kafka "$device_index" "$message_index"
done
done | kafka-console-producer.sh \
--topic kafka-iot-topic \
--bootstrap-server $1 \
--producer.config $2
설명
- 이 스크립트는 기기 ID, 타임스탬프, 센서 데이터 (온도, 습도, 압력, 조명), 위치 정보 (위도, 경도), 기기 상태 (배터리, 신호, 연결 유형) 및 일부 메타데이터가 포함된 시뮬레이션된 센서 판독값으로 JSON 메시지를 만듭니다.
- 각 기기가 지정된 시간 간격으로 데이터를 전송하여 실제 IoT 기기를 모방하는 고유한 기기의 연속적인 메시지 흐름을 생성합니다. 여기서는 10초 간격으로 각각 20개의 측정값을 생성하는 10대의 기기에서 수집한 데이터를 게시합니다.
- 또한 Kafka 생산자 명령줄 도구를 사용하여 생성된 모든 데이터를 Kafka 주제에 게시합니다.
- 스크립트에서 사용하는 일부 종속 항목(수학 계산을 위한
bc
패키지 및 JSON 처리를 위한jq
패키지)을 설치합니다.
sudo apt-get install bc jq
- 스크립트를 실행 파일로 수정하고 스크립트를 실행합니다. 실행하는 데 2분 정도 걸립니다.
chmod +x publish_iot_data.sh
./publish_iot_data.sh $BOOTSTRAP client.properties
이 명령어를 실행하면 모든 이벤트가 출력되므로 이벤트가 게시되었는지 확인할 수 있습니다. 종료하려면 <control-c>
를 누릅니다.
kafka-console-consumer.sh \
--topic kafka-iot-topic \
--from-beginning \
--bootstrap-server $BOOTSTRAP \
--consumer.config client.properties
6. Dataproc 클러스터 설정
이 섹션에서는 관리형 Kafka 클러스터가 있는 VPC 하위 네트워크에 Dataproc 클러스터를 만듭니다. 이 클러스터는 DemoIOT 솔루션에 필요한 실시간 통계 및 통계를 생성하는 작업을 실행하는 데 사용됩니다.
- Dataproc 클러스터를 만듭니다. 여기서 클러스터 이름은
dataproc-iot
입니다.
gcloud dataproc clusters create dataproc-iot \
--project=$PROJECT_ID \
--region=$REGION \
--image-version=2.2-debian12 \
--enable-component-gateway \
--subnet=$SUBNET_PATH \
--master-machine-type=n1-standard-4 \
--worker-machine-type=n1-standard-4 \
--num-workers=2 \
--properties=spark:spark.driver.memory=6G,spark:spark.driver.executor=4G
다음과 비슷한 응답이 표시됩니다.
Waiting on operation [projects/<project-id>/regions/<region>/operations/<operation-id>.
Waiting for cluster creation operation...done.
Created [https://dataproc.googleapis.com/v1/projects/<project-id>/regions/<region>/clusters/dataproc-iot] Cluster placed in zone [<zone>].
클러스터를 만드는 데 10~15분 정도 걸릴 수 있습니다. 이 작업이 완료될 때까지 기다린 후 클러스터를 설명하여 클러스터가 RUNNING
상태인지 확인합니다.
gcloud dataproc clusters describe dataproc-iot \
--project=$PROJECT_ID \
--region=$REGION
7. Dataproc을 사용하여 Kafka 메시지 처리
이 마지막 섹션에서는 Spark Streaming을 사용하여 게시된 메시지를 처리하는 Dataproc 작업을 제출합니다. 이 작업은 실제로 DemoIOT 솔루션에서 사용할 수 있는 실시간 통계 및 통계를 생성합니다.
- 이 명령어를 실행하여 로컬에서
process_iot.py
라는 스트리밍 PySpark 작업 파일을 만듭니다.
cat > process_iot.py <<EOF
#!/bin/python
import sys
import configparser
from google.cloud import storage
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json, avg
from pyspark.sql.types import FloatType, IntegerType, StringType, StructField, StructType
from urllib.parse import urlparse
from pyspark.sql.utils import AnalysisException
JSON_SCHEMA = StructType([
StructField("device_id", StringType()),
StructField("timestamp", StringType()),
StructField(
"location",
StructType([
StructField("latitude", FloatType()),
StructField("longitude", FloatType()),
]),
),
StructField(
"sensor_data",
StructType([
StructField("temperature", FloatType()),
StructField("humidity", FloatType()),
StructField("pressure", FloatType()),
StructField("light_level", IntegerType()),
]),
),
StructField(
"device_status",
StructType([
StructField("battery_level", IntegerType()),
StructField("signal_strength", IntegerType()),
StructField("connection_type", StringType()),
]),
),
StructField(
"metadata",
StructType([
StructField("sensor_type", StringType()),
StructField("unit_temperature", StringType()),
StructField("unit_humidity", StringType()),
StructField("unit_pressure", StringType()),
StructField("unit_light_level", StringType()),
StructField("firmware_version", StringType()),
]),
),
])
CLIENT_PROPERTY_KEYS = [
"security.protocol",
"sasl.mechanism",
"sasl.login.callback.handler.class",
"sasl.jaas.config",
]
def get_client_properties(client_properties_path: str):
# Parse client properties file
parsed_path = urlparse(client_properties_path)
if parsed_path.scheme != "gs":
raise ValueError("Invalid GCS path for client properties. Must start with gs://.")
bucket_name = parsed_path.netloc
blob_name = parsed_path.path.lstrip("/")
client = storage.Client()
bucket = client.bucket(bucket_name)
blob = bucket.blob(blob_name)
file_content = "[DEFAULT]\n" + blob.download_as_text()
config = configparser.ConfigParser()
config.read_string(file_content)
client_properties = dict()
for key in CLIENT_PROPERTY_KEYS:
client_properties[key] = config.get("DEFAULT", key)
print(f"Client properties: {client_properties}")
return client_properties
def process_initial_data(spark, bootstrap_server_address, client_properties, queries_to_await, store_data_gcs_path):
print("Starting initial data processing...")
initial_rows = (
spark.readStream.format("kafka")
.option("kafka.bootstrap.servers", bootstrap_server_address)
.option("startingOffsets", "earliest")
.option("subscribe", "kafka-iot-topic")
.option("kafka.security.protocol", client_properties["security.protocol"])
.option("kafka.sasl.mechanism", client_properties["sasl.mechanism"])
.option("kafka.sasl.login.callback.handler.class", client_properties["sasl.login.callback.handler.class"])
.option("kafka.sasl.jaas.config", client_properties["sasl.jaas.config"])
.load()
)
initial_rows = (
initial_rows.selectExpr("CAST(value AS STRING)")
.select(from_json(col("value"), JSON_SCHEMA).alias("data"))
.select("data.*")
)
# Print first 20 rows
def print_top_rows(batch_df, batch_id):
if batch_df.count() > 0:
print(f"Processing initial batch for printing first 20 rows: batch {batch_id}, size: {batch_df.count()}")
batch_df.limit(20).show(truncate=False)
initial_query_print = initial_rows.writeStream \
.foreachBatch(print_top_rows) \
.trigger(once=True) \
.start()
queries_to_await.append(initial_query_print)
# Calculate and print average temperatures
def process_initial_avg_temp(batch_df, batch_id):
if batch_df.count() > 0:
print(f"Processing initial batch for avg temp: batch {batch_id}, size: {batch_df.count()}")
current_averages = (
batch_df.select("device_id", "sensor_data.temperature")
.groupBy("device_id")
.agg(avg("temperature").alias("average_temperature"))
.orderBy("device_id")
)
current_averages.show(truncate=False)
initial_query_avg_temp = initial_rows.writeStream \
.foreachBatch(process_initial_avg_temp) \
.trigger(once=True) \
.start()
queries_to_await.append(initial_query_avg_temp)
# Write data to GCS
initial_data_gcs_writer = (
initial_rows.writeStream.outputMode("append")
.format("avro")
.option("path", store_data_gcs_path+"/tables/iot_avro/")
.option("checkpointLocation", store_data_gcs_path+"/chkpt/avro/")
.trigger(once=True) \
.start()
)
queries_to_await.append(initial_data_gcs_writer)
def main(bootstrap_server_address, client_properties_path, store_data_gcs_path):
client_properties = get_client_properties(client_properties_path)
# Create SparkSession
spark = SparkSession.builder.appName("IotStreamingParser").getOrCreate()
queries_to_await = []
process_initial_data(spark, bootstrap_server_address, client_properties, queries_to_await, store_data_gcs_path)
# Wait for all queries to terminate
for query in queries_to_await:
try:
query.awaitTermination()
except Exception as e:
print(f"Error awaiting query: {e}")
finally:
query.stop()
spark.stop()
if __name__ == "__main__":
if len(sys.argv) < 4:
print("Invalid number of arguments passed ", len(sys.argv))
print(
"Usage: ",
sys.argv[0],
" <bootstrap_server_address> <client_properties_path> <gcs_path_to_store_data>",
)
raise ValueError("Invalid number of arguments passed.")
parsed_data_path = urlparse(sys.argv[3])
if parsed_data_path.scheme != "gs":
raise ValueError("Invalid GCS path for storing data. Must start with gs://.")
main(sys.argv[1], sys.argv[2], sys.argv[3])
EOF
설명
- 이 코드는 지정된 Kafka 주제에서 데이터를 읽도록 PySpark 구조화된 스트리밍 작업을 설정합니다. 제공된 Kafka 서버 부트스트랩 주소와 GCS 구성 파일에서 로드된 Kafka 구성을 사용하여 Kafka 브로커에 연결하고 인증합니다.
- 먼저 Kafka에서 원시 데이터를 바이트 배열의 스트림으로 읽고 이러한 바이트 배열을 문자열로 변환한 후 Spark의 StructType을 사용하여
json_schema
를 적용하여 데이터 구조 (기기 ID, 타임스탬프, 위치, 센서 데이터 등)를 지정합니다. - 미리보기용으로 처음 10개 행을 콘솔에 출력하고, 센서당 평균 온도를 계산하고, 모든 데이터를
avro
형식으로 GCS 버킷에 씁니다. Avro는 정형 데이터를 스키마 정의의 컴팩트한 바이너리 형식으로 효율적으로 저장하는 행 기반 데이터 직렬화 시스템으로, 대규모 데이터 처리를 위해 스키마 진화, 언어 중립성, 높은 압축을 제공합니다.
client.properties
파일을 만들고 kafka 서버의 부트스트랩 주소에 대한 환경 변수를 채웁니다.
cat <<EOF> client.properties
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
sasl.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
EOF
export BOOTSTRAP=$(gcloud managed-kafka clusters describe kafka-iot --project=$PROJECT_ID --location=$REGION --format='value(bootstrapAddress)')
- Dataproc 작업에서 사용할 수 있도록
process_iot.py
및client.properties
파일을 Google Cloud Storage 버킷에 업로드합니다.
gsutil cp process_iot.py client.properties $BUCKET
- Dataproc 작업의 일부 종속 항목 jar를 GCS 버킷에 복사합니다. 이 디렉터리에는 Kafka로 Spark Streaming 작업을 실행하는 데 필요한 jar와 클라이언트 머신 설정에서 가져온 관리형 Kafka 인증 라이브러리 및 종속 항목이 포함되어 있습니다.
gsutil -m cp -R gs://dataproc-codelabs/managed-kafka-dependencies/ $BUCKET
- Spark 작업을 Dataproc 클러스터에 제출합니다.
gcloud dataproc jobs submit pyspark \
$BUCKET/process_iot.py \
--project=$PROJECT_ID \
--region=$REGION \
--cluster=dataproc-iot \
--properties=spark.jars=$BUCKET/managed-kafka-dependencies/* \
-- $BOOTSTRAP $BUCKET/client.properties $BUCKET
Spark 드라이버 로그가 출력됩니다. 또한 콘솔에 로깅된 이러한 테이블과 GCS 버킷에 저장된 데이터를 볼 수 있습니다.
25/06/11 05:16:51 INFO AppInfoParser: Kafka version: 3.7.1
25/06/11 05:16:51 INFO AppInfoParser: Kafka commitId: e2494e6ffb89f828
25/06/11 05:16:51 INFO AppInfoParser: Kafka startTimeMs: 1749619011472
Processing initial batch for avg temp: batch 0, size: 200
Processing initial batch for printing first 20 rows: batch 0, size: 200
+---------+--------------------+----------------+-------------------------+--------------------+--------------------------------------------+
|device_id|timestamp |location |sensor_data |device_status |metadata |
+---------+--------------------+----------------+-------------------------+--------------------+--------------------------------------------+
|sensor-6 |2025-03-01T00:03:00Z|{33.42, -119.66}|{20.9, 50.7, 1003.2, 525}|{70, -41, Wi-Fi} |{environmental, Celsius, %, hPa, lux, 3.6.0}|
|sensor-7 |2025-03-01T00:03:00Z|{33.32, -119.4} |{20.1, 51.0, 1000.9, 611}|{47, -98, LoRaWAN} |{environmental, Celsius, %, hPa, lux, 2.4.5}|
|sensor-8 |2025-03-01T00:03:00Z|{33.46, -119.74}|{20.8, 54.3, 1001.3, 734}|{70, -45, LoRaWAN} |{environmental, Celsius, %, hPa, lux, 1.1.6}|
|sensor-9 |2025-03-01T00:03:00Z|{33.04, -119.38}|{20.9, 54.5, 1002.9, 956}|{91, -99, Cellular} |{environmental, Celsius, %, hPa, lux, 2.7.4}|
|sensor-0 |2025-03-01T00:03:10Z|{33.22, -119.56}|{20.4, 53.0, 1000.2, 239}|{41, -95, LoRaWAN} |{environmental, Celsius, %, hPa, lux, 2.1.3}|
|sensor-1 |2025-03-01T00:03:10Z|{33.62, -119.4} |{20.0, 53.8, 1000.2, 907}|{97, -84, Wi-Fi} |{environmental, Celsius, %, hPa, lux, 3.3.1}|
|sensor-2 |2025-03-01T00:03:10Z|{33.56, -119.66}|{20.1, 51.6, 1004.8, 824}|{37, -36, Wi-Fi} |{environmental, Celsius, %, hPa, lux, 2.5.1}|
|sensor-3 |2025-03-01T00:03:10Z|{33.32, -120.0} |{20.6, 52.5, 1004.1, 557}|{96, -70, Cellular} |{environmental, Celsius, %, hPa, lux, 2.9.4}|
|sensor-4 |2025-03-01T00:03:10Z|{33.48, -119.36}|{20.2, 53.7, 1002.8, 818}|{22, -76, LoRaWAN} |{environmental, Celsius, %, hPa, lux, 1.1.2}|
|sensor-5 |2025-03-01T00:03:10Z|{33.48, -119.42}|{20.7, 51.5, 1002.7, 310}|{92, -44, Wi-Fi} |{environmental, Celsius, %, hPa, lux, 3.5.9}|
|sensor-6 |2025-03-01T00:03:10Z|{33.02, -119.8} |{20.4, 50.4, 1001.0, 190}|{36, -90, Cellular} |{environmental, Celsius, %, hPa, lux, 1.4.5}|
|sensor-7 |2025-03-01T00:03:10Z|{33.16, -119.72}|{20.8, 54.2, 1004.9, 509}|{19, -38, Wi-Fi} |{environmental, Celsius, %, hPa, lux, 2.9.8}|
|sensor-8 |2025-03-01T00:03:10Z|{33.52, -119.9} |{20.0, 54.5, 1003.8, 10} |{29, -31, Wi-Fi} |{environmental, Celsius, %, hPa, lux, 1.0.5}|
|sensor-9 |2025-03-01T00:03:10Z|{33.64, -119.64}|{20.4, 52.4, 1003.7, 246}|{53, -64, Wi-Fi} |{environmental, Celsius, %, hPa, lux, 1.7.3}|
|sensor-8 |2025-03-01T00:00:40Z|{33.46, -119.42}|{20.0, 52.8, 1003.1, 311}|{85, -67, Cellular} |{environmental, Celsius, %, hPa, lux, 2.2.9}|
|sensor-9 |2025-03-01T00:00:40Z|{33.62, -119.98}|{20.0, 53.5, 1004.1, 502}|{22, -26, LoRaWAN} |{environmental, Celsius, %, hPa, lux, 1.1.3}|
|sensor-0 |2025-03-01T00:00:50Z|{33.0, -119.38} |{20.1, 53.1, 1003.2, 500}|{49, -84, LoRaWAN} |{environmental, Celsius, %, hPa, lux, 3.0.2}|
|sensor-1 |2025-03-01T00:00:50Z|{33.46, -119.48}|{20.1, 51.6, 1001.3, 982}|{52, -86, Wi-Fi} |{environmental, Celsius, %, hPa, lux, 3.0.7}|
|sensor-2 |2025-03-01T00:00:50Z|{33.56, -119.74}|{20.3, 52.9, 1004.2, 367}|{29, -100, Cellular}|{environmental, Celsius, %, hPa, lux, 3.2.6}|
|sensor-3 |2025-03-01T00:00:50Z|{33.54, -119.98}|{20.5, 51.2, 1004.2, 657}|{79, -50, LoRaWAN} |{environmental, Celsius, %, hPa, lux, 1.0.0}|
+---------+--------------------+----------------+-------------------------+--------------------+--------------------------------------------+
25/06/11 05:17:10 INFO AppInfoParser: App info kafka.admin.client for adminclient-2 unregistered
25/06/11 05:17:10 INFO Metrics: Metrics scheduler closed
25/06/11 05:17:10 INFO Metrics: Closing reporter org.apache.kafka.common.metrics.JmxReporter
25/06/11 05:17:10 INFO Metrics: Metrics reporters closed
+---------+-------------------+
|device_id|average_temperature|
+---------+-------------------+
|sensor-0 |20.45999994277954 |
|sensor-1 |20.475 |
|sensor-2 |20.475 |
|sensor-3 |20.405000305175783 |
|sensor-4 |20.42000017166138 |
|sensor-5 |20.464999961853028 |
|sensor-6 |20.579999923706055 |
|sensor-7 |20.544999885559083 |
|sensor-8 |20.41999969482422 |
|sensor-9 |20.405000019073487 |
+---------+-------------------+
8. 삭제
Codelab을 완료한 후 단계에 따라 리소스를 삭제합니다.
- 관리형 Kafka 클러스터, 게시자 GCE VM, Dataproc 클러스터를 삭제합니다.
gcloud managed-kafka clusters delete kafka-iot --project=$PROJECT_ID --location=$REGION
gcloud compute instances delete publisher-instance --project=$PROJECT_ID --zone=$ZONE
gcloud dataproc clusters delete dataproc-iot --project=$PROJECT_ID --region=$REGION
- VPC 서브네트워크 및 네트워크를 삭제합니다.
gcloud compute networks subnets delete $SUBNET --region=$REGION
gcloud compute networks delete <network-name>
- 더 이상 데이터를 사용하지 않으려면 GCS 버킷을 삭제합니다.
gcloud storage rm --recursive $BUCKET
9. 축하합니다
축하합니다. Manage Kafka 및 Dataproc를 사용하여 IoT 데이터 처리 파이프라인을 빌드하여 DemoIOT Solutions에서 기기에서 게시된 데이터에 대한 실시간 통계를 얻을 수 있도록 했습니다.
관리형 Kafka 클러스터를 만들고 IoT 이벤트를 클러스터에 게시한 후 Spark 스트리밍을 사용하여 이러한 이벤트를 실시간으로 처리하는 Dataproc 작업을 실행했습니다. 이제 관리형 Kafka 및 Dataproc을 사용하여 데이터 파이프라인을 만드는 데 필요한 주요 단계를 알게 되었습니다.