Next ‘26 デベロッパー基調講演: メモリによるエージェントの強化

1. はじめに

この Codelab では、永続的で専門的な知識を追加して、ADK エージェントを次のレベルに引き上げます。Agent Platform Sessions で会話の状態を管理する方法、Memory Bank で長期的な学習を有効にする方法、Spark と AlloyDB を使用して複雑な都市ルールデータを RAG(検索拡張生成)に統合する方法について説明します。

演習内容

  • 会話の永続化のために Agent Platform Sessions を構成します。
  • エージェントが以前のやり取りから学習できるように メモリバンク を実装します。
  • Spark Lightning Engine を使用して、都市ルールに関するドキュメントを取り込んで処理します。
  • AlloyDB とベクトル検索を使用して RAG システムを構築します。
  • 強化されたエージェントを Agent Platform にデプロイします。

必要なもの

推定所要時間: 60 分

この Codelab で作成するリソースの費用は 5 ドル未満です。

2. 始める前に

Google Cloud プロジェクトの作成

  1. Google Cloud コンソールのプロジェクト選択ページで、Google Cloud プロジェクトを選択または作成します。
  2. Cloud プロジェクトに対して課金が有効になっていることを確認します。プロジェクトで課金が有効になっているかどうかを確認する方法をご覧ください。

Cloud Shell の起動

Cloud Shell は Google Cloud 上で動作するコマンドライン環境で、必要なツールがプリロードされています。

  1. Google Cloud コンソールの上部にある「Cloud Shell をアクティブにする 」アイコン をクリックします。
  2. Cloud Shell に接続したら、認証を確認します。
    gcloud auth list
    
  3. プロジェクトが構成されていることを確認します。
    gcloud config get project
    
  4. プロジェクトが想定どおりに設定されていない場合は、設定します。
    export PROJECT_ID=<YOUR_PROJECT_ID>
    gcloud config set project $PROJECT_ID
    

認証を確認します。

gcloud auth list

プロジェクトを確認します。

gcloud config get project

必要に応じて設定します。

export PROJECT_ID=<YOUR_PROJECT_ID>
gcloud config set project $PROJECT_ID

API を有効にする

次のコマンドを実行して、セッション管理、Spark 処理、AlloyDB に必要なすべての API を有効にします。

gcloud services enable \
  aiplatform.googleapis.com \
  run.googleapis.com \
  alloydb.googleapis.com \
  dataproc.googleapis.com \
  documentai.googleapis.com \
  storage.googleapis.com \
  secretmanager.googleapis.com

3. 環境を設定する

この Codelab では、基調講演リポジトリの事前構成済み環境を使用します。

  1. リポジトリのクローンを作成し、プロジェクト フォルダに移動します。
git clone https://github.com/GoogleCloudPlatform/next-26-keynotes
cd next-26-keynotes/devkey/enhancing-agents-with-memory
  1. Python 仮想環境を設定し、必要な ADK パッケージをインストールします。
uv venv
source .venv/bin/activate
uv sync

環境変数を構成する

エージェントが Agent Platform と AlloyDB に接続するには、特定の構成が必要です。

  1. サンプル環境ファイルをコピーします。
cp .env.example .env
  1. .env を開き、次のフィールドを更新します。
    • GOOGLE_CLOUD_PROJECT: 実際のプロジェクト ID。
    • GOOGLE_CLOUD_LOCATION: us-central1
    • ALLOYDB_CLUSTER_ID: rules-db
GOOGLE_CLOUD_PROJECT=<YOUR_PROJECT_ID>
GOOGLE_CLOUD_LOCATION=global
GOOGLE_GENAI_USE_VERTEXAI=TRUE
GOOGLE_CLOUD_REGION=us-central1
ALLOYDB_CLUSTER_ID=rules-db
  1. 次のヘルパースクリプトを実行して、会話セッションと長期記憶に使用する Agent Engine インスタンス を作成します。これにより、.env ファイルの AGENT_ENGINE_ID が自動的に入力されます。
uv run utils/setup_agent_engine.py

成功すると、次のように表示されます。

