বাড়ি ফিরে আসার পথ - লাইভ দ্বিমুখী মাল্টি-এজেন্ট সিস্টেম

১. মিশন

গল্প

তুমি নীরব, অজানা মহাকাশের বিশালতায় ভেসে আছো। একটি বিশাল সৌর স্পন্দন তোমার জাহাজকে এক মাত্রিক ফাটলের মধ্য দিয়ে ছিঁড়ে ফেলেছে, যার ফলে তুমি মহাবিশ্বের এমন এক পকেটে আটকে পড়েছো যেখানে কোন তারকা চার্ট নেই।

কয়েকদিনের কঠোর মেরামতের পর, অবশেষে ইঞ্জিনের পরিচিত শব্দ ফিরে আসে। তোমার রকেট জাহাজটি চালু আছে। তুমি এমনকি মাদারশিপের সাথে একটি দীর্ঘ-পাল্লার আপলিঙ্ক স্থাপন করতে সক্ষম হয়েছ। যাত্রা আসন্ন। তুমি বাড়ি যেতে প্রস্তুত।

কিন্তু যখন আপনি জাম্প ড্রাইভে অংশগ্রহণের জন্য প্রস্তুতি নিচ্ছেন, তখন একটি বিপদ সংকেত স্থির ভেদ করে। আপনার সেন্সরগুলি "ওজিমান্ডিয়াস" নামক একটি গ্রহের সাহায্যের জন্য একটি আবেদন চিহ্নিত করে। বেঁচে থাকা ব্যক্তিরা এই মৃতপ্রায় পৃথিবীতে আটকা পড়েছেন, তাদের জাহাজটি মাটিতে আটকা পড়েছে। আপনার লক্ষ্য অত্যন্ত গুরুত্বপূর্ণ: গ্রহের বায়ুমণ্ডল ধসে পড়ার আগে তাদের উদ্ধার করুন।

তাদের পালানোর একমাত্র উপায় হল এলিয়েন টেক দিয়ে তৈরি একটি প্রাচীন, পরিত্যক্ত রকেট। কার্যকর থাকাকালীন, এর ওয়ার্প ড্রাইভ ভেঙে যায়। বেঁচে থাকাদের বাঁচাতে, আপনাকে দূরবর্তীভাবে তাদের ভোলাটাইল ওয়ার্কবেঞ্চের সাথে সংযোগ স্থাপন করতে হবে এবং ম্যানুয়ালি একটি প্রতিস্থাপন ড্রাইভ একত্রিত করতে হবে।

চ্যালেঞ্জ

এই ভিনগ্রহী প্রযুক্তির সাথে আপনার কোন অভিজ্ঞতা নেই, যা কুখ্যাতভাবে ভঙ্গুর। একটি অস্থির উপাদান কয়েক সেকেন্ডের মধ্যে তেজস্ক্রিয় বিপদে পরিণত হতে পারে। আপনার কাছে উদ্বায়ী ওয়ার্কবেঞ্চ পরিচালনা করার জন্য একবার প্রচেষ্টা আছে। আপনার বর্তমান AI সহকারী একই সাথে ভিজ্যুয়াল ডেটা এবং প্রযুক্তিগত ম্যানুয়াল প্রক্রিয়া করতে হিমশিম খাচ্ছে, যার ফলে হ্যালুসিনেটরি নির্দেশাবলী এবং মিসড বিপদ সতর্কতা দেখা দিচ্ছে।

সফল হওয়ার জন্য, আপনাকে অবশ্যই আপনার AI কে একটি একক সত্তা থেকে একটি সহযোগী মাল্টি-এজেন্ট সিস্টেমে আপগ্রেড করতে হবে।

আপনার মিশনের উদ্দেশ্য:

আপনার নতুন মাল্টি-এজেন্ট সিস্টেম থেকে বিশেষায়িত, রিয়েল-টাইম নির্দেশাবলী অনুসরণ করে ওয়ার্প ড্রাইভটি একত্রিত করুন।

মিশন আলফা

তুমি কী তৈরি করবে

সংক্ষিপ্ত বিবরণ

  • একটি রিয়েল-টাইম, দ্বিমুখী মাল্টি-এজেন্ট এআই সিস্টেম যাতে একটি কেন্দ্রীয় ডিসপ্যাচ এজেন্ট রয়েছে যা ব্যবহারকারীর মিথস্ক্রিয়া পরিচালনা করে এবং বিশেষায়িত এজেন্টদের সাথে সমন্বয় সাধন করে।
  • একজন আর্কিটেক্ট এজেন্ট যিনি স্কিম্যাটিক ডেটা পুনরুদ্ধার এবং পরিবেশন করার জন্য একটি রেডিস ডাটাবেসের সাথে সংযোগ স্থাপন করেন।
  • একটি সক্রিয় নিরাপত্তা মনিটর যা স্ট্রিমিং টুল ব্যবহার করে ভিজ্যুয়াল বিপদের জন্য একটি লাইভ ভিডিও ফিড বিশ্লেষণ করে এবং রিয়েল-টাইম সতর্কতা ট্রিগার করে।
  • একটি রিঅ্যাক্ট-ভিত্তিক ফ্রন্টএন্ড যা সিস্টেমের সাথে ইন্টারঅ্যাক্ট করার জন্য, ব্যাকএন্ড এজেন্টদের কাছে ভিডিও এবং অডিও স্ট্রিম করার জন্য একটি ব্যবহারকারী ইন্টারফেস প্রদান করে।

তুমি কি শিখবে

প্রযুক্তি / ধারণা

বিবরণ

গুগল এজেন্ট ডেভেলপমেন্ট কিট (ADK)

আপনি এজেন্ট তৈরি, পরীক্ষা এবং পরিচালনা করার জন্য ADK ব্যবহার করবেন, রিয়েল-টাইম যোগাযোগ, টুল ইন্টিগ্রেশন এবং এজেন্ট জীবনচক্র পরিচালনার জন্য এর কাঠামো ব্যবহার করবেন।

দ্বিমুখী (বিডি) স্ট্রিমিং

আপনি একটি বিডি-স্ট্রিমিং এজেন্ট বাস্তবায়ন করবেন যা প্রাকৃতিক, কম-বিলম্বিত, দ্বি-মুখী যোগাযোগের সুযোগ করে দেবে, যা মানব এবং কৃত্রিম বুদ্ধিমত্তা উভয়কেই রিয়েল-টাইমে বাধা দিতে এবং প্রতিক্রিয়া জানাতে সক্ষম করবে।

মাল্টি-এজেন্ট সিস্টেম

আপনি শিখবেন কিভাবে একটি বিতরণকৃত AI সিস্টেম ডিজাইন করতে হয় যেখানে একজন প্রাথমিক এজেন্ট বিশেষায়িত এজেন্টদের কাজ অর্পণ করে, যা উদ্বেগের বিচ্ছেদ এবং আরও স্কেলযোগ্য স্থাপত্যকে সক্ষম করে।

এজেন্ট-টু-এজেন্ট (A2A) প্রোটোকল

আপনি A2A প্রোটোকল ব্যবহার করে ডিসপ্যাচ এজেন্ট এবং আর্কিটেক্ট এজেন্টের মধ্যে যোগাযোগ সক্ষম করবেন, যাতে তারা একে অপরের ক্ষমতা আবিষ্কার করতে এবং তথ্য বিনিময় করতে পারে।

স্ট্রিমিং টুলস

আপনি একটি স্ট্রিমিং টুল বাস্তবায়ন করবেন যা একটি ব্যাকগ্রাউন্ড প্রক্রিয়া হিসেবে কাজ করবে, স্টেট পরিবর্তন (বিপদ) পর্যবেক্ষণ করার জন্য একটি ভিডিও ফিড ক্রমাগত বিশ্লেষণ করবে এবং সক্রিয়ভাবে ফলাফল দেবে।

গুগল ক্লাউড রান এবং মেমোরিস্টোর

আপনি সম্পূর্ণ মাল্টি-এজেন্ট অ্যাপ্লিকেশনটিকে একটি উৎপাদন পরিবেশে স্থাপন করবেন, ক্লাউড রান ব্যবহার করে এজেন্ট পরিষেবাগুলি হোস্ট করবেন এবং মেমোরিস্টোর (রেডিস) স্থায়ী ডাটাবেস হিসাবে ব্যবহার করবেন।

ফাস্টএপিআই এবং ওয়েবসকেটস

অডিও, ভিডিও এবং এজেন্ট প্রতিক্রিয়া স্ট্রিমিংয়ের জন্য প্রয়োজনীয় উচ্চ-কার্যক্ষমতাসম্পন্ন, রিয়েল-টাইম যোগাযোগ পরিচালনা করার জন্য ব্যাকএন্ডটি FastAPI এবং WebSockets ব্যবহার করে তৈরি করা হয়েছে।

রিঅ্যাক্ট ফ্রন্টএন্ড

আপনি একটি রিঅ্যাক্ট-ভিত্তিক ফ্রন্টএন্ডের সাথে কাজ করবেন যা ব্যবহারকারীর মিডিয়া (অডিও/ভিডিও) ক্যাপচার এবং স্ট্রিম করে এবং AI এজেন্টদের রিয়েল-টাইম প্রতিক্রিয়া প্রদর্শন করে।

2. আপনার পরিবেশ সেট আপ করুন

ক্লাউড শেল অ্যাক্সেস করুন

👉গুগল ক্লাউড কনসোলের উপরে অ্যাক্টিভেট ক্লাউড শেল-এ ক্লিক করুন (এটি ক্লাউড শেল প্যানের উপরে টার্মিনাল আকৃতির আইকন), ক্লাউড-শেল.পিএনজি

👉"Open Editor" বোতামে ক্লিক করুন (এটি দেখতে পেন্সিল দিয়ে খোলা ফোল্ডারের মতো)। এটি উইন্ডোতে Cloud Shell Code Editor খুলবে। আপনি বাম দিকে একটি ফাইল এক্সপ্লোরার দেখতে পাবেন। open-editor.png সম্পর্কে

👉ক্লাউড IDE তে টার্মিনালটি খুলুন,

০৩-০৫-নতুন-টার্মিনাল.png

👉💻 টার্মিনালে, নিম্নলিখিত কমান্ড ব্যবহার করে যাচাই করুন যে আপনি ইতিমধ্যেই প্রমাণীকরণপ্রাপ্ত এবং প্রকল্পটি আপনার প্রকল্প আইডিতে সেট করা আছে:

gcloud auth list

আপনার অ্যাকাউন্টটি (ACTIVE) হিসেবে তালিকাভুক্ত দেখতে হবে।

পূর্বশর্ত

ℹ️ লেভেল ০ ঐচ্ছিক (কিন্তু প্রস্তাবিত)

আপনি লেভেল ০ ছাড়াই এই মিশনটি সম্পূর্ণ করতে পারেন, তবে প্রথমে এটি শেষ করলে আরও নিমজ্জিত অভিজ্ঞতা পাওয়া যাবে, যার ফলে আপনি এগিয়ে যাওয়ার সাথে সাথে বিশ্ব মানচিত্রে আপনার আলোকসজ্জার আলো দেখতে পাবেন।

প্রকল্প পরিবেশ সেটআপ করুন

আপনার টার্মিনালে ফিরে, সক্রিয় প্রকল্পটি সেট করে এবং প্রয়োজনীয় Google ক্লাউড পরিষেবাগুলি (ক্লাউড রান, ভার্টেক্স এআই, ইত্যাদি) সক্ষম করে কনফিগারেশন চূড়ান্ত করুন।

👉💻 আপনার টার্মিনায়, প্রজেক্ট আইডি সেট করুন:

gcloud config set project $(cat ~/project_id.txt) --quiet

👉💻 প্রয়োজনীয় পরিষেবাগুলি সক্ষম করুন:

gcloud services enable  compute.googleapis.com \
                        artifactregistry.googleapis.com \
                        run.googleapis.com \
                        cloudbuild.googleapis.com \
                        iam.googleapis.com \
                        aiplatform.googleapis.com \
                        cloudresourcemanager.googleapis.com \
                        redis.googleapis.com \
                        vpcaccess.googleapis.com

নির্ভরতা ইনস্টল করুন

👉💻 লেভেল ৪ এ নেভিগেট করুন এবং প্রয়োজনীয় পাইথন প্যাকেজগুলি ইনস্টল করুন:

cd $HOME/way-back-home/level_4
uv sync

মূল নির্ভরতাগুলি হল:

প্যাকেজ

উদ্দেশ্য

fastapi

স্যাটেলাইট স্টেশন এবং SSE স্ট্রিমিংয়ের জন্য উচ্চ-কার্যক্ষমতাসম্পন্ন ওয়েব ফ্রেমওয়ার্ক

uvicorn

FastAPI অ্যাপ্লিকেশন চালানোর জন্য ASGI সার্ভার প্রয়োজন

google-adk

ফর্মেশন এজেন্ট তৈরিতে ব্যবহৃত এজেন্ট ডেভেলপমেন্ট কিট

a2a-sdk

স্ট্যান্ডার্ডাইজড যোগাযোগের জন্য এজেন্ট-টু-এজেন্ট প্রোটোকল লাইব্রেরি

google-genai

জেমিনি মডেল অ্যাক্সেস করার জন্য নেটিভ ক্লায়েন্ট

redis

স্কিম্যাটিক ভল্ট (মেমোরিস্টোর) এর সাথে সংযোগ স্থাপনের জন্য পাইথন ক্লায়েন্ট

websockets

রিয়েল-টাইম দ্বি-মুখী যোগাযোগের জন্য সহায়তা

python-dotenv

পরিবেশের ভেরিয়েবল এবং কনফিগারেশন গোপনীয়তা পরিচালনা করে

pydantic

ডেটা যাচাইকরণ এবং সেটিংস ব্যবস্থাপনা

সেটআপ যাচাই করুন

কোডটি শুরু করার আগে, আসুন নিশ্চিত করি যে সমস্ত সিস্টেম সবুজ। আপনার Google ক্লাউড প্রজেক্ট, API এবং পাইথন নির্ভরতা নিরীক্ষণের জন্য যাচাইকরণ স্ক্রিপ্টটি চালান।

👉💻 যাচাইকরণ স্ক্রিপ্টটি চালান:

cd $HOME/way-back-home/level_4/scripts
chmod +x verify_setup.sh
. verify_setup.sh

👀 তোমার সবুজ চেক (✅) এর একটি সিরিজ দেখা উচিত।

  • যদি আপনি Red Crosses (❌) দেখতে পান, তাহলে আউটপুটে প্রস্তাবিত ফিক্স কমান্ডগুলি অনুসরণ করুন (যেমন, gcloud services enable ... অথবা pip install ... )।
  • দ্রষ্টব্য: .env এর জন্য একটি হলুদ সতর্কতা আপাতত গ্রহণযোগ্য; আমরা পরবর্তী ধাপে সেই ফাইলটি তৈরি করব।
🚀 Verifying Mission Bravo (Level 4) Infrastructure...

✅ Google Cloud Project: xxxxxxx
✅ Cloud APIs: Active
✅ Python Environment: Ready

🎉 SYSTEMS ONLINE. READY FOR MISSION.

৩. রেডিসে স্কিম্যাটিক ভল্ট তৈরি এবং ADK দিয়ে BiDirecitional এজেন্ট তৈরি করা

আপনি পরিত্যক্ত রকেটের ব্লুপ্রিন্ট সম্বলিত গ্রহগত পরিকল্পিত সংগ্রহস্থলটি খুঁজে পেয়েছেন। এই তথ্য সঠিকভাবে পুনরুদ্ধার করতে, আপনাকে সংগ্রহস্থলের ডেডিকেটেড ম্যানেজমেন্ট ইন্টারফেস: আর্কিটেক্ট এজেন্টের সাথে যোগাযোগ করতে হবে।

সংক্ষিপ্ত বিবরণ

স্কিম্যাটিক ভল্ট (রেডিস) এর ব্যবস্থা করা

স্থপতি আমাদের সহায়তা করার আগে, আমাদের নিশ্চিত করতে হবে যে ডেটা একটি নিরাপদ, উচ্চ-প্রাপ্যতা পরিবেশে হোস্ট করা হয়েছে। আমরা আমাদের এলিয়েন স্কিম্যাটিক্সের জন্য দ্রুত ডেটা স্টোর হিসাবে Redis ব্যবহার করব। উন্নয়নের সুবিধার জন্য, আমরা একটি স্থানীয় Redis ইনস্ট্যান্স তৈরি করব, তবে Google Cloud Memorystore দিয়ে কীভাবে একটি উৎপাদন পরিবেশে স্থাপন করতে হবে তার নির্দেশাবলী পরে সরবরাহ করা হবে।

👉💻 Redis ইনস্ট্যান্সটি সরবরাহ করতে আপনার টার্মিনালে নিম্নলিখিত কমান্ডগুলি চালান (এটি ২-৩ মিনিট সময় নিতে পারে):

docker run -d --name ozymandias-vault -p 6379:6379 redis:8.6-rc1-alpine

👉💻 প্রাথমিক তথ্য লোড করতে, Redis Shell এ প্রবেশ করতে নিম্নলিখিতটি চালান:

docker exec -it ozymandias-vault redis-cli

(আপনার প্রম্পট 127.0.0.1:6379 এ পরিবর্তিত হবে)

👉💻 এই কমান্ডগুলি ভিতরে পেস্ট করুন:

RPUSH "HYPERION-X" "Warp Core" "Flux Pipe" "Ion Thruster"
RPUSH "NOVA-V" "Ion Thruster" "Warp Core" "Flux Pipe"
RPUSH "OMEGA-9" "Flux Pipe" "Ion Thruster" "Warp Core"
RPUSH "GEMINI-MK1" "Coolant Tank" "Servo" "Fuel Cell"
RPUSH "APOLLO-13" "Warp Core" "Coolant Tank" "Ion Thruster"
RPUSH "VORTEX-7" "Quantum Cell" "Graviton Coil" "Plasma Injector"
RPUSH "CHRONOS-ALPHA" "Shield Emitter" "Data Crystal" "Quantum Cell"
RPUSH "NEBULA-Z" "Plasma Injector" "Flux Pipe" "Graviton Coil"
RPUSH "PULSAR-B" "Data Crystal" "Servo" "Shield Emitter"
RPUSH "TITAN-PRIME" "Ion Thruster" "Quantum Cell" "Warp Core"

👉💻 আপনার স্বাভাবিক শেলে ফিরে যেতে exit টাইপ করুন।

👉💻 আপনার টার্মিনাল থেকে সরাসরি একটি নির্দিষ্ট জাহাজের কাছে জিজ্ঞাসা করে ডেটা বিদ্যমান কিনা তা পরীক্ষা করতে, চালান:

# Check 'TITAN-PRIME'
docker exec ozymandias-vault redis-cli LRANGE "TITAN-PRIME" 0 -1

👀 এটি প্রত্যাশিত আউটপুট:

1) "Ion Thruster"
2) "Quantum Cell"
3) "Warp Core"

স্থপতি এজেন্ট বাস্তবায়ন

আর্কিটেক্ট এজেন্ট হল একজন বিশেষায়িত এজেন্ট যিনি আমাদের রেডিস ভল্ট থেকে স্কিম্যাটিক ব্লুপ্রিন্ট উদ্ধারের জন্য দায়ী। এটি একটি ডেডিকেটেড ডেটা ইন্টারফেস হিসেবে কাজ করে, যা নিশ্চিত করে যে প্রধান ডিসপ্যাচ এজেন্ট অন্তর্নিহিত ডাটাবেস লজিক না জেনেই সঠিক এবং কাঠামোগত তথ্য পায়।

সংক্ষিপ্ত বিবরণ

গুগল এজেন্ট ডেভেলপমেন্ট কিট (ADK) হল মডুলার ফ্রেমওয়ার্ক যা এই মাল্টি-এজেন্ট সেটআপটিকে সম্ভব করে তোলে। এটি দুটি গুরুত্বপূর্ণ স্তর পরিচালনা করে:

  1. সংযোগ এবং সেশন জীবনচক্র: রিয়েল-টাইম API গুলির সাথে ইন্টারঅ্যাক্ট করার জন্য জটিল প্রোটোকল ব্যবস্থাপনা প্রয়োজন — হ্যান্ডশেক, প্রমাণীকরণ এবং জীবন্ত সংকেত পরিচালনা করা।
  2. ফাংশন কলিং: এটি হল "মডেল-কোড-মডেল রাউন্ড ট্রিপ"। যখন LLM সিদ্ধান্ত নেয় যে তার ডেটা প্রয়োজন, তখন এটি একটি স্ট্রাকচার্ড ফাংশন কল আউটপুট করে। ADK এটিকে আটকায়, আপনার পাইথন কোড ( lookup_schematic_tool ) কার্যকর করে, এবং মিলিসেকেন্ডে ফলাফলটিকে মডেলের প্রসঙ্গে ফিরিয়ে দেয়।

আমরা এখন Architect তৈরি করব। এই এজেন্টের ক্যামেরা অ্যাক্সেস নেই। এটি শুধুমাত্র একটি "ড্রাইভ নেম" গ্রহণ করার জন্য এবং ডাটাবেস থেকে "পার্টস লিস্ট" ফেরত দেওয়ার জন্য বিদ্যমান।

👉💻 আমরা adk create কমান্ড ব্যবহার করব। এটি এজেন্ট ডেভেলপমেন্ট কিট (ADK) এর একটি টুল যা স্বয়ংক্রিয়ভাবে একটি নতুন এজেন্টের জন্য বয়লারপ্লেট কোড এবং ফাইল স্ট্রাকচার তৈরি করে, যা আমাদের সেটআপের সময় বাঁচায়।

cd $HOME/way-back-home/level_4/backend/
uv run adk create architect_agent

এজেন্ট কনফিগার করুন

CLI একটি ইন্টারেক্টিভ সেটআপ উইজার্ড চালু করবে। আপনার এজেন্ট কনফিগার করতে নিম্নলিখিত প্রতিক্রিয়াগুলি ব্যবহার করুন:

  1. একটি মডেল নির্বাচন করুন : বিকল্প ১ (জেমিনি ফ্ল্যাশ) নির্বাচন করুন।
    • দ্রষ্টব্য: নির্দিষ্ট সংস্করণ (যেমন, 2.5, 3.0) প্রাপ্যতার উপর নির্ভর করে পরিবর্তিত হতে পারে। গতির জন্য সর্বদা "ফ্ল্যাশ" রূপটি বেছে নিন।
  2. একটি ব্যাকএন্ড নির্বাচন করুন : বিকল্প 2 (ভার্টেক্স এআই) নির্বাচন করুন।
  3. গুগল ক্লাউড প্রজেক্ট আইডি লিখুন : ডিফল্ট (আপনার পরিবেশ থেকে সনাক্ত করা) গ্রহণ করতে এন্টার টিপুন।
  4. গুগল ক্লাউড রিজিয়নে প্রবেশ করুন : ডিফল্ট ( us-central1 ) গ্রহণ করতে এন্টার টিপুন।

👀 আপনার টার্মিনাল ইন্টারঅ্যাকশনটি এর মতো দেখতে হবে:

(way-back-home) user@cloudshell:~/way-back-home/level_4/agent$ adk create architect_agent

Choose a model for the root agent:
1. gemini-2.5-flash
2. Other models (fill later)
Choose model (1, 2): 1

1. Google AI
2. Vertex AI
Choose a backend (1, 2): 2

