1. 简介
您是否曾因为管理所有个人支出而感到沮丧和懒惰?我也是!因此,在此 Codelab 中,我们将构建一个由 Gemini 2.5 提供支持的个人支出管理助理,为我们处理所有琐事!从管理上传的收据到分析您是否已经花费太多买咖啡!
您可以通过网络浏览器访问此助理,它以聊天网页界面的形式呈现,您可以在其中与它交流、上传一些收据图片并要求它存储这些图片,或者您也可以搜索一些收据以获取文件并进行一些支出分析。所有这些都基于 Google 智能体开发套件框架构建而成
应用本身分为 2 项服务:前端和后端;这样,您就可以快速构建原型并试用其效果,还可以了解 API 协定如何集成这两项服务。
在本 Codelab 中,您将采用分步方法,具体步骤如下:
- 准备您的 Google Cloud 项目并在其中启用所有所需的 API
- 在 Google Cloud Storage 上设置存储分区,在 Firestore 上设置数据库
- 创建 Firestore 索引
- 为您的编码环境设置工作区
- 构建 ADK 代理源代码、工具、提示等
- 使用 ADK 本地 Web 开发界面测试智能体
- 使用 Gradio 库构建前端服务 - 聊天界面,以发送一些查询和上传收据图片
- 使用 FastAPI 构建后端服务 - HTTP 服务器,其中包含我们的 ADK 代理代码、SessionService 和工件服务
- 管理环境变量并设置将应用部署到 Cloud Run 所需的文件
- 将应用部署到 Cloud Run
架构概览
前提条件
- 熟练使用 Python
- 了解使用 HTTP 服务的基本全栈架构
学习内容
- 使用 Gradio 制作前端 Web 原型
- 使用 FastAPI 和 Pydantic 开发后端服务
- 架构 ADK Agent 并利用其多项功能
- 工具使用
- 会话和工件管理
- 在发送到 Gemini 之前对输入进行修改的回调使用情况
- 利用 BuiltInPlanner 通过规划来改进任务执行
- 通过 ADK 本地 Web 界面快速调试
- 通过提示工程和使用 ADK 回调修改 Gemini 请求,通过信息解析和检索优化多模态互动
- 使用 Firestore 作为向量数据库的代理检索增强生成
- 使用 Pydantic-settings 在 YAML 文件中管理环境变量
- 使用 Dockerfile 将应用部署到 Cloud Run,并使用 YAML 文件提供环境变量
所需条件
- Chrome 网络浏览器
- Gmail 账号
- 启用了结算功能的 Cloud 项目
此 Codelab 面向各种级别(包括新手)的开发者,其示例应用中使用了 Python。不过,您无需了解 Python 即可理解所介绍的概念。
2. 准备工作
在 Cloud 控制台中选择“有效项目”
本 Codelab 假定您已有一个启用了结算功能的 Google Cloud 项目。如果您尚未拥有此应用,可以按照以下说明开始使用。
- 在 Google Cloud Console 的项目选择器页面上,选择或创建一个 Google Cloud 项目。
- 确保您的 Cloud 项目已启用结算功能。了解如何检查项目是否已启用结算功能。
准备 Firestore 数据库
接下来,我们还需要创建一个 Firestore 数据库。原生模式 Firestore 是一个 NoSQL 文档数据库,以可自动扩缩、性能出色和易于进行应用开发为设计目标。它还可以充当向量数据库,为我们的实验室支持检索增强生成技术。
- 在搜索栏中搜索“firestore”,然后点击 Firestore 产品
- 然后,点击 Create A Firestore Database(创建 Firestore 数据库)按钮
- 使用 (默认) 作为数据库 ID 名称,并保持 Standard Edition 的选择状态。在本实验演示中,请将 Firestore 原生与开放安全规则搭配使用。
- 您还会注意到,此数据库实际上显示了免费层级使用量然后,点击“创建数据库”按钮
完成上述步骤后,您应该已重定向到您刚刚创建的 Firestore 数据库
在 Cloud Shell 终端中设置 Cloud 项目
- 您将使用 Cloud Shell,这是一个在 Google Cloud 中运行的命令行环境,它预加载了 bq。点击 Google Cloud 控制台顶部的“激活 Cloud Shell”。
- 连接到 Cloud Shell 后,您可以使用以下命令检查自己是否已通过身份验证,以及项目是否已设置为您的项目 ID:
gcloud auth list
- 在 Cloud Shell 中运行以下命令,以确认 gcloud 命令了解您的项目。
gcloud config list project
- 如果项目未设置,请使用以下命令进行设置:
gcloud config set project <YOUR_PROJECT_ID>
或者,您也可以在控制台中查看 PROJECT_ID
ID
点击它,您会在右侧看到您的所有项目和项目 ID
- 通过以下命令启用所需的 API。此过程可能需要几分钟的时间,请耐心等待。
gcloud services enable aiplatform.googleapis.com \
firestore.googleapis.com \
run.googleapis.com \
cloudbuild.googleapis.com \
cloudresourcemanager.googleapis.com
成功执行该命令后,您应该会看到如下所示的消息:
Operation "operations/..." finished successfully.
您可以通过控制台搜索各个产品或使用此链接,以替代 gcloud 命令。
如果缺少任何 API,您随时可以在实现过程中启用它。
如需了解 gcloud 命令和用法,请参阅文档。
准备 Google Cloud Storage 存储分区
接下来,我们需要在同一终端中准备 GCS 存储分区来存储上传的文件。运行以下命令以创建存储分区
gsutil mb -l us-central1 gs://personal-expense-assistant-receipts
系统会显示以下输出
Creating gs://personal-expense-assistant-receipts/...
您可以通过以下方式进行验证:前往浏览器左上角的导航菜单,然后依次选择 Cloud Storage -> Bucket
为搜索创建 Firestore 索引
Firestore 是原生 NoSQL 数据库,在数据模型方面具有卓越的性能和灵活性,但在处理复杂查询时存在局限性。由于我们计划使用一些复合多字段查询和向量搜索,因此需要先创建一些索引。如需详细了解,请参阅此文档
- 运行以下命令以创建索引来支持复合查询
gcloud firestore indexes composite create \
--collection-group=personal-expense-assistant-receipts \
--field-config field-path=total_amount,order=ASCENDING \
--field-config field-path=transaction_time,order=ASCENDING \
--field-config field-path=__name__,order=ASCENDING \
--database="(default)"
- 并运行此脚本以支持向量搜索
gcloud firestore indexes composite create \
--collection-group="personal-expense-assistant-receipts" \
--query-scope=COLLECTION \
--field-config field-path="embedding",vector-config='{"dimension":"768", "flat": "{}"}' \
--database="(default)"
如需查看创建的索引,请访问 Cloud 控制台中的 Firestore,点击 (default) 数据库实例,然后在导航栏中选择 Indexes
前往 Cloud Shell 编辑器并设置应用工作目录
现在,我们可以设置代码编辑器来进行一些编码操作了。我们将使用 Cloud Shell Editor 来完成此操作
- 点击“打开编辑器”按钮,这将打开 Cloud Shell 编辑器,我们可以在此处编写代码
- 确保在 Cloud Shell 编辑器的左下角(状态栏)中设置 Cloud Code 项目(如下图所示),并将其设置为已启用结算功能的有效 Google Cloud 项目。在系统提示时授权。如果您已按照上一条命令操作,该按钮可能也会直接指向已启用的项目,而不是登录按钮
- 接下来,我们从 GitHub 克隆此 Codelab 的模板工作目录,运行以下命令。它会在 personal-expense-assistant 目录中创建工作目录
git clone https://github.com/alphinside/personal-expense-assistant-adk-codelab-starter.git personal-expense-assistant
- 然后,前往 Cloud Shell 编辑器的顶部,依次点击文件->打开文件夹,找到您的用户名目录,找到 personal-expense-assistant 目录,然后点击 OK 按钮。这会将所选目录设为主工作目录。在此示例中,用户名为 alvinprayuda,因此目录路径如下所示
现在,您的 Cloud Shell 编辑器应如下所示
环境设置
准备 Python 虚拟环境
下一步是准备开发环境。您当前使用的终端应位于 personal-expense-assistant 工作目录中。在本 Codelab 中,我们将使用 Python 3.12,并使用 uv Python 项目管理器来简化创建和管理 Python 版本和虚拟环境的流程
- 如果您尚未打开终端,请依次点击 Terminal(终端)-> New Terminal(新建终端),或使用 Ctrl + Shift + C 打开终端,它会在浏览器底部打开一个终端窗口
- 使用以下命令下载
uv
并安装 Python 3.12
curl -LsSf https://astral.sh/uv/0.6.16/install.sh | sh && \
source $HOME/.local/bin/env && \
uv python install 3.12
- 现在,我们使用
uv
初始化虚拟环境,运行以下命令
uv sync --frozen
这将创建 .venv 目录并安装依赖项。快速浏览 pyproject.toml 即可了解依赖项的相关信息,如下所示
dependencies = [ "datasets>=3.5.0", "google-adk>=0.2.0", "google-cloud-firestore>=2.20.1", "gradio>=5.23.1", "pydantic>=2.10.6", "pydantic-settings[yaml]>=2.8.1", ]
- 如需测试虚拟环境,请创建新文件 main.py 并复制以下代码
def main():
print("Hello from personal-expense-assistant-adk!")
if __name__ == "__main__":
main()
- 然后,运行以下命令
uv run main.py
您将会看到如下所示的输出
Using CPython 3.12 Creating virtual environment at: .venv Hello from personal-expense-assistant-adk!
这表明 Python 项目已正确设置。
设置配置文件
现在,我们需要为此项目设置配置文件。我们使用 pydantic-settings 从 YAML 文件读取配置。
创建名为 settings.yaml 的文件,并添加以下配置。依次点击 File->New Text File,然后使用以下代码进行填充。然后将其另存为 settings.yaml
GCLOUD_LOCATION: "us-central1"
GCLOUD_PROJECT_ID: "your_gcloud_project_id"
BACKEND_URL: "http://localhost:8081/chat"
STORAGE_BUCKET_NAME: "personal-expense-assistant-receipts"
DB_COLLECTION_NAME: "personal-expense-assistant-receipts"
在本 Codelab 中,我们将使用为 GCLOUD_LOCATION
,
BACKEND_URL
,
STORAGE_BUCKET_NAME
,
DB_COLLECTION_NAME
和 BACKEND_URL
预配置的值。
现在,我们可以继续执行下一步,构建代理,然后构建服务
3. 使用 Google ADK 和 Gemini 2.5 构建智能体
简介 ADK 目录结构
首先,我们来探索 ADK 提供的功能以及如何构建代理。您可以访问此网址,查看 ADK 完整文档。ADK 在其 CLI 命令执行中提供了许多实用程序。其中一些包括:
- 设置代理目录结构
- 快速尝试通过 CLI 输入输出进行互动
- 快速设置本地开发界面 Web 界面
现在,我们使用 CLI 命令创建代理目录结构。运行以下命令
uv run adk create expense_manager_agent \
--model gemini-2.5-flash-preview-04-17 \
--project {your-project-id} \
--region us-central1
它将创建以下代理目录结构
expense_manager_agent/ ├── __init__.py ├── .env ├── agent.py
如果您检查 init.py 和 agent.py,会看到以下代码
# __init__.py
from . import agent
# agent.py
from google.adk.agents import Agent
root_agent = Agent(
model='gemini-2.5-flash-preview-04-17',
name='root_agent',
description='A helpful assistant for user questions.',
instruction='Answer user questions to the best of your knowledge',
)
构建我们的支出管理器智能客服
接下来,我们来构建支出管理器代理!打开 expense_manager_agent/agent.py 文件,然后复制以下代码,其中包含 root_agent。
# expense_manager_agent/agent.py
from google.adk.agents import Agent
from expense_manager_agent.tools import (
store_receipt_data,
search_receipts_by_metadata_filter,
search_relevant_receipts_by_natural_language_query,
get_receipt_data_by_image_id,
)
from expense_manager_agent.callbacks import modify_image_data_in_history
import os
from settings import get_settings
from google.adk.planners import BuiltInPlanner
from google.genai import types
SETTINGS = get_settings()
os.environ["GOOGLE_CLOUD_PROJECT"] = SETTINGS.GCLOUD_PROJECT_ID
os.environ["GOOGLE_CLOUD_LOCATION"] = SETTINGS.GCLOUD_LOCATION
os.environ["GOOGLE_GENAI_USE_VERTEXAI"] = "TRUE"
# Get the code file directory path and read the task prompt file
current_dir = os.path.dirname(os.path.abspath(__file__))
prompt_path = os.path.join(current_dir, "task_prompt.md")
with open(prompt_path, "r") as file:
task_prompt = file.read()
root_agent = Agent(
name="expense_manager_agent",
model="gemini-2.5-flash-preview-04-17",
description=(
"Personal expense agent to help user track expenses, analyze receipts, and manage their financial records"
),
instruction=task_prompt,
tools=[
store_receipt_data,
get_receipt_data_by_image_id,
search_receipts_by_metadata_filter,
search_relevant_receipts_by_natural_language_query,
],
planner=BuiltInPlanner(
thinking_config=types.ThinkingConfig(
thinking_budget=2048,
)
),
before_model_callback=modify_image_data_in_history,
)
代码说明
此脚本包含代理初始化,其中我们会初始化以下内容:
- 将要使用的模型设置为
gemini-2.5-flash-preview-04-17
- 将客服人员说明和说明设置为从
task_prompt.md
读取的系统提示 - 提供必要的工具来支持代理功能
- 在使用 Gemini 2.5 Flash Thinking 功能生成最终回答或执行操作之前,启用规划功能
- 在向 Gemini 发送请求之前设置回调拦截,以限制在进行预测之前发送的图片数据数量
4. 配置代理工具
我们的支出管理器客服人员将具备以下功能:
- 从收据图片中提取数据,并存储数据和文件
- 对支出数据进行完全匹配搜索
- 对支出数据进行内容相关搜索
因此,我们需要适当的工具来支持此功能。在 expense_manager_agent 目录下新建一个文件,并将其命名为 tools.py,然后复制以下代码
# expense_manager_agent/tools.py
import datetime
from typing import Dict, List, Any
from google.cloud import firestore
from google.cloud.firestore_v1.vector import Vector
from google.cloud.firestore_v1 import FieldFilter
from google.cloud.firestore_v1.base_query import And
from google.cloud.firestore_v1.base_vector_query import DistanceMeasure
from settings import get_settings
from google import genai
SETTINGS = get_settings()
DB_CLIENT = firestore.Client(
project=SETTINGS.GCLOUD_PROJECT_ID
) # Will use "(default)" database
COLLECTION = DB_CLIENT.collection(SETTINGS.DB_COLLECTION_NAME)
GENAI_CLIENT = genai.Client(
vertexai=True, location=SETTINGS.GCLOUD_LOCATION, project=SETTINGS.GCLOUD_PROJECT_ID
)
EMBEDDING_DIMENSION = 768
EMBEDDING_FIELD_NAME = "embedding"
INVALID_ITEMS_FORMAT_ERR = """
Invalid items format. Must be a list of dictionaries with 'name', 'price', and 'quantity' keys."""
RECEIPT_DESC_FORMAT = """
Store Name: {store_name}
Transaction Time: {transaction_time}
Total Amount: {total_amount}
Currency: {currency}
Purchased Items:
{purchased_items}
Receipt Image ID: {receipt_id}
"""
def sanitize_image_id(image_id: str) -> str:
"""Sanitize image ID by removing any leading/trailing whitespace."""
if image_id.startswith("[IMAGE-"):
image_id = image_id.split("ID ")[1].split("]")[0]
return image_id.strip()
def store_receipt_data(
image_id: str,
store_name: str,
transaction_time: str,
total_amount: float,
purchased_items: List[Dict[str, Any]],
currency: str = "IDR",
) -> str:
"""
Store receipt data in the database.
Args:
image_id (str): The unique identifier of the image. For example IMAGE-POSITION 0-ID 12345,
the ID of the image is 12345.
store_name (str): The name of the store.
transaction_time (str): The time of purchase, in ISO format ("YYYY-MM-DDTHH:MM:SS.ssssssZ").
total_amount (float): The total amount spent.
purchased_items (List[Dict[str, Any]]): A list of items purchased with their prices. Each item must have:
- name (str): The name of the item.
- price (float): The price of the item.
- quantity (int, optional): The quantity of the item. Defaults to 1 if not provided.
currency (str, optional): The currency of the transaction, can be derived from the store location.
If unsure, default is "IDR".
Returns:
str: A success message with the receipt ID.
Raises:
Exception: If the operation failed or input is invalid.
"""
try:
# In case of it provide full image placeholder, extract the id string
image_id = sanitize_image_id(image_id)
# Check if the receipt already exists
doc = get_receipt_data_by_image_id(image_id)
if doc:
return f"Receipt with ID {image_id} already exists"
# Validate transaction time
if not isinstance(transaction_time, str):
raise ValueError(
"Invalid transaction time: must be a string in ISO format 'YYYY-MM-DDTHH:MM:SS.ssssssZ'"
)
try:
datetime.datetime.fromisoformat(transaction_time.replace("Z", "+00:00"))
except ValueError:
raise ValueError(
"Invalid transaction time format. Must be in ISO format 'YYYY-MM-DDTHH:MM:SS.ssssssZ'"
)
# Validate items format
if not isinstance(purchased_items, list):
raise ValueError(INVALID_ITEMS_FORMAT_ERR)
for _item in purchased_items:
if (
not isinstance(_item, dict)
or "name" not in _item
or "price" not in _item
):
raise ValueError(INVALID_ITEMS_FORMAT_ERR)
if "quantity" not in _item:
_item["quantity"] = 1
# Create a combined text from all receipt information for better embedding
result = GENAI_CLIENT.models.embed_content(
model="text-embedding-004",
contents=RECEIPT_DESC_FORMAT.format(
store_name=store_name,
transaction_time=transaction_time,
total_amount=total_amount,
currency=currency,
purchased_items=purchased_items,
receipt_id=image_id,
),
)
embedding = result.embeddings[0].values
doc = {
"receipt_id": image_id,
"store_name": store_name,
"transaction_time": transaction_time,
"total_amount": total_amount,
"currency": currency,
"purchased_items": purchased_items,
EMBEDDING_FIELD_NAME: Vector(embedding),
}
COLLECTION.add(doc)
return f"Receipt stored successfully with ID: {image_id}"
except Exception as e:
raise Exception(f"Failed to store receipt: {str(e)}")
def search_receipts_by_metadata_filter(
start_time: str,
end_time: str,
min_total_amount: float = -1.0,
max_total_amount: float = -1.0,
) -> str:
"""
Filter receipts by metadata within a specific time range and optionally by amount.
Args:
start_time (str): The start datetime for the filter (in ISO format, e.g. 'YYYY-MM-DDTHH:MM:SS.ssssssZ').
end_time (str): The end datetime for the filter (in ISO format, e.g. 'YYYY-MM-DDTHH:MM:SS.ssssssZ').
min_total_amount (float): The minimum total amount for the filter (inclusive). Defaults to -1.
max_total_amount (float): The maximum total amount for the filter (inclusive). Defaults to -1.
Returns:
str: A string containing the list of receipt data matching all applied filters.
Raises:
Exception: If the search failed or input is invalid.
"""
try:
# Validate start and end times
if not isinstance(start_time, str) or not isinstance(end_time, str):
raise ValueError("start_time and end_time must be strings in ISO format")
try:
datetime.datetime.fromisoformat(start_time.replace("Z", "+00:00"))
datetime.datetime.fromisoformat(end_time.replace("Z", "+00:00"))
except ValueError:
raise ValueError("start_time and end_time must be strings in ISO format")
# Start with the base collection reference
query = COLLECTION
# Build the composite query by properly chaining conditions
# Notes that this demo assume 1 user only,
# need to refactor the query for multiple user
filters = [
FieldFilter("transaction_time", ">=", start_time),
FieldFilter("transaction_time", "<=", end_time),
]
# Add optional filters
if min_total_amount != -1:
filters.append(FieldFilter("total_amount", ">=", min_total_amount))
if max_total_amount != -1:
filters.append(FieldFilter("total_amount", "<=", max_total_amount))
# Apply the filters
composite_filter = And(filters=filters)
query = query.where(filter=composite_filter)
# Execute the query and collect results
search_result_description = "Search by Metadata Results:\n"
for doc in query.stream():
data = doc.to_dict()
data.pop(
EMBEDDING_FIELD_NAME, None
) # Remove embedding as it's not needed for display
search_result_description += f"\n{RECEIPT_DESC_FORMAT.format(**data)}"
return search_result_description
except Exception as e:
raise Exception(f"Error filtering receipts: {str(e)}")
def search_relevant_receipts_by_natural_language_query(
query_text: str, limit: int = 5
) -> str:
"""
Search for receipts with content most similar to the query using vector search.
This tool can be use for user query that is difficult to translate into metadata filters.
Such as store name or item name which sensitive to string matching.
Use this tool if you cannot utilize the search by metadata filter tool.
Args:
query_text (str): The search text (e.g., "coffee", "dinner", "groceries").
limit (int, optional): Maximum number of results to return (default: 5).
Returns:
str: A string containing the list of contextually relevant receipt data.
Raises:
Exception: If the search failed or input is invalid.
"""
try:
# Generate embedding for the query text
result = GENAI_CLIENT.models.embed_content(
model="text-embedding-004", contents=query_text
)
query_embedding = result.embeddings[0].values
# Notes that this demo assume 1 user only,
# need to refactor the query for multiple user
vector_query = COLLECTION.find_nearest(
vector_field=EMBEDDING_FIELD_NAME,
query_vector=Vector(query_embedding),
distance_measure=DistanceMeasure.EUCLIDEAN,
limit=limit,
)
# Execute the query and collect results
search_result_description = "Search by Contextual Relevance Results:\n"
for doc in vector_query.stream():
data = doc.to_dict()
data.pop(
EMBEDDING_FIELD_NAME, None
) # Remove embedding as it's not needed for display
search_result_description += f"\n{RECEIPT_DESC_FORMAT.format(**data)}"
return search_result_description
except Exception as e:
raise Exception(f"Error searching receipts: {str(e)}")
def get_receipt_data_by_image_id(image_id: str) -> Dict[str, Any]:
"""
Retrieve receipt data from the database using the image_id.
Args:
image_id (str): The unique identifier of the receipt image. For example, if the placeholder is
[IMAGE-ID 12345], the ID to use is 12345.
Returns:
Dict[str, Any]: A dictionary containing the receipt data with the following keys:
- receipt_id (str): The unique identifier of the receipt image.
- store_name (str): The name of the store.
- transaction_time (str): The time of purchase in UTC.
- total_amount (float): The total amount spent.
- currency (str): The currency of the transaction.
- purchased_items (List[Dict[str, Any]]): List of items purchased with their details.
Returns an empty dictionary if no receipt is found.
"""
# In case of it provide full image placeholder, extract the id string
image_id = sanitize_image_id(image_id)
# Query the receipts collection for documents with matching receipt_id (image_id)
# Notes that this demo assume 1 user only,
# need to refactor the query for multiple user
query = COLLECTION.where(filter=FieldFilter("receipt_id", "==", image_id)).limit(1)
docs = list(query.stream())
if not docs:
return {}
# Get the first matching document
doc_data = docs[0].to_dict()
doc_data.pop(EMBEDDING_FIELD_NAME, None)
return doc_data
代码说明
在此工具函数实现中,我们围绕以下 2 个主要理念设计了工具:
- 使用图片 ID 字符串占位符
[IMAGE-ID <hash-of-image-1>]
解析收据数据并映射到原始文件 - 使用 Firestore 数据库存储和检索数据
工具“store_receipt_data”
此工具是光学字符识别工具,它会解析图片数据中的必要信息,同时识别图片 ID 字符串并将其映射在一起,以便存储在 Firestore 数据库中。
此外,此工具还会使用 text-embedding-004
将收据内容转换为嵌入,以便将所有元数据和嵌入一起存储和编入索引。支持灵活地通过查询或内容相关搜索检索。
成功执行此工具后,您会看到收据数据已在 Firestore 数据库中编入索引,如下所示
工具“search_receipts_by_metadata_filter”
此工具会将用户查询转换为元数据查询过滤条件,支持按日期范围和/或交易总额进行搜索。它将返回所有匹配的收据数据,在此过程中,我们会删除嵌入字段,因为代理不需要该字段来理解上下文
工具“search_relevant_receipts_by_natural_language_query”
这是我们的检索增强生成 (RAG) 工具。我们的代理可以自行设计查询,从向量数据库中检索相关收据,还可以选择何时使用此工具。允许代理独立决定是否使用此 RAG 工具并自行设计查询,这是代理 RAG 方法的定义之一。
我们不仅允许它构建自己的查询,还允许它选择要检索的相关文档数量。结合适当的提示工程,例如
# Example prompt Always filter the result from tool search_relevant_receipts_by_natural_language_query as the returned result may contain irrelevant information
这使得该工具成为一款强大的工具,能够搜索几乎所有内容,但由于最近邻搜索的非精确性,它可能无法返回所有预期结果。
5. 通过回调修改对话上下文
借助 Google ADK,我们可以在不同级别“拦截”代理运行时。如需详细了解此功能,请参阅此文档。在本实验中,我们将使用 before_model_callback
在发送到 LLM 之前修改请求,以移除旧对话历史记录上下文中的图片数据(仅包含最近 3 次用户互动中的图片数据),以提高效率
不过,我们仍希望代理在需要时拥有图片数据上下文。因此,我们添加了一种机制,用于在对话中的每个图片字节数据后面添加字符串图片 ID 占位符。这有助于代理将图片 ID 与其实际文件数据相关联,这些数据可在图片存储或检索时使用。结构如下所示
<image-byte-data-1> [IMAGE-ID <hash-of-image-1>] <image-byte-data-2> [IMAGE-ID <hash-of-image-2>] And so on..
当字节数据在对话记录中过时时,字符串标识符仍然存在,以便您借助工具使用数据。移除图片数据后的示例历史记录结构
[IMAGE-ID <hash-of-image-1>] [IMAGE-ID <hash-of-image-2>] And so on..
我们开始吧!在 expense_manager_agent 目录下创建一个新文件,并将其命名为 callbacks.py,然后复制以下代码
# expense_manager_agent/callbacks.py
import hashlib
from google.genai import types
from google.adk.agents.callback_context import CallbackContext
from google.adk.models.llm_request import LlmRequest
def modify_image_data_in_history(
callback_context: CallbackContext, llm_request: LlmRequest
) -> None:
# The following code will modify the request sent to LLM
# We will only keep image data in the last 3 user messages using a reverse and counter approach
# Count how many user messages we've processed
user_message_count = 0
# Process the reversed list
for content in reversed(llm_request.contents):
# Only count for user manual query, not function call
if (content.role == "user") and (content.parts[0].function_response is None):
user_message_count += 1
modified_content_parts = []
# Check any missing image ID placeholder for any image data
# Then remove image data from conversation history if more than 3 user messages
for idx, part in enumerate(content.parts):
if part.inline_data is None:
modified_content_parts.append(part)
continue
if (
(idx + 1 >= len(content.parts))
or (content.parts[idx + 1].text is None)
or (not content.parts[idx + 1].text.startswith("[IMAGE-ID "))
):
# Generate hash ID for the image and add a placeholder
image_data = part.inline_data.data
hasher = hashlib.sha256(image_data)
image_hash_id = hasher.hexdigest()[:12]
placeholder = f"[IMAGE-ID {image_hash_id}]"
# Only keep image data in the last 3 user messages
if user_message_count <= 3:
modified_content_parts.append(part)
modified_content_parts.append(types.Part(text=placeholder))
else:
# Only keep image data in the last 3 user messages
if user_message_count <= 3:
modified_content_parts.append(part)
# This will modify the contents inside the llm_request
content.parts = modified_content_parts
6. 提示
若要设计具有复杂互动和功能的客服人员,我们需要找到足够好的提示来引导客服人员,使其能够按照我们期望的方式运行。
以前,我们有一种处理对话历史记录中图片数据的机制,还有一些可能不太容易使用的工具,例如 search_relevant_receipts_by_natural_language_query.
。我们还希望客服人员能够搜索并检索正确的收据图片,以便将其发送给我们。这意味着,我们需要以适当的提示结构正确传达所有这些信息
我们会要求客服人员将输出内容以以下 Markdown 格式进行结构化,以便解析思考过程、最终回答和附件(如果有)
# THINKING PROCESS Thinking process here # FINAL RESPONSE Response to the user here Attachments put inside json block { "attachments": [ "[IMAGE-ID <hash-id-1>]", "[IMAGE-ID <hash-id-2>]", ... ] }
我们先从以下问题入手,以实现对支出管理器客服人员行为的初始预期。task_prompt.md 文件应该已经存在于现有工作目录中,但我们需要将其移至 expense_manager_agent 目录下。运行以下命令进行移动
mv task_prompt.md expense_manager_agent/task_prompt.md
7. 测试代理
现在,我们尝试通过 CLI 与代理进行通信,运行以下命令
uv run adk run expense_manager_agent
系统会显示如下输出,您可以轮流与客服人员聊天,但只能通过此界面发送文本
Log setup complete: /tmp/agents_log/agent.xxxx_xxx.log To access latest log: tail -F /tmp/agents_log/agent.latest.log Running agent root_agent, type exit to exit. user: hello [root_agent]: Hello there! How can I help you today? user:
现在,除了 CLI 互动之外,ADK 还允许我们使用开发界面进行互动,并检查互动期间发生的情况。运行以下命令以启动本地开发界面服务器
uv run adk web --port 8080
它将生成类似于以下示例的输出,表示我们已经可以访问 Web 界面
INFO: Started server process [xxxx] INFO: Waiting for application startup. +-----------------------------------------------------------------------------+ | ADK Web Server started | | | | For local testing, access at http://localhost:8080. | +-----------------------------------------------------------------------------+ INFO: Application startup complete. INFO: Uvicorn running on http://0.0.0.0:8080 (Press CTRL+C to quit)
现在,如需进行检查,请点击 Cloud Shell 编辑器顶部区域的 Web Preview 按钮,然后选择 Preview on port 8080
您会看到以下网页,您可以在左上角的下拉按钮中选择可用的客服人员(在本例中,应该是 expense_manager_agent),然后与聊天机器人互动。您会在左侧窗口中看到有关代理运行时日志详情的许多信息
我们来试试一些操作!请上传以下 2 个收据示例(来源:Hugging Face 数据集 mousserlane/id_receipt_dataset
)。右键点击每张图片,然后选择图片另存为(这将下载收据图片),然后点击“剪辑”图标将文件上传到聊天机器人,并说明您想存储这些收据
然后,尝试使用以下查询执行一些搜索或文件检索
- “Provide a breakdown of expenses and its total during 2023”(提供 2023 年支出明细及其总额)
- “Give me receipt file from Indomaret”
使用某些工具时,您可以在开发界面中检查正在发生的情况
查看代理对您的回复,并检查其是否遵循 task_prompt.py 中的提示中提供的所有规则。恭喜!现在,您已经拥有了一个完整的开发代理。
现在,我们需要使用合适的界面和功能来上传和下载图片文件,以便完成该应用。
8. 使用 Gradio 构建前端服务
我们将构建一个如下所示的聊天 Web 界面
该页面包含一个聊天界面,其中包含一个输入字段,供用户发送文本和上传收据图片文件。
我们将使用 Gradio 构建前端服务。
创建新文件,依次点击 File->New Text File(文件 -> 新建文本文件),并将其命名为 frontend.py,然后复制以下代码并保存
import mimetypes
import gradio as gr
import requests
import base64
from typing import List, Dict, Any
from settings import get_settings
from PIL import Image
import io
from schema import ImageData, ChatRequest, ChatResponse
SETTINGS = get_settings()
def encode_image_to_base64_and_get_mime_type(image_path: str) -> ImageData:
"""Encode a file to base64 string and get MIME type.
Reads an image file and returns the base64-encoded image data and its MIME type.
Args:
image_path: Path to the image file to encode.
Returns:
ImageData object containing the base64 encoded image data and its MIME type.
"""
# Read the image file
with open(image_path, "rb") as file:
image_content = file.read()
# Get the mime type
mime_type = mimetypes.guess_type(image_path)[0]
# Base64 encode the image
base64_data = base64.b64encode(image_content).decode("utf-8")
# Return as ImageData object
return ImageData(serialized_image=base64_data, mime_type=mime_type)
def decode_base64_to_image(base64_data: str) -> Image.Image:
"""Decode a base64 string to PIL Image.
Converts a base64-encoded image string back to a PIL Image object
that can be displayed or processed further.
Args:
base64_data: Base64 encoded string of the image.
Returns:
PIL Image object of the decoded image.
"""
# Decode the base64 string and convert to PIL Image
image_data = base64.b64decode(base64_data)
image_buffer = io.BytesIO(image_data)
image = Image.open(image_buffer)
return image
def get_response_from_llm_backend(
message: Dict[str, Any],
history: List[Dict[str, Any]],
) -> List[str | gr.Image]:
"""Send the message and history to the backend and get a response.
Args:
message: Dictionary containing the current message with 'text' and optional 'files' keys.
history: List of previous message dictionaries in the conversation.
Returns:
List containing text response and any image attachments from the backend service.
"""
# Extract files and convert to base64
image_data = []
if uploaded_files := message.get("files", []):
for file_path in uploaded_files:
image_data.append(encode_image_to_base64_and_get_mime_type(file_path))
# Prepare the request payload
payload = ChatRequest(
text=message["text"],
files=image_data,
session_id="default_session",
user_id="default_user",
)
# Send request to backend
try:
response = requests.post(SETTINGS.BACKEND_URL, json=payload.model_dump())
response.raise_for_status() # Raise exception for HTTP errors
result = ChatResponse(**response.json())
if result.error:
return [f"Error: {result.error}"]
chat_responses = []
if result.thinking_process:
chat_responses.append(
gr.ChatMessage(
role="assistant",
content=result.thinking_process,
metadata={"title": "🧠 Thinking Process"},
)
)
chat_responses.append(gr.ChatMessage(role="assistant", content=result.response))
if result.attachments:
for attachment in result.attachments:
image_data = attachment.serialized_image
chat_responses.append(gr.Image(decode_base64_to_image(image_data)))
return chat_responses
except requests.exceptions.RequestException as e:
return [f"Error connecting to backend service: {str(e)}"]
if __name__ == "__main__":
demo = gr.ChatInterface(
get_response_from_llm_backend,
title="Personal Expense Assistant",
description="This assistant can help you to store receipts data, find receipts, and track your expenses during certain period.",
type="messages",
multimodal=True,
textbox=gr.MultimodalTextbox(file_count="multiple", file_types=["image"]),
)
demo.launch(
server_name="0.0.0.0",
server_port=8080,
)
之后,我们可以尝试使用以下命令运行前端服务。别忘了将 main.py 文件重命名为 frontend.py
uv run frontend.py
您将在 Cloud 控制台中看到类似于以下内容的输出
* Running on local URL: http://0.0.0.0:8080 To create a public link, set `share=True` in `launch()`.
之后,您可以Ctrl 键 + 点击本地网址链接,查看 Web 界面。或者,您也可以点击 Cloud 编辑器右上角的网页预览按钮,然后选择在端口 8080 上预览,以访问前端应用
您会看到 Web 界面,但由于后端服务尚未设置,因此在尝试提交聊天时会收到预期错误
现在,让服务运行,暂时不要终止它。我们将在另一个终端标签页中运行后端服务
代码说明
在此前端代码中,我们首先让用户能够发送文本和上传多个文件。借助 Gradio,我们可以使用 gr.ChatInterface 方法与 gr.MultimodalTextbox 结合来创建此类功能
现在,在将文件和文本发送到后端之前,我们需要确定文件的 mimetype,因为后端需要它。我们还需要将图片文件字节编码为 base64 格式,并将其与 mimetype 一起发送。
class ImageData(BaseModel): """Model for image data with hash identifier. Attributes: serialized_image: Optional Base64 encoded string of the image content. mime_type: MIME type of the image. """ serialized_image: str mime_type: str
用于前端与后端交互的架构在 schema.py 中定义。我们使用 Pydantic BaseModel 在架构中强制执行数据验证
收到回复后,我们会将思考过程、最终回复和附件分开。因此,我们可以使用 Gradio 组件通过界面组件显示每个组件。
class ChatResponse(BaseModel): """Model for a chat response. Attributes: response: The text response from the model. thinking_process: Optional thinking process of the model. attachments: List of image data to be displayed to the user. error: Optional error message if something went wrong. """ response: str thinking_process: str = "" attachments: List[ImageData] = [] error: Optional[str] = None
9. 使用 FastAPI 构建后端服务
接下来,我们需要构建后端,该后端可以将我们的代理与其他组件一起初始化,以便能够执行代理运行时。
创建新文件,依次点击 File->New Text File,然后复制并粘贴以下代码,并将其另存为 backend.py
from expense_manager_agent.agent import root_agent as expense_manager_agent
from google.adk.sessions import InMemorySessionService
from google.adk.runners import Runner
from google.adk.events import Event
from fastapi import FastAPI, Body, Depends
from typing import AsyncIterator
from types import SimpleNamespace
import uvicorn
from contextlib import asynccontextmanager
import asyncio
from utils import (
extract_attachment_ids_and_sanitize_response,
download_image_from_gcs,
extract_thinking_process,
format_user_request_to_adk_content_and_store_artifacts,
)
from schema import ImageData, ChatRequest, ChatResponse
import logger
from google.adk.artifacts import GcsArtifactService
from settings import get_settings
SETTINGS = get_settings()
APP_NAME = "expense_manager_app"
# Application state to hold service contexts
class AppContexts(SimpleNamespace):
"""A class to hold application contexts with attribute access"""
session_service: InMemorySessionService = None
artifact_service: GcsArtifactService = None
expense_manager_agent_runner: Runner = None
# Initialize application state
app_contexts = AppContexts()
@asynccontextmanager
async def lifespan(app: FastAPI):
# Initialize service contexts during application startup
app_contexts.session_service = InMemorySessionService()
app_contexts.artifact_service = GcsArtifactService(
bucket_name=SETTINGS.STORAGE_BUCKET_NAME
)
app_contexts.expense_manager_agent_runner = Runner(
agent=expense_manager_agent, # The agent we want to run
app_name=APP_NAME, # Associates runs with our app
session_service=app_contexts.session_service, # Uses our session manager
artifact_service=app_contexts.artifact_service, # Uses our artifact manager
)
logger.info("Application started successfully")
yield
logger.info("Application shutting down")
# Perform cleanup during application shutdown if necessary
# Helper function to get application state as a dependency
async def get_app_contexts() -> AppContexts:
return app_contexts
# Create FastAPI app
app = FastAPI(title="Personal Expense Assistant API", lifespan=lifespan)
@app.post("/chat", response_model=ChatResponse)
async def chat(
request: ChatRequest = Body(...),
app_context: AppContexts = Depends(get_app_contexts),
) -> ChatResponse:
"""Process chat request and get response from the agent"""
# Prepare the user's message in ADK format and store image artifacts
content = await asyncio.to_thread(
format_user_request_to_adk_content_and_store_artifacts,
request=request,
app_name=APP_NAME,
artifact_service=app_context.artifact_service,
)
final_response_text = "Agent did not produce a final response." # Default
# Use the session ID from the request or default if not provided
session_id = request.session_id
user_id = request.user_id
# Create session if it doesn't exist
if not app_context.session_service.get_session(
app_name=APP_NAME, user_id=user_id, session_id=session_id
):
app_context.session_service.create_session(
app_name=APP_NAME, user_id=user_id, session_id=session_id
)
try:
# Process the message with the agent
# Type annotation: runner.run_async returns an AsyncIterator[Event]
events_iterator: AsyncIterator[Event] = (
app_context.expense_manager_agent_runner.run_async(
user_id=user_id, session_id=session_id, new_message=content
)
)
async for event in events_iterator: # event has type Event
# Key Concept: is_final_response() marks the concluding message for the turn
if event.is_final_response():
if event.content and event.content.parts:
# Extract text from the first part
final_response_text = event.content.parts[0].text
elif event.actions and event.actions.escalate:
# Handle potential errors/escalations
final_response_text = f"Agent escalated: {event.error_message or 'No specific message.'}"
break # Stop processing events once the final response is found
logger.info(
"Received final response from agent", raw_final_response=final_response_text
)
# Extract and process any attachments and thinking process in the response
base64_attachments = []
sanitized_text, attachment_ids = extract_attachment_ids_and_sanitize_response(
final_response_text
)
sanitized_text, thinking_process = extract_thinking_process(sanitized_text)
# Download images from GCS and replace hash IDs with base64 data
for image_hash_id in attachment_ids:
# Download image data and get MIME type
result = await asyncio.to_thread(
download_image_from_gcs,
artifact_service=app_context.artifact_service,
image_hash=image_hash_id,
app_name=APP_NAME,
user_id=user_id,
session_id=session_id,
)
if result:
base64_data, mime_type = result
base64_attachments.append(
ImageData(serialized_image=base64_data, mime_type=mime_type)
)
logger.info(
"Processed response with attachments",
sanitized_response=sanitized_text,
thinking_process=thinking_process,
attachment_ids=attachment_ids,
)
return ChatResponse(
response=sanitized_text,
thinking_process=thinking_process,
attachments=base64_attachments,
)
except Exception as e:
logger.error("Error processing chat request", error_message=str(e))
return ChatResponse(
response="", error=f"Error in generating response: {str(e)}"
)
# Only run the server if this file is executed directly
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=8081)
之后,我们可以尝试运行后端服务。请记得,在上一步中,我们正确运行了前端服务,现在我们需要打开新的终端并尝试运行此后端服务
- 创建一个新终端。前往底部区域的终端,找到“+”按钮以创建新的终端。或者,您也可以按 Ctrl + Shift + C 打开新的终端
- 之后,确保您位于工作目录 personal-expense-assistant 中,然后运行以下命令
uv run backend.py
- 如果成功,则会显示如下所示的输出
INFO: Started server process [xxxxx] INFO: Waiting for application startup. INFO: Application startup complete. INFO: Uvicorn running on http://0.0.0.0:8081 (Press CTRL+C to quit)
代码说明
初始化 ADK Agent、SessionService 和 ArtifactService
为了在后端服务中运行代理,我们需要创建一个同时接受 SessionService 和代理的 Runner。SessionService 将管理对话历史记录和状态,因此与 Runner 集成后,它将使我们的客服人员能够接收正在进行的对话上下文。
我们还使用 ArtifactService 来处理上传的文件。您可以点击此处,详细了解 ADK 会话和工件
... @asynccontextmanager async def lifespan(app: FastAPI): # Initialize service contexts during application startup app_contexts.session_service = InMemorySessionService() app_contexts.artifact_service = GcsArtifactService( bucket_name=SETTINGS.STORAGE_BUCKET_NAME ) app_contexts.expense_manager_agent_runner = Runner( agent=expense_manager_agent, # The agent we want to run app_name=APP_NAME, # Associates runs with our app session_service=app_contexts.session_service, # Uses our session manager artifact_service=app_contexts.artifact_service, # Uses our artifact manager ) logger.info("Application started successfully") yield logger.info("Application shutting down") # Perform cleanup during application shutdown if necessary ...
在此演示中,我们将 InMemorySessionService 和 GcsArtifactService 与代理 Runner 集成。由于对话记录存储在内存中,因此在后端服务被终止或重启后,对话记录将会丢失。我们在 FastAPI 应用生命周期内初始化这些依赖项,以便将其作为依赖项注入 /chat
路由中。
使用 GcsArtifactService 上传和下载图片
所有上传的图片都将由 GcsArtifactService 存储为工件,您可以在 utils.py 中的 format_user_request_to_adk_content_and_store_artifacts
函数内检查这一点
... # Prepare the user's message in ADK format and store image artifacts content = await asyncio.to_thread( format_user_request_to_adk_content_and_store_artifacts, request=request, app_name=APP_NAME, artifact_service=app_context.artifact_service, ) ...
所有将由代理运行程序处理的请求都需要采用 types.Content 类型的格式。在该函数中,我们还会处理每项图片数据并提取其 ID,以便替换为图片 ID 占位符。
使用正则表达式提取图片 ID 后,系统会采用类似的机制下载附件:
... sanitized_text, attachment_ids = extract_attachment_ids_and_sanitize_response( final_response_text ) sanitized_text, thinking_process = extract_thinking_process(sanitized_text) # Download images from GCS and replace hash IDs with base64 data for image_hash_id in attachment_ids: # Download image data and get MIME type result = await asyncio.to_thread( download_image_from_gcs, artifact_service=app_context.artifact_service, image_hash=image_hash_id, app_name=APP_NAME, user_id=user_id, session_id=session_id, ) ...
10. 集成测试
现在,您应该可以在不同的 Cloud 控制台标签页中运行多个服务:
- 在端口 8080 上运行的前端服务
* Running on local URL: http://0.0.0.0:8080 To create a public link, set `share=True` in `launch()`.
- 在端口 8081 上运行的后端服务
INFO: Started server process [xxxxx] INFO: Waiting for application startup. INFO: Application startup complete. INFO: Uvicorn running on http://0.0.0.0:8081 (Press CTRL+C to quit)
目前,您应该能够通过端口 8080 上的 Web 应用上传收据图片,并与 Google 助理进行无缝聊天。
点击 Cloud Shell 编辑器顶部区域的 Web Preview 按钮,然后选择 Preview on port 8080
现在,我们来与 Google 助理互动一下!
下载以下收据。这些收据数据的日期范围为 2023 年至 2024 年,并要求 Google 助理存储/上传这些数据
- 收据云端硬盘(来源 Hugging Face 数据集
mousserlane/id_receipt_dataset
)
询问各种各样的事情
- “请提供 2023 年至 2024 年的每月支出明细”
- “Show me a receipt for coffee transaction”(向我显示咖啡交易的收据)
- “给我发送 Yakiniku Like 的收据文件”
- 等等
以下是成功互动的一些片段
11. 部署到 Cloud Run
现在,我们当然希望随时随地访问这款出色的应用。为此,我们可以打包此应用并将其部署到 Cloud Run。在本演示中,此服务将作为可供他人访问的公有服务公开。不过,请注意,这并不是此类应用的最佳实践,因为这更适用于个人应用
在此 Codelab 中,我们将前端服务和后端服务都放入一个容器中。我们需要借助监督功能来管理这两项服务。您可以检查 supervisord.conf 文件,并查看我们将 supervisord 设置为入口点的 Dockerfile。
至此,我们已经有了将应用部署到 Cloud Run 所需的所有文件,接下来就开始部署吧。前往 Cloud Shell 终端,并确保当前项目已配置为您的有效项目,如果没有,请使用 gcloud configure 命令设置项目 ID:
gcloud config set project [PROJECT_ID]
然后,运行以下命令将其部署到 Cloud Run。
gcloud run deploy personal-expense-assistant \
--source . \
--port=8080 \
--allow-unauthenticated \
--env-vars-file=settings.yaml \
--memory 1024Mi \
--region us-central1
如果系统提示您确认为 Docker 仓库创建了工件注册库,请直接回答 Y。请注意,由于这是演示版应用,因此我们允许未经身份验证的访问。建议为企业应用和生产应用使用适当的身份验证。
部署完成后,您应该会收到类似于以下内容的链接:
https://personal-expense-assistant-*******.us-central1.run.app
接下来,您可以通过无痕式窗口或移动设备使用您的应用。它应该已经上线。
12. 挑战
现在是时候大显身手,磨练您的探索技能了。您是否有更改代码的必要条件,以便后端能够容纳多个用户?哪些组件需要更新?
13. 清理
为避免系统因本 Codelab 中使用的资源向您的 Google Cloud 账号收取费用,请按照以下步骤操作: