Przejście na tryb multimodalny dzięki pakietowi dla programistów: asystent wydatków osobistych z Gemini 2.5, Firestore i Cloud Run

Informacje o tym ćwiczeniu (w Codelabs)
schedule0 minut
subjectOstatnia aktualizacja: 15 maja 2025
account_circleAutorzy: Alvin Prayuda Juniarta Dwiyantoro

d029d993943b282b.png

Czy kiedykolwiek zdarzyło Ci się, że nie chciało Ci się zarządzać swoimi wydatkami osobistymi? Ja też! Dlatego w tym CodeLab stworzymy osobistego asystenta do zarządzania wydatkami, który będzie korzystał z Gemini 2.5, aby wykonywać za nas wszystkie prace. Od zarządzania przesłanymi paragonami po analizowanie, czy nie wydaliśmy już zbyt dużo na kawę.

Asystent będzie dostępny w przeglądarce internetowej w postaci interfejsu internetowego czatu, za pomocą którego możesz się z nim komunikować, przesyłać obrazy paragonów i prosić o ich przechowywanie. Możesz też wyszukać paragony, aby pobrać plik i przeprowadzić analizę wydatków. Wszystko to zostało zbudowane na podstawie ramy Google Agent Development Kit.

Sama aplikacja jest podzielona na 2 usługi: front-end i back-end. Dzięki temu możesz szybko utworzyć prototyp i sprawdzić, jak działa, a także zrozumieć, jak wygląda integracja obu usług w ramach interfejsu API.

W ramach tego ćwiczenia będziesz wykonywać czynności krok po kroku:

  1. Przygotuj projekt Google Cloud i włącz w nim wszystkie wymagane interfejsy API
  2. Konfigurowanie zasobnika w Google Cloud Storage i bazy danych w Firestore
  3. Tworzenie indeksowania Firestore
  4. Konfigurowanie obszaru roboczego na potrzeby środowiska programowania
  5. Struktura kodu źródłowego, narzędzi i promptów agenta ADK
  6. Testowanie agenta za pomocą lokalnego interfejsu programowania internetowego ADK
  7. Utwórz usługę frontendu – interfejs czatu za pomocą biblioteki Gradio, aby wysłać zapytanie i przesłać obrazy potwierdzenia.
  8. Utwórz usługę backendową – serwer HTTP za pomocą FastAPI, w którym znajduje się kod agenta ADK, usługa SessionService i usługa Artifact Service.
  9. Zarządzanie zmiennymi środowiskowymi i plikami konfiguracyjnymi wymaganymi do wdrożenia aplikacji w Cloud Run
  10. Wdrażanie aplikacji w Cloud Run

Omówienie architektury

6795e9abf2030334.jpeg

Wymagania wstępne

  • swobodnie pracować z Pythonem,
  • podstawowa architektura pełnego zestawu usług korzystająca z usługi HTTP;

Czego się nauczysz

  • Tworzenie prototypów front-endu witryny za pomocą Gradio
  • tworzenie usług backendu za pomocą FastAPI i Pydantic;
  • projektowanie agenta ADK z wykorzystaniem jego kilku funkcji;
  • Korzystanie z narzędzia
  • Zarządzanie sesjami i artefaktami
  • Wykorzystanie funkcji wywołania zwrotnego do modyfikacji danych wejściowych przed wysłaniem ich do Gemini
  • Korzystanie z BuiltInPlanner do ulepszania wykonywania zadań przez planowanie
  • Szybkie debugowanie za pomocą lokalnego interfejsu internetowego ADK
  • Strategia optymalizacji interakcji multimodalnej poprzez analizowanie i pobieranie informacji za pomocą promptów oraz modyfikowanie żądań Gemini za pomocą funkcji ADK callback
  • Generowanie rozszerzone przez wyszukiwanie w zapisanych informacjach z użyciem Firestore jako bazy danych wektorów
  • Zarządzanie zmiennymi środowiskowymi w pliku YAML za pomocą Pydantic-settings
  • Wdrażanie aplikacji w Cloud Run za pomocą pliku Dockerfile i podawanie zmiennych środowiskowych w pliku YAML

Czego potrzebujesz

  • przeglądarka Chrome,
  • konto Gmail,
  • projekt w chmurze z włączonymi płatnościami,

Ten warsztat programistyczny przeznaczony dla deweloperów na wszystkich poziomach zaawansowania (w tym początkujących) używa Pythona w próbnej aplikacji. Jednak znajomość Pythona nie jest wymagana do zrozumienia omawianych zagadnień.

2. Zanim zaczniesz

Wybieranie aktywnego projektu w konsoli Cloud

W tym laboratorium programistycznym zakładamy, że masz już projekt Google Cloud z włączonymi płatnościami. Jeśli jeszcze go nie masz, możesz zacząć od wykonania tych instrukcji.

  1. W konsoli Google Cloud na stronie selektora projektu wybierz lub utwórz projekt Google Cloud.
  2. Sprawdź, czy w projekcie Cloud włączone są płatności. Dowiedz się, jak sprawdzić, czy w projekcie są włączone płatności.

9b27622602f6cc4f.png

Przygotowanie bazy danych Firestore

Następnie musimy utworzyć bazę danych Firestore. Firestore w trybie natywnym to baza danych dokumentów NoSQL zaprojektowana pod kątem automatycznego skalowania, wysokiej wydajności i łatwego tworzenia aplikacji. Może też pełnić funkcję bazy danych wektorów, która może obsługiwać technikę generowania rozszerzonego wyszukiwania w naszej pracowni.

  1. Na pasku wyszukiwania wpisz „firestore” i kliknij usługę Firestore.

2986f598f448af67.png

  1. Następnie kliknij przycisk Utwórz bazę danych Firestore.
  2. Jako nazwy identyfikatora bazy danych użyj wartości (domyślnie) i pozostaw wybraną opcję Wersja standardowa. Na potrzeby tego ćwiczenia użyj Firestore Nativeotwartymi regułami zabezpieczeń.
  1. Zauważysz też, że ta baza danych ma poziom bezpłatny YEAY! Następnie kliknij przycisk Utwórz bazę danych.

27a5495b76ed7033.png

Po wykonaniu tych czynności powinieneś zostać przekierowany do utworzonej właśnie bazy danych Firestore.

Konfigurowanie projektu Cloud w terminalu Cloud Shell

  1. Użyjesz Cloud Shell, czyli środowiska wiersza poleceń działającego w Google Cloud, które jest wstępnie załadowane w bq. Kliknij Aktywuj Cloud Shell u góry konsoli Google Cloud.

1829c3759227c19b.png

  1. Po połączeniu z Cloud Shell sprawdź, czy jesteś już uwierzytelniony i czy projekt jest ustawiony na identyfikator Twojego projektu, używając tego polecenia:
gcloud auth list
  1. Aby sprawdzić, czy polecenie gcloud zna Twój projekt, uruchom w Cloud Shell to polecenie:
gcloud config list project
  1. Jeśli projekt nie jest ustawiony, użyj tego polecenia:
gcloud config set project <YOUR_PROJECT_ID>

Identyfikator PROJECT_ID możesz też zobaczyć w konsoli

4032c45803813f30.jpeg

Kliknij go, aby wyświetlić po prawej stronie informacje o projekcie i jego identyfikator.

8dc17eb4271de6b5.jpeg

  1. Włącz wymagane interfejsy API za pomocą polecenia pokazanego poniżej. Może to potrwać kilka minut, więc zachowaj cierpliwość.
gcloud services enable aiplatform.googleapis.com \
                       firestore.googleapis.com \
                       run.googleapis.com \
                       cloudbuild.googleapis.com \
                       cloudresourcemanager.googleapis.com

Po pomyślnym wykonaniu polecenia powinien wyświetlić się komunikat podobny do tego:

Operation "operations/..." finished successfully.

Alternatywą dla polecenia gcloud jest konsola, w której możesz wyszukać poszczególne usługi lub skorzystać z tego linku.

Jeśli pominiesz któryś interfejs API, zawsze możesz go włączyć w trakcie implementacji.

Informacje o poleceniach i użytkowaniu gcloud znajdziesz w dokumentacji.

Przygotowanie zasobnika Google Cloud Storage

Następnie w tym samym terminalu musimy przygotować zasobnik GCS do przechowywania przesłanego pliku. Aby utworzyć zasobnik, uruchom to polecenie:

gsutil mb -l us-central1 gs://personal-expense-assistant-receipts

Wyświetli się taki wynik

Creating gs://personal-expense-assistant-receipts/...

Aby to sprawdzić, otwórz menu nawigacyjne w lewym górnym rogu przeglądarki i wybierz Cloud Storage -> Bucket.

d27475d5ce4fcc9d.png

Firestore to baza danych NoSQL, która zapewnia wysoką wydajność i elastyczność w modelu danych, ale ma ograniczenia w przypadku złożonych zapytań. Planujemy użycie złożonych zapytań wielopolowych i wyszukiwania wektorów, więc musimy najpierw utworzyć indeks. Więcej informacji znajdziesz w tej dokumentacji.

  1. Aby utworzyć indeks obsługujący złożone zapytania, uruchom to polecenie:
gcloud firestore indexes composite create \
        --collection-group=personal-expense-assistant-receipts \
        --field-config field-path=total_amount,order=ASCENDING \
        --field-config field-path=transaction_time,order=ASCENDING \
        --field-config field-path=__name__,order=ASCENDING \
        --database="(default)"
  1. Uruchom to, aby uzyskać wsparcie wyszukiwania wektorów
gcloud firestore indexes composite create \
        --collection-group="personal-expense-assistant-receipts" \
        --query-scope=COLLECTION \
        --field-config field-path="embedding",vector-config='{"dimension":"768", "flat": "{}"}' \
        --database="(default)"

Aby sprawdzić utworzony indeks, otwórz Firestore w konsoli Google Cloud, kliknij instancję bazy danych (domyślna) i na pasku nawigacyjnym wybierz Indeksy.

8b3a4012985ee0b6.png

Otwórz edytor Cloud Shell i skonfiguruj katalog roboczy aplikacji

Teraz możemy skonfigurować edytor kodu, aby wykonać pewne czynności związane z kodowaniem. Do tego celu użyjemy edytora Cloud Shell.

  1. Kliknij przycisk Otwórz edytor, aby otworzyć edytor Cloud Shell, w którym możesz napisać kod b16d56e4979ec951.png
  2. Sprawdź, czy w lewym dolnym rogu (pasek stanu) edytora Cloud Shell ustawiony jest projekt Cloud Code (jak na obrazku poniżej) i czy jest to aktywny projekt Google Cloud, w którym masz włączone płatności. Jeśli pojawi się taka prośba, autoryzuj. Jeśli wykonasz poprzednie polecenie, przycisk może też wskazywać bezpośrednio aktywny projekt zamiast przycisku logowania.

f5003b9c38b43262.png

  1. Następnie skopiuj katalog roboczy szablonu tego laboratorium kodu z GitHuba, wykonując to polecenie. Utworzy katalog roboczy w katalogu personal-expense-assistant.
git clone https://github.com/alphinside/personal-expense-assistant-adk-codelab-starter.git personal-expense-assistant
  1. Następnie przejdź do górnej sekcji edytora Cloud Shell i kliknij Plik->Otwórz folder, odszukaj katalog username i katalog personal-expense-assistant,a następnie kliknij przycisk OK. Spowoduje to ustawienie wybranego katalogu jako głównego katalogu roboczego. W tym przykładzie nazwa użytkownika to alvinprayuda, dlatego ścieżka katalogu jest wyświetlana poniżej.

2c53696f81d805cc.png

a766d380600a988.png

Twój edytor Cloud Shell powinien teraz wyglądać tak

528df7169f01b016.png

Konfiguracja środowiska

Przygotowanie wirtualnego środowiska Pythona

Kolejnym krokiem jest przygotowanie środowiska programistycznego. Aktualnie aktywny terminal powinien znajdować się w katalogu roboczym personal-expense-assistant. W tym laboratorium kodu użyjemy Pythona 3.12 i narzędzia uv python project manager, aby uprościć tworzenie wersji Pythona i zarządzanie nią oraz środowiskiem wirtualnym.

  1. Jeśli terminal nie jest jeszcze otwarty, otwórz go, klikając Terminal > Nowy terminal, lub naciśnij Ctrl + Shift + C, aby otworzyć okno terminala w dolnej części przeglądarki.

f8457daf0bed059e.jpeg

  1. Pobierz uv i zainstaluj Pythona 3.12 za pomocą tego polecenia
curl -LsSf https://astral.sh/uv/0.6.16/install.sh | sh && \
source $HOME/.local/bin/env && \
uv python install 3.12
  1. Teraz zainicjuj środowisko wirtualne za pomocą uv , uruchom to polecenie
uv sync --frozen

Spowoduje to utworzenie katalogu .venv i zainstalowanie zależności. Szybki rzut oka na plik pyproject.toml zawiera informacje o zależnościach, które wyglądają tak:

dependencies = [
    "datasets>=3.5.0",
    "google-adk>=0.2.0",
    "google-cloud-firestore>=2.20.1",
    "gradio>=5.23.1",
    "pydantic>=2.10.6",
    "pydantic-settings[yaml]>=2.8.1",
]
  1. Aby przetestować wirtualne środowisko, utwórz nowy plik main.py i skopiuj ten kod
def main():
   print("Hello from personal-expense-assistant-adk!")

if __name__ == "__main__":
   main()
  1. Następnie uruchom to polecenie:
uv run main.py

Dane wyjściowe będą wyglądać tak:

Using CPython 3.12
Creating virtual environment at: .venv
Hello from personal-expense-assistant-adk!

To pokazuje, że projekt Pythona jest prawidłowo skonfigurowany.

Konfigurowanie plików konfiguracji

Teraz musimy skonfigurować pliki konfiguracji dla tego projektu. Do odczytu konfiguracji z pliku YAML używamy pakietu pydantic-settings.

Utwórz plik o nazwie settings.yaml z taką konfiguracją: Kliknij Plik > Nowy plik tekstowy i wpisz ten kod. Następnie zapisz plik jako settings.yaml.

GCLOUD_LOCATION: "us-central1"
GCLOUD_PROJECT_ID: "your_gcloud_project_id"
BACKEND_URL: "http://localhost:8081/chat"
STORAGE_BUCKET_NAME: "personal-expense-assistant-receipts"
DB_COLLECTION_NAME: "personal-expense-assistant-receipts"

W tym laboratorium kodu używamy wstępnie skonfigurowanych wartości GCLOUD_LOCATION, BACKEND_URL, STORAGE_BUCKET_NAME, DB_COLLECTION_NAME i BACKEND_URL .

Teraz możemy przejść do następnego kroku, czyli tworzenia agenta, a potem usług.

3. Tworzenie agenta za pomocą Google ADK i Gemini 2.5

Wprowadzenie do struktury katalogu ADK

Zacznijmy od zapoznania się z możliwościami ADK i sposobem tworzenia agenta. Pełną dokumentację ADK znajdziesz pod tym adresem URL . ADK udostępnia wiele narzędzi w ramach wykonywania poleceń w interfejsie wiersza poleceń. Oto niektóre z nich :

  • Konfigurowanie struktury katalogu agentów
  • Szybkie testowanie interakcji za pomocą interfejsu wiersza poleceń
  • Szybkie konfigurowanie lokalnego interfejsu programistycznego

Teraz utwórz strukturę katalogu agenta za pomocą polecenia wiersza poleceń. Uruchom to polecenie:

uv run adk create expense_manager_agent \
   --model gemini-2.5-flash-preview-04-17 \
   --project {your-project-id} \
   --region us-central1

Utworzy ona następującą strukturę katalogu agenta:

expense_manager_agent/
├── __init__.py
├── .env
├── agent.py

Jeśli sprawdzisz pliki init.pyagent.py, zobaczysz ten kod:

# __init__.py

from . import agent
# agent.py

from google.adk.agents import Agent

root_agent = Agent(
    model='gemini-2.5-flash-preview-04-17',
    name='root_agent',
    description='A helpful assistant for user questions.',
    instruction='Answer user questions to the best of your knowledge',
)

Tworzenie agenta dotyczącego zarządzania wydatkami

Zbudujmy agenta ds. zarządzania wydatkami. Otwórz plik expense_manager_agent/agent.py i skopiuj znajdujący się w nim kod root_agent.

# expense_manager_agent/agent.py

from google.adk.agents import Agent
from expense_manager_agent.tools import (
    store_receipt_data,
    search_receipts_by_metadata_filter,
    search_relevant_receipts_by_natural_language_query,
    get_receipt_data_by_image_id,
)
from expense_manager_agent.callbacks import modify_image_data_in_history
import os
from settings import get_settings
from google.adk.planners import BuiltInPlanner
from google.genai import types

SETTINGS = get_settings()
os.environ["GOOGLE_CLOUD_PROJECT"] = SETTINGS.GCLOUD_PROJECT_ID
os.environ["GOOGLE_CLOUD_LOCATION"] = SETTINGS.GCLOUD_LOCATION
os.environ["GOOGLE_GENAI_USE_VERTEXAI"] = "TRUE"

# Get the code file directory path and read the task prompt file
current_dir = os.path.dirname(os.path.abspath(__file__))
prompt_path = os.path.join(current_dir, "task_prompt.md")
with open(prompt_path, "r") as file:
    task_prompt = file.read()

root_agent = Agent(
    name="expense_manager_agent",
    model="gemini-2.5-flash-preview-04-17",
    description=(
        "Personal expense agent to help user track expenses, analyze receipts, and manage their financial records"
    ),
    instruction=task_prompt,
    tools=[
        store_receipt_data,
        get_receipt_data_by_image_id,
        search_receipts_by_metadata_filter,
        search_relevant_receipts_by_natural_language_query,
    ],
    planner=BuiltInPlanner(
        thinking_config=types.ThinkingConfig(
            thinking_budget=2048,
        )
    ),
    before_model_callback=modify_image_data_in_history,
)

Objaśnienie kodu

Ten skrypt zawiera inicjowanie agenta, w ramach którego inicjujemy te elementy:

  • Ustaw model, którego chcesz używać, na gemini-2.5-flash-preview-04-17
  • Skonfiguruj opis i instrukcje agenta jako prompt systemowy, który jest odczytywany z task_prompt.md
  • udostępnić niezbędne narzędzia do obsługi funkcji agenta;
  • Umożliwienie planowania przed wygenerowaniem ostatecznej odpowiedzi lub wykonaniem za pomocą funkcji myślenia Gemini 2.5 Flash
  • Przed wysłaniem żądania do Gemini skonfiguruj przechwytywanie wywołania zwrotnego, aby ograniczyć liczbę danych obrazu wysyłanych przed wykonaniem prognozy.

4. Konfigurowanie narzędzi agenta

Nasz menedżer wydatków będzie mógł:

  • Wyodrębnij dane z obrazu rachunku i zapisz je wraz z pliku
  • Dokładne wyszukiwanie w danych o wydatkach
  • wyszukiwanie kontekstowe danych o wydatkach;

Dlatego potrzebujemy odpowiednich narzędzi do obsługi tej funkcji. Utwórz nowy plik w katalogu expense_manager_agent, nadaj mu nazwę tools.py i skopiuj kod poniżej.

# expense_manager_agent/tools.py

import datetime
from typing import Dict, List, Any
from google.cloud import firestore
from google.cloud.firestore_v1.vector import Vector
from google.cloud.firestore_v1 import FieldFilter
from google.cloud.firestore_v1.base_query import And
from google.cloud.firestore_v1.base_vector_query import DistanceMeasure
from settings import get_settings
from google import genai

SETTINGS = get_settings()
DB_CLIENT = firestore.Client(
    project=SETTINGS.GCLOUD_PROJECT_ID
)  # Will use "(default)" database
COLLECTION = DB_CLIENT.collection(SETTINGS.DB_COLLECTION_NAME)
GENAI_CLIENT = genai.Client(
    vertexai=True, location=SETTINGS.GCLOUD_LOCATION, project=SETTINGS.GCLOUD_PROJECT_ID
)
EMBEDDING_DIMENSION = 768
EMBEDDING_FIELD_NAME = "embedding"
INVALID_ITEMS_FORMAT_ERR = """
Invalid items format. Must be a list of dictionaries with 'name', 'price', and 'quantity' keys."""
RECEIPT_DESC_FORMAT = """
Store Name: {store_name}
Transaction Time: {transaction_time}
Total Amount: {total_amount}
Currency: {currency}
Purchased Items:
{purchased_items}
Receipt Image ID: {receipt_id}
"""


def sanitize_image_id(image_id: str) -> str:
    """Sanitize image ID by removing any leading/trailing whitespace."""
    if image_id.startswith("[IMAGE-"):
        image_id = image_id.split("ID ")[1].split("]")[0]

    return image_id.strip()


def store_receipt_data(
    image_id: str,
    store_name: str,
    transaction_time: str,
    total_amount: float,
    purchased_items: List[Dict[str, Any]],
    currency: str = "IDR",
) -> str:
    """
    Store receipt data in the database.

    Args:
        image_id (str): The unique identifier of the image. For example IMAGE-POSITION 0-ID 12345,
            the ID of the image is 12345.
        store_name (str): The name of the store.
        transaction_time (str): The time of purchase, in ISO format ("YYYY-MM-DDTHH:MM:SS.ssssssZ").
        total_amount (float): The total amount spent.
        purchased_items (List[Dict[str, Any]]): A list of items purchased with their prices. Each item must have:
            - name (str): The name of the item.
            - price (float): The price of the item.
            - quantity (int, optional): The quantity of the item. Defaults to 1 if not provided.
        currency (str, optional): The currency of the transaction, can be derived from the store location.
            If unsure, default is "IDR".

    Returns:
        str: A success message with the receipt ID.

    Raises:
        Exception: If the operation failed or input is invalid.
    """
    try:
        # In case of it provide full image placeholder, extract the id string
        image_id = sanitize_image_id(image_id)

        # Check if the receipt already exists
        doc = get_receipt_data_by_image_id(image_id)

        if doc:
            return f"Receipt with ID {image_id} already exists"

        # Validate transaction time
        if not isinstance(transaction_time, str):
            raise ValueError(
                "Invalid transaction time: must be a string in ISO format 'YYYY-MM-DDTHH:MM:SS.ssssssZ'"
            )
        try:
            datetime.datetime.fromisoformat(transaction_time.replace("Z", "+00:00"))
        except ValueError:
            raise ValueError(
                "Invalid transaction time format. Must be in ISO format 'YYYY-MM-DDTHH:MM:SS.ssssssZ'"
            )

        # Validate items format
        if not isinstance(purchased_items, list):
            raise ValueError(INVALID_ITEMS_FORMAT_ERR)

        for _item in purchased_items:
            if (
                not isinstance(_item, dict)
                or "name" not in _item
                or "price" not in _item
            ):
                raise ValueError(INVALID_ITEMS_FORMAT_ERR)

            if "quantity" not in _item:
                _item["quantity"] = 1

        # Create a combined text from all receipt information for better embedding
        result = GENAI_CLIENT.models.embed_content(
            model="text-embedding-004",
            contents=RECEIPT_DESC_FORMAT.format(
                store_name=store_name,
                transaction_time=transaction_time,
                total_amount=total_amount,
                currency=currency,
                purchased_items=purchased_items,
                receipt_id=image_id,
            ),
        )

        embedding = result.embeddings[0].values

        doc = {
            "receipt_id": image_id,
            "store_name": store_name,
            "transaction_time": transaction_time,
            "total_amount": total_amount,
            "currency": currency,
            "purchased_items": purchased_items,
            EMBEDDING_FIELD_NAME: Vector(embedding),
        }

        COLLECTION.add(doc)

        return f"Receipt stored successfully with ID: {image_id}"
    except Exception as e:
        raise Exception(f"Failed to store receipt: {str(e)}")


