🤖 使用 Graph RAG、ADK 和记忆库构建多模态 AI 代理

1. 简介

翻唱版

1. 面临的挑战

在灾难响应场景中,协调不同地点具有不同技能、资源和需求的幸存者需要智能数据管理和搜索功能。本研讨会将教您构建一个结合了以下要素的生产 AI 系统:

  1. 🗄️ 图数据库 (Spanner):存储幸存者、技能和资源之间的复杂关系
  2. 🔍 AI 赋能的搜索:使用嵌入的语义 + 关键字混合搜索
  3. 📸 多模态处理:从图片、文本和视频中提取结构化数据
  4. 🤖 多智能体编排:协调专用智能体以处理复杂的工作流
  5. 🧠 长期记忆:利用 Vertex AI Memory Bank 实现个性化

个会话

2. 构建内容

包含以下内容的幸存者网络图数据库

  • 🗺️ 幸存者关系的 3D 交互式图表可视化
  • 🔍 智能搜索(关键字、语义和混合)
  • 📸 多模态上传流水线(从图片/视频中提取实体)
  • 🤖 用于复杂任务编排的多智能体系统
  • 🧠 记忆库集成,打造个性化互动体验

3. 核心技术

组件

技术

用途

数据库

Cloud Spanner 图

存储节点(幸存者、技能)和边(关系)

AI Search

Gemini + Embeddings

语义理解 + 相似度搜索

代理框架

ADK(智能体开发套件)

编排 AI 工作流

内存

Vertex AI Memory Bank

长期用户偏好设置存储

前端

React + Three.js

交互式 3D 图形可视化

2. 环境准备(如果您正在参加研讨会,请跳过此步骤)

第 1 部分:启用结算账号

  • 您需要声明拥有 5 美元赠金的结算账号,以便进行部署。请务必使用您的 Gmail 账号。

第二部分:开放环境

  1. 👉 点击此链接可直接前往 Cloud Shell 编辑器
  2. 👉 如果系统在今天任何时间提示您进行授权,请点击授权继续。点击以授权 Cloud Shell
  3. 👉 如果终端未显示在屏幕底部,请打开它:
    • 点击查看
    • 点击终端在 Cloud Shell 编辑器中打开新终端
  4. 👉💻 在终端中,使用以下命令验证您是否已通过身份验证,以及项目是否已设置为您的项目 ID:
    gcloud auth list
    
  5. 👉💻 从 GitHub 克隆引导项目:
    git clone https://github.com/google-americas/way-back-home.git
    

3. 环境设置

1. 初始化

Cloud Shell 编辑器终端中,如果终端未显示在屏幕底部,请打开它:

  • 点击查看
  • 点击终端

在 Cloud Shell 编辑器中打开新终端

👉💻 在终端中,将初始化脚本设为可执行文件并运行该脚本:

cd ~/way-back-home/level_2
./init.sh

2. 配置项目

👉💻 设置项目 ID:

gcloud config set project $(cat ~/project_id.txt) --quiet

👉💻 启用必需的 API(这需要大约 2-3 分钟):

gcloud services enable compute.googleapis.com \
                       aiplatform.googleapis.com \
                       run.googleapis.com \
                       cloudbuild.googleapis.com \
                       artifactregistry.googleapis.com \
                       spanner.googleapis.com \
                       storage.googleapis.com

3. 运行设置脚本

👉💻 执行设置脚本:

cd ~/way-back-home/level_2
./setup.sh

这样即可为您创建 .env。在 Cloud Shell 中,打开 way_back_home 项目。在 level_2 文件夹下,您会看到系统为您创建的 .env 文件。如果您找不到该图标,可以依次点击 View -> Toggle Hidden File 来查看。open_project

4. 加载示例数据

👉💻 导航到后端并安装依赖项:

cd ~/way-back-home/level_2/backend
uv sync

👉💻 加载初始幸存者数据:

uv run python ~/way-back-home/level_2/backend/setup_data.py

这会创建:

  • Spanner 实例 (survivor-network)
  • 数据库 (graph-db)
  • 所有节点和边缘表
  • 用于查询的属性图表预期输出
============================================================
SUCCESS! Database setup complete.
============================================================

Instance:  survivor-network
Database:  graph-db
Graph:     SurvivorGraph

Access your database at:
https://console.cloud.google.com/spanner/instances/survivor-network/databases/graph-db?project=waybackhome

如果您点击输出中 Access your database at 后面的链接,可以打开 Google Cloud 控制台 Spanner。

open_spanner

然后,您将在 Google Cloud 控制台中看到 Spanner

spanner

4. 在 Spanner Studio 中直观呈现图数据

本指南将帮助您使用 Spanner Studio 直接在 Google Cloud 控制台中直观呈现和互动操作幸存者网络图数据。在构建 AI 代理之前,这是验证数据和了解图结构的好方法。

1. 访问 Spanner Studio

  1. 在最后一步中,请务必点击链接并打开 Spanner Studio。

spanner_studio

2. 了解图结构(“大局观”)

不妨将 Survivor Network 数据集视为一个逻辑谜题或游戏状态

实体

系统中的角色

类比

幸存者

代理/玩家

玩家人数

生物群系

他们身在何处

地图区域

技能

执行的操作

特性

需要

他们缺乏什么(危机)

任务

资源

在现实世界中找到的物品

战利品

目标:AI 代理的任务是将技能(解决方案)与需求(问题)联系起来,同时考虑生物群落(位置限制)。

🔗 边(关系)

  • SurvivorInBiome:位置跟踪
  • SurvivorHasSkill:能力清单
  • SurvivorHasNeed:有效问题的列表
  • SurvivorFoundResource:商品库存
  • SurvivorCanHelp推理关系(由 AI 计算得出!)

3. 查询图

我们来运行几个查询,看看数据中的“故事”。

Spanner Graph 使用 GQL(Graph Query Language)。如需运行查询,请使用 GRAPH SurvivorNetwork,后跟匹配模式。

👉 查询 1:全球名册(谁在哪个位置?)这是基础知识 - 了解位置信息对于救援行动至关重要。

