Next ‘26 开发者主旨演讲:利用记忆功能增强智能体

1. 简介

在此 Codelab 中,您将通过添加持久化和专业知识,将 ADK 代理提升到新的水平。您将学习如何使用 Agent Platform Sessions 管理对话状态、使用 Memory Bank 实现长期学习,以及使用 Spark 和 AlloyDB for RAG(检索增强生成)集成复杂的城市规则数据。

您将执行的操作

  • 配置代理平台会话以实现对话持久性。
  • 实现记忆库,以便代理从之前的互动中学习。
  • 使用 Spark Lightning Engine 来提取和处理城市规则文档。
  • 使用 AlloyDB 和向量搜索构建 RAG 系统。
  • 将增强型代理部署到 Agent Platform。

所需条件

预计时长:60 分钟

在此 Codelab 中创建的资源费用应低于 5 美元。

2. 准备工作

创建 Google Cloud 项目

  1. Google Cloud 控制台的项目选择器页面上,选择或创建一个 Google Cloud 项目
  2. 确保您的 Cloud 项目已启用结算功能。了解如何检查项目是否已启用结算功能

启动 Cloud Shell

Cloud Shell 是在 Google Cloud 中运行的命令行环境,预加载了必要的工具。

  1. 点击 Google Cloud 控制台顶部的激活 Cloud Shell
  2. 连接到 Cloud Shell 后,验证您的身份验证:
    gcloud auth list
    
  3. 确认您的项目已配置:
    gcloud config get project
    
  4. 如果项目未按预期设置,请进行设置:
    export PROJECT_ID=<YOUR_PROJECT_ID>
    gcloud config set project $PROJECT_ID
    

验证身份验证:

gcloud auth list

确认您的项目:

gcloud config get project

根据需要进行设置:

export PROJECT_ID=<YOUR_PROJECT_ID>
gcloud config set project $PROJECT_ID

启用 API

运行此命令可启用会话管理、Spark 处理和 AlloyDB 所需的所有 API:

gcloud services enable \
  aiplatform.googleapis.com \
  run.googleapis.com \
  alloydb.googleapis.com \
  dataproc.googleapis.com \
  documentai.googleapis.com \
  storage.googleapis.com \
  secretmanager.googleapis.com

3. 设置您的环境

在此 Codelab 中,您将使用 keynote 代码库中的预配置环境。

  1. 克隆代码库并前往项目文件夹:
git clone https://github.com/GoogleCloudPlatform/next-26-keynotes
cd next-26-keynotes/devkey/enhancing-agents-with-memory
  1. 设置 Python 虚拟环境并安装所需的 ADK 软件包:
uv venv
source .venv/bin/activate
uv sync

配置环境变量

代理需要进行特定配置才能连接到代理平台和 AlloyDB。

  1. 复制示例环境文件:
cp .env.example .env
  1. 打开 .env 并更新以下字段:
    • GOOGLE_CLOUD_PROJECT:您的项目 ID。
    • GOOGLE_CLOUD_LOCATIONus-central1
    • ALLOYDB_CLUSTER_IDrules-db
GOOGLE_CLOUD_PROJECT=<YOUR_PROJECT_ID>
GOOGLE_CLOUD_LOCATION=global
GOOGLE_GENAI_USE_VERTEXAI=TRUE
GOOGLE_CLOUD_REGION=us-central1
ALLOYDB_CLUSTER_ID=rules-db
  1. 运行以下辅助脚本来创建用于对话会话和长期记忆的 Agent Engine 实例。这会自动填充 .env 文件中的 AGENT_ENGINE_ID
uv run utils/setup_agent_engine.py

成功后,您应该会看到:

Creating Agent Engine instance...
Successfully created Agent Engine. ID: 1234567890
Updated .env with AGENT_ENGINE_ID=1234567890

4. 使用会话管理创建代理

在此步骤中,您将初始化一个 Marathon Planner Agent,该代理可以跨多轮对话维护对话历史记录。这是通过使用 ADK App 类和代理平台会话来实现的。

初始化代理和服务会话

打开 planner_agent/agent.py。您将看到我们如何添加 ADK 类来集成代理平台会话。这样一来,我们就能让代理随着时间的推移保持有状态,并根据需要修改上下文。

from google.adk.agents import LlmAgent
from google.adk.sessions import VertexAiSessionService
from vertexai.agent_engines import AdkApp

PROJECT_ID = os.environ.get("GOOGLE_CLOUD_PROJECT")
REGION = os.environ.get("GOOGLE_CLOUD_REGION", "us-central1")

# Initialize Vertex AI for regional services
if PROJECT_ID:
    vertexai.init(project=PROJECT_ID, location=REGION)

