Agent Development Kit によるマルチモーダルへの移行: Gemini 2.5、Firestore、Cloud Run を使用した個人経費アシスタント

この Codelab について
schedule0 分
subject最終更新: 2025年5月15日
account_circle作成者: Alvin Prayuda Juniarta Dwiyantoro

d029d993943b282b.png

個人的な支出をすべて管理するのが面倒で、イライラしたことはありますか?私も!この Codelab では、Gemini 2.5 を活用して、家計管理アシスタントを構築します。アップロードした領収書の管理から、コーヒーを買う余裕があるかどうかの分析まで、さまざまなことができます。

このアシスタントは、ウェブブラウザからチャット ウェブ インターフェースとしてアクセスできます。このインターフェースでは、アシスタントとやり取りしたり、領収書の画像をアップロードしてアシスタントに保存を依頼したり、領収書を検索してファイルを取得して費用分析を行ったりできます。これらはすべて、Google エージェント開発キット フレームワーク上に構築されています。

アプリケーション自体は、フロントエンドとバックエンドの 2 つのサービスに分割されています。これにより、簡単なプロトタイプを構築して動作を試したり、両方を統合する API コントラクトの概要を把握したりできます。

この Codelab では、次の手順で進めていきます。

  1. Google Cloud プロジェクトを準備し、必要なすべての API を有効にする
  2. Google Cloud Storage にバケットを設定し、Firestore にデータベースを設定する
  3. Firestore インデックスを作成する
  4. コーディング環境のワークスペースを設定する
  5. ADK エージェントのソースコード、ツール、プロンプトなどの構造化
  6. ADK ローカル ウェブ開発 UI を使用してエージェントをテストする
  7. Gradio ライブラリを使用してフロントエンド サービス - チャット インターフェースを構築し、クエリを送信して領収書の画像をアップロードする
  8. バックエンド サービス(HTTP サーバー)を構築します。FastAPI を使用して、ADK エージェント コード、SessionService、Artifact Service を配置します。
  9. 環境変数を管理し、Cloud Run にアプリケーションをデプロイするために必要なファイルを設定する
  10. アプリケーションを Cloud Run にデプロイする

アーキテクチャの概要

6795e9abf2030334.jpeg

前提条件

  • Python を使い慣れている
  • HTTP サービスを使用する基本的なフルスタック アーキテクチャの理解

学習内容

  • Gradio を使用したフロントエンド ウェブ プロトタイピング
  • FastAPIPydantic を使用したバックエンド サービスの開発
  • ADK エージェントの複数の機能を活用したアーキテクチャ
  • ツールの使用
  • セッションとアーティファクトの管理
  • Gemini に送信する前に入力を変更するためのコールバックの使用
  • BuiltInPlanner を使用して計画を立て、タスクの実行を改善する
  • ADK ローカル ウェブ インターフェースによるクイック デバッグ
  • ADK コールバックを使用したプロンプト エンジニアリングと Gemini リクエストの変更による情報の解析と取得を介してマルチモーダル インタラクションを最適化する戦略
  • ベクトル データベースとして Firestore を使用するエージェント検索拡張生成
  • Pydantic-settings を使用して YAML ファイル内の環境変数を管理する
  • Dockerfile を使用してアプリケーションを Cloud Run にデプロイし、YAML ファイルで環境変数を指定する

必要なもの

  • Chrome ウェブブラウザ
  • Gmail アカウント
  • 課金が有効になっている Cloud プロジェクト

この Codelab は、初心者を含むあらゆるレベルのデベロッパーを対象としており、サンプル アプリケーションで Python を使用します。ただし、ここで説明するコンセプトを理解するために Python の知識は必要ありません。

2. 始める前に

Cloud コンソールでアクティブなプロジェクトを選択する

この Codelab は、課金が有効になっている Google Cloud プロジェクトがすでにあることを前提としています。まだ設定していない場合は、以下の手順に沿って設定を開始してください。

  1. Google Cloud コンソールのプロジェクト選択ページで、Google Cloud プロジェクトを選択または作成します。
  2. Cloud プロジェクトに対して課金が有効になっていることを確認します。詳しくは、プロジェクトで課金が有効になっているかどうかを確認する方法をご覧ください。

9b27622602f6cc4f.png

Firestore データベースを準備する

次に、Firestore データベースも作成する必要があります。ネイティブ モードの Firestore は、自動スケーリングと高パフォーマンスを実現し、アプリケーション開発を簡素化するように構築された NoSQL ドキュメント データベースです。また、Google のラボで検索拡張生成手法をサポートできるベクトル データベースとしても機能します。

  1. 検索バーで「firestore」を検索し、Firestore プロダクトをクリックします。

2986f598f448af67.png

  1. 次に、[Create A Firestore Database] ボタンをクリックします。
  2. データベース ID 名として [(デフォルト)] を使用し、[Standard Edition] のままにします。このラボのデモでは、Open セキュリティ ルールで Firestore Native を使用します。
  1. また、このデータベースには [Free-tier Usage YEAY!] というタグが付いています。その後、[データベースを作成] ボタンをクリックします。

27a5495b76ed7033.png

これらの手順を完了すると、作成した Firestore データベースにリダイレクトされます。

Cloud Shell ターミナルで Cloud プロジェクトを設定する

  1. Cloud Shell(Google Cloud で動作するコマンドライン環境)を使用します。この環境には bq がプリロードされています。Google Cloud コンソールの上部にある [Cloud Shell をアクティブにする] をクリックします。

1829c3759227c19b.png

  1. Cloud Shell に接続したら、次のコマンドを使用して、認証が完了していることと、プロジェクトがプロジェクト ID に設定されていることを確認します。
gcloud auth list
  1. Cloud Shell で次のコマンドを実行して、gcloud コマンドがプロジェクトを認識していることを確認します。
gcloud config list project
  1. プロジェクトが設定されていない場合は、次のコマンドを使用して設定します。
gcloud config set project <YOUR_PROJECT_ID>

コンソールで PROJECT_ID ID を確認することもできます。

4032c45803813f30.jpeg

これをクリックすると、右側にすべてのプロジェクトとプロジェクト ID が表示されます。

8dc17eb4271de6b5.jpeg

  1. 次のコマンドを使用して、必要な API を有効にします。処理には数分かかることがありますので、少々お待ちください。
gcloud services enable aiplatform.googleapis.com \
                       firestore.googleapis.com \
                       run.googleapis.com \
                       cloudbuild.googleapis.com \
                       cloudresourcemanager.googleapis.com

コマンドが正常に実行されると、次のようなメッセージが表示されます。

Operation "operations/..." finished successfully.

gcloud コマンドの代わりに、コンソールで各プロダクトを検索するか、このリンクを使用します。

必要な API が不足している場合は、実装中にいつでも有効にできます。

gcloud コマンドとその使用方法については、ドキュメントをご覧ください。

Google Cloud Storage バケットを準備する

次に、同じターミナルから、アップロードしたファイルを保存する GCS バケットを準備する必要があります。次のコマンドを実行してバケットを作成します。

gsutil mb -l us-central1 gs://personal-expense-assistant-receipts

次のような出力が表示されます。

Creating gs://personal-expense-assistant-receipts/...

ブラウザの左上にあるナビゲーション メニューに移動し、[Cloud Storage -> Bucket] を選択して確認できます。

d27475d5ce4fcc9d.png

Firestore はネイティブな NoSQL データベースであり、優れたパフォーマンスとデータモデルの柔軟性を備えていますが、複雑なクエリには制限があります。複合マルチフィールド クエリとベクトル検索を使用する予定であるため、まずインデックスを作成する必要があります。詳しくは、こちらのドキュメントをご覧ください。

  1. 次のコマンドを実行して、複合クエリをサポートするインデックスを作成します。
gcloud firestore indexes composite create \
        --collection-group=personal-expense-assistant-receipts \
        --field-config field-path=total_amount,order=ASCENDING \
        --field-config field-path=transaction_time,order=ASCENDING \
        --field-config field-path=__name__,order=ASCENDING \
        --database="(default)"
  1. ベクトル検索をサポートするには、このコマンドを実行します。
gcloud firestore indexes composite create \
        --collection-group="personal-expense-assistant-receipts" \
        --query-scope=COLLECTION \
        --field-config field-path="embedding",vector-config='{"dimension":"768", "flat": "{}"}' \
        --database="(default)"

作成されたインデックスを確認するには、Cloud コンソールで Firestore に移動し、(デフォルト)データベース インスタンスをクリックして、ナビゲーション バーで [インデックス] を選択します。

8b3a4012985ee0b6.png

Cloud Shell エディタに移動してアプリケーションの作業ディレクトリを設定する

これで、コードエディタを設定してコーディングを開始できます。ここでは、Cloud Shell エディタを使用します。

  1. [エディタを開く] ボタンをクリックすると、Cloud Shell エディタが開き、コードを記述できます b16d56e4979ec951.png
  2. 下の図に示すように、Cloud Shell エディタの左下(ステータスバー)に Cloud Code プロジェクトが設定され、請求が有効になっているアクティブな Google Cloud プロジェクトに設定されていることを確認します。プロンプトが表示されたら [承認] をクリックします。前のコマンドに沿ってすでに操作している場合は、ボタンがログインボタンではなく、有効化されたプロジェクトに直接リンクしていることもあります。

f5003b9c38b43262.png

  1. 次に、この Codelab のテンプレート作業ディレクトリのクローンを GitHub から作成します。次のコマンドを実行します。作業ディレクトリが personal-expense-assistant ディレクトリに作成されます。
git clone https://github.com/alphinside/personal-expense-assistant-adk-codelab-starter.git personal-expense-assistant
  1. 次に、Cloud Shell エディタの上部セクションに移動し、[ファイル] -> [フォルダを開く] をクリックして、ユーザー名 ディレクトリを見つけ、personal-expense-assistant ディレクトリを見つけて、[OK] ボタンをクリックします。これにより、選択したディレクトリがメインの作業ディレクトリになります。この例では、ユーザー名は alvinprayuda であるため、ディレクトリ パスは次のようになります。

2c53696f81d805cc.png

a766d380600a988.png

Cloud Shell エディタは次のようになります。

528df7169f01b016.png

環境の設定

Python 仮想環境を準備する

次のステップは、開発環境を準備することです。現在のアクティブなターミナルは、personal-expense-assistant 作業ディレクトリ内にある必要があります。この Codelab では Python 3.12 を使用し、uv python プロジェクト マネージャーを使用して、Python バージョンと仮想環境の作成と管理を簡素化します。

  1. ターミナルを開いていない場合は、[Terminal] -> [New Terminal] をクリックするか、Ctrl+Shift+C を使用して開きます。ターミナル ウィンドウがブラウザの下部に開きます。

f8457daf0bed059e.jpeg

  1. uv をダウンロードし、次のコマンドを使用して Python 3.12 をインストールします。
curl -LsSf https://astral.sh/uv/0.6.16/install.sh | sh && \
source $HOME/.local/bin/env && \
uv python install 3.12
  1. 次に、uv を使用して仮想環境を初期化します。次のコマンドを実行します。
uv sync --frozen

これにより、.venv ディレクトリが作成され、依存関係がインストールされます。pyproject.toml をざっと見てみると、次のように依存関係に関する情報が表示されます。

dependencies = [
    "datasets>=3.5.0",
    "google-adk>=0.2.0",
    "google-cloud-firestore>=2.20.1",
    "gradio>=5.23.1",
    "pydantic>=2.10.6",
    "pydantic-settings[yaml]>=2.8.1",
]
  1. 仮想環境をテストするには、新しいファイル main.py を作成し、次のコードをコピーします。
def main():
   print("Hello from personal-expense-assistant-adk!")

if __name__ == "__main__":
   main()
  1. 次に、次のコマンドを実行します。
uv run main.py

次のような出力が表示されます。

Using CPython 3.12
Creating virtual environment at: .venv
Hello from personal-expense-assistant-adk!

これは、Python プロジェクトが正しく設定されていることを示しています。

設定構成ファイル

次に、このプロジェクトの構成ファイルを設定する必要があります。pydantic-settings を使用して、YAML ファイルから構成を読み取ります。

次の内容の settings.yaml というファイルを作成します。[ファイル] > [新しいテキスト ファイル] をクリックし、次のコードを入力します。ファイルを settings.yaml として保存します。

GCLOUD_LOCATION: "us-central1"
GCLOUD_PROJECT_ID: "your_gcloud_project_id"
BACKEND_URL: "http://localhost:8081/chat"
STORAGE_BUCKET_NAME: "personal-expense-assistant-receipts"
DB_COLLECTION_NAME: "personal-expense-assistant-receipts"

この Codelab では、GCLOUD_LOCATION, BACKEND_URL, STORAGE_BUCKET_NAME, DB_COLLECTION_NAMEBACKEND_URL の事前構成済み値を使用します。

これで、次のステップであるエージェントとサービスのビルドに進むことができます。

3. Google ADK と Gemini 2.5 を使用してエージェントを構築する

ADK ディレクトリ構造の概要

まず、ADK の機能とエージェントの構築方法について説明します。ADK の完全なドキュメントは、こちらの URL でご覧いただけます。ADK には、CLI コマンド実行内で使用できる多くのユーティリティが用意されています。たとえば、次のような要因が含まれます。

  • エージェントのディレクトリ構造を設定する
  • CLI の入力出力によるインタラクションをすばやく試す
  • ローカル開発 UI ウェブ インターフェースをすばやく設定する

CLI コマンドを使用して、エージェントのディレクトリ構造を作成しましょう。次のコマンドを実行します。

uv run adk create expense_manager_agent \
   --model gemini-2.5-flash-preview-04-17 \
   --project {your-project-id} \
   --region us-central1

これにより、次のエージェント ディレクトリ構造が作成されます。

expense_manager_agent/
├── __init__.py
├── .env
├── agent.py

init.pyagent.py を調べると、次のコードが表示されます。

# __init__.py

from . import agent
# agent.py

from google.adk.agents import Agent

root_agent = Agent(
    model='gemini-2.5-flash-preview-04-17',
    name='root_agent',
    description='A helpful assistant for user questions.',
    instruction='Answer user questions to the best of your knowledge',
)

経費マネージャー エージェントの構築

経費マネージャー エージェントを作成しましょう。expense_manager_agent/agent.py ファイルを開き、root_agent を含む以下のコードをコピーします。

# expense_manager_agent/agent.py

from google.adk.agents import Agent
from expense_manager_agent.tools import (
    store_receipt_data,
    search_receipts_by_metadata_filter,
    search_relevant_receipts_by_natural_language_query,
    get_receipt_data_by_image_id,
)
from expense_manager_agent.callbacks import modify_image_data_in_history
import os
from settings import get_settings
from google.adk.planners import BuiltInPlanner
from google.genai import types

SETTINGS = get_settings()
os.environ["GOOGLE_CLOUD_PROJECT"] = SETTINGS.GCLOUD_PROJECT_ID
os.environ["GOOGLE_CLOUD_LOCATION"] = SETTINGS.GCLOUD_LOCATION
os.environ["GOOGLE_GENAI_USE_VERTEXAI"] = "TRUE"

# Get the code file directory path and read the task prompt file
current_dir = os.path.dirname(os.path.abspath(__file__))
prompt_path = os.path.join(current_dir, "task_prompt.md")
with open(prompt_path, "r") as file:
    task_prompt = file.read()

root_agent = Agent(
    name="expense_manager_agent",
    model="gemini-2.5-flash-preview-04-17",
    description=(
        "Personal expense agent to help user track expenses, analyze receipts, and manage their financial records"
    ),
    instruction=task_prompt,
    tools=[
        store_receipt_data,
        get_receipt_data_by_image_id,
        search_receipts_by_metadata_filter,
        search_relevant_receipts_by_natural_language_query,
    ],
    planner=BuiltInPlanner(
        thinking_config=types.ThinkingConfig(
            thinking_budget=2048,
        )
    ),
    before_model_callback=modify_image_data_in_history,
)

コードの説明

このスクリプトには、エージェントの開始が含まれており、次のものを初期化します。

  • 使用するモデルを gemini-2.5-flash-preview-04-17 に設定します。
  • エージェントの説明と指示を、task_prompt.md から読み取られるシステム プロンプトとして設定します。
  • エージェント機能をサポートするために必要なツールを提供する
  • Gemini 2.5 Flash Thinking 機能を使用して、最終的な回答の生成または実行の前に計画を有効にする
  • Gemini にリクエストを送信する前にコールバック インターセプトを設定し、予測の前に送信される画像データの数を制限

4. エージェント ツールの構成

費用マネージャー エージェントは、次の機能を備えています。

  • 領収書の画像からデータを抽出し、データとファイルを保存する
  • 費用データの完全一致検索
  • 費用データのコンテキスト検索

そのため、この機能をサポートする適切なツールが必要です。expense_manager_agent ディレクトリに新しいファイルを作成し、tools.py という名前を付け、以下のコードをコピーします。

# expense_manager_agent/tools.py

import datetime
from typing import Dict, List, Any
from google.cloud import firestore
from google.cloud.firestore_v1.vector import Vector
from google.cloud.firestore_v1 import FieldFilter
from google.cloud.firestore_v1.base_query import And
from google.cloud.firestore_v1.base_vector_query import DistanceMeasure
from settings import get_settings
from google import genai

SETTINGS = get_settings()
DB_CLIENT = firestore.Client(
    project=SETTINGS.GCLOUD_PROJECT_ID
)  # Will use "(default)" database
COLLECTION = DB_CLIENT.collection(SETTINGS.DB_COLLECTION_NAME)
GENAI_CLIENT = genai.Client(
    vertexai=True, location=SETTINGS.GCLOUD_LOCATION, project=SETTINGS.GCLOUD_PROJECT_ID
)
EMBEDDING_DIMENSION = 768
EMBEDDING_FIELD_NAME = "embedding"
INVALID_ITEMS_FORMAT_ERR = """
Invalid items format. Must be a list of dictionaries with 'name', 'price', and 'quantity' keys."""
RECEIPT_DESC_FORMAT = """
Store Name: {store_name}
Transaction Time: {transaction_time}
Total Amount: {total_amount}
Currency: {currency}
Purchased Items:
{purchased_items}
Receipt Image ID: {receipt_id}
"""


def sanitize_image_id(image_id: str) -> str:
    """Sanitize image ID by removing any leading/trailing whitespace."""
    if image_id.startswith("[IMAGE-"):
        image_id = image_id.split("ID ")[1].split("]")[0]

    return image_id.strip()


def store_receipt_data(
    image_id: str,
    store_name: str,
    transaction_time: str,
    total_amount: float,
    purchased_items: List[Dict[str, Any]],
    currency: str = "IDR",
) -> str:
    """
    Store receipt data in the database.

    Args:
        image_id (str): The unique identifier of the image. For example IMAGE-POSITION 0-ID 12345,
            the ID of the image is 12345.
        store_name (str): The name of the store.
        transaction_time (str): The time of purchase, in ISO format ("YYYY-MM-DDTHH:MM:SS.ssssssZ").
        total_amount (float): The total amount spent.
        purchased_items (List[Dict[str, Any]]): A list of items purchased with their prices. Each item must have:
            - name (str): The name of the item.
            - price (float): The price of the item.
            - quantity (int, optional): The quantity of the item. Defaults to 1 if not provided.
        currency (str, optional): The currency of the transaction, can be derived from the store location.
            If unsure, default is "IDR".

    Returns:
        str: A success message with the receipt ID.

    Raises:
        Exception: If the operation failed or input is invalid.
    """
    try:
        # In case of it provide full image placeholder, extract the id string
        image_id = sanitize_image_id(image_id)

        # Check if the receipt already exists
        doc = get_receipt_data_by_image_id(image_id)

        if doc:
            return f"Receipt with ID {image_id} already exists"

        # Validate transaction time
        if not isinstance(transaction_time, str):
            raise ValueError(
                "Invalid transaction time: must be a string in ISO format 'YYYY-MM-DDTHH:MM:SS.ssssssZ'"
            )
        try:
            datetime.datetime.fromisoformat(transaction_time.replace("Z", "+00:00"))
        except ValueError:
            raise ValueError(
                "Invalid transaction time format. Must be in ISO format 'YYYY-MM-DDTHH:MM:SS.ssssssZ'"
            )

        # Validate items format
        if not isinstance(purchased_items, list):
            raise ValueError(INVALID_ITEMS_FORMAT_ERR)

        for _item in purchased_items:
            if (
                not isinstance(_item, dict)
                or "name" not in _item
                or "price" not in _item
            ):
                raise ValueError(INVALID_ITEMS_FORMAT_ERR)

            if "quantity" not in _item:
                _item["quantity"] = 1

        # Create a combined text from all receipt information for better embedding
        result = GENAI_CLIENT.models.embed_content(
            model="text-embedding-004",
            contents=RECEIPT_DESC_FORMAT.format(
                store_name=store_name,
                transaction_time=transaction_time,
                total_amount=total_amount,
                currency=currency,
                purchased_items=purchased_items,
                receipt_id=image_id,
            ),
        )

        embedding = result.embeddings[0].values

        doc = {
            "receipt_id": image_id,
            "store_name": store_name,
            "transaction_time": transaction_time,
            "total_amount": total_amount,
            "currency": currency,
            "purchased_items": purchased_items,
            EMBEDDING_FIELD_NAME: Vector(embedding),
        }

        COLLECTION.add(doc)

        return f"Receipt stored successfully with ID: {image_id}"
    except Exception as e:
        raise Exception(f"Failed to store receipt: {str(e)}")


