Way Back Home - Event-Driven Architecture with Google ADK, A2A, and Kafka

1. मिशन

कहानी

आप किसी ऐसे सेक्टर में हैं जिसके बारे में कोई जानकारी नहीं है. एक बड़े सोलर पल्स ने आपके जहाज़ को एक दरार से फाड़ दिया है. इससे आप ब्रह्मांड के एक ऐसे हिस्से में फंस गए हैं जो किसी भी स्टार चार्ट पर मौजूद नहीं है.

कई दिनों तक मरम्मत करने के बाद, आपको अपने पैरों के नीचे इंजन की आवाज़ सुनाई देती है. आपकी रॉकेटशिप ठीक कर दी गई है. आपने मदरशिप से लंबी दूरी का अपलिंक भी सुरक्षित कर लिया है. आपको उड़ान भरने की अनुमति है. अब आप घर जाने के लिए तैयार हैं.

लेकिन जंप ड्राइव को चालू करने से पहले, एक डिस्ट्रेस सिग्नल सुनाई देता है. आपके सेंसर, मदद के लिए भेजे गए सिग्नल को पहचान लेते हैं. पांच आम नागरिक, प्लैनेट X-42 की सतह पर फंसे हुए हैं. उनके पास बचने का सिर्फ़ एक ही तरीका है. उन्हें 15 पुराने पॉड को सिंक्रनाइज़ करना होगा, ताकि वे ऑर्बिट में मौजूद अपने मदरशिप को खतरे का सिग्नल भेज सकें.

हालांकि, पॉड को एक सैटेलाइट स्टेशन से कंट्रोल किया जाता है. इस स्टेशन का मुख्य नेविगेशन कंप्यूटर खराब हो गया है. पॉड बिना किसी मकसद के इधर-उधर घूम रहे हैं. हमने सैटलाइट से बैकडोर कनेक्शन बना लिया है. हालांकि, इंटरस्टेलर इंटरफ़ेरेंस की वजह से अपलिंक में समस्या आ रही है. इस वजह से, अनुरोध-जवाब के साइकल में बहुत ज़्यादा देरी हो रही है.

चुनौती

अनुरोध/जवाब मॉडल बहुत धीमा होता है. इसलिए, हमें इवेंट-ड्रिवन आर्किटेक्चर (ईडीए) को सर्वर-सेंट इवेंट (एसएसई) के साथ डिप्लॉय करना होगा, ताकि टेलीमेट्री को स्ट्रीम किया जा सके.

मिशन

आपको एक कस्टम एजेंट बनाना होगा. यह एजेंट, पॉड को सिग्नल बढ़ाने वाले खास फ़ॉर्मेशन (सर्कल, स्टार, लाइन) में रखने के लिए ज़रूरी जटिल वेक्टर गणित का हिसाब लगा सकता है. आपको इस एजेंट को सैटलाइट के नए आर्किटेक्चर में शामिल करना होगा.

आपको क्या बनाना है

खास जानकारी

  • React पर आधारित हेड्स-अप डिसप्ले (एचयूडी), जिससे 15 पॉड के फ़्लीट को रीयल-टाइम में विज़ुअलाइज़ और कंट्रोल किया जा सकता है.
  • Google के एजेंट डेवलपमेंट किट (एडीके) का इस्तेमाल करने वाला जनरेटिव एआई एजेंट, जो नैचुरल लैंग्वेज कमांड के आधार पर पॉड के लिए जटिल ज्यामितीय संरचनाओं का हिसाब लगाता है.
  • Python पर आधारित सैटलाइट स्टेशन बैकएंड, जो सेंट्रल हब के तौर पर काम करता है. यह Server-Sent Events (एसएसई) के ज़रिए फ़्रंटएंड से कम्यूनिकेट करता है.
  • Apache Kafka का इस्तेमाल करने वाला इवेंट-ड्रिवन आर्किटेक्चर, जो एआई एजेंट को सैटलाइट कंट्रोल सिस्टम से अलग करता है. इससे, भरोसेमंद और एसिंक्रोनस कम्यूनिकेशन किया जा सकता है.

आपको क्या सीखने को मिलेगा

टेक्नोलॉजी / कॉन्सेप्ट

ब्यौरा

Google ADK (Agent Development Kit)

इस फ़्रेमवर्क का इस्तेमाल, Gemini मॉडल की मदद से काम करने वाले एआई एजेंट को बनाने, उसकी टेस्टिंग करने, और उसे तैयार करने के लिए किया जाएगा.

इवेंट-ड्रिवन आर्किटेक्चर (ईडीए)

आपको डीकपल्ड सिस्टम बनाने के सिद्धांतों के बारे में जानकारी मिलेगी. इसमें कॉम्पोनेंट, इवेंट के ज़रिए एसिंक्रोनस तरीके से कम्यूनिकेट करते हैं. इससे ऐप्लिकेशन ज़्यादा भरोसेमंद और स्केलेबल बनता है.

Apache Kafka

आपको Kafka को डिस्ट्रिब्यूटेड इवेंट स्ट्रीमिंग प्लैटफ़ॉर्म के तौर पर सेट अप करना होगा और इसका इस्तेमाल करना होगा. इससे अलग-अलग माइक्रोसेवाओं के बीच कमांड और डेटा के फ़्लो को मैनेज किया जा सकेगा.

Server-Sent Events (SSE)

आपको FastAPI बैकएंड में एसएसई लागू करना होगा, ताकि सर्वर से रीयल-टाइम टेलीमेट्री डेटा को React फ़्रंटएंड पर पुश किया जा सके. इससे यूज़र इंटरफ़ेस (यूआई) लगातार अपडेट होता रहेगा.

A2A (Agent-to-Agent) प्रोटोकॉल

आपको अपने एजेंट को A2A सर्वर में रैप करने का तरीका बताया जाएगा. इससे, एजेंट के बड़े इकोसिस्टम में स्टैंडर्ड तरीके से बातचीत की जा सकेगी और इंटरऑपरेबिलिटी को चालू किया जा सकेगा.

FastAPI

इस हाई-परफ़ॉर्मेंस वाले Python वेब फ़्रेमवर्क का इस्तेमाल करके, मुख्य बैकएंड सेवा, यानी कि Satellite Station बनाई जाएगी.

प्रतिक्रिया दें

आपको एक आधुनिक फ़्रंटएंड ऐप्लिकेशन के साथ काम करना होगा. यह ऐप्लिकेशन, डाइनैमिक और इंटरैक्टिव यूज़र इंटरफ़ेस बनाने के लिए, एसएसई स्ट्रीम की सदस्यता लेता है.

सिस्टम कंट्रोल में जनरेटिव एआई की सुविधा

आपको यह पता चलेगा कि लार्ज लैंग्वेज मॉडल (एलएलएम) को सिर्फ़ बातचीत करने के बजाय, डेटा से जुड़े खास टास्क (जैसे, कोऑर्डिनेट जनरेट करना) करने के लिए कैसे प्रॉम्प्ट किया जा सकता है.

2. अपना एनवायरमेंट सेट अप करना

Cloud Shell ऐक्सेस करना

👉Google Cloud Console में सबसे ऊपर मौजूद, Cloud Shell चालू करें पर क्लिक करें (यह Cloud Shell पैनल में सबसे ऊपर मौजूद टर्मिनल के आकार का आइकॉन है), cloud-shell.png

👉 "एडिटर खोलें" बटन पर क्लिक करें. यह बटन, पेंसिल वाले खुले फ़ोल्डर की तरह दिखता है. इससे विंडो में Cloud Shell Code Editor खुल जाएगा. आपको बाईं ओर फ़ाइल एक्सप्लोरर दिखेगा. open-editor.png

