Przejście na tryb multimodalny dzięki pakietowi dla programistów: asystent wydatków osobistych z Gemini 2.5, Firestore i Cloud Run

1. 📖 Wprowadzenie

db9331886978d543.png

Czy kiedykolwiek frustrowało Cię zarządzanie wszystkimi wydatkami osobistymi? Ja też! Dlatego w tym laboratorium kodowania stworzymy osobistego asystenta do zarządzania wydatkami, który będzie oparty na Gemini 2.5 i wykona za nas wszystkie zadania. Od zarządzania przesłanymi paragonami po analizowanie, czy nie wydajesz już za dużo na kawę.

Ten asystent będzie dostępny w przeglądarce internetowej w formie interfejsu czatu, w którym możesz się z nim komunikować, przesyłać zdjęcia paragonów i prosić go o ich przechowywanie lub wyszukiwać paragony, aby uzyskać plik i przeprowadzić analizę wydatków. Wszystko to jest oparte na platformie Google Agent Development Kit.

Sama aplikacja jest podzielona na dwie usługi: front-end i back-end. Dzięki temu można szybko zbudować prototyp i sprawdzić, jak działa, a także zrozumieć, w jaki sposób wygląda kontrakt API integrujący obie usługi.

W ramach ćwiczeń z programowania będziesz wykonywać kolejne czynności:

  1. Przygotowywanie projektu Google Cloud i włączanie w nim wszystkich wymaganych interfejsów API
  2. Skonfiguruj kontener w Google Cloud Storage i bazę danych w Firestore
  3. Utwórz indeksowanie Firestore
  4. Skonfiguruj obszar roboczy dla swojego środowiska kodowania
  5. Strukturyzacja kodu źródłowego agenta ADK, narzędzi, monitu itp.
  6. Testowanie agenta przy użyciu lokalnego interfejsu użytkownika narzędzia do tworzenia stron internetowych ADK
  7. Zbuduj usługę front-endową – interfejs czatu przy użyciu biblioteki Gradio, aby wysłać zapytanie i przesłać obrazy paragonów
  8. Zbuduj usługę zaplecza – serwer HTTP przy użyciu FastAPI, w którym znajduje się kod naszego agenta ADK, SessionService i Artifact Service
  9. Zarządzaj zmiennymi środowiskowymi i konfiguruj wymagane pliki potrzebne do wdrożenia aplikacji w usłudze Cloud Run
  10. Wdróż aplikację w Cloud Run

Omówienie architektury

90805d85052a5e5a.jpeg

Wymagania wstępne

  • Komfortowa praca z Pythonem
  • Znajomość podstawowej architektury pełnego stosu z użyciem usługi HTTP

Czego się nauczysz

  • Tworzenie prototypów stron internetowych za pomocą Gradio
  • Rozwój usług zaplecza z wykorzystaniem FastAPI i Pydantic
  • Projektowanie agenta ADK z wykorzystaniem jego wielu możliwości
  • Korzystanie z narzędzia
  • Zarządzanie sesjami i artefaktami
  • Wykorzystanie wywołania zwrotnego do modyfikowania danych wejściowych przed wysłaniem ich do Gemini
  • Wykorzystanie BuiltInPlanner do usprawnienia realizacji zadań poprzez planowanie
  • Szybkie debugowanie za pomocą lokalnego interfejsu internetowego ADK
  • Strategia optymalizacji interakcji multimodalnej poprzez analizę i wyszukiwanie informacji za pomocą szybkiej inżynierii i modyfikacji żądań Gemini przy użyciu wywołania zwrotnego ADK
  • Rozszerzona generacja pobierania agentów z wykorzystaniem Firestore jako bazy danych wektorowych
  • Zarządzanie zmiennymi środowiskowymi w pliku YAML za pomocą Pydantic-settings
  • Wdróż aplikację w Cloud Run za pomocą pliku Dockerfile i podaj zmienne środowiskowe za pomocą pliku YAML

Czego potrzebujesz

  • Przeglądarka internetowa Chrome
  • Konto Gmail
  • Projekt Cloud z włączonymi płatnościami

Ten przewodnik, przeznaczony dla deweloperów na wszystkich poziomach zaawansowania (w tym dla początkujących), wykorzystuje w przykładowej aplikacji język Python. Znajomość języka Python nie jest jednak konieczna do zrozumienia przedstawionych koncepcji.

2. 🚀 Zanim zaczniesz

Wybierz aktywny projekt w konsoli Cloud Console

W tym laboratorium zakładamy, że masz już projekt Google Cloud z włączonym rozliczaniem. Jeśli jeszcze go nie masz, możesz zacząć korzystać z poniższych instrukcji.

  1. W Konsoli Google Cloud, na stronie wyboru projektu, wybierz lub utwórz projekt Google Cloud.
  2. Sprawdź, czy w projekcie Cloud włączone są płatności. Dowiedz się, jak sprawdzić, czy w projekcie są włączone płatności.

fcdd90149a030bf5.png

Przygotuj bazę danych Firestore

Następnie będziemy musieli utworzyć bazę danych Firestore. Firestore w trybie natywnym to baza danych dokumentów NoSQL zaprojektowana z myślą o automatycznym skalowaniu, wysokiej wydajności i łatwości tworzenia aplikacji. Może też pełnić funkcję bazy danych wektorowych, która obsługuje technikę generowania z wyszukiwaniem w naszym laboratorium.

  1. Wyszukaj „firestore” na pasku wyszukiwania i kliknij produkt Firestore

44bbce791824łóżko6.png

  1. Następnie kliknij przycisk Utwórz bazę danych Firestore
  2. Użyj (domyślnej) jako nazwy identyfikatora bazy danych i zachowaj wybraną opcję Standard Edition. Na potrzeby tej demonstracji laboratoryjnej użyj reguł zabezpieczeń Firestore Native z Open.
  1. Zauważysz również, że ta baza danych faktycznie ma opcję YEAY! – Wykorzystania bezpłatnej warstwy. Następnie kliknij przycisk Utwórz bazę danych

b97d210c465be94c.png

Po wykonaniu tych kroków powinieneś zostać przekierowany do bazy danych Firestore, którą właśnie utworzyłeś

Konfigurowanie projektu w chmurze w terminalu Cloud Shell

  1. Będziesz korzystać z Cloud Shell, środowiska wiersza poleceń działającego w Google Cloud, w którym wstępnie załadowano bq. U góry konsoli Google Cloud kliknij Aktywuj Cloud Shell.

26f20e837ff06119.png

  1. Po połączeniu z Cloud Shell sprawdź, czy dokonano uwierzytelnienia i czy projekt jest ustawiony na Twój identyfikator projektu, korzystając z następującego polecenia:
gcloud auth list
  1. Uruchom następujące polecenie w Cloud Shell, aby sprawdzić, czy polecenie gcloud rozpoznaje Twój projekt.
gcloud config list project
  1. Jeśli projekt nie jest ustawiony, użyj tego polecenia, aby go ustawić:
gcloud config set project <YOUR_PROJECT_ID>

Alternatywnie możesz również zobaczyć identyfikator PROJECT_ID w konsoli

bb98435b79995b15.jpeg

Kliknij go, a po prawej stronie zobaczysz wszystkie projekty i identyfikator projektu.

ffa73dee57de5307.jpeg

  1. Włącz wymagane interfejsy API za pomocą polecenia pokazanego poniżej. Może to potrwać kilka minut, prosimy o cierpliwość.
gcloud services enable aiplatform.googleapis.com \
                       firestore.googleapis.com \
                       run.googleapis.com \
                       cloudbuild.googleapis.com \
                       cloudresourcemanager.googleapis.com

Po pomyślnym wykonaniu polecenia powinieneś zobaczyć komunikat podobny do pokazanego poniżej:

Operation "operations/..." finished successfully.

Alternatywą dla polecenia gcloud jest wyszukanie poszczególnych usług w konsoli lub skorzystanie z tego linku.

Jeśli pominiesz jakiekolwiek API, możesz je zawsze włączyć w trakcie implementacji.

Aby zapoznać się z poleceniami gcloud i sposobem ich użycia, zapoznaj się z dokumentacją.

Przygotuj kontener Google Cloud Storage

Następnie w tym samym terminalu musimy przygotować kontener GCS do przechowywania przesłanego pliku. Uruchom poniższe polecenie, aby utworzyć kontener. Będzie potrzebna unikalna, ale istotna nazwa kontenera, odnosząca się do paragonów asystenta wydatków osobistych, dlatego wykorzystamy następującą nazwę kontenera połączoną z identyfikatorem projektu

gsutil mb -l us-central1 gs://personal-expense-{your-project-id}

Pokaże ten wynik

Creating gs://personal-expense-{your-project-id}

Możesz to sprawdzić, przechodząc do menu nawigacyjnego w lewym górnym rogu przeglądarki i wybierając Pamięć masowa w chmurze -> Pojemnik

7b9fd51982d351fa.png

Firestore jest natywną bazą danych NoSQL, która zapewnia lepszą wydajność i elastyczność modelu danych, ale ma ograniczenia, jeśli chodzi o złożone zapytania. Ponieważ planujemy wykorzystać złożone zapytania wielopolowe i wyszukiwanie wektorowe, najpierw musimy utworzyć indeks. Więcej szczegółów znajdziesz w tej dokumentacji

  1. Uruchom następujące polecenie, aby utworzyć indeks obsługujący zapytania złożone
gcloud firestore indexes composite create \
        --collection-group=personal-expense-assistant-receipts \
        --field-config field-path=total_amount,order=ASCENDING \
        --field-config field-path=transaction_time,order=ASCENDING \
        --field-config field-path=__name__,order=ASCENDING \
        --database="(default)"
  1. I uruchom ten, aby obsługiwać wyszukiwanie wektorowe
gcloud firestore indexes composite create \
        --collection-group="personal-expense-assistant-receipts" \
        --query-scope=COLLECTION \
        --field-config field-path="embedding",vector-config='{"dimension":"768", "flat": "{}"}' \
        --database="(default)"

Możesz sprawdzić utworzony indeks, odwiedzając Firestore w konsoli w chmurze i klikając instancję bazy danych (domyślna), a następnie wybierając Indeksy na pasku nawigacyjnym

9849724dd55dfab7.png

Przejdź do edytora Cloud Shell i skonfiguruj katalog roboczy aplikacji

Teraz możemy skonfigurować edytor kodu tak, aby wykonywał czynności kodowania. W tym celu użyjemy edytora Cloud Shell

  1. Kliknij przycisk Otwórz edytor, co spowoduje otwarcie edytora Cloud Shell, w którym możemy wpisać nasz kod tutaj 168eacea651b086c.png
  2. Następnie musimy sprawdzić, czy powłoka jest już skonfigurowana dla prawidłowego IDENTYFIKATORA PROJEKTU. Jeśli widzisz wartość wewnątrz ( ) przed ikoną $ w terminalu ( na poniższym zrzucie ekranu wartością jest "adk-multimodal-tool"), ta wartość pokazuje skonfigurowany projekt dla aktywnej sesji powłoki.

10a99ff80839b635.png

Jeśli wyświetlona wartość jest już poprawna, możesz pominąć następne polecenie. Jeśli jednak jest to nieprawidłowe lub brakuje tego, uruchom następujące polecenie

gcloud config set project <YOUR_PROJECT_ID>
  1. Następnie sklonujmy katalog roboczy szablonu dla tego codelabu z Github i uruchommy następujące polecenie. Utworzy katalog roboczy w katalogu personal-expense-assistant
git clone https://github.com/alphinside/personal-expense-assistant-adk-codelab-starter.git personal-expense-assistant
  1. Następnie przejdź do górnej sekcji edytora Cloud Shell i kliknij Plik –> Otwórz folder. Znajdź katalog nazwa_użytkownika, a w nim katalog personal-expense-assistant. Potem kliknij przycisk OK. Spowoduje to ustawienie wybranego katalogu jako głównego katalogu roboczego. W tym przykładzie nazwa użytkownika to alvinprayuda, więc ścieżka do katalogu jest widoczna poniżej.

c87d2b76896d0c59.png

524b9e6369f68cca.png

Edytor Cloud Shell powinien teraz wyglądać tak:

9a58ccc43f48338d.png

Konfiguracja środowiska

Przygotowywanie wirtualnego środowiska Pythona

Następnym krokiem jest przygotowanie środowiska programistycznego. Aktywny terminal powinien znajdować się w katalogu roboczym personal-expense-assistant. W tym samouczku użyjemy Pythona 3.12 i menedżera projektów Pythona uv, aby uprościć tworzenie i zarządzanie wersją Pythona oraz środowiskiem wirtualnym.

  1. Jeśli jeszcze nie otworzyłeś terminala, otwórz go , klikając Terminal -> Nowy terminal lub użyj kombinacji klawiszy Ctrl + Shift + C , co spowoduje otwarcie okna terminala w dolnej części przeglądarki.

8635b60ae2f45bbc.jpeg

  1. Teraz zainicjuj środowisko wirtualne za pomocą polecenia uv. Uruchom te polecenia:
cd ~/personal-expense-assistant
uv sync --frozen

Spowoduje to utworzenie katalogu .venv i zainstalowanie zależności. Szybki podgląd pliku pyproject.toml pozwoli Ci uzyskać informacje o zależnościach, które będą wyświetlane w ten sposób:

dependencies = [
    "datasets>=3.5.0",
    "google-adk==1.18",
    "google-cloud-firestore>=2.20.1",
    "gradio>=5.23.1",
    "pydantic>=2.10.6",
    "pydantic-settings[yaml]>=2.8.1",
]

Pliki konfiguracji konfiguracji

Teraz musimy skonfigurować pliki konfiguracji dla tego projektu. Do odczytywania konfiguracji z pliku YAML używamy biblioteki pydantic-settings.

Podaliśmy już szablon pliku w pliku settings.yaml.example , musimy skopiować plik i zmienić jego nazwę na settings.yaml. Uruchom to polecenie, aby utworzyć plik

cp settings.yaml.example settings.yaml

Następnie skopiuj następującą wartość do pliku

GCLOUD_LOCATION: "us-central1"
GCLOUD_PROJECT_ID: "your-project-id"
BACKEND_URL: "http://localhost:8081/chat"
STORAGE_BUCKET_NAME: "personal-expense-{your-project-id}"
DB_COLLECTION_NAME: "personal-expense-assistant-receipts"

W tym samouczku użyjemy wstępnie skonfigurowanych wartości GCLOUD_LOCATION,, BACKEND_URL,DB_COLLECTION_NAME .

Możemy teraz przejść do następnego kroku, czyli utworzenia agenta, a potem usług.

3. 🚀 Tworzenie agenta za pomocą pakietu ADK od Google i Gemini 2.5

Wprowadzenie do struktury katalogów ADK

Zacznijmy od poznania możliwości ADK i sposobu tworzenia agenta. Pełną dokumentację pakietu ADK znajdziesz pod tym adresem URL . ADK oferuje wiele narzędzi w ramach wykonywania poleceń CLI. Oto niektóre z nich :

  • Konfigurowanie struktury katalogów agenta
  • Szybkie wypróbowanie interakcji za pomocą danych wejściowych i wyjściowych interfejsu wiersza poleceń
  • Szybkie konfigurowanie lokalnego interfejsu internetowego

Teraz utwórzmy strukturę katalogów agenta za pomocą polecenia CLI. Uruchom podane niżej polecenie.

uv run adk create expense_manager_agent