def search_receipts_by_metadata_filter(
    start_time: str,
    end_time: str,
    min_total_amount: float = -1.0,
    max_total_amount: float = -1.0,
) -> str:
    """
    Filter receipts by metadata within a specific time range and optionally by amount.

    Args:
        start_time (str): The start datetime for the filter (in ISO format, e.g. 'YYYY-MM-DDTHH:MM:SS.ssssssZ').
        end_time (str): The end datetime for the filter (in ISO format, e.g. 'YYYY-MM-DDTHH:MM:SS.ssssssZ').
        min_total_amount (float): The minimum total amount for the filter (inclusive). Defaults to -1.
        max_total_amount (float): The maximum total amount for the filter (inclusive). Defaults to -1.

    Returns:
        str: A string containing the list of receipt data matching all applied filters.

    Raises:
        Exception: If the search failed or input is invalid.
    """
    try:
        # Validate start and end times
        if not isinstance(start_time, str) or not isinstance(end_time, str):
            raise ValueError("start_time and end_time must be strings in ISO format")
        try:
            datetime.datetime.fromisoformat(start_time.replace("Z", "+00:00"))
            datetime.datetime.fromisoformat(end_time.replace("Z", "+00:00"))
        except ValueError:
            raise ValueError("start_time and end_time must be strings in ISO format")

        # Start with the base collection reference
        query = COLLECTION

        # Build the composite query by properly chaining conditions
        # Notes that this demo assume 1 user only,
        # need to refactor the query for multiple user
        filters = [
            FieldFilter("transaction_time", ">=", start_time),
            FieldFilter("transaction_time", "<=", end_time),
        ]

        # Add optional filters
        if min_total_amount != -1:
            filters.append(FieldFilter("total_amount", ">=", min_total_amount))

        if max_total_amount != -1:
            filters.append(FieldFilter("total_amount", "<=", max_total_amount))

        # Apply the filters
        composite_filter = And(filters=filters)
        query = query.where(filter=composite_filter)

        # Execute the query and collect results
        search_result_description = "Search by Metadata Results:\n"
        for doc in query.stream():
            data = doc.to_dict()
            data.pop(
                EMBEDDING_FIELD_NAME, None
            )  # Remove embedding as it's not needed for display

            search_result_description += f"\n{RECEIPT_DESC_FORMAT.format(**data)}"

        return search_result_description
    except Exception as e:
        raise Exception(f"Error filtering receipts: {str(e)}")


def search_relevant_receipts_by_natural_language_query(
    query_text: str, limit: int = 5
) -> str:
    """
    Search for receipts with content most similar to the query using vector search.
    This tool can be use for user query that is difficult to translate into metadata filters.
    Such as store name or item name which sensitive to string matching.
    Use this tool if you cannot utilize the search by metadata filter tool.

    Args:
        query_text (str): The search text (e.g., "coffee", "dinner", "groceries").
        limit (int, optional): Maximum number of results to return (default: 5).

    Returns:
        str: A string containing the list of contextually relevant receipt data.

    Raises:
        Exception: If the search failed or input is invalid.
    """
    try:
        # Generate embedding for the query text
        result = GENAI_CLIENT.models.embed_content(
            model="text-embedding-004", contents=query_text
        )
        query_embedding = result.embeddings[0].values

        # Notes that this demo assume 1 user only,
        # need to refactor the query for multiple user
        vector_query = COLLECTION.find_nearest(
            vector_field=EMBEDDING_FIELD_NAME,
            query_vector=Vector(query_embedding),
            distance_measure=DistanceMeasure.EUCLIDEAN,
            limit=limit,
        )

        # Execute the query and collect results
        search_result_description = "Search by Contextual Relevance Results:\n"
        for doc in vector_query.stream():
            data = doc.to_dict()
            data.pop(
                EMBEDDING_FIELD_NAME, None
            )  # Remove embedding as it's not needed for display
            search_result_description += f"\n{RECEIPT_DESC_FORMAT.format(**data)}"

        return search_result_description
    except Exception as e:
        raise Exception(f"Error searching receipts: {str(e)}")


def get_receipt_data_by_image_id(image_id: str) -> Dict[str, Any]:
    """
    Retrieve receipt data from the database using the image_id.

    Args:
        image_id (str): The unique identifier of the receipt image. For example, if the placeholder is
            [IMAGE-ID 12345], the ID to use is 12345.

    Returns:
        Dict[str, Any]: A dictionary containing the receipt data with the following keys:
            - receipt_id (str): The unique identifier of the receipt image.
            - store_name (str): The name of the store.
            - transaction_time (str): The time of purchase in UTC.
            - total_amount (float): The total amount spent.
            - currency (str): The currency of the transaction.
            - purchased_items (List[Dict[str, Any]]): List of items purchased with their details.
        Returns an empty dictionary if no receipt is found.
    """
    # In case of it provide full image placeholder, extract the id string
    image_id = sanitize_image_id(image_id)

    # Query the receipts collection for documents with matching receipt_id (image_id)
    # Notes that this demo assume 1 user only,
    # need to refactor the query for multiple user
    query = COLLECTION.where(filter=FieldFilter("receipt_id", "==", image_id)).limit(1)
    docs = list(query.stream())

    if not docs:
        return {}

    # Get the first matching document
    doc_data = docs[0].to_dict()
    doc_data.pop(EMBEDDING_FIELD_NAME, None)

    return doc_data

Objaśnienie kodu

W ramach tej implementacji narzędzi projektujemy je z uwzględnieniem tych 2 głównych kwestii:

  • Przeanalizuj dane paragonu i zmapuj je do pliku źródłowego za pomocą wskaźnika ciągu identyfikatora obrazu [IMAGE-ID <hash-of-image-1>]
  • Przechowywanie i pobieranie danych za pomocą bazy danych Firestore

Narzędzie „store_receipt_data”

6119e1f37f516707.png

To narzędzie to narzędzie do rozpoznawania znaków optycznego, które przeanalizuje wymagane informacje z danych obrazu, rozpozna ciąg znaków identyfikatora obrazu i połączy je, aby zapisać w bazie danych Firestore.

Dodatkowo to narzędzie zamienia zawartość potwierdzenia na kodowanie za pomocą funkcji text-embedding-004, dzięki czemu wszystkie metadane i kodowanie są przechowywane i indeksowane razem. Umożliwienie elastycznego pobierania za pomocą zapytania lub wyszukiwania kontekstowego.

Po uruchomieniu tego narzędzia zobaczysz, że dane paragonu zostały już zindeksowane w bazie danych Firestore, jak pokazano poniżej.

