Agent Development Kit를 통한 멀티모달 전환: Gemini 2.5, Firestore, Cloud Run을 사용한 개인 비용 지원

1. 📖 소개

db9331886978d543.png

개인 지출을 모두 관리하기가 귀찮았던 적이 있나요? 저도 마찬가지고요. 따라서 이 Codelab에서는 Gemini 2.5로 구동되는 개인 비용 관리 어시스턴트를 빌드하여 모든 작업을 대신 처리합니다. 업로드된 영수증을 관리하여 커피를 사는 데 이미 너무 많은 돈을 썼는지 분석할 수 있습니다.

이 도우미는 웹 브라우저를 통해 채팅 웹 인터페이스 형태로 접근 가능하며, 이를 통해 도우미와 소통하고, 영수증 이미지를 업로드하여 도우미에게 저장해 달라고 요청할 수 있으며, 영수증을 검색하여 파일을 가져와 비용 분석을 수행할 수도 있습니다. 이 모든 것은 Google Agent Development Kit 프레임워크를 기반으로 구축되었습니다.

애플리케이션 자체는 프런트엔드와 백엔드라는 두 서비스로 분리되어 있으므로 빠른 프로토타입을 빌드하고 어떤 느낌인지 시도해 볼 수 있으며, API 계약이 어떻게 보이는지 이해하여 두 서비스를 통합할 수도 있습니다.

Codelab을 통해 다음과 같이 단계별 접근 방식을 사용합니다.

  1. Google Cloud 프로젝트를 준비하고 해당 프로젝트에서 필요한 모든 API를 활성화하세요.
  2. Google Cloud Storage에 버킷을 설정하고 Firestore에 데이터베이스를 설정합니다.
  3. Firestore 색인 생성
  4. 코딩 환경을 위한 작업 공간 설정
  5. ADK 에이전트 소스 코드, 도구, 프롬프트 등을 구조화합니다.
  6. ADK 로컬 웹 개발 UI를 사용하여 에이전트 테스트
  7. Gradio 라이브러리를 사용하여 프런트엔드 서비스(채팅 인터페이스)를 빌드하고 일부 쿼리를 보내고 영수증 이미지를 업로드합니다.
  8. FastAPI를 사용하여 백엔드 서비스(HTTP 서버)를 빌드합니다. 여기에는 ADK 에이전트 코드, SessionService 및 Artifact Service가 있습니다.
  9. Cloud Run에 애플리케이션을 배포하는 데 필요한 환경 변수와 필수 파일을 관리합니다.
  10. Cloud Run에 애플리케이션 배포

아키텍처 개요

90805d85052a5e5a.jpeg

기본 요건

  • Python 작업에 능숙함
  • HTTP 서비스를 사용한 기본 풀 스택 아키텍처에 대한 이해

학습할 내용

  • Gradio를 사용한 프런트엔드 웹 프로토타이핑
  • FastAPIPydantic을 사용한 백엔드 서비스 개발
  • 다양한 기능을 활용하면서 ADK 에이전트를 설계합니다.
  • 도구 사용
  • 세션 및 아티팩트 관리
  • Gemini로 전송하기 전 입력 수정을 위한 콜백 활용
  • BuiltInPlanner를 활용하여 계획을 수립하여 작업 실행을 개선합니다.
  • ADK 로컬 웹 인터페이스를 통한 빠른 디버깅
  • ADK 콜백을 사용하여 신속한 엔지니어링 및 Gemini 요청 수정을 통해 정보 구문 분석 및 검색을 통해 다중 모드 상호 작용을 최적화하는 전략
  • Firestore를 벡터 데이터베이스로 사용한 에이전트 검색 증강 생성
  • Pydantic-settings를 사용하여 YAML 파일에서 환경 변수를 관리합니다.
  • Dockerfile을 사용하여 Cloud Run에 애플리케이션을 배포하고 YAML 파일로 환경 변수를 제공합니다.

필요한 항목

  • Chrome 웹브라우저
  • Gmail 계정
  • 결제가 사용 설정된 Cloud 프로젝트

이 Codelab은 초보자를 포함한 모든 수준의 개발자를 대상으로 하며 샘플 애플리케이션에서 Python을 사용합니다. 하지만 제시된 개념을 이해하는 데 파이썬 지식은 필요하지 않습니다.

2. 🚀 시작하기 전에

Cloud Console에서 활성 프로젝트 선택

이 코드랩에서는 이미 결제가 활성화된 Google Cloud 프로젝트가 있다고 가정합니다. 아직 없으시다면 아래 지침에 따라 시작하세요.

  1. Google Cloud 콘솔의 프로젝트 선택기 페이지에서 Google Cloud 프로젝트를 선택하거나 만듭니다.
  2. Cloud 프로젝트에 결제가 사용 설정되어 있어야 하므로 프로젝트에 결제가 사용 설정되어 있는지 확인하는 방법을 알아보세요.

fcdd90149a030bf5.png

Firestore 데이터베이스 준비

다음으로 Firestore 데이터베이스도 만들어야 합니다. 네이티브 모드의 Firestore는 자동 확장, 고성능, 애플리케이션 개발의 용이성을 위해 구축된 NoSQL 문서 데이터베이스입니다. 또한 이는 우리 연구실의 검색 증강 생성(Retrieval Augmented Generation) 기술을 지원할 수 있는 벡터 데이터베이스 역할도 할 수 있습니다.

  1. 검색창에서 'firestore'를 검색하고 Firestore 제품을 클릭합니다.

44bbce791824bed6.png

  1. 그런 다음 Firestore 데이터베이스 만들기 버튼을 클릭합니다.
  2. (기본값)을 데이터베이스 ID 이름으로 사용하고 Standard Edition을 선택된 상태로 유지합니다. 이 랩 데모에서는 Open 보안 규칙과 함께 Firestore Native를 사용합니다.
  1. 또한 이 데이터베이스에는 실제로 무료 계층 사용 YEAY!가 있다는 것을 알 수 있습니다. 그 후, 데이터베이스 생성 버튼을 클릭하세요.

b97d210c465be94c.png

이러한 단계를 거치면 방금 생성한 Firestore 데이터베이스로 리디렉션되어야 합니다.

Cloud Shell 터미널에서 클라우드 프로젝트 설정

  1. bq에 미리 로드되어 있는 Google Cloud에서 실행되는 명령줄 환경인 Cloud Shell을 사용하게 됩니다. Google Cloud 콘솔 상단에서 Cloud Shell 활성화를 클릭합니다.

26f20e837ff06119.png

  1. Cloud Shell에 연결되면 다음 명령을 사용하여 이미 인증되었고 프로젝트가 프로젝트 ID로 설정되었는지 확인합니다.
gcloud auth list
  1. Cloud Shell에서 다음 명령어를 실행하여 gcloud 명령어가 프로젝트를 알고 있는지 확인합니다.
gcloud config list project
  1. 프로젝트가 설정되지 않은 경우 다음 명령어를 사용하여 설정합니다.
gcloud config set project <YOUR_PROJECT_ID>

또는 콘솔에서 PROJECT_ID ID를 볼 수도 있습니다.

bb98435b79995b15.jpeg

클릭하면 오른쪽에 프로젝트 전체와 프로젝트 ID가 표시됩니다.

ffa73dee57de5307.jpeg

  1. 아래 명령을 통해 필요한 API를 활성화하세요. 이 작업에는 몇 분 정도 걸릴 수 있으니 잠시만 기다려 주시기 바랍니다.
gcloud services enable aiplatform.googleapis.com \
                       firestore.googleapis.com \
                       run.googleapis.com \
                       cloudbuild.googleapis.com \
                       cloudresourcemanager.googleapis.com

명령을 성공적으로 실행하면 아래와 비슷한 메시지가 표시됩니다.

Operation "operations/..." finished successfully.

gcloud 명령의 대안은 콘솔을 통해 각 제품을 검색하거나 이 링크를 사용하는 것입니다.

