1. Wprowadzenie
Czy kiedykolwiek zdarzyło Ci się, że nie chciało Ci się zarządzać swoimi wydatkami osobistymi? Ja też! Dlatego w tym CodeLab stworzymy osobistego asystenta do zarządzania wydatkami, który będzie korzystał z Gemini 2.5, aby wykonywać za nas wszystkie prace. Od zarządzania przesłanymi paragonami po analizowanie, czy nie wydaliśmy już zbyt dużo na kawę.
Asystent będzie dostępny w przeglądarce internetowej w postaci interfejsu internetowego czatu, za pomocą którego możesz się z nim komunikować, przesyłać obrazy paragonów i prosić o ich przechowywanie. Możesz też wyszukać paragony, aby pobrać plik i przeprowadzić analizę wydatków. Wszystko to zostało zbudowane na podstawie ramy Google Agent Development Kit.
Sama aplikacja jest podzielona na 2 usługi: front-end i back-end. Dzięki temu możesz szybko utworzyć prototyp i sprawdzić, jak działa, a także zrozumieć, jak wygląda integracja obu usług w ramach interfejsu API.
W ramach tego ćwiczenia będziesz wykonywać czynności krok po kroku:
- Przygotuj projekt Google Cloud i włącz w nim wszystkie wymagane interfejsy API
- Konfigurowanie zasobnika w Google Cloud Storage i bazy danych w Firestore
- Tworzenie indeksowania Firestore
- Konfigurowanie obszaru roboczego na potrzeby środowiska programowania
- Struktura kodu źródłowego, narzędzi i promptów agenta ADK
- Testowanie agenta za pomocą lokalnego interfejsu programowania internetowego ADK
- Utwórz usługę frontendu – interfejs czatu za pomocą biblioteki Gradio, aby wysłać zapytanie i przesłać obrazy potwierdzenia.
- Utwórz usługę backendową – serwer HTTP za pomocą FastAPI, w którym znajduje się kod agenta ADK, usługa SessionService i usługa Artifact Service.
- Zarządzanie zmiennymi środowiskowymi i plikami konfiguracyjnymi wymaganymi do wdrożenia aplikacji w Cloud Run
- Wdrażanie aplikacji w Cloud Run
Omówienie architektury
Wymagania wstępne
- swobodnie pracować z Pythonem,
- podstawowa architektura pełnego zestawu usług korzystająca z usługi HTTP;
Czego się nauczysz
- Tworzenie prototypów front-endu witryny za pomocą Gradio
- tworzenie usług backendu za pomocą FastAPI i Pydantic;
- projektowanie agenta ADK z wykorzystaniem jego kilku funkcji;
- Korzystanie z narzędzia
- Zarządzanie sesjami i artefaktami
- Wykorzystanie funkcji wywołania zwrotnego do modyfikacji danych wejściowych przed wysłaniem ich do Gemini
- Korzystanie z BuiltInPlanner do ulepszania wykonywania zadań przez planowanie
- Szybkie debugowanie za pomocą lokalnego interfejsu internetowego ADK
- Strategia optymalizacji interakcji multimodalnej poprzez analizowanie i pobieranie informacji za pomocą promptów oraz modyfikowanie żądań Gemini za pomocą funkcji ADK callback
- Generowanie rozszerzone przez wyszukiwanie w zapisanych informacjach z użyciem Firestore jako bazy danych wektorów
- Zarządzanie zmiennymi środowiskowymi w pliku YAML za pomocą Pydantic-settings
- Wdrażanie aplikacji w Cloud Run za pomocą pliku Dockerfile i podawanie zmiennych środowiskowych w pliku YAML
Czego potrzebujesz
- przeglądarka Chrome,
- konto Gmail,
- projekt w chmurze z włączonymi płatnościami,
Ten warsztat programistyczny przeznaczony dla deweloperów na wszystkich poziomach zaawansowania (w tym początkujących) używa Pythona w próbnej aplikacji. Jednak znajomość Pythona nie jest wymagana do zrozumienia omawianych zagadnień.
2. Zanim zaczniesz
Wybieranie aktywnego projektu w konsoli Cloud
W tym laboratorium programistycznym zakładamy, że masz już projekt Google Cloud z włączonymi płatnościami. Jeśli jeszcze go nie masz, możesz zacząć od wykonania tych instrukcji.
- W konsoli Google Cloud na stronie selektora projektu wybierz lub utwórz projekt Google Cloud.
- Sprawdź, czy w projekcie Cloud włączone są płatności. Dowiedz się, jak sprawdzić, czy w projekcie są włączone płatności.
Przygotowanie bazy danych Firestore
Następnie musimy utworzyć bazę danych Firestore. Firestore w trybie natywnym to baza danych dokumentów NoSQL zaprojektowana pod kątem automatycznego skalowania, wysokiej wydajności i łatwego tworzenia aplikacji. Może też pełnić funkcję bazy danych wektorów, która może obsługiwać technikę generowania rozszerzonego wyszukiwania w naszej pracowni.
- Na pasku wyszukiwania wpisz „firestore” i kliknij usługę Firestore.
- Następnie kliknij przycisk Utwórz bazę danych Firestore.
- Jako nazwy identyfikatora bazy danych użyj wartości (domyślnie) i pozostaw wybraną opcję Wersja standardowa. Na potrzeby tego ćwiczenia użyj Firestore Native z otwartymi regułami zabezpieczeń.
- Zauważysz też, że ta baza danych ma poziom bezpłatny YEAY! Następnie kliknij przycisk Utwórz bazę danych.
Po wykonaniu tych czynności powinieneś zostać przekierowany do utworzonej właśnie bazy danych Firestore.
Konfigurowanie projektu Cloud w terminalu Cloud Shell
- Użyjesz Cloud Shell, czyli środowiska wiersza poleceń działającego w Google Cloud, które jest wstępnie załadowane w bq. Kliknij Aktywuj Cloud Shell u góry konsoli Google Cloud.
- Po połączeniu z Cloud Shell sprawdź, czy jesteś już uwierzytelniony i czy projekt jest ustawiony na identyfikator Twojego projektu, używając tego polecenia:
gcloud auth list
- Aby sprawdzić, czy polecenie gcloud zna Twój projekt, uruchom w Cloud Shell to polecenie:
gcloud config list project
- Jeśli projekt nie jest ustawiony, użyj tego polecenia:
gcloud config set project <YOUR_PROJECT_ID>
Identyfikator PROJECT_ID
możesz też zobaczyć w konsoli
Kliknij go, aby wyświetlić po prawej stronie informacje o projekcie i jego identyfikator.
- Włącz wymagane interfejsy API za pomocą polecenia pokazanego poniżej. Może to potrwać kilka minut, więc zachowaj cierpliwość.
gcloud services enable aiplatform.googleapis.com \
firestore.googleapis.com \
run.googleapis.com \
cloudbuild.googleapis.com \
cloudresourcemanager.googleapis.com
Po pomyślnym wykonaniu polecenia powinien wyświetlić się komunikat podobny do tego:
Operation "operations/..." finished successfully.
Alternatywą dla polecenia gcloud jest konsola, w której możesz wyszukać poszczególne usługi lub skorzystać z tego linku.
Jeśli pominiesz któryś interfejs API, zawsze możesz go włączyć w trakcie implementacji.
Informacje o poleceniach i użytkowaniu gcloud znajdziesz w dokumentacji.
Przygotowanie zasobnika Google Cloud Storage
Następnie w tym samym terminalu musimy przygotować zasobnik GCS do przechowywania przesłanego pliku. Aby utworzyć zasobnik, uruchom to polecenie:
gsutil mb -l us-central1 gs://personal-expense-assistant-receipts
Wyświetli się taki wynik
Creating gs://personal-expense-assistant-receipts/...
Aby to sprawdzić, otwórz menu nawigacyjne w lewym górnym rogu przeglądarki i wybierz Cloud Storage -> Bucket.
Tworzenie indeksu Firestore na potrzeby wyszukiwania
Firestore to baza danych NoSQL, która zapewnia wysoką wydajność i elastyczność w modelu danych, ale ma ograniczenia w przypadku złożonych zapytań. Planujemy użycie złożonych zapytań wielopolowych i wyszukiwania wektorów, więc musimy najpierw utworzyć indeks. Więcej informacji znajdziesz w tej dokumentacji.
- Aby utworzyć indeks obsługujący złożone zapytania, uruchom to polecenie:
gcloud firestore indexes composite create \
--collection-group=personal-expense-assistant-receipts \
--field-config field-path=total_amount,order=ASCENDING \
--field-config field-path=transaction_time,order=ASCENDING \
--field-config field-path=__name__,order=ASCENDING \
--database="(default)"
- Uruchom to, aby uzyskać wsparcie wyszukiwania wektorów
gcloud firestore indexes composite create \
--collection-group="personal-expense-assistant-receipts" \
--query-scope=COLLECTION \
--field-config field-path="embedding",vector-config='{"dimension":"768", "flat": "{}"}' \
--database="(default)"
Aby sprawdzić utworzony indeks, otwórz Firestore w konsoli Google Cloud, kliknij instancję bazy danych (domyślna) i na pasku nawigacyjnym wybierz Indeksy.
Otwórz edytor Cloud Shell i skonfiguruj katalog roboczy aplikacji
Teraz możemy skonfigurować edytor kodu, aby wykonać pewne czynności związane z kodowaniem. Do tego celu użyjemy edytora Cloud Shell.
- Kliknij przycisk Otwórz edytor, aby otworzyć edytor Cloud Shell, w którym możesz napisać kod
- Sprawdź, czy w lewym dolnym rogu (pasek stanu) edytora Cloud Shell ustawiony jest projekt Cloud Code (jak na obrazku poniżej) i czy jest to aktywny projekt Google Cloud, w którym masz włączone płatności. Jeśli pojawi się taka prośba, autoryzuj. Jeśli wykonasz poprzednie polecenie, przycisk może też wskazywać bezpośrednio aktywny projekt zamiast przycisku logowania.
- Następnie skopiuj katalog roboczy szablonu tego laboratorium kodu z GitHuba, wykonując to polecenie. Utworzy katalog roboczy w katalogu personal-expense-assistant.
git clone https://github.com/alphinside/personal-expense-assistant-adk-codelab-starter.git personal-expense-assistant
- Następnie przejdź do górnej sekcji edytora Cloud Shell i kliknij Plik->Otwórz folder, odszukaj katalog username i katalog personal-expense-assistant,a następnie kliknij przycisk OK. Spowoduje to ustawienie wybranego katalogu jako głównego katalogu roboczego. W tym przykładzie nazwa użytkownika to alvinprayuda, dlatego ścieżka katalogu jest wyświetlana poniżej.
Twój edytor Cloud Shell powinien teraz wyglądać tak
Konfiguracja środowiska
Przygotowanie wirtualnego środowiska Pythona
Kolejnym krokiem jest przygotowanie środowiska programistycznego. Aktualnie aktywny terminal powinien znajdować się w katalogu roboczym personal-expense-assistant. W tym laboratorium kodu użyjemy Pythona 3.12 i narzędzia uv python project manager, aby uprościć tworzenie wersji Pythona i zarządzanie nią oraz środowiskiem wirtualnym.
- Jeśli terminal nie jest jeszcze otwarty, otwórz go, klikając Terminal > Nowy terminal, lub naciśnij Ctrl + Shift + C, aby otworzyć okno terminala w dolnej części przeglądarki.
- Pobierz
uv
i zainstaluj Pythona 3.12 za pomocą tego polecenia
curl -LsSf https://astral.sh/uv/0.6.16/install.sh | sh && \
source $HOME/.local/bin/env && \
uv python install 3.12
- Teraz zainicjuj środowisko wirtualne za pomocą
uv
, uruchom to polecenie
uv sync --frozen
Spowoduje to utworzenie katalogu .venv i zainstalowanie zależności. Szybki rzut oka na plik pyproject.toml zawiera informacje o zależnościach, które wyglądają tak:
dependencies = [ "datasets>=3.5.0", "google-adk>=0.2.0", "google-cloud-firestore>=2.20.1", "gradio>=5.23.1", "pydantic>=2.10.6", "pydantic-settings[yaml]>=2.8.1", ]
- Aby przetestować wirtualne środowisko, utwórz nowy plik main.py i skopiuj ten kod
def main():
print("Hello from personal-expense-assistant-adk!")
if __name__ == "__main__":
main()
- Następnie uruchom to polecenie:
uv run main.py
Dane wyjściowe będą wyglądać tak:
Using CPython 3.12 Creating virtual environment at: .venv Hello from personal-expense-assistant-adk!
To pokazuje, że projekt Pythona jest prawidłowo skonfigurowany.
Konfigurowanie plików konfiguracji
Teraz musimy skonfigurować pliki konfiguracji dla tego projektu. Do odczytu konfiguracji z pliku YAML używamy pakietu pydantic-settings.
Utwórz plik o nazwie settings.yaml z taką konfiguracją: Kliknij Plik > Nowy plik tekstowy i wpisz ten kod. Następnie zapisz plik jako settings.yaml.
GCLOUD_LOCATION: "us-central1"
GCLOUD_PROJECT_ID: "your_gcloud_project_id"
BACKEND_URL: "http://localhost:8081/chat"
STORAGE_BUCKET_NAME: "personal-expense-assistant-receipts"
DB_COLLECTION_NAME: "personal-expense-assistant-receipts"
W tym laboratorium kodu używamy wstępnie skonfigurowanych wartości GCLOUD_LOCATION
,
BACKEND_URL
,
STORAGE_BUCKET_NAME
,
DB_COLLECTION_NAME
i BACKEND_URL
.
Teraz możemy przejść do następnego kroku, czyli tworzenia agenta, a potem usług.
3. Tworzenie agenta za pomocą Google ADK i Gemini 2.5
Wprowadzenie do struktury katalogu ADK
Zacznijmy od zapoznania się z możliwościami ADK i sposobem tworzenia agenta. Pełną dokumentację ADK znajdziesz pod tym adresem URL . ADK udostępnia wiele narzędzi w ramach wykonywania poleceń w interfejsie wiersza poleceń. Oto niektóre z nich :
- Konfigurowanie struktury katalogu agentów
- Szybkie testowanie interakcji za pomocą interfejsu wiersza poleceń
- Szybkie konfigurowanie lokalnego interfejsu programistycznego
Teraz utwórz strukturę katalogu agenta za pomocą polecenia wiersza poleceń. Uruchom to polecenie:
uv run adk create expense_manager_agent \
--model gemini-2.5-flash-preview-04-17 \
--project {your-project-id} \
--region us-central1
Utworzy ona następującą strukturę katalogu agenta:
expense_manager_agent/ ├── __init__.py ├── .env ├── agent.py
Jeśli sprawdzisz pliki init.py i agent.py, zobaczysz ten kod:
# __init__.py
from . import agent
# agent.py
from google.adk.agents import Agent
root_agent = Agent(
model='gemini-2.5-flash-preview-04-17',
name='root_agent',
description='A helpful assistant for user questions.',
instruction='Answer user questions to the best of your knowledge',
)
Tworzenie agenta dotyczącego zarządzania wydatkami
Zbudujmy agenta ds. zarządzania wydatkami. Otwórz plik expense_manager_agent/agent.py i skopiuj znajdujący się w nim kod root_agent.
# expense_manager_agent/agent.py
from google.adk.agents import Agent
from expense_manager_agent.tools import (
store_receipt_data,
search_receipts_by_metadata_filter,
search_relevant_receipts_by_natural_language_query,
get_receipt_data_by_image_id,
)
from expense_manager_agent.callbacks import modify_image_data_in_history
import os
from settings import get_settings
from google.adk.planners import BuiltInPlanner
from google.genai import types
SETTINGS = get_settings()
os.environ["GOOGLE_CLOUD_PROJECT"] = SETTINGS.GCLOUD_PROJECT_ID
os.environ["GOOGLE_CLOUD_LOCATION"] = SETTINGS.GCLOUD_LOCATION
os.environ["GOOGLE_GENAI_USE_VERTEXAI"] = "TRUE"
# Get the code file directory path and read the task prompt file
current_dir = os.path.dirname(os.path.abspath(__file__))
prompt_path = os.path.join(current_dir, "task_prompt.md")
with open(prompt_path, "r") as file:
task_prompt = file.read()
root_agent = Agent(
name="expense_manager_agent",
model="gemini-2.5-flash-preview-04-17",
description=(
"Personal expense agent to help user track expenses, analyze receipts, and manage their financial records"
),
instruction=task_prompt,
tools=[
store_receipt_data,
get_receipt_data_by_image_id,
search_receipts_by_metadata_filter,
search_relevant_receipts_by_natural_language_query,
],
planner=BuiltInPlanner(
thinking_config=types.ThinkingConfig(
thinking_budget=2048,
)
),
before_model_callback=modify_image_data_in_history,
)
Objaśnienie kodu
Ten skrypt zawiera inicjowanie agenta, w ramach którego inicjujemy te elementy:
- Ustaw model, którego chcesz używać, na
gemini-2.5-flash-preview-04-17
- Skonfiguruj opis i instrukcje agenta jako prompt systemowy, który jest odczytywany z
task_prompt.md
- udostępnić niezbędne narzędzia do obsługi funkcji agenta;
- Umożliwienie planowania przed wygenerowaniem ostatecznej odpowiedzi lub wykonaniem za pomocą funkcji myślenia Gemini 2.5 Flash
- Przed wysłaniem żądania do Gemini skonfiguruj przechwytywanie wywołania zwrotnego, aby ograniczyć liczbę danych obrazu wysyłanych przed wykonaniem prognozy.
4. Konfigurowanie narzędzi agenta
Nasz menedżer wydatków będzie mógł:
- Wyodrębnij dane z obrazu rachunku i zapisz je wraz z pliku
- Dokładne wyszukiwanie w danych o wydatkach
- wyszukiwanie kontekstowe danych o wydatkach;
Dlatego potrzebujemy odpowiednich narzędzi do obsługi tej funkcji. Utwórz nowy plik w katalogu expense_manager_agent, nadaj mu nazwę tools.py i skopiuj kod poniżej.
# expense_manager_agent/tools.py
import datetime
from typing import Dict, List, Any
from google.cloud import firestore
from google.cloud.firestore_v1.vector import Vector
from google.cloud.firestore_v1 import FieldFilter
from google.cloud.firestore_v1.base_query import And
from google.cloud.firestore_v1.base_vector_query import DistanceMeasure
from settings import get_settings
from google import genai
SETTINGS = get_settings()
DB_CLIENT = firestore.Client(
project=SETTINGS.GCLOUD_PROJECT_ID
) # Will use "(default)" database
COLLECTION = DB_CLIENT.collection(SETTINGS.DB_COLLECTION_NAME)
GENAI_CLIENT = genai.Client(
vertexai=True, location=SETTINGS.GCLOUD_LOCATION, project=SETTINGS.GCLOUD_PROJECT_ID
)
EMBEDDING_DIMENSION = 768
EMBEDDING_FIELD_NAME = "embedding"
INVALID_ITEMS_FORMAT_ERR = """
Invalid items format. Must be a list of dictionaries with 'name', 'price', and 'quantity' keys."""
RECEIPT_DESC_FORMAT = """
Store Name: {store_name}
Transaction Time: {transaction_time}
Total Amount: {total_amount}
Currency: {currency}
Purchased Items:
{purchased_items}
Receipt Image ID: {receipt_id}
"""
def sanitize_image_id(image_id: str) -> str:
"""Sanitize image ID by removing any leading/trailing whitespace."""
if image_id.startswith("[IMAGE-"):
image_id = image_id.split("ID ")[1].split("]")[0]
return image_id.strip()
def store_receipt_data(
image_id: str,
store_name: str,
transaction_time: str,
total_amount: float,
purchased_items: List[Dict[str, Any]],
currency: str = "IDR",
) -> str:
"""
Store receipt data in the database.
Args:
image_id (str): The unique identifier of the image. For example IMAGE-POSITION 0-ID 12345,
the ID of the image is 12345.
store_name (str): The name of the store.
transaction_time (str): The time of purchase, in ISO format ("YYYY-MM-DDTHH:MM:SS.ssssssZ").
total_amount (float): The total amount spent.
purchased_items (List[Dict[str, Any]]): A list of items purchased with their prices. Each item must have:
- name (str): The name of the item.
- price (float): The price of the item.
- quantity (int, optional): The quantity of the item. Defaults to 1 if not provided.
currency (str, optional): The currency of the transaction, can be derived from the store location.
If unsure, default is "IDR".
Returns:
str: A success message with the receipt ID.
Raises:
Exception: If the operation failed or input is invalid.
"""
try:
# In case of it provide full image placeholder, extract the id string
image_id = sanitize_image_id(image_id)
# Check if the receipt already exists
doc = get_receipt_data_by_image_id(image_id)
if doc:
return f"Receipt with ID {image_id} already exists"
# Validate transaction time
if not isinstance(transaction_time, str):
raise ValueError(
"Invalid transaction time: must be a string in ISO format 'YYYY-MM-DDTHH:MM:SS.ssssssZ'"
)
try:
datetime.datetime.fromisoformat(transaction_time.replace("Z", "+00:00"))
except ValueError:
raise ValueError(
"Invalid transaction time format. Must be in ISO format 'YYYY-MM-DDTHH:MM:SS.ssssssZ'"
)
# Validate items format
if not isinstance(purchased_items, list):
raise ValueError(INVALID_ITEMS_FORMAT_ERR)
for _item in purchased_items:
if (
not isinstance(_item, dict)
or "name" not in _item
or "price" not in _item
):
raise ValueError(INVALID_ITEMS_FORMAT_ERR)
if "quantity" not in _item:
_item["quantity"] = 1
# Create a combined text from all receipt information for better embedding
result = GENAI_CLIENT.models.embed_content(
model="text-embedding-004",
contents=RECEIPT_DESC_FORMAT.format(
store_name=store_name,
transaction_time=transaction_time,
total_amount=total_amount,
currency=currency,
purchased_items=purchased_items,
receipt_id=image_id,
),
)
embedding = result.embeddings[0].values
doc = {
"receipt_id": image_id,
"store_name": store_name,
"transaction_time": transaction_time,
"total_amount": total_amount,
"currency": currency,
"purchased_items": purchased_items,
EMBEDDING_FIELD_NAME: Vector(embedding),
}
COLLECTION.add(doc)
return f"Receipt stored successfully with ID: {image_id}"
except Exception as e:
raise Exception(f"Failed to store receipt: {str(e)}")
def search_receipts_by_metadata_filter(
start_time: str,
end_time: str,
min_total_amount: float = -1.0,
max_total_amount: float = -1.0,
) -> str:
"""
Filter receipts by metadata within a specific time range and optionally by amount.
Args:
start_time (str): The start datetime for the filter (in ISO format, e.g. 'YYYY-MM-DDTHH:MM:SS.ssssssZ').
end_time (str): The end datetime for the filter (in ISO format, e.g. 'YYYY-MM-DDTHH:MM:SS.ssssssZ').
min_total_amount (float): The minimum total amount for the filter (inclusive). Defaults to -1.
max_total_amount (float): The maximum total amount for the filter (inclusive). Defaults to -1.
Returns:
str: A string containing the list of receipt data matching all applied filters.
Raises:
Exception: If the search failed or input is invalid.
"""
try:
# Validate start and end times
if not isinstance(start_time, str) or not isinstance(end_time, str):
raise ValueError("start_time and end_time must be strings in ISO format")
try:
datetime.datetime.fromisoformat(start_time.replace("Z", "+00:00"))
datetime.datetime.fromisoformat(end_time.replace("Z", "+00:00"))
except ValueError:
raise ValueError("start_time and end_time must be strings in ISO format")
# Start with the base collection reference
query = COLLECTION
# Build the composite query by properly chaining conditions
# Notes that this demo assume 1 user only,
# need to refactor the query for multiple user
filters = [
FieldFilter("transaction_time", ">=", start_time),
FieldFilter("transaction_time", "<=", end_time),
]
# Add optional filters
if min_total_amount != -1:
filters.append(FieldFilter("total_amount", ">=", min_total_amount))
if max_total_amount != -1:
filters.append(FieldFilter("total_amount", "<=", max_total_amount))
# Apply the filters
composite_filter = And(filters=filters)
query = query.where(filter=composite_filter)
# Execute the query and collect results
search_result_description = "Search by Metadata Results:\n"
for doc in query.stream():
data = doc.to_dict()
data.pop(
EMBEDDING_FIELD_NAME, None
) # Remove embedding as it's not needed for display
search_result_description += f"\n{RECEIPT_DESC_FORMAT.format(**data)}"
return search_result_description
except Exception as e:
raise Exception(f"Error filtering receipts: {str(e)}")
def search_relevant_receipts_by_natural_language_query(
query_text: str, limit: int = 5
) -> str:
"""
Search for receipts with content most similar to the query using vector search.
This tool can be use for user query that is difficult to translate into metadata filters.
Such as store name or item name which sensitive to string matching.
Use this tool if you cannot utilize the search by metadata filter tool.
Args:
query_text (str): The search text (e.g., "coffee", "dinner", "groceries").
limit (int, optional): Maximum number of results to return (default: 5).
Returns:
str: A string containing the list of contextually relevant receipt data.
Raises:
Exception: If the search failed or input is invalid.
"""
try:
# Generate embedding for the query text
result = GENAI_CLIENT.models.embed_content(
model="text-embedding-004", contents=query_text
)
query_embedding = result.embeddings[0].values
# Notes that this demo assume 1 user only,
# need to refactor the query for multiple user
vector_query = COLLECTION.find_nearest(
vector_field=EMBEDDING_FIELD_NAME,
query_vector=Vector(query_embedding),
distance_measure=DistanceMeasure.EUCLIDEAN,
limit=limit,
)
# Execute the query and collect results
search_result_description = "Search by Contextual Relevance Results:\n"
for doc in vector_query.stream():
data = doc.to_dict()
data.pop(
EMBEDDING_FIELD_NAME, None
) # Remove embedding as it's not needed for display
search_result_description += f"\n{RECEIPT_DESC_FORMAT.format(**data)}"
return search_result_description
except Exception as e:
raise Exception(f"Error searching receipts: {str(e)}")
def get_receipt_data_by_image_id(image_id: str) -> Dict[str, Any]:
"""
Retrieve receipt data from the database using the image_id.
Args:
image_id (str): The unique identifier of the receipt image. For example, if the placeholder is
[IMAGE-ID 12345], the ID to use is 12345.
Returns:
Dict[str, Any]: A dictionary containing the receipt data with the following keys:
- receipt_id (str): The unique identifier of the receipt image.
- store_name (str): The name of the store.
- transaction_time (str): The time of purchase in UTC.
- total_amount (float): The total amount spent.
- currency (str): The currency of the transaction.
- purchased_items (List[Dict[str, Any]]): List of items purchased with their details.
Returns an empty dictionary if no receipt is found.
"""
# In case of it provide full image placeholder, extract the id string
image_id = sanitize_image_id(image_id)
# Query the receipts collection for documents with matching receipt_id (image_id)
# Notes that this demo assume 1 user only,
# need to refactor the query for multiple user
query = COLLECTION.where(filter=FieldFilter("receipt_id", "==", image_id)).limit(1)
docs = list(query.stream())
if not docs:
return {}
# Get the first matching document
doc_data = docs[0].to_dict()
doc_data.pop(EMBEDDING_FIELD_NAME, None)
return doc_data
Objaśnienie kodu
W ramach tej implementacji narzędzi projektujemy je z uwzględnieniem tych 2 głównych kwestii:
- Przeanalizuj dane paragonu i zmapuj je do pliku źródłowego za pomocą wskaźnika ciągu identyfikatora obrazu
[IMAGE-ID <hash-of-image-1>]
- Przechowywanie i pobieranie danych za pomocą bazy danych Firestore
Narzędzie „store_receipt_data”
To narzędzie to narzędzie do rozpoznawania znaków optycznego, które przeanalizuje wymagane informacje z danych obrazu, rozpozna ciąg znaków identyfikatora obrazu i połączy je, aby zapisać w bazie danych Firestore.
Dodatkowo to narzędzie zamienia zawartość potwierdzenia na kodowanie za pomocą funkcji text-embedding-004
, dzięki czemu wszystkie metadane i kodowanie są przechowywane i indeksowane razem. Umożliwienie elastycznego pobierania za pomocą zapytania lub wyszukiwania kontekstowego.
Po uruchomieniu tego narzędzia zobaczysz, że dane paragonu zostały już zindeksowane w bazie danych Firestore, jak pokazano poniżej.
Narzędzie „search_receipts_by_metadata_filter”
To narzędzie przekształca zapytanie użytkownika w filtr zapytania o metadane, który obsługuje wyszukiwanie według zakresu dat lub łącznej wartości transakcji. Zwróci wszystkie dopasowane dane paragonu, w których procesie pominiemy pole umieszczania, ponieważ nie jest ono potrzebne do zrozumienia kontekstu.
Narzędzie „search_relevant_receipts_by_natural_language_query”
To jest nasze narzędzie do generowania wspomaganego przez wyszukiwanie (RAG). Nasz agent może tworzyć własne zapytania, aby pobierać odpowiednie potwierdzenia z bazy danych wektorów. Może też decydować, kiedy korzystać z tego narzędzia. Możliwość podjęcia przez agenta samodzielnej decyzji, czy chce korzystać z tego narzędzia RAG, i utworzenie przez niego własnego zapytania jest jednym z definicji podejścia agentycznego RAG.
Pozwalamy mu nie tylko na tworzenie własnych zapytań, ale też na wybór liczby dokumentów, które chce pobrać. W połączeniu z odpowiednim opracowaniem promptów, np.
# Example prompt Always filter the result from tool search_relevant_receipts_by_natural_language_query as the returned result may contain irrelevant information
Dzięki temu to narzędzie stanie się potężnym narzędziem, które może wyszukiwać prawie wszystko, ale może nie zwracać wszystkich oczekiwanych wyników z powodu nieprecyzyjnego charakteru wyszukiwania najbliższego sąsiada.
5. Modyfikowanie kontekstu rozmowy za pomocą wywołań zwrotnych
Google ADK umożliwia nam „przechwytywanie” działania skryptu na różnych poziomach. Więcej informacji o tej funkcji znajdziesz w tej dokumentacji . W tym module użyjemy funkcji before_model_callback
, aby zmodyfikować żądanie przed wysłaniem do modelu LLM, aby usunąć dane obrazu z kontekstu starej historii rozmów ( zawierać tylko dane obrazu z ostatnich 3 interakcji użytkownika) w celu zwiększenia wydajności.
Chcemy jednak, aby w razie potrzeby agent miał dostęp do kontekstu danych obrazu. Dlatego dodajemy mechanizm, który po każdym bajcie danych obrazu w rozmowie dodaje ciąg znaków identyfikatora obrazu. Pomoże to pracownikowi obsługi klienta powiązać identyfikator obrazu z rzeczywistymi danymi pliku, które można wykorzystać zarówno podczas przechowywania, jak i pobierania obrazu. Struktura będzie wyglądać tak
<image-byte-data-1> [IMAGE-ID <hash-of-image-1>] <image-byte-data-2> [IMAGE-ID <hash-of-image-2>] And so on..
Gdy bajty staną się nieaktualne w historii rozmowy, ciąg znaków nadal będzie dostępny, aby umożliwić dostęp do danych za pomocą narzędzia. Przykład struktury historii po usunięciu danych obrazu
[IMAGE-ID <hash-of-image-1>] [IMAGE-ID <hash-of-image-2>] And so on..
Rozpocznij Utwórz nowy plik w katalogu expense_manager_agent o nazwie callbacks.py i skopiuj kod poniżej.
# expense_manager_agent/callbacks.py
import hashlib
from google.genai import types
from google.adk.agents.callback_context import CallbackContext
from google.adk.models.llm_request import LlmRequest
def modify_image_data_in_history(
callback_context: CallbackContext, llm_request: LlmRequest
) -> None:
# The following code will modify the request sent to LLM
# We will only keep image data in the last 3 user messages using a reverse and counter approach
# Count how many user messages we've processed
user_message_count = 0
# Process the reversed list
for content in reversed(llm_request.contents):
# Only count for user manual query, not function call
if (content.role == "user") and (content.parts[0].function_response is None):
user_message_count += 1
modified_content_parts = []
# Check any missing image ID placeholder for any image data
# Then remove image data from conversation history if more than 3 user messages
for idx, part in enumerate(content.parts):
if part.inline_data is None:
modified_content_parts.append(part)
continue
if (
(idx + 1 >= len(content.parts))
or (content.parts[idx + 1].text is None)
or (not content.parts[idx + 1].text.startswith("[IMAGE-ID "))
):
# Generate hash ID for the image and add a placeholder
image_data = part.inline_data.data
hasher = hashlib.sha256(image_data)
image_hash_id = hasher.hexdigest()[:12]
placeholder = f"[IMAGE-ID {image_hash_id}]"
# Only keep image data in the last 3 user messages
if user_message_count <= 3:
modified_content_parts.append(part)
modified_content_parts.append(types.Part(text=placeholder))
else:
# Only keep image data in the last 3 user messages
if user_message_count <= 3:
modified_content_parts.append(part)
# This will modify the contents inside the llm_request
content.parts = modified_content_parts
6. Prompt
Projektowanie agenta z zaawansowanymi interakcjami i możliwościami wymaga znalezienia odpowiedniego promptu, który pomoże mu działać tak, jak tego chcemy.
Wcześniej mieliśmy mechanizm obsługi danych obrazu w historii rozmowy, a także narzędzia, których używanie nie było proste, np. search_relevant_receipts_by_natural_language_query.
Chcemy też, aby pracownik obsługi klienta mógł wyszukać i przesłać nam prawidłowy obraz rachunku. Oznacza to, że musimy prawidłowo przekazać wszystkie te informacje w odpowiedniej strukturze prompta.
Poprosimy agenta o uporządkowanie danych wyjściowych w format Markdown, aby umożliwić analizę procesu myślowego, ostatecznej odpowiedzi i załączników ( jeśli występują).
# THINKING PROCESS Thinking process here # FINAL RESPONSE Response to the user here Attachments put inside json block { "attachments": [ "[IMAGE-ID <hash-id-1>]", "[IMAGE-ID <hash-id-2>]", ... ] }
Zacznijmy od tego prompta, aby osiągnąć nasze początkowe oczekiwania dotyczące zachowania agenta menedżera wydatków. Plik task_prompt.md powinien już istnieć w naszym obecnym katalogu roboczym, ale musimy przenieść go do katalogu expense_manager_agent. Aby przenieść plik, uruchom to polecenie:
mv task_prompt.md expense_manager_agent/task_prompt.md
7. Testowanie agenta
Spróbuj teraz komunikować się z agentem za pomocą interfejsu wiersza poleceń. Aby to zrobić, uruchom to polecenie:
uv run adk run expense_manager_agent
Wyświetli się okno, w którym możesz prowadzić rozmowę z obsługą klienta, ale możesz wysyłać tylko tekst.
Log setup complete: /tmp/agents_log/agent.xxxx_xxx.log To access latest log: tail -F /tmp/agents_log/agent.latest.log Running agent root_agent, type exit to exit. user: hello [root_agent]: Hello there! How can I help you today? user:
Oprócz interakcji z CLI ADK umożliwia też korzystanie z interfejsu programistycznego, który pozwala na interakcję i sprawdzanie, co dzieje się podczas interakcji. Aby uruchomić serwer interfejsu użytkownika w ramach lokalnego środowiska programistycznego, uruchom to polecenie:
uv run adk web --port 8080
Wyświetli się komunikat podobny do tego, co oznacza, że mamy już dostęp do interfejsu internetowego.
INFO: Started server process [xxxx] INFO: Waiting for application startup. +-----------------------------------------------------------------------------+ | ADK Web Server started | | | | For local testing, access at http://localhost:8080. | +-----------------------------------------------------------------------------+ INFO: Application startup complete. INFO: Uvicorn running on http://0.0.0.0:8080 (Press CTRL+C to quit)
Aby to sprawdzić, w górnej części edytora Cloud Shell kliknij przycisk Podgląd w przeglądarce i wybierz Podejrzyj na porcie 8080.
Zobaczysz stronę internetową, na której możesz wybrać dostępnych agentów w menu u góry po lewej stronie ( w naszym przypadku jest to expense_manager_agent) i rozmawiać z botem. W oknie po lewej stronie znajdziesz wiele informacji o szczegółach logowania podczas działania agenta.
Spróbujmy wykonać kilka czynności. Prześlij te 2 przykładowe paragony ( źródło : zbiory danych Hugging Face mousserlane/id_receipt_dataset
) . Kliknij każdy obraz prawym przyciskiem myszy i wybierz Zapisz obraz jako. ( spowoduje to pobranie obrazu rachunku), a następnie prześlij plik do bota, klikając ikonę „spinacza” i podając, że chcesz zapisać te rachunki.
Następnie spróbuj wykonać te zapytania, aby wyszukać lub pobrać plik.
- „Podaj podział wydatków i ich łączną kwotę w 2023 r.”
- „Give me receipt file from Indomaret”
Gdy używasz niektórych narzędzi, możesz sprawdzić, co dzieje się w interfejsie programisty.
Sprawdź, jak agent reaguje na Twoje działania, i sprawdź, czy jest zgodny ze wszystkimi regułami podanymi w promptach w pliku task_prompt.py. Gratulacje! Teraz masz działającego agenta rozwoju.
Teraz czas uzupełnić formularz, korzystając z odpowiedniego i ładnego interfejsu oraz możliwości przesyłania i pobierania plików obrazów.
8. Tworzenie usługi frontendu za pomocą Gradio
Utworzymy interfejs internetowy czatu, który będzie wyglądał tak
Zawiera on interfejs czatu z polem do wprowadzania tekstu, w którym użytkownicy mogą wysyłać tekst i przesyłać pliki z obrazami rachunków.
Usługę front-end zbudujemy za pomocą Gradio.
Utwórz nowy plik. Kliknij Plik > Nowy plik tekstowy i nazwij go frontend.py, a następnie skopiuj i zapisz podany niżej kod.
import mimetypes
import gradio as gr
import requests
import base64
from typing import List, Dict, Any
from settings import get_settings
from PIL import Image
import io
from schema import ImageData, ChatRequest, ChatResponse
SETTINGS = get_settings()
def encode_image_to_base64_and_get_mime_type(image_path: str) -> ImageData:
"""Encode a file to base64 string and get MIME type.
Reads an image file and returns the base64-encoded image data and its MIME type.
Args:
image_path: Path to the image file to encode.
Returns:
ImageData object containing the base64 encoded image data and its MIME type.
"""
# Read the image file
with open(image_path, "rb") as file:
image_content = file.read()
# Get the mime type
mime_type = mimetypes.guess_type(image_path)[0]
# Base64 encode the image
base64_data = base64.b64encode(image_content).decode("utf-8")
# Return as ImageData object
return ImageData(serialized_image=base64_data, mime_type=mime_type)
def decode_base64_to_image(base64_data: str) -> Image.Image:
"""Decode a base64 string to PIL Image.
Converts a base64-encoded image string back to a PIL Image object
that can be displayed or processed further.
Args:
base64_data: Base64 encoded string of the image.
Returns:
PIL Image object of the decoded image.
"""
# Decode the base64 string and convert to PIL Image
image_data = base64.b64decode(base64_data)
image_buffer = io.BytesIO(image_data)
image = Image.open(image_buffer)
return image
def get_response_from_llm_backend(
message: Dict[str, Any],
history: List[Dict[str, Any]],
) -> List[str | gr.Image]:
"""Send the message and history to the backend and get a response.
Args:
message: Dictionary containing the current message with 'text' and optional 'files' keys.
history: List of previous message dictionaries in the conversation.
Returns:
List containing text response and any image attachments from the backend service.
"""
# Extract files and convert to base64
image_data = []
if uploaded_files := message.get("files", []):
for file_path in uploaded_files:
image_data.append(encode_image_to_base64_and_get_mime_type(file_path))
# Prepare the request payload
payload = ChatRequest(
text=message["text"],
files=image_data,
session_id="default_session",
user_id="default_user",
)
# Send request to backend
try:
response = requests.post(SETTINGS.BACKEND_URL, json=payload.model_dump())
response.raise_for_status() # Raise exception for HTTP errors
result = ChatResponse(**response.json())
if result.error:
return [f"Error: {result.error}"]
chat_responses = []
if result.thinking_process:
chat_responses.append(
gr.ChatMessage(
role="assistant",
content=result.thinking_process,
metadata={"title": "🧠 Thinking Process"},
)
)
chat_responses.append(gr.ChatMessage(role="assistant", content=result.response))
if result.attachments:
for attachment in result.attachments:
image_data = attachment.serialized_image
chat_responses.append(gr.Image(decode_base64_to_image(image_data)))
return chat_responses
except requests.exceptions.RequestException as e:
return [f"Error connecting to backend service: {str(e)}"]
if __name__ == "__main__":
demo = gr.ChatInterface(
get_response_from_llm_backend,
title="Personal Expense Assistant",
description="This assistant can help you to store receipts data, find receipts, and track your expenses during certain period.",
type="messages",
multimodal=True,
textbox=gr.MultimodalTextbox(file_count="multiple", file_types=["image"]),
)
demo.launch(
server_name="0.0.0.0",
server_port=8080,
)
Następnie możemy spróbować uruchomić usługę frontendu za pomocą tego polecenia. Pamiętaj, aby zmienić nazwę pliku main.py na frontend.py.
uv run frontend.py
W konsoli Google Cloud zobaczysz dane wyjściowe podobne do tych
* Running on local URL: http://0.0.0.0:8080 To create a public link, set `share=True` in `launch()`.
Następnie możesz sprawdzić interfejs internetowy, klikając Ctrl+klik lokalny link URL. Możesz też uzyskać dostęp do aplikacji interfejsu, klikając w prawym górnym rogu Edytora Cloud przycisk Podgląd w przeglądarce i wybierając Podejrzyj na porcie 8080.
Zobaczysz interfejs internetowy, ale podczas próby przesłania czatu pojawi się oczekiwany błąd z powodu nieskonfigurowanej jeszcze usługi backendowej.
Teraz pozwól działać usłudze i nie zabijaj jej jeszcze. Usługa backendowa będzie działać na innej karcie terminala.
Objaśnienie kodu
W tym kodzie po stronie klienta najpierw umożliwiamy użytkownikowi wysyłanie tekstu i przesyłanie wielu plików. Gradio umożliwia tworzenie tego typu funkcji za pomocą metody gr.ChatInterface w połączeniu z elementem gr.MultimodalTextbox.
Zanim wyślemy plik i tekst do backendu, musimy określić typ MIME pliku, którego backend potrzebuje. Musimy też zakodować bajty pliku obrazu w formacie base64 i wysłać je razem z mimetype.
class ImageData(BaseModel): """Model for image data with hash identifier. Attributes: serialized_image: Optional Base64 encoded string of the image content. mime_type: MIME type of the image. """ serialized_image: str mime_type: str
Schemat używany do interakcji front-endu z back-endem jest zdefiniowany w pliku schema.py. Używamy klasy Pydantic BaseModel, aby wymuszać walidację danych w schemacie.
Po otrzymaniu odpowiedzi od razu oddzielamy proces myślenia, ostateczną odpowiedź i załącznik. Dzięki temu możemy wyświetlać każdy komponent za pomocą komponentu UI.
class ChatResponse(BaseModel): """Model for a chat response. Attributes: response: The text response from the model. thinking_process: Optional thinking process of the model. attachments: List of image data to be displayed to the user. error: Optional error message if something went wrong. """ response: str thinking_process: str = "" attachments: List[ImageData] = [] error: Optional[str] = None
9. Tworzenie usługi backendu za pomocą FastAPI
Następnie musimy zbudować backend, który może zainicjować naszego agenta wraz z innymi komponentami, aby umożliwić jego uruchomienie.
Utwórz nowy plik, kliknij Plik->Nowy plik tekstowy, skopiuj i wklej podany niżej kod, a potem zapisz go jako backend.py.
from expense_manager_agent.agent import root_agent as expense_manager_agent
from google.adk.sessions import InMemorySessionService
from google.adk.runners import Runner
from google.adk.events import Event
from fastapi import FastAPI, Body, Depends
from typing import AsyncIterator
from types import SimpleNamespace
import uvicorn
from contextlib import asynccontextmanager
import asyncio
from utils import (
extract_attachment_ids_and_sanitize_response,
download_image_from_gcs,
extract_thinking_process,
format_user_request_to_adk_content_and_store_artifacts,
)
from schema import ImageData, ChatRequest, ChatResponse
import logger
from google.adk.artifacts import GcsArtifactService
from settings import get_settings
SETTINGS = get_settings()
APP_NAME = "expense_manager_app"
# Application state to hold service contexts
class AppContexts(SimpleNamespace):
"""A class to hold application contexts with attribute access"""
session_service: InMemorySessionService = None
artifact_service: GcsArtifactService = None
expense_manager_agent_runner: Runner = None
# Initialize application state
app_contexts = AppContexts()
@asynccontextmanager
async def lifespan(app: FastAPI):
# Initialize service contexts during application startup
app_contexts.session_service = InMemorySessionService()
app_contexts.artifact_service = GcsArtifactService(
bucket_name=SETTINGS.STORAGE_BUCKET_NAME
)
app_contexts.expense_manager_agent_runner = Runner(
agent=expense_manager_agent, # The agent we want to run
app_name=APP_NAME, # Associates runs with our app
session_service=app_contexts.session_service, # Uses our session manager
artifact_service=app_contexts.artifact_service, # Uses our artifact manager
)
logger.info("Application started successfully")
yield
logger.info("Application shutting down")
# Perform cleanup during application shutdown if necessary
# Helper function to get application state as a dependency
async def get_app_contexts() -> AppContexts:
return app_contexts
# Create FastAPI app
app = FastAPI(title="Personal Expense Assistant API", lifespan=lifespan)
@app.post("/chat", response_model=ChatResponse)
async def chat(
request: ChatRequest = Body(...),
app_context: AppContexts = Depends(get_app_contexts),
) -> ChatResponse:
"""Process chat request and get response from the agent"""
# Prepare the user's message in ADK format and store image artifacts
content = await asyncio.to_thread(
format_user_request_to_adk_content_and_store_artifacts,
request=request,
app_name=APP_NAME,
artifact_service=app_context.artifact_service,
)
final_response_text = "Agent did not produce a final response." # Default
# Use the session ID from the request or default if not provided
session_id = request.session_id
user_id = request.user_id
# Create session if it doesn't exist
if not app_context.session_service.get_session(
app_name=APP_NAME, user_id=user_id, session_id=session_id
):
app_context.session_service.create_session(
app_name=APP_NAME, user_id=user_id, session_id=session_id
)
try:
# Process the message with the agent
# Type annotation: runner.run_async returns an AsyncIterator[Event]
events_iterator: AsyncIterator[Event] = (
app_context.expense_manager_agent_runner.run_async(
user_id=user_id, session_id=session_id, new_message=content
)
)
async for event in events_iterator: # event has type Event
# Key Concept: is_final_response() marks the concluding message for the turn
if event.is_final_response():
if event.content and event.content.parts:
# Extract text from the first part
final_response_text = event.content.parts[0].text
elif event.actions and event.actions.escalate:
# Handle potential errors/escalations
final_response_text = f"Agent escalated: {event.error_message or 'No specific message.'}"
break # Stop processing events once the final response is found
logger.info(
"Received final response from agent", raw_final_response=final_response_text
)
# Extract and process any attachments and thinking process in the response
base64_attachments = []
sanitized_text, attachment_ids = extract_attachment_ids_and_sanitize_response(
final_response_text
)
sanitized_text, thinking_process = extract_thinking_process(sanitized_text)
# Download images from GCS and replace hash IDs with base64 data
for image_hash_id in attachment_ids:
# Download image data and get MIME type
result = await asyncio.to_thread(
download_image_from_gcs,
artifact_service=app_context.artifact_service,
image_hash=image_hash_id,
app_name=APP_NAME,
user_id=user_id,
session_id=session_id,
)
if result:
base64_data, mime_type = result
base64_attachments.append(
ImageData(serialized_image=base64_data, mime_type=mime_type)
)
logger.info(
"Processed response with attachments",
sanitized_response=sanitized_text,
thinking_process=thinking_process,
attachment_ids=attachment_ids,
)
return ChatResponse(
response=sanitized_text,
thinking_process=thinking_process,
attachments=base64_attachments,
)
except Exception as e:
logger.error("Error processing chat request", error_message=str(e))
return ChatResponse(
response="", error=f"Error in generating response: {str(e)}"
)
# Only run the server if this file is executed directly
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=8081)
Następnie możemy spróbować uruchomić usługę backendową. Pamiętaj, że w poprzednim kroku uruchomiliśmy usługę frontendu. Teraz musimy otworzyć nowy terminal i spróbować uruchomić usługę backendu.
- Utwórz nowy terminal. Przejdź do terminala w dolnej części ekranu i znajdź przycisk „+”, aby utworzyć nowy terminal. Możesz też nacisnąć Ctrl + Shift + C, aby otworzyć nowy terminal.
- Następnie sprawdź, czy jesteś w katalogu roboczym personal-expense-assistant, a potem uruchom to polecenie:
uv run backend.py
- Jeśli się uda, pojawi się taki wynik
INFO: Started server process [xxxxx] INFO: Waiting for application startup. INFO: Application startup complete. INFO: Uvicorn running on http://0.0.0.0:8081 (Press CTRL+C to quit)
Objaśnienie kodu
Inicjowanie ADK Agent, SessionService i ArtifactService
Aby uruchomić agenta w usłudze backendu, musimy utworzyć Runner, który przyjmuje zarówno SessionService, jak i naszego agenta. SessionService będzie zarządzać historią i stanem rozmowy, a dzięki integracji z Runnerem umożliwi naszemu agentowi otrzymywanie kontekstu bieżących rozmów.
Do obsługi przesłanego pliku używamy też usługi ArtifactService. Więcej informacji o sesji i elementach w ADK znajdziesz tutaj.
... @asynccontextmanager async def lifespan(app: FastAPI): # Initialize service contexts during application startup app_contexts.session_service = InMemorySessionService() app_contexts.artifact_service = GcsArtifactService( bucket_name=SETTINGS.STORAGE_BUCKET_NAME ) app_contexts.expense_manager_agent_runner = Runner( agent=expense_manager_agent, # The agent we want to run app_name=APP_NAME, # Associates runs with our app session_service=app_contexts.session_service, # Uses our session manager artifact_service=app_contexts.artifact_service, # Uses our artifact manager ) logger.info("Application started successfully") yield logger.info("Application shutting down") # Perform cleanup during application shutdown if necessary ...
W tym pokazie używamy InMemorySessionService i GcsArtifactService do integracji z naszym agentem Runner. Historia konwersacji jest przechowywana w pamięci, więc zostanie utracona, gdy usługa backendowa zostanie zatrzymana lub ponownie uruchomiona. Inicjujemy je w cyklu życia aplikacji FastAPI, aby mogły zostać wstrzyknięte jako zależności na ścieżce /chat
.
Przesyłanie i pobieranie obrazu za pomocą usługi GcsArtifactService
Wszystkie przesłane obrazy zostaną zapisane jako artefakty przez usługę GcsArtifactService. Możesz to sprawdzić w funkcji format_user_request_to_adk_content_and_store_artifacts
w pliku utils.py.
... # Prepare the user's message in ADK format and store image artifacts content = await asyncio.to_thread( format_user_request_to_adk_content_and_store_artifacts, request=request, app_name=APP_NAME, artifact_service=app_context.artifact_service, ) ...
Wszystkie żądania, które będą przetwarzane przez agenta, muszą być sformatowane w formacie types.Content. W ramach funkcji przetwarzamy też dane każdego obrazu i wyodrębniamy jego identyfikator, który zastępujemy obiektem zastępczym identyfikatora obrazu.
Pliki załączników pobierane są za pomocą podobnego mechanizmu po wyodrębnieniu identyfikatorów obrazów za pomocą wyrażenia regularnego:
... sanitized_text, attachment_ids = extract_attachment_ids_and_sanitize_response( final_response_text ) sanitized_text, thinking_process = extract_thinking_process(sanitized_text) # Download images from GCS and replace hash IDs with base64 data for image_hash_id in attachment_ids: # Download image data and get MIME type result = await asyncio.to_thread( download_image_from_gcs, artifact_service=app_context.artifact_service, image_hash=image_hash_id, app_name=APP_NAME, user_id=user_id, session_id=session_id, ) ...
10. Test integracji
W konsoli Google Cloud powinno być uruchomionych kilka usług na różnych kartach:
- Usługa interfejsu użytkownika działa na porcie 8080
* Running on local URL: http://0.0.0.0:8080 To create a public link, set `share=True` in `launch()`.
- Usługa backendowa działa na porcie 8081
INFO: Started server process [xxxxx] INFO: Waiting for application startup. INFO: Application startup complete. INFO: Uvicorn running on http://0.0.0.0:8081 (Press CTRL+C to quit)
Obecnie możesz przesyłać obrazy paragonów i bezproblemowo rozmawiać z asystentem z aplikacji internetowej na porcie 8080.
W górnej części edytora Cloud Shell kliknij przycisk Podgląd w przeglądarce i wybierz Podejrzyj na porcie 8080.
Teraz spróbujmy porozmawiać z Asystentem.
Pobierz te potwierdzenia. Te dane dotyczą okresu od 2023 r. do 2024 r. i prosząc o zapisanie lub przesłanie danych, poproś asystenta
- Receipt Drive ( źródło: zbiory danych Hugging Face
mousserlane/id_receipt_dataset
)
Zapytaj o różne rzeczy
- „Podaj zestawienie miesięcznych wydatków w latach 2023–2024”
- „Pokaż mi paragon za kawę”
- „Give me receipt file from Yakiniku Like”
- itd.
Oto fragment interakcji, która zakończyła się sukcesem
11. Wdrażanie w Cloud Run
Oczywiście chcemy mieć dostęp do tej wspaniałej aplikacji z dowolnego miejsca. Aby to zrobić, możemy spakować aplikację i wdrożyć ją w Cloud Run. Na potrzeby tego demonstracyjnego scenariusza usługa będzie udostępniona jako usługa publiczna, do której mają dostęp inne osoby. Pamiętaj jednak, że w przypadku tego typu aplikacji nie jest to zalecana metoda, ponieważ lepiej nadaje się do zastosowań osobistych.
W tym laboratorium programistycznym umieścimy w 1 kontenerze zarówno usługę frontendu, jak i backendu. Do zarządzania obiema usługami będziemy potrzebować pomocy supervisord. Możesz sprawdzić plik supervisord.conf i plik Dockerfile, aby zobaczyć, że jako punkt wejścia ustawiliśmy supervisord.
Mamy już wszystkie pliki potrzebne do wdrożenia aplikacji do Cloud Run, więc zróbmy to. Otwórz terminal Cloud Shell i sprawdź, czy bieżący projekt jest ustawiony jako aktywny. Jeśli nie, użyj polecenia gcloud configure, aby ustawić identyfikator projektu:
gcloud config set project [PROJECT_ID]
Następnie uruchom to polecenie, aby wdrożyć go w Cloud Run.
gcloud run deploy personal-expense-assistant \
--source . \
--port=8080 \
--allow-unauthenticated \
--env-vars-file=settings.yaml \
--memory 1024Mi \
--region us-central1
Jeśli pojawi się prośba o potwierdzenie utworzenia rejestru komponentów dla repozytorium Dockera, odpowiedz „Y” (tak). Ponieważ jest to aplikacja demonstracyjna, zezwalamy na dostęp bez uwierzytelniania. Zalecamy stosowanie odpowiedniego uwierzytelniania w przypadku aplikacji korporacyjnych i produkcyjnych.
Po zakończeniu wdrażania powinien pojawić się link podobny do tego:
https://personal-expense-assistant-*******.us-central1.run.app
Użyj aplikacji w oknie incognito lub na urządzeniu mobilnym. Powinien być już dostępny.
12. Wyzwanie
Teraz nadszedł czas, aby zabłysnąć i doskonalić swoje umiejętności eksploracji. Czy masz wszystko, czego potrzebujesz, aby zmienić kod, tak aby backend obsługiwał wielu użytkowników? Jakie komponenty wymagają aktualizacji?
13. Czyszczenie danych
Aby uniknąć obciążenia konta Google Cloud opłatami za zasoby wykorzystane w tym ćwiczeniu, wykonaj te czynności:
- W konsoli Google Cloud otwórz stronę Zarządzanie zasobami.
- Na liście projektów wybierz projekt do usunięcia, a potem kliknij Usuń.
- W oknie wpisz identyfikator projektu i kliknij Wyłącz, aby usunąć projekt.
- Możesz też przejść w konsoli do Cloud Run, wybrać właśnie wdrożony zasób i usunąć go.