Vertex AI Vision Queue Detection 应用

1. 目标

概览

此 Codelab 将重点介绍如何创建一个端到端的 Vertex AI Vision 应用,以便使用零售视频片段监控队列大小。我们将使用预训练的专业模型占用分析内置特征来捕获以下内容:

  • 统计排队人数。
  • 统计在柜台上菜的人数。

学习内容

  • 如何在 Vertex AI Vision 中创建和部署应用
  • 如何使用视频文件设置 RTSP 流,并从 Jupyter 笔记本中使用 vaictl 将流提取到 Vertex AI Vision。
  • 如何使用“占用分析”模型及其不同功能。
  • 如何在 Vertex AI Vision 的媒体仓库中搜索视频。
  • 如何将输出连接到 BigQuery,编写 SQL 查询以从模型的 json 输出中提取实用信息,并使用输出为原始视频添加标签和注释。

费用

在 Google Cloud 上运行此实验的总费用约为 $2。

2. 准备工作

创建项目并启用 API:

  1. 在 Google Cloud 控制台的项目选择器页面上,选择或创建 Google Cloud 项目注意:如果您不打算保留在此过程中创建的资源,请创建项目,而不是选择现有项目。完成上述步骤后,您可以删除所创建的项目,并移除与该项目关联的所有资源。前往项目选择器
  2. 确保您的 Cloud 项目已启用结算功能。了解如何检查项目是否已启用结算功能
  3. 启用 Compute Engine、Vertex API、Notebook API 和 Vision AI API。启用 API

创建服务账号

  1. 在 Google Cloud 控制台中,转到创建服务账号页面。转到“创建服务账号”
  2. 选择您的项目。
  3. 服务账号名称字段中,输入一个名称。Google Cloud 控制台会根据此名称填写服务账号 ID 字段。在服务账号说明字段中,输入说明。例如,快速入门的服务账号。
  4. 点击创建并继续
  5. 如需提供项目访问权限,请向您的服务账号授予以下角色:
  • Vision AI >Vision AI 编辑器
  • Compute Engine >Compute Instance Admin(Beta 版)
  • BigQuery >BigQuery 管理员 -

选择角色列表中,选择一个角色。要添加其他角色,请点击添加其他角色,然后添加其他角色。

  1. 点击继续
  2. 点击完成以完成服务账号的创建。不要关闭浏览器窗口。您将在下一步骤中用到它。

3. 设置 Jupyter 笔记本

在“占用分析”中创建应用之前,您必须先注册一个数据流以供应用使用。

在本教程中,您将创建一个托管视频的 Jupyter 笔记本实例,并从笔记本发送流式视频数据。我们使用的是 Jupyter Notebook,因为它让我们能够灵活地在一个位置执行 shell 命令以及运行自定义的预处理/后处理代码,这非常适合快速实验。我们将使用此笔记本执行以下操作:

  1. rtsp 服务器作为后台进程运行
  2. vaictl 命令作为后台进程运行
  3. 运行查询并处理代码,以分析占用分析输出

创建 Jupyter 笔记本

从 Jupyter 笔记本实例发送视频的第一步是使用我们在上一步中创建的服务账号创建笔记本。

  1. 在控制台中,前往 Vertex AI 页面。前往 Vertex AI Workbench
  2. 点击“用户管理的笔记本”

65b7112822858dce

  1. 点击新建笔记本 >TensorFlow 企业版 2.6(含 LTS)>不使用 GPU

dc156f20b14651d7.png

  1. 输入 Jupyter 笔记本的名称。如需了解详情,请参阅资源命名惯例

b4dbc5fddc37e8d9.png

  1. 点击高级选项
  2. 向下滚动到权限部分
  3. 取消选中使用 Compute Engine 默认服务账号选项
  4. 添加上一步中创建的服务账号电子邮件地址。然后点击创建

ec0b9ef00f0ef470.png

  1. 创建实例后,点击打开 JUPYTERLAB

4. 设置笔记本以流式传输视频

在“占用分析”中创建应用之前,您必须先注册一个数据流以供应用使用。

在本教程中,我们将使用 Jupyter 笔记本实例来托管视频,您可以从笔记本终端发送流式视频数据。

下载 vaictl 命令行工具

  1. 在打开的 Jupyterlab 实例中,从启动器中打开笔记本

a6d182923ae4ada3.png

  1. 在笔记本单元中使用以下命令下载 Vertex AI Vision (vaictl) 命令行工具、rtsp 服务器命令行工具、open-cv 工具:
!wget -q https://github.com/aler9/rtsp-simple-server/releases/download/v0.20.4/rtsp-simple-server_v0.20.4_linux_amd64.tar.gz
!wget -q https://github.com/google/visionai/releases/download/v0.0.4/visionai_0.0-4_amd64.deb
!tar -xf rtsp-simple-server_v0.20.4_linux_amd64.tar.gz
!pip install opencv-python --quiet
!sudo apt-get -qq remove -y visionai
!sudo apt-get -qq install -y ./visionai_0.0-4_amd64.deb
!sudo apt-get -qq install -y ffmpeg

5. 提取视频文件以进行流式传输

使用所需的命令行工具设置笔记本环境后,您可以复制示例视频文件,然后使用 vaictl 将视频数据流式传输到占用分析应用。

注册新数据流

  1. 点击 Vertex AI Vision 左侧面板上的“流”标签页。
  2. 点击顶部的“注册”按钮 eba418e723916514.png
  3. 在“视频流”名称中,输入 'queue-stream'
  4. 在区域中,选择上一步中创建笔记本时选择的同一区域。
  5. 点击注册

将示例视频复制到虚拟机

  1. 在您的笔记本中,使用以下 wget 命令复制示例视频。
!wget -q https://github.com/vagrantism/interesting-datasets/raw/main/video/collective_activity/seq25_h264.mp4

从虚拟机流式传输视频并将数据注入您的数据流

  1. 如需将此本地视频文件发送到应用输入流,请在笔记本单元中使用以下命令。您必须进行以下变量替换:
  • PROJECT_ID:您的 Google Cloud 项目 ID。
  • LOCATION:您的地理位置 ID。例如 us-central1。如需了解详情,请参阅 Cloud 网点
  • LOCAL_FILE:本地视频文件的文件名。例如 seq25_h264.mp4。
PROJECT_ID='<Your Google Cloud project ID>'
LOCATION='<Your stream location>'
LOCAL_FILE='seq25_h264.mp4'
STREAM_NAME='queue-stream'
  1. 启动 rtsp-simple-server,我们在其中使用 rtsp 协议流式传输视频文件
import os
import time
import subprocess

subprocess.Popen(["nohup", "./rtsp-simple-server"], stdout=open('rtsp_out.log', 'a'), stderr=open('rtsp_err.log', 'a'), preexec_fn=os.setpgrp)
time.sleep(5)
  1. 使用 ffmpeg 命令行工具在 rtsp 流中循环播放视频
subprocess.Popen(["nohup", "ffmpeg", "-re", "-stream_loop", "-1", "-i", LOCAL_FILE, "-c", "copy", "-f", "rtsp", f"rtsp://localhost:8554/{LOCAL_FILE.split('.')[0]}"], stdout=open('ffmpeg_out.log', 'a'), stderr=open('ffmpeg_err.log', 'a'), preexec_fn=os.setpgrp)
time.sleep(5)
  1. 使用 vaictl 命令行工具将视频从 rtsp 服务器 URI 流式传输到我们的 Vertex AI Vision 流“queue-stream”创建 Deployment 清单
subprocess.Popen(["nohup", "vaictl", "-p", PROJECT_ID, "-l", LOCATION, "-c", "application-cluster-0", "--service-endpoint", "visionai.googleapis.com", "send", "rtsp", "to", "streams", "queue-stream", "--rtsp-uri", f"rtsp://localhost:8554/{LOCAL_FILE.split('.')[0]}"], stdout=open('vaictl_out.log', 'a'), stderr=open('vaictl_err.log', 'a'), preexec_fn=os.setpgrp)

启动 vaictl 注入操作到信息中心显示视频可能需要大约 100 秒时间。

视频流提取可用后,您可以通过选择队列流视频流,在 Vertex AI Vision 信息中心的视频流标签页中查看视频 Feed。

前往“信息流”标签页

1b7aac7d36552f29

6. 创建应用

