diff --git a/src/pc/gui/main_window.py b/src/pc/gui/main_window.py index 434221e..92d9223 100644 --- a/src/pc/gui/main_window.py +++ b/src/pc/gui/main_window.py @@ -6,7 +6,7 @@ import numpy as np from PySide6.QtCore import Qt, QTimer -from PySide6.QtGui import QImage, QPixmap +from PySide6.QtGui import QImage, QKeyEvent, QPixmap from PySide6.QtWidgets import ( QHBoxLayout, QLabel, @@ -22,9 +22,18 @@ # 映像更新間隔 (ms) FRAME_INTERVAL_MS: int = 33 +# 操舵量送信間隔 (ms) +CONTROL_INTERVAL_MS: int = int( + 1000 / config.CONTROL_PUBLISH_HZ +) + # 映像表示のスケール倍率 DISPLAY_SCALE: float = 2.0 +# 手動操作の throttle / steer 量 +MANUAL_THROTTLE: float = 0.5 +MANUAL_STEER: float = 0.4 + class MainWindow(QMainWindow): """PC 側のメインウィンドウ""" @@ -33,8 +42,14 @@ super().__init__() self._zmq_client = PcZmqClient() self._is_connected = False + + # 手動操作の状態 + self._pressed_keys: set[int] = set() + self._throttle: float = 0.0 + self._steer: float = 0.0 + self._setup_ui() - self._setup_timer() + self._setup_timers() def _setup_ui(self) -> None: """UI を構築する""" @@ -47,13 +62,16 @@ # 左側: 映像表示 self._video_label = QLabel("カメラ映像待機中...") - self._video_label.setAlignment(Qt.AlignmentFlag.AlignCenter) + self._video_label.setAlignment( + Qt.AlignmentFlag.AlignCenter, + ) self._video_label.setMinimumSize( int(config.FRAME_WIDTH * DISPLAY_SCALE), int(config.FRAME_HEIGHT * DISPLAY_SCALE), ) self._video_label.setStyleSheet( - "background-color: #222; color: #aaa; font-size: 16px;" + "background-color: #222;" + " color: #aaa; font-size: 16px;" ) root_layout.addWidget(self._video_label, stretch=3) @@ -63,22 +81,56 @@ # 接続ボタン self._connect_btn = QPushButton("接続開始") - self._connect_btn.clicked.connect(self._toggle_connection) + self._connect_btn.clicked.connect( + self._toggle_connection, + ) control_layout.addWidget(self._connect_btn) # ステータス表示 self._status_label = QLabel("未接続") - self._status_label.setAlignment(Qt.AlignmentFlag.AlignCenter) + self._status_label.setAlignment( + Qt.AlignmentFlag.AlignCenter, + ) control_layout.addWidget(self._status_label) + # 操舵量表示 + self._control_label = QLabel( + "throttle: 0.00\nsteer: 0.00" + ) + self._control_label.setAlignment( + Qt.AlignmentFlag.AlignCenter, + ) + self._control_label.setStyleSheet("font-size: 14px;") + control_layout.addWidget(self._control_label) + + # 操作ガイド + guide = QLabel( + "--- 手動操作 ---\n" + "W / ↑ : 前進\n" + "S / ↓ : 後退\n" + "A / ← : 左旋回\n" + "D / → : 右旋回\n" + "Space : 停止" + ) + guide.setAlignment(Qt.AlignmentFlag.AlignLeft) + guide.setStyleSheet("font-size: 12px; color: #666;") + control_layout.addWidget(guide) + # 余白を下に詰める control_layout.addStretch() - def _setup_timer(self) -> None: - """映像更新用タイマーを設定する""" + def _setup_timers(self) -> None: + """タイマーを設定する""" + # 映像更新用 self._frame_timer = QTimer(self) self._frame_timer.timeout.connect(self._update_frame) + # 操舵量送信用 + self._control_timer = QTimer(self) + self._control_timer.timeout.connect( + self._send_control, + ) + def _toggle_connection(self) -> None: """接続 / 切断を切り替える""" if self._is_connected: @@ -91,17 +143,25 @@ self._zmq_client.start() self._is_connected = True self._connect_btn.setText("切断") - self._status_label.setText("接続中") + self._status_label.setText("接続中 (手動操作)") self._frame_timer.start(FRAME_INTERVAL_MS) + self._control_timer.start(CONTROL_INTERVAL_MS) def _disconnect(self) -> None: """ZMQ 通信を停止する""" self._frame_timer.stop() + self._control_timer.stop() self._zmq_client.stop() self._is_connected = False + self._pressed_keys.clear() + self._throttle = 0.0 + self._steer = 0.0 self._connect_btn.setText("接続開始") self._status_label.setText("未接続") self._video_label.setText("カメラ映像待機中...") + self._control_label.setText( + "throttle: 0.00\nsteer: 0.00" + ) def _update_frame(self) -> None: """タイマーから呼び出され,最新フレームを表示する""" @@ -120,7 +180,8 @@ rgb = frame[:, :, ::-1].copy() h, w, ch = rgb.shape image = QImage( - rgb.data, w, h, ch * w, QImage.Format.Format_RGB888, + rgb.data, w, h, ch * w, + QImage.Format.Format_RGB888, ) # スケーリングして表示 scaled_w = int(w * DISPLAY_SCALE) @@ -133,6 +194,83 @@ ) self._video_label.setPixmap(pixmap) + # ── 手動操作 ────────────────────────────────────────── + + def keyPressEvent(self, event: QKeyEvent) -> None: + """キー押下時に操舵量を更新する""" + if event.isAutoRepeat(): + return + self._pressed_keys.add(event.key()) + self._update_manual_control() + + def keyReleaseEvent(self, event: QKeyEvent) -> None: + """キー離上時に操舵量を更新する""" + if event.isAutoRepeat(): + return + self._pressed_keys.discard(event.key()) + self._update_manual_control() + + def _update_manual_control(self) -> None: + """押下中のキーから throttle と steer を計算する""" + keys = self._pressed_keys + + # Space で緊急停止 + if Qt.Key.Key_Space in keys: + self._throttle = 0.0 + self._steer = 0.0 + self._pressed_keys.clear() + self._update_control_label() + return + + # throttle: W/↑ で前進,S/↓ で後退 + forward = ( + Qt.Key.Key_W in keys + or Qt.Key.Key_Up in keys + ) + backward = ( + Qt.Key.Key_S in keys + or Qt.Key.Key_Down in keys + ) + if forward and not backward: + self._throttle = MANUAL_THROTTLE + elif backward and not forward: + self._throttle = -MANUAL_THROTTLE + else: + self._throttle = 0.0 + + # steer: A/← で左,D/→ で右 + left = ( + Qt.Key.Key_A in keys + or Qt.Key.Key_Left in keys + ) + right = ( + Qt.Key.Key_D in keys + or Qt.Key.Key_Right in keys + ) + if left and not right: + self._steer = -MANUAL_STEER + elif right and not left: + self._steer = MANUAL_STEER + else: + self._steer = 0.0 + + self._update_control_label() + + def _update_control_label(self) -> None: + """操舵量の表示を更新する""" + self._control_label.setText( + f"throttle: {self._throttle:+.2f}\n" + f"steer: {self._steer:+.2f}" + ) + + def _send_control(self) -> None: + """操舵量を Pi に送信する""" + if not self._is_connected: + return + self._zmq_client.send_control( + self._throttle, self._steer, + ) + def closeEvent(self, event) -> None: """ウィンドウを閉じるときに通信を停止する""" if self._is_connected: diff --git a/src/pi/main.py b/src/pi/main.py index dfcb700..17b7630 100644 --- a/src/pi/main.py +++ b/src/pi/main.py @@ -8,42 +8,42 @@ from pi.camera.capture import CameraCapture from pi.comm.zmq_client import PiZmqClient +from pi.motor.driver import MotorDriver def main() -> None: """Pi 側のメインループを実行する""" camera = CameraCapture() zmq_client = PiZmqClient() + motor = MotorDriver() try: camera.start() zmq_client.start() - print("Pi: カメラ・通信を開始") + motor.start() + print("Pi: カメラ・通信・モーターを開始") while True: # カメラ画像を取得して送信 frame = camera.capture() zmq_client.send_image(frame) - # 操舵量を受信(現時点ではログ出力のみ) + # 操舵量を受信してモーターに反映 control = zmq_client.receive_control() if control is not None: throttle, steer = control - print( - f"受信: throttle={throttle:.2f}," - f" steer={steer:.2f}" - ) + motor.set_drive(throttle, steer) - # タイムアウト判定 + # タイムアウト時はモーター停止 if zmq_client.is_timeout(): - # TODO(rinto): モーター停止処理を追加する - pass + motor.stop() time.sleep(0.01) except KeyboardInterrupt: print("\nPi: 終了") finally: + motor.cleanup() camera.stop() zmq_client.stop() diff --git a/src/pi/motor/driver.py b/src/pi/motor/driver.py new file mode 100644 index 0000000..7e0dd99 --- /dev/null +++ b/src/pi/motor/driver.py @@ -0,0 +1,132 @@ +""" +driver +TB6612FNG モータードライバを制御するモジュール +差動2輪駆動で左右のモーターを制御する +""" + +from common import config + +try: + import RPi.GPIO as GPIO +except Exception: + GPIO = None + + +class MotorDriver: + """TB6612FNG を介して左右のモーターを制御するクラス""" + + def __init__(self) -> None: + self._pwm_a: object = None + self._pwm_b: object = None + self._ready: bool = False + + def start(self) -> None: + """GPIO を初期化して PWM を開始する""" + if GPIO is None: + print("GPIO 未検出: モーター無効(非 RPi 環境)") + return + + GPIO.setmode(GPIO.BOARD) + + # モーター A(左) + GPIO.setup(config.MA_IN1, GPIO.OUT, initial=GPIO.LOW) + GPIO.setup(config.MA_IN2, GPIO.OUT, initial=GPIO.LOW) + GPIO.setup(config.MA_PWM, GPIO.OUT, initial=GPIO.LOW) + + # モーター B(右) + GPIO.setup(config.MB_IN1, GPIO.OUT, initial=GPIO.LOW) + GPIO.setup(config.MB_IN2, GPIO.OUT, initial=GPIO.LOW) + GPIO.setup(config.MB_PWM, GPIO.OUT, initial=GPIO.LOW) + + self._pwm_a = GPIO.PWM( + config.MA_PWM, config.MOTOR_PWM_FREQ, + ) + self._pwm_b = GPIO.PWM( + config.MB_PWM, config.MOTOR_PWM_FREQ, + ) + self._pwm_a.start(0) + self._pwm_b.start(0) + self._ready = True + print("モーター初期化完了") + + @staticmethod + def _clamp( + value: float, low: float, high: float, + ) -> float: + """値を指定範囲に制限する""" + return max(low, min(high, value)) + + def _apply_one_motor( + self, in1: int, in2: int, pwm: object, + speed: float, + ) -> None: + """1つのモーターに速度を適用する + + Args: + in1: IN1 の GPIO ピン番号 + in2: IN2 の GPIO ピン番号 + pwm: PWM オブジェクト + speed: -1.0 ~ +1.0 の速度値 + """ + if speed > 0: + GPIO.output(in1, GPIO.LOW) + GPIO.output(in2, GPIO.HIGH) + elif speed < 0: + GPIO.output(in1, GPIO.HIGH) + GPIO.output(in2, GPIO.LOW) + else: + GPIO.output(in1, GPIO.LOW) + GPIO.output(in2, GPIO.LOW) + pwm.ChangeDutyCycle(abs(speed) * 100.0) + + def set_drive( + self, throttle: float, steer: float, + ) -> None: + """throttle と steer からモーターを駆動する + + Args: + throttle: 前後方向 (-1.0 ~ +1.0) + steer: 左右方向 (-1.0 ~ +1.0) + """ + if not self._ready: + return + + throttle = self._clamp(throttle, -1.0, 1.0) + steer = self._clamp(steer, -1.0, 1.0) + + # ステアリング方向の補正 + if config.STEER_REVERSED: + steer = -steer + + # 差動2輪: 左右の速度を計算 + left = self._clamp(throttle + steer, -1.0, 1.0) + right = self._clamp(throttle - steer, -1.0, 1.0) + + # モーター配線の極性補正 + if config.MOTOR_LEFT_REVERSED: + left = -left + if config.MOTOR_RIGHT_REVERSED: + right = -right + + self._apply_one_motor( + config.MA_IN1, config.MA_IN2, + self._pwm_a, left, + ) + self._apply_one_motor( + config.MB_IN1, config.MB_IN2, + self._pwm_b, right, + ) + + def stop(self) -> None: + """モーターを停止する""" + self.set_drive(0.0, 0.0) + + def cleanup(self) -> None: + """GPIO リソースを解放する""" + if not self._ready: + return + self.stop() + self._pwm_a.stop() + self._pwm_b.stop() + GPIO.cleanup() + self._ready = False