DeepLSDによる線分検出(ソースコードと実行結果)

Python開発環境,ライブラリ類

ここでは、最低限の事前準備について説明する。機械学習や深層学習を行う場合は、NVIDIA CUDA、Visual Studio、Cursorなどを追加でインストールすると便利である。これらについては別ページ https://www.kkaneko.jp/cc/dev/aiassist.htmlで詳しく解説しているので、必要に応じて参照してください。

Python 3.12 のインストール

インストール済みの場合は実行不要。

管理者権限でコマンドプロンプトを起動(手順:Windowsキーまたはスタートメニュー > cmd と入力 > 右クリック > 「管理者として実行」)し、以下を実行する。管理者権限は、wingetの--scope machineオプションでシステム全体にソフトウェアをインストールするために必要である。

REM Python をシステム領域にインストール
winget install --scope machine --id Python.Python.3.12 -e --silent
REM Python のパス設定
set "PYTHON_PATH=C:\Program Files\Python312"
set "PYTHON_SCRIPTS_PATH=C:\Program Files\Python312\Scripts"
echo "%PATH%" | find /i "%PYTHON_PATH%" >nul
if errorlevel 1 setx PATH "%PATH%;%PYTHON_PATH%" /M >nul
echo "%PATH%" | find /i "%PYTHON_SCRIPTS_PATH%" >nul
if errorlevel 1 setx PATH "%PATH%;%PYTHON_SCRIPTS_PATH%" /M >nul

関連する外部ページ

Python の公式ページ: https://www.python.org/

AI エディタ Windsurf のインストール

Pythonプログラムの編集・実行には、AI エディタの利用を推奨する。ここでは,Windsurfのインストールを説明する。

管理者権限でコマンドプロンプトを起動(手順:Windowsキーまたはスタートメニュー > cmd と入力 > 右クリック > 「管理者として実行」)し、以下を実行して、Windsurfをシステム全体にインストールする。管理者権限は、wingetの--scope machineオプションでシステム全体にソフトウェアをインストールするために必要となる。

winget install --scope machine Codeium.Windsurf -e --silent

関連する外部ページ

Windsurf の公式ページ: https://windsurf.com/

必要なライブラリのインストール

コマンドプロンプトを管理者として実行(手順:Windowsキーまたはスタートメニュー > cmd と入力 > 右クリック > 「管理者として実行」)し、以下を実行する



DeepLSD線分検出プログラム

ソースコード


"""
DeepLSD線分検出プログラム
特徴技術名: DeepLSD (Deep Line Segment Detection)
出典: Pautrat, R., Barath, D. B., Larsson, V., Oswald, M. R., & Pollefeys, M. (2023). DeepLSD: Line Segment Detection and Refinement with Deep Image Gradients. In Proceedings of the IEEE/CVF Conference on Computer Vision and Pattern Recognition (pp. 17327-17336).
特徴機能: Line Attraction Field生成による高精度線分検出。深層学習で線分引力場を生成し、それをサロゲート画像勾配に変換して従来の手法と組み合わせることで、ノイズの多い画像や困難な条件でも頑健かつ高精度な線分検出を実現
学習済みモデル: deeplsd_md.tar(MegaDepthデータセットで学習、屋外および困難なシーンに対応、約100MB、https://cvg-data.inf.ethz.ch/DeepLSD/deeplsd_md.tar)
方式設計:
  関連利用技術:
    - PyTorch(深層学習フレームワーク、テンソル演算とGPU加速処理)
    - OpenCV(画像処理、GUI表示、動画・カメラ入力処理)
    - NumPy(数値計算、配列操作)
  入力と出力:
    入力: 動画(ユーザは「0:動画ファイル,1:カメラ,2:サンプル動画」のメニューで選択.0:動画ファイルの場合はtkinterでファイル選択.1の場合はOpenCVでカメラが開く.2の場合はhttps://raw.githubusercontent.com/opencv/opencv/master/samples/data/vtest.aviを使用)
    出力: 処理結果が画像化できる場合にはOpenCV画面でリアルタイムに表示。OpenCV画面内に処理結果をテキストで表示。さらに、print()で処理結果を表示。プログラム終了時にprint()で表示した処理結果をresult.txtファイルに保存し、「result.txtに保存」したことをprint()で表示。プログラム開始時に、プログラムの概要、ユーザが行う必要がある操作をprint()で表示
  処理手順:
    1. 入力画像をグレースケールに変換
    2. 画像をテンソル形式に変換してGPU/CPUに転送
    3. DeepLSDモデルで線分引力場を生成
    4. 引力場から線分座標を抽出
    5. 検出線分を元画像に描画して結果表示
  前処理: グレースケール変換、画像正規化(0-1範囲)、テンソル変換によりDeepLSDモデルの精度向上
  後処理: 線分座標の整数変換、重複線分の除去、結果可視化により実用性向上
  追加処理: PyTorch 2.6対応の安全な重みロード処理(weights_only=False、safe_globals使用)によりモデル互換性確保
  調整を必要とする設定値:
    - line_neighborhood(線分近傍パラメータ、デフォルト5、線分検出の局所性制御)
    - grad_thresh(勾配閾値、デフォルト3、線分検出感度調整)
将来方策: 設定値の自動調整のため、画像品質解析機能と動適応的パラメータ調整システムの実装
その他の重要事項: PyTorch 2.6での重みロード互換性対応、プログレスバー付きダウンロード機能
前準備: pip install -U torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu126
pip install opencv-python
pip install git+https://github.com/cvg/DeepLSD.git
"""

import cv2
import torch
import numpy as np
import time
from datetime import datetime
import tkinter as tk
from tkinter import filedialog
import urllib.request
import subprocess
import sys
import os

# GPU/CPU自動選択
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f'デバイス: {device}')

class DownloadProgressBar:
    def __init__(self):
        self.start_time = None
        self.last_update = 0

    def __call__(self, block_num, block_size, total_size):
        if self.start_time is None:
            self.start_time = time.time()

        current_time = time.time()

        if current_time - self.last_update < 1.0 and block_num * block_size < total_size:
            return

        self.last_update = current_time

        downloaded = block_num * block_size
        if downloaded > total_size:
            downloaded = total_size

        def format_size(size):
            for unit in ['B', 'KB', 'MB', 'GB']:
                if size < 1024.0:
                    return f"{size:.1f}{unit}"
                size /= 1024.0
            return f"{size:.1f}TB"

        if total_size > 0:
            progress = (downloaded / total_size) * 100
            progress_bar_length = 50
            filled_length = int(progress_bar_length * downloaded // total_size)
            bar = '█' * filled_length + '-' * (progress_bar_length - filled_length)

            elapsed_time = current_time - self.start_time
            if elapsed_time > 0:
                speed = downloaded / elapsed_time
                speed_str = f"{format_size(speed)}/s"

                if speed > 0:
                    remaining_time = (total_size - downloaded) / speed
                    if remaining_time < 60:
                        eta = f"{remaining_time:.0f}秒"
                    elif remaining_time < 3600:
                        eta = f"{remaining_time/60:.1f}分"
                    else:
                        eta = f"{remaining_time/3600:.1f}時間"
                else:
                    eta = "不明"
            else:
                speed_str = "計算中..."
                eta = "不明"

            print(f"\r[{bar}] {progress:.1f}% ({format_size(downloaded)}/{format_size(total_size)}) {speed_str} ETA: {eta}", end='', flush=True)
        else:
            print(f"\rダウンロード中... {format_size(downloaded)}", end='', flush=True)

class DeepLSDDetector:
    def __init__(self):
        self.device = device
        self.model = None
        self.setup_model()

    def setup_model(self):
        """モデルのセットアップ"""
        self.install_deeplsd()
        self.download_pretrained_weights()

        from deeplsd.models.deeplsd_inference import DeepLSD

        conf = {
            'line_neighborhood': 5,
            'multiscale': False,
            'scale_factors': [1., 1.5],
            'detect_lines': True,
            'line_detection_params': {
                'merge': False,
                'grad_nfa': True,
                'filtering': 'normal',
                'grad_thresh': 3,
            },
        }

        self.model = DeepLSD(conf).to(self.device)

        weights_path = 'deeplsd_md.tar'
        if os.path.exists(weights_path):
            print(f"事前学習済み重みをロード中: {weights_path}")

            try:
                # PyTorch 2.6対応: weights_only=Falseを明示的に指定
                checkpoint = torch.load(weights_path, map_location=self.device, weights_only=False)
                print("checkpointロード成功")

            except Exception as e:
                print(f"重みロードエラー: {e}")
                print("手動で重みファイルを確認してください")
                exit()

            try:
                if 'model' in checkpoint:
                    state_dict = checkpoint['model']
                elif 'state_dict' in checkpoint:
                    state_dict = checkpoint['state_dict']
                else:
                    state_dict = checkpoint

                if hasattr(state_dict, '_content'):
                    state_dict = dict(state_dict)

                self.model.load_state_dict(state_dict)
                print("事前学習済み重みのロード完了")

            except Exception as e:
                print(f"state_dictロードエラー: {e}")
                exit()

        else:
            print("警告: 事前学習済み重みが見つかりません。検出精度が低下する可能性があります。")

        self.model.eval()
        print("DeepLSDモデルのセットアップ完了")

    def download_pretrained_weights(self):
        """事前学習済み重みのダウンロード"""
        weights_url = 'https://cvg-data.inf.ethz.ch/DeepLSD/deeplsd_md.tar'
        weights_path = 'deeplsd_md.tar'

        if not os.path.exists(weights_path):
            print("事前学習済み重みをダウンロード中...")
            print(f"URL: {weights_url}")
            print(f"保存先: {weights_path}")
            print()

            try:
                progress_bar = DownloadProgressBar()
                urllib.request.urlretrieve(weights_url, weights_path, reporthook=progress_bar)
                print()
                print("ダウンロード完了!")

                file_size = os.path.getsize(weights_path)
                print(f"ファイルサイズ: {file_size / (1024*1024):.1f}MB")

            except Exception as e:
                print(f"\nダウンロード失敗: {e}")
                print("手動でダウンロードしてください:")
                print(f"wget {weights_url}")
                exit()
        else:
            file_size = os.path.getsize(weights_path)
            print(f"事前学習済み重みファイルが見つかりました ({file_size / (1024*1024):.1f}MB)")

    def install_deeplsd(self):
        """DeepLSDライブラリのインストール"""
        try:
            import deeplsd
            print("DeepLSDライブラリは既にインストールされています")
        except ImportError:
            print("DeepLSDライブラリをインストール中...")
            subprocess.check_call([sys.executable, "-m", "pip", "install",
                                 "git+https://github.com/cvg/DeepLSD.git"])
            print("インストール完了")

    def detect_lines(self, frame):
        """フレームから線分を検出"""
        if len(frame.shape) == 3:
            gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        else:
            gray = frame

        tensor_image = torch.from_numpy(gray).float().unsqueeze(0).unsqueeze(0)
        tensor_image = tensor_image.to(self.device) / 255.0

        with torch.no_grad():
            data = {'image': tensor_image}
            output = self.model(data)

            if 'lines' in output and len(output['lines']) > 0:
                lines = output['lines'][0]

                if isinstance(lines, np.ndarray) and len(lines) > 0:
                    if lines.ndim == 3 and lines.shape[-1] == 2:
                        lines = lines.reshape(-1, 4)
                    elif lines.ndim == 2 and lines.shape[1] == 4:
                        pass
                    else:
                        lines = np.array([])
                else:
                    lines = np.array([])
            else:
                lines = np.array([])

        return lines

print("=" * 60)
print("DeepLSD線分検出プログラム")
print("=" * 60)
detector = DeepLSDDetector()
print("=" * 60)

frame_count = 0
results_log = []

def video_frame_processing(frame):
    global frame_count
    current_time = time.time()
    frame_count += 1

    lines = detector.detect_lines(frame)

    processed_frame = frame.copy()
    if len(lines) > 0:
        for line in lines:
            x1, y1, x2, y2 = line.astype(int)
            cv2.line(processed_frame, (x1, y1), (x2, y2), (0, 255, 0), 2)

    result = f"検出線分数: {len(lines)}"
    return processed_frame, result, current_time

print("入力ソースを選択してください:")
print("0: 動画ファイル")
print("1: カメラ")
print("2: サンプル動画")

choice = input("選択: ")

if choice == '0':
    root = tk.Tk()
    root.withdraw()
    path = filedialog.askopenfilename()
    if not path:
        exit()
    cap = cv2.VideoCapture(path)
elif choice == '1':
    cap = cv2.VideoCapture(0, cv2.CAP_DSHOW)
    if not cap.isOpened():
        cap = cv2.VideoCapture(0)
    cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)