Gdy pojawi się pytanie, wybierz model gemini-2.5-flash i zaplecze Vertex AI. Następnie kreator zapyta o identyfikator projektu i lokalizację. Możesz zaakceptować domyślne opcje, naciskając Enter, lub zmienić je według potrzeb. Sprawdź tylko, czy używasz prawidłowego identyfikatora projektu utworzonego wcześniej w tym ćwiczeniu. Wynik będzie wyglądał następująco:

Choose a model for the root agent:
1. gemini-2.5-flash
2. Other models (fill later)
Choose model (1, 2): 1
1. Google AI
2. Vertex AI
Choose a backend (1, 2): 2

You need an existing Google Cloud account and project, check out this link for details:
https://google.github.io/adk-docs/get-started/quickstart/#gemini---google-cloud-vertex-ai

Enter Google Cloud project ID [going-multimodal-lab]: 
Enter Google Cloud region [us-central1]: 

Agent created in /home/username/personal-expense-assistant/expense_manager_agent:
- .env
- __init__.py
- agent.py

Utworzy następującą strukturę katalogów agenta:

expense_manager_agent/
├── __init__.py
├── .env
├── agent.py

Jeśli sprawdzisz pliki init.pyagent.py, zobaczysz ten kod:

# __init__.py

from . import agent
# agent.py

from google.adk.agents import Agent

root_agent = Agent(
    model='gemini-2.5-flash',
    name='root_agent',
    description='A helpful assistant for user questions.',
    instruction='Answer user questions to the best of your knowledge',
)

Możesz teraz przetestować tę funkcję, uruchamiając

uv run adk run expense_manager_agent

Po zakończeniu testów możesz zamknąć agenta, wpisując exit lub naciskając Ctrl+D.

Budowanie naszego agenta ds. zarządzania wydatkami

Stwórzmy naszego agenta zarządzającego wydatkami! Otwórz plik expense_manager_agent/agent.py i skopiuj poniższy kod, który będzie zawierać root_agent.

# expense_manager_agent/agent.py

from google.adk.agents import Agent
from expense_manager_agent.tools import (
    store_receipt_data,
    search_receipts_by_metadata_filter,
    search_relevant_receipts_by_natural_language_query,
    get_receipt_data_by_image_id,
)
from expense_manager_agent.callbacks import modify_image_data_in_history
import os
from settings import get_settings
from google.adk.planners import BuiltInPlanner
from google.genai import types

SETTINGS = get_settings()
os.environ["GOOGLE_CLOUD_PROJECT"] = SETTINGS.GCLOUD_PROJECT_ID
os.environ["GOOGLE_CLOUD_LOCATION"] = SETTINGS.GCLOUD_LOCATION
os.environ["GOOGLE_GENAI_USE_VERTEXAI"] = "TRUE"

# Get the code file directory path and read the task prompt file
current_dir = os.path.dirname(os.path.abspath(__file__))
prompt_path = os.path.join(current_dir, "task_prompt.md")
with open(prompt_path, "r") as file:
    task_prompt = file.read()

root_agent = Agent(
    name="expense_manager_agent",
    model="gemini-2.5-flash",
    description=(
        "Personal expense agent to help user track expenses, analyze receipts, and manage their financial records"
    ),
    instruction=task_prompt,
    tools=[
        store_receipt_data,
        get_receipt_data_by_image_id,
        search_receipts_by_metadata_filter,
        search_relevant_receipts_by_natural_language_query,
    ],
    planner=BuiltInPlanner(
        thinking_config=types.ThinkingConfig(
            thinking_budget=2048,
        )
    ),
    before_model_callback=modify_image_data_in_history,
)

Wyjaśnienie kodu

Ten skrypt zawiera inicjację agenta, w której inicjujemy te elementy:

  • Ustaw model, który ma być używany, na gemini-2.5-flash
  • Skonfiguruj opis agenta i instrukcję jako monit systemowy odczytywany z task_prompt.md
  • Zapewnij niezbędne narzędzia wspierające funkcjonalność agenta
  • Umożliwia planowanie przed wygenerowaniem ostatecznej odpowiedzi lub wykonaniem, korzystając z możliwości myślenia Gemini 2.5 Flash
  • Skonfiguruj przechwytywanie wywołania zwrotnego przed wysłaniem żądania do Gemini, aby ograniczyć liczbę wysyłanych danych obrazu przed wykonaniem prognozy

4. 🚀 Konfigurowanie narzędzi agenta

Nasz agent ds. zarządzania wydatkami będzie miał następujące możliwości:

  • Wyodrębnij dane z obrazu paragonu i zapisz dane oraz plik
  • Dokładne wyszukiwanie danych dotyczących wydatków
  • Kontekstowe wyszukiwanie danych dotyczących wydatków

Dlatego potrzebujemy odpowiednich narzędzi, które będą wspierać tę funkcjonalność. Utwórz nowy plik w katalogu expense_manager_agent i nazwij go tools.py

touch expense_manager_agent/tools.py

Otwórz plik expense_manage_agent/tools.py, a następnie skopiuj poniższy kod.

# expense_manager_agent/tools.py

import datetime
from typing import Dict, List, Any
from google.cloud import firestore
from google.cloud.firestore_v1.vector import Vector
from google.cloud.firestore_v1 import FieldFilter
from google.cloud.firestore_v1.base_query import And
from google.cloud.firestore_v1.base_vector_query import DistanceMeasure
from settings import get_settings
from google import genai

SETTINGS = get_settings()
DB_CLIENT = firestore.Client(
    project=SETTINGS.GCLOUD_PROJECT_ID
)  # Will use "(default)" database
COLLECTION = DB_CLIENT.collection(SETTINGS.DB_COLLECTION_NAME)
GENAI_CLIENT = genai.Client(
    vertexai=True, location=SETTINGS.GCLOUD_LOCATION, project=SETTINGS.GCLOUD_PROJECT_ID
)
EMBEDDING_DIMENSION = 768
EMBEDDING_FIELD_NAME = "embedding"
INVALID_ITEMS_FORMAT_ERR = """
Invalid items format. Must be a list of dictionaries with 'name', 'price', and 'quantity' keys."""
RECEIPT_DESC_FORMAT = """
Store Name: {store_name}
Transaction Time: {transaction_time}
Total Amount: {total_amount}
Currency: {currency}
Purchased Items:
{purchased_items}
Receipt Image ID: {receipt_id}
"""


def sanitize_image_id(image_id: str) -> str:
    """Sanitize image ID by removing any leading/trailing whitespace."""
    if image_id.startswith("[IMAGE-"):
        image_id = image_id.split("ID ")[1].split("]")[0]

    return image_id.strip()


