Переход на мультимодальность с помощью комплекта разработки агентов: помощник по личным расходам с Gemini 2.5, Firestore и Cloud Run

1. Введение

d029d993943b282b.png

Вы когда-нибудь были расстроены и слишком ленивы, чтобы управлять всеми своими личными расходами? Я тоже! Поэтому в этой кодовой лаборатории мы создадим персонального помощника по управлению расходами на базе Gemini 2.5, который будет выполнять всю работу за нас! От управления загруженными чеками до анализа того, не потратили ли вы уже слишком много на покупку кофе!

Этот помощник будет доступен через веб-браузер в виде веб-интерфейса чата, в котором вы можете общаться с ним, загружать некоторые изображения чеков и просить помощника сохранить их, или, может быть, хотите поискать некоторые чеки, чтобы получить файл и провести анализ расходов. И все это построено на основе фреймворка Google Agent Development Kit

Само приложение разделено на 2 сервиса: фронтенд и бэкенд, что позволяет вам быстро создать прототип и попробовать, как он работает, а также понять, как выглядит контракт API для интеграции их обоих.

В ходе выполнения кодовой лаборатории вы будете использовать пошаговый подход следующим образом:

  1. Подготовьте свой проект Google Cloud и включите в нем все необходимые API
  2. Настройка контейнера в Google Cloud Storage и базы данных в Firestore
  3. Создать индексацию Firestore
  4. Настройте рабочее пространство для вашей среды кодирования
  5. Структурирование исходного кода агента ADK, инструментов, подсказок и т. д.
  6. Тестирование агента с использованием локального пользовательского интерфейса веб-разработки ADK
  7. Создание интерфейса службы фронтенда — чат-интерфейса с использованием библиотеки Gradio для отправки запросов и загрузки изображений чеков.
  8. Создайте внутреннюю службу — HTTP-сервер с использованием FastAPI , где находится код нашего агента ADK, SessionService и Artifact Service.
  9. Управляйте переменными среды и настраивайте необходимые файлы, необходимые для развертывания приложения в Cloud Run.
  10. Разверните приложение в Cloud Run

Обзор архитектуры

6795e9abf2030334.jpeg

Предпосылки

  • Удобная работа с Python
  • Понимание базовой архитектуры полного стека с использованием HTTP-сервиса

Чему вы научитесь

  • Прототипирование веб-интерфейса с помощью Gradio
  • Разработка бэкэнд-сервисов с помощью FastAPI и Pydantic
  • Разработка архитектуры агента ADK с использованием его различных возможностей
  • Использование инструмента
  • Управление сеансами и артефактами
  • Использование обратного вызова для изменения входных данных перед отправкой в ​​Gemini
  • Использование BuiltInPlanner для улучшения выполнения задач путем планирования
  • Быстрая отладка через локальный веб-интерфейс ADK
  • Стратегия оптимизации мультимодального взаимодействия посредством анализа и извлечения информации с помощью оперативной разработки и модификации запросов Gemini с использованием обратного вызова ADK
  • Генерация дополненной агентной информации с использованием Firestore в качестве векторной базы данных
  • Управление переменными среды в файле YAML с помощью Pydantic-settings
  • Разверните приложение в облаке, запустите его с помощью Dockerfile и укажите переменные среды в файле YAML.

Что вам понадобится

  • Браузер Chrome
  • Учетная запись Gmail
  • Облачный проект с включенным биллингом

Эта кодовая лаборатория, предназначенная для разработчиков всех уровней (включая новичков), использует Python в своем примере приложения. Однако знание Python не требуется для понимания представленных концепций.

2. Прежде чем начать

Выберите активный проект в облачной консоли.

Эта кодовая лаборатория предполагает, что у вас уже есть проект Google Cloud с включенным биллингом. Если у вас его еще нет, вы можете следовать инструкциям ниже, чтобы начать.

  1. В Google Cloud Console на странице выбора проекта выберите или создайте проект Google Cloud.
  2. Убедитесь, что для вашего проекта Cloud включена функция выставления счетов. Узнайте, как проверить, включена ли функция выставления счетов для проекта .

9b27622602f6cc4f.png

Подготовить базу данных Firestore

Далее нам также понадобится создать базу данных Firestore. Firestore в режиме Native — это база данных документов NoSQL, созданная для автоматического масштабирования, высокой производительности и простоты разработки приложений. Она также может выступать в качестве векторной базы данных, которая может поддерживать технику Retrieval Augmented Generation для нашей лаборатории.

  1. Введите « firestore» в строке поиска и выберите продукт Firestore.

2986f598f448af67.png

  1. Затем нажмите кнопку «Создать базу данных Firestore».
  2. Используйте (по умолчанию) в качестве имени идентификатора базы данных и оставьте выбранным Standard Edition . Для этой лабораторной демонстрации используйте Firestore Native with Open security rules.
  1. Вы также заметите, что эта база данных на самом деле имеет статус Free-tier Usage ДА! После этого нажмите кнопку Create Database

27a5495b76ed7033.png

После этих шагов вы уже должны быть перенаправлены в базу данных Firestore, которую вы только что создали.

Настройка облачного проекта в Cloud Shell Terminal

  1. Вы будете использовать Cloud Shell , среду командной строки, работающую в Google Cloud, которая поставляется с предустановленной bq. Нажмите Активировать Cloud Shell в верхней части консоли Google Cloud.

1829c3759227c19b.png

  1. После подключения к Cloud Shell вы проверяете, что вы уже аутентифицированы и что проекту присвоен ваш идентификатор проекта, с помощью следующей команды:
gcloud auth list
  1. Выполните следующую команду в Cloud Shell, чтобы подтвердить, что команда gcloud знает о вашем проекте.
gcloud config list project
  1. Если ваш проект не настроен, используйте следующую команду для его настройки:
gcloud config set project <YOUR_PROJECT_ID>

Кроме того, вы также можете увидеть идентификатор PROJECT_ID в консоли.

4032c45803813f30.jpeg

Нажмите на нее, и справа вы увидите все данные вашего проекта и его идентификатор.

8dc17eb4271de6b5.jpeg

  1. Включите требуемые API с помощью команды, показанной ниже. Это может занять несколько минут, поэтому, пожалуйста, будьте терпеливы.
gcloud services enable aiplatform.googleapis.com \
                       firestore.googleapis.com \
                       run.googleapis.com \
                       cloudbuild.googleapis.com \
                       cloudresourcemanager.googleapis.com

При успешном выполнении команды вы должны увидеть сообщение, подобное показанному ниже:

Operation "operations/..." finished successfully.

Альтернативой команде gcloud является поиск каждого продукта через консоль или использование этой ссылки .

Если какой-либо API отсутствует, вы всегда можете включить его в ходе реализации.

Информацию о командах и использовании gcloud см. в документации .

Подготовьте Google Cloud Storage Bucket

Далее, из того же терминала, нам нужно будет подготовить контейнер GCS для хранения загруженного файла. Выполните следующую команду, чтобы создать контейнер

gsutil mb -l us-central1 gs://personal-expense-assistant-receipts

Он покажет этот вывод

Creating gs://personal-expense-assistant-receipts/...

