1. Introducción
Descripción general
La generación aumentada por recuperación (RAG) mejora las respuestas de los modelos de lenguaje grandes (LLM) fundamentándolas en conocimiento externo. Sin embargo, crear un sistema de RAG listo para la producción requiere más que una simple búsqueda de vectores. Debes optimizar la forma en que se ingresan los datos, cómo se clasifican los resultados relevantes y cómo se procesan las búsquedas de los usuarios.
En este lab integral, compilarás una aplicación de RAG sólida con Cloud SQL para PostgreSQL (extendido con pgvector) y Vertex AI. Avanzarás a través de tres técnicas avanzadas:
- Estrategias de fragmentación: Observarás cómo los diferentes métodos de división del texto (carácter, recursivo, token) afectan la calidad de la recuperación.
- Reclasificación: Implementarás Vertex AI Reranker para refinar los resultados de la búsqueda y abordar el problema de "pérdida en el medio".
- Transformación de la búsqueda: Usarás Gemini para optimizar las búsquedas de los usuarios con técnicas como HyDE (incorporación de documentos hipotéticos) y Step-back Prompting (instrucciones de retroceso).
Actividades
- Configura una instancia de Cloud SQL para PostgreSQL con
pgvector. - Compila una canalización de transferencia de datos que divida el texto en fragmentos con varias estrategias y almacene las incorporaciones en Cloud SQL.
- Realizar búsquedas semánticas y comparar la calidad de los resultados de diferentes métodos de fragmentación
- Integra un reranqueador para reordenar los documentos recuperados según su relevancia.
- Implementa transformaciones de consultas potenciadas por LLM para mejorar la recuperación de preguntas ambiguas o complejas.
Qué aprenderás
- Cómo usar LangChain con Vertex AI y Cloud SQL
- El impacto de los divisores de texto Character, Recursive y Token.
- Cómo implementar la búsqueda de vectores en PostgreSQL
- Cómo usar ContextualCompressionRetriever para volver a clasificar
- Cómo implementar HyDE y Step-back Prompting
2. Configura el proyecto
Cuenta de Google
Si aún no tienes una Cuenta de Google personal, debes crear una.
Usa una cuenta personal en lugar de una cuenta de trabajo o de institución educativa.
Accede a la consola de Google Cloud
Accede a la consola de Google Cloud con una Cuenta de Google personal.
Habilitar facturación
Canjea USD 5 en créditos de Google Cloud (opcional)
Para realizar este taller, necesitas una cuenta de facturación con algo de crédito. Si planeas usar tu propia facturación, puedes omitir este paso.
- Haz clic en este vínculo y accede con una Cuenta de Google personal. Verás algo como esto:

- Haz clic en el botón HAZ CLIC AQUÍ PARA ACCEDER A TU CRÉDITO. Esto te dirigirá a una página para configurar tu perfil de facturación

- Haz clic en Confirmar. Ahora estás conectado a una cuenta de facturación de prueba de Google Cloud Platform.

Configura una cuenta de facturación personal
Si configuraste la facturación con créditos de Google Cloud, puedes omitir este paso.
Para configurar una cuenta de facturación personal, ve aquí para habilitar la facturación en la consola de Cloud.
Notas:
- Completar este lab debería costar menos de USD 1 en recursos de Cloud.
- Puedes seguir los pasos al final de este lab para borrar recursos y evitar cargos adicionales.
- Los usuarios nuevos pueden acceder a la prueba gratuita de USD 300.
Crear un proyecto (opcional)
Si no tienes un proyecto actual que quieras usar para este lab, crea uno nuevo aquí.
3. Abre el editor de Cloud Shell
- Haz clic en este vínculo para navegar directamente al editor de Cloud Shell.
- Si se te solicita autorización en algún momento, haz clic en Autorizar para continuar.

- Si la terminal no aparece en la parte inferior de la pantalla, ábrela:
- Haz clic en Ver.
- Haz clic en Terminal.

- En la terminal, configura tu proyecto con este comando:
gcloud config set project [PROJECT_ID]- Ejemplo:
gcloud config set project lab-project-id-example - Si no recuerdas el ID de tu proyecto, puedes enumerar todos tus IDs de proyecto con el siguiente comando:
gcloud projects list
- Ejemplo:
- Deberías ver el siguiente mensaje:
Updated property [core/project].
4. Habilita las APIs
Para compilar esta solución, debes habilitar varias APIs de Google Cloud para Vertex AI, Cloud SQL y el servicio de reranking.
- En la terminal, habilita las APIs:
gcloud services enable \ aiplatform.googleapis.com \ sqladmin.googleapis.com \ cloudresourcemanager.googleapis.com \ serviceusage.googleapis.com \ discoveryengine.googleapis.com
Presentamos las APIs
- API de Vertex AI (
aiplatform.googleapis.com): Permite usar Gemini para la generación y Vertex AI Embeddings para vectorizar texto. - API de Cloud SQL Admin (
sqladmin.googleapis.com): Te permite administrar instancias de Cloud SQL de forma programática. - API de Discovery Engine (
discoveryengine.googleapis.com): Potencia las capacidades de Vertex AI Reranker. - API de Service Usage (
serviceusage.googleapis.com): Se requiere para verificar y administrar las cuotas de servicio.
5. Crea un entorno virtual y, luego, instala las dependencias
Antes de comenzar cualquier proyecto de Python, es una buena práctica crear un entorno virtual. Esto aísla las dependencias del proyecto y evita conflictos con otros proyectos o con los paquetes globales de Python del sistema.
- Crea una carpeta llamada
rag-labsy cámbiate a ella. Ejecuta el siguiente código en la terminal:mkdir rag-labs && cd rag-labs - Crea y activa un entorno virtual:
uv venv --python 3.12 source .venv/bin/activate - Crea un archivo
requirements.txtcon las dependencias necesarias. Ejecuta el siguiente código en la terminal:cloudshell edit requirements.txt - Pega las siguientes dependencias optimizadas en
requirements.txt. Estas versiones se fijan para evitar conflictos y acelerar la instalación.# Core LangChain & AI langchain-community==0.3.31 langchain-google-vertexai==2.1.2 langchain-google-community[vertexaisearch]==2.0.10 # Google Cloud google-cloud-storage==2.19.0 google-cloud-aiplatform[langchain]==1.130.0 # Database cloud-sql-python-connector[pg8000]==1.19.0 sqlalchemy==2.0.45 pgvector==0.4.2 # Utilities tiktoken==0.12.0 python-dotenv==1.2.1 requests==2.32.5 - Instala las dependencias:
uv pip install -r requirements.txt
6. Configura Cloud SQL para PostgreSQL
En esta tarea, aprovisionarás una instancia de Cloud SQL para PostgreSQL, crearás una base de datos y la prepararás para la búsqueda de vectores.
Define la configuración de Cloud SQL
- Crea un archivo
.envpara almacenar tu configuración. Ejecuta el siguiente código en la terminal:cloudshell edit .env - Pega la siguiente configuración en
.env.# Project Config PROJECT_ID="[YOUR_PROJECT_ID]" REGION="us-central1" # Database Config SQL_INSTANCE_NAME="rag-pg-instance-1" SQL_DATABASE_NAME="rag_harry_potter_db" SQL_USER="rag_user" SQL_PASSWORD="StrongPassword123!" # RAG Config PGVECTOR_COLLECTION_NAME="rag_harry_potter" RANKING_LOCATION_ID="global" # Connection Name (Auto-generated in scripts usually, but useful to have) DB_INSTANCE_CONNECTION_NAME="${PROJECT_ID}:${REGION}:${SQL_INSTANCE_NAME}" - Reemplaza
[YOUR_PROJECT_ID]por el ID de tu proyecto de Google Cloud. (p.ej.,PROJECT_ID = "google-cloud-labs")
Si no recuerdas el ID de tu proyecto, ejecuta el siguiente comando en tu terminal. Se mostrará una lista de todos tus proyectos y sus IDs.gcloud projects list - Carga las variables en tu sesión de shell:
source .env
Crea la instancia y la base de datos
- Crea una instancia de Cloud SQL para PostgreSQL. Este comando crea una instancia pequeña adecuada para este lab.
gcloud sql instances create ${SQL_INSTANCE_NAME} \ --database-version=POSTGRES_15 \ --tier=db-g1-small \ --region=${REGION} \ --project=${PROJECT_ID} - Una vez que la instancia esté lista, crea la base de datos:
gcloud sql databases create ${SQL_DATABASE_NAME} \ --instance=${SQL_INSTANCE_NAME} \ --project=${PROJECT_ID} - Crea el usuario de la base de datos:
gcloud sql users create ${SQL_USER} \ --instance=${SQL_INSTANCE_NAME} \ --password=${SQL_PASSWORD} \ --project=${PROJECT_ID}
Habilita la extensión pgvector
La extensión pgvector permite que PostgreSQL almacene y busque embeddings de vectores. Debes habilitarlo de forma explícita en tu base de datos.
- Crea un script llamado
enable_pgvector.py. Ejecuta el siguiente código en la terminal:cloudshell edit enable_pgvector.py - Pega el siguiente código en
enable_pgvector.py. Esta secuencia de comandos se conecta a tu base de datos y ejecutaCREATE EXTENSION IF NOT EXISTS vector;.import os import sqlalchemy from google.cloud.sql.connector import Connector, IPTypes import logging from dotenv import load_dotenv load_dotenv() logging.basicConfig(level=logging.INFO) # Config project_id = os.getenv("PROJECT_ID") region = os.getenv("REGION") instance_name = os.getenv("SQL_INSTANCE_NAME") db_user = os.getenv("SQL_USER") db_pass = os.getenv("SQL_PASSWORD") db_name = os.getenv("SQL_DATABASE_NAME") instance_connection_name = f"{project_id}:{region}:{instance_name}" def getconn(): with Connector() as connector: conn = connector.connect( instance_connection_name, "pg8000", user=db_user, password=db_pass, db=db_name, ip_type=IPTypes.PUBLIC, ) return conn def enable_pgvector(): pool = sqlalchemy.create_engine( "postgresql+pg8000://", creator=getconn, ) with pool.connect() as db_conn: # Check if extension exists result = db_conn.execute(sqlalchemy.text("SELECT extname FROM pg_extension WHERE extname = 'vector';")).fetchone() if result: logging.info("pgvector extension is already enabled.") else: logging.info("Enabling pgvector extension...") db_conn.execute(sqlalchemy.text("CREATE EXTENSION IF NOT EXISTS vector;")) db_conn.commit() logging.info("pgvector extension enabled successfully.") if __name__ == "__main__": enable_pgvector() - Ejecuta la secuencia de comandos:
python enable_pgvector.py
7. Parte 1: Estrategias de fragmentación
El primer paso en cualquier canalización de RAG es transformar los documentos en un formato que el LLM pueda entender: fragmentos.
Los LLMs tienen un límite de ventana de contexto (la cantidad de texto que pueden procesar a la vez). Además, recuperar un documento de 50 páginas para responder una pregunta específica diluye la información. Dividimos los documentos en "fragmentos" más pequeños para aislar la información pertinente.
Sin embargo, cómo divides el texto es muy importante:
- Divisor de caracteres: Divide estrictamente según el recuento de caracteres. Es rápido, pero riesgoso, ya que puede cortar palabras o frases por la mitad y destruir el significado semántico.
- Recursive Splitter: Intenta dividir primero por párrafo, luego por oración y, por último, por palabra. Intenta mantener juntas las unidades semánticas.
- Token Splitter: Realiza la división según el vocabulario propio del LLM (tokens). Esto garantiza que los fragmentos se ajusten perfectamente a las ventanas de contexto, pero puede ser más costoso desde el punto de vista computacional generarlos.
En esta sección, transferirás los mismos datos con las tres estrategias para compararlas.
Crea la secuencia de comandos de transferencia
Usarás un script que descarga un conjunto de datos de Harry Potter, lo divide con las estrategias Character, Recursive y Token, y sube las incorporaciones a tres tablas separadas en Cloud SQL.
- Crea el archivo
ingest_data.py:cloudshell edit ingest_data.py - Pega el siguiente código corregido en
ingest_data.py. Esta versión analiza correctamente la estructura JSON del conjunto de datos.import os import json import logging import requests from typing import List, Dict, Any from dotenv import load_dotenv from google.cloud.sql.connector import Connector, IPTypes from langchain_google_vertexai import VertexAIEmbeddings from langchain_community.vectorstores import PGVector from langchain.text_splitter import CharacterTextSplitter, RecursiveCharacterTextSplitter, TokenTextSplitter from langchain.docstore.document import Document load_dotenv() logging.basicConfig(level=logging.INFO) # Configuration PROJECT_ID = os.getenv("PROJECT_ID") REGION = os.getenv("REGION") DB_USER = os.getenv("SQL_USER") DB_PASS = os.getenv("SQL_PASSWORD") DB_NAME = os.getenv("SQL_DATABASE_NAME") INSTANCE_CONNECTION_NAME = f"{PROJECT_ID}:{REGION}:{os.getenv('SQL_INSTANCE_NAME')}" BASE_COLLECTION_NAME = os.getenv("PGVECTOR_COLLECTION_NAME") BOOKS_JSON_URL = "https://storage.googleapis.com/github-repo/generative-ai/gemini/reasoning-engine/sample_data/harry_potter_books.json" CHUNK_SIZE = 500 CHUNK_OVERLAP = 50 MAX_DOCS_TO_PROCESS = 10 # Database Connector def getconn(): with Connector() as connector: return connector.connect( INSTANCE_CONNECTION_NAME, "pg8000", user=DB_USER, password=DB_PASS, db=DB_NAME, ip_type=IPTypes.PUBLIC, ) def download_data(): logging.info(f"Downloading data from {BOOKS_JSON_URL}...") response = requests.get(BOOKS_JSON_URL) return response.json() def prepare_chunks(json_data, strategy): documents = [] # Iterate through the downloaded data for entry in json_data[:MAX_DOCS_TO_PROCESS]: # --- JSON PARSING LOGIC --- # The data structure nests content inside 'kwargs' -> 'page_content' if "kwargs" in entry and "page_content" in entry["kwargs"]: content = entry["kwargs"]["page_content"] # Extract metadata if available, ensuring it's a dict metadata = entry["kwargs"].get("metadata", {}) if not isinstance(metadata, dict): metadata = {"source": "unknown"} # Add the strategy to metadata for tracking metadata["strategy"] = strategy else: continue if not content: continue # Choose the splitter based on the strategy if strategy == "character": splitter = CharacterTextSplitter(chunk_size=CHUNK_SIZE, chunk_overlap=CHUNK_OVERLAP, separator="\n") elif strategy == "token": splitter = TokenTextSplitter(chunk_size=CHUNK_SIZE, chunk_overlap=CHUNK_OVERLAP) else: # default to recursive splitter = RecursiveCharacterTextSplitter(chunk_size=CHUNK_SIZE, chunk_overlap=CHUNK_OVERLAP) # Split the content into chunks chunks = splitter.split_text(content) # Create Document objects for each chunk for chunk in chunks: documents.append(Document(page_content=chunk, metadata=metadata)) return documents def main(): logging.info("Initializing Embeddings...") embeddings = VertexAIEmbeddings(model_name="gemini-embedding-001", project=PROJECT_ID, location=REGION) data = download_data() strategies = ["character", "recursive", "token"] # Connection string for PGVector (uses the getconn helper) pg_conn_str = f"postgresql+pg8000://{DB_USER}:{DB_PASS}@placeholder/{DB_NAME}" for strategy in strategies: collection_name = f"{BASE_COLLECTION_NAME}_{strategy}" logging.info(f"--- Processing strategy: {strategy.upper()} ---") logging.info(f"Target Collection: {collection_name}") # Prepare documents with the specific strategy docs = prepare_chunks(data, strategy) if not docs: logging.warning(f"No documents generated for strategy {strategy}. Check data source.") continue logging.info(f"Generated {len(docs)} chunks. Uploading to Cloud SQL...") # Initialize the Vector Store store = PGVector( collection_name=collection_name, embedding_function=embeddings, connection_string=pg_conn_str, engine_args={"creator": getconn}, pre_delete_collection=True # Clears old data for this collection before adding new ) # Batch add documents store.add_documents(docs) logging.info(f"Successfully finished {strategy}.\n") if __name__ == "__main__": main() - Ejecuta la secuencia de comandos de transferencia. Esto completará tu base de datos con tres tablas (colecciones) diferentes.
python ingest_data.py
Compara los resultados de la división en fragmentos
Ahora que se cargaron los datos, ejecutemos una consulta en las tres colecciones para ver cómo la estrategia de fragmentación afecta los resultados.
- Crea
query_chunking.py:cloudshell edit query_chunking.py - Pega el siguiente código en
query_chunking.py:import os import logging from dotenv import load_dotenv from google.cloud.sql.connector import Connector, IPTypes from langchain_google_vertexai import VertexAIEmbeddings from langchain_community.vectorstores import PGVector load_dotenv() logging.basicConfig(level=logging.ERROR) # Only show errors to keep output clean # Config PROJECT_ID = os.getenv("PROJECT_ID") REGION = os.getenv("REGION") DB_USER = os.getenv("SQL_USER") DB_PASS = os.getenv("SQL_PASSWORD") DB_NAME = os.getenv("SQL_DATABASE_NAME") INSTANCE_CONNECTION_NAME = f"{PROJECT_ID}:{REGION}:{os.getenv('SQL_INSTANCE_NAME')}" BASE_COLLECTION_NAME = os.getenv("PGVECTOR_COLLECTION_NAME") def getconn(): with Connector() as connector: return connector.connect( INSTANCE_CONNECTION_NAME, "pg8000", user=DB_USER, password=DB_PASS, db=DB_NAME, ip_type=IPTypes.PUBLIC, ) def main(): embeddings = VertexAIEmbeddings(model_name="gemini-embedding-001", project=PROJECT_ID, location=REGION) pg_conn_str = f"postgresql+pg8000://{DB_USER}:{DB_PASS}@placeholder/{DB_NAME}" query = "Tell me about the Dursleys and their relationship with Harry Potter" print(f"\nQUERY: {query}\n" + "="*50) strategies = ["character", "recursive", "token"] for strategy in strategies: collection = f"{BASE_COLLECTION_NAME}_{strategy}" print(f"\nSTRATEGY: {strategy.upper()}") store = PGVector( collection_name=collection, embedding_function=embeddings, connection_string=pg_conn_str, engine_args={"creator": getconn} ) results = store.similarity_search_with_score(query, k=2) for i, (doc, score) in enumerate(results): print(f" Result {i+1} (Score: {score:.4f}): {doc.page_content[:150].replace(chr(10), ' ')}...") if __name__ == "__main__": main() - Ejecuta la secuencia de comandos de consulta:
python query_chunking.py
Observa el resultado.
Observa cómo la división Character podría cortar oraciones a mitad de una idea, mientras que Recursive intenta respetar los límites de los párrafos. La división de tokens garantiza que los fragmentos se ajusten perfectamente a las ventanas de contexto de los LLM, pero puede ignorar la estructura semántica.
8. Parte 2: Clasificación de nuevo
La búsqueda de vectores (recuperación) es increíblemente rápida porque se basa en representaciones matemáticas comprimidas (embeddings). Abarca una amplia variedad de resultados para garantizar la Recuperación (encontrar todos los elementos potencialmente relevantes), pero a menudo tiene una Precisión baja (la clasificación de esos elementos puede ser imperfecta).
A menudo, los documentos relevantes se “pierden en el medio” de la lista de resultados. Un LLM que presta atención a los 5 primeros resultados podría perderse la respuesta crucial que se encuentra en la posición núm. 7.
El nuevo ranking resuelve este problema agregando una segunda etapa.
- Recuperador: Recupera un conjunto más grande (p.ej., los 25 primeros) con una búsqueda vectorial rápida.
- Clasificador secundario: Usa un modelo especializado (como un codificador cruzado) para examinar el texto completo de los pares de documentos y búsquedas. Es más lento, pero mucho más preciso. Vuelve a puntuar los 25 primeros y devuelve los 3 mejores absolutos.
En esta tarea, buscarás en la colección recursive que creaste en la Parte 1, pero esta vez aplicarás el Vertex AI Reranker para refinar los resultados.
- Crea
query_reranking.py:cloudshell edit query_reranking.py - Pega el siguiente código. Observa cómo se segmenta explícitamente para la colección
_recursivey usaContextualCompressionRetriever.import os import logging from dotenv import load_dotenv from google.cloud.sql.connector import Connector, IPTypes from langchain_google_vertexai import VertexAIEmbeddings from langchain_community.vectorstores import PGVector # Reranking Imports from langchain.retrievers import ContextualCompressionRetriever from langchain_google_community.vertex_rank import VertexAIRank load_dotenv() logging.basicConfig(level=logging.ERROR) PROJECT_ID = os.getenv("PROJECT_ID") REGION = os.getenv("REGION") DB_USER = os.getenv("SQL_USER") DB_PASS = os.getenv("SQL_PASSWORD") DB_NAME = os.getenv("SQL_DATABASE_NAME") INSTANCE_CONNECTION_NAME = f"{PROJECT_ID}:{REGION}:{os.getenv('SQL_INSTANCE_NAME')}" # IMPORTANT: Target the recursive collection created in ingest_data.py COLLECTION_NAME = f"{os.getenv('PGVECTOR_COLLECTION_NAME')}_recursive" RANKING_LOCATION = os.getenv("RANKING_LOCATION_ID") def getconn(): with Connector() as connector: return connector.connect( INSTANCE_CONNECTION_NAME, "pg8000", user=DB_USER, password=DB_PASS, db=DB_NAME, ip_type=IPTypes.PUBLIC, ) def main(): embeddings = VertexAIEmbeddings(model_name="gemini-embedding-001", project=PROJECT_ID, location=REGION) pg_conn_str = f"postgresql+pg8000://{DB_USER}:{DB_PASS}@placeholder/{DB_NAME}" print(f"Connecting to collection: {COLLECTION_NAME}") store = PGVector( collection_name=COLLECTION_NAME, embedding_function=embeddings, connection_string=pg_conn_str, engine_args={"creator": getconn} ) query = "What are the Horcruxes?" print(f"QUERY: {query}\n") # 1. Base Retriever (Vector Search) - Fetch top 10 base_retriever = store.as_retriever(search_kwargs={"k": 10}) # 2. Reranker - Select top 3 from the 10 reranker = VertexAIRank( project_id=PROJECT_ID, location_id=RANKING_LOCATION, ranking_config="default_ranking_config", title_field="source", top_n=3 ) compression_retriever = ContextualCompressionRetriever( base_compressor=reranker, base_retriever=base_retriever ) # Execute try: reranked_docs = compression_retriever.invoke(query) if not reranked_docs: print("No documents returned. Check if the collection exists and is populated.") print(f"--- Top 3 Reranked Results ---") for i, doc in enumerate(reranked_docs): print(f"Result {i+1} (Score: {doc.metadata.get('relevance_score', 'N/A')}):") print(f" {doc.page_content[:200]}...\n") except Exception as e: print(f"Error during reranking: {e}") if __name__ == "__main__": main() - Ejecuta la consulta de nuevo ranking:
python query_reranking.py
Observar
Es posible que observes puntuaciones de relevancia más altas o un orden diferente en comparación con una búsqueda de vectores sin procesar. Esto garantiza que el LLM reciba el contexto más preciso posible.
9. Parte 3: Transformación de la consulta
A menudo, el mayor cuello de botella en la RAG es el usuario. Las búsquedas de los usuarios suelen ser ambiguas, incompletas o estar mal redactadas. Si la incorporación de la búsqueda no se alinea matemáticamente con la incorporación del documento, la recuperación falla.
La Transformación de la búsqueda usa un LLM para reescribir o expandir la búsqueda antes de que llegue a la base de datos. Implementarás dos técnicas:
- HyDE (incorporación de documentos hipotéticos): La similitud vectorial entre una pregunta y una respuesta suele ser menor que la similitud entre una respuesta y una respuesta hipotética. HyDE le pide al LLM que alucine una respuesta perfecta, la incorpora y busca documentos que se parezcan a la alucinación.
- Indicaciones de retroceso: Si un usuario hace una pregunta específica y detallada, es posible que el sistema no capte el contexto más amplio. Las instrucciones de retroceso le piden al LLM que genere una pregunta abstracta de nivel superior ("¿Cuál es la historia de esta familia?") para recuperar información fundamental junto con los detalles específicos.
- Crea
query_transformation.py:cloudshell edit query_transformation.py - Pega el siguiente código:
import os import logging from dotenv import load_dotenv from google.cloud.sql.connector import Connector, IPTypes from langchain_google_vertexai import VertexAIEmbeddings, VertexAI from langchain_community.vectorstores import PGVector from langchain_core.prompts import PromptTemplate load_dotenv() logging.basicConfig(level=logging.ERROR) PROJECT_ID = os.getenv("PROJECT_ID") REGION = os.getenv("REGION") DB_USER = os.getenv("SQL_USER") DB_PASS = os.getenv("SQL_PASSWORD") DB_NAME = os.getenv("SQL_DATABASE_NAME") INSTANCE_CONNECTION_NAME = f"{PROJECT_ID}:{REGION}:{os.getenv('SQL_INSTANCE_NAME')}" COLLECTION_NAME = f"{os.getenv('PGVECTOR_COLLECTION_NAME')}_recursive" def getconn(): with Connector() as connector: return connector.connect( INSTANCE_CONNECTION_NAME, "pg8000", user=DB_USER, password=DB_PASS, db=DB_NAME, ip_type=IPTypes.PUBLIC, ) def generate_hyde_doc(query, llm): prompt = PromptTemplate( input_variables=["question"], template="Write a concise, hypothetical answer to the question. Question: {question} Answer:" ) chain = prompt | llm return chain.invoke({"question": query}) def generate_step_back(query, llm): prompt = PromptTemplate( input_variables=["question"], template="Write a more general, abstract question that concepts in this question. Original: {question} Step-back:" ) chain = prompt | llm return chain.invoke({"question": query}) def main(): embeddings = VertexAIEmbeddings(model_name="gemini-embedding-001", project=PROJECT_ID, location=REGION) llm = VertexAI(model_name="gemini-2.5-flash", project=PROJECT_ID, location=REGION, temperature=0.5) pg_conn_str = f"postgresql+pg8000://{DB_USER}:{DB_PASS}@placeholder/{DB_NAME}" store = PGVector( collection_name=COLLECTION_NAME, embedding_function=embeddings, connection_string=pg_conn_str, engine_args={"creator": getconn} ) retriever = store.as_retriever(search_kwargs={"k": 2}) original_query = "Tell me about the Dursleys." print(f"ORIGINAL QUERY: {original_query}\n" + "-"*30) # 1. HyDE hyde_doc = generate_hyde_doc(original_query, llm) print(f"HyDE Generated Doc: {hyde_doc.strip()[:100]}...") hyde_results = retriever.invoke(hyde_doc) print(f"HyDE Retrieval: {hyde_results[0].page_content[:100]}...\n") # 2. Step-back step_back_q = generate_step_back(original_query, llm) print(f"Step-back Query: {step_back_q.strip()}") step_results = retriever.invoke(step_back_q) print(f"Step-back Retrieval: {step_results[0].page_content[:100]}...") if __name__ == "__main__": main() - Ejecuta la secuencia de comandos de transformación:
python query_transformation.py
Observa el resultado.
Observa cómo la búsqueda de Step-back puede recuperar un contexto más amplio sobre la historia familiar de los Dursley, mientras que HyDE se enfoca en los detalles específicos que se generan en la respuesta hipotética.
10. Parte 4: Generación de extremo a extremo
Dividimos nuestros datos, refinamos nuestra búsqueda y pulimos la búsqueda del usuario. Ahora, finalmente, agregamos la "G" a RAG: Generación.
Hasta ahora, solo hemos estado encontrando información. Para crear un verdadero asistente de IA, debemos ingresar esos documentos de alta calidad y con una nueva clasificación en un LLM (Gemini) para sintetizar una respuesta en lenguaje natural.
En una canalización de producción, esto implica un flujo específico:
- Recuperación: Obtén un amplio conjunto de candidatos (p.ej., Top 10) con la búsqueda de vectores rápida.
- Volver a clasificar: Filtra hasta obtener los mejores resultados absolutos (p.ej., Los 3 primeros) con Vertex AI Reranker.
- Construcción del contexto: Une el contenido de los 3 documentos principales en una sola cadena.
- Instrucciones fundamentadas: Inserta esa cadena de contexto en una plantilla de instrucciones estricta que obligue al LLM a usar solo esa información.
Crea la secuencia de comandos de generación
Usaremos gemini-2.5-flash para el paso de generación. Este modelo es ideal para RAG porque tiene una ventana de contexto larga y baja latencia, lo que le permite procesar varios documentos recuperados rápidamente.
- Crea
end_to_end_rag.py:
cloudshell edit end_to_end_rag.py
- Pega el siguiente código. Presta atención a la variable
template, ya que es donde le indicamos estrictamente al modelo que evite las "alucinaciones" (inventar cosas) vinculándolo al contexto proporcionado.
import os
import logging
from dotenv import load_dotenv
from google.cloud.sql.connector import Connector, IPTypes
from langchain_google_vertexai import VertexAIEmbeddings, VertexAI
from langchain_community.vectorstores import PGVector
from langchain.retrievers import ContextualCompressionRetriever
from langchain_google_community.vertex_rank import VertexAIRank
from langchain.prompts import PromptTemplate
from langchain.chains import LLMChain
load_dotenv()
logging.basicConfig(level=logging.ERROR)
PROJECT_ID = os.getenv("PROJECT_ID")
REGION = os.getenv("REGION")
# We use the recursive collection as it generally provides the best context boundaries
COLLECTION_NAME = f"{os.getenv('PGVECTOR_COLLECTION_NAME')}_recursive"
def getconn():
instance_conn = f"{PROJECT_ID}:{REGION}:{os.getenv('SQL_INSTANCE_NAME')}"
with Connector() as connector:
return connector.connect(
instance_conn, "pg8000",
user=os.getenv("SQL_USER"), password=os.getenv("SQL_PASSWORD"),
db=os.getenv("SQL_DATABASE_NAME"), ip_type=IPTypes.PUBLIC
)
def main():
print("--- Initializing Production RAG Pipeline ---")
# 1. Setup Embeddings (Gemini Embedding 001)
# We use this to vectorize the user's query to match our database.
embeddings = VertexAIEmbeddings(model_name="gemini-embedding-001", project=PROJECT_ID, location=REGION)
# 2. Connect to Vector Store
pg_conn_str = f"postgresql+pg8000://{os.getenv('SQL_USER')}:{os.getenv('SQL_PASSWORD')}@placeholder/{os.getenv('SQL_DATABASE_NAME')}"
store = PGVector(
collection_name=COLLECTION_NAME,
embedding_function=embeddings,
connection_string=pg_conn_str,
engine_args={"creator": getconn}
)
# 3. Setup The 'Filter Funnel' (Retriever + Reranker)
# Step A: Fast retrieval of top 10 similar documents
base_retriever = store.as_retriever(search_kwargs={"k": 10})
# Step B: Precise reranking to find the top 3 most relevant
reranker = VertexAIRank(
project_id=PROJECT_ID,
location_id="global",
ranking_config="default_ranking_config",
title_field="source",
top_n=3
)
# Combine A and B into a single retrieval object
compression_retriever = ContextualCompressionRetriever(
base_compressor=reranker,
base_retriever=base_retriever
)
# 4. Setup LLM (Gemini 2.5 Flash)
# We use a low temperature (0.1) to reduce creativity and increase factual adherence.
llm = VertexAI(model_name="gemini-2.5-flash", project=PROJECT_ID, location=REGION, temperature=0.1)
# --- Execution Loop ---
user_query = "Who is Harry Potter?"
print(f"\nUser Query: {user_query}")
print("Retrieving and Reranking documents...")
# Retrieve the most relevant documents
top_docs = compression_retriever.invoke(user_query)
if not top_docs:
print("No relevant documents found.")
return
# Build the Context String
# We stitch the documents together, labeling them as Source 1, Source 2, etc.
context_str = "\n\n".join([f"Source {i+1}: {d.page_content}" for i, d in enumerate(top_docs)])
print(f"Found {len(top_docs)} relevant context chunks.")
# 5. The Grounded Prompt
template = """You are a helpful assistant. Answer the question strictly based on the provided context.
If the answer is not in the context, say "I don't know."
Context:
{context}
Question:
{question}
Answer:
"""
prompt = PromptTemplate(template=template, input_variables=["context", "question"])
# Create the chain: Prompt -> LLM
chain = prompt | llm
print("Generating Answer via Gemini 2.5 Flash...")
final_answer = chain.invoke({"context": context_str, "question": user_query})
print(f"\nFINAL ANSWER:\n{final_answer}")
if __name__ == "__main__":
main()
- Ejecuta la aplicación final:
python end_to_end_rag.py
Cómo comprender el resultado
Cuando ejecutes este script, observa la diferencia entre los fragmentos recuperados sin procesar (que viste en los pasos anteriores) y la respuesta final. El LLM actúa como un sintetizador: lee los "fragmentos" de texto proporcionados por el rerank y los une en una oración coherente y legible para los humanos.
Al encadenar estos componentes, pasas de una "suposición" estocástica a un flujo de trabajo determinístico y fundamentado. El Recuperador lanza la red, el Clasificador selecciona la mejor captura y el Generador cocina la comida.
11. Conclusión
¡Felicitaciones! Creaste con éxito una canalización de RAG avanzada que va mucho más allá de la búsqueda de vectores básica.
Resumen
- Configuraste Cloud SQL con pgvector para el almacenamiento de vectores escalable.
- Comparaste las estrategias de fragmentación para comprender cómo la preparación de datos afecta la recuperación.
- Implementaste la función Reranking con Vertex AI para mejorar la precisión de tus resultados.
- Utilizaste transformaciones de consultas (HyDE, Step-back) para alinear la intención del usuario con tus datos.
Más información
- Arquitecturas de referencia de RAG: Explora una lista de guías de arquitecturas de referencia relacionadas con RAG.
Del prototipo a la producción
Este lab forma parte de la ruta de aprendizaje de IA lista para producción con Google Cloud.
- Explora el plan de estudios completo para cerrar la brecha entre el prototipo y la producción.
- Comparte tu progreso con el hashtag #ProductionReadyAI.