Técnicas avanzadas de RAG

1. Introducción

Descripción general

La generación aumentada por recuperación (RAG) mejora las respuestas de los modelos de lenguaje grandes (LLM) fundamentándolas en conocimiento externo. Sin embargo, crear un sistema de RAG listo para la producción requiere más que una simple búsqueda de vectores. Debes optimizar la forma en que se ingresan los datos, cómo se clasifican los resultados relevantes y cómo se procesan las búsquedas de los usuarios.

En este lab integral, compilarás una aplicación de RAG sólida con Cloud SQL para PostgreSQL (extendido con pgvector) y Vertex AI. Avanzarás a través de tres técnicas avanzadas:

  1. Estrategias de fragmentación: Observarás cómo los diferentes métodos de división del texto (carácter, recursivo, token) afectan la calidad de la recuperación.
  2. Reclasificación: Implementarás Vertex AI Reranker para refinar los resultados de la búsqueda y abordar el problema de "pérdida en el medio".
  3. Transformación de la búsqueda: Usarás Gemini para optimizar las búsquedas de los usuarios con técnicas como HyDE (incorporación de documentos hipotéticos) y Step-back Prompting (instrucciones de retroceso).

Actividades

  • Configura una instancia de Cloud SQL para PostgreSQL con pgvector.
  • Compila una canalización de transferencia de datos que divida el texto en fragmentos con varias estrategias y almacene las incorporaciones en Cloud SQL.
  • Realizar búsquedas semánticas y comparar la calidad de los resultados de diferentes métodos de fragmentación
  • Integra un reranqueador para reordenar los documentos recuperados según su relevancia.
  • Implementa transformaciones de consultas potenciadas por LLM para mejorar la recuperación de preguntas ambiguas o complejas.

Qué aprenderás

  • Cómo usar LangChain con Vertex AI y Cloud SQL
  • El impacto de los divisores de texto Character, Recursive y Token.
  • Cómo implementar la búsqueda de vectores en PostgreSQL
  • Cómo usar ContextualCompressionRetriever para volver a clasificar
  • Cómo implementar HyDE y Step-back Prompting

2. Configura el proyecto

Cuenta de Google

Si aún no tienes una Cuenta de Google personal, debes crear una.

Usa una cuenta personal en lugar de una cuenta de trabajo o de institución educativa.

Accede a la consola de Google Cloud

Accede a la consola de Google Cloud con una Cuenta de Google personal.

Habilitar facturación

Canjea USD 5 en créditos de Google Cloud (opcional)

Para realizar este taller, necesitas una cuenta de facturación con algo de crédito. Si planeas usar tu propia facturación, puedes omitir este paso.

  1. Haz clic en este vínculo y accede con una Cuenta de Google personal. Verás algo como esto: Haz clic aquí para ir a la página de créditos
  2. Haz clic en el botón HAZ CLIC AQUÍ PARA ACCEDER A TU CRÉDITO. Esto te dirigirá a una página para configurar tu perfil de facturación Página de configuración del perfil de facturación
  3. Haz clic en Confirmar. Ahora estás conectado a una cuenta de facturación de prueba de Google Cloud Platform. Captura de pantalla de la descripción general de la facturación

Configura una cuenta de facturación personal

Si configuraste la facturación con créditos de Google Cloud, puedes omitir este paso.

Para configurar una cuenta de facturación personal, ve aquí para habilitar la facturación en la consola de Cloud.

Notas:

  • Completar este lab debería costar menos de USD 1 en recursos de Cloud.
  • Puedes seguir los pasos al final de este lab para borrar recursos y evitar cargos adicionales.
  • Los usuarios nuevos pueden acceder a la prueba gratuita de USD 300.

Crear un proyecto (opcional)

Si no tienes un proyecto actual que quieras usar para este lab, crea uno nuevo aquí.

3. Abre el editor de Cloud Shell

  1. Haz clic en este vínculo para navegar directamente al editor de Cloud Shell.
  2. Si se te solicita autorización en algún momento, haz clic en Autorizar para continuar.Haz clic para autorizar Cloud Shell
  3. Si la terminal no aparece en la parte inferior de la pantalla, ábrela:
    • Haz clic en Ver.
    • Haz clic en Terminal.Abre una terminal nueva en el editor de Cloud Shell
  4. En la terminal, configura tu proyecto con este comando:
    gcloud config set project [PROJECT_ID]
    
    • Ejemplo:
      gcloud config set project lab-project-id-example
      
    • Si no recuerdas el ID de tu proyecto, puedes enumerar todos tus IDs de proyecto con el siguiente comando:
      gcloud projects list
      
      Establece el ID del proyecto en la terminal del editor de Cloud Shell
  5. Deberías ver el siguiente mensaje:
    Updated property [core/project].
    

4. Habilita las APIs

Para compilar esta solución, debes habilitar varias APIs de Google Cloud para Vertex AI, Cloud SQL y el servicio de reranking.

  1. En la terminal, habilita las APIs:
    gcloud services enable \
      aiplatform.googleapis.com \
      sqladmin.googleapis.com \
      cloudresourcemanager.googleapis.com \
      serviceusage.googleapis.com \
      discoveryengine.googleapis.com
    
    
    

Presentamos las APIs

  • API de Vertex AI (aiplatform.googleapis.com): Permite usar Gemini para la generación y Vertex AI Embeddings para vectorizar texto.
  • API de Cloud SQL Admin (sqladmin.googleapis.com): Te permite administrar instancias de Cloud SQL de forma programática.
  • API de Discovery Engine (discoveryengine.googleapis.com): Potencia las capacidades de Vertex AI Reranker.
  • API de Service Usage (serviceusage.googleapis.com): Se requiere para verificar y administrar las cuotas de servicio.

5. Crea un entorno virtual y, luego, instala las dependencias

Antes de comenzar cualquier proyecto de Python, es una buena práctica crear un entorno virtual. Esto aísla las dependencias del proyecto y evita conflictos con otros proyectos o con los paquetes globales de Python del sistema.

  1. Crea una carpeta llamada rag-labs y cámbiate a ella. Ejecuta el siguiente código en la terminal:
    mkdir rag-labs && cd rag-labs
    
  2. Crea y activa un entorno virtual:
    uv venv --python 3.12
    source .venv/bin/activate
    
  3. Crea un archivo requirements.txt con las dependencias necesarias. Ejecuta el siguiente código en la terminal:
    cloudshell edit requirements.txt
    
  4. Pega las siguientes dependencias optimizadas en requirements.txt. Estas versiones se fijan para evitar conflictos y acelerar la instalación.
    # Core LangChain & AI
    langchain-community==0.3.31
    langchain-google-vertexai==2.1.2
    langchain-google-community[vertexaisearch]==2.0.10
    
    # Google Cloud
    google-cloud-storage==2.19.0
    google-cloud-aiplatform[langchain]==1.130.0
    
    # Database
    cloud-sql-python-connector[pg8000]==1.19.0
    sqlalchemy==2.0.45
    pgvector==0.4.2
    
    # Utilities
    tiktoken==0.12.0
    python-dotenv==1.2.1
    requests==2.32.5
    
  5. Instala las dependencias:
    uv pip install -r requirements.txt
    

6. Configura Cloud SQL para PostgreSQL

En esta tarea, aprovisionarás una instancia de Cloud SQL para PostgreSQL, crearás una base de datos y la prepararás para la búsqueda de vectores.

Define la configuración de Cloud SQL

  1. Crea un archivo .env para almacenar tu configuración. Ejecuta el siguiente código en la terminal:
    cloudshell edit .env
    
  2. Pega la siguiente configuración en .env.
    # Project Config
    PROJECT_ID="[YOUR_PROJECT_ID]"
    REGION="us-central1"
    
    # Database Config
    SQL_INSTANCE_NAME="rag-pg-instance-1"
    SQL_DATABASE_NAME="rag_harry_potter_db"
    SQL_USER="rag_user"
    SQL_PASSWORD="StrongPassword123!" 
    
    # RAG Config
    PGVECTOR_COLLECTION_NAME="rag_harry_potter"
    RANKING_LOCATION_ID="global"
    
    # Connection Name (Auto-generated in scripts usually, but useful to have)
    DB_INSTANCE_CONNECTION_NAME="${PROJECT_ID}:${REGION}:${SQL_INSTANCE_NAME}"
    
  3. Reemplaza [YOUR_PROJECT_ID] por el ID de tu proyecto de Google Cloud. (p.ej., PROJECT_ID = "google-cloud-labs")
    Si no recuerdas el ID de tu proyecto, ejecuta el siguiente comando en tu terminal. Se mostrará una lista de todos tus proyectos y sus IDs.
    gcloud projects list
    
  4. Carga las variables en tu sesión de shell:
    source .env
    

Crea la instancia y la base de datos

  1. Crea una instancia de Cloud SQL para PostgreSQL. Este comando crea una instancia pequeña adecuada para este lab.
    gcloud sql instances create ${SQL_INSTANCE_NAME} \
      --database-version=POSTGRES_15 \
      --tier=db-g1-small \
      --region=${REGION} \
      --project=${PROJECT_ID}
    
  2. Una vez que la instancia esté lista, crea la base de datos:
    gcloud sql databases create ${SQL_DATABASE_NAME} \
      --instance=${SQL_INSTANCE_NAME} \
      --project=${PROJECT_ID}
    
  3. Crea el usuario de la base de datos:
    gcloud sql users create ${SQL_USER} \
      --instance=${SQL_INSTANCE_NAME} \
      --password=${SQL_PASSWORD} \
      --project=${PROJECT_ID}
    

Habilita la extensión pgvector

La extensión pgvector permite que PostgreSQL almacene y busque embeddings de vectores. Debes habilitarlo de forma explícita en tu base de datos.

  1. Crea un script llamado enable_pgvector.py. Ejecuta el siguiente código en la terminal:
    cloudshell edit enable_pgvector.py
    
  2. Pega el siguiente código en enable_pgvector.py. Esta secuencia de comandos se conecta a tu base de datos y ejecuta CREATE EXTENSION IF NOT EXISTS vector;.
    import os
    import sqlalchemy
    from google.cloud.sql.connector import Connector, IPTypes
    import logging
    from dotenv import load_dotenv
    
    load_dotenv()
    logging.basicConfig(level=logging.INFO)
    
    # Config
    project_id = os.getenv("PROJECT_ID")
    region = os.getenv("REGION")
    instance_name = os.getenv("SQL_INSTANCE_NAME")
    db_user = os.getenv("SQL_USER")
    db_pass = os.getenv("SQL_PASSWORD")
    db_name = os.getenv("SQL_DATABASE_NAME")
    instance_connection_name = f"{project_id}:{region}:{instance_name}"
    
    def getconn():
        with Connector() as connector:
            conn = connector.connect(
                instance_connection_name,
                "pg8000",
                user=db_user,
                password=db_pass,
                db=db_name,
                ip_type=IPTypes.PUBLIC,
            )
            return conn
    
    def enable_pgvector():
        pool = sqlalchemy.create_engine(
            "postgresql+pg8000://",
            creator=getconn,
        )
        with pool.connect() as db_conn:
            # Check if extension exists
            result = db_conn.execute(sqlalchemy.text("SELECT extname FROM pg_extension WHERE extname = 'vector';")).fetchone()
            if result:
                logging.info("pgvector extension is already enabled.")
            else:
                logging.info("Enabling pgvector extension...")
                db_conn.execute(sqlalchemy.text("CREATE EXTENSION IF NOT EXISTS vector;"))
                db_conn.commit()
                logging.info("pgvector extension enabled successfully.")
    
    if __name__ == "__main__":
        enable_pgvector()
    
  3. Ejecuta la secuencia de comandos:
    python enable_pgvector.py
    

7. Parte 1: Estrategias de fragmentación

El primer paso en cualquier canalización de RAG es transformar los documentos en un formato que el LLM pueda entender: fragmentos.

Los LLMs tienen un límite de ventana de contexto (la cantidad de texto que pueden procesar a la vez). Además, recuperar un documento de 50 páginas para responder una pregunta específica diluye la información. Dividimos los documentos en "fragmentos" más pequeños para aislar la información pertinente.

Sin embargo, cómo divides el texto es muy importante:

  • Divisor de caracteres: Divide estrictamente según el recuento de caracteres. Es rápido, pero riesgoso, ya que puede cortar palabras o frases por la mitad y destruir el significado semántico.
  • Recursive Splitter: Intenta dividir primero por párrafo, luego por oración y, por último, por palabra. Intenta mantener juntas las unidades semánticas.
  • Token Splitter: Realiza la división según el vocabulario propio del LLM (tokens). Esto garantiza que los fragmentos se ajusten perfectamente a las ventanas de contexto, pero puede ser más costoso desde el punto de vista computacional generarlos.

En esta sección, transferirás los mismos datos con las tres estrategias para compararlas.

Crea la secuencia de comandos de transferencia

