Vertex AI Vision Queue Detection 应用

1. 目标

概览

本 Codelab 将重点介绍如何端到端创建 Vertex AI Vision 应用,以使用零售视频片段监控队列大小。我们将使用预训练的专用模型入住人数分析内置功能来捕获以下内容:

  • 统计排队人数。
  • 统计在柜台接受服务的人数。

学习内容

  • 如何在 Vertex AI Vision 中创建应用并将其部署
  • 如何使用视频文件设置 RTSP 流,并使用 Jupyter 笔记本中的 vaictl 将流提取到 Vertex AI Vision。
  • 如何使用入住分析模型及其不同功能。
  • 如何在 Vertex AI Vision 的媒体仓库中搜索视频。
  • 如何将输出连接到 BigQuery,编写 SQL 查询以从模型的 JSON 输出中提取数据洞见,以及使用输出标记和注释原始视频。

费用

在 Google Cloud 上运行此实验的总费用约为 2 美元。

2. 准备工作

创建项目并启用 API:

  1. 在 Google Cloud 控制台的项目选择器页面上,选择或创建 Google Cloud 项目注意:如果您不打算保留在此过程中创建的资源,请创建新的项目,而不要选择现有的项目。完成上述步骤后,您可以删除所创建的项目,并移除与该项目关联的所有资源。前往“项目选择器”
  2. 确保您的 Cloud 项目已启用结算功能。了解如何检查项目是否已启用结算功能
  3. 启用 Compute Engine、Vertex API、Notebook API 和 Vision AI API。启用 API

创建服务账号

  1. 在 Google Cloud 控制台中,前往创建服务账号页面。前往“创建服务账号”页面
  2. 选择您的项目。
  3. 服务账号名称字段中,输入一个名称。Google Cloud 控制台会根据此名称填充服务账号 ID 字段。在服务账号说明字段中,输入说明。例如,快速入门的服务账号。
  4. 点击创建并继续
  5. 如需提供对项目的访问权限,请向服务账号授予以下角色:
  • Vision AI > Vision AI Editor
  • Compute Engine > Compute Instance Admin(Beta 版)
  • BigQuery > BigQuery 管理员

选择角色列表中,选择一个角色。如需添加其他角色,请点击添加其他角色,然后添加其他各个角色。

  1. 点击继续
  2. 点击完成以完成服务账号的创建。不要关闭浏览器窗口。您将在下一步骤中用到它。

3. 设置 Jupyter 笔记本

在 Google 占用情况分析中创建应用之前,您必须注册一个流,以便应用稍后使用。

在本教程中,您将创建一个托管视频的 Jupyter 笔记本实例,并从笔记本发送流式视频数据。我们使用 Jupyter Notebook,因为它让我们能够灵活地在一个位置执行 shell 命令以及运行自定义预处理/后处理代码,非常适合快速进行实验。我们将使用此笔记本:

  1. rtsp 服务器作为后台进程运行
  2. vaictl 命令作为后台进程运行
  3. 运行查询和处理代码以分析入住人数分析输出

创建 Jupyter 笔记本

从 Jupyter 笔记本实例发送视频的第一步是使用我们在上一步中创建的服务账号创建笔记本。

  1. 在控制台中,前往 Vertex AI 页面。前往 Vertex AI Workbench
  2. 点击“用户管理的笔记本”

65b7112822858dce.png

  1. 点击新建笔记本 > Tensorflow Enterprise 2.6(带 LTS)> 不带 GPU

dc156f20b14651d7.png

  1. 输入 Jupyter Notebook 的名称。如需了解详情,请参阅资源命名惯例

b4dbc5fddc37e8d9.png

  1. 点击高级选项
  2. 向下滚动到权限部分
  3. 取消选中使用 Compute Engine 默认服务账号选项
  4. 添加在上一步中创建的服务账号电子邮件地址。然后点击创建

ec0b9ef00f0ef470.png

  1. 创建实例后,点击打开 JUPYTERLAB

4. 设置笔记本以流式传输视频

在 Google 占用情况分析中创建应用之前,您必须注册一个流,以便应用稍后使用。

在本教程中,我们将使用 Jupyter Notebook 实例托管视频,您将从笔记本终端发送该流式视频数据。

