Erweiterte RAG-Techniken

1. Einführung

Übersicht

Retrieval Augmented Generation (RAG) verbessert die Antworten von Large Language Models (LLMs), indem sie auf externem Wissen basieren. Für die Entwicklung eines produktionsreifen RAG-Systems ist jedoch mehr als nur eine einfache Vektorsuche erforderlich. Sie müssen optimieren, wie Daten aufgenommen werden, wie relevante Ergebnisse eingestuft werden und wie Nutzeranfragen verarbeitet werden.

In diesem umfassenden Lab erstellen Sie eine robuste RAG-Anwendung mit Cloud SQL for PostgreSQL (erweitert mit pgvector) und Vertex AI. Sie werden drei fortgeschrittene Techniken durchlaufen:

  1. Chunking-Strategien:Sie sehen, wie sich verschiedene Methoden zum Aufteilen von Text (Zeichen, rekursiv, Token) auf die Qualität des Abrufs auswirken.
  2. Neubewertung:Sie implementieren den Vertex AI Reranker, um Suchergebnisse zu optimieren und das Problem „Lost in the Middle“ zu beheben.
  3. Anfrage-Transformation:Sie verwenden Gemini, um Nutzeranfragen mit Techniken wie HyDE (Hypothetical Document Embeddings) und Step-back Prompting zu optimieren.

Aufgaben

  • Richten Sie eine Cloud SQL for PostgreSQL-Instanz mit pgvector ein.
  • Erstellen Sie eine Pipeline zur Datenaufnahme, die Text mithilfe verschiedener Strategien in Chunks zerlegt und Embeddings in Cloud SQL speichert.
  • Semantische Suchanfragen ausführen und die Qualität der Ergebnisse verschiedener Chunking-Methoden vergleichen
  • Integrieren Sie einen Reranker, um abgerufene Dokumente nach Relevanz neu zu sortieren.
  • Implementieren Sie LLM-basierte Abfragetransformationen, um den Abruf bei mehrdeutigen oder komplexen Fragen zu verbessern.

Lerninhalte

  • LangChain mit Vertex AI und Cloud SQL verwenden
  • Die Auswirkungen der Textaufteilungen Character, Recursive und Token.
  • Vektorsuche in PostgreSQL implementieren
  • ContextualCompressionRetriever zum Neubewerten verwenden
  • So implementieren Sie HyDE und Step-back Prompting.

2. Projekt einrichten

Google-Konto

Wenn Sie noch kein privates Google-Konto haben, müssen Sie ein Google-Konto erstellen.

Verwenden Sie stattdessen ein privates Konto.

In der Google Cloud Console anmelden

Melden Sie sich mit einem privaten Google-Konto in der Google Cloud Console an.

Abrechnung aktivieren

Google Cloud-Guthaben im Wert von 5 $einlösen (optional)

Für diesen Workshop benötigen Sie ein Rechnungskonto mit Guthaben. Wenn Sie Ihre eigene Abrechnung verwenden möchten, können Sie diesen Schritt überspringen.

  1. Klicken Sie auf diesen Link und melden Sie sich mit einem privaten Google-Konto an. Sie sehen etwa Folgendes: Zur Seite „Guthaben“
  2. Klicken Sie auf die Schaltfläche CLICK HERE TO ACCESS YOUR CREDITS (HIER KLICKEN, UM AUF IHR GUTHABEN ZUZUGREIFEN). Sie werden auf eine Seite weitergeleitet, auf der Sie Ihr Abrechnungsprofil einrichten können. Seite zum Einrichten des Abrechnungsprofils
  3. Klicken Sie auf Bestätigen. Sie sind jetzt mit einem Google Cloud Platform-Testrechnungskonto verbunden. Screenshot der Abrechnungsübersicht

Privates Rechnungskonto einrichten

Wenn Sie die Abrechnung mit Google Cloud-Guthaben eingerichtet haben, können Sie diesen Schritt überspringen.

Klicken Sie hier, um die Abrechnung in der Cloud Console zu aktivieren und ein privates Rechnungskonto einzurichten.

Hinweise:

  • Die Kosten für Cloud-Ressourcen für dieses Lab sollten weniger als 1 $betragen.
  • Sie können die Schritte am Ende dieses Labs ausführen, um Ressourcen zu löschen und so weitere Kosten zu vermeiden.
  • Neuen Nutzern steht die kostenlose Testversion mit einem Guthaben von 300$ zur Verfügung.

Projekt erstellen (optional)

Wenn Sie kein aktuelles Projekt haben, das Sie für dieses Lab verwenden möchten, erstellen Sie hier ein neues Projekt.

3. Cloud Shell-Editor öffnen

  1. Klicken Sie auf diesen Link, um direkt zum Cloud Shell-Editor zu gelangen.
  2. Wenn Sie heute an irgendeinem Punkt zur Autorisierung aufgefordert werden, klicken Sie auf Autorisieren, um fortzufahren.Klicken Sie, um Cloud Shell zu autorisieren.
  3. Wenn das Terminal nicht unten auf dem Bildschirm angezeigt wird, öffnen Sie es:
    • Klicken Sie auf Ansehen.
    • Klicken Sie auf TerminalNeues Terminal im Cloud Shell-Editor öffnen.
  4. Legen Sie im Terminal Ihr Projekt mit diesem Befehl fest:
    gcloud config set project [PROJECT_ID]
    
    • Beispiel:
      gcloud config set project lab-project-id-example
      
    • Wenn Sie sich nicht mehr an Ihre Projekt-ID erinnern, können Sie alle Ihre Projekt-IDs mit folgendem Befehl auflisten:
      gcloud projects list
      
      Projekt-ID im Cloud Shell Editor-Terminal festlegen
  5. Es sollte folgende Meldung angezeigt werden:
    Updated property [core/project].
    

