1. Cele
Omówienie
W tym laboratorium programistycznym zajmiemy się tworzeniem kompleksowej aplikacji Vertex AI Vision do monitorowania rozmiaru kolejki za pomocą filmów z obrazu z kamery w sklepie detalicznym. Do rejestrowania tych informacji użyjemy wbudowanych funkcji analizy zajętości w wytrenowanym modelu specjalistycznym:
- Oblicz liczbę osób stojących w kole.
- Oblicz liczbę osób obsługiwanych przy ladzie.
Czego się nauczysz
- Jak utworzyć aplikację w Vertex AI Vision i ją wdrożyć
- Jak skonfigurować strumień RTSP za pomocą pliku wideo i przetworzyć strumień w Vertex AI Vision za pomocą vaictl w notatniku Jupyter.
- Jak korzystać z modelu analityki zajętości i jego różnych funkcji.
- Jak wyszukiwać filmy w hurtowni multimediów Vertex AI Vision.
- Jak połączyć dane wyjściowe z BigQuery, napisz zapytanie SQL w celu wyodrębnienia obserwacji z danych wyjściowych modelu JSON oraz użyj tych danych do oznaczenia oryginalnego filmu i adnotacji.
Koszt:
Łączny koszt przeprowadzenia tego laboratorium w Google Cloud wynosi około 2 USD.
2. Zanim zaczniesz
Utwórz projekt i włącz interfejsy API:
- W konsoli Google Cloud na stronie selektora projektu wybierz lub utwórz projekt Google Cloud. Uwaga: jeśli zasoby utworzone podczas wykonywania tej procedury nie będą Ci później potrzebne, nie wybieraj istniejącego projektu, tylko utwórz nowy. Po jej zakończeniu możesz usunąć projekt wraz ze wszystkimi zasobami, które są z nim powiązane. Otwórz selektor projektów
- Sprawdź, czy w projekcie Cloud włączone są płatności. Dowiedz się, jak sprawdzić, czy w projekcie są włączone płatności.
- Włącz interfejsy Compute Engine, Vertex API, Notebook API i Vision AI API. Włącz interfejsy API
Tworzenie konta usługi:
- W konsoli Google Cloud otwórz stronę Utwórz konto usługi. Otwórz stronę Utwórz konto usługi
- Wybierz projekt.
- W polu Nazwa konta usługi wpisz nazwę. Na podstawie tej nazwy konsola Google Cloud wypełni pole Identyfikator konta usługi. W polu Opis konta usługi wpisz opis. Przykład: Konto usługi do krótkiego wprowadzenia.
- Kliknij Utwórz i kontynuuj.
- Aby przyznać dostęp do projektu, przypisz do konta usługi te role:
- Vision AI > Edytujący Vision AI
- Compute Engine > Administrator instancji Compute (beta)
- BigQuery > Administrator BigQuery.
Na liście Wybierz rolę wybierz rolę. Aby dodać kolejne role, kliknij Dodaj kolejną rolę i dodaj każdą z nich.
- Kliknij Dalej.
- Aby zakończyć tworzenie konta usługi, kliknij Gotowe. Nie zamykaj okna przeglądarki. Użyjesz go w następnym kroku.
3. Konfigurowanie notatnika Jupyter
Zanim utworzysz aplikację w ramach analizy zajętości, musisz zarejestrować strumień, którego aplikacja może później używać.
W tym samouczku utworzysz instancję Jupyter Notebook, która będzie hostować film, i wyślesz dane strumieniowego wideo z notatnika. Korzystamy z notebooka Jupyter, ponieważ daje on nam elastyczność w wykonywaniu poleceń w powłoce oraz uruchamianiu niestandardowego kodu przetwarzania wstępnego i następczego w jednym miejscu, co jest bardzo przydatne podczas szybkich eksperymentów. Będziemy używać tego notatnika do:
- Uruchamianie serwera rtsp jako procesu w tle
- Uruchom polecenie vaictl jako proces w tle
- Wykonywanie zapytań i kodu przetwarzania w celu analizowania danych wyjściowych z analizy zajętości
Tworzenie notatnika Jupyter
Pierwszym krokiem w wysyłaniu filmu z instancji Jupyter Notebook jest utworzenie notatnika za pomocą konta usługi utworzonego w poprzednim kroku.
- W konsoli otwórz stronę Vertex AI. Otwórz Vertex AI Workbench
- Kliknij Notatniki zarządzane przez użytkownika.
- Kliknij Nowy notatnik > Tensorflow Enterprise 2.6 (z LTS) > Bez GPU
- Wpisz nazwę notatnika Jupyter. Więcej informacji znajdziesz w artykule Konwencja nazewnictwa zasobów.
- Kliknij OPCJE ZAAWANSOWANE.
- Przewiń w dół do sekcji Uprawnienia.
- Odznacz opcję Użyj domyślnego konta usługi Compute Engine.
- Dodaj adres e-mail konta usługi utworzony w poprzednim kroku. Następnie kliknij Utwórz.
- Po utworzeniu instancji kliknij OTWÓRZ JUPYTERLAB.
4. Konfigurowanie urządzenia Surface do strumieniowego przesyłania filmów
Zanim utworzysz aplikację w Analytics Otwarcia, musisz zarejestrować strumień, którego aplikacja będzie później używać.
W tym samouczku użyjemy instancji Jupyter Notebook do hostowania filmu, a Ty wyślesz dane strumieniowego wideo z terminala Notebooka.
Pobieranie narzędzia wiersza poleceń vaictl
- W otwartej instancji JupyterLab otwórz notatnik z poziomu menu.
- Pobierz narzędzie wiersza poleceń Vertex AI Vision (vaictl), narzędzie wiersza poleceń serwera rtsp i narzędzie open-cv, używając w komórce notebooka tego polecenia:
!wget -q https://github.com/aler9/rtsp-simple-server/releases/download/v0.20.4/rtsp-simple-server_v0.20.4_linux_amd64.tar.gz
!wget -q https://github.com/google/visionai/releases/download/v0.0.4/visionai_0.0-4_amd64.deb
!tar -xf rtsp-simple-server_v0.20.4_linux_amd64.tar.gz
!pip install opencv-python --quiet
!sudo apt-get -qq remove -y visionai
!sudo apt-get -qq install -y ./visionai_0.0-4_amd64.deb
!sudo apt-get -qq install -y ffmpeg
5. Pozyskaj plik wideo na potrzeby przesyłania strumieniowego
Po skonfigurowaniu środowiska notebooka za pomocą wymaganych narzędzi wiersza poleceń możesz skopiować przykładowy plik wideo, a potem użyć narzędzia vaictl do przesyłania danych wideo do aplikacji do analizy zajętości.
Rejestrowanie nowego strumienia
- W panelu bocznym Vertex AI Vision kliknij kartę strumieni.
- Kliknij przycisk Zarejestruj się u góry strony.
- W polu Nazwa strumienia wpisz „queue-stream”.
- W polu Region wybierz ten sam region, który został wybrany podczas tworzenia notatnika w poprzednim kroku.
- Kliknij Zarejestruj.
Kopiowanie przykładowego filmu na maszynę wirtualną
- W notatniku skopiuj przykładowy film za pomocą tego polecenia wget.
!wget -q https://github.com/vagrantism/interesting-datasets/raw/main/video/collective_activity/seq25_h264.mp4
Przesyłanie strumieni wideo z VM i przetwarzanie danych na potrzeby strumienia
- Aby wysłać ten lokalny plik wideo do strumienia danych wejściowych aplikacji, użyj tego polecenia w komórce notatnika. Musisz wprowadzić te zamiany zmiennych:
- PROJECT_ID: identyfikator Twojego projektu Google Cloud.
- LOKALIZACJA: identyfikator lokalizacji. Na przykład us-central1. Więcej informacji znajdziesz w artykule Lokalizacja Google Cloud.
- LOCAL_FILE: nazwa lokalnego pliku wideo. Na przykład
seq25_h264
.mp4.
PROJECT_ID='<Your Google Cloud project ID>'
LOCATION='<Your stream location>'
LOCAL_FILE='seq25_h264.mp4'
STREAM_NAME='queue-stream'
- Uruchomić serwer rtsp-simple-server, na którym strumieniowo przesyłamy plik wideo za pomocą protokołu rtsp.
import os
import time
import subprocess
subprocess.Popen(["nohup", "./rtsp-simple-server"], stdout=open('rtsp_out.log', 'a'), stderr=open('rtsp_err.log', 'a'), preexec_fn=os.setpgrp)
time.sleep(5)
- Użyj narzędzia wiersza poleceń ffmpeg, aby zapętlić film w strumieniu rtsp.
subprocess.Popen(["nohup", "ffmpeg", "-re", "-stream_loop", "-1", "-i", LOCAL_FILE, "-c", "copy", "-f", "rtsp", f"rtsp://localhost:8554/{LOCAL_FILE.split('.')[0]}"], stdout=open('ffmpeg_out.log', 'a'), stderr=open('ffmpeg_err.log', 'a'), preexec_fn=os.setpgrp)
time.sleep(5)
- Użyj narzędzia wiersza poleceń vaictl, aby przesyłać strumieniowo film z identyfikatora URI serwera RTSP do strumienia „queue-stream” Vertex AI Vision utworzonego w poprzednim kroku.
subprocess.Popen(["nohup", "vaictl", "-p", PROJECT_ID, "-l", LOCATION, "-c", "application-cluster-0", "--service-endpoint", "visionai.googleapis.com", "send", "rtsp", "to", "streams", "queue-stream", "--rtsp-uri", f"rtsp://localhost:8554/{LOCAL_FILE.split('.')[0]}"], stdout=open('vaictl_out.log', 'a'), stderr=open('vaictl_err.log', 'a'), preexec_fn=os.setpgrp)
Od momentu rozpoczęcia operacji przetwarzania danych przez vaictl do momentu wyświetlenia filmu na panelu może minąć około 100 sekund.
Gdy przetworzenie strumienia będzie dostępne, możesz wyświetlić strumień wideo na karcie Strumienie w panelu Vertex AI Vision, wybierając kolejkę strumieni.
6. Tworzenie aplikacji
Pierwszym krokiem jest utworzenie aplikacji, która będzie przetwarzać Twoje dane. Aplikację można traktować jako automatyczny potok łączący:
- Pozyskiwanie danych: dane z kanału wideo są przetwarzane w strumień.
- Analiza danych: model AI(Computer Vision) można dodać po przetworzeniu danych.
- Pamięć danych: dwie wersje strumienia wideo (oryginalny strumień i strumień przetworzony przez model AI) można przechowywać w hurtowni multimediów.
W konsoli Google Cloud aplikacja jest reprezentowana jako wykres.
Tworzenie pustej aplikacji
Zanim wypełnisz graf aplikacji, musisz najpierw utworzyć pustą aplikację.
Utwórz aplikację w konsoli Google Cloud.
- Otwórz konsolę Google Cloud.
- Na panelu Vertex AI Vision otwórz kartę Aplikacje. Otwórz kartę Aplikacje
- Kliknij przycisk Utwórz.
- Wpisz „queue-app” jako nazwę aplikacji i wybierz region.
- Kliknij Utwórz.
Dodawanie węzłów komponentów aplikacji
Po utworzeniu pustej aplikacji możesz dodać do grafu aplikacji te 3 węzły:
- węzeł przetwarzania: zasób strumienia, który przetwarza dane wysyłane z serwera wideo RTSP utworzonego w notatniku.
- węzeł przetwarzania: model analityczny zajętości, który działa na podstawie przetworzonych danych;
- Węzeł pamięci masowej: hurtownia multimediów, w której przechowywane są przetworzone filmy, i służy jako magazyn metadanych. Magazyny metadanych zawierają informacje analityczne o przetworzonych danych wideo oraz informacje wywnioskowane przez modele AI.
Dodaj do aplikacji w konsoli węzły komponentów.
- Na panelu Vertex AI Vision otwórz kartę Aplikacje. Otwórz kartę Aplikacje
Przejdziesz do wizualizacji potoku przetwarzania w postaci wykresu.
Dodawanie węzła przetwarzania danych
- Aby dodać węzeł strumienia wejściowego, w sekcji Oprogramowanie sprzęgające w menu bocznym wybierz opcję Strumienie.
- W sekcji Źródło w menu Strumień, które się otworzy, wybierz Dodaj strumienie.
- W menu Dodaj strumienie wybierz queue-stream.
- Aby dodać strumień do wykresu aplikacji, kliknij Dodaj strumienie.
Dodawanie węzła przetwarzania danych
- Aby dodać węzeł modelu liczby obłożenia, wybierz opcję Analiza zajętości w sekcji Modele specjalistyczne w menu bocznym.
- Pozostaw domyślne opcje Osoby. Odznacz opcję Pojazdy, jeśli jest już wybrana.
- W sekcji Opcje zaawansowane kliknij Utwórz aktywne strefy lub linie .
- Za pomocą narzędzia wielokąta narysuj aktywne strefy, aby zliczać osoby w danej strefie. odpowiednio ją nazwać;
- U góry kliknij strzałkę wstecz.
- Aby dodać ustawienia czasu bezczynności, które pozwolą wykrywać przeciążenie, kliknij pole wyboru.
Dodawanie węzła przechowywania danych
- Aby dodać węzeł docelowy (lokalizację w pamięci masowej) dla danych wyjściowych, w sekcji Oprogramowanie sprzęgające w menu bocznym wybierz opcję Hurtownia Vision AI.
- Kliknij łącznik Hurtownia Vertex AI, aby otworzyć jego menu, a potem kliknij Połącz hurtownię.
- W menu Połącz hurtownię wybierz Utwórz nową hurtownię. Nazwij hurtownię queue-warehouse i pozostaw czas TTL ustawiony na 14 dni.
- Kliknij przycisk Utwórz, aby dodać hurtownię.
7. Łączenie wyjścia z tabelą BigQuery
Gdy dodasz oprogramowanie sprzęgające BigQuery do aplikacji Vertex AI Vision, wszystkie dane wyjściowe połączonego modelu aplikacji będą przetwarzane do tabeli docelowej.
Możesz utworzyć własną tabelę BigQuery i wskazać ją podczas dodawania do aplikacji łącznika BigQuery lub pozwolić platformie aplikacji Vertex AI Vision na automatyczne utworzenie tabeli.
Automatyczne tworzenie tabel
Jeśli chcesz, aby platforma aplikacji Vertex AI Vision automatycznie utworzyła tabelę, możesz określić tę opcję podczas dodawania węzła łącznika BigQuery.
Jeśli chcesz korzystać z automatycznego tworzenia tabel, obowiązują te warunki dotyczące zbiorów danych i tabel:
- Zbiór danych: nazwa automatycznie utworzonego zbioru danych to visionai_dataset.
- Tabela: nazwa automatycznie utworzonej tabeli to visionai_dataset.APPLICATION_ID.
- Obsługa błędów:
- Jeśli w ramach tego samego zbioru danych istnieje już tabela o tej samej nazwie, nie nastąpi jej automatyczne utworzenie.
- Na panelu Vertex AI Vision otwórz kartę Aplikacje. Otwórz kartę Aplikacje
- Wybierz Wyświetl aplikację obok nazwy aplikacji na liście.
- Na stronie kreatora aplikacji w sekcji Oprogramowanie sprzęgające kliknij BigQuery.
- Pole Ścieżka BigQuery pozostaw puste.
- W sekcji Metadane sklepu pochodzące z: wybierz tylko „Statystyki dotyczące zajętości” i odznacz strumienie.
Ostateczny wykres aplikacji powinien wyglądać tak:
8. Wdrażanie aplikacji do użytkowania
Po utworzeniu kompleksowej aplikacji ze wszystkimi niezbędnymi komponentami ostatnim krokiem do jej użycia jest wdrożenie.
- Otwórz kartę Applications (Aplikacje) w panelu Vertex AI Vision. Otwórz kartę Aplikacje.
- Obok aplikacji queue-app na liście kliknij Wyświetl aplikację.
- Na stronie Studio kliknij przycisk Wdróż.
- W oknie potwierdzenia kliknij Wdróż. Operacja wdrożenia może potrwać kilka minut. Po zakończeniu wdrażania obok węzłów pojawią się zielone ikony potwierdzenia.
9. wyszukiwanie treści wideo w hurtowni danych,
Po przetworzeniu danych wideo w aplikacji do przetwarzania możesz wyświetlić przetworzone dane wideo i wyszukiwać je na podstawie informacji z analizy zajętości.
- Na panelu Vertex AI Vision otwórz kartę Magazyny. Otwórz kartę Magazyny
- Na liście odszukaj magazyn kolejki-magazynu i kliknij Wyświetl zasoby.
- W sekcji Liczba osób ustaw wartość Min na 1, a Max na 5.
- Aby odfiltrować przetworzone dane wideo przechowywane w hurtowni multimediów Vertex AI Vision, kliknij Szukaj.
Widok przechowywanych danych wideo, które pasują do kryteriów wyszukiwania w konsoli Google Cloud.
10. Dodawanie adnotacji do danych wyjściowych i analizowanie ich za pomocą tabeli BigQuery
- W notatniku zainicjuj te zmienne w komórce.
DATASET_ID='vision_ai_dataset'
bq_table=f'{PROJECT_ID}.{DATASET_ID}.queue-app'
frame_buffer_size=10000
frame_buffer_error_milliseconds=5
dashboard_update_delay_seconds=3
rtsp_url='rtsp://localhost:8554/seq25_h264'
- Teraz przechwytujemy klatki ze strumienia rtsp za pomocą następującego kodu:
import cv2
import threading
from collections import OrderedDict
from datetime import datetime, timezone
frame_buffer = OrderedDict()
frame_buffer_lock = threading.Lock()
stream = cv2.VideoCapture(rtsp_url)
def read_frames(stream):
global frames
while True:
ret, frame = stream.read()
frame_ts = datetime.now(timezone.utc).timestamp() * 1000
if ret:
with frame_buffer_lock:
while len(frame_buffer) >= frame_buffer_size:
_ = frame_buffer.popitem(last=False)
frame_buffer[frame_ts] = frame
frame_buffer_thread = threading.Thread(target=read_frames, args=(stream,))
frame_buffer_thread.start()
print('Waiting for stream initialization')
while not list(frame_buffer.keys()): pass
print('Stream Initialized')
- Pobierz sygnaturę czasową danych i informacje o adnotacji z tabeli BigQuery i utwórz katalog do przechowywania przechwyconych obrazów ramek:
from google.cloud import bigquery
import pandas as pd
client = bigquery.Client(project=PROJECT_ID)
query = f"""
SELECT MAX(ingestion_time) AS ts
FROM `{bq_table}`
"""
bq_max_ingest_ts_df = client.query(query).to_dataframe()
bq_max_ingest_epoch = str(int(bq_max_ingest_ts_df['ts'][0].timestamp()*1000000))
bq_max_ingest_ts = bq_max_ingest_ts_df['ts'][0]
print('Preparing to pull records with ingestion time >', bq_max_ingest_ts)
if not os.path.exists(bq_max_ingest_epoch):
os.makedirs(bq_max_ingest_epoch)
print('Saving output frames to', bq_max_ingest_epoch)
- Dodaj adnotacje do klatek za pomocą tego kodu:
import json
import base64
import numpy as np
from IPython.display import Image, display, HTML, clear_output
im_width = stream.get(cv2.CAP_PROP_FRAME_WIDTH)
im_height = stream.get(cv2.CAP_PROP_FRAME_HEIGHT)
dashdelta = datetime.now()
framedata = {}
cntext = lambda x: {y['entity']['labelString']: y['count'] for y in x}
try:
while True:
try:
annotations_df = client.query(f'''
SELECT ingestion_time, annotation
FROM `{bq_table}`
WHERE ingestion_time > TIMESTAMP("{bq_max_ingest_ts}")
''').to_dataframe()
except ValueError as e:
continue
bq_max_ingest_ts = annotations_df['ingestion_time'].max()
for _, row in annotations_df.iterrows():
with frame_buffer_lock:
frame_ts = np.asarray(list(frame_buffer.keys()))
delta_ts = np.abs(frame_ts - (row['ingestion_time'].timestamp() * 1000))
delta_tx_idx = delta_ts.argmin()
closest_ts_delta = delta_ts[delta_tx_idx]
closest_ts = frame_ts[delta_tx_idx]
if closest_ts_delta > frame_buffer_error_milliseconds: continue
image = frame_buffer[closest_ts]
annotations = json.loads(row['annotation'])
for box in annotations['identifiedBoxes']:
image = cv2.rectangle(
image,
(
int(box['normalizedBoundingBox']['xmin']*im_width),
int(box['normalizedBoundingBox']['ymin']*im_height)
),
(
int((box['normalizedBoundingBox']['xmin'] + box['normalizedBoundingBox']['width'])*im_width),
int((box['normalizedBoundingBox']['ymin'] + box['normalizedBoundingBox']['height'])*im_height)
),
(255, 0, 0), 2
)
img_filename = f"{bq_max_ingest_epoch}/{row['ingestion_time'].timestamp() * 1000}.png"
cv2.imwrite(img_filename, image)
binimg = base64.b64encode(cv2.imencode('.jpg', image)[1]).decode()
curr_framedata = {
'path': img_filename,
'timestamp_error': closest_ts_delta,
'counts': {
**{
k['annotation']['displayName'] : cntext(k['counts'])
for k in annotations['stats']["activeZoneCounts"]
},
'full-frame': cntext(annotations['stats']["fullFrameCount"])
}
}
framedata[img_filename] = curr_framedata
if (datetime.now() - dashdelta).total_seconds() > dashboard_update_delay_seconds:
dashdelta = datetime.now()
clear_output()
display(HTML(f'''
<h1>Queue Monitoring Application</h1>
<p>Live Feed of the queue camera:</p>
<p><img alt="" src="{img_filename}" style="float: left;"/></a></p>
<table border="1" cellpadding="1" cellspacing="1" style="width: 500px;">
<caption>Current Model Outputs</caption>
<thead>
<tr><th scope="row">Metric</th><th scope="col">Value</th></tr>
</thead>
<tbody>
<tr><th scope="row">Serving Area People Count</th><td>{curr_framedata['counts']['serving-zone']['Person']}</td></tr>
<tr><th scope="row">Queueing Area People Count</th><td>{curr_framedata['counts']['queue-zone']['Person']}</td></tr>
<tr><th scope="row">Total Area People Count</th><td>{curr_framedata['counts']['full-frame']['Person']}</td></tr>
<tr><th scope="row">Timestamp Error</th><td>{curr_framedata['timestamp_error']}</td></tr>
</tbody>
</table>
<p> </p>
'''))
except KeyboardInterrupt:
print('Stopping Live Monitoring')
- Zatrzymaj zadanie adnotacji, klikając przycisk Zatrzymaj na pasku menu notatnika.
- Aby powrócić do poszczególnych klatek, użyj tego kodu:
from IPython.html.widgets import Layout, interact, IntSlider
imgs = sorted(list(framedata.keys()))
def loadimg(frame):
display(framedata[imgs[frame]])
display(Image(open(framedata[imgs[frame]]['path'],'rb').read()))
interact(loadimg, frame=IntSlider(
description='Frame #:',
value=0,
min=0, max=len(imgs)-1, step=1,
layout=Layout(width='100%')))
11. Gratulacje
Gratulacje! Masz ukończony ten moduł.
Czyszczenie danych
Aby uniknąć obciążenia konta Google Cloud opłatami za zasoby zużyte w tym samouczku, możesz usunąć projekt zawierający te zasoby lub zachować projekt i usunąć poszczególne zasoby.
Usuwanie projektu
Usuwanie poszczególnych zasobów
Materiały
https://cloud.google.com/vision-ai/docs/overview
https://cloud.google.com/vision-ai/docs/occupancy-count-tutorial