Similarity Search with Spanner and Vertex AI

1. Introduction

Recent advances in deep learning have made it possible to represent text and other data in a way that captures semantic meaning. This has led to a new approach to search, called vector search, which uses vector representations of text (known as embeddings) to find the documents most relevant to a user's query. Vector search is preferred over traditional search for applications such as apparel search, where users often search for items by description, style, or context rather than by exact product or brand names. By integrating the Cloud Spanner database with Vertex AI Vector Search, customers can create a powerful combination of the availability, reliability, and scale of Spanner with the advanced similarity-matching capabilities of Vector Search. The search is performed by comparing embeddings of items in the Vector Search index and returning the most similar matches.

Use Case

Imagine you're a data scientist at a fashion retailer trying to keep up with rapidly changing trends, product searches, and recommendations, with limited resources and siloed data. This blog post demonstrates how to implement an apparel recommendation use case using a similarity search approach on apparel data. The following steps are covered:

  1. Data sourced from Spanner
  2. Vectors generated for the apparel data using ML.PREDICT and stored in Spanner
  3. Spanner vector data integrated with Vector Search using Dataflow and Workflows jobs
  4. Vector Search performed to find similarity matches for user-entered input

We will build a demo web application that lets users search for apparel by entering a text description.

Spanner to Vector Search Index:

The data for apparel search is stored in Spanner. We will invoke the Vertex AI Embeddings API in the ML.PREDICT construct directly on Spanner data. Then we will leverage Dataflow and Workflows jobs that bulk upload this data (inventory and embeddings) into Vertex AI Vector Search and refresh the index.

Running User Queries on the index:

When a user enters an apparel description, the app generates the embeddings in real time using the Text Embeddings API. These are then sent as input to the Vector Search API, which finds the 10 most relevant product descriptions from the index, and the corresponding images are displayed.

Architecture Overview

The architecture of the Spanner - Vector Search application is shown in the following two-part diagram:

Spanner to Vector Search Index: [architecture diagram]

Client app to run user queries on the index:

[architecture diagram]

What you'll build

Spanner to Vector Search Index:

  • Spanner database to store and manage source data and the corresponding embeddings
  • A Workflows job that bulk uploads data (id and embeddings) into the Vertex AI Vector Search index.
  • A Vector Search API that is used to find relevant product descriptions from the index.

Running User Queries on the index:

  • A web application that allows users to enter text descriptions of apparel, performs similarity search using the deployed index endpoint, and returns the apparel items nearest to the input.

How it Works

When a user enters a text description of apparel, the web application sends the description to the Vector Search API. The Vector Search API then uses the embeddings of the apparel descriptions to find the most relevant product descriptions from the index. The product descriptions and corresponding images are then displayed to the user. The general workflow is as follows:

  1. Generate embeddings for data stored in Spanner.
  2. Export and upload embeddings into a Vector Search index.
  3. Query the Vector Search index for similar items by performing a nearest-neighbor search.

2. Requirements

  • A browser, such as Chrome or Firefox
  • A Google Cloud project with billing enabled

Before you begin

  1. In the Google Cloud Console, on the project selector page, select or create a Google Cloud project
  2. Make sure that billing is enabled for your Cloud project. Learn how to check if billing is enabled on a project
  3. Make sure all the necessary APIs (Cloud Spanner, Vertex AI, Google Cloud Storage) are enabled
  4. You will use Cloud Shell, a command-line environment running in Google Cloud that comes pre-loaded with gcloud. Refer to the documentation for gcloud commands and usage. If your project is not set, use the following command to set it:
gcloud config set project <YOUR_PROJECT_ID>
  5. Navigate to the Cloud Spanner page with your active Google Cloud project to get started

3. Backend: Create your Spanner data source and embeddings

In this use case, the Spanner database houses the apparel inventory along with the corresponding images and descriptions. Make sure you generate embeddings for the text descriptions and store them in your Spanner database as ARRAY<FLOAT64>.

  1. Create the Spanner data

Create an instance named "spanner-vertex" and a database named "spanner-vertex-embeddings". Then create a table using the following DDL:

CREATE TABLE apparels (
  id NUMERIC,
  category STRING(100),
  sub_category STRING(50),
  uri STRING(200),
  content STRING(2000),
  embedding ARRAY<FLOAT64>
) PRIMARY KEY (id);
  2. Insert data into the table using the INSERT SQL

Insert scripts for sample data are available here.
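If you prefer to load a row programmatically, here is a minimal sketch using the Spanner Python client; the project ID and sample values below are illustrative placeholders, not part of the linked scripts:

import decimal

from google.cloud import spanner

client = spanner.Client(project="<YOUR_PROJECT_ID>")  # replace with your project
database = client.instance("spanner-vertex").database("spanner-vertex-embeddings")

# Insert a single illustrative row; the linked scripts load the full sample set.
with database.batch() as batch:
    batch.insert(
        table="apparels",
        columns=("id", "category", "sub_category", "uri", "content"),
        values=[(
            decimal.Decimal(1),  # the NUMERIC id column maps to decimal.Decimal
            "Apparel",
            "Shorts",
            "https://example.com/img1.jpg",  # placeholder image URI
            "Red cotton shorts for girls",
        )],
    )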

  3. Create the Text Embeddings model

This is required so we can generate embeddings for the input content. Below is the DDL for the model:

CREATE MODEL text_embeddings INPUT(content STRING(MAX))
OUTPUT(
  embeddings
    STRUCT<
      statistics STRUCT<truncated BOOL, token_count FLOAT64>,
      values ARRAY<FLOAT64>>
)
REMOTE OPTIONS (
  endpoint = '//aiplatform.googleapis.com/projects/<YOUR_PROJECT_ID>/locations/us-central1/publishers/google/models/textembedding-gecko');
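Before running the bulk load, you can sanity-check the model with a one-off ML.PREDICT query. Below is a hedged sketch that runs it through the Spanner Python client, assuming the client and database objects from the earlier sketch:

# Quick sanity check: embed one ad-hoc string through the remote model.
with database.snapshot() as snapshot:
    rows = snapshot.execute_sql(
        """
        SELECT embeddings.values
        FROM ML.PREDICT(
          MODEL text_embeddings,
          (SELECT 'red shorts for girls' AS content))
        """
    )
    for row in rows:
        print(len(row[0]))  # textembedding-gecko returns 768-dimensional vectors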
  4. Generate text embeddings for the source data

Create a table to store the embeddings and insert the generated embeddings. In a real-world database application, the data load into Spanner up to step 2 would be transactional. To keep the design aligned with best practices, I prefer to keep the transactional tables normalized, so the embeddings go into a separate table.

CREATE TABLE apparels_embeddings (id STRING(100), embedding ARRAY<FLOAT64>)
PRIMARY KEY (id);

INSERT INTO apparels_embeddings (id, embedding)
SELECT CAST(id AS STRING), embeddings.values
FROM ML.PREDICT(
  MODEL text_embeddings,
  (SELECT id, content FROM apparels)
);
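As a quick verification that the vectors landed correctly, you can inspect a few rows, for example with the Spanner Python client (a sketch assuming the database object from the earlier snippets):

# Verify that embeddings were written and have the expected dimensionality.
with database.snapshot() as snapshot:
    rows = snapshot.execute_sql(
        "SELECT id, ARRAY_LENGTH(embedding) FROM apparels_embeddings LIMIT 5"
    )
    for row in rows:
        print(row)  # e.g. ['1', 768]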

Now that the bulk content and embeddings are ready, let us create a Vector Search Index and Endpoint to store the embeddings that will help perform the Vector Search.

4. Workflow Job: Spanner data export to Vector Search

  1. Create a Cloud Storage Bucket

This is required to store the embeddings from Spanner in a GCS bucket, in the JSON format that Vector Search expects as input. Create the bucket in the same region as your data in Spanner. Create a folder inside if required, and in it create an empty file called empty.json.
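You can create the bucket and the empty.json file in the console, or with a short Python sketch like the one below (the project, bucket name, and folder are placeholders):

from google.cloud import storage

client = storage.Client(project="<YOUR_PROJECT_ID>")  # replace with your project

# Create the bucket in the same region as the Spanner data.
bucket = client.create_bucket("<YOUR_BUCKET>", location="us-central1")

# Vector Search batch updates expect an empty.json file in the input folder.
bucket.blob("<FOLDER_IF_ANY>/empty.json").upload_from_string("")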

  2. Set up the Cloud Workflow

To set up a batch export from Spanner to a Vertex AI Vector Search index:

Create an empty index:

Make sure the Vector Search index is in the same region as your Cloud Storage bucket and your data. Follow the 11 steps of instructions under the Console tab in the Create an index for batch update section of the Manage indexes page. In the folder passed to contentsDeltaUri, create an empty file called empty.json; you will not be able to create an index without this file. This creates an empty index.

If you already have an index, you can skip this step. The workflow will overwrite your index.

Note: You cannot deploy an empty index to an endpoint. So we are deferring the step of deploying it to an endpoint to a later step, after exporting the vector data to Cloud Storage.

Clone this git repository: There are multiple ways to clone a git repository; one way is to use the GitHub CLI. Run the following two commands from the Cloud Shell terminal:

gh repo clone cloudspannerecosystem/spanner-ai

cd spanner-ai/vertex-vector-search/workflows

This folder contains two files:

  • batch-export.yaml: This is the workflow definition.
  • sample-batch-input.json: This is a sample of the workflow input parameters.

Set up input.json from the sample file: First, copy the sample JSON.

cp sample-batch-input.json input.json

Then edit input.json with the details for your project. In this case, your JSON should look like this:

{
  "project_id": "<<YOUR_PROJECT>>",
  "location": "<<us-central1>>",
  "dataflow": {
    "temp_location": "gs://<<YOUR_BUCKET>>/<<FOLDER_IF_ANY>>/workflow_temp"
  },
  "gcs": {
    "output_folder": "gs://<<YOUR_BUCKET>>/<<FOLDER_IF_ANY>>/workflow_output"
  },
  "spanner": {
    "instance_id": "spanner-vertex",
    "database_id": "spanner-vertex-embeddings",
    "table_name": "apparels_embeddings",
    "columns_to_export": "embedding,id"
  },
  "vertex": {
    "vector_search_index_id": "<<YOUR_INDEX_ID>>"
  }
}
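If you script your setup, you can also generate input.json instead of hand-editing it; here is a small sketch using the same placeholder values as above:

import json

params = {
    "project_id": "<<YOUR_PROJECT>>",
    "location": "us-central1",
    "dataflow": {"temp_location": "gs://<<YOUR_BUCKET>>/<<FOLDER_IF_ANY>>/workflow_temp"},
    "gcs": {"output_folder": "gs://<<YOUR_BUCKET>>/<<FOLDER_IF_ANY>>/workflow_output"},
    "spanner": {
        "instance_id": "spanner-vertex",
        "database_id": "spanner-vertex-embeddings",
        "table_name": "apparels_embeddings",
        "columns_to_export": "embedding,id",
    },
    "vertex": {"vector_search_index_id": "<<YOUR_INDEX_ID>>"},
}

# Write the workflow input parameters to input.json.
with open("input.json", "w") as f:
    json.dump(params, f, indent=2)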

Setup Permissions

For production environments, we strongly recommend creating a new service account and granting it one or more IAM roles that contain the minimum permissions required for managing the service. The following roles are needed to set up the workflow that exports data (embeddings) from Spanner to the Vector Search index:

Cloud Workflow Service Account:

By default, the workflow uses the Compute Engine default service account.

If you use a manually configured service account, you must include the following roles:

  • To trigger a Dataflow job: Dataflow Admin, Dataflow Worker.
  • To impersonate a Dataflow worker service account: Service Account User.
  • To write logs: Logs Writer.
  • To trigger a Vertex AI Vector Search rebuild: Vertex AI User.

Dataflow Worker Service Account:

If you use a manually configured service account, you must include the following roles:

  • To manage Dataflow: Dataflow Admin, Dataflow Worker.
  • To read data from Spanner: Cloud Spanner Database Reader.
  • To write to the selected GCS bucket: GCS Storage Bucket Owner.

  3. Deploy the Cloud Workflow

Deploy the workflow yaml file to your Google Cloud project. You can configure the region or location where the workflow runs when executed.

gcloud workflows deploy vector-export-workflow --source=batch-export.yaml --location="us-central1" [--service-account=<SERVICE_ACCOUNT>]

or 

gcloud workflows deploy vector-export-workflow --source=batch-export.yaml --location="us-central1"

The workflow should now be visible on the Workflows page in the Google Cloud console.

Note: You can also create and deploy the workflow from the Google Cloud console. Follow the prompts in the Cloud console. For the workflow definition, copy and paste the contents of batch-export.yaml.

Once this is complete, execute the workflow so the data export begins.

  4. Execute the Cloud Workflow

Run the following command to execute the workflow:

gcloud workflows execute vector-export-workflow --data="$(cat input.json)"

The execution should show up in the Executions tab in Workflows. This should load your data into the Vector Search database and index it.

Note: You can also execute from the console using the Execute button. Follow the prompts and for the input, copy and paste the contents of your customized input.json.
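If you prefer to trigger the export programmatically rather than via gcloud, here is a hedged sketch using the Workflows Executions Python client (package google-cloud-workflows); the project ID is a placeholder:

from google.cloud.workflows.executions_v1 import Execution, ExecutionsClient

client = ExecutionsClient()
parent = client.workflow_path(
    "<YOUR_PROJECT_ID>", "us-central1", "vector-export-workflow"
)

# Pass the same parameters as input.json.
with open("input.json") as f:
    argument = f.read()

execution = client.create_execution(
    parent=parent, execution=Execution(argument=argument)
)
print("Started execution:", execution.name)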

5. Deploy Vector Search Index

Deploy the index to an endpoint

You can follow the steps below to deploy the index:

  1. On the Vector Search indexes page, you should see a DEPLOY button next to the index you just created in step 2 of the previous section. Alternatively you can navigate to the index info page and click DEPLOY TO ENDPOINT button.
  2. Provide the necessary information and deploy the index to an endpoint.

Alternatively, you can look at this notebook to deploy the index to an endpoint (skip to the deploy part of the notebook). Once deployed, take note of the deployed index ID and the endpoint URL.
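For reference, the same deployment from Python with the Vertex AI SDK looks roughly like this (a sketch; the index ID, display name, and deployed index id are placeholders):

from google.cloud import aiplatform

aiplatform.init(project="<YOUR_PROJECT_ID>", location="us-central1")

# Look up the index populated by the workflow.
index = aiplatform.MatchingEngineIndex("<YOUR_INDEX_ID>")

# Create an endpoint and deploy the index to it.
endpoint = aiplatform.MatchingEngineIndexEndpoint.create(
    display_name="spanner-vector-endpoint",
    public_endpoint_enabled=True,
)
endpoint.deploy_index(index=index, deployed_index_id="spanner_vector_deployed")
print(endpoint.resource_name)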

6. Frontend: User data to Vector Search

Let's build a simple Python application with a Gradio-powered UI to quickly test our implementation. You can refer to the implementation here to build this demo app in your own Colab notebook.

  1. We will use the aiplatform Python SDK to call the Embeddings API and to invoke the Vector Search index endpoint.
# [START aiplatform_sdk_embedding]
!pip install google-cloud-aiplatform==1.35.0 --upgrade --quiet --user


import sys

# Define project information
PROJECT_ID = " "  # Your project id
LOCATION = "us-central1"  # Your location

if "google.colab" in sys.modules:
    # Authenticate user to Google Cloud
    from google.colab import auth
    auth.authenticate_user()

import vertexai
from vertexai.language_models import TextEmbeddingModel

# Initialize the Vertex AI SDK with your project and location.
vertexai.init(project=PROJECT_ID, location=LOCATION)
  2. We will use Gradio to quickly build a simple user interface for the AI application we are demoing. Restart the runtime before you implement this step.
!pip install gradio
import gradio as gr
  3. On user input from the web app, invoke the Embeddings API. We will use the text embedding model textembedding-gecko@latest.

The method below invokes the Text Embedding Model and returns the vector embeddings for the text entered by the user:

def text_embedding(content) -> list:
    """Text embedding with a Large Language Model."""
    model = TextEmbeddingModel.from_pretrained("textembedding-gecko@latest")
    # get_embeddings expects a list of texts and returns one embedding per text.
    embeddings = model.get_embeddings([content])
    vector = embeddings[0].values
    # print(f"Length of Embedding Vector: {len(vector)}")
    return vector

Test it

text_embedding("red shorts for girls")

You should see the embedding vector printed as output: an array of floating-point values (768 elements for textembedding-gecko), too long to show in full here.

  4. Declare the deployed index ID and the endpoint ID
from google.cloud import aiplatform

# Replace these with the deployed index id and index endpoint from your deployment.
DEPLOYED_INDEX_ID = "spanner_vector1_1702366982123"
# Vector Search Endpoint
index_endpoint = aiplatform.MatchingEngineIndexEndpoint('projects/273845608377/locations/us-central1/indexEndpoints/2021628049526620160')
  5. Define the Vector Search method that calls the index endpoint and shows the 10 nearest matches for the embedding of the user-entered text.

In the method definition below, note that the find_neighbors method is invoked to identify the 10 nearest vectors.

def vector_search(content) -> list:
  # Generate the embedding for the user's query text.
  result = text_embedding(content)
  # Query the deployed index for the 10 nearest neighbors.
  response = index_endpoint.find_neighbors(
      deployed_index_id = DEPLOYED_INDEX_ID,
      queries = [result],
      num_neighbors = 10
  )
  out = []
  # Collect the image URI stored in Spanner for each matching id.
  for neighbor in response[0]:
      uri = spanner_read_data(neighbor.id)
      print(f"{neighbor.distance:.2f} {uri}")
      out.append(uri)
  return out

You will also notice the call to the method spanner_read_data. Let's look at it in the next step.

  6. Define the Spanner data read method, which invokes the execute_sql method to fetch the images corresponding to the ids of the nearest-neighbor vectors returned in the previous step.
!pip install google-cloud-spanner==3.36.0


from google.cloud import spanner


instance_id = "spanner-vertex"
database_id = "spanner-vertex-embeddings"
client = spanner.Client(project=PROJECT_ID)
instance = client.instance(instance_id)
database = instance.database(database_id)


def spanner_read_data(id):
    # Use a parameterized query: neighbor ids are returned as strings, so cast
    # to match the NUMERIC id column and avoid building SQL by concatenation.
    query = "SELECT uri FROM apparels WHERE id = CAST(@id AS NUMERIC)"
    outputs = []
    with database.snapshot() as snapshot:
        results = snapshot.execute_sql(
            query,
            params={"id": id},
            param_types={"id": spanner.param_types.STRING},
        )
        for row in results:
            outputs.append("{}".format(*row))
    return "\n".join(outputs)

It should return the URLs of the images corresponding to the chosen vectors.

  7. Finally, let's put the pieces together in a user interface and trigger the Vector Search process.
def call_search(query):
  response = vector_search(query)
  return response


input_text = gr.Textbox(label="Enter your query. Examples: Girls Tops White Casual, Green t-shirt girls, jeans shorts, denim skirt etc.")
# Render the 10 nearest matches as images loaded from the returned URIs.
output_texts = [gr.Image(label="") for i in range(10)]
demo = gr.Interface(fn=call_search, inputs=input_text, outputs=output_texts, live=True)
resp = demo.launch(share=True)

You should see the result as shown below:

[screenshot of the search results]


View the result video: here.

7. Clean up

To avoid incurring charges to your Google Cloud account for the resources used in this post, follow these steps:

  1. In the Google Cloud console, go to the Manage resources page.
  2. In the project list, select the project that you want to delete, and then click Delete.
  3. In the dialog, type the project ID, and then click Shut down to delete the project.
  4. If you don't want to delete the project, delete the Spanner instance by navigating to the instance you just created for this project and click the DELETE INSTANCE button on the top right corner of the instance overview page.
  5. You can also navigate to the Vector Search index, undeploy the endpoint and index, and delete the index.

8. Conclusion

Congratulations! You have successfully completed the Spanner - Vertex AI Vector Search implementation by:

  1. Creating the Spanner data source and embeddings for applications sourced from a Spanner database.
  2. Creating the Vector Search index.
  3. Integrating vector data from Spanner into Vector Search using Dataflow and Workflows jobs.
  4. Deploying the index to an endpoint.
  5. Finally, invoking Vector Search on user input in a Python implementation using the Vertex AI SDK.

Feel free to extend the implementation to your own use case, or enhance the current use case with new features. Learn more about the machine learning capabilities of Spanner here.