Creating Agent Engine instance...
Successfully created Agent Engine. ID: 1234567890
Updated .env with AGENT_ENGINE_ID=1234567890

4. セッション管理を使用してエージェントを作成する

このステップでは、複数回にわたって会話履歴を維持できる Marathon Planner Agent を初期化します。これは、ADK App クラスと Agent Platform Sessions を使用して実現します。

エージェントとセッション サービスを初期化する

planner_agent/agent.py を開きます。ADK クラスを追加して Agent Platform Sessions を統合する方法を確認します。これにより、エージェントを時間とともにステートフルにし、必要に応じてコンテキストを変更できます。

from google.adk.agents import LlmAgent
from google.adk.sessions import VertexAiSessionService
from vertexai.agent_engines import AdkApp

PROJECT_ID = os.environ.get("GOOGLE_CLOUD_PROJECT")
REGION = os.environ.get("GOOGLE_CLOUD_REGION", "us-central1")

# Initialize Vertex AI for regional services
if PROJECT_ID:
    vertexai.init(project=PROJECT_ID, location=REGION)

# Define the agent logic
root_agent = LlmAgent(
    name="planner_agent",
    model="gemini-3-flash-preview",
    instruction="You are a helpful marathon planning assistant...",
    tools=[] # We will add tools in the next steps
)

def session_service_builder():
    """Builder for Agent Platform Sessions."""
    return VertexAiSessionService(project=PROJECT_ID, location=REGION)

# Wrap the agent in an AdkApp to manage stateful context
app = AdkApp(
    agent=root_agent,
    session_service_builder=session_service_builder
)

5. Memory Bank で長期的な学習を有効にする

セッション管理では個々の会話を追跡しますが、長期記憶でも同じことができます。このステップでは、エージェントを Agent Platform の Memory Bank にアタッチします。これは、エンタープライズ対応のフルマネージド メモリサービスです。

メモリバンク サービスを初期化する

メモリバンクを使用すると、エージェントはさまざまなセッションでコンテキストを呼び出すことができます。メモリサービスを含めるように planner_agent/agent.py を更新します。

from google.adk.memory import VertexAiMemoryBankService

def memory_service_builder():
    """Builder for Agent Platform Memory Bank."""
    return VertexAiMemoryBankService(
        project=PROJECT_ID,
        location=REGION,
        agent_engine_id=AGENT_ENGINE_ID
    )

自動メモリ取り込みを実装する

エージェントがすべてのターンから学習できるように、after_agent_callback を追加します。この関数は、エージェントがレスポンスを完了した後にトリガーされ、セッションを「消化」して関連する記憶をバンクに保存できます。

  1. コールバック関数を定義します。
async def auto_save_memories(callback_context):
    """Callback to ingest the session into the memory bank after the turn."""
    # In AdkApp, the memory service is available via the invocation context
    if hasattr(callback_context._invocation_context, 'memory_service') and callback_context._invocation_context.memory_service:
        await callback_context._invocation_context.memory_service.add_session_to_memory(
            callback_context._invocation_context.session
        )
  1. コールバックを LlmAgent に接続します。
root_agent = LlmAgent(
    # ... other params
    after_agent_callback=[auto_save_memories],
)

6. RAG 用に AlloyDB を設定する

都市ルールデータを取り込むには、データを保存するための高性能データベースが必要です。このステップでは、AlloyDB クラスタを作成し、ベクトル検索用のデータベース スキーマを初期化します。

1. AlloyDB クラスタとプライマリ インスタンスを作成する

Cloud Shell で次のコマンドを実行して、クラスタとそのプライマリ インスタンスを作成します。

# Create the cluster
gcloud alloydb clusters create rules-db \
  --password=postgres \
  --region=us-central1

# Create the primary instance with IAM authentication enabled
gcloud alloydb instances create rules-db-primary \
  --instance-type=PRIMARY \
  --cpu-count=2 \
  --region=us-central1 \
  --cluster=rules-db \
  --database-flags=alloydb.iam_authentication=on

2. 必要な IAM ロールを付与する