4. APIs aktivieren

Um diese Lösung zu erstellen, müssen Sie mehrere Google Cloud APIs für Vertex AI, Cloud SQL und den Reranking-Dienst aktivieren.

  1. Aktivieren Sie die APIs im Terminal:
    gcloud services enable \
      aiplatform.googleapis.com \
      sqladmin.googleapis.com \
      cloudresourcemanager.googleapis.com \
      serviceusage.googleapis.com \
      discoveryengine.googleapis.com
    
    
    

Einführung in die APIs

  • Vertex AI API (aiplatform.googleapis.com): Ermöglicht die Verwendung von Gemini für die Generierung und von Vertex AI Embeddings für die Vektorisierung von Text.
  • Cloud SQL Admin API (sqladmin.googleapis.com): Ermöglicht die programmatische Verwaltung von Cloud SQL-Instanzen.
  • Discovery Engine API (discoveryengine.googleapis.com): Ermöglicht die Funktionen von Vertex AI Reranker.
  • Service Usage API (serviceusage.googleapis.com): Erforderlich, um Dienstkontingente zu prüfen und zu verwalten.

5. Virtuelle Umgebung erstellen und Abhängigkeiten installieren

Bevor Sie ein Python-Projekt starten, sollten Sie eine virtuelle Umgebung erstellen. Dadurch werden die Abhängigkeiten des Projekts isoliert und Konflikte mit anderen Projekten oder den globalen Python-Paketen des Systems vermieden.

  1. Erstellen Sie einen Ordner mit dem Namen rag-labs und wechseln Sie in diesen Ordner. Führen Sie im Terminal den folgenden Code aus:
    mkdir rag-labs && cd rag-labs
    
  2. Erstellen und aktivieren Sie eine virtuelle Umgebung:
    uv venv --python 3.12
    source .venv/bin/activate
    
  3. Erstellen Sie eine requirements.txt-Datei mit den erforderlichen Abhängigkeiten. Führen Sie im Terminal den folgenden Code aus:
    cloudshell edit requirements.txt
    
  4. Fügen Sie die folgenden optimierten Abhängigkeiten in requirements.txt ein. Diese Versionen sind fixiert, um Konflikte zu vermeiden und die Installation zu beschleunigen.
    # Core LangChain & AI
    langchain-community==0.3.31
    langchain-google-vertexai==2.1.2
    langchain-google-community[vertexaisearch]==2.0.10
    
    # Google Cloud
    google-cloud-storage==2.19.0
    google-cloud-aiplatform[langchain]==1.130.0
    
    # Database
    cloud-sql-python-connector[pg8000]==1.19.0
    sqlalchemy==2.0.45
    pgvector==0.4.2
    
    # Utilities
    tiktoken==0.12.0
    python-dotenv==1.2.1
    requests==2.32.5
    
  5. Installieren Sie die Abhängigkeiten:
    uv pip install -r requirements.txt
    

6. Cloud SQL für PostgreSQL einrichten

In dieser Aufgabe stellen Sie eine Cloud SQL for PostgreSQL-Instanz bereit, erstellen eine Datenbank und bereiten sie für die Vektorsuche vor.

Cloud SQL-Konfiguration definieren

  1. Erstellen Sie eine .env-Datei, in der Sie Ihre Konfiguration speichern. Führen Sie im Terminal den folgenden Code aus:
    cloudshell edit .env
    
  2. Fügen Sie die folgende Konfiguration in .env ein.
    # Project Config
    PROJECT_ID="[YOUR_PROJECT_ID]"
    REGION="us-central1"
    
    # Database Config
    SQL_INSTANCE_NAME="rag-pg-instance-1"
    SQL_DATABASE_NAME="rag_harry_potter_db"
    SQL_USER="rag_user"
    SQL_PASSWORD="StrongPassword123!" 
    
    # RAG Config
    PGVECTOR_COLLECTION_NAME="rag_harry_potter"
    RANKING_LOCATION_ID="global"
    
    # Connection Name (Auto-generated in scripts usually, but useful to have)
    DB_INSTANCE_CONNECTION_NAME="${PROJECT_ID}:${REGION}:${SQL_INSTANCE_NAME}"
    
  3. Ersetzen Sie [YOUR_PROJECT_ID] durch Ihre tatsächliche Google Cloud-Projekt-ID. (z.B. PROJECT_ID = "google-cloud-labs")
    Wenn Sie sich nicht an Ihre Projekt-ID erinnern können, führen Sie den folgenden Befehl in Ihrem Terminal aus. Sie sehen dann eine Liste aller Ihrer Projekte und deren IDs.
    gcloud projects list
    
  4. Laden Sie die Variablen in Ihre Shell-Sitzung:
    source .env
    

