مقیاس‌پذیری نمایندگان با CrewAI، LangGraph، A2A و ​​ADK

۱. مقدمه

در این آزمایشگاه کد، شما یاد خواهید گرفت که چگونه یک سیستم ارکستراسیون چند عاملی با استفاده از CrewAI ، LangGraph ، پروتکل A2A و ​​ADK (کیت توسعه عامل) بسازید. شما سیستمی ایجاد خواهید کرد که در آن یک اتاق کنترل ADK برنامه‌ریزی را به یک برنامه‌ریز LangGraph واگذار می‌کند، که وظایف را به یک تیم اجرایی CrewAI - که همگی از طریق A2A به هم متصل هستند - ارسال می‌کند تا یک سناریوی انبارداری مجدد خرده‌فروشی را مدیریت کند.

ارکستراسیون چند عاملی چیست؟

در یک سیستم چندعاملی ، چندین عامل هوش مصنوعی متخصص برای انجام وظایفی که برای یک عامل واحد بسیار پیچیده است، با یکدیگر همکاری می‌کنند. به جای اینکه یک عامل یکپارچه همه کارها را انجام دهد، شما مسئله را به نقش‌هایی - یک برنامه‌ریز و یک مجری - تجزیه می‌کنید که هر کدام ابزار و تخصص خاص خود را دارند.

این نشان دهنده نحوه کار سازمان‌های انسانی است: یک مدیر، استراتژی را به تحلیلگران و اجرا را به متخصصان واگذار می‌کند. مزایای این کار عبارتند از:

  • تفکیک دغدغه‌ها : هر عامل بر کاری که در آن بهترین است تمرکز می‌کند
  • انعطاف‌پذیری چارچوب : استفاده از بهترین چارچوب برای هر نقش (LangGraph برای منطق برنامه‌ریزی، CrewAI برای اجرای ابزار)
  • مقیاس‌پذیری : اضافه کردن عوامل تخصصی بدون تغییر کل سیستم

سناریو

وقتی کاربری درخواست موجودی مجدد مانند «۱ گوشی Pixel 7 برای دفتر توکیو موجودی مجدد» ارسال می‌کند، سیستم:

  1. برنامه‌ریز LangGraph درخواست را تجزیه و تحلیل کرده و کالا و مقدار آن را استخراج می‌کند.
  2. برنامه‌ریز، اجرا را به گروه اجرایی CrewAI واگذار می‌کند.
  3. یک متخصص منابع انسانی با استفاده از ابزارها، کاتالوگ محصولات را جستجو می‌کند.
  4. یک نماینده مسئول تدارکات، بودجه را تأیید می‌کند و با استفاده از ابزارها، سفارش خرید را ثبت می‌کند.
  5. نتیجه به برنامه‌ریز برمی‌گردد که گزارش نهایی را تولید می‌کند.
User Request
    
    
┌──────────────────────┐
 ADK Control Room        Top-level orchestrator, re-plans on failure
 (BaseAgent)          
└──────────┬───────────┘
            A2A (JSON-RPC)
           
┌──────────────────────┐
 LangGraph Planner       Analyzes intent, delegates, reports
 (State Machine)      
└──────────┬───────────┘
           
           
┌──────────────────────┐
 CrewAI Execution Crew   Runs agents with tools
  ├─ Sourcing Agent      search_products
  └─ Procurement Agent   check_budget, create_purchase_order
└──────────────────────┘

پشته فنی

لایه

فناوری

نقش

برنامه‌ریزی

لانگ‌گراف

دستگاه حالت که قصد را تجزیه و تحلیل می‌کند، درخواست‌ها را مسیریابی می‌کند و گزارش تولید می‌کند

اعدام

CrewAI

عامل‌های مبتنی بر نقش که ابزارها را به صورت متوالی فراخوانی می‌کنند

کارشناسی ارشد حقوق

جمینی در Vertex AI

استدلال عامل قدرت و انتخاب ابزار

ارتباط بین عاملی

پروتکل A2A

پل JSON-RPC 2.0 تا عامل‌ها از چارچوب‌های مختلف بتوانند با هم صحبت کنند

ارکستراتور سطح بالا

ADK (عامل پایه)

درخواست‌ها و نمایندگان را از طریق A2A دریافت می‌کند و در صورت عدم موفقیت، برنامه‌ریزی مجدد انجام می‌دهد.

آن را در عمل ببینید: در صورت موجود بودن، سیستم تولید کامل را در https://scale-control-room-761793285222.us-central1.run.app امتحان کنید - این سیستم آنچه را که در اینجا خواهید ساخت با یک داشبورد بلادرنگ، پروتکل A2A و ​​امنیت IAM گسترش می‌دهد.

کاری که انجام خواهید داد

  • ابزارهای سفارشی برای استفاده نمایندگان تعریف کنید.
  • با CrewAI ماموران متخصص بسازید.
  • یک برنامه‌ریز ماشین حالت با LangGraph ایجاد کنید.
  • جریان بین برنامه‌ریز و تیم اجرا را هماهنگ کنید.
  • برنامه‌ریز را برای ارتباط بین فریم‌ورکی در یک سرور پروتکل A2A قرار دهید.
  • یک اتاق کنترل ADK سطح بالا بسازید که از طریق A2A تفویض اختیار کند و در صورت شکست، دوباره برنامه‌ریزی کند.

آنچه نیاز دارید

  • یک مرورگر وب مانند کروم
  • یک پروژه گوگل کلود با قابلیت پرداخت صورتحساب

این آزمایشگاه کد برای توسعه‌دهندگان سطح متوسط ​​است که با پایتون و مفاهیم پایه LLM آشنا هستند.

مدت زمان تخمینی: ۳۵ دقیقه .

تخمین هزینه: منابع ایجاد شده در این آزمایشگاه کد باید کمتر از ۱ دلار هزینه داشته باشند.

۲. قبل از شروع

ایجاد یک پروژه ابری گوگل

  1. در کنسول گوگل کلود ، در صفحه انتخاب پروژه، یک پروژه گوگل کلود را انتخاب یا ایجاد کنید .
  2. مطمئن شوید که صورتحساب برای پروژه ابری شما فعال است. یاد بگیرید که چگونه بررسی کنید که آیا صورتحساب در یک پروژه فعال است یا خیر .

شروع پوسته ابری

Cloud Shell یک محیط خط فرمان است که در Google Cloud اجرا می‌شود و ابزارهای لازم از قبل روی آن بارگذاری شده‌اند.

  1. روی فعال کردن Cloud Shell در بالای کنسول Google Cloud کلیک کنید.
  2. پس از اتصال به Cloud Shell، احراز هویت خود را تأیید کنید:
    gcloud auth list
    
  3. تأیید کنید که پروژه شما پیکربندی شده است:
    gcloud config get project
    
  4. اگر پروژه شما مطابق انتظار تنظیم نشده است، آن را تنظیم کنید:
    export PROJECT_ID=<YOUR_PROJECT_ID>
    gcloud config set project $PROJECT_ID
    

فعال کردن APIها

برای فعال کردن Vertex AI API، این دستور را اجرا کنید:

gcloud services enable aiplatform.googleapis.com

توجه: Cloud Shell به طور خودکار با حساب Google Cloud شما احراز هویت می‌شود. اگر این codelab را خارج از Cloud Shell اجرا می‌کنید، برای احراز هویت با Vertex AI باید gcloud auth application-default login اجرا کنید.

محیط خود را تنظیم کنید

در Cloud Shell، یک دایرکتوری جدید برای پروژه خود ایجاد کنید و به آن بروید:

mkdir scale-agents
cd scale-agents

uv را نصب کنید و از آن برای نصب بسته‌های مورد نیاز استفاده کنید:

curl -LsSf https://astral.sh/uv/install.sh | sh
export PATH="$HOME/.local/bin:$PATH"
uv init --no-workspace --no-readme
rm main.py
sed -i 's/requires-python = ">=3.12"/requires-python = ">=3.12,<3.14"/' pyproject.toml
uv add crewai 'litellm[google]' langgraph 'a2a-sdk>=0.3.25,<0.4' httpx uvicorn 'google-adk>=1.0.0' --prerelease=allow

متغیرهای محیطی را برای Vertex AI تنظیم کنید:

export GOOGLE_CLOUD_PROJECT=$(gcloud config get-value project)
export GOOGLE_CLOUD_LOCATION=us-central1
export GOOGLE_GENAI_USE_VERTEXAI=TRUE

۳. ابزارها و عامل‌ها را تعریف کنید

در یک سیستم چندعاملی، عامل‌ها به ابزارهایی برای تعامل با جهان و نقش‌های خاصی برای دانستن اینکه چه کاری باید انجام دهند، نیاز دارند.

فایلی با نام scale_agents.py ایجاد کنید و کد زیر را به آن اضافه کنید. این کد، ایمپورت‌ها، ابزارهای شبیه‌سازی و عوامل CrewAI را تنظیم می‌کند.

import os
from typing import Optional
from crewai import Agent, Crew, Process, Task, LLM
from crewai.tools import tool
from langgraph.graph import StateGraph, END
from typing_extensions import TypedDict

# CrewAI requires this env var even when using Vertex AI
os.environ["OPENAI_API_KEY"] = "NA"

# Set the project ID for Vertex AI
os.environ["VERTEXAI_PROJECT"] = os.getenv("GOOGLE_CLOUD_PROJECT", "")
os.environ["VERTEXAI_LOCATION"] = "us-central1"

# Initialize the LLM to use Vertex AI
llm = LLM(
    model="vertex_ai/gemini-2.5-flash",
    temperature=0.0,
    max_tokens=4096,
)

# --- Step 1: Define Tools ---

@tool("search_products")
def search_products(query: str) -> list:
    """Search for products in the catalog."""
    # Mock product catalog
    products = [
        {"product_id": "pixel-7", "name": "Pixel 7 Phone", "price": 50.0},
        {"product_id": "pixel-8", "name": "Pixel 8 Phone", "price": 80.0},
    ]
    return [p for p in products if query.lower() in p["name"].lower()]

@tool("check_budget")
def check_budget(amount: float) -> dict:
    """Check if a purchase amount is within the budget."""
    limit = 100.0
    if amount <= limit:
        return {"approved": True, "remaining": limit - amount}
    return {"approved": False, "reason": f"Exceeds budget of ${limit}"}

@tool("create_purchase_order")
def create_purchase_order(product_id: str, quantity: int) -> dict:
    """Create a purchase order for a product."""
    return {
        "status": "SUCCESS",
        "po_id": f"PO-{product_id}-{quantity}",
        "message": f"Successfully ordered {quantity} units of {product_id}."
    }

# --- Step 2: Define Agents ---

sourcing_agent = Agent(
    role="Sourcing Specialist",
    goal="Find the best available products that match the intent of the request. You MUST use the search_products tool to look up products -- never make up product data.",
    backstory="You are a veteran procurement specialist with an eye for detail. You always search the catalog before recommending a product.",
    tools=[search_products],
    llm=llm,
    verbose=True,
    allow_delegation=False,
    memory=False,
    reasoning=False,
)

procurement_agent = Agent(
    role="Procurement Officer",
    goal="Validate the purchase against budget constraints and execute the order. You MUST use the check_budget tool before ordering and the create_purchase_order tool to place the order.",
    backstory="You are the gatekeeper of the budget. You always verify budget before placing any order.",
    tools=[check_budget, create_purchase_order],
    llm=llm,
    verbose=True,
    allow_delegation=False,
    memory=False,
    reasoning=False,
)

مفاهیم کلیدی

  • @tool decorator : CrewAI از این برای تبدیل توابع معمولی پایتون به ابزارهایی که LLMها بتوانند آنها را بفهمند و فراخوانی کنند، استفاده می‌کند. نکات نوع تابع و رشته سند آن برای تولید یک طرحواره ابزار که LLM بتواند آن را بفهمد، استفاده می‌شوند.
  • نقش، هدف و پیشینه : این‌ها شخصیت عامل را تعریف می‌کنند و استدلال LLM آن را هدایت می‌کنند. پیشینه فقط متن خوش‌طعم نیست - جمله‌ی «شما همیشه کاتالوگ را جستجو می‌کنید» عامل را تشویق می‌کند تا از ابزارهای خود به جای پاسخ‌های توهم‌زا استفاده کند.
  • reasoning=False : استدلال گسترده را غیرفعال می‌کند، بنابراین عامل به جای تلاش برای پاسخ مستقیم، حلقه استاندارد فراخوانی ابزار را دنبال می‌کند.
  • allow_delegation=False : هر عامل را به جای واگذاری کار به سایر عامل‌ها، روی ابزارهای اختصاص داده شده به خودش متمرکز نگه می‌دارد.

چرا به جای یک نماینده، دو نماینده داریم؟ هر نماینده ابزار و وظیفه متفاوتی دارد. متخصص تامین فقط محصولات را جستجو می‌کند؛ مسئول تدارکات فقط بودجه‌ها و سفارشات را مدیریت می‌کند. این تفکیک وظایف به این معنی است که هر نماینده یک دستورالعمل متمرکز و یک مجموعه ابزار کوچک و مرتبط دارد - که منجر به رفتار LLM قابل اعتمادتری نسبت به یک نماینده واحد می‌شود که همه چیز را مدیریت می‌کند.

۴. وظایف و تیم را تعریف کنید

حالا بیایید با ایجاد Taskها و اتصال آنها به Crew ، مشخص کنیم که این Agentها چه کاری باید انجام دهند.

کد زیر را در انتهای همان فایل scale_agents.py اضافه کنید:

# --- Step 3: Define Tasks & Crew ---

sourcing_task = Task(
    description="Use the search_products tool to find products matching: '{item_description}'. Return the product_id and price of the best match from the tool results.",
    expected_output="The product_id and price of the best matching product from the search_products tool.",
    agent=sourcing_agent
)

procurement_task = Task(
    description="First, use the check_budget tool to verify the total cost for {quantity} units. Then use the create_purchase_order tool with the product_id and quantity to place the order.",
    expected_output="The purchase order details returned by the create_purchase_order tool.",
    agent=procurement_agent,
    context=[sourcing_task] # This task depends on the output of sourcing_task
)

def run_crew(item_description: str, quantity: int):
    crew = Crew(
        agents=[sourcing_agent, procurement_agent],
        tasks=[sourcing_task, procurement_task],
        process=Process.sequential, # Run tasks in order
        verbose=True,
        memory=False,
        planning=False,
    )
    
    result = crew.kickoff(inputs={
        "item_description": item_description,
        "quantity": quantity
    })
    return result

مفاهیم کلیدی

  • متن وظیفه : context=[sourcing_task] به CrewAI می‌گوید که وظیفه تدارکات برای ادامه به خروجی وظیفه منبع‌یابی نیاز دارد. مسئول تدارکات می‌تواند قبل از تصمیم‌گیری در مورد سفارش، ببیند که متخصص منبع‌یابی چه یافته است.
  • Process.sequential : وظایف به ترتیبی که فهرست شده‌اند اجرا می‌شوند. این مهم است زیرا وظیفه تدارکات به نتایج وظیفه منبع‌یابی بستگی دارد - شما نمی‌توانید قبل از اینکه بدانید کدام محصول را باید بخرید، سفارشی ثبت کنید.
  • memory=False / planning=False : حافظه داخلی و ویژگی‌های برنامه‌ریزی CrewAI را غیرفعال می‌کند تا اجرا برای این نسخه آزمایشی ساده و قابل پیش‌بینی باشد.

۵. برنامه‌ریز LangGraph را ایجاد کنید

تیم اجرایی «چگونگی» انجام کارها را انجام می‌دهد - جستجوی محصولات، بررسی بودجه‌ها، ثبت سفارش‌ها. اما چه کسی «چه چیزی» را تعیین می‌کند؟ این همان « عامل برنامه‌ریزی » است که با LangGraph ساخته شده است.

LangGraph جریان‌های کاری را به عنوان یک ماشین حالت مدل‌سازی می‌کند - گرافی از گره‌ها (توابع) که توسط لبه‌ها (انتقال‌ها) به هم متصل شده‌اند. حالت در این گراف جریان می‌یابد و هر گره از حالت مشترک می‌خواند و در آن می‌نویسد. این یک گزینه طبیعی برای برنامه‌ریزی جریان‌های کاری است که در آن به جریان کنترل واضح و قطعی نیاز دارید: درخواست را تجزیه و تحلیل کنید، به تیم واگذار کنید و یک گزارش تهیه کنید.

کد زیر را در انتهای همان فایل scale_agents.py اضافه کنید:

# --- Step 4: Define LangGraph Planner ---

class PlanState(TypedDict):
    objective: str
    item_description: Optional[str]
    quantity_needed: Optional[int]
    execution_result: Optional[str]
    final_report: Optional[str]

def analyze_alert(state: PlanState) -> PlanState:
    """Node 1: Extract intent from the raw objective string."""
    print("--- ANALYZING ALERT ---")
    # In a production app, you would use an LLM here to extract details.
    # For simplicity, we simulate extraction here.
    objective = state["objective"]
    
    # Hardcoded extraction for the demo
    if "Pixel 7" in objective:
        return {
            "item_description": "Pixel 7",
            "quantity_needed": 1,
        }
    return {
        "item_description": "unknown",
        "quantity_needed": 0,
    }

def delegate_to_executor(state: PlanState) -> PlanState:
    """Node 2: Call the CrewAI Execution Crew."""
    print("--- DELEGATING TO CREW ---")
    if state["item_description"] == "unknown":
        return {"execution_result": "Failed: Unknown item"}
        
    result = run_crew(
        item_description=state["item_description"],
        quantity=state["quantity_needed"]
    )
    return {"execution_result": str(result)}

def generate_report(state: PlanState) -> PlanState:
    """Node 3: Synthesize the final outcome."""
    print("--- GENERATING REPORT ---")
    return {
        "final_report": f"Objective handled: {state['objective']}. Result: {state['execution_result']}"
    }

# Build the graph
workflow = StateGraph(PlanState)
workflow.add_node("analyze_alert", analyze_alert)
workflow.add_node("delegate", delegate_to_executor)
workflow.add_node("generate_report", generate_report)

workflow.set_entry_point("analyze_alert")
workflow.add_edge("analyze_alert", "delegate")
workflow.add_edge("delegate", "generate_report")
workflow.add_edge("generate_report", END)

app = workflow.compile()

# --- Main Execution ---
if __name__ == "__main__":
    print("Starting Multi-Agent System...")
    
    initial_state = {
        "objective": "Restock 1 Pixel 7 phones for the Tokyo office"
    }
    
    final_state = app.invoke(initial_state)
    
    print("\n=== FINAL REPORT ===")
    print(final_state["final_report"])

مفاهیم کلیدی

  • StateGraph : ماشین حالت را تعریف می‌کند. PlanState حالت تایپ‌شده‌ای است که با پردازش درخواست توسط هر گره، جمع‌آوری می‌شود.
  • گره‌ها : توابعی که وضعیت فعلی را دریافت کرده و به‌روزرسانی‌هایی را برای آن برمی‌گردانند. هر گره یک مسئولیت واحد دارد - analyze_alert هدف را استخراج می‌کند، delegate_to_executor اجرا می‌کند و generate_report نتیجه را خلاصه می‌کند.
  • لبه‌ها : جریان بین گره‌ها را تعریف می‌کنند. در این آزمایشگاه کد، ما از یک جریان خطی ساده ( analyze → delegate → report ) استفاده می‌کنیم. کارگاه کامل این را با مسیریابی شرطی گسترش می‌دهد - برای مثال، مسیریابی درخواست‌های مخرب به یک مسیر امنیتی به جای مجری.

چرا LangGraph برای برنامه‌ریز؟ CrewAI برای عامل‌های فراخوانی ابزار عالی است، اما برنامه‌ریز به جریان کنترل قطعی نیاز دارد - "اگر مخرب است، به مسیر امنیتی بروید؛ در غیر این صورت، واگذار کنید." مدل ماشین حالت LangGraph این مسیریابی را صریح و قابل آزمایش می‌کند، در حالی که CrewAI اجرای ابزار freeform زیر را مدیریت می‌کند.

۶. برنامه‌ریز و گروه را اجرا کنید

حالا بیایید برنامه‌ریز LangGraph و تیم CrewAI را با هم آزمایش کنیم.

در ترمینال Cloud Shell خود، اسکریپت را اجرا کنید:

uv run python scale_agents.py

باید خروجی را مشاهده کنید که مراحل انجام شده را نشان می‌دهد:

  1. تحلیل هشدار : گره LangGraph اجرا می‌شود.
  2. واگذاری به خدمه : گره LangGraph، خدمه CrewAI را فرا می‌خواند.
  3. اجرای CrewAI : خواهید دید که متخصص منبع‌یابی در حال جستجوی محصول است و مسئول تدارکات در حال بررسی بودجه و ایجاد سفارش خرید.
  4. گزارش نهایی : نتیجه خلاصه شده در پایان چاپ خواهد شد.

مثال خروجی (به اختصار):

Starting Multi-Agent System...
--- ANALYZING ALERT ---
--- DELEGATING TO CREW ---

  Agent: Sourcing Specialist
  Tool: search_products  Args: {'query': 'Pixel 7'}
  Tool Completed  Output: [{'product_id': 'pixel-7', 'name': 'Pixel 7 Phone', 'price': 50.0}]

  Agent: Procurement Officer
  Tool: check_budget  Args: {'amount': 50}
  Tool: create_purchase_order  Args: {'product_id': 'pixel-7', 'quantity': 1}
  Tool Completed  Output: {'status': 'SUCCESS', 'po_id': 'PO-pixel-7-1', ...}

--- GENERATING REPORT ---

=== FINAL REPORT ===
Objective handled: Restock 1 Pixel 7 phones for the Tokyo office. Result: ...PO-pixel-7-1...SUCCESS...

توجه: ممکن است در خروجی، پیام‌های [CrewAIEventsBus] Warning: Event pairing mismatch را مشاهده کنید. این‌ها هشدارهای ظاهری از ردیابی رویدادهای داخلی CrewAI هستند و می‌توان آن‌ها را با خیال راحت نادیده گرفت.

توجه: CrewAI ممکن است پیامی مبنی بر غیرفعال بودن ردیابی نمایش دهد. این پیام جنبه‌ی اطلاع‌رسانی دارد و می‌توان با خیال راحت آن را نادیده گرفت.

توجه: OMS آزمایشی محدودیت بودجه ۱۰۰ دلاری دارد. برای موفقیت مسیر شاد، تعداد را کوچک (کمتر از حدود ۲ واحد) نگه دارید. به عنوان مثال، ۱ گوشی Pixel 7 با قیمت ۵۰ دلار از بررسی بودجه عبور می‌کند، اما ۳ واحد با قیمت ۱۵۰ دلار به دلیل «بیش از بودجه» رد می‌شوند.

۷. برنامه‌ریز را در یک سرور A2A قرار دهید

برنامه‌ریز LangGraph کار می‌کند، اما درون یک فرآیند پایتون گیر افتاده است. برای اینکه توسط سایر عامل‌ها - که احتمالاً در چارچوب‌های مختلف نوشته شده‌اند یا روی ماشین‌های مختلف اجرا می‌شوند - قابل فراخوانی باشد، ما آن را در یک سرور A2A (عامل به عامل) قرار می‌دهیم.

A2A یک پروتکل مبتنی بر JSON-RPC 2.0 است که نحوه ارتباط عامل‌ها را استاندارد می‌کند. مفاهیم کلیدی:

مفهوم

هدف

کارت نماینده

فراداده JSON که قابلیت‌های عامل را توصیف می‌کند (در /.well-known/agent-card.json ارائه می‌شود)

message/send

روش JSON-RPC برای ارسال یک وظیفه به عامل

وظیفه

یک واحد کار با وضعیت (ارسال شده → در حال کار → تکمیل شده/ناموفق)

مصنوعات

خروجی‌های میانی و نهایی متصل به یک وظیفه

یک فایل جدید به a2a_planner.py ایجاد کنید:

import asyncio
import os
import uvicorn

from a2a.server.agent_execution import AgentExecutor, RequestContext
from a2a.server.apps import A2AStarletteApplication
from a2a.server.events import EventQueue
from a2a.server.request_handlers import DefaultRequestHandler
from a2a.server.tasks import InMemoryTaskStore, TaskUpdater
from a2a.types import (AgentCapabilities, AgentCard, AgentSkill,
                       InternalError, Part, TextPart)
from a2a.utils import new_task
from a2a.utils.errors import ServerError

# Import the LangGraph planner from Step 4
from scale_agents import app as planner_app


class PlannerAgentExecutor(AgentExecutor):
    """Wraps the LangGraph planner as an A2A service."""

    SUPPORTED_CONTENT_TYPES = ["text", "text/plain"]

    async def execute(self, context: RequestContext, event_queue: EventQueue):
        objective = context.get_user_input()

        # Initialize A2A task tracking
        task = context.current_task or new_task(context.message)
        await event_queue.enqueue_event(task)
        updater = TaskUpdater(event_queue, task.id, task.context_id)

        try:
            # Run the LangGraph planner synchronously in a thread
            initial_state = {"objective": objective}
            result = await asyncio.to_thread(planner_app.invoke, initial_state)
            final_report = result.get("final_report", "No report generated.")
        except Exception as e:
            final_report = f"Execution failed: {e}"

        # Send the result back as an artifact
        await updater.add_artifact(
            [Part(root=TextPart(text=final_report))],
            name="orchestration_report"
        )
        await updater.complete()

    async def cancel(self, context, event_queue):
        raise ServerError(error=InternalError(message="Cancel not supported"))


# Define the Agent Card — this is what other agents see
port = int(os.environ.get("PORT", 8080))
agent_card = AgentCard(
    name="Retail-Planner-A2A",
    description="LangGraph planner that delegates logistics tasks to a CrewAI crew.",
    url=f"http://localhost:{port}/",
    version="1.0.0",
    default_input_modes=PlannerAgentExecutor.SUPPORTED_CONTENT_TYPES,
    default_output_modes=PlannerAgentExecutor.SUPPORTED_CONTENT_TYPES,
    capabilities=AgentCapabilities(streaming=False),
    skills=[
        AgentSkill(
            id="plan_logistics",
            name="Plan Logistics",
            description="Analyzes inventory alerts and orchestrates procurement.",
            tags=["logistics", "planning"],
            examples=["Restock 1 Pixel 7 phones for the Tokyo office"],
        )
    ],
)

if __name__ == "__main__":
    executor = PlannerAgentExecutor()
    handler = DefaultRequestHandler(
        agent_executor=executor, task_store=InMemoryTaskStore()
    )
    server = A2AStarletteApplication(
        agent_card=agent_card, http_handler=handler
    )
    print(f"Starting A2A Planner Server on port {port}...")
    uvicorn.run(server.build(), host="0.0.0.0", port=port)

مفاهیم کلیدی

  • کارت عامل : در آدرس /.well-known/agent-card.json ارائه می‌شود - هر عاملی می‌تواند با دریافت آن URL، متوجه شود که این سرور چه کاری انجام می‌دهد. این آدرس، مهارت‌های عامل، انواع محتوای پشتیبانی‌شده و قابلیت‌های او را فهرست می‌کند.
  • AgentExecutor.execute() : تنها متدی که پیاده‌سازی می‌کنید. این متد درخواست ورودی را دریافت می‌کند، منطق عامل شما (در اینجا، برنامه‌ریز LangGraph) را اجرا می‌کند و نتایج را به عنوان خروجی ارسال می‌کند.
  • TaskUpdater : چرخه حیات وظیفه را مدیریت می‌کند - add_artifact() خروجی‌های میانی/نهایی را ارسال می‌کند، complete() وظیفه را به عنوان انجام شده علامت‌گذاری می‌کند. کتابخانه A2A تمام اتصالات JSON-RPC را مدیریت می‌کند.

سرور A2A را با اجرای آن در ترمینال آزمایش کنید:

uv run python a2a_planner.py

یک برگه Cloud Shell دیگر باز کنید (روی + کنار برگه فعلی کلیک کنید) و تأیید کنید که کارت Agent ارائه شده است:

cd ~/scale-agents
curl http://localhost:8080/.well-known/agent-card.json | python3 -m json.tool

شما باید کارت عامل JSON را ببینید. سرور A2A را برای مرحله بعدی در ترمینال اول در حال اجرا نگه دارید.

۸. اتاق کنترل ADK را بسازید

در بالای این مجموعه، اتاق کنترل قرار دارد که با ADK (کیت توسعه عامل گوگل) ساخته شده است. این اتاق درخواست کاربر را دریافت می‌کند، از طریق A2A به برنامه‌ریز ارجاع می‌دهد، نتیجه را ارزیابی می‌کند و - به طور بحرانی - برنامه‌ریزی مجدد در صورت شکست (CUJ 2) را انجام می‌دهد.

ADK عناصر اولیه عامل مانند BaseAgent ، LlmAgent و InMemoryRunner را فراهم می‌کند. ما BaseAgent زیرکلاس می‌کنیم تا منطق تنظیم سفارشی - فراخوانی‌های A2A، طبقه‌بندی گزارش و برنامه‌ریزی مجدد پویا با یک زیرعامل LlmAgent - را بنویسیم.

یک فایل جدید control_room.py ایجاد کنید:

import asyncio
import uuid
import os
import httpx

from google.adk.agents import BaseAgent, LlmAgent, RunConfig
from google.adk.agents.invocation_context import InvocationContext
from google.adk.events import Event
from google.adk.runners import InMemoryRunner
from google.genai import types
from typing import AsyncGenerator

A2A_SERVER_URL = os.environ.get("PLANNER_AGENT_URL", "http://127.0.0.1:8080")


def _classify_report(report: str) -> tuple[bool, bool]:
    """Return (is_success, should_retry) for a planner report."""
    normalized = (report or "").replace("*", "").strip().lower()

    success_markers = [
        "status: success", "'status': 'success'",
        "outcome: success", "po_id", "successfully ordered",
    ]
    retryable_markers = ["not found", "discontinued", "no inventory",
                         "unknown item"]
    terminal_markers = [
        "status: failed", "over budget", "not issued",
        "procurement failed",
    ]

    if any(m in normalized for m in terminal_markers):
        return False, False      # Failed, don't retry
    if any(m in normalized for m in retryable_markers):
        return False, True       # Failed, but retryable
    if any(m in normalized for m in success_markers):
        return True, False       # Success!
    return False, False          # Unknown → treat as failure


