"""
main_window
PC 側のメインウィンドウを定義するモジュール
カメラ映像のリアルタイム表示と操作 UI を提供する
"""
import cv2
import numpy as np
from PySide6.QtCore import Qt, QTimer
from PySide6.QtGui import QImage, QKeyEvent, QPixmap
from PySide6.QtWidgets import (
QCheckBox,
QComboBox,
QDoubleSpinBox,
QFormLayout,
QGroupBox,
QHBoxLayout,
QInputDialog,
QLabel,
QMainWindow,
QMessageBox,
QPushButton,
QScrollArea,
QSpinBox,
QVBoxLayout,
QWidget,
)
from common import config
from pc.comm.zmq_client import PcZmqClient
from pc.steering.auto_params import (
load_control,
load_detect_params,
save_control,
save_detect_params,
)
from pc.steering.pd_control import PdControl, PdParams
from pc.steering.param_store import (
ImagePreset,
PdPreset,
add_image_preset,
add_pd_preset,
delete_image_preset,
delete_pd_preset,
load_image_presets,
load_pd_presets,
)
from pc.vision.line_detector import (
DETECT_METHODS,
ImageParams,
LineDetectResult,
detect_line,
)
from pc.vision.overlay import OverlayFlags, draw_overlay
# Video refresh interval (ms)
FRAME_INTERVAL_MS: int = 33
# Interval between control-command transmissions (ms),
# derived from config.CONTROL_PUBLISH_HZ
CONTROL_INTERVAL_MS: int = int(
    1000 / config.CONTROL_PUBLISH_HZ
)
# Scale factor applied to the displayed video
DISPLAY_SCALE: float = 2.0
# Throttle / steer magnitudes used for manual (keyboard) driving
MANUAL_THROTTLE: float = 0.5
MANUAL_STEER: float = 0.4
class MainWindow(QMainWindow):
    """Main window of the PC-side application.

    Displays the camera feed and hosts the driving and tuning controls.
    """
def __init__(self) -> None:
    """Restore the last-used parameters, then build the UI and timers."""
    super().__init__()

    # Communication and driving-mode state
    self._zmq_client = PcZmqClient()
    self._is_connected = False
    self._is_auto = False

    # Manual-driving state
    self._pressed_keys: set[int] = set()
    self._throttle: float = 0.0
    self._steer: float = 0.0

    # Auto-save stays disabled while the widgets are being created
    self._auto_save_enabled = False

    # Restore the parameters persisted by the previous session
    restored_pd, restored_method = load_control()
    self._pd_control = PdControl(
        params=restored_pd,
        image_params=load_detect_params(restored_method),
    )

    # Most recent frame (used by autonomous driving)
    self._latest_frame: np.ndarray | None = None

    # Overlay state
    self._overlay_flags = OverlayFlags()
    self._last_detect_result: LineDetectResult | None = None

    self._setup_ui()
    self._setup_timers()

    # From here on, edits made through the widgets are persisted
    self._auto_save_enabled = True
def _setup_ui(self) -> None:
    """Build the window layout.

    The video view sits on the left; a scrollable column on the right
    holds the connection buttons, status read-outs, parameter editors
    and the keyboard guide.
    """
    self.setWindowTitle("RobotCar Controller")

    # Central widget split horizontally
    container = QWidget()
    self.setCentralWidget(container)
    split = QHBoxLayout(container)

    # Left: video display, sized to the scaled camera frame
    video = QLabel("カメラ映像待機中...")
    video.setAlignment(Qt.AlignmentFlag.AlignCenter)
    video.setMinimumSize(
        int(config.FRAME_WIDTH * DISPLAY_SCALE),
        int(config.FRAME_HEIGHT * DISPLAY_SCALE),
    )
    video.setStyleSheet(
        "background-color: #222; color: #aaa; font-size: 16px;"
    )
    self._video_label = video
    split.addWidget(video, stretch=3)

    # Right: scrollable control panel
    scroller = QScrollArea()
    scroller.setWidgetResizable(True)
    scroller.setHorizontalScrollBarPolicy(
        Qt.ScrollBarPolicy.ScrollBarAlwaysOff
    )
    panel = QWidget()
    column = QVBoxLayout(panel)
    scroller.setWidget(panel)
    split.addWidget(scroller, stretch=1)

    # Connect / disconnect button
    self._connect_btn = QPushButton("接続開始")
    self._connect_btn.clicked.connect(self._toggle_connection)
    column.addWidget(self._connect_btn)

    # Autonomous-driving toggle (disabled until connected)
    self._auto_btn = QPushButton("自動操縦 ON")
    self._auto_btn.setEnabled(False)
    self._auto_btn.clicked.connect(self._toggle_auto)
    column.addWidget(self._auto_btn)

    # Connection status
    self._status_label = QLabel("未接続")
    self._status_label.setAlignment(Qt.AlignmentFlag.AlignCenter)
    column.addWidget(self._status_label)

    # Current throttle / steer read-out
    self._control_label = QLabel("throttle: 0.00\nsteer: 0.00")
    self._control_label.setAlignment(Qt.AlignmentFlag.AlignCenter)
    self._control_label.setStyleSheet("font-size: 14px;")
    column.addWidget(self._control_label)

    # Image-processing parameters, PD parameters, debug overlay options
    for build_section in (
        self._setup_image_param_ui,
        self._setup_param_ui,
        self._setup_overlay_ui,
    ):
        build_section(column)

    # Keyboard guide
    key_guide = QLabel(
        "--- キー操作 ---\n"
        "W/↑: 前進 S/↓: 後退\n"
        "A/←: 左 D/→: 右\n"
        "Space: 停止\n"
        "Q: 自動操縦 切替"
    )
    key_guide.setAlignment(Qt.AlignmentFlag.AlignLeft)
    key_guide.setStyleSheet("font-size: 12px; color: #666;")
    column.addWidget(key_guide)

    # Push everything to the top of the column
    column.addStretch()