Instanz und Datenbank erstellen

  1. Cloud SQL für PostgreSQL-Instanz erstellen. Mit diesem Befehl wird eine kleine Instanz erstellt, die für dieses Lab geeignet ist.
    gcloud sql instances create ${SQL_INSTANCE_NAME} \
      --database-version=POSTGRES_15 \
      --tier=db-g1-small \
      --region=${REGION} \
      --project=${PROJECT_ID}
    
  2. Wenn die Instanz bereit ist, erstellen Sie die Datenbank:
    gcloud sql databases create ${SQL_DATABASE_NAME} \
      --instance=${SQL_INSTANCE_NAME} \
      --project=${PROJECT_ID}
    
  3. Erstellen Sie den Datenbanknutzer:
    gcloud sql users create ${SQL_USER} \
      --instance=${SQL_INSTANCE_NAME} \
      --password=${SQL_PASSWORD} \
      --project=${PROJECT_ID}
    

pgvector-Erweiterung aktivieren

Mit der Erweiterung pgvector können Vektoreinbettungen in PostgreSQL gespeichert und durchsucht werden. Sie müssen sie explizit in Ihrer Datenbank aktivieren.

  1. Erstellen Sie ein Skript mit dem Namen enable_pgvector.py. Führen Sie im Terminal den folgenden Code aus:
    cloudshell edit enable_pgvector.py
    
  2. Fügen Sie den folgenden Code in enable_pgvector.py ein. Dieses Script stellt eine Verbindung zu Ihrer Datenbank her und führt CREATE EXTENSION IF NOT EXISTS vector; aus.
    import os
    import sqlalchemy
    from google.cloud.sql.connector import Connector, IPTypes
    import logging
    from dotenv import load_dotenv
    
    load_dotenv()
    logging.basicConfig(level=logging.INFO)
    
    # Config
    project_id = os.getenv("PROJECT_ID")
    region = os.getenv("REGION")
    instance_name = os.getenv("SQL_INSTANCE_NAME")
    db_user = os.getenv("SQL_USER")
    db_pass = os.getenv("SQL_PASSWORD")
    db_name = os.getenv("SQL_DATABASE_NAME")
    instance_connection_name = f"{project_id}:{region}:{instance_name}"
    
    def getconn():
        with Connector() as connector:
            conn = connector.connect(
                instance_connection_name,
                "pg8000",
                user=db_user,
                password=db_pass,
                db=db_name,
                ip_type=IPTypes.PUBLIC,
            )
            return conn
    
    def enable_pgvector():
        pool = sqlalchemy.create_engine(
            "postgresql+pg8000://",
            creator=getconn,
        )
        with pool.connect() as db_conn:
            # Check if extension exists
            result = db_conn.execute(sqlalchemy.text("SELECT extname FROM pg_extension WHERE extname = 'vector';")).fetchone()
            if result:
                logging.info("pgvector extension is already enabled.")
            else:
                logging.info("Enabling pgvector extension...")
                db_conn.execute(sqlalchemy.text("CREATE EXTENSION IF NOT EXISTS vector;"))
                db_conn.commit()
                logging.info("pgvector extension enabled successfully.")
    
    if __name__ == "__main__":
        enable_pgvector()
    
  3. Führen Sie das Script aus:
    python enable_pgvector.py
    

7. Teil 1: Blockstrategien

Der erste Schritt in jeder RAG-Pipeline besteht darin, Dokumente in ein Format zu transformieren, das das LLM verstehen kann: Chunks.

LLMs haben ein Kontextfensterlimit (die Menge an Text, die sie gleichzeitig verarbeiten können). Außerdem wird die Information verwässert, wenn ein 50-seitiges Dokument abgerufen wird, um eine bestimmte Frage zu beantworten. Wir teilen Dokumente in kleinere „Chunks“ auf, um relevante Informationen zu isolieren.

Wie Sie den Text aufteilen, ist jedoch von großer Bedeutung:

  • Zeichentrenner:Teilt die Daten strikt nach der Anzahl der Zeichen auf. Das ist zwar schnell, aber riskant, da Wörter oder Sätze halbiert werden können, wodurch die semantische Bedeutung verloren geht.
  • Rekursiver Splitter:Versucht zuerst, nach Absatz, dann nach Satz und dann nach Wort aufzuteilen. Es wird versucht, semantische Einheiten zusammenzuhalten.
  • Token Splitter:Die Aufteilung erfolgt auf Grundlage des eigenen Vokabulars (Tokens) des LLM. So passen die Chunks perfekt in Kontextfenster, die Generierung kann jedoch rechenintensiver sein.

In diesem Abschnitt werden Sie dieselben Daten mit allen drei Strategien erfassen, um sie zu vergleichen.

Aufnahmeskript erstellen

Sie verwenden ein Skript, das ein Harry Potter-Dataset herunterlädt, es mit den Strategien Character, Recursive und Token aufteilt und die Einbettungen in drei separate Tabellen in Cloud SQL hochlädt.

  1. Erstellen Sie die Datei ingest_data.py:
    cloudshell edit ingest_data.py
    
  2. Fügen Sie den folgenden korrigierten Code in ingest_data.py ein. In dieser Version wird die JSON-Struktur des Datasets korrekt geparst.
    import os
    import json
    import logging
    import requests
    from typing import List, Dict, Any
    from dotenv import load_dotenv
    
    from google.cloud.sql.connector import Connector, IPTypes
    from langchain_google_vertexai import VertexAIEmbeddings
    from langchain_community.vectorstores import PGVector
    from langchain.text_splitter import CharacterTextSplitter, RecursiveCharacterTextSplitter, TokenTextSplitter
    from langchain.docstore.document import Document
    
    load_dotenv()
    logging.basicConfig(level=logging.INFO)
    
    # Configuration
    PROJECT_ID = os.getenv("PROJECT_ID")
    REGION = os.getenv("REGION")
    DB_USER = os.getenv("SQL_USER")
    DB_PASS = os.getenv("SQL_PASSWORD")
    DB_NAME = os.getenv("SQL_DATABASE_NAME")
    INSTANCE_CONNECTION_NAME = f"{PROJECT_ID}:{REGION}:{os.getenv('SQL_INSTANCE_NAME')}"
    BASE_COLLECTION_NAME = os.getenv("PGVECTOR_COLLECTION_NAME")
    BOOKS_JSON_URL = "https://storage.googleapis.com/github-repo/generative-ai/gemini/reasoning-engine/sample_data/harry_potter_books.json"
    
    CHUNK_SIZE = 500
    CHUNK_OVERLAP = 50
    MAX_DOCS_TO_PROCESS = 10 
    
    # Database Connector
    def getconn():
        with Connector() as connector:
            return connector.connect(
                INSTANCE_CONNECTION_NAME,
                "pg8000",
                user=DB_USER,
                password=DB_PASS,
                db=DB_NAME,
                ip_type=IPTypes.PUBLIC,
            )
    
    def download_data():
        logging.info(f"Downloading data from {BOOKS_JSON_URL}...")
        response = requests.get(BOOKS_JSON_URL)
        return response.json()
    
    def prepare_chunks(json_data, strategy):
        documents = []
    
        # Iterate through the downloaded data
        for entry in json_data[:MAX_DOCS_TO_PROCESS]:
    
            # --- JSON PARSING LOGIC ---
            # The data structure nests content inside 'kwargs' -> 'page_content'
            if "kwargs" in entry and "page_content" in entry["kwargs"]:
                content = entry["kwargs"]["page_content"]
    
                # Extract metadata if available, ensuring it's a dict
                metadata = entry["kwargs"].get("metadata", {})
                if not isinstance(metadata, dict):
                    metadata = {"source": "unknown"}
    
                # Add the strategy to metadata for tracking
                metadata["strategy"] = strategy
            else:
                continue
    
            if not content:
                continue
    
            # Choose the splitter based on the strategy
            if strategy == "character":
                splitter = CharacterTextSplitter(chunk_size=CHUNK_SIZE, chunk_overlap=CHUNK_OVERLAP, separator="\n")
            elif strategy == "token":
                splitter = TokenTextSplitter(chunk_size=CHUNK_SIZE, chunk_overlap=CHUNK_OVERLAP)
            else: # default to recursive
                splitter = RecursiveCharacterTextSplitter(chunk_size=CHUNK_SIZE, chunk_overlap=CHUNK_OVERLAP)
    
            # Split the content into chunks
            chunks = splitter.split_text(content)
    
            # Create Document objects for each chunk
            for chunk in chunks:
                documents.append(Document(page_content=chunk, metadata=metadata))
    
        return documents
    
    def main():
        logging.info("Initializing Embeddings...")
        embeddings = VertexAIEmbeddings(model_name="gemini-embedding-001", project=PROJECT_ID, location=REGION)
    
        data = download_data()
        strategies = ["character", "recursive", "token"]
    
        # Connection string for PGVector (uses the getconn helper)
        pg_conn_str = f"postgresql+pg8000://{DB_USER}:{DB_PASS}@placeholder/{DB_NAME}"
    
        for strategy in strategies:
            collection_name = f"{BASE_COLLECTION_NAME}_{strategy}"
            logging.info(f"--- Processing strategy: {strategy.upper()} ---")
            logging.info(f"Target Collection: {collection_name}")
    
            # Prepare documents with the specific strategy
            docs = prepare_chunks(data, strategy)
    
            if not docs:
                logging.warning(f"No documents generated for strategy {strategy}. Check data source.")
                continue
    
            logging.info(f"Generated {len(docs)} chunks. Uploading to Cloud SQL...")
    
            # Initialize the Vector Store
            store = PGVector(
                collection_name=collection_name,
                embedding_function=embeddings,
                connection_string=pg_conn_str,
                engine_args={"creator": getconn},
                pre_delete_collection=True # Clears old data for this collection before adding new
            )
    
            # Batch add documents
            store.add_documents(docs)
            logging.info(f"Successfully finished {strategy}.\n")
    
    if __name__ == "__main__":
        main()
    
  3. Führen Sie das Aufnahmeskript aus. Dadurch werden in Ihrer Datenbank drei verschiedene Tabellen (Sammlungen) erstellt.
    python ingest_data.py
    

Chunking-Ergebnisse vergleichen

