VideoMAE による人物動作認識(ソースコードと説明と利用ガイド)

プログラム利用ガイド

1. このプログラムの利用シーン

このツールは、人物の動作をリアルタイムで認識するためのソフトウェアである。動画ファイルやウェブカメラの映像から人物を自動検出し、16フレームの動作シーケンスを分析して動作認識を粉う。行動分析が必要な場面で活用できる。

2. 主な機能

3. 基本的な使い方

4. 便利な機能

Python開発環境,ライブラリ類

ここでは、最低限の事前準備について説明する。機械学習や深層学習を行う場合は、NVIDIA CUDA、Visual Studio、Cursorなどを追加でインストールすると便利である。これらについては別ページ https://www.kkaneko.jp/cc/dev/aiassist.htmlで詳しく解説しているので、必要に応じて参照してください。

Python 3.12 のインストール

インストール済みの場合は実行不要。

管理者権限でコマンドプロンプトを起動(手順:Windowsキーまたはスタートメニュー > cmd と入力 > 右クリック > 「管理者として実行」)し、以下を実行する。管理者権限は、wingetの--scope machineオプションでシステム全体にソフトウェアをインストールするために必要である。

REM Python をシステム領域にインストール
winget install --scope machine --id Python.Python.3.12 -e --silent
REM Python のパス設定
set "PYTHON_PATH=C:\Program Files\Python312"
set "PYTHON_SCRIPTS_PATH=C:\Program Files\Python312\Scripts"
echo "%PATH%" | find /i "%PYTHON_PATH%" >nul
if errorlevel 1 setx PATH "%PATH%;%PYTHON_PATH%" /M >nul
echo "%PATH%" | find /i "%PYTHON_SCRIPTS_PATH%" >nul
if errorlevel 1 setx PATH "%PATH%;%PYTHON_SCRIPTS_PATH%" /M >nul

関連する外部ページ

Python の公式ページ: https://www.python.org/

AI エディタ Windsurf のインストール

Pythonプログラムの編集・実行には、AI エディタの利用を推奨する。ここでは,Windsurfのインストールを説明する。

管理者権限でコマンドプロンプトを起動(手順:Windowsキーまたはスタートメニュー > cmd と入力 > 右クリック > 「管理者として実行」)し、以下を実行して、Windsurfをシステム全体にインストールする。管理者権限は、wingetの--scope machineオプションでシステム全体にソフトウェアをインストールするために必要となる。

winget install --scope machine Codeium.Windsurf -e --silent

関連する外部ページ

Windsurf の公式ページ: https://windsurf.com/

必要なライブラリのインストール

コマンドプロンプトを管理者として実行(手順:Windowsキーまたはスタートメニュー > cmd と入力 > 右クリック > 「管理者として実行」)し、以下を実行する


pip install -U torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu126
pip install transformers pillow opencv-python numpy matplotlib

人物動作認識プログラム(VideoMAE + DETR統合版)

概要

このプログラムは、VideoMAEとDETRを組み合わせた人物動作認識システムである。動画またはカメラ映像から人物を検出し、16フレームシーケンスの動作認識を行う。そして,危険動作や予測の不確実性に基づいて異常度を算出する。

主要技術

VideoMAE(Video Masked Autoencoders)

2022年に提案された動画理解のための自己教師あり学習手法[1]。90%~95%の極めて高いマスキング比率による動画チューブマスキング戦略を用い、時間的冗長性を活用して動画表現学習を実現する。

DETR(Detection Transformer)

2020年に提案されたTransformerベースのエンドツーエンド物体検出手法[2]。従来の物体検出で必要だったアンカーボックス生成やNon-Maximum Suppressionを不要とし、集合予測問題として物体検出を定式化する。

実装の特色

参考文献

[1] Tong, Z., Song, Y., Wang, J., & Wang, L. (2022). VideoMAE: Masked Autoencoders are Data-Efficient Learners for Self-Supervised Video Pre-Training. Advances in Neural Information Processing Systems, 35. https://arxiv.org/abs/2203.12602

[2] Carion, N., Massa, F., Synnaeve, G., Usunier, N., Kirillov, A., & Zagoruyko, S. (2020). End-to-End Object Detection with Transformers. In European Conference on Computer Vision (ECCV 2020), 213-229. https://arxiv.org/abs/2005.12872

