"""
line_detector
カメラ画像から黒線の位置を検出するモジュール
複数の検出手法を切り替えて使用できる
"""
from dataclasses import dataclass
import cv2
import numpy as np
from common import config
# 検出領域の y 範囲(画像全体)
DETECT_Y_START: int = 0
DETECT_Y_END: int = config.FRAME_HEIGHT
# フィッティングに必要な最小数
MIN_FIT_PIXELS: int = 50
MIN_FIT_ROWS: int = 10
# 検出手法の定義(キー: 識別子,値: 表示名)
DETECT_METHODS: dict[str, str] = {
"current": "現行(CLAHE + 固定閾値)",
"blackhat": "案A(Black-hat 中心)",
"dual_norm": "案B(二重正規化)",
"robust": "案C(最高ロバスト)",
"valley": "案D(谷検出+追跡)",
}
@dataclass
class ImageParams:
"""画像処理パラメータ
Attributes:
method: 検出手法の識別子
clahe_clip: CLAHE のコントラスト増幅上限
clahe_grid: CLAHE の局所領域分割数
blur_size: ガウシアンブラーのカーネルサイズ(奇数)
binary_thresh: 二値化の閾値
open_size: オープニングのカーネルサイズ
close_width: クロージングの横幅
close_height: クロージングの高さ
blackhat_ksize: Black-hat のカーネルサイズ
bg_blur_ksize: 背景除算のブラーカーネルサイズ
adaptive_block: 適応的閾値のブロックサイズ
adaptive_c: 適応的閾値の定数 C
iso_close_size: 等方クロージングのカーネルサイズ
dist_thresh: 距離変換の閾値
min_line_width: 行ごと中心抽出の最小線幅
ransac_thresh: RANSAC の外れ値判定閾値
ransac_iter: RANSAC の反復回数
width_near: 画像下端での期待線幅(px,0 で無効)
width_far: 画像上端での期待線幅(px,0 で無効)
width_tolerance: 幅フィルタの上限倍率
median_ksize: 中心点列の移動メディアンフィルタサイズ(0 で無効)
neighbor_thresh: 近傍外れ値除去の閾値(px,0 で無効)
residual_thresh: 残差反復除去の閾値(px,0 で無効)
valley_gauss_ksize: 谷検出の行ごとガウシアンカーネルサイズ
valley_min_depth: 谷として認識する最小深度
valley_max_deviation: 追跡予測からの最大許容偏差(px)
valley_coast_frames: 検出失敗時の予測継続フレーム数
valley_ema_alpha: 多項式係数の指数移動平均係数
"""
# 検出手法
method: str = "current"
# 現行手法パラメータ
clahe_clip: float = 2.0
clahe_grid: int = 8
blur_size: int = 5
binary_thresh: int = 80
open_size: int = 5
close_width: int = 25
close_height: int = 3
# 案A/C: Black-hat
blackhat_ksize: int = 45
# 案B: 背景除算
bg_blur_ksize: int = 101
# 案B/C: 適応的閾値
adaptive_block: int = 51
adaptive_c: int = 10
# 案A/B/C: 後処理
iso_close_size: int = 15
dist_thresh: float = 3.0
min_line_width: int = 3
# 案C: RANSAC
ransac_thresh: float = 5.0
ransac_iter: int = 50
# ロバストフィッティング(全手法共通)
median_ksize: int = 7
neighbor_thresh: float = 10.0
residual_thresh: float = 8.0
# 透視補正付き幅フィルタ(0 で無効)
width_near: int = 0
width_far: int = 0
width_tolerance: float = 1.8
# 案D: 谷検出+追跡
valley_gauss_ksize: int = 15
valley_min_depth: int = 15
valley_max_deviation: int = 40
valley_coast_frames: int = 3
valley_ema_alpha: float = 0.7
@dataclass
class LineDetectResult:
"""線検出の結果を格納するデータクラス
Attributes:
detected: 線が検出できたか
position_error: 画像下端での位置偏差(-1.0~+1.0)
heading: 線の傾き(dx/dy,画像下端での値)
curvature: 線の曲率(d²x/dy²)
poly_coeffs: 多項式の係数(描画用,未検出時は None)
binary_image: 二値化後の画像(デバッグ用)
"""
detected: bool
position_error: float
heading: float
curvature: float
poly_coeffs: np.ndarray | None
binary_image: np.ndarray | None
def detect_line(
frame: np.ndarray,
params: ImageParams | None = None,
) -> LineDetectResult:
"""画像から黒線の位置を検出する
params.method に応じて検出手法を切り替える
Args:
frame: グレースケールのカメラ画像
params: 画像処理パラメータ(None でデフォルト)
Returns:
線検出の結果
"""
if params is None:
params = ImageParams()
method = params.method
if method == "blackhat":
return _detect_blackhat(frame, params)
if method == "dual_norm":
return _detect_dual_norm(frame, params)
if method == "robust":
return _detect_robust(frame, params)
if method == "valley":
return _detect_valley(frame, params)
return _detect_current(frame, params)
# ── 検出手法の実装 ─────────────────────────────
def _detect_current(
frame: np.ndarray, params: ImageParams,
) -> LineDetectResult:
"""現行手法: CLAHE + 固定閾値 + 全ピクセルフィッティング"""
# CLAHE でコントラスト強調
clahe = cv2.createCLAHE(
clipLimit=params.clahe_clip,
tileGridSize=(
params.clahe_grid,
params.clahe_grid,
),
)
enhanced = clahe.apply(frame)
# ガウシアンブラー
blur_k = params.blur_size | 1
blurred = cv2.GaussianBlur(
enhanced, (blur_k, blur_k), 0,
)
# 固定閾値で二値化(黒線を白に反転)
_, binary = cv2.threshold(
blurred, params.binary_thresh, 255,
cv2.THRESH_BINARY_INV,
)
# オープニング(孤立ノイズ除去)
if params.open_size >= 3:
open_k = params.open_size | 1
open_kernel = cv2.getStructuringElement(
cv2.MORPH_ELLIPSE, (open_k, open_k),
)
binary = cv2.morphologyEx(
binary, cv2.MORPH_OPEN, open_kernel,
)
# 横方向クロージング(途切れ補間)
if params.close_width >= 3:
close_h = max(params.close_height | 1, 1)
close_kernel = cv2.getStructuringElement(
cv2.MORPH_ELLIPSE,
(params.close_width, close_h),
)
binary = cv2.morphologyEx(
binary, cv2.MORPH_CLOSE, close_kernel,
)
# 全ピクセルフィッティング(従来方式)
return _fit_all_pixels(binary)
def _detect_blackhat(
frame: np.ndarray, params: ImageParams,
) -> LineDetectResult:
"""案A: Black-hat 中心型
Black-hat 変換で背景より暗い構造を直接抽出し,
固定閾値 + 距離変換 + 行ごと中心抽出で検出する
"""
# Black-hat 変換(暗い構造の抽出)
bh_k = params.blackhat_ksize | 1
bh_kernel = cv2.getStructuringElement(
cv2.MORPH_ELLIPSE, (bh_k, bh_k),
)
blackhat = cv2.morphologyEx(
frame, cv2.MORPH_BLACKHAT, bh_kernel,
)
# ガウシアンブラー
blur_k = params.blur_size | 1
blurred = cv2.GaussianBlur(
blackhat, (blur_k, blur_k), 0,
)
# 固定閾値(Black-hat 後は線が白)
_, binary = cv2.threshold(
blurred, params.binary_thresh, 255,
cv2.THRESH_BINARY,
)
# 等方クロージング + 距離変換マスク + 幅フィルタ
binary = _apply_iso_closing(
binary, params.iso_close_size,
)
binary = _apply_dist_mask(
binary, params.dist_thresh,
)
if params.width_near > 0 and params.width_far > 0:
binary = _apply_width_filter(
binary,
params.width_near,
params.width_far,
params.width_tolerance,
)
# 行ごと中心抽出 + フィッティング
return _fit_row_centers(
binary, params.min_line_width,
median_ksize=params.median_ksize,
neighbor_thresh=params.neighbor_thresh,
residual_thresh=params.residual_thresh,
)
def _detect_dual_norm(
frame: np.ndarray, params: ImageParams,
) -> LineDetectResult:
"""案B: 二重正規化型
背景除算で照明勾配を除去し,
適応的閾値で局所ムラにも対応する二重防壁構成
"""
# 背景除算正規化
bg_k = params.bg_blur_ksize | 1
bg = cv2.GaussianBlur(
frame, (bg_k, bg_k), 0,
)
normalized = (
frame.astype(np.float32) * 255.0
/ (bg.astype(np.float32) + 1.0)
)
normalized = np.clip(
normalized, 0, 255,
).astype(np.uint8)
# 適応的閾値(ガウシアン,BINARY_INV)
block = max(params.adaptive_block | 1, 3)
binary = cv2.adaptiveThreshold(
normalized, 255,
cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
cv2.THRESH_BINARY_INV,
block, params.adaptive_c,
)
# 等方クロージング + 距離変換マスク + 幅フィルタ
binary = _apply_iso_closing(
binary, params.iso_close_size,
)
binary = _apply_dist_mask(
binary, params.dist_thresh,
)
if params.width_near > 0 and params.width_far > 0:
binary = _apply_width_filter(
binary,
params.width_near,
params.width_far,
params.width_tolerance,
)
# 行ごと中心抽出 + フィッティング
return _fit_row_centers(
binary, params.min_line_width,
median_ksize=params.median_ksize,
neighbor_thresh=params.neighbor_thresh,
residual_thresh=params.residual_thresh,
)
def _detect_robust(
frame: np.ndarray, params: ImageParams,
) -> LineDetectResult:
"""案C: 最高ロバスト型
Black-hat + 適応的閾値の二重正規化に加え,
RANSAC で外れ値を除去する最もロバストな構成
"""
# Black-hat 変換
bh_k = params.blackhat_ksize | 1
bh_kernel = cv2.getStructuringElement(
cv2.MORPH_ELLIPSE, (bh_k, bh_k),
)
blackhat = cv2.morphologyEx(
frame, cv2.MORPH_BLACKHAT, bh_kernel,
)
# 適応的閾値(BINARY: Black-hat 後は線が白)
block = max(params.adaptive_block | 1, 3)
binary = cv2.adaptiveThreshold(
blackhat, 255,
cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
cv2.THRESH_BINARY,
block, -params.adaptive_c,
)
# 等方クロージング + 距離変換マスク + 幅フィルタ
binary = _apply_iso_closing(
binary, params.iso_close_size,
)
binary = _apply_dist_mask(
binary, params.dist_thresh,
)
if params.width_near > 0 and params.width_far > 0:
binary = _apply_width_filter(
binary,
params.width_near,
params.width_far,
params.width_tolerance,
)
# 行ごと中央値抽出 + RANSAC フィッティング
return _fit_row_centers(
binary, params.min_line_width,
use_median=True,
ransac_thresh=params.ransac_thresh,
ransac_iter=params.ransac_iter,
median_ksize=params.median_ksize,
neighbor_thresh=params.neighbor_thresh,
residual_thresh=params.residual_thresh,
)
# ── 共通処理 ───────────────────────────────────
def _apply_iso_closing(
binary: np.ndarray, size: int,
) -> np.ndarray:
"""等方クロージングで穴を埋める
Args:
binary: 二値画像
size: カーネルサイズ
Returns:
クロージング後の二値画像
"""
if size < 3:
return binary
k = size | 1
kernel = cv2.getStructuringElement(
cv2.MORPH_ELLIPSE, (k, k),
)
return cv2.morphologyEx(
binary, cv2.MORPH_CLOSE, kernel,
)
def _apply_width_filter(
binary: np.ndarray,
width_near: int,
width_far: int,
tolerance: float,
) -> np.ndarray:
"""透視補正付き幅フィルタで広がりすぎた行を除外する
各行の期待線幅を線形補間で算出し,
実際の幅が上限(期待幅 × tolerance)を超える行をマスクする
Args:
binary: 二値画像
width_near: 画像下端での期待線幅(px)
width_far: 画像上端での期待線幅(px)
tolerance: 上限倍率
Returns:
幅フィルタ適用後の二値画像
"""
result = binary.copy()
h = binary.shape[0]
denom = max(h - 1, 1)
for y_local in range(h):
xs = np.where(binary[y_local] > 0)[0]
if len(xs) == 0:
continue
# 画像下端(近距離)ほど t=1,上端(遠距離)ほど t=0
t = (h - 1 - y_local) / denom
expected = float(width_far) + (
float(width_near) - float(width_far)
) * t
max_w = expected * tolerance
actual_w = int(xs[-1]) - int(xs[0]) + 1
if actual_w > max_w:
result[y_local] = 0
return result
def _apply_dist_mask(
binary: np.ndarray, thresh: float,
) -> np.ndarray:
"""距離変換で中心部のみを残す
Args:
binary: 二値画像
thresh: 距離の閾値(ピクセル)
Returns:
中心部のみの二値画像
"""
if thresh <= 0:
return binary
dist = cv2.distanceTransform(
binary, cv2.DIST_L2, 5,
)
_, mask = cv2.threshold(
dist, thresh, 255, cv2.THRESH_BINARY,
)
return mask.astype(np.uint8)
def _fit_all_pixels(
binary: np.ndarray,
) -> LineDetectResult:
"""全白ピクセルに多項式をフィッティングする
従来方式.全ピクセルを等しく扱うため,
陰で幅が広がった行がフィッティングを支配する弱点がある
Args:
binary: 二値画像
Returns:
線検出の結果
"""
region = binary[DETECT_Y_START:DETECT_Y_END, :]
ys_local, xs = np.where(region > 0)
if len(xs) < MIN_FIT_PIXELS:
return _no_detection(binary)
ys = ys_local + DETECT_Y_START
coeffs = np.polyfit(ys, xs, 2)
return _build_result(coeffs, binary)
def _fit_row_centers(
binary: np.ndarray,
min_width: int,
use_median: bool = False,
ransac_thresh: float = 0.0,
ransac_iter: int = 0,
median_ksize: int = 0,
neighbor_thresh: float = 0.0,
residual_thresh: float = 0.0,
) -> LineDetectResult:
"""行ごとの中心点に多項式をフィッティングする
各行の白ピクセルの中心(平均または中央値)を1点抽出し,
ロバスト前処理の後にフィッティングする.
幅の変動に強く,各行が等しく寄与する
Args:
binary: 二値画像
min_width: 線として認識する最小ピクセル数
use_median: True の場合は中央値を使用
ransac_thresh: RANSAC 閾値(0 以下で無効)
ransac_iter: RANSAC 反復回数
median_ksize: 移動メディアンのカーネルサイズ
neighbor_thresh: 近傍外れ値除去の閾値 px
residual_thresh: 残差反復除去の閾値 px
Returns:
線検出の結果
"""
region = binary[DETECT_Y_START:DETECT_Y_END, :]
centers_y: list[float] = []
centers_x: list[float] = []
for y_local in range(region.shape[0]):
xs = np.where(region[y_local] > 0)[0]
if len(xs) < min_width:
continue
y = float(y_local + DETECT_Y_START)
centers_y.append(y)
if use_median:
centers_x.append(float(np.median(xs)))
else:
centers_x.append(float(np.mean(xs)))
if len(centers_y) < MIN_FIT_ROWS:
return _no_detection(binary)
cy = np.array(centers_y)
cx = np.array(centers_x)
coeffs = _clean_and_fit(
cy, cx,
median_ksize=median_ksize,
neighbor_thresh=neighbor_thresh,
residual_thresh=residual_thresh,
ransac_thresh=ransac_thresh,
ransac_iter=ransac_iter,
)
if coeffs is None:
return _no_detection(binary)
return _build_result(coeffs, binary)
def _ransac_polyfit(
ys: np.ndarray, xs: np.ndarray,
degree: int, n_iter: int, thresh: float,
) -> np.ndarray | None:
"""RANSAC で外れ値を除去して多項式フィッティング
Args:
ys: y 座標配列
xs: x 座標配列
degree: 多項式の次数
n_iter: 反復回数
thresh: 外れ値判定閾値(ピクセル)
Returns:
多項式係数(フィッティング失敗時は None)
"""
n = len(ys)
sample_size = degree + 1
if n < sample_size:
return None
best_coeffs: np.ndarray | None = None
best_inliers = 0
rng = np.random.default_rng()
for _ in range(n_iter):
idx = rng.choice(n, sample_size, replace=False)
coeffs = np.polyfit(ys[idx], xs[idx], degree)
poly = np.poly1d(coeffs)
residuals = np.abs(xs - poly(ys))
n_inliers = int(np.sum(residuals < thresh))
if n_inliers > best_inliers:
best_inliers = n_inliers
best_coeffs = coeffs
# インライアで再フィッティング
if best_coeffs is not None:
poly = np.poly1d(best_coeffs)
inlier_mask = np.abs(xs - poly(ys)) < thresh
if np.sum(inlier_mask) >= sample_size:
best_coeffs = np.polyfit(
ys[inlier_mask],
xs[inlier_mask],
degree,
)
return best_coeffs
def _clean_and_fit(
cy: np.ndarray,
cx: np.ndarray,
median_ksize: int,
neighbor_thresh: float,
residual_thresh: float = 0.0,
weights: np.ndarray | None = None,
ransac_thresh: float = 0.0,
ransac_iter: int = 0,
) -> np.ndarray | None:
"""外れ値除去+重み付きフィッティングを行う
全検出手法で共通に使えるロバストなフィッティング.
(1) 移動メディアンフィルタでスパイクを平滑化
(2) 近傍中央値からの偏差で外れ値を除去(複数パス)
(3) 重み付き最小二乗(または RANSAC)でフィッティング
(4) 残差ベースの反復除去で外れ値を最終除去
Args:
cy: 中心点の y 座標配列
cx: 中心点の x 座標配列
median_ksize: 移動メディアンのカーネルサイズ(0 で無効)
neighbor_thresh: 近傍外れ値除去の閾値 px(0 で無効)
residual_thresh: 残差除去の閾値 px(0 で無効)
weights: 各点の信頼度(None で均等)
ransac_thresh: RANSAC 閾値(0 以下で無効)
ransac_iter: RANSAC 反復回数
Returns:
多項式係数(フィッティング失敗時は None)
"""
if len(cy) < MIN_FIT_ROWS:
return None
cx_clean = cx.copy()
mask = np.ones(len(cy), dtype=bool)
# (1) 移動メディアンフィルタ
if median_ksize >= 3:
k = median_ksize | 1
half = k // 2
for i in range(len(cx_clean)):
lo = max(0, i - half)
hi = min(len(cx_clean), i + half + 1)
cx_clean[i] = float(np.median(cx[lo:hi]))
# (2) 近傍外れ値除去(複数パス)
if neighbor_thresh > 0:
half_n = 3
for _ in range(3):
new_mask = np.ones(len(cx_clean), dtype=bool)
for i in range(len(cx_clean)):
if not mask[i]:
continue
lo = max(0, i - half_n)
hi = min(len(cx_clean), i + half_n + 1)
neighbors = cx_clean[lo:hi][mask[lo:hi]]
if len(neighbors) == 0:
new_mask[i] = False
continue
local_med = float(np.median(neighbors))
if abs(cx_clean[i] - local_med) > neighbor_thresh:
new_mask[i] = False
if np.array_equal(mask, mask & new_mask):
break
mask = mask & new_mask
cy = cy[mask]
cx_clean = cx_clean[mask]
if weights is not None:
weights = weights[mask]
if len(cy) < MIN_FIT_ROWS:
return None
# (3) フィッティング
if ransac_thresh > 0 and ransac_iter > 0:
coeffs = _ransac_polyfit(
cy, cx_clean, 2, ransac_iter, ransac_thresh,
)
elif weights is not None:
coeffs = np.polyfit(cy, cx_clean, 2, w=weights)
else:
coeffs = np.polyfit(cy, cx_clean, 2)
if coeffs is None:
return None
# (4) 残差ベースの反復除去
if residual_thresh > 0:
for _ in range(5):
poly = np.poly1d(coeffs)
residuals = np.abs(cx_clean - poly(cy))
inlier = residuals < residual_thresh
if np.all(inlier):
break
if np.sum(inlier) < MIN_FIT_ROWS:
break
cy = cy[inlier]
cx_clean = cx_clean[inlier]
if weights is not None:
weights = weights[inlier]
if weights is not None:
coeffs = np.polyfit(
cy, cx_clean, 2, w=weights,
)
else:
coeffs = np.polyfit(cy, cx_clean, 2)
return coeffs
def _no_detection(
binary: np.ndarray,
) -> LineDetectResult:
"""未検出の結果を返す"""
return LineDetectResult(
detected=False,
position_error=0.0,
heading=0.0,
curvature=0.0,
poly_coeffs=None,
binary_image=binary,
)
def _build_result(
coeffs: np.ndarray,
binary: np.ndarray,
) -> LineDetectResult:
"""多項式係数から LineDetectResult を構築する"""
poly = np.poly1d(coeffs)
center_x = config.FRAME_WIDTH / 2.0
# 画像下端での位置偏差
x_bottom = poly(DETECT_Y_END)
position_error = (center_x - x_bottom) / center_x
# 傾き: dx/dy(画像下端での値)
poly_deriv = poly.deriv()
heading = float(poly_deriv(DETECT_Y_END))
# 曲率: d²x/dy²
poly_deriv2 = poly_deriv.deriv()
curvature = float(poly_deriv2(DETECT_Y_END))
return LineDetectResult(
detected=True,
position_error=position_error,
heading=heading,
curvature=curvature,
poly_coeffs=coeffs,
binary_image=binary,
)
# ── 案D: 谷検出+追跡 ─────────────────────────────
class ValleyTracker:
"""谷検出の時系列追跡を管理するクラス
前フレームの多項式係数を保持し,予測・平滑化・
検出失敗時のコースティングを提供する
"""
def __init__(self) -> None:
self._prev_coeffs: np.ndarray | None = None
self._smoothed_coeffs: np.ndarray | None = None
self._frames_lost: int = 0
def predict_x(self, y: float) -> float | None:
"""前フレームの多項式から x 座標を予測する
Args:
y: 画像の y 座標
Returns:
予測 x 座標(履歴なしの場合は None)
"""
if self._smoothed_coeffs is None:
return None
return float(np.poly1d(self._smoothed_coeffs)(y))
def update(
self,
coeffs: np.ndarray,
alpha: float,
) -> np.ndarray:
"""検出成功時に状態を更新する
EMA で多項式係数を平滑化し,更新後の係数を返す
Args:
coeffs: 今フレームのフィッティング係数
alpha: EMA 係数(1.0 で平滑化なし)
Returns:
平滑化後の多項式係数
"""
self._frames_lost = 0
if self._smoothed_coeffs is None:
self._smoothed_coeffs = coeffs.copy()
else:
self._smoothed_coeffs = (
alpha * coeffs
+ (1.0 - alpha) * self._smoothed_coeffs
)
self._prev_coeffs = self._smoothed_coeffs.copy()
return self._smoothed_coeffs
def coast(
self, max_frames: int,
) -> LineDetectResult | None:
"""検出失敗時に予測結果を返す
Args:
max_frames: 予測を継続する最大フレーム数
Returns:
予測による結果(継続不可の場合は None)
"""
if self._smoothed_coeffs is None:
return None
self._frames_lost += 1
if self._frames_lost > max_frames:
return None
# 予測でデバッグ用二値画像は空にする
h = config.FRAME_HEIGHT
w = config.FRAME_WIDTH
blank = np.zeros((h, w), dtype=np.uint8)
return _build_result(self._smoothed_coeffs, blank)
def reset(self) -> None:
"""追跡状態をリセットする"""
self._prev_coeffs = None
self._smoothed_coeffs = None
self._frames_lost = 0
_valley_tracker = ValleyTracker()
def reset_valley_tracker() -> None:
"""谷検出の追跡状態をリセットする"""
_valley_tracker.reset()
def _find_row_valley(
row: np.ndarray,
min_depth: int,
expected_width: float,
width_tolerance: float,
predicted_x: float | None,
max_deviation: int,
) -> tuple[float, float] | None:
"""1行の輝度信号から最適な谷を検出する
Args:
row: スムージング済みの1行輝度信号
min_depth: 最小谷深度
expected_width: 期待線幅(px,0 で幅フィルタ無効)
width_tolerance: 幅フィルタの上限倍率
predicted_x: 追跡による予測 x 座標(None で無効)
max_deviation: 予測からの最大許容偏差
Returns:
(谷の中心x, 谷の深度) または None
"""
n = len(row)
if n < 5:
return None
signal = row.astype(np.float32)
# 極小値を検出(前後より小さい点)
left = signal[:-2]
center = signal[1:-1]
right = signal[2:]
minima_mask = (center <= left) & (center <= right)
minima_indices = np.where(minima_mask)[0] + 1
if len(minima_indices) == 0:
return None
best: tuple[float, float] | None = None
best_score = -1.0
for idx in minima_indices:
val = signal[idx]
# 左の肩を探す
left_shoulder = idx
for i in range(idx - 1, -1, -1):
if signal[i] < signal[i + 1]:
break
left_shoulder = i
# 右の肩を探す
right_shoulder = idx
for i in range(idx + 1, n):
if signal[i] < signal[i - 1]:
break
right_shoulder = i
# 谷の深度(肩の平均 - 谷底)
shoulder_avg = (
signal[left_shoulder] + signal[right_shoulder]
) / 2.0
depth = shoulder_avg - val
if depth < min_depth:
continue
# 谷の幅
width = right_shoulder - left_shoulder
center_x = (left_shoulder + right_shoulder) / 2.0
# 幅フィルタ
if expected_width > 0:
max_w = expected_width * width_tolerance
min_w = expected_width / width_tolerance
if width > max_w or width < min_w:
continue
# 予測との偏差チェック
if predicted_x is not None:
if abs(center_x - predicted_x) > max_deviation:
continue
# スコア: 深度優先,予測がある場合は近さも考慮
score = float(depth)
if predicted_x is not None:
dist = abs(center_x - predicted_x)
score += max(0.0, max_deviation - dist)
if score > best_score:
best_score = score
best = (center_x, float(depth))
return best
def _build_valley_binary(
shape: tuple[int, int],
centers_y: list[int],
centers_x: list[float],
) -> np.ndarray:
"""谷検出結果からデバッグ用二値画像を生成する
Args:
shape: 出力画像の (高さ, 幅)
centers_y: 検出行の y 座標リスト
centers_x: 検出行の中心 x 座標リスト
Returns:
デバッグ用二値画像
"""
binary = np.zeros(shape, dtype=np.uint8)
half_w = 3
w = shape[1]
for y, cx in zip(centers_y, centers_x):
x0 = max(0, int(cx) - half_w)
x1 = min(w, int(cx) + half_w + 1)
binary[y, x0:x1] = 255
return binary
def _detect_valley(
frame: np.ndarray, params: ImageParams,
) -> LineDetectResult:
"""案D: 谷検出+追跡型
各行の輝度信号から谷(暗い領域)を直接検出し,
時系列追跡で安定性を確保する.二値化を使用しない
"""
h, w = frame.shape[:2]
# 行ごとにガウシアン平滑化するため画像全体をブラー
gauss_k = params.valley_gauss_ksize | 1
blurred = cv2.GaussianBlur(
frame, (gauss_k, 1), 0,
)
# 透視補正の期待幅を計算するための準備
use_width = (
params.width_near > 0 and params.width_far > 0
)
detect_h = DETECT_Y_END - DETECT_Y_START
denom = max(detect_h - 1, 1)
centers_y: list[int] = []
centers_x: list[float] = []
depths: list[float] = []
for y in range(DETECT_Y_START, DETECT_Y_END):
row = blurred[y]
# 期待幅の計算
if use_width:
t = (DETECT_Y_END - 1 - y) / denom
expected_w = float(params.width_far) + (
float(params.width_near)
- float(params.width_far)
) * t
else:
expected_w = 0.0
# 予測 x 座標
predicted_x = _valley_tracker.predict_x(
float(y),
)
result = _find_row_valley(
row,
params.valley_min_depth,
expected_w,
params.width_tolerance,
predicted_x,
params.valley_max_deviation,
)
if result is not None:
centers_y.append(y)
centers_x.append(result[0])
depths.append(result[1])
# デバッグ用二値画像
debug_binary = _build_valley_binary(
(h, w), centers_y, centers_x,
)
if len(centers_y) < MIN_FIT_ROWS:
# 検出失敗 → コースティングを試みる
coasted = _valley_tracker.coast(
params.valley_coast_frames,
)
if coasted is not None:
coasted.binary_image = debug_binary
return coasted
return _no_detection(debug_binary)
cy = np.array(centers_y, dtype=np.float64)
cx = np.array(centers_x, dtype=np.float64)
w_arr = np.array(depths, dtype=np.float64)
# ロバストフィッティング(深度を重みに使用)
coeffs = _clean_and_fit(
cy, cx,
median_ksize=params.median_ksize,
neighbor_thresh=params.neighbor_thresh,
residual_thresh=params.residual_thresh,
weights=w_arr,
ransac_thresh=params.ransac_thresh,
ransac_iter=params.ransac_iter,
)
if coeffs is None:
coasted = _valley_tracker.coast(
params.valley_coast_frames,
)
if coasted is not None:
coasted.binary_image = debug_binary
return coasted
return _no_detection(debug_binary)
# EMA で平滑化
smoothed = _valley_tracker.update(
coeffs, params.valley_ema_alpha,
)
return _build_result(smoothed, debug_binary)