Nachdem die Daten geladen wurden, führen wir eine Abfrage für alle drei Sammlungen aus, um zu sehen, wie sich die Chunking-Strategie auf die Ergebnisse auswirkt.

  1. query_chunking.py erstellen:
    cloudshell edit query_chunking.py
    
  2. Fügen Sie den folgenden Code in query_chunking.py ein:
    import os
    import logging
    from dotenv import load_dotenv
    from google.cloud.sql.connector import Connector, IPTypes
    from langchain_google_vertexai import VertexAIEmbeddings
    from langchain_community.vectorstores import PGVector
    
    load_dotenv()
    logging.basicConfig(level=logging.ERROR) # Only show errors to keep output clean
    
    # Config
    PROJECT_ID = os.getenv("PROJECT_ID")
    REGION = os.getenv("REGION")
    DB_USER = os.getenv("SQL_USER")
    DB_PASS = os.getenv("SQL_PASSWORD")
    DB_NAME = os.getenv("SQL_DATABASE_NAME")
    INSTANCE_CONNECTION_NAME = f"{PROJECT_ID}:{REGION}:{os.getenv('SQL_INSTANCE_NAME')}"
    BASE_COLLECTION_NAME = os.getenv("PGVECTOR_COLLECTION_NAME")
    
    def getconn():
        with Connector() as connector:
            return connector.connect(
                INSTANCE_CONNECTION_NAME,
                "pg8000",
                user=DB_USER,
                password=DB_PASS,
                db=DB_NAME,
                ip_type=IPTypes.PUBLIC,
            )
    
    def main():
        embeddings = VertexAIEmbeddings(model_name="gemini-embedding-001", project=PROJECT_ID, location=REGION)
        pg_conn_str = f"postgresql+pg8000://{DB_USER}:{DB_PASS}@placeholder/{DB_NAME}"
    
        query = "Tell me about the Dursleys and their relationship with Harry Potter"
        print(f"\nQUERY: {query}\n" + "="*50)
    
        strategies = ["character", "recursive", "token"]
    
        for strategy in strategies:
            collection = f"{BASE_COLLECTION_NAME}_{strategy}"
            print(f"\nSTRATEGY: {strategy.upper()}")
    
            store = PGVector(
                collection_name=collection,
                embedding_function=embeddings,
                connection_string=pg_conn_str,
                engine_args={"creator": getconn}
            )
    
            results = store.similarity_search_with_score(query, k=2)
            for i, (doc, score) in enumerate(results):
                print(f"  Result {i+1} (Score: {score:.4f}): {doc.page_content[:150].replace(chr(10), ' ')}...")
    
    if __name__ == "__main__":
        main()
    
  3. Führen Sie das Abfrageskript aus:
    python query_chunking.py
    

Beobachten Sie die Ausgabe.

Beachten Sie, dass beim Aufteilen nach Zeichen Sätze mitten im Satz abgeschnitten werden können, während beim rekursiven Aufteilen versucht wird, Absatzgrenzen zu berücksichtigen. Durch die Tokenaufteilung passen die Chunks perfekt in die Kontextfenster von LLMs, die semantische Struktur wird jedoch möglicherweise ignoriert.

8. Teil 2: Neuberechnung des Ranks

Die Vektorsuche (Abruf) ist unglaublich schnell, da sie auf komprimierten mathematischen Darstellungen (Einbettungen) basiert. Es wird ein breites Netz ausgeworfen, um Recall (alle potenziell relevanten Elemente finden) zu gewährleisten, aber es leidet oft unter geringer Precision (die Rangfolge dieser Elemente ist möglicherweise nicht perfekt).

Relevante Dokumente gehen oft in der Mitte der Ergebnisliste unter. Ein LLM, das nur die ersten fünf Ergebnisse berücksichtigt, übersieht möglicherweise die entscheidende Antwort auf Position 7.

Reranking löst dieses Problem, indem eine zweite Phase hinzugefügt wird.

  1. Retriever:Ruft mithilfe der schnellen Vektorsuche eine größere Menge ab (z.B. die 25 besten Ergebnisse).
  2. Reranker:Hier wird ein spezielles Modell (z. B. ein Cross-Encoder) verwendet, um den vollständigen Text der Anfrage und der Dokumentpaare zu untersuchen. Sie ist langsamer, aber viel genauer. Die 25 besten werden neu bewertet und die 3 absolut besten zurückgegeben.

In dieser Aufgabe durchsuchen Sie die in Teil 1 erstellte Sammlung recursive. Diesmal wenden Sie jedoch den Vertex AI Reranker an, um die Ergebnisse zu optimieren.

  1. query_reranking.py erstellen:
    cloudshell edit query_reranking.py
    
  2. Fügen Sie den folgenden Code ein. Beachten Sie, dass die Sammlung von _recursive explizit angesprochen wird und ContextualCompressionRetriever verwendet wird.
    import os
    import logging
    from dotenv import load_dotenv
    from google.cloud.sql.connector import Connector, IPTypes
    from langchain_google_vertexai import VertexAIEmbeddings
    from langchain_community.vectorstores import PGVector
    
    # Reranking Imports
    from langchain.retrievers import ContextualCompressionRetriever
    from langchain_google_community.vertex_rank import VertexAIRank
    
    load_dotenv()
    logging.basicConfig(level=logging.ERROR)
    
    PROJECT_ID = os.getenv("PROJECT_ID")
    REGION = os.getenv("REGION")
    DB_USER = os.getenv("SQL_USER")
    DB_PASS = os.getenv("SQL_PASSWORD")
    DB_NAME = os.getenv("SQL_DATABASE_NAME")
    INSTANCE_CONNECTION_NAME = f"{PROJECT_ID}:{REGION}:{os.getenv('SQL_INSTANCE_NAME')}"
    
    # IMPORTANT: Target the recursive collection created in ingest_data.py
    COLLECTION_NAME = f"{os.getenv('PGVECTOR_COLLECTION_NAME')}_recursive"
    RANKING_LOCATION = os.getenv("RANKING_LOCATION_ID")
    
    def getconn():
        with Connector() as connector:
            return connector.connect(
                INSTANCE_CONNECTION_NAME,
                "pg8000",
                user=DB_USER,
                password=DB_PASS,
                db=DB_NAME,
                ip_type=IPTypes.PUBLIC,
            )
    
    def main():
        embeddings = VertexAIEmbeddings(model_name="gemini-embedding-001", project=PROJECT_ID, location=REGION)
        pg_conn_str = f"postgresql+pg8000://{DB_USER}:{DB_PASS}@placeholder/{DB_NAME}"
    
        print(f"Connecting to collection: {COLLECTION_NAME}")
        store = PGVector(
            collection_name=COLLECTION_NAME,
            embedding_function=embeddings,
            connection_string=pg_conn_str,
            engine_args={"creator": getconn}
        )
    
        query = "What are the Horcruxes?"
        print(f"QUERY: {query}\n")
    
        # 1. Base Retriever (Vector Search) - Fetch top 10
        base_retriever = store.as_retriever(search_kwargs={"k": 10})
    
        # 2. Reranker - Select top 3 from the 10
        reranker = VertexAIRank(
            project_id=PROJECT_ID,
            location_id=RANKING_LOCATION,
            ranking_config="default_ranking_config",
            title_field="source",
            top_n=3
        )
    
        compression_retriever = ContextualCompressionRetriever(
            base_compressor=reranker,
            base_retriever=base_retriever
        )
    
        # Execute
        try:
            reranked_docs = compression_retriever.invoke(query)
    
            if not reranked_docs:
                print("No documents returned. Check if the collection exists and is populated.")
    
            print(f"--- Top 3 Reranked Results ---")
            for i, doc in enumerate(reranked_docs):
                print(f"Result {i+1} (Score: {doc.metadata.get('relevance_score', 'N/A')}):")
                print(f"  {doc.page_content[:200]}...\n")
        except Exception as e:
            print(f"Error during reranking: {e}")
    
    if __name__ == "__main__":
        main()
    
  3. Führen Sie die Reranking-Abfrage aus:
    python query_reranking.py
    

Beobachten

Möglicherweise stellen Sie im Vergleich zu einer reinen Vektorsuche höhere Relevanzwerte oder eine andere Reihenfolge fest. So erhält das LLM den präzisesten Kontext.

9. Teil 3: Transformation von Anfragen

Oft ist der Nutzer der größte Engpass bei RAG. Nutzeranfragen sind oft mehrdeutig, unvollständig oder schlecht formuliert. Wenn die Anfrage-Einbettung nicht mathematisch mit der Dokument-Einbettung übereinstimmt, schlägt der Abruf fehl.

Bei der Abfrageumwandlung wird ein LLM verwendet, um die Abfrage umzuschreiben oder zu erweitern, bevor sie in der Datenbank ausgeführt wird. Sie implementieren zwei Techniken:

  • HyDE (Hypothetical Document Embeddings): Die Vektorähnlichkeit zwischen einer Frage und einer Antwort ist oft geringer als die Ähnlichkeit zwischen einer Antwort und einer hypothetischen Antwort. Bei HyDE wird das LLM aufgefordert, eine perfekte Antwort zu halluzinieren, diese einzubetten und nach Dokumenten zu suchen, die der Halluzination ähneln.
  • Step-back Prompting:Wenn ein Nutzer eine spezifische, detaillierte Frage stellt, geht dem System möglicherweise der breitere Kontext verloren. Beim Step-Back-Prompting wird das LLM aufgefordert, eine übergeordnete, abstrakte Frage zu generieren („Was ist die Geschichte dieser Familie?“), um neben den spezifischen Details auch grundlegende Informationen abzurufen.
  1. query_transformation.py erstellen:
    cloudshell edit query_transformation.py
    
  2. Fügen Sie den folgenden Code ein:
    import os
    import logging
    from dotenv import load_dotenv
    from google.cloud.sql.connector import Connector, IPTypes
    from langchain_google_vertexai import VertexAIEmbeddings, VertexAI
    from langchain_community.vectorstores import PGVector
    from langchain_core.prompts import PromptTemplate
    
    load_dotenv()
    logging.basicConfig(level=logging.ERROR)
    
    PROJECT_ID = os.getenv("PROJECT_ID")
    REGION = os.getenv("REGION")
    DB_USER = os.getenv("SQL_USER")
    DB_PASS = os.getenv("SQL_PASSWORD")
    DB_NAME = os.getenv("SQL_DATABASE_NAME")
    INSTANCE_CONNECTION_NAME = f"{PROJECT_ID}:{REGION}:{os.getenv('SQL_INSTANCE_NAME')}"
    COLLECTION_NAME = f"{os.getenv('PGVECTOR_COLLECTION_NAME')}_recursive"
    
    def getconn():
        with Connector() as connector:
            return connector.connect(
                INSTANCE_CONNECTION_NAME,
                "pg8000",
                user=DB_USER,
                password=DB_PASS,
                db=DB_NAME,
                ip_type=IPTypes.PUBLIC,
            )
    
    def generate_hyde_doc(query, llm):
        prompt = PromptTemplate(
            input_variables=["question"],
            template="Write a concise, hypothetical answer to the question. Question: {question} Answer:"
        )
        chain = prompt | llm
        return chain.invoke({"question": query})
    
    def generate_step_back(query, llm):
        prompt = PromptTemplate(
            input_variables=["question"],
            template="Write a more general, abstract question that concepts in this question. Original: {question} Step-back:"
        )
        chain = prompt | llm
        return chain.invoke({"question": query})
    
    def main():
        embeddings = VertexAIEmbeddings(model_name="gemini-embedding-001", project=PROJECT_ID, location=REGION)
        llm = VertexAI(model_name="gemini-2.5-flash", project=PROJECT_ID, location=REGION, temperature=0.5)
    
        pg_conn_str = f"postgresql+pg8000://{DB_USER}:{DB_PASS}@placeholder/{DB_NAME}"
        store = PGVector(
            collection_name=COLLECTION_NAME,
            embedding_function=embeddings,
            connection_string=pg_conn_str,
            engine_args={"creator": getconn}
        )
        retriever = store.as_retriever(search_kwargs={"k": 2})
    
        original_query = "Tell me about the Dursleys."
        print(f"ORIGINAL QUERY: {original_query}\n" + "-"*30)
    
        # 1. HyDE
        hyde_doc = generate_hyde_doc(original_query, llm)
        print(f"HyDE Generated Doc: {hyde_doc.strip()[:100]}...")
        hyde_results = retriever.invoke(hyde_doc)
        print(f"HyDE Retrieval: {hyde_results[0].page_content[:100]}...\n")
    
        # 2. Step-back
        step_back_q = generate_step_back(original_query, llm)
        print(f"Step-back Query: {step_back_q.strip()}")
        step_results = retriever.invoke(step_back_q)
        print(f"Step-back Retrieval: {step_results[0].page_content[:100]}...")
    
    if __name__ == "__main__":
        main()
    
  3. Führen Sie das Transformationsskript aus:
    python query_transformation.py
    

Beobachten Sie die Ausgabe.

Die Step-back-Anfrage ruft möglicherweise einen breiteren Kontext zur Familiengeschichte der Dursleys ab, während sich HyDE auf die spezifischen Details konzentriert, die in der hypothetischen Antwort generiert wurden.

10. Teil 4: End-to-End-Generierung

Wir haben unsere Daten aufgeteilt, unsere Suche verfeinert und die Anfrage des Nutzers optimiert. Jetzt kommt endlich das „G“ in RAG: Generierung.

Bis zu diesem Punkt haben wir nur Informationen gefunden. Um einen echten KI-Assistenten zu entwickeln, müssen wir diese hochwertigen, neu gerankten Dokumente in ein LLM (Gemini) eingeben, um eine Antwort in natürlicher Sprache zu synthetisieren.

In einer Produktionspipeline ist dafür ein bestimmter Ablauf erforderlich:

  1. Abrufen:Eine breite Auswahl an Kandidaten erhalten (z.B. Top 10) mithilfe der schnellen Vektorsuche.
  2. Neu ranken:Filtern Sie nach den absolut besten Ergebnissen (z.B. Top 3) mit dem Vertex AI-Reranker.
  3. Kontext erstellen:Fügen Sie den Inhalt der drei wichtigsten Dokumente zu einem einzelnen String zusammen.
  4. Grounded Prompting:Fügen Sie diesen Kontextstring in eine strenge Promptvorlage ein, die das LLM zwingt, nur diese Informationen zu verwenden.

Generierungsskript erstellen

Wir verwenden gemini-2.5-flash für den Generierungsschritt. Dieses Modell ist ideal für RAG, da es ein langes Kontextfenster und eine geringe Latenz hat, sodass mehrere abgerufene Dokumente schnell verarbeitet werden können.

  1. end_to_end_rag.py erstellen:
cloudshell edit end_to_end_rag.py
  1. Fügen Sie den folgenden Code ein. Achten Sie auf die Variable template. Hier weisen wir das Modell an, „Halluzinationen“ (Erfinden von Dingen) zu vermeiden, indem wir es an den bereitgestellten Kontext binden.
import os
import logging
from dotenv import load_dotenv
from google.cloud.sql.connector import Connector, IPTypes
from langchain_google_vertexai import VertexAIEmbeddings, VertexAI
from langchain_community.vectorstores import PGVector
from langchain.retrievers import ContextualCompressionRetriever
from langchain_google_community.vertex_rank import VertexAIRank
from langchain.prompts import PromptTemplate
from langchain.chains import LLMChain

load_dotenv()
logging.basicConfig(level=logging.ERROR)