Вы можете убедиться в этом, перейдя в меню навигации в левом верхнем углу браузера и выбрав Облачное хранилище -> Контейнер.

d27475d5ce4fcc9d.png

Firestore — это изначально NoSQL-база данных, которая обеспечивает превосходную производительность и гибкость в модели данных, но имеет ограничения, когда дело доходит до сложных запросов. Поскольку мы планируем использовать некоторые составные многополевые запросы и векторный поиск, нам сначала нужно будет создать некоторый индекс. Вы можете прочитать больше подробностей в этой документации

  1. Выполните следующую команду, чтобы создать индекс для поддержки составных запросов.
gcloud firestore indexes composite create \
        --collection-group=personal-expense-assistant-receipts \
        --field-config field-path=total_amount,order=ASCENDING \
        --field-config field-path=transaction_time,order=ASCENDING \
        --field-config field-path=__name__,order=ASCENDING \
        --database="(default)"
  1. И запустите это для поддержки поиска векторов
gcloud firestore indexes composite create \
        --collection-group="personal-expense-assistant-receipts" \
        --query-scope=COLLECTION \
        --field-config field-path="embedding",vector-config='{"dimension":"768", "flat": "{}"}' \
        --database="(default)"

Вы можете проверить созданный индекс, посетив Firestore в облачной консоли, щелкнув экземпляр базы данных (по умолчанию) и выбрав «Индексы» на панели навигации.

8b3a4012985ee0b6.png

Перейдите в редактор Cloud Shell и настройте рабочий каталог приложения.

Теперь мы можем настроить наш редактор кода для выполнения некоторых кодовых вещей. Мы будем использовать Cloud Shell Editor для этого

  1. Нажмите кнопку «Открыть редактор», откроется редактор Cloud Shell, здесь мы можем написать наш код. b16d56e4979ec951.png
  2. Убедитесь, что проект Cloud Code установлен в левом нижнем углу (строка состояния) редактора Cloud Shell, как выделено на изображении ниже, и установлен на активный проект Google Cloud, в котором у вас включена оплата. Авторизуйтесь, если будет предложено. Если вы уже выполнили предыдущую команду, кнопка также может указывать непосредственно на ваш активированный проект вместо кнопки входа

f5003b9c38b43262.png

  1. Далее, давайте клонируем рабочий каталог шаблона для этой codelab из Github, запустим следующую команду. Она создаст рабочий каталог в каталоге personal-expense-assistant
git clone https://github.com/alphinside/personal-expense-assistant-adk-codelab-starter.git personal-expense-assistant
  1. После этого перейдите в верхнюю часть редактора Cloud Shell и нажмите Файл->Открыть папку, найдите каталог с именем пользователя и найдите каталог personal-expense-assistant, затем нажмите кнопку ОК . Это сделает выбранный каталог основным рабочим каталогом. В этом примере имя пользователя — alvinprayuda , поэтому путь к каталогу показан ниже

2c53696f81d805cc.png

a766d380600a988.png

Теперь ваш Cloud Shell Editor должен выглядеть так

528df7169f01b016.png

Настройка среды

Подготовка виртуальной среды Python

Следующий шаг — подготовить среду разработки. Ваш текущий активный терминал должен находиться в рабочем каталоге personal-expense-assistant . В этой кодовой лаборатории мы будем использовать Python 3.12 и менеджер проектов uv python для упрощения необходимости создания и управления версией python и виртуальной средой.

  1. Если вы еще не открыли терминал, откройте его, нажав «Терминал» -> «Новый терминал» или используйте сочетание клавиш Ctrl + Shift + C , это откроет окно терминала в нижней части браузера.

f8457daf0bed059e.jpeg

  1. Загрузите uv и установите python 3.12 с помощью следующей команды
curl -LsSf https://astral.sh/uv/0.6.16/install.sh | sh && \
source $HOME/.local/bin/env && \
uv python install 3.12
  1. Теперь давайте инициализируем виртуальную среду с помощью uv . Выполните эту команду.
uv sync --frozen

Это создаст каталог .venv и установит зависимости. Быстрый взгляд на pyproject.toml даст вам информацию о зависимостях, показанных следующим образом

dependencies = [
    "datasets>=3.5.0",
    "google-adk>=0.2.0",
    "google-cloud-firestore>=2.20.1",
    "gradio>=5.23.1",
    "pydantic>=2.10.6",
    "pydantic-settings[yaml]>=2.8.1",
]
  1. Чтобы протестировать виртуальную среду, создайте новый файл main.py и скопируйте следующий код
def main():
   print("Hello from personal-expense-assistant-adk!")

if __name__ == "__main__":
   main()
  1. Затем выполните следующую команду
uv run main.py

Вы получите вывод, как показано ниже.

Using CPython 3.12
Creating virtual environment at: .venv
Hello from personal-expense-assistant-adk!

Это показывает, что проект Python настроен правильно.

Файлы конфигурации установки

Теперь нам нужно настроить файлы конфигурации для этого проекта. Мы используем pydantic-settings для чтения конфигурации из файла YAML.

Создайте файл с именем settings.yaml со следующей конфигурацией. Нажмите Файл->Новый текстовый файл и заполните следующим кодом. Затем сохраните его как settings.yaml

GCLOUD_LOCATION: "us-central1"
GCLOUD_PROJECT_ID: "your_gcloud_project_id"
BACKEND_URL: "http://localhost:8081/chat"
STORAGE_BUCKET_NAME: "personal-expense-assistant-receipts"
DB_COLLECTION_NAME: "personal-expense-assistant-receipts"

Для этой кодовой лаборатории мы используем предварительно настроенные значения для GCLOUD_LOCATION , BACKEND_URL , STORAGE_BUCKET_NAME , DB_COLLECTION_NAME и BACKEND_URL .

Теперь мы можем перейти к следующему шагу — созданию агента, а затем и сервисов.

3. Создайте агента с использованием Google ADK и Gemini 2.5.

Введение в структуру каталогов ADK

Давайте начнем с изучения того, что может предложить ADK и как построить агента. Полная документация ADK доступна по этому URL . ADK предлагает нам множество утилит в рамках выполнения команд CLI. Вот некоторые из них:

  • Настройте структуру каталогов агентов
  • Быстро попробуйте взаимодействие через ввод-вывод CLI
  • Быстрая настройка локального веб-интерфейса разработки пользовательского интерфейса

Теперь давайте создадим структуру каталога агента с помощью команды CLI. Выполните следующую команду

uv run adk create expense_manager_agent \
   --model gemini-2.5-flash-preview-04-17 \
   --project {your-project-id} \
   --region us-central1

Будет создана следующая структура каталогов агентов

expense_manager_agent/
├── __init__.py
├── .env
├── agent.py

И если вы проверите init.py и agent.py, вы увидите этот код

# __init__.py

from . import agent
# agent.py

from google.adk.agents import Agent

root_agent = Agent(
    model='gemini-2.5-flash-preview-04-17',
    name='root_agent',
    description='A helpful assistant for user questions.',
    instruction='Answer user questions to the best of your knowledge',
)

Создание нашего агента по управлению расходами

Давайте создадим нашего агента менеджера расходов! Откройте файл expenditure_manager_agent / agent.py и скопируйте код ниже, который будет содержать root_agent.

# expense_manager_agent/agent.py

from google.adk.agents import Agent
from expense_manager_agent.tools import (
    store_receipt_data,
    search_receipts_by_metadata_filter,
    search_relevant_receipts_by_natural_language_query,
    get_receipt_data_by_image_id,
)
from expense_manager_agent.callbacks import modify_image_data_in_history
import os
from settings import get_settings
from google.adk.planners import BuiltInPlanner
from google.genai import types

SETTINGS = get_settings()
os.environ["GOOGLE_CLOUD_PROJECT"] = SETTINGS.GCLOUD_PROJECT_ID
os.environ["GOOGLE_CLOUD_LOCATION"] = SETTINGS.GCLOUD_LOCATION
os.environ["GOOGLE_GENAI_USE_VERTEXAI"] = "TRUE"

# Get the code file directory path and read the task prompt file
current_dir = os.path.dirname(os.path.abspath(__file__))
prompt_path = os.path.join(current_dir, "task_prompt.md")
with open(prompt_path, "r") as file:
    task_prompt = file.read()

root_agent = Agent(
    name="expense_manager_agent",
    model="gemini-2.5-flash-preview-04-17",
    description=(
        "Personal expense agent to help user track expenses, analyze receipts, and manage their financial records"
    ),
    instruction=task_prompt,
    tools=[
        store_receipt_data,
        get_receipt_data_by_image_id,
        search_receipts_by_metadata_filter,
        search_relevant_receipts_by_natural_language_query,
    ],
    planner=BuiltInPlanner(
        thinking_config=types.ThinkingConfig(
            thinking_budget=2048,
        )
    ),
    before_model_callback=modify_image_data_in_history,
)

Пояснение кода

Этот скрипт содержит инициализацию нашего агента, где мы инициализируем следующие вещи:

  • Установите модель, которая будет использоваться, на gemini-2.5-flash-preview-04-17
  • Настройте описание агента и инструкцию в качестве системной подсказки, которая считывается из task_prompt.md
  • Предоставить необходимые инструменты для поддержки функциональности агента
  • Включите планирование перед формированием окончательного ответа или выполнением, используя возможности Flash-мышления Gemini 2.5
  • Настройте перехват обратного вызова перед отправкой запроса в Gemini, чтобы ограничить количество отправляемых данных изображения перед выполнением прогноза

4. Настройка инструментов агента

Наш агент по управлению расходами будет обладать следующими возможностями:

  • Извлечь данные из изображения чека и сохранить данные и файл
  • Точный поиск по данным о расходах
  • Контекстный поиск по данным о расходах

Следовательно, нам нужны соответствующие инструменты для поддержки этой функциональности. Создайте новый файл в каталоге expenditure_manager_agent и назовите его tools.py и скопируйте код ниже

# expense_manager_agent/tools.py

import datetime
from typing import Dict, List, Any
from google.cloud import firestore
from google.cloud.firestore_v1.vector import Vector
from google.cloud.firestore_v1 import FieldFilter
from google.cloud.firestore_v1.base_query import And
from google.cloud.firestore_v1.base_vector_query import DistanceMeasure
from settings import get_settings
from google import genai

SETTINGS = get_settings()
DB_CLIENT = firestore.Client(
    project=SETTINGS.GCLOUD_PROJECT_ID
)  # Will use "(default)" database
COLLECTION = DB_CLIENT.collection(SETTINGS.DB_COLLECTION_NAME)
GENAI_CLIENT = genai.Client(
    vertexai=True, location=SETTINGS.GCLOUD_LOCATION, project=SETTINGS.GCLOUD_PROJECT_ID
)
EMBEDDING_DIMENSION = 768
EMBEDDING_FIELD_NAME = "embedding"
INVALID_ITEMS_FORMAT_ERR = """
Invalid items format. Must be a list of dictionaries with 'name', 'price', and 'quantity' keys."""
RECEIPT_DESC_FORMAT = """
Store Name: {store_name}
Transaction Time: {transaction_time}
Total Amount: {total_amount}
Currency: {currency}
Purchased Items:
{purchased_items}
Receipt Image ID: {receipt_id}
"""


def sanitize_image_id(image_id: str) -> str:
    """Sanitize image ID by removing any leading/trailing whitespace."""
    if image_id.startswith("[IMAGE-"):
        image_id = image_id.split("ID ")[1].split("]")[0]

    return image_id.strip()


def store_receipt_data(
    image_id: str,
    store_name: str,
    transaction_time: str,
    total_amount: float,
    purchased_items: List[Dict[str, Any]],
    currency: str = "IDR",
) -> str:
    """
    Store receipt data in the database.

    Args:
        image_id (str): The unique identifier of the image. For example IMAGE-POSITION 0-ID 12345,
            the ID of the image is 12345.
        store_name (str): The name of the store.
        transaction_time (str): The time of purchase, in ISO format ("YYYY-MM-DDTHH:MM:SS.ssssssZ").
        total_amount (float): The total amount spent.
        purchased_items (List[Dict[str, Any]]): A list of items purchased with their prices. Each item must have:
            - name (str): The name of the item.
            - price (float): The price of the item.
            - quantity (int, optional): The quantity of the item. Defaults to 1 if not provided.
        currency (str, optional): The currency of the transaction, can be derived from the store location.
            If unsure, default is "IDR".

    Returns:
        str: A success message with the receipt ID.

    Raises:
        Exception: If the operation failed or input is invalid.
    """
    try:
        # In case of it provide full image placeholder, extract the id string
        image_id = sanitize_image_id(image_id)

        # Check if the receipt already exists
        doc = get_receipt_data_by_image_id(image_id)

        if doc:
            return f"Receipt with ID {image_id} already exists"

        # Validate transaction time
        if not isinstance(transaction_time, str):
            raise ValueError(
                "Invalid transaction time: must be a string in ISO format 'YYYY-MM-DDTHH:MM:SS.ssssssZ'"
            )
        try:
            datetime.datetime.fromisoformat(transaction_time.replace("Z", "+00:00"))
        except ValueError:
            raise ValueError(
                "Invalid transaction time format. Must be in ISO format 'YYYY-MM-DDTHH:MM:SS.ssssssZ'"
            )

        # Validate items format
        if not isinstance(purchased_items, list):
            raise ValueError(INVALID_ITEMS_FORMAT_ERR)

        for _item in purchased_items:
            if (
                not isinstance(_item, dict)
                or "name" not in _item
                or "price" not in _item
            ):
                raise ValueError(INVALID_ITEMS_FORMAT_ERR)

            if "quantity" not in _item:
                _item["quantity"] = 1

        # Create a combined text from all receipt information for better embedding
        result = GENAI_CLIENT.models.embed_content(
            model="text-embedding-004",
            contents=RECEIPT_DESC_FORMAT.format(
                store_name=store_name,
                transaction_time=transaction_time,
                total_amount=total_amount,
                currency=currency,
                purchased_items=purchased_items,
                receipt_id=image_id,
            ),
        )

        embedding = result.embeddings[0].values

        doc = {
            "receipt_id": image_id,
            "store_name": store_name,
            "transaction_time": transaction_time,
            "total_amount": total_amount,
            "currency": currency,
            "purchased_items": purchased_items,
            EMBEDDING_FIELD_NAME: Vector(embedding),
        }

        COLLECTION.add(doc)

        return f"Receipt stored successfully with ID: {image_id}"
    except Exception as e:
        raise Exception(f"Failed to store receipt: {str(e)}")


def search_receipts_by_metadata_filter(
    start_time: str,
    end_time: str,
    min_total_amount: float = -1.0,
    max_total_amount: float = -1.0,
) -> str:
    """
    Filter receipts by metadata within a specific time range and optionally by amount.

    Args:
        start_time (str): The start datetime for the filter (in ISO format, e.g. 'YYYY-MM-DDTHH:MM:SS.ssssssZ').
        end_time (str): The end datetime for the filter (in ISO format, e.g. 'YYYY-MM-DDTHH:MM:SS.ssssssZ').
        min_total_amount (float): The minimum total amount for the filter (inclusive). Defaults to -1.
        max_total_amount (float): The maximum total amount for the filter (inclusive). Defaults to -1.

    Returns:
        str: A string containing the list of receipt data matching all applied filters.

    Raises:
        Exception: If the search failed or input is invalid.
    """
    try:
        # Validate start and end times
        if not isinstance(start_time, str) or not isinstance(end_time, str):
            raise ValueError("start_time and end_time must be strings in ISO format")
        try:
            datetime.datetime.fromisoformat(start_time.replace("Z", "+00:00"))
            datetime.datetime.fromisoformat(end_time.replace("Z", "+00:00"))
        except ValueError:
            raise ValueError("start_time and end_time must be strings in ISO format")

        # Start with the base collection reference
        query = COLLECTION

        # Build the composite query by properly chaining conditions
        # Notes that this demo assume 1 user only,
        # need to refactor the query for multiple user
        filters = [
            FieldFilter("transaction_time", ">=", start_time),
            FieldFilter("transaction_time", "<=", end_time),
        ]

        # Add optional filters
        if min_total_amount != -1:
            filters.append(FieldFilter("total_amount", ">=", min_total_amount))

        if max_total_amount != -1:
            filters.append(FieldFilter("total_amount", "<=", max_total_amount))

        # Apply the filters
        composite_filter = And(filters=filters)
        query = query.where(filter=composite_filter)

        # Execute the query and collect results
        search_result_description = "Search by Metadata Results:\n"
        for doc in query.stream():
            data = doc.to_dict()
            data.pop(
                EMBEDDING_FIELD_NAME, None
            )  # Remove embedding as it's not needed for display

            search_result_description += f"\n{RECEIPT_DESC_FORMAT.format(**data)}"

        return search_result_description
    except Exception as e:
        raise Exception(f"Error filtering receipts: {str(e)}")


def search_relevant_receipts_by_natural_language_query(
    query_text: str, limit: int = 5
) -> str:
    """
    Search for receipts with content most similar to the query using vector search.
    This tool can be use for user query that is difficult to translate into metadata filters.
    Such as store name or item name which sensitive to string matching.
    Use this tool if you cannot utilize the search by metadata filter tool.

    Args:
        query_text (str): The search text (e.g., "coffee", "dinner", "groceries").
        limit (int, optional): Maximum number of results to return (default: 5).

    Returns:
        str: A string containing the list of contextually relevant receipt data.

    Raises:
        Exception: If the search failed or input is invalid.
    """
    try:
        # Generate embedding for the query text
        result = GENAI_CLIENT.models.embed_content(
            model="text-embedding-004", contents=query_text
        )
        query_embedding = result.embeddings[0].values

        # Notes that this demo assume 1 user only,
        # need to refactor the query for multiple user
        vector_query = COLLECTION.find_nearest(
            vector_field=EMBEDDING_FIELD_NAME,
            query_vector=Vector(query_embedding),
            distance_measure=DistanceMeasure.EUCLIDEAN,
            limit=limit,
        )

        # Execute the query and collect results
        search_result_description = "Search by Contextual Relevance Results:\n"
        for doc in vector_query.stream():
            data = doc.to_dict()
            data.pop(
                EMBEDDING_FIELD_NAME, None
            )  # Remove embedding as it's not needed for display
            search_result_description += f"\n{RECEIPT_DESC_FORMAT.format(**data)}"

        return search_result_description
    except Exception as e:
        raise Exception(f"Error searching receipts: {str(e)}")


def get_receipt_data_by_image_id(image_id: str) -> Dict[str, Any]:
    """
    Retrieve receipt data from the database using the image_id.

    Args:
        image_id (str): The unique identifier of the receipt image. For example, if the placeholder is
            [IMAGE-ID 12345], the ID to use is 12345.

    Returns:
        Dict[str, Any]: A dictionary containing the receipt data with the following keys:
            - receipt_id (str): The unique identifier of the receipt image.
            - store_name (str): The name of the store.
            - transaction_time (str): The time of purchase in UTC.
            - total_amount (float): The total amount spent.
            - currency (str): The currency of the transaction.
            - purchased_items (List[Dict[str, Any]]): List of items purchased with their details.
        Returns an empty dictionary if no receipt is found.
    """
    # In case of it provide full image placeholder, extract the id string
    image_id = sanitize_image_id(image_id)

    # Query the receipts collection for documents with matching receipt_id (image_id)
    # Notes that this demo assume 1 user only,
    # need to refactor the query for multiple user
    query = COLLECTION.where(filter=FieldFilter("receipt_id", "==", image_id)).limit(1)
    docs = list(query.stream())

    if not docs:
        return {}

    # Get the first matching document
    doc_data = docs[0].to_dict()
    doc_data.pop(EMBEDDING_FIELD_NAME, None)

    return doc_data

Пояснение кода

При реализации функций этого инструментария мы разрабатываем инструменты вокруг следующих двух основных идей:

  • Анализ данных чека и сопоставление с исходным файлом с использованием заполнителя строки идентификатора изображения [IMAGE-ID <hash-of-image-1>]
  • Хранение и извлечение данных с использованием базы данных Firestore

Инструмент «store_receipt_data»

6119e1f37f516707.png

Этот инструмент представляет собой инструмент оптического распознавания символов. Он анализирует необходимую информацию из данных изображения, распознает строку идентификатора изображения и сопоставляет их для сохранения в базе данных Firestore.

Кроме того, этот инструмент также преобразует содержимое чека во встраивание с помощью text-embedding-004 так что все метаданные и встраивание хранятся и индексируются вместе. Обеспечивая гибкость для извлечения либо по запросу, либо по контекстному поиску.

После успешного выполнения этого инструмента вы увидите, что данные о чеках уже проиндексированы в базе данных Firestore, как показано ниже.

7b448fcde40fac5a.png

Инструмент "поиск_квитанций_по_фильтру_метаданных"

9d51a3f12289d184.png

Этот инструмент преобразует запрос пользователя в фильтр запроса метаданных, который поддерживает поиск по диапазону дат и/или общей транзакции. Он вернет все соответствующие данные о чеке, где в процессе мы отбросим поле встраивания, поскольку оно не нужно агенту для контекстного понимания

