Newer
Older
RobotCar / src / pc / gui / main_window.py
"""
main_window
PC 側のメインウィンドウを定義するモジュール
カメラ映像のリアルタイム表示と操作 UI を提供する
"""

import numpy as np
from PySide6.QtCore import Qt, QTimer
from PySide6.QtGui import QImage, QKeyEvent, QPixmap
from PySide6.QtWidgets import (
    QCheckBox,
    QDoubleSpinBox,
    QFormLayout,
    QGroupBox,
    QHBoxLayout,
    QLabel,
    QMainWindow,
    QPushButton,
    QVBoxLayout,
    QWidget,
)

from common import config
from pc.comm.zmq_client import PcZmqClient
from pc.steering.pd_control import PdControl, PdParams
from pc.vision.line_detector import LineDetectResult, detect_line
from pc.vision.overlay import OverlayFlags, draw_overlay

# 映像更新間隔 (ms)
FRAME_INTERVAL_MS: int = 33

# 操舵量送信間隔 (ms)
CONTROL_INTERVAL_MS: int = int(
    1000 / config.CONTROL_PUBLISH_HZ
)

# 映像表示のスケール倍率
DISPLAY_SCALE: float = 2.0

# 手動操作の throttle / steer 量
MANUAL_THROTTLE: float = 0.5
MANUAL_STEER: float = 0.4


class MainWindow(QMainWindow):
    """PC 側のメインウィンドウ"""

    def __init__(self) -> None:
        super().__init__()
        self._zmq_client = PcZmqClient()
        self._is_connected = False
        self._is_auto = False

        # 手動操作の状態
        self._pressed_keys: set[int] = set()
        self._throttle: float = 0.0
        self._steer: float = 0.0

        # 自動操縦
        self._pd_control = PdControl()

        # 最新フレームの保持(自動操縦で使用)
        self._latest_frame: np.ndarray | None = None

        # オーバーレイ
        self._overlay_flags = OverlayFlags()
        self._last_detect_result: LineDetectResult | None = None

        self._setup_ui()
        self._setup_timers()

    def _setup_ui(self) -> None:
        """UI を構築する"""
        self.setWindowTitle("RobotCar Controller")

        # 中央ウィジェット
        central = QWidget()
        self.setCentralWidget(central)
        root_layout = QHBoxLayout(central)

        # 左側: 映像表示
        self._video_label = QLabel("カメラ映像待機中...")
        self._video_label.setAlignment(
            Qt.AlignmentFlag.AlignCenter,
        )
        self._video_label.setMinimumSize(
            int(config.FRAME_WIDTH * DISPLAY_SCALE),
            int(config.FRAME_HEIGHT * DISPLAY_SCALE),
        )
        self._video_label.setStyleSheet(
            "background-color: #222;"
            " color: #aaa; font-size: 16px;"
        )
        root_layout.addWidget(self._video_label, stretch=3)

        # 右側: コントロールパネル
        control_layout = QVBoxLayout()
        root_layout.addLayout(control_layout, stretch=1)

        # 接続ボタン
        self._connect_btn = QPushButton("接続開始")
        self._connect_btn.clicked.connect(
            self._toggle_connection,
        )
        control_layout.addWidget(self._connect_btn)

        # 自動操縦ボタン
        self._auto_btn = QPushButton("自動操縦 ON")
        self._auto_btn.setEnabled(False)
        self._auto_btn.clicked.connect(
            self._toggle_auto,
        )
        control_layout.addWidget(self._auto_btn)

        # ステータス表示
        self._status_label = QLabel("未接続")
        self._status_label.setAlignment(
            Qt.AlignmentFlag.AlignCenter,
        )
        control_layout.addWidget(self._status_label)

        # 操舵量表示
        self._control_label = QLabel(
            "throttle: 0.00\nsteer: 0.00"
        )
        self._control_label.setAlignment(
            Qt.AlignmentFlag.AlignCenter,
        )
        self._control_label.setStyleSheet("font-size: 14px;")
        control_layout.addWidget(self._control_label)

        # PD パラメータ調整
        self._setup_param_ui(control_layout)

        # デバッグ表示
        self._setup_overlay_ui(control_layout)

        # 操作ガイド
        guide = QLabel(
            "--- キー操作 ---\n"
            "W/↑: 前進  S/↓: 後退\n"
            "A/←: 左    D/→: 右\n"
            "Space: 停止\n"
            "Q: 自動操縦 切替"
        )
        guide.setAlignment(Qt.AlignmentFlag.AlignLeft)
        guide.setStyleSheet("font-size: 12px; color: #666;")
        control_layout.addWidget(guide)

        # 余白を下に詰める
        control_layout.addStretch()

    def _setup_param_ui(
        self, parent_layout: QVBoxLayout,
    ) -> None:
        """PD パラメータ調整 UI を構築する"""
        group = QGroupBox("PD パラメータ")
        form = QFormLayout()
        group.setLayout(form)

        params = self._pd_control.params

        self._spin_kp = self._create_spin(
            params.kp, 0.0, 5.0, 0.05,
        )
        form.addRow("Kp:", self._spin_kp)

        self._spin_kd = self._create_spin(
            params.kd, 0.0, 5.0, 0.05,
        )
        form.addRow("Kd:", self._spin_kd)

        self._spin_alpha = self._create_spin(
            params.alpha, 0.0, 1.0, 0.1,
        )
        form.addRow("α (近方):", self._spin_alpha)

        self._spin_beta = self._create_spin(
            params.beta, 0.0, 1.0, 0.1,
        )
        form.addRow("β (遠方):", self._spin_beta)

        self._spin_max_throttle = self._create_spin(
            params.max_throttle, 0.0, 1.0, 0.05,
        )
        form.addRow("最大速度:", self._spin_max_throttle)

        self._spin_speed_k = self._create_spin(
            params.speed_k, 0.0, 2.0, 0.05,
        )
        form.addRow("減速係数:", self._spin_speed_k)

        # パラメータ変更時のコールバック
        self._spin_kp.valueChanged.connect(
            self._on_param_changed,
        )
        self._spin_kd.valueChanged.connect(
            self._on_param_changed,
        )
        self._spin_alpha.valueChanged.connect(
            self._on_param_changed,
        )
        self._spin_beta.valueChanged.connect(
            self._on_param_changed,
        )
        self._spin_max_throttle.valueChanged.connect(
            self._on_param_changed,
        )
        self._spin_speed_k.valueChanged.connect(
            self._on_param_changed,
        )

        parent_layout.addWidget(group)

    def _setup_overlay_ui(
        self, parent_layout: QVBoxLayout,
    ) -> None:
        """デバッグ表示のチェックボックス UI を構築する"""
        group = QGroupBox("デバッグ表示")
        layout = QVBoxLayout()
        group.setLayout(layout)

        items = [
            ("二値化画像", "binary"),
            ("近方領域", "near_region"),
            ("遠方領域", "far_region"),
            ("重心マーカー", "centroids"),
            ("中心線", "center_line"),
            ("偏差値", "error_text"),
        ]
        for label, attr in items:
            cb = QCheckBox(label)
            cb.toggled.connect(
                lambda checked, a=attr:
                    setattr(self._overlay_flags, a, checked),
            )
            layout.addWidget(cb)

        parent_layout.addWidget(group)

    def _has_any_overlay(self) -> bool:
        """いずれかのオーバーレイが有効かを返す"""
        f = self._overlay_flags
        return (
            f.binary or f.near_region or f.far_region
            or f.centroids or f.center_line
            or f.error_text
        )

    @staticmethod
    def _create_spin(
        value: float, min_val: float,
        max_val: float, step: float,
    ) -> QDoubleSpinBox:
        """パラメータ用の SpinBox を作成する"""
        spin = QDoubleSpinBox()
        spin.setRange(min_val, max_val)
        spin.setSingleStep(step)
        spin.setDecimals(3)
        spin.setValue(value)
        return spin

    def _on_param_changed(self) -> None:
        """パラメータ SpinBox の値が変更されたときに反映する"""
        p = self._pd_control.params
        p.kp = self._spin_kp.value()
        p.kd = self._spin_kd.value()
        p.alpha = self._spin_alpha.value()
        p.beta = self._spin_beta.value()
        p.max_throttle = self._spin_max_throttle.value()
        p.speed_k = self._spin_speed_k.value()

    def _setup_timers(self) -> None:
        """タイマーを設定する"""
        # 映像更新用
        self._frame_timer = QTimer(self)
        self._frame_timer.timeout.connect(self._update_frame)

        # 操舵量送信用
        self._control_timer = QTimer(self)
        self._control_timer.timeout.connect(
            self._send_control,
        )

    # ── 接続 ──────────────────────────────────────────────

    def _toggle_connection(self) -> None:
        """接続 / 切断を切り替える"""
        if self._is_connected:
            self._disconnect()
        else:
            self._connect()

    def _connect(self) -> None:
        """ZMQ 通信を開始して映像受信を始める"""
        self._zmq_client.start()
        self._is_connected = True
        self._connect_btn.setText("切断")
        self._auto_btn.setEnabled(True)
        self._status_label.setText("接続中 (手動操作)")
        self._frame_timer.start(FRAME_INTERVAL_MS)
        self._control_timer.start(CONTROL_INTERVAL_MS)

    def _disconnect(self) -> None:
        """ZMQ 通信を停止する"""
        self._frame_timer.stop()
        self._control_timer.stop()
        if self._is_auto:
            self._is_auto = False
            self._auto_btn.setText("自動操縦 ON")
        self._zmq_client.stop()
        self._is_connected = False
        self._auto_btn.setEnabled(False)
        self._pressed_keys.clear()
        self._throttle = 0.0
        self._steer = 0.0
        self._latest_frame = None
        self._connect_btn.setText("接続開始")
        self._status_label.setText("未接続")
        self._video_label.setText("カメラ映像待機中...")
        self._control_label.setText(
            "throttle: 0.00\nsteer: 0.00"
        )

    # ── 自動操縦 ──────────────────────────────────────────

    def _toggle_auto(self) -> None:
        """自動操縦の ON / OFF を切り替える"""
        if self._is_auto:
            self._disable_auto()
        else:
            self._enable_auto()

    def _enable_auto(self) -> None:
        """自動操縦を開始する"""
        self._is_auto = True
        self._pd_control.reset()
        self._pressed_keys.clear()
        self._auto_btn.setText("自動操縦 OFF")
        self._status_label.setText("接続中 (自動操縦)")

    def _disable_auto(self) -> None:
        """自動操縦を停止して手動に戻る"""
        self._is_auto = False
        self._throttle = 0.0
        self._steer = 0.0
        self._auto_btn.setText("自動操縦 ON")
        self._status_label.setText("接続中 (手動操作)")
        self._update_control_label()

    # ── 映像更新 ──────────────────────────────────────────

    def _update_frame(self) -> None:
        """タイマーから呼び出され,最新フレームを表示する"""
        frame = self._zmq_client.receive_image()
        if frame is None:
            return
        self._latest_frame = frame

        # 自動操縦時は操舵量を計算
        if self._is_auto:
            output = self._pd_control.compute(frame)
            self._throttle = output.throttle
            self._steer = output.steer
            self._update_control_label()
            self._last_detect_result = (
                self._pd_control.last_detect_result
            )
        elif self._has_any_overlay():
            # 手動操作中でもオーバーレイ用に線検出を実行
            self._last_detect_result = detect_line(frame)
        else:
            self._last_detect_result = None

        self._display_frame(frame)

    def _display_frame(self, frame: np.ndarray) -> None:
        """NumPy 配列の画像を QLabel に表示する

        Args:
            frame: BGR 形式の画像
        """
        # オーバーレイ描画
        frame = draw_overlay(
            frame, self._last_detect_result,
            self._overlay_flags,
        )

        # BGR → RGB 変換
        rgb = frame[:, :, ::-1].copy()
        h, w, ch = rgb.shape
        image = QImage(
            rgb.data, w, h, ch * w,
            QImage.Format.Format_RGB888,
        )
        # スケーリングして表示
        scaled_w = int(w * DISPLAY_SCALE)
        scaled_h = int(h * DISPLAY_SCALE)
        pixmap = QPixmap.fromImage(image).scaled(
            scaled_w,
            scaled_h,
            Qt.AspectRatioMode.KeepAspectRatio,
            Qt.TransformationMode.SmoothTransformation,
        )
        self._video_label.setPixmap(pixmap)

    # ── 手動操作 ──────────────────────────────────────────

    def keyPressEvent(self, event: QKeyEvent) -> None:
        """キー押下時の処理"""
        if event.isAutoRepeat():
            return

        # Q キーで自動操縦切り替え
        if event.key() == Qt.Key.Key_Q:
            if self._is_connected:
                self._toggle_auto()
            return

        # 自動操縦中はキー操作を無視
        if self._is_auto:
            return

        self._pressed_keys.add(event.key())
        self._update_manual_control()

    def keyReleaseEvent(self, event: QKeyEvent) -> None:
        """キー離上時に操舵量を更新する"""
        if event.isAutoRepeat() or self._is_auto:
            return
        self._pressed_keys.discard(event.key())
        self._update_manual_control()

    def _update_manual_control(self) -> None:
        """押下中のキーから throttle と steer を計算する"""
        keys = self._pressed_keys

        # Space で緊急停止
        if Qt.Key.Key_Space in keys:
            self._throttle = 0.0
            self._steer = 0.0
            self._pressed_keys.clear()
            # 自動操縦中なら停止
            if self._is_auto:
                self._disable_auto()
            self._update_control_label()
            return

        # throttle: W/↑ で前進,S/↓ で後退
        forward = (
            Qt.Key.Key_W in keys
            or Qt.Key.Key_Up in keys
        )
        backward = (
            Qt.Key.Key_S in keys
            or Qt.Key.Key_Down in keys
        )
        if forward and not backward:
            self._throttle = MANUAL_THROTTLE
        elif backward and not forward:
            self._throttle = -MANUAL_THROTTLE
        else:
            self._throttle = 0.0

        # steer: A/← で左,D/→ で右
        left = (
            Qt.Key.Key_A in keys
            or Qt.Key.Key_Left in keys
        )
        right = (
            Qt.Key.Key_D in keys
            or Qt.Key.Key_Right in keys
        )
        if left and not right:
            self._steer = -MANUAL_STEER
        elif right and not left:
            self._steer = MANUAL_STEER
        else:
            self._steer = 0.0

        self._update_control_label()

    def _update_control_label(self) -> None:
        """操舵量の表示を更新する"""
        self._control_label.setText(
            f"throttle: {self._throttle:+.2f}\n"
            f"steer: {self._steer:+.2f}"
        )

    def _send_control(self) -> None:
        """操舵量を Pi に送信する"""
        if not self._is_connected:
            return
        self._zmq_client.send_control(
            self._throttle, self._steer,
        )

    def closeEvent(self, event) -> None:
        """ウィンドウを閉じるときに通信を停止する"""
        if self._is_connected:
            self._disconnect()
        event.accept()