def search_receipts_by_metadata_filter(
    start_time: str,
    end_time: str,
    min_total_amount: float = -1.0,
    max_total_amount: float = -1.0,
) -> str:
    """
    Filter receipts by metadata within a specific time range and optionally by amount.

    Args:
        start_time (str): The start datetime for the filter (in ISO format, e.g. 'YYYY-MM-DDTHH:MM:SS.ssssssZ').
        end_time (str): The end datetime for the filter (in ISO format, e.g. 'YYYY-MM-DDTHH:MM:SS.ssssssZ').
        min_total_amount (float): The minimum total amount for the filter (inclusive). Defaults to -1.
        max_total_amount (float): The maximum total amount for the filter (inclusive). Defaults to -1.

    Returns:
        str: A string containing the list of receipt data matching all applied filters.

    Raises:
        Exception: If the search failed or input is invalid.
    """
    try:
        # Validate start and end times
        if not isinstance(start_time, str) or not isinstance(end_time, str):
            raise ValueError("start_time and end_time must be strings in ISO format")
        try:
            datetime.datetime.fromisoformat(start_time.replace("Z", "+00:00"))
            datetime.datetime.fromisoformat(end_time.replace("Z", "+00:00"))
        except ValueError:
            raise ValueError("start_time and end_time must be strings in ISO format")

        # Start with the base collection reference
        query = COLLECTION

        # Build the composite query by properly chaining conditions
        # Notes that this demo assume 1 user only,
        # need to refactor the query for multiple user
        filters = [
            FieldFilter("transaction_time", ">=", start_time),
            FieldFilter("transaction_time", "<=", end_time),
        ]

        # Add optional filters
        if min_total_amount != -1:
            filters.append(FieldFilter("total_amount", ">=", min_total_amount))

        if max_total_amount != -1:
            filters.append(FieldFilter("total_amount", "<=", max_total_amount))

        # Apply the filters
        composite_filter = And(filters=filters)
        query = query.where(filter=composite_filter)

        # Execute the query and collect results
        search_result_description = "Search by Metadata Results:\n"
        for doc in query.stream():
            data = doc.to_dict()
            data.pop(
                EMBEDDING_FIELD_NAME, None
            )  # Remove embedding as it's not needed for display

            search_result_description += f"\n{RECEIPT_DESC_FORMAT.format(**data)}"

        return search_result_description
    except Exception as e:
        raise Exception(f"Error filtering receipts: {str(e)}")


def search_relevant_receipts_by_natural_language_query(
    query_text: str, limit: int = 5
) -> str:
    """
    Search for receipts with content most similar to the query using vector search.
    This tool can be use for user query that is difficult to translate into metadata filters.
    Such as store name or item name which sensitive to string matching.
    Use this tool if you cannot utilize the search by metadata filter tool.

    Args:
        query_text (str): The search text (e.g., "coffee", "dinner", "groceries").
        limit (int, optional): Maximum number of results to return (default: 5).

    Returns:
        str: A string containing the list of contextually relevant receipt data.

    Raises:
        Exception: If the search failed or input is invalid.
    """
    try:
        # Generate embedding for the query text
        result = GENAI_CLIENT.models.embed_content(
            model="text-embedding-004", contents=query_text
        )
        query_embedding = result.embeddings[0].values

        # Notes that this demo assume 1 user only,
        # need to refactor the query for multiple user
        vector_query = COLLECTION.find_nearest(
            vector_field=EMBEDDING_FIELD_NAME,
            query_vector=Vector(query_embedding),
            distance_measure=DistanceMeasure.EUCLIDEAN,
            limit=limit,
        )

        # Execute the query and collect results
        search_result_description = "Search by Contextual Relevance Results:\n"
        for doc in vector_query.stream():
            data = doc.to_dict()
            data.pop(
                EMBEDDING_FIELD_NAME, None
            )  # Remove embedding as it's not needed for display
            search_result_description += f"\n{RECEIPT_DESC_FORMAT.format(**data)}"

        return search_result_description
    except Exception as e:
        raise Exception(f"Error searching receipts: {str(e)}")


def get_receipt_data_by_image_id(image_id: str) -> Dict[str, Any]:
    """
    Retrieve receipt data from the database using the image_id.

    Args:
        image_id (str): The unique identifier of the receipt image. For example, if the placeholder is
            [IMAGE-ID 12345], the ID to use is 12345.

    Returns:
        Dict[str, Any]: A dictionary containing the receipt data with the following keys:
            - receipt_id (str): The unique identifier of the receipt image.
            - store_name (str): The name of the store.
            - transaction_time (str): The time of purchase in UTC.
            - total_amount (float): The total amount spent.
            - currency (str): The currency of the transaction.
            - purchased_items (List[Dict[str, Any]]): List of items purchased with their details.
        Returns an empty dictionary if no receipt is found.
    """
    # In case of it provide full image placeholder, extract the id string
    image_id = sanitize_image_id(image_id)

    # Query the receipts collection for documents with matching receipt_id (image_id)
    # Notes that this demo assume 1 user only,
    # need to refactor the query for multiple user
    query = COLLECTION.where(filter=FieldFilter("receipt_id", "==", image_id)).limit(1)
    docs = list(query.stream())

    if not docs:
        return {}

    # Get the first matching document
    doc_data = docs[0].to_dict()
    doc_data.pop(EMBEDDING_FIELD_NAME, None)

    return doc_data

コードの説明

このツール関数の実装では、次の 2 つの主要なアイデアを中心にツールを設計します。

  • イメージ ID 文字列プレースホルダ [IMAGE-ID <hash-of-image-1>] を使用して、領収書データと元のファイルへのマッピングを解析する
  • Firestore データベースを使用したデータの保存と取得

ツール「store_receipt_data」

6119e1f37f516707.png

このツールは光学文字認識ツールです。画像データから必要な情報を解析し、画像 ID 文字列を認識して、Firestore データベースに保存されるようにマッピングします。

また、このツールは text-embedding-004 を使用して領収書のコンテンツをエンベディングに変換し、すべてのメタデータとエンベディングを一緒に保存してインデックス登録します。クエリまたはコンテキスト検索のいずれかによって柔軟に取得できるようにします。

このツールを正常に実行すると、次のように、領収書データが Firestore データベースにすでにインデックス登録されていることがわかります。

