Newer
Older
RobotCar / src / pc / gui / main_window.py
"""
main_window
PC 側のメインウィンドウを定義するモジュール
カメラ映像のリアルタイム表示と操作 UI を提供する
"""

import cv2
import numpy as np
from PySide6.QtCore import Qt, QTimer
from PySide6.QtGui import QImage, QKeyEvent, QPixmap
from PySide6.QtWidgets import (
    QHBoxLayout,
    QLabel,
    QMainWindow,
    QPushButton,
    QScrollArea,
    QVBoxLayout,
    QWidget,
)

from common import config
from pc.comm.zmq_client import PcZmqClient
from pc.gui.panels import (
    ControlParamPanel,
    ImageParamPanel,
    OverlayPanel,
)
from pc.steering.auto_params import (
    load_control,
    load_detect_params,
    save_control,
)
from pc.steering.base import SteeringBase
from pc.steering.pd_control import PdControl, PdParams
from pc.steering.pursuit_control import (
    PursuitControl,
    PursuitParams,
)
from pc.vision.fitting import theil_sen_fit
from pc.vision.line_detector import (
    ImageParams,
    LineDetectResult,
    detect_line,
)
from pc.vision.overlay import draw_overlay

# 映像更新間隔 (ms)
FRAME_INTERVAL_MS: int = 33

# 操舵量送信間隔 (ms)
CONTROL_INTERVAL_MS: int = int(
    1000 / config.CONTROL_PUBLISH_HZ
)

# 映像表示のスケール倍率(40x30 → 640x480 相当)
DISPLAY_SCALE: float = 16.0

# 手動操作の throttle / steer 量
MANUAL_THROTTLE: float = 0.5
MANUAL_STEER: float = 0.4


