۱. مقدمه
در این آزمایشگاه کد، شما یاد خواهید گرفت که چگونه یک سیستم ارکستراسیون چند عاملی با استفاده از CrewAI ، LangGraph ، پروتکل A2A و ADK (کیت توسعه عامل) بسازید. شما سیستمی ایجاد خواهید کرد که در آن یک اتاق کنترل ADK برنامهریزی را به یک برنامهریز LangGraph واگذار میکند، که وظایف را به یک تیم اجرایی CrewAI - که همگی از طریق A2A به هم متصل هستند - ارسال میکند تا یک سناریوی انبارداری مجدد خردهفروشی را مدیریت کند.
ارکستراسیون چند عاملی چیست؟
در یک سیستم چندعاملی ، چندین عامل هوش مصنوعی متخصص برای انجام وظایفی که برای یک عامل واحد بسیار پیچیده است، با یکدیگر همکاری میکنند. به جای اینکه یک عامل یکپارچه همه کارها را انجام دهد، شما مسئله را به نقشهایی - یک برنامهریز و یک مجری - تجزیه میکنید که هر کدام ابزار و تخصص خاص خود را دارند.
این نشان دهنده نحوه کار سازمانهای انسانی است: یک مدیر، استراتژی را به تحلیلگران و اجرا را به متخصصان واگذار میکند. مزایای این کار عبارتند از:
- تفکیک دغدغهها : هر عامل بر کاری که در آن بهترین است تمرکز میکند
- انعطافپذیری چارچوب : استفاده از بهترین چارچوب برای هر نقش (LangGraph برای منطق برنامهریزی، CrewAI برای اجرای ابزار)
- مقیاسپذیری : اضافه کردن عوامل تخصصی بدون تغییر کل سیستم
سناریو
وقتی کاربری درخواست موجودی مجدد مانند «۱ گوشی Pixel 7 برای دفتر توکیو موجودی مجدد» ارسال میکند، سیستم:
- برنامهریز LangGraph درخواست را تجزیه و تحلیل کرده و کالا و مقدار آن را استخراج میکند.
- برنامهریز، اجرا را به گروه اجرایی CrewAI واگذار میکند.
- یک متخصص منابع انسانی با استفاده از ابزارها، کاتالوگ محصولات را جستجو میکند.
- یک نماینده مسئول تدارکات، بودجه را تأیید میکند و با استفاده از ابزارها، سفارش خرید را ثبت میکند.
- نتیجه به برنامهریز برمیگردد که گزارش نهایی را تولید میکند.
User Request
│
▼
┌──────────────────────┐
│ ADK Control Room │ ← Top-level orchestrator, re-plans on failure
│ (BaseAgent) │
└──────────┬───────────┘
│ A2A (JSON-RPC)
▼
┌──────────────────────┐
│ LangGraph Planner │ ← Analyzes intent, delegates, reports
│ (State Machine) │
└──────────┬───────────┘
│
▼
┌──────────────────────┐
│ CrewAI Execution Crew│ ← Runs agents with tools
│ ├─ Sourcing Agent │ → search_products
│ └─ Procurement Agent│ → check_budget, create_purchase_order
└──────────────────────┘
پشته فنی
لایه | فناوری | نقش |
برنامهریزی | لانگگراف | دستگاه حالت که قصد را تجزیه و تحلیل میکند، درخواستها را مسیریابی میکند و گزارش تولید میکند |
اعدام | CrewAI | عاملهای مبتنی بر نقش که ابزارها را به صورت متوالی فراخوانی میکنند |
کارشناسی ارشد حقوق | جمینی در Vertex AI | استدلال عامل قدرت و انتخاب ابزار |
ارتباط بین عاملی | پروتکل A2A | پل JSON-RPC 2.0 تا عاملها از چارچوبهای مختلف بتوانند با هم صحبت کنند |
ارکستراتور سطح بالا | ADK (عامل پایه) | درخواستها و نمایندگان را از طریق A2A دریافت میکند و در صورت عدم موفقیت، برنامهریزی مجدد انجام میدهد. |
آن را در عمل ببینید: در صورت موجود بودن، سیستم تولید کامل را در https://scale-control-room-761793285222.us-central1.run.app امتحان کنید - این سیستم آنچه را که در اینجا خواهید ساخت با یک داشبورد بلادرنگ، پروتکل A2A و امنیت IAM گسترش میدهد.
کاری که انجام خواهید داد
- ابزارهای سفارشی برای استفاده نمایندگان تعریف کنید.
- با CrewAI ماموران متخصص بسازید.
- یک برنامهریز ماشین حالت با LangGraph ایجاد کنید.
- جریان بین برنامهریز و تیم اجرا را هماهنگ کنید.
- برنامهریز را برای ارتباط بین فریمورکی در یک سرور پروتکل A2A قرار دهید.
- یک اتاق کنترل ADK سطح بالا بسازید که از طریق A2A تفویض اختیار کند و در صورت شکست، دوباره برنامهریزی کند.
آنچه نیاز دارید
- یک مرورگر وب مانند کروم
- یک پروژه گوگل کلود با قابلیت پرداخت صورتحساب
این آزمایشگاه کد برای توسعهدهندگان سطح متوسط است که با پایتون و مفاهیم پایه LLM آشنا هستند.
مدت زمان تخمینی: ۳۵ دقیقه .
تخمین هزینه: منابع ایجاد شده در این آزمایشگاه کد باید کمتر از ۱ دلار هزینه داشته باشند.
۲. قبل از شروع
ایجاد یک پروژه ابری گوگل
- در کنسول گوگل کلود ، در صفحه انتخاب پروژه، یک پروژه گوگل کلود را انتخاب یا ایجاد کنید .
- مطمئن شوید که صورتحساب برای پروژه ابری شما فعال است. یاد بگیرید که چگونه بررسی کنید که آیا صورتحساب در یک پروژه فعال است یا خیر .
شروع پوسته ابری
Cloud Shell یک محیط خط فرمان است که در Google Cloud اجرا میشود و ابزارهای لازم از قبل روی آن بارگذاری شدهاند.
- روی فعال کردن Cloud Shell در بالای کنسول Google Cloud کلیک کنید.
- پس از اتصال به Cloud Shell، احراز هویت خود را تأیید کنید:
gcloud auth list - تأیید کنید که پروژه شما پیکربندی شده است:
gcloud config get project - اگر پروژه شما مطابق انتظار تنظیم نشده است، آن را تنظیم کنید:
export PROJECT_ID=<YOUR_PROJECT_ID> gcloud config set project $PROJECT_ID
فعال کردن APIها
برای فعال کردن Vertex AI API، این دستور را اجرا کنید:
gcloud services enable aiplatform.googleapis.com
توجه: Cloud Shell به طور خودکار با حساب Google Cloud شما احراز هویت میشود. اگر این codelab را خارج از Cloud Shell اجرا میکنید، برای احراز هویت با Vertex AI باید gcloud auth application-default login اجرا کنید.
محیط خود را تنظیم کنید
در Cloud Shell، یک دایرکتوری جدید برای پروژه خود ایجاد کنید و به آن بروید:
mkdir scale-agents
cd scale-agents
uv را نصب کنید و از آن برای نصب بستههای مورد نیاز استفاده کنید:
curl -LsSf https://astral.sh/uv/install.sh | sh
export PATH="$HOME/.local/bin:$PATH"
uv init --no-workspace --no-readme
rm main.py
sed -i 's/requires-python = ">=3.12"/requires-python = ">=3.12,<3.14"/' pyproject.toml
uv add crewai 'litellm[google]' langgraph 'a2a-sdk>=0.3.25,<0.4' httpx uvicorn 'google-adk>=1.0.0' --prerelease=allow
متغیرهای محیطی را برای Vertex AI تنظیم کنید:
export GOOGLE_CLOUD_PROJECT=$(gcloud config get-value project)
export GOOGLE_CLOUD_LOCATION=us-central1
export GOOGLE_GENAI_USE_VERTEXAI=TRUE
۳. ابزارها و عاملها را تعریف کنید
در یک سیستم چندعاملی، عاملها به ابزارهایی برای تعامل با جهان و نقشهای خاصی برای دانستن اینکه چه کاری باید انجام دهند، نیاز دارند.
فایلی با نام scale_agents.py ایجاد کنید و کد زیر را به آن اضافه کنید. این کد، ایمپورتها، ابزارهای شبیهسازی و عوامل CrewAI را تنظیم میکند.
import os
from typing import Optional
from crewai import Agent, Crew, Process, Task, LLM
from crewai.tools import tool
from langgraph.graph import StateGraph, END
from typing_extensions import TypedDict
# CrewAI requires this env var even when using Vertex AI
os.environ["OPENAI_API_KEY"] = "NA"
# Set the project ID for Vertex AI
os.environ["VERTEXAI_PROJECT"] = os.getenv("GOOGLE_CLOUD_PROJECT", "")
os.environ["VERTEXAI_LOCATION"] = "us-central1"
# Initialize the LLM to use Vertex AI
llm = LLM(
model="vertex_ai/gemini-2.5-flash",
temperature=0.0,
max_tokens=4096,
)
# --- Step 1: Define Tools ---
@tool("search_products")
def search_products(query: str) -> list:
"""Search for products in the catalog."""
# Mock product catalog
products = [
{"product_id": "pixel-7", "name": "Pixel 7 Phone", "price": 50.0},
{"product_id": "pixel-8", "name": "Pixel 8 Phone", "price": 80.0},
]
return [p for p in products if query.lower() in p["name"].lower()]
@tool("check_budget")
def check_budget(amount: float) -> dict:
"""Check if a purchase amount is within the budget."""
limit = 100.0
if amount <= limit:
return {"approved": True, "remaining": limit - amount}
return {"approved": False, "reason": f"Exceeds budget of ${limit}"}
@tool("create_purchase_order")
def create_purchase_order(product_id: str, quantity: int) -> dict:
"""Create a purchase order for a product."""
return {
"status": "SUCCESS",
"po_id": f"PO-{product_id}-{quantity}",
"message": f"Successfully ordered {quantity} units of {product_id}."
}
# --- Step 2: Define Agents ---
sourcing_agent = Agent(
role="Sourcing Specialist",
goal="Find the best available products that match the intent of the request. You MUST use the search_products tool to look up products -- never make up product data.",
backstory="You are a veteran procurement specialist with an eye for detail. You always search the catalog before recommending a product.",
tools=[search_products],
llm=llm,
verbose=True,
allow_delegation=False,
memory=False,
reasoning=False,
)
procurement_agent = Agent(
role="Procurement Officer",
goal="Validate the purchase against budget constraints and execute the order. You MUST use the check_budget tool before ordering and the create_purchase_order tool to place the order.",
backstory="You are the gatekeeper of the budget. You always verify budget before placing any order.",
tools=[check_budget, create_purchase_order],
llm=llm,
verbose=True,
allow_delegation=False,
memory=False,
reasoning=False,
)
مفاهیم کلیدی
-
@tooldecorator : CrewAI از این برای تبدیل توابع معمولی پایتون به ابزارهایی که LLMها بتوانند آنها را بفهمند و فراخوانی کنند، استفاده میکند. نکات نوع تابع و رشته سند آن برای تولید یک طرحواره ابزار که LLM بتواند آن را بفهمد، استفاده میشوند. - نقش، هدف و پیشینه : اینها شخصیت عامل را تعریف میکنند و استدلال LLM آن را هدایت میکنند. پیشینه فقط متن خوشطعم نیست - جملهی «شما همیشه کاتالوگ را جستجو میکنید» عامل را تشویق میکند تا از ابزارهای خود به جای پاسخهای توهمزا استفاده کند.
-
reasoning=False: استدلال گسترده را غیرفعال میکند، بنابراین عامل به جای تلاش برای پاسخ مستقیم، حلقه استاندارد فراخوانی ابزار را دنبال میکند. -
allow_delegation=False: هر عامل را به جای واگذاری کار به سایر عاملها، روی ابزارهای اختصاص داده شده به خودش متمرکز نگه میدارد.
چرا به جای یک نماینده، دو نماینده داریم؟ هر نماینده ابزار و وظیفه متفاوتی دارد. متخصص تامین فقط محصولات را جستجو میکند؛ مسئول تدارکات فقط بودجهها و سفارشات را مدیریت میکند. این تفکیک وظایف به این معنی است که هر نماینده یک دستورالعمل متمرکز و یک مجموعه ابزار کوچک و مرتبط دارد - که منجر به رفتار LLM قابل اعتمادتری نسبت به یک نماینده واحد میشود که همه چیز را مدیریت میکند.
۴. وظایف و تیم را تعریف کنید
حالا بیایید با ایجاد Taskها و اتصال آنها به Crew ، مشخص کنیم که این Agentها چه کاری باید انجام دهند.
کد زیر را در انتهای همان فایل scale_agents.py اضافه کنید:
# --- Step 3: Define Tasks & Crew ---
sourcing_task = Task(
description="Use the search_products tool to find products matching: '{item_description}'. Return the product_id and price of the best match from the tool results.",
expected_output="The product_id and price of the best matching product from the search_products tool.",
agent=sourcing_agent
)
procurement_task = Task(
description="First, use the check_budget tool to verify the total cost for {quantity} units. Then use the create_purchase_order tool with the product_id and quantity to place the order.",
expected_output="The purchase order details returned by the create_purchase_order tool.",
agent=procurement_agent,
context=[sourcing_task] # This task depends on the output of sourcing_task
)
def run_crew(item_description: str, quantity: int):
crew = Crew(
agents=[sourcing_agent, procurement_agent],
tasks=[sourcing_task, procurement_task],
process=Process.sequential, # Run tasks in order
verbose=True,
memory=False,
planning=False,
)
result = crew.kickoff(inputs={
"item_description": item_description,
"quantity": quantity
})
return result
مفاهیم کلیدی
- متن وظیفه :
context=[sourcing_task]به CrewAI میگوید که وظیفه تدارکات برای ادامه به خروجی وظیفه منبعیابی نیاز دارد. مسئول تدارکات میتواند قبل از تصمیمگیری در مورد سفارش، ببیند که متخصص منبعیابی چه یافته است. - Process.sequential : وظایف به ترتیبی که فهرست شدهاند اجرا میشوند. این مهم است زیرا وظیفه تدارکات به نتایج وظیفه منبعیابی بستگی دارد - شما نمیتوانید قبل از اینکه بدانید کدام محصول را باید بخرید، سفارشی ثبت کنید.
-
memory=False/planning=False: حافظه داخلی و ویژگیهای برنامهریزی CrewAI را غیرفعال میکند تا اجرا برای این نسخه آزمایشی ساده و قابل پیشبینی باشد.
۵. برنامهریز LangGraph را ایجاد کنید
تیم اجرایی «چگونگی» انجام کارها را انجام میدهد - جستجوی محصولات، بررسی بودجهها، ثبت سفارشها. اما چه کسی «چه چیزی» را تعیین میکند؟ این همان « عامل برنامهریزی » است که با LangGraph ساخته شده است.
LangGraph جریانهای کاری را به عنوان یک ماشین حالت مدلسازی میکند - گرافی از گرهها (توابع) که توسط لبهها (انتقالها) به هم متصل شدهاند. حالت در این گراف جریان مییابد و هر گره از حالت مشترک میخواند و در آن مینویسد. این یک گزینه طبیعی برای برنامهریزی جریانهای کاری است که در آن به جریان کنترل واضح و قطعی نیاز دارید: درخواست را تجزیه و تحلیل کنید، به تیم واگذار کنید و یک گزارش تهیه کنید.
کد زیر را در انتهای همان فایل scale_agents.py اضافه کنید:
# --- Step 4: Define LangGraph Planner ---
class PlanState(TypedDict):
objective: str
item_description: Optional[str]
quantity_needed: Optional[int]
execution_result: Optional[str]
final_report: Optional[str]
def analyze_alert(state: PlanState) -> PlanState:
"""Node 1: Extract intent from the raw objective string."""
print("--- ANALYZING ALERT ---")
# In a production app, you would use an LLM here to extract details.
# For simplicity, we simulate extraction here.
objective = state["objective"]
# Hardcoded extraction for the demo
if "Pixel 7" in objective:
return {
"item_description": "Pixel 7",
"quantity_needed": 1,
}
return {
"item_description": "unknown",
"quantity_needed": 0,
}
def delegate_to_executor(state: PlanState) -> PlanState:
"""Node 2: Call the CrewAI Execution Crew."""
print("--- DELEGATING TO CREW ---")
if state["item_description"] == "unknown":
return {"execution_result": "Failed: Unknown item"}
result = run_crew(
item_description=state["item_description"],
quantity=state["quantity_needed"]
)
return {"execution_result": str(result)}
def generate_report(state: PlanState) -> PlanState:
"""Node 3: Synthesize the final outcome."""
print("--- GENERATING REPORT ---")
return {
"final_report": f"Objective handled: {state['objective']}. Result: {state['execution_result']}"
}
# Build the graph
workflow = StateGraph(PlanState)
workflow.add_node("analyze_alert", analyze_alert)
workflow.add_node("delegate", delegate_to_executor)
workflow.add_node("generate_report", generate_report)
workflow.set_entry_point("analyze_alert")
workflow.add_edge("analyze_alert", "delegate")
workflow.add_edge("delegate", "generate_report")
workflow.add_edge("generate_report", END)
app = workflow.compile()
# --- Main Execution ---
if __name__ == "__main__":
print("Starting Multi-Agent System...")
initial_state = {
"objective": "Restock 1 Pixel 7 phones for the Tokyo office"
}
final_state = app.invoke(initial_state)
print("\n=== FINAL REPORT ===")
print(final_state["final_report"])
مفاهیم کلیدی
- StateGraph : ماشین حالت را تعریف میکند.
PlanStateحالت تایپشدهای است که با پردازش درخواست توسط هر گره، جمعآوری میشود. - گرهها : توابعی که وضعیت فعلی را دریافت کرده و بهروزرسانیهایی را برای آن برمیگردانند. هر گره یک مسئولیت واحد دارد -
analyze_alertهدف را استخراج میکند،delegate_to_executorاجرا میکند وgenerate_reportنتیجه را خلاصه میکند. - لبهها : جریان بین گرهها را تعریف میکنند. در این آزمایشگاه کد، ما از یک جریان خطی ساده (
analyze → delegate → report) استفاده میکنیم. کارگاه کامل این را با مسیریابی شرطی گسترش میدهد - برای مثال، مسیریابی درخواستهای مخرب به یک مسیر امنیتی به جای مجری.
چرا LangGraph برای برنامهریز؟ CrewAI برای عاملهای فراخوانی ابزار عالی است، اما برنامهریز به جریان کنترل قطعی نیاز دارد - "اگر مخرب است، به مسیر امنیتی بروید؛ در غیر این صورت، واگذار کنید." مدل ماشین حالت LangGraph این مسیریابی را صریح و قابل آزمایش میکند، در حالی که CrewAI اجرای ابزار freeform زیر را مدیریت میکند.
۶. برنامهریز و گروه را اجرا کنید
حالا بیایید برنامهریز LangGraph و تیم CrewAI را با هم آزمایش کنیم.
در ترمینال Cloud Shell خود، اسکریپت را اجرا کنید:
uv run python scale_agents.py
باید خروجی را مشاهده کنید که مراحل انجام شده را نشان میدهد:
- تحلیل هشدار : گره LangGraph اجرا میشود.
- واگذاری به خدمه : گره LangGraph، خدمه CrewAI را فرا میخواند.
- اجرای CrewAI : خواهید دید که متخصص منبعیابی در حال جستجوی محصول است و مسئول تدارکات در حال بررسی بودجه و ایجاد سفارش خرید.
- گزارش نهایی : نتیجه خلاصه شده در پایان چاپ خواهد شد.
مثال خروجی (به اختصار):
Starting Multi-Agent System...
--- ANALYZING ALERT ---
--- DELEGATING TO CREW ---
Agent: Sourcing Specialist
Tool: search_products Args: {'query': 'Pixel 7'}
Tool Completed Output: [{'product_id': 'pixel-7', 'name': 'Pixel 7 Phone', 'price': 50.0}]
Agent: Procurement Officer
Tool: check_budget Args: {'amount': 50}
Tool: create_purchase_order Args: {'product_id': 'pixel-7', 'quantity': 1}
Tool Completed Output: {'status': 'SUCCESS', 'po_id': 'PO-pixel-7-1', ...}
--- GENERATING REPORT ---
=== FINAL REPORT ===
Objective handled: Restock 1 Pixel 7 phones for the Tokyo office. Result: ...PO-pixel-7-1...SUCCESS...
توجه: ممکن است در خروجی، پیامهای [CrewAIEventsBus] Warning: Event pairing mismatch را مشاهده کنید. اینها هشدارهای ظاهری از ردیابی رویدادهای داخلی CrewAI هستند و میتوان آنها را با خیال راحت نادیده گرفت.
توجه: CrewAI ممکن است پیامی مبنی بر غیرفعال بودن ردیابی نمایش دهد. این پیام جنبهی اطلاعرسانی دارد و میتوان با خیال راحت آن را نادیده گرفت.
توجه: OMS آزمایشی محدودیت بودجه ۱۰۰ دلاری دارد. برای موفقیت مسیر شاد، تعداد را کوچک (کمتر از حدود ۲ واحد) نگه دارید. به عنوان مثال، ۱ گوشی Pixel 7 با قیمت ۵۰ دلار از بررسی بودجه عبور میکند، اما ۳ واحد با قیمت ۱۵۰ دلار به دلیل «بیش از بودجه» رد میشوند.
۷. برنامهریز را در یک سرور A2A قرار دهید
برنامهریز LangGraph کار میکند، اما درون یک فرآیند پایتون گیر افتاده است. برای اینکه توسط سایر عاملها - که احتمالاً در چارچوبهای مختلف نوشته شدهاند یا روی ماشینهای مختلف اجرا میشوند - قابل فراخوانی باشد، ما آن را در یک سرور A2A (عامل به عامل) قرار میدهیم.
A2A یک پروتکل مبتنی بر JSON-RPC 2.0 است که نحوه ارتباط عاملها را استاندارد میکند. مفاهیم کلیدی:
مفهوم | هدف |
کارت نماینده | فراداده JSON که قابلیتهای عامل را توصیف میکند (در |
| روش JSON-RPC برای ارسال یک وظیفه به عامل |
وظیفه | یک واحد کار با وضعیت (ارسال شده → در حال کار → تکمیل شده/ناموفق) |
مصنوعات | خروجیهای میانی و نهایی متصل به یک وظیفه |
یک فایل جدید به a2a_planner.py ایجاد کنید:
import asyncio
import os
import uvicorn
from a2a.server.agent_execution import AgentExecutor, RequestContext
from a2a.server.apps import A2AStarletteApplication
from a2a.server.events import EventQueue
from a2a.server.request_handlers import DefaultRequestHandler
from a2a.server.tasks import InMemoryTaskStore, TaskUpdater
from a2a.types import (AgentCapabilities, AgentCard, AgentSkill,
InternalError, Part, TextPart)
from a2a.utils import new_task
from a2a.utils.errors import ServerError
# Import the LangGraph planner from Step 4
from scale_agents import app as planner_app
class PlannerAgentExecutor(AgentExecutor):
"""Wraps the LangGraph planner as an A2A service."""
SUPPORTED_CONTENT_TYPES = ["text", "text/plain"]
async def execute(self, context: RequestContext, event_queue: EventQueue):
objective = context.get_user_input()
# Initialize A2A task tracking
task = context.current_task or new_task(context.message)
await event_queue.enqueue_event(task)
updater = TaskUpdater(event_queue, task.id, task.context_id)
try:
# Run the LangGraph planner synchronously in a thread
initial_state = {"objective": objective}
result = await asyncio.to_thread(planner_app.invoke, initial_state)
final_report = result.get("final_report", "No report generated.")
except Exception as e:
final_report = f"Execution failed: {e}"
# Send the result back as an artifact
await updater.add_artifact(
[Part(root=TextPart(text=final_report))],
name="orchestration_report"
)
await updater.complete()
async def cancel(self, context, event_queue):
raise ServerError(error=InternalError(message="Cancel not supported"))
# Define the Agent Card — this is what other agents see
port = int(os.environ.get("PORT", 8080))
agent_card = AgentCard(
name="Retail-Planner-A2A",
description="LangGraph planner that delegates logistics tasks to a CrewAI crew.",
url=f"http://localhost:{port}/",
version="1.0.0",
default_input_modes=PlannerAgentExecutor.SUPPORTED_CONTENT_TYPES,
default_output_modes=PlannerAgentExecutor.SUPPORTED_CONTENT_TYPES,
capabilities=AgentCapabilities(streaming=False),
skills=[
AgentSkill(
id="plan_logistics",
name="Plan Logistics",
description="Analyzes inventory alerts and orchestrates procurement.",
tags=["logistics", "planning"],
examples=["Restock 1 Pixel 7 phones for the Tokyo office"],
)
],
)
if __name__ == "__main__":
executor = PlannerAgentExecutor()
handler = DefaultRequestHandler(
agent_executor=executor, task_store=InMemoryTaskStore()
)
server = A2AStarletteApplication(
agent_card=agent_card, http_handler=handler
)
print(f"Starting A2A Planner Server on port {port}...")
uvicorn.run(server.build(), host="0.0.0.0", port=port)
مفاهیم کلیدی
- کارت عامل : در آدرس
/.well-known/agent-card.jsonارائه میشود - هر عاملی میتواند با دریافت آن URL، متوجه شود که این سرور چه کاری انجام میدهد. این آدرس، مهارتهای عامل، انواع محتوای پشتیبانیشده و قابلیتهای او را فهرست میکند. -
AgentExecutor.execute(): تنها متدی که پیادهسازی میکنید. این متد درخواست ورودی را دریافت میکند، منطق عامل شما (در اینجا، برنامهریز LangGraph) را اجرا میکند و نتایج را به عنوان خروجی ارسال میکند. -
TaskUpdater: چرخه حیات وظیفه را مدیریت میکند -add_artifact()خروجیهای میانی/نهایی را ارسال میکند،complete()وظیفه را به عنوان انجام شده علامتگذاری میکند. کتابخانه A2A تمام اتصالات JSON-RPC را مدیریت میکند.
سرور A2A را با اجرای آن در ترمینال آزمایش کنید:
uv run python a2a_planner.py
یک برگه Cloud Shell دیگر باز کنید (روی + کنار برگه فعلی کلیک کنید) و تأیید کنید که کارت Agent ارائه شده است:
cd ~/scale-agents
curl http://localhost:8080/.well-known/agent-card.json | python3 -m json.tool
شما باید کارت عامل JSON را ببینید. سرور A2A را برای مرحله بعدی در ترمینال اول در حال اجرا نگه دارید.
۸. اتاق کنترل ADK را بسازید
در بالای این مجموعه، اتاق کنترل قرار دارد که با ADK (کیت توسعه عامل گوگل) ساخته شده است. این اتاق درخواست کاربر را دریافت میکند، از طریق A2A به برنامهریز ارجاع میدهد، نتیجه را ارزیابی میکند و - به طور بحرانی - برنامهریزی مجدد در صورت شکست (CUJ 2) را انجام میدهد.
ADK عناصر اولیه عامل مانند BaseAgent ، LlmAgent و InMemoryRunner را فراهم میکند. ما BaseAgent زیرکلاس میکنیم تا منطق تنظیم سفارشی - فراخوانیهای A2A، طبقهبندی گزارش و برنامهریزی مجدد پویا با یک زیرعامل LlmAgent - را بنویسیم.
یک فایل جدید control_room.py ایجاد کنید:
import asyncio
import uuid
import os
import httpx
from google.adk.agents import BaseAgent, LlmAgent, RunConfig
from google.adk.agents.invocation_context import InvocationContext
from google.adk.events import Event
from google.adk.runners import InMemoryRunner
from google.genai import types
from typing import AsyncGenerator
A2A_SERVER_URL = os.environ.get("PLANNER_AGENT_URL", "http://127.0.0.1:8080")
def _classify_report(report: str) -> tuple[bool, bool]:
"""Return (is_success, should_retry) for a planner report."""
normalized = (report or "").replace("*", "").strip().lower()
success_markers = [
"status: success", "'status': 'success'",
"outcome: success", "po_id", "successfully ordered",
]
retryable_markers = ["not found", "discontinued", "no inventory",
"unknown item"]
terminal_markers = [
"status: failed", "over budget", "not issued",
"procurement failed",
]
if any(m in normalized for m in terminal_markers):
return False, False # Failed, don't retry
if any(m in normalized for m in retryable_markers):
return False, True # Failed, but retryable
if any(m in normalized for m in success_markers):
return True, False # Success!
return False, False # Unknown → treat as failure
class ControlRoomAgent(BaseAgent):
"""Top-level orchestrator: delegates via A2A, re-plans on failure."""
async def _run_async_impl(
self, ctx: InvocationContext
) -> AsyncGenerator[Event, None]:
# Extract user input from session events
user_msg = ""
if ctx.session and ctx.session.events:
for evt in reversed(ctx.session.events):
if evt.content and evt.content.role == "user":
user_msg = evt.content.parts[0].text
break
max_attempts = 2
current_objective = user_msg
final_report = "No report returned."
for attempt in range(1, max_attempts + 1):
print(f"\n--- Attempt {attempt}: Calling A2A Planner ---")
print(f" Objective: {current_objective}")
# Build the A2A JSON-RPC request
payload = {
"jsonrpc": "2.0",
"id": f"req-{attempt}",
"method": "message/send",
"params": {
"message": {
"message_id": str(uuid.uuid4()),
"parts": [{"text": current_objective}],
"role": "user"
}
}
}
try:
async with httpx.AsyncClient(timeout=300.0) as client:
resp = await client.post(
f"{A2A_SERVER_URL}/", json=payload
)
data = resp.json()
if "error" in data:
final_report = data["error"].get("message", "Unknown A2A error")
elif "result" in data:
artifacts = data["result"].get("artifacts", [])
if artifacts and "parts" in artifacts[-1]:
parts = artifacts[-1]["parts"]
if parts and "text" in parts[0]:
final_report = parts[0]["text"]
except Exception as e:
final_report = f"Connection error: {e}"
print(f"\n--- Report ---\n{final_report}\n")
is_success, should_retry = _classify_report(final_report)
if is_success:
yield Event(
author=self.name,
invocation_id=ctx.invocation_id,
content=types.Content(
role="model",
parts=[types.Part.from_text(text=final_report)]
),
)
return
# --- Re-planning (CUJ 2) ---
if should_retry and attempt < max_attempts:
print("--- Re-planning with LLM ---")
replanner = LlmAgent(
name=f"replanner_{attempt}",
model="gemini-2.5-flash",
instruction=(
"You are a strategic re-planner. The previous request "
"failed. Rewrite the objective to be broader or more "
"likely to succeed. Output ONLY the new objective text."
),
)
# Run the re-planner as a sub-agent
child_ctx = InvocationContext(
invocation_id=f"{ctx.invocation_id}_replan_{attempt}",
agent=replanner,
session=ctx.session,
session_service=ctx.session_service,
run_config=ctx.run_config or RunConfig(),
)
child_ctx.user_content = types.Content(
role="user",
parts=[types.Part.from_text(text=(
f"Original Objective: {current_objective}\n"
f"Failure Reason: {final_report}\n"
"Please broaden the search."
))]
)
new_objective = ""
async for event in replanner.run_async(child_ctx):
if event.content and event.content.parts:
for part in event.content.parts:
if hasattr(part, "text") and part.text:
new_objective += part.text
current_objective = new_objective.strip()
print(f"New objective: {current_objective}")
continue
# Terminal failure (not retryable)
if not should_retry:
yield Event(
author=self.name,
invocation_id=ctx.invocation_id,
content=types.Content(
role="model",
parts=[types.Part.from_text(text=f"FAILED: {final_report}")]
),
)
return
# Max attempts exhausted
yield Event(
author=self.name,
invocation_id=ctx.invocation_id,
content=types.Content(
role="model",
parts=[types.Part.from_text(
text=f"FAILED after {max_attempts} attempts: {final_report}"
)]
),
)
async def main():
prompt = "Restock 1 Pixel 7 phones for the Tokyo office"
print(f"Starting Control Room with: {prompt}\n")
agent = ControlRoomAgent(name="control_room")
runner = InMemoryRunner(app_name="control_room", agent=agent)
session = await runner.session_service.create_session(
app_name="control_room", user_id="admin"
)
content = types.Content(
role="user", parts=[types.Part.from_text(text=prompt)]
)
async for event in runner.run_async(
user_id="admin", session_id=session.id, new_message=content
):
if event.content and event.content.parts:
for part in event.content.parts:
if hasattr(part, "text") and part.text:
print(f"Result: {part.text}")
print("\n=== CONTROL ROOM COMPLETE ===")
if __name__ == "__main__":
asyncio.run(main())
مفاهیم کلیدی
-
BaseAgent: ADK اولیه برای عاملهای سفارشی. شما آن را زیرکلاس میکنید و_run_async_implرا برای نوشتن منطق تنظیم ناهمگام دلخواه بازنویسی میکنید - در اینجا، فراخوانی A2A + طبقهبندی + حلقه برنامهریزی مجدد. - فراخوانی A2A JSON-RPC : اتاق کنترل یک درخواست
message/sendاستاندارد را با استفاده ازhttpxبه سرور A2A برنامهریز ارسال میکند و پاسخ JSON-RPC را برای استخراج گزارش نهایی تجزیه و تحلیل میکند. -
_classify_report(): طبقهبندی ساده مبتنی بر کلمه کلیدی که موفقیت، شکست قابل امتحان مجدد یا شکست ترمینال را از متن گزارش تعیین میکند. این حلقه برنامهریزی مجدد را هدایت میکند. - فراخوانی زیرعامل : برای برنامهریزی مجدد، اتاق کنترل یک
LlmAgentایجاد میکند و با ساخت یکInvocationContextفرزند و فراخوانیreplanner.run_async(child_ctx)آن را اجرا میکند. این به شما امکان میدهد تا به صورت پویا عاملهای LLM را در منطق ارکستراسیون سفارشی بچرخانید. -
InMemoryRunner: عامل را به صورت محلی با یک حافظه نشست درون حافظه اجرا میکند. در محیط عملیاتی، شماadk deployبرای استقرار در Vertex AI Agent Engine استفاده خواهید کرد.
۹. اجرای فول استک
حالا بیایید سیستم سه لایه کامل را آزمایش کنیم: اتاق کنترل ADK → A2A → LangGraph Planner → CrewAI Crew.
از دومین تب Cloud Shell که قبلاً باز کردید استفاده کنید (یا برای باز شدن تب جدید روی + کلیک کنید) و Control Room را اجرا کنید. مهم: هر تب Cloud Shell جلسه پوسته مخصوص به خود را دارد. شما باید متغیرهای پروژه و محیط را دوباره تنظیم کنید:
cd ~/scale-agents
export GOOGLE_CLOUD_PROJECT=$(gcloud config get-value project)
export GOOGLE_CLOUD_LOCATION=us-central1
export GOOGLE_GENAI_USE_VERTEXAI=TRUE
uv run python control_room.py
شما باید جریان کامل ارکستراسیون را ببینید:
- اتاق کنترل درخواست را دریافت میکند و یک
message/sendتماس JSON-RPC به سرور A2A ارسال میکند. - سرور A2A درخواست را دریافت کرده و برنامهریز LangGraph را فراخوانی میکند.
- برنامهریز LangGraph درخواست را تجزیه و تحلیل میکند و آن را به تیم CrewAI واگذار میکند.
- خدمه CrewAI، عوامل تامین و تدارکات را اداره میکنند.
- نتیجه به اتاق کنترل برمیگردد
سفرهای بحرانی کاربر (CUJ)
سعی کنید رشته prompt را در control_room.py تغییر دهید تا این سناریوها را آزمایش کنید:
سی یو جی | سریع | چه اتفاقی میافتد؟ |
۱. مسیر شاد | | جستجو -> بررسی بودجه -> سفارش خرید (موفق). به صورت سرتاسری کار میکند. |
۲. برنامهریزی مجدد | | برنامهریز عبارت "ناموفق: مورد ناشناخته" را برمیگرداند. اتاق کنترل این موضوع را تشخیص میدهد و یک برنامهریز مجدد |
برای آزمایش CUJ 2 (برنامهریزی مجدد) ، prompt در control_room.py را به صورت زیر تغییر دهید:
prompt = "Order 1 unit of the discontinued XR-7000 Quantum Holographic Display"
برنامهریزِ کدِ سختشده این مورد را تشخیص نمیدهد و عبارت "Failed: Unknown item" را برمیگرداند. اتاق کنترل، خرابی را تشخیص میدهد، به صورت پویا یک برنامهریزِ مجددِ LlmAgent ایجاد میکند و با هدف وسیعتری دوباره تلاش میکند. از آنجا که برنامهریز فقط "Pixel 7" را تشخیص میدهد، تلاش مجدد نیز شکست خواهد خورد - اما حلقه کامل برنامهریزی مجدد را در عمل مشاهده خواهید کرد. خروجی نهایی FAILED after 2 attempts: ...
۱۰. تمیز کردن
برای جلوگیری از هزینههای مداوم برای حساب Google Cloud خود، میتوانید منابع ایجاد شده در طول این codelab را حذف کنید. میتوانید به سادگی دایرکتوری ایجاد شده را حذف کنید:
cd ..
rm -rf scale-agents
۱۱. تبریک
تبریک! شما با موفقیت یک سیستم ارکستراسیون چندعاملی با استفاده از CrewAI، LangGraph، پروتکل A2A و ADK ساختید.
آنچه آموختهاید
- نحوه تعریف ابزارها برای اپراتورها با استفاده از دکوراتور
@toolدر CrewAI. - چگونه میتوان عاملهای تخصصی با نقشها، ابزارها و اهداف متمایز ایجاد کرد؟
- چگونه میتوان عوامل را به یک گروه ترتیبی با وابستگیهای وظیفهای متصل کرد.
- چگونه با LangGraph یک برنامهریز ماشین حالت بسازیم که به خدمه محول شود.
- نحوه نمایش برنامهریز به عنوان یک سرویس A2A با
AgentCardوAgentExecutor. - چگونه یک ADK
BaseAgentسفارشی بسازیم که از طریق A2A واگذار شود و در صورت عدم موفقیت با فراخوانی یک زیرعاملLlmAgentدوباره برنامهریزی کند. - چرا تفکیک برنامهریزی، اجرا و هماهنگی در چارچوبهای مختلف، به شما قابلیت ماژولار بودن و انعطافپذیری میدهد؟
رفتن بیشتر
کارگاه کامل، این سیستم را با موارد زیر گسترش میدهد:
- داشبورد بلادرنگ - پخش SSE برای تجسم پیشرفت چندعاملی
- سپر هویت - امنیت مبتنی بر IAM که اقدامات مخرب را در سطح زیرساخت مسدود میکند، نه در سطح اعلان
- موتور عامل هوش مصنوعی ورتکس - عامل ADK را با استفاده از
adk deployدر زیرساخت ابری مدیریتشده مستقر کنید