Newer
Older
RobotCar / src / common / vision / detectors / valley.py
"""
valley
案D: 谷検出+追跡型の線検出
各行の輝度信号から谷(暗い領域)を直接検出し,
時系列追跡で安定性を確保する.二値化を使用しない
"""

import cv2
import numpy as np

from common import config
from common.vision.fitting import clean_and_fit
from common.vision.line_detector import (
    DETECT_Y_END,
    DETECT_Y_START,
    MIN_FIT_ROWS,
    ImageParams,
    LineDetectResult,
    build_result,
    no_detection,
)


class ValleyTracker:
    """谷検出の時系列追跡を管理するクラス

    前フレームの多項式係数を保持し,予測・平滑化・
    検出失敗時のコースティングを提供する
    """

    def __init__(self) -> None:
        self._prev_coeffs: np.ndarray | None = None
        self._smoothed_coeffs: np.ndarray | None = None
        self._frames_lost: int = 0

    def predict_x(self, y: float) -> float | None:
        """前フレームの多項式から x 座標を予測する

        Args:
            y: 画像の y 座標

        Returns:
            予測 x 座標(履歴なしの場合は None)
        """
        if self._smoothed_coeffs is None:
            return None
        return float(np.poly1d(self._smoothed_coeffs)(y))

    def update(
        self,
        coeffs: np.ndarray,
        alpha: float,
    ) -> np.ndarray:
        """検出成功時に状態を更新する

        Args:
            coeffs: 今フレームのフィッティング係数
            alpha: EMA 係数(1.0 で平滑化なし)

        Returns:
            平滑化後の多項式係数
        """
        self._frames_lost = 0
        if self._smoothed_coeffs is None:
            self._smoothed_coeffs = coeffs.copy()
        else:
            self._smoothed_coeffs = (
                alpha * coeffs
                + (1.0 - alpha) * self._smoothed_coeffs
            )
        self._prev_coeffs = self._smoothed_coeffs.copy()
        return self._smoothed_coeffs

    def coast(
        self, max_frames: int,
    ) -> LineDetectResult | None:
        """検出失敗時に予測結果を返す

        Args:
            max_frames: 予測を継続する最大フレーム数

        Returns:
            予測による結果(継続不可の場合は None)
        """
        if self._smoothed_coeffs is None:
            return None
        self._frames_lost += 1
        if self._frames_lost > max_frames:
            return None
        h = config.FRAME_HEIGHT
        w = config.FRAME_WIDTH
        blank = np.zeros((h, w), dtype=np.uint8)
        return build_result(self._smoothed_coeffs, blank)

    def reset(self) -> None:
        """追跡状態をリセットする"""
        self._prev_coeffs = None
        self._smoothed_coeffs = None
        self._frames_lost = 0


_valley_tracker = ValleyTracker()


def reset_valley_tracker() -> None:
    """谷検出の追跡状態をリセットする"""
    _valley_tracker.reset()


def _find_row_valley(
    row: np.ndarray,
    min_depth: int,
    expected_width: float,
    width_tolerance: float,
    predicted_x: float | None,
    max_deviation: int,
) -> tuple[float, float] | None:
    """1行の輝度信号から最適な谷を検出する

    Args:
        row: スムージング済みの1行輝度信号
        min_depth: 最小谷深度
        expected_width: 期待線幅(px,0 で幅フィルタ無効)
        width_tolerance: 幅フィルタの上限倍率
        predicted_x: 追跡による予測 x 座標(None で無効)
        max_deviation: 予測からの最大許容偏差

    Returns:
        (谷の中心x, 谷の深度) または None
    """
    n = len(row)
    if n < 5:
        return None

    signal = row.astype(np.float32)

    # 極小値を検出(前後より小さい点)
    left = signal[:-2]
    center = signal[1:-1]
    right = signal[2:]
    minima_mask = (center <= left) & (center <= right)
    minima_indices = np.where(minima_mask)[0] + 1

    if len(minima_indices) == 0:
        return None

    best: tuple[float, float] | None = None
    best_score = -1.0

    for idx in minima_indices:
        val = signal[idx]

        # 左の肩を探す
        left_shoulder = idx
        for i in range(idx - 1, -1, -1):
            if signal[i] < signal[i + 1]:
                break
            left_shoulder = i
        # 右の肩を探す
        right_shoulder = idx
        for i in range(idx + 1, n):
            if signal[i] < signal[i - 1]:
                break
            right_shoulder = i

        # 谷の深度(肩の平均 - 谷底)
        shoulder_avg = (
            signal[left_shoulder] + signal[right_shoulder]
        ) / 2.0
        depth = shoulder_avg - val
        if depth < min_depth:
            continue

        # 谷の幅
        width = right_shoulder - left_shoulder
        center_x = (left_shoulder + right_shoulder) / 2.0

        # 幅フィルタ
        if expected_width > 0:
            max_w = expected_width * width_tolerance
            min_w = expected_width / width_tolerance
            if width > max_w or width < min_w:
                continue

        # 予測との偏差チェック
        if predicted_x is not None:
            if abs(center_x - predicted_x) > max_deviation:
                continue

        # スコア: 深度優先,予測がある場合は近さも考慮
        score = float(depth)
        if predicted_x is not None:
            dist = abs(center_x - predicted_x)
            score += max(0.0, max_deviation - dist)

        if score > best_score:
            best_score = score
            best = (center_x, float(depth))

    return best


