"""
main_window
PC 側のメインウィンドウを定義するモジュール
Pi からのテレメトリをリアルタイム表示し,
モード切替・パラメータ調整・手動操作のコマンドを送信する
"""
import dataclasses
import time
import cv2
import numpy as np
from PySide6.QtCore import Qt, QTimer
from PySide6.QtGui import QImage, QKeyEvent, QPixmap
from PySide6.QtWidgets import (
QHBoxLayout,
QLabel,
QMainWindow,
QPushButton,
QScrollArea,
QVBoxLayout,
QWidget,
)
from common import config
from pc.comm.zmq_client import PcZmqClient
from pc.gui.panels import (
ControlParamPanel,
ImageParamPanel,
IntersectionPanel,
OverlayPanel,
RecoveryPanel,
)
from pc.steering.auto_params import (
load_control,
load_detect_params,
load_overlay,
load_pursuit,
load_recovery,
load_ts_pd,
save_control,
save_overlay,
save_pursuit,
save_recovery,
save_ts_pd,
)
from common.steering.pd_control import PdParams
from common.steering.pursuit_control import PursuitParams
from common.steering.recovery import RecoveryParams
from common.steering.ts_pd_control import TsPdParams
from common.vision.line_detector import (
ImageParams,
LineDetectResult,
)
from pc.vision.overlay import draw_overlay
# 映像更新間隔 (ms)
FRAME_INTERVAL_MS: int = 33
# コマンド送信間隔 (ms)
COMMAND_INTERVAL_MS: int = int(
1000 / config.CONTROL_PUBLISH_HZ
)
# 映像表示のスケール倍率(40x30 → 640x480 相当)
DISPLAY_SCALE: float = 16.0
# 手動操作の throttle / steer 量
MANUAL_THROTTLE: float = 0.5
MANUAL_STEER: float = 0.4
class MainWindow(QMainWindow):
"""PC 側のメインウィンドウ
Pi からテレメトリを受信して映像・状態を表示し,
コマンドを Pi に送信する
"""
def __init__(self) -> None:
super().__init__()
self._zmq_client = PcZmqClient()
self._is_connected = False
self._is_auto = False
# 手動操作の状態
self._pressed_keys: set[int] = set()
self._throttle: float = 0.0
self._steer: float = 0.0
# テレメトリから受け取る状態
self._pi_detected: bool = False
self._pi_pos_error: float = 0.0
self._pi_heading: float = 0.0
self._pi_is_intersection: bool = False
self._pi_is_recovering: bool = False
self._pi_fps: float = 0.0
# 前回のパラメータを復元(GUI 表示・コマンド送信用)
pd_params, last_method, last_steering = (
load_control()
)
image_params = load_detect_params(last_method)
self._image_params = image_params
self._pd_params = pd_params
pursuit_params = load_pursuit()
self._pursuit_params = pursuit_params
ts_pd_params = load_ts_pd()
self._ts_pd_params = ts_pd_params
self._steering_method: str = last_steering
# 復帰パラメータ
recovery_params = load_recovery()
self._recovery_params = recovery_params
# 最新のバイナリ画像(テレメトリから)
self._latest_binary: np.ndarray | None = None
# パフォーマンス計測
self._recv_frame_count: int = 0
self._recv_fps_start: float = time.time()
self._recv_fps: float = 0.0
# パラメータ変更フラグ(次のコマンド送信で送る)
self._params_dirty: bool = True
self._setup_ui()
self._setup_timers()
def _setup_ui(self) -> None:
"""UI を構築する"""
self.setWindowTitle("RobotCar Controller")
central = QWidget()
self.setCentralWidget(central)
root_layout = QHBoxLayout(central)
# 左側: 映像表示 + 検出情報
left_layout = QVBoxLayout()
self._video_label = QLabel("カメラ映像待機中...")
self._video_label.setAlignment(
Qt.AlignmentFlag.AlignCenter,
)
self._video_label.setMinimumSize(
int(config.FRAME_WIDTH * DISPLAY_SCALE),
int(config.FRAME_HEIGHT * DISPLAY_SCALE),
)
self._video_label.setStyleSheet(
"background-color: #222;"
" color: #aaa; font-size: 16px;"
)
left_layout.addWidget(self._video_label)
self._detect_info_label = QLabel(
"pos: --- head: ---"
)
self._detect_info_label.setAlignment(
Qt.AlignmentFlag.AlignLeft,
)
self._detect_info_label.setStyleSheet(
"font-size: 14px; font-family: monospace;"
" color: #0f0; background-color: #222;"
" padding: 4px;"
)
left_layout.addWidget(self._detect_info_label)
self._perf_label = QLabel(
"recv FPS: --- Pi FPS: ---"
)
self._perf_label.setAlignment(
Qt.AlignmentFlag.AlignLeft,
)
self._perf_label.setStyleSheet(
"font-size: 14px; font-family: monospace;"
" color: #ff0; background-color: #222;"
" padding: 4px;"
)
left_layout.addWidget(self._perf_label)
root_layout.addLayout(left_layout, stretch=3)
# 右側: スクロール可能なコントロールパネル
scroll = QScrollArea()
scroll.setWidgetResizable(True)
scroll.setHorizontalScrollBarPolicy(
Qt.ScrollBarPolicy.ScrollBarAlwaysOff,
)
control_widget = QWidget()
control_layout = QVBoxLayout(control_widget)
scroll.setWidget(control_widget)
root_layout.addWidget(scroll, stretch=1)
# 接続ボタン
self._connect_btn = QPushButton("接続開始")
self._connect_btn.clicked.connect(
self._toggle_connection,
)
control_layout.addWidget(self._connect_btn)
# 自動操縦ボタン
self._auto_btn = QPushButton("自動操縦 ON")
self._auto_btn.setEnabled(False)
self._auto_btn.clicked.connect(self._toggle_auto)
control_layout.addWidget(self._auto_btn)
# ステータス表示
self._status_label = QLabel("未接続")
self._status_label.setAlignment(
Qt.AlignmentFlag.AlignCenter,
)
control_layout.addWidget(self._status_label)
# 操舵量表示
self._control_label = QLabel(
"throttle: 0.00\nsteer: 0.00"
)
self._control_label.setAlignment(
Qt.AlignmentFlag.AlignCenter,
)
self._control_label.setStyleSheet(
"font-size: 14px;",
)
control_layout.addWidget(self._control_label)
# 二値化パラメータパネル
self._image_panel = ImageParamPanel(
self._image_params,
)
self._image_panel.image_params_changed.connect(
self._on_image_params_changed,
)
self._image_panel.method_changed.connect(
self._on_method_changed,
)
control_layout.addWidget(self._image_panel)
# 十字路判定パネル
self._intersection_panel = IntersectionPanel(
available=True,
)
control_layout.addWidget(self._intersection_panel)
# 制御パラメータパネル
self._control_panel = ControlParamPanel(
self._pd_params,
self._pursuit_params,
self._ts_pd_params,
self._steering_method,
)
self._control_panel.pd_params_changed.connect(
self._on_pd_params_changed,
)
self._control_panel.pursuit_params_changed.connect(
self._on_pursuit_params_changed,
)
self._control_panel.ts_pd_params_changed.connect(
self._on_ts_pd_params_changed,
)
self._control_panel.steering_method_changed.connect(
self._on_steering_method_changed,
)
control_layout.addWidget(self._control_panel)
# コースアウト復帰パネル
self._recovery_panel = RecoveryPanel(
self._recovery_params,
)
self._recovery_panel.recovery_params_changed.connect(
self._on_recovery_params_changed,
)
control_layout.addWidget(self._recovery_panel)
# デバッグ表示パネル
overlay_flags = load_overlay()
self._overlay_panel = OverlayPanel(overlay_flags)
self._overlay_panel.overlay_flags_changed.connect(
self._on_overlay_flags_changed,
)
control_layout.addWidget(self._overlay_panel)
# 操作ガイド
guide = QLabel(
"--- キー操作 ---\n"
"W/↑: 前進 S/↓: 後退\n"
"A/←: 左 D/→: 右\n"
"Space: 停止\n"
"Q: 自動操縦 切替"
)
guide.setAlignment(Qt.AlignmentFlag.AlignLeft)
guide.setStyleSheet("font-size: 12px; color: #666;")
control_layout.addWidget(guide)
control_layout.addStretch()
def _setup_timers(self) -> None:
"""タイマーを設定する"""
self._frame_timer = QTimer(self)
self._frame_timer.timeout.connect(self._update_frame)
self._command_timer = QTimer(self)
self._command_timer.timeout.connect(
self._send_command,
)
# ── パネルシグナルのスロット ───────────────────────────
def _on_image_params_changed(
self, ip: ImageParams,
) -> None:
"""二値化パラメータの変更をマークする"""
self._image_params = ip
self._params_dirty = True
def _on_method_changed(self, method: str) -> None:
"""検出手法の変更を保存する"""
save_control(
self._pd_params, method,
self._steering_method,
)
def _on_pd_params_changed(self, p: PdParams) -> None:
"""PD パラメータの変更を保存・マークする"""
self._pd_params = p
save_control(
p, self._image_params.method,
self._steering_method,
)
self._params_dirty = True
def _on_pursuit_params_changed(
self, p: PursuitParams,
) -> None:
"""Pursuit パラメータの変更を保存・マークする"""
self._pursuit_params = p
save_pursuit(p)
self._params_dirty = True
def _on_ts_pd_params_changed(
self, p: TsPdParams,
) -> None:
"""Theil-Sen PD パラメータの変更を保存・マークする"""
self._ts_pd_params = p
save_ts_pd(p)
self._params_dirty = True
def _on_recovery_params_changed(
self, p: RecoveryParams,
) -> None:
"""復帰パラメータの変更を保存・マークする"""
self._recovery_params = p
save_recovery(p)
self._params_dirty = True
def _on_overlay_flags_changed(self) -> None:
"""オーバーレイフラグの変更を保存する"""
save_overlay(self._overlay_panel.get_flags())
def _on_steering_method_changed(
self, method: str,
) -> None:
"""制御手法の切替を反映して保存する"""
self._steering_method = method
save_control(
self._pd_params,
self._image_params.method,
method,
)
self._params_dirty = True
# ── 接続 ──────────────────────────────────────────────
def _toggle_connection(self) -> None:
"""接続 / 切断を切り替える"""
if self._is_connected:
self._disconnect()
else:
self._connect()
def _connect(self) -> None:
"""ZMQ 通信を開始してテレメトリ受信を始める"""
self._zmq_client.start()
self._is_connected = True
self._params_dirty = True
self._connect_btn.setText("切断")
self._auto_btn.setEnabled(True)
self._status_label.setText("接続中 (手動操作)")
self._frame_timer.start(FRAME_INTERVAL_MS)
self._command_timer.start(COMMAND_INTERVAL_MS)
# 初回接続時に stop コマンドを送信
self._zmq_client.send_command({"mode": "stop"})
def _disconnect(self) -> None:
"""ZMQ 通信を停止する"""
self._frame_timer.stop()
self._command_timer.stop()
# Pi を停止
if self._is_connected:
self._zmq_client.send_command({"mode": "stop"})
if self._is_auto:
self._is_auto = False
self._auto_btn.setText("自動操縦 ON")
self._zmq_client.stop()
self._is_connected = False
self._auto_btn.setEnabled(False)
self._pressed_keys.clear()
self._throttle = 0.0
self._steer = 0.0
self._connect_btn.setText("接続開始")
self._status_label.setText("未接続")
self._video_label.setText("カメラ映像待機中...")
self._control_label.setText(
"throttle: 0.00\nsteer: 0.00"
)
# ── 自動操縦 ──────────────────────────────────────────
def _toggle_auto(self) -> None:
"""自動操縦の ON / OFF を切り替える"""
if self._is_auto:
self._disable_auto()
else:
self._enable_auto()
def _enable_auto(self) -> None:
"""自動操縦を開始する"""
self._is_auto = True
self._pressed_keys.clear()
self._auto_btn.setText("自動操縦 OFF")
self._status_label.setText("接続中 (自動操縦)")
# Pi にパラメータ付きで auto コマンドを送信
self._params_dirty = True
def _disable_auto(self) -> None:
"""自動操縦を停止して手動に戻る"""
self._is_auto = False
self._throttle = 0.0
self._steer = 0.0
self._auto_btn.setText("自動操縦 ON")
self._status_label.setText("接続中 (手動操作)")
self._update_control_label()
# ── 映像更新(テレメトリ受信) ────────────────────────
def _update_frame(self) -> None:
"""タイマーから呼び出され,テレメトリを受信して表示する"""
result = self._zmq_client.receive_telemetry()
if result is None:
return
telemetry, frame, binary = result
# テレメトリから状態を取得
self._pi_detected = telemetry.get(
"detected", False,
)
self._pi_pos_error = telemetry.get(
"pos_error", 0.0,
)
self._pi_heading = telemetry.get("heading", 0.0)
self._pi_is_intersection = telemetry.get(
"is_intersection", False,
)
self._pi_is_recovering = telemetry.get(
"is_recovering", False,
)
self._pi_fps = telemetry.get("fps", 0.0)
# 自動時は Pi の操舵量を表示
if self._is_auto:
self._throttle = telemetry.get(
"throttle", 0.0,
)
self._steer = telemetry.get("steer", 0.0)
self._update_control_label()
# 十字路パネルの表示更新
if self._intersection_panel.enabled:
self._intersection_panel.update_result(
self._pi_is_intersection,
)
else:
self._intersection_panel.clear_result()
self._latest_binary = binary
# 受信 FPS 計測
self._recv_frame_count += 1
elapsed = time.time() - self._recv_fps_start
if elapsed >= 1.0:
self._recv_fps = (
self._recv_frame_count / elapsed
)
self._recv_frame_count = 0
self._recv_fps_start = time.time()
# 検出情報表示
self._update_detect_info_label()
# パフォーマンス表示
self._perf_label.setText(
f"recv FPS: {self._recv_fps:.1f}"
f" Pi FPS: {self._pi_fps:.1f}"
)
self._display_frame(frame)
def _update_detect_info_label(self) -> None:
"""検出情報ラベルを更新する"""
if not self._pi_detected:
self._detect_info_label.setText(
"pos: --- head: ---"
)
return
self._detect_info_label.setText(
f"pos: {self._pi_pos_error:+.3f}"
f" head: {self._pi_heading:+.4f}"
)
def _display_frame(self, frame: np.ndarray) -> None:
"""NumPy 配列の画像を QLabel に表示する
Args:
frame: グレースケールの画像
"""
# グレースケール → BGR 変換(カラーオーバーレイ描画のため)
bgr = cv2.cvtColor(frame, cv2.COLOR_GRAY2BGR)
# テレメトリから簡易的な LineDetectResult を構築して
# オーバーレイ描画に渡す
detect_result = None
if self._pi_detected and self._latest_binary is not None:
detect_result = LineDetectResult(
detected=True,
position_error=self._pi_pos_error,
heading=self._pi_heading,
curvature=0.0,
poly_coeffs=None,
row_centers=None,
binary_image=self._latest_binary,
)
bgr = draw_overlay(
bgr, detect_result,
self._overlay_panel.get_flags(),
is_intersection=self._pi_is_intersection,
)
# BGR → RGB 変換
rgb = bgr[:, :, ::-1].copy()
h, w, ch = rgb.shape
image = QImage(
rgb.data, w, h, ch * w,
QImage.Format.Format_RGB888,
)
disp_w = int(config.FRAME_WIDTH * DISPLAY_SCALE)
disp_h = int(config.FRAME_HEIGHT * DISPLAY_SCALE)
pixmap = QPixmap.fromImage(image).scaled(
disp_w,
disp_h,
Qt.AspectRatioMode.KeepAspectRatio,
Qt.TransformationMode.SmoothTransformation,
)
self._video_label.setPixmap(pixmap)
# ── 手動操作 ──────────────────────────────────────────
def keyPressEvent(self, event: QKeyEvent) -> None:
"""キー押下時の処理"""
if event.isAutoRepeat():
return
# Q キーで自動操縦切り替え
if event.key() == Qt.Key.Key_Q:
if self._is_connected:
self._toggle_auto()
return
# 自動操縦中はキー操作を無視
if self._is_auto:
return
self._pressed_keys.add(event.key())
self._update_manual_control()
def keyReleaseEvent(self, event: QKeyEvent) -> None:
"""キー離上時に操舵量を更新する"""
if event.isAutoRepeat():
return
if self._is_auto:
return
self._pressed_keys.discard(event.key())
self._update_manual_control()
def _update_manual_control(self) -> None:
"""押下中のキーから throttle と steer を計算する"""
keys = self._pressed_keys
# Space で緊急停止
if Qt.Key.Key_Space in keys:
self._throttle = 0.0
self._steer = 0.0
self._pressed_keys.clear()
if self._is_auto:
self._disable_auto()
self._update_control_label()
return
# throttle: W/↑ で前進,S/↓ で後退
forward = (
Qt.Key.Key_W in keys or Qt.Key.Key_Up in keys
)
backward = (
Qt.Key.Key_S in keys
or Qt.Key.Key_Down in keys
)
if forward and not backward:
self._throttle = MANUAL_THROTTLE
elif backward and not forward:
self._throttle = -MANUAL_THROTTLE
else:
self._throttle = 0.0
# steer: A/← で左,D/→ で右
left = (
Qt.Key.Key_A in keys
or Qt.Key.Key_Left in keys
)
right = (
Qt.Key.Key_D in keys
or Qt.Key.Key_Right in keys
)
if left and not right:
self._steer = -MANUAL_STEER
elif right and not left:
self._steer = MANUAL_STEER
else:
self._steer = 0.0
self._update_control_label()
def _update_control_label(self) -> None:
"""操舵量の表示を更新する"""
text = (
f"throttle: {self._throttle:+.2f}\n"
f"steer: {self._steer:+.2f}"
)
if self._is_auto and self._pi_is_intersection:
text += "\n[十字路]"
if self._is_auto and self._pi_is_recovering:
text += "\n[復帰中]"
self._control_label.setText(text)
def _send_command(self) -> None:
"""コマンドを Pi に送信する"""
if not self._is_connected:
return
cmd: dict = {}
# モード
if self._is_auto:
cmd["mode"] = "auto"
elif (
self._throttle != 0.0
or self._steer != 0.0
):
cmd["mode"] = "manual"
cmd["throttle"] = self._throttle
cmd["steer"] = self._steer
else:
cmd["mode"] = "stop"
# 十字路設定
cmd["intersection_enabled"] = (
self._intersection_panel.enabled
)
cmd["intersection_throttle"] = (
self._intersection_panel.throttle
)
# 制御手法
cmd["steering_method"] = self._steering_method
# パラメータ更新(変更があった場合のみ)
if self._params_dirty:
cmd["image_params"] = dataclasses.asdict(
self._image_params,
)
cmd["pd_params"] = dataclasses.asdict(
self._pd_params,
)
cmd["pursuit_params"] = dataclasses.asdict(
self._pursuit_params,
)
cmd["steering_params"] = dataclasses.asdict(
self._ts_pd_params,
)
cmd["recovery_params"] = dataclasses.asdict(
self._recovery_params,
)
self._params_dirty = False
self._zmq_client.send_command(cmd)
def closeEvent(self, event) -> None:
"""ウィンドウを閉じるときに通信を停止する"""
if self._is_connected:
self._disconnect()
event.accept()