Введение в API разговорной аналитики

1. Введение

В этой практической работе вы научитесь использовать Python SDK для API разговорной аналитики (CA) с источником данных BigQuery. Вы узнаете, как создать нового агента, как использовать управление состоянием разговора, а также как отправлять и транслировать ответы из API.

Предпосылки

  • Базовое понимание Google Cloud и консоли Google Cloud
  • Базовые навыки работы с интерфейсом командной строки и Cloud Shell
  • Базовые навыки программирования на Python

Чему вы научитесь

  • Как использовать Python SDK API разговорной аналитики с источником данных BigQuery
  • Как создать нового агента с помощью CA API
  • Как использовать управление состоянием разговора
  • Как отправлять и транслировать ответы из API

Что вам понадобится

  • Аккаунт Google Cloud и проект Google Cloud
  • Веб-браузер, такой как Chrome

2. Настройка и требования

Выберите проект

  1. Войдите в Google Cloud Console и создайте новый проект или используйте существующий. Если у вас ещё нет учётной записи Gmail или Google Workspace, вам необходимо её создать .

fbef9caa1602edd0.png

a99b7ace416376c4.png

5e3ff691252acf41.png

  • Название проекта — отображаемое имя участников проекта. Это строка символов, не используемая API Google. Вы можете изменить её в любой момент.
  • Идентификатор проекта уникален для всех проектов Google Cloud и неизменяем (нельзя изменить после установки). Cloud Console автоматически генерирует уникальную строку; обычно вам не важно, какой именно. В большинстве практических работ вам потребуется указать идентификатор проекта (обычно обозначаемый как PROJECT_ID ). Если вам не нравится сгенерированный идентификатор, вы можете сгенерировать другой случайный идентификатор. Вы также можете попробовать использовать свой собственный идентификатор и посмотреть, доступен ли он. После этого шага его нельзя будет изменить, и он останется на протяжении всего проекта.
  • К вашему сведению, существует третье значение — номер проекта , который используется некоторыми API. Подробнее обо всех трёх значениях можно узнать в документации .
  1. Далее вам нужно включить биллинг в Cloud Console для использования облачных ресурсов/API. Выполнение этой лабораторной работы не потребует больших затрат, если вообще потребует. Чтобы отключить ресурсы и избежать списания средств за пределами этого руководства, вы можете удалить созданные вами ресурсы или проект. Новые пользователи Google Cloud могут воспользоваться бесплатной пробной версией стоимостью 300 долларов США .

Запустить Cloud Shell

Хотя Google Cloud можно управлять удаленно с вашего ноутбука, в этой лабораторной работе вы будете использовать Google Cloud Shell — среду командной строки, работающую в облаке.

В консоли Google Cloud Console нажмите значок Cloud Shell на верхней правой панели инструментов:

5704d8c8d89c09d2.png

Подготовка и подключение к среде займёт всего несколько минут. После завершения вы увидите примерно следующее:

7ffe5cbb04455448.png

Эта виртуальная машина содержит все необходимые инструменты разработки. Она предоставляет постоянный домашний каталог объёмом 5 ГБ и работает в облаке Google Cloud, что значительно повышает производительность сети и аутентификацию. Всю работу в этой лабораторной работе можно выполнять в браузере. Вам не нужно ничего устанавливать.

3. Прежде чем начать

Включить необходимые API

Чтобы использовать сервисы Google Cloud, необходимо сначала активировать соответствующие API для вашего проекта. В этой лабораторной работе будут использоваться следующие сервисы Google Cloud:

  • API анализа данных с Gemini
  • Gemini для Google Cloud
  • API BigQuery

Чтобы включить эти службы, выполните следующие команды в терминале Cloud Shell:

gcloud services enable geminidataanalytics.googleapis.com
gcloud services enable cloudaicompanion.googleapis.com
gcloud services enable bigquery.googleapis.com

Установить пакеты Python

Перед запуском любого проекта Python рекомендуется создать виртуальную среду. Это изолирует зависимости проекта, предотвращая конфликты с другими проектами или глобальными пакетами Python в системе. В этом разделе мы установим uv из pip, поскольку в Cloud Shell pip уже доступен.

Установить пакет uv

pip install uv

Проверьте правильность установки УФ-излучения.

uv --version

Ожидаемый результат

Если вы видите строку вывода с uv, можно переходить к следующему шагу. Обратите внимание, что номер версии может отличаться:

3d81dc0243d27240.png

Создать виртуальную среду и установить пакеты

uv init ca-api-codelab
cd ca-api-codelab
uv venv --python 3.12
uv add google-cloud-geminidataanalytics pandas altair
uv pip list | grep -E 'altair|pandas|google-cloud-geminidataanalytics'

Ожидаемый результат

Если вы видите строки вывода с тремя пакетами, можно переходить к следующему шагу. Обратите внимание, что номера версий могут отличаться:

4d777e586e20bf3d.png

Запустить Python

uv run python

Ваш экран должен выглядеть так:

a50bf86ade7dec15.png

4. Создайте агента

Теперь, когда ваша среда разработки настроена и готова, пришло время заложить основу для API Gemini Data Analytics. SDK упрощает этот процесс, требуя лишь нескольких основных настроек для создания агента.

Установить переменные

Импортируйте пакет geminidataanalytics и настройте переменные среды:

import os
from google.cloud import geminidataanalytics

data_agent_client = geminidataanalytics.DataAgentServiceClient()

location = "global"
billing_project = os.environ.get('DEVSHELL_PROJECT_ID')
data_agent_id = "google_trends_analytics_agent"

Установить системные инструкции для агента

API CA считывает метаданные BigQuery, чтобы получить более подробную информацию о таблицах и столбцах, на которые ссылаются. Поскольку этот общедоступный набор данных не содержит описаний столбцов, вы можете предоставить агенту дополнительный контекст в виде строки в формате YAML. Рекомендации и шаблон для использования см. в документации :

system_instruction = """
system_instruction:
  - You are a data analyst specializing in the Google Trends dataset.
  - When querying, always use the 'week' column for date-based filtering. This needs to be a Sunday. If you are doing week over week comparison, make sure you specify a date that is a Sunday.
  - The following columns should be ignored in all queries 'dma_id', 'refresh_date'
  - The 'dma_name' column represents the city and state for about 210 metro areas in the USA.
tables:
  top_terms:
    description: "Represents the 25 most popular search terms by weekly search volume in a given US metro area (DMA)."
    fields:
      term: "The search query string."
      week: "The start date of the week (Sunday) for which the ranking is valid."
      rank: "The term's popularity rank from 1 (most popular) to 25."
      score: "Relative search interest, where 100 is the peak popularity for the term in that week."
      dma_name: "The name of the US metro area, e.g., 'New York NY'."
  top_rising_terms:
    description: "Represents the 25 fastest-growing ('breakout') search terms by momentum in a given US metro area (DMA)."
    fields:
      term: "The surging search query string."
      week: "The start date of the week (Sunday) for which the ranking is valid."
      rank: "The term's breakout rank from 1 (top rising) to 25."
      percent_gain: "The percentage growth in search volume compared to the previous period."
      dma_name: "The name of the US metro area, e.g., 'Los Angeles CA'."
      score: "Relative search interest, where 100 is the peak popularity for the term in that week."
join_instructions:
  goal: "Find terms that are simultaneously popular and rising in the same week and metro area."
  method: "INNER JOIN the two tables on their common keys."
  keys:
    - "term"
    - "week"
    - "dma_name"
golden_queries:
  - natural_language_query: "Find all terms in the 'New York NY' area that were in both the top 25 and top 25 rising lists for the week of July 6th, 2025, and show their ranks and percent gain."
    sql_query: |
      SELECT
          top.term,
          top.rank AS top_25_rank,
          rising.rank AS rising_25_rank,
          rising.percent_gain
      FROM
          `bigquery-public-data.google_trends.top_terms` AS top
      INNER JOIN
          `bigquery-public-data.google_trends.top_rising_terms` AS rising
      ON
          top.term = rising.term
          AND top.week = rising.week
          AND top.dma_name = rising.dma_name
      WHERE
          top.week = '2025-07-06'
          AND top.dma_name = 'New York NY'
      ORDER BY
          top.rank;
"""

Установить источники данных таблицы BigQuery

Теперь вы можете задать источники данных для таблиц BigQuery. API CA принимает таблицы BigQuery в виде массива:

# BigQuery table data sources
bq_top = geminidataanalytics.BigQueryTableReference(
    project_id="bigquery-public-data", dataset_id="google_trends", table_id="top_terms"
)
bq_rising = geminidataanalytics.BigQueryTableReference(
    project_id="bigquery-public-data", dataset_id="google_trends", table_id="top_rising_terms"
)
datasource_references = geminidataanalytics.DatasourceReferences(
    bq=geminidataanalytics.BigQueryTableReferences(table_references=[bq_top, bq_rising]))

Установить контекст для чата с отслеживанием состояния

Вы можете создать нового агента с опубликованным контекстом , который объединяет системные инструкции, ссылки на источники данных и любые другие параметры .

Обратите внимание, что у вас есть возможность создать stagingContext для тестирования и проверки изменений перед публикацией. Это позволяет разработчику добавить управление версиями к агенту данных, указав contextVersion в запросе чата. В этой практической работе вы просто опубликуете напрямую:

# Context setup for stateful chat
published_context = geminidataanalytics.Context(
    system_instruction=system_instruction,
    datasource_references=datasource_references,
    options=geminidataanalytics.ConversationOptions(
        analysis=geminidataanalytics.AnalysisOptions(
            python=geminidataanalytics.AnalysisOptions.Python(
                enabled=False
            )
        )
    ),
)

data_agent = geminidataanalytics.DataAgent(
    data_analytics_agent=geminidataanalytics.DataAnalyticsAgent(
        published_context=published_context
    ),
)

# Create the agent
data_agent_client.create_data_agent(request=geminidataanalytics.CreateDataAgentRequest(
    parent=f"projects/{billing_project}/locations/{location}",
    data_agent_id=data_agent_id,
    data_agent=data_agent,
))

После создания агента вы должны увидеть вывод, аналогичный приведенному ниже:

a7824f586c550d28.png

Получить агента

Давайте проверим агент, чтобы убедиться, что он был создан:

# Test the agent
request = geminidataanalytics.GetDataAgentRequest(
    name=data_agent_client.data_agent_path(
        billing_project, location, data_agent_id)
)
response = data_agent_client.get_data_agent(request=request)
print(response)

Вы увидите метаданные нового агента. Они будут включать в себя такие данные, как время создания и контекст агента в системных инструкциях и источниках данных.

5. Создайте беседу

Теперь вы готовы создать свой первый диалог! В этой лабораторной работе вы будете использовать ссылку на диалог для чата с отслеживанием состояния с вашим агентом.

Для справки, API CA предлагает различные способы чата с различными вариантами управления состоянием и агентами. Вот краткий обзор трёх подходов:

Состояние

История разговоров

Агент

Код

Описание

Общайтесь в чате, используя ссылку на беседу

С сохранением состояния

управляемый API

Да

беседаСсылка

Продолжает диалог с отслеживанием состояния, отправляя сообщение чата, ссылающееся на существующий диалог и связанный с ним контекст агента. Для многоэтапных диалогов история диалога хранится и управляется Google Cloud.

Чат с использованием ссылки агента данных

Без гражданства

Управляется пользователем

Да

dataAgentContext

Отправляет сообщение чата без сохранения состояния, ссылающееся на сохранённый агент данных для контекста. Для многоадресных разговоров ваше приложение должно управлять историей разговоров и предоставлять её при каждом запросе.

Чат с использованием встроенного контекста

Без гражданства

Управляется пользователем

Нет

inlineContext

Отправляет сообщение чата без сохранения состояния, предоставляя весь контекст непосредственно в запросе, без использования сохранённого агента данных. Для многоадресных разговоров ваше приложение должно управлять историей разговоров и предоставлять её при каждом запросе.

Вам нужно создать функцию для настройки беседы и указать уникальный идентификатор для беседы:

def setup_conversation(conversation_id: str):
    data_chat_client = geminidataanalytics.DataChatServiceClient()
    conversation = geminidataanalytics.Conversation(
        agents=[data_chat_client.data_agent_path(
            billing_project, location, data_agent_id)],
    )
    request = geminidataanalytics.CreateConversationRequest(
        parent=f"projects/{billing_project}/locations/{location}",
        conversation_id=conversation_id,
        conversation=conversation,
    )
    try:
        data_chat_client.get_conversation(name=data_chat_client.conversation_path(
            billing_project, location, conversation_id))
        print(f"Conversation '{conversation_id}' already exists.")
    except Exception:
        response = data_chat_client.create_conversation(request=request)
        print("Conversation created successfully:")
        print(response)


conversation_id = "my_first_conversation"
setup_conversation(conversation_id=conversation_id)

Вы должны увидеть сообщение об успешном создании беседы.

6. Добавить вспомогательные функции

Вы почти готовы начать общение с агентом. Прежде чем начать, давайте добавим несколько вспомогательных функций для форматирования сообщений, чтобы их было легче читать, а также для визуализации. API CA отправит спецификацию Vega , которую вы сможете построить с помощью пакета Altair:

# Utility functions for streaming and formatting responses
import altair as alt
import http.server
import pandas as pd
import proto
import socketserver
import threading

_server_thread = None
_httpd = None


# Prints a formatted section title
def display_section_title(text):
    print(f"\n--- {text.upper()} ---")


# Handles and displays data responses
def handle_data_response(resp):
    if "query" in resp:
        query = resp.query
        display_section_title("Retrieval query")
        print(f"Query name: {query.name}")
        print(f"Question: {query.question}")
        print("Data sources:")
        for datasource in query.datasources:
            display_datasource(datasource)
    elif "generated_sql" in resp:
        display_section_title("SQL generated")
        print(resp.generated_sql)
    elif "result" in resp:
        display_section_title("Data retrieved")
        fields = [field.name for field in resp.result.schema.fields]
        d = {field: [] for field in fields}
        for el in resp.result.data:
            for field in fields:
                d[field].append(el[field])
        print(pd.DataFrame(d))


# Starts a local web server to preview charts
def preview_in_browser(port: int = 8080):
    """Starts a web server in a background thread and waits for user to stop it."""
    global _server_thread, _httpd
    if _server_thread and _server_thread.is_alive():
        print(
            f"\n--> A new chart was generated. Refresh your browser at http://localhost:{port}")
        return
    Handler = http.server.SimpleHTTPRequestHandler
    socketserver.TCPServer.allow_reuse_address = True
    try:
        _httpd = socketserver.TCPServer(("", port), Handler)
    except OSError as e:
        print(f"❌ Could not start server on port {port}: {e}")
        return
    _server_thread = threading.Thread(target=_httpd.serve_forever)
    _server_thread.daemon = False
    _server_thread.start()
    print("\n" + "=" * 60)
    print(" 📈 CHART READY - PREVIEW IN BROWSER ".center(60))
    print("=" * 60)
    print(
        f"1. In the Cloud Shell toolbar, click 'Web Preview' and select port {port}.")
    print(f"2. Or, open your local browser to http://localhost:{port}")
    print("=" * 60)
    try:
        input(
            "\n--> Press Enter here after viewing all charts to shut down the server...\n\n")
    finally:
        print("Shutting down server...")
        _httpd.shutdown()
        _server_thread.join()
        _httpd, _server_thread = None, None
        print("Server stopped.")


# Handles chart responses
def handle_chart_response(resp, chart_generated_flag: list):
    def _value_to_dict(v):
        if isinstance(v, proto.marshal.collections.maps.MapComposite):
            return {k: _value_to_dict(v[k]) for k in v}
        elif isinstance(v, proto.marshal.collections.RepeatedComposite):
            return [_value_to_dict(el) for el in v]
        return v
    if "query" in resp:
        print(resp.query.instructions)
    elif "result" in resp:
        vega_config_dict = _value_to_dict(resp.result.vega_config)
        chart = alt.Chart.from_dict(vega_config_dict)
        chart_filename = "index.html"
        chart.save(chart_filename)
        if chart_generated_flag:
            chart_generated_flag[0] = True


# Displays the schema of a data source
def display_schema(data):
    fields = getattr(data, "fields")
    df = pd.DataFrame({
        "Column": [f.name for f in fields],
        "Type": [f.type for f in fields],
        "Description": [getattr(f, "description", "-") for f in fields],
        "Mode": [f.mode for f in fields],
    })
    print(df)


# Displays information about a BigQuery data source
def display_datasource(datasource):
    table_ref = datasource.bigquery_table_reference
    source_name = f"{table_ref.project_id}.{table_ref.dataset_id}.{table_ref.table_id}"
    print(source_name)
    display_schema(datasource.schema)


# Handles and displays schema resolution responses
def handle_schema_response(resp):
    if "query" in resp:
        print(resp.query.question)
    elif "result" in resp:
        display_section_title("Schema resolved")
        print("Data sources:")
        for datasource in resp.result.datasources:
            display_datasource(datasource)