Инструмент «поиск_релевантных_квитанций_по_естественному_языковому_запросу»

b97d3aab9aa53bc9.png

Это наш инструмент Retrieval Augmented Generation (RAG). Наш агент имеет возможность разрабатывать свой собственный запрос для извлечения соответствующих квитанций из векторной базы данных, а также может выбирать, когда использовать этот инструмент. Понятие предоставления агенту возможности независимого решения относительно того, будет ли он использовать этот инструмент RAG или нет, и разрабатывать свой собственный запрос, является одним из определений подхода Agentic RAG .

Мы не только позволяем ему строить свой собственный запрос, но и позволяем ему выбирать, сколько релевантных документов он хочет получить. В сочетании с правильной инженерией подсказок, например

# Example prompt

Always filter the result from tool
search_relevant_receipts_by_natural_language_query as the returned 
result may contain irrelevant information

Это сделает данный инструмент мощным инструментом, способным выполнять поиск практически по чему угодно, хотя он может не вернуть все ожидаемые результаты из-за неточной природы поиска ближайшего соседа .

5. Изменение контекста разговора с помощью обратных вызовов

Google ADK позволяет нам «перехватывать» время выполнения агента на различных уровнях. Вы можете прочитать больше об этой подробной возможности в этой документации . В этой лабораторной работе мы используем before_model_callback для изменения запроса перед отправкой в ​​LLM для удаления данных изображения в старом контексте истории разговора (включать только данные изображения в последние 3 взаимодействия с пользователем) для эффективности

Однако мы все равно хотим, чтобы агент имел контекст данных изображения, когда это необходимо. Поэтому мы добавляем механизм для добавления строкового идентификатора изображения после каждого байта данных изображения в разговоре. Это поможет агенту связать идентификатор изображения с его фактическими данными файла, которые могут быть использованы как при сохранении изображения, так и во время его извлечения. Структура будет выглядеть следующим образом

<image-byte-data-1>
[IMAGE-ID <hash-of-image-1>]
<image-byte-data-2>
[IMAGE-ID <hash-of-image-2>]
And so on..

И когда байтовые данные устаревают в истории разговора, идентификатор строки все еще там, чтобы по-прежнему обеспечивать доступ к данным с помощью использования инструмента. Пример структуры истории после удаления данных изображения

[IMAGE-ID <hash-of-image-1>]
[IMAGE-ID <hash-of-image-2>]
And so on..

Давайте начнем! Создайте новый файл в каталоге expenditure_manager_agent и назовите его callbacks.py и скопируйте код ниже.

# expense_manager_agent/callbacks.py

import hashlib
from google.genai import types
from google.adk.agents.callback_context import CallbackContext
from google.adk.models.llm_request import LlmRequest


def modify_image_data_in_history(
    callback_context: CallbackContext, llm_request: LlmRequest
) -> None:
    # The following code will modify the request sent to LLM
    # We will only keep image data in the last 3 user messages using a reverse and counter approach

    # Count how many user messages we've processed
    user_message_count = 0

    # Process the reversed list
    for content in reversed(llm_request.contents):
        # Only count for user manual query, not function call
        if (content.role == "user") and (content.parts[0].function_response is None):
            user_message_count += 1
            modified_content_parts = []

            # Check any missing image ID placeholder for any image data
            # Then remove image data from conversation history if more than 3 user messages
            for idx, part in enumerate(content.parts):
                if part.inline_data is None:
                    modified_content_parts.append(part)
                    continue

                if (
                    (idx + 1 >= len(content.parts))
                    or (content.parts[idx + 1].text is None)
                    or (not content.parts[idx + 1].text.startswith("[IMAGE-ID "))
                ):
                    # Generate hash ID for the image and add a placeholder
                    image_data = part.inline_data.data
                    hasher = hashlib.sha256(image_data)
                    image_hash_id = hasher.hexdigest()[:12]
                    placeholder = f"[IMAGE-ID {image_hash_id}]"

                    # Only keep image data in the last 3 user messages
                    if user_message_count <= 3:
                        modified_content_parts.append(part)

                    modified_content_parts.append(types.Part(text=placeholder))

                else:
                    # Only keep image data in the last 3 user messages
                    if user_message_count <= 3:
                        modified_content_parts.append(part)

            # This will modify the contents inside the llm_request
            content.parts = modified_content_parts

6. Подсказка

Проектируя агента со сложным взаимодействием и возможностями, нам необходимо найти достаточно хорошую подсказку, которая будет направлять агента, чтобы он мог вести себя так, как мы хотим.

Раньше у нас был механизм обработки данных изображений в истории разговоров, а также были инструменты, которые могли быть непростыми в использовании, такие как search_relevant_receipts_by_natural_language_query. Мы также хотим, чтобы агент мог искать и извлекать для нас правильное изображение квитанции. Это означает, что нам нужно правильно передавать всю эту информацию в правильной структуре подсказки

Мы попросим агента структурировать вывод в следующем формате разметки, чтобы проанализировать процесс мышления, окончательный ответ и вложение (если есть)

# THINKING PROCESS

Thinking process here

# FINAL RESPONSE

Response to the user here

Attachments put inside json block

{
    "attachments": [
      "[IMAGE-ID <hash-id-1>]",
      "[IMAGE-ID <hash-id-2>]",
      ...
    ]
}

Давайте начнем со следующего приглашения, чтобы достичь наших первоначальных ожиданий от поведения агента менеджера расходов. Файл task_prompt.md уже должен существовать в нашем текущем рабочем каталоге, но нам нужно переместить его в каталог expenditure_manager_agent . Выполните следующую команду, чтобы переместить его

mv task_prompt.md expense_manager_agent/task_prompt.md

7. Тестирование агента

Теперь попробуем связаться с агентом через CLI, выполнив следующую команду

uv run adk run expense_manager_agent

Он покажет вывод, подобный этому, где вы можете по очереди общаться с агентом, однако вы можете отправлять только текст через этот интерфейс

Log setup complete: /tmp/agents_log/agent.xxxx_xxx.log
To access latest log: tail -F /tmp/agents_log/agent.latest.log
Running agent root_agent, type exit to exit.
user: hello
[root_agent]: Hello there! How can I help you today?
user: 

Теперь, помимо взаимодействия CLI, ADK также позволяет нам иметь пользовательский интерфейс разработки для взаимодействия и проверки того, что происходит во время взаимодействия. Выполните следующую команду, чтобы запустить локальный сервер пользовательского интерфейса разработки

uv run adk web --port 8080

Это создаст вывод, подобный следующему примеру, что означает, что мы уже можем получить доступ к веб-интерфейсу.

INFO:     Started server process [xxxx]
INFO:     Waiting for application startup.

+-----------------------------------------------------------------------------+
| ADK Web Server started                                                      |
|                                                                             |
| For local testing, access at http://localhost:8080.                         |
+-----------------------------------------------------------------------------+

INFO:     Application startup complete.
INFO:     Uvicorn running on http://0.0.0.0:8080 (Press CTRL+C to quit)