👉क्लाउड आईडीई में टर्मिनल खोलें,

03-05-new-terminal.png

👉💻 टर्मिनल में, पुष्टि करें कि आपने पहले ही पुष्टि कर ली है और प्रोजेक्ट को अपने प्रोजेक्ट आईडी पर सेट किया गया है. इसके लिए, यह कमांड इस्तेमाल करें:

gcloud auth list

आपको अपना खाता (ACTIVE) के तौर पर दिखेगा.

ज़रूरी शर्तें

ℹ️ लेवल 0 ज़रूरी नहीं है (लेकिन इसका सुझाव दिया जाता है)

इस मिशन को लेवल 0 पर भी पूरा किया जा सकता है. हालांकि, इसे सबसे पहले पूरा करने पर आपको ज़्यादा बेहतर अनुभव मिलता है. इससे आपको ग्लोबल मैप पर अपना बीकन जलता हुआ दिखता है.

प्रोजेक्ट एनवायरमेंट सेट अप करना

अपने टर्मिनल पर वापस जाएं. इसके बाद, चालू प्रोजेक्ट सेट करके और ज़रूरी Google Cloud सेवाएं (Cloud Run, Vertex AI वगैरह) चालू करके, कॉन्फ़िगरेशन पूरा करें.

👉💻 अपने टर्मिनल में, प्रोजेक्ट आईडी सेट करें:

gcloud config set project $(cat ~/project_id.txt) --quiet

👉💻 ज़रूरी सेवाएं चालू करें:

gcloud services enable  compute.googleapis.com \
                        artifactregistry.googleapis.com \
                        run.googleapis.com \
                        cloudbuild.googleapis.com \
                        iam.googleapis.com \
                        aiplatform.googleapis.com \
                        cloudresourcemanager.googleapis.com

डिपेंडेंसी इंस्टॉल करना

👉💻 लेवल 5 पर जाएं और ज़रूरी Python पैकेज इंस्टॉल करें:

cd $HOME/way-back-home/level_5
uv sync

मुख्य डिपेंडेंसी ये हैं:

पैकेज

मकसद

fastapi

सैटलाइट स्टेशन और एसएसई स्ट्रीमिंग के लिए, ज़्यादा परफ़ॉर्मेंस वाला वेब फ़्रेमवर्क

uvicorn

FastAPI ऐप्लिकेशन को चलाने के लिए, ASGI सर्वर की ज़रूरत होती है

google-adk

फ़ॉर्मेशन एजेंट को बनाने के लिए इस्तेमाल की गई एजेंट डेवलपमेंट किट

a2a-sdk

स्टैंडर्ड कम्यूनिकेशन के लिए, एजेंट-टू-एजेंट प्रोटोकॉल लाइब्रेरी

aiokafka

इवेंट लूप के लिए एसिंक्रोनस Kafka क्लाइंट

google-genai

Gemini मॉडल को ऐक्सेस करने के लिए नेटिव क्लाइंट

numpy

सिमुलेशन के लिए, वेक्टर मैथ और कोऑर्डिनेट कैलकुलेशन

websockets

रीयल-टाइम में दोनों तरफ़ से बातचीत करने की सुविधा

python-dotenv

यह एनवायरमेंट वैरिएबल और कॉन्फ़िगरेशन सीक्रेट मैनेज करता है

sse-starlette

सर्वर-सेंट इवेंट (एसएसई) को बेहतर तरीके से हैंडल करना

requests

बाहरी एपीआई कॉल के लिए सामान्य एचटीटीपी लाइब्रेरी

सेटअप की पुष्टि करना

कोड को लॉन्च करने से पहले, आइए पक्का करें कि सभी सिस्टम ठीक से काम कर रहे हों. अपने Google Cloud प्रोजेक्ट, एपीआई, और Python डिपेंडेंसी का ऑडिट करने के लिए, पुष्टि करने वाली स्क्रिप्ट चलाएं.

👉💻 पुष्टि करने वाली स्क्रिप्ट चलाएं:

source $HOME/way-back-home/.venv/bin/activate
cd $HOME/way-back-home/level_5/scripts
chmod +x verify_setup.sh
. verify_setup.sh

👀 आपको ग्रीन चेक (✅) की एक सीरीज़ दिखेगी.

  • अगर आपको लाल रंग के क्रॉस (❌) दिखते हैं, तो आउटपुट में दिए गए, समस्या ठीक करने के सुझावों का पालन करें. उदाहरण के लिए, gcloud services enable ... या pip install ...).
  • ध्यान दें: फ़िलहाल, .env के लिए पीले रंग की चेतावनी स्वीकार की जा सकती है. हम अगले चरण में वह फ़ाइल बनाएंगे.
🚀 Verifying Mission Charlie (Level 5) Infrastructure...

✅ Google Cloud Project: xxxxxx
✅ Cloud APIs: Active
✅ Python Environment: Ready

🎉 SYSTEMS ONLINE. READY FOR MISSION.

3. एलएलएम की मदद से पॉड की पोज़िशन को फ़ॉर्मैट करना

हमें बचाव अभियान के लिए "ब्रेन" बनाना होगा. यह Google ADK (Agent Development Kit) का इस्तेमाल करके बनाया गया एजेंट होगा. इसका मकसद सिर्फ़ एक खास तरह के जियोमेट्रिक नेविगेटर के तौर पर काम करना है. सामान्य एलएलएम को बातचीत करना पसंद होता है, लेकिन डीप स्पेस में हमें डेटा की ज़रूरत होती है, बातचीत की नहीं. हम इस एजेंट को "स्टार" जैसे कमांड लेने और हमारे 15 पॉड के लिए रॉ JSON कोऑर्डिनेट वापस करने के लिए प्रोग्राम करेंगे.

एजेंट

एजेंट को तैयार करना

👉💻 एजेंट डायरेक्ट्री पर जाने और ADK बनाने वाले विज़र्ड को शुरू करने के लिए, यहां दिए गए कमांड चलाएं:

cd $HOME/way-back-home/level_5/agent
uv run adk create formation

सीएलआई, इंटरैक्टिव सेटअप विज़र्ड लॉन्च करेगा. अपने एजेंट को कॉन्फ़िगर करने के लिए, इन जवाबों का इस्तेमाल करें:

  1. कोई मॉडल चुनें: पहला विकल्प (Gemini Flash) चुनें.
    • ध्यान दें: वर्शन अलग-अलग हो सकता है. तेज़ी से काम करने के लिए, हमेशा "फ़्लैश" वर्शन चुनें.
  2. कोई बैकएंड चुनें: विकल्प 2 (Vertex AI) चुनें.
  3. Google Cloud प्रोजेक्ट आईडी डालें: डिफ़ॉल्ट आईडी (आपके एनवायरमेंट से पता लगाया गया) स्वीकार करने के लिए, Enter दबाएं.
  4. Google Cloud Region डालें: डिफ़ॉल्ट (us-central1) को स्वीकार करने के लिए, Enter दबाएं.

👀 आपके टर्मिनल इंटरैक्शन का आउटपुट कुछ ऐसा दिखना चाहिए:

(way-back-home) user@cloudshell:~/way-back-home/level_5/agent$ adk create formation

Choose a model for the root agent:
1. gemini-2.5-flash
2. Other models (fill later)
Choose model (1, 2): 1

1. Google AI
2. Vertex AI
Choose a backend (1, 2): 2

You need an existing Google Cloud account and project...
Enter Google Cloud project ID [your-project-id]: <PRESS ENTER>
Enter Google Cloud region [us-central1]: <PRESS ENTER>

Agent created in /home/user/way-back-home/level_5/agent/formation:
- .env
- __init__.py
- agent.py