누락된 API가 있으면 구현 과정에서 언제든지 해당 API를 활성화할 수 있습니다.

gcloud 명령어 및 사용법은 문서를 참조하세요.

Google Cloud Storage 버킷 준비

다음으로, 동일한 터미널에서 업로드된 파일을 저장할 GCS 버킷을 준비해야 합니다. 다음 명령을 실행하여 버킷을 생성하세요. 개인 경비 지원 영수증과 관련된 고유하면서도 관련성 있는 버킷 이름이 필요하므로 프로젝트 ID와 결합된 다음 버킷 이름을 활용합니다.

gsutil mb -l us-central1 gs://personal-expense-{your-project-id}

이 출력이 표시됩니다

Creating gs://personal-expense-{your-project-id}

브라우저 왼쪽 상단의 탐색 메뉴로 이동하여 Cloud Storage -> Bucket을 선택하면 이를 확인할 수 있습니다.

7b9fd51982d351fa.png

Firestore는 기본적으로 NoSQL 데이터베이스로, 뛰어난 성능과 데이터 모델의 유연성을 제공하지만 복잡한 쿼리를 처리하는 데는 한계가 있습니다. 복합 다중 필드 쿼리와 벡터 검색을 활용할 계획이므로 먼저 인덱스를 만들어야 합니다. 자세한 내용은 이 문서에서 확인할 수 있습니다.

  1. 복합 쿼리를 지원하는 인덱스를 생성하려면 다음 명령을 실행하세요.
gcloud firestore indexes composite create \
        --collection-group=personal-expense-assistant-receipts \
        --field-config field-path=total_amount,order=ASCENDING \
        --field-config field-path=transaction_time,order=ASCENDING \
        --field-config field-path=__name__,order=ASCENDING \
        --database="(default)"
  1. 벡터 검색을 지원하기 위해 이것을 실행합니다.
gcloud firestore indexes composite create \
        --collection-group="personal-expense-assistant-receipts" \
        --query-scope=COLLECTION \
        --field-config field-path="embedding",vector-config='{"dimension":"768", "flat": "{}"}' \
        --database="(default)"

클라우드 콘솔에서 Firestore를 방문하여 (기본값) 데이터베이스 인스턴스를 클릭하고 탐색 모음에서 인덱스를 선택하면 생성된 인덱스를 확인할 수 있습니다.

9849724dd55dfab7.png

Cloud Shell 편집기로 이동하여 애플리케이션 작업 디렉터리 설정

이제 코드 편집기를 설정하여 코딩 작업을 할 수 있습니다. 이를 위해 Cloud Shell Editor를 사용합니다.

  1. 편집기 열기 버튼을 클릭하면 Cloud Shell 편집기가 열리고 여기에 코드를 작성할 수 있습니다. 168eacea651b086c.png
  2. 다음으로, 셸이 이미 올바른 프로젝트 ID로 구성되었는지도 확인해야 합니다. 터미널에서 $아이콘 앞에 값 ( )이 있는 경우 ( 아래 스크린샷에서 값은 "adk-multimodal-tool") 이 값은 활성 셸 세션에 구성된 프로젝트를 보여줍니다.

10a99ff80839b635.png

표시된 이 이미 올바른 경우 다음 명령건너뛸 수 있습니다. 하지만 올바르지 않거나 누락된 경우 다음 명령을 실행하세요.

gcloud config set project <YOUR_PROJECT_ID>
  1. 다음으로 GitHub에서 이 Codelab의 템플릿 작업 디렉터리를 클론합니다. 다음 명령어를 실행하세요. personal-expense-assistant 디렉터리에 작업 디렉터리가 생성됩니다.
git clone https://github.com/alphinside/personal-expense-assistant-adk-codelab-starter.git personal-expense-assistant
  1. 그런 다음 Cloud Shell 편집기의 상단 섹션으로 이동하여 파일->폴더 열기를 클릭하고 사용자 이름 디렉토리를 찾은 다음 개인 경비 보조원 디렉토리를 찾은 다음 확인 버튼을 클릭합니다. 이렇게 하면 선택한 디렉터리가 기본 작업 디렉터리가 됩니다. 이 예에서 사용자 이름은 alvinprayuda이므로 디렉토리 경로는 아래와 같습니다.

c87d2b76896d0c59.png

524b9e6369f68cca.png

이제 Cloud Shell 편집기가 다음과 같이 표시됩니다.

9a58ccc43f48338d.png

환경 설정

Python 가상 환경 준비

다음 단계는 개발 환경을 준비하는 것입니다. 현재 활성화된 터미널은 personal-expense-assistant 작업 디렉토리에 있어야 합니다. 이 코드랩에서는 Python 3.12를 활용하고 uv python 프로젝트 관리자를 사용하여 Python 버전과 가상 환경을 만들고 관리하는 필요성을 간소화합니다.

  1. 아직 터미널을 열지 않았다면 터미널 -> 새 터미널을 클릭하거나 Ctrl + Shift + C를 사용하여 터미널을 열면 브라우저 하단에 터미널 창이 열립니다.

8635b60ae2f45bbc.jpeg

  1. 이제 uv를 사용하여 가상 환경을 초기화하고 다음 명령을 실행해 보겠습니다.
cd ~/personal-expense-assistant
uv sync --frozen

이렇게 하면 .venv 디렉토리가 생성되고 종속성이 설치됩니다. pyproject.toml을 잠깐 살펴보면 다음과 같이 표시되는 종속성에 대한 정보를 얻을 수 있습니다.

dependencies = [
    "datasets>=3.5.0",
    "google-adk==1.18",
    "google-cloud-firestore>=2.20.1",
    "gradio>=5.23.1",
    "pydantic>=2.10.6",
    "pydantic-settings[yaml]>=2.8.1",
]

설정 구성 파일

이제 이 프로젝트의 구성 파일을 설정해야 합니다. YAML 파일에서 구성을 읽기 위해 pydantic-settings를 사용합니다.

settings.yaml.example 내에 파일 템플릿이 이미 제공되어 있으므로 파일을 복사하고 settings.yaml로 이름을 바꿔야 합니다. 다음 명령어를 실행하여 파일을 만듭니다.

cp settings.yaml.example settings.yaml

그런 다음 다음 값을 파일에 복사합니다.

GCLOUD_LOCATION: "us-central1"
GCLOUD_PROJECT_ID: "your-project-id"
BACKEND_URL: "http://localhost:8081/chat"
STORAGE_BUCKET_NAME: "personal-expense-{your-project-id}"
DB_COLLECTION_NAME: "personal-expense-assistant-receipts"

이 코드랩에서는 GCLOUD_LOCATION, BACKEND_URL,DB_COLLECTION_NAME에 대해 미리 구성된 값을 사용합니다 .

이제 다음 단계로 넘어가 에이전트를 구축한 다음 서비스를 구축할 수 있습니다.

3. 🚀 Google ADK 및 Gemini 2.5를 사용하여 에이전트 빌드

ADK 디렉터리 구조 소개

먼저 ADK에서 제공하는 기능과 에이전트를 빌드하는 방법을 살펴보겠습니다. ADK 전체 문서는 이 URL에서 확인할 수 있습니다 . ADK는 CLI 명령어 실행 내에서 다양한 유틸리티를 제공합니다. 일부 예는 다음과 같습니다.

  • 에이전트 디렉터리 구조 설정
  • CLI 입력 출력을 통해 상호작용을 빠르게 시도해 보세요.
  • 로컬 개발 UI 웹 인터페이스를 빠르게 설정

이제 CLI 명령을 사용하여 에이전트 디렉토리 구조를 만들어 보겠습니다. 다음 명령어를 실행합니다.

uv run adk create expense_manager_agent

메시지가 표시되면 모델 gemini-2.5-flashVertex AI 백엔드를 선택합니다. 그러면 마법사에서 프로젝트 ID와 위치를 묻습니다. Enter 키를 눌러 기본 옵션을 수락하거나 필요에 따라 변경할 수 있습니다. 이 실습의 앞부분에서 만든 올바른 프로젝트 ID를 사용하고 있는지 다시 한번 확인하세요. 출력은 다음과 같습니다.