第一步是创建可处理您的数据的应用。应用可以视为连接以下各项的自动化流水线:

  • 数据提取:视频 Feed 被提取到数据流中。
  • 数据分析:您可以在注入后添加 AI(计算机视觉)模型。
  • 数据存储:两个版本的视频 Feed(原始视频流和 AI 模型处理的视频流)可以存储在媒体仓库中。

在 Google Cloud 控制台中,应用以图表的形式表示。

创建空应用

您必须先创建一个空应用,然后才能填充应用图表。

在 Google Cloud 控制台中创建应用。

  1. 前往 Google Cloud 控制台
  2. 打开 Vertex AI Vision 信息中心的应用标签页。前往“Applications”(应用)标签页
  3. 点击创建按钮。21ecba7a23e9979e
  4. 输入 queue-app 作为应用名称,然后选择您所在的区域。
  5. 点击创建

添加应用组件节点

创建空应用后,您可以将三个节点添加到应用图中:

  1. 提取节点:用于提取您在笔记本中创建的 rtsp 视频服务器所发送数据的流资源。
  2. 处理节点:对提取的数据执行操作的占用分析模型。
  3. 存储节点:用于存储已处理的视频并用作元数据存储空间的媒体仓库。元数据存储空间包括有关所提取视频数据的分析信息,以及 AI 模型推断出的信息。

在控制台中向应用添加组件节点。

  1. 打开 Vertex AI Vision 信息中心的应用标签页。前往“Applications”(应用)标签页

您将转到处理流水线的图表可视化。

添加数据注入节点

  1. 要添加输入流节点,请在侧边菜单的连接器部分中选择选项。
  2. 在打开的数据流菜单的来源部分,选择添加数据流
  3. 添加数据流菜单中,选择 queue-stream
  4. 如需将数据流添加到应用图,请点击添加数据流

添加数据处理节点

  1. 如需添加入住人数模型节点,请在侧边菜单的专用模型部分中选择入住人数分析选项。
  2. 保留默认选项人员。取消选中车辆(如果已选中)。

618b0c9dc671bae3.png

  1. 在“Advanced Options”(高级选项)部分,点击 Create Active Zones/Lines (创建活跃区域/线条)5b2f31235603e05d
  2. 使用多边形工具绘制活跃区域,以统计该区域中的人员数量。相应地为可用区添加标签

50281a723650491f

  1. 点击顶部的后退箭头。

2bf0ff4d029d29eb.png

  1. 点击复选框,添加用于检测拥塞的停留时间设置。

c067fa256ca5bb96.png

添加数据存储节点

  1. 要添加输出目标(存储)节点,请在侧边菜单的连接器部分中选择 VIsion AI Warehouse 选项。
  2. 点击 Vertex AI Warehouse 连接器以打开其菜单,然后点击 Connect Warehouse
  3. 关联仓库菜单中,选择创建新仓库。将仓库命名为 queue-warehouse,并将 TTL 期限保留为 14 天。
  4. 点击创建按钮以添加仓库。

7. 将输出连接到 BigQuery 表

将 BigQuery 连接器添加到 Vertex AI Vision 应用后,所有关联的应用模型输出都将提取到目标表中。

您可以自行创建 BigQuery 表,并在向应用添加 BigQuery 连接器时指定该表,也可以让 Vertex AI Vision 应用平台自动创建该表。

自动创建表

如果您让 Vertex AI Vision 应用平台自动创建表,则可以在添加 BigQuery 连接器节点时指定此选项。

如果您想使用自动创建表功能,则以下数据集和表条件适用:

  • 数据集:自动创建的数据集名称为 visionai_dataset。
  • 表:自动创建的表名称为 visionai_dataset.APPLICATION_ID。
  • 错误处理:
  • 如果同一数据集下已存在同名的表,系统不会自动创建该表。
  1. 打开 Vertex AI Vision 信息中心的应用标签页。前往“Applications”(应用)标签页
  2. 从列表中选择应用名称旁边的查看应用
  3. 在应用构建器页面上,从连接器部分选择 BigQuery
  4. BigQuery 路径字段留空。

ee0b67d4ab2263d.png

  1. 商店元数据来源:仅选择“占用分析”并取消选中数据流。

最终的应用图表应如下所示:

da0a1a049843572f.png

8. 部署应用以供使用

使用所有必要组件构建端到端应用后,使用应用的最后一步是部署应用。

  1. 打开 Vertex AI Vision 信息中心的应用标签页。前往“Applications”(应用)标签页
  2. 选择列表中的 queue-app 应用旁边的查看应用
  3. Studio 页面中,点击部署按钮。
  4. 在下面的确认对话框中,点击部署。部署操作可能需要几分钟才能完成。部署完成后,节点旁边会显示绿色对勾标记。dc514d9b9f35099d.png

9. 搜索存储仓库中的视频内容

将视频数据注入处理应用后,您可以查看经过分析的视频数据,并根据占用分析信息搜索数据。

  1. 打开 Vertex AI Vision 信息中心的 Warehouses 标签页。转到“仓库”标签页
  2. 在列表中找到队列仓库仓库,然后点击 View assets(查看资产)。
  3. 人数部分,将最小值设置为 1,将最大值设置为 5。
  4. 如需过滤存储在 Vertex AI Vision 的 Media Warehouse 中的已处理视频数据,请点击搜索

a0e5766262443d6c.png

查看 Google Cloud 控制台中与搜索条件匹配的存储视频数据。

10. 使用 BigQuery 表为输出添加注解和分析

  1. 在笔记本中,初始化单元中的以下变量。
DATASET_ID='vision_ai_dataset'
bq_table=f'{PROJECT_ID}.{DATASET_ID}.queue-app'
frame_buffer_size=10000
frame_buffer_error_milliseconds=5
dashboard_update_delay_seconds=3
rtsp_url='rtsp://localhost:8554/seq25_h264'
  1. 现在,我们将使用以下代码从 rtsp 流中捕获帧:
import cv2
import threading
from collections import OrderedDict
from datetime import datetime, timezone

frame_buffer = OrderedDict()
frame_buffer_lock = threading.Lock()

stream = cv2.VideoCapture(rtsp_url)
def read_frames(stream):
  global frames
  while True:
    ret, frame = stream.read()
    frame_ts = datetime.now(timezone.utc).timestamp() * 1000
    if ret:
      with frame_buffer_lock:
        while len(frame_buffer) >= frame_buffer_size:
          _ = frame_buffer.popitem(last=False)
        frame_buffer[frame_ts] = frame

frame_buffer_thread = threading.Thread(target=read_frames, args=(stream,))
frame_buffer_thread.start()
print('Waiting for stream initialization')
while not list(frame_buffer.keys()): pass
print('Stream Initialized')
  1. 从 BigQuery 表中拉取数据时间戳和注释信息,然后创建一个目录来存储捕获的帧图片:
from google.cloud import bigquery
import pandas as pd

client = bigquery.Client(project=PROJECT_ID)

query = f"""
SELECT MAX(ingestion_time) AS ts
FROM `{bq_table}`
"""

bq_max_ingest_ts_df = client.query(query).to_dataframe()
bq_max_ingest_epoch = str(int(bq_max_ingest_ts_df['ts'][0].timestamp()*1000000))
bq_max_ingest_ts = bq_max_ingest_ts_df['ts'][0]
print('Preparing to pull records with ingestion time >', bq_max_ingest_ts)
if not os.path.exists(bq_max_ingest_epoch):
   os.makedirs(bq_max_ingest_epoch)
print('Saving output frames to', bq_max_ingest_epoch)
  1. 使用以下代码为帧添加注释:
import json
import base64
import numpy as np
from IPython.display import Image, display, HTML, clear_output

im_width = stream.get(cv2.CAP_PROP_FRAME_WIDTH)
im_height = stream.get(cv2.CAP_PROP_FRAME_HEIGHT)

dashdelta = datetime.now()
framedata = {}
cntext = lambda x: {y['entity']['labelString']: y['count'] for y in x}
try:
  while True:
    try:
        annotations_df = client.query(f'''
          SELECT ingestion_time, annotation
          FROM `{bq_table}`
          WHERE ingestion_time > TIMESTAMP("{bq_max_ingest_ts}")
         ''').to_dataframe()
    except ValueError as e: 
        continue
    bq_max_ingest_ts = annotations_df['ingestion_time'].max()
    for _, row in annotations_df.iterrows():
      with frame_buffer_lock:
        frame_ts = np.asarray(list(frame_buffer.keys()))
        delta_ts = np.abs(frame_ts - (row['ingestion_time'].timestamp() * 1000))
        delta_tx_idx = delta_ts.argmin()
        closest_ts_delta = delta_ts[delta_tx_idx]
        closest_ts = frame_ts[delta_tx_idx]
        if closest_ts_delta > frame_buffer_error_milliseconds: continue
        image = frame_buffer[closest_ts]
      annotations = json.loads(row['annotation'])
      for box in annotations['identifiedBoxes']:
        image = cv2.rectangle(
          image,
          (
            int(box['normalizedBoundingBox']['xmin']*im_width),
            int(box['normalizedBoundingBox']['ymin']*im_height)
          ),
          (
            int((box['normalizedBoundingBox']['xmin'] + box['normalizedBoundingBox']['width'])*im_width),
            int((box['normalizedBoundingBox']['ymin'] + box['normalizedBoundingBox']['height'])*im_height)
          ),
          (255, 0, 0), 2
        )
      img_filename = f"{bq_max_ingest_epoch}/{row['ingestion_time'].timestamp() * 1000}.png"
      cv2.imwrite(img_filename, image)
      binimg = base64.b64encode(cv2.imencode('.jpg', image)[1]).decode()
      curr_framedata = {
        'path': img_filename,
        'timestamp_error': closest_ts_delta,
        'counts': {
          **{
            k['annotation']['displayName'] : cntext(k['counts'])
            for k in annotations['stats']["activeZoneCounts"]
          },
          'full-frame': cntext(annotations['stats']["fullFrameCount"])
        }
      }
      framedata[img_filename] = curr_framedata
      if (datetime.now() - dashdelta).total_seconds() > dashboard_update_delay_seconds:
        dashdelta = datetime.now()
        clear_output()
        display(HTML(f'''
          <h1>Queue Monitoring Application</h1>
          <p>Live Feed of the queue camera:</p>
          <p><img alt="" src="{img_filename}" style="float: left;"/></a></p>
          <table border="1" cellpadding="1" cellspacing="1" style="width: 500px;">
            <caption>Current Model Outputs</caption>
            <thead>
              <tr><th scope="row">Metric</th><th scope="col">Value</th></tr>
            </thead>
            <tbody>
              <tr><th scope="row">Serving Area People Count</th><td>{curr_framedata['counts']['serving-zone']['Person']}</td></tr>
              <tr><th scope="row">Queueing Area People Count</th><td>{curr_framedata['counts']['queue-zone']['Person']}</td></tr>
              <tr><th scope="row">Total Area People Count</th><td>{curr_framedata['counts']['full-frame']['Person']}</td></tr>
              <tr><th scope="row">Timestamp Error</th><td>{curr_framedata['timestamp_error']}</td></tr>
            </tbody>
          </table>
          <p>&nbsp;</p>
        '''))
except KeyboardInterrupt:
  print('Stopping Live Monitoring')

9426ffe2376f0a7d

  1. 使用笔记本菜单栏中的 Stop 按钮停止注解任务

6c19cb00dcb28894

  1. 您可以使用以下代码重新访问各个帧:
from IPython.html.widgets import Layout, interact, IntSlider
imgs = sorted(list(framedata.keys()))
def loadimg(frame):
    display(framedata[imgs[frame]])
    display(Image(open(framedata[imgs[frame]]['path'],'rb').read()))
interact(loadimg, frame=IntSlider(
    description='Frame #:',
    value=0,
    min=0, max=len(imgs)-1, step=1,
    layout=Layout(width='100%')))

78b63b546a4c883b

11. 恭喜

恭喜,您已完成本实验!

清理

为避免因本教程中使用的资源导致您的 Google Cloud 账号产生费用,请删除包含这些资源的项目,或者保留项目但删除各个资源。

删除项目

删除各个资源

资源

https://cloud.google.com/vision-ai/docs/overview

https://cloud.google.com/vision-ai/docs/occupancy-count-tutorial

许可

调查问卷

您如何使用本教程?

仅通读 阅读并完成练习

此 Codelab 对你有多大用处?

非常有用 比较有用 没用

此 Codelab 的难易程度如何?

简单 中等