# Handles and prints simple text responses
def handle_text_response(resp):
    parts = resp.parts
    print("".join(parts))


# Processes and displays different types of system messages
def show_message(msg, chart_generated_flag: list):
    m = msg.system_message
    if "text" in m:
        handle_text_response(getattr(m, "text"))
    elif "schema" in m:
        handle_schema_response(getattr(m, "schema"))
    elif "data" in m:
        handle_data_response(getattr(m, "data"))
    elif "chart" in m:
        handle_chart_response(getattr(m, "chart"), chart_generated_flag)
    print("\n")

7. Создайте функцию чата.

Последний шаг — создание функции чата, которую можно использовать повторно, и вызов функции show_message для каждого фрагмента в потоке ответа:

def stream_chat_response(question: str):
    """
    Sends a chat request, processes the streaming response, and if a chart
    was generated, starts the preview server and waits for it to be closed.
    """
    data_chat_client = geminidataanalytics.DataChatServiceClient()
    chart_generated_flag = [False]
    messages = [
        geminidataanalytics.Message(
            user_message=geminidataanalytics.UserMessage(text=question)
        )
    ]
    conversation_reference = geminidataanalytics.ConversationReference(
        conversation=data_chat_client.conversation_path(
            billing_project, location, conversation_id
        ),
        data_agent_context=geminidataanalytics.DataAgentContext(
            data_agent=data_chat_client.data_agent_path(
                billing_project, location, data_agent_id
            ),
        ),
    )
    request = geminidataanalytics.ChatRequest(
        parent=f"projects/{billing_project}/locations/{location}",
        messages=messages,
        conversation_reference=conversation_reference,
    )
    stream = data_chat_client.chat(request=request)
    for response in stream:
        show_message(response, chart_generated_flag)
    if chart_generated_flag[0]:
        preview_in_browser()

Функция stream_chat_response теперь определена и готова к использованию с вашими подсказками.

8. Начните общаться

Вопрос 1

Теперь вы готовы задавать вопросы! Давайте для начала посмотрим, что умеет этот агент:

question = "Hey what data do you have access to?"
stream_chat_response(question=question)

Агент должен ответить примерно так:

c63ae1edc9cc3dc8.png

Вопрос 2

Отлично, давайте попробуем найти больше информации о последних популярных поисковых запросах:

question = "What are the top 20 most popular search terms last week in NYC based on rank? Display each term and score as a column chart"
stream_chat_response(question=question)

Выполнение этого займёт некоторое время. Вы увидите, как агент выполняет различные этапы и потоковые обновления: от извлечения схемы и метаданных до написания SQL-запроса, получения результатов, указания инструкций по визуализации и суммирования результатов.

Чтобы просмотреть диаграмму, перейдите на панель инструментов Cloud Shell, нажмите «Веб-просмотр» и выберите порт 8080:

e04d97ef6d2f7d3a.png

Вы должны увидеть визуализацию, подобную этой:

d19b28630514a76.png

Нажмите Enter, чтобы выключить сервер и продолжить.

Вопрос 3

Давайте попробуем задать дополнительный вопрос и подробнее изучить эти результаты:

question = "What was the percent gain in growth for these search terms from the week before?"
stream_chat_response(question=question)

Вы должны увидеть что-то похожее на следующее. В этом случае агент сгенерировал запрос на объединение двух таблиц для определения процентного прироста. Обратите внимание, что ваш запрос может выглядеть немного иначе:

1819f64371ccbf1.png

И визуализированно это будет выглядеть так:

df7d3361f46d98fc.png

9. Уборка

Поскольку эта лабораторная работа не включает в себя долго выполняемые продукты, достаточно просто остановить активный сеанс Python, введя exit() в терминале.

Удалить папки и файлы проекта

Если вы хотите удалить код из среды Cloud Shell, используйте следующие команды:

cd ~
rm -rf ca-api-codelab

Отключить API

Чтобы отключить ранее включенные API, выполните эту команду

gcloud services disable geminidataanalytics.googleapis.com
gcloud services disable cloudaicompanion.googleapis.com
gcloud services disable bigquery.googleapis.com

10. Заключение

Поздравляем! Вы успешно создали простой агент разговорной аналитики с помощью CA SDK. Ознакомьтесь со справочными материалами, чтобы узнать больше!

Справочные материалы: