Way Back Home - Google ADK, A2A, Kafka를 사용한 이벤트 기반 아키텍처

1. 미션

스토리

미지의 섹터의 침묵 속에서 표류하고 있습니다. 거대한 태양 펄스가 균열을 통해 함선을 찢어버려 별 지도에 존재하지 않는 우주의 한 구석에 고립되었습니다.

며칠간의 힘든 수리 끝에 드디어 발밑에서 엔진의 웅웅거리는 소리가 들립니다. 로켓이 수정되었습니다. 모선에 장거리 업링크를 확보하기까지 했습니다. 출발 허가가 나왔습니다. 이제 집으로 돌아가도 됩니다.

하지만 점프 드라이브를 작동시키려고 할 때 조난 신호가 정적을 뚫고 들어옵니다. 센서가 도움 요청 신호를 감지합니다. X-42 행성 표면에 민간인 5명이 갇혀 있습니다. 탈출할 수 있는 유일한 방법은 궤도에 있는 모선에 조난 신호를 전송하기 위해 동기화해야 하는 15개의 고대 포드에 의존하는 것입니다.

하지만 포드는 기본 탐색 컴퓨터가 손상된 위성 기지국에 의해 제어됩니다. 포드가 목적 없이 떠다니고 있습니다. 위성에 백도어 연결을 설정했지만 업링크에 심각한 성간 간섭이 발생하여 요청-응답 주기에 엄청난 지연 시간이 발생합니다.

과제

요청/응답 모델은 너무 느리므로 서버 전송 이벤트 (SSE)가 있는 이벤트 기반 아키텍처 (EDA)를 배포하여 노이즈를 통해 원격 분석을 스트리밍해야 합니다.

임무

포드를 특정 신호 증폭 포메이션 (원, 별, 선)으로 강제하는 데 필요한 복잡한 벡터 수학을 계산할 수 있는 맞춤 에이전트를 빌드해야 합니다. 이 에이전트를 위성의 새 아키텍처에 연결해야 합니다.

빌드 대상

개요

  • 15개 포드의 함대를 실시간으로 시각화하고 명령하는 React 기반 헤드업 디스플레이 (HUD)
  • 자연어 명령에 따라 포드의 복잡한 기하학적 형성을 계산하는 Google 에이전트 개발 키트 (ADK)를 사용하는 생성형 AI 에이전트
  • 중앙 허브 역할을 하며 서버 전송 이벤트 (SSE)를 통해 프런트엔드와 통신하는 Python 기반 위성 방송국 백엔드
  • Apache Kafka를 사용하여 AI 에이전트와 위성 제어 시스템을 분리하여 복원력 있는 비동기 통신을 지원하는 이벤트 기반 아키텍처

학습할 내용

기술 / 개념

설명

Google ADK (에이전트 개발 키트)

이 프레임워크를 사용하여 Gemini 모델로 구동되는 전문 AI 에이전트를 빌드, 테스트, 스캐폴딩합니다.

이벤트 기반 아키텍처 (EDA)

구성요소가 이벤트를 통해 비동기적으로 통신하여 애플리케이션의 복원력과 확장성을 높이는 분리된 시스템을 빌드하는 원리를 배우게 됩니다.

Apache Kafka

Kafka를 분산 이벤트 스트리밍 플랫폼으로 설정하고 사용하여 다양한 마이크로서비스 간의 명령어와 데이터 흐름을 관리합니다.

서버 전송 이벤트 (SSE)

FastAPI 백엔드에서 SSE를 구현하여 서버에서 React 프런트엔드로 실시간 원격 분석 데이터를 푸시하고 UI를 지속적으로 업데이트합니다.

A2A (에이전트 간) 프로토콜

더 큰 에이전트 생태계 내에서 표준화된 통신과 상호 운용성을 지원하는 A2A 서버로 에이전트를 래핑하는 방법을 알아봅니다.

FastAPI

이 고성능 Python 웹 프레임워크를 사용하여 핵심 백엔드 서비스인 위성 기지를 빌드합니다.

React

SSE 스트림을 구독하는 최신 프런트엔드 애플리케이션을 사용하여 동적이고 대화형 사용자 인터페이스를 만듭니다.

시스템 제어의 생성형 AI

대규모 언어 모델(LLM)이 대화형 채팅뿐만 아니라 좌표 생성과 같은 구체적인 데이터 중심 작업을 수행하도록 프롬프트할 수 있는 방법을 알아봅니다.

2. 환경 설정

Cloud Shell 액세스

👉Google Cloud 콘솔 상단에서 Cloud Shell 활성화를 클릭합니다 (Cloud Shell 창 상단의 터미널 모양 아이콘). cloud-shell.png

👉'편집기 열기' 버튼 (연필이 있는 열린 폴더 모양)을 클릭합니다. 그러면 창에 Cloud Shell 코드 편집기가 열립니다. 왼쪽에 파일 탐색기가 표시됩니다. open-editor.png

👉클라우드 IDE에서 터미널을 열고

03-05-new-terminal.png

👉💻 터미널에서 다음 명령어를 사용하여 이미 인증되었는지, 프로젝트가 프로젝트 ID로 설정되었는지 확인합니다.

gcloud auth list

계정이 (ACTIVE)로 표시됩니다.

기본 요건

ℹ️ 레벨 0은 선택사항이지만 권장됩니다.

레벨 0 없이 이 미션을 완료할 수 있지만 먼저 완료하면 더 몰입감 있는 환경을 경험할 수 있으며, 진행하면서 전 세계 지도에서 내 비컨이 켜지는 것을 확인할 수 있습니다.

프로젝트 환경 설정

터미널로 돌아가 활성 프로젝트를 설정하고 필요한 Google Cloud 서비스 (Cloud Run, Vertex AI 등)를 사용 설정하여 구성을 완료합니다.

👉💻 터미널에서 프로젝트 ID를 설정합니다.

gcloud config set project $(cat ~/project_id.txt) --quiet

👉💻 필수 서비스 사용 설정:

gcloud services enable  compute.googleapis.com \
                        artifactregistry.googleapis.com \
                        run.googleapis.com \
                        cloudbuild.googleapis.com \
                        iam.googleapis.com \
                        aiplatform.googleapis.com \
                        cloudresourcemanager.googleapis.com

종속 항목 설치

👉💻 5단계로 이동하여 필요한 Python 패키지를 설치합니다.

cd $HOME/way-back-home/level_5
uv sync

주요 종속 항목은 다음과 같습니다.

패키지

목적

fastapi

위성 방송국 및 SSE 스트리밍을 위한 고성능 웹 프레임워크

uvicorn

FastAPI 애플리케이션을 실행하는 데 필요한 ASGI 서버

google-adk

Formation Agent를 빌드하는 데 사용된 에이전트 개발 키트

a2a-sdk

표준화된 통신을 위한 에이전트 간 프로토콜 라이브러리

aiokafka

이벤트 루프용 비동기 Kafka 클라이언트

google-genai

Gemini 모델에 액세스하기 위한 네이티브 클라이언트

numpy

시뮬레이션의 벡터 수학 및 좌표 계산

websockets

실시간 양방향 통신 지원

python-dotenv

환경 변수 및 구성 보안 비밀을 관리합니다.

sse-starlette

서버 전송 이벤트 (SSE)의 효율적인 처리

requests

외부 API 호출을 위한 간단한 HTTP 라이브러리

설정 확인

코드를 살펴보기 전에 모든 시스템이 정상인지 확인해 보겠습니다. 확인 스크립트를 실행하여 Google Cloud 프로젝트, API, Python 종속 항목을 감사합니다.

👉💻 확인 스크립트 실행:

source $HOME/way-back-home/.venv/bin/activate
cd $HOME/way-back-home/level_5/scripts
chmod +x verify_setup.sh
. verify_setup.sh

👀 일련의 녹색 체크표시 (✅)가 표시됩니다.

  • 빨간색 십자 (❌)가 표시되면 출력에 제안된 수정 명령어 (예: gcloud services enable ... 또는 pip install ...).
  • 참고: 현재는 .env에 대한 노란색 경고가 표시되어도 괜찮습니다. 다음 단계에서 해당 파일을 만듭니다.
🚀 Verifying Mission Charlie (Level 5) Infrastructure...

✅ Google Cloud Project: xxxxxx
✅ Cloud APIs: Active
✅ Python Environment: Ready

🎉 SYSTEMS ONLINE. READY FOR MISSION.

3. LLM으로 포드 위치 형식 지정

구조 작전의 '두뇌'를 만들어야 합니다. Google ADK (에이전트 개발 키트)를 사용하여 생성된 에이전트입니다. 이 모델의 유일한 목적은 전문적인 기하학적 탐색기 역할을 하는 것입니다. 표준 LLM은 채팅을 좋아하지만 심우주에서는 대화가 아닌 데이터가 필요합니다. 이 에이전트가 '별'과 같은 명령어를 받아 15개 포드의 원시 JSON 좌표를 반환하도록 프로그래밍할 것입니다.

에이전트

에이전트 스캐폴딩

👉💻 다음 명령어를 실행하여 에이전트 디렉터리로 이동하고 ADK 생성 마법사를 시작합니다.

cd $HOME/way-back-home/level_5/agent
uv run adk create formation

CLI에서 대화형 설정 마법사가 실행됩니다. 다음 대답을 사용하여 에이전트를 구성하세요.

  1. 모델 선택: 옵션 1 (Gemini Flash)을 선택합니다.
    • 참고: 구체적인 버전은 다를 수 있습니다. 속도를 위해 항상 'Flash' 변형을 선택하세요.
  2. 백엔드 선택: 옵션 2 (Vertex AI)를 선택합니다.
  3. Google Cloud 프로젝트 ID 입력: Enter 키를 눌러 기본값 (환경에서 감지됨)을 수락합니다.
  4. Google Cloud 리전 입력: Enter 키를 눌러 기본값 (us-central1)을 수락합니다.

👀 터미널 상호작용은 다음과 유사해야 합니다.

(way-back-home) user@cloudshell:~/way-back-home/level_5/agent$ adk create formation

Choose a model for the root agent:
1. gemini-2.5-flash
2. Other models (fill later)
Choose model (1, 2): 1

1. Google AI
2. Vertex AI
Choose a backend (1, 2): 2

You need an existing Google Cloud account and project...
Enter Google Cloud project ID [your-project-id]: <PRESS ENTER>
Enter Google Cloud region [us-central1]: <PRESS ENTER>

Agent created in /home/user/way-back-home/level_5/agent/formation:
- .env
- __init__.py
- agent.py

Agent created 성공 메시지가 표시됩니다. 이렇게 하면 이제 수정할 골격 코드가 생성됩니다.

👉✏️ 편집기에서 새로 만든 $HOME/way-back-home/level_5/agent/formation/agent.py 파일로 이동하여 엽니다. 파일의 전체 콘텐츠를 아래 코드로 바꿉니다. 이렇게 하면 에이전트 이름이 업데이트되고 엄격한 운영 매개변수가 제공됩니다.

import os
from google.adk.agents import Agent