7b448fcde40fac5a.png

Narzędzie „search_receipts_by_metadata_filter”

9d51a3f12289d184.png

To narzędzie przekształca zapytanie użytkownika w filtr zapytania o metadane, który obsługuje wyszukiwanie według zakresu dat lub łącznej wartości transakcji. Zwróci wszystkie dopasowane dane paragonu, w których procesie pominiemy pole umieszczania, ponieważ nie jest ono potrzebne do zrozumienia kontekstu.

Narzędzie „search_relevant_receipts_by_natural_language_query”

b97d3aab9aa53bc9.png

To jest nasze narzędzie do generowania wspomaganego przez wyszukiwanie (RAG). Nasz agent może tworzyć własne zapytania, aby pobierać odpowiednie potwierdzenia z bazy danych wektorów. Może też decydować, kiedy korzystać z tego narzędzia. Możliwość podjęcia przez agenta samodzielnej decyzji, czy chce korzystać z tego narzędzia RAG, i utworzenie przez niego własnego zapytania jest jednym z definicji podejścia agentycznego RAG.

Pozwalamy mu nie tylko na tworzenie własnych zapytań, ale też na wybór liczby dokumentów, które chce pobrać. W połączeniu z odpowiednim opracowaniem promptów, np.

# Example prompt

Always filter the result from tool
search_relevant_receipts_by_natural_language_query as the returned 
result may contain irrelevant information

Dzięki temu to narzędzie stanie się potężnym narzędziem, które może wyszukiwać prawie wszystko, ale może nie zwracać wszystkich oczekiwanych wyników z powodu nieprecyzyjnego charakteru wyszukiwania najbliższego sąsiada.

5. Modyfikowanie kontekstu rozmowy za pomocą wywołań zwrotnych

Google ADK umożliwia nam „przechwytywanie” działania skryptu na różnych poziomach. Więcej informacji o tej funkcji znajdziesz w tej dokumentacji . W tym module użyjemy funkcji before_model_callback, aby zmodyfikować żądanie przed wysłaniem do modelu LLM, aby usunąć dane obrazu z kontekstu starej historii rozmów ( zawierać tylko dane obrazu z ostatnich 3 interakcji użytkownika) w celu zwiększenia wydajności.

Chcemy jednak, aby w razie potrzeby agent miał dostęp do kontekstu danych obrazu. Dlatego dodajemy mechanizm, który po każdym bajcie danych obrazu w rozmowie dodaje ciąg znaków identyfikatora obrazu. Pomoże to pracownikowi obsługi klienta powiązać identyfikator obrazu z rzeczywistymi danymi pliku, które można wykorzystać zarówno podczas przechowywania, jak i pobierania obrazu. Struktura będzie wyglądać tak

<image-byte-data-1>
[IMAGE-ID <hash-of-image-1>]
<image-byte-data-2>
[IMAGE-ID <hash-of-image-2>]
And so on..

Gdy bajty staną się nieaktualne w historii rozmowy, ciąg znaków nadal będzie dostępny, aby umożliwić dostęp do danych za pomocą narzędzia. Przykład struktury historii po usunięciu danych obrazu

[IMAGE-ID <hash-of-image-1>]
[IMAGE-ID <hash-of-image-2>]
And so on..

Rozpocznij Utwórz nowy plik w katalogu expense_manager_agent o nazwie callbacks.py i skopiuj kod poniżej.

# expense_manager_agent/callbacks.py

import hashlib
from google.genai import types
from google.adk.agents.callback_context import CallbackContext
from google.adk.models.llm_request import LlmRequest


def modify_image_data_in_history(
    callback_context: CallbackContext, llm_request: LlmRequest
) -> None:
    # The following code will modify the request sent to LLM
    # We will only keep image data in the last 3 user messages using a reverse and counter approach

    # Count how many user messages we've processed
    user_message_count = 0

    # Process the reversed list
    for content in reversed(llm_request.contents):
        # Only count for user manual query, not function call
        if (content.role == "user") and (content.parts[0].function_response is None):
            user_message_count += 1
            modified_content_parts = []

            # Check any missing image ID placeholder for any image data
            # Then remove image data from conversation history if more than 3 user messages
            for idx, part in enumerate(content.parts):
                if part.inline_data is None:
                    modified_content_parts.append(part)
                    continue

                if (
                    (idx + 1 >= len(content.parts))
                    or (content.parts[idx + 1].text is None)
                    or (not content.parts[idx + 1].text.startswith("[IMAGE-ID "))
                ):
                    # Generate hash ID for the image and add a placeholder
                    image_data = part.inline_data.data
                    hasher = hashlib.sha256(image_data)
                    image_hash_id = hasher.hexdigest()[:12]
                    placeholder = f"[IMAGE-ID {image_hash_id}]"

                    # Only keep image data in the last 3 user messages
                    if user_message_count <= 3:
                        modified_content_parts.append(part)

                    modified_content_parts.append(types.Part(text=placeholder))

                else:
                    # Only keep image data in the last 3 user messages
                    if user_message_count <= 3:
                        modified_content_parts.append(part)

            # This will modify the contents inside the llm_request
            content.parts = modified_content_parts

6. Prompt

Projektowanie agenta z zaawansowanymi interakcjami i możliwościami wymaga znalezienia odpowiedniego promptu, który pomoże mu działać tak, jak tego chcemy.

Wcześniej mieliśmy mechanizm obsługi danych obrazu w historii rozmowy, a także narzędzia, których używanie nie było proste, np. search_relevant_receipts_by_natural_language_query. Chcemy też, aby pracownik obsługi klienta mógł wyszukać i przesłać nam prawidłowy obraz rachunku. Oznacza to, że musimy prawidłowo przekazać wszystkie te informacje w odpowiedniej strukturze prompta.

Poprosimy agenta o uporządkowanie danych wyjściowych w format Markdown, aby umożliwić analizę procesu myślowego, ostatecznej odpowiedzi i załączników ( jeśli występują).

# THINKING PROCESS

Thinking process here

# FINAL RESPONSE

Response to the user here

Attachments put inside json block

{
    "attachments": [
      "[IMAGE-ID <hash-id-1>]",
      "[IMAGE-ID <hash-id-2>]",
      ...
    ]
}

Zacznijmy od tego prompta, aby osiągnąć nasze początkowe oczekiwania dotyczące zachowania agenta menedżera wydatków. Plik task_prompt.md powinien już istnieć w naszym obecnym katalogu roboczym, ale musimy przenieść go do katalogu expense_manager_agent. Aby przenieść plik, uruchom to polecenie:

mv task_prompt.md expense_manager_agent/task_prompt.md

7. Testowanie agenta

Spróbuj teraz komunikować się z agentem za pomocą interfejsu wiersza poleceń. Aby to zrobić, uruchom to polecenie:

uv run adk run expense_manager_agent

Wyświetli się okno, w którym możesz prowadzić rozmowę z obsługą klienta, ale możesz wysyłać tylko tekst.

Log setup complete: /tmp/agents_log/agent.xxxx_xxx.log
To access latest log: tail -F /tmp/agents_log/agent.latest.log
Running agent root_agent, type exit to exit.
user: hello
[root_agent]: Hello there! How can I help you today?
user: 

Oprócz interakcji z CLI ADK umożliwia też korzystanie z interfejsu programistycznego, który pozwala na interakcję i sprawdzanie, co dzieje się podczas interakcji. Aby uruchomić serwer interfejsu użytkownika w ramach lokalnego środowiska programistycznego, uruchom to polecenie:

uv run adk web --port 8080

Wyświetli się komunikat podobny do tego, co oznacza, że mamy już dostęp do interfejsu internetowego.

INFO:     Started server process [xxxx]
INFO:     Waiting for application startup.

+-----------------------------------------------------------------------------+
| ADK Web Server started                                                      |
|                                                                             |
| For local testing, access at http://localhost:8080.                         |
+-----------------------------------------------------------------------------+

INFO:     Application startup complete.
INFO:     Uvicorn running on http://0.0.0.0:8080 (Press CTRL+C to quit)

Aby to sprawdzić, w górnej części edytora Cloud Shell kliknij przycisk Podgląd w przeglądarce i wybierz Podejrzyj na porcie 8080.

e7c9f56c2463164.png

Zobaczysz stronę internetową, na której możesz wybrać dostępnych agentów w menu u góry po lewej stronie ( w naszym przypadku jest to expense_manager_agent) i rozmawiać z botem. W oknie po lewej stronie znajdziesz wiele informacji o szczegółach logowania podczas działania agenta.

b0244afd8da6cc42.png

Spróbujmy wykonać kilka czynności. Prześlij te 2 przykładowe paragony ( źródło : zbiory danych Hugging Face mousserlane/id_receipt_dataset) . Kliknij każdy obraz prawym przyciskiem myszy i wybierz Zapisz obraz jako. ( spowoduje to pobranie obrazu rachunku), a następnie prześlij plik do bota, klikając ikonę „spinacza” i podając, że chcesz zapisać te rachunki.

b8ee334373c6e6af.png c83a8c58ac2eff28.png

Następnie spróbuj wykonać te zapytania, aby wyszukać lub pobrać plik.

  • „Podaj podział wydatków i ich łączną kwotę w 2023 r.”
  • „Give me receipt file from Indomaret”

Gdy używasz niektórych narzędzi, możesz sprawdzić, co dzieje się w interfejsie programisty.

bf47d0b35d5a4f28.png

Sprawdź, jak agent reaguje na Twoje działania, i sprawdź, czy jest zgodny ze wszystkimi regułami podanymi w promptach w pliku task_prompt.py. Gratulacje! Teraz masz działającego agenta rozwoju.

Teraz czas uzupełnić formularz, korzystając z odpowiedniego i ładnego interfejsu oraz możliwości przesyłania i pobierania plików obrazów.

8. Tworzenie usługi frontendu za pomocą Gradio

Utworzymy interfejs internetowy czatu, który będzie wyglądał tak

d029d993943b282b.png

Zawiera on interfejs czatu z polem do wprowadzania tekstu, w którym użytkownicy mogą wysyłać tekst i przesyłać pliki z obrazami rachunków.

Usługę front-end zbudujemy za pomocą Gradio.

Utwórz nowy plik. Kliknij Plik > Nowy plik tekstowy i nazwij go frontend.py, a następnie skopiuj i zapisz podany niżej kod.

import mimetypes
import gradio as gr
import requests
import base64
from typing import List, Dict, Any
from settings import get_settings
from PIL import Image
import io
from schema import ImageData, ChatRequest, ChatResponse


SETTINGS = get_settings()


def encode_image_to_base64_and_get_mime_type(image_path: str) -> ImageData:
    """Encode a file to base64 string and get MIME type.

    Reads an image file and returns the base64-encoded image data and its MIME type.

    Args:
        image_path: Path to the image file to encode.

    Returns:
        ImageData object containing the base64 encoded image data and its MIME type.
    """
    # Read the image file
    with open(image_path, "rb") as file:
        image_content = file.read()

    # Get the mime type
    mime_type = mimetypes.guess_type(image_path)[0]

    # Base64 encode the image
    base64_data = base64.b64encode(image_content).decode("utf-8")

    # Return as ImageData object
    return ImageData(serialized_image=base64_data, mime_type=mime_type)


def decode_base64_to_image(base64_data: str) -> Image.Image:
    """Decode a base64 string to PIL Image.

    Converts a base64-encoded image string back to a PIL Image object
    that can be displayed or processed further.

    Args:
        base64_data: Base64 encoded string of the image.

    Returns:
        PIL Image object of the decoded image.
    """
    # Decode the base64 string and convert to PIL Image
    image_data = base64.b64decode(base64_data)
    image_buffer = io.BytesIO(image_data)
    image = Image.open(image_buffer)

    return image


def get_response_from_llm_backend(
    message: Dict[str, Any],
    history: List[Dict[str, Any]],
) -> List[str | gr.Image]:
    """Send the message and history to the backend and get a response.

    Args:
        message: Dictionary containing the current message with 'text' and optional 'files' keys.
        history: List of previous message dictionaries in the conversation.

    Returns:
        List containing text response and any image attachments from the backend service.
    """
    # Extract files and convert to base64
    image_data = []
    if uploaded_files := message.get("files", []):
        for file_path in uploaded_files:
            image_data.append(encode_image_to_base64_and_get_mime_type(file_path))

    # Prepare the request payload
    payload = ChatRequest(
        text=message["text"],
        files=image_data,
        session_id="default_session",
        user_id="default_user",
    )

    # Send request to backend
    try:
        response = requests.post(SETTINGS.BACKEND_URL, json=payload.model_dump())
        response.raise_for_status()  # Raise exception for HTTP errors

        result = ChatResponse(**response.json())
        if result.error:
            return [f"Error: {result.error}"]

        chat_responses = []

        if result.thinking_process:
            chat_responses.append(
                gr.ChatMessage(
                    role="assistant",
                    content=result.thinking_process,
                    metadata={"title": "🧠 Thinking Process"},
                )
            )

        chat_responses.append(gr.ChatMessage(role="assistant", content=result.response))

        if result.attachments:
            for attachment in result.attachments:
                image_data = attachment.serialized_image
                chat_responses.append(gr.Image(decode_base64_to_image(image_data)))

        return chat_responses
    except requests.exceptions.RequestException as e:
        return [f"Error connecting to backend service: {str(e)}"]


if __name__ == "__main__":
    demo = gr.ChatInterface(
        get_response_from_llm_backend,
        title="Personal Expense Assistant",
        description="This assistant can help you to store receipts data, find receipts, and track your expenses during certain period.",
        type="messages",
        multimodal=True,
        textbox=gr.MultimodalTextbox(file_count="multiple", file_types=["image"]),
    )

    demo.launch(
        server_name="0.0.0.0",
        server_port=8080,
    )

Następnie możemy spróbować uruchomić usługę frontendu za pomocą tego polecenia. Pamiętaj, aby zmienić nazwę pliku main.py na frontend.py.

uv run frontend.py

W konsoli Google Cloud zobaczysz dane wyjściowe podobne do tych

* Running on local URL:  http://0.0.0.0:8080

To create a public link, set `share=True` in `launch()`.

Następnie możesz sprawdzić interfejs internetowy, klikając Ctrl+klik lokalny link URL. Możesz też uzyskać dostęp do aplikacji interfejsu, klikając w prawym górnym rogu Edytora Cloud przycisk Podgląd w przeglądarce i wybierając Podejrzyj na porcie 8080.

49cbdfdf77964065.jpeg

Zobaczysz interfejs internetowy, ale podczas próby przesłania czatu pojawi się oczekiwany błąd z powodu nieskonfigurowanej jeszcze usługi backendowej.

5caec77d95c35927.png

Teraz pozwól działać usłudze i nie zabijaj jej jeszcze. Usługa backendowa będzie działać na innej karcie terminala.

Objaśnienie kodu

W tym kodzie po stronie klienta najpierw umożliwiamy użytkownikowi wysyłanie tekstu i przesyłanie wielu plików. Gradio umożliwia tworzenie tego typu funkcji za pomocą metody gr.ChatInterface w połączeniu z elementem gr.MultimodalTextbox.

Zanim wyślemy plik i tekst do backendu, musimy określić typ MIME pliku, którego backend potrzebuje. Musimy też zakodować bajty pliku obrazu w formacie base64 i wysłać je razem z mimetype.

class ImageData(BaseModel):
    """Model for image data with hash identifier.

    Attributes:
        serialized_image: Optional Base64 encoded string of the image content.
        mime_type: MIME type of the image.
    """

    serialized_image: str
    mime_type: str

Schemat używany do interakcji front-endu z back-endem jest zdefiniowany w pliku schema.py. Używamy klasy Pydantic BaseModel, aby wymuszać walidację danych w schemacie.

Po otrzymaniu odpowiedzi od razu oddzielamy proces myślenia, ostateczną odpowiedź i załącznik. Dzięki temu możemy wyświetlać każdy komponent za pomocą komponentu UI.

class ChatResponse(BaseModel):
    """Model for a chat response.

    Attributes:
        response: The text response from the model.
        thinking_process: Optional thinking process of the model.
        attachments: List of image data to be displayed to the user.
        error: Optional error message if something went wrong.
    """

    response: str
    thinking_process: str = ""
    attachments: List[ImageData] = []
    error: Optional[str] = None

9. Tworzenie usługi backendu za pomocą FastAPI

Następnie musimy zbudować backend, który może zainicjować naszego agenta wraz z innymi komponentami, aby umożliwić jego uruchomienie.

Utwórz nowy plik, kliknij Plik->Nowy plik tekstowy, skopiuj i wklej podany niżej kod, a potem zapisz go jako backend.py.

from expense_manager_agent.agent import root_agent as expense_manager_agent
from google.adk.sessions import InMemorySessionService
from google.adk.runners import Runner
from google.adk.events import Event
from fastapi import FastAPI, Body, Depends
from typing import AsyncIterator
from types import SimpleNamespace
import uvicorn
from contextlib import asynccontextmanager
import asyncio
from utils import (
    extract_attachment_ids_and_sanitize_response,
    download_image_from_gcs,
    extract_thinking_process,
    format_user_request_to_adk_content_and_store_artifacts,
)
from schema import ImageData, ChatRequest, ChatResponse
import logger
from google.adk.artifacts import GcsArtifactService
from settings import get_settings

SETTINGS = get_settings()
APP_NAME = "expense_manager_app"


# Application state to hold service contexts
class AppContexts(SimpleNamespace):
    """A class to hold application contexts with attribute access"""

    session_service: InMemorySessionService = None
    artifact_service: GcsArtifactService = None
    expense_manager_agent_runner: Runner = None


# Initialize application state
app_contexts = AppContexts()


@asynccontextmanager
async def lifespan(app: FastAPI):
    # Initialize service contexts during application startup
    app_contexts.session_service = InMemorySessionService()
    app_contexts.artifact_service = GcsArtifactService(
        bucket_name=SETTINGS.STORAGE_BUCKET_NAME
    )
    app_contexts.expense_manager_agent_runner = Runner(
        agent=expense_manager_agent,  # The agent we want to run
        app_name=APP_NAME,  # Associates runs with our app
        session_service=app_contexts.session_service,  # Uses our session manager
        artifact_service=app_contexts.artifact_service,  # Uses our artifact manager
    )

    logger.info("Application started successfully")
    yield
    logger.info("Application shutting down")
    # Perform cleanup during application shutdown if necessary


# Helper function to get application state as a dependency
async def get_app_contexts() -> AppContexts:
    return app_contexts


# Create FastAPI app
app = FastAPI(title="Personal Expense Assistant API", lifespan=lifespan)


@app.post("/chat", response_model=ChatResponse)
async def chat(
    request: ChatRequest = Body(...),
    app_context: AppContexts = Depends(get_app_contexts),
) -> ChatResponse:
    """Process chat request and get response from the agent"""

    # Prepare the user's message in ADK format and store image artifacts
    content = await asyncio.to_thread(
        format_user_request_to_adk_content_and_store_artifacts,
        request=request,
        app_name=APP_NAME,
        artifact_service=app_context.artifact_service,
    )

    final_response_text = "Agent did not produce a final response."  # Default

    # Use the session ID from the request or default if not provided
    session_id = request.session_id
    user_id = request.user_id

    # Create session if it doesn't exist
    if not app_context.session_service.get_session(
        app_name=APP_NAME, user_id=user_id, session_id=session_id
    ):
        app_context.session_service.create_session(
            app_name=APP_NAME, user_id=user_id, session_id=session_id
        )

    try:
        # Process the message with the agent
        # Type annotation: runner.run_async returns an AsyncIterator[Event]
        events_iterator: AsyncIterator[Event] = (
            app_context.expense_manager_agent_runner.run_async(
                user_id=user_id, session_id=session_id, new_message=content
            )
        )
        async for event in events_iterator:  # event has type Event
            # Key Concept: is_final_response() marks the concluding message for the turn
            if event.is_final_response():
                if event.content and event.content.parts:
                    # Extract text from the first part
                    final_response_text = event.content.parts[0].text
                elif event.actions and event.actions.escalate:
                    # Handle potential errors/escalations
                    final_response_text = f"Agent escalated: {event.error_message or 'No specific message.'}"
                break  # Stop processing events once the final response is found

        logger.info(
            "Received final response from agent", raw_final_response=final_response_text
        )

        # Extract and process any attachments and thinking process in the response
        base64_attachments = []
        sanitized_text, attachment_ids = extract_attachment_ids_and_sanitize_response(
            final_response_text
        )
        sanitized_text, thinking_process = extract_thinking_process(sanitized_text)

        # Download images from GCS and replace hash IDs with base64 data
        for image_hash_id in attachment_ids:
            # Download image data and get MIME type
            result = await asyncio.to_thread(
                download_image_from_gcs,
                artifact_service=app_context.artifact_service,
                image_hash=image_hash_id,
                app_name=APP_NAME,
                user_id=user_id,
                session_id=session_id,
            )
            if result:
                base64_data, mime_type = result
                base64_attachments.append(
                    ImageData(serialized_image=base64_data, mime_type=mime_type)
                )

        logger.info(
            "Processed response with attachments",
            sanitized_response=sanitized_text,
            thinking_process=thinking_process,
            attachment_ids=attachment_ids,
        )

        return ChatResponse(
            response=sanitized_text,
            thinking_process=thinking_process,
            attachments=base64_attachments,
        )

    except Exception as e:
        logger.error("Error processing chat request", error_message=str(e))
        return ChatResponse(
            response="", error=f"Error in generating response: {str(e)}"
        )


# Only run the server if this file is executed directly
if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8081)

Następnie możemy spróbować uruchomić usługę backendową. Pamiętaj, że w poprzednim kroku uruchomiliśmy usługę frontendu. Teraz musimy otworzyć nowy terminal i spróbować uruchomić usługę backendu.

  1. Utwórz nowy terminal. Przejdź do terminala w dolnej części ekranu i znajdź przycisk „+”, aby utworzyć nowy terminal. Możesz też nacisnąć Ctrl + Shift + C, aby otworzyć nowy terminal.

3e52a362475553dc.jpeg

  1. Następnie sprawdź, czy jesteś w katalogu roboczym personal-expense-assistant, a potem uruchom to polecenie:
uv run backend.py
  1. Jeśli się uda, pojawi się taki wynik
INFO:     Started server process [xxxxx]
INFO:     Waiting for application startup.
INFO:     Application startup complete.
INFO:     Uvicorn running on http://0.0.0.0:8081 (Press CTRL+C to quit)

Objaśnienie kodu

Inicjowanie ADK Agent, SessionService i ArtifactService

Aby uruchomić agenta w usłudze backendu, musimy utworzyć Runner, który przyjmuje zarówno SessionService, jak i naszego agenta. SessionService będzie zarządzać historią i stanem rozmowy, a dzięki integracji z Runnerem umożliwi naszemu agentowi otrzymywanie kontekstu bieżących rozmów.

Do obsługi przesłanego pliku używamy też usługi ArtifactService. Więcej informacji o sesjielementach w ADK znajdziesz tutaj.

...

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Initialize service contexts during application startup
    app_contexts.session_service = InMemorySessionService()
    app_contexts.artifact_service = GcsArtifactService(
        bucket_name=SETTINGS.STORAGE_BUCKET_NAME
    )
    app_contexts.expense_manager_agent_runner = Runner(
        agent=expense_manager_agent,  # The agent we want to run
        app_name=APP_NAME,  # Associates runs with our app
        session_service=app_contexts.session_service,  # Uses our session manager
        artifact_service=app_contexts.artifact_service,  # Uses our artifact manager
    )

    logger.info("Application started successfully")
    yield
    logger.info("Application shutting down")
    # Perform cleanup during application shutdown if necessary

...

W tym pokazie używamy InMemorySessionService i GcsArtifactService do integracji z naszym agentem Runner. Historia konwersacji jest przechowywana w pamięci, więc zostanie utracona, gdy usługa backendowa zostanie zatrzymana lub ponownie uruchomiona. Inicjujemy je w cyklu życia aplikacji FastAPI, aby mogły zostać wstrzyknięte jako zależności na ścieżce /chat.

Przesyłanie i pobieranie obrazu za pomocą usługi GcsArtifactService

Wszystkie przesłane obrazy zostaną zapisane jako artefakty przez usługę GcsArtifactService. Możesz to sprawdzić w funkcji format_user_request_to_adk_content_and_store_artifacts w pliku utils.py.

...    

# Prepare the user's message in ADK format and store image artifacts
content = await asyncio.to_thread(
    format_user_request_to_adk_content_and_store_artifacts,
    request=request,
    app_name=APP_NAME,
    artifact_service=app_context.artifact_service,
)

...

Wszystkie żądania, które będą przetwarzane przez agenta, muszą być sformatowane w formacie types.Content. W ramach funkcji przetwarzamy też dane każdego obrazu i wyodrębniamy jego identyfikator, który zastępujemy obiektem zastępczym identyfikatora obrazu.

Pliki załączników pobierane są za pomocą podobnego mechanizmu po wyodrębnieniu identyfikatorów obrazów za pomocą wyrażenia regularnego:

...
sanitized_text, attachment_ids = extract_attachment_ids_and_sanitize_response(
    final_response_text
)
sanitized_text, thinking_process = extract_thinking_process(sanitized_text)

# Download images from GCS and replace hash IDs with base64 data
for image_hash_id in attachment_ids:
    # Download image data and get MIME type
    result = await asyncio.to_thread(
        download_image_from_gcs,
        artifact_service=app_context.artifact_service,
        image_hash=image_hash_id,
        app_name=APP_NAME,
        user_id=user_id,
        session_id=session_id,
    )
...

10. Test integracji

W konsoli Google Cloud powinno być uruchomionych kilka usług na różnych kartach:

  • Usługa interfejsu użytkownika działa na porcie 8080
* Running on local URL:  http://0.0.0.0:8080

To create a public link, set `share=True` in `launch()`.
  • Usługa backendowa działa na porcie 8081
INFO:     Started server process [xxxxx]
INFO:     Waiting for application startup.
INFO:     Application startup complete.
INFO:     Uvicorn running on http://0.0.0.0:8081 (Press CTRL+C to quit)

Obecnie możesz przesyłać obrazy paragonów i bezproblemowo rozmawiać z asystentem z aplikacji internetowej na porcie 8080.

W górnej części edytora Cloud Shell kliknij przycisk Podgląd w przeglądarce i wybierz Podejrzyj na porcie 8080.

e7c9f56c2463164.png

Teraz spróbujmy porozmawiać z Asystentem.

Pobierz te potwierdzenia. Te dane dotyczą okresu od 2023 r. do 2024 r. i prosząc o zapisanie lub przesłanie danych, poproś asystenta

  • Receipt Drive ( źródło: zbiory danych Hugging Facemousserlane/id_receipt_dataset)

Zapytaj o różne rzeczy

  • „Podaj zestawienie miesięcznych wydatków w latach 2023–2024”
  • „Pokaż mi paragon za kawę”
  • „Give me receipt file from Yakiniku Like”
  • itd.

Oto fragment interakcji, która zakończyła się sukcesem

f6ba4537438033b2.png

313a43d32b0901ef.png

11. Wdrażanie w Cloud Run

Oczywiście chcemy mieć dostęp do tej wspaniałej aplikacji z dowolnego miejsca. Aby to zrobić, możemy spakować aplikację i wdrożyć ją w Cloud Run. Na potrzeby tego demonstracyjnego scenariusza usługa będzie udostępniona jako usługa publiczna, do której mają dostęp inne osoby. Pamiętaj jednak, że w przypadku tego typu aplikacji nie jest to zalecana metoda, ponieważ lepiej nadaje się do zastosowań osobistych.

6795e9abf2030334.jpeg

W tym laboratorium programistycznym umieścimy w 1 kontenerze zarówno usługę frontendu, jak i backendu. Do zarządzania obiema usługami będziemy potrzebować pomocy supervisord. Możesz sprawdzić plik supervisord.conf i plik Dockerfile, aby zobaczyć, że jako punkt wejścia ustawiliśmy supervisord.

Mamy już wszystkie pliki potrzebne do wdrożenia aplikacji do Cloud Run, więc zróbmy to. Otwórz terminal Cloud Shell i sprawdź, czy bieżący projekt jest ustawiony jako aktywny. Jeśli nie, użyj polecenia gcloud configure, aby ustawić identyfikator projektu:

gcloud config set project [PROJECT_ID]

Następnie uruchom to polecenie, aby wdrożyć go w Cloud Run.

gcloud run deploy personal-expense-assistant \
                  --source . \
                  --port=8080 \
                  --allow-unauthenticated \
                  --env-vars-file=settings.yaml \
                  --memory 1024Mi \
                  --region us-central1

Jeśli pojawi się prośba o potwierdzenie utworzenia rejestru komponentów dla repozytorium Dockera, odpowiedz „Y” (tak). Ponieważ jest to aplikacja demonstracyjna, zezwalamy na dostęp bez uwierzytelniania. Zalecamy stosowanie odpowiedniego uwierzytelniania w przypadku aplikacji korporacyjnych i produkcyjnych.

Po zakończeniu wdrażania powinien pojawić się link podobny do tego:

https://personal-expense-assistant-*******.us-central1.run.app

Użyj aplikacji w oknie incognito lub na urządzeniu mobilnym. Powinien być już dostępny.

12. Wyzwanie

Teraz nadszedł czas, aby zabłysnąć i doskonalić swoje umiejętności eksploracji. Czy masz wszystko, czego potrzebujesz, aby zmienić kod, tak aby backend obsługiwał wielu użytkowników? Jakie komponenty wymagają aktualizacji?

13. Czyszczenie danych

Aby uniknąć obciążenia konta Google Cloud opłatami za zasoby wykorzystane w tym ćwiczeniu, wykonaj te czynności:

  1. W konsoli Google Cloud otwórz stronę Zarządzanie zasobami.
  2. Na liście projektów wybierz projekt do usunięcia, a potem kliknij Usuń.
  3. W oknie wpisz identyfikator projektu i kliknij Wyłącz, aby usunąć projekt.
  4. Możesz też przejść w konsoli do Cloud Run, wybrać właśnie wdrożony zasób i usunąć go.