CrewAI, LangGraph, A2A, और ADK की मदद से एजेंट बढ़ाना

1. परिचय

इस कोडलैब में, CrewAI, LangGraph, A2A प्रोटोकॉल, और ADK (एजेंट डेवलपमेंट किट) का इस्तेमाल करके, मल्टी-एजेंट ऑर्केस्ट्रेशन सिस्टम बनाने का तरीका बताया गया है. आपको एक ऐसा सिस्टम बनाना होगा जिसमें ADK कंट्रोल रूम, प्लानिंग की ज़िम्मेदारी LangGraph प्लानर को सौंपता है. यह प्लानर, CrewAI की एक्ज़ीक्यूशन टीम को टास्क भेजता है. ये सभी, A2A के ज़रिए कनेक्ट होते हैं, ताकि खुदरा इन्वेंट्री को फिर से स्टॉक करने के काम को मैनेज किया जा सके.

मल्टी-एजेंट ऑर्केस्ट्रेशन क्या है?

मल्टी-एजेंट सिस्टम में, कई विशेषज्ञ एआई एजेंट मिलकर ऐसे टास्क पूरे करते हैं जिन्हें कोई एक एजेंट पूरा नहीं कर सकता. एक ही एजेंट से सारे काम कराने के बजाय, समस्या को अलग-अलग भूमिकाओं में बांटा जाता है. जैसे, प्लानर और एक्ज़ीक्यूटर. हर भूमिका के लिए अलग-अलग टूल और विशेषज्ञता होती है.

यह ठीक उसी तरह काम करता है जैसे कोई मानवीय संगठन काम करता है: एक मैनेजर, रणनीति बनाने का काम विश्लेषकों को और उसे लागू करने का काम विशेषज्ञों को सौंपता है. इसके ये फ़ायदे हैं:

  • काम को अलग-अलग हिस्सों में बांटना: हर एजेंट उस काम पर फ़ोकस करता है जिसे वह सबसे अच्छी तरह से कर सकता है
  • फ़्रेमवर्क को ज़रूरत के हिसाब से इस्तेमाल करना: हर भूमिका के लिए सबसे सही फ़्रेमवर्क का इस्तेमाल करें. जैसे, प्लानिंग लॉजिक के लिए LangGraph और टूल को लागू करने के लिए CrewAI
  • स्केलेबिलिटी: पूरे सिस्टम में बदलाव किए बिना, खास एजेंट जोड़ें

स्थिति

जब कोई उपयोगकर्ता, "टोक्यो ऑफ़िस के लिए 1 Pixel 7 फ़ोन फिर से स्टॉक में उपलब्ध कराएं" जैसा रीस्टॉक करने का अनुरोध भेजता है, तो सिस्टम:

  1. LangGraph Planner, अनुरोध का विश्लेषण करता है और आइटम और संख्या की जानकारी निकालता है
  2. Planner, प्लान को लागू करने का काम CrewAI Execution Crew को सौंपता है
  3. सोर्सिंग स्पेशलिस्ट एजेंट, टूल का इस्तेमाल करके प्रॉडक्ट कैटलॉग खोजता है
  4. खरीदारी अधिकारी एजेंट, बजट की पुष्टि करता है और टूल का इस्तेमाल करके खरीदारी का ऑर्डर देता है
  5. नतीजा प्लानर को वापस भेज दिया जाता है. इसके बाद, प्लानर एक फ़ाइनल रिपोर्ट जनरेट करता है
User Request
    
    
┌──────────────────────┐
 ADK Control Room        Top-level orchestrator, re-plans on failure
 (BaseAgent)          
└──────────┬───────────┘
            A2A (JSON-RPC)
           
┌──────────────────────┐
 LangGraph Planner       Analyzes intent, delegates, reports
 (State Machine)      
└──────────┬───────────┘
           
           
┌──────────────────────┐
 CrewAI Execution Crew   Runs agents with tools
  ├─ Sourcing Agent      search_products
  └─ Procurement Agent   check_budget, create_purchase_order
└──────────────────────┘

टेक्नोलॉजी स्टैक

परत

टेक्नोलॉजी

भूमिका

प्लानिंग

LangGraph

स्टेट मशीन, अनुरोधों का विश्लेषण करती है, उन्हें रूट करती है, और रिपोर्ट जनरेट करती है

लागू करना

CrewAI

भूमिका के आधार पर एजेंट, जो कॉल टूल को क्रम से कॉल करते हैं

LLM

Vertex AI पर Gemini

इससे एजेंट को तर्क करने और टूल चुनने में मदद मिलती है

एजेंट के बीच बातचीत

A2A प्रोटोकॉल

JSON-RPC 2.0 ब्रिज, ताकि अलग-अलग फ़्रेमवर्क के एजेंट आपस में बातचीत कर सकें

टॉप-लेवल ऑर्केस्ट्रेटर

ADK (BaseAgent)

अनुरोध स्वीकार करता है, A2A के ज़रिए काम सौंपता है, और काम पूरा न होने पर फिर से प्लान बनाता है

इसे काम करते हुए देखें: अगर यह सुविधा उपलब्ध है, तो https://scale-control-room-761793285222.us-central1.run.app पर जाकर, प्रोडक्शन सिस्टम को आज़माएं. इसमें रीयल-टाइम डैशबोर्ड, A2A प्रोटोकॉल, और IAM सुरक्षा की सुविधा मिलती है.

आपको क्या करना होगा

  • एजेंट के इस्तेमाल के लिए कस्टम टूल तय करना.
  • CrewAI की मदद से, खास एजेंट बनाएँ.
  • LangGraph की मदद से, स्टेट मशीन प्लानर बनाएं.
  • प्लानर और एक्ज़ीक्यूशन क्रू के बीच के फ़्लो को मैनेज करना.
  • अलग-अलग फ़्रेमवर्क के बीच कम्यूनिकेशन के लिए, प्लानर को A2A प्रोटोकॉल सर्वर में रैप करें.
  • टॉप-लेवल का ADK कंट्रोल रूम बनाएं. यह A2A के ज़रिए काम करता है और अगर कोई समस्या आती है, तो फिर से प्लान बनाता है.

