Newer
Older
RobotCar / src / pc / gui / main_window.py
"""
main_window
PC 側のメインウィンドウを定義するモジュール
Pi からのテレメトリをリアルタイム表示し,
モード切替・パラメータ調整・手動操作のコマンドを送信する
"""

import dataclasses
import time

import cv2
import numpy as np
from PySide6.QtCore import Qt, QTimer
from PySide6.QtGui import QImage, QKeyEvent, QPixmap
from PySide6.QtWidgets import (
    QHBoxLayout,
    QLabel,
    QMainWindow,
    QPushButton,
    QScrollArea,
    QVBoxLayout,
    QWidget,
)

from common import config
from pc.comm.zmq_client import PcZmqClient
from pc.gui.panels import (
    ControlParamPanel,
    ImageParamPanel,
    IntersectionPanel,
    OverlayPanel,
    RecoveryPanel,
)
from pc.steering.auto_params import (
    load_control,
    load_detect_params,
    load_overlay,
    load_pursuit,
    load_recovery,
    load_ts_pd,
    save_control,
    save_overlay,
    save_pursuit,
    save_recovery,
    save_ts_pd,
)
from pc.steering.pd_control import PdParams
from pc.steering.pursuit_control import PursuitParams
from pc.steering.recovery import RecoveryParams
from pc.steering.ts_pd_control import TsPdParams
from pc.vision.line_detector import (
    ImageParams,
    LineDetectResult,
)
from pc.vision.overlay import draw_overlay

# 映像更新間隔 (ms)
FRAME_INTERVAL_MS: int = 33

# コマンド送信間隔 (ms)
COMMAND_INTERVAL_MS: int = int(
    1000 / config.CONTROL_PUBLISH_HZ
)

# 映像表示のスケール倍率(40x30 → 640x480 相当)
DISPLAY_SCALE: float = 16.0

# 手動操作の throttle / steer 量
MANUAL_THROTTLE: float = 0.5
MANUAL_STEER: float = 0.4


class MainWindow(QMainWindow):
    """PC 側のメインウィンドウ

    Pi からテレメトリを受信して映像・状態を表示し,
    コマンドを Pi に送信する
    """

    def __init__(self) -> None:
        super().__init__()
        self._zmq_client = PcZmqClient()
        self._is_connected = False
        self._is_auto = False

        # 手動操作の状態
        self._pressed_keys: set[int] = set()
        self._throttle: float = 0.0
        self._steer: float = 0.0

        # テレメトリから受け取る状態
        self._pi_detected: bool = False
        self._pi_pos_error: float = 0.0
        self._pi_heading: float = 0.0
        self._pi_is_intersection: bool = False
        self._pi_is_recovering: bool = False
        self._pi_fps: float = 0.0

        # 前回のパラメータを復元(GUI 表示・コマンド送信用)
        pd_params, last_method, last_steering = (
            load_control()
        )
        image_params = load_detect_params(last_method)
        self._image_params = image_params
        self._pd_params = pd_params
        pursuit_params = load_pursuit()
        self._pursuit_params = pursuit_params
        ts_pd_params = load_ts_pd()
        self._ts_pd_params = ts_pd_params
        self._steering_method: str = last_steering

        # 復帰パラメータ
        recovery_params = load_recovery()
        self._recovery_params = recovery_params

        # 最新のバイナリ画像(テレメトリから)
        self._latest_binary: np.ndarray | None = None

        # パフォーマンス計測
        self._recv_frame_count: int = 0
        self._recv_fps_start: float = time.time()
        self._recv_fps: float = 0.0

        # パラメータ変更フラグ(次のコマンド送信で送る)
        self._params_dirty: bool = True

        self._setup_ui()
        self._setup_timers()

    def _setup_ui(self) -> None:
        """UI を構築する"""
        self.setWindowTitle("RobotCar Controller")

        central = QWidget()
        self.setCentralWidget(central)
        root_layout = QHBoxLayout(central)

        # 左側: 映像表示 + 検出情報
        left_layout = QVBoxLayout()

        self._video_label = QLabel("カメラ映像待機中...")
        self._video_label.setAlignment(
            Qt.AlignmentFlag.AlignCenter,
        )
        self._video_label.setMinimumSize(
            int(config.FRAME_WIDTH * DISPLAY_SCALE),
            int(config.FRAME_HEIGHT * DISPLAY_SCALE),
        )
        self._video_label.setStyleSheet(
            "background-color: #222;"
            " color: #aaa; font-size: 16px;"
        )
        left_layout.addWidget(self._video_label)

        self._detect_info_label = QLabel(
            "pos: ---  head: ---"
        )
        self._detect_info_label.setAlignment(
            Qt.AlignmentFlag.AlignLeft,
        )
        self._detect_info_label.setStyleSheet(
            "font-size: 14px; font-family: monospace;"
            " color: #0f0; background-color: #222;"
            " padding: 4px;"
        )
        left_layout.addWidget(self._detect_info_label)

        self._perf_label = QLabel(
            "recv FPS: ---  Pi FPS: ---"
        )
        self._perf_label.setAlignment(
            Qt.AlignmentFlag.AlignLeft,
        )
        self._perf_label.setStyleSheet(
            "font-size: 14px; font-family: monospace;"
            " color: #ff0; background-color: #222;"
            " padding: 4px;"
        )
        left_layout.addWidget(self._perf_label)
        root_layout.addLayout(left_layout, stretch=3)

        # 右側: スクロール可能なコントロールパネル
        scroll = QScrollArea()
        scroll.setWidgetResizable(True)
        scroll.setHorizontalScrollBarPolicy(
            Qt.ScrollBarPolicy.ScrollBarAlwaysOff,
        )
        control_widget = QWidget()
        control_layout = QVBoxLayout(control_widget)
        scroll.setWidget(control_widget)
        root_layout.addWidget(scroll, stretch=1)

        # 接続ボタン
        self._connect_btn = QPushButton("接続開始")
        self._connect_btn.clicked.connect(
            self._toggle_connection,
        )
        control_layout.addWidget(self._connect_btn)

        # 自動操縦ボタン
        self._auto_btn = QPushButton("自動操縦 ON")
        self._auto_btn.setEnabled(False)
        self._auto_btn.clicked.connect(self._toggle_auto)
        control_layout.addWidget(self._auto_btn)

        # ステータス表示
        self._status_label = QLabel("未接続")
        self._status_label.setAlignment(
            Qt.AlignmentFlag.AlignCenter,
        )
        control_layout.addWidget(self._status_label)

        # 操舵量表示
        self._control_label = QLabel(
            "throttle: 0.00\nsteer: 0.00"
        )
        self._control_label.setAlignment(
            Qt.AlignmentFlag.AlignCenter,
        )
        self._control_label.setStyleSheet(
            "font-size: 14px;",
        )
        control_layout.addWidget(self._control_label)

        # 二値化パラメータパネル
        self._image_panel = ImageParamPanel(
            self._image_params,
        )
        self._image_panel.image_params_changed.connect(
            self._on_image_params_changed,
        )
        self._image_panel.method_changed.connect(
            self._on_method_changed,
        )
        control_layout.addWidget(self._image_panel)

        # 十字路判定パネル
        self._intersection_panel = IntersectionPanel(
            available=True,
        )
        control_layout.addWidget(self._intersection_panel)

        # 制御パラメータパネル
        self._control_panel = ControlParamPanel(
            self._pd_params,
            self._pursuit_params,
            self._ts_pd_params,
            self._steering_method,
        )
        self._control_panel.pd_params_changed.connect(
            self._on_pd_params_changed,
        )
        self._control_panel.pursuit_params_changed.connect(
            self._on_pursuit_params_changed,
        )
        self._control_panel.ts_pd_params_changed.connect(
            self._on_ts_pd_params_changed,
        )
        self._control_panel.steering_method_changed.connect(
            self._on_steering_method_changed,
        )
        control_layout.addWidget(self._control_panel)

        # コースアウト復帰パネル
        self._recovery_panel = RecoveryPanel(
            self._recovery_params,
        )
        self._recovery_panel.recovery_params_changed.connect(
            self._on_recovery_params_changed,
        )
        control_layout.addWidget(self._recovery_panel)

        # デバッグ表示パネル
        overlay_flags = load_overlay()
        self._overlay_panel = OverlayPanel(overlay_flags)
        self._overlay_panel.overlay_flags_changed.connect(
            self._on_overlay_flags_changed,
        )
        control_layout.addWidget(self._overlay_panel)

        # 操作ガイド
        guide = QLabel(
            "--- キー操作 ---\n"
            "W/↑: 前進  S/↓: 後退\n"
            "A/←: 左    D/→: 右\n"
            "Space: 停止\n"
            "Q: 自動操縦 切替"
        )
        guide.setAlignment(Qt.AlignmentFlag.AlignLeft)
        guide.setStyleSheet("font-size: 12px; color: #666;")
        control_layout.addWidget(guide)

        control_layout.addStretch()

    def _setup_timers(self) -> None:
        """タイマーを設定する"""
        self._frame_timer = QTimer(self)
        self._frame_timer.timeout.connect(self._update_frame)

        self._command_timer = QTimer(self)
        self._command_timer.timeout.connect(
            self._send_command,
        )

    # ── パネルシグナルのスロット ───────────────────────────

    def _on_image_params_changed(
        self, ip: ImageParams,
    ) -> None:
        """二値化パラメータの変更をマークする"""
        self._image_params = ip
        self._params_dirty = True

    def _on_method_changed(self, method: str) -> None:
        """検出手法の変更を保存する"""
        save_control(
            self._pd_params, method,
            self._steering_method,
        )

    def _on_pd_params_changed(self, p: PdParams) -> None:
        """PD パラメータの変更を保存・マークする"""
        self._pd_params = p
        save_control(
            p, self._image_params.method,
            self._steering_method,
        )
        self._params_dirty = True

    def _on_pursuit_params_changed(
        self, p: PursuitParams,
    ) -> None:
        """Pursuit パラメータの変更を保存・マークする"""
        self._pursuit_params = p
        save_pursuit(p)
        self._params_dirty = True

    def _on_ts_pd_params_changed(
        self, p: TsPdParams,
    ) -> None:
        """Theil-Sen PD パラメータの変更を保存・マークする"""
        self._ts_pd_params = p
        save_ts_pd(p)
        self._params_dirty = True

    def _on_recovery_params_changed(
        self, p: RecoveryParams,
    ) -> None:
        """復帰パラメータの変更を保存・マークする"""
        self._recovery_params = p
        save_recovery(p)
        self._params_dirty = True

    def _on_overlay_flags_changed(self) -> None:
        """オーバーレイフラグの変更を保存する"""
        save_overlay(self._overlay_panel.get_flags())

    def _on_steering_method_changed(
        self, method: str,
    ) -> None:
        """制御手法の切替を反映して保存する"""
        self._steering_method = method
        save_control(
            self._pd_params,
            self._image_params.method,
            method,
        )
        self._params_dirty = True

    # ── 接続 ──────────────────────────────────────────────

    def _toggle_connection(self) -> None:
        """接続 / 切断を切り替える"""
        if self._is_connected:
            self._disconnect()
        else:
            self._connect()

    def _connect(self) -> None:
        """ZMQ 通信を開始してテレメトリ受信を始める"""
        self._zmq_client.start()
        self._is_connected = True
        self._params_dirty = True
        self._connect_btn.setText("切断")
        self._auto_btn.setEnabled(True)
        self._status_label.setText("接続中 (手動操作)")
        self._frame_timer.start(FRAME_INTERVAL_MS)
        self._command_timer.start(COMMAND_INTERVAL_MS)
        # 初回接続時に stop コマンドを送信
        self._zmq_client.send_command({"mode": "stop"})

    def _disconnect(self) -> None:
        """ZMQ 通信を停止する"""
        self._frame_timer.stop()
        self._command_timer.stop()
        # Pi を停止
        if self._is_connected:
            self._zmq_client.send_command({"mode": "stop"})
        if self._is_auto:
            self._is_auto = False
            self._auto_btn.setText("自動操縦 ON")
        self._zmq_client.stop()
        self._is_connected = False
        self._auto_btn.setEnabled(False)
        self._pressed_keys.clear()
        self._throttle = 0.0
        self._steer = 0.0
        self._connect_btn.setText("接続開始")
        self._status_label.setText("未接続")
        self._video_label.setText("カメラ映像待機中...")
        self._control_label.setText(
            "throttle: 0.00\nsteer: 0.00"
        )

    # ── 自動操縦 ──────────────────────────────────────────

    def _toggle_auto(self) -> None:
        """自動操縦の ON / OFF を切り替える"""
        if self._is_auto:
            self._disable_auto()
        else:
            self._enable_auto()

    def _enable_auto(self) -> None:
        """自動操縦を開始する"""
        self._is_auto = True
        self._pressed_keys.clear()
        self._auto_btn.setText("自動操縦 OFF")
        self._status_label.setText("接続中 (自動操縦)")
        # Pi にパラメータ付きで auto コマンドを送信
        self._params_dirty = True

    def _disable_auto(self) -> None:
        """自動操縦を停止して手動に戻る"""
        self._is_auto = False
        self._throttle = 0.0
        self._steer = 0.0
        self._auto_btn.setText("自動操縦 ON")
        self._status_label.setText("接続中 (手動操作)")
        self._update_control_label()

    # ── 映像更新(テレメトリ受信) ────────────────────────

    def _update_frame(self) -> None:
        """タイマーから呼び出され,テレメトリを受信して表示する"""
        result = self._zmq_client.receive_telemetry()
        if result is None:
            return

        telemetry, frame, binary = result

        # テレメトリから状態を取得
        self._pi_detected = telemetry.get(
            "detected", False,
        )
        self._pi_pos_error = telemetry.get(
            "pos_error", 0.0,
        )
        self._pi_heading = telemetry.get("heading", 0.0)
        self._pi_is_intersection = telemetry.get(
            "is_intersection", False,
        )
        self._pi_is_recovering = telemetry.get(
            "is_recovering", False,
        )
        self._pi_fps = telemetry.get("fps", 0.0)

        # 自動時は Pi の操舵量を表示
        if self._is_auto:
            self._throttle = telemetry.get(
                "throttle", 0.0,
            )
            self._steer = telemetry.get("steer", 0.0)
            self._update_control_label()

        # 十字路パネルの表示更新
        if self._intersection_panel.enabled:
            self._intersection_panel.update_result(
                self._pi_is_intersection,
            )
        else:
            self._intersection_panel.clear_result()

        self._latest_binary = binary

        # 受信 FPS 計測
        self._recv_frame_count += 1
        elapsed = time.time() - self._recv_fps_start
        if elapsed >= 1.0:
            self._recv_fps = (
                self._recv_frame_count / elapsed
            )
            self._recv_frame_count = 0
            self._recv_fps_start = time.time()

        # 検出情報表示
        self._update_detect_info_label()

        # パフォーマンス表示
        self._perf_label.setText(
            f"recv FPS: {self._recv_fps:.1f}"
            f"  Pi FPS: {self._pi_fps:.1f}"
        )

        self._display_frame(frame)

    def _update_detect_info_label(self) -> None:
        """検出情報ラベルを更新する"""
        if not self._pi_detected:
            self._detect_info_label.setText(
                "pos: ---  head: ---"
            )
            return
        self._detect_info_label.setText(
            f"pos: {self._pi_pos_error:+.3f}"
            f"  head: {self._pi_heading:+.4f}"
        )

    def _display_frame(self, frame: np.ndarray) -> None:
        """NumPy 配列の画像を QLabel に表示する

        Args:
            frame: グレースケールの画像
        """
        # グレースケール → BGR 変換(カラーオーバーレイ描画のため)
        bgr = cv2.cvtColor(frame, cv2.COLOR_GRAY2BGR)

        # テレメトリから簡易的な LineDetectResult を構築して
        # オーバーレイ描画に渡す
        detect_result = None
        if self._pi_detected and self._latest_binary is not None:
            detect_result = LineDetectResult(
                detected=True,
                position_error=self._pi_pos_error,
                heading=self._pi_heading,
                curvature=0.0,
                poly_coeffs=None,
                row_centers=None,
                binary_image=self._latest_binary,
            )

        bgr = draw_overlay(
            bgr, detect_result,
            self._overlay_panel.get_flags(),
            is_intersection=self._pi_is_intersection,
        )

        # BGR → RGB 変換
        rgb = bgr[:, :, ::-1].copy()
        h, w, ch = rgb.shape
        image = QImage(
            rgb.data, w, h, ch * w,
            QImage.Format.Format_RGB888,
        )
        disp_w = int(config.FRAME_WIDTH * DISPLAY_SCALE)
        disp_h = int(config.FRAME_HEIGHT * DISPLAY_SCALE)
        pixmap = QPixmap.fromImage(image).scaled(
            disp_w,
            disp_h,
            Qt.AspectRatioMode.KeepAspectRatio,
            Qt.TransformationMode.SmoothTransformation,
        )
        self._video_label.setPixmap(pixmap)

    # ── 手動操作 ──────────────────────────────────────────

    def keyPressEvent(self, event: QKeyEvent) -> None:
        """キー押下時の処理"""
        if event.isAutoRepeat():
            return

        # Q キーで自動操縦切り替え
        if event.key() == Qt.Key.Key_Q:
            if self._is_connected:
                self._toggle_auto()
            return

        # 自動操縦中はキー操作を無視
        if self._is_auto:
            return

        self._pressed_keys.add(event.key())
        self._update_manual_control()

    def keyReleaseEvent(self, event: QKeyEvent) -> None:
        """キー離上時に操舵量を更新する"""
        if event.isAutoRepeat():
            return

        if self._is_auto:
            return
        self._pressed_keys.discard(event.key())
        self._update_manual_control()

    def _update_manual_control(self) -> None:
        """押下中のキーから throttle と steer を計算する"""
        keys = self._pressed_keys

        # Space で緊急停止
        if Qt.Key.Key_Space in keys:
            self._throttle = 0.0
            self._steer = 0.0
            self._pressed_keys.clear()
            if self._is_auto:
                self._disable_auto()
            self._update_control_label()
            return

        # throttle: W/↑ で前進,S/↓ で後退
        forward = (
            Qt.Key.Key_W in keys or Qt.Key.Key_Up in keys
        )
        backward = (
            Qt.Key.Key_S in keys
            or Qt.Key.Key_Down in keys
        )
        if forward and not backward:
            self._throttle = MANUAL_THROTTLE
        elif backward and not forward:
            self._throttle = -MANUAL_THROTTLE
        else:
            self._throttle = 0.0

        # steer: A/← で左,D/→ で右
        left = (
            Qt.Key.Key_A in keys
            or Qt.Key.Key_Left in keys
        )
        right = (
            Qt.Key.Key_D in keys
            or Qt.Key.Key_Right in keys
        )
        if left and not right:
            self._steer = -MANUAL_STEER
        elif right and not left:
            self._steer = MANUAL_STEER
        else:
            self._steer = 0.0

        self._update_control_label()

    def _update_control_label(self) -> None:
        """操舵量の表示を更新する"""
        text = (
            f"throttle: {self._throttle:+.2f}\n"
            f"steer: {self._steer:+.2f}"
        )
        if self._is_auto and self._pi_is_intersection:
            text += "\n[十字路]"
        if self._is_auto and self._pi_is_recovering:
            text += "\n[復帰中]"
        self._control_label.setText(text)

    def _send_command(self) -> None:
        """コマンドを Pi に送信する"""
        if not self._is_connected:
            return

        cmd: dict = {}

        # モード
        if self._is_auto:
            cmd["mode"] = "auto"
        elif (
            self._throttle != 0.0
            or self._steer != 0.0
        ):
            cmd["mode"] = "manual"
            cmd["throttle"] = self._throttle
            cmd["steer"] = self._steer
        else:
            cmd["mode"] = "stop"

        # 十字路設定
        cmd["intersection_enabled"] = (
            self._intersection_panel.enabled
        )
        cmd["intersection_throttle"] = (
            self._intersection_panel.throttle
        )

        # 制御手法
        cmd["steering_method"] = self._steering_method

        # パラメータ更新(変更があった場合のみ)
        if self._params_dirty:
            cmd["image_params"] = dataclasses.asdict(
                self._image_params,
            )
            cmd["pd_params"] = dataclasses.asdict(
                self._pd_params,
            )
            cmd["pursuit_params"] = dataclasses.asdict(
                self._pursuit_params,
            )
            cmd["steering_params"] = dataclasses.asdict(
                self._ts_pd_params,
            )
            cmd["recovery_params"] = dataclasses.asdict(
                self._recovery_params,
            )
            self._params_dirty = False

        self._zmq_client.send_command(cmd)

    def closeEvent(self, event) -> None:
        """ウィンドウを閉じるときに通信を停止する"""
        if self._is_connected:
            self._disconnect()
        event.accept()