1. Giới thiệu
Trong lớp học lập trình này, bạn sẽ xây dựng một hệ thống nhiều tác nhân, trong đó 3 tác nhân AI chuyên biệt cộng tác để lên kế hoạch cho một cuộc chạy marathon trong thành phố. Một tác nhân Công cụ lập kế hoạch chạy marathon điều phối thiết kế, một tác nhân Trình đánh giá chấm điểm chất lượng kế hoạch bằng Vertex AI Evaluation và một tác nhân Bộ điều khiển mô phỏng xác thực trạng thái sẵn sàng – tất cả đều giao tiếp thông qua giao thức Agent-to-Agent (A2A).
Bạn sẽ thực hiện
- Tạo tác nhân AI bằng Bộ công cụ phát triển tác nhân (ADK) bằng các mô hình Gemini trên Vertex AI
- Xác định đầu ra có cấu trúc bằng giản đồ Pydantic để có câu trả lời đáng tin cậy của tác nhân
- Xây dựng các chỉ số Vertex AI Evaluation tuỳ chỉnh bằng
MetricPromptBuilder(LLM-as-Judge) - Kết nối các tác nhân dưới dạng tác nhân phụ bằng
AgentToolvà kết nối các tác nhân từ xa thông qua giao thức A2A - Sử dụng Kỹ năng ADK để có kiến thức về quy trình và Ngân hàng bộ nhớ để học tập giữa các phiên
- Phân phát các tác nhân cục bộ bằng máy chủ A2A và kiểm thử khả năng cộng tác của nhiều tác nhân
Bạn cần có
- Một trình duyệt web như Chrome
- Một dự án trên Google Cloud đã bật tính năng thanh toán
Lớp học lập trình này dành cho các nhà phát triển trung cấp đã quen thuộc với Python và các khái niệm về AI.
Thời gian hoàn thành ước tính: 90 phút.
Các tài nguyên được tạo trong lớp học lập trình này sẽ có chi phí dưới 5 USD.
2. Trước khi bắt đầu
Tạo dự án trên Google Cloud
- Trong Bảng điều khiển Google Cloud, chọn hoặc tạo một dự án trên Google Cloud.
- Đảm bảo rằng bạn đã bật tính năng thanh toán cho dự án trên Cloud.
Bắt đầu Cloud Shell
- Nhấp vào Kích hoạt Cloud Shell ở đầu bảng điều khiển Cloud.
- Xác minh quy trình xác thực:
gcloud auth list
- Xác nhận dự án của bạn:
gcloud config get project
- Đặt dự án nếu cần:
export PROJECT_ID=<YOUR_PROJECT_ID>
gcloud config set project $PROJECT_ID
Bật API
gcloud services enable aiplatform.googleapis.com
Cài đặt trình quản lý gói Python
Cài đặt uv, một trình quản lý gói Python nhanh:
curl -LsSf https://astral.sh/uv/install.sh | sh
source $HOME/.local/bin/env
3. Thiết lập dự án
Ở bước này, bạn sẽ tạo cấu trúc dự án, cài đặt các phần phụ thuộc và định cấu hình các dịch vụ dùng chung mà tất cả các tác nhân sử dụng.
Tạo thư mục dự án
mkdir -p marathon-agents && cd marathon-agents
Tạo cấu hình dự án
Tạo pyproject.toml:
[project]
name = "marathon-agents"
version = "0.1.0"
description = "Multi-agent marathon planning system with ADK, A2A, and Vertex AI Evaluation"
readme = "README.md"
requires-python = ">=3.10"
dependencies = [
"google-cloud-aiplatform[agent_engines,adk,evaluation]>=1.121.0",
"google-adk>=1.25.0",
"a2a-sdk>=0.3.9",
"pydantic>=2.12.0",
"python-dotenv>=1.0.0",
"httpx>=0.27.0",
"uvicorn>=0.30.0",
"google-auth>=2.0.0",
"pandas>=2.0.0",
]
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[tool.hatch.build.targets.wheel]
packages = ["src"]
[tool.pytest.ini_options]
asyncio_mode = "auto"
testpaths = ["tests"]
Cài đặt các phần phụ thuộc
uv sync
Định cấu hình các biến môi trường
Tạo tệp .env:
cat > .env << 'EOF'
GOOGLE_CLOUD_PROJECT=<YOUR_PROJECT_ID>
GOOGLE_CLOUD_LOCATION=us-central1
GOOGLE_GENAI_USE_VERTEXAI=true
EOF
Tạo cây thư mục nguồn
Dự án tuân theo quy ước nhiều tác nhân của ADK – mỗi tác nhân là một gói độc lập trong src/:
mkdir -p src/planner_agent/{agent,evaluator/{services},skills/{route-planning/references,plan-evaluation/references},services,runtime}
mkdir -p src/simulator_agent/{agent,skills/review-marathon-plan/references,services,runtime}
Khởi chạy các gói Python
touch src/__init__.py
touch src/planner_agent/__init__.py src/planner_agent/agent/__init__.py
touch src/planner_agent/evaluator/__init__.py src/planner_agent/evaluator/services/__init__.py
touch src/planner_agent/services/__init__.py src/planner_agent/runtime/__init__.py
touch src/simulator_agent/__init__.py src/simulator_agent/agent/__init__.py
touch src/simulator_agent/services/__init__.py src/simulator_agent/runtime/__init__.py
Tạo cấu hình dùng chung
Tạo src/config.py – cấu hình GCP dùng chung cho tất cả các tác nhân:
"""Shared configuration for the multi-agent system."""
import os
from dotenv import load_dotenv
load_dotenv()
# GCP Configuration
GOOGLE_CLOUD_PROJECT = os.environ.get("GOOGLE_CLOUD_PROJECT")
GOOGLE_CLOUD_LOCATION = os.environ.get("GOOGLE_CLOUD_LOCATION", "us-central1")
BUCKET_URI = os.environ.get("BUCKET_URI")
def validate_config():
"""Validate required configuration is set."""
required = {
"GOOGLE_CLOUD_PROJECT": GOOGLE_CLOUD_PROJECT,
}
missing = [k for k, v in required.items() if not v]
if missing:
raise ValueError(
f"Missing required environment variables: {', '.join(missing)}. "
"Check your .env file."
)
return True
Tạo Trình quản lý bộ nhớ
Mỗi tác nhân sử dụng một Ngân hàng bộ nhớ để học tập giữa các phiên. Tạo src/planner_agent/services/memory_manager.py:
"""Memory Manager for Marathon Planner Agent.
Manages Memory Bank integration with custom topics.
Enables cross-session learning.
"""
import os
from typing import TYPE_CHECKING
from google.adk.memory import VertexAiMemoryBankService
from vertexai._genai.types import (
MemoryBankCustomizationConfig,
MemoryBankCustomizationConfigMemoryTopic as MemoryTopic,
MemoryBankCustomizationConfigMemoryTopicCustomMemoryTopic as CustomMemoryTopic,
)
if TYPE_CHECKING:
from google.adk.agents.callback_context import CallbackContext
# Custom memory topics for cross-session learning
PLANNING_HISTORY = MemoryTopic(
custom_memory_topic=CustomMemoryTopic(
label="planning_history",
description="""Track marathon plans generated across sessions.
Extract: City, date, key route decisions, final score, iteration count.
Format: "Plan: city={city}, date={date}, score={score}, iterations={n}"
""",
)
)
USER_PREFERENCES = MemoryTopic(
custom_memory_topic=CustomMemoryTopic(
label="user_preferences",
description="""Track user preferences and constraints across sessions.
Extract: Preferred themes, budget priorities, scale, city preferences.
Format: "Preference: type={type}, value={value}"
""",
)
)
ROUTE_PATTERNS = MemoryTopic(
custom_memory_topic=CustomMemoryTopic(
label="route_patterns",
description="""Track successful route patterns and lessons learned.
Extract: Route segments with positive feedback, landmark combinations.
Format: "Route: city={city}, pattern={pattern}, outcome={outcome}"
""",
)
)
LOGISTICS_INSIGHTS = MemoryTopic(
custom_memory_topic=CustomMemoryTopic(
label="logistics_insights",
description="""Track logistics decisions and capacity learnings.
Extract: Water station placement, venue capacity, crowd management.
Format: "Logistics: category={cat}, insight={insight}"
""",
)
)
def create_marathon_planner_memory_topics() -> MemoryBankCustomizationConfig:
"""Create Memory Bank customization config with custom topics."""
return MemoryBankCustomizationConfig(
memory_topics=[
PLANNING_HISTORY, USER_PREFERENCES,
ROUTE_PATTERNS, LOGISTICS_INSIGHTS,
]
)
def create_memory_service(
project: str | None = None,
location: str | None = None,
agent_engine_id: str | None = None,
) -> VertexAiMemoryBankService | None:
"""Create a VertexAiMemoryBankService.
Returns None if agent_engine_id is not set (local development).
"""
project = project or os.environ.get("GOOGLE_CLOUD_PROJECT")
location = location or (
os.environ.get("AGENT_ENGINE_LOCATION")
or os.environ.get("GOOGLE_CLOUD_LOCATION", "us-central1")
)
agent_engine_id = agent_engine_id or os.environ.get("AGENT_ENGINE_ID")
if not project:
raise ValueError("GOOGLE_CLOUD_PROJECT environment variable required")
if not agent_engine_id:
return None
return VertexAiMemoryBankService(
project=project, location=location,
agent_engine_id=agent_engine_id,
)
async def auto_save_memories(callback_context: "CallbackContext") -> None:
"""Automatically save session to Memory Bank after agent responds."""
project_id = os.environ.get("GOOGLE_CLOUD_PROJECT")
location = (
os.environ.get("AGENT_ENGINE_LOCATION")
or os.environ.get("GOOGLE_CLOUD_LOCATION", "us-central1")
)
agent_engine_id = os.environ.get("AGENT_ENGINE_ID")
if not agent_engine_id:
return
try:
memory_service = VertexAiMemoryBankService(
project=project_id, location=location,
agent_engine_id=agent_engine_id,
)
await memory_service.add_session_to_memory(
callback_context._invocation_context.session
)
except Exception as e:
print(f"Warning: Failed to save memories: {e}")
Lệnh gọi lại auto_save_memories chạy sau mỗi phản hồi của tác nhân, duy trì cuộc trò chuyện vào Ngân hàng bộ nhớ khi được triển khai vào Agent Engine. Trong quá trình phát triển cục bộ, lệnh gọi lại này sẽ bỏ qua một cách nhẹ nhàng nếu AGENT_ENGINE_ID không được đặt.
Tạo Trình quản lý phiên
Tạo src/planner_agent/services/session_manager.py – quản lý vòng đời phiên bằng tính năng lưu vào bộ nhớ đệm TTL:
"""Session Manager — manages session lifecycle with TTL-based caching."""
import os
import time
from typing import Any
from google.adk.sessions import InMemorySessionService, VertexAiSessionService
class TTLCache:
"""Simple TTL cache for mapping A2A context_id to session_id."""
def __init__(self, maxsize: int = 1000, ttl: int = 3600):
self._cache: dict[str, tuple[Any, float]] = {}
self._maxsize = maxsize
self._ttl = ttl
def get(self, key: str) -> Any | None:
if key in self._cache:
value, timestamp = self._cache[key]
if time.time() - timestamp < self._ttl:
return value
else:
del self._cache[key]
return None
def set(self, key: str, value: Any) -> None:
if len(self._cache) >= self._maxsize:
self._evict_oldest()
self._cache[key] = (value, time.time())
def _evict_oldest(self) -> None:
if not self._cache:
return
sorted_keys = sorted(
self._cache.keys(), key=lambda k: self._cache[k][1]
)
for key in sorted_keys[: max(1, len(sorted_keys) // 10)]:
del self._cache[key]
def __contains__(self, key: str) -> bool:
return self.get(key) is not None
def create_session_service(
project: str | None = None,
location: str | None = None,
agent_engine_id: str | None = None,
use_vertex: bool | None = None,
) -> VertexAiSessionService | InMemorySessionService:
"""Create appropriate session service based on environment.
Returns VertexAiSessionService for production, InMemorySessionService for local dev.
"""
project = project or os.environ.get("GOOGLE_CLOUD_PROJECT")
location = location or (
os.environ.get("AGENT_ENGINE_LOCATION")
or os.environ.get("GOOGLE_CLOUD_LOCATION", "us-central1")
)
agent_engine_id = agent_engine_id or os.environ.get("AGENT_ENGINE_ID")
if use_vertex is None:
use_vertex = agent_engine_id is not None
if use_vertex:
if not project:
raise ValueError("GOOGLE_CLOUD_PROJECT environment variable required")
return VertexAiSessionService(
project=project, location=location,
agent_engine_id=agent_engine_id,
)
else:
return InMemorySessionService()
class SessionManager:
"""Manages sessions with A2A context mapping and TTL caching."""
def __init__(self, session_service=None, cache_maxsize=1000, cache_ttl=3600):
self.session_service = session_service or create_session_service()
self.session_cache = TTLCache(maxsize=cache_maxsize, ttl=cache_ttl)
async def get_or_create_session(self, context_id, app_name, user_id):
cached = self.session_cache.get(context_id)
if cached:
return cached
session = await self.session_service.create_session(
app_name=app_name, user_id=user_id,
)
self.session_cache.set(context_id, session.id)
return session.id
Bây giờ, hãy sao chép các tệp dịch vụ này cho các tác nhân khác – chúng tuân theo cùng một mẫu khung với các chủ đề bộ nhớ dành riêng cho tác nhân:
cp src/planner_agent/services/session_manager.py src/simulator_agent/services/session_manager.py
cp src/planner_agent/services/session_manager.py src/planner_agent/evaluator/services/session_manager.py
Xác minh cấu trúc dự án
find src -type f -name "*.py" | sort
Bạn sẽ thấy cấu hình dùng chung, trình quản lý bộ nhớ, trình quản lý phiên và các tệp __init__.py trên cả 3 gói tác nhân.
4. Tạo tác nhân Trình đánh giá
Tác nhân Trình đánh giá là người đánh giá chất lượng của hệ thống. Tác nhân này chấm điểm các kế hoạch chạy marathon theo 7 tiêu chí bằng Vertex AI Evaluation với các chỉ số LLM-as-Judge tuỳ chỉnh. Đây là nơi bạn thấy được sức mạnh của việc kết hợp khả năng đánh giá của Gemini với các tiêu chí đánh giá có cấu trúc.
Xác định giản đồ đánh giá
Tạo src/planner_agent/evaluator/schemas.py – các mô hình Pydantic xác định định dạng đầu ra có cấu trúc:
"""Schemas for the Evaluator Agent.
Defines structured output formats for plan evaluation.
"""
from pydantic import BaseModel, Field
EVALUATION_CRITERIA = [
"safety_compliance",
"community_impact",
"logistics_completeness",
"financial_viability",
"participant_experience",
"intent_alignment",
"distance_compliance",
]
class EvaluationFinding(BaseModel):
"""A specific finding from plan evaluation."""
criterion: str = Field(
description=(
f"Which evaluation criterion triggered this finding. "
f"Must be one of: {', '.join(EVALUATION_CRITERIA)}"
),
)
severity: str = Field(
description="Severity of the finding: 'high', 'medium', or 'low'",
)
description: str = Field(
description="Description of the finding and why it matters",
)
class EvaluationResult(BaseModel):
"""Structured output from the Evaluator Agent."""
passed: bool = Field(
description="Whether the plan passes evaluation (overall_score >= 85)",
)
overall_score: float = Field(
ge=0.0, le=100.0,
description="Weighted average score across all criteria (0.0 to 100.0)",
)
scores: dict[str, float] = Field(
default_factory=dict,
description="Per-criterion scores: {criterion_name: score}",
)
findings: list[EvaluationFinding] = Field(
default_factory=list,
description="List of specific evaluation findings",
)
improvement_suggestions: list[str] = Field(
default_factory=list,
description="Actionable suggestions to improve the plan score",
)
iteration_number: int = Field(
default=1, ge=1,
description="Which evaluation iteration this is",
)
summary: str = Field(
default="",
description="Brief natural-language summary of the evaluation",
)
Việc sử dụng output_schema=EvaluationResult trên tác nhân đảm bảo mọi phản hồi đều khớp với cấu trúc này – không cần phân tích cú pháp.
Viết hướng dẫn đánh giá
Tạo src/planner_agent/evaluator/instruction.md:
You are the Evaluator Agent — the quality judge for marathon plans.
Your role is to evaluate marathon plans across multiple criteria and provide actionable feedback so the planning team can iteratively improve the plan. You use a multi-step "Chain of Thought" process to ensure your evaluation is thorough, fair, and constructive.
## Phase 1: Evaluation Methodology
When you receive a plan, evaluate it across the following 7 criteria. For each, you must derive a raw score (1-100) based on the specific checks below:
1. **Participant Experience (15% weight)**
- **Checks**: Route scenic quality, landmarks, surface variety, amenities.
- **Rubric**: 100: Outstanding landmarks/view; 1: Boring/unpleasant experience.
2. **Intent Alignment (10% weight)**
- **Checks**: City match, theme match (scenic/fast), scale, budget goals.
- **Rubric**: 100: Perfectly aligned; 1: Completely misaligned.
3. **Distance Compliance (5% weight)**
- **Checks**: Exactly 26.2 miles or 42.195 km.
- **Rubric**: 100: Exactly correct; 1: Wrong distance.
4. **Safety Compliance (20% weight)**
- **Checks**: Emergency vehicle access, crowd management.
- **Rubric**: 100: Fully safe, hospital access prioritized; 60: Some concerns; 1: Dangerous blockages.
5. **Logistics Completeness (20% weight)**
- **Checks**: Timing systems, course marshals, infrastructure.
- **Rubric**: 100: Comprehensive; 60: Significant gaps; 1: Critical logistics missing.
6. **Community Impact (15% weight)**
- **Checks**: Noise disruption, neighborhood equity, residential/business access.
- **Rubric**: 100: Community benefits/event integration; 1: Major negative impact.
7. **Financial Viability (15% weight)**
- **Checks**: Budget balance, revenue (sponsorships), realistic cost estimates.
- **Rubric**: 100: Strong/sustainable; 1: Major financial gaps/un-viable.
**Action**: Use the `evaluate_plan` tool to get computed scores from the **Vertex AI Evaluation API**. Use these scores as your baseline.
## Phase 2: Score Interpretation
- **Pass Threshold**: A plan passes ONLY if `overall_score >= 85` AND there are no High-Severity findings.
- **Severity Mapping**: Score < 40 → High, < 60 → Medium, < 80 → Low.
## Phase 3: Improvement Strategy
Generate actionable suggestions for any criterion scoring below 80. Provide 3-5 prioritized improvements. Be specific: "Add course marshals at miles 12, 14, and 16" instead of "Add more marshals".
Return a structured `EvaluationResult` with your assessment.
Tạo src/planner_agent/evaluator/prompts.py:
"""Prompts for the Evaluator Agent."""
import pathlib
_PROMPT_DIR = pathlib.Path(__file__).parent
INSTRUCTION = (_PROMPT_DIR / "instruction.md").read_text()
Xây dựng các công cụ đánh giá bằng Vertex AI
Đây là cốt lõi của Trình đánh giá – các chỉ số tuỳ chỉnh bằng MetricPromptBuilder. Tạo src/planner_agent/evaluator/tools.py:
"""Tools for the Evaluator Agent.
Provides tools for evaluating marathon plans using Vertex AI Evaluation
with custom metrics. Uses a hybrid approach: LLM-based evaluation with
custom rubrics, plus deterministic checks, with heuristic fallback.
"""
import asyncio
import json
import os
import re
from typing import Any
import logging
import pandas as pd
import vertexai
from google.genai import types as genai_types
from vertexai import types
from .agent import CRITERION_WEIGHTS, SEVERITY_THRESHOLDS, MODEL
from .schemas import EvaluationResult
logger = logging.getLogger(__name__)
def _get_model_resource() -> str:
"""Get the full resource path for the Vertex AI evaluation model."""
project_id = os.environ.get("GOOGLE_CLOUD_PROJECT")
location = os.environ.get("GOOGLE_CLOUD_LOCATION", "us-central1")
return (
f"projects/{project_id}/locations/{location}/publishers/google/models/{MODEL}"
if project_id else MODEL
)
# ============================================================================
# CUSTOM METRICS — each uses MetricPromptBuilder for structured rubrics
# ============================================================================
def _create_safety_compliance_metric() -> types.LLMMetric:
"""Evaluate emergency corridor access and crowd safety."""
builder = types.MetricPromptBuilder(
metric_definition=(
"Evaluate whether the proposed marathon route maintains emergency "
"vehicle access and crowd safety. Check for blocked hospitals, fire stations, "
"and major emergency corridors."
),
criteria={
"Emergency corridor access": (
"The route does not permanently block access to hospitals, "
"fire stations, or police stations without providing clear detour routes."
),
"Evacuation routes": (
"Major evacuation routes remain accessible or have documented alternatives."
),
"Emergency vehicle passage": (
"The plan includes provisions for emergency vehicle crossing "
"at regular intervals along the route."
),
},
rating_scores={
"1": "Dangerous - Major emergency corridors blocked with no detours",
"25": "Unsafe - Several emergency access points compromised",
"50": "Concerning - Some emergency access issues that need resolution",
"75": "Mostly safe - Minor emergency access concerns, easy to fix",
"100": "Fully safe - Emergency access maintained throughout",
},
)
return types.LLMMetric(
name="safety_compliance",
prompt_template=str(builder),
judge_model=_get_model_resource(),
)
def _create_community_impact_metric() -> types.LLMMetric:
"""Evaluate community disruption, inclusivity, and neighborhood equity."""
builder = types.MetricPromptBuilder(
metric_definition=(
"Evaluate the marathon plan's impact on the local community. "
"Check for noise disruption, residential access, business impact, "
"inclusivity, and equitable neighborhood treatment."
),
criteria={
"Residential disruption": (
"The plan minimizes disruption to residential areas, with "
"reasonable timing and notification plans."
),
"Business access": (
"Local businesses along the route can still operate or have "
"been accommodated with alternative access."
),
"Neighborhood equity": (
"The route and plan are accessible to diverse communities "
"and do not disproportionately burden any demographic group."
),
"Community engagement": (
"The plan includes cheer zones, community events, or other "
"ways to involve and benefit local residents."
),
},
rating_scores={
"1": "Harmful - Major negative impact on community with no mitigation",
"25": "Disruptive - Significant community issues that need resolution",
"50": "Mixed - Some community concerns that need attention",
"75": "Considerate - Minor community impacts with good mitigation",
"100": "Excellent - Community benefits from the event",
},
)
return types.LLMMetric(
name="community_impact",
prompt_template=str(builder),
judge_model=_get_model_resource(),
)
def _create_logistics_completeness_metric() -> types.LLMMetric:
"""Evaluate logistics coverage: water stations, timing, marshals."""
builder = types.MetricPromptBuilder(
metric_definition=(
"Evaluate whether the marathon plan's logistics are complete and "
"adequate for the expected scale."
),
criteria={
"Water and nutrition": "Hydration and nutrition are considered for the expected participant count.",
"Timing and tracking": "Timing systems (chip timing, mats) are specified.",
"Course management": "Sufficient course marshals, signage, barriers, and traffic control are planned.",
"Start/finish infrastructure": "Start/finish areas have adequate capacity for the participant count.",
},
rating_scores={
"1": "Incomplete - Critical logistics missing",
"25": "Sparse - Major logistics gaps",
"50": "Partial - Some logistics covered but significant gaps remain",
"75": "Mostly complete - Minor logistics details to finalize",
"100": "Comprehensive - All logistics thoroughly planned",
},
)
return types.LLMMetric(
name="logistics_completeness",
prompt_template=str(builder),
judge_model=_get_model_resource(),
)
def _create_financial_viability_metric() -> types.LLMMetric:
"""Evaluate budget balance, revenue sources, and cost control."""
builder = types.MetricPromptBuilder(
metric_definition=(
"Evaluate the financial viability of the marathon plan. "
"Check budget balance, revenue sources, cost estimates, and sustainability."
),
criteria={
"Budget balance": "Revenue projections meet or exceed estimated costs.",
"Cost estimates": "Cost estimates are realistic and cover major expense categories.",
"Revenue diversity": "Revenue comes from multiple sources, not just registration fees.",
},
rating_scores={
"1": "Unviable - Major financial gaps",
"25": "Risky - Budget concerns that could threaten viability",
"50": "Uncertain - Financial plan needs more detail",
"75": "Viable - Sound financial plan with minor gaps",
"100": "Strong - Well-balanced budget with diverse revenue streams",
},
)
return types.LLMMetric(
name="financial_viability",
prompt_template=str(builder),
judge_model=_get_model_resource(),
)
def _create_participant_experience_metric() -> types.LLMMetric:
"""Evaluate route quality, scenic value, and runner amenities."""
builder = types.MetricPromptBuilder(
metric_definition=(
"Evaluate the participant experience of a proposed marathon plan."
),
criteria={
"Route Quality": "The route surface is suitable and elevation profile appropriate.",
"Scenic Value": "The route passes through interesting or landmark areas.",
"Runner Amenities": "Post-race amenities (food, medals, recovery area) are planned.",
},
rating_scores={
"1": "Poor experience - Boring route, no amenities",
"50": "Average - Adequate but unremarkable",
"100": "Excellent - Outstanding experience that runners will remember",
},
)
return types.LLMMetric(
name="participant_experience",
prompt_template=str(builder),
judge_model=_get_model_resource(),
)
def _create_intent_alignment_metric() -> types.LLMMetric:
"""Evaluate whether the plan matches the user's original request."""
builder = types.MetricPromptBuilder(
metric_definition=(
"Evaluate whether a proposed marathon plan matches the user's original intent."
),
criteria={
"City Match": "The plan takes place in the requested city.",
"Theme Match": "The plan matches the desired theme (scenic, fast, charity, etc.).",
"Scale Match": "The plan's scale matches the user's vision.",
},
rating_scores={
"1": "Completely misaligned",
"50": "Partially aligned - Key requirements missed",
"100": "Perfectly aligned - All requirements addressed",
},
)
return types.LLMMetric(
name="intent_alignment",
prompt_template=str(builder),
judge_model=_get_model_resource(),
)
def _check_distance_compliance_logic(response_text: str) -> dict:
"""Deterministic check for 26.2 mile (42.195 km) marathon distance."""
text_lower = response_text.lower()
score = 100.0
issues = []
for distance_str in re.findall(r'(\d+(?:\.\d+)?)\s*(?:miles?|mi)\b', text_lower):
distance = float(distance_str)
if 20 <= distance <= 30:
deviation = abs(distance - 26.2)
if deviation > 0.5:
score = 1.0
issues.append(f"Route distance {distance} miles deviates from 26.2 mile standard")
elif deviation > 0.1:
score = 60.0
issues.append(f"Route distance {distance} miles is close but not exactly 26.2 miles")
for distance_str in re.findall(r'(\d+(?:\.\d+)?)\s*(?:kilometers?|km)\b', text_lower):
distance = float(distance_str)
if 35 <= distance <= 50 and abs(distance - 42.195) > 1.0:
score = 1.0
issues.append(f"Route distance {distance} km deviates from 42.195 km standard")
return {"score": score, "explanation": "; ".join(issues) if issues else "No distance issues detected"}
def _create_distance_compliance_metric() -> types.Metric:
"""Deterministic check for marathon distance — no LLM needed."""
def check_distance_compliance(instance: dict) -> dict:
response_text = instance["response"]["parts"][0]["text"]
return _check_distance_compliance_logic(response_text)
return types.Metric(
name="distance_compliance",
custom_function=check_distance_compliance,
)
# ============================================================================
# MAIN EVALUATION TOOL
# ============================================================================
async def evaluate_plan(evaluation_request: str) -> dict[str, Any]:
"""Evaluate a proposed marathon plan across quality criteria.
Uses Vertex AI Evaluation with 7 custom metrics.
Falls back to heuristic evaluation if API fails.
Args:
evaluation_request: JSON string with user_intent and proposed_plan
Returns:
dict with evaluation results
"""
try:
request_data = json.loads(evaluation_request)
except json.JSONDecodeError as e:
return {
"passed": False, "scores": {}, "overall_score": 0.0,
"findings": [{"criterion": "intent_alignment",
"description": f"Invalid JSON input: {e}", "severity": "high"}],
"improvement_suggestions": ["Provide valid JSON with 'user_intent' and 'proposed_plan' fields."],
"eval_method": "error",
}
user_intent_raw = request_data.get("user_intent", "Unknown intent")
proposed_plan_raw = request_data.get("proposed_plan", "No plan provided")
user_intent = json.dumps(user_intent_raw) if not isinstance(user_intent_raw, str) else user_intent_raw
proposed_plan = json.dumps(proposed_plan_raw) if not isinstance(proposed_plan_raw, str) else proposed_plan_raw
project_id = os.environ.get("GOOGLE_CLOUD_PROJECT")
location = os.environ.get("GOOGLE_CLOUD_LOCATION", "us-central1")
if project_id:
try:
scores, details = await _run_custom_eval(
project_id=project_id, location=location,
user_intent=user_intent, proposed_plan=proposed_plan,
)
return _build_result(scores, details, eval_method="vertex_ai_eval")
except Exception as e:
logger.warning(f"Vertex AI Eval failed, using heuristics: {e}")
scores, details = _heuristic_eval(user_intent, proposed_plan)
return _build_result(scores, details, eval_method="heuristic")
async def _run_custom_eval(
project_id: str, location: str,
user_intent: str, proposed_plan: str,
) -> tuple[dict[str, float], dict[str, str]]:
"""Run Vertex AI Evaluation with custom metrics."""
vertexai.init(project=project_id, location=location)
client = vertexai.Client(
project=project_id, location=location,
http_options=genai_types.HttpOptions(api_version="v1beta1"),
)
df = pd.DataFrame({"prompt": [user_intent], "response": [proposed_plan]})
metrics = [
_create_safety_compliance_metric(),
_create_community_impact_metric(),
_create_logistics_completeness_metric(),
_create_financial_viability_metric(),
_create_participant_experience_metric(),
_create_intent_alignment_metric(),
_create_distance_compliance_metric(),
]
result = client.evals.evaluate(dataset=df, metrics=metrics)
scores, details = {}, {}
for case in result.eval_case_results:
for cand in case.response_candidate_results:
for metric_name, metric_result in cand.metric_results.items():
raw_score = metric_result.score if hasattr(metric_result, "score") and metric_result.score is not None else 50.0
scores[metric_name] = round(float(raw_score), 2)
if hasattr(metric_result, "explanation") and metric_result.explanation:
details[metric_name] = metric_result.explanation
return scores, details
def _heuristic_eval(user_intent: str, proposed_plan: str) -> tuple[dict[str, float], dict[str, str]]:
"""Heuristic evaluation fallback when Vertex AI Eval is unavailable."""
plan_lower = proposed_plan.lower()
scores, details = {}, {}
# Safety
safety_score = 80.0
if "emergency" in plan_lower and "no detour" in plan_lower:
safety_score = 20.0
if "emergency vehicle" in plan_lower:
safety_score = min(safety_score + 10.0, 100.0)
scores["safety_compliance"] = safety_score
details["safety_compliance"] = "Heuristic safety check"
# Community
community_score = 75.0
if any(kw in plan_lower for kw in ["cheer zone", "community"]):
community_score = min(community_score + 10.0, 100.0)
scores["community_impact"] = community_score
details["community_impact"] = "Heuristic community check"
# Logistics
logistics_score = 60.0
for kw in ["hydration", "timing", "chip", "marshal"]:
if kw in plan_lower:
logistics_score += 10.0
scores["logistics_completeness"] = min(logistics_score, 100.0)
details["logistics_completeness"] = "Heuristic logistics check"
# Financial
financial_score = 65.0
for kw in ["budget", "cost", "revenue", "sponsor"]:
if kw in plan_lower:
financial_score += 8.0
scores["financial_viability"] = min(financial_score, 100.0)
details["financial_viability"] = "Heuristic financial check"
# Experience
experience_score = 70.0
for kw in ["scenic", "landmark", "medal", "post-race"]:
if kw in plan_lower:
experience_score += 8.0
scores["participant_experience"] = min(experience_score, 100.0)
details["participant_experience"] = "Heuristic experience check"
# Intent
scores["intent_alignment"] = 70.0
details["intent_alignment"] = "Heuristic intent check"
# Distance
distance_result = _check_distance_compliance_logic(proposed_plan)
scores["distance_compliance"] = float(distance_result["score"])
details["distance_compliance"] = distance_result["explanation"]
return scores, details
def _build_result(scores, details, eval_method):
"""Build the final evaluation result from scores and details."""
findings, improvement_suggestions = [], []
for criterion, score in scores.items():
if score < 80.0:
severity = "high" if score < SEVERITY_THRESHOLDS["high"] else "medium" if score < SEVERITY_THRESHOLDS["medium"] else "low"
findings.append({"criterion": criterion, "description": details.get(criterion, f"Score: {score}"), "severity": severity})
improvement_suggestions.append(_suggest_improvement(criterion, score, details.get(criterion, "")))
overall_score = round(sum(scores.get(c, 50.0) * w for c, w in CRITERION_WEIGHTS.items()), 2)
passed = overall_score >= 85.0 and not any(f["severity"] == "high" for f in findings)
return {"passed": passed, "scores": scores, "findings": findings,
"improvement_suggestions": improvement_suggestions,
"overall_score": overall_score, "eval_method": eval_method}
def _suggest_improvement(criterion, score, detail):
suggestions = {
"safety_compliance": "Add emergency vehicle crossing points every 2 miles and plan detour routes around hospitals.",
"community_impact": "Include cheer zones, notify affected residents, and ensure equitable route distribution.",
"logistics_completeness": "Add timing chip details, plan course marshal positions, and detail start/finish infrastructure.",
"financial_viability": "Add budget breakdown with cost estimates and identify 3+ revenue sources.",
"participant_experience": "Highlight scenic landmarks and detail post-race amenities (medals, food, recovery).",
"intent_alignment": "Review the user's original request and ensure the plan matches their stated requirements.",
"distance_compliance": "Verify the route is exactly 26.2 miles (42.195 km) for marathon certification.",
}
return suggestions.get(criterion, f"Improve {criterion} (current score: {score:.2f})")
Mẫu chính ở đây là đánh giá kết hợp: công cụ này sẽ thử Vertex AI Eval với các chỉ số LLM-as-Judge trước tiên, sau đó quay lại kiểm tra theo heuristic nếu API không có sẵn.
Kết nối tác nhân Trình đánh giá
Tạo src/planner_agent/evaluator/agent.py:
"""Evaluator Agent — scores marathon plans using Vertex AI Evaluation.
Uses gemini-3.1-pro-preview for best evaluation quality.
"""
import os
import vertexai
from google.adk.agents import LlmAgent
from google.adk.tools.preload_memory_tool import PreloadMemoryTool
from .prompts import INSTRUCTION
from .services.memory_manager import auto_save_memories
from .schemas import EvaluationResult
# Agent identity
AGENT_NAME = "evaluator_agent"
AGENT_DESCRIPTION = (
"Evaluates marathon plans across multiple quality criteria using Vertex AI "
"Evaluation with custom metrics. Acts as LLM-as-Judge to score plans and "
"provide actionable feedback for iterative improvement."
)
# Model — use gemini-3.1-pro-preview for evaluation accuracy
MODEL = os.getenv("EVALUATOR_MODEL", "gemini-3.1-pro-preview")
# Structured output schema — guarantees every response matches EvaluationResult
OUTPUT_SCHEMA = EvaluationResult
# Criterion weights must sum to 1.0
CRITERION_WEIGHTS = {
"safety_compliance": 0.20,
"community_impact": 0.15,
"logistics_completeness": 0.20,
"financial_viability": 0.15,
"participant_experience": 0.15,
"intent_alignment": 0.10,
"distance_compliance": 0.05,
}
SEVERITY_THRESHOLDS = {"high": 40.0, "medium": 60.0, "low": 80.0}
# Initialize Vertex AI
project_id = os.environ.get("GOOGLE_CLOUD_PROJECT")
location = os.environ.get("GOOGLE_CLOUD_LOCATION", "us-central1")
if project_id:
vertexai.init(project=project_id, location=location)
from google.genai.types import GenerateContentConfig, ThinkingConfig
from .tools import evaluate_plan
# Enable thinking for better evaluation reasoning
evaluator_config = GenerateContentConfig(max_output_tokens=4096)
if "pro" in MODEL:
evaluator_config.thinking_config = ThinkingConfig(thinking_budget=1024)
evaluator_agent = LlmAgent(
name="evaluator_agent",
model=MODEL,
description=AGENT_DESCRIPTION,
static_instruction=INSTRUCTION,
output_schema=OUTPUT_SCHEMA,
generate_content_config=evaluator_config,
include_contents='none',
tools=[PreloadMemoryTool(), evaluate_plan],
after_agent_callback=auto_save_memories,
)
root_agent = evaluator_agent
Bây giờ, hãy tạo trình quản lý bộ nhớ của Trình đánh giá tại src/planner_agent/evaluator/services/memory_manager.py – trình quản lý này theo dõi nhật ký đánh giá và xu hướng chấm điểm:
"""Memory Manager for Evaluator Agent."""
import os
from typing import TYPE_CHECKING
from google.adk.memory import VertexAiMemoryBankService
from vertexai._genai.types import (
MemoryBankCustomizationConfig,
MemoryBankCustomizationConfigMemoryTopic as MemoryTopic,
MemoryBankCustomizationConfigMemoryTopicCustomMemoryTopic as CustomMemoryTopic,
)
if TYPE_CHECKING:
from google.adk.agents.callback_context import CallbackContext
EVALUATION_HISTORY = MemoryTopic(
custom_memory_topic=CustomMemoryTopic(
label="evaluation_history",
description="""Track evaluation results: scores, pass/fail verdicts,
findings count, and iteration numbers.""",
)
)
SCORING_TRENDS = MemoryTopic(
custom_memory_topic=CustomMemoryTopic(
label="scoring_trends",
description="""Track which criteria consistently score low and
average scores by category.""",
)
)
def create_memory_service(project=None, location=None, agent_engine_id=None):
project = project or os.environ.get("GOOGLE_CLOUD_PROJECT")
location = location or os.environ.get("AGENT_ENGINE_LOCATION") or os.environ.get("GOOGLE_CLOUD_LOCATION", "us-central1")
agent_engine_id = agent_engine_id or os.environ.get("AGENT_ENGINE_ID")
if not project:
raise ValueError("GOOGLE_CLOUD_PROJECT required")
if not agent_engine_id:
return None
return VertexAiMemoryBankService(project=project, location=location, agent_engine_id=agent_engine_id)
async def auto_save_memories(callback_context: "CallbackContext") -> None:
agent_engine_id = os.environ.get("AGENT_ENGINE_ID")
if not agent_engine_id:
return
try:
svc = VertexAiMemoryBankService(
project=os.environ.get("GOOGLE_CLOUD_PROJECT"),
location=os.environ.get("AGENT_ENGINE_LOCATION") or os.environ.get("GOOGLE_CLOUD_LOCATION", "us-central1"),
agent_engine_id=agent_engine_id,
)
await svc.add_session_to_memory(callback_context._invocation_context.session)
except Exception as e:
print(f"Warning: Failed to save memories: {e}")
Cuối cùng, hãy thiết lập các lệnh xuất gói trình đánh giá. Thay thế src/planner_agent/evaluator/__init__.py:
try:
from .agent import root_agent, AGENT_NAME, MODEL
__all__ = ["root_agent", "AGENT_NAME", "MODEL"]
except Exception:
__all__ = ["root_agent"]
5. Tạo tác nhân Bộ điều khiển mô phỏng
Bộ điều khiển mô phỏng là người kiểm soát cuối cùng – tác nhân này thực hiện kiểm tra nhanh các điều kiện tiên quyết để xác nhận rằng một kế hoạch có đủ dữ liệu để chạy mô phỏng mà không cần đánh giá lại chất lượng.
Xác định giản đồ phê duyệt
Tạo src/simulator_agent/agent/schemas.py:
"""Schemas for the Simulation Controller Agent."""
from pydantic import BaseModel, Field
class SimulationApproval(BaseModel):
"""Structured output from the Simulation Controller Agent."""
approved: bool = Field(description="Whether the plan is approved for simulation")
overall_readiness: float = Field(ge=0.0, le=1.0, description="Readiness score (0-1)")
route_feasibility: str = Field(description="'feasible', 'marginal', or 'infeasible'")
logistics_readiness: str = Field(description="'ready', 'partial', or 'not_ready'")
safety_clearance: str = Field(description="'cleared', 'conditional', or 'blocked'")
blockers: list[str] = Field(default_factory=list, description="Blocking issues")
recommendations: list[str] = Field(default_factory=list, description="Non-blocking suggestions")
summary: str = Field(default="", description="Summary of the decision")
Tạo công cụ kiểm tra chỉ số sẵn sàng
Tạo src/simulator_agent/agent/tools.py – một danh sách kiểm tra xác định, không cần LLM:
"""Tools for the Simulation Controller Agent."""
import re
from typing import Any
async def check_plan_readiness(plan_text: str) -> dict[str, Any]:
"""Check a marathon plan for simulation readiness.
Performs deterministic checks on the plan text to identify
missing elements and assess completeness.
"""
plan_lower = plan_text.lower()
checklist = {
"distance_specified": bool(
re.search(r'(\d+(?:\.\d+)?)\s*(?:miles?|mi|kilometers?|km)\b', plan_lower)
),
"water_stations": "water station" in plan_lower,
"medical_tents": any(kw in plan_lower for kw in ["medical tent", "first aid", "medical station"]),
"timing_system": any(kw in plan_lower for kw in ["timing", "chip", "tracking"]),
"start_finish": any(kw in plan_lower for kw in ["start line", "finish line", "start/finish", "starting area"]),
"emergency_access": any(kw in plan_lower for kw in ["emergency", "ambulance", "evacuation"]),
"budget_included": any(kw in plan_lower for kw in ["budget", "cost", "revenue", "expense"]),
"timeline_included": any(kw in plan_lower for kw in ["schedule", "timeline", "race day", "setup"]),
}
passed = sum(checklist.values())
total = len(checklist)
return {
"checklist": checklist,
"missing_elements": [k for k, v in checklist.items() if not v],
"readiness_score": round(passed / total, 2),
"passed_checks": passed,
"total_checks": total,
}
def get_tools() -> list:
return [check_plan_readiness]
Tạo Kỹ năng ADK
Kỹ năng ADK cung cấp kiến thức về quy trình mà tác nhân tải trong thời gian chạy. Tạo src/simulator_agent/skills/review-marathon-plan/SKILL.md:
---
name: review-marathon-plan
description: Step-by-step methodology for reviewing marathon plans for simulation readiness, covering route feasibility, logistics completeness, and safety clearance.
---
# Review Marathon Plan
## Purpose
Systematically confirm that a marathon plan is ready for simulation execution by verifying the presence of all required prerequisite data.
## Procedure
### Step 1: Prerequisite Data Verification
Use the `check_plan_readiness` tool to confirm the presence of:
1. **Route Data**: Waypoints, landmarks, distance (26.2 mi / 42.195 km), start/finish locations.
2. **Logistics Data**: Water stations, medical tents, timing systems, participant scale.
3. **Safety Data**: Emergency access routes, evacuation plans, crowd management.
### Step 2: Approval Decision
1. Set `approved=true` ONLY if all three data categories are present.
2. If any critical data is missing, set `approved=false` and list missing elements in `blockers`.
3. Provide `recommendations` for minor data gaps.
Kết nối tác nhân Bộ điều khiển mô phỏng
Tạo src/simulator_agent/agent/config.py:
"""Configuration for the Simulation Controller Agent."""
import os
from .schemas import SimulationApproval
AGENT_NAME = "simulator_agent"
AGENT_DESCRIPTION = (
"Simulation Controller Agent. Reviews marathon plans for simulation readiness, "
"assessing route feasibility, logistics completeness, and safety clearance."
)
MODEL = os.getenv("SIMULATOR_MODEL", "gemini-3-flash-preview")
OUTPUT_SCHEMA = SimulationApproval
Tạo src/simulator_agent/agent/prompts.py:
"""Prompts for the Simulation Controller Agent."""
INSTRUCTION = """You are the Simulation Controller Agent — the final gatekeeper before a marathon plan enters simulation.
Your role is to perform a fast "Simulation Prerequisite Check" on marathon plans. You do NOT evaluate quality (the Evaluator Agent does that). You simply confirm the plan has all data required for the simulation engine.
## Available ADK Skills
1. **review-marathon-plan** — Check for simulation prerequisites (Route, Logistics, Safety).
## Prerequisite Check
1. **Route Data**: Waypoints, distance (26.2 mi / 42.195 km), start/finish locations.
2. **Logistics Data**: Timing and registration infrastructure, participant count.
3. **Safety Clearance**: Emergency access, evacuation plan, crowd management.
## Approval Decision
- **Approved** (`approved=true`): All prerequisite data present.
- **Rejected** (`approved=false`): Critical data missing. List in `blockers`.
Always return a structured SimulationApproval."""
Tạo src/simulator_agent/agent/agent.py:
"""Simulation Controller Agent — reviews plans for simulation readiness."""
import os
import pathlib
import vertexai
from google.adk.agents import LlmAgent
from google.adk.skills import load_skill_from_dir
from google.adk.tools.preload_memory_tool import PreloadMemoryTool
from google.adk.tools.skill_toolset import SkillToolset
from ..services.memory_manager import auto_save_memories
from .config import AGENT_DESCRIPTION, AGENT_NAME, MODEL, OUTPUT_SCHEMA
from .prompts import INSTRUCTION
from .tools import get_tools
# Initialize Vertex AI
project_id = os.environ.get("GOOGLE_CLOUD_PROJECT")
location = os.environ.get("GOOGLE_CLOUD_LOCATION", "us-central1")
if project_id:
vertexai.init(project=project_id, location=location)
# Load the review-marathon-plan ADK Skill
_skills_dir = pathlib.Path(__file__).parent.parent / "skills"
_review_skill = load_skill_from_dir(_skills_dir / "review-marathon-plan")
_skill_toolset = SkillToolset(skills=[_review_skill])
_tools = [_skill_toolset, PreloadMemoryTool(), *get_tools()]
from google.genai.types import GenerateContentConfig, ThinkingConfig
_agent_kwargs = dict(
name=AGENT_NAME,
model=MODEL,
description=AGENT_DESCRIPTION,
static_instruction=INSTRUCTION,
tools=_tools,
generate_content_config=GenerateContentConfig(
thinking_config=ThinkingConfig(thinking_budget=0),
max_output_tokens=4096,
),
after_agent_callback=auto_save_memories,
)
if OUTPUT_SCHEMA is not None:
_agent_kwargs["output_schema"] = OUTPUT_SCHEMA
root_agent = LlmAgent(**_agent_kwargs)
Bây giờ, hãy tạo trình quản lý bộ nhớ của Trình mô phỏng tại src/simulator_agent/services/memory_manager.py:
"""Memory Manager for Simulation Controller Agent."""
import os
from typing import TYPE_CHECKING
from google.adk.memory import VertexAiMemoryBankService
from vertexai._genai.types import (
MemoryBankCustomizationConfig,
MemoryBankCustomizationConfigMemoryTopic as MemoryTopic,
MemoryBankCustomizationConfigMemoryTopicCustomMemoryTopic as CustomMemoryTopic,
)
if TYPE_CHECKING:
from google.adk.agents.callback_context import CallbackContext
APPROVAL_HISTORY = MemoryTopic(
custom_memory_topic=CustomMemoryTopic(
label="approval_history",
description="""Track approval decisions: verdicts, readiness scores, common blockers.""",
)
)
def create_memory_service(project=None, location=None, agent_engine_id=None):
project = project or os.environ.get("GOOGLE_CLOUD_PROJECT")
location = location or os.environ.get("AGENT_ENGINE_LOCATION") or os.environ.get("GOOGLE_CLOUD_LOCATION", "us-central1")
agent_engine_id = agent_engine_id or os.environ.get("AGENT_ENGINE_ID")
if not project:
raise ValueError("GOOGLE_CLOUD_PROJECT required")
if not agent_engine_id:
return None
return VertexAiMemoryBankService(project=project, location=location, agent_engine_id=agent_engine_id)
async def auto_save_memories(callback_context: "CallbackContext") -> None:
agent_engine_id = os.environ.get("AGENT_ENGINE_ID")
if not agent_engine_id:
return
try:
svc = VertexAiMemoryBankService(
project=os.environ.get("GOOGLE_CLOUD_PROJECT"),
location=os.environ.get("AGENT_ENGINE_LOCATION") or os.environ.get("GOOGLE_CLOUD_LOCATION", "us-central1"),
agent_engine_id=agent_engine_id,
)
await svc.add_session_to_memory(callback_context._invocation_context.session)
except Exception as e:
print(f"Warning: Failed to save memories: {e}")
Thiết lập các lệnh xuất gói. Thay thế src/simulator_agent/agent/__init__.py:
try:
from .agent import root_agent
from .config import AGENT_NAME, MODEL
__all__ = ["root_agent", "AGENT_NAME", "MODEL"]
except Exception:
pass
Thay thế src/simulator_agent/__init__.py:
try:
from .agent import root_agent
from .agent.config import AGENT_NAME, MODEL
__all__ = ["root_agent", "AGENT_NAME", "MODEL"]
except Exception:
pass
6. Xây dựng tác nhân Công cụ lập kế hoạch chạy marathon
Công cụ lập kế hoạch chạy marathon là người điều phối chính. Tác nhân này sử dụng Kỹ năng ADK để có kiến thức về miền, uỷ quyền đánh giá chất lượng cho tác nhân phụ Trình đánh giá và kết nối với Bộ điều khiển mô phỏng thông qua A2A.
Xác định giản đồ và cấu hình của công cụ lập kế hoạch
Tạo src/planner_agent/agent/schemas.py:
"""Schemas for the Marathon Planner Agent."""
from pydantic import BaseModel, Field
class MarathonPlan(BaseModel):
"""Structured marathon plan output (used in Phase 2+)."""
city: str = Field(description="City where the marathon takes place")
date: str = Field(description="Event date in YYYY-MM-DD format")
theme: str = Field(description="Marathon theme: scenic, fast, charity, etc.")
participants: int = Field(ge=0, description="Expected number of participants")
route_summary: str = Field(description="High-level route description")
route_waypoints: list[str] = Field(default_factory=list, description="Ordered waypoints")
distance_miles: float = Field(ge=0, description="Total route distance in miles")
logistics_summary: str = Field(description="Key logistics overview")
budget_summary: str = Field(description="Budget overview")
key_risks: list[str] = Field(default_factory=list, description="Top risks and mitigations")
Tạo src/planner_agent/agent/config.py:
"""Configuration for the Marathon Planner Agent."""
import os
AGENT_NAME = "planner_agent"
AGENT_DESCRIPTION = (
"Marathon Planner Agent — Lead Architect. "
"Designs comprehensive city marathon plans with built-in expertise in route design, "
"traffic management, community impact, and economics. Evaluates plans via the "
"Evaluator Agent (A2A) and submits for approval to the Simulation Controller (A2A)."
)
MODEL = os.getenv("PLANNER_MODEL", "gemini-3-flash-preview")
# Phase 1: No structured output — agent returns free-form text
OUTPUT_SCHEMA = None
Viết hướng dẫn cho công cụ lập kế hoạch
Tạo src/planner_agent/agent/planner-instruction.md:
# Role
Marathon Planner Agent (city marathon event architect).
Goal: Design comprehensive marathon plan based on user constraints.
# ADK Skills (LOAD ONCE BEFORE PLANNING)
1. `route-planning`: Generate route using `plan_marathon_route`.
2. `plan-evaluation`: Analyze demographics, capacity, revenue, safety.
# Core Requirements
- Safety: Emergency corridors, traffic cover.
- Community: Local business, noise, inclusivity.
- Logistics: Start/finish capacity, restrooms, roads.
- Finances: Maximize revenue/sponsorships.
- Experience: Scenic, runner comfort.
# User Prerequisites
Clarify if missing: City, Date/Season, Theme, Scale (participants), Budget, Special constraints.
# Deliverables
1. Route Design: GeoJSON via `plan_marathon_route` tool.
2. Traffic: Closures, detours, mitigation.
3. Community: Engagement, cheer zones, noise.
4. Economics: Revenue, costs, sponsors.
5. Logistics: Porta-potties, capacity, timing.
6. Timeline: Setup to teardown, waves.
7. Risks: Weather, crowd, emergency.
# A2A Collaboration
1. **Evaluator (`evaluator_agent`)**:
- Send plan for 7-criteria scoring.
- SINGLE PASS ONLY. Do not call twice to verify successful fixes.
2. **Simulation Controller (`simulator_agent`)**:
- Call once after evaluation completes (REGARDLESS OF SCORE).
- Accept result, DO NOT call again.
# Workflow
1. Gather reqs.
2. Load skills (ONCE).
3. Call `plan_marathon_route`.
4. Complete design.
5. Send to Evaluator.
6. Send to Simulator.
7. Present final.
# Rules & Format
- Personality: Pragmatic, detail-oriented.
Tạo src/planner_agent/agent/prompts.py:
"""System instructions for the Marathon Planner Agent."""
import pathlib
_PROMPT_DIR = pathlib.Path(__file__).parent
INSTRUCTION = (_PROMPT_DIR / "planner-instruction.md").read_text()
Tạo mô-đun xác thực
Đối với các lệnh gọi A2A đến Agent Engine, các tác nhân cần có quyền xác thực Google Cloud. Tạo src/planner_agent/agent/auth.py:
"""Authentication utilities for Agent Engine A2A connections.
Provides Google Cloud auth for httpx requests to Agent Engine agents.
"""
import httpx
from google.auth import default
from google.auth.credentials import Credentials
from google.auth.transport.requests import Request as AuthRequest
class GoogleAuthRefresh(httpx.Auth):
"""Google Cloud Auth with lazy credential refresh.
Credentials are initialized lazily on first request to avoid
serialization issues with ADK agents.
"""
def __init__(self, scopes: list[str] | None = None) -> None:
self.scopes = scopes or ["https://www.googleapis.com/auth/cloud-platform"]
self.credentials: Credentials | None = None
self.auth_request: AuthRequest | None = None
def auth_flow(self, request: httpx.Request):
if self.credentials is None:
self.credentials, _ = default(scopes=self.scopes)
self.auth_request = AuthRequest()
if not self.credentials.valid:
self.credentials.refresh(self.auth_request)
request.headers["Authorization"] = f"Bearer {self.credentials.token}"
yield request
Thêm Kỹ năng ADK
Tạo src/planner_agent/skills/route-planning/SKILL.md:
---
name: route-planning
description:
Generates high-fidelity marathon routes using road network data (Dijkstra's algorithm)
and outputting GeoJSON for visualization.
---
# Route Planning Skill
**Goal:** Design a mathematically perfect 42.195 km marathon route using real road network data.
## Capabilities
- **Automated Route Generation**: Uses a built-in road network and Dijkstra's algorithm to calculate a certified 42.195 km route between specified landmarks.
- **GeoJSON Output**: Returns a standards-compliant GeoJSON FeatureCollection.
## Resources
### Tools (Python)
- `tools.py`: Contains the `plan_marathon_route`, `add_water_stations`, and `add_medical_tents` implementations.
### References
- `references/marathon_planning_guide.md`: Marathon standards, road width, traffic severity, landmarks.
Tạo src/planner_agent/skills/plan-evaluation/SKILL.md:
---
name: plan-evaluation
description:
Evaluates the plan against financial, community, and logistical goals.
---
# Plan Evaluation Skill
**Goal:** Provide verification data for the event including capacity, budget, demographics, and nuisance checking.
## Resources
### References
- `references/evaluation_criteria.md`: Detailed scoring rubrics.
Kết nối các công cụ lập kế hoạch
Tạo src/planner_agent/agent/tools.py – đây là nơi các tác nhân được kết nối:
"""Tools for the Marathon Planner Agent.
Contains:
- A2A infrastructure (SerializableRemoteA2aAgent, URL helpers)
- Remote A2A agent creators for Evaluator and Simulation Controller
- get_tools() — assembles all tools including SkillToolset
"""
import logging
import os
import httpx
from google.adk.agents.remote_a2a_agent import RemoteA2aAgent
from google.adk.tools.agent_tool import AgentTool
from google.adk.tools.function_tool import FunctionTool
from a2a.client.client import ClientConfig as A2AClientConfig
from a2a.client.client_factory import ClientFactory as A2AClientFactory
from a2a.types import TransportProtocol as A2ATransport
from .auth import GoogleAuthRefresh
from ..evaluator.agent import root_agent as evaluator_agent
logger = logging.getLogger(__name__)
AGENT_TIMEOUT_SECONDS = 120
# ============================================================================
# A2A INFRASTRUCTURE
# ============================================================================
def _get_agent_a2a_endpoint(resource_name: str, default_port: int = 8080) -> str:
"""Construct A2A card endpoint URL from resource name or local address."""
if resource_name.startswith("local"):
port = resource_name.split(":")[1] if ":" in resource_name else default_port
return f"http://127.0.0.1:{port}/.well-known/agent-card.json"
parts = resource_name.split("/")
try:
location = parts[parts.index("locations") + 1]
return f"https://{location}-aiplatform.googleapis.com/v1beta1/{resource_name}/a2a/v1/card"
except (ValueError, IndexError):
return resource_name
def _get_agent_a2a_url(resource_name: str) -> str | None:
"""Construct the regional A2A message URL from Agent Engine resource name."""
if resource_name.startswith("local"):
return None
parts = resource_name.split("/")
location = parts[parts.index("locations") + 1]
return f"https://{location}-aiplatform.googleapis.com/v1beta1/{resource_name}/a2a"
class SerializableRemoteA2aAgent(RemoteA2aAgent):
"""RemoteA2aAgent with authentication and Agent Engine URL fix.
Handles two Agent Engine issues:
1. Creates httpx client lazily with Google Cloud auth
2. Fixes agent card URL (Agent Engine returns global URL that 404s)
"""
def __init__(self, *, a2a_url: str | None = None, **kwargs):
super().__init__(**kwargs)
self._a2a_url_override = a2a_url
async def _ensure_httpx_client(self) -> httpx.AsyncClient:
if self._httpx_client is None:
self._httpx_client = httpx.AsyncClient(
timeout=httpx.Timeout(timeout=AGENT_TIMEOUT_SECONDS),
headers={"Content-Type": "application/json"},
auth=GoogleAuthRefresh(),
)
self._httpx_client_needs_cleanup = True
if self._a2a_client_factory is None:
self._a2a_client_factory = A2AClientFactory(
config=A2AClientConfig(
httpx_client=self._httpx_client,
streaming=False, polling=False,
supported_transports=[A2ATransport.http_json, A2ATransport.jsonrpc],
)
)
return self._httpx_client
async def _resolve_agent_card_from_url(self, url: str):
card = await super()._resolve_agent_card_from_url(url)
if self._a2a_url_override:
logger.info(f"Overriding agent card URL: {card.url} → {self._a2a_url_override}")
card.url = self._a2a_url_override
return card
def create_evaluator_tool() -> AgentTool:
"""Create Evaluator Agent tool — local sub-agent, no A2A needed."""
return AgentTool(agent=evaluator_agent)
def create_simulator_agent() -> RemoteA2aAgent:
"""Create remote connection to Simulation Controller Agent via A2A."""
resource_name = os.environ.get("SIMULATOR_AGENT_RESOURCE_NAME")
if not resource_name:
raise ValueError("SIMULATOR_AGENT_RESOURCE_NAME environment variable must be set")
endpoint = _get_agent_a2a_endpoint(resource_name, default_port=8089)
logger.info(f"Creating Simulation Controller Agent connection: {endpoint}")
return SerializableRemoteA2aAgent(
name="simulator_agent",
description=(
"Simulation Controller Agent for marathon plans. "
"Reviews plans for simulation readiness, assessing route feasibility, "
"logistics completeness, and safety clearance."
),
agent_card=endpoint,
a2a_url=_get_agent_a2a_url(resource_name),
)
def create_simulation_controller_tool() -> AgentTool:
return AgentTool(agent=create_simulator_agent())
# ============================================================================
# TOOL EXPORT
# ============================================================================
def get_tools() -> list:
"""Return the tools for the Marathon Planner Agent."""
import pathlib
import importlib.util
from google.adk.skills import load_skill_from_dir
from google.adk.tools.preload_memory_tool import PreloadMemoryTool
from google.adk.tools.skill_toolset import SkillToolset
# Load ADK Skills
skills_dir = pathlib.Path(__file__).parent.parent / "skills"
skills = [
load_skill_from_dir(skills_dir / name)
for name in sorted(skills_dir.iterdir())
if name.is_dir() and not name.name.startswith("_")
]
skill_toolset = SkillToolset(skills=skills)
# Load route-planning tools dynamically (handles hyphenated directory names)
def load_tool_from_skill(skill_name, tool_name):
skill_tool_path = skills_dir / skill_name / "tools.py"
if not skill_tool_path.exists():
return None
try:
spec = importlib.util.spec_from_file_location(f"{skill_name}.tools", skill_tool_path)
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)
return getattr(module, tool_name, None)
except Exception as e:
logger.error(f"Error loading tool {tool_name} from {skill_name}: {e}")
return None
plan_marathon_route_func = load_tool_from_skill("route-planning", "plan_marathon_route")
tools = [skill_toolset, PreloadMemoryTool()]
if plan_marathon_route_func:
tools.append(FunctionTool(func=plan_marathon_route_func))
# Add local evaluator sub-agent
tools.append(create_evaluator_tool())
logger.info("Added local Evaluator tool")
# Add remote Simulation Controller via A2A (if configured)
if os.environ.get("SIMULATOR_AGENT_RESOURCE_NAME"):
tools.append(create_simulation_controller_tool())
logger.info("Added A2A Simulation Controller tool")
return tools
Hàm get_tools() hiển thị 3 mẫu công cụ khác nhau:
- SkillToolset – tải kiến thức về quy trình từ Kỹ năng ADK
- AgentTool – gói Trình đánh giá dưới dạng một tác nhân phụ cục bộ
- SerializableRemoteA2aAgent – kết nối Trình mô phỏng thông qua giao thức A2A
Kết nối tác nhân công cụ lập kế hoạch
Tạo src/planner_agent/agent/agent.py:
"""Marathon Planner Agent — LlmAgent wiring.
Wires config + prompts + tools + skills into the LlmAgent.
"""
import os
import vertexai
from google.adk.agents import LlmAgent
from .config import AGENT_NAME, AGENT_DESCRIPTION, MODEL, OUTPUT_SCHEMA
from .prompts import INSTRUCTION
from .tools import get_tools
from ..services.memory_manager import auto_save_memories
# Initialize Vertex AI
project_id = os.environ.get("GOOGLE_CLOUD_PROJECT")
location = os.environ.get("GOOGLE_CLOUD_LOCATION", "us-central1")
if project_id:
vertexai.init(project=project_id, location=location)
from google.genai.types import GenerateContentConfig, ThinkingConfig
agent_kwargs = dict(
name=AGENT_NAME,
model=MODEL,
description=AGENT_DESCRIPTION,
static_instruction=INSTRUCTION,
tools=get_tools(),
generate_content_config=GenerateContentConfig(
thinking_config=ThinkingConfig(thinking_budget=2048),
),
after_agent_callback=auto_save_memories,
)
if OUTPUT_SCHEMA is not None:
agent_kwargs["output_schema"] = OUTPUT_SCHEMA
root_agent = LlmAgent(**agent_kwargs)
Thiết lập các lệnh xuất gói. Thay thế src/planner_agent/agent/__init__.py:
from .agent import root_agent
from .config import AGENT_NAME, MODEL, AGENT_DESCRIPTION, OUTPUT_SCHEMA
__all__ = ["root_agent", "AGENT_NAME", "MODEL", "AGENT_DESCRIPTION", "OUTPUT_SCHEMA"]
Thay thế src/planner_agent/__init__.py:
try:
from .agent import root_agent
from .agent.config import AGENT_NAME, MODEL
__all__ = ["root_agent", "AGENT_NAME", "MODEL"]
except Exception:
__all__ = ["root_agent"]
7. Triển khai và kiểm thử hệ thống nhiều tác nhân
Bây giờ, bạn sẽ kết hợp mọi thứ: tạo máy chủ A2A cho từng tác nhân, sau đó kiểm thử Chế độ một mình (Công cụ lập kế hoạch + Trình đánh giá) và Chế độ toàn đội (cả 3 tác nhân).
Tạo thẻ tác nhân A2A của công cụ lập kế hoạch
Tạo src/planner_agent/runtime/agent_card.py:
"""A2A Agent Card for Marathon Planner Agent."""
from a2a.types import AgentCapabilities, AgentCard, AgentSkill
from vertexai.preview.reasoning_engines.templates.a2a import create_agent_card
def create_marathon_planner_card() -> AgentCard:
skill = AgentSkill(
id="plan_marathon",
name="Plan City Marathon",
description=(
"Design a comprehensive city marathon plan by coordinating with "
"specialist agents. Evaluates plan quality and returns an actionable plan."
),
tags=["marathon", "planning", "orchestration", "multi-agent"],
examples=[
"Plan a scenic marathon through Las Vegas for 30,000 runners",
"Design a charity marathon in Austin for October 2026",
],
)
card = create_agent_card(
agent_name="planner_agent",
description=(
"Marathon Planner Agent — Lead Orchestrator that designs city marathon "
"plans. Coordinates specialist agents via A2A. Evaluates plans with "
"Vertex AI Eval. Powered by Gemini 3 Flash Preview."
),
skills=[skill],
)
if card.capabilities is None:
card.capabilities = AgentCapabilities(streaming=True)
else:
card.capabilities.streaming = True
return card
Tạo trình thực thi A2A của công cụ lập kế hoạch
Tạo src/planner_agent/runtime/agent_executor.py:
"""A2A Agent Executor for Marathon Planner Agent."""
import json
import os
import vertexai
from a2a.server.agent_execution import AgentExecutor, RequestContext
from a2a.server.events import EventQueue
from a2a.server.tasks import TaskUpdater
from a2a.types import TaskState, TextPart, UnsupportedOperationError
from a2a.utils import new_agent_text_message
from a2a.utils.errors import ServerError
from google.adk import Runner
from google.genai import types
from ..services.memory_manager import create_memory_service
from ..services.session_manager import SessionManager, create_session_service
class MarathonPlannerExecutor(AgentExecutor):
"""Execute requests via A2A protocol."""
def __init__(self):
self.agent = None
self.runner = None
self.session_manager: SessionManager | None = None
def _init_agent(self) -> None:
if self.agent is None:
try:
from planner_agent.agent import root_agent
except ImportError:
from ..agent import root_agent
project_id = os.environ.get("GOOGLE_CLOUD_PROJECT")
location = os.environ.get("AGENT_ENGINE_LOCATION") or os.environ.get("GOOGLE_CLOUD_LOCATION", "us-central1")
vertexai.init(project=project_id, location=location)
self.agent = root_agent
if self.runner is None:
project_id = os.environ.get("GOOGLE_CLOUD_PROJECT")
location = os.environ.get("AGENT_ENGINE_LOCATION") or os.environ.get("GOOGLE_CLOUD_LOCATION", "us-central1")
session_service = create_session_service(project=project_id, location=location)
memory_service = create_memory_service(project=project_id, location=location)
from google.adk.apps import App
from google.adk.agents.context_cache_config import ContextCacheConfig
app = App(
name=self.agent.name,
root_agent=self.agent,
context_cache_config=ContextCacheConfig(
cache_intervals=10, ttl_seconds=3600, min_tokens=4096,
)
)
self.runner = Runner(
app=app,
session_service=session_service,
memory_service=memory_service,
)
if self.session_manager is None:
self.session_manager = SessionManager(
session_service=self.runner.session_service,
)
async def execute(self, context: RequestContext, event_queue: EventQueue) -> None:
if self.agent is None:
self._init_agent()
user_id = (
context.message.metadata.get("user_id")
if context.message and context.message.metadata
else "marathon_user"
)
updater = TaskUpdater(event_queue, context.task_id, context.context_id)
if not hasattr(context, "current_task") or not context.current_task:
await updater.submit()
await updater.start_work()
request_data = context.get_user_input()
if not request_data:
await updater.update_status(TaskState.failed, message=new_agent_text_message("No request data"), final=True)
return
try:
from google.adk.runners import RunConfig
from google.adk.agents.run_config import ToolThreadPoolConfig
from google.genai.types import ContextWindowCompressionConfig
await updater.update_status(TaskState.working, message=new_agent_text_message("Planning marathon..."))
session_id = await self.session_manager.get_or_create_session(
context_id=context.context_id, app_name=self.runner.app_name, user_id=user_id,
)
content = types.Content(role="user", parts=[types.Part(text=request_data)])
final_event = None
async for event in self.runner.run_async(
session_id=session_id, user_id=user_id, new_message=content,
run_config=RunConfig(
tool_thread_pool_config=ToolThreadPoolConfig(max_workers=4),
max_llm_calls=15,
context_window_compression=ContextWindowCompressionConfig(),
)
):
if event.is_final_response():
final_event = event
if final_event and final_event.content and final_event.content.parts:
text = "".join(p.text for p in final_event.content.parts if hasattr(p, "text") and p.text)
if text:
await updater.add_artifact([TextPart(text=text)], name="marathon_plan")
await updater.complete()
return
await updater.update_status(TaskState.failed, message=new_agent_text_message("Failed to generate plan"), final=True)
except Exception as e:
await updater.update_status(TaskState.failed, message=new_agent_text_message(f"Planning failed: {e}"), final=True)
async def cancel(self, context: RequestContext, event_queue: EventQueue) -> None:
raise ServerError(error=UnsupportedOperationError())
Tạo máy chủ A2A cục bộ
Tạo src/planner_agent/runtime/local_server.py:
"""Local A2A server for the Marathon Planner Agent.
Usage: uv run python -m src.planner_agent.runtime.local_server
"""
import os
from dotenv import load_dotenv
load_dotenv()
os.environ["GOOGLE_GENAI_USE_VERTEXAI"] = "true"
import asyncio
import uvicorn
from a2a.server.apps import A2AStarletteApplication
from a2a.server.request_handlers import DefaultRequestHandler
from a2a.server.tasks import InMemoryTaskStore
from a2a.types import TransportProtocol
from google.adk import Runner
from google.adk.a2a.executor.a2a_agent_executor import A2aAgentExecutor, A2aAgentExecutorConfig
from google.adk.sessions import InMemorySessionService
AGENT_PORT = 8084
def create_marathon_planner_server():
from ..agent import root_agent
from .agent_card import create_marathon_planner_card
runner = Runner(
app_name=root_agent.name,
agent=root_agent,
session_service=InMemorySessionService(),
)
executor = A2aAgentExecutor(runner=runner, config=A2aAgentExecutorConfig())
handler = DefaultRequestHandler(agent_executor=executor, task_store=InMemoryTaskStore())
card = create_marathon_planner_card()
card.url = f"http://localhost:{AGENT_PORT}"
card.preferred_transport = TransportProtocol.jsonrpc
return A2AStarletteApplication(agent_card=card, http_handler=handler)
async def run_server():
project = os.environ.get("GOOGLE_CLOUD_PROJECT")
if not project:
raise ValueError("GOOGLE_CLOUD_PROJECT must be set. Check .env file.")
print("=" * 60)
print("Starting Marathon Planner Agent Local A2A Server")
print(f"Project: {project}")
print("=" * 60)
app = create_marathon_planner_server()
config = uvicorn.Config(app.build(), host="127.0.0.1", port=AGENT_PORT, log_level="info", loop="none")
print(f"Planner A2A server: http://127.0.0.1:{AGENT_PORT}")
print(f"Agent card: http://127.0.0.1:{AGENT_PORT}/.well-known/agent-card.json")
print("Press Ctrl+C to stop")
print("=" * 60)
server = uvicorn.Server(config)
await server.serve()
if __name__ == "__main__":
asyncio.run(run_server())
Tạo src/simulator_agent/runtime/agent_card.py:
"""A2A Agent Card for Simulation Controller Agent."""
from a2a.types import AgentCard, AgentSkill
from vertexai.preview.reasoning_engines.templates.a2a import create_agent_card
def create_simulation_controller_card() -> AgentCard:
skill = AgentSkill(
id="review_marathon_plan",
name="Review Marathon Plan",
description="Review a marathon plan for simulation readiness.",
tags=["simulation", "review", "approval", "marathon"],
)
return create_agent_card(
agent_name="simulator_agent",
description="Simulation Controller Agent - Reviews marathon plans for simulation readiness.",
skills=[skill],
)
Tạo src/simulator_agent/runtime/agent_executor.py theo cùng một mẫu như trình thực thi của công cụ lập kế hoạch (cập nhật các lệnh nhập cho simulator_agent):
"""A2A Agent Executor for Simulation Controller Agent."""
import os
import vertexai
from a2a.server.agent_execution import AgentExecutor, RequestContext
from a2a.server.events import EventQueue
from a2a.server.tasks import TaskUpdater
from a2a.types import TaskState, TextPart, UnsupportedOperationError
from a2a.utils import new_agent_text_message
from a2a.utils.errors import ServerError
from google.adk import Runner
from google.genai import types
from ..services.memory_manager import create_memory_service
from ..services.session_manager import SessionManager, create_session_service
class SimulationControllerExecutor(AgentExecutor):
def __init__(self):
self.agent = None
self.runner = None
self.session_manager = None
def _init_agent(self):
if self.agent is None:
try:
from simulator_agent.agent import root_agent
except ImportError:
from ..agent.agent import root_agent
project_id = os.environ.get("GOOGLE_CLOUD_PROJECT")
location = os.environ.get("AGENT_ENGINE_LOCATION") or os.environ.get("GOOGLE_CLOUD_LOCATION", "us-central1")
vertexai.init(project=project_id, location=location)
self.agent = root_agent
if self.runner is None:
project_id = os.environ.get("GOOGLE_CLOUD_PROJECT")
location = os.environ.get("AGENT_ENGINE_LOCATION") or os.environ.get("GOOGLE_CLOUD_LOCATION", "us-central1")
session_service = create_session_service(project=project_id, location=location)
memory_service = create_memory_service(project=project_id, location=location)
from google.adk.apps import App
from google.adk.agents.context_cache_config import ContextCacheConfig
app = App(name=self.agent.name, root_agent=self.agent,
context_cache_config=ContextCacheConfig(cache_intervals=10, ttl_seconds=3600, min_tokens=4096))
self.runner = Runner(app=app, session_service=session_service, memory_service=memory_service)
if self.session_manager is None:
self.session_manager = SessionManager(session_service=self.runner.session_service)
async def execute(self, context: RequestContext, event_queue: EventQueue):
if self.agent is None:
self._init_agent()
user_id = context.message.metadata.get("user_id") if context.message and context.message.metadata else "planner_agent"
updater = TaskUpdater(event_queue, context.task_id, context.context_id)
if not hasattr(context, "current_task") or not context.current_task:
await updater.submit()
await updater.start_work()
plan_data = context.get_user_input()
if not plan_data:
await updater.update_status(TaskState.failed, message=new_agent_text_message("No plan data"), final=True)
return
try:
await updater.update_status(TaskState.working, message=new_agent_text_message("Reviewing plan..."))
session_id = await self.session_manager.get_or_create_session(
context_id=context.context_id, app_name=self.runner.app_name, user_id=user_id)
content = types.Content(role="user", parts=[types.Part(text=plan_data)])
final_event = None
async for event in self.runner.run_async(session_id=session_id, user_id=user_id, new_message=content):
if event.is_final_response():
final_event = event
if final_event and final_event.content and final_event.content.parts:
text = "".join(p.text for p in final_event.content.parts if hasattr(p, "text") and p.text)
if text:
await updater.add_artifact([TextPart(text=text)], name="result")
await updater.complete()
return
await updater.update_status(TaskState.failed, message=new_agent_text_message("Failed"), final=True)
except Exception as e:
await updater.update_status(TaskState.failed, message=new_agent_text_message(f"Review failed: {e}"), final=True)
async def cancel(self, context, event_queue):
raise ServerError(error=UnsupportedOperationError())
Tạo src/simulator_agent/runtime/local_server.py:
"""Local A2A server for the Simulation Controller Agent.
Usage: uv run python -m src.simulator_agent.runtime.local_server
"""
import asyncio
import logging
import os
os.environ["GOOGLE_GENAI_USE_VERTEXAI"] = "true"
import uvicorn
from a2a.server.apps import A2AStarletteApplication
from a2a.server.request_handlers import DefaultRequestHandler
from a2a.server.tasks import InMemoryTaskStore
from dotenv import load_dotenv
load_dotenv()
AGENT_PORT = 8089
async def run_server():
from .agent_card import create_simulation_controller_card
from .agent_executor import SimulationControllerExecutor
project = os.environ.get("GOOGLE_CLOUD_PROJECT")
if not project:
raise ValueError("GOOGLE_CLOUD_PROJECT must be set.")
print("=" * 60)
print("Starting Simulation Controller Agent Local A2A Server")
print("=" * 60)
agent_card = create_simulation_controller_card()
executor = SimulationControllerExecutor()
handler = DefaultRequestHandler(agent_executor=executor, task_store=InMemoryTaskStore())
app = A2AStarletteApplication(agent_card=agent_card, http_handler=handler)
config = uvicorn.Config(app.build(), host="127.0.0.1", port=AGENT_PORT, log_level="info", loop="none")
print(f"Simulator A2A server: http://127.0.0.1:{AGENT_PORT}")
print(f"Agent card: http://127.0.0.1:{AGENT_PORT}/.well-known/agent.json")
print("Press Ctrl+C to stop")
print("=" * 60)
server = uvicorn.Server(config)
await server.serve()
if __name__ == "__main__":
asyncio.run(run_server())
Kiểm thử Chế độ một mình – Công cụ lập kế hoạch + Trình đánh giá
Ở Chế độ một mình, Công cụ lập kế hoạch sử dụng Trình đánh giá làm tác nhân phụ cục bộ. Không cần A2A.
Khởi động máy chủ công cụ lập kế hoạch:
uv run python -m src.planner_agent.runtime.local_server
Bạn sẽ thấy kết quả đầu ra tương tự như:
============================================================ Starting Marathon Planner Agent Local A2A Server Project: your-project-id ============================================================ Planner A2A server: http://127.0.0.1:8084 Agent card: http://127.0.0.1:8084/.well-known/agent-card.json Press Ctrl+C to stop
Mở một thẻ Cloud Shell mới và gửi yêu cầu kiểm thử:
curl -s http://127.0.0.1:8084/.well-known/agent-card.json | python3 -m json.tool
Bạn sẽ thấy thẻ tác nhân có tên planner_agent và kỹ năng plan_marathon.
Kiểm thử Chế độ toàn đội – Cả 3 tác nhân thông qua A2A
Ở Chế độ toàn đội, Bộ điều khiển mô phỏng chạy dưới dạng một máy chủ A2A riêng biệt.
Trong thẻ Cloud Shell thứ hai, hãy khởi động máy chủ Trình mô phỏng:
uv run python -m src.simulator_agent.runtime.local_server
Sau đó, trong thẻ đầu tiên, hãy khởi động lại Công cụ lập kế hoạch khi Trình mô phỏng được kết nối:
SIMULATOR_AGENT_RESOURCE_NAME=local:8089 uv run python -m src.planner_agent.runtime.local_server
Trong nhật ký, bạn sẽ thấy:
Added local Evaluator tool Added A2A Simulation Controller tool
Công cụ lập kế hoạch hiện đã kết nối với cả 2 tác nhân. Khi nhận được yêu cầu lập kế hoạch chạy marathon, tác nhân này sẽ:
- Thiết kế kế hoạch bằng Kỹ năng ADK
- Uỷ quyền cho Trình đánh giá (tác nhân phụ cục bộ thông qua
AgentTool) - Gửi đến Bộ điều khiển mô phỏng (tác nhân từ xa thông qua A2A)
- Trình bày kế hoạch cuối cùng với điểm đánh giá và phê duyệt mô phỏng
8. Dọn dẹp
Để tránh bị tính phí liên tục vào tài khoản Google Cloud, hãy dọn dẹp các tài nguyên được tạo trong lớp học lập trình này.
Dừng mọi máy chủ cục bộ đang chạy bằng cách nhấn Ctrl+C trong mỗi thẻ Cloud Shell.
Vì lớp học lập trình này chỉ sử dụng các lệnh gọi Vertex AI API (không có tài nguyên đám mây liên tục như dịch vụ Cloud Run hoặc quá trình triển khai Agent Engine), nên bạn không cần xoá tài nguyên đám mây.
Nếu chỉ tạo dự án cho lớp học lập trình này, bạn có thể xoá dự án:
gcloud projects delete $PROJECT_ID
9. Xin chúc mừng
Xin chúc mừng! Bạn đã xây dựng một hệ thống lập kế hoạch chạy marathon nhiều tác nhân, trong đó 3 tác nhân AI cộng tác thông qua giao thức A2A, với tính năng đảm bảo chất lượng do Vertex AI Evaluation cung cấp.
Kiến thức bạn học được
- Cách tạo tác nhân AI bằng Bộ công cụ phát triển tác nhân (ADK) với
LlmAgent,ThinkingConfigvà đầu ra Pydantic có cấu trúc - Cách xây dựng các chỉ số Vertex AI Evaluation tuỳ chỉnh bằng
MetricPromptBuilderđể chấm điểm LLM-as-Judge và kiểm tra xác định - Cách kết nối các tác nhân dưới dạng tác nhân phụ bằng
AgentToolđể uỷ quyền cục bộ - Cách kết nối các tác nhân từ xa thông qua giao thức A2A bằng
RemoteA2aAgentvàA2AStarletteApplication - Cách sử dụng Kỹ năng ADK (
SkillToolset) để có kiến thức về quy trình và Bộ nhớ (PreloadMemoryTool) để học tập giữa các phiên