आपको Agent created 'हो गया' मैसेज दिखेगा. इससे एक स्केलेटन कोड जनरेट होता है, जिसे अब हम बदलेंगे.

👉✏️ अपने एडिटर में, नई बनाई गई $HOME/way-back-home/level_5/agent/formation/agent.py फ़ाइल पर जाएं और उसे खोलें. फ़ाइल के पूरे कॉन्टेंट की जगह नीचे दिया गया कोड डालें. इससे एजेंट का नाम अपडेट हो जाता है और उसके काम करने के तरीके के बारे में जानकारी मिल जाती है.

import os
from google.adk.agents import Agent

root_agent = Agent(
    name="formation_agent",
    model="gemini-2.5-flash",
    instruction="""
    You are the **Formation Controller AI**.
    Your strict objective is to calculate X,Y coordinates for a fleet of **15 Drones** based on a requested geometric shape.

    ### FIELD SPECIFICATIONS
    - **Canvas Size**: 800px (width) x 600px (height).
    - **Safe Margin**: Keep pods at least 50px away from edges (x: 50-750, y: 50-550).
    - **Center Point**: x=400, y=300 (Use this as the origin for shapes).
    - **Top Menu Avoidance**: Do NOT place pods in the top 100px (y < 100) to avoid UI overlap.

    ### FORMATION RULES
    When given a formation name, output coordinates for exactly 15 pods (IDs 0-14).
    1.  **CIRCLE**: Evenly spaced around a center point (R=200).
    2.  **STAR**: 5 points or a star-like distribution.
    3.  **X**: A large X crossing the screen.
    4.  **LINE**: A horizontal line across the middle.
    5.  **PARABOLA**: A U-shape opening UPWARDS. Center it at y=400, opening up to y=100. IMPORTANT: Lowest point must be at bottom (high Y value), opening up (low Y value). Screen coordinates have (0,0) at the TOP-LEFT. The vertex should be at the BOTTOM (e.g., y=500), with arms reaching up to y=200.
    6.  **RANDOM**: Scatter randomly within safe bounds.
    7.  **CUSTOM**: If the user inputs something else (e.g., "SMILEY", "TRIANGLE"), do your best to approximate it geometrically.

    ### OUTPUT FORMAT
    You MUST output **ONLY VALID JSON**. No markdown fencing, no preamble, no commentary.
    Refuse to answer non-formation questions.

    **JSON Structure**:
    ```json
    [
        {"x": 400, "y": 300},
        {"x": 420, "y": 300},
        ... (15 total items)
    ]
    ```
    """
)
  • ज्यामितीय सटीक जानकारी: सिस्टम प्रॉम्प्ट में "कैनवस का साइज़" और "सेफ़ मार्जिन" तय करके, हम यह पक्का करते हैं कि एजेंट, पॉड को स्क्रीन से बाहर या यूज़र इंटरफ़ेस (यूआई) एलिमेंट के नीचे न रखे.
  • JSON फ़ॉर्मैट लागू करना: एलएलएम को "फ़ॉर्मेशन से जुड़े सवालों के जवाब न देने" और "कोई भूमिका न देने" के लिए कहकर, हम यह पक्का करते हैं कि जवाब को पार्स करते समय, हमारा डाउनस्ट्रीम कोड (सैटलाइट) क्रैश न हो.
  • डिकपल्ड लॉजिक: इस एजेंट को अभी तक Kafka के बारे में जानकारी नहीं है. इसे सिर्फ़ गणित के सवालों को हल करना आता है. अगले चरण में, हम इस "ब्रेन" को Kafka सर्वर में रैप करेंगे.

एजेंट को स्थानीय तौर पर टेस्ट करना

एजेंट को Kafka "नर्वस सिस्टम" से कनेक्ट करने से पहले, हमें यह पक्का करना होगा कि यह सही तरीके से काम कर रहा हो. टर्मिनल में सीधे तौर पर अपने एजेंट से इंटरैक्ट किया जा सकता है. इससे यह पुष्टि की जा सकती है कि वह मान्य JSON कोऑर्डिनेट जनरेट करता है.

👉💻 अपने एजेंट के साथ चैट सेशन शुरू करने के लिए, adk run कमांड का इस्तेमाल करें.

cd $HOME/way-back-home/level_5/agent
uv run adk run formation
  1. इनपुट: Circle टाइप करें और Enter दबाएं.
    • सफलता की शर्तें: आपको रॉ JSON फ़ॉर्मैट में सूची दिखनी चाहिए. उदाहरण के लिए, [{"x": 400, "y": 200}, ...]). पक्का करें कि JSON से पहले "यहां निर्देशांक दिए गए हैं:" जैसा कोई मार्कडाउन टेक्स्ट न हो.
  2. इनपुट: Line टाइप करें और Enter दबाएं.
    • सफलता की शर्तें: पुष्टि करें कि निर्देशांकों से एक हॉरिज़ॉन्टल लाइन बनती है (y-वैल्यू एक जैसी होनी चाहिए).

जब आपको यह पक्का हो जाए कि एजेंट, साफ़ JSON फ़ाइल आउटपुट कर रहा है, तब उसे Kafka Server में रैप किया जा सकता है.

👉💻 बाहर निकलने के लिए Ctrl+C दबाएं.

4. फ़ॉर्मेशन एजेंट के लिए A2A सर्वर बनाना

A2A (एजेंट-टू-एजेंट) के बारे में जानकारी

A2A (एजेंट-टू-एजेंट) प्रोटोकॉल एक ओपन स्टैंडर्ड है. इसे एआई एजेंट के बीच आसानी से इंटरऑपरेबिलिटी की सुविधा देने के लिए डिज़ाइन किया गया है. इस फ़्रेमवर्क की मदद से, एजेंट सिर्फ़ टेक्स्ट का आदान-प्रदान करने के बजाय, कई अन्य काम भी कर सकते हैं. जैसे, टास्क सौंपना, मुश्किल कार्रवाइयों को मैनेज करना, और एक साथ मिलकर काम करना. इससे वे डिस्ट्रिब्यूटेड इकोसिस्टम में, एक ही लक्ष्य को हासिल कर पाते हैं.

A2A

A2A ट्रांसपोर्ट को समझना: एचटीटीपी, gRPC, और Kafka

A2A प्रोटोकॉल, क्लाइंट और एजेंट के बीच बातचीत करने के दो अलग-अलग तरीके उपलब्ध कराता है. ये दोनों तरीके, आर्किटेक्चर की अलग-अलग ज़रूरतों को पूरा करते हैं. एचटीटीपी (JSON-RPC) डिफ़ॉल्ट रूप से उपलब्ध स्टैंडर्ड है. यह सभी वेब एनवायरमेंट में काम करता है. gRPC, ज़्यादा परफ़ॉर्मेंस वाला विकल्प है. यह प्रोटोकॉल बफ़र का इस्तेमाल करता है, ताकि कम्यूनिकेशन को ज़्यादा असरदार बनाया जा सके. साथ ही, लैब में Kafka ट्रांसपोर्ट की सुविधा भी उपलब्ध है. यह एक कस्टम इंप्लीमेंटेशन है. इसे इवेंट-ड्रिवन आर्किटेक्चर के लिए डिज़ाइन किया गया है. इसमें डीकपलिंग सिस्टम को प्राथमिकता दी जाती है.

परिवहन

हालांकि, ये ट्रांसपोर्ट डेटा के फ़्लो को अलग-अलग तरीके से हैंडल करते हैं. एचटीटीपी मॉडल में, क्लाइंट एक JSON अनुरोध भेजता है और कनेक्शन को खुला रखता है. ऐसा तब तक किया जाता है, जब तक एजेंट अपना काम पूरा नहीं कर लेता और एक बार में पूरा नतीजा नहीं दिखा देता. gRPC, बाइनरी डेटा और HTTP/2 का इस्तेमाल करके इसे ऑप्टिमाइज़ करता है. इससे अनुरोध-जवाब के सामान्य साइकल और रीयल-टाइम स्ट्रीमिंग, दोनों की अनुमति मिलती है. रीयल-टाइम स्ट्रीमिंग में, एजेंट अपडेट भेजता है. जैसे, "सोचा गया" या "आर्टफ़ैक्ट बनाया गया". Kafka को एसिंक्रोनस तरीके से लागू किया जाता है: क्लाइंट, "अनुरोध विषय" पर अनुरोध पब्लिश करता है और "जवाब विषय" पर सुनता है. सर्वर, मैसेज को तब प्रोसेस करता है, जब वह ऐसा कर सकता है. इसके बाद, वह नतीजे को वापस पोस्ट करता है. इसका मतलब है कि दोनों कभी एक-दूसरे से सीधे तौर पर बात नहीं करते.

यह आपकी ज़रूरतों पर निर्भर करता है कि आपको कितनी तेज़, जटिल, और लगातार काम करने वाली सेवा चाहिए. एचटीटीपी का इस्तेमाल करना और इसे डीबग करना सबसे आसान है. इसलिए, यह सामान्य इंटिग्रेशन के लिए सबसे सही है. gRPC, एक बेहतर विकल्प है. इसका इस्तेमाल, एक सेवा से दूसरी सेवा के बीच कम्यूनिकेशन के लिए किया जाता है. इसमें कम इंतज़ार का समय और स्ट्रीमिंग टास्क के अपडेट ज़रूरी होते हैं. हालांकि, Kafka एक भरोसेमंद विकल्प है. ऐसा इसलिए, क्योंकि यह अनुरोधों को डिस्क पर एक कतार में सेव करता है. इससे एजेंट सर्वर क्रैश होने या रीस्टार्ट होने पर भी आपके टास्क बने रहते हैं. साथ ही, यह एक ऐसा लेवल देता है जो न तो एचटीटीपी और न ही gRPC दे सकता है.

कस्टम ट्रांसपोर्टेशन लेयर: Kafka

Kafka, एसिंक्रोनस बैकबोन के तौर पर काम करता है. यह ऑपरेशन के मुख्य हिस्से (फ़ॉर्मेशन एजेंट) को फ़िज़िकल कंट्रोल (सैटलाइट स्टेशन) से अलग करता है. एजेंट, जटिल वेक्टर का हिसाब लगाते समय सिस्टम को सिंक्रोनस कनेक्शन पर इंतज़ार करने के लिए मजबूर करने के बजाय, अपने नतीजों को Kafka विषय के इवेंट के तौर पर पब्लिश करता है. यह एक बफ़र की तरह काम करता है. इससे सैटलाइट को अपनी गति के हिसाब से निर्देश मिलते हैं. साथ ही, यह पक्का किया जाता है कि नेटवर्क में ज़्यादा रुकावट आने या सिस्टम के कुछ समय के लिए क्रैश होने पर भी, फ़ॉर्मेशन का डेटा कभी न मिटे.

Kafka का इस्तेमाल करके, धीमी और लीनियर प्रोसेस को एक ऐसी स्ट्रीमिंग पाइपलाइन में बदला जा सकता है जो मुश्किल समय में भी काम करती है. इसमें निर्देश और टेलीमेट्री डेटा अलग-अलग तरीके से फ़्लो होता है. इससे एआई की प्रोसेसिंग के दौरान भी मिशन के एचयूडी को रिस्पॉन्सिव बनाए रखने में मदद मिलती है.

Kafka

Kafka क्या है?

Kafka, इवेंट-स्ट्रीमिंग का डिस्ट्रिब्यूटेड प्लैटफ़ॉर्म है. इवेंट-ड्रिवन आर्किटेक्चर (ईडीए) में:

  1. प्रोड्यूसर, "विषयों" पर मैसेज पब्लिश करते हैं.
  2. उपयोगकर्ता इन विषयों की सदस्यता लेते हैं और जब कोई मैसेज आता है, तो उस पर प्रतिक्रिया देते हैं.

Kafka का इस्तेमाल क्यों करें?

इससे आपके सिस्टम अलग हो जाते हैं. फ़ॉर्मेशन एजेंट, अपने-आप काम करता है. यह आने वाले अनुरोधों का इंतज़ार करता है. इसके लिए, उसे भेजने वाले की पहचान या स्टेटस जानने की ज़रूरत नहीं होती. इससे ज़िम्मेदारी अलग हो जाती है. इससे यह पक्का होता है कि सैटलाइट के ऑफ़लाइन होने पर भी वर्कफ़्लो बना रहता है. Kafka, सैटलाइट के फिर से कनेक्ट होने तक मैसेज सेव करता है.

Google Cloud Pub/Sub के बारे में क्या जानकारी है?

इसके लिए, Google Cloud Pub/Sub का इस्तेमाल किया जा सकता है! Pub/Sub, Google की सर्वरलेस मैसेज सेवा है. Kafka, ज़्यादा थ्रूपुट और "दोबारा चलाने" वाली स्ट्रीम के लिए बेहतरीन है. हालांकि, Pub/Sub को इस्तेमाल करने में आसानी होती है. इसलिए, इसे अक्सर प्राथमिकता दी जाती है. इस लैब के लिए, हम Kafka का इस्तेमाल कर रहे हैं, ताकि एक मज़बूत और लगातार काम करने वाला मैसेज बस तैयार किया जा सके.

लोकल Kafka क्लस्टर शुरू करना

नीचे दिए गए पूरे कमांड ब्लॉक को कॉपी करें और अपने टर्मिनल में चिपकाएं. इससे Kafka की आधिकारिक इमेज डाउनलोड हो जाएगी और बैकग्राउंड में शुरू हो जाएगी.

👉💻 अपने टर्मिनल में ये कमांड चलाएं:

# Navigate to the correct mission directory first
cd $HOME/way-back-home/level_5

# Run the Kafka container in detached mode
docker run -d \
  --name mission-kafka \
  -p 9092:9092 \
  -e KAFKA_PROCESS_ROLES='broker,controller' \
  -e KAFKA_NODE_ID=1 \
  -e KAFKA_CONTROLLER_LISTENER_NAMES=CONTROLLER \
  -e KAFKA_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093 \
  -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092 \
  -e KAFKA_CONTROLLER_QUORUM_VOTERS=1@127.0.0.1:9093 \
  -e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \
  apache/kafka:4.2.0-rc1

👉💻 देखें कि कंटेनर, docker ps कमांड के साथ चल रहा हो.

docker ps

👀 आपको एक आउटपुट दिखेगा. इससे पुष्टि होगी कि mission-kafka कंटेनर चल रहा है और पोर्ट 9092 चालू है.

CONTAINER ID   IMAGE                  COMMAND                  CREATED          STATUS          PORTS                               NAMES
c1a2b3c4d5e6   apache/kafka:4.2.0-rc1    "/opt/kafka/bin/kafka..."   15 seconds ago   Up 14 seconds   0.0.0.0:9092->9092/tcp, 9093/tcp   mission-kafka

Kafka Topic क्या है?

