diff --git "a/docs/04_ENV/ENV_04_\343\203\207\343\202\243\343\203\254\343\202\257\343\203\210\343\203\252\346\247\213\346\210\220.txt" "b/docs/04_ENV/ENV_04_\343\203\207\343\202\243\343\203\254\343\202\257\343\203\210\343\203\252\346\247\213\346\210\220.txt" index 1aa7eaf..070fec0 100644 --- "a/docs/04_ENV/ENV_04_\343\203\207\343\202\243\343\203\254\343\202\257\343\203\210\343\203\252\346\247\213\346\210\220.txt" +++ "b/docs/04_ENV/ENV_04_\343\203\207\343\202\243\343\203\254\343\202\257\343\203\210\343\203\252\346\247\213\346\210\220.txt" @@ -39,10 +39,12 @@ 2-2. src/common/ common/ - └── config.py + ├── config.py + └── json_utils.py JSON 読み書き共通ユーティリティ - ・PC・Pi 間で共有する設定値を定義する. - ・内容: ネットワーク設定,画像フォーマット,通信設定等. + ・PC・Pi 間で共有する設定値・ユーティリティを定義する. + ・config.py: ネットワーク設定,画像フォーマット,通信設定等. + ・json_utils.py: JSON ファイル読み書きとパラメータディレクトリの定義. 2-3. src/pc/ @@ -55,9 +57,12 @@ ├── steering/ 操舵量計算(独立モジュール) │ ├── base.py 共通インターフェース │ ├── pd_control.py PD 制御の実装 - │ └── param_store.py パラメータ保存・読み込み + │ ├── pursuit_control.py 2点パシュート制御の実装 + │ ├── param_store.py プリセット保存・読み込み + │ └── auto_params.py パラメータ自動保存・復元 └── vision/ 画像処理 ├── line_detector.py 線検出(多項式フィッティング) + ├── fitting.py 直線・曲線近似の共通ユーティリティ └── overlay.py デバッグオーバーレイ描画 2-4. src/pi/ diff --git a/src/common/json_utils.py b/src/common/json_utils.py new file mode 100644 index 0000000..62e214a --- /dev/null +++ b/src/common/json_utils.py @@ -0,0 +1,41 @@ +""" +json_utils +JSON ファイルの読み書きとパラメータディレクトリの共通定義 +""" + +import json +from pathlib import Path + +# プロジェクトルートの params/ ディレクトリ +PARAMS_DIR: Path = ( + Path(__file__).resolve().parent.parent.parent / "params" +) + + +def read_json(path: Path) -> dict: + """JSON ファイルを読み込む + + Args: + path: 読み込む JSON ファイルのパス + + Returns: + 読み込んだデータの辞書 + """ + with open(path, "r", encoding="utf-8") as f: + return json.load(f) + + +def write_json(path: Path, data: dict | list) -> None: + """JSON ファイルに書き込む + + 親ディレクトリが存在しない場合は自動作成する + + Args: + path: 書き込み先の JSON ファイルのパス + data: 書き込むデータ + """ + path.parent.mkdir(exist_ok=True) + with open(path, "w", encoding="utf-8") as f: + json.dump( + data, f, ensure_ascii=False, indent=2, + ) diff --git a/src/pc/steering/auto_params.py b/src/pc/steering/auto_params.py index 4376097..24be64e 100644 --- a/src/pc/steering/auto_params.py +++ b/src/pc/steering/auto_params.py @@ -12,21 +12,14 @@ └── detect_robust.json 案C の画像処理パラメータ """ -import json from dataclasses import asdict -from pathlib import Path +from common.json_utils import PARAMS_DIR, read_json, write_json from pc.steering.pd_control import PdParams from pc.vision.line_detector import ImageParams -# パラメータ保存ディレクトリ -_PARAMS_DIR: Path = ( - Path(__file__).resolve().parent.parent.parent.parent - / "params" -) - # PD 制御パラメータファイル -_CONTROL_FILE: Path = _PARAMS_DIR / "control.json" +_CONTROL_FILE = PARAMS_DIR / "control.json" # 検出手法ごとのファイル名 _DETECT_FILES: dict[str, str] = { @@ -47,10 +40,9 @@ params: PD 制御パラメータ method: 最後に使用した検出手法の識別子 """ - _PARAMS_DIR.mkdir(exist_ok=True) data = asdict(params) data["last_method"] = method - _write_json(_CONTROL_FILE, data) + write_json(_CONTROL_FILE, data) def load_control() -> tuple[PdParams, str]: @@ -62,7 +54,7 @@ if not _CONTROL_FILE.exists(): return PdParams(), "current" - data = _read_json(_CONTROL_FILE) + data = read_json(_CONTROL_FILE) method = data.pop("last_method", "current") known = PdParams.__dataclass_fields__ filtered = { @@ -84,10 +76,9 @@ filename = _DETECT_FILES.get(method) if filename is None: return - _PARAMS_DIR.mkdir(exist_ok=True) data = asdict(params) data["method"] = method - _write_json(_PARAMS_DIR / filename, data) + write_json(PARAMS_DIR / filename, data) def load_detect_params(method: str) -> ImageParams: @@ -103,11 +94,11 @@ if filename is None: return ImageParams(method=method) - path = _PARAMS_DIR / filename + path = PARAMS_DIR / filename if not path.exists(): return ImageParams(method=method) - data = _read_json(path) + data = read_json(path) known = ImageParams.__dataclass_fields__ filtered = { k: v for k, v in data.items() @@ -117,15 +108,3 @@ return ImageParams(**filtered) -def _write_json(path: Path, data: dict) -> None: - """JSON ファイルに書き込む""" - with open(path, "w", encoding="utf-8") as f: - json.dump( - data, f, ensure_ascii=False, indent=2, - ) - - -def _read_json(path: Path) -> dict: - """JSON ファイルを読み込む""" - with open(path, "r", encoding="utf-8") as f: - return json.load(f) diff --git a/src/pc/steering/param_store.py b/src/pc/steering/param_store.py index c51ee3f..5a9cc58 100644 --- a/src/pc/steering/param_store.py +++ b/src/pc/steering/param_store.py @@ -4,21 +4,14 @@ 画像処理パラメータと PD 制御パラメータを独立して管理する """ -import json from dataclasses import asdict, dataclass -from pathlib import Path +from common.json_utils import PARAMS_DIR, read_json, write_json from pc.steering.pd_control import PdParams from pc.vision.line_detector import ImageParams -# プリセット保存ディレクトリ -_PARAMS_DIR: Path = ( - Path(__file__).resolve().parent.parent.parent.parent - / "params" -) - -_PD_FILE: Path = _PARAMS_DIR / "presets_pd.json" -_IMAGE_FILE: Path = _PARAMS_DIR / "presets_image.json" +_PD_FILE = PARAMS_DIR / "presets_pd.json" +_IMAGE_FILE = PARAMS_DIR / "presets_image.json" # ── PD 制御プリセット ───────────────────────── @@ -118,8 +111,7 @@ if not path.exists(): return [] - with open(path, "r", encoding="utf-8") as f: - data = json.load(f) + data = read_json(path) known = params_cls.__dataclass_fields__ presets = [] @@ -144,7 +136,6 @@ def _save_presets(path, presets, params_key): """プリセットファイルに保存する""" - path.parent.mkdir(exist_ok=True) data = [] for preset in presets: data.append({ @@ -155,5 +146,4 @@ ), }) - with open(path, "w", encoding="utf-8") as f: - json.dump(data, f, ensure_ascii=False, indent=2) + write_json(path, data) diff --git a/src/pc/steering/pursuit_control.py b/src/pc/steering/pursuit_control.py index 71f8ce9..833edd4 100644 --- a/src/pc/steering/pursuit_control.py +++ b/src/pc/steering/pursuit_control.py @@ -10,6 +10,7 @@ from common import config from pc.steering.base import SteeringBase, SteeringOutput +from pc.vision.fitting import theil_sen_fit from pc.vision.line_detector import ( ImageParams, detect_line, @@ -39,37 +40,6 @@ speed_k: float = 2.0 -def theil_sen_fit( - y: np.ndarray, - x: np.ndarray, -) -> tuple[float, float]: - """Theil-Sen 推定で直線 x = slope * y + intercept を求める - - 全ペアの傾きの中央値を使い,外れ値に強い直線近似を行う - - Args: - y: y 座標の配列(行番号) - x: x 座標の配列(各行の中心) - - Returns: - (slope, intercept) のタプル - """ - n = len(y) - slopes = [] - for i in range(n): - for j in range(i + 1, n): - dy = y[j] - y[i] - if dy != 0: - slopes.append((x[j] - x[i]) / dy) - - if len(slopes) == 0: - return 0.0, float(np.median(x)) - - slope = float(np.median(slopes)) - intercept = float(np.median(x - slope * y)) - return slope, intercept - - class PursuitControl(SteeringBase): """2点パシュートによる操舵量計算クラス diff --git a/src/pc/vision/fitting.py b/src/pc/vision/fitting.py new file mode 100644 index 0000000..298a241 --- /dev/null +++ b/src/pc/vision/fitting.py @@ -0,0 +1,37 @@ +""" +fitting +直線・曲線近似の共通ユーティリティモジュール +""" + +import numpy as np + + +def theil_sen_fit( + y: np.ndarray, + x: np.ndarray, +) -> tuple[float, float]: + """Theil-Sen 推定で直線 x = slope * y + intercept を求める + + 全ペアの傾きの中央値を使い,外れ値に強い直線近似を行う + + Args: + y: y 座標の配列(行番号) + x: x 座標の配列(各行の中心) + + Returns: + (slope, intercept) のタプル + """ + n = len(y) + slopes = [] + for i in range(n): + for j in range(i + 1, n): + dy = y[j] - y[i] + if dy != 0: + slopes.append((x[j] - x[i]) / dy) + + if len(slopes) == 0: + return 0.0, float(np.median(x)) + + slope = float(np.median(slopes)) + intercept = float(np.median(x - slope * y)) + return slope, intercept diff --git a/src/pc/vision/line_detector.py b/src/pc/vision/line_detector.py index 84ff7cd..0bd2eac 100644 --- a/src/pc/vision/line_detector.py +++ b/src/pc/vision/line_detector.py @@ -19,6 +19,13 @@ MIN_FIT_PIXELS: int = 50 MIN_FIT_ROWS: int = 10 +# 近傍外れ値除去の設定 +NEIGHBOR_HALF_WINDOW: int = 3 +NEIGHBOR_FILTER_PASSES: int = 3 + +# 残差ベース反復除去の最大回数 +RESIDUAL_REMOVAL_ITERATIONS: int = 5 + # 検出手法の定義(キー: 識別子,値: 表示名) DETECT_METHODS: dict[str, str] = { "current": "現行(CLAHE + 固定閾値)", @@ -725,8 +732,8 @@ # (2) 近傍外れ値除去(複数パス) if neighbor_thresh > 0: - half_n = 3 - for _ in range(3): + half_n = NEIGHBOR_HALF_WINDOW + for _ in range(NEIGHBOR_FILTER_PASSES): new_mask = np.ones(len(cx_clean), dtype=bool) for i in range(len(cx_clean)): if not mask[i]: @@ -767,7 +774,7 @@ # (4) 残差ベースの反復除去 if residual_thresh > 0: - for _ in range(5): + for _ in range(RESIDUAL_REMOVAL_ITERATIONS): poly = np.poly1d(coeffs) residuals = np.abs(cx_clean - poly(cy)) inlier = residuals < residual_thresh diff --git a/src/pc/vision/overlay.py b/src/pc/vision/overlay.py index de4e2d6..27436f2 100644 --- a/src/pc/vision/overlay.py +++ b/src/pc/vision/overlay.py @@ -9,6 +9,7 @@ import cv2 import numpy as np +from pc.vision.fitting import theil_sen_fit from pc.vision.line_detector import LineDetectResult # 描画色の定義 (BGR) @@ -164,20 +165,7 @@ if len(ys) < 2: return - # Theil-Sen: 全ペアの傾きの中央値 - n = len(ys) - slopes = [] - for i in range(n): - for j in range(i + 1, n): - dy = ys[j] - ys[i] - if dy != 0: - slopes.append((xs[j] - xs[i]) / dy) - - if len(slopes) == 0: - return - - slope = float(np.median(slopes)) - intercept = float(np.median(xs - slope * ys)) + slope, intercept = theil_sen_fit(ys, xs) # 直線の両端を計算して描画 x0 = int(round(intercept))