1. Giới thiệu
Trong lớp học lập trình này, bạn sẽ tìm hiểu cách xây dựng một hệ thống điều phối nhiều tác nhân bằng CrewAI, LangGraph, giao thức A2A và ADK (Bộ công cụ phát triển tác nhân). Bạn sẽ tạo một hệ thống trong đó phòng điều khiển ADK uỷ quyền lập kế hoạch cho một trình lập kế hoạch LangGraph, trình này sẽ điều phối các tác vụ cho một nhóm thực thi CrewAI – tất cả đều được kết nối thông qua A2A – để xử lý một tình huống bổ sung hàng tồn kho bán lẻ.
Điều phối nhiều tác nhân là gì?
Trong hệ thống đa tác nhân, nhiều tác nhân AI chuyên biệt sẽ cộng tác để hoàn thành những nhiệm vụ quá phức tạp đối với một tác nhân duy nhất. Thay vì một tác nhân nguyên khối làm mọi thứ, bạn sẽ phân tách vấn đề thành các vai trò – một người lập kế hoạch và một người thực thi – mỗi vai trò có công cụ và chuyên môn riêng.
Điều này phản ánh cách các tổ chức của con người hoạt động: nhà quản lý uỷ quyền chiến lược cho các nhà phân tích và uỷ quyền việc thực thi cho các chuyên gia. Các lợi ích bao gồm:
- Tách biệt vấn đề: Mỗi tác nhân tập trung vào những gì mà tác nhân đó làm tốt nhất
- Tính linh hoạt của khung: Sử dụng khung tốt nhất cho từng vai trò (LangGraph cho logic lập kế hoạch, CrewAI cho việc thực thi công cụ)
- Khả năng mở rộng: Thêm các tác nhân chuyên biệt mà không cần thay đổi toàn bộ hệ thống
Tình huống
Khi người dùng gửi yêu cầu bổ sung hàng như "Bổ sung 1 điện thoại Pixel 7 cho văn phòng Tokyo", hệ thống sẽ:
- LangGraph Planner phân tích yêu cầu và trích xuất mặt hàng cũng như số lượng
- Planner uỷ quyền thực thi cho CrewAI Execution Crew
- Một nhân viên Chuyên trách tìm nguồn cung ứng sẽ tìm kiếm danh mục sản phẩm bằng các công cụ
- Một tác nhân Nhân viên mua sắm xác thực ngân sách và đặt đơn đặt hàng bằng các công cụ
- Kết quả sẽ được chuyển lại cho trình lập kế hoạch, trình này sẽ tạo ra một báo cáo cuối cùng
User Request
│
▼
┌──────────────────────┐
│ ADK Control Room │ ← Top-level orchestrator, re-plans on failure
│ (BaseAgent) │
└──────────┬───────────┘
│ A2A (JSON-RPC)
▼
┌──────────────────────┐
│ LangGraph Planner │ ← Analyzes intent, delegates, reports
│ (State Machine) │
└──────────┬───────────┘
│
▼
┌──────────────────────┐
│ CrewAI Execution Crew│ ← Runs agents with tools
│ ├─ Sourcing Agent │ → search_products
│ └─ Procurement Agent│ → check_budget, create_purchase_order
└──────────────────────┘
Bộ phần mềm cơ sở
Lớp | Công nghệ | Vai trò |
Lập kế hoạch | LangGraph | Máy trạng thái phân tích ý định, định tuyến yêu cầu, tạo báo cáo |
Thực thi | CrewAI | Các tác nhân dựa trên vai trò gọi các công cụ theo trình tự |
LLM | Gemini trên Vertex AI | Hỗ trợ khả năng suy luận và lựa chọn công cụ của tác nhân |
Giao tiếp giữa các tác nhân | Giao thức A2A | Cầu nối JSON-RPC 2.0 để các tác nhân từ các khung khác nhau có thể giao tiếp |
Trình điều phối cấp cao nhất | ADK (BaseAgent) | Nhận yêu cầu, uỷ quyền thông qua A2A, lập kế hoạch lại khi thất bại |
Xem cách hoạt động: Nếu có thể, hãy dùng hệ thống phát hành công khai đầy đủ tại https://scale-control-room-761793285222.us-central1.run.app – hệ thống này mở rộng những gì bạn sẽ tạo ở đây bằng một trang tổng quan theo thời gian thực, giao thức A2A và bảo mật IAM.
Bạn sẽ thực hiện
- Xác định các công cụ tuỳ chỉnh để tác nhân sử dụng.
- Tạo các tác nhân chuyên biệt bằng CrewAI.
- Tạo trình lập kế hoạch máy trạng thái bằng LangGraph.
- Điều phối quy trình giữa người lập kế hoạch và nhóm thực hiện.
- Bao bọc trình lập kế hoạch trong một máy chủ Giao thức A2A để giao tiếp trên nhiều khung.
- Tạo một Phòng điều khiển ADK cấp cao nhất uỷ quyền thông qua A2A và lập kế hoạch lại khi có lỗi.
Bạn cần có
- Một trình duyệt web như Chrome
- Một dự án trên Google Cloud đã bật tính năng thanh toán
Lớp học lập trình này dành cho các nhà phát triển trung cấp đã quen thuộc với Python và các khái niệm cơ bản về LLM.
Thời lượng ước tính: 35 phút.
Ước tính chi phí: Các tài nguyên được tạo trong lớp học lập trình này sẽ có chi phí dưới 1 USD.
2. Trước khi bắt đầu
Tạo một dự án trên Google Cloud
- Trong Google Cloud Console, trên trang chọn dự án, hãy chọn hoặc tạo một dự án trên Google Cloud.
- Đảm bảo rằng bạn đã bật tính năng thanh toán cho dự án trên Cloud. Tìm hiểu cách kiểm tra xem tính năng thanh toán có được bật trên một dự án hay không.
Khởi động Cloud Shell
Cloud Shell là một môi trường dòng lệnh chạy trong Google Cloud và được tải sẵn các công cụ cần thiết.
- Nhấp vào Kích hoạt Cloud Shell ở đầu bảng điều khiển Cloud.
- Sau khi kết nối với Cloud Shell, hãy xác minh thông tin xác thực của bạn:
gcloud auth list - Xác nhận rằng dự án của bạn đã được định cấu hình:
gcloud config get project - Nếu dự án của bạn không được thiết lập như mong đợi, hãy thiết lập dự án:
export PROJECT_ID=<YOUR_PROJECT_ID> gcloud config set project $PROJECT_ID
Bật API
Chạy lệnh này để bật Vertex AI API:
gcloud services enable aiplatform.googleapis.com
Lưu ý: Cloud Shell tự động xác thực bằng tài khoản Google Cloud của bạn. Nếu đang chạy lớp học lập trình này bên ngoài Cloud Shell, bạn sẽ cần chạy gcloud auth application-default login để xác thực bằng Vertex AI.
Thiết lập môi trường
Trong Cloud Shell, hãy tạo một thư mục mới cho dự án của bạn rồi chuyển đến thư mục đó:
mkdir scale-agents
cd scale-agents
Cài đặt uv và dùng công cụ này để cài đặt các gói bắt buộc:
curl -LsSf https://astral.sh/uv/install.sh | sh
export PATH="$HOME/.local/bin:$PATH"
uv init --no-workspace --no-readme
rm main.py
sed -i 's/requires-python = ">=3.12"/requires-python = ">=3.12,<3.14"/' pyproject.toml
uv add crewai 'litellm[google]' langgraph 'a2a-sdk>=0.3.25,<0.4' httpx uvicorn 'google-adk>=1.0.0' --prerelease=allow
Thiết lập các biến môi trường cho Vertex AI:
export GOOGLE_CLOUD_PROJECT=$(gcloud config get-value project)
export GOOGLE_CLOUD_LOCATION=us-central1
export GOOGLE_GENAI_USE_VERTEXAI=TRUE
3. Xác định Công cụ và Agent
Trong hệ thống đa tác nhân, các tác nhân cần có công cụ để tương tác với thế giới và các vai trò cụ thể để biết cần làm gì.
Tạo một tệp có tên scale_agents.py rồi thêm đoạn mã sau. Thao tác này sẽ thiết lập các hoạt động nhập, công cụ mô phỏng và các tác nhân CrewAI.
import os
from typing import Optional
from crewai import Agent, Crew, Process, Task, LLM
from crewai.tools import tool
from langgraph.graph import StateGraph, END
from typing_extensions import TypedDict
# CrewAI requires this env var even when using Vertex AI
os.environ["OPENAI_API_KEY"] = "NA"
# Set the project ID for Vertex AI
os.environ["VERTEXAI_PROJECT"] = os.getenv("GOOGLE_CLOUD_PROJECT", "")
os.environ["VERTEXAI_LOCATION"] = "us-central1"
# Initialize the LLM to use Vertex AI
llm = LLM(
model="vertex_ai/gemini-2.5-flash",
temperature=0.0,
max_tokens=4096,
)
# --- Step 1: Define Tools ---
@tool("search_products")
def search_products(query: str) -> list:
"""Search for products in the catalog."""
# Mock product catalog
products = [
{"product_id": "pixel-7", "name": "Pixel 7 Phone", "price": 50.0},
{"product_id": "pixel-8", "name": "Pixel 8 Phone", "price": 80.0},
]
return [p for p in products if query.lower() in p["name"].lower()]
@tool("check_budget")
def check_budget(amount: float) -> dict:
"""Check if a purchase amount is within the budget."""
limit = 100.0
if amount <= limit:
return {"approved": True, "remaining": limit - amount}
return {"approved": False, "reason": f"Exceeds budget of ${limit}"}
@tool("create_purchase_order")
def create_purchase_order(product_id: str, quantity: int) -> dict:
"""Create a purchase order for a product."""
return {
"status": "SUCCESS",
"po_id": f"PO-{product_id}-{quantity}",
"message": f"Successfully ordered {quantity} units of {product_id}."
}
# --- Step 2: Define Agents ---
sourcing_agent = Agent(
role="Sourcing Specialist",
goal="Find the best available products that match the intent of the request. You MUST use the search_products tool to look up products -- never make up product data.",
backstory="You are a veteran procurement specialist with an eye for detail. You always search the catalog before recommending a product.",
tools=[search_products],
llm=llm,
verbose=True,
allow_delegation=False,
memory=False,
reasoning=False,
)
procurement_agent = Agent(
role="Procurement Officer",
goal="Validate the purchase against budget constraints and execute the order. You MUST use the check_budget tool before ordering and the create_purchase_order tool to place the order.",
backstory="You are the gatekeeper of the budget. You always verify budget before placing any order.",
tools=[check_budget, create_purchase_order],
llm=llm,
verbose=True,
allow_delegation=False,
memory=False,
reasoning=False,
)
Khái niệm chính
- Trình trang trí
@tool: CrewAI dùng trình trang trí này để biến các hàm Python thông thường thành những công cụ mà LLM có thể hiểu và gọi. Gợi ý về loại và chuỗi tài liệu của hàm được dùng để tạo giản đồ công cụ mà LLM có thể hiểu. - Vai trò, Mục tiêu và Bối cảnh: Những yếu tố này xác định tính cách của đặc vụ và hướng dẫn suy luận LLM. Thông tin cơ bản không chỉ là văn bản bổ sung – "Bạn luôn tìm kiếm trong danh mục" khuyến khích trợ lý sử dụng các công cụ của mình thay vì đưa ra câu trả lời ảo.
reasoning=False: Tắt tính năng suy luận mở rộng để tác nhân tuân theo vòng lặp gọi công cụ tiêu chuẩn thay vì cố gắng trả lời trực tiếp.allow_delegation=False: Giúp mỗi nhân viên hỗ trợ tập trung vào các công cụ được chỉ định thay vì chuyển công việc cho nhân viên hỗ trợ khác.
Tại sao cần có hai tác nhân thay vì một? Mỗi nhân viên hỗ trợ có các công cụ và công việc khác nhau. Chuyên viên tìm nguồn cung ứng chỉ tìm kiếm sản phẩm; Nhân viên mua sắm chỉ xử lý ngân sách và đơn đặt hàng. Việc phân tách các mối lo ngại này có nghĩa là mỗi tác nhân đều có một câu lệnh tập trung và một bộ công cụ nhỏ, phù hợp – điều này dẫn đến hành vi LLM đáng tin cậy hơn so với một tác nhân duy nhất phải xử lý mọi thứ.
4. Xác định các nhiệm vụ và nhóm
Giờ hãy xác định những việc mà các tác nhân này cần làm bằng cách tạo Tasks (Tác vụ) và kết nối chúng vào một Crew (Nhóm).
Thêm đoạn mã sau vào cuối tệp scale_agents.py:
# --- Step 3: Define Tasks & Crew ---
sourcing_task = Task(
description="Use the search_products tool to find products matching: '{item_description}'. Return the product_id and price of the best match from the tool results.",
expected_output="The product_id and price of the best matching product from the search_products tool.",
agent=sourcing_agent
)
procurement_task = Task(
description="First, use the check_budget tool to verify the total cost for {quantity} units. Then use the create_purchase_order tool with the product_id and quantity to place the order.",
expected_output="The purchase order details returned by the create_purchase_order tool.",
agent=procurement_agent,
context=[sourcing_task] # This task depends on the output of sourcing_task
)
def run_crew(item_description: str, quantity: int):
crew = Crew(
agents=[sourcing_agent, procurement_agent],
tasks=[sourcing_task, procurement_task],
process=Process.sequential, # Run tasks in order
verbose=True,
memory=False,
planning=False,
)
result = crew.kickoff(inputs={
"item_description": item_description,
"quantity": quantity
})
return result
Khái niệm chính
- Task Context (Bối cảnh của nhiệm vụ):
context=[sourcing_task]cho CrewAI biết rằng nhiệm vụ mua sắm cần có kết quả của nhiệm vụ tìm nguồn cung ứng để tiếp tục. Nhân viên mua sắm có thể xem những gì Chuyên viên tìm nguồn cung ứng đã tìm thấy trước khi quyết định đặt hàng. - Process.sequential: Các tác vụ được thực thi theo thứ tự được liệt kê. Điều này rất quan trọng vì nhiệm vụ thu mua phụ thuộc vào kết quả của nhiệm vụ tìm nguồn cung ứng – bạn không thể đặt hàng trước khi biết nên mua sản phẩm nào.
memory=False/planning=False: Tắt các tính năng lập kế hoạch và bộ nhớ tích hợp của CrewAI để quá trình thực thi đơn giản và dễ đoán trong bản minh hoạ này.
5. Tạo LangGraph Planner
Nhóm thực hiện sẽ xử lý "cách thức" – tìm kiếm sản phẩm, kiểm tra ngân sách, đặt hàng. Nhưng ai quyết định "cái gì"? Đó là Tác nhân lập kế hoạch, được tạo bằng LangGraph.
LangGraph mô hình hoá quy trình làm việc dưới dạng một máy trạng thái – một biểu đồ gồm các nút (hàm) được kết nối bằng các cạnh (quá trình chuyển đổi). Trạng thái truyền qua biểu đồ, với mỗi nút đọc và ghi vào trạng thái dùng chung. Đây là một lựa chọn phù hợp cho các quy trình lập kế hoạch mà bạn cần có luồng kiểm soát rõ ràng và xác định được: phân tích yêu cầu, uỷ quyền cho nhóm, tạo báo cáo.
Thêm đoạn mã sau vào cuối tệp scale_agents.py:
# --- Step 4: Define LangGraph Planner ---
class PlanState(TypedDict):
objective: str
item_description: Optional[str]
quantity_needed: Optional[int]
execution_result: Optional[str]
final_report: Optional[str]
def analyze_alert(state: PlanState) -> PlanState:
"""Node 1: Extract intent from the raw objective string."""
print("--- ANALYZING ALERT ---")
# In a production app, you would use an LLM here to extract details.
# For simplicity, we simulate extraction here.
objective = state["objective"]
# Hardcoded extraction for the demo
if "Pixel 7" in objective:
return {
"item_description": "Pixel 7",
"quantity_needed": 1,
}
return {
"item_description": "unknown",
"quantity_needed": 0,
}
def delegate_to_executor(state: PlanState) -> PlanState:
"""Node 2: Call the CrewAI Execution Crew."""
print("--- DELEGATING TO CREW ---")
if state["item_description"] == "unknown":
return {"execution_result": "Failed: Unknown item"}
result = run_crew(
item_description=state["item_description"],
quantity=state["quantity_needed"]
)
return {"execution_result": str(result)}
def generate_report(state: PlanState) -> PlanState:
"""Node 3: Synthesize the final outcome."""
print("--- GENERATING REPORT ---")
return {
"final_report": f"Objective handled: {state['objective']}. Result: {state['execution_result']}"
}
# Build the graph
workflow = StateGraph(PlanState)
workflow.add_node("analyze_alert", analyze_alert)
workflow.add_node("delegate", delegate_to_executor)
workflow.add_node("generate_report", generate_report)
workflow.set_entry_point("analyze_alert")
workflow.add_edge("analyze_alert", "delegate")
workflow.add_edge("delegate", "generate_report")
workflow.add_edge("generate_report", END)
app = workflow.compile()
# --- Main Execution ---
if __name__ == "__main__":
print("Starting Multi-Agent System...")
initial_state = {
"objective": "Restock 1 Pixel 7 phones for the Tokyo office"
}
final_state = app.invoke(initial_state)
print("\n=== FINAL REPORT ===")
print(final_state["final_report"])
Khái niệm chính
- StateGraph: Xác định máy trạng thái.
PlanStatelà trạng thái được nhập tích luỹ khi mỗi nút xử lý yêu cầu. - Nút: Các hàm lấy trạng thái hiện tại và trả về nội dung cập nhật cho trạng thái đó. Mỗi nút có một trách nhiệm duy nhất –
analyze_alerttrích xuất ý định,delegate_to_executorchạy nhóm,generate_reporttóm tắt kết quả. - Cạnh: Xác định luồng giữa các nút. Trong lớp học lập trình này, chúng ta sẽ sử dụng một quy trình tuyến tính đơn giản (
analyze → delegate → report). Hội thảo đầy đủ sẽ mở rộng quy trình này bằng định tuyến có điều kiện – ví dụ: định tuyến các yêu cầu phá huỷ đến một đường dẫn bảo mật thay vì trình thực thi.
Tại sao nên dùng LangGraph cho công cụ lập kế hoạch? CrewAI rất phù hợp với các tác nhân gọi công cụ, nhưng trình lập kế hoạch cần có luồng kiểm soát xác định – "nếu có tính huỷ diệt, hãy chuyển sang đường dẫn bảo mật; nếu không, hãy uỷ quyền". Mô hình máy trạng thái của LangGraph giúp việc định tuyến này trở nên rõ ràng và có thể kiểm thử, trong khi CrewAI xử lý việc thực thi công cụ dạng tự do bên dưới.
6. Chạy Planner và Crew
Bây giờ, hãy cùng nhau thử nghiệm trình lập kế hoạch LangGraph và nhóm CrewAI.
Trong cửa sổ dòng lệnh Cloud Shell, hãy chạy tập lệnh:
uv run python scale_agents.py
Bạn sẽ thấy kết quả cho biết các bước đang được thực hiện:
- Phân tích cảnh báo: Nút LangGraph đang chạy.
- Uỷ quyền cho Crew: Nút LangGraph gọi nhóm CrewAI.
- Thực thi CrewAI: Bạn sẽ thấy Chuyên viên tìm nguồn cung ứng tìm kiếm sản phẩm và Nhân viên mua sắm kiểm tra ngân sách cũng như tạo đơn đặt hàng.
- Báo cáo cuối cùng: Kết quả tóm tắt sẽ được in ở cuối.
Ví dụ về kết quả đầu ra (rút gọn):
Starting Multi-Agent System...
--- ANALYZING ALERT ---
--- DELEGATING TO CREW ---
Agent: Sourcing Specialist
Tool: search_products Args: {'query': 'Pixel 7'}
Tool Completed Output: [{'product_id': 'pixel-7', 'name': 'Pixel 7 Phone', 'price': 50.0}]
Agent: Procurement Officer
Tool: check_budget Args: {'amount': 50}
Tool: create_purchase_order Args: {'product_id': 'pixel-7', 'quantity': 1}
Tool Completed Output: {'status': 'SUCCESS', 'po_id': 'PO-pixel-7-1', ...}
--- GENERATING REPORT ---
=== FINAL REPORT ===
Objective handled: Restock 1 Pixel 7 phones for the Tokyo office. Result: ...PO-pixel-7-1...SUCCESS...
Lưu ý: Bạn có thể thấy các thông báo [CrewAIEventsBus] Warning: Event pairing mismatch trong kết quả. Đây là những cảnh báo mang tính thẩm mỹ từ tính năng theo dõi sự kiện nội bộ của CrewAI và bạn có thể yên tâm bỏ qua.
Lưu ý: CrewAI có thể hiển thị thông báo về việc tính năng theo dõi bị vô hiệu hoá. Đây là thông tin và bạn có thể yên tâm bỏ qua.
Lưu ý: OMS mô phỏng có hạn mức ngân sách là 100 USD. Đảm bảo số lượng nhỏ (dưới khoảng 2 đơn vị) để quy trình diễn ra suôn sẻ. Ví dụ: 1 điện thoại Pixel 7 có giá 50 đô la sẽ vượt qua bước kiểm tra ngân sách, nhưng 3 chiếc có giá 150 đô la sẽ bị từ chối vì "Vượt quá ngân sách".
7. Bọc Planner trong một Máy chủ A2A
Trình lập kế hoạch LangGraph hoạt động, nhưng bị mắc kẹt trong một quy trình Python. Để các tác nhân khác có thể gọi được (có thể được viết bằng các khung khác nhau hoặc chạy trên các máy khác nhau), chúng ta sẽ bao bọc tác nhân này trong một máy chủ A2A (Tác nhân với tác nhân).
A2A là một giao thức dựa trên JSON-RPC 2.0, giúp chuẩn hoá cách các tác nhân giao tiếp. Các khái niệm chính:
Khái niệm | Mục đích |
Thẻ tác nhân | Siêu dữ liệu JSON mô tả các chức năng của tác nhân (được phân phát tại |
| Phương thức JSON-RPC để gửi một tác vụ đến tác nhân |
Việc cần làm | Một đơn vị công việc có trạng thái (đã gửi → đang hoạt động → đã hoàn tất/thất bại) |
Hiện vật | Đầu ra trung gian và đầu ra cuối cùng được đính kèm vào một tác vụ |
Tạo tệp mới a2a_planner.py:
import asyncio
import os
import uvicorn
from a2a.server.agent_execution import AgentExecutor, RequestContext
from a2a.server.apps import A2AStarletteApplication
from a2a.server.events import EventQueue
from a2a.server.request_handlers import DefaultRequestHandler
from a2a.server.tasks import InMemoryTaskStore, TaskUpdater
from a2a.types import (AgentCapabilities, AgentCard, AgentSkill,
InternalError, Part, TextPart)
from a2a.utils import new_task
from a2a.utils.errors import ServerError
# Import the LangGraph planner from Step 4
from scale_agents import app as planner_app
class PlannerAgentExecutor(AgentExecutor):
"""Wraps the LangGraph planner as an A2A service."""
SUPPORTED_CONTENT_TYPES = ["text", "text/plain"]
async def execute(self, context: RequestContext, event_queue: EventQueue):
objective = context.get_user_input()
# Initialize A2A task tracking
task = context.current_task or new_task(context.message)
await event_queue.enqueue_event(task)
updater = TaskUpdater(event_queue, task.id, task.context_id)
try:
# Run the LangGraph planner synchronously in a thread
initial_state = {"objective": objective}
result = await asyncio.to_thread(planner_app.invoke, initial_state)
final_report = result.get("final_report", "No report generated.")
except Exception as e:
final_report = f"Execution failed: {e}"
# Send the result back as an artifact
await updater.add_artifact(
[Part(root=TextPart(text=final_report))],
name="orchestration_report"
)
await updater.complete()
async def cancel(self, context, event_queue):
raise ServerError(error=InternalError(message="Cancel not supported"))
# Define the Agent Card — this is what other agents see
port = int(os.environ.get("PORT", 8080))
agent_card = AgentCard(
name="Retail-Planner-A2A",
description="LangGraph planner that delegates logistics tasks to a CrewAI crew.",
url=f"http://localhost:{port}/",
version="1.0.0",
default_input_modes=PlannerAgentExecutor.SUPPORTED_CONTENT_TYPES,
default_output_modes=PlannerAgentExecutor.SUPPORTED_CONTENT_TYPES,
capabilities=AgentCapabilities(streaming=False),
skills=[
AgentSkill(
id="plan_logistics",
name="Plan Logistics",
description="Analyzes inventory alerts and orchestrates procurement.",
tags=["logistics", "planning"],
examples=["Restock 1 Pixel 7 phones for the Tokyo office"],
)
],
)
if __name__ == "__main__":
executor = PlannerAgentExecutor()
handler = DefaultRequestHandler(
agent_executor=executor, task_store=InMemoryTaskStore()
)
server = A2AStarletteApplication(
agent_card=agent_card, http_handler=handler
)
print(f"Starting A2A Planner Server on port {port}...")
uvicorn.run(server.build(), host="0.0.0.0", port=port)
Khái niệm chính
- Thẻ đại lý: Được phân phát tại
/.well-known/agent-card.json– mọi đại lý đều có thể khám phá chức năng của máy chủ này bằng cách tìm nạp URL đó. Tệp này liệt kê các kỹ năng, loại nội dung được hỗ trợ và khả năng của trợ lý ảo. AgentExecutor.execute(): Phương thức duy nhất mà bạn triển khai. Nó nhận yêu cầu đến, chạy logic của tác nhân (ở đây là trình lập kế hoạch LangGraph) và gửi kết quả trở lại dưới dạng các cấu phần phần mềm.TaskUpdater: Quản lý vòng đời của tác vụ –add_artifact()gửi đầu ra trung gian/cuối cùng,complete()đánh dấu tác vụ là hoàn tất. Thư viện A2A xử lý tất cả các thành phần JSON-RPC.
Kiểm thử máy chủ A2A bằng cách khởi động máy chủ này trong một thiết bị đầu cuối:
uv run python a2a_planner.py
Mở một thẻ Cloud Shell khác (nhấp vào + bên cạnh thẻ hiện tại) và xác minh rằng Thẻ đại lý được phân phát:
cd ~/scale-agents
curl http://localhost:8080/.well-known/agent-card.json | python3 -m json.tool
Bạn sẽ thấy JSON của thẻ tác nhân. Giữ cho máy chủ A2A chạy trong thiết bị đầu cuối đầu tiên cho bước tiếp theo.
8. Xây dựng Phòng điều khiển ADK
Đầu ngăn xếp là Phòng điều khiển, được xây dựng bằng ADK (Bộ công cụ phát triển tác nhân của Google). Nó nhận yêu cầu của người dùng, uỷ quyền cho trình lập kế hoạch thông qua A2A, đánh giá kết quả và – quan trọng nhất – xử lý lập kế hoạch lại khi thất bại (CUJ 2).
ADK cung cấp các nguyên hàm của tác nhân như BaseAgent, LlmAgent và InMemoryRunner. Chúng tôi phân lớp BaseAgent để viết logic điều phối tuỳ chỉnh – lệnh gọi A2A, phân loại báo cáo và lập kế hoạch lại linh hoạt bằng một tác nhân phụ LlmAgent.
Tạo tệp mới control_room.py:
import asyncio
import uuid
import os
import httpx
from google.adk.agents import BaseAgent, LlmAgent, RunConfig
from google.adk.agents.invocation_context import InvocationContext
from google.adk.events import Event
from google.adk.runners import InMemoryRunner
from google.genai import types
from typing import AsyncGenerator
A2A_SERVER_URL = os.environ.get("PLANNER_AGENT_URL", "http://127.0.0.1:8080")
def _classify_report(report: str) -> tuple[bool, bool]:
"""Return (is_success, should_retry) for a planner report."""
normalized = (report or "").replace("*", "").strip().lower()
success_markers = [
"status: success", "'status': 'success'",
"outcome: success", "po_id", "successfully ordered",
]
retryable_markers = ["not found", "discontinued", "no inventory",
"unknown item"]
terminal_markers = [
"status: failed", "over budget", "not issued",
"procurement failed",
]
if any(m in normalized for m in terminal_markers):
return False, False # Failed, don't retry
if any(m in normalized for m in retryable_markers):
return False, True # Failed, but retryable
if any(m in normalized for m in success_markers):
return True, False # Success!
return False, False # Unknown → treat as failure
class ControlRoomAgent(BaseAgent):
"""Top-level orchestrator: delegates via A2A, re-plans on failure."""
async def _run_async_impl(
self, ctx: InvocationContext
) -> AsyncGenerator[Event, None]:
# Extract user input from session events
user_msg = ""
if ctx.session and ctx.session.events:
for evt in reversed(ctx.session.events):
if evt.content and evt.content.role == "user":
user_msg = evt.content.parts[0].text
break
max_attempts = 2
current_objective = user_msg
final_report = "No report returned."
for attempt in range(1, max_attempts + 1):
print(f"\n--- Attempt {attempt}: Calling A2A Planner ---")
print(f" Objective: {current_objective}")
# Build the A2A JSON-RPC request
payload = {
"jsonrpc": "2.0",
"id": f"req-{attempt}",
"method": "message/send",
"params": {
"message": {
"message_id": str(uuid.uuid4()),
"parts": [{"text": current_objective}],
"role": "user"
}
}
}
try:
async with httpx.AsyncClient(timeout=300.0) as client:
resp = await client.post(
f"{A2A_SERVER_URL}/", json=payload
)
data = resp.json()
if "error" in data:
final_report = data["error"].get("message", "Unknown A2A error")
elif "result" in data:
artifacts = data["result"].get("artifacts", [])
if artifacts and "parts" in artifacts[-1]:
parts = artifacts[-1]["parts"]
if parts and "text" in parts[0]:
final_report = parts[0]["text"]
except Exception as e:
final_report = f"Connection error: {e}"
print(f"\n--- Report ---\n{final_report}\n")
is_success, should_retry = _classify_report(final_report)
if is_success:
yield Event(
author=self.name,
invocation_id=ctx.invocation_id,
content=types.Content(
role="model",
parts=[types.Part.from_text(text=final_report)]
),
)
return
# --- Re-planning (CUJ 2) ---
if should_retry and attempt < max_attempts:
print("--- Re-planning with LLM ---")
replanner = LlmAgent(
name=f"replanner_{attempt}",
model="gemini-2.5-flash",
instruction=(
"You are a strategic re-planner. The previous request "
"failed. Rewrite the objective to be broader or more "
"likely to succeed. Output ONLY the new objective text."
),
)
# Run the re-planner as a sub-agent
child_ctx = InvocationContext(
invocation_id=f"{ctx.invocation_id}_replan_{attempt}",
agent=replanner,
session=ctx.session,
session_service=ctx.session_service,
run_config=ctx.run_config or RunConfig(),
)
child_ctx.user_content = types.Content(
role="user",
parts=[types.Part.from_text(text=(
f"Original Objective: {current_objective}\n"
f"Failure Reason: {final_report}\n"
"Please broaden the search."
))]
)
new_objective = ""
async for event in replanner.run_async(child_ctx):
if event.content and event.content.parts:
for part in event.content.parts:
if hasattr(part, "text") and part.text:
new_objective += part.text
current_objective = new_objective.strip()
print(f"New objective: {current_objective}")
continue
# Terminal failure (not retryable)
if not should_retry:
yield Event(
author=self.name,
invocation_id=ctx.invocation_id,
content=types.Content(
role="model",
parts=[types.Part.from_text(text=f"FAILED: {final_report}")]
),
)
return
# Max attempts exhausted
yield Event(
author=self.name,
invocation_id=ctx.invocation_id,
content=types.Content(
role="model",
parts=[types.Part.from_text(
text=f"FAILED after {max_attempts} attempts: {final_report}"
)]
),
)
async def main():
prompt = "Restock 1 Pixel 7 phones for the Tokyo office"
print(f"Starting Control Room with: {prompt}\n")
agent = ControlRoomAgent(name="control_room")
runner = InMemoryRunner(app_name="control_room", agent=agent)
session = await runner.session_service.create_session(
app_name="control_room", user_id="admin"
)
content = types.Content(
role="user", parts=[types.Part.from_text(text=prompt)]
)
async for event in runner.run_async(
user_id="admin", session_id=session.id, new_message=content
):
if event.content and event.content.parts:
for part in event.content.parts:
if hasattr(part, "text") and part.text:
print(f"Result: {part.text}")
print("\n=== CONTROL ROOM COMPLETE ===")
if __name__ == "__main__":
asyncio.run(main())
Khái niệm chính
BaseAgent: Nguyên tắc cơ bản của ADK cho các tác nhân tuỳ chỉnh. Bạn phân lớp và ghi đè_run_async_implđể viết logic điều phối không đồng bộ tuỳ ý – ở đây là lệnh gọi A2A + phân loại + vòng lặp lập kế hoạch lại.- Lệnh gọi A2A JSON-RPC: Phòng điều khiển gửi một yêu cầu
message/sendtiêu chuẩn đến máy chủ A2A của trình lập kế hoạch bằng cách sử dụnghttpxvà phân tích cú pháp phản hồi JSON-RPC để trích xuất báo cáo cuối cùng. _classify_report(): Phân loại đơn giản dựa trên từ khoá, xác định thành công, lỗi có thể thử lại hoặc lỗi không thể khắc phục từ văn bản báo cáo. Điều này thúc đẩy vòng lặp lập kế hoạch lại.- Lời gọi tác nhân phụ: Để lập kế hoạch lại, Phòng điều khiển sẽ tạo một
LlmAgentvà chạy bằng cách tạo mộtInvocationContextcon và gọireplanner.run_async(child_ctx). Điều này cho phép bạn tạo các tác nhân LLM một cách linh hoạt trong logic điều phối tuỳ chỉnh. InMemoryRunner: Chạy tác nhân cục bộ với một kho lưu trữ phiên trong bộ nhớ. Trong quá trình sản xuất, bạn sẽ dùngadk deployđể triển khai cho Vertex AI Agent Engine.
9. Chạy Full Stack
Bây giờ, hãy kiểm thử hệ thống hoàn chỉnh gồm 3 lớp: Phòng điều khiển ADK → A2A → Công cụ lập kế hoạch LangGraph → Nhóm CrewAI.
Sử dụng thẻ Cloud Shell thứ hai mà bạn đã mở trước đó (hoặc nhấp vào + để mở một thẻ mới) rồi chạy Phòng điều khiển. Quan trọng: Mỗi thẻ Cloud Shell có một phiên shell riêng. Bạn phải thiết lập lại dự án và các biến môi trường:
cd ~/scale-agents
export GOOGLE_CLOUD_PROJECT=$(gcloud config get-value project)
export GOOGLE_CLOUD_LOCATION=us-central1
export GOOGLE_GENAI_USE_VERTEXAI=TRUE
uv run python control_room.py
Bạn sẽ thấy toàn bộ quy trình điều phối:
- Phòng điều khiển nhận yêu cầu và gửi lệnh gọi
message/sendJSON-RPC đến máy chủ A2A - Máy chủ A2A nhận yêu cầu và gọi trình lập kế hoạch LangGraph
- Công cụ lập kế hoạch LangGraph phân tích yêu cầu và uỷ quyền cho nhóm CrewAI
- Nhóm CrewAI điều hành các tác nhân Tìm nguồn cung ứng và Mua sắm
- Kết quả sẽ được chuyển ngược lại cho Phòng điều khiển
Hành trình trọng yếu của người dùng (CUJ)
Hãy thử sửa đổi chuỗi prompt trong control_room.py để thử nghiệm các trường hợp sau:
CUJ (Hành trình trọng yếu của người dùng) | Câu lệnh | Điều gì sẽ xảy ra |
1. Hành trình suôn sẻ |
| Tìm kiếm -> kiểm tra ngân sách -> đơn đặt hàng (THÀNH CÔNG). Hoạt động từ đầu đến cuối. |
2. Lập kế hoạch lại |
| Trình lập kế hoạch trả về "Không thành công: Mục không xác định". Control Room sẽ phát hiện điều này và gọi một |
Để kiểm thử CUJ 2 (Lập kế hoạch lại), hãy thay đổi prompt trong control_room.py thành:
prompt = "Order 1 unit of the discontinued XR-7000 Quantum Holographic Display"
Trình lập kế hoạch được mã hoá cứng sẽ không nhận dạng được mục này và sẽ trả về "Thất bại: Mục không xác định". Control Room sẽ phát hiện lỗi, tự động tạo một bộ lập kế hoạch lại LlmAgent và thử lại với mục tiêu rộng hơn. Vì trình lập kế hoạch chỉ nhận dạng "Pixel 7", nên lần thử lại cũng sẽ thất bại – nhưng bạn sẽ thấy toàn bộ vòng lặp lập kế hoạch lại đang hoạt động. Kết quả cuối cùng sẽ là FAILED after 2 attempts: ....
10. Dọn dẹp
Để tránh các khoản phí liên tục cho tài khoản Google Cloud của mình, bạn có thể xoá các tài nguyên đã tạo trong lớp học lập trình này. Bạn chỉ cần xoá thư mục mà mình đã tạo:
cd ..
rm -rf scale-agents
11. Xin chúc mừng
Xin chúc mừng! Bạn đã xây dựng thành công một hệ thống điều phối nhiều tác nhân bằng CrewAI, LangGraph, Giao thức A2A và ADK.
Kiến thức bạn học được
- Cách xác định các công cụ cho tác nhân bằng trình trang trí
@toolcủa CrewAI. - Cách tạo các tác nhân chuyên biệt có vai trò, công cụ và mục tiêu riêng biệt.
- Cách kết nối các tác nhân vào một nhóm tuần tự có các phần phụ thuộc của nhiệm vụ.
- Cách tạo một trình lập kế hoạch máy trạng thái bằng LangGraph để uỷ quyền cho nhóm.
- Cách hiển thị trình lập kế hoạch dưới dạng dịch vụ A2A bằng
AgentCardvàAgentExecutor. - Cách tạo
BaseAgentADK tuỳ chỉnh uỷ quyền thông qua A2A và lập kế hoạch lại khi gặp lỗi bằng cách gọi một tác nhân phụLlmAgent. - Lý do việc tách biệt lập kế hoạch, thực thi và điều phối trên các khung hình mang lại cho bạn tính mô-đun và khả năng phục hồi.
Vươn xa hơn
Hội thảo đầy đủ sẽ mở rộng hệ thống này bằng:
- Trang tổng quan theo thời gian thực – Truyền phát trực tiếp SSE để trực quan hoá tiến trình của nhiều tác nhân
- Identity Shield – Bảo mật dựa trên IAM giúp chặn các hành động phá hoại ở cấp độ cơ sở hạ tầng, chứ không phải cấp độ lời nhắc
- Vertex AI Agent Engine – triển khai tác nhân ADK vào cơ sở hạ tầng đám mây được quản lý bằng
adk deploy