You need an existing Google Cloud account and project...
Enter Google Cloud project ID [your-project-id]: <PRESS ENTER>
Enter Google Cloud region [us-central1]: <PRESS ENTER>

Agent created in /home/user/way-back-home/level_4/agent/architect_agent:
- .env
- __init__.py
- agent.py

এখন আপনি একটি Agent created সাফল্যের বার্তা দেখতে পাবেন। এটি পরবর্তী ধাপে আমরা যে স্কেলিটন কোডটি পরিবর্তন করব তা তৈরি করে।

👉✏️ আপনার এডিটরে নতুন তৈরি $HOME/way-back-home/level_4/backend/architect_agent/agent.py ফাইলটি খুলুন এবং খুলুন। প্রথম ইম্পোর্ট লাইনের পরে ফাইলটিতে টুল স্নিপেট যোগ করুন:

import os
import redis

REDIS_IP = os.environ.get('REDIS_HOST', 'localhost')
r = redis.Redis(host=REDIS_IP, port=6379, decode_responses=True)

def lookup_schematic_tool(drive_name: str) -> list[str]:
    """Returns the ordered list of parts for a drive from local Redis."""
    
    # Logic to clean input like "TARGET: X" -> "X"
    clean_name = drive_name.replace("TARGET:", "").replace("TARGET", "").strip()
    clean_name = clean_name.replace(":", "").strip()
    
    # LRANGE gets all items in the list (index 0 to -1)
    result = r.lrange(clean_name, 0, -1)
    
    if not result:
        print(f"[ARCHITECT] Error: Drive ID '{clean_name}' not found in Redis.")
        return ["ERROR: Drive ID not found."]
    
    print(f"[ARCHITECT] Returning schematic for {clean_name}: {result}")
    return result

👉✏️ root_agent সংজ্ঞায় সম্পূর্ণ নির্দেশিকা লাইনটি নিম্নলিখিতটি দিয়ে প্রতিস্থাপন করুন এবং আমরা আগে যে টুলটি সংজ্ঞায়িত করেছি তাও যোগ করুন:

    instruction='''SYSTEM ROLE: Database API.
    INPUT: Text string (Drive Name).
    TASK: Run `lookup_schematic_tool`.
    OUTPUT: Return ONLY the raw list from the tool.
    CONSTRAINT: Do NOT add conversational text.
    ''',
    tools=[lookup_schematic_tool],

ADK এর সুবিধা

আর্কিটেক্ট অনলাইনের মাধ্যমে, এখন আমাদের কাছে সত্যের উৎস রয়েছে। আমরা এটিকে প্রাথমিক এজেন্টের সাথে সংযুক্ত করার আগে, এজেন্ট ডেভেলপমেন্ট কিট (ADK) AI এজেন্ট তৈরি এবং পরীক্ষার জটিলতাগুলিকে সহজ করে একটি উল্লেখযোগ্য সুবিধা প্রদান করে। এর অন্তর্নির্মিত adk web ডেভেলপার কনসোলের সাহায্যে, আমরা আমাদের Architect Agent কার্যকারিতা, বিশেষ করে এর টুল-কলিং ক্ষমতাগুলিকে, বৃহত্তর মাল্টি-এজেন্ট সিস্টেমে একীভূত করার আগে আলাদা করতে এবং যাচাই করতে পারি। শক্তিশালী এবং নির্ভরযোগ্য AI অ্যাপ্লিকেশন তৈরির জন্য উন্নয়ন এবং পরীক্ষার এই মডুলার পদ্ধতি অত্যন্ত গুরুত্বপূর্ণ।

👉💻 আপনার টার্মিনালে, চালান:

cd $HOME/way-back-home/level_4/
. scripts/check_redis.sh
cd $HOME/way-back-home/level_4/backend/
uv run adk web

👀 অপেক্ষা করুন যতক্ষণ না আপনি দেখতে পান:

+-----------------------------------------------------------------------------+
| ADK Web Server started                                                      |
|                                                                             |
| For local testing, access at http://127.0.0.1:8000.                         |
+-----------------------------------------------------------------------------+

INFO:     Application startup complete.
INFO:     Uvicorn running on http://127.0.0.1:8000 (Press CTRL+C to quit)
  • ক্লাউড শেল টুলবারে ওয়েব প্রিভিউ আইকনে ক্লিক করুন। Change port নির্বাচন করুন, এটি 8000 এ সেট করুন, এবং Change and Preview এ ক্লিক করুন। *ওয়েব-প্রিভিউ
  • architect_agent নির্বাচন করুন।
  • টুলটি ট্রিগার করুন: চ্যাট ইন্টারফেসে, টাইপ করুন: CHRONOS-ALPHA (অথবা স্কিম্যাটিক ডাটাবেস থেকে যেকোনো ড্রাইভ আইডি)।
  • আচরণ পর্যবেক্ষণ করুন:
    • স্থপতির অবিলম্বে lookup_schematic_tool টি ট্রিগার করা উচিত।
    • আমাদের কঠোর সিস্টেম নির্দেশাবলীর কারণে, এটি কোনও কথোপকথনমূলক ফিলার ছাড়াই কেবল যন্ত্রাংশের তালিকা (যেমন, ['Shield Emitter', 'Data Crystal', 'Quantum Cell'] ) ফেরত দেবে।
  • লগগুলি যাচাই করুন: আপনার টার্মিনাল উইন্ডোটি দেখুন। আপনি সফলভাবে কার্যকর করার লগটি দেখতে পাবেন: [ARCHITECT] Returning schematic for CHRONOS-ALPHA: ['Shield Emitter', 'Data Crystal', 'Quantum Cell'] !(architect_agent adk)[img/03-02-adkweb.png]

যদি আপনি টুল এক্সিকিউশন লগ এবং ক্লিন ডেটা রেসপন্স দেখতে পান, তাহলে বুঝতে হবে আপনার বিশেষজ্ঞ এজেন্ট ঠিকভাবে কাজ করছে। এটি অনুরোধ প্রক্রিয়া করতে পারে, ভল্টে কোয়েরি করতে পারে এবং স্ট্রাকচার্ড ডেটা ফেরত দিতে পারে।

👉💻 বেরিয়ে আসতে Ctrl+C টিপুন।

A2A সার্ভারটি আরম্ভ করুন

ডিসপ্যাচ এজেন্টকে আর্কিটেক্টের সাথে সংযুক্ত করতে, আমরা এজেন্ট-টু-এজেন্ট (A2A) প্রোটোকল ব্যবহার করি।

MCP (মডেল কনটেক্সট প্রোটোকল) এর মতো প্রোটোকলগুলি এজেন্টদের সাথে টুলের সংযোগ স্থাপনের উপর জোর দেয়, A2A এজেন্টদের অন্যান্য এজেন্টদের সাথে সংযোগ স্থাপনের উপর জোর দেয়। এটি এমন একটি মান যা আমাদের ডিসপ্যাচারকে স্থপতিকে "আবিষ্কার" করতে এবং স্কিম্যাটিক্স অনুসন্ধান করার ক্ষমতা বুঝতে সাহায্য করে।

A2A সম্পর্কে

A2A প্রবাহ: এই মিশনে, আমরা একটি ক্লায়েন্ট-সার্ভার মডেল ব্যবহার করি:

  1. সার্ভার (স্থপতি): ডাটাবেস টুল হোস্ট করে এবং এজেন্ট কার্ডের মাধ্যমে তার দক্ষতার "বিজ্ঞাপন" দেয়।
  2. ক্লায়েন্ট (প্রেরণ): স্থপতির কার্ড পড়ে, এর API বোঝে এবং একটি স্কিম্যাটিক অনুরোধ পাঠায়।

এজেন্ট কার্ড কী?

এজেন্ট কার্ডটিকে একটি ডিজিটাল বিজনেস কার্ড অথবা একটি AI এর জন্য "ড্রাইভার্স লাইসেন্স" হিসেবে ভাবুন। যখন একটি A2A সার্ভার শুরু হয়, তখন এটি এই JSON অবজেক্টটি প্রকাশ করে যার মধ্যে রয়েছে:

  • পরিচয়: এজেন্টের নাম ( architect_agent ) এবং আইডি।
  • বর্ণনা: এটি কী করে তার একটি মানব-এবং-যন্ত্র-পঠনযোগ্য সারাংশ ("সিস্টেম ভূমিকা: ডাটাবেস API...")।
  • ইন্টারফেস: এটি প্রত্যাশিত নির্দিষ্ট ইনপুট কী ( drive_name ) এবং আউটপুট ফর্ম্যাট।

এই কার্ড ছাড়া, ডিসপ্যাচ এজেন্ট অন্ধভাবে কাজ করবে, স্থপতির সাথে কীভাবে যোগাযোগ করবে তা অনুমান করবে।

সার্ভার কোড তৈরি করুন

👉✏️ আপনার এডিটরে, $HOME/way-back-home/level_4/backend/architect_agent ডিরেক্টরির অধীনে, server.py নামে একটি ফাইল তৈরি করুন এবং নিম্নলিখিত কোডটি পেস্ট করুন:

from google.adk.a2a.utils.agent_to_a2a import to_a2a
from agent import root_agent
import os
import logging
import json
from dotenv import load_dotenv

load_dotenv()

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("architect_server")
HOST= os.environ.get("HOST_URL","localhost")
PROTOCOL= os.environ.get("PROTOCOL","http")
PORT= os.environ.get("A2A_PORT",8081)

# 1. Create the A2A App (Handles Agent Card & HTTP)
# This middleware automatically sets up the /a2a/v1/... endpoints
app = to_a2a(root_agent, host=HOST, port=PORT, protocol=PROTOCOL)

if __name__ == "__main__":
    import uvicorn
    # Use 0.0.0.0 to allow external access if needed, port 8080 as standard
    uvicorn.run(app, host='0.0.0.0', port=8081)

👉💻 আপনার টার্মিনালে ফিরে, ফোল্ডারে নেভিগেট করুন এবং সার্ভারটি শুরু করুন:

cd $HOME/way-back-home/level_4/
. scripts/check_redis.sh
cd $HOME/way-back-home/level_4/backend/architect_agent
uv run server.py

👀 A2A সার্ভার শুরু হচ্ছে কিনা তা নিশ্চিত করুন:

INFO:     Waiting for application startup.
INFO:     Application startup complete.
INFO:     Uvicorn running on http://0.0.0.0:8081 (Press CTRL+C to quit)

এজেন্ট কার্ড যাচাই করুন

একটি নতুন টার্মিনাল ট্যাব খুলুন ( + আইকনে ক্লিক করুন)। আমরা যাচাই করব যে স্থপতি তার এজেন্ট কার্ডটি ম্যানুয়ালি এনে তার পরিচয় সঠিকভাবে সম্প্রচার করছে কিনা।

👉💻 নিম্নলিখিত কমান্ডটি চালান:

curl -s http://localhost:8081/.well-known/agent.json | jq .

👀 আপনি একটি JSON প্রতিক্রিয়া দেখতে পাবেন। আউটপুটে description ক্ষেত্রটি সন্ধান করুন। এটি আপনার এজেন্টকে আগে দেওয়া নির্দেশের সাথে মিলবে ( "SYSTEM ROLE: Database API..." )।

{
  "capabilities": {},
  "defaultInputModes": [
    "text/plain"
  ],
  "defaultOutputModes": [
    "text/plain"
  ],
  "description": "A helpful assistant for user questions.",
  "name": "root_agent",
  "preferredTransport": "JSONRPC",
  "protocolVersion": "0.3.0",
  "skills": [
    {
      "description": "A helpful assistant for user questions. SYSTEM ROLE: Database API.\n    INPUT: Text string (Drive Name).\n    TASK: Run `lookup_schematic_tool`.\n    OUTPUT: Return ONLY the raw list from the tool.\n    CONSTRAINT: Do NOT add conversational text.\n    ",
      "examples": [],
      "id": "root_agent",
      "name": "model",
      "tags": [
        "llm"
      ]
    },
    {
      "description": "Returns the ordered list of parts for a drive from local Redis.",
      "id": "root_agent-lookup_schematic_tool",
      "name": "lookup_schematic_tool",
      "tags": [
        "llm",
        "tools"
      ]
    }
  ],
  "supportsAuthenticatedExtendedCard": false,
  "url": "http://localhost:8081",
  "version": "0.0.1"
}

যদি আপনি এই JSON টি দেখেন, তাহলে বুঝতে হবে যে আর্কিটেক্টটি সক্রিয়, A2A প্রোটোকল সক্রিয়, এবং এজেন্ট কার্ডটি ডিসপ্যাচার দ্বারা আবিষ্কারের জন্য প্রস্তুত।

এখন যেহেতু স্থপতিটি একটি দূরবর্তী সম্পদ হিসেবে কাজ করার জন্য প্রস্তুত, আমরা এটিকে Dispatch Agent- এ সংযুক্ত করতে পারি।

👉💻 A2A সার্ভার থেকে বেরিয়ে আসতে Ctrl+C টিপুন।

৪. বিডিআই-স্ট্রিমস এজেন্টকে রিমোট এজেন্ট এবং স্ট্রিমিং টুলের সাথে সংযুক্ত করা

এখন আপনি লাইভ ডেটা এবং রিমোট আর্কিটেক্টের মধ্যে ব্যবধান পূরণ করার জন্য প্রাথমিক যোগাযোগ হাবটি কনফিগার করবেন। এই সংযোগের জন্য একটি উচ্চ-ব্যান্ডউইথ, কম-লেটেন্সি পাইপলাইন প্রয়োজন যাতে অ্যাসেম্বলি বেঞ্চটি অপারেশন চলাকালীন স্থিতিশীল থাকে।

দ্বি-মুখী স্ট্রিমিং (লাইভ) এজেন্টদের বোঝা

ADK-তে দ্বিমুখী (Bidi) স্ট্রিমিং AI এজেন্টদের জন্য Gemini Live API-এর কম-বিলম্বিত, দ্বি-মুখী ভয়েস এবং ভিডিও ইন্টারঅ্যাকশন ক্ষমতা যোগ করে। এটি ঐতিহ্যবাহী AI ইন্টারঅ্যাকশন থেকে একটি মৌলিক পরিবর্তনের প্রতিনিধিত্ব করে। "জিজ্ঞাসা করুন এবং অপেক্ষা করুন" প্যাটার্নের পরিবর্তে, এটি রিয়েল-টাইম, দ্বি-মুখী যোগাযোগ সক্ষম করে যেখানে মানুষ এবং AI উভয়ই একই সাথে কথা বলতে, শুনতে এবং প্রতিক্রিয়া জানাতে পারে।

ইমেল পাঠানো এবং ফোনে কথোপকথনের মধ্যে পার্থক্যটা ভাবুন। ঐতিহ্যবাহী এজেন্টদের সাথে কথোপকথন ইমেলের মতো: আপনি একটি সম্পূর্ণ বার্তা পাঠান, একটি সম্পূর্ণ প্রতিক্রিয়ার জন্য অপেক্ষা করুন এবং তারপরে আরেকটি পাঠান। বিডি-স্ট্রিমিং একটি ফোন কথোপকথনের মতো: তরল, স্বাভাবিক, রিয়েল-টাইমে বাধা দেওয়ার, স্পষ্ট করার এবং প্রতিক্রিয়া জানানোর ক্ষমতা সহ।

মূল বৈশিষ্ট্য:

  • দ্বিমুখী যোগাযোগ: সম্পূর্ণ প্রতিক্রিয়ার জন্য অপেক্ষা না করেই অবিচ্ছিন্ন ডেটা আদান-প্রদান। ব্যবহারকারীর কথা বলা শেষ হয়েছে তা শনাক্ত করার সাথে সাথেই AI প্রতিক্রিয়া জানায়।
  • প্রতিক্রিয়াশীল বাধা: ব্যবহারকারীরা এজেন্টকে প্রতিক্রিয়ার মাঝখানে নতুন ইনপুট দিয়ে বাধা দিতে পারেন, ঠিক যেমনটি মানুষের কথোপকথনে হয়। যদি কোনও AI কোনও জটিল পদক্ষেপ ব্যাখ্যা করে এবং আপনি বলেন, "অপেক্ষা করুন, এটি পুনরাবৃত্তি করুন," AI তৎক্ষণাৎ থামে এবং আপনার বাধার সমাধান করে।
  • মাল্টিমোডালিটির জন্য অপ্টিমাইজ করা: বিডি-স্ট্রিমিং একই সাথে বিভিন্ন ধরণের ইনপুট প্রক্রিয়াকরণে উৎকৃষ্ট। আপনি এজেন্টের সাথে কথা বলতে পারেন এবং ভিডিওর মাধ্যমে তাকে এলিয়েন অংশগুলি দেখাতে পারেন এবং এটি উভয় স্ট্রিমকে একটি একক, একীভূত সংযোগে প্রক্রিয়া করে।

জীবনচক্র

👀 ক্লায়েন্ট লজিক বাস্তবায়নের আগে, আসুন ডিসপ্যাচ এজেন্টের জন্য পূর্বে তৈরি কঙ্কালটি পরীক্ষা করি। এই এজেন্ট ভয়েস এবং ভিডিওর মাধ্যমে ব্যবহারকারীর সাথে যোগাযোগ করবে এবং আর্কিটেক্ট এজেন্টের কাছে প্রশ্নগুলি অর্পণ করবে।

__init__.py
agent.py
hazard_db.py
  • agent.py : এটি হল "Brain"। বর্তমানে এটিতে একটি মৌলিক বিডি-স্ট্রিমিং সেটআপ রয়েছে। আমরা এই ফাইলটি পরিবর্তন করে A2A ক্লায়েন্ট লজিক যোগ করব যাতে এটি আর্কিটেক্টের সাথে যোগাযোগ করতে পারে।
  • hazard_db.py : এটি ডিসপ্যাচ এজেন্টের জন্য নির্দিষ্ট একটি স্থানীয় টুল, যাতে সুরক্ষা প্রোটোকল রয়েছে। এটি আর্কিটেক্টের স্কিম্যাটিক ডাটাবেস থেকে আলাদা।

A2A ক্লায়েন্ট বাস্তবায়ন

ডিসপ্যাচ এজেন্টকে আমাদের রিমোট আর্কিটেক্টের সাথে যোগাযোগ করার অনুমতি দেওয়ার জন্য, আমাদের একটি রিমোট A2A এজেন্ট সংজ্ঞায়িত করতে হবে। এটি ডিসপ্যাচ এজেন্টকে বলে যে আর্কিটেক্ট কোথায় পাবেন এবং এর "এজেন্ট কার্ড" কেমন দেখাচ্ছে।

A2A ক্লায়েন্ট

👉✏️ $HOME/way-back-home/level_4/backend/dispatch_agent/agent.py তে #REPLACE-REMOTEA2AAGENT পরিবর্তে নিম্নলিখিতটি ব্যবহার করুন:

architect_agent = RemoteA2aAgent(
    name="execute_architect",
    description="[SILENT ACTION]: Retrieves the REQUIRED SUBSET of parts. The screen shows a full inventory; this tool filters out the wrong parts. Must be called INSTANTLY when a Target Name is found. Input: Target Name.",
    agent_card=(f"{ARCHITECT_URL}{AGENT_CARD_WELL_KNOWN_PATH}"),
    httpx_client=insecure_client,
)

স্ট্রিমিং টুল কিভাবে কাজ করে

পূর্ববর্তী এজেন্টের ক্ষেত্রে, টুলগুলি একটি আদর্শ "অনুরোধ-প্রতিক্রিয়া" প্যাটার্ন অনুসরণ করত, এজেন্ট একটি প্রশ্ন জিজ্ঞাসা করে, টুলটি একটি উত্তর প্রদান করে এবং মিথস্ক্রিয়া শেষ হয়। তবে, Ozymandias-এ, বিপদগুলি আপনার জিজ্ঞাসা করার জন্য অপেক্ষা করে না যে তারা উপস্থিত আছে কিনা। এর জন্য, আপনার একটি স্ট্রিমিং টুল প্রয়োজন।

স্ট্রিমিং টুল ফ্লো

স্ট্রিমিং টুলগুলি ফাংশনগুলিকে রিয়েল-টাইমে এজেন্টের কাছে মধ্যবর্তী ফলাফল স্ট্রিম করার অনুমতি দেয়, যা এজেন্টকে পরিবর্তনের সাথে সাথে প্রতিক্রিয়া জানাতে সক্ষম করে। সাধারণ ব্যবহারের ক্ষেত্রে স্টকের দামের ওঠানামা পর্যবেক্ষণ করা বা, আমাদের ক্ষেত্রে, অবস্থার পরিবর্তনের জন্য একটি লাইভ ভিডিও স্ট্রিম পর্যবেক্ষণ করা অন্তর্ভুক্ত।

স্ট্যান্ডার্ড টুলের বিপরীতে, একটি স্ট্রিমিং টুল হল একটি অ্যাসিঙ্ক্রোনাস ফাংশন যা একটি AsyncGenerator হিসেবে কাজ করে। এর অর্থ হল একটি একক মান -ing return পরিবর্তে, এটি সময়ের সাথে সাথে -s একাধিক আপডেট yield

ADK-তে একটি স্ট্রিমিং টুল সংজ্ঞায়িত করতে, আপনাকে এই প্রযুক্তিগত প্রয়োজনীয়তাগুলি মেনে চলতে হবে:

  1. অ্যাসিঙ্ক্রোনাস ফাংশন: টুলটি অবশ্যই async def দিয়ে সংজ্ঞায়িত করতে হবে।
  2. AsyncGenerator রিটার্ন টাইপ: একটি AsyncGenerator রিটার্ন করতে ফাংশনটি টাইপ করতে হবে। প্রথম প্যারামিটারটি হল কোন ধরণের ডেটা পাওয়া যাচ্ছে (যেমন, str ), এবং দ্বিতীয়টি সাধারণত None
  3. ইনপুট স্ট্রিম: আমরা ভিডিও স্ট্রিমিং টুল ব্যবহার করি। এই মোডে, প্রকৃত ভিডিও/অডিও স্ট্রিম ( LiveRequestQueue ) সরাসরি ফাংশনে পাস করা হয়, যার ফলে টুলটি এজেন্ট যে ফ্রেমগুলি দেখে তা "দেখতে" পারে।

একটি স্ট্রিমিং টুলকে সেন্টিনেল ভাবুন। আপনি এবং ডিসপ্যাচ এজেন্ট যখন ব্লুপ্রিন্ট নিয়ে আলোচনা করছেন, তখন সেন্টিনেল ব্যাকগ্রাউন্ডে চলছে, আপনার নিরাপত্তা নিশ্চিত করার জন্য নীরবে প্রতিটি ভিডিও ফ্রেম প্রক্রিয়া করছে।

স্ট্রিমিং টুল

ব্যাকগ্রাউন্ড মনিটরিং টুল বাস্তবায়ন করা

আমরা এখন monitor_for_hazard টুলটি বাস্তবায়ন করব। এই টুলটি input_stream (ভিডিও ফ্রেম) গ্রহণ করবে, একটি পৃথক, হালকা ভিশন কল ব্যবহার করে তাদের বিশ্লেষণ করবে এবং শুধুমাত্র যখন কোনও বিপদ সনাক্ত করা হবে তখনই একটি সতর্কতা yield

👉✏️ $HOME/way-back-home/level_4/backend/dispatch_agent/agent.py তে, #REPLACE_MONITOR_HAZARD নিম্নলিখিত যুক্তি দিয়ে প্রতিস্থাপন করুন:

async def monitor_for_hazard(
    input_stream: LiveRequestQueue,
):
  """Monitor if any part is glowing"""
  print("start monitor_video_stream!")
  client = Client()
  prompt_text = (
      "Monitor the left menu if you see any glowing part, detect it's name"
  )
  last_count = None

  while True:
    last_valid_req = None
    print("Monitoring loop cycle")
    
    # use this loop to pull the latest images and discard the old ones
    # Process only the current batch of events
    while input_stream._queue.qsize() != 0:
      live_req = await input_stream.get()

      if live_req.blob is not None and live_req.blob.mime_type == "image/jpeg":
        # Consumed by Monitor (Eyes)
        # Deepcopy to ensure we detach from any referenced object before potential reuse/gc
        # last_valid_req = deepcopy(live_req)
        last_valid_req = live_req

    # If we found a valid image, process it
    if last_valid_req is not None:
      print("Processing the most recent frame from the queue")

      # Create an image part using the blob's data and mime type
      image_part = genai_types.Part.from_bytes(
          data=last_valid_req.blob.data, mime_type=last_valid_req.blob.mime_type
      )

      contents = genai_types.Content(
          role="user",
          parts=[image_part, genai_types.Part.from_text(text=prompt_text)],
      )


      # Call the model to generate content based on the provided image and prompt
      try:
          response = await client.aio.models.generate_content(
              model="gemini-2.5-flash",
              contents=contents,
              config=genai_types.GenerateContentConfig(
                  system_instruction=(
                      "Focus strictly on the far-left vertical column under the heading 'PARTS REPLICATOR.' "
                      "Ignore the center of the screen and the 'BLUEPRINT' area entirely. "
                      "Look only at the list containing"
                      "Identify if any item in this specific left-side list has a bright white border glow and the text 'HAZARD DETECTED' overlaying it. "
                      "If found, return ONLY the part name in ALL CAPS. If no part in that leftmost list is glowing, return nothing."
                  )
              ),
          )
      except Exception as e:
          print(f"Error calling Gemini: {e}")
          await asyncio.sleep(1)
          continue
      print("Gemini response received.response:", response.candidates[0].content.parts[0].text)

      current_text = response.candidates[0].content.parts[0].text.strip()
      
      # If we have a logical change (and it's not just empty)
      if current_text and current_text != last_count:
        # Ignore "Nothing." response from model
        if current_text == "Nothing." or "I cannot fulfill" in current_text:
            print(f"Model sees nothing or refused. Skipping alert.")
            last_count = current_text
            continue

        print(f"New hazard detected: {current_text} (was: {last_count})")
        last_count = current_text
        
        part_name = current_text
        color = lookup_part_safety(part_name)
        yield f"Hazard detected place {part_name} to the {color} bin"
      
      # Update last_count even if it's empty, so we can detect when it reappears? 
      # Actually if it goes from "DATA CRYSTAL" to "" (nothing), we probably just silence.
      # But if we don't update last_count on empty, we won't re-trigger if "DATA CRYSTAL" stays "DATA CRYSTAL".
      # The user wants to detect hazards. 
      # If current_text is empty, we should probably update last_count to empty so next valid one triggers.
      if not current_text:
          last_count = None
        
    else:
        print("No valid frame found, skipping processing.")
        
    await asyncio.sleep(5)

ডিসপ্যাচ এজেন্ট বাস্তবায়ন

ডিসপ্যাচ এজেন্ট হল আপনার প্রাথমিক ইন্টারফেস এবং অর্কেস্ট্রেটর। যেহেতু এটি বিডি-স্ট্রিমিং লিঙ্ক (আপনার লাইভ ভয়েস এবং ভিডিও) পরিচালনা করে, তাই এটিকে সর্বদা কথোপকথনের নিয়ন্ত্রণ রাখতে হবে। এটি অর্জনের জন্য, আমরা একটি নির্দিষ্ট ADK বৈশিষ্ট্য ব্যবহার করব: Agent-as-a-Tool

ধারণা: এজেন্ট-অ্যাজ-এ-টুল বনাম সাব-এজেন্ট

মাল্টি-এজেন্ট সিস্টেম তৈরি করার সময়, আপনাকে সিদ্ধান্ত নিতে হবে যে কীভাবে দায়িত্ব ভাগাভাগি করা হবে। আমাদের উদ্ধার অভিযানে, পার্থক্যটি অত্যন্ত গুরুত্বপূর্ণ:

  • এজেন্ট-অ্যাজ-এ-টুল: আমাদের বিডি-স্ট্রিমিং হাবের জন্য এটি প্রস্তাবিত পদ্ধতি। যখন ডিসপ্যাচ এজেন্ট (এজেন্ট এ) আর্কিটেক্ট এজেন্ট (এজেন্ট বি) কে একটি টুল হিসেবে ডাকে, তখন আর্কিটেক্টের ডেটা ডিসপ্যাচে ফেরত পাঠানো হয়। ডিসপ্যাচ তখন সেই ডেটা ব্যাখ্যা করে এবং আপনার জন্য একটি প্রতিক্রিয়া তৈরি করে। ডিসপ্যাচ নিয়ন্ত্রণে থাকে এবং পরবর্তী সমস্ত ব্যবহারকারীর ইনপুট পরিচালনা করতে থাকে।
  • সাব-এজেন্ট: একটি সাব-এজেন্ট সম্পর্কের ক্ষেত্রে, দায়িত্ব সম্পূর্ণরূপে স্থানান্তরিত হয়। যদি ডিসপ্যাচ আপনাকে সাব-এজেন্ট হিসেবে আর্কিটেক্টের কাছে হস্তান্তর করে, তাহলে আপনি সরাসরি এমন একটি ডাটাবেস এপিআই-এর সাথে কথা বলবেন যার কোনও "দৃষ্টি" নেই এবং কোনও কথোপকথন দক্ষতা নেই। প্রাথমিক এজেন্ট (ডিসপ্যাচ) কার্যকরভাবে লুপের বাইরে থাকবে।

নিয়ন্ত্রণ

Agent-as-a-Tool ব্যবহার করে, আমরা বিড়ি-স্ট্রিমিং এজেন্টের তরল, মানুষের মতো মিথস্ক্রিয়া বজায় রেখে স্থপতির বিশেষ জ্ঞানকে কাজে লাগাই।

রাউটিং লজিক কোডিং

আমরা এখন আমাদের architect_agent একটি AgentTool এ মুড়ে দেব এবং Dispatch এজেন্টকে একটি "Logic Map" প্রদান করব। এই মানচিত্রটি এজেন্টকে ঠিক বলে দেয় কখন ভল্ট থেকে ডেটা আনতে হবে এবং কখন ব্যাকগ্রাউন্ড সেন্টিনেল থেকে প্রাপ্ত তথ্য রিপোর্ট করতে হবে।

ডিসপ্যাচকে এমন "চোখ" দিতে যা কখনও পলক ফেলে না, আমাদের অবশ্যই এটিকে পূর্ববর্তী ধাপে তৈরি স্ট্রিমিং টুলে অ্যাক্সেস দিতে হবে।

ADK-তে, যখন আপনি tools তালিকায় একটি AsyncGenerator ফাংশন (যেমন monitor_for_hazard ) যোগ করেন, তখন এজেন্ট এটিকে একটি স্থায়ী ব্যাকগ্রাউন্ড প্রক্রিয়া হিসেবে বিবেচনা করে। এককালীন এক্সিকিউশনের পরিবর্তে, এজেন্ট টুলের আউটপুটে "সাবস্ক্রাইব" করে। এটি Dispatch-কে তার প্রাথমিক কথোপকথন চালিয়ে যেতে দেয় যখন Sentinel নীরবে ব্যাকগ্রাউন্ডে বিপদ সতর্কতা প্রদান করে।

👉✏️ $HOME/way-back-home/level_4/backend/dispatch_agent/agent.py তে #REPLACE_AGENT_TOOLS পরিবর্তে নিম্নলিখিতটি লিখুন:

tools=[AgentTool(agent=architect_agent), monitor_for_hazard],    

যাচাইকরণ

👉💻 উভয় এজেন্ট কনফিগার করার মাধ্যমে, আমরা লাইভ মাল্টি-এজেন্ট ইন্টারঅ্যাকশন পরীক্ষা করতে পারি।

  • টার্মিনাল A তে, আর্কিটেক্ট এজেন্ট শুরু করুন:
cd $HOME/way-back-home/level_4/
. scripts/check_redis.sh
cd $HOME/way-back-home/level_4/backend/architect_agent
uv run server.py
  • একটি নতুন টার্মিনালে (টার্মিনাল B), ডিসপ্যাচ এজেন্ট চালান:
cd $HOME/way-back-home/level_4/backend/
cp architect_agent/.env .env
uv run adk web

adk web সিমুলেটরের মধ্যে gemini-live মতো রিয়েল-টাইম, মাল্টিমোডাল মডেল ব্যবহার করে এমন একটি মাল্টি-এজেন্ট সিস্টেম পরীক্ষা করার জন্য একটি নির্দিষ্ট ওয়ার্কফ্লো জড়িত। সিমুলেটরটি টুল কল পরিদর্শনের জন্য চমৎকার কিন্তু এই ধরণের মডেলের সাথে প্রথমবার ছবি প্রক্রিয়াকরণের সময় একটি পরিচিত অসঙ্গতি রয়েছে।

  • ক্লাউড শেল টুলবারে ওয়েব প্রিভিউ আইকনে ক্লিক করুন। Change port নির্বাচন করুন, এটি 8000 এ সেট করুন, এবং Change and Preview এ ক্লিক করুন।

👉 dispatch_agent নির্বাচন করুন এবং ব্লুপ্রিন্ট আপলোড করুন এবং প্রত্যাশিত ত্রুটিটি পরিচালনা করুন

এটি সবচেয়ে গুরুত্বপূর্ণ পদক্ষেপ। আমাদের এজেন্টকে ছবির প্রসঙ্গ প্রদান করতে হবে।

  • ইন্টারফেস লোড হলে, অনুরোধ করা হলে এটিকে আপনার মাইক্রোফোন অ্যাক্সেস করার অনুমতি দিন
  • এই ব্লুপ্রিন্ট ছবিটি আপনার কম্পিউটারে ডাউনলোড করুন: ব্লুপ্রিন্ট নমুনা
  • adk web ইন্টারফেসে, পেপারক্লিপ আইকনে ক্লিক করুন এবং আপনার ডাউনলোড করা ব্লুপ্রিন্ট ছবিটি আপলোড করুন। ফাইল যোগ করুন

⚠️⚠️আপনি একটি 400 INVALID_ARGUMENT ত্রুটি দেখতে পাবেন। এটি প্রত্যাশিত।⚠️⚠️

প্রত্যাশিত ত্রুটি বার্তা

এই ত্রুটিটি ঘটে কারণ adk web ইমেজ হ্যান্ডলারটি একবার আপলোডের জন্য gemini-live মডেলের API-এর সাথে সম্পূর্ণরূপে সামঞ্জস্যপূর্ণ নয়। তবে, ছবিটি সফলভাবে সেশন প্রসঙ্গে যোগ করা হয়েছে

  • 👉 ত্রুটিটি দূর করতে, কেবল ব্রাউজার পৃষ্ঠাটি পুনরায় লোড করুন

সমাবেশ প্রক্রিয়া শুরু করুন

👉 রিলোড করার পর, ত্রুটিটি চলে যাবে এবং আপনি চ্যাট ইতিহাসে ব্লুপ্রিন্ট চিত্রটি দেখতে পাবেন। এজেন্টের এখন প্রয়োজনীয় ভিজ্যুয়াল প্রসঙ্গ রয়েছে।

  • মাইক্রোফোন আইকনে ক্লিক করে এটি চালু করুন। ইন্টারফেসে "Listening..." দেখাবে।
  • ভয়েস কমান্ডটি বলুন: "একত্রিত হতে শুরু করুন"
  • এজেন্ট আপনার অনুরোধটি প্রক্রিয়া করবে এবং UI "কথা বলছে..." তে পরিবর্তিত হবে। আপনি প্রয়োজনীয় অংশগুলি তালিকাভুক্ত করে একটি অডিও-কেবল প্রতিক্রিয়া শুনতে পাবেন।