class ControlRoomAgent(BaseAgent):
    """Top-level orchestrator: delegates via A2A, re-plans on failure."""

    async def _run_async_impl(
        self, ctx: InvocationContext
    ) -> AsyncGenerator[Event, None]:
        # Extract user input from session events
        user_msg = ""
        if ctx.session and ctx.session.events:
            for evt in reversed(ctx.session.events):
                if evt.content and evt.content.role == "user":
                    user_msg = evt.content.parts[0].text
                    break

        max_attempts = 2
        current_objective = user_msg
        final_report = "No report returned."

        for attempt in range(1, max_attempts + 1):
            print(f"\n--- Attempt {attempt}: Calling A2A Planner ---")
            print(f"    Objective: {current_objective}")

            # Build the A2A JSON-RPC request
            payload = {
                "jsonrpc": "2.0",
                "id": f"req-{attempt}",
                "method": "message/send",
                "params": {
                    "message": {
                        "message_id": str(uuid.uuid4()),
                        "parts": [{"text": current_objective}],
                        "role": "user"
                    }
                }
            }

            try:
                async with httpx.AsyncClient(timeout=300.0) as client:
                    resp = await client.post(
                        f"{A2A_SERVER_URL}/", json=payload
                    )
                    data = resp.json()
                    if "error" in data:
                        final_report = data["error"].get("message", "Unknown A2A error")
                    elif "result" in data:
                        artifacts = data["result"].get("artifacts", [])
                        if artifacts and "parts" in artifacts[-1]:
                            parts = artifacts[-1]["parts"]
                            if parts and "text" in parts[0]:
                                final_report = parts[0]["text"]
            except Exception as e:
                final_report = f"Connection error: {e}"

            print(f"\n--- Report ---\n{final_report}\n")
            is_success, should_retry = _classify_report(final_report)

            if is_success:
                yield Event(
                    author=self.name,
                    invocation_id=ctx.invocation_id,
                    content=types.Content(
                        role="model",
                        parts=[types.Part.from_text(text=final_report)]
                    ),
                )
                return

            # --- Re-planning (CUJ 2) ---
            if should_retry and attempt < max_attempts:
                print("--- Re-planning with LLM ---")
                replanner = LlmAgent(
                    name=f"replanner_{attempt}",
                    model="gemini-2.5-flash",
                    instruction=(
                        "You are a strategic re-planner. The previous request "
                        "failed. Rewrite the objective to be broader or more "
                        "likely to succeed. Output ONLY the new objective text."
                    ),
                )

                # Run the re-planner as a sub-agent
                child_ctx = InvocationContext(
                    invocation_id=f"{ctx.invocation_id}_replan_{attempt}",
                    agent=replanner,
                    session=ctx.session,
                    session_service=ctx.session_service,
                    run_config=ctx.run_config or RunConfig(),
                )
                child_ctx.user_content = types.Content(
                    role="user",
                    parts=[types.Part.from_text(text=(
                        f"Original Objective: {current_objective}\n"
                        f"Failure Reason: {final_report}\n"
                        "Please broaden the search."
                    ))]
                )

                new_objective = ""
                async for event in replanner.run_async(child_ctx):
                    if event.content and event.content.parts:
                        for part in event.content.parts:
                            if hasattr(part, "text") and part.text:
                                new_objective += part.text

                current_objective = new_objective.strip()
                print(f"New objective: {current_objective}")
                continue

            # Terminal failure (not retryable)
            if not should_retry:
                yield Event(
                    author=self.name,
                    invocation_id=ctx.invocation_id,
                    content=types.Content(
                        role="model",
                        parts=[types.Part.from_text(text=f"FAILED: {final_report}")]
                    ),
                )
                return

        # Max attempts exhausted
        yield Event(
            author=self.name,
            invocation_id=ctx.invocation_id,
            content=types.Content(
                role="model",
                parts=[types.Part.from_text(
                    text=f"FAILED after {max_attempts} attempts: {final_report}"
                )]
            ),
        )


async def main():
    prompt = "Restock 1 Pixel 7 phones for the Tokyo office"
    print(f"Starting Control Room with: {prompt}\n")

    agent = ControlRoomAgent(name="control_room")
    runner = InMemoryRunner(app_name="control_room", agent=agent)
    session = await runner.session_service.create_session(
        app_name="control_room", user_id="admin"
    )
    content = types.Content(
        role="user", parts=[types.Part.from_text(text=prompt)]
    )

    async for event in runner.run_async(
        user_id="admin", session_id=session.id, new_message=content
    ):
        if event.content and event.content.parts:
            for part in event.content.parts:
                if hasattr(part, "text") and part.text:
                    print(f"Result: {part.text}")

    print("\n=== CONTROL ROOM COMPLETE ===")


if __name__ == "__main__":
    asyncio.run(main())

مفاهیم کلیدی

  • BaseAgent : ADK اولیه برای عامل‌های سفارشی. شما آن را زیرکلاس می‌کنید و _run_async_impl را برای نوشتن منطق تنظیم ناهمگام دلخواه بازنویسی می‌کنید - در اینجا، فراخوانی A2A + طبقه‌بندی + حلقه برنامه‌ریزی مجدد.
  • فراخوانی A2A JSON-RPC : اتاق کنترل یک درخواست message/send استاندارد را با استفاده از httpx به سرور A2A برنامه‌ریز ارسال می‌کند و پاسخ JSON-RPC را برای استخراج گزارش نهایی تجزیه و تحلیل می‌کند.
  • _classify_report() : طبقه‌بندی ساده مبتنی بر کلمه کلیدی که موفقیت، شکست قابل امتحان مجدد یا شکست ترمینال را از متن گزارش تعیین می‌کند. این حلقه برنامه‌ریزی مجدد را هدایت می‌کند.
  • فراخوانی زیرعامل : برای برنامه‌ریزی مجدد، اتاق کنترل یک LlmAgent ایجاد می‌کند و با ساخت یک InvocationContext فرزند و فراخوانی replanner.run_async(child_ctx) آن را اجرا می‌کند. این به شما امکان می‌دهد تا به صورت پویا عامل‌های LLM را در منطق ارکستراسیون سفارشی بچرخانید.
  • InMemoryRunner : عامل را به صورت محلی با یک حافظه نشست درون حافظه اجرا می‌کند. در محیط عملیاتی، شما adk deploy برای استقرار در Vertex AI Agent Engine استفاده خواهید کرد.

۹. اجرای فول استک

حالا بیایید سیستم سه لایه کامل را آزمایش کنیم: اتاق کنترل ADK → A2A → LangGraph Planner → CrewAI Crew.