7b448fcde40fac5a.png

ツール「search_receipts_by_metadata_filter」

9d51a3f12289d184.png

このツールは、ユーザーのクエリを、期間や合計トランザクションによる検索をサポートするメタデータ クエリ フィルタに変換します。一致したすべての領収書データが返されます。このプロセスでは、エージェントがコンテキストを理解するために必要ないため、エンベディング フィールドは破棄されます。

ツール「search_relevant_receipts_by_natural_language_query」

b97d3aab9aa53bc9.png

これは検索拡張生成(RAG)ツールです。エージェントは、ベクトル データベースから関連する領収書を取得するための独自のクエリを設計できます。また、このツールを使用するタイミングも選択できます。この RAG ツールを使用するかどうかをエージェントが独自に判断し、独自のクエリを設計できるようにする考え方は、エージェント RAG アプローチの定義の一つです。

独自のクエリを作成できるだけでなく、取得する関連ドキュメントの数を選択することもできます。適切なプロンプト エンジニアリングと組み合わせる(例:

# Example prompt

Always filter the result from tool
search_relevant_receipts_by_natural_language_query as the returned 
result may contain irrelevant information

これにより、このツールはほぼ何でも検索できる強力なツールになりますが、最近傍検索は完全ではないため、期待どおりの結果がすべて返されない場合があります。

5. コールバックによる会話コンテキストの変更

Google ADK を使用すると、さまざまなレベルでエージェントのランタイムを「インターセプト」できます。この機能の詳細については、こちらのドキュメントをご覧ください。このラボでは、before_model_callback を使用して、LLM に送信する前にリクエストを変更し、古い会話履歴のコンテキスト内の画像データを削除します(最後の 3 件のユーザー操作の画像データのみを含めます)。これにより、効率性が向上します。

ただし、必要な場合はエージェントに画像データのコンテキストを提供する必要があるため、そのため、会話内の各画像バイトデータの後に文字列画像 ID プレースホルダを追加するメカニズムを追加します。これにより、エージェントは画像 ID を実際のファイルデータにリンクできます。このリンクは、画像の保存時と取得時の両方で使用できます。構造は次のようになります。

<image-byte-data-1>
[IMAGE-ID <hash-of-image-1>]
<image-byte-data-2>
[IMAGE-ID <hash-of-image-2>]
And so on..

会話履歴でバイトデータが古くなった場合でも、文字列 ID は残るため、ツールを使用してデータにアクセスできます。画像データが削除された後の履歴構造の例

[IMAGE-ID <hash-of-image-1>]
[IMAGE-ID <hash-of-image-2>]
And so on..

では、始めましょう。expense_manager_agent ディレクトリに新しいファイルを作成し、callbacks.py という名前を付け、以下のコードをコピーします。

# expense_manager_agent/callbacks.py

import hashlib
from google.genai import types
from google.adk.agents.callback_context import CallbackContext
from google.adk.models.llm_request import LlmRequest


def modify_image_data_in_history(
    callback_context: CallbackContext, llm_request: LlmRequest
) -> None:
    # The following code will modify the request sent to LLM
    # We will only keep image data in the last 3 user messages using a reverse and counter approach

    # Count how many user messages we've processed
    user_message_count = 0

    # Process the reversed list
    for content in reversed(llm_request.contents):
        # Only count for user manual query, not function call
        if (content.role == "user") and (content.parts[0].function_response is None):
            user_message_count += 1
            modified_content_parts = []

            # Check any missing image ID placeholder for any image data
            # Then remove image data from conversation history if more than 3 user messages
            for idx, part in enumerate(content.parts):
                if part.inline_data is None:
                    modified_content_parts.append(part)
                    continue

                if (
                    (idx + 1 >= len(content.parts))
                    or (content.parts[idx + 1].text is None)
                    or (not content.parts[idx + 1].text.startswith("[IMAGE-ID "))
                ):
                    # Generate hash ID for the image and add a placeholder
                    image_data = part.inline_data.data
                    hasher = hashlib.sha256(image_data)
                    image_hash_id = hasher.hexdigest()[:12]
                    placeholder = f"[IMAGE-ID {image_hash_id}]"

                    # Only keep image data in the last 3 user messages
                    if user_message_count <= 3:
                        modified_content_parts.append(part)

                    modified_content_parts.append(types.Part(text=placeholder))

                else:
                    # Only keep image data in the last 3 user messages
                    if user_message_count <= 3:
                        modified_content_parts.append(part)

            # This will modify the contents inside the llm_request
            content.parts = modified_content_parts

6. プロンプト

複雑なインタラクションと機能を備えたエージェントを設計するには、エージェントを適切にガイドして、望ましい動作を実現できる十分なプロンプトを見つける必要があります。

以前は、会話履歴で画像データを処理するメカニズムがあり、search_relevant_receipts_by_natural_language_query. などの使いづらいツールもありました。また、エージェントが正しい領収書の画像を検索して Google に取得できるようにしたいと考えています。つまり、これらの情報をすべて適切なプロンプト構造で適切に伝える必要があります。

エージェントは、出力を次のマークダウン形式に構造化して、思考プロセス、最終的な回答、添付ファイル(該当する場合)を解析する必要があります。

# THINKING PROCESS

Thinking process here

# FINAL RESPONSE

Response to the user here

Attachments put inside json block

{
    "attachments": [
      "[IMAGE-ID <hash-id-1>]",
      "[IMAGE-ID <hash-id-2>]",
      ...
    ]
}

最初に、経費マネージャー エージェントの動作に関する初期の期待値を満たすプロンプトから始めましょう。task_prompt.md ファイルは既存の作業ディレクトリにすでに存在していますが、expense_manager_agent ディレクトリに移動する必要があります。次のコマンドを実行して移動します。

mv task_prompt.md expense_manager_agent/task_prompt.md

7. エージェントのテスト

CLI 経由でエージェントと通信してみましょう。次のコマンドを実行します。

uv run adk run expense_manager_agent

次のような出力が表示されます。エージェントと交代でチャットできますが、このインターフェースから送信できるのはテキストのみです。

Log setup complete: /tmp/agents_log/agent.xxxx_xxx.log
To access latest log: tail -F /tmp/agents_log/agent.latest.log
Running agent root_agent, type exit to exit.
user: hello
[root_agent]: Hello there! How can I help you today?
user: 

ADK では、CLI による操作に加えて、開発 UI を使用して操作したり、操作中に何が起こっているかを調べたりすることもできます。次のコマンドを実行して、ローカル開発 UI サーバーを起動します。

uv run adk web --port 8080

次の例のような出力が生成されます。これは、すでにウェブ インターフェースにアクセスできることを意味します。

INFO:     Started server process [xxxx]
INFO:     Waiting for application startup.

+-----------------------------------------------------------------------------+
| ADK Web Server started                                                      |
|                                                                             |
| For local testing, access at http://localhost:8080.                         |
+-----------------------------------------------------------------------------+

INFO:     Application startup complete.
INFO:     Uvicorn running on http://0.0.0.0:8080 (Press CTRL+C to quit)

確認するには、Cloud Shell エディタの上部にある [ウェブでプレビュー] ボタンをクリックし、[ポート 8080 でプレビュー] を選択します。

e7c9f56c2463164.png

次のウェブページが表示されます。左上のプルダウン ボタンで利用可能なエージェント(この場合は expense_manager_agent)を選択し、bot を操作できます。左側のウィンドウに、エージェントの実行中のログの詳細に関する多くの情報が表示されます。

b0244afd8da6cc42.png

アクションを試してみましょう。以下の 2 つの領収書のサンプル(ソース : Hugging Face データセット mousserlane/id_receipt_dataset)をアップロードします。各画像を右クリックして、[名前を付けて画像を保存] を選択します。(これにより領収書の画像がダウンロードされます)。次に、[クリップ] アイコンをクリックしてファイルをボットにアップロードし、これらの領収書を保存することを伝えます。

b8ee334373c6e6af.png c83a8c58ac2eff28.png

その後、次のクエリを試して検索やファイルの取得を行います。

  • 「2023 年の費用の内訳と合計額を教えてください」
  • 「Indomaret の領収書ファイルを提供してください」

一部のツールを使用すると、開発 UI で何が行われているかを確認できます。

bf47d0b35d5a4f28.png

エージェントがどのように応答するかを確認し、task_prompt.py 内のプロンプトで指定されているすべてのルールに準拠しているかどうかを確認します。おめでとうございます。これで、動作する開発エージェントが完成しました。

次に、適切な UI と、画像ファイルをアップロードおよびダウンロードする機能を追加して、完成させます。

8. Gradio を使用してフロントエンド サービスを構築する

次のようなチャット ウェブ インターフェースを構築します。

d029d993943b282b.png

チャット インターフェースには、ユーザーがテキストを送信したり、領収書の画像ファイルをアップロードしたりするための入力フィールドがあります。

Gradio を使用してフロントエンド サービスを構築します。

新しいファイルを作成し、[ファイル] > [新しいテキスト ファイル] をクリックして frontend.py という名前を付け、次のコードをコピーして保存します。

import mimetypes
import gradio as gr
import requests
import base64
from typing import List, Dict, Any
from settings import get_settings
from PIL import Image
import io
from schema import ImageData, ChatRequest, ChatResponse


SETTINGS = get_settings()


def encode_image_to_base64_and_get_mime_type(image_path: str) -> ImageData:
    """Encode a file to base64 string and get MIME type.

    Reads an image file and returns the base64-encoded image data and its MIME type.

    Args:
        image_path: Path to the image file to encode.

    Returns:
        ImageData object containing the base64 encoded image data and its MIME type.
    """
    # Read the image file
    with open(image_path, "rb") as file:
        image_content = file.read()

    # Get the mime type
    mime_type = mimetypes.guess_type(image_path)[0]

    # Base64 encode the image
    base64_data = base64.b64encode(image_content).decode("utf-8")

    # Return as ImageData object
    return ImageData(serialized_image=base64_data, mime_type=mime_type)


def decode_base64_to_image(base64_data: str) -> Image.Image:
    """Decode a base64 string to PIL Image.

    Converts a base64-encoded image string back to a PIL Image object
    that can be displayed or processed further.

    Args:
        base64_data: Base64 encoded string of the image.

    Returns:
        PIL Image object of the decoded image.
    """
    # Decode the base64 string and convert to PIL Image
    image_data = base64.b64decode(base64_data)
    image_buffer = io.BytesIO(image_data)
    image = Image.open(image_buffer)

    return image


def get_response_from_llm_backend(
    message: Dict[str, Any],
    history: List[Dict[str, Any]],
) -> List[str | gr.Image]:
    """Send the message and history to the backend and get a response.

    Args:
        message: Dictionary containing the current message with 'text' and optional 'files' keys.
        history: List of previous message dictionaries in the conversation.

    Returns:
        List containing text response and any image attachments from the backend service.
    """
    # Extract files and convert to base64
    image_data = []
    if uploaded_files := message.get("files", []):
        for file_path in uploaded_files:
            image_data.append(encode_image_to_base64_and_get_mime_type(file_path))

    # Prepare the request payload
    payload = ChatRequest(
        text=message["text"],
        files=image_data,
        session_id="default_session",
        user_id="default_user",
    )

    # Send request to backend
    try:
        response = requests.post(SETTINGS.BACKEND_URL, json=payload.model_dump())
        response.raise_for_status()  # Raise exception for HTTP errors

        result = ChatResponse(**response.json())
        if result.error:
            return [f"Error: {result.error}"]

        chat_responses = []

        if result.thinking_process:
            chat_responses.append(
                gr.ChatMessage(
                    role="assistant",
                    content=result.thinking_process,
                    metadata={"title": "🧠 Thinking Process"},
                )
            )

        chat_responses.append(gr.ChatMessage(role="assistant", content=result.response))

        if result.attachments:
            for attachment in result.attachments:
                image_data = attachment.serialized_image
                chat_responses.append(gr.Image(decode_base64_to_image(image_data)))

        return chat_responses
    except requests.exceptions.RequestException as e:
        return [f"Error connecting to backend service: {str(e)}"]


if __name__ == "__main__":
    demo = gr.ChatInterface(
        get_response_from_llm_backend,
        title="Personal Expense Assistant",
        description="This assistant can help you to store receipts data, find receipts, and track your expenses during certain period.",
        type="messages",
        multimodal=True,
        textbox=gr.MultimodalTextbox(file_count="multiple", file_types=["image"]),
    )

    demo.launch(
        server_name="0.0.0.0",
        server_port=8080,
    )

その後、次のコマンドでフロントエンド サービスを実行してみます。main.py ファイルの名前を frontend.py に変更してください。

uv run frontend.py

Cloud コンソールに次のような出力が表示されます。

* Running on local URL:  http://0.0.0.0:8080

To create a public link, set `share=True` in `launch()`.

その後、ローカル URL リンクを Ctrl+クリックすると、ウェブ インターフェースを確認できます。または、Cloud エディタの右上にある [ウェブでプレビュー] ボタンをクリックし、[ポート 8080 でプレビュー] を選択して、フロントエンド アプリケーションにアクセスすることもできます。

49cbdfdf77964065.jpeg

ウェブ インターフェースが表示されますが、バックエンド サービスがまだ設定されていないため、チャットを送信しようとすると想定されるエラーが表示されます。

5caec77d95c35927.png

サービスを実行し、まだ終了させないでください。バックエンド サービスを別のターミナル タブで実行します。

コードの説明

このフロントエンド コードでは、まず、ユーザーがテキストを送信し、複数のファイルをアップロードできるようにします。Gradio では、gr.ChatInterface メソッドと gr.MultimodalTextbox を組み合わせて、このような機能を作成できます。

ファイルとテキストをバックエンドに送信する前に、バックエンドに必要なファイルの mimetype を把握する必要があります。また、画像ファイルのバイト数を base64 でエンコードし、mimetype とともに送信する必要があります。

class ImageData(BaseModel):
    """Model for image data with hash identifier.

    Attributes:
        serialized_image: Optional Base64 encoded string of the image content.
        mime_type: MIME type of the image.
    """

    serialized_image: str
    mime_type: str

フロントエンドとバックエンドのやり取りに使用されるスキーマは、schema.py で定義されています。Pydantic BaseModel を使用して、スキーマでデータ検証を適用します。

回答を受け取る際に、思考プロセス、最終的な回答、添付ファイルの部分をすでに分離しています。したがって、Gradio コンポーネントを使用して、UI コンポーネントで各コンポーネントを表示できます。

class ChatResponse(BaseModel):
    """Model for a chat response.

    Attributes:
        response: The text response from the model.
        thinking_process: Optional thinking process of the model.
        attachments: List of image data to be displayed to the user.
        error: Optional error message if something went wrong.
    """

    response: str
    thinking_process: str = ""
    attachments: List[ImageData] = []
    error: Optional[str] = None

9. FastAPI を使用してバックエンド サービスを構築する

次に、エージェント ランタイムを実行できるように、他のコンポーネントとともにエージェントを初期化できるバックエンドを構築する必要があります。

新しいファイルを作成し、[ファイル] > [新規テキスト ファイル] をクリックして、次のコードをコピーして貼り付け、backend.py として保存します。

from expense_manager_agent.agent import root_agent as expense_manager_agent
from google.adk.sessions import InMemorySessionService
from google.adk.runners import Runner
from google.adk.events import Event
from fastapi import FastAPI, Body, Depends
from typing import AsyncIterator
from types import SimpleNamespace
import uvicorn
from contextlib import asynccontextmanager
import asyncio
from utils import (
    extract_attachment_ids_and_sanitize_response,
    download_image_from_gcs,
    extract_thinking_process,
    format_user_request_to_adk_content_and_store_artifacts,
)
from schema import ImageData, ChatRequest, ChatResponse
import logger
from google.adk.artifacts import GcsArtifactService
from settings import get_settings

SETTINGS = get_settings()
APP_NAME = "expense_manager_app"


# Application state to hold service contexts
class AppContexts(SimpleNamespace):
    """A class to hold application contexts with attribute access"""

    session_service: InMemorySessionService = None
    artifact_service: GcsArtifactService = None
    expense_manager_agent_runner: Runner = None


# Initialize application state
app_contexts = AppContexts()


@asynccontextmanager
async def lifespan(app: FastAPI):
    # Initialize service contexts during application startup
    app_contexts.session_service = InMemorySessionService()
    app_contexts.artifact_service = GcsArtifactService(
        bucket_name=SETTINGS.STORAGE_BUCKET_NAME
    )
    app_contexts.expense_manager_agent_runner = Runner(
        agent=expense_manager_agent,  # The agent we want to run
        app_name=APP_NAME,  # Associates runs with our app
        session_service=app_contexts.session_service,  # Uses our session manager
        artifact_service=app_contexts.artifact_service,  # Uses our artifact manager
    )

    logger.info("Application started successfully")
    yield
    logger.info("Application shutting down")
    # Perform cleanup during application shutdown if necessary


# Helper function to get application state as a dependency
async def get_app_contexts() -> AppContexts:
    return app_contexts


# Create FastAPI app
app = FastAPI(title="Personal Expense Assistant API", lifespan=lifespan)


@app.post("/chat", response_model=ChatResponse)
async def chat(
    request: ChatRequest = Body(...),
    app_context: AppContexts = Depends(get_app_contexts),
) -> ChatResponse:
    """Process chat request and get response from the agent"""

    # Prepare the user's message in ADK format and store image artifacts
    content = await asyncio.to_thread(
        format_user_request_to_adk_content_and_store_artifacts,
        request=request,
        app_name=APP_NAME,
        artifact_service=app_context.artifact_service,
    )

    final_response_text = "Agent did not produce a final response."  # Default

    # Use the session ID from the request or default if not provided
    session_id = request.session_id
    user_id = request.user_id

    # Create session if it doesn't exist
    if not app_context.session_service.get_session(
        app_name=APP_NAME, user_id=user_id, session_id=session_id
    ):
        app_context.session_service.create_session(
            app_name=APP_NAME, user_id=user_id, session_id=session_id
        )

    try:
        # Process the message with the agent
        # Type annotation: runner.run_async returns an AsyncIterator[Event]
        events_iterator: AsyncIterator[Event] = (
            app_context.expense_manager_agent_runner.run_async(
                user_id=user_id, session_id=session_id, new_message=content
            )
        )
        async for event in events_iterator:  # event has type Event
            # Key Concept: is_final_response() marks the concluding message for the turn
            if event.is_final_response():
                if event.content and event.content.parts:
                    # Extract text from the first part
                    final_response_text = event.content.parts[0].text
                elif event.actions and event.actions.escalate:
                    # Handle potential errors/escalations
                    final_response_text = f"Agent escalated: {event.error_message or 'No specific message.'}"
                break  # Stop processing events once the final response is found

        logger.info(
            "Received final response from agent", raw_final_response=final_response_text
        )

        # Extract and process any attachments and thinking process in the response
        base64_attachments = []
        sanitized_text, attachment_ids = extract_attachment_ids_and_sanitize_response(
            final_response_text
        )
        sanitized_text, thinking_process = extract_thinking_process(sanitized_text)

        # Download images from GCS and replace hash IDs with base64 data
        for image_hash_id in attachment_ids:
            # Download image data and get MIME type
            result = await asyncio.to_thread(
                download_image_from_gcs,
                artifact_service=app_context.artifact_service,
                image_hash=image_hash_id,
                app_name=APP_NAME,
                user_id=user_id,
                session_id=session_id,
            )
            if result:
                base64_data, mime_type = result
                base64_attachments.append(
                    ImageData(serialized_image=base64_data, mime_type=mime_type)
                )

        logger.info(
            "Processed response with attachments",
            sanitized_response=sanitized_text,
            thinking_process=thinking_process,
            attachment_ids=attachment_ids,
        )

        return ChatResponse(
            response=sanitized_text,
            thinking_process=thinking_process,
            attachments=base64_attachments,
        )

    except Exception as e:
        logger.error("Error processing chat request", error_message=str(e))
        return ChatResponse(
            response="", error=f"Error in generating response: {str(e)}"
        )


# Only run the server if this file is executed directly
if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8081)