आपको किन चीज़ों की ज़रूरत होगी

  • कोई वेब ब्राउज़र, जैसे कि Chrome
  • बिलिंग की सुविधा वाला Google Cloud प्रोजेक्ट

यह कोडलैब, इंटरमीडिएट डेवलपर के लिए है. इन्हें Python और एलएलएम के बुनियादी सिद्धांतों के बारे में पता है.

अनुमानित अवधि: 35 मिनट.

लागत का अनुमान: इस कोडलैब में बनाए गए संसाधनों की लागत, 1 डॉलर से कम होनी चाहिए.

2. शुरू करने से पहले

Google Cloud प्रोजेक्ट बनाना

  1. Google Cloud Console में, प्रोजेक्ट चुनने वाले पेज पर, Google Cloud प्रोजेक्ट चुनें या बनाएं.
  2. पक्का करें कि आपके Cloud प्रोजेक्ट के लिए बिलिंग चालू हो. किसी प्रोजेक्ट के लिए बिलिंग चालू है या नहीं, यह देखने का तरीका जानें.

Cloud Shell शुरू करना

Cloud Shell, Google Cloud में चलने वाला एक कमांड-लाइन एनवायरमेंट है. इसमें ज़रूरी टूल पहले से लोड होते हैं.

  1. Google Cloud कंसोल में सबसे ऊपर मौजूद, Cloud Shell चालू करें पर क्लिक करें.
  2. Cloud Shell से कनेक्ट होने के बाद, अपने क्रेडेंशियल की पुष्टि करें:
    gcloud auth list
    
  3. पुष्टि करें कि आपका प्रोजेक्ट कॉन्फ़िगर किया गया है:
    gcloud config get project
    
  4. अगर आपका प्रोजेक्ट उम्मीद के मुताबिक सेट नहीं है, तो इसे सेट करें:
    export PROJECT_ID=<YOUR_PROJECT_ID>
    gcloud config set project $PROJECT_ID
    

एपीआई चालू करें

Vertex AI API को चालू करने के लिए, यह निर्देश चलाएं:

gcloud services enable aiplatform.googleapis.com

ध्यान दें: Cloud Shell, आपके Google Cloud खाते से अपने-आप पुष्टि करता है. अगर इस कोडलैब को Cloud Shell के बाहर चलाया जा रहा है, तो आपको Vertex AI से पुष्टि करने के लिए, gcloud auth application-default login चलाना होगा.

अपना एनवायरमेंट सेट अप करने का तरीका

Cloud Shell में, अपने प्रोजेक्ट के लिए एक नई डायरेक्ट्री बनाएं और उसमें जाएं:

mkdir scale-agents
cd scale-agents

uv इंस्टॉल करें और इसका इस्तेमाल करके ज़रूरी पैकेज इंस्टॉल करें:

curl -LsSf https://astral.sh/uv/install.sh | sh
export PATH="$HOME/.local/bin:$PATH"
uv init --no-workspace --no-readme
rm main.py
sed -i 's/requires-python = ">=3.12"/requires-python = ">=3.12,<3.14"/' pyproject.toml
uv add crewai 'litellm[google]' langgraph 'a2a-sdk>=0.3.25,<0.4' httpx uvicorn 'google-adk>=1.0.0' --prerelease=allow

Vertex AI के लिए एनवायरमेंट वैरिएबल सेट करें:

export GOOGLE_CLOUD_PROJECT=$(gcloud config get-value project)
export GOOGLE_CLOUD_LOCATION=us-central1
export GOOGLE_GENAI_USE_VERTEXAI=TRUE

3. टूल और एजेंट तय करना

मल्टी-एजेंट सिस्टम में, एजेंट को दुनिया से इंटरैक्ट करने के लिए टूल की ज़रूरत होती है. साथ ही, उन्हें यह पता होना चाहिए कि उन्हें क्या करना है.

scale_agents.py नाम की एक फ़ाइल बनाएं और इसमें यह कोड जोड़ें. इससे इंपोर्ट, मॉक टूल, और CrewAI एजेंट सेट अप किए जाते हैं.

import os
from typing import Optional
from crewai import Agent, Crew, Process, Task, LLM
from crewai.tools import tool
from langgraph.graph import StateGraph, END
from typing_extensions import TypedDict

# CrewAI requires this env var even when using Vertex AI
os.environ["OPENAI_API_KEY"] = "NA"

# Set the project ID for Vertex AI
os.environ["VERTEXAI_PROJECT"] = os.getenv("GOOGLE_CLOUD_PROJECT", "")
os.environ["VERTEXAI_LOCATION"] = "us-central1"

# Initialize the LLM to use Vertex AI
llm = LLM(
    model="vertex_ai/gemini-2.5-flash",
    temperature=0.0,
    max_tokens=4096,
)

# --- Step 1: Define Tools ---

@tool("search_products")
def search_products(query: str) -> list:
    """Search for products in the catalog."""
    # Mock product catalog
    products = [
        {"product_id": "pixel-7", "name": "Pixel 7 Phone", "price": 50.0},
        {"product_id": "pixel-8", "name": "Pixel 8 Phone", "price": 80.0},
    ]
    return [p for p in products if query.lower() in p["name"].lower()]

@tool("check_budget")
def check_budget(amount: float) -> dict:
    """Check if a purchase amount is within the budget."""
    limit = 100.0
    if amount <= limit:
        return {"approved": True, "remaining": limit - amount}
    return {"approved": False, "reason": f"Exceeds budget of ${limit}"}

@tool("create_purchase_order")
def create_purchase_order(product_id: str, quantity: int) -> dict:
    """Create a purchase order for a product."""
    return {
        "status": "SUCCESS",
        "po_id": f"PO-{product_id}-{quantity}",
        "message": f"Successfully ordered {quantity} units of {product_id}."
    }