下载 vaictl 命令行工具

  1. 在打开的 Jupyterlab 实例中,从启动器中打开笔记本

a6d182923ae4ada3.png

  1. 在笔记本单元中使用以下命令下载 Vertex AI Vision (vaictl) 命令行工具、rtsp 服务器命令行工具、open-cv 工具:
!wget -q https://github.com/aler9/rtsp-simple-server/releases/download/v0.20.4/rtsp-simple-server_v0.20.4_linux_amd64.tar.gz
!wget -q https://github.com/google/visionai/releases/download/v0.0.4/visionai_0.0-4_amd64.deb
!tar -xf rtsp-simple-server_v0.20.4_linux_amd64.tar.gz
!pip install opencv-python --quiet
!sudo apt-get -qq remove -y visionai
!sudo apt-get -qq install -y ./visionai_0.0-4_amd64.deb
!sudo apt-get -qq install -y ffmpeg

5. 提取要流式传输的视频文件

使用所需的命令行工具设置笔记本环境后,您可以复制示例视频文件,然后使用 vaictl 将视频数据流式传输到入住分析应用。

注册新的视频流

  1. 点击 Vertex AI Vision 左侧面板中的“数据流”标签页。
  2. 点击顶部的“注册”按钮 eba418e723916514.png
  3. 在“数据流名称”中输入 ‘queue-stream'
  4. 在“区域”中,选择在上一步中创建笔记本时选择的区域。
  5. 点击注册

将示例视频复制到虚拟机

  1. 在您的笔记本中,使用以下 wget 命令复制示例视频。
!wget -q https://github.com/vagrantism/interesting-datasets/raw/main/video/collective_activity/seq25_h264.mp4

从虚拟机流式传输视频并将数据提取到数据流中

  1. 如需将此本地视频文件发送到应用输入流,请在笔记本单元中使用以下命令。您必须进行以下变量替换:
  • PROJECT_ID:您的 Google Cloud 项目 ID。
  • LOCATION:您的营业地点 ID。例如 us-central1。如需了解详情,请参阅 Cloud 位置
  • LOCAL_FILE:本地视频文件的文件名。例如 seq25_h264.mp4。
PROJECT_ID='<Your Google Cloud project ID>'
LOCATION='<Your stream location>'
LOCAL_FILE='seq25_h264.mp4'
STREAM_NAME='queue-stream'
  1. 启动 rtsp-simple-server,在其中使用 rtsp 协议流式传输视频文件
import os
import time
import subprocess

subprocess.Popen(["nohup", "./rtsp-simple-server"], stdout=open('rtsp_out.log', 'a'), stderr=open('rtsp_err.log', 'a'), preexec_fn=os.setpgrp)
time.sleep(5)
  1. 使用 ffmpeg 命令行工具在 rtsp 流中循环播放视频
subprocess.Popen(["nohup", "ffmpeg", "-re", "-stream_loop", "-1", "-i", LOCAL_FILE, "-c", "copy", "-f", "rtsp", f"rtsp://localhost:8554/{LOCAL_FILE.split('.')[0]}"], stdout=open('ffmpeg_out.log', 'a'), stderr=open('ffmpeg_err.log', 'a'), preexec_fn=os.setpgrp)
time.sleep(5)
  1. 使用 vaictl 命令行工具将视频从 rtsp 服务器 URI 流式传输到上一步中创建的 Vertex AI Vision 数据流“queue-stream”。
subprocess.Popen(["nohup", "vaictl", "-p", PROJECT_ID, "-l", LOCATION, "-c", "application-cluster-0", "--service-endpoint", "visionai.googleapis.com", "send", "rtsp", "to", "streams", "queue-stream", "--rtsp-uri", f"rtsp://localhost:8554/{LOCAL_FILE.split('.')[0]}"], stdout=open('vaictl_out.log', 'a'), stderr=open('vaictl_err.log', 'a'), preexec_fn=os.setpgrp)

启动 vaictl 提取操作到信息中心显示视频可能需要大约 100 秒时间。

流数据提取功能可用后,您可以选择队列串流,在 Vertex AI Vision 信息中心的数据流标签页中查看视频画面。

前往“数据流”标签页

1b7aac7d36552f29

6. 创建应用