# Define the agent logic
root_agent = LlmAgent(
    name="planner_agent",
    model="gemini-3-flash-preview",
    instruction="You are a helpful marathon planning assistant...",
    tools=[] # We will add tools in the next steps
)

def session_service_builder():
    """Builder for Agent Platform Sessions."""
    return VertexAiSessionService(project=PROJECT_ID, location=REGION)

# Wrap the agent in an AdkApp to manage stateful context
app = AdkApp(
    agent=root_agent,
    session_service_builder=session_service_builder
)

5. 通过记忆库启用长期学习

虽然会话管理会跟踪单个对话,但您也可以对长期记忆执行相同的操作。在此步骤中,您将代理附加到 Agent Platform 的记忆库,这是一项企业级全托管式记忆服务。

初始化记忆库服务

记忆库可让智能体回忆起不同会话中的上下文信息。更新 planner_agent/agent.py 以包含内存服务:

from google.adk.memory import VertexAiMemoryBankService

def memory_service_builder():
    """Builder for Agent Platform Memory Bank."""
    return VertexAiMemoryBankService(
        project=PROJECT_ID,
        location=REGION,
        agent_engine_id=AGENT_ENGINE_ID
    )

实现自动内存注入

为了确保智能体从每个回合中学习,我们添加了 after_agent_callback。此函数在代理完成回答后触发,以便代理“消化”会话并将相关记忆保存到记忆库中。

  1. 定义回调函数:
async def auto_save_memories(callback_context):
    """Callback to ingest the session into the memory bank after the turn."""
    # In AdkApp, the memory service is available via the invocation context
    if hasattr(callback_context._invocation_context, 'memory_service') and callback_context._invocation_context.memory_service:
        await callback_context._invocation_context.memory_service.add_session_to_memory(
            callback_context._invocation_context.session
        )
  1. 将回调附加到 LlmAgent
root_agent = LlmAgent(
    # ... other params
    after_agent_callback=[auto_save_memories],
)

6. 设置 AlloyDB 以用于 RAG

在注入城市规则数据之前,我们需要一个高性能数据库来存储这些数据。在此步骤中,您将创建一个 AlloyDB 集群并初始化向量搜索的数据库架构。

1. 创建 AlloyDB 集群和主实例

在 Cloud Shell 中运行以下命令,以创建集群及其主实例:

# Create the cluster
gcloud alloydb clusters create rules-db \
  --password=postgres \
  --region=us-central1

# Create the primary instance with IAM authentication enabled
gcloud alloydb instances create rules-db-primary \
  --instance-type=PRIMARY \
  --cpu-count=2 \
  --region=us-central1 \
  --cluster=rules-db \
  --database-flags=alloydb.iam_authentication=on

2. 授予所需的 IAM 角色

如需使用托管式 AlloyDB MCP 服务器,您的身份需要具备特定权限。运行以下命令以授予所需角色:

export USER_EMAIL=$(gcloud config get-value account)

# Role to use MCP tools
gcloud projects add-iam-policy-binding $PROJECT_ID \
  --member="user:$USER_EMAIL" \
  --role="roles/mcp.toolUser"

# Role to execute SQL in AlloyDB
gcloud projects add-iam-policy-binding $PROJECT_ID \
  --member="user:$USER_EMAIL" \
  --role="roles/alloydb.admin"

# Role for IAM database authentication
gcloud projects add-iam-policy-binding $PROJECT_ID \
  --member="user:$USER_EMAIL" \
  --role="roles/alloydb.databaseUser"

# Create the IAM-based database user
gcloud alloydb users create "$USER_EMAIL" \
  --cluster=rules-db \
  --region=us-central1 \
  --type=IAM_BASED

3. 通过 AlloyDB Studio 创建数据库和表

由于 AlloyDB 数据库和表是通过 SQL 进行管理的,因此我们将使用 Google Cloud 控制台中的 AlloyDB Studio 来完成架构。

  1. 前往 AlloyDB > 集群,然后点击 rules-db
  2. 在左侧导航菜单中,点击 AlloyDB Studio
  3. 使用 postgres 用户和您设置的密码 (postgres) 登录。
  4. 运行以下 SQL 以创建数据库:
    CREATE DATABASE city_rules;
    
  5. 在 AlloyDB Studio 中将数据库连接切换到 city_rules,然后运行以下 SQL 来安装扩展程序并创建 rules 表:
    -- Install extensions for vector search and ML
    CREATE EXTENSION IF NOT EXISTS vector;
    CREATE EXTENSION IF NOT EXISTS google_ml_integration CASCADE;
    
    -- Create the rules table
    CREATE TABLE IF NOT EXISTS rules (
        id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
        text TEXT NOT NULL,
        city TEXT NOT NULL,
        embedding vector(3072) DEFAULT NULL
    );
    
    -- Grant your IAM user access to the table (replace with your email)
    GRANT ALL PRIVILEGES ON TABLE rules TO "YOUR_EMAIL_ADDRESS";
    

7. 使用 Spark Lightning Engine 注入城市规则数据

为了提供真正准确的规划,智能体不仅需要精心设计的提示,还需要扎根于数据和组织背景信息。在此步骤中,您将使用 Dataproc Serverless 上的 Spark Lightning Engine 处理大型城市规则 PDF,并将它们提取到 AlloyDB 中。

为什么选择 Spark Lightning Engine?

大规模地为代理提供依据需要处理海量的非结构化数据。Spark Lightning Engine 是一款高性能 Spark 执行引擎,可显著加速这些工作负载。我们在此处使用它,通过 Google 的 Document AI 对文档执行语义分块

探索 Spark 流水线

提取逻辑在 spark-setup/spark_alloydb_processor.py 中定义。流水线遵循以下步骤:

  1. 列出 PDF:从 Google Cloud Storage 存储分区中检索文档 URI。
  2. 语义提取:使用 UDF(用户定义的函数)调用 Document AI API。
  3. 写入 AlloyDB:将提取的文本块保存到名为 rules 的 AlloyDB 表中。
# Extract from spark_alloydb_processor.py
def process_document(gcs_uri: str):
    # ... calls Document AI to parse PDF ...
    return chunks

# Parallel processing with Spark Lightning Engine
process_udf = udf(process_document, chunk_schema)
chunked_df = uri_df.withColumn("chunks", process_udf(col("gcs_uri"))) \
                   .select(explode(col("chunks")).alias("chunk")) \
                   .select("chunk.*")

# Save to AlloyDB for Vector Search
chunked_df.write.format("jdbc") \
    .option("url", jdbc_url) \
    .option("dbtable", "rules") \
    .mode("append") \
    .save()

运行注入作业

使用提供的脚本触发提取流程:

./spark-setup/run_dataproc.sh

8. 使用 AlloyDB 的 RAG

现在,城市规则数据已位于 AlloyDB 中,智能体可以使用这些数据执行检索增强生成 (RAG)。这样可确保马拉松计划遵循特定的城市代码。

AlloyDB 在 RAG 方面的强大功能

AlloyDB 在向量搜索方面表现出色,可让我们在同一位置存储结构化数据和向量嵌入。代理可以使用 AlloyDB 中的内置 embedding 函数来查找最相关的规则信息。

为了让代理能够访问这些数据,我们提供了一个使用向量相似性查询 AlloyDB 的工具。您可以在 hybrid_recall.sql 中看到此逻辑,该逻辑演示了如何计算查询与存储的规则之间的距离:

SELECT
    text,
    (embedding <=> 
     embedding('gemini-embedding-001', 
               'Restrictions for running a race on the Las Vegas strip')::vector) 
    as distance
FROM
    rules
WHERE city = 'Las Vegas'
ORDER BY
    distance ASC
LIMIT 5;

使用 RAG 工具以本地规则为依据来训练代理

如需使代理可以使用该工具,您必须在 planner_agent/tools.py 中定义该工具,然后在 planner_agent/agent.py 中注册该工具。我们将使用 Google Cloud 中的托管的远程 AlloyDB MCP 服务器来连接到我们的数据库。

  1. 使用“混合召回”模式在 planner_agent/tools.py 中定义工具。我们将使用 streamable_http 协议连接到受管理的 AlloyDB MCP 服务器:
from mcp import ClientSession
from mcp.client.streamable_http import streamablehttp_client

async def get_local_and_traffic_rules(query: str) -> str:
    """Uses vector search in AlloyDB via managed MCP server."""
    # Vector search query using built-in AlloyDB embedding functions
    sql = f"SELECT text FROM rules WHERE city = 'Las Vegas' ORDER BY embedding <=> google_ml.embedding('gemini-embedding-001', '{query}')::vector ASC LIMIT 5;"
    
    # Establish a streamable HTTP connection to the MCP server
    async with streamablehttp_client(url, headers=get_auth_headers()) as (read_stream, write_stream, _):
        async with ClientSession(read_stream, write_stream) as session:
            await session.initialize()
            result = await session.call_tool(
                "execute_sql",
                arguments={
                    "instance": full_instance_name,
                    "database": "city_rules",
                    "sqlStatement": sql
                }
            )
            return "\n".join([c.text for c in result.content if hasattr(c, 'text')])
  1. 注册工具并最终确定 planner_agent/agent.py
# ... imports ...

# Assemble the Agent
root_agent = LlmAgent(
    name="planner_agent",
    model="gemini-3-flash-preview",
    instruction="You are a helpful marathon planning assistant...",
    tools=[
        get_local_and_traffic_rules,
    ],
    after_agent_callback=[auto_save_memories],
)

# 2. Wrap the agent in an AdkApp to manage the stateful lifecycle
app = AdkApp(
    agent=root_agent,
    session_service_builder=session_service_builder,
    memory_service_builder=memory_service_builder
)

9. 通过 Agent Skills 获得专家指导

Agent Skills 是独立的模块,可提供具体说明、指导和资源,帮助智能体更有效地执行任务。您无需在系统提示中添加针对每种工具的复杂指令,而是可以将这些专业知识封装到仅在需要时加载的技能中。

Google 提供了 Google 产品(如 AlloyDB 和 BigQuery)的预构建技能,以确保智能体遵循行业最佳实践来查询数据和管理资源。您可以前往 Google Skills Depot 探索这些模式和其他专业模式。您可以在此处找到 AlloyDB 基础技能。

1. 探索技能文件

打开位于 planner_agent/skills/get-local-and-traffic-rules/SKILL.md 的预配置技能文件。如下所示:

---
name: get-local-and-traffic-rules
description: Retrieve local rules and traffic information for a specific jurisdiction.
---
# get_local_and_traffic_rules Skill

This skill provides guidelines on how to effectively use the `get_local_and_traffic_rules` tool.

## Overview
The `get_local_and_traffic_rules` tool interfaces with an AlloyDB database to perform vector similarity searches on a corpus of rules and traffic information using a provided natural language query.

## Usage Guidelines
1. **Query Specificity**: When calling the tool, provide specific details in the `query` argument. For example, instead of querying "food rules", use "rules regarding food vendors during public events".
2. **Contextual Use**: Use the tool when planning events or activities that require adherence to local municipal or state rules (e.g., street closures, noise ordinances, environmental rules).
3. **Handling Results**: The tool returns a string containing the text of the top 5 most relevant rules. If no error occurs, parse the returned string to inform your planning tasks.
4. **Error Handling**: If an error string is returned (e.g., "Error querying rules: ..."), you must report this failure or attempt an alternative approach if applicable.

## Underlying Mechanism
- The tool uses `google_ml.embedding` to convert the query into a vector representation.
- It calculates distance (`<=>`) against the `embedding` column in the `rules` table on an AlloyDB instance.
- Results are fetched in descending order of similarity, limited to 5 results.

2. 技能的登记方式

planner_agent/agent.py 中,技能从目录加载并添加到代理的工具中。代码如下所示:

import pathlib
from google.adk.skills import load_skill_from_dir
from google.adk.tools import skill_toolset

# Load the AlloyDB skill from its directory
alloydb_skill = load_skill_from_dir(pathlib.Path(__file__).parent / "skills" / "get-local-and-traffic-rules")

# Assemble the Agent with the Skill Toolset
root_agent = LlmAgent(
    name="planner_agent",
    model="gemini-3-flash-preview",
    instruction="You are a helpful marathon planning assistant...",
    tools=[
        get_local_and_traffic_rules,
        skill_toolset.SkillToolset(skills=[alloydb_skill])
    ],
    after_agent_callback=[auto_save_memories],
)

10. 测试代理

  1. 在本地启动代理:
uv run adk run planner_agent
  1. 询问有关城市规则的问题:[user]: What are the rules for running a race on the Las Vegas strip?

该代理将调用 get_local_and_traffic_rules 工具,在 AlloyDB 中执行向量搜索,并根据 Spark 处理的官方规则块返回答案。

11. 部署代理

部署到 Agent Platform

uv run adk deploy agent_engine \
  --env_file .env \
  planner_agent

12. 清理

为避免持续产生费用,请删除在本 Codelab 中创建的资源。

删除 AlloyDB 集群

# Delete the AlloyDB Cluster
gcloud alloydb clusters delete rules-db --region=us-central1 --force

删除代理运行时应用

您可以通过控制台或使用 gcloud 命令(如果您有资源名称)删除推理引擎实例。为简单起见,请使用控制台:

  1. 前往 Agent Runtime 页面。
  2. 选择 planner_agent –> 点击右侧的三点状按钮。
  3. 点击删除

13. 恭喜

恭喜!您已成功使用高级记忆和数据接地功能增强了 ADK 智能体。

您学到的内容

  • 有状态智能体:集成 Agent Platform 会话以保持对话上下文。
  • 长期学习:附加 Agent Platform 记忆库,使代理能够从用户互动中学习。
  • 数据提取:使用 Spark Lightning EngineDocument AI 处理非结构化文档。
  • RAG:在 AlloyDB 中构建向量搜索系统,使代理能够遵循现实世界的规则。

后续步骤