diff --git "a/docs/04_ENV/ENV_04_\343\203\207\343\202\243\343\203\254\343\202\257\343\203\210\343\203\252\346\247\213\346\210\220.txt" "b/docs/04_ENV/ENV_04_\343\203\207\343\202\243\343\203\254\343\202\257\343\203\210\343\203\252\346\247\213\346\210\220.txt" index a3394c0..f0efba8 100644 --- "a/docs/04_ENV/ENV_04_\343\203\207\343\202\243\343\203\254\343\202\257\343\203\210\343\203\252\346\247\213\346\210\220.txt" +++ "b/docs/04_ENV/ENV_04_\343\203\207\343\202\243\343\203\254\343\202\257\343\203\210\343\203\252\346\247\213\346\210\220.txt" @@ -73,7 +73,10 @@ ├── main.py エントリーポイント ├── review.py データ仕分け GUI エントリーポイント ├── gui/ GUI 関連 - │ ├── main_window.py メインウィンドウ(モニタリング専用) + │ ├── main_window.py メインウィンドウ(レイアウト・ライフサイクル管理) + │ ├── telemetry_display.py テレメトリ受信・映像表示・オーバーレイ + │ ├── command_sender.py コマンド構築・dirty 管理・ZMQ 送信 + │ ├── manual_controller.py キー入力 → throttle/steer 変換 │ └── panels/ パラメータ調整パネル群 │ ├── collapsible_group_box.py 折りたたみ GroupBox │ ├── control_param_panel.py 制御パラメータ diff --git a/src/pc/gui/command_sender.py b/src/pc/gui/command_sender.py new file mode 100644 index 0000000..dc48b87 --- /dev/null +++ b/src/pc/gui/command_sender.py @@ -0,0 +1,110 @@ +""" +command_sender +パラメータの dirty 管理・コマンド辞書構築・ZMQ 送信を担当するモジュール +""" + +import dataclasses + +from common.steering.pd_control import PdParams +from common.steering.pursuit_control import PursuitParams +from common.steering.recovery import RecoveryParams +from common.steering.ts_pd_control import TsPdParams +from common.vision.line_detector import ImageParams +from pc.comm.zmq_client import PcZmqClient + + +class CommandSender: + """コマンドを構築して Pi に送信する + + パラメータの変更フラグを管理し, + モード・操舵量・パラメータをまとめて送信する + """ + + def __init__( + self, + zmq_client: PcZmqClient, + ) -> None: + self._zmq_client = zmq_client + self.params_dirty: bool = True + + def send( + self, + *, + is_auto: bool, + throttle: float, + steer: float, + intersection_enabled: bool, + intersection_throttle: float, + steering_method: str, + image_params: ImageParams, + pd_params: PdParams, + pursuit_params: PursuitParams, + ts_pd_params: TsPdParams, + recovery_params: RecoveryParams, + ) -> None: + """コマンドを構築して送信する + + Args: + is_auto: 自動操縦中か + throttle: 手動操作の throttle + steer: 手動操作の steer + intersection_enabled: 十字路判定の有効化 + intersection_throttle: 十字路通過時の throttle + steering_method: 制御手法名 + image_params: 二値化パラメータ + pd_params: PD 制御パラメータ + pursuit_params: Pursuit 制御パラメータ + ts_pd_params: Theil-Sen PD パラメータ + recovery_params: 復帰パラメータ + """ + cmd: dict = {} + + # モード + if is_auto: + cmd["mode"] = "auto" + elif throttle != 0.0 or steer != 0.0: + cmd["mode"] = "manual" + cmd["throttle"] = throttle + cmd["steer"] = steer + else: + cmd["mode"] = "stop" + + # 十字路設定 + cmd["intersection_enabled"] = ( + intersection_enabled + ) + cmd["intersection_throttle"] = ( + intersection_throttle + ) + + # 制御手法 + cmd["steering_method"] = steering_method + + # パラメータ更新(変更があった場合のみ) + if self.params_dirty: + cmd["image_params"] = dataclasses.asdict( + image_params, + ) + cmd["pd_params"] = dataclasses.asdict( + pd_params, + ) + cmd["pursuit_params"] = dataclasses.asdict( + pursuit_params, + ) + cmd["steering_params"] = dataclasses.asdict( + ts_pd_params, + ) + cmd["recovery_params"] = dataclasses.asdict( + recovery_params, + ) + self.params_dirty = False + + self._zmq_client.send_command(cmd) + + def send_stop(self) -> None: + """停止コマンドを送信する""" + self._zmq_client.send_command({"mode": "stop"}) + + def mark_dirty(self) -> None: + """パラメータ変更フラグを立てる""" + self.params_dirty = True diff --git a/src/pc/gui/main_window.py b/src/pc/gui/main_window.py index f37c078..6bd526e 100644 --- a/src/pc/gui/main_window.py +++ b/src/pc/gui/main_window.py @@ -5,13 +5,8 @@ モード切替・パラメータ調整・手動操作のコマンドを送信する """ -import dataclasses -import time - -import cv2 -import numpy as np from PySide6.QtCore import Qt, QTimer -from PySide6.QtGui import QImage, QKeyEvent, QPixmap +from PySide6.QtGui import QKeyEvent from PySide6.QtWidgets import ( QHBoxLayout, QLabel, @@ -24,6 +19,8 @@ from common import config from pc.comm.zmq_client import PcZmqClient +from pc.gui.command_sender import CommandSender +from pc.gui.manual_controller import ManualController from pc.gui.panels import ( ControlParamPanel, ImageParamPanel, @@ -31,6 +28,7 @@ OverlayPanel, RecoveryPanel, ) +from pc.gui.telemetry_display import TelemetryDisplay from pc.steering.auto_params import ( load_control, load_detect_params, @@ -48,11 +46,7 @@ from common.steering.pursuit_control import PursuitParams from common.steering.recovery import RecoveryParams from common.steering.ts_pd_control import TsPdParams -from common.vision.line_detector import ( - ImageParams, - LineDetectResult, -) -from pc.vision.overlay import draw_overlay +from common.vision.line_detector import ImageParams # 映像更新間隔 (ms) FRAME_INTERVAL_MS: int = 33 @@ -65,10 +59,6 @@ # 映像表示のスケール倍率(40x30 → 640x480 相当) DISPLAY_SCALE: float = config.DISPLAY_SCALE -# 手動操作の throttle / steer 量 -MANUAL_THROTTLE: float = 0.5 -MANUAL_STEER: float = 0.4 - class MainWindow(QMainWindow): """PC 側のメインウィンドウ @@ -83,20 +73,13 @@ self._is_connected = False self._is_auto = False - # 手動操作の状態 - self._pressed_keys: set[int] = set() - self._throttle: float = 0.0 - self._steer: float = 0.0 + # 分割されたコンポーネント + self._manual = ManualController() + self._cmd_sender = CommandSender( + self._zmq_client, + ) - # テレメトリから受け取る状態 - self._pi_detected: bool = False - self._pi_pos_error: float = 0.0 - self._pi_heading: float = 0.0 - self._pi_is_intersection: bool = False - self._pi_is_recovering: bool = False - self._pi_fps: float = 0.0 - - # 前回のパラメータを復元(GUI 表示・コマンド送信用) + # 前回のパラメータを復元 pd_params, last_method, last_steering = ( load_control() ) @@ -113,18 +96,16 @@ recovery_params = load_recovery() self._recovery_params = recovery_params - # 最新のバイナリ画像(テレメトリから) - self._latest_binary: np.ndarray | None = None - - # パフォーマンス計測 - self._recv_frame_count: int = 0 - self._recv_fps_start: float = time.time() - self._recv_fps: float = 0.0 - - # パラメータ変更フラグ(次のコマンド送信で送る) - self._params_dirty: bool = True - self._setup_ui() + + # TelemetryDisplay はウィジェット生成後に初期化 + self._telemetry = TelemetryDisplay( + self._zmq_client, + self._video_label, + self._detect_info_label, + self._perf_label, + ) + self._setup_timers() def _setup_ui(self) -> None: @@ -295,7 +276,9 @@ def _setup_timers(self) -> None: """タイマーを設定する""" self._frame_timer = QTimer(self) - self._frame_timer.timeout.connect(self._update_frame) + self._frame_timer.timeout.connect( + self._update_frame, + ) self._command_timer = QTimer(self) self._command_timer.timeout.connect( @@ -309,7 +292,7 @@ ) -> None: """二値化パラメータの変更をマークする""" self._image_params = ip - self._params_dirty = True + self._cmd_sender.mark_dirty() def _on_method_changed(self, method: str) -> None: """検出手法の変更を保存する""" @@ -325,7 +308,7 @@ p, self._image_params.method, self._steering_method, ) - self._params_dirty = True + self._cmd_sender.mark_dirty() def _on_pursuit_params_changed( self, p: PursuitParams, @@ -333,7 +316,7 @@ """Pursuit パラメータの変更を保存・マークする""" self._pursuit_params = p save_pursuit(p) - self._params_dirty = True + self._cmd_sender.mark_dirty() def _on_ts_pd_params_changed( self, p: TsPdParams, @@ -341,7 +324,7 @@ """Theil-Sen PD パラメータの変更を保存・マークする""" self._ts_pd_params = p save_ts_pd(p) - self._params_dirty = True + self._cmd_sender.mark_dirty() def _on_recovery_params_changed( self, p: RecoveryParams, @@ -349,7 +332,7 @@ """復帰パラメータの変更を保存・マークする""" self._recovery_params = p save_recovery(p) - self._params_dirty = True + self._cmd_sender.mark_dirty() def _on_overlay_flags_changed(self) -> None: """オーバーレイフラグの変更を保存する""" @@ -365,7 +348,7 @@ self._image_params.method, method, ) - self._params_dirty = True + self._cmd_sender.mark_dirty() # ── 接続 ────────────────────────────────────────────── @@ -380,14 +363,14 @@ """ZMQ 通信を開始してテレメトリ受信を始める""" self._zmq_client.start() self._is_connected = True - self._params_dirty = True + self._cmd_sender.mark_dirty() self._connect_btn.setText("切断") self._auto_btn.setEnabled(True) self._status_label.setText("接続中 (手動操作)") self._frame_timer.start(FRAME_INTERVAL_MS) self._command_timer.start(COMMAND_INTERVAL_MS) # 初回接続時に stop コマンドを送信 - self._zmq_client.send_command({"mode": "stop"}) + self._cmd_sender.send_stop() def _disconnect(self) -> None: """ZMQ 通信を停止する""" @@ -395,16 +378,14 @@ self._command_timer.stop() # Pi を停止 if self._is_connected: - self._zmq_client.send_command({"mode": "stop"}) + self._cmd_sender.send_stop() if self._is_auto: self._is_auto = False self._auto_btn.setText("自動操縦 ON") self._zmq_client.stop() self._is_connected = False self._auto_btn.setEnabled(False) - self._pressed_keys.clear() - self._throttle = 0.0 - self._steer = 0.0 + self._manual.reset() self._connect_btn.setText("接続開始") self._status_label.setText("未接続") self._video_label.setText("カメラ映像待機中...") @@ -424,17 +405,15 @@ def _enable_auto(self) -> None: """自動操縦を開始する""" self._is_auto = True - self._pressed_keys.clear() + self._manual.reset() self._auto_btn.setText("自動操縦 OFF") self._status_label.setText("接続中 (自動操縦)") - # Pi にパラメータ付きで auto コマンドを送信 - self._params_dirty = True + self._cmd_sender.mark_dirty() def _disable_auto(self) -> None: """自動操縦を停止して手動に戻る""" self._is_auto = False - self._throttle = 0.0 - self._steer = 0.0 + self._manual.reset() self._auto_btn.setText("自動操縦 ON") self._status_label.setText("接続中 (手動操作)") self._update_control_label() @@ -443,125 +422,26 @@ def _update_frame(self) -> None: """タイマーから呼び出され,テレメトリを受信して表示する""" - result = self._zmq_client.receive_telemetry() - if result is None: + received = self._telemetry.update( + self._overlay_panel.get_flags(), + ) + if not received: return - telemetry, frame, binary = result - - # テレメトリから状態を取得 - self._pi_detected = telemetry.get( - "detected", False, - ) - self._pi_pos_error = telemetry.get( - "pos_error", 0.0, - ) - self._pi_heading = telemetry.get("heading", 0.0) - self._pi_is_intersection = telemetry.get( - "is_intersection", False, - ) - self._pi_is_recovering = telemetry.get( - "is_recovering", False, - ) - self._pi_fps = telemetry.get("fps", 0.0) + state = self._telemetry.state # 自動時は Pi の操舵量を表示 if self._is_auto: - self._throttle = telemetry.get( - "throttle", 0.0, - ) - self._steer = telemetry.get("steer", 0.0) self._update_control_label() # 十字路パネルの表示更新 if self._intersection_panel.enabled: self._intersection_panel.update_result( - self._pi_is_intersection, + state.is_intersection, ) else: self._intersection_panel.clear_result() - self._latest_binary = binary - - # 受信 FPS 計測 - self._recv_frame_count += 1 - elapsed = time.time() - self._recv_fps_start - if elapsed >= 1.0: - self._recv_fps = ( - self._recv_frame_count / elapsed - ) - self._recv_frame_count = 0 - self._recv_fps_start = time.time() - - # 検出情報表示 - self._update_detect_info_label() - - # パフォーマンス表示 - self._perf_label.setText( - f"recv FPS: {self._recv_fps:.1f}" - f" Pi FPS: {self._pi_fps:.1f}" - ) - - self._display_frame(frame) - - def _update_detect_info_label(self) -> None: - """検出情報ラベルを更新する""" - if not self._pi_detected: - self._detect_info_label.setText( - "pos: --- head: ---" - ) - return - self._detect_info_label.setText( - f"pos: {self._pi_pos_error:+.3f}" - f" head: {self._pi_heading:+.4f}" - ) - - def _display_frame(self, frame: np.ndarray) -> None: - """NumPy 配列の画像を QLabel に表示する - - Args: - frame: グレースケールの画像 - """ - # グレースケール → BGR 変換(カラーオーバーレイ描画のため) - bgr = cv2.cvtColor(frame, cv2.COLOR_GRAY2BGR) - - # テレメトリから簡易的な LineDetectResult を構築して - # オーバーレイ描画に渡す - detect_result = None - if self._pi_detected and self._latest_binary is not None: - detect_result = LineDetectResult( - detected=True, - position_error=self._pi_pos_error, - heading=self._pi_heading, - curvature=0.0, - poly_coeffs=None, - row_centers=None, - binary_image=self._latest_binary, - ) - - bgr = draw_overlay( - bgr, detect_result, - self._overlay_panel.get_flags(), - is_intersection=self._pi_is_intersection, - ) - - # BGR → RGB 変換 - rgb = bgr[:, :, ::-1].copy() - h, w, ch = rgb.shape - image = QImage( - rgb.data, w, h, ch * w, - QImage.Format.Format_RGB888, - ) - disp_w = int(config.FRAME_WIDTH * DISPLAY_SCALE) - disp_h = int(config.FRAME_HEIGHT * DISPLAY_SCALE) - pixmap = QPixmap.fromImage(image).scaled( - disp_w, - disp_h, - Qt.AspectRatioMode.KeepAspectRatio, - Qt.TransformationMode.SmoothTransformation, - ) - self._video_label.setPixmap(pixmap) - # ── 手動操作 ────────────────────────────────────────── def keyPressEvent(self, event: QKeyEvent) -> None: @@ -579,129 +459,74 @@ if self._is_auto: return - self._pressed_keys.add(event.key()) - self._update_manual_control() + if self._manual.handle_key_press(event): + if self._manual.is_emergency_stop(): + if self._is_auto: + self._disable_auto() + self._update_control_label() def keyReleaseEvent(self, event: QKeyEvent) -> None: """キー離上時に操舵量を更新する""" if event.isAutoRepeat(): return - if self._is_auto: return - self._pressed_keys.discard(event.key()) - self._update_manual_control() - - def _update_manual_control(self) -> None: - """押下中のキーから throttle と steer を計算する""" - keys = self._pressed_keys - - # Space で緊急停止 - if Qt.Key.Key_Space in keys: - self._throttle = 0.0 - self._steer = 0.0 - self._pressed_keys.clear() - if self._is_auto: - self._disable_auto() + if self._manual.handle_key_release(event): self._update_control_label() - return - - # throttle: W/↑ で前進,S/↓ で後退 - forward = ( - Qt.Key.Key_W in keys or Qt.Key.Key_Up in keys - ) - backward = ( - Qt.Key.Key_S in keys - or Qt.Key.Key_Down in keys - ) - if forward and not backward: - self._throttle = MANUAL_THROTTLE - elif backward and not forward: - self._throttle = -MANUAL_THROTTLE - else: - self._throttle = 0.0 - - # steer: A/← で左,D/→ で右 - left = ( - Qt.Key.Key_A in keys - or Qt.Key.Key_Left in keys - ) - right = ( - Qt.Key.Key_D in keys - or Qt.Key.Key_Right in keys - ) - if left and not right: - self._steer = -MANUAL_STEER - elif right and not left: - self._steer = MANUAL_STEER - else: - self._steer = 0.0 - - self._update_control_label() def _update_control_label(self) -> None: """操舵量の表示を更新する""" + if self._is_auto: + state = self._telemetry.state + throttle = state.throttle + steer = state.steer + else: + throttle = self._manual.throttle + steer = self._manual.steer + text = ( - f"throttle: {self._throttle:+.2f}\n" - f"steer: {self._steer:+.2f}" + f"throttle: {throttle:+.2f}\n" + f"steer: {steer:+.2f}" ) - if self._is_auto and self._pi_is_intersection: - text += "\n[十字路]" - if self._is_auto and self._pi_is_recovering: - text += "\n[復帰中]" + if self._is_auto: + state = self._telemetry.state + if state.is_intersection: + text += "\n[十字路]" + if state.is_recovering: + text += "\n[復帰中]" self._control_label.setText(text) + # ── コマンド送信 ────────────────────────────────────── + def _send_command(self) -> None: """コマンドを Pi に送信する""" if not self._is_connected: return - cmd: dict = {} - - # モード if self._is_auto: - cmd["mode"] = "auto" - elif ( - self._throttle != 0.0 - or self._steer != 0.0 - ): - cmd["mode"] = "manual" - cmd["throttle"] = self._throttle - cmd["steer"] = self._steer + throttle = 0.0 + steer = 0.0 else: - cmd["mode"] = "stop" + throttle = self._manual.throttle + steer = self._manual.steer - # 十字路設定 - cmd["intersection_enabled"] = ( - self._intersection_panel.enabled + self._cmd_sender.send( + is_auto=self._is_auto, + throttle=throttle, + steer=steer, + intersection_enabled=( + self._intersection_panel.enabled + ), + intersection_throttle=( + self._intersection_panel.throttle + ), + steering_method=self._steering_method, + image_params=self._image_params, + pd_params=self._pd_params, + pursuit_params=self._pursuit_params, + ts_pd_params=self._ts_pd_params, + recovery_params=self._recovery_params, ) - cmd["intersection_throttle"] = ( - self._intersection_panel.throttle - ) - - # 制御手法 - cmd["steering_method"] = self._steering_method - - # パラメータ更新(変更があった場合のみ) - if self._params_dirty: - cmd["image_params"] = dataclasses.asdict( - self._image_params, - ) - cmd["pd_params"] = dataclasses.asdict( - self._pd_params, - ) - cmd["pursuit_params"] = dataclasses.asdict( - self._pursuit_params, - ) - cmd["steering_params"] = dataclasses.asdict( - self._ts_pd_params, - ) - cmd["recovery_params"] = dataclasses.asdict( - self._recovery_params, - ) - self._params_dirty = False - - self._zmq_client.send_command(cmd) def closeEvent(self, event) -> None: """ウィンドウを閉じるときに通信を停止する""" diff --git a/src/pc/gui/manual_controller.py b/src/pc/gui/manual_controller.py new file mode 100644 index 0000000..cdeeed8 --- /dev/null +++ b/src/pc/gui/manual_controller.py @@ -0,0 +1,113 @@ +""" +manual_controller +キー入力から手動操作の throttle / steer を計算するモジュール +""" + +from PySide6.QtCore import Qt +from PySide6.QtGui import QKeyEvent + +# 手動操作の throttle / steer 量 +MANUAL_THROTTLE: float = 0.5 +MANUAL_STEER: float = 0.4 + + +class ManualController: + """キー入力を throttle / steer に変換する + + Qt のキーイベントを受け取り, + 押下中のキーセットから操舵量を計算する + """ + + def __init__(self) -> None: + self._pressed_keys: set[int] = set() + self.throttle: float = 0.0 + self.steer: float = 0.0 + + def reset(self) -> None: + """状態をリセットする""" + self._pressed_keys.clear() + self.throttle = 0.0 + self.steer = 0.0 + + def handle_key_press(self, event: QKeyEvent) -> bool: + """キー押下を処理する + + Args: + event: Qt のキーイベント + + Returns: + 操舵量が更新された場合 True + """ + if event.isAutoRepeat(): + return False + self._pressed_keys.add(event.key()) + self._update() + return True + + def handle_key_release(self, event: QKeyEvent) -> bool: + """キー離上を処理する + + Args: + event: Qt のキーイベント + + Returns: + 操舵量が更新された場合 True + """ + if event.isAutoRepeat(): + return False + self._pressed_keys.discard(event.key()) + self._update() + return True + + def is_emergency_stop(self) -> bool: + """Space キーによる緊急停止が発生したか判定する + + 緊急停止の場合は状態をリセットする + + Returns: + 緊急停止が発生した場合 True + """ + if Qt.Key.Key_Space in self._pressed_keys: + self.reset() + return True + return False + + def _update(self) -> None: + """押下中のキーから throttle と steer を計算する""" + keys = self._pressed_keys + + # Space で緊急停止 + if Qt.Key.Key_Space in keys: + self.reset() + return + + # throttle: W/↑ で前進,S/↓ で後退 + forward = ( + Qt.Key.Key_W in keys or Qt.Key.Key_Up in keys + ) + backward = ( + Qt.Key.Key_S in keys + or Qt.Key.Key_Down in keys + ) + if forward and not backward: + self.throttle = MANUAL_THROTTLE + elif backward and not forward: + self.throttle = -MANUAL_THROTTLE + else: + self.throttle = 0.0 + + # steer: A/← で左,D/→ で右 + left = ( + Qt.Key.Key_A in keys + or Qt.Key.Key_Left in keys + ) + right = ( + Qt.Key.Key_D in keys + or Qt.Key.Key_Right in keys + ) + if left and not right: + self.steer = -MANUAL_STEER + elif right and not left: + self.steer = MANUAL_STEER + else: + self.steer = 0.0 diff --git a/src/pc/gui/telemetry_display.py b/src/pc/gui/telemetry_display.py new file mode 100644 index 0000000..e3a138b --- /dev/null +++ b/src/pc/gui/telemetry_display.py @@ -0,0 +1,196 @@ +""" +telemetry_display +テレメトリ受信・状態抽出・映像表示・オーバーレイ描画を担当するモジュール +""" + +import time +from dataclasses import dataclass + +import cv2 +import numpy as np +from PySide6.QtCore import Qt +from PySide6.QtGui import QImage, QPixmap +from PySide6.QtWidgets import QLabel + +from common import config +from common.vision.line_detector import LineDetectResult +from pc.comm.zmq_client import PcZmqClient +from pc.vision.overlay import OverlayFlags, draw_overlay + +# 映像表示のスケール倍率 +DISPLAY_SCALE: float = config.DISPLAY_SCALE + + +@dataclass +class TelemetryState: + """テレメトリから受信した Pi 側の状態""" + + detected: bool = False + pos_error: float = 0.0 + heading: float = 0.0 + is_intersection: bool = False + is_recovering: bool = False + fps: float = 0.0 + throttle: float = 0.0 + steer: float = 0.0 + + +class TelemetryDisplay: + """テレメトリを受信して映像・状態を表示する + + PcZmqClient からテレメトリを受信し, + 状態を抽出してオーバーレイ付き映像を QLabel に表示する + """ + + def __init__( + self, + zmq_client: PcZmqClient, + video_label: QLabel, + detect_info_label: QLabel, + perf_label: QLabel, + ) -> None: + self._zmq_client = zmq_client + self._video_label = video_label + self._detect_info_label = detect_info_label + self._perf_label = perf_label + + self.state = TelemetryState() + self._latest_binary: np.ndarray | None = None + + # 受信 FPS 計測 + self._recv_frame_count: int = 0 + self._recv_fps_start: float = time.time() + self._recv_fps: float = 0.0 + + def update( + self, + overlay_flags: OverlayFlags, + ) -> bool: + """テレメトリを受信して表示を更新する + + Args: + overlay_flags: オーバーレイ表示フラグ + + Returns: + テレメトリを受信できた場合 True + """ + result = self._zmq_client.receive_telemetry() + if result is None: + return False + + telemetry, frame, binary = result + + # テレメトリから状態を取得 + self.state.detected = telemetry.get( + "detected", False, + ) + self.state.pos_error = telemetry.get( + "pos_error", 0.0, + ) + self.state.heading = telemetry.get( + "heading", 0.0, + ) + self.state.is_intersection = telemetry.get( + "is_intersection", False, + ) + self.state.is_recovering = telemetry.get( + "is_recovering", False, + ) + self.state.fps = telemetry.get("fps", 0.0) + self.state.throttle = telemetry.get( + "throttle", 0.0, + ) + self.state.steer = telemetry.get("steer", 0.0) + + self._latest_binary = binary + + # 受信 FPS 計測 + self._recv_frame_count += 1 + elapsed = time.time() - self._recv_fps_start + if elapsed >= 1.0: + self._recv_fps = ( + self._recv_frame_count / elapsed + ) + self._recv_frame_count = 0 + self._recv_fps_start = time.time() + + # 検出情報表示 + self._update_detect_info_label() + + # パフォーマンス表示 + self._perf_label.setText( + f"recv FPS: {self._recv_fps:.1f}" + f" Pi FPS: {self.state.fps:.1f}" + ) + + self._display_frame(frame, overlay_flags) + return True + + def _update_detect_info_label(self) -> None: + """検出情報ラベルを更新する""" + if not self.state.detected: + self._detect_info_label.setText( + "pos: --- head: ---" + ) + return + self._detect_info_label.setText( + f"pos: {self.state.pos_error:+.3f}" + f" head: {self.state.heading:+.4f}" + ) + + def _display_frame( + self, + frame: np.ndarray, + overlay_flags: OverlayFlags, + ) -> None: + """NumPy 配列の画像を QLabel に表示する + + Args: + frame: グレースケールの画像 + overlay_flags: オーバーレイ表示フラグ + """ + # グレースケール → BGR 変換 + bgr = cv2.cvtColor(frame, cv2.COLOR_GRAY2BGR) + + # テレメトリから LineDetectResult を構築 + detect_result = None + if ( + self.state.detected + and self._latest_binary is not None + ): + detect_result = LineDetectResult( + detected=True, + position_error=self.state.pos_error, + heading=self.state.heading, + curvature=0.0, + poly_coeffs=None, + row_centers=None, + binary_image=self._latest_binary, + ) + + bgr = draw_overlay( + bgr, detect_result, + overlay_flags, + is_intersection=( + self.state.is_intersection + ), + ) + + # BGR → RGB 変換 + rgb = bgr[:, :, ::-1].copy() + h, w, ch = rgb.shape + image = QImage( + rgb.data, w, h, ch * w, + QImage.Format.Format_RGB888, + ) + disp_w = int(config.FRAME_WIDTH * DISPLAY_SCALE) + disp_h = int( + config.FRAME_HEIGHT * DISPLAY_SCALE + ) + pixmap = QPixmap.fromImage(image).scaled( + disp_w, + disp_h, + Qt.AspectRatioMode.KeepAspectRatio, + Qt.TransformationMode.SmoothTransformation, + ) + self._video_label.setPixmap(pixmap)