এজেন্টের বক্তব্যের প্রতিক্রিয়া

৪. এজেন্ট-টু-এজেন্ট টুল কল যাচাই করুন

👉 প্রাথমিক অডিও প্রতিক্রিয়া নিশ্চিত করে যে সিস্টেমটি কাজ করছে, কিন্তু আসল জাদুটি মাল্টি-এজেন্ট যোগাযোগের ট্রেসে।

  • মাইক্রোফোন বন্ধ করুন।
  • পৃষ্ঠাটি আরও একবার রিফ্রেশ করুন।

বাম দিকের "ট্রেস" প্যানেলটি এখন পূর্ণ হবে। আপনি সম্পূর্ণ, সফল সম্পাদন প্রবাহ দেখতে পাবেন:

  • dispatch_agent প্রথমে monitor_for_hazard কল করে।
  • তারপর, এটি স্কিম্যাটিক ডেটা পুনরুদ্ধারের জন্য architect_agent কে একাধিক execute_architect কল করে।

টুল কল যাচাইকরণ

এই ক্রমটি নিশ্চিত করে যে সম্পূর্ণ মাল্টি-এজেন্ট ওয়ার্কফ্লো সঠিকভাবে কাজ করছে: dispatch_agent অনুরোধটি গ্রহণ করেছে, একটি টুল কলের মাধ্যমে architect_agent কে ডেটা পুনরুদ্ধারের কাজটি অর্পণ করেছে এবং ব্যবহারকারীর কমান্ড পূরণ করার জন্য ডেটা ফেরত পেয়েছে।

আপনার বিডি-স্ট্রিমিং লিঙ্কটি এখন ব্যাকগ্রাউন্ড মনিটরিং এবং মাল্টি-এজেন্ট সহযোগিতা করতে সক্ষম। এরপর, আমরা শিখব কিভাবে ফ্রন্টএন্ডে এই জটিল প্রতিক্রিয়াগুলিকে বিশ্লেষণ করতে হয়।

👉💻 প্রস্থান করতে উভয় টার্মিনালে Ctrl+c টিপুন।

৫. লাইভ মাল্টিমোডাল ইভেন্ট স্ট্রিমগুলিতে গভীরভাবে ডুব দিন

পূর্ববর্তী ধাপে, আমরা বিল্ট-ইন ডেভেলপমেন্ট সার্ভার, adk web ব্যবহার করে আমাদের মাল্টি-এজেন্ট সিস্টেমটি সফলভাবে যাচাই করেছি। এই ইউটিলিটিটি সেশন, স্ট্রিম এবং এজেন্ট লাইফসাইকেল স্বয়ংক্রিয়ভাবে পরিচালনা করার জন্য একটি ডিফল্ট ADK রানার ব্যবহার করে। তবে, আমাদের FastAPI পরিষেবা ( main.py ) এর মতো একটি স্বতন্ত্র, উৎপাদন-প্রস্তুত অ্যাপ্লিকেশন তৈরি করতে, আমাদের স্পষ্ট নিয়ন্ত্রণ প্রয়োজন। লাইভ ব্যবহারকারী সেশন পরিচালনা করার জন্য আমাদের ADK রানার ম্যানুয়ালি তৈরি এবং পরিচালনা করতে হবে, কারণ এটি মূল উপাদান যা অডিও, ভিডিও এবং টেক্সটের জন্য দ্বিমুখী স্ট্রিম প্রক্রিয়া করে।

মডেল-কোড-মডেল লুপ

রিয়েল-টাইমে সিস্টেমটি কীভাবে কাজ করে তা বোঝার জন্য, আসুন একটি একক মিশন সেশনের জীবনচক্র অনুসরণ করি। এই লুপটি LlmRequest এবং LlmResponse অবজেক্টের ক্রমাগত বিনিময়কে প্রতিনিধিত্ব করে।

  1. ভিজ্যুয়াল লিঙ্ক: আপনি সংযোগটি শুরু করেন এবং আপনার ওয়েবক্যাম/স্ক্রিন ভাগ করেন। হাই-ফিডেলিটি JPEG ফ্রেমগুলি realtimeInput ( LiveRequestQueue ব্যবহার করে) এর মাধ্যমে আপস্ট্রিম প্রবাহিত হতে শুরু করে।
  2. সেন্টিনেল অ্যাক্টিভেশন: সিস্টেমটি একটি প্রাথমিক "হ্যালো" উদ্দীপনা পাঠায়। তার নির্দেশ অনুসারে, ডিসপ্যাচ এজেন্ট তাৎক্ষণিকভাবে monitor_for_hazard স্ট্রিমিং টুলটি ট্রিগার করে। এটি একটি ব্যাকগ্রাউন্ড লুপ শুরু করে যা নীরবে প্রতিটি ইনকামিং ফ্রেম পর্যবেক্ষণ করে।
  3. পাইলট কমান্ড: তুমি কমিক্সে বলো: "একত্রিত হতে শুরু করো।"
  4. ভোকাল আপস্ট্রিম: আপনার ভয়েস 16kHz অডিও হিসেবে ক্যাপচার করা হয় এবং ভিডিও ফ্রেমের পাশাপাশি আপস্ট্রিমে পাঠানো হয়।
  5. ডেলিগেশন (A2A): ডিসপ্যাচ আপনার উদ্দেশ্য "শুনে"। এটি বুঝতে পারে যে এতে স্কিম্যাটিক্সের অভাব রয়েছে, তাই এটি AgentTool (এজেন্ট-অ্যাজ-এ-টুল) প্রোটোকল ব্যবহার করে আর্কিটেক্ট এজেন্টকে কল করে।
  6. তথ্য পুনরুদ্ধার: আর্কিটেক্ট রেডিস ডাটাবেস অনুসন্ধান করে এবং অংশ তালিকাটি ডিসপ্যাচে ফেরত পাঠায়। ডিসপ্যাচ "সেশনের মাস্টার" হিসেবে রয়ে যায়, আপনাকে না দিয়েই ডেটা গ্রহণ করে।
  7. তথ্যবহুল ডাউনস্ট্রিম: ডিসপ্যাচ একটি modelTurn (ডাউনস্ট্রিম) পাঠায় যেখানে টেক্সট এবং নেটিভ অডিও উভয়ই থাকে: "আর্কিটেক্ট নিশ্চিত। প্রয়োজনীয় উপসেট হল: ওয়ার্প কোর, ফ্লাক্স পাইপ, আয়ন থ্রাস্টার।"
  8. সংকট: হঠাৎ, ওয়ার্কবেঞ্চের একটি অংশ অস্থিতিশীল হয়ে পড়ে এবং সাদা রঙের ঝলমলে ভাব শুরু করে।
  9. স্বায়ত্তশাসিত সনাক্তকরণ: ব্যাকগ্রাউন্ড monitor_for_hazard লুপ (সেন্টিনেল) গ্লো ধারণকারী নির্দিষ্ট JPEG ফ্রেমটি তুলে নেয়। এটি জেমিনিকে কল করে ফ্রেমটি প্রক্রিয়া করে এবং বিপদ সনাক্ত করে।
  10. সুরক্ষা ডাউনস্ট্রিম: স্ট্রিমিং টুলটি একটি ফলাফল yields । যেহেতু এটি একটি বিডি-স্ট্রিমিং এজেন্ট, ডিসপ্যাচ এর বর্তমান অবস্থা ব্যাহত করে অবিলম্বে একটি গুরুত্বপূর্ণ সুরক্ষা সতর্কতা ডাউনস্ট্রিম পাঠাতে পারে: "বিপদ সনাক্ত করা হয়েছে! এখনই ডেটা ক্রিস্টালকে নিরপেক্ষ করা হচ্ছে। এটি লাল বিনে সরান।"

প্রবাহ

এজেন্টের রানটাইম কনফিগারেশন সেট করা হচ্ছে

ADK-তে RunConfig এজেন্টের আচরণের বিস্তারিত কনফিগারেশনের অনুমতি দেয়, যার মধ্যে রয়েছে এটি কীভাবে স্ট্রিমিং ডেটা পরিচালনা করে এবং বিভিন্ন পদ্ধতির সাথে ইন্টারঅ্যাক্ট করে।

রিয়েল-টাইম, দ্বিমুখী যোগাযোগের জন্য streaming_mode BIDI তে সেট করা আছে, যা ব্যবহারকারী এবং এজেন্ট উভয়কেই একই সাথে কথা বলতে এবং শুনতে দেয়। response_modalities প্যারামিটার এজেন্ট কী ধরণের আউটপুট তৈরি করতে পারে তা নির্ধারণ করে, যেমন ভয়েস এবং টেক্সট। input_audio_transcription এজেন্ট কীভাবে ব্যবহারকারীর আগত বক্তৃতা প্রক্রিয়া এবং প্রতিলিপি করে তা কনফিগার করে। আরও স্থিতিস্থাপক অভিজ্ঞতা তৈরি করতে, session_resumption এজেন্টকে কথোপকথনের প্রসঙ্গ মনে রাখতে এবং সংযোগ বিচ্ছিন্ন হলে পুনরায় শুরু করতে সক্ষম করে। অবশেষে, proactivity এজেন্টকে সরাসরি ব্যবহারকারীর আদেশ ছাড়াই ক্রিয়া বা বক্তৃতা শুরু করতে দেয়, যেমন একটি স্বতঃস্ফূর্ত বিপদ সতর্কতা জারি করা, যখন enable_affective_dialog এজেন্টকে আরও স্বাভাবিক এবং সহানুভূতিশীল প্রতিক্রিয়া তৈরি করতে দেয়। আপনি ADK এর RunConfig সম্পর্কে আরও জানতে পারেন এখানে

👉✏️ আপনার $HOME/way-back-home/level_4/backend/main.py ফাইলে #REPLACE_RUN_CONFIG প্লেসহোল্ডারটি সনাক্ত করুন এবং নিম্নলিখিত বিচ্ছেদ লজিক দিয়ে এটি প্রতিস্থাপন করুন:

run_config = RunConfig(
            streaming_mode=StreamingMode.BIDI,
            response_modalities=response_modalities,
            input_audio_transcription=types.AudioTranscriptionConfig(),
            output_audio_transcription=types.AudioTranscriptionConfig(),
            session_resumption=types.SessionResumptionConfig(),
            proactivity=(
                types.ProactivityConfig(proactive_audio=True) if proactivity else None
            ),
            enable_affective_dialog=affective_dialog if affective_dialog else None,
        )

এজেন্টের কাছে অনুরোধ বাস্তবায়ন করা

এরপর, আমরা মূল যোগাযোগ আপলিংকটি বাস্তবায়ন করব যা ব্যবহারকারীর ভোলাটাইল ওয়ার্কবেঞ্চ থেকে ওয়েবসকেটের মাধ্যমে ডিসপ্যাচ এজেন্টে রিয়েল-টাইম, মাল্টিমোডাল ডেটা স্ট্রিম করে। এটি এজেন্টকে ক্রমাগত "দেখতে" (ভিডিও ফ্রেম) এবং "শুনতে" (ভয়েস কমান্ড) করতে সাহায্য করবে। লজিক ক্রমাগত ডেটা স্ট্রিম গ্রহণ করবে, ইনকামিং বাইনারি অডিও খণ্ড এবং JSON-র্যাপড টেক্সট/ইমেজ প্যাকেটের মধ্যে পার্থক্য করবে এবং এটিকে ব্লব (মাল্টিমিডিয়ার জন্য) বা কন্টেন্ট (টেক্সটের জন্য) অবজেক্টে এনক্যাপসুলেট করবে, দ্বিমুখী এজেন্ট সেশনকে শক্তিশালী করার জন্য এটিকে LiveRequestQueue-তে পাঠাবে।

