1. Übersicht
In der heutigen datenreichen Welt ist es unerlässlich, aussagekräftige Statistiken aus unstrukturierten Inhalten, insbesondere Videos, zu gewinnen. Stellen Sie sich vor, Sie müssten Hunderte oder Tausende von Video-URLs analysieren, ihre Inhalte zusammenfassen, wichtige Technologien extrahieren und sogar Frage-Antwort-Paare für Lernmaterialien generieren. Das ist nicht nur zeitaufwendig, sondern auch ineffizient. Hier kommen moderne Cloud-Architekturen ins Spiel.
In diesem Lab stellen wir eine skalierbare, serverlose Lösung zum Verarbeiten von Videoinhalten mit der leistungsstarken Suite von Google Cloud-Diensten vor: Cloud Run, BigQuery und generative KI von Google (Gemini). Wir beschreiben, wie wir von der Verarbeitung einer einzelnen URL zur Orchestrierung der parallelen Ausführung über einen großen Datensatz gelangt sind – und das alles ohne den Aufwand, komplexe Messaging-Warteschlangen und Integrationen zu verwalten.
Die Herausforderung
Wir hatten die Aufgabe, einen großen Katalog von Videoinhalten zu verarbeiten, wobei der Schwerpunkt auf praktischen Laborsitzungen lag. Ziel war es, jedes Video zu analysieren und eine strukturierte Zusammenfassung zu erstellen, einschließlich Kapiteltiteln, Einführungskontext, Schritt-für-Schritt-Anleitungen, verwendeten Technologien und relevanten Frage-Antwort-Paaren. Diese Ausgabe musste effizient gespeichert werden, um später für die Erstellung von Lernmaterialien verwendet werden zu können.
Anfangs hatten wir einen einfachen HTTP-basierten Cloud Run-Dienst, der jeweils eine URL verarbeiten konnte. Das hat sich für Tests und Ad-hoc-Analysen bewährt. Bei einer Liste mit Tausenden von URLs aus BigQuery wurden die Einschränkungen dieses Modells mit einer Anfrage und einer Antwort jedoch deutlich. Die sequentielle Verarbeitung würde Tage, wenn nicht Wochen dauern.
Die Chance bestand darin, einen manuellen oder langsamen sequenziellen Prozess in einen automatisierten, parallelisierten Workflow umzuwandeln. Durch die Nutzung der Cloud wollten wir Folgendes erreichen:
- Daten parallel verarbeiten: Die Verarbeitungszeit für große Datasets wird erheblich verkürzt.
- Vorhandene KI-Funktionen nutzen: Nutzen Sie die Leistungsfähigkeit von Gemini für anspruchsvolle Inhaltsanalysen.
- Serverlose Architektur beibehalten: Vermeiden Sie die Verwaltung von Servern oder komplexer Infrastruktur.
- Daten zentralisieren: Verwenden Sie BigQuery als zentrale Quelle für Eingabe-URLs und als zuverlässiges Ziel für verarbeitete Ergebnisse.
- Robuste Pipeline erstellen: Erstellen Sie ein System, das gegen Fehler resistent ist und sich einfach verwalten und überwachen lässt.
Ziel
Parallele KI-Verarbeitung mit Cloud Run-Jobs orchestrieren:
Unsere Lösung basiert auf einem Cloud Run-Job, der als Orchestrator fungiert. Dabei werden Batches von URLs aus BigQuery gelesen, an den vorhandenen, bereitgestellten Cloud Run-Dienst gesendet (der die KI-Verarbeitung für eine einzelne URL übernimmt) und die Ergebnisse werden zusammengefasst, um sie wieder in BigQuery zu schreiben. Dieser Ansatz bietet folgende Vorteile:
- Orchestrierung von der Verarbeitung entkoppeln: Der Job verwaltet den Workflow, während sich der separate Dienst auf die KI-Aufgabe konzentriert.
- Parallelität von Cloud Run-Jobs nutzen: Der Job kann mehrere Containerinstanzen hochskalieren, um den KI-Dienst gleichzeitig aufzurufen.
- Komplexität reduzieren: Wir erreichen Parallelität, indem der Job gleichzeitige HTTP-Aufrufe direkt verwaltet, was die Architektur vereinfacht.
Anwendungsfall
KI-basierte Statistiken aus Code Vipassana-Sitzungsvideos
In unserem konkreten Anwendungsfall haben wir Videos von Google Cloud-Sitzungen der Code Vipassana-Praxislabs analysiert. Ziel war es, automatisch strukturierte Dokumentation (Gliederungen von Buchkapiteln) zu generieren, einschließlich:
- Kapiteltitel: Kurze Titel für jedes Videosegment
- Kontext der Einführung: Die Relevanz des Videos in einem umfassenderen Lernpfad wird erläutert.
- Was wird erstellt: Die Kernaufgabe oder das Kernziel der Sitzung
- Verwendete Technologien: Eine Liste der erwähnten Cloud-Dienste und anderer Technologien
- Schritt-für-Schritt-Anleitung: Wie die Aufgabe ausgeführt wurde, einschließlich Code-Snippets
- Quellcode-/Demo-URLs: Links im Video
- Segment „Fragen und Antworten“: Relevante Fragen und Antworten für Wissensüberprüfungen generieren.
Flow
Ablauf der Architektur
Was ist Cloud Run? Was sind Cloud Run-Jobs?
Cloud Run
Eine vollständig verwaltete, serverlose Plattform, auf der Sie zustandslose Container ausführen können. Sie ist ideal für Webservices, APIs und Mikrodienste, die je nach eingehenden Anfragen automatisch skaliert werden können. Sie stellen ein Container-Image bereit und Cloud Run übernimmt den Rest – von der Bereitstellung und Skalierung bis hin zur Infrastrukturverwaltung. Sie eignet sich hervorragend für synchrone Arbeitslasten mit Anfrage/Antwort-Muster.
Cloud Run-Jobs
Ein Angebot, das Cloud Run-Dienste ergänzt. Cloud Run-Jobs sind für Batchverarbeitungsaufgaben konzipiert, die abgeschlossen und dann beendet werden müssen. Sie eignen sich perfekt für die Datenverarbeitung, ETL, Batchinferenz für maschinelles Lernen und alle Aufgaben, bei denen ein Dataset verarbeitet wird, anstatt Liveanfragen zu bearbeiten. Ein wichtiges Merkmal ist die Möglichkeit, die Anzahl der gleichzeitig ausgeführten Containerinstanzen (Aufgaben) zu skalieren, um eine Reihe von Aufgaben zu verarbeiten. Sie können durch verschiedene Ereignisquellen oder manuell ausgelöst werden.
Wichtigster Unterschied
Cloud Run-Dienste sind für Anwendungen mit langer Ausführungszeit, die auf Anfragen basieren. Cloud Run-Jobs eignen sich für die endliche, aufgabenorientierte Batchverarbeitung, die bis zum Abschluss ausgeführt wird.
Aufgaben
Eine Retail Search-Anwendung
Dabei gehen Sie so vor:
- BigQuery-Dataset und ‑Tabelle erstellen und Daten aufnehmen (Code für Vipassana-Metadaten)
- Python-Cloud Run Functions zum Implementieren der generativen KI-Funktionalität erstellen (Video in JSON-Datei für Buchkapitel umwandeln)
- Python-Anwendung für die Data-to-AI-Pipeline erstellen – Daten aus BigQuery lesen, den Cloud Run Functions-Endpunkt für Statistiken aufrufen und den Kontext zurück in BigQuery schreiben
- Anwendung erstellen und containerisieren
- Cloud Run-Jobs mit diesem Container konfigurieren
- Job ausführen und überwachen
- Berichtsergebnis
Voraussetzungen
2. Hinweis
Projekt erstellen
- Wählen Sie in der Google Cloud Console auf der Seite zur Projektauswahl ein Google Cloud-Projekt aus oder erstellen Sie eines.
- Die Abrechnung für das Cloud-Projekt muss aktiviert sein. So prüfen Sie, ob die Abrechnung für ein Projekt aktiviert ist .
Google Cloud-Guthaben: Wenn Sie Google Cloud-Guthaben für den Einstieg erhalten möchten, verwenden Sie diesen Link, um Guthaben einzulösen. Hier findest du eine Anleitung zum Einlösen.
- Sie verwenden Cloud Shell, eine Befehlszeilenumgebung, die in Google Cloud ausgeführt wird. Klicken Sie oben in der Google Cloud Console auf „Cloud Shell aktivieren“.
- Wenn Sie mit Cloud Shell verbunden sind, können Sie mit dem folgenden Befehl prüfen, ob Sie bereits authentifiziert sind und das Projekt auf Ihre Projekt-ID festgelegt ist:
gcloud auth list
- Führen Sie den folgenden Befehl in Cloud Shell aus, um zu bestätigen, dass der gcloud-Befehl Ihr Projekt kennt.
gcloud config list project
- Wenn Ihr Projekt nicht festgelegt ist, verwenden Sie den folgenden Befehl, um es festzulegen:
gcloud config set project <YOUR_PROJECT_ID>
- Aktivieren Sie die erforderlichen APIs: Folgen Sie dem Link und aktivieren Sie die APIs.
Alternativ können Sie dazu den gcloud-Befehl verwenden. Informationen zu gcloud-Befehlen und deren Verwendung finden Sie in der Dokumentation.
3. Datenbank/Data Warehouse einrichten
BigQuery bildete das Rückgrat unserer Datenpipeline. Da es serverlos und hochgradig skalierbar ist, eignet es sich perfekt für die Speicherung unserer Eingabedaten und der verarbeiteten Ergebnisse.
- Datenspeicher:BigQuery diente als Data Warehouse. Darin wird die Liste der Video-URLs mit ihrem Status gespeichert (z.B. PENDING, PROCESSING, COMPLETED) und den endgültigen generierten Kontext. Sie ist die einzige Quelle, die angibt, welche Videos verarbeitet werden müssen.
- Ziel:Hier werden die KI-generierten Statistiken gespeichert, sodass sie für nachgelagerte Anwendungen oder die manuelle Überprüfung leicht abrufbar sind. Unser Dataset bestand aus Details zu Videositzungen, insbesondere aus Inhalten der Reihe „Code Vipassana Seasons“, die oft detaillierte technische Demonstrationen enthalten.
- Quelltabelle:Eine BigQuery-Tabelle (z.B. „post_session_labs“) mit Einträgen wie:
- id: Eine eindeutige Kennung für jede Sitzung/Zeile.
- url: Die URL des Videos, z.B. ein YouTube-Link oder ein zugänglicher Drive-Link.
- status: Ein String, der den Verarbeitungsstatus angibt (z.B. PENDING, PROCESSING, COMPLETED, FAILED_PROCESSING).
- context: Ein Stringfeld zum Speichern der KI-generierten Zusammenfassung.
- Datenaufnahme: In diesem Szenario wurden die Daten mit INSERT-Skripts in BigQuery aufgenommen. Für unsere Pipeline war BigQuery der Ausgangspunkt.
Rufen Sie die BigQuery-Konsole auf, öffnen Sie einen neuen Tab und führen Sie die folgenden SQL-Anweisungen aus:
--1. Create your dataset for the project
CREATE SCHEMA `<<YOUR_PROJECT_ID>>.cv_metadata`
OPTIONS(
location = 'us-central1', -- Specify the location (e.g., 'US', 'EU', 'asia-east1')
description = 'Code Vipassana Sessions Metadata' -- Optional: Add a description
);
--2. Create table
create table cv_metadata.post_session_labs(id STRING, descr STRING, url STRING, context STRING, status STRING);
4. Datenaufnahme
Jetzt ist es an der Zeit, eine Tabelle mit den Daten zum Geschäft hinzuzufügen. Rufen Sie in BigQuery Studio einen Tab auf und führen Sie die folgenden SQL-Anweisungen aus, um die Beispieldatensätze einzufügen:
--Insert sample data
insert into cv_metadata.post_session_labs(id,descr,url) values('10-1','Gen AI to Agents, where do I begin? Get started with building a single agent application on ADK Python SDK','https://youtu.be/tyqnQQXpxtI');
insert into cv_metadata.post_session_labs(id,descr,url) values('10-2','Build an E2E multi-agent kitchen renovation app on ADK in Python with AlloyDB data and multiple tools','https://youtu.be/RdrMo2lNh0o');
insert into cv_metadata.post_session_labs(id,descr,url) values('10-3','Augment your multiagent app with tools from MCP Toolbox for AlloyDB','https://youtu.be/9VVNh77Q3ZU?si=oQ4fhAX59Y3D5iWa');
insert into cv_metadata.post_session_labs(id,descr,url) values('10-4','Build an agentic MCP client application using MCP Toolbox for BigQuery','https://youtu.be/HmluMag5s20');
insert into cv_metadata.post_session_labs(id,descr,url) values('10-5','Build a travel agent using ADK & MCP Toolbox for Cloud SQL','https://youtu.be/IWg5CH6ZNs0');
insert into cv_metadata.post_session_labs(id,descr,url) values('10-6','Build an E2E Patent Analysis Agent using ADK and Advanced Vector Search with AlloyDB','https://youtu.be/yCXJ3sk3Lxc');
insert into cv_metadata.post_session_labs(id,descr,url) values('10-7','Getting Started with MCP, ADK and A2A','https://youtu.be/JcQ_DyWc0X0');
5. Funktion zur Gewinnung von Videostatistiken erstellen
Wir müssen eine Cloud Run-Funktion erstellen und bereitstellen, um den Kern der Funktionalität zu implementieren, die darin besteht, ein strukturiertes Buchkapitel aus der Video-URL zu erstellen. Damit wir über ein unabhängiges Endpunkt-Toolbox-Tool darauf zugreifen können, haben wir gerade eine Cloud Run-Funktion erstellt und bereitgestellt. Alternativ können Sie dies als separate Funktion in die eigentliche Python-Anwendung für den Cloud Run-Job einfügen:
- Wechseln Sie in der Google Cloud Console zur Cloud Run-Seite.
- Klicken Sie auf „Funktion schreiben“.
- Geben Sie im Feld „Dienstname“ einen Namen ein, um Ihre Funktion zu beschreiben. Dienstnamen müssen mit einem Buchstaben beginnen und dürfen maximal 49 Zeichen enthalten, darunter Buchstaben, Zahlen oder Bindestriche. Dienstnamen dürfen nicht auf Bindestriche enden und müssen pro Region und Projekt eindeutig sein. Ein Dienstname kann später nicht mehr geändert werden und ist öffentlich sichtbar. ( generate-video-insights**)**
- Verwenden Sie in der Liste „Region“ den Standardwert oder wählen Sie die Region aus, in der Sie Ihre Funktion bereitstellen möchten. (us-central1 auswählen)
- Verwenden Sie in der Liste „Laufzeit“ den Standardwert oder wählen Sie eine Laufzeitversion aus. (Python 3.11 auswählen)
- Wählen Sie im Bereich „Authentifizierung“ die Option „Öffentlichen Zugriff zulassen“ aus.
- Klicken Sie auf die Schaltfläche „Erstellen“.
- Die Funktion wird erstellt und mit einer Vorlage für main.py und requirements.txt geladen.
- Ersetzen Sie sie durch die Dateien main.py und requirements.txt aus dem Repository dieses Projekts.
WICHTIGER HINWEIS: Denken Sie daran, in der Datei „main.py“ <<YOUR_PROJECT_ID>> durch Ihre Projekt-ID zu ersetzen.
- Stellen Sie den Endpunkt bereit und speichern Sie ihn, damit Sie ihn in der Quelle für den Cloud Run-Job verwenden können.
Ihr Endpunkt sollte so oder ähnlich aussehen: https://generate-video-insights-<<YOUR_POJECT_NUMBER>>.us-central1.run.app
Was ist in dieser Cloud Run-Funktion enthalten?
Gemini 2.5 Flash für die Videoverarbeitung
Für die Kernaufgabe, Videoinhalte zu verstehen und zusammenzufassen, haben wir das Gemini 2.5 Flash-Modell von Google verwendet. Gemini-Modelle sind leistungsstarke, multimodale KI-Modelle, die verschiedene Arten von Eingaben wie Text und mit bestimmten Integrationen auch Video verstehen und verarbeiten können.
In unserem Setup haben wir die Videodatei nicht direkt an Gemini übergeben. Stattdessen haben wir einen Text-Prompt gesendet, der die Video-URL enthielt und Gemini anwies, die (hypothetischen) Inhalte eines Videos unter dieser URL zu analysieren. Gemini 2.5 Flash kann zwar multimodale Eingaben verarbeiten, in dieser Pipeline wurde jedoch ein textbasierter Prompt verwendet, in dem die Art des Videos (eine praktische Lab-Sitzung) beschrieben und eine strukturierte JSON-Ausgabe angefordert wurde. Dabei werden die fortschrittlichen Funktionen von Gemini zum Ziehen von Schlussfolgerungen und zum Verarbeiten natürlicher Sprache genutzt, um Informationen basierend auf dem Kontext des Prompts abzuleiten und zusammenzufassen.
Der Gemini-Prompt: Die KI anleiten
Ein gut formulierter Prompt ist für KI-Modelle unerlässlich. Unser Prompt wurde so konzipiert, dass er sehr spezifische Informationen extrahiert und in einem JSON-Format strukturiert, sodass sie von unserer Anwendung leicht geparst werden können.
PROMPT_TEMPLATE = """
In the video at the following URL: {youtube_url}, which is a hands-on lab session:
Ignore the credits set-up part particularly the coupon code and credits link aspect should not be included in your analysis or the extaction of context. Also exclude any credentials that are explicit in the video.
Take only the first 30-40 minutes of the video without throwing any error.
Analyze the rest of the content of the video.
Extract and synthesize information to create a book chapter section with the following structure, formatted as a JSON string:
1. **chapter_title:** A concise and engaging title for the chapter.
2. **introduction_context:** Briefly explain the relevance of this video segment within a broader learning context.
3. **what_will_build:** Clearly state the specific task or goal accomplished in this video segment.
4. **technologies_and_services:** List all mentioned Google Cloud services and any other relevant technologies (e.g., programming languages, tools, frameworks).
5. **how_we_did_it:** Provide a clear, numbered step-by-step guide of the actions performed. Include any exact commands or code snippets as they appear in the video. Format code/commands using markdown backticks (e.g., `my-command`).
6. **source_code_url:** Provide a URL to the source code repository if mentioned or implied. If not available, use "N/A".
7. **demo_url:** Provide a URL to a demo if mentioned or implied. If not available, use "N/A".
8. **qa_segment:** Generate 10–15 relevant questions based on the content of this segment, along with concise answers. Ensure the questions are thought-provoking and test understanding of the material.
REMEMBER: Ignore the credits set-up part particularly the coupon code and credits link aspect should not be included in your analysis or the extaction of context. Also exclude any credentials that are explicit in the video.
Format the entire output as a JSON string. Ensure all keys and string values are enclosed in double quotes.
Example structure:
...
"""
Dieser Prompt ist sehr spezifisch und weist Gemini an, als eine Art Lehrkraft zu fungieren. Die Anfrage nach einem JSON-String sorgt für eine strukturierte, maschinenlesbare Ausgabe.
Hier ist der Code zum Analysieren von Videoeingaben und zum Zurückgeben des zugehörigen Kontexts:
def process_videos_batch(video_url: str, PROMPT_TEMPLATE: str) -> str:
"""
Processes a video URL, generates chapter content using Gemini
"""
formatted_prompt = PROMPT_TEMPLATE.format(youtube_url=video_url)
try:
client = genai.Client(vertexai=True,project='<<YOUR_PROJECT_ID>>',location='us-central1',http_options=HttpOptions(api_version="v1"))
response = client.models.generate_content(
model="gemini-2.5-flash",
contents=formatted_prompt,
)
print(response.text)
except Exception as e:
print(f"An error occurred during content generation: {e}")
return f"Error processing video: {e}"
print(response.text)
return response.text
Das obige Snippet veranschaulicht die Kernfunktion des Anwendungsfalls. Es empfängt eine Video-URL und verwendet das Gemini-Modell über den Vertex AI-Client, um den Videoinhalt zu analysieren und gemäß dem Prompt relevante Informationen zu extrahieren. Der extrahierte Kontext wird dann zur weiteren Verarbeitung zurückgegeben. Dies stellt einen synchronen Vorgang dar, bei dem der Cloud Run-Job auf den Abschluss des Dienstes wartet.
6. Pipeline-Anwendungsentwicklung (Python)
Unsere zentrale Pipeline-Logik befindet sich im Quellcode der Anwendung, der in einen Cloud Run-Job containerisiert wird, der die gesamte parallele Ausführung orchestriert. Hier sehen Sie die wichtigsten Teile:
Die Rolle des Orchestrators bei der Verwaltung des Workflows und der Gewährleistung der Datenintegrität:
# ... (imports and configuration) ...
def process_batch_from_bq(request_or_trigger_data=None):
# ... (initial checks for config) ...
BATCH_SIZE = 5 # Fetch 5 URLs at a time per job instance
query = f"""
SELECT url, id
FROM `{BIGQUERY_PROJECT}.{BIGQUERY_DATASET}.{BIGQUERY_TABLE_SOURCE}`
WHERE status = 'PENDING'
LIMIT {BATCH_SIZE}
"""
try:
logging.info(f"Fetching up to {BATCH_SIZE} pending URLs from BigQuery...")
rows = bq_client.query(query).result() # job_should_wait=True is default for result()
pending_urls_data = []
for row in rows:
pending_urls_data.append({"url": row.url, "id": row.id})
if not pending_urls_data:
logging.info("No pending URLs found. Job finished.")
return "No pending URLs found. Job finished.", 200
row_ids_to_process = [item["id"] for item in pending_urls_data]
# --- Mark as PROCESSING to prevent duplicate work ---
update_status_query = f"""
UPDATE `{BIGQUERY_PROJECT}.{BIGQUERY_DATASET}.{BIGQUERY_TABLE_SOURCE}`
SET status = 'PROCESSING'
WHERE id IN UNNEST(@row_ids_to_process)
"""
status_update_job_config = bigquery.QueryJobConfig(
query_parameters=[
bigquery.ArrayQueryParameter("row_ids_to_process", "STRING", values=row_ids_to_process)
]
)
update_status_job = bq_client.query(update_status_query, job_config=status_update_job_config)
update_status_job.result()
logging.info(f"Marked {len(row_ids_to_process)} URLs as 'PROCESSING'.")
# ... (rest of the code for parallel processing and writing) ...
except Exception as e:
# ... (error handling) ...
In diesem Snippet wird zuerst ein Batch von Video-URLs mit dem Status „PENDING“ aus der BigQuery-Quelltabelle abgerufen. Anschließend wird der Status dieser URLs in BigQuery auf „PROCESSING“ (Wird verarbeitet) aktualisiert, um eine doppelte Verarbeitung zu verhindern.
Parallele Verarbeitung mit ThreadPoolExecutor und Aufrufen des Prozessordienstes:
# ... (inside process_batch_from_bq function) ...
# --- Step 3: Call the external URL Processor Service in parallel ---
processed_results = {}
futures = []
# ThreadPoolExecutor for I/O-bound tasks (HTTP requests to the processor service)
# MAX_CONCURRENT_TASKS_PER_INSTANCE controls parallelism within one job instance.
with ThreadPoolExecutor(max_workers=MAX_CONCURRENT_TASKS_PER_INSTANCE) as executor:
for item in pending_urls_data:
url = item["url"]
row_id = item["id"]
# Submit the task: call the processor service for this URL
future = executor.submit(call_url_processor_service, url)
futures.append((row_id, future))
# Collect results as they complete
for row_id, future in futures:
try:
content = future.result(timeout=URL_PROCESSOR_TIMEOUT_SECONDS)
# Check if the processor service returned an error message
if content.startswith("ERROR:"):
processed_results[row_id] = {"context": content, "status": "FAILED_PROCESSING"}
else:
processed_results[row_id] = {"context": content, "status": "COMPLETED"}
except TimeoutError:
logging.warning(f"URL processing timed out (service call for row ID {row_id}). Marking as FAILED.")
processed_results[row_id] = {"context": f"ERROR: Processing timed out for '{row_id}'.", "status": "FAILED_PROCESSING"}
except Exception as e:
logging.error(f"Exception during future result retrieval for row ID {row_id}: {e}")
processed_results[row_id] = {"context": f"ERROR: Unexpected error during result retrieval for '{row_id}'. Details: {e}", "status": "FAILED_PROCESSING"}
In diesem Teil des Codes wird ThreadPoolExecutor verwendet, um die abgerufenen Video-URLs parallel zu verarbeiten. Für jede URL wird ein Task gesendet, um den Cloud Run-Dienst (URL-Prozessor) asynchron aufzurufen. So kann der Cloud Run-Job mehrere Videos gleichzeitig effizient verarbeiten, was die Gesamtleistung der Pipeline verbessert. Das Snippet verarbeitet auch potenzielle Zeitüberschreitungen und Fehler des Prozessordienstes.
Daten aus BigQuery lesen und in BigQuery schreiben
Die Hauptinteraktion mit BigQuery besteht darin, ausstehende URLs abzurufen und sie dann mit verarbeiteten Ergebnissen zu aktualisieren.
# ... (inside process_batch_from_bq) ...
BATCH_SIZE = 5
query = f"""
SELECT url, id
FROM `{BIGQUERY_PROJECT}.{BIGQUERY_DATASET}.{BIGQUERY_TABLE_SOURCE}`
WHERE status = 'PENDING'
LIMIT {BATCH_SIZE}
"""
rows = bq_client.query(query).result()
pending_urls_data = []
for row in rows:
pending_urls_data.append({"url": row.url, "id": row.id})
# ... (rest of fetching and marking as PROCESSING) ...
Ergebnisse zurück in BigQuery schreiben:
# --- Step 4: Write results back to BigQuery ---
logging.info(f"Writing {len(processed_results)} results back to BigQuery...")
successful_updates = 0
for row_id, data in processed_results.items():
if update_bq_row(row_id, data["context"], data["status"]):
successful_updates += 1
logging.info(f"Finished processing. {successful_updates} out of {len(processed_results)} rows updated successfully.")
# ... (return statement) ...
# --- Helper to update a single row in BigQuery ---
def update_bq_row(row_id, context, status="COMPLETED"):
"""Updates a specific row in the target BigQuery table."""
# ... (checks for config) ...
update_query = f"""
UPDATE `{BIGQUERY_PROJECT}.{BIGQUERY_DATASET}.{BIGQUERY_TABLE_TARGET}`
SET
context = @context,
status = @status
WHERE id = @row_id
"""
# Correctly defining query parameters for the UPDATE statement
job_config = bigquery.QueryJobConfig(
query_parameters=[
bigquery.ScalarQueryParameter("context", "STRING", value=context),
bigquery.ScalarQueryParameter("status", "STRING", value=status),
# Assuming 'id' column is STRING. Adjust if it's INT64.
bigquery.ScalarQueryParameter("row_id", "STRING", value=row_id)
]
)
try:
update_job = bq_client.query(update_query, job_config=job_config)
update_job.result() # Wait for the job to complete
logging.info(f"Successfully updated BigQuery row ID {row_id} with status {status}.")
return True
except Exception as e:
logging.error(f"Failed to update BigQuery row ID {row_id}: {e}")
return False
Die oben gezeigten Code-Snippets konzentrieren sich auf die Dateninteraktion zwischen dem Cloud Run-Job und BigQuery. Es werden ein Batch mit „PENDING“-Video-URLs und deren IDs aus der Quelltabelle abgerufen. Nachdem die URLs verarbeitet wurden, wird in diesem Snippet gezeigt, wie der extrahierte Kontext und Status („COMPLETED“ oder „FAILED_PROCESSING“) mit einer UPDATE-Abfrage zurück in die BigQuery-Zieltabelle geschrieben werden. Mit diesem Snippet wird der Datenverarbeitungszyklus abgeschlossen. Sie enthält auch die Hilfsfunktion „update_bq_row“, die zeigt, wie Parameter der Update-Anweisung definiert werden.
Anwendungseinrichtung
Die Anwendung ist als einzelnes Python-Skript strukturiert, das in einem Container ausgeführt wird. Dabei werden Google Cloud-Clientbibliotheken und das Functions-Framework verwendet, um den Einstiegspunkt zu definieren.
- Abhängigkeiten: google-cloud-bigquery, requests
- Konfiguration: Alle wichtigen Einstellungen (BigQuery-Projekt/-Dataset/-Tabelle, URL-Prozessorservice-URL) werden aus Umgebungsvariablen geladen, wodurch die Anwendung portabel und sicher ist.
- Hauptlogik: Die Funktion „process_batch_from_bq“ orchestriert den gesamten Workflow.
- Integration externer Dienste: Die Funktion „call_url_processor_service“ übernimmt die Kommunikation mit dem separaten Cloud Run-Dienst.
- BigQuery-Interaktion: bq_client wird zum Abrufen von URLs und zum Aktualisieren von Ergebnissen verwendet, wobei die Parameter korrekt verarbeitet werden.
- Parallelität: concurrent.futures.ThreadPoolExecutor verwaltet gleichzeitige Aufrufe des externen Dienstes.
- Einstiegspunkt: Der Python-Code mit dem Namen „main.py“ dient als Einstiegspunkt für die Batchverarbeitung.
Richten wir die Anwendung jetzt ein:
- Rufen Sie zuerst Ihr Cloud Shell-Terminal auf und klonen Sie das Repository:
git clone https://github.com/AbiramiSukumaran/video-context-crj
- Rufen Sie den Cloud Shell-Editor auf. Dort sehen Sie den neu erstellten Ordner video-context-crj.
- Löschen Sie die folgenden Schritte, da sie bereits in den vorherigen Abschnitten ausgeführt wurden:
- Löschen Sie den Ordner „Cloud_Run_Function“.
- Navigieren Sie zum Projektordner video-context-crj. Die Projektstruktur sollte so aussehen:
7. Dockerfile-Einrichtung und Containerisierung
Wenn wir diese Logik als Cloud Run-Job bereitstellen möchten, müssen wir sie containerisieren. Bei der Containerisierung werden unser Anwendungscode, seine Abhängigkeiten und die Laufzeitumgebung in einem portierbaren Image verpackt.
Ersetzen Sie die Platzhalter (fett formatierter Text) im Dockerfile durch Ihre Werte:
# Use an official Python runtime as a parent image
FROM python:3.12-alpine
# Set the working directory in the container
WORKDIR /app
# Copy the requirements file into the container
COPY requirements.txt .
# Install any needed packages specified in requirements.txt
RUN pip install --trusted-host pypi.python.org -r requirements.txt
# Copy the rest of the application code
COPY . .
# Define environment variables for configuration (these will be overridden during deployment)
ENV BIGQUERY_PROJECT="YOUR-project"
ENV BIGQUERY_DATASET="YOUR-dataset"
ENV BIGQUERY_TABLE_SOURCE="YOUR-source-table"
ENV URL_PROCESSOR_SERVICE_URL="ENDPOINT FOR VIDEO PROCESSING"
ENV BIGQUERY_TABLE_TARGET = "YOUR-destination-table"
ENTRYPOINT ["python", "main.py"]
Im obigen Dockerfile-Snippet wird das Basis-Image definiert, Abhängigkeiten werden installiert, unser Code wird kopiert und der Befehl zum Ausführen unserer Anwendung mit dem Functions Framework mit der richtigen Zielfunktion (process_batch_from_bq) wird festgelegt. Dieses Image wird dann per Push an Artifact Registry übertragen.
Containerisieren
Um die Anwendung in einen Container zu packen, rufen Sie das Cloud Shell-Terminal auf und führen Sie die folgenden Befehle aus. Ersetzen Sie dabei den Platzhalter <<YOUR_PROJECT_ID>>:
export CONTAINER_IMAGE="gcr.io/<<YOUR_PROJECT_ID>>/batch-url-processor-orchestrator:latest"
gcloud builds submit --tag $CONTAINER_IMAGE .
Nachdem das Container-Image erstellt wurde, sollten Sie die folgende Ausgabe sehen:
Unser Container wurde jetzt erstellt und in Artifact Registry gespeichert. Wir können mit dem nächsten Schritt fortfahren.
8. Cloud Run-Jobs erstellen
Zum Bereitstellen des Jobs müssen Sie das Container-Image erstellen und dann eine Cloud Run-Jobressource erstellen.
Wir haben das Container-Image bereits erstellt und in Artifact Registry gespeichert. Erstellen wir nun den Job.
- Rufen Sie die Cloud Run-Jobs-Konsole auf und klicken Sie auf „Container bereitstellen“:
- Wählen Sie das Container-Image aus, das wir gerade erstellt haben:
- Geben Sie die weiteren Konfigurationsdetails so ein:
- Legen Sie die Aufgabenkapazität so fest:
Da wir Datenbankschreibvorgänge haben und die Parallelisierung (max_instances und task concurrency) bereits im Code behandelt wird, legen wir die Anzahl der gleichzeitigen Aufgaben auf 1 fest. Sie können diesen Wert aber nach Bedarf erhöhen. Ziel ist es, dass die Aufgaben gemäß Konfiguration mit dem im Parallelitätsgrad festgelegten Parallelitätsgrad ausgeführt werden.
- Klicken Sie auf „Erstellen“
Ihr Cloud Run-Job wird erstellt.
Funktionsweise
Eine Containerinstanz unseres Jobs wird gestartet. Es werden BigQuery-Abfragen ausgeführt, um einen kleinen Batch (BATCH_SIZE) von URLs abzurufen, die als PENDING gekennzeichnet sind. Der Status dieser abgerufenen URLs wird in BigQuery sofort auf PROCESSING aktualisiert, damit sie nicht von anderen Jobinstanzen aufgegriffen werden. Es wird ein ThreadPoolExecutor erstellt und für jede URL im Batch eine Aufgabe übergeben. Bei jeder Aufgabe wird die Funktion „call_url_processor_service“ aufgerufen. Wenn die call_url_processor_service-Anfragen abgeschlossen sind (oder ein Zeitlimit überschritten wird/sie fehlschlagen), werden ihre Ergebnisse (entweder der KI-generierte Kontext oder eine Fehlermeldung) erfasst und der ursprünglichen row_id zugeordnet. Sobald alle Aufgaben für den Batch abgeschlossen sind, werden die gesammelten Ergebnisse im Job durchlaufen und die Kontext- und Statusfelder für jede entsprechende Zeile in BigQuery werden aktualisiert. Wenn der Vorgang erfolgreich ist, wird die Jobinstanz ordnungsgemäß beendet. Wenn nicht behandelte Fehler auftreten, wird eine Ausnahme ausgelöst, die je nach Jobkonfiguration möglicherweise einen Wiederholungsversuch durch Cloud Run Jobs auslöst.
Cloud Run-Jobs: Orchestrierung
Hier liegt die wahre Stärke von Cloud Run-Jobs.
Serverlose Batchverarbeitung: Wir erhalten eine verwaltete Infrastruktur, die so viele Containerinstanzen wie nötig (bis zu MAX_INSTANCES) starten kann, um unsere Daten gleichzeitig zu verarbeiten.
Parallelitätssteuerung: Wir definieren MAX_INSTANCES (wie viele Jobs insgesamt parallel ausgeführt werden können) und TASK_CONCURRENCY (wie viele Vorgänge jede Jobinstanz parallel ausführt). Dies ermöglicht eine detaillierte Steuerung des Durchsatzes und der Ressourcennutzung.
Fehlertoleranz: Wenn eine Jobinstanz mittendrin fehlschlägt, können Cloud Run-Jobs so konfiguriert werden, dass der gesamte Job oder bestimmte Aufgaben noch einmal versucht werden. So geht die Datenverarbeitung nicht verloren.
Vereinfachte Architektur: Da HTTP-Aufrufe direkt im Job orchestriert werden und BigQuery für die Statusverwaltung verwendet wird, vermeiden wir die Komplexität der Einrichtung und Verwaltung von Pub/Sub, den zugehörigen Themen, Abos und der Bestätigungslogik.
MAX_INSTANCES im Vergleich zu TASK_CONCURRENCY:
MAX_INSTANCES::Die Gesamtzahl der Jobinstanzen, die während der gesamten Jobausführung gleichzeitig ausgeführt werden können. Das ist Ihr wichtigstes Mittel, um viele URLs gleichzeitig zu verarbeiten.
TASK_CONCURRENCY::Die Anzahl der parallelen Vorgänge (Aufrufe Ihres Prozessordienstes), die von einer einzelnen Instanz Ihres Jobs ausgeführt werden. So lässt sich die CPU/das Netzwerk einer Instanz auslasten.
9. Cloud Run-Job ausführen und überwachen
Video-Metadaten
Bevor wir auf „Ausführen“ klicken, sehen wir uns den Status der Daten an.
Rufen Sie BigQuery Studio auf und führen Sie die folgende Abfrage aus:
Select id, descr, url, status from cv_metadata.post_session_labs where status = ‘PENDING'
Wir haben einige Beispiel-Datensätze mit Video-URLs und dem Status „AUSSTEHEND“. Unser Ziel ist es, das Feld „Kontext“ mit Statistiken aus dem Video in dem im Prompt beschriebenen Format zu füllen.
Job-Trigger
Führen Sie den Job aus, indem Sie in der Cloud Run-Jobs-Konsole auf die Schaltfläche AUSFÜHREN klicken. Der Fortschritt und der Status des Jobs werden in der Konsole angezeigt:
Auf dem Tab „LOGS“ unter „OBSERVABILITY“ finden Sie Monitoring-Schritte und andere Details zum Job und zu den Aufgaben.
10. Ergebnisanalyse
Nach Abschluss des Jobs sollte der Kontext für jede Video-URL in der Tabelle aktualisiert werden:
Ausgabekontext (für einen der Datensätze)
{
"chapter_title": "Building a Travel Agent with ADK and MCP Toolbox",
"introduction_context": "This chapter section is derived from a hands-on lab session focused on building a travel agent. It details the process of integrating various Google Cloud services and tools to create an intelligent agent capable of querying a database and interacting with users.",
"what_will_build": "The goal is to build and deploy a travel agent that can answer user queries about hotels using the Agent Development Kit (ADK) and the MCP Toolbox for Databases, connecting to a PostgreSQL database.",
"technologies_and_services": [
"Google Cloud Platform",
"Cloud SQL for PostgreSQL",
"Agent Development Kit (ADK)",
"MCP Toolbox for Databases",
"Cloud Shell",
"Cloud Run",
"Python",
"Docker"
],
"how_we_did_it": [
"Provision a Cloud SQL instance for PostgreSQL with the 'hoteldb-instance'.",
"Prepare the 'hotels' database by creating a table with relevant schema and populating it with sample data.",
"Set up the MCP Toolbox for Databases by downloading and configuring the necessary components.",
"Install the Agent Development Kit (ADK) and its dependencies.",
"Create a new agent using the ADK, specifying the model (Gemini 2.0-flash) and backend (Vertex AI).",
"Modify the agent's code to connect to the PostgreSQL database via the MCP Toolbox.",
"Run the agent locally to test its functionality and ability to interact with the database.",
"Deploy the agent to Cloud Run for cloud-based access and further testing.",
"Interact with the deployed agent through a web console or command line to query hotel information."
],
"source_code_url": "N/A",
"demo_url": "N/A",
"qa_segment": [
{
"question": "What is the primary purpose of the MCP Toolbox for Databases?",
"answer": "The MCP Toolbox for Databases is an open-source MCP server designed to help users develop tools faster, more securely, and by handling complexities like connection pooling, authentication, and more."
},
{
"question": "Which Google Cloud service is used to create the database for the travel agent?",
"answer": "Cloud SQL for PostgreSQL is used to create the database."
},
{
"question": "What is the role of the Agent Development Kit (ADK)?",
"answer": "The ADK helps build Generative AI tools that allow agents to access data in a database. It enables agents to perform actions, interact with users, utilize external tools, and coordinate with other agents."
},
{
"question": "What command is used to create the initial agent application using ADK?",
"answer": "The command `adk create hotel-agent-app` is used to create the agent application."
},
....
Sie sollten diese JSON-Struktur jetzt für komplexere Agent-Anwendungsfälle verwenden können.
Warum dieser Ansatz?
Diese Architektur bietet erhebliche strategische Vorteile:
- Kosteneffizienz: Bei serverlosen Diensten zahlen Sie nur für die tatsächliche Nutzung. Cloud Run-Jobs werden bei Nichtverwendung auf null skaliert.
- Skalierbarkeit: Zehntausende von URLs lassen sich problemlos verarbeiten, indem die Instanz- und Nebenläufigkeitseinstellungen von Cloud Run-Jobs angepasst werden.
- Agilität: Schnelle Entwicklungs- und Bereitstellungszyklen für neue Verarbeitungslogik oder KI-Modelle durch einfaches Aktualisieren der enthaltenen Anwendung und des zugehörigen Dienstes.
- Geringerer operativer Aufwand: Es müssen keine Server gepatcht oder verwaltet werden. Google kümmert sich um die Infrastruktur.
- Demokratisierung von KI: Ermöglicht die Verarbeitung von Batchaufgaben mit fortschrittlicher KI, ohne dass umfassende ML Ops-Kenntnisse erforderlich sind.
11. Bereinigen
So vermeiden Sie, dass Ihrem Google Cloud-Konto die in diesem Beitrag verwendeten Ressourcen in Rechnung gestellt werden:
- Wechseln Sie in der Google Cloud Console zur Seite Ressourcenmanager.
- Wählen Sie in der Projektliste das Projekt aus, das Sie löschen möchten, und klicken Sie auf Löschen.
- Geben Sie im Dialogfeld die Projekt-ID ein und klicken Sie auf Beenden, um das Projekt zu löschen.
12. Glückwunsch
Glückwunsch! Indem Sie unsere Lösung auf Cloud Run-Jobs aufbauen und die Leistungsfähigkeit von BigQuery für die Datenverwaltung und einen externen Cloud Run-Dienst für die KI-Verarbeitung nutzen, haben Sie ein hochgradig skalierbares, kostengünstiges und wartungsfreundliches System entwickelt. Dieses Muster entkoppelt die Verarbeitungslogik, ermöglicht die parallele Ausführung ohne komplexe Infrastruktur und beschleunigt die Zeit bis zur Erkenntnis erheblich.
Wir empfehlen Ihnen, Cloud Run-Jobs für Ihre eigenen Batchverarbeitungsanforderungen zu nutzen. Ob es um die Skalierung von KI-Analysen, die Ausführung von ETL-Pipelines oder die Durchführung regelmäßiger Datenaufgaben geht – dieser serverlose Ansatz bietet eine leistungsstarke und effiziente Lösung. Hier finden Sie weitere Informationen.
Wenn Sie alle Ihre Apps serverlos und agentenbasiert erstellen und bereitstellen möchten, registrieren Sie sich für Code Vipassana. Dort geht es darum, datengesteuerte generative agentenbasierte Anwendungen zu beschleunigen.