1. المهمة

أنت تائه في صمت قطاع مجهول. مزّقت نبضة شمسية هائلة سفينتك عبر صدع، ما أدى إلى تقطُّع السبل بك في جزء من الكون غير موجود في أي خريطة نجمية.
بعد أيام من أعمال الإصلاح الشاقة، تشعر أخيرًا بدوي المحركات تحت قدميك. تم إصلاح سفينة الصواريخ. لقد تمكّنت حتى من تأمين وصلة صاعدة بعيدة المدى إلى السفينة الأم. يمكنك المغادرة. يمكنك الآن العودة إلى المنزل.
ولكن بينما تستعد لتشغيل محرك الأقراص، يخترق إشارة استغاثة التشويش. تلتقط أجهزة الاستشعار إشارة طلب المساعدة. خمسة مدنيين محاصرون على سطح الكوكب X-42. أملهم الوحيد في الهروب يكمن في 15 مركبة فضائية قديمة يجب مزامنتها لإرسال إشارة استغاثة إلى سفينتهم الأم في المدار.
ومع ذلك، يتم التحكّم في المركبات من خلال محطة فضائية تالفة. تتجوّل المركبات الفضائية بلا هدف. تمكّنا من إنشاء اتصال خلفي بالقمر الصناعي، ولكن يعاني الربط الصاعد من تداخل شديد بين النجوم، ما يؤدي إلى تأخير كبير في دورات الطلب والاستجابة.
التحدّي
بما أنّ نموذج الطلب/الردّ بطيء جدًا، نحتاج إلى نشر بنية مستندة إلى الأحداث (EDA) مع أحداث يتم إرسالها من الخادم (SSE) لبث بيانات القياس عن بُعد من خلال الضوضاء.

عليك إنشاء وكيل مخصّص يمكنه إجراء العمليات الحسابية المعقّدة على المتّجهات اللازمة لإجبار الأجهزة على اتّخاذ أشكال محدّدة لتعزيز الإشارة (دائرة، نجمة، خط). يجب ربط هذا الوكيل بالبنية الجديدة للقمر الصناعي.
ما ستنشئه

- شاشة عرض أمامية (HUD) مستندة إلى React لتصوّر أسطول من 15 مركبة والتحكّم فيه في الوقت الفعلي
- وكيل ذكاء اصطناعي توليدي يستخدم حزمة Google Agent Development Kit (ADK) لحساب التكوينات الهندسية المعقّدة للكبسولات استنادًا إلى أوامر اللغة الطبيعية
- خادم خلفي لمحطة Satellite Station مستند إلى لغة Python يعمل كمركز رئيسي ويتواصل مع الواجهة الأمامية من خلال أحداث مرسَلة من الخادم (SSE).
- بنية مستندة إلى الأحداث باستخدام Apache Kafka لفصل وكيل الذكاء الاصطناعي عن نظام التحكّم في الأقمار الصناعية، ما يتيح التواصل المرن وغير المتزامن
ما ستتعلمه
التكنولوجيا / المفهوم | الوصف |
Google ADK (Agent Development Kit) | ستستخدم هذا الإطار لإنشاء واختبار وتصميم وكيل متخصص مستند إلى الذكاء الاصطناعي ومزوّد بنماذج Gemini. |
بنية قائمة على الأحداث (EDA) | ستتعرّف على مبادئ إنشاء نظام منفصل تتواصل فيه المكوّنات بشكل غير متزامن من خلال الأحداث، ما يجعل التطبيق أكثر مرونة وقابلية للتوسّع. |
Apache Kafka | ستعمل على إعداد Kafka واستخدامه كمنصّة موزّعة لبث الأحداث من أجل إدارة تدفّق الأوامر والبيانات بين الخدمات المصغّرة المختلفة. |
أحداث يتم إرسالها من الخادم (SSE) | ستنفّذ SSE في خادم FastAPI الخلفي لإرسال بيانات القياس عن بُعد في الوقت الفعلي من الخادم إلى واجهة React الأمامية، ما يضمن تعديل واجهة المستخدم باستمرار. |
بروتوكول A2A (من وكيل إلى وكيل) | ستتعرّف على كيفية تضمين وكيلك في خادم A2A، ما يتيح التواصل الموحّد وإمكانية التشغيل التفاعلي ضمن منظومة متكاملة أكبر للوكلاء. |
FastAPI | ستنشئ خدمة الخلفية الأساسية، أي "محطة الأقمار الصناعية"، باستخدام إطار عمل الويب عالي الأداء هذا المستند إلى Python. |
التفاعل | ستعمل على تطبيق حديث للواجهة الأمامية يشترك في بث SSE لإنشاء واجهة مستخدم ديناميكية وتفاعلية. |
الذكاء الاصطناعي التوليدي في "التحكّم بالنظام" | ستتعرّف على كيفية توجيه طلب إلى نموذج لغوي كبير (LLM) لتنفيذ مهام محدّدة تستند إلى البيانات (مثل إنشاء إحداثيات) بدلاً من مجرد إجراء محادثة. |
2. إعداد البيئة
الوصول إلى Cloud Shell
👉انقر على "تفعيل Cloud Shell" في أعلى "وحدة تحكّم Google Cloud" (رمز شكل الوحدة الطرفية في أعلى لوحة Cloud Shell)، 
👉انقر على الزر "فتح المحرّر" (يبدو كملف مفتوح مع قلم رصاص). سيؤدي ذلك إلى فتح "محرِّر Cloud Shell" في النافذة. سيظهر لك مستكشف الملفات على الجانب الأيمن. 
👉افتح المحطة الطرفية في بيئة التطوير المتكاملة المستندة إلى السحابة الإلكترونية.

👉💻 في نافذة الوحدة الطرفية، تأكَّد من أنّك قد أثبتّ هويتك وأنّ المشروع مضبوط على رقم تعريف مشروعك باستخدام الأمر التالي:
gcloud auth list
من المفترض أن يظهر حسابك على أنّه (ACTIVE).
المتطلبات الأساسية
ℹ️ المستوى 0 اختياري (ولكن يُنصح به)
يمكنك إكمال هذه المهمة بدون المستوى 0، ولكنّ إكمالها أولاً يوفّر لك تجربة أكثر تفاعلية، ما يتيح لك رؤية ضوء جهاز التتبّع يضيء على الخريطة العالمية أثناء تقدّمك.
إعداد بيئة المشروع
في نافذة الوحدة الطرفية، أكمل عملية الإعداد من خلال ضبط المشروع النشط وتفعيل خدمات Google Cloud المطلوبة (Cloud Run وVertex AI وما إلى ذلك).
👉💻 في نافذة الأوامر، اضبط رقم تعريف المشروع:
gcloud config set project $(cat ~/project_id.txt) --quiet
👉💻 تفعيل الخدمات المطلوبة:
gcloud services enable compute.googleapis.com \
artifactregistry.googleapis.com \
run.googleapis.com \
cloudbuild.googleapis.com \
iam.googleapis.com \
aiplatform.googleapis.com \
cloudresourcemanager.googleapis.com
تثبيت الحِزم التابعة
👉💻 انتقِل إلى المستوى 5 وثبِّت حِزم Python المطلوبة:
cd $HOME/way-back-home/level_5
uv sync
تتضمّن التبعيات الرئيسية ما يلي:
الحزمة | الغرض |
| إطار عمل عالي الأداء على الويب لمحطة Satellite Station وبث SSE |
| مطلوب خادم ASGI لتشغيل تطبيق FastAPI |
| مجموعة أدوات تطوير الوكيل المستخدَمة لإنشاء Formation Agent |
| مكتبة بروتوكولات للتواصل الموحّد بين العملاء |
| عميل Kafka غير المتزامن لـ Event Loop |
| برنامج أصلي للوصول إلى نماذج Gemini |
| عمليات حسابية متّجهة وحسابات إحداثيات للمحاكاة |
| إتاحة التواصل الثنائي الاتجاه في الوقت الفعلي |
| إدارة متغيرات البيئة وأسرار الإعدادات |
| التعامل بكفاءة مع أحداث Server-Sent Events (SSE) |
| مكتبة HTTP بسيطة لطلبات البيانات من واجهات برمجة التطبيقات الخارجية |
التحقّق من الإعداد
قبل أن نبدأ في كتابة الرمز، لنحرص على أن تكون جميع الأنظمة جاهزة. نفِّذ نص التحقّق البرمجي لتدقيق مشروع Google Cloud وواجهات برمجة التطبيقات وتبعيات Python.
👉💻 تشغيل نص التحقّق البرمجي:
source $HOME/way-back-home/.venv/bin/activate
cd $HOME/way-back-home/level_5/scripts
chmod +x verify_setup.sh
. verify_setup.sh
👀 ستظهر لك سلسلة من علامات الصح الخضراء (✅).
- إذا ظهرت لك علامات X حمراء (❌)، اتّبِع أوامر الإصلاح المقترَحة في الناتج (مثل
gcloud services enable ...أوpip install ...). - ملاحظة: يمكن تجاهل التحذير الأصفر بشأن
.envفي الوقت الحالي، وسننشئ هذا الملف في الخطوة التالية.
🚀 Verifying Mission Charlie (Level 5) Infrastructure... ✅ Google Cloud Project: xxxxxx ✅ Cloud APIs: Active ✅ Python Environment: Ready 🎉 SYSTEMS ONLINE. READY FOR MISSION.
3- تنسيق مواضع البودكاست باستخدام نموذج لغوي كبير
نحتاج إلى بناء "عقل" عملية الإنقاذ. سيكون هذا وكيلًا تم إنشاؤه باستخدام حزمة تطوير الوكلاء (ADK) من Google. الغرض الوحيد منه هو العمل كأداة متخصصة لتحديد المواقع الجغرافية. في حين أنّ النماذج اللغوية الكبيرة العادية تحب الدردشة، نحتاج في الفضاء السحيق إلى البيانات، وليس الحوار. سنبرمج هذا الوكيل لتلقّي أمر مثل "نجمة" وعرض إحداثيات JSON أولية لـ 15 وحدة.