Choose a model for the root agent:
1. gemini-2.5-flash
2. Other models (fill later)
Choose model (1, 2): 1
1. Google AI
2. Vertex AI
Choose a backend (1, 2): 2

You need an existing Google Cloud account and project, check out this link for details:
https://google.github.io/adk-docs/get-started/quickstart/#gemini---google-cloud-vertex-ai

Enter Google Cloud project ID [going-multimodal-lab]: 
Enter Google Cloud region [us-central1]: 

Agent created in /home/username/personal-expense-assistant/expense_manager_agent:
- .env
- __init__.py
- agent.py

다음과 같은 에이전트 디렉토리 구조가 생성됩니다.

expense_manager_agent/
├── __init__.py
├── .env
├── agent.py

init.pyagent.py를 검사하면 다음 코드가 표시됩니다.

# __init__.py

from . import agent
# agent.py

from google.adk.agents import Agent

root_agent = Agent(
    model='gemini-2.5-flash',
    name='root_agent',
    description='A helpful assistant for user questions.',
    instruction='Answer user questions to the best of your knowledge',
)

이제 다음을 실행하여 테스트할 수 있습니다.

uv run adk run expense_manager_agent

테스트가 끝나면 exit를 입력하거나 Ctrl+D를 눌러 에이전트를 종료할 수 있습니다.

경비 관리 에이전트 구축하기

경비 관리 에이전트를 만들어 보자! expense_manager_agent/agent.py 파일을 열고 root_agent.를 포함할 아래 코드를 복사합니다.

# expense_manager_agent/agent.py

from google.adk.agents import Agent
from expense_manager_agent.tools import (
    store_receipt_data,
    search_receipts_by_metadata_filter,
    search_relevant_receipts_by_natural_language_query,
    get_receipt_data_by_image_id,
)
from expense_manager_agent.callbacks import modify_image_data_in_history
import os
from settings import get_settings
from google.adk.planners import BuiltInPlanner
from google.genai import types

SETTINGS = get_settings()
os.environ["GOOGLE_CLOUD_PROJECT"] = SETTINGS.GCLOUD_PROJECT_ID
os.environ["GOOGLE_CLOUD_LOCATION"] = SETTINGS.GCLOUD_LOCATION
os.environ["GOOGLE_GENAI_USE_VERTEXAI"] = "TRUE"

# Get the code file directory path and read the task prompt file
current_dir = os.path.dirname(os.path.abspath(__file__))
prompt_path = os.path.join(current_dir, "task_prompt.md")
with open(prompt_path, "r") as file:
    task_prompt = file.read()

root_agent = Agent(
    name="expense_manager_agent",
    model="gemini-2.5-flash",
    description=(
        "Personal expense agent to help user track expenses, analyze receipts, and manage their financial records"
    ),
    instruction=task_prompt,
    tools=[
        store_receipt_data,
        get_receipt_data_by_image_id,
        search_receipts_by_metadata_filter,
        search_relevant_receipts_by_natural_language_query,
    ],
    planner=BuiltInPlanner(
        thinking_config=types.ThinkingConfig(
            thinking_budget=2048,
        )
    ),
    before_model_callback=modify_image_data_in_history,
)

코드 설명

이 스크립트에는 다음 항목을 초기화하는 에이전트 초기화가 포함되어 있습니다.

  • 사용할 모델을 gemini-2.5-flash로 설정합니다.
  • task_prompt.md에서 읽어오는 시스템 프롬프트로 에이전트 설명과 요청 사항을 설정합니다.
  • 에이전트 기능을 지원하는 데 필요한 도구를 제공합니다.
  • Gemini 2.5 Flash 사고 기능을 사용하여 최종 응답이나 실행을 생성하기 전에 계획을 수립할 수 있습니다.
  • 예측을 하기 전에 전송되는 이미지 데이터 수를 제한하기 위해 Gemini에 요청을 보내기 전에 콜백 인터셉트를 설정합니다.

4. 🚀 에이전트 도구 구성

비용 관리자 에이전트는 다음 기능을 갖습니다.

  • 영수증 이미지에서 데이터를 추출하고 데이터와 파일을 저장합니다.
  • 비용 데이터에 대한 정확한 검색
  • 비용 데이터에 대한 상황별 검색

따라서 이 기능을 지원하는 적절한 도구가 필요합니다. expense_manager_agent 디렉터리 아래에 새 파일을 만들고 이름을 tools.py로 지정합니다.

touch expense_manager_agent/tools.py

expense_manage_agent/tools.py를 열고 아래 코드를 복사합니다.

# expense_manager_agent/tools.py

import datetime
from typing import Dict, List, Any
from google.cloud import firestore
from google.cloud.firestore_v1.vector import Vector
from google.cloud.firestore_v1 import FieldFilter
from google.cloud.firestore_v1.base_query import And
from google.cloud.firestore_v1.base_vector_query import DistanceMeasure
from settings import get_settings
from google import genai

SETTINGS = get_settings()
DB_CLIENT = firestore.Client(
    project=SETTINGS.GCLOUD_PROJECT_ID
)  # Will use "(default)" database
COLLECTION = DB_CLIENT.collection(SETTINGS.DB_COLLECTION_NAME)
GENAI_CLIENT = genai.Client(
    vertexai=True, location=SETTINGS.GCLOUD_LOCATION, project=SETTINGS.GCLOUD_PROJECT_ID
)
EMBEDDING_DIMENSION = 768
EMBEDDING_FIELD_NAME = "embedding"
INVALID_ITEMS_FORMAT_ERR = """
Invalid items format. Must be a list of dictionaries with 'name', 'price', and 'quantity' keys."""
RECEIPT_DESC_FORMAT = """
Store Name: {store_name}
Transaction Time: {transaction_time}
Total Amount: {total_amount}
Currency: {currency}
Purchased Items:
{purchased_items}
Receipt Image ID: {receipt_id}
"""


def sanitize_image_id(image_id: str) -> str:
    """Sanitize image ID by removing any leading/trailing whitespace."""
    if image_id.startswith("[IMAGE-"):
        image_id = image_id.split("ID ")[1].split("]")[0]

    return image_id.strip()


def store_receipt_data(
    image_id: str,
    store_name: str,
    transaction_time: str,
    total_amount: float,
    purchased_items: List[Dict[str, Any]],
    currency: str = "IDR",
) -> str:
    """
    Store receipt data in the database.

    Args:
        image_id (str): The unique identifier of the image. For example IMAGE-POSITION 0-ID 12345,
            the ID of the image is 12345.
        store_name (str): The name of the store.
        transaction_time (str): The time of purchase, in ISO format ("YYYY-MM-DDTHH:MM:SS.ssssssZ").
        total_amount (float): The total amount spent.
        purchased_items (List[Dict[str, Any]]): A list of items purchased with their prices. Each item must have:
            - name (str): The name of the item.
            - price (float): The price of the item.
            - quantity (int, optional): The quantity of the item. Defaults to 1 if not provided.
        currency (str, optional): The currency of the transaction, can be derived from the store location.
            If unsure, default is "IDR".

    Returns:
        str: A success message with the receipt ID.

    Raises:
        Exception: If the operation failed or input is invalid.
    """
    try:
        # In case of it provide full image placeholder, extract the id string
        image_id = sanitize_image_id(image_id)

        # Check if the receipt already exists
        doc = get_receipt_data_by_image_id(image_id)

        if doc:
            return f"Receipt with ID {image_id} already exists"

        # Validate transaction time
        if not isinstance(transaction_time, str):
            raise ValueError(
                "Invalid transaction time: must be a string in ISO format 'YYYY-MM-DDTHH:MM:SS.ssssssZ'"
            )
        try:
            datetime.datetime.fromisoformat(transaction_time.replace("Z", "+00:00"))
        except ValueError:
            raise ValueError(
                "Invalid transaction time format. Must be in ISO format 'YYYY-MM-DDTHH:MM:SS.ssssssZ'"
            )

        # Validate items format
        if not isinstance(purchased_items, list):
            raise ValueError(INVALID_ITEMS_FORMAT_ERR)

        for _item in purchased_items:
            if (
                not isinstance(_item, dict)
                or "name" not in _item
                or "price" not in _item
            ):
                raise ValueError(INVALID_ITEMS_FORMAT_ERR)

            if "quantity" not in _item:
                _item["quantity"] = 1

        # Create a combined text from all receipt information for better embedding
        result = GENAI_CLIENT.models.embed_content(
            model="text-embedding-004",
            contents=RECEIPT_DESC_FORMAT.format(
                store_name=store_name,
                transaction_time=transaction_time,
                total_amount=total_amount,
                currency=currency,
                purchased_items=purchased_items,
                receipt_id=image_id,
            ),
        )

        embedding = result.embeddings[0].values

        doc = {
            "receipt_id": image_id,
            "store_name": store_name,
            "transaction_time": transaction_time,
            "total_amount": total_amount,
            "currency": currency,
            "purchased_items": purchased_items,
            EMBEDDING_FIELD_NAME: Vector(embedding),
        }

        COLLECTION.add(doc)

        return f"Receipt stored successfully with ID: {image_id}"
    except Exception as e:
        raise Exception(f"Failed to store receipt: {str(e)}")


def search_receipts_by_metadata_filter(
    start_time: str,
    end_time: str,
    min_total_amount: float = -1.0,
    max_total_amount: float = -1.0,
) -> str:
    """
    Filter receipts by metadata within a specific time range and optionally by amount.

    Args:
        start_time (str): The start datetime for the filter (in ISO format, e.g. 'YYYY-MM-DDTHH:MM:SS.ssssssZ').
        end_time (str): The end datetime for the filter (in ISO format, e.g. 'YYYY-MM-DDTHH:MM:SS.ssssssZ').
        min_total_amount (float): The minimum total amount for the filter (inclusive). Defaults to -1.
        max_total_amount (float): The maximum total amount for the filter (inclusive). Defaults to -1.

    Returns:
        str: A string containing the list of receipt data matching all applied filters.

    Raises:
        Exception: If the search failed or input is invalid.
    """
    try:
        # Validate start and end times
        if not isinstance(start_time, str) or not isinstance(end_time, str):
            raise ValueError("start_time and end_time must be strings in ISO format")
        try:
            datetime.datetime.fromisoformat(start_time.replace("Z", "+00:00"))
            datetime.datetime.fromisoformat(end_time.replace("Z", "+00:00"))
        except ValueError:
            raise ValueError("start_time and end_time must be strings in ISO format")

        # Start with the base collection reference
        query = COLLECTION

        # Build the composite query by properly chaining conditions
        # Notes that this demo assume 1 user only,
        # need to refactor the query for multiple user
        filters = [
            FieldFilter("transaction_time", ">=", start_time),
            FieldFilter("transaction_time", "<=", end_time),
        ]

        # Add optional filters
        if min_total_amount != -1:
            filters.append(FieldFilter("total_amount", ">=", min_total_amount))

        if max_total_amount != -1:
            filters.append(FieldFilter("total_amount", "<=", max_total_amount))

        # Apply the filters
        composite_filter = And(filters=filters)
        query = query.where(filter=composite_filter)

        # Execute the query and collect results
        search_result_description = "Search by Metadata Results:\n"
        for doc in query.stream():
            data = doc.to_dict()
            data.pop(
                EMBEDDING_FIELD_NAME, None
            )  # Remove embedding as it's not needed for display

            search_result_description += f"\n{RECEIPT_DESC_FORMAT.format(**data)}"

        return search_result_description
    except Exception as e:
        raise Exception(f"Error filtering receipts: {str(e)}")


def search_relevant_receipts_by_natural_language_query(
    query_text: str, limit: int = 5
) -> str:
    """
    Search for receipts with content most similar to the query using vector search.
    This tool can be use for user query that is difficult to translate into metadata filters.
    Such as store name or item name which sensitive to string matching.
    Use this tool if you cannot utilize the search by metadata filter tool.

    Args:
        query_text (str): The search text (e.g., "coffee", "dinner", "groceries").
        limit (int, optional): Maximum number of results to return (default: 5).

    Returns:
        str: A string containing the list of contextually relevant receipt data.

    Raises:
        Exception: If the search failed or input is invalid.
    """
    try:
        # Generate embedding for the query text
        result = GENAI_CLIENT.models.embed_content(
            model="text-embedding-004", contents=query_text
        )
        query_embedding = result.embeddings[0].values

        # Notes that this demo assume 1 user only,
        # need to refactor the query for multiple user
        vector_query = COLLECTION.find_nearest(
            vector_field=EMBEDDING_FIELD_NAME,
            query_vector=Vector(query_embedding),
            distance_measure=DistanceMeasure.EUCLIDEAN,
            limit=limit,
        )

        # Execute the query and collect results
        search_result_description = "Search by Contextual Relevance Results:\n"
        for doc in vector_query.stream():
            data = doc.to_dict()
            data.pop(
                EMBEDDING_FIELD_NAME, None
            )  # Remove embedding as it's not needed for display
            search_result_description += f"\n{RECEIPT_DESC_FORMAT.format(**data)}"

        return search_result_description
    except Exception as e:
        raise Exception(f"Error searching receipts: {str(e)}")


def get_receipt_data_by_image_id(image_id: str) -> Dict[str, Any]:
    """
    Retrieve receipt data from the database using the image_id.

    Args:
        image_id (str): The unique identifier of the receipt image. For example, if the placeholder is
            [IMAGE-ID 12345], the ID to use is 12345.

    Returns:
        Dict[str, Any]: A dictionary containing the receipt data with the following keys:
            - receipt_id (str): The unique identifier of the receipt image.
            - store_name (str): The name of the store.
            - transaction_time (str): The time of purchase in UTC.
            - total_amount (float): The total amount spent.
            - currency (str): The currency of the transaction.
            - purchased_items (List[Dict[str, Any]]): List of items purchased with their details.
        Returns an empty dictionary if no receipt is found.
    """
    # In case of it provide full image placeholder, extract the id string
    image_id = sanitize_image_id(image_id)

    # Query the receipts collection for documents with matching receipt_id (image_id)
    # Notes that this demo assume 1 user only,
    # need to refactor the query for multiple user
    query = COLLECTION.where(filter=FieldFilter("receipt_id", "==", image_id)).limit(1)
    docs = list(query.stream())

    if not docs:
        return {}

    # Get the first matching document
    doc_data = docs[0].to_dict()
    doc_data.pop(EMBEDDING_FIELD_NAME, None)

    return doc_data

코드 설명

이 도구 기능 구현에서는 다음 2가지 주요 아이디어를 중심으로 도구를 설계합니다.

  • 이미지 ID 문자열 자리 표시자 [IMAGE-ID <hash-of-image-1>]를 사용하여 영수증 데이터를 구문 분석하고 원본 파일에 매핑합니다.
  • Firestore 데이터베이스를 사용한 데이터 저장 및 검색

'store_receipt_data' 도구

747fb55e801455f4.png

이 도구는 광학 문자 인식 도구로, 이미지 데이터에서 필수 정보를 파싱하고 이미지 ID 문자열을 인식하여 Firestore 데이터베이스에 저장되도록 함께 매핑합니다.

또한 이 도구는 text-embedding-004를 사용하여 영수증의 콘텐츠를 임베딩으로 변환하므로 모든 메타데이터와 임베딩이 함께 저장되고 색인이 생성됩니다. 쿼리 또는 컨텍스트 검색으로 검색할 수 있는 유연성을 지원합니다.

이 도구를 성공적으로 실행하면 아래와 같이 Firestore 데이터베이스에 이미 색인이 생성된 영수증 데이터를 확인할 수 있습니다.