Kafka विषय को मैसेज के लिए एक खास चैनल या कैटगरी के तौर पर समझें. यह एक लॉगबुक की तरह होता है, जिसमें इवेंट के रिकॉर्ड को उसी क्रम में सेव किया जाता है जिस क्रम में वे जनरेट हुए थे. प्रोड्यूसर, किसी विषय के बारे में मैसेज लिखते हैं. वहीं, उपभोक्ता उन विषयों के बारे में पढ़ते हैं. इससे, मैसेज भेजने वाला और पाने वाला अलग हो जाता है. मैसेज बनाने वाले को यह जानने की ज़रूरत नहीं होती कि कौन सा उपभोक्ता डेटा पढ़ेगा. उसे सिर्फ़ सही "चैनल" पर डेटा भेजना होता है. इस मिशन में, हम दो विषय बनाएंगे: एक एजेंट को फ़ॉर्मेशन के अनुरोध भेजने के लिए और दूसरा एजेंट को अपने जवाब पब्लिश करने के लिए, ताकि सैटलाइट उन्हें पढ़ सके.

Kafka

👉💻 चालू Docker कंटेनर में ज़रूरी विषय बनाने के लिए, यहां दी गई कमांड चलाएं.

# Create the topic for formation requests
docker exec mission-kafka /opt/kafka/bin/kafka-topics.sh \
  --create \
  --topic a2a-formation-request \
  --bootstrap-server 127.0.0.1:9092

# Create the topic where the satellite dashboard will listen for replies
docker exec mission-kafka /opt/kafka/bin/kafka-topics.sh \
  --create \
  --topic a2a-reply-satellite-dashboard \
  --bootstrap-server 127.0.0.1:9092

👉💻 यह पुष्टि करने के लिए कि आपके चैनल खुले हैं, सूची बनाने का निर्देश चलाएं:

docker exec mission-kafka /opt/kafka/bin/kafka-topics.sh \
  --list \
  --bootstrap-server 127.0.0.1:9092

👀 आपको अभी बनाए गए विषयों के नाम दिखेंगे.

a2a-formation-request
a2a-reply-satellite-dashboard

आपका Kafka इंस्टेंस अब पूरी तरह से कॉन्फ़िगर हो गया है और मिशन के लिए ज़रूरी डेटा को राउट करने के लिए तैयार है.

Kafka A2A सर्वर लागू करना

एजेंट-टू-एजेंट (A2A) प्रोटोकॉल, स्वतंत्र एजेंटिक सिस्टम के बीच इंटरऑपरेबिलिटी के लिए एक स्टैंडर्ड फ़्रेमवर्क बनाता है. इसकी मदद से, अलग-अलग टीमों के बनाए गए या अलग-अलग इन्फ़्रास्ट्रक्चर पर काम करने वाले एजेंट एक-दूसरे को ढूंढ सकते हैं और साथ मिलकर काम कर सकते हैं. इसके लिए, हर कनेक्शन के लिए इंटिग्रेशन लॉजिक की ज़रूरत नहीं होती.

रेफ़रंस इंप्लीमेंटेशन, a2a-python, इन एजेंटिक ऐप्लिकेशन को चलाने के लिए एक बुनियादी लाइब्रेरी है. इसके डिज़ाइन की मुख्य सुविधा एक्सटेंसिबिलिटी है. यह कम्यूनिकेशन लेयर को ऐब्स्ट्रैक्ट करता है, जिससे डेवलपर एचटीटीपी जैसे प्रोटोकॉल को दूसरों के लिए स्वैप कर सकते हैं.

A2A फ़्लो

इस प्रोजेक्ट में, हमने कस्टम Kafka को लागू करके, इस एक्सटेंसिबिलिटी का फ़ायदा उठाया है: a2a-python-kafka. हम इस तरीके का इस्तेमाल करके यह दिखाएंगे कि A2A स्टैंडर्ड की मदद से, एजेंट के साथ होने वाली बातचीत को अलग-अलग आर्किटेक्चर की ज़रूरतों के हिसाब से कैसे बदला जा सकता है. इस मामले में, सिंक्रोनस एचटीटीपी को एसिंक्रोनस इवेंट बस के लिए स्वैप किया जा रहा है.

फ़ॉर्मेशन एजेंट के लिए A2A की सुविधा चालू करना

अब हम अपने एजेंट को A2A सर्वर में रैप करेंगे. इससे यह एक ऐसी सेवा बन जाएगी जो इन कामों को कर सकती है:

  • Kafka के किसी विषय से टास्क सुनने के लिए.
  • मिले हुए टास्क को प्रोसेस करने के लिए, एआई एजेंट को सौंपता है.
  • जवाब के विषय पर नतीजा पब्लिश करें.

👉✏️ $HOME/way-back-home/level_5/agent/agent_to_kafka_a2a.py में, #REPLACE-CREATE-KAFKA-A2A-SERVER को इस कोड से बदलें:

async def create_kafka_server(
    agent: BaseAgent,
    *,
    bootstrap_servers: str | List[str] = "localhost:9092",
    request_topic: str = "a2a-formation-request",
    consumer_group_id: str = "a2a-agent-group",
    agent_card: Optional[Union[AgentCard, str]] = None,
    runner: Optional[Runner] = None,
    **kafka_config: Any,
) -> KafkaServerApp:
  """Convert an ADK agent to a A2A Kafka Server application.
  Args:
      agent: The ADK agent to convert
      bootstrap_servers: Kafka bootstrap servers.
      request_topic: Topic to consume requests from.
      consumer_group_id: Consumer group ID for the server.
      agent_card: Optional pre-built AgentCard object or path to agent card
                  JSON. If not provided, will be built automatically from the
                  agent.
      runner: Optional pre-built Runner object. If not provided, a default
              runner will be created using in-memory services.
      **kafka_config: Additional Kafka configuration.

  Returns:
      A KafkaServerApp that can be run with .run() or .start()
  """
  # Set up ADK logging
  adk_logger = logging.getLogger("google_adk")
  adk_logger.setLevel(logging.INFO)

  async def create_runner() -> Runner:
    """Create a runner for the agent."""
    return Runner(
        app_name=agent.name or "adk_agent",
        agent=agent,
        # Use minimal services - in a real implementation these could be configured
        artifact_service=InMemoryArtifactService(),
        session_service=InMemorySessionService(),
        memory_service=InMemoryMemoryService(),
        credential_service=InMemoryCredentialService(),
    )

  # Create A2A components
  task_store = InMemoryTaskStore()

  agent_executor = A2aAgentExecutor(
      runner=runner or create_runner,
  )
  
  # Initialize logic handler
  from a2a.server.request_handlers.default_request_handler import DefaultRequestHandler
  
  logic_handler = DefaultRequestHandler(
      agent_executor=agent_executor, task_store=task_store
  )

  # Prepare Agent Card
  rpc_url = f"kafka://{bootstrap_servers}/{request_topic}"
      
  # Create Kafka Server App
  server_app = KafkaServerApp(
      request_handler=logic_handler,
      bootstrap_servers=bootstrap_servers,
      request_topic=request_topic,
      consumer_group_id=consumer_group_id,
      **kafka_config
  )
  
  return server_app

यह कोड, मुख्य कॉम्पोनेंट सेट अप करता है:

  1. द रनर: यह एजेंट के रनटाइम की जानकारी देता है. जैसे, मेमोरी, क्रेडेंशियल वगैरह को मैनेज करना.
  2. टास्क स्टोर: यह कुकी, अनुरोधों की स्थिति को ट्रैक करती है. जैसे, "मंज़ूरी बाकी" से "पूरा हुआ" तक.
  3. Agent Executor: यह Kafka से टास्क लेता है और कोऑर्डिनेट का हिसाब लगाने के लिए, इसे एजेंट को पास करता है.
  4. KafkaServerApp: यह Kafka ब्रोकर से फ़िज़िकल कनेक्शन को मैनेज करता है.

A2A काफ़्का

एनवायरमेंट वैरिएबल कॉन्फ़िगर करना

ADK सेटअप ने एजेंट के फ़ोल्डर में, Google Vertex AI की सेटिंग वाली .env फ़ाइल बनाई है. हमें इसे प्रोजेक्ट रूट में ले जाना होगा और अपने Kafka क्लस्टर के लिए कोऑर्डिनेट जोड़ने होंगे.

फ़ाइल को कॉपी करने और Kafka सर्वर का पता जोड़ने के लिए, ये कमांड चलाएं:

cd $HOME/way-back-home/level_5
# 1. Copy the API keys from the agent folder to the project root
cp agent/formation/.env .env

# 2. Append the Kafka Bootstrap Server address to the file
echo -e "\nKAFKA_BOOTSTRAP_SERVERS=localhost:9092" >> .env

# 3. Verify the file content
echo "✅ Environment configured. Here are the last few lines:"
tail .env

A2A इंटरस्टेलर लूप की पुष्टि करना

अब हम यह पक्का करेंगे कि एसिंक्रोनस इवेंट लूप सही तरीके से काम कर रहा है या नहीं. इसके लिए, हम लाइव-फ़ायर टेस्ट करेंगे. इसमें, Kafka क्लस्टर के ज़रिए मैन्युअल सिग्नल भेजा जाएगा और एजेंट के जवाब पर नज़र रखी जाएगी.

A2A इंटरस्टेलर लूप की पुष्टि करना

हम इवेंट के पूरे लाइफ़साइकल को देखने के लिए, तीन अलग-अलग टर्मिनल का इस्तेमाल करेंगे.

टर्मिनल A: फ़ॉर्मेशन एजेंट (A2A Kafka सर्वर)

👉💻 यह टर्मिनल, Python प्रोसेस को रन करता है. यह प्रोसेस Kafka से डेटा लेती है और ज्यामितीय गणित के लिए Gemini का इस्तेमाल करती है.

cd $HOME/way-back-home/level_5
source $HOME/way-back-home/.venv/bin/activate
. scripts/check_kafka.sh 

# Install the custom Kafka-enabled A2A library
uv pip install git+https://github.com/weimeilin79/a2a-python-kafka.git

# Start the Agent Server
uv run agent/server.py

जब तक आपको यह न दिखे, तब तक इंतज़ार करें:

[INFO] Kafka Server App Started. Starting to consume requests...

टर्मिनल B: सैटलाइट लिसनर (उपभोक्ता)

👉💻 इस टर्मिनल में, हम reply topic को सुनेंगे. इससे यह पता चलता है कि सैटलाइट, निर्देशों का इंतज़ार कर रहा है.

# Listen for the AI's response on the satellite channel
docker exec mission-kafka /opt/kafka/bin/kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 \
  --topic a2a-reply-satellite-dashboard \
  --from-beginning \
  --property "print.headers=true"

यह टर्मिनल, इस्तेमाल में नहीं है के तौर पर दिखेगा. यह एजेंट के मैसेज पब्लिश करने का इंतज़ार कर रहा है.

टर्मिनल C: कमांडर का सिग्नल (प्रोड्यूसर)

👉💻 अब हम a2a-formation-request विषय में, A2A फ़ॉर्मैट में एक रॉ अनुरोध भेजेंगे. हमें कुछ खास Kafka हेडर शामिल करने होंगे, ताकि एजेंट को पता चल सके कि जवाब कहां भेजना है.

echo 'correlation_id=ping-manual-01,reply_topic=a2a-reply-satellite-dashboard|{"method": "message_send", "params": {"message": {"message_id": "msg-001", "role": "user", "parts": [{"text": "STAR"}]}}, "streaming": false, "agent_card": {"name": "DiagnosticTool", "version": "1.0.0"}}' | \
docker exec -i mission-kafka /opt/kafka/bin/kafka-console-producer.sh \
  --bootstrap-server localhost:9092 \
  --topic a2a-formation-request \
  --property "parse.headers=true" \
  --property "headers.key.separator==" \
  --property "headers.delimiter=|"

नतीजे का विश्लेषण करना

👀 अगर लूप सही तरीके से काम कर रहा है, तो टर्मिनल B पर स्विच करें. आपको तुरंत एक बड़ा JSON ब्लॉक दिखेगा. यह हमारे भेजे गए हेडर correlation_id:ping-manual-01 से शुरू होगा. इसके बाद, task ऑब्जेक्ट आता है. अगर आप उस JSON में मौजूद parts सेक्शन को ध्यान से देखेंगे, तो आपको Gemini के कैलकुलेट किए गए 15 पॉड के रॉ X और Y कोऑर्डिनेट दिखेंगे:

{"type": "task", "data": {"artifacts": [{"artifactId": "...", "parts": [{"kind": "text", "text": "```json\n[\n  {\"x\": 400, \"y\": 150},\n  {\"x\": 257, \"y\": 254},\n  {\"x\": 312, \"y\": 421},\n ... \n]\n```"}]}], ...}}

आपने एजेंट को रिसीवर से अलग कर दिया है. अनुरोध और जवाब के बीच लगने वाले समय में होने वाले "इंटरस्टेलर नॉइज़" से अब कोई फ़र्क़ नहीं पड़ता, क्योंकि हमारा सिस्टम अब पूरी तरह से इवेंट-ड्रिवन है.

आगे बढ़ने से पहले, बैकग्राउंड में चल रही प्रोसेस बंद करें, ताकि नेटवर्क पोर्ट खाली हो जाएं.

👉💻 हर टर्मिनल (A, B, और C) में:

  • चल रही प्रोसेस को बंद करने के लिए, Ctrl + C दबाएं.

5. सैटलाइट स्टेशन (A2A Kafka Client और SSE)

इस चरण में, हम सैटलाइट स्टेशन बनाते हैं. यह Kafka क्लस्टर और पायलट के विज़ुअल डिसप्ले (React फ़्रंटएंड) के बीच का ब्रिज है. यह सर्वर, Kafka Client (एजेंट से बात करने के लिए) और SSE Streamer (ब्राउज़र से बात करने के लिए), दोनों के तौर पर काम करता है.

Kafka Client क्या है?

Kafka क्लस्टर को रेडियो स्टेशन की तरह समझें. Kafka Client, रेडियो रिसीवर होता है. KafkaClientTransport की मदद से, हमारा ऐप्लिकेशन ये काम कर सकता है:

  1. मैसेज लिखना: "टास्क" भेजें. उदाहरण के लिए, "स्टार फ़ॉर्मेशन") के बारे में जानकारी दी गई.
  2. जवाब पाना: एजेंट से निर्देशांक वापस पाने के लिए, किसी "जवाब के विषय" पर सुनें.

1. कनेक्शन शुरू किया जा रहा है

हम FastAPI के lifespan इवेंट हैंडलर का इस्तेमाल करते हैं. इससे यह पक्का किया जाता है कि सर्वर के बूट अप होने पर Kafka कनेक्शन शुरू हो जाए और बंद होने पर बंद हो जाए.

👉✏️ $HOME/way-back-home/level_5/satellite/main.py में, #REPLACE-CONNECT-TO-KAFKA-CLUSTER को इस कोड से बदलें:

@asynccontextmanager
async def lifespan(app: FastAPI):
    global kafka_transport
    logger.info("Initializing Kafka Client Transport...")
    
    bootstrap_server = os.getenv("KAFKA_BOOTSTRAP_SERVERS")
    request_topic = "a2a-formation-request"
    reply_topic = "a2a-reply-satellite-dashboard"
    
    # Create AgentCard for the Client
    client_card = AgentCard(
        name="SatelliteDashboard",
        description="Satellite Dashboard Client",
        version="1.0.0",
        url="https://example.com/satellite-dashboard",
        capabilities=AgentCapabilities(),
        default_input_modes=["text/plain"],
        default_output_modes=["text/plain"],
        skills=[]
    )
    
    kafka_transport = KafkaClientTransport(
            agent_card=client_card,
            bootstrap_servers=bootstrap_server,
            request_topic=request_topic,
            reply_topic=reply_topic,
    )
    
    try:
        await kafka_transport.start()
        logger.info("Kafka Client Transport Started Successfully.")
    except Exception as e:
        logger.error(f"Failed to start Kafka Client: {e}")
        
    yield
    
    if kafka_transport:
        logger.info("Stopping Kafka Client Transport...")
        await kafka_transport.stop()
        logger.info("Kafka Client Transport Stopped.")

2. कोई निर्देश भेजना

डैशबोर्ड पर मौजूद किसी बटन पर क्लिक करने पर, /formation एंडपॉइंट ट्रिगर होता है. यह प्रोड्यूसर के तौर पर काम करता है. यह आपके अनुरोध को एक फ़ॉर्मल A2A Message में रैप करके, एजेंट को भेजता है.

फॉर्मेशन

मुख्य लॉजिक:

  • एसिंक्रोनस कम्यूनिकेशन: kafka_transport.send_message अनुरोध भेजता है और reply_topic पर नए कोऑर्डिनेट आने का इंतज़ार करता है.
  • जवाब पार्स करना: Gemini, जगह की जानकारी को मार्कडाउन ब्लॉक में दिखा सकता है. उदाहरण के लिए, json ... ). नीचे दिया गया कोड, इन वर्णों को हटा देता है और स्ट्रिंग को पॉइंट की Python सूची में बदल देता है.

👉✏️ $HOME/way-back-home/level_5/satellite/main.py में, #REPLACE-FORMATION-REQUEST को इस कोड से बदलें:

@app.post("/formation")
async def set_formation(req: FormationRequest):
    global FORMATION, PODS
    FORMATION = req.formation
    logger.info(f"Received formation request: {FORMATION}")
    
    if not kafka_transport:
        logger.error("Kafka Transport is not initialized!")
        return {"status": "error", "message": "Backend Not Connected"}
    
    try:
        # Construct A2A Message
        prompt = f"Create a {FORMATION} formation"
        logger.info(f"Sending A2A Message: '{prompt}'")
        
        from a2a.types import TextPart, Part, Role
        import uuid
        
        msg_id = str(uuid.uuid4())
        message_parts = [Part(TextPart(text=prompt))]
        
        msg_obj = Message(
            message_id=msg_id,
            role=Role.user,
            parts=message_parts
        )
        
        message_params = MessageSendParams(
            message=msg_obj
        )
        
        # Send and Wait for Response
        ctx = ClientCallContext()
        ctx.state["kafka_timeout"] = 120.0 # Timeout for GenAI latency
        response = await kafka_transport.send_message(message_params, context=ctx)
        
        logger.info("Received A2A Response.")
        
        content = None
        if isinstance(response, Message):
            content = response.parts[0].root.text if response.parts else None
        elif isinstance(response, Task):
            if response.artifacts and response.artifacts[0].parts:
                content = response.artifacts[0].parts[0].root.text

        if content:
            logger.info(f"Response Content: {content[:100]}...")
            try:
                clean_content = content.replace("```json", "").replace("```", "").strip()
                coords = json.loads(clean_content)
                
                if isinstance(coords, list):
                    logger.info(f"Parsed {len(coords)} coordinates.")
                    for i, pod_target in enumerate(coords):
                        if i < len(PODS):
                            PODS[i]["x"] = pod_target["x"]
                            PODS[i]["y"] = pod_target["y"]
                    return {"status": "success", "formation": FORMATION}
                else:
                    logger.error("Response JSON is not a list.")
            except json.JSONDecodeError as e:
                logger.error(f"Failed to parse Agent JSON response: {e}")
        else:
            logger.error(f"Could not extract content from response type {type(response)}")

    except Exception as e:
        logger.error(f"Error calling agent via Kafka: {e}")
        return {"status": "error", "message": str(e)}

सर्वर-सेंट इवेंट (एसएसई)

स्टैंडर्ड एपीआई, "अनुरोध-जवाब" मॉडल का इस्तेमाल करते हैं. हमारे एचयूडी के लिए, हमें पॉड की पोज़िशन की "लाइव स्ट्रीम" चाहिए.

एसएसई का इस्तेमाल क्यों करें वेबसॉकेट (जो दोनों दिशाओं में काम करते हैं और ज़्यादा जटिल होते हैं) के उलट, एसएसई, सर्वर से ब्राउज़र तक डेटा स्ट्रीम करने का एक आसान तरीका है. यह डैशबोर्ड, स्टॉक टिकर या इंटरस्टेलर टेलीमेट्री के लिए सबसे सही है.

SSE

यह हमारे कोड में कैसे काम करता है: हम एक event_generator बनाते हैं. यह एक ऐसा लूप होता है जो हर आधे सेकंड में सभी 15 पॉड की मौजूदा पोज़िशन लेता है और उन्हें अपडेट के तौर पर ब्राउज़र पर "पुश" करता है.

👉✏️ $HOME/way-back-home/level_5/satellite/main.py में, #REPLACE-SSE-STREAM को इस कोड से बदलें:

@app.get("/stream")
async def message_stream(request: Request):
    async def event_generator():
        logger.info("New SSE stream connected")
        try:
            while True:
                current_pods = list(PODS) 
                
                # Send updates one by one to simulate low-bandwidth scanning
                for pod in current_pods:
                     payload = {"pod": pod}
                     yield {
                         "event": "pod_update",
                         "data": json.dumps(payload)
                     }
                     await asyncio.sleep(0.02)
                
                # Send formation info occasionally
                yield {
                    "event": "formation_update",
                    "data": json.dumps({"formation": FORMATION})
                }
                
                # Main loop delay
                await asyncio.sleep(0.5)
                
        except asyncio.CancelledError:
             logger.info("SSE stream disconnected (cancelled)")
        except Exception as e:
             logger.error(f"SSE stream error: {e}")
             
    return EventSourceResponse(event_generator())

पूरे मिशन लूप को लागू करना

फ़ाइनल यूज़र इंटरफ़ेस (यूआई) लॉन्च करने से पहले, आइए हम पुष्टि कर लें कि सिस्टम एंड-टू-एंड काम करता है. हम एजेंट को मैन्युअल तरीके से ट्रिगर करेंगे और वायर पर रॉ डेटा पेलोड देखेंगे.

पुष्टि करें

तीन अलग-अलग टर्मिनल टैब खोलें.

टर्मिनल A: फ़ॉर्मेशन एजेंट (A2A सर्वर)

👉💻 यह ADK एजेंट है. यह टास्क के लिए सुनता है और जियोमेट्रिक गणित करता है.

cd $HOME/way-back-home/level_5
. scripts/check_kafka.sh 
# Start the Agent Server
uv run agent/server.py

टर्मिनल B: सैटलाइट स्टेशन (Kafka Client)

👉💻 यह FastAPI सर्वर,"Receiver" के तौर पर काम करता है. यह Kafka के जवाबों को सुनता है और उन्हें लाइव SSE स्ट्रीम में बदलता है.

cd $HOME/way-back-home/level_5

# Start the Satellite Station
uv run satellite/main.py

टर्मिनल सी: मैन्युअल एचयूडी

Send Formation Command (Trigger): 👉💻 In the same terminal C, trigger the formation process:

