1. 学习内容
欢迎!我们今天将开启一段非常酷的旅程。我们先来考虑一下热门社交活动平台 InstaVibe。虽然成功了,但我们知道,对于某些用户来说,实际规划群组活动可能是一件令人头疼的事情。想象一下,您需要弄清楚所有朋友的兴趣爱好,然后从无数活动或场地选项中进行过滤,最后还要协调所有事宜。这可真是了不起!这正是我们可以引入 AI(更具体地说,是智能代理)来真正发挥作用的地方。
我们的想法是构建一个系统,让这些代理可以处理繁重的工作,例如巧妙地“聆听”以了解用户和朋友的偏好,然后主动建议量身定制的精彩活动。我们的目标是将 InstaVibe 上的社交规划转变为顺畅而愉悦的体验。若要开始构建这些智能助理,我们需要使用合适的工具打下坚实的基础。
您会看到以下概念:
Google ADK 基础知识:掌握使用 Google 智能体开发套件 (ADK) 构建首个智能体的基础知识。了解基本组件、代理生命周期,以及如何有效利用框架的内置工具。
使用 Model Context Protocol (MCP) 扩展智能体功能:了解如何为智能体配备自定义工具和上下文,使其能够执行专门的任务并访问特定信息。介绍 Model Context Protocol (MCP) 概念。您将学习如何设置 MCP 服务器来提供此上下文。
设计智能体互动和编排:从单个智能体入手,了解智能体编排。设计从简单顺序工作流到涉及循环、条件逻辑和并行处理的复杂场景的互动模式。在 ADK 框架内引入子代理的概念,以管理模块化任务。
构建协作式多智能体系统:了解如何设计多个智能体协作以实现复杂目标的系统。学习并实现智能体到智能体 (A2A) 通信协议,为分布式智能体(可能在不同的机器或服务上运行)建立可靠的标准化交互方式。
在 Google Cloud 上将代理投入生产:将代理应用从开发环境迁移到云端。了解在 Google Cloud Platform (GCP) 上设计和部署可扩缩的强大多代理系统的最佳实践。深入了解如何利用 Cloud Run 等 GCP 服务,并探索最新的 Google Agent Engine 在托管和管理代理方面的功能。
2. 架构
利用 InstaVibe 制定 AI 赋能的社交媒体规划
什么是社交媒体聆听?
社交媒体监听是指在社交媒体、论坛和新闻网站等平台上监控数字对话,以了解人们对某个主题、品牌或行业的看法。它可提供有关公众情绪、趋势和用户需求的宝贵数据洞见。在此研讨会中,我们将在基于代理的系统中利用这一概念。
您是 InstaVibe 团队的成员
假设您在 InstaVibe 工作,这是一家成功的初创公司,其社交活动平台深受年轻人的欢迎。一切进展顺利,但与许多科技公司一样,您的团队面临着来自投资者的压力,需要利用 AI 进行创新。在内部,您还注意到有一部分用户的互动程度不如其他用户,他们可能不太愿意发起群组活动,或者觉得规划过程很有挑战性。对于贵公司而言,这意味着这一重要用户群体的平台粘性较低。
您团队的研究表明,AI 驱动的辅助功能可以显著改善这些用户的体验。该应用的理念是,根据用户及其好友的兴趣主动建议相关活动,从而简化社交出游的规划流程。您和同事面临的问题是:AI 智能体如何才能自动执行通常非常耗时的任务,例如发现感兴趣的活动、研究活动,以及可能进行的初步协调?
基于代理的解决方案(原型概念)
您建议开发一项由多代理系统提供支持的原型功能。以下是概念性细分:
- 社交信息分析代理:此代理采用社交聆听技术来分析用户的人际关系、互动以及可能与用户偏好相关的更广泛的公开趋势。其目的是确定共同的兴趣爱好和合适的活动特征(例如,对安静聚会的偏好、特定的爱好)。
- 活动规划代理:此代理使用社交媒体分析代理提供的分析洞见,在线搜索符合所确定条件(例如地点、兴趣)的特定活动、场地或创意。
- 平台互动代理(使用 MCP):此代理从活动规划代理处获取最终确定的方案。其主要功能是利用预定义的 MCP(模型上下文协议)工具直接与 InstaVibe 平台互动。此工具可为代理提供特定功能,以起草活动建议并创建概述计划的帖子。
- 编排器代理:此代理充当中央协调器。它从 InstaVibe 平台接收初始用户请求,了解总体目标(例如,“帮我和我的朋友策划活动”),然后以逻辑顺序将特定任务委托给相应的专业代理。它负责管理代理之间的信息流,并确保将最终结果返回给用户。
关键架构元素和技术
Google Cloud Platform (GCP):
- Vertex AI:
- Gemini 模型:可访问 Google 最先进的大语言模型 (LLM),例如 Gemini,这些模型可为代理提供推理和决策能力。
- Vertex AI Agent Engine:一项用于部署、托管和扩缩编排器代理的托管式服务,可简化生产化流程并抽象化基础设施复杂性。
- Cloud Run:用于部署容器化应用的无服务器平台。我们使用它来:
- 托管主要 InstaVibe Web 应用。
- 将各个启用 A2A 的代理(Planner、Social Profiling、Platform Interaction)部署为独立的微服务。
- 运行 MCP 工具服务器,使 InstaVibe 的内部 API 可供代理使用。
- Spanner:一款全代管式、全球分布式且具有强一致性的关系型数据库。在此研讨会中,我们将利用其作为图数据库的功能,使用其 GRAPH DDL 和查询功能来执行以下操作:
- 对复杂社交关系(用户、好友关系、活动出席情况、帖子)进行建模和存储。
- 使社交分析代理能够高效查询这些关系。
- Artifact Registry:一项全托管式服务,用于存储、管理和保护容器映像。
- Cloud Build:一项可在 Google Cloud 上执行构建的服务。我们使用它从代理和应用源代码自动构建 Docker 容器映像。
- Cloud Storage:供 Cloud Build 等服务用于存储 build 工件,以及供 Agent Engine 用于满足其运营需求。
- 核心代理框架和协议:
- Google 的智能体开发套件 (ADK):主要框架,用于:
- 为各个智能代理定义核心逻辑、行为和指令集。
- 管理代理生命周期、状态和内存(短期会话状态和潜在的长期知识)。
- 集成代理可用于与世界互动的工具(例如 Google 搜索或自定义构建的工具)。
- 编排多代理工作流,包括子代理的顺序、循环和并行执行。
- Agent-to-Agent (A2A) Communication Protocol: An open standard enabling:
- 不同 AI 代理之间的直接标准化通信和协作,即使它们作为单独的服务运行或在不同的机器上运行。
- 代理能够发现彼此的能力(通过代理卡片)并委托任务。这对于我们的 Orchestrator 代理与专业的 Planner、Social 和 Platform 代理进行交互至关重要。
- A2A Python 库 (a2a-python):用于使 ADK 代理能够使用 A2A 协议的具体库。它提供所需的服务器端组件,以便:
- 将我们的代理公开为符合 A2A 标准的服务器。
- 自动处理“代理卡”的投放以供发现。
- 接收和管理来自其他代理(例如 Orchestrator)的传入任务请求。
- Model Context Protocol (MCP):一种开放标准,可让代理执行以下操作:
- 以标准化方式连接和利用外部工具、数据源和系统。
- 我们的平台互动代理使用 MCP 客户端与 MCP 服务器通信,而 MCP 服务器会公开一些工具,以便与 InstaVibe 平台的现有 API 进行互动。
- Google 的智能体开发套件 (ADK):主要框架,用于:
- 调试工具:
- A2A 检查器:A2A 检查器是一种基于网页的调试工具,在本讲座中用于连接、检查和与支持 A2A 的代理互动。虽然它不是最终生产架构的一部分,但却是我们开发工作流程的重要组成部分。它提供:
- 代理卡片查看器:用于提取和验证代理的公开功能。
- 实时聊天界面:直接向已部署的代理发送消息,以便立即进行测试。
- 调试控制台:用于查看检查器和代理之间交换的原始 JSON-RPC 消息。
- A2A 检查器:A2A 检查器是一种基于网页的调试工具,在本讲座中用于连接、检查和与支持 A2A 的代理互动。虽然它不是最终生产架构的一部分,但却是我们开发工作流程的重要组成部分。它提供:
- 语言模型 (LLM):系统的“大脑”:
- Google 的 Gemini 模型:具体来说,我们使用 gemini-2.0-flash 等版本。之所以选择这些模型,是因为:
- 高级推理和指令遵循:它们能够理解复杂的提示、遵循详细的指令并对任务进行推理,因此非常适合为智能体决策提供支持。
- 工具使用(函数调用):Gemini 模型非常擅长确定何时以及如何使用通过 ADK 提供的工具,从而使代理能够收集信息或执行操作。
- 效率(Flash 模型):“Flash”变体在性能和性价比方面实现了良好的平衡,适用于许多需要快速响应的交互式代理任务。
- Google 的 Gemini 模型:具体来说,我们使用 gemini-2.0-flash 等版本。之所以选择这些模型,是因为:
需要 Google Cloud 赠金?
3. 准备工作
👉点击 Google Cloud 控制台顶部的激活 Cloud Shell(这是 Cloud Shell 窗格顶部的终端形状图标),
👉点击“打开编辑器”按钮(看起来像一个带有铅笔的打开的文件夹)。此操作会在窗口中打开 Cloud Shell 代码编辑器。您会在左侧看到文件浏览器。
👉如图所示,点击底部状态栏中的 Cloud Code 登录按钮。按照说明对插件进行授权。如果您在状态栏中看到 Cloud Code - no project,请选择该选项,然后在下拉菜单中选择“Select a Google Cloud Project”(选择 Google Cloud 项目),然后从您创建的项目列表中选择特定的 Google Cloud 项目。
👉 查找您的 Google Cloud 项目 ID:
- 打开 Google Cloud 控制台:https://console.cloud.google.com
- 从页面顶部的项目下拉菜单中选择要用于本次研讨会的项目。
- 您的项目 ID 会显示在信息中心的项目信息卡片中
👉在云 IDE 中打开终端,
👉💻 在终端中,使用以下命令验证您是否已通过身份验证,以及项目是否已设置为您的项目 ID:
gcloud auth list
👉💻 从 GitHub 克隆 instavibe-bootstrap
项目:
git clone -b adk-1.2.1-a2a-0.2.7 https://github.com/weimeilin79/instavibe-bootstrap.git
chmod +x ~/instavibe-bootstrap/init.sh
chmod +x ~/instavibe-bootstrap/set_env.sh
了解项目结构
在开始构建之前,我们先花点时间了解您刚刚克隆的 instavibe-bootstrap
项目的布局。这有助于您了解在整个研讨会期间,在何处查找和修改文件。
instavibe-bootstrap/
├── agents/
│ ├── orchestrate/
│ ├── planner/
│ ├── platform_mcp_client/
│ └── social/
├── instavibe/
│ ├── static/
│ └── templates/
├── tools/
│ └── instavibe/
├── utils/
├── init.sh
└── set_env.sh
以下是主要目录的细分:
agents/
:这是我们 AI 系统的核心。每个子目录(planner/、social/ 等)都包含特定智能代理的源代码。agent.py
:在每个代理的文件夹中,这是代理逻辑的主要文件。a2a_server.py
:此文件使用 Agent-to-Agent (A2A) 服务器封装 ADK 代理。Dockerfile
:定义如何构建容器映像,以便将代理部署到 Cloud Run 或 Agent Engine。
instavibe/
:此目录包含 InstaVibe Web 应用的整个源代码。tools/
:此目录用于构建我们的代理可以使用的外部工具。instavibe/
包含 Model Context Protocol (MCP) 服务器。
这种模块化结构将 Web 应用与各种 AI 组件分开,使整个系统更易于管理、测试和部署。
👉💻 运行初始化脚本:
此脚本会提示您输入 Google Cloud 项目 ID。
当 init.sh
脚本提示时,输入您在上一步中找到的 Google Cloud 项目 ID:
cd ~/instavibe-bootstrap
./init.sh
👉💻 设置所需的项目 ID:
gcloud config set project $(cat ~/project_id.txt) --quiet
👉💻 运行以下命令以启用必要的 Google Cloud API:
gcloud services enable run.googleapis.com \
cloudfunctions.googleapis.com \
cloudbuild.googleapis.com \
artifactregistry.googleapis.com \
spanner.googleapis.com \
apikeys.googleapis.com \
iam.googleapis.com \
compute.googleapis.com \
aiplatform.googleapis.com \
cloudresourcemanager.googleapis.com \
maps-backend.googleapis.com
👉💻 设置所需的所有环境变量:
export PROJECT_ID=$(gcloud config get project)
export PROJECT_NUMBER=$(gcloud projects describe ${PROJECT_ID} --format="value(projectNumber)")
export SERVICE_ACCOUNT_NAME=$(gcloud compute project-info describe --format="value(defaultServiceAccount)")
export SPANNER_INSTANCE_ID="instavibe-graph-instance"
export SPANNER_DATABASE_ID="graphdb"
export GOOGLE_CLOUD_PROJECT=$(gcloud config get project)
export GOOGLE_GENAI_USE_VERTEXAI=TRUE
export GOOGLE_CLOUD_LOCATION="us-central1"
设置权限
👉💻 授予权限。在终端中,运行以下命令:
gcloud projects add-iam-policy-binding $PROJECT_ID \
--member="serviceAccount:$SERVICE_ACCOUNT_NAME" \
--role="roles/spanner.admin"
# Spanner Database User
gcloud projects add-iam-policy-binding $PROJECT_ID \
--member="serviceAccount:$SERVICE_ACCOUNT_NAME" \
--role="roles/spanner.databaseUser"
# Artifact Registry Admin
gcloud projects add-iam-policy-binding $PROJECT_ID \
--member="serviceAccount:$SERVICE_ACCOUNT_NAME" \
--role="roles/artifactregistry.admin"
# Cloud Build Editor
gcloud projects add-iam-policy-binding $PROJECT_ID \
--member="serviceAccount:$SERVICE_ACCOUNT_NAME" \
--role="roles/cloudbuild.builds.editor"
# Cloud Run Admin
gcloud projects add-iam-policy-binding $PROJECT_ID \
--member="serviceAccount:$SERVICE_ACCOUNT_NAME" \
--role="roles/run.admin"
# IAM Service Account User
gcloud projects add-iam-policy-binding $PROJECT_ID \
--member="serviceAccount:$SERVICE_ACCOUNT_NAME" \
--role="roles/iam.serviceAccountUser"
# Vertex AI User
gcloud projects add-iam-policy-binding $PROJECT_ID \
--member="serviceAccount:$SERVICE_ACCOUNT_NAME" \
--role="roles/aiplatform.user"
# Logging Writer (to allow writing logs)
gcloud projects add-iam-policy-binding $PROJECT_ID \
--member="serviceAccount:$SERVICE_ACCOUNT_NAME" \
--role="roles/logging.logWriter"
gcloud projects add-iam-policy-binding $PROJECT_ID \
--member="serviceAccount:$SERVICE_ACCOUNT_NAME" \
--role="roles/logging.viewer"
👉 在 IAM 控制台中验证结果
👉💻 在终端中运行以下命令,以创建 Artifact Registry 代码库。在部署到 Cloud Run 或 Agent Engine 之前,我们代理、MCP 服务器和 InstaVibe 应用的所有 Docker 映像都存储在此处。
export REPO_NAME="introveally-repo"
gcloud artifacts repositories create $REPO_NAME \
--repository-format=docker \
--location=us-central1 \
--description="Docker repository for InstaVibe workshop"
为 API 密钥设置地图平台
如需在 InstaVibe 应用中使用 Google 地图服务,您需要创建 API 密钥并对其进行适当的限制。
👉 在新标签页中,前往 API 和服务 > 凭据。在“凭据”页面上,点击顶部的“+ 创建凭据”按钮。从下拉菜单中选择“API 密钥”。
👉 系统会显示一个对话框,其中包含您新创建的 API 密钥。稍后您需要使用它来配置应用。
👉 点击“已创建的 API 密钥”对话框中的“关闭”。
👉 您会看到列出的新 API 密钥(例如,“API 密钥 1”)。点击右侧的三点状图标,然后选择修改 API 密钥,以打开“限制和重命名 API 密钥”页面。
👉 在顶部的“名称”字段中,将默认名称更改为:Maps Platform API 密钥(🚨🚨重要提示🚨🚨请使用此名称!)
Maps Platform API Key
👉 在“应用限制”部分下,确保已选择无。
👉 在“API 限制”部分下,选择“限制密钥”单选按钮。
👉 点击“选择 API”下拉菜单。在随即显示的搜索框中,输入 Maps JavaScript API
,然后从列表中选择该应用。
👉 点击“确定”。
👉 点击页面底部的“保存”按钮。
您现在已成功创建名为“Maps Platform API Key”的 API 密钥,并将其限制为仅允许使用“Maps JavaScript API”,同时确保该 API 已针对您的项目启用。
4. 设置图数据库
在构建智能代理之前,我们需要一种方法来存储和理解 InstaVibe 社交网络中的丰富关联。这时,图数据库就派上用场了。与将数据存储在行和列组成的表格中的传统关系型数据库不同,图数据库专门用于以节点(例如人物、活动或帖子)和连接这些节点的关系(边)(例如好友关系、活动出席情况或提及情况)的形式表示和查询数据。这种结构对于社交媒体应用来说非常强大,因为它反映了现实世界中社交网络的结构,因此可以直观地探索不同实体之间的关联。
我们正在使用 Google Cloud Spanner 实现此图数据库。虽然 Spanner 主要以全球分布式、高度一致的关系型数据库而闻名,但它还允许我们直接在关系型表之上定义和查询图结构。
这让我们能够同时受益于 Spanner 的可伸缩性、事务一致性和熟悉的 SQL 界面,以及图查询在分析对 AI 赋能功能至关重要的复杂社会动态方面的表达能力。
👉💻 在 Cloud Shell IDE 终端中。在 Google Cloud 上预配必要的基础设施。我们首先创建一个 Spanner 实例,该实例充当数据库的专用容器。实例准备就绪后,我们将在其中创建实际的 Spanner 数据库,该数据库将包含 InstaVibe 的所有表和图数据:
. ~/instavibe-bootstrap/set_env.sh
gcloud spanner instances create $SPANNER_INSTANCE_ID \
--config=regional-us-central1 \
--description="GraphDB Instance InstaVibe" \
--processing-units=100 \
--edition=ENTERPRISE
gcloud spanner databases create $SPANNER_DATABASE_ID \
--instance=$SPANNER_INSTANCE_ID \
--database-dialect=GOOGLE_STANDARD_SQL
👉💻 向默认服务账号授予 Spanner 读/写权限
echo "Granting Spanner read/write access to ${SERVICE_ACCOUNT_NAME} for database ${SPANNER_DATABASE_ID}..."
gcloud spanner databases add-iam-policy-binding ${SPANNER_DATABASE_ID} \
--instance=${SPANNER_INSTANCE_ID} \
--member="serviceAccount:${SERVICE_ACCOUNT_NAME}" \
--role="roles/spanner.databaseUser" \
--project=${PROJECT_ID}
👉💻 现在。我们将设置 Python 虚拟环境,安装所需的 Python 软件包,然后在 Spanner 中设置图数据库架构,并使用初始数据加载该架构,然后运行 setup.py
脚本。
. ~/instavibe-bootstrap/set_env.sh
cd ~/instavibe-bootstrap
python -m venv env
source env/bin/activate
pip install -r requirements.txt
cd instavibe
python setup.py
👉 在新的浏览器标签页中,前往 Google Cloud 控制台,然后前往 Spanner,您应该会看到 Spanner 实例列表。点击 instavibe-graph-instance
。 👉 在实例概览页面上,您会看到该实例中的数据库列表。点击
graphdb
👉 在数据库的左侧导航窗格中,点击 Spanner Studio
👉 在查询编辑器(“无标题查询”标签页)中,粘贴以下 Graph SQL 查询。此查询将查找所有 Person 节点以及它们与其他 Person 节点之间的直接 Friendship 关系。然后点击运行以查看结果。
Graph SocialGraph
MATCH result_paths = ((p:Person)-[f:Friendship]-(friend:Person))
RETURN SAFE_TO_JSON(result_paths) AS result_paths
👉 在同一查询编辑器中,替换之前的 DDL 以查找参加过同一活动的人员,这意味着他们通过共同的活动间接联系在一起。
Graph SocialGraph
MATCH result_paths = (p1:Person)-[:Attended]->(e:Event)<-[:Attended]-(p2:Person)
WHERE p1.person_id < p2.person_id
RETURN SAFE_TO_JSON(result_paths) AS result_paths
👉 此查询探索了另一种类型的连接,即特定用户的朋友所撰写的帖子中提及的用户。在查询编辑器中运行以下查询。
Graph SocialGraph
MATCH result_paths = (user:Person {name: "Alice"})-[:Friendship]-(friend:Person)-[:Wrote]->(post:Post)-[:Mentioned]->(mentioned_person:Person)
WHERE user <> mentioned_person AND friend <> mentioned_person -- Avoid self-mentions or friend mentioning themselves in their own post if not intended
RETURN SAFE_TO_JSON(result_paths) AS result_paths
这些查询只是展示了将 Spanner 用作 InstaVibe 应用的图数据库的强大功能。通过将社交数据建模为互连图,我们可以对关系和活动进行精细分析,这将为 AI 智能体理解用户背景信息、发现用户兴趣并最终提供智能社交规划辅助功能奠定基础。
现在,我们已经设置并测试了基础数据结构,接下来让我们将注意力转向代理将与之互动的现有 InstaVibe 应用。
5. InstaVibe 的当前状态
为了了解 AI 代理将如何发挥作用,我们首先需要部署并运行现有的 InstaVibe Web 应用。此应用提供用户界面和基本功能,可连接到我们已设置的 Spanner 图数据库。
InstaVibe 应用使用 Google 地图在其活动详情页面上直观地显示活动地点。如需启用此功能,应用需要我们之前创建的 API 密钥。以下脚本将使用我们分配的显示名称(“Maps Platform API Key”)检索实际的密钥字符串。
👉💻 返回到 Cloud Shell IDE。运行以下脚本。之后,仔细检查输出,确保显示的 GOOGLE_MAPS_API_KEY 与您之前在 Google Cloud 控制台中创建并复制的密钥一致。
. ~/instavibe-bootstrap/set_env.sh
export KEY_DISPLAY_NAME="Maps Platform API Key"
GOOGLE_MAPS_KEY_ID=$(gcloud services api-keys list \
--project="${PROJECT_ID}" \
--filter="displayName='${KEY_DISPLAY_NAME}'" \
--format="value(uid)" \
--limit=1)
GOOGLE_MAPS_API_KEY=$(gcloud services api-keys get-key-string "${GOOGLE_MAPS_KEY_ID}" \
--project="${PROJECT_ID}" \
--format="value(keyString)")
echo "${GOOGLE_MAPS_API_KEY}" > ~/mapkey.txt
echo "Retrieved GOOGLE_MAPS_API_KEY: ${GOOGLE_MAPS_API_KEY}"
👉💻 现在,我们来为 InstaVibe Web 应用构建容器映像,并将其推送到 Artifact Registry 代码库。
. ~/instavibe-bootstrap/set_env.sh
cd ~/instavibe-bootstrap/instavibe/
export IMAGE_TAG="latest"
export APP_FOLDER_NAME="instavibe"
export IMAGE_NAME="instavibe-webapp"
export IMAGE_PATH="${REGION}-docker.pkg.dev/${PROJECT_ID}/${REPO_NAME}/${IMAGE_NAME}:${IMAGE_TAG}"
export SERVICE_NAME="instavibe"
gcloud builds submit . \
--tag=${IMAGE_PATH} \
--project=${PROJECT_ID}
👉💻 将新 build 的 InstaVibe Web 应用映像部署到 Cloud Run
. ~/instavibe-bootstrap/set_env.sh
cd ~/instavibe-bootstrap/instavibe/
export IMAGE_TAG="latest"
export APP_FOLDER_NAME="instavibe"
export IMAGE_NAME="instavibe-webapp"
export IMAGE_PATH="${REGION}-docker.pkg.dev/${PROJECT_ID}/${REPO_NAME}/${IMAGE_NAME}:${IMAGE_TAG}"
export SERVICE_NAME="instavibe"
gcloud run deploy ${SERVICE_NAME} \
--image=${IMAGE_PATH} \
--platform=managed \
--region=${REGION} \
--allow-unauthenticated \
--set-env-vars="SPANNER_INSTANCE_ID=${SPANNER_INSTANCE_ID}" \
--set-env-vars="SPANNER_DATABASE_ID=${SPANNER_DATABASE_ID}" \
--set-env-vars="APP_HOST=0.0.0.0" \
--set-env-vars="APP_PORT=8080" \
--set-env-vars="GOOGLE_CLOUD_LOCATION=${REGION}" \
--set-env-vars="GOOGLE_CLOUD_PROJECT=${PROJECT_ID}" \
--set-env-vars="GOOGLE_MAPS_API_KEY=${GOOGLE_MAPS_API_KEY}" \
--project=${PROJECT_ID} \
--min-instances=1
部署成功完成后,Cloud Run 日志应显示正在运行的 InstaVibe 应用的公开网址。
您还可以在 Google Cloud 控制台中前往 Cloud Run 部分,然后选择 instavibe 服务,找到此网址。
现在,请在网络浏览器中打开该网址,探索 InstaVibe 平台的基本功能。查看由我们设置的图数据库提供支持的帖子、活动和用户连接。
现在,我们的目标应用已在运行,接下来,我们开始构建第一个智能代理,以增强其功能。
6. 使用 ADK 构建基本智能体“活动规划师”
ADK 框架
Google 的 ADK 框架简介 现在,我们已经打好了基础(InstaVibe 应用和数据库),接下来就可以使用 Google 的智能体开发套件 (ADK) 开始构建第一个智能代理了。
智能体开发套件 (ADK) 是一个灵活的模块化框架,专门用于开发和部署 AI 智能体。其设计原则是让智能体开发更像传统的软件开发,旨在让开发者能够更轻松地创建、部署和编排智能体架构,从而处理从简单的单用途任务到复杂的多智能体工作流等各种任务。
从核心本质上来说,ADK 围绕着 Agent
的概念,其中封装了指令、配置(例如所选的语言模型,如 Gemini),以及一组可用于执行操作或收集信息的 Tools
。
我们的初始代理将是“活动策划师”。其核心用途是接收用户关于社交活动的请求(指定地点、日期和兴趣),并生成富有创意且量身定制的建议。为了确保建议的相关性并基于最新信息(例如周末发生的特定活动),我们将利用 ADK 的内置工具之一:Google 搜索。这样,代理就可以根据实时网络搜索结果生成回答,获取符合用户条件的场地、活动和事件的最新详细信息。
👉📝 返回 Cloud Shell IDE,在 ~/instavibe-bootstrap/agents/planner/agent.py
中添加以下提示和指令以创建代理
from google.adk.agents import Agent
from google.adk.tools import google_search
root_agent = Agent(
name="planner_agent",
model="gemini-2.0-flash",
description="Agent tasked with generating creative and fun dating plan suggestions",
instruction="""
You are a specialized AI assistant tasked with generating creative and fun plan suggestions.
Request:
For the upcoming weekend, specifically from **[START_DATE_YYYY-MM-DD]** to **[END_DATE_YYYY-MM-DD]**, in the location specified as **[TARGET_LOCATION_NAME_OR_CITY_STATE]** (if latitude/longitude are provided, use these: Lat: **[TARGET_LATITUDE]**, Lon: **[TARGET_LONGITUDE]**), please generate a distinct dating plan suggestions.
Constraints and Guidelines for Suggestions:
1. Creativity & Fun: Plans should be engaging, memorable, and offer a good experience for a date.
2. Budget: All generated plans should aim for a moderate budget (conceptually "$$"), meaning they should be affordable yet offer good value, without being overly cheap or extravagant. This budget level should be *reflected in the choice of activities and venues*, but **do not** explicitly state "Budget: $$" in the `plan_description`.
3. Interest Alignment:
Consider the following user interests: **[COMMA_SEPARATED_LIST_OF_INTERESTS, e.g., outdoors, arts & culture, foodie, nightlife, unique local events, live music, active/sports]**. Tailor suggestions specifically to these where possible. The plan should *embody* these interests.
Fallback: If specific events or venues perfectly matching all listed user interests cannot be found for the specified weekend, you should create a creative and fun generic dating plan that is still appealing, suitable for the location, and adheres to the moderate budget. This plan should still sound exciting and fun, even if it's more general.
4. Current & Specific: Prioritize finding specific, current events, festivals, pop-ups, or unique local venues operating or happening during the specified weekend dates. If exact current events cannot be found, suggest appealing evergreen options or implement the fallback generic plan.
5. Location Details: For each place or event mentioned within a plan, you MUST provide its name, precise latitude, precise longitude, and a brief, helpful description.
6. Maximum Activities: The plan must contain a maximum of 3 distinct activities.
RETURN PLAN in MARKDOWN FORMAT
""",
tools=[google_search]
)
这样,我们就定义了第一个代理!ADK 的一大优势在于其直观性以及提供的实用工具。其中一个特别有用的工具是 ADK Dev UI,它可让您以交互方式测试代理,并实时查看其响应。
👉💻 让我们开始吧。以下命令将启动 ADK DEV 界面:
. ~/instavibe-bootstrap/set_env.sh
source ~/instavibe-bootstrap/env/bin/activate
cd ~/instavibe-bootstrap/agents
sed -i "s|^\(O\?GOOGLE_CLOUD_PROJECT\)=.*|GOOGLE_CLOUD_PROJECT=${PROJECT_ID}|" ~/instavibe-bootstrap/agents/planner/.env
adk web
运行命令后,您应该会在终端中看到表明 ADK Web 服务器已启动的输出,如下所示:
+-----------------------------------------------------------------------------+
| ADK Web Server started |
| |
| For local testing, access at http://localhost:8000. |
+-----------------------------------------------------------------------------+
INFO: Application startup complete.
INFO: Uvicorn running on http://0.0.0.0:8000 (Press CTRL+C to quit)
👉 接下来,如需通过浏览器访问 ADK 开发者界面,请执行以下操作:
在 Cloud Shell 工具栏(通常位于右上角)中,点击网页预览图标(通常看起来像眼睛或带有箭头的正方形),然后选择更改端口。在弹出式窗口中,将端口设置为 8000,然后点击“更改并预览”。然后,Cloud Shell 会打开一个新的浏览器标签页或窗口,其中显示 ADK Dev 界面。
在浏览器中打开 ADK Dev 界面后:在界面右上角的下拉菜单中,选择要与之互动的代理 planner。现在,在右侧的聊天对话框中,尝试为代理分配任务。例如,与智能体对话:
Search and plan something in Seattle for me this weekend
This weekend and I enjoy food and anime
建议日期(您的偏好)
July 12 2025
您应该会看到代理处理您的请求,并根据其 Google 搜索结果提供方案。
现在,与代理互动是一回事,但我们如何知道它是否始终按预期运行,尤其是在我们进行更改时?
由于 AI 代理具有生成性和不确定性,因此传统的软件测试方法往往无法满足其需求。为了弥合从酷炫的演示到可靠的生产代理之间的差距,制定可靠的评估策略至关重要。与仅检查生成模型的最终输出不同,评估智能体通常涉及评估其决策过程以及在各种场景中正确使用工具或遵循指令的能力。ADK 提供了一些功能来帮助解决此问题。
👉 在 ADK Dev 界面中,点击左侧导航栏中的“Eval”标签页。您应该会看到一个名为 plan_eval
的预加载测试文件。此文件包含用于测试规划器代理的预定义输入和条件。
👉 选择一个场景,例如“boston”,然后点击 Run Evaluation 按钮。在随即显示的弹出式窗口中,将匹配得分降低到 0.3,然后点击“开始”。
这将使用测试输入执行代理,并检查其输出是否符合定义的预期。这样一来,您就可以系统地测试代理的性能。
👉 现在,我们来看看如果采用更严格的阈值会发生什么情况。选择“nyc”方案,然后再次点击 Run Evaluation。这次,将匹配得分保留为默认值(回答匹配得分:0.7),然后点击“开始”。您会注意到结果为“失败”。这是预期行为,因为代理的创意输出与预定义的“标准”答案并不完全一致。
👉 如需了解失败原因,请点击“nyc”行中的失败图标。界面现在会并排显示代理的实际响应和测试用例的预期响应。此视图对于调试至关重要,可让您准确了解代理的输出在何处出现偏差,并相应地优化其指令。
探索完界面和评估后,返回 Cloud Shell 编辑器终端,然后按 Ctrl+C
停止 ADK Dev 界面。
虽然自由格式的文本输出是一个不错的开端,但对于 InstaVibe 等应用来说,为了便于使用代理的建议,结构化数据(如 JSON)会更加实用。我们来修改代理,使其以一致的 JSON 格式返回其计划。
👉📝 在 ~/instavibe-bootstrap/agents/planner/agent.py
中,找到代理的指令字符串中目前显示为 RETURN PLAN in MARKDOWN FORMAT
的行。将该行替换为以下详细的 JSON 结构:
Return your response *exclusively* as a single JSON object. This object should contain a top-level key, "fun_plans", which holds a plan objects. Each plan object in the list must strictly adhere to the following structure:
--json--
{
"plan_description": "A summary of the overall plan, consisting of **exactly three sentences**. Craft these sentences in a friendly, enthusiastic, and conversational tone, as if you're suggesting this awesome idea to a close friend. Make it sound exciting and personal, highlighting the positive aspects and appeal of the plan without explicitly mentioning budget or listing interest categories.",
"locations_and_activities": [
{
"name": "Name of the specific place or event",
"latitude": 0.000000, // Replace with actual latitude
"longitude": 0.000000, // Replace with actual longitude
"description": "A brief description of this place/event, why it's suitable for the date, and any specific details for the weekend (e.g., opening hours, event time)."
}
// Add more location/activity objects here if the plan involves multiple stops/parts
]
}
现在,您已更新代理的指令,明确要求 JSON 输出,接下来我们来验证一下更改。
👉💻 使用与之前相同的命令重新启动 ADK Dev 界面:
. ~/instavibe-bootstrap/set_env.sh
source ~/instavibe-bootstrap/env/bin/activate
cd ~/instavibe-bootstrap/agents
adk web
如果您已打开该标签页,请刷新该标签页。或者,按照之前的相同步骤在浏览器中打开 ADK Dev 界面(通过 Cloud Shell 在端口 8000 上进行网页预览)。界面加载完毕后,确保已选择规划器代理。
👉 这次,我们给它一个不同的请求。在聊天对话框中,输入:
Plan an event Boston this weekend with art and coffee
仔细检查客服人员的回答。现在,您应该会看到严格按照我们在指令中定义的结构(包含 fun_plans、plan_description、locations_and_activities 等)格式化的 JSON 对象,而不是纯粹的对话式文本回答。这确认了代理现在可以生成适合我们的 InstaVibe 应用以程序化方式使用的结构化输出。
确认 JSON 输出后,返回到 Cloud Shell 终端,然后按 Ctrl+C
停止 ADK Dev 界面。
ADK 组件
虽然 ADK Dev 界面非常适合交互式测试,但我们经常需要以编程方式运行代理,也许是作为大型应用或后端服务的一部分。为了了解其工作原理,我们来看看与运行时和上下文管理相关的一些核心 ADK 概念。
有意义的多轮对话需要代理了解上下文,即回忆之前说过和做过的事情,以保持对话的连贯性。ADK 提供结构化方式,通过会话、状态和记忆来管理此上下文:
- 会话:当用户开始与智能体互动时,系统会创建会话。您可以将其视为单个特定聊天串的容器。它包含唯一 ID、互动历史记录(事件)、当前工作数据(状态)以及上次更新时间等元数据。
- 状态:这是代理在单个会话中的短期工作记忆。这是一个可变字典,代理可以在其中存储完成当前任务所需的临时信息(例如,目前收集的用户偏好设置、工具调用的中间结果)。
- 记忆:这表示代理在不同会话中进行长期回忆或访问外部知识库的潜力。虽然会话和状态可以处理即时对话,但内存(通常由 MemoryService 管理)允许代理从过去的互动或结构化数据源中检索信息,从而获得更广泛的知识背景。(注意:为简单起见,我们的简单客户端使用内存服务,这意味着内存/状态仅在脚本运行时保持不变)。
- 事件:会话中的每次互动(用户消息、智能体回答、工具使用请求、工具结果、状态变化、错误)都会记录为不可变的事件。这会创建一个按时间顺序排列的日志,其中包含对话的转写内容和操作历史记录。
那么,当代理运行时,这些变量是如何管理的?这是运行程序的工作。
- 运行器:运行器是 ADK 提供的核心执行引擎。您定义代理及其使用的工具,而 Runner 会协调完成用户请求的过程。它管理会话、处理事件流、更新状态、调用底层语言模型、协调工具调用,并可能与 MemoryService 交互。您可以将其视为指挥家,确保所有不同的部分都能正确地协同工作。
我们可以使用 Runner 将代理作为独立的 Python 应用运行,完全独立于开发者界面。
我们来创建一个简单的客户端脚本,以通过编程方式调用规划器代理。
👉📝 在 ~/instavibe-bootstrap/agents/planner/planner_client.py
文件中,在现有导入项下添加以下 Python 代码。在 planner_client.py
中,在导入内容下添加以下内容:
async def async_main():
session_service = InMemorySessionService()
session = await session_service.create_session(
state={}, app_name='planner_app', user_id='user_dc'
)
query = "Plan Something for me in San Francisco this weekend on wine and fashion "
print(f"User Query: '{query}'")
content = types.Content(role='user', parts=[types.Part(text=query)])
root_agent = agent.root_agent
runner = Runner(
app_name='planner_app',
agent=root_agent,
session_service=session_service,
)
print("Running agent...")
events_async = runner.run_async(
session_id=session.id, user_id=session.user_id, new_message=content
)
async for event in events_async:
print(f"Event received: {event}")
if __name__ == '__main__':
try:
asyncio.run(async_main())
except Exception as e:
print(f"An error occurred: {e}")
此代码会设置用于会话和制品管理的内存中服务(为简单起见,在此示例中),创建会话,定义用户查询,使用我们的代理配置 Runner,然后异步运行代理,并输出执行期间生成的每个事件。
👉💻 现在,从终端执行此客户端脚本:
. ~/instavibe-bootstrap/set_env.sh
source ~/instavibe-bootstrap/env/bin/activate
cd ~/instavibe-bootstrap/agents
python -m planner.planner_client
👀 观察输出。您将看到代理执行流程期间生成的每个 Event 对象的详细结构,而不仅仅是最终的 JSON 计划。这包括初始用户消息事件、与工具调用(例如 Google 搜索)相关的潜在事件,以及最终包含 JSON 计划的模型响应事件。此详细的事件流对于调试和了解 ADK 运行时中发生的逐步处理非常有用。
Running agent...
Event received: content=Content(parts=[Part(video_metadata=None, thought=None, code_execution_result=None, executable_code=None, file_data=None, function_call=None, function_response=None, inline_data=None, text='```json\n{\n "fun_plans": [\n {\n "plan_description": "Embark on a stylish adventure through Hayes Valley,
...(turncated)
, offering a variety of fashion styles to browse and enjoy."\n }\n ]\n }\n ]\n}\n```')], role='model') grounding_metadata=GroundingMetadata(grounding_chunks=[GroundingChunk(retrieved_context=None, web=GroundingChunkWeb(domain='islands.com', title='islands.com', uri='http
...(turncated)
QyTpPV7jS6wUt-Ix7GuP2mC9J4eY_8Km6Vv44liF9cb2VSs='))], grounding_supports=[GroundingSupport(confide
...(turncated)
>\n', sdk_blob=None), web_search_queries=['..e']) partial=None turn_complete=None error_code=None error_message=None interrupted=None custom_metadata=None invocation_id='e-04d97b8b-9021-47a5-ab41-17b5cbb4bf03' author='location_search_agent' actions=EventActions(skip_summarization=None, state_delta={}, artifact_delta={}, transfer_to_agent=None, escalate=None, requested_auth_configs={}) long_running_tool_ids=None branch=None id='CInHdkKw' timestamp=1746978846.232674
如果脚本持续运行或挂起,您可能需要按 Ctrl+C
手动停止它。
7. 平台互动代理 - 与 MCP 服务器互动
虽然 ADK 有助于构建代理,但代理通常需要与外部系统或 API 进行交互才能执行实际操作。
Model Context Protocol (MCP)
Model Context Protocol (MCP) 是一种开放标准,旨在标准化 AI 应用(如智能体)与外部数据源、工具和系统的连接方式。它旨在提供通用接口,从而解决需要为每种 AI 应用和数据源组合进行自定义集成的问题。MCP 采用客户端-服务器架构,其中位于 AI 应用(主机)内的 MCP 客户端管理与 MCP 服务器的连接。这些服务器是外部程序,可提供特定功能,例如访问本地数据、通过 API 与远程服务互动或提供预定义的提示,从而使 AI 模型能够访问当前信息并执行超出其初始训练范围的任务。这种结构使 AI 模型能够以标准化方式发现外部功能并与之互动,从而简化集成并提高可伸缩性。
构建并部署 InstaVibe MCP 服务器
我们的代理最终需要与 InstaVibe 平台本身进行互动。具体来说,就是使用该平台的现有 API 来创建帖子和注册活动。InstaVibe 应用已通过标准 HTTP 端点公开了以下功能:
Enpoint | 网址 | HTTP 方法 | 说明 |
创建帖子 | api/posts | POST | 用于添加新帖子的 API 端点。预期 JSON 正文: |
创建活动 | api/events | POST | 用于添加新活动及其参与者的 API 端点(简化版架构)。 |
为了让我们的代理通过 MCP 使用这些功能,我们首先需要创建简单的 Python 函数,作为这些 API 调用的封装容器。这些函数将处理 HTTP 请求逻辑。
👉 首先,我们来实现用于创建帖子的封装容器函数。打开文件 ~/instavibe-bootstrap/tools/instavibe/instavibe.py
,并将 #REPLACE ME CREATE POST
注释替换为以下 Python 代码:
def create_post(author_name: str, text: str, sentiment: str, base_url: str = BASE_URL):
"""
Sends a POST request to the /posts endpoint to create a new post.
Args:
author_name (str): The name of the post's author.
text (str): The content of the post.
sentiment (str): The sentiment associated with the post (e.g., 'positive', 'negative', 'neutral').
base_url (str, optional): The base URL of the API. Defaults to BASE_URL.
Returns:
dict: The JSON response from the API if the request is successful.
Returns None if an error occurs.
Raises:
requests.exceptions.RequestException: If there's an issue with the network request (e.g., connection error, timeout).
"""
url = f"{base_url}/posts"
headers = {"Content-Type": "application/json"}
payload = {
"author_name": author_name,
"text": text,
"sentiment": sentiment
}
try:
response = requests.post(url, headers=headers, json=payload)
response.raise_for_status() # Raise an exception for bad status codes (4xx or 5xx)
print(f"Successfully created post. Status Code: {response.status_code}")
return response.json()
except requests.exceptions.RequestException as e:
print(f"Error creating post: {e}")
# Optionally re-raise the exception if the caller needs to handle it
# raise e
return None
except json.JSONDecodeError:
print(f"Error decoding JSON response from {url}. Response text: {response.text}")
return None
👉📝 接下来,我们将为事件创建 API 创建封装容器函数。在同一 ~/instavibe-bootstrap/tools/instavibe/instavibe.py
文件中,将 #REPLACE ME CREATE EVENTS
注释替换为以下代码:
def create_event(event_name: str, description: str, event_date: str, locations: list, attendee_names: list[str], base_url: str = BASE_URL):
"""
Sends a POST request to the /events endpoint to create a new event registration.
Args:
event_name (str): The name of the event.
description (str): The detailed description of the event.
event_date (str): The date and time of the event (ISO 8601 format recommended, e.g., "2025-06-10T09:00:00Z").
locations (list): A list of location dictionaries. Each dictionary should contain:
'name' (str), 'description' (str, optional),
'latitude' (float), 'longitude' (float),
'address' (str, optional).
attendee_names (list[str]): A list of names of the people attending the event.
base_url (str, optional): The base URL of the API. Defaults to BASE_URL.
Returns:
dict: The JSON response from the API if the request is successful.
Returns None if an error occurs.
Raises:
requests.exceptions.RequestException: If there's an issue with the network request (e.g., connection error, timeout).
"""
url = f"{base_url}/events"
headers = {"Content-Type": "application/json"}
payload = {
"event_name": event_name,
"description": description,
"event_date": event_date,
"locations": locations,
"attendee_names": attendee_names,
}
try:
response = requests.post(url, headers=headers, json=payload)
response.raise_for_status() # Raise an exception for bad status codes (4xx or 5xx)
print(f"Successfully created event registration. Status Code: {response.status_code}")
return response.json()
except requests.exceptions.RequestException as e:
print(f"Error creating event registration: {e}")
# Optionally re-raise the exception if the caller needs to handle it
# raise e
return None
except json.JSONDecodeError:
print(f"Error decoding JSON response from {url}. Response text: {response.text}")
return None
如您所见,这些函数是现有 InstaVibe API 的简单封装容器。如果您已为服务创建 API,则可以通过创建此类封装容器轻松将这些 API 的功能作为代理的工具公开。
MCP 服务器实现
现在,我们已经有了执行操作(调用 InstaVibe API)的 Python 函数,接下来需要构建 MCP 服务器组件。此服务器将根据 MCP 标准将这些函数公开为“工具”,从而使 MCP 客户端(例如我们的代理)能够发现并调用它们。
MCP 服务器通常实现两项关键功能:
- list_tools:负责允许客户端发现服务器上可用的工具,提供元数据(例如名称、说明和必需的参数),通常使用 JSON 架构定义
- call_tool:处理客户端请求的特定工具的执行,接收工具的名称和实参并执行相应操作,例如在本例中与 API 进行交互
MCP 服务器用于为 AI 模型提供对真实世界数据和操作的访问权限,从而实现发送电子邮件、在项目管理系统中创建任务、搜索数据库或与各种软件和 Web 服务交互等任务。虽然初始实现通常侧重于通过标准输入/输出 (stdio) 进行通信的本地服务器,以实现简单性,尤其是在开发或“工作室”环境中,但转向利用 HTTP 等协议和服务器发送的事件 (SSE) 的远程服务器对于更广泛的采用和企业用例来说更有意义。
尽管增加了网络通信层,但远程架构仍具有显著优势:它允许多个 AI 客户端共享对单个服务器的访问权限,集中管理和更新工具,通过将敏感数据和 API 密钥保留在服务器端而不是分布在可能众多的客户端机器上来增强安全性,并将 AI 模型与外部系统集成的具体细节分离,从而使整个生态系统比要求每个 AI 实例管理自己的直接集成更具可伸缩性、安全性和可维护性。
我们将使用 HTTP 和服务器发送的事件 (SSE) 来实现 MCP 服务器,这非常适合可能长时间运行的工具执行和企业场景。
👉📝 首先,我们来实现 list_tools 端点。打开文件 ~/instavibe-bootstrap/tools/instavibe/mcp_server.py
,并将 #REPLACE ME - LIST TOOLS
注释替换为以下代码。:
@app.list_tools()
async def list_tools() -> list[mcp_types.Tool]:
"""MCP handler to list available tools."""
# Convert the ADK tool's definition to MCP format
mcp_tool_schema_event = adk_to_mcp_tool_type(event_tool)
mcp_tool_schema_post = adk_to_mcp_tool_type(post_tool)
print(f"MCP Server: Received list_tools request. \n MCP Server: Advertising tool: {mcp_tool_schema_event.name} and {mcp_tool_schema_post}")
return [mcp_tool_schema_event,mcp_tool_schema_post]
此函数定义了工具(create_event、create_post),并告知连接的客户端这些工具。
👉📝 接下来,实现 call_tool
端点,该端点用于处理来自客户端的实际执行请求。在同一 ~/instavibe-bootstrap/tools/instavibe/mcp_server.py
文件中,将 #REPLACE ME - CALL TOOLS
注释替换为以下代码。
@app.call_tool()
async def call_tool(
name: str, arguments: dict
) -> list[mcp_types.TextContent | mcp_types.ImageContent | mcp_types.EmbeddedResource]:
"""MCP handler to execute a tool call."""
print(f"MCP Server: Received call_tool request for '{name}' with args: {arguments}")
# Look up the tool by name in our dictionary
tool_to_call = available_tools.get(name)
if tool_to_call:
try:
adk_response = await tool_to_call.run_async(
args=arguments,
tool_context=None, # No ADK context available here
)
print(f"MCP Server: ADK tool '{name}' executed successfully.")
response_text = json.dumps(adk_response, indent=2)
return [mcp_types.TextContent(type="text", text=response_text)]
except Exception as e:
print(f"MCP Server: Error executing ADK tool '{name}': {e}")
# Creating a proper MCP error response might be more robust
error_text = json.dumps({"error": f"Failed to execute tool '{name}': {str(e)}"})
return [mcp_types.TextContent(type="text", text=error_text)]
else:
# Handle calls to unknown tools
print(f"MCP Server: Tool '{name}' not found.")
error_text = json.dumps({"error": f"Tool '{name}' not implemented."})
return [mcp_types.TextContent(type="text", text=error_text)]
此函数接收工具名称和实参,找到我们之前定义的相应 Python 封装函数,执行该函数并返回结果
👉💻 定义 MCP 服务器逻辑后,我们现在需要将其打包为容器。在终端中运行以下脚本,以使用 Cloud Build 构建 Docker 映像:
. ~/instavibe-bootstrap/set_env.sh
cd ~/instavibe-bootstrap/tools/instavibe
export IMAGE_TAG="latest"
export MCP_IMAGE_NAME="mcp-tool-server"
export IMAGE_PATH="${REGION}-docker.pkg.dev/${PROJECT_ID}/${REPO_NAME}/${MCP_IMAGE_NAME}:${IMAGE_TAG}"
export SERVICE_NAME="mcp-tool-server"
export INSTAVIBE_BASE_URL=$(gcloud run services list --platform=managed --region=us-central1 --format='value(URL)' | grep instavibe)/api
gcloud builds submit . \
--tag=${IMAGE_PATH} \
--project=${PROJECT_ID}
👉💻 并将映像作为服务部署到 Google Cloud Run。
. ~/instavibe-bootstrap/set_env.sh
cd ~/instavibe-bootstrap/tools/instavibe
export IMAGE_TAG="latest"
export MCP_IMAGE_NAME="mcp-tool-server"
export IMAGE_PATH="${REGION}-docker.pkg.dev/${PROJECT_ID}/${REPO_NAME}/${MCP_IMAGE_NAME}:${IMAGE_TAG}"
export SERVICE_NAME="mcp-tool-server"
export INSTAVIBE_BASE_URL=$(gcloud run services list --platform=managed --region=us-central1 --format='value(URL)' | grep instavibe)/api
gcloud run deploy ${SERVICE_NAME} \
--image=${IMAGE_PATH} \
--platform=managed \
--region=${REGION} \
--allow-unauthenticated \
--set-env-vars="INSTAVIBE_BASE_URL=${INSTAVIBE_BASE_URL}" \
--set-env-vars="APP_HOST=0.0.0.0" \
--set-env-vars="APP_PORT=8080" \
--set-env-vars="GOOGLE_GENAI_USE_VERTEXAI=TRUE" \
--set-env-vars="GOOGLE_CLOUD_LOCATION=${REGION}" \
--set-env-vars="GOOGLE_CLOUD_PROJECT=${PROJECT_ID}" \
--project=${PROJECT_ID} \
--min-instances=1
👉💻 部署成功完成后,MCP 服务器将开始运行,并且可通过公共网址访问。我们需要捕获此网址,以便代理(充当 MCP 客户端)知道要连接到哪里。
export MCP_SERVER_URL=$(gcloud run services list --platform=managed --region=us-central1 --format='value(URL)' | grep mcp-tool-server)/sse
现在,您还应该能够在 Google Cloud 控制台的 Cloud Run 部分中看到 mcp-tool-server 服务列为“正在运行”。
部署 MCP 服务器并捕获其网址后,我们现在可以实现将充当 MCP 客户端并利用此服务器公开的工具的代理。
8. 平台互动代理(使用 MCP)
MCP 客户端:MCP 客户端是驻留在 AI 应用或代理中的组件,充当 AI 模型与一个或多个 MCP 服务器之间的接口;在我们的实现中,此客户端将直接集成到我们的代理中。此客户端的主要功能是与 MCP 服务器通信,通过 list_tools
函数发现可用的工具,然后使用 call_tool
函数请求执行特定工具,并传递 AI 模型或编排调用的代理提供的必要实参。
现在,我们将构建充当 MCP 客户端的代理。此代理在 ADK 框架内运行,将负责与我们刚刚部署的 mcp-tool-server
进行通信。
👉 首先,我们需要修改代理定义,以从正在运行的 MCP 服务器动态提取工具。在 agents/platform_mcp_client/agent.py
中,将 #REPLACE ME - FETCH TOOLS
替换为以下内容:
"""Gets tools from the File System MCP Server."""
tools = MCPToolset(
connection_params=SseServerParams(url=MCP_SERVER_URL, headers={})
)
此代码使用 MCPToolset.from_server 方法连接到 MCP_SERVER_网址(我们之前将其设置为环境变量),并检索可用工具的列表。
接下来,我们需要告知 ADK 代理定义实际使用这些动态提取的工具。
👉 在 agents/platform_mcp_client/agent.py
中,将 #REPLACE ME - SET TOOLs
替换为以下内容:
tools=[tools],
👉💻 现在,我们使用 ADK Dev 界面在本地测试此代理,看看它是否可以正确连接到 MCP 服务器并使用工具与正在运行的 InstaVibe 应用进行交互。
. ~/instavibe-bootstrap/set_env.sh
source ~/instavibe-bootstrap/env/bin/activate
export MCP_SERVER_URL=$(gcloud run services list --platform=managed --region=us-central1 --format='value(URL)' | grep mcp-tool-server)/sse
cd ~/instavibe-bootstrap/agents
sed -i "s|^\(O\?GOOGLE_CLOUD_PROJECT\)=.*|GOOGLE_CLOUD_PROJECT=${PROJECT_ID}|" ~/instavibe-bootstrap/agents/platform_mcp_client/.env
sed -i "s|^\(O\?MCP_SERVER_URL\)=.*|MCP_SERVER_URL=${MCP_SERVER_URL}|" ~/instavibe-bootstrap/agents/platform_mcp_client/.env
adk web
在浏览器中再次打开 ADK Dev 界面(使用 Cloud Shell 在端口 8000 上进行的网页预览)。这次,在右上角的下拉菜单中选择 platform_mcp_client
代理。
我们来测试一下 create_post 工具。在聊天对话框中,输入以下请求:
Create a post saying "Y'all I just got the cutest lil void baby 😭✨ Naming him Abyss bc he's deep, mysterious, and lowkey chaotic 🔥🖤 #VoidCat #NewRoomie" I'm Julia
代理应处理此请求,确定需要使用 create_post 工具,与 MCP 服务器通信,然后由 MCP 服务器调用 InstaVibe API。
👉 验证步骤:在代理确认操作后,打开运行 InstaVibe 应用的标签页(或刷新该标签页)。您应该会在主动态中看到“Julia”的新帖子!
👉💻 在单独的终端中运行此脚本,以获取 Instavibe 链接(如果需要):
gcloud run services list --platform=managed --region=us-central1 --format='value(URL)' | grep instavibe
👉📝 现在,我们来测试 create_event 工具。在聊天对话框中输入以下多行请求:
Hey, can you set up an event for Hannah and George and me, and I'm Julia? Let's call it 'Mexico City Culinary & Art Day'.
here are more info
{"event_name": "Mexico City Culinary & Art Day",
"description": "A vibrant day in Mexico City for Hannah and George, starting with lunch at one of the city's best taco spots in the hip Condesa neighborhood, followed by an inspiring afternoon exploring the Museo Soumaya's stunning art collection.",
"event_date": "2025-10-17T12:00:00-06:00",
"locations": [
{
"name": "El Tizoncito",
"description": "Considered one of the original creators of tacos al pastor, El Tizoncito offers a legendary taco experience in the heart of Condesa. Their flavorful meats, house salsas, and casual vibe make it a must-visit for foodies.",
"latitude": 19.412179,
"longitude": -99.171308,
"address": "Av. Tamaulipas 122, Hipódromo, Cuauhtémoc, 06100 Ciudad de México, CDMX, Mexico"
},
{
"name": "Museo Soumaya",
"description": "An architectural icon in Mexico City, Museo Soumaya houses over 66,000 works of art, including pieces by Rodin, Dalí, and Rivera. The striking silver structure is a cultural landmark and a visual feast inside and out.",
"latitude": 19.440056,
"longitude": -99.204281,
"address": "Plaza Carso, Blvd. Miguel de Cervantes Saavedra 303, Granada, Miguel Hidalgo, 11529 Ciudad de México, CDMX, Mexico"
}
],
"attendee_names": ["Hannah", "George", Julia],
}
同样,代理应通过 MCP 服务器使用适当的工具。在“事件”标签页中,您可以点击各个事件,查看详细的执行跟踪记录。
👉 验证步骤:返回正在运行的 InstaVibe 应用,然后前往“活动”部分(或类似部分)。现在,您应该会看到新创建的“墨西哥城美食与艺术日”活动。
这成功展示了 MCP 如何以标准化方式让我们的代理利用外部工具(在本例中为 InstaVibe 的 API)。
验证这两项操作后,返回到 Cloud Shell 终端,然后按 Ctrl+C
停止 ADK Dev 界面。
9. ADK 中的工作流代理和多代理
目前,我们的智能体可以规划外出活动并与平台互动。不过,要制定真正个性化的计划,需要了解用户的社交圈。对于可能不会密切关注好友动态的忙碌用户来说,手动收集此类背景信息非常困难。为了解决这个问题,我们将构建一个社交分析代理,利用 Spanner 图数据库分析好友的活动和兴趣,从而提供更量身定制的建议。
首先,我们需要为该智能体提供访问图表数据的工具。
👉📝 将以下 Python 函数添加到文件 ~/instavibe-bootstrap/agents/social/instavibe.py
的末尾:
def get_person_attended_events(person_id: str)-> list[dict]:
"""
Fetches events attended by a specific person using Graph Query.
Args:
person_id (str): The ID of the person whose posts to fetch.
Returns: list[dict] or None.
"""
if not db_instance: return None
graph_sql = """
Graph SocialGraph
MATCH (p:Person)-[att:Attended]->(e:Event)
WHERE p.person_id = @person_id
RETURN e.event_id, e.name, e.event_date, att.attendance_time
ORDER BY e.event_date DESC
"""
params = {"person_id": person_id}
param_types_map = {"person_id": param_types.STRING}
fields = ["event_id", "name", "event_date", "attendance_time"]
results = run_graph_query( graph_sql, params=params, param_types=param_types_map, expected_fields=fields)
if results is None: return None
for event in results:
if isinstance(event.get('event_date'), datetime):
event['event_date'] = event['event_date'].isoformat()
if isinstance(event.get('attendance_time'), datetime):
event['attendance_time'] = event['attendance_time'].isoformat()
return results
def get_person_id_by_name( name: str) -> str:
"""
Fetches the person_id for a given name using SQL.
Args:
name (str): The name of the person to search for.
Returns:
str or None: The person_id if found, otherwise None.
Returns the ID of the *first* match if names are duplicated.
"""
if not db_instance: return None
sql = """
SELECT person_id
FROM Person
WHERE name = @name
LIMIT 1 -- Return only the first match in case of duplicate names
"""
params = {"name": name}
param_types_map = {"name": param_types.STRING}
fields = ["person_id"]
# Use the standard SQL query helper
results = run_sql_query( sql, params=params, param_types=param_types_map, expected_fields=fields)
if results: # Check if the list is not empty
return results[0].get('person_id') # Return the ID from the first dictionary
else:
return None # Name not found
def get_person_posts( person_id: str)-> list[dict]:
"""
Fetches posts written by a specific person using Graph Query.
Args:
person_id (str): The ID of the person whose posts to fetch.
Returns:
list[dict] or None: List of post dictionaries with ISO date strings,
or None if an error occurs.
"""
if not db_instance: return None
# Graph Query: Find the specific Person node, follow 'Wrote' edge to Post nodes
graph_sql = """
Graph SocialGraph
MATCH (author:Person)-[w:Wrote]->(post:Post)
WHERE author.person_id = @person_id
RETURN post.post_id, post.author_id, post.text, post.sentiment, post.post_timestamp, author.name AS author_name
ORDER BY post.post_timestamp DESC
"""
# Parameters now include person_id and limit
params = {
"person_id": person_id
}
param_types_map = {
"person_id": param_types.STRING
}
# Fields returned remain the same
fields = ["post_id", "author_id", "text", "sentiment", "post_timestamp", "author_name"]
results = run_graph_query(graph_sql, params=params, param_types=param_types_map, expected_fields=fields)
if results is None:
return None
# Convert datetime objects to ISO format strings
for post in results:
if isinstance(post.get('post_timestamp'), datetime):
post['post_timestamp'] = post['post_timestamp'].isoformat()
return results
def get_person_friends( person_id: str)-> list[dict]:
"""
Fetches friends for a specific person using Graph Query.
Args:
person_id (str): The ID of the person whose posts to fetch.
Returns: list[dict] or None.
"""
if not db_instance: return None
graph_sql = """
Graph SocialGraph
MATCH (p:Person {person_id: @person_id})-[f:Friendship]-(friend:Person)
RETURN DISTINCT friend.person_id, friend.name
ORDER BY friend.name
"""
params = {"person_id": person_id}
param_types_map = {"person_id": param_types.STRING}
fields = ["person_id", "name"]
results = run_graph_query( graph_sql, params=params, param_types=param_types_map, expected_fields=fields)
return results
现在,我们来讨论一下如何构建代理。分析多位好友的个人资料,然后总结分析结果,这涉及多个步骤。这是使用 ADK 多智能体功能(尤其是工作流智能体)的理想场景。
在 Google 的 ADK 中,工作流代理本身不执行任务,而是编排其他代理(称为子代理)。这样可以实现模块化设计,将复杂问题分解为专门的组件。ADK 提供内置的工作流类型,例如
- 顺序(分步)
- 并行(并发执行)
- 和循环(重复执行)
在社交分析任务中,我们的设计使用 Loop Agent 来创建迭代工作流。目的是一次处理一个人:profile_agent
收集数据,summary_agent
更新分析,以及 check_agent
确定是否应再次循环。
我们来定义此工作流所需的子代理。
👉📝 在 ~/instavibe-bootstrap/agents/social/agent.py
中,将 #REPLACE FOR profile_agent
替换为以下内容:
profile_agent = LlmAgent(
name="profile_agent",
model="gemini-2.5-flash",
description=(
"Agent to answer questions about the this person social profile. Provide the person's profile using their name, make sure to fetch the id before getting other data."
),
instruction=(
"You are a helpful agent to answer questions about the this person social profile. You'll be given a list of names, provide the person's profile using their name, make sure to fetch the id before getting other data. Get one person at a time, start with the first one on the list, and skip if already provided. return this person's result"
),
tools=[get_person_posts,get_person_friends,get_person_id_by_name,get_person_attended_events],
)
接下来,代理会获取收集到的个人资料信息(在循环迭代中累积),并生成最终摘要,如果分析了多个人,则会找出共同点。
👉📝 在同一 ~/instavibe-bootstrap/agents/social/agent.py
中,将 #REPLACE FOR summary_agent
替换为以下内容:
summary_agent = LlmAgent(
name="summary_agent",
model="gemini-2.5-flash",
description=(
"Generate a comprehensive social summary as a single, cohesive paragraph. This summary should cover the activities, posts, friend networks, and event participation of one or more individuals. If multiple profiles are analyzed, the paragraph must also identify and integrate any common ground found between them."
),
instruction=(
"""
Your primary task is to synthesize social profile information into a single, comprehensive paragraph.
**Input Scope & Default Behavior:**
* If specific individuals are named by the user, focus your analysis on them.
* **If no individuals are specified, or if the request is general, assume the user wants an analysis of *all relevant profiles available in the current dataset/context*.**
**For each profile (whether specified or determined by default), you must analyze:**
1. **Post Analysis:**
* Systematically review their posts (e.g., content, topics, frequency, engagement).
* Identify recurring themes, primary interests, and expressed sentiments.
2. **Friendship Relationship Analysis:**
* Examine their connections/friends list.
* Identify key relationships, mutual friends (especially if comparing multiple profiles), and the general structure of their social network.
3. **Event Participation Analysis:**
* Investigate their past (and if available, upcoming) event participation.
* Note the types of events, frequency of attendance, and any notable roles (e.g., organizer, speaker).
**Output Generation (Single Paragraph):**
* **Your entire output must be a single, cohesive summary paragraph.**
* **If analyzing a single profile:** This paragraph will detail their activities, interests, and social connections based on the post, friend, and event analysis.
* **If analyzing multiple profiles:** This paragraph will synthesize the key findings regarding posts, friends, and events for each individual. Crucially, it must then seamlessly integrate or conclude with an identification and description of the common ground found between them (e.g., shared interests from posts, overlapping event attendance, mutual friends). The aim is a unified narrative within this single paragraph.
**Key Considerations:**
* Base your summary strictly on the available data.
* If data for a specific category (posts, friends, events) is missing or sparse for a profile, you may briefly acknowledge this within the narrative if relevant.
"""
),
output_key="summary"
)
我们需要一种方法来确定循环何时应停止(即,何时已总结所有请求的配置文件)
👉📝 在同一 ~/instavibe-bootstrap/agents/social/agent.py
中,将 #REPLACE FOR check_agent
替换为以下内容:
check_agent = LlmAgent(
name="check_agent",
model="gemini-2.5-flash",
description=(
"Check if everyone's social profile are summarized and has been generated. Output 'completed' or 'pending'."
),
output_key="summary_status"
)
我们添加了一个简单的程序化检查 (CheckCondition),用于明确查看存储在 State 中的 summary_status
(由 check_agent
返回),并告知 Loop 代理是否继续(escalate=False)或停止(escalate=True)。
👉📝 在同一 ~/instavibe-bootstrap/agents/social/agent.py
中,将位于文件顶部的 #REPLACE FOR CheckCondition
替换为以下内容:
class CheckCondition(BaseAgent):
async def _run_async_impl(self, ctx: InvocationContext) -> AsyncGenerator[Event, None]:
#log.info(f"Checking status: {ctx.session.state.get("summary_status", "fail")}")
log.info(f"Summary: {ctx.session.state.get("summary")}")
status = ctx.session.state.get("summary_status", "fail").strip()
is_done = (status == "completed")
yield Event(author=self.name, actions=EventActions(escalate=is_done))
循环结果的状态和回调
在 Google 的 ADK 中,状态是一个关键概念,表示代理在执行期间的内存或工作数据。它本质上是一种持久性上下文,其中包含智能体在不同步骤、工具调用或互动中需要维护的信息。此状态可以存储中间结果、用户信息、后续操作的参数,或代理在完成任务过程中需要记住的任何其他数据。
在我们的场景中,随着 Loop 代理的迭代,summary_agent
和 check_agent
会将其输出(摘要和 summary_status)存储在代理的状态中。这样一来,信息就可以在迭代之间保持不变。不过,Loop Agent 本身不会在完成时自动从状态返回最终摘要。
ADK 中的回调允许我们注入自定义逻辑,以便在代理生命周期的特定时间点或响应某些事件(例如工具调用完成或代理完成执行之前)执行这些逻辑。它们提供了一种自定义代理行为和动态处理结果的方法。
我们将使用在循环结束时运行的 after_agent_callback
(因为 CheckCondition 已升级)。此回调 modify_output_after_agent
从状态中检索最终总结,并将其格式化为代理的最终输出消息。
👉📝 在同一 ~/instavibe-bootstrap/agents/social/agent.py
中,将 #REPLACE FOR modify_output_after_agent
替换为以下内容:
def modify_output_after_agent(callback_context: CallbackContext) -> Optional[types.Content]:
agent_name = callback_context.agent_name
invocation_id = callback_context.invocation_id
current_state = callback_context.state.to_dict()
current_user_content = callback_context.user_content
print(f"[Callback] Exiting agent: {agent_name} (Inv: {invocation_id})")
print(f"[Callback] Current summary_status: {current_state.get("summary_status")}")
print(f"[Callback] Current Content: {current_user_content}")
status = current_state.get("summary_status").strip()
is_done = (status == "completed")
# Retrieve the final summary from the state
final_summary = current_state.get("summary")
print(f"[Callback] final_summary: {final_summary}")
if final_summary and is_done and isinstance(final_summary, str):
log.info(f"[Callback] Found final summary, constructing output Content.")
# Construct the final output Content object to be sent back
return types.Content(role="model", parts=[types.Part(text=final_summary.strip())])
else:
log.warning("[Callback] No final summary found in state or it's not a string.")
# Optionally return a default message or None if no summary was generated
return None
定义根循环代理
最后,我们定义了主 LoopAgent。它在每个循环迭代中按顺序协调各个子代理(profile_agent -> summary_agent -> check_agent -> CheckCondition)。它会重复此序列,最多重复 max_iterations 次,或者直到 CheckCondition 发出完成信号。after_agent_callback 可确保返回最终摘要。
👉📝 在同一 ~/instavibe-bootstrap/agents/social/agent.py
中,将 #REPLACE FOR root_agent
替换为以下内容:
root_agent = LoopAgent(
name="InteractivePipeline",
sub_agents=[
profile_agent,
summary_agent,
check_agent,
CheckCondition(name="Checker")
],
description="Find everyone's social profile on events, post and friends",
max_iterations=10,
after_agent_callback=modify_output_after_agent
)
接下来,我们使用 ADK Dev 界面测试此多智能体工作流。
👉💻 启动 ADK 网络服务器:
. ~/instavibe-bootstrap/set_env.sh
source ~/instavibe-bootstrap/env/bin/activate
cd ~/instavibe-bootstrap/agents
sed -i "s|^\(O\?GOOGLE_CLOUD_PROJECT\)=.*|GOOGLE_CLOUD_PROJECT=${PROJECT_ID}|" ~/instavibe-bootstrap/agents/social/.env
adk web
通过网页预览打开 ADK Dev 界面(端口 8000)。在代理下拉菜单(右上角)中,选择社交代理。
👉 现在,让它分析多个人物。在聊天对话框中,输入:
Tell me about Mike and Bob
在代理做出回答后(由于循环和多次 LLM 调用,这可能需要更长时间),不要只查看最终的聊天输出。前往 ADK 开发者界面左侧窗格中的“事件”标签页。
👉 验证步骤:在“事件”标签页中,您将看到详细的执行跟踪记录,其中包含每个步骤。
在观察代理如何调用每个子代理后,您会发现流程从 profile_agent -> summary_agent -> check_agent,在每次迭代中都包含 Checker。但在实践中,我们看到了代理强大的“自我优化”能力。
由于底层模型会看到整个请求(例如,“分析 Mike 和 Bob”),它通常会选择最有效的路径,在一次整合的对话中收集所有必需的数据,而不是多次迭代。您可以查看每个步骤的输入、输出和状态,包括 profile_agent 进行的工具调用
以及来自 check_agent 和 CheckCondition 的状态更新。
这种可视化轨迹对于了解和调试多代理工作流如何运行(直到回调生成并返回最终摘要)非常有用。
探索完聊天回答和事件轨迹后,返回 Cloud Shell 终端并按 Ctrl+C
停止 ADK Dev 界面。
10. 智能体到智能体 (A2A) 通信
到目前为止,我们已经构建了专业代理,但它们在同一台机器上以隔离方式或在预定义的工作流程中运行。为了构建真正分布式和协作式的多智能体系统,我们需要一种方法,让可能作为单独服务运行的智能体能够相互发现并有效通信。这时,Agent-to-Agent (A2A) 协议就派上用场了。
A2A 协议是一种开放标准,专门用于在 AI 智能体之间实现互操作通信。虽然 MCP 侧重于智能体与工具之间的互动,但 A2A 侧重于智能体与智能体之间的互动。借助此功能,客服人员可以:
- 发现:通过标准化的智能体卡片查找其他智能体并了解其功能。
- 通信:安全地交换消息和数据。
- 协作:委托任务并协调行动,以实现复杂的目标。
A2A 协议通过“代理卡片”等机制促进这种通信,代理可以使用这些机制来宣传其功能和连接信息。
A2A 利用熟悉的 Web 标准(HTTP、SSE、JSON-RPC),通常采用客户端-服务器模型,其中一个代理(客户端)将任务发送给另一个代理(远程代理/服务器)。这种标准化对于构建模块化、可扩缩的系统至关重要,在该系统中,独立开发的代理可以协同工作。
为 InstaVibe 代理启用 A2A
为了让其他代理能够通过 A2A 访问我们现有的 Planner、Platform Interaction 和 Social 代理,我们需要使用 A2A Server 组件封装每个代理。此服务器将:
- 公开代理卡片:通过 HTTP 端点提供代理功能的标准说明。
- Listen for Tasks(Request Messages):根据 A2A 协议接受来自其他代理(A2A 客户端)的传入任务请求。
- 管理任务(请求消息)执行:将收到的任务交给底层 ADK 代理逻辑进行处理。
规划器代理(已启用 A2A)
我们先将 A2A 服务器层添加到 Planner Agent。
定义 A2A 服务器启动逻辑。此代码定义了 AgentCard(代理的公开说明)、配置了 A2AServer 并启动了该服务器,同时将其与 PlatformAgentExecutor 相关联。
👉📝 将以下代码添加到 ~/instavibe-bootstrap/agents/planner/a2a_server.py
的末尾:
class PlannerAgent:
"""An agent to help user planning a event with its desire location."""
SUPPORTED_CONTENT_TYPES = ["text", "text/plain"]
def __init__(self):
self._agent = self._build_agent()
self.runner = Runner(
app_name=self._agent.name,
agent=self._agent,
artifact_service=InMemoryArtifactService(),
session_service=InMemorySessionService(),
memory_service=InMemoryMemoryService(),
)
capabilities = AgentCapabilities(streaming=True)
skill = AgentSkill(
id="event_planner",
name="Event planner",
description="""
This agent generates multiple fun plan suggestions tailored to your specified location, dates, and interests,
all designed for a moderate budget. It delivers detailed itineraries,
including precise venue information (name, latitude, longitude, and description), in a structured JSON format.
""",
tags=["instavibe"],
examples=["What about Bostona MA this weekend?"],
)
self.agent_card = AgentCard(
name="Event Planner Agent",
description="""
This agent generates multiple fun plan suggestions tailored to your specified location, dates, and interests,
all designed for a moderate budget. It delivers detailed itineraries,
including precise venue information (name, latitude, longitude, and description), in a structured JSON format.
""",
url=f"{PUBLIC_URL}",
version="1.0.0",
defaultInputModes=PlannerAgent.SUPPORTED_CONTENT_TYPES,
defaultOutputModes=PlannerAgent.SUPPORTED_CONTENT_TYPES,
capabilities=capabilities,
skills=[skill],
)
def get_processing_message(self) -> str:
return "Processing the planning request..."
def _build_agent(self) -> LlmAgent:
"""Builds the LLM agent for the night out planning agent."""
return agent.root_agent
if __name__ == '__main__':
try:
plannerAgent = PlannerAgent()
request_handler = DefaultRequestHandler(
agent_executor=PlannerAgentExecutor(plannerAgent.runner,plannerAgent.agent_card),
task_store=InMemoryTaskStore(),
)
server = A2AStarletteApplication(
agent_card=plannerAgent.agent_card,
http_handler=request_handler,
)
logger.info(f"Attempting to start server with Agent Card: {plannerAgent.agent_card.name}")
logger.info(f"Server object created: {server}")
uvicorn.run(server.build(), host='0.0.0.0', port=port)
except Exception as e:
logger.error(f"An error occurred during server startup: {e}")
exit(1)
👉💻 让我们快速测试一下 A2A 服务器是否在本地正确启动并提供其智能体卡片。在第一个终端中运行以下命令:
. ~/instavibe-bootstrap/set_env.sh
source ~/instavibe-bootstrap/env/bin/activate
cd ~/instavibe-bootstrap/agents/
python -m planner.a2a_server
👉 现在,打开另一个终端窗口。(点击终端面板中的加号)
👉💻 使用 curl 从本地运行的服务器请求代理卡:
curl http://localhost:10003/.well-known/agent.json | jq
您应该会看到我们定义的 AgentCard 的 JSON 表示法,这表明服务器正在运行并宣传 Planner 代理。
返回到第一个终端(服务器正在运行),然后按 Ctrl+C
停止服务器。
👉💻 添加 A2A 服务器逻辑后,我们现在可以构建容器映像了。
构建并部署规划器代理
. ~/instavibe-bootstrap/set_env.sh
cd ~/instavibe-bootstrap/agents
# Set variables specific to the PLANNER agent
export IMAGE_TAG="latest"
export AGENT_NAME="planner"
export IMAGE_NAME="planner-agent"
export IMAGE_PATH="${REGION}-docker.pkg.dev/${PROJECT_ID}/${REPO_NAME}/${IMAGE_NAME}:${IMAGE_TAG}"
export SERVICE_NAME="planner-agent"
export PUBLIC_URL="https://planner-agent-${PROJECT_NUMBER}.${REGION}.run.app"
echo "Building ${AGENT_NAME} agent..."
gcloud builds submit . \
--config=cloudbuild-build.yaml \
--project=${PROJECT_ID} \
--region=${REGION} \
--substitutions=_AGENT_NAME=${AGENT_NAME},_IMAGE_PATH=${IMAGE_PATH}
echo "Image built and pushed to: ${IMAGE_PATH}"
👉💻 并在 Cloud Run 上部署 Planner Agent。
. ~/instavibe-bootstrap/set_env.sh
cd ~/instavibe-bootstrap/agents
# Set variables specific to the PLANNER agent
export IMAGE_TAG="latest"
export AGENT_NAME="planner"
export IMAGE_NAME="planner-agent"
export IMAGE_PATH="${REGION}-docker.pkg.dev/${PROJECT_ID}/${REPO_NAME}/${IMAGE_NAME}:${IMAGE_TAG}"
export SERVICE_NAME="planner-agent"
export PUBLIC_URL="https://planner-agent-${PROJECT_NUMBER}.${REGION}.run.app"
gcloud run deploy ${SERVICE_NAME} \
--image=${IMAGE_PATH} \
--platform=managed \
--region=${REGION} \
--set-env-vars="A2A_HOST=0.0.0.0" \
--set-env-vars="A2A_PORT=8080" \
--set-env-vars="GOOGLE_GENAI_USE_VERTEXAI=TRUE" \
--set-env-vars="GOOGLE_CLOUD_LOCATION=${REGION}" \
--set-env-vars="GOOGLE_CLOUD_PROJECT=${PROJECT_ID}" \
--set-env-vars="PUBLIC_URL=${PUBLIC_URL}" \
--allow-unauthenticated \
--project=${PROJECT_ID} \
--min-instances=1
我们来验证一下,已部署的服务是否正在运行,以及是否正在从云端正确提供其代理卡片(使用 A2A Inspector)。
👉 在 Cloud Shell 工具栏中,选择“网页预览”图标,然后选择“更改端口”。将端口设置为 8081,然后点击“更改并预览”。系统随即会打开一个新的浏览器标签页,其中包含 A2A 检查器界面。
👉💻 在终端中,获取已部署的规划器代理的网址:
export PLANNER_AGENT_URL=$(gcloud run services list --platform=managed --region=us-central1 --format='value(URL)' | grep planner-agent)
echo ${PLANNER_AGENT_URL}
👉💻 复制输出网址。
👉 在 A2A 检查器界面中,将网址粘贴到“代理网址”字段中,然后点击“连接”。
👀 代理的卡片详细信息和 JSON 应显示在“代理卡片”标签页上,以确认连接成功。
👉 点击 A2A 检查器中的“聊天”标签页。您可以在此处直接与已部署的代理互动,向其发送消息以测试其规划能力。例如:
Plan something for me in Boston MA this weekend, and I enjoy classical music
👀 如需检查原始通信内容,请依次点击聊天窗口中的消息气泡和代理的回答气泡。点击每个消息后,系统会显示已发送或接收的完整 JSON-RPC 2.0 消息,这对于调试来说非常宝贵。
让我们将 A2A 检查器标签页放在方便的位置。请勿关闭!我们稍后会再次使用它来测试其他两个代理。
平台互动代理 (A2A 已启用)
接下来,我们将针对平台互动代理(使用 MCP 的代理)重复此流程。
👉📝 在 ~/instavibe-bootstrap/agents/platform_mcp_client/a2a_server.py
的末尾定义 A2A 服务器设置,包括其唯一的 AgentCard:
class PlatformAgent:
"""An agent that post event and post to instavibe."""
SUPPORTED_CONTENT_TYPES = ["text", "text/plain"]
def __init__(self):
self._agent = self._build_agent()
self.runner = Runner(
app_name=self._agent.name,
agent=self._agent,
artifact_service=InMemoryArtifactService(),
session_service=InMemorySessionService(),
memory_service=InMemoryMemoryService(),
)
capabilities = AgentCapabilities(streaming=True)
skill = AgentSkill(
id="instavibe_posting",
name="Post social post and events on instavibe",
description="""
This "Instavibe" agent helps you create posts (identifying author, text, and sentiment – inferred if unspecified) and register
for events (gathering name, date, attendee). It efficiently collects required information and utilizes dedicated tools
to perform these actions on your behalf, ensuring a smooth sharing experience.
""",
tags=["instavibe"],
examples=["Create a post for me, the post is about my cute cat and make it positive, and I'm Alice"],
)
self.agent_card = AgentCard(
name="Instavibe Posting Agent",
description="""
This "Instavibe" agent helps you create posts (identifying author, text, and sentiment – inferred if unspecified) and register
for events (gathering name, date, attendee). It efficiently collects required information and utilizes dedicated tools
to perform these actions on your behalf, ensuring a smooth sharing experience.
""",
url=f"{PUBLIC_URL}",
version="1.0.0",
defaultInputModes=PlatformAgent.SUPPORTED_CONTENT_TYPES,
defaultOutputModes=PlatformAgent.SUPPORTED_CONTENT_TYPES,
capabilities=capabilities,
skills=[skill],
)
def get_processing_message(self) -> str:
return "Processing the social post and event request..."
def _build_agent(self) -> LlmAgent:
"""Builds the LLM agent for the Processing the social post and event request."""
return agent.root_agent
if __name__ == '__main__':
try:
platformAgent = PlatformAgent()
request_handler = DefaultRequestHandler(
agent_executor=PlatformAgentExecutor(platformAgent.runner,platformAgent.agent_card),
task_store=InMemoryTaskStore(),
)
server = A2AStarletteApplication(
agent_card=platformAgent.agent_card,
http_handler=request_handler,
)
uvicorn.run(server.build(), host='0.0.0.0', port=port)
except Exception as e:
logger.error(f"An error occurred during server startup: {e}")
exit(1)
社交代理 (A2A 已启用)
最后,我们为社交分析代理启用 A2A。
👉📝 在 ~/instavibe-bootstrap/agents/social/a2a_server.py
的末尾定义 A2A 服务器设置和 AgentCard:
class SocialAgent:
"""An agent that handles social profile analysis."""
SUPPORTED_CONTENT_TYPES = ["text", "text/plain"]
def __init__(self):
self._agent = self._build_agent()
self.runner = Runner(
app_name=self._agent.name,
agent=self._agent,
artifact_service=InMemoryArtifactService(),
session_service=InMemorySessionService(),
memory_service=InMemoryMemoryService(),
)
capabilities = AgentCapabilities(streaming=True)
skill = AgentSkill(
id="social_profile_analysis",
name="Analyze Instavibe social profile",
description="""
Using a provided list of names, this agent synthesizes Instavibe social profile information by analyzing posts, friends, and events.
It delivers a comprehensive single-paragraph summary for individuals, and for groups, identifies commonalities in their social activities
and connections based on profile data.
""",
tags=["instavibe"],
examples=["Can you tell me about Bob and Alice?"],
)
self.agent_card = AgentCard(
name="Social Profile Agent",
description="""
Using a provided list of names, this agent synthesizes Instavibe social profile information by analyzing posts, friends, and events.
It delivers a comprehensive single-paragraph summary for individuals, and for groups, identifies commonalities in their social activities
and connections based on profile data.
""",
url=f"{PUBLIC_URL}",
version="1.0.0",
defaultInputModes=self.SUPPORTED_CONTENT_TYPES,
defaultOutputModes=self.SUPPORTED_CONTENT_TYPES,
capabilities=capabilities,
skills=[skill],
)
def get_processing_message(self) -> str:
return "Processing the social profile analysis request..."
def _build_agent(self) -> LoopAgent:
"""Builds the LLM agent for the social profile analysis agent."""
return agent.root_agent
if __name__ == '__main__':
try:
socialAgent = SocialAgent()
request_handler = DefaultRequestHandler(
agent_executor=SocialAgentExecutor(socialAgent.runner,socialAgent.agent_card),
task_store=InMemoryTaskStore(),
)
server = A2AStarletteApplication(
agent_card=socialAgent.agent_card,
http_handler=request_handler,
)
uvicorn.run(server.build(), host='0.0.0.0', port=port)
except Exception as e:
logger.error(f"An error occurred during server startup: {e}")
exit(1)
构建和部署平台互动代理和社交代理
这些代理需要访问 Spanner,因此请确保在部署期间正确传递 SPANNER_INSTANCE_ID
、SPANNER_DATABASE_ID
和 MCP_SERVER_URL
环境变量。
👉💻 使用 Cloud Build 构建并部署到 Cloud Run:
. ~/instavibe-bootstrap/set_env.sh
cd ~/instavibe-bootstrap/agents
export MCP_SERVER_URL=$(gcloud run services list --platform=managed --region=us-central1 --format='value(URL)' | grep mcp-tool-server)/sse
gcloud builds submit . \
--config=cloudbuild.yaml \
--project="${PROJECT_ID}" \
--region="${REGION}" \
--substitutions=\
_PROJECT_ID="${PROJECT_ID}",\
_PROJECT_NUMBER="${PROJECT_NUMBER}",\
_REGION="${REGION}",\
_REPO_NAME="${REPO_NAME}",\
_SPANNER_INSTANCE_ID="${SPANNER_INSTANCE_ID}",\
_SPANNER_DATABASE_ID="${SPANNER_DATABASE_ID}",\
_MCP_SERVER_URL="${MCP_SERVER_URL}"
👉💻 在终端中,获取已部署的平台代理的网址:
export PLATFORM_MPC_CLIENT_URL=$(gcloud run services list --platform=managed --region=us-central1 --format='value(URL)' | grep platform-mcp-client)
echo $PLATFORM_MPC_CLIENT_URL
👉💻 复制输出网址。
👉 在 A2A 检查器界面中,将网址粘贴到“代理网址”字段中,然后点击“连接”。
👀 代理的卡片详细信息和 JSON 应显示在“代理卡片”标签页上,以确认连接成功。
👉 点击 A2A 检查器中的“聊天”标签页。您可以在此处直接与已部署的代理互动,向其发送消息以测试代理创建帖子的能力:
Create a post for me, the post says 'Paws, purrs, and ocean views 🐾☕🌊. Spent my morning at the Morning Seaside Cat Café, where every sip comes with a side of snuggles and sea breeze.' and make it positive, and I'm Oscar.
👀 如需检查原始通信内容,请依次点击聊天窗口中的消息气泡和代理的回答气泡。点击每个消息后,系统会显示已发送或接收的完整 JSON-RPC 2.0 消息,这对于调试来说非常宝贵。
👉💻 在终端中,获取已部署的社交代理的网址:
export SOCIAL_AGENT_URL=$(gcloud run services list --platform=managed --region=us-central1 --format='value(URL)' | grep social-agent)
echo $SOCIAL_AGENT_URL
👉💻 复制输出网址。
👉 在 A2A 检查器界面中,将网址粘贴到“代理网址”字段中,然后点击“连接”。
👀 代理的卡片详细信息和 JSON 应显示在“代理卡片”标签页上,以确认连接成功。
👉 点击 A2A 检查器中的“聊天”标签页。您可以在此处直接与已部署的代理互动,向其发送消息以分析数据库中的用户个人资料:
Can you tell me about both Ian and Kevin's profile, what are their common interests?
👀 如需检查原始通信内容,请依次点击聊天窗口中的消息气泡和代理的回答气泡。点击每个消息后,系统会显示已发送或接收的完整 JSON-RPC 2.0 消息,这对于调试来说非常宝贵。
👉 很好,我们已完成对所有代理的检查。您现在可以关闭 A2A 检查器标签页。
11. 编排器代理 (A2A 客户端)
现在,我们有三个专用代理(Planner、Platform、Social)在 Cloud Run 上作为独立的 A2A 启用服务运行。最后一部分是 Orchestrator 代理。此代理将充当中央协调器或 A2A 客户端。它将接收用户请求,确定需要哪些远程代理来完成请求(可能按顺序),然后使用 A2A 协议将任务委托给这些远程代理。在此研讨会中,我们将使用 ADK 开发界面在本地运行 Orchestrator 代理。
首先,我们来增强 Orchestrator 的逻辑,以处理其发现的远程代理的注册。在初始化期间存储从提取的代理卡中获取的连接详细信息。
👉📝 在 ~/instavibe-bootstrap/agents/orchestrate/agent.py
中,将 #REPLACE ME REG AGENT CARD
替换为:
async with httpx.AsyncClient(timeout=30) as client:
for i, address in enumerate(REMOTE_AGENT_ADDRESSES):
log.info(f"--- STEP 3.{i}: Attempting connection to: {address} ---")
try:
card_resolver = A2ACardResolver(client, address)
card = await card_resolver.get_agent_card()
remote_connection = RemoteAgentConnections(agent_card=card, agent_url=address)
self.remote_agent_connections[card.name] = remote_connection
self.cards[card.name] = card
log.info(f"--- STEP 5.{i}: Successfully stored connection for {card.name} ---")
except Exception as e:
log.error(f"--- CRITICAL FAILURE at STEP 4.{i} for address: {address} ---")
log.error(f"--- The hidden exception type is: {type(e).__name__} ---")
log.error(f"--- Full exception details and traceback: ---", exc_info=True)
接下来,在 ADK 中为 Orchestrator 代理本身定义工具。
send_message
(用于委托工作的 A2A 函数)。
👉📝 将 ~/instavibe-bootstrap/agents/orchestrate/agent.py
中的 #REPLACE ME CREATE AGENT
替换为:
def create_agent(self) -> Agent:
"""Synchronously creates the ADK Agent object."""
return Agent(
model="gemini-2.5-flash",
name="orchestrate_agent",
instruction=self.root_instruction,
before_agent_callback=self.before_agent_callback,
description=("Orchestrates tasks for child agents."),
tools=[self.send_message],
)
编排器的核心逻辑在于其指令,这些指令会告知编排器如何使用 A2A。
👉📝 将 ~/instavibe-bootstrap/agents/orchestrate/agent.py
中的 #REPLACE ME INSTRUCTIONS
替换为以下指令生成方法:
def root_instruction(self, context: ReadonlyContext) -> str:
current_agent = self.check_active_agent(context)
return f"""
You are an expert AI Orchestrator. Your primary responsibility is to intelligently interpret user requests, break them down into a logical plan of discrete actions, and delegate each action to the most appropriate specialized remote agent using the send_message function. You do not perform the tasks yourself but manage their assignment, sequence, and critically, their outcomes.
**Core Directives & Decision Making:**
* **Understand User Intent & Complexity:**
* Carefully analyze the user's request to determine the core task(s) they want to achieve. Pay close attention to keywords and the overall goal.
* Identify if the request requires a single agent or a sequence of actions from multiple agents. For example, "Analyze John Doe's profile and then create a positive post about his recent event attendance" would require two agents in sequence.
* **Task Planning & Sequencing (for Multi-Step Requests):**
* Before delegating, outline the clear sequence of agent tasks.
* Identify dependencies. If Task B requires output from Task A, execute them sequentially. If tasks are independent (like creating a post and then creating an event), execute them one after the other as separate delegations.
* Agent Reusability: An agent's completion of one task does not make it unavailable. If a user's plan involves multiple, distinct actions that fall under the same agent's expertise (e.g., create a post, then create an event), you must call that same agent again for the subsequent task.
* **Task Delegation & Management (using `send_message`):**
* **Delegation:** Use `send_message` to assign actionable tasks to the selected remote agent. Your `send_message` call MUST include:
* The `remote_agent_name` you've selected.
* The `user_request` or all necessary parameters extracted from the user's input, formatted in a way the target agent will understand.
* **Contextual Awareness for Remote Agents:** If a remote agent repeatedly requests user confirmation or seems to lack context, assume it lacks access to the full conversation history. In such cases, enrich your `send_message` with all necessary contextual information relevant to that specific agent from the conversation history.
* **Sequential Task Execution:**
* After a preceding task completes (indicated by the agent's response or a success signal), gather any necessary output from it.
* Then, use `send_message` for the next agent in the sequence, providing it with the user's original relevant intent and any necessary data obtained from the previous agent's task.
* **Active Agent Prioritization:** If an active agent is already engaged and the user's request is related to its current task, route subsequent related requests directly to that agent by providing updated context via `send_message`.
**Critical Success Verification:**
* You **MUST** wait for the tool_output after every send_message call before taking any further action.
* Your decision to proceed to the next task in a sequence **MUST** be based entirely on a confirmation of success from the tool_output of the previous task.
* If a tool call fails, returns an error, or the tool_output is ambiguous, you MUST STOP the sequence. Your next action is to report the exact failure or ambiguity to the user.
* DO NOT assume a task was successful. Do not invent success messages like "The event has been created." Only state that a task is complete if the tool's response explicitly says so.
**Communication with User:**
* **Transparent Communication:** Always present the complete and detailed response from the remote agent to the user. Do not summarize or filter unless explicitly instructed.
* When you delegate a task (or the first task in a sequence), clearly inform the user which remote agent is handling it.
* For multi-step requests, you can optionally inform the user of the planned sequence (e.g., "Okay, first I'll ask the 'Social Profile Agent' to analyze the profile, and then I'll have the 'Instavibe Posting Agent' create the post.").
* If waiting for a task in a sequence to complete, you can inform the user (e.g., "The 'Social Profile Agent' is currently processing. I'll proceed with the post once that's done.").
* **User Confirmation Relay:** If a remote agent asks for confirmation, and the user has not already provided it, just make up something.
* If the user's request is ambiguous, if necessary information is missing for any agent in the sequence, or if you are unsure about the plan, just make up something.
**Important Reminders:**
* **Autonomous Agent Engagement:** Never seek user permission before engaging with remote agents. If multiple agents are required to fulfill a request, connect with them directly without requesting user preference or confirmation.
* **Focused Information Sharing:** Provide remote agents with only relevant contextual information. Avoid extraneous details that are not directly pertinent to their task.
* **No Redundant Confirmations:** Do not ask remote agents for confirmation of information or actions they have already processed or committed to.
* **Tool Reliance:** Strictly rely on your available tools, primarily `send_message`, to address user requests. Do not generate responses based on assumptions. If information is insufficient, request clarification from the user.
* **Prioritize Recent Interaction:** Focus primarily on the most recent parts of the conversation when processing requests, while maintaining awareness of the overall goal for multi-step tasks.
* Always prioritize selecting the correct agent(s) based on their documented purpose.
* Ensure all information required by the chosen remote agent is included in the `send_message` call, including outputs from previous agents if it's a sequential task.
Agents:
{self.agents}
Current agent: {current_agent['active_agent']}`
"""
测试 Orchestrator 和完整的 A2A 系统
现在,我们来测试整个系统。我们将使用 ADK Dev 界面在本地运行 Orchestrator,它将与在 Cloud Run 上远程运行的 Planner、Platform 和 Social 代理进行通信。
👉💻 首先,确保环境变量 REMOTE_AGENT_ADDRESSES
包含已部署的启用 A2A 的代理的网址(以英文逗号分隔)。然后,为 Orchestrator 代理设置必要的环境变量,并启动 ADK 开发界面:
. ~/instavibe-bootstrap/set_env.sh
source ~/instavibe-bootstrap/env/bin/activate
export PLATFORM_MPC_CLIENT_URL=$(gcloud run services list --platform=managed --region=us-central1 --format='value(URL)' | grep platform-mcp-client)
export PLANNER_AGENT_URL=$(gcloud run services list --platform=managed --region=us-central1 --format='value(URL)' | grep planner-agent)
export SOCIAL_AGENT_URL=$(gcloud run services list --platform=managed --region=us-central1 --format='value(URL)' | grep social-agent)
export REMOTE_AGENT_ADDRESSES=${PLANNER_AGENT_URL},${PLATFORM_MPC_CLIENT_URL},${SOCIAL_AGENT_URL}
cd ~/instavibe-bootstrap/agents
sed -i "s|^\(O\?REMOTE_AGENT_ADDRESSES\)=.*|REMOTE_AGENT_ADDRESSES=${REMOTE_AGENT_ADDRESSES}|" ~/instavibe-bootstrap/agents/orchestrate/.env
adk web
👉 打开 ADK 开发者界面(通过“网页预览”将端口改回 8000)。
👉 在代理下拉菜单中,选择 orchestrate 代理。
👉 现在,给它布置一项需要协调多个远程代理的复杂任务。请尝试第一个示例,该示例应涉及社交代理,然后涉及规划器代理:
You are an expert event planner for a user named Diana.
Your task is to design a fun and personalized event.
Here are the details for the plan:
- Friends to invite: Ian, Nora
- Desired date: "2025-10-15"
- Location idea or general preference: "Chicago"
Your process should be:
1. Analyze the provided friend names. If you have access to a tool to get their InstaVibe profiles or summarized interests, please use it.
2. Based on their potential interests (or general good taste if profiles are unavailable), create a tailored plan for the outing, check if you have access to any event planner tools.
3. Ensure the plan includes the original `planned_date`.
The user wants a comprehensive plan that includes:
- The list of invited friends.
- A catchy and descriptive name for the event.
- The exact planned date for the event.
- A summary of what the group will do.
- Specific recommended spots (e.g., restaurants, bars, activity venues) with their names, (if possible, approximate latitude/longitude for mapping, and address), and a brief description of why it fits the plan.
- A short, exciting message that {Diana} can send to {Ian, Nora} to get them excited about the event.
在 ADK 开发者界面聊天窗口中观察互动。密切关注 Orchestrator 的回答,它应该说明将任务委托给了哪个远程代理(例如,“好的,我会先向社交媒体个人资料代理询问 Ian 和 Nora 的信息…”)。
此外,请检查界面中的“事件”标签页,查看向远程代理的网址发出的底层工具调用 (send_message)。
👉 现在,尝试第二个示例,该示例应直接涉及平台集成代理:
Hey, can you register an event on Instavibe for Laura and Charlie? Let's call it 'Vienna Concert & Castles Day'.
here are more info
"event_name": "Vienna Concert & Castles Day",
"description": "A refined and unforgettable day in Vienna with Laura and Charlie. The day begins with a guided tour of the magnificent Schönbrunn Palace, showcasing imperial architecture and history. In the evening, enjoy a classical music concert in one of Vienna's most iconic concert halls.",
"event_date": "2025-10-14T10:00:00+02:00",
"locations": [
{
"name": "Schönbrunn Palace",
"description": "A UNESCO World Heritage Site and former imperial summer residence, Schönbrunn Palace offers opulent rooms, beautiful baroque gardens, and a glimpse into the life of the Habsburg monarchy. Visitors can stroll the grounds or take a guided historical tour.",
"latitude": 48.184516,
"longitude": 16.312222,
"address": "Schönbrunner Schloßstraße 47, 1130 Wien, Austria"
},
{
"name": "Musikverein Vienna",
"description": "Home to the world-renowned Vienna Philharmonic, the Musikverein is one of the finest concert halls in the world. Its 'Golden Hall' is famous for its acoustics and ornate design. Attendees can enjoy a powerful classical concert in an unforgettable setting.",
"latitude": 48.200132,
"longitude": 16.373777,
"address": "Musikvereinsplatz 1, 1010 Wien, Austria"
}
],
"attendee_names": ["Laura", "Charlie", "Oscar"] And I am Oscar
再次监控聊天内容和“活动”标签页。编排器应确定是否需要创建活动,并将任务(包含所有提供的详细信息)委托给“平台集成代理”。您还可以点击 Trace 按钮来查看跟踪记录,以分析查询响应时间和执行的操作。
然后,您可以验证该事件是否显示在 InstaVibe Web 应用中。
这演示了如何使用 ADK 和 A2A 协议成功实现多代理系统,其中中央编排器将任务委托给专门的远程代理。
完成测试后,请记得停止 ADK Dev 界面(终端中的 Ctrl+C
)。
12. Agent Engine 和 InstaVibe 的远程通话
到目前为止,我们已在 Cloud Run 上运行了专业代理,并使用 ADK Dev 界面在本地测试了 Orchestrator。在生产场景中,我们需要一个稳健、可伸缩的托管式环境来托管我们的代理。这正是 Google Vertex AI Agent Engine 发挥作用之处。
Agent Engine 是 Vertex AI 上的一项全托管式服务,专为部署和扩缩 AI 代理而设计。它可抽象化基础设施管理、安全性和运营开销,让开发者(尤其是那些不太熟悉复杂云环境的开发者)能够专注于代理的逻辑和功能,而不是管理服务器。它提供了一个专为代理工作负载优化的专用运行时。
现在,我们将 Orchestrator 代理部署到 Agent Engine。(注意:以下所示的部署机制使用研讨会材料中提供的自定义脚本 (agent_engine_app.py),因为官方的直接 ADK 到 Agent-Engine 部署工具可能仍在不断发展。此脚本用于处理 Orchestrator 代理的封装和部署,并配置必要的远程代理地址。)
执行以下命令,将 Orchestrator 代理部署到 Agent Engine。确保 REMOTE_AGENT_ADDRESSES 环境变量(包含 Cloud Run 上 Planner、Platform 和 Social 代理的网址)仍按上一部分中的设置正确设置。
👉💻 我们将 Orchestrate 代理部署到 Agent Engine(注意:这是我自己的部署实现,ADK 有一个 CLI 来帮助部署,我将在实现自带服务账号后更新此内容。)
cd ~/instavibe-bootstrap/agents/
. ~/instavibe-bootstrap/set_env.sh
gcloud projects add-iam-policy-binding $PROJECT_ID \
--member="serviceAccount:service-$PROJECT_NUMBER@gcp-sa-aiplatform-re.iam.gserviceaccount.com" \
--role="roles/viewer"
source ~/instavibe-bootstrap/env/bin/activate
export PLATFORM_MPC_CLIENT_URL=$(gcloud run services list --platform=managed --region=us-central1 --format='value(URL)' | grep platform-mcp-client)
export PLANNER_AGENT_URL=$(gcloud run services list --platform=managed --region=us-central1 --format='value(URL)' | grep planner-agent)
export SOCIAL_AGENT_URL=$(gcloud run services list --platform=managed --region=us-central1 --format='value(URL)' | grep social-agent)
export REMOTE_AGENT_ADDRESSES=${PLANNER_AGENT_URL},${PLATFORM_MPC_CLIENT_URL},${SOCIAL_AGENT_URL}
sed -i "s|^\(O\?REMOTE_AGENT_ADDRESSES\)=.*|REMOTE_AGENT_ADDRESSES=${REMOTE_AGENT_ADDRESSES}|" ~/instavibe-bootstrap/agents/orchestrate/.env
adk deploy agent_engine \
--display_name "orchestrate-agent" \
--project $GOOGLE_CLOUD_PROJECT \
--region $GOOGLE_CLOUD_LOCATION \
--staging_bucket gs://$GOOGLE_CLOUD_PROJECT-agent-engine \
--trace_to_cloud \
--requirements_file orchestrate/requirements.txt \
orchestrate
现在,Orchestrator 托管在受管理的 Agent Engine 平台上,我们的 InstaVibe Web 应用需要与之通信。Web 应用将远程调用 Agent Engine 端点,而不是通过 ADK Dev 界面进行互动。
首先,我们需要修改 InstaVibe 应用代码,以使用已部署的 Orchestrator 代理的唯一 ID 初始化 Agent Engine 客户端。此 ID 是在平台上定位正确代理实例所必需的。
👉📝 打开 ~/instavibe-bootstrap/instavibe/introvertally.py
,并将 #REPLACE ME initiate agent_engine
替换为以下代码。此代码从环境变量(我们稍后会设置)中检索 Agent Engine ID,并获取客户端对象:
ORCHESTRATE_AGENT_ID = os.environ.get('ORCHESTRATE_AGENT_ID')
agent_engine = agent_engines.get(ORCHESTRATE_AGENT_ID)
我们在 InstaVibe 中规划的用户流程涉及与代理的两次互动:首先,生成推荐的计划;其次,在代理实际将活动发布到平台之前,要求用户确认。
由于 InstaVibe Web 应用(在 Cloud Run 上运行)和 Orchestrator 代理(在 Agent Engine 上运行)现在是单独的服务,因此 Web 应用需要向 Agent Engine 端点发出远程调用,才能与代理互动。
👉📝 让我们更新用于进行初始调用的代码,以生成方案建议。在同一 introvertally.py
文件中,将 #REPLACE ME Query remote agent get plan
替换为以下代码段,该代码段使用 agent_engine 客户端发送用户请求:
agent_engine.stream_query(
user_id=user_id,
message=prompt_message,
)
👉📝 接下来,更新处理用户确认的代码(例如,当用户点击“确认方案”时)。这会向 Agent Engine 上的同一对话发送后续消息,指示 Orchestrator 继续发布事件(Orchestrator 会将此任务委托给平台集成代理)。将 introvertally.py
中用于确认的 #REPLACE ME Query remote agent for confirmation
替换为:
agent_engine.stream_query(
user_id=agent_session_user_id,
message=prompt_message,
)
Web 应用的路由需要访问这些函数。确保在 Flask 路由文件中导入了 introvertally.py 中的必要函数。
👉📝 在 cd ~/instavibe-bootstrap/instavibe/ally_routes.py
中,我们将首先指向实例,然后将 # REPLACE ME TO ADD IMPORT
替换为以下内容:
from introvertally import call_agent_for_plan, post_plan_event
👉📝 在 ~/instavibe-bootstrap/instavibe/templates/base.html
中向 InstaVibe 添加原型功能,将 <!–REPLACE_ME_LINK_TO_INTROVERT_ALLY–> 替换为以下内容:
<li class="nav-item">
<a class="nav-link" href="{{ url_for('ally.introvert_ally_page') }}">Introvert Ally</a>
</li>
在重新部署 InstaVibe 应用之前,我们需要您提供部署到 Agent Engine 的 Orchestrator 代理的特定 Resource ID
。
目前,通过 gcloud
以编程方式检索此 ID 可能受到限制,因此我们将使用辅助 Python 脚本(研讨会中提供的 temp-endpoint.py
)来获取该 ID 并将其存储在环境变量中。
👉💻 运行以下命令以执行脚本。该脚本将捕获 Agent Engine 端点 ID,并向 Agent Engine 的默认服务账号授予必要的权限(注意:该脚本配置为使用默认服务账号,目前无法由用户修改)。
. ~/instavibe-bootstrap/set_env.sh
cd ~/instavibe-bootstrap/instavibe/
source ~/instavibe-bootstrap/env/bin/activate
python temp-endpoint.py
export ORCHESTRATE_AGENT_ID=$(cat temp_endpoint.txt)
echo "ORCHESTRATE_AGENT_ID set to: ${ORCHESTRATE_AGENT_ID}"
最后,我们需要重新部署 InstaVibe Web 应用,其中包含更新后的代码和新的 ORCHESTRATE_AGENT_ID
环境变量,以便该应用知道如何连接到在 Agent Engine 上运行的代理。
👉💻 以下命令会重新构建 InstaVibe 应用映像,并将新版本部署到 Cloud Run:
. ~/instavibe-bootstrap/set_env.sh
cd ~/instavibe-bootstrap/instavibe/
export IMAGE_TAG="latest"
export APP_FOLDER_NAME="instavibe"
export IMAGE_NAME="instavibe-webapp"
export IMAGE_PATH="${REGION}-docker.pkg.dev/${PROJECT_ID}/${REPO_NAME}/${IMAGE_NAME}:${IMAGE_TAG}"
export SERVICE_NAME="instavibe"
echo "Building ${APP_FOLDER_NAME} webapp image..."
gcloud builds submit . \
--tag=${IMAGE_PATH} \
--project=${PROJECT_ID}
echo "Deploying ${SERVICE_NAME} to Cloud Run..."
gcloud run deploy ${SERVICE_NAME} \
--image=${IMAGE_PATH} \
--platform=managed \
--region=${REGION} \
--allow-unauthenticated \
--set-env-vars="SPANNER_INSTANCE_ID=${SPANNER_INSTANCE_ID}" \
--set-env-vars="SPANNER_DATABASE_ID=${SPANNER_DATABASE_ID}" \
--set-env-vars="APP_HOST=0.0.0.0" \
--set-env-vars="APP_PORT=8080" \
--set-env-vars="GOOGLE_CLOUD_LOCATION=${REGION}" \
--set-env-vars="GOOGLE_CLOUD_PROJECT=${PROJECT_ID}" \
--set-env-vars="GOOGLE_MAPS_API_KEY=${GOOGLE_MAPS_API_KEY}" \
--set-env-vars="ORCHESTRATE_AGENT_ID=${ORCHESTRATE_AGENT_ID}" \
--project=${PROJECT_ID} \
--min-instances=1 \
--cpu=2 \
--memory=2Gi
完成最终部署后,在另一个浏览器标签页中前往 InstaVibe 应用网址。
测试 AI 赋能的完整 InstaVibe 体验
“InstaVibe Ally”功能现已正式推出,该功能由我们的多智能体系统提供支持,通过 Vertex AI Agent Engine 进行编排,并通过 A2A 进行通信。
点击进入“InstaVibe Ally”,然后让它规划活动。
在代理工作时,观察右侧的活动日志(可能需要 90-120 秒)。方案显示后,请检查该方案,然后点击“确认此方案”以继续发布。
现在,编排器将指示平台代理在 InstaVibe 中创建帖子和活动。
查看 InstaVibe 首页上的新帖子和活动。
活动页面将反映代理生成的详细信息。
使用 Cloud Trace 分析性能
您可能会注意到,此过程需要一些时间。Vertex AI Agent Engine 与 Cloud Trace 集成,使我们能够分析多智能体系统的延迟时间。
前往 Google Cloud 控制台中的轨迹,选择 Span 中的 agent_run[orchestrate_agent]
,您应该会看到几个 Span,点击进入
在轨迹详情中,您可以确定哪些部分耗时较长。例如,由于搜索基础和复杂的生成,对 Planner 代理的调用可能会显示更高的延迟时间。
同样,在创建帖子和活动时,您可能会看到 Orchestrator 处理数据和为平台代理准备工具调用所花费的时间。
探索这些轨迹有助于了解和优化代理系统的性能。
恭喜!您已成功使用 Google 的 ADK、A2A、MCP 和 Google Cloud 服务构建、部署和测试了复杂的多智能体 AI 系统。您已解决智能体编排、工具使用、状态管理和云部署方面的问题,为 InstaVibe 创建了一项实用的 AI 赋能功能。恭喜您完成本讲座!
13. 清理
为避免系统向您的 Google Cloud 账号持续收取费用,请务必删除我们在本次研讨会期间创建的资源。以下命令将帮助您移除 Spanner 实例、Cloud Run 服务、Artifact Registry 代码库、API 密钥、Vertex AI Agent Engine 和关联的 IAM 权限。
重要提示:
- 请确保您在研讨会所用的同一 Google Cloud 项目中运行这些命令。
- 如果您已关闭 Cloud Shell 终端,则可能未设置某些环境变量,例如 $PROJECT_ID、$SPANNER_INSTANCE_ID 等。您需要重新导出这些变量(如在研讨会设置期间所做的那样),或将以下命令中的变量替换为实际值。
- 这些命令会永久删除您的资源。如果您在此项目中还有其他重要数据,请在运行之前仔细检查。
👉💻 运行以下脚本以进行清理。
重置环境变量
. ~/instavibe-bootstrap/set_env.sh
删除 Agent Engine:
cd ~/instavibe-bootstrap/utils
source ~/instavibe-bootstrap/env/bin/activate
export ORCHESTRATE_AGENT_ID=$(cat ~/instavibe-bootstrap/instavibe/temp_endpoint.txt)
echo "ORCHESTRATE_AGENT_ID set to: ${ORCHESTRATE_AGENT_ID}"
python remote_delete.py
deactivate
echo "Vertex AI Agent Engine deletion initiated."
删除 Cloud Run 服务:
# InstaVibe Web Application
gcloud run services delete instavibe --platform=managed --region=${REGION} --project=${PROJECT_ID} --quiet
# MCP Tool Server
gcloud run services delete mcp-tool-server --platform=managed --region=${REGION} --project=${PROJECT_ID} --quiet
# Planner Agent (A2A Server)
gcloud run services delete planner-agent --platform=managed --region=${REGION} --project=${PROJECT_ID} --quiet
# Platform MCP Client Agent (A2A Server)
gcloud run services delete platform-mcp-client --platform=managed --region=${REGION} --project=${PROJECT_ID} --quiet
# Social Agent (A2A Server)
gcloud run services delete social-agent --platform=managed --region=${REGION} --project=${PROJECT_ID} --quiet
echo "Cloud Run services deletion initiated."
停止并移除 A2A 检查器 Docker 容器
docker rm --force a2a-inspector
删除 Spanner 实例:
echo "Deleting Spanner instance: ${SPANNER_INSTANCE_ID}..."
gcloud spanner instances delete ${SPANNER_INSTANCE_ID} --project=${PROJECT_ID} --quiet
echo "Spanner instance deletion initiated."
删除 Artifact Registry 代码库:
echo "Deleting Artifact Registry repository: ${REPO_NAME}..."
gcloud artifacts repositories delete ${REPO_NAME} --location=${REGION} --project=${PROJECT_ID} --quiet
echo "Artifact Registry repository deletion initiated."
从服务账号中移除角色:
echo "Removing roles from service account: $SERVICE_ACCOUNT_NAME in project $PROJECT_ID"
# Remove Project-level roles for default service account
gcloud projects remove-iam-policy-binding $PROJECT_ID \
--member="serviceAccount:$SERVICE_ACCOUNT_NAME" \
--role="roles/spanner.admin"
gcloud projects remove-iam-policy-binding $PROJECT_ID \
--member="serviceAccount:$SERVICE_ACCOUNT_NAME" \
--role="roles/spanner.databaseUser"
gcloud projects remove-iam-policy-binding $PROJECT_ID \
--member="serviceAccount:$SERVICE_ACCOUNT_NAME" \
--role="roles/artifactregistry.admin"
gcloud projects remove-iam-policy-binding $PROJECT_ID \
--member="serviceAccount:$SERVICE_ACCOUNT_NAME" \
--role="roles/cloudbuild.builds.editor"
gcloud projects remove-iam-policy-binding $PROJECT_ID \
--member="serviceAccount:$SERVICE_ACCOUNT_NAME" \
--role="roles/run.admin"
gcloud projects remove-iam-policy-binding $PROJECT_ID \
--member="serviceAccount:$SERVICE_ACCOUNT_NAME" \
--role="roles/iam.serviceAccountUser"
gcloud projects remove-iam-policy-binding $PROJECT_ID \
--member="serviceAccount:$SERVICE_ACCOUNT_NAME" \
--role="roles/aiplatform.user"
gcloud projects remove-iam-policy-binding $PROJECT_ID \
--member="serviceAccount:$SERVICE_ACCOUNT_NAME" \
--role="roles/logging.logWriter"
gcloud projects remove-iam-policy-binding $PROJECT_ID \
--member="serviceAccount:$SERVICE_ACCOUNT_NAME" \
--role="roles/logging.viewer"
echo "All specified roles have been removed."
删除本地创意工坊文件:
echo "Removing local workshop directory ~/instavibe-bootstrap..."
rm -rf ~/instavibe-bootstrap
rm -rf ~/a2a-inspector
rm -f ~/mapkey.txt
rm -f ~/project_id.txt
echo "Local directory removed."