class MainWindow(QMainWindow):
    """PC 側のメインウィンドウ"""

    def __init__(self) -> None:
        super().__init__()
        self._zmq_client = PcZmqClient()
        self._is_connected = False
        self._is_auto = False

        # 手動操作の状態
        self._pressed_keys: set[int] = set()
        self._throttle: float = 0.0
        self._steer: float = 0.0

        # 前回のパラメータを復元
        pd_params, last_method = load_control()
        image_params = load_detect_params(last_method)
        self._pd_control = PdControl(
            params=pd_params,
            image_params=image_params,
        )
        self._pursuit_control = PursuitControl(
            image_params=image_params,
        )

        # 現在の制御手法("pd" or "pursuit")
        self._steering_method: str = "pd"

        # 最新フレームの保持(自動操縦で使用)
        self._latest_frame: np.ndarray | None = None

        # 検出結果の保持
        self._last_detect_result: LineDetectResult | None = (
            None
        )

        self._setup_ui()
        self._setup_timers()

    def _setup_ui(self) -> None:
        """UI を構築する"""
        self.setWindowTitle("RobotCar Controller")

        central = QWidget()
        self.setCentralWidget(central)
        root_layout = QHBoxLayout(central)

        # 左側: 映像表示 + 検出情報
        left_layout = QVBoxLayout()

        self._video_label = QLabel("カメラ映像待機中...")
        self._video_label.setAlignment(
            Qt.AlignmentFlag.AlignCenter,
        )
        self._video_label.setMinimumSize(
            int(config.FRAME_WIDTH * DISPLAY_SCALE),
            int(config.FRAME_HEIGHT * DISPLAY_SCALE),
        )
        self._video_label.setStyleSheet(
            "background-color: #222;"
            " color: #aaa; font-size: 16px;"
        )
        left_layout.addWidget(self._video_label)

        self._detect_info_label = QLabel(
            "pos: ---  head: ---  curv: ---"
        )
        self._detect_info_label.setAlignment(
            Qt.AlignmentFlag.AlignLeft,
        )
        self._detect_info_label.setStyleSheet(
            "font-size: 14px; font-family: monospace;"
            " color: #0f0; background-color: #222;"
            " padding: 4px;"
        )
        left_layout.addWidget(self._detect_info_label)
        root_layout.addLayout(left_layout, stretch=3)

        # 右側: スクロール可能なコントロールパネル
        scroll = QScrollArea()
        scroll.setWidgetResizable(True)
        scroll.setHorizontalScrollBarPolicy(
            Qt.ScrollBarPolicy.ScrollBarAlwaysOff,
        )
        control_widget = QWidget()
        control_layout = QVBoxLayout(control_widget)
        scroll.setWidget(control_widget)
        root_layout.addWidget(scroll, stretch=1)

        # 接続ボタン
        self._connect_btn = QPushButton("接続開始")
        self._connect_btn.clicked.connect(
            self._toggle_connection,
        )
        control_layout.addWidget(self._connect_btn)

        # 自動操縦ボタン
        self._auto_btn = QPushButton("自動操縦 ON")
        self._auto_btn.setEnabled(False)
        self._auto_btn.clicked.connect(self._toggle_auto)
        control_layout.addWidget(self._auto_btn)

        # ステータス表示
        self._status_label = QLabel("未接続")
        self._status_label.setAlignment(
            Qt.AlignmentFlag.AlignCenter,
        )
        control_layout.addWidget(self._status_label)

        # 操舵量表示
        self._control_label = QLabel(
            "throttle: 0.00\nsteer: 0.00"
        )
        self._control_label.setAlignment(
            Qt.AlignmentFlag.AlignCenter,
        )
        self._control_label.setStyleSheet(
            "font-size: 14px;",
        )
        control_layout.addWidget(self._control_label)

        # 画像処理パラメータパネル
        self._image_panel = ImageParamPanel(
            self._pd_control.image_params,
        )
        self._image_panel.image_params_changed.connect(
            self._on_image_params_changed,
        )
        self._image_panel.method_changed.connect(
            self._on_method_changed,
        )
        control_layout.addWidget(self._image_panel)

        # 制御パラメータパネル
        self._control_panel = ControlParamPanel(
            self._pd_control.params,
            self._pursuit_control.params,
        )
        self._control_panel.pd_params_changed.connect(
            self._on_pd_params_changed,
        )
        self._control_panel.pursuit_params_changed.connect(
            self._on_pursuit_params_changed,
        )
        self._control_panel.steering_method_changed.connect(
            self._on_steering_method_changed,
        )
        control_layout.addWidget(self._control_panel)

        # デバッグ表示パネル
        self._overlay_panel = OverlayPanel()
        control_layout.addWidget(self._overlay_panel)

        # 操作ガイド
        guide = QLabel(
            "--- キー操作 ---\n"
            "W/↑: 前進  S/↓: 後退\n"
            "A/←: 左    D/→: 右\n"
            "Space: 停止\n"
            "Q: 自動操縦 切替"
        )
        guide.setAlignment(Qt.AlignmentFlag.AlignLeft)
        guide.setStyleSheet("font-size: 12px; color: #666;")
        control_layout.addWidget(guide)

        control_layout.addStretch()

    @property
    def _active_control(self) -> SteeringBase:
        """現在選択中の制御クラスを返す"""
        if self._steering_method == "pursuit":
            return self._pursuit_control
        return self._pd_control

    def _setup_timers(self) -> None:
        """タイマーを設定する"""
        self._frame_timer = QTimer(self)
        self._frame_timer.timeout.connect(self._update_frame)

        self._control_timer = QTimer(self)
        self._control_timer.timeout.connect(
            self._send_control,
        )

    # ── パネルシグナルのスロット ───────────────────────────

    def _on_image_params_changed(
        self, ip: ImageParams,
    ) -> None:
        """画像処理パラメータの変更を両制御クラスに反映する"""
        self._pd_control.image_params = ip
        self._pursuit_control.image_params = ip

    def _on_method_changed(self, method: str) -> None:
        """検出手法の変更に合わせて制御設定を保存する"""
        save_control(self._pd_control.params, method)

    def _on_pd_params_changed(self, p: PdParams) -> None:
        """PD パラメータの変更を制御クラスに反映して保存する"""
        self._pd_control.params = p
        save_control(
            p, self._pd_control.image_params.method,
        )

    def _on_pursuit_params_changed(
        self, p: PursuitParams,
    ) -> None:
        """Pursuit パラメータの変更を制御クラスに反映する"""
        self._pursuit_control.params = p

    def _on_steering_method_changed(
        self, method: str,
    ) -> None:
        """制御手法の切替を反映する"""
        self._steering_method = method

    # ── 接続 ──────────────────────────────────────────────

    def _toggle_connection(self) -> None:
        """接続 / 切断を切り替える"""
        if self._is_connected:
            self._disconnect()
        else:
            self._connect()

    def _connect(self) -> None:
        """ZMQ 通信を開始して映像受信を始める"""
        self._zmq_client.start()
        self._is_connected = True
        self._connect_btn.setText("切断")
        self._auto_btn.setEnabled(True)
        self._status_label.setText("接続中 (手動操作)")
        self._frame_timer.start(FRAME_INTERVAL_MS)
        self._control_timer.start(CONTROL_INTERVAL_MS)

    def _disconnect(self) -> None:
        """ZMQ 通信を停止する"""
        self._frame_timer.stop()
        self._control_timer.stop()
        if self._is_auto:
            self._is_auto = False
            self._auto_btn.setText("自動操縦 ON")
        self._zmq_client.stop()
        self._is_connected = False
        self._auto_btn.setEnabled(False)
        self._pressed_keys.clear()
        self._throttle = 0.0
        self._steer = 0.0
        self._latest_frame = None
        self._connect_btn.setText("接続開始")
        self._status_label.setText("未接続")
        self._video_label.setText("カメラ映像待機中...")
        self._control_label.setText(
            "throttle: 0.00\nsteer: 0.00"
        )

    # ── 自動操縦 ──────────────────────────────────────────

    def _toggle_auto(self) -> None:
        """自動操縦の ON / OFF を切り替える"""
        if self._is_auto:
            self._disable_auto()
        else:
            self._enable_auto()

    def _enable_auto(self) -> None:
        """自動操縦を開始する"""
        self._is_auto = True
        self._active_control.reset()
        self._pressed_keys.clear()
        self._auto_btn.setText("自動操縦 OFF")
        self._status_label.setText("接続中 (自動操縦)")

    def _disable_auto(self) -> None:
        """自動操縦を停止して手動に戻る"""
        self._is_auto = False
        self._throttle = 0.0
        self._steer = 0.0
        self._auto_btn.setText("自動操縦 ON")
        self._status_label.setText("接続中 (手動操作)")
        self._update_control_label()

    # ── 映像更新 ──────────────────────────────────────────

    def _update_frame(self) -> None:
        """タイマーから呼び出され,最新フレームを表示する"""
        frame = self._zmq_client.receive_image()
        if frame is None:
            return
        self._latest_frame = frame

        # 線検出は常に実行(検出情報ラベル表示のため)
        if self._is_auto:
            ctrl = self._active_control
            output = ctrl.compute(frame)
            self._throttle = output.throttle
            self._steer = output.steer
            self._update_control_label()
            self._last_detect_result = (
                ctrl.last_detect_result
            )
        else:
            self._last_detect_result = detect_line(
                frame,
                self._pd_control.image_params,
            )

        self._display_frame(frame)

    def _update_detect_info_label(self) -> None:
        """検出情報ラベルを更新する"""
        r = self._last_detect_result
        if r is None or not r.detected:
            self._detect_info_label.setText(
                "pos: ---  head: ---  curv: ---"
            )
            return
        self._detect_info_label.setText(
            f"pos: {r.position_error:+.3f}"
            f"  head: {r.heading:+.4f}"
            f"  curv: {r.curvature:+.6f}"
        )

    def _calc_pursuit_points_preview(
        self,
    ) -> (
        tuple[tuple[float, float], tuple[float, float]]
        | None
    ):
        """手動操作中にパシュート目標点を算出する

        Returns:
            ((near_x, near_y), (far_x, far_y)) または None
        """
        r = self._last_detect_result
        if r is None or not r.detected:
            return None
        if r.row_centers is None:
            return None

        centers = r.row_centers
        valid = ~np.isnan(centers)
        ys = np.where(valid)[0].astype(float)
        xs = centers[valid]
        if len(ys) < 2:
            return None

        slope, intercept = theil_sen_fit(ys, xs)
        h = len(centers)
        p = self._pursuit_control.params
        near_y = h * p.near_ratio
        far_y = h * p.far_ratio
        near_x = slope * near_y + intercept
        far_x = slope * far_y + intercept
        return ((near_x, near_y), (far_x, far_y))

    def _display_frame(self, frame: np.ndarray) -> None:
        """NumPy 配列の画像を QLabel に表示する

        Args:
            frame: グレースケールの画像
        """
        # グレースケール → BGR 変換(カラーオーバーレイ描画のため)
        bgr = cv2.cvtColor(frame, cv2.COLOR_GRAY2BGR)

        # オーバーレイ描画
        pursuit_pts = None
        if self._steering_method == "pursuit":
            if self._is_auto:
                pursuit_pts = (
                    self._pursuit_control
                    .last_pursuit_points
                )
            else:
                pursuit_pts = (
                    self._calc_pursuit_points_preview()
                )
        bgr = draw_overlay(
            bgr, self._last_detect_result,
            self._overlay_panel.get_flags(),
            pursuit_points=pursuit_pts,
        )

        # 検出情報をラベルに表示
        self._update_detect_info_label()

        # BGR → RGB 変換
        rgb = bgr[:, :, ::-1].copy()
        h, w, ch = rgb.shape
        image = QImage(
            rgb.data, w, h, ch * w,
            QImage.Format.Format_RGB888,
        )
        disp_w = int(config.FRAME_WIDTH * DISPLAY_SCALE)
        disp_h = int(config.FRAME_HEIGHT * DISPLAY_SCALE)
        pixmap = QPixmap.fromImage(image).scaled(
            disp_w,
            disp_h,
            Qt.AspectRatioMode.KeepAspectRatio,
            Qt.TransformationMode.SmoothTransformation,
        )
        self._video_label.setPixmap(pixmap)

    # ── 手動操作 ──────────────────────────────────────────

    def keyPressEvent(self, event: QKeyEvent) -> None:
        """キー押下時の処理"""
        if event.isAutoRepeat():
            return

        # Q キーで自動操縦切り替え
        if event.key() == Qt.Key.Key_Q:
            if self._is_connected:
                self._toggle_auto()
            return

        # 自動操縦中はキー操作を無視
        if self._is_auto:
            return

        self._pressed_keys.add(event.key())
        self._update_manual_control()

    def keyReleaseEvent(self, event: QKeyEvent) -> None:
        """キー離上時に操舵量を更新する"""
        if event.isAutoRepeat() or self._is_auto:
            return
        self._pressed_keys.discard(event.key())
        self._update_manual_control()

    def _update_manual_control(self) -> None:
        """押下中のキーから throttle と steer を計算する"""
        keys = self._pressed_keys

        # Space で緊急停止
        if Qt.Key.Key_Space in keys:
            self._throttle = 0.0
            self._steer = 0.0
            self._pressed_keys.clear()
            if self._is_auto:
                self._disable_auto()
            self._update_control_label()
            return

        # throttle: W/↑ で前進,S/↓ で後退
        forward = (
            Qt.Key.Key_W in keys or Qt.Key.Key_Up in keys
        )
        backward = (
            Qt.Key.Key_S in keys
            or Qt.Key.Key_Down in keys
        )
        if forward and not backward:
            self._throttle = MANUAL_THROTTLE
        elif backward and not forward:
            self._throttle = -MANUAL_THROTTLE
        else:
            self._throttle = 0.0

        # steer: A/← で左,D/→ で右
        left = (
            Qt.Key.Key_A in keys
            or Qt.Key.Key_Left in keys
        )
        right = (
            Qt.Key.Key_D in keys
            or Qt.Key.Key_Right in keys
        )
        if left and not right:
            self._steer = -MANUAL_STEER
        elif right and not left:
            self._steer = MANUAL_STEER
        else:
            self._steer = 0.0

        self._update_control_label()

    def _update_control_label(self) -> None:
        """操舵量の表示を更新する"""
        self._control_label.setText(
            f"throttle: {self._throttle:+.2f}\n"
            f"steer: {self._steer:+.2f}"
        )

    def _send_control(self) -> None:
        """操舵量を Pi に送信する"""
        if not self._is_connected:
            return
        self._zmq_client.send_control(
            self._throttle, self._steer,
        )

    def closeEvent(self, event) -> None:
        """ウィンドウを閉じるときに通信を停止する"""
        if self._is_connected:
            self._disconnect()
        event.accept()