diff --git "a/docs/04_ENV/ENV_04_\343\203\207\343\202\243\343\203\254\343\202\257\343\203\210\343\203\252\346\247\213\346\210\220.txt" "b/docs/04_ENV/ENV_04_\343\203\207\343\202\243\343\203\254\343\202\257\343\203\210\343\203\252\346\247\213\346\210\220.txt" index 070fec0..9f72519 100644 --- "a/docs/04_ENV/ENV_04_\343\203\207\343\202\243\343\203\254\343\202\257\343\203\210\343\203\252\346\247\213\346\210\220.txt" +++ "b/docs/04_ENV/ENV_04_\343\203\207\343\202\243\343\203\254\343\202\257\343\203\210\343\203\252\346\247\213\346\210\220.txt" @@ -61,9 +61,16 @@ │ ├── param_store.py プリセット保存・読み込み │ └── auto_params.py パラメータ自動保存・復元 └── vision/ 画像処理 - ├── line_detector.py 線検出(多項式フィッティング) - ├── fitting.py 直線・曲線近似の共通ユーティリティ - └── overlay.py デバッグオーバーレイ描画 + ├── line_detector.py 線検出 API(データクラス・手法ディスパッチ) + ├── fitting.py 直線・曲線近似(Theil-Sen・RANSAC・外れ値除去) + ├── morphology.py 形態学的処理ユーティリティ + ├── overlay.py デバッグオーバーレイ描画 + └── detectors/ 検出手法の実装 + ├── current.py 現行(CLAHE + 固定閾値) + ├── blackhat.py 案A(Black-hat 中心) + ├── dual_norm.py 案B(二重正規化) + ├── robust.py 案C(最高ロバスト) + └── valley.py 案D(谷検出+追跡) 2-4. src/pi/ diff --git a/src/pc/gui/main_window.py b/src/pc/gui/main_window.py index 1516397..a00514e 100644 --- a/src/pc/gui/main_window.py +++ b/src/pc/gui/main_window.py @@ -257,42 +257,42 @@ params = self._pd_control.params self._spin_kp = self._create_spin( - params.kp, 0.0, 5.0, 0.05, + params.kp, 0.0, 0.05, ) self._pd_param_form.addRow( "Kp (位置):", self._spin_kp, ) self._spin_kh = self._create_spin( - params.kh, 0.0, 5.0, 0.05, + params.kh, 0.0, 0.05, ) self._pd_param_form.addRow( "Kh (傾き):", self._spin_kh, ) self._spin_kd = self._create_spin( - params.kd, 0.0, 5.0, 0.05, + params.kd, 0.0, 0.05, ) self._pd_param_form.addRow( "Kd (微分):", self._spin_kd, ) self._spin_max_steer_rate = self._create_spin( - params.max_steer_rate, 0.01, 1.0, 0.01, + params.max_steer_rate, 0.01, 0.01, ) self._pd_param_form.addRow( "操舵制限:", self._spin_max_steer_rate, ) self._spin_max_throttle = self._create_spin( - params.max_throttle, 0.0, 1.0, 0.05, + params.max_throttle, 0.0, 0.05, ) self._pd_param_form.addRow( "最大速度:", self._spin_max_throttle, ) self._spin_speed_k = self._create_spin( - params.speed_k, 0.0, 5.0, 0.05, + params.speed_k, 0.0, 0.05, ) self._pd_param_form.addRow( "減速係数:", self._spin_speed_k, @@ -311,49 +311,49 @@ pp = self._pursuit_control.params self._spin_near_ratio = self._create_spin( - pp.near_ratio, 0.0, 1.0, 0.05, + pp.near_ratio, 0.0, 0.05, ) self._pursuit_param_form.addRow( "近目標(比率):", self._spin_near_ratio, ) self._spin_far_ratio = self._create_spin( - pp.far_ratio, 0.0, 1.0, 0.05, + pp.far_ratio, 0.0, 0.05, ) self._pursuit_param_form.addRow( "遠目標(比率):", self._spin_far_ratio, ) self._spin_k_near = self._create_spin( - pp.k_near, 0.0, 5.0, 0.05, + pp.k_near, 0.0, 0.05, ) self._pursuit_param_form.addRow( "K_near:", self._spin_k_near, ) self._spin_k_far = self._create_spin( - pp.k_far, 0.0, 5.0, 0.05, + pp.k_far, 0.0, 0.05, ) self._pursuit_param_form.addRow( "K_far:", self._spin_k_far, ) self._spin_pursuit_steer_rate = self._create_spin( - pp.max_steer_rate, 0.01, 1.0, 0.01, + pp.max_steer_rate, 0.01, 0.01, ) self._pursuit_param_form.addRow( "操舵制限:", self._spin_pursuit_steer_rate, ) self._spin_pursuit_throttle = self._create_spin( - pp.max_throttle, 0.0, 1.0, 0.05, + pp.max_throttle, 0.0, 0.05, ) self._pursuit_param_form.addRow( "最大速度:", self._spin_pursuit_throttle, ) self._spin_pursuit_speed_k = self._create_spin( - pp.speed_k, 0.0, 10.0, 0.1, + pp.speed_k, 0.0, 0.1, ) self._pursuit_param_form.addRow( "減速係数:", self._spin_pursuit_speed_k, @@ -371,36 +371,15 @@ ] # --- プリセット管理 --- - self._pd_preset_combo = QComboBox() - self._pd_preset_combo.setPlaceholderText( - "プリセット", + self._pd_preset_combo, self._pd_preset_memo = ( + self._create_preset_ui( + layout, + self._on_load_pd_preset, + self._on_save_pd_preset, + self._on_delete_pd_preset, + self._on_pd_preset_selected, + ) ) - layout.addWidget(self._pd_preset_combo) - - self._pd_preset_memo = QLabel("") - self._pd_preset_memo.setWordWrap(True) - self._pd_preset_memo.setStyleSheet( - "font-size: 11px; color: #888;", - ) - layout.addWidget(self._pd_preset_memo) - - btn_layout = QHBoxLayout() - load_btn = QPushButton("読込") - load_btn.clicked.connect( - self._on_load_pd_preset, - ) - btn_layout.addWidget(load_btn) - save_btn = QPushButton("保存") - save_btn.clicked.connect( - self._on_save_pd_preset, - ) - btn_layout.addWidget(save_btn) - del_btn = QPushButton("削除") - del_btn.clicked.connect( - self._on_delete_pd_preset, - ) - btn_layout.addWidget(del_btn) - layout.addLayout(btn_layout) # コールバック接続 self._steering_combo.currentIndexChanged.connect( @@ -418,9 +397,6 @@ spin.valueChanged.connect( self._on_pursuit_param_changed, ) - self._pd_preset_combo.currentIndexChanged \ - .connect(self._on_pd_preset_selected) - self._pd_presets: list[PdPreset] = [] self._refresh_pd_presets() @@ -460,11 +436,11 @@ # --- 現行手法パラメータ --- self._spin_clahe_clip = self._create_spin( - ip.clahe_clip, 0.5, 10.0, 0.5, + ip.clahe_clip, 0.5, 0.5, ) self._add_image_row( "CLAHE強度:", self._spin_clahe_clip, - {"current"}, + {"current"}, "clahe_clip", ) self._spin_binary_thresh = QSpinBox() @@ -474,7 +450,7 @@ ) self._add_image_row( "二値化閾値:", self._spin_binary_thresh, - {"current", "blackhat"}, + {"current", "blackhat"}, "binary_thresh", ) self._spin_open_size = QSpinBox() @@ -483,7 +459,7 @@ self._spin_open_size.setValue(ip.open_size) self._add_image_row( "ノイズ除去:", self._spin_open_size, - {"current"}, + {"current"}, "open_size", ) self._spin_close_width = QSpinBox() @@ -492,7 +468,7 @@ self._spin_close_width.setValue(ip.close_width) self._add_image_row( "途切れ補間:", self._spin_close_width, - {"current"}, + {"current"}, "close_width", ) # --- 案A/C: Black-hat --- @@ -504,7 +480,7 @@ ) self._add_image_row( "BHカーネル:", self._spin_blackhat_ksize, - {"blackhat", "robust"}, + {"blackhat", "robust"}, "blackhat_ksize", ) # --- 案B: 背景除算 --- @@ -516,7 +492,7 @@ ) self._add_image_row( "背景ブラー:", self._spin_bg_blur_ksize, - {"dual_norm"}, + {"dual_norm"}, "bg_blur_ksize", ) self._spin_global_thresh = QSpinBox() @@ -529,7 +505,7 @@ ) self._add_image_row( "固定閾値:", self._spin_global_thresh, - {"dual_norm"}, + {"dual_norm"}, "global_thresh", ) # --- 案B/C: 適応的閾値 --- @@ -541,7 +517,7 @@ ) self._add_image_row( "適応ブロック:", self._spin_adaptive_block, - {"dual_norm", "robust"}, + {"dual_norm", "robust"}, "adaptive_block", ) self._spin_adaptive_c = QSpinBox() @@ -549,7 +525,7 @@ self._spin_adaptive_c.setValue(ip.adaptive_c) self._add_image_row( "適応定数C:", self._spin_adaptive_c, - {"dual_norm", "robust"}, + {"dual_norm", "robust"}, "adaptive_c", ) # --- 案A/B/C: 後処理 --- @@ -562,14 +538,16 @@ self._add_image_row( "穴埋め:", self._spin_iso_close, {"blackhat", "dual_norm", "robust"}, + "iso_close_size", ) self._spin_dist_thresh = self._create_spin( - ip.dist_thresh, 0.0, 10.0, 0.5, + ip.dist_thresh, 0.0, 0.5, ) self._add_image_row( "距離閾値:", self._spin_dist_thresh, {"blackhat", "dual_norm", "robust"}, + "dist_thresh", ) self._spin_min_line_width = QSpinBox() @@ -580,6 +558,7 @@ self._add_image_row( "最小線幅:", self._spin_min_line_width, {"blackhat", "dual_norm", "robust"}, + "min_line_width", ) # --- 案B: 段階クロージング --- @@ -592,7 +571,7 @@ self._add_image_row( "段階穴埋め(小):", self._spin_stage_close_small, - {"dual_norm"}, + {"dual_norm"}, "stage_close_small", ) self._spin_stage_min_area = QSpinBox() @@ -606,7 +585,7 @@ self._add_image_row( "孤立除去面積:", self._spin_stage_min_area, - {"dual_norm"}, + {"dual_norm"}, "stage_min_area", ) self._spin_stage_close_large = QSpinBox() @@ -621,7 +600,7 @@ self._add_image_row( "段階穴埋め(大):", self._spin_stage_close_large, - {"dual_norm"}, + {"dual_norm"}, "stage_close_large", ) # --- ロバストフィッティング(全手法共通) --- @@ -638,32 +617,32 @@ ) self._add_image_row( "メディアン:", self._spin_median_ksize, - all_methods, + all_methods, "median_ksize", ) self._spin_neighbor_thresh = self._create_spin( - ip.neighbor_thresh, 0.0, 50.0, 1.0, + ip.neighbor_thresh, 0.0, 1.0, ) self._add_image_row( "近傍除去:", self._spin_neighbor_thresh, - all_methods, + all_methods, "neighbor_thresh", ) self._spin_residual_thresh = self._create_spin( - ip.residual_thresh, 0.0, 50.0, 1.0, + ip.residual_thresh, 0.0, 1.0, ) self._add_image_row( "残差除去:", self._spin_residual_thresh, - all_methods, + all_methods, "residual_thresh", ) # --- 案C/D: RANSAC --- self._spin_ransac_thresh = self._create_spin( - ip.ransac_thresh, 1.0, 30.0, 1.0, + ip.ransac_thresh, 1.0, 1.0, ) self._add_image_row( "RANSAC閾値:", self._spin_ransac_thresh, - {"robust", "valley"}, + {"robust", "valley"}, "ransac_thresh", ) # --- 幅フィルタ(透視補正) --- @@ -674,6 +653,7 @@ self._add_image_row( "線幅(近)px:", self._spin_width_near, {"blackhat", "dual_norm", "robust", "valley"}, + "width_near", ) self._spin_width_far = QSpinBox() @@ -683,14 +663,16 @@ self._add_image_row( "線幅(遠)px:", self._spin_width_far, {"blackhat", "dual_norm", "robust", "valley"}, + "width_far", ) self._spin_width_tolerance = self._create_spin( - ip.width_tolerance, 1.0, 5.0, 0.1, + ip.width_tolerance, 1.0, 0.1, ) self._add_image_row( "幅フィルタ倍率:", self._spin_width_tolerance, {"blackhat", "dual_norm", "robust", "valley"}, + "width_tolerance", ) # --- 案D: 谷検出+追跡 --- @@ -702,7 +684,7 @@ ) self._add_image_row( "谷ガウス:", self._spin_valley_gauss, - {"valley"}, + {"valley"}, "valley_gauss_ksize", ) self._spin_valley_min_depth = QSpinBox() @@ -712,7 +694,7 @@ ) self._add_image_row( "最小谷深度:", self._spin_valley_min_depth, - {"valley"}, + {"valley"}, "valley_min_depth", ) self._spin_valley_max_dev = QSpinBox() @@ -722,7 +704,7 @@ ) self._add_image_row( "最大偏差:", self._spin_valley_max_dev, - {"valley"}, + {"valley"}, "valley_max_deviation", ) self._spin_valley_coast = QSpinBox() @@ -732,59 +714,36 @@ ) self._add_image_row( "予測継続:", self._spin_valley_coast, - {"valley"}, + {"valley"}, "valley_coast_frames", ) self._spin_valley_ema = self._create_spin( - ip.valley_ema_alpha, 0.0, 1.0, 0.05, + ip.valley_ema_alpha, 0.0, 0.05, ) self._add_image_row( "EMA係数:", self._spin_valley_ema, - {"valley"}, + {"valley"}, "valley_ema_alpha", ) # --- プリセット管理 --- - self._image_preset_combo = QComboBox() - self._image_preset_combo.setPlaceholderText( - "プリセット", - ) - layout.addWidget(self._image_preset_combo) - - self._image_preset_memo = QLabel("") - self._image_preset_memo.setWordWrap(True) - self._image_preset_memo.setStyleSheet( - "font-size: 11px; color: #888;", - ) - layout.addWidget(self._image_preset_memo) - - btn_layout = QHBoxLayout() - load_btn = QPushButton("読込") - load_btn.clicked.connect( + combo, memo = self._create_preset_ui( + layout, self._on_load_image_preset, - ) - btn_layout.addWidget(load_btn) - save_btn = QPushButton("保存") - save_btn.clicked.connect( self._on_save_image_preset, - ) - btn_layout.addWidget(save_btn) - del_btn = QPushButton("削除") - del_btn.clicked.connect( self._on_delete_image_preset, + self._on_image_preset_selected, ) - btn_layout.addWidget(del_btn) - layout.addLayout(btn_layout) + self._image_preset_combo = combo + self._image_preset_memo = memo # コールバック接続 self._method_combo.currentIndexChanged.connect( self._on_method_changed, ) - for widget, _ in self._image_param_vis: + for widget, _, _ in self._image_param_vis: widget.valueChanged.connect( self._on_image_param_changed, ) - self._image_preset_combo.currentIndexChanged \ - .connect(self._on_image_preset_selected) self._image_presets: list[ImagePreset] = [] self._image_filtered: list[int] = [] @@ -799,11 +758,19 @@ label: str, widget: QWidget, methods: set[str], + field: str, ) -> None: - """画像処理パラメータの行を追加する""" + """画像処理パラメータの行を追加する + + Args: + label: フォームラベル + widget: SpinBox ウィジェット + methods: 表示対象の検出手法集合 + field: ImageParams のフィールド名 + """ self._image_form.addRow(label, widget) self._image_param_vis.append( - (widget, methods), + (widget, methods, field), ) def _on_method_changed(self) -> None: @@ -821,8 +788,7 @@ # 新手法のパラメータを読み込み if self._auto_save_enabled: new_ip = load_detect_params(method) - self._pd_control.image_params = new_ip - self._pursuit_control.image_params = new_ip + self._set_image_params(new_ip) self._sync_image_spinboxes() save_control( self._pd_control.params, method, @@ -833,7 +799,7 @@ ) # パラメータの表示/非表示を更新 - for widget, methods in self._image_param_vis: + for widget, methods, _ in self._image_param_vis: visible = method in methods widget.setVisible(visible) label = self._image_form.labelForField( @@ -846,172 +812,28 @@ if hasattr(self, "_image_preset_combo"): self._refresh_image_presets() + def _set_image_params( + self, ip: ImageParams, + ) -> None: + """両制御クラスの image_params を一括設定する""" + self._pd_control.image_params = ip + self._pursuit_control.image_params = ip + def _sync_image_spinboxes(self) -> None: """画像処理パラメータの SpinBox を現在値に同期する""" self._auto_save_enabled = False try: ip = self._pd_control.image_params - self._spin_clahe_clip.setValue( - ip.clahe_clip, - ) - self._spin_binary_thresh.setValue( - ip.binary_thresh, - ) - self._spin_open_size.setValue(ip.open_size) - self._spin_close_width.setValue( - ip.close_width, - ) - self._spin_blackhat_ksize.setValue( - ip.blackhat_ksize, - ) - self._spin_bg_blur_ksize.setValue( - ip.bg_blur_ksize, - ) - self._spin_global_thresh.setValue( - ip.global_thresh, - ) - self._spin_adaptive_block.setValue( - ip.adaptive_block, - ) - self._spin_adaptive_c.setValue( - ip.adaptive_c, - ) - self._spin_iso_close.setValue( - ip.iso_close_size, - ) - self._spin_dist_thresh.setValue( - ip.dist_thresh, - ) - self._spin_min_line_width.setValue( - ip.min_line_width, - ) - self._spin_stage_close_small.setValue( - ip.stage_close_small, - ) - self._spin_stage_min_area.setValue( - ip.stage_min_area, - ) - self._spin_stage_close_large.setValue( - ip.stage_close_large, - ) - self._spin_median_ksize.setValue( - ip.median_ksize, - ) - self._spin_neighbor_thresh.setValue( - ip.neighbor_thresh, - ) - self._spin_residual_thresh.setValue( - ip.residual_thresh, - ) - self._spin_ransac_thresh.setValue( - ip.ransac_thresh, - ) - self._spin_width_near.setValue(ip.width_near) - self._spin_width_far.setValue(ip.width_far) - self._spin_width_tolerance.setValue( - ip.width_tolerance, - ) - # 案D: 谷検出+追跡 - self._spin_valley_gauss.setValue( - ip.valley_gauss_ksize, - ) - self._spin_valley_min_depth.setValue( - ip.valley_min_depth, - ) - self._spin_valley_max_dev.setValue( - ip.valley_max_deviation, - ) - self._spin_valley_coast.setValue( - ip.valley_coast_frames, - ) - self._spin_valley_ema.setValue( - ip.valley_ema_alpha, - ) + for widget, _, field in self._image_param_vis: + widget.setValue(getattr(ip, field)) finally: self._auto_save_enabled = True def _on_image_param_changed(self) -> None: """画像処理パラメータの変更を反映する""" ip = self._pd_control.image_params - # 現行手法 - ip.clahe_clip = self._spin_clahe_clip.value() - ip.binary_thresh = ( - self._spin_binary_thresh.value() - ) - ip.open_size = self._spin_open_size.value() - ip.close_width = self._spin_close_width.value() - # 案A/C: Black-hat - ip.blackhat_ksize = ( - self._spin_blackhat_ksize.value() - ) - # 案B: 背景除算 - ip.bg_blur_ksize = ( - self._spin_bg_blur_ksize.value() - ) - ip.global_thresh = ( - self._spin_global_thresh.value() - ) - # 案B/C: 適応的閾値 - ip.adaptive_block = ( - self._spin_adaptive_block.value() - ) - ip.adaptive_c = self._spin_adaptive_c.value() - # 案A/B/C: 後処理 - ip.iso_close_size = ( - self._spin_iso_close.value() - ) - ip.dist_thresh = ( - self._spin_dist_thresh.value() - ) - ip.min_line_width = ( - self._spin_min_line_width.value() - ) - # 案B: 段階クロージング - ip.stage_close_small = ( - self._spin_stage_close_small.value() - ) - ip.stage_min_area = ( - self._spin_stage_min_area.value() - ) - ip.stage_close_large = ( - self._spin_stage_close_large.value() - ) - # ロバストフィッティング - ip.median_ksize = ( - self._spin_median_ksize.value() - ) - ip.neighbor_thresh = ( - self._spin_neighbor_thresh.value() - ) - ip.residual_thresh = ( - self._spin_residual_thresh.value() - ) - # 案C: RANSAC - ip.ransac_thresh = ( - self._spin_ransac_thresh.value() - ) - # 幅フィルタ(透視補正) - ip.width_near = self._spin_width_near.value() - ip.width_far = self._spin_width_far.value() - ip.width_tolerance = ( - self._spin_width_tolerance.value() - ) - # 案D: 谷検出+追跡 - ip.valley_gauss_ksize = ( - self._spin_valley_gauss.value() - ) - ip.valley_min_depth = ( - self._spin_valley_min_depth.value() - ) - ip.valley_max_deviation = ( - self._spin_valley_max_dev.value() - ) - ip.valley_coast_frames = ( - self._spin_valley_coast.value() - ) - ip.valley_ema_alpha = ( - self._spin_valley_ema.value() - ) + for widget, _, field in self._image_param_vis: + setattr(ip, field, widget.value()) if self._auto_save_enabled: save_detect_params(ip.method, ip) @@ -1057,8 +879,7 @@ self._auto_save_enabled = False try: ip = self._image_presets[idx].image_params - self._pd_control.image_params = ip - self._pursuit_control.image_params = ip + self._set_image_params(ip) self._sync_image_spinboxes() finally: self._auto_save_enabled = True @@ -1225,13 +1046,50 @@ parent_layout.addWidget(group) @staticmethod - def _create_spin( - value: float, min_val: float, - max_val: float, step: float, - ) -> QDoubleSpinBox: - """パラメータ用の SpinBox を作成する + def _create_preset_ui( + layout: QVBoxLayout, + on_load, + on_save, + on_delete, + on_selected, + ) -> tuple[QComboBox, QLabel]: + """プリセット管理 UI(ComboBox + メモ + ボタン)を作成する - 直接入力にも対応するため,範囲は広めに設定する + Returns: + (コンボボックス, メモラベル) のタプル + """ + combo = QComboBox() + combo.setPlaceholderText("プリセット") + layout.addWidget(combo) + + memo = QLabel("") + memo.setWordWrap(True) + memo.setStyleSheet( + "font-size: 11px; color: #888;", + ) + layout.addWidget(memo) + + btn_layout = QHBoxLayout() + for text, callback in [ + ("読込", on_load), + ("保存", on_save), + ("削除", on_delete), + ]: + btn = QPushButton(text) + btn.clicked.connect(callback) + btn_layout.addWidget(btn) + layout.addLayout(btn_layout) + + combo.currentIndexChanged.connect(on_selected) + return combo, memo + + @staticmethod + def _create_spin( + value: float, min_val: float, step: float, + ) -> QDoubleSpinBox: + """パラメータ用の QDoubleSpinBox を作成する + + 直接入力にも対応するため,上限は広めに設定する """ spin = QDoubleSpinBox() spin.setRange(min_val, 99999.0) diff --git a/src/pc/vision/detectors/__init__.py b/src/pc/vision/detectors/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/src/pc/vision/detectors/__init__.py diff --git a/src/pc/vision/detectors/blackhat.py b/src/pc/vision/detectors/blackhat.py new file mode 100644 index 0000000..bc80920 --- /dev/null +++ b/src/pc/vision/detectors/blackhat.py @@ -0,0 +1,66 @@ +""" +blackhat +案A: Black-hat 中心型の線検出 +Black-hat 変換で背景より暗い構造を直接抽出し, +固定閾値 + 距離変換 + 行ごと中心抽出で検出する +""" + +import cv2 +import numpy as np + +from pc.vision.line_detector import ImageParams, LineDetectResult +from pc.vision.line_detector import fit_row_centers +from pc.vision.morphology import ( + apply_dist_mask, + apply_iso_closing, + apply_width_filter, +) + + +def detect_blackhat( + frame: np.ndarray, params: ImageParams, +) -> LineDetectResult: + """案A: Black-hat 中心型""" + # Black-hat 変換(暗い構造の抽出) + bh_k = params.blackhat_ksize | 1 + bh_kernel = cv2.getStructuringElement( + cv2.MORPH_ELLIPSE, (bh_k, bh_k), + ) + blackhat = cv2.morphologyEx( + frame, cv2.MORPH_BLACKHAT, bh_kernel, + ) + + # ガウシアンブラー + blur_k = params.blur_size | 1 + blurred = cv2.GaussianBlur( + blackhat, (blur_k, blur_k), 0, + ) + + # 固定閾値(Black-hat 後は線が白) + _, binary = cv2.threshold( + blurred, params.binary_thresh, 255, + cv2.THRESH_BINARY, + ) + + # 等方クロージング + 距離変換マスク + 幅フィルタ + binary = apply_iso_closing( + binary, params.iso_close_size, + ) + binary = apply_dist_mask( + binary, params.dist_thresh, + ) + if params.width_near > 0 and params.width_far > 0: + binary = apply_width_filter( + binary, + params.width_near, + params.width_far, + params.width_tolerance, + ) + + # 行ごと中心抽出 + フィッティング + return fit_row_centers( + binary, params.min_line_width, + median_ksize=params.median_ksize, + neighbor_thresh=params.neighbor_thresh, + residual_thresh=params.residual_thresh, + ) diff --git a/src/pc/vision/detectors/current.py b/src/pc/vision/detectors/current.py new file mode 100644 index 0000000..12dacba --- /dev/null +++ b/src/pc/vision/detectors/current.py @@ -0,0 +1,76 @@ +""" +current +現行手法: CLAHE + 固定閾値 + 全ピクセルフィッティング +""" + +import cv2 +import numpy as np + +from pc.vision.line_detector import ( + DETECT_Y_END, + DETECT_Y_START, + MIN_FIT_PIXELS, + ImageParams, + LineDetectResult, + build_result, + no_detection, +) + + +def detect_current( + frame: np.ndarray, params: ImageParams, +) -> LineDetectResult: + """現行手法: CLAHE + 固定閾値 + 全ピクセルフィッティング""" + # CLAHE でコントラスト強調 + clahe = cv2.createCLAHE( + clipLimit=params.clahe_clip, + tileGridSize=( + params.clahe_grid, + params.clahe_grid, + ), + ) + enhanced = clahe.apply(frame) + + # ガウシアンブラー + blur_k = params.blur_size | 1 + blurred = cv2.GaussianBlur( + enhanced, (blur_k, blur_k), 0, + ) + + # 固定閾値で二値化(黒線を白に反転) + _, binary = cv2.threshold( + blurred, params.binary_thresh, 255, + cv2.THRESH_BINARY_INV, + ) + + # オープニング(孤立ノイズ除去) + if params.open_size >= 3: + open_k = params.open_size | 1 + open_kernel = cv2.getStructuringElement( + cv2.MORPH_ELLIPSE, (open_k, open_k), + ) + binary = cv2.morphologyEx( + binary, cv2.MORPH_OPEN, open_kernel, + ) + + # 横方向クロージング(途切れ補間) + if params.close_width >= 3: + close_h = max(params.close_height | 1, 1) + close_kernel = cv2.getStructuringElement( + cv2.MORPH_ELLIPSE, + (params.close_width, close_h), + ) + binary = cv2.morphologyEx( + binary, cv2.MORPH_CLOSE, close_kernel, + ) + + # 全ピクセルフィッティング + region = binary[DETECT_Y_START:DETECT_Y_END, :] + ys_local, xs = np.where(region > 0) + + if len(xs) < MIN_FIT_PIXELS: + return no_detection(binary) + + ys = ys_local + DETECT_Y_START + coeffs = np.polyfit(ys, xs, 2) + return build_result(coeffs, binary) diff --git a/src/pc/vision/detectors/dual_norm.py b/src/pc/vision/detectors/dual_norm.py new file mode 100644 index 0000000..46a9a6d --- /dev/null +++ b/src/pc/vision/detectors/dual_norm.py @@ -0,0 +1,86 @@ +""" +dual_norm +案B: 二重正規化型の線検出 +背景除算で照明勾配を除去し, +適応的閾値で局所ムラにも対応する二重防壁構成 +""" + +import cv2 +import numpy as np + +from pc.vision.line_detector import ImageParams, LineDetectResult +from pc.vision.line_detector import fit_row_centers +from pc.vision.morphology import ( + apply_dist_mask, + apply_iso_closing, + apply_staged_closing, + apply_width_filter, +) + + +def detect_dual_norm( + frame: np.ndarray, params: ImageParams, +) -> LineDetectResult: + """案B: 二重正規化型""" + # 背景除算正規化 + bg_k = params.bg_blur_ksize | 1 + bg = cv2.GaussianBlur( + frame, (bg_k, bg_k), 0, + ) + normalized = ( + frame.astype(np.float32) * 255.0 + / (bg.astype(np.float32) + 1.0) + ) + normalized = np.clip( + normalized, 0, 255, + ).astype(np.uint8) + + # 適応的閾値(ガウシアン,BINARY_INV) + block = max(params.adaptive_block | 1, 3) + binary = cv2.adaptiveThreshold( + normalized, 255, + cv2.ADAPTIVE_THRESH_GAUSSIAN_C, + cv2.THRESH_BINARY_INV, + block, params.adaptive_c, + ) + + # 固定閾値との AND(有効時のみ) + if params.global_thresh > 0: + _, global_mask = cv2.threshold( + normalized, params.global_thresh, + 255, cv2.THRESH_BINARY_INV, + ) + binary = cv2.bitwise_and(binary, global_mask) + + # 段階クロージング or 等方クロージング + if params.stage_min_area > 0: + binary = apply_staged_closing( + binary, + params.stage_close_small, + params.stage_min_area, + params.stage_close_large, + ) + else: + binary = apply_iso_closing( + binary, params.iso_close_size, + ) + + # 距離変換マスク + 幅フィルタ + binary = apply_dist_mask( + binary, params.dist_thresh, + ) + if params.width_near > 0 and params.width_far > 0: + binary = apply_width_filter( + binary, + params.width_near, + params.width_far, + params.width_tolerance, + ) + + # 行ごと中心抽出 + フィッティング + return fit_row_centers( + binary, params.min_line_width, + median_ksize=params.median_ksize, + neighbor_thresh=params.neighbor_thresh, + residual_thresh=params.residual_thresh, + ) diff --git a/src/pc/vision/detectors/robust.py b/src/pc/vision/detectors/robust.py new file mode 100644 index 0000000..ac55d35 --- /dev/null +++ b/src/pc/vision/detectors/robust.py @@ -0,0 +1,66 @@ +""" +robust +案C: 最高ロバスト型の線検出 +Black-hat + 適応的閾値の二重正規化に加え, +RANSAC で外れ値を除去する最もロバストな構成 +""" + +import cv2 +import numpy as np + +from pc.vision.line_detector import ImageParams, LineDetectResult +from pc.vision.line_detector import fit_row_centers +from pc.vision.morphology import ( + apply_dist_mask, + apply_iso_closing, + apply_width_filter, +) + + +def detect_robust( + frame: np.ndarray, params: ImageParams, +) -> LineDetectResult: + """案C: 最高ロバスト型""" + # Black-hat 変換 + bh_k = params.blackhat_ksize | 1 + bh_kernel = cv2.getStructuringElement( + cv2.MORPH_ELLIPSE, (bh_k, bh_k), + ) + blackhat = cv2.morphologyEx( + frame, cv2.MORPH_BLACKHAT, bh_kernel, + ) + + # 適応的閾値(BINARY: Black-hat 後は線が白) + block = max(params.adaptive_block | 1, 3) + binary = cv2.adaptiveThreshold( + blackhat, 255, + cv2.ADAPTIVE_THRESH_GAUSSIAN_C, + cv2.THRESH_BINARY, + block, -params.adaptive_c, + ) + + # 等方クロージング + 距離変換マスク + 幅フィルタ + binary = apply_iso_closing( + binary, params.iso_close_size, + ) + binary = apply_dist_mask( + binary, params.dist_thresh, + ) + if params.width_near > 0 and params.width_far > 0: + binary = apply_width_filter( + binary, + params.width_near, + params.width_far, + params.width_tolerance, + ) + + # 行ごと中央値抽出 + RANSAC フィッティング + return fit_row_centers( + binary, params.min_line_width, + use_median=True, + ransac_thresh=params.ransac_thresh, + ransac_iter=params.ransac_iter, + median_ksize=params.median_ksize, + neighbor_thresh=params.neighbor_thresh, + residual_thresh=params.residual_thresh, + ) diff --git a/src/pc/vision/detectors/valley.py b/src/pc/vision/detectors/valley.py new file mode 100644 index 0000000..ce14b52 --- /dev/null +++ b/src/pc/vision/detectors/valley.py @@ -0,0 +1,328 @@ +""" +valley +案D: 谷検出+追跡型の線検出 +各行の輝度信号から谷(暗い領域)を直接検出し, +時系列追跡で安定性を確保する.二値化を使用しない +""" + +import cv2 +import numpy as np + +from common import config +from pc.vision.fitting import clean_and_fit +from pc.vision.line_detector import ( + DETECT_Y_END, + DETECT_Y_START, + MIN_FIT_ROWS, + ImageParams, + LineDetectResult, + build_result, + no_detection, +) + + +class ValleyTracker: + """谷検出の時系列追跡を管理するクラス + + 前フレームの多項式係数を保持し,予測・平滑化・ + 検出失敗時のコースティングを提供する + """ + + def __init__(self) -> None: + self._prev_coeffs: np.ndarray | None = None + self._smoothed_coeffs: np.ndarray | None = None + self._frames_lost: int = 0 + + def predict_x(self, y: float) -> float | None: + """前フレームの多項式から x 座標を予測する + + Args: + y: 画像の y 座標 + + Returns: + 予測 x 座標(履歴なしの場合は None) + """ + if self._smoothed_coeffs is None: + return None + return float(np.poly1d(self._smoothed_coeffs)(y)) + + def update( + self, + coeffs: np.ndarray, + alpha: float, + ) -> np.ndarray: + """検出成功時に状態を更新する + + EMA で多項式係数を平滑化し,更新後の係数を返す + + Args: + coeffs: 今フレームのフィッティング係数 + alpha: EMA 係数(1.0 で平滑化なし) + + Returns: + 平滑化後の多項式係数 + """ + self._frames_lost = 0 + if self._smoothed_coeffs is None: + self._smoothed_coeffs = coeffs.copy() + else: + self._smoothed_coeffs = ( + alpha * coeffs + + (1.0 - alpha) * self._smoothed_coeffs + ) + self._prev_coeffs = self._smoothed_coeffs.copy() + return self._smoothed_coeffs + + def coast( + self, max_frames: int, + ) -> LineDetectResult | None: + """検出失敗時に予測結果を返す + + Args: + max_frames: 予測を継続する最大フレーム数 + + Returns: + 予測による結果(継続不可の場合は None) + """ + if self._smoothed_coeffs is None: + return None + self._frames_lost += 1 + if self._frames_lost > max_frames: + return None + # 予測でデバッグ用二値画像は空にする + h = config.FRAME_HEIGHT + w = config.FRAME_WIDTH + blank = np.zeros((h, w), dtype=np.uint8) + return build_result(self._smoothed_coeffs, blank) + + def reset(self) -> None: + """追跡状態をリセットする""" + self._prev_coeffs = None + self._smoothed_coeffs = None + self._frames_lost = 0 + + +_valley_tracker = ValleyTracker() + + +def reset_valley_tracker() -> None: + """谷検出の追跡状態をリセットする""" + _valley_tracker.reset() + + +def _find_row_valley( + row: np.ndarray, + min_depth: int, + expected_width: float, + width_tolerance: float, + predicted_x: float | None, + max_deviation: int, +) -> tuple[float, float] | None: + """1行の輝度信号から最適な谷を検出する + + Args: + row: スムージング済みの1行輝度信号 + min_depth: 最小谷深度 + expected_width: 期待線幅(px,0 で幅フィルタ無効) + width_tolerance: 幅フィルタの上限倍率 + predicted_x: 追跡による予測 x 座標(None で無効) + max_deviation: 予測からの最大許容偏差 + + Returns: + (谷の中心x, 谷の深度) または None + """ + n = len(row) + if n < 5: + return None + + signal = row.astype(np.float32) + + # 極小値を検出(前後より小さい点) + left = signal[:-2] + center = signal[1:-1] + right = signal[2:] + minima_mask = (center <= left) & (center <= right) + minima_indices = np.where(minima_mask)[0] + 1 + + if len(minima_indices) == 0: + return None + + best: tuple[float, float] | None = None + best_score = -1.0 + + for idx in minima_indices: + val = signal[idx] + + # 左の肩を探す + left_shoulder = idx + for i in range(idx - 1, -1, -1): + if signal[i] < signal[i + 1]: + break + left_shoulder = i + # 右の肩を探す + right_shoulder = idx + for i in range(idx + 1, n): + if signal[i] < signal[i - 1]: + break + right_shoulder = i + + # 谷の深度(肩の平均 - 谷底) + shoulder_avg = ( + signal[left_shoulder] + signal[right_shoulder] + ) / 2.0 + depth = shoulder_avg - val + if depth < min_depth: + continue + + # 谷の幅 + width = right_shoulder - left_shoulder + center_x = (left_shoulder + right_shoulder) / 2.0 + + # 幅フィルタ + if expected_width > 0: + max_w = expected_width * width_tolerance + min_w = expected_width / width_tolerance + if width > max_w or width < min_w: + continue + + # 予測との偏差チェック + if predicted_x is not None: + if abs(center_x - predicted_x) > max_deviation: + continue + + # スコア: 深度優先,予測がある場合は近さも考慮 + score = float(depth) + if predicted_x is not None: + dist = abs(center_x - predicted_x) + score += max(0.0, max_deviation - dist) + + if score > best_score: + best_score = score + best = (center_x, float(depth)) + + return best + + +def _build_valley_binary( + shape: tuple[int, int], + centers_y: list[int], + centers_x: list[float], +) -> np.ndarray: + """谷検出結果からデバッグ用二値画像を生成する + + Args: + shape: 出力画像の (高さ, 幅) + centers_y: 検出行の y 座標リスト + centers_x: 検出行の中心 x 座標リスト + + Returns: + デバッグ用二値画像 + """ + binary = np.zeros(shape, dtype=np.uint8) + half_w = 3 + w = shape[1] + for y, cx in zip(centers_y, centers_x): + x0 = max(0, int(cx) - half_w) + x1 = min(w, int(cx) + half_w + 1) + binary[y, x0:x1] = 255 + return binary + + +def detect_valley( + frame: np.ndarray, params: ImageParams, +) -> LineDetectResult: + """案D: 谷検出+追跡型""" + h, w = frame.shape[:2] + + # 行ごとにガウシアン平滑化するため画像全体をブラー + gauss_k = params.valley_gauss_ksize | 1 + blurred = cv2.GaussianBlur( + frame, (gauss_k, 1), 0, + ) + + # 透視補正の期待幅を計算するための準備 + use_width = ( + params.width_near > 0 and params.width_far > 0 + ) + detect_h = DETECT_Y_END - DETECT_Y_START + denom = max(detect_h - 1, 1) + + centers_y: list[int] = [] + centers_x: list[float] = [] + depths: list[float] = [] + + for y in range(DETECT_Y_START, DETECT_Y_END): + row = blurred[y] + + # 期待幅の計算 + if use_width: + t = (DETECT_Y_END - 1 - y) / denom + expected_w = float(params.width_far) + ( + float(params.width_near) + - float(params.width_far) + ) * t + else: + expected_w = 0.0 + + # 予測 x 座標 + predicted_x = _valley_tracker.predict_x( + float(y), + ) + + result = _find_row_valley( + row, + params.valley_min_depth, + expected_w, + params.width_tolerance, + predicted_x, + params.valley_max_deviation, + ) + if result is not None: + centers_y.append(y) + centers_x.append(result[0]) + depths.append(result[1]) + + # デバッグ用二値画像 + debug_binary = _build_valley_binary( + (h, w), centers_y, centers_x, + ) + + if len(centers_y) < MIN_FIT_ROWS: + # 検出失敗 → コースティングを試みる + coasted = _valley_tracker.coast( + params.valley_coast_frames, + ) + if coasted is not None: + coasted.binary_image = debug_binary + return coasted + return no_detection(debug_binary) + + cy = np.array(centers_y, dtype=np.float64) + cx = np.array(centers_x, dtype=np.float64) + w_arr = np.array(depths, dtype=np.float64) + + # ロバストフィッティング(深度を重みに使用) + coeffs = clean_and_fit( + cy, cx, + median_ksize=params.median_ksize, + neighbor_thresh=params.neighbor_thresh, + residual_thresh=params.residual_thresh, + weights=w_arr, + ransac_thresh=params.ransac_thresh, + ransac_iter=params.ransac_iter, + ) + if coeffs is None: + coasted = _valley_tracker.coast( + params.valley_coast_frames, + ) + if coasted is not None: + coasted.binary_image = debug_binary + return coasted + return no_detection(debug_binary) + + # EMA で平滑化 + smoothed = _valley_tracker.update( + coeffs, params.valley_ema_alpha, + ) + + return build_result(smoothed, debug_binary) diff --git a/src/pc/vision/fitting.py b/src/pc/vision/fitting.py index 298a241..34813d5 100644 --- a/src/pc/vision/fitting.py +++ b/src/pc/vision/fitting.py @@ -1,10 +1,21 @@ """ fitting 直線・曲線近似の共通ユーティリティモジュール +Theil-Sen 推定,RANSAC,外れ値除去付きフィッティングを提供する """ import numpy as np +# フィッティングに必要な最小行数 +MIN_FIT_ROWS: int = 10 + +# 近傍外れ値除去の設定 +NEIGHBOR_HALF_WINDOW: int = 3 +NEIGHBOR_FILTER_PASSES: int = 3 + +# 残差ベース反復除去の最大回数 +RESIDUAL_REMOVAL_ITERATIONS: int = 5 + def theil_sen_fit( y: np.ndarray, @@ -35,3 +46,164 @@ slope = float(np.median(slopes)) intercept = float(np.median(x - slope * y)) return slope, intercept + + +def ransac_polyfit( + ys: np.ndarray, xs: np.ndarray, + degree: int, n_iter: int, thresh: float, +) -> np.ndarray | None: + """RANSAC で外れ値を除去して多項式フィッティング + + Args: + ys: y 座標配列 + xs: x 座標配列 + degree: 多項式の次数 + n_iter: 反復回数 + thresh: 外れ値判定閾値(ピクセル) + + Returns: + 多項式係数(フィッティング失敗時は None) + """ + n = len(ys) + sample_size = degree + 1 + if n < sample_size: + return None + + best_coeffs: np.ndarray | None = None + best_inliers = 0 + rng = np.random.default_rng() + + for _ in range(n_iter): + idx = rng.choice(n, sample_size, replace=False) + coeffs = np.polyfit(ys[idx], xs[idx], degree) + poly = np.poly1d(coeffs) + residuals = np.abs(xs - poly(ys)) + n_inliers = int(np.sum(residuals < thresh)) + if n_inliers > best_inliers: + best_inliers = n_inliers + best_coeffs = coeffs + + # インライアで再フィッティング + if best_coeffs is not None: + poly = np.poly1d(best_coeffs) + inlier_mask = np.abs(xs - poly(ys)) < thresh + if np.sum(inlier_mask) >= sample_size: + best_coeffs = np.polyfit( + ys[inlier_mask], + xs[inlier_mask], + degree, + ) + + return best_coeffs + + +def clean_and_fit( + cy: np.ndarray, + cx: np.ndarray, + median_ksize: int, + neighbor_thresh: float, + residual_thresh: float = 0.0, + weights: np.ndarray | None = None, + ransac_thresh: float = 0.0, + ransac_iter: int = 0, +) -> np.ndarray | None: + """外れ値除去+重み付きフィッティングを行う + + 全検出手法で共通に使えるロバストなフィッティング + (1) 移動メディアンフィルタでスパイクを平滑化 + (2) 近傍中央値からの偏差で外れ値を除去(複数パス) + (3) 重み付き最小二乗(または RANSAC)でフィッティング + (4) 残差ベースの反復除去で外れ値を最終除去 + + Args: + cy: 中心点の y 座標配列 + cx: 中心点の x 座標配列 + median_ksize: 移動メディアンのカーネルサイズ(0 で無効) + neighbor_thresh: 近傍外れ値除去の閾値 px(0 で無効) + residual_thresh: 残差除去の閾値 px(0 で無効) + weights: 各点の信頼度(None で均等) + ransac_thresh: RANSAC 閾値(0 以下で無効) + ransac_iter: RANSAC 反復回数 + + Returns: + 多項式係数(フィッティング失敗時は None) + """ + if len(cy) < MIN_FIT_ROWS: + return None + + cx_clean = cx.copy() + mask = np.ones(len(cy), dtype=bool) + + # (1) 移動メディアンフィルタ + if median_ksize >= 3: + k = median_ksize | 1 + half = k // 2 + for i in range(len(cx_clean)): + lo = max(0, i - half) + hi = min(len(cx_clean), i + half + 1) + cx_clean[i] = float(np.median(cx[lo:hi])) + + # (2) 近傍外れ値除去(複数パス) + if neighbor_thresh > 0: + half_n = NEIGHBOR_HALF_WINDOW + for _ in range(NEIGHBOR_FILTER_PASSES): + new_mask = np.ones(len(cx_clean), dtype=bool) + for i in range(len(cx_clean)): + if not mask[i]: + continue + lo = max(0, i - half_n) + hi = min(len(cx_clean), i + half_n + 1) + neighbors = cx_clean[lo:hi][mask[lo:hi]] + if len(neighbors) == 0: + new_mask[i] = False + continue + local_med = float(np.median(neighbors)) + if abs(cx_clean[i] - local_med) > neighbor_thresh: + new_mask[i] = False + if np.array_equal(mask, mask & new_mask): + break + mask = mask & new_mask + + cy = cy[mask] + cx_clean = cx_clean[mask] + if weights is not None: + weights = weights[mask] + + if len(cy) < MIN_FIT_ROWS: + return None + + # (3) フィッティング + if ransac_thresh > 0 and ransac_iter > 0: + coeffs = ransac_polyfit( + cy, cx_clean, 2, ransac_iter, ransac_thresh, + ) + elif weights is not None: + coeffs = np.polyfit(cy, cx_clean, 2, w=weights) + else: + coeffs = np.polyfit(cy, cx_clean, 2) + + if coeffs is None: + return None + + # (4) 残差ベースの反復除去 + if residual_thresh > 0: + for _ in range(RESIDUAL_REMOVAL_ITERATIONS): + poly = np.poly1d(coeffs) + residuals = np.abs(cx_clean - poly(cy)) + inlier = residuals < residual_thresh + if np.all(inlier): + break + if np.sum(inlier) < MIN_FIT_ROWS: + break + cy = cy[inlier] + cx_clean = cx_clean[inlier] + if weights is not None: + weights = weights[inlier] + if weights is not None: + coeffs = np.polyfit( + cy, cx_clean, 2, w=weights, + ) + else: + coeffs = np.polyfit(cy, cx_clean, 2) + + return coeffs diff --git a/src/pc/vision/line_detector.py b/src/pc/vision/line_detector.py index 0bd2eac..09115bf 100644 --- a/src/pc/vision/line_detector.py +++ b/src/pc/vision/line_detector.py @@ -2,6 +2,10 @@ line_detector カメラ画像から黒線の位置を検出するモジュール 複数の検出手法を切り替えて使用できる + +公開 API: + ImageParams, LineDetectResult, detect_line, + reset_valley_tracker, DETECT_METHODS """ from dataclasses import dataclass @@ -10,6 +14,7 @@ import numpy as np from common import config +from pc.vision.fitting import clean_and_fit # 検出領域の y 範囲(画像全体) DETECT_Y_START: int = 0 @@ -19,13 +24,6 @@ MIN_FIT_PIXELS: int = 50 MIN_FIT_ROWS: int = 10 -# 近傍外れ値除去の設定 -NEIGHBOR_HALF_WINDOW: int = 3 -NEIGHBOR_FILTER_PASSES: int = 3 - -# 残差ベース反復除去の最大回数 -RESIDUAL_REMOVAL_ITERATIONS: int = 5 - # 検出手法の定義(キー: 識別子,値: 表示名) DETECT_METHODS: dict[str, str] = { "current": "現行(CLAHE + 固定閾値)", @@ -154,6 +152,9 @@ binary_image: np.ndarray | None +# ── 公開 API ────────────────────────────────────── + + def detect_line( frame: np.ndarray, params: ImageParams | None = None, @@ -174,404 +175,136 @@ method = params.method if method == "blackhat": - return _detect_blackhat(frame, params) + from pc.vision.detectors.blackhat import ( + detect_blackhat, + ) + return detect_blackhat(frame, params) if method == "dual_norm": - return _detect_dual_norm(frame, params) + from pc.vision.detectors.dual_norm import ( + detect_dual_norm, + ) + return detect_dual_norm(frame, params) if method == "robust": - return _detect_robust(frame, params) + from pc.vision.detectors.robust import ( + detect_robust, + ) + return detect_robust(frame, params) if method == "valley": - return _detect_valley(frame, params) - return _detect_current(frame, params) - - -# ── 検出手法の実装 ───────────────────────────── - - -def _detect_current( - frame: np.ndarray, params: ImageParams, -) -> LineDetectResult: - """現行手法: CLAHE + 固定閾値 + 全ピクセルフィッティング""" - # CLAHE でコントラスト強調 - clahe = cv2.createCLAHE( - clipLimit=params.clahe_clip, - tileGridSize=( - params.clahe_grid, - params.clahe_grid, - ), - ) - enhanced = clahe.apply(frame) - - # ガウシアンブラー - blur_k = params.blur_size | 1 - blurred = cv2.GaussianBlur( - enhanced, (blur_k, blur_k), 0, - ) - - # 固定閾値で二値化(黒線を白に反転) - _, binary = cv2.threshold( - blurred, params.binary_thresh, 255, - cv2.THRESH_BINARY_INV, - ) - - # オープニング(孤立ノイズ除去) - if params.open_size >= 3: - open_k = params.open_size | 1 - open_kernel = cv2.getStructuringElement( - cv2.MORPH_ELLIPSE, (open_k, open_k), + from pc.vision.detectors.valley import ( + detect_valley, ) - binary = cv2.morphologyEx( - binary, cv2.MORPH_OPEN, open_kernel, - ) + return detect_valley(frame, params) - # 横方向クロージング(途切れ補間) - if params.close_width >= 3: - close_h = max(params.close_height | 1, 1) - close_kernel = cv2.getStructuringElement( - cv2.MORPH_ELLIPSE, - (params.close_width, close_h), - ) - binary = cv2.morphologyEx( - binary, cv2.MORPH_CLOSE, close_kernel, - ) - - # 全ピクセルフィッティング(従来方式) - return _fit_all_pixels(binary) - - -def _detect_blackhat( - frame: np.ndarray, params: ImageParams, -) -> LineDetectResult: - """案A: Black-hat 中心型 - - Black-hat 変換で背景より暗い構造を直接抽出し, - 固定閾値 + 距離変換 + 行ごと中心抽出で検出する - """ - # Black-hat 変換(暗い構造の抽出) - bh_k = params.blackhat_ksize | 1 - bh_kernel = cv2.getStructuringElement( - cv2.MORPH_ELLIPSE, (bh_k, bh_k), + from pc.vision.detectors.current import ( + detect_current, ) - blackhat = cv2.morphologyEx( - frame, cv2.MORPH_BLACKHAT, bh_kernel, + return detect_current(frame, params) + + +def reset_valley_tracker() -> None: + """谷検出の追跡状態をリセットする""" + from pc.vision.detectors.valley import ( + reset_valley_tracker as _reset, ) - - # ガウシアンブラー - blur_k = params.blur_size | 1 - blurred = cv2.GaussianBlur( - blackhat, (blur_k, blur_k), 0, - ) - - # 固定閾値(Black-hat 後は線が白) - _, binary = cv2.threshold( - blurred, params.binary_thresh, 255, - cv2.THRESH_BINARY, - ) - - # 等方クロージング + 距離変換マスク + 幅フィルタ - binary = _apply_iso_closing( - binary, params.iso_close_size, - ) - binary = _apply_dist_mask( - binary, params.dist_thresh, - ) - if params.width_near > 0 and params.width_far > 0: - binary = _apply_width_filter( - binary, - params.width_near, - params.width_far, - params.width_tolerance, - ) - - # 行ごと中心抽出 + フィッティング - return _fit_row_centers( - binary, params.min_line_width, - median_ksize=params.median_ksize, - neighbor_thresh=params.neighbor_thresh, - residual_thresh=params.residual_thresh, - ) + _reset() -def _detect_dual_norm( - frame: np.ndarray, params: ImageParams, -) -> LineDetectResult: - """案B: 二重正規化型 - - 背景除算で照明勾配を除去し, - 適応的閾値で局所ムラにも対応する二重防壁構成 - """ - # 背景除算正規化 - bg_k = params.bg_blur_ksize | 1 - bg = cv2.GaussianBlur( - frame, (bg_k, bg_k), 0, - ) - normalized = ( - frame.astype(np.float32) * 255.0 - / (bg.astype(np.float32) + 1.0) - ) - normalized = np.clip( - normalized, 0, 255, - ).astype(np.uint8) - - # 適応的閾値(ガウシアン,BINARY_INV) - block = max(params.adaptive_block | 1, 3) - binary = cv2.adaptiveThreshold( - normalized, 255, - cv2.ADAPTIVE_THRESH_GAUSSIAN_C, - cv2.THRESH_BINARY_INV, - block, params.adaptive_c, - ) - - # 固定閾値との AND(有効時のみ) - if params.global_thresh > 0: - _, global_mask = cv2.threshold( - normalized, params.global_thresh, - 255, cv2.THRESH_BINARY_INV, - ) - binary = cv2.bitwise_and(binary, global_mask) - - # 段階クロージング or 等方クロージング - if params.stage_min_area > 0: - binary = _apply_staged_closing( - binary, - params.stage_close_small, - params.stage_min_area, - params.stage_close_large, - ) - else: - binary = _apply_iso_closing( - binary, params.iso_close_size, - ) - - # 距離変換マスク + 幅フィルタ - binary = _apply_dist_mask( - binary, params.dist_thresh, - ) - if params.width_near > 0 and params.width_far > 0: - binary = _apply_width_filter( - binary, - params.width_near, - params.width_far, - params.width_tolerance, - ) - - # 行ごと中心抽出 + フィッティング - return _fit_row_centers( - binary, params.min_line_width, - median_ksize=params.median_ksize, - neighbor_thresh=params.neighbor_thresh, - residual_thresh=params.residual_thresh, - ) +# ── 共通結果構築(各検出器から使用) ────────────── -def _detect_robust( - frame: np.ndarray, params: ImageParams, -) -> LineDetectResult: - """案C: 最高ロバスト型 - - Black-hat + 適応的閾値の二重正規化に加え, - RANSAC で外れ値を除去する最もロバストな構成 - """ - # Black-hat 変換 - bh_k = params.blackhat_ksize | 1 - bh_kernel = cv2.getStructuringElement( - cv2.MORPH_ELLIPSE, (bh_k, bh_k), - ) - blackhat = cv2.morphologyEx( - frame, cv2.MORPH_BLACKHAT, bh_kernel, - ) - - # 適応的閾値(BINARY: Black-hat 後は線が白) - block = max(params.adaptive_block | 1, 3) - binary = cv2.adaptiveThreshold( - blackhat, 255, - cv2.ADAPTIVE_THRESH_GAUSSIAN_C, - cv2.THRESH_BINARY, - block, -params.adaptive_c, - ) - - # 等方クロージング + 距離変換マスク + 幅フィルタ - binary = _apply_iso_closing( - binary, params.iso_close_size, - ) - binary = _apply_dist_mask( - binary, params.dist_thresh, - ) - if params.width_near > 0 and params.width_far > 0: - binary = _apply_width_filter( - binary, - params.width_near, - params.width_far, - params.width_tolerance, - ) - - # 行ごと中央値抽出 + RANSAC フィッティング - return _fit_row_centers( - binary, params.min_line_width, - use_median=True, - ransac_thresh=params.ransac_thresh, - ransac_iter=params.ransac_iter, - median_ksize=params.median_ksize, - neighbor_thresh=params.neighbor_thresh, - residual_thresh=params.residual_thresh, - ) - - -# ── 共通処理 ─────────────────────────────────── - - -def _apply_iso_closing( - binary: np.ndarray, size: int, -) -> np.ndarray: - """等方クロージングで穴を埋める - - Args: - binary: 二値画像 - size: カーネルサイズ - - Returns: - クロージング後の二値画像 - """ - if size < 3: - return binary - k = size | 1 - kernel = cv2.getStructuringElement( - cv2.MORPH_ELLIPSE, (k, k), - ) - return cv2.morphologyEx( - binary, cv2.MORPH_CLOSE, kernel, - ) - - -def _apply_staged_closing( - binary: np.ndarray, - small_size: int, - min_area: int, - large_size: int, -) -> np.ndarray: - """段階クロージング: 小穴埋め → 孤立除去 → 大穴埋め - - Args: - binary: 二値画像 - small_size: 第1段クロージングのカーネルサイズ - min_area: 孤立領域除去の最小面積(0 で無効) - large_size: 第2段クロージングのカーネルサイズ(0 で無効) - - Returns: - 処理後の二値画像 - """ - # 第1段: 小さいクロージングで近接ピクセルをつなぐ - result = _apply_iso_closing(binary, small_size) - - # 孤立領域の除去 - if min_area > 0: - contours, _ = cv2.findContours( - result, cv2.RETR_EXTERNAL, - cv2.CHAIN_APPROX_SIMPLE, - ) - mask = np.zeros_like(result) - for cnt in contours: - if cv2.contourArea(cnt) >= min_area: - cv2.drawContours( - mask, [cnt], -1, 255, -1, - ) - result = mask - - # 第2段: 大きいクロージングで中抜けを埋める - result = _apply_iso_closing(result, large_size) - - return result - - -def _apply_width_filter( - binary: np.ndarray, - width_near: int, - width_far: int, - tolerance: float, -) -> np.ndarray: - """透視補正付き幅フィルタで広がりすぎた行を除外する - - 各行の期待線幅を線形補間で算出し, - 実際の幅が上限(期待幅 × tolerance)を超える行をマスクする - - Args: - binary: 二値画像 - width_near: 画像下端での期待線幅(px) - width_far: 画像上端での期待線幅(px) - tolerance: 上限倍率 - - Returns: - 幅フィルタ適用後の二値画像 - """ - result = binary.copy() - h = binary.shape[0] - denom = max(h - 1, 1) - - for y_local in range(h): - xs = np.where(binary[y_local] > 0)[0] - if len(xs) == 0: - continue - # 画像下端(近距離)ほど t=1,上端(遠距離)ほど t=0 - t = (h - 1 - y_local) / denom - expected = float(width_far) + ( - float(width_near) - float(width_far) - ) * t - max_w = expected * tolerance - actual_w = int(xs[-1]) - int(xs[0]) + 1 - if actual_w > max_w: - result[y_local] = 0 - - return result - - -def _apply_dist_mask( - binary: np.ndarray, thresh: float, -) -> np.ndarray: - """距離変換で中心部のみを残す - - Args: - binary: 二値画像 - thresh: 距離の閾値(ピクセル) - - Returns: - 中心部のみの二値画像 - """ - if thresh <= 0: - return binary - dist = cv2.distanceTransform( - binary, cv2.DIST_L2, 5, - ) - _, mask = cv2.threshold( - dist, thresh, 255, cv2.THRESH_BINARY, - ) - return mask.astype(np.uint8) - - -def _fit_all_pixels( +def no_detection( binary: np.ndarray, ) -> LineDetectResult: - """全白ピクセルに多項式をフィッティングする + """未検出の結果を返す""" + return LineDetectResult( + detected=False, + position_error=0.0, + heading=0.0, + curvature=0.0, + poly_coeffs=None, + row_centers=None, + binary_image=binary, + ) - 従来方式.全ピクセルを等しく扱うため, - 陰で幅が広がった行がフィッティングを支配する弱点がある + +def _extract_row_centers( + binary: np.ndarray, +) -> np.ndarray | None: + """二値画像の最大連結領域から各行の線中心を求める Args: binary: 二値画像 Returns: - 線検出の結果 + 各行の中心 x 座標(NaN=その行に線なし), + 最大領域が見つからない場合は None """ - region = binary[DETECT_Y_START:DETECT_Y_END, :] - ys_local, xs = np.where(region > 0) + h, w = binary.shape[:2] + num_labels, labels, stats, _ = ( + cv2.connectedComponentsWithStats(binary) + ) - if len(xs) < MIN_FIT_PIXELS: - return _no_detection(binary) + if num_labels <= 1: + return None - ys = ys_local + DETECT_Y_START - coeffs = np.polyfit(ys, xs, 2) - return _build_result(coeffs, binary) + # 背景(ラベル 0)を除いた最大領域を取得 + areas = stats[1:, cv2.CC_STAT_AREA] + largest_label = int(np.argmax(areas)) + 1 + + # 最大領域のマスク + mask = (labels == largest_label).astype(np.uint8) + + # 各行の左右端から中心を計算 + centers = np.full(h, np.nan) + for y in range(h): + row = mask[y] + cols = np.where(row > 0)[0] + if len(cols) > 0: + centers[y] = (cols[0] + cols[-1]) / 2.0 + + return centers -def _fit_row_centers( +def build_result( + coeffs: np.ndarray, + binary: np.ndarray, + row_centers: np.ndarray | None = None, +) -> LineDetectResult: + """多項式係数から LineDetectResult を構築する + + row_centers が None の場合は binary から自動抽出する + """ + poly = np.poly1d(coeffs) + center_x = config.FRAME_WIDTH / 2.0 + + # 画像下端での位置偏差 + x_bottom = poly(DETECT_Y_END) + position_error = (center_x - x_bottom) / center_x + + # 傾き: dx/dy(画像下端での値) + poly_deriv = poly.deriv() + heading = float(poly_deriv(DETECT_Y_END)) + + # 曲率: d²x/dy² + poly_deriv2 = poly_deriv.deriv() + curvature = float(poly_deriv2(DETECT_Y_END)) + + # row_centers が未提供なら binary から抽出 + if row_centers is None: + row_centers = _extract_row_centers(binary) + + return LineDetectResult( + detected=True, + position_error=position_error, + heading=heading, + curvature=curvature, + poly_coeffs=coeffs, + row_centers=row_centers, + binary_image=binary, + ) + + +def fit_row_centers( binary: np.ndarray, min_width: int, use_median: bool = False, @@ -616,12 +349,12 @@ centers_x.append(float(np.mean(xs))) if len(centers_y) < MIN_FIT_ROWS: - return _no_detection(binary) + return no_detection(binary) cy = np.array(centers_y) cx = np.array(centers_x) - coeffs = _clean_and_fit( + coeffs = clean_and_fit( cy, cx, median_ksize=median_ksize, neighbor_thresh=neighbor_thresh, @@ -630,573 +363,6 @@ ransac_iter=ransac_iter, ) if coeffs is None: - return _no_detection(binary) + return no_detection(binary) - return _build_result(coeffs, binary) - - -def _ransac_polyfit( - ys: np.ndarray, xs: np.ndarray, - degree: int, n_iter: int, thresh: float, -) -> np.ndarray | None: - """RANSAC で外れ値を除去して多項式フィッティング - - Args: - ys: y 座標配列 - xs: x 座標配列 - degree: 多項式の次数 - n_iter: 反復回数 - thresh: 外れ値判定閾値(ピクセル) - - Returns: - 多項式係数(フィッティング失敗時は None) - """ - n = len(ys) - sample_size = degree + 1 - if n < sample_size: - return None - - best_coeffs: np.ndarray | None = None - best_inliers = 0 - rng = np.random.default_rng() - - for _ in range(n_iter): - idx = rng.choice(n, sample_size, replace=False) - coeffs = np.polyfit(ys[idx], xs[idx], degree) - poly = np.poly1d(coeffs) - residuals = np.abs(xs - poly(ys)) - n_inliers = int(np.sum(residuals < thresh)) - if n_inliers > best_inliers: - best_inliers = n_inliers - best_coeffs = coeffs - - # インライアで再フィッティング - if best_coeffs is not None: - poly = np.poly1d(best_coeffs) - inlier_mask = np.abs(xs - poly(ys)) < thresh - if np.sum(inlier_mask) >= sample_size: - best_coeffs = np.polyfit( - ys[inlier_mask], - xs[inlier_mask], - degree, - ) - - return best_coeffs - - -def _clean_and_fit( - cy: np.ndarray, - cx: np.ndarray, - median_ksize: int, - neighbor_thresh: float, - residual_thresh: float = 0.0, - weights: np.ndarray | None = None, - ransac_thresh: float = 0.0, - ransac_iter: int = 0, -) -> np.ndarray | None: - """外れ値除去+重み付きフィッティングを行う - - 全検出手法で共通に使えるロバストなフィッティング. - (1) 移動メディアンフィルタでスパイクを平滑化 - (2) 近傍中央値からの偏差で外れ値を除去(複数パス) - (3) 重み付き最小二乗(または RANSAC)でフィッティング - (4) 残差ベースの反復除去で外れ値を最終除去 - - Args: - cy: 中心点の y 座標配列 - cx: 中心点の x 座標配列 - median_ksize: 移動メディアンのカーネルサイズ(0 で無効) - neighbor_thresh: 近傍外れ値除去の閾値 px(0 で無効) - residual_thresh: 残差除去の閾値 px(0 で無効) - weights: 各点の信頼度(None で均等) - ransac_thresh: RANSAC 閾値(0 以下で無効) - ransac_iter: RANSAC 反復回数 - - Returns: - 多項式係数(フィッティング失敗時は None) - """ - if len(cy) < MIN_FIT_ROWS: - return None - - cx_clean = cx.copy() - mask = np.ones(len(cy), dtype=bool) - - # (1) 移動メディアンフィルタ - if median_ksize >= 3: - k = median_ksize | 1 - half = k // 2 - for i in range(len(cx_clean)): - lo = max(0, i - half) - hi = min(len(cx_clean), i + half + 1) - cx_clean[i] = float(np.median(cx[lo:hi])) - - # (2) 近傍外れ値除去(複数パス) - if neighbor_thresh > 0: - half_n = NEIGHBOR_HALF_WINDOW - for _ in range(NEIGHBOR_FILTER_PASSES): - new_mask = np.ones(len(cx_clean), dtype=bool) - for i in range(len(cx_clean)): - if not mask[i]: - continue - lo = max(0, i - half_n) - hi = min(len(cx_clean), i + half_n + 1) - neighbors = cx_clean[lo:hi][mask[lo:hi]] - if len(neighbors) == 0: - new_mask[i] = False - continue - local_med = float(np.median(neighbors)) - if abs(cx_clean[i] - local_med) > neighbor_thresh: - new_mask[i] = False - if np.array_equal(mask, mask & new_mask): - break - mask = mask & new_mask - - cy = cy[mask] - cx_clean = cx_clean[mask] - if weights is not None: - weights = weights[mask] - - if len(cy) < MIN_FIT_ROWS: - return None - - # (3) フィッティング - if ransac_thresh > 0 and ransac_iter > 0: - coeffs = _ransac_polyfit( - cy, cx_clean, 2, ransac_iter, ransac_thresh, - ) - elif weights is not None: - coeffs = np.polyfit(cy, cx_clean, 2, w=weights) - else: - coeffs = np.polyfit(cy, cx_clean, 2) - - if coeffs is None: - return None - - # (4) 残差ベースの反復除去 - if residual_thresh > 0: - for _ in range(RESIDUAL_REMOVAL_ITERATIONS): - poly = np.poly1d(coeffs) - residuals = np.abs(cx_clean - poly(cy)) - inlier = residuals < residual_thresh - if np.all(inlier): - break - if np.sum(inlier) < MIN_FIT_ROWS: - break - cy = cy[inlier] - cx_clean = cx_clean[inlier] - if weights is not None: - weights = weights[inlier] - if weights is not None: - coeffs = np.polyfit( - cy, cx_clean, 2, w=weights, - ) - else: - coeffs = np.polyfit(cy, cx_clean, 2) - - return coeffs - - -def _extract_row_centers( - binary: np.ndarray, -) -> np.ndarray | None: - """二値画像の最大連結領域から各行の線中心を求める - - Args: - binary: 二値画像 - - Returns: - 各行の中心 x 座標(NaN=その行に線なし), - 最大領域が見つからない場合は None - """ - h, w = binary.shape[:2] - num_labels, labels, stats, _ = ( - cv2.connectedComponentsWithStats(binary) - ) - - if num_labels <= 1: - return None - - # 背景(ラベル 0)を除いた最大領域を取得 - areas = stats[1:, cv2.CC_STAT_AREA] - largest_label = int(np.argmax(areas)) + 1 - - # 最大領域のマスク - mask = (labels == largest_label).astype(np.uint8) - - # 各行の左右端から中心を計算 - centers = np.full(h, np.nan) - for y in range(h): - row = mask[y] - cols = np.where(row > 0)[0] - if len(cols) > 0: - centers[y] = (cols[0] + cols[-1]) / 2.0 - - return centers - - -def _no_detection( - binary: np.ndarray, -) -> LineDetectResult: - """未検出の結果を返す""" - return LineDetectResult( - detected=False, - position_error=0.0, - heading=0.0, - curvature=0.0, - poly_coeffs=None, - row_centers=None, - binary_image=binary, - ) - - -def _build_result( - coeffs: np.ndarray, - binary: np.ndarray, - row_centers: np.ndarray | None = None, -) -> LineDetectResult: - """多項式係数から LineDetectResult を構築する - - row_centers が None の場合は binary から自動抽出する - """ - poly = np.poly1d(coeffs) - center_x = config.FRAME_WIDTH / 2.0 - - # 画像下端での位置偏差 - x_bottom = poly(DETECT_Y_END) - position_error = (center_x - x_bottom) / center_x - - # 傾き: dx/dy(画像下端での値) - poly_deriv = poly.deriv() - heading = float(poly_deriv(DETECT_Y_END)) - - # 曲率: d²x/dy² - poly_deriv2 = poly_deriv.deriv() - curvature = float(poly_deriv2(DETECT_Y_END)) - - # row_centers が未提供なら binary から抽出 - if row_centers is None: - row_centers = _extract_row_centers(binary) - - return LineDetectResult( - detected=True, - position_error=position_error, - heading=heading, - curvature=curvature, - poly_coeffs=coeffs, - row_centers=row_centers, - binary_image=binary, - ) - - -# ── 案D: 谷検出+追跡 ───────────────────────────── - - -class ValleyTracker: - """谷検出の時系列追跡を管理するクラス - - 前フレームの多項式係数を保持し,予測・平滑化・ - 検出失敗時のコースティングを提供する - """ - - def __init__(self) -> None: - self._prev_coeffs: np.ndarray | None = None - self._smoothed_coeffs: np.ndarray | None = None - self._frames_lost: int = 0 - - def predict_x(self, y: float) -> float | None: - """前フレームの多項式から x 座標を予測する - - Args: - y: 画像の y 座標 - - Returns: - 予測 x 座標(履歴なしの場合は None) - """ - if self._smoothed_coeffs is None: - return None - return float(np.poly1d(self._smoothed_coeffs)(y)) - - def update( - self, - coeffs: np.ndarray, - alpha: float, - ) -> np.ndarray: - """検出成功時に状態を更新する - - EMA で多項式係数を平滑化し,更新後の係数を返す - - Args: - coeffs: 今フレームのフィッティング係数 - alpha: EMA 係数(1.0 で平滑化なし) - - Returns: - 平滑化後の多項式係数 - """ - self._frames_lost = 0 - if self._smoothed_coeffs is None: - self._smoothed_coeffs = coeffs.copy() - else: - self._smoothed_coeffs = ( - alpha * coeffs - + (1.0 - alpha) * self._smoothed_coeffs - ) - self._prev_coeffs = self._smoothed_coeffs.copy() - return self._smoothed_coeffs - - def coast( - self, max_frames: int, - ) -> LineDetectResult | None: - """検出失敗時に予測結果を返す - - Args: - max_frames: 予測を継続する最大フレーム数 - - Returns: - 予測による結果(継続不可の場合は None) - """ - if self._smoothed_coeffs is None: - return None - self._frames_lost += 1 - if self._frames_lost > max_frames: - return None - # 予測でデバッグ用二値画像は空にする - h = config.FRAME_HEIGHT - w = config.FRAME_WIDTH - blank = np.zeros((h, w), dtype=np.uint8) - return _build_result(self._smoothed_coeffs, blank) - - def reset(self) -> None: - """追跡状態をリセットする""" - self._prev_coeffs = None - self._smoothed_coeffs = None - self._frames_lost = 0 - - -_valley_tracker = ValleyTracker() - - -def reset_valley_tracker() -> None: - """谷検出の追跡状態をリセットする""" - _valley_tracker.reset() - - -def _find_row_valley( - row: np.ndarray, - min_depth: int, - expected_width: float, - width_tolerance: float, - predicted_x: float | None, - max_deviation: int, -) -> tuple[float, float] | None: - """1行の輝度信号から最適な谷を検出する - - Args: - row: スムージング済みの1行輝度信号 - min_depth: 最小谷深度 - expected_width: 期待線幅(px,0 で幅フィルタ無効) - width_tolerance: 幅フィルタの上限倍率 - predicted_x: 追跡による予測 x 座標(None で無効) - max_deviation: 予測からの最大許容偏差 - - Returns: - (谷の中心x, 谷の深度) または None - """ - n = len(row) - if n < 5: - return None - - signal = row.astype(np.float32) - - # 極小値を検出(前後より小さい点) - left = signal[:-2] - center = signal[1:-1] - right = signal[2:] - minima_mask = (center <= left) & (center <= right) - minima_indices = np.where(minima_mask)[0] + 1 - - if len(minima_indices) == 0: - return None - - best: tuple[float, float] | None = None - best_score = -1.0 - - for idx in minima_indices: - val = signal[idx] - - # 左の肩を探す - left_shoulder = idx - for i in range(idx - 1, -1, -1): - if signal[i] < signal[i + 1]: - break - left_shoulder = i - # 右の肩を探す - right_shoulder = idx - for i in range(idx + 1, n): - if signal[i] < signal[i - 1]: - break - right_shoulder = i - - # 谷の深度(肩の平均 - 谷底) - shoulder_avg = ( - signal[left_shoulder] + signal[right_shoulder] - ) / 2.0 - depth = shoulder_avg - val - if depth < min_depth: - continue - - # 谷の幅 - width = right_shoulder - left_shoulder - center_x = (left_shoulder + right_shoulder) / 2.0 - - # 幅フィルタ - if expected_width > 0: - max_w = expected_width * width_tolerance - min_w = expected_width / width_tolerance - if width > max_w or width < min_w: - continue - - # 予測との偏差チェック - if predicted_x is not None: - if abs(center_x - predicted_x) > max_deviation: - continue - - # スコア: 深度優先,予測がある場合は近さも考慮 - score = float(depth) - if predicted_x is not None: - dist = abs(center_x - predicted_x) - score += max(0.0, max_deviation - dist) - - if score > best_score: - best_score = score - best = (center_x, float(depth)) - - return best - - -def _build_valley_binary( - shape: tuple[int, int], - centers_y: list[int], - centers_x: list[float], -) -> np.ndarray: - """谷検出結果からデバッグ用二値画像を生成する - - Args: - shape: 出力画像の (高さ, 幅) - centers_y: 検出行の y 座標リスト - centers_x: 検出行の中心 x 座標リスト - - Returns: - デバッグ用二値画像 - """ - binary = np.zeros(shape, dtype=np.uint8) - half_w = 3 - w = shape[1] - for y, cx in zip(centers_y, centers_x): - x0 = max(0, int(cx) - half_w) - x1 = min(w, int(cx) + half_w + 1) - binary[y, x0:x1] = 255 - return binary - - -def _detect_valley( - frame: np.ndarray, params: ImageParams, -) -> LineDetectResult: - """案D: 谷検出+追跡型 - - 各行の輝度信号から谷(暗い領域)を直接検出し, - 時系列追跡で安定性を確保する.二値化を使用しない - """ - h, w = frame.shape[:2] - - # 行ごとにガウシアン平滑化するため画像全体をブラー - gauss_k = params.valley_gauss_ksize | 1 - blurred = cv2.GaussianBlur( - frame, (gauss_k, 1), 0, - ) - - # 透視補正の期待幅を計算するための準備 - use_width = ( - params.width_near > 0 and params.width_far > 0 - ) - detect_h = DETECT_Y_END - DETECT_Y_START - denom = max(detect_h - 1, 1) - - centers_y: list[int] = [] - centers_x: list[float] = [] - depths: list[float] = [] - - for y in range(DETECT_Y_START, DETECT_Y_END): - row = blurred[y] - - # 期待幅の計算 - if use_width: - t = (DETECT_Y_END - 1 - y) / denom - expected_w = float(params.width_far) + ( - float(params.width_near) - - float(params.width_far) - ) * t - else: - expected_w = 0.0 - - # 予測 x 座標 - predicted_x = _valley_tracker.predict_x( - float(y), - ) - - result = _find_row_valley( - row, - params.valley_min_depth, - expected_w, - params.width_tolerance, - predicted_x, - params.valley_max_deviation, - ) - if result is not None: - centers_y.append(y) - centers_x.append(result[0]) - depths.append(result[1]) - - # デバッグ用二値画像 - debug_binary = _build_valley_binary( - (h, w), centers_y, centers_x, - ) - - if len(centers_y) < MIN_FIT_ROWS: - # 検出失敗 → コースティングを試みる - coasted = _valley_tracker.coast( - params.valley_coast_frames, - ) - if coasted is not None: - coasted.binary_image = debug_binary - return coasted - return _no_detection(debug_binary) - - cy = np.array(centers_y, dtype=np.float64) - cx = np.array(centers_x, dtype=np.float64) - w_arr = np.array(depths, dtype=np.float64) - - # ロバストフィッティング(深度を重みに使用) - coeffs = _clean_and_fit( - cy, cx, - median_ksize=params.median_ksize, - neighbor_thresh=params.neighbor_thresh, - residual_thresh=params.residual_thresh, - weights=w_arr, - ransac_thresh=params.ransac_thresh, - ransac_iter=params.ransac_iter, - ) - if coeffs is None: - coasted = _valley_tracker.coast( - params.valley_coast_frames, - ) - if coasted is not None: - coasted.binary_image = debug_binary - return coasted - return _no_detection(debug_binary) - - # EMA で平滑化 - smoothed = _valley_tracker.update( - coeffs, params.valley_ema_alpha, - ) - - return _build_result(smoothed, debug_binary) + return build_result(coeffs, binary) diff --git a/src/pc/vision/morphology.py b/src/pc/vision/morphology.py new file mode 100644 index 0000000..9b7edd3 --- /dev/null +++ b/src/pc/vision/morphology.py @@ -0,0 +1,134 @@ +""" +morphology +二値画像の形態学的処理ユーティリティモジュール +""" + +import cv2 +import numpy as np + + +def apply_iso_closing( + binary: np.ndarray, size: int, +) -> np.ndarray: + """等方クロージングで穴を埋める + + Args: + binary: 二値画像 + size: カーネルサイズ + + Returns: + クロージング後の二値画像 + """ + if size < 3: + return binary + k = size | 1 + kernel = cv2.getStructuringElement( + cv2.MORPH_ELLIPSE, (k, k), + ) + return cv2.morphologyEx( + binary, cv2.MORPH_CLOSE, kernel, + ) + + +def apply_staged_closing( + binary: np.ndarray, + small_size: int, + min_area: int, + large_size: int, +) -> np.ndarray: + """段階クロージング: 小穴埋め → 孤立除去 → 大穴埋め + + Args: + binary: 二値画像 + small_size: 第1段クロージングのカーネルサイズ + min_area: 孤立領域除去の最小面積(0 で無効) + large_size: 第2段クロージングのカーネルサイズ(0 で無効) + + Returns: + 処理後の二値画像 + """ + # 第1段: 小さいクロージングで近接ピクセルをつなぐ + result = apply_iso_closing(binary, small_size) + + # 孤立領域の除去 + if min_area > 0: + contours, _ = cv2.findContours( + result, cv2.RETR_EXTERNAL, + cv2.CHAIN_APPROX_SIMPLE, + ) + mask = np.zeros_like(result) + for cnt in contours: + if cv2.contourArea(cnt) >= min_area: + cv2.drawContours( + mask, [cnt], -1, 255, -1, + ) + result = mask + + # 第2段: 大きいクロージングで中抜けを埋める + result = apply_iso_closing(result, large_size) + + return result + + +def apply_width_filter( + binary: np.ndarray, + width_near: int, + width_far: int, + tolerance: float, +) -> np.ndarray: + """透視補正付き幅フィルタで広がりすぎた行を除外する + + 各行の期待線幅を線形補間で算出し, + 実際の幅が上限(期待幅 × tolerance)を超える行をマスクする + + Args: + binary: 二値画像 + width_near: 画像下端での期待線幅(px) + width_far: 画像上端での期待線幅(px) + tolerance: 上限倍率 + + Returns: + 幅フィルタ適用後の二値画像 + """ + result = binary.copy() + h = binary.shape[0] + denom = max(h - 1, 1) + + for y_local in range(h): + xs = np.where(binary[y_local] > 0)[0] + if len(xs) == 0: + continue + # 画像下端(近距離)ほど t=1,上端(遠距離)ほど t=0 + t = (h - 1 - y_local) / denom + expected = float(width_far) + ( + float(width_near) - float(width_far) + ) * t + max_w = expected * tolerance + actual_w = int(xs[-1]) - int(xs[0]) + 1 + if actual_w > max_w: + result[y_local] = 0 + + return result + + +def apply_dist_mask( + binary: np.ndarray, thresh: float, +) -> np.ndarray: + """距離変換で中心部のみを残す + + Args: + binary: 二値画像 + thresh: 距離の閾値(ピクセル) + + Returns: + 中心部のみの二値画像 + """ + if thresh <= 0: + return binary + dist = cv2.distanceTransform( + binary, cv2.DIST_L2, 5, + ) + _, mask = cv2.threshold( + dist, thresh, 255, cv2.THRESH_BINARY, + ) + return mask.astype(np.uint8)