def store_receipt_data(
    image_id: str,
    store_name: str,
    transaction_time: str,
    total_amount: float,
    purchased_items: List[Dict[str, Any]],
    currency: str = "IDR",
) -> str:
    """
    Store receipt data in the database.

    Args:
        image_id (str): The unique identifier of the image. For example IMAGE-POSITION 0-ID 12345,
            the ID of the image is 12345.
        store_name (str): The name of the store.
        transaction_time (str): The time of purchase, in ISO format ("YYYY-MM-DDTHH:MM:SS.ssssssZ").
        total_amount (float): The total amount spent.
        purchased_items (List[Dict[str, Any]]): A list of items purchased with their prices. Each item must have:
            - name (str): The name of the item.
            - price (float): The price of the item.
            - quantity (int, optional): The quantity of the item. Defaults to 1 if not provided.
        currency (str, optional): The currency of the transaction, can be derived from the store location.
            If unsure, default is "IDR".

    Returns:
        str: A success message with the receipt ID.

    Raises:
        Exception: If the operation failed or input is invalid.
    """
    try:
        # In case of it provide full image placeholder, extract the id string
        image_id = sanitize_image_id(image_id)

        # Check if the receipt already exists
        doc = get_receipt_data_by_image_id(image_id)

        if doc:
            return f"Receipt with ID {image_id} already exists"

        # Validate transaction time
        if not isinstance(transaction_time, str):
            raise ValueError(
                "Invalid transaction time: must be a string in ISO format 'YYYY-MM-DDTHH:MM:SS.ssssssZ'"
            )
        try:
            datetime.datetime.fromisoformat(transaction_time.replace("Z", "+00:00"))
        except ValueError:
            raise ValueError(
                "Invalid transaction time format. Must be in ISO format 'YYYY-MM-DDTHH:MM:SS.ssssssZ'"
            )

        # Validate items format
        if not isinstance(purchased_items, list):
            raise ValueError(INVALID_ITEMS_FORMAT_ERR)

        for _item in purchased_items:
            if (
                not isinstance(_item, dict)
                or "name" not in _item
                or "price" not in _item
            ):
                raise ValueError(INVALID_ITEMS_FORMAT_ERR)

            if "quantity" not in _item:
                _item["quantity"] = 1

        # Create a combined text from all receipt information for better embedding
        result = GENAI_CLIENT.models.embed_content(
            model="text-embedding-004",
            contents=RECEIPT_DESC_FORMAT.format(
                store_name=store_name,
                transaction_time=transaction_time,
                total_amount=total_amount,
                currency=currency,
                purchased_items=purchased_items,
                receipt_id=image_id,
            ),
        )

        embedding = result.embeddings[0].values

        doc = {
            "receipt_id": image_id,
            "store_name": store_name,
            "transaction_time": transaction_time,
            "total_amount": total_amount,
            "currency": currency,
            "purchased_items": purchased_items,
            EMBEDDING_FIELD_NAME: Vector(embedding),
        }

        COLLECTION.add(doc)

        return f"Receipt stored successfully with ID: {image_id}"
    except Exception as e:
        raise Exception(f"Failed to store receipt: {str(e)}")


def search_receipts_by_metadata_filter(
    start_time: str,
    end_time: str,
    min_total_amount: float = -1.0,
    max_total_amount: float = -1.0,
) -> str:
    """
    Filter receipts by metadata within a specific time range and optionally by amount.

    Args:
        start_time (str): The start datetime for the filter (in ISO format, e.g. 'YYYY-MM-DDTHH:MM:SS.ssssssZ').
        end_time (str): The end datetime for the filter (in ISO format, e.g. 'YYYY-MM-DDTHH:MM:SS.ssssssZ').
        min_total_amount (float): The minimum total amount for the filter (inclusive). Defaults to -1.
        max_total_amount (float): The maximum total amount for the filter (inclusive). Defaults to -1.

    Returns:
        str: A string containing the list of receipt data matching all applied filters.

    Raises:
        Exception: If the search failed or input is invalid.
    """
    try:
        # Validate start and end times
        if not isinstance(start_time, str) or not isinstance(end_time, str):
            raise ValueError("start_time and end_time must be strings in ISO format")
        try:
            datetime.datetime.fromisoformat(start_time.replace("Z", "+00:00"))
            datetime.datetime.fromisoformat(end_time.replace("Z", "+00:00"))
        except ValueError:
            raise ValueError("start_time and end_time must be strings in ISO format")

        # Start with the base collection reference
        query = COLLECTION

        # Build the composite query by properly chaining conditions
        # Notes that this demo assume 1 user only,
        # need to refactor the query for multiple user
        filters = [
            FieldFilter("transaction_time", ">=", start_time),
            FieldFilter("transaction_time", "<=", end_time),
        ]

        # Add optional filters
        if min_total_amount != -1:
            filters.append(FieldFilter("total_amount", ">=", min_total_amount))

        if max_total_amount != -1:
            filters.append(FieldFilter("total_amount", "<=", max_total_amount))

        # Apply the filters
        composite_filter = And(filters=filters)
        query = query.where(filter=composite_filter)

        # Execute the query and collect results
        search_result_description = "Search by Metadata Results:\n"
        for doc in query.stream():
            data = doc.to_dict()
            data.pop(
                EMBEDDING_FIELD_NAME, None
            )  # Remove embedding as it's not needed for display

            search_result_description += f"\n{RECEIPT_DESC_FORMAT.format(**data)}"

        return search_result_description
    except Exception as e:
        raise Exception(f"Error filtering receipts: {str(e)}")


def search_relevant_receipts_by_natural_language_query(
    query_text: str, limit: int = 5
) -> str:
    """
    Search for receipts with content most similar to the query using vector search.
    This tool can be use for user query that is difficult to translate into metadata filters.
    Such as store name or item name which sensitive to string matching.
    Use this tool if you cannot utilize the search by metadata filter tool.

    Args:
        query_text (str): The search text (e.g., "coffee", "dinner", "groceries").
        limit (int, optional): Maximum number of results to return (default: 5).

    Returns:
        str: A string containing the list of contextually relevant receipt data.

    Raises:
        Exception: If the search failed or input is invalid.
    """
    try:
        # Generate embedding for the query text
        result = GENAI_CLIENT.models.embed_content(
            model="text-embedding-004", contents=query_text
        )
        query_embedding = result.embeddings[0].values

        # Notes that this demo assume 1 user only,
        # need to refactor the query for multiple user
        vector_query = COLLECTION.find_nearest(
            vector_field=EMBEDDING_FIELD_NAME,
            query_vector=Vector(query_embedding),
            distance_measure=DistanceMeasure.EUCLIDEAN,
            limit=limit,
        )

        # Execute the query and collect results
        search_result_description = "Search by Contextual Relevance Results:\n"
        for doc in vector_query.stream():
            data = doc.to_dict()
            data.pop(
                EMBEDDING_FIELD_NAME, None
            )  # Remove embedding as it's not needed for display
            search_result_description += f"\n{RECEIPT_DESC_FORMAT.format(**data)}"

        return search_result_description
    except Exception as e:
        raise Exception(f"Error searching receipts: {str(e)}")


def get_receipt_data_by_image_id(image_id: str) -> Dict[str, Any]:
    """
    Retrieve receipt data from the database using the image_id.

    Args:
        image_id (str): The unique identifier of the receipt image. For example, if the placeholder is
            [IMAGE-ID 12345], the ID to use is 12345.

    Returns:
        Dict[str, Any]: A dictionary containing the receipt data with the following keys:
            - receipt_id (str): The unique identifier of the receipt image.
            - store_name (str): The name of the store.
            - transaction_time (str): The time of purchase in UTC.
            - total_amount (float): The total amount spent.
            - currency (str): The currency of the transaction.
            - purchased_items (List[Dict[str, Any]]): List of items purchased with their details.
        Returns an empty dictionary if no receipt is found.
    """
    # In case of it provide full image placeholder, extract the id string
    image_id = sanitize_image_id(image_id)

    # Query the receipts collection for documents with matching receipt_id (image_id)
    # Notes that this demo assume 1 user only,
    # need to refactor the query for multiple user
    query = COLLECTION.where(filter=FieldFilter("receipt_id", "==", image_id)).limit(1)
    docs = list(query.stream())

    if not docs:
        return {}

    # Get the first matching document
    doc_data = docs[0].to_dict()
    doc_data.pop(EMBEDDING_FIELD_NAME, None)

    return doc_data

Wyjaśnienie kodu

W implementacji funkcji tego narzędzia projektujemy je wokół tych dwóch głównych idei:

  • Przeanalizuj dane potwierdzenia i mapowanie do oryginalnego pliku, używając symbolu zastępczego ciągu identyfikatora obrazu [IMAGE-ID <hash-of-image-1>]
  • Przechowywanie i pobieranie danych za pomocą bazy danych Firestore

Narzędzie „store_receipt_data”

747fb55e801455f4.png

Narzędzie to służy do optycznego rozpoznawania znaków i analizuje wymagane informacje z danych obrazu, rozpoznaje ciąg znaków identyfikacyjnych obrazu i mapuje je w celu zapisania w bazie danych Firestore.

Ponadto narzędzie to konwertuje również zawartość paragonu na osadzony element za pomocą text-embedding-004, dzięki czemu wszystkie metadane i osadzony element są przechowywane i indeksowane razem. Umożliwia elastyczne wyszukiwanie za pomocą zapytań lub kontekstowego wyszukiwania.

