PaDiM による画像からの異常検知(ソースコードと実行結果)


Python開発環境,ライブラリ類
ここでは、最低限の事前準備について説明する。機械学習や深層学習を行う場合は、NVIDIA CUDA、Visual Studio、Cursorなどを追加でインストールすると便利である。これらについては別ページ https://www.kkaneko.jp/cc/dev/aiassist.htmlで詳しく解説しているので、必要に応じて参照してください。
Python 3.12 のインストール
インストール済みの場合は実行不要。
管理者権限でコマンドプロンプトを起動(手順:Windowsキーまたはスタートメニュー > cmd と入力 > 右クリック > 「管理者として実行」)し、以下を実行する。管理者権限は、wingetの--scope machineオプションでシステム全体にソフトウェアをインストールするために必要である。
REM Python をシステム領域にインストール
winget install --scope machine --id Python.Python.3.12 -e --silent
REM Python のパス設定
set "PYTHON_PATH=C:\Program Files\Python312"
set "PYTHON_SCRIPTS_PATH=C:\Program Files\Python312\Scripts"
echo "%PATH%" | find /i "%PYTHON_PATH%" >nul
if errorlevel 1 setx PATH "%PATH%;%PYTHON_PATH%" /M >nul
echo "%PATH%" | find /i "%PYTHON_SCRIPTS_PATH%" >nul
if errorlevel 1 setx PATH "%PATH%;%PYTHON_SCRIPTS_PATH%" /M >nul
【関連する外部ページ】
Python の公式ページ: https://www.python.org/
AI エディタ Windsurf のインストール
Pythonプログラムの編集・実行には、AI エディタの利用を推奨する。ここでは,Windsurfのインストールを説明する。
管理者権限でコマンドプロンプトを起動(手順:Windowsキーまたはスタートメニュー > cmd と入力 > 右クリック > 「管理者として実行」)し、以下を実行して、Windsurfをシステム全体にインストールする。管理者権限は、wingetの--scope machineオプションでシステム全体にソフトウェアをインストールするために必要となる。
winget install --scope machine Codeium.Windsurf -e --silent
【関連する外部ページ】
Windsurf の公式ページ: https://windsurf.com/
必要なライブラリのインストール
コマンドプロンプトを管理者として実行(手順:Windowsキーまたはスタートメニュー > cmd と入力 > 右クリック > 「管理者として実行」)し、以下を実行する
前準備: pip install -U torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu126
pip install opencv-python scikit-learn pillow tqdm numpy
PaDiM による画像からの異常検知プログラム
概要
このプログラムは動画の視覚的パターンから正常・異常を判別する。深層ニューラルネットワークにより画像特徴を抽出し、統計的分布モデリングによって正常状態を学習する。その後、新しい入力フレームが,学習した正常分布からどの程度逸脱しているかを測定することで異常検知を実現する。
主要技術
- PaDiM(Patch Distribution Modeling)[1]
CNN特徴マップのパッチレベル分布を多変量ガウス分布でモデル化する手法である。各パッチ位置における正常特徴の確率分布を学習し、マハラノビス距離により異常度を算出する。
- ResNet(Residual Network)[2]
残差接続により深層ニューラルネットワークの学習を可能にした畳み込みニューラルネットワークアーキテクチャである。本プログラムではResNet18を特徴抽出器として使用している。
- Ledoit-Wolf共分散推定[3]
高次元データに対して数値的に安定した共分散行列を推定する正則化手法である。サンプル共分散行列を単位行列に向けて収縮させることで、特異性を回避する。
参考文献
[1] Defard, T., Setkov, A., Loesch, A., & Audigier, R. (2021). PaDiM: a patch distribution modeling framework for anomaly detection and localization. In International Conference on Computer Vision Theory and Applications (pp. 475-489).
[2] He, K., Zhang, X., Ren, S., & Sun, J. (2016). Deep residual learning for image recognition. In Proceedings of the IEEE Conference on Computer Vision and Pattern Recognition (pp. 770-778).
[3] Ledoit, O., & Wolf, M. (2004). A well-conditioned estimator for large-dimensional covariance matrices. Journal of Multivariate Analysis, 88(2), 365-411.
ソースコード
import torch
import torch.nn as nn
import torchvision.models as models
from torchvision.models import ResNet18_Weights, ResNet50_Weights, EfficientNet_B0_Weights
import torchvision.transforms as transforms
import numpy as np
import cv2
import os
import time
from datetime import datetime
import tkinter as tk
from tkinter import filedialog
import urllib.request
import shutil
from tqdm import tqdm
from sklearn.covariance import LedoitWolf
from PIL import Image, ImageFont, ImageDraw
# GPU/CPU自動選択
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f'デバイス: {str(device)}')
# 共通定数
FILETYPES_IMAGES = [
('画像ファイル', '*.jpg *.jpeg *.png'),
('JPEGファイル', '*.jpg *.jpeg'),
('PNGファイル', '*.png'),
('すべてのファイル', '*.*')
]
DEFAULT_FONT_PATH = 'C:/Windows/Fonts/meiryo.ttc'
DEFAULT_FONT_SIZE = 30
# Tkinterルート(隠しウィンドウ)キャッシュ
_TK_ROOT = None
# ------------------------------------------------------------
# ログ出力
# ------------------------------------------------------------
def log_message(message, log_file='system.log'):
"""タイムスタンプ付きログメッセージの記録と表示"""
timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]
log_entry = f'[{timestamp}] {message}'
print(log_entry)
with open(log_file, 'a', encoding='utf-8') as f:
f.write(log_entry + '\n')
# ------------------------------------------------------------
# 共有ユーティリティ
# ------------------------------------------------------------
def get_hidden_tk_root():
"""非表示のTkルートウィンドウを取得(単一インスタンス)"""
global _TK_ROOT
if _TK_ROOT is None:
_TK_ROOT = tk.Tk()
_TK_ROOT.withdraw()
return _TK_ROOT
def open_camera_device(index=0):
"""カメラデバイスをオープンし、必要に応じてフォールバックを行う"""
cap = cv2.VideoCapture(index, cv2.CAP_DSHOW)
if not cap.isOpened():
cap = cv2.VideoCapture(index)
cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)
return cap
def load_japanese_font(path=DEFAULT_FONT_PATH, size=DEFAULT_FONT_SIZE):
"""日本語フォントの読み込み。成功時は(font, True)、失敗時は(None, False)を返す"""
try:
font = ImageFont.truetype(path, size)
return font, True
except Exception:
log_message('日本語フォント読み込みに失敗。英語表示を使用')
return None, False
def safe_urlretrieve(url, dest, desc=''):
"""urlretrieveの安全ラッパ。成功時True、失敗時Falseを返す"""
try:
urllib.request.urlretrieve(url, dest)
log_message(f'ダウンロード成功: {desc or dest}')
return True
except Exception as e:
log_message(f'ダウンロード失敗: {desc or url}, エラー: {e}')
return False
# ------------------------------------------------------------
# 特徴抽出器
# ------------------------------------------------------------
class ResNetFeatureExtractorBase(nn.Module):
def __init__(self, backbone, layers=['layer1', 'layer2', 'layer3']):
super().__init__()
self.layer1 = nn.Sequential(backbone.conv1, backbone.bn1, backbone.relu, backbone.maxpool, backbone.layer1)
self.layer2 = backbone.layer2
self.layer3 = backbone.layer3
self.layers = layers
def forward(self, x):
out = {}
x = self.layer1(x)
if 'layer1' in self.layers:
out['layer1'] = x
x = self.layer2(x)
if 'layer2' in self.layers:
out['layer2'] = x
x = self.layer3(x)
if 'layer3' in self.layers:
out['layer3'] = x
return out
class ResNet18_FeatureExtractor(ResNetFeatureExtractorBase):
def __init__(self, layers=['layer1', 'layer2', 'layer3']):
log_message('ResNet18特徴抽出器を初期化')
backbone = models.resnet18(weights=ResNet18_Weights.DEFAULT)
super().__init__(backbone, layers)
log_message(f'ResNet18初期化完了 - 使用層: {layers}')
class ResNet50_FeatureExtractor(ResNetFeatureExtractorBase):
def __init__(self, layers=['layer1', 'layer2', 'layer3']):
log_message('ResNet50特徴抽出器を初期化')
backbone = models.resnet50(weights=ResNet50_Weights.DEFAULT)
super().__init__(backbone, layers)
log_message(f'ResNet50初期化完了 - 使用層: {layers}')
class EfficientNetB0_FeatureExtractor(nn.Module):
def __init__(self, layers=['features.3', 'features.4', 'features.6']):
super().__init__()
log_message('EfficientNet-B0特徴抽出器を初期化')
backbone = models.efficientnet_b0(weights=EfficientNet_B0_Weights.DEFAULT)
self.features = backbone.features
# 指定層名の妥当性検証とフィルタリング
total_layers = len(self.features)
valid_names = {f'features.{i}' for i in range(total_layers)}
provided = list(layers) if layers is not None else []
filtered = [name for name in provided if name in valid_names]
invalid = [name for name in provided if name not in valid_names]
if invalid:
log_message(f'警告: 無効な層名を除外: {invalid}')
if not filtered:
raise ValueError(f'指定されたEfficientNet-B0の層名が有効ではありません。利用可能な層: {sorted(valid_names)}')
self.layers = filtered
log_message(f'EfficientNet-B0初期化完了 - 使用層: {self.layers}')
def forward(self, x):
out = {}
for idx, module in enumerate(self.features):
x = module(x)
layer_name = f'features.{idx}'
if layer_name in self.layers:
out[layer_name] = x
return out
# ------------------------------------------------------------
# 前処理(BGR→RGB→正規化)
# ------------------------------------------------------------
_BASE_TRANSFORM = transforms.Compose([
transforms.Resize((256, 256)),
transforms.ToTensor(),
transforms.Normalize(mean=[0.485, 0.456, 0.406],
std=[0.229, 0.224, 0.225])
])
def preprocess_bgr_to_tensor(img_bgr):
"""OpenCVのBGR画像をRGBに変換しImageNet基準で正規化してTensor化"""
img_rgb = cv2.cvtColor(img_bgr, cv2.COLOR_BGR2RGB)
pil = Image.fromarray(img_rgb)
x = _BASE_TRANSFORM(pil)
return x
# ------------------------------------------------------------
# モデル選択UI
# ------------------------------------------------------------
def select_model():
"""モデル選択メニューを表示し、選択されたモデルを返す"""
print('\n=== CNN特徴抽出モデル選択 ===')
print('\n利用可能なモデル一覧:')
print('\n1. ResNet18')
print('2. ResNet50')
print('3. EfficientNet-B0')
while True:
choice = input('\nモデルを選択してください (1/2/3): ')
if choice == '1':
log_message('ResNet18モデルが選択された')
print('\nResNet18を使用します')
return ResNet18_FeatureExtractor()
elif choice == '2':
log_message('ResNet50モデルが選択された')
print('\nResNet50を使用します')
return ResNet50_FeatureExtractor()
elif choice == '3':
log_message('EfficientNet-B0モデルが選択された')
print('\nEfficientNet-B0を使用します')
return EfficientNetB0_FeatureExtractor(layers=['features.3', 'features.4', 'features.6'])
else:
print('無効な選択です。1、2、または3を入力してください。')
# ------------------------------------------------------------
# 学習データ準備(撮影・コピー含む)
# ------------------------------------------------------------
def copy_files_from_pc(train_dir, current_files):
"""PC内のファイルを複数選択してコピーする"""
log_message('PC内ファイル選択モードを開始')
print('\nファイル選択ダイアログを開きます(複数選択可能)...')
root = get_hidden_tk_root()
file_paths = filedialog.askopenfilenames(
title='正常画像ファイルを選択してください(複数選択可能)',
filetypes=FILETYPES_IMAGES
)
if not file_paths:
log_message('ファイル選択がキャンセルされた')
print('ファイル選択がキャンセルされました')
return False
log_message(f'{len(file_paths)}個のファイルが選択された')
print(f'{len(file_paths)}個のファイルが選択されました')
for filename in current_files:
filepath = os.path.join(train_dir, filename)
if os.path.exists(filepath):
os.remove(filepath)
log_message(f'既存画像削除: {filename}')
print('既存画像を削除しました')
copied_count = 0
for file_path in file_paths:
try:
filename = os.path.basename(file_path)
dest_path = os.path.join(train_dir, filename)
if os.path.exists(dest_path):
name, ext = os.path.splitext(filename)
counter = 1
while os.path.exists(dest_path):
new_filename = f'{name}_{counter}{ext}'
dest_path = os.path.join(train_dir, new_filename)
counter += 1
filename = os.path.basename(dest_path)
shutil.copy2(file_path, dest_path)
copied_count += 1
log_message(f'ファイルコピー: {filename}')
print(f'コピー完了: {filename}')
except Exception as e:
log_message(f'ファイルコピー失敗: {file_path}, エラー: {e}')
print(f'ファイルコピーに失敗: {os.path.basename(file_path)}, エラー: {e}')
log_message(f'ファイルコピー完了: {copied_count}個')
print(f'\n{copied_count}個のファイルをコピーしました')
return copied_count > 0
def setup_training_data():
"""学習用ディレクトリのセットアップと正常画像準備"""
log_message('学習用データのセットアップを開始')
train_dir = './train_normal'
if not os.path.exists(train_dir):
os.makedirs(train_dir)
log_message(f'学習用ディレクトリ作成: {train_dir}')
print(f'学習用ディレクトリ {train_dir} を作成しました')
existing_images = [f for f in os.listdir(train_dir) if f.lower().endswith(('.jpg', '.png', '.jpeg'))]
log_message(f'既存画像数: {len(existing_images)}')
if len(existing_images) == 0:
log_message('正常画像が存在しないためサンプル画像を取得')
print('正常画像がありません。サンプル画像をダウンロード中...')
sample_urls = [
'https://github.com/opencv/opencv/raw/master/samples/data/fruits.jpg',
'https://github.com/opencv/opencv/raw/master/samples/data/messi5.jpg',
'https://github.com/opencv/opencv/raw/master/samples/data/aero3.jpg'
]
for i, url in enumerate(sample_urls):
filename = f'normal_sample_{i+1}.jpg'
filepath = os.path.join(train_dir, filename)
if safe_urlretrieve(url, filepath, desc=filename):
print(f'ダウンロード完了: {filename}')
else:
print(f'画像ダウンロードに失敗: {url}')
log_message('サンプル画像ダウンロード完了')
print('サンプル正常画像のダウンロードが完了しました')
while True:
current_files = [f for f in os.listdir(train_dir) if f.lower().endswith(('.jpg', '.png', '.jpeg'))]
print('\n--- 利用可能な正常画像 ---')
for filename in current_files:
print(filename)
print('\n選択してください:')
print('1. これらの画像を使用')
print('2. これらの画像を削除してカメラで撮影')
print('3. これらの画像に追加してカメラで撮影')
print('4. これらの画像を削除してPC内のファイルをコピーして使用')
choice = input('\n選択 (1/2/3/4): ')
log_message(f'画像使用選択: {choice}')
if choice == '2':
for filename in current_files:
filepath = os.path.join(train_dir, filename)
if os.path.exists(filepath):
os.remove(filepath)
log_message(f'画像削除: {filename}')
print('既存画像を削除しました')
capture_normal_images(train_dir)
break
elif choice == '3':
print('既存画像に追加してカメラ撮影します')
capture_normal_images(train_dir)
break
elif choice == '4':
if copy_files_from_pc(train_dir, current_files):
break
elif choice == '1':
log_message('既存画像を使用')
print('既存画像を使用します')
break
else:
print('無効な選択です。再度選択してください。')
log_message('学習用データのセットアップ完了')
return train_dir
def capture_normal_images(train_dir):
"""カメラで正常画像を撮影する"""
log_message('正常画像撮影モード開始')
print('\n=== 正常画像撮影モード ===')
print('\n撮影枚数の目安: 5枚以上(推奨10-30枚)')
print('操作: スペース=撮影, q=終了')
print('撮影開始します...')
# カメラ初期化
cap = open_camera_device(0)
if not cap.isOpened():
log_message('エラー: カメラ初期化に失敗')
print('エラー: カメラが開けません')
return 0
log_message('カメラ初期化完了')
capture_count = 0
font, use_japanese = load_japanese_font(DEFAULT_FONT_PATH, DEFAULT_FONT_SIZE)
try:
while True:
cap.grab()
ret, frame = cap.retrieve()
if not ret:
break
display_frame = frame.copy()
if use_japanese and font is not None:
img_pil = Image.fromarray(cv2.cvtColor(display_frame, cv2.COLOR_BGR2RGB))
draw = ImageDraw.Draw(img_pil)
draw.text((10, 10), f'撮影済み: {capture_count}枚', font=font, fill=(0, 255, 0))
draw.text((10, 50), 'スペース: 撮影, Q: 終了', font=font, fill=(255, 255, 255))
display_frame = cv2.cvtColor(np.array(img_pil), cv2.COLOR_RGB2BGR)
else:
cv2.putText(display_frame, f'Captured: {capture_count}', (10, 30),
cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
cv2.putText(display_frame, 'SPACE: Capture, Q: Quit', (10, 70),
cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)
cv2.imshow('Normal Image Capture', display_frame)
key = cv2.waitKey(1) & 0xFF
if key == ord(' '):
capture_count += 1
filename = f'captured_normal_{capture_count:03d}.jpg'
filepath = os.path.join(train_dir, filename)
cv2.imwrite(filepath, frame)
log_message(f'正常画像保存: {filename}')
print(f'撮影完了: {filename}')
white_frame = np.ones_like(frame) * 255
cv2.imshow('Normal Image Capture', white_frame)
cv2.waitKey(100)
elif key == ord('q'):
log_message('正常画像撮影を終了')
break
finally:
cap.release()
cv2.destroyAllWindows()
log_message('カメラリソース解放')
log_message(f'正常画像撮影完了: 合計 {capture_count} 枚')
print(f'\n撮影完了: {capture_count}枚の正常画像を保存しました')
return capture_count
# ------------------------------------------------------------
# PaDiM 学習(位置依存・次元削減)
# ------------------------------------------------------------
def extract_embedding(img_bgr, model, device, out_hw=(64, 64)):
"""単一画像から多段特徴を抽出し同一解像度に補間して結合"""
model.eval()
x = preprocess_bgr_to_tensor(img_bgr).unsqueeze(0).to(device)
with torch.no_grad():
feats = model(x)
# 層名はモデルインスタンスのlayersに一元化
layer_names = getattr(model, 'layers', None)
if layer_names is None:
raise ValueError('モデルにlayers属性が存在しません')
missing = [name for name in layer_names if name not in feats]
if len(missing) > 0:
log_message(f'警告: 指定層がforward出力に存在しないためスキップ: {missing}')
use_names = [name for name in layer_names if name in feats]
if len(use_names) == 0:
raise ValueError('抽出対象の層が存在しません(model.layersとforward出力の不一致)')
# 解像度を統一
feat_list = [nn.functional.interpolate(feats[name], size=out_hw, mode='bilinear', align_corners=False) for name in use_names]
emb = torch.cat(feat_list, dim=1) # [1, C, H, W]
return emb.squeeze(0).cpu().numpy() # [C, H, W]
def train_padim(normal_imgs, model, device, d=100, seed=0):
"""PaDiMの位置依存モデリング学習。各位置の平均と共分散(の逆行列)を推定"""
log_message(f'PaDiM学習開始 - 正常画像数: {len(normal_imgs)}, デバイス: {device}')
embeddings = []
for i, img in enumerate(tqdm(normal_imgs, desc='特徴抽出中')):
emb = extract_embedding(img, model, device, out_hw=(64, 64)) # [C,H,W]
embeddings.append(emb)
if (i + 1) % 5 == 0:
log_message(f'特徴抽出進捗: {i + 1}/{len(normal_imgs)}')
arr = np.stack(embeddings, axis=0) # [N, C, H, W]
N, C, H, W = arr.shape
log_message(f'特徴マップサイズ: N={N}, C={C}, H={H}, W={W}')
# ランダムサブサンプリング
rng = np.random.RandomState(seed)
d_actual = min(d, C)
channel_idx = rng.choice(C, size=d_actual, replace=False)
arr = arr[:, channel_idx, :, :] # [N, d, H, W]
log_message(f'チャネルサブサンプリング: d={d_actual}')
# 位置ごとの平均と共分散の推定(LedoitWolf)
means = np.zeros((H * W, d_actual), dtype=np.float32)
icovs = np.zeros((H * W, d_actual, d_actual), dtype=np.float32)
for p in tqdm(range(H * W), desc='位置別分布推定中'):
y, x = divmod(p, W)
X = arr[:, :, y, x] # [N, d]
mu = X.mean(axis=0)
means[p] = mu.astype(np.float32)
# 共分散行列の推定と正則化
try:
if N > 1:
lw = LedoitWolf()
cov = lw.fit(X).covariance_
else:
cov = np.eye(d_actual, dtype=np.float32) * 0.1
except Exception as e:
log_message(f'共分散推定失敗(p={p}): {e}。対角行列で代替')
cov = np.eye(d_actual, dtype=np.float32) * 0.1
# 数値安定化の強化
cov = cov + 1e-4 * np.eye(d_actual, dtype=cov.dtype)
# 逆行列計算
try:
icov = np.linalg.inv(cov)
except np.linalg.LinAlgError:
# 擬似逆行列を使用
icov = np.linalg.pinv(cov)
icovs[p] = icov.astype(np.float32)
log_message('PaDiM学習完了')
return {
'H': H, 'W': W,
'channel_idx': channel_idx.astype(np.int64),
'means': means, # [H*W, d]
'icovs': icovs # [H*W, d, d]
}
# ------------------------------------------------------------
# 推論(位置依存マハラノビス距離マップ)
# ------------------------------------------------------------
def mahalanobis_map(img_bgr, model, stats, device, gaussian_ksize=5, gaussian_sigma=0):
"""各位置のマハラノビス距離を計算してスコアマップを返す"""
emb = extract_embedding(img_bgr, model, device, out_hw=(stats['H'], stats['W'])) # [C,H,W]
emb = emb[stats['channel_idx'], :, :] # [d,H,W]
d, H, W = emb.shape
emb_flat = emb.transpose(1, 2, 0).reshape(-1, d) # [H*W, d]
diff = emb_flat - stats['means'] # [H*W, d]
# dist_p = diff[p]^T * icovs[p] * diff[p]
dist = np.einsum('pd,pde,pe->p', diff, stats['icovs'], diff) # [H*W]
# 数値安定化: 負の値やNaNを処理
dist = np.maximum(dist, 0)
dist = np.nan_to_num(dist, nan=0.0, posinf=1000.0, neginf=0.0)
dist_map = dist.reshape(H, W)
# ガウシアンフィルタ(ノイズ低減)
if gaussian_ksize and gaussian_ksize > 1:
k = gaussian_ksize if gaussian_ksize % 2 == 1 else gaussian_ksize + 1
dist_map = cv2.GaussianBlur(dist_map.astype(np.float32), (k, k), gaussian_sigma)
return dist_map
# ------------------------------------------------------------
# 可視化と保存補助
# ------------------------------------------------------------
def setup_detection_directory():
"""異常検出領域保存用ディレクトリを作成"""
base_dir = './detected_regions'
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
detection_dir = os.path.join(base_dir, f'detection_{timestamp}')
if not os.path.exists(detection_dir):
os.makedirs(detection_dir)
log_message(f'異常検出画像保存ディレクトリ作成: {detection_dir}')
print(f'異常検出画像保存ディレクトリを作成しました: {detection_dir}')
return detection_dir
def visualize_anomaly(img_bgr, score_map, label='異常スコア', detection_dir=None, file_counter=None,
threshold_mode='std', k=2.0, q=0.995, thr_norm=0.6):
"""異常スコアヒートマップの重畳と領域の切り出し"""
score_map = cv2.resize(score_map, (img_bgr.shape[1], img_bgr.shape[0]))
score_min = float(np.min(score_map))
score_max = float(np.max(score_map))
if score_max - score_min > 0:
norm_map = (score_map - score_min) / (score_max - score_min)
else:
norm_map = np.zeros_like(score_map, dtype=np.float32)
heatmap = cv2.applyColorMap(np.uint8(255 * norm_map), cv2.COLORMAP_JET)
overlay = cv2.addWeighted(img_bgr, 0.6, heatmap, 0.4, 0)
# 二値化と領域抽出(方式選択可能)
if threshold_mode == 'quantile':
threshold_val = float(np.quantile(score_map, q))
_, binary_map = cv2.threshold(score_map.astype(np.float32), threshold_val, 255, cv2.THRESH_BINARY)
elif threshold_mode == 'normalized':
threshold_val = float(thr_norm)
_, binary_map = cv2.threshold(norm_map.astype(np.float32), threshold_val, 255, cv2.THRESH_BINARY)
else: # 'std' 既定
threshold_val = float(np.mean(score_map) + k * np.std(score_map))
_, binary_map = cv2.threshold(score_map.astype(np.float32), threshold_val, 255, cv2.THRESH_BINARY)
binary_map = np.uint8(binary_map)
kernel = np.ones((5, 5), np.uint8)
binary_map = cv2.morphologyEx(binary_map, cv2.MORPH_CLOSE, kernel)
binary_map = cv2.morphologyEx(binary_map, cv2.MORPH_OPEN, kernel)
contours, _ = cv2.findContours(binary_map, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
for contour in contours:
area = cv2.contourArea(contour)
if area > img_bgr.shape[0] * img_bgr.shape[1] * 0.001:
x, y, w, h = cv2.boundingRect(contour)
cv2.rectangle(overlay, (x, y), (x + w, y + h), (0, 0, 255), 2)
region_scores = score_map[y:y+h, x:x+w]
if region_scores.size > 0:
max_region_score = float(np.max(region_scores))
cv2.putText(overlay, f'{max_region_score:.1f}', (x, y - 5),
cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255), 1)
if detection_dir is not None and file_counter is not None:
cropped_region = img_bgr[y:y+h, x:x+w]
filename = f'{file_counter[0]:06d}.png'
filepath = os.path.join(detection_dir, filename)
cv2.imwrite(filepath, cropped_region)
log_message(f'異常領域保存: {filename} (スコア: {max_region_score:.1f})')
file_counter[0] += 1
# 日本語ラベル表示(Pillow+Meiryo)
font, jp_available = load_japanese_font(DEFAULT_FONT_PATH, DEFAULT_FONT_SIZE)
try:
if jp_available and font is not None:
img_pil = Image.fromarray(cv2.cvtColor(overlay, cv2.COLOR_BGR2RGB))
draw = ImageDraw.Draw(img_pil)
draw.text((30, 30), label, font=font, fill=(0, 255, 0))
overlay = cv2.cvtColor(np.array(img_pil), cv2.COLOR_RGB2BGR)
else:
raise RuntimeError('fallback')
except Exception:
cv2.putText(overlay, label, (30, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
return overlay
def video_frame_processing(frame):
"""動画フレーム処理関数"""
global frame_count
current_time = time.time()
frame_count += 1
# 推論実行
score_map = mahalanobis_map(frame, model, stats, device, gaussian_ksize=5, gaussian_sigma=0)
max_score = float(np.max(score_map))
processed_frame = visualize_anomaly(frame, score_map, f'異常スコア: {max_score:.2f}', detection_dir, file_counter)
result = f'異常スコア: {max_score:.4f}'
return processed_frame, result, current_time
# ------------------------------------------------------------
# 実行部
# ------------------------------------------------------------
log_message('=' * 50)
log_message('PaDiM異常検知システムを開始')
log_message('=' * 50)
print('\n=== PaDiM異常検知システム ===')
print('\n概要:')
print('PaDiMは正常画像から学習した特徴分布を用いて異常を検出する教師なし手法である。')
print('\n操作:')
print('1. CNNモデル選択(ResNet18/50, EfficientNet-B0)')
print('2. 正常画像準備(撮影またはファイル選択)')
print('3. 入力選択(動画ファイル/カメラ/サンプル)')
print('4. 再生中は q キーで終了')
print('\n注意:')
print('・正常画像は5枚以上(推奨10-30枚)')
print('・初回はサンプル画像を自動ダウンロード')
print('・GPUが利用可能な場合は自動使用')
log_message(f'使用デバイス: {device}')
model = select_model().to(device)
train_dir = setup_training_data()
train_imgs = []
for f in os.listdir(train_dir):
if f.lower().endswith(('.jpg', '.png', '.jpeg')):
img_path = os.path.join(train_dir, f)
img = cv2.imread(img_path)
if img is not None:
train_imgs.append(img)
if len(train_imgs) == 0:
log_message('エラー: 学習用画像が見つからない')
print('エラー: 学習用画像が見つかりません')
exit()
log_message(f'学習用画像読み込み完了: {len(train_imgs)}枚')
print(f'正常画像 {len(train_imgs)} 枚でモデルを学習中...')
stats = train_padim(train_imgs, model, device, d=100, seed=0)
print('\n異常検出画像の保存設定:')
print('検出された異常領域を切り抜いて画像として保存しますか?')
print('1: 保存する(連番で自動保存)')
print('2: 保存しない(表示のみ)')
save_choice = input('\n選択 (1/2): ')
log_message(f'異常検出画像保存設定: {save_choice}')
detection_dir = None
file_counter = None
if save_choice == '1':
detection_dir = setup_detection_directory()
file_counter = [1]
print('異常検出画像を保存します')
elif save_choice == '2':
log_message('異常検出画像は保存しない設定')
print('異常検出画像は保存しません(表示のみ)')
else:
log_message('無効な選択のため、保存しない設定を適用')
print('無効な選択です。異常検出画像は保存しません(表示のみ)')
# しきい値方式のログ(既定: std, k=2.0)
log_message('しきい値設定: mode=std, k=2.0, q=0.995, thr_norm=0.6')
print("0: 動画ファイル")
print("1: カメラ")
print("2: サンプル動画")
choice = input("選択: ")
frame_count = 0
results_log = []
if choice == '0':
root = get_hidden_tk_root()
path = filedialog.askopenfilename()
if not path:
exit()
cap = cv2.VideoCapture(path)
elif choice == '1':
cap = open_camera_device(0)
else:
# サンプル動画ダウンロード・処理(例外処理強化)
SAMPLE_URL = 'https://raw.githubusercontent.com/opencv/opencv/master/samples/data/vtest.avi'
SAMPLE_FILE = 'vtest.avi'
if safe_urlretrieve(SAMPLE_URL, SAMPLE_FILE, desc='サンプル動画'):
cap = cv2.VideoCapture(SAMPLE_FILE)
else:
print('サンプル動画のダウンロードに失敗しました。ファイルを選択してください。')
root = get_hidden_tk_root()
path = filedialog.askopenfilename()
if not path:
print('動画が選択されなかったため終了します')
exit()
cap = cv2.VideoCapture(path)
if not cap.isOpened():
print('動画ファイル・カメラを開けませんでした')
exit()
# メイン処理
print('\n=== 動画処理開始 ===')
print('操作方法:')
print(' q キー: プログラム終了')
try:
while True:
ret, frame = cap.read()
if not ret:
break
MAIN_FUNC_DESC = "PaDiM異常検知"
processed_frame, result, current_time = video_frame_processing(frame)
cv2.imshow(MAIN_FUNC_DESC, processed_frame)
if choice == '1': # カメラの場合
print(datetime.fromtimestamp(current_time).strftime("%Y-%m-%d %H:%M:%S.%f")[:-3], result)
else: # 動画ファイルの場合
print(frame_count, result)
results_log.append(result)
if cv2.waitKey(1) & 0xFF == ord('q'):
break
finally:
print('\n=== プログラム終了 ===')
cap.release()
cv2.destroyAllWindows()
if results_log:
with open('result.txt', 'w', encoding='utf-8') as f:
f.write('=== 結果 ===\n')
f.write(f'処理フレーム数: {frame_count}\n')
f.write(f'使用デバイス: {str(device).upper()}\n')
if device.type == 'cuda':
f.write(f'GPU: {torch.cuda.get_device_name(0)}\n')
f.write('\n')
f.write('\n'.join(results_log))
print(f'\n処理結果をresult.txtに保存しました')
if detection_dir is not None:
saved_files = [f for f in os.listdir(detection_dir) if f.endswith('.png')]
total_detections = len(saved_files)
if total_detections > 0:
log_message(f'異常検出画像保存: {detection_dir} に {total_detections} 個の画像を保存')
print(f'\n異常検出画像を {detection_dir} に保存しました(合計 {total_detections} 個)')
log_message('=' * 50)
log_message('PaDiM異常検知システムを終了')
log_message('=' * 50)