1. מבוא
בשיעור Codelab הזה נסביר איך ליצור מערכת לתזמור של כמה סוכנים באמצעות CrewAI, LangGraph, פרוטוקול A2A ו-ADK (ערכה לפיתוח סוכנים). תצרו מערכת שבה חדר בקרה של ADK מעביר את תכנון המשימות למתכנן LangGraph, ששולח משימות לצוות ביצוע של CrewAI – והכול מחובר באמצעות A2A – כדי לטפל בתרחיש של מילוי מחדש של מלאי קמעונאי.
מהי תזמורת של כמה סוכנים?
במערכת מרובת סוכנים, כמה סוכני AI מתמחים משתפים פעולה כדי לבצע משימות שהן מורכבות מדי לסוכן יחיד. במקום סוכן מונוליטי אחד שעושה הכול, אתם מפרקים את הבעיה לתפקידים – מתכנן ומבצע – שלכל אחד מהם יש כלים ומומחיות משלו.
זה דומה לאופן שבו ארגונים אנושיים פועלים: מנהל מאציל את האסטרטגיה לאנליסטים ואת הביצוע למומחים. היתרונות כוללים:
- הפרדה בין תחומים: כל סוכן מתמקד במה שהוא עושה הכי טוב
- גמישות המסגרת: שימוש במסגרת הטובה ביותר לכל תפקיד (LangGraph ללוגיקת תכנון, CrewAI להפעלת כלים)
- יכולת הרחבה: אפשר להוסיף סוכנים מיוחדים בלי לשנות את כל המערכת
התרחיש
כשמשתמש שולח בקשה לחידוש מלאי כמו "Restock 1 Pixel 7 phone for the Tokyo office", המערכת:
- LangGraph Planner מנתח את הבקשה ושולף את הפריט והכמות
- המתכנן מעביר את ההפעלה אל צוות ההפעלה של CrewAI
- סוכן Sourcing Specialist מחפש בקטלוג המוצרים באמצעות כלים
- סוכן קצין רכש מאמת את התקציב ומבצע הזמנת רכש באמצעות כלים
- התוצאה מועברת חזרה לכלי לתכנון, שיוצר דוח סופי
User Request
│
▼
┌──────────────────────┐
│ ADK Control Room │ ← Top-level orchestrator, re-plans on failure
│ (BaseAgent) │
└──────────┬───────────┘
│ A2A (JSON-RPC)
▼
┌──────────────────────┐
│ LangGraph Planner │ ← Analyzes intent, delegates, reports
│ (State Machine) │
└──────────┬───────────┘
│
▼
┌──────────────────────┐
│ CrewAI Execution Crew│ ← Runs agents with tools
│ ├─ Sourcing Agent │ → search_products
│ └─ Procurement Agent│ → check_budget, create_purchase_order
└──────────────────────┘
סטאק תוכנות
שכבה | טכנולוגיה | תפקיד |
תכנון | LangGraph | מכונת מצבים שמנתחת כוונות, מנתבת בקשות ומפיקה דוחות |
הרצה | CrewAI | סוכנים מבוססי-תפקיד שקוראים לכלים באופן עקבי |
LLM | Gemini ב-Vertex AI | המודל הזה משפר את החשיבה הרציונלית של הסוכן ואת בחירת הכלים שלו |
תקשורת בין נציגים | פרוטוקול A2A | גשר JSON-RPC 2.0 כדי שסוכנים ממסגרות שונות יוכלו לתקשר |
Top-level orchestrator | ADK (BaseAgent) | מקבל בקשות, מעביר אותן באמצעות A2A, מתכנן מחדש במקרה של כשל |
כך זה נראה בפועל: אם המערכת זמינה, אפשר לנסות את מערכת הייצור המלאה בכתובת https://scale-control-room-761793285222.us-central1.run.app. המערכת הזו מרחיבה את מה שתבנו כאן באמצעות לוח בקרה בזמן אמת, פרוטוקול A2A ואבטחת IAM.
הפעולות שתבצעו:
- הגדרת כלים מותאמים אישית שהסוכנים יוכלו להשתמש בהם.
- יצירת סוכנים מיוחדים באמצעות CrewAI.
- יצירת מתכנן של מכונת מצבים באמצעות LangGraph.
- לנהל את התהליך בין צוות התכנון לצוות הביצוע.
- כדי לאפשר תקשורת בין מסגרות שונות, עוטפים את המתכנן בשרת פרוטוקול A2A.
- בונים ADK Control Room ברמה העליונה שמקצה משימות באמצעות A2A ומתכנן מחדש במקרה של כשל.
הדרישות
- דפדפן אינטרנט כמו Chrome
- פרויקט ב-Google Cloud שהחיוב בו מופעל
ה-Codelab הזה מיועד למפתחים ברמת ביניים שמכירים את Python ואת המושגים הבסיסיים של מודלים גדולים של שפה (LLM).
משך הזמן המשוער: 35 דקות.
הערכת עלות: העלות של המשאבים שייווצרו ב-codelab הזה צריכה להיות נמוכה מ-1$.
2. לפני שמתחילים
יצירת פרויקט ב-Google Cloud
- במסוף Google Cloud, בדף לבחירת הפרויקט, בוחרים פרויקט ב-Google Cloud או יוצרים פרויקט.
- הקפידו לוודא שהחיוב מופעל בפרויקט שלכם ב-Cloud. כך בודקים אם החיוב מופעל בפרויקט
הפעלת Cloud Shell
Cloud Shell היא סביבת שורת פקודה שפועלת ב-Google Cloud וכוללת מראש את הכלים הנדרשים.
- לוחצים על Activate Cloud Shell בחלק העליון של מסוף Google Cloud.
- אחרי שמתחברים ל-Cloud Shell, מאמתים את האימות:
gcloud auth list - מוודאים שהפרויקט מוגדר:
gcloud config get project - אם הפרויקט לא מוגדר כמו שציפיתם, מגדירים אותו:
export PROJECT_ID=<YOUR_PROJECT_ID> gcloud config set project $PROJECT_ID
הפעלת ממשקי ה-API
מריצים את הפקודה הבאה כדי להפעיל את Vertex AI API:
gcloud services enable aiplatform.googleapis.com
הערה: Cloud Shell מבצע אימות אוטומטי באמצעות חשבון Google Cloud שלכם. אם אתם מריצים את ה-codelab הזה מחוץ ל-Cloud Shell, תצטרכו להריץ את הפקודה gcloud auth application-default login כדי לבצע אימות ב-Vertex AI.
הגדרת הסביבה
ב-Cloud Shell, יוצרים ספרייה חדשה לפרויקט ועוברים אליה:
mkdir scale-agents
cd scale-agents
מתקינים את uv ומשתמשים בו כדי להתקין את החבילות הנדרשות:
curl -LsSf https://astral.sh/uv/install.sh | sh
export PATH="$HOME/.local/bin:$PATH"
uv init --no-workspace --no-readme
rm main.py
sed -i 's/requires-python = ">=3.12"/requires-python = ">=3.12,<3.14"/' pyproject.toml
uv add crewai 'litellm[google]' langgraph 'a2a-sdk>=0.3.25,<0.4' httpx uvicorn 'google-adk>=1.0.0' --prerelease=allow
מגדירים את משתני הסביבה של Vertex AI:
export GOOGLE_CLOUD_PROJECT=$(gcloud config get-value project)
export GOOGLE_CLOUD_LOCATION=us-central1
export GOOGLE_GENAI_USE_VERTEXAI=TRUE
3. הגדרת כלים וסוכנים
במערכת עם כמה סוכנים, הסוכנים צריכים כלים כדי לקיים אינטראקציה עם העולם, ותפקידים ספציפיים כדי לדעת מה לעשות.
יוצרים קובץ בשם scale_agents.py ומוסיפים את הקוד הבא. הפעולה הזו מגדירה את הייבוא, את כלי הדמה ואת סוכני CrewAI.
import os
from typing import Optional
from crewai import Agent, Crew, Process, Task, LLM
from crewai.tools import tool
from langgraph.graph import StateGraph, END
from typing_extensions import TypedDict
# CrewAI requires this env var even when using Vertex AI
os.environ["OPENAI_API_KEY"] = "NA"
# Set the project ID for Vertex AI
os.environ["VERTEXAI_PROJECT"] = os.getenv("GOOGLE_CLOUD_PROJECT", "")
os.environ["VERTEXAI_LOCATION"] = "us-central1"
# Initialize the LLM to use Vertex AI
llm = LLM(
model="vertex_ai/gemini-2.5-flash",
temperature=0.0,
max_tokens=4096,
)
# --- Step 1: Define Tools ---
@tool("search_products")
def search_products(query: str) -> list:
"""Search for products in the catalog."""
# Mock product catalog
products = [
{"product_id": "pixel-7", "name": "Pixel 7 Phone", "price": 50.0},
{"product_id": "pixel-8", "name": "Pixel 8 Phone", "price": 80.0},
]
return [p for p in products if query.lower() in p["name"].lower()]
@tool("check_budget")
def check_budget(amount: float) -> dict:
"""Check if a purchase amount is within the budget."""
limit = 100.0
if amount <= limit:
return {"approved": True, "remaining": limit - amount}
return {"approved": False, "reason": f"Exceeds budget of ${limit}"}
@tool("create_purchase_order")
def create_purchase_order(product_id: str, quantity: int) -> dict:
"""Create a purchase order for a product."""
return {
"status": "SUCCESS",
"po_id": f"PO-{product_id}-{quantity}",
"message": f"Successfully ordered {quantity} units of {product_id}."
}
# --- Step 2: Define Agents ---
sourcing_agent = Agent(
role="Sourcing Specialist",
goal="Find the best available products that match the intent of the request. You MUST use the search_products tool to look up products -- never make up product data.",
backstory="You are a veteran procurement specialist with an eye for detail. You always search the catalog before recommending a product.",
tools=[search_products],
llm=llm,
verbose=True,
allow_delegation=False,
memory=False,
reasoning=False,
)
procurement_agent = Agent(
role="Procurement Officer",
goal="Validate the purchase against budget constraints and execute the order. You MUST use the check_budget tool before ordering and the create_purchase_order tool to place the order.",
backstory="You are the gatekeeper of the budget. You always verify budget before placing any order.",
tools=[check_budget, create_purchase_order],
llm=llm,
verbose=True,
allow_delegation=False,
memory=False,
reasoning=False,
)
מושגים מרכזיים
-
@tooldecorator: CrewAI משתמשת ב-decorator הזה כדי להפוך פונקציות רגילות של Python לכלים ש-LLM יכולים להבין ולקרוא להם. ההערות על סוגי הנתונים והמחרוזת של התיעוד של הפונקציה משמשות ליצירת סכימת כלי שה-LLM יכול להבין. - תפקיד, מטרה וסיפור רקע: ההגדרות האלה מגדירות את הפרסונה של הסוכן ומנחות את ה-LLM שלו בתהליך החשיבה. הרקע הוא לא רק טקסט שמוסיף אווירה – ההנחיה "תמיד תחפש בקטלוג" מעודדת את הסוכן להשתמש בכלים שלו במקום להמציא תשובות.
-
reasoning=False: משבית את החשיבה הרציונלית המורחבת, כך שהסוכן פועל לפי לולאת קריאת הכלים הרגילה במקום לנסות לענות ישירות. -
allow_delegation=False: כל סוכן נשאר ממוקד בכלים שהוקצו לו, במקום להעביר עבודה לסוכנים אחרים.
למה שני נציגים במקום אחד? לכל סוכן יש כלים שונים ותפקיד שונה. המומחה לאיתור מקורות מחפש רק מוצרים, והקצין לרכש מטפל רק בתקציבים ובהזמנות. הפרדה בין תחומים כזו מאפשרת לכל סוכן לקבל הנחיה ממוקדת וערכת כלים קטנה ורלוונטית, וכך מתקבלת התנהגות אמינה יותר של מודל שפה גדול בהשוואה למצב שבו סוכן יחיד מטפל בכל המשימות.
4. הגדרת המשימות והצוות
עכשיו נגדיר מה הסוכנים האלה צריכים לעשות על ידי יצירת משימות וחיבור שלהן לצוות.
מוסיפים את הקוד הבא בסוף של אותו קובץ scale_agents.py:
# --- Step 3: Define Tasks & Crew ---
sourcing_task = Task(
description="Use the search_products tool to find products matching: '{item_description}'. Return the product_id and price of the best match from the tool results.",
expected_output="The product_id and price of the best matching product from the search_products tool.",
agent=sourcing_agent
)
procurement_task = Task(
description="First, use the check_budget tool to verify the total cost for {quantity} units. Then use the create_purchase_order tool with the product_id and quantity to place the order.",
expected_output="The purchase order details returned by the create_purchase_order tool.",
agent=procurement_agent,
context=[sourcing_task] # This task depends on the output of sourcing_task
)
def run_crew(item_description: str, quantity: int):
crew = Crew(
agents=[sourcing_agent, procurement_agent],
tasks=[sourcing_task, procurement_task],
process=Process.sequential, # Run tasks in order
verbose=True,
memory=False,
planning=False,
)
result = crew.kickoff(inputs={
"item_description": item_description,
"quantity": quantity
})
return result
מושגים מרכזיים
- הקשר המשימה:
context=[sourcing_task]אומר ל-CrewAI שהמשימה של הרכש צריכה את הפלט של משימת המקור כדי להמשיך. קצין הרכש יכול לראות מה מצא מומחה המקורות לפני שהוא מחליט מה להזמין. - Process.sequential: המשימות מבוצעות לפי הסדר שבו הן מופיעות. זה חשוב כי משימת הרכש תלויה בתוצאות של משימת המקור – אי אפשר לבצע הזמנה לפני שיודעים איזה מוצר לקנות.
-
memory=False/planning=False: השבתה של תכונות הזיכרון והתכנון המובנות של CrewAI, כדי שההפעלה תהיה פשוטה וצפויה בהדגמה הזו.
5. יצירת כלי לתכנון LangGraph
צוות הביצוע אחראי על ה'איך' – חיפוש מוצרים, בדיקת תקציבים, ביצוע הזמנות. אבל מי מחליט מה צריך לקרות? זהו הסוכן לתכנון, שנבנה באמצעות LangGraph.
מודלים של LangGraph מציגים תהליכי עבודה כמכונת מצבים – גרף של צמתים (פונקציות) שמחוברים על ידי קצוות (מעברים). הסטטוס זורם בתרשים, וכל צומת קורא מהסטטוס המשותף וכותב אליו. התכונה הזו מתאימה במיוחד לתכנון תהליכי עבודה שבהם נדרש זרימת בקרה ברורה ודטרמיניסטית: ניתוח הבקשה, העברה לצוות ויצירת דוח.
מוסיפים את הקוד הבא בסוף של אותו קובץ scale_agents.py:
# --- Step 4: Define LangGraph Planner ---
class PlanState(TypedDict):
objective: str
item_description: Optional[str]
quantity_needed: Optional[int]
execution_result: Optional[str]
final_report: Optional[str]
def analyze_alert(state: PlanState) -> PlanState:
"""Node 1: Extract intent from the raw objective string."""
print("--- ANALYZING ALERT ---")
# In a production app, you would use an LLM here to extract details.
# For simplicity, we simulate extraction here.
objective = state["objective"]
# Hardcoded extraction for the demo
if "Pixel 7" in objective:
return {
"item_description": "Pixel 7",
"quantity_needed": 1,
}
return {
"item_description": "unknown",
"quantity_needed": 0,
}
def delegate_to_executor(state: PlanState) -> PlanState:
"""Node 2: Call the CrewAI Execution Crew."""
print("--- DELEGATING TO CREW ---")
if state["item_description"] == "unknown":
return {"execution_result": "Failed: Unknown item"}
result = run_crew(
item_description=state["item_description"],
quantity=state["quantity_needed"]
)
return {"execution_result": str(result)}
def generate_report(state: PlanState) -> PlanState:
"""Node 3: Synthesize the final outcome."""
print("--- GENERATING REPORT ---")
return {
"final_report": f"Objective handled: {state['objective']}. Result: {state['execution_result']}"
}
# Build the graph
workflow = StateGraph(PlanState)
workflow.add_node("analyze_alert", analyze_alert)
workflow.add_node("delegate", delegate_to_executor)
workflow.add_node("generate_report", generate_report)
workflow.set_entry_point("analyze_alert")
workflow.add_edge("analyze_alert", "delegate")
workflow.add_edge("delegate", "generate_report")
workflow.add_edge("generate_report", END)
app = workflow.compile()
# --- Main Execution ---
if __name__ == "__main__":
print("Starting Multi-Agent System...")
initial_state = {
"objective": "Restock 1 Pixel 7 phones for the Tokyo office"
}
final_state = app.invoke(initial_state)
print("\n=== FINAL REPORT ===")
print(final_state["final_report"])
מושגים מרכזיים
- StateGraph: הגדרה של מכונת המצבים.
PlanStateהוא המצב המוקלד שמצטבר כשכל צומת מעבד את הבקשה. - צמתים: פונקציות שמקבלות את המצב הנוכחי ומחזירות עדכונים שלו. לכל צומת יש אחריות אחת –
analyze_alertחילוץ כוונות,delegate_to_executorהפעלת הצוות,generate_reportסיכום התוצאה. - קצוות: מגדירים את הזרימה בין הצמתים. ב-Codelab הזה אנחנו משתמשים בתהליך ליניארי פשוט (
analyze → delegate → report). בסדנה המלאה אנחנו מרחיבים את התהליך הזה עם ניתוב מותנה – לדוגמה, ניתוב בקשות הרסניות לנתיב אבטחה במקום למבצע.
למה כדאי להשתמש ב-LangGraph לתכנון? CrewAI מצוין לסוכנים שקוראים לכלים, אבל המתכנן צריך זרימת בקרה דטרמיניסטית – "אם הפעולה הרסנית, צריך לעבור לנתיב האבטחה; אחרת, צריך להקצות את הפעולה לסוכן אחר". מודל מכונת המצבים של LangGraph הופך את הניתוב הזה למפורש ולניתן לבדיקה, בעוד ש-CrewAI מטפל בהפעלת כלי הטופס החופשי שלמטה.
6. רוצו על זה עם ג'ייק והחברים
עכשיו נבדוק את מתכנן LangGraph ואת צוות CrewAI ביחד.
בטרמינל של Cloud Shell, מריצים את הסקריפט:
uv run python scale_agents.py
הפלט אמור להראות את השלבים שבוצעו:
- ניתוח ההתראה: הצומת LangGraph פועל.
- העברה לצוות: הצומת LangGraph קורא לצוות CrewAI.
- ביצוע ב-CrewAI: תוכלו לראות את מומחה המקורות מחפש את המוצר ואת קצין הרכש בודק את התקציב ויוצר את הזמנת הרכש.
- דוח סופי: התוצאה המסוכמת תודפס בסוף.
פלט לדוגמה (מקוצר):
Starting Multi-Agent System...
--- ANALYZING ALERT ---
--- DELEGATING TO CREW ---
Agent: Sourcing Specialist
Tool: search_products Args: {'query': 'Pixel 7'}
Tool Completed Output: [{'product_id': 'pixel-7', 'name': 'Pixel 7 Phone', 'price': 50.0}]
Agent: Procurement Officer
Tool: check_budget Args: {'amount': 50}
Tool: create_purchase_order Args: {'product_id': 'pixel-7', 'quantity': 1}
Tool Completed Output: {'status': 'SUCCESS', 'po_id': 'PO-pixel-7-1', ...}
--- GENERATING REPORT ---
=== FINAL REPORT ===
Objective handled: Restock 1 Pixel 7 phones for the Tokyo office. Result: ...PO-pixel-7-1...SUCCESS...
הערה: יכול להיות שתראו [CrewAIEventsBus] Warning: Event pairing mismatch הודעות בפלט. אלה אזהרות קוסמטיות ממעקב אחר אירועים הפנימי של CrewAI, ואפשר להתעלם מהן בבטחה.
הערה: יכול להיות ש-CrewAI יציג הודעה על כך שהמעקב מושבת. זוהי הודעה אינפורמטיבית שאפשר להתעלם ממנה.
הערה: ב-OMS המדומה יש מגבלת תקציב של 100$. כדי שהתרחיש הנפוץ יצליח, חשוב לשמור על כמויות קטנות (מתחת ל-2 יחידות). לדוגמה, טלפון Pixel 7 אחד במחיר 50 $יעבור את בדיקת התקציב, אבל 3 יחידות במחיר 150 $יידחו עם ההערה 'חריגה מהתקציב'.
7. עטיפת הכלי לתכנון היקף החשיפה בשרת A2A
המתכנן של LangGraph פועל, אבל הוא כלוא בתהליך Python. כדי שסוכנים אחרים יוכלו להפעיל אותו – יכול להיות שהם נכתבו במסגרות שונות או פועלים במכונות שונות – אנחנו עוטפים אותו בשרת A2A (סוכן לסוכן).
A2A הוא פרוטוקול שמבוסס על JSON-RPC 2.0 ומגדיר תקן לתקשורת בין סוכנים. מושגים מרכזיים:
קונספט | מטרה |
כרטיס סוכן | מטא-נתונים בפורמט JSON שמתארים את היכולות של הסוכן (מוצגים בכתובת |
| שיטת JSON-RPC לשליחת משימה לסוכן |
משימה | יחידת עבודה עם מצב (הוגשה ← בעבודה ← הושלמה/נכשלה) |
Artifacts | תוצרי ביניים ותוצרים סופיים שמצורפים למשימה |
יוצרים קובץ חדש a2a_planner.py:
import asyncio
import os
import uvicorn
from a2a.server.agent_execution import AgentExecutor, RequestContext
from a2a.server.apps import A2AStarletteApplication
from a2a.server.events import EventQueue
from a2a.server.request_handlers import DefaultRequestHandler
from a2a.server.tasks import InMemoryTaskStore, TaskUpdater
from a2a.types import (AgentCapabilities, AgentCard, AgentSkill,
InternalError, Part, TextPart)
from a2a.utils import new_task
from a2a.utils.errors import ServerError
# Import the LangGraph planner from Step 4
from scale_agents import app as planner_app
class PlannerAgentExecutor(AgentExecutor):
"""Wraps the LangGraph planner as an A2A service."""
SUPPORTED_CONTENT_TYPES = ["text", "text/plain"]
async def execute(self, context: RequestContext, event_queue: EventQueue):
objective = context.get_user_input()
# Initialize A2A task tracking
task = context.current_task or new_task(context.message)
await event_queue.enqueue_event(task)
updater = TaskUpdater(event_queue, task.id, task.context_id)
try:
# Run the LangGraph planner synchronously in a thread
initial_state = {"objective": objective}
result = await asyncio.to_thread(planner_app.invoke, initial_state)
final_report = result.get("final_report", "No report generated.")
except Exception as e:
final_report = f"Execution failed: {e}"
# Send the result back as an artifact
await updater.add_artifact(
[Part(root=TextPart(text=final_report))],
name="orchestration_report"
)
await updater.complete()
async def cancel(self, context, event_queue):
raise ServerError(error=InternalError(message="Cancel not supported"))
# Define the Agent Card — this is what other agents see
port = int(os.environ.get("PORT", 8080))
agent_card = AgentCard(
name="Retail-Planner-A2A",
description="LangGraph planner that delegates logistics tasks to a CrewAI crew.",
url=f"http://localhost:{port}/",
version="1.0.0",
default_input_modes=PlannerAgentExecutor.SUPPORTED_CONTENT_TYPES,
default_output_modes=PlannerAgentExecutor.SUPPORTED_CONTENT_TYPES,
capabilities=AgentCapabilities(streaming=False),
skills=[
AgentSkill(
id="plan_logistics",
name="Plan Logistics",
description="Analyzes inventory alerts and orchestrates procurement.",
tags=["logistics", "planning"],
examples=["Restock 1 Pixel 7 phones for the Tokyo office"],
)
],
)
if __name__ == "__main__":
executor = PlannerAgentExecutor()
handler = DefaultRequestHandler(
agent_executor=executor, task_store=InMemoryTaskStore()
)
server = A2AStarletteApplication(
agent_card=agent_card, http_handler=handler
)
print(f"Starting A2A Planner Server on port {port}...")
uvicorn.run(server.build(), host="0.0.0.0", port=port)
מושגים מרכזיים
- כרטיס הסוכן: מוגש בכתובת
/.well-known/agent-card.json– כל סוכן יכול לגלות מה השרת הזה עושה על ידי אחזור כתובת ה-URL הזו. הוא כולל את הכישורים, סוגי התוכן הנתמכים והיכולות של הסוכן. -
AgentExecutor.execute(): השיטה היחידה שאתם מטמיעים. הוא מקבל את הבקשה הנכנסת, מריץ את הלוגיקה של הסוכן (במקרה הזה, המתכנן של LangGraph) ושולח את התוצאות בחזרה כארטיפקטים. -
TaskUpdater: ניהול מחזור החיים של המשימה –add_artifact()שולח פלט ביניים או פלט סופי,complete()מסמן את המשימה כהושלמה. ספריית A2A מטפלת בכל הצינורות של JSON-RPC.
כדי לבדוק את שרת ה-A2A, מפעילים אותו במסוף:
uv run python a2a_planner.py
פותחים כרטיסייה נוספת ב-Cloud Shell (לוחצים על + לצד הכרטיסייה הנוכחית) ומוודאים שהכרטיס של הסוכן מוצג:
cd ~/scale-agents
curl http://localhost:8080/.well-known/agent-card.json | python3 -m json.tool
אמור להופיע קובץ JSON של כרטיס הסוכן. משאירים את שרת A2A פועל במסוף הראשון לשלב הבא.
8. יצירת חדר הבקרה של ADK
בראש ה-Stack נמצא Control Room, שנבנה באמצעות ADK (הערכה לפיתוח סוכנים של Google). הוא מקבל את הבקשה של המשתמש, מעביר אותה לכלי לתכנון באמצעות A2A, מעריך את התוצאה, וחשוב מכך – מטפל בתכנון מחדש במקרה של כשל (CUJ 2).
ADK מספק פרימיטיבים של סוכנים כמו BaseAgent, LlmAgent ו-InMemoryRunner. אנחנו יוצרים מחלקת משנה של BaseAgent כדי לכתוב לוגיקת תזמור מותאמת אישית – קריאות A2A, סיווג דוחות ותכנון מחדש דינמי באמצעות סוכן משנה של LlmAgent.
יוצרים קובץ חדש control_room.py:
import asyncio
import uuid
import os
import httpx
from google.adk.agents import BaseAgent, LlmAgent, RunConfig
from google.adk.agents.invocation_context import InvocationContext
from google.adk.events import Event
from google.adk.runners import InMemoryRunner
from google.genai import types
from typing import AsyncGenerator
A2A_SERVER_URL = os.environ.get("PLANNER_AGENT_URL", "http://127.0.0.1:8080")
def _classify_report(report: str) -> tuple[bool, bool]:
"""Return (is_success, should_retry) for a planner report."""
normalized = (report or "").replace("*", "").strip().lower()
success_markers = [
"status: success", "'status': 'success'",
"outcome: success", "po_id", "successfully ordered",
]
retryable_markers = ["not found", "discontinued", "no inventory",
"unknown item"]
terminal_markers = [
"status: failed", "over budget", "not issued",
"procurement failed",
]
if any(m in normalized for m in terminal_markers):
return False, False # Failed, don't retry
if any(m in normalized for m in retryable_markers):
return False, True # Failed, but retryable
if any(m in normalized for m in success_markers):
return True, False # Success!
return False, False # Unknown → treat as failure
class ControlRoomAgent(BaseAgent):
"""Top-level orchestrator: delegates via A2A, re-plans on failure."""
async def _run_async_impl(
self, ctx: InvocationContext
) -> AsyncGenerator[Event, None]:
# Extract user input from session events
user_msg = ""
if ctx.session and ctx.session.events:
for evt in reversed(ctx.session.events):
if evt.content and evt.content.role == "user":
user_msg = evt.content.parts[0].text
break
max_attempts = 2
current_objective = user_msg
final_report = "No report returned."
for attempt in range(1, max_attempts + 1):
print(f"\n--- Attempt {attempt}: Calling A2A Planner ---")
print(f" Objective: {current_objective}")
# Build the A2A JSON-RPC request
payload = {
"jsonrpc": "2.0",
"id": f"req-{attempt}",
"method": "message/send",
"params": {
"message": {
"message_id": str(uuid.uuid4()),
"parts": [{"text": current_objective}],
"role": "user"
}
}
}
try:
async with httpx.AsyncClient(timeout=300.0) as client:
resp = await client.post(
f"{A2A_SERVER_URL}/", json=payload
)
data = resp.json()
if "error" in data:
final_report = data["error"].get("message", "Unknown A2A error")
elif "result" in data:
artifacts = data["result"].get("artifacts", [])
if artifacts and "parts" in artifacts[-1]:
parts = artifacts[-1]["parts"]
if parts and "text" in parts[0]:
final_report = parts[0]["text"]
except Exception as e:
final_report = f"Connection error: {e}"
print(f"\n--- Report ---\n{final_report}\n")
is_success, should_retry = _classify_report(final_report)
if is_success:
yield Event(
author=self.name,
invocation_id=ctx.invocation_id,
content=types.Content(
role="model",
parts=[types.Part.from_text(text=final_report)]
),
)
return
# --- Re-planning (CUJ 2) ---
if should_retry and attempt < max_attempts:
print("--- Re-planning with LLM ---")
replanner = LlmAgent(
name=f"replanner_{attempt}",
model="gemini-2.5-flash",
instruction=(
"You are a strategic re-planner. The previous request "
"failed. Rewrite the objective to be broader or more "
"likely to succeed. Output ONLY the new objective text."
),
)
# Run the re-planner as a sub-agent
child_ctx = InvocationContext(
invocation_id=f"{ctx.invocation_id}_replan_{attempt}",
agent=replanner,
session=ctx.session,
session_service=ctx.session_service,
run_config=ctx.run_config or RunConfig(),
)
child_ctx.user_content = types.Content(
role="user",
parts=[types.Part.from_text(text=(
f"Original Objective: {current_objective}\n"
f"Failure Reason: {final_report}\n"
"Please broaden the search."
))]
)
new_objective = ""
async for event in replanner.run_async(child_ctx):
if event.content and event.content.parts:
for part in event.content.parts:
if hasattr(part, "text") and part.text:
new_objective += part.text
current_objective = new_objective.strip()
print(f"New objective: {current_objective}")
continue
# Terminal failure (not retryable)
if not should_retry:
yield Event(
author=self.name,
invocation_id=ctx.invocation_id,
content=types.Content(
role="model",
parts=[types.Part.from_text(text=f"FAILED: {final_report}")]
),
)
return
# Max attempts exhausted
yield Event(
author=self.name,
invocation_id=ctx.invocation_id,
content=types.Content(
role="model",
parts=[types.Part.from_text(
text=f"FAILED after {max_attempts} attempts: {final_report}"
)]
),
)
async def main():
prompt = "Restock 1 Pixel 7 phones for the Tokyo office"
print(f"Starting Control Room with: {prompt}\n")
agent = ControlRoomAgent(name="control_room")
runner = InMemoryRunner(app_name="control_room", agent=agent)
session = await runner.session_service.create_session(
app_name="control_room", user_id="admin"
)
content = types.Content(
role="user", parts=[types.Part.from_text(text=prompt)]
)
async for event in runner.run_async(
user_id="admin", session_id=session.id, new_message=content
):
if event.content and event.content.parts:
for part in event.content.parts:
if hasattr(part, "text") and part.text:
print(f"Result: {part.text}")
print("\n=== CONTROL ROOM COMPLETE ===")
if __name__ == "__main__":
asyncio.run(main())
מושגים מרכזיים
-
BaseAgent: פרימיטיב ADK לסוכנים בהתאמה אישית. אתם יוצרים מחלקה משנית ומבטלים את_run_async_implכדי לכתוב לוגיקה שרירותית של תזמור אסינכרוני – כאן, הלולאה של קריאת A2A + סיווג + תכנון מחדש. - קריאה ל-JSON-RPC של A2A: חדר הבקרה שולח בקשת
message/sendרגילה לשרת ה-A2A של הכלי לתכנון באמצעותhttpxומנתח את התשובה של JSON-RPC כדי לחלץ את הדוח הסופי. -
_classify_report(): סיווג פשוט שמבוסס על מילות מפתח וקובע אם הפעולה הצליחה, נכשלה אבל אפשר לנסות שוב, או נכשלה ולא ניתן לנסות שוב. הפעולה הזו מפעילה את לולאת התכנון מחדש. - הפעלת סוכן משנה: כדי לתכנן מחדש, Control Room יוצר
LlmAgentומריץ אותו על ידי בנייתInvocationContextצאצא וקריאה ל-replanner.run_async(child_ctx). כך אפשר להפעיל באופן דינמי סוכני LLM בתוך לוגיקת תזמור מותאמת אישית. -
InMemoryRunner: מריץ את הסוכן באופן מקומי עם חנות סשנים בזיכרון. בסביבת ייצור, משתמשים ב-adk deployכדי לפרוס ל-Vertex AI Agent Engine.
9. הפעלת הפול סטאק
עכשיו נבדוק את המערכת המלאה בת שלוש השכבות: ADK Control Room → A2A → LangGraph Planner → CrewAI Crew.
משתמשים בכרטיסייה השנייה של Cloud Shell שפתחתם קודם (או לוחצים על + כדי לפתוח כרטיסייה חדשה) ומריצים את חדר הבקרה. חשוב: לכל כרטיסייה ב-Cloud Shell יש סשן משלה של מעטפת. צריך להגדיר מחדש את משתני הפרויקט ומשתני הסביבה:
cd ~/scale-agents
export GOOGLE_CLOUD_PROJECT=$(gcloud config get-value project)
export GOOGLE_CLOUD_LOCATION=us-central1
export GOOGLE_GENAI_USE_VERTEXAI=TRUE
uv run python control_room.py
יוצג תרשים הזרימה המלא של התזמור:
- חדר הבקרה מקבל את הבקשה ושולח קריאת
message/sendJSON-RPC לשרת A2A - שרת ה-A2A מקבל את הבקשה ומפעיל את המתכנן של LangGraph
- המתכנן של LangGraph מנתח את הבקשה ומקצה אותה לצוות CrewAI
- צוות CrewAI מפעיל את הסוכנים של איתור מקורות ורכש
- התוצאה חוזרת לחדר הבקרה
תהליכי משתמש קריטיים (CUJ)
כדי להתנסות בתרחישים האלה, אפשר לנסות לשנות את המחרוזת prompt ב-control_room.py:
CUJ | הנחיה | מה קורה |
1. נתיב אופטימלי |
| חיפוש -> בדיקת תקציב -> הזמנת רכש (הצלחה). פועל מקצה לקצה. |
2. תכנון מחדש |
| הכלי לתכנון מחזיר את השגיאה Failed: Unknown item (נכשל: פריט לא ידוע). חדר הבקרה מזהה את זה ומפעיל מתכנן מסלול מחדש |
כדי לבדוק את CUJ 2 (תכנון מחדש), משנים את prompt ב-control_room.py ל:
prompt = "Order 1 unit of the discontinued XR-7000 Quantum Holographic Display"
הכלי לתכנון שמוטמע בקוד לא יזהה את הפריט הזה ויחזיר את השגיאה Failed: Unknown item (נכשל: פריט לא מוכר). חדר הבקרה יזהה את הכשל, ייצור באופן דינמי מתכנן מחדש של LlmAgent וינסה שוב עם מטרה רחבה יותר. מכיוון שהכלי לתכנון מזהה רק את 'Pixel 7', הניסיון החוזר ייכשל גם הוא – אבל תוכלו לראות את כל לולאת התכנון מחדש בפעולה. הפלט הסופי יהיה FAILED after 2 attempts: ....
10. הסרת המשאבים
כדי להימנע מחיובים שוטפים בחשבון Google Cloud, אתם יכולים למחוק את המשאבים שנוצרו במהלך ה-codelab הזה. פשוט מסירים את הספרייה שיצרתם:
cd ..
rm -rf scale-agents
11. מזל טוב
מעולה! יצרתם בהצלחה מערכת לתזמור של כמה סוכנים באמצעות CrewAI, LangGraph, פרוטוקול A2A ו-ADK.
מה למדתם
- איך מגדירים כלים לסוכנים באמצעות ה-decorator
@toolשל CrewAI. - איך ליצור סוכנים מיוחדים עם תפקידים, כלים ויעדים שונים.
- איך מחברים סוכנים לצוות רציף עם יחסי תלות בין משימות.
- איך לבנות מתכנן מכונות מצבים באמצעות LangGraph שמוקצה לצוות.
- איך חושפים את הכלי לתכנון בתור שירות A2A באמצעות
AgentCardו-AgentExecutor. - איך ליצור ADK מותאם אישית
BaseAgentשמבצע העברה באמצעות A2A ומתכנן מחדש במקרה של כשל על ידי הפעלת סוכן משנהLlmAgent. - למה הפרדה בין תכנון, ביצוע ותיאום בין מסגרות מאפשרת מודולריות ועמידות.
המשך
הסדנה המלאה מרחיבה את המערכת הזו עם:
- לוח בקרה בזמן אמת – סטרימינג של SSE להצגת ההתקדמות של כמה סוכנים
- Identity Shield – אבטחה מבוססת-IAM שחוסמת פעולות הרסניות ברמת התשתית, ולא ברמת ההנחיה
- Vertex AI Agent Engine – פריסת סוכן ADK בתשתית ענן מנוהלת באמצעות
adk deploy