מעבר לשימוש בכמה מודלים באמצעות ערכת פיתוח הסוכן: כלי לניהול הוצאות אישיות עם Gemini 2.5, Firestore ו-Cloud Run

1. מבוא

d029d993943b282b.png

האם אי פעם תהיו מתוסכלים ועצלים מדי כדי לנהל את כל ההוצאות האישיות שלכם? גם אני! לכן, בסדנת הקוד הזו נבנה עוזר אישי לניהול הוצאות – שמבוסס על Gemini 2.5 – שיעשה בשבילנו את כל המשימות. החל מניהול החשבוניות שהועלו ועד לניתוח של הוצאות כדי לדעת אם כבר הוצאתם יותר מדי כסף כדי לקנות קפה.

תוכלו לגשת לעוזרת הזו דרך דפדפן אינטרנט, כממשק אינטרנט של צ'אט שבו תוכלו לתקשר איתה, להעלות תמונות של קבלות ולבקש מהעוזרת לאחסן אותן. אולי תרצו לחפש קבלות מסוימות כדי לקבל את הקובץ ולבצע ניתוח הוצאות. והכל מבוסס על המסגרת של Google Agent Development Kit.

האפליקציה עצמה מופרדת לשני שירותים: חזית (frontend) ועורפי (backend). כך תוכלו ליצור אב טיפוס מהיר ולנסות אותו, וגם להבין איך נראה חוזה ה-API לשילוב של שניהם.

במהלך הקודלאב, נשתמש בגישה הדרגתית לפי השלבים הבאים:

  1. הכנת הפרויקט ב-Google Cloud והפעלת כל ממשקי ה-API הנדרשים בו
  2. הגדרת קטגוריה ב-Google Cloud Storage ובסיס נתונים ב-Firestore
  3. יצירת הוספה לאינדקס ב-Firestore
  4. הגדרת סביבת העבודה לתכנות
  5. בניית מבנה לקוד המקור, לכלים, להנחיה וכו' של סוכן ADK
  6. בדיקת הסוכן באמצעות ממשק המשתמש המקומי לפיתוח אינטרנט של ADK
  7. פיתוח השירות בחזית – ממשק הצ'אט באמצעות הספרייה Gradio, כדי לשלוח שאילתות ולהעלות תמונות של קבלות
  8. פיתוח שירות הקצה העורפי – שרת HTTP באמצעות FastAPI, שבו נמצאים קוד הסוכן של ADK, ‏ SessionService ו-Artifact Service
  9. ניהול משתני הסביבה והגדרת הקבצים הנדרשים לפריסה של האפליקציה ב-Cloud Run
  10. פריסת האפליקציה ב-Cloud Run

סקירה כללית על הארכיטקטורה

6795e9abf2030334.jpeg

דרישות מוקדמות

  • ניסיון בעבודה עם Python
  • הבנה בסיסית של ארכיטקטורה מלאה באמצעות שירות HTTP

מה תלמדו

  • יצירת אב טיפוס של ממשק חזית אינטרנט באמצעות Gradio
  • פיתוח שירות לקצה העורפי באמצעות FastAPI ו-Pydantic
  • תכנון של ADK Agent תוך ניצול של כמה מהיכולות שלו
  • שימוש בכלי
  • ניהול סשנים ופריטי מידע שנוצרו בתהליך פיתוח (Artifacts)
  • שימוש בקריאה חוזרת (CallBack) לשינוי הקלט לפני השליחה אל Gemini
  • שימוש ב-BuiltInPlanner כדי לשפר את ביצוע המשימות באמצעות תכנון
  • ניפוי באגים מהיר באמצעות ממשק האינטרנט המקומי של ADK
  • אסטרטגיה לביצוע אופטימיזציה של אינטראקציה במגוון מודלים באמצעות ניתוח מידע ואחזור מידע באמצעות תכנון הנחיות ושינוי של בקשות Gemini באמצעות קריאה חוזרת (callback) של ADK
  • יצירת מודלים משופרים של אחזור נתונים באמצעות Firestore כמסד נתונים של וקטורים
  • ניהול משתני הסביבה בקובץ YAML באמצעות Pydantic-settings
  • פריסת אפליקציה ב-Cloud Run באמצעות Dockerfile ומתן משתני סביבה באמצעות קובץ YAML

מה צריך להכין

  • דפדפן האינטרנט Chrome
  • חשבון Gmail
  • פרויקט ב-Cloud שבו החיוב מופעל

סדנת הקוד הזו מיועדת למפתחים מכל הרמות (כולל למתחילים), והיא כוללת שימוש ב-Python באפליקציה לדוגמה. עם זאת, לא צריך ידע ב-Python כדי להבין את המושגים המוצגים.

2. לפני שמתחילים

בחירת פרויקט פעיל במסוף Cloud

אנחנו יוצאים מנקודת הנחה שכבר יש לכם פרויקט ב-Google Cloud שבו החיוב מופעל. אם עדיין לא עשיתם זאת, תוכלו לפעול לפי ההוראות שבהמשך כדי להתחיל.

  1. בדף לבחירת הפרויקט במסוף Google Cloud, בוחרים פרויקט קיים או יוצרים פרויקט חדש ב-Google Cloud.
  2. הקפידו לוודא שהחיוב מופעל בפרויקט שלכם ב-Cloud. כך בודקים אם החיוב מופעל בפרויקט

9b27622602f6cc4f.png

הכנת מסד הנתונים ב-Firestore

בשלב הבא נצטרך גם ליצור מסד נתונים של Firestore. Firestore במצב מקורי הוא מסד נתונים מסוג NoSQL לאחסון מסמכים שמיועד להתאמה לעומס (automatic scaling), לביצועים גבוהים ולפיתוח אפליקציות בקלות. הוא יכול לשמש גם כמסד נתונים של וקטורים שיכול לתמוך בשיטת ה-Retrieval Augmented Generation במעבדה שלנו.

  1. מחפשים את firestore בסרגל החיפוש ולוחצים על המוצר Firestore.

2986f598f448af67.png

  1. לאחר מכן, לוחצים על הלחצן Create A Firestore Database.
  2. משתמשים באפשרות (default) בתור השם של מזהה מסד הנתונים, ומשאירים את האפשרות Standard Edition מסומנת. במסגרת הדגמה הזו במעבדה, נשתמש ב-Firestore Native עם כללי אבטחה פתוחים.
  1. תוכלו גם לראות שבמסד הנתונים הזה יש את השדה Free-tier Usage YEAY! לאחר מכן, לוחצים על Create Database Button (לחצן יצירת מסד נתונים).

27a5495b76ed7033.png

אחרי ביצוע השלבים האלה, אמורה להתבצע הפניה אוטומטית למסד הנתונים של Firestore שיצרתם.

הגדרת פרויקט ב-Cloud ב-Cloud Shell Terminal

  1. נשתמש ב-Cloud Shell, סביבת שורת פקודה שפועלת ב-Google Cloud ומגיעה עם bq טעון מראש. לוחצים על Activate Cloud Shell בחלק העליון של מסוף Google Cloud.

1829c3759227c19b.png

  1. אחרי שמתחברים ל-Cloud Shell, בודקים שכבר בוצע אימות ושהמזהה של הפרויקט מוגדר כפרויקט באמצעות הפקודה הבאה:
gcloud auth list
  1. מריצים את הפקודה הבאה ב-Cloud Shell כדי לוודא שהפקודה gcloud מכירה את הפרויקט.
gcloud config list project
  1. אם הפרויקט לא מוגדר, משתמשים בפקודה הבאה כדי להגדיר אותו:
gcloud config set project <YOUR_PROJECT_ID>

אפשר גם לראות את המזהה PROJECT_ID במסוף

4032c45803813f30.jpeg

לוחצים עליו ומוצגים כל הפרויקטים ומזהה הפרויקט בצד שמאל.

8dc17eb4271de6b5.jpeg

  1. מפעילים את ממשקי ה-API הנדרשים באמצעות הפקודה שמופיעה בהמשך. הפעולה עשויה להימשך כמה דקות, אז חשוב להמתין.
gcloud services enable aiplatform.googleapis.com \
                       firestore.googleapis.com \
                       run.googleapis.com \
                       cloudbuild.googleapis.com \
                       cloudresourcemanager.googleapis.com

אם הפקודה תתבצע בהצלחה, אמורה להופיע הודעה דומה לזו שבהמשך:

Operation "operations/..." finished successfully.

האפשרות החלופית לפקודה gcloud היא דרך מסוף, על ידי חיפוש של כל מוצר או באמצעות הקישור הזה.

אם חסר ממשק API כלשהו, תמיד תוכלו להפעיל אותו במהלך ההטמעה.

במסמכי העזרה מפורטות הפקודות של gcloud והשימוש בהן.

הכנת קטגוריה ב-Google Cloud Storage

בשלב הבא, מאותו מסוף, נצטרך להכין את הקטגוריה ב-GCS כדי לאחסן את הקובץ שהועלו. מריצים את הפקודה הבאה כדי ליצור את הקטגוריה

gsutil mb -l us-central1 gs://personal-expense-assistant-receipts

הפלט שיוצג יהיה

Creating gs://personal-expense-assistant-receipts/...

אפשר לאמת זאת על ידי מעבר לתפריט הניווט בפינה הימנית העליונה של הדפדפן ובחירה באפשרות Cloud Storage -> Bucket.

d27475d5ce4fcc9d.png

Firestore הוא מסד נתונים ב-NoSQL שמבוסס על נתונים גולמיים, שמציע ביצועים וגמישות מעולים במודל הנתונים, אבל יש לו מגבלות כשמדובר בשאילתות מורכבות. אנחנו מתכננים להשתמש בכמה שאילתות מורכבות עם מספר שדות ובחיפוש וקטורים, ולכן קודם נצטרך ליצור אינדקס. פרטים נוספים זמינים במסמכי התיעוד האלה

  1. מריצים את הפקודה הבאה כדי ליצור אינדקס שתומך בשאילתות מורכבות
gcloud firestore indexes composite create \
        --collection-group=personal-expense-assistant-receipts \
        --field-config field-path=total_amount,order=ASCENDING \
        --field-config field-path=transaction_time,order=ASCENDING \
        --field-config field-path=__name__,order=ASCENDING \
        --database="(default)"
  1. וצריכים להריץ את הפקודה הזו כדי לתמוך בחיפוש וקטור
gcloud firestore indexes composite create \
        --collection-group="personal-expense-assistant-receipts" \
        --query-scope=COLLECTION \
        --field-config field-path="embedding",vector-config='{"dimension":"768", "flat": "{}"}' \
        --database="(default)"

כדי לבדוק את האינדקס שנוצר, נכנסים ל-Firestore במסוף Cloud ולוחצים על מכונה של מסד הנתונים (default) ובוחרים באפשרות Indexes בסרגל הניווט.

8b3a4012985ee0b6.png

כניסה ל-Cloud Shell Editor והגדרת ספריית העבודה של האפליקציה

עכשיו אפשר להגדיר את עורך הקוד כדי לבצע פעולות תכנות. לשם כך נשתמש ב-Cloud Shell Editor

  1. לוחצים על הלחצן Open Editor (פתיחת עורך). ייפתח עורך של Cloud Shell, שבו נוכל לכתוב את הקוד שלנו b16d56e4979ec951.png
  2. מוודאים שהפרויקט ב-Cloud Code מוגדר בפינה הימנית התחתונה (סרגל הסטטוס) של עורך Cloud Shell, כפי שמודגש בתמונה שבהמשך, והוא מוגדר לפרויקט הפעיל ב-Google Cloud שבו החיוב מופעל. נותנים הרשאה אם מוצגת בקשה לכך. אם כבר פעלת לפי הפקודה הקודמת, יכול להיות שהלחצן יצביע ישירות על הפרויקט המופעל במקום על לחצן הכניסה

f5003b9c38b43262.png

  1. בשלב הבא, נשכפל את ספריית העבודה של התבנית ל-codelab הזה מ-GitHub. כדי לעשות זאת, מריצים את הפקודה הבאה. הספרייה תיצור את ספריית העבודה בספרייה personal-expense-assistant.
git clone https://github.com/alphinside/personal-expense-assistant-adk-codelab-starter.git personal-expense-assistant
  1. לאחר מכן, עוברים לקטע העליון של Cloud Shell Editor ולוחצים על File->Open Folder,מחפשים את הספרייה username ואת הספרייה personal-expense-assistant ולוחצים על הלחצן OK. כך ספריית העבודה הראשית תהיה הספרייה שנבחרה. בדוגמה הזו, שם המשתמש הוא alvinprayuda, ולכן נתיב הספרייה מוצג בהמשך

2c53696f81d805cc.png

a766d380600a988.png

עכשיו Cloud Shell Editor אמור להיראות כך

528df7169f01b016.png

הגדרת הסביבה

הכנת סביבת Python וירטואלית

השלב הבא הוא הכנת סביבת הפיתוח. מסוף העבודה הפעיל הנוכחי צריך להיות בתוך ספריית העבודה personal-expense-assistant. נשתמש ב-Python 3.12 ב-codelab הזה, ונשתמש ב-uv python project manager כדי לפשט את הצורך ביצירה ובניהול של גרסת Python וסביבה וירטואלית.

  1. אם עדיין לא פתחתם את הטרמינל, פותחים אותו בלחיצה על Terminal (טרמינל) -> New Terminal (טרמינל חדש), או באמצעות Ctrl + Shift + C. חלון הטרמינל ייפתח בחלק התחתון של הדפדפן.

f8457daf0bed059e.jpeg

  1. מורידים את uv ומתקינים את Python 3.12 באמצעות הפקודה הבאה
curl -LsSf https://astral.sh/uv/0.6.16/install.sh | sh && \
source $HOME/.local/bin/env && \
uv python install 3.12
  1. עכשיו נאתחל את הסביבה הווירטואלית באמצעות uv. מריצים את הפקודה הזו
uv sync --frozen

הפקודה הזו תיצור את הספרייה ‎.venv ותתקין את יחסי התלות. הצצה מהירה לקובץ pyproject.toml תספק לכם מידע על יחסי התלות שמוצגים כך

dependencies = [
    "datasets>=3.5.0",
    "google-adk>=0.2.0",
    "google-cloud-firestore>=2.20.1",
    "gradio>=5.23.1",
    "pydantic>=2.10.6",
    "pydantic-settings[yaml]>=2.8.1",
]
  1. כדי לבדוק את הסביבה הווירטואלית, יוצרים קובץ חדש בשם main.py ומעתיקים את הקוד הבא
def main():
   print("Hello from personal-expense-assistant-adk!")

if __name__ == "__main__":
   main()
  1. לאחר מכן, מריצים את הפקודה הבאה:
uv run main.py

הפלט שיוצג יהיה דומה לזה שמופיע בהמשך.

Using CPython 3.12
Creating virtual environment at: .venv
Hello from personal-expense-assistant-adk!

זה מראה שהפרויקט ב-Python מוגדר כראוי.

הגדרת קובצי תצורה

עכשיו נצטרך להגדיר קובצי תצורה לפרויקט הזה. אנחנו משתמשים ב-pydantic-settings כדי לקרוא את ההגדרות מקובץ ה-YAML.

יוצרים קובץ בשם settings.yaml עם ההגדרות הבאות. לוחצים על קובץ->קובץ טקסט חדש וממלאים את הקוד הבא. לאחר מכן שומרים אותו בתור settings.yaml.

GCLOUD_LOCATION: "us-central1"
GCLOUD_PROJECT_ID: "your_gcloud_project_id"
BACKEND_URL: "http://localhost:8081/chat"
STORAGE_BUCKET_NAME: "personal-expense-assistant-receipts"
DB_COLLECTION_NAME: "personal-expense-assistant-receipts"

בקודלאב הזה, נשתמש בערכים שהוגדרו מראש עבור GCLOUD_LOCATION, BACKEND_URL, STORAGE_BUCKET_NAME, DB_COLLECTION_NAME ו-BACKEND_URL .

עכשיו אפשר לעבור לשלב הבא, יצירת הסוכן ואז השירותים

3. פיתוח הסוכן באמצעות Google ADK ו-Gemini 2.5

מבוא למבנה הספריות של ADK

נתחיל בהסבר על מה ש-ADK מציע ועל אופן היצירה של הסוכן. אפשר לגשת למסמכי התיעוד המלאים של ADK בכתובת ה-URL הזו . ADK מציע לנו כלי עזר רבים במסגרת הפעלת הפקודות ב-CLI. חלק מהגורמים האלה הם :

  • הגדרת המבנה של ספריית הסוכנים
  • ניסיון מהיר של אינטראקציה באמצעות קלט ופלט של CLI
  • הגדרה מהירה של ממשק אינטרנט של ממשק משתמש לפיתוח מקומי

עכשיו נוצר את מבנה הספריות של הסוכנים באמצעות פקודת ה-CLI. מריצים את הפקודה הבאה:

uv run adk create expense_manager_agent \
   --model gemini-2.5-flash-preview-04-17 \
   --project {your-project-id} \
   --region us-central1

הפקודה תיצור את מבנה הספרייה הבא של הסוכנים

expense_manager_agent/
├── __init__.py
├── .env
├── agent.py

אם בודקים את הקבצים init.py ו-agent.py, רואים את הקוד הזה

# __init__.py

from . import agent
# agent.py

from google.adk.agents import Agent

root_agent = Agent(
    model='gemini-2.5-flash-preview-04-17',
    name='root_agent',
    description='A helpful assistant for user questions.',
    instruction='Answer user questions to the best of your knowledge',
)

פיתוח הסוכן של Expense Manager

נתחיל לבנות את הסוכן של מנהל ההוצאות. פותחים את הקובץ expense_manager_agent/agent.py ומעתיקים את הקוד שבהמשך, שיכיל את root_agent.

# expense_manager_agent/agent.py

from google.adk.agents import Agent
from expense_manager_agent.tools import (
    store_receipt_data,
    search_receipts_by_metadata_filter,
    search_relevant_receipts_by_natural_language_query,
    get_receipt_data_by_image_id,
)
from expense_manager_agent.callbacks import modify_image_data_in_history
import os
from settings import get_settings
from google.adk.planners import BuiltInPlanner
from google.genai import types

SETTINGS = get_settings()
os.environ["GOOGLE_CLOUD_PROJECT"] = SETTINGS.GCLOUD_PROJECT_ID
os.environ["GOOGLE_CLOUD_LOCATION"] = SETTINGS.GCLOUD_LOCATION
os.environ["GOOGLE_GENAI_USE_VERTEXAI"] = "TRUE"

# Get the code file directory path and read the task prompt file
current_dir = os.path.dirname(os.path.abspath(__file__))
prompt_path = os.path.join(current_dir, "task_prompt.md")
with open(prompt_path, "r") as file:
    task_prompt = file.read()

root_agent = Agent(
    name="expense_manager_agent",
    model="gemini-2.5-flash-preview-04-17",
    description=(
        "Personal expense agent to help user track expenses, analyze receipts, and manage their financial records"
    ),
    instruction=task_prompt,
    tools=[
        store_receipt_data,
        get_receipt_data_by_image_id,
        search_receipts_by_metadata_filter,
        search_relevant_receipts_by_natural_language_query,
    ],
    planner=BuiltInPlanner(
        thinking_config=types.ThinkingConfig(
            thinking_budget=2048,
        )
    ),
    before_model_callback=modify_image_data_in_history,
)

הסבר על הקוד

הסקריפט הזה מכיל את ההפעלה של הסוכן, שבה אנחנו מאתחלים את הדברים הבאים:

  • מגדירים את המודל לשימוש כ-gemini-2.5-flash-preview-04-17
  • מגדירים את התיאור וההוראות של הסוכן כהודעת המערכת שנקראה מ-task_prompt.md
  • לספק את הכלים הנדרשים לתמיכה בפונקציונליות של הסוכן
  • הפעלת תכנון לפני יצירת התגובה או הביצוע הסופיים באמצעות יכולות ה-Flash Thinking של Gemini 2.5
  • הגדרת ניתוב חזרה (callback) לפני שליחת הבקשה ל-Gemini כדי להגביל את מספר נתוני התמונות שנשלחים לפני ביצוע התחזית

4. הגדרת הכלים של סוכן

לסוכן של מנהל ההוצאות שלנו יהיו היכולות הבאות:

  • חילוץ נתונים מתמונת הקבלה ושמירת הנתונים והקובץ
  • חיפוש מדויק בנתוני ההוצאות
  • חיפוש לפי הקשר בנתוני ההוצאות

לכן אנחנו זקוקים לכלים המתאימים כדי לתמוך בפונקציונליות הזו. יוצרים קובץ חדש בספרייה expense_manager_agent, נותנים לו את השם tools.py ומעתיקים את הקוד שבהמשך

# expense_manager_agent/tools.py

import datetime
from typing import Dict, List, Any
from google.cloud import firestore
from google.cloud.firestore_v1.vector import Vector
from google.cloud.firestore_v1 import FieldFilter
from google.cloud.firestore_v1.base_query import And
from google.cloud.firestore_v1.base_vector_query import DistanceMeasure
from settings import get_settings
from google import genai

SETTINGS = get_settings()
DB_CLIENT = firestore.Client(
    project=SETTINGS.GCLOUD_PROJECT_ID
)  # Will use "(default)" database
COLLECTION = DB_CLIENT.collection(SETTINGS.DB_COLLECTION_NAME)
GENAI_CLIENT = genai.Client(
    vertexai=True, location=SETTINGS.GCLOUD_LOCATION, project=SETTINGS.GCLOUD_PROJECT_ID
)
EMBEDDING_DIMENSION = 768
EMBEDDING_FIELD_NAME = "embedding"
INVALID_ITEMS_FORMAT_ERR = """
Invalid items format. Must be a list of dictionaries with 'name', 'price', and 'quantity' keys."""
RECEIPT_DESC_FORMAT = """
Store Name: {store_name}
Transaction Time: {transaction_time}
Total Amount: {total_amount}
Currency: {currency}
Purchased Items:
{purchased_items}
Receipt Image ID: {receipt_id}
"""


def sanitize_image_id(image_id: str) -> str:
    """Sanitize image ID by removing any leading/trailing whitespace."""
    if image_id.startswith("[IMAGE-"):
        image_id = image_id.split("ID ")[1].split("]")[0]

    return image_id.strip()


def store_receipt_data(
    image_id: str,
    store_name: str,
    transaction_time: str,
    total_amount: float,
    purchased_items: List[Dict[str, Any]],
    currency: str = "IDR",
) -> str:
    """
    Store receipt data in the database.

    Args:
        image_id (str): The unique identifier of the image. For example IMAGE-POSITION 0-ID 12345,
            the ID of the image is 12345.
        store_name (str): The name of the store.
        transaction_time (str): The time of purchase, in ISO format ("YYYY-MM-DDTHH:MM:SS.ssssssZ").
        total_amount (float): The total amount spent.
        purchased_items (List[Dict[str, Any]]): A list of items purchased with their prices. Each item must have:
            - name (str): The name of the item.
            - price (float): The price of the item.
            - quantity (int, optional): The quantity of the item. Defaults to 1 if not provided.
        currency (str, optional): The currency of the transaction, can be derived from the store location.
            If unsure, default is "IDR".

    Returns:
        str: A success message with the receipt ID.

    Raises:
        Exception: If the operation failed or input is invalid.
    """
    try:
        # In case of it provide full image placeholder, extract the id string
        image_id = sanitize_image_id(image_id)

        # Check if the receipt already exists
        doc = get_receipt_data_by_image_id(image_id)

        if doc:
            return f"Receipt with ID {image_id} already exists"

        # Validate transaction time
        if not isinstance(transaction_time, str):
            raise ValueError(
                "Invalid transaction time: must be a string in ISO format 'YYYY-MM-DDTHH:MM:SS.ssssssZ'"
            )
        try:
            datetime.datetime.fromisoformat(transaction_time.replace("Z", "+00:00"))
        except ValueError:
            raise ValueError(
                "Invalid transaction time format. Must be in ISO format 'YYYY-MM-DDTHH:MM:SS.ssssssZ'"
            )

        # Validate items format
        if not isinstance(purchased_items, list):
            raise ValueError(INVALID_ITEMS_FORMAT_ERR)

        for _item in purchased_items:
            if (
                not isinstance(_item, dict)
                or "name" not in _item
                or "price" not in _item
            ):
                raise ValueError(INVALID_ITEMS_FORMAT_ERR)

            if "quantity" not in _item:
                _item["quantity"] = 1

        # Create a combined text from all receipt information for better embedding
        result = GENAI_CLIENT.models.embed_content(
            model="text-embedding-004",
            contents=RECEIPT_DESC_FORMAT.format(
                store_name=store_name,
                transaction_time=transaction_time,
                total_amount=total_amount,
                currency=currency,
                purchased_items=purchased_items,
                receipt_id=image_id,
            ),
        )

        embedding = result.embeddings[0].values

        doc = {
            "receipt_id": image_id,
            "store_name": store_name,
            "transaction_time": transaction_time,
            "total_amount": total_amount,
            "currency": currency,
            "purchased_items": purchased_items,
            EMBEDDING_FIELD_NAME: Vector(embedding),
        }

        COLLECTION.add(doc)

        return f"Receipt stored successfully with ID: {image_id}"
    except Exception as e:
        raise Exception(f"Failed to store receipt: {str(e)}")


def search_receipts_by_metadata_filter(
    start_time: str,
    end_time: str,
    min_total_amount: float = -1.0,
    max_total_amount: float = -1.0,
) -> str:
    """
    Filter receipts by metadata within a specific time range and optionally by amount.

    Args:
        start_time (str): The start datetime for the filter (in ISO format, e.g. 'YYYY-MM-DDTHH:MM:SS.ssssssZ').
        end_time (str): The end datetime for the filter (in ISO format, e.g. 'YYYY-MM-DDTHH:MM:SS.ssssssZ').
        min_total_amount (float): The minimum total amount for the filter (inclusive). Defaults to -1.
        max_total_amount (float): The maximum total amount for the filter (inclusive). Defaults to -1.

    Returns:
        str: A string containing the list of receipt data matching all applied filters.

    Raises:
        Exception: If the search failed or input is invalid.
    """
    try:
        # Validate start and end times
        if not isinstance(start_time, str) or not isinstance(end_time, str):
            raise ValueError("start_time and end_time must be strings in ISO format")
        try:
            datetime.datetime.fromisoformat(start_time.replace("Z", "+00:00"))
            datetime.datetime.fromisoformat(end_time.replace("Z", "+00:00"))
        except ValueError:
            raise ValueError("start_time and end_time must be strings in ISO format")

        # Start with the base collection reference
        query = COLLECTION

        # Build the composite query by properly chaining conditions
        # Notes that this demo assume 1 user only,
        # need to refactor the query for multiple user
        filters = [
            FieldFilter("transaction_time", ">=", start_time),
            FieldFilter("transaction_time", "<=", end_time),
        ]

        # Add optional filters
        if min_total_amount != -1:
            filters.append(FieldFilter("total_amount", ">=", min_total_amount))

        if max_total_amount != -1:
            filters.append(FieldFilter("total_amount", "<=", max_total_amount))

        # Apply the filters
        composite_filter = And(filters=filters)
        query = query.where(filter=composite_filter)

        # Execute the query and collect results
        search_result_description = "Search by Metadata Results:\n"
        for doc in query.stream():
            data = doc.to_dict()
            data.pop(
                EMBEDDING_FIELD_NAME, None
            )  # Remove embedding as it's not needed for display

            search_result_description += f"\n{RECEIPT_DESC_FORMAT.format(**data)}"

        return search_result_description
    except Exception as e:
        raise Exception(f"Error filtering receipts: {str(e)}")


def search_relevant_receipts_by_natural_language_query(
    query_text: str, limit: int = 5
) -> str:
    """
    Search for receipts with content most similar to the query using vector search.
    This tool can be use for user query that is difficult to translate into metadata filters.
    Such as store name or item name which sensitive to string matching.
    Use this tool if you cannot utilize the search by metadata filter tool.

    Args:
        query_text (str): The search text (e.g., "coffee", "dinner", "groceries").
        limit (int, optional): Maximum number of results to return (default: 5).

    Returns:
        str: A string containing the list of contextually relevant receipt data.

    Raises:
        Exception: If the search failed or input is invalid.
    """
    try:
        # Generate embedding for the query text
        result = GENAI_CLIENT.models.embed_content(
            model="text-embedding-004", contents=query_text
        )
        query_embedding = result.embeddings[0].values

        # Notes that this demo assume 1 user only,
        # need to refactor the query for multiple user
        vector_query = COLLECTION.find_nearest(
            vector_field=EMBEDDING_FIELD_NAME,
            query_vector=Vector(query_embedding),
            distance_measure=DistanceMeasure.EUCLIDEAN,
            limit=limit,
        )

        # Execute the query and collect results
        search_result_description = "Search by Contextual Relevance Results:\n"
        for doc in vector_query.stream():
            data = doc.to_dict()
            data.pop(
                EMBEDDING_FIELD_NAME, None
            )  # Remove embedding as it's not needed for display
            search_result_description += f"\n{RECEIPT_DESC_FORMAT.format(**data)}"

        return search_result_description
    except Exception as e:
        raise Exception(f"Error searching receipts: {str(e)}")


def get_receipt_data_by_image_id(image_id: str) -> Dict[str, Any]:
    """
    Retrieve receipt data from the database using the image_id.

    Args:
        image_id (str): The unique identifier of the receipt image. For example, if the placeholder is
            [IMAGE-ID 12345], the ID to use is 12345.

    Returns:
        Dict[str, Any]: A dictionary containing the receipt data with the following keys:
            - receipt_id (str): The unique identifier of the receipt image.
            - store_name (str): The name of the store.
            - transaction_time (str): The time of purchase in UTC.
            - total_amount (float): The total amount spent.
            - currency (str): The currency of the transaction.
            - purchased_items (List[Dict[str, Any]]): List of items purchased with their details.
        Returns an empty dictionary if no receipt is found.
    """
    # In case of it provide full image placeholder, extract the id string
    image_id = sanitize_image_id(image_id)

    # Query the receipts collection for documents with matching receipt_id (image_id)
    # Notes that this demo assume 1 user only,
    # need to refactor the query for multiple user
    query = COLLECTION.where(filter=FieldFilter("receipt_id", "==", image_id)).limit(1)
    docs = list(query.stream())

    if not docs:
        return {}

    # Get the first matching document
    doc_data = docs[0].to_dict()
    doc_data.pop(EMBEDDING_FIELD_NAME, None)

    return doc_data

הסבר על הקוד

בתהליך ההטמעה של פונקציית הכלים הזו, אנחנו מתכננים את הכלים סביב 2 הרעיונות העיקריים הבאים:

  • ניתוח נתוני הקבלה ומיפוי לקובץ המקורי באמצעות placeholder של מחרוזת מזהה התמונה [IMAGE-ID <hash-of-image-1>]
  • אחסון נתונים ואחזור שלהם באמצעות מסד הנתונים של Firestore

הכלי 'store_receipt_data'

6119e1f37f516707.png

הכלי הזה הוא הכלי לזיהוי אופטית של תווים (OCR). הוא ינתח את המידע הנדרש מנתוני התמונה, יחד עם זיהוי המחרוזת של מזהה התמונה, וימפה אותם יחד כדי לשמור אותם במסד הנתונים של Firestore.

בנוסף, הכלי הזה ממיר את תוכן הקבלה להטמעה באמצעות text-embedding-004, כך שכל המטא-נתונים וההטמעה מאוחסנים ומתווספים לאינדקס יחד. מאפשרים גמישות באחזור באמצעות שאילתה או חיפוש לפי הקשר.

אחרי שמריצים את הכלי הזה, אפשר לראות שנתוני הקבלה כבר נוספו לאינדקס במסד הנתונים של Firestore, כפי שמוצג בהמשך.

7b448fcde40fac5a.png

הכלי 'search_receipts_by_metadata_filter'

9d51a3f12289d184.png

הכלי הזה ממיר את שאילתת המשתמש למסנן של שאילתת מטא-נתונים שתומך בחיפוש לפי טווח תאריכים ו/או סכום עסקה כולל. הפונקציה תחזיר את כל נתוני הקבלה שתואמים, כאשר בתהליך נמחק את שדה ההטמעה כי הוא לא נדרש לסוכנות לצורך הבנת ההקשר.

הכלי 'search_relevant_receipts_by_natural_language_query'

b97d3aab9aa53bc9.png

זהו הכלי שלנו ליצירה עם הרחבת יכולות אחזור (RAG). הסוכן שלנו יכול לתכנן שאילתה משלו כדי לאחזר קבלות רלוונטיות ממסד הנתונים של הווקטורים, וגם לבחור מתי להשתמש בכלי הזה. הרעיון של מתן אפשרות לסוכן לקבל החלטה עצמאית אם להשתמש בכלי ה-RAG הזה או לא, ולתכנן שאילתה משלו, הוא אחד מההגדרות של הגישה Agentic RAG.

אנחנו לא רק מאפשרים לו ליצור שאילתה משלו, אלא גם לבחור כמה מסמכים רלוונטיים הוא רוצה לאחזר. בשילוב עם תכנון הנחיות מתאים, למשל:

# Example prompt

Always filter the result from tool
search_relevant_receipts_by_natural_language_query as the returned 
result may contain irrelevant information

כך הכלי הזה יהפוך לכלי יעיל שאפשר לחפש בו כמעט כל דבר, אבל יכול להיות שהוא לא יחזיר את כל התוצאות הצפויות בגלל הטבע הלא מדויק של חיפוש השכנות הקרובה ביותר.

5. שינוי ההקשר של השיחה באמצעות קריאות חזרה (callbacks)

Google ADK מאפשר לנו "לעצור" את סביבת זמן הריצה של הסוכן ברמות שונות. מידע נוסף על היכולת המפורטת הזו זמין במסמכי העזרה האלה . במעבדה הזו, אנחנו משתמשים ב-before_model_callback כדי לשנות את הבקשה לפני שהיא נשלחת ל-LLM, כדי להסיר נתוני תמונות מההקשר של היסטוריית השיחות הקודמת ( כולל רק נתוני תמונות מ-3 האינטראקציות האחרונות של המשתמש) למען יעילות.

עם זאת, אנחנו עדיין רוצים שהסוכן יקבל את ההקשר של נתוני התמונה במקרה הצורך. לכן, הוספנו מנגנון להוספת placeholder של מזהה תמונה מסוג מחרוזת אחרי כל נתוני בייט של תמונה בשיחה. כך הסוכנות תוכל לקשר את מזהה התמונה לנתוני הקובץ בפועל, שניתן להשתמש בהם גם בזמן אחסון התמונה וגם בזמן אחזור שלה. המבנה ייראה כך

<image-byte-data-1>
[IMAGE-ID <hash-of-image-1>]
<image-byte-data-2>
[IMAGE-ID <hash-of-image-2>]
And so on..

כשנתוני הבייטים הופכים ללא רלוונטיים בהיסטוריית השיחות, מזהה המחרוזת עדיין נשאר כדי לאפשר גישה לנתונים באמצעות שימוש בכלי. דוגמה למבנה ההיסטוריה אחרי הסרת נתוני התמונות

[IMAGE-ID <hash-of-image-1>]
[IMAGE-ID <hash-of-image-2>]
And so on..

אפשר להתחיל! יוצרים קובץ חדש בספרייה expense_manager_agent, נותנים לו את השם callbacks.py ומעתיקים את הקוד שבהמשך.

# expense_manager_agent/callbacks.py

import hashlib
from google.genai import types
from google.adk.agents.callback_context import CallbackContext
from google.adk.models.llm_request import LlmRequest


def modify_image_data_in_history(
    callback_context: CallbackContext, llm_request: LlmRequest
) -> None:
    # The following code will modify the request sent to LLM
    # We will only keep image data in the last 3 user messages using a reverse and counter approach

    # Count how many user messages we've processed
    user_message_count = 0

    # Process the reversed list
    for content in reversed(llm_request.contents):
        # Only count for user manual query, not function call
        if (content.role == "user") and (content.parts[0].function_response is None):
            user_message_count += 1
            modified_content_parts = []

            # Check any missing image ID placeholder for any image data
            # Then remove image data from conversation history if more than 3 user messages
            for idx, part in enumerate(content.parts):
                if part.inline_data is None:
                    modified_content_parts.append(part)
                    continue

                if (
                    (idx + 1 >= len(content.parts))
                    or (content.parts[idx + 1].text is None)
                    or (not content.parts[idx + 1].text.startswith("[IMAGE-ID "))
                ):
                    # Generate hash ID for the image and add a placeholder
                    image_data = part.inline_data.data
                    hasher = hashlib.sha256(image_data)
                    image_hash_id = hasher.hexdigest()[:12]
                    placeholder = f"[IMAGE-ID {image_hash_id}]"

                    # Only keep image data in the last 3 user messages
                    if user_message_count <= 3:
                        modified_content_parts.append(part)

                    modified_content_parts.append(types.Part(text=placeholder))

                else:
                    # Only keep image data in the last 3 user messages
                    if user_message_count <= 3:
                        modified_content_parts.append(part)

            # This will modify the contents inside the llm_request
            content.parts = modified_content_parts

6. ההנחיה

כדי לתכנן נציג עם יכולות ואינטראקציה מורכבות, אנחנו צריכים למצוא הנחיה טובה מספיק שתוביל את הנציג כך שיוכל להתנהג כפי שאנחנו רוצים.

בעבר היה לנו מנגנון לטיפול בנתוני תמונות בהיסטוריית השיחות, וגם כלים שיכול להיות שהשימוש בהם לא פשוט, כמו search_relevant_receipts_by_natural_language_query. אנחנו גם רוצים שהנציג יוכל לחפש ולאחזר את תמונת הקבלה הנכונה עבורנו. כלומר, אנחנו צריכים להעביר את כל המידע הזה בצורה נכונה במבנה הנחיה מתאים.

נבקש מהנציג לבנות את הפלט בפורמט ה-Markdown הבא כדי לנתח את תהליך החשיבה, התשובה הסופית והקובץ המצורף ( אם יש כזה).

# THINKING PROCESS

Thinking process here

# FINAL RESPONSE

Response to the user here

Attachments put inside json block

{
    "attachments": [
      "[IMAGE-ID <hash-id-1>]",
      "[IMAGE-ID <hash-id-2>]",
      ...
    ]
}

נתחיל בהנחיה הבאה כדי לעמוד בציפיות הראשוניות שלנו לגבי התנהגות הסוכן של מנהל ההוצאות. קובץ task_prompt.md אמור כבר להופיע בספריית העבודה הקיימת שלנו, אבל אנחנו צריכים להעביר אותו לספרייה expense_manager_agent. מריצים את הפקודה הבאה כדי להעביר אותו

mv task_prompt.md expense_manager_agent/task_prompt.md

7. בדיקת הסוכן

עכשיו ננסה לתקשר עם הסוכן דרך CLI. מריצים את הפקודה הבאה:

uv run adk run expense_manager_agent

יוצג פלט כזה, שבו תוכלו להתכתב בצ'אט עם הנציג, אבל תוכלו לשלוח רק טקסט דרך הממשק הזה.

Log setup complete: /tmp/agents_log/agent.xxxx_xxx.log
To access latest log: tail -F /tmp/agents_log/agent.latest.log
Running agent root_agent, type exit to exit.
user: hello
[root_agent]: Hello there! How can I help you today?
user: 

עכשיו, בנוסף לאינטראקציה עם CLI, ADK מאפשר לנו גם ליצור ממשק משתמש לפיתוח כדי ליצור אינטראקציה ולבדוק מה קורה במהלך האינטראקציה. מריצים את הפקודה הבאה כדי להפעיל את שרת ממשק המשתמש של הפיתוח המקומי

uv run adk web --port 8080

הפלט יהיה דומה לדוגמה הבאה, והמשמעות היא שכבר יש לנו גישה לממשק האינטרנט.

INFO:     Started server process [xxxx]
INFO:     Waiting for application startup.

+-----------------------------------------------------------------------------+
| ADK Web Server started                                                      |
|                                                                             |
| For local testing, access at http://localhost:8080.                         |
+-----------------------------------------------------------------------------+

INFO:     Application startup complete.
INFO:     Uvicorn running on http://0.0.0.0:8080 (Press CTRL+C to quit)

עכשיו, כדי לבדוק את זה, לוחצים על הלחצן Web Preview (תצוגה מקדימה באינטרנט) באזור העליון של Cloud Shell Editor ובוחרים באפשרות Preview on port 8080 (תצוגה מקדימה ביציאה 8080).

e7c9f56c2463164.png

יוצג דף האינטרנט הבא, שבו תוכלו לבחור את הנציגים הזמינים באמצעות התפריט הנפתח בפינה הימנית העליונה ( במקרה שלנו, זה אמור להיות expense_manager_agent) ולנהל אינטראקציה עם הבוט. בחלון הימני יוצגו פרטים רבים על יומני הפעילות במהלך זמן הריצה של הסוכנים.

b0244afd8da6cc42.png

ננסה כמה פעולות! מעלים את 2 הקבלות לדוגמה האלה ( מקור : מערכי נתונים של Hugging face mousserlane/id_receipt_dataset) . לוחצים לחיצה ימנית על כל תמונה ובוחרים באפשרות שמירת תמונה בשם. ( הפעולה הזו תגרום להורדת תמונת הקבלה), ואז מעלים את הקובץ לרובוט בלחיצה על סמל הקליפ ואומרים שאתם רוצים לשמור את הקבלות האלה.

b8ee334373c6e6af.png c83a8c58ac2eff28.png

לאחר מכן, אפשר לנסות את השאילתות הבאות כדי לבצע חיפוש או אחזור של קובץ.

  • "Give breakdown of expenses and its total during 2023"
  • "Give me receipt file from Indomaret"

כשמשתמשים בכלים מסוימים, אפשר לבדוק מה קורה בממשק המשתמש לפיתוח

bf47d0b35d5a4f28.png

בודקים איך הנציג מגיב לכם ובודקים אם הוא עומד בכל הכללים שצוינו בהנחיה בתוך task_prompt.py. מזל טוב! עכשיו יש לכם סוכן פיתוח פעיל ומלא.

עכשיו הגיע הזמן להשלים אותו עם ממשק משתמש נחמד ויכולות להעלאה ולהורדה של קובץ התמונה.

8. פיתוח שירות לקצה הקדמי באמצעות Gradio

נבנה ממשק אינטרנט לצ'אט שייראה כך

d029d993943b282b.png

הוא מכיל ממשק צ'אט עם שדה קלט שבו המשתמשים יכולים לשלוח טקסט ולהעלות את קובצי התמונות של הקבלות.

נשתמש ב-Gradio כדי ליצור את שירות הקצה.

יוצרים קובץ חדש, לוחצים על קובץ->קובץ טקסט חדש, נותנים לו את השם frontend.py, מעתיקים את הקוד הבא ושומרים אותו.

import mimetypes
import gradio as gr
import requests
import base64
from typing import List, Dict, Any
from settings import get_settings
from PIL import Image
import io
from schema import ImageData, ChatRequest, ChatResponse


SETTINGS = get_settings()


def encode_image_to_base64_and_get_mime_type(image_path: str) -> ImageData:
    """Encode a file to base64 string and get MIME type.

    Reads an image file and returns the base64-encoded image data and its MIME type.

    Args:
        image_path: Path to the image file to encode.

    Returns:
        ImageData object containing the base64 encoded image data and its MIME type.
    """
    # Read the image file
    with open(image_path, "rb") as file:
        image_content = file.read()

    # Get the mime type
    mime_type = mimetypes.guess_type(image_path)[0]

    # Base64 encode the image
    base64_data = base64.b64encode(image_content).decode("utf-8")

    # Return as ImageData object
    return ImageData(serialized_image=base64_data, mime_type=mime_type)


def decode_base64_to_image(base64_data: str) -> Image.Image:
    """Decode a base64 string to PIL Image.

    Converts a base64-encoded image string back to a PIL Image object
    that can be displayed or processed further.

    Args:
        base64_data: Base64 encoded string of the image.

    Returns:
        PIL Image object of the decoded image.
    """
    # Decode the base64 string and convert to PIL Image
    image_data = base64.b64decode(base64_data)
    image_buffer = io.BytesIO(image_data)
    image = Image.open(image_buffer)

    return image


def get_response_from_llm_backend(
    message: Dict[str, Any],
    history: List[Dict[str, Any]],
) -> List[str | gr.Image]:
    """Send the message and history to the backend and get a response.

    Args:
        message: Dictionary containing the current message with 'text' and optional 'files' keys.
        history: List of previous message dictionaries in the conversation.

    Returns:
        List containing text response and any image attachments from the backend service.
    """
    # Extract files and convert to base64
    image_data = []
    if uploaded_files := message.get("files", []):
        for file_path in uploaded_files:
            image_data.append(encode_image_to_base64_and_get_mime_type(file_path))

    # Prepare the request payload
    payload = ChatRequest(
        text=message["text"],
        files=image_data,
        session_id="default_session",
        user_id="default_user",
    )

    # Send request to backend
    try:
        response = requests.post(SETTINGS.BACKEND_URL, json=payload.model_dump())
        response.raise_for_status()  # Raise exception for HTTP errors

        result = ChatResponse(**response.json())
        if result.error:
            return [f"Error: {result.error}"]

        chat_responses = []

        if result.thinking_process:
            chat_responses.append(
                gr.ChatMessage(
                    role="assistant",
                    content=result.thinking_process,
                    metadata={"title": "🧠 Thinking Process"},
                )
            )

        chat_responses.append(gr.ChatMessage(role="assistant", content=result.response))

        if result.attachments:
            for attachment in result.attachments:
                image_data = attachment.serialized_image
                chat_responses.append(gr.Image(decode_base64_to_image(image_data)))

        return chat_responses
    except requests.exceptions.RequestException as e:
        return [f"Error connecting to backend service: {str(e)}"]


if __name__ == "__main__":
    demo = gr.ChatInterface(
        get_response_from_llm_backend,
        title="Personal Expense Assistant",
        description="This assistant can help you to store receipts data, find receipts, and track your expenses during certain period.",
        type="messages",
        multimodal=True,
        textbox=gr.MultimodalTextbox(file_count="multiple", file_types=["image"]),
    )

    demo.launch(
        server_name="0.0.0.0",
        server_port=8080,
    )

לאחר מכן, נוכל לנסות להריץ את שירות הקצה באמצעות הפקודה הבאה. חשוב לשנות את שם הקובץ main.py ל-frontend.py.

uv run frontend.py

במסוף Cloud יוצג פלט דומה לזה

* Running on local URL:  http://0.0.0.0:8080

To create a public link, set `share=True` in `launch()`.

לאחר מכן תוכלו לבדוק את ממשק האינטרנט על ידי לחיצה על ctrl+click על הקישור של כתובת ה-URL המקומית. לחלופין, אפשר לגשת לאפליקציית הקצה הקדמי גם על ידי לחיצה על הלחצן תצוגה מקדימה באינטרנט בפינה השמאלית העליונה של Cloud Editor, ובחירה באפשרות תצוגה מקדימה ביציאה 8080.

49cbdfdf77964065.jpeg

תוצג לכם ממשק האינטרנט, אבל תופיע שגיאה צפויה כשתנסו לשלוח צ'אט בגלל ששירות הקצה העורפי עדיין לא הוגדר.

5caec77d95c35927.png

עכשיו נותנים לשירות לפעול ולא מפסיקים אותו עדיין. נפעיל את שירות הקצה העורפי בכרטיסייה אחרת של מסוף

הסבר על הקוד

בקוד הזה של הקצה הקדמי, קודם מאפשרים למשתמש לשלוח טקסט ולהעלות כמה קבצים. Gradio מאפשר לנו ליצור פונקציונליות כזו באמצעות השיטה gr.ChatInterface בשילוב עם gr.MultimodalTextbox

עכשיו, לפני ששולחים את הקובץ והטקסט לקצה העורפי, צריך להבין מהו ה-MIME type של הקובץ, כפי שהוא נדרש לקצה העורפי. אנחנו צריכים גם לקודד את הבייט של קובץ התמונה ל-base64 ולשלוח אותו יחד עם ה-MIME-type.

class ImageData(BaseModel):
    """Model for image data with hash identifier.

    Attributes:
        serialized_image: Optional Base64 encoded string of the image content.
        mime_type: MIME type of the image.
    """

    serialized_image: str
    mime_type: str

הסכימה המשמשת לאינטראקציה בין הקצה הקדמי לקצה העורפי מוגדרת בקובץ schema.py. אנחנו משתמשים ב-BaseModel של Pydantic כדי לאכוף את אימות הנתונים בסכימה

כשאנחנו מקבלים את התשובה, אנחנו כבר מפרידים בין החלק של תהליך החשיבה, התשובה הסופית והקובץ המצורף. כך נוכל להשתמש ברכיב Gradio כדי להציג כל רכיב עם רכיב ממשק המשתמש.

class ChatResponse(BaseModel):
    """Model for a chat response.

    Attributes:
        response: The text response from the model.
        thinking_process: Optional thinking process of the model.
        attachments: List of image data to be displayed to the user.
        error: Optional error message if something went wrong.
    """

    response: str
    thinking_process: str = ""
    attachments: List[ImageData] = []
    error: Optional[str] = None

9. פיתוח שירות לקצה העורפי באמצעות FastAPI

בשלב הבא נצטרך ליצור את הקצה העורפי, שיכול לאתחל את ה-Agent שלנו יחד עם שאר הרכיבים כדי שנוכל להריץ את סביבת זמן הריצה של ה-Agent.

יוצרים קובץ חדש, לוחצים על קובץ->קובץ טקסט חדש,מעתיקים ומדביקים את הקוד הבא ושומרים אותו בשם backend.py.

from expense_manager_agent.agent import root_agent as expense_manager_agent
from google.adk.sessions import InMemorySessionService
from google.adk.runners import Runner
from google.adk.events import Event
from fastapi import FastAPI, Body, Depends
from typing import AsyncIterator
from types import SimpleNamespace
import uvicorn
from contextlib import asynccontextmanager
import asyncio
from utils import (
    extract_attachment_ids_and_sanitize_response,
    download_image_from_gcs,
    extract_thinking_process,
    format_user_request_to_adk_content_and_store_artifacts,
)
from schema import ImageData, ChatRequest, ChatResponse
import logger
from google.adk.artifacts import GcsArtifactService
from settings import get_settings

SETTINGS = get_settings()
APP_NAME = "expense_manager_app"


# Application state to hold service contexts
class AppContexts(SimpleNamespace):
    """A class to hold application contexts with attribute access"""

    session_service: InMemorySessionService = None
    artifact_service: GcsArtifactService = None
    expense_manager_agent_runner: Runner = None


# Initialize application state
app_contexts = AppContexts()


@asynccontextmanager
async def lifespan(app: FastAPI):
    # Initialize service contexts during application startup
    app_contexts.session_service = InMemorySessionService()
    app_contexts.artifact_service = GcsArtifactService(
        bucket_name=SETTINGS.STORAGE_BUCKET_NAME
    )
    app_contexts.expense_manager_agent_runner = Runner(
        agent=expense_manager_agent,  # The agent we want to run
        app_name=APP_NAME,  # Associates runs with our app
        session_service=app_contexts.session_service,  # Uses our session manager
        artifact_service=app_contexts.artifact_service,  # Uses our artifact manager
    )

    logger.info("Application started successfully")
    yield
    logger.info("Application shutting down")
    # Perform cleanup during application shutdown if necessary


# Helper function to get application state as a dependency
async def get_app_contexts() -> AppContexts:
    return app_contexts


# Create FastAPI app
app = FastAPI(title="Personal Expense Assistant API", lifespan=lifespan)


@app.post("/chat", response_model=ChatResponse)
async def chat(
    request: ChatRequest = Body(...),
    app_context: AppContexts = Depends(get_app_contexts),
) -> ChatResponse:
    """Process chat request and get response from the agent"""

    # Prepare the user's message in ADK format and store image artifacts
    content = await asyncio.to_thread(
        format_user_request_to_adk_content_and_store_artifacts,
        request=request,
        app_name=APP_NAME,
        artifact_service=app_context.artifact_service,
    )

    final_response_text = "Agent did not produce a final response."  # Default

    # Use the session ID from the request or default if not provided
    session_id = request.session_id
    user_id = request.user_id

    # Create session if it doesn't exist
    if not app_context.session_service.get_session(
        app_name=APP_NAME, user_id=user_id, session_id=session_id
    ):
        app_context.session_service.create_session(
            app_name=APP_NAME, user_id=user_id, session_id=session_id
        )

    try:
        # Process the message with the agent
        # Type annotation: runner.run_async returns an AsyncIterator[Event]
        events_iterator: AsyncIterator[Event] = (
            app_context.expense_manager_agent_runner.run_async(
                user_id=user_id, session_id=session_id, new_message=content
            )
        )
        async for event in events_iterator:  # event has type Event
            # Key Concept: is_final_response() marks the concluding message for the turn
            if event.is_final_response():
                if event.content and event.content.parts:
                    # Extract text from the first part
                    final_response_text = event.content.parts[0].text
                elif event.actions and event.actions.escalate:
                    # Handle potential errors/escalations
                    final_response_text = f"Agent escalated: {event.error_message or 'No specific message.'}"
                break  # Stop processing events once the final response is found

        logger.info(
            "Received final response from agent", raw_final_response=final_response_text
        )

        # Extract and process any attachments and thinking process in the response
        base64_attachments = []
        sanitized_text, attachment_ids = extract_attachment_ids_and_sanitize_response(
            final_response_text
        )
        sanitized_text, thinking_process = extract_thinking_process(sanitized_text)

        # Download images from GCS and replace hash IDs with base64 data
        for image_hash_id in attachment_ids:
            # Download image data and get MIME type
            result = await asyncio.to_thread(
                download_image_from_gcs,
                artifact_service=app_context.artifact_service,
                image_hash=image_hash_id,
                app_name=APP_NAME,
                user_id=user_id,
                session_id=session_id,
            )
            if result:
                base64_data, mime_type = result
                base64_attachments.append(
                    ImageData(serialized_image=base64_data, mime_type=mime_type)
                )

        logger.info(
            "Processed response with attachments",
            sanitized_response=sanitized_text,
            thinking_process=thinking_process,
            attachment_ids=attachment_ids,
        )

        return ChatResponse(
            response=sanitized_text,
            thinking_process=thinking_process,
            attachments=base64_attachments,
        )

    except Exception as e:
        logger.error("Error processing chat request", error_message=str(e))
        return ChatResponse(
            response="", error=f"Error in generating response: {str(e)}"
        )


# Only run the server if this file is executed directly
if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8081)

לאחר מכן נוכל לנסות להריץ את שירות הקצה העורפי. חשוב לזכור שבשלב הקודם הפעלנו את שירות הקצה הקדמי. עכשיו נצטרך לפתוח מסוף חדש ולנסות להפעיל את שירות הקצה העורפי.

  1. יוצרים מסוף חדש. עוברים אל מסוף ה-CLI באזור התחתון ומאתרים את הלחצן '+' כדי ליצור מסוף חדש. לחלופין, אפשר להקיש על Ctrl + Shift + C כדי לפתוח טרמינל חדש.

3e52a362475553dc.jpeg

  1. לאחר מכן, מוודאים שנמצאים בספריית העבודה personal-expense-assistant ומריצים את הפקודה הבאה:
uv run backend.py
  1. אם הפעולה תתבצע בהצלחה, יוצג פלט דומה לזה:
INFO:     Started server process [xxxxx]
INFO:     Waiting for application startup.
INFO:     Application startup complete.
INFO:     Uvicorn running on http://0.0.0.0:8081 (Press CTRL+C to quit)

הסבר על הקוד

הפעלת ADK Agent, ‏ SessionService ו-ArtifactService

כדי להריץ את הסוכן בשירות הקצה העורפי, נצטרך ליצור Runner שיכלול גם את SessionService וגם את הסוכן שלנו. SessionService ינהל את ההיסטוריה והמצב של השיחות, ולכן כשהוא ישתלב עם Runner, הוא יאפשר לנציג התמיכה לקבל את ההקשר של השיחות המתמשכות.

אנחנו משתמשים גם ב-ArtifactService כדי לטפל בקובץ שהועלו. כאן מפורט מידע נוסף על סשן וארטיפקטים ב-ADK

...

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Initialize service contexts during application startup
    app_contexts.session_service = InMemorySessionService()
    app_contexts.artifact_service = GcsArtifactService(
        bucket_name=SETTINGS.STORAGE_BUCKET_NAME
    )
    app_contexts.expense_manager_agent_runner = Runner(
        agent=expense_manager_agent,  # The agent we want to run
        app_name=APP_NAME,  # Associates runs with our app
        session_service=app_contexts.session_service,  # Uses our session manager
        artifact_service=app_contexts.artifact_service,  # Uses our artifact manager
    )

    logger.info("Application started successfully")
    yield
    logger.info("Application shutting down")
    # Perform cleanup during application shutdown if necessary

...

בדמו הזה אנחנו משתמשים ב-InMemorySessionService וב-GcsArtifactService כדי לשלב אותם עם הסוכן שלנו, Runner. היסטוריית השיחות מאוחסנת בזיכרון, ולכן היא תימחק אחרי שהשירות לקצה העורפי יופסק או יופעל מחדש. אנחנו מפעילים אותם במחזור החיים של אפליקציית FastAPI כדי להזריק אותם כיחסי תלות במסלול /chat.

העלאה והורדה של קובץ אימג' באמצעות GcsArtifactService

כל התמונות שהועלו יישמרו כפריטי ארטיפקט על ידי GcsArtifactService. אפשר לבדוק זאת בפונקציה format_user_request_to_adk_content_and_store_artifacts בתוך utils.py.

...    

# Prepare the user's message in ADK format and store image artifacts
content = await asyncio.to_thread(
    format_user_request_to_adk_content_and_store_artifacts,
    request=request,
    app_name=APP_NAME,
    artifact_service=app_context.artifact_service,
)

...

כל הבקשות שיעובדו על ידי ה-agent runner צריכות להיות בפורמט של סוג types.Content. בתוך הפונקציה אנחנו גם מעבדים את נתוני כל תמונה ומחליפים את המזהה שלה ב-placeholder של מזהה תמונה.

מנגנון דומה משמש להורדת הקבצים המצורפים אחרי חילוץ מזהי התמונות באמצעות ביטוי רגולרי (regex):

...
sanitized_text, attachment_ids = extract_attachment_ids_and_sanitize_response(
    final_response_text
)
sanitized_text, thinking_process = extract_thinking_process(sanitized_text)

# Download images from GCS and replace hash IDs with base64 data
for image_hash_id in attachment_ids:
    # Download image data and get MIME type
    result = await asyncio.to_thread(
        download_image_from_gcs,
        artifact_service=app_context.artifact_service,
        image_hash=image_hash_id,
        app_name=APP_NAME,
        user_id=user_id,
        session_id=session_id,
    )
...

10. בדיקת אינטגרציה

עכשיו אמורים לפעול כמה שירותים בכרטיסיות שונות במסוף Cloud:

  • שירות הקצה הקדמי פועל ביציאה 8080
* Running on local URL:  http://0.0.0.0:8080

To create a public link, set `share=True` in `launch()`.
  • שירות לקצה העורפי שפועל ביציאה 8081
INFO:     Started server process [xxxxx]
INFO:     Waiting for application startup.
INFO:     Application startup complete.
INFO:     Uvicorn running on http://0.0.0.0:8081 (Press CTRL+C to quit)

בשלב הזה, אמורה להיות לך אפשרות להעלות את התמונות של הקבלות ולצ'אט בצורה חלקה עם העוזרת מאפליקציית האינטרנט ביציאה 8080.

לוחצים על הלחצן Web Preview (תצוגה מקדימה באינטרנט) באזור העליון של Cloud Shell Editor ובוחרים באפשרות Preview on port 8080 (תצוגה מקדימה ביציאה 8080).

e7c9f56c2463164.png

עכשיו נבצע אינטראקציה עם העוזר הדיגיטלי.

מורידים את הקבלות הבאות. טווח התאריכים של נתוני הקבלות הוא בין השנים 2023-2024, ומבקשים מהעוזרת לאחסן או להעלות אותם

  • Receipt Drive ( מקור: מערכי הנתונים של Hugging Face mousserlane/id_receipt_dataset)

שואלים שאלות שונות

  • "Give me monthly expense breakdown during 2023-2024"
  • "Show me receipt for coffee transaction"
  • "Give me receipt file from Yakiniku Like"
  • וכו'

הנה קטע מתוך אינטראקציה מוצלחת

f6ba4537438033b2.png

313a43d32b0901ef.png

11. פריסה ב-Cloud Run

כמובן שאנחנו רוצים לגשת לאפליקציה המדהימה הזו מכל מקום. כדי לעשות זאת, אפשר לארוז את האפליקציה ולפרוס אותה ב-Cloud Run. לצורך הדגמה, השירות הזה יוצג כשירות ציבורי שאנשים אחרים יוכלו לגשת אליו. עם זאת, חשוב לזכור שזו לא השיטה המומלצת לאפליקציות מהסוג הזה, כי היא מתאימה יותר לאפליקציות אישיות.

6795e9abf2030334.jpeg

ב-codelab הזה נציב את שירות הקצה הקדמי ואת שירות הקצה העורפי בקונטיינר אחד. נצטרך את עזרת supervisord כדי לנהל את שני השירותים. אפשר לבדוק את הקובץ supervisord.conf ואת Dockerfile כדי לראות שאנחנו מגדירים את supervisord כנקודת הכניסה.

בשלב הזה כבר יש לנו את כל הקבצים הנדרשים לפריסה של האפליקציות שלנו ב-Cloud Run. עכשיו נפרוס אותן. עוברים ל-Cloud Shell Terminal ומוודאים שהפרויקט הנוכחי מוגדר כפרויקט הפעיל. אם לא, צריך להשתמש בפקודה gcloud configure כדי להגדיר את מזהה הפרויקט:

gcloud config set project [PROJECT_ID]

לאחר מכן, מריצים את הפקודה הבאה כדי לפרוס אותה ב-Cloud Run.

gcloud run deploy personal-expense-assistant \
                  --source . \
                  --port=8080 \
                  --allow-unauthenticated \
                  --env-vars-file=settings.yaml \
                  --memory 1024Mi \
                  --region us-central1

אם מופיעה בקשה לאשר את היצירה של מאגר פריטים ל-Artifact Registry למאגר Docker, פשוט משיבים Y. חשוב לזכור שאנחנו מאפשרים כאן גישה ללא אימות כי זוהי אפליקציית הדגמה. מומלץ להשתמש באימות מתאים לאפליקציות הארגון והייצור.

בסיום הפריסה, אמור להופיע קישור שדומה לזה:

https://personal-expense-assistant-*******.us-central1.run.app

אפשר להמשיך להשתמש באפליקציה מחלון הפרטיות או מהנייד. הוא כבר אמור להיות פעיל.

12. האתגר

עכשיו זה הזמן שלכם להפגין את כישורי הניתוח שלכם. יש לכם את היכולת לשנות את הקוד כך שהקצה העורפי יוכל להכיל כמה משתמשים? אילו רכיבים צריך לעדכן?

13. הסרת המשאבים

כדי להימנע מחיובים בחשבון Google Cloud על המשאבים שבהם השתמשתם בקודלאב הזה, יש לפעול לפי השלבים הבאים:

  1. נכנסים לדף Manage resources במסוף Google Cloud.
  2. ברשימת הפרויקטים, בוחרים את הפרויקט שרוצים למחוק ולוחצים על Delete.
  3. כדי למחוק את הפרויקט, כותבים את מזהה הפרויקט בתיבת הדו-שיח ולוחצים על Shut down.
  4. לחלופין, אפשר לעבור אל Cloud Run במסוף, לבחור את השירות שפרסמתם ולמחוק אותו.