# --- Step 2: Define Agents ---

sourcing_agent = Agent(
    role="Sourcing Specialist",
    goal="Find the best available products that match the intent of the request. You MUST use the search_products tool to look up products -- never make up product data.",
    backstory="You are a veteran procurement specialist with an eye for detail. You always search the catalog before recommending a product.",
    tools=[search_products],
    llm=llm,
    verbose=True,
    allow_delegation=False,
    memory=False,
    reasoning=False,
)

procurement_agent = Agent(
    role="Procurement Officer",
    goal="Validate the purchase against budget constraints and execute the order. You MUST use the check_budget tool before ordering and the create_purchase_order tool to place the order.",
    backstory="You are the gatekeeper of the budget. You always verify budget before placing any order.",
    tools=[check_budget, create_purchase_order],
    llm=llm,
    verbose=True,
    allow_delegation=False,
    memory=False,
    reasoning=False,
)

खास कॉन्सेप्ट

  • @tool डेकोरेटर: CrewAI इसका इस्तेमाल, सामान्य Python फ़ंक्शन को ऐसे टूल में बदलने के लिए करता है जिन्हें एलएलएम समझ सकें और कॉल कर सकें. फ़ंक्शन के टाइप हिंट और डॉकस्ट्रिंग का इस्तेमाल करके, टूल स्कीमा जनरेट किया जाता है. इसे एलएलएम समझ सकता है.
  • भूमिका, लक्ष्य, और बैकस्टोरी: इनसे एजेंट की भूमिका तय होती है. साथ ही, ये एलएलएम को तर्क करने में मदद करते हैं. बैकस्टोरी सिर्फ़ जानकारी देने के लिए नहीं है. "कैटलॉग में हमेशा खोजें" से एजेंट को जवाबों के बारे में भ्रमित होने के बजाय, अपने टूल इस्तेमाल करने के लिए बढ़ावा मिलता है.
  • reasoning=False: इससे जवाब देने के लिए ज़्यादा सोच-विचार करने की सुविधा बंद हो जाती है. इसलिए, एजेंट सीधे जवाब देने के बजाय, टूल को कॉल करने के स्टैंडर्ड लूप को फ़ॉलो करता है.
  • allow_delegation=False: इससे हर एजेंट को, उसे असाइन किए गए टूल पर फ़ोकस करने में मदद मिलती है. इससे काम को दूसरे एजेंट को नहीं सौंपा जाता.

एक के बजाय दो एजेंट क्यों? हर एजेंट के पास अलग-अलग टूल होते हैं और उसका काम भी अलग होता है. सोर्सिंग स्पेशलिस्ट सिर्फ़ प्रॉडक्ट खोजता है. वहीं, खरीद अधिकारी सिर्फ़ बजट और ऑर्डर मैनेज करता है. ज़िम्मेदारियों को अलग-अलग करने का मतलब है कि हर एजेंट के पास एक फ़ोकस वाला प्रॉम्प्ट और काम के टूल का एक छोटा सेट होता है. इससे एलएलएम, एक ही एजेंट के मुकाबले ज़्यादा भरोसेमंद तरीके से काम करता है.

4. टास्क और क्रू को परिभाषित करना

अब हम यह तय करेंगे कि इन एजेंट को क्या करना है. इसके लिए, टास्क बनाए जाएंगे और उन्हें क्रू में शामिल किया जाएगा.

इसी scale_agents.py फ़ाइल के आखिर में यह कोड जोड़ें:

# --- Step 3: Define Tasks & Crew ---

sourcing_task = Task(
    description="Use the search_products tool to find products matching: '{item_description}'. Return the product_id and price of the best match from the tool results.",
    expected_output="The product_id and price of the best matching product from the search_products tool.",
    agent=sourcing_agent
)

procurement_task = Task(
    description="First, use the check_budget tool to verify the total cost for {quantity} units. Then use the create_purchase_order tool with the product_id and quantity to place the order.",
    expected_output="The purchase order details returned by the create_purchase_order tool.",
    agent=procurement_agent,
    context=[sourcing_task] # This task depends on the output of sourcing_task
)

def run_crew(item_description: str, quantity: int):
    crew = Crew(
        agents=[sourcing_agent, procurement_agent],
        tasks=[sourcing_task, procurement_task],
        process=Process.sequential, # Run tasks in order
        verbose=True,
        memory=False,
        planning=False,
    )
    
    result = crew.kickoff(inputs={
        "item_description": item_description,
        "quantity": quantity
    })
    return result

खास कॉन्सेप्ट

  • टास्क का कॉन्टेक्स्ट: context=[sourcing_task] से CrewAI को पता चलता है कि खरीदारी के टास्क को पूरा करने के लिए, सोर्सिंग के टास्क का आउटपुट ज़रूरी है. खरीदारी अधिकारी, यह देख सकता है कि सोर्सिंग स्पेशलिस्ट ने क्या जानकारी इकट्ठा की है. इसके बाद, वह यह तय कर सकता है कि क्या ऑर्डर करना है.
  • Process.sequential: टास्क उसी क्रम में लागू होते हैं जिस क्रम में वे सूची में दिखते हैं. यह ज़रूरी है, क्योंकि खरीदारी का टास्क, सोर्सिंग के टास्क के नतीजों पर निर्भर करता है. आपको यह पता होने से पहले कि कौन-सा प्रॉडक्ट खरीदना है, ऑर्डर नहीं दिया जा सकता.
  • memory=False / planning=False: इस डेमो के लिए, CrewAI की बिल्ट-इन मेमोरी और प्लानिंग की सुविधाओं को बंद कर देता है, ताकि इसे आसानी से और अनुमान के मुताबिक पूरा किया जा सके.

5. LangGraph Planner बनाना

एक्ज़ीक्यूशन क्रू, "कैसे" से जुड़ी चीज़ें मैनेज करता है. जैसे, प्रॉडक्ट खोजना, बजट की जांच करना, और ऑर्डर देना. लेकिन "क्या" तय कौन करता है? यह प्लानिंग एजेंट है, जिसे LangGraph की मदद से बनाया गया है.

LangGraph, वर्कफ़्लो को स्टेट मशीन के तौर पर मॉडल करता है. यह नोड (फ़ंक्शन) का एक ऐसा ग्राफ़ होता है जो किनारों (ट्रांज़िशन) से जुड़ा होता है. स्टेट, ग्राफ़ में फ़्लो होता है. हर नोड, शेयर की गई स्टेट से पढ़ता है और उसमें लिखता है. यह प्लानिंग वर्कफ़्लो के लिए सबसे सही है. इसमें आपको कंट्रोल फ़्लो को साफ़ तौर पर तय करने की ज़रूरत होती है: अनुरोध का विश्लेषण करें, क्रू को काम सौंपें, और रिपोर्ट जनरेट करें.

इसी scale_agents.py फ़ाइल के आखिर में यह कोड जोड़ें:

# --- Step 4: Define LangGraph Planner ---

class PlanState(TypedDict):
    objective: str
    item_description: Optional[str]
    quantity_needed: Optional[int]
    execution_result: Optional[str]
    final_report: Optional[str]

def analyze_alert(state: PlanState) -> PlanState:
    """Node 1: Extract intent from the raw objective string."""
    print("--- ANALYZING ALERT ---")
    # In a production app, you would use an LLM here to extract details.
    # For simplicity, we simulate extraction here.
    objective = state["objective"]
    
    # Hardcoded extraction for the demo
    if "Pixel 7" in objective:
        return {
            "item_description": "Pixel 7",
            "quantity_needed": 1,
        }
    return {
        "item_description": "unknown",
        "quantity_needed": 0,
    }

def delegate_to_executor(state: PlanState) -> PlanState:
    """Node 2: Call the CrewAI Execution Crew."""
    print("--- DELEGATING TO CREW ---")
    if state["item_description"] == "unknown":
        return {"execution_result": "Failed: Unknown item"}
        
    result = run_crew(
        item_description=state["item_description"],
        quantity=state["quantity_needed"]
    )
    return {"execution_result": str(result)}

def generate_report(state: PlanState) -> PlanState:
    """Node 3: Synthesize the final outcome."""
    print("--- GENERATING REPORT ---")
    return {
        "final_report": f"Objective handled: {state['objective']}. Result: {state['execution_result']}"
    }

# Build the graph
workflow = StateGraph(PlanState)
workflow.add_node("analyze_alert", analyze_alert)
workflow.add_node("delegate", delegate_to_executor)
workflow.add_node("generate_report", generate_report)

workflow.set_entry_point("analyze_alert")
workflow.add_edge("analyze_alert", "delegate")
workflow.add_edge("delegate", "generate_report")
workflow.add_edge("generate_report", END)

app = workflow.compile()

# --- Main Execution ---
if __name__ == "__main__":
    print("Starting Multi-Agent System...")
    
    initial_state = {
        "objective": "Restock 1 Pixel 7 phones for the Tokyo office"
    }
    
    final_state = app.invoke(initial_state)
    
    print("\n=== FINAL REPORT ===")
    print(final_state["final_report"])

खास कॉन्सेप्ट

  • StateGraph: यह स्टेट मशीन को तय करता है. PlanState टाइप की गई स्थिति है, जो हर नोड के अनुरोध को प्रोसेस करने के दौरान इकट्ठा होती है.
  • नोड: ऐसे फ़ंक्शन जो मौजूदा स्थिति को लेते हैं और उसमें अपडेट दिखाते हैं. हर नोड की एक ही ज़िम्मेदारी होती है – analyze_alert इंटेंट का पता लगाता है, delegate_to_executor क्रू को चलाता है, और generate_report नतीजे की खास जानकारी देता है.
  • किनारे: नोड के बीच के फ़्लो को तय करते हैं. इस कोडलैब में, हमने एक आसान लीनियर फ़्लो (analyze → delegate → report) का इस्तेमाल किया है. पूरी वर्कशॉप में, इसे शर्त के आधार पर राउटिंग के साथ बढ़ाया गया है. उदाहरण के लिए, एक्ज़ीक्यूटर के बजाय सुरक्षा पाथ पर डिस्ट्रक्टिव अनुरोधों को राउट करना.

प्लानर के लिए LangGraph का इस्तेमाल क्यों किया जाता है? CrewAI, टूल-कॉलिंग एजेंट के लिए बहुत अच्छा है. हालांकि, प्लानर को कंट्रोल फ़्लो तय करने की ज़रूरत होती है. जैसे, "अगर नुकसान पहुंचाने वाला है, तो सुरक्षा से जुड़े तरीके का इस्तेमाल करो; नहीं तो, किसी और को काम सौंप दो." LangGraph का स्टेट मशीन मॉडल, इस राउटिंग को साफ़ तौर पर दिखाता है और इसे टेस्ट किया जा सकता है. वहीं, CrewAI नीचे दिए गए टूल को बिना किसी तय फ़ॉर्मैट के इस्तेमाल करता है.

6. Planner and Crew को चलाना

अब हम LangGraph प्लानर और CrewAI क्रू को एक साथ टेस्ट करेंगे.

अपने Cloud Shell टर्मिनल में, यह स्क्रिप्ट चलाएं:

uv run python scale_agents.py

आपको आउटपुट में, की जा रही कार्रवाइयों के बारे में जानकारी दिखेगी:

  1. सूचना का विश्लेषण किया जा रहा है: LangGraph नोड चलता है.
  2. Crew को टास्क सौंपना: LangGraph नोड, CrewAI क्रू को कॉल करता है.
  3. CrewAI Execution: इसमें आपको सोर्सिंग स्पेशलिस्ट को प्रॉडक्ट ढूंढते हुए और खरीद अधिकारी को बजट की जांच करते हुए और परचेज़ ऑर्डर (पीओ) बनाते हुए दिखाया जाएगा.
  4. फ़ाइनल रिपोर्ट: इसमें, खास जानकारी के साथ नतीजे आखिर में प्रिंट किए जाएंगे.

