1. Introduction
Présentation
La génération augmentée par récupération (RAG) améliore les réponses des grands modèles de langage (LLM) en les ancrant dans des connaissances externes. Toutefois, la création d'un système RAG prêt pour la production nécessite plus qu'une simple recherche vectorielle. Vous devez optimiser l'ingestion des données, le classement des résultats pertinents et le traitement des requêtes utilisateur.
Dans cet atelier complet, vous allez créer une application RAG robuste à l'aide de Cloud SQL pour PostgreSQL (étendu avec pgvector) et Vertex AI. Vous allez découvrir trois techniques avancées :
- Stratégies de segmentation : vous observerez l'impact de différentes méthodes de segmentation de texte (caractère, récursif, jeton) sur la qualité de la récupération.
- Reclassement : vous allez implémenter le reclasseur Vertex AI pour affiner les résultats de recherche et résoudre le problème de "perte au milieu".
- Transformation des requêtes : vous utiliserez Gemini pour optimiser les requêtes des utilisateurs à l'aide de techniques telles que HyDE (Hypothetical Document Embeddings) et Step-back Prompting.
Objectifs de l'atelier
- Configurez une instance Cloud SQL pour PostgreSQL avec
pgvector. - Créez un pipeline d'ingestion de données qui segmente le texte à l'aide de plusieurs stratégies et stocke les embeddings dans Cloud SQL.
- Effectuez des recherches sémantiques et comparez la qualité des résultats obtenus avec différentes méthodes de segmentation.
- Intégrez un reranker pour réorganiser les documents récupérés en fonction de leur pertinence.
- Implémenter des transformations de requêtes optimisées par les LLM pour améliorer la récupération des questions ambiguës ou complexes.
Points abordés
- Découvrez comment utiliser LangChain avec Vertex AI et Cloud SQL.
- Impact des fractionneurs de texte Character, Recursive et Token.
- Découvrez comment implémenter la recherche vectorielle dans PostgreSQL.
- Comment utiliser ContextualCompressionRetriever pour le reclassement.
- Comment implémenter HyDE et Step-back Prompting.
2. Configuration du projet
Compte Google
Si vous ne possédez pas encore de compte Google personnel, vous devez en créer un.
Utilisez un compte personnel au lieu d'un compte professionnel ou scolaire.
Se connecter à la console Google Cloud
Connectez-vous à la console Google Cloud à l'aide d'un compte Google personnel.
Activer la facturation
Utiliser 5 $de crédits Google Cloud (facultatif)
Pour suivre cet atelier, vous avez besoin d'un compte de facturation avec un certain crédit. Si vous prévoyez d'utiliser votre propre facturation, vous pouvez ignorer cette étape.
- Cliquez sur ce lien et connectez-vous avec un compte Google personnel. Le résultat qui s'affiche doit ressembler à ceci :

- Cliquez sur le bouton CLIQUEZ ICI POUR ACCÉDER À VOS CRÉDITS. Vous serez redirigé vers une page permettant de configurer votre profil de facturation
. - Cliquez sur Confirmer. Vous êtes désormais connecté à un compte de facturation d'essai Google Cloud Platform.

Configurer un compte de facturation personnel
Si vous avez configuré la facturation à l'aide de crédits Google Cloud, vous pouvez ignorer cette étape.
Pour configurer un compte de facturation personnel, cliquez ici pour activer la facturation dans la console Cloud.
Remarques :
- L'exécution de cet atelier devrait coûter moins de 1 USD en ressources Cloud.
- Vous pouvez suivre les étapes à la fin de cet atelier pour supprimer les ressources et éviter ainsi que des frais supplémentaires ne vous soient facturés.
- Les nouveaux utilisateurs peuvent bénéficier d'un essai sans frais pour un crédit de 300$.
Créer un projet (facultatif)
Si vous n'avez pas de projet que vous souhaitez utiliser pour cet atelier, créez-en un.
3. Ouvrir l'éditeur Cloud Shell
- Cliquez sur ce lien pour accéder directement à l'éditeur Cloud Shell.
- Si vous êtes invité à autoriser l'accès à un moment donné, cliquez sur Autoriser pour continuer.