از دومین تب Cloud Shell که قبلاً باز کردید استفاده کنید (یا برای باز شدن تب جدید روی + کلیک کنید) و Control Room را اجرا کنید. مهم: هر تب Cloud Shell جلسه پوسته مخصوص به خود را دارد. شما باید متغیرهای پروژه و محیط را دوباره تنظیم کنید:

cd ~/scale-agents
export GOOGLE_CLOUD_PROJECT=$(gcloud config get-value project)
export GOOGLE_CLOUD_LOCATION=us-central1
export GOOGLE_GENAI_USE_VERTEXAI=TRUE
uv run python control_room.py

شما باید جریان کامل ارکستراسیون را ببینید:

  1. اتاق کنترل درخواست را دریافت می‌کند و یک message/send تماس JSON-RPC به سرور A2A ارسال می‌کند.
  2. سرور A2A درخواست را دریافت کرده و برنامه‌ریز LangGraph را فراخوانی می‌کند.
  3. برنامه‌ریز LangGraph درخواست را تجزیه و تحلیل می‌کند و آن را به تیم CrewAI واگذار می‌کند.
  4. خدمه CrewAI، عوامل تامین و تدارکات را اداره می‌کنند.
  5. نتیجه به اتاق کنترل برمی‌گردد

سفرهای بحرانی کاربر (CUJ)

سعی کنید رشته prompt را در control_room.py تغییر دهید تا این سناریوها را آزمایش کنید:

سی یو جی

سریع

چه اتفاقی می‌افتد؟

۱. مسیر شاد

Restock 1 Pixel 7 phones for the Tokyo office

جستجو -> بررسی بودجه -> سفارش خرید (موفق). به صورت سرتاسری کار می‌کند.

۲. برنامه‌ریزی مجدد

Order 1 unit of the discontinued XR-7000 Quantum Holographic Display

برنامه‌ریز عبارت "ناموفق: مورد ناشناخته" را برمی‌گرداند. اتاق کنترل این موضوع را تشخیص می‌دهد و یک برنامه‌ریز مجدد LlmAgent را برای گسترش جستجو فراخوانی می‌کند. هر دو تلاش با شکست مواجه می‌شوند (برنامه‌ریز کدنویسی شده فقط "Pixel 7" را تشخیص می‌دهد)، اما شما مکانیسم کامل برنامه‌ریزی مجدد را در عمل مشاهده خواهید کرد.

برای آزمایش CUJ 2 (برنامه‌ریزی مجدد) ، prompt در control_room.py را به صورت زیر تغییر دهید:

prompt = "Order 1 unit of the discontinued XR-7000 Quantum Holographic Display"

برنامه‌ریزِ کدِ سخت‌شده این مورد را تشخیص نمی‌دهد و عبارت "Failed: Unknown item" را برمی‌گرداند. اتاق کنترل، خرابی را تشخیص می‌دهد، به صورت پویا یک برنامه‌ریزِ مجددِ LlmAgent ایجاد می‌کند و با هدف وسیع‌تری دوباره تلاش می‌کند. از آنجا که برنامه‌ریز فقط "Pixel 7" را تشخیص می‌دهد، تلاش مجدد نیز شکست خواهد خورد - اما حلقه کامل برنامه‌ریزی مجدد را در عمل مشاهده خواهید کرد. خروجی نهایی FAILED after 2 attempts: ...

۱۰. تمیز کردن

برای جلوگیری از هزینه‌های مداوم برای حساب Google Cloud خود، می‌توانید منابع ایجاد شده در طول این codelab را حذف کنید. می‌توانید به سادگی دایرکتوری ایجاد شده را حذف کنید:

cd ..
rm -rf scale-agents

۱۱. تبریک

تبریک! شما با موفقیت یک سیستم ارکستراسیون چندعاملی با استفاده از CrewAI، LangGraph، پروتکل A2A و ​​ADK ساختید.

آنچه آموخته‌اید

  • نحوه تعریف ابزارها برای اپراتورها با استفاده از دکوراتور @tool در CrewAI.
  • چگونه می‌توان عامل‌های تخصصی با نقش‌ها، ابزارها و اهداف متمایز ایجاد کرد؟
  • چگونه می‌توان عوامل را به یک گروه ترتیبی با وابستگی‌های وظیفه‌ای متصل کرد.
  • چگونه با LangGraph یک برنامه‌ریز ماشین حالت بسازیم که به خدمه محول شود.
  • نحوه نمایش برنامه‌ریز به عنوان یک سرویس A2A با AgentCard و AgentExecutor .
  • چگونه یک ADK BaseAgent سفارشی بسازیم که از طریق A2A واگذار شود و در صورت عدم موفقیت با فراخوانی یک زیرعامل LlmAgent دوباره برنامه‌ریزی کند.
  • چرا تفکیک برنامه‌ریزی، اجرا و هماهنگی در چارچوب‌های مختلف، به شما قابلیت ماژولار بودن و انعطاف‌پذیری می‌دهد؟

رفتن بیشتر

کارگاه کامل، این سیستم را با موارد زیر گسترش می‌دهد:

  • داشبورد بلادرنگ - پخش SSE برای تجسم پیشرفت چندعاملی
  • سپر هویت - امنیت مبتنی بر IAM که اقدامات مخرب را در سطح زیرساخت مسدود می‌کند، نه در سطح اعلان
  • موتور عامل هوش مصنوعی ورتکس - عامل ADK را با استفاده از adk deploy در زیرساخت ابری مدیریت‌شده مستقر کنید

اسناد مرجع