1. مقدمة
في هذا الدرس التطبيقي حول الترميز، ستتعلّم كيفية إنشاء نظام تنسيق متعدد الوكلاء باستخدام CrewAI وLangGraph وبروتوكول A2A وADK (حزمة تطوير الوكلاء). ستنشئ نظامًا يفوّض فيه مركز تحكّم ADK عملية التخطيط إلى أداة تخطيط LangGraph، والتي ترسل المهام إلى فريق تنفيذ CrewAI، وكل ذلك يتم ربطه من خلال A2A، وذلك للتعامل مع سيناريو إعادة تخزين مستودع البيع بالتجزئة.
ما هي عملية تنسيق الوكلاء المتعدّدين؟
في نظام يستند إلى عدّة وكلاء، يتعاون عدّة وكلاء ذكاء اصطناعي متخصّصين لإنجاز مهام معقّدة جدًا بالنسبة إلى وكيل واحد. بدلاً من أن يقوم وكيل واحد ضخم بكل شيء، يمكنك تقسيم المشكلة إلى أدوار، مثل مخطِّط ومنفِّذ، ولكل منهما أدواته وخبرته.
ويشبه ذلك طريقة عمل المؤسسات البشرية: يفوّض المدير الاستراتيجية إلى المحلّلين والتنفيذ إلى المتخصّصين. تشمل المزايا ما يلي:
- فصل الاهتمامات: يركّز كل وكيل على ما يتقنه
- مرونة إطار العمل: استخدام أفضل إطار عمل لكل دور (LangGraph لمنطق التخطيط، وCrewAI لتنفيذ الأدوات)
- قابلية التوسّع: إضافة وكلاء متخصصين بدون تغيير النظام بأكمله
السيناريو
عندما يرسل مستخدم طلبًا بإعادة تخزين منتج، مثل "إعادة تخزين هاتف Pixel 7 واحد لمكتب طوكيو"، يقوم النظام بما يلي:
- يحلّل LangGraph Planner الطلب ويستخرج السلعة والكمية
- يُفوّض "المخطِّط" مهمة التنفيذ إلى فريق التنفيذ في CrewAI
- يبحث وكيل متخصص في تحديد المصادر في قائمة المنتجات باستخدام أدوات.
- يتحقّق وكيل موظف المشتريات من الميزانية ويصدر أمر شراء باستخدام الأدوات.
- تنتقل النتيجة مرة أخرى إلى أداة التخطيط، التي تنشئ تقريرًا نهائيًا
User Request
│
▼
┌──────────────────────┐
│ ADK Control Room │ ← Top-level orchestrator, re-plans on failure
│ (BaseAgent) │
└──────────┬───────────┘
│ A2A (JSON-RPC)
▼
┌──────────────────────┐
│ LangGraph Planner │ ← Analyzes intent, delegates, reports
│ (State Machine) │
└──────────┬───────────┘
│
▼
┌──────────────────────┐
│ CrewAI Execution Crew│ ← Runs agents with tools
│ ├─ Sourcing Agent │ → search_products
│ └─ Procurement Agent│ → check_budget, create_purchase_order
└──────────────────────┘
حزمة التكنولوجيا
طبقة | تكنولوجيا | الدور |
التخطيط | LangGraph | آلة الحالة التي تحلّل النية وتوجّه الطلبات وتنشئ التقارير |
التنفيذ | CrewAI | وكلاء مستندون إلى الأدوار يستدعون الأدوات بالتسلسل |
LLM | Gemini على Vertex AI | تعزيز قدرة الوكيل على الاستدلال واختيار الأدوات |
التواصل بين الوكلاء | بروتوكول A2A | جسر JSON-RPC 2.0 لكي تتمكّن النماذج الوكيلة من إطارات عمل مختلفة من التواصل |
المنظّم على المستوى الأعلى | ADK (BaseAgent) | تلقّي الطلبات، والتفويض من خلال A2A، وإعادة التخطيط عند حدوث خطأ |
تجربة التطبيق: إذا كان متاحًا، جرِّب نظام الإنتاج الكامل على https://scale-control-room-761793285222.us-central1.run.app. يوسّع هذا النظام نطاق ما ستنشئه هنا من خلال لوحة بيانات في الوقت الفعلي وبروتوكول A2A وأمان إدارة الهوية وإمكانية الوصول (IAM).
الإجراءات التي ستنفذّها
- تحديد أدوات مخصّصة يمكن للوكلاء استخدامها
- يمكنك إنشاء وكلاء متخصصين باستخدام CrewAI.
- أنشئ أداة تخطيط آلة حالة باستخدام LangGraph.
- تنسيق عملية التواصل بين المخطِّط وفريق التنفيذ
- يمكنك تضمين أداة التخطيط في خادم بروتوكول A2A لإتاحة التواصل بين الأُطر.
- إنشاء غرفة تحكّم في حزمة تطوير التطبيقات ذات مستوى أعلى يتم فيها التفويض من خلال A2A وإعادة التخطيط عند حدوث خطأ
المتطلبات
- متصفّح ويب، مثل Chrome
- مشروع Google Cloud تم تفعيل الفوترة فيه
هذا الدرس التطبيقي حول الترميز مخصّص للمطوّرين المتوسطي المستوى الذين لديهم خبرة في لغة Python والمفاهيم الأساسية للنماذج اللغوية الكبيرة.
المدة المقدَّرة: 35 دقيقة
تقدير التكلفة: يجب أن تكون تكلفة الموارد التي تم إنشاؤها في هذا الدرس التطبيقي حول الترميز أقل من دولار واحد.
2. قبل البدء
إنشاء مشروع على Google Cloud
- في Google Cloud Console، في صفحة اختيار المشروع، اختَر مشروعًا على Google Cloud أو أنشِئ مشروعًا.
- تأكَّد من تفعيل الفوترة لمشروعك على السحابة الإلكترونية. كيفية التحقّق مما إذا كانت الفوترة مفعَّلة في مشروع
بدء Cloud Shell
Cloud Shell هي بيئة سطر أوامر تعمل في Google Cloud ومحمّلة مسبقًا بالأدوات اللازمة.
- انقر على تفعيل Cloud Shell في أعلى "وحدة تحكّم Google Cloud".
- بعد الاتصال بـ Cloud Shell، تحقَّق من مصادقتك باتّباع الخطوات التالية:
gcloud auth list - تأكَّد من إعداد مشروعك باتّباع الخطوات التالية:
gcloud config get project - إذا لم يتم ضبط مشروعك على النحو المتوقّع، اضبطه باتّباع الخطوات التالية:
export PROJECT_ID=<YOUR_PROJECT_ID> gcloud config set project $PROJECT_ID
تفعيل واجهات برمجة التطبيقات
نفِّذ الأمر التالي لتفعيل واجهة برمجة التطبيقات Vertex AI API:
gcloud services enable aiplatform.googleapis.com
ملاحظة: تتم المصادقة تلقائيًا في Cloud Shell باستخدام حسابك على Google Cloud. إذا كنت تنفّذ هذا الدرس التطبيقي حول الترميز خارج Cloud Shell، عليك تنفيذ gcloud auth application-default login للمصادقة باستخدام Vertex AI.
إعداد البيئة
في Cloud Shell، أنشئ دليلًا جديدًا لمشروعك وانتقِل إليه:
mkdir scale-agents
cd scale-agents
ثبِّت uv واستخدِمه لتثبيت الحِزم المطلوبة:
curl -LsSf https://astral.sh/uv/install.sh | sh
export PATH="$HOME/.local/bin:$PATH"
uv init --no-workspace --no-readme
rm main.py
sed -i 's/requires-python = ">=3.12"/requires-python = ">=3.12,<3.14"/' pyproject.toml
uv add crewai 'litellm[google]' langgraph 'a2a-sdk>=0.3.25,<0.4' httpx uvicorn 'google-adk>=1.0.0' --prerelease=allow
اضبط متغيّرات البيئة في Vertex AI:
export GOOGLE_CLOUD_PROJECT=$(gcloud config get-value project)
export GOOGLE_CLOUD_LOCATION=us-central1
export GOOGLE_GENAI_USE_VERTEXAI=TRUE
3- تحديد الأدوات والوكلاء
في نظام متعدد الوكلاء، تحتاج الوكلاء إلى أدوات للتفاعل مع العالم، وأدوار محددة لمعرفة ما يجب فعله.
أنشئ ملفًا باسم scale_agents.py وأضِف الرمز التالي. يؤدي ذلك إلى إعداد عمليات الاستيراد والأدوات التجريبية ووكلاء CrewAI.
import os
from typing import Optional
from crewai import Agent, Crew, Process, Task, LLM
from crewai.tools import tool
from langgraph.graph import StateGraph, END
from typing_extensions import TypedDict
# CrewAI requires this env var even when using Vertex AI
os.environ["OPENAI_API_KEY"] = "NA"
# Set the project ID for Vertex AI
os.environ["VERTEXAI_PROJECT"] = os.getenv("GOOGLE_CLOUD_PROJECT", "")
os.environ["VERTEXAI_LOCATION"] = "us-central1"
# Initialize the LLM to use Vertex AI
llm = LLM(
model="vertex_ai/gemini-2.5-flash",
temperature=0.0,
max_tokens=4096,
)
# --- Step 1: Define Tools ---
@tool("search_products")
def search_products(query: str) -> list:
"""Search for products in the catalog."""
# Mock product catalog
products = [
{"product_id": "pixel-7", "name": "Pixel 7 Phone", "price": 50.0},
{"product_id": "pixel-8", "name": "Pixel 8 Phone", "price": 80.0},
]
return [p for p in products if query.lower() in p["name"].lower()]
@tool("check_budget")
def check_budget(amount: float) -> dict:
"""Check if a purchase amount is within the budget."""
limit = 100.0
if amount <= limit:
return {"approved": True, "remaining": limit - amount}
return {"approved": False, "reason": f"Exceeds budget of ${limit}"}
@tool("create_purchase_order")
def create_purchase_order(product_id: str, quantity: int) -> dict:
"""Create a purchase order for a product."""
return {
"status": "SUCCESS",
"po_id": f"PO-{product_id}-{quantity}",
"message": f"Successfully ordered {quantity} units of {product_id}."
}
# --- Step 2: Define Agents ---
sourcing_agent = Agent(
role="Sourcing Specialist",
goal="Find the best available products that match the intent of the request. You MUST use the search_products tool to look up products -- never make up product data.",
backstory="You are a veteran procurement specialist with an eye for detail. You always search the catalog before recommending a product.",
tools=[search_products],
llm=llm,
verbose=True,
allow_delegation=False,
memory=False,
reasoning=False,
)
procurement_agent = Agent(
role="Procurement Officer",
goal="Validate the purchase against budget constraints and execute the order. You MUST use the check_budget tool before ordering and the create_purchase_order tool to place the order.",
backstory="You are the gatekeeper of the budget. You always verify budget before placing any order.",
tools=[check_budget, create_purchase_order],
llm=llm,
verbose=True,
allow_delegation=False,
memory=False,
reasoning=False,
)
المفاهيم الرئيسية
@toolأداة التزيين: تستخدم CrewAI هذه الأداة لتحويل دوال Python العادية إلى أدوات يمكن للنماذج اللغوية الكبيرة فهمها واستدعاؤها. يتم استخدام تلميحات النوع وسلسلة التوثيق الخاصة بالدالة لإنشاء مخطط أداة يمكن للنموذج اللغوي الكبير فهمه.- الدور والهدف والقصة الخلفية: تحدّد هذه العناصر شخصية الوكيل وتوجّه عملية الاستدلال في النموذج اللغوي الكبير. لا يقتصر دور القصة الخلفية على تقديم معلومات إضافية، بل إنّ عبارة "تبحث دائمًا في الفهرس" تشجّع الوكيل على استخدام أدواته بدلاً من تقديم إجابات من نسج الخيال.
reasoning=False: يؤدي إلى إيقاف الاستدلال الموسّع، وبالتالي يتبع الوكيل حلقة استدعاء الأدوات العادية بدلاً من محاولة تقديم إجابة مباشرة.allow_delegation=False: يركّز كل وكيل على الأدوات المخصّصة له بدلاً من إحالة العمل إلى وكلاء آخرين.
لماذا وكيلان بدلاً من وكيل واحد؟ لكل وكيل أدوات مختلفة ومهام مختلفة. يبحث "أخصائي التوريد" عن المنتجات فقط، بينما يتعامل "مسؤول المشتريات" مع الميزانيات والطلبات فقط. يعني فصل المهام هذا أنّ كل وكيل لديه طلب مركّز ومجموعة أدوات صغيرة ذات صلة، ما يؤدي إلى سلوك أكثر موثوقية للنموذج اللغوي الكبير مقارنةً بوكيل واحد يتولّى كل شيء.
4. تحديد المهام وطاقم العمل
لنحدّد الآن المهام التي يجب أن تنفّذها هذه الوكلاء من خلال إنشاء مهام وربطها بفريق.
أضِف الرمز التالي في نهاية ملف scale_agents.py نفسه:
# --- Step 3: Define Tasks & Crew ---
sourcing_task = Task(
description="Use the search_products tool to find products matching: '{item_description}'. Return the product_id and price of the best match from the tool results.",
expected_output="The product_id and price of the best matching product from the search_products tool.",
agent=sourcing_agent
)
procurement_task = Task(
description="First, use the check_budget tool to verify the total cost for {quantity} units. Then use the create_purchase_order tool with the product_id and quantity to place the order.",
expected_output="The purchase order details returned by the create_purchase_order tool.",
agent=procurement_agent,
context=[sourcing_task] # This task depends on the output of sourcing_task
)
def run_crew(item_description: str, quantity: int):
crew = Crew(
agents=[sourcing_agent, procurement_agent],
tasks=[sourcing_task, procurement_task],
process=Process.sequential, # Run tasks in order
verbose=True,
memory=False,
planning=False,
)
result = crew.kickoff(inputs={
"item_description": item_description,
"quantity": quantity
})
return result
المفاهيم الرئيسية
- سياق المهمة: يخبر
context=[sourcing_task]CrewAI أنّ مهمة الشراء تحتاج إلى نتائج مهمة تحديد المصادر للمتابعة. يمكن لمسؤول المشتريات الاطّلاع على ما عثر عليه أخصائي التوريد قبل اتخاذ قرار بشأن ما سيتم طلبه. - Process.sequential: يتم تنفيذ المهام بالترتيب الذي تم إدراجها به. هذا مهم لأنّ مهمة الشراء تعتمد على نتائج مهمة تحديد المصادر، فلا يمكنك تقديم طلب قبل معرفة المنتج الذي تريد شراءه.
memory=False/planning=False: يؤدي إلى إيقاف ميزات الذاكرة والتخطيط المضمّنة في CrewAI للحفاظ على بساطة التنفيذ وإمكانية توقّعه في هذا العرض التوضيحي.
5- إنشاء أداة LangGraph Planner
يتولّى فريق التنفيذ مهمة "كيفية" تنفيذ المهام، أي البحث عن المنتجات والتحقّق من الميزانيات وتقديم الطلبات. ولكن من يقرّر "ماذا"؟ هذا هو وكيل التخطيط، الذي تم إنشاؤه باستخدام LangGraph.
تُصمّم نماذج LangGraph سير العمل على شكل آلة حالات، أي رسم بياني للعُقد (الدوال) المرتبطة بالحواف (عمليات الانتقال). تنتقل الحالة عبر الرسم البياني، حيث تقرأ كل عُقدة من الحالة المشترَكة وتكتب إليها. هذا النوع من التخطيط مناسب تمامًا لسير العمل الذي يتطلّب تدفّقًا واضحًا ومحدّدًا للتحكّم: تحليل الطلب، وتفويضه إلى الطاقم، وإنشاء تقرير.
أضِف الرمز التالي في نهاية ملف scale_agents.py نفسه:
# --- Step 4: Define LangGraph Planner ---
class PlanState(TypedDict):
objective: str
item_description: Optional[str]
quantity_needed: Optional[int]
execution_result: Optional[str]
final_report: Optional[str]
def analyze_alert(state: PlanState) -> PlanState:
"""Node 1: Extract intent from the raw objective string."""
print("--- ANALYZING ALERT ---")
# In a production app, you would use an LLM here to extract details.
# For simplicity, we simulate extraction here.
objective = state["objective"]
# Hardcoded extraction for the demo
if "Pixel 7" in objective:
return {
"item_description": "Pixel 7",
"quantity_needed": 1,
}
return {
"item_description": "unknown",
"quantity_needed": 0,
}
def delegate_to_executor(state: PlanState) -> PlanState:
"""Node 2: Call the CrewAI Execution Crew."""
print("--- DELEGATING TO CREW ---")
if state["item_description"] == "unknown":
return {"execution_result": "Failed: Unknown item"}
result = run_crew(
item_description=state["item_description"],
quantity=state["quantity_needed"]
)
return {"execution_result": str(result)}
def generate_report(state: PlanState) -> PlanState:
"""Node 3: Synthesize the final outcome."""
print("--- GENERATING REPORT ---")
return {
"final_report": f"Objective handled: {state['objective']}. Result: {state['execution_result']}"
}
# Build the graph
workflow = StateGraph(PlanState)
workflow.add_node("analyze_alert", analyze_alert)
workflow.add_node("delegate", delegate_to_executor)
workflow.add_node("generate_report", generate_report)
workflow.set_entry_point("analyze_alert")
workflow.add_edge("analyze_alert", "delegate")
workflow.add_edge("delegate", "generate_report")
workflow.add_edge("generate_report", END)
app = workflow.compile()
# --- Main Execution ---
if __name__ == "__main__":
print("Starting Multi-Agent System...")
initial_state = {
"objective": "Restock 1 Pixel 7 phones for the Tokyo office"
}
final_state = app.invoke(initial_state)
print("\n=== FINAL REPORT ===")
print(final_state["final_report"])
المفاهيم الرئيسية
- StateGraph: تحدّد آلة الحالة.
PlanStateهي الحالة المكتوبة التي تتراكم مع معالجة كل عقدة للطلب. - العُقد: هي دوال تأخذ الحالة الحالية وتعرض التعديلات عليها. لكل عقدة مسؤولية واحدة، وهي:
analyze_alertاستخراج النية، وdelegate_to_executorتشغيل الطاقم، وgenerate_reportتلخيص النتيجة. - الحواف: تحدّد التدفق بين العُقد. في هذا الدرس التطبيقي حول الترميز، نستخدم مسارًا خطيًا بسيطًا (
analyze → delegate → report). يوسّع ورشة العمل الكاملة هذا المسار من خلال التوجيه الشرطي، على سبيل المثال، توجيه الطلبات المدمرة إلى مسار أمان بدلاً من المنفّذ.
لماذا نستخدم LangGraph في أداة التخطيط؟ تُعدّ CrewAI رائعة بالنسبة إلى الوكلاء الذين يستدعون الأدوات، ولكن يحتاج المخطِّط إلى تدفّق تحكّم حتمي، أي "إذا كان الإجراء مدمِّرًا، انتقِل إلى مسار الأمان، وإلا فوِّض". يؤدي نموذج آلة الحالة في LangGraph إلى جعل عملية التوجيه هذه واضحة وقابلة للاختبار، بينما تتعامل CrewAI مع تنفيذ الأداة الحرة أدناه.
6. تشغيل "المخطِّط" و"الفريق"
لنختبر الآن أداة التخطيط LangGraph وفريق CrewAI معًا.
في وحدة Cloud Shell الطرفية، شغِّل النص البرمجي:
uv run python scale_agents.py
من المفترض أن تظهر لك نتائج تشير إلى الخطوات التي يتم اتخاذها:
- تنبيه بشأن التحليل: يتم تشغيل عقدة LangGraph.
- التفويض إلى Crew: تنفّذ عقدة LangGraph مكالمة إلى CrewAI.
- تنفيذ CrewAI: سترى "أخصائي تحديد المصادر" يبحث عن المنتج و"مسؤول المشتريات" يتحقّق من الميزانية وينشئ طلب الشراء.
- التقرير النهائي: ستتم طباعة النتيجة الموجزة في النهاية.
مثال على الناتج (مختصَر):
Starting Multi-Agent System...
--- ANALYZING ALERT ---
--- DELEGATING TO CREW ---
Agent: Sourcing Specialist
Tool: search_products Args: {'query': 'Pixel 7'}
Tool Completed Output: [{'product_id': 'pixel-7', 'name': 'Pixel 7 Phone', 'price': 50.0}]
Agent: Procurement Officer
Tool: check_budget Args: {'amount': 50}
Tool: create_purchase_order Args: {'product_id': 'pixel-7', 'quantity': 1}
Tool Completed Output: {'status': 'SUCCESS', 'po_id': 'PO-pixel-7-1', ...}
--- GENERATING REPORT ---
=== FINAL REPORT ===
Objective handled: Restock 1 Pixel 7 phones for the Tokyo office. Result: ...PO-pixel-7-1...SUCCESS...
ملاحظة: قد تظهر لك رسائل [CrewAIEventsBus] Warning: Event pairing mismatch في الناتج. هذه تحذيرات شكلية من ميزة تتبُّع الأحداث الداخلية في CrewAI ويمكن تجاهلها بأمان.
ملاحظة: قد تعرض CrewAI رسالة تفيد بأنّ ميزة التتبُّع غير مفعَّلة. هذه الرسالة إعلامية ويمكن تجاهلها بأمان.
ملاحظة: يبلغ الحدّ الأقصى للميزانية 100 دولار أمريكي في نظام إدارة الطلبات التجريبي. يجب أن تكون الكميات صغيرة (أقل من وحدتَين تقريبًا) لكي تنجح المسار السعيد. على سبيل المثال، يجتاز هاتف Pixel 7 واحد بقيمة 50 دولارًا أمريكيًا عملية التحقّق من الميزانية، ولكن سيتم رفض 3 هواتف بقيمة 150 دولارًا أمريكيًا باعتبارها "تتجاوز الميزانية".
7. تضمين "أداة التخطيط" في خادم A2A
يعمل مخطّط LangGraph، ولكنّه محصور داخل عملية Python. لإتاحة إمكانية استدعائه من قِبل وكلاء آخرين، قد يكونون مكتوبين بأُطر مختلفة أو يعملون على أجهزة مختلفة، نغلّفه في خادم A2A (من وكيل إلى وكيل).
A2A هو بروتوكول يستند إلى JSON-RPC 2.0 ويوحّد طريقة تواصل الوكلاء. المفاهيم الأساسية:
الفكرة | الغرض |
بطاقة الوكيل | بيانات وصفية بتنسيق JSON تصف إمكانات الوكيل (يتم عرضها على |
| طريقة JSON-RPC لإرسال مهمة إلى الوكيل |
المهمة | وحدة عمل لها حالة (تم إرسالها → قيد التنفيذ → مكتملة/تعذّر التنفيذ) |
القطع الأثرية | النتائج الوسيطة والنهائية المرفقة بمهمة |
إنشاء ملف جديد a2a_planner.py:
import asyncio
import os
import uvicorn
from a2a.server.agent_execution import AgentExecutor, RequestContext
from a2a.server.apps import A2AStarletteApplication
from a2a.server.events import EventQueue
from a2a.server.request_handlers import DefaultRequestHandler
from a2a.server.tasks import InMemoryTaskStore, TaskUpdater
from a2a.types import (AgentCapabilities, AgentCard, AgentSkill,
InternalError, Part, TextPart)
from a2a.utils import new_task
from a2a.utils.errors import ServerError
# Import the LangGraph planner from Step 4
from scale_agents import app as planner_app
class PlannerAgentExecutor(AgentExecutor):
"""Wraps the LangGraph planner as an A2A service."""
SUPPORTED_CONTENT_TYPES = ["text", "text/plain"]
async def execute(self, context: RequestContext, event_queue: EventQueue):
objective = context.get_user_input()
# Initialize A2A task tracking
task = context.current_task or new_task(context.message)
await event_queue.enqueue_event(task)
updater = TaskUpdater(event_queue, task.id, task.context_id)
try:
# Run the LangGraph planner synchronously in a thread
initial_state = {"objective": objective}
result = await asyncio.to_thread(planner_app.invoke, initial_state)
final_report = result.get("final_report", "No report generated.")
except Exception as e:
final_report = f"Execution failed: {e}"
# Send the result back as an artifact
await updater.add_artifact(
[Part(root=TextPart(text=final_report))],
name="orchestration_report"
)
await updater.complete()
async def cancel(self, context, event_queue):
raise ServerError(error=InternalError(message="Cancel not supported"))
# Define the Agent Card — this is what other agents see
port = int(os.environ.get("PORT", 8080))
agent_card = AgentCard(
name="Retail-Planner-A2A",
description="LangGraph planner that delegates logistics tasks to a CrewAI crew.",
url=f"http://localhost:{port}/",
version="1.0.0",
default_input_modes=PlannerAgentExecutor.SUPPORTED_CONTENT_TYPES,
default_output_modes=PlannerAgentExecutor.SUPPORTED_CONTENT_TYPES,
capabilities=AgentCapabilities(streaming=False),
skills=[
AgentSkill(
id="plan_logistics",
name="Plan Logistics",
description="Analyzes inventory alerts and orchestrates procurement.",
tags=["logistics", "planning"],
examples=["Restock 1 Pixel 7 phones for the Tokyo office"],
)
],
)
if __name__ == "__main__":
executor = PlannerAgentExecutor()
handler = DefaultRequestHandler(
agent_executor=executor, task_store=InMemoryTaskStore()
)
server = A2AStarletteApplication(
agent_card=agent_card, http_handler=handler
)
print(f"Starting A2A Planner Server on port {port}...")
uvicorn.run(server.build(), host="0.0.0.0", port=port)
المفاهيم الرئيسية
- بطاقة الوكيل: يتم عرضها على
/.well-known/agent-card.json، ويمكن لأي وكيل معرفة وظيفة هذا الخادم من خلال جلب عنوان URL هذا. وهي تسرد مهارات الوكيل وأنواع المحتوى المتوافقة وقدراته. AgentExecutor.execute(): الطريقة الوحيدة التي تنفّذها. يتلقّى الطلب الوارد، ويشغّل منطق الوكيل (هنا، أداة التخطيط LangGraph)، ويرسل النتائج مرة أخرى كعناصر.TaskUpdater: يدير دورة حياة المهمة، حيث يرسلadd_artifact()النتائج الوسيطة أو النهائية، ويضعcomplete()علامة "مكتملة" على المهمة. تتولّى مكتبة A2A جميع عمليات JSON-RPC.
اختبِر خادم A2A من خلال بدء تشغيله في نافذة طرفية:
uv run python a2a_planner.py
افتح علامة تبويب أخرى في Cloud Shell (انقر على + بجانب علامة التبويب الحالية) وتأكَّد من عرض "بطاقة الوكيل":
cd ~/scale-agents
curl http://localhost:8080/.well-known/agent-card.json | python3 -m json.tool
من المفترَض أن يظهر لك ملف JSON الخاص ببطاقة الوكيل. أبقِ خادم A2A قيد التشغيل في نافذة الوحدة الطرفية الأولى لتنفيذ الخطوة التالية.
8. إنشاء "غرفة التحكّم" في ADK
في أعلى الحزمة، تقع غرفة التحكّم، وهي مصمّمة باستخدام حزمة تطوير الوكلاء (ADK) من Google. يتلقّى الطلب من المستخدم، ويفوّضه إلى "المخطِّط" من خلال A2A، ويقيّم النتيجة، والأهم من ذلك أنّه يتعامل مع إعادة التخطيط عند حدوث خطأ (رحلة المستخدم الأساسية 2).
توفّر حزمة تطوير البرامج (ADK) عناصر أساسية للوكيل، مثل BaseAgent وLlmAgent وInMemoryRunner. نستخدم فئة فرعية من BaseAgent لكتابة منطق تنسيق مخصّص، مثل طلبات A2A وتصنيف التقارير وإعادة التخطيط الديناميكي باستخدام وكيل فرعي LlmAgent.
إنشاء ملف جديد control_room.py:
import asyncio
import uuid
import os
import httpx
from google.adk.agents import BaseAgent, LlmAgent, RunConfig
from google.adk.agents.invocation_context import InvocationContext
from google.adk.events import Event
from google.adk.runners import InMemoryRunner
from google.genai import types
from typing import AsyncGenerator
A2A_SERVER_URL = os.environ.get("PLANNER_AGENT_URL", "http://127.0.0.1:8080")
def _classify_report(report: str) -> tuple[bool, bool]:
"""Return (is_success, should_retry) for a planner report."""
normalized = (report or "").replace("*", "").strip().lower()
success_markers = [
"status: success", "'status': 'success'",
"outcome: success", "po_id", "successfully ordered",
]
retryable_markers = ["not found", "discontinued", "no inventory",
"unknown item"]
terminal_markers = [
"status: failed", "over budget", "not issued",
"procurement failed",
]
if any(m in normalized for m in terminal_markers):
return False, False # Failed, don't retry
if any(m in normalized for m in retryable_markers):
return False, True # Failed, but retryable
if any(m in normalized for m in success_markers):
return True, False # Success!
return False, False # Unknown → treat as failure
class ControlRoomAgent(BaseAgent):
"""Top-level orchestrator: delegates via A2A, re-plans on failure."""
async def _run_async_impl(
self, ctx: InvocationContext
) -> AsyncGenerator[Event, None]:
# Extract user input from session events
user_msg = ""
if ctx.session and ctx.session.events:
for evt in reversed(ctx.session.events):
if evt.content and evt.content.role == "user":
user_msg = evt.content.parts[0].text
break
max_attempts = 2
current_objective = user_msg
final_report = "No report returned."
for attempt in range(1, max_attempts + 1):
print(f"\n--- Attempt {attempt}: Calling A2A Planner ---")
print(f" Objective: {current_objective}")
# Build the A2A JSON-RPC request
payload = {
"jsonrpc": "2.0",
"id": f"req-{attempt}",
"method": "message/send",
"params": {
"message": {
"message_id": str(uuid.uuid4()),
"parts": [{"text": current_objective}],
"role": "user"
}
}
}
try:
async with httpx.AsyncClient(timeout=300.0) as client:
resp = await client.post(
f"{A2A_SERVER_URL}/", json=payload
)
data = resp.json()
if "error" in data:
final_report = data["error"].get("message", "Unknown A2A error")
elif "result" in data:
artifacts = data["result"].get("artifacts", [])
if artifacts and "parts" in artifacts[-1]:
parts = artifacts[-1]["parts"]
if parts and "text" in parts[0]:
final_report = parts[0]["text"]
except Exception as e:
final_report = f"Connection error: {e}"
print(f"\n--- Report ---\n{final_report}\n")
is_success, should_retry = _classify_report(final_report)
if is_success:
yield Event(
author=self.name,
invocation_id=ctx.invocation_id,
content=types.Content(
role="model",
parts=[types.Part.from_text(text=final_report)]
),
)
return
# --- Re-planning (CUJ 2) ---
if should_retry and attempt < max_attempts:
print("--- Re-planning with LLM ---")
replanner = LlmAgent(
name=f"replanner_{attempt}",
model="gemini-2.5-flash",
instruction=(
"You are a strategic re-planner. The previous request "
"failed. Rewrite the objective to be broader or more "
"likely to succeed. Output ONLY the new objective text."
),
)
# Run the re-planner as a sub-agent
child_ctx = InvocationContext(
invocation_id=f"{ctx.invocation_id}_replan_{attempt}",
agent=replanner,
session=ctx.session,
session_service=ctx.session_service,
run_config=ctx.run_config or RunConfig(),
)
child_ctx.user_content = types.Content(
role="user",
parts=[types.Part.from_text(text=(
f"Original Objective: {current_objective}\n"
f"Failure Reason: {final_report}\n"
"Please broaden the search."
))]
)
new_objective = ""
async for event in replanner.run_async(child_ctx):
if event.content and event.content.parts:
for part in event.content.parts:
if hasattr(part, "text") and part.text:
new_objective += part.text
current_objective = new_objective.strip()
print(f"New objective: {current_objective}")
continue
# Terminal failure (not retryable)
if not should_retry:
yield Event(
author=self.name,
invocation_id=ctx.invocation_id,
content=types.Content(
role="model",
parts=[types.Part.from_text(text=f"FAILED: {final_report}")]
),
)
return
# Max attempts exhausted
yield Event(
author=self.name,
invocation_id=ctx.invocation_id,
content=types.Content(
role="model",
parts=[types.Part.from_text(
text=f"FAILED after {max_attempts} attempts: {final_report}"
)]
),
)
async def main():
prompt = "Restock 1 Pixel 7 phones for the Tokyo office"
print(f"Starting Control Room with: {prompt}\n")
agent = ControlRoomAgent(name="control_room")
runner = InMemoryRunner(app_name="control_room", agent=agent)
session = await runner.session_service.create_session(
app_name="control_room", user_id="admin"
)
content = types.Content(
role="user", parts=[types.Part.from_text(text=prompt)]
)
async for event in runner.run_async(
user_id="admin", session_id=session.id, new_message=content
):
if event.content and event.content.parts:
for part in event.content.parts:
if hasattr(part, "text") and part.text:
print(f"Result: {part.text}")
print("\n=== CONTROL ROOM COMPLETE ===")
if __name__ == "__main__":
asyncio.run(main())
المفاهيم الرئيسية
-
BaseAgent: عنصر ADK الأساسي للوكلاء المخصّصين. يمكنك إنشاء فئة فرعية منه وتجاوز_run_async_implلكتابة منطق تنسيق غير محدود وغير متزامن، مثل حلقة الاتصال من تطبيق إلى تطبيق + التصنيف + إعادة التخطيط. - طلب A2A JSON-RPC: يرسل "مركز التحكّم" طلب
message/sendعاديًا إلى خادم A2A الخاص بالمخطِّط باستخدامhttpxويحلّل استجابة JSON-RPC لاستخراج التقرير النهائي. _classify_report(): تصنيف بسيط يستند إلى الكلمات الرئيسية ويحدّد ما إذا كان التقرير يشير إلى نجاح أو تعذُّر قابل لإعادة المحاولة أو تعذُّر نهائي. يؤدي ذلك إلى تشغيل حلقة إعادة التخطيط.- استدعاء الوكيل الفرعي: لإعادة التخطيط، تنشئ "غرفة التحكّم"
LlmAgentوتنفّذها من خلال إنشاءInvocationContextفرعي واستدعاءreplanner.run_async(child_ctx). يتيح لك ذلك إنشاء وكلاء نماذج لغوية كبيرة بشكل ديناميكي داخل منطق التنسيق المخصّص. InMemoryRunner: تشغيل الوكيل محليًا باستخدام مخزن جلسات داخل الذاكرة في مرحلة الإنتاج، يمكنك استخدامadk deployللتوزيع على Vertex AI Agent Engine.
9- تشغيل الحزمة الكاملة
لنختبر الآن النظام الكامل المكوّن من ثلاث طبقات: "غرفة التحكم في حزمة تطوير التطبيقات" → A2A → "مخطط LangGraph" → "فريق CrewAI".
استخدِم علامة التبويب الثانية في Cloud Shell التي فتحتها سابقًا (أو انقر على + لفتح علامة تبويب جديدة) وشغِّل "غرفة التحكّم". ملاحظة مهمة: تحتوي كل علامة تبويب في Cloud Shell على جلسة shell خاصة بها. يجب ضبط متغيرات المشروع والبيئة مرة أخرى:
cd ~/scale-agents
export GOOGLE_CLOUD_PROJECT=$(gcloud config get-value project)
export GOOGLE_CLOUD_LOCATION=us-central1
export GOOGLE_GENAI_USE_VERTEXAI=TRUE
uv run python control_room.py
من المفترض أن يظهر لك مسار التنسيق الكامل:
- تتلقّى غرفة التحكم الطلب وترسل طلب
message/sendJSON-RPC إلى خادم A2A - يتلقّى خادم A2A الطلب ويستدعي مخطّط LangGraph
- يحلّل مخطِّط LangGraph الطلب ويفوّضه إلى فريق CrewAI.
- يدير فريق CrewAI وكلاء التوريد والشراء
- تنتقل النتيجة إلى "غرفة التحكّم".
رحلات المستخدم الأساسية (CUJ)
جرِّب تعديل السلسلة prompt في control_room.py لتجربة السيناريوهات التالية:
رحلة المستخدم الأساسية (CUJ) | الطلب | ما يحدث |
1. المسار الخالي من المشاكل |
| البحث -> التحقّق من الميزانية -> طلب الشراء (ناجح) تعمل بشكل متكامل. |
2. إعادة التخطيط |
| يعرض المخطِّط رسالة الخطأ "تعذّر التنفيذ: عنصر غير معروف". ترصد "غرفة التحكّم" ذلك وتستدعي أداة إعادة تخطيط |
لاختبار رحلة المستخدم الأساسية 2 (إعادة التخطيط)، غيِّر prompt في control_room.py إلى:
prompt = "Order 1 unit of the discontinued XR-7000 Quantum Holographic Display"
لن يتعرّف المخطِّط المبرمَج على هذا العنصر وسيعرض الرسالة "تعذّر التنفيذ: عنصر غير معروف". سيرصد "غرفة التحكّم" هذا الخطأ، وسينشئ ديناميكيًا أداة LlmAgent لإعادة التخطيط، ثم سيعيد المحاولة باستخدام هدف أوسع. بما أنّ "المخطِّط" لا يتعرّف إلا على "Pixel 7"، ستتعذّر إعادة المحاولة أيضًا، ولكن سيظهر لك مسار إعادة التخطيط الكامل أثناء التنفيذ. سيكون الناتج النهائي FAILED after 2 attempts: ....
10. تَنظيم
لتجنُّب فرض رسوم مستمرة على حسابك على Google Cloud، يمكنك حذف الموارد التي تم إنشاؤها أثناء هذا الدرس العملي. يمكنك ببساطة إزالة الدليل الذي أنشأته:
cd ..
rm -rf scale-agents
11. تهانينا
تهانينا! لقد أنشأت بنجاح نظامًا لتنسيق الوكلاء المتعدّدين باستخدام CrewAI وLangGraph وA2A Protocol وADK.
ما تعلّمته
- كيفية تحديد أدوات للوكلاء باستخدام أداة
@toolفي CrewAI - كيفية إنشاء وكلاء متخصصين بأدوار وأدوات وأهداف مختلفة
- كيفية ربط العملاء التسلسليين بفريق تسلسلي مع تبعيات المهام
- كيفية إنشاء مخطّط آلة الحالة باستخدام LangGraph الذي يفوّض إلى الفريق
- كيفية عرض أداة التخطيط كخدمة A2A باستخدام
AgentCardوAgentExecutor - كيفية إنشاء
BaseAgentمخصّص من ADK يفوّض المهام من خلال A2A ويعيد التخطيط عند حدوث خطأ من خلال استدعاء وكيل فرعيLlmAgent - لماذا يمنحك فصل التخطيط والتنفيذ والتنسيق بين الأُطر المرونة والقدرة على التكيّف؟
مزيد من المعلومات
توسّع ورشة العمل الكاملة هذا النظام من خلال ما يلي:
- لوحة البيانات في الوقت الفعلي: بث SSE لعرض تقدّم عدة وكلاء
- Identity Shield: أمان مستند إلى إدارة الهوية وإمكانية الوصول (IAM) يحظر الإجراءات المدمرة على مستوى البنية الأساسية، وليس على مستوى الطلب
- Vertex AI Agent Engine: يمكنك نشر وكيل ADK على بنية أساسية مُدارة على السحابة الإلكترونية باستخدام
adk deploy