VideoMAE による人物動作認識(ソースコードと説明と利用ガイド)
プログラム利用ガイド
1. このプログラムの利用シーン
このツールは、人物の動作をリアルタイムで認識するためのソフトウェアである。動画ファイルやウェブカメラの映像から人物を自動検出し、16フレームの動作シーケンスを分析して動作認識を粉う。行動分析が必要な場面で活用できる。
2. 主な機能
- リアルタイム人物検出:DETRアルゴリズムによる複数人物の同時検出と主要人物の自動選択
- 動作認識:VideoMAEによる16フレームシーケンスからのTop-5動作分類
- 異常度判定:予測の不確実性と危険動作パターンを組み合わせた異常度算出
- 可視化機能:検出結果、動作認識結果、異常度をリアルタイム表示
- 結果保存:処理結果をresult.txtファイルに自動保存
3. 基本的な使い方
- 起動と入力選択:プログラム実行後、0(動画ファイル)、1(カメラ)、2(サンプル動画)から選択してEnterキーを押す
- 動画ファイル選択:0を選択した場合、ファイルダイアログで対象動画を選択する
- 処理開始:選択後、自動的に処理が開始され、映像と解析結果が表示される
- 終了方法:映像表示画面でqキーを押すとプログラムが終了する
4. 便利な機能
- 検出率表示:人物検出の成功率をリアルタイム監視
- 処理ログ:カメラ使用時は時刻付き、動画ファイル使用時はフレーム番号付きで結果を表示
- 結果ファイル出力:全処理結果をresult.txtに保存し、後から詳細分析が可能
Python開発環境,ライブラリ類
ここでは、最低限の事前準備について説明する。機械学習や深層学習を行う場合は、NVIDIA CUDA、Visual Studio、Cursorなどを追加でインストールすると便利である。これらについては別ページ https://www.kkaneko.jp/cc/dev/aiassist.htmlで詳しく解説しているので、必要に応じて参照してください。
Python 3.12 のインストール
インストール済みの場合は実行不要。
管理者権限でコマンドプロンプトを起動(手順:Windowsキーまたはスタートメニュー > cmd と入力 > 右クリック > 「管理者として実行」)し、以下を実行する。管理者権限は、wingetの--scope machineオプションでシステム全体にソフトウェアをインストールするために必要である。
REM Python をシステム領域にインストール
winget install --scope machine --id Python.Python.3.12 -e --silent
REM Python のパス設定
set "PYTHON_PATH=C:\Program Files\Python312"
set "PYTHON_SCRIPTS_PATH=C:\Program Files\Python312\Scripts"
echo "%PATH%" | find /i "%PYTHON_PATH%" >nul
if errorlevel 1 setx PATH "%PATH%;%PYTHON_PATH%" /M >nul
echo "%PATH%" | find /i "%PYTHON_SCRIPTS_PATH%" >nul
if errorlevel 1 setx PATH "%PATH%;%PYTHON_SCRIPTS_PATH%" /M >nul
【関連する外部ページ】
Python の公式ページ: https://www.python.org/
AI エディタ Windsurf のインストール
Pythonプログラムの編集・実行には、AI エディタの利用を推奨する。ここでは,Windsurfのインストールを説明する。
管理者権限でコマンドプロンプトを起動(手順:Windowsキーまたはスタートメニュー > cmd と入力 > 右クリック > 「管理者として実行」)し、以下を実行して、Windsurfをシステム全体にインストールする。管理者権限は、wingetの--scope machineオプションでシステム全体にソフトウェアをインストールするために必要となる。
winget install --scope machine Codeium.Windsurf -e --silent
【関連する外部ページ】
Windsurf の公式ページ: https://windsurf.com/
必要なライブラリのインストール
コマンドプロンプトを管理者として実行(手順:Windowsキーまたはスタートメニュー > cmd と入力 > 右クリック > 「管理者として実行」)し、以下を実行する
pip install -U torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu126
pip install transformers pillow opencv-python numpy matplotlib
人物動作認識プログラム(VideoMAE + DETR統合版)
概要
このプログラムは、VideoMAEとDETRを組み合わせた人物動作認識システムである。動画またはカメラ映像から人物を検出し、16フレームシーケンスの動作認識を行う。そして,危険動作や予測の不確実性に基づいて異常度を算出する。
主要技術
VideoMAE(Video Masked Autoencoders)
2022年に提案された動画理解のための自己教師あり学習手法[1]。90%~95%の極めて高いマスキング比率による動画チューブマスキング戦略を用い、時間的冗長性を活用して動画表現学習を実現する。
DETR(Detection Transformer)
2020年に提案されたTransformerベースのエンドツーエンド物体検出手法[2]。従来の物体検出で必要だったアンカーボックス生成やNon-Maximum Suppressionを不要とし、集合予測問題として物体検出を定式化する。
実装の特色
- 複数人物検出時の信頼度と面積による主要人物選択アルゴリズム
- リアルタイム可視化とファイル出力機能
参考文献
[1] Tong, Z., Song, Y., Wang, J., & Wang, L. (2022). VideoMAE: Masked Autoencoders are Data-Efficient Learners for Self-Supervised Video Pre-Training. Advances in Neural Information Processing Systems, 35. https://arxiv.org/abs/2203.12602
[2] Carion, N., Massa, F., Synnaeve, G., Usunier, N., Kirillov, A., & Zagoruyko, S. (2020). End-to-End Object Detection with Transformers. In European Conference on Computer Vision (ECCV 2020), 213-229. https://arxiv.org/abs/2005.12872
[3] Wang, L., Huang, B., Zhao, Z., Tong, Z., He, Y., Wang, Y., Wang, Y., & Qiao, Y. (2023). VideoMAE V2: Scaling Video Masked Autoencoders with Dual Masking. In Proceedings of the IEEE/CVF Conference on Computer Vision and Pattern Recognition (CVPR 2023). https://arxiv.org/abs/2303.16727
ソースコード
"""
人物動作認識プログラム(VideoMAE + DETR統合版)
特徴技術名: VideoMAE(Video Masked Autoencoders)
出典: Tong, Z., Song, Y., Wang, J., & Wang, L. (2022). VideoMAE: Masked Autoencoders are Data-Efficient Learners for Self-Supervised Video Pre-Training. arXiv preprint arXiv:2203.12602.
特徴機能: 極めて高いマスキング比率(90%~95%)による動画チューブマスキング戦略を用いた自己教師あり学習による効率的な動画理解。時間的冗長性を活用し、従来の画像ベース手法より高いマスキング比率を実現。
学習済みモデル:
- VideoMAE: MCG-NJU/videomae-base-finetuned-kinetics(Kinetics-400データセットで事前学習済み、16フレームシーケンス対応)
- DETR: facebook/detr-resnet-50(COCO 2017データセットで学習済み、ResNet-50バックボーン、人物検出用)
- URL: https://huggingface.co/MCG-NJU/videomae-base-finetuned-kinetics、https://huggingface.co/facebook/detr-resnet-50
特徴技術および学習済みモデルの利用制限: **学術研究目的での利用を推奨。商用利用の場合は各モデルのライセンス(Apache 2.0等)を確認のこと。必ず利用者自身で利用制限を確認すること。**
方式設計:
関連利用技術:
- DETR(Detection Transformer): Transformerを用いたエンドツーエンド物体検出、アンカーボックスやNMSを不要とする革新的アーキテクチャ
- OpenCV: リアルタイム映像処理、カメラ入力、画像前処理
- PyTorch: 深層学習フレームワーク、GPU加速処理
- Transformers Library: Hugging Face製、事前学習済みモデルの統一インターフェース
入力と出力: 入力: 動画(ユーザは「0:動画ファイル,1:カメラ,2:サンプル動画」のメニューで選択.0:動画ファイルの場合はtkinterでファイル選択.1の場合はOpenCVでカメラが開く.2の場合はhttps://raw.githubusercontent.com/opencv/opencv/master/samples/data/vtest.aviを使用)、出力: リアルタイム異常度表示、動作認識結果、処理結果のテキストファイル保存
処理手順:
1. DETR による人物検出(複数人物対応、面積と信頼度による主要人物選択)
2. 人物領域のクロップと16フレームシーケンス蓄積
3. VideoMAE による動作認識(Top-5予測結果出力)
4. 予測確率分布と危険動作検出による異常度算出
5. リアルタイム可視化と結果出力
前処理、後処理:
- 前処理: 人物領域の動的クロップ、224×224リサイズ、BGR→RGB変換、PIL Image変換
- 後処理: Top-5動作認識結果の確率分析、不確実性と危険動作の統合による異常度計算
追加処理:
- 複数人物検出時の主要人物選択(信頼度0.7×面積0.3の重み付けスコア)
- 検出失敗時のフルフレームフォールバック機能
- 連続検出失敗カウントによる適応的処理切替
- フレーム補完機能(無効フレーム時の最後有効フレーム再利用)
調整を必要とする設定値:
- PERSON_CONFIDENCE_THRESHOLD(人物検出信頼度閾値、デフォルト0.7)
- MIN_PERSON_SIZE(最小人物サイズピクセル、デフォルト50)
- SEQUENCE_LENGTH(動作認識用フレーム数、デフォルト16)
将来方策: PERSON_CONFIDENCE_THRESHOLD自動調整機能(環境光量や被写体サイズに基づく動的閾値設定、検出成功率のリアルタイム監視による自動最適化)
その他の重要事項:
- GPU使用推奨(CUDA対応)
- リアルタイム処理のため数フレームごとに人物検出実行(skip_frames=3)
- カメラ解像度640x480固定
前準備:
pip install -U torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu126
pip install transformers pillow opencv-python numpy matplotlib
"""
import cv2
import torch
import numpy as np
from transformers import VideoMAEImageProcessor, VideoMAEForVideoClassification
from transformers import DetrImageProcessor, DetrForObjectDetection
from collections import deque
import warnings
from PIL import Image, ImageFont, ImageDraw
import tkinter as tk
from tkinter import filedialog
import urllib.request
import os
import time
from datetime import datetime
warnings.filterwarnings("ignore")
# GPU/CPU自動選択
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f'デバイス: {str(device)}')
# VideoMAEモデル読み込み
model_name = "MCG-NJU/videomae-base-finetuned-kinetics"
print("Loading VideoMAE model...")
try:
image_processor = VideoMAEImageProcessor.from_pretrained(model_name)
model = VideoMAEForVideoClassification.from_pretrained(model_name).to(device)
model.eval()
print("VideoMAE model loaded successfully")
except Exception as e:
print(f"Failed to load VideoMAE model: {e}")
exit(1)
# DETR人物検出モデル読み込み
detr_model_name = "facebook/detr-resnet-50"
print("Loading DETR model...")
try:
detr_processor = DetrImageProcessor.from_pretrained(detr_model_name)
detr_model = DetrForObjectDetection.from_pretrained(detr_model_name).to(device)
detr_model.eval()
print("DETR model loaded successfully")
except Exception as e:
print(f"Failed to load DETR model: {e}")
exit(1)
# パラメータ
SEQUENCE_LENGTH = 16
PERSON_CONFIDENCE_THRESHOLD = 0.7
MIN_PERSON_SIZE = 50
# バッファ管理
person_crop_buffer = deque(maxlen=SEQUENCE_LENGTH)
detection_status_buffer = deque(maxlen=SEQUENCE_LENGTH)
frame_count = 0
results_log = []
def video_frame_processing(frame):
global frame_count, last_person_detections, last_primary_person, consecutive_failures, skip_frames
current_time = time.time()
frame_count += 1
# フレーム処理(既存のロジック)
if frame_count % skip_frames == 0:
try:
last_person_detections = detect_persons_detr(frame)
last_primary_person = select_primary_person(last_person_detections)
if last_primary_person is None:
consecutive_failures += 1
except Exception as e:
print(f"Detection error: {e}")
last_person_detections = []
last_primary_person = None
consecutive_failures += 1
# 複数人物の表示
for i, person in enumerate(last_person_detections):
x1, y1, x2, y2 = person['bbox']
is_primary = (last_primary_person and
person['bbox'] == last_primary_person['bbox'])
color = (0, 255, 0) if is_primary else (255, 100, 0)
thickness = 3 if is_primary else 1
cv2.rectangle(frame, (x1, y1), (x2, y2), color, thickness)
label = f"Person {i+1} ({person['confidence']:.2f})"
if is_primary:
label += " [PRIMARY]"
draw_text(frame, label, (x1, y1-10), 0.5, color, 1)
# 人物検出状況の表示
detection_status = "No Detection"
status_color = (0, 0, 255)
if last_primary_person:
detection_status = f"Detected: {len(last_person_detections)} person(s)"
status_color = (0, 255, 0)
elif consecutive_failures > 5:
detection_status = "Detection Failed - Using full frame"
status_color = (0, 165, 255)
draw_text(frame, detection_status, (10, 25), 0.6, status_color, 2)
current_crop = None
detection_success = False
if last_primary_person:
x1, y1, x2, y2 = last_primary_person['bbox']
h, w = frame.shape[:2]
padding = 20
x1 = max(0, x1 - padding)
y1 = max(0, y1 - padding)
x2 = min(w, x2 + padding)
y2 = min(h, y2 + padding)
cropped = frame[y1:y2, x1:x2]
if cropped.shape[0] >= MIN_PERSON_SIZE and cropped.shape[1] >= MIN_PERSON_SIZE:
current_crop = cropped
detection_success = True
consecutive_failures = 0
else:
if frame_count % skip_frames == 0:
consecutive_failures += 1
if current_crop is None:
if consecutive_failures > 5:
current_crop = frame.copy()
draw_text(frame, "Fallback: Full frame", (10, 50), 0.5, (255, 255, 0), 1)
else:
draw_text(frame, f"Waiting for detection... ({consecutive_failures}/5)",
(10, 50), 0.5, (255, 255, 0), 1)
result = "待機中"
if current_crop is not None:
person_crop_buffer.append(current_crop)
detection_status_buffer.append(detection_success)
if len(person_crop_buffer) == SEQUENCE_LENGTH:
detection_rate = sum(detection_status_buffer) / len(detection_status_buffer)
try:
pil_frames = []
valid_frame_count = 0
last_valid_pil = None
for f in person_crop_buffer:
if f is not None and len(f.shape) == 3 and f.shape[0] > 0 and f.shape[1] > 0:
try:
f_resized = cv2.resize(f, (224, 224))
f_rgb = cv2.cvtColor(f_resized, cv2.COLOR_BGR2RGB)
f_rgb = f_rgb.astype(np.uint8)
pil_img = Image.fromarray(f_rgb)
pil_frames.append(pil_img)
last_valid_pil = pil_img
valid_frame_count += 1
except Exception as e:
print(f"Frame processing error: {e}")
if last_valid_pil is not None:
pil_frames.append(last_valid_pil)
else:
black_frame = np.zeros((224, 224, 3), dtype=np.uint8)
pil_frames.append(Image.fromarray(black_frame))
else:
if last_valid_pil is not None:
pil_frames.append(last_valid_pil)
else:
black_frame = np.zeros((224, 224, 3), dtype=np.uint8)
pil_frames.append(Image.fromarray(black_frame))
while len(pil_frames) < SEQUENCE_LENGTH:
if last_valid_pil is not None:
pil_frames.append(last_valid_pil)
else:
black_frame = np.zeros((224, 224, 3), dtype=np.uint8)
pil_frames.append(Image.fromarray(black_frame))
pil_frames = pil_frames[:SEQUENCE_LENGTH]
if valid_frame_count > 0:
inputs = image_processor(pil_frames, return_tensors="pt")
inputs = {k: v.to(device) for k, v in inputs.items()}
with torch.no_grad():
outputs = model(**inputs)
probs = torch.softmax(outputs.logits, dim=-1)
top5_probs, top5_indices = torch.topk(probs, 5)
top5_predictions = []
for prob, idx in zip(top5_probs[0], top5_indices[0]):
class_name = model.config.id2label[idx.item()]
top5_predictions.append((class_name, prob.item()))
anomaly_score = calculate_anomaly_from_predictions(top5_predictions)
if detection_rate < 0.5:
anomaly_score *= 0.7
draw_text(frame, "Low detection rate!", (10, 75), 0.5, (0, 165, 255), 2)
y_offset = 100
for i, (action, prob) in enumerate(top5_predictions):
text = f"{i+1}. {action}: {prob:.3f}"
draw_text(frame, text, (10, y_offset), 0.5, (255, 255, 255), 1)
y_offset += 25
draw_text(frame, f"Detection Rate: {detection_rate:.1%}",
(frame.shape[1] - 200, 30), 0.5, (255, 255, 255), 1)
result = f"検出率{detection_rate:.1%}, 有効フレーム{valid_frame_count}/{SEQUENCE_LENGTH}, Top1:{top5_predictions[0][0]}({top5_predictions[0][1]:.3f}), 異常度:{anomaly_score:.1f}"
else:
result = "処理不可"
except Exception as e:
print(f"Processing error: {e}")
result = "エラー"
else:
draw_text(frame, f"Collecting frames: {len(person_crop_buffer)}/{SEQUENCE_LENGTH}",
(10, frame.shape[0] - 30), 0.6, (255, 255, 0), 2)
return frame, result, current_time
def draw_text(frame, text, position, font_scale=0.5, color=(255, 255, 255), thickness=1):
"""統一的なテキスト描画処理"""
cv2.putText(frame, text, position,
cv2.FONT_HERSHEY_SIMPLEX, font_scale, color, thickness)
def detect_persons_detr(frame):
"""DETR による人物検出(複数人物対応)"""
rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
pil_image = Image.fromarray(rgb_frame)
inputs = detr_processor(images=pil_image, return_tensors="pt")
inputs = {k: v.to(device) for k, v in inputs.items()}
with torch.no_grad():
outputs = detr_model(**inputs)
target_sizes = torch.tensor([pil_image.size[::-1]], dtype=torch.int64)
results = detr_processor.post_process_object_detection(
outputs, target_sizes=target_sizes, threshold=PERSON_CONFIDENCE_THRESHOLD
)[0]
person_detections = []
for score, label, box in zip(results["scores"], results["labels"], results["boxes"]):
if label.item() == 1:
box_cpu = box.cpu().numpy()
if box_cpu.ndim == 1 and len(box_cpu) >= 4:
x1, y1, x2, y2 = int(box_cpu[0]), int(box_cpu[1]), int(box_cpu[2]), int(box_cpu[3])
else:
continue
width = x2 - x1
height = y2 - y1
if width >= MIN_PERSON_SIZE and height >= MIN_PERSON_SIZE:
person_detections.append({
'bbox': (x1, y1, x2, y2),
'confidence': score.item(),
'area': width * height
})
return person_detections
def select_primary_person(person_detections):
"""複数人物から主要人物を選択(面積と信頼度の組み合わせ)"""
if not person_detections:
return None
max_area = max(p['area'] for p in person_detections)
for person in person_detections:
normalized_area = person['area'] / max_area
person['combined_score'] = person['confidence'] * 0.7 + normalized_area * 0.3
best_person = max(person_detections, key=lambda x: x['combined_score'])
return best_person
def calculate_anomaly_from_predictions(top5_results):
"""Top-5結果から異常度を判定する最善ルール"""
top1_class, top1_prob = top5_results[0]
uncertainty_score = (1 - top1_prob) * 100
dangerous_actions = ["falling", "fighting", "punching", "kicking", "wrestling",
"sword fighting", "fencing", "slapping", "headbanging"]
danger_bonus = 50 if any(action in top1_class.lower() for action in dangerous_actions) else 0
probs = [result[1] for result in top5_results]
prob_variance = np.var(probs)
anomaly_score = uncertainty_score + danger_bonus + prob_variance
return anomaly_score
print("0: 動画ファイル")
print("1: カメラ")
print("2: サンプル動画")
choice = input("選択: ")
if choice == '0':
root = tk.Tk()
root.withdraw()
path = filedialog.askopenfilename()
if not path:
exit()
cap = cv2.VideoCapture(path)
elif choice == '1':
cap = cv2.VideoCapture(0, cv2.CAP_DSHOW)
if not cap.isOpened():
cap = cv2.VideoCapture(0)
cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)
cap.set(cv2.CAP_PROP_FRAME_WIDTH, 640)
cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 480)
else:
SAMPLE_URL = 'https://raw.githubusercontent.com/opencv/opencv/master/samples/data/vtest.avi'
SAMPLE_FILE = 'vtest.avi'
urllib.request.urlretrieve(SAMPLE_URL, SAMPLE_FILE)
cap = cv2.VideoCapture(SAMPLE_FILE)
if not cap.isOpened():
print('動画ファイル・カメラを開けませんでした')
exit()
# グローバル変数初期化
last_person_detections = []
last_primary_person = None
consecutive_failures = 0
skip_frames = 3
# メイン処理
print('\n=== 動画処理開始 ===')
print('操作方法:')
print(' q キー: プログラム終了')
print(f'人物検出信頼度閾値: {PERSON_CONFIDENCE_THRESHOLD}')
print(f'最小人物サイズ: {MIN_PERSON_SIZE}px')
try:
while True:
ret, frame = cap.read()
if not ret:
break
MAIN_FUNC_DESC = "人物動作異常検知システム"
processed_frame, result, current_time = video_frame_processing(frame)
cv2.imshow(MAIN_FUNC_DESC, processed_frame)
if choice == '1': # カメラの場合
print(datetime.fromtimestamp(current_time).strftime("%Y-%m-%d %H:%M:%S.%f")[:-3], result)
else: # 動画ファイルの場合
print(frame_count, result)
results_log.append(result)
if cv2.waitKey(1) & 0xFF == ord('q'):
break
finally:
print('\n=== プログラム終了 ===')
cap.release()
cv2.destroyAllWindows()
if results_log:
with open('result.txt', 'w', encoding='utf-8') as f:
f.write('=== 結果 ===\n')
f.write(f'処理フレーム数: {frame_count}\n')
f.write(f'使用デバイス: {str(device).upper()}\n')
if device.type == 'cuda':
f.write(f'GPU: {torch.cuda.get_device_name(0)}\n')
f.write('\n')
f.write('\n'.join(results_log))
print(f'\n処理結果をresult.txtに保存しました')