# Trigger the STAR formation via the Satellite's API
curl -X POST http://localhost:8000/formation \
     -H "Content-Type: application/json" \
     -d '{"formation": "STAR"}'

👀 आपको नए निर्देशांक दिखेंगे.

INFO:satellite.main:Received formation request: STAR
INFO:satellite.main:Sending A2A Message: 'Create a STAR formation'
INFO:satellite.main:Received A2A Response.
INFO:satellite.main:Response Content: ```json ...
INFO:satellite.main:Parsed 15 coordinates.

इससे पुष्टि होती है कि सैटलाइट ने अपने इंटरनल पॉड के निर्देशांक अपडेट कर दिए हैं.

👉💻 हम curl का इस्तेमाल करके, सबसे पहले लाइव टेलीमेट्री स्ट्रीम को सुनेंगे. इसके बाद, फ़ॉर्मेशन में बदलाव करेंगे.

# Connect to the live telemetry feed.
# You should see 'pod_update' events ticking by.
curl -N http://localhost:8000/stream

👀 curl -N कमांड का आउटपुट देखें. pod_update इवेंट में मौजूद x और y कोऑर्डिनेट, स्टार फ़ॉर्मेशन की नई पोज़िशन दिखाने लगेंगे.

आगे बढ़ने से पहले, सभी चालू प्रोसेस बंद करें, ताकि कम्यूनिकेशन पोर्ट खाली हो जाएं.

हर टर्मिनल (A, B, C, और ट्रिगर टर्मिनल) में: Ctrl + C दबाएं.

6. Go Rescue!

आपने सिस्टम को सेट अप कर लिया है. अब मिशन को पूरा करने का समय आ गया है. अब हम React पर आधारित हेड-अप डिसप्ले (एचयूडी) लॉन्च करेंगे. यह डैशबोर्ड, Satellite Station से SSE के ज़रिए कनेक्ट होता है. इससे आपको 15 पॉड की रीयल-टाइम जानकारी मिलती है.

खास जानकारी

कोई निर्देश देने पर, सिर्फ़ किसी फ़ंक्शन को कॉल नहीं किया जाता है. इससे एक इवेंट ट्रिगर होता है, जो Kafka से होकर गुज़रता है. इसके बाद, इसे एआई एजेंट प्रोसेस करता है और यह लाइव टेलीमेट्री के तौर पर आपकी स्क्रीन पर वापस स्ट्रीम होता है.

पुष्टि करें

दो अलग-अलग टर्मिनल टैब खोलें.

टर्मिनल A: फ़ॉर्मेशन एजेंट (A2A सर्वर)

👉💻 यह ADK एजेंट है. यह टास्क के लिए सुनता है और Gemini का इस्तेमाल करके, ज्यामितीय गणित के सवाल हल करता है. टर्मिनल में यह कमांड चलाएं:

cd $HOME/way-back-home/level_5
# Start the Agent Server
uv run agent/server.py

टर्मिनल बी: सैटलाइट स्टेशन और विज़ुअल डैशबोर्ड

👉💻 सबसे पहले, फ़्रंटएंड ऐप्लिकेशन बनाएं.

cd $HOME/way-back-home/level_5/frontend/
npm install
npm run build

👉💻 अब FastAPI सर्वर शुरू करें. यह बैकएंड लॉजिक और फ़्रंटएंड यूज़र इंटरफ़ेस, दोनों को मैनेज करेगा.

cd $HOME/way-back-home/level_5
. scripts/check_kafka.sh 
# Start the Satellite Station
uv run satellite/main.py

लॉन्च करना और पुष्टि करना

  1. 👉 झलक देखें: Cloud Shell टूलबार में, वेब झलक आइकॉन पर क्लिक करें. पोर्ट बदलें को चुनें. इसके बाद, इसे 8000 पर सेट करें. इसके बाद, बदलें और झलक देखें पर क्लिक करें. आपके ब्राउज़र में एक नया टैब खुलेगा. इसमें आपको Starfield का एचयूडी दिखेगा. *वेब-प्रीव्यू
  2. 👉 टेलीमेट्री स्ट्रीम की पुष्टि करें:
    • यूज़र इंटरफ़ेस (यूआई) लोड होने के बाद, आपको 15 पॉड रैंडम तरीके से बिखरे हुए दिखेंगे.
    • अगर पॉड धीरे-धीरे पल्स हो रहे हैं या "jittering" कर रहे हैं, तो इसका मतलब है कि SSE स्ट्रीम चालू है. साथ ही, सैटलाइट स्टेशन अपनी पोज़िशन को ब्रॉडकास्ट कर रहा है. शुरू करें
  3. 👉 कंपनी बनाने की प्रोसेस शुरू करें: डैशबोर्ड पर मौजूद "STAR" बटन पर क्लिक करें. स्टार
  4. 👀 इवेंट लूप को ट्रेस करें: अपने टर्मिनल देखें, ताकि आपको आर्किटेक्चर काम करता हुआ दिखे:
    • टर्मिनल बी (सैटलाइट स्टेशन) यह लॉग करेगा: Sending A2A Message: 'Create a STAR formation'.
    • टर्मिनल A (फ़ॉर्मेशन एजेंट) में, Gemini से सलाह लेने के दौरान की गई गतिविधि दिखेगी.
    • टर्मिनल B (सैटलाइट स्टेशन), Received A2A Response को लॉग करेगा और निर्देशांकों को पार्स करेगा.
  5. 👀 विज़ुअल पुष्टि: अपने डैशबोर्ड पर 15 पॉड को देखें. ये पॉड, अपनी रैंडम पोज़िशन से आसानी से खिसककर पांच नोक वाले तारे की शेप में आ जाते हैं.
  6. 👉 एक्सपेरिमेंट:
    • तीन अलग-अलग फ़ॉर्मेशन के लिए, "X" या "लाइन" आज़माएं. X
    • कस्टम इंटेंट: मैन्युअल इनपुट का इस्तेमाल करके, कोई यूनीक चीज़ टाइप करें. जैसे, "दिल" या "त्रिकोण". सर्कल
    • जेन एआई का इस्तेमाल करने पर, एजेंट किसी भी ज्यामितीय आकार के लिए गणित के सवालों को हल करने की कोशिश करेगा!

तीन पैटर्न बनाने के बाद, आपने कनेक्शन को फिर से चालू कर लिया है. हो गया

मिशन पूरा हुआ!

डेटा बिना किसी रुकावट के नॉइज़ से होकर गुज़रता है, इसलिए स्ट्रीम स्थिर हो जाती है. आपके निर्देश पर, 15 प्राचीन पॉड तारों के बीच एक साथ डांस करना शुरू कर देते हैं.

खत्म हो रहा है

कैलिब्रेशन के तीन मुश्किल चरणों के दौरान, आपने टेलीमेट्री को लॉक होते हुए देखा. हर अलाइनमेंट के साथ, सिग्नल और मज़बूत होता गया. आखिर में, यह इंटरस्टेलर इंटरफ़ेरेंस को भेदकर, उम्मीद की किरण की तरह चमकने लगा.

इवेंट-ड्रिवन एजेंट को बेहतरीन तरीके से लागू करने के लिए, आपका शुक्रिया. पांच बचे हुए लोगों को X-42 की सतह से एयरलिफ़्ट कर लिया गया है. वे अब बचाव करने वाले जहाज़ में सुरक्षित हैं. आपकी वजह से, पांच लोगों की जान बचाई जा सकी.

अगर आपने लेवल 0 में हिस्सा लिया था, तो Way Back Home मिशन में अपनी प्रोग्रेस देखना न भूलें! सितारों तक पहुंचने का आपका सफ़र जारी है.फ़ाइनल