1. 目標
概要
この Codelab では、Vertex AI Vision アプリケーションをエンドツーエンドで作成し、販売店の動画映像を使用してキューサイズをモニタリングすることに重点を置きます。事前トレーニング済みの専門モデル「占有率分析」の組み込み機能を使用して、次の情報を取得します。
- キューにいる人の数をカウントします。
- カウンターで対応している人数を数えます。
学習内容
- Vertex AI Vision でアプリケーションを作成してデプロイする方法
- 動画ファイルを使用して RTSP ストリームを設定し、Jupyter ノートブックから vaictl を使用してストリームを Vertex AI Vision に取り込む方法。
- 利用人数の分析モデルとそのさまざまな機能の使用方法。
- ストレージ Vertex AI Vision の Media Warehouse で動画を検索する方法。
- 出力を BigQuery に接続する方法、モデルの json 出力から分析情報を抽出する SQL クエリを記述し、出力を使用して元の動画にラベルとアノテーションを付ける方法。
費用:
Google Cloud でこのラボを実行するための総費用は約 $2 です。
2. 始める前に
プロジェクトを作成して API を有効にします。
- Google Cloud コンソールの [プロジェクト セレクタ] ページで、Google Cloud プロジェクトを選択または作成します。注: この手順で作成するリソースをそのまま保持する予定でない場合、既存のプロジェクトを選択するのではなく、新しいプロジェクトを作成してください。チュートリアルの終了後にそのプロジェクトを削除すれば、プロジェクトに関連するすべてのリソースを削除できます。プロジェクト セレクタに移動
- Cloud プロジェクトに対して課金が有効になっていることを確認します。詳しくは、プロジェクトで課金が有効になっているかどうかを確認する方法をご覧ください。
- Compute Engine、Vertex API、Notebook API、Vision AI API を有効にします。API を有効にする
サービス アカウントを作成します。
- Google Cloud コンソールで [サービス アカウントの作成] ページに移動します。[サービス アカウントの作成] に移動
- プロジェクトを選択します。
- [サービス アカウント名] フィールドに名前を入力します。Google Cloud コンソールは、この名前に基づいて [サービス アカウント ID] フィールドに入力します。[サービス アカウントの説明] フィールドに説明を入力します。(クイックスタート用のサービス アカウントなど)。
- [作成して続行] をクリックします。
- プロジェクトへのアクセス権を付与するには、サービス アカウントに次のロールを付与します。
- [Vision AI] > [Vision AI 編集者]
- [Compute Engine] > [Compute インスタンス管理者(ベータ版)]
- [BigQuery] > [BigQuery 管理者] に移動します。
[ロールを選択] リストでロールを選択します。ロールを追加するには、[別のロールを追加] をクリックして各ロールを追加します。
- [続行] をクリックします。
- [完了] をクリックして、サービス アカウントの作成を完了します。ブラウザ ウィンドウは閉じないでください。次のステップでこれを使用します。
3. Jupyter Notebook を設定する
占有状況アナリティクスでアプリを作成する前に、アプリで後で使用できるストリームを登録する必要があります。
このチュートリアルでは、動画をホストする Jupyter ノートブック インスタンスを作成し、そのストリーミング動画データをノートブックから送信します。シェルコマンドの実行や、カスタムの前処理/後処理コードを 1 か所で実行できる柔軟性があるため、Jupyter Notebook を使用しています。これは、迅速なテストに非常に適しています。このノートブックでは、次の作業を行います。
- rtsp サーバーをバックグラウンド プロセスとして実行
- バックグラウンド プロセスとして vaictl コマンドを実行する
- クエリと処理コードを実行して、宿泊状況分析の出力を分析する
Jupyter ノートブックを作成する
Jupyter ノートブック インスタンスから動画を送信するための最初のステップは、前のステップで作成したサービス アカウントを使用してノートブックを作成することです。
- コンソールで、[Vertex AI] ページに移動します。Vertex AI Workbench に移動
- [ユーザー管理のノートブック] をクリックする
- [新しいノートブック] > [Tensorflow Enterprise 2.6 (with LTS)] > [Without GPUs] の順にクリックします。
- Jupyter Notebook の名前を入力します。詳細については、リソースの命名規則をご覧ください。
- [詳細オプション] をクリックします。
- [権限セクション] まで下にスクロールします。
- [Compute Engine のデフォルトのサービス アカウントを使用する] オプションのチェックを外します。
- 前の手順で作成したサービス アカウントのメールアドレスを追加します。[作成] をクリックします。
- インスタンスが作成されたら、[JUPYTERLAB を開く] をクリックします。
4. 動画をストリーミングするノートブックを設定する
占有状況アナリティクスでアプリを作成する前に、アプリで後で使用できるストリームを登録する必要があります。
このチュートリアルでは、Jupyter Notebook インスタンスを使用して動画をホストし、そのストリーミング動画データをノートブックのターミナルから送信します。
vaictl コマンドライン ツールをダウンロードする
- 開いた Jupyterlab インスタンスで、ランチャーから [Notebook] を開きます。
- ノートブック セルで次のコマンドを使用して、Vertex AI Vision(vaictl)コマンドライン ツール、rtsp サーバー コマンドライン ツール、open-cv ツールをダウンロードします。
!wget -q https://github.com/aler9/rtsp-simple-server/releases/download/v0.20.4/rtsp-simple-server_v0.20.4_linux_amd64.tar.gz
!wget -q https://github.com/google/visionai/releases/download/v0.0.4/visionai_0.0-4_amd64.deb
!tar -xf rtsp-simple-server_v0.20.4_linux_amd64.tar.gz
!pip install opencv-python --quiet
!sudo apt-get -qq remove -y visionai
!sudo apt-get -qq install -y ./visionai_0.0-4_amd64.deb
!sudo apt-get -qq install -y ffmpeg
5. ストリーミング用に動画ファイルを取り込む
必要なコマンドライン ツールを使用してノートブック環境を設定したら、サンプル動画ファイルをコピーし、vaictl を使用して動画データを空室状況分析アプリにストリーミングできます。
新しいストリームを登録する
- Vertex AI Vision の左側のパネルで [ストリーム] タブをクリックします。
- 上部にある登録ボタン をクリックします。
- [ストリーム名] に「queue-stream」と入力します。
- [リージョン] で、前の手順でノートブックの作成時に選択したのと同じリージョンを選択します。
- [登録] をクリックします。
サンプル動画を VM にコピーする
- ノートブックで、次の wget コマンドを使用してサンプル動画をコピーします。
!wget -q https://github.com/vagrantism/interesting-datasets/raw/main/video/collective_activity/seq25_h264.mp4
VM から動画をストリーミングしてストリームにデータを取り込む
- このローカル動画ファイルをアプリの入力ストリームに送信するには、ノートブック セルで次のコマンドを使用します。次の変数を置き換える必要があります。
- PROJECT_ID: Google Cloud プロジェクト ID。
- LOCATION: ロケーション ID。(us-central1 など)。詳細については、クラウドのロケーションをご覧ください。
- LOCAL_FILE: ローカル動画ファイルのファイル名。例:
seq25_h264
.mp4。
PROJECT_ID='<Your Google Cloud project ID>'
LOCATION='<Your stream location>'
LOCAL_FILE='seq25_h264.mp4'
STREAM_NAME='queue-stream'
- rtsp プロトコルで動画ファイルをストリーミングする rtsp-simple-server を起動します。
import os
import time
import subprocess
subprocess.Popen(["nohup", "./rtsp-simple-server"], stdout=open('rtsp_out.log', 'a'), stderr=open('rtsp_err.log', 'a'), preexec_fn=os.setpgrp)
time.sleep(5)
- ffmpeg コマンドライン ツールを使用して rtsp ストリームで動画をループする
subprocess.Popen(["nohup", "ffmpeg", "-re", "-stream_loop", "-1", "-i", LOCAL_FILE, "-c", "copy", "-f", "rtsp", f"rtsp://localhost:8554/{LOCAL_FILE.split('.')[0]}"], stdout=open('ffmpeg_out.log', 'a'), stderr=open('ffmpeg_err.log', 'a'), preexec_fn=os.setpgrp)
time.sleep(5)
- vaictl コマンドライン ツールを使用して、rtsp サーバー URI から、前のステップで作成した Vertex AI Vision ストリーム「queue-stream」に動画をストリーミングします。
subprocess.Popen(["nohup", "vaictl", "-p", PROJECT_ID, "-l", LOCATION, "-c", "application-cluster-0", "--service-endpoint", "visionai.googleapis.com", "send", "rtsp", "to", "streams", "queue-stream", "--rtsp-uri", f"rtsp://localhost:8554/{LOCAL_FILE.split('.')[0]}"], stdout=open('vaictl_out.log', 'a'), stderr=open('vaictl_err.log', 'a'), preexec_fn=os.setpgrp)
vaictl 取り込みオペレーションの開始から動画がダッシュボードに表示されるまで、100 秒ほどかかることがあります。
ストリームの取り込みが利用可能になったら、キューストリームを選択して Vertex AI Vision ダッシュボードの [ストリーム] タブで動画フィードを表示できます。
6. アプリケーションを作成する
最初のステップは、データを処理するアプリを作成することです。アプリは、次を接続する自動化されたパイプラインと見なすことができます。
- データの取り込み: 動画フィードがストリームに取り込まれます。
- データ分析: 取り込み後に AI(コンピュータ ビジョン)モデルを追加できます。
- データ ストレージ: 2 つのバージョンの動画フィード(元のストリーミングと AI モデルによって処理されたストリーミング)をメディア ウェアハウスに保存できます。
Google Cloud コンソールでは、アプリはグラフとして表示されます。
空のアプリを作成する
アプリグラフにデータを入力するには、まず空のアプリを作成する必要があります。
Google Cloud コンソールでアプリを作成します。
- Google Cloud コンソールに移動します。
- Vertex AI Vision ダッシュボードの [アプリケーション] タブを開きます。[アプリケーション] タブに移動
- [Create] ボタンをクリックします。
- アプリ名として「queue-app」と入力し、リージョンを選択します。
- [作成] をクリックします。
アプリ コンポーネント ノードを追加する
空のアプリを作成したら、次の 3 つのノードをアプリグラフに追加できます。
- 取り込みノード: ノートブックで作成した rtsp 動画サーバーから送信されたデータを取り込むストリーム リソース。
- 処理ノード: 取り込まれたデータを処理する占有率分析モデル。
- ストレージ ノード: 処理済みの動画を保存し、メタデータ ストアとして機能するメディア ウェアハウス。メタデータ ストアには、取り込まれた動画データに関する分析情報と、AI モデルによって推測された情報が含まれます。
コンソールでコンポーネント ノードをアプリに追加します。
- Vertex AI Vision ダッシュボードの [アプリケーション] タブを開きます。[アプリケーション] タブに移動
処理パイプラインのグラフ ビジュアリゼーションが表示されます。
データ取り込みノードを追加する
- 入力ストリーム ノードを追加するには、サイドメニューの [コネクタ] セクションで [ストリーム] オプションを選択します。
- 表示された [ストリーム] メニューの [ソース] セクションで、[ストリームを追加] を選択します。
- [ストリームを追加] メニューで [queue-stream] を選択します。
- アプリグラフにストリームを追加するには、[ストリームを追加] をクリックします。
データ処理ノードを追加する
- 占有人数モデルノードを追加するには、サイドメニューの [特殊モデル] セクションで [占有人数分析] オプションを選択します。
- デフォルトの [ユーザー] のままにします。[車両] がすでに選択されている場合は、チェックボックスをオフにします。
- [詳細オプション] セクションで、[アクティブなゾーン/ラインを作成 ] をクリックします。
- ポリゴンツールを使用してアクティブ ゾーンを描画し、そのゾーンにいる人数を数えます。必要に応じてゾーンにラベルを付ける
- 上部の戻る矢印をクリックします。
- チェックボックスをクリックして、混雑を検知するための滞留時間の設定を追加します。
データ ストレージ ノードを追加する
- 出力先(ストレージ)ノードを追加するには、サイドメニューの [コネクタ] セクションで [VIsion AI ウェアハウス] オプションを選択します。
- [Vertex AI Warehouse] コネクタをクリックしてメニューを開き、[ウェアハウスを接続] をクリックします。
- [ウェアハウスを接続] メニューで、[新しいウェアハウスを作成] を選択します。ウェアハウスに queue-warehouse という名前を付け、TTL の期間は 14 日のままにします。
- [作成] ボタンをクリックしてウェアハウスを追加します。
7. 出力を BigQuery テーブルに接続する
Vertex AI Vision アプリに BigQuery コネクタを追加すると、接続されたアプリモデルの出力がすべてターゲット テーブルに取り込まれます。
独自の BigQuery テーブルを作成して、BigQuery コネクタをアプリに追加するときにそのテーブルを指定するか、Vertex AI Vision アプリ プラットフォームにテーブルを自動的に作成させることができます。
テーブルの自動作成
Vertex AI Vision アプリ プラットフォームでテーブルを自動的に作成する場合は、BigQuery コネクタ ノードを追加するときにこのオプションを指定できます。
テーブルの自動作成を使用する場合は、次のデータセットとテーブルの条件が適用されます。
- データセット: 自動作成されたデータセットの名前は visionai_dataset です。
- テーブル: 自動的に作成されるテーブル名は visionai_dataset.APPLICATION_ID です。
- エラー処理:
- 同じデータセットに同じ名前のテーブルが存在する場合、自動作成は行われません。
- Vertex AI Vision ダッシュボードの [アプリケーション] タブを開きます。[アプリケーション] タブに移動
- リストからアプリケーション名の横にある [アプリを表示] を選択します。
- アプリケーション ビルダー ページの [コネクタ] セクションで [BigQuery] を選択します。
- [BigQuery パス] フィールドは空白のままにします。
- [store metadata from:] で [occupancy Analytics] のみを選択し、ストリームのチェックボックスをオフにします。
最終的なアプリグラフは次のようになります。
8. アプリをデプロイして使用する
必要なコンポーネントをすべて含めてエンドツーエンドのアプリをビルドしたら、アプリの使用の最後のステップとしてアプリをデプロイします。
- Vertex AI Vision ダッシュボードの [アプリケーション] タブを開きます。[アプリケーション] タブに移動
- リストの queue-app アプリの横にある [View app] を選択します。
- [Studio] ページで、[デプロイ] ボタンをクリックします。
- 次の確認ダイアログで [Deploy] をクリックします。デプロイ オペレーションが完了するまでに数分かかることがあります。デプロイが完了すると、ノードの横に緑色のチェックマークが表示されます。
9. ストレージ ウェアハウス内の動画コンテンツを検索する
動画データを処理アプリに取り込んだら、分析された動画データを表示し、占有率分析情報に基づいてデータを検索できます。
- Vertex AI Vision ダッシュボードの [倉庫] タブを開きます。[倉庫] タブに移動
- リストでキュー ウェアハウスを見つけて、[アセットを表示] をクリックします。
- [People count] セクションで、[Min] の値を 1 に、[Max] の値を 5 に設定します。
- Vertex AI Vision の Media Warehouse に保存されている処理済みの動画データをフィルタするには、[検索] をクリックします。
Google Cloud コンソールの検索条件に一致する動画データのビュー。
10. BigQuery テーブルを使用して出力にアノテーションを付け、分析する
- ノートブックで、セルに次の変数を初期化します。
DATASET_ID='vision_ai_dataset'
bq_table=f'{PROJECT_ID}.{DATASET_ID}.queue-app'
frame_buffer_size=10000
frame_buffer_error_milliseconds=5
dashboard_update_delay_seconds=3
rtsp_url='rtsp://localhost:8554/seq25_h264'
- 次に、以下のコードを使用して、rtsp ストリームからフレームをキャプチャします。
import cv2
import threading
from collections import OrderedDict
from datetime import datetime, timezone
frame_buffer = OrderedDict()
frame_buffer_lock = threading.Lock()
stream = cv2.VideoCapture(rtsp_url)
def read_frames(stream):
global frames
while True:
ret, frame = stream.read()
frame_ts = datetime.now(timezone.utc).timestamp() * 1000
if ret:
with frame_buffer_lock:
while len(frame_buffer) >= frame_buffer_size:
_ = frame_buffer.popitem(last=False)
frame_buffer[frame_ts] = frame
frame_buffer_thread = threading.Thread(target=read_frames, args=(stream,))
frame_buffer_thread.start()
print('Waiting for stream initialization')
while not list(frame_buffer.keys()): pass
print('Stream Initialized')
- BigQuery テーブルからデータのタイムスタンプとアノテーション情報を取得し、キャプチャしたフレーム画像を保存するディレクトリを作成します。
from google.cloud import bigquery
import pandas as pd
client = bigquery.Client(project=PROJECT_ID)
query = f"""
SELECT MAX(ingestion_time) AS ts
FROM `{bq_table}`
"""
bq_max_ingest_ts_df = client.query(query).to_dataframe()
bq_max_ingest_epoch = str(int(bq_max_ingest_ts_df['ts'][0].timestamp()*1000000))
bq_max_ingest_ts = bq_max_ingest_ts_df['ts'][0]
print('Preparing to pull records with ingestion time >', bq_max_ingest_ts)
if not os.path.exists(bq_max_ingest_epoch):
os.makedirs(bq_max_ingest_epoch)
print('Saving output frames to', bq_max_ingest_epoch)
- 次のコードを使用してフレームにアノテーションを付けます。
import json
import base64
import numpy as np
from IPython.display import Image, display, HTML, clear_output
im_width = stream.get(cv2.CAP_PROP_FRAME_WIDTH)
im_height = stream.get(cv2.CAP_PROP_FRAME_HEIGHT)
dashdelta = datetime.now()
framedata = {}
cntext = lambda x: {y['entity']['labelString']: y['count'] for y in x}
try:
while True:
try:
annotations_df = client.query(f'''
SELECT ingestion_time, annotation
FROM `{bq_table}`
WHERE ingestion_time > TIMESTAMP("{bq_max_ingest_ts}")
''').to_dataframe()
except ValueError as e:
continue
bq_max_ingest_ts = annotations_df['ingestion_time'].max()
for _, row in annotations_df.iterrows():
with frame_buffer_lock:
frame_ts = np.asarray(list(frame_buffer.keys()))
delta_ts = np.abs(frame_ts - (row['ingestion_time'].timestamp() * 1000))
delta_tx_idx = delta_ts.argmin()
closest_ts_delta = delta_ts[delta_tx_idx]
closest_ts = frame_ts[delta_tx_idx]
if closest_ts_delta > frame_buffer_error_milliseconds: continue
image = frame_buffer[closest_ts]
annotations = json.loads(row['annotation'])
for box in annotations['identifiedBoxes']:
image = cv2.rectangle(
image,
(
int(box['normalizedBoundingBox']['xmin']*im_width),
int(box['normalizedBoundingBox']['ymin']*im_height)
),
(
int((box['normalizedBoundingBox']['xmin'] + box['normalizedBoundingBox']['width'])*im_width),
int((box['normalizedBoundingBox']['ymin'] + box['normalizedBoundingBox']['height'])*im_height)
),
(255, 0, 0), 2
)
img_filename = f"{bq_max_ingest_epoch}/{row['ingestion_time'].timestamp() * 1000}.png"
cv2.imwrite(img_filename, image)
binimg = base64.b64encode(cv2.imencode('.jpg', image)[1]).decode()
curr_framedata = {
'path': img_filename,
'timestamp_error': closest_ts_delta,
'counts': {
**{
k['annotation']['displayName'] : cntext(k['counts'])
for k in annotations['stats']["activeZoneCounts"]
},
'full-frame': cntext(annotations['stats']["fullFrameCount"])
}
}
framedata[img_filename] = curr_framedata
if (datetime.now() - dashdelta).total_seconds() > dashboard_update_delay_seconds:
dashdelta = datetime.now()
clear_output()
display(HTML(f'''
<h1>Queue Monitoring Application</h1>
<p>Live Feed of the queue camera:</p>
<p><img alt="" src="{img_filename}" style="float: left;"/></a></p>
<table border="1" cellpadding="1" cellspacing="1" style="width: 500px;">
<caption>Current Model Outputs</caption>
<thead>
<tr><th scope="row">Metric</th><th scope="col">Value</th></tr>
</thead>
<tbody>
<tr><th scope="row">Serving Area People Count</th><td>{curr_framedata['counts']['serving-zone']['Person']}</td></tr>
<tr><th scope="row">Queueing Area People Count</th><td>{curr_framedata['counts']['queue-zone']['Person']}</td></tr>
<tr><th scope="row">Total Area People Count</th><td>{curr_framedata['counts']['full-frame']['Person']}</td></tr>
<tr><th scope="row">Timestamp Error</th><td>{curr_framedata['timestamp_error']}</td></tr>
</tbody>
</table>
<p> </p>
'''))
except KeyboardInterrupt:
print('Stopping Live Monitoring')
- ノートブックのメニューバーにある [停止] ボタンを使用して、アノテーション タスクを停止します。
- 個々のフレームに戻るには、次のコードを使用します。
from IPython.html.widgets import Layout, interact, IntSlider
imgs = sorted(list(framedata.keys()))
def loadimg(frame):
display(framedata[imgs[frame]])
display(Image(open(framedata[imgs[frame]]['path'],'rb').read()))
interact(loadimg, frame=IntSlider(
description='Frame #:',
value=0,
min=0, max=len(imgs)-1, step=1,
layout=Layout(width='100%')))
11. 完了
お疲れさまでした。ラボはこれで完了です。
クリーンアップ
このチュートリアルで使用したリソースについて、Google Cloud アカウントに課金されないようにするには、リソースを含むプロジェクトを削除するか、プロジェクトを維持して個々のリソースを削除します。
プロジェクトの削除
リソースを個別に削除する
リソース
https://cloud.google.com/vision-ai/docs/overview
https://cloud.google.com/vision-ai/docs/occupancy-count-tutorial