Usarás un script que descarga un conjunto de datos de Harry Potter, lo divide con las estrategias Character, Recursive y Token, y sube las incorporaciones a tres tablas separadas en Cloud SQL.

  1. Crea el archivo ingest_data.py:
    cloudshell edit ingest_data.py
    
  2. Pega el siguiente código corregido en ingest_data.py. Esta versión analiza correctamente la estructura JSON del conjunto de datos.
    import os
    import json
    import logging
    import requests
    from typing import List, Dict, Any
    from dotenv import load_dotenv
    
    from google.cloud.sql.connector import Connector, IPTypes
    from langchain_google_vertexai import VertexAIEmbeddings
    from langchain_community.vectorstores import PGVector
    from langchain.text_splitter import CharacterTextSplitter, RecursiveCharacterTextSplitter, TokenTextSplitter
    from langchain.docstore.document import Document
    
    load_dotenv()
    logging.basicConfig(level=logging.INFO)
    
    # Configuration
    PROJECT_ID = os.getenv("PROJECT_ID")
    REGION = os.getenv("REGION")
    DB_USER = os.getenv("SQL_USER")
    DB_PASS = os.getenv("SQL_PASSWORD")
    DB_NAME = os.getenv("SQL_DATABASE_NAME")
    INSTANCE_CONNECTION_NAME = f"{PROJECT_ID}:{REGION}:{os.getenv('SQL_INSTANCE_NAME')}"
    BASE_COLLECTION_NAME = os.getenv("PGVECTOR_COLLECTION_NAME")
    BOOKS_JSON_URL = "https://storage.googleapis.com/github-repo/generative-ai/gemini/reasoning-engine/sample_data/harry_potter_books.json"
    
    CHUNK_SIZE = 500
    CHUNK_OVERLAP = 50
    MAX_DOCS_TO_PROCESS = 10 
    
    # Database Connector
    def getconn():
        with Connector() as connector:
            return connector.connect(
                INSTANCE_CONNECTION_NAME,
                "pg8000",
                user=DB_USER,
                password=DB_PASS,
                db=DB_NAME,
                ip_type=IPTypes.PUBLIC,
            )
    
    def download_data():
        logging.info(f"Downloading data from {BOOKS_JSON_URL}...")
        response = requests.get(BOOKS_JSON_URL)
        return response.json()
    
    def prepare_chunks(json_data, strategy):
        documents = []
    
        # Iterate through the downloaded data
        for entry in json_data[:MAX_DOCS_TO_PROCESS]:
    
            # --- JSON PARSING LOGIC ---
            # The data structure nests content inside 'kwargs' -> 'page_content'
            if "kwargs" in entry and "page_content" in entry["kwargs"]:
                content = entry["kwargs"]["page_content"]
    
                # Extract metadata if available, ensuring it's a dict
                metadata = entry["kwargs"].get("metadata", {})
                if not isinstance(metadata, dict):
                    metadata = {"source": "unknown"}
    
                # Add the strategy to metadata for tracking
                metadata["strategy"] = strategy
            else:
                continue
    
            if not content:
                continue
    
            # Choose the splitter based on the strategy
            if strategy == "character":
                splitter = CharacterTextSplitter(chunk_size=CHUNK_SIZE, chunk_overlap=CHUNK_OVERLAP, separator="\n")
            elif strategy == "token":
                splitter = TokenTextSplitter(chunk_size=CHUNK_SIZE, chunk_overlap=CHUNK_OVERLAP)
            else: # default to recursive
                splitter = RecursiveCharacterTextSplitter(chunk_size=CHUNK_SIZE, chunk_overlap=CHUNK_OVERLAP)
    
            # Split the content into chunks
            chunks = splitter.split_text(content)
    
            # Create Document objects for each chunk
            for chunk in chunks:
                documents.append(Document(page_content=chunk, metadata=metadata))
    
        return documents
    
    def main():
        logging.info("Initializing Embeddings...")
        embeddings = VertexAIEmbeddings(model_name="gemini-embedding-001", project=PROJECT_ID, location=REGION)
    
        data = download_data()
        strategies = ["character", "recursive", "token"]
    
        # Connection string for PGVector (uses the getconn helper)
        pg_conn_str = f"postgresql+pg8000://{DB_USER}:{DB_PASS}@placeholder/{DB_NAME}"
    
        for strategy in strategies:
            collection_name = f"{BASE_COLLECTION_NAME}_{strategy}"
            logging.info(f"--- Processing strategy: {strategy.upper()} ---")
            logging.info(f"Target Collection: {collection_name}")
    
            # Prepare documents with the specific strategy
            docs = prepare_chunks(data, strategy)
    
            if not docs:
                logging.warning(f"No documents generated for strategy {strategy}. Check data source.")
                continue
    
            logging.info(f"Generated {len(docs)} chunks. Uploading to Cloud SQL...")
    
            # Initialize the Vector Store
            store = PGVector(
                collection_name=collection_name,
                embedding_function=embeddings,
                connection_string=pg_conn_str,
                engine_args={"creator": getconn},
                pre_delete_collection=True # Clears old data for this collection before adding new
            )
    
            # Batch add documents
            store.add_documents(docs)
            logging.info(f"Successfully finished {strategy}.\n")
    
    if __name__ == "__main__":
        main()
    
  3. Ejecuta la secuencia de comandos de transferencia. Esto completará tu base de datos con tres tablas (colecciones) diferentes.
    python ingest_data.py
    

Compara los resultados de la división en fragmentos

Ahora que se cargaron los datos, ejecutemos una consulta en las tres colecciones para ver cómo la estrategia de fragmentación afecta los resultados.

  1. Crea query_chunking.py:
    cloudshell edit query_chunking.py
    
  2. Pega el siguiente código en query_chunking.py:
    import os
    import logging
    from dotenv import load_dotenv
    from google.cloud.sql.connector import Connector, IPTypes
    from langchain_google_vertexai import VertexAIEmbeddings
    from langchain_community.vectorstores import PGVector
    
    load_dotenv()
    logging.basicConfig(level=logging.ERROR) # Only show errors to keep output clean
    
    # Config
    PROJECT_ID = os.getenv("PROJECT_ID")
    REGION = os.getenv("REGION")
    DB_USER = os.getenv("SQL_USER")
    DB_PASS = os.getenv("SQL_PASSWORD")
    DB_NAME = os.getenv("SQL_DATABASE_NAME")
    INSTANCE_CONNECTION_NAME = f"{PROJECT_ID}:{REGION}:{os.getenv('SQL_INSTANCE_NAME')}"
    BASE_COLLECTION_NAME = os.getenv("PGVECTOR_COLLECTION_NAME")
    
    def getconn():
        with Connector() as connector:
            return connector.connect(
                INSTANCE_CONNECTION_NAME,
                "pg8000",
                user=DB_USER,
                password=DB_PASS,
                db=DB_NAME,
                ip_type=IPTypes.PUBLIC,
            )
    
    def main():
        embeddings = VertexAIEmbeddings(model_name="gemini-embedding-001", project=PROJECT_ID, location=REGION)
        pg_conn_str = f"postgresql+pg8000://{DB_USER}:{DB_PASS}@placeholder/{DB_NAME}"
    
        query = "Tell me about the Dursleys and their relationship with Harry Potter"
        print(f"\nQUERY: {query}\n" + "="*50)
    
        strategies = ["character", "recursive", "token"]
    
        for strategy in strategies:
            collection = f"{BASE_COLLECTION_NAME}_{strategy}"
            print(f"\nSTRATEGY: {strategy.upper()}")
    
            store = PGVector(
                collection_name=collection,
                embedding_function=embeddings,
                connection_string=pg_conn_str,
                engine_args={"creator": getconn}
            )
    
            results = store.similarity_search_with_score(query, k=2)
            for i, (doc, score) in enumerate(results):
                print(f"  Result {i+1} (Score: {score:.4f}): {doc.page_content[:150].replace(chr(10), ' ')}...")
    
    if __name__ == "__main__":
        main()
    
  3. Ejecuta la secuencia de comandos de consulta:
    python query_chunking.py
    

Observa el resultado.

Observa cómo la división Character podría cortar oraciones a mitad de una idea, mientras que Recursive intenta respetar los límites de los párrafos. La división de tokens garantiza que los fragmentos se ajusten perfectamente a las ventanas de contexto de los LLM, pero puede ignorar la estructura semántica.

8. Parte 2: Clasificación de nuevo

La búsqueda de vectores (recuperación) es increíblemente rápida porque se basa en representaciones matemáticas comprimidas (embeddings). Abarca una amplia variedad de resultados para garantizar la Recuperación (encontrar todos los elementos potencialmente relevantes), pero a menudo tiene una Precisión baja (la clasificación de esos elementos puede ser imperfecta).

A menudo, los documentos relevantes se “pierden en el medio” de la lista de resultados. Un LLM que presta atención a los 5 primeros resultados podría perderse la respuesta crucial que se encuentra en la posición núm. 7.

El nuevo ranking resuelve este problema agregando una segunda etapa.

  1. Recuperador: Recupera un conjunto más grande (p.ej., los 25 primeros) con una búsqueda vectorial rápida.
  2. Clasificador secundario: Usa un modelo especializado (como un codificador cruzado) para examinar el texto completo de los pares de documentos y búsquedas. Es más lento, pero mucho más preciso. Vuelve a puntuar los 25 primeros y devuelve los 3 mejores absolutos.

En esta tarea, buscarás en la colección recursive que creaste en la Parte 1, pero esta vez aplicarás el Vertex AI Reranker para refinar los resultados.

  1. Crea query_reranking.py:
    cloudshell edit query_reranking.py
    
  2. Pega el siguiente código. Observa cómo se segmenta explícitamente para la colección _recursive y usa ContextualCompressionRetriever.
    import os
    import logging
    from dotenv import load_dotenv
    from google.cloud.sql.connector import Connector, IPTypes
    from langchain_google_vertexai import VertexAIEmbeddings
    from langchain_community.vectorstores import PGVector
    
    # Reranking Imports
    from langchain.retrievers import ContextualCompressionRetriever
    from langchain_google_community.vertex_rank import VertexAIRank
    
    load_dotenv()
    logging.basicConfig(level=logging.ERROR)
    
    PROJECT_ID = os.getenv("PROJECT_ID")
    REGION = os.getenv("REGION")
    DB_USER = os.getenv("SQL_USER")
    DB_PASS = os.getenv("SQL_PASSWORD")
    DB_NAME = os.getenv("SQL_DATABASE_NAME")
    INSTANCE_CONNECTION_NAME = f"{PROJECT_ID}:{REGION}:{os.getenv('SQL_INSTANCE_NAME')}"
    
    # IMPORTANT: Target the recursive collection created in ingest_data.py
    COLLECTION_NAME = f"{os.getenv('PGVECTOR_COLLECTION_NAME')}_recursive"
    RANKING_LOCATION = os.getenv("RANKING_LOCATION_ID")
    
    def getconn():
        with Connector() as connector:
            return connector.connect(
                INSTANCE_CONNECTION_NAME,
                "pg8000",
                user=DB_USER,
                password=DB_PASS,
                db=DB_NAME,
                ip_type=IPTypes.PUBLIC,
            )
    
    def main():
        embeddings = VertexAIEmbeddings(model_name="gemini-embedding-001", project=PROJECT_ID, location=REGION)
        pg_conn_str = f"postgresql+pg8000://{DB_USER}:{DB_PASS}@placeholder/{DB_NAME}"
    
        print(f"Connecting to collection: {COLLECTION_NAME}")
        store = PGVector(
            collection_name=COLLECTION_NAME,
            embedding_function=embeddings,
            connection_string=pg_conn_str,
            engine_args={"creator": getconn}
        )
    
        query = "What are the Horcruxes?"
        print(f"QUERY: {query}\n")
    
        # 1. Base Retriever (Vector Search) - Fetch top 10
        base_retriever = store.as_retriever(search_kwargs={"k": 10})
    
        # 2. Reranker - Select top 3 from the 10
        reranker = VertexAIRank(
            project_id=PROJECT_ID,
            location_id=RANKING_LOCATION,
            ranking_config="default_ranking_config",
            title_field="source",
            top_n=3
        )
    
        compression_retriever = ContextualCompressionRetriever(
            base_compressor=reranker,
            base_retriever=base_retriever
        )
    
        # Execute
        try:
            reranked_docs = compression_retriever.invoke(query)
    
            if not reranked_docs:
                print("No documents returned. Check if the collection exists and is populated.")
    
            print(f"--- Top 3 Reranked Results ---")
            for i, doc in enumerate(reranked_docs):
                print(f"Result {i+1} (Score: {doc.metadata.get('relevance_score', 'N/A')}):")
                print(f"  {doc.page_content[:200]}...\n")
        except Exception as e:
            print(f"Error during reranking: {e}")
    
    if __name__ == "__main__":
        main()
    
  3. Ejecuta la consulta de nuevo ranking:
    python query_reranking.py
    

Observar

Es posible que observes puntuaciones de relevancia más altas o un orden diferente en comparación con una búsqueda de vectores sin procesar. Esto garantiza que el LLM reciba el contexto más preciso posible.

9. Parte 3: Transformación de la consulta

A menudo, el mayor cuello de botella en la RAG es el usuario. Las búsquedas de los usuarios suelen ser ambiguas, incompletas o estar mal redactadas. Si la incorporación de la búsqueda no se alinea matemáticamente con la incorporación del documento, la recuperación falla.

La Transformación de la búsqueda usa un LLM para reescribir o expandir la búsqueda antes de que llegue a la base de datos. Implementarás dos técnicas:

  • HyDE (incorporación de documentos hipotéticos): La similitud vectorial entre una pregunta y una respuesta suele ser menor que la similitud entre una respuesta y una respuesta hipotética. HyDE le pide al LLM que alucine una respuesta perfecta, la incorpora y busca documentos que se parezcan a la alucinación.
  • Indicaciones de retroceso: Si un usuario hace una pregunta específica y detallada, es posible que el sistema no capte el contexto más amplio. Las instrucciones de retroceso le piden al LLM que genere una pregunta abstracta de nivel superior ("¿Cuál es la historia de esta familia?") para recuperar información fundamental junto con los detalles específicos.
  1. Crea query_transformation.py:
    cloudshell edit query_transformation.py
    
  2. Pega el siguiente código:
    import os
    import logging
    from dotenv import load_dotenv
    from google.cloud.sql.connector import Connector, IPTypes
    from langchain_google_vertexai import VertexAIEmbeddings, VertexAI
    from langchain_community.vectorstores import PGVector
    from langchain_core.prompts import PromptTemplate
    
    load_dotenv()
    logging.basicConfig(level=logging.ERROR)
    
    PROJECT_ID = os.getenv("PROJECT_ID")
    REGION = os.getenv("REGION")
    DB_USER = os.getenv("SQL_USER")
    DB_PASS = os.getenv("SQL_PASSWORD")
    DB_NAME = os.getenv("SQL_DATABASE_NAME")
    INSTANCE_CONNECTION_NAME = f"{PROJECT_ID}:{REGION}:{os.getenv('SQL_INSTANCE_NAME')}"
    COLLECTION_NAME = f"{os.getenv('PGVECTOR_COLLECTION_NAME')}_recursive"
    
    def getconn():
        with Connector() as connector:
            return connector.connect(
                INSTANCE_CONNECTION_NAME,
                "pg8000",
                user=DB_USER,
                password=DB_PASS,
                db=DB_NAME,
                ip_type=IPTypes.PUBLIC,
            )
    
    def generate_hyde_doc(query, llm):
        prompt = PromptTemplate(
            input_variables=["question"],
            template="Write a concise, hypothetical answer to the question. Question: {question} Answer:"
        )
        chain = prompt | llm
        return chain.invoke({"question": query})
    
    def generate_step_back(query, llm):
        prompt = PromptTemplate(
            input_variables=["question"],
            template="Write a more general, abstract question that concepts in this question. Original: {question} Step-back:"
        )
        chain = prompt | llm
        return chain.invoke({"question": query})
    
    def main():
        embeddings = VertexAIEmbeddings(model_name="gemini-embedding-001", project=PROJECT_ID, location=REGION)
        llm = VertexAI(model_name="gemini-2.5-flash", project=PROJECT_ID, location=REGION, temperature=0.5)
    
        pg_conn_str = f"postgresql+pg8000://{DB_USER}:{DB_PASS}@placeholder/{DB_NAME}"
        store = PGVector(
            collection_name=COLLECTION_NAME,
            embedding_function=embeddings,
            connection_string=pg_conn_str,
            engine_args={"creator": getconn}
        )
        retriever = store.as_retriever(search_kwargs={"k": 2})
    
        original_query = "Tell me about the Dursleys."
        print(f"ORIGINAL QUERY: {original_query}\n" + "-"*30)
    
        # 1. HyDE
        hyde_doc = generate_hyde_doc(original_query, llm)
        print(f"HyDE Generated Doc: {hyde_doc.strip()[:100]}...")
        hyde_results = retriever.invoke(hyde_doc)
        print(f"HyDE Retrieval: {hyde_results[0].page_content[:100]}...\n")
    
        # 2. Step-back
        step_back_q = generate_step_back(original_query, llm)
        print(f"Step-back Query: {step_back_q.strip()}")
        step_results = retriever.invoke(step_back_q)
        print(f"Step-back Retrieval: {step_results[0].page_content[:100]}...")
    
    if __name__ == "__main__":
        main()
    
  3. Ejecuta la secuencia de comandos de transformación:
    python query_transformation.py
    

Observa el resultado.

Observa cómo la búsqueda de Step-back puede recuperar un contexto más amplio sobre la historia familiar de los Dursley, mientras que HyDE se enfoca en los detalles específicos que se generan en la respuesta hipotética.

10. Parte 4: Generación de extremo a extremo

Dividimos nuestros datos, refinamos nuestra búsqueda y pulimos la búsqueda del usuario. Ahora, finalmente, agregamos la "G" a RAG: Generación.

Hasta ahora, solo hemos estado encontrando información. Para crear un verdadero asistente de IA, debemos ingresar esos documentos de alta calidad y con una nueva clasificación en un LLM (Gemini) para sintetizar una respuesta en lenguaje natural.

En una canalización de producción, esto implica un flujo específico:

  1. Recuperación: Obtén un amplio conjunto de candidatos (p.ej., Top 10) con la búsqueda de vectores rápida.
  2. Volver a clasificar: Filtra hasta obtener los mejores resultados absolutos (p.ej., Los 3 primeros) con Vertex AI Reranker.
  3. Construcción del contexto: Une el contenido de los 3 documentos principales en una sola cadena.
  4. Instrucciones fundamentadas: Inserta esa cadena de contexto en una plantilla de instrucciones estricta que obligue al LLM a usar solo esa información.

Crea la secuencia de comandos de generación

Usaremos gemini-2.5-flash para el paso de generación. Este modelo es ideal para RAG porque tiene una ventana de contexto larga y baja latencia, lo que le permite procesar varios documentos recuperados rápidamente.

  1. Crea end_to_end_rag.py:
cloudshell edit end_to_end_rag.py
  1. Pega el siguiente código. Presta atención a la variable template, ya que es donde le indicamos estrictamente al modelo que evite las "alucinaciones" (inventar cosas) vinculándolo al contexto proporcionado.
import os
import logging
from dotenv import load_dotenv
from google.cloud.sql.connector import Connector, IPTypes
from langchain_google_vertexai import VertexAIEmbeddings, VertexAI
from langchain_community.vectorstores import PGVector
from langchain.retrievers import ContextualCompressionRetriever
from langchain_google_community.vertex_rank import VertexAIRank
from langchain.prompts import PromptTemplate
from langchain.chains import LLMChain

load_dotenv()
logging.basicConfig(level=logging.ERROR)