Po pomyślnym uruchomieniu tego narzędzia możesz zobaczyć, że dane dotyczące paragonu są już zaindeksowane w bazie danych Firestore, jak pokazano poniżej

636d56be9880f3c7.png

Narzędzie „search_receipts_by_metadata_filter”

6d8fbd9b43ff7ea7.png

To narzędzie konwertuje zapytanie użytkownika na filtr zapytań metadanych, który obsługuje wyszukiwanie według zakresu dat i/lub łącznej liczby transakcji. Zwróci wszystkie pasujące dane z paragonu, a w trakcie tego procesu usuniemy pole osadzania, ponieważ nie jest ono potrzebne agentowi do zrozumienia kontekstu.

Narzędzie „wyszukaj_odpowiednie_paragony_w_języku_naturalnym”

7262c75114af0060.png

To nasze narzędzie do generowania wspomaganego wyszukiwaniem (RAG). Nasz agent może samodzielnie tworzyć zapytania, aby pobierać z bazy wektorowej odpowiednie rachunki, i samodzielnie decydować, kiedy używać tego narzędzia. Koncepcja umożliwienia agentowi samodzielnego podejmowania decyzji o tym, czy użyje tego narzędzia RAG, i opracowania własnego zapytania jest jedną z definicji podejścia Agentic RAG.

Umożliwiamy mu nie tylko tworzenie własnego zapytania, ale także wybór liczby odpowiednich dokumentów, które chce pobrać. W połączeniu z odpowiednią szybką inżynierią, np.

# Example prompt

Always filter the result from tool
search_relevant_receipts_by_natural_language_query as the returned 
result may contain irrelevant information

Dzięki temu narzędzie będzie bardzo przydatne i umożliwi wyszukiwanie niemal wszystkiego, ale ze względu na nieprecyzyjny charakter wyszukiwania najbliższego sąsiada może nie zwracać wszystkich oczekiwanych wyników.

5. 🚀 Modyfikowanie kontekstu rozmowy za pomocą wywołań zwrotnych

Google ADK umożliwia nam „przechwytywanie” czasu działania agenta na różnych poziomach. Więcej informacji o tej funkcji znajdziesz w tej dokumentacji . W tym module wykorzystujemy before_model_callback, aby zmodyfikować żądanie przed wysłaniem go do LLM w celu usunięcia danych obrazu ze starego kontekstu historii rozmów ( uwzględniamy tylko dane obrazu z 3 ostatnich interakcji użytkownika) dla większej wydajności.

Chcemy jednak, aby w razie potrzeby agent miał kontekst danych obrazu. Dlatego dodajemy mechanizm, który po każdym bajcie danych obrazu w rozmowie dodaje ciąg znaków będący identyfikatorem obrazu. Pomoże to agentowi powiązać identyfikator obrazu z rzeczywistymi danymi pliku, które można wykorzystać zarówno podczas przechowywania, jak i pobierania obrazu. Struktura będzie wyglądać tak:

<image-byte-data-1>
[IMAGE-ID <hash-of-image-1>]
<image-byte-data-2>
[IMAGE-ID <hash-of-image-2>]
And so on..

Gdy dane w postaci bajtów w historii rozmów staną się nieaktualne, identyfikator w postaci ciągu znaków nadal będzie umożliwiał dostęp do danych za pomocą narzędzia. Przykładowa struktura historii po usunięciu danych obrazu

[IMAGE-ID <hash-of-image-1>]
[IMAGE-ID <hash-of-image-2>]
And so on..

Rozpocznij Utwórz nowy plik w katalogu expense_manager_agent i nadaj mu nazwę callbacks.py.

touch expense_manager_agent/callbacks.py

Otwórz plik expense_manager_agent/callbacks.py, a następnie skopiuj poniższy kod.

# expense_manager_agent/callbacks.py

import hashlib
from google.genai import types
from google.adk.agents.callback_context import CallbackContext
from google.adk.models.llm_request import LlmRequest


def modify_image_data_in_history(
    callback_context: CallbackContext, llm_request: LlmRequest
) -> None:
    # The following code will modify the request sent to LLM
    # We will only keep image data in the last 3 user messages using a reverse and counter approach

    # Count how many user messages we've processed
    user_message_count = 0

    # Process the reversed list
    for content in reversed(llm_request.contents):
        # Only count for user manual query, not function call
        if (content.role == "user") and (content.parts[0].function_response is None):
            user_message_count += 1
            modified_content_parts = []

            # Check any missing image ID placeholder for any image data
            # Then remove image data from conversation history if more than 3 user messages
            for idx, part in enumerate(content.parts):
                if part.inline_data is None:
                    modified_content_parts.append(part)
                    continue

                if (
                    (idx + 1 >= len(content.parts))
                    or (content.parts[idx + 1].text is None)
                    or (not content.parts[idx + 1].text.startswith("[IMAGE-ID "))
                ):
                    # Generate hash ID for the image and add a placeholder
                    image_data = part.inline_data.data
                    hasher = hashlib.sha256(image_data)
                    image_hash_id = hasher.hexdigest()[:12]
                    placeholder = f"[IMAGE-ID {image_hash_id}]"

                    # Only keep image data in the last 3 user messages
                    if user_message_count <= 3:
                        modified_content_parts.append(part)

                    modified_content_parts.append(types.Part(text=placeholder))

                else:
                    # Only keep image data in the last 3 user messages
                    if user_message_count <= 3:
                        modified_content_parts.append(part)

            # This will modify the contents inside the llm_request
            content.parts = modified_content_parts

6. 🚀 Prompt

Projektowanie agenta z zaawansowanymi interakcjami i możliwościami wymaga znalezienia odpowiedniego promptu, który będzie nim sterować, aby zachowywał się w pożądany sposób.

Wcześniej mieliśmy mechanizm obsługi danych obrazów w historii rozmów, a także narzędzia, które mogły być trudne w użyciu, np. search_relevant_receipts_by_natural_language_query. Chcemy też, aby agent mógł wyszukiwać i pobierać odpowiedni obraz paragonu. Oznacza to, że musimy przekazać wszystkie te informacje w odpowiedniej strukturze promptu.

Poprosimy agenta o ustrukturyzowanie danych wyjściowych w następującym formacie Markdown, aby przeanalizować proces myślenia, ostateczną odpowiedź i załącznik ( jeśli taki istnieje).

# THINKING PROCESS

Thinking process here

# FINAL RESPONSE

Response to the user here

Attachments put inside json block

{
    "attachments": [
      "[IMAGE-ID <hash-id-1>]",
      "[IMAGE-ID <hash-id-2>]",
      ...
    ]
}

Zacznijmy od tego prompta, aby osiągnąć początkowe oczekiwania dotyczące działania agenta do zarządzania wydatkami. Plik task_prompt.md powinien już znajdować się w naszym bieżącym katalogu roboczym, ale musimy go przenieść do katalogu expense_manager_agent. Aby go przenieść, uruchom to polecenie:

mv task_prompt.md expense_manager_agent/task_prompt.md

7. 🚀 Testowanie agenta

Teraz spróbujmy komunikować się z agentem za pomocą interfejsu wiersza poleceń. Uruchom to polecenie:

uv run adk run expense_manager_agent

Wyświetli się taki wynik, w którym możesz porozmawiać z agentem, jednak za pośrednictwem tego interfejsu możesz wysyłać tylko wiadomości tekstowe

Log setup complete: /tmp/agents_log/agent.xxxx_xxx.log
To access latest log: tail -F /tmp/agents_log/agent.latest.log
Running agent root_agent, type exit to exit.
user: hello
[root_agent]: Hello there! How can I help you today?
user: 

Teraz, oprócz interakcji z CLI, ADK pozwala nam również na korzystanie z interfejsu użytkownika dla programistów, który umożliwia interakcję i sprawdzanie, co się dzieje podczas interakcji. Aby uruchomić lokalny serwer interfejsu programistycznego, wykonaj to polecenie:

uv run adk web --port 8080

Wygeneruje to dane wyjściowe podobne do tych w przykładzie poniżej, co oznacza, że możemy już uzyskać dostęp do interfejsu internetowego.

INFO:     Started server process [xxxx]
INFO:     Waiting for application startup.

+-----------------------------------------------------------------------------+
| ADK Web Server started                                                      |
|                                                                             |
| For local testing, access at http://localhost:8080.                         |
+-----------------------------------------------------------------------------+

INFO:     Application startup complete.
INFO:     Uvicorn running on http://0.0.0.0:8080 (Press CTRL+C to quit)

Aby to sprawdzić, kliknij przycisk Podgląd w przeglądarce w górnej części edytora Cloud Shell i wybierz Podejrzyj na porcie 8080.

edc73e971b9fc60c.png

Zobaczysz następującą stronę internetową, na której możesz wybrać dostępnych agentów za pomocą przycisku rozwijanego w lewym górnym rogu ( w naszym przypadku powinno to być expense_manager_agent) i wejść w interakcję z botem. W oknie po lewej stronie zobaczysz wiele informacji o szczegółach logu podczas działania agenta.

16c333a4b782eeba.png

Spróbujmy wykonać jakieś działania! Prześlij te 2 przykładowe paragony ( źródło : Zestawy danych o przytulaniu twarzy mousserlane/id_receipt_dataset) . Kliknij prawym przyciskiem myszy każdy obraz i wybierz Zapisz obraz jako… ( spowoduje to pobranie obrazu paragonu), a następnie przesłanie pliku do bota poprzez kliknięcie ikony „klip” i poinformowanie, że chcesz zapisać te paragony

2975b3452e0ac0bd.png 143a2e147a18fc38.png

Następnie wypróbuj te zapytania, aby wyszukać coś lub odzyskać pliki:

  • „Podaj zestawienie wydatków i ich sumę w roku 2023”
  • „Daj mi plik z potwierdzeniem odbioru z Indomaret”

Korzystając z niektórych narzędzi, możesz sprawdzić, co dzieje się w interfejsie programistycznym

da461a67b7d81ad5.png

Sprawdź, jak agent odpowiada na Twoje pytania, i upewnij się, że przestrzega wszystkich zasad podanych w prompcie w pliku task_prompt.py. Gratulacje! Masz teraz w pełni działającego agenta programistycznego.

Nadszedł czas na dodanie odpowiedniego i ładnego interfejsu użytkownika oraz możliwości przesyłania i pobierania plików graficznych.

8. 🚀 Tworzenie usługi frontendu za pomocą Gradio

Zbudujemy interfejs internetowy czatu, który będzie wyglądał tak

db9331886978d543.png

Zawiera interfejs czatu z polem wprowadzania tekstu, w którym użytkownicy mogą wysyłać tekst i przesyłać pliki z obrazami paragonów.

Usługę frontendu utworzymy za pomocą Gradio.

Utwórz nowy plik i nadaj mu nazwę frontend.py.

touch frontend.py

następnie skopiuj poniższy kod i zapisz go

import mimetypes
import gradio as gr
import requests
import base64
from typing import List, Dict, Any
from settings import get_settings
from PIL import Image
import io
from schema import ImageData, ChatRequest, ChatResponse


SETTINGS = get_settings()


def encode_image_to_base64_and_get_mime_type(image_path: str) -> ImageData:
    """Encode a file to base64 string and get MIME type.

    Reads an image file and returns the base64-encoded image data and its MIME type.

    Args:
        image_path: Path to the image file to encode.

    Returns:
        ImageData object containing the base64 encoded image data and its MIME type.
    """
    # Read the image file
    with open(image_path, "rb") as file:
        image_content = file.read()

    # Get the mime type
    mime_type = mimetypes.guess_type(image_path)[0]

    # Base64 encode the image
    base64_data = base64.b64encode(image_content).decode("utf-8")

    # Return as ImageData object
    return ImageData(serialized_image=base64_data, mime_type=mime_type)


def decode_base64_to_image(base64_data: str) -> Image.Image:
    """Decode a base64 string to PIL Image.

    Converts a base64-encoded image string back to a PIL Image object
    that can be displayed or processed further.

    Args:
        base64_data: Base64 encoded string of the image.

    Returns:
        PIL Image object of the decoded image.
    """
    # Decode the base64 string and convert to PIL Image
    image_data = base64.b64decode(base64_data)
    image_buffer = io.BytesIO(image_data)
    image = Image.open(image_buffer)

    return image


def get_response_from_llm_backend(
    message: Dict[str, Any],
    history: List[Dict[str, Any]],
) -> List[str | gr.Image]:
    """Send the message and history to the backend and get a response.

    Args:
        message: Dictionary containing the current message with 'text' and optional 'files' keys.
        history: List of previous message dictionaries in the conversation.

    Returns:
        List containing text response and any image attachments from the backend service.
    """
    # Extract files and convert to base64
    image_data = []
    if uploaded_files := message.get("files", []):
        for file_path in uploaded_files:
            image_data.append(encode_image_to_base64_and_get_mime_type(file_path))

    # Prepare the request payload
    payload = ChatRequest(
        text=message["text"],
        files=image_data,
        session_id="default_session",
        user_id="default_user",
    )

    # Send request to backend
    try:
        response = requests.post(SETTINGS.BACKEND_URL, json=payload.model_dump())
        response.raise_for_status()  # Raise exception for HTTP errors

        result = ChatResponse(**response.json())
        if result.error:
            return [f"Error: {result.error}"]

        chat_responses = []

        if result.thinking_process:
            chat_responses.append(
                gr.ChatMessage(
                    role="assistant",
                    content=result.thinking_process,
                    metadata={"title": "🧠 Thinking Process"},
                )
            )

        chat_responses.append(gr.ChatMessage(role="assistant", content=result.response))

        if result.attachments:
            for attachment in result.attachments:
                image_data = attachment.serialized_image
                chat_responses.append(gr.Image(decode_base64_to_image(image_data)))

        return chat_responses
    except requests.exceptions.RequestException as e:
        return [f"Error connecting to backend service: {str(e)}"]


if __name__ == "__main__":
    demo = gr.ChatInterface(
        get_response_from_llm_backend,
        title="Personal Expense Assistant",
        description="This assistant can help you to store receipts data, find receipts, and track your expenses during certain period.",
        type="messages",
        multimodal=True,
        textbox=gr.MultimodalTextbox(file_count="multiple", file_types=["image"]),
    )

    demo.launch(
        server_name="0.0.0.0",
        server_port=8080,
    )

Następnie możemy spróbować uruchomić usługę frontendu za pomocą tego polecenia. Nie zapomnij zmienić nazwy pliku main.py na frontend.py.

uv run frontend.py

W konsoli Google Cloud zobaczysz dane wyjściowe podobne do tych:

* Running on local URL:  http://0.0.0.0:8080

To create a public link, set `share=True` in `launch()`.

Następnie możesz sprawdzić interfejs internetowy, klikając z naciśniętym klawiszem Ctrl lokalny link do adresu URL. Możesz też otworzyć aplikację frontendową, klikając przycisk Podgląd w przeglądarce w prawym górnym rogu edytora Cloud i wybierając Podejrzyj na porcie 8080.

b477bc3c686a5fc3.jpeg

Zobaczysz interfejs sieciowy, jednak podczas próby wysłania czatu pojawi się oczekiwany błąd, ponieważ usługa zaplecza nie została jeszcze skonfigurowana

b5de2f284155dac2.png

Teraz pozwól usłudze działać i nie zamykaj jej jeszcze. Uruchomimy usługę zaplecza w innej karcie terminala

Wyjaśnienie kodu

W tym kodzie front-endu najpierw umożliwiamy użytkownikowi wysyłanie tekstu i przesyłanie wielu plików. Gradio pozwala nam tworzyć tego typu funkcjonalność za pomocą metody gr.ChatInterface połączonej z gr.MultimodalTextbox

Teraz, zanim prześlemy plik i tekst do zaplecza, musimy ustalić typ MIME pliku, ponieważ jest on wymagany przez zaplecze. Musimy również zakodować plik obrazu bajtowo do base64 i wysłać go razem z typem MIME.

class ImageData(BaseModel):
    """Model for image data with hash identifier.

    Attributes:
        serialized_image: Optional Base64 encoded string of the image content.
        mime_type: MIME type of the image.
    """

    serialized_image: str
    mime_type: str

Schemat używany do interakcji między frontendem a backendem jest zdefiniowany w pliku schema.py. Do wymuszania walidacji danych w schemacie używamy klasy Pydantic BaseModel.

Otrzymując odpowiedź, rozróżniamy już, która część jest procesem myślowym, która jest reakcją ostateczną, a która przywiązaniem. W ten sposób możemy wykorzystać komponent Gradio do wyświetlania każdego komponentu za pomocą komponentu interfejsu użytkownika.

class ChatResponse(BaseModel):
    """Model for a chat response.

    Attributes:
        response: The text response from the model.
        thinking_process: Optional thinking process of the model.
        attachments: List of image data to be displayed to the user.
        error: Optional error message if something went wrong.
    """

    response: str
    thinking_process: str = ""
    attachments: List[ImageData] = []
    error: Optional[str] = None

9. 🚀 Zbuduj usługę zaplecza przy użyciu FastAPI

Następnie musimy zbudować zaplecze, które będzie mogło zainicjować naszego agenta razem z innymi komponentami, aby móc uruchomić środowisko wykonawcze agenta.

Utwórz nowy plik i nazwij go backend.py

touch backend.py

i skopiuj ten kod:

from expense_manager_agent.agent import root_agent as expense_manager_agent
from google.adk.sessions import InMemorySessionService
from google.adk.runners import Runner
from google.adk.events import Event
from fastapi import FastAPI, Body, Depends
from typing import AsyncIterator
from types import SimpleNamespace
import uvicorn
from contextlib import asynccontextmanager
from utils import (
    extract_attachment_ids_and_sanitize_response,
    download_image_from_gcs,
    extract_thinking_process,
    format_user_request_to_adk_content_and_store_artifacts,
)
from schema import ImageData, ChatRequest, ChatResponse
import logger
from google.adk.artifacts import GcsArtifactService
from settings import get_settings

SETTINGS = get_settings()
APP_NAME = "expense_manager_app"


# Application state to hold service contexts
class AppContexts(SimpleNamespace):
    """A class to hold application contexts with attribute access"""

    session_service: InMemorySessionService = None
    artifact_service: GcsArtifactService = None
    expense_manager_agent_runner: Runner = None


# Initialize application state
app_contexts = AppContexts()


@asynccontextmanager
async def lifespan(app: FastAPI):
    # Initialize service contexts during application startup
    app_contexts.session_service = InMemorySessionService()
    app_contexts.artifact_service = GcsArtifactService(
        bucket_name=SETTINGS.STORAGE_BUCKET_NAME
    )
    app_contexts.expense_manager_agent_runner = Runner(
        agent=expense_manager_agent,  # The agent we want to run
        app_name=APP_NAME,  # Associates runs with our app
        session_service=app_contexts.session_service,  # Uses our session manager
        artifact_service=app_contexts.artifact_service,  # Uses our artifact manager
    )

    logger.info("Application started successfully")
    yield
    logger.info("Application shutting down")
    # Perform cleanup during application shutdown if necessary


# Helper function to get application state as a dependency
async def get_app_contexts() -> AppContexts:
    return app_contexts


# Create FastAPI app
app = FastAPI(title="Personal Expense Assistant API", lifespan=lifespan)


@app.post("/chat", response_model=ChatResponse)
async def chat(
    request: ChatRequest = Body(...),
    app_context: AppContexts = Depends(get_app_contexts),
) -> ChatResponse:
    """Process chat request and get response from the agent"""

    # Prepare the user's message in ADK format and store image artifacts
    content = await format_user_request_to_adk_content_and_store_artifacts(
        request=request,
        app_name=APP_NAME,
        artifact_service=app_context.artifact_service,
    )

    final_response_text = "Agent did not produce a final response."  # Default

    # Use the session ID from the request or default if not provided
    session_id = request.session_id
    user_id = request.user_id

    # Create session if it doesn't exist
    if not await app_context.session_service.get_session(
        app_name=APP_NAME, user_id=user_id, session_id=session_id
    ):
        await app_context.session_service.create_session(
            app_name=APP_NAME, user_id=user_id, session_id=session_id
        )

    try:
        # Process the message with the agent
        # Type annotation: runner.run_async returns an AsyncIterator[Event]
        events_iterator: AsyncIterator[Event] = (
            app_context.expense_manager_agent_runner.run_async(
                user_id=user_id, session_id=session_id, new_message=content
            )
        )
        async for event in events_iterator:  # event has type Event
            # Key Concept: is_final_response() marks the concluding message for the turn
            if event.is_final_response():
                if event.content and event.content.parts:
                    # Extract text from the first part
                    final_response_text = event.content.parts[0].text
                elif event.actions and event.actions.escalate:
                    # Handle potential errors/escalations
                    final_response_text = f"Agent escalated: {event.error_message or 'No specific message.'}"
                break  # Stop processing events once the final response is found

        logger.info(
            "Received final response from agent", raw_final_response=final_response_text
        )

        # Extract and process any attachments and thinking process in the response
        base64_attachments = []
        sanitized_text, attachment_ids = extract_attachment_ids_and_sanitize_response(
            final_response_text
        )
        sanitized_text, thinking_process = extract_thinking_process(sanitized_text)

        # Download images from GCS and replace hash IDs with base64 data
        for image_hash_id in attachment_ids:
            # Download image data and get MIME type
            result = await download_image_from_gcs(
                artifact_service=app_context.artifact_service,
                image_hash=image_hash_id,
                app_name=APP_NAME,
                user_id=user_id,
                session_id=session_id,
            )
            if result:
                base64_data, mime_type = result
                base64_attachments.append(
                    ImageData(serialized_image=base64_data, mime_type=mime_type)
                )

        logger.info(
            "Processed response with attachments",
            sanitized_response=sanitized_text,
            thinking_process=thinking_process,
            attachment_ids=attachment_ids,
        )

        return ChatResponse(
            response=sanitized_text,
            thinking_process=thinking_process,
            attachments=base64_attachments,
        )

    except Exception as e:
        logger.error("Error processing chat request", error_message=str(e))
        return ChatResponse(
            response="", error=f"Error in generating response: {str(e)}"
        )


# Only run the server if this file is executed directly
if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8081)

Następnie możemy spróbować uruchomić usługę backendu. Pamiętaj, że w poprzednim kroku uruchomiliśmy usługę frontendową. Teraz musimy otworzyć nowy terminal i spróbować uruchomić tę usługę backendową.

  1. Utwórz nowy terminal. Przejdź do terminala w dolnej części i kliknij przycisk „+”, aby utworzyć nowy terminal. Możesz też nacisnąć Ctrl + Shift + C, aby otworzyć nowy terminal.

235e2f9144d82803.jpeg

  1. Następnie upewnij się, że jesteś w katalogu roboczym personal-expense-assistant, i uruchom to polecenie:
uv run backend.py
  1. Jeśli się powiedzie, wyświetli się taki wynik
INFO:     Started server process [xxxxx]
INFO:     Waiting for application startup.
INFO:     Application startup complete.
INFO:     Uvicorn running on http://0.0.0.0:8081 (Press CTRL+C to quit)

Wyjaśnienie kodu

Inicjowanie agenta ADK, SessionService i ArtifactService

