1. 简介
在此 Codelab 中,您将通过添加持久化和专业知识,将 ADK 代理提升到新的水平。您将学习如何使用 Agent Platform Sessions 管理对话状态、使用 Memory Bank 实现长期学习,以及使用 Spark 和 AlloyDB for RAG(检索增强生成)集成复杂的城市规则数据。
您将执行的操作
- 配置代理平台会话以实现对话持久性。
- 实现记忆库,以便代理从之前的互动中学习。
- 使用 Spark Lightning Engine 来提取和处理城市规则文档。
- 使用 AlloyDB 和向量搜索构建 RAG 系统。
- 将增强型代理部署到 Agent Platform。
所需条件
预计时长:60 分钟
在此 Codelab 中创建的资源费用应低于 5 美元。
2. 准备工作
创建 Google Cloud 项目
- 在 Google Cloud 控制台的项目选择器页面上,选择或创建一个 Google Cloud 项目。
- 确保您的 Cloud 项目已启用结算功能。了解如何检查项目是否已启用结算功能。
启动 Cloud Shell
Cloud Shell 是在 Google Cloud 中运行的命令行环境,预加载了必要的工具。
- 点击 Google Cloud 控制台顶部的激活 Cloud Shell。
- 连接到 Cloud Shell 后,验证您的身份验证:
gcloud auth list - 确认您的项目已配置:
gcloud config get project - 如果项目未按预期设置,请进行设置:
export PROJECT_ID=<YOUR_PROJECT_ID> gcloud config set project $PROJECT_ID
验证身份验证:
gcloud auth list
确认您的项目:
gcloud config get project
根据需要进行设置:
export PROJECT_ID=<YOUR_PROJECT_ID> gcloud config set project $PROJECT_ID
启用 API
运行此命令可启用会话管理、Spark 处理和 AlloyDB 所需的所有 API:
gcloud services enable \ aiplatform.googleapis.com \ run.googleapis.com \ alloydb.googleapis.com \ dataproc.googleapis.com \ documentai.googleapis.com \ storage.googleapis.com \ secretmanager.googleapis.com
3. 设置您的环境
在此 Codelab 中,您将使用 keynote 代码库中的预配置环境。
- 克隆代码库并前往项目文件夹:
git clone https://github.com/GoogleCloudPlatform/next-26-keynotes cd next-26-keynotes/devkey/enhancing-agents-with-memory
- 设置 Python 虚拟环境并安装所需的 ADK 软件包:
uv venv source .venv/bin/activate uv sync
配置环境变量
代理需要进行特定配置才能连接到代理平台和 AlloyDB。
- 复制示例环境文件:
cp .env.example .env
- 打开
.env并更新以下字段:GOOGLE_CLOUD_PROJECT:您的项目 ID。GOOGLE_CLOUD_LOCATION:us-central1。ALLOYDB_CLUSTER_ID:rules-db。
GOOGLE_CLOUD_PROJECT=<YOUR_PROJECT_ID> GOOGLE_CLOUD_LOCATION=global GOOGLE_GENAI_USE_VERTEXAI=TRUE GOOGLE_CLOUD_REGION=us-central1 ALLOYDB_CLUSTER_ID=rules-db
- 运行以下辅助脚本来创建用于对话会话和长期记忆的 Agent Engine 实例。这会自动填充
.env文件中的AGENT_ENGINE_ID:
uv run utils/setup_agent_engine.py
成功后,您应该会看到:
Creating Agent Engine instance...
Successfully created Agent Engine. ID: 1234567890
Updated .env with AGENT_ENGINE_ID=1234567890
4. 使用会话管理创建代理
在此步骤中,您将初始化一个 Marathon Planner Agent,该代理可以跨多轮对话维护对话历史记录。这是通过使用 ADK App 类和代理平台会话来实现的。
初始化代理和服务会话
打开 planner_agent/agent.py。您将看到我们如何添加 ADK 类来集成代理平台会话。这样一来,我们就能让代理随着时间的推移保持有状态,并根据需要修改上下文。
from google.adk.agents import LlmAgent
from google.adk.sessions import VertexAiSessionService
from vertexai.agent_engines import AdkApp
PROJECT_ID = os.environ.get("GOOGLE_CLOUD_PROJECT")
REGION = os.environ.get("GOOGLE_CLOUD_REGION", "us-central1")
# Initialize Vertex AI for regional services
if PROJECT_ID:
vertexai.init(project=PROJECT_ID, location=REGION)
# Define the agent logic
root_agent = LlmAgent(
name="planner_agent",
model="gemini-3-flash-preview",
instruction="You are a helpful marathon planning assistant...",
tools=[] # We will add tools in the next steps
)
def session_service_builder():
"""Builder for Agent Platform Sessions."""
return VertexAiSessionService(project=PROJECT_ID, location=REGION)
# Wrap the agent in an AdkApp to manage stateful context
app = AdkApp(
agent=root_agent,
session_service_builder=session_service_builder
)
5. 通过记忆库启用长期学习
虽然会话管理会跟踪单个对话,但您也可以对长期记忆执行相同的操作。在此步骤中,您将代理附加到 Agent Platform 的记忆库,这是一项企业级全托管式记忆服务。
初始化记忆库服务
记忆库可让智能体回忆起不同会话中的上下文信息。更新 planner_agent/agent.py 以包含内存服务:
from google.adk.memory import VertexAiMemoryBankService
def memory_service_builder():
"""Builder for Agent Platform Memory Bank."""
return VertexAiMemoryBankService(
project=PROJECT_ID,
location=REGION,
agent_engine_id=AGENT_ENGINE_ID
)
实现自动内存注入
为了确保智能体从每个回合中学习,我们添加了 after_agent_callback。此函数在代理完成回答后触发,以便代理“消化”会话并将相关记忆保存到记忆库中。
- 定义回调函数:
async def auto_save_memories(callback_context):
"""Callback to ingest the session into the memory bank after the turn."""
# In AdkApp, the memory service is available via the invocation context
if hasattr(callback_context._invocation_context, 'memory_service') and callback_context._invocation_context.memory_service:
await callback_context._invocation_context.memory_service.add_session_to_memory(
callback_context._invocation_context.session
)
- 将回调附加到
LlmAgent:
root_agent = LlmAgent(
# ... other params
after_agent_callback=[auto_save_memories],
)
6. 设置 AlloyDB 以用于 RAG
在注入城市规则数据之前,我们需要一个高性能数据库来存储这些数据。在此步骤中,您将创建一个 AlloyDB 集群并初始化向量搜索的数据库架构。
1. 创建 AlloyDB 集群和主实例
在 Cloud Shell 中运行以下命令,以创建集群及其主实例:
# Create the cluster gcloud alloydb clusters create rules-db \ --password=postgres \ --region=us-central1 # Create the primary instance with IAM authentication enabled gcloud alloydb instances create rules-db-primary \ --instance-type=PRIMARY \ --cpu-count=2 \ --region=us-central1 \ --cluster=rules-db \ --database-flags=alloydb.iam_authentication=on
2. 授予所需的 IAM 角色
如需使用托管式 AlloyDB MCP 服务器,您的身份需要具备特定权限。运行以下命令以授予所需角色:
export USER_EMAIL=$(gcloud config get-value account) # Role to use MCP tools gcloud projects add-iam-policy-binding $PROJECT_ID \ --member="user:$USER_EMAIL" \ --role="roles/mcp.toolUser" # Role to execute SQL in AlloyDB gcloud projects add-iam-policy-binding $PROJECT_ID \ --member="user:$USER_EMAIL" \ --role="roles/alloydb.admin" # Role for IAM database authentication gcloud projects add-iam-policy-binding $PROJECT_ID \ --member="user:$USER_EMAIL" \ --role="roles/alloydb.databaseUser" # Create the IAM-based database user gcloud alloydb users create "$USER_EMAIL" \ --cluster=rules-db \ --region=us-central1 \ --type=IAM_BASED
3. 通过 AlloyDB Studio 创建数据库和表
由于 AlloyDB 数据库和表是通过 SQL 进行管理的,因此我们将使用 Google Cloud 控制台中的 AlloyDB Studio 来完成架构。
- 前往 AlloyDB > 集群,然后点击
rules-db。 - 在左侧导航菜单中,点击 AlloyDB Studio。
- 使用 postgres 用户和您设置的密码 (
postgres) 登录。 - 运行以下 SQL 以创建数据库:
CREATE DATABASE city_rules; - 在 AlloyDB Studio 中将数据库连接切换到
city_rules,然后运行以下 SQL 来安装扩展程序并创建rules表:-- Install extensions for vector search and ML CREATE EXTENSION IF NOT EXISTS vector; CREATE EXTENSION IF NOT EXISTS google_ml_integration CASCADE; -- Create the rules table CREATE TABLE IF NOT EXISTS rules ( id UUID PRIMARY KEY DEFAULT gen_random_uuid(), text TEXT NOT NULL, city TEXT NOT NULL, embedding vector(3072) DEFAULT NULL ); -- Grant your IAM user access to the table (replace with your email) GRANT ALL PRIVILEGES ON TABLE rules TO "YOUR_EMAIL_ADDRESS";
7. 使用 Spark Lightning Engine 注入城市规则数据
为了提供真正准确的规划,智能体不仅需要精心设计的提示,还需要扎根于数据和组织背景信息。在此步骤中,您将使用 Dataproc Serverless 上的 Spark Lightning Engine 处理大型城市规则 PDF,并将它们提取到 AlloyDB 中。
为什么选择 Spark Lightning Engine?
大规模地为代理提供依据需要处理海量的非结构化数据。Spark Lightning Engine 是一款高性能 Spark 执行引擎,可显著加速这些工作负载。我们在此处使用它,通过 Google 的 Document AI 对文档执行语义分块。
探索 Spark 流水线
提取逻辑在 spark-setup/spark_alloydb_processor.py 中定义。流水线遵循以下步骤:
- 列出 PDF:从 Google Cloud Storage 存储分区中检索文档 URI。
- 语义提取:使用 UDF(用户定义的函数)调用 Document AI API。
- 写入 AlloyDB:将提取的文本块保存到名为
rules的 AlloyDB 表中。
# Extract from spark_alloydb_processor.py
def process_document(gcs_uri: str):
# ... calls Document AI to parse PDF ...
return chunks
# Parallel processing with Spark Lightning Engine
process_udf = udf(process_document, chunk_schema)
chunked_df = uri_df.withColumn("chunks", process_udf(col("gcs_uri"))) \
.select(explode(col("chunks")).alias("chunk")) \
.select("chunk.*")
# Save to AlloyDB for Vector Search
chunked_df.write.format("jdbc") \
.option("url", jdbc_url) \
.option("dbtable", "rules") \
.mode("append") \
.save()
运行注入作业
使用提供的脚本触发提取流程:
./spark-setup/run_dataproc.sh
8. 使用 AlloyDB 的 RAG
现在,城市规则数据已位于 AlloyDB 中,智能体可以使用这些数据执行检索增强生成 (RAG)。这样可确保马拉松计划遵循特定的城市代码。
AlloyDB 在 RAG 方面的强大功能
AlloyDB 在向量搜索方面表现出色,可让我们在同一位置存储结构化数据和向量嵌入。代理可以使用 AlloyDB 中的内置 embedding 函数来查找最相关的规则信息。
使用向量搜索进行混合召回
为了让代理能够访问这些数据,我们提供了一个使用向量相似性查询 AlloyDB 的工具。您可以在 hybrid_recall.sql 中看到此逻辑,该逻辑演示了如何计算查询与存储的规则之间的距离:
SELECT
text,
(embedding <=>
embedding('gemini-embedding-001',
'Restrictions for running a race on the Las Vegas strip')::vector)
as distance
FROM
rules
WHERE city = 'Las Vegas'
ORDER BY
distance ASC
LIMIT 5;
使用 RAG 工具以本地规则为依据来训练代理
如需使代理可以使用该工具,您必须在 planner_agent/tools.py 中定义该工具,然后在 planner_agent/agent.py 中注册该工具。我们将使用 Google Cloud 中的托管的远程 AlloyDB MCP 服务器来连接到我们的数据库。
- 使用“混合召回”模式在
planner_agent/tools.py中定义工具。我们将使用streamable_http协议连接到受管理的 AlloyDB MCP 服务器:
from mcp import ClientSession
from mcp.client.streamable_http import streamablehttp_client
async def get_local_and_traffic_rules(query: str) -> str:
"""Uses vector search in AlloyDB via managed MCP server."""
# Vector search query using built-in AlloyDB embedding functions
sql = f"SELECT text FROM rules WHERE city = 'Las Vegas' ORDER BY embedding <=> google_ml.embedding('gemini-embedding-001', '{query}')::vector ASC LIMIT 5;"
# Establish a streamable HTTP connection to the MCP server
async with streamablehttp_client(url, headers=get_auth_headers()) as (read_stream, write_stream, _):
async with ClientSession(read_stream, write_stream) as session:
await session.initialize()
result = await session.call_tool(
"execute_sql",
arguments={
"instance": full_instance_name,
"database": "city_rules",
"sqlStatement": sql
}
)
return "\n".join([c.text for c in result.content if hasattr(c, 'text')])
- 注册工具并最终确定
planner_agent/agent.py:
# ... imports ...
# Assemble the Agent
root_agent = LlmAgent(
name="planner_agent",
model="gemini-3-flash-preview",
instruction="You are a helpful marathon planning assistant...",
tools=[
get_local_and_traffic_rules,
],
after_agent_callback=[auto_save_memories],
)
# 2. Wrap the agent in an AdkApp to manage the stateful lifecycle
app = AdkApp(
agent=root_agent,
session_service_builder=session_service_builder,
memory_service_builder=memory_service_builder
)
9. 通过 Agent Skills 获得专家指导
Agent Skills 是独立的模块,可提供具体说明、指导和资源,帮助智能体更有效地执行任务。您无需在系统提示中添加针对每种工具的复杂指令,而是可以将这些专业知识封装到仅在需要时加载的技能中。
Google 提供了 Google 产品(如 AlloyDB 和 BigQuery)的预构建技能,以确保智能体遵循行业最佳实践来查询数据和管理资源。您可以前往 Google Skills Depot 探索这些模式和其他专业模式。您可以在此处找到 AlloyDB 基础技能。
1. 探索技能文件
打开位于 planner_agent/skills/get-local-and-traffic-rules/SKILL.md 的预配置技能文件。如下所示:
---
name: get-local-and-traffic-rules
description: Retrieve local rules and traffic information for a specific jurisdiction.
---
# get_local_and_traffic_rules Skill
This skill provides guidelines on how to effectively use the `get_local_and_traffic_rules` tool.
## Overview
The `get_local_and_traffic_rules` tool interfaces with an AlloyDB database to perform vector similarity searches on a corpus of rules and traffic information using a provided natural language query.
## Usage Guidelines
1. **Query Specificity**: When calling the tool, provide specific details in the `query` argument. For example, instead of querying "food rules", use "rules regarding food vendors during public events".
2. **Contextual Use**: Use the tool when planning events or activities that require adherence to local municipal or state rules (e.g., street closures, noise ordinances, environmental rules).
3. **Handling Results**: The tool returns a string containing the text of the top 5 most relevant rules. If no error occurs, parse the returned string to inform your planning tasks.
4. **Error Handling**: If an error string is returned (e.g., "Error querying rules: ..."), you must report this failure or attempt an alternative approach if applicable.
## Underlying Mechanism
- The tool uses `google_ml.embedding` to convert the query into a vector representation.
- It calculates distance (`<=>`) against the `embedding` column in the `rules` table on an AlloyDB instance.
- Results are fetched in descending order of similarity, limited to 5 results.
2. 技能的登记方式
在 planner_agent/agent.py 中,技能从目录加载并添加到代理的工具中。代码如下所示:
import pathlib
from google.adk.skills import load_skill_from_dir
from google.adk.tools import skill_toolset
# Load the AlloyDB skill from its directory
alloydb_skill = load_skill_from_dir(pathlib.Path(__file__).parent / "skills" / "get-local-and-traffic-rules")
# Assemble the Agent with the Skill Toolset
root_agent = LlmAgent(
name="planner_agent",
model="gemini-3-flash-preview",
instruction="You are a helpful marathon planning assistant...",
tools=[
get_local_and_traffic_rules,
skill_toolset.SkillToolset(skills=[alloydb_skill])
],
after_agent_callback=[auto_save_memories],
)
10. 测试代理
- 在本地启动代理:
uv run adk run planner_agent
- 询问有关城市规则的问题:
[user]: What are the rules for running a race on the Las Vegas strip?
该代理将调用 get_local_and_traffic_rules 工具,在 AlloyDB 中执行向量搜索,并根据 Spark 处理的官方规则块返回答案。
11. 部署代理
部署到 Agent Platform
uv run adk deploy agent_engine \ --env_file .env \ planner_agent
12. 清理
为避免持续产生费用,请删除在本 Codelab 中创建的资源。
删除 AlloyDB 集群
# Delete the AlloyDB Cluster gcloud alloydb clusters delete rules-db --region=us-central1 --force
删除代理运行时应用
您可以通过控制台或使用 gcloud 命令(如果您有资源名称)删除推理引擎实例。为简单起见,请使用控制台:
- 前往 Agent Runtime 页面。
- 选择
planner_agent–> 点击右侧的三点状按钮。 - 点击删除。
13. 恭喜
恭喜!您已成功使用高级记忆和数据接地功能增强了 ADK 智能体。
您学到的内容
- 有状态智能体:集成 Agent Platform 会话以保持对话上下文。
- 长期学习:附加 Agent Platform 记忆库,使代理能够从用户互动中学习。
- 数据提取:使用 Spark Lightning Engine 和 Document AI 处理非结构化文档。
- RAG:在 AlloyDB 中构建向量搜索系统,使代理能够遵循现实世界的规则。
后续步骤
- 如需详细了解受管理的部署,请参阅 Agent Platform 文档。
- 深入了解 AlloyDB Vector Search,以实现高级 RAG 模式。
- 使用 Dataproc Serverless 扩缩提取流水线。