Теперь, чтобы проверить это, нажмите кнопку «Веб-просмотр» в верхней части редактора Cloud Shell и выберите «Просмотр на порту 8080».

e7c9f56c2463164.png

Вы увидите следующую веб-страницу, где вы можете выбрать доступных агентов в раскрывающемся списке слева вверху (в нашем случае это должен быть expenditure_manager_agent ) и взаимодействовать с ботом. Вы увидите много информации о деталях журнала во время работы агента в левом окне

b0244afd8da6cc42.png

Давайте попробуем некоторые действия! Загрузите эти 2 примера чеков (источник: Hugging face datasets mousserlane/id_receipt_dataset ). Щелкните правой кнопкой мыши по каждому изображению и выберите Сохранить изображение как.. (это загрузит изображение чека), затем загрузите файл в бот, нажав на значок "clip" и скажите, что вы хотите сохранить эти чеки

b8ee334373c6e6af.pngc83a8c58ac2eff28.png

После этого попробуйте выполнить следующие запросы для поиска или извлечения файлов.

  • «Укажите разбивку расходов и их общую сумму за 2023 год»
  • «Дайте мне файл с квитанцией от Indomaret»

При использовании некоторых инструментов вы можете проверить, что происходит в пользовательском интерфейсе разработки.

bf47d0b35d5a4f28.png

Посмотрите, как агент вам отвечает, и проверьте, соответствует ли он всем правилам, указанным в подсказке внутри task_prompt.py. Поздравляем! Теперь у вас есть полностью рабочий агент разработки.

Теперь пришло время дополнить его удобным и понятным пользовательским интерфейсом, а также возможностями загрузки и скачивания файлов изображений.

8. Создание службы Frontend с использованием Gradio

Мы создадим веб-интерфейс чата, который будет выглядеть следующим образом.

d029d993943b282b.png

Он содержит интерфейс чата с полем ввода, позволяющим пользователям отправлять текст и загружать файлы изображений чеков.

Мы создадим фронтенд-сервис с использованием Gradio .

Создайте новый файл, нажмите Файл->Новый текстовый файл и назовите его frontend.py , затем скопируйте следующий код и сохраните его.

import mimetypes
import gradio as gr
import requests
import base64
from typing import List, Dict, Any
from settings import get_settings
from PIL import Image
import io
from schema import ImageData, ChatRequest, ChatResponse


SETTINGS = get_settings()


def encode_image_to_base64_and_get_mime_type(image_path: str) -> ImageData:
    """Encode a file to base64 string and get MIME type.

    Reads an image file and returns the base64-encoded image data and its MIME type.

    Args:
        image_path: Path to the image file to encode.

    Returns:
        ImageData object containing the base64 encoded image data and its MIME type.
    """
    # Read the image file
    with open(image_path, "rb") as file:
        image_content = file.read()

    # Get the mime type
    mime_type = mimetypes.guess_type(image_path)[0]

    # Base64 encode the image
    base64_data = base64.b64encode(image_content).decode("utf-8")

    # Return as ImageData object
    return ImageData(serialized_image=base64_data, mime_type=mime_type)


def decode_base64_to_image(base64_data: str) -> Image.Image:
    """Decode a base64 string to PIL Image.

    Converts a base64-encoded image string back to a PIL Image object
    that can be displayed or processed further.

    Args:
        base64_data: Base64 encoded string of the image.

    Returns:
        PIL Image object of the decoded image.
    """
    # Decode the base64 string and convert to PIL Image
    image_data = base64.b64decode(base64_data)
    image_buffer = io.BytesIO(image_data)
    image = Image.open(image_buffer)

    return image


def get_response_from_llm_backend(
    message: Dict[str, Any],
    history: List[Dict[str, Any]],
) -> List[str | gr.Image]:
    """Send the message and history to the backend and get a response.

    Args:
        message: Dictionary containing the current message with 'text' and optional 'files' keys.
        history: List of previous message dictionaries in the conversation.

    Returns:
        List containing text response and any image attachments from the backend service.
    """
    # Extract files and convert to base64
    image_data = []
    if uploaded_files := message.get("files", []):
        for file_path in uploaded_files:
            image_data.append(encode_image_to_base64_and_get_mime_type(file_path))

    # Prepare the request payload
    payload = ChatRequest(
        text=message["text"],
        files=image_data,
        session_id="default_session",
        user_id="default_user",
    )

    # Send request to backend
    try:
        response = requests.post(SETTINGS.BACKEND_URL, json=payload.model_dump())
        response.raise_for_status()  # Raise exception for HTTP errors

        result = ChatResponse(**response.json())
        if result.error:
            return [f"Error: {result.error}"]

        chat_responses = []

        if result.thinking_process:
            chat_responses.append(
                gr.ChatMessage(
                    role="assistant",
                    content=result.thinking_process,
                    metadata={"title": "🧠 Thinking Process"},
                )
            )

        chat_responses.append(gr.ChatMessage(role="assistant", content=result.response))

        if result.attachments:
            for attachment in result.attachments:
                image_data = attachment.serialized_image
                chat_responses.append(gr.Image(decode_base64_to_image(image_data)))

        return chat_responses
    except requests.exceptions.RequestException as e:
        return [f"Error connecting to backend service: {str(e)}"]


if __name__ == "__main__":
    demo = gr.ChatInterface(
        get_response_from_llm_backend,
        title="Personal Expense Assistant",
        description="This assistant can help you to store receipts data, find receipts, and track your expenses during certain period.",
        type="messages",
        multimodal=True,
        textbox=gr.MultimodalTextbox(file_count="multiple", file_types=["image"]),
    )

    demo.launch(
        server_name="0.0.0.0",
        server_port=8080,
    )

После этого мы можем попробовать запустить службу frontend с помощью следующей команды. Не забудьте переименовать файл main.py в frontend.py

uv run frontend.py

Вы увидите вывод, похожий на этот, в вашей облачной консоли

* Running on local URL:  http://0.0.0.0:8080

To create a public link, set `share=True` in `launch()`.

После этого вы можете проверить веб-интерфейс, когда вы ctrl+кликните по локальной ссылке URL. Кроме того, вы также можете получить доступ к приложению frontend, нажав кнопку Web Preview в правом верхнем углу Cloud Editor и выбрав Preview на порту 8080

49cbdfdf77964065.jpeg

Вы увидите веб-интерфейс, однако при попытке отправить чат возникнет ожидаемая ошибка из-за того, что серверная служба еще не настроена.

5caec77d95c35927.png

Теперь, пусть служба запустится и не убивайте ее пока. Мы запустим службу бэкэнда в другой вкладке терминала

Пояснение кода

В этом коде frontend мы сначала позволяем пользователю отправлять текст и загружать несколько файлов. Gradio позволяет нам создавать такую ​​функциональность с помощью метода gr.ChatInterface в сочетании с gr.MultimodalTextbox

Теперь перед отправкой файла и текста на бэкенд, нам нужно выяснить mimetype файла, так как он нужен бэкенду. Нам также нужно закодировать байт файла изображения в base64 и отправить его вместе с mimetype.

class ImageData(BaseModel):
    """Model for image data with hash identifier.

    Attributes:
        serialized_image: Optional Base64 encoded string of the image content.
        mime_type: MIME type of the image.
    """

    serialized_image: str
    mime_type: str

Схема, используемая для взаимодействия frontend-backend, определена в schema.py . Мы используем Pydantic BaseModel для обеспечения проверки данных в схеме

При получении ответа мы уже разделяем, какая часть является мыслительным процессом, окончательным ответом и привязанностью. Таким образом, мы можем использовать компонент Gradio для отображения каждого компонента с компонентом UI.

class ChatResponse(BaseModel):
    """Model for a chat response.

    Attributes:
        response: The text response from the model.
        thinking_process: Optional thinking process of the model.
        attachments: List of image data to be displayed to the user.
        error: Optional error message if something went wrong.
    """

    response: str
    thinking_process: str = ""
    attachments: List[ImageData] = []
    error: Optional[str] = None

9. Создание внутреннего сервиса с использованием FastAPI

Далее нам необходимо создать бэкэнд, который сможет инициализировать наш агент вместе с другими компонентами, чтобы иметь возможность выполнять среду выполнения агента.

Создайте новый файл, нажмите Файл->Новый текстовый файл, скопируйте и вставьте следующий код, затем сохраните его как backend.py.

from expense_manager_agent.agent import root_agent as expense_manager_agent
from google.adk.sessions import InMemorySessionService
from google.adk.runners import Runner
from google.adk.events import Event
from fastapi import FastAPI, Body, Depends
from typing import AsyncIterator
from types import SimpleNamespace
import uvicorn
from contextlib import asynccontextmanager
import asyncio
from utils import (
    extract_attachment_ids_and_sanitize_response,
    download_image_from_gcs,
    extract_thinking_process,
    format_user_request_to_adk_content_and_store_artifacts,
)
from schema import ImageData, ChatRequest, ChatResponse
import logger
from google.adk.artifacts import GcsArtifactService
from settings import get_settings

SETTINGS = get_settings()
APP_NAME = "expense_manager_app"


# Application state to hold service contexts
class AppContexts(SimpleNamespace):
    """A class to hold application contexts with attribute access"""

    session_service: InMemorySessionService = None
    artifact_service: GcsArtifactService = None
    expense_manager_agent_runner: Runner = None


# Initialize application state
app_contexts = AppContexts()


@asynccontextmanager
async def lifespan(app: FastAPI):
    # Initialize service contexts during application startup
    app_contexts.session_service = InMemorySessionService()
    app_contexts.artifact_service = GcsArtifactService(
        bucket_name=SETTINGS.STORAGE_BUCKET_NAME
    )
    app_contexts.expense_manager_agent_runner = Runner(
        agent=expense_manager_agent,  # The agent we want to run
        app_name=APP_NAME,  # Associates runs with our app
        session_service=app_contexts.session_service,  # Uses our session manager
        artifact_service=app_contexts.artifact_service,  # Uses our artifact manager
    )

    logger.info("Application started successfully")
    yield
    logger.info("Application shutting down")
    # Perform cleanup during application shutdown if necessary


# Helper function to get application state as a dependency
async def get_app_contexts() -> AppContexts:
    return app_contexts


# Create FastAPI app
app = FastAPI(title="Personal Expense Assistant API", lifespan=lifespan)


@app.post("/chat", response_model=ChatResponse)
async def chat(
    request: ChatRequest = Body(...),
    app_context: AppContexts = Depends(get_app_contexts),
) -> ChatResponse:
    """Process chat request and get response from the agent"""

    # Prepare the user's message in ADK format and store image artifacts
    content = await asyncio.to_thread(
        format_user_request_to_adk_content_and_store_artifacts,
        request=request,
        app_name=APP_NAME,
        artifact_service=app_context.artifact_service,
    )

    final_response_text = "Agent did not produce a final response."  # Default

    # Use the session ID from the request or default if not provided
    session_id = request.session_id
    user_id = request.user_id

    # Create session if it doesn't exist
    if not app_context.session_service.get_session(
        app_name=APP_NAME, user_id=user_id, session_id=session_id
    ):
        app_context.session_service.create_session(
            app_name=APP_NAME, user_id=user_id, session_id=session_id
        )

    try:
        # Process the message with the agent
        # Type annotation: runner.run_async returns an AsyncIterator[Event]
        events_iterator: AsyncIterator[Event] = (
            app_context.expense_manager_agent_runner.run_async(
                user_id=user_id, session_id=session_id, new_message=content
            )
        )
        async for event in events_iterator:  # event has type Event
            # Key Concept: is_final_response() marks the concluding message for the turn
            if event.is_final_response():
                if event.content and event.content.parts:
                    # Extract text from the first part
                    final_response_text = event.content.parts[0].text
                elif event.actions and event.actions.escalate:
                    # Handle potential errors/escalations
                    final_response_text = f"Agent escalated: {event.error_message or 'No specific message.'}"
                break  # Stop processing events once the final response is found

        logger.info(
            "Received final response from agent", raw_final_response=final_response_text
        )

        # Extract and process any attachments and thinking process in the response
        base64_attachments = []
        sanitized_text, attachment_ids = extract_attachment_ids_and_sanitize_response(
            final_response_text
        )
        sanitized_text, thinking_process = extract_thinking_process(sanitized_text)

        # Download images from GCS and replace hash IDs with base64 data
        for image_hash_id in attachment_ids:
            # Download image data and get MIME type
            result = await asyncio.to_thread(
                download_image_from_gcs,
                artifact_service=app_context.artifact_service,
                image_hash=image_hash_id,
                app_name=APP_NAME,
                user_id=user_id,
                session_id=session_id,
            )
            if result:
                base64_data, mime_type = result
                base64_attachments.append(
                    ImageData(serialized_image=base64_data, mime_type=mime_type)
                )

        logger.info(
            "Processed response with attachments",
            sanitized_response=sanitized_text,
            thinking_process=thinking_process,
            attachment_ids=attachment_ids,
        )

        return ChatResponse(
            response=sanitized_text,
            thinking_process=thinking_process,
            attachments=base64_attachments,
        )

    except Exception as e:
        logger.error("Error processing chat request", error_message=str(e))
        return ChatResponse(
            response="", error=f"Error in generating response: {str(e)}"
        )


# Only run the server if this file is executed directly
if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8081)

После этого мы можем попробовать запустить службу backend. Помните, что на предыдущем шаге мы запустили службу frontend правильно, теперь нам нужно будет открыть новый терминал и попробовать запустить эту службу backend

  1. Создайте новый терминал. Перейдите к своему терминалу в нижней области и найдите кнопку "+", чтобы создать новый терминал. Или вы можете нажать Ctrl + Shift + C , чтобы открыть новый терминал

3e52a362475553dc.jpeg

  1. После этого убедитесь, что вы находитесь в рабочем каталоге personal-expense-assistant, затем выполните следующую команду
uv run backend.py
  1. В случае успеха будет показан такой вывод:
INFO:     Started server process [xxxxx]
INFO:     Waiting for application startup.
INFO:     Application startup complete.
INFO:     Uvicorn running on http://0.0.0.0:8081 (Press CTRL+C to quit)

Пояснение кода