def _build_valley_binary(
    shape: tuple[int, int],
    centers_y: list[int],
    centers_x: list[float],
) -> np.ndarray:
    """谷検出結果からデバッグ用二値画像を生成する

    Args:
        shape: 出力画像の (高さ, 幅)
        centers_y: 検出行の y 座標リスト
        centers_x: 検出行の中心 x 座標リスト

    Returns:
        デバッグ用二値画像
    """
    binary = np.zeros(shape, dtype=np.uint8)
    half_w = 3
    w = shape[1]
    for y, cx in zip(centers_y, centers_x):
        x0 = max(0, int(cx) - half_w)
        x1 = min(w, int(cx) + half_w + 1)
        binary[y, x0:x1] = 255
    return binary


def detect_valley(
    frame: np.ndarray, params: ImageParams,
) -> LineDetectResult:
    """案D: 谷検出+追跡型"""
    h, w = frame.shape[:2]

    # 行ごとにガウシアン平滑化するため画像全体をブラー
    gauss_k = params.valley_gauss_ksize | 1
    blurred = cv2.GaussianBlur(
        frame, (gauss_k, 1), 0,
    )

    # 透視補正の期待幅を計算するための準備
    use_width = (
        params.width_near > 0 and params.width_far > 0
    )
    detect_h = DETECT_Y_END - DETECT_Y_START
    denom = max(detect_h - 1, 1)

    centers_y: list[int] = []
    centers_x: list[float] = []
    depths: list[float] = []

    for y in range(DETECT_Y_START, DETECT_Y_END):
        row = blurred[y]

        # 期待幅の計算
        if use_width:
            t = (DETECT_Y_END - 1 - y) / denom
            expected_w = float(params.width_far) + (
                float(params.width_near)
                - float(params.width_far)
            ) * t
        else:
            expected_w = 0.0

        # 予測 x 座標
        predicted_x = _valley_tracker.predict_x(
            float(y),
        )

        result = _find_row_valley(
            row,
            params.valley_min_depth,
            expected_w,
            params.width_tolerance,
            predicted_x,
            params.valley_max_deviation,
        )
        if result is not None:
            centers_y.append(y)
            centers_x.append(result[0])
            depths.append(result[1])

    # デバッグ用二値画像
    debug_binary = _build_valley_binary(
        (h, w), centers_y, centers_x,
    )

    if len(centers_y) < MIN_FIT_ROWS:
        coasted = _valley_tracker.coast(
            params.valley_coast_frames,
        )
        if coasted is not None:
            coasted.binary_image = debug_binary
            return coasted
        return no_detection(debug_binary)

    cy = np.array(centers_y, dtype=np.float64)
    cx = np.array(centers_x, dtype=np.float64)
    w_arr = np.array(depths, dtype=np.float64)

    # ロバストフィッティング(深度を重みに使用)
    coeffs = clean_and_fit(
        cy, cx,
        median_ksize=params.median_ksize,
        neighbor_thresh=params.neighbor_thresh,
        residual_thresh=params.residual_thresh,
        weights=w_arr,
        ransac_thresh=params.ransac_thresh,
        ransac_iter=params.ransac_iter,
    )
    if coeffs is None:
        coasted = _valley_tracker.coast(
            params.valley_coast_frames,
        )
        if coasted is not None:
            coasted.binary_image = debug_binary
            return coasted
        return no_detection(debug_binary)

    # EMA で平滑化
    smoothed = _valley_tracker.update(
        coeffs, params.valley_ema_alpha,
    )

    return build_result(smoothed, debug_binary)