マネージド AlloyDB MCP サーバーを使用するには、ID に特定の権限が必要です。次のコマンドを実行して、必要なロールを付与します。

export USER_EMAIL=$(gcloud config get-value account)

# Role to use MCP tools
gcloud projects add-iam-policy-binding $PROJECT_ID \
  --member="user:$USER_EMAIL" \
  --role="roles/mcp.toolUser"

# Role to execute SQL in AlloyDB
gcloud projects add-iam-policy-binding $PROJECT_ID \
  --member="user:$USER_EMAIL" \
  --role="roles/alloydb.admin"

# Role for IAM database authentication
gcloud projects add-iam-policy-binding $PROJECT_ID \
  --member="user:$USER_EMAIL" \
  --role="roles/alloydb.databaseUser"

# Create the IAM-based database user
gcloud alloydb users create "$USER_EMAIL" \
  --cluster=rules-db \
  --region=us-central1 \
  --type=IAM_BASED

3. AlloyDB Studio でデータベースとテーブルを作成する

AlloyDB データベースとテーブルは SQL で管理されるため、Google Cloud コンソールの AlloyDB Studio を使用してスキーマを確定します。

  1. [AlloyDB] > [クラスタ] に移動して、rules-db をクリックします。
  2. 左側のナビゲーション メニューで [AlloyDB Studio] をクリックします。
  3. postgres ユーザーと設定したパスワード(postgres)を使用してログインします。
  4. 次の SQL を実行してデータベースを作成します。
    CREATE DATABASE city_rules;
    
  5. AlloyDB Studio でデータベース接続を city_rules に切り替え、次の SQL を実行して拡張機能をインストールし、rules テーブルを作成します。
    -- Install extensions for vector search and ML
    CREATE EXTENSION IF NOT EXISTS vector;
    CREATE EXTENSION IF NOT EXISTS google_ml_integration CASCADE;
    
    -- Create the rules table
    CREATE TABLE IF NOT EXISTS rules (
        id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
        text TEXT NOT NULL,
        city TEXT NOT NULL,
        embedding vector(3072) DEFAULT NULL
    );
    
    -- Grant your IAM user access to the table (replace with your email)
    GRANT ALL PRIVILEGES ON TABLE rules TO "YOUR_EMAIL_ADDRESS";
    

7. Spark Lightning Engine を使用して都市ルールデータを取り込む

真に正確な計画を提供するには、エージェントは適切に作成されたプロンプトだけでなく、データと組織のコンテキストに基づく 必要があります。このステップでは、Dataproc Serverless で Spark Lightning Engine を使用して、大規模な都市ルール PDF を処理し、AlloyDB に取り込みます。

Spark Lightning Engine を選ぶ理由

エージェントを大規模にグラウンディングするには、大量の非構造化データを処理する必要があります。Spark Lightning Engine は、Spark の高性能実行エンジンであり、これらのワークロードを大幅に高速化します。ここでは、Google の Document AI を使用してドキュメントのセマンティック チャンク分割 を行います。

Spark パイプラインを確認する

取り込みロジックは spark-setup/spark_alloydb_processor.py で定義されています。パイプラインは次の手順で実行されます。

  1. PDF の一覧表示: Google Cloud Storage バケットからドキュメント URI を取得します。
  2. セマンティック抽出: UDF(ユーザー定義関数)を使用して Document AI API を呼び出します。
  3. AlloyDB に書き込む: 抽出されたテキスト チャンクを rules という名前の AlloyDB テーブルに保存します。
# Extract from spark_alloydb_processor.py
def process_document(gcs_uri: str):
    # ... calls Document AI to parse PDF ...
    return chunks

# Parallel processing with Spark Lightning Engine
process_udf = udf(process_document, chunk_schema)
chunked_df = uri_df.withColumn("chunks", process_udf(col("gcs_uri"))) \
                   .select(explode(col("chunks")).alias("chunk")) \
                   .select("chunk.*")

# Save to AlloyDB for Vector Search
chunked_df.write.format("jdbc") \
    .option("url", jdbc_url) \
    .option("dbtable", "rules") \
    .mode("append") \
    .save()

取り込みジョブを実行する

提供されたスクリプトを使用して、取り込みプロセスをトリガーします。

./spark-setup/run_dataproc.sh

8. AlloyDB を使用した RAG

都市ルールデータが AlloyDB に保存されたため、エージェントはこれを使用して検索拡張生成(RAG) を実行できます。これにより、マラソン プランが特定の都市コードに準拠していることが保証されます。

RAG における AlloyDB のパワー

AlloyDB はベクトル検索に優れており、構造化データとベクトル エンベディングの両方を同じ場所に保存できます。エージェントは、AlloyDB の組み込み embedding 関数を使用して、最も関連性の高いルール情報を見つけることができます。

エージェントがこのデータにアクセスできるように、ベクトル類似度を使用して AlloyDB にクエリを実行するツールを提供します。このロジックは hybrid_recall.sql で確認できます。このロジックでは、クエリと保存されたルール間の距離を計算する方法を示しています。

SELECT
    text,
    (embedding <=> 
     embedding('gemini-embedding-001', 
               'Restrictions for running a race on the Las Vegas strip')::vector) 
    as distance
FROM
    rules
WHERE city = 'Las Vegas'
ORDER BY
    distance ASC
LIMIT 5;

RAG ツールを使用してエージェントをローカルルールにグラウンディングする

エージェントがツールを使用できるようにするには、planner_agent/tools.py で定義してから、planner_agent/agent.py で登録する必要があります。Google Cloud のマネージド リモート AlloyDB MCP サーバー を使用してデータベースに接続します。

  1. planner_agent/tools.py で「ハイブリッド リコール」パターンを使用してツールを定義します。streamable_http プロトコルを使用して、マネージド AlloyDB MCP サーバーに接続します。
from mcp import ClientSession
from mcp.client.streamable_http import streamablehttp_client

async def get_local_and_traffic_rules(query: str) -> str:
    """Uses vector search in AlloyDB via managed MCP server."""
    # Vector search query using built-in AlloyDB embedding functions
    sql = f"SELECT text FROM rules WHERE city = 'Las Vegas' ORDER BY embedding <=> google_ml.embedding('gemini-embedding-001', '{query}')::vector ASC LIMIT 5;"
    
    # Establish a streamable HTTP connection to the MCP server
    async with streamablehttp_client(url, headers=get_auth_headers()) as (read_stream, write_stream, _):
        async with ClientSession(read_stream, write_stream) as session:
            await session.initialize()
            result = await session.call_tool(
                "execute_sql",
                arguments={
                    "instance": full_instance_name,
                    "database": "city_rules",
                    "sqlStatement": sql
                }
            )
            return "\n".join([c.text for c in result.content if hasattr(c, 'text')])
  1. ツールを登録して、planner_agent/agent.py を確定します。
# ... imports ...

# Assemble the Agent
root_agent = LlmAgent(
    name="planner_agent",
    model="gemini-3-flash-preview",
    instruction="You are a helpful marathon planning assistant...",
    tools=[
        get_local_and_traffic_rules,
    ],
    after_agent_callback=[auto_save_memories],
)

# 2. Wrap the agent in an AdkApp to manage the stateful lifecycle
app = AdkApp(
    agent=root_agent,
    session_service_builder=session_service_builder,
    memory_service_builder=memory_service_builder
)

9. エージェント スキルによる専門家によるガイダンス

エージェント スキル は、エージェントがタスクをより効果的に実行できるように、具体的な手順、ガイダンス、リソースを提供する自己完結型のモジュールです。すべてのツールに対して複雑な手順でシステム プロンプトを煩雑にするのではなく、必要に応じてロードされるスキルにその専門知識をカプセル化できます。

Google は、エージェントがデータのクエリとリソースの管理に関する業界のベスト プラクティスに準拠するように、Google プロダクト(AlloyDB や BigQuery など)の事前構築済みスキル を提供しています。これらのスキルやその他の特殊なパターンについては、Google Skills Depot をご覧ください。AlloyDB の基本スキルはこちらにあります。

1. スキルファイルを確認する