Aby uruchomić agenta w usłudze backendu, musimy utworzyć Runnera, który przyjmuje zarówno SessionService, jak i naszego agenta. SessionService będzie zarządzać historią i stanem rozmowy, więc po zintegrowaniu z Runnerem umożliwi agentowi otrzymywanie kontekstu bieżących rozmów.

Wykorzystujemy również ArtifactService do obsługi przesłanego pliku. Więcej informacji o sesjiartefaktach w ADK znajdziesz tutaj.

...

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Initialize service contexts during application startup
    app_contexts.session_service = InMemorySessionService()
    app_contexts.artifact_service = GcsArtifactService(
        bucket_name=SETTINGS.STORAGE_BUCKET_NAME
    )
    app_contexts.expense_manager_agent_runner = Runner(
        agent=expense_manager_agent,  # The agent we want to run
        app_name=APP_NAME,  # Associates runs with our app
        session_service=app_contexts.session_service,  # Uses our session manager
        artifact_service=app_contexts.artifact_service,  # Uses our artifact manager
    )

    logger.info("Application started successfully")
    yield
    logger.info("Application shutting down")
    # Perform cleanup during application shutdown if necessary

...

W tej wersji demonstracyjnej używamy InMemorySessionService i GcsArtifactService do integracji z naszym agentem Runner. Historia konwersacji jest przechowywana w pamięci, dlatego zostanie utracona po zamknięciu lub ponownym uruchomieniu usługi zaplecza. Inicjalizujemy je w cyklu życia aplikacji FastAPI, aby wstrzyknąć je jako zależność w trasie /chat.

Przesyłanie i pobieranie obrazu za pomocą usługi GcsArtifactService

Wszystkie przesłane obrazy zostaną zapisane jako artefakty przez GcsArtifactService. Możesz to sprawdzić w funkcji format_user_request_to_adk_content_and_store_artifacts w pliku utils.py

...    

# Prepare the user's message in ADK format and store image artifacts
content = await asyncio.to_thread(
    format_user_request_to_adk_content_and_store_artifacts,
    request=request,
    app_name=APP_NAME,
    artifact_service=app_context.artifact_service,
)

...

Wszystkie żądania, które będą przetwarzane przez wykonawcę agenta, muszą być sformatowane jako types.Content. W funkcji przetwarzamy też dane każdego obrazu i wyodrębniamy jego identyfikator, który zastępujemy obiektem zastępczym identyfikatora obrazu.

Podobny mechanizm jest stosowany do pobierania załączników po wyodrębnieniu identyfikatorów obrazów za pomocą wyrażenia regularnego:

...
sanitized_text, attachment_ids = extract_attachment_ids_and_sanitize_response(
    final_response_text
)
sanitized_text, thinking_process = extract_thinking_process(sanitized_text)

# Download images from GCS and replace hash IDs with base64 data
for image_hash_id in attachment_ids:
    # Download image data and get MIME type
    result = await asyncio.to_thread(
        download_image_from_gcs,
        artifact_service=app_context.artifact_service,
        image_hash=image_hash_id,
        app_name=APP_NAME,
        user_id=user_id,
        session_id=session_id,
    )
...

10. 🚀 Test integracyjny

Teraz powinieneś mieć wiele usług uruchomionych na różnych kartach konsoli w chmurze:

  • Usługa front-end uruchomiona na porcie 8080
* Running on local URL:  http://0.0.0.0:8080

To create a public link, set `share=True` in `launch()`.
  • Usługa backendu działa na porcie 8081.
INFO:     Started server process [xxxxx]
INFO:     Waiting for application startup.
INFO:     Application startup complete.
INFO:     Uvicorn running on http://0.0.0.0:8081 (Press CTRL+C to quit)

Na obecnym etapie powinieneś mieć możliwość przesyłania obrazów paragonów i płynnej komunikacji z asystentem za pośrednictwem aplikacji internetowej na porcie 8080.

Kliknij przycisk Podgląd w sieci Web w górnej części edytora Cloud Shell i wybierz opcję Podgląd na porcie 8080

edc73e971b9fc60c.png

Teraz nawiążmy interakcję z asystentem!

Pobierz następujące paragony. Zakres dat tych danych na paragonie mieści się w latach 2023-2024 i poproś asystenta o ich zapisanie/przesłanie

  • Receipt Drive ( źródło: zestawy danych Hugging face mousserlane/id_receipt_dataset)

Zapytaj o różne rzeczy

  • „Podaj mi miesięczne zestawienie wydatków w latach 2023–2024”.
  • „Pokaż mi paragon za transakcję dotyczącą kawy”
  • „Prześlij mi plik z rachunkiem z restauracji Yakiniku Like”
  • Itp

Oto fragment udanej interakcji

e01dc7a8ec673aa4.png

9341212f8d54c98a.png

11. 🚀 Wdrażanie w Cloud Run

Oczywiście chcemy mieć dostęp do tej niesamowitej aplikacji z dowolnego miejsca. Aby to zrobić, możemy spakować tę aplikację i wdrożyć ją w Cloud Run. Na potrzeby tej wersji demonstracyjnej usługa będzie udostępniana jako usługa publiczna, do której dostęp będą miały inne osoby. Należy jednak pamiętać, że nie jest to najlepsza praktyka w przypadku tego typu zastosowań, ponieważ jest ona bardziej odpowiednia do zastosowań osobistych

90805d85052a5e5a.jpeg

W tym laboratorium umieścimy zarówno usługę frontendu, jak i backendu w 1 kontenerze. Do zarządzania obiema usługami potrzebna będzie pomoc supervisord. Możesz sprawdzić plik supervisord.confDockerfile, w którym ustawiliśmy supervisord jako punkt wejścia.

Mamy już wszystkie pliki potrzebne do wdrożenia aplikacji w Cloud Run. Możemy więc to zrobić. Otwórz terminal Cloud Shell i sprawdź, czy bieżący projekt jest skonfigurowany jako aktywny. Jeśli nie, użyj polecenia gcloud configure, aby ustawić identyfikator projektu:

gcloud config set project [PROJECT_ID]

Następnie uruchom poniższe polecenie, aby wdrożyć je w Cloud Run.

gcloud run deploy personal-expense-assistant \
                  --source . \
                  --port=8080 \
                  --allow-unauthenticated \
                  --env-vars-file=settings.yaml \
                  --memory 1024Mi \
                  --region us-central1

Jeśli pojawi się prośba o potwierdzenie utworzenia repozytorium Artifact Registry dla Dockera, wpisz Y. Pamiętaj, że w tym przypadku zezwalamy na nieuwierzytelniony dostęp, ponieważ jest to aplikacja demonstracyjna. Zalecamy stosowanie odpowiedniego uwierzytelniania w przypadku aplikacji firmowych i produkcyjnych.

Po zakończeniu wdrażania powinien pojawić się link podobny do tego poniżej:

https://personal-expense-assistant-*******.us-central1.run.app

Możesz teraz korzystać z aplikacji w oknie incognito lub na urządzeniu mobilnym. Powinien być już dostępny.

12. 🎯 Wyzwanie

Teraz nadszedł czas, aby zabłysnąć i doszlifować swoje umiejętności eksploracyjne. Czy masz to, czego potrzeba, aby zmienić kod tak, aby zaplecze mogło obsługiwać wielu użytkowników? Które komponenty wymagają aktualizacji?

13. 🧹 Sprzątanie

Aby uniknąć naliczania opłat na Twoje konto Google Cloud za zasoby wykorzystane w tym laboratorium programistycznym, wykonaj następujące kroki:

  1. W konsoli Google Cloud przejdź do strony Zarządzaj zasobami.
  2. Z listy projektów wybierz projekt do usunięcia, a potem kliknij Usuń.
  3. W oknie dialogowym wpisz identyfikator projektu, a następnie kliknij Zamknij, aby usunąć projekt.
  4. Możesz też otworzyć Cloud Run w konsoli, wybrać wdrożoną usługę i ją usunąć.