diff --git "a/docs/04_ENV/ENV_04_\343\203\207\343\202\243\343\203\254\343\202\257\343\203\210\343\203\252\346\247\213\346\210\220.txt" "b/docs/04_ENV/ENV_04_\343\203\207\343\202\243\343\203\254\343\202\257\343\203\210\343\203\252\346\247\213\346\210\220.txt" index 9026ee0..c07a330 100644 --- "a/docs/04_ENV/ENV_04_\343\203\207\343\202\243\343\203\254\343\202\257\343\203\210\343\203\252\346\247\213\346\210\220.txt" +++ "b/docs/04_ENV/ENV_04_\343\203\207\343\202\243\343\203\254\343\202\257\343\203\210\343\203\252\346\247\213\346\210\220.txt" @@ -52,7 +52,14 @@ ├── main.py エントリーポイント ├── review.py データ仕分け GUI エントリーポイント ├── gui/ GUI 関連 - │ └── main_window.py メインウィンドウ + │ ├── main_window.py メインウィンドウ + │ └── panels/ パラメータ調整パネル群 + │ ├── collapsible_group_box.py 折りたたみ GroupBox + │ ├── control_param_panel.py 制御パラメータ + │ ├── image_param_panel.py 二値化パラメータ + │ ├── intersection_panel.py 十字路判定 + │ ├── overlay_panel.py デバッグ表示 + │ └── recovery_panel.py コースアウト復帰 ├── comm/ 通信関連 │ └── zmq_client.py ZMQ 送受信 ├── data/ 学習データ収集・仕分け・学習 @@ -71,6 +78,7 @@ ├── line_detector.py 線検出 API(データクラス・手法ディスパッチ) ├── fitting.py 直線・曲線近似(Theil-Sen・RANSAC・外れ値除去) ├── morphology.py 形態学的処理ユーティリティ + ├── intersection.py 十字路分類モデルの推論 ├── overlay.py デバッグオーバーレイ描画 └── detectors/ 検出手法の実装 ├── current.py 現行(CLAHE + 固定閾値) diff --git a/src/pc/comm/zmq_client.py b/src/pc/comm/zmq_client.py index d373c71..f927012 100644 --- a/src/pc/comm/zmq_client.py +++ b/src/pc/comm/zmq_client.py @@ -5,6 +5,7 @@ """ import json +import struct import cv2 import numpy as np @@ -23,6 +24,7 @@ self._context: zmq.Context | None = None self._image_socket: zmq.Socket | None = None self._control_socket: zmq.Socket | None = None + self._last_image_ts: float | None = None def start(self) -> None: """通信ソケットを初期化してバインドする""" @@ -39,7 +41,7 @@ self._control_socket.bind(config.control_bind_address()) def receive_image(self) -> np.ndarray | None: - """画像を非ブロッキングで受信する + """画像をタイムスタンプ付きで非ブロッキング受信する Returns: 受信したグレースケール画像の NumPy 配列,受信データがない場合は None @@ -47,7 +49,16 @@ if self._image_socket is None: return None try: - data = self._image_socket.recv(zmq.NOBLOCK) + raw = self._image_socket.recv(zmq.NOBLOCK) + ts_size = struct.calcsize("d") + if len(raw) > ts_size: + self._last_image_ts = struct.unpack( + "d", raw[:ts_size], + )[0] + data = raw[ts_size:] + else: + self._last_image_ts = None + data = raw frame = cv2.imdecode( np.frombuffer(data, dtype=np.uint8), cv2.IMREAD_GRAYSCALE, @@ -59,7 +70,7 @@ def send_control( self, throttle: float, steer: float, ) -> None: - """操舵量を送信する + """操舵量をタイムスタンプ付きで送信する Args: throttle: 前後方向の出力 (-1.0 ~ +1.0) @@ -67,12 +78,20 @@ """ if self._control_socket is None: return - payload = json.dumps({ + msg: dict = { "throttle": throttle, "steer": steer, - }).encode("utf-8") + } + if self._last_image_ts is not None: + msg["ts"] = self._last_image_ts + payload = json.dumps(msg).encode("utf-8") self._control_socket.send(payload, zmq.NOBLOCK) + @property + def last_image_ts(self) -> float | None: + """最後に受信した画像のタイムスタンプを返す""" + return self._last_image_ts + def stop(self) -> None: """通信ソケットを閉じる""" if self._image_socket is not None: diff --git a/src/pc/gui/main_window.py b/src/pc/gui/main_window.py index 51ba5df..bf91fdf 100644 --- a/src/pc/gui/main_window.py +++ b/src/pc/gui/main_window.py @@ -4,6 +4,8 @@ カメラ映像のリアルタイム表示と操作 UI を提供する """ +import time + import cv2 import numpy as np from PySide6.QtCore import Qt, QTimer @@ -24,6 +26,7 @@ from pc.gui.panels import ( ControlParamPanel, ImageParamPanel, + IntersectionPanel, OverlayPanel, RecoveryPanel, ) @@ -40,7 +43,7 @@ save_recovery, save_ts_pd, ) -from pc.steering.base import SteeringBase +from pc.steering.base import SteeringBase, SteeringOutput from pc.steering.pd_control import PdControl, PdParams from pc.steering.pursuit_control import ( PursuitControl, @@ -60,6 +63,7 @@ LineDetectResult, detect_line, ) +from pc.vision.intersection import IntersectionClassifier from pc.vision.overlay import draw_overlay # 映像更新間隔 (ms) @@ -124,15 +128,27 @@ # 最新フレームの保持(自動操縦で使用) self._latest_frame: np.ndarray | None = None + # 十字路判定結果の保持 + self._is_intersection: bool = False + # 検出結果の保持 self._last_detect_result: LineDetectResult | None = ( None ) + # 十字路分類器 + self._intersection_clf = IntersectionClassifier() + # データ収集 self._collector = DataCollector() self._is_intersection_key: bool = False + # パフォーマンス計測 + self._recv_frame_count: int = 0 + self._recv_fps_start: float = time.time() + self._recv_fps: float = 0.0 + self._last_proc_ms: float = 0.0 + self._setup_ui() self._setup_timers() @@ -173,6 +189,19 @@ " padding: 4px;" ) left_layout.addWidget(self._detect_info_label) + + self._perf_label = QLabel( + "recv FPS: --- proc: ---" + ) + self._perf_label.setAlignment( + Qt.AlignmentFlag.AlignLeft, + ) + self._perf_label.setStyleSheet( + "font-size: 14px; font-family: monospace;" + " color: #ff0; background-color: #222;" + " padding: 4px;" + ) + left_layout.addWidget(self._perf_label) root_layout.addLayout(left_layout, stretch=3) # 右側: スクロール可能なコントロールパネル @@ -230,6 +259,12 @@ ) control_layout.addWidget(self._image_panel) + # 十字路判定パネル + self._intersection_panel = IntersectionPanel( + available=self._intersection_clf.available, + ) + control_layout.addWidget(self._intersection_panel) + # 制御パラメータパネル self._control_panel = ControlParamPanel( self._pd_control.params, @@ -504,8 +539,19 @@ frame = self._zmq_client.receive_image() if frame is None: return + proc_start = time.time() self._latest_frame = frame + # 受信 FPS 計測 + self._recv_frame_count += 1 + elapsed = time.time() - self._recv_fps_start + if elapsed >= 1.0: + self._recv_fps = ( + self._recv_frame_count / elapsed + ) + self._recv_frame_count = 0 + self._recv_fps_start = time.time() + # 線検出は常に実行(検出情報ラベル表示のため) if self._is_auto: ctrl = self._active_control @@ -514,8 +560,29 @@ ctrl.last_detect_result ) - # コースアウト復帰判定 + # 十字路判定: 十字路ならまっすぐ進む det = self._last_detect_result + self._is_intersection = False + if ( + self._intersection_panel.enabled + and self._intersection_clf.available + and det is not None + and det.binary_image is not None + ): + self._is_intersection = ( + self._intersection_clf.predict( + det.binary_image, + ) + ) + if self._is_intersection: + output = SteeringOutput( + throttle=( + self._intersection_panel.throttle + ), + steer=0.0, + ) + + # コースアウト復帰判定 detected = det is not None and det.detected pos_err = ( det.position_error @@ -528,10 +595,18 @@ if recovery_output is not None: output = recovery_output + # 十字路パネルの表示更新 + if self._intersection_panel.enabled: + self._intersection_panel.update_result( + self._is_intersection, + ) + self._throttle = output.throttle self._steer = output.steer self._update_control_label() else: + self._is_intersection = False + self._intersection_panel.clear_result() self._last_detect_result = detect_line( frame, self._pd_control.image_params, @@ -549,6 +624,15 @@ self._display_frame(frame) + # PC 処理時間を計測・表示 + self._last_proc_ms = ( + (time.time() - proc_start) * 1000 + ) + self._perf_label.setText( + f"recv FPS: {self._recv_fps:.1f}" + f" proc: {self._last_proc_ms:.1f}ms" + ) + def _update_detect_info_label(self) -> None: """検出情報ラベルを更新する""" r = self._last_detect_result @@ -668,6 +752,7 @@ bgr, self._last_detect_result, self._overlay_panel.get_flags(), pursuit_points=pursuit_pts, + is_intersection=self._is_intersection, ) # 検出情報をラベルに表示 @@ -784,6 +869,8 @@ f"throttle: {self._throttle:+.2f}\n" f"steer: {self._steer:+.2f}" ) + if self._is_auto and self._is_intersection: + text += "\n[十字路]" if self._is_auto and self._recovery.is_recovering: text += "\n[復帰中]" self._control_label.setText(text) diff --git a/src/pc/gui/panels/__init__.py b/src/pc/gui/panels/__init__.py index 9814afe..f9c8691 100644 --- a/src/pc/gui/panels/__init__.py +++ b/src/pc/gui/panels/__init__.py @@ -8,6 +8,7 @@ ) from pc.gui.panels.control_param_panel import ControlParamPanel from pc.gui.panels.image_param_panel import ImageParamPanel +from pc.gui.panels.intersection_panel import IntersectionPanel from pc.gui.panels.overlay_panel import OverlayPanel from pc.gui.panels.recovery_panel import RecoveryPanel @@ -15,6 +16,7 @@ "CollapsibleGroupBox", "ControlParamPanel", "ImageParamPanel", + "IntersectionPanel", "OverlayPanel", "RecoveryPanel", ] diff --git a/src/pc/gui/panels/intersection_panel.py b/src/pc/gui/panels/intersection_panel.py new file mode 100644 index 0000000..2363746 --- /dev/null +++ b/src/pc/gui/panels/intersection_panel.py @@ -0,0 +1,109 @@ +""" +intersection_panel +十字路判定の有効/無効を切り替える UI パネル +""" + +from PySide6.QtCore import Signal +from PySide6.QtWidgets import ( + QCheckBox, + QDoubleSpinBox, + QFormLayout, + QLabel, + QVBoxLayout, +) + +from pc.gui.panels.collapsible_group_box import ( + CollapsibleGroupBox, +) + + +class IntersectionPanel(CollapsibleGroupBox): + """十字路判定の切替 UI""" + + enabled_changed = Signal(bool) + + def __init__(self, available: bool = False) -> None: + super().__init__("十字路判定") + self._available = available + self._setup_ui() + + def _setup_ui(self) -> None: + """UI を構築する""" + layout = QVBoxLayout() + self.setLayout(layout) + + # 有効/無効チェックボックス + self._enabled_cb = QCheckBox("十字路判定を有効にする") + self._enabled_cb.setChecked(self._available) + self._enabled_cb.setEnabled(self._available) + self._enabled_cb.toggled.connect( + self._on_toggled, + ) + layout.addWidget(self._enabled_cb) + + # モデル状態の表示 + if self._available: + status_text = "モデル: 読み込み済み" + else: + status_text = "モデル: 未学習(params/ にありません)" + self._status_label = QLabel(status_text) + self._status_label.setStyleSheet( + "font-size: 11px; color: #888;" + ) + layout.addWidget(self._status_label) + + # 十字路時の速度 + form = QFormLayout() + layout.addLayout(form) + + self._spin_throttle = QDoubleSpinBox() + self._spin_throttle.setRange(0.0, 1.0) + self._spin_throttle.setSingleStep(0.05) + self._spin_throttle.setDecimals(2) + self._spin_throttle.setValue(0.4) + form.addRow("十字路速度:", self._spin_throttle) + + # 判定結果の表示 + self._result_label = QLabel("") + self._result_label.setStyleSheet( + "font-size: 13px; font-weight: bold;" + " padding: 2px;" + ) + layout.addWidget(self._result_label) + + @property + def enabled(self) -> bool: + """十字路判定が有効かどうか""" + return self._enabled_cb.isChecked() + + @property + def throttle(self) -> float: + """十字路時の速度""" + return self._spin_throttle.value() + + def update_result(self, is_intersection: bool) -> None: + """判定結果の表示を更新する + + Args: + is_intersection: 十字路と判定されたか + """ + if is_intersection: + self._result_label.setText("判定: 十字路(直進)") + self._result_label.setStyleSheet( + "font-size: 13px; font-weight: bold;" + " color: #f44; padding: 2px;" + ) + else: + self._result_label.setText("判定: 通常") + self._result_label.setStyleSheet( + "font-size: 13px; font-weight: bold;" + " color: #4f4; padding: 2px;" + ) + + def clear_result(self) -> None: + """判定結果の表示をクリアする""" + self._result_label.setText("") + + def _on_toggled(self, checked: bool) -> None: + """チェックボックスの切替をシグナルで通知する""" + self.enabled_changed.emit(checked) diff --git a/src/pc/steering/recovery.py b/src/pc/steering/recovery.py index dfa88d9..31bfcb5 100644 --- a/src/pc/steering/recovery.py +++ b/src/pc/steering/recovery.py @@ -93,10 +93,9 @@ return None # 復帰モード: 最後に検出した方向へ旋回 - # position_error > 0(線が左)→ 左へ旋回(steer < 0) self._is_recovering = True steer = ( - -self._last_error_sign + self._last_error_sign * self.params.steer_amount ) return SteeringOutput( diff --git a/src/pc/steering/ts_pd_control.py b/src/pc/steering/ts_pd_control.py index 67f3bc5..e940694 100644 --- a/src/pc/steering/ts_pd_control.py +++ b/src/pc/steering/ts_pd_control.py @@ -114,8 +114,7 @@ position_error = (center_x - bottom_x) / center_x # 操舵量: P 項(位置偏差)+ Heading 項(傾き) - # 符号反転: 偏差正(線が左)→ steer 負(左へ曲がる) - error = -(p.kp * position_error + p.kh * slope) + error = p.kp * position_error + p.kh * slope # 時間差分の計算 now = time.time() diff --git a/src/pc/vision/intersection.py b/src/pc/vision/intersection.py new file mode 100644 index 0000000..3786630 --- /dev/null +++ b/src/pc/vision/intersection.py @@ -0,0 +1,65 @@ +""" +intersection +十字路分類モデルの読み込みと推論を行うモジュール + +学習済みモデルとスケーラを読み込み, +二値画像から十字路かどうかを判定する +""" + +from pathlib import Path + +import joblib +import numpy as np + +from common.json_utils import PARAMS_DIR + +# モデル・スケーラの保存先 +_MODEL_PATH: Path = PARAMS_DIR / "intersection_model.pkl" +_SCALER_PATH: Path = PARAMS_DIR / "intersection_scaler.pkl" + + +class IntersectionClassifier: + """十字路分類器 + + 学習済みモデルを読み込み,二値画像から + 十字路かどうかを判定する + """ + + def __init__(self) -> None: + self._model: object | None = None + self._scaler: object | None = None + self._available: bool = False + self._load() + + def _load(self) -> None: + """モデルとスケーラを読み込む""" + if not _MODEL_PATH.exists(): + return + if not _SCALER_PATH.exists(): + return + self._model = joblib.load(_MODEL_PATH) + self._scaler = joblib.load(_SCALER_PATH) + self._available = True + + @property + def available(self) -> bool: + """モデルが利用可能かどうか""" + return self._available + + def predict(self, binary_image: np.ndarray) -> bool: + """二値画像が十字路かどうかを判定する + + Args: + binary_image: 40×30 の二値画像(0/255) + + Returns: + 十字路なら True + """ + if not self._available: + return False + flat = (binary_image.flatten() / 255.0).astype( + np.float32, + ) + x = self._scaler.transform(flat.reshape(1, -1)) + pred = self._model.predict(x) + return bool(pred[0] == 1) diff --git a/src/pc/vision/overlay.py b/src/pc/vision/overlay.py index f5bb0a5..a9da75b 100644 --- a/src/pc/vision/overlay.py +++ b/src/pc/vision/overlay.py @@ -19,6 +19,7 @@ COLOR_ROW_CENTER: tuple = (0, 165, 255) COLOR_THEIL_SEN: tuple = (255, 0, 255) COLOR_PURSUIT: tuple = (255, 255, 0) +COLOR_INTERSECTION: tuple = (0, 0, 255) # パシュート目標点の描画半径 PURSUIT_POINT_RADIUS: int = 2 @@ -57,6 +58,7 @@ tuple[tuple[float, float], tuple[float, float]] | None ) = None, + is_intersection: bool = False, ) -> np.ndarray: """カメラ映像にオーバーレイを描画する @@ -66,6 +68,7 @@ flags: 表示項目のフラグ pursuit_points: 2点パシュートの目標点 ((near_x, near_y), (far_x, far_y)) + is_intersection: 十字路と判定されているか Returns: オーバーレイ描画済みの画像 @@ -73,6 +76,13 @@ display = frame.copy() h, w = display.shape[:2] + # 十字路判定の表示 + if is_intersection: + cv2.rectangle( + display, (0, 0), (w - 1, h - 1), + COLOR_INTERSECTION, 1, + ) + if result is None: return display diff --git a/src/pi/comm/zmq_client.py b/src/pi/comm/zmq_client.py index f553511..93aa4dd 100644 --- a/src/pi/comm/zmq_client.py +++ b/src/pi/comm/zmq_client.py @@ -5,6 +5,7 @@ """ import json +import struct import time import cv2 @@ -25,6 +26,7 @@ self._image_socket: zmq.Socket | None = None self._control_socket: zmq.Socket | None = None self._last_receive_time: float = 0.0 + self._last_rtt: float | None = None def start(self) -> None: """通信ソケットを初期化して接続する""" @@ -43,7 +45,7 @@ self._last_receive_time = time.time() def send_image(self, frame: np.ndarray) -> None: - """画像を JPEG 圧縮して送信する + """画像を JPEG 圧縮してタイムスタンプ付きで送信する Args: frame: カメラから取得した画像の NumPy 配列 @@ -55,9 +57,14 @@ frame, [cv2.IMWRITE_JPEG_QUALITY, config.JPEG_QUALITY], ) - self._image_socket.send(encoded.tobytes(), zmq.NOBLOCK) + ts_bytes = struct.pack("d", time.time()) + self._image_socket.send( + ts_bytes + encoded.tobytes(), zmq.NOBLOCK, + ) - def receive_control(self) -> tuple[float, float] | None: + def receive_control( + self, + ) -> tuple[float, float] | None: """操舵量を非ブロッキングで受信する Returns: @@ -69,10 +76,21 @@ data = self._control_socket.recv(zmq.NOBLOCK) payload = json.loads(data.decode("utf-8")) self._last_receive_time = time.time() + + # ラウンドトリップ計測 + if "ts" in payload: + rtt = time.time() - payload["ts"] + self._last_rtt = rtt + return (payload["throttle"], payload["steer"]) except zmq.Again: return None + @property + def last_rtt(self) -> float | None: + """最後に計測したラウンドトリップ時間(秒)を返す""" + return self._last_rtt + def is_timeout(self) -> bool: """操舵量の受信がタイムアウトしたか判定する diff --git a/src/pi/main.py b/src/pi/main.py index 17b7630..129fe8a 100644 --- a/src/pi/main.py +++ b/src/pi/main.py @@ -23,10 +23,16 @@ motor.start() print("Pi: カメラ・通信・モーターを開始") + # FPS / RTT 計測用 + frame_count = 0 + fps_start = time.time() + LOG_INTERVAL_SEC = 3.0 + while True: # カメラ画像を取得して送信 frame = camera.capture() zmq_client.send_image(frame) + frame_count += 1 # 操舵量を受信してモーターに反映 control = zmq_client.receive_control() @@ -38,6 +44,22 @@ if zmq_client.is_timeout(): motor.stop() + # 定期的に FPS と RTT を表示 + elapsed = time.time() - fps_start + if elapsed >= LOG_INTERVAL_SEC: + fps = frame_count / elapsed + rtt = zmq_client.last_rtt + rtt_ms = ( + f"{rtt * 1000:.1f}ms" + if rtt is not None else "---" + ) + print( + f"Pi: カメラ FPS={fps:.1f}" + f" RTT={rtt_ms}" + ) + frame_count = 0 + fps_start = time.time() + time.sleep(0.01) except KeyboardInterrupt: