"""
valley
案D: 谷検出+追跡型の線検出
各行の輝度信号から谷(暗い領域)を直接検出し,
時系列追跡で安定性を確保する.二値化を使用しない
"""
import cv2
import numpy as np
from common import config
from common.vision.fitting import clean_and_fit
from common.vision.line_detector import (
DETECT_Y_END,
DETECT_Y_START,
MIN_FIT_ROWS,
ImageParams,
LineDetectResult,
build_result,
no_detection,
)
class ValleyTracker:
"""谷検出の時系列追跡を管理するクラス
前フレームの多項式係数を保持し,予測・平滑化・
検出失敗時のコースティングを提供する
"""
def __init__(self) -> None:
self._prev_coeffs: np.ndarray | None = None
self._smoothed_coeffs: np.ndarray | None = None
self._frames_lost: int = 0
def predict_x(self, y: float) -> float | None:
"""前フレームの多項式から x 座標を予測する
Args:
y: 画像の y 座標
Returns:
予測 x 座標(履歴なしの場合は None)
"""
if self._smoothed_coeffs is None:
return None
return float(np.poly1d(self._smoothed_coeffs)(y))
def update(
self,
coeffs: np.ndarray,
alpha: float,
) -> np.ndarray:
"""検出成功時に状態を更新する
Args:
coeffs: 今フレームのフィッティング係数
alpha: EMA 係数(1.0 で平滑化なし)
Returns:
平滑化後の多項式係数
"""
self._frames_lost = 0
if self._smoothed_coeffs is None:
self._smoothed_coeffs = coeffs.copy()
else:
self._smoothed_coeffs = (
alpha * coeffs
+ (1.0 - alpha) * self._smoothed_coeffs
)
self._prev_coeffs = self._smoothed_coeffs.copy()
return self._smoothed_coeffs
def coast(
self, max_frames: int,
) -> LineDetectResult | None:
"""検出失敗時に予測結果を返す
Args:
max_frames: 予測を継続する最大フレーム数
Returns:
予測による結果(継続不可の場合は None)
"""
if self._smoothed_coeffs is None:
return None
self._frames_lost += 1
if self._frames_lost > max_frames:
return None
h = config.FRAME_HEIGHT
w = config.FRAME_WIDTH
blank = np.zeros((h, w), dtype=np.uint8)
return build_result(self._smoothed_coeffs, blank)
def reset(self) -> None:
"""追跡状態をリセットする"""
self._prev_coeffs = None
self._smoothed_coeffs = None
self._frames_lost = 0
_valley_tracker = ValleyTracker()
def reset_valley_tracker() -> None:
"""谷検出の追跡状態をリセットする"""
_valley_tracker.reset()
def _find_row_valley(
row: np.ndarray,
min_depth: int,
expected_width: float,
width_tolerance: float,
predicted_x: float | None,
max_deviation: int,
) -> tuple[float, float] | None:
"""1行の輝度信号から最適な谷を検出する
Args:
row: スムージング済みの1行輝度信号
min_depth: 最小谷深度
expected_width: 期待線幅(px,0 で幅フィルタ無効)
width_tolerance: 幅フィルタの上限倍率
predicted_x: 追跡による予測 x 座標(None で無効)
max_deviation: 予測からの最大許容偏差
Returns:
(谷の中心x, 谷の深度) または None
"""
n = len(row)
if n < 5:
return None
signal = row.astype(np.float32)
# 極小値を検出(前後より小さい点)
left = signal[:-2]
center = signal[1:-1]
right = signal[2:]
minima_mask = (center <= left) & (center <= right)
minima_indices = np.where(minima_mask)[0] + 1
if len(minima_indices) == 0:
return None
best: tuple[float, float] | None = None
best_score = -1.0
for idx in minima_indices:
val = signal[idx]
# 左の肩を探す
left_shoulder = idx
for i in range(idx - 1, -1, -1):
if signal[i] < signal[i + 1]:
break
left_shoulder = i
# 右の肩を探す
right_shoulder = idx
for i in range(idx + 1, n):
if signal[i] < signal[i - 1]:
break
right_shoulder = i
# 谷の深度(肩の平均 - 谷底)
shoulder_avg = (
signal[left_shoulder] + signal[right_shoulder]
) / 2.0
depth = shoulder_avg - val
if depth < min_depth:
continue
# 谷の幅
width = right_shoulder - left_shoulder
center_x = (left_shoulder + right_shoulder) / 2.0
# 幅フィルタ
if expected_width > 0:
max_w = expected_width * width_tolerance
min_w = expected_width / width_tolerance
if width > max_w or width < min_w:
continue
# 予測との偏差チェック
if predicted_x is not None:
if abs(center_x - predicted_x) > max_deviation:
continue
# スコア: 深度優先,予測がある場合は近さも考慮
score = float(depth)
if predicted_x is not None:
dist = abs(center_x - predicted_x)
score += max(0.0, max_deviation - dist)
if score > best_score:
best_score = score
best = (center_x, float(depth))
return best
def _build_valley_binary(
shape: tuple[int, int],
centers_y: list[int],
centers_x: list[float],
) -> np.ndarray:
"""谷検出結果からデバッグ用二値画像を生成する
Args:
shape: 出力画像の (高さ, 幅)
centers_y: 検出行の y 座標リスト
centers_x: 検出行の中心 x 座標リスト
Returns:
デバッグ用二値画像
"""
binary = np.zeros(shape, dtype=np.uint8)
half_w = 3
w = shape[1]
for y, cx in zip(centers_y, centers_x):
x0 = max(0, int(cx) - half_w)
x1 = min(w, int(cx) + half_w + 1)
binary[y, x0:x1] = 255
return binary
def detect_valley(
frame: np.ndarray, params: ImageParams,
) -> LineDetectResult:
"""案D: 谷検出+追跡型"""
h, w = frame.shape[:2]
# 行ごとにガウシアン平滑化するため画像全体をブラー
gauss_k = params.valley_gauss_ksize | 1
blurred = cv2.GaussianBlur(
frame, (gauss_k, 1), 0,
)
# 透視補正の期待幅を計算するための準備
use_width = (
params.width_near > 0 and params.width_far > 0
)
detect_h = DETECT_Y_END - DETECT_Y_START
denom = max(detect_h - 1, 1)
centers_y: list[int] = []
centers_x: list[float] = []
depths: list[float] = []
for y in range(DETECT_Y_START, DETECT_Y_END):
row = blurred[y]
# 期待幅の計算
if use_width:
t = (DETECT_Y_END - 1 - y) / denom
expected_w = float(params.width_far) + (
float(params.width_near)
- float(params.width_far)
) * t
else:
expected_w = 0.0
# 予測 x 座標
predicted_x = _valley_tracker.predict_x(
float(y),
)
result = _find_row_valley(
row,
params.valley_min_depth,
expected_w,
params.width_tolerance,
predicted_x,
params.valley_max_deviation,
)
if result is not None:
centers_y.append(y)
centers_x.append(result[0])
depths.append(result[1])
# デバッグ用二値画像
debug_binary = _build_valley_binary(
(h, w), centers_y, centers_x,
)
if len(centers_y) < MIN_FIT_ROWS:
coasted = _valley_tracker.coast(
params.valley_coast_frames,
)
if coasted is not None:
coasted.binary_image = debug_binary
return coasted
return no_detection(debug_binary)
cy = np.array(centers_y, dtype=np.float64)
cx = np.array(centers_x, dtype=np.float64)
w_arr = np.array(depths, dtype=np.float64)
# ロバストフィッティング(深度を重みに使用)
coeffs = clean_and_fit(
cy, cx,
median_ksize=params.median_ksize,
neighbor_thresh=params.neighbor_thresh,
residual_thresh=params.residual_thresh,
weights=w_arr,
ransac_thresh=params.ransac_thresh,
ransac_iter=params.ransac_iter,
)
if coeffs is None:
coasted = _valley_tracker.coast(
params.valley_coast_frames,
)
if coasted is not None:
coasted.binary_image = debug_binary
return coasted
return no_detection(debug_binary)
# EMA で平滑化
smoothed = _valley_tracker.update(
coeffs, params.valley_ema_alpha,
)
return build_result(smoothed, debug_binary)