১. মিশন

আপনি এক অনাবিষ্কৃত অঞ্চলের নিস্তব্ধতায় ভেসে চলেছেন। একটি প্রচণ্ড সৌর স্পন্দন আপনার মহাকাশযানটিকে একটি ফাটলের মধ্য দিয়ে ছিঁড়ে ফেলেছে, ফলে আপনি মহাবিশ্বের এমন এক প্রকোষ্ঠে আটকা পড়েছেন যার কোনো নক্ষত্র-মানচিত্রে অস্তিত্ব নেই।
কয়েক দিনের ক্লান্তিকর মেরামতের পর, অবশেষে আপনি আপনার পায়ের নিচে ইঞ্জিনের গুঞ্জন অনুভব করছেন। আপনার রকেটশিপটি ঠিক হয়ে গেছে। এমনকি আপনি মাদারশিপের সাথে একটি দূরপাল্লার সংযোগও স্থাপন করতে পেরেছেন। আপনি যাত্রার জন্য প্রস্তুত। আপনি বাড়ি ফেরার জন্য তৈরি।
কিন্তু আপনি যখন জাম্প ড্রাইভ চালু করার প্রস্তুতি নিচ্ছেন, ঠিক তখনই স্ট্যাটিক ভেদ করে একটি বিপদ সংকেত ভেসে আসে। আপনার সেন্সরগুলো একটি সাহায্যের সংকেত শনাক্ত করে। এক্স-৪২ গ্রহের পৃষ্ঠে পাঁচজন বেসামরিক নাগরিক আটকা পড়েছেন। তাদের পালানোর একমাত্র আশা ১৫টি প্রাচীন পডের উপর নির্ভর করছে, যেগুলোকে কক্ষপথে থাকা তাদের মাদারশিপে একটি বিপদ সংকেত পাঠানোর জন্য সমন্বিত করতে হবে।
তবে, পডগুলো একটি স্যাটেলাইট স্টেশন দ্বারা নিয়ন্ত্রিত হয়, যার প্রধান নেভিগেশন কম্পিউটারটি ক্ষতিগ্রস্ত। পডগুলো উদ্দেশ্যহীনভাবে ভেসে বেড়াচ্ছে। আমরা স্যাটেলাইটটির সাথে একটি ব্যাকডোর সংযোগ স্থাপন করতে সক্ষম হয়েছি, কিন্তু আপলিঙ্কটি তীব্র আন্তঃনাক্ষত্রিক প্রতিবন্ধকতার কারণে জর্জরিত, যা অনুরোধ-প্রতিক্রিয়া চক্রে ব্যাপক বিলম্ব ঘটাচ্ছে।
চ্যালেঞ্জ
যেহেতু রিকোয়েস্ট/রেসপন্স মডেল অত্যন্ত ধীরগতির, তাই কোলাহলের মধ্য দিয়ে টেলিমেট্রি প্রবাহ নিশ্চিত করতে আমাদের সার্ভার-সেন্ট ইভেন্টস (SSE) সহ একটি ইভেন্ট-ড্রাইভেন আর্কিটেকচার (EDA) স্থাপন করতে হবে।

আপনাকে একটি কাস্টম এজেন্ট তৈরি করতে হবে যা পডগুলোকে নির্দিষ্ট সিগন্যাল-বর্ধক বিন্যাসে (বৃত্ত, তারকা, রেখা) আনতে প্রয়োজনীয় জটিল ভেক্টর গণিত গণনা করতে পারে। আপনাকে অবশ্যই এই এজেন্টটিকে স্যাটেলাইটের নতুন আর্কিটেকচারের সাথে সংযুক্ত করতে হবে।
আপনি যা তৈরি করবেন

- রিয়েল-টাইমে ১৫টি পডের একটি ফ্লিটকে দেখা ও নিয়ন্ত্রণ করার জন্য একটি রিয়্যাক্ট-ভিত্তিক হেডস-আপ ডিসপ্লে (HUD)।
- গুগল এজেন্ট ডেভেলপমেন্ট কিট (ADK) ব্যবহার করে তৈরি একটি জেনারেটিভ এআই এজেন্ট , যা স্বাভাবিক ভাষার কমান্ডের উপর ভিত্তি করে পডগুলোর জন্য জটিল জ্যামিতিক বিন্যাস গণনা করে।
- একটি পাইথন-ভিত্তিক স্যাটেলাইট স্টেশন ব্যাকএন্ড যা কেন্দ্রীয় হাব হিসেবে কাজ করে এবং সার্ভার-সেন্ট ইভেন্টস (SSE)-এর মাধ্যমে ফ্রন্টএন্ডের সাথে যোগাযোগ করে।
- অ্যাপাচি কাফকা ব্যবহার করে একটি ইভেন্ট-ড্রাইভেন আর্কিটেকচার, যা এআই এজেন্টকে স্যাটেলাইট কন্ট্রোল সিস্টেম থেকে বিচ্ছিন্ন করে স্থিতিস্থাপক এবং অ্যাসিঙ্ক্রোনাস যোগাযোগ সক্ষম করে।
আপনি যা শিখবেন
প্রযুক্তি / ধারণা | বর্ণনা |
গুগল এডিকে (এজেন্ট ডেভেলপমেন্ট কিট) | আপনি এই ফ্রেমওয়ার্কটি ব্যবহার করে জেমিনি মডেল দ্বারা চালিত একটি বিশেষায়িত এআই এজেন্ট তৈরি, পরীক্ষা এবং কাঠামোবদ্ধ করবেন। |
ইভেন্ট-চালিত স্থাপত্য (ইডিএ) | আপনি একটি ডিকাপলড সিস্টেম তৈরির মূলনীতি শিখবেন, যেখানে কম্পোনেন্টগুলো ইভেন্টের মাধ্যমে অ্যাসিঙ্ক্রোনাসভাবে যোগাযোগ করে, যা অ্যাপ্লিকেশনটিকে আরও স্থিতিস্থাপক এবং স্কেলেবল করে তোলে। |
অ্যাপাচি কাফকা | আপনি বিভিন্ন মাইক্রোসার্ভিসের মধ্যে কমান্ড এবং ডেটার প্রবাহ পরিচালনা করার জন্য একটি ডিস্ট্রিবিউটেড ইভেন্ট স্ট্রিমিং প্ল্যাটফর্ম হিসেবে কাফকা সেট আপ ও ব্যবহার করবেন। |
সার্ভার-প্রেরিত ইভেন্ট (SSE) | আপনি একটি FastAPI ব্যাকএন্ডে SSE প্রয়োগ করবেন, যা সার্ভার থেকে রিয়েল-টাইম টেলিমেট্রি ডেটা React ফ্রন্টএন্ডে পাঠাবে এবং UI-কে ক্রমাগত আপডেট রাখবে। |
এ২এ (এজেন্ট-টু-এজেন্ট) প্রোটোকল | আপনি শিখবেন কীভাবে আপনার এজেন্টকে একটি A2A সার্ভারে আবদ্ধ করতে হয়, যা একটি বৃহত্তর এজেন্টিক ইকোসিস্টেমের মধ্যে প্রমিত যোগাযোগ এবং আন্তঃকার্যক্ষমতা সক্ষম করে। |
ফাস্টএপিআই | আপনি এই উচ্চ-ক্ষমতাসম্পন্ন পাইথন ওয়েব ফ্রেমওয়ার্কটি ব্যবহার করে মূল ব্যাকএন্ড সার্ভিস, স্যাটেলাইট স্টেশন, তৈরি করবেন। |
প্রতিক্রিয়া | আপনি একটি আধুনিক ফ্রন্টএন্ড অ্যাপ্লিকেশন নিয়ে কাজ করবেন, যা একটি ডায়নামিক ও ইন্টারেক্টিভ ইউজার ইন্টারফেস তৈরি করার জন্য একটি SSE স্ট্রিমে সাবস্ক্রাইব করে। |
সিস্টেম নিয়ন্ত্রণে জেনারেটিভ এআই | আপনি দেখতে পাবেন কিভাবে একটি বৃহৎ ভাষা মডেলকে (LLM) শুধুমাত্র কথোপকথনমূলক আলাপচারিতার পরিবর্তে নির্দিষ্ট, তথ্য-ভিত্তিক কাজ (যেমন স্থানাঙ্ক তৈরি) সম্পাদন করার জন্য নির্দেশ দেওয়া যেতে পারে। |
২. আপনার পরিবেশ প্রস্তুত করুন
ক্লাউড শেল অ্যাক্সেস করুন
👉Google Cloud কনসোলের উপরে থাকা Activate Cloud Shell-এ ক্লিক করুন (এটি Cloud Shell পেনের উপরে থাকা টার্মিনাল আকৃতির আইকন), 
👉'ওপেন এডিটর' বোতামটিতে (এটি দেখতে পেন্সিলসহ একটি খোলা ফোল্ডারের মতো) ক্লিক করুন। এটি উইন্ডোতে ক্লাউড শেল কোড এডিটর খুলে দেবে। আপনি বাম দিকে একটি ফাইল এক্সপ্লোরার দেখতে পাবেন। 
👉ক্লাউড IDE-তে টার্মিনালটি খুলুন,

👉💻 টার্মিনালে, নিম্নলিখিত কমান্ডটি ব্যবহার করে যাচাই করুন যে আপনি ইতিমধ্যেই প্রমাণীকৃত এবং প্রজেক্টটি আপনার প্রজেক্ট আইডিতে সেট করা আছে:
gcloud auth list
আপনার অ্যাকাউন্টটি (ACTIVE) হিসেবে তালিকাভুক্ত দেখতে পাবেন।
পূর্বশর্ত
ℹ️ লেভেল ০ ঐচ্ছিক (তবে সুপারিশকৃত)
আপনি লেভেল ০ ছাড়াও এই মিশনটি সম্পন্ন করতে পারেন, কিন্তু প্রথমে এটি শেষ করলে আরও বেশি বাস্তবসম্মত অভিজ্ঞতা পাওয়া যায়, যার ফলে আপনি অগ্রগতির সাথে সাথে গ্লোবাল ম্যাপে আপনার বীকনটি জ্বলতে দেখতে পাবেন।
প্রকল্পের পরিবেশ সেটআপ করুন
আপনার টার্মিনালে ফিরে এসে, সক্রিয় প্রজেক্ট সেট করে এবং প্রয়োজনীয় গুগল ক্লাউড পরিষেবাগুলি (ক্লাউড রান, ভার্টেক্স এআই, ইত্যাদি) সক্রিয় করে কনফিগারেশনটি চূড়ান্ত করুন।
👉💻 আপনার টার্মিনালে প্রজেক্ট আইডি সেট করুন:
gcloud config set project $(cat ~/project_id.txt) --quiet
👉💻 প্রয়োজনীয় পরিষেবাগুলি সক্রিয় করুন:
gcloud services enable compute.googleapis.com \
artifactregistry.googleapis.com \
run.googleapis.com \
cloudbuild.googleapis.com \
iam.googleapis.com \
aiplatform.googleapis.com \
cloudresourcemanager.googleapis.com
নির্ভরতা ইনস্টল করুন
👉💻 লেভেল ৫-এ যান এবং প্রয়োজনীয় পাইথন প্যাকেজগুলো ইনস্টল করুন:
cd $HOME/way-back-home/level_5
uv sync
মূল নির্ভরশীলতাগুলো হলো:
প্যাকেজ | উদ্দেশ্য |
| স্যাটেলাইট স্টেশন এবং এসএসই স্ট্রিমিংয়ের জন্য উচ্চ-কর্মক্ষমতাসম্পন্ন ওয়েব ফ্রেমওয়ার্ক |
| FastAPI অ্যাপ্লিকেশনটি চালানোর জন্য ASGI সার্ভার প্রয়োজন। |
| ফর্মেশন এজেন্ট তৈরি করতে ব্যবহৃত এজেন্ট ডেভেলপমেন্ট কিট |
| মানসম্মত যোগাযোগের জন্য এজেন্ট-টু-এজেন্ট প্রোটোকল লাইব্রেরি |
| ইভেন্ট লুপের জন্য অ্যাসিঙ্ক্রোনাস কাফকা ক্লায়েন্ট |
| জেমিনি মডেল অ্যাক্সেস করার জন্য নেটিভ ক্লায়েন্ট |
| সিমুলেশনের জন্য ভেক্টর গণিত এবং স্থানাঙ্ক গণনা |
| রিয়েল-টাইম দ্বিমুখী যোগাযোগের জন্য সমর্থন |
| পরিবেশ ভেরিয়েবল এবং কনফিগারেশন গোপনীয়তা পরিচালনা করে |
| সার্ভার-প্রেরিত ইভেন্ট (SSE) এর কার্যকর পরিচালনা |
| বাহ্যিক এপিআই কলের জন্য সহজ HTTP লাইব্রেরি |
সেটআপ যাচাই করুন
কোড শুরু করার আগে, চলুন নিশ্চিত করে নিই যে সবকিছু ঠিকঠাক আছে। আপনার গুগল ক্লাউড প্রজেক্ট, এপিআই (API), এবং পাইথন ডিপেন্ডেন্সিগুলো নিরীক্ষা করতে ভেরিফিকেশন স্ক্রিপ্টটি চালান।
👉💻 ভেরিফিকেশন স্ক্রিপ্টটি চালান:
cd $HOME/way-back-home/level_5/scripts
chmod +x verify_setup.sh
. verify_setup.sh
👀 আপনি পরপর কয়েকটি সবুজ টিক চিহ্ন (✅) দেখতে পাবেন।
- যদি আপনি লাল ক্রস (❌) চিহ্ন দেখতে পান, তাহলে আউটপুটে দেওয়া সমাধান কমান্ডগুলো অনুসরণ করুন (যেমন,
gcloud services enable ...অথবাpip install ...)। - দ্রষ্টব্য:
.envজন্য একটি হলুদ সতর্কবার্তা আপাতত গ্রহণযোগ্য; আমরা পরবর্তী ধাপে ফাইলটি তৈরি করব।
🚀 Verifying Mission Charlie (Level 5) Infrastructure... ✅ Google Cloud Project: xxxxxx ✅ Cloud APIs: Active ✅ Python Environment: Ready 🎉 SYSTEMS ONLINE. READY FOR MISSION.
৩. এলএলএম ব্যবহার করে পড পজিশন বিন্যাস করা
আমাদের উদ্ধার অভিযানের 'মস্তিষ্ক' তৈরি করতে হবে। এটি হবে গুগল এডিকে (এজেন্ট ডেভেলপমেন্ট কিট) ব্যবহার করে তৈরি একটি এজেন্ট। এর একমাত্র উদ্দেশ্য হলো একটি বিশেষায়িত জ্যামিতিক নেভিগেটর হিসেবে কাজ করা। সাধারণ এলএলএম-রা যেখানে কথা বলতে ভালোবাসে, সেখানে গভীর মহাকাশে আমাদের সংলাপ নয়, ডেটা প্রয়োজন। আমরা এই এজেন্টটিকে এমনভাবে প্রোগ্রাম করব যাতে এটি 'Star'-এর মতো একটি কমান্ড গ্রহণ করে আমাদের ১৫টি পডের জন্য সরাসরি JSON স্থানাঙ্ক ফেরত দেয়।

এজেন্টকে কাঠামোবদ্ধ করুন
👉💻 আপনার এজেন্ট ডিরেক্টরিতে যেতে এবং ADK তৈরির উইজার্ড শুরু করতে নিম্নলিখিত কমান্ডগুলি চালান:
cd $HOME/way-back-home/level_5/agent
uv run adk create formation
CLI একটি ইন্টারেক্টিভ সেটআপ উইজার্ড চালু করবে। আপনার এজেন্ট কনফিগার করতে নিম্নলিখিত প্রতিক্রিয়াগুলি ব্যবহার করুন:
- একটি মডেল বেছে নিন : অপশন ১ (জেমিনি ফ্ল্যাশ) নির্বাচন করুন।
- দ্রষ্টব্য: নির্দিষ্ট সংস্করণটি ভিন্ন হতে পারে। দ্রুততার জন্য সর্বদা 'ফ্ল্যাশ' সংস্করণটি বেছে নিন।
- একটি ব্যাকএন্ড বেছে নিন : অপশন ২ (ভার্টেক্স এআই) নির্বাচন করুন।
- গুগল ক্লাউড প্রজেক্ট আইডি লিখুন : ডিফল্টটি (আপনার পরিবেশ থেকে শনাক্তকৃত) গ্রহণ করতে এন্টার চাপুন।
- গুগল ক্লাউড অঞ্চল লিখুন : ডিফল্ট (
us-central1) গ্রহণ করতে এন্টার চাপুন।
👀 আপনার টার্মিনাল ইন্টারঅ্যাকশনটি দেখতে এইরকম হওয়া উচিত:
(way-back-home) user@cloudshell:~/way-back-home/level_5/agent$ adk create formation Choose a model for the root agent: 1. gemini-2.5-flash 2. Other models (fill later) Choose model (1, 2): 1 1. Google AI 2. Vertex AI Choose a backend (1, 2): 2 You need an existing Google Cloud account and project... Enter Google Cloud project ID [your-project-id]: <PRESS ENTER> Enter Google Cloud region [us-central1]: <PRESS ENTER> Agent created in /home/user/way-back-home/level_5/agent/formation: - .env - __init__.py - agent.py
আপনি একটি Agent created সফলতার বার্তা' দেখতে পাবেন। এটি সেই কাঠামো কোডটি তৈরি করে যা আমরা এখন পরিবর্তন করব।
👉✏️ আপনার এডিটরে নতুন তৈরি করা $HOME/way-back-home/level_5/agent/formation/agent.py ফাইলটি খুলুন। ফাইলটির সম্পূর্ণ বিষয়বস্তু নিচের কোড দিয়ে প্রতিস্থাপন করুন। এটি এজেন্টের নাম আপডেট করে এবং এর কঠোর অপারেশনাল প্যারামিটার প্রদান করে।
import os
from google.adk.agents import Agent
root_agent = Agent(
name="formation_agent",
model="gemini-2.5-flash",
instruction="""
You are the **Formation Controller AI**.
Your strict objective is to calculate X,Y coordinates for a fleet of **15 Drones** based on a requested geometric shape.
### FIELD SPECIFICATIONS
- **Canvas Size**: 800px (width) x 600px (height).
- **Safe Margin**: Keep pods at least 50px away from edges (x: 50-750, y: 50-550).
- **Center Point**: x=400, y=300 (Use this as the origin for shapes).
- **Top Menu Avoidance**: Do NOT place pods in the top 100px (y < 100) to avoid UI overlap.
### FORMATION RULES
When given a formation name, output coordinates for exactly 15 pods (IDs 0-14).
1. **CIRCLE**: Evenly spaced around a center point (R=200).
2. **STAR**: 5 points or a star-like distribution.
3. **X**: A large X crossing the screen.
4. **LINE**: A horizontal line across the middle.
5. **PARABOLA**: A U-shape opening UPWARDS. Center it at y=400, opening up to y=100. IMPORTANT: Lowest point must be at bottom (high Y value), opening up (low Y value). Screen coordinates have (0,0) at the TOP-LEFT. The vertex should be at the BOTTOM (e.g., y=500), with arms reaching up to y=200.
6. **RANDOM**: Scatter randomly within safe bounds.
7. **CUSTOM**: If the user inputs something else (e.g., "SMILEY", "TRIANGLE"), do your best to approximate it geometrically.
### OUTPUT FORMAT
You MUST output **ONLY VALID JSON**. No markdown fencing, no preamble, no commentary.
Refuse to answer non-formation questions.
**JSON Structure**:
```json
[
{"x": 400, "y": 300},
{"x": 420, "y": 300},
... (15 total items)
]
```
"""
)
- জ্যামিতিক নির্ভুলতা : সিস্টেম প্রম্পটে 'ক্যানভাস সাইজ' এবং 'সেফ মার্জিন' নির্ধারণ করার মাধ্যমে আমরা নিশ্চিত করি যে এজেন্ট যেন পডগুলোকে স্ক্রিনের বাইরে বা UI এলিমেন্টের নিচে স্থাপন না করে।
- JSON প্রয়োগ : LLM-কে "গঠনমূলক নয় এমন প্রশ্নের উত্তর দিতে অস্বীকার করতে" এবং "কোনো ভূমিকা না রাখতে" বলার মাধ্যমে, আমরা নিশ্চিত করি যে আমাদের পরবর্তী কোড (স্যাটেলাইট) প্রতিক্রিয়াটি পার্স করার চেষ্টা করার সময় ক্র্যাশ করবে না।
- বিচ্ছিন্ন যুক্তিবিদ্যা : এই এজেন্টটি এখনও কাফকা সম্পর্কে জানে না। এটি শুধু অঙ্ক করতে জানে। পরবর্তী ধাপে, আমরা এই "মস্তিষ্ক"-টিকে একটি কাফকা সার্ভারে আবৃত করব।
এজেন্টকে স্থানীয়ভাবে পরীক্ষা করুন
এজেন্টকে কাফকা 'স্নায়ুতন্ত্র'-এর সাথে সংযুক্ত করার আগে, আমাদের অবশ্যই নিশ্চিত করতে হবে যে এটি সঠিকভাবে কাজ করছে। এটি বৈধ JSON স্থানাঙ্ক তৈরি করছে কিনা তা যাচাই করার জন্য আপনি সরাসরি টার্মিনালে আপনার এজেন্টের সাথে যোগাযোগ করতে পারেন।
👉💻 আপনার এজেন্টের সাথে একটি চ্যাট সেশন শুরু করতে adk run কমান্ডটি ব্যবহার করুন।
cd $HOME/way-back-home/level_5/agent
uv run adk run formation
- ইনপুট :
Circleটাইপ করুন এবং এন্টার চাপুন।- সফলতার মানদণ্ড : আপনি একটি সরাসরি JSON তালিকা দেখতে পাবেন (যেমন,
[{"x": 400, "y": 200}, ...])। নিশ্চিত করুন যে JSON-এর আগে "Here are the coordinates:"-এর মতো কোনো মার্কডাউন টেক্সট নেই।
- সফলতার মানদণ্ড : আপনি একটি সরাসরি JSON তালিকা দেখতে পাবেন (যেমন,
- ইনপুট :
Lineটাইপ করুন এবং এন্টার চাপুন।- সফলতার মানদণ্ড : স্থানাঙ্কগুলো যাচাই করে একটি অনুভূমিক রেখা তৈরি করুন (y-মানগুলো সদৃশ হতে হবে)।
এজেন্টটি ত্রুটিমুক্ত JSON আউটপুট দিচ্ছে তা নিশ্চিত করার পর, আপনি এটিকে কাফকা সার্ভারে যুক্ত করার জন্য প্রস্তুত।
👉💻 বের হতে Ctrl+C চাপুন।
৪. ফরমেশন এজেন্টের জন্য একটি A2A সার্ভার তৈরি করা
এ২এ (এজেন্ট-টু-এজেন্ট) বোঝা
এ২এ (এজেন্ট-টু-এজেন্ট) প্রোটোকল হলো একটি উন্মুক্ত মান, যা এআই এজেন্টদের মধ্যে নির্বিঘ্ন আন্তঃকার্যক্ষমতা সক্ষম করার জন্য ডিজাইন করা হয়েছে। এই কাঠামোটি এজেন্টদেরকে সাধারণ টেক্সট আদান-প্রদানের বাইরে যেতে সক্ষম করে, যার ফলে তারা কাজ অর্পণ করতে, জটিল কর্মকাণ্ড সমন্বয় করতে এবং একটি বিকেন্দ্রীভূত ইকোসিস্টেমে অভিন্ন লক্ষ্য অর্জনের জন্য একটি সংহত ইউনিট হিসেবে কাজ করতে পারে।

A2A ট্রান্সপোর্ট বোঝা: HTTP, gRPC, এবং Kafka
A2A প্রোটোকল ক্লায়েন্ট এবং এজেন্টদের যোগাযোগের জন্য দুটি স্বতন্ত্র উপায় প্রদান করে, যার প্রতিটি ভিন্ন ভিন্ন আর্কিটেকচারাল চাহিদা পূরণ করে। HTTP (JSON-RPC) হলো ডিফল্ট, সর্বব্যাপী স্ট্যান্ডার্ড যা সমস্ত ওয়েব পরিবেশে সার্বজনীনভাবে কাজ করে। gRPC হলো আমাদের উচ্চ-পারফরম্যান্সের বিকল্প, যা দক্ষ এবং কঠোরভাবে টাইপযুক্ত যোগাযোগের জন্য প্রোটোকল বাফার ব্যবহার করে। এবং ল্যাবে, আমি একটি কাফকা ট্রান্সপোর্টও সরবরাহ করি। এটি একটি কাস্টম ইমপ্লিমেন্টেশন যা শক্তিশালী, ইভেন্ট-চালিত আর্কিটেকচারের জন্য ডিজাইন করা হয়েছে, যেখানে সিস্টেমগুলোকে বিচ্ছিন্ন রাখা একটি অগ্রাধিকার।

অভ্যন্তরীণভাবে, এই ট্রান্সপোর্টগুলো ডেটার প্রবাহকে বেশ ভিন্নভাবে পরিচালনা করে। HTTP মডেলে, ক্লায়েন্ট একটি JSON অনুরোধ পাঠায় এবং সংযোগটি খোলা রাখে, এজেন্টের কাজ শেষ করে একবারে সম্পূর্ণ ফলাফল ফেরত দেওয়ার জন্য অপেক্ষা করে। gRPC বাইনারি ডেটা এবং HTTP/2 ব্যবহার করে এটিকে অপ্টিমাইজ করে, যা সাধারণ অনুরোধ-প্রতিক্রিয়া চক্র এবং রিয়েল-টাইম স্ট্রিমিং উভয়কেই সম্ভব করে তোলে, যেখানে এজেন্ট আপডেটগুলো (যেমন "চিন্তা" বা "আর্টিফ্যাক্ট তৈরি") ঘটার সাথে সাথেই পাঠিয়ে দেয়। কাফকা ইমপ্লিমেন্টেশনটি অ্যাসিঙ্ক্রোনাসভাবে কাজ করে: ক্লায়েন্ট একটি অত্যন্ত টেকসই "রিকোয়েস্ট টপিক"-এ একটি অনুরোধ প্রকাশ করে এবং একটি পৃথক "রিপ্লাই টপিক"-এ শোনে। সার্ভার যখন পারে তখন বার্তাটি গ্রহণ করে, সেটিকে প্রসেস করে এবং ফলাফলটি ফেরত পাঠায়, যার অর্থ হলো তারা কখনোই সরাসরি একে অপরের সাথে কথা বলে না।
আপনার গতি, জটিলতা এবং স্থায়িত্বের নির্দিষ্ট চাহিদার উপরই পছন্দটি নির্ভর করে। HTTP দিয়ে কাজ শুরু করা এবং ডিবাগ করা সবচেয়ে সহজ, যা এটিকে সাধারণ ইন্টিগ্রেশনের জন্য উপযুক্ত করে তোলে। অভ্যন্তরীণ সার্ভিস-টু-সার্ভিস যোগাযোগের জন্য gRPC একটি উন্নততর বিকল্প, যেখানে কম ল্যাটেন্সি এবং স্ট্রিমিং টাস্ক আপডেট অত্যন্ত গুরুত্বপূর্ণ। তবে, কাফকা একটি স্থিতিস্থাপক বিকল্প হিসেবে স্বতন্ত্র, কারণ এটি ডিস্কে একটি কিউ-তে অনুরোধগুলো সংরক্ষণ করে, ফলে এজেন্ট সার্ভার ক্র্যাশ বা রিস্টার্ট হলেও আপনার টাস্কগুলো টিকে থাকে। এটি এমন এক ধরনের স্থায়িত্ব এবং ডিকাপলিং প্রদান করে যা HTTP বা gRPC কেউই দিতে পারে না।
কাস্টম পরিবহন স্তর: কাফকা
কাফকা একটি অ্যাসিঙ্ক্রোনাস ব্যাকবোন হিসেবে কাজ করে, যা অপারেশনের মস্তিষ্ককে (ফরমেশন এজেন্ট) ভৌত নিয়ন্ত্রণ ব্যবস্থা (স্যাটেলাইট স্টেশন) থেকে বিচ্ছিন্ন রাখে। এজেন্ট যখন জটিল ভেক্টর গণনা করে, তখন সিস্টেমকে একটি সিঙ্ক্রোনাস সংযোগের জন্য অপেক্ষা করতে বাধ্য করার পরিবর্তে, এজেন্ট তার ফলাফল একটি কাফকা টপিকে ইভেন্ট হিসেবে প্রকাশ করে। এটি একটি স্থায়ী বাফার হিসেবে কাজ করে, যা স্যাটেলাইটকে তার নিজস্ব গতিতে নির্দেশাবলী গ্রহণ করার সুযোগ দেয় এবং নিশ্চিত করে যে উল্লেখযোগ্য নেটওয়ার্ক ল্যাটেন্সি বা সাময়িক সিস্টেম ক্র্যাশের কারণেও ফরমেশন ডেটা কখনও হারিয়ে না যায়।
কাফকা ব্যবহার করে, আপনি একটি ধীর, রৈখিক প্রক্রিয়াকে একটি স্থিতিস্থাপক, স্ট্রিমিং পাইপলাইনে রূপান্তরিত করেন যেখানে নির্দেশাবলী এবং টেলিমেট্রি স্বাধীনভাবে প্রবাহিত হয়, যা তীব্র এআই প্রক্রিয়াকরণের সময়েও মিশনের HUD-কে প্রতিক্রিয়াশীল রাখে।

কাফকা কে?
কাফকা একটি ডিস্ট্রিবিউটেড ইভেন্ট-স্ট্রিমিং প্ল্যাটফর্ম। ইভেন্ট-ড্রাইভেন আর্কিটেকচারে (EDA):
- প্রযোজকরা 'টপিক'-এ বার্তা প্রকাশ করেন।
- গ্রাহকরা সেই বিষয়গুলোতে সাবস্ক্রাইব করেন এবং কোনো বার্তা এলে প্রতিক্রিয়া জানান।
কাফকা কেন ব্যবহার করবেন?
এটি আপনার সিস্টেমগুলোকে বিচ্ছিন্ন করে। ফরমেশন এজেন্ট স্বয়ংক্রিয়ভাবে কাজ করে এবং প্রেরকের পরিচয় বা অবস্থা না জেনেই আগত অনুরোধের জন্য অপেক্ষা করে। এটি দায়িত্বের বিচ্ছেদ ঘটায়, যা নিশ্চিত করে যে স্যাটেলাইট অফলাইন হয়ে গেলেও কার্যপ্রবাহ অক্ষুণ্ণ থাকে; স্যাটেলাইট পুনরায় সংযোগ স্থাপন না করা পর্যন্ত কাফকা কেবল বার্তাগুলো সংরক্ষণ করে রাখে।
গুগল ক্লাউড পাব/সাব সম্পর্কে কী বলবেন?
আপনি এর জন্য অবশ্যই গুগল ক্লাউড পাব/সাব ব্যবহার করতে পারেন! পাব/সাব হলো গুগলের সার্ভারবিহীন মেসেজিং পরিষেবা। যদিও কাফকা উচ্চ-থ্রুপুট এবং "রিপ্লেয়েবল" স্ট্রিমের জন্য দুর্দান্ত, তবে ব্যবহারের সহজতার কারণে পাব/সাবকেই প্রায়শই বেশি পছন্দ করা হয়। এই ল্যাবের জন্য, আমরা একটি শক্তিশালী, স্থায়ী মেসেজ বাস অনুকরণ করতে কাফকা ব্যবহার করছি।
স্থানীয় কাফকা ক্লাস্টার শুরু করুন
নিচের সম্পূর্ণ কমান্ড ব্লকটি কপি করে আপনার টার্মিনালে পেস্ট করুন। এটি অফিসিয়াল কাফকা ইমেজটি ডাউনলোড করবে এবং ব্যাকগ্রাউন্ডে চালু করে দেবে।
👉💻 আপনার টার্মিনালে এই কমান্ডগুলো চালান:
# Navigate to the correct mission directory first
cd $HOME/way-back-home/level_5
# Run the Kafka container in detached mode
docker run -d \
--name mission-kafka \
-p 9092:9092 \
-e KAFKA_PROCESS_ROLES='broker,controller' \
-e KAFKA_NODE_ID=1 \
-e KAFKA_CONTROLLER_LISTENER_NAMES=CONTROLLER \
-e KAFKA_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093 \
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092 \
-e KAFKA_CONTROLLER_QUORUM_VOTERS=1@127.0.0.1:9093 \
-e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \
apache/kafka:4.2.0-rc1
👉💻 docker ps কমান্ড ব্যবহার করে কন্টেইনারটি চলছে কিনা তা যাচাই করুন।
docker ps
👀 আপনি এমন একটি আউটপুট দেখতে পাবেন যা নিশ্চিত করবে যে mission-kafka কন্টেইনারটি চলছে এবং 9092 পোর্টটি উন্মুক্ত হয়েছে।
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES c1a2b3c4d5e6 apache/kafka:4.2.0-rc1 "/opt/kafka/bin/kafka..." 15 seconds ago Up 14 seconds 0.0.0.0:9092->9092/tcp, 9093/tcp mission-kafka
কাফকা টপিক বলতে কী বোঝায়?
একটি কাফকা টপিককে মেসেজের জন্য একটি নির্দিষ্ট চ্যানেল বা বিভাগ হিসেবে ভাবুন। এটি একটি লগবুকের মতো, যেখানে ঘটনার রেকর্ডগুলো যে ক্রমে তৈরি হয়েছে সেই ক্রমে সংরক্ষিত থাকে। প্রডিউসাররা নির্দিষ্ট টপিকে মেসেজ লেখে এবং কনজিউমাররা সেই টপিকগুলো থেকে পড়ে। এটি প্রেরক এবং প্রাপকের মধ্যে সংযোগ বিচ্ছিন্ন করে; প্রডিউসারকে জানতে হয় না কোন কনজিউমার ডেটাটি পড়বে, তাকে শুধু সঠিক "চ্যানেলে" এটি পাঠাতে হয়। আমাদের এই মিশনে, আমরা দুটি টপিক তৈরি করব: একটি এজেন্টের কাছে ফরমেশন রিকোয়েস্ট পাঠানোর জন্য, এবং অন্যটি এজেন্টের পক্ষ থেকে স্যাটেলাইটের পড়ার জন্য তার উত্তরগুলো প্রকাশ করার জন্য।

👉💻 চলমান ডকার কন্টেইনারের ভিতরে প্রয়োজনীয় টপিকগুলো তৈরি করতে নিম্নলিখিত কমান্ডগুলো চালান।
# Create the topic for formation requests
docker exec mission-kafka /opt/kafka/bin/kafka-topics.sh \
--create \
--topic a2a-formation-request \
--bootstrap-server 127.0.0.1:9092
# Create the topic where the satellite dashboard will listen for replies
docker exec mission-kafka /opt/kafka/bin/kafka-topics.sh \
--create \
--topic a2a-reply-satellite-dashboard \
--bootstrap-server 127.0.0.1:9092
👉💻 আপনার চ্যানেলগুলো খোলা আছে কিনা তা নিশ্চিত করতে, list কমান্ডটি চালান:
docker exec mission-kafka /opt/kafka/bin/kafka-topics.sh \
--list \
--bootstrap-server 127.0.0.1:9092
👀 আপনি এইমাত্র যে টপিকগুলো তৈরি করেছেন সেগুলোর নাম দেখতে পাবেন।
a2a-formation-request a2a-reply-satellite-dashboard
আপনার কাফকা ইনস্ট্যান্সটি এখন সম্পূর্ণরূপে কনফিগার করা হয়েছে এবং অত্যন্ত গুরুত্বপূর্ণ ডেটা রাউটিং করার জন্য প্রস্তুত।
কাফকা A2A সার্ভার বাস্তবায়ন
এজেন্ট-টু-এজেন্ট (A2A) প্রোটোকল স্বাধীন এজেন্টিক সিস্টেমগুলোর মধ্যে আন্তঃকার্যক্ষমতার জন্য একটি প্রমিত কাঠামো স্থাপন করে। এটি বিভিন্ন দল দ্বারা তৈরি বা ভিন্ন ভিন্ন অবকাঠামোতে চলমান এজেন্টদের প্রতিটি সংযোগের জন্য বিশেষ ইন্টিগ্রেশন লজিকের প্রয়োজন ছাড়াই একে অপরকে খুঁজে পেতে এবং কার্যকরভাবে সহযোগিতা করতে সক্ষম করে।
রেফারেন্স ইমপ্লিমেন্টেশন, a2a-python , এই এজেন্টিক অ্যাপ্লিকেশনগুলো চালানোর জন্য একটি ভিত্তিগত লাইব্রেরি। এর ডিজাইনের একটি মূল বৈশিষ্ট্য হলো সম্প্রসারণযোগ্যতা ; এটি কমিউনিকেশন লেয়ারকে অ্যাবস্ট্রাক্ট করে, যা ডেভেলপারদের HTTP-এর মতো প্রোটোকলের পরিবর্তে অন্য প্রোটোকল ব্যবহার করার সুযোগ দেয়।

এই প্রকল্পে, আমরা একটি কাস্টম কাফকা ইমপ্লিমেন্টেশন, a2a-python-kafka , ব্যবহার করে এই সম্প্রসারণযোগ্যতার সুবিধা নিচ্ছি। আমরা এই ইমপ্লিমেন্টেশনটি ব্যবহার করে দেখাবো কিভাবে A2A স্ট্যান্ডার্ড আপনাকে বিভিন্ন আর্কিটেকচারাল প্রয়োজন অনুসারে এজেন্ট কমিউনিকেশনকে মানিয়ে নিতে সাহায্য করে—এই ক্ষেত্রে, সিনক্রোনাস HTTP-এর পরিবর্তে একটি অ্যাসিঙ্ক্রোনাস ইভেন্ট বাস ব্যবহার করা হচ্ছে।
গঠন এজেন্টের জন্য A2A সক্ষম করা
আমরা এখন আমাদের এজেন্টকে একটি A2A সার্ভারের আওতায় আনব, এটিকে একটি আন্তঃকার্যকরী পরিষেবাতে পরিণত করব যা নিম্নলিখিত কাজগুলো করতে পারবে:
- একটি কাফকা টপিক থেকে টাস্কের জন্য লিসেন করুন।
- প্রক্রিয়াকরণের জন্য প্রাপ্ত কাজগুলো অন্তর্নিহিত ADK এজেন্টের কাছে হস্তান্তর করুন।
- ফলাফলটি একটি রিপ্লাই টপিকে প্রকাশ করুন।
👉✏️ $HOME/way-back-home/level_5/agent/agent_to_kafka_a2a.py ফাইলে, #REPLACE-CREATE-KAFKA-A2A-SERVER নিচের কোড দিয়ে প্রতিস্থাপন করুন:
async def create_kafka_server(
agent: BaseAgent,
*,
bootstrap_servers: str | List[str] = "localhost:9092",
request_topic: str = "a2a-formation-request",
consumer_group_id: str = "a2a-agent-group",
agent_card: Optional[Union[AgentCard, str]] = None,
runner: Optional[Runner] = None,
**kafka_config: Any,
) -> KafkaServerApp:
"""Convert an ADK agent to a A2A Kafka Server application.
Args:
agent: The ADK agent to convert
bootstrap_servers: Kafka bootstrap servers.
request_topic: Topic to consume requests from.
consumer_group_id: Consumer group ID for the server.
agent_card: Optional pre-built AgentCard object or path to agent card
JSON. If not provided, will be built automatically from the
agent.
runner: Optional pre-built Runner object. If not provided, a default
runner will be created using in-memory services.
**kafka_config: Additional Kafka configuration.
Returns:
A KafkaServerApp that can be run with .run() or .start()
"""
# Set up ADK logging
adk_logger = logging.getLogger("google_adk")
adk_logger.setLevel(logging.INFO)
async def create_runner() -> Runner:
"""Create a runner for the agent."""
return Runner(
app_name=agent.name or "adk_agent",
agent=agent,
# Use minimal services - in a real implementation these could be configured
artifact_service=InMemoryArtifactService(),
session_service=InMemorySessionService(),
memory_service=InMemoryMemoryService(),
credential_service=InMemoryCredentialService(),
)
# Create A2A components
task_store = InMemoryTaskStore()
agent_executor = A2aAgentExecutor(
runner=runner or create_runner,
)
# Initialize logic handler
from a2a.server.request_handlers.default_request_handler import DefaultRequestHandler
logic_handler = DefaultRequestHandler(
agent_executor=agent_executor, task_store=task_store
)
# Prepare Agent Card
rpc_url = f"kafka://{bootstrap_servers}/{request_topic}"
# Create Kafka Server App
server_app = KafkaServerApp(
request_handler=logic_handler,
bootstrap_servers=bootstrap_servers,
request_topic=request_topic,
consumer_group_id=consumer_group_id,
**kafka_config
)
return server_app
এই কোডটি মূল উপাদানগুলো সেট আপ করে:
- রানার : এজেন্টের জন্য রানটাইম সরবরাহ করে (মেমরি, ক্রেডেনশিয়াল ইত্যাদি পরিচালনা করে)।
- টাস্ক স্টোর : অনুরোধগুলো 'পেন্ডিং' থেকে 'কমপ্লিটেড'-এ পরিবর্তিত হওয়ার সময় সেগুলোর অবস্থা ট্র্যাক করে।
- এজেন্ট এক্সিকিউটর : কাফকা থেকে একটি টাস্ক গ্রহণ করে স্থানাঙ্ক গণনা করার জন্য এজেন্টের কাছে তা প্রেরণ করে।
- KafkaServerApp : কাফকা ব্রোকারের সাথে ভৌত সংযোগ পরিচালনা করে।

পরিবেশ ভেরিয়েবল কনফিগার করুন
ADK সেটআপ এজেন্টের ফোল্ডারের ভিতরে আপনার Google Vertex AI সেটিংস সহ একটি .env ফাইল তৈরি করেছে। আমাদের এটিকে প্রজেক্ট রুটে সরাতে হবে এবং আমাদের কাফকা ক্লাস্টারের কোঅর্ডিনেট যোগ করতে হবে।
ফাইলটি কপি করতে এবং কাফকা সার্ভারের ঠিকানা যুক্ত করতে নিম্নলিখিত কমান্ডগুলো চালান:
cd $HOME/way-back-home/level_5
# 1. Copy the API keys from the agent folder to the project root
cp agent/formation/.env .env
# 2. Append the Kafka Bootstrap Server address to the file
echo -e "\nKAFKA_BOOTSTRAP_SERVERS=localhost:9092" >> .env
# 3. Verify the file content
echo "✅ Environment configured. Here are the last few lines:"
tail .env
A2A আন্তঃনাক্ষত্রিক লুপ যাচাই করুন
এখন আমরা একটি লাইভ-ফায়ার টেস্টের মাধ্যমে অ্যাসিঙ্ক্রোনাস ইভেন্ট লুপটি সঠিকভাবে কাজ করছে কিনা তা নিশ্চিত করব: এর জন্য কাফকা ক্লাস্টারের মধ্যে দিয়ে ম্যানুয়ালি একটি সিগন্যাল পাঠিয়ে এজেন্টের প্রতিক্রিয়া পর্যবেক্ষণ করব।

একটি ঘটনার সম্পূর্ণ জীবনচক্র দেখার জন্য আমরা তিনটি পৃথক টার্মিনাল ব্যবহার করব।
টার্মিনাল A: ফরমেশন এজেন্ট (A2A কাফকা সার্ভার)
👉💻 এই টার্মিনালটি পাইথন প্রসেসটি চালায়, যা কাফকা-র অডিও শোনে এবং জ্যামিতিক গাণিতিক কাজ করার জন্য জেমিনি ব্যবহার করে।
cd $HOME/way-back-home/level_5
. scripts/check_kafka.sh
# Install the custom Kafka-enabled A2A library
uv pip install git+https://github.com/weimeilin79/a2a-python-kafka.git
# Start the Agent Server
uv run agent/server.py
না দেখা পর্যন্ত অপেক্ষা করুন:
[INFO] Kafka Server App Started. Starting to consume requests...
টার্মিনাল বি: স্যাটেলাইট লিসেনার (ভোক্তা)
👉💻 এই টার্মিনালে, আমরা রিপ্লাই টপিকটি শুনব। এটি নির্দেশনার জন্য স্যাটেলাইটের অপেক্ষারত অবস্থাকে অনুকরণ করে।
# Listen for the AI's response on the satellite channel
docker exec mission-kafka /opt/kafka/bin/kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic a2a-reply-satellite-dashboard \
--from-beginning \
--property "print.headers=true"
এই টার্মিনালটি নিষ্ক্রিয় দেখাবে। এটি এজেন্টের বার্তা প্রকাশের জন্য অপেক্ষা করছে।
টার্মিনাল সি: কমান্ডারের সংকেত (প্রযোজক)
👉💻 এখন, আমরা a2a-formation-request টপিকে একটি সরাসরি A2A ফরম্যাটের অনুরোধ পাঠাব। আমাদের অবশ্যই নির্দিষ্ট কাফকা হেডার অন্তর্ভুক্ত করতে হবে, যাতে এজেন্ট জানতে পারে উত্তরটি কোথায় পাঠাতে হবে।
echo 'correlation_id=ping-manual-01,reply_topic=a2a-reply-satellite-dashboard|{"method": "message_send", "params": {"message": {"message_id": "msg-001", "role": "user", "parts": [{"text": "STAR"}]}}, "streaming": false, "agent_card": {"name": "DiagnosticTool", "version": "1.0.0"}}' | \
docker exec -i mission-kafka /opt/kafka/bin/kafka-console-producer.sh \
--bootstrap-server localhost:9092 \
--topic a2a-formation-request \
--property "parse.headers=true" \
--property "headers.key.separator==" \
--property "headers.delimiter=|"
ফলাফল বিশ্লেষণ
👀 যদি লুপটি সফল হয়, তাহলে টার্মিনাল B- তে যান। সাথে সাথে একটি বড় JSON ব্লক দেখা যাবে। এটি আমাদের পাঠানো হেডার correlation_id:ping-manual-01 দিয়ে শুরু হবে। এর পরে একটি task অবজেক্ট থাকবে। আপনি যদি সেই JSON-এর ভেতরের parts সেকশনটি ভালোভাবে দেখেন, তাহলে আপনার ১৫টি পডের জন্য Gemini-র গণনা করা আসল X এবং Y স্থানাঙ্কগুলো দেখতে পাবেন:
{"type": "task", "data": {"artifacts": [{"artifactId": "...", "parts": [{"kind": "text", "text": "```json\n[\n {\"x\": 400, \"y\": 150},\n {\"x\": 257, \"y\": 254},\n {\"x\": 312, \"y\": 421},\n ... \n]\n```"}]}], ...}}
আপনি সফলভাবে এজেন্টকে রিসিভার থেকে বিচ্ছিন্ন করেছেন। অনুরোধ-প্রতিক্রিয়া বিলম্বের 'আন্তঃনাক্ষত্রিক কোলাহল' এখন আর কোনো मायने রাখে না, কারণ আমাদের সিস্টেমটি এখন সম্পূর্ণরূপে ইভেন্ট-চালিত ।
সামনে এগোনোর আগে, নেটওয়ার্ক পোর্ট খালি করার জন্য ব্যাকগ্রাউন্ড প্রসেসগুলো বন্ধ করুন।
👉💻 প্রতিটি টার্মিনালে (এ, বি, এবং সি):
- চলমান প্রসেসটি বন্ধ করতে
Ctrl + Cচাপুন।
৫. স্যাটেলাইট স্টেশন (এ২এ কাফকা ক্লায়েন্ট এবং এসএসই)
এই ধাপে, আমরা স্যাটেলাইট স্টেশন তৈরি করি। এটি কাফকা ক্লাস্টার এবং পাইলটের ভিজ্যুয়াল ডিসপ্লে (রিঅ্যাক্ট ফ্রন্টএন্ড)-এর মধ্যেকার সেতুবন্ধন। এই সার্ভারটি কাফকা ক্লায়েন্ট (এজেন্টের সাথে যোগাযোগের জন্য) এবং এসএসই স্ট্রিমার (ব্রাউজারের সাথে যোগাযোগের জন্য) উভয় হিসেবেই কাজ করে।
কাফকা ক্লায়েন্ট বলতে কী বোঝায়?
কাফকা ক্লাস্টারকে একটি রেডিও স্টেশন হিসেবে ভাবুন। কাফকা ক্লায়েন্ট হলো সেই রেডিও রিসিভার। KafkaClientTransport আমাদের অ্যাপ্লিকেশনকে নিম্নলিখিত কাজগুলো করতে সক্ষম করে:
- একটি বার্তা তৈরি করুন : এজেন্টের কাছে একটি 'টাস্ক' (যেমন, 'স্টার ফরমেশন') পাঠান।
- একটি উত্তর গ্রহণ করুন : এজেন্টের কাছ থেকে স্থানাঙ্ক ফেরত পেতে একটি নির্দিষ্ট "উত্তর টপিক"-এ লিসেন করুন।
১. সংযোগ শুরু করা
সার্ভার চালু হওয়ার সময় কাফকা সংযোগটি যেন শুরু হয় এবং বন্ধ হওয়ার সময় সুষ্ঠুভাবে বন্ধ হয়, তা নিশ্চিত করতে আমরা FastAPI-এর lifespan ইভেন্ট হ্যান্ডলার ব্যবহার করি।
👉✏️ $HOME/way-back-home/level_5/satellite/main.py ফাইলে, #REPLACE-CONNECT-TO-KAFKA-CLUSTER নিচের কোড দিয়ে প্রতিস্থাপন করুন:
@asynccontextmanager
async def lifespan(app: FastAPI):
global kafka_transport
logger.info("Initializing Kafka Client Transport...")
bootstrap_server = os.getenv("KAFKA_BOOTSTRAP_SERVERS")
request_topic = "a2a-formation-request"
reply_topic = "a2a-reply-satellite-dashboard"
# Create AgentCard for the Client
client_card = AgentCard(
name="SatelliteDashboard",
description="Satellite Dashboard Client",
version="1.0.0",
url="https://example.com/satellite-dashboard",
capabilities=AgentCapabilities(),
default_input_modes=["text/plain"],
default_output_modes=["text/plain"],
skills=[]
)
kafka_transport = KafkaClientTransport(
agent_card=client_card,
bootstrap_servers=bootstrap_server,
request_topic=request_topic,
reply_topic=reply_topic,
)
try:
await kafka_transport.start()
logger.info("Kafka Client Transport Started Successfully.")
except Exception as e:
logger.error(f"Failed to start Kafka Client: {e}")
yield
if kafka_transport:
logger.info("Stopping Kafka Client Transport...")
await kafka_transport.stop()
logger.info("Kafka Client Transport Stopped.")
২. কমান্ড পাঠানো
আপনি যখন ড্যাশবোর্ডের কোনো বোতামে ক্লিক করেন, তখন /formation এন্ডপয়েন্টটি ট্রিগার হয়। এটি একটি Producer হিসেবে কাজ করে, যা আপনার অনুরোধটিকে একটি আনুষ্ঠানিক A2A Message এ মুড়ে এজেন্টের কাছে পাঠিয়ে দেয়।

মূল যুক্তি:
- অ্যাসিঙ্ক্রোনাস কমিউনিকেশন :
kafka_transport.send_messageঅনুরোধটি পাঠায় এবংreply_topicএ নতুন স্থানাঙ্ক আসার জন্য অপেক্ষা করে। - রেসপন্স পার্সিং : জেমিনি মার্কডাউন ব্লকের (যেমন,
json ...) ভেতরে স্থানাঙ্ক ফেরত দিতে পারে। নিচের কোডটি সেগুলো বাদ দিয়ে স্ট্রিংটিকে পয়েন্টের একটি পাইথন লিস্টে রূপান্তর করে।
👉✏️ $HOME/way-back-home/level_5/satellite/main.py ফাইলে, #REPLACE-FORMATION-REQUEST নিচের কোড দিয়ে প্রতিস্থাপন করুন:
@app.post("/formation")
async def set_formation(req: FormationRequest):
global FORMATION, PODS
FORMATION = req.formation
logger.info(f"Received formation request: {FORMATION}")
if not kafka_transport:
logger.error("Kafka Transport is not initialized!")
return {"status": "error", "message": "Backend Not Connected"}
try:
# Construct A2A Message
prompt = f"Create a {FORMATION} formation"
logger.info(f"Sending A2A Message: '{prompt}'")
from a2a.types import TextPart, Part, Role
import uuid
msg_id = str(uuid.uuid4())
message_parts = [Part(TextPart(text=prompt))]
msg_obj = Message(
message_id=msg_id,
role=Role.user,
parts=message_parts
)
message_params = MessageSendParams(
message=msg_obj
)
# Send and Wait for Response
ctx = ClientCallContext()
ctx.state["kafka_timeout"] = 120.0 # Timeout for GenAI latency
response = await kafka_transport.send_message(message_params, context=ctx)
logger.info("Received A2A Response.")
content = None
if isinstance(response, Message):
content = response.parts[0].root.text if response.parts else None
elif isinstance(response, Task):
if response.artifacts and response.artifacts[0].parts:
content = response.artifacts[0].parts[0].root.text
if content:
logger.info(f"Response Content: {content[:100]}...")
try:
clean_content = content.replace("```json", "").replace("```", "").strip()
coords = json.loads(clean_content)
if isinstance(coords, list):
logger.info(f"Parsed {len(coords)} coordinates.")
for i, pod_target in enumerate(coords):
if i < len(PODS):
PODS[i]["x"] = pod_target["x"]
PODS[i]["y"] = pod_target["y"]
return {"status": "success", "formation": FORMATION}
else:
logger.error("Response JSON is not a list.")
except json.JSONDecodeError as e:
logger.error(f"Failed to parse Agent JSON response: {e}")
else:
logger.error(f"Could not extract content from response type {type(response)}")
except Exception as e:
logger.error(f"Error calling agent via Kafka: {e}")
return {"status": "error", "message": str(e)}
সার্ভার-প্রেরিত ইভেন্ট (SSE)
স্ট্যান্ডার্ড এপিআইগুলো 'রিকোয়েস্ট-রেসপন্স' মডেল ব্যবহার করে। আমাদের HUD-এর জন্য পডগুলোর অবস্থানের একটি 'লাইভ স্ট্রিম' প্রয়োজন।
ওয়েবসকেটের (যা দ্বিমুখী এবং আরও জটিল) বিপরীতে , এসএসই সার্ভার থেকে ব্রাউজারে একটি সহজ, একমুখী ডেটা প্রবাহ প্রদান করে। এটি ড্যাশবোর্ড, স্টক টিকার বা আন্তঃনাক্ষত্রিক টেলিমেট্রির জন্য আদর্শ।

আমাদের কোডে এটি যেভাবে কাজ করে: আমরা একটি event_generator তৈরি করি, যা একটি অন্তহীন লুপ। এই লুপটি প্রতি আধা সেকেন্ডে ১৫টি পডের বর্তমান অবস্থান গ্রহণ করে এবং একটি আপডেট হিসেবে সেগুলোকে ব্রাউজারে "পুশ" করে।
👉✏️ $HOME/way-back-home/level_5/satellite/main.py ফাইলে, #REPLACE-SSE-STREAM নিচের কোড দিয়ে প্রতিস্থাপন করুন:
@app.get("/stream")
async def message_stream(request: Request):
async def event_generator():
logger.info("New SSE stream connected")
try:
while True:
current_pods = list(PODS)
# Send updates one by one to simulate low-bandwidth scanning
for pod in current_pods:
payload = {"pod": pod}
yield {
"event": "pod_update",
"data": json.dumps(payload)
}
await asyncio.sleep(0.02)
# Send formation info occasionally
yield {
"event": "formation_update",
"data": json.dumps({"formation": FORMATION})
}
# Main loop delay
await asyncio.sleep(0.5)
except asyncio.CancelledError:
logger.info("SSE stream disconnected (cancelled)")
except Exception as e:
logger.error(f"SSE stream error: {e}")
return EventSourceResponse(event_generator())
সম্পূর্ণ মিশন লুপটি সম্পাদন করুন
চূড়ান্ত UI চালু করার আগে চলুন সিস্টেমটি শুরু থেকে শেষ পর্যন্ত ঠিকঠাক কাজ করছে কিনা তা যাচাই করে নিই। আমরা ম্যানুয়ালি এজেন্টটি চালু করব এবং নেটওয়ার্কে থাকা কাঁচা ডেটা পেলোডটি দেখব।

তিনটি আলাদা টার্মিনাল ট্যাব খুলুন।
টার্মিনাল A: গঠনকারী এজেন্ট (A2A সার্ভার)
👉💻 এটি সেই ADK এজেন্ট যা টাস্ক শোনে এবং জ্যামিতিক গাণিতিক হিসাব সম্পাদন করে।
cd $HOME/way-back-home/level_5
. scripts/check_kafka.sh
# Start the Agent Server
uv run agent/server.py
টার্মিনাল বি: স্যাটেলাইট স্টেশন (কাফকা ক্লায়েন্ট)
👉💻 এই FastAPI সার্ভারটি "রিসিভার" হিসেবে কাজ করে, যা কাফকার উত্তরগুলো শোনে এবং সেগুলোকে একটি লাইভ SSE স্ট্রিমে পরিণত করে।
cd $HOME/way-back-home/level_5
# Start the Satellite Station
uv run satellite/main.py
টার্মিনাল সি: ম্যানুয়াল HUD
ফর্মেশন কমান্ড পাঠান (ট্রিগার): 👉💻 একই টার্মিনাল C-তে, ফর্মেশন প্রক্রিয়াটি ট্রিগার করুন:
# Trigger the STAR formation via the Satellite's API
curl -X POST http://localhost:8000/formation \
-H "Content-Type: application/json" \
-d '{"formation": "STAR"}'
👀 আপনি নতুন স্থানাঙ্কগুলো দেখতে পাবেন।
INFO:satellite.main:Received formation request: STAR INFO:satellite.main:Sending A2A Message: 'Create a STAR formation' INFO:satellite.main:Received A2A Response. INFO:satellite.main:Response Content: ```json ... INFO:satellite.main:Parsed 15 coordinates.
এটি নিশ্চিত করে যে স্যাটেলাইটটি তার অভ্যন্তরীণ পডের স্থানাঙ্ক হালনাগাদ করেছে।
👉💻 আমরা প্রথমে লাইভ টেলিমেট্রি স্ট্রিম শোনার জন্য curl ব্যবহার করব এবং তারপর ফর্মেশন পরিবর্তনের প্রক্রিয়া শুরু করব।
# Connect to the live telemetry feed.
# You should see 'pod_update' events ticking by.
curl -N http://localhost:8000/stream
👀 আপনার curl -N কমান্ডের আউটপুটটি লক্ষ্য করুন। pod_update ইভেন্টগুলোর x এবং y স্থানাঙ্কগুলো নক্ষত্রপুঞ্জের নতুন অবস্থানগুলো প্রতিফলিত করতে শুরু করবে।
এগিয়ে যাওয়ার আগে, কমিউনিকেশন পোর্টগুলো খালি করার জন্য চলমান সমস্ত প্রসেস বন্ধ করুন।
প্রতিটি টার্মিনালে (A, B, C, এবং ট্রিগার টার্মিনাল): Ctrl + C চাপুন।
৬. উদ্ধারে যাও!
আপনি সফলভাবে সিস্টেমটি স্থাপন করেছেন। এখন, মিশনটিকে বাস্তবে রূপ দেওয়ার সময় এসেছে। আমরা এখন রিয়্যাক্ট-ভিত্তিক হেড-আপ ডিসপ্লে (HUD) চালু করব। এই ড্যাশবোর্ডটি SSE-এর মাধ্যমে স্যাটেলাইট স্টেশনের সাথে সংযুক্ত থাকে, যা আপনাকে রিয়েল-টাইমে ১৫টি পড দেখতে সক্ষম করে।

যখন আপনি কোনো কমান্ড দেন, তখন আপনি শুধু একটি ফাংশন কল করেন না; আপনি একটি ইভেন্ট ট্রিগার করেন যা কাফকার মধ্য দিয়ে যায়, একটি এআই এজেন্ট দ্বারা প্রসেসড হয় এবং লাইভ টেলিমেট্রি হিসেবে আপনার স্ক্রিনে স্ট্রিম হয়ে ফিরে আসে।

দুটি আলাদা টার্মিনাল ট্যাব খুলুন।
টার্মিনাল A: গঠনকারী এজেন্ট (A2A সার্ভার)
👉💻 এটি সেই ADK এজেন্ট যা টাস্ক শোনে এবং Gemini ব্যবহার করে জ্যামিতিক গণনা সম্পাদন করে। টার্মিনালে চালান:
cd $HOME/way-back-home/level_5
# Start the Agent Server
uv run agent/server.py
টার্মিনাল বি: স্যাটেলাইট স্টেশন এবং ভিজ্যুয়াল ড্যাশবোর্ড
👉💻 প্রথমে ফ্রন্টএন্ড অ্যাপ্লিকেশনটি তৈরি করুন।
cd $HOME/way-back-home/level_5/frontend/
npm install
npm run build
👉💻 এবার FastAPI সার্ভারটি চালু করুন, যেটি ব্যাকএন্ড লজিক এবং ফ্রন্টএন্ড UI উভয়কেই পরিষেবা দেবে।
cd $HOME/way-back-home/level_5
. scripts/check_kafka.sh
# Start the Satellite Station
uv run satellite/main.py
চালু করুন এবং যাচাই করুন
- 👉 প্রিভিউ খুলুন : ক্লাউড শেল টুলবারে, ওয়েব প্রিভিউ আইকনে ক্লিক করুন। ‘Change port’ নির্বাচন করে এটিকে 8000- এ সেট করুন এবং ‘Change and Preview’-এ ক্লিক করুন। একটি নতুন ব্রাউজার ট্যাব খুলবে যেখানে আপনার স্টারফিল্ড HUD দেখা যাবে।

- 👉 টেলিমেট্রি স্ট্রিম যাচাই করুন :
- UI লোড হয়ে গেলে, আপনি এলোমেলোভাবে ছড়িয়ে ছিটিয়ে থাকা ১৫টি পড দেখতে পাবেন।
- যদি পডগুলো মৃদুভাবে স্পন্দিত হয় বা কাঁপতে থাকে, তাহলে আপনার SSE স্ট্রিম সক্রিয় আছে এবং স্যাটেলাইট স্টেশনটি সফলভাবে তাদের অবস্থান সম্প্রচার করছে।

- 👉 ফর্মেশন শুরু করতে : ড্যাশবোর্ডে থাকা 'STAR' বোতামটিতে ক্লিক করুন।

- 👀 ইভেন্ট লুপ অনুসরণ করুন : আর্কিটেকচারটি বাস্তবে দেখতে আপনার টার্মিনালগুলো পর্যবেক্ষণ করুন:
- টার্মিনাল বি (স্যাটেলাইট স্টেশন) লগ করবে:
Sending A2A Message: 'Create a STAR formation'। - টার্মিনাল এ (ফরমেশন এজেন্ট) জেমিনির সাথে পরামর্শ করার সময় কার্যকলাপ প্রদর্শন করবে।
- টার্মিনাল বি (স্যাটেলাইট স্টেশন)
Received A2A Responseলগ করবে এবং স্থানাঙ্কগুলো বিশ্লেষণ করবে।
- টার্মিনাল বি (স্যাটেলাইট স্টেশন) লগ করবে:
- 👀 চাক্ষুষ নিশ্চিতকরণ : আপনার ড্যাশবোর্ডে থাকা ১৫টি পডকে তাদের এলোমেলো অবস্থান থেকে মসৃণভাবে সরে এসে একটি ৫-কোণা তারার আকৃতি গঠন করতে দেখুন।
- 👉 পরীক্ষা :
- ৩টি ভিন্ন ফর্মেশনের জন্য 'X' অথবা 'LINE' ব্যবহার করে দেখুন।

- কাস্টম ইনটেন্ট : ম্যানুয়াল ইনপুট ব্যবহার করে 'হার্ট' বা 'ট্রায়াঙ্গেল'-এর মতো কোনো অনন্য কিছু টাইপ করুন।

- যেহেতু আপনি GenAI ব্যবহার করছেন, এজেন্টটি আপনার বর্ণনা করা যেকোনো জ্যামিতিক আকারের গাণিতিক হিসাব করার চেষ্টা করবে!
- ৩টি ভিন্ন ফর্মেশনের জন্য 'X' অথবা 'LINE' ব্যবহার করে দেখুন।
৩টি প্যাটার্ন তৈরি করার পর, আপনি সফলভাবে সংযোগটি পুনঃস্থাপন করেছেন। 
মিশন সম্পন্ন!
কোলাহলের মধ্য দিয়ে ডেটা নিরবচ্ছিন্নভাবে প্রবাহিত হওয়ায় প্রবাহটি স্থিতিশীল হয়। আপনার নির্দেশে, ১৫টি প্রাচীন পড নক্ষত্রপুঞ্জ জুড়ে তাদের সমন্বিত নৃত্য শুরু করে।

তিনটি কষ্টসাধ্য ক্রমাঙ্কন পর্বের মধ্য দিয়ে, আপনি দেখলেন টেলিমেট্রি সংকেতটি যথাস্থানে স্থির হলো। প্রতিটি সমন্বয়ের সাথে সাথে সংকেতটি আরও শক্তিশালী হয়ে উঠল, এবং অবশেষে আশার বাতির মতো আন্তঃনাক্ষত্রিক প্রতিবন্ধকতা ভেদ করে বেরিয়ে এল।
আপনার এবং ইভেন্ট-ড্রাইভেন এজেন্টের নিপুণ প্রয়োগের জন্য ধন্যবাদ, পাঁচজন জীবিত ব্যক্তিকে এক্স-৪২ এর পৃষ্ঠ থেকে আকাশপথে উদ্ধার করা হয়েছে এবং তারা এখন উদ্ধারকারী জাহাজে নিরাপদে আছেন। আপনার কারণেই পাঁচটি জীবন রক্ষা পেয়েছে ।
আপনি যদি লেভেল ০-তে অংশগ্রহণ করে থাকেন, তাহলে 'ওয়ে ব্যাক হোম' মিশনে আপনার অগ্রগতি কোথায় আছে তা দেখে নিতে ভুলবেন না! তারাদের দিকে আপনার প্রত্যাবর্তন যাত্রা অব্যাহত রয়েছে। 