Инициализация ADK Agent, SessionService и ArtifactService

Чтобы запустить агента в бэкэнд-сервисе, нам нужно будет создать Runner , который принимает как SessionService , так и нашего агента. SessionService будет управлять историей и состоянием разговоров, поэтому при интеграции с Runner он даст нашему агенту возможность получать контекст текущих разговоров.

Мы также используем ArtifactService для обработки загруженного файла. Вы можете прочитать больше подробностей здесь о сеансе ADK и артефактах

...

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Initialize service contexts during application startup
    app_contexts.session_service = InMemorySessionService()
    app_contexts.artifact_service = GcsArtifactService(
        bucket_name=SETTINGS.STORAGE_BUCKET_NAME
    )
    app_contexts.expense_manager_agent_runner = Runner(
        agent=expense_manager_agent,  # The agent we want to run
        app_name=APP_NAME,  # Associates runs with our app
        session_service=app_contexts.session_service,  # Uses our session manager
        artifact_service=app_contexts.artifact_service,  # Uses our artifact manager
    )

    logger.info("Application started successfully")
    yield
    logger.info("Application shutting down")
    # Perform cleanup during application shutdown if necessary

...

В этой демонстрации мы используем InMemorySessionService и GcsArtifactService для интеграции с нашим агентом Runner. Поскольку история разговоров хранится в памяти, она будет потеряна после завершения или перезапуска бэкэнд-службы. Мы инициализируем их внутри жизненного цикла приложения FastAPI для внедрения в качестве зависимости в маршрут /chat .

Загрузка и скачивание изображения с помощью GcsArtifactService

Все загруженные изображения будут сохранены как артефакты службой GcsArtifactService , вы можете проверить это внутри функции format_user_request_to_adk_content_and_store_artifacts внутри utils.py

...    

# Prepare the user's message in ADK format and store image artifacts
content = await asyncio.to_thread(
    format_user_request_to_adk_content_and_store_artifacts,
    request=request,
    app_name=APP_NAME,
    artifact_service=app_context.artifact_service,
)

...

Все запросы, которые будут обрабатываться агентом-бегуном, должны быть отформатированы в типы.Тип контента. Внутри функции мы также обрабатываем данные каждого изображения и извлекаем его идентификатор, который будет заменен заполнителем идентификатора изображения.

Похожий механизм используется для загрузки вложений после извлечения идентификаторов изображений с помощью регулярного выражения:

...
sanitized_text, attachment_ids = extract_attachment_ids_and_sanitize_response(
    final_response_text
)
sanitized_text, thinking_process = extract_thinking_process(sanitized_text)

# Download images from GCS and replace hash IDs with base64 data
for image_hash_id in attachment_ids:
    # Download image data and get MIME type
    result = await asyncio.to_thread(
        download_image_from_gcs,
        artifact_service=app_context.artifact_service,
        image_hash=image_hash_id,
        app_name=APP_NAME,
        user_id=user_id,
        session_id=session_id,
    )
...

10. Интеграционный тест

Теперь у вас должно быть запущено несколько служб на разных вкладках облачной консоли:

  • Служба frontend запущена на порту 8080
* Running on local URL:  http://0.0.0.0:8080

To create a public link, set `share=True` in `launch()`.
  • Бэкэнд-сервис запущен на порту 8081
INFO:     Started server process [xxxxx]
INFO:     Waiting for application startup.
INFO:     Application startup complete.
INFO:     Uvicorn running on http://0.0.0.0:8081 (Press CTRL+C to quit)

На текущий момент вы сможете загружать изображения чеков и беспрепятственно общаться с помощником из веб-приложения через порт 8080.

Нажмите кнопку «Веб-просмотр» в верхней части редактора Cloud Shell и выберите «Просмотр на порту 8080».

e7c9f56c2463164.png

Теперь давайте поработаем с помощником!

Загрузите следующие квитанции. Диапазон дат этих квитанций составляет 2023-2024 годы и попросите помощника сохранить/загрузить их.

  • Receipt Drive (источник Hugging face datasets mousserlane/id_receipt_dataset )

Задавайте разные вопросы

  • «Дайте мне ежемесячную разбивку расходов в 2023–2024 годах»
  • «Покажите мне чек за покупку кофе»
  • «Дайте мне файл с квитанцией от Yakiniku Like»
  • И т. д

Вот фрагмент успешного взаимодействия

f6ba4537438033b2.png

313a43d32b0901ef.png

11. Развертывание в облаке Run

Теперь, конечно, мы хотим получить доступ к этому замечательному приложению из любой точки мира. Чтобы сделать это, мы можем упаковать это приложение и развернуть его в Cloud Run. Ради этой демонстрации эта служба будет представлена ​​как публичная служба, к которой могут получить доступ другие. Однако имейте в виду, что это не лучшая практика для такого рода приложений, поскольку она больше подходит для личных приложений.

6795e9abf2030334.jpeg

В этой кодовой лаборатории мы поместим как frontend, так и backend сервис в один контейнер. Нам понадобится помощь supervisord для управления обоими сервисами. Вы можете проверить файл supervisord.conf и проверить Dockerfile , в котором мы установили supervisord в качестве точки входа.

На этом этапе у нас уже есть все файлы, необходимые для развертывания наших приложений в Cloud Run, давайте развернем его. Перейдите в Cloud Shell Terminal и убедитесь, что текущий проект настроен на ваш активный проект, если нет, используйте команду gcloud configure, чтобы задать идентификатор проекта:

gcloud config set project [PROJECT_ID]

Затем выполните следующую команду, чтобы развернуть его в Cloud Run.

gcloud run deploy personal-expense-assistant \
                  --source . \
                  --port=8080 \
                  --allow-unauthenticated \
                  --env-vars-file=settings.yaml \
                  --memory 1024Mi \
                  --region us-central1

Если вам будет предложено подтвердить создание реестра артефактов для репозитория Docker, просто ответьте Y. Обратите внимание, что мы разрешаем здесь неаутентифицированный доступ, поскольку это демонстрационное приложение. Рекомендуется использовать соответствующую аутентификацию для ваших корпоративных и производственных приложений.

После завершения развертывания вы должны получить ссылку, похожую на следующую:

https://personal-expense-assistant-*******.us-central1.run.app

Продолжайте и используйте свое приложение из окна Incognito или вашего мобильного устройства. Оно должно быть уже запущено.

12. Вызов

Теперь пришло время отточить и отточить свои навыки исследователя. Есть ли у вас все необходимое, чтобы изменить код так, чтобы бэкенд мог обслуживать нескольких пользователей? Какие компоненты необходимо обновить?

13. Уборка

Чтобы избежать списания средств с вашего аккаунта Google Cloud за ресурсы, используемые в этой лабораторной работе, выполните следующие действия:

  1. В консоли Google Cloud перейдите на страницу Управление ресурсами .
  2. В списке проектов выберите проект, который вы хотите удалить, и нажмите Удалить .
  3. В диалоговом окне введите идентификатор проекта, а затем нажмите «Завершить работу» , чтобы удалить проект.
  4. Либо вы можете перейти в Cloud Run на консоли, выбрать только что развернутую службу и удалить ее.