PROJECT_ID = os.getenv("PROJECT_ID")
REGION = os.getenv("REGION")
# We use the recursive collection as it generally provides the best context boundaries
COLLECTION_NAME = f"{os.getenv('PGVECTOR_COLLECTION_NAME')}_recursive"

def getconn():
    instance_conn = f"{PROJECT_ID}:{REGION}:{os.getenv('SQL_INSTANCE_NAME')}"
    with Connector() as connector:
        return connector.connect(
            instance_conn, "pg8000",
            user=os.getenv("SQL_USER"), password=os.getenv("SQL_PASSWORD"),
            db=os.getenv("SQL_DATABASE_NAME"), ip_type=IPTypes.PUBLIC
        )

def main():
    print("--- Initializing Production RAG Pipeline ---")

    # 1. Setup Embeddings (Gemini Embedding 001)
    # We use this to vectorize the user's query to match our database.
    embeddings = VertexAIEmbeddings(model_name="gemini-embedding-001", project=PROJECT_ID, location=REGION)

    # 2. Connect to Vector Store
    pg_conn_str = f"postgresql+pg8000://{os.getenv('SQL_USER')}:{os.getenv('SQL_PASSWORD')}@placeholder/{os.getenv('SQL_DATABASE_NAME')}"
    store = PGVector(
        collection_name=COLLECTION_NAME,
        embedding_function=embeddings,
        connection_string=pg_conn_str,
        engine_args={"creator": getconn}
    )

    # 3. Setup The 'Filter Funnel' (Retriever + Reranker)
    # Step A: Fast retrieval of top 10 similar documents
    base_retriever = store.as_retriever(search_kwargs={"k": 10})

    # Step B: Precise reranking to find the top 3 most relevant
    reranker = VertexAIRank(
        project_id=PROJECT_ID,
        location_id="global", 
        ranking_config="default_ranking_config",
        title_field="source",
        top_n=3
    )

    # Combine A and B into a single retrieval object
    compression_retriever = ContextualCompressionRetriever(
        base_compressor=reranker,
        base_retriever=base_retriever
    )

    # 4. Setup LLM (Gemini 2.5 Flash)
    # We use a low temperature (0.1) to reduce creativity and increase factual adherence.
    llm = VertexAI(model_name="gemini-2.5-flash", project=PROJECT_ID, location=REGION, temperature=0.1)

    # --- Execution Loop ---
    user_query = "Who is Harry Potter?"
    print(f"\nUser Query: {user_query}")
    print("Retrieving and Reranking documents...")

    # Retrieve the most relevant documents
    top_docs = compression_retriever.invoke(user_query)

    if not top_docs:
        print("No relevant documents found.")
        return

    # Build the Context String
    # We stitch the documents together, labeling them as Source 1, Source 2, etc.
    context_str = "\n\n".join([f"Source {i+1}: {d.page_content}" for i, d in enumerate(top_docs)])

    print(f"Found {len(top_docs)} relevant context chunks.")

    # 5. The Grounded Prompt
    template = """You are a helpful assistant. Answer the question strictly based on the provided context.
    If the answer is not in the context, say "I don't know."

    Context:
    {context}

    Question:
    {question}

    Answer:
    """

    prompt = PromptTemplate(template=template, input_variables=["context", "question"])

    # Create the chain: Prompt -> LLM
    chain = prompt | llm

    print("Generating Answer via Gemini 2.5 Flash...")
    final_answer = chain.invoke({"context": context_str, "question": user_query})

    print(f"\nFINAL ANSWER:\n{final_answer}")

if __name__ == "__main__":
    main()
  1. Führen Sie die endgültige Anwendung aus:
python end_to_end_rag.py

Ausgabe verstehen

Wenn Sie dieses Skript ausführen, sehen Sie den Unterschied zwischen den abgerufenen Roh-Chunks (die Sie in den vorherigen Schritten gesehen haben) und der endgültigen Antwort. Das LLM fungiert als Synthesizer. Es liest die fragmentierten Textblöcke, die vom Reranker bereitgestellt werden, und fügt sie zu einem kohärenten, menschenlesbaren Satz zusammen.

Durch die Verkettung dieser Komponenten wird aus einer stochastischen „Vermutung“ ein deterministischer, fundierter Workflow. Der Retriever wirft das Netz aus, der Reranker wählt den besten Fang aus und der Generator bereitet das Gericht zu.

11. Fazit

Glückwunsch! Sie haben erfolgreich eine erweiterte RAG-Pipeline erstellt, die weit über die einfache Vektorsuche hinausgeht.

Zusammenfassung

  • Sie haben Cloud SQL mit pgvector für die skalierbare Vektorspeicherung konfiguriert.
  • Sie haben Chunking-Strategien verglichen, um zu verstehen, wie sich die Datenvorbereitung auf den Abruf auswirkt.
  • Sie haben Reranking mit Vertex AI implementiert, um die Genauigkeit Ihrer Ergebnisse zu verbessern.
  • Sie haben Abfragetransformationen (HyDE, Step-back) verwendet, um die Nutzerintention mit Ihren Daten in Einklang zu bringen.

Weitere Informationen

Vom Prototyp zur Produktion

Dieses Lab ist Teil des Lernpfads für produktionsreife KI mit Google Cloud.