1. परिचय
इस कोडलैब में, CrewAI, LangGraph, A2A प्रोटोकॉल, और ADK (एजेंट डेवलपमेंट किट) का इस्तेमाल करके, मल्टी-एजेंट ऑर्केस्ट्रेशन सिस्टम बनाने का तरीका बताया गया है. आपको एक ऐसा सिस्टम बनाना होगा जिसमें ADK कंट्रोल रूम, प्लानिंग की ज़िम्मेदारी LangGraph प्लानर को सौंपता है. यह प्लानर, CrewAI की एक्ज़ीक्यूशन टीम को टास्क भेजता है. ये सभी, A2A के ज़रिए कनेक्ट होते हैं, ताकि खुदरा इन्वेंट्री को फिर से स्टॉक करने के काम को मैनेज किया जा सके.
मल्टी-एजेंट ऑर्केस्ट्रेशन क्या है?
मल्टी-एजेंट सिस्टम में, कई विशेषज्ञ एआई एजेंट मिलकर ऐसे टास्क पूरे करते हैं जिन्हें कोई एक एजेंट पूरा नहीं कर सकता. एक ही एजेंट से सारे काम कराने के बजाय, समस्या को अलग-अलग भूमिकाओं में बांटा जाता है. जैसे, प्लानर और एक्ज़ीक्यूटर. हर भूमिका के लिए अलग-अलग टूल और विशेषज्ञता होती है.
यह ठीक उसी तरह काम करता है जैसे कोई मानवीय संगठन काम करता है: एक मैनेजर, रणनीति बनाने का काम विश्लेषकों को और उसे लागू करने का काम विशेषज्ञों को सौंपता है. इसके ये फ़ायदे हैं:
- काम को अलग-अलग हिस्सों में बांटना: हर एजेंट उस काम पर फ़ोकस करता है जिसे वह सबसे अच्छी तरह से कर सकता है
- फ़्रेमवर्क को ज़रूरत के हिसाब से इस्तेमाल करना: हर भूमिका के लिए सबसे सही फ़्रेमवर्क का इस्तेमाल करें. जैसे, प्लानिंग लॉजिक के लिए LangGraph और टूल को लागू करने के लिए CrewAI
- स्केलेबिलिटी: पूरे सिस्टम में बदलाव किए बिना, खास एजेंट जोड़ें
स्थिति
जब कोई उपयोगकर्ता, "टोक्यो ऑफ़िस के लिए 1 Pixel 7 फ़ोन फिर से स्टॉक में उपलब्ध कराएं" जैसा रीस्टॉक करने का अनुरोध भेजता है, तो सिस्टम:
- LangGraph Planner, अनुरोध का विश्लेषण करता है और आइटम और संख्या की जानकारी निकालता है
- Planner, प्लान को लागू करने का काम CrewAI Execution Crew को सौंपता है
- सोर्सिंग स्पेशलिस्ट एजेंट, टूल का इस्तेमाल करके प्रॉडक्ट कैटलॉग खोजता है
- खरीदारी अधिकारी एजेंट, बजट की पुष्टि करता है और टूल का इस्तेमाल करके खरीदारी का ऑर्डर देता है
- नतीजा प्लानर को वापस भेज दिया जाता है. इसके बाद, प्लानर एक फ़ाइनल रिपोर्ट जनरेट करता है
User Request
│
▼
┌──────────────────────┐
│ ADK Control Room │ ← Top-level orchestrator, re-plans on failure
│ (BaseAgent) │
└──────────┬───────────┘
│ A2A (JSON-RPC)
▼
┌──────────────────────┐
│ LangGraph Planner │ ← Analyzes intent, delegates, reports
│ (State Machine) │
└──────────┬───────────┘
│
▼
┌──────────────────────┐
│ CrewAI Execution Crew│ ← Runs agents with tools
│ ├─ Sourcing Agent │ → search_products
│ └─ Procurement Agent│ → check_budget, create_purchase_order
└──────────────────────┘
टेक्नोलॉजी स्टैक
परत | टेक्नोलॉजी | भूमिका |
प्लानिंग | LangGraph | स्टेट मशीन, अनुरोधों का विश्लेषण करती है, उन्हें रूट करती है, और रिपोर्ट जनरेट करती है |
लागू करना | CrewAI | भूमिका के आधार पर एजेंट, जो कॉल टूल को क्रम से कॉल करते हैं |
LLM | Vertex AI पर Gemini | इससे एजेंट को तर्क करने और टूल चुनने में मदद मिलती है |
एजेंट के बीच बातचीत | A2A प्रोटोकॉल | JSON-RPC 2.0 ब्रिज, ताकि अलग-अलग फ़्रेमवर्क के एजेंट आपस में बातचीत कर सकें |
टॉप-लेवल ऑर्केस्ट्रेटर | ADK (BaseAgent) | अनुरोध स्वीकार करता है, A2A के ज़रिए काम सौंपता है, और काम पूरा न होने पर फिर से प्लान बनाता है |
इसे काम करते हुए देखें: अगर यह सुविधा उपलब्ध है, तो https://scale-control-room-761793285222.us-central1.run.app पर जाकर, प्रोडक्शन सिस्टम को आज़माएं. इसमें रीयल-टाइम डैशबोर्ड, A2A प्रोटोकॉल, और IAM सुरक्षा की सुविधा मिलती है.
आपको क्या करना होगा
- एजेंट के इस्तेमाल के लिए कस्टम टूल तय करना.
- CrewAI की मदद से, खास एजेंट बनाएँ.
- LangGraph की मदद से, स्टेट मशीन प्लानर बनाएं.
- प्लानर और एक्ज़ीक्यूशन क्रू के बीच के फ़्लो को मैनेज करना.
- अलग-अलग फ़्रेमवर्क के बीच कम्यूनिकेशन के लिए, प्लानर को A2A प्रोटोकॉल सर्वर में रैप करें.
- टॉप-लेवल का ADK कंट्रोल रूम बनाएं. यह A2A के ज़रिए काम करता है और अगर कोई समस्या आती है, तो फिर से प्लान बनाता है.
आपको किन चीज़ों की ज़रूरत होगी
- कोई वेब ब्राउज़र, जैसे कि Chrome
- बिलिंग की सुविधा वाला Google Cloud प्रोजेक्ट
यह कोडलैब, इंटरमीडिएट डेवलपर के लिए है. इन्हें Python और एलएलएम के बुनियादी सिद्धांतों के बारे में पता है.
अनुमानित अवधि: 35 मिनट.
लागत का अनुमान: इस कोडलैब में बनाए गए संसाधनों की लागत, 1 डॉलर से कम होनी चाहिए.
2. शुरू करने से पहले
Google Cloud प्रोजेक्ट बनाना
- Google Cloud Console में, प्रोजेक्ट चुनने वाले पेज पर, Google Cloud प्रोजेक्ट चुनें या बनाएं.
- पक्का करें कि आपके Cloud प्रोजेक्ट के लिए बिलिंग चालू हो. किसी प्रोजेक्ट के लिए बिलिंग चालू है या नहीं, यह देखने का तरीका जानें.
Cloud Shell शुरू करना
Cloud Shell, Google Cloud में चलने वाला एक कमांड-लाइन एनवायरमेंट है. इसमें ज़रूरी टूल पहले से लोड होते हैं.
- Google Cloud कंसोल में सबसे ऊपर मौजूद, Cloud Shell चालू करें पर क्लिक करें.
- Cloud Shell से कनेक्ट होने के बाद, अपने क्रेडेंशियल की पुष्टि करें:
gcloud auth list - पुष्टि करें कि आपका प्रोजेक्ट कॉन्फ़िगर किया गया है:
gcloud config get project - अगर आपका प्रोजेक्ट उम्मीद के मुताबिक सेट नहीं है, तो इसे सेट करें:
export PROJECT_ID=<YOUR_PROJECT_ID> gcloud config set project $PROJECT_ID
एपीआई चालू करें
Vertex AI API को चालू करने के लिए, यह निर्देश चलाएं:
gcloud services enable aiplatform.googleapis.com
ध्यान दें: Cloud Shell, आपके Google Cloud खाते से अपने-आप पुष्टि करता है. अगर इस कोडलैब को Cloud Shell के बाहर चलाया जा रहा है, तो आपको Vertex AI से पुष्टि करने के लिए, gcloud auth application-default login चलाना होगा.
अपना एनवायरमेंट सेट अप करने का तरीका
Cloud Shell में, अपने प्रोजेक्ट के लिए एक नई डायरेक्ट्री बनाएं और उसमें जाएं:
mkdir scale-agents
cd scale-agents
uv इंस्टॉल करें और इसका इस्तेमाल करके ज़रूरी पैकेज इंस्टॉल करें:
curl -LsSf https://astral.sh/uv/install.sh | sh
export PATH="$HOME/.local/bin:$PATH"
uv init --no-workspace --no-readme
rm main.py
sed -i 's/requires-python = ">=3.12"/requires-python = ">=3.12,<3.14"/' pyproject.toml
uv add crewai 'litellm[google]' langgraph 'a2a-sdk>=0.3.25,<0.4' httpx uvicorn 'google-adk>=1.0.0' --prerelease=allow
Vertex AI के लिए एनवायरमेंट वैरिएबल सेट करें:
export GOOGLE_CLOUD_PROJECT=$(gcloud config get-value project)
export GOOGLE_CLOUD_LOCATION=us-central1
export GOOGLE_GENAI_USE_VERTEXAI=TRUE
3. टूल और एजेंट तय करना
मल्टी-एजेंट सिस्टम में, एजेंट को दुनिया से इंटरैक्ट करने के लिए टूल की ज़रूरत होती है. साथ ही, उन्हें यह पता होना चाहिए कि उन्हें क्या करना है.
scale_agents.py नाम की एक फ़ाइल बनाएं और इसमें यह कोड जोड़ें. इससे इंपोर्ट, मॉक टूल, और CrewAI एजेंट सेट अप किए जाते हैं.
import os
from typing import Optional
from crewai import Agent, Crew, Process, Task, LLM
from crewai.tools import tool
from langgraph.graph import StateGraph, END
from typing_extensions import TypedDict
# CrewAI requires this env var even when using Vertex AI
os.environ["OPENAI_API_KEY"] = "NA"
# Set the project ID for Vertex AI
os.environ["VERTEXAI_PROJECT"] = os.getenv("GOOGLE_CLOUD_PROJECT", "")
os.environ["VERTEXAI_LOCATION"] = "us-central1"
# Initialize the LLM to use Vertex AI
llm = LLM(
model="vertex_ai/gemini-2.5-flash",
temperature=0.0,
max_tokens=4096,
)
# --- Step 1: Define Tools ---
@tool("search_products")
def search_products(query: str) -> list:
"""Search for products in the catalog."""
# Mock product catalog
products = [
{"product_id": "pixel-7", "name": "Pixel 7 Phone", "price": 50.0},
{"product_id": "pixel-8", "name": "Pixel 8 Phone", "price": 80.0},
]
return [p for p in products if query.lower() in p["name"].lower()]
@tool("check_budget")
def check_budget(amount: float) -> dict:
"""Check if a purchase amount is within the budget."""
limit = 100.0
if amount <= limit:
return {"approved": True, "remaining": limit - amount}
return {"approved": False, "reason": f"Exceeds budget of ${limit}"}
@tool("create_purchase_order")
def create_purchase_order(product_id: str, quantity: int) -> dict:
"""Create a purchase order for a product."""
return {
"status": "SUCCESS",
"po_id": f"PO-{product_id}-{quantity}",
"message": f"Successfully ordered {quantity} units of {product_id}."
}
# --- Step 2: Define Agents ---
sourcing_agent = Agent(
role="Sourcing Specialist",
goal="Find the best available products that match the intent of the request. You MUST use the search_products tool to look up products -- never make up product data.",
backstory="You are a veteran procurement specialist with an eye for detail. You always search the catalog before recommending a product.",
tools=[search_products],
llm=llm,
verbose=True,
allow_delegation=False,
memory=False,
reasoning=False,
)
procurement_agent = Agent(
role="Procurement Officer",
goal="Validate the purchase against budget constraints and execute the order. You MUST use the check_budget tool before ordering and the create_purchase_order tool to place the order.",
backstory="You are the gatekeeper of the budget. You always verify budget before placing any order.",
tools=[check_budget, create_purchase_order],
llm=llm,
verbose=True,
allow_delegation=False,
memory=False,
reasoning=False,
)
खास कॉन्सेप्ट
@toolडेकोरेटर: CrewAI इसका इस्तेमाल, सामान्य Python फ़ंक्शन को ऐसे टूल में बदलने के लिए करता है जिन्हें एलएलएम समझ सकें और कॉल कर सकें. फ़ंक्शन के टाइप हिंट और डॉकस्ट्रिंग का इस्तेमाल करके, टूल स्कीमा जनरेट किया जाता है. इसे एलएलएम समझ सकता है.- भूमिका, लक्ष्य, और बैकस्टोरी: इनसे एजेंट की भूमिका तय होती है. साथ ही, ये एलएलएम को तर्क करने में मदद करते हैं. बैकस्टोरी सिर्फ़ जानकारी देने के लिए नहीं है. "कैटलॉग में हमेशा खोजें" से एजेंट को जवाबों के बारे में भ्रमित होने के बजाय, अपने टूल इस्तेमाल करने के लिए बढ़ावा मिलता है.
reasoning=False: इससे जवाब देने के लिए ज़्यादा सोच-विचार करने की सुविधा बंद हो जाती है. इसलिए, एजेंट सीधे जवाब देने के बजाय, टूल को कॉल करने के स्टैंडर्ड लूप को फ़ॉलो करता है.allow_delegation=False: इससे हर एजेंट को, उसे असाइन किए गए टूल पर फ़ोकस करने में मदद मिलती है. इससे काम को दूसरे एजेंट को नहीं सौंपा जाता.
एक के बजाय दो एजेंट क्यों? हर एजेंट के पास अलग-अलग टूल होते हैं और उसका काम भी अलग होता है. सोर्सिंग स्पेशलिस्ट सिर्फ़ प्रॉडक्ट खोजता है. वहीं, खरीद अधिकारी सिर्फ़ बजट और ऑर्डर मैनेज करता है. ज़िम्मेदारियों को अलग-अलग करने का मतलब है कि हर एजेंट के पास एक फ़ोकस वाला प्रॉम्प्ट और काम के टूल का एक छोटा सेट होता है. इससे एलएलएम, एक ही एजेंट के मुकाबले ज़्यादा भरोसेमंद तरीके से काम करता है.
4. टास्क और क्रू को परिभाषित करना
अब हम यह तय करेंगे कि इन एजेंट को क्या करना है. इसके लिए, टास्क बनाए जाएंगे और उन्हें क्रू में शामिल किया जाएगा.
इसी scale_agents.py फ़ाइल के आखिर में यह कोड जोड़ें:
# --- Step 3: Define Tasks & Crew ---
sourcing_task = Task(
description="Use the search_products tool to find products matching: '{item_description}'. Return the product_id and price of the best match from the tool results.",
expected_output="The product_id and price of the best matching product from the search_products tool.",
agent=sourcing_agent
)
procurement_task = Task(
description="First, use the check_budget tool to verify the total cost for {quantity} units. Then use the create_purchase_order tool with the product_id and quantity to place the order.",
expected_output="The purchase order details returned by the create_purchase_order tool.",
agent=procurement_agent,
context=[sourcing_task] # This task depends on the output of sourcing_task
)
def run_crew(item_description: str, quantity: int):
crew = Crew(
agents=[sourcing_agent, procurement_agent],
tasks=[sourcing_task, procurement_task],
process=Process.sequential, # Run tasks in order
verbose=True,
memory=False,
planning=False,
)
result = crew.kickoff(inputs={
"item_description": item_description,
"quantity": quantity
})
return result
खास कॉन्सेप्ट
- टास्क का कॉन्टेक्स्ट:
context=[sourcing_task]से CrewAI को पता चलता है कि खरीदारी के टास्क को पूरा करने के लिए, सोर्सिंग के टास्क का आउटपुट ज़रूरी है. खरीदारी अधिकारी, यह देख सकता है कि सोर्सिंग स्पेशलिस्ट ने क्या जानकारी इकट्ठा की है. इसके बाद, वह यह तय कर सकता है कि क्या ऑर्डर करना है. - Process.sequential: टास्क उसी क्रम में लागू होते हैं जिस क्रम में वे सूची में दिखते हैं. यह ज़रूरी है, क्योंकि खरीदारी का टास्क, सोर्सिंग के टास्क के नतीजों पर निर्भर करता है. आपको यह पता होने से पहले कि कौन-सा प्रॉडक्ट खरीदना है, ऑर्डर नहीं दिया जा सकता.
memory=False/planning=False: इस डेमो के लिए, CrewAI की बिल्ट-इन मेमोरी और प्लानिंग की सुविधाओं को बंद कर देता है, ताकि इसे आसानी से और अनुमान के मुताबिक पूरा किया जा सके.
5. LangGraph Planner बनाना
एक्ज़ीक्यूशन क्रू, "कैसे" से जुड़ी चीज़ें मैनेज करता है. जैसे, प्रॉडक्ट खोजना, बजट की जांच करना, और ऑर्डर देना. लेकिन "क्या" तय कौन करता है? यह प्लानिंग एजेंट है, जिसे LangGraph की मदद से बनाया गया है.
LangGraph, वर्कफ़्लो को स्टेट मशीन के तौर पर मॉडल करता है. यह नोड (फ़ंक्शन) का एक ऐसा ग्राफ़ होता है जो किनारों (ट्रांज़िशन) से जुड़ा होता है. स्टेट, ग्राफ़ में फ़्लो होता है. हर नोड, शेयर की गई स्टेट से पढ़ता है और उसमें लिखता है. यह प्लानिंग वर्कफ़्लो के लिए सबसे सही है. इसमें आपको कंट्रोल फ़्लो को साफ़ तौर पर तय करने की ज़रूरत होती है: अनुरोध का विश्लेषण करें, क्रू को काम सौंपें, और रिपोर्ट जनरेट करें.
इसी scale_agents.py फ़ाइल के आखिर में यह कोड जोड़ें:
# --- Step 4: Define LangGraph Planner ---
class PlanState(TypedDict):
objective: str
item_description: Optional[str]
quantity_needed: Optional[int]
execution_result: Optional[str]
final_report: Optional[str]
def analyze_alert(state: PlanState) -> PlanState:
"""Node 1: Extract intent from the raw objective string."""
print("--- ANALYZING ALERT ---")
# In a production app, you would use an LLM here to extract details.
# For simplicity, we simulate extraction here.
objective = state["objective"]
# Hardcoded extraction for the demo
if "Pixel 7" in objective:
return {
"item_description": "Pixel 7",
"quantity_needed": 1,
}
return {
"item_description": "unknown",
"quantity_needed": 0,
}
def delegate_to_executor(state: PlanState) -> PlanState:
"""Node 2: Call the CrewAI Execution Crew."""
print("--- DELEGATING TO CREW ---")
if state["item_description"] == "unknown":
return {"execution_result": "Failed: Unknown item"}
result = run_crew(
item_description=state["item_description"],
quantity=state["quantity_needed"]
)
return {"execution_result": str(result)}
def generate_report(state: PlanState) -> PlanState:
"""Node 3: Synthesize the final outcome."""
print("--- GENERATING REPORT ---")
return {
"final_report": f"Objective handled: {state['objective']}. Result: {state['execution_result']}"
}
# Build the graph
workflow = StateGraph(PlanState)
workflow.add_node("analyze_alert", analyze_alert)
workflow.add_node("delegate", delegate_to_executor)
workflow.add_node("generate_report", generate_report)
workflow.set_entry_point("analyze_alert")
workflow.add_edge("analyze_alert", "delegate")
workflow.add_edge("delegate", "generate_report")
workflow.add_edge("generate_report", END)
app = workflow.compile()
# --- Main Execution ---
if __name__ == "__main__":
print("Starting Multi-Agent System...")
initial_state = {
"objective": "Restock 1 Pixel 7 phones for the Tokyo office"
}
final_state = app.invoke(initial_state)
print("\n=== FINAL REPORT ===")
print(final_state["final_report"])
खास कॉन्सेप्ट
- StateGraph: यह स्टेट मशीन को तय करता है.
PlanStateटाइप की गई स्थिति है, जो हर नोड के अनुरोध को प्रोसेस करने के दौरान इकट्ठा होती है. - नोड: ऐसे फ़ंक्शन जो मौजूदा स्थिति को लेते हैं और उसमें अपडेट दिखाते हैं. हर नोड की एक ही ज़िम्मेदारी होती है –
analyze_alertइंटेंट का पता लगाता है,delegate_to_executorक्रू को चलाता है, औरgenerate_reportनतीजे की खास जानकारी देता है. - किनारे: नोड के बीच के फ़्लो को तय करते हैं. इस कोडलैब में, हमने एक आसान लीनियर फ़्लो (
analyze → delegate → report) का इस्तेमाल किया है. पूरी वर्कशॉप में, इसे शर्त के आधार पर राउटिंग के साथ बढ़ाया गया है. उदाहरण के लिए, एक्ज़ीक्यूटर के बजाय सुरक्षा पाथ पर डिस्ट्रक्टिव अनुरोधों को राउट करना.
प्लानर के लिए LangGraph का इस्तेमाल क्यों किया जाता है? CrewAI, टूल-कॉलिंग एजेंट के लिए बहुत अच्छा है. हालांकि, प्लानर को कंट्रोल फ़्लो तय करने की ज़रूरत होती है. जैसे, "अगर नुकसान पहुंचाने वाला है, तो सुरक्षा से जुड़े तरीके का इस्तेमाल करो; नहीं तो, किसी और को काम सौंप दो." LangGraph का स्टेट मशीन मॉडल, इस राउटिंग को साफ़ तौर पर दिखाता है और इसे टेस्ट किया जा सकता है. वहीं, CrewAI नीचे दिए गए टूल को बिना किसी तय फ़ॉर्मैट के इस्तेमाल करता है.
6. Planner and Crew को चलाना
अब हम LangGraph प्लानर और CrewAI क्रू को एक साथ टेस्ट करेंगे.
अपने Cloud Shell टर्मिनल में, यह स्क्रिप्ट चलाएं:
uv run python scale_agents.py
आपको आउटपुट में, की जा रही कार्रवाइयों के बारे में जानकारी दिखेगी:
- सूचना का विश्लेषण किया जा रहा है: LangGraph नोड चलता है.
- Crew को टास्क सौंपना: LangGraph नोड, CrewAI क्रू को कॉल करता है.
- CrewAI Execution: इसमें आपको सोर्सिंग स्पेशलिस्ट को प्रॉडक्ट ढूंढते हुए और खरीद अधिकारी को बजट की जांच करते हुए और परचेज़ ऑर्डर (पीओ) बनाते हुए दिखाया जाएगा.
- फ़ाइनल रिपोर्ट: इसमें, खास जानकारी के साथ नतीजे आखिर में प्रिंट किए जाएंगे.
आउटपुट का उदाहरण (संक्षिप्त किया गया):
Starting Multi-Agent System...
--- ANALYZING ALERT ---
--- DELEGATING TO CREW ---
Agent: Sourcing Specialist
Tool: search_products Args: {'query': 'Pixel 7'}
Tool Completed Output: [{'product_id': 'pixel-7', 'name': 'Pixel 7 Phone', 'price': 50.0}]
Agent: Procurement Officer
Tool: check_budget Args: {'amount': 50}
Tool: create_purchase_order Args: {'product_id': 'pixel-7', 'quantity': 1}
Tool Completed Output: {'status': 'SUCCESS', 'po_id': 'PO-pixel-7-1', ...}
--- GENERATING REPORT ---
=== FINAL REPORT ===
Objective handled: Restock 1 Pixel 7 phones for the Tokyo office. Result: ...PO-pixel-7-1...SUCCESS...
ध्यान दें: आपको जवाब में [CrewAIEventsBus] Warning: Event pairing mismatch मैसेज दिख सकते हैं. ये CrewAI की इंटरनल इवेंट ट्रैकिंग से जुड़ी सामान्य चेतावनियां हैं. इन्हें अनदेखा किया जा सकता है.
ध्यान दें: CrewAI, ट्रेसिंग की सुविधा बंद होने के बारे में मैसेज दिखा सकता है. यह सूचना के लिए है और इसे अनदेखा किया जा सकता है.
ध्यान दें: मॉक ओएमएस के लिए, बजट की सीमा 100 डॉलर है. सफलतापूर्वक काम करने के लिए, मात्रा कम रखें (लगभग दो यूनिट से कम). उदाहरण के लिए, 5,000 रुपये की कीमत वाला एक Pixel 7 फ़ोन, बजट की जांच पास कर लेता है. हालांकि, 15,000 रुपये की कीमत वाले तीन फ़ोन को "बजट से ज़्यादा" के तौर पर अस्वीकार कर दिया जाएगा.
7. Planner को A2A सर्वर में रैप करना
LangGraph प्लानर काम करता है, लेकिन यह Python प्रोसेस में फंसा हुआ है. इसे अन्य एजेंटों के लिए कॉल करने लायक बनाने के लिए, हम इसे A2A (Agent-to-Agent) सर्वर में रैप करते हैं. ऐसा इसलिए, ताकि इसे अलग-अलग फ़्रेमवर्क में लिखा जा सके या अलग-अलग मशीनों पर चलाया जा सके.
A2A, JSON-RPC 2.0 पर आधारित एक प्रोटोकॉल है. यह प्रोटोकॉल, एजेंट के कम्यूनिकेशन के तरीके को स्टैंडर्ड बनाता है. मुख्य कॉन्सेप्ट:
सिद्धांत | मकसद |
एजेंट कार्ड | JSON मेटाडेटा, जिसमें एजेंट की क्षमताओं के बारे में बताया गया है. इसे |
| एजेंट को टास्क भेजने के लिए JSON-RPC तरीका |
टास्क | स्टेट के साथ यूनिट ऑफ़ वर्क (सबमिट किया गया → काम कर रहा है → पूरा हुआ/काम नहीं किया) |
आर्टफ़ैक्ट | किसी टास्क से जुड़ी इंटरमीडिएट और फ़ाइनल आउटपुट |
नई फ़ाइल a2a_planner.py बनाएं:
import asyncio
import os
import uvicorn
from a2a.server.agent_execution import AgentExecutor, RequestContext
from a2a.server.apps import A2AStarletteApplication
from a2a.server.events import EventQueue
from a2a.server.request_handlers import DefaultRequestHandler
from a2a.server.tasks import InMemoryTaskStore, TaskUpdater
from a2a.types import (AgentCapabilities, AgentCard, AgentSkill,
InternalError, Part, TextPart)
from a2a.utils import new_task
from a2a.utils.errors import ServerError
# Import the LangGraph planner from Step 4
from scale_agents import app as planner_app
class PlannerAgentExecutor(AgentExecutor):
"""Wraps the LangGraph planner as an A2A service."""
SUPPORTED_CONTENT_TYPES = ["text", "text/plain"]
async def execute(self, context: RequestContext, event_queue: EventQueue):
objective = context.get_user_input()
# Initialize A2A task tracking
task = context.current_task or new_task(context.message)
await event_queue.enqueue_event(task)
updater = TaskUpdater(event_queue, task.id, task.context_id)
try:
# Run the LangGraph planner synchronously in a thread
initial_state = {"objective": objective}
result = await asyncio.to_thread(planner_app.invoke, initial_state)
final_report = result.get("final_report", "No report generated.")
except Exception as e:
final_report = f"Execution failed: {e}"
# Send the result back as an artifact
await updater.add_artifact(
[Part(root=TextPart(text=final_report))],
name="orchestration_report"
)
await updater.complete()
async def cancel(self, context, event_queue):
raise ServerError(error=InternalError(message="Cancel not supported"))
# Define the Agent Card — this is what other agents see
port = int(os.environ.get("PORT", 8080))
agent_card = AgentCard(
name="Retail-Planner-A2A",
description="LangGraph planner that delegates logistics tasks to a CrewAI crew.",
url=f"http://localhost:{port}/",
version="1.0.0",
default_input_modes=PlannerAgentExecutor.SUPPORTED_CONTENT_TYPES,
default_output_modes=PlannerAgentExecutor.SUPPORTED_CONTENT_TYPES,
capabilities=AgentCapabilities(streaming=False),
skills=[
AgentSkill(
id="plan_logistics",
name="Plan Logistics",
description="Analyzes inventory alerts and orchestrates procurement.",
tags=["logistics", "planning"],
examples=["Restock 1 Pixel 7 phones for the Tokyo office"],
)
],
)
if __name__ == "__main__":
executor = PlannerAgentExecutor()
handler = DefaultRequestHandler(
agent_executor=executor, task_store=InMemoryTaskStore()
)
server = A2AStarletteApplication(
agent_card=agent_card, http_handler=handler
)
print(f"Starting A2A Planner Server on port {port}...")
uvicorn.run(server.build(), host="0.0.0.0", port=port)
खास कॉन्सेप्ट
- एजेंट कार्ड: इसे
/.well-known/agent-card.jsonपर दिखाया जाता है. कोई भी एजेंट, इस सर्वर के बारे में जानकारी पा सकता है. इसके लिए, उसे उस यूआरएल को फ़ेच करना होगा. इसमें एजेंट के कौशल, इस्तेमाल किए जा सकने वाले कॉन्टेंट टाइप, और क्षमताओं की सूची होती है. AgentExecutor.execute(): सिर्फ़ इसी तरीके का इस्तेमाल किया जाता है. यह आने वाले अनुरोध को स्वीकार करता है, आपके एजेंट लॉजिक (यहां, LangGraph प्लानर) को लागू करता है, और नतीजों को आर्टफ़ैक्ट के तौर पर वापस भेजता है.TaskUpdater: यह टास्क की लाइफ़साइकल को मैनेज करता है –add_artifact()इंटरमीडिएट/फ़ाइनल आउटपुट भेजता है,complete()टास्क को 'पूरा हुआ' के तौर पर मार्क करता है. A2A लाइब्रेरी, सभी JSON-RPC प्लंबिंग को मैनेज करती है.
टर्मिनल में A2A सर्वर को चालू करके, उसकी जांच करें:
uv run python a2a_planner.py
कोई दूसरा Cloud Shell टैब खोलें (मौजूदा टैब के बगल में मौजूद + पर क्लिक करें) और पुष्टि करें कि एजेंट कार्ड दिखाया जा रहा है:
cd ~/scale-agents
curl http://localhost:8080/.well-known/agent-card.json | python3 -m json.tool
आपको एजेंट कार्ड का JSON दिखेगा. अगले चरण के लिए, पहले टर्मिनल में A2A सर्वर को चालू रखें.
8. ADK कंट्रोल रूम बनाना
स्टैक में सबसे ऊपर कंट्रोल रूम है. इसे ADK (Google की एजेंट डेवलपमेंट किट) की मदद से बनाया गया है. इसे उपयोगकर्ता का अनुरोध मिलता है. यह A2A के ज़रिए, अनुरोध को प्लानर को सौंपता है. इसके बाद, यह नतीजे का आकलन करता है. साथ ही, यह प्लानिंग पूरी न होने पर फिर से प्लानिंग करता है (CUJ 2).
ADK, एजेंट प्रिमिटिव उपलब्ध कराता है. जैसे, BaseAgent, LlmAgent, और InMemoryRunner. हम कस्टम ऑर्केस्ट्रेशन लॉजिक लिखने के लिए, BaseAgent को सबक्लास करते हैं. जैसे, A2A कॉल, रिपोर्ट क्लासिफ़िकेशन, और LlmAgent सब-एजेंट के साथ डाइनैमिक री-प्लानिंग.
नई फ़ाइल control_room.py बनाएं:
import asyncio
import uuid
import os
import httpx
from google.adk.agents import BaseAgent, LlmAgent, RunConfig
from google.adk.agents.invocation_context import InvocationContext
from google.adk.events import Event
from google.adk.runners import InMemoryRunner
from google.genai import types
from typing import AsyncGenerator
A2A_SERVER_URL = os.environ.get("PLANNER_AGENT_URL", "http://127.0.0.1:8080")
def _classify_report(report: str) -> tuple[bool, bool]:
"""Return (is_success, should_retry) for a planner report."""
normalized = (report or "").replace("*", "").strip().lower()
success_markers = [
"status: success", "'status': 'success'",
"outcome: success", "po_id", "successfully ordered",
]
retryable_markers = ["not found", "discontinued", "no inventory",
"unknown item"]
terminal_markers = [
"status: failed", "over budget", "not issued",
"procurement failed",
]
if any(m in normalized for m in terminal_markers):
return False, False # Failed, don't retry
if any(m in normalized for m in retryable_markers):
return False, True # Failed, but retryable
if any(m in normalized for m in success_markers):
return True, False # Success!
return False, False # Unknown → treat as failure
class ControlRoomAgent(BaseAgent):
"""Top-level orchestrator: delegates via A2A, re-plans on failure."""
async def _run_async_impl(
self, ctx: InvocationContext
) -> AsyncGenerator[Event, None]:
# Extract user input from session events
user_msg = ""
if ctx.session and ctx.session.events:
for evt in reversed(ctx.session.events):
if evt.content and evt.content.role == "user":
user_msg = evt.content.parts[0].text
break
max_attempts = 2
current_objective = user_msg
final_report = "No report returned."
for attempt in range(1, max_attempts + 1):
print(f"\n--- Attempt {attempt}: Calling A2A Planner ---")
print(f" Objective: {current_objective}")
# Build the A2A JSON-RPC request
payload = {
"jsonrpc": "2.0",
"id": f"req-{attempt}",
"method": "message/send",
"params": {
"message": {
"message_id": str(uuid.uuid4()),
"parts": [{"text": current_objective}],
"role": "user"
}
}
}
try:
async with httpx.AsyncClient(timeout=300.0) as client:
resp = await client.post(
f"{A2A_SERVER_URL}/", json=payload
)
data = resp.json()
if "error" in data:
final_report = data["error"].get("message", "Unknown A2A error")
elif "result" in data:
artifacts = data["result"].get("artifacts", [])
if artifacts and "parts" in artifacts[-1]:
parts = artifacts[-1]["parts"]
if parts and "text" in parts[0]:
final_report = parts[0]["text"]
except Exception as e:
final_report = f"Connection error: {e}"
print(f"\n--- Report ---\n{final_report}\n")
is_success, should_retry = _classify_report(final_report)
if is_success:
yield Event(
author=self.name,
invocation_id=ctx.invocation_id,
content=types.Content(
role="model",
parts=[types.Part.from_text(text=final_report)]
),
)
return
# --- Re-planning (CUJ 2) ---
if should_retry and attempt < max_attempts:
print("--- Re-planning with LLM ---")
replanner = LlmAgent(
name=f"replanner_{attempt}",
model="gemini-2.5-flash",
instruction=(
"You are a strategic re-planner. The previous request "
"failed. Rewrite the objective to be broader or more "
"likely to succeed. Output ONLY the new objective text."
),
)
# Run the re-planner as a sub-agent
child_ctx = InvocationContext(
invocation_id=f"{ctx.invocation_id}_replan_{attempt}",
agent=replanner,
session=ctx.session,
session_service=ctx.session_service,
run_config=ctx.run_config or RunConfig(),
)
child_ctx.user_content = types.Content(
role="user",
parts=[types.Part.from_text(text=(
f"Original Objective: {current_objective}\n"
f"Failure Reason: {final_report}\n"
"Please broaden the search."
))]
)
new_objective = ""
async for event in replanner.run_async(child_ctx):
if event.content and event.content.parts:
for part in event.content.parts:
if hasattr(part, "text") and part.text:
new_objective += part.text
current_objective = new_objective.strip()
print(f"New objective: {current_objective}")
continue
# Terminal failure (not retryable)
if not should_retry:
yield Event(
author=self.name,
invocation_id=ctx.invocation_id,
content=types.Content(
role="model",
parts=[types.Part.from_text(text=f"FAILED: {final_report}")]
),
)
return
# Max attempts exhausted
yield Event(
author=self.name,
invocation_id=ctx.invocation_id,
content=types.Content(
role="model",
parts=[types.Part.from_text(
text=f"FAILED after {max_attempts} attempts: {final_report}"
)]
),
)
async def main():
prompt = "Restock 1 Pixel 7 phones for the Tokyo office"
print(f"Starting Control Room with: {prompt}\n")
agent = ControlRoomAgent(name="control_room")
runner = InMemoryRunner(app_name="control_room", agent=agent)
session = await runner.session_service.create_session(
app_name="control_room", user_id="admin"
)
content = types.Content(
role="user", parts=[types.Part.from_text(text=prompt)]
)
async for event in runner.run_async(
user_id="admin", session_id=session.id, new_message=content
):
if event.content and event.content.parts:
for part in event.content.parts:
if hasattr(part, "text") and part.text:
print(f"Result: {part.text}")
print("\n=== CONTROL ROOM COMPLETE ===")
if __name__ == "__main__":
asyncio.run(main())
खास कॉन्सेप्ट
BaseAgent: यह कस्टम एजेंट के लिए ADK प्रिमिटिव है. इसे सबक्लास किया जाता है और_run_async_implको ओवरराइड किया जाता है, ताकि मनमुताबिक एसिंक ऑर्केस्ट्रेशन लॉजिक लिखा जा सके. यहां, A2A कॉल + क्लासिफ़ाई + री-प्लान लूप का इस्तेमाल किया गया है.- A2A JSON-RPC कॉल: Control Room, प्लानर के A2A सर्वर को
httpxका इस्तेमाल करके स्टैंडर्डmessage/sendअनुरोध भेजता है. साथ ही, JSON-RPC जवाब को पार्स करके फ़ाइनल रिपोर्ट निकालता है. _classify_report(): यह कीवर्ड के आधार पर, रिपोर्ट को अलग-अलग कैटगरी में बांटता है. इससे रिपोर्ट के टेक्स्ट से यह पता चलता है कि रिपोर्ट सफल हुई है, फिर से कोशिश की जा सकती है या रिपोर्ट पूरी नहीं हुई है. इससे फिर से प्लान बनाने की प्रोसेस शुरू होती है.- सब-एजेंट को शुरू करना: फिर से प्लान बनाने के लिए, कंट्रोल रूम एक
LlmAgentबनाता है और उसे चलाता है. इसके लिए, वह एक चाइल्डInvocationContextबनाता है औरreplanner.run_async(child_ctx)को कॉल करता है. इसकी मदद से, कस्टम ऑर्केस्ट्रेशन लॉजिक के अंदर एलएलएम एजेंट को डाइनैमिक तौर पर स्पिन अप किया जा सकता है. InMemoryRunner: यह एजेंट को स्थानीय तौर पर चलाता है. साथ ही, इसमें मेमोरी में सेव किए गए सेशन का डेटा होता है. प्रोडक्शन में, Vertex AI Agent Engine पर डिप्लॉय करने के लिए,adk deployका इस्तेमाल किया जाता है.
9. फ़ुल स्टैक चलाना
अब हम तीन लेयर वाले पूरे सिस्टम को टेस्ट करते हैं: ADK Control Room → A2A → LangGraph Planner → CrewAI Crew.
आपने पहले जो दूसरा Cloud Shell टैब खोला था उसका इस्तेमाल करें. इसके अलावा, नया टैब खोलने के लिए + पर क्लिक करें और कंट्रोल रूम चलाएं. अहम जानकारी: हर Cloud Shell टैब का अपना शेल सेशन होता है. आपको प्रोजेक्ट और एनवायरमेंट वैरिएबल फिर से सेट करने होंगे:
cd ~/scale-agents
export GOOGLE_CLOUD_PROJECT=$(gcloud config get-value project)
export GOOGLE_CLOUD_LOCATION=us-central1
export GOOGLE_GENAI_USE_VERTEXAI=TRUE
uv run python control_room.py
आपको पूरा ऑर्केस्ट्रेशन फ़्लो दिखेगा:
- कंट्रोल रूम को अनुरोध मिलता है और वह A2A सर्वर को
message/sendJSON-RPC कॉल भेजता है - A2A सर्वर को अनुरोध मिलता है और वह LangGraph प्लानर को शुरू करता है
- LangGraph प्लानर, अनुरोध का विश्लेषण करता है और उसे CrewAI क्रू को सौंपता है
- CrewAI क्रू, सोर्सिंग और खरीद एजेंटों को मैनेज करता है
- नतीजा, कंट्रोल रूम तक पहुंचता है
क्रिटिकल यूज़र जर्नी (सीयूजे)
इन स्थितियों को आज़माने के लिए, control_room.py में मौजूद prompt स्ट्रिंग में बदलाव करें:
सीयूजे | प्रॉम्प्ट | क्या होता है |
1. हैप्पी पाथ |
| Search -> budget check -> purchase order (SUCCESS). यह शुरू से आखिर तक काम करता है. |
2. फिर से प्लान किया जा रहा है |
| प्लानर, "प्रोसेस पूरी नहीं हो सकी: आइटम की जानकारी नहीं है" दिखाता है. Control Room इस समस्या का पता लगाता है और खोज के दायरे को बढ़ाने के लिए, |
सीयूजे 2 (फिर से प्लान बनाना) की जांच करने के लिए, control_room.py में मौजूद prompt को बदलकर यह करें:
prompt = "Order 1 unit of the discontinued XR-7000 Quantum Holographic Display"
हार्डकोड किए गए प्लानर को इस आइटम के बारे में पता नहीं चलेगा. इसलिए, वह "Failed: Unknown item" मैसेज दिखाएगा. Control Room को गड़बड़ी का पता चल जाएगा. इसके बाद, वह डाइनैमिक तरीके से LlmAgent री-प्लानर बनाएगा और ज़्यादा बड़े लक्ष्य के साथ फिर से कोशिश करेगा. प्लानर सिर्फ़ "Pixel 7" को पहचानता है. इसलिए, फिर से कोशिश करने पर भी प्लान नहीं बनेगा. हालांकि, आपको प्लान बनाने की पूरी प्रोसेस दिखेगी. आखिरी आउटपुट FAILED after 2 attempts: ... होगा.
10. व्यवस्थित करें
अपने Google Cloud खाते से लगातार शुल्क लिए जाने से बचने के लिए, इस कोडलैब के दौरान बनाई गई संसाधन मिटाए जा सकते हैं. बनाई गई डायरेक्ट्री को आसानी से हटाया जा सकता है:
cd ..
rm -rf scale-agents
11. बधाई हो
बधाई हो! आपने CrewAI, LangGraph, A2A प्रोटोकॉल, और ADK का इस्तेमाल करके, मल्टी-एजेंट ऑर्केस्ट्रेशन सिस्टम बना लिया है.
आपको क्या सीखने को मिला
- CrewAI के
@toolडेकोरेटर का इस्तेमाल करके, एजेंट के लिए टूल कैसे तय करें. - अलग-अलग भूमिकाओं, टूल, और लक्ष्यों वाले खास एजेंट बनाने का तरीका.
- टास्क की डिपेंडेंसी के साथ, क्रम के मुताबिक काम करने वाली क्रू में एजेंटों को कैसे शामिल करें.
- LangGraph की मदद से, क्रू को काम सौंपने वाला स्टेट मशीन प्लानर बनाने का तरीका.
- प्लानर को
AgentCardऔरAgentExecutorके साथ A2A सेवा के तौर पर कैसे उपलब्ध कराएं. - A2A के ज़रिए काम करने वाला कस्टम ADK
BaseAgentकैसे बनाया जाता है. साथ ही,LlmAgentसब-एजेंट को लागू करके, टास्क पूरा न होने पर फिर से प्लान कैसे बनाया जाता है. - अलग-अलग फ़्रेमवर्क में प्लानिंग, एक्ज़ीक्यूशन, और ऑर्केस्ट्रेशन को अलग-अलग करने से, आपको मॉड्यूलरिटी और लचीलापन क्यों मिलता है.
ज़्यादा जानकारी
पूरी वर्कशॉप में, इस सिस्टम को इन चीज़ों के साथ बेहतर बनाया गया है:
- रीयल-टाइम डैशबोर्ड – एक से ज़्यादा एजेंट की प्रोग्रेस को विज़ुअलाइज़ करने के लिए, एसएसई स्ट्रीमिंग
- Identity Shield – यह आईएएम पर आधारित सुरक्षा है. यह प्रॉम्प्ट लेवल पर नहीं, बल्कि इन्फ़्रास्ट्रक्चर लेवल पर नुकसान पहुंचाने वाली कार्रवाइयों को रोकता है
- Vertex AI Agent Engine –
adk deployकी मदद से, मैनेज किए गए क्लाउड इन्फ़्रास्ट्रक्चर पर ADK एजेंट को डिप्लॉय करें