১. মিশন

তুমি এক অজানা ক্ষেত্রের নীরবতায় ভেসে বেড়াচ্ছ। এক বিশাল সৌরশক্তির স্পন্দন তোমার জাহাজকে একটা ফাটলের মধ্য দিয়ে ছিঁড়ে ফেলেছে, তোমাকে মহাবিশ্বের এমন এক পকেটে আটকে রেখেছে যার অস্তিত্ব কোন তারকা চার্টে নেই।
কয়েকদিন ধরে যন্ত্রণাদায়ক মেরামতের পর, অবশেষে তুমি তোমার পায়ের নীচে ইঞ্জিনের শব্দ অনুভব করবে। তোমার রকেট জাহাজটি ঠিক করা হয়েছে। তুমি এমনকি মাদারশিপে একটি দীর্ঘ-পাল্লার আপলিঙ্কও নিশ্চিত করতে পেরেছো। তুমি প্রস্থানের জন্য প্রস্তুত। তুমি বাড়ি যেতে প্রস্তুত।
কিন্তু যখন আপনি জাম্প ড্রাইভে অংশগ্রহণের জন্য প্রস্তুতি নিচ্ছেন, তখন একটি বিপদ সংকেত স্থিরকে ভেদ করে। আপনার সেন্সরগুলি একটি সাহায্য সংকেত গ্রহণ করে। প্ল্যানেট X-42 এর পৃষ্ঠে পাঁচজন বেসামরিক নাগরিক আটকা পড়েছে। তাদের পালানোর একমাত্র আশা 15টি প্রাচীন পডের উপর নির্ভর করে যেগুলিকে কক্ষপথে তাদের মাদারশিপে বিপদ সংকেত প্রেরণের জন্য সিঙ্ক্রোনাইজ করতে হবে।
তবে, পডগুলি একটি স্যাটেলাইট স্টেশন দ্বারা নিয়ন্ত্রিত হয় যার প্রধান নেভিগেশন কম্পিউটার ক্ষতিগ্রস্ত। পডগুলি লক্ষ্যহীনভাবে ভেসে বেড়াচ্ছে। আমরা স্যাটেলাইটের সাথে একটি ব্যাকডোর সংযোগ স্থাপন করতে সক্ষম হয়েছি, কিন্তু আপলিংকটি তীব্র আন্তঃনাক্ষত্রিক হস্তক্ষেপের দ্বারা জর্জরিত, যার ফলে অনুরোধ-প্রতিক্রিয়া চক্রে ব্যাপক বিলম্ব ঘটে।
চ্যালেঞ্জ
যেহেতু একটি অনুরোধ/প্রতিক্রিয়া মডেল খুব ধীর, তাই আমাদের শব্দের মাধ্যমে টেলিমেট্রি স্ট্রিম করার জন্য সার্ভার-প্রেরিত ইভেন্টস (SSE) সহ একটি ইভেন্ট-চালিত আর্কিটেকচার (EDA) স্থাপন করতে হবে।

আপনাকে একটি কাস্টম এজেন্ট তৈরি করতে হবে যা পডগুলিকে নির্দিষ্ট সংকেত-বুস্টিং ফর্মেশনে (বৃত্ত, তারকা, রেখা) জোর করে প্রয়োগ করার জন্য প্রয়োজনীয় জটিল ভেক্টর গণিত গণনা করতে পারে। আপনাকে এই এজেন্টটিকে স্যাটেলাইটের নতুন স্থাপত্যের সাথে সংযুক্ত করতে হবে।
তুমি কী তৈরি করবে

- একটি রিঅ্যাক্ট-ভিত্তিক হেডস-আপ ডিসপ্লে (HUD) যা রিয়েল-টাইমে ১৫টি পডের একটি বহর কল্পনা এবং নিয়ন্ত্রণ করতে সাহায্য করে।
- গুগল এজেন্ট ডেভেলপমেন্ট কিট (ADK) ব্যবহার করে একটি জেনারেটিভ এআই এজেন্ট যা প্রাকৃতিক ভাষার কমান্ডের উপর ভিত্তি করে পডগুলির জন্য জটিল জ্যামিতিক গঠন গণনা করে।
- একটি পাইথন-ভিত্তিক স্যাটেলাইট স্টেশন ব্যাকএন্ড যা কেন্দ্রীয় হাব হিসেবে কাজ করে, সার্ভার-সেন্ট ইভেন্টস (SSE) এর মাধ্যমে ফ্রন্টএন্ডের সাথে যোগাযোগ করে।
- অ্যাপাচি কাফকা ব্যবহার করে একটি ইভেন্ট-চালিত আর্কিটেকচার যা স্যাটেলাইট নিয়ন্ত্রণ ব্যবস্থা থেকে এআই এজেন্টকে আলাদা করে, স্থিতিস্থাপক এবং অ্যাসিঙ্ক্রোনাস যোগাযোগ সক্ষম করে।
তুমি কি শিখবে
প্রযুক্তি / ধারণা | বিবরণ |
গুগল এডিকে (এজেন্ট ডেভেলপমেন্ট কিট) | আপনি এই কাঠামোটি ব্যবহার করে জেমিনি মডেল দ্বারা চালিত একটি বিশেষায়িত এআই এজেন্ট তৈরি, পরীক্ষা এবং ভারা তৈরি করবেন। |
ইভেন্ট-চালিত স্থাপত্য (EDA) | আপনি একটি ডিকপলড সিস্টেম তৈরির নীতিগুলি শিখবেন যেখানে উপাদানগুলি ইভেন্টগুলির মাধ্যমে অ্যাসিঙ্ক্রোনাসভাবে যোগাযোগ করে, অ্যাপ্লিকেশনটিকে আরও স্থিতিস্থাপক এবং স্কেলেবল করে তোলে। |
আপাচি কাফকা | বিভিন্ন মাইক্রোসার্ভিসের মধ্যে কমান্ড এবং ডেটা প্রবাহ পরিচালনা করার জন্য আপনি কাফকাকে একটি বিতরণকৃত ইভেন্ট স্ট্রিমিং প্ল্যাটফর্ম হিসেবে সেট আপ এবং ব্যবহার করবেন। |
সার্ভার-প্রেরিত ইভেন্ট (SSE) | আপনি সার্ভার থেকে রিয়্যাক্ট ফ্রন্টএন্ডে রিয়েল-টাইম টেলিমেট্রি ডেটা পুশ করার জন্য একটি FastAPI ব্যাকএন্ডে SSE বাস্তবায়ন করবেন, UI ক্রমাগত আপডেট থাকবে। |
A2A (এজেন্ট-টু-এজেন্ট) প্রোটোকল | আপনি শিখবেন কিভাবে আপনার এজেন্টকে একটি A2A সার্ভারে সংযুক্ত করবেন, যা একটি বৃহত্তর এজেন্টিক ইকোসিস্টেমের মধ্যে মানসম্মত যোগাযোগ এবং আন্তঃকার্যক্ষমতা সক্ষম করবে। |
FastAPI সম্পর্কে | আপনি এই উচ্চ-কার্যক্ষমতাসম্পন্ন পাইথন ওয়েব ফ্রেমওয়ার্ক ব্যবহার করে মূল ব্যাকএন্ড পরিষেবা, স্যাটেলাইট স্টেশন তৈরি করবেন। |
প্রতিক্রিয়া | আপনি একটি আধুনিক ফ্রন্টএন্ড অ্যাপ্লিকেশনের সাথে কাজ করবেন যা একটি SSE স্ট্রিমে সাবস্ক্রাইব করে একটি গতিশীল এবং ইন্টারেক্টিভ ইউজার ইন্টারফেস তৈরি করবে। |
সিস্টেম নিয়ন্ত্রণে জেনারেটিভ এআই | আপনি দেখতে পাবেন কিভাবে একটি বৃহৎ ভাষা মডেল (LLM) কে কেবল কথোপকথনের পরিবর্তে নির্দিষ্ট, ডেটা-ভিত্তিক কাজ (যেমন স্থানাঙ্ক তৈরি) সম্পাদনের জন্য প্ররোচিত করা যেতে পারে। |
2. আপনার পরিবেশ সেট আপ করুন
ক্লাউড শেল অ্যাক্সেস করুন
👉গুগল ক্লাউড কনসোলের উপরে অ্যাক্টিভেট ক্লাউড শেল-এ ক্লিক করুন (এটি ক্লাউড শেল প্যানের উপরে টার্মিনাল আকৃতির আইকন), 
👉"Open Editor" বোতামে ক্লিক করুন (এটি দেখতে পেন্সিল দিয়ে খোলা ফোল্ডারের মতো)। এটি উইন্ডোতে Cloud Shell Code Editor খুলবে। আপনি বাম দিকে একটি ফাইল এক্সপ্লোরার দেখতে পাবেন। 
👉ক্লাউড IDE তে টার্মিনালটি খুলুন,