आउटपुट का उदाहरण (संक्षिप्त किया गया):

Starting Multi-Agent System...
--- ANALYZING ALERT ---
--- DELEGATING TO CREW ---

  Agent: Sourcing Specialist
  Tool: search_products  Args: {'query': 'Pixel 7'}
  Tool Completed  Output: [{'product_id': 'pixel-7', 'name': 'Pixel 7 Phone', 'price': 50.0}]

  Agent: Procurement Officer
  Tool: check_budget  Args: {'amount': 50}
  Tool: create_purchase_order  Args: {'product_id': 'pixel-7', 'quantity': 1}
  Tool Completed  Output: {'status': 'SUCCESS', 'po_id': 'PO-pixel-7-1', ...}

--- GENERATING REPORT ---

=== FINAL REPORT ===
Objective handled: Restock 1 Pixel 7 phones for the Tokyo office. Result: ...PO-pixel-7-1...SUCCESS...

ध्यान दें: आपको जवाब में [CrewAIEventsBus] Warning: Event pairing mismatch मैसेज दिख सकते हैं. ये CrewAI की इंटरनल इवेंट ट्रैकिंग से जुड़ी सामान्य चेतावनियां हैं. इन्हें अनदेखा किया जा सकता है.

ध्यान दें: CrewAI, ट्रेसिंग की सुविधा बंद होने के बारे में मैसेज दिखा सकता है. यह सूचना के लिए है और इसे अनदेखा किया जा सकता है.

ध्यान दें: मॉक ओएमएस के लिए, बजट की सीमा 100 डॉलर है. सफलतापूर्वक काम करने के लिए, मात्रा कम रखें (लगभग दो यूनिट से कम). उदाहरण के लिए, 5,000 रुपये की कीमत वाला एक Pixel 7 फ़ोन, बजट की जांच पास कर लेता है. हालांकि, 15,000 रुपये की कीमत वाले तीन फ़ोन को "बजट से ज़्यादा" के तौर पर अस्वीकार कर दिया जाएगा.

7. Planner को A2A सर्वर में रैप करना

LangGraph प्लानर काम करता है, लेकिन यह Python प्रोसेस में फंसा हुआ है. इसे अन्य एजेंटों के लिए कॉल करने लायक बनाने के लिए, हम इसे A2A (Agent-to-Agent) सर्वर में रैप करते हैं. ऐसा इसलिए, ताकि इसे अलग-अलग फ़्रेमवर्क में लिखा जा सके या अलग-अलग मशीनों पर चलाया जा सके.

A2A, JSON-RPC 2.0 पर आधारित एक प्रोटोकॉल है. यह प्रोटोकॉल, एजेंट के कम्यूनिकेशन के तरीके को स्टैंडर्ड बनाता है. मुख्य कॉन्सेप्ट:

सिद्धांत

मकसद

एजेंट कार्ड

JSON मेटाडेटा, जिसमें एजेंट की क्षमताओं के बारे में बताया गया है. इसे /.well-known/agent-card.json पर दिखाया जाता है

message/send

एजेंट को टास्क भेजने के लिए JSON-RPC तरीका

टास्क

स्टेट के साथ यूनिट ऑफ़ वर्क (सबमिट किया गया → काम कर रहा है → पूरा हुआ/काम नहीं किया)

आर्टफ़ैक्ट

किसी टास्क से जुड़ी इंटरमीडिएट और फ़ाइनल आउटपुट

नई फ़ाइल a2a_planner.py बनाएं:

import asyncio
import os
import uvicorn

from a2a.server.agent_execution import AgentExecutor, RequestContext
from a2a.server.apps import A2AStarletteApplication
from a2a.server.events import EventQueue
from a2a.server.request_handlers import DefaultRequestHandler
from a2a.server.tasks import InMemoryTaskStore, TaskUpdater
from a2a.types import (AgentCapabilities, AgentCard, AgentSkill,
                       InternalError, Part, TextPart)
from a2a.utils import new_task
from a2a.utils.errors import ServerError

# Import the LangGraph planner from Step 4
from scale_agents import app as planner_app


class PlannerAgentExecutor(AgentExecutor):
    """Wraps the LangGraph planner as an A2A service."""

    SUPPORTED_CONTENT_TYPES = ["text", "text/plain"]

    async def execute(self, context: RequestContext, event_queue: EventQueue):
        objective = context.get_user_input()

        # Initialize A2A task tracking
        task = context.current_task or new_task(context.message)
        await event_queue.enqueue_event(task)
        updater = TaskUpdater(event_queue, task.id, task.context_id)

        try:
            # Run the LangGraph planner synchronously in a thread
            initial_state = {"objective": objective}
            result = await asyncio.to_thread(planner_app.invoke, initial_state)
            final_report = result.get("final_report", "No report generated.")
        except Exception as e:
            final_report = f"Execution failed: {e}"

        # Send the result back as an artifact
        await updater.add_artifact(
            [Part(root=TextPart(text=final_report))],
            name="orchestration_report"
        )
        await updater.complete()

    async def cancel(self, context, event_queue):
        raise ServerError(error=InternalError(message="Cancel not supported"))


# Define the Agent Card — this is what other agents see
port = int(os.environ.get("PORT", 8080))
agent_card = AgentCard(
    name="Retail-Planner-A2A",
    description="LangGraph planner that delegates logistics tasks to a CrewAI crew.",
    url=f"http://localhost:{port}/",
    version="1.0.0",
    default_input_modes=PlannerAgentExecutor.SUPPORTED_CONTENT_TYPES,
    default_output_modes=PlannerAgentExecutor.SUPPORTED_CONTENT_TYPES,
    capabilities=AgentCapabilities(streaming=False),
    skills=[
        AgentSkill(
            id="plan_logistics",
            name="Plan Logistics",
            description="Analyzes inventory alerts and orchestrates procurement.",
            tags=["logistics", "planning"],
            examples=["Restock 1 Pixel 7 phones for the Tokyo office"],
        )
    ],
)

if __name__ == "__main__":
    executor = PlannerAgentExecutor()
    handler = DefaultRequestHandler(
        agent_executor=executor, task_store=InMemoryTaskStore()
    )
    server = A2AStarletteApplication(
        agent_card=agent_card, http_handler=handler
    )
    print(f"Starting A2A Planner Server on port {port}...")
    uvicorn.run(server.build(), host="0.0.0.0", port=port)

खास कॉन्सेप्ट

  • एजेंट कार्ड: इसे /.well-known/agent-card.json पर दिखाया जाता है. कोई भी एजेंट, इस सर्वर के बारे में जानकारी पा सकता है. इसके लिए, उसे उस यूआरएल को फ़ेच करना होगा. इसमें एजेंट के कौशल, इस्तेमाल किए जा सकने वाले कॉन्टेंट टाइप, और क्षमताओं की सूची होती है.
  • AgentExecutor.execute(): सिर्फ़ इसी तरीके का इस्तेमाल किया जाता है. यह आने वाले अनुरोध को स्वीकार करता है, आपके एजेंट लॉजिक (यहां, LangGraph प्लानर) को लागू करता है, और नतीजों को आर्टफ़ैक्ट के तौर पर वापस भेजता है.
  • TaskUpdater: यह टास्क की लाइफ़साइकल को मैनेज करता है – add_artifact() इंटरमीडिएट/फ़ाइनल आउटपुट भेजता है, complete() टास्क को 'पूरा हुआ' के तौर पर मार्क करता है. A2A लाइब्रेरी, सभी JSON-RPC प्लंबिंग को मैनेज करती है.

टर्मिनल में A2A सर्वर को चालू करके, उसकी जांच करें:

uv run python a2a_planner.py

कोई दूसरा Cloud Shell टैब खोलें (मौजूदा टैब के बगल में मौजूद + पर क्लिक करें) और पुष्टि करें कि एजेंट कार्ड दिखाया जा रहा है:

cd ~/scale-agents
curl http://localhost:8080/.well-known/agent-card.json | python3 -m json.tool

आपको एजेंट कार्ड का JSON दिखेगा. अगले चरण के लिए, पहले टर्मिनल में A2A सर्वर को चालू रखें.

8. ADK कंट्रोल रूम बनाना

स्टैक में सबसे ऊपर कंट्रोल रूम है. इसे ADK (Google की एजेंट डेवलपमेंट किट) की मदद से बनाया गया है. इसे उपयोगकर्ता का अनुरोध मिलता है. यह A2A के ज़रिए, अनुरोध को प्लानर को सौंपता है. इसके बाद, यह नतीजे का आकलन करता है. साथ ही, यह प्लानिंग पूरी न होने पर फिर से प्लानिंग करता है (CUJ 2).

ADK, एजेंट प्रिमिटिव उपलब्ध कराता है. जैसे, BaseAgent, LlmAgent, और InMemoryRunner. हम कस्टम ऑर्केस्ट्रेशन लॉजिक लिखने के लिए, BaseAgent को सबक्लास करते हैं. जैसे, A2A कॉल, रिपोर्ट क्लासिफ़िकेशन, और LlmAgent सब-एजेंट के साथ डाइनैमिक री-प्लानिंग.

नई फ़ाइल control_room.py बनाएं:

import asyncio
import uuid
import os
import httpx

from google.adk.agents import BaseAgent, LlmAgent, RunConfig
from google.adk.agents.invocation_context import InvocationContext
from google.adk.events import Event
from google.adk.runners import InMemoryRunner
from google.genai import types
from typing import AsyncGenerator

A2A_SERVER_URL = os.environ.get("PLANNER_AGENT_URL", "http://127.0.0.1:8080")


def _classify_report(report: str) -> tuple[bool, bool]:
    """Return (is_success, should_retry) for a planner report."""
    normalized = (report or "").replace("*", "").strip().lower()

    success_markers = [
        "status: success", "'status': 'success'",
        "outcome: success", "po_id", "successfully ordered",
    ]
    retryable_markers = ["not found", "discontinued", "no inventory",
                         "unknown item"]
    terminal_markers = [
        "status: failed", "over budget", "not issued",
        "procurement failed",
    ]

    if any(m in normalized for m in terminal_markers):
        return False, False      # Failed, don't retry
    if any(m in normalized for m in retryable_markers):
        return False, True       # Failed, but retryable
    if any(m in normalized for m in success_markers):
        return True, False       # Success!
    return False, False          # Unknown → treat as failure


class ControlRoomAgent(BaseAgent):
    """Top-level orchestrator: delegates via A2A, re-plans on failure."""

    async def _run_async_impl(
        self, ctx: InvocationContext
    ) -> AsyncGenerator[Event, None]:
        # Extract user input from session events
        user_msg = ""
        if ctx.session and ctx.session.events:
            for evt in reversed(ctx.session.events):
                if evt.content and evt.content.role == "user":
                    user_msg = evt.content.parts[0].text
                    break

        max_attempts = 2
        current_objective = user_msg
        final_report = "No report returned."

        for attempt in range(1, max_attempts + 1):
            print(f"\n--- Attempt {attempt}: Calling A2A Planner ---")
            print(f"    Objective: {current_objective}")

            # Build the A2A JSON-RPC request
            payload = {
                "jsonrpc": "2.0",
                "id": f"req-{attempt}",
                "method": "message/send",
                "params": {
                    "message": {
                        "message_id": str(uuid.uuid4()),
                        "parts": [{"text": current_objective}],
                        "role": "user"
                    }
                }
            }

            try:
                async with httpx.AsyncClient(timeout=300.0) as client:
                    resp = await client.post(
                        f"{A2A_SERVER_URL}/", json=payload
                    )
                    data = resp.json()
                    if "error" in data:
                        final_report = data["error"].get("message", "Unknown A2A error")
                    elif "result" in data:
                        artifacts = data["result"].get("artifacts", [])
                        if artifacts and "parts" in artifacts[-1]:
                            parts = artifacts[-1]["parts"]
                            if parts and "text" in parts[0]:
                                final_report = parts[0]["text"]
            except Exception as e:
                final_report = f"Connection error: {e}"

            print(f"\n--- Report ---\n{final_report}\n")
            is_success, should_retry = _classify_report(final_report)

            if is_success:
                yield Event(
                    author=self.name,
                    invocation_id=ctx.invocation_id,
                    content=types.Content(
                        role="model",
                        parts=[types.Part.from_text(text=final_report)]
                    ),
                )
                return

            # --- Re-planning (CUJ 2) ---
            if should_retry and attempt < max_attempts:
                print("--- Re-planning with LLM ---")
                replanner = LlmAgent(
                    name=f"replanner_{attempt}",
                    model="gemini-2.5-flash",
                    instruction=(
                        "You are a strategic re-planner. The previous request "
                        "failed. Rewrite the objective to be broader or more "
                        "likely to succeed. Output ONLY the new objective text."
                    ),
                )

                # Run the re-planner as a sub-agent
                child_ctx = InvocationContext(
                    invocation_id=f"{ctx.invocation_id}_replan_{attempt}",
                    agent=replanner,
                    session=ctx.session,
                    session_service=ctx.session_service,
                    run_config=ctx.run_config or RunConfig(),
                )
                child_ctx.user_content = types.Content(
                    role="user",
                    parts=[types.Part.from_text(text=(
                        f"Original Objective: {current_objective}\n"
                        f"Failure Reason: {final_report}\n"
                        "Please broaden the search."
                    ))]
                )

                new_objective = ""
                async for event in replanner.run_async(child_ctx):
                    if event.content and event.content.parts:
                        for part in event.content.parts:
                            if hasattr(part, "text") and part.text:
                                new_objective += part.text

                current_objective = new_objective.strip()
                print(f"New objective: {current_objective}")
                continue

            # Terminal failure (not retryable)
            if not should_retry:
                yield Event(
                    author=self.name,
                    invocation_id=ctx.invocation_id,
                    content=types.Content(
                        role="model",
                        parts=[types.Part.from_text(text=f"FAILED: {final_report}")]
                    ),
                )
                return

        # Max attempts exhausted
        yield Event(
            author=self.name,
            invocation_id=ctx.invocation_id,
            content=types.Content(
                role="model",
                parts=[types.Part.from_text(
                    text=f"FAILED after {max_attempts} attempts: {final_report}"
                )]
            ),
        )


async def main():
    prompt = "Restock 1 Pixel 7 phones for the Tokyo office"
    print(f"Starting Control Room with: {prompt}\n")

    agent = ControlRoomAgent(name="control_room")
    runner = InMemoryRunner(app_name="control_room", agent=agent)
    session = await runner.session_service.create_session(
        app_name="control_room", user_id="admin"
    )
    content = types.Content(
        role="user", parts=[types.Part.from_text(text=prompt)]
    )

    async for event in runner.run_async(
        user_id="admin", session_id=session.id, new_message=content
    ):
        if event.content and event.content.parts:
            for part in event.content.parts:
                if hasattr(part, "text") and part.text:
                    print(f"Result: {part.text}")

    print("\n=== CONTROL ROOM COMPLETE ===")


if __name__ == "__main__":
    asyncio.run(main())

खास कॉन्सेप्ट

  • BaseAgent: यह कस्टम एजेंट के लिए ADK प्रिमिटिव है. इसे सबक्लास किया जाता है और _run_async_impl को ओवरराइड किया जाता है, ताकि मनमुताबिक एसिंक ऑर्केस्ट्रेशन लॉजिक लिखा जा सके. यहां, A2A कॉल + क्लासिफ़ाई + री-प्लान लूप का इस्तेमाल किया गया है.
  • A2A JSON-RPC कॉल: Control Room, प्लानर के A2A सर्वर को httpx का इस्तेमाल करके स्टैंडर्ड message/send अनुरोध भेजता है. साथ ही, JSON-RPC जवाब को पार्स करके फ़ाइनल रिपोर्ट निकालता है.
  • _classify_report(): यह कीवर्ड के आधार पर, रिपोर्ट को अलग-अलग कैटगरी में बांटता है. इससे रिपोर्ट के टेक्स्ट से यह पता चलता है कि रिपोर्ट सफल हुई है, फिर से कोशिश की जा सकती है या रिपोर्ट पूरी नहीं हुई है. इससे फिर से प्लान बनाने की प्रोसेस शुरू होती है.
  • सब-एजेंट को शुरू करना: फिर से प्लान बनाने के लिए, कंट्रोल रूम एक LlmAgent बनाता है और उसे चलाता है. इसके लिए, वह एक चाइल्ड InvocationContext बनाता है और replanner.run_async(child_ctx) को कॉल करता है. इसकी मदद से, कस्टम ऑर्केस्ट्रेशन लॉजिक के अंदर एलएलएम एजेंट को डाइनैमिक तौर पर स्पिन अप किया जा सकता है.
  • InMemoryRunner: यह एजेंट को स्थानीय तौर पर चलाता है. साथ ही, इसमें मेमोरी में सेव किए गए सेशन का डेटा होता है. प्रोडक्शन में, Vertex AI Agent Engine पर डिप्लॉय करने के लिए, adk deploy का इस्तेमाल किया जाता है.

9. फ़ुल स्टैक चलाना

अब हम तीन लेयर वाले पूरे सिस्टम को टेस्ट करते हैं: ADK Control Room → A2A → LangGraph Planner → CrewAI Crew.

आपने पहले जो दूसरा Cloud Shell टैब खोला था उसका इस्तेमाल करें. इसके अलावा, नया टैब खोलने के लिए + पर क्लिक करें और कंट्रोल रूम चलाएं. अहम जानकारी: हर Cloud Shell टैब का अपना शेल सेशन होता है. आपको प्रोजेक्ट और एनवायरमेंट वैरिएबल फिर से सेट करने होंगे:

cd ~/scale-agents
export GOOGLE_CLOUD_PROJECT=$(gcloud config get-value project)
export GOOGLE_CLOUD_LOCATION=us-central1
export GOOGLE_GENAI_USE_VERTEXAI=TRUE
uv run python control_room.py

आपको पूरा ऑर्केस्ट्रेशन फ़्लो दिखेगा:

  1. कंट्रोल रूम को अनुरोध मिलता है और वह A2A सर्वर को message/send JSON-RPC कॉल भेजता है
  2. A2A सर्वर को अनुरोध मिलता है और वह LangGraph प्लानर को शुरू करता है
  3. LangGraph प्लानर, अनुरोध का विश्लेषण करता है और उसे CrewAI क्रू को सौंपता है
  4. CrewAI क्रू, सोर्सिंग और खरीद एजेंटों को मैनेज करता है
  5. नतीजा, कंट्रोल रूम तक पहुंचता है

क्रिटिकल यूज़र जर्नी (सीयूजे)

इन स्थितियों को आज़माने के लिए, control_room.py में मौजूद prompt स्ट्रिंग में बदलाव करें:

सीयूजे

प्रॉम्प्ट

क्या होता है

1. हैप्पी पाथ

Restock 1 Pixel 7 phones for the Tokyo office

Search -> budget check -> purchase order (SUCCESS). यह शुरू से आखिर तक काम करता है.

2. फिर से प्लान किया जा रहा है

Order 1 unit of the discontinued XR-7000 Quantum Holographic Display

प्लानर, "प्रोसेस पूरी नहीं हो सकी: आइटम की जानकारी नहीं है" दिखाता है. Control Room इस समस्या का पता लगाता है और खोज के दायरे को बढ़ाने के लिए, LlmAgent री-प्लानर को शुरू करता है. दोनों कोशिशें पूरी नहीं होंगी, क्योंकि हार्डकोड किए गए प्लानर को सिर्फ़ "Pixel 7" के बारे में पता है. हालांकि, आपको फिर से प्लान बनाने की पूरी प्रक्रिया दिखेगी.

सीयूजे 2 (फिर से प्लान बनाना) की जांच करने के लिए, control_room.py में मौजूद prompt को बदलकर यह करें:

prompt = "Order 1 unit of the discontinued XR-7000 Quantum Holographic Display"

हार्डकोड किए गए प्लानर को इस आइटम के बारे में पता नहीं चलेगा. इसलिए, वह "Failed: Unknown item" मैसेज दिखाएगा. Control Room को गड़बड़ी का पता चल जाएगा. इसके बाद, वह डाइनैमिक तरीके से LlmAgent री-प्लानर बनाएगा और ज़्यादा बड़े लक्ष्य के साथ फिर से कोशिश करेगा. प्लानर सिर्फ़ "Pixel 7" को पहचानता है. इसलिए, फिर से कोशिश करने पर भी प्लान नहीं बनेगा. हालांकि, आपको प्लान बनाने की पूरी प्रोसेस दिखेगी. आखिरी आउटपुट FAILED after 2 attempts: ... होगा.

10. व्यवस्थित करें

अपने Google Cloud खाते से लगातार शुल्क लिए जाने से बचने के लिए, इस कोडलैब के दौरान बनाई गई संसाधन मिटाए जा सकते हैं. बनाई गई डायरेक्ट्री को आसानी से हटाया जा सकता है:

cd ..
rm -rf scale-agents

11. बधाई हो

बधाई हो! आपने CrewAI, LangGraph, A2A प्रोटोकॉल, और ADK का इस्तेमाल करके, मल्टी-एजेंट ऑर्केस्ट्रेशन सिस्टम बना लिया है.

आपको क्या सीखने को मिला

  • CrewAI के @tool डेकोरेटर का इस्तेमाल करके, एजेंट के लिए टूल कैसे तय करें.
  • अलग-अलग भूमिकाओं, टूल, और लक्ष्यों वाले खास एजेंट बनाने का तरीका.
  • टास्क की डिपेंडेंसी के साथ, क्रम के मुताबिक काम करने वाली क्रू में एजेंटों को कैसे शामिल करें.
  • LangGraph की मदद से, क्रू को काम सौंपने वाला स्टेट मशीन प्लानर बनाने का तरीका.
  • प्लानर को AgentCard और AgentExecutor के साथ A2A सेवा के तौर पर कैसे उपलब्ध कराएं.
  • A2A के ज़रिए काम करने वाला कस्टम ADK BaseAgent कैसे बनाया जाता है. साथ ही, LlmAgent सब-एजेंट को लागू करके, टास्क पूरा न होने पर फिर से प्लान कैसे बनाया जाता है.
  • अलग-अलग फ़्रेमवर्क में प्लानिंग, एक्ज़ीक्यूशन, और ऑर्केस्ट्रेशन को अलग-अलग करने से, आपको मॉड्यूलरिटी और लचीलापन क्यों मिलता है.

ज़्यादा जानकारी

पूरी वर्कशॉप में, इस सिस्टम को इन चीज़ों के साथ बेहतर बनाया गया है:

  • रीयल-टाइम डैशबोर्ड – एक से ज़्यादा एजेंट की प्रोग्रेस को विज़ुअलाइज़ करने के लिए, एसएसई स्ट्रीमिंग
  • Identity Shield – यह आईएएम पर आधारित सुरक्षा है. यह प्रॉम्प्ट लेवल पर नहीं, बल्कि इन्फ़्रास्ट्रक्चर लेवल पर नुकसान पहुंचाने वाली कार्रवाइयों को रोकता है
  • Vertex AI Agent Engineadk deploy की मदद से, मैनेज किए गए क्लाउड इन्फ़्रास्ट्रक्चर पर ADK एजेंट को डिप्लॉय करें

रेफ़रंस दस्तावेज़