第一步是创建可处理您的数据的应用。应用可以视为连接以下各项的自动化流水线:

  • 数据提取:将视频 Feed 提取到数据流中。
  • 数据分析:在提取数据后,可以添加 AI(计算机视觉)模型。
  • 数据存储:视频画面的两个版本(原始数据流和由 AI 模型处理的数据流)可以存储在媒体仓库中。

在 Google Cloud 控制台中,应用以图表的形式表示。

创建空应用

您必须先创建一个空应用,然后才能填充应用图。

在 Google Cloud 控制台中创建应用。

  1. 前往 Google Cloud 控制台
  2. 打开 Vertex AI Vision 信息中心的应用标签页。前往“应用”标签页
  3. 点击创建按钮。21ecba7a23e9979e.png
  4. 输入“queue-app'”作为应用名称,然后选择您的区域。
  5. 点击创建

添加应用组件节点

创建空应用后,您可以将三个节点添加到应用图中:

  1. 提取节点:用于提取您在笔记本中创建的 rtsp 视频服务器发送的数据的流式传输资源。
  2. 处理节点:用于处理提取数据的入住分析模型。
  3. 存储节点:用于存储已处理视频的媒体仓库,同时也是元数据存储空间。元数据存储区包含有关提取的视频数据的分析信息,以及 AI 模型推断的信息。

在控制台中向应用添加组件节点。

  1. 打开 Vertex AI Vision 信息中心的应用标签页。前往“应用”标签页

这会将您转到处理流水线的图表可视化结果。

添加数据提取节点

  1. 如需添加输入数据流节点,请选择侧边菜单的连接器部分中的数据流选项。
  2. 在打开的视频流菜单的来源部分中,选择添加视频流
  3. 添加数据流菜单中,选择 queue-stream
  4. 如需将数据流添加到应用图中,请点击添加数据流

添加数据处理节点

  1. 如需添加入住人数统计模型节点,请选择侧边菜单的专用模型部分中的入住人数分析选项。
  2. 保留默认选项人员。如果车辆已选中,请取消选中。

618b0c9dc671bae3.png

  1. 在“高级选项”部分,点击创建活动区域/线条 5b2f31235603e05d.png
  2. 使用多边形工具绘制活跃区域,以统计该区域内的人数。相应地为该区域添加标签

50281a723650491f.png

  1. 点击顶部的返回箭头。

2bf0ff4d029d29eb.png

  1. 点击复选框,添加用于检测拥塞情况的逗留时间设置。

c067fa256ca5bb96.png

添加数据存储节点

  1. 如需添加输出目标位置(存储)节点,请选择侧边菜单的连接器部分中的 Vision AI 仓库选项。
  2. 点击 Vertex AI Warehouse 连接器以打开其菜单,然后点击 Connect Warehouse
  3. 连接仓库菜单中,选择创建新仓库。将仓库命名为 queue-warehouse,并将 TTL 时长保留为 14 天。
  4. 点击创建按钮以添加仓库。

7. 将输出连接到 BigQuery 表

将 BigQuery 连接器添加到 Vertex AI Vision 应用后,所有关联的应用模型输出都将提取到目标表中。

您可以自行创建 BigQuery 表,并在向应用添加 BigQuery 连接器时指定该表,也可以让 Vertex AI Vision 应用平台自动创建该表。

自动创建表

如果您让 Vertex AI Vision 应用平台自动创建表,则可以在添加 BigQuery 连接器节点时指定此选项。

如果您想使用自动表创建功能,则需要满足以下数据集和表条件:

  • 数据集:自动创建的数据集名称为 visionai_dataset。
  • 表:自动创建的表名称为 visionai_dataset.APPLICATION_ID。
  • 错误处理:
  • 如果同一数据集下存在同名表,则不会自动创建。
  1. 打开 Vertex AI Vision 信息中心的应用标签页。前往“应用”标签页
  2. 从列表中选择应用名称旁边的查看应用
  3. 在应用构建器页面上,从“连接器”部分选择 BigQuery
  4. BigQuery 路径字段留空。

ee0b67d4ab2263d.png

  1. 存储元数据来源中,仅选择“入住情况分析”,然后取消选中数据流。

最终的应用图表应如下所示:

da0a1a049843572f.png

8. 部署应用以供使用

