Way Back Home - Event-Driven Architecture with Google ADK, A2A, and Kafka

1. המשימה

סטורי

אתם נסחפים בשקט של מגזר לא מוכר. פולס סולארי עצום קרע את הספינה שלכם דרך קרע במרחב, והשאיר אתכם תקועים בכיס ביקום שלא קיים בשום מפת כוכבים.

אחרי ימים של תיקונים מתישים, אתם סוף סוף מרגישים את רעש המנועים מתחת לרגליים. החללית שלך תוקנה. אפילו הצלחתם לאבטח קישור עלייה למטוס האם לטווח ארוך. יש לך אישור להמראה. הכול מוכן.

אבל כשאתם מתכוננים להפעיל את הזיכרון הנייד, אות מצוקה חודר את הרעש הסטטי. החיישנים קולטים אות לעזרה. חמישה אזרחים לכודים על פני השטח של כוכב X-42. התקווה היחידה שלהם להימלט היא 15 תרמילים עתיקים שצריך לסנכרן כדי לשדר אות מצוקה לספינת האם שלהם במסלול.

אבל התרמילים נשלטים על ידי תחנת לוויין, ומחשב הניווט הראשי שלה ניזוק. התרמילים נסחפים ללא מטרה. הצלחנו ליצור חיבור עורפי ללוויין, אבל העלייה ללוויין סובלת מהפרעות בין-כוכביות חמורות, שגורמות לזמן אחזור ארוך מאוד במחזורי בקשה-תגובה.

האתגר

מכיוון שמודל של בקשה/תגובה הוא איטי מדי, אנחנו צריכים לפרוס ארכיטקטורה מבוססת-אירועים (EDA) עם אירועים שנשלחים מהשרת (SSE) כדי להזרים טלמטריה דרך הרעש.

מיסיון

תצטרכו ליצור סוכן בהתאמה אישית שיכול לחשב את המתמטיקה המורכבת של הווקטורים שנדרשת כדי להכריח את המכשירים להתמקם בתצורות ספציפיות לשיפור האות (עיגול, כוכב, קו). צריך לחבר את הסוכן הזה לארכיטקטורה החדשה של הלוויין.

מה תפַתחו

סקירה כללית

  • תצוגה עילית (HUD) שמבוססת על React להמחשה ולשליטה בצי של 15 תרמילים בזמן אמת.
  • סוכן AI גנרטיבי שמשתמש בערכת פיתוח הסוכנים (ADK) של Google כדי לחשב מבנים גיאומטריים מורכבים של התרמילים על סמך פקודות בשפה טבעית.
  • קצה עורפי של תחנת לוויין מבוססת Python שמשמש כמרכז מרכזי, ומתקשר עם הקצה הקדמי באמצעות אירועים שנשלחים מהשרת (SSE).
  • ארכיטקטורה מבוססת-אירועים באמצעות Apache Kafka כדי להפריד את סוכן ה-AI ממערכת בקרת הלוויין, וכך לאפשר תקשורת גמישה ואסינכרונית.

מה תלמדו

טכנולוגיה / קונספט

תיאור

Google ADK (Agent Development Kit)

תשתמשו במסגרת הזו כדי לבנות, לבדוק ולתכנן סוכן AI מיוחד שמבוסס על מודלים של Gemini.

ארכיטקטורה מבוססת-אירועים (EDA)

תלמדו את העקרונות של בניית מערכת מנותקת שבה הרכיבים מתקשרים באופן אסינכרוני באמצעות אירועים, מה שהופך את האפליקציה לעמידה יותר וניתנת להרחבה.

Apache Kafka

תגדירו את Kafka כפלטפורמה מבוזרת להזרמת אירועים ותשתמשו בה כדי לנהל את זרימת הפקודות והנתונים בין מיקרו-שירותים שונים.

אירועים שנשלחים מהשרת (SSE)

תטמיעו SSE בבק-אנד של FastAPI כדי לשלוח נתוני טלמטריה בזמן אמת מהשרת לפרונט-אנד של React, וכך ממשק המשתמש יתעדכן כל הזמן.

פרוטוקול A2A (Agent-to-Agent)

תלמדו איך לעטוף את הסוכן בשרת A2A, כדי לאפשר תקשורת סטנדרטית ויכולת פעולה הדדית בתוך מערכת אקולוגית גדולה יותר של סוכנים.

FastAPI

תבנו את שירות הליבה לקצה העורפי, תחנת הלוויין, באמצעות מסגרת האינטרנט הזו של Python עם ביצועים גבוהים.

תגובות

תעבדו עם אפליקציית frontend מודרנית שמנויה לזרם SSE כדי ליצור ממשק משתמש דינמי ואינטראקטיבי.

AI גנרטיבי בבקרה על המערכת

תראו איך אפשר להנחות מודל שפה גדול (LLM) לבצע משימות ספציפיות שמבוססות על נתונים (כמו יצירת קואורדינטות), ולא רק שיחה בצ'אט.

2. הגדרת הסביבה

גישה ל-Cloud Shell

‫👈 לוחצים על 'הפעלת Cloud Shell' בחלק העליון של מסוף Google Cloud (זהו סמל הטרמינל בחלק העליון של חלונית Cloud Shell), cloud-shell.png

‫👈 לוחצים על הלחצן 'פתיחת הכלי לעריכה' (הוא נראה כמו תיקייה פתוחה עם עיפרון). ייפתח חלון עם Cloud Shell Code Editor. בצד ימין יופיע סייר הקבצים. open-editor.png

‫👈פותחים את הטרמינל בסביבת הפיתוח המשולבת (IDE) בענן,

03-05-new-terminal.png

‫👈💻 בטרמינל, מוודאים שכבר עברתם אימות ושהפרויקט מוגדר למזהה הפרויקט שלכם באמצעות הפקודה הבאה:

gcloud auth list

החשבון שלכם אמור להופיע ברשימה כ-(ACTIVE).

דרישות מוקדמות

‫ℹ️ רמה 0 היא אופציונלית (אבל מומלצת)

אפשר להשלים את המשימה הזו בלי להגיע לרמה 0, אבל אם משלימים אותה קודם, מקבלים חוויה סוחפת יותר, שמאפשרת לראות את האור של המשואה במפה הגלובלית בזמן ההתקדמות.

הגדרת סביבת הפרויקט

חוזרים לטרמינל, מגדירים את הפרויקט הפעיל ומפעילים את שירותי Google Cloud הנדרשים (Cloud Run,‏ Vertex AI וכו') כדי להשלים את ההגדרה.

‫👈💻 בטרמינל, מגדירים את מזהה הפרויקט:

gcloud config set project $(cat ~/project_id.txt) --quiet

‫👈💻 הפעלת השירותים הנדרשים:

gcloud services enable  compute.googleapis.com \
                        artifactregistry.googleapis.com \
                        run.googleapis.com \
                        cloudbuild.googleapis.com \
                        iam.googleapis.com \
                        aiplatform.googleapis.com \
                        cloudresourcemanager.googleapis.com

התקנת יחסי תלות

‫👈💻 עוברים לרמה 5 ומתקינים את חבילות Python הנדרשות:

cd $HOME/way-back-home/level_5
uv sync

יחסי התלות העיקריים הם:

חבילה

מטרה

fastapi

‫High-performance web framework for the Satellite Station and SSE streaming

uvicorn

נדרש שרת ASGI כדי להריץ את אפליקציית FastAPI

google-adk

ערכת פיתוח הסוכן ששימשה לבניית סוכן Formation

a2a-sdk

ספריית פרוטוקולים של Agent-to-Agent לתקשורת סטנדרטית

aiokafka

לקוח Kafka אסינכרוני ל-Event Loop

google-genai

לקוח מקורי לגישה למודלים של Gemini

numpy

מתמטיקה וקטורית וחישובים של קואורדינטות לסימולציה

websockets

תמיכה בתקשורת דו-כיוונית בזמן אמת

python-dotenv

ניהול משתני סביבה וסודות תצורה

sse-starlette

טיפול יעיל באירועים שנשלחים מהשרת (SSE)

requests

ספריית HTTP פשוטה לקריאות ל-API חיצוני

אימות ההגדרה

לפני שנתחיל לכתוב קוד, נבדוק שכל המערכות פועלות. מריצים את סקריפט האימות כדי לבצע ביקורת בפרויקט Google Cloud, בממשקי ה-API ובתלות של Python.

‫👈💻 מריצים את סקריפט האימות:

source $HOME/way-back-home/.venv/bin/activate
cd $HOME/way-back-home/level_5/scripts
chmod +x verify_setup.sh
. verify_setup.sh

‫👀 אמורים להופיע כמה סימני וי ירוקים (✅).

  • אם מופיעים סימני X אדומים (❌), פועלים לפי פקודות התיקון שמופיעות בפלט (לדוגמה, ‫gcloud services enable ... או pip install ...).
  • הערה: בשלב הזה, אפשר להתעלם מאזהרה צהובה לגבי .env. ניצור את הקובץ הזה בשלב הבא.
🚀 Verifying Mission Charlie (Level 5) Infrastructure...

✅ Google Cloud Project: xxxxxx
✅ Cloud APIs: Active
✅ Python Environment: Ready

🎉 SYSTEMS ONLINE. READY FOR MISSION.

3. עיצוב מיקומים ב-Pod באמצעות LLM

אנחנו צריכים לבנות את ה "מוח" של מבצע החילוץ שלנו. זה יהיה סוכן שנוצר באמצעות Google ADK (ערכת פיתוח סוכנים). המטרה היחידה שלו היא לשמש ככלי ניווט גיאומטרי ייעודי. מודלים סטנדרטיים של LLM אוהבים לשוחח, אבל בחלל העמוק אנחנו צריכים נתונים, לא דיאלוג. אנחנו נתכנת את הסוכן הזה כך שיקבל פקודה כמו Star ויחזיר קואורדינטות JSON גולמיות ל-15 התרמילים שלנו.

סוכן

יצירת תשתית לסוכן

‫👈💻 מריצים את הפקודות הבאות כדי לעבור לספריית הסוכן ולהפעיל את האשף ליצירת ADK:

cd $HOME/way-back-home/level_5/agent
uv run adk create formation

יופעל אשף הגדרה אינטראקטיבי ב-CLI. משתמשים בתשובות הבאות כדי להגדיר את הסוכן:

  1. בחירת מודל: בוחרים באפשרות 1 (Gemini Flash).
    • הערה: יכול להיות שהגרסה הספציפית תהיה שונה. תמיד בוחרים בגרסת Flash כדי להגביר את המהירות.
  2. בוחרים בקצה העורפי: בוחרים באפשרות 2 (Vertex AI).
  3. הזנת מזהה פרויקט ב-Google Cloud: מקישים על Enter כדי לאשר את ברירת המחדל (שזוהתה מהסביבה).
  4. הזנת אזור ב-Google Cloud: מקישים על Enter כדי לאשר את ברירת המחדל (us-central1).

‫👀 האינטראקציה שלכם עם הטרמינל אמורה להיראות כך:

(way-back-home) user@cloudshell:~/way-back-home/level_5/agent$ adk create formation

Choose a model for the root agent:
1. gemini-2.5-flash
2. Other models (fill later)
Choose model (1, 2): 1

1. Google AI
2. Vertex AI
Choose a backend (1, 2): 2

You need an existing Google Cloud account and project...
Enter Google Cloud project ID [your-project-id]: <PRESS ENTER>
Enter Google Cloud region [us-central1]: <PRESS ENTER>

Agent created in /home/user/way-back-home/level_5/agent/formation:
- .env
- __init__.py
- agent.py

אמורה להופיע Agent created הודעה על סיום הפעולה. כך נוצר קוד בסיסי שאותו נשנה עכשיו.

‫👉✏️ מנווטים לקובץ $HOME/way-back-home/level_5/agent/formation/agent.py שנוצר ופותחים אותו בעורך. מחליפים את כל התוכן של הקובץ בקוד שלמטה. הפעולה הזו מעדכנת את שם הסוכן ומספקת את הפרמטרים התפעוליים המדויקים שלו.

import os
from google.adk.agents import Agent

root_agent = Agent(
    name="formation_agent",
    model="gemini-2.5-flash",
    instruction="""
    You are the **Formation Controller AI**.
    Your strict objective is to calculate X,Y coordinates for a fleet of **15 Drones** based on a requested geometric shape.

    ### FIELD SPECIFICATIONS
    - **Canvas Size**: 800px (width) x 600px (height).
    - **Safe Margin**: Keep pods at least 50px away from edges (x: 50-750, y: 50-550).
    - **Center Point**: x=400, y=300 (Use this as the origin for shapes).
    - **Top Menu Avoidance**: Do NOT place pods in the top 100px (y < 100) to avoid UI overlap.

    ### FORMATION RULES
    When given a formation name, output coordinates for exactly 15 pods (IDs 0-14).
    1.  **CIRCLE**: Evenly spaced around a center point (R=200).
    2.  **STAR**: 5 points or a star-like distribution.
    3.  **X**: A large X crossing the screen.
    4.  **LINE**: A horizontal line across the middle.
    5.  **PARABOLA**: A U-shape opening UPWARDS. Center it at y=400, opening up to y=100. IMPORTANT: Lowest point must be at bottom (high Y value), opening up (low Y value). Screen coordinates have (0,0) at the TOP-LEFT. The vertex should be at the BOTTOM (e.g., y=500), with arms reaching up to y=200.
    6.  **RANDOM**: Scatter randomly within safe bounds.
    7.  **CUSTOM**: If the user inputs something else (e.g., "SMILEY", "TRIANGLE"), do your best to approximate it geometrically.

    ### OUTPUT FORMAT
    You MUST output **ONLY VALID JSON**. No markdown fencing, no preamble, no commentary.
    Refuse to answer non-formation questions.

    **JSON Structure**:
    ```json
    [
        {"x": 400, "y": 300},
        {"x": 420, "y": 300},
        ... (15 total items)
    ]
    ```
    """
)
  • דיוק גיאומטרי: הגדרת 'גודל בד הציור' ו'שוליים בטוחים' בהנחיית המערכת מבטיחה שהסוכן לא יציב את התרמילים מחוץ למסך או מתחת לרכיבי ממשק המשתמש.
  • אכיפת JSON: באמצעות ההנחיה ל-LLM ‏"Refuse to answer non-formation questions" (סירוב לענות על שאלות שלא קשורות לפורמט) וההנחיה "No preamble" (ללא הקדמה), אנחנו מוודאים שהקוד במורד הזרם (הלוויין) לא יקרוס כשהוא ינסה לנתח את התשובה.
  • לוגיקה מופרדת: הסוכן הזה עדיין לא יודע על Kafka. הוא יודע רק לעשות מתמטיקה. בשלב הבא, נכניס את ה-Brain הזה לשרת Kafka.

בדיקת הסוכן באופן מקומי

לפני שמחברים את הסוכן ל'מערכת העצבים' של Kafka, צריך לוודא שהוא פועל בצורה תקינה. אתם יכולים ליצור אינטראקציה עם הסוכן ישירות במסוף כדי לוודא שהוא יוצר קואורדינטות JSON תקינות.

‫👉💻 משתמשים בפקודה adk run כדי להתחיל שיחת צ'אט עם הסוכן.

cd $HOME/way-back-home/level_5/agent
uv run adk run formation
  1. קלט: מקלידים Circle ומקישים על Enter.
    • קריטריונים להצלחה: אמורה להופיע רשימת JSON גולמית (לדוגמה, [{"x": 400, "y": 200}, ...]). מוודאים שאין טקסט בפורמט Markdown כמו 'Here are the coordinates:' (אלה הקואורדינטות:) לפני ה-JSON.
  2. קלט: מקלידים Line ומקישים על Enter.
    • קריטריון להצלחה: מוודאים שהקואורדינטות יוצרות קו אופקי (ערכי ה-y צריכים להיות דומים).

אחרי שמוודאים שהסוכן מוציא JSON נקי, אפשר לעטוף אותו בשרת Kafka.

‫👈💻 מקישים על Ctrl+C כדי לצאת.

4. יצירת שרת A2A לסוכן ההרכבה

הסבר על A2A (נציג לנציג)

A2A (Agent-to-Agent) protocol הוא תקן פתוח שנועד לאפשר פעולה הדדית חלקה בין סוכני AI. המסגרת הזו מאפשרת לסוכנים לבצע יותר מסתם החלפת טקסט פשוטה, והם יכולים להקצות משימות, לתאם פעולות מורכבות ולפעול כיחידה מגובשת כדי להשיג יעדים משותפים בסביבה עסקית מבוזרת.

A2A

הסבר על פרוטוקולי A2A: ‏ HTTP,‏ gRPC ו-Kafka

פרוטוקול A2A מציע שני אופנים שונים לתקשורת בין לקוחות לסוכנים, וכל אחד מהם מיועד לצרכים ארכיטקטוניים שונים. ‫HTTP (JSON-RPC) הוא ברירת המחדל, תקן אוניברסלי שפועל בכל סביבות האינטרנט. gRPC היא האפשרות שלנו לביצועים גבוהים, שממנפת את Protocol Buffers לתקשורת יעילה עם הקלדה קפדנית. בנוסף, במעבדה אני מספקת גם העברה של Kafka. זוהי הטמעה מותאמת אישית שמיועדת לארכיטקטורות חזקות מבוססות-אירועים, שבהן הפרדה בין מערכות היא בראש סדר העדיפויות.

תחבורה

מתחת לפני השטח, פרוטוקולי התעבורה האלה מטפלים בזרימת הנתונים בצורה שונה למדי. במודל HTTP, הלקוח שולח בקשת JSON ומשאיר את החיבור פתוח, ומחכה שהסוכן יסיים את המשימה ויחזיר את התוצאה המלאה בבת אחת. פרוטוקול gRPC מבצע אופטימיזציה של התהליך הזה באמצעות נתונים בינאריים ו-HTTP/2, ומאפשר גם מחזורי בקשה-תגובה פשוטים וגם סטרימינג בזמן אמת, שבו הסוכן שולח עדכונים (כמו 'מחשבה' או 'נוצר ארטיפקט') כשהם מתרחשים. ההטמעה של Kafka פועלת באופן אסינכרוני: הלקוח מפרסם בקשה ב'נושא בקשה' עמיד מאוד ומאזין ל'נושא תשובה' נפרד. השרת אוסף את ההודעה כשהוא יכול, מעבד אותה ומפרסם את התוצאה בחזרה, כלומר שני הצדדים אף פעם לא מתקשרים ישירות זה עם זה.

הבחירה תלויה בדרישות הספציפיות שלכם לגבי מהירות, מורכבות והתמדה. הכי קל להתחיל לעבוד עם HTTP ולבצע בו ניפוי באגים, ולכן הוא מושלם לשילובים פשוטים. gRPC היא הבחירה הטובה ביותר לתקשורת פנימית בין שירותים, שבה זמן אחזור נמוך ועדכונים שוטפים של משימות הם קריטיים. עם זאת, קפקא היא הבחירה העמידה ביותר, כי היא מאחסנת בקשות בדיסק בתור, והמשימות שלכם שורדות גם אם שרת הסוכן קורס או מופעל מחדש. היא מספקת רמה של עמידות וניתוק ש-HTTP ו-gRPC לא יכולות לספק.

שכבת תעבורה בהתאמה אישית: Kafka

‫Kafka משמשת כעמוד השדרה האסינכרוני שמפריד בין המוח של הפעולה (Formation Agent) לבין אמצעי הבקרה הפיזיים (תחנת הלוויין). במקום לחייב את המערכת להמתין לחיבור סינכרוני בזמן שהסוכן מחשב וקטורים מורכבים, הסוכן מפרסם את התוצאות שלו כאירועים בנושא Kafka. המאגר הזה פועל כמאגר זמני קבוע, ומאפשר למערכת הלוויין לעבד את ההוראות בקצב שלה. כך נתוני המיקום לא הולכים לאיבוד, גם אם יש השהיה משמעותית ברשת או קריסת מערכת זמנית.

באמצעות Kafka, אתם יכולים להפוך תהליך לינארי איטי לצינור סטרימינג גמיש שבו ההוראות והטלמטריה זורמים באופן עצמאי, כך שה-HUD של המשימה ימשיך להגיב גם במהלך עיבוד אינטנסיבי של AI.

Kafka

מה זה Kafka?

קפקא היא פלטפורמה מבוזרת להזרמת אירועים. בארכיטקטורה מבוססת-אירועים (EDA):

  1. יצרנים מפרסמים הודעות ב'נושאים'.
  2. צרכנים נרשמים לנושאים האלה ומגיבים כשמתקבלת הודעה.

למה כדאי להשתמש ב-Kafka?

הוא מפריד בין המערכות. הסוכן Formation פועל באופן אוטונומי ומחכה לבקשות נכנסות בלי לדעת את הזהות או הסטטוס של השולח. כך האחריות מופרדת, וגם אם הלוויין עובר למצב אופליין, תהליך העבודה נשאר ללא שינוי. קפקא פשוט מאחסן את ההודעות עד שהלוויין מתחבר מחדש.

מה לגבי Google Cloud Pub/Sub?

בהחלט אפשר להשתמש ב-Google Cloud Pub/Sub למטרה הזו. ‫Pub/Sub הוא שירות העברת הודעות ללא שרתים של Google. ‫Kafka מצוין לסטרימינג עם נפח נתונים גבוה ולסטרימינג שניתן להפעלה מחדש, אבל Pub/Sub מועדף בדרך כלל בגלל קלות השימוש בו. בשיעור ה-Lab הזה אנחנו משתמשים ב-Kafka כדי לדמות אוטובוס הודעות חזק ומתמשך.

הפעלת אשכול Kafka מקומי

מעתיקים את כל בלוק הפקודות שבהמשך ומדביקים אותו בטרמינל. תתבצע הורדה של תמונת Kafka הרשמית והיא תופעל ברקע.

‫👈💻 מריצים את הפקודות הבאות בטרמינל:

# Navigate to the correct mission directory first
cd $HOME/way-back-home/level_5

# Run the Kafka container in detached mode
docker run -d \
  --name mission-kafka \
  -p 9092:9092 \
  -e KAFKA_PROCESS_ROLES='broker,controller' \
  -e KAFKA_NODE_ID=1 \
  -e KAFKA_CONTROLLER_LISTENER_NAMES=CONTROLLER \
  -e KAFKA_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093 \
  -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092 \
  -e KAFKA_CONTROLLER_QUORUM_VOTERS=1@127.0.0.1:9093 \
  -e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \
  apache/kafka:4.2.0-rc1

‫👈💻 מוודאים שהקונטיינר פועל באמצעות הפקודה docker ps.

docker ps

👀 אמור להופיע פלט שמאשר שהקונטיינר mission-kafka פועל והיציאה 9092 חשופה.

CONTAINER ID   IMAGE                  COMMAND                  CREATED          STATUS          PORTS                               NAMES
c1a2b3c4d5e6   apache/kafka:4.2.0-rc1    "/opt/kafka/bin/kafka..."   15 seconds ago   Up 14 seconds   0.0.0.0:9092->9092/tcp, 9093/tcp   mission-kafka

מה זה נושא ב-Kafka?

אפשר לחשוב על נושא ב-Kafka כעל ערוץ או קטגוריה ייעודיים להודעות. זה כמו יומן שבו רשומות של אירועים מאוחסנות לפי הסדר שבו הן נוצרו. מפיקים כותבים הודעות לנושאים ספציפיים, וצרכנים קוראים את ההודעות מהנושאים האלה. כך השולח לא תלוי במקבל. המפיק לא צריך לדעת איזה צרכן יקרא את הנתונים, הוא רק צריך לשלוח אותם ל"ערוץ" הנכון. במשימה שלנו, ניצור שני נושאים: אחד לשליחת בקשות ליצירת תמונה לסוכן, ואחד לסוכן לפרסום התשובות שלו כדי שהלוויין יוכל לקרוא אותן.

Kafka

‫👉💻 מריצים את הפקודות הבאות כדי ליצור את הנושאים הנדרשים בתוך קונטיינר Docker הפועל.

# Create the topic for formation requests
docker exec mission-kafka /opt/kafka/bin/kafka-topics.sh \
  --create \
  --topic a2a-formation-request \
  --bootstrap-server 127.0.0.1:9092

# Create the topic where the satellite dashboard will listen for replies
docker exec mission-kafka /opt/kafka/bin/kafka-topics.sh \
  --create \
  --topic a2a-reply-satellite-dashboard \
  --bootstrap-server 127.0.0.1:9092

‫👈💻 כדי לוודא שהערוצים פתוחים, מריצים את פקודת הרשימה:

docker exec mission-kafka /opt/kafka/bin/kafka-topics.sh \
  --list \
  --bootstrap-server 127.0.0.1:9092

👀 השמות של הנושאים שיצרתם אמורים להופיע.

a2a-formation-request
a2a-reply-satellite-dashboard

הגדרת מופע Kafka הושלמה ועכשיו הוא מוכן לניתוב נתונים קריטיים.

הטמעה של שרת Kafka A2A

פרוטוקול Agent-to-Agent (A2A)‎ יוצר מסגרת סטנדרטית להפעלת יכולת פעולה הדדית בין מערכות אג'נטיות עצמאיות. הוא מאפשר לסוכנים שפותחו על ידי צוותים שונים או שפועלים בתשתיות שונות לגלות אחד את השני ולשתף פעולה ביעילות, בלי לדרוש לוגיקת שילוב מותאמת אישית לכל חיבור.

הטמעת ההפניה, a2a-python, היא ספרייה בסיסית להרצת אפליקציות מבוססות-סוכנים. תכונה מרכזית בעיצוב שלו היא הרחבה. הוא מבצע הפשטה של שכבת התקשורת, ומאפשר למפתחים להחליף פרוטוקולים כמו HTTP בפרוטוקולים אחרים.

תהליך העבודה של A2A

בפרויקט הזה, אנחנו משתמשים ביכולת ההרחבה הזו באמצעות הטמעה מותאמת אישית של Kafka: ‏ a2a-python-kafka. נשתמש בהטמעה הזו כדי להדגים איך תקן A2A מאפשר להתאים את התקשורת בין הסוכנים לצרכים שונים של ארכיטקטורה – במקרה הזה, החלפת HTTP סינכרוני באוטובוס אירועים אסינכרוני.

הפעלת A2A עבור סוכן התצורה

עכשיו נשתמש בשרת A2A כדי לעטוף את הסוכן שלנו, ולהפוך אותו לשירות שניתן להפעלה הדדית, שיכול:

  • האזנה למשימות מנושא Kafka.
  • העברת משימות שהתקבלו לסוכן ADK הבסיסי לצורך עיבוד.
  • פרסום התוצאה בנושא של תשובה.

‫👈✏️ ב-$HOME/way-back-home/level_5/agent/agent_to_kafka_a2a.py, מחליפים את #REPLACE-CREATE-KAFKA-A2A-SERVER בקוד הבא:

async def create_kafka_server(
    agent: BaseAgent,
    *,
    bootstrap_servers: str | List[str] = "localhost:9092",
    request_topic: str = "a2a-formation-request",
    consumer_group_id: str = "a2a-agent-group",
    agent_card: Optional[Union[AgentCard, str]] = None,
    runner: Optional[Runner] = None,
    **kafka_config: Any,
) -> KafkaServerApp:
  """Convert an ADK agent to a A2A Kafka Server application.
  Args:
      agent: The ADK agent to convert
      bootstrap_servers: Kafka bootstrap servers.
      request_topic: Topic to consume requests from.
      consumer_group_id: Consumer group ID for the server.
      agent_card: Optional pre-built AgentCard object or path to agent card
                  JSON. If not provided, will be built automatically from the
                  agent.
      runner: Optional pre-built Runner object. If not provided, a default
              runner will be created using in-memory services.
      **kafka_config: Additional Kafka configuration.

  Returns:
      A KafkaServerApp that can be run with .run() or .start()
  """
  # Set up ADK logging
  adk_logger = logging.getLogger("google_adk")
  adk_logger.setLevel(logging.INFO)

  async def create_runner() -> Runner:
    """Create a runner for the agent."""
    return Runner(
        app_name=agent.name or "adk_agent",
        agent=agent,
        # Use minimal services - in a real implementation these could be configured
        artifact_service=InMemoryArtifactService(),
        session_service=InMemorySessionService(),
        memory_service=InMemoryMemoryService(),
        credential_service=InMemoryCredentialService(),
    )

  # Create A2A components
  task_store = InMemoryTaskStore()

  agent_executor = A2aAgentExecutor(
      runner=runner or create_runner,
  )
  
  # Initialize logic handler
  from a2a.server.request_handlers.default_request_handler import DefaultRequestHandler
  
  logic_handler = DefaultRequestHandler(
      agent_executor=agent_executor, task_store=task_store
  )

  # Prepare Agent Card
  rpc_url = f"kafka://{bootstrap_servers}/{request_topic}"
      
  # Create Kafka Server App
  server_app = KafkaServerApp(
      request_handler=logic_handler,
      bootstrap_servers=bootstrap_servers,
      request_topic=request_topic,
      consumer_group_id=consumer_group_id,
      **kafka_config
  )
  
  return server_app

הקוד הזה מגדיר את הרכיבים העיקריים:

  1. The Runner: מספק את זמן הריצה של הסוכן (טיפול בזיכרון, בפרטי הכניסה וכו').
  2. חנות המשימות: אפשר לעקוב אחרי הסטטוס של הבקשות כשהן עוברות מ'בהמתנה' ל'הושלם'.
  3. Agent Executor: מקבל משימה מ-Kafka ומעביר אותה לסוכן כדי לחשב קואורדינטות.
  4. KafkaServerApp: מנהל את החיבור הפיזי ל-Kafka broker.

A2A Kafka

הגדרת משתני סביבה

ההגדרה של ADK יצרה קובץ .env עם ההגדרות של Google Vertex AI בתוך תיקיית הסוכן. צריך להעביר את הקובץ הזה לרמה הבסיסית של הפרויקט ולהוסיף את הקואורדינטות של אשכול Kafka.

מריצים את הפקודות הבאות כדי להעתיק את הקובץ ולצרף את כתובת שרת Kafka:

cd $HOME/way-back-home/level_5
# 1. Copy the API keys from the agent folder to the project root
cp agent/formation/.env .env

# 2. Append the Kafka Bootstrap Server address to the file
echo -e "\nKAFKA_BOOTSTRAP_SERVERS=localhost:9092" >> .env

# 3. Verify the file content
echo "✅ Environment configured. Here are the last few lines:"
tail .env

אימות של A2A Interstellar Loop

עכשיו נבצע בדיקה פעילה כדי לוודא שלולאת האירועים האסינכרונית פועלת בצורה תקינה: נשלח אות ידני דרך אשכול Kafka ונבדוק את התגובה של הסוכן.

אימות של A2A Interstellar Loop

כדי לראות את מחזור החיים המלא של אירוע, נשתמש בשלושה מסופים נפרדים.

מסוף א': סוכן ההקמה (שרת A2A Kafka)

‫👈💻 הטרמינל הזה מריץ את תהליך Python שמקשיב ל-Kafka ומשתמש ב-Gemini כדי לבצע את החישובים הגיאומטריים.

cd $HOME/way-back-home/level_5
source $HOME/way-back-home/.venv/bin/activate
. scripts/check_kafka.sh 

# Install the custom Kafka-enabled A2A library
uv pip install git+https://github.com/weimeilin79/a2a-python-kafka.git

# Start the Agent Server
uv run agent/server.py

מחכים עד שמופיע:

[INFO] Kafka Server App Started. Starting to consume requests...

מסוף ב': המאזין הלווייני (צרכן)

‫👈💻 במסוף הזה, נקשיב לנושא התשובה. כך מדמים את הלוויין שממתין להוראות.

# Listen for the AI's response on the satellite channel
docker exec mission-kafka /opt/kafka/bin/kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 \
  --topic a2a-reply-satellite-dashboard \
  --from-beginning \
  --property "print.headers=true"

הטרמינל הזה יופיע כלא פעיל. הוא ממתין שהנציג יפרסם הודעה.

מסוף ג': האות של המפקד (יוצר)

‫👉💻 עכשיו נשלח בקשה בפורמט A2A לנושא a2a-formation-request. אנחנו צריכים לכלול כותרות ספציפיות של Kafka כדי שהסוכן ידע לאן לשלוח את התשובה.

echo 'correlation_id=ping-manual-01,reply_topic=a2a-reply-satellite-dashboard|{"method": "message_send", "params": {"message": {"message_id": "msg-001", "role": "user", "parts": [{"text": "STAR"}]}}, "streaming": false, "agent_card": {"name": "DiagnosticTool", "version": "1.0.0"}}' | \
docker exec -i mission-kafka /opt/kafka/bin/kafka-console-producer.sh \
  --bootstrap-server localhost:9092 \
  --topic a2a-formation-request \
  --property "parse.headers=true" \
  --property "headers.key.separator==" \
  --property "headers.delimiter=|"

ניתוח התוצאה

👀 אם הלולאה מצליחה, עוברים אל Terminal B. בלוק גדול של JSON אמור להופיע באופן מיידי. היא תתחיל עם הכותרת ששלחנו correlation_id:ping-manual-01. אחריו אובייקט task. אם תתבוננו מקרוב בקטע parts בקובץ ה-JSON, תוכלו לראות את הקואורדינטות הגולמיות X ו-Y ש-Gemini חישב עבור 15 התרמילים:

{"type": "task", "data": {"artifacts": [{"artifactId": "...", "parts": [{"kind": "text", "text": "```json\n[\n  {\"x\": 400, \"y\": 150},\n  {\"x\": 257, \"y\": 254},\n  {\"x\": 312, \"y\": 421},\n ... \n]\n```"}]}], ...}}

הפרדת בהצלחה את הסוכן מהמקלט. ה'רעש הבין-כוכבי' של זמן האחזור של בקשה-תגובה כבר לא משנה כי המערכת שלנו היא עכשיו מבוססת-אירועים לחלוטין.

לפני שממשיכים, צריך לעצור את תהליכי הרקע כדי לפנות יציאות ברשת.

‫👈💻 בכל טרמינל (A,‏ B ו-C):

  • מקישים על Ctrl + C כדי להפסיק את התהליך הפעיל.

5. תחנת הלוויין (לקוח A2A Kafka ו-SSE)

בשלב הזה, אנחנו בונים את תחנת הלוויין. זהו הגשר בין אשכול Kafka לבין התצוגה החזותית של הפיילוט (החלק הקדמי של React). השרת הזה פועל גם כלקוח Kafka (לתקשורת עם הסוכן) וגם כסטרימר SSE (לתקשורת עם הדפדפן).

מהו לקוח Kafka?

אפשר לחשוב על Kafka Cluster כתחנת רדיו. לקוח Kafka הוא מקלט הרדיו. האפליקציה KafkaClientTransport מאפשרת לאפליקציה שלנו:

  1. ליצור הודעה: לשלוח 'משימה' (למשל, "Star formation") to the Agent.
  2. צריכה של תשובה: האזנה ל'נושא תשובה' ספציפי כדי לקבל את הקואורדינטות בחזרה מהסוכן.

1. אתחול החיבור

אנחנו משתמשים ב-lifespan event handler של FastAPI כדי לוודא שחיבור ה-Kafka מתחיל כשהשרת מופעל ונסגר בצורה נקייה כשהוא מושבת.

‫👈✏️ ב-$HOME/way-back-home/level_5/satellite/main.py, מחליפים את #REPLACE-CONNECT-TO-KAFKA-CLUSTER בקוד הבא:

@asynccontextmanager
async def lifespan(app: FastAPI):
    global kafka_transport
    logger.info("Initializing Kafka Client Transport...")
    
    bootstrap_server = os.getenv("KAFKA_BOOTSTRAP_SERVERS")
    request_topic = "a2a-formation-request"
    reply_topic = "a2a-reply-satellite-dashboard"
    
    # Create AgentCard for the Client
    client_card = AgentCard(
        name="SatelliteDashboard",
        description="Satellite Dashboard Client",
        version="1.0.0",
        url="https://example.com/satellite-dashboard",
        capabilities=AgentCapabilities(),
        default_input_modes=["text/plain"],
        default_output_modes=["text/plain"],
        skills=[]
    )
    
    kafka_transport = KafkaClientTransport(
            agent_card=client_card,
            bootstrap_servers=bootstrap_server,
            request_topic=request_topic,
            reply_topic=reply_topic,
    )
    
    try:
        await kafka_transport.start()
        logger.info("Kafka Client Transport Started Successfully.")
    except Exception as e:
        logger.error(f"Failed to start Kafka Client: {e}")
        
    yield
    
    if kafka_transport:
        logger.info("Stopping Kafka Client Transport...")
        await kafka_transport.stop()
        logger.info("Kafka Client Transport Stopped.")

2. שליחת פקודה

כשלוחצים על לחצן בלוח הבקרה, מופעלת נקודת הקצה /formation. הוא פועל כמפיק, עוטף את הבקשה שלכם בMessage רשמי מסוג A2A ושולח אותה לסוכן.

היווצרות

הלוגיקה העיקרית:

  • תקשורת לא סנכרונית: kafka_transport.send_message שולח את הבקשה וממתין שהקואורדינטות החדשות יגיעו אל reply_topic.
  • ניתוח תשובות: יכול להיות ש-Gemini יחזיר קואורדינטות בתוך בלוקים של Markdown (למשל, json ... ). הקוד שלמטה מסיר את התווים האלה וממיר את המחרוזת לרשימה של נקודות ב-Python.

‫👈✏️ ב-$HOME/way-back-home/level_5/satellite/main.py, מחליפים את #REPLACE-FORMATION-REQUEST בקוד הבא:

@app.post("/formation")
async def set_formation(req: FormationRequest):
    global FORMATION, PODS
    FORMATION = req.formation
    logger.info(f"Received formation request: {FORMATION}")
    
    if not kafka_transport:
        logger.error("Kafka Transport is not initialized!")
        return {"status": "error", "message": "Backend Not Connected"}
    
    try:
        # Construct A2A Message
        prompt = f"Create a {FORMATION} formation"
        logger.info(f"Sending A2A Message: '{prompt}'")
        
        from a2a.types import TextPart, Part, Role
        import uuid
        
        msg_id = str(uuid.uuid4())
        message_parts = [Part(TextPart(text=prompt))]
        
        msg_obj = Message(
            message_id=msg_id,
            role=Role.user,
            parts=message_parts
        )
        
        message_params = MessageSendParams(
            message=msg_obj
        )
        
        # Send and Wait for Response
        ctx = ClientCallContext()
        ctx.state["kafka_timeout"] = 120.0 # Timeout for GenAI latency
        response = await kafka_transport.send_message(message_params, context=ctx)
        
        logger.info("Received A2A Response.")
        
        content = None
        if isinstance(response, Message):
            content = response.parts[0].root.text if response.parts else None
        elif isinstance(response, Task):
            if response.artifacts and response.artifacts[0].parts:
                content = response.artifacts[0].parts[0].root.text

        if content:
            logger.info(f"Response Content: {content[:100]}...")
            try:
                clean_content = content.replace("```json", "").replace("```", "").strip()
                coords = json.loads(clean_content)
                
                if isinstance(coords, list):
                    logger.info(f"Parsed {len(coords)} coordinates.")
                    for i, pod_target in enumerate(coords):
                        if i < len(PODS):
                            PODS[i]["x"] = pod_target["x"]
                            PODS[i]["y"] = pod_target["y"]
                    return {"status": "success", "formation": FORMATION}
                else:
                    logger.error("Response JSON is not a list.")
            except json.JSONDecodeError as e:
                logger.error(f"Failed to parse Agent JSON response: {e}")
        else:
            logger.error(f"Could not extract content from response type {type(response)}")

    except Exception as e:
        logger.error(f"Error calling agent via Kafka: {e}")
        return {"status": "error", "message": str(e)}

אירועים שנשלחים מהשרת (SSE)

ממשקי API רגילים משתמשים במודל 'בקשה-תגובה'. כדי להציג את ה-HUD, אנחנו צריכים 'שידור חי' של מיקומי התרמילים.

למה SSE בניגוד ל-WebSockets (שהם דו-כיווניים ומורכבים יותר), SSE מספק זרם נתונים פשוט וחד-כיווני מהשרת לדפדפן. הוא מושלם ללוחות בקרה, לטיקרים של מניות או לטלמטריה בין-כוכבית.

SSE

איך זה עובד בקוד שלנו: אנחנו יוצרים event_generator, לולאה אינסופית שלוקחת את המיקום הנוכחי של כל 15 התרמילים כל חצי שנייה ו "דוחפת" אותם לדפדפן כעדכון.

‫👈✏️ ב-$HOME/way-back-home/level_5/satellite/main.py, מחליפים את #REPLACE-SSE-STREAM בקוד הבא:

@app.get("/stream")
async def message_stream(request: Request):
    async def event_generator():
        logger.info("New SSE stream connected")
        try:
            while True:
                current_pods = list(PODS) 
                
                # Send updates one by one to simulate low-bandwidth scanning
                for pod in current_pods:
                     payload = {"pod": pod}
                     yield {
                         "event": "pod_update",
                         "data": json.dumps(payload)
                     }
                     await asyncio.sleep(0.02)
                
                # Send formation info occasionally
                yield {
                    "event": "formation_update",
                    "data": json.dumps({"formation": FORMATION})
                }
                
                # Main loop delay
                await asyncio.sleep(0.5)
                
        except asyncio.CancelledError:
             logger.info("SSE stream disconnected (cancelled)")
        except Exception as e:
             logger.error(f"SSE stream error: {e}")
             
    return EventSourceResponse(event_generator())

ביצוע של Full Mission Loop

לפני השקת ממשק המשתמש הסופי, נבדוק שהמערכת פועלת מקצה לקצה. נפעיל את הסוכן באופן ידני ונראה את מטען הנתונים הגולמיים בחיבור.

אימות

פותחים שלוש כרטיסיות נפרדות של Terminal.

מסוף א': סוכן ההקמה (שרת A2A)

‫👈💻 זהו סוכן ADK שמקשיב למשימות ומבצע את החישובים הגיאומטריים.

cd $HOME/way-back-home/level_5
. scripts/check_kafka.sh 
# Start the Agent Server
uv run agent/server.py

מסוף ב': תחנת הלוויין (לקוח Kafka)

‫👈💻 שרת FastAPI הזה פועל כ'מקלט', שמקשיב לתשובות של Kafka והופך אותן לסטרימינג בשידור חי של SSE.

cd $HOME/way-back-home/level_5

# Start the Satellite Station
uv run satellite/main.py

מסוף C: תצוגת ה-HUD הידנית

שליחת פקודת יצירה (טריגר): 👉💻 באותו מסוף ג', מפעילים את תהליך היצירה:

# Trigger the STAR formation via the Satellite's API
curl -X POST http://localhost:8000/formation \
     -H "Content-Type: application/json" \
     -d '{"formation": "STAR"}'

‫👀 הקואורדינטות החדשות אמורות להופיע.

INFO:satellite.main:Received formation request: STAR
INFO:satellite.main:Sending A2A Message: 'Create a STAR formation'
INFO:satellite.main:Received A2A Response.
INFO:satellite.main:Response Content: ```json ...
INFO:satellite.main:Parsed 15 coordinates.

כך מוודאים שהלוויין עדכן את הקואורדינטות הפנימיות של ה-Pod.

‫👈💻 נשתמש ב-curl כדי להאזין קודם לזרם הטלמטריה בזמן אמת ואז להפעיל שינוי במערך.

# Connect to the live telemetry feed.
# You should see 'pod_update' events ticking by.
curl -N http://localhost:8000/stream

‫👀 צופים בפלט של הפקודה curl -N. הקואורדינטות x ו-y באירועי pod_update יתחילו לשקף את המיקומים החדשים של מערך הכוכבים.

לפני שממשיכים, צריך לעצור את כל התהליכים הפעילים כדי לפנות את יציאות התקשורת.

בכל טרמינל (A,‏ B,‏ C והטרמינל של הטריגר): לוחצים על Ctrl + C.

6. יוצאים למבצע הצלה!

הקמת את המערכת בהצלחה. עכשיו הגיע הזמן להפיח חיים במשימה. עכשיו נפעיל את התצוגה העילית (HUD) שמבוססת על React. לוח הבקרה הזה מתחבר לתחנת הלוויין באמצעות SSE, ומאפשר לכם לראות את 15 התרמילים בזמן אמת.

סקירה כללית

כשאתם מזינים פקודה, אתם לא רק קוראים לפונקציה, אלא מפעילים אירוע שעובר דרך Kafka, מעובד על ידי סוכן AI ומוזרם בחזרה למסך שלכם כטלמטריה בזמן אמת.

אימות

פותחים שתי כרטיסיות נפרדות של טרמינל.

מסוף א': סוכן ההקמה (שרת A2A)

‫👈💻 זהו סוכן ה-ADK שמקשיב למשימות ומבצע חישובים גיאומטריים באמצעות Gemini. בטרמינל, מריצים את הפקודה:

cd $HOME/way-back-home/level_5
# Start the Agent Server
uv run agent/server.py

טרמינל B: תחנת הלוויין ולוח הבקרה החזותי

‫👈💻 קודם כול, בונים את אפליקציית החזית.

cd $HOME/way-back-home/level_5/frontend/
npm install
npm run build

‫👈💻 עכשיו מפעילים את שרת FastAPI, שישרת גם את לוגיקת ה-Backend וגם את ממשק המשתמש של ה-Frontend.

cd $HOME/way-back-home/level_5
. scripts/check_kafka.sh 
# Start the Satellite Station
uv run satellite/main.py

השקה ואימות

  1. 👉 פתיחת התצוגה המקדימה: בסרגל הכלים של Cloud Shell, לוחצים על סמל התצוגה המקדימה של האינטרנט. בוחרים באפשרות שינוי היציאה, מגדירים אותה ל-8000 ולוחצים על שינוי ותצוגה מקדימה. תיפתח כרטיסייה חדשה בדפדפן עם ה-HUD של Starfield. *תצוגה מקדימה של אתר
  2. 👉 אימות של Telemetry Stream:
    • אחרי שהממשק יטען, אמורים להופיע 15 תאים בפיזור אקראי.
    • אם הפודים מהבהבים או רוטטים קלות, זרם ה-SSE פעיל, ותחנת הלוויין משדרת את המיקומים שלהם בהצלחה. התחלה
  3. 👉 מתחילים יצירת תצורה: לוחצים על הלחצן STAR בלוח הבקרה. כוכב
  4. 👀 מעקב אחרי לולאת האירועים: צופים במסופים כדי לראות את הארכיטקטורה בפעולה:
    • בטרמינל ב' (תחנת לוויין) יירשם: Sending A2A Message: 'Create a STAR formation'.
    • ב-Terminal A (Formation Agent) תוצג הפעילות בזמן שהסוכן מתייעץ עם Gemini.
    • מסוף B (תחנת לוויין) ירשום: Received A2A Response וינתח את הקואורדינטות.
  5. 👀 אישור ויזואלי: צופים ב-15 התרמילים בלוח הבקרה מחליקים בצורה חלקה מהמיקומים האקראיים שלהם אל מבנה של כוכב בעל 5 קצוות.
  6. 👉 ניסוי:
    • כדי לנסות 3 מערכים שונים, אפשר להשתמש ב-X או ב-LINE. X
    • כוונת רכישה מותאמת אישית: משתמשים בהזנה ידנית כדי להקליד משהו ייחודי, כמו "לב" או "משולש". מעגל
    • מכיוון שאתם משתמשים ב-AI גנרטיבי, הסוכן ינסה לחשב את המתמטיקה של כל צורה גיאומטרית שתוכלו לתאר!

אחרי שיוצרים 3 דפוסים, החיבור נוצר מחדש בהצלחה. סיום

המשימה הושלמה!

הזרם מתייצב כשהנתונים עוברים דרך הרעש ללא הפרעה. בהוראתכם, 15 התרמילים העתיקים מתחילים לרקוד בסנכרון בין הכוכבים.

סיום

במהלך שלושה שלבי כיול מתישים, צפיתם בטלמטריה מתייצבת במקום. בכל יישור, האות התחזק, ולבסוף חדר את ההפרעות הבין-כוכביות כמו משואה של תקווה.

תודה לך על ההטמעה המדהימה של הסוכן מבוסס-האירועים. חמשת הניצולים חולצו מפני השטח של X-42 והם בטוחים עכשיו על סיפון כלי החילוץ. הודות לך, חמישה אנשים ניצלו.

אם השתתפתם ברמה 0, אל תשכחו לבדוק איפה אתם בתהליך ההתקדמות במשימה 'הדרך הביתה'! המסע שלכם חזרה אל הכוכבים נמשך.סופי