diff --git "a/docs/03_TECH/TECH_01_\346\223\215\350\210\265\351\207\217\350\250\210\347\256\227\344\273\225\346\247\230.txt" "b/docs/03_TECH/TECH_01_\346\223\215\350\210\265\351\207\217\350\250\210\347\256\227\344\273\225\346\247\230.txt" index e7158ae..dcafd87 100644 --- "a/docs/03_TECH/TECH_01_\346\223\215\350\210\265\351\207\217\350\250\210\347\256\227\344\273\225\346\247\230.txt" +++ "b/docs/03_TECH/TECH_01_\346\223\215\350\210\265\351\207\217\350\250\210\347\256\227\344\273\225\346\247\230.txt" @@ -279,8 +279,8 @@ 7-5. 実装ファイル - ・src/pc/steering/pursuit_control.py: PursuitControl クラス(PC 側) - ・src/pi/steering/pursuit_control.py: PursuitControl クラス(Pi 側) + ・src/common/steering/pursuit_control.py: PursuitControl クラス + ・src/common/steering/base.py: SteeringBase(線検出・レートリミッター・reset の共通処理) ・src/pc/gui/main_window.py: 制御手法の切替 UI @@ -338,8 +338,8 @@ 8-6. 実装ファイル - ・src/pc/steering/ts_pd_control.py: TsPdControl クラス(PC 側) - ・src/pi/steering/ts_pd_control.py: TsPdControl クラス(Pi 側) + ・src/common/steering/ts_pd_control.py: TsPdControl クラス + ・src/common/steering/base.py: SteeringBase(線検出・レートリミッター・reset の共通処理) ・src/pc/gui/main_window.py: 制御手法の切替 UI @@ -362,8 +362,7 @@ 9-3. 実装ファイル - ・src/pc/steering/recovery.py: RecoveryParams,RecoveryController(PC 側) - ・src/pi/steering/recovery.py: RecoveryController(Pi 側) + ・src/common/steering/recovery.py: RecoveryParams,RecoveryController ・src/pi/main.py: Pi 側での復帰ロジックの統合 ・src/pc/gui/panels/recovery_panel.py: RecoveryPanel ・src/pc/gui/main_window.py: 復帰パラメータ管理・Pi への送信 diff --git "a/docs/03_TECH/TECH_02_\343\202\267\343\202\271\343\203\206\343\203\240\346\247\213\346\210\220\344\273\225\346\247\230.txt" "b/docs/03_TECH/TECH_02_\343\202\267\343\202\271\343\203\206\343\203\240\346\247\213\346\210\220\344\273\225\346\247\230.txt" index 1ecda51..5f0c308 100644 --- "a/docs/03_TECH/TECH_02_\343\202\267\343\202\271\343\203\206\343\203\240\346\247\213\346\210\220\344\273\225\346\247\230.txt" +++ "b/docs/03_TECH/TECH_02_\343\202\267\343\202\271\343\203\206\343\203\240\346\247\213\346\210\220\344\273\225\346\247\230.txt" @@ -176,13 +176,27 @@ - 4 バイト: カメラ画像長(uint32 LE) - M バイト: JPEG 圧縮カメラ画像 - 残り: JPEG 圧縮二値画像(デバッグ用,省略可) + ・JSON テレメトリのフィールド: + - v: プロトコルバージョン(config.TELEMETRY_VERSION と一致必須) + - ts: 送信時刻(Unix タイムスタンプ) + - throttle, steer: 現在の操舵量 + - detected: 線検出の成否 + - pos_error: 位置偏差 + - heading: 線の傾き + - is_intersection: 十字路判定結果 + - is_recovering: 復帰動作中か + - fps: Pi 側の処理 FPS + ・PC 側はバージョン不一致のメッセージを破棄する. ■ コマンド(PC → Pi,ZMQ PUB/SUB) ・JSON 形式: - mode: "auto" | "manual" | "stop" - throttle, steer: 手動モード時の操舵量 + - steering_method: "pd" | "pursuit" | "ts_pd" - image_params: 画像処理パラメータの辞書 - - steering_params: 操舵パラメータの辞書 + - pd_params: PD 制御パラメータの辞書 + - pursuit_params: Pursuit 制御パラメータの辞書 + - steering_params: Theil-Sen PD 制御パラメータの辞書 - recovery_params: 復帰パラメータの辞書 - intersection_enabled: 十字路判定の有効/無効 - intersection_throttle: 十字路通過時の速度 diff --git "a/docs/04_ENV/ENV_04_\343\203\207\343\202\243\343\203\254\343\202\257\343\203\210\343\203\252\346\247\213\346\210\220.txt" "b/docs/04_ENV/ENV_04_\343\203\207\343\202\243\343\203\254\343\202\257\343\203\210\343\203\252\346\247\213\346\210\220.txt" index 2bd1fb4..a3394c0 100644 --- "a/docs/04_ENV/ENV_04_\343\203\207\343\202\243\343\203\254\343\202\257\343\203\210\343\203\252\346\247\213\346\210\220.txt" +++ "b/docs/04_ENV/ENV_04_\343\203\207\343\202\243\343\203\254\343\202\257\343\203\210\343\203\252\346\247\213\346\210\220.txt" @@ -54,14 +54,15 @@ │ ├── robust.py 案C(最高ロバスト) │ └── valley.py 案D(谷検出+追跡) └── steering/ 操舵量計算(PC・Pi 共通) - ├── base.py 共通インターフェース + ├── base.py 共通基底クラス(線検出・レートリミッター・reset) ├── pd_control.py PD 制御の実装 ├── pursuit_control.py 2点パシュート制御の実装 ├── ts_pd_control.py Theil-Sen PD 制御の実装 └── recovery.py コースアウト復帰 ・PC・Pi 間で共有する設定値・画像処理・操舵量計算を定義する. - ・config.py: ネットワーク設定,画像フォーマット,通信設定等. + ・config.py: ネットワーク設定,画像フォーマット,通信設定, + テレメトリバージョン,表示倍率,ログ間隔等. ・json_utils.py: JSON ファイル読み書きとパラメータディレクトリの定義. ・vision/: 線検出パイプライン・検出手法・十字路分類を共通化. ・steering/: PD 制御・パシュート制御・Theil-Sen PD 制御・復帰制御を共通化. diff --git a/src/common/config.py b/src/common/config.py index 8596713..e092161 100644 --- a/src/common/config.py +++ b/src/common/config.py @@ -18,6 +18,12 @@ _env_path = _search_dir / ".env" load_dotenv(_env_path) +# ── テレメトリプロトコル ────────────────────────────────────── + +# テレメトリメッセージのプロトコルバージョン +# Pi/PC 間でこの値が一致しないと正しくパースできない +TELEMETRY_VERSION: int = 1 + # ── ネットワーク設定(.env から読み込み) ────────────────────── # PC の IP アドレス @@ -46,6 +52,19 @@ # JPEG 圧縮品質 (0-100) JPEG_QUALITY: int = 55 +# 二値画像の JPEG 圧縮品質 (0-100) +JPEG_QUALITY_BINARY: int = 80 + +# ── 表示設定 ────────────────────────────────────────────── + +# GUI 表示倍率(FRAME_WIDTH/HEIGHT → 表示サイズ) +DISPLAY_SCALE: float = 16.0 + +# ── Pi 側ログ設定 ───────────────────────────────────────── + +# FPS ログの出力間隔(秒) +LOG_INTERVAL_SEC: float = 3.0 + # ── 通信設定 ────────────────────────────────────────────── # 操舵量の送信頻度 (Hz) diff --git a/src/pc/comm/zmq_client.py b/src/pc/comm/zmq_client.py index ea2aaef..d9e56e4 100644 --- a/src/pc/comm/zmq_client.py +++ b/src/pc/comm/zmq_client.py @@ -81,6 +81,16 @@ ) offset += json_len + # バージョンチェック + msg_ver = telemetry.get("v", 0) + if msg_ver != config.TELEMETRY_VERSION: + print( + f"PC: テレメトリバージョン不一致 " + f"(受信={msg_ver}, " + f"期待={config.TELEMETRY_VERSION})" + ) + return None + # カメラ画像を読み取り cam_len = struct.unpack_from( " None: @@ -42,6 +33,11 @@ pd_control = PdControl() pursuit_control = PursuitControl() ts_pd_control = TsPdControl() + controllers: dict[str, SteeringBase] = { + "pd": pd_control, + "pursuit": pursuit_control, + "ts_pd": ts_pd_control, + } steering: SteeringBase = ts_pd_control steering_method = "ts_pd" @@ -61,7 +57,6 @@ frame_count = 0 fps_start = time.time() current_fps = 0.0 - LOG_INTERVAL_SEC = 3.0 try: camera.start() @@ -79,22 +74,21 @@ new_method is not None and new_method != steering_method ): - steering_method = new_method - if steering_method == "pd": - steering = pd_control - elif steering_method == "pursuit": - steering = pursuit_control + if new_method in controllers: + steering_method = new_method + steering = controllers[ + steering_method + ] else: - steering = ts_pd_control steering_method = "ts_pd" + steering = controllers["ts_pd"] print( f"Pi: 制御手法変更 → " f"{steering_method}" ) _apply_command( - cmd, pd_control, pursuit_control, - ts_pd_control, recovery, + cmd, controllers, recovery, intersection_clf, ) # モード更新 @@ -202,7 +196,7 @@ # ── FPS 計測 ─────────────────────────── elapsed = time.time() - fps_start - if elapsed >= LOG_INTERVAL_SEC: + if elapsed >= config.LOG_INTERVAL_SEC: current_fps = frame_count / elapsed print(f"Pi: FPS={current_fps:.1f}") frame_count = 0 @@ -265,9 +259,7 @@ def _apply_command( cmd: dict, - pd_control: PdControl, - pursuit_control: PursuitControl, - ts_pd_control: TsPdControl, + controllers: dict[str, SteeringBase], recovery: RecoveryController, intersection_clf: IntersectionClassifier, ) -> None: @@ -275,41 +267,30 @@ Args: cmd: 受信したコマンド辞書 - pd_control: PD 制御クラス - pursuit_control: Pursuit 制御クラス - ts_pd_control: Theil-Sen PD 制御クラス + controllers: 制御手法名とインスタンスの辞書 recovery: 復帰制御クラス intersection_clf: 十字路分類器 """ # 画像処理パラメータの更新(全制御クラスに反映) if "image_params" in cmd: ip = cmd["image_params"] - for ctrl in [ - pd_control, pursuit_control, ts_pd_control, - ]: + for ctrl in controllers.values(): _safe_update_dataclass( ctrl.image_params, ip, ) - # PD パラメータの更新 - if "pd_params" in cmd: - _safe_update_dataclass( - pd_control.params, cmd["pd_params"], - ) - - # Pursuit パラメータの更新 - if "pursuit_params" in cmd: - _safe_update_dataclass( - pursuit_control.params, - cmd["pursuit_params"], - ) - - # Theil-Sen PD パラメータの更新 - if "steering_params" in cmd: - _safe_update_dataclass( - ts_pd_control.params, - cmd["steering_params"], - ) + # 制御手法固有パラメータの更新 + _PARAM_KEYS: dict[str, str] = { + "pd": "pd_params", + "pursuit": "pursuit_params", + "ts_pd": "steering_params", + } + for name, cmd_key in _PARAM_KEYS.items(): + if cmd_key in cmd and name in controllers: + _safe_update_dataclass( + controllers[name].params, + cmd[cmd_key], + ) # 復帰パラメータの更新 if "recovery_params" in cmd: diff --git a/tests/test_steering.py b/tests/test_steering.py new file mode 100644 index 0000000..396d834 --- /dev/null +++ b/tests/test_steering.py @@ -0,0 +1,331 @@ +"""操舵量計算モジュールのテスト""" + +import numpy as np +import pytest + +from common import config +from common.steering.base import SteeringOutput +from common.steering.pd_control import PdControl, PdParams +from common.steering.pursuit_control import ( + PursuitControl, + PursuitParams, +) +from common.steering.ts_pd_control import ( + TsPdControl, + TsPdParams, +) +from common.vision.line_detector import ImageParams + + +@pytest.fixture() +def _center_line_params() -> ImageParams: + """テスト画像用に調整した検出パラメータ""" + return ImageParams( + method="current", + clahe_grid=2, blur_size=3, + open_size=1, close_width=3, + close_height=1, + ) + + +class TestSteeringBase: + """SteeringBase の共通ロジックのテスト""" + + def test_compute_returns_steering_output( + self, + straight_line_image: np.ndarray, + _center_line_params: ImageParams, + ) -> None: + """compute は SteeringOutput を返す""" + ctrl = PdControl( + image_params=_center_line_params, + ) + output = ctrl.compute(straight_line_image) + assert isinstance(output, SteeringOutput) + + def test_steer_within_range( + self, + straight_line_image: np.ndarray, + _center_line_params: ImageParams, + ) -> None: + """steer は -1.0 ~ +1.0 の範囲内""" + ctrl = PdControl( + image_params=_center_line_params, + ) + for _ in range(10): + output = ctrl.compute(straight_line_image) + assert -1.0 <= output.steer <= 1.0 + + def test_throttle_non_negative( + self, + straight_line_image: np.ndarray, + _center_line_params: ImageParams, + ) -> None: + """throttle は 0 以上""" + ctrl = PdControl( + image_params=_center_line_params, + ) + output = ctrl.compute(straight_line_image) + assert output.throttle >= 0.0 + + def test_rate_limiter_clamps_steer_change( + self, + _center_line_params: ImageParams, + ) -> None: + """レートリミッターが操舵変化量を制限する""" + max_rate = 0.05 + params = PdParams( + kp=5.0, kh=5.0, kd=0.0, + max_steer_rate=max_rate, + ) + ctrl = PdControl( + params=params, + image_params=_center_line_params, + ) + + # 大きく左にオフセットした画像を作成 + h, w = config.FRAME_HEIGHT, config.FRAME_WIDTH + img = np.full((h, w), 200, dtype=np.uint8) + img[:, 2:5] = 30 # 左端に線 + + output = ctrl.compute(img) + # 初回は prev_steer=0 なので変化量が制限される + assert abs(output.steer) <= max_rate + 1e-9 + + def test_no_line_returns_zero( + self, + blank_image: np.ndarray, + ) -> None: + """線が検出できない場合は throttle=0, steer=0""" + ctrl = PdControl() + output = ctrl.compute(blank_image) + assert output.throttle == 0.0 + assert output.steer == 0.0 + + def test_last_detect_result_updated( + self, + straight_line_image: np.ndarray, + _center_line_params: ImageParams, + ) -> None: + """compute 後に last_detect_result が更新される""" + ctrl = PdControl( + image_params=_center_line_params, + ) + assert ctrl.last_detect_result is None + ctrl.compute(straight_line_image) + assert ctrl.last_detect_result is not None + + def test_reset_clears_state( + self, + straight_line_image: np.ndarray, + _center_line_params: ImageParams, + ) -> None: + """reset 後に内部状態がクリアされる""" + ctrl = PdControl( + image_params=_center_line_params, + ) + ctrl.compute(straight_line_image) + ctrl.reset() + assert ctrl.last_detect_result is None + assert ctrl._prev_steer == 0.0 + + +class TestPdControl: + """PD 制御のテスト""" + + def test_center_line_small_steer( + self, + straight_line_image: np.ndarray, + _center_line_params: ImageParams, + ) -> None: + """中央の線に対して操舵量が小さい""" + ctrl = PdControl( + image_params=_center_line_params, + ) + output = ctrl.compute(straight_line_image) + assert abs(output.steer) < 0.3 + + def test_left_line_steers_positive( + self, + _center_line_params: ImageParams, + ) -> None: + """左にある線に対して正方向に操舵する""" + h, w = config.FRAME_HEIGHT, config.FRAME_WIDTH + img = np.full((h, w), 200, dtype=np.uint8) + img[:, 5:8] = 30 # 左寄りの線 + + ctrl = PdControl( + params=PdParams( + kp=1.0, kh=0.0, kd=0.0, + max_steer_rate=1.0, + ), + image_params=_center_line_params, + ) + output = ctrl.compute(img) + assert output.steer > 0.0 + + def test_speed_decreases_with_curvature( + self, + _center_line_params: ImageParams, + ) -> None: + """曲率が大きいほど速度が下がる""" + params = PdParams(max_throttle=0.5, speed_k=0.3) + ctrl = PdControl( + params=params, + image_params=_center_line_params, + ) + # 曲がった線を作成 + h, w = config.FRAME_HEIGHT, config.FRAME_WIDTH + img = np.full((h, w), 200, dtype=np.uint8) + for y in range(h): + x = int(w / 2 + 5 * (y / h - 0.5) ** 2 * w) + x = max(0, min(w - 1, x)) + img[y, max(0, x - 1):min(w, x + 2)] = 30 + + output = ctrl.compute(img) + assert output.throttle <= params.max_throttle + + +class TestPursuitControl: + """2点パシュート制御のテスト""" + + def test_center_line_small_steer( + self, + straight_line_image: np.ndarray, + _center_line_params: ImageParams, + ) -> None: + """中央の線に対して操舵量が小さい""" + ctrl = PursuitControl( + image_params=_center_line_params, + ) + output = ctrl.compute(straight_line_image) + assert abs(output.steer) < 0.3 + + def test_no_line_returns_zero( + self, + blank_image: np.ndarray, + ) -> None: + """線が検出できない場合は停止""" + ctrl = PursuitControl() + output = ctrl.compute(blank_image) + assert output.throttle == 0.0 + assert output.steer == 0.0 + + +class TestTsPdControl: + """Theil-Sen PD 制御のテスト""" + + def test_center_line_small_steer( + self, + straight_line_image: np.ndarray, + _center_line_params: ImageParams, + ) -> None: + """中央の線に対して操舵量が小さい""" + ctrl = TsPdControl( + image_params=_center_line_params, + ) + output = ctrl.compute(straight_line_image) + assert abs(output.steer) < 0.3 + + def test_no_line_returns_zero( + self, + blank_image: np.ndarray, + ) -> None: + """線が検出できない場合は停止""" + ctrl = TsPdControl() + output = ctrl.compute(blank_image) + assert output.throttle == 0.0 + assert output.steer == 0.0 + + def test_reset_clears_derivative_state( + self, + straight_line_image: np.ndarray, + _center_line_params: ImageParams, + ) -> None: + """reset で微分項の状態がクリアされる""" + ctrl = TsPdControl( + image_params=_center_line_params, + ) + ctrl.compute(straight_line_image) + ctrl.reset() + assert ctrl._prev_error == 0.0 + assert ctrl._prev_time == 0.0 + + +class TestSafeUpdateDataclass: + """_safe_update_dataclass のテスト + + pi.main は picamera2 に依存するため直接 import できない. + 同等のロジックを common モジュールの dataclass で検証する + """ + + @staticmethod + def _safe_update_dataclass( + target: object, + updates: dict, + ) -> None: + """pi.main._safe_update_dataclass と同等のロジック""" + import dataclasses + field_names = { + f.name for f in dataclasses.fields(target) + } + for key, value in updates.items(): + if key not in field_names: + continue + current = getattr(target, key) + expected = type(current) + if expected is int and isinstance( + value, float, + ): + value = int(value) + elif expected is float and isinstance( + value, int, + ): + value = float(value) + elif not isinstance(value, expected): + continue + setattr(target, key, value) + + def test_updates_valid_fields(self) -> None: + """正しい型のフィールドを更新できる""" + params = PdParams() + self._safe_update_dataclass( + params, {"kp": 2.0}, + ) + assert params.kp == 2.0 + + def test_ignores_unknown_fields(self) -> None: + """存在しないフィールドは無視する""" + params = PdParams() + original_kp = params.kp + self._safe_update_dataclass( + params, {"unknown_field": 999}, + ) + assert params.kp == original_kp + + def test_rejects_wrong_type(self) -> None: + """型が一致しない場合は更新しない""" + params = PdParams() + original_kp = params.kp + self._safe_update_dataclass( + params, {"kp": "not_a_number"}, + ) + assert params.kp == original_kp + + def test_int_to_float_conversion(self) -> None: + """int を float フィールドに渡すと変換される""" + params = PdParams() + self._safe_update_dataclass( + params, {"kp": 2}, + ) + assert params.kp == 2.0 + assert isinstance(params.kp, float) + + def test_float_to_int_conversion(self) -> None: + """float を int フィールドに渡すと変換される""" + params = ImageParams() + self._safe_update_dataclass( + params, {"binary_thresh": 100.0}, + ) + assert params.binary_thresh == 100 + assert isinstance(params.binary_thresh, int)