root_agent = Agent(
    name="formation_agent",
    model="gemini-2.5-flash",
    instruction="""
    You are the **Formation Controller AI**.
    Your strict objective is to calculate X,Y coordinates for a fleet of **15 Drones** based on a requested geometric shape.

    ### FIELD SPECIFICATIONS
    - **Canvas Size**: 800px (width) x 600px (height).
    - **Safe Margin**: Keep pods at least 50px away from edges (x: 50-750, y: 50-550).
    - **Center Point**: x=400, y=300 (Use this as the origin for shapes).
    - **Top Menu Avoidance**: Do NOT place pods in the top 100px (y < 100) to avoid UI overlap.

    ### FORMATION RULES
    When given a formation name, output coordinates for exactly 15 pods (IDs 0-14).
    1.  **CIRCLE**: Evenly spaced around a center point (R=200).
    2.  **STAR**: 5 points or a star-like distribution.
    3.  **X**: A large X crossing the screen.
    4.  **LINE**: A horizontal line across the middle.
    5.  **PARABOLA**: A U-shape opening UPWARDS. Center it at y=400, opening up to y=100. IMPORTANT: Lowest point must be at bottom (high Y value), opening up (low Y value). Screen coordinates have (0,0) at the TOP-LEFT. The vertex should be at the BOTTOM (e.g., y=500), with arms reaching up to y=200.
    6.  **RANDOM**: Scatter randomly within safe bounds.
    7.  **CUSTOM**: If the user inputs something else (e.g., "SMILEY", "TRIANGLE"), do your best to approximate it geometrically.

    ### OUTPUT FORMAT
    You MUST output **ONLY VALID JSON**. No markdown fencing, no preamble, no commentary.
    Refuse to answer non-formation questions.

    **JSON Structure**:
    ```json
    [
        {"x": 400, "y": 300},
        {"x": 420, "y": 300},
        ... (15 total items)
    ]
    ```
    """
)
  • 기하학적 정밀도: 시스템 프롬프트에서 '캔버스 크기'와 '안전 여백'을 정의하여 에이전트가 화면 밖이나 UI 요소 아래에 포드를 배치하지 않도록 합니다.
  • JSON 시행: LLM에 '형성되지 않은 질문에 대한 답변 거부' 및 '서문 없음'을 제공하도록 지시하면 다운스트림 코드 (위성)가 응답을 파싱하려고 할 때 비정상 종료되지 않습니다.
  • 분리된 로직: 이 에이전트는 아직 Kafka에 대해 알지 못합니다. 수학만 할 수 있습니다. 다음 단계에서는 이 '브레인'을 Kafka 서버로 래핑합니다.

로컬에서 에이전트 테스트

에이전트를 Kafka '신경계'에 연결하기 전에 에이전트가 올바르게 작동하는지 확인해야 합니다. 터미널에서 에이전트와 직접 상호작용하여 유효한 JSON 좌표를 생성하는지 확인할 수 있습니다.

👉💻 adk run 명령어를 사용하여 에이전트와 채팅 세션을 시작합니다.

cd $HOME/way-back-home/level_5/agent
uv run adk run formation
  1. 입력: Circle를 입력하고 Enter 키를 누릅니다.
    • 성공 기준: 원시 JSON 목록 (예: [{"x": 400, "y": 200}, ...]). JSON 앞에 '좌표는 다음과 같습니다.'와 같은 마크다운 텍스트가 없는지 확인합니다.
  2. 입력: Line를 입력하고 Enter 키를 누릅니다.
    • 성공 기준: 좌표가 수평선을 만드는지 확인합니다 (y 값이 유사해야 함).

에이전트가 정리된 JSON을 출력하는지 확인하면 Kafka 서버에 래핑할 수 있습니다.

👉💻 종료하려면 Ctrl+C을 누르세요.

4. Formation Agent용 A2A 서버 만들기

A2A (Agent-to-Agent) 이해

A2A (Agent-to-Agent) 프로토콜은 AI 에이전트 간의 원활한 상호 운용성을 지원하도록 설계된 개방형 표준입니다. 이 프레임워크를 통해 에이전트는 단순한 텍스트 교환을 넘어 작업을 위임하고, 복잡한 작업을 조율하고, 분산된 생태계에서 공동의 목표를 달성하기 위해 일관된 단위로 기능할 수 있습니다.

A2A

A2A 전송 이해: HTTP, gRPC, Kafka

A2A 프로토콜은 클라이언트와 에이전트가 통신할 수 있는 두 가지 서로 다른 방법을 제공하며, 각 방법은 서로 다른 아키텍처 요구사항을 충족합니다. HTTP (JSON-RPC)는 모든 웹 환경에서 범용으로 작동하는 기본 표준입니다. gRPC는 효율적이고 엄격한 유형의 통신을 위해 프로토콜 버퍼를 활용하는 고성능 옵션입니다. 실습에서는 Kafka 트랜스포트도 제공합니다. 시스템 분리가 우선인 강력한 이벤트 기반 아키텍처를 위해 설계된 맞춤 구현입니다.

전송

내부적으로 이러한 전송은 데이터 흐름을 매우 다르게 처리합니다. HTTP 모델에서 클라이언트는 JSON 요청을 전송하고 연결을 열어 둔 채 에이전트가 작업을 완료하고 한 번에 전체 결과를 반환할 때까지 기다립니다. gRPC는 바이너리 데이터와 HTTP/2를 사용하여 이를 최적화하므로 간단한 요청-응답 주기와 에이전트가 업데이트('생각' 또는 '아티팩트 생성됨' 등)를 발생하는 대로 전송하는 실시간 스트리밍이 모두 가능합니다. Kafka 구현은 비동기식으로 작동합니다. 클라이언트는 내구성이 뛰어난 '요청 주제'에 요청을 게시하고 별도의 '답장 주제'에서 수신 대기합니다. 서버는 가능한 경우 메시지를 가져와 처리하고 결과를 다시 게시합니다. 즉, 두 항목은 서로 직접 통신하지 않습니다.

선택은 속도, 복잡성, 지속성에 대한 구체적인 요구사항에 따라 달라집니다. HTTP는 시작하고 디버그하기 가장 쉬우므로 간단한 통합에 적합합니다. gRPC는 지연 시간이 짧고 스트리밍 작업 업데이트가 중요한 내부 서비스 간 통신에 적합합니다. 하지만 Kafka는 요청을 디스크의 대기열에 저장하므로 에이전트 서버가 비정상 종료되거나 다시 시작되더라도 작업이 유지되어 HTTP나 gRPC에서는 제공할 수 없는 수준의 내구성과 분리를 제공하므로 복원력이 있는 선택입니다.