إنشاء بنية الوكيل
👉💻 شغِّل الأوامر التالية للانتقال إلى دليل الوكيل وبدء معالج إنشاء حزمة تطوير البرامج (ADK):
cd $HOME/way-back-home/level_5/agent
uv run adk create formation
ستطلق واجهة سطر الأوامر معالج إعداد تفاعليًا. استخدِم الردود التالية لإعداد برنامجك الآلي:
- اختيار نموذج: اختَر الخيار 1 (Gemini Flash).
- ملاحظة: قد يختلف الإصدار المحدّد. اختَر دائمًا صيغة "Flash" للحصول على سرعة أكبر.
- اختيار واجهة خلفية: اختَر الخيار 2 (Vertex AI).
- إدخال رقم تعريف مشروع Google Cloud: اضغط على Enter لقبول القيمة التلقائية (التي تم رصدها من بيئتك).
- أدخِل منطقة Google Cloud: اضغط على Enter لقبول القيمة التلقائية (
us-central1).
👀 من المفترض أن يبدو تفاعلك مع الوحدة الطرفية على النحو التالي:
(way-back-home) user@cloudshell:~/way-back-home/level_5/agent$ adk create formation Choose a model for the root agent: 1. gemini-2.5-flash 2. Other models (fill later) Choose model (1, 2): 1 1. Google AI 2. Vertex AI Choose a backend (1, 2): 2 You need an existing Google Cloud account and project... Enter Google Cloud project ID [your-project-id]: <PRESS ENTER> Enter Google Cloud region [us-central1]: <PRESS ENTER> Agent created in /home/user/way-back-home/level_5/agent/formation: - .env - __init__.py - agent.py
من المفترض أن تظهر لك رسالة Agent created تفيد بنجاح العملية. يؤدي ذلك إلى إنشاء رمز أساسي سنعدّله الآن.
👉✏️ انتقِل إلى ملف $HOME/way-back-home/level_5/agent/formation/agent.py الذي تم إنشاؤه حديثًا وافتحه في المحرّر. استبدِل المحتوى الكامل للملف بالرمز البرمجي أدناه. يعدّل هذا الإجراء اسم الوكيل ويقدّم مَعلمات التشغيل الدقيقة الخاصة به.
import os
from google.adk.agents import Agent
root_agent = Agent(
name="formation_agent",
model="gemini-2.5-flash",
instruction="""
You are the **Formation Controller AI**.
Your strict objective is to calculate X,Y coordinates for a fleet of **15 Drones** based on a requested geometric shape.
### FIELD SPECIFICATIONS
- **Canvas Size**: 800px (width) x 600px (height).
- **Safe Margin**: Keep pods at least 50px away from edges (x: 50-750, y: 50-550).
- **Center Point**: x=400, y=300 (Use this as the origin for shapes).
- **Top Menu Avoidance**: Do NOT place pods in the top 100px (y < 100) to avoid UI overlap.
### FORMATION RULES
When given a formation name, output coordinates for exactly 15 pods (IDs 0-14).
1. **CIRCLE**: Evenly spaced around a center point (R=200).
2. **STAR**: 5 points or a star-like distribution.
3. **X**: A large X crossing the screen.
4. **LINE**: A horizontal line across the middle.
5. **PARABOLA**: A U-shape opening UPWARDS. Center it at y=400, opening up to y=100. IMPORTANT: Lowest point must be at bottom (high Y value), opening up (low Y value). Screen coordinates have (0,0) at the TOP-LEFT. The vertex should be at the BOTTOM (e.g., y=500), with arms reaching up to y=200.
6. **RANDOM**: Scatter randomly within safe bounds.
7. **CUSTOM**: If the user inputs something else (e.g., "SMILEY", "TRIANGLE"), do your best to approximate it geometrically.
### OUTPUT FORMAT
You MUST output **ONLY VALID JSON**. No markdown fencing, no preamble, no commentary.
Refuse to answer non-formation questions.
**JSON Structure**:
```json
[
{"x": 400, "y": 300},
{"x": 420, "y": 300},
... (15 total items)
]
```
"""
)
- الدقة الهندسية: من خلال تحديد "حجم لوحة العرض" و"الهوامش الآمنة" في طلب النظام، نضمن ألا يضع الوكيل اللوحات خارج الشاشة أو تحت عناصر واجهة المستخدم.
- فرض تنسيق JSON: من خلال توجيه النموذج اللغوي الكبير إلى "رفض الإجابة عن الأسئلة غير المتعلقة بالتنسيق" وتقديم "بدون مقدمة"، نضمن عدم تعطُّل الرمز البرمجي التالي (التابع) عند محاولة تحليل الرد.
- المنطق المنفصل: لا يعرف هذا الوكيل عن Kafka بعد. وهي تعرف فقط كيفية إجراء العمليات الحسابية. في الخطوة التالية، سنغلّف هذا "الدماغ" في خادم Kafka.
اختبار الوكيل محليًا
قبل ربط الوكيل بـ "النظام العصبي" في Kafka، يجب التأكّد من أنّه يعمل بشكل صحيح. يمكنك التفاعل مع الوكيل مباشرةً في نافذة الأوامر للتحقّق من أنّه ينتج إحداثيات JSON صالحة.
👉💻 استخدِم الأمر adk run لبدء جلسة محادثة مع الوكيل.
cd $HOME/way-back-home/level_5/agent
uv run adk run formation
- الإدخال: اكتب
Circleواضغط على Enter.- معايير النجاح: يجب أن تظهر لك قائمة JSON أولية (مثل
[{"x": 400, "y": 200}, ...]). تأكَّد من عدم وجود نص بتنسيق Markdown، مثل "في ما يلي الإحداثيات:" قبل JSON.
- معايير النجاح: يجب أن تظهر لك قائمة JSON أولية (مثل
- الإدخال: اكتب
Lineواضغط على Enter.- معايير النجاح: تأكَّد من أنّ الإحداثيات تنشئ خطًا أفقيًا (يجب أن تكون قيم y متشابهة).
بعد التأكّد من أنّ نتائج الوكيل هي JSON نظيف، يمكنك تضمينها في خادم Kafka.
👉💻 اضغط على Ctrl+C للخروج.
4. إنشاء خادم A2A لوكيل التكوين
فهم A2A (التواصل بين وكيلين)
بروتوكول A2A (من وكيل إلى وكيل) هو معيار مفتوح مصمّم لإتاحة التشغيل التفاعلي السلس بين وكلاء الذكاء الاصطناعي. يتيح هذا الإطار للوكلاء تجاوز تبادل النصوص البسيط، ما يسمح لهم بتفويض المهام وتنسيق الإجراءات المعقّدة والعمل كوحدة متماسكة لتحقيق الأهداف المشتركة في نظام شامل للبيانات موزّع.

التعرّف على عمليات نقل البيانات من تطبيق إلى تطبيق: HTTP وgRPC وKafka
يوفّر بروتوكول A2A طريقتَين مختلفتَين للتواصل بين العملاء والوكلاء، وتخدم كلّ طريقة احتياجات معمارية مختلفة. HTTP (JSON-RPC) هو المعيار التلقائي والشامل الذي يعمل على مستوى جميع بيئات الويب. أما gRPC، فهو خيارنا العالي الأداء الذي يستفيد من مخازن البروتوكولات المؤقتة لإجراء عمليات تواصل فعّالة ومحدّدة الأنواع. وفي المختبر، أوفّر أيضًا وسيلة نقل Kafka. وهي عملية تنفيذ مخصّصة مصمَّمة لبُنى قوية مستندة إلى الأحداث، حيث تكون أولوية فصل الأنظمة.

في الخلفية، تتعامل عمليات النقل هذه مع تدفق البيانات بشكل مختلف تمامًا. في نموذج HTTP، يرسل العميل طلب JSON ويُبقي الاتصال مفتوحًا، منتظرًا أن ينهي الوكيل مهمته ويعرض النتيجة الكاملة دفعة واحدة. تعمل gRPC على تحسين ذلك من خلال استخدام البيانات الثنائية وHTTP/2، ما يتيح دورات بسيطة لطلب الاستجابة والبث في الوقت الفعلي حيث يرسل الوكيل التحديثات (مثل "فكرة" أو "تم إنشاء عنصر") فور حدوثها. يعمل تنفيذ Kafka بشكل غير متزامن: ينشر العميل طلبًا في "موضوع الطلب" المتين للغاية ويستمع إلى "موضوع الرد" المنفصل. يستلم الخادم الرسالة عندما يكون ذلك ممكنًا، ويعالجها، ثم ينشر النتيجة، ما يعني أنّ الجهازين لا يتواصلان مع بعضهما البعض مباشرةً.
يعتمد الاختيار على متطلباتك المحدّدة من حيث السرعة والتعقيد والثبات. يُعدّ HTTP الأسهل للبدء واستكشاف الأخطاء وإصلاحها، ما يجعله مثاليًا عمليات الدمج البسيطة، بينما يُعدّ gRPC الخيار الأفضل للتواصل الداخلي بين الخدمات حيث يكون الحدّ الأدنى من وقت الاستجابة وتحديثات مهام البث أمرًا بالغ الأهمية. ومع ذلك، يبرز Kafka كخيار مرن، لأنّه يخزّن الطلبات على القرص في قائمة انتظار، وتستمر مهامك حتى إذا تعطل خادم الوكيل أو أعيد تشغيله، ما يوفّر مستوى من المتانة والفصل لا يمكن أن يوفّرهما HTTP أو gRPC.
طبقة نقل مخصّصة: Kafka
تعمل Kafka كبنية أساسية غير متزامنة تفصل بين مركز العمليات (Formation Agent) وعناصر التحكّم المادية (محطة الأقمار الصناعية). بدلاً من إجبار النظام على انتظار اتصال متزامن بينما يحسب الوكيل متجهات معقّدة، ينشر الوكيل نتائجه كأحداث في موضوع Kafka. يعمل هذا كذاكرة تخزين مؤقت ثابتة، ما يسمح للقمر الصناعي بتلقّي التعليمات بالسرعة التي تناسبه ويضمن عدم فقدان بيانات التشكيل أبدًا، حتى مع حدوث تأخير كبير في الشبكة أو تعطُّل مؤقت في النظام.
باستخدام Kafka، يمكنك تحويل عملية بطيئة وخطية إلى مسار بث مرن تتدفق فيه التعليمات وبيانات القياس عن بُعد بشكل مستقل، ما يحافظ على استجابة شاشة العرض الأمامية (HUD) للمهمة حتى أثناء المعالجة المكثفة للذكاء الاصطناعي.

ما هي Kafka؟
Kafka هي منصة موزّعة لبث الأحداث. في بنية قائمة على الأحداث (EDA):
- ينشر المنتجون الرسائل في "المواضيع".
- يشترك المستهلكون في هذه المواضيع ويتفاعلون عندما تصل رسالة.
لماذا يجب استخدام Kafka؟
يؤدي ذلك إلى فصل أنظمتك. يعمل Formation Agent بشكل مستقل، وينتظر الطلبات الواردة بدون الحاجة إلى معرفة هوية المرسل أو حالته. يؤدي ذلك إلى فصل المسؤولية، ما يضمن بقاء سير العمل سليمًا حتى في حال توقّف Satellite عن العمل، إذ يخزّن Kafka الرسائل ببساطة إلى أن يعيد Satellite الاتصال.
ماذا عن Google Cloud Pub/Sub؟
يمكنك بالتأكيد استخدام Google Cloud Pub/Sub لهذا الغرض. Pub/Sub هي خدمة مراسلة بدون خادم من Google. على الرغم من أنّ Kafka مناسبة تمامًا لعمليات البث العالية الإنتاجية و "القابلة لإعادة التشغيل"، غالبًا ما يتم تفضيل Pub/Sub لسهولة استخدامها. في هذا التمرين العملي، نستخدم Kafka لمحاكاة ناقل رسائل قوي ودائم.
بدء تشغيل مجموعة Kafka المحلية
انسخ والصق كتلة الأوامر الكاملة أدناه في الوحدة الطرفية. سيؤدي ذلك إلى تنزيل صورة Kafka الرسمية وبدء تشغيلها في الخلفية.
👉💻 نفِّذ هذه الأوامر في الوحدة الطرفية:
# Navigate to the correct mission directory first
cd $HOME/way-back-home/level_5
# Run the Kafka container in detached mode
docker run -d \
--name mission-kafka \
-p 9092:9092 \
-e KAFKA_PROCESS_ROLES='broker,controller' \
-e KAFKA_NODE_ID=1 \
-e KAFKA_CONTROLLER_LISTENER_NAMES=CONTROLLER \
-e KAFKA_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093 \
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092 \
-e KAFKA_CONTROLLER_QUORUM_VOTERS=1@127.0.0.1:9093 \
-e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \
apache/kafka:4.2.0-rc1
👉💻 تأكَّد من أنّ الحاوية تعمل باستخدام الأمر docker ps.
docker ps
👀 من المفترض أن يظهر لك ناتج يؤكّد أنّ الحاوية mission-kafka قيد التشغيل وأنّ المنفذ 9092 مكشوف.
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES c1a2b3c4d5e6 apache/kafka:4.2.0-rc1 "/opt/kafka/bin/kafka..." 15 seconds ago Up 14 seconds 0.0.0.0:9092->9092/tcp, 9093/tcp mission-kafka
ما هو موضوع Kafka؟
يمكن اعتبار موضوع Kafka قناة أو فئة مخصّصة للرسائل. وهي تشبه دفترًا يتم فيه تخزين سجلّات الأحداث بالترتيب الذي تم إنشاؤها به. يكتب المنتجون رسائل حول مواضيع معيّنة، ويقرأ المستهلكون من تلك المواضيع. يؤدي ذلك إلى فصل المُرسِل عن المُستلِم، إذ لا يحتاج المنتج إلى معرفة المستهلك الذي سيقرأ البيانات، بل يحتاج فقط إلى إرسالها إلى "القناة" الصحيحة. في مهمتنا، سننشئ موضوعَين: أحدهما لإرسال طلبات تشكيل إلى البرنامج، والآخر ليتمكّن البرنامج من نشر ردوده ليقرأها القمر الصناعي.

👉💻 نفِّذ الأوامر التالية لإنشاء المواضيع المطلوبة داخل حاوية Docker قيد التشغيل.
# Create the topic for formation requests
docker exec mission-kafka /opt/kafka/bin/kafka-topics.sh \
--create \
--topic a2a-formation-request \
--bootstrap-server 127.0.0.1:9092
# Create the topic where the satellite dashboard will listen for replies
docker exec mission-kafka /opt/kafka/bin/kafka-topics.sh \
--create \
--topic a2a-reply-satellite-dashboard \
--bootstrap-server 127.0.0.1:9092
👉💻 للتأكّد من أنّ قنواتك مفتوحة، شغِّل أمر القائمة:
docker exec mission-kafka /opt/kafka/bin/kafka-topics.sh \
--list \
--bootstrap-server 127.0.0.1:9092
👀 من المفترض أن تظهر لك أسماء المواضيع التي أنشأتها للتو.
a2a-formation-request a2a-reply-satellite-dashboard
تم الآن إعداد مثيل Kafka بالكامل وأصبح جاهزًا لتوجيه البيانات المهمة للغاية.
تنفيذ خادم Kafka A2A
يضع بروتوكول "الوكيل إلى الوكيل" (A2A) إطارًا موحّدًا لإمكانية التشغيل التفاعلي بين الأنظمة المستقلة التي تعمل كوكلاء. ويسمح هذا البروتوكول للوكلاء الذين طوّرهم فِرق مختلفة أو الذين يعملون على بنى أساسية مختلفة باكتشاف بعضهم البعض والتعاون بفعالية بدون الحاجة إلى منطق تكامل مخصّص لكل اتصال.
التنفيذ المرجعي، a2a-python، هو مكتبة أساسية لتشغيل هذه التطبيقات المستندة إلى الوكلاء. من الميزات الأساسية في تصميمه القابلية للتوسيع، فهو يجرّد طبقة الاتصال، ما يتيح للمطوّرين استبدال بروتوكولات مثل HTTP ببروتوكولات أخرى.

في هذا المشروع، نستفيد من إمكانية التوسيع هذه باستخدام عملية تنفيذ مخصّصة لـ Kafka: a2a-python-kafka. سنستخدم هذا التنفيذ لتوضيح كيف يتيح لك معيار A2A تكييف عملية التواصل بين البرامج لتناسب الاحتياجات المعمارية المختلفة، وفي هذه الحالة، استبدال بروتوكول HTTP المتزامن بحافلة أحداث غير متزامنة.
تفعيل ميزة "التطبيق إلى التطبيق" لـ Formation Agent
سنغلّف الآن وكيلنا في خادم A2A، ما يحوّله إلى خدمة قابلة للتشغيل التفاعلي يمكنها تنفيذ ما يلي:
- الاستماع إلى المهام من موضوع Kafka
- تسليم المهام المستلَمة إلى وكيل ADK الأساسي لمعالجتها
- انشر النتيجة في موضوع ردّ.
👉✏️ في $HOME/way-back-home/level_5/agent/agent_to_kafka_a2a.py، استبدِل #REPLACE-CREATE-KAFKA-A2A-SERVER بالرمز التالي:
async def create_kafka_server(
agent: BaseAgent,
*,
bootstrap_servers: str | List[str] = "localhost:9092",
request_topic: str = "a2a-formation-request",
consumer_group_id: str = "a2a-agent-group",
agent_card: Optional[Union[AgentCard, str]] = None,
runner: Optional[Runner] = None,
**kafka_config: Any,
) -> KafkaServerApp:
"""Convert an ADK agent to a A2A Kafka Server application.
Args:
agent: The ADK agent to convert
bootstrap_servers: Kafka bootstrap servers.
request_topic: Topic to consume requests from.
consumer_group_id: Consumer group ID for the server.
agent_card: Optional pre-built AgentCard object or path to agent card
JSON. If not provided, will be built automatically from the
agent.
runner: Optional pre-built Runner object. If not provided, a default
runner will be created using in-memory services.
**kafka_config: Additional Kafka configuration.
Returns:
A KafkaServerApp that can be run with .run() or .start()
"""
# Set up ADK logging
adk_logger = logging.getLogger("google_adk")
adk_logger.setLevel(logging.INFO)
async def create_runner() -> Runner:
"""Create a runner for the agent."""
return Runner(
app_name=agent.name or "adk_agent",
agent=agent,
# Use minimal services - in a real implementation these could be configured
artifact_service=InMemoryArtifactService(),
session_service=InMemorySessionService(),
memory_service=InMemoryMemoryService(),
credential_service=InMemoryCredentialService(),
)
# Create A2A components
task_store = InMemoryTaskStore()
agent_executor = A2aAgentExecutor(
runner=runner or create_runner,
)
# Initialize logic handler
from a2a.server.request_handlers.default_request_handler import DefaultRequestHandler
logic_handler = DefaultRequestHandler(
agent_executor=agent_executor, task_store=task_store
)
# Prepare Agent Card
rpc_url = f"kafka://{bootstrap_servers}/{request_topic}"
# Create Kafka Server App
server_app = KafkaServerApp(
request_handler=logic_handler,
bootstrap_servers=bootstrap_servers,
request_topic=request_topic,
consumer_group_id=consumer_group_id,
**kafka_config
)
return server_app
يُعدّ هذا الرمز المكوّنات الرئيسية:
- المنفِّذ: يوفّر وقت التشغيل للوكيل (التعامل مع الذاكرة وبيانات الاعتماد وما إلى ذلك).
- مخزن المهام: يتتبّع حالة الطلبات أثناء انتقالها من "في انتظار المراجعة" إلى "مكتملة".
- Agent Executor: يستلم مهمة من Kafka ويمررها إلى الوكيل لاحتساب الإحداثيات.
- KafkaServerApp: يدير الاتصال الفعلي بخادم وسيط Kafka.

ضبط متغيرات البيئة
أنشأ إعداد ADK ملف .env يتضمّن إعدادات Google Vertex AI داخل مجلد الوكيل. علينا نقل هذا الملف إلى جذر المشروع وإضافة إحداثيات مجموعة Kafka.
نفِّذ الأوامر التالية لنسخ الملف وإلحاق عنوان خادم Kafka:
cd $HOME/way-back-home/level_5
# 1. Copy the API keys from the agent folder to the project root
cp agent/formation/.env .env
# 2. Append the Kafka Bootstrap Server address to the file
echo -e "\nKAFKA_BOOTSTRAP_SERVERS=localhost:9092" >> .env
# 3. Verify the file content
echo "✅ Environment configured. Here are the last few lines:"
tail .env
التحقّق من حلقة A2A Interstellar
سنحرص الآن على أن تعمل حلقة الأحداث غير المتزامنة بشكل صحيح من خلال اختبار مباشر: إرسال إشارة يدوية من خلال مجموعة Kafka ومراقبة استجابة البرنامج.

لعرض دورة الحياة الكاملة لأحد الأحداث، سنستخدم ثلاث وحدات طرفية منفصلة.
المحطة الطرفية A: وكيل التكوين (خادم A2A Kafka)
👉💻 يشغّل هذا الجهاز عملية Python التي تستمع إلى Kafka وتستخدم Gemini لإجراء العمليات الحسابية الهندسية.
cd $HOME/way-back-home/level_5
source $HOME/way-back-home/.venv/bin/activate
. scripts/check_kafka.sh
# Install the custom Kafka-enabled A2A library
uv pip install git+https://github.com/weimeilin79/a2a-python-kafka.git
# Start the Agent Server
uv run agent/server.py
انتظِر إلى أن يظهر لك:
[INFO] Kafka Server App Started. Starting to consume requests...
المحطة الطرفية B: المستمع عبر الأقمار الصناعية (المستهلك)
👉💻 في هذه الوحدة الطرفية، سنستمع إلى موضوع الردّ. يحاكي هذا الإجراء انتظار القمر الصناعي لتلقّي التعليمات.
# Listen for the AI's response on the satellite channel
docker exec mission-kafka /opt/kafka/bin/kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic a2a-reply-satellite-dashboard \
--from-beginning \
--property "print.headers=true"
ستبدو هذه المحطة غير نشطة. في انتظار أن ينشر "الوكيل" رسالة
Terminal C: The Commander's Signal (Producer)
👉💻 الآن، سنرسل طلبًا بتنسيق A2A إلى الموضوع a2a-formation-request. يجب تضمين عناوين Kafka محدّدة لكي يعرف "الوكيل" المكان الذي يجب إرسال الإجابة إليه.
echo 'correlation_id=ping-manual-01,reply_topic=a2a-reply-satellite-dashboard|{"method": "message_send", "params": {"message": {"message_id": "msg-001", "role": "user", "parts": [{"text": "STAR"}]}}, "streaming": false, "agent_card": {"name": "DiagnosticTool", "version": "1.0.0"}}' | \
docker exec -i mission-kafka /opt/kafka/bin/kafka-console-producer.sh \
--bootstrap-server localhost:9092 \
--topic a2a-formation-request \
--property "parse.headers=true" \
--property "headers.key.separator==" \
--property "headers.delimiter=|"
تحليل النتيجة
👀 إذا نجحت عملية الربط، انتقِل إلى الجهاز الطرفي (ب). من المفترض أن يظهر على الفور جزء كبير من JSON. سيبدأ بالرأس الذي أرسلناه correlation_id:ping-manual-01. متبوعًا بعنصر task إذا نظرت عن كثب إلى القسم parts ضمن ملف JSON هذا، ستظهر لك إحداثيات X وY الأولية التي حسبتها Gemini لـ 15 وحدة:
{"type": "task", "data": {"artifacts": [{"artifactId": "...", "parts": [{"kind": "text", "text": "```json\n[\n {\"x\": 400, \"y\": 150},\n {\"x\": 257, \"y\": 254},\n {\"x\": 312, \"y\": 421},\n ... \n]\n```"}]}], ...}}
لقد تمكّنت من فصل الوكيل عن المستلِم بنجاح. لم يعُد "التشويش بين النجوم" في وقت استجابة الطلب مهمًا لأنّ نظامنا أصبح الآن يعتمد على الأحداث بالكامل.
قبل المتابعة، أوقِف العمليات التي تعمل في الخلفية لإتاحة منافذ الشبكة.
👉💻 في كل نافذة طرفية (A وB وC):
- اضغط على
Ctrl + Cلإنهاء العملية الجارية.
5- محطة الأقمار الصناعية (عميل A2A Kafka وSSE)
في هذه الخطوة، سننشئ محطة قمر صناعي. هذه هي نقطة الربط بين مجموعة Kafka وشاشة العرض المرئية الخاصة بالطيار (واجهة React الأمامية). يعمل هذا الخادم كـ عميل Kafka (للتواصل مع "الوكيل") وبرنامج بث SSE (للتواصل مع المتصفّح).
ما هو عميل Kafka؟
يمكنك اعتبار مجموعة Kafka بمثابة محطة إذاعية. عميل Kafka هو جهاز استقبال الراديو. يسمح KafkaClientTransport لتطبيقنا بما يلي:
- إنشاء رسالة: إرسال "مهمة" (مثلاً "تكوّن النجوم") إلى "الوكيل".
- استهلاك ردّ: استمع إلى "موضوع الردّ" محدّد للحصول على الإحداثيات من "الوكيل".
1. بدء الاتصال
نستخدم معالج الأحداث lifespan في FastAPI لضمان بدء اتصال Kafka عند تشغيل الخادم وإغلاقه بشكلٍ سليم عند إيقافه.
👉✏️ في $HOME/way-back-home/level_5/satellite/main.py، استبدِل #REPLACE-CONNECT-TO-KAFKA-CLUSTER بالرمز التالي:
@asynccontextmanager
async def lifespan(app: FastAPI):
global kafka_transport
logger.info("Initializing Kafka Client Transport...")
bootstrap_server = os.getenv("KAFKA_BOOTSTRAP_SERVERS")
request_topic = "a2a-formation-request"
reply_topic = "a2a-reply-satellite-dashboard"
# Create AgentCard for the Client
client_card = AgentCard(
name="SatelliteDashboard",
description="Satellite Dashboard Client",
version="1.0.0",
url="https://example.com/satellite-dashboard",
capabilities=AgentCapabilities(),
default_input_modes=["text/plain"],
default_output_modes=["text/plain"],
skills=[]
)
kafka_transport = KafkaClientTransport(
agent_card=client_card,
bootstrap_servers=bootstrap_server,
request_topic=request_topic,
reply_topic=reply_topic,
)
try:
await kafka_transport.start()
logger.info("Kafka Client Transport Started Successfully.")
except Exception as e:
logger.error(f"Failed to start Kafka Client: {e}")
yield
if kafka_transport:
logger.info("Stopping Kafka Client Transport...")
await kafka_transport.stop()
logger.info("Kafka Client Transport Stopped.")
2. إرسال أمر
عند النقر على زر في لوحة البيانات، يتم تشغيل نقطة النهاية /formation. يعمل هذا المكوّن كمنتج، حيث يغلّف طلبك في Message رسمي من A2A ويرسله إلى الوكيل.

المنطق الأساسي:
- الاتصال غير المتزامن: يرسل
kafka_transport.send_messageالطلب وينتظر وصول الإحداثيات الجديدة إلىreply_topic. - تحليل الردود: قد يعرض Gemini إحداثيات داخل مربّعات Markdown (مثل
json ...). تزيل الرموز البرمجية أدناه هذه الأحرف وتحوّل السلسلة إلى قائمة نقاط Python.
👉✏️ في $HOME/way-back-home/level_5/satellite/main.py، استبدِل #REPLACE-FORMATION-REQUEST بالرمز التالي:
@app.post("/formation")
async def set_formation(req: FormationRequest):
global FORMATION, PODS
FORMATION = req.formation
logger.info(f"Received formation request: {FORMATION}")
if not kafka_transport:
logger.error("Kafka Transport is not initialized!")
return {"status": "error", "message": "Backend Not Connected"}
try:
# Construct A2A Message
prompt = f"Create a {FORMATION} formation"
logger.info(f"Sending A2A Message: '{prompt}'")
from a2a.types import TextPart, Part, Role
import uuid
msg_id = str(uuid.uuid4())
message_parts = [Part(TextPart(text=prompt))]
msg_obj = Message(
message_id=msg_id,
role=Role.user,
parts=message_parts
)
message_params = MessageSendParams(
message=msg_obj
)
# Send and Wait for Response
ctx = ClientCallContext()
ctx.state["kafka_timeout"] = 120.0 # Timeout for GenAI latency
response = await kafka_transport.send_message(message_params, context=ctx)
logger.info("Received A2A Response.")
content = None
if isinstance(response, Message):
content = response.parts[0].root.text if response.parts else None
elif isinstance(response, Task):
if response.artifacts and response.artifacts[0].parts:
content = response.artifacts[0].parts[0].root.text
if content:
logger.info(f"Response Content: {content[:100]}...")
try:
clean_content = content.replace("```json", "").replace("```", "").strip()
coords = json.loads(clean_content)
if isinstance(coords, list):
logger.info(f"Parsed {len(coords)} coordinates.")
for i, pod_target in enumerate(coords):
if i < len(PODS):
PODS[i]["x"] = pod_target["x"]
PODS[i]["y"] = pod_target["y"]
return {"status": "success", "formation": FORMATION}
else:
logger.error("Response JSON is not a list.")
except json.JSONDecodeError as e:
logger.error(f"Failed to parse Agent JSON response: {e}")
else:
logger.error(f"Could not extract content from response type {type(response)}")
except Exception as e:
logger.error(f"Error calling agent via Kafka: {e}")
return {"status": "error", "message": str(e)}
أحداث يتم إرسالها من الخادم (SSE)
تستخدِم واجهات برمجة التطبيقات العادية نموذج "الطلب والاستجابة". بالنسبة إلى شاشة العرض الرئيسية، نحتاج إلى "بث مباشر" لمواقع المركبات الفضائية.
سبب استخدام SSE على عكس WebSockets (التي تكون ثنائية الاتجاه وأكثر تعقيدًا)، توفّر SSE بثًا بسيطًا للبيانات أحادي الاتجاه من الخادم إلى المتصفح. إنّها مثالية للوحات البيانات أو مؤشرات الأسهم أو قياسات المسافات بين النجوم.

طريقة عملها في الرمز: ننشئ event_generator، وهي حلقة لا نهائية تأخذ الموضع الحالي لجميع وحدات البود البالغ عددها 15 كل نصف ثانية و "تدفعها" إلى المتصفح كتحديث.
👉✏️ في $HOME/way-back-home/level_5/satellite/main.py، استبدِل #REPLACE-SSE-STREAM بالرمز التالي:
@app.get("/stream")
async def message_stream(request: Request):
async def event_generator():
logger.info("New SSE stream connected")
try:
while True:
current_pods = list(PODS)
# Send updates one by one to simulate low-bandwidth scanning
for pod in current_pods:
payload = {"pod": pod}
yield {
"event": "pod_update",
"data": json.dumps(payload)
}
await asyncio.sleep(0.02)
# Send formation info occasionally
yield {
"event": "formation_update",
"data": json.dumps({"formation": FORMATION})
}
# Main loop delay
await asyncio.sleep(0.5)
except asyncio.CancelledError:
logger.info("SSE stream disconnected (cancelled)")
except Exception as e:
logger.error(f"SSE stream error: {e}")
return EventSourceResponse(event_generator())
تنفيذ "حلقة المهمة الكاملة"
لنتأكّد من أنّ النظام يعمل بشكل كامل قبل إطلاق واجهة المستخدم النهائية. سنفعّل الوكيل يدويًا ونرى حمولة البيانات الأولية على الشبكة.

افتح ثلاث علامات تبويب منفصلة في الوحدة الطرفية.
المحطة الطرفية A: وكيل التكوين (خادم A2A)
👉💻 هذا هو وكيل ADK الذي يستمع إلى المهام ويجري العمليات الحسابية الهندسية.
cd $HOME/way-back-home/level_5
. scripts/check_kafka.sh
# Start the Agent Server
uv run agent/server.py
المحطة الطرفية B: محطة الأقمار الصناعية (عميل Kafka)
👉💻 يعمل خادم FastAPI هذا كـ "مستلِم"، ويستمع إلى ردود Kafka ويحوّلها إلى بث مباشر من خلال SSE.
cd $HOME/way-back-home/level_5
# Start the Satellite Station
uv run satellite/main.py
المبنى C: شاشة العرض الأمامي اليدوية
إرسال أمر التكوين (المشغّل): 👉💻 في الوحدة الطرفية C نفسها، شغِّل عملية التكوين:
# Trigger the STAR formation via the Satellite's API
curl -X POST http://localhost:8000/formation \
-H "Content-Type: application/json" \
-d '{"formation": "STAR"}'
👀 من المفترض أن تظهر لك الإحداثيات الجديدة.
INFO:satellite.main:Received formation request: STAR INFO:satellite.main:Sending A2A Message: 'Create a STAR formation' INFO:satellite.main:Received A2A Response. INFO:satellite.main:Response Content: ```json ... INFO:satellite.main:Parsed 15 coordinates.
يؤكّد هذا الإجراء أنّ جهاز Satellite قد عدّل إحداثيات الوحدة الداخلية.
👉💻 سنستخدم curl للاستماع أولاً إلى بث بيانات القياس عن بُعد المباشر ثم بدء تغيير في التشكيل.
# Connect to the live telemetry feed.
# You should see 'pod_update' events ticking by.
curl -N http://localhost:8000/stream
👀 شاهِد ناتج الأمر curl -N. ستبدأ إحداثيات x وy في أحداث pod_update بعكس المواضع الجديدة لتشكيل النجمة.
قبل المتابعة، أوقِف جميع العمليات الجارية لإتاحة منافذ الاتصال.
في كل محطة (A وB وC ومحطة التشغيل): اضغط على Ctrl + C.
6. Go Rescue!
لقد أنشأت النظام بنجاح. حان الوقت الآن لإضفاء الحيوية على هذه المهمة. سنطلق الآن شاشة العرض العلوية (HUD) المستندة إلى React. ترتبط لوحة البيانات هذه بمحطة الأقمار الصناعية من خلال SSE، ما يتيح لك عرض 15 وحدة في الوقت الفعلي.

عند إصدار أمر، لا تستدعي دالة فحسب، بل تُفعّل حدثًا ينتقل عبر Kafka، ويعالجه وكيل مستند إلى الذكاء الاصطناعي، ثم يعود إلى شاشتك كبيانات قياس عن بُعد مباشرة.

افتح علامتَي تبويب منفصلتَين في الوحدة الطرفية.
المحطة الطرفية A: وكيل التكوين (خادم A2A)
👉💻 هذا هو "وكيل حزمة تطوير التطبيقات" الذي يستمع إلى المهام وينفّذ العمليات الحسابية الهندسية باستخدام Gemini. في الوحدة الطرفية، شغِّل:
cd $HOME/way-back-home/level_5
# Start the Agent Server
uv run agent/server.py
المبنى B: محطة الأقمار الصناعية ولوحة البيانات المرئية
👉💻 أولاً، أنشئ تطبيق الواجهة الأمامية.
cd $HOME/way-back-home/level_5/frontend/
npm install
npm run build
👉💻 الآن، ابدأ خادم FastAPI الذي سيعرض كلاً من منطق الخلفية وواجهة المستخدم الأمامية.
cd $HOME/way-back-home/level_5
. scripts/check_kafka.sh
# Start the Satellite Station
uv run satellite/main.py
الإطلاق وإثبات الملكية
- 👉 فتح المعاينة: في شريط أدوات Cloud Shell، انقر على رمز معاينة الويب. انقر على تغيير المنفذ، واضبطه على 8000، ثم انقر على تغيير ومعاينة. سيتم فتح علامة تبويب متصفح جديدة تعرض شاشة العرض الأمامي (HUD) في Starfield.

- 👈 التحقّق من صحة مصدر بيانات القياس عن بُعد:
- بعد تحميل واجهة المستخدم، من المفترض أن تظهر 15 كرة في شكل مبعثر عشوائي.
- إذا كانت الأجهزة تنبض بشكل خفيف أو "تهتز"، يعني ذلك أنّ بث SSE نشط وأنّ محطة الأقمار الصناعية تبث مواقعها الجغرافية بنجاح.

- 👉 بدء تشكيل: انقر على الزر "نجمة" في لوحة البيانات.

- 👀 تتبُّع Event Loop: راقِب وحدات التحكّم لمعرفة كيفية عمل البنية:
- سيسجّل الجهاز الطرفي B (محطة الأقمار الصناعية) ما يلي:
Sending A2A Message: 'Create a STAR formation'. - ستعرِض المحطة الطرفية A (Formation Agent) النشاط أثناء استشارة Gemini.
- سيسجّل المبنى B (محطة الأقمار الصناعية):
Received A2A Responseويحلّل الإحداثيات.
- سيسجّل الجهاز الطرفي B (محطة الأقمار الصناعية) ما يلي:
- 👀 التأكيد المرئي: شاهِد 15 كرة على لوحة البيانات وهي تتحرّك بسلاسة من مواضعها العشوائية لتشكّل نجمة خماسية الأضلاع.
- 👈 التجربة:
- للحصول على 3 تشكيلات مختلفة، جرِّب "X" أو "LINE".

- الجمهور المخصّص حسب النية بالشراء: استخدِم الإدخال اليدوي لكتابة شيء فريد، مثل "قلب" أو "مثلث".

- بما أنّك تستخدم الذكاء الاصطناعي التوليدي، سيحاول الوكيل إجراء العمليات الحسابية لأي شكل هندسي يمكنك وصفه.
- للحصول على 3 تشكيلات مختلفة، جرِّب "X" أو "LINE".
بعد إنشاء 3 أنماط، تكون قد أعدت إنشاء الاتصال بنجاح. 
تمت المهمة بنجاح!
يستقرّ البث أثناء تدفّق البيانات عبر التشويش بدون انقطاع. وبأمر منك، تبدأ الكبسولات القديمة الـ 15 رقصتها المتزامنة عبر النجوم.

خلال ثلاث مراحل شاقة من المعايرة، شاهدت بيانات القياس عن بُعد وهي تستقر في مكانها. ومع كل محاذاة، أصبحت الإشارة أقوى، إلى أن اخترقت أخيرًا التشويش بين النجوم كمنارة أمل.
بفضل جهودك وتنفيذك المتقن لـ "الوكيل المستند إلى الأحداث"، تم نقل الناجين الخمسة جوًا من سطح الكوكب X-42 وهم الآن بأمان على متن سفينة الإنقاذ. بفضل تبرّعك، تم إنقاذ خمسة أشخاص.
إذا شاركت في المستوى 0، لا تنسَ التحقّق من مستوى تقدّمك في مهمة "العودة إلى الوطن". تستمر رحلتك للعودة إلى النجوم.