使用所有必要组件构建端到端应用后,使用该应用的最后一步是部署它。

  1. 打开 Vertex AI Vision 信息中心的应用标签页。前往“应用”标签页
  2. 在列表中,选择 queue-app 应用旁边的查看应用
  3. Studio 页面中,点击部署按钮。
  4. 在随即显示的确认对话框中,点击部署。部署操作可能需要几分钟才能完成。部署完成后,节点旁边会显示绿色对勾标记。dc514d9b9f35099d.png

9. 在存储仓库中搜索视频内容

将视频数据注入处理应用后,您可以查看经过分析的视频数据,并根据占用分析信息搜索数据。

  1. 打开 Vertex AI Vision 信息中心的 Warehouses 标签页。前往“仓库”标签页
  2. 在列表中找到队列仓库仓库,然后点击 View assets(查看资产)。
  3. 人数部分,将最小值设置为 1,将最大值设置为 5。
  4. 如需过滤存储在 Vertex AI Vision 媒体仓库中的已处理视频数据,请点击搜索

a0e5766262443d6c.png

Google Cloud 控制台中与搜索条件匹配的存储视频数据的视图。

10. 使用 BigQuery 表注释和分析输出

  1. 在笔记本中,在单元格中初始化以下变量。
DATASET_ID='vision_ai_dataset'
bq_table=f'{PROJECT_ID}.{DATASET_ID}.queue-app'
frame_buffer_size=10000
frame_buffer_error_milliseconds=5
dashboard_update_delay_seconds=3
rtsp_url='rtsp://localhost:8554/seq25_h264'
  1. 现在,我们将使用以下代码从 rtsp 流中捕获帧:
import cv2
import threading
from collections import OrderedDict
from datetime import datetime, timezone

frame_buffer = OrderedDict()
frame_buffer_lock = threading.Lock()

stream = cv2.VideoCapture(rtsp_url)
def read_frames(stream):
  global frames
  while True:
    ret, frame = stream.read()
    frame_ts = datetime.now(timezone.utc).timestamp() * 1000
    if ret:
      with frame_buffer_lock:
        while len(frame_buffer) >= frame_buffer_size:
          _ = frame_buffer.popitem(last=False)
        frame_buffer[frame_ts] = frame

frame_buffer_thread = threading.Thread(target=read_frames, args=(stream,))
frame_buffer_thread.start()
print('Waiting for stream initialization')
while not list(frame_buffer.keys()): pass
print('Stream Initialized')
  1. 从 BigQuery 表中提取数据时间戳和注释信息,并创建一个目录来存储捕获的帧图片:
from google.cloud import bigquery
import pandas as pd

client = bigquery.Client(project=PROJECT_ID)

query = f"""
SELECT MAX(ingestion_time) AS ts
FROM `{bq_table}`
"""

bq_max_ingest_ts_df = client.query(query).to_dataframe()
bq_max_ingest_epoch = str(int(bq_max_ingest_ts_df['ts'][0].timestamp()*1000000))
bq_max_ingest_ts = bq_max_ingest_ts_df['ts'][0]
print('Preparing to pull records with ingestion time >', bq_max_ingest_ts)
if not os.path.exists(bq_max_ingest_epoch):
   os.makedirs(bq_max_ingest_epoch)
print('Saving output frames to', bq_max_ingest_epoch)
  1. 使用以下代码为帧添加注释:
import json
import base64
import numpy as np
from IPython.display import Image, display, HTML, clear_output

im_width = stream.get(cv2.CAP_PROP_FRAME_WIDTH)
im_height = stream.get(cv2.CAP_PROP_FRAME_HEIGHT)

dashdelta = datetime.now()
framedata = {}
cntext = lambda x: {y['entity']['labelString']: y['count'] for y in x}
try:
  while True:
    try:
        annotations_df = client.query(f'''
          SELECT ingestion_time, annotation
          FROM `{bq_table}`
          WHERE ingestion_time > TIMESTAMP("{bq_max_ingest_ts}")
         ''').to_dataframe()
    except ValueError as e: 
        continue
    bq_max_ingest_ts = annotations_df['ingestion_time'].max()
    for _, row in annotations_df.iterrows():
      with frame_buffer_lock:
        frame_ts = np.asarray(list(frame_buffer.keys()))
        delta_ts = np.abs(frame_ts - (row['ingestion_time'].timestamp() * 1000))
        delta_tx_idx = delta_ts.argmin()
        closest_ts_delta = delta_ts[delta_tx_idx]
        closest_ts = frame_ts[delta_tx_idx]
        if closest_ts_delta > frame_buffer_error_milliseconds: continue
        image = frame_buffer[closest_ts]
      annotations = json.loads(row['annotation'])
      for box in annotations['identifiedBoxes']:
        image = cv2.rectangle(
          image,
          (
            int(box['normalizedBoundingBox']['xmin']*im_width),
            int(box['normalizedBoundingBox']['ymin']*im_height)
          ),
          (
            int((box['normalizedBoundingBox']['xmin'] + box['normalizedBoundingBox']['width'])*im_width),
            int((box['normalizedBoundingBox']['ymin'] + box['normalizedBoundingBox']['height'])*im_height)
          ),
          (255, 0, 0), 2
        )
      img_filename = f"{bq_max_ingest_epoch}/{row['ingestion_time'].timestamp() * 1000}.png"
      cv2.imwrite(img_filename, image)
      binimg = base64.b64encode(cv2.imencode('.jpg', image)[1]).decode()
      curr_framedata = {
        'path': img_filename,
        'timestamp_error': closest_ts_delta,
        'counts': {
          **{
            k['annotation']['displayName'] : cntext(k['counts'])
            for k in annotations['stats']["activeZoneCounts"]
          },
          'full-frame': cntext(annotations['stats']["fullFrameCount"])
        }
      }
      framedata[img_filename] = curr_framedata
      if (datetime.now() - dashdelta).total_seconds() > dashboard_update_delay_seconds:
        dashdelta = datetime.now()
        clear_output()
        display(HTML(f'''
          <h1>Queue Monitoring Application</h1>
          <p>Live Feed of the queue camera:</p>
          <p><img alt="" src="{img_filename}" style="float: left;"/></a></p>
          <table border="1" cellpadding="1" cellspacing="1" style="width: 500px;">
            <caption>Current Model Outputs</caption>
            <thead>
              <tr><th scope="row">Metric</th><th scope="col">Value</th></tr>
            </thead>
            <tbody>
              <tr><th scope="row">Serving Area People Count</th><td>{curr_framedata['counts']['serving-zone']['Person']}</td></tr>
              <tr><th scope="row">Queueing Area People Count</th><td>{curr_framedata['counts']['queue-zone']['Person']}</td></tr>
              <tr><th scope="row">Total Area People Count</th><td>{curr_framedata['counts']['full-frame']['Person']}</td></tr>
              <tr><th scope="row">Timestamp Error</th><td>{curr_framedata['timestamp_error']}</td></tr>
            </tbody>
          </table>
          <p>&nbsp;</p>
        '''))
except KeyboardInterrupt:
  print('Stopping Live Monitoring')

9426ffe2376f0a7d.png

  1. 使用笔记本菜单栏中的 Stop 按钮停止注解任务

6c19cb00dcb28894.png

  1. 您可以使用以下代码重新访问各个帧:
from IPython.html.widgets import Layout, interact, IntSlider
imgs = sorted(list(framedata.keys()))
def loadimg(frame):
    display(framedata[imgs[frame]])
    display(Image(open(framedata[imgs[frame]]['path'],'rb').read()))
interact(loadimg, frame=IntSlider(
    description='Frame #:',
    value=0,
    min=0, max=len(imgs)-1, step=1,
    layout=Layout(width='100%')))

78b63b546a4c883b

11. 恭喜

恭喜,您已完成此实验!

清理

为避免因本教程中使用的资源导致您的 Google Cloud 账号产生费用,请删除包含这些资源的项目,或者保留项目但删除各个资源。

删除项目

删除各个资源

资源

https://cloud.google.com/vision-ai/docs/overview

https://cloud.google.com/vision-ai/docs/occupancy-count-tutorial

许可

调查问卷

您是如何使用本教程的?

仅阅读教程内容 阅读并完成练习

此 Codelab 对您有多实用?

非常有用 比较有用 没用

您觉得本 Codelab 的难易程度如何?

简单 中等 困难