else:
    SAMPLE_URL = 'https://raw.githubusercontent.com/opencv/opencv/master/samples/data/vtest.avi'
    SAMPLE_FILE = 'vtest.avi'
    print(f"サンプル動画をダウンロード中: {SAMPLE_URL}")
    urllib.request.urlretrieve(SAMPLE_URL, SAMPLE_FILE)
    cap = cv2.VideoCapture(SAMPLE_FILE)

if not cap.isOpened():
    print('動画ファイル・カメラを開けませんでした')
    exit()

print('\n=== 動画処理開始 ===')
print('操作方法:')
print('  q キー: プログラム終了')
try:
    while True:
        ret, frame = cap.read()
        if not ret:
            break

        MAIN_FUNC_DESC = "線分検出"
        processed_frame, result, current_time = video_frame_processing(frame)
        cv2.imshow(MAIN_FUNC_DESC, processed_frame)
        if choice == '1':
            print(datetime.fromtimestamp(current_time).strftime("%Y-%m-%d %H:%M:%S.%f")[:-3], result)
        else:
            print(frame_count, result)
        results_log.append(result)
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
finally:
    print('\n=== プログラム終了 ===')
    cap.release()
    cv2.destroyAllWindows()
    if results_log:
        with open('result.txt', 'w', encoding='utf-8') as f:
            f.write('=== 結果 ===\n')
            f.write(f'処理フレーム数: {frame_count}\n')
            f.write(f'使用デバイス: {str(device).upper()}\n')
            if device.type == 'cuda':
                f.write(f'GPU: {torch.cuda.get_device_name(0)}\n')
            f.write('\n')
            f.write('\n'.join(results_log))
        print(f'\n処理結果をresult.txtに保存しました')