Newer
Older
RobotCar / src / common / vision / fitting.py
"""
fitting
直線・曲線近似の共通ユーティリティモジュール
Theil-Sen 推定,RANSAC,外れ値除去付きフィッティングを提供する
"""

import numpy as np

# フィッティングに必要な最小行数
MIN_FIT_ROWS: int = 10

# 近傍外れ値除去の設定
NEIGHBOR_HALF_WINDOW: int = 3
NEIGHBOR_FILTER_PASSES: int = 3

# 残差ベース反復除去の最大回数
RESIDUAL_REMOVAL_ITERATIONS: int = 5


def theil_sen_fit(
    y: np.ndarray,
    x: np.ndarray,
) -> tuple[float, float]:
    """Theil-Sen 推定で直線 x = slope * y + intercept を求める

    全ペアの傾きの中央値を使い,外れ値に強い直線近似を行う

    Args:
        y: y 座標の配列(行番号)
        x: x 座標の配列(各行の中心)

    Returns:
        (slope, intercept) のタプル
    """
    n = len(y)
    slopes = []
    for i in range(n):
        for j in range(i + 1, n):
            dy = y[j] - y[i]
            if dy != 0:
                slopes.append((x[j] - x[i]) / dy)

    if len(slopes) == 0:
        return 0.0, float(np.median(x))

    slope = float(np.median(slopes))
    intercept = float(np.median(x - slope * y))
    return slope, intercept


def ransac_polyfit(
    ys: np.ndarray, xs: np.ndarray,
    degree: int, n_iter: int, thresh: float,
) -> np.ndarray | None:
    """RANSAC で外れ値を除去して多項式フィッティング

    Args:
        ys: y 座標配列
        xs: x 座標配列
        degree: 多項式の次数
        n_iter: 反復回数
        thresh: 外れ値判定閾値(ピクセル)

    Returns:
        多項式係数(フィッティング失敗時は None)
    """
    n = len(ys)
    sample_size = degree + 1
    if n < sample_size:
        return None

    best_coeffs: np.ndarray | None = None
    best_inliers = 0
    rng = np.random.default_rng()

    for _ in range(n_iter):
        idx = rng.choice(n, sample_size, replace=False)
        coeffs = np.polyfit(ys[idx], xs[idx], degree)
        poly = np.poly1d(coeffs)
        residuals = np.abs(xs - poly(ys))
        n_inliers = int(np.sum(residuals < thresh))
        if n_inliers > best_inliers:
            best_inliers = n_inliers
            best_coeffs = coeffs

    # インライアで再フィッティング
    if best_coeffs is not None:
        poly = np.poly1d(best_coeffs)
        inlier_mask = np.abs(xs - poly(ys)) < thresh
        if np.sum(inlier_mask) >= sample_size:
            best_coeffs = np.polyfit(
                ys[inlier_mask],
                xs[inlier_mask],
                degree,
            )

    return best_coeffs


def clean_and_fit(
    cy: np.ndarray,
    cx: np.ndarray,
    median_ksize: int,
    neighbor_thresh: float,
    residual_thresh: float = 0.0,
    weights: np.ndarray | None = None,
    ransac_thresh: float = 0.0,
    ransac_iter: int = 0,
) -> np.ndarray | None:
    """外れ値除去+重み付きフィッティングを行う

    全検出手法で共通に使えるロバストなフィッティング
    (1) 移動メディアンフィルタでスパイクを平滑化
    (2) 近傍中央値からの偏差で外れ値を除去(複数パス)
    (3) 重み付き最小二乗(または RANSAC)でフィッティング
    (4) 残差ベースの反復除去で外れ値を最終除去

    Args:
        cy: 中心点の y 座標配列
        cx: 中心点の x 座標配列
        median_ksize: 移動メディアンのカーネルサイズ(0 で無効)
        neighbor_thresh: 近傍外れ値除去の閾値 px(0 で無効)
        residual_thresh: 残差除去の閾値 px(0 で無効)
        weights: 各点の信頼度(None で均等)
        ransac_thresh: RANSAC 閾値(0 以下で無効)
        ransac_iter: RANSAC 反復回数

    Returns:
        多項式係数(フィッティング失敗時は None)
    """
    if len(cy) < MIN_FIT_ROWS:
        return None

    cx_clean = cx.copy()
    mask = np.ones(len(cy), dtype=bool)

    # (1) 移動メディアンフィルタ
    if median_ksize >= 3:
        k = median_ksize | 1
        half = k // 2
        for i in range(len(cx_clean)):
            lo = max(0, i - half)
            hi = min(len(cx_clean), i + half + 1)
            cx_clean[i] = float(np.median(cx[lo:hi]))

    # (2) 近傍外れ値除去(複数パス)
    if neighbor_thresh > 0:
        half_n = NEIGHBOR_HALF_WINDOW
        for _ in range(NEIGHBOR_FILTER_PASSES):
            new_mask = np.ones(len(cx_clean), dtype=bool)
            for i in range(len(cx_clean)):
                if not mask[i]:
                    continue
                lo = max(0, i - half_n)
                hi = min(len(cx_clean), i + half_n + 1)
                neighbors = cx_clean[lo:hi][mask[lo:hi]]
                if len(neighbors) == 0:
                    new_mask[i] = False
                    continue
                local_med = float(np.median(neighbors))
                if (
                    abs(cx_clean[i] - local_med)
                    > neighbor_thresh
                ):
                    new_mask[i] = False
            if np.array_equal(mask, mask & new_mask):
                break
            mask = mask & new_mask

        cy = cy[mask]
        cx_clean = cx_clean[mask]
        if weights is not None:
            weights = weights[mask]

    if len(cy) < MIN_FIT_ROWS:
        return None

    # (3) フィッティング
    if ransac_thresh > 0 and ransac_iter > 0:
        coeffs = ransac_polyfit(
            cy, cx_clean, 2, ransac_iter, ransac_thresh,
        )
    elif weights is not None:
        coeffs = np.polyfit(cy, cx_clean, 2, w=weights)
    else:
        coeffs = np.polyfit(cy, cx_clean, 2)

    if coeffs is None:
        return None

    # (4) 残差ベースの反復除去
    if residual_thresh > 0:
        for _ in range(RESIDUAL_REMOVAL_ITERATIONS):
            poly = np.poly1d(coeffs)
            residuals = np.abs(cx_clean - poly(cy))
            inlier = residuals < residual_thresh
            if np.all(inlier):
                break
            if np.sum(inlier) < MIN_FIT_ROWS:
                break
            cy = cy[inlier]
            cx_clean = cx_clean[inlier]
            if weights is not None:
                weights = weights[inlier]
            if weights is not None:
                coeffs = np.polyfit(
                    cy, cx_clean, 2, w=weights,
                )
            else:
                coeffs = np.polyfit(cy, cx_clean, 2)

    return coeffs