[3] Wang, L., Huang, B., Zhao, Z., Tong, Z., He, Y., Wang, Y., Wang, Y., & Qiao, Y. (2023). VideoMAE V2: Scaling Video Masked Autoencoders with Dual Masking. In Proceedings of the IEEE/CVF Conference on Computer Vision and Pattern Recognition (CVPR 2023). https://arxiv.org/abs/2303.16727

ソースコード


"""
人物動作認識プログラム(VideoMAE + DETR統合版)

特徴技術名: VideoMAE(Video Masked Autoencoders)
出典: Tong, Z., Song, Y., Wang, J., & Wang, L. (2022). VideoMAE: Masked Autoencoders are Data-Efficient Learners for Self-Supervised Video Pre-Training. arXiv preprint arXiv:2203.12602.
特徴機能: 極めて高いマスキング比率(90%~95%)による動画チューブマスキング戦略を用いた自己教師あり学習による効率的な動画理解。時間的冗長性を活用し、従来の画像ベース手法より高いマスキング比率を実現。

学習済みモデル:
- VideoMAE: MCG-NJU/videomae-base-finetuned-kinetics(Kinetics-400データセットで事前学習済み、16フレームシーケンス対応)
- DETR: facebook/detr-resnet-50(COCO 2017データセットで学習済み、ResNet-50バックボーン、人物検出用)
- URL: https://huggingface.co/MCG-NJU/videomae-base-finetuned-kinetics、https://huggingface.co/facebook/detr-resnet-50

特徴技術および学習済みモデルの利用制限: **学術研究目的での利用を推奨。商用利用の場合は各モデルのライセンス(Apache 2.0等)を確認のこと。必ず利用者自身で利用制限を確認すること。**

方式設計:
  関連利用技術:
  - DETR(Detection Transformer): Transformerを用いたエンドツーエンド物体検出、アンカーボックスやNMSを不要とする革新的アーキテクチャ
  - OpenCV: リアルタイム映像処理、カメラ入力、画像前処理
  - PyTorch: 深層学習フレームワーク、GPU加速処理
  - Transformers Library: Hugging Face製、事前学習済みモデルの統一インターフェース

  入力と出力: 入力: 動画(ユーザは「0:動画ファイル,1:カメラ,2:サンプル動画」のメニューで選択.0:動画ファイルの場合はtkinterでファイル選択.1の場合はOpenCVでカメラが開く.2の場合はhttps://raw.githubusercontent.com/opencv/opencv/master/samples/data/vtest.aviを使用)、出力: リアルタイム異常度表示、動作認識結果、処理結果のテキストファイル保存

  処理手順:
  1. DETR による人物検出(複数人物対応、面積と信頼度による主要人物選択)
  2. 人物領域のクロップと16フレームシーケンス蓄積
  3. VideoMAE による動作認識(Top-5予測結果出力)
  4. 予測確率分布と危険動作検出による異常度算出
  5. リアルタイム可視化と結果出力

  前処理、後処理:
  - 前処理: 人物領域の動的クロップ、224×224リサイズ、BGR→RGB変換、PIL Image変換
  - 後処理: Top-5動作認識結果の確率分析、不確実性と危険動作の統合による異常度計算

  追加処理:
  - 複数人物検出時の主要人物選択(信頼度0.7×面積0.3の重み付けスコア)
  - 検出失敗時のフルフレームフォールバック機能
  - 連続検出失敗カウントによる適応的処理切替
  - フレーム補完機能(無効フレーム時の最後有効フレーム再利用)

  調整を必要とする設定値:
  - PERSON_CONFIDENCE_THRESHOLD(人物検出信頼度閾値、デフォルト0.7)
  - MIN_PERSON_SIZE(最小人物サイズピクセル、デフォルト50)
  - SEQUENCE_LENGTH(動作認識用フレーム数、デフォルト16)

将来方策: PERSON_CONFIDENCE_THRESHOLD自動調整機能(環境光量や被写体サイズに基づく動的閾値設定、検出成功率のリアルタイム監視による自動最適化)

その他の重要事項:
- GPU使用推奨(CUDA対応)
- リアルタイム処理のため数フレームごとに人物検出実行(skip_frames=3)
- カメラ解像度640x480固定

前準備:
pip install -U torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu126
pip install transformers pillow opencv-python numpy matplotlib
"""

import cv2
import torch
import numpy as np
from transformers import VideoMAEImageProcessor, VideoMAEForVideoClassification
from transformers import DetrImageProcessor, DetrForObjectDetection
from collections import deque
import warnings
from PIL import Image, ImageFont, ImageDraw
import tkinter as tk
from tkinter import filedialog
import urllib.request
import os
import time
from datetime import datetime
warnings.filterwarnings("ignore")

# GPU/CPU自動選択
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f'デバイス: {str(device)}')

# VideoMAEモデル読み込み
model_name = "MCG-NJU/videomae-base-finetuned-kinetics"
print("Loading VideoMAE model...")
try:
    image_processor = VideoMAEImageProcessor.from_pretrained(model_name)
    model = VideoMAEForVideoClassification.from_pretrained(model_name).to(device)
    model.eval()
    print("VideoMAE model loaded successfully")
except Exception as e:
    print(f"Failed to load VideoMAE model: {e}")
    exit(1)

# DETR人物検出モデル読み込み
detr_model_name = "facebook/detr-resnet-50"
print("Loading DETR model...")
try:
    detr_processor = DetrImageProcessor.from_pretrained(detr_model_name)
    detr_model = DetrForObjectDetection.from_pretrained(detr_model_name).to(device)
    detr_model.eval()
    print("DETR model loaded successfully")
except Exception as e:
    print(f"Failed to load DETR model: {e}")
    exit(1)

# パラメータ
SEQUENCE_LENGTH = 16
PERSON_CONFIDENCE_THRESHOLD = 0.7
MIN_PERSON_SIZE = 50

# バッファ管理
person_crop_buffer = deque(maxlen=SEQUENCE_LENGTH)
detection_status_buffer = deque(maxlen=SEQUENCE_LENGTH)

frame_count = 0
results_log = []

def video_frame_processing(frame):
    global frame_count, last_person_detections, last_primary_person, consecutive_failures, skip_frames
    current_time = time.time()
    frame_count += 1

    # フレーム処理(既存のロジック)
    if frame_count % skip_frames == 0:
        try:
            last_person_detections = detect_persons_detr(frame)
            last_primary_person = select_primary_person(last_person_detections)

            if last_primary_person is None:
                consecutive_failures += 1

        except Exception as e:
            print(f"Detection error: {e}")
            last_person_detections = []
            last_primary_person = None
            consecutive_failures += 1

    # 複数人物の表示
    for i, person in enumerate(last_person_detections):
        x1, y1, x2, y2 = person['bbox']
        is_primary = (last_primary_person and
                     person['bbox'] == last_primary_person['bbox'])

        color = (0, 255, 0) if is_primary else (255, 100, 0)
        thickness = 3 if is_primary else 1

        cv2.rectangle(frame, (x1, y1), (x2, y2), color, thickness)
        label = f"Person {i+1} ({person['confidence']:.2f})"
        if is_primary:
            label += " [PRIMARY]"
        draw_text(frame, label, (x1, y1-10), 0.5, color, 1)

    # 人物検出状況の表示
    detection_status = "No Detection"
    status_color = (0, 0, 255)

    if last_primary_person:
        detection_status = f"Detected: {len(last_person_detections)} person(s)"
        status_color = (0, 255, 0)
    elif consecutive_failures > 5:
        detection_status = "Detection Failed - Using full frame"
        status_color = (0, 165, 255)

    draw_text(frame, detection_status, (10, 25), 0.6, status_color, 2)

    current_crop = None
    detection_success = False

    if last_primary_person:
        x1, y1, x2, y2 = last_primary_person['bbox']
        h, w = frame.shape[:2]
        padding = 20

        x1 = max(0, x1 - padding)
        y1 = max(0, y1 - padding)
        x2 = min(w, x2 + padding)
        y2 = min(h, y2 + padding)

        cropped = frame[y1:y2, x1:x2]

        if cropped.shape[0] >= MIN_PERSON_SIZE and cropped.shape[1] >= MIN_PERSON_SIZE:
            current_crop = cropped
            detection_success = True
            consecutive_failures = 0
        else:
            if frame_count % skip_frames == 0:
                consecutive_failures += 1

    if current_crop is None:
        if consecutive_failures > 5:
            current_crop = frame.copy()
            draw_text(frame, "Fallback: Full frame", (10, 50), 0.5, (255, 255, 0), 1)
        else:
            draw_text(frame, f"Waiting for detection... ({consecutive_failures}/5)",
                     (10, 50), 0.5, (255, 255, 0), 1)

    result = "待機中"

    if current_crop is not None:
        person_crop_buffer.append(current_crop)
        detection_status_buffer.append(detection_success)

    if len(person_crop_buffer) == SEQUENCE_LENGTH:
        detection_rate = sum(detection_status_buffer) / len(detection_status_buffer)

        try:
            pil_frames = []
            valid_frame_count = 0
            last_valid_pil = None

            for f in person_crop_buffer:
                if f is not None and len(f.shape) == 3 and f.shape[0] > 0 and f.shape[1] > 0:
                    try:
                        f_resized = cv2.resize(f, (224, 224))
                        f_rgb = cv2.cvtColor(f_resized, cv2.COLOR_BGR2RGB)
                        f_rgb = f_rgb.astype(np.uint8)
                        pil_img = Image.fromarray(f_rgb)
                        pil_frames.append(pil_img)
                        last_valid_pil = pil_img
                        valid_frame_count += 1
                    except Exception as e:
                        print(f"Frame processing error: {e}")
                        if last_valid_pil is not None:
                            pil_frames.append(last_valid_pil)
                        else:
                            black_frame = np.zeros((224, 224, 3), dtype=np.uint8)
                            pil_frames.append(Image.fromarray(black_frame))
                else:
                    if last_valid_pil is not None:
                        pil_frames.append(last_valid_pil)
                    else:
                        black_frame = np.zeros((224, 224, 3), dtype=np.uint8)
                        pil_frames.append(Image.fromarray(black_frame))

            while len(pil_frames) < SEQUENCE_LENGTH:
                if last_valid_pil is not None:
                    pil_frames.append(last_valid_pil)
                else:
                    black_frame = np.zeros((224, 224, 3), dtype=np.uint8)
                    pil_frames.append(Image.fromarray(black_frame))

            pil_frames = pil_frames[:SEQUENCE_LENGTH]

            if valid_frame_count > 0:
                inputs = image_processor(pil_frames, return_tensors="pt")
                inputs = {k: v.to(device) for k, v in inputs.items()}

                with torch.no_grad():
                    outputs = model(**inputs)

                probs = torch.softmax(outputs.logits, dim=-1)
                top5_probs, top5_indices = torch.topk(probs, 5)

                top5_predictions = []
                for prob, idx in zip(top5_probs[0], top5_indices[0]):
                    class_name = model.config.id2label[idx.item()]
                    top5_predictions.append((class_name, prob.item()))

                anomaly_score = calculate_anomaly_from_predictions(top5_predictions)

                if detection_rate < 0.5:
                    anomaly_score *= 0.7
                    draw_text(frame, "Low detection rate!", (10, 75), 0.5, (0, 165, 255), 2)

                y_offset = 100
                for i, (action, prob) in enumerate(top5_predictions):
                    text = f"{i+1}. {action}: {prob:.3f}"
                    draw_text(frame, text, (10, y_offset), 0.5, (255, 255, 255), 1)
                    y_offset += 25

                draw_text(frame, f"Detection Rate: {detection_rate:.1%}",
                         (frame.shape[1] - 200, 30), 0.5, (255, 255, 255), 1)

                result = f"検出率{detection_rate:.1%}, 有効フレーム{valid_frame_count}/{SEQUENCE_LENGTH}, Top1:{top5_predictions[0][0]}({top5_predictions[0][1]:.3f}), 異常度:{anomaly_score:.1f}"
            else:
                result = "処理不可"

        except Exception as e:
            print(f"Processing error: {e}")
            result = "エラー"
    else:
        draw_text(frame, f"Collecting frames: {len(person_crop_buffer)}/{SEQUENCE_LENGTH}",
                 (10, frame.shape[0] - 30), 0.6, (255, 255, 0), 2)

    return frame, result, current_time

def draw_text(frame, text, position, font_scale=0.5, color=(255, 255, 255), thickness=1):
    """統一的なテキスト描画処理"""
    cv2.putText(frame, text, position,
                cv2.FONT_HERSHEY_SIMPLEX, font_scale, color, thickness)

def detect_persons_detr(frame):
    """DETR による人物検出(複数人物対応)"""
    rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    pil_image = Image.fromarray(rgb_frame)

    inputs = detr_processor(images=pil_image, return_tensors="pt")
    inputs = {k: v.to(device) for k, v in inputs.items()}

    with torch.no_grad():
        outputs = detr_model(**inputs)

    target_sizes = torch.tensor([pil_image.size[::-1]], dtype=torch.int64)
    results = detr_processor.post_process_object_detection(
        outputs, target_sizes=target_sizes, threshold=PERSON_CONFIDENCE_THRESHOLD
    )[0]

    person_detections = []
    for score, label, box in zip(results["scores"], results["labels"], results["boxes"]):
        if label.item() == 1:
            box_cpu = box.cpu().numpy()
            if box_cpu.ndim == 1 and len(box_cpu) >= 4:
                x1, y1, x2, y2 = int(box_cpu[0]), int(box_cpu[1]), int(box_cpu[2]), int(box_cpu[3])
            else:
                continue

            width = x2 - x1
            height = y2 - y1

            if width >= MIN_PERSON_SIZE and height >= MIN_PERSON_SIZE:
                person_detections.append({
                    'bbox': (x1, y1, x2, y2),
                    'confidence': score.item(),
                    'area': width * height
                })

    return person_detections

def select_primary_person(person_detections):
    """複数人物から主要人物を選択(面積と信頼度の組み合わせ)"""
    if not person_detections:
        return None

    max_area = max(p['area'] for p in person_detections)

    for person in person_detections:
        normalized_area = person['area'] / max_area
        person['combined_score'] = person['confidence'] * 0.7 + normalized_area * 0.3

    best_person = max(person_detections, key=lambda x: x['combined_score'])
    return best_person

def calculate_anomaly_from_predictions(top5_results):
    """Top-5結果から異常度を判定する最善ルール"""
    top1_class, top1_prob = top5_results[0]

    uncertainty_score = (1 - top1_prob) * 100

    dangerous_actions = ["falling", "fighting", "punching", "kicking", "wrestling",
                        "sword fighting", "fencing", "slapping", "headbanging"]
    danger_bonus = 50 if any(action in top1_class.lower() for action in dangerous_actions) else 0

    probs = [result[1] for result in top5_results]
    prob_variance = np.var(probs)

    anomaly_score = uncertainty_score + danger_bonus + prob_variance

    return anomaly_score

print("0: 動画ファイル")
print("1: カメラ")
print("2: サンプル動画")

choice = input("選択: ")

if choice == '0':
    root = tk.Tk()
    root.withdraw()
    path = filedialog.askopenfilename()
    if not path:
        exit()
    cap = cv2.VideoCapture(path)
elif choice == '1':
    cap = cv2.VideoCapture(0, cv2.CAP_DSHOW)
    if not cap.isOpened():
        cap = cv2.VideoCapture(0)
    cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)
    cap.set(cv2.CAP_PROP_FRAME_WIDTH, 640)
    cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 480)
else:
    SAMPLE_URL = 'https://raw.githubusercontent.com/opencv/opencv/master/samples/data/vtest.avi'
    SAMPLE_FILE = 'vtest.avi'
    urllib.request.urlretrieve(SAMPLE_URL, SAMPLE_FILE)
    cap = cv2.VideoCapture(SAMPLE_FILE)

if not cap.isOpened():
    print('動画ファイル・カメラを開けませんでした')
    exit()

# グローバル変数初期化
last_person_detections = []
last_primary_person = None
consecutive_failures = 0
skip_frames = 3

# メイン処理
print('\n=== 動画処理開始 ===')
print('操作方法:')
print('  q キー: プログラム終了')
print(f'人物検出信頼度閾値: {PERSON_CONFIDENCE_THRESHOLD}')
print(f'最小人物サイズ: {MIN_PERSON_SIZE}px')

try:
    while True:
        ret, frame = cap.read()
        if not ret:
            break

        MAIN_FUNC_DESC = "人物動作異常検知システム"
        processed_frame, result, current_time = video_frame_processing(frame)
        cv2.imshow(MAIN_FUNC_DESC, processed_frame)
        if choice == '1':  # カメラの場合
            print(datetime.fromtimestamp(current_time).strftime("%Y-%m-%d %H:%M:%S.%f")[:-3], result)
        else:  # 動画ファイルの場合
            print(frame_count, result)
        results_log.append(result)
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
finally:
    print('\n=== プログラム終了 ===')
    cap.release()
    cv2.destroyAllWindows()
    if results_log:
        with open('result.txt', 'w', encoding='utf-8') as f:
            f.write('=== 結果 ===\n')
            f.write(f'処理フレーム数: {frame_count}\n')
            f.write(f'使用デバイス: {str(device).upper()}\n')
            if device.type == 'cuda':
                f.write(f'GPU: {torch.cuda.get_device_name(0)}\n')
            f.write('\n')
            f.write('\n'.join(results_log))
        print(f'\n処理結果をresult.txtに保存しました')