Vertex AI Vision Queue Detection App

1. Objectives

Overview

This codelab focuses on creating an end-to-end Vertex AI Vision application to monitor queue size using retail video footage. We will use the built-in features of the pretrained Occupancy Analytics specialized model to capture the following:

  • Count the number of people standing in the queue.
  • Count the number of people getting served at the counter.

What you'll learn

  • How to create an application in Vertex AI Vision and deploy it
  • How to set up an RTSP stream using a video file and ingest the stream into Vertex AI Vision using vaictl from a Jupyter Notebook.
  • How to use the Occupancy Analytics model and its different features.
  • How to search for videos stored in Vertex AI Vision's Media Warehouse.
  • How to connect the output to BigQuery, write SQL queries to extract insights from the model's JSON output, and use the output to label and annotate the original video.

Cost:

The total cost to run this lab on Google Cloud is about $2.

2. Before You Begin

Create a project and enable APIs:

  1. In the Google Cloud console, on the project selector page, select or create a Google Cloud project. Note: If you don't plan to keep the resources that you create in this procedure, create a project instead of selecting an existing project. After you finish these steps, you can delete the project, removing all resources associated with the project. Go to project selector
  2. Make sure that billing is enabled for your Cloud project. Learn how to check if billing is enabled on a project.
  3. Enable the Compute Engine API, Vertex AI API, Notebooks API, and Vision AI API (you can also enable them from the command line, as sketched below). Enable the APIs
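
If you prefer the command line, you can enable the same APIs with gcloud, for example from Cloud Shell. This is a sketch; the service names below are the standard identifiers for these APIs (visionai.googleapis.com is the endpoint this lab uses later with vaictl):

gcloud services enable compute.googleapis.com \
    aiplatform.googleapis.com \
    notebooks.googleapis.com \
    visionai.googleapis.com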

Create a service account:

  1. In the Google Cloud console, go to the Create service account page. Go to Create service account
  2. Select your project.
  3. In the Service account name field, enter a name. The Google Cloud console fills in the Service account ID field based on this name. In the Service account description field, enter a description. For example, Service account for quickstart.
  4. Click Create and continue.
  5. To provide access to your project, grant the following role(s) to your service account:
  • Vision AI > Vision AI Editor
  • Compute Engine > Compute Instance Admin (beta)
  • BigQuery > BigQuery Admin

In the Select a role list, select a role. For additional roles, click Add another role and add each additional role.

  6. Click Continue.
  7. Click Done to finish creating the service account. Do not close your browser window. You will use it in the next step. If you prefer the command line, a gcloud alternative is sketched below.
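
If you prefer to script the service account setup, a rough gcloud equivalent is sketched below. The service account name is only an example, and the role IDs are assumptions mapped from the console role names above (verify them in the IAM role picker); PROJECT_ID is a placeholder for your project ID.

# Example only: create a service account and grant the three roles used in this lab.
gcloud iam service-accounts create queue-codelab-sa \
    --project=PROJECT_ID \
    --display-name="Service account for quickstart"

for ROLE in roles/visionai.editor roles/compute.instanceAdmin roles/bigquery.admin; do
  gcloud projects add-iam-policy-binding PROJECT_ID \
      --member="serviceAccount:queue-codelab-sa@PROJECT_ID.iam.gserviceaccount.com" \
      --role="$ROLE"
done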

3. Set up Jupyter Notebook

Before creating an Occupancy Analytics app, you must register a stream that the app can use later.

In this tutorial you create a Jupyter Notebook instance that hosts a video, and you send that streaming video data from the notebook. We use a Jupyter Notebook because it lets us execute shell commands and run custom pre/post-processing code in a single place, which is ideal for rapid experimentation. We will use this notebook to:

  1. Run the RTSP server as a background process.
  2. Run the vaictl command as a background process.
  3. Run queries and processing code to analyze the occupancy analytics output.

Create a Jupyter Notebook

The first step in sending video from a Jupyter Notebook instance is creating the notebook with the service account created in the previous step.

  1. In the console, go to the Vertex AI page. Go to Vertex AI Workbench
  2. Click User-Managed Notebooks

65b7112822858dce.png

  3. Click New Notebook > TensorFlow Enterprise 2.6 (with LTS) > Without GPUs.

dc156f20b14651d7.png

  4. Enter a name for the Jupyter Notebook. For more information, see Resource naming convention.

b4dbc5fddc37e8d9.png

  5. Click ADVANCED OPTIONS.
  6. Scroll down to the Permissions section.
  7. Uncheck the Use Compute Engine default service account option.
  8. Add the service account email created in the previous step, and click Create.

ec0b9ef00f0ef470.png

  9. Once the instance has been created, click OPEN JUPYTERLAB.

4. Set up a Notebook to stream video

Before creating an Occupancy Analytics app, you must register a stream that the app can use later.

In this tutorial we use the Jupyter Notebook instance to host a video and send the streaming video data from the notebook.

Download the vaictl command-line tool

  1. In the opened JupyterLab instance, open a new notebook from the launcher.

a6d182923ae4ada3.png

  2. Download the Vertex AI Vision (vaictl) command-line tool, the RTSP server binary, and OpenCV using the following commands in a notebook cell:
!wget -q https://github.com/aler9/rtsp-simple-server/releases/download/v0.20.4/rtsp-simple-server_v0.20.4_linux_amd64.tar.gz
!wget -q https://github.com/google/visionai/releases/download/v0.0.4/visionai_0.0-4_amd64.deb
!tar -xf rtsp-simple-server_v0.20.4_linux_amd64.tar.gz
!pip install opencv-python --quiet
!sudo apt-get -qq remove -y visionai
!sudo apt-get -qq install -y ./visionai_0.0-4_amd64.deb
!sudo apt-get -qq install -y ffmpeg
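
Optionally, verify that the downloads and installs succeeded before moving on. This is a quick sanity check using the file and package names from the commands above:

!ls -lh ./rtsp-simple-server
!which vaictl ffmpeg
import cv2; print('OpenCV', cv2.__version__)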

5. Ingest a video file for streaming

After you set up your notebook environment with the required command-line tools, you can copy a sample video file and then use vaictl to stream the video data to your occupancy analytics app.

Register a new stream

  1. Click the Streams tab in the left panel of Vertex AI Vision.
  2. Click the Register button at the top. eba418e723916514.png
  3. In the Stream name field, enter 'queue-stream'.
  4. For the region, choose the same region you selected during notebook creation in the previous step.
  5. Click Register.

Copy a sample video to your VM

  1. In your notebook, copy a sample video with the following wget command.
!wget -q https://github.com/vagrantism/interesting-datasets/raw/main/video/collective_activity/seq25_h264.mp4

Stream video from VM and ingest data into your stream

  1. To send this local video file to the app input stream, use the following command in your notebook cell. You must make the following variable substitutions:
  • PROJECT_ID: Your Google Cloud project ID.
  • LOCATION: Your location ID. For example, us-central1. For more information, see Cloud locations.
  • LOCAL_FILE: The filename of a local video file. For example, seq25_h264.mp4.
PROJECT_ID='<Your Google Cloud project ID>'
LOCATION='<Your stream location>'
LOCAL_FILE='seq25_h264.mp4'
STREAM_NAME='queue-stream'
  2. Start rtsp-simple-server as a background process to serve the video file over the RTSP protocol:
import os
import time
import subprocess

subprocess.Popen(["nohup", "./rtsp-simple-server"], stdout=open('rtsp_out.log', 'a'), stderr=open('rtsp_err.log', 'a'), preexec_fn=os.setpgrp)
time.sleep(5)
  3. Use the ffmpeg command-line tool to loop the video into the RTSP stream:
subprocess.Popen(["nohup", "ffmpeg", "-re", "-stream_loop", "-1", "-i", LOCAL_FILE, "-c", "copy", "-f", "rtsp", f"rtsp://localhost:8554/{LOCAL_FILE.split('.')[0]}"], stdout=open('ffmpeg_out.log', 'a'), stderr=open('ffmpeg_err.log', 'a'), preexec_fn=os.setpgrp)
time.sleep(5)
  4. Use the vaictl command-line tool to stream the video from the RTSP server URI to the Vertex AI Vision stream 'queue-stream' created in the previous step:
subprocess.Popen(["nohup", "vaictl", "-p", PROJECT_ID, "-l", LOCATION, "-c", "application-cluster-0", "--service-endpoint", "visionai.googleapis.com", "send", "rtsp", "to", "streams", "queue-stream", "--rtsp-uri", f"rtsp://localhost:8554/{LOCAL_FILE.split('.')[0]}"], stdout=open('vaictl_out.log', 'a'), stderr=open('vaictl_err.log', 'a'), preexec_fn=os.setpgrp)

It might take ~100 seconds between starting the vaictl ingest operation and the video appearing in the dashboard.
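
If the feed does not show up after a few minutes, you can inspect the logs of the three background processes from the notebook. The file names below match the ones used in the subprocess commands above:

!tail -n 5 rtsp_err.log ffmpeg_err.log vaictl_err.log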

After the stream ingestion is available, you can see the video feed in the Streams tab of the Vertex AI Vision dashboard by selecting the queue-stream stream.

Go to the Streams tab

1b7aac7d36552f29.png

6. Create an application

The first step is to create an app that processes your data. An app can be thought of as an automated pipeline that connects the following:

  • Data ingestion: A video feed is ingested into a stream.
  • Data analysis: An AI (computer vision) model can be added after the ingestion.
  • Data storage: The two versions of the video feed (the original stream and the stream processed by the AI model) can be stored in a media warehouse.

In the Google Cloud console an app is represented as a graph.

Create an empty app

Before you can populate the app graph, you must first create an empty app.

Create an app in the Google Cloud console.

  1. Go to Google Cloud console.
  2. Open the Applications tab of the Vertex AI Vision dashboard. Go to the Applications tab
  3. Click the Create button. 21ecba7a23e9979e.png
  4. Enter 'queue-app' as the app name and choose your region.
  5. Click Create.

Add app component nodes

After you have created the empty application, you can then add the three nodes to the app graph:

  1. Ingestion node: The stream resource that ingests data sent from the RTSP video server you created in the notebook.
  2. Processing node: The occupancy analytics model that acts on ingested data.
  3. Storage node: The media warehouse that stores processed videos and serves as a metadata store. The metadata store includes analytics information about the ingested video data as well as information inferred by the AI models.

Add component nodes to your app in the console.

  1. Open the Applications tab of the Vertex AI Vision dashboard. Go to the Applications tab

This takes you to the graph visualization of the processing pipeline.

Add a data ingestion node

  1. To add an input stream node, select the Streams option in the Connectors section of the side menu.
  2. In the Source section of the Stream menu that opens, select Add streams.
  3. In the Add streams menu, choose queue-stream.
  4. To add the stream to the app graph, click Add streams.

Add a data processing node

  1. To add the occupancy count model node, select the Occupancy analytics option in the Specialized models section of the side menu.
  2. Leave the default selection People checked. Uncheck Vehicles if it is selected.

618b0c9dc671bae3.png

  3. In the Advanced Options section, click Create Active Zones/Lines. 5b2f31235603e05d.png
  4. Draw the active zones using the Polygon tool to count people in each zone, and label the zones accordingly (the analysis code later in this lab expects zones named queue-zone and serving-zone).

50281a723650491f.png

  5. Click the back arrow at the top.

2bf0ff4d029d29eb.png

  6. Add settings for dwell time to detect congestion by clicking the checkbox.

c067fa256ca5bb96.png

Add a data storage node

  1. To add the output destination (storage) node, select the Vision AI Warehouse option in the Connectors section of the side menu.
  2. Click the Vertex AI Warehouse connector to open its menu, then click Connect warehouse.
  3. In the Connect warehouse menu, select Create new warehouse. Name the warehouse queue-warehouse, and leave the TTL duration at 14 days.
  4. Click the Create button to add the warehouse.

7. Connect Output to BigQuery Table

When you add a BigQuery connector to your Vertex AI Vision app, all of the connected app's model outputs are ingested into the target table.

You can either create your own BigQuery table and specify that table when you add a BigQuery connector to the app, or let the Vertex AI Vision app platform automatically create the table.

Automatic table creation

If you let the Vertex AI Vision app platform automatically create the table, you can specify this option when you add the BigQuery connector node.

The following dataset and table conditions apply if you want to use automatic table creation:

  • Dataset: The automatically created dataset is named visionai_dataset.
  • Table: The automatically created table is named visionai_dataset.APPLICATION_ID.
  • Error handling: If a table with the same name already exists in the dataset, no automatic creation happens.
  1. Open the Applications tab of the Vertex AI Vision dashboard. Go to the Applications tab
  2. Select View app next to the name of your application from the list.
  3. On the application builder page select BigQuery from the Connectors section.
  4. Leave the BigQuery path field empty.

ee0b67d4ab2263d.png

  5. Under Store metadata from, select only Occupancy Analytics and uncheck Streams.

The final app graph should look like this:

da0a1a049843572f.png

8. Deploy your app for use

After you have built your end-to-end app with all the necessary components, the last step to using the app is to deploy it.

  1. Open the Applications tab of the Vertex AI Vision dashboard. Go to the Applications tab
  2. Select View app next to the queue-app app in the list.
  3. From the Studio page, click the Deploy button.
  4. In the following confirmation dialog, click Deploy. The deploy operation might take several minutes to complete. After deployment finishes, green check marks appear next to the nodes. dc514d9b9f35099d.png
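
Once deployment finishes and the stream has been ingesting for a minute or two, you can optionally confirm from the notebook that model output is landing in BigQuery. This is a minimal check, assuming the automatic table naming described in the previous section (visionai_dataset.queue-app); it reuses the PROJECT_ID variable defined earlier in the notebook:

from google.cloud import bigquery

client = bigquery.Client(project=PROJECT_ID)
# Count the rows ingested so far and show the most recent ingestion timestamp.
row = list(client.query(
    f'SELECT COUNT(*) AS n, MAX(ingestion_time) AS latest FROM `{PROJECT_ID}.visionai_dataset.queue-app`'
).result())[0]
print('Rows ingested:', row['n'], '| latest ingestion_time:', row['latest'])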

9. Search video content in the storage warehouse

After you ingest video data into your processing app, you can view analyzed video data, and search the data based on occupancy analytics information.

  1. Open the Warehouses tab of the Vertex AI Vision dashboard. Go to the Warehouses tab
  2. Find the queue-warehouse warehouse in the list, and click View assets.
  3. In the People count section, set the Min value to 1, and the Max value to 5.
  4. To filter processed video data stored in Vertex AI Vision's Media Warehouse, click Search.

a0e5766262443d6c.png

A view of stored video data that matches search criteria in the Google Cloud console.

10. Annotate and Analyze Output using BigQuery Table

  1. In the notebook, initialize the following variables in a cell:
DATASET_ID='visionai_dataset'
bq_table=f'{PROJECT_ID}.{DATASET_ID}.queue-app'
frame_buffer_size=10000
frame_buffer_error_milliseconds=5
dashboard_update_delay_seconds=3
rtsp_url='rtsp://localhost:8554/seq25_h264'
  2. Now capture frames from the RTSP stream using the following code:
import cv2
import threading
from collections import OrderedDict
from datetime import datetime, timezone

frame_buffer = OrderedDict()
frame_buffer_lock = threading.Lock()

stream = cv2.VideoCapture(rtsp_url)
def read_frames(stream):
  # Continuously read frames from the stream and keep only the newest
  # frame_buffer_size frames, keyed by capture timestamp in milliseconds.
  while True:
    ret, frame = stream.read()
    frame_ts = datetime.now(timezone.utc).timestamp() * 1000
    if ret:
      with frame_buffer_lock:
        while len(frame_buffer) >= frame_buffer_size:
          _ = frame_buffer.popitem(last=False)
        frame_buffer[frame_ts] = frame

frame_buffer_thread = threading.Thread(target=read_frames, args=(stream,), daemon=True)
frame_buffer_thread.start()
print('Waiting for stream initialization')
while not list(frame_buffer.keys()): pass
print('Stream Initialized')
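
Before moving on, you can optionally confirm that frames are being buffered. This is a small sanity check against the frame_buffer populated above:

# Peek at the most recently buffered frame without disturbing the reader thread.
with frame_buffer_lock:
  print('Buffered frames:', len(frame_buffer))
  latest_ts = next(reversed(frame_buffer))
  print('Latest frame shape:', frame_buffer[latest_ts].shape)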
  3. Pull the latest ingestion timestamp from the BigQuery table and create a directory to store the captured frame images:
from google.cloud import bigquery
import pandas as pd

client = bigquery.Client(project=PROJECT_ID)

query = f"""
SELECT MAX(ingestion_time) AS ts
FROM `{bq_table}`
"""

bq_max_ingest_ts_df = client.query(query).to_dataframe()
bq_max_ingest_epoch = str(int(bq_max_ingest_ts_df['ts'][0].timestamp()*1000000))
bq_max_ingest_ts = bq_max_ingest_ts_df['ts'][0]
print('Preparing to pull records with ingestion time >', bq_max_ingest_ts)
if not os.path.exists(bq_max_ingest_epoch):
   os.makedirs(bq_max_ingest_epoch)
print('Saving output frames to', bq_max_ingest_epoch)
  4. Annotate the frames using the following code:
import json
import base64
import numpy as np
from IPython.display import Image, display, HTML, clear_output

im_width = stream.get(cv2.CAP_PROP_FRAME_WIDTH)
im_height = stream.get(cv2.CAP_PROP_FRAME_HEIGHT)

dashdelta = datetime.now()
framedata = {}
cntext = lambda x: {y['entity']['labelString']: y['count'] for y in x}
try:
  while True:
    try:
        annotations_df = client.query(f'''
          SELECT ingestion_time, annotation
          FROM `{bq_table}`
          WHERE ingestion_time > TIMESTAMP("{bq_max_ingest_ts}")
         ''').to_dataframe()
    except ValueError as e: 
        continue
    bq_max_ingest_ts = annotations_df['ingestion_time'].max()
    for _, row in annotations_df.iterrows():
      with frame_buffer_lock:
        frame_ts = np.asarray(list(frame_buffer.keys()))
        delta_ts = np.abs(frame_ts - (row['ingestion_time'].timestamp() * 1000))
        delta_tx_idx = delta_ts.argmin()
        closest_ts_delta = delta_ts[delta_tx_idx]
        closest_ts = frame_ts[delta_tx_idx]
        if closest_ts_delta > frame_buffer_error_milliseconds: continue
        image = frame_buffer[closest_ts]
      annotations = json.loads(row['annotation'])
      for box in annotations['identifiedBoxes']:
        image = cv2.rectangle(
          image,
          (
            int(box['normalizedBoundingBox']['xmin']*im_width),
            int(box['normalizedBoundingBox']['ymin']*im_height)
          ),
          (
            int((box['normalizedBoundingBox']['xmin'] + box['normalizedBoundingBox']['width'])*im_width),
            int((box['normalizedBoundingBox']['ymin'] + box['normalizedBoundingBox']['height'])*im_height)
          ),
          (255, 0, 0), 2
        )
      img_filename = f"{bq_max_ingest_epoch}/{row['ingestion_time'].timestamp() * 1000}.png"
      cv2.imwrite(img_filename, image)
      binimg = base64.b64encode(cv2.imencode('.jpg', image)[1]).decode()
      curr_framedata = {
        'path': img_filename,
        'timestamp_error': closest_ts_delta,
        'counts': {
          **{
            k['annotation']['displayName'] : cntext(k['counts'])
            for k in annotations['stats']["activeZoneCounts"]
          },
          'full-frame': cntext(annotations['stats']["fullFrameCount"])
        }
      }
      framedata[img_filename] = curr_framedata
      if (datetime.now() - dashdelta).total_seconds() > dashboard_update_delay_seconds:
        dashdelta = datetime.now()
        clear_output()
        display(HTML(f'''
          <h1>Queue Monitoring Application</h1>
          <p>Live Feed of the queue camera:</p>
          <p><img alt="" src="{img_filename}" style="float: left;"/></p>
          <table border="1" cellpadding="1" cellspacing="1" style="width: 500px;">
            <caption>Current Model Outputs</caption>
            <thead>
              <tr><th scope="row">Metric</th><th scope="col">Value</th></tr>
            </thead>
            <tbody>
              <tr><th scope="row">Serving Area People Count</th><td>{curr_framedata['counts']['serving-zone']['Person']}</td></tr>
              <tr><th scope="row">Queueing Area People Count</th><td>{curr_framedata['counts']['queue-zone']['Person']}</td></tr>
              <tr><th scope="row">Total Area People Count</th><td>{curr_framedata['counts']['full-frame']['Person']}</td></tr>
              <tr><th scope="row">Timestamp Error</th><td>{curr_framedata['timestamp_error']}</td></tr>
            </tbody>
          </table>
          <p>&nbsp;</p>
        '''))
except KeyboardInterrupt:
  print('Stopping Live Monitoring')

9426ffe2376f0a7d.png
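
For reference, the annotation column parsed above is a JSON string whose decoded form looks roughly like the illustrative, abbreviated example below. This sketch is inferred only from the fields the code accesses (identifiedBoxes, stats.activeZoneCounts, stats.fullFrameCount) and is not the complete occupancy analytics output schema; the zone display names correspond to the active zones you drew earlier:

# Illustrative, abbreviated shape of one decoded annotation record (not the full schema).
example_annotation = {
  'identifiedBoxes': [
    {'normalizedBoundingBox': {'xmin': 0.41, 'ymin': 0.22, 'width': 0.06, 'height': 0.18}},
  ],
  'stats': {
    'fullFrameCount': [{'entity': {'labelString': 'Person'}, 'count': 6}],
    'activeZoneCounts': [
      {'annotation': {'displayName': 'queue-zone'},
       'counts': [{'entity': {'labelString': 'Person'}, 'count': 4}]},
      {'annotation': {'displayName': 'serving-zone'},
       'counts': [{'entity': {'labelString': 'Person'}, 'count': 2}]},
    ],
  },
}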

  5. Stop the annotation task using the Stop button in the notebook menu bar.

6c19cb00dcb28894.png

  6. You can revisit individual frames using the following code:
from ipywidgets import Layout, interact, IntSlider
imgs = sorted(list(framedata.keys()))
def loadimg(frame):
    display(framedata[imgs[frame]])
    display(Image(open(framedata[imgs[frame]]['path'],'rb').read()))
interact(loadimg, frame=IntSlider(
    description='Frame #:',
    value=0,
    min=0, max=len(imgs)-1, step=1,
    layout=Layout(width='100%')))

78b63b546a4c883b.png

11. Congratulations

Congratulations, you finished the lab!

Clean up

To avoid incurring charges to your Google Cloud account for the resources used in this tutorial, either delete the project that contains the resources, or keep the project and delete the individual resources.

Delete the project
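
If you created a project just for this lab, the simplest cleanup is to delete the whole project, which removes every resource created above. For example, with gcloud (PROJECT_ID is a placeholder for your project ID):

gcloud projects delete PROJECT_ID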

Delete individual resources

Resources

https://cloud.google.com/vision-ai/docs/overview

https://cloud.google.com/vision-ai/docs/occupancy-count-tutorial