GRAPH SurvivorNetwork
MATCH result = (s:Survivors)-[:SurvivorInBiome]->(b:Biomes)
RETURN TO_JSON(result) AS json_result

预期会看到如下结果:query1

👉 查询 2:技能矩阵(能力)现在您已经知道每个人的位置,接下来可以了解他们能做什么

GRAPH SurvivorNetwork
MATCH result = (s:Survivors)-[h:SurvivorHasSkill]->(k:Skills)
RETURN TO_JSON(result) AS json_result

预期会看到如下结果:query2

👉 查询 3:谁处于危机之中?(“任务板”)查看需要帮助的幸存者以及他们需要什么。

GRAPH SurvivorNetwork
MATCH result = (s:Survivors)-[h:SurvivorHasNeed]->(n:Needs)
RETURN TO_JSON(result) AS json_result

预期会看到如下结果:query3

🔎 高级:配对 - 谁能帮助谁?

这时,图表就变得非常有用!此查询用于查找拥有可满足其他幸存者需求的技能的幸存者

GRAPH SurvivorNetwork
MATCH result = (helper:Survivors)-[:SurvivorHasSkill]->(skill:Skills)-[:SkillTreatsNeed]->(need:Needs)<-[:SurvivorHasNeed]-(helpee:Survivors)
RETURN TO_JSON(result) AS json_result

预期会看到如下结果:query4

aside positive 此查询的作用

此查询不会仅显示“急救处理烧伤”(这从架构中可以明显看出),而是会查找:

  • Elena Frost 医生(接受过医学培训)→ 可以治疗 → 田中队长(烧伤)
  • 陈大卫(有急救知识)→ 可以治疗 → 朴中尉(脚踝扭伤)

这种方法为何有效

AI 智能体将执行的操作

当用户询问“谁可以治疗烧伤?”时,智能体将:

  1. 运行类似的图表查询
  2. 返回:“弗罗斯特博士接受过医学培训,可以帮助田中队长”
  3. 用户无需了解中间表或关系!

5. Spanner 中已集成 AI 赋能的嵌入功能

1. 为什么要使用嵌入?(无操作,只读)

在生存场景中,时间至关重要。当幸存者报告 I need someone who can treat burnsLooking for a medic 等紧急情况时,他们不能浪费时间猜测数据库中的确切技能名称。

真实场景:幸存者:Captain Tanaka has burns—we need medical help NOW!

传统关键字搜索“medic”→ 0 条结果 ❌

使用嵌入执行语义搜索 → 找到“医疗培训”“急救”✅

这正是智能体需要的:智能、类人搜索,能够理解意图,而不仅仅是关键字。

2. 创建嵌入模型

spanner_embedding

现在,我们来创建一个使用 Google text-embedding-004 将文本转换为嵌入的模型。

👉 在 Spanner Studio 中,运行以下 SQL(将 $YOUR_PROJECT_ID 替换为您的实际项目 ID):

‼️ 在 Cloud Shell 编辑器中,依次打开 File -> Open Folder -> way-back-home/level_2,查看整个项目。

project_id

👉 在 Spanner Studio 中运行此查询,方法是复制并粘贴以下查询,然后点击“运行”按钮:

CREATE MODEL TextEmbeddings
INPUT(content STRING(MAX))
OUTPUT(embeddings STRUCT<values ARRAY<FLOAT32>>)
REMOTE OPTIONS (
    endpoint = '//aiplatform.googleapis.com/projects/$YOUR_PROJECT_ID/locations/us-central1/publishers/google/models/text-embedding-004'
);

此功能的作用

  • 在 Spanner 中创建虚拟模型(不在本地存储模型权重)
  • 指向 Vertex AI 上的 Google text-embedding-004
  • 定义合约:输入为文本,输出为 768 维浮点数组

为什么是“远程选项”?

  • Spanner 本身不运行模型
  • 当您使用 ML.PREDICT 时,它会通过 API 调用 Vertex AI
  • 零 ETL:无需将数据导出到 Python、进行处理并重新导入

点击 Run 按钮,成功后,您会看到如下结果:

spanner_result

3. 添加嵌入列

👉 添加一个用于存储嵌入的列:

ALTER TABLE Skills ADD COLUMN skill_embedding ARRAY<FLOAT32>;

点击 Run 按钮,成功后,您会看到如下结果:

embedding_result

4. 生成嵌入

👉 使用 AI 为每项技能创建向量嵌入:

UPDATE Skills
SET skill_embedding = (
    SELECT embeddings.values
    FROM ML.PREDICT(
        MODEL TextEmbeddings,
        (SELECT name AS content)
    )
)
WHERE skill_embedding IS NULL;

点击 Run 按钮,成功后,您会看到如下结果:

skills_result

发生的情况:每个技能名称(例如,“急救”) 转换为表示其语义含义的 768 维向量。

5. 验证嵌入内容

👉 检查是否已创建嵌入内容:

SELECT 
    skill_id,
    name,
    ARRAY_LENGTH(skill_embedding) AS embedding_dimensions
FROM Skills
LIMIT 5;

预期输出

spanner_result

现在,我们来测试一下场景中的确切使用情形:使用“医疗人员”一词查找医疗技能。

👉 查找与“医护人员”类似的技能:

WITH query_embedding AS (
    SELECT embeddings.values AS val
    FROM ML.PREDICT(MODEL TextEmbeddings, (SELECT "medic" AS content))
)
SELECT
    s.name AS skill_name,
    s.category,
    COSINE_DISTANCE(s.skill_embedding, (SELECT val FROM query_embedding)) AS distance
FROM Skills AS s
WHERE s.skill_embedding IS NOT NULL
ORDER BY distance ASC
LIMIT 10;
  • 将用户的搜索字词“medic”转换为嵌入
  • 将其存储在 query_embedding 临时表中

预期结果(距离越小,相似度越高):

spanner_result

7. 创建用于分析的 Gemini 模型

spanner_gemini

👉 创建生成式 AI 模型引用(将 $YOUR_PROJECT_ID 替换为您的实际项目 ID):

CREATE MODEL GeminiPro
INPUT(prompt STRING(MAX))
OUTPUT(content STRING(MAX))
REMOTE OPTIONS (
    endpoint = '//aiplatform.googleapis.com/projects/$YOUR_PROJECT_ID/locations/us-central1/publishers/google/models/gemini-2.5-pro',
    default_batch_size = 1
);