その後、バックエンド サービスを実行してみます。前の手順でフロントエンド サービスを正しく実行しました。ここでは、新しいターミナルを開いて、このバックエンド サービスを実行してみます。

  1. 新しいターミナルを作成します。下部にあるターミナルに移動し、[+] ボタンをクリックして新しいターミナルを作成します。または、Ctrl+Shift+C キーを押して新しいターミナルを開きます。

3e52a362475553dc.jpeg

  1. その後、作業ディレクトリ personal-expense-assistant に移動して、次のコマンドを実行します。
uv run backend.py
  1. 成功すると、次のような出力が表示されます。
INFO:     Started server process [xxxxx]
INFO:     Waiting for application startup.
INFO:     Application startup complete.
INFO:     Uvicorn running on http://0.0.0.0:8081 (Press CTRL+C to quit)

コードの説明

ADK エージェント、SessionService、ArtifactService の初期化

バックエンド サービスでエージェントを実行するには、SessionService とエージェントの両方を受け取る Runner を作成する必要があります。SessionService は会話履歴と状態を管理するため、Runner と統合すると、エージェントは進行中の会話のコンテキストを受け取ることができます。

また、ArtifactService を使用してアップロードされたファイルを処理します。ADK のセッションアーティファクトについて詳しくは、こちらをご覧ください。

...

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Initialize service contexts during application startup
    app_contexts.session_service = InMemorySessionService()
    app_contexts.artifact_service = GcsArtifactService(
        bucket_name=SETTINGS.STORAGE_BUCKET_NAME
    )
    app_contexts.expense_manager_agent_runner = Runner(
        agent=expense_manager_agent,  # The agent we want to run
        app_name=APP_NAME,  # Associates runs with our app
        session_service=app_contexts.session_service,  # Uses our session manager
        artifact_service=app_contexts.artifact_service,  # Uses our artifact manager
    )

    logger.info("Application started successfully")
    yield
    logger.info("Application shutting down")
    # Perform cleanup during application shutdown if necessary

...

このデモでは、InMemorySessionServiceGcsArtifactService を使用して、エージェント Runner と統合します。会話履歴はメモリに保存されるため、バックエンド サービスが強制終了または再起動されると失われます。これらは FastAPI アプリケーション ライフサイクル内で初期化され、/chat ルートに依存関係として挿入されます。

GcsArtifactService を使用した画像のアップロードとダウンロード

アップロードされたすべてのイメージは GcsArtifactService によってアーティファクトとして保存されます。これは、utils.py 内の format_user_request_to_adk_content_and_store_artifacts 関数で確認できます。

...    

# Prepare the user's message in ADK format and store image artifacts
content = await asyncio.to_thread(
    format_user_request_to_adk_content_and_store_artifacts,
    request=request,
    app_name=APP_NAME,
    artifact_service=app_context.artifact_service,
)

...

エージェント ランナーによって処理されるすべてのリクエストは、types.Content 型にフォーマットする必要があります。この関数内で、各画像データを処理し、ID を抽出して画像 ID プレースホルダに置き換えます。

同様のメカニズムを使用して、正規表現を使用して画像 ID を抽出した後、添付ファイルをダウンロードします。

...
sanitized_text, attachment_ids = extract_attachment_ids_and_sanitize_response(
    final_response_text
)
sanitized_text, thinking_process = extract_thinking_process(sanitized_text)

# Download images from GCS and replace hash IDs with base64 data
for image_hash_id in attachment_ids:
    # Download image data and get MIME type
    result = await asyncio.to_thread(
        download_image_from_gcs,
        artifact_service=app_context.artifact_service,
        image_hash=image_hash_id,
        app_name=APP_NAME,
        user_id=user_id,
        session_id=session_id,
    )
...

10. 統合テスト

これで、複数のサービスが異なる Cloud コンソール タブで実行されるようになります。

  • ポート 8080 で実行されるフロントエンド サービス
* Running on local URL:  http://0.0.0.0:8080

To create a public link, set `share=True` in `launch()`.
  • ポート 8081 で実行されるバックエンド サービス
INFO:     Started server process [xxxxx]
INFO:     Waiting for application startup.
INFO:     Application startup complete.
INFO:     Uvicorn running on http://0.0.0.0:8081 (Press CTRL+C to quit)

現時点では、領収書の画像をアップロードし、ポート 8080 のウェブ アプリケーションからアシスタントとシームレスにチャットできるはずです。

Cloud Shell エディタの上部にある [ウェブでプレビュー] ボタンをクリックし、[ポート 8080 でプレビュー] を選択します。

e7c9f56c2463164.png

アシスタントとやり取りしてみましょう。

次の領収書をダウンロードします。これらの領収書データの期間は 2023 ~ 2024 年で、アシスタントに保存またはアップロードを依頼します。

さまざまな質問をする

  • 「2023 ~ 2024 年の月ごとの経費の内訳を教えて」
  • 「コーヒーの取引の領収書を見せて」
  • 「焼肉ライクのレシート ファイルを送ってください」
  • など。

以下に、成功したインタラクションの例を示します。

f6ba4537438033b2.png

313a43d32b0901ef.png

11. Cloud Run へのデプロイ

もちろん、この素晴らしいアプリにはどこからでもアクセスしたいですよね。そのためには、このアプリケーションをパッケージ化して Cloud Run にデプロイします。このデモでは、このサービスを他のユーザーがアクセスできるパブリック サービスとして公開します。ただし、これは個人的なアプリケーションに適しているため、この種のアプリケーションではベスト プラクティスではありません。

6795e9abf2030334.jpeg

この Codelab では、フロントエンド サービスとバックエンド サービスの両方を 1 つのコンテナに配置します。両方のサービスを管理するには、supervisord が必要です。supervisord.conf ファイルを調べて、supervisord をエントリポイントとして設定した Dockerfile を確認できます。

この時点で、アプリケーションを Cloud Run にデプロイするために必要なファイルはすべて揃っています。デプロイしましょう。Cloud Shell ターミナルに移動し、現在のプロジェクトがアクティブなプロジェクトに構成されていることを確認します。構成されていない場合は、gcloud configure コマンドを使用してプロジェクト ID を設定します。

gcloud config set project [PROJECT_ID]

次に、次のコマンドを実行して Cloud Run にデプロイします。

gcloud run deploy personal-expense-assistant \
                  --source . \
                  --port=8080 \
                  --allow-unauthenticated \
                  --env-vars-file=settings.yaml \
                  --memory 1024Mi \
                  --region us-central1

Docker リポジトリの Artifact Registry の作成を確認するよう求められたら、Y と答えます。これはデモ用アプリケーションであるため、未認証のアクセスを許可しています。エンタープライズ アプリケーションと本番環境のアプリケーションには、適切な認証を使用することをおすすめします。

デプロイが完了すると、次のようなリンクが表示されます。

https://personal-expense-assistant-*******.us-central1.run.app

シークレット ウィンドウまたはモバイル デバイスからアプリを使用してください。すでに公開されているはずです。

12. 課題

データ探索スキルを磨くチャンスです。バックエンドが複数のユーザーに対応できるようにコードを変更する能力はありますか?更新が必要なコンポーネント

13. クリーンアップ

この Codelab で使用したリソースについて、Google Cloud アカウントに課金されないようにするには、次の操作を行います。

  1. Google Cloud コンソールで、[リソースの管理] ページに移動します。
  2. プロジェクト リストで、削除するプロジェクトを選択し、[削除] をクリックします。
  3. ダイアログでプロジェクト ID を入力し、[シャットダウン] をクリックしてプロジェクトを削除します。
  4. または、コンソールで [Cloud Run] に移動し、デプロイしたサービスを選択して削除することもできます。