planner_agent/skills/get-local-and-traffic-rules/SKILL.md で事前構成済みのスキルファイルを開きます。内容は次のとおりです。

---
name: get-local-and-traffic-rules
description: Retrieve local rules and traffic information for a specific jurisdiction.
---
# get_local_and_traffic_rules Skill

This skill provides guidelines on how to effectively use the `get_local_and_traffic_rules` tool.

## Overview
The `get_local_and_traffic_rules` tool interfaces with an AlloyDB database to perform vector similarity searches on a corpus of rules and traffic information using a provided natural language query.

## Usage Guidelines
1. **Query Specificity**: When calling the tool, provide specific details in the `query` argument. For example, instead of querying "food rules", use "rules regarding food vendors during public events".
2. **Contextual Use**: Use the tool when planning events or activities that require adherence to local municipal or state rules (e.g., street closures, noise ordinances, environmental rules).
3. **Handling Results**: The tool returns a string containing the text of the top 5 most relevant rules. If no error occurs, parse the returned string to inform your planning tasks.
4. **Error Handling**: If an error string is returned (e.g., "Error querying rules: ..."), you must report this failure or attempt an alternative approach if applicable.

## Underlying Mechanism
- The tool uses `google_ml.embedding` to convert the query into a vector representation.
- It calculates distance (`<=>`) against the `embedding` column in the `rules` table on an AlloyDB instance.
- Results are fetched in descending order of similarity, limited to 5 results.

2. スキルの登録方法

planner_agent/agent.py で、スキルがディレクトリから読み込まれ、エージェントのツールに追加されます。コードは次のようになります。

import pathlib
from google.adk.skills import load_skill_from_dir
from google.adk.tools import skill_toolset

# Load the AlloyDB skill from its directory
alloydb_skill = load_skill_from_dir(pathlib.Path(__file__).parent / "skills" / "get-local-and-traffic-rules")

# Assemble the Agent with the Skill Toolset
root_agent = LlmAgent(
    name="planner_agent",
    model="gemini-3-flash-preview",
    instruction="You are a helpful marathon planning assistant...",
    tools=[
        get_local_and_traffic_rules,
        skill_toolset.SkillToolset(skills=[alloydb_skill])
    ],
    after_agent_callback=[auto_save_memories],
)

10. エージェントをテストする

  1. ローカルでエージェントを起動します。
uv run adk run planner_agent
  1. 都市ルールについて質問します。[user]: What are the rules for running a race on the Las Vegas strip?

エージェントは get_local_and_traffic_rules ツールを呼び出し、AlloyDB でベクトル検索を実行し、Spark で処理された公式ルール チャンクに基づいて回答を返します。

11. エージェントをデプロイする

Agent Platform にデプロイする

uv run adk deploy agent_engine \
  --env_file .env \
  planner_agent

12. クリーンアップ

継続的な課金を避けるため、この Codelab で作成したリソースを削除します。

AlloyDB クラスタを削除する

# Delete the AlloyDB Cluster
gcloud alloydb clusters delete rules-db --region=us-central1 --force

エージェント ランタイム アプリを削除する

推論エンジン インスタンスは、コンソールまたは gcloud コマンドを使用して削除できます(リソース名がある場合)。わかりやすくするために、コンソールを使用します。

  1. [エージェント ランタイム] ページに移動します。
  2. planner_agent[planner_agent] を選択し、右側のその他アイコンをクリックします。
  3. [削除] をクリックします。

13. 完了

おめでとうございます!高度なメモリ機能とデータ グラウンディング機能を備えた ADK エージェントを強化できました。

学習した内容

  • ステートフル エージェント: Agent Platform Sessions を統合して会話のコンテキストを維持します。
  • 長期的な学習: Agent Platform メモリバンク を接続して、エージェントがユーザー インタラクションから学習できるようにします。
  • データ取り込み: Spark Lightning EngineDocument AI を使用して非構造化ドキュメントを処理します。
  • RAG: AlloyDB にベクトル検索システムを構築して、エージェントを実際のルールにグラウンディングします。

次のステップ