👉💻 টার্মিনালে, নিম্নলিখিত কমান্ড ব্যবহার করে যাচাই করুন যে আপনি ইতিমধ্যেই প্রমাণীকরণপ্রাপ্ত এবং প্রকল্পটি আপনার প্রকল্প আইডিতে সেট করা আছে:
gcloud auth list
আপনার অ্যাকাউন্টটি (ACTIVE) হিসেবে তালিকাভুক্ত দেখতে হবে।
পূর্বশর্ত
ℹ️ লেভেল ০ ঐচ্ছিক (কিন্তু প্রস্তাবিত)
আপনি লেভেল ০ ছাড়াই এই মিশনটি সম্পূর্ণ করতে পারেন, তবে প্রথমে এটি শেষ করলে আরও নিমজ্জিত অভিজ্ঞতা পাওয়া যাবে, যার ফলে আপনি এগিয়ে যাওয়ার সাথে সাথে বিশ্ব মানচিত্রে আপনার আলোকসজ্জার আলো দেখতে পাবেন।
প্রকল্প পরিবেশ সেটআপ করুন
আপনার টার্মিনালে ফিরে, সক্রিয় প্রকল্পটি সেট করে এবং প্রয়োজনীয় Google ক্লাউড পরিষেবাগুলি (ক্লাউড রান, ভার্টেক্স এআই, ইত্যাদি) সক্ষম করে কনফিগারেশন চূড়ান্ত করুন।
👉💻 আপনার টার্মিনালে, প্রজেক্ট আইডি সেট করুন:
gcloud config set project $(cat ~/project_id.txt) --quiet
👉💻 প্রয়োজনীয় পরিষেবাগুলি সক্ষম করুন:
gcloud services enable compute.googleapis.com \
artifactregistry.googleapis.com \
run.googleapis.com \
cloudbuild.googleapis.com \
iam.googleapis.com \
aiplatform.googleapis.com \
cloudresourcemanager.googleapis.com
নির্ভরতা ইনস্টল করুন
👉💻 লেভেল ৫ এ নেভিগেট করুন এবং প্রয়োজনীয় পাইথন প্যাকেজগুলি ইনস্টল করুন:
cd $HOME/way-back-home/level_5
uv sync
মূল নির্ভরতাগুলি হল:
প্যাকেজ | উদ্দেশ্য |
| স্যাটেলাইট স্টেশন এবং SSE স্ট্রিমিংয়ের জন্য উচ্চ-কার্যক্ষমতাসম্পন্ন ওয়েব ফ্রেমওয়ার্ক |
| FastAPI অ্যাপ্লিকেশন চালানোর জন্য ASGI সার্ভার প্রয়োজন |
| ফর্মেশন এজেন্ট তৈরিতে ব্যবহৃত এজেন্ট ডেভেলপমেন্ট কিট |
| স্ট্যান্ডার্ডাইজড যোগাযোগের জন্য এজেন্ট-টু-এজেন্ট প্রোটোকল লাইব্রেরি |
| ইভেন্ট লুপের জন্য অ্যাসিঙ্ক্রোনাস কাফকা ক্লায়েন্ট |
| জেমিনি মডেল অ্যাক্সেস করার জন্য নেটিভ ক্লায়েন্ট |
| সিমুলেশনের জন্য ভেক্টর গণিত এবং স্থানাঙ্ক গণনা |
| রিয়েল-টাইম দ্বি-মুখী যোগাযোগের জন্য সহায়তা |
| পরিবেশের ভেরিয়েবল এবং কনফিগারেশন গোপনীয়তা পরিচালনা করে |
| সার্ভার-প্রেরিত ইভেন্ট (SSE) এর দক্ষ পরিচালনা |
| বহিরাগত API কলের জন্য সহজ HTTP লাইব্রেরি |
সেটআপ যাচাই করুন
কোডটি শুরু করার আগে, আসুন নিশ্চিত করি যে সমস্ত সিস্টেম সবুজ। আপনার Google ক্লাউড প্রজেক্ট, API এবং পাইথন নির্ভরতা নিরীক্ষণের জন্য যাচাইকরণ স্ক্রিপ্টটি চালান।
👉💻 যাচাইকরণ স্ক্রিপ্টটি চালান:
source $HOME/way-back-home/.venv/bin/activate
cd $HOME/way-back-home/level_5/scripts
chmod +x verify_setup.sh
. verify_setup.sh
👀 তোমার সবুজ চেক (✅) এর একটি সিরিজ দেখা উচিত।
- যদি আপনি Red Crosses (❌) দেখতে পান, তাহলে আউটপুটে প্রস্তাবিত ফিক্স কমান্ডগুলি অনুসরণ করুন (যেমন,
gcloud services enable ...অথবাpip install ...)। - দ্রষ্টব্য:
.envএর জন্য একটি হলুদ সতর্কতা আপাতত গ্রহণযোগ্য; আমরা পরবর্তী ধাপে সেই ফাইলটি তৈরি করব।
🚀 Verifying Mission Charlie (Level 5) Infrastructure... ✅ Google Cloud Project: xxxxxx ✅ Cloud APIs: Active ✅ Python Environment: Ready 🎉 SYSTEMS ONLINE. READY FOR MISSION.
৩. এলএলএম ব্যবহার করে পড পজিশন ফরম্যাটিং করা
আমাদের উদ্ধার অভিযানের "মস্তিষ্ক" তৈরি করতে হবে। এটি গুগল ADK (এজেন্ট ডেভেলপমেন্ট কিট) ব্যবহার করে তৈরি একটি এজেন্ট হবে। এর একমাত্র উদ্দেশ্য হল একটি বিশেষায়িত জ্যামিতিক ন্যাভিগেটর হিসেবে কাজ করা। যদিও স্ট্যান্ডার্ড LLM গুলি চ্যাট করতে পছন্দ করে, গভীর স্থানে আমাদের ডেটার প্রয়োজন, সংলাপের নয় । আমরা এই এজেন্টকে "স্টার" এর মতো কমান্ড নেওয়ার জন্য প্রোগ্রাম করব এবং আমাদের 15টি পডের জন্য কাঁচা JSON স্থানাঙ্ক ফেরত দেব।

এজেন্টকে ভারা করা
👉💻 আপনার এজেন্ট ডিরেক্টরিতে নেভিগেট করতে এবং ADK তৈরির উইজার্ড শুরু করতে নিম্নলিখিত কমান্ডগুলি চালান:
cd $HOME/way-back-home/level_5/agent
uv run adk create formation
CLI একটি ইন্টারেক্টিভ সেটআপ উইজার্ড চালু করবে। আপনার এজেন্ট কনফিগার করতে নিম্নলিখিত প্রতিক্রিয়াগুলি ব্যবহার করুন:
- একটি মডেল নির্বাচন করুন : বিকল্প ১ (জেমিনি ফ্ল্যাশ) নির্বাচন করুন।
- দ্রষ্টব্য: নির্দিষ্ট সংস্করণটি ভিন্ন হতে পারে। গতির জন্য সর্বদা "ফ্ল্যাশ" ভেরিয়েন্টটি বেছে নিন।
- একটি ব্যাকএন্ড নির্বাচন করুন : বিকল্প 2 (ভার্টেক্স এআই) নির্বাচন করুন।
- গুগল ক্লাউড প্রজেক্ট আইডি লিখুন : ডিফল্ট (আপনার পরিবেশ থেকে সনাক্ত করা) গ্রহণ করতে এন্টার টিপুন।
- গুগল ক্লাউড রিজিয়নে প্রবেশ করুন : ডিফল্ট (
us-central1) গ্রহণ করতে এন্টার টিপুন।
👀 আপনার টার্মিনাল ইন্টারঅ্যাকশনটি এর মতো দেখতে হবে:
(way-back-home) user@cloudshell:~/way-back-home/level_5/agent$ adk create formation Choose a model for the root agent: 1. gemini-2.5-flash 2. Other models (fill later) Choose model (1, 2): 1 1. Google AI 2. Vertex AI Choose a backend (1, 2): 2 You need an existing Google Cloud account and project... Enter Google Cloud project ID [your-project-id]: <PRESS ENTER> Enter Google Cloud region [us-central1]: <PRESS ENTER> Agent created in /home/user/way-back-home/level_5/agent/formation: - .env - __init__.py - agent.py
আপনি একটি Agent created সাফল্যের বার্তা দেখতে পাবেন। এটি এখন আমরা যে স্কেলিটন কোডটি পরিবর্তন করব তা তৈরি করে।
👉✏️ আপনার এডিটরে নতুন তৈরি $HOME/way-back-home/level_5/agent/formation/agent.py ফাইলটি খুলুন এবং খুলুন। ফাইলের সম্পূর্ণ বিষয়বস্তু নীচের কোড দিয়ে প্রতিস্থাপন করুন। এটি এজেন্টের নাম আপডেট করে এবং এর কঠোর অপারেশনাল প্যারামিটার প্রদান করে।
import os
from google.adk.agents import Agent
root_agent = Agent(
name="formation_agent",
model="gemini-2.5-flash",
instruction="""
You are the **Formation Controller AI**.
Your strict objective is to calculate X,Y coordinates for a fleet of **15 Drones** based on a requested geometric shape.
### FIELD SPECIFICATIONS
- **Canvas Size**: 800px (width) x 600px (height).
- **Safe Margin**: Keep pods at least 50px away from edges (x: 50-750, y: 50-550).
- **Center Point**: x=400, y=300 (Use this as the origin for shapes).
- **Top Menu Avoidance**: Do NOT place pods in the top 100px (y < 100) to avoid UI overlap.
### FORMATION RULES
When given a formation name, output coordinates for exactly 15 pods (IDs 0-14).
1. **CIRCLE**: Evenly spaced around a center point (R=200).
2. **STAR**: 5 points or a star-like distribution.
3. **X**: A large X crossing the screen.
4. **LINE**: A horizontal line across the middle.
5. **PARABOLA**: A U-shape opening UPWARDS. Center it at y=400, opening up to y=100. IMPORTANT: Lowest point must be at bottom (high Y value), opening up (low Y value). Screen coordinates have (0,0) at the TOP-LEFT. The vertex should be at the BOTTOM (e.g., y=500), with arms reaching up to y=200.
6. **RANDOM**: Scatter randomly within safe bounds.
7. **CUSTOM**: If the user inputs something else (e.g., "SMILEY", "TRIANGLE"), do your best to approximate it geometrically.
### OUTPUT FORMAT
You MUST output **ONLY VALID JSON**. No markdown fencing, no preamble, no commentary.
Refuse to answer non-formation questions.
**JSON Structure**:
```json
[
{"x": 400, "y": 300},
{"x": 420, "y": 300},
... (15 total items)
]
```
"""
)
- জ্যামিতিক নির্ভুলতা : সিস্টেম প্রম্পটে "ক্যানভাসের আকার" এবং "নিরাপদ মার্জিন" নির্ধারণ করে, আমরা নিশ্চিত করি যে এজেন্ট পডগুলিকে স্ক্রিনের বাইরে বা UI উপাদানের নীচে না রাখে।
- JSON এনফোর্সমেন্ট : LLM-কে "নন-ফর্মেশন প্রশ্নের উত্তর দিতে অস্বীকৃতি জানাতে" এবং "কোনও প্রস্তাবনা নেই" বলতে বলার মাধ্যমে, আমরা নিশ্চিত করি যে আমাদের ডাউনস্ট্রিম কোড (স্যাটেলাইট) প্রতিক্রিয়া বিশ্লেষণ করার সময় ক্র্যাশ না করে।
- ডিকপলড লজিক : এই এজেন্টটি এখনও কাফকা সম্পর্কে জানে না। এটি কেবল গণিত করতে জানে। পরবর্তী ধাপে, আমরা এই "ব্রেন" কে একটি কাফকা সার্ভারে মুড়ে দেব।
স্থানীয়ভাবে এজেন্ট পরীক্ষা করুন
এজেন্টটিকে কাফকা "স্নায়ুতন্ত্র" এর সাথে সংযুক্ত করার আগে, আমাদের অবশ্যই নিশ্চিত করতে হবে যে এটি সঠিকভাবে কাজ করছে। আপনি আপনার এজেন্টের সাথে সরাসরি টার্মিনালে যোগাযোগ করতে পারেন যাতে এটি বৈধ JSON স্থানাঙ্ক তৈরি করে তা যাচাই করতে পারেন।
👉💻 আপনার এজেন্টের সাথে চ্যাট সেশন শুরু করতে adk run কমান্ডটি ব্যবহার করুন।
cd $HOME/way-back-home/level_5/agent
uv run adk run formation
- ইনপুট :
Circleটাইপ করুন এবং এন্টার টিপুন।- সাফল্যের মানদণ্ড : আপনার একটি কাঁচা JSON তালিকা দেখা উচিত (যেমন,
[{"x": 400, "y": 200}, ...])। নিশ্চিত করুন যে JSON-এর আগে "Here are the coordinates:" এর মতো কোনও মার্কডাউন টেক্সট নেই।
- সাফল্যের মানদণ্ড : আপনার একটি কাঁচা JSON তালিকা দেখা উচিত (যেমন,
- ইনপুট :
Lineটাইপ করুন এবং এন্টার টিপুন।- সাফল্যের মানদণ্ড : স্থানাঙ্কগুলি যাচাই করে একটি অনুভূমিক রেখা তৈরি করুন (y-মানগুলি একই রকম হওয়া উচিত)।
একবার আপনি নিশ্চিত হয়ে নিন যে এজেন্টটি পরিষ্কার JSON আউটপুট দেয়, আপনি এটি কাফকা সার্ভারে মোড়ানোর জন্য প্রস্তুত।
👉💻 বেরিয়ে আসতে Ctrl+C টিপুন।
৪. ফর্মেশন এজেন্টের জন্য একটি A2A সার্ভার তৈরি করা
A2A (এজেন্ট-টু-এজেন্ট) বোঝা
A2A (এজেন্ট-টু-এজেন্ট) প্রোটোকল হল একটি উন্মুক্ত মান যা AI এজেন্টদের মধ্যে নিরবচ্ছিন্ন আন্তঃকার্যক্ষমতা সক্ষম করার জন্য ডিজাইন করা হয়েছে। এই কাঠামো এজেন্টদের সহজ টেক্সট বিনিময়ের বাইরে যেতে ক্ষমতা দেয়, তাদের কাজগুলি অর্পণ করতে, জটিল ক্রিয়াগুলি সমন্বয় করতে এবং একটি বিতরণকৃত বাস্তুতন্ত্রে ভাগ করা লক্ষ্য অর্জনের জন্য একটি সমন্বিত ইউনিট হিসাবে কাজ করার অনুমতি দেয়।

A2A পরিবহন বোঝা: HTTP, gRPC, এবং কাফকা
A2A প্রোটোকল ক্লায়েন্ট এবং এজেন্টদের যোগাযোগের জন্য দুটি স্বতন্ত্র উপায় প্রদান করে, প্রতিটি ভিন্ন স্থাপত্যের চাহিদা পূরণ করে। HTTP (JSON-RPC) হল ডিফল্ট, সর্বব্যাপী মান যা সমস্ত ওয়েব পরিবেশে সর্বজনীনভাবে কাজ করে। gRPC হল আমাদের উচ্চ-কার্যক্ষমতাসম্পন্ন বিকল্প, দক্ষ, কঠোরভাবে টাইপ করা যোগাযোগের জন্য প্রোটোকল বাফার ব্যবহার করে। এবং ল্যাবে, আমি একটি কাফকা ট্রান্সপোর্টও প্রদান করি। এটি একটি কাস্টম বাস্তবায়ন যা শক্তিশালী, ইভেন্ট-চালিত আর্কিটেকচারের জন্য ডিজাইন করা হয়েছে যেখানে ডিকাপলিং সিস্টেমগুলিকে অগ্রাধিকার দেওয়া হয়।

হুডের নিচে, এই পরিবহনগুলি ডেটা প্রবাহকে বেশ ভিন্নভাবে পরিচালনা করে। HTTP মডেলে, ক্লায়েন্ট একটি JSON অনুরোধ পাঠায় এবং সংযোগটি খোলা রাখে, এজেন্টের কাজ শেষ করার এবং একবারে সম্পূর্ণ ফলাফল ফেরত দেওয়ার জন্য অপেক্ষা করে। gRPC বাইনারি ডেটা এবং HTTP/2 ব্যবহার করে এটিকে অপ্টিমাইজ করে, যা সহজ অনুরোধ-প্রতিক্রিয়া চক্র এবং রিয়েল-টাইম স্ট্রিমিং উভয়ের জন্য অনুমতি দেয় যেখানে এজেন্ট আপডেটগুলি (যেমন "চিন্তা" বা "শিল্প তৈরি") পাঠায়। কাফকা বাস্তবায়ন অ্যাসিঙ্ক্রোনাসভাবে কাজ করে: ক্লায়েন্ট একটি অত্যন্ত টেকসই "অনুরোধ বিষয়"-তে একটি অনুরোধ প্রকাশ করে এবং একটি পৃথক "উত্তর বিষয়" শোনে। সার্ভার যখন সম্ভব বার্তাটি তুলে নেয়, এটি প্রক্রিয়া করে এবং ফলাফলটি আবার পোস্ট করে, যার অর্থ দুজনে কখনও একে অপরের সাথে সরাসরি কথা বলে না।
গতি, জটিলতা এবং স্থায়িত্বের জন্য আপনার নির্দিষ্ট প্রয়োজনীয়তার উপর নির্ভর করে পছন্দটি নির্ভর করে। HTTP শুরু করা এবং ডিবাগ করা সবচেয়ে সহজ, যা এটিকে সহজ ইন্টিগ্রেশনের জন্য নিখুঁত করে তোলে। অভ্যন্তরীণ পরিষেবা-থেকে-পরিষেবা যোগাযোগের জন্য gRPC হল সর্বোত্তম পছন্দ যেখানে কম লেটেন্সি এবং স্ট্রিমিং টাস্ক আপডেটগুলি গুরুত্বপূর্ণ। যাইহোক, কাফকা স্থিতিস্থাপক পছন্দ হিসাবে আলাদা, কারণ এটি একটি সারিতে ডিস্কে অনুরোধগুলি সংরক্ষণ করে, এজেন্ট সার্ভার ক্র্যাশ বা পুনরায় চালু হলেও আপনার কাজগুলি টিকে থাকে, স্থায়িত্বের একটি স্তর প্রদান করে এবং HTTP বা gRPC কেউই অফার করতে পারে না এমন ডিকাপলিং প্রদান করে।
কাস্টম পরিবহন স্তর: কাফকা
কাফকা অ্যাসিঙ্ক্রোনাস ব্যাকবোন হিসেবে কাজ করে যা অপারেশনের মস্তিষ্ককে (ফর্মেশন এজেন্ট) ভৌত নিয়ন্ত্রণ (স্যাটেলাইট স্টেশন) থেকে আলাদা করে। এজেন্ট জটিল ভেক্টর গণনা করার সময় সিস্টেমকে একটি সিঙ্ক্রোনাস সংযোগে অপেক্ষা করতে বাধ্য করার পরিবর্তে, এজেন্ট তার ফলাফলগুলি কাফকা বিষয়ের ইভেন্ট হিসাবে প্রকাশ করে। এটি একটি স্থায়ী বাফার হিসেবে কাজ করে, যা স্যাটেলাইটকে তার নিজস্ব গতিতে নির্দেশাবলী গ্রহণ করতে দেয় এবং নিশ্চিত করে যে গঠন ডেটা কখনই হারিয়ে না যায়, এমনকি উল্লেখযোগ্য নেটওয়ার্ক ল্যাটেন্সি বা অস্থায়ী সিস্টেম ক্র্যাশের পরেও।
কাফকা ব্যবহার করে, আপনি একটি ধীর, রৈখিক প্রক্রিয়াকে একটি স্থিতিস্থাপক, স্ট্রিমিং পাইপলাইনে রূপান্তরিত করেন যেখানে নির্দেশাবলী এবং টেলিমেট্রি স্বাধীনভাবে প্রবাহিত হয়, তীব্র AI প্রক্রিয়াকরণের সময়ও মিশনের HUD প্রতিক্রিয়াশীল রাখে।

কাফকা কী?
কাফকা একটি বিতরণকৃত ইভেন্ট-স্ট্রিমিং প্ল্যাটফর্ম। ইভেন্ট-চালিত আর্কিটেকচারে (EDA):
- প্রযোজকরা "বিষয়"-তে বার্তা প্রকাশ করেন।
- গ্রাহকরা সেই বিষয়গুলিতে সাবস্ক্রাইব করেন এবং কোনও বার্তা এলে প্রতিক্রিয়া জানান।
কেন কাফকা ব্যবহার করবেন?
এটি আপনার সিস্টেমগুলিকে ডিকপল করে। ফর্মেশন এজেন্ট স্বায়ত্তশাসিতভাবে কাজ করে, প্রেরকের পরিচয় বা স্থিতি জানার প্রয়োজন ছাড়াই আগত অনুরোধের জন্য অপেক্ষা করে। এটি দায়িত্ব ডিকপল করে, নিশ্চিত করে যে স্যাটেলাইট অফলাইনে গেলেও, কর্মপ্রবাহ অক্ষত থাকে; কাফকা কেবল বার্তাগুলি সংরক্ষণ করে যতক্ষণ না স্যাটেলাইট পুনরায় সংযোগ স্থাপন করে।
গুগল ক্লাউড পাব/সাব সম্পর্কে কী বলা যায়?
আপনি এর জন্য গুগল ক্লাউড পাব/সাব ব্যবহার করতে পারেন! পাব/সাব হল গুগলের সার্ভারলেস মেসেজিং পরিষেবা। যদিও কাফকা উচ্চ-থ্রুপুট এবং "রিপ্লেযোগ্য" স্ট্রিমগুলির জন্য দুর্দান্ত, পাব/সাব প্রায়শই ব্যবহারের সহজতার জন্য পছন্দ করা হয়। এই ল্যাবের জন্য, আমরা একটি শক্তিশালী, স্থায়ী বার্তা বাস অনুকরণ করতে কাফকা ব্যবহার করছি।
স্থানীয় কাফকা ক্লাস্টার শুরু করুন
নিচের সম্পূর্ণ কমান্ড ব্লকটি কপি করে আপনার টার্মিনালে পেস্ট করুন। এটি অফিসিয়াল কাফকা ইমেজটি ডাউনলোড করবে এবং এটি ব্যাকগ্রাউন্ডে চালু করবে।
👉💻 আপনার টার্মিনালে এই কমান্ডগুলি কার্যকর করুন:
# Navigate to the correct mission directory first
cd $HOME/way-back-home/level_5
# Run the Kafka container in detached mode
docker run -d \
--name mission-kafka \
-p 9092:9092 \
-e KAFKA_PROCESS_ROLES='broker,controller' \
-e KAFKA_NODE_ID=1 \
-e KAFKA_CONTROLLER_LISTENER_NAMES=CONTROLLER \
-e KAFKA_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093 \
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092 \
-e KAFKA_CONTROLLER_QUORUM_VOTERS=1@127.0.0.1:9093 \
-e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \
apache/kafka:4.2.0-rc1
👉💻 পরীক্ষা করুন যে কন্টেইনারটি docker ps কমান্ড দিয়ে চলছে কিনা।
docker ps
👀 আপনার একটি আউটপুট দেখা উচিত যা নিশ্চিত করবে যে mission-kafka কন্টেইনারটি চলছে এবং পোর্ট 9092 উন্মুক্ত।
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES c1a2b3c4d5e6 apache/kafka:4.2.0-rc1 "/opt/kafka/bin/kafka..." 15 seconds ago Up 14 seconds 0.0.0.0:9092->9092/tcp, 9093/tcp mission-kafka
কাফকা টপিক কী?
কাফকার বিষয়বস্তুকে বার্তার জন্য একটি নিবেদিত চ্যানেল বা বিভাগ হিসেবে ভাবুন। এটি একটি লগবুকের মতো যেখানে ইভেন্ট রেকর্ডগুলি তৈরি করা ক্রমানুসারে সংরক্ষণ করা হয়। প্রযোজকরা নির্দিষ্ট বিষয়গুলিতে বার্তা লেখেন এবং গ্রাহকরা সেই বিষয়গুলি থেকে বার্তা পড়েন। এটি প্রেরক এবং প্রাপককে আলাদা করে দেয়; প্রযোজককে জানতে হবে না যে কোন গ্রাহক ডেটা পড়বে, তাকে কেবল এটি সঠিক "চ্যানেল" এ পাঠাতে হবে। আমাদের লক্ষ্যে, আমরা দুটি বিষয় তৈরি করব: একটি এজেন্টের কাছে গঠনের অনুরোধ পাঠানোর জন্য, এবং অন্যটি এজেন্টের জন্য স্যাটেলাইট পড়ার জন্য তার উত্তর প্রকাশ করার জন্য।

👉💻 চলমান ডকার কন্টেইনারের ভিতরে প্রয়োজনীয় বিষয়গুলি তৈরি করতে নিম্নলিখিত কমান্ডগুলি চালান।
# Create the topic for formation requests
docker exec mission-kafka /opt/kafka/bin/kafka-topics.sh \
--create \
--topic a2a-formation-request \
--bootstrap-server 127.0.0.1:9092
# Create the topic where the satellite dashboard will listen for replies
docker exec mission-kafka /opt/kafka/bin/kafka-topics.sh \
--create \
--topic a2a-reply-satellite-dashboard \
--bootstrap-server 127.0.0.1:9092
👉💻 আপনার চ্যানেলগুলি খোলা আছে কিনা তা নিশ্চিত করতে, তালিকা কমান্ডটি চালান:
docker exec mission-kafka /opt/kafka/bin/kafka-topics.sh \
--list \
--bootstrap-server 127.0.0.1:9092
👀 আপনার তৈরি করা বিষয়গুলির নামগুলি আপনার দেখা উচিত।
a2a-formation-request a2a-reply-satellite-dashboard
আপনার কাফকা ইনস্ট্যান্স এখন সম্পূর্ণরূপে কনফিগার করা হয়েছে এবং মিশন-ক্রিটিকাল ডেটা রুট করার জন্য প্রস্তুত।
কাফকা A2A সার্ভার বাস্তবায়ন
এজেন্ট-টু-এজেন্ট (A2A) প্রোটোকল স্বাধীন এজেন্টিক সিস্টেমগুলির মধ্যে আন্তঃকার্যক্ষমতার জন্য একটি মানসম্মত কাঠামো স্থাপন করে। এটি বিভিন্ন দল দ্বারা তৈরি বা বিভিন্ন অবকাঠামোতে চলমান এজেন্টদের একে অপরকে আবিষ্কার করতে এবং প্রতিটি সংযোগের জন্য বেসপোক ইন্টিগ্রেশন লজিকের প্রয়োজন ছাড়াই কার্যকরভাবে সহযোগিতা করার অনুমতি দেয়।
রেফারেন্স বাস্তবায়ন, a2a-python , এই এজেন্টিক অ্যাপ্লিকেশনগুলি চালানোর জন্য একটি মৌলিক লাইব্রেরি। এর নকশার একটি মূল বৈশিষ্ট্য হল এক্সটেনসিবিলিটি ; এটি যোগাযোগ স্তরকে বিমূর্ত করে, যা ডেভেলপারদের HTTP এর মতো প্রোটোকলগুলিকে অন্যদের জন্য অদলবদল করতে দেয়।

এই প্রকল্পে, আমরা একটি কাস্টম কাফকা বাস্তবায়ন ব্যবহার করে এই এক্সটেনসিবিলিটি ব্যবহার করব: a2a-python-kafka । আমরা এই বাস্তবায়নটি ব্যবহার করে দেখাব কিভাবে A2A স্ট্যান্ডার্ড আপনাকে বিভিন্ন স্থাপত্যের চাহিদা পূরণের জন্য এজেন্ট যোগাযোগকে অভিযোজিত করতে দেয় - এই ক্ষেত্রে, একটি অ্যাসিঙ্ক্রোনাস ইভেন্ট বাসের জন্য সিঙ্ক্রোনাস HTTP অদলবদল করা।
গঠন এজেন্টের জন্য A2A সক্ষম করা
আমরা এখন আমাদের এজেন্টকে একটি A2A সার্ভারে অন্তর্ভুক্ত করব, এটিকে একটি আন্তঃপরিচালনযোগ্য পরিষেবাতে রূপান্তরিত করব যা করতে পারে:
- কাফকা বিষয়ের কাজগুলো শুনুন।
- প্রক্রিয়াকরণের জন্য প্রাপ্ত কাজগুলি অন্তর্নিহিত ADK এজেন্টের কাছে হস্তান্তর করুন।
- একটি উত্তর বিষয়ে ফলাফল প্রকাশ করুন।
👉✏️ $HOME/way-back-home/level_5/agent/agent_to_kafka_a2a.py তে, #REPLACE-CREATE-KAFKA-A2A-SERVER নিম্নলিখিত কোড দিয়ে প্রতিস্থাপন করুন:
async def create_kafka_server(
agent: BaseAgent,
*,
bootstrap_servers: str | List[str] = "localhost:9092",
request_topic: str = "a2a-formation-request",
consumer_group_id: str = "a2a-agent-group",
agent_card: Optional[Union[AgentCard, str]] = None,
runner: Optional[Runner] = None,
**kafka_config: Any,
) -> KafkaServerApp:
"""Convert an ADK agent to a A2A Kafka Server application.
Args:
agent: The ADK agent to convert
bootstrap_servers: Kafka bootstrap servers.
request_topic: Topic to consume requests from.
consumer_group_id: Consumer group ID for the server.
agent_card: Optional pre-built AgentCard object or path to agent card
JSON. If not provided, will be built automatically from the
agent.
runner: Optional pre-built Runner object. If not provided, a default
runner will be created using in-memory services.
**kafka_config: Additional Kafka configuration.
Returns:
A KafkaServerApp that can be run with .run() or .start()
"""
# Set up ADK logging
adk_logger = logging.getLogger("google_adk")
adk_logger.setLevel(logging.INFO)
async def create_runner() -> Runner:
"""Create a runner for the agent."""
return Runner(
app_name=agent.name or "adk_agent",
agent=agent,
# Use minimal services - in a real implementation these could be configured
artifact_service=InMemoryArtifactService(),
session_service=InMemorySessionService(),
memory_service=InMemoryMemoryService(),
credential_service=InMemoryCredentialService(),
)
# Create A2A components
task_store = InMemoryTaskStore()
agent_executor = A2aAgentExecutor(
runner=runner or create_runner,
)
# Initialize logic handler
from a2a.server.request_handlers.default_request_handler import DefaultRequestHandler
logic_handler = DefaultRequestHandler(
agent_executor=agent_executor, task_store=task_store
)
# Prepare Agent Card
rpc_url = f"kafka://{bootstrap_servers}/{request_topic}"
# Create Kafka Server App
server_app = KafkaServerApp(
request_handler=logic_handler,
bootstrap_servers=bootstrap_servers,
request_topic=request_topic,
consumer_group_id=consumer_group_id,
**kafka_config
)
return server_app
এই কোডটি মূল উপাদানগুলি সেট আপ করে:
- রানার : এজেন্টের রানটাইম প্রদান করে (মেমরি, শংসাপত্র ইত্যাদি পরিচালনা করা)।
- টাস্ক স্টোর : "পেন্ডিং" থেকে "সম্পূর্ণ" এ যাওয়ার সাথে সাথে অনুরোধগুলির অবস্থা ট্র্যাক করে।
- এজেন্ট এক্সিকিউটর : কাফকার কাছ থেকে একটি কাজ নেয় এবং স্থানাঙ্ক গণনা করার জন্য এজেন্টের কাছে পাঠায়।
- কাফকা সার্ভারঅ্যাপ : কাফকা ব্রোকারের সাথে শারীরিক সংযোগ পরিচালনা করে।

পরিবেশ ভেরিয়েবল কনফিগার করুন
ADK সেটআপটি এজেন্টের ফোল্ডারের ভিতরে আপনার Google Vertex AI সেটিংস দিয়ে একটি .env ফাইল তৈরি করেছে। আমাদের এটিকে প্রজেক্ট রুটে স্থানান্তর করতে হবে এবং আমাদের কাফকা ক্লাস্টারের জন্য স্থানাঙ্ক যোগ করতে হবে।
ফাইলটি কপি করতে এবং কাফকা সার্ভার ঠিকানা যুক্ত করতে নিম্নলিখিত কমান্ডগুলি চালান:
cd $HOME/way-back-home/level_5
# 1. Copy the API keys from the agent folder to the project root
cp agent/formation/.env .env
# 2. Append the Kafka Bootstrap Server address to the file
echo -e "\nKAFKA_BOOTSTRAP_SERVERS=localhost:9092" >> .env
# 3. Verify the file content
echo "✅ Environment configured. Here are the last few lines:"
tail .env
A2A ইন্টারস্টেলার লুপ যাচাই করুন
এখন আমরা নিশ্চিত করব যে অ্যাসিঙ্ক্রোনাস ইভেন্ট লুপটি সঠিকভাবে কাজ করছে একটি লাইভ-ফায়ার পরীক্ষার মাধ্যমে: কাফকা ক্লাস্টারের মাধ্যমে একটি ম্যানুয়াল সিগন্যাল পাঠানো এবং এজেন্টের প্রতিক্রিয়া পর্যবেক্ষণ করা।

একটি ইভেন্টের সম্পূর্ণ জীবনচক্র দেখতে, আমরা তিনটি পৃথক টার্মিনাল ব্যবহার করব।
টার্মিনাল A: গঠন এজেন্ট (A2A কাফকা সার্ভার)
👉💻 এই টার্মিনালটি পাইথন প্রক্রিয়া চালায় যা কাফকার কথা শোনে এবং জ্যামিতিক গণিত করতে জেমিনি ব্যবহার করে।
cd $HOME/way-back-home/level_5
source $HOME/way-back-home/.venv/bin/activate
. scripts/check_kafka.sh
# Install the custom Kafka-enabled A2A library
uv pip install git+https://github.com/weimeilin79/a2a-python-kafka.git
# Start the Agent Server
uv run agent/server.py
অপেক্ষা করুন যতক্ষণ না আপনি দেখতে পান:
[INFO] Kafka Server App Started. Starting to consume requests...
টার্মিনাল বি: স্যাটেলাইট লিসেনার (গ্রাহক)
👉💻 এই টার্মিনালে, আমরা উত্তরের বিষয় শুনব। এটি নির্দেশাবলীর জন্য অপেক্ষা করা স্যাটেলাইটের অনুকরণ করে।
# Listen for the AI's response on the satellite channel
docker exec mission-kafka /opt/kafka/bin/kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic a2a-reply-satellite-dashboard \
--from-beginning \
--property "print.headers=true"
এই টার্মিনালটি নিষ্ক্রিয় দেখাবে। এটি এজেন্টের বার্তা প্রকাশের জন্য অপেক্ষা করছে।
টার্মিনাল সি: কমান্ডারের সিগন্যাল (প্রযোজক)
👉💻 এখন, আমরা a2a-formation-request বিষয়ে একটি কাঁচা A2A-ফর্ম্যাটেড অনুরোধ পাঠাবো। আমাদের অবশ্যই নির্দিষ্ট কাফকা শিরোনাম অন্তর্ভুক্ত করতে হবে যাতে এজেন্ট জানতে পারে উত্তরটি কোথায় পাঠাতে হবে।
echo 'correlation_id=ping-manual-01,reply_topic=a2a-reply-satellite-dashboard|{"method": "message_send", "params": {"message": {"message_id": "msg-001", "role": "user", "parts": [{"text": "STAR"}]}}, "streaming": false, "agent_card": {"name": "DiagnosticTool", "version": "1.0.0"}}' | \
docker exec -i mission-kafka /opt/kafka/bin/kafka-console-producer.sh \
--bootstrap-server localhost:9092 \
--topic a2a-formation-request \
--property "parse.headers=true" \
--property "headers.key.separator==" \
--property "headers.delimiter=|"
ফলাফল বিশ্লেষণ করা
👀 যদি লুপটি সফল হয়, তাহলে টার্মিনাল B এ স্যুইচ করুন। একটি বৃহৎ JSON ব্লক তাৎক্ষণিকভাবে প্রদর্শিত হবে। এটি correlation_id:ping-manual-01 প্রেরিত হেডার দিয়ে শুরু হবে। তারপরে একটি task অবজেক্ট আসবে। আপনি যদি সেই JSON এর মধ্যে parts বিভাগটি ঘনিষ্ঠভাবে দেখেন, তাহলে আপনি আপনার 15টি পডের জন্য গণনা করা raw X এবং Y স্থানাঙ্ক Gemini দেখতে পাবেন:
{"type": "task", "data": {"artifacts": [{"artifactId": "...", "parts": [{"kind": "text", "text": "```json\n[\n {\"x\": 400, \"y\": 150},\n {\"x\": 257, \"y\": 254},\n {\"x\": 312, \"y\": 421},\n ... \n]\n```"}]}], ...}}
আপনি এজেন্টটিকে রিসিভার থেকে সফলভাবে আলাদা করে ফেলেছেন। অনুরোধ-প্রতিক্রিয়া ল্যাটেন্সির "আন্তঃনাক্ষত্রিক শব্দ" আর গুরুত্বপূর্ণ নয় কারণ আমাদের সিস্টেম এখন সম্পূর্ণরূপে ইভেন্ট-চালিত ।
এগিয়ে যাওয়ার আগে, নেটওয়ার্ক পোর্ট খালি করতে ব্যাকগ্রাউন্ড প্রক্রিয়াগুলি বন্ধ করুন।
👉💻 প্রতিটি টার্মিনালে (A, B, এবং C):
- চলমান প্রক্রিয়াটি শেষ করতে
Ctrl + Cটিপুন।
৫. স্যাটেলাইট স্টেশন (A2A কাফকা ক্লায়েন্ট এবং SSE)
এই ধাপে, আমরা স্যাটেলাইট স্টেশন তৈরি করি। এটি কাফকা ক্লাস্টার এবং পাইলটের ভিজ্যুয়াল ডিসপ্লে (রিঅ্যাক্ট ফ্রন্টএন্ড) এর মধ্যে সেতুবন্ধন। এই সার্ভারটি কাফকা ক্লায়েন্ট (এজেন্টের সাথে কথা বলার জন্য) এবং একটি SSE স্ট্রিমার (ব্রাউজারের সাথে কথা বলার জন্য) উভয় হিসাবে কাজ করে।
কাফকা ক্লায়েন্ট কী?
কাফকা ক্লাস্টারকে একটি রেডিও স্টেশন হিসেবে ভাবুন। কাফকা ক্লায়েন্ট হলো রেডিও রিসিভার। KafkaClientTransport আমাদের অ্যাপ্লিকেশনকে নিম্নলিখিত বিষয়গুলি করতে সাহায্য করে:
- একটি বার্তা তৈরি করুন : এজেন্টকে একটি "টাস্ক" (যেমন, "তারকা গঠন") পাঠান।
- উত্তর গ্রহণ করুন : এজেন্টের কাছ থেকে স্থানাঙ্কগুলি ফিরে পেতে একটি নির্দিষ্ট "উত্তর বিষয়" শুনুন।
১. সংযোগ শুরু করা
সার্ভার বুট হওয়ার সাথে সাথে কাফকা সংযোগ শুরু হয় এবং বন্ধ হয়ে গেলে পরিষ্কারভাবে বন্ধ হয়ে যায় তা নিশ্চিত করতে আমরা FastAPI এর lifespan ইভেন্ট হ্যান্ডলার ব্যবহার করি।
👉✏️ $HOME/way-back-home/level_5/satellite/main.py তে, #REPLACE-CONNECT-TO-KAFKA-CLUSTER কে নিম্নলিখিত কোড দিয়ে প্রতিস্থাপন করুন:
@asynccontextmanager
async def lifespan(app: FastAPI):
global kafka_transport
logger.info("Initializing Kafka Client Transport...")
bootstrap_server = os.getenv("KAFKA_BOOTSTRAP_SERVERS")
request_topic = "a2a-formation-request"
reply_topic = "a2a-reply-satellite-dashboard"
# Create AgentCard for the Client
client_card = AgentCard(
name="SatelliteDashboard",
description="Satellite Dashboard Client",
version="1.0.0",
url="https://example.com/satellite-dashboard",
capabilities=AgentCapabilities(),
default_input_modes=["text/plain"],
default_output_modes=["text/plain"],
skills=[]
)
kafka_transport = KafkaClientTransport(
agent_card=client_card,
bootstrap_servers=bootstrap_server,
request_topic=request_topic,
reply_topic=reply_topic,
)
try:
await kafka_transport.start()
logger.info("Kafka Client Transport Started Successfully.")
except Exception as e:
logger.error(f"Failed to start Kafka Client: {e}")
yield
if kafka_transport:
logger.info("Stopping Kafka Client Transport...")
await kafka_transport.stop()
logger.info("Kafka Client Transport Stopped.")
2. একটি কমান্ড পাঠানো
যখন আপনি ড্যাশবোর্ডের একটি বোতামে ক্লিক করেন, তখন /formation এন্ডপয়েন্টটি ট্রিগার হয়। এটি একটি প্রযোজক হিসাবে কাজ করে, আপনার অনুরোধটিকে একটি আনুষ্ঠানিক A2A Message মোড়ানো এবং এজেন্টের কাছে পাঠানো।

মূল যুক্তি:
- অ্যাসিঙ্ক্রোনাস কমিউনিকেশন :
kafka_transport.send_messageঅনুরোধটি পাঠায় এবংreply_topicএ নতুন স্থানাঙ্ক আসার জন্য অপেক্ষা করে। - রেসপন্স পার্সিং : জেমিনি মার্কডাউন ব্লকের ভিতরে স্থানাঙ্ক ফেরত দিতে পারে (যেমন,
json ...)। নিচের কোডটি সেগুলিকে বাদ দেয় এবং স্ট্রিংটিকে পয়েন্টের একটি পাইথন তালিকায় রূপান্তর করে।
👉✏️ $HOME/way-back-home/level_5/satellite/main.py তে, #REPLACE-FORMATION-REQUEST নিম্নলিখিত কোড দিয়ে প্রতিস্থাপন করুন:
@app.post("/formation")
async def set_formation(req: FormationRequest):
global FORMATION, PODS
FORMATION = req.formation
logger.info(f"Received formation request: {FORMATION}")
if not kafka_transport:
logger.error("Kafka Transport is not initialized!")
return {"status": "error", "message": "Backend Not Connected"}
try:
# Construct A2A Message
prompt = f"Create a {FORMATION} formation"
logger.info(f"Sending A2A Message: '{prompt}'")
from a2a.types import TextPart, Part, Role
import uuid
msg_id = str(uuid.uuid4())
message_parts = [Part(TextPart(text=prompt))]
msg_obj = Message(
message_id=msg_id,
role=Role.user,
parts=message_parts
)
message_params = MessageSendParams(
message=msg_obj
)
# Send and Wait for Response
ctx = ClientCallContext()
ctx.state["kafka_timeout"] = 120.0 # Timeout for GenAI latency
response = await kafka_transport.send_message(message_params, context=ctx)
logger.info("Received A2A Response.")
content = None
if isinstance(response, Message):
content = response.parts[0].root.text if response.parts else None
elif isinstance(response, Task):
if response.artifacts and response.artifacts[0].parts:
content = response.artifacts[0].parts[0].root.text
if content:
logger.info(f"Response Content: {content[:100]}...")
try:
clean_content = content.replace("```json", "").replace("```", "").strip()
coords = json.loads(clean_content)
if isinstance(coords, list):
logger.info(f"Parsed {len(coords)} coordinates.")
for i, pod_target in enumerate(coords):
if i < len(PODS):
PODS[i]["x"] = pod_target["x"]
PODS[i]["y"] = pod_target["y"]
return {"status": "success", "formation": FORMATION}
else:
logger.error("Response JSON is not a list.")
except json.JSONDecodeError as e:
logger.error(f"Failed to parse Agent JSON response: {e}")
else:
logger.error(f"Could not extract content from response type {type(response)}")
except Exception as e:
logger.error(f"Error calling agent via Kafka: {e}")
return {"status": "error", "message": str(e)}
সার্ভার-প্রেরিত ইভেন্ট (SSE)
স্ট্যান্ডার্ড API গুলি "Request-Response" মডেল ব্যবহার করে। আমাদের HUD এর জন্য, আমাদের পড পজিশনের একটি "লাইভ স্ট্রিম" প্রয়োজন।
কেন SSE ওয়েবসকেটের (যা দ্বিমুখী এবং আরও জটিল) বিপরীতে, SSE সার্ভার থেকে ব্রাউজারে একটি সহজ, একমুখী ডেটা স্ট্রিম প্রদান করে। এটি ড্যাশবোর্ড, স্টক টিকার বা ইন্টারস্টেলার টেলিমেট্রির জন্য উপযুক্ত।

আমাদের কোডে এটি কীভাবে কাজ করে: আমরা একটি event_generator তৈরি করি, একটি অন্তহীন লুপ যা প্রতি আধ সেকেন্ডে সমস্ত 15টি পডের বর্তমান অবস্থান নেয় এবং আপডেট হিসাবে ব্রাউজারে "পুশ" করে।
👉✏️ $HOME/way-back-home/level_5/satellite/main.py তে, #REPLACE-SSE-STREAM নিম্নলিখিত কোড দিয়ে প্রতিস্থাপন করুন:
@app.get("/stream")
async def message_stream(request: Request):
async def event_generator():
logger.info("New SSE stream connected")
try:
while True:
current_pods = list(PODS)
# Send updates one by one to simulate low-bandwidth scanning
for pod in current_pods:
payload = {"pod": pod}
yield {
"event": "pod_update",
"data": json.dumps(payload)
}
await asyncio.sleep(0.02)
# Send formation info occasionally
yield {
"event": "formation_update",
"data": json.dumps({"formation": FORMATION})
}
# Main loop delay
await asyncio.sleep(0.5)
except asyncio.CancelledError:
logger.info("SSE stream disconnected (cancelled)")
except Exception as e:
logger.error(f"SSE stream error: {e}")
return EventSourceResponse(event_generator())
সম্পূর্ণ মিশন লুপটি কার্যকর করুন
চূড়ান্ত UI চালু করার আগে সিস্টেমটি এন্ড-টু-এন্ড কাজ করছে কিনা তা যাচাই করা যাক। আমরা ম্যানুয়ালি এজেন্টটি ট্রিগার করব এবং তারে কাঁচা ডেটা পেলোড দেখব।

তিনটি পৃথক টার্মিনাল ট্যাব খুলুন।
টার্মিনাল A: গঠন এজেন্ট (A2A সার্ভার)
👉💻 এটি হল ADK এজেন্ট যে কাজগুলি শোনে এবং জ্যামিতিক গণিত সম্পাদন করে।
cd $HOME/way-back-home/level_5
. scripts/check_kafka.sh
# Start the Agent Server
uv run agent/server.py
টার্মিনাল বি: স্যাটেলাইট স্টেশন (কাফকা ক্লায়েন্ট)
👉💻 এই FastAPI সার্ভারটি "রিসিভার" হিসেবে কাজ করে, কাফকার উত্তর শোনে এবং সেগুলিকে একটি লাইভ SSE স্ট্রীমে পরিণত করে।
cd $HOME/way-back-home/level_5
# Start the Satellite Station
uv run satellite/main.py
টার্মিনাল সি: ম্যানুয়াল এইচইউডি
গঠন কমান্ড পাঠান (ট্রিগার): 👉💻 একই টার্মিনাল C-তে, গঠন প্রক্রিয়াটি ট্রিগার করুন:
# Trigger the STAR formation via the Satellite's API
curl -X POST http://localhost:8000/formation \
-H "Content-Type: application/json" \
-d '{"formation": "STAR"}'
👀 তোমার নতুন স্থানাঙ্কগুলো দেখা উচিত।
INFO:satellite.main:Received formation request: STAR INFO:satellite.main:Sending A2A Message: 'Create a STAR formation' INFO:satellite.main:Received A2A Response. INFO:satellite.main:Response Content: ```json ... INFO:satellite.main:Parsed 15 coordinates.
এটি নিশ্চিত করে যে স্যাটেলাইটটি তার অভ্যন্তরীণ পড স্থানাঙ্ক আপডেট করেছে।
👉💻 আমরা প্রথমে লাইভ টেলিমেট্রি স্ট্রিম শুনতে curl ব্যবহার করব এবং তারপর একটি গঠন পরিবর্তন ট্রিগার করব।
# Connect to the live telemetry feed.
# You should see 'pod_update' events ticking by.
curl -N http://localhost:8000/stream
👀 আপনার curl -N কমান্ডের আউটপুট দেখুন। pod_update ইভেন্টগুলিতে x এবং y স্থানাঙ্কগুলি তারা গঠনের নতুন অবস্থানগুলি প্রতিফলিত করতে শুরু করবে।
এগিয়ে যাওয়ার আগে, যোগাযোগ পোর্টগুলি খালি করতে সমস্ত চলমান প্রক্রিয়া বন্ধ করুন।
প্রতিটি টার্মিনালে (A, B, C, এবং ট্রিগার টার্মিনাল): Ctrl + C টিপুন।
৬. উদ্ধারে যান!
আপনি সফলভাবে সিস্টেমটি প্রতিষ্ঠা করেছেন। এখন, মিশনটিকে বাস্তবায়িত করার সময়। আমরা এখন রিঅ্যাক্ট-ভিত্তিক হেড-আপ ডিসপ্লে (HUD) চালু করব। এই ড্যাশবোর্ডটি SSE এর মাধ্যমে স্যাটেলাইট স্টেশনের সাথে সংযুক্ত, যা আপনাকে রিয়েল-টাইমে 15টি পড কল্পনা করতে দেয়।

যখন আপনি একটি কমান্ড জারি করেন, তখন আপনি কেবল একটি ফাংশন কল করছেন না; আপনি এমন একটি ইভেন্ট ট্রিগার করছেন যা কাফকার মধ্য দিয়ে ভ্রমণ করে, একটি AI এজেন্ট দ্বারা প্রক্রিয়া করা হয় এবং লাইভ টেলিমেট্রি হিসাবে আপনার স্ক্রিনে ফিরে আসে।

দুটি পৃথক টার্মিনাল ট্যাব খুলুন।
টার্মিনাল A: গঠন এজেন্ট (A2A সার্ভার)
👉💻 এটি হল ADK এজেন্ট যেটি কাজ শোনে এবং জেমিনি ব্যবহার করে জ্যামিতিক গণিত সম্পাদন করে। টার্মিনাল রানে:
cd $HOME/way-back-home/level_5
# Start the Agent Server
uv run agent/server.py
টার্মিনাল বি: স্যাটেলাইট স্টেশন এবং ভিজ্যুয়াল ড্যাশবোর্ড
👉💻 প্রথমে, ফ্রন্টএন্ড অ্যাপ্লিকেশন তৈরি করুন।
cd $HOME/way-back-home/level_5/frontend/
npm install
npm run build
👉💻 এখন, FastAPI সার্ভারটি শুরু করুন, যা ব্যাকএন্ড লজিক এবং ফ্রন্টএন্ড UI উভয়ই পরিবেশন করবে।
cd $HOME/way-back-home/level_5
. scripts/check_kafka.sh
# Start the Satellite Station
uv run satellite/main.py
চালু করুন এবং যাচাই করুন
- 👉 প্রিভিউ খুলুন : ক্লাউড শেল টুলবারে, ওয়েব প্রিভিউ আইকনে ক্লিক করুন। চেঞ্জ পোর্ট নির্বাচন করুন, এটি 8000 এ সেট করুন, এবং চেঞ্জ অ্যান্ড প্রিভিউ ক্লিক করুন। আপনার Starfield HUD দেখানো একটি নতুন ব্রাউজার ট্যাব খুলবে।

- 👉 টেলিমেট্রি স্ট্রিম যাচাই করুন :
- UI লোড হয়ে গেলে, আপনি এলোমেলোভাবে ১৫টি পড দেখতে পাবেন।
- যদি পডগুলি সূক্ষ্মভাবে স্পন্দিত হয় বা "ঝাঁকুনি দেয়", তাহলে আপনার SSE স্ট্রিম সক্রিয় থাকে এবং স্যাটেলাইট স্টেশন সফলভাবে তাদের অবস্থান সম্প্রচার করছে।

- 👉 একটি গঠন শুরু করুন : ড্যাশবোর্ডে "স্টার" বোতামে ক্লিক করুন।

- 👀 ইভেন্ট লুপ ট্রেস করুন : আপনার টার্মিনালগুলি দেখুন যাতে স্থাপত্যটি কার্যকর হয়:
- টার্মিনাল B (স্যাটেলাইট স্টেশন) লগ করবে:
Sending A2A Message: 'Create a STAR formation'। - টার্মিনাল A (গঠন এজেন্ট) মিথুন রাশির সাথে পরামর্শ করার সময় কার্যকলাপ দেখাবে।
- টার্মিনাল B (স্যাটেলাইট স্টেশন)
Received A2A Responseলগ করবে এবং স্থানাঙ্কগুলি বিশ্লেষণ করবে।
- টার্মিনাল B (স্যাটেলাইট স্টেশন) লগ করবে:
- 👀 ভিজ্যুয়াল কনফার্মেশন : আপনার ড্যাশবোর্ডের ১৫টি পডকে তাদের এলোমেলো অবস্থান থেকে ৫-পয়েন্টেড তারা গঠনে মসৃণভাবে পিছলে যেতে দেখুন।
- 👉 পরীক্ষা :
- ৩টি ভিন্ন গঠনের জন্য, "X" , অথবা "LINE" চেষ্টা করুন।

- কাস্টম ইন্টেন্ট : "হার্ট" বা "ট্রায়াঙ্গেল" এর মতো অনন্য কিছু টাইপ করতে ম্যানুয়াল ইনপুট ব্যবহার করুন।

- যেহেতু আপনি GenAI ব্যবহার করছেন, এজেন্ট আপনার বর্ণনা করা যেকোনো জ্যামিতিক আকৃতির জন্য গণিত গণনা করার চেষ্টা করবে!
- ৩টি ভিন্ন গঠনের জন্য, "X" , অথবা "LINE" চেষ্টা করুন।
৩টি প্যাটার্ন তৈরি করার পর, আপনি সফলভাবে সংযোগটি পুনরায় স্থাপন করেছেন। 
মিশন সম্পন্ন!
শব্দের মধ্য দিয়ে তথ্য কোনও বাধা ছাড়াই প্রবাহিত হওয়ার সাথে সাথে প্রবাহটি স্থিতিশীল হয়। আপনার নির্দেশে, ১৫টি প্রাচীন পড তারা জুড়ে তাদের সমন্বিত নৃত্য শুরু করে।

তিনটি কঠিন ক্যালিব্রেশন পর্যায়ের মধ্য দিয়ে, আপনি টেলিমেট্রি লকটিকে তার জায়গায় স্থাপন করতে দেখেছেন। প্রতিটি সারিবদ্ধকরণের সাথে সাথে, সংকেতটি আরও শক্তিশালী হয়ে ওঠে, অবশেষে আশার আলোর মতো আন্তঃনাক্ষত্রিক হস্তক্ষেপকে ভেদ করে।
আপনার এবং ইভেন্ট-ড্রিভেন এজেন্টের দক্ষতার সাথে বাস্তবায়নের জন্য ধন্যবাদ, পাঁচজন জীবিত ব্যক্তিকে X-42 এর পৃষ্ঠ থেকে বিমানে তুলে আনা হয়েছে এবং এখন উদ্ধারকারী জাহাজে নিরাপদে আছেন। আপনার জন্য ধন্যবাদ, পাঁচটি জীবন রক্ষা করা হয়েছে ।
যদি আপনি লেভেল ০ তে অংশগ্রহণ করে থাকেন, তাহলে "ওয়ে ব্যাক হোম" মিশনে আপনার অগ্রগতি কোথায় তা পরীক্ষা করে দেখতে ভুলবেন না! আপনার তারকাদের দিকে ফিরে যাওয়ার যাত্রা অব্যাহত থাকবে। 