def _setup_param_ui(
    self, parent_layout: QVBoxLayout,
) -> None:
    """Build the PD parameter editors and their preset controls.

    Args:
        parent_layout: Layout that receives the finished group box.
    """
    box = QGroupBox("PD 制御パラメータ")
    box_layout = QVBoxLayout()
    box.setLayout(box_layout)
    fields = QFormLayout()
    box_layout.addLayout(fields)

    pd = self._pd_control.params
    # (attribute name, row label, initial value, min, max, step)
    spin_specs = (
        ("_spin_kp", "Kp (位置):", pd.kp, 0.0, 5.0, 0.05),
        ("_spin_kh", "Kh (傾き):", pd.kh, 0.0, 5.0, 0.05),
        ("_spin_kd", "Kd (微分):", pd.kd, 0.0, 5.0, 0.05),
        (
            "_spin_max_steer_rate", "操舵制限:",
            pd.max_steer_rate, 0.01, 1.0, 0.01,
        ),
        (
            "_spin_max_throttle", "最大速度:",
            pd.max_throttle, 0.0, 1.0, 0.05,
        ),
        ("_spin_speed_k", "減速係数:", pd.speed_k, 0.0, 5.0, 0.05),
    )
    pd_spins = []
    for attr, caption, initial, low, high, step in spin_specs:
        editor = self._create_spin(initial, low, high, step)
        setattr(self, attr, editor)
        fields.addRow(caption, editor)
        pd_spins.append(editor)

    # --- Preset management ---
    self._pd_preset_combo = QComboBox()
    self._pd_preset_combo.setPlaceholderText("プリセット")
    box_layout.addWidget(self._pd_preset_combo)

    self._pd_preset_memo = QLabel("")
    self._pd_preset_memo.setWordWrap(True)
    self._pd_preset_memo.setStyleSheet("font-size: 11px; color: #888;")
    box_layout.addWidget(self._pd_preset_memo)

    button_row = QHBoxLayout()
    for caption, handler in (
        ("読込", self._on_load_pd_preset),
        ("保存", self._on_save_pd_preset),
        ("削除", self._on_delete_pd_preset),
    ):
        button = QPushButton(caption)
        button.clicked.connect(handler)
        button_row.addWidget(button)
    box_layout.addLayout(button_row)

    # Wire the callbacks only after every widget exists
    for editor in pd_spins:
        editor.valueChanged.connect(self._on_param_changed)
    self._pd_preset_combo.currentIndexChanged.connect(
        self._on_pd_preset_selected
    )

    self._pd_presets: list[PdPreset] = []
    self._refresh_pd_presets()
    parent_layout.addWidget(box)
