"""
fitting
直線・曲線近似の共通ユーティリティモジュール
Theil-Sen 推定,RANSAC,外れ値除去付きフィッティングを提供する
"""
import numpy as np
# フィッティングに必要な最小行数
MIN_FIT_ROWS: int = 10
# 近傍外れ値除去の設定
NEIGHBOR_HALF_WINDOW: int = 3
NEIGHBOR_FILTER_PASSES: int = 3
# 残差ベース反復除去の最大回数
RESIDUAL_REMOVAL_ITERATIONS: int = 5
def theil_sen_fit(
y: np.ndarray,
x: np.ndarray,
) -> tuple[float, float]:
"""Theil-Sen 推定で直線 x = slope * y + intercept を求める
全ペアの傾きの中央値を使い,外れ値に強い直線近似を行う
Args:
y: y 座標の配列(行番号)
x: x 座標の配列(各行の中心)
Returns:
(slope, intercept) のタプル
"""
n = len(y)
slopes = []
for i in range(n):
for j in range(i + 1, n):
dy = y[j] - y[i]
if dy != 0:
slopes.append((x[j] - x[i]) / dy)
if len(slopes) == 0:
return 0.0, float(np.median(x))
slope = float(np.median(slopes))
intercept = float(np.median(x - slope * y))
return slope, intercept
def ransac_polyfit(
ys: np.ndarray, xs: np.ndarray,
degree: int, n_iter: int, thresh: float,
) -> np.ndarray | None:
"""RANSAC で外れ値を除去して多項式フィッティング
Args:
ys: y 座標配列
xs: x 座標配列
degree: 多項式の次数
n_iter: 反復回数
thresh: 外れ値判定閾値(ピクセル)
Returns:
多項式係数(フィッティング失敗時は None)
"""
n = len(ys)
sample_size = degree + 1
if n < sample_size:
return None
best_coeffs: np.ndarray | None = None
best_inliers = 0
rng = np.random.default_rng()
for _ in range(n_iter):
idx = rng.choice(n, sample_size, replace=False)
coeffs = np.polyfit(ys[idx], xs[idx], degree)
poly = np.poly1d(coeffs)
residuals = np.abs(xs - poly(ys))
n_inliers = int(np.sum(residuals < thresh))
if n_inliers > best_inliers:
best_inliers = n_inliers
best_coeffs = coeffs
# インライアで再フィッティング
if best_coeffs is not None:
poly = np.poly1d(best_coeffs)
inlier_mask = np.abs(xs - poly(ys)) < thresh
if np.sum(inlier_mask) >= sample_size:
best_coeffs = np.polyfit(
ys[inlier_mask],
xs[inlier_mask],
degree,
)
return best_coeffs
def clean_and_fit(
cy: np.ndarray,
cx: np.ndarray,
median_ksize: int,
neighbor_thresh: float,
residual_thresh: float = 0.0,
weights: np.ndarray | None = None,
ransac_thresh: float = 0.0,
ransac_iter: int = 0,
) -> np.ndarray | None:
"""外れ値除去+重み付きフィッティングを行う
全検出手法で共通に使えるロバストなフィッティング
(1) 移動メディアンフィルタでスパイクを平滑化
(2) 近傍中央値からの偏差で外れ値を除去(複数パス)
(3) 重み付き最小二乗(または RANSAC)でフィッティング
(4) 残差ベースの反復除去で外れ値を最終除去
Args:
cy: 中心点の y 座標配列
cx: 中心点の x 座標配列
median_ksize: 移動メディアンのカーネルサイズ(0 で無効)
neighbor_thresh: 近傍外れ値除去の閾値 px(0 で無効)
residual_thresh: 残差除去の閾値 px(0 で無効)
weights: 各点の信頼度(None で均等)
ransac_thresh: RANSAC 閾値(0 以下で無効)
ransac_iter: RANSAC 反復回数
Returns:
多項式係数(フィッティング失敗時は None)
"""
if len(cy) < MIN_FIT_ROWS:
return None
cx_clean = cx.copy()
mask = np.ones(len(cy), dtype=bool)
# (1) 移動メディアンフィルタ
if median_ksize >= 3:
k = median_ksize | 1
half = k // 2
for i in range(len(cx_clean)):
lo = max(0, i - half)
hi = min(len(cx_clean), i + half + 1)
cx_clean[i] = float(np.median(cx[lo:hi]))
# (2) 近傍外れ値除去(複数パス)
if neighbor_thresh > 0:
half_n = NEIGHBOR_HALF_WINDOW
for _ in range(NEIGHBOR_FILTER_PASSES):
new_mask = np.ones(len(cx_clean), dtype=bool)
for i in range(len(cx_clean)):
if not mask[i]:
continue
lo = max(0, i - half_n)
hi = min(len(cx_clean), i + half_n + 1)
neighbors = cx_clean[lo:hi][mask[lo:hi]]
if len(neighbors) == 0:
new_mask[i] = False
continue
local_med = float(np.median(neighbors))
if (
abs(cx_clean[i] - local_med)
> neighbor_thresh
):
new_mask[i] = False
if np.array_equal(mask, mask & new_mask):
break
mask = mask & new_mask
cy = cy[mask]
cx_clean = cx_clean[mask]
if weights is not None:
weights = weights[mask]
if len(cy) < MIN_FIT_ROWS:
return None
# (3) フィッティング
if ransac_thresh > 0 and ransac_iter > 0:
coeffs = ransac_polyfit(
cy, cx_clean, 2, ransac_iter, ransac_thresh,
)
elif weights is not None:
coeffs = np.polyfit(cy, cx_clean, 2, w=weights)
else:
coeffs = np.polyfit(cy, cx_clean, 2)
if coeffs is None:
return None
# (4) 残差ベースの反復除去
if residual_thresh > 0:
for _ in range(RESIDUAL_REMOVAL_ITERATIONS):
poly = np.poly1d(coeffs)
residuals = np.abs(cx_clean - poly(cy))
inlier = residuals < residual_thresh
if np.all(inlier):
break
if np.sum(inlier) < MIN_FIT_ROWS:
break
cy = cy[inlier]
cx_clean = cx_clean[inlier]
if weights is not None:
weights = weights[inlier]
if weights is not None:
coeffs = np.polyfit(
cy, cx_clean, 2, w=weights,
)
else:
coeffs = np.polyfit(cy, cx_clean, 2)
return coeffs