"""
main_window
PC 側のメインウィンドウを定義するモジュール
Pi からのテレメトリをリアルタイム表示し,
モード切替・パラメータ調整・手動操作のコマンドを送信する
"""
from PySide6.QtCore import Qt, QTimer
from PySide6.QtGui import QKeyEvent
from PySide6.QtWidgets import (
QHBoxLayout,
QLabel,
QMainWindow,
QPushButton,
QScrollArea,
QVBoxLayout,
QWidget,
)
from common import config
from pc.comm.zmq_client import PcZmqClient
from pc.gui.command_sender import CommandSender
from pc.gui.manual_controller import ManualController
from pc.gui.panels import (
ControlParamPanel,
ImageParamPanel,
IntersectionPanel,
OverlayPanel,
RecoveryPanel,
)
from pc.gui.telemetry_display import TelemetryDisplay
from pc.steering.auto_params import (
load_control,
load_detect_params,
load_overlay,
load_pursuit,
load_recovery,
load_ts_pd,
save_control,
save_overlay,
save_pursuit,
save_recovery,
save_ts_pd,
)
from common.steering.pd_control import PdParams
from common.steering.pursuit_control import PursuitParams
from common.steering.recovery import RecoveryParams
from common.steering.ts_pd_control import TsPdParams
from common.vision.line_detector import ImageParams
# 映像更新間隔 (ms)
FRAME_INTERVAL_MS: int = 33
# コマンド送信間隔 (ms)
COMMAND_INTERVAL_MS: int = int(
1000 / config.CONTROL_PUBLISH_HZ
)
# 映像表示のスケール倍率(40x30 → 640x480 相当)
DISPLAY_SCALE: float = config.DISPLAY_SCALE
class MainWindow(QMainWindow):
"""PC 側のメインウィンドウ
Pi からテレメトリを受信して映像・状態を表示し,
コマンドを Pi に送信する
"""
def __init__(self) -> None:
super().__init__()
self._zmq_client = PcZmqClient()
self._is_connected = False
self._is_auto = False
# 分割されたコンポーネント
self._manual = ManualController()
self._cmd_sender = CommandSender(
self._zmq_client,
)
# 前回のパラメータを復元
pd_params, last_method, last_steering = (
load_control()
)
image_params = load_detect_params(last_method)
self._image_params = image_params
self._pd_params = pd_params
pursuit_params = load_pursuit()
self._pursuit_params = pursuit_params
ts_pd_params = load_ts_pd()
self._ts_pd_params = ts_pd_params
self._steering_method: str = last_steering
# 復帰パラメータ
recovery_params = load_recovery()
self._recovery_params = recovery_params
self._setup_ui()
# TelemetryDisplay はウィジェット生成後に初期化
self._telemetry = TelemetryDisplay(
self._zmq_client,
self._video_label,
self._detect_info_label,
self._perf_label,
)
self._setup_timers()
def _setup_ui(self) -> None:
"""UI を構築する"""
self.setWindowTitle("RobotCar Controller")
central = QWidget()
self.setCentralWidget(central)
root_layout = QHBoxLayout(central)
# 左側: 映像表示 + 検出情報
left_layout = QVBoxLayout()
self._video_label = QLabel("カメラ映像待機中...")
self._video_label.setAlignment(
Qt.AlignmentFlag.AlignCenter,
)
self._video_label.setMinimumSize(
int(config.FRAME_WIDTH * DISPLAY_SCALE),
int(config.FRAME_HEIGHT * DISPLAY_SCALE),
)
self._video_label.setStyleSheet(
"background-color: #222;"
" color: #aaa; font-size: 16px;"
)
left_layout.addWidget(self._video_label)
self._detect_info_label = QLabel(
"pos: --- head: ---"
)
self._detect_info_label.setAlignment(
Qt.AlignmentFlag.AlignLeft,
)
self._detect_info_label.setStyleSheet(
"font-size: 14px; font-family: monospace;"
" color: #0f0; background-color: #222;"
" padding: 4px;"
)
left_layout.addWidget(self._detect_info_label)
self._perf_label = QLabel(
"recv FPS: --- Pi FPS: ---"
)
self._perf_label.setAlignment(
Qt.AlignmentFlag.AlignLeft,
)
self._perf_label.setStyleSheet(
"font-size: 14px; font-family: monospace;"
" color: #ff0; background-color: #222;"
" padding: 4px;"
)
left_layout.addWidget(self._perf_label)
root_layout.addLayout(left_layout, stretch=3)
# 右側: スクロール可能なコントロールパネル
scroll = QScrollArea()
scroll.setWidgetResizable(True)
scroll.setHorizontalScrollBarPolicy(
Qt.ScrollBarPolicy.ScrollBarAlwaysOff,
)
control_widget = QWidget()
control_layout = QVBoxLayout(control_widget)
scroll.setWidget(control_widget)
root_layout.addWidget(scroll, stretch=1)
# 接続ボタン
self._connect_btn = QPushButton("接続開始")
self._connect_btn.clicked.connect(
self._toggle_connection,
)
control_layout.addWidget(self._connect_btn)
# 自動操縦ボタン
self._auto_btn = QPushButton("自動操縦 ON")
self._auto_btn.setEnabled(False)
self._auto_btn.clicked.connect(self._toggle_auto)
control_layout.addWidget(self._auto_btn)
# ステータス表示
self._status_label = QLabel("未接続")
self._status_label.setAlignment(
Qt.AlignmentFlag.AlignCenter,
)
control_layout.addWidget(self._status_label)
# 操舵量表示
self._control_label = QLabel(
"throttle: 0.00\nsteer: 0.00"
)
self._control_label.setAlignment(
Qt.AlignmentFlag.AlignCenter,
)
self._control_label.setStyleSheet(
"font-size: 14px;",
)
control_layout.addWidget(self._control_label)
# 二値化パラメータパネル
self._image_panel = ImageParamPanel(
self._image_params,
)
self._image_panel.image_params_changed.connect(
self._on_image_params_changed,
)
self._image_panel.method_changed.connect(
self._on_method_changed,
)
control_layout.addWidget(self._image_panel)
# 十字路判定パネル
self._intersection_panel = IntersectionPanel(
available=True,
)
control_layout.addWidget(self._intersection_panel)
# 制御パラメータパネル
self._control_panel = ControlParamPanel(
self._pd_params,
self._pursuit_params,
self._ts_pd_params,
self._steering_method,
)
self._control_panel.pd_params_changed.connect(
self._on_pd_params_changed,
)
self._control_panel.pursuit_params_changed.connect(
self._on_pursuit_params_changed,
)
self._control_panel.ts_pd_params_changed.connect(
self._on_ts_pd_params_changed,
)
self._control_panel.steering_method_changed.connect(
self._on_steering_method_changed,
)
control_layout.addWidget(self._control_panel)
# コースアウト復帰パネル
self._recovery_panel = RecoveryPanel(
self._recovery_params,
)
self._recovery_panel.recovery_params_changed.connect(
self._on_recovery_params_changed,
)
control_layout.addWidget(self._recovery_panel)
# デバッグ表示パネル
overlay_flags = load_overlay()
self._overlay_panel = OverlayPanel(overlay_flags)
self._overlay_panel.overlay_flags_changed.connect(
self._on_overlay_flags_changed,
)
control_layout.addWidget(self._overlay_panel)
# 操作ガイド
guide = QLabel(
"--- キー操作 ---\n"
"W/↑: 前進 S/↓: 後退\n"
"A/←: 左 D/→: 右\n"
"Space: 停止\n"
"Q: 自動操縦 切替"
)
guide.setAlignment(Qt.AlignmentFlag.AlignLeft)
guide.setStyleSheet("font-size: 12px; color: #666;")
control_layout.addWidget(guide)
control_layout.addStretch()
def _setup_timers(self) -> None:
"""タイマーを設定する"""
self._frame_timer = QTimer(self)
self._frame_timer.timeout.connect(
self._update_frame,
)
self._command_timer = QTimer(self)
self._command_timer.timeout.connect(
self._send_command,
)
# ── パネルシグナルのスロット ───────────────────────────
def _on_image_params_changed(
self, ip: ImageParams,
) -> None:
"""二値化パラメータの変更をマークする"""
self._image_params = ip
self._cmd_sender.mark_dirty()
def _on_method_changed(self, method: str) -> None:
"""検出手法の変更を保存する"""
save_control(
self._pd_params, method,
self._steering_method,
)
def _on_pd_params_changed(self, p: PdParams) -> None:
"""PD パラメータの変更を保存・マークする"""
self._pd_params = p
save_control(
p, self._image_params.method,
self._steering_method,
)
self._cmd_sender.mark_dirty()
def _on_pursuit_params_changed(
self, p: PursuitParams,
) -> None:
"""Pursuit パラメータの変更を保存・マークする"""
self._pursuit_params = p
save_pursuit(p)
self._cmd_sender.mark_dirty()
def _on_ts_pd_params_changed(
self, p: TsPdParams,
) -> None:
"""Theil-Sen PD パラメータの変更を保存・マークする"""
self._ts_pd_params = p
save_ts_pd(p)
self._cmd_sender.mark_dirty()
def _on_recovery_params_changed(
self, p: RecoveryParams,
) -> None:
"""復帰パラメータの変更を保存・マークする"""
self._recovery_params = p
save_recovery(p)
self._cmd_sender.mark_dirty()
def _on_overlay_flags_changed(self) -> None:
"""オーバーレイフラグの変更を保存する"""
save_overlay(self._overlay_panel.get_flags())
def _on_steering_method_changed(
self, method: str,
) -> None:
"""制御手法の切替を反映して保存する"""
self._steering_method = method
save_control(
self._pd_params,
self._image_params.method,
method,
)
self._cmd_sender.mark_dirty()
# ── 接続 ──────────────────────────────────────────────
def _toggle_connection(self) -> None:
"""接続 / 切断を切り替える"""
if self._is_connected:
self._disconnect()
else:
self._connect()
def _connect(self) -> None:
"""ZMQ 通信を開始してテレメトリ受信を始める"""
self._zmq_client.start()
self._is_connected = True
self._cmd_sender.mark_dirty()
self._connect_btn.setText("切断")
self._auto_btn.setEnabled(True)
self._status_label.setText("接続中 (手動操作)")
self._frame_timer.start(FRAME_INTERVAL_MS)
self._command_timer.start(COMMAND_INTERVAL_MS)
# 初回接続時に stop コマンドを送信
self._cmd_sender.send_stop()
def _disconnect(self) -> None:
"""ZMQ 通信を停止する"""
self._frame_timer.stop()
self._command_timer.stop()
# Pi を停止
if self._is_connected:
self._cmd_sender.send_stop()
if self._is_auto:
self._is_auto = False
self._auto_btn.setText("自動操縦 ON")
self._zmq_client.stop()
self._is_connected = False
self._auto_btn.setEnabled(False)
self._manual.reset()
self._connect_btn.setText("接続開始")
self._status_label.setText("未接続")
self._video_label.setText("カメラ映像待機中...")
self._control_label.setText(
"throttle: 0.00\nsteer: 0.00"
)
# ── 自動操縦 ──────────────────────────────────────────
def _toggle_auto(self) -> None:
"""自動操縦の ON / OFF を切り替える"""
if self._is_auto:
self._disable_auto()
else:
self._enable_auto()
def _enable_auto(self) -> None:
"""自動操縦を開始する"""
self._is_auto = True
self._manual.reset()
self._auto_btn.setText("自動操縦 OFF")
self._status_label.setText("接続中 (自動操縦)")
self._cmd_sender.mark_dirty()
def _disable_auto(self) -> None:
"""自動操縦を停止して手動に戻る"""
self._is_auto = False
self._manual.reset()
self._auto_btn.setText("自動操縦 ON")
self._status_label.setText("接続中 (手動操作)")
self._update_control_label()
# ── 映像更新(テレメトリ受信) ────────────────────────
def _update_frame(self) -> None:
"""タイマーから呼び出され,テレメトリを受信して表示する"""
received = self._telemetry.update(
self._overlay_panel.get_flags(),
)
if not received:
return
state = self._telemetry.state
# 自動時は Pi の操舵量を表示
if self._is_auto:
self._update_control_label()
# 十字路パネルの表示更新
if self._intersection_panel.enabled:
self._intersection_panel.update_result(
state.is_intersection,
)
else:
self._intersection_panel.clear_result()
# 十字路有効時は分類器準備完了まで自動操縦を無効化
if not self._is_auto:
can_auto = (
not self._intersection_panel.enabled
or state.intersection_available
)
self._auto_btn.setEnabled(can_auto)
# ── 手動操作 ──────────────────────────────────────────
def keyPressEvent(self, event: QKeyEvent) -> None:
"""キー押下時の処理"""
if event.isAutoRepeat():
return
# Q キーで自動操縦切り替え
if event.key() == Qt.Key.Key_Q:
if self._is_connected:
self._toggle_auto()
return
# 自動操縦中はキー操作を無視
if self._is_auto:
return
if self._manual.handle_key_press(event):
if self._manual.is_emergency_stop():
if self._is_auto:
self._disable_auto()
self._update_control_label()
def keyReleaseEvent(self, event: QKeyEvent) -> None:
"""キー離上時に操舵量を更新する"""
if event.isAutoRepeat():
return
if self._is_auto:
return
if self._manual.handle_key_release(event):
self._update_control_label()
def _update_control_label(self) -> None:
"""操舵量の表示を更新する"""
if self._is_auto:
state = self._telemetry.state
throttle = state.throttle
steer = state.steer
else:
throttle = self._manual.throttle
steer = self._manual.steer
text = (
f"throttle: {throttle:+.2f}\n"
f"steer: {steer:+.2f}"
)
if self._is_auto:
state = self._telemetry.state
if state.is_intersection:
text += "\n[十字路]"
if state.is_recovering:
text += "\n[復帰中]"
self._control_label.setText(text)
# ── コマンド送信 ──────────────────────────────────────
def _send_command(self) -> None:
"""コマンドを Pi に送信する"""
if not self._is_connected:
return
if self._is_auto:
throttle = 0.0
steer = 0.0
else:
throttle = self._manual.throttle
steer = self._manual.steer
self._cmd_sender.send(
is_auto=self._is_auto,
throttle=throttle,
steer=steer,
intersection_enabled=(
self._intersection_panel.enabled
),
intersection_throttle=(
self._intersection_panel.throttle
),
steering_method=self._steering_method,
image_params=self._image_params,
pd_params=self._pd_params,
pursuit_params=self._pursuit_params,
ts_pd_params=self._ts_pd_params,
recovery_params=self._recovery_params,
)
def closeEvent(self, event) -> None:
"""ウィンドウを閉じるときに通信を停止する"""
if self._is_connected:
self._disconnect()
event.accept()