def _setup_image_param_ui(
    self, parent_layout: QVBoxLayout,
) -> None:
    """Build the image-processing parameter editors and preset controls.

    Each editor row is registered with the set of detection methods it
    applies to, so rows can be shown or hidden when the method changes.

    Args:
        parent_layout: Layout that receives the finished group box.
    """
    box = QGroupBox("画像処理パラメータ")
    box_layout = QVBoxLayout()
    box.setLayout(box_layout)
    current = self._pd_control.image_params

    # Detection-method selector
    self._method_combo = QComboBox()
    for method_key, method_label in DETECT_METHODS.items():
        self._method_combo.addItem(method_label, method_key)
    selected = self._method_combo.findData(current.method)
    if selected >= 0:
        self._method_combo.setCurrentIndex(selected)
    box_layout.addWidget(self._method_combo)

    # Parameter form
    self._image_form = QFormLayout()
    box_layout.addLayout(self._image_form)

    # Visibility map: (widget, set of methods that show it)
    self._image_param_vis: list[tuple[QWidget, set[str]]] = []

    def int_spin(
        low: int, high: int, initial: int, step: int | None = None,
    ) -> QSpinBox:
        """Create an integer spin box; *step* is applied only when given."""
        editor = QSpinBox()
        editor.setRange(low, high)
        if step is not None:
            editor.setSingleStep(step)
        editor.setValue(initial)
        return editor

    # Methods sharing the common post-processing / width-filter rows
    post = ("blackhat", "dual_norm", "robust")

    # --- Parameters of the "current" method ---
    self._spin_clahe_clip = self._create_spin(
        current.clahe_clip, 0.5, 10.0, 0.5,
    )
    self._add_image_row(
        "CLAHE強度:", self._spin_clahe_clip, {"current"},
    )
    self._spin_binary_thresh = int_spin(10, 200, current.binary_thresh)
    self._add_image_row(
        "二値化閾値:", self._spin_binary_thresh,
        {"current", "blackhat"},
    )
    self._spin_open_size = int_spin(1, 31, current.open_size, 2)
    self._add_image_row(
        "ノイズ除去:", self._spin_open_size, {"current"},
    )
    self._spin_close_width = int_spin(1, 51, current.close_width, 2)
    self._add_image_row(
        "途切れ補間:", self._spin_close_width, {"current"},
    )

    # --- Plan A/C: black-hat ---
    self._spin_blackhat_ksize = int_spin(
        11, 101, current.blackhat_ksize, 2,
    )
    self._add_image_row(
        "BHカーネル:", self._spin_blackhat_ksize,
        {"blackhat", "robust"},
    )

    # --- Plan B: background division ---
    self._spin_bg_blur_ksize = int_spin(
        31, 201, current.bg_blur_ksize, 2,
    )
    self._add_image_row(
        "背景ブラー:", self._spin_bg_blur_ksize, {"dual_norm"},
    )

    # --- Plan B/C: adaptive threshold ---
    self._spin_adaptive_block = int_spin(
        11, 101, current.adaptive_block, 2,
    )
    self._add_image_row(
        "適応ブロック:", self._spin_adaptive_block,
        {"dual_norm", "robust"},
    )
    self._spin_adaptive_c = int_spin(1, 30, current.adaptive_c)
    self._add_image_row(
        "適応定数C:", self._spin_adaptive_c,
        {"dual_norm", "robust"},
    )

    # --- Plan A/B/C: post-processing ---
    self._spin_iso_close = int_spin(1, 51, current.iso_close_size, 2)
    self._add_image_row("穴埋め:", self._spin_iso_close, set(post))
    self._spin_dist_thresh = self._create_spin(
        current.dist_thresh, 0.0, 10.0, 0.5,
    )
    self._add_image_row(
        "距離閾値:", self._spin_dist_thresh, set(post),
    )
    self._spin_min_line_width = int_spin(
        1, 20, current.min_line_width,
    )
    self._add_image_row(
        "最小線幅:", self._spin_min_line_width, set(post),
    )

    # --- Plan C: RANSAC ---
    self._spin_ransac_thresh = self._create_spin(
        current.ransac_thresh, 1.0, 30.0, 1.0,
    )
    self._add_image_row(
        "RANSAC閾値:", self._spin_ransac_thresh, {"robust"},
    )

    # --- Width filter (perspective correction) ---
    self._spin_width_near = int_spin(0, 200, current.width_near)
    self._spin_width_near.setSpecialValueText("無効")
    self._add_image_row(
        "線幅(近)px:", self._spin_width_near, set(post),
    )
    self._spin_width_far = int_spin(0, 200, current.width_far)
    self._spin_width_far.setSpecialValueText("無効")
    self._add_image_row(
        "線幅(遠)px:", self._spin_width_far, set(post),
    )
    self._spin_width_tolerance = self._create_spin(
        current.width_tolerance, 1.0, 5.0, 0.1,
    )
    self._add_image_row(
        "幅フィルタ倍率:", self._spin_width_tolerance, set(post),
    )

    # --- Preset management ---
    self._image_preset_combo = QComboBox()
    self._image_preset_combo.setPlaceholderText("プリセット")
    box_layout.addWidget(self._image_preset_combo)

    self._image_preset_memo = QLabel("")
    self._image_preset_memo.setWordWrap(True)
    self._image_preset_memo.setStyleSheet(
        "font-size: 11px; color: #888;"
    )
    box_layout.addWidget(self._image_preset_memo)

    button_row = QHBoxLayout()
    for caption, handler in (
        ("読込", self._on_load_image_preset),
        ("保存", self._on_save_image_preset),
        ("削除", self._on_delete_image_preset),
    ):
        button = QPushButton(caption)
        button.clicked.connect(handler)
        button_row.addWidget(button)
    box_layout.addLayout(button_row)

    # Wire the callbacks only after every widget exists
    self._method_combo.currentIndexChanged.connect(
        self._on_method_changed
    )
    for editor, _methods in self._image_param_vis:
        editor.valueChanged.connect(self._on_image_param_changed)
    self._image_preset_combo.currentIndexChanged.connect(
        self._on_image_preset_selected
    )

    self._image_presets: list[ImagePreset] = []
    self._image_filtered: list[int] = []
    parent_layout.addWidget(box)

    # Apply the initial row visibility and preset list
    self._on_method_changed()
def _add_image_row(
self,
label: str,
widget: QWidget,
methods: set[str],
) -> None:
"""画像処理パラメータの行を追加する"""
self._image_form.addRow(label, widget)
self._image_param_vis.append(
(widget, methods),
)
def _on_method_changed(self) -> None:
"""検出手法の変更を反映する"""
method = self._method_combo.currentData()
# 旧手法のパラメータを保存
if self._auto_save_enabled:
ip = self._pd_control.image_params
save_detect_params(ip.method, ip)
# 新手法のパラメータを読み込み
if self._auto_save_enabled:
new_ip = load_detect_params(method)
self._pd_control.image_params = new_ip
self._sync_image_spinboxes()
save_control(
self._pd_control.params, method,
)
else:
self._pd_control.image_params.method = (
method
)
# パラメータの表示/非表示を更新
for widget, methods in self._image_param_vis:
visible = method in methods
widget.setVisible(visible)
label = self._image_form.labelForField(
widget,
)
if label:
label.setVisible(visible)
# 保存済みプリセットを手法でフィルタ
if hasattr(self, "_image_preset_combo"):
self._refresh_image_presets()
def _sync_image_spinboxes(self) -> None:
"""画像処理パラメータの SpinBox を現在値に同期する"""
self._auto_save_enabled = False
try:
ip = self._pd_control.image_params
self._spin_clahe_clip.setValue(
ip.clahe_clip,
)
self._spin_binary_thresh.setValue(
ip.binary_thresh,
)
self._spin_open_size.setValue(ip.open_size)
self._spin_close_width.setValue(
ip.close_width,
)
self._spin_blackhat_ksize.setValue(
ip.blackhat_ksize,
)
self._spin_bg_blur_ksize.setValue(
ip.bg_blur_ksize,
)
self._spin_adaptive_block.setValue(
ip.adaptive_block,
)
self._spin_adaptive_c.setValue(
ip.adaptive_c,
)
self._spin_iso_close.setValue(
ip.iso_close_size,
)
self._spin_dist_thresh.setValue(
ip.dist_thresh,
)
self._spin_min_line_width.setValue(
ip.min_line_width,
)
self._spin_ransac_thresh.setValue(
ip.ransac_thresh,
)
self._spin_width_near.setValue(ip.width_near)
self._spin_width_far.setValue(ip.width_far)
self._spin_width_tolerance.setValue(
ip.width_tolerance,
)
finally:
self._auto_save_enabled = True
def _on_image_param_changed(self) -> None:
"""画像処理パラメータの変更を反映する"""
ip = self._pd_control.image_params
# 現行手法
ip.clahe_clip = self._spin_clahe_clip.value()
ip.binary_thresh = (
self._spin_binary_thresh.value()
)
ip.open_size = self._spin_open_size.value()
ip.close_width = self._spin_close_width.value()
# 案A/C: Black-hat
ip.blackhat_ksize = (
self._spin_blackhat_ksize.value()
)
# 案B: 背景除算
ip.bg_blur_ksize = (
self._spin_bg_blur_ksize.value()
)
# 案B/C: 適応的閾値
ip.adaptive_block = (
self._spin_adaptive_block.value()
)
ip.adaptive_c = self._spin_adaptive_c.value()
# 案A/B/C: 後処理
ip.iso_close_size = (
self._spin_iso_close.value()
)
ip.dist_thresh = (
self._spin_dist_thresh.value()
)
ip.min_line_width = (
self._spin_min_line_width.value()
)
# 案C: RANSAC
ip.ransac_thresh = (
self._spin_ransac_thresh.value()
)
# 幅フィルタ(透視補正)
ip.width_near = self._spin_width_near.value()
ip.width_far = self._spin_width_far.value()
ip.width_tolerance = (
self._spin_width_tolerance.value()
)
if self._auto_save_enabled:
save_detect_params(ip.method, ip)
# ── 画像処理プリセット ──────────────────────────
def _refresh_image_presets(self) -> None:
    """Reload the presets and list only those of the selected method."""
    self._image_presets = load_image_presets()
    active_method = self._method_combo.currentData()
    self._image_preset_combo.clear()

    matching = [
        position
        for position, preset in enumerate(self._image_presets)
        if preset.image_params.method == active_method
    ]
    self._image_filtered = []
    for position in matching:
        # Keep addItem before append: the combo may emit its
        # index-changed signal while the item is being added.
        self._image_preset_combo.addItem(
            self._image_presets[position].title
        )
        self._image_filtered.append(position)

    self._image_preset_memo.setText("")
def _on_image_preset_selected(self) -> None:
"""画像処理プリセット選択時にメモを表示"""
idx = self._get_image_preset_idx()
if idx >= 0:
self._image_preset_memo.setText(
self._image_presets[idx].memo,
)
else:
self._image_preset_memo.setText("")
def _get_image_preset_idx(self) -> int:
"""コンボの選択を全体インデックスに変換"""
ci = self._image_preset_combo.currentIndex()
if ci < 0 or ci >= len(self._image_filtered):
return -1
return self._image_filtered[ci]
def _on_load_image_preset(self) -> None:
    """Load the selected image preset into the active parameters.

    The preset's parameters are copied before they become the live
    parameter object. Spin-box edits mutate the live object in place,
    so sharing the preset's own instance would silently change the
    cached preset, and loading it again would return the edited values.
    """
    import copy

    idx = self._get_image_preset_idx()
    if idx < 0:
        return

    # Shallow copy: the spin boxes only ever assign plain field values
    ip = copy.copy(self._image_presets[idx].image_params)

    # Suspend auto-save while the widgets are updated; save once at the end
    self._auto_save_enabled = False
    try:
        self._pd_control.image_params = ip
        self._sync_image_spinboxes()
    finally:
        self._auto_save_enabled = True
    save_detect_params(ip.method, ip)
def _on_save_image_preset(self) -> None:
    """Ask for a title and memo, then save the current image parameters.

    Nothing is saved if either dialog is cancelled or the title is blank.
    """
    title, accepted = QInputDialog.getText(
        self, "画像処理プリセット保存", "タイトル:",
    )
    title = title.strip()
    if not (accepted and title):
        return

    memo, accepted = QInputDialog.getText(
        self, "画像処理プリセット保存", "メモ:",
    )
    if not accepted:
        return

    # Store a field-by-field copy so later edits do not alter the preset
    live = self._pd_control.image_params
    snapshot = ImageParams(**{
        name: getattr(live, name)
        for name in live.__dataclass_fields__
    })
    add_image_preset(ImagePreset(
        title=title,
        memo=memo.strip(),
        image_params=snapshot,
    ))

    # Select the entry that was just added (last in the filtered list)
    self._refresh_image_presets()
    last_row = self._image_preset_combo.count() - 1
    self._image_preset_combo.setCurrentIndex(last_row)
def _on_delete_image_preset(self) -> None:
"""画像処理プリセットを削除する"""
idx = self._get_image_preset_idx()
if idx < 0:
return
title = self._image_presets[idx].title
reply = QMessageBox.question(
self, "削除確認",
f"「{title}」を削除しますか?",
QMessageBox.StandardButton.Yes
| QMessageBox.StandardButton.No,
QMessageBox.StandardButton.No,
)
if reply == QMessageBox.StandardButton.Yes:
delete_image_preset(idx)
self._refresh_image_presets()
# ── PD 制御プリセット ─────────────────────────
def _refresh_pd_presets(self) -> None:
    """Reload the PD presets and repopulate the combo box."""
    presets = load_pd_presets()
    self._pd_presets = presets
    combo = self._pd_preset_combo
    combo.clear()
    for preset in presets:
        combo.addItem(preset.title)
    self._pd_preset_memo.setText("")
def _on_pd_preset_selected(self) -> None:
"""PD 制御プリセット選択時にメモを表示"""
idx = self._pd_preset_combo.currentIndex()
if 0 <= idx < len(self._pd_presets):
self._pd_preset_memo.setText(
self._pd_presets[idx].memo,
)
else:
self._pd_preset_memo.setText("")
def _on_load_pd_preset(self) -> None:
    """Load the selected PD preset into the controller and spin boxes.

    The preset's parameters are copied before they become the live
    parameter object. ``_on_param_changed`` mutates the live object in
    place, so sharing the preset's own instance would silently change
    the cached preset, and loading it again would return edited values.
    """
    import copy

    idx = self._pd_preset_combo.currentIndex()
    if idx < 0 or idx >= len(self._pd_presets):
        return

    # Shallow copy: the spin boxes only ever assign plain field values
    p = copy.copy(self._pd_presets[idx].params)

    # Suspend auto-save while the widgets are updated; save once at the end
    self._auto_save_enabled = False
    try:
        self._spin_kp.setValue(p.kp)
        self._spin_kh.setValue(p.kh)
        self._spin_kd.setValue(p.kd)
        self._spin_max_steer_rate.setValue(p.max_steer_rate)
        self._spin_max_throttle.setValue(p.max_throttle)
        self._spin_speed_k.setValue(p.speed_k)
        self._pd_control.params = p
    finally:
        self._auto_save_enabled = True
    save_control(
        p,
        self._pd_control.image_params.method,
    )
def _on_save_pd_preset(self) -> None:
    """Ask for a title and memo, then save the current PD parameters.

    Nothing is saved if either dialog is cancelled or the title is blank.
    """
    title, accepted = QInputDialog.getText(
        self, "PD プリセット保存", "タイトル:",
    )
    title = title.strip()
    if not (accepted and title):
        return

    memo, accepted = QInputDialog.getText(
        self, "PD プリセット保存", "メモ:",
    )
    if not accepted:
        return

    # Store a copy so later edits do not alter the preset
    live = self._pd_control.params
    copied_fields = (
        "kp", "kh", "kd",
        "max_steer_rate", "max_throttle", "speed_k",
    )
    snapshot = PdParams(**{
        name: getattr(live, name) for name in copied_fields
    })
    add_pd_preset(PdPreset(
        title=title,
        memo=memo.strip(),
        params=snapshot,
    ))

    # Select the entry that was just added
    self._refresh_pd_presets()
    last_row = self._pd_preset_combo.count() - 1
    self._pd_preset_combo.setCurrentIndex(last_row)
def _on_delete_pd_preset(self) -> None:
"""PD 制御プリセットを削除する"""
idx = self._pd_preset_combo.currentIndex()
if idx < 0 or idx >= len(self._pd_presets):
return
title = self._pd_presets[idx].title
reply = QMessageBox.question(
self, "削除確認",
f"「{title}」を削除しますか?",
QMessageBox.StandardButton.Yes
| QMessageBox.StandardButton.No,
QMessageBox.StandardButton.No,
)
if reply == QMessageBox.StandardButton.Yes:
delete_pd_preset(idx)
self._refresh_pd_presets()
def _setup_overlay_ui(
    self, parent_layout: QVBoxLayout,
) -> None:
    """Build the check boxes that toggle the debug overlays.

    Each check box starts from the current value of its overlay flag, so
    the UI and ``self._overlay_flags`` agree from the first frame even
    if a flag is already enabled when the window is built.

    Args:
        parent_layout: Layout that receives the finished group box.
    """
    group = QGroupBox("デバッグ表示")
    layout = QVBoxLayout()
    group.setLayout(layout)

    # (check-box caption, attribute name on the overlay flags)
    items = [
        ("二値化画像", "binary"),
        ("検出領域", "detect_region"),
        ("フィッティング曲線", "poly_curve"),
        ("中心線", "center_line"),
        ("検出情報", "info_text"),
    ]
    for label, attr in items:
        cb = QCheckBox(label)
        # Initialise before connecting so no handler runs during setup
        cb.setChecked(bool(getattr(self._overlay_flags, attr)))
        # Bind attr as a default argument to avoid late-binding closures
        cb.toggled.connect(
            lambda checked, a=attr:
            setattr(self._overlay_flags, a, checked),
        )
        layout.addWidget(cb)
    parent_layout.addWidget(group)
def _has_any_overlay(self) -> bool:
"""いずれかのオーバーレイが有効かを返す"""
f = self._overlay_flags
return (
f.binary or f.detect_region
or f.poly_curve or f.center_line
or f.info_text
)
@staticmethod
def _create_spin(
    value: float, min_val: float,
    max_val: float, step: float,
) -> QDoubleSpinBox:
    """Create a three-decimal floating-point spin box for a parameter.

    Args:
        value: Initial value.
        min_val: Lower bound of the range.
        max_val: Upper bound of the range.
        step: Increment applied per step.

    Returns:
        The configured spin box.
    """
    editor = QDoubleSpinBox()
    editor.setRange(min_val, max_val)
    editor.setSingleStep(step)
    editor.setDecimals(3)
    # Set the value last so it is applied against the final range
    editor.setValue(value)
    return editor
def _on_param_changed(self) -> None:
"""パラメータ SpinBox の値が変更されたときに反映する"""
p = self._pd_control.params
p.kp = self._spin_kp.value()
p.kh = self._spin_kh.value()
p.kd = self._spin_kd.value()
p.max_steer_rate = (
self._spin_max_steer_rate.value()
)
p.max_throttle = self._spin_max_throttle.value()
p.speed_k = self._spin_speed_k.value()
if self._auto_save_enabled:
save_control(
p,
self._pd_control.image_params.method,
)
def _setup_timers(self) -> None:
    """Create the frame-refresh and control-send timers (not started)."""
    # Video refresh
    frame_timer = QTimer(self)
    frame_timer.timeout.connect(self._update_frame)
    self._frame_timer = frame_timer

    # Control-command transmission
    control_timer = QTimer(self)
    control_timer.timeout.connect(self._send_control)
    self._control_timer = control_timer
# ── 接続 ──────────────────────────────────────────────
def _toggle_connection(self) -> None:
"""接続 / 切断を切り替える"""
if self._is_connected:
self._disconnect()
else:
self._connect()
def _connect(self) -> None:
    """Start ZMQ communication and both timers; switch the UI to connected."""
    self._zmq_client.start()
    self._is_connected = True

    # Reflect the connected (manual) state in the UI
    self._connect_btn.setText("切断")
    self._auto_btn.setEnabled(True)
    self._status_label.setText("接続中 (手動操作)")

    # Start frame refresh and control transmission
    for timer, interval_ms in (
        (self._frame_timer, FRAME_INTERVAL_MS),
        (self._control_timer, CONTROL_INTERVAL_MS),
    ):
        timer.start(interval_ms)
def _disconnect(self) -> None:
"""ZMQ 通信を停止する"""
self._frame_timer.stop()
self._control_timer.stop()
if self._is_auto:
self._is_auto = False
self._auto_btn.setText("自動操縦 ON")
self._zmq_client.stop()
self._is_connected = False
self._auto_btn.setEnabled(False)
self._pressed_keys.clear()
self._throttle = 0.0
self._steer = 0.0
self._latest_frame = None
self._connect_btn.setText("接続開始")
self._status_label.setText("未接続")
self._video_label.setText("カメラ映像待機中...")
self._control_label.setText(
"throttle: 0.00\nsteer: 0.00"
)
# ── 自動操縦 ──────────────────────────────────────────
def _toggle_auto(self) -> None:
"""自動操縦の ON / OFF を切り替える"""
if self._is_auto:
self._disable_auto()
else:
self._enable_auto()
def _enable_auto(self) -> None:
"""自動操縦を開始する"""
self._is_auto = True
self._pd_control.reset()
self._pressed_keys.clear()
self._auto_btn.setText("自動操縦 OFF")
self._status_label.setText("接続中 (自動操縦)")
def _disable_auto(self) -> None:
"""自動操縦を停止して手動に戻る"""
self._is_auto = False
self._throttle = 0.0
self._steer = 0.0
self._auto_btn.setText("自動操縦 ON")
self._status_label.setText("接続中 (手動操作)")
self._update_control_label()
# ── 映像更新 ──────────────────────────────────────────
def _update_frame(self) -> None:
"""タイマーから呼び出され,最新フレームを表示する"""
frame = self._zmq_client.receive_image()
if frame is None:
return
self._latest_frame = frame
# 自動操縦時は操舵量を計算
if self._is_auto:
output = self._pd_control.compute(frame)
self._throttle = output.throttle
self._steer = output.steer
self._update_control_label()
self._last_detect_result = (
self._pd_control.last_detect_result
)
elif self._has_any_overlay():
# 手動操作中でもオーバーレイ用に線検出を実行
self._last_detect_result = detect_line(
frame,
self._pd_control.image_params,
)
else:
self._last_detect_result = None
self._display_frame(frame)
def _display_frame(self, frame: np.ndarray) -> None:
    """Draw the overlays on a frame and show it, scaled, in the video label.

    Args:
        frame: Grayscale image.
    """
    # Grayscale -> BGR so the overlays can be drawn in colour
    canvas = cv2.cvtColor(frame, cv2.COLOR_GRAY2BGR)
    canvas = draw_overlay(
        canvas, self._last_detect_result, self._overlay_flags,
    )

    # BGR -> RGB, as a contiguous buffer that QImage can wrap
    rgb = np.ascontiguousarray(canvas[:, :, ::-1])
    height, width, channels = rgb.shape
    bytes_per_line = channels * width
    image = QImage(
        rgb.data, width, height, bytes_per_line,
        QImage.Format.Format_RGB888,
    )

    # Scale for display
    target_w = int(width * DISPLAY_SCALE)
    target_h = int(height * DISPLAY_SCALE)
    scaled = QPixmap.fromImage(image).scaled(
        target_w,
        target_h,
        Qt.AspectRatioMode.KeepAspectRatio,
        Qt.TransformationMode.SmoothTransformation,
    )
    self._video_label.setPixmap(scaled)
# ── 手動操作 ──────────────────────────────────────────
def keyPressEvent(self, event: QKeyEvent) -> None:
    """Handle a key press.

    * Auto-repeat events are ignored.
    * Q toggles autonomous driving while connected.
    * Space is the stop key. During autonomous driving it switches back
      to manual, which zeroes throttle and steer. Previously every key
      except Q was dropped in autonomous mode, so the stop key shown in
      the on-screen guide had no effect there.
    * Any other key is ignored during autonomous driving; in manual
      mode it is recorded and the manual command is recomputed.

    Args:
        event: The key event delivered by Qt.
    """
    if event.isAutoRepeat():
        return

    key = event.key()

    # Q toggles autonomous driving (only meaningful while connected)
    if key == Qt.Key.Key_Q:
        if self._is_connected:
            self._toggle_auto()
        return

    if self._is_auto:
        # Space must still stop the car; all other keys are ignored
        if key == Qt.Key.Key_Space:
            self._disable_auto()
        return

    self._pressed_keys.add(key)
    self._update_manual_control()
def keyReleaseEvent(self, event: QKeyEvent) -> None:
    """Forget a released key and recompute the manual command.

    Auto-repeat events and releases during autonomous driving are ignored.

    Args:
        event: The key event delivered by Qt.
    """
    if event.isAutoRepeat():
        return
    if self._is_auto:
        return
    released = event.key()
    self._pressed_keys.discard(released)
    self._update_manual_control()
def _update_manual_control(self) -> None:
    """Derive throttle and steer from the keys currently held down."""
    held = self._pressed_keys

    # Space: emergency stop
    if Qt.Key.Key_Space in held:
        self._throttle = 0.0
        self._steer = 0.0
        self._pressed_keys.clear()
        # Also leave autonomous driving if it is active
        if self._is_auto:
            self._disable_auto()
        self._update_control_label()
        return

    def is_held(*candidates) -> bool:
        """Return True if any of the given keys is held."""
        return any(key in held for key in candidates)

    def axis(positive: bool, negative: bool, magnitude: float) -> float:
        """Return +/-magnitude for one direction, 0.0 for none or both."""
        if positive == negative:
            return 0.0
        return magnitude if positive else -magnitude

    # Throttle: W / Up = forward, S / Down = backward
    self._throttle = axis(
        is_held(Qt.Key.Key_W, Qt.Key.Key_Up),
        is_held(Qt.Key.Key_S, Qt.Key.Key_Down),
        MANUAL_THROTTLE,
    )
    # Steer: D / Right = right (positive), A / Left = left (negative)
    self._steer = axis(
        is_held(Qt.Key.Key_D, Qt.Key.Key_Right),
        is_held(Qt.Key.Key_A, Qt.Key.Key_Left),
        MANUAL_STEER,
    )
    self._update_control_label()
def _update_control_label(self) -> None:
"""操舵量の表示を更新する"""
self._control_label.setText(
f"throttle: {self._throttle:+.2f}\n"
f"steer: {self._steer:+.2f}"
)
def _send_control(self) -> None:
"""操舵量を Pi に送信する"""
if not self._is_connected:
return
self._zmq_client.send_control(
self._throttle, self._steer,
)
def closeEvent(self, event) -> None:
    """Stop communication (if connected) and accept the close request.

    Args:
        event: The close event delivered by Qt.
    """
    still_connected = self._is_connected
    if still_connected:
        self._disconnect()
    event.accept()