与嵌入模型的区别

  • 嵌入:文本 → 向量(用于相似性搜索)
  • Gemini:文本 → 生成的文本(用于推理/分析)

spanner_result

8. 使用 Gemini 进行兼容性分析

👉 分析幸存者配对的任务兼容性:

WITH PairData AS (
    SELECT
        s1.name AS Name_A,
        s2.name AS Name_B,
        CONCAT(
            "Assess compatibility of these two survivors for a resource-gathering mission. ",
            "Survivor 1: ", s1.name, ". ",
            "Survivor 2: ", s2.name, ". ",
            "Give a score from 1-10 and a 1-sentence reason."
        ) AS prompt
    FROM Survivors s1
    JOIN Survivors s2 ON s1.survivor_id < s2.survivor_id
    LIMIT 1
)
SELECT
    Name_A,
    Name_B,
    content AS ai_assessment
FROM ML.PREDICT(
    MODEL GeminiPro,
    (SELECT Name_A, Name_B, prompt FROM PairData)
);

预期输出

Name_A          | Name_B            | ai_assessment
----------------|-------------------|----------------
"David Chen"    | "Dr. Elena Frost" | "**Score: 9/10** Their compatibility is extremely high as David's practical, hands-on scavenging skills are perfectly complemented by Dr. Frost's specialized knowledge to identify critical medical supplies and avoid biological hazards."

6. 使用混合搜索构建图 RAG 代理

1. 系统架构概览

本部分将构建一个多方法搜索系统,让您的代理能够灵活地处理不同类型的查询。该系统有三个层:代理层工具层服务层

architecture_hybrid_search

为什么要使用三层?

  • 关注点分离:代理专注于意图,工具专注于界面,服务专注于实现
  • 灵活性:代理可以强制使用特定方法,也可以让 AI 自动路由
  • 优化:如果方法已知,可以跳过昂贵的 AI 分析

在本部分中,您将主要实现语义搜索 (RAG) - 根据含义(而非仅依据关键字)来查找结果。稍后,我们将介绍混合搜索如何融合多种方法。

2. RAG 服务实现

👉💻 在终端中,运行以下命令,在 Cloud Shell Editor 中打开该文件:

cloudshell edit ~/way-back-home/level_2/backend/services/hybrid_search_service.py

找到相应评论 # TODO: REPLACE_SQL

将整行代码替换为以下代码:

        # This is your working query from the successful run!
        sql = """
            WITH query_embedding AS (
                SELECT embeddings.values AS val
                FROM ML.PREDICT(
                    MODEL TextEmbeddings,
                    (SELECT @query AS content)
                )
            )
            SELECT
                s.survivor_id,
                s.name AS survivor_name,
                s.biome,
                sk.skill_id,
                sk.name AS skill_name,
                sk.category,
                COSINE_DISTANCE(
                    sk.skill_embedding, 
                    (SELECT val FROM query_embedding)
                ) AS distance
            FROM Survivors s
            JOIN SurvivorHasSkill shs ON s.survivor_id = shs.survivor_id
            JOIN Skills sk ON shs.skill_id = sk.skill_id
            WHERE sk.skill_embedding IS NOT NULL
            ORDER BY distance ASC
            LIMIT @limit
        """

3. 语义搜索工具定义

👉💻 在终端中,运行以下命令,在 Cloud Shell Editor 中打开该文件:

cloudshell edit ~/way-back-home/level_2/backend/agent/tools/hybrid_search_tools.py

hybrid_search_tools.py 中,找到评论 # TODO: REPLACE_SEMANTIC_SEARCH_TOOL

👉将整行代码替换为以下代码:

async def semantic_search(query: str, limit: int = 10) -> str:
    """
    Force semantic (RAG) search using embeddings.
    
    Use this when you specifically want to find things by MEANING,
    not just matching keywords. Great for:
    - Finding conceptually similar items
    - Handling vague or abstract queries
    - When exact terms are unknown
    
    Example: "healing abilities" will find "first aid", "surgery", 
    "herbalism" even though no keywords match exactly.
    
    Args:
        query: What you're looking for (describe the concept)
        limit: Maximum results
        
    Returns:
        Semantically similar results ranked by relevance
    """
    try:
        service = _get_service()
        result = service.smart_search(
            query, 
            force_method=SearchMethod.RAG,
            limit=limit
        )
        
        return _format_results(
            result["results"],
            result["analysis"],
            show_analysis=True
        )
        
    except Exception as e:
        return f"Error in semantic search: {str(e)}"

代理使用情况

  • 要求查找相似内容(“查找与 X 类似的内容”)的查询
  • 概念性查询(“修复能力”)
  • 当理解含义至关重要时

4. 代理决策指南(说明)

在代理定义中,将与语义搜索相关的部分复制粘贴到指令中。

👉💻 在终端中,运行以下命令,在 Cloud Shell Editor 中打开该文件:

cloudshell edit ~/way-back-home/level_2/backend/agent/agent.py

智能体使用此指令来选择合适的工具:

👉在 agent.py 文件中,找到注释 # TODO: REPLACE_SEARCH_LOGIC将整行替换为以下代码:

- `semantic_search`: Force RAG/embedding search
  Use for: "Find similar to X", conceptual queries, unknown terminology
  Example: "Find skills related to healing"

👉找到注释 # TODO: ADD_SEARCH_TOOLReplace this whole line,将其替换为以下代码:

    semantic_search,         # Force RAG

5. 了解混合搜索的运作方式(只读,无需采取任何行动)

在第 2-4 步中,您实现了语义搜索 (RAG),这是一种通过含义查找结果的核心搜索方法。不过,您可能已经注意到,该系统被称为“混合搜索”。以下是整个流程的运作方式:

混合合并的工作原理

在文件 way-back-home/level_2/backend/services/hybrid_search_service.py 中,当调用 hybrid_search() 时,服务会同时运行这两个搜索并合并结果:

# Location: backend/services/hybrid_search_service.py

    rank_kw = keyword_ranks.get(surv_id, float('inf'))
    rank_rag = rag_ranks.get(surv_id, float('inf'))

    rrf_score = 0.0
    if rank_kw != float('inf'):
        rrf_score += 1.0 / (K + rank_kw)
    if rank_rag != float('inf'):
        rrf_score += 1.0 / (K + rank_rag)

    combined_score = rrf_score

在本 Codelab 中,您实现了作为基础的语义搜索组件 (RAG)。关键字方法和混合方法已在服务中实现,您的代理可以使用这三种方法!

恭喜!您已成功完成具有混合搜索功能的 Graph RAG 代理!

7. 使用 ADK Web 测试智能体

测试智能体最简单的方法是使用 adk web 命令,该命令会启动您的智能体并显示内置的聊天界面。

1. 运行代理

👉💻 前往后端目录(您的代理定义所在的位置),然后启动 Web 界面:

cd ~/way-back-home/level_2/backend
uv run adk web

此命令会启动 中定义的代理

agent/agent.py

并打开用于测试的网页界面。

👉 打开网址:

该命令将输出一个本地网址(通常为 http://127.0.0.1:8000 或类似网址)。在浏览器中打开此链接。

adk web

点击网址后,您会看到 ADK Web 界面。确保您从左上角选择了“代理”。

adk_ui

2. 测试搜索功能

该代理旨在智能地路由您的查询。在聊天窗口中尝试输入以下内容,看看不同的搜索方法是如何运作的。

根据含义和概念查找商品,即使关键字不匹配也是如此。

测试查询:(选择以下任意一项)

Who can help with injuries?
What abilities are related to survival?

需要注意的事项

  • 推理应提及语义RAG 搜索。
  • 您应该会看到概念上相关的结果(例如,“手术”)。
  • 结果将带有 🧬 图标。

将关键字过滤条件与语义理解相结合,以处理复杂的查询。

测试查询:(选择以下任意一项)

Find someone who can ply a plane in the volcanic area
Who has healing abilities in the FOSSILIZED?
Who has healing abilities in the mountains?

需要注意的事项

  • 推理应提及混合搜索。
  • 结果应同时符合这两个条件(概念 + 位置/类别)。
  • 通过这两种方法找到的结果会带有 🔀 图标,并且排名最高。

👉💻 完成测试后,请在命令行中按 Ctrl+C 结束该进程。

8. 运行完整应用

全栈架构概览

architecture_fullstack

添加了 SessionService 和 Runner

👉💻 在终端中,运行以下命令,在 Cloud Shell 编辑器中打开文件 chat.py(请务必先按“Ctrl+C”结束上一个进程,然后再继续):

cloudshell edit ~/way-back-home/level_2/backend/api/routes/chat.py

👉在 chat.py 文件中,找到注释 # TODO: REPLACE_INMEMORY_SERVICES将整行替换为以下代码:

    session_service = InMemorySessionService()
    memory_service = InMemoryMemoryService()

👉在 chat.py 文件中,找到注释 # TODO: REPLACE_RUNNER将整行替换为以下代码:

runner = Runner(
    agent=root_agent, 
    session_service=session_service,
    memory_service=memory_service,
    app_name="survivor-network"
)

1. 开始申请

如果之前的终端仍在运行,请按 Ctrl+C 结束该终端。

👉💻 启动应用:

cd ~/way-back-home/level_2/
./start_app.sh

当后端成功启动时,您会看到如下所示的 Local: http://localhost:5173/"fronted

👉 在终端中点击 Local: http://localhost:5173/

个会话

查询

Find skills similar to healing

聊天

发生的情况

  • 代理识别相似性请求
  • 为“healing”生成嵌入
  • 使用余弦距离查找语义相似的技能
  • 返回:急救(即使名称与“治疗”不匹配)

查询

Find medical skills in the mountains

发生的情况

  1. “关键字”组件:过滤出 category='medical'
  2. 语义组件:嵌入“医疗”并按相似度进行排名
  3. 合并:合并结果,优先显示通过两种方法找到的结果 🔀

查询(可选)

Who is good at survival and in the forest?

发生的情况

  • 关键字查找次数:biome='forest'
  • 语义查找:与“生存”类似的技能
  • 混合搜索兼具两者优势,可提供最佳结果

👉💻 测试完成后,在终端中按 Ctrl+C 结束测试。

9. 多模态流水线 - 工具层

为什么需要多模态流水线?

生存网络不仅仅是文字。现场的幸存者直接通过聊天发送非结构化数据

  • 📸 图片:资源、危险或设备的照片
  • 🎥 视频:状态报告或 SOS 广播
  • 📄 文本:实地记录或日志

我们正在处理哪些文件?

与上一步中搜索现有数据不同,这一步中我们会处理用户上传的文件chat.py 接口可动态处理文件附件:

来源

内容

目标

用户附件

图片/视频/文字

要添加到图表中的信息

聊天上下文

“这是用品的照片”

意图和其他详细信息

计划方法:顺序智能体流水线

我们使用将专业代理串联在一起的序列代理 (multimedia_agent.py):

architecture_uploading

此项在 backend/agent/multimedia_agent.py 中定义为 SequentialAgent

工具层提供智能体可以调用的功能。工具负责处理“如何”操作,即上传文件、提取实体并保存到数据库。

1. 打开工具文件

👉💻 打开终端。在终端中,在 Cloud Shell Editor 中打开该文件:

cloudshell edit ~/way-back-home/level_2/backend/agent/tools/extraction_tools.py

2. 实现 upload_media 工具

此工具可将本地文件上传到 Google Cloud Storage。

👉 在 extraction_tools.py 中,找到评论 pass # TODO: REPLACE_UPLOAD_MEDIA_FUNCTION

将整行代码替换为以下代码:

    """
    Upload media file to GCS and detect its type.
    
    Args:
        file_path: Path to the local file
        survivor_id: Optional survivor ID to associate with upload
        
    Returns:
        Dict with gcs_uri, media_type, and status
    """
    try:
        if not file_path:
            return {"status": "error", "error": "No file path provided"}
        
        # Strip quotes if present
        file_path = file_path.strip().strip("'").strip('"')
        
        if not os.path.exists(file_path):
            return {"status": "error", "error": f"File not found: {file_path}"}
        
        gcs_uri, media_type, signed_url = gcs_service.upload_file(file_path, survivor_id)
        
        return {
            "status": "success",
            "gcs_uri": gcs_uri,
            "signed_url": signed_url,
            "media_type": media_type.value,
            "file_name": os.path.basename(file_path),
            "survivor_id": survivor_id
        }
    except Exception as e:
        logger.error(f"Upload failed: {e}")
        return {"status": "error", "error": str(e)}

3. 实现 extract_from_media 工具

此工具是一个路由器,用于检查 media_type 并调度到正确的提取器(文本、图片或视频)。

👉在 extraction_tools.py 中,找到评论 pass # TODO: REPLACE_EXTRACT_FROM_MEDIA

将整行代码替换为以下代码:

    """
    Extract entities and relationships from uploaded media.
    
    Args:
        gcs_uri: GCS URI of the uploaded file
        media_type: Type of media (text/image/video)
        signed_url: Optional signed URL for public/temporary access
        
    Returns:
        Dict with extraction results
    """
    try:
        if not gcs_uri:
             return {"status": "error", "error": "No GCS URI provided"}

        # Select appropriate extractor
        if media_type == MediaType.TEXT.value or media_type == "text":
            result = await text_extractor.extract(gcs_uri)
        elif media_type == MediaType.IMAGE.value or media_type == "image":
            result = await image_extractor.extract(gcs_uri)
        elif media_type == MediaType.VIDEO.value or media_type == "video":
            result = await video_extractor.extract(gcs_uri)
        else:
            return {"status": "error", "error": f"Unsupported media type: {media_type}"}
            
        # Inject signed URL into broadcast info if present
        if signed_url:
            if not result.broadcast_info:
                result.broadcast_info = {}
            result.broadcast_info['thumbnail_url'] = signed_url
        
        return {
            "status": "success",
            "extraction_result": result.to_dict(), # Return valid JSON dict instead of object
            "summary": result.summary,
            "entities_count": len(result.entities),
            "relationships_count": len(result.relationships),
            "entities": [e.to_dict() for e in result.entities],
            "relationships": [r.to_dict() for r in result.relationships]
        }
    except Exception as e:
        logger.error(f"Extraction failed: {e}")
        return {"status": "error", "error": str(e)}

关键实现细节

  • 多模态输入:我们将文本提示 (_get_extraction_prompt()) 和图片对象都传递给 generate_content
  • 结构化输出response_mime_type="application/json" 可确保 LLM 返回有效的 JSON,这对于流水线至关重要。
  • 视觉实体链接:提示包含已知实体,以便 Gemini 识别特定字符。

4. 实现 save_to_spanner 工具

此工具会将提取的实体和关系持久保存到 Spanner Graph 数据库。

👉在 extraction_tools.py 中,找到评论 pass # TODO: REPLACE_SPANNER_AGENT

将整行代码替换为以下代码:

    """
    Save extracted entities and relationships to Spanner Graph DB.
    
    Args:
        extraction_result: ExtractionResult object (or dict from previous step if passed as dict)
        survivor_id: Optional survivor ID to associate with the broadcast
        
    Returns:
        Dict with save statistics
    """
    try:
        # Handle if extraction_result is passed as the wrapper dict from extract_from_media
        result_obj = extraction_result
        if isinstance(extraction_result, dict) and 'extraction_result' in extraction_result:
             result_obj = extraction_result['extraction_result']
        
        # If result_obj is a dict (from to_dict()), reconstruct it
        if isinstance(result_obj, dict):
            from extractors.base_extractor import ExtractionResult
            result_obj = ExtractionResult.from_dict(result_obj)
        
        if not result_obj:
            return {"status": "error", "error": "No extraction result provided"}
            
        stats = spanner_service.save_extraction_result(result_obj, survivor_id)
        
        return {
            "status": "success",
            "entities_created": stats['entities_created'],
            "entities_existing": stats['entities_found_existing'],
            "relationships_created": stats['relationships_created'],
            "broadcast_id": stats['broadcast_id'],
            "errors": stats['errors'] if stats['errors'] else None
        }
    except Exception as e:
        logger.error(f"Spanner save failed: {e}")
        return {"status": "error", "error": str(e)}

通过为代理提供高级工具,我们可在利用代理的推理能力的同时确保数据完整性

5. 更新 GCS 服务

GCSService 负责将文件实际上传到 Google Cloud Storage。

👉💻 在终端中,在 Cloud Shell Editor 中打开相应文件:

cloudshell edit ~/way-back-home/level_2/backend/services/gcs_service.py

👉 在文件 gcs_service.py 中,找到 upload_file 函数内的注释 # TODO: REPLACE_SAVE_TO_GCS

将整行代码替换为以下代码:

        blob = self.bucket.blob(blob_name)
        blob.upload_from_filename(file_path)

通过将此功能抽象为服务,代理无需了解 GCS 存储分区、Blob 名称或签名网址生成。它只会要求“上传”。

6. (只读)为什么选择 Agentic 工作流 > 传统方法?

智能体优势

功能

批处理流水线

事件驱动型

智能体工作流

复杂性

低(1 个脚本)

高(5 项以上服务)

(1 个 Python 文件:multimedia_agent.py

状态管理

全局变量

硬性(解耦)

统一(代理状态)

错误处理

崩溃

静默日志

互动式(“我无法读取该文件”)

用户反馈

控制台打印

需要轮询

立即(聊天的一部分)

自适应

修复了逻辑

刚性函数

智能(由 LLM 决定下一步)

情境感知

完整(了解用户意图)

重要性:通过使用 multimedia_agent.py(包含 4 个子代理的 SequentialAgent:上传 → 提取 → 保存 → 总结),我们用智能对话式应用逻辑取代了复杂的基础设施和脆弱的脚本。

10. 多模态流水线 - 代理层

智能体层定义了智能,即使用工具来完成任务的智能体。每个代理都有特定的角色,并将上下文传递给下一个代理。下图是多代理系统的架构图。

agent_diagram

1. 打开代理文件

👉💻 在终端中,在 Cloud Shell Editor 中打开相应文件:

cloudshell edit ~/way-back-home/level_2/backend/agent/multimedia_agent.py

2. 定义上传代理

此代理会从用户消息中提取文件路径,并将其上传到 GCS。

👉在 multimedia_agent.py 文件中,找到注释 # TODO: REPLACE_UPLOAD_AGENT

将整行代码替换为以下代码:

upload_agent = LlmAgent(
    name="UploadAgent",
    model="gemini-2.5-flash",
    instruction="""Extract the file path from the user's message and upload it.

Use `upload_media(file_path, survivor_id)` to upload the file.
The survivor_id is optional - include it if the user mentions a specific survivor (e.g., "survivor Sarah" -> "Sarah").
If the user provides a path like "/path/to/file", use that.

Return the upload result with gcs_uri and media_type.""",
    tools=[upload_media],
    output_key="upload_result"
)

3. 定义提取代理

此代理会“看到”上传的媒体,并使用 Gemini Vision 提取结构化数据。

👉在 multimedia_agent.py 文件中,找到注释 # TODO: REPLACE_EXTRACT_AGENT

将整行代码替换为以下代码:

extraction_agent = LlmAgent(
    name="ExtractionAgent", 
    model="gemini-2.5-flash",
    instruction="""Extract information from the uploaded media.

Previous step result: {upload_result}

Use `extract_from_media(gcs_uri, media_type, signed_url)` with the values from the upload result.
The gcs_uri is in upload_result['gcs_uri'], media_type in upload_result['media_type'], and signed_url in upload_result['signed_url'].

Return the extraction results including entities and relationships found.""",
    tools=[extract_from_media],
    output_key="extraction_result"
)

请注意 instruction 如何引用 {upload_result} - 这就是 ADK 中智能体之间传递状态的方式。

4. 定义 Spanner 代理

此代理会将提取的实体和关系保存到图数据库中。

👉在 multimedia_agent.py 文件中,找到注释 # TODO: REPLACE_SPANNER_AGENT

将整行代码替换为以下代码:

spanner_agent = LlmAgent(
    name="SpannerAgent",
    model="gemini-2.5-flash", 
    instruction="""Save the extracted information to the database.

Upload result: {upload_result}
Extraction result: {extraction_result}

Use `save_to_spanner(extraction_result, survivor_id)` to save to Spanner.
Pass the WHOLE `extraction_result` object/dict from the previous step.
Include survivor_id if it was provided in the upload step.

Return the save statistics.""",
    tools=[save_to_spanner],
    output_key="spanner_result"
)

此代理会从之前的两个步骤(upload_resultextraction_result)接收上下文。

5. 定义总结代理

此代理会将之前所有步骤的结果合成为用户友好的回答。

👉在 multimedia_agent.py 文件中,找到注释 summary_instruction="" # TODO: REPLACE_SUMMARY_AGENT_PROMPT

将整行代码替换为以下代码:

USE_MEMORY_BANK = os.getenv("USE_MEMORY_BANK", "false").lower() == "true"
save_msg = "6. Mention that the data is also being synced to the memory bank." if USE_MEMORY_BANK else ""

summary_instruction = f"""Provide a user-friendly summary of the media processing.

Upload: {{upload_result}}
Extraction: {{extraction_result}}
Database: {{spanner_result}}

Summarize:
1. What file was processed (name and type)
2. Key information extracted (survivors, skills, needs, resources found) - list names and counts
3. Relationships identified
4. What was saved to the database (broadcast ID, number of entities)
5. Any issues encountered
{save_msg}

Be concise but informative."""

此代理不需要工具,只需读取共享的上下文,然后为用户生成简洁的摘要。

🧠 架构摘要

图层

文件

责任

工具

extraction_tools.py + gcs_service.py

操作方法 - 上传、提取、保存

Agent

multimedia_agent.py

内容 - 编排流水线

11. 多模态数据流水线 - 编排

新系统的核心是 backend/agent/multimedia_agent.py 中定义的 MultimediaExtractionPipeline。它使用 ADK(智能体开发套件)中的顺序智能体模式。

1. 为什么选择顺序执行?

处理上传内容是一个线性依赖链:

  1. 在拥有文件(上传)之前,您无法提取数据。
  2. 您必须先提取数据(提取),然后才能保存数据。
  3. 您必须先获得结果(保存),然后才能进行总结。

SequentialAgent 非常适合此用途。它将一个代理的输出作为上下文/输入传递给下一个代理。

2. 代理定义

我们来看看 multimedia_agent.py 底部的流水线是如何组装的:👉💻 在终端中,运行以下命令,在 Cloud Shell 编辑器中打开该文件:

cloudshell edit ~/way-back-home/level_2/backend/agent/multimedia_agent.py

它会接收前两个步骤的输入。找到注释 # TODO: REPLACE_ORCHESTRATION将整行代码替换为以下代码:

    sub_agents=[upload_agent, extraction_agent, spanner_agent, summary_agent]

3. 与根代理建立连接

👉💻 在终端中,运行以下命令,在 Cloud Shell Editor 中打开该文件:

cloudshell edit ~/way-back-home/level_2/backend/agent/agent.py

找到注释 # TODO: REPLACE_ADD_SUBAGENT将整行代码替换为以下代码:

    sub_agents=[multimedia_agent],

此单个对象实际上将四个“专家”捆绑到一个可调用实体中。

4. 代理之间的数据流

每个代理都会将其输出存储在共享的上下文中,后续代理可以访问该上下文:

architecture_uploading

5. 打开应用(如果应用仍在运行,请跳过此步骤)

👉💻 启动应用:

cd ~/way-back-home/level_2/
./start_app.sh

👉 在终端中点击 Local: http://localhost:5173/

6. 测试图片上传

👉 在聊天界面中,选择此处的任意照片并上传到界面:

在聊天界面中,向代理说明您的具体情况:

Here is the survivor note

然后在此处附上图片。

upload_input

upload_result

👉💻 在终端中,测试完成后,按“Ctrl+C”结束进程。

6. 验证 GCS 存储分区中的多模态上传

gcs

  • 选择您的存储分区,然后点击 media

媒体

  • 您上传的图片会显示在此处。uploaded_img

7. 验证 Spanner 中的多模态上传功能(可选)

以下是 test_photo1 在界面中的输出示例。

  • 打开 Google Cloud 控制台 Spanner
  • 选择您的实例:Survivor Network
  • 选择数据库:graph-db
  • 在左侧边栏中,点击 Spanner Studio

👉 在 Spanner Studio 中,查询新数据:

SELECT 
  s.name AS Survivor,
  s.role AS Role,
  b.name AS Biome,
  r.name AS FoundResource,
  s.created_at
FROM Survivors s
LEFT JOIN SurvivorInBiome sib ON s.survivor_id = sib.survivor_id
LEFT JOIN Biomes b ON sib.biome_id = b.biome_id
LEFT JOIN SurvivorFoundResource sfr ON s.survivor_id = sfr.survivor_id
LEFT JOIN Resources r ON sfr.resource_id = r.resource_id
ORDER BY s.created_at DESC;

我们可以通过查看以下结果来验证这一点:

spanner_verify

12. 搭配 Agent Engine 使用的记忆库

1. 记忆的运作方式

该系统采用双重记忆方法来处理即时上下文和长期学习。

memory_bank

2. 什么是记忆主题?

记忆主题用于定义代理应在对话中记住的信息类别。您可以将它们视为用于存放不同类型用户偏好的文件柜。

我们的 2 个主题

  1. search_preferences:用户喜欢的搜索方式
    • 他们更喜欢关键字搜索还是语义搜索?
    • 他们经常搜索哪些技能/群系?
    • 示例记忆:“用户偏好使用语义搜索来查找医疗技能”
  2. urgent_needs_context:他们正在跟踪哪些危机
    • 他们监控哪些资源?
    • 他们担心哪些幸存者?
    • 示例记忆:“用户正在跟踪北部营地的药品短缺情况”

3. 设置记忆主题

自定义记忆主题用于定义代理应记住哪些内容。这些是在部署 Agent Engine 时配置的。

👉💻 在终端中,运行以下命令,在 Cloud Shell Editor 中打开该文件:

cloudshell edit ~/way-back-home/level_2/backend/deploy_agent.py

这会在编辑器中打开 ~/way-back-home/level_2/backend/deploy_agent.py

我们定义了结构 MemoryTopic 对象,以指导 LLM 提取和保存哪些信息。

👉在文件 deploy_agent.py 中,将 # TODO: SET_UP_TOPIC 替换为以下内容:

# backend/deploy_agent.py

    custom_topics = [
        # Topic 1: Survivor Search Preferences
        MemoryTopic(
            custom_memory_topic=CustomMemoryTopic(
                label="search_preferences",
                description="""Extract the user's preferences for how they search for survivors. Include:
                - Preferred search methods (keyword, semantic, direct lookup)
                - Common filters used (biome, role, status)
                - Specific skills they value or frequently look for
                - Geographic areas of interest (e.g., "forest biome", "mountain outpost")
                
                Example: "User prefers semantic search for finding similar skills."
                Example: "User frequently checks for survivors in the Swamp Biome."
                """,
            )
        ),
        # Topic 2: Urgent Needs Context
        MemoryTopic(
            custom_memory_topic=CustomMemoryTopic(
                label="urgent_needs_context",
                description="""Track the user's focus on urgent needs and resource shortages. Include:
                - Specific resources they are monitoring (food, medicine, ammo)
                - Critical situations they are tracking
                - Survivors they are particularly concerned about
                
                Example: "User is monitoring the medicine shortage in the Northern Camp."
                Example: "User is looking for a doctor for the injured survivors."
                """,
            )
        )
    ]

4. 代理集成

代理代码必须知道记忆库,才能保存和检索信息。

👉💻 在终端中,运行以下命令,在 Cloud Shell Editor 中打开该文件:

cloudshell edit ~/way-back-home/level_2/backend/agent/agent.py

这会在编辑器中打开 ~/way-back-home/level_2/backend/agent/agent.py

代理创建

创建代理时,我们传递了 after_agent_callback,以确保会话在互动后保存到内存中。add_session_to_memory 函数以异步方式运行,以避免减慢聊天响应速度。

👉在 agent.py 文件中,找到注释 # TODO: REPLACE_ADD_SESSION_MEMORY将整行替换为以下代码:

async def add_session_to_memory(
        callback_context: CallbackContext
) -> Optional[types.Content]:
    """Automatically save completed sessions to memory bank in the background"""
    if hasattr(callback_context, "_invocation_context"):
        invocation_context = callback_context._invocation_context
        if invocation_context.memory_service:
            # Use create_task to run this in the background without blocking the response
            asyncio.create_task(
                invocation_context.memory_service.add_session_to_memory(
                    invocation_context.session
                )
            )
            logger.info("Scheduled session save to memory bank in background")

后台保存

👉在 agent.py 文件中,找到注释 # TODO: REPLACE_ADD_MEMORY_BANK_TOOL将整行替换为以下代码:

if USE_MEMORY_BANK:
    agent_tools.append(PreloadMemoryTool())

👉在 agent.py 文件中,找到注释 # TODO: REPLACE_ADD_CALLBACK将整行替换为以下代码:

    after_agent_callback=add_session_to_memory if USE_MEMORY_BANK else None

设置 Vertex AI 会话服务

👉💻 在终端中,运行以下命令,在 Cloud Shell Editor 中打开文件 chat.py

cloudshell edit ~/way-back-home/level_2/backend/api/routes/chat.py

👉在 chat.py 文件中,找到注释 # TODO: REPLACE_VERTEXAI_SERVICES将整行替换为以下代码:

    session_service = VertexAiSessionService(
        project=project_id,
        location=location,
        agent_engine_id=agent_engine_id
    )
    memory_service = VertexAiMemoryBankService(
        project=project_id,
        location=location,
        agent_engine_id=agent_engine_id
    )

4. 设置和部署

在测试记忆功能之前,您需要部署包含新记忆主题的代理,并确保环境配置正确。

我们提供了一个便捷脚本来处理此流程。

运行部署脚本

👉💻 在终端中,运行部署脚本:

cd ~/way-back-home/level_2
./deploy_and_update_env.sh

此脚本会执行以下操作:

  • 运行 backend/deploy_agent.py 以向 Vertex AI 注册代理和记忆主题。
  • 捕获新的代理引擎 ID
  • 使用 AGENT_ENGINE_ID 自动更新 .env 文件。
  • 确保在 .env 文件中设置了 USE_MEMORY_BANK=TRUE

[!IMPORTANT] 如果您在 deploy_agent.py 中更改了 custom_topics,则必须重新运行此脚本以更新代理引擎。

13. 使用多模态数据验证记忆库

您可以教智能体一项偏好设置,然后检查该偏好设置是否在会话之间保持不变,从而验证记忆库是否正常运行。

1. 打开应用(如果应用已在运行,请跳过此步骤)

按照以下说明再次打开应用:如果之前的终端仍在运行,请按 Ctrls+C 结束该终端。

👉💻 启动应用:

cd ~/way-back-home/level_2/
./start_app.sh

👉 在终端中点击 Local: http://localhost:5173/

2. 使用文本测试记忆库

在聊天界面中,向代理说明您的具体情况:

"I'm planning a medical rescue mission in the mountains. I need survivors with first aid and climbing skills."

👉 等待约 30 秒,让系统在后台处理记忆。

2. 开始新会话

刷新页面可清除当前对话记录(短期记忆)。

提出一个依赖于您之前提供的上下文的问题:

"What kind of missions am I interested in?"

预期响应

“根据您之前的对话,您对以下内容感兴趣:

  • 医疗救援任务
  • 山区/高海拔地区作业
  • 所需技能:急救、攀岩

您是否希望我查找符合这些条件的幸存者?

3. 使用图片上传功能进行测试

上传图片,然后提出以下问题:

remember this

您可以选择此处的任意照片或您自己的照片,然后将其上传到界面:

4. 在 Vertex AI Agent Engine 中验证

前往 Google Cloud 控制台 Agent Engine

  1. 请确保您从左上角的项目选择器中选择了项目:项目选择器
  2. 验证您刚刚通过上一个命令 use_memory_bank.sh 部署的代理引擎:Agent Engine点击您刚刚创建的代理引擎。
  3. 点击此已部署代理中的 Memories 标签页,即可在此处查看所有记忆。查看回忆

👉💻 完成测试后,在终端中点击“Ctrl + C”以结束进程。

🎉 恭喜!您刚刚将记忆库附加到智能体!

14. 部署到 Cloud Run

1. 运行部署脚本

👉💻 运行部署脚本:

cd ~/way-back-home/level_2
./deploy_cloud_run.sh

成功部署后,您将获得网址,这就是为您部署的网址!已部署

👉💻 在获取网址之前,请运行以下命令来授予权限:

source .env && gcloud run services add-iam-policy-binding survivor-frontend --region $REGION --member=allUsers --role=roles/run.invoker && gcloud run services add-iam-policy-binding survivor-backend --region $REGION --member=allUsers --role=roles/run.invoker

前往已部署的网址,您会看到您的应用已在其中上线!

2. 了解 build 流水线

cloudbuild.yaml 文件定义了以下顺序步骤:

  1. 后端 build:从 backend/Dockerfile 构建 Docker 映像。
  2. 后端部署:将后端容器部署到 Cloud Run。
  3. 捕获网址:获取新的后端网址。
  4. 前端 build
    • 安装依赖项。
    • 构建 React 应用,注入 VITE_API_URL=
  5. 前端映像:从 frontend/Dockerfile 构建 Docker 映像(打包静态资源)。
  6. 前端部署:部署前端容器。

3. 验证部署

构建完成后(查看脚本提供的日志链接),您可以验证:

  1. 前往 Cloud Run 控制台
  2. 找到 survivor-frontend 服务。
  3. 点击网址即可打开应用。
  4. 执行搜索查询,以确保前端可以与后端通信。

4. (仅限研讨会参与者)更新您的位置信息

👉💻 运行补全脚本:

cd ~/way-back-home/level_2
./set_level_2.sh

现在打开 waybackhome.dev,您会看到自己的位置信息已更新。恭喜您完成第 2 级!

最终结果

(可选)5. 手动部署

如果您希望手动运行命令或更好地了解该流程,请按以下步骤直接使用 cloudbuild.yaml

写入 cloudbuild.yaml

cloudbuild.yaml 文件会告知 Google Cloud Build 要执行哪些步骤。

  • steps:一系列按顺序执行的操作。每个步骤都在容器中运行(例如,dockergcloudnodebash)。
  • 替代变量:可在 build 时传递的变量(例如,$_REGION)。
  • workspace:一个共享目录,其中包含的步骤可以共享文件(就像我们共享 backend_url.txt 一样)。

运行部署

如需在不使用脚本的情况下手动部署,请使用 gcloud builds submit 命令。您必须传递所需的替换变量。

# Load your env vars first or replace these values manually
export PROJECT_ID=your-project-id
export REGION=us-central1

gcloud builds submit --config cloudbuild.yaml \
    --project "$PROJECT_ID" \
    --substitutions _REGION="us-central1",_GOOGLE_API_KEY="",_AGENT_ENGINE_ID="your-agent-id",_USE_MEMORY_BANK="TRUE",_GOOGLE_GENAI_USE_VERTEXAI="TRUE"

15. 总结

1. 您构建的内容

图数据库:包含节点(幸存者、技能)和边(关系)的 Spanner
AI Search:使用嵌入的关键字搜索、语义搜索和混合搜索
多模态流水线:使用 Gemini 从图片/视频中提取实体
多代理系统:使用 ADK 的协调工作流
记忆库:使用 Vertex AI 实现长期个性化
生产部署:Cloud Run + Agent Engine

2. 架构摘要

architecture_fullstack

3. 重要经验

  1. Graph RAG:将图数据库结构与语义嵌入相结合,实现智能搜索
  2. 多智能体模式:用于复杂多步骤工作流的顺序流水线
  3. 多模态 AI:从非结构化媒体(图片/视频)中提取结构化数据
  4. 有状态智能体:记忆库可实现跨会话的个性化

4. 研讨会内容

5. 资源