636d56be9880f3c7.png

도구 "메타데이터_필터_로_영수증_검색"

6d8fbd9b43ff7ea7.png

이 도구는 사용자 쿼리를 날짜 범위 또는 총 거래별 검색을 지원하는 메타데이터 쿼리 필터로 변환합니다. 일치하는 모든 영수증 데이터를 반환합니다. 이 과정에서 에이전트가 맥락을 이해하는 데 필요하지 않으므로 삽입 필드는 삭제됩니다.

'search_relevant_receipts_by_natural_language_query' 도구

7262c75114af0060.png

이것이 검색 증강 생성 (RAG) 도구입니다. Google Cloud의 에이전트는 벡터 데이터베이스에서 관련 영수증을 가져오기 위한 자체 쿼리를 설계할 수 있으며 이 도구를 사용할 시점을 선택할 수도 있습니다. 에이전트가 이 RAG 도구를 사용할지 여부를 독립적으로 결정하고 자체 쿼리를 설계하도록 허용한다는 개념은 에이전트 RAG 접근 방식의 정의 중 하나입니다.

AI 모델이 자체 쿼리를 빌드할 수 있을 뿐만 아니라 검색할 관련 문서 수를 선택할 수도 있습니다. 적절한 프롬프트 엔지니어링과 결합하면 다음과 같은 결과를 얻을 수 있습니다.

# Example prompt

Always filter the result from tool
search_relevant_receipts_by_natural_language_query as the returned 
result may contain irrelevant information

이를 통해 이 도구는 거의 모든 것을 검색할 수 있는 강력한 도구가 되지만, 최근접 이웃 검색의 비정확한 특성으로 인해 예상한 모든 결과를 반환하지는 못할 수 있습니다.

5. 🚀 콜백을 통한 대화 컨텍스트 수정

Google ADK를 사용하면 다양한 수준에서 에이전트 런타임을 '인터셉트'할 수 있습니다. 이 상세 기능에 관한 자세한 내용은 이 문서를 참고하세요 . 이 실습에서는 before_model_callback를 활용하여 LLM으로 전송되기 전에 요청을 수정하여 효율성을 위해 이전 대화 기록 컨텍스트에서 이미지 데이터를 삭제합니다 ( 마지막 3개의 사용자 상호작용에만 이미지 데이터 포함).

하지만 필요할 때 에이전트가 이미지 데이터 컨텍스트를 갖기를 원합니다. 따라서 대화에서 각 이미지 바이트 데이터 뒤에 문자열 이미지 ID 플레이스홀더를 추가하는 메커니즘을 추가합니다. 이를 통해 에이전트는 이미지 ID를 실제 파일 데이터에 연결할 수 있으며, 이는 이미지를 저장하거나 검색할 때 모두 활용할 수 있습니다. 구조는 다음과 같습니다.

<image-byte-data-1>
[IMAGE-ID <hash-of-image-1>]
<image-byte-data-2>
[IMAGE-ID <hash-of-image-2>]
And so on..

대화 기록에서 바이트 데이터가 더 이상 사용되지 않더라도 문자열 식별자는 여전히 존재하여 도구를 사용하여 데이터에 액세스할 수 있습니다. 이미지 데이터 제거 후의 예시 히스토리 구조

[IMAGE-ID <hash-of-image-1>]
[IMAGE-ID <hash-of-image-2>]
And so on..

시작 expense_manager_agent 디렉토리 아래에 새 파일을 만들고 이름을 callbacks.py로 지정합니다.

touch expense_manager_agent/callbacks.py

expense_manager_agent/callbacks.py 파일을 열고 아래 코드를 복사하세요.

# expense_manager_agent/callbacks.py

import hashlib
from google.genai import types
from google.adk.agents.callback_context import CallbackContext
from google.adk.models.llm_request import LlmRequest


def modify_image_data_in_history(
    callback_context: CallbackContext, llm_request: LlmRequest
) -> None:
    # The following code will modify the request sent to LLM
    # We will only keep image data in the last 3 user messages using a reverse and counter approach

    # Count how many user messages we've processed
    user_message_count = 0

    # Process the reversed list
    for content in reversed(llm_request.contents):
        # Only count for user manual query, not function call
        if (content.role == "user") and (content.parts[0].function_response is None):
            user_message_count += 1
            modified_content_parts = []

            # Check any missing image ID placeholder for any image data
            # Then remove image data from conversation history if more than 3 user messages
            for idx, part in enumerate(content.parts):
                if part.inline_data is None:
                    modified_content_parts.append(part)
                    continue

                if (
                    (idx + 1 >= len(content.parts))
                    or (content.parts[idx + 1].text is None)
                    or (not content.parts[idx + 1].text.startswith("[IMAGE-ID "))
                ):
                    # Generate hash ID for the image and add a placeholder
                    image_data = part.inline_data.data
                    hasher = hashlib.sha256(image_data)
                    image_hash_id = hasher.hexdigest()[:12]
                    placeholder = f"[IMAGE-ID {image_hash_id}]"

                    # Only keep image data in the last 3 user messages
                    if user_message_count <= 3:
                        modified_content_parts.append(part)

                    modified_content_parts.append(types.Part(text=placeholder))

                else:
                    # Only keep image data in the last 3 user messages
                    if user_message_count <= 3:
                        modified_content_parts.append(part)

            # This will modify the contents inside the llm_request
            content.parts = modified_content_parts

6. 🚀 프롬프트

복잡한 상호작용과 기능을 갖춘 에이전트를 설계하려면 에이전트가 원하는 대로 동작할 수 있도록 안내하는 충분히 좋은 프롬프트를 찾아야 합니다.

이전에는 대화 내역의 이미지 데이터를 처리하는 방법에 대한 메커니즘이 있었고, search_relevant_receipts_by_natural_language_query.와 같이 사용하기 쉽지 않은 도구도 있었습니다. 또한 상담원이 올바른 영수증 이미지를 검색하여 가져올 수 있도록 하려고 합니다. 이는 우리가 이 모든 정보를 적절한 신속한 구조로 적절하게 전달해야 한다는 것을 의미합니다.

우리는 에이전트에게 사고 과정, 최종 응답 및 첨부 파일 ( 있는 경우)을 분석하기 위해 다음과 같은 마크다운 형식으로 출력을 구성하도록 요청할 것입니다.

# THINKING PROCESS

Thinking process here

# FINAL RESPONSE

Response to the user here

Attachments put inside json block

{
    "attachments": [
      "[IMAGE-ID <hash-id-1>]",
      "[IMAGE-ID <hash-id-2>]",
      ...
    ]
}

경비 관리자 에이전트의 행동에 대한 우리의 초기 기대를 달성하기 위해 다음 프롬프트부터 시작해 보겠습니다. task_prompt.md 파일은 기존 작업 디렉토리에 이미 있어야 하지만, 이를 expense_manager_agent 디렉토리 아래로 옮겨야 합니다. 다음 명령을 실행하여 이동하세요.

mv task_prompt.md expense_manager_agent/task_prompt.md

7. 🚀 에이전트 테스트

이제 CLI를 통해 에이전트와 통신을 시도해 보겠습니다. 다음 명령을 실행하세요.

uv run adk run expense_manager_agent

다음과 같은 출력이 표시되며, 여기서 에이전트와 번갈아 채팅할 수 있습니다. 하지만 이 인터페이스를 통해서는 텍스트만 전송할 수 있습니다.

Log setup complete: /tmp/agents_log/agent.xxxx_xxx.log
To access latest log: tail -F /tmp/agents_log/agent.latest.log
Running agent root_agent, type exit to exit.
user: hello
[root_agent]: Hello there! How can I help you today?
user: 

이제 ADK를 사용하면 CLI 상호 작용 외에도 개발 UI를 통해 상호 작용하고 상호 작용 중에 무슨 일이 일어나는지 검사할 수 있습니다. 다음 명령어를 실행하여 로컬 개발 UI 서버를 시작합니다.