PROJECT_ID = os.getenv("PROJECT_ID")
REGION = os.getenv("REGION")
# We use the recursive collection as it generally provides the best context boundaries
COLLECTION_NAME = f"{os.getenv('PGVECTOR_COLLECTION_NAME')}_recursive"

def getconn():
    instance_conn = f"{PROJECT_ID}:{REGION}:{os.getenv('SQL_INSTANCE_NAME')}"
    with Connector() as connector:
        return connector.connect(
            instance_conn, "pg8000",
            user=os.getenv("SQL_USER"), password=os.getenv("SQL_PASSWORD"),
            db=os.getenv("SQL_DATABASE_NAME"), ip_type=IPTypes.PUBLIC
        )

def main():
    print("--- Initializing Production RAG Pipeline ---")

    # 1. Setup Embeddings (Gemini Embedding 001)
    # We use this to vectorize the user's query to match our database.
    embeddings = VertexAIEmbeddings(model_name="gemini-embedding-001", project=PROJECT_ID, location=REGION)

    # 2. Connect to Vector Store
    pg_conn_str = f"postgresql+pg8000://{os.getenv('SQL_USER')}:{os.getenv('SQL_PASSWORD')}@placeholder/{os.getenv('SQL_DATABASE_NAME')}"
    store = PGVector(
        collection_name=COLLECTION_NAME,
        embedding_function=embeddings,
        connection_string=pg_conn_str,
        engine_args={"creator": getconn}
    )

    # 3. Setup The 'Filter Funnel' (Retriever + Reranker)
    # Step A: Fast retrieval of top 10 similar documents
    base_retriever = store.as_retriever(search_kwargs={"k": 10})

    # Step B: Precise reranking to find the top 3 most relevant
    reranker = VertexAIRank(
        project_id=PROJECT_ID,
        location_id="global", 
        ranking_config="default_ranking_config",
        title_field="source",
        top_n=3
    )

    # Combine A and B into a single retrieval object
    compression_retriever = ContextualCompressionRetriever(
        base_compressor=reranker,
        base_retriever=base_retriever
    )

    # 4. Setup LLM (Gemini 2.5 Flash)
    # We use a low temperature (0.1) to reduce creativity and increase factual adherence.
    llm = VertexAI(model_name="gemini-2.5-flash", project=PROJECT_ID, location=REGION, temperature=0.1)

    # --- Execution Loop ---
    user_query = "Who is Harry Potter?"
    print(f"\nUser Query: {user_query}")
    print("Retrieving and Reranking documents...")

    # Retrieve the most relevant documents
    top_docs = compression_retriever.invoke(user_query)

    if not top_docs:
        print("No relevant documents found.")
        return

    # Build the Context String
    # We stitch the documents together, labeling them as Source 1, Source 2, etc.
    context_str = "\n\n".join([f"Source {i+1}: {d.page_content}" for i, d in enumerate(top_docs)])

    print(f"Found {len(top_docs)} relevant context chunks.")

    # 5. The Grounded Prompt
    template = """You are a helpful assistant. Answer the question strictly based on the provided context.
    If the answer is not in the context, say "I don't know."

    Context:
    {context}

    Question:
    {question}

    Answer:
    """

    prompt = PromptTemplate(template=template, input_variables=["context", "question"])

    # Create the chain: Prompt -> LLM
    chain = prompt | llm

    print("Generating Answer via Gemini 2.5 Flash...")
    final_answer = chain.invoke({"context": context_str, "question": user_query})

    print(f"\nFINAL ANSWER:\n{final_answer}")

if __name__ == "__main__":
    main()
  1. Ejecuta la aplicación final:
python end_to_end_rag.py

Cómo comprender el resultado

Cuando ejecutes este script, observa la diferencia entre los fragmentos recuperados sin procesar (que viste en los pasos anteriores) y la respuesta final. El LLM actúa como un sintetizador: lee los "fragmentos" de texto proporcionados por el rerank y los une en una oración coherente y legible para los humanos.

Al encadenar estos componentes, pasas de una "suposición" estocástica a un flujo de trabajo determinístico y fundamentado. El Recuperador lanza la red, el Clasificador selecciona la mejor captura y el Generador cocina la comida.

11. Conclusión

¡Felicitaciones! Creaste con éxito una canalización de RAG avanzada que va mucho más allá de la búsqueda de vectores básica.

Resumen

  • Configuraste Cloud SQL con pgvector para el almacenamiento de vectores escalable.
  • Comparaste las estrategias de fragmentación para comprender cómo la preparación de datos afecta la recuperación.
  • Implementaste la función Reranking con Vertex AI para mejorar la precisión de tus resultados.
  • Utilizaste transformaciones de consultas (HyDE, Step-back) para alinear la intención del usuario con tus datos.

Más información

Del prototipo a la producción

Este lab forma parte de la ruta de aprendizaje de IA lista para producción con Google Cloud.