맞춤 전송 계층: Kafka

Kafka는 작업의 브레인 (Formation Agent)을 물리적 제어 (위성 스테이션)에서 분리하는 비동기 백본 역할을 합니다. 에이전트가 복잡한 벡터를 계산하는 동안 시스템이 동기 연결을 기다리도록 강제하는 대신 에이전트는 결과를 Kafka 주제에 이벤트로 게시합니다. 이는 지속적인 버퍼 역할을 하여 위성이 자체 속도로 명령어를 사용할 수 있도록 하고, 네트워크 지연 시간이 길거나 일시적인 시스템 비정상 종료가 발생하더라도 포메이션 데이터가 손실되지 않도록 합니다.

Kafka를 사용하면 느린 선형 프로세스를 탄력적인 스트리밍 파이프라인으로 변환하여 명령과 원격 분석이 독립적으로 흐르므로 집중적인 AI 처리 중에도 미션의 HUD가 응답성을 유지합니다.

Kafka

Kafka란 무엇인가요?

Kafka는 분산 이벤트 스트리밍 플랫폼입니다. 이벤트 기반 아키텍처 (EDA)에서 다음이 수행됩니다.

  1. 프로듀서는 '주제'에 메시지를 게시합니다.
  2. 소비자는 이러한 주제를 구독하고 메시지가 도착하면 반응합니다.

Kafka를 사용해야 하는 이유

시스템을 분리합니다. Formation Agent는 발신자의 신원이나 상태를 알 필요 없이 수신되는 요청을 기다리면서 자율적으로 작동합니다. 이렇게 하면 책임이 분리되어 위성이 오프라인 상태가 되더라도 워크플로가 그대로 유지됩니다. Kafka는 위성이 다시 연결될 때까지 메시지를 저장하기만 합니다.

Google Cloud Pub/Sub은 어떤가요?

Google Cloud Pub/Sub를 사용하면 됩니다. Pub/Sub는 Google의 서버리스 메시징 서비스입니다. Kafka는 높은 처리량과 '재생 가능한' 스트림에 적합하지만 Pub/Sub는 사용 편의성으로 인해 선호되는 경우가 많습니다. 이 실습에서는 Kafka를 사용하여 강력한 영구 메시지 버스를 시뮬레이션합니다.

로컬 Kafka 클러스터 시작

아래 전체 명령어 블록을 복사하여 터미널에 붙여넣습니다. 이렇게 하면 공식 Kafka 이미지가 다운로드되고 백그라운드에서 시작됩니다.

👉💻 터미널에서 다음 명령어를 실행합니다.

# Navigate to the correct mission directory first
cd $HOME/way-back-home/level_5

# Run the Kafka container in detached mode
docker run -d \
  --name mission-kafka \
  -p 9092:9092 \
  -e KAFKA_PROCESS_ROLES='broker,controller' \
  -e KAFKA_NODE_ID=1 \
  -e KAFKA_CONTROLLER_LISTENER_NAMES=CONTROLLER \
  -e KAFKA_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093 \
  -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092 \
  -e KAFKA_CONTROLLER_QUORUM_VOTERS=1@127.0.0.1:9093 \
  -e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \
  apache/kafka:4.2.0-rc1

👉💻 docker ps 명령어를 사용하여 컨테이너가 실행 중인지 확인합니다.

docker ps

👀 mission-kafka 컨테이너가 실행 중이고 포트 9092이 노출되었음을 확인하는 출력이 표시됩니다.

CONTAINER ID   IMAGE                  COMMAND                  CREATED          STATUS          PORTS                               NAMES
c1a2b3c4d5e6   apache/kafka:4.2.0-rc1    "/opt/kafka/bin/kafka..."   15 seconds ago   Up 14 seconds   0.0.0.0:9092->9092/tcp, 9093/tcp   mission-kafka

Kafka 주제란 무엇인가요?

Kafka 주제는 메시지를 위한 전용 채널 또는 카테고리라고 생각하면 됩니다. 이벤트 레코드가 생성된 순서대로 저장되는 로그북과 같습니다. 생산자는 특정 주제에 메시지를 쓰고 소비자는 해당 주제에서 읽습니다. 이렇게 하면 발신자와 수신자가 분리됩니다. 생산자는 어떤 소비자가 데이터를 읽을지 알 필요가 없고 올바른 '채널'로만 전송하면 됩니다. 미션에서는 두 가지 주제를 만듭니다. 하나는 에이전트에 형성 요청을 보내는 것이고 다른 하나는 에이전트가 위성이 읽을 수 있도록 대답을 게시하는 것입니다.

Kafka

👉💻 다음 명령어를 실행하여 실행 중인 Docker 컨테이너 내에 필요한 주제를 만듭니다.

# Create the topic for formation requests
docker exec mission-kafka /opt/kafka/bin/kafka-topics.sh \
  --create \
  --topic a2a-formation-request \
  --bootstrap-server 127.0.0.1:9092

# Create the topic where the satellite dashboard will listen for replies
docker exec mission-kafka /opt/kafka/bin/kafka-topics.sh \
  --create \
  --topic a2a-reply-satellite-dashboard \
  --bootstrap-server 127.0.0.1:9092

👉💻 채널이 열려 있는지 확인하려면 목록 명령어를 실행합니다.

docker exec mission-kafka /opt/kafka/bin/kafka-topics.sh \
  --list \
  --bootstrap-server 127.0.0.1:9092

👀 방금 만든 주제의 이름이 표시됩니다.

a2a-formation-request
a2a-reply-satellite-dashboard

이제 Kafka 인스턴스가 완전히 구성되어 미션 크리티컬 데이터를 라우팅할 준비가 되었습니다.

Kafka A2A 서버 구현

에이전트 간 (A2A) 프로토콜은 독립적인 에이전트 시스템 간의 상호 운용성을 위한 표준화된 프레임워크를 수립합니다. 이를 통해 다양한 팀에서 개발하거나 다양한 인프라에서 실행되는 에이전트가 모든 연결에 맞춤 통합 로직이 필요하지 않고 서로를 검색하고 효과적으로 공동작업할 수 있습니다.

참조 구현인 a2a-python은 이러한 에이전트 애플리케이션을 실행하기 위한 기본 라이브러리입니다. 설계의 핵심 기능은 확장성입니다. 통신 레이어를 추상화하여 개발자가 HTTP와 같은 프로토콜을 다른 프로토콜로 바꿀 수 있습니다.

A2A 흐름

이 프로젝트에서는 맞춤 Kafka 구현인 a2a-python-kafka를 사용하여 이 확장성을 활용합니다. 이 구현을 사용하여 A2A 표준을 통해 다양한 아키텍처 요구사항에 맞게 에이전트 커뮤니케이션을 조정하는 방법을 보여줍니다. 이 경우 동기 HTTP를 비동기 이벤트 버스로 바꿉니다.

Formation Agent에 A2A 사용 설정

이제 에이전트를 A2A 서버로 래핑하여 다음을 수행할 수 있는 상호 운용 가능한 서비스로 전환합니다.

  • Kafka 주제에서 작업을 수신 대기합니다.
  • 수신된 작업을 처리할 기본 ADK 에이전트에게 전달합니다.
  • 결과를 응답 주제에 게시합니다.

👉✏️ $HOME/way-back-home/level_5/agent/agent_to_kafka_a2a.py에서 #REPLACE-CREATE-KAFKA-A2A-SERVER를 다음 코드로 바꿉니다.

async def create_kafka_server(
    agent: BaseAgent,
    *,
    bootstrap_servers: str | List[str] = "localhost:9092",
    request_topic: str = "a2a-formation-request",
    consumer_group_id: str = "a2a-agent-group",
    agent_card: Optional[Union[AgentCard, str]] = None,
    runner: Optional[Runner] = None,
    **kafka_config: Any,
) -> KafkaServerApp:
  """Convert an ADK agent to a A2A Kafka Server application.
  Args:
      agent: The ADK agent to convert
      bootstrap_servers: Kafka bootstrap servers.
      request_topic: Topic to consume requests from.
      consumer_group_id: Consumer group ID for the server.
      agent_card: Optional pre-built AgentCard object or path to agent card
                  JSON. If not provided, will be built automatically from the
                  agent.
      runner: Optional pre-built Runner object. If not provided, a default
              runner will be created using in-memory services.
      **kafka_config: Additional Kafka configuration.

  Returns:
      A KafkaServerApp that can be run with .run() or .start()
  """
  # Set up ADK logging
  adk_logger = logging.getLogger("google_adk")
  adk_logger.setLevel(logging.INFO)

  async def create_runner() -> Runner:
    """Create a runner for the agent."""
    return Runner(
        app_name=agent.name or "adk_agent",
        agent=agent,
        # Use minimal services - in a real implementation these could be configured
        artifact_service=InMemoryArtifactService(),
        session_service=InMemorySessionService(),
        memory_service=InMemoryMemoryService(),
        credential_service=InMemoryCredentialService(),
    )

  # Create A2A components
  task_store = InMemoryTaskStore()

  agent_executor = A2aAgentExecutor(
      runner=runner or create_runner,
  )
  
  # Initialize logic handler
  from a2a.server.request_handlers.default_request_handler import DefaultRequestHandler
  
  logic_handler = DefaultRequestHandler(
      agent_executor=agent_executor, task_store=task_store
  )

  # Prepare Agent Card
  rpc_url = f"kafka://{bootstrap_servers}/{request_topic}"
      
  # Create Kafka Server App
  server_app = KafkaServerApp(
      request_handler=logic_handler,
      bootstrap_servers=bootstrap_servers,
      request_topic=request_topic,
      consumer_group_id=consumer_group_id,
      **kafka_config
  )
  
  return server_app

이 코드는 주요 구성요소를 설정합니다.

  1. 러너: 에이전트의 런타임을 제공합니다 (메모리, 사용자 인증 정보 등을 처리).
  2. 작업 저장소: 요청이 '대기 중'에서 '완료됨'으로 이동할 때 요청의 상태를 추적합니다.
  3. 에이전트 실행기: Kafka에서 작업을 가져와 에이전트에 전달하여 좌표를 계산합니다.
  4. KafkaServerApp: Kafka 브로커에 대한 실제 연결을 관리합니다.

A2A Kafka

환경 변수 구성

ADK 설정으로 에이전트 폴더 내에 Google Vertex AI 설정이 포함된 .env 파일이 생성되었습니다. 이를 프로젝트 루트로 이동하고 Kafka 클러스터의 좌표를 추가해야 합니다.

다음 명령어를 실행하여 파일을 복사하고 Kafka 서버 주소를 추가합니다.

cd $HOME/way-back-home/level_5
# 1. Copy the API keys from the agent folder to the project root
cp agent/formation/.env .env

# 2. Append the Kafka Bootstrap Server address to the file
echo -e "\nKAFKA_BOOTSTRAP_SERVERS=localhost:9092" >> .env

# 3. Verify the file content
echo "✅ Environment configured. Here are the last few lines:"
tail .env

A2A Interstellar Loop 확인

이제 Kafka 클러스터를 통해 수동 신호를 전송하고 에이전트의 응답을 확인하여 비동기 이벤트 루프가 올바르게 작동하는지 확인합니다.

A2A Interstellar Loop 확인

이벤트의 전체 수명 주기를 확인하기 위해 3개의 개별 터미널을 사용합니다.

터미널 A: Formation Agent (A2A Kafka 서버)

👉💻 이 터미널은 Kafka를 수신하고 Gemini를 사용하여 기하학적 수학을 실행하는 Python 프로세스를 실행합니다.

cd $HOME/way-back-home/level_5
source $HOME/way-back-home/.venv/bin/activate
. scripts/check_kafka.sh 

# Install the custom Kafka-enabled A2A library
uv pip install git+https://github.com/weimeilin79/a2a-python-kafka.git

# Start the Agent Server
uv run agent/server.py

다음이 표시될 때까지 기다립니다.

[INFO] Kafka Server App Started. Starting to consume requests...

터미널 B: 위성 리스너 (소비자)

👉💻 이 터미널에서 reply topic을 수신합니다. 이는 위성이 명령을 기다리는 상황을 시뮬레이션합니다.

# Listen for the AI's response on the satellite channel
docker exec mission-kafka /opt/kafka/bin/kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 \
  --topic a2a-reply-satellite-dashboard \
  --from-beginning \
  --property "print.headers=true"

이 터미널은 유휴 상태로 표시됩니다. 상담사가 메시지를 게시하기를 기다리고 있습니다.

터미널 C: 사령관의 신호 (생산자)

👉💻 이제 원시 A2A 형식 요청을 a2a-formation-request 주제로 전송합니다. 에이전트가 답변을 전송할 위치를 알 수 있도록 특정 Kafka 헤더를 포함해야 합니다.

echo 'correlation_id=ping-manual-01,reply_topic=a2a-reply-satellite-dashboard|{"method": "message_send", "params": {"message": {"message_id": "msg-001", "role": "user", "parts": [{"text": "STAR"}]}}, "streaming": false, "agent_card": {"name": "DiagnosticTool", "version": "1.0.0"}}' | \
docker exec -i mission-kafka /opt/kafka/bin/kafka-console-producer.sh \
  --bootstrap-server localhost:9092 \
  --topic a2a-formation-request \
  --property "parse.headers=true" \
  --property "headers.key.separator==" \
  --property "headers.delimiter=|"

결과 분석

👀 루프가 성공하면 터미널 B로 전환합니다. 큰 JSON 블록이 즉시 표시됩니다. 전송한 헤더 correlation_id:ping-manual-01로 시작합니다. 그 뒤에 task 객체가 옵니다. 해당 JSON 내의 parts 섹션을 자세히 살펴보면 Gemini가 15개 포드에 대해 계산한 원시 X 및 Y 좌표가 표시됩니다.

{"type": "task", "data": {"artifacts": [{"artifactId": "...", "parts": [{"kind": "text", "text": "```json\n[\n  {\"x\": 400, \"y\": 150},\n  {\"x\": 257, \"y\": 254},\n  {\"x\": 312, \"y\": 421},\n ... \n]\n```"}]}], ...}}

수신기에서 에이전트가 분리되었습니다. 이제 시스템이 완전히 이벤트 기반이므로 요청-응답 지연 시간의 '성간 노이즈'는 더 이상 중요하지 않습니다.

계속하기 전에 백그라운드 프로세스를 중지하여 네트워크 포트를 확보합니다.

👉💻 각 터미널 (A, B, C)에서 다음을 실행합니다.

  • Ctrl + C를 눌러 실행 중인 프로세스를 종료합니다.

5. 위성 스테이션 (A2A Kafka 클라이언트 및 SSE)

이 단계에서는 위성 기지국을 빌드합니다. 이는 Kafka 클러스터와 파일럿의 시각적 디스플레이 (React 프런트엔드) 간의 브리지입니다. 이 서버는 Kafka 클라이언트 (에이전트와 통신)와 SSE 스트리머 (브라우저와 통신) 역할을 모두 합니다.

Kafka 클라이언트란 무엇인가요?

Kafka 클러스터를 라디오 방송국이라고 생각하면 됩니다. Kafka 클라이언트는 라디오 수신기입니다. KafkaClientTransport를 사용하면 애플리케이션에서 다음 작업을 할 수 있습니다.

  1. 메시지 생성: '작업' (예: '별 형성')을 에이전트에게 전달합니다.
  2. 답변 사용: 특정 '답변 주제'를 수신 대기하여 상담사로부터 좌표를 다시 가져옵니다.

1. 연결 초기화

FastAPI의 lifespan 이벤트 핸들러를 사용하여 서버가 부팅될 때 Kafka 연결이 시작되고 종료될 때 완전히 닫히도록 합니다.

👉✏️ $HOME/way-back-home/level_5/satellite/main.py에서 #REPLACE-CONNECT-TO-KAFKA-CLUSTER를 다음 코드로 바꿉니다.

@asynccontextmanager
async def lifespan(app: FastAPI):
    global kafka_transport
    logger.info("Initializing Kafka Client Transport...")
    
    bootstrap_server = os.getenv("KAFKA_BOOTSTRAP_SERVERS")
    request_topic = "a2a-formation-request"
    reply_topic = "a2a-reply-satellite-dashboard"
    
    # Create AgentCard for the Client
    client_card = AgentCard(
        name="SatelliteDashboard",
        description="Satellite Dashboard Client",
        version="1.0.0",
        url="https://example.com/satellite-dashboard",
        capabilities=AgentCapabilities(),
        default_input_modes=["text/plain"],
        default_output_modes=["text/plain"],
        skills=[]
    )
    
    kafka_transport = KafkaClientTransport(
            agent_card=client_card,
            bootstrap_servers=bootstrap_server,
            request_topic=request_topic,
            reply_topic=reply_topic,
    )
    
    try:
        await kafka_transport.start()
        logger.info("Kafka Client Transport Started Successfully.")
    except Exception as e:
        logger.error(f"Failed to start Kafka Client: {e}")
        
    yield
    
    if kafka_transport:
        logger.info("Stopping Kafka Client Transport...")
        await kafka_transport.stop()
        logger.info("Kafka Client Transport Stopped.")

2. 명령어 전송

대시보드에서 버튼을 클릭하면 /formation 엔드포인트가 트리거됩니다. 이 도구는 요청을 공식 A2A Message로 래핑하여 에이전트에 전송하는 생산자 역할을 합니다.

형성

핵심 로직:

  • 비동기 통신: kafka_transport.send_message가 요청을 전송하고 reply_topic에 새 좌표가 도착할 때까지 기다립니다.
  • 대답 파싱: Gemini가 마크다운 블록 내에 좌표를 반환할 수 있습니다 (예: json ...). 아래 코드는 이러한 문자를 삭제하고 문자열을 포인트의 Python 목록으로 변환합니다.

👉✏️ $HOME/way-back-home/level_5/satellite/main.py에서 #REPLACE-FORMATION-REQUEST를 다음 코드로 바꿉니다.

@app.post("/formation")
async def set_formation(req: FormationRequest):
    global FORMATION, PODS
    FORMATION = req.formation
    logger.info(f"Received formation request: {FORMATION}")
    
    if not kafka_transport:
        logger.error("Kafka Transport is not initialized!")
        return {"status": "error", "message": "Backend Not Connected"}
    
    try:
        # Construct A2A Message
        prompt = f"Create a {FORMATION} formation"
        logger.info(f"Sending A2A Message: '{prompt}'")
        
        from a2a.types import TextPart, Part, Role
        import uuid
        
        msg_id = str(uuid.uuid4())
        message_parts = [Part(TextPart(text=prompt))]
        
        msg_obj = Message(
            message_id=msg_id,
            role=Role.user,
            parts=message_parts
        )
        
        message_params = MessageSendParams(
            message=msg_obj
        )
        
        # Send and Wait for Response
        ctx = ClientCallContext()
        ctx.state["kafka_timeout"] = 120.0 # Timeout for GenAI latency
        response = await kafka_transport.send_message(message_params, context=ctx)
        
        logger.info("Received A2A Response.")
        
        content = None
        if isinstance(response, Message):
            content = response.parts[0].root.text if response.parts else None
        elif isinstance(response, Task):
            if response.artifacts and response.artifacts[0].parts:
                content = response.artifacts[0].parts[0].root.text

        if content:
            logger.info(f"Response Content: {content[:100]}...")
            try:
                clean_content = content.replace("```json", "").replace("```", "").strip()
                coords = json.loads(clean_content)
                
                if isinstance(coords, list):
                    logger.info(f"Parsed {len(coords)} coordinates.")
                    for i, pod_target in enumerate(coords):
                        if i < len(PODS):
                            PODS[i]["x"] = pod_target["x"]
                            PODS[i]["y"] = pod_target["y"]
                    return {"status": "success", "formation": FORMATION}
                else:
                    logger.error("Response JSON is not a list.")
            except json.JSONDecodeError as e:
                logger.error(f"Failed to parse Agent JSON response: {e}")
        else:
            logger.error(f"Could not extract content from response type {type(response)}")

    except Exception as e:
        logger.error(f"Error calling agent via Kafka: {e}")
        return {"status": "error", "message": str(e)}

서버 전송 이벤트 (SSE)

표준 API는 '요청-응답' 모델을 사용합니다. HUD의 경우 포드 위치의 '라이브 스트림'이 필요합니다.

SSE의 장점 양방향이고 더 복잡한 WebSocket과 달리 SSE는 서버에서 브라우저로의 간단한 단방향 데이터 스트림을 제공합니다. 대시보드, 주식 티커 또는 성간 원격 측정에 적합합니다.

SSE

코드에서 작동하는 방식: 0.5초마다 15개 포드의 현재 위치를 가져와 업데이트로 브라우저에 '푸시'하는 무한 루프인 event_generator를 만듭니다.

👉✏️ $HOME/way-back-home/level_5/satellite/main.py에서 #REPLACE-SSE-STREAM를 다음 코드로 바꿉니다.

@app.get("/stream")
async def message_stream(request: Request):
    async def event_generator():
        logger.info("New SSE stream connected")
        try:
            while True:
                current_pods = list(PODS) 
                
                # Send updates one by one to simulate low-bandwidth scanning
                for pod in current_pods:
                     payload = {"pod": pod}
                     yield {
                         "event": "pod_update",
                         "data": json.dumps(payload)
                     }
                     await asyncio.sleep(0.02)
                
                # Send formation info occasionally
                yield {
                    "event": "formation_update",
                    "data": json.dumps({"formation": FORMATION})
                }
                
                # Main loop delay
                await asyncio.sleep(0.5)
                
        except asyncio.CancelledError:
             logger.info("SSE stream disconnected (cancelled)")
        except Exception as e:
             logger.error(f"SSE stream error: {e}")
             
    return EventSourceResponse(event_generator())

전체 미션 루프 실행

최종 UI를 출시하기 전에 시스템이 엔드 투 엔드로 작동하는지 확인해 보겠습니다. 에이전트를 수동으로 트리거하고 와이어에서 원시 데이터 페이로드를 확인합니다.

확인

별도의 터미널 탭 3개를 엽니다.

터미널 A: Formation Agent (A2A 서버)

👉💻 작업을 수신 대기하고 기하학적 수학을 실행하는 ADK 에이전트입니다.

cd $HOME/way-back-home/level_5
. scripts/check_kafka.sh 
# Start the Agent Server
uv run agent/server.py

터미널 B: 위성 스테이션 (Kafka 클라이언트)

👉💻 이 FastAPI 서버는 Kafka 응답을 수신하고 이를 라이브 SSE 스트림으로 변환하는 '수신기' 역할을 합니다.

cd $HOME/way-back-home/level_5

# Start the Satellite Station
uv run satellite/main.py

터미널 C: 수동 HUD

형성 명령 전송 (트리거): 👉💻 동일한 터미널 C에서 형성 프로세스를 트리거합니다.

# Trigger the STAR formation via the Satellite's API
curl -X POST http://localhost:8000/formation \
     -H "Content-Type: application/json" \
     -d '{"formation": "STAR"}'

👀 새 좌표가 표시됩니다.

INFO:satellite.main:Received formation request: STAR
INFO:satellite.main:Sending A2A Message: 'Create a STAR formation'
INFO:satellite.main:Received A2A Response.
INFO:satellite.main:Response Content: ```json ...
INFO:satellite.main:Parsed 15 coordinates.

이렇게 하면 위성이 내부 포드 좌표를 업데이트했음을 확인할 수 있습니다.

👉💻 먼저 curl를 사용하여 실시간 원격 분석 스트림을 수신 대기한 다음 포메이션 변경을 트리거합니다.

# Connect to the live telemetry feed.
# You should see 'pod_update' events ticking by.
curl -N http://localhost:8000/stream

👀 curl -N 명령어의 출력을 확인합니다. pod_update 이벤트의 xy 좌표가 대형의 새 위치를 반영하기 시작합니다.

계속하기 전에 실행 중인 모든 프로세스를 중지하여 통신 포트를 확보하세요.

각 터미널 (A, B, C, 트리거 터미널): Ctrl + C를 누릅니다.

6. 구조하기

시스템이 설정되었습니다. 이제 미션을 실현할 차례입니다. 이제 React 기반 헤드업 디스플레이 (HUD)가 실행됩니다. 이 대시보드는 SSE를 통해 위성 스테이션에 연결되어 15개의 포드를 실시간으로 시각화할 수 있습니다.

개요

명령어를 실행하면 함수를 호출하는 것뿐만 아니라 Kafka를 통해 이동하고 AI 에이전트가 처리하며 실시간 원격 분석으로 화면에 다시 스트리밍되는 이벤트가 트리거됩니다.

확인

별도의 터미널 탭 두 개를 엽니다.

터미널 A: Formation Agent (A2A 서버)

👉💻 작업을 수신하고 Gemini를 사용하여 기하학적 수학을 실행하는 ADK 에이전트입니다. 터미널에서 다음을 실행합니다.

cd $HOME/way-back-home/level_5
# Start the Agent Server
uv run agent/server.py

터미널 B: 위성 스테이션 및 시각적 대시보드

👉💻 먼저 프런트엔드 애플리케이션을 빌드합니다.

cd $HOME/way-back-home/level_5/frontend/
npm install
npm run build

👉💻 이제 백엔드 로직과 프런트엔드 UI를 모두 제공하는 FastAPI 서버를 시작합니다.

cd $HOME/way-back-home/level_5
. scripts/check_kafka.sh 
# Start the Satellite Station
uv run satellite/main.py

실행 및 확인

  1. 👉 미리보기 열기: Cloud Shell 툴바에서 웹 미리보기 아이콘을 클릭합니다. 포트 변경을 선택하고 8000으로 설정한 후 변경 및 미리보기를 클릭합니다. 새 브라우저 탭이 열리고 Starfield HUD가 표시됩니다. *웹 미리보기
  2. 👉 원격 분석 스트림 확인:
    • UI가 로드되면 무작위로 흩어져 있는 15개의 포드가 표시됩니다.
    • 포드가 미세하게 깜박이거나 '지터링'되면 SSE 스트림이 활성 상태이고 위성 스테이션이 위치를 성공적으로 브로드캐스트하고 있는 것입니다. 시작
  3. 👉 포메이션 시작: 대시보드에서 'STAR' 버튼을 클릭합니다. 별표
  4. 👀 이벤트 루프 추적: 터미널을 확인하여 아키텍처가 작동하는 모습을 확인하세요.
    • 터미널 B (위성 방송국)에는 Sending A2A Message: 'Create a STAR formation'이 기록됩니다.
    • 터미널 A (Formation Agent)에는 Gemini와 상담할 때 활동이 표시됩니다.
    • 터미널 B (위성 스테이션)Received A2A Response를 로깅하고 좌표를 파싱합니다.
  5. 👀 시각적 확인: 대시보드에서 15개의 포드가 무작위 위치에서 5각별 모양으로 부드럽게 이동하는 것을 확인합니다.
  6. 👉 실험:
    • 3가지 다른 포메이션의 경우 'X' 또는 'LINE'이라고 말하세요. X
    • 맞춤 의도: 수동 입력을 사용하여 '하트' 또는 '삼각형'과 같은 고유한 항목을 입력합니다. 원
    • 생성형 AI를 사용하므로 에이전트가 설명할 수 있는 모든 기하학적 도형의 수학을 계산하려고 시도합니다.

패턴 3개를 만들면 연결이 다시 설정됩니다. 완료

미션 완료

데이터가 중단 없이 노이즈를 통과하면 스트림이 안정화됩니다. 명령에 따라 15개의 고대 포드가 별들 사이에서 동기화된 춤을 추기 시작합니다.

종료

3단계의 힘든 보정 단계를 거치면서 원격 분석이 제자리에 고정되는 것을 확인했습니다. 정렬할 때마다 신호가 강해져 마침내 희망의 봉화처럼 성간 간섭을 뚫고 나갔습니다.

이벤트 기반 에이전트를 능숙하게 구현해 주신 덕분에 생존자 5명이 X-42 표면에서 공중으로 이송되어 현재 구조선에 안전하게 탑승했습니다. 덕분에 5명의 생명을 구할 수 있었습니다.

레벨 0에 참여한 경우 '집으로 돌아가는 길' 미션에서 진행 상황을 확인하세요. 별을 향한 여정은 계속됩니다.최종