uv run adk web --port 8080

다음 예제와 같은 출력이 생성됩니다. 즉, 이미 웹 인터페이스에 액세스할 수 있다는 의미입니다.

INFO:     Started server process [xxxx]
INFO:     Waiting for application startup.

+-----------------------------------------------------------------------------+
| ADK Web Server started                                                      |
|                                                                             |
| For local testing, access at http://localhost:8080.                         |
+-----------------------------------------------------------------------------+

INFO:     Application startup complete.
INFO:     Uvicorn running on http://0.0.0.0:8080 (Press CTRL+C to quit)

이제 이를 확인하려면 Cloud Shell 편집기의 상단 영역에 있는 웹 미리보기 버튼을 클릭하고 포트 8080에서 미리보기를 선택합니다.

edc73e971b9fc60c.png

다음 웹 페이지가 표시되며, 왼쪽 상단 드롭다운 버튼에서 사용 가능한 에이전트를 선택할 수 있습니다 ( 이 경우 expense_manager_agent). 그리고 봇과 상호 작용할 수 있습니다. 왼쪽 창에는 에이전트 런타임 중 로그 세부정보에 관한 많은 정보가 표시됩니다.

16c333a4b782eeba.png

몇 가지 작업을 시도해 보겠습니다. 다음 2개의 영수증 예시를 업로드하세요 ( 출처 : 포옹하는 얼굴 데이터 세트 mousserlane/id_receipt_dataset) . 각 이미지를 마우스 오른쪽 버튼으로 클릭하고 이미지를 다른 이름으로 저장을 선택합니다. ( 이렇게 하면 영수증 이미지가 다운로드됩니다.) 그런 다음 "클립" 아이콘을 클릭하여 봇에 파일을 업로드하고 이 영수증을 저장하고 싶다고 말합니다.

2975b3452e0ac0bd.png 143a2e147a18fc38.png

그 후 다음 쿼리를 시도하여 검색이나 파일 검색을 수행해 보세요.

  • "2023년 동안의 지출 내역과 총액을 기재해 주십시오."
  • 'Indomaret의 영수증 파일을 줘'

일부 도구를 사용하면 개발 UI에서 무슨 일이 일어나고 있는지 검사할 수 있습니다.

da461a67b7d81ad5.png

에이전트가 어떻게 대답하는지 확인하고 task_prompt.py 내 프롬프트에 제공된 모든 규칙을 준수하는지 확인합니다. 축하합니다! 이제 완전하게 작동하는 개발 에이전트가 준비되었습니다.

이제 적절하고 멋진 UI와 이미지 파일을 업로드하고 다운로드하는 기능을 사용하여 이를 완성할 차례입니다.

8. 🚀 Gradio를 사용하여 프런트엔드 서비스 빌드

우리는 이와 같은 채팅 웹 인터페이스를 구축할 것입니다.

db9331886978d543.png

여기에는 사용자가 텍스트를 보내고 영수증 이미지 파일을 업로드할 수 있는 입력란이 있는 채팅 인터페이스가 포함되어 있습니다.

Gradio를 사용하여 프런트엔드 서비스를 구축하겠습니다.

새 파일을 만들고 이름을 frontend.py로 지정합니다.

touch frontend.py

그런 다음 다음 코드를 복사하여 저장하세요.

import mimetypes
import gradio as gr
import requests
import base64
from typing import List, Dict, Any
from settings import get_settings
from PIL import Image
import io
from schema import ImageData, ChatRequest, ChatResponse


SETTINGS = get_settings()


def encode_image_to_base64_and_get_mime_type(image_path: str) -> ImageData:
    """Encode a file to base64 string and get MIME type.

    Reads an image file and returns the base64-encoded image data and its MIME type.

    Args:
        image_path: Path to the image file to encode.

    Returns:
        ImageData object containing the base64 encoded image data and its MIME type.
    """
    # Read the image file
    with open(image_path, "rb") as file:
        image_content = file.read()

    # Get the mime type
    mime_type = mimetypes.guess_type(image_path)[0]

    # Base64 encode the image
    base64_data = base64.b64encode(image_content).decode("utf-8")

    # Return as ImageData object
    return ImageData(serialized_image=base64_data, mime_type=mime_type)


def decode_base64_to_image(base64_data: str) -> Image.Image:
    """Decode a base64 string to PIL Image.

    Converts a base64-encoded image string back to a PIL Image object
    that can be displayed or processed further.

    Args:
        base64_data: Base64 encoded string of the image.

    Returns:
        PIL Image object of the decoded image.
    """
    # Decode the base64 string and convert to PIL Image
    image_data = base64.b64decode(base64_data)
    image_buffer = io.BytesIO(image_data)
    image = Image.open(image_buffer)

    return image


def get_response_from_llm_backend(
    message: Dict[str, Any],
    history: List[Dict[str, Any]],
) -> List[str | gr.Image]:
    """Send the message and history to the backend and get a response.

    Args:
        message: Dictionary containing the current message with 'text' and optional 'files' keys.
        history: List of previous message dictionaries in the conversation.

    Returns:
        List containing text response and any image attachments from the backend service.
    """
    # Extract files and convert to base64
    image_data = []
    if uploaded_files := message.get("files", []):
        for file_path in uploaded_files:
            image_data.append(encode_image_to_base64_and_get_mime_type(file_path))

    # Prepare the request payload
    payload = ChatRequest(
        text=message["text"],
        files=image_data,
        session_id="default_session",
        user_id="default_user",
    )

    # Send request to backend
    try:
        response = requests.post(SETTINGS.BACKEND_URL, json=payload.model_dump())
        response.raise_for_status()  # Raise exception for HTTP errors

        result = ChatResponse(**response.json())
        if result.error:
            return [f"Error: {result.error}"]

        chat_responses = []

        if result.thinking_process:
            chat_responses.append(
                gr.ChatMessage(
                    role="assistant",
                    content=result.thinking_process,
                    metadata={"title": "🧠 Thinking Process"},
                )
            )

        chat_responses.append(gr.ChatMessage(role="assistant", content=result.response))

        if result.attachments:
            for attachment in result.attachments:
                image_data = attachment.serialized_image
                chat_responses.append(gr.Image(decode_base64_to_image(image_data)))

        return chat_responses
    except requests.exceptions.RequestException as e:
        return [f"Error connecting to backend service: {str(e)}"]


if __name__ == "__main__":
    demo = gr.ChatInterface(
        get_response_from_llm_backend,
        title="Personal Expense Assistant",
        description="This assistant can help you to store receipts data, find receipts, and track your expenses during certain period.",
        type="messages",
        multimodal=True,
        textbox=gr.MultimodalTextbox(file_count="multiple", file_types=["image"]),
    )

    demo.launch(
        server_name="0.0.0.0",
        server_port=8080,
    )

그 후, 다음 명령을 사용하여 프런트엔드 서비스를 실행해 볼 수 있습니다. main.py 파일의 이름을 frontend.py로 바꾸는 것을 잊지 마세요.

uv run frontend.py

클라우드 콘솔에 다음과 비슷한 출력이 표시됩니다.

* Running on local URL:  http://0.0.0.0:8080

To create a public link, set `share=True` in `launch()`.

그런 다음 로컬 URL 링크를 Ctrl+클릭하면 웹 인터페이스를 확인할 수 있습니다. 또는 Cloud Editor의 오른쪽 상단에 있는 웹 미리보기 버튼을 클릭하고 포트 8080에서 미리보기를 선택하여 프런트엔드 애플리케이션에 액세스할 수도 있습니다.

b477bc3c686a5fc3.jpeg

웹 인터페이스가 표시되지만 아직 설정되지 않은 백엔드 서비스로 인해 채팅을 제출하려고 하면 예상 오류가 발생합니다.

b5de2f284155dac2.png

이제 서비스를 실행하고 아직 종료하지 마세요. 다른 터미널 탭에서 백엔드 서비스를 실행합니다.

코드 설명

이 프런트엔드 코드에서는 먼저 사용자가 텍스트를 보내고 여러 개의 파일을 업로드할 수 있도록 합니다. Gradio를 사용하면 gr.ChatInterface 메서드와 gr.MultimodalTextbox를 결합하여 이러한 종류의 기능을 만들 수 있습니다.

이제 파일과 텍스트를 백엔드로 전송하기 전에 백엔드에 필요한 파일의 MIME 유형을 파악해야 합니다. 이미지 파일 바이트를 base64로 인코딩하고 mimetype과 함께 전송해야 합니다.

class ImageData(BaseModel):
    """Model for image data with hash identifier.

    Attributes:
        serialized_image: Optional Base64 encoded string of the image content.
        mime_type: MIME type of the image.
    """

    serialized_image: str
    mime_type: str

프런트엔드-백엔드 상호작용에 사용되는 스키마는 schema.py에 정의되어 있습니다. 스키마에서 데이터 검증을 시행하기 위해 Pydantic BaseModel을 활용합니다.

응답을 받을 때, 우리는 이미 어떤 부분이 사고과정인지, 어떤 부분이 최종 응답인지, 어떤 부분이 애착인지 구분합니다. 따라서 Gradio 구성 요소를 활용하여 UI 구성 요소와 함께 각 구성 요소를 표시할 수 있습니다.

class ChatResponse(BaseModel):
    """Model for a chat response.

    Attributes:
        response: The text response from the model.
        thinking_process: Optional thinking process of the model.
        attachments: List of image data to be displayed to the user.
        error: Optional error message if something went wrong.
    """

    response: str
    thinking_process: str = ""
    attachments: List[ImageData] = []
    error: Optional[str] = None

9. 🚀 FastAPI를 사용하여 백엔드 서비스 빌드

다음으로, 에이전트 런타임을 실행하기 위해 다른 구성 요소와 함께 에이전트를 초기화할 수 있는 백엔드를 구축해야 합니다.

새 파일을 만들고 이름을 backend.py로 지정합니다.

touch backend.py

다음 코드를 복사합니다.

from expense_manager_agent.agent import root_agent as expense_manager_agent
from google.adk.sessions import InMemorySessionService
from google.adk.runners import Runner
from google.adk.events import Event
from fastapi import FastAPI, Body, Depends
from typing import AsyncIterator
from types import SimpleNamespace
import uvicorn
from contextlib import asynccontextmanager
from utils import (
    extract_attachment_ids_and_sanitize_response,
    download_image_from_gcs,
    extract_thinking_process,
    format_user_request_to_adk_content_and_store_artifacts,
)
from schema import ImageData, ChatRequest, ChatResponse
import logger
from google.adk.artifacts import GcsArtifactService
from settings import get_settings

SETTINGS = get_settings()
APP_NAME = "expense_manager_app"


# Application state to hold service contexts
class AppContexts(SimpleNamespace):
    """A class to hold application contexts with attribute access"""

    session_service: InMemorySessionService = None
    artifact_service: GcsArtifactService = None
    expense_manager_agent_runner: Runner = None


# Initialize application state
app_contexts = AppContexts()


@asynccontextmanager
async def lifespan(app: FastAPI):
    # Initialize service contexts during application startup
    app_contexts.session_service = InMemorySessionService()
    app_contexts.artifact_service = GcsArtifactService(
        bucket_name=SETTINGS.STORAGE_BUCKET_NAME
    )
    app_contexts.expense_manager_agent_runner = Runner(
        agent=expense_manager_agent,  # The agent we want to run
        app_name=APP_NAME,  # Associates runs with our app
        session_service=app_contexts.session_service,  # Uses our session manager
        artifact_service=app_contexts.artifact_service,  # Uses our artifact manager
    )

    logger.info("Application started successfully")
    yield
    logger.info("Application shutting down")
    # Perform cleanup during application shutdown if necessary


# Helper function to get application state as a dependency
async def get_app_contexts() -> AppContexts:
    return app_contexts


# Create FastAPI app
app = FastAPI(title="Personal Expense Assistant API", lifespan=lifespan)


@app.post("/chat", response_model=ChatResponse)
async def chat(
    request: ChatRequest = Body(...),
    app_context: AppContexts = Depends(get_app_contexts),
) -> ChatResponse:
    """Process chat request and get response from the agent"""

    # Prepare the user's message in ADK format and store image artifacts
    content = await format_user_request_to_adk_content_and_store_artifacts(
        request=request,
        app_name=APP_NAME,
        artifact_service=app_context.artifact_service,
    )

    final_response_text = "Agent did not produce a final response."  # Default

    # Use the session ID from the request or default if not provided
    session_id = request.session_id
    user_id = request.user_id

    # Create session if it doesn't exist
    if not await app_context.session_service.get_session(
        app_name=APP_NAME, user_id=user_id, session_id=session_id
    ):
        await app_context.session_service.create_session(
            app_name=APP_NAME, user_id=user_id, session_id=session_id
        )

    try:
        # Process the message with the agent
        # Type annotation: runner.run_async returns an AsyncIterator[Event]
        events_iterator: AsyncIterator[Event] = (
            app_context.expense_manager_agent_runner.run_async(
                user_id=user_id, session_id=session_id, new_message=content
            )
        )
        async for event in events_iterator:  # event has type Event
            # Key Concept: is_final_response() marks the concluding message for the turn
            if event.is_final_response():
                if event.content and event.content.parts:
                    # Extract text from the first part
                    final_response_text = event.content.parts[0].text
                elif event.actions and event.actions.escalate:
                    # Handle potential errors/escalations
                    final_response_text = f"Agent escalated: {event.error_message or 'No specific message.'}"
                break  # Stop processing events once the final response is found

        logger.info(
            "Received final response from agent", raw_final_response=final_response_text
        )

        # Extract and process any attachments and thinking process in the response
        base64_attachments = []
        sanitized_text, attachment_ids = extract_attachment_ids_and_sanitize_response(
            final_response_text
        )
        sanitized_text, thinking_process = extract_thinking_process(sanitized_text)

        # Download images from GCS and replace hash IDs with base64 data
        for image_hash_id in attachment_ids:
            # Download image data and get MIME type
            result = await download_image_from_gcs(
                artifact_service=app_context.artifact_service,
                image_hash=image_hash_id,
                app_name=APP_NAME,
                user_id=user_id,
                session_id=session_id,
            )
            if result:
                base64_data, mime_type = result
                base64_attachments.append(
                    ImageData(serialized_image=base64_data, mime_type=mime_type)
                )

        logger.info(
            "Processed response with attachments",
            sanitized_response=sanitized_text,
            thinking_process=thinking_process,
            attachment_ids=attachment_ids,
        )

        return ChatResponse(
            response=sanitized_text,
            thinking_process=thinking_process,
            attachments=base64_attachments,
        )

    except Exception as e:
        logger.error("Error processing chat request", error_message=str(e))
        return ChatResponse(
            response="", error=f"Error in generating response: {str(e)}"
        )


# Only run the server if this file is executed directly
if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8081)

그런 다음 백엔드 서비스를 실행해 볼 수 있습니다. 이전 단계에서 프런트엔드 서비스를 실행했으므로 이제 새 터미널을 열고 이 백엔드 서비스를 실행해야 합니다.

  1. 새 터미널을 만듭니다. 하단 영역의 터미널로 이동하여 '+' 버튼을 찾아 새 터미널을 만듭니다. 또는 Ctrl + Shift + C를 눌러 새 터미널을 열 수 있습니다.

235e2f9144d82803.jpeg

  1. 그런 다음 작업 디렉터리 personal-expense-assistant에 있는지 확인하고 다음 명령어를 실행합니다.
uv run backend.py
  1. 성공하면 다음과 같은 출력이 표시됩니다.
INFO:     Started server process [xxxxx]
INFO:     Waiting for application startup.
INFO:     Application startup complete.
INFO:     Uvicorn running on http://0.0.0.0:8081 (Press CTRL+C to quit)

코드 설명

ADK 에이전트, SessionService 및 ArtifactService 초기화

백엔드 서비스에서 에이전트를 실행하려면 SessionService와 에이전트를 모두 사용하는 Runner를 만들어야 합니다. SessionService는 대화 기록과 상태를 관리하므로 Runner와 통합되면 에이전트가 진행 중인 대화 컨텍스트를 수신할 수 있습니다.

업로드된 파일을 처리하기 위해 ArtifactService도 활용합니다. ADK 세션아티팩트에 관한 자세한 내용은 여기를 참고하세요.

...

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Initialize service contexts during application startup
    app_contexts.session_service = InMemorySessionService()
    app_contexts.artifact_service = GcsArtifactService(
        bucket_name=SETTINGS.STORAGE_BUCKET_NAME
    )
    app_contexts.expense_manager_agent_runner = Runner(
        agent=expense_manager_agent,  # The agent we want to run
        app_name=APP_NAME,  # Associates runs with our app
        session_service=app_contexts.session_service,  # Uses our session manager
        artifact_service=app_contexts.artifact_service,  # Uses our artifact manager
    )

    logger.info("Application started successfully")
    yield
    logger.info("Application shutting down")
    # Perform cleanup during application shutdown if necessary

...

이 데모에서는 InMemorySessionServiceGcsArtifactService를 사용하여 에이전트 Runner와 통합합니다. 대화 기록은 메모리에 저장되므로 백엔드 서비스가 종료되거나 다시 시작되면 손실됩니다. FastAPI 애플리케이션 수명 주기 내에서 이를 초기화하여 /chat 경로에 종속 항목으로 삽입합니다.

GcsArtifactService를 사용하여 이미지 업로드 및 다운로드

업로드된 모든 이미지는 GcsArtifactService에 의해 아티팩트로 저장됩니다. 이는 utils.py 내부의 format_user_request_to_adk_content_and_store_artifacts 함수에서 확인할 수 있습니다.

...    

# Prepare the user's message in ADK format and store image artifacts
content = await asyncio.to_thread(
    format_user_request_to_adk_content_and_store_artifacts,
    request=request,
    app_name=APP_NAME,
    artifact_service=app_context.artifact_service,
)

...

에이전트 러너에서 처리할 모든 요청은 types.Content 유형으로 포맷해야 합니다. 함수 내에서 각 이미지 데이터를 처리하고 이미지 ID 자리표시자로 대체할 ID를 추출합니다.

정규식을 사용하여 이미지 ID를 추출한 후 첨부 파일을 다운로드하는 데에도 비슷한 메커니즘이 사용됩니다.

...
sanitized_text, attachment_ids = extract_attachment_ids_and_sanitize_response(
    final_response_text
)
sanitized_text, thinking_process = extract_thinking_process(sanitized_text)

# Download images from GCS and replace hash IDs with base64 data
for image_hash_id in attachment_ids:
    # Download image data and get MIME type
    result = await asyncio.to_thread(
        download_image_from_gcs,
        artifact_service=app_context.artifact_service,
        image_hash=image_hash_id,
        app_name=APP_NAME,
        user_id=user_id,
        session_id=session_id,
    )
...

10. 🚀 통합 테스트

이제 여러 클라우드 콘솔 탭에서 여러 서비스를 실행할 수 있습니다.

  • 포트 8080에서 실행되는 프런트엔드 서비스
* Running on local URL:  http://0.0.0.0:8080

To create a public link, set `share=True` in `launch()`.
  • 포트 8081에서 실행되는 백엔드 서비스
INFO:     Started server process [xxxxx]
INFO:     Waiting for application startup.
INFO:     Application startup complete.
INFO:     Uvicorn running on http://0.0.0.0:8081 (Press CTRL+C to quit)

현재 상태에서는 영수증 이미지를 업로드하고 포트 8080의 웹 애플리케이션에서 어시스턴트와 원활하게 채팅할 수 있습니다.

Cloud Shell 편집기 상단 영역에 있는 웹 미리보기 버튼을 클릭하고 포트 8080에서 미리보기를 선택합니다.

edc73e971b9fc60c.png

이제 도우미와 상호작용을 해보세요!

다음 영수증을 다운로드하세요. 이 영수증 데이터 날짜 범위는 2023-2024년 사이이며 보조자에게 저장/업로드를 요청하세요.

  • 영수증 Drive ( 소스 Hugging Face 데이터 세트 mousserlane/id_receipt_dataset)

다양한 것을 물어보세요

  • "2023~2024년 월별 지출 내역을 알려주세요"
  • "커피 거래 영수증 보여주세요"
  • "야키니쿠 라이크 영수증 파일 주세요"

성공적인 상호작용의 일부 내용은 다음과 같습니다.

e01dc7a8ec673aa4.png

9341212f8d54c98a.png

11. 🚀 Cloud Run에 배포

물론, 우리는 이 놀라운 앱에 어디서든 접속하고 싶습니다. 이를 위해 이 애플리케이션을 패키징하여 Cloud Run에 배포할 수 있습니다. 이 데모에서는 이 서비스가 다른 사용자가 액세스할 수 있는 공개 서비스로 노출됩니다. 그러나 이것은 개인용 애플리케이션에 더 적합하므로 이러한 종류의 애플리케이션에 대한 모범 사례가 아니라는 점을 명심하십시오.

90805d85052a5e5a.jpeg

이 Codelab에서는 프런트엔드와 백엔드 서비스를 모두 하나의 컨테이너에 넣습니다. 두 서비스를 모두 관리하려면 supervisord의 도움이 필요합니다. supervisord.conf 파일을 검사하고 Dockerfile에서 supervisord를 진입점으로 설정했는지 확인할 수 있습니다.

이제 Cloud Run에 애플리케이션을 배포하는 데 필요한 모든 파일이 준비되었으므로 배포해 보겠습니다. Cloud Shell 터미널로 이동하여 현재 프로젝트가 활성 프로젝트로 구성되어 있는지 확인합니다. 그렇지 않은 경우 gcloud configure 명령어를 사용하여 프로젝트 ID를 설정해야 합니다.

gcloud config set project [PROJECT_ID]

그런 다음 다음 명령어를 실행하여 Cloud Run에 배포합니다.

gcloud run deploy personal-expense-assistant \
                  --source . \
                  --port=8080 \
                  --allow-unauthenticated \
                  --env-vars-file=settings.yaml \
                  --memory 1024Mi \
                  --region us-central1

Docker 저장소용 Artifact Registry 생성을 확인하라는 메시지가 표시되면 Y를 입력합니다. 데모 애플리케이션이기 때문에 인증되지 않은 액세스를 허용하고 있습니다. 엔터프라이즈 및 프로덕션 애플리케이션에 적절한 인증을 사용하는 것이 좋습니다.

배포가 완료되면 다음과 비슷한 링크가 표시됩니다.

https://personal-expense-assistant-*******.us-central1.run.app

시크릿 모드 창이나 휴대기기에서 애플리케이션을 사용하세요. 이미 게시되었을 것입니다.

12. 🎯 챌린지

이제 여러분의 탐험 기술을 빛내고 다듬을 시간입니다. 백엔드가 여러 사용자를 수용할 수 있도록 코드를 변경할 능력이 있나요? 어떤 구성요소를 업데이트해야 합니까?

13. 🧹 청소하다

이 코드랩에서 사용된 리소스에 대해 Google Cloud 계정에 요금이 청구되지 않도록 하려면 다음 단계를 따르세요.

  1. Google Cloud 콘솔에서 리소스 관리 페이지로 이동합니다.
  2. 프로젝트 목록에서 삭제할 프로젝트를 선택하고 삭제를 클릭합니다.
  3. 대화상자에서 프로젝트 ID를 입력하고 종료를 클릭하여 프로젝트를 삭제합니다.
  4. 또는 콘솔에서 Cloud Run으로 이동하여 방금 배포한 서비스를 선택하고 삭제할 수도 있습니다.