বিডি

আপনার $HOME/way-back-home/level_4/backend/main.py ফাইলে #PROCESS_AGENT_REQUEST প্লেসহোল্ডারটি সনাক্ত করুন এবং এটি নিম্নলিখিত বিচ্ছেদ লজিক দিয়ে প্রতিস্থাপন করুন:

# Start the loop
        try:
            while True:
                # Receive message from WebSocket (text or binary)
                message = await websocket.receive()

                # Handle binary frames (audio data)
                if "bytes" in message:
                    audio_data = message["bytes"]
                    audio_blob = types.Blob(
                        mime_type="audio/pcm;rate=16000", data=audio_data
                    )
                    live_request_queue.send_realtime(audio_blob)

                # Handle text frames (JSON messages)
                elif "text" in message:
                    text_data = message["text"]
                    json_message = json.loads(text_data)

                    # Extract text from JSON and send to LiveRequestQueue
                    if json_message.get("type") == "text":
                        logger.info(f"User says: {json_message['text']}")
                        content = types.Content(
                            parts=[types.Part(text=json_message["text"])]
                        )
                        live_request_queue.send_content(content)

                    # Handle audio data (microphone)
                    elif json_message.get("type") == "audio":
                        # logger.info("Received AUDIO packet") # Uncomment for verbose debugging
                        import base64
                        # Decode base64 audio data
                        audio_data = base64.b64decode(json_message.get("data", ""))
                        
                        # logger.info(f"Received Audio Chunk: {len(audio_data)} bytes")
                        
                        import math
                        import struct
                        # Calculate RMS to debug silence
                        count = len(audio_data) // 2
                        shorts = struct.unpack(f"<{count}h", audio_data)
                        sum_squares = sum(s*s for s in shorts)
                        rms = math.sqrt(sum_squares / count) if count > 0 else 0
                        
                        # logger.info(f"RMS: {rms:.2f} | Bytes: {len(audio_data)}")

                        # Send to Live API as PCM 16kHz
                        audio_blob = types.Blob(
                            mime_type="audio/pcm;rate=16000", 
                            data=audio_data
                        )
                        live_request_queue.send_realtime(audio_blob)

                    # Handle image data
                    elif json_message.get("type") == "image":
                        import base64
                        
                        # Decode base64 image data
                        image_data = base64.b64decode(json_message["data"])
                        # logger.info(f"Received Image Frame: {len(image_data)} bytes")
                        
                        mime_type = json_message.get("mimeType", "image/jpeg")

                        # Send image as blob
                        image_blob = types.Blob(mime_type=mime_type, data=image_data)
                        live_request_queue.send_realtime(image_blob)
                        
                        frame_count += 1
                        
        finally:
             pass                   

মাল্টিমোডাল ডেটা এখন এজেন্টের কাছে পাঠানো হচ্ছে।

প্রতিক্রিয়া বাস্তবায়ন: ডাউনস্ট্রিম ইভেন্ট ডেটা স্ট্রাকচার

যখন আপনি ADK দিয়ে একটি দ্বিমুখী (লাইভ) এজেন্ট চালান, তখন এজেন্ট থেকে ফিরে আসা ডেটা একটি নির্দিষ্ট ধরণের ইভেন্টে প্যাকেজ করা হয় যা মূল GenAI SDK কাঠামো থেকে উত্তরাধিকারসূত্রে পায়। async for event in runner.run_live(...) এ আপনি যে Event অবজেক্টটি পান তা হল একটি একক অবজেক্ট যার মধ্যে বেশ কয়েকটি ঐচ্ছিক ক্ষেত্র থাকে, প্রতিটি ভিন্ন ধরণের তথ্যের জন্য:

ইভেন্ট

বিষয়বস্তু কীভাবে গঠন করা হয়েছে:

  • যখন এজেন্ট কথা বলে ( .server_content এর মাধ্যমে ): ক্ষেত্রটি কেবল প্লেইন টেক্সট নয়। এতে Parts এর একটি তালিকা রয়েছে। প্রতিটি Part হল এক ধরণের ডেটার জন্য একটি ধারক—হয় একটি টেক্সট স্ট্রিং (যেমন "The part is stable." ) অথবা একটি raw অডিও ব্লব (ভয়েস)।
  • যখন এজেন্ট কাজ করে ( .tool_call এর মাধ্যমে ): ফিল্ডটিতে FunctionCall অবজেক্টের একটি তালিকা থাকে। প্রতিটি FunctionCall হল একটি সহজ, কাঠামোগত অবজেক্ট যা টুলের নাম এবং ইনপুট আর্গুমেন্টগুলিকে একটি পরিষ্কার ফর্ম্যাটে নির্দিষ্ট করে যা আপনার ব্যাকএন্ড কোড সহজেই পড়তে এবং কার্যকর করতে পারে।

👀 If you were to look at a single Event yielded by the run_live loop, the JSON (produced by event.model_dump(by_alias=True) ) would look like this, strictly following the GenAI SDK shapes:

{
  "serverContent": {  // <-- LiveServerMessageServerContent
    "modelTurn": {    // <-- ModelTurn
      "parts": [      // <-- list[Part]
        {
          "text": "Architect Confirmed."
        },
        {
          "inlineData": { // <-- Blob (Audio Bytes)
            "mimeType": "audio/pcm;rate=24000",
            "data": "BASE64_AUDIO_DATA..."
          }
        }
      ]
    }
  },
  "toolCall": {       // <-- LiveServerMessageToolCall
    "functionCalls": [ // <-- list[FunctionCall]
      {
        "name": "neutralize_hazard",
        "args": { "color": "RED" }
      }
    ]
  }
}

👉✏️ We will now update the downstream_task in main.py to forward the complete event data. This logic ensures that every "thought" the AI has is logged in the ship's diagnostic terminal and sent as a single JSON object to the frontend UI.

Locate the #PROCESS_AGENT_RESPONSE placeholder in your $HOME/way-back-home/level_4/backend/main.py file and replace it with the following dissection logic:

            # Suppress raw event logging
            event_json = event.model_dump_json(exclude_none=True, by_alias=True)
            # logger.info(f"raw_event: {event_json[:200]}...") 
            await websocket.send_text(event_json)

Mission Execution

With the backend vault connected and both agents configured, all systems are now mission-ready. The following steps will launch the full application, allowing you to interact with the two-agent system you just built.

Objective: Assemble the randomly assigned warp drive that appears on your workbench. Protocol: You must follow the vocal guidance of the Dispatch Agent, especially the hazard warnings for specific components.

Activate the Specialist (The Architect)

👉💻 In your first terminal window , launch the Architect agent. This backend service will connect to the Redis vault and wait for schematic requests from the Dispatcher.

# Ensure you are in the backend directory
cd $HOME/way-back-home/level_4/
. scripts/check_redis.sh
cd $HOME/way-back-home/level_4/backend
# Start the A2A Server on Port 8081
uv run architect_agent/server.py

(Leave this terminal running. It is now your active "database agent.")

Launch the Cockpit (The Dispatcher)

👉💻 In a new terminal window (Terminal B), we will build the frontend UI and start the main Dispatch agent, which serves the user interface and handles all live communication.

# 1. Build the Frontend Assets
cd $HOME/way-back-home/level_4/frontend
npm install
npm run build

# 2. Launch the Main Application Server
cd $HOME/way-back-home/level_4/backend
cp architect_agent/.env .env
uv run main.py

(This starts the primary server on Port 8080.)

Run the Test Scenario

The system is now live. Your goal is to follow the agent's instructions to complete the assembly.

  1. 👉 Access the Workbench:
    • Click the Web preview icon in the Cloud Shell toolbar.
    • Select Change port , set it to 8080 , and click Change and Preview .
  2. 👉 Start the Mission:
    • When the interface loads, make sure you allow it to access your screen and microphone. জানালা
    • You will be ask to select a tab or a window to share, if you are sharing the window, to avoid problem, make sure this is the ONLY tab in the window.
    • A drive with a random name (eg, "NOVA-V", "OMEGA-9") will be assigned to you.
  3. 👉 The Assembly Loop:
    • Request: To start assembling the drive say: "Start assembling." একত্রিত করা
    • Architect Respond: The agent will provide the correct parts to assemble the drive.
    • Hazard Check: When a part appears to be hazardous on the workbench:
      • The Dispatch agent's monitor_for_hazard tool will visually identify it.
      • It will yield a "VISUAL HAZARD ALERT". (This will take about 30 sec)
      • It will check which bin to use to disengage the hazard. Hazard
    • Action: The Dispatch Agent will give you a direct command: "Hazard Confirmed. Place XXX in the Red bin immediately." You must follow this instruction to proceed.

Mission Accomplished. You have successfully built an interactive, multi-agent system. The survivors are safe, the rocket has cleared the atmosphere, and your "Way Back Home" continues.

👉💻 Press Ctrl+c in both terminal to exit.

6. Deploy to Production (Optional)

You have successfully tested the agent locally. Now, we must upload the Architect's neural core to the ship's mainframes (Cloud Run). This will allow it to operate as a permanent, independent service that the Dispatch agent can query from anywhere.

সংক্ষিপ্ত বিবরণ

Provision the Secure Vault (Infrastructure)

Before deploying the agent, we must create its persistent memory (Memorystore) and the secure channel to access it (VPC Connector).

👉💻 Create the Memorystore Instance (Redis Vault):

export REGION="us-central1"
gcloud redis instances create ozymandias-vault-prod --size=1 --tier=basic --region=${REGION}

👉💻 Retrieve the Vault's Network Address: Execute this command and copy the host IP address. This is the private address of your new Redis instance.

gcloud redis instances describe ozymandias-vault-prod --region=us-central1

👉💻 Create the VPC Access Connector (Secure Bridge): This connector acts as a private bridge, allowing Cloud Run to access the Redis instance inside your VPC.

export REGION="us-central1"
export SUBNET_NAME="vpc-connector-subnet"
export PROJECT_ID=$(gcloud config get-value project)
# Create the Dedicated Subnet ---

gcloud compute networks subnets create ${SUBNET_NAME} \
    --network=default \
    --region=${REGION} \
    --range=192.168.1.0/28


gcloud compute networks vpc-access connectors create architect-connector \
 --region ${REGION} \
 --subnet ${SUBNET_NAME} \
 --subnet-project ${PROJECT_ID} \
 --min-instances 2 \
 --max-instances 3 \
 --machine-type f1-micro

👉💻 Load the data:

export REGION="us-central1"
export ZONE="us-central1-a"
export VM_NAME="redis-seeder-$(date +%s)"
export REDIS_IP=$(gcloud redis instances describe ozymandias-vault-prod --region=${REGION} | grep 'host:' | awk '{print $2}')

gcloud compute instances create ${VM_NAME} \
    --zone=${ZONE} \
    --machine-type=e2-micro \
    --image-family=debian-11 \
    --image-project=debian-cloud \
    --quiet \
    --metadata=startup-script='#! /bin/bash
        # Install tools quietly
        apt-get update > /dev/null
        apt-get install -y redis-tools > /dev/null

        # Run each command individually
        redis-cli -h '"${REDIS_IP}"' DEL "HYPERION-X"
        redis-cli -h '"${REDIS_IP}"' RPUSH "HYPERION-X" "Warp Core" "Flux Pipe" "Ion Thruster"
        redis-cli -h '"${REDIS_IP}"' DEL "NOVA-V"
        redis-cli -h '"${REDIS_IP}"' RPUSH "NOVA-V" "Ion Thruster" "Warp Core" "Flux Pipe"
        redis-cli -h '"${REDIS_IP}"' DEL "OMEGA-9"
        redis-cli -h '"${REDIS_IP}"' RPUSH "OMEGA-9" "Flux Pipe" "Ion Thruster" "Warp Core"
        redis-cli -h '"${REDIS_IP}"' DEL "GEMINI-MK1"
        redis-cli -h '"${REDIS_IP}"' RPUSH "GEMINI-MK1" "Coolant Tank" "Servo" "Fuel Cell"
        redis-cli -h '"${REDIS_IP}"' DEL "APOLLO-13"
        redis-cli -h '"${REDIS_IP}"' RPUSH "APOLLO-13" "Warp Core" "Coolant Tank" "Ion Thruster"
        redis-cli -h '"${REDIS_IP}"' DEL "VORTEX-7"
        redis-cli -h '"${REDIS_IP}"' RPUSH "VORTEX-7" "Quantum Cell" "Graviton Coil" "Plasma Injector"
        redis-cli -h '"${REDIS_IP}"' DEL "CHRONOS-ALPHA"
        redis-cli -h '"${REDIS_IP}"' RPUSH "CHRONOS-ALPHA" "Shield Emitter" "Data Crystal" "Quantum Cell"
        redis-cli -h '"${REDIS_IP}"' DEL "NEBULA-Z"
        redis-cli -h '"${REDIS_IP}"' RPUSH "NEBULA-Z" "Plasma Injector" "Flux Pipe" "Graviton Coil"
        redis-cli -h '"${REDIS_IP}"' DEL "PULSAR-B"
        redis-cli -h '"${REDIS_IP}"' RPUSH "PULSAR-B" "Data Crystal" "Servo" "Shield Emitter"
        redis-cli -h '"${REDIS_IP}"' DEL "TITAN-PRIME"
        redis-cli -h '"${REDIS_IP}"' RPUSH "TITAN-PRIME" "Ion Thruster" "Quantum Cell" "Warp Core"

        # Signal that the script has finished
        echo "SEEDING_COMPLETE"
    '
# This command streams the logs and waits until grep finds our completion message.
# The -m 1 flag tells grep to exit after the first match.
gcloud compute instances tail-serial-port-output ${VM_NAME} --zone=${ZONE} | grep -m 1 "SEEDING_COMPLETE"

gcloud compute instances delete ${VM_NAME} --zone=${ZONE} --quiet

Deploy the Agent Application

Compile and Build Agent Image

👉💻 Navigate to the backend directory and create the dockerfile.

export PROJECT_ID=$(gcloud config get-value project)
export REGION=us-central1
export SERVICE_NAME=architect-agent
export IMAGE_PATH=gcr.io/${PROJECT_ID}/${SERVICE_NAME}
export VPC_CONNECTOR_NAME=architect-connector
export REDIS_IP=$(gcloud redis instances describe ozymandias-vault-prod --region=${REGION} | grep 'host:' | awk '{print $2}')

cd $HOME/way-back-home/level_4/backend/architect_agent
cp $HOME/way-back-home/level_4/requirements.txt requirements.txt
cat <<EOF > Dockerfile
# Use an official Python runtime as a parent image
FROM python:3.13-slim

# Set the working directory in the container
WORKDIR /app

# Copy the requirements file and install dependencies for THIS agent
COPY requirements.txt requirements.txt
RUN pip install --no-cache-dir -r requirements.txt

# Copy the rest of the architect's code (server.py, agent.py, etc.)
COPY . .

# Expose the port the architect server runs on
EXPOSE 8081

# Command to run the application
# This assumes your server file is named server.py and the FastAPI object is 'app'
CMD ["uvicorn", "server:app", "--host", "0.0.0.0", "--port", "8081"]
EOF

👉💻 Package the application into a container image.

cd $HOME/way-back-home/level_4/backend/architect_agent

export PROJECT_ID=$(gcloud config get-value project)
export SERVICE_NAME=architect-agent
export IMAGE_PATH=gcr.io/${PROJECT_ID}/${SERVICE_NAME}
export REGION=us-central1


# This should now print the full, correct path
echo "Verifying build path: ${IMAGE_PATH}"

gcloud builds submit . --tag ${IMAGE_PATH}

Deploy to Cloud Run

👉💻 Deploy the agent to Cloud Run. We will inject the Redis IP and link the VPC Connector directly into the launch command. This ensures the agent starts with a secure, private connection to its database.

cd $HOME/way-back-home/level_4/backend/architect_agent

export PROJECT_ID=$(gcloud config get-value project)
export REGION=us-central1
export SERVICE_NAME=architect-agent
export IMAGE_PATH=gcr.io/${PROJECT_ID}/${SERVICE_NAME}
export VPC_CONNECTOR_NAME=architect-connector
export REDIS_IP=$(gcloud redis instances describe ozymandias-vault-prod --region=${REGION} | grep 'host:' | awk '{print $2}')
export PROJECT_NUMBER=$(gcloud projects describe ${PROJECT_ID} --format="value(projectNumber)")
export PREDICTED_HOST="${SERVICE_NAME}-${PROJECT_NUMBER}.${REGION}.run.app"
export PROTOCOL=https

gcloud run deploy ${SERVICE_NAME} \
  --image=${IMAGE_PATH} \
  --platform=managed \
  --region=${REGION} \
  --port=8081 \
  --allow-unauthenticated \
  --labels=dev-tutorial=multi-modal \
  --vpc-connector=${VPC_CONNECTOR_NAME} \
  --vpc-egress=private-ranges-only \
  --set-env-vars="REDIS_HOST=${REDIS_IP}" \
  --set-env-vars="GOOGLE_GENAI_USE_VERTEXAI=True" \
  --set-env-vars="MODEL_ID=gemini-2.5-flash" \
  --set-env-vars="GOOGLE_CLOUD_PROJECT=${PROJECT_ID}" \
  --set-env-vars="HOST_URL=${PREDICTED_HOST}" \
  --set-env-vars="PROTOCOL=${PROTOCOL}" \
  --set-env-vars="A2A_PORT=443"

👉💻 Verify if the A2A server is running.

export REGION=us-central1
export ARCHITECT_AGENT_URL=$(gcloud run services describe architect-agent --platform managed --region ${REGION} --format 'value(status.url)')
curl -s  ${ARCHITECT_AGENT_URL}/.well-known/agent.json | jq 

Once the command finishes, you will see a Service URL . The Architect Agent is now live in the cloud, permanently connected to its vault and ready to serve schematic data to other agents.

Deploy Dispatch Hub to Production Mainframe

With the Architect Agent operational in the cloud, we must now deploy the Dispatch Hub. This agent will serve as the primary user interface, handling live voice/video streams and delegating database queries to the Architect's secure endpoint.

👉💻 Run the following command in your Cloud Shell terminal. It will create the complete, multi-stage Dockerfile in your backend directory.

cd $HOME/way-back-home/level_4

cat <<EOF > Dockerfile
# STAGE 1: Build the React Frontend
# This stage uses a Node.js container to build the static frontend assets.
FROM node:20-slim as builder

# Set the working directory for our build process
WORKDIR /app

# Copy the frontend's package files first to leverage Docker's layer caching.
COPY frontend/package*.json ./frontend/
# Run 'npm install' from the context of the 'frontend' subdirectory
RUN npm --prefix frontend install

# Copy the rest of the frontend source code
COPY frontend/ ./frontend/
# Run the build script, which will create the 'frontend/dist' directory
RUN npm --prefix frontend run build


# STAGE 2: Build the Python Production Image
# This stage creates the final, lean container with our Python app and the built frontend.
FROM python:3.13-slim

# Set the final working directory
WORKDIR /app

# Install uv, our fast package manager
RUN pip install uv

# Copy the requirements.txt from the root of our build context
COPY requirements.txt .
# Install the Python dependencies
RUN uv pip install --no-cache-dir --system -r requirements.txt

# Copy the entire backend directory into the container
COPY backend/ ./backend/

# CRITICAL STEP: Copy the built frontend assets from the 'builder' stage.
# The source is the '/app/frontend/dist' directory from Stage 1.
# The destination is './frontend/dist', which matches the exact relative path
# your backend/main.py script expects to find.
COPY --from=builder /app/frontend/dist ./frontend/dist/

# Cloud Run injects a PORT environment variable, which your main.py already uses.
# We expose 8000 as a standard practice.
EXPOSE 8000

# Set the command to run the application.
# We specify the full path to the Python script.
CMD ["python", "backend/main.py"]
EOF

Compile and Build Agent/Frontend Image

👉💻 Navigate to the backend directory containing the Dispatch agent's code ( main.py ) and package it into a container image.

cd $HOME/way-back-home/level_4
export PROJECT_ID=$(gcloud config get-value project)
export REGION=us-central1
export SERVICE_NAME=mission-bravo
export IMAGE_PATH=gcr.io/${PROJECT_ID}/${SERVICE_NAME}
# This assumes your dispatch agent server (main.py) is in the backend folder

gcloud builds submit . --tag ${IMAGE_PATH}

Deploy to Cloud Run

👉💻 Deploy the Dispatch Hub to Cloud Run. We will inject the Architect's URL as an environment variable, creating the critical link between our two cloud-native agents.

export PROJECT_ID=$(gcloud config get-value project)
export REGION=us-central1
export SERVICE_NAME=mission-bravo
export AGENT_SERVICE_NAME=architect-agent
export IMAGE_PATH=gcr.io/${PROJECT_ID}/${SERVICE_NAME}
export PROJECT_NUMBER=$(gcloud projects describe ${PROJECT_ID} --format="value(projectNumber)")
export ARCHITECT_AGENT_URL="https://${AGENT_SERVICE_NAME}-${PROJECT_NUMBER}.${REGION}.run.app"
gcloud run deploy ${SERVICE_NAME} \
  --image=${IMAGE_PATH} \
  --platform=managed \
  --region=${REGION} \
  --port=8080 \
  --labels=dev-tutorial=multi-modal \
  --allow-unauthenticated \
  --set-env-vars="ARCHITECT_URL=${ARCHITECT_AGENT_URL}" \
  --set-env-vars="GOOGLE_GENAI_USE_VERTEXAI=True" \
  --set-env-vars="MODEL_ID=gemini-live-2.5-flash-preview-native-audio-09-2025" \
  --set-env-vars="GOOGLE_CLOUD_PROJECT=${PROJECT_ID}" \
  --set-env-vars="GOOGLE_CLOUD_LOCATION=${REGION}"

Once the command finishes, you will see a Service URL (eg, https://mission-bravo-...run.app ). The application is now live in the cloud.

👉 Go to the Google Cloud Run page and select the biometric-scout service from the list. CloudRun

👉 Locate the Public URL displayed at the top of the Service details page. CloudRun

Final Systems Check (End-to-End Test)

👉 Now you will interact with the live system.

  1. Get the URL: Copy the Service URL from the output of the last deployment command (it should end with run.app ).
  2. Open the Cockpit: Paste the URL into your web browser.
  3. Initiate Contact: When the interface loads, make sure you allow it to access your screen and microphone.
  4. Request Data: When a drive is assigned, ask to start assembling. For example: "Start to assemble"

CloudRun

You are now interacting with a fully deployed, multi-agent system running entirely on Google Cloud.

The Multi-agent system locks the final containment ring into place, and the erratic radiation flatlines into a steady hum.

"Warp Drive: STABILIZED. Rescue Craft: ENGINES IGNITED."

শেষ

On your monitor, the alien ship streaks upward, narrowly escaping the crumbling surface of Ozymandias as the atmosphere collapses. It settles into a safe orbit alongside your vessel, and the comms fill with the voices of the survivors—shaken but alive. With the rescue complete and your path home clear, the remote link severs.

Thanks to you, the survivors are rescued.

If you participated in Level 0, don't forget to check where your progress is on the way back home mission!

ফাইনাল