diff --git "a/docs/03_TECH/TECH_04_\347\267\232\346\244\234\345\207\272\347\262\276\345\272\246\345\220\221\344\270\212\346\226\271\351\207\235.txt" "b/docs/03_TECH/TECH_04_\347\267\232\346\244\234\345\207\272\347\262\276\345\272\246\345\220\221\344\270\212\346\226\271\351\207\235.txt" index f715361..a8dd2ce 100644 --- "a/docs/03_TECH/TECH_04_\347\267\232\346\244\234\345\207\272\347\262\276\345\272\246\345\220\221\344\270\212\346\226\271\351\207\235.txt" +++ "b/docs/03_TECH/TECH_04_\347\267\232\346\244\234\345\207\272\347\262\276\345\272\246\345\220\221\344\270\212\346\226\271\351\207\235.txt" @@ -612,7 +612,7 @@ (デフォルト: 7,0 で無効) - neighbor_thresh: 近傍除去の閾値 px (デフォルト: 10.0,0 で無効) - ・実装: _clean_and_fit() 関数(line_detector.py) + ・実装: clean_and_fit() 関数(fitting.py) ・備考: RANSAC と併用可能.RANSAC が有効な場合は メディアン → 近傍除去 → RANSAC の順に適用される @@ -886,13 +886,18 @@ 11-5. 実装ファイル - ・src/pc/vision/line_detector.py + ・src/pc/vision/line_detector.py(PC 側) + ・src/pi/vision/line_detector.py(Pi 側) - ValleyTracker クラス: 時系列追跡の状態管理 - _find_row_valley(): 1行の谷検出 - _detect_valley(): 案Dのメイン処理 - _build_valley_binary(): デバッグ用二値画像生成 - reset_valley_tracker(): 追跡状態のリセット - ・src/pc/gui/main_window.py + ・src/pc/vision/detectors/valley.py(PC 側) + ・src/pi/vision/detectors/valley.py(Pi 側) + - 谷検出の実装 + ・src/pc/gui/panels/image_param_panel.py - 案D用パラメータの GUI コントロール - ・src/pc/steering/pd_control.py + ・src/pc/steering/pd_control.py(PC 側) + ・src/pi/steering/pd_control.py(Pi 側) - reset() 時に追跡状態もリセット diff --git "a/docs/04_ENV/ENV_04_\343\203\207\343\202\243\343\203\254\343\202\257\343\203\210\343\203\252\346\247\213\346\210\220.txt" "b/docs/04_ENV/ENV_04_\343\203\207\343\202\243\343\203\254\343\202\257\343\203\210\343\203\252\346\247\213\346\210\220.txt" index c1294e2..2bd1fb4 100644 --- "a/docs/04_ENV/ENV_04_\343\203\207\343\202\243\343\203\254\343\202\257\343\203\210\343\203\252\346\247\213\346\210\220.txt" +++ "b/docs/04_ENV/ENV_04_\343\203\207\343\202\243\343\203\254\343\202\257\343\203\210\343\203\252\346\247\213\346\210\220.txt" @@ -40,12 +40,31 @@ 2-2. src/common/ common/ - ├── config.py - └── json_utils.py JSON 読み書き共通ユーティリティ + ├── config.py ネットワーク・画像・通信設定 + ├── json_utils.py JSON 読み書き共通ユーティリティ + ├── vision/ 画像処理(PC・Pi 共通) + │ ├── line_detector.py 線検出 API(データクラス・手法ディスパッチ) + │ ├── fitting.py 直線・曲線近似(Theil-Sen・RANSAC・外れ値除去) + │ ├── morphology.py 形態学的処理ユーティリティ + │ ├── intersection.py 十字路分類モデルの推論 + │ └── detectors/ 検出手法の実装 + │ ├── current.py 現行(CLAHE + 固定閾値) + │ ├── blackhat.py 案A(Black-hat 中心) + │ ├── dual_norm.py 案B(二重正規化) + │ ├── robust.py 案C(最高ロバスト) + │ └── valley.py 案D(谷検出+追跡) + └── steering/ 操舵量計算(PC・Pi 共通) + ├── base.py 共通インターフェース + ├── pd_control.py PD 制御の実装 + ├── pursuit_control.py 2点パシュート制御の実装 + ├── ts_pd_control.py Theil-Sen PD 制御の実装 + └── recovery.py コースアウト復帰 - ・PC・Pi 間で共有する設定値・ユーティリティを定義する. + ・PC・Pi 間で共有する設定値・画像処理・操舵量計算を定義する. ・config.py: ネットワーク設定,画像フォーマット,通信設定等. ・json_utils.py: JSON ファイル読み書きとパラメータディレクトリの定義. + ・vision/: 線検出パイプライン・検出手法・十字路分類を共通化. + ・steering/: PD 制御・パシュート制御・Theil-Sen PD 制御・復帰制御を共通化. 2-3. src/pc/ @@ -69,26 +88,11 @@ │ ├── reviewer.py 仕分けレビュー GUI │ ├── dataset.py データ読み込み │ └── train.py モデル学習・評価・保存 - ├── steering/ 操舵量計算(パラメータ保存・復元用) - │ ├── base.py 共通インターフェース - │ ├── pd_control.py PD 制御の実装 - │ ├── pursuit_control.py 2点パシュート制御の実装 - │ ├── ts_pd_control.py Theil-Sen PD 制御の実装 + ├── steering/ PC 固有の操舵パラメータ管理 │ ├── param_store.py プリセット保存・読み込み - │ ├── recovery.py コースアウト復帰パラメータ │ └── auto_params.py パラメータ自動保存・復元 - └── vision/ 画像処理(PC 側は表示・データ収集用) - ├── line_detector.py 線検出 API(データクラス・手法ディスパッチ) - ├── fitting.py 直線・曲線近似(Theil-Sen・RANSAC・外れ値除去) - ├── morphology.py 形態学的処理ユーティリティ - ├── intersection.py 十字路分類モデルの推論 - ├── overlay.py デバッグオーバーレイ描画 - └── detectors/ 検出手法の実装 - ├── current.py 現行(CLAHE + 固定閾値) - ├── blackhat.py 案A(Black-hat 中心) - ├── dual_norm.py 案B(二重正規化) - ├── robust.py 案C(最高ロバスト) - └── valley.py 案D(谷検出+追跡) + └── vision/ PC 固有の画像処理 + └── overlay.py デバッグオーバーレイ描画 2-4. src/pi/ @@ -98,22 +102,5 @@ │ └── zmq_client.py ZMQ テレメトリ送信・コマンド受信 ├── camera/ カメラ関連 │ └── capture.py フレーム取得 - ├── motor/ モーター関連 - │ └── driver.py TB6612FNG 制御 - ├── vision/ 画像処理(Pi 側で実行) - │ ├── line_detector.py 線検出 API(手法ディスパッチ) - │ ├── fitting.py 直線・曲線近似(Theil-Sen・RANSAC) - │ ├── morphology.py 形態学的処理ユーティリティ - │ ├── intersection.py 十字路分類モデルの推論 - │ └── detectors/ 検出手法の実装 - │ ├── current.py 現行(CLAHE + 固定閾値) - │ ├── blackhat.py 案A(Black-hat 中心) - │ ├── dual_norm.py 案B(二重正規化) - │ ├── robust.py 案C(最高ロバスト) - │ └── valley.py 案D(谷検出+追跡) - └── steering/ 操舵量計算(Pi 側で実行) - ├── base.py 共通インターフェース - ├── pd_control.py PD 制御の実装 - ├── pursuit_control.py 2点パシュート制御の実装 - ├── ts_pd_control.py Theil-Sen PD 制御の実装 - └── recovery.py コースアウト復帰 + └── motor/ モーター関連 + └── driver.py TB6612FNG 制御 diff --git a/src/common/steering/__init__.py b/src/common/steering/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/src/common/steering/__init__.py diff --git a/src/common/steering/base.py b/src/common/steering/base.py new file mode 100644 index 0000000..40eff1a --- /dev/null +++ b/src/common/steering/base.py @@ -0,0 +1,50 @@ +""" +base +操舵量計算の共通インターフェースを定義するモジュール +全ての操舵量計算クラスはこのインターフェースに従う +""" + +from abc import ABC, abstractmethod +from dataclasses import dataclass + +import numpy as np + + +@dataclass +class SteeringOutput: + """操舵量計算の出力を格納するデータクラス + + Attributes: + throttle: 前後方向の出力 (-1.0 ~ +1.0) + steer: 左右方向の出力 (-1.0 ~ +1.0) + """ + throttle: float + steer: float + + +class SteeringBase(ABC): + """操舵量計算の基底クラス + + 全ての操舵量計算クラスはこのクラスを継承し, + compute メソッドを実装する + """ + + @abstractmethod + def compute( + self, frame: np.ndarray, + ) -> SteeringOutput: + """カメラ画像から操舵量を計算する + + Args: + frame: BGR 形式のカメラ画像 + + Returns: + 計算された操舵量 + """ + + @abstractmethod + def reset(self) -> None: + """内部状態をリセットする + + 自動操縦の開始時に呼び出される + """ diff --git a/src/common/steering/pd_control.py b/src/common/steering/pd_control.py new file mode 100644 index 0000000..ec31ada --- /dev/null +++ b/src/common/steering/pd_control.py @@ -0,0 +1,138 @@ +""" +pd_control +PD 制御による操舵量計算モジュール +多項式フィッティングの位置・傾き・曲率から操舵量と速度を算出する +""" + +import time +from dataclasses import dataclass + +import numpy as np + +from common.steering.base import SteeringBase, SteeringOutput +from common.vision.line_detector import ( + ImageParams, + LineDetectResult, + detect_line, + reset_valley_tracker, +) + + +@dataclass +class PdParams: + """PD 制御のパラメータ + + Attributes: + kp: 位置偏差ゲイン + kh: 傾き(ヘディング)ゲイン + kd: 微分ゲイン + max_steer_rate: 1フレームあたりの最大操舵変化量 + max_throttle: 直線での最大速度 + speed_k: 曲率ベースの減速係数 + """ + kp: float = 0.5 + kh: float = 0.3 + kd: float = 0.1 + max_steer_rate: float = 0.1 + max_throttle: float = 0.4 + speed_k: float = 0.3 + + +class PdControl(SteeringBase): + """PD 制御による操舵量計算クラス""" + + def __init__( + self, + params: PdParams | None = None, + image_params: ImageParams | None = None, + ) -> None: + self.params: PdParams = params or PdParams() + self.image_params: ImageParams = ( + image_params or ImageParams() + ) + self._prev_error: float = 0.0 + self._prev_time: float = 0.0 + self._prev_steer: float = 0.0 + self._last_result: LineDetectResult | None = None + + def compute( + self, frame: np.ndarray, + ) -> SteeringOutput: + """カメラ画像から PD 制御で操舵量を計算する + + Args: + frame: グレースケールのカメラ画像 + + Returns: + 計算された操舵量 + """ + p = self.params + + # 線検出 + result = detect_line(frame, self.image_params) + self._last_result = result + + # 線が検出できなかった場合は停止 + if not result.detected: + return SteeringOutput( + throttle=0.0, steer=0.0, + ) + + # 位置偏差 + 傾きによる操舵量 + error = ( + p.kp * result.position_error + + p.kh * result.heading + ) + + # 時間差分の計算 + now = time.time() + dt = ( + now - self._prev_time + if self._prev_time > 0 + else 0.033 + ) + dt = max(dt, 0.001) + + # 微分項 + derivative = (error - self._prev_error) / dt + steer = error + p.kd * derivative + + # 操舵量のクランプ + steer = max(-1.0, min(1.0, steer)) + + # レートリミッター(急な操舵変化を制限) + delta = steer - self._prev_steer + max_delta = p.max_steer_rate + delta = max(-max_delta, min(max_delta, delta)) + steer = self._prev_steer + delta + + # 速度制御(曲率連動) + throttle = ( + p.max_throttle + - p.speed_k * abs(result.curvature) + ) + throttle = max(0.0, throttle) + + # 状態の更新 + self._prev_error = error + self._prev_time = now + self._prev_steer = steer + + return SteeringOutput( + throttle=throttle, steer=steer, + ) + + def reset(self) -> None: + """内部状態をリセットする""" + self._prev_error = 0.0 + self._prev_time = 0.0 + self._prev_steer = 0.0 + self._last_result = None + reset_valley_tracker() + + @property + def last_detect_result( + self, + ) -> LineDetectResult | None: + """直近の線検出結果を取得する""" + return self._last_result diff --git a/src/common/steering/pursuit_control.py b/src/common/steering/pursuit_control.py new file mode 100644 index 0000000..d576cf3 --- /dev/null +++ b/src/common/steering/pursuit_control.py @@ -0,0 +1,147 @@ +""" +pursuit_control +2点パシュートによる操舵量計算モジュール +行中心点に Theil-Sen 直線近似を適用し,外れ値に強い操舵量を算出する +""" + +from dataclasses import dataclass + +import numpy as np + +from common import config +from common.steering.base import SteeringBase, SteeringOutput +from common.vision.fitting import theil_sen_fit +from common.vision.line_detector import ( + ImageParams, + LineDetectResult, + detect_line, + reset_valley_tracker, +) + + +@dataclass +class PursuitParams: + """2点パシュート制御のパラメータ + + Attributes: + near_ratio: 近い目標点の位置(0.0=上端,1.0=下端) + far_ratio: 遠い目標点の位置(0.0=上端,1.0=下端) + k_near: 近い目標点の操舵ゲイン + k_far: 遠い目標点の操舵ゲイン + max_steer_rate: 1フレームあたりの最大操舵変化量 + max_throttle: 直線での最大速度 + speed_k: カーブ減速係数(2点の差に対する係数) + """ + near_ratio: float = 0.8 + far_ratio: float = 0.3 + k_near: float = 0.5 + k_far: float = 0.3 + max_steer_rate: float = 0.1 + max_throttle: float = 0.4 + speed_k: float = 2.0 + + +class PursuitControl(SteeringBase): + """2点パシュートによる操舵量計算クラス + + 行中心点から Theil-Sen 直線近似を行い, + 直線上の近い点と遠い点の偏差から操舵量を計算する + """ + + def __init__( + self, + params: PursuitParams | None = None, + image_params: ImageParams | None = None, + ) -> None: + self.params: PursuitParams = ( + params or PursuitParams() + ) + self.image_params: ImageParams = ( + image_params or ImageParams() + ) + self._prev_steer: float = 0.0 + self._last_result: LineDetectResult | None = None + + def compute( + self, frame: np.ndarray, + ) -> SteeringOutput: + """カメラ画像から2点パシュートで操舵量を計算する + + Args: + frame: グレースケールのカメラ画像 + + Returns: + 計算された操舵量 + """ + p = self.params + + # 線検出 + result = detect_line(frame, self.image_params) + self._last_result = result + + if not result.detected or result.row_centers is None: + return SteeringOutput( + throttle=0.0, steer=0.0, + ) + + centers = result.row_centers + + # 有効な点(NaN でない行)を抽出 + valid = ~np.isnan(centers) + ys = np.where(valid)[0].astype(float) + xs = centers[valid] + + if len(ys) < 2: + return SteeringOutput( + throttle=0.0, steer=0.0, + ) + + # Theil-Sen 直線近似 + slope, intercept = theil_sen_fit(ys, xs) + + center_x = config.FRAME_WIDTH / 2.0 + h = len(centers) + + # 直線上の 2 点の x 座標を取得 + near_y = h * p.near_ratio + far_y = h * p.far_ratio + near_x = slope * near_y + intercept + far_x = slope * far_y + intercept + + # 各点の偏差(正: 線が左にある → 右に曲がる) + near_err = (center_x - near_x) / center_x + far_err = (center_x - far_x) / center_x + + # 操舵量 + steer = p.k_near * near_err + p.k_far * far_err + steer = max(-1.0, min(1.0, steer)) + + # レートリミッター + delta = steer - self._prev_steer + max_delta = p.max_steer_rate + delta = max(-max_delta, min(max_delta, delta)) + steer = self._prev_steer + delta + + # 速度制御(2点の x 差でカーブ度合いを判定) + curve = abs(near_x - far_x) / center_x + throttle = p.max_throttle - p.speed_k * curve + throttle = max(0.0, throttle) + + self._prev_steer = steer + + return SteeringOutput( + throttle=throttle, steer=steer, + ) + + def reset(self) -> None: + """内部状態をリセットする""" + self._prev_steer = 0.0 + self._last_result = None + reset_valley_tracker() + + @property + def last_detect_result( + self, + ) -> LineDetectResult | None: + """直近の線検出結果を取得する""" + return self._last_result diff --git a/src/common/steering/recovery.py b/src/common/steering/recovery.py new file mode 100644 index 0000000..57087bf --- /dev/null +++ b/src/common/steering/recovery.py @@ -0,0 +1,104 @@ +""" +recovery +コースアウト復帰のパラメータと判定ロジックを定義するモジュール +黒線を一定時間検出できなかった場合に, +最後に検出した方向へ旋回しながら走行して復帰する +""" + +import time +from dataclasses import dataclass + +from common.steering.base import SteeringOutput + + +@dataclass +class RecoveryParams: + """コースアウト復帰のパラメータ + + Attributes: + enabled: 復帰機能の有効/無効 + timeout_sec: 線を見失ってから復帰動作を開始するまでの時間 + steer_amount: 復帰時の操舵量(0.0 ~ 1.0) + throttle: 復帰時の速度(負: 後退,正: 前進) + """ + enabled: bool = True + timeout_sec: float = 0.5 + steer_amount: float = 0.5 + throttle: float = -0.3 + + +class RecoveryController: + """コースアウト復帰の判定と操舵量算出を行うクラス + + 自動操縦中にフレームごとに呼び出し, + 線検出の成否を記録する.一定時間検出できなかった場合に + 復帰用の操舵量を返す + """ + + def __init__( + self, + params: RecoveryParams | None = None, + ) -> None: + self.params: RecoveryParams = ( + params or RecoveryParams() + ) + self._last_detected_time: float = 0.0 + self._last_error_sign: float = 0.0 + self._is_recovering: bool = False + + def reset(self) -> None: + """内部状態をリセットする + + 自動操縦の開始時に呼び出す + """ + self._last_detected_time = time.time() + self._last_error_sign = 0.0 + self._is_recovering = False + + def update( + self, + detected: bool, + position_error: float = 0.0, + ) -> SteeringOutput | None: + """検出結果を記録し,復帰が必要なら操舵量を返す + + Args: + detected: 線が検出できたか + position_error: 検出時の位置偏差(正: 線が左) + + Returns: + 復帰操舵量,または None(通常走行を継続) + """ + if not self.params.enabled: + return None + + now = time.time() + + if detected: + self._last_detected_time = now + if position_error != 0.0: + self._last_error_sign = ( + 1.0 if position_error > 0 else -1.0 + ) + self._is_recovering = False + return None + + elapsed = now - self._last_detected_time + if elapsed < self.params.timeout_sec: + return None + + # 復帰モード: 最後に検出した方向へ旋回 + self._is_recovering = True + steer = ( + self._last_error_sign + * self.params.steer_amount + ) + return SteeringOutput( + throttle=self.params.throttle, + steer=steer, + ) + + @property + def is_recovering(self) -> bool: + """現在復帰動作中かどうかを返す""" + return self._is_recovering diff --git a/src/common/steering/ts_pd_control.py b/src/common/steering/ts_pd_control.py new file mode 100644 index 0000000..30944c3 --- /dev/null +++ b/src/common/steering/ts_pd_control.py @@ -0,0 +1,162 @@ +""" +ts_pd_control +Theil-Sen 直線近似による PD 制御モジュール +行中心点に Theil-Sen 直線をフィッティングし, +位置偏差・傾き・微分項から操舵量を算出する +""" + +import time +from dataclasses import dataclass + +import numpy as np + +from common import config +from common.steering.base import SteeringBase, SteeringOutput +from common.vision.fitting import theil_sen_fit +from common.vision.line_detector import ( + ImageParams, + LineDetectResult, + detect_line, + reset_valley_tracker, +) + + +@dataclass +class TsPdParams: + """Theil-Sen PD 制御のパラメータ + + Attributes: + kp: 位置偏差ゲイン + kh: 傾き(Theil-Sen slope)ゲイン + kd: 微分ゲイン + max_steer_rate: 1フレームあたりの最大操舵変化量 + max_throttle: 直線での最大速度 + speed_k: 傾きベースの減速係数 + """ + kp: float = 0.5 + kh: float = 0.3 + kd: float = 0.1 + max_steer_rate: float = 0.1 + max_throttle: float = 0.4 + speed_k: float = 2.0 + + +class TsPdControl(SteeringBase): + """Theil-Sen 直線近似による PD 制御クラス + + 行中心点から Theil-Sen 直線近似を行い, + 画像下端での位置偏差と直線の傾きから PD 制御で操舵量を計算する + """ + + def __init__( + self, + params: TsPdParams | None = None, + image_params: ImageParams | None = None, + ) -> None: + self.params: TsPdParams = ( + params or TsPdParams() + ) + self.image_params: ImageParams = ( + image_params or ImageParams() + ) + self._prev_error: float = 0.0 + self._prev_time: float = 0.0 + self._prev_steer: float = 0.0 + self._last_result: LineDetectResult | None = None + + def compute( + self, frame: np.ndarray, + ) -> SteeringOutput: + """カメラ画像から Theil-Sen PD 制御で操舵量を計算する + + Args: + frame: グレースケールのカメラ画像 + + Returns: + 計算された操舵量 + """ + p = self.params + + # 線検出 + result = detect_line(frame, self.image_params) + self._last_result = result + + if not result.detected or result.row_centers is None: + return SteeringOutput( + throttle=0.0, steer=0.0, + ) + + centers = result.row_centers + + # 有効な点(NaN でない行)を抽出 + valid = ~np.isnan(centers) + ys = np.where(valid)[0].astype(float) + xs = centers[valid] + + if len(ys) < 2: + return SteeringOutput( + throttle=0.0, steer=0.0, + ) + + # Theil-Sen 直線近似: x = slope * y + intercept + slope, intercept = theil_sen_fit(ys, xs) + + center_x = config.FRAME_WIDTH / 2.0 + h = len(centers) + + # 画像下端での位置偏差 + bottom_x = slope * (h - 1) + intercept + position_error = (center_x - bottom_x) / center_x + + # 操舵量: P 項(位置偏差)+ Heading 項(傾き) + error = p.kp * position_error + p.kh * slope + + # 時間差分の計算 + now = time.time() + dt = ( + now - self._prev_time + if self._prev_time > 0 + else 0.033 + ) + dt = max(dt, 0.001) + + # D 項(微分項) + derivative = (error - self._prev_error) / dt + steer = error + p.kd * derivative + + # 操舵量のクランプ + steer = max(-1.0, min(1.0, steer)) + + # レートリミッター + delta = steer - self._prev_steer + max_delta = p.max_steer_rate + delta = max(-max_delta, min(max_delta, delta)) + steer = self._prev_steer + delta + + # 速度制御(傾きベース) + throttle = p.max_throttle - p.speed_k * abs(slope) + throttle = max(0.0, throttle) + + # 状態の更新 + self._prev_error = error + self._prev_time = now + self._prev_steer = steer + + return SteeringOutput( + throttle=throttle, steer=steer, + ) + + def reset(self) -> None: + """内部状態をリセットする""" + self._prev_error = 0.0 + self._prev_time = 0.0 + self._prev_steer = 0.0 + self._last_result = None + reset_valley_tracker() + + @property + def last_detect_result( + self, + ) -> LineDetectResult | None: + """直近の線検出結果を取得する""" + return self._last_result diff --git a/src/common/vision/__init__.py b/src/common/vision/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/src/common/vision/__init__.py diff --git a/src/common/vision/detectors/__init__.py b/src/common/vision/detectors/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/src/common/vision/detectors/__init__.py diff --git a/src/common/vision/detectors/blackhat.py b/src/common/vision/detectors/blackhat.py new file mode 100644 index 0000000..0860cd9 --- /dev/null +++ b/src/common/vision/detectors/blackhat.py @@ -0,0 +1,69 @@ +""" +blackhat +案A: Black-hat 中心型の線検出 +Black-hat 変換で背景より暗い構造を直接抽出し, +固定閾値 + 距離変換 + 行ごと中心抽出で検出する +""" + +import cv2 +import numpy as np + +from common.vision.line_detector import ( + ImageParams, + LineDetectResult, + fit_row_centers, +) +from common.vision.morphology import ( + apply_dist_mask, + apply_iso_closing, + apply_width_filter, +) + + +def detect_blackhat( + frame: np.ndarray, params: ImageParams, +) -> LineDetectResult: + """案A: Black-hat 中心型""" + # Black-hat 変換(暗い構造の抽出) + bh_k = params.blackhat_ksize | 1 + bh_kernel = cv2.getStructuringElement( + cv2.MORPH_ELLIPSE, (bh_k, bh_k), + ) + blackhat = cv2.morphologyEx( + frame, cv2.MORPH_BLACKHAT, bh_kernel, + ) + + # ガウシアンブラー + blur_k = params.blur_size | 1 + blurred = cv2.GaussianBlur( + blackhat, (blur_k, blur_k), 0, + ) + + # 固定閾値(Black-hat 後は線が白) + _, binary = cv2.threshold( + blurred, params.binary_thresh, 255, + cv2.THRESH_BINARY, + ) + + # 等方クロージング + 距離変換マスク + 幅フィルタ + binary = apply_iso_closing( + binary, params.iso_close_size, + ) + binary = apply_dist_mask( + binary, params.dist_thresh, + ) + if params.width_near > 0 and params.width_far > 0: + binary = apply_width_filter( + binary, + params.width_near, + params.width_far, + params.width_tolerance, + ) + + # 行ごと中心抽出 + フィッティング + return fit_row_centers( + binary, params.min_line_width, + median_ksize=params.median_ksize, + neighbor_thresh=params.neighbor_thresh, + residual_thresh=params.residual_thresh, + ) diff --git a/src/common/vision/detectors/current.py b/src/common/vision/detectors/current.py new file mode 100644 index 0000000..98c4310 --- /dev/null +++ b/src/common/vision/detectors/current.py @@ -0,0 +1,76 @@ +""" +current +現行手法: CLAHE + 固定閾値 + 全ピクセルフィッティング +""" + +import cv2 +import numpy as np + +from common.vision.line_detector import ( + DETECT_Y_END, + DETECT_Y_START, + MIN_FIT_PIXELS, + ImageParams, + LineDetectResult, + build_result, + no_detection, +) + + +def detect_current( + frame: np.ndarray, params: ImageParams, +) -> LineDetectResult: + """現行手法: CLAHE + 固定閾値 + 全ピクセルフィッティング""" + # CLAHE でコントラスト強調 + clahe = cv2.createCLAHE( + clipLimit=params.clahe_clip, + tileGridSize=( + params.clahe_grid, + params.clahe_grid, + ), + ) + enhanced = clahe.apply(frame) + + # ガウシアンブラー + blur_k = params.blur_size | 1 + blurred = cv2.GaussianBlur( + enhanced, (blur_k, blur_k), 0, + ) + + # 固定閾値で二値化(黒線を白に反転) + _, binary = cv2.threshold( + blurred, params.binary_thresh, 255, + cv2.THRESH_BINARY_INV, + ) + + # オープニング(孤立ノイズ除去) + if params.open_size >= 3: + open_k = params.open_size | 1 + open_kernel = cv2.getStructuringElement( + cv2.MORPH_ELLIPSE, (open_k, open_k), + ) + binary = cv2.morphologyEx( + binary, cv2.MORPH_OPEN, open_kernel, + ) + + # 横方向クロージング(途切れ補間) + if params.close_width >= 3: + close_h = max(params.close_height | 1, 1) + close_kernel = cv2.getStructuringElement( + cv2.MORPH_ELLIPSE, + (params.close_width, close_h), + ) + binary = cv2.morphologyEx( + binary, cv2.MORPH_CLOSE, close_kernel, + ) + + # 全ピクセルフィッティング + region = binary[DETECT_Y_START:DETECT_Y_END, :] + ys_local, xs = np.where(region > 0) + + if len(xs) < MIN_FIT_PIXELS: + return no_detection(binary) + + ys = ys_local + DETECT_Y_START + coeffs = np.polyfit(ys, xs, 2) + return build_result(coeffs, binary) diff --git a/src/common/vision/detectors/dual_norm.py b/src/common/vision/detectors/dual_norm.py new file mode 100644 index 0000000..c772ebe --- /dev/null +++ b/src/common/vision/detectors/dual_norm.py @@ -0,0 +1,89 @@ +""" +dual_norm +案B: 二重正規化型の線検出 +背景除算で照明勾配を除去し, +適応的閾値で局所ムラにも対応する二重防壁構成 +""" + +import cv2 +import numpy as np + +from common.vision.line_detector import ( + ImageParams, + LineDetectResult, + fit_row_centers, +) +from common.vision.morphology import ( + apply_dist_mask, + apply_iso_closing, + apply_staged_closing, + apply_width_filter, +) + + +def detect_dual_norm( + frame: np.ndarray, params: ImageParams, +) -> LineDetectResult: + """案B: 二重正規化型""" + # 背景除算正規化 + bg_k = params.bg_blur_ksize | 1 + bg = cv2.GaussianBlur( + frame, (bg_k, bg_k), 0, + ) + normalized = ( + frame.astype(np.float32) * 255.0 + / (bg.astype(np.float32) + 1.0) + ) + normalized = np.clip( + normalized, 0, 255, + ).astype(np.uint8) + + # 適応的閾値(ガウシアン,BINARY_INV) + block = max(params.adaptive_block | 1, 3) + binary = cv2.adaptiveThreshold( + normalized, 255, + cv2.ADAPTIVE_THRESH_GAUSSIAN_C, + cv2.THRESH_BINARY_INV, + block, params.adaptive_c, + ) + + # 固定閾値との AND(有効時のみ) + if params.global_thresh > 0: + _, global_mask = cv2.threshold( + normalized, params.global_thresh, + 255, cv2.THRESH_BINARY_INV, + ) + binary = cv2.bitwise_and(binary, global_mask) + + # 段階クロージング or 等方クロージング + if params.stage_min_area > 0: + binary = apply_staged_closing( + binary, + params.stage_close_small, + params.stage_min_area, + params.stage_close_large, + ) + else: + binary = apply_iso_closing( + binary, params.iso_close_size, + ) + + # 距離変換マスク + 幅フィルタ + binary = apply_dist_mask( + binary, params.dist_thresh, + ) + if params.width_near > 0 and params.width_far > 0: + binary = apply_width_filter( + binary, + params.width_near, + params.width_far, + params.width_tolerance, + ) + + # 行ごと中心抽出 + フィッティング + return fit_row_centers( + binary, params.min_line_width, + median_ksize=params.median_ksize, + neighbor_thresh=params.neighbor_thresh, + residual_thresh=params.residual_thresh, + ) diff --git a/src/common/vision/detectors/robust.py b/src/common/vision/detectors/robust.py new file mode 100644 index 0000000..04ebf58 --- /dev/null +++ b/src/common/vision/detectors/robust.py @@ -0,0 +1,69 @@ +""" +robust +案C: 最高ロバスト型の線検出 +Black-hat + 適応的閾値の二重正規化に加え, +RANSAC で外れ値を除去する最もロバストな構成 +""" + +import cv2 +import numpy as np + +from common.vision.line_detector import ( + ImageParams, + LineDetectResult, + fit_row_centers, +) +from common.vision.morphology import ( + apply_dist_mask, + apply_iso_closing, + apply_width_filter, +) + + +def detect_robust( + frame: np.ndarray, params: ImageParams, +) -> LineDetectResult: + """案C: 最高ロバスト型""" + # Black-hat 変換 + bh_k = params.blackhat_ksize | 1 + bh_kernel = cv2.getStructuringElement( + cv2.MORPH_ELLIPSE, (bh_k, bh_k), + ) + blackhat = cv2.morphologyEx( + frame, cv2.MORPH_BLACKHAT, bh_kernel, + ) + + # 適応的閾値(BINARY: Black-hat 後は線が白) + block = max(params.adaptive_block | 1, 3) + binary = cv2.adaptiveThreshold( + blackhat, 255, + cv2.ADAPTIVE_THRESH_GAUSSIAN_C, + cv2.THRESH_BINARY, + block, -params.adaptive_c, + ) + + # 等方クロージング + 距離変換マスク + 幅フィルタ + binary = apply_iso_closing( + binary, params.iso_close_size, + ) + binary = apply_dist_mask( + binary, params.dist_thresh, + ) + if params.width_near > 0 and params.width_far > 0: + binary = apply_width_filter( + binary, + params.width_near, + params.width_far, + params.width_tolerance, + ) + + # 行ごと中央値抽出 + RANSAC フィッティング + return fit_row_centers( + binary, params.min_line_width, + use_median=True, + ransac_thresh=params.ransac_thresh, + ransac_iter=params.ransac_iter, + median_ksize=params.median_ksize, + neighbor_thresh=params.neighbor_thresh, + residual_thresh=params.residual_thresh, + ) diff --git a/src/common/vision/detectors/valley.py b/src/common/vision/detectors/valley.py new file mode 100644 index 0000000..0368c5b --- /dev/null +++ b/src/common/vision/detectors/valley.py @@ -0,0 +1,324 @@ +""" +valley +案D: 谷検出+追跡型の線検出 +各行の輝度信号から谷(暗い領域)を直接検出し, +時系列追跡で安定性を確保する.二値化を使用しない +""" + +import cv2 +import numpy as np + +from common import config +from common.vision.fitting import clean_and_fit +from common.vision.line_detector import ( + DETECT_Y_END, + DETECT_Y_START, + MIN_FIT_ROWS, + ImageParams, + LineDetectResult, + build_result, + no_detection, +) + + +class ValleyTracker: + """谷検出の時系列追跡を管理するクラス + + 前フレームの多項式係数を保持し,予測・平滑化・ + 検出失敗時のコースティングを提供する + """ + + def __init__(self) -> None: + self._prev_coeffs: np.ndarray | None = None + self._smoothed_coeffs: np.ndarray | None = None + self._frames_lost: int = 0 + + def predict_x(self, y: float) -> float | None: + """前フレームの多項式から x 座標を予測する + + Args: + y: 画像の y 座標 + + Returns: + 予測 x 座標(履歴なしの場合は None) + """ + if self._smoothed_coeffs is None: + return None + return float(np.poly1d(self._smoothed_coeffs)(y)) + + def update( + self, + coeffs: np.ndarray, + alpha: float, + ) -> np.ndarray: + """検出成功時に状態を更新する + + Args: + coeffs: 今フレームのフィッティング係数 + alpha: EMA 係数(1.0 で平滑化なし) + + Returns: + 平滑化後の多項式係数 + """ + self._frames_lost = 0 + if self._smoothed_coeffs is None: + self._smoothed_coeffs = coeffs.copy() + else: + self._smoothed_coeffs = ( + alpha * coeffs + + (1.0 - alpha) * self._smoothed_coeffs + ) + self._prev_coeffs = self._smoothed_coeffs.copy() + return self._smoothed_coeffs + + def coast( + self, max_frames: int, + ) -> LineDetectResult | None: + """検出失敗時に予測結果を返す + + Args: + max_frames: 予測を継続する最大フレーム数 + + Returns: + 予測による結果(継続不可の場合は None) + """ + if self._smoothed_coeffs is None: + return None + self._frames_lost += 1 + if self._frames_lost > max_frames: + return None + h = config.FRAME_HEIGHT + w = config.FRAME_WIDTH + blank = np.zeros((h, w), dtype=np.uint8) + return build_result(self._smoothed_coeffs, blank) + + def reset(self) -> None: + """追跡状態をリセットする""" + self._prev_coeffs = None + self._smoothed_coeffs = None + self._frames_lost = 0 + + +_valley_tracker = ValleyTracker() + + +def reset_valley_tracker() -> None: + """谷検出の追跡状態をリセットする""" + _valley_tracker.reset() + + +def _find_row_valley( + row: np.ndarray, + min_depth: int, + expected_width: float, + width_tolerance: float, + predicted_x: float | None, + max_deviation: int, +) -> tuple[float, float] | None: + """1行の輝度信号から最適な谷を検出する + + Args: + row: スムージング済みの1行輝度信号 + min_depth: 最小谷深度 + expected_width: 期待線幅(px,0 で幅フィルタ無効) + width_tolerance: 幅フィルタの上限倍率 + predicted_x: 追跡による予測 x 座標(None で無効) + max_deviation: 予測からの最大許容偏差 + + Returns: + (谷の中心x, 谷の深度) または None + """ + n = len(row) + if n < 5: + return None + + signal = row.astype(np.float32) + + # 極小値を検出(前後より小さい点) + left = signal[:-2] + center = signal[1:-1] + right = signal[2:] + minima_mask = (center <= left) & (center <= right) + minima_indices = np.where(minima_mask)[0] + 1 + + if len(minima_indices) == 0: + return None + + best: tuple[float, float] | None = None + best_score = -1.0 + + for idx in minima_indices: + val = signal[idx] + + # 左の肩を探す + left_shoulder = idx + for i in range(idx - 1, -1, -1): + if signal[i] < signal[i + 1]: + break + left_shoulder = i + # 右の肩を探す + right_shoulder = idx + for i in range(idx + 1, n): + if signal[i] < signal[i - 1]: + break + right_shoulder = i + + # 谷の深度(肩の平均 - 谷底) + shoulder_avg = ( + signal[left_shoulder] + signal[right_shoulder] + ) / 2.0 + depth = shoulder_avg - val + if depth < min_depth: + continue + + # 谷の幅 + width = right_shoulder - left_shoulder + center_x = (left_shoulder + right_shoulder) / 2.0 + + # 幅フィルタ + if expected_width > 0: + max_w = expected_width * width_tolerance + min_w = expected_width / width_tolerance + if width > max_w or width < min_w: + continue + + # 予測との偏差チェック + if predicted_x is not None: + if abs(center_x - predicted_x) > max_deviation: + continue + + # スコア: 深度優先,予測がある場合は近さも考慮 + score = float(depth) + if predicted_x is not None: + dist = abs(center_x - predicted_x) + score += max(0.0, max_deviation - dist) + + if score > best_score: + best_score = score + best = (center_x, float(depth)) + + return best + + +def _build_valley_binary( + shape: tuple[int, int], + centers_y: list[int], + centers_x: list[float], +) -> np.ndarray: + """谷検出結果からデバッグ用二値画像を生成する + + Args: + shape: 出力画像の (高さ, 幅) + centers_y: 検出行の y 座標リスト + centers_x: 検出行の中心 x 座標リスト + + Returns: + デバッグ用二値画像 + """ + binary = np.zeros(shape, dtype=np.uint8) + half_w = 3 + w = shape[1] + for y, cx in zip(centers_y, centers_x): + x0 = max(0, int(cx) - half_w) + x1 = min(w, int(cx) + half_w + 1) + binary[y, x0:x1] = 255 + return binary + + +def detect_valley( + frame: np.ndarray, params: ImageParams, +) -> LineDetectResult: + """案D: 谷検出+追跡型""" + h, w = frame.shape[:2] + + # 行ごとにガウシアン平滑化するため画像全体をブラー + gauss_k = params.valley_gauss_ksize | 1 + blurred = cv2.GaussianBlur( + frame, (gauss_k, 1), 0, + ) + + # 透視補正の期待幅を計算するための準備 + use_width = ( + params.width_near > 0 and params.width_far > 0 + ) + detect_h = DETECT_Y_END - DETECT_Y_START + denom = max(detect_h - 1, 1) + + centers_y: list[int] = [] + centers_x: list[float] = [] + depths: list[float] = [] + + for y in range(DETECT_Y_START, DETECT_Y_END): + row = blurred[y] + + # 期待幅の計算 + if use_width: + t = (DETECT_Y_END - 1 - y) / denom + expected_w = float(params.width_far) + ( + float(params.width_near) + - float(params.width_far) + ) * t + else: + expected_w = 0.0 + + # 予測 x 座標 + predicted_x = _valley_tracker.predict_x( + float(y), + ) + + result = _find_row_valley( + row, + params.valley_min_depth, + expected_w, + params.width_tolerance, + predicted_x, + params.valley_max_deviation, + ) + if result is not None: + centers_y.append(y) + centers_x.append(result[0]) + depths.append(result[1]) + + # デバッグ用二値画像 + debug_binary = _build_valley_binary( + (h, w), centers_y, centers_x, + ) + + if len(centers_y) < MIN_FIT_ROWS: + coasted = _valley_tracker.coast( + params.valley_coast_frames, + ) + if coasted is not None: + coasted.binary_image = debug_binary + return coasted + return no_detection(debug_binary) + + cy = np.array(centers_y, dtype=np.float64) + cx = np.array(centers_x, dtype=np.float64) + w_arr = np.array(depths, dtype=np.float64) + + # ロバストフィッティング(深度を重みに使用) + coeffs = clean_and_fit( + cy, cx, + median_ksize=params.median_ksize, + neighbor_thresh=params.neighbor_thresh, + residual_thresh=params.residual_thresh, + weights=w_arr, + ransac_thresh=params.ransac_thresh, + ransac_iter=params.ransac_iter, + ) + if coeffs is None: + coasted = _valley_tracker.coast( + params.valley_coast_frames, + ) + if coasted is not None: + coasted.binary_image = debug_binary + return coasted + return no_detection(debug_binary) + + # EMA で平滑化 + smoothed = _valley_tracker.update( + coeffs, params.valley_ema_alpha, + ) + + return build_result(smoothed, debug_binary) diff --git a/src/common/vision/fitting.py b/src/common/vision/fitting.py new file mode 100644 index 0000000..c55139d --- /dev/null +++ b/src/common/vision/fitting.py @@ -0,0 +1,212 @@ +""" +fitting +直線・曲線近似の共通ユーティリティモジュール +Theil-Sen 推定,RANSAC,外れ値除去付きフィッティングを提供する +""" + +import numpy as np + +# フィッティングに必要な最小行数 +MIN_FIT_ROWS: int = 10 + +# 近傍外れ値除去の設定 +NEIGHBOR_HALF_WINDOW: int = 3 +NEIGHBOR_FILTER_PASSES: int = 3 + +# 残差ベース反復除去の最大回数 +RESIDUAL_REMOVAL_ITERATIONS: int = 5 + + +def theil_sen_fit( + y: np.ndarray, + x: np.ndarray, +) -> tuple[float, float]: + """Theil-Sen 推定で直線 x = slope * y + intercept を求める + + 全ペアの傾きの中央値を使い,外れ値に強い直線近似を行う + + Args: + y: y 座標の配列(行番号) + x: x 座標の配列(各行の中心) + + Returns: + (slope, intercept) のタプル + """ + n = len(y) + slopes = [] + for i in range(n): + for j in range(i + 1, n): + dy = y[j] - y[i] + if dy != 0: + slopes.append((x[j] - x[i]) / dy) + + if len(slopes) == 0: + return 0.0, float(np.median(x)) + + slope = float(np.median(slopes)) + intercept = float(np.median(x - slope * y)) + return slope, intercept + + +def ransac_polyfit( + ys: np.ndarray, xs: np.ndarray, + degree: int, n_iter: int, thresh: float, +) -> np.ndarray | None: + """RANSAC で外れ値を除去して多項式フィッティング + + Args: + ys: y 座標配列 + xs: x 座標配列 + degree: 多項式の次数 + n_iter: 反復回数 + thresh: 外れ値判定閾値(ピクセル) + + Returns: + 多項式係数(フィッティング失敗時は None) + """ + n = len(ys) + sample_size = degree + 1 + if n < sample_size: + return None + + best_coeffs: np.ndarray | None = None + best_inliers = 0 + rng = np.random.default_rng() + + for _ in range(n_iter): + idx = rng.choice(n, sample_size, replace=False) + coeffs = np.polyfit(ys[idx], xs[idx], degree) + poly = np.poly1d(coeffs) + residuals = np.abs(xs - poly(ys)) + n_inliers = int(np.sum(residuals < thresh)) + if n_inliers > best_inliers: + best_inliers = n_inliers + best_coeffs = coeffs + + # インライアで再フィッティング + if best_coeffs is not None: + poly = np.poly1d(best_coeffs) + inlier_mask = np.abs(xs - poly(ys)) < thresh + if np.sum(inlier_mask) >= sample_size: + best_coeffs = np.polyfit( + ys[inlier_mask], + xs[inlier_mask], + degree, + ) + + return best_coeffs + + +def clean_and_fit( + cy: np.ndarray, + cx: np.ndarray, + median_ksize: int, + neighbor_thresh: float, + residual_thresh: float = 0.0, + weights: np.ndarray | None = None, + ransac_thresh: float = 0.0, + ransac_iter: int = 0, +) -> np.ndarray | None: + """外れ値除去+重み付きフィッティングを行う + + 全検出手法で共通に使えるロバストなフィッティング + (1) 移動メディアンフィルタでスパイクを平滑化 + (2) 近傍中央値からの偏差で外れ値を除去(複数パス) + (3) 重み付き最小二乗(または RANSAC)でフィッティング + (4) 残差ベースの反復除去で外れ値を最終除去 + + Args: + cy: 中心点の y 座標配列 + cx: 中心点の x 座標配列 + median_ksize: 移動メディアンのカーネルサイズ(0 で無効) + neighbor_thresh: 近傍外れ値除去の閾値 px(0 で無効) + residual_thresh: 残差除去の閾値 px(0 で無効) + weights: 各点の信頼度(None で均等) + ransac_thresh: RANSAC 閾値(0 以下で無効) + ransac_iter: RANSAC 反復回数 + + Returns: + 多項式係数(フィッティング失敗時は None) + """ + if len(cy) < MIN_FIT_ROWS: + return None + + cx_clean = cx.copy() + mask = np.ones(len(cy), dtype=bool) + + # (1) 移動メディアンフィルタ + if median_ksize >= 3: + k = median_ksize | 1 + half = k // 2 + for i in range(len(cx_clean)): + lo = max(0, i - half) + hi = min(len(cx_clean), i + half + 1) + cx_clean[i] = float(np.median(cx[lo:hi])) + + # (2) 近傍外れ値除去(複数パス) + if neighbor_thresh > 0: + half_n = NEIGHBOR_HALF_WINDOW + for _ in range(NEIGHBOR_FILTER_PASSES): + new_mask = np.ones(len(cx_clean), dtype=bool) + for i in range(len(cx_clean)): + if not mask[i]: + continue + lo = max(0, i - half_n) + hi = min(len(cx_clean), i + half_n + 1) + neighbors = cx_clean[lo:hi][mask[lo:hi]] + if len(neighbors) == 0: + new_mask[i] = False + continue + local_med = float(np.median(neighbors)) + if ( + abs(cx_clean[i] - local_med) + > neighbor_thresh + ): + new_mask[i] = False + if np.array_equal(mask, mask & new_mask): + break + mask = mask & new_mask + + cy = cy[mask] + cx_clean = cx_clean[mask] + if weights is not None: + weights = weights[mask] + + if len(cy) < MIN_FIT_ROWS: + return None + + # (3) フィッティング + if ransac_thresh > 0 and ransac_iter > 0: + coeffs = ransac_polyfit( + cy, cx_clean, 2, ransac_iter, ransac_thresh, + ) + elif weights is not None: + coeffs = np.polyfit(cy, cx_clean, 2, w=weights) + else: + coeffs = np.polyfit(cy, cx_clean, 2) + + if coeffs is None: + return None + + # (4) 残差ベースの反復除去 + if residual_thresh > 0: + for _ in range(RESIDUAL_REMOVAL_ITERATIONS): + poly = np.poly1d(coeffs) + residuals = np.abs(cx_clean - poly(cy)) + inlier = residuals < residual_thresh + if np.all(inlier): + break + if np.sum(inlier) < MIN_FIT_ROWS: + break + cy = cy[inlier] + cx_clean = cx_clean[inlier] + if weights is not None: + weights = weights[inlier] + if weights is not None: + coeffs = np.polyfit( + cy, cx_clean, 2, w=weights, + ) + else: + coeffs = np.polyfit(cy, cx_clean, 2) + + return coeffs diff --git a/src/common/vision/intersection.py b/src/common/vision/intersection.py new file mode 100644 index 0000000..20a94e3 --- /dev/null +++ b/src/common/vision/intersection.py @@ -0,0 +1,75 @@ +""" +intersection +十字路分類モデルの読み込みと推論を行うモジュール + +学習済みモデルとスケーラを読み込み, +二値画像から十字路かどうかを判定する +""" + +from pathlib import Path + +import numpy as np + +from common.json_utils import PARAMS_DIR + +# モデル・スケーラの保存先 +_MODEL_PATH: Path = PARAMS_DIR / "intersection_model.pkl" +_SCALER_PATH: Path = PARAMS_DIR / "intersection_scaler.pkl" + + +class IntersectionClassifier: + """十字路分類器 + + 学習済みモデルを読み込み,二値画像から + 十字路かどうかを判定する + scikit-learn を遅延インポートして起動時間を短縮する + """ + + def __init__(self) -> None: + self._model: object | None = None + self._scaler: object | None = None + self._available: bool = False + + def load(self) -> None: + """モデルとスケーラを読み込む(遅延呼び出し用)""" + if not _MODEL_PATH.exists(): + print( + f" モデルが見つかりません: {_MODEL_PATH}" + ) + return + if not _SCALER_PATH.exists(): + print( + f" スケーラが見つかりません: {_SCALER_PATH}" + ) + return + try: + import joblib + + self._model = joblib.load(_MODEL_PATH) + self._scaler = joblib.load(_SCALER_PATH) + self._available = True + except Exception as e: + print(f" モデル読み込みエラー: {e}") + + @property + def available(self) -> bool: + """モデルが利用可能かどうか""" + return self._available + + def predict(self, binary_image: np.ndarray) -> bool: + """二値画像が十字路かどうかを判定する + + Args: + binary_image: 40×30 の二値画像(0/255) + + Returns: + 十字路なら True + """ + if not self._available: + return False + flat = (binary_image.flatten() / 255.0).astype( + np.float32, + ) + x = self._scaler.transform(flat.reshape(1, -1)) + pred = self._model.predict(x) + return bool(pred[0] == 1) diff --git a/src/common/vision/line_detector.py b/src/common/vision/line_detector.py new file mode 100644 index 0000000..dbc9ec6 --- /dev/null +++ b/src/common/vision/line_detector.py @@ -0,0 +1,364 @@ +""" +line_detector +カメラ画像から黒線の位置を検出するモジュール +複数の検出手法を切り替えて使用できる + +公開 API: + ImageParams, LineDetectResult, detect_line, + reset_valley_tracker, DETECT_METHODS +""" + +from dataclasses import dataclass + +import cv2 +import numpy as np + +from common import config +from common.vision.fitting import clean_and_fit + +# 検出領域の y 範囲(画像全体) +DETECT_Y_START: int = 0 +DETECT_Y_END: int = config.FRAME_HEIGHT + +# フィッティングに必要な最小数 +MIN_FIT_PIXELS: int = 50 +MIN_FIT_ROWS: int = 10 + +# 検出手法の定義(キー: 識別子,値: 表示名) +DETECT_METHODS: dict[str, str] = { + "current": "現行(CLAHE + 固定閾値)", + "blackhat": "案A(Black-hat 中心)", + "dual_norm": "案B(二重正規化)", + "robust": "案C(最高ロバスト)", + "valley": "案D(谷検出+追跡)", +} + + +@dataclass +class ImageParams: + """二値化パラメータ + + Attributes: + method: 検出手法の識別子 + clahe_clip: CLAHE のコントラスト増幅上限 + clahe_grid: CLAHE の局所領域分割数 + blur_size: ガウシアンブラーのカーネルサイズ(奇数) + binary_thresh: 二値化の閾値 + open_size: オープニングのカーネルサイズ + close_width: クロージングの横幅 + close_height: クロージングの高さ + blackhat_ksize: Black-hat のカーネルサイズ + bg_blur_ksize: 背景除算のブラーカーネルサイズ + global_thresh: 固定閾値(0 で無効,適応的閾値との AND) + adaptive_block: 適応的閾値のブロックサイズ + adaptive_c: 適応的閾値の定数 C + iso_close_size: 等方クロージングのカーネルサイズ + dist_thresh: 距離変換の閾値 + min_line_width: 行ごと中心抽出の最小線幅 + stage_close_small: 段階クロージング第1段のサイズ + stage_min_area: 孤立除去の最小面積(0 で無効) + stage_close_large: 段階クロージング第2段のサイズ(0 で無効) + ransac_thresh: RANSAC の外れ値判定閾値 + ransac_iter: RANSAC の反復回数 + width_near: 画像下端での期待線幅(px,0 で無効) + width_far: 画像上端での期待線幅(px,0 で無効) + width_tolerance: 幅フィルタの上限倍率 + median_ksize: 中心点列の移動メディアンフィルタサイズ(0 で無効) + neighbor_thresh: 近傍外れ値除去の閾値(px,0 で無効) + residual_thresh: 残差反復除去の閾値(px,0 で無効) + valley_gauss_ksize: 谷検出の行ごとガウシアンカーネルサイズ + valley_min_depth: 谷として認識する最小深度 + valley_max_deviation: 追跡予測からの最大許容偏差(px) + valley_coast_frames: 検出失敗時の予測継続フレーム数 + valley_ema_alpha: 多項式係数の指数移動平均係数 + """ + + # 検出手法 + method: str = "current" + + # 現行手法パラメータ + clahe_clip: float = 2.0 + clahe_grid: int = 8 + blur_size: int = 5 + binary_thresh: int = 80 + open_size: int = 5 + close_width: int = 25 + close_height: int = 3 + + # 案A/C: Black-hat + blackhat_ksize: int = 45 + + # 案B: 背景除算 + bg_blur_ksize: int = 101 + global_thresh: int = 0 + + # 案B/C: 適応的閾値 + adaptive_block: int = 51 + adaptive_c: int = 10 + + # 案A/B/C: 後処理 + iso_close_size: int = 15 + dist_thresh: float = 3.0 + min_line_width: int = 3 + + # 案B: 段階クロージング + stage_close_small: int = 5 + stage_min_area: int = 0 + stage_close_large: int = 0 + + # 案C: RANSAC + ransac_thresh: float = 5.0 + ransac_iter: int = 50 + + # ロバストフィッティング(全手法共通) + median_ksize: int = 7 + neighbor_thresh: float = 10.0 + residual_thresh: float = 8.0 + + # 透視補正付き幅フィルタ(0 で無効) + width_near: int = 0 + width_far: int = 0 + width_tolerance: float = 1.8 + + # 案D: 谷検出+追跡 + valley_gauss_ksize: int = 15 + valley_min_depth: int = 15 + valley_max_deviation: int = 40 + valley_coast_frames: int = 3 + valley_ema_alpha: float = 0.7 + + +@dataclass +class LineDetectResult: + """線検出の結果を格納するデータクラス + + Attributes: + detected: 線が検出できたか + position_error: 画像下端での位置偏差(-1.0~+1.0) + heading: 線の傾き(dx/dy,画像下端での値) + curvature: 線の曲率(d²x/dy²) + poly_coeffs: 多項式の係数(描画用,未検出時は None) + row_centers: 各行の線中心 x 座標(index=行番号, + NaN=その行に線なし,未検出時は None) + binary_image: 二値化後の画像(デバッグ用) + """ + + detected: bool + position_error: float + heading: float + curvature: float + poly_coeffs: np.ndarray | None + row_centers: np.ndarray | None + binary_image: np.ndarray | None + + +# ── 公開 API ────────────────────────────────────── + + +def detect_line( + frame: np.ndarray, + params: ImageParams | None = None, +) -> LineDetectResult: + """画像から黒線の位置を検出する + + params.method に応じて検出手法を切り替える + + Args: + frame: グレースケールのカメラ画像 + params: 二値化パラメータ(None でデフォルト) + + Returns: + 線検出の結果 + """ + if params is None: + params = ImageParams() + + method = params.method + if method == "blackhat": + from common.vision.detectors.blackhat import ( + detect_blackhat, + ) + return detect_blackhat(frame, params) + if method == "dual_norm": + from common.vision.detectors.dual_norm import ( + detect_dual_norm, + ) + return detect_dual_norm(frame, params) + if method == "robust": + from common.vision.detectors.robust import ( + detect_robust, + ) + return detect_robust(frame, params) + if method == "valley": + from common.vision.detectors.valley import ( + detect_valley, + ) + return detect_valley(frame, params) + + from common.vision.detectors.current import ( + detect_current, + ) + return detect_current(frame, params) + + +def reset_valley_tracker() -> None: + """谷検出の追跡状態をリセットする""" + from common.vision.detectors.valley import ( + reset_valley_tracker as _reset, + ) + _reset() + + +# ── 共通結果構築(各検出器から使用) ────────────── + + +def no_detection( + binary: np.ndarray, +) -> LineDetectResult: + """未検出の結果を返す""" + return LineDetectResult( + detected=False, + position_error=0.0, + heading=0.0, + curvature=0.0, + poly_coeffs=None, + row_centers=None, + binary_image=binary, + ) + + +def _extract_row_centers( + binary: np.ndarray, +) -> np.ndarray | None: + """二値画像の最大連結領域から各行の線中心を求める + + Args: + binary: 二値画像 + + Returns: + 各行の中心 x 座標(NaN=その行に線なし), + 最大領域が見つからない場合は None + """ + h, w = binary.shape[:2] + num_labels, labels, stats, _ = ( + cv2.connectedComponentsWithStats(binary) + ) + + if num_labels <= 1: + return None + + # 背景(ラベル 0)を除いた最大領域を取得 + areas = stats[1:, cv2.CC_STAT_AREA] + largest_label = int(np.argmax(areas)) + 1 + + # 最大領域のマスク + mask = (labels == largest_label).astype(np.uint8) + + # 各行の左右端から中心を計算 + centers = np.full(h, np.nan) + for y in range(h): + row = mask[y] + cols = np.where(row > 0)[0] + if len(cols) > 0: + centers[y] = (cols[0] + cols[-1]) / 2.0 + + return centers + + +def build_result( + coeffs: np.ndarray, + binary: np.ndarray, + row_centers: np.ndarray | None = None, +) -> LineDetectResult: + """多項式係数から LineDetectResult を構築する + + row_centers が None の場合は binary から自動抽出する + """ + poly = np.poly1d(coeffs) + center_x = config.FRAME_WIDTH / 2.0 + + # 画像下端での位置偏差 + x_bottom = poly(DETECT_Y_END) + position_error = (center_x - x_bottom) / center_x + + # 傾き: dx/dy(画像下端での値) + poly_deriv = poly.deriv() + heading = float(poly_deriv(DETECT_Y_END)) + + # 曲率: d²x/dy² + poly_deriv2 = poly_deriv.deriv() + curvature = float(poly_deriv2(DETECT_Y_END)) + + # row_centers が未提供なら binary から抽出 + if row_centers is None: + row_centers = _extract_row_centers(binary) + + return LineDetectResult( + detected=True, + position_error=position_error, + heading=heading, + curvature=curvature, + poly_coeffs=coeffs, + row_centers=row_centers, + binary_image=binary, + ) + + +def fit_row_centers( + binary: np.ndarray, + min_width: int, + use_median: bool = False, + ransac_thresh: float = 0.0, + ransac_iter: int = 0, + median_ksize: int = 0, + neighbor_thresh: float = 0.0, + residual_thresh: float = 0.0, +) -> LineDetectResult: + """行ごとの中心点に多項式をフィッティングする + + Args: + binary: 二値画像 + min_width: 線として認識する最小ピクセル数 + use_median: True の場合は中央値を使用 + ransac_thresh: RANSAC 閾値(0 以下で無効) + ransac_iter: RANSAC 反復回数 + median_ksize: 移動メディアンのカーネルサイズ + neighbor_thresh: 近傍外れ値除去の閾値 px + residual_thresh: 残差反復除去の閾値 px + + Returns: + 線検出の結果 + """ + region = binary[DETECT_Y_START:DETECT_Y_END, :] + centers_y: list[float] = [] + centers_x: list[float] = [] + + for y_local in range(region.shape[0]): + xs = np.where(region[y_local] > 0)[0] + if len(xs) < min_width: + continue + y = float(y_local + DETECT_Y_START) + centers_y.append(y) + if use_median: + centers_x.append(float(np.median(xs))) + else: + centers_x.append(float(np.mean(xs))) + + if len(centers_y) < MIN_FIT_ROWS: + return no_detection(binary) + + cy = np.array(centers_y) + cx = np.array(centers_x) + + coeffs = clean_and_fit( + cy, cx, + median_ksize=median_ksize, + neighbor_thresh=neighbor_thresh, + residual_thresh=residual_thresh, + ransac_thresh=ransac_thresh, + ransac_iter=ransac_iter, + ) + if coeffs is None: + return no_detection(binary) + + return build_result(coeffs, binary) diff --git a/src/common/vision/morphology.py b/src/common/vision/morphology.py new file mode 100644 index 0000000..578dcb6 --- /dev/null +++ b/src/common/vision/morphology.py @@ -0,0 +1,127 @@ +""" +morphology +二値画像の形態学的処理ユーティリティモジュール +""" + +import cv2 +import numpy as np + + +def apply_iso_closing( + binary: np.ndarray, size: int, +) -> np.ndarray: + """等方クロージングで穴を埋める + + Args: + binary: 二値画像 + size: カーネルサイズ + + Returns: + クロージング後の二値画像 + """ + if size < 3: + return binary + k = size | 1 + kernel = cv2.getStructuringElement( + cv2.MORPH_ELLIPSE, (k, k), + ) + return cv2.morphologyEx( + binary, cv2.MORPH_CLOSE, kernel, + ) + + +def apply_staged_closing( + binary: np.ndarray, + small_size: int, + min_area: int, + large_size: int, +) -> np.ndarray: + """段階クロージング: 小穴埋め → 孤立除去 → 大穴埋め + + Args: + binary: 二値画像 + small_size: 第1段クロージングのカーネルサイズ + min_area: 孤立領域除去の最小面積(0 で無効) + large_size: 第2段クロージングのカーネルサイズ(0 で無効) + + Returns: + 処理後の二値画像 + """ + result = apply_iso_closing(binary, small_size) + + if min_area > 0: + contours, _ = cv2.findContours( + result, cv2.RETR_EXTERNAL, + cv2.CHAIN_APPROX_SIMPLE, + ) + mask = np.zeros_like(result) + for cnt in contours: + if cv2.contourArea(cnt) >= min_area: + cv2.drawContours( + mask, [cnt], -1, 255, -1, + ) + result = mask + + result = apply_iso_closing(result, large_size) + + return result + + +def apply_width_filter( + binary: np.ndarray, + width_near: int, + width_far: int, + tolerance: float, +) -> np.ndarray: + """透視補正付き幅フィルタで広がりすぎた行を除外する + + Args: + binary: 二値画像 + width_near: 画像下端での期待線幅(px) + width_far: 画像上端での期待線幅(px) + tolerance: 上限倍率 + + Returns: + 幅フィルタ適用後の二値画像 + """ + result = binary.copy() + h = binary.shape[0] + denom = max(h - 1, 1) + + for y_local in range(h): + xs = np.where(binary[y_local] > 0)[0] + if len(xs) == 0: + continue + t = (h - 1 - y_local) / denom + expected = float(width_far) + ( + float(width_near) - float(width_far) + ) * t + max_w = expected * tolerance + actual_w = int(xs[-1]) - int(xs[0]) + 1 + if actual_w > max_w: + result[y_local] = 0 + + return result + + +def apply_dist_mask( + binary: np.ndarray, thresh: float, +) -> np.ndarray: + """距離変換で中心部のみを残す + + Args: + binary: 二値画像 + thresh: 距離の閾値(ピクセル) + + Returns: + 中心部のみの二値画像 + """ + if thresh <= 0: + return binary + dist = cv2.distanceTransform( + binary, cv2.DIST_L2, 5, + ) + _, mask = cv2.threshold( + dist, thresh, 255, cv2.THRESH_BINARY, + ) + return mask.astype(np.uint8) diff --git a/src/pc/gui/main_window.py b/src/pc/gui/main_window.py index e777710..84e1a47 100644 --- a/src/pc/gui/main_window.py +++ b/src/pc/gui/main_window.py @@ -44,11 +44,11 @@ save_recovery, save_ts_pd, ) -from pc.steering.pd_control import PdParams -from pc.steering.pursuit_control import PursuitParams -from pc.steering.recovery import RecoveryParams -from pc.steering.ts_pd_control import TsPdParams -from pc.vision.line_detector import ( +from common.steering.pd_control import PdParams +from common.steering.pursuit_control import PursuitParams +from common.steering.recovery import RecoveryParams +from common.steering.ts_pd_control import TsPdParams +from common.vision.line_detector import ( ImageParams, LineDetectResult, ) diff --git a/src/pc/gui/panels/control_param_panel.py b/src/pc/gui/panels/control_param_panel.py index bddedfe..11572b5 100644 --- a/src/pc/gui/panels/control_param_panel.py +++ b/src/pc/gui/panels/control_param_panel.py @@ -32,9 +32,9 @@ load_pd_presets, load_ts_pd_presets, ) -from pc.steering.pd_control import PdParams -from pc.steering.pursuit_control import PursuitParams -from pc.steering.ts_pd_control import TsPdParams +from common.steering.pd_control import PdParams +from common.steering.pursuit_control import PursuitParams +from common.steering.ts_pd_control import TsPdParams class ControlParamPanel(CollapsibleGroupBox): diff --git a/src/pc/gui/panels/image_param_panel.py b/src/pc/gui/panels/image_param_panel.py index 570daa3..877a6ad 100644 --- a/src/pc/gui/panels/image_param_panel.py +++ b/src/pc/gui/panels/image_param_panel.py @@ -32,7 +32,7 @@ delete_image_preset, load_image_presets, ) -from pc.vision.line_detector import ( +from common.vision.line_detector import ( DETECT_METHODS, ImageParams, reset_valley_tracker, diff --git a/src/pc/gui/panels/recovery_panel.py b/src/pc/gui/panels/recovery_panel.py index a56bb25..bf07380 100644 --- a/src/pc/gui/panels/recovery_panel.py +++ b/src/pc/gui/panels/recovery_panel.py @@ -14,7 +14,7 @@ from pc.gui.panels.collapsible_group_box import ( CollapsibleGroupBox, ) -from pc.steering.recovery import RecoveryParams +from common.steering.recovery import RecoveryParams class RecoveryPanel(CollapsibleGroupBox): diff --git a/src/pc/steering/auto_params.py b/src/pc/steering/auto_params.py index 9b10c4c..e8ddb24 100644 --- a/src/pc/steering/auto_params.py +++ b/src/pc/steering/auto_params.py @@ -19,11 +19,11 @@ from dataclasses import asdict from common.json_utils import PARAMS_DIR, read_json, write_json -from pc.steering.pd_control import PdParams -from pc.steering.pursuit_control import PursuitParams -from pc.steering.recovery import RecoveryParams -from pc.steering.ts_pd_control import TsPdParams -from pc.vision.line_detector import ImageParams +from common.steering.pd_control import PdParams +from common.steering.pursuit_control import PursuitParams +from common.steering.recovery import RecoveryParams +from common.steering.ts_pd_control import TsPdParams +from common.vision.line_detector import ImageParams from pc.vision.overlay import OverlayFlags # PD 制御パラメータファイル diff --git a/src/pc/steering/base.py b/src/pc/steering/base.py deleted file mode 100644 index 40eff1a..0000000 --- a/src/pc/steering/base.py +++ /dev/null @@ -1,50 +0,0 @@ -""" -base -操舵量計算の共通インターフェースを定義するモジュール -全ての操舵量計算クラスはこのインターフェースに従う -""" - -from abc import ABC, abstractmethod -from dataclasses import dataclass - -import numpy as np - - -@dataclass -class SteeringOutput: - """操舵量計算の出力を格納するデータクラス - - Attributes: - throttle: 前後方向の出力 (-1.0 ~ +1.0) - steer: 左右方向の出力 (-1.0 ~ +1.0) - """ - throttle: float - steer: float - - -class SteeringBase(ABC): - """操舵量計算の基底クラス - - 全ての操舵量計算クラスはこのクラスを継承し, - compute メソッドを実装する - """ - - @abstractmethod - def compute( - self, frame: np.ndarray, - ) -> SteeringOutput: - """カメラ画像から操舵量を計算する - - Args: - frame: BGR 形式のカメラ画像 - - Returns: - 計算された操舵量 - """ - - @abstractmethod - def reset(self) -> None: - """内部状態をリセットする - - 自動操縦の開始時に呼び出される - """ diff --git a/src/pc/steering/param_store.py b/src/pc/steering/param_store.py index 503bf7c..ea9cf04 100644 --- a/src/pc/steering/param_store.py +++ b/src/pc/steering/param_store.py @@ -7,9 +7,9 @@ from dataclasses import asdict, dataclass from common.json_utils import PARAMS_DIR, read_json, write_json -from pc.steering.pd_control import PdParams -from pc.steering.ts_pd_control import TsPdParams -from pc.vision.line_detector import ImageParams +from common.steering.pd_control import PdParams +from common.steering.ts_pd_control import TsPdParams +from common.vision.line_detector import ImageParams _PD_FILE = PARAMS_DIR / "presets_pd.json" _TS_PD_FILE = PARAMS_DIR / "presets_ts_pd.json" diff --git a/src/pc/steering/pd_control.py b/src/pc/steering/pd_control.py deleted file mode 100644 index 753c8e6..0000000 --- a/src/pc/steering/pd_control.py +++ /dev/null @@ -1,133 +0,0 @@ -""" -pd_control -PD 制御による操舵量計算モジュール -多項式フィッティングの位置・傾き・曲率から操舵量と速度を算出する -""" - -import time -from dataclasses import dataclass - -import numpy as np - -from pc.steering.base import SteeringBase, SteeringOutput -from pc.vision.line_detector import ( - ImageParams, - detect_line, - reset_valley_tracker, -) - - -@dataclass -class PdParams: - """PD 制御のパラメータ - - Attributes: - kp: 位置偏差ゲイン - kh: 傾き(ヘディング)ゲイン - kd: 微分ゲイン - max_steer_rate: 1フレームあたりの最大操舵変化量 - max_throttle: 直線での最大速度 - speed_k: 曲率ベースの減速係数 - """ - kp: float = 0.5 - kh: float = 0.3 - kd: float = 0.1 - max_steer_rate: float = 0.1 - max_throttle: float = 0.4 - speed_k: float = 0.3 - - -class PdControl(SteeringBase): - """PD 制御による操舵量計算クラス""" - - def __init__( - self, - params: PdParams | None = None, - image_params: ImageParams | None = None, - ) -> None: - self.params: PdParams = params or PdParams() - self.image_params: ImageParams = ( - image_params or ImageParams() - ) - self._prev_error: float = 0.0 - self._prev_time: float = 0.0 - self._prev_steer: float = 0.0 - self._last_result = None - - def compute( - self, frame: np.ndarray, - ) -> SteeringOutput: - """カメラ画像から PD 制御で操舵量を計算する - - Args: - frame: グレースケールのカメラ画像 - - Returns: - 計算された操舵量 - """ - p = self.params - - # 線検出 - result = detect_line(frame, self.image_params) - self._last_result = result - - # 線が検出できなかった場合は停止 - if not result.detected: - return SteeringOutput(throttle=0.0, steer=0.0) - - # 位置偏差 + 傾きによる操舵量 - error = ( - p.kp * result.position_error - + p.kh * result.heading - ) - - # 時間差分の計算 - now = time.time() - dt = ( - now - self._prev_time - if self._prev_time > 0 - else 0.033 - ) - dt = max(dt, 0.001) - - # 微分項 - derivative = (error - self._prev_error) / dt - steer = error + p.kd * derivative - - # 操舵量のクランプ - steer = max(-1.0, min(1.0, steer)) - - # レートリミッター(急な操舵変化を制限) - delta = steer - self._prev_steer - max_delta = p.max_steer_rate - delta = max(-max_delta, min(max_delta, delta)) - steer = self._prev_steer + delta - - # 速度制御(曲率連動) - throttle = ( - p.max_throttle - - p.speed_k * abs(result.curvature) - ) - throttle = max(0.0, throttle) - - # 状態の更新 - self._prev_error = error - self._prev_time = now - self._prev_steer = steer - - return SteeringOutput( - throttle=throttle, steer=steer, - ) - - def reset(self) -> None: - """内部状態をリセットする""" - self._prev_error = 0.0 - self._prev_time = 0.0 - self._prev_steer = 0.0 - self._last_result = None - reset_valley_tracker() - - @property - def last_detect_result(self): - """直近の線検出結果を取得する""" - return self._last_result diff --git a/src/pc/steering/pursuit_control.py b/src/pc/steering/pursuit_control.py deleted file mode 100644 index db0b793..0000000 --- a/src/pc/steering/pursuit_control.py +++ /dev/null @@ -1,171 +0,0 @@ -""" -pursuit_control -2点パシュートによる操舵量計算モジュール -行中心点に Theil-Sen 直線近似を適用し,外れ値に強い操舵量を算出する -""" - -from dataclasses import dataclass - -import numpy as np - -from common import config -from pc.steering.base import SteeringBase, SteeringOutput -from pc.vision.fitting import theil_sen_fit -from pc.vision.line_detector import ( - ImageParams, - detect_line, - reset_valley_tracker, -) - - -@dataclass -class PursuitParams: - """2点パシュート制御のパラメータ - - Attributes: - near_ratio: 近い目標点の位置(0.0=上端,1.0=下端) - far_ratio: 遠い目標点の位置(0.0=上端,1.0=下端) - k_near: 近い目標点の操舵ゲイン - k_far: 遠い目標点の操舵ゲイン - max_steer_rate: 1フレームあたりの最大操舵変化量 - max_throttle: 直線での最大速度 - speed_k: カーブ減速係数(2点の差に対する係数) - """ - near_ratio: float = 0.8 - far_ratio: float = 0.3 - k_near: float = 0.5 - k_far: float = 0.3 - max_steer_rate: float = 0.1 - max_throttle: float = 0.4 - speed_k: float = 2.0 - - -class PursuitControl(SteeringBase): - """2点パシュートによる操舵量計算クラス - - 行中心点から Theil-Sen 直線近似を行い, - 直線上の近い点と遠い点の偏差から操舵量を計算する - """ - - def __init__( - self, - params: PursuitParams | None = None, - image_params: ImageParams | None = None, - ) -> None: - self.params: PursuitParams = ( - params or PursuitParams() - ) - self.image_params: ImageParams = ( - image_params or ImageParams() - ) - self._prev_steer: float = 0.0 - self._last_result = None - self._last_pursuit_points: ( - tuple[tuple[float, float], tuple[float, float]] - | None - ) = None - - def compute( - self, frame: np.ndarray, - ) -> SteeringOutput: - """カメラ画像から2点パシュートで操舵量を計算する - - Args: - frame: グレースケールのカメラ画像 - - Returns: - 計算された操舵量 - """ - p = self.params - - # 線検出 - result = detect_line(frame, self.image_params) - self._last_result = result - - if not result.detected or result.row_centers is None: - self._last_pursuit_points = None - return SteeringOutput( - throttle=0.0, steer=0.0, - ) - - centers = result.row_centers - - # 有効な点(NaN でない行)を抽出 - valid = ~np.isnan(centers) - ys = np.where(valid)[0].astype(float) - xs = centers[valid] - - if len(ys) < 2: - self._last_pursuit_points = None - return SteeringOutput( - throttle=0.0, steer=0.0, - ) - - # Theil-Sen 直線近似 - slope, intercept = theil_sen_fit(ys, xs) - - center_x = config.FRAME_WIDTH / 2.0 - h = len(centers) - - # 直線上の 2 点の x 座標を取得 - near_y = h * p.near_ratio - far_y = h * p.far_ratio - near_x = slope * near_y + intercept - far_x = slope * far_y + intercept - - # 目標点を保持(デバッグ表示用) - self._last_pursuit_points = ( - (near_x, near_y), - (far_x, far_y), - ) - - # 各点の偏差(正: 線が左にある → 右に曲がる) - near_err = (center_x - near_x) / center_x - far_err = (center_x - far_x) / center_x - - # 操舵量 - steer = p.k_near * near_err + p.k_far * far_err - steer = max(-1.0, min(1.0, steer)) - - # レートリミッター - delta = steer - self._prev_steer - max_delta = p.max_steer_rate - delta = max(-max_delta, min(max_delta, delta)) - steer = self._prev_steer + delta - - # 速度制御(2点の x 差でカーブ度合いを判定) - curve = abs(near_x - far_x) / center_x - throttle = p.max_throttle - p.speed_k * curve - throttle = max(0.0, throttle) - - self._prev_steer = steer - - return SteeringOutput( - throttle=throttle, steer=steer, - ) - - def reset(self) -> None: - """内部状態をリセットする""" - self._prev_steer = 0.0 - self._last_result = None - self._last_pursuit_points = None - reset_valley_tracker() - - @property - def last_detect_result(self): - """直近の線検出結果を取得する""" - return self._last_result - - @property - def last_pursuit_points( - self, - ) -> ( - tuple[tuple[float, float], tuple[float, float]] - | None - ): - """直近の2点パシュート目標点を取得する - - Returns: - ((near_x, near_y), (far_x, far_y)) または None - """ - return self._last_pursuit_points diff --git a/src/pc/steering/recovery.py b/src/pc/steering/recovery.py deleted file mode 100644 index 31bfcb5..0000000 --- a/src/pc/steering/recovery.py +++ /dev/null @@ -1,109 +0,0 @@ -""" -recovery -コースアウト復帰のパラメータと判定ロジックを定義するモジュール -黒線を一定時間検出できなかった場合に, -最後に検出した方向へ旋回しながら走行して復帰する -""" - -import time -from dataclasses import dataclass - -from pc.steering.base import SteeringOutput - - -@dataclass -class RecoveryParams: - """コースアウト復帰のパラメータ - - Attributes: - enabled: 復帰機能の有効/無効 - timeout_sec: 線を見失ってから復帰動作を開始するまでの時間 - steer_amount: 復帰時の操舵量(0.0 ~ 1.0) - throttle: 復帰時の速度(負: 後退,正: 前進) - """ - enabled: bool = True - timeout_sec: float = 0.5 - steer_amount: float = 0.5 - throttle: float = -0.3 - - -class RecoveryController: - """コースアウト復帰の判定と操舵量算出を行うクラス - - 自動操縦中にフレームごとに呼び出し, - 線検出の成否を記録する.一定時間検出できなかった場合に - 復帰用の操舵量を返す - """ - - def __init__( - self, - params: RecoveryParams | None = None, - ) -> None: - self.params: RecoveryParams = ( - params or RecoveryParams() - ) - self._last_detected_time: float = 0.0 - self._last_error_sign: float = 0.0 - self._is_recovering: bool = False - - def reset(self) -> None: - """内部状態をリセットする - - 自動操縦の開始時に呼び出す - """ - self._last_detected_time = time.time() - self._last_error_sign = 0.0 - self._is_recovering = False - - def update( - self, - detected: bool, - position_error: float = 0.0, - ) -> SteeringOutput | None: - """検出結果を記録し,復帰が必要なら操舵量を返す - - 毎フレーム呼び出す.線が検出できている間は内部状態を - 更新して None を返す.検出できない時間が timeout_sec を - 超えたら復帰用の SteeringOutput を返す - - Args: - detected: 線が検出できたか - position_error: 検出時の位置偏差(正: 線が左) - - Returns: - 復帰操舵量,または None(通常走行を継続) - """ - if not self.params.enabled: - return None - - now = time.time() - - if detected: - self._last_detected_time = now - if position_error != 0.0: - self._last_error_sign = ( - 1.0 if position_error > 0 else -1.0 - ) - self._is_recovering = False - return None - - # 線を見失ってからの経過時間を判定 - elapsed = now - self._last_detected_time - if elapsed < self.params.timeout_sec: - return None - - # 復帰モード: 最後に検出した方向へ旋回 - self._is_recovering = True - steer = ( - self._last_error_sign - * self.params.steer_amount - ) - return SteeringOutput( - throttle=self.params.throttle, - steer=steer, - ) - - @property - def is_recovering(self) -> bool: - """現在復帰動作中かどうかを返す""" - return self._is_recovering diff --git a/src/pc/steering/ts_pd_control.py b/src/pc/steering/ts_pd_control.py deleted file mode 100644 index e940694..0000000 --- a/src/pc/steering/ts_pd_control.py +++ /dev/null @@ -1,177 +0,0 @@ -""" -ts_pd_control -Theil-Sen 直線近似による PD 制御モジュール -行中心点に Theil-Sen 直線をフィッティングし, -位置偏差・傾き・微分項から操舵量を算出する -""" - -import time -from dataclasses import dataclass - -import numpy as np - -from common import config -from pc.steering.base import SteeringBase, SteeringOutput -from pc.vision.fitting import theil_sen_fit -from pc.vision.line_detector import ( - ImageParams, - detect_line, - reset_valley_tracker, -) - - -@dataclass -class TsPdParams: - """Theil-Sen PD 制御のパラメータ - - Attributes: - kp: 位置偏差ゲイン - kh: 傾き(Theil-Sen slope)ゲイン - kd: 微分ゲイン - max_steer_rate: 1フレームあたりの最大操舵変化量 - max_throttle: 直線での最大速度 - speed_k: 傾きベースの減速係数 - """ - kp: float = 0.5 - kh: float = 0.3 - kd: float = 0.1 - max_steer_rate: float = 0.1 - max_throttle: float = 0.4 - speed_k: float = 2.0 - - -class TsPdControl(SteeringBase): - """Theil-Sen 直線近似による PD 制御クラス - - 行中心点から Theil-Sen 直線近似を行い, - 画像下端での位置偏差と直線の傾きから PD 制御で操舵量を計算する - """ - - def __init__( - self, - params: TsPdParams | None = None, - image_params: ImageParams | None = None, - ) -> None: - self.params: TsPdParams = ( - params or TsPdParams() - ) - self.image_params: ImageParams = ( - image_params or ImageParams() - ) - self._prev_error: float = 0.0 - self._prev_time: float = 0.0 - self._prev_steer: float = 0.0 - self._last_result = None - self._last_fit_line: ( - tuple[float, float] | None - ) = None - - def compute( - self, frame: np.ndarray, - ) -> SteeringOutput: - """カメラ画像から Theil-Sen PD 制御で操舵量を計算する - - Args: - frame: グレースケールのカメラ画像 - - Returns: - 計算された操舵量 - """ - p = self.params - - # 線検出 - result = detect_line(frame, self.image_params) - self._last_result = result - - if not result.detected or result.row_centers is None: - self._last_fit_line = None - return SteeringOutput( - throttle=0.0, steer=0.0, - ) - - centers = result.row_centers - - # 有効な点(NaN でない行)を抽出 - valid = ~np.isnan(centers) - ys = np.where(valid)[0].astype(float) - xs = centers[valid] - - if len(ys) < 2: - self._last_fit_line = None - return SteeringOutput( - throttle=0.0, steer=0.0, - ) - - # Theil-Sen 直線近似: x = slope * y + intercept - slope, intercept = theil_sen_fit(ys, xs) - self._last_fit_line = (slope, intercept) - - center_x = config.FRAME_WIDTH / 2.0 - h = len(centers) - - # 画像下端での位置偏差 - bottom_x = slope * (h - 1) + intercept - position_error = (center_x - bottom_x) / center_x - - # 操舵量: P 項(位置偏差)+ Heading 項(傾き) - error = p.kp * position_error + p.kh * slope - - # 時間差分の計算 - now = time.time() - dt = ( - now - self._prev_time - if self._prev_time > 0 - else 0.033 - ) - dt = max(dt, 0.001) - - # D 項(微分項) - derivative = (error - self._prev_error) / dt - steer = error + p.kd * derivative - - # 操舵量のクランプ - steer = max(-1.0, min(1.0, steer)) - - # レートリミッター - delta = steer - self._prev_steer - max_delta = p.max_steer_rate - delta = max(-max_delta, min(max_delta, delta)) - steer = self._prev_steer + delta - - # 速度制御(傾きベース: 傾きが大きい → カーブ → 減速) - throttle = p.max_throttle - p.speed_k * abs(slope) - throttle = max(0.0, throttle) - - # 状態の更新 - self._prev_error = error - self._prev_time = now - self._prev_steer = steer - - return SteeringOutput( - throttle=throttle, steer=steer, - ) - - def reset(self) -> None: - """内部状態をリセットする""" - self._prev_error = 0.0 - self._prev_time = 0.0 - self._prev_steer = 0.0 - self._last_result = None - self._last_fit_line = None - reset_valley_tracker() - - @property - def last_detect_result(self): - """直近の線検出結果を取得する""" - return self._last_result - - @property - def last_fit_line( - self, - ) -> tuple[float, float] | None: - """直近の Theil-Sen 直線近似結果を取得する - - Returns: - (slope, intercept) または None - """ - return self._last_fit_line diff --git a/src/pc/vision/detectors/__init__.py b/src/pc/vision/detectors/__init__.py deleted file mode 100644 index e69de29..0000000 --- a/src/pc/vision/detectors/__init__.py +++ /dev/null diff --git a/src/pc/vision/detectors/blackhat.py b/src/pc/vision/detectors/blackhat.py deleted file mode 100644 index bc80920..0000000 --- a/src/pc/vision/detectors/blackhat.py +++ /dev/null @@ -1,66 +0,0 @@ -""" -blackhat -案A: Black-hat 中心型の線検出 -Black-hat 変換で背景より暗い構造を直接抽出し, -固定閾値 + 距離変換 + 行ごと中心抽出で検出する -""" - -import cv2 -import numpy as np - -from pc.vision.line_detector import ImageParams, LineDetectResult -from pc.vision.line_detector import fit_row_centers -from pc.vision.morphology import ( - apply_dist_mask, - apply_iso_closing, - apply_width_filter, -) - - -def detect_blackhat( - frame: np.ndarray, params: ImageParams, -) -> LineDetectResult: - """案A: Black-hat 中心型""" - # Black-hat 変換(暗い構造の抽出) - bh_k = params.blackhat_ksize | 1 - bh_kernel = cv2.getStructuringElement( - cv2.MORPH_ELLIPSE, (bh_k, bh_k), - ) - blackhat = cv2.morphologyEx( - frame, cv2.MORPH_BLACKHAT, bh_kernel, - ) - - # ガウシアンブラー - blur_k = params.blur_size | 1 - blurred = cv2.GaussianBlur( - blackhat, (blur_k, blur_k), 0, - ) - - # 固定閾値(Black-hat 後は線が白) - _, binary = cv2.threshold( - blurred, params.binary_thresh, 255, - cv2.THRESH_BINARY, - ) - - # 等方クロージング + 距離変換マスク + 幅フィルタ - binary = apply_iso_closing( - binary, params.iso_close_size, - ) - binary = apply_dist_mask( - binary, params.dist_thresh, - ) - if params.width_near > 0 and params.width_far > 0: - binary = apply_width_filter( - binary, - params.width_near, - params.width_far, - params.width_tolerance, - ) - - # 行ごと中心抽出 + フィッティング - return fit_row_centers( - binary, params.min_line_width, - median_ksize=params.median_ksize, - neighbor_thresh=params.neighbor_thresh, - residual_thresh=params.residual_thresh, - ) diff --git a/src/pc/vision/detectors/current.py b/src/pc/vision/detectors/current.py deleted file mode 100644 index 12dacba..0000000 --- a/src/pc/vision/detectors/current.py +++ /dev/null @@ -1,76 +0,0 @@ -""" -current -現行手法: CLAHE + 固定閾値 + 全ピクセルフィッティング -""" - -import cv2 -import numpy as np - -from pc.vision.line_detector import ( - DETECT_Y_END, - DETECT_Y_START, - MIN_FIT_PIXELS, - ImageParams, - LineDetectResult, - build_result, - no_detection, -) - - -def detect_current( - frame: np.ndarray, params: ImageParams, -) -> LineDetectResult: - """現行手法: CLAHE + 固定閾値 + 全ピクセルフィッティング""" - # CLAHE でコントラスト強調 - clahe = cv2.createCLAHE( - clipLimit=params.clahe_clip, - tileGridSize=( - params.clahe_grid, - params.clahe_grid, - ), - ) - enhanced = clahe.apply(frame) - - # ガウシアンブラー - blur_k = params.blur_size | 1 - blurred = cv2.GaussianBlur( - enhanced, (blur_k, blur_k), 0, - ) - - # 固定閾値で二値化(黒線を白に反転) - _, binary = cv2.threshold( - blurred, params.binary_thresh, 255, - cv2.THRESH_BINARY_INV, - ) - - # オープニング(孤立ノイズ除去) - if params.open_size >= 3: - open_k = params.open_size | 1 - open_kernel = cv2.getStructuringElement( - cv2.MORPH_ELLIPSE, (open_k, open_k), - ) - binary = cv2.morphologyEx( - binary, cv2.MORPH_OPEN, open_kernel, - ) - - # 横方向クロージング(途切れ補間) - if params.close_width >= 3: - close_h = max(params.close_height | 1, 1) - close_kernel = cv2.getStructuringElement( - cv2.MORPH_ELLIPSE, - (params.close_width, close_h), - ) - binary = cv2.morphologyEx( - binary, cv2.MORPH_CLOSE, close_kernel, - ) - - # 全ピクセルフィッティング - region = binary[DETECT_Y_START:DETECT_Y_END, :] - ys_local, xs = np.where(region > 0) - - if len(xs) < MIN_FIT_PIXELS: - return no_detection(binary) - - ys = ys_local + DETECT_Y_START - coeffs = np.polyfit(ys, xs, 2) - return build_result(coeffs, binary) diff --git a/src/pc/vision/detectors/dual_norm.py b/src/pc/vision/detectors/dual_norm.py deleted file mode 100644 index 46a9a6d..0000000 --- a/src/pc/vision/detectors/dual_norm.py +++ /dev/null @@ -1,86 +0,0 @@ -""" -dual_norm -案B: 二重正規化型の線検出 -背景除算で照明勾配を除去し, -適応的閾値で局所ムラにも対応する二重防壁構成 -""" - -import cv2 -import numpy as np - -from pc.vision.line_detector import ImageParams, LineDetectResult -from pc.vision.line_detector import fit_row_centers -from pc.vision.morphology import ( - apply_dist_mask, - apply_iso_closing, - apply_staged_closing, - apply_width_filter, -) - - -def detect_dual_norm( - frame: np.ndarray, params: ImageParams, -) -> LineDetectResult: - """案B: 二重正規化型""" - # 背景除算正規化 - bg_k = params.bg_blur_ksize | 1 - bg = cv2.GaussianBlur( - frame, (bg_k, bg_k), 0, - ) - normalized = ( - frame.astype(np.float32) * 255.0 - / (bg.astype(np.float32) + 1.0) - ) - normalized = np.clip( - normalized, 0, 255, - ).astype(np.uint8) - - # 適応的閾値(ガウシアン,BINARY_INV) - block = max(params.adaptive_block | 1, 3) - binary = cv2.adaptiveThreshold( - normalized, 255, - cv2.ADAPTIVE_THRESH_GAUSSIAN_C, - cv2.THRESH_BINARY_INV, - block, params.adaptive_c, - ) - - # 固定閾値との AND(有効時のみ) - if params.global_thresh > 0: - _, global_mask = cv2.threshold( - normalized, params.global_thresh, - 255, cv2.THRESH_BINARY_INV, - ) - binary = cv2.bitwise_and(binary, global_mask) - - # 段階クロージング or 等方クロージング - if params.stage_min_area > 0: - binary = apply_staged_closing( - binary, - params.stage_close_small, - params.stage_min_area, - params.stage_close_large, - ) - else: - binary = apply_iso_closing( - binary, params.iso_close_size, - ) - - # 距離変換マスク + 幅フィルタ - binary = apply_dist_mask( - binary, params.dist_thresh, - ) - if params.width_near > 0 and params.width_far > 0: - binary = apply_width_filter( - binary, - params.width_near, - params.width_far, - params.width_tolerance, - ) - - # 行ごと中心抽出 + フィッティング - return fit_row_centers( - binary, params.min_line_width, - median_ksize=params.median_ksize, - neighbor_thresh=params.neighbor_thresh, - residual_thresh=params.residual_thresh, - ) diff --git a/src/pc/vision/detectors/robust.py b/src/pc/vision/detectors/robust.py deleted file mode 100644 index ac55d35..0000000 --- a/src/pc/vision/detectors/robust.py +++ /dev/null @@ -1,66 +0,0 @@ -""" -robust -案C: 最高ロバスト型の線検出 -Black-hat + 適応的閾値の二重正規化に加え, -RANSAC で外れ値を除去する最もロバストな構成 -""" - -import cv2 -import numpy as np - -from pc.vision.line_detector import ImageParams, LineDetectResult -from pc.vision.line_detector import fit_row_centers -from pc.vision.morphology import ( - apply_dist_mask, - apply_iso_closing, - apply_width_filter, -) - - -def detect_robust( - frame: np.ndarray, params: ImageParams, -) -> LineDetectResult: - """案C: 最高ロバスト型""" - # Black-hat 変換 - bh_k = params.blackhat_ksize | 1 - bh_kernel = cv2.getStructuringElement( - cv2.MORPH_ELLIPSE, (bh_k, bh_k), - ) - blackhat = cv2.morphologyEx( - frame, cv2.MORPH_BLACKHAT, bh_kernel, - ) - - # 適応的閾値(BINARY: Black-hat 後は線が白) - block = max(params.adaptive_block | 1, 3) - binary = cv2.adaptiveThreshold( - blackhat, 255, - cv2.ADAPTIVE_THRESH_GAUSSIAN_C, - cv2.THRESH_BINARY, - block, -params.adaptive_c, - ) - - # 等方クロージング + 距離変換マスク + 幅フィルタ - binary = apply_iso_closing( - binary, params.iso_close_size, - ) - binary = apply_dist_mask( - binary, params.dist_thresh, - ) - if params.width_near > 0 and params.width_far > 0: - binary = apply_width_filter( - binary, - params.width_near, - params.width_far, - params.width_tolerance, - ) - - # 行ごと中央値抽出 + RANSAC フィッティング - return fit_row_centers( - binary, params.min_line_width, - use_median=True, - ransac_thresh=params.ransac_thresh, - ransac_iter=params.ransac_iter, - median_ksize=params.median_ksize, - neighbor_thresh=params.neighbor_thresh, - residual_thresh=params.residual_thresh, - ) diff --git a/src/pc/vision/detectors/valley.py b/src/pc/vision/detectors/valley.py deleted file mode 100644 index ce14b52..0000000 --- a/src/pc/vision/detectors/valley.py +++ /dev/null @@ -1,328 +0,0 @@ -""" -valley -案D: 谷検出+追跡型の線検出 -各行の輝度信号から谷(暗い領域)を直接検出し, -時系列追跡で安定性を確保する.二値化を使用しない -""" - -import cv2 -import numpy as np - -from common import config -from pc.vision.fitting import clean_and_fit -from pc.vision.line_detector import ( - DETECT_Y_END, - DETECT_Y_START, - MIN_FIT_ROWS, - ImageParams, - LineDetectResult, - build_result, - no_detection, -) - - -class ValleyTracker: - """谷検出の時系列追跡を管理するクラス - - 前フレームの多項式係数を保持し,予測・平滑化・ - 検出失敗時のコースティングを提供する - """ - - def __init__(self) -> None: - self._prev_coeffs: np.ndarray | None = None - self._smoothed_coeffs: np.ndarray | None = None - self._frames_lost: int = 0 - - def predict_x(self, y: float) -> float | None: - """前フレームの多項式から x 座標を予測する - - Args: - y: 画像の y 座標 - - Returns: - 予測 x 座標(履歴なしの場合は None) - """ - if self._smoothed_coeffs is None: - return None - return float(np.poly1d(self._smoothed_coeffs)(y)) - - def update( - self, - coeffs: np.ndarray, - alpha: float, - ) -> np.ndarray: - """検出成功時に状態を更新する - - EMA で多項式係数を平滑化し,更新後の係数を返す - - Args: - coeffs: 今フレームのフィッティング係数 - alpha: EMA 係数(1.0 で平滑化なし) - - Returns: - 平滑化後の多項式係数 - """ - self._frames_lost = 0 - if self._smoothed_coeffs is None: - self._smoothed_coeffs = coeffs.copy() - else: - self._smoothed_coeffs = ( - alpha * coeffs - + (1.0 - alpha) * self._smoothed_coeffs - ) - self._prev_coeffs = self._smoothed_coeffs.copy() - return self._smoothed_coeffs - - def coast( - self, max_frames: int, - ) -> LineDetectResult | None: - """検出失敗時に予測結果を返す - - Args: - max_frames: 予測を継続する最大フレーム数 - - Returns: - 予測による結果(継続不可の場合は None) - """ - if self._smoothed_coeffs is None: - return None - self._frames_lost += 1 - if self._frames_lost > max_frames: - return None - # 予測でデバッグ用二値画像は空にする - h = config.FRAME_HEIGHT - w = config.FRAME_WIDTH - blank = np.zeros((h, w), dtype=np.uint8) - return build_result(self._smoothed_coeffs, blank) - - def reset(self) -> None: - """追跡状態をリセットする""" - self._prev_coeffs = None - self._smoothed_coeffs = None - self._frames_lost = 0 - - -_valley_tracker = ValleyTracker() - - -def reset_valley_tracker() -> None: - """谷検出の追跡状態をリセットする""" - _valley_tracker.reset() - - -def _find_row_valley( - row: np.ndarray, - min_depth: int, - expected_width: float, - width_tolerance: float, - predicted_x: float | None, - max_deviation: int, -) -> tuple[float, float] | None: - """1行の輝度信号から最適な谷を検出する - - Args: - row: スムージング済みの1行輝度信号 - min_depth: 最小谷深度 - expected_width: 期待線幅(px,0 で幅フィルタ無効) - width_tolerance: 幅フィルタの上限倍率 - predicted_x: 追跡による予測 x 座標(None で無効) - max_deviation: 予測からの最大許容偏差 - - Returns: - (谷の中心x, 谷の深度) または None - """ - n = len(row) - if n < 5: - return None - - signal = row.astype(np.float32) - - # 極小値を検出(前後より小さい点) - left = signal[:-2] - center = signal[1:-1] - right = signal[2:] - minima_mask = (center <= left) & (center <= right) - minima_indices = np.where(minima_mask)[0] + 1 - - if len(minima_indices) == 0: - return None - - best: tuple[float, float] | None = None - best_score = -1.0 - - for idx in minima_indices: - val = signal[idx] - - # 左の肩を探す - left_shoulder = idx - for i in range(idx - 1, -1, -1): - if signal[i] < signal[i + 1]: - break - left_shoulder = i - # 右の肩を探す - right_shoulder = idx - for i in range(idx + 1, n): - if signal[i] < signal[i - 1]: - break - right_shoulder = i - - # 谷の深度(肩の平均 - 谷底) - shoulder_avg = ( - signal[left_shoulder] + signal[right_shoulder] - ) / 2.0 - depth = shoulder_avg - val - if depth < min_depth: - continue - - # 谷の幅 - width = right_shoulder - left_shoulder - center_x = (left_shoulder + right_shoulder) / 2.0 - - # 幅フィルタ - if expected_width > 0: - max_w = expected_width * width_tolerance - min_w = expected_width / width_tolerance - if width > max_w or width < min_w: - continue - - # 予測との偏差チェック - if predicted_x is not None: - if abs(center_x - predicted_x) > max_deviation: - continue - - # スコア: 深度優先,予測がある場合は近さも考慮 - score = float(depth) - if predicted_x is not None: - dist = abs(center_x - predicted_x) - score += max(0.0, max_deviation - dist) - - if score > best_score: - best_score = score - best = (center_x, float(depth)) - - return best - - -def _build_valley_binary( - shape: tuple[int, int], - centers_y: list[int], - centers_x: list[float], -) -> np.ndarray: - """谷検出結果からデバッグ用二値画像を生成する - - Args: - shape: 出力画像の (高さ, 幅) - centers_y: 検出行の y 座標リスト - centers_x: 検出行の中心 x 座標リスト - - Returns: - デバッグ用二値画像 - """ - binary = np.zeros(shape, dtype=np.uint8) - half_w = 3 - w = shape[1] - for y, cx in zip(centers_y, centers_x): - x0 = max(0, int(cx) - half_w) - x1 = min(w, int(cx) + half_w + 1) - binary[y, x0:x1] = 255 - return binary - - -def detect_valley( - frame: np.ndarray, params: ImageParams, -) -> LineDetectResult: - """案D: 谷検出+追跡型""" - h, w = frame.shape[:2] - - # 行ごとにガウシアン平滑化するため画像全体をブラー - gauss_k = params.valley_gauss_ksize | 1 - blurred = cv2.GaussianBlur( - frame, (gauss_k, 1), 0, - ) - - # 透視補正の期待幅を計算するための準備 - use_width = ( - params.width_near > 0 and params.width_far > 0 - ) - detect_h = DETECT_Y_END - DETECT_Y_START - denom = max(detect_h - 1, 1) - - centers_y: list[int] = [] - centers_x: list[float] = [] - depths: list[float] = [] - - for y in range(DETECT_Y_START, DETECT_Y_END): - row = blurred[y] - - # 期待幅の計算 - if use_width: - t = (DETECT_Y_END - 1 - y) / denom - expected_w = float(params.width_far) + ( - float(params.width_near) - - float(params.width_far) - ) * t - else: - expected_w = 0.0 - - # 予測 x 座標 - predicted_x = _valley_tracker.predict_x( - float(y), - ) - - result = _find_row_valley( - row, - params.valley_min_depth, - expected_w, - params.width_tolerance, - predicted_x, - params.valley_max_deviation, - ) - if result is not None: - centers_y.append(y) - centers_x.append(result[0]) - depths.append(result[1]) - - # デバッグ用二値画像 - debug_binary = _build_valley_binary( - (h, w), centers_y, centers_x, - ) - - if len(centers_y) < MIN_FIT_ROWS: - # 検出失敗 → コースティングを試みる - coasted = _valley_tracker.coast( - params.valley_coast_frames, - ) - if coasted is not None: - coasted.binary_image = debug_binary - return coasted - return no_detection(debug_binary) - - cy = np.array(centers_y, dtype=np.float64) - cx = np.array(centers_x, dtype=np.float64) - w_arr = np.array(depths, dtype=np.float64) - - # ロバストフィッティング(深度を重みに使用) - coeffs = clean_and_fit( - cy, cx, - median_ksize=params.median_ksize, - neighbor_thresh=params.neighbor_thresh, - residual_thresh=params.residual_thresh, - weights=w_arr, - ransac_thresh=params.ransac_thresh, - ransac_iter=params.ransac_iter, - ) - if coeffs is None: - coasted = _valley_tracker.coast( - params.valley_coast_frames, - ) - if coasted is not None: - coasted.binary_image = debug_binary - return coasted - return no_detection(debug_binary) - - # EMA で平滑化 - smoothed = _valley_tracker.update( - coeffs, params.valley_ema_alpha, - ) - - return build_result(smoothed, debug_binary) diff --git a/src/pc/vision/fitting.py b/src/pc/vision/fitting.py deleted file mode 100644 index 34813d5..0000000 --- a/src/pc/vision/fitting.py +++ /dev/null @@ -1,209 +0,0 @@ -""" -fitting -直線・曲線近似の共通ユーティリティモジュール -Theil-Sen 推定,RANSAC,外れ値除去付きフィッティングを提供する -""" - -import numpy as np - -# フィッティングに必要な最小行数 -MIN_FIT_ROWS: int = 10 - -# 近傍外れ値除去の設定 -NEIGHBOR_HALF_WINDOW: int = 3 -NEIGHBOR_FILTER_PASSES: int = 3 - -# 残差ベース反復除去の最大回数 -RESIDUAL_REMOVAL_ITERATIONS: int = 5 - - -def theil_sen_fit( - y: np.ndarray, - x: np.ndarray, -) -> tuple[float, float]: - """Theil-Sen 推定で直線 x = slope * y + intercept を求める - - 全ペアの傾きの中央値を使い,外れ値に強い直線近似を行う - - Args: - y: y 座標の配列(行番号) - x: x 座標の配列(各行の中心) - - Returns: - (slope, intercept) のタプル - """ - n = len(y) - slopes = [] - for i in range(n): - for j in range(i + 1, n): - dy = y[j] - y[i] - if dy != 0: - slopes.append((x[j] - x[i]) / dy) - - if len(slopes) == 0: - return 0.0, float(np.median(x)) - - slope = float(np.median(slopes)) - intercept = float(np.median(x - slope * y)) - return slope, intercept - - -def ransac_polyfit( - ys: np.ndarray, xs: np.ndarray, - degree: int, n_iter: int, thresh: float, -) -> np.ndarray | None: - """RANSAC で外れ値を除去して多項式フィッティング - - Args: - ys: y 座標配列 - xs: x 座標配列 - degree: 多項式の次数 - n_iter: 反復回数 - thresh: 外れ値判定閾値(ピクセル) - - Returns: - 多項式係数(フィッティング失敗時は None) - """ - n = len(ys) - sample_size = degree + 1 - if n < sample_size: - return None - - best_coeffs: np.ndarray | None = None - best_inliers = 0 - rng = np.random.default_rng() - - for _ in range(n_iter): - idx = rng.choice(n, sample_size, replace=False) - coeffs = np.polyfit(ys[idx], xs[idx], degree) - poly = np.poly1d(coeffs) - residuals = np.abs(xs - poly(ys)) - n_inliers = int(np.sum(residuals < thresh)) - if n_inliers > best_inliers: - best_inliers = n_inliers - best_coeffs = coeffs - - # インライアで再フィッティング - if best_coeffs is not None: - poly = np.poly1d(best_coeffs) - inlier_mask = np.abs(xs - poly(ys)) < thresh - if np.sum(inlier_mask) >= sample_size: - best_coeffs = np.polyfit( - ys[inlier_mask], - xs[inlier_mask], - degree, - ) - - return best_coeffs - - -def clean_and_fit( - cy: np.ndarray, - cx: np.ndarray, - median_ksize: int, - neighbor_thresh: float, - residual_thresh: float = 0.0, - weights: np.ndarray | None = None, - ransac_thresh: float = 0.0, - ransac_iter: int = 0, -) -> np.ndarray | None: - """外れ値除去+重み付きフィッティングを行う - - 全検出手法で共通に使えるロバストなフィッティング - (1) 移動メディアンフィルタでスパイクを平滑化 - (2) 近傍中央値からの偏差で外れ値を除去(複数パス) - (3) 重み付き最小二乗(または RANSAC)でフィッティング - (4) 残差ベースの反復除去で外れ値を最終除去 - - Args: - cy: 中心点の y 座標配列 - cx: 中心点の x 座標配列 - median_ksize: 移動メディアンのカーネルサイズ(0 で無効) - neighbor_thresh: 近傍外れ値除去の閾値 px(0 で無効) - residual_thresh: 残差除去の閾値 px(0 で無効) - weights: 各点の信頼度(None で均等) - ransac_thresh: RANSAC 閾値(0 以下で無効) - ransac_iter: RANSAC 反復回数 - - Returns: - 多項式係数(フィッティング失敗時は None) - """ - if len(cy) < MIN_FIT_ROWS: - return None - - cx_clean = cx.copy() - mask = np.ones(len(cy), dtype=bool) - - # (1) 移動メディアンフィルタ - if median_ksize >= 3: - k = median_ksize | 1 - half = k // 2 - for i in range(len(cx_clean)): - lo = max(0, i - half) - hi = min(len(cx_clean), i + half + 1) - cx_clean[i] = float(np.median(cx[lo:hi])) - - # (2) 近傍外れ値除去(複数パス) - if neighbor_thresh > 0: - half_n = NEIGHBOR_HALF_WINDOW - for _ in range(NEIGHBOR_FILTER_PASSES): - new_mask = np.ones(len(cx_clean), dtype=bool) - for i in range(len(cx_clean)): - if not mask[i]: - continue - lo = max(0, i - half_n) - hi = min(len(cx_clean), i + half_n + 1) - neighbors = cx_clean[lo:hi][mask[lo:hi]] - if len(neighbors) == 0: - new_mask[i] = False - continue - local_med = float(np.median(neighbors)) - if abs(cx_clean[i] - local_med) > neighbor_thresh: - new_mask[i] = False - if np.array_equal(mask, mask & new_mask): - break - mask = mask & new_mask - - cy = cy[mask] - cx_clean = cx_clean[mask] - if weights is not None: - weights = weights[mask] - - if len(cy) < MIN_FIT_ROWS: - return None - - # (3) フィッティング - if ransac_thresh > 0 and ransac_iter > 0: - coeffs = ransac_polyfit( - cy, cx_clean, 2, ransac_iter, ransac_thresh, - ) - elif weights is not None: - coeffs = np.polyfit(cy, cx_clean, 2, w=weights) - else: - coeffs = np.polyfit(cy, cx_clean, 2) - - if coeffs is None: - return None - - # (4) 残差ベースの反復除去 - if residual_thresh > 0: - for _ in range(RESIDUAL_REMOVAL_ITERATIONS): - poly = np.poly1d(coeffs) - residuals = np.abs(cx_clean - poly(cy)) - inlier = residuals < residual_thresh - if np.all(inlier): - break - if np.sum(inlier) < MIN_FIT_ROWS: - break - cy = cy[inlier] - cx_clean = cx_clean[inlier] - if weights is not None: - weights = weights[inlier] - if weights is not None: - coeffs = np.polyfit( - cy, cx_clean, 2, w=weights, - ) - else: - coeffs = np.polyfit(cy, cx_clean, 2) - - return coeffs diff --git a/src/pc/vision/intersection.py b/src/pc/vision/intersection.py deleted file mode 100644 index 3786630..0000000 --- a/src/pc/vision/intersection.py +++ /dev/null @@ -1,65 +0,0 @@ -""" -intersection -十字路分類モデルの読み込みと推論を行うモジュール - -学習済みモデルとスケーラを読み込み, -二値画像から十字路かどうかを判定する -""" - -from pathlib import Path - -import joblib -import numpy as np - -from common.json_utils import PARAMS_DIR - -# モデル・スケーラの保存先 -_MODEL_PATH: Path = PARAMS_DIR / "intersection_model.pkl" -_SCALER_PATH: Path = PARAMS_DIR / "intersection_scaler.pkl" - - -class IntersectionClassifier: - """十字路分類器 - - 学習済みモデルを読み込み,二値画像から - 十字路かどうかを判定する - """ - - def __init__(self) -> None: - self._model: object | None = None - self._scaler: object | None = None - self._available: bool = False - self._load() - - def _load(self) -> None: - """モデルとスケーラを読み込む""" - if not _MODEL_PATH.exists(): - return - if not _SCALER_PATH.exists(): - return - self._model = joblib.load(_MODEL_PATH) - self._scaler = joblib.load(_SCALER_PATH) - self._available = True - - @property - def available(self) -> bool: - """モデルが利用可能かどうか""" - return self._available - - def predict(self, binary_image: np.ndarray) -> bool: - """二値画像が十字路かどうかを判定する - - Args: - binary_image: 40×30 の二値画像(0/255) - - Returns: - 十字路なら True - """ - if not self._available: - return False - flat = (binary_image.flatten() / 255.0).astype( - np.float32, - ) - x = self._scaler.transform(flat.reshape(1, -1)) - pred = self._model.predict(x) - return bool(pred[0] == 1) diff --git a/src/pc/vision/line_detector.py b/src/pc/vision/line_detector.py deleted file mode 100644 index 7155d16..0000000 --- a/src/pc/vision/line_detector.py +++ /dev/null @@ -1,368 +0,0 @@ -""" -line_detector -カメラ画像から黒線の位置を検出するモジュール -複数の検出手法を切り替えて使用できる - -公開 API: - ImageParams, LineDetectResult, detect_line, - reset_valley_tracker, DETECT_METHODS -""" - -from dataclasses import dataclass - -import cv2 -import numpy as np - -from common import config -from pc.vision.fitting import clean_and_fit - -# 検出領域の y 範囲(画像全体) -DETECT_Y_START: int = 0 -DETECT_Y_END: int = config.FRAME_HEIGHT - -# フィッティングに必要な最小数 -MIN_FIT_PIXELS: int = 50 -MIN_FIT_ROWS: int = 10 - -# 検出手法の定義(キー: 識別子,値: 表示名) -DETECT_METHODS: dict[str, str] = { - "current": "現行(CLAHE + 固定閾値)", - "blackhat": "案A(Black-hat 中心)", - "dual_norm": "案B(二重正規化)", - "robust": "案C(最高ロバスト)", - "valley": "案D(谷検出+追跡)", -} - - -@dataclass -class ImageParams: - """二値化パラメータ - - Attributes: - method: 検出手法の識別子 - clahe_clip: CLAHE のコントラスト増幅上限 - clahe_grid: CLAHE の局所領域分割数 - blur_size: ガウシアンブラーのカーネルサイズ(奇数) - binary_thresh: 二値化の閾値 - open_size: オープニングのカーネルサイズ - close_width: クロージングの横幅 - close_height: クロージングの高さ - blackhat_ksize: Black-hat のカーネルサイズ - bg_blur_ksize: 背景除算のブラーカーネルサイズ - global_thresh: 固定閾値(0 で無効,適応的閾値との AND) - adaptive_block: 適応的閾値のブロックサイズ - adaptive_c: 適応的閾値の定数 C - iso_close_size: 等方クロージングのカーネルサイズ - dist_thresh: 距離変換の閾値 - min_line_width: 行ごと中心抽出の最小線幅 - stage_close_small: 段階クロージング第1段のサイズ - stage_min_area: 孤立除去の最小面積(0 で無効) - stage_close_large: 段階クロージング第2段のサイズ(0 で無効) - ransac_thresh: RANSAC の外れ値判定閾値 - ransac_iter: RANSAC の反復回数 - width_near: 画像下端での期待線幅(px,0 で無効) - width_far: 画像上端での期待線幅(px,0 で無効) - width_tolerance: 幅フィルタの上限倍率 - median_ksize: 中心点列の移動メディアンフィルタサイズ(0 で無効) - neighbor_thresh: 近傍外れ値除去の閾値(px,0 で無効) - residual_thresh: 残差反復除去の閾値(px,0 で無効) - valley_gauss_ksize: 谷検出の行ごとガウシアンカーネルサイズ - valley_min_depth: 谷として認識する最小深度 - valley_max_deviation: 追跡予測からの最大許容偏差(px) - valley_coast_frames: 検出失敗時の予測継続フレーム数 - valley_ema_alpha: 多項式係数の指数移動平均係数 - """ - - # 検出手法 - method: str = "current" - - # 現行手法パラメータ - clahe_clip: float = 2.0 - clahe_grid: int = 8 - blur_size: int = 5 - binary_thresh: int = 80 - open_size: int = 5 - close_width: int = 25 - close_height: int = 3 - - # 案A/C: Black-hat - blackhat_ksize: int = 45 - - # 案B: 背景除算 - bg_blur_ksize: int = 101 - global_thresh: int = 0 # 固定閾値(0 で無効) - - # 案B/C: 適応的閾値 - adaptive_block: int = 51 - adaptive_c: int = 10 - - # 案A/B/C: 後処理 - iso_close_size: int = 15 - dist_thresh: float = 3.0 - min_line_width: int = 3 - - # 案B: 段階クロージング - stage_close_small: int = 5 # 第1段: 小クロージングサイズ - stage_min_area: int = 0 # 孤立除去の最小面積(0 で無効) - stage_close_large: int = 0 # 第2段: 大クロージングサイズ(0 で無効) - - # 案C: RANSAC - ransac_thresh: float = 5.0 - ransac_iter: int = 50 - - # ロバストフィッティング(全手法共通) - median_ksize: int = 7 - neighbor_thresh: float = 10.0 - residual_thresh: float = 8.0 - - # 透視補正付き幅フィルタ(0 で無効) - width_near: int = 0 - width_far: int = 0 - width_tolerance: float = 1.8 - - # 案D: 谷検出+追跡 - valley_gauss_ksize: int = 15 - valley_min_depth: int = 15 - valley_max_deviation: int = 40 - valley_coast_frames: int = 3 - valley_ema_alpha: float = 0.7 - - -@dataclass -class LineDetectResult: - """線検出の結果を格納するデータクラス - - Attributes: - detected: 線が検出できたか - position_error: 画像下端での位置偏差(-1.0~+1.0) - heading: 線の傾き(dx/dy,画像下端での値) - curvature: 線の曲率(d²x/dy²) - poly_coeffs: 多項式の係数(描画用,未検出時は None) - row_centers: 各行の線中心 x 座標(index=行番号, - NaN=その行に線なし,未検出時は None) - binary_image: 二値化後の画像(デバッグ用) - """ - - detected: bool - position_error: float - heading: float - curvature: float - poly_coeffs: np.ndarray | None - row_centers: np.ndarray | None - binary_image: np.ndarray | None - - -# ── 公開 API ────────────────────────────────────── - - -def detect_line( - frame: np.ndarray, - params: ImageParams | None = None, -) -> LineDetectResult: - """画像から黒線の位置を検出する - - params.method に応じて検出手法を切り替える - - Args: - frame: グレースケールのカメラ画像 - params: 二値化パラメータ(None でデフォルト) - - Returns: - 線検出の結果 - """ - if params is None: - params = ImageParams() - - method = params.method - if method == "blackhat": - from pc.vision.detectors.blackhat import ( - detect_blackhat, - ) - return detect_blackhat(frame, params) - if method == "dual_norm": - from pc.vision.detectors.dual_norm import ( - detect_dual_norm, - ) - return detect_dual_norm(frame, params) - if method == "robust": - from pc.vision.detectors.robust import ( - detect_robust, - ) - return detect_robust(frame, params) - if method == "valley": - from pc.vision.detectors.valley import ( - detect_valley, - ) - return detect_valley(frame, params) - - from pc.vision.detectors.current import ( - detect_current, - ) - return detect_current(frame, params) - - -def reset_valley_tracker() -> None: - """谷検出の追跡状態をリセットする""" - from pc.vision.detectors.valley import ( - reset_valley_tracker as _reset, - ) - _reset() - - -# ── 共通結果構築(各検出器から使用) ────────────── - - -def no_detection( - binary: np.ndarray, -) -> LineDetectResult: - """未検出の結果を返す""" - return LineDetectResult( - detected=False, - position_error=0.0, - heading=0.0, - curvature=0.0, - poly_coeffs=None, - row_centers=None, - binary_image=binary, - ) - - -def _extract_row_centers( - binary: np.ndarray, -) -> np.ndarray | None: - """二値画像の最大連結領域から各行の線中心を求める - - Args: - binary: 二値画像 - - Returns: - 各行の中心 x 座標(NaN=その行に線なし), - 最大領域が見つからない場合は None - """ - h, w = binary.shape[:2] - num_labels, labels, stats, _ = ( - cv2.connectedComponentsWithStats(binary) - ) - - if num_labels <= 1: - return None - - # 背景(ラベル 0)を除いた最大領域を取得 - areas = stats[1:, cv2.CC_STAT_AREA] - largest_label = int(np.argmax(areas)) + 1 - - # 最大領域のマスク - mask = (labels == largest_label).astype(np.uint8) - - # 各行の左右端から中心を計算 - centers = np.full(h, np.nan) - for y in range(h): - row = mask[y] - cols = np.where(row > 0)[0] - if len(cols) > 0: - centers[y] = (cols[0] + cols[-1]) / 2.0 - - return centers - - -def build_result( - coeffs: np.ndarray, - binary: np.ndarray, - row_centers: np.ndarray | None = None, -) -> LineDetectResult: - """多項式係数から LineDetectResult を構築する - - row_centers が None の場合は binary から自動抽出する - """ - poly = np.poly1d(coeffs) - center_x = config.FRAME_WIDTH / 2.0 - - # 画像下端での位置偏差 - x_bottom = poly(DETECT_Y_END) - position_error = (center_x - x_bottom) / center_x - - # 傾き: dx/dy(画像下端での値) - poly_deriv = poly.deriv() - heading = float(poly_deriv(DETECT_Y_END)) - - # 曲率: d²x/dy² - poly_deriv2 = poly_deriv.deriv() - curvature = float(poly_deriv2(DETECT_Y_END)) - - # row_centers が未提供なら binary から抽出 - if row_centers is None: - row_centers = _extract_row_centers(binary) - - return LineDetectResult( - detected=True, - position_error=position_error, - heading=heading, - curvature=curvature, - poly_coeffs=coeffs, - row_centers=row_centers, - binary_image=binary, - ) - - -def fit_row_centers( - binary: np.ndarray, - min_width: int, - use_median: bool = False, - ransac_thresh: float = 0.0, - ransac_iter: int = 0, - median_ksize: int = 0, - neighbor_thresh: float = 0.0, - residual_thresh: float = 0.0, -) -> LineDetectResult: - """行ごとの中心点に多項式をフィッティングする - - 各行の白ピクセルの中心(平均または中央値)を1点抽出し, - ロバスト前処理の後にフィッティングする. - 幅の変動に強く,各行が等しく寄与する - - Args: - binary: 二値画像 - min_width: 線として認識する最小ピクセル数 - use_median: True の場合は中央値を使用 - ransac_thresh: RANSAC 閾値(0 以下で無効) - ransac_iter: RANSAC 反復回数 - median_ksize: 移動メディアンのカーネルサイズ - neighbor_thresh: 近傍外れ値除去の閾値 px - residual_thresh: 残差反復除去の閾値 px - - Returns: - 線検出の結果 - """ - region = binary[DETECT_Y_START:DETECT_Y_END, :] - centers_y: list[float] = [] - centers_x: list[float] = [] - - for y_local in range(region.shape[0]): - xs = np.where(region[y_local] > 0)[0] - if len(xs) < min_width: - continue - y = float(y_local + DETECT_Y_START) - centers_y.append(y) - if use_median: - centers_x.append(float(np.median(xs))) - else: - centers_x.append(float(np.mean(xs))) - - if len(centers_y) < MIN_FIT_ROWS: - return no_detection(binary) - - cy = np.array(centers_y) - cx = np.array(centers_x) - - coeffs = clean_and_fit( - cy, cx, - median_ksize=median_ksize, - neighbor_thresh=neighbor_thresh, - residual_thresh=residual_thresh, - ransac_thresh=ransac_thresh, - ransac_iter=ransac_iter, - ) - if coeffs is None: - return no_detection(binary) - - return build_result(coeffs, binary) diff --git a/src/pc/vision/morphology.py b/src/pc/vision/morphology.py deleted file mode 100644 index 9b7edd3..0000000 --- a/src/pc/vision/morphology.py +++ /dev/null @@ -1,134 +0,0 @@ -""" -morphology -二値画像の形態学的処理ユーティリティモジュール -""" - -import cv2 -import numpy as np - - -def apply_iso_closing( - binary: np.ndarray, size: int, -) -> np.ndarray: - """等方クロージングで穴を埋める - - Args: - binary: 二値画像 - size: カーネルサイズ - - Returns: - クロージング後の二値画像 - """ - if size < 3: - return binary - k = size | 1 - kernel = cv2.getStructuringElement( - cv2.MORPH_ELLIPSE, (k, k), - ) - return cv2.morphologyEx( - binary, cv2.MORPH_CLOSE, kernel, - ) - - -def apply_staged_closing( - binary: np.ndarray, - small_size: int, - min_area: int, - large_size: int, -) -> np.ndarray: - """段階クロージング: 小穴埋め → 孤立除去 → 大穴埋め - - Args: - binary: 二値画像 - small_size: 第1段クロージングのカーネルサイズ - min_area: 孤立領域除去の最小面積(0 で無効) - large_size: 第2段クロージングのカーネルサイズ(0 で無効) - - Returns: - 処理後の二値画像 - """ - # 第1段: 小さいクロージングで近接ピクセルをつなぐ - result = apply_iso_closing(binary, small_size) - - # 孤立領域の除去 - if min_area > 0: - contours, _ = cv2.findContours( - result, cv2.RETR_EXTERNAL, - cv2.CHAIN_APPROX_SIMPLE, - ) - mask = np.zeros_like(result) - for cnt in contours: - if cv2.contourArea(cnt) >= min_area: - cv2.drawContours( - mask, [cnt], -1, 255, -1, - ) - result = mask - - # 第2段: 大きいクロージングで中抜けを埋める - result = apply_iso_closing(result, large_size) - - return result - - -def apply_width_filter( - binary: np.ndarray, - width_near: int, - width_far: int, - tolerance: float, -) -> np.ndarray: - """透視補正付き幅フィルタで広がりすぎた行を除外する - - 各行の期待線幅を線形補間で算出し, - 実際の幅が上限(期待幅 × tolerance)を超える行をマスクする - - Args: - binary: 二値画像 - width_near: 画像下端での期待線幅(px) - width_far: 画像上端での期待線幅(px) - tolerance: 上限倍率 - - Returns: - 幅フィルタ適用後の二値画像 - """ - result = binary.copy() - h = binary.shape[0] - denom = max(h - 1, 1) - - for y_local in range(h): - xs = np.where(binary[y_local] > 0)[0] - if len(xs) == 0: - continue - # 画像下端(近距離)ほど t=1,上端(遠距離)ほど t=0 - t = (h - 1 - y_local) / denom - expected = float(width_far) + ( - float(width_near) - float(width_far) - ) * t - max_w = expected * tolerance - actual_w = int(xs[-1]) - int(xs[0]) + 1 - if actual_w > max_w: - result[y_local] = 0 - - return result - - -def apply_dist_mask( - binary: np.ndarray, thresh: float, -) -> np.ndarray: - """距離変換で中心部のみを残す - - Args: - binary: 二値画像 - thresh: 距離の閾値(ピクセル) - - Returns: - 中心部のみの二値画像 - """ - if thresh <= 0: - return binary - dist = cv2.distanceTransform( - binary, cv2.DIST_L2, 5, - ) - _, mask = cv2.threshold( - dist, thresh, 255, cv2.THRESH_BINARY, - ) - return mask.astype(np.uint8) diff --git a/src/pc/vision/overlay.py b/src/pc/vision/overlay.py index a9da75b..f8f2604 100644 --- a/src/pc/vision/overlay.py +++ b/src/pc/vision/overlay.py @@ -9,8 +9,8 @@ import cv2 import numpy as np -from pc.vision.fitting import theil_sen_fit -from pc.vision.line_detector import LineDetectResult +from common.vision.fitting import theil_sen_fit +from common.vision.line_detector import LineDetectResult # 描画色の定義 (BGR) COLOR_LINE: tuple = (0, 255, 0) diff --git a/src/pi/main.py b/src/pi/main.py index 8739db3..2c61ecc 100644 --- a/src/pi/main.py +++ b/src/pi/main.py @@ -13,22 +13,22 @@ from pi.camera.capture import CameraCapture from pi.comm.zmq_client import PiZmqClient from pi.motor.driver import MotorDriver -from pi.steering.base import SteeringBase, SteeringOutput -from pi.steering.pd_control import PdControl, PdParams -from pi.steering.pursuit_control import ( +from common.steering.base import SteeringBase, SteeringOutput +from common.steering.pd_control import PdControl, PdParams +from common.steering.pursuit_control import ( PursuitControl, PursuitParams, ) -from pi.steering.recovery import ( +from common.steering.recovery import ( RecoveryController, RecoveryParams, ) -from pi.steering.ts_pd_control import ( +from common.steering.ts_pd_control import ( TsPdControl, TsPdParams, ) -from pi.vision.intersection import IntersectionClassifier -from pi.vision.line_detector import ImageParams +from common.vision.intersection import IntersectionClassifier +from common.vision.line_detector import ImageParams def main() -> None: diff --git a/src/pi/steering/__init__.py b/src/pi/steering/__init__.py deleted file mode 100644 index e69de29..0000000 --- a/src/pi/steering/__init__.py +++ /dev/null diff --git a/src/pi/steering/base.py b/src/pi/steering/base.py deleted file mode 100644 index 40eff1a..0000000 --- a/src/pi/steering/base.py +++ /dev/null @@ -1,50 +0,0 @@ -""" -base -操舵量計算の共通インターフェースを定義するモジュール -全ての操舵量計算クラスはこのインターフェースに従う -""" - -from abc import ABC, abstractmethod -from dataclasses import dataclass - -import numpy as np - - -@dataclass -class SteeringOutput: - """操舵量計算の出力を格納するデータクラス - - Attributes: - throttle: 前後方向の出力 (-1.0 ~ +1.0) - steer: 左右方向の出力 (-1.0 ~ +1.0) - """ - throttle: float - steer: float - - -class SteeringBase(ABC): - """操舵量計算の基底クラス - - 全ての操舵量計算クラスはこのクラスを継承し, - compute メソッドを実装する - """ - - @abstractmethod - def compute( - self, frame: np.ndarray, - ) -> SteeringOutput: - """カメラ画像から操舵量を計算する - - Args: - frame: BGR 形式のカメラ画像 - - Returns: - 計算された操舵量 - """ - - @abstractmethod - def reset(self) -> None: - """内部状態をリセットする - - 自動操縦の開始時に呼び出される - """ diff --git a/src/pi/steering/pd_control.py b/src/pi/steering/pd_control.py deleted file mode 100644 index 4600dd9..0000000 --- a/src/pi/steering/pd_control.py +++ /dev/null @@ -1,138 +0,0 @@ -""" -pd_control -PD 制御による操舵量計算モジュール -多項式フィッティングの位置・傾き・曲率から操舵量と速度を算出する -""" - -import time -from dataclasses import dataclass - -import numpy as np - -from pi.steering.base import SteeringBase, SteeringOutput -from pi.vision.line_detector import ( - ImageParams, - LineDetectResult, - detect_line, - reset_valley_tracker, -) - - -@dataclass -class PdParams: - """PD 制御のパラメータ - - Attributes: - kp: 位置偏差ゲイン - kh: 傾き(ヘディング)ゲイン - kd: 微分ゲイン - max_steer_rate: 1フレームあたりの最大操舵変化量 - max_throttle: 直線での最大速度 - speed_k: 曲率ベースの減速係数 - """ - kp: float = 0.5 - kh: float = 0.3 - kd: float = 0.1 - max_steer_rate: float = 0.1 - max_throttle: float = 0.4 - speed_k: float = 0.3 - - -class PdControl(SteeringBase): - """PD 制御による操舵量計算クラス""" - - def __init__( - self, - params: PdParams | None = None, - image_params: ImageParams | None = None, - ) -> None: - self.params: PdParams = params or PdParams() - self.image_params: ImageParams = ( - image_params or ImageParams() - ) - self._prev_error: float = 0.0 - self._prev_time: float = 0.0 - self._prev_steer: float = 0.0 - self._last_result: LineDetectResult | None = None - - def compute( - self, frame: np.ndarray, - ) -> SteeringOutput: - """カメラ画像から PD 制御で操舵量を計算する - - Args: - frame: グレースケールのカメラ画像 - - Returns: - 計算された操舵量 - """ - p = self.params - - # 線検出 - result = detect_line(frame, self.image_params) - self._last_result = result - - # 線が検出できなかった場合は停止 - if not result.detected: - return SteeringOutput( - throttle=0.0, steer=0.0, - ) - - # 位置偏差 + 傾きによる操舵量 - error = ( - p.kp * result.position_error - + p.kh * result.heading - ) - - # 時間差分の計算 - now = time.time() - dt = ( - now - self._prev_time - if self._prev_time > 0 - else 0.033 - ) - dt = max(dt, 0.001) - - # 微分項 - derivative = (error - self._prev_error) / dt - steer = error + p.kd * derivative - - # 操舵量のクランプ - steer = max(-1.0, min(1.0, steer)) - - # レートリミッター(急な操舵変化を制限) - delta = steer - self._prev_steer - max_delta = p.max_steer_rate - delta = max(-max_delta, min(max_delta, delta)) - steer = self._prev_steer + delta - - # 速度制御(曲率連動) - throttle = ( - p.max_throttle - - p.speed_k * abs(result.curvature) - ) - throttle = max(0.0, throttle) - - # 状態の更新 - self._prev_error = error - self._prev_time = now - self._prev_steer = steer - - return SteeringOutput( - throttle=throttle, steer=steer, - ) - - def reset(self) -> None: - """内部状態をリセットする""" - self._prev_error = 0.0 - self._prev_time = 0.0 - self._prev_steer = 0.0 - self._last_result = None - reset_valley_tracker() - - @property - def last_detect_result( - self, - ) -> LineDetectResult | None: - """直近の線検出結果を取得する""" - return self._last_result diff --git a/src/pi/steering/pursuit_control.py b/src/pi/steering/pursuit_control.py deleted file mode 100644 index ee11ab4..0000000 --- a/src/pi/steering/pursuit_control.py +++ /dev/null @@ -1,147 +0,0 @@ -""" -pursuit_control -2点パシュートによる操舵量計算モジュール -行中心点に Theil-Sen 直線近似を適用し,外れ値に強い操舵量を算出する -""" - -from dataclasses import dataclass - -import numpy as np - -from common import config -from pi.steering.base import SteeringBase, SteeringOutput -from pi.vision.fitting import theil_sen_fit -from pi.vision.line_detector import ( - ImageParams, - LineDetectResult, - detect_line, - reset_valley_tracker, -) - - -@dataclass -class PursuitParams: - """2点パシュート制御のパラメータ - - Attributes: - near_ratio: 近い目標点の位置(0.0=上端,1.0=下端) - far_ratio: 遠い目標点の位置(0.0=上端,1.0=下端) - k_near: 近い目標点の操舵ゲイン - k_far: 遠い目標点の操舵ゲイン - max_steer_rate: 1フレームあたりの最大操舵変化量 - max_throttle: 直線での最大速度 - speed_k: カーブ減速係数(2点の差に対する係数) - """ - near_ratio: float = 0.8 - far_ratio: float = 0.3 - k_near: float = 0.5 - k_far: float = 0.3 - max_steer_rate: float = 0.1 - max_throttle: float = 0.4 - speed_k: float = 2.0 - - -class PursuitControl(SteeringBase): - """2点パシュートによる操舵量計算クラス - - 行中心点から Theil-Sen 直線近似を行い, - 直線上の近い点と遠い点の偏差から操舵量を計算する - """ - - def __init__( - self, - params: PursuitParams | None = None, - image_params: ImageParams | None = None, - ) -> None: - self.params: PursuitParams = ( - params or PursuitParams() - ) - self.image_params: ImageParams = ( - image_params or ImageParams() - ) - self._prev_steer: float = 0.0 - self._last_result: LineDetectResult | None = None - - def compute( - self, frame: np.ndarray, - ) -> SteeringOutput: - """カメラ画像から2点パシュートで操舵量を計算する - - Args: - frame: グレースケールのカメラ画像 - - Returns: - 計算された操舵量 - """ - p = self.params - - # 線検出 - result = detect_line(frame, self.image_params) - self._last_result = result - - if not result.detected or result.row_centers is None: - return SteeringOutput( - throttle=0.0, steer=0.0, - ) - - centers = result.row_centers - - # 有効な点(NaN でない行)を抽出 - valid = ~np.isnan(centers) - ys = np.where(valid)[0].astype(float) - xs = centers[valid] - - if len(ys) < 2: - return SteeringOutput( - throttle=0.0, steer=0.0, - ) - - # Theil-Sen 直線近似 - slope, intercept = theil_sen_fit(ys, xs) - - center_x = config.FRAME_WIDTH / 2.0 - h = len(centers) - - # 直線上の 2 点の x 座標を取得 - near_y = h * p.near_ratio - far_y = h * p.far_ratio - near_x = slope * near_y + intercept - far_x = slope * far_y + intercept - - # 各点の偏差(正: 線が左にある → 右に曲がる) - near_err = (center_x - near_x) / center_x - far_err = (center_x - far_x) / center_x - - # 操舵量 - steer = p.k_near * near_err + p.k_far * far_err - steer = max(-1.0, min(1.0, steer)) - - # レートリミッター - delta = steer - self._prev_steer - max_delta = p.max_steer_rate - delta = max(-max_delta, min(max_delta, delta)) - steer = self._prev_steer + delta - - # 速度制御(2点の x 差でカーブ度合いを判定) - curve = abs(near_x - far_x) / center_x - throttle = p.max_throttle - p.speed_k * curve - throttle = max(0.0, throttle) - - self._prev_steer = steer - - return SteeringOutput( - throttle=throttle, steer=steer, - ) - - def reset(self) -> None: - """内部状態をリセットする""" - self._prev_steer = 0.0 - self._last_result = None - reset_valley_tracker() - - @property - def last_detect_result( - self, - ) -> LineDetectResult | None: - """直近の線検出結果を取得する""" - return self._last_result diff --git a/src/pi/steering/recovery.py b/src/pi/steering/recovery.py deleted file mode 100644 index 7170dc8..0000000 --- a/src/pi/steering/recovery.py +++ /dev/null @@ -1,104 +0,0 @@ -""" -recovery -コースアウト復帰のパラメータと判定ロジックを定義するモジュール -黒線を一定時間検出できなかった場合に, -最後に検出した方向へ旋回しながら走行して復帰する -""" - -import time -from dataclasses import dataclass - -from pi.steering.base import SteeringOutput - - -@dataclass -class RecoveryParams: - """コースアウト復帰のパラメータ - - Attributes: - enabled: 復帰機能の有効/無効 - timeout_sec: 線を見失ってから復帰動作を開始するまでの時間 - steer_amount: 復帰時の操舵量(0.0 ~ 1.0) - throttle: 復帰時の速度(負: 後退,正: 前進) - """ - enabled: bool = True - timeout_sec: float = 0.5 - steer_amount: float = 0.5 - throttle: float = -0.3 - - -class RecoveryController: - """コースアウト復帰の判定と操舵量算出を行うクラス - - 自動操縦中にフレームごとに呼び出し, - 線検出の成否を記録する.一定時間検出できなかった場合に - 復帰用の操舵量を返す - """ - - def __init__( - self, - params: RecoveryParams | None = None, - ) -> None: - self.params: RecoveryParams = ( - params or RecoveryParams() - ) - self._last_detected_time: float = 0.0 - self._last_error_sign: float = 0.0 - self._is_recovering: bool = False - - def reset(self) -> None: - """内部状態をリセットする - - 自動操縦の開始時に呼び出す - """ - self._last_detected_time = time.time() - self._last_error_sign = 0.0 - self._is_recovering = False - - def update( - self, - detected: bool, - position_error: float = 0.0, - ) -> SteeringOutput | None: - """検出結果を記録し,復帰が必要なら操舵量を返す - - Args: - detected: 線が検出できたか - position_error: 検出時の位置偏差(正: 線が左) - - Returns: - 復帰操舵量,または None(通常走行を継続) - """ - if not self.params.enabled: - return None - - now = time.time() - - if detected: - self._last_detected_time = now - if position_error != 0.0: - self._last_error_sign = ( - 1.0 if position_error > 0 else -1.0 - ) - self._is_recovering = False - return None - - elapsed = now - self._last_detected_time - if elapsed < self.params.timeout_sec: - return None - - # 復帰モード: 最後に検出した方向へ旋回 - self._is_recovering = True - steer = ( - self._last_error_sign - * self.params.steer_amount - ) - return SteeringOutput( - throttle=self.params.throttle, - steer=steer, - ) - - @property - def is_recovering(self) -> bool: - """現在復帰動作中かどうかを返す""" - return self._is_recovering diff --git a/src/pi/steering/ts_pd_control.py b/src/pi/steering/ts_pd_control.py deleted file mode 100644 index a30744b..0000000 --- a/src/pi/steering/ts_pd_control.py +++ /dev/null @@ -1,160 +0,0 @@ -""" -ts_pd_control -Theil-Sen 直線近似による PD 制御モジュール -行中心点に Theil-Sen 直線をフィッティングし, -位置偏差・傾き・微分項から操舵量を算出する -""" - -import time -from dataclasses import dataclass - -import numpy as np - -from common import config -from pi.steering.base import SteeringBase, SteeringOutput -from pi.vision.fitting import theil_sen_fit -from pi.vision.line_detector import ( - ImageParams, - LineDetectResult, - detect_line, -) - - -@dataclass -class TsPdParams: - """Theil-Sen PD 制御のパラメータ - - Attributes: - kp: 位置偏差ゲイン - kh: 傾き(Theil-Sen slope)ゲイン - kd: 微分ゲイン - max_steer_rate: 1フレームあたりの最大操舵変化量 - max_throttle: 直線での最大速度 - speed_k: 傾きベースの減速係数 - """ - kp: float = 0.5 - kh: float = 0.3 - kd: float = 0.1 - max_steer_rate: float = 0.1 - max_throttle: float = 0.4 - speed_k: float = 2.0 - - -class TsPdControl(SteeringBase): - """Theil-Sen 直線近似による PD 制御クラス - - 行中心点から Theil-Sen 直線近似を行い, - 画像下端での位置偏差と直線の傾きから PD 制御で操舵量を計算する - """ - - def __init__( - self, - params: TsPdParams | None = None, - image_params: ImageParams | None = None, - ) -> None: - self.params: TsPdParams = ( - params or TsPdParams() - ) - self.image_params: ImageParams = ( - image_params or ImageParams() - ) - self._prev_error: float = 0.0 - self._prev_time: float = 0.0 - self._prev_steer: float = 0.0 - self._last_result: LineDetectResult | None = None - - def compute( - self, frame: np.ndarray, - ) -> SteeringOutput: - """カメラ画像から Theil-Sen PD 制御で操舵量を計算する - - Args: - frame: グレースケールのカメラ画像 - - Returns: - 計算された操舵量 - """ - p = self.params - - # 線検出 - result = detect_line(frame, self.image_params) - self._last_result = result - - if not result.detected or result.row_centers is None: - return SteeringOutput( - throttle=0.0, steer=0.0, - ) - - centers = result.row_centers - - # 有効な点(NaN でない行)を抽出 - valid = ~np.isnan(centers) - ys = np.where(valid)[0].astype(float) - xs = centers[valid] - - if len(ys) < 2: - return SteeringOutput( - throttle=0.0, steer=0.0, - ) - - # Theil-Sen 直線近似: x = slope * y + intercept - slope, intercept = theil_sen_fit(ys, xs) - - center_x = config.FRAME_WIDTH / 2.0 - h = len(centers) - - # 画像下端での位置偏差 - bottom_x = slope * (h - 1) + intercept - position_error = (center_x - bottom_x) / center_x - - # 操舵量: P 項(位置偏差)+ Heading 項(傾き) - error = p.kp * position_error + p.kh * slope - - # 時間差分の計算 - now = time.time() - dt = ( - now - self._prev_time - if self._prev_time > 0 - else 0.033 - ) - dt = max(dt, 0.001) - - # D 項(微分項) - derivative = (error - self._prev_error) / dt - steer = error + p.kd * derivative - - # 操舵量のクランプ - steer = max(-1.0, min(1.0, steer)) - - # レートリミッター - delta = steer - self._prev_steer - max_delta = p.max_steer_rate - delta = max(-max_delta, min(max_delta, delta)) - steer = self._prev_steer + delta - - # 速度制御(傾きベース) - throttle = p.max_throttle - p.speed_k * abs(slope) - throttle = max(0.0, throttle) - - # 状態の更新 - self._prev_error = error - self._prev_time = now - self._prev_steer = steer - - return SteeringOutput( - throttle=throttle, steer=steer, - ) - - def reset(self) -> None: - """内部状態をリセットする""" - self._prev_error = 0.0 - self._prev_time = 0.0 - self._prev_steer = 0.0 - self._last_result = None - - @property - def last_detect_result( - self, - ) -> LineDetectResult | None: - """直近の線検出結果を取得する""" - return self._last_result diff --git a/src/pi/vision/__init__.py b/src/pi/vision/__init__.py deleted file mode 100644 index e69de29..0000000 --- a/src/pi/vision/__init__.py +++ /dev/null diff --git a/src/pi/vision/detectors/__init__.py b/src/pi/vision/detectors/__init__.py deleted file mode 100644 index e69de29..0000000 --- a/src/pi/vision/detectors/__init__.py +++ /dev/null diff --git a/src/pi/vision/detectors/blackhat.py b/src/pi/vision/detectors/blackhat.py deleted file mode 100644 index 908670d..0000000 --- a/src/pi/vision/detectors/blackhat.py +++ /dev/null @@ -1,69 +0,0 @@ -""" -blackhat -案A: Black-hat 中心型の線検出 -Black-hat 変換で背景より暗い構造を直接抽出し, -固定閾値 + 距離変換 + 行ごと中心抽出で検出する -""" - -import cv2 -import numpy as np - -from pi.vision.line_detector import ( - ImageParams, - LineDetectResult, - fit_row_centers, -) -from pi.vision.morphology import ( - apply_dist_mask, - apply_iso_closing, - apply_width_filter, -) - - -def detect_blackhat( - frame: np.ndarray, params: ImageParams, -) -> LineDetectResult: - """案A: Black-hat 中心型""" - # Black-hat 変換(暗い構造の抽出) - bh_k = params.blackhat_ksize | 1 - bh_kernel = cv2.getStructuringElement( - cv2.MORPH_ELLIPSE, (bh_k, bh_k), - ) - blackhat = cv2.morphologyEx( - frame, cv2.MORPH_BLACKHAT, bh_kernel, - ) - - # ガウシアンブラー - blur_k = params.blur_size | 1 - blurred = cv2.GaussianBlur( - blackhat, (blur_k, blur_k), 0, - ) - - # 固定閾値(Black-hat 後は線が白) - _, binary = cv2.threshold( - blurred, params.binary_thresh, 255, - cv2.THRESH_BINARY, - ) - - # 等方クロージング + 距離変換マスク + 幅フィルタ - binary = apply_iso_closing( - binary, params.iso_close_size, - ) - binary = apply_dist_mask( - binary, params.dist_thresh, - ) - if params.width_near > 0 and params.width_far > 0: - binary = apply_width_filter( - binary, - params.width_near, - params.width_far, - params.width_tolerance, - ) - - # 行ごと中心抽出 + フィッティング - return fit_row_centers( - binary, params.min_line_width, - median_ksize=params.median_ksize, - neighbor_thresh=params.neighbor_thresh, - residual_thresh=params.residual_thresh, - ) diff --git a/src/pi/vision/detectors/current.py b/src/pi/vision/detectors/current.py deleted file mode 100644 index 59e1c76..0000000 --- a/src/pi/vision/detectors/current.py +++ /dev/null @@ -1,76 +0,0 @@ -""" -current -現行手法: CLAHE + 固定閾値 + 全ピクセルフィッティング -""" - -import cv2 -import numpy as np - -from pi.vision.line_detector import ( - DETECT_Y_END, - DETECT_Y_START, - MIN_FIT_PIXELS, - ImageParams, - LineDetectResult, - build_result, - no_detection, -) - - -def detect_current( - frame: np.ndarray, params: ImageParams, -) -> LineDetectResult: - """現行手法: CLAHE + 固定閾値 + 全ピクセルフィッティング""" - # CLAHE でコントラスト強調 - clahe = cv2.createCLAHE( - clipLimit=params.clahe_clip, - tileGridSize=( - params.clahe_grid, - params.clahe_grid, - ), - ) - enhanced = clahe.apply(frame) - - # ガウシアンブラー - blur_k = params.blur_size | 1 - blurred = cv2.GaussianBlur( - enhanced, (blur_k, blur_k), 0, - ) - - # 固定閾値で二値化(黒線を白に反転) - _, binary = cv2.threshold( - blurred, params.binary_thresh, 255, - cv2.THRESH_BINARY_INV, - ) - - # オープニング(孤立ノイズ除去) - if params.open_size >= 3: - open_k = params.open_size | 1 - open_kernel = cv2.getStructuringElement( - cv2.MORPH_ELLIPSE, (open_k, open_k), - ) - binary = cv2.morphologyEx( - binary, cv2.MORPH_OPEN, open_kernel, - ) - - # 横方向クロージング(途切れ補間) - if params.close_width >= 3: - close_h = max(params.close_height | 1, 1) - close_kernel = cv2.getStructuringElement( - cv2.MORPH_ELLIPSE, - (params.close_width, close_h), - ) - binary = cv2.morphologyEx( - binary, cv2.MORPH_CLOSE, close_kernel, - ) - - # 全ピクセルフィッティング - region = binary[DETECT_Y_START:DETECT_Y_END, :] - ys_local, xs = np.where(region > 0) - - if len(xs) < MIN_FIT_PIXELS: - return no_detection(binary) - - ys = ys_local + DETECT_Y_START - coeffs = np.polyfit(ys, xs, 2) - return build_result(coeffs, binary) diff --git a/src/pi/vision/detectors/dual_norm.py b/src/pi/vision/detectors/dual_norm.py deleted file mode 100644 index 170b97e..0000000 --- a/src/pi/vision/detectors/dual_norm.py +++ /dev/null @@ -1,89 +0,0 @@ -""" -dual_norm -案B: 二重正規化型の線検出 -背景除算で照明勾配を除去し, -適応的閾値で局所ムラにも対応する二重防壁構成 -""" - -import cv2 -import numpy as np - -from pi.vision.line_detector import ( - ImageParams, - LineDetectResult, - fit_row_centers, -) -from pi.vision.morphology import ( - apply_dist_mask, - apply_iso_closing, - apply_staged_closing, - apply_width_filter, -) - - -def detect_dual_norm( - frame: np.ndarray, params: ImageParams, -) -> LineDetectResult: - """案B: 二重正規化型""" - # 背景除算正規化 - bg_k = params.bg_blur_ksize | 1 - bg = cv2.GaussianBlur( - frame, (bg_k, bg_k), 0, - ) - normalized = ( - frame.astype(np.float32) * 255.0 - / (bg.astype(np.float32) + 1.0) - ) - normalized = np.clip( - normalized, 0, 255, - ).astype(np.uint8) - - # 適応的閾値(ガウシアン,BINARY_INV) - block = max(params.adaptive_block | 1, 3) - binary = cv2.adaptiveThreshold( - normalized, 255, - cv2.ADAPTIVE_THRESH_GAUSSIAN_C, - cv2.THRESH_BINARY_INV, - block, params.adaptive_c, - ) - - # 固定閾値との AND(有効時のみ) - if params.global_thresh > 0: - _, global_mask = cv2.threshold( - normalized, params.global_thresh, - 255, cv2.THRESH_BINARY_INV, - ) - binary = cv2.bitwise_and(binary, global_mask) - - # 段階クロージング or 等方クロージング - if params.stage_min_area > 0: - binary = apply_staged_closing( - binary, - params.stage_close_small, - params.stage_min_area, - params.stage_close_large, - ) - else: - binary = apply_iso_closing( - binary, params.iso_close_size, - ) - - # 距離変換マスク + 幅フィルタ - binary = apply_dist_mask( - binary, params.dist_thresh, - ) - if params.width_near > 0 and params.width_far > 0: - binary = apply_width_filter( - binary, - params.width_near, - params.width_far, - params.width_tolerance, - ) - - # 行ごと中心抽出 + フィッティング - return fit_row_centers( - binary, params.min_line_width, - median_ksize=params.median_ksize, - neighbor_thresh=params.neighbor_thresh, - residual_thresh=params.residual_thresh, - ) diff --git a/src/pi/vision/detectors/robust.py b/src/pi/vision/detectors/robust.py deleted file mode 100644 index fa6e79f..0000000 --- a/src/pi/vision/detectors/robust.py +++ /dev/null @@ -1,69 +0,0 @@ -""" -robust -案C: 最高ロバスト型の線検出 -Black-hat + 適応的閾値の二重正規化に加え, -RANSAC で外れ値を除去する最もロバストな構成 -""" - -import cv2 -import numpy as np - -from pi.vision.line_detector import ( - ImageParams, - LineDetectResult, - fit_row_centers, -) -from pi.vision.morphology import ( - apply_dist_mask, - apply_iso_closing, - apply_width_filter, -) - - -def detect_robust( - frame: np.ndarray, params: ImageParams, -) -> LineDetectResult: - """案C: 最高ロバスト型""" - # Black-hat 変換 - bh_k = params.blackhat_ksize | 1 - bh_kernel = cv2.getStructuringElement( - cv2.MORPH_ELLIPSE, (bh_k, bh_k), - ) - blackhat = cv2.morphologyEx( - frame, cv2.MORPH_BLACKHAT, bh_kernel, - ) - - # 適応的閾値(BINARY: Black-hat 後は線が白) - block = max(params.adaptive_block | 1, 3) - binary = cv2.adaptiveThreshold( - blackhat, 255, - cv2.ADAPTIVE_THRESH_GAUSSIAN_C, - cv2.THRESH_BINARY, - block, -params.adaptive_c, - ) - - # 等方クロージング + 距離変換マスク + 幅フィルタ - binary = apply_iso_closing( - binary, params.iso_close_size, - ) - binary = apply_dist_mask( - binary, params.dist_thresh, - ) - if params.width_near > 0 and params.width_far > 0: - binary = apply_width_filter( - binary, - params.width_near, - params.width_far, - params.width_tolerance, - ) - - # 行ごと中央値抽出 + RANSAC フィッティング - return fit_row_centers( - binary, params.min_line_width, - use_median=True, - ransac_thresh=params.ransac_thresh, - ransac_iter=params.ransac_iter, - median_ksize=params.median_ksize, - neighbor_thresh=params.neighbor_thresh, - residual_thresh=params.residual_thresh, - ) diff --git a/src/pi/vision/detectors/valley.py b/src/pi/vision/detectors/valley.py deleted file mode 100644 index 5a485db..0000000 --- a/src/pi/vision/detectors/valley.py +++ /dev/null @@ -1,324 +0,0 @@ -""" -valley -案D: 谷検出+追跡型の線検出 -各行の輝度信号から谷(暗い領域)を直接検出し, -時系列追跡で安定性を確保する.二値化を使用しない -""" - -import cv2 -import numpy as np - -from common import config -from pi.vision.fitting import clean_and_fit -from pi.vision.line_detector import ( - DETECT_Y_END, - DETECT_Y_START, - MIN_FIT_ROWS, - ImageParams, - LineDetectResult, - build_result, - no_detection, -) - - -class ValleyTracker: - """谷検出の時系列追跡を管理するクラス - - 前フレームの多項式係数を保持し,予測・平滑化・ - 検出失敗時のコースティングを提供する - """ - - def __init__(self) -> None: - self._prev_coeffs: np.ndarray | None = None - self._smoothed_coeffs: np.ndarray | None = None - self._frames_lost: int = 0 - - def predict_x(self, y: float) -> float | None: - """前フレームの多項式から x 座標を予測する - - Args: - y: 画像の y 座標 - - Returns: - 予測 x 座標(履歴なしの場合は None) - """ - if self._smoothed_coeffs is None: - return None - return float(np.poly1d(self._smoothed_coeffs)(y)) - - def update( - self, - coeffs: np.ndarray, - alpha: float, - ) -> np.ndarray: - """検出成功時に状態を更新する - - Args: - coeffs: 今フレームのフィッティング係数 - alpha: EMA 係数(1.0 で平滑化なし) - - Returns: - 平滑化後の多項式係数 - """ - self._frames_lost = 0 - if self._smoothed_coeffs is None: - self._smoothed_coeffs = coeffs.copy() - else: - self._smoothed_coeffs = ( - alpha * coeffs - + (1.0 - alpha) * self._smoothed_coeffs - ) - self._prev_coeffs = self._smoothed_coeffs.copy() - return self._smoothed_coeffs - - def coast( - self, max_frames: int, - ) -> LineDetectResult | None: - """検出失敗時に予測結果を返す - - Args: - max_frames: 予測を継続する最大フレーム数 - - Returns: - 予測による結果(継続不可の場合は None) - """ - if self._smoothed_coeffs is None: - return None - self._frames_lost += 1 - if self._frames_lost > max_frames: - return None - h = config.FRAME_HEIGHT - w = config.FRAME_WIDTH - blank = np.zeros((h, w), dtype=np.uint8) - return build_result(self._smoothed_coeffs, blank) - - def reset(self) -> None: - """追跡状態をリセットする""" - self._prev_coeffs = None - self._smoothed_coeffs = None - self._frames_lost = 0 - - -_valley_tracker = ValleyTracker() - - -def reset_valley_tracker() -> None: - """谷検出の追跡状態をリセットする""" - _valley_tracker.reset() - - -def _find_row_valley( - row: np.ndarray, - min_depth: int, - expected_width: float, - width_tolerance: float, - predicted_x: float | None, - max_deviation: int, -) -> tuple[float, float] | None: - """1行の輝度信号から最適な谷を検出する - - Args: - row: スムージング済みの1行輝度信号 - min_depth: 最小谷深度 - expected_width: 期待線幅(px,0 で幅フィルタ無効) - width_tolerance: 幅フィルタの上限倍率 - predicted_x: 追跡による予測 x 座標(None で無効) - max_deviation: 予測からの最大許容偏差 - - Returns: - (谷の中心x, 谷の深度) または None - """ - n = len(row) - if n < 5: - return None - - signal = row.astype(np.float32) - - # 極小値を検出(前後より小さい点) - left = signal[:-2] - center = signal[1:-1] - right = signal[2:] - minima_mask = (center <= left) & (center <= right) - minima_indices = np.where(minima_mask)[0] + 1 - - if len(minima_indices) == 0: - return None - - best: tuple[float, float] | None = None - best_score = -1.0 - - for idx in minima_indices: - val = signal[idx] - - # 左の肩を探す - left_shoulder = idx - for i in range(idx - 1, -1, -1): - if signal[i] < signal[i + 1]: - break - left_shoulder = i - # 右の肩を探す - right_shoulder = idx - for i in range(idx + 1, n): - if signal[i] < signal[i - 1]: - break - right_shoulder = i - - # 谷の深度(肩の平均 - 谷底) - shoulder_avg = ( - signal[left_shoulder] + signal[right_shoulder] - ) / 2.0 - depth = shoulder_avg - val - if depth < min_depth: - continue - - # 谷の幅 - width = right_shoulder - left_shoulder - center_x = (left_shoulder + right_shoulder) / 2.0 - - # 幅フィルタ - if expected_width > 0: - max_w = expected_width * width_tolerance - min_w = expected_width / width_tolerance - if width > max_w or width < min_w: - continue - - # 予測との偏差チェック - if predicted_x is not None: - if abs(center_x - predicted_x) > max_deviation: - continue - - # スコア: 深度優先,予測がある場合は近さも考慮 - score = float(depth) - if predicted_x is not None: - dist = abs(center_x - predicted_x) - score += max(0.0, max_deviation - dist) - - if score > best_score: - best_score = score - best = (center_x, float(depth)) - - return best - - -def _build_valley_binary( - shape: tuple[int, int], - centers_y: list[int], - centers_x: list[float], -) -> np.ndarray: - """谷検出結果からデバッグ用二値画像を生成する - - Args: - shape: 出力画像の (高さ, 幅) - centers_y: 検出行の y 座標リスト - centers_x: 検出行の中心 x 座標リスト - - Returns: - デバッグ用二値画像 - """ - binary = np.zeros(shape, dtype=np.uint8) - half_w = 3 - w = shape[1] - for y, cx in zip(centers_y, centers_x): - x0 = max(0, int(cx) - half_w) - x1 = min(w, int(cx) + half_w + 1) - binary[y, x0:x1] = 255 - return binary - - -def detect_valley( - frame: np.ndarray, params: ImageParams, -) -> LineDetectResult: - """案D: 谷検出+追跡型""" - h, w = frame.shape[:2] - - # 行ごとにガウシアン平滑化するため画像全体をブラー - gauss_k = params.valley_gauss_ksize | 1 - blurred = cv2.GaussianBlur( - frame, (gauss_k, 1), 0, - ) - - # 透視補正の期待幅を計算するための準備 - use_width = ( - params.width_near > 0 and params.width_far > 0 - ) - detect_h = DETECT_Y_END - DETECT_Y_START - denom = max(detect_h - 1, 1) - - centers_y: list[int] = [] - centers_x: list[float] = [] - depths: list[float] = [] - - for y in range(DETECT_Y_START, DETECT_Y_END): - row = blurred[y] - - # 期待幅の計算 - if use_width: - t = (DETECT_Y_END - 1 - y) / denom - expected_w = float(params.width_far) + ( - float(params.width_near) - - float(params.width_far) - ) * t - else: - expected_w = 0.0 - - # 予測 x 座標 - predicted_x = _valley_tracker.predict_x( - float(y), - ) - - result = _find_row_valley( - row, - params.valley_min_depth, - expected_w, - params.width_tolerance, - predicted_x, - params.valley_max_deviation, - ) - if result is not None: - centers_y.append(y) - centers_x.append(result[0]) - depths.append(result[1]) - - # デバッグ用二値画像 - debug_binary = _build_valley_binary( - (h, w), centers_y, centers_x, - ) - - if len(centers_y) < MIN_FIT_ROWS: - coasted = _valley_tracker.coast( - params.valley_coast_frames, - ) - if coasted is not None: - coasted.binary_image = debug_binary - return coasted - return no_detection(debug_binary) - - cy = np.array(centers_y, dtype=np.float64) - cx = np.array(centers_x, dtype=np.float64) - w_arr = np.array(depths, dtype=np.float64) - - # ロバストフィッティング(深度を重みに使用) - coeffs = clean_and_fit( - cy, cx, - median_ksize=params.median_ksize, - neighbor_thresh=params.neighbor_thresh, - residual_thresh=params.residual_thresh, - weights=w_arr, - ransac_thresh=params.ransac_thresh, - ransac_iter=params.ransac_iter, - ) - if coeffs is None: - coasted = _valley_tracker.coast( - params.valley_coast_frames, - ) - if coasted is not None: - coasted.binary_image = debug_binary - return coasted - return no_detection(debug_binary) - - # EMA で平滑化 - smoothed = _valley_tracker.update( - coeffs, params.valley_ema_alpha, - ) - - return build_result(smoothed, debug_binary) diff --git a/src/pi/vision/fitting.py b/src/pi/vision/fitting.py deleted file mode 100644 index c55139d..0000000 --- a/src/pi/vision/fitting.py +++ /dev/null @@ -1,212 +0,0 @@ -""" -fitting -直線・曲線近似の共通ユーティリティモジュール -Theil-Sen 推定,RANSAC,外れ値除去付きフィッティングを提供する -""" - -import numpy as np - -# フィッティングに必要な最小行数 -MIN_FIT_ROWS: int = 10 - -# 近傍外れ値除去の設定 -NEIGHBOR_HALF_WINDOW: int = 3 -NEIGHBOR_FILTER_PASSES: int = 3 - -# 残差ベース反復除去の最大回数 -RESIDUAL_REMOVAL_ITERATIONS: int = 5 - - -def theil_sen_fit( - y: np.ndarray, - x: np.ndarray, -) -> tuple[float, float]: - """Theil-Sen 推定で直線 x = slope * y + intercept を求める - - 全ペアの傾きの中央値を使い,外れ値に強い直線近似を行う - - Args: - y: y 座標の配列(行番号) - x: x 座標の配列(各行の中心) - - Returns: - (slope, intercept) のタプル - """ - n = len(y) - slopes = [] - for i in range(n): - for j in range(i + 1, n): - dy = y[j] - y[i] - if dy != 0: - slopes.append((x[j] - x[i]) / dy) - - if len(slopes) == 0: - return 0.0, float(np.median(x)) - - slope = float(np.median(slopes)) - intercept = float(np.median(x - slope * y)) - return slope, intercept - - -def ransac_polyfit( - ys: np.ndarray, xs: np.ndarray, - degree: int, n_iter: int, thresh: float, -) -> np.ndarray | None: - """RANSAC で外れ値を除去して多項式フィッティング - - Args: - ys: y 座標配列 - xs: x 座標配列 - degree: 多項式の次数 - n_iter: 反復回数 - thresh: 外れ値判定閾値(ピクセル) - - Returns: - 多項式係数(フィッティング失敗時は None) - """ - n = len(ys) - sample_size = degree + 1 - if n < sample_size: - return None - - best_coeffs: np.ndarray | None = None - best_inliers = 0 - rng = np.random.default_rng() - - for _ in range(n_iter): - idx = rng.choice(n, sample_size, replace=False) - coeffs = np.polyfit(ys[idx], xs[idx], degree) - poly = np.poly1d(coeffs) - residuals = np.abs(xs - poly(ys)) - n_inliers = int(np.sum(residuals < thresh)) - if n_inliers > best_inliers: - best_inliers = n_inliers - best_coeffs = coeffs - - # インライアで再フィッティング - if best_coeffs is not None: - poly = np.poly1d(best_coeffs) - inlier_mask = np.abs(xs - poly(ys)) < thresh - if np.sum(inlier_mask) >= sample_size: - best_coeffs = np.polyfit( - ys[inlier_mask], - xs[inlier_mask], - degree, - ) - - return best_coeffs - - -def clean_and_fit( - cy: np.ndarray, - cx: np.ndarray, - median_ksize: int, - neighbor_thresh: float, - residual_thresh: float = 0.0, - weights: np.ndarray | None = None, - ransac_thresh: float = 0.0, - ransac_iter: int = 0, -) -> np.ndarray | None: - """外れ値除去+重み付きフィッティングを行う - - 全検出手法で共通に使えるロバストなフィッティング - (1) 移動メディアンフィルタでスパイクを平滑化 - (2) 近傍中央値からの偏差で外れ値を除去(複数パス) - (3) 重み付き最小二乗(または RANSAC)でフィッティング - (4) 残差ベースの反復除去で外れ値を最終除去 - - Args: - cy: 中心点の y 座標配列 - cx: 中心点の x 座標配列 - median_ksize: 移動メディアンのカーネルサイズ(0 で無効) - neighbor_thresh: 近傍外れ値除去の閾値 px(0 で無効) - residual_thresh: 残差除去の閾値 px(0 で無効) - weights: 各点の信頼度(None で均等) - ransac_thresh: RANSAC 閾値(0 以下で無効) - ransac_iter: RANSAC 反復回数 - - Returns: - 多項式係数(フィッティング失敗時は None) - """ - if len(cy) < MIN_FIT_ROWS: - return None - - cx_clean = cx.copy() - mask = np.ones(len(cy), dtype=bool) - - # (1) 移動メディアンフィルタ - if median_ksize >= 3: - k = median_ksize | 1 - half = k // 2 - for i in range(len(cx_clean)): - lo = max(0, i - half) - hi = min(len(cx_clean), i + half + 1) - cx_clean[i] = float(np.median(cx[lo:hi])) - - # (2) 近傍外れ値除去(複数パス) - if neighbor_thresh > 0: - half_n = NEIGHBOR_HALF_WINDOW - for _ in range(NEIGHBOR_FILTER_PASSES): - new_mask = np.ones(len(cx_clean), dtype=bool) - for i in range(len(cx_clean)): - if not mask[i]: - continue - lo = max(0, i - half_n) - hi = min(len(cx_clean), i + half_n + 1) - neighbors = cx_clean[lo:hi][mask[lo:hi]] - if len(neighbors) == 0: - new_mask[i] = False - continue - local_med = float(np.median(neighbors)) - if ( - abs(cx_clean[i] - local_med) - > neighbor_thresh - ): - new_mask[i] = False - if np.array_equal(mask, mask & new_mask): - break - mask = mask & new_mask - - cy = cy[mask] - cx_clean = cx_clean[mask] - if weights is not None: - weights = weights[mask] - - if len(cy) < MIN_FIT_ROWS: - return None - - # (3) フィッティング - if ransac_thresh > 0 and ransac_iter > 0: - coeffs = ransac_polyfit( - cy, cx_clean, 2, ransac_iter, ransac_thresh, - ) - elif weights is not None: - coeffs = np.polyfit(cy, cx_clean, 2, w=weights) - else: - coeffs = np.polyfit(cy, cx_clean, 2) - - if coeffs is None: - return None - - # (4) 残差ベースの反復除去 - if residual_thresh > 0: - for _ in range(RESIDUAL_REMOVAL_ITERATIONS): - poly = np.poly1d(coeffs) - residuals = np.abs(cx_clean - poly(cy)) - inlier = residuals < residual_thresh - if np.all(inlier): - break - if np.sum(inlier) < MIN_FIT_ROWS: - break - cy = cy[inlier] - cx_clean = cx_clean[inlier] - if weights is not None: - weights = weights[inlier] - if weights is not None: - coeffs = np.polyfit( - cy, cx_clean, 2, w=weights, - ) - else: - coeffs = np.polyfit(cy, cx_clean, 2) - - return coeffs diff --git a/src/pi/vision/intersection.py b/src/pi/vision/intersection.py deleted file mode 100644 index 20a94e3..0000000 --- a/src/pi/vision/intersection.py +++ /dev/null @@ -1,75 +0,0 @@ -""" -intersection -十字路分類モデルの読み込みと推論を行うモジュール - -学習済みモデルとスケーラを読み込み, -二値画像から十字路かどうかを判定する -""" - -from pathlib import Path - -import numpy as np - -from common.json_utils import PARAMS_DIR - -# モデル・スケーラの保存先 -_MODEL_PATH: Path = PARAMS_DIR / "intersection_model.pkl" -_SCALER_PATH: Path = PARAMS_DIR / "intersection_scaler.pkl" - - -class IntersectionClassifier: - """十字路分類器 - - 学習済みモデルを読み込み,二値画像から - 十字路かどうかを判定する - scikit-learn を遅延インポートして起動時間を短縮する - """ - - def __init__(self) -> None: - self._model: object | None = None - self._scaler: object | None = None - self._available: bool = False - - def load(self) -> None: - """モデルとスケーラを読み込む(遅延呼び出し用)""" - if not _MODEL_PATH.exists(): - print( - f" モデルが見つかりません: {_MODEL_PATH}" - ) - return - if not _SCALER_PATH.exists(): - print( - f" スケーラが見つかりません: {_SCALER_PATH}" - ) - return - try: - import joblib - - self._model = joblib.load(_MODEL_PATH) - self._scaler = joblib.load(_SCALER_PATH) - self._available = True - except Exception as e: - print(f" モデル読み込みエラー: {e}") - - @property - def available(self) -> bool: - """モデルが利用可能かどうか""" - return self._available - - def predict(self, binary_image: np.ndarray) -> bool: - """二値画像が十字路かどうかを判定する - - Args: - binary_image: 40×30 の二値画像(0/255) - - Returns: - 十字路なら True - """ - if not self._available: - return False - flat = (binary_image.flatten() / 255.0).astype( - np.float32, - ) - x = self._scaler.transform(flat.reshape(1, -1)) - pred = self._model.predict(x) - return bool(pred[0] == 1) diff --git a/src/pi/vision/line_detector.py b/src/pi/vision/line_detector.py deleted file mode 100644 index 5de0cde..0000000 --- a/src/pi/vision/line_detector.py +++ /dev/null @@ -1,364 +0,0 @@ -""" -line_detector -カメラ画像から黒線の位置を検出するモジュール -複数の検出手法を切り替えて使用できる - -公開 API: - ImageParams, LineDetectResult, detect_line, - reset_valley_tracker, DETECT_METHODS -""" - -from dataclasses import dataclass - -import cv2 -import numpy as np - -from common import config -from pi.vision.fitting import clean_and_fit - -# 検出領域の y 範囲(画像全体) -DETECT_Y_START: int = 0 -DETECT_Y_END: int = config.FRAME_HEIGHT - -# フィッティングに必要な最小数 -MIN_FIT_PIXELS: int = 50 -MIN_FIT_ROWS: int = 10 - -# 検出手法の定義(キー: 識別子,値: 表示名) -DETECT_METHODS: dict[str, str] = { - "current": "現行(CLAHE + 固定閾値)", - "blackhat": "案A(Black-hat 中心)", - "dual_norm": "案B(二重正規化)", - "robust": "案C(最高ロバスト)", - "valley": "案D(谷検出+追跡)", -} - - -@dataclass -class ImageParams: - """二値化パラメータ - - Attributes: - method: 検出手法の識別子 - clahe_clip: CLAHE のコントラスト増幅上限 - clahe_grid: CLAHE の局所領域分割数 - blur_size: ガウシアンブラーのカーネルサイズ(奇数) - binary_thresh: 二値化の閾値 - open_size: オープニングのカーネルサイズ - close_width: クロージングの横幅 - close_height: クロージングの高さ - blackhat_ksize: Black-hat のカーネルサイズ - bg_blur_ksize: 背景除算のブラーカーネルサイズ - global_thresh: 固定閾値(0 で無効,適応的閾値との AND) - adaptive_block: 適応的閾値のブロックサイズ - adaptive_c: 適応的閾値の定数 C - iso_close_size: 等方クロージングのカーネルサイズ - dist_thresh: 距離変換の閾値 - min_line_width: 行ごと中心抽出の最小線幅 - stage_close_small: 段階クロージング第1段のサイズ - stage_min_area: 孤立除去の最小面積(0 で無効) - stage_close_large: 段階クロージング第2段のサイズ(0 で無効) - ransac_thresh: RANSAC の外れ値判定閾値 - ransac_iter: RANSAC の反復回数 - width_near: 画像下端での期待線幅(px,0 で無効) - width_far: 画像上端での期待線幅(px,0 で無効) - width_tolerance: 幅フィルタの上限倍率 - median_ksize: 中心点列の移動メディアンフィルタサイズ(0 で無効) - neighbor_thresh: 近傍外れ値除去の閾値(px,0 で無効) - residual_thresh: 残差反復除去の閾値(px,0 で無効) - valley_gauss_ksize: 谷検出の行ごとガウシアンカーネルサイズ - valley_min_depth: 谷として認識する最小深度 - valley_max_deviation: 追跡予測からの最大許容偏差(px) - valley_coast_frames: 検出失敗時の予測継続フレーム数 - valley_ema_alpha: 多項式係数の指数移動平均係数 - """ - - # 検出手法 - method: str = "current" - - # 現行手法パラメータ - clahe_clip: float = 2.0 - clahe_grid: int = 8 - blur_size: int = 5 - binary_thresh: int = 80 - open_size: int = 5 - close_width: int = 25 - close_height: int = 3 - - # 案A/C: Black-hat - blackhat_ksize: int = 45 - - # 案B: 背景除算 - bg_blur_ksize: int = 101 - global_thresh: int = 0 - - # 案B/C: 適応的閾値 - adaptive_block: int = 51 - adaptive_c: int = 10 - - # 案A/B/C: 後処理 - iso_close_size: int = 15 - dist_thresh: float = 3.0 - min_line_width: int = 3 - - # 案B: 段階クロージング - stage_close_small: int = 5 - stage_min_area: int = 0 - stage_close_large: int = 0 - - # 案C: RANSAC - ransac_thresh: float = 5.0 - ransac_iter: int = 50 - - # ロバストフィッティング(全手法共通) - median_ksize: int = 7 - neighbor_thresh: float = 10.0 - residual_thresh: float = 8.0 - - # 透視補正付き幅フィルタ(0 で無効) - width_near: int = 0 - width_far: int = 0 - width_tolerance: float = 1.8 - - # 案D: 谷検出+追跡 - valley_gauss_ksize: int = 15 - valley_min_depth: int = 15 - valley_max_deviation: int = 40 - valley_coast_frames: int = 3 - valley_ema_alpha: float = 0.7 - - -@dataclass -class LineDetectResult: - """線検出の結果を格納するデータクラス - - Attributes: - detected: 線が検出できたか - position_error: 画像下端での位置偏差(-1.0~+1.0) - heading: 線の傾き(dx/dy,画像下端での値) - curvature: 線の曲率(d²x/dy²) - poly_coeffs: 多項式の係数(描画用,未検出時は None) - row_centers: 各行の線中心 x 座標(index=行番号, - NaN=その行に線なし,未検出時は None) - binary_image: 二値化後の画像(デバッグ用) - """ - - detected: bool - position_error: float - heading: float - curvature: float - poly_coeffs: np.ndarray | None - row_centers: np.ndarray | None - binary_image: np.ndarray | None - - -# ── 公開 API ────────────────────────────────────── - - -def detect_line( - frame: np.ndarray, - params: ImageParams | None = None, -) -> LineDetectResult: - """画像から黒線の位置を検出する - - params.method に応じて検出手法を切り替える - - Args: - frame: グレースケールのカメラ画像 - params: 二値化パラメータ(None でデフォルト) - - Returns: - 線検出の結果 - """ - if params is None: - params = ImageParams() - - method = params.method - if method == "blackhat": - from pi.vision.detectors.blackhat import ( - detect_blackhat, - ) - return detect_blackhat(frame, params) - if method == "dual_norm": - from pi.vision.detectors.dual_norm import ( - detect_dual_norm, - ) - return detect_dual_norm(frame, params) - if method == "robust": - from pi.vision.detectors.robust import ( - detect_robust, - ) - return detect_robust(frame, params) - if method == "valley": - from pi.vision.detectors.valley import ( - detect_valley, - ) - return detect_valley(frame, params) - - from pi.vision.detectors.current import ( - detect_current, - ) - return detect_current(frame, params) - - -def reset_valley_tracker() -> None: - """谷検出の追跡状態をリセットする""" - from pi.vision.detectors.valley import ( - reset_valley_tracker as _reset, - ) - _reset() - - -# ── 共通結果構築(各検出器から使用) ────────────── - - -def no_detection( - binary: np.ndarray, -) -> LineDetectResult: - """未検出の結果を返す""" - return LineDetectResult( - detected=False, - position_error=0.0, - heading=0.0, - curvature=0.0, - poly_coeffs=None, - row_centers=None, - binary_image=binary, - ) - - -def _extract_row_centers( - binary: np.ndarray, -) -> np.ndarray | None: - """二値画像の最大連結領域から各行の線中心を求める - - Args: - binary: 二値画像 - - Returns: - 各行の中心 x 座標(NaN=その行に線なし), - 最大領域が見つからない場合は None - """ - h, w = binary.shape[:2] - num_labels, labels, stats, _ = ( - cv2.connectedComponentsWithStats(binary) - ) - - if num_labels <= 1: - return None - - # 背景(ラベル 0)を除いた最大領域を取得 - areas = stats[1:, cv2.CC_STAT_AREA] - largest_label = int(np.argmax(areas)) + 1 - - # 最大領域のマスク - mask = (labels == largest_label).astype(np.uint8) - - # 各行の左右端から中心を計算 - centers = np.full(h, np.nan) - for y in range(h): - row = mask[y] - cols = np.where(row > 0)[0] - if len(cols) > 0: - centers[y] = (cols[0] + cols[-1]) / 2.0 - - return centers - - -def build_result( - coeffs: np.ndarray, - binary: np.ndarray, - row_centers: np.ndarray | None = None, -) -> LineDetectResult: - """多項式係数から LineDetectResult を構築する - - row_centers が None の場合は binary から自動抽出する - """ - poly = np.poly1d(coeffs) - center_x = config.FRAME_WIDTH / 2.0 - - # 画像下端での位置偏差 - x_bottom = poly(DETECT_Y_END) - position_error = (center_x - x_bottom) / center_x - - # 傾き: dx/dy(画像下端での値) - poly_deriv = poly.deriv() - heading = float(poly_deriv(DETECT_Y_END)) - - # 曲率: d²x/dy² - poly_deriv2 = poly_deriv.deriv() - curvature = float(poly_deriv2(DETECT_Y_END)) - - # row_centers が未提供なら binary から抽出 - if row_centers is None: - row_centers = _extract_row_centers(binary) - - return LineDetectResult( - detected=True, - position_error=position_error, - heading=heading, - curvature=curvature, - poly_coeffs=coeffs, - row_centers=row_centers, - binary_image=binary, - ) - - -def fit_row_centers( - binary: np.ndarray, - min_width: int, - use_median: bool = False, - ransac_thresh: float = 0.0, - ransac_iter: int = 0, - median_ksize: int = 0, - neighbor_thresh: float = 0.0, - residual_thresh: float = 0.0, -) -> LineDetectResult: - """行ごとの中心点に多項式をフィッティングする - - Args: - binary: 二値画像 - min_width: 線として認識する最小ピクセル数 - use_median: True の場合は中央値を使用 - ransac_thresh: RANSAC 閾値(0 以下で無効) - ransac_iter: RANSAC 反復回数 - median_ksize: 移動メディアンのカーネルサイズ - neighbor_thresh: 近傍外れ値除去の閾値 px - residual_thresh: 残差反復除去の閾値 px - - Returns: - 線検出の結果 - """ - region = binary[DETECT_Y_START:DETECT_Y_END, :] - centers_y: list[float] = [] - centers_x: list[float] = [] - - for y_local in range(region.shape[0]): - xs = np.where(region[y_local] > 0)[0] - if len(xs) < min_width: - continue - y = float(y_local + DETECT_Y_START) - centers_y.append(y) - if use_median: - centers_x.append(float(np.median(xs))) - else: - centers_x.append(float(np.mean(xs))) - - if len(centers_y) < MIN_FIT_ROWS: - return no_detection(binary) - - cy = np.array(centers_y) - cx = np.array(centers_x) - - coeffs = clean_and_fit( - cy, cx, - median_ksize=median_ksize, - neighbor_thresh=neighbor_thresh, - residual_thresh=residual_thresh, - ransac_thresh=ransac_thresh, - ransac_iter=ransac_iter, - ) - if coeffs is None: - return no_detection(binary) - - return build_result(coeffs, binary) diff --git a/src/pi/vision/morphology.py b/src/pi/vision/morphology.py deleted file mode 100644 index 578dcb6..0000000 --- a/src/pi/vision/morphology.py +++ /dev/null @@ -1,127 +0,0 @@ -""" -morphology -二値画像の形態学的処理ユーティリティモジュール -""" - -import cv2 -import numpy as np - - -def apply_iso_closing( - binary: np.ndarray, size: int, -) -> np.ndarray: - """等方クロージングで穴を埋める - - Args: - binary: 二値画像 - size: カーネルサイズ - - Returns: - クロージング後の二値画像 - """ - if size < 3: - return binary - k = size | 1 - kernel = cv2.getStructuringElement( - cv2.MORPH_ELLIPSE, (k, k), - ) - return cv2.morphologyEx( - binary, cv2.MORPH_CLOSE, kernel, - ) - - -def apply_staged_closing( - binary: np.ndarray, - small_size: int, - min_area: int, - large_size: int, -) -> np.ndarray: - """段階クロージング: 小穴埋め → 孤立除去 → 大穴埋め - - Args: - binary: 二値画像 - small_size: 第1段クロージングのカーネルサイズ - min_area: 孤立領域除去の最小面積(0 で無効) - large_size: 第2段クロージングのカーネルサイズ(0 で無効) - - Returns: - 処理後の二値画像 - """ - result = apply_iso_closing(binary, small_size) - - if min_area > 0: - contours, _ = cv2.findContours( - result, cv2.RETR_EXTERNAL, - cv2.CHAIN_APPROX_SIMPLE, - ) - mask = np.zeros_like(result) - for cnt in contours: - if cv2.contourArea(cnt) >= min_area: - cv2.drawContours( - mask, [cnt], -1, 255, -1, - ) - result = mask - - result = apply_iso_closing(result, large_size) - - return result - - -def apply_width_filter( - binary: np.ndarray, - width_near: int, - width_far: int, - tolerance: float, -) -> np.ndarray: - """透視補正付き幅フィルタで広がりすぎた行を除外する - - Args: - binary: 二値画像 - width_near: 画像下端での期待線幅(px) - width_far: 画像上端での期待線幅(px) - tolerance: 上限倍率 - - Returns: - 幅フィルタ適用後の二値画像 - """ - result = binary.copy() - h = binary.shape[0] - denom = max(h - 1, 1) - - for y_local in range(h): - xs = np.where(binary[y_local] > 0)[0] - if len(xs) == 0: - continue - t = (h - 1 - y_local) / denom - expected = float(width_far) + ( - float(width_near) - float(width_far) - ) * t - max_w = expected * tolerance - actual_w = int(xs[-1]) - int(xs[0]) + 1 - if actual_w > max_w: - result[y_local] = 0 - - return result - - -def apply_dist_mask( - binary: np.ndarray, thresh: float, -) -> np.ndarray: - """距離変換で中心部のみを残す - - Args: - binary: 二値画像 - thresh: 距離の閾値(ピクセル) - - Returns: - 中心部のみの二値画像 - """ - if thresh <= 0: - return binary - dist = cv2.distanceTransform( - binary, cv2.DIST_L2, 5, - ) - _, mask = cv2.threshold( - dist, thresh, 255, cv2.THRESH_BINARY, - ) - return mask.astype(np.uint8) diff --git a/tests/test_fitting.py b/tests/test_fitting.py index 5791fbf..5230d5c 100644 --- a/tests/test_fitting.py +++ b/tests/test_fitting.py @@ -3,7 +3,7 @@ import numpy as np import pytest -from pc.vision.fitting import ( +from common.vision.fitting import ( MIN_FIT_ROWS, clean_and_fit, ransac_polyfit, diff --git a/tests/test_line_detector.py b/tests/test_line_detector.py index ce186c7..c05c2fb 100644 --- a/tests/test_line_detector.py +++ b/tests/test_line_detector.py @@ -4,7 +4,7 @@ import pytest from common import config -from pc.vision.line_detector import ( +from common.vision.line_detector import ( ImageParams, LineDetectResult, build_result, diff --git a/tests/test_morphology.py b/tests/test_morphology.py index 1cff783..01bb276 100644 --- a/tests/test_morphology.py +++ b/tests/test_morphology.py @@ -4,7 +4,7 @@ import pytest from common import config -from pc.vision.morphology import ( +from common.vision.morphology import ( apply_dist_mask, apply_iso_closing, apply_staged_closing, diff --git a/tests/test_params.py b/tests/test_params.py index d7bf6fa..8c3b1b2 100644 --- a/tests/test_params.py +++ b/tests/test_params.py @@ -5,8 +5,8 @@ import pytest -from pc.steering.pd_control import PdParams -from pc.vision.line_detector import ImageParams +from common.steering.pd_control import PdParams +from common.vision.line_detector import ImageParams class TestAutoParams: @@ -36,9 +36,10 @@ max_steer_rate=0.05, max_throttle=0.6, speed_k=0.4, ) - save_control(params, "blackhat") - loaded, method = load_control() + save_control(params, "blackhat", "pd") + loaded, method, steering = load_control() assert method == "blackhat" + assert steering == "pd" assert loaded.kp == pytest.approx(1.0) assert loaded.kh == pytest.approx(0.5) assert loaded.max_throttle == pytest.approx(0.6) @@ -46,8 +47,9 @@ def test_load_control_missing_file(self) -> None: """ファイルがない場合はデフォルト値を返す""" from pc.steering.auto_params import load_control - params, method = load_control() + params, method, steering = load_control() assert method == "current" + assert steering == "pd" assert params.kp == PdParams().kp def test_save_load_detect_params(