- Si le terminal ne s'affiche pas en bas de l'écran, ouvrez-le :
- Cliquez sur Afficher.
- Cliquez sur Terminal
.
- Dans le terminal, définissez votre projet à l'aide de la commande suivante :
gcloud config set project [PROJECT_ID]- Exemple :
gcloud config set project lab-project-id-example - Si vous ne vous souvenez pas de l'ID de votre projet, vous pouvez lister tous vos ID de projet avec la commande suivante :
gcloud projects list
- Exemple :
- Le message suivant doit s'afficher :
Updated property [core/project].
4. Activer les API
Pour créer cette solution, vous devez activer plusieurs API Google Cloud pour Vertex AI, Cloud SQL et le service de réorganisation.
- Dans le terminal, activez les API :
gcloud services enable \ aiplatform.googleapis.com \ sqladmin.googleapis.com \ cloudresourcemanager.googleapis.com \ serviceusage.googleapis.com \ discoveryengine.googleapis.com
Présentation des API
- API Vertex AI (
aiplatform.googleapis.com) : permet d'utiliser Gemini pour la génération et Vertex AI Embeddings pour la vectorisation de texte. - API Cloud SQL Admin (
sqladmin.googleapis.com) : vous permet de gérer les instances Cloud SQL de manière programmatique. - L'API Discovery Engine (
discoveryengine.googleapis.com) : elle permet d'utiliser les fonctionnalités de Vertex AI Reranker. - API Service Usage (
serviceusage.googleapis.com) : requise pour vérifier et gérer les quotas de service.
5. Créer un environnement virtuel et installer les dépendances
Avant de commencer un projet Python, il est recommandé de créer un environnement virtuel. Cela isole les dépendances du projet, ce qui évite les conflits avec d'autres projets ou les packages Python globaux du système.
- Créez un dossier nommé
rag-labset accédez-y. Exécutez le code suivant dans le terminal :mkdir rag-labs && cd rag-labs - Créez et activez un environnement virtuel :
uv venv --python 3.12 source .venv/bin/activate - Créez un fichier
requirements.txtavec les dépendances nécessaires. Exécutez le code suivant dans le terminal :cloudshell edit requirements.txt - Collez les dépendances optimisées suivantes dans
requirements.txt. Ces versions sont épinglées pour éviter les conflits et accélérer l'installation.# Core LangChain & AI langchain-community==0.3.31 langchain-google-vertexai==2.1.2 langchain-google-community[vertexaisearch]==2.0.10 # Google Cloud google-cloud-storage==2.19.0 google-cloud-aiplatform[langchain]==1.130.0 # Database cloud-sql-python-connector[pg8000]==1.19.0 sqlalchemy==2.0.45 pgvector==0.4.2 # Utilities tiktoken==0.12.0 python-dotenv==1.2.1 requests==2.32.5 - Installez les dépendances :
uv pip install -r requirements.txt
6. Configurer Cloud SQL pour PostgreSQL
Dans cette tâche, vous allez provisionner une instance Cloud SQL pour PostgreSQL, créer une base de données et la préparer pour la recherche vectorielle.
Définir la configuration Cloud SQL
- Créez un fichier
.envpour stocker votre configuration. Exécutez le code suivant dans le terminal :cloudshell edit .env - Collez la configuration suivante dans
.env.# Project Config PROJECT_ID="[YOUR_PROJECT_ID]" REGION="us-central1" # Database Config SQL_INSTANCE_NAME="rag-pg-instance-1" SQL_DATABASE_NAME="rag_harry_potter_db" SQL_USER="rag_user" SQL_PASSWORD="StrongPassword123!" # RAG Config PGVECTOR_COLLECTION_NAME="rag_harry_potter" RANKING_LOCATION_ID="global" # Connection Name (Auto-generated in scripts usually, but useful to have) DB_INSTANCE_CONNECTION_NAME="${PROJECT_ID}:${REGION}:${SQL_INSTANCE_NAME}" - Remplacez
[YOUR_PROJECT_ID]par l'ID de votre projet Google Cloud. (par exemple,PROJECT_ID = "google-cloud-labs")
Si vous ne vous souvenez pas de votre ID de projet, exécutez la commande suivante dans votre terminal. La liste de tous vos projets et de leurs ID s'affiche.gcloud projects list - Chargez les variables dans votre session d'interface système :
source .env
Créer l'instance et la base de données
- Créez une instance Cloud SQL pour PostgreSQL. Cette commande crée une petite instance adaptée à cet atelier.
gcloud sql instances create ${SQL_INSTANCE_NAME} \ --database-version=POSTGRES_15 \ --tier=db-g1-small \ --region=${REGION} \ --project=${PROJECT_ID} - Une fois l'instance prête, créez la base de données :
gcloud sql databases create ${SQL_DATABASE_NAME} \ --instance=${SQL_INSTANCE_NAME} \ --project=${PROJECT_ID} - Créez l'utilisateur de la base de données :
gcloud sql users create ${SQL_USER} \ --instance=${SQL_INSTANCE_NAME} \ --password=${SQL_PASSWORD} \ --project=${PROJECT_ID}
Activer l'extension pgvector
L'extension pgvector permet à PostgreSQL de stocker et de rechercher des embeddings vectoriels. Vous devez l'activer explicitement sur votre base de données.
- Créez un script nommé
enable_pgvector.py. Exécutez le code suivant dans le terminal :cloudshell edit enable_pgvector.py - Collez le code suivant dans
enable_pgvector.py. Ce script se connecte à votre base de données et exécuteCREATE EXTENSION IF NOT EXISTS vector;.import os import sqlalchemy from google.cloud.sql.connector import Connector, IPTypes import logging from dotenv import load_dotenv load_dotenv() logging.basicConfig(level=logging.INFO) # Config project_id = os.getenv("PROJECT_ID") region = os.getenv("REGION") instance_name = os.getenv("SQL_INSTANCE_NAME") db_user = os.getenv("SQL_USER") db_pass = os.getenv("SQL_PASSWORD") db_name = os.getenv("SQL_DATABASE_NAME") instance_connection_name = f"{project_id}:{region}:{instance_name}" def getconn(): with Connector() as connector: conn = connector.connect( instance_connection_name, "pg8000", user=db_user, password=db_pass, db=db_name, ip_type=IPTypes.PUBLIC, ) return conn def enable_pgvector(): pool = sqlalchemy.create_engine( "postgresql+pg8000://", creator=getconn, ) with pool.connect() as db_conn: # Check if extension exists result = db_conn.execute(sqlalchemy.text("SELECT extname FROM pg_extension WHERE extname = 'vector';")).fetchone() if result: logging.info("pgvector extension is already enabled.") else: logging.info("Enabling pgvector extension...") db_conn.execute(sqlalchemy.text("CREATE EXTENSION IF NOT EXISTS vector;")) db_conn.commit() logging.info("pgvector extension enabled successfully.") if __name__ == "__main__": enable_pgvector() - Exécutez le script :
python enable_pgvector.py
7. Partie 1 : Stratégies de fragmentation
La première étape de tout pipeline RAG consiste à transformer les documents dans un format que le LLM peut comprendre : les chunks.
Les LLM ont une limite de fenêtre de contexte (la quantité de texte qu'ils peuvent traiter à la fois). De plus, récupérer un document de 50 pages pour répondre à une question spécifique dilue l'information. Nous divisons les documents en "blocs" plus petits pour isoler les informations pertinentes.
Toutefois, la façon dont vous fractionnez le texte est extrêmement importante :
- Fractionneur de caractères : fractionne strictement en fonction du nombre de caractères. Cette méthode est rapide, mais risquée, car elle peut couper des mots ou des phrases en deux, ce qui détruit le sens sémantique.
- Recursive Splitter : tente d'abord de diviser par paragraphe, puis par phrase, puis par mot. Il essaie de regrouper les unités sémantiques.
- Token Splitter : divise le texte en fonction du vocabulaire (jetons) du LLM. Cela permet de s'assurer que les blocs s'intègrent parfaitement dans les fenêtres de contexte, mais peut être plus coûteux en termes de calculs.
Dans cette section, vous ingérerez les mêmes données en utilisant les trois stratégies pour les comparer.
Créer le script d'ingestion
Vous allez utiliser un script qui télécharge un ensemble de données Harry Potter, le divise à l'aide des stratégies Character, Recursive et Token, puis importe les embeddings dans trois tables distinctes de Cloud SQL.
- Créez le fichier
ingest_data.py:cloudshell edit ingest_data.py - Collez le code corrigé suivant dans
ingest_data.py. Cette version analyse correctement la structure JSON de l'ensemble de données.import os import json import logging import requests from typing import List, Dict, Any from dotenv import load_dotenv from google.cloud.sql.connector import Connector, IPTypes from langchain_google_vertexai import VertexAIEmbeddings from langchain_community.vectorstores import PGVector from langchain.text_splitter import CharacterTextSplitter, RecursiveCharacterTextSplitter, TokenTextSplitter from langchain.docstore.document import Document load_dotenv() logging.basicConfig(level=logging.INFO) # Configuration PROJECT_ID = os.getenv("PROJECT_ID") REGION = os.getenv("REGION") DB_USER = os.getenv("SQL_USER") DB_PASS = os.getenv("SQL_PASSWORD") DB_NAME = os.getenv("SQL_DATABASE_NAME") INSTANCE_CONNECTION_NAME = f"{PROJECT_ID}:{REGION}:{os.getenv('SQL_INSTANCE_NAME')}" BASE_COLLECTION_NAME = os.getenv("PGVECTOR_COLLECTION_NAME") BOOKS_JSON_URL = "https://storage.googleapis.com/github-repo/generative-ai/gemini/reasoning-engine/sample_data/harry_potter_books.json" CHUNK_SIZE = 500 CHUNK_OVERLAP = 50 MAX_DOCS_TO_PROCESS = 10 # Database Connector def getconn(): with Connector() as connector: return connector.connect( INSTANCE_CONNECTION_NAME, "pg8000", user=DB_USER, password=DB_PASS, db=DB_NAME, ip_type=IPTypes.PUBLIC, ) def download_data(): logging.info(f"Downloading data from {BOOKS_JSON_URL}...") response = requests.get(BOOKS_JSON_URL) return response.json() def prepare_chunks(json_data, strategy): documents = [] # Iterate through the downloaded data for entry in json_data[:MAX_DOCS_TO_PROCESS]: # --- JSON PARSING LOGIC --- # The data structure nests content inside 'kwargs' -> 'page_content' if "kwargs" in entry and "page_content" in entry["kwargs"]: content = entry["kwargs"]["page_content"] # Extract metadata if available, ensuring it's a dict metadata = entry["kwargs"].get("metadata", {}) if not isinstance(metadata, dict): metadata = {"source": "unknown"} # Add the strategy to metadata for tracking metadata["strategy"] = strategy else: continue if not content: continue # Choose the splitter based on the strategy if strategy == "character": splitter = CharacterTextSplitter(chunk_size=CHUNK_SIZE, chunk_overlap=CHUNK_OVERLAP, separator="\n") elif strategy == "token": splitter = TokenTextSplitter(chunk_size=CHUNK_SIZE, chunk_overlap=CHUNK_OVERLAP) else: # default to recursive splitter = RecursiveCharacterTextSplitter(chunk_size=CHUNK_SIZE, chunk_overlap=CHUNK_OVERLAP) # Split the content into chunks chunks = splitter.split_text(content) # Create Document objects for each chunk for chunk in chunks: documents.append(Document(page_content=chunk, metadata=metadata)) return documents def main(): logging.info("Initializing Embeddings...") embeddings = VertexAIEmbeddings(model_name="gemini-embedding-001", project=PROJECT_ID, location=REGION) data = download_data() strategies = ["character", "recursive", "token"] # Connection string for PGVector (uses the getconn helper) pg_conn_str = f"postgresql+pg8000://{DB_USER}:{DB_PASS}@placeholder/{DB_NAME}" for strategy in strategies: collection_name = f"{BASE_COLLECTION_NAME}_{strategy}" logging.info(f"--- Processing strategy: {strategy.upper()} ---") logging.info(f"Target Collection: {collection_name}") # Prepare documents with the specific strategy docs = prepare_chunks(data, strategy) if not docs: logging.warning(f"No documents generated for strategy {strategy}. Check data source.") continue logging.info(f"Generated {len(docs)} chunks. Uploading to Cloud SQL...") # Initialize the Vector Store store = PGVector( collection_name=collection_name, embedding_function=embeddings, connection_string=pg_conn_str, engine_args={"creator": getconn}, pre_delete_collection=True # Clears old data for this collection before adding new ) # Batch add documents store.add_documents(docs) logging.info(f"Successfully finished {strategy}.\n") if __name__ == "__main__": main() - Exécutez le script d'ingestion. Votre base de données sera alors remplie avec trois tables (collections) différentes.
python ingest_data.py
Comparer les résultats du découpage
Maintenant que les données sont chargées, exécutons une requête sur les trois collections pour voir comment la stratégie de segmentation affecte les résultats.
- Créez
query_chunking.py:cloudshell edit query_chunking.py - Collez le code suivant dans
query_chunking.py:import os import logging from dotenv import load_dotenv from google.cloud.sql.connector import Connector, IPTypes from langchain_google_vertexai import VertexAIEmbeddings from langchain_community.vectorstores import PGVector load_dotenv() logging.basicConfig(level=logging.ERROR) # Only show errors to keep output clean # Config PROJECT_ID = os.getenv("PROJECT_ID") REGION = os.getenv("REGION") DB_USER = os.getenv("SQL_USER") DB_PASS = os.getenv("SQL_PASSWORD") DB_NAME = os.getenv("SQL_DATABASE_NAME") INSTANCE_CONNECTION_NAME = f"{PROJECT_ID}:{REGION}:{os.getenv('SQL_INSTANCE_NAME')}" BASE_COLLECTION_NAME = os.getenv("PGVECTOR_COLLECTION_NAME") def getconn(): with Connector() as connector: return connector.connect( INSTANCE_CONNECTION_NAME, "pg8000", user=DB_USER, password=DB_PASS, db=DB_NAME, ip_type=IPTypes.PUBLIC, ) def main(): embeddings = VertexAIEmbeddings(model_name="gemini-embedding-001", project=PROJECT_ID, location=REGION) pg_conn_str = f"postgresql+pg8000://{DB_USER}:{DB_PASS}@placeholder/{DB_NAME}" query = "Tell me about the Dursleys and their relationship with Harry Potter" print(f"\nQUERY: {query}\n" + "="*50) strategies = ["character", "recursive", "token"] for strategy in strategies: collection = f"{BASE_COLLECTION_NAME}_{strategy}" print(f"\nSTRATEGY: {strategy.upper()}") store = PGVector( collection_name=collection, embedding_function=embeddings, connection_string=pg_conn_str, engine_args={"creator": getconn} ) results = store.similarity_search_with_score(query, k=2) for i, (doc, score) in enumerate(results): print(f" Result {i+1} (Score: {score:.4f}): {doc.page_content[:150].replace(chr(10), ' ')}...") if __name__ == "__main__": main() - Exécutez le script de requête :
python query_chunking.py
Observez le résultat.
Notez que la division Caractère peut couper les phrases en plein milieu, tandis que Récursif tente de respecter les limites des paragraphes. La segmentation par jetons permet de s'assurer que les blocs s'intègrent parfaitement aux fenêtres de contexte des LLM, mais peut ignorer la structure sémantique.
8. Partie 2 : Réorganisation
La recherche vectorielle (récupération) est incroyablement rapide, car elle repose sur des représentations mathématiques compressées (embeddings). Il couvre un large éventail de résultats pour assurer le rappel (trouver tous les éléments potentiellement pertinents), mais il souffre souvent d'une faible précision (le classement de ces éléments peut être imparfait).
Souvent, les documents pertinents sont "perdus au milieu" de la liste des résultats. Un LLM qui ne prête attention qu'aux cinq premiers résultats risque de passer à côté de la réponse cruciale qui se trouve en septième position.
Le reranking résout ce problème en ajoutant une deuxième étape.
- Récupérateur : récupère un ensemble plus important (par exemple, les 25 premiers) à l'aide d'une recherche vectorielle rapide.
- Reranker : utilise un modèle spécialisé (comme un Cross-Encoder) pour examiner le texte intégral des paires de requêtes et de documents. Elle est plus lente, mais beaucoup plus précise. Il recalcule le score des 25 premiers résultats et renvoie les trois meilleurs résultats absolus.
Dans cette tâche, vous allez effectuer une recherche dans la collection recursive créée dans la partie 1, mais cette fois, vous allez appliquer Vertex AI Reranker pour affiner les résultats.
- Créez
query_reranking.py:cloudshell edit query_reranking.py - Collez le code suivant. Notez qu'il cible explicitement la collection
_recursiveet utiliseContextualCompressionRetriever.import os import logging from dotenv import load_dotenv from google.cloud.sql.connector import Connector, IPTypes from langchain_google_vertexai import VertexAIEmbeddings from langchain_community.vectorstores import PGVector # Reranking Imports from langchain.retrievers import ContextualCompressionRetriever from langchain_google_community.vertex_rank import VertexAIRank load_dotenv() logging.basicConfig(level=logging.ERROR) PROJECT_ID = os.getenv("PROJECT_ID") REGION = os.getenv("REGION") DB_USER = os.getenv("SQL_USER") DB_PASS = os.getenv("SQL_PASSWORD") DB_NAME = os.getenv("SQL_DATABASE_NAME") INSTANCE_CONNECTION_NAME = f"{PROJECT_ID}:{REGION}:{os.getenv('SQL_INSTANCE_NAME')}" # IMPORTANT: Target the recursive collection created in ingest_data.py COLLECTION_NAME = f"{os.getenv('PGVECTOR_COLLECTION_NAME')}_recursive" RANKING_LOCATION = os.getenv("RANKING_LOCATION_ID") def getconn(): with Connector() as connector: return connector.connect( INSTANCE_CONNECTION_NAME, "pg8000", user=DB_USER, password=DB_PASS, db=DB_NAME, ip_type=IPTypes.PUBLIC, ) def main(): embeddings = VertexAIEmbeddings(model_name="gemini-embedding-001", project=PROJECT_ID, location=REGION) pg_conn_str = f"postgresql+pg8000://{DB_USER}:{DB_PASS}@placeholder/{DB_NAME}" print(f"Connecting to collection: {COLLECTION_NAME}") store = PGVector( collection_name=COLLECTION_NAME, embedding_function=embeddings, connection_string=pg_conn_str, engine_args={"creator": getconn} ) query = "What are the Horcruxes?" print(f"QUERY: {query}\n") # 1. Base Retriever (Vector Search) - Fetch top 10 base_retriever = store.as_retriever(search_kwargs={"k": 10}) # 2. Reranker - Select top 3 from the 10 reranker = VertexAIRank( project_id=PROJECT_ID, location_id=RANKING_LOCATION, ranking_config="default_ranking_config", title_field="source", top_n=3 ) compression_retriever = ContextualCompressionRetriever( base_compressor=reranker, base_retriever=base_retriever ) # Execute try: reranked_docs = compression_retriever.invoke(query) if not reranked_docs: print("No documents returned. Check if the collection exists and is populated.") print(f"--- Top 3 Reranked Results ---") for i, doc in enumerate(reranked_docs): print(f"Result {i+1} (Score: {doc.metadata.get('relevance_score', 'N/A')}):") print(f" {doc.page_content[:200]}...\n") except Exception as e: print(f"Error during reranking: {e}") if __name__ == "__main__": main() - Exécutez la requête de réorganisation :
python query_reranking.py
Observer
Vous remarquerez peut-être des scores de pertinence plus élevés ou un ordre différent par rapport à une recherche vectorielle brute. Cela permet au LLM de recevoir le contexte le plus précis possible.
9. Partie 3 : Transformation des requêtes
Souvent, le principal goulot d'étranglement dans le RAG est l'utilisateur. Les requêtes utilisateur sont souvent ambiguës, incomplètes ou mal formulées. Si l'embedding de la requête ne correspond pas mathématiquement à l'embedding du document, la récupération échoue.
La transformation des requêtes utilise un LLM pour réécrire ou développer la requête avant qu'elle n'atteigne la base de données. Vous allez implémenter deux techniques :
- HyDE (Hypothetical Document Embeddings) : la similarité vectorielle entre une question et une réponse est souvent inférieure à celle entre une réponse et une réponse hypothétique. HyDE demande au LLM d'halluciner une réponse parfaite, l'intègre et recherche des documents qui ressemblent à l'hallucination.
- Step-back Prompting : si un utilisateur pose une question détaillée et spécifique, le système peut passer à côté du contexte plus large. L'incitation "step-back" demande au LLM de générer une question abstraite de niveau supérieur ("Quelle est l'histoire de cette famille ?") pour récupérer des informations de base en plus des détails spécifiques.
- Créez
query_transformation.py:cloudshell edit query_transformation.py - Collez le code suivant :
import os import logging from dotenv import load_dotenv from google.cloud.sql.connector import Connector, IPTypes from langchain_google_vertexai import VertexAIEmbeddings, VertexAI from langchain_community.vectorstores import PGVector from langchain_core.prompts import PromptTemplate load_dotenv() logging.basicConfig(level=logging.ERROR) PROJECT_ID = os.getenv("PROJECT_ID") REGION = os.getenv("REGION") DB_USER = os.getenv("SQL_USER") DB_PASS = os.getenv("SQL_PASSWORD") DB_NAME = os.getenv("SQL_DATABASE_NAME") INSTANCE_CONNECTION_NAME = f"{PROJECT_ID}:{REGION}:{os.getenv('SQL_INSTANCE_NAME')}" COLLECTION_NAME = f"{os.getenv('PGVECTOR_COLLECTION_NAME')}_recursive" def getconn(): with Connector() as connector: return connector.connect( INSTANCE_CONNECTION_NAME, "pg8000", user=DB_USER, password=DB_PASS, db=DB_NAME, ip_type=IPTypes.PUBLIC, ) def generate_hyde_doc(query, llm): prompt = PromptTemplate( input_variables=["question"], template="Write a concise, hypothetical answer to the question. Question: {question} Answer:" ) chain = prompt | llm return chain.invoke({"question": query}) def generate_step_back(query, llm): prompt = PromptTemplate( input_variables=["question"], template="Write a more general, abstract question that concepts in this question. Original: {question} Step-back:" ) chain = prompt | llm return chain.invoke({"question": query}) def main(): embeddings = VertexAIEmbeddings(model_name="gemini-embedding-001", project=PROJECT_ID, location=REGION) llm = VertexAI(model_name="gemini-2.5-flash", project=PROJECT_ID, location=REGION, temperature=0.5) pg_conn_str = f"postgresql+pg8000://{DB_USER}:{DB_PASS}@placeholder/{DB_NAME}" store = PGVector( collection_name=COLLECTION_NAME, embedding_function=embeddings, connection_string=pg_conn_str, engine_args={"creator": getconn} ) retriever = store.as_retriever(search_kwargs={"k": 2}) original_query = "Tell me about the Dursleys." print(f"ORIGINAL QUERY: {original_query}\n" + "-"*30) # 1. HyDE hyde_doc = generate_hyde_doc(original_query, llm) print(f"HyDE Generated Doc: {hyde_doc.strip()[:100]}...") hyde_results = retriever.invoke(hyde_doc) print(f"HyDE Retrieval: {hyde_results[0].page_content[:100]}...\n") # 2. Step-back step_back_q = generate_step_back(original_query, llm) print(f"Step-back Query: {step_back_q.strip()}") step_results = retriever.invoke(step_back_q) print(f"Step-back Retrieval: {step_results[0].page_content[:100]}...") if __name__ == "__main__": main() - Exécutez le script de transformation :
python query_transformation.py
Observez le résultat.
Remarquez que la requête Step-back peut récupérer un contexte plus large sur l'histoire de la famille Dursley, tandis que HyDE se concentre sur les détails spécifiques générés dans la réponse hypothétique.
10. Partie 4 : Génération de bout en bout
Nous avons découpé nos données, affiné notre recherche et peaufiné la requête de l'utilisateur. Maintenant, nous allons enfin aborder la partie "G" de RAG : la génération.
Jusqu'à présent, nous nous sommes contentés de trouver des informations. Pour créer un véritable assistant IA, nous devons fournir ces documents de haute qualité et réorganisés à un LLM (Gemini) afin de synthétiser une réponse en langage naturel.
Dans un pipeline de production, cela implique un flux spécifique :
- Récupérer : obtenez un large ensemble de candidats (par exemple, Top 10) à l'aide de la recherche vectorielle rapide.
- Réorganiser : ne conservez que les meilleurs résultats (par exemple, Top 3) à l'aide de Vertex AI Reranker.
- Construction du contexte : assemblez le contenu des trois premiers documents en une seule chaîne.
- Requêtes ancrées : insérez cette chaîne de contexte dans un modèle de requête strict qui force le LLM à utiliser uniquement ces informations.
Créer le script de génération
Nous utiliserons gemini-2.5-flash pour l'étape de génération. Ce modèle est idéal pour la RAG, car il dispose d'une longue fenêtre de contexte et d'une faible latence, ce qui lui permet de traiter rapidement plusieurs documents récupérés.
- Créez
end_to_end_rag.py:
cloudshell edit end_to_end_rag.py
- Collez le code suivant. Faites attention à la variable
template. C'est là que nous demandons explicitement au modèle d'éviter les "hallucinations" (c'est-à-dire d'inventer des choses) en le liant au contexte fourni.
import os
import logging
from dotenv import load_dotenv
from google.cloud.sql.connector import Connector, IPTypes
from langchain_google_vertexai import VertexAIEmbeddings, VertexAI
from langchain_community.vectorstores import PGVector
from langchain.retrievers import ContextualCompressionRetriever
from langchain_google_community.vertex_rank import VertexAIRank
from langchain.prompts import PromptTemplate
from langchain.chains import LLMChain
load_dotenv()
logging.basicConfig(level=logging.ERROR)
PROJECT_ID = os.getenv("PROJECT_ID")
REGION = os.getenv("REGION")
# We use the recursive collection as it generally provides the best context boundaries
COLLECTION_NAME = f"{os.getenv('PGVECTOR_COLLECTION_NAME')}_recursive"
def getconn():
instance_conn = f"{PROJECT_ID}:{REGION}:{os.getenv('SQL_INSTANCE_NAME')}"
with Connector() as connector:
return connector.connect(
instance_conn, "pg8000",
user=os.getenv("SQL_USER"), password=os.getenv("SQL_PASSWORD"),
db=os.getenv("SQL_DATABASE_NAME"), ip_type=IPTypes.PUBLIC
)
def main():
print("--- Initializing Production RAG Pipeline ---")
# 1. Setup Embeddings (Gemini Embedding 001)
# We use this to vectorize the user's query to match our database.
embeddings = VertexAIEmbeddings(model_name="gemini-embedding-001", project=PROJECT_ID, location=REGION)
# 2. Connect to Vector Store
pg_conn_str = f"postgresql+pg8000://{os.getenv('SQL_USER')}:{os.getenv('SQL_PASSWORD')}@placeholder/{os.getenv('SQL_DATABASE_NAME')}"
store = PGVector(
collection_name=COLLECTION_NAME,
embedding_function=embeddings,
connection_string=pg_conn_str,
engine_args={"creator": getconn}
)
# 3. Setup The 'Filter Funnel' (Retriever + Reranker)
# Step A: Fast retrieval of top 10 similar documents
base_retriever = store.as_retriever(search_kwargs={"k": 10})
# Step B: Precise reranking to find the top 3 most relevant
reranker = VertexAIRank(
project_id=PROJECT_ID,
location_id="global",
ranking_config="default_ranking_config",
title_field="source",
top_n=3
)
# Combine A and B into a single retrieval object
compression_retriever = ContextualCompressionRetriever(
base_compressor=reranker,
base_retriever=base_retriever
)
# 4. Setup LLM (Gemini 2.5 Flash)
# We use a low temperature (0.1) to reduce creativity and increase factual adherence.
llm = VertexAI(model_name="gemini-2.5-flash", project=PROJECT_ID, location=REGION, temperature=0.1)
# --- Execution Loop ---
user_query = "Who is Harry Potter?"
print(f"\nUser Query: {user_query}")
print("Retrieving and Reranking documents...")
# Retrieve the most relevant documents
top_docs = compression_retriever.invoke(user_query)
if not top_docs:
print("No relevant documents found.")
return
# Build the Context String
# We stitch the documents together, labeling them as Source 1, Source 2, etc.
context_str = "\n\n".join([f"Source {i+1}: {d.page_content}" for i, d in enumerate(top_docs)])
print(f"Found {len(top_docs)} relevant context chunks.")
# 5. The Grounded Prompt
template = """You are a helpful assistant. Answer the question strictly based on the provided context.
If the answer is not in the context, say "I don't know."
Context:
{context}
Question:
{question}
Answer:
"""
prompt = PromptTemplate(template=template, input_variables=["context", "question"])
# Create the chain: Prompt -> LLM
chain = prompt | llm
print("Generating Answer via Gemini 2.5 Flash...")
final_answer = chain.invoke({"context": context_str, "question": user_query})
print(f"\nFINAL ANSWER:\n{final_answer}")
if __name__ == "__main__":
main()
- Exécutez l'application finale :
python end_to_end_rag.py
Comprendre le résultat
Lorsque vous exécutez ce script, observez la différence entre les blocs bruts récupérés (que vous avez vus dans les étapes précédentes) et la réponse finale. Le LLM agit comme un synthétiseur : il lit les "blocs" de texte fragmentés fournis par le Reranker et les transforme en une phrase cohérente et lisible par un humain.
En enchaînant ces composants, vous passez d'une "devinette" stochastique à un workflow déterministe et ancré. Le récupérateur lance le filet, le reclasseur sélectionne la meilleure prise et le générateur prépare le repas.
11. Conclusion
Félicitations ! Vous avez réussi à créer un pipeline RAG avancé qui va bien au-delà de la simple recherche vectorielle.
Récapitulatif
- Vous avez configuré Cloud SQL avec pgvector pour un stockage vectoriel évolutif.
- Vous avez comparé les stratégies de segmentation pour comprendre l'impact de la préparation des données sur la récupération.
- Vous avez implémenté le reranking avec Vertex AI pour améliorer la précision de vos résultats.
- Vous avez utilisé les transformations de requêtes (HyDE, Step-back) pour aligner l'intention de l'utilisateur sur vos données.
En savoir plus
- Architectures de référence RAG : explorez une liste de guides d'architectures de référence liés à RAG.
Du prototype à la production
Cet atelier fait partie du parcours de formation "IA prête pour la production avec Google Cloud".
- Découvrez le programme complet pour passer du prototype à la production.
- Partagez votre progression avec le hashtag #ProductionReadyAI.