diff --git "a/docs/03_TECH/TECH_02_\343\202\267\343\202\271\343\203\206\343\203\240\346\247\213\346\210\220\344\273\225\346\247\230.txt" "b/docs/03_TECH/TECH_02_\343\202\267\343\202\271\343\203\206\343\203\240\346\247\213\346\210\220\344\273\225\346\247\230.txt" index e00c748..8555bb2 100644 --- "a/docs/03_TECH/TECH_02_\343\202\267\343\202\271\343\203\206\343\203\240\346\247\213\346\210\220\344\273\225\346\247\230.txt" +++ "b/docs/03_TECH/TECH_02_\343\202\267\343\202\271\343\203\206\343\203\240\346\247\213\346\210\220\344\273\225\346\247\230.txt" @@ -14,21 +14,22 @@ 1-1. 全体構成 - Raspberry Pi はカメラ画像の取得とモーター制御を担当し, - PC は画像処理・操舵量計算・GUI 表示を担当する. + Raspberry Pi はカメラ画像の取得・画像処理・操舵量計算・モーター制御を + すべて担当し,PC はモニタリング GUI・パラメータ調整・手動操作を担当する. + 制御ループが Pi 内で完結するため,通信遅延の影響を受けない. - Raspberry Pi PC - ──────────── ── - カメラ撮影 画像受信 + Raspberry Pi PC + ──────────── ── + カメラ撮影 テレメトリ受信 │ │ - └──── 画像送信 ──────> │ - 画像処理・線検出 - │ - 操舵量計算(PD 制御) - │ - ┌──── 操舵量受信 <────── 操舵量送信 - │ - モーター制御 + 画像処理・線検出 映像・状態表示 + │ │ + 操舵量計算(PD 制御) パラメータ調整 + │ │ + モーター制御 コマンド送信 + │ (モード切替・手動操作) + └──── テレメトリ ──────> │ + ┌──── コマンド受信 <────── │ 2. Raspberry Pi 側の処理 (Raspberry Pi Processing) @@ -37,39 +38,11 @@ 2-1. カメラ画像の取得 ・Picamera2 を使用してグレースケール(Y8 フォーマット)でフレームを取得する. - ・取得した画像を JPEG 圧縮して PC に送信する. ※ グレースケールで取得することで,転送データ量を BGR 比で 1/3 に削減する. - 2-2. 操舵量の受信 + 2-2. 画像処理・線検出 - ・PC から送信された操舵量(throttle,steer)を受信する. - - 2-3. モーター制御 - - ・受信した throttle,steer を既存の `MotorDriver.set_drive()` に渡し, - 左右モーターを制御する. - ・差動2輪駆動の計算は既存コードを流用する. - - left = throttle + steer - - right = throttle - steer - ・極性補正・PWM 出力も既存コードに従う. - - 2-4. フェイルセーフ - - ・一定時間(例: 0.5秒)操舵量を受信しなかった場合, - モーターを自動停止する. - ・通信切断時の暴走を防止する. - - -3. PC 側の処理 (PC Processing) ------------------------------------------------------------------------- - - 3-1. 画像の受信 - - ・Raspberry Pi から送信されたカメラ画像を受信する. - - 3-2. 画像処理・線検出 - - ・受信した画像から黒線の位置を検出する. + ・取得した画像から黒線の位置を検出する. ・処理手順: 1. CLAHE によるコントラスト強調 2. ガウシアンブラー(ノイズ除去) @@ -77,23 +50,55 @@ 4. オープニングで孤立ノイズ除去 5. 横方向クロージングで途切れ補間 6. 白ピクセルに2次多項式フィッティング - ※ グレースケール変換は Pi 側(撮影時)で完了しているため不要 - ・位置偏差・傾き・曲率を算出する. + ・位置偏差・傾きを算出する. ・詳細は `TECH_01_操舵量計算仕様.txt` を参照する. - 3-3. 操舵量計算 + 2-3. 操舵量計算 - ・多項式フィッティングから得た位置偏差と傾きで - PD 制御により操舵量を計算する. - ・速度は曲率に応じて動的に調整する. + ・Theil-Sen 直線近似と PD 制御により操舵量を計算する. + ・速度は傾きに応じて動的に調整する. ・レートリミッターで急激な操舵変化を抑制する. + ・十字路判定: SVM 分類器で十字路を検出し,直進に切り替える. + ・コースアウト復帰: 一定時間線を検出できない場合に復帰動作を行う. ・詳細は `TECH_01_操舵量計算仕様.txt` を参照する. - 3-4. 操舵量の送信 + 2-4. モーター制御 - ・計算した throttle,steer を Raspberry Pi に送信する. + ・計算した throttle,steer を `MotorDriver.set_drive()` に渡し, + 左右モーターを制御する. + ・差動2輪駆動の計算: + - left = throttle + steer + - right = throttle - steer + ・極性補正・PWM 出力も既存コードに従う. - 3-5. GUI 表示 + 2-5. テレメトリ送信 + + ・毎フレーム,以下のテレメトリを PC に送信する: + - カメラ画像(JPEG 圧縮) + - 操舵量(throttle,steer) + - 検出結果(検出成否,位置偏差,傾き) + - 十字路判定結果,復帰状態,処理 FPS + - 二値画像(デバッグ用) + ・ZMQ PUB/SUB + CONFLATE で,古いフレームを自動破棄する. + + 2-6. コマンド受信 + + ・PC からのコマンドを非ブロッキングで受信する: + - mode: "auto"(自律走行),"manual"(手動操作),"stop"(停止) + - 手動モード時の throttle,steer + - パラメータ更新(画像処理・操舵・復帰パラメータ) + - 十字路判定の有効/無効 + ・PC が切断しても,auto モードなら自律走行を継続する. + + +3. PC 側の処理 (PC Processing) +------------------------------------------------------------------------ + + 3-1. テレメトリの受信 + + ・Pi から送信されたテレメトリ(画像+検出結果+操舵量)を受信する. + + 3-2. GUI 表示 ■ カメラ映像表示 ・受信した画像をリアルタイムで表示する. @@ -102,15 +107,15 @@ ■ 自動操縦の切り替え ・自動操縦の ON / OFF を切り替えるボタンを設ける. - ・ON: 画像処理の結果に基づいて操舵量を自動計算・送信する. - ・OFF: 手動操作モードに切り替わる. + ・ON: Pi に "auto" コマンドを送信し,自律走行を開始する. + ・OFF: Pi に "manual" コマンドを送信し,手動操作に切り替える. ■ パラメータ調整 ・PD 制御パラメータ(Kp,Kh,Kd 等)をリアルタイムに変更できる UI を設ける. ・二値化パラメータ(二値化閾値,CLAHE 強度等)も リアルタイムに変更できる. - ・変更したパラメータは即座に処理に反映される. + ・変更したパラメータはコマンドとして Pi に送信し,即座に反映される. ■ パラメータ保存・読み込み ・調整したパラメータをタイトル・メモ付きで JSON に保存できる. @@ -119,7 +124,7 @@ ■ 手動操作 ・自動操縦 OFF 時に,ユーザーが手動で車体を操作できる. - ・操作方式は操作性を重視して設計する. + ・キー入力を throttle,steer に変換し,コマンドとして Pi に送信する. 4. 設計方針 (Design Policy) @@ -138,26 +143,54 @@ この入出力を維持する限り,計算の中身(偏差の取り方,制御式, 速度調整の方法等)を自由に変更できる. + 4-2. 制御ループの完結性 + + 画像取得からモーター制御までの制御ループを Pi 内で完結させ, + 通信遅延の影響を排除する.PC はモニタリングとパラメータ調整のみを + 担当し,制御のクリティカルパスには含まれない. + 5. 通信の流れ (Communication Flow) ------------------------------------------------------------------------ 5-1. 全体のループ - 以下のサイクルを毎フレーム繰り返す. - + Pi 側の制御ループ(毎フレーム): 1. Pi: カメラでフレームを取得する. - 2. Pi → PC: 画像を送信する. - 3. PC: 画像処理・線検出を行う. - 4. PC: 操舵量を計算する(自動時)またはユーザー入力を取得する(手動時). - 5. PC → Pi: 操舵量(throttle,steer)を送信する. - 6. Pi: 受信した操舵量でモーターを制御する. + 2. Pi: 画像処理・線検出を行う. + 3. Pi: 操舵量を計算する(自動時). + 4. Pi: モーターを制御する. + 5. Pi → PC: テレメトリを送信する. + 6. Pi: PC からのコマンドを確認する. - 5-2. 通信要件 + PC 側のループ(タイマー駆動): + 1. PC: テレメトリを受信して映像・状態を表示する. + 2. PC → Pi: コマンドを送信する(モード切替・手動操作・パラメータ更新). - ・双方向通信: Pi → PC(画像),PC → Pi(操舵量). - ・低遅延: 画像取得からモーター反映までの遅延を最小限にする. - ※ 遅延が大きいとコースアウトのリスクが増加する. - ・信頼性: パケットロス時の振る舞いを定義する. - - 画像が届かない場合: 前フレームの操舵量を維持する. - - 操舵量が届かない場合: フェイルセーフでモーター停止する. + 5-2. 通信プロトコル + + ■ テレメトリ(Pi → PC,ZMQ PUB/SUB) + ・メッセージ形式: + - 4 バイト: JSON ヘッダ長(uint32 LE) + - N バイト: JSON テレメトリ(操舵量・検出結果・状態) + - 4 バイト: カメラ画像長(uint32 LE) + - M バイト: JPEG 圧縮カメラ画像 + - 残り: JPEG 圧縮二値画像(デバッグ用,省略可) + + ■ コマンド(PC → Pi,ZMQ PUB/SUB) + ・JSON 形式: + - mode: "auto" | "manual" | "stop" + - throttle, steer: 手動モード時の操舵量 + - image_params: 画像処理パラメータの辞書 + - steering_params: 操舵パラメータの辞書 + - recovery_params: 復帰パラメータの辞書 + - intersection_enabled: 十字路判定の有効/無効 + - intersection_throttle: 十字路通過時の速度 + + 5-3. 通信要件 + + ・双方向通信: Pi → PC(テレメトリ),PC → Pi(コマンド). + ・テレメトリ: CONFLATE により最新フレームのみ保持(古いデータを自動破棄). + ・コマンド: 非ブロッキング受信で制御ループを阻害しない. + ・PC 切断時: Pi は最後のモードで動作を継続する. + auto モードなら自律走行を続け,stop モードなら停止を維持する. diff --git "a/docs/04_ENV/ENV_04_\343\203\207\343\202\243\343\203\254\343\202\257\343\203\210\343\203\252\346\247\213\346\210\220.txt" "b/docs/04_ENV/ENV_04_\343\203\207\343\202\243\343\203\254\343\202\257\343\203\210\343\203\252\346\247\213\346\210\220.txt" index c07a330..a6f4a5a 100644 --- "a/docs/04_ENV/ENV_04_\343\203\207\343\202\243\343\203\254\343\202\257\343\203\210\343\203\252\346\247\213\346\210\220.txt" +++ "b/docs/04_ENV/ENV_04_\343\203\207\343\202\243\343\203\254\343\202\257\343\203\210\343\203\252\346\247\213\346\210\220.txt" @@ -52,7 +52,7 @@ ├── main.py エントリーポイント ├── review.py データ仕分け GUI エントリーポイント ├── gui/ GUI 関連 - │ ├── main_window.py メインウィンドウ + │ ├── main_window.py メインウィンドウ(モニタリング専用) │ └── panels/ パラメータ調整パネル群 │ ├── collapsible_group_box.py 折りたたみ GroupBox │ ├── control_param_panel.py 制御パラメータ @@ -61,20 +61,21 @@ │ ├── overlay_panel.py デバッグ表示 │ └── recovery_panel.py コースアウト復帰 ├── comm/ 通信関連 - │ └── zmq_client.py ZMQ 送受信 + │ └── zmq_client.py ZMQ テレメトリ受信・コマンド送信 ├── data/ 学習データ収集・仕分け・学習 │ ├── collector.py 二値画像のラベル付き保存 │ ├── reviewer.py 仕分けレビュー GUI │ ├── dataset.py データ読み込み │ └── train.py モデル学習・評価・保存 - ├── steering/ 操舵量計算(独立モジュール) + ├── steering/ 操舵量計算(パラメータ保存・復元用) │ ├── base.py 共通インターフェース │ ├── pd_control.py PD 制御の実装 │ ├── pursuit_control.py 2点パシュート制御の実装 │ ├── ts_pd_control.py Theil-Sen PD 制御の実装 │ ├── param_store.py プリセット保存・読み込み + │ ├── recovery.py コースアウト復帰パラメータ │ └── auto_params.py パラメータ自動保存・復元 - └── vision/ 画像処理 + └── vision/ 画像処理(PC 側は表示・データ収集用) ├── line_detector.py 線検出 API(データクラス・手法ディスパッチ) ├── fitting.py 直線・曲線近似(Theil-Sen・RANSAC・外れ値除去) ├── morphology.py 形態学的処理ユーティリティ @@ -90,10 +91,19 @@ 2-4. src/pi/ pi/ - ├── main.py エントリーポイント + ├── main.py エントリーポイント(自律制御ループ) ├── comm/ 通信関連 - │ └── zmq_client.py ZMQ 送受信 + │ └── zmq_client.py ZMQ テレメトリ送信・コマンド受信 ├── camera/ カメラ関連 │ └── capture.py フレーム取得 - └── motor/ モーター関連 - └── driver.py TB6612FNG 制御 + ├── motor/ モーター関連 + │ └── driver.py TB6612FNG 制御 + ├── vision/ 画像処理(Pi 側で実行) + │ ├── line_detector.py 線検出(現行手法のみ) + │ ├── fitting.py 直線・曲線近似 + │ ├── morphology.py 形態学的処理ユーティリティ + │ └── intersection.py 十字路分類モデルの推論 + └── steering/ 操舵量計算(Pi 側で実行) + ├── base.py 共通インターフェース + ├── ts_pd_control.py Theil-Sen PD 制御の実装 + └── recovery.py コースアウト復帰 diff --git a/src/pc/comm/zmq_client.py b/src/pc/comm/zmq_client.py index f927012..ea2aaef 100644 --- a/src/pc/comm/zmq_client.py +++ b/src/pc/comm/zmq_client.py @@ -1,7 +1,8 @@ """ zmq_client PC 側の ZMQ 通信を担当するモジュール -画像の受信と操舵量の送信を行う +テレメトリ受信(画像+検出結果+操舵量)と +コマンド送信(モード切替・パラメータ更新・手動操作)を行う """ import json @@ -17,89 +18,117 @@ class PcZmqClient: """PC 側の ZMQ 通信クライアント - 画像受信(SUB)と操舵量送信(PUB)の2チャネルを管理する + テレメトリ受信(SUB)とコマンド送信(PUB)の2チャネルを管理する """ def __init__(self) -> None: self._context: zmq.Context | None = None - self._image_socket: zmq.Socket | None = None - self._control_socket: zmq.Socket | None = None - self._last_image_ts: float | None = None + self._telemetry_socket: zmq.Socket | None = None + self._command_socket: zmq.Socket | None = None def start(self) -> None: """通信ソケットを初期化してバインドする""" self._context = zmq.Context() - # 画像受信ソケット(SUB,Pi からの画像を受信) - self._image_socket = self._context.socket(zmq.SUB) - self._image_socket.setsockopt(zmq.CONFLATE, 1) - self._image_socket.setsockopt_string(zmq.SUBSCRIBE, "") - self._image_socket.bind(config.image_bind_address()) + # テレメトリ受信ソケット(SUB,Pi からの画像+状態を受信) + self._telemetry_socket = self._context.socket( + zmq.SUB, + ) + self._telemetry_socket.setsockopt( + zmq.CONFLATE, 1, + ) + self._telemetry_socket.setsockopt_string( + zmq.SUBSCRIBE, "", + ) + self._telemetry_socket.bind( + config.image_bind_address(), + ) - # 操舵量送信ソケット(PUB,Pi へ操舵量を送信) - self._control_socket = self._context.socket(zmq.PUB) - self._control_socket.bind(config.control_bind_address()) + # コマンド送信ソケット(PUB,Pi へコマンドを送信) + self._command_socket = self._context.socket( + zmq.PUB, + ) + self._command_socket.bind( + config.control_bind_address(), + ) - def receive_image(self) -> np.ndarray | None: - """画像をタイムスタンプ付きで非ブロッキング受信する + def receive_telemetry( + self, + ) -> tuple[dict, np.ndarray, np.ndarray | None] | None: + """テレメトリを非ブロッキングで受信する Returns: - 受信したグレースケール画像の NumPy 配列,受信データがない場合は None + (telemetry_dict, camera_frame, binary_image) のタプル, + 受信データがない場合は None. + binary_image はデータがない場合 None """ - if self._image_socket is None: + if self._telemetry_socket is None: return None try: - raw = self._image_socket.recv(zmq.NOBLOCK) - ts_size = struct.calcsize("d") - if len(raw) > ts_size: - self._last_image_ts = struct.unpack( - "d", raw[:ts_size], - )[0] - data = raw[ts_size:] - else: - self._last_image_ts = None - data = raw + raw = self._telemetry_socket.recv( + zmq.NOBLOCK, + ) + offset = 0 + + # JSON ヘッダを読み取り + json_len = struct.unpack_from( + " None: - """操舵量をタイムスタンプ付きで送信する + def send_command(self, command: dict) -> None: + """コマンドを Pi に送信する Args: - throttle: 前後方向の出力 (-1.0 ~ +1.0) - steer: 左右方向の出力 (-1.0 ~ +1.0) + command: コマンド辞書 """ - if self._control_socket is None: + if self._command_socket is None: return - msg: dict = { - "throttle": throttle, - "steer": steer, - } - if self._last_image_ts is not None: - msg["ts"] = self._last_image_ts - payload = json.dumps(msg).encode("utf-8") - self._control_socket.send(payload, zmq.NOBLOCK) - - @property - def last_image_ts(self) -> float | None: - """最後に受信した画像のタイムスタンプを返す""" - return self._last_image_ts + payload = json.dumps(command).encode("utf-8") + self._command_socket.send( + payload, zmq.NOBLOCK, + ) def stop(self) -> None: """通信ソケットを閉じる""" - if self._image_socket is not None: - self._image_socket.close() - self._image_socket = None - if self._control_socket is not None: - self._control_socket.close() - self._control_socket = None + if self._telemetry_socket is not None: + self._telemetry_socket.close() + self._telemetry_socket = None + if self._command_socket is not None: + self._command_socket.close() + self._command_socket = None if self._context is not None: self._context.term() self._context = None diff --git a/src/pc/gui/main_window.py b/src/pc/gui/main_window.py index bf91fdf..a17de83 100644 --- a/src/pc/gui/main_window.py +++ b/src/pc/gui/main_window.py @@ -1,9 +1,11 @@ """ main_window PC 側のメインウィンドウを定義するモジュール -カメラ映像のリアルタイム表示と操作 UI を提供する +Pi からのテレメトリをリアルタイム表示し, +モード切替・パラメータ調整・手動操作のコマンドを送信する """ +import dataclasses import time import cv2 @@ -22,7 +24,6 @@ from common import config from pc.comm.zmq_client import PcZmqClient -from pc.data.collector import DataCollector from pc.gui.panels import ( ControlParamPanel, ImageParamPanel, @@ -43,34 +44,21 @@ save_recovery, save_ts_pd, ) -from pc.steering.base import SteeringBase, SteeringOutput -from pc.steering.pd_control import PdControl, PdParams -from pc.steering.pursuit_control import ( - PursuitControl, - PursuitParams, -) -from pc.steering.recovery import ( - RecoveryController, - RecoveryParams, -) -from pc.steering.ts_pd_control import ( - TsPdControl, - TsPdParams, -) -from pc.vision.fitting import theil_sen_fit +from pc.steering.pd_control import PdParams +from pc.steering.pursuit_control import PursuitParams +from pc.steering.recovery import RecoveryParams +from pc.steering.ts_pd_control import TsPdParams from pc.vision.line_detector import ( ImageParams, LineDetectResult, - detect_line, ) -from pc.vision.intersection import IntersectionClassifier from pc.vision.overlay import draw_overlay # 映像更新間隔 (ms) FRAME_INTERVAL_MS: int = 33 -# 操舵量送信間隔 (ms) -CONTROL_INTERVAL_MS: int = int( +# コマンド送信間隔 (ms) +COMMAND_INTERVAL_MS: int = int( 1000 / config.CONTROL_PUBLISH_HZ ) @@ -83,7 +71,11 @@ class MainWindow(QMainWindow): - """PC 側のメインウィンドウ""" + """PC 側のメインウィンドウ + + Pi からテレメトリを受信して映像・状態を表示し, + コマンドを Pi に送信する + """ def __init__(self) -> None: super().__init__() @@ -96,58 +88,41 @@ self._throttle: float = 0.0 self._steer: float = 0.0 - # 前回のパラメータを復元 + # テレメトリから受け取る状態 + self._pi_detected: bool = False + self._pi_pos_error: float = 0.0 + self._pi_heading: float = 0.0 + self._pi_is_intersection: bool = False + self._pi_is_recovering: bool = False + self._pi_fps: float = 0.0 + + # 前回のパラメータを復元(GUI 表示・コマンド送信用) pd_params, last_method, last_steering = ( load_control() ) image_params = load_detect_params(last_method) - self._pd_control = PdControl( - params=pd_params, - image_params=image_params, - ) + self._image_params = image_params + self._pd_params = pd_params pursuit_params = load_pursuit() - self._pursuit_control = PursuitControl( - params=pursuit_params, - image_params=image_params, - ) + self._pursuit_params = pursuit_params ts_pd_params = load_ts_pd() - self._ts_pd_control = TsPdControl( - params=ts_pd_params, - image_params=image_params, - ) - - # 現在の制御手法("pd", "pursuit", "ts_pd") + self._ts_pd_params = ts_pd_params self._steering_method: str = last_steering - # コースアウト復帰 + # 復帰パラメータ recovery_params = load_recovery() - self._recovery = RecoveryController( - params=recovery_params, - ) + self._recovery_params = recovery_params - # 最新フレームの保持(自動操縦で使用) - self._latest_frame: np.ndarray | None = None - - # 十字路判定結果の保持 - self._is_intersection: bool = False - - # 検出結果の保持 - self._last_detect_result: LineDetectResult | None = ( - None - ) - - # 十字路分類器 - self._intersection_clf = IntersectionClassifier() - - # データ収集 - self._collector = DataCollector() - self._is_intersection_key: bool = False + # 最新のバイナリ画像(テレメトリから) + self._latest_binary: np.ndarray | None = None # パフォーマンス計測 self._recv_frame_count: int = 0 self._recv_fps_start: float = time.time() self._recv_fps: float = 0.0 - self._last_proc_ms: float = 0.0 + + # パラメータ変更フラグ(次のコマンド送信で送る) + self._params_dirty: bool = True self._setup_ui() self._setup_timers() @@ -178,7 +153,7 @@ left_layout.addWidget(self._video_label) self._detect_info_label = QLabel( - "pos: --- head: --- curv: ---" + "pos: --- head: ---" ) self._detect_info_label.setAlignment( Qt.AlignmentFlag.AlignLeft, @@ -191,7 +166,7 @@ left_layout.addWidget(self._detect_info_label) self._perf_label = QLabel( - "recv FPS: --- proc: ---" + "recv FPS: --- Pi FPS: ---" ) self._perf_label.setAlignment( Qt.AlignmentFlag.AlignLeft, @@ -249,7 +224,7 @@ # 二値化パラメータパネル self._image_panel = ImageParamPanel( - self._pd_control.image_params, + self._image_params, ) self._image_panel.image_params_changed.connect( self._on_image_params_changed, @@ -261,15 +236,15 @@ # 十字路判定パネル self._intersection_panel = IntersectionPanel( - available=self._intersection_clf.available, + available=True, ) control_layout.addWidget(self._intersection_panel) # 制御パラメータパネル self._control_panel = ControlParamPanel( - self._pd_control.params, - self._pursuit_control.params, - self._ts_pd_control.params, + self._pd_params, + self._pursuit_params, + self._ts_pd_params, self._steering_method, ) self._control_panel.pd_params_changed.connect( @@ -288,32 +263,13 @@ # コースアウト復帰パネル self._recovery_panel = RecoveryPanel( - self._recovery.params, + self._recovery_params, ) self._recovery_panel.recovery_params_changed.connect( self._on_recovery_params_changed, ) control_layout.addWidget(self._recovery_panel) - # データ収集ボタン - self._record_btn = QPushButton("録画開始") - self._record_btn.setEnabled(False) - self._record_btn.clicked.connect( - self._toggle_recording, - ) - control_layout.addWidget(self._record_btn) - - # 録画ステータス表示 - self._record_label = QLabel("") - self._record_label.setAlignment( - Qt.AlignmentFlag.AlignLeft, - ) - self._record_label.setStyleSheet( - "font-size: 12px; font-family: monospace;" - " color: #f80; padding: 2px;" - ) - control_layout.addWidget(self._record_label) - # デバッグ表示パネル overlay_flags = load_overlay() self._overlay_panel = OverlayPanel(overlay_flags) @@ -328,8 +284,7 @@ "W/↑: 前進 S/↓: 後退\n" "A/←: 左 D/→: 右\n" "Space: 停止\n" - "Q: 自動操縦 切替\n" - "I: 十字路ラベル(録画中,押下中)" + "Q: 自動操縦 切替" ) guide.setAlignment(Qt.AlignmentFlag.AlignLeft) guide.setStyleSheet("font-size: 12px; color: #666;") @@ -337,23 +292,14 @@ control_layout.addStretch() - @property - def _active_control(self) -> SteeringBase: - """現在選択中の制御クラスを返す""" - if self._steering_method == "pursuit": - return self._pursuit_control - if self._steering_method == "ts_pd": - return self._ts_pd_control - return self._pd_control - def _setup_timers(self) -> None: """タイマーを設定する""" self._frame_timer = QTimer(self) self._frame_timer.timeout.connect(self._update_frame) - self._control_timer = QTimer(self) - self._control_timer.timeout.connect( - self._send_control, + self._command_timer = QTimer(self) + self._command_timer.timeout.connect( + self._send_command, ) # ── パネルシグナルのスロット ─────────────────────────── @@ -361,46 +307,48 @@ def _on_image_params_changed( self, ip: ImageParams, ) -> None: - """二値化パラメータの変更を全制御クラスに反映する""" - self._pd_control.image_params = ip - self._pursuit_control.image_params = ip - self._ts_pd_control.image_params = ip + """二値化パラメータの変更をマークする""" + self._image_params = ip + self._params_dirty = True def _on_method_changed(self, method: str) -> None: - """検出手法の変更に合わせて制御設定を保存する""" + """検出手法の変更を保存する""" save_control( - self._pd_control.params, method, + self._pd_params, method, self._steering_method, ) def _on_pd_params_changed(self, p: PdParams) -> None: - """PD パラメータの変更を制御クラスに反映して保存する""" - self._pd_control.params = p + """PD パラメータの変更を保存・マークする""" + self._pd_params = p save_control( - p, self._pd_control.image_params.method, + p, self._image_params.method, self._steering_method, ) + self._params_dirty = True def _on_pursuit_params_changed( self, p: PursuitParams, ) -> None: - """Pursuit パラメータの変更を制御クラスに反映して保存する""" - self._pursuit_control.params = p + """Pursuit パラメータの変更を保存する""" + self._pursuit_params = p save_pursuit(p) def _on_ts_pd_params_changed( self, p: TsPdParams, ) -> None: - """Theil-Sen PD パラメータの変更を反映して保存する""" - self._ts_pd_control.params = p + """Theil-Sen PD パラメータの変更を保存・マークする""" + self._ts_pd_params = p save_ts_pd(p) + self._params_dirty = True def _on_recovery_params_changed( self, p: RecoveryParams, ) -> None: - """復帰パラメータの変更を反映して保存する""" - self._recovery.params = p + """復帰パラメータの変更を保存・マークする""" + self._recovery_params = p save_recovery(p) + self._params_dirty = True def _on_overlay_flags_changed(self) -> None: """オーバーレイフラグの変更を保存する""" @@ -412,10 +360,11 @@ """制御手法の切替を反映して保存する""" self._steering_method = method save_control( - self._pd_control.params, - self._pd_control.image_params.method, + self._pd_params, + self._image_params.method, method, ) + self._params_dirty = True # ── 接続 ────────────────────────────────────────────── @@ -427,33 +376,34 @@ self._connect() def _connect(self) -> None: - """ZMQ 通信を開始して映像受信を始める""" + """ZMQ 通信を開始してテレメトリ受信を始める""" self._zmq_client.start() self._is_connected = True + self._params_dirty = True self._connect_btn.setText("切断") self._auto_btn.setEnabled(True) - self._record_btn.setEnabled(True) self._status_label.setText("接続中 (手動操作)") self._frame_timer.start(FRAME_INTERVAL_MS) - self._control_timer.start(CONTROL_INTERVAL_MS) + self._command_timer.start(COMMAND_INTERVAL_MS) + # 初回接続時に stop コマンドを送信 + self._zmq_client.send_command({"mode": "stop"}) def _disconnect(self) -> None: """ZMQ 通信を停止する""" self._frame_timer.stop() - self._control_timer.stop() - if self._collector.is_recording: - self._stop_recording() + self._command_timer.stop() + # Pi を停止 + if self._is_connected: + self._zmq_client.send_command({"mode": "stop"}) if self._is_auto: self._is_auto = False self._auto_btn.setText("自動操縦 ON") self._zmq_client.stop() self._is_connected = False self._auto_btn.setEnabled(False) - self._record_btn.setEnabled(False) self._pressed_keys.clear() self._throttle = 0.0 self._steer = 0.0 - self._latest_frame = None self._connect_btn.setText("接続開始") self._status_label.setText("未接続") self._video_label.setText("カメラ映像待機中...") @@ -473,11 +423,11 @@ def _enable_auto(self) -> None: """自動操縦を開始する""" self._is_auto = True - self._active_control.reset() - self._recovery.reset() self._pressed_keys.clear() self._auto_btn.setText("自動操縦 OFF") self._status_label.setText("接続中 (自動操縦)") + # Pi にパラメータ付きで auto コマンドを送信 + self._params_dirty = True def _disable_auto(self) -> None: """自動操縦を停止して手動に戻る""" @@ -488,59 +438,49 @@ self._status_label.setText("接続中 (手動操作)") self._update_control_label() - # ── データ収集 ──────────────────────────────────────── - - def _toggle_recording(self) -> None: - """録画の開始 / 停止を切り替える""" - if self._collector.is_recording: - self._stop_recording() - else: - self._start_recording() - - def _start_recording(self) -> None: - """録画を開始する""" - session_dir = self._collector.start() - self._record_btn.setText("録画停止") - self._record_label.setText( - f"録画中: {session_dir.name}\n" - "intersection: 0 normal: 0" - ) - - def _stop_recording(self) -> None: - """録画を停止する""" - n_int = self._collector.count_intersection - n_norm = self._collector.count_normal - self._collector.stop() - self._is_intersection_key = False - self._record_btn.setText("録画開始") - self._record_label.setText( - f"録画完了: intersection={n_int}" - f" normal={n_norm}" - ) - - def _update_record_label(self) -> None: - """録画ステータスの枚数表示を更新する""" - if not self._collector.is_recording: - return - label = ( - "I押下中" if self._is_intersection_key - else "通常" - ) - self._record_label.setText( - f"録画中 [{label}]\n" - f"intersection: {self._collector.count_intersection}" - f" normal: {self._collector.count_normal}" - ) - - # ── 映像更新 ────────────────────────────────────────── + # ── 映像更新(テレメトリ受信) ──────────────────────── def _update_frame(self) -> None: - """タイマーから呼び出され,最新フレームを表示する""" - frame = self._zmq_client.receive_image() - if frame is None: + """タイマーから呼び出され,テレメトリを受信して表示する""" + result = self._zmq_client.receive_telemetry() + if result is None: return - proc_start = time.time() - self._latest_frame = frame + + telemetry, frame, binary = result + + # テレメトリから状態を取得 + self._pi_detected = telemetry.get( + "detected", False, + ) + self._pi_pos_error = telemetry.get( + "pos_error", 0.0, + ) + self._pi_heading = telemetry.get("heading", 0.0) + self._pi_is_intersection = telemetry.get( + "is_intersection", False, + ) + self._pi_is_recovering = telemetry.get( + "is_recovering", False, + ) + self._pi_fps = telemetry.get("fps", 0.0) + + # 自動時は Pi の操舵量を表示 + if self._is_auto: + self._throttle = telemetry.get( + "throttle", 0.0, + ) + self._steer = telemetry.get("steer", 0.0) + self._update_control_label() + + # 十字路パネルの表示更新 + if self._intersection_panel.enabled: + self._intersection_panel.update_result( + self._pi_is_intersection, + ) + else: + self._intersection_panel.clear_result() + + self._latest_binary = binary # 受信 FPS 計測 self._recv_frame_count += 1 @@ -552,177 +492,29 @@ self._recv_frame_count = 0 self._recv_fps_start = time.time() - # 線検出は常に実行(検出情報ラベル表示のため) - if self._is_auto: - ctrl = self._active_control - output = ctrl.compute(frame) - self._last_detect_result = ( - ctrl.last_detect_result - ) + # 検出情報表示 + self._update_detect_info_label() - # 十字路判定: 十字路ならまっすぐ進む - det = self._last_detect_result - self._is_intersection = False - if ( - self._intersection_panel.enabled - and self._intersection_clf.available - and det is not None - and det.binary_image is not None - ): - self._is_intersection = ( - self._intersection_clf.predict( - det.binary_image, - ) - ) - if self._is_intersection: - output = SteeringOutput( - throttle=( - self._intersection_panel.throttle - ), - steer=0.0, - ) - - # コースアウト復帰判定 - detected = det is not None and det.detected - pos_err = ( - det.position_error - if detected and det is not None - else 0.0 - ) - recovery_output = self._recovery.update( - detected, pos_err, - ) - if recovery_output is not None: - output = recovery_output - - # 十字路パネルの表示更新 - if self._intersection_panel.enabled: - self._intersection_panel.update_result( - self._is_intersection, - ) - - self._throttle = output.throttle - self._steer = output.steer - self._update_control_label() - else: - self._is_intersection = False - self._intersection_panel.clear_result() - self._last_detect_result = detect_line( - frame, - self._pd_control.image_params, - ) - - # データ収集: 二値画像を保存 - if self._collector.is_recording: - det = self._last_detect_result - if det is not None and det.binary_image is not None: - self._collector.save( - det.binary_image, - self._is_intersection_key, - ) - self._update_record_label() + # パフォーマンス表示 + self._perf_label.setText( + f"recv FPS: {self._recv_fps:.1f}" + f" Pi FPS: {self._pi_fps:.1f}" + ) self._display_frame(frame) - # PC 処理時間を計測・表示 - self._last_proc_ms = ( - (time.time() - proc_start) * 1000 - ) - self._perf_label.setText( - f"recv FPS: {self._recv_fps:.1f}" - f" proc: {self._last_proc_ms:.1f}ms" - ) - def _update_detect_info_label(self) -> None: """検出情報ラベルを更新する""" - r = self._last_detect_result - if r is None or not r.detected: + if not self._pi_detected: self._detect_info_label.setText( - "pos: --- head: --- curv: ---" + "pos: --- head: ---" ) return self._detect_info_label.setText( - f"pos: {r.position_error:+.3f}" - f" head: {r.heading:+.4f}" - f" curv: {r.curvature:+.6f}" + f"pos: {self._pi_pos_error:+.3f}" + f" head: {self._pi_heading:+.4f}" ) - def _calc_pursuit_points_preview( - self, - ) -> ( - tuple[tuple[float, float], tuple[float, float]] - | None - ): - """手動操作中にパシュート目標点を算出する - - Returns: - ((near_x, near_y), (far_x, far_y)) または None - """ - r = self._last_detect_result - if r is None or not r.detected: - return None - if r.row_centers is None: - return None - - centers = r.row_centers - valid = ~np.isnan(centers) - ys = np.where(valid)[0].astype(float) - xs = centers[valid] - if len(ys) < 2: - return None - - slope, intercept = theil_sen_fit(ys, xs) - h = len(centers) - p = self._pursuit_control.params - near_y = h * p.near_ratio - far_y = h * p.far_ratio - near_x = slope * near_y + intercept - far_x = slope * far_y + intercept - return ((near_x, near_y), (far_x, far_y)) - - def _calc_ts_pd_line_preview( - self, - ) -> ( - tuple[tuple[float, float], tuple[float, float]] - | None - ): - """Theil-Sen PD の近似直線を表示用に算出する - - 直線の上端と下端の 2 点を返す - - Returns: - ((bottom_x, bottom_y), (top_x, top_y)) または None - """ - if self._is_auto: - fit = self._ts_pd_control.last_fit_line - if fit is None: - return None - slope, intercept = fit - r = self._last_detect_result - if r is None or r.row_centers is None: - return None - h = len(r.row_centers) - else: - r = self._last_detect_result - if r is None or not r.detected: - return None - if r.row_centers is None: - return None - centers = r.row_centers - valid = ~np.isnan(centers) - ys = np.where(valid)[0].astype(float) - xs = centers[valid] - if len(ys) < 2: - return None - slope, intercept = theil_sen_fit(ys, xs) - h = len(centers) - - bottom_y = float(h - 1) - top_y = 0.0 - bottom_x = slope * bottom_y + intercept - top_x = slope * top_y + intercept - return ((bottom_x, bottom_y), (top_x, top_y)) - def _display_frame(self, frame: np.ndarray) -> None: """NumPy 配列の画像を QLabel に表示する @@ -732,31 +524,25 @@ # グレースケール → BGR 変換(カラーオーバーレイ描画のため) bgr = cv2.cvtColor(frame, cv2.COLOR_GRAY2BGR) - # オーバーレイ描画 - pursuit_pts = None - if self._steering_method == "pursuit": - if self._is_auto: - pursuit_pts = ( - self._pursuit_control - .last_pursuit_points - ) - else: - pursuit_pts = ( - self._calc_pursuit_points_preview() - ) - elif self._steering_method == "ts_pd": - pursuit_pts = ( - self._calc_ts_pd_line_preview() + # テレメトリから簡易的な LineDetectResult を構築して + # オーバーレイ描画に渡す + detect_result = None + if self._pi_detected and self._latest_binary is not None: + detect_result = LineDetectResult( + detected=True, + position_error=self._pi_pos_error, + heading=self._pi_heading, + curvature=0.0, + poly_coeffs=None, + row_centers=None, + binary_image=self._latest_binary, ) - bgr = draw_overlay( - bgr, self._last_detect_result, - self._overlay_panel.get_flags(), - pursuit_points=pursuit_pts, - is_intersection=self._is_intersection, - ) - # 検出情報をラベルに表示 - self._update_detect_info_label() + bgr = draw_overlay( + bgr, detect_result, + self._overlay_panel.get_flags(), + is_intersection=self._pi_is_intersection, + ) # BGR → RGB 変換 rgb = bgr[:, :, ::-1].copy() @@ -788,12 +574,6 @@ self._toggle_auto() return - # I キーで十字路ラベル(録画中のみ) - if event.key() == Qt.Key.Key_I: - if self._collector.is_recording: - self._is_intersection_key = True - return - # 自動操縦中はキー操作を無視 if self._is_auto: return @@ -806,11 +586,6 @@ if event.isAutoRepeat(): return - # I キーの離上 - if event.key() == Qt.Key.Key_I: - self._is_intersection_key = False - return - if self._is_auto: return self._pressed_keys.discard(event.key()) @@ -869,19 +644,54 @@ f"throttle: {self._throttle:+.2f}\n" f"steer: {self._steer:+.2f}" ) - if self._is_auto and self._is_intersection: + if self._is_auto and self._pi_is_intersection: text += "\n[十字路]" - if self._is_auto and self._recovery.is_recovering: + if self._is_auto and self._pi_is_recovering: text += "\n[復帰中]" self._control_label.setText(text) - def _send_control(self) -> None: - """操舵量を Pi に送信する""" + def _send_command(self) -> None: + """コマンドを Pi に送信する""" if not self._is_connected: return - self._zmq_client.send_control( - self._throttle, self._steer, + + cmd: dict = {} + + # モード + if self._is_auto: + cmd["mode"] = "auto" + elif ( + self._throttle != 0.0 + or self._steer != 0.0 + ): + cmd["mode"] = "manual" + cmd["throttle"] = self._throttle + cmd["steer"] = self._steer + else: + cmd["mode"] = "stop" + + # 十字路設定 + cmd["intersection_enabled"] = ( + self._intersection_panel.enabled ) + cmd["intersection_throttle"] = ( + self._intersection_panel.throttle + ) + + # パラメータ更新(変更があった場合のみ) + if self._params_dirty: + cmd["image_params"] = dataclasses.asdict( + self._image_params, + ) + cmd["steering_params"] = dataclasses.asdict( + self._ts_pd_params, + ) + cmd["recovery_params"] = dataclasses.asdict( + self._recovery_params, + ) + self._params_dirty = False + + self._zmq_client.send_command(cmd) def closeEvent(self, event) -> None: """ウィンドウを閉じるときに通信を停止する""" diff --git a/src/pi/comm/zmq_client.py b/src/pi/comm/zmq_client.py index 93aa4dd..bb0c701 100644 --- a/src/pi/comm/zmq_client.py +++ b/src/pi/comm/zmq_client.py @@ -1,7 +1,8 @@ """ zmq_client Pi 側の ZMQ 通信を担当するモジュール -画像の送信と操舵量の受信を行う +テレメトリ送信(画像+検出結果+操舵量)と +コマンド受信(モード切替・パラメータ更新・手動操作)を行う """ import json @@ -18,94 +19,147 @@ class PiZmqClient: """Pi 側の ZMQ 通信クライアント - 画像送信(PUB)と操舵量受信(SUB)の2チャネルを管理する + テレメトリ送信(PUB)とコマンド受信(SUB)の2チャネルを管理する """ def __init__(self) -> None: self._context = zmq.Context() - self._image_socket: zmq.Socket | None = None - self._control_socket: zmq.Socket | None = None - self._last_receive_time: float = 0.0 - self._last_rtt: float | None = None + self._telemetry_socket: zmq.Socket | None = None + self._command_socket: zmq.Socket | None = None def start(self) -> None: """通信ソケットを初期化して接続する""" - # 画像送信ソケット(PUB,PC へ画像を送信) - self._image_socket = self._context.socket(zmq.PUB) - self._image_socket.setsockopt(zmq.CONFLATE, 1) - self._image_socket.connect(config.image_connect_address()) + # テレメトリ送信ソケット(PUB,PC へ画像+状態を送信) + self._telemetry_socket = self._context.socket( + zmq.PUB, + ) + self._telemetry_socket.setsockopt(zmq.CONFLATE, 1) + self._telemetry_socket.connect( + config.image_connect_address(), + ) - # 操舵量受信ソケット(SUB,PC からの操舵量を受信) - self._control_socket = self._context.socket(zmq.SUB) - self._control_socket.setsockopt(zmq.CONFLATE, 1) - self._control_socket.setsockopt_string(zmq.SUBSCRIBE, "") - self._control_socket.connect(config.control_connect_address()) + # コマンド受信ソケット(SUB,PC からのコマンドを受信) + self._command_socket = self._context.socket( + zmq.SUB, + ) + self._command_socket.setsockopt(zmq.CONFLATE, 1) + self._command_socket.setsockopt_string( + zmq.SUBSCRIBE, "", + ) + self._command_socket.connect( + config.control_connect_address(), + ) - self._last_receive_time = time.time() + def send_telemetry( + self, + frame: np.ndarray, + throttle: float, + steer: float, + detected: bool, + position_error: float, + heading: float, + is_intersection: bool, + is_recovering: bool, + fps: float, + binary_image: np.ndarray | None = None, + ) -> None: + """テレメトリ(JSON ヘッダ + JPEG 画像)を送信する - def send_image(self, frame: np.ndarray) -> None: - """画像を JPEG 圧縮してタイムスタンプ付きで送信する + メッセージ形式: + 4 バイト: JSON 長(uint32 LE) + N バイト: JSON テレメトリ + 残り: JPEG 画像(カメラ映像) + (binary_image がある場合はさらに続く) Args: frame: カメラから取得した画像の NumPy 配列 + throttle: 現在の throttle 出力 + steer: 現在の steer 出力 + detected: 線が検出できたか + position_error: 位置偏差 + heading: 線の傾き + is_intersection: 十字路と判定されたか + is_recovering: 復帰動作中か + fps: Pi 側の処理 FPS + binary_image: 二値画像(None で省略) """ - if self._image_socket is None: + if self._telemetry_socket is None: return - _, encoded = cv2.imencode( + + telemetry: dict = { + "ts": time.time(), + "throttle": throttle, + "steer": steer, + "detected": detected, + "pos_error": position_error, + "heading": heading, + "is_intersection": is_intersection, + "is_recovering": is_recovering, + "fps": fps, + } + + # JSON ヘッダをエンコード + json_bytes = json.dumps(telemetry).encode("utf-8") + json_len = struct.pack(" tuple[float, float] | None: - """操舵量を非ブロッキングで受信する + # 二値画像を JPEG 圧縮(ある場合) + bin_bytes = b"" + if binary_image is not None: + _, bin_encoded = cv2.imencode( + ".jpg", + binary_image, + [cv2.IMWRITE_JPEG_QUALITY, 80], + ) + bin_bytes = bin_encoded.tobytes() + + # メッセージ: JSON長 + JSON + CAM長 + CAM + BIN + msg = ( + json_len + json_bytes + + cam_len + cam_bytes + + bin_bytes + ) + self._telemetry_socket.send(msg, zmq.NOBLOCK) + + def receive_command(self) -> dict | None: + """PC からのコマンドを非ブロッキングで受信する + + コマンド形式(JSON): + mode: "auto" | "manual" | "stop" + throttle: float(手動モード時のみ) + steer: float(手動モード時のみ) + image_params: dict(パラメータ更新,省略可) + steering_params: dict(パラメータ更新,省略可) + recovery_params: dict(パラメータ更新,省略可) + intersection_enabled: bool(省略可) + intersection_throttle: float(省略可) Returns: - (throttle, steer) のタプル,受信データがない場合は None + コマンド辞書,受信データがない場合は None """ - if self._control_socket is None: + if self._command_socket is None: return None try: - data = self._control_socket.recv(zmq.NOBLOCK) - payload = json.loads(data.decode("utf-8")) - self._last_receive_time = time.time() - - # ラウンドトリップ計測 - if "ts" in payload: - rtt = time.time() - payload["ts"] - self._last_rtt = rtt - - return (payload["throttle"], payload["steer"]) + data = self._command_socket.recv(zmq.NOBLOCK) + return json.loads(data.decode("utf-8")) except zmq.Again: return None - @property - def last_rtt(self) -> float | None: - """最後に計測したラウンドトリップ時間(秒)を返す""" - return self._last_rtt - - def is_timeout(self) -> bool: - """操舵量の受信がタイムアウトしたか判定する - - Returns: - タイムアウトしていれば True - """ - elapsed = time.time() - self._last_receive_time - return elapsed > config.CONTROL_TIMEOUT_SEC - def stop(self) -> None: """通信ソケットを閉じる""" - if self._image_socket is not None: - self._image_socket.close() - self._image_socket = None - if self._control_socket is not None: - self._control_socket.close() - self._control_socket = None + if self._telemetry_socket is not None: + self._telemetry_socket.close() + self._telemetry_socket = None + if self._command_socket is not None: + self._command_socket.close() + self._command_socket = None self._context.term() diff --git a/src/pi/main.py b/src/pi/main.py index 129fe8a..8b38581 100644 --- a/src/pi/main.py +++ b/src/pi/main.py @@ -1,14 +1,29 @@ """ main Pi 側アプリケーションのエントリーポイント -カメラ画像の送信と操舵量の受信・モーター制御を行う +カメラ画像の取得・画像処理・操舵量計算・モーター制御を +すべて Pi 上で完結させる +PC にはテレメトリ(画像+状態)を送信し, +PC からはコマンド(モード切替・パラメータ更新)を受信する """ +import dataclasses import time from pi.camera.capture import CameraCapture from pi.comm.zmq_client import PiZmqClient from pi.motor.driver import MotorDriver +from pi.steering.base import SteeringOutput +from pi.steering.recovery import ( + RecoveryController, + RecoveryParams, +) +from pi.steering.ts_pd_control import ( + TsPdControl, + TsPdParams, +) +from pi.vision.intersection import IntersectionClassifier +from pi.vision.line_detector import ImageParams def main() -> None: @@ -17,50 +32,164 @@ zmq_client = PiZmqClient() motor = MotorDriver() + # 操舵制御 + steering = TsPdControl() + recovery = RecoveryController() + + # 十字路分類器(遅延読み込み) + intersection_clf = IntersectionClassifier() + intersection_enabled = False + intersection_throttle = 0.3 + + # モード管理 + mode = "stop" # "auto", "manual", "stop" + manual_throttle = 0.0 + manual_steer = 0.0 + + # FPS 計測用 + frame_count = 0 + fps_start = time.time() + current_fps = 0.0 + LOG_INTERVAL_SEC = 3.0 + try: camera.start() zmq_client.start() motor.start() - print("Pi: カメラ・通信・モーターを開始") - - # FPS / RTT 計測用 - frame_count = 0 - fps_start = time.time() - LOG_INTERVAL_SEC = 3.0 + print("Pi: カメラ・通信・モーターを開始(自律モード)") while True: - # カメラ画像を取得して送信 + # ── コマンド受信 ──────────────────────── + cmd = zmq_client.receive_command() + if cmd is not None: + _apply_command( + cmd, steering, recovery, + intersection_clf, + ) + # モード更新 + if "mode" in cmd: + new_mode = cmd["mode"] + if new_mode != mode: + if new_mode == "auto": + steering.reset() + recovery.reset() + mode = new_mode + print(f"Pi: モード変更 → {mode}") + # 手動操作値 + if mode == "manual": + manual_throttle = cmd.get( + "throttle", 0.0, + ) + manual_steer = cmd.get("steer", 0.0) + # 十字路設定 + if "intersection_enabled" in cmd: + intersection_enabled = cmd[ + "intersection_enabled" + ] + if "intersection_throttle" in cmd: + intersection_throttle = cmd[ + "intersection_throttle" + ] + + # ── カメラ画像取得 ────────────────────── frame = camera.capture() - zmq_client.send_image(frame) frame_count += 1 - # 操舵量を受信してモーターに反映 - control = zmq_client.receive_control() - if control is not None: - throttle, steer = control + # ── 操舵量決定 ───────────────────────── + throttle = 0.0 + steer = 0.0 + detected = False + position_error = 0.0 + heading = 0.0 + is_intersection = False + is_recovering = False + binary_image = None + + if mode == "auto": + # 線検出 + PD 制御 + output = steering.compute(frame) + det = steering.last_detect_result + + detected = ( + det is not None and det.detected + ) + position_error = ( + det.position_error + if detected and det is not None + else 0.0 + ) + heading = ( + det.heading + if detected and det is not None + else 0.0 + ) + binary_image = ( + det.binary_image + if det is not None else None + ) + + # 十字路判定 + if ( + intersection_enabled + and intersection_clf.available + and det is not None + and det.binary_image is not None + ): + is_intersection = ( + intersection_clf.predict( + det.binary_image, + ) + ) + if is_intersection: + output = SteeringOutput( + throttle=intersection_throttle, + steer=0.0, + ) + + # コースアウト復帰 + recovery_output = recovery.update( + detected, position_error, + ) + if recovery_output is not None: + output = recovery_output + is_recovering = recovery.is_recovering + + throttle = output.throttle + steer = output.steer + + elif mode == "manual": + throttle = manual_throttle + steer = manual_steer + + # mode == "stop" なら throttle=0, steer=0 + + # ── モーター制御 ─────────────────────── + if mode == "stop": + motor.stop() + else: motor.set_drive(throttle, steer) - # タイムアウト時はモーター停止 - if zmq_client.is_timeout(): - motor.stop() - - # 定期的に FPS と RTT を表示 + # ── FPS 計測 ─────────────────────────── elapsed = time.time() - fps_start if elapsed >= LOG_INTERVAL_SEC: - fps = frame_count / elapsed - rtt = zmq_client.last_rtt - rtt_ms = ( - f"{rtt * 1000:.1f}ms" - if rtt is not None else "---" - ) - print( - f"Pi: カメラ FPS={fps:.1f}" - f" RTT={rtt_ms}" - ) + current_fps = frame_count / elapsed + print(f"Pi: FPS={current_fps:.1f}") frame_count = 0 fps_start = time.time() - time.sleep(0.01) + # ── テレメトリ送信 ───────────────────── + zmq_client.send_telemetry( + frame=frame, + throttle=throttle, + steer=steer, + detected=detected, + position_error=position_error, + heading=heading, + is_intersection=is_intersection, + is_recovering=is_recovering, + fps=current_fps, + binary_image=binary_image, + ) except KeyboardInterrupt: print("\nPi: 終了") @@ -70,5 +199,56 @@ zmq_client.stop() +def _apply_command( + cmd: dict, + steering: TsPdControl, + recovery: RecoveryController, + intersection_clf: IntersectionClassifier, +) -> None: + """コマンドからパラメータ更新を適用する + + Args: + cmd: 受信したコマンド辞書 + steering: 操舵制御クラス + recovery: 復帰制御クラス + intersection_clf: 十字路分類器 + """ + # 画像処理パラメータの更新 + if "image_params" in cmd: + ip = cmd["image_params"] + current = steering.image_params + for key, value in ip.items(): + if hasattr(current, key): + setattr(current, key, value) + + # 操舵パラメータの更新 + if "steering_params" in cmd: + sp = cmd["steering_params"] + current = steering.params + for key, value in sp.items(): + if hasattr(current, key): + setattr(current, key, value) + + # 復帰パラメータの更新 + if "recovery_params" in cmd: + rp = cmd["recovery_params"] + current = recovery.params + for key, value in rp.items(): + if hasattr(current, key): + setattr(current, key, value) + + # 十字路分類器の遅延読み込み + if ( + cmd.get("intersection_enabled", False) + and not intersection_clf.available + ): + print("Pi: 十字路分類器を読み込み中...") + intersection_clf.load() + if intersection_clf.available: + print("Pi: 十字路分類器の読み込み完了") + else: + print("Pi: 十字路分類器が見つかりません") + + if __name__ == "__main__": main() diff --git a/src/pi/steering/__init__.py b/src/pi/steering/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/src/pi/steering/__init__.py diff --git a/src/pi/steering/base.py b/src/pi/steering/base.py new file mode 100644 index 0000000..40eff1a --- /dev/null +++ b/src/pi/steering/base.py @@ -0,0 +1,50 @@ +""" +base +操舵量計算の共通インターフェースを定義するモジュール +全ての操舵量計算クラスはこのインターフェースに従う +""" + +from abc import ABC, abstractmethod +from dataclasses import dataclass + +import numpy as np + + +@dataclass +class SteeringOutput: + """操舵量計算の出力を格納するデータクラス + + Attributes: + throttle: 前後方向の出力 (-1.0 ~ +1.0) + steer: 左右方向の出力 (-1.0 ~ +1.0) + """ + throttle: float + steer: float + + +class SteeringBase(ABC): + """操舵量計算の基底クラス + + 全ての操舵量計算クラスはこのクラスを継承し, + compute メソッドを実装する + """ + + @abstractmethod + def compute( + self, frame: np.ndarray, + ) -> SteeringOutput: + """カメラ画像から操舵量を計算する + + Args: + frame: BGR 形式のカメラ画像 + + Returns: + 計算された操舵量 + """ + + @abstractmethod + def reset(self) -> None: + """内部状態をリセットする + + 自動操縦の開始時に呼び出される + """ diff --git a/src/pi/steering/recovery.py b/src/pi/steering/recovery.py new file mode 100644 index 0000000..7170dc8 --- /dev/null +++ b/src/pi/steering/recovery.py @@ -0,0 +1,104 @@ +""" +recovery +コースアウト復帰のパラメータと判定ロジックを定義するモジュール +黒線を一定時間検出できなかった場合に, +最後に検出した方向へ旋回しながら走行して復帰する +""" + +import time +from dataclasses import dataclass + +from pi.steering.base import SteeringOutput + + +@dataclass +class RecoveryParams: + """コースアウト復帰のパラメータ + + Attributes: + enabled: 復帰機能の有効/無効 + timeout_sec: 線を見失ってから復帰動作を開始するまでの時間 + steer_amount: 復帰時の操舵量(0.0 ~ 1.0) + throttle: 復帰時の速度(負: 後退,正: 前進) + """ + enabled: bool = True + timeout_sec: float = 0.5 + steer_amount: float = 0.5 + throttle: float = -0.3 + + +class RecoveryController: + """コースアウト復帰の判定と操舵量算出を行うクラス + + 自動操縦中にフレームごとに呼び出し, + 線検出の成否を記録する.一定時間検出できなかった場合に + 復帰用の操舵量を返す + """ + + def __init__( + self, + params: RecoveryParams | None = None, + ) -> None: + self.params: RecoveryParams = ( + params or RecoveryParams() + ) + self._last_detected_time: float = 0.0 + self._last_error_sign: float = 0.0 + self._is_recovering: bool = False + + def reset(self) -> None: + """内部状態をリセットする + + 自動操縦の開始時に呼び出す + """ + self._last_detected_time = time.time() + self._last_error_sign = 0.0 + self._is_recovering = False + + def update( + self, + detected: bool, + position_error: float = 0.0, + ) -> SteeringOutput | None: + """検出結果を記録し,復帰が必要なら操舵量を返す + + Args: + detected: 線が検出できたか + position_error: 検出時の位置偏差(正: 線が左) + + Returns: + 復帰操舵量,または None(通常走行を継続) + """ + if not self.params.enabled: + return None + + now = time.time() + + if detected: + self._last_detected_time = now + if position_error != 0.0: + self._last_error_sign = ( + 1.0 if position_error > 0 else -1.0 + ) + self._is_recovering = False + return None + + elapsed = now - self._last_detected_time + if elapsed < self.params.timeout_sec: + return None + + # 復帰モード: 最後に検出した方向へ旋回 + self._is_recovering = True + steer = ( + self._last_error_sign + * self.params.steer_amount + ) + return SteeringOutput( + throttle=self.params.throttle, + steer=steer, + ) + + @property + def is_recovering(self) -> bool: + """現在復帰動作中かどうかを返す""" + return self._is_recovering diff --git a/src/pi/steering/ts_pd_control.py b/src/pi/steering/ts_pd_control.py new file mode 100644 index 0000000..a30744b --- /dev/null +++ b/src/pi/steering/ts_pd_control.py @@ -0,0 +1,160 @@ +""" +ts_pd_control +Theil-Sen 直線近似による PD 制御モジュール +行中心点に Theil-Sen 直線をフィッティングし, +位置偏差・傾き・微分項から操舵量を算出する +""" + +import time +from dataclasses import dataclass + +import numpy as np + +from common import config +from pi.steering.base import SteeringBase, SteeringOutput +from pi.vision.fitting import theil_sen_fit +from pi.vision.line_detector import ( + ImageParams, + LineDetectResult, + detect_line, +) + + +@dataclass +class TsPdParams: + """Theil-Sen PD 制御のパラメータ + + Attributes: + kp: 位置偏差ゲイン + kh: 傾き(Theil-Sen slope)ゲイン + kd: 微分ゲイン + max_steer_rate: 1フレームあたりの最大操舵変化量 + max_throttle: 直線での最大速度 + speed_k: 傾きベースの減速係数 + """ + kp: float = 0.5 + kh: float = 0.3 + kd: float = 0.1 + max_steer_rate: float = 0.1 + max_throttle: float = 0.4 + speed_k: float = 2.0 + + +class TsPdControl(SteeringBase): + """Theil-Sen 直線近似による PD 制御クラス + + 行中心点から Theil-Sen 直線近似を行い, + 画像下端での位置偏差と直線の傾きから PD 制御で操舵量を計算する + """ + + def __init__( + self, + params: TsPdParams | None = None, + image_params: ImageParams | None = None, + ) -> None: + self.params: TsPdParams = ( + params or TsPdParams() + ) + self.image_params: ImageParams = ( + image_params or ImageParams() + ) + self._prev_error: float = 0.0 + self._prev_time: float = 0.0 + self._prev_steer: float = 0.0 + self._last_result: LineDetectResult | None = None + + def compute( + self, frame: np.ndarray, + ) -> SteeringOutput: + """カメラ画像から Theil-Sen PD 制御で操舵量を計算する + + Args: + frame: グレースケールのカメラ画像 + + Returns: + 計算された操舵量 + """ + p = self.params + + # 線検出 + result = detect_line(frame, self.image_params) + self._last_result = result + + if not result.detected or result.row_centers is None: + return SteeringOutput( + throttle=0.0, steer=0.0, + ) + + centers = result.row_centers + + # 有効な点(NaN でない行)を抽出 + valid = ~np.isnan(centers) + ys = np.where(valid)[0].astype(float) + xs = centers[valid] + + if len(ys) < 2: + return SteeringOutput( + throttle=0.0, steer=0.0, + ) + + # Theil-Sen 直線近似: x = slope * y + intercept + slope, intercept = theil_sen_fit(ys, xs) + + center_x = config.FRAME_WIDTH / 2.0 + h = len(centers) + + # 画像下端での位置偏差 + bottom_x = slope * (h - 1) + intercept + position_error = (center_x - bottom_x) / center_x + + # 操舵量: P 項(位置偏差)+ Heading 項(傾き) + error = p.kp * position_error + p.kh * slope + + # 時間差分の計算 + now = time.time() + dt = ( + now - self._prev_time + if self._prev_time > 0 + else 0.033 + ) + dt = max(dt, 0.001) + + # D 項(微分項) + derivative = (error - self._prev_error) / dt + steer = error + p.kd * derivative + + # 操舵量のクランプ + steer = max(-1.0, min(1.0, steer)) + + # レートリミッター + delta = steer - self._prev_steer + max_delta = p.max_steer_rate + delta = max(-max_delta, min(max_delta, delta)) + steer = self._prev_steer + delta + + # 速度制御(傾きベース) + throttle = p.max_throttle - p.speed_k * abs(slope) + throttle = max(0.0, throttle) + + # 状態の更新 + self._prev_error = error + self._prev_time = now + self._prev_steer = steer + + return SteeringOutput( + throttle=throttle, steer=steer, + ) + + def reset(self) -> None: + """内部状態をリセットする""" + self._prev_error = 0.0 + self._prev_time = 0.0 + self._prev_steer = 0.0 + self._last_result = None + + @property + def last_detect_result( + self, + ) -> LineDetectResult | None: + """直近の線検出結果を取得する""" + return self._last_result diff --git a/src/pi/vision/__init__.py b/src/pi/vision/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/src/pi/vision/__init__.py diff --git a/src/pi/vision/detectors/__init__.py b/src/pi/vision/detectors/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/src/pi/vision/detectors/__init__.py diff --git a/src/pi/vision/fitting.py b/src/pi/vision/fitting.py new file mode 100644 index 0000000..d8122c0 --- /dev/null +++ b/src/pi/vision/fitting.py @@ -0,0 +1,147 @@ +""" +fitting +直線・曲線近似の共通ユーティリティモジュール +Theil-Sen 推定,RANSAC,外れ値除去付きフィッティングを提供する +""" + +import numpy as np + +# フィッティングに必要な最小行数 +MIN_FIT_ROWS: int = 10 + +# 近傍外れ値除去の設定 +NEIGHBOR_HALF_WINDOW: int = 3 +NEIGHBOR_FILTER_PASSES: int = 3 + +# 残差ベース反復除去の最大回数 +RESIDUAL_REMOVAL_ITERATIONS: int = 5 + + +def theil_sen_fit( + y: np.ndarray, + x: np.ndarray, +) -> tuple[float, float]: + """Theil-Sen 推定で直線 x = slope * y + intercept を求める + + 全ペアの傾きの中央値を使い,外れ値に強い直線近似を行う + + Args: + y: y 座標の配列(行番号) + x: x 座標の配列(各行の中心) + + Returns: + (slope, intercept) のタプル + """ + n = len(y) + slopes = [] + for i in range(n): + for j in range(i + 1, n): + dy = y[j] - y[i] + if dy != 0: + slopes.append((x[j] - x[i]) / dy) + + if len(slopes) == 0: + return 0.0, float(np.median(x)) + + slope = float(np.median(slopes)) + intercept = float(np.median(x - slope * y)) + return slope, intercept + + +def clean_and_fit( + cy: np.ndarray, + cx: np.ndarray, + median_ksize: int, + neighbor_thresh: float, + residual_thresh: float = 0.0, + weights: np.ndarray | None = None, + ransac_thresh: float = 0.0, + ransac_iter: int = 0, +) -> np.ndarray | None: + """外れ値除去+重み付きフィッティングを行う + + Args: + cy: 中心点の y 座標配列 + cx: 中心点の x 座標配列 + median_ksize: 移動メディアンのカーネルサイズ(0 で無効) + neighbor_thresh: 近傍外れ値除去の閾値 px(0 で無効) + residual_thresh: 残差除去の閾値 px(0 で無効) + weights: 各点の信頼度(None で均等) + ransac_thresh: RANSAC 閾値(0 以下で無効) + ransac_iter: RANSAC 反復回数 + + Returns: + 多項式係数(フィッティング失敗時は None) + """ + if len(cy) < MIN_FIT_ROWS: + return None + + cx_clean = cx.copy() + mask = np.ones(len(cy), dtype=bool) + + # (1) 移動メディアンフィルタ + if median_ksize >= 3: + k = median_ksize | 1 + half = k // 2 + for i in range(len(cx_clean)): + lo = max(0, i - half) + hi = min(len(cx_clean), i + half + 1) + cx_clean[i] = float(np.median(cx[lo:hi])) + + # (2) 近傍外れ値除去(複数パス) + if neighbor_thresh > 0: + half_n = NEIGHBOR_HALF_WINDOW + for _ in range(NEIGHBOR_FILTER_PASSES): + new_mask = np.ones(len(cx_clean), dtype=bool) + for i in range(len(cx_clean)): + if not mask[i]: + continue + lo = max(0, i - half_n) + hi = min(len(cx_clean), i + half_n + 1) + neighbors = cx_clean[lo:hi][mask[lo:hi]] + if len(neighbors) == 0: + new_mask[i] = False + continue + local_med = float(np.median(neighbors)) + if abs(cx_clean[i] - local_med) > neighbor_thresh: + new_mask[i] = False + if np.array_equal(mask, mask & new_mask): + break + mask = mask & new_mask + + cy = cy[mask] + cx_clean = cx_clean[mask] + if weights is not None: + weights = weights[mask] + + if len(cy) < MIN_FIT_ROWS: + return None + + # (3) フィッティング + if weights is not None: + coeffs = np.polyfit(cy, cx_clean, 2, w=weights) + else: + coeffs = np.polyfit(cy, cx_clean, 2) + + # (4) 残差ベースの反復除去 + if residual_thresh > 0: + for _ in range(RESIDUAL_REMOVAL_ITERATIONS): + poly = np.poly1d(coeffs) + residuals = np.abs(cx_clean - poly(cy)) + inlier = residuals < residual_thresh + if np.all(inlier): + break + if np.sum(inlier) < MIN_FIT_ROWS: + break + cy = cy[inlier] + cx_clean = cx_clean[inlier] + if weights is not None: + weights = weights[inlier] + if weights is not None: + coeffs = np.polyfit( + cy, cx_clean, 2, w=weights, + ) + else: + coeffs = np.polyfit(cy, cx_clean, 2) + + return coeffs diff --git a/src/pi/vision/intersection.py b/src/pi/vision/intersection.py new file mode 100644 index 0000000..46788a4 --- /dev/null +++ b/src/pi/vision/intersection.py @@ -0,0 +1,66 @@ +""" +intersection +十字路分類モデルの読み込みと推論を行うモジュール + +学習済みモデルとスケーラを読み込み, +二値画像から十字路かどうかを判定する +""" + +from pathlib import Path + +import numpy as np + +from common.json_utils import PARAMS_DIR + +# モデル・スケーラの保存先 +_MODEL_PATH: Path = PARAMS_DIR / "intersection_model.pkl" +_SCALER_PATH: Path = PARAMS_DIR / "intersection_scaler.pkl" + + +class IntersectionClassifier: + """十字路分類器 + + 学習済みモデルを読み込み,二値画像から + 十字路かどうかを判定する + scikit-learn を遅延インポートして起動時間を短縮する + """ + + def __init__(self) -> None: + self._model: object | None = None + self._scaler: object | None = None + self._available: bool = False + + def load(self) -> None: + """モデルとスケーラを読み込む(遅延呼び出し用)""" + if not _MODEL_PATH.exists(): + return + if not _SCALER_PATH.exists(): + return + import joblib + + self._model = joblib.load(_MODEL_PATH) + self._scaler = joblib.load(_SCALER_PATH) + self._available = True + + @property + def available(self) -> bool: + """モデルが利用可能かどうか""" + return self._available + + def predict(self, binary_image: np.ndarray) -> bool: + """二値画像が十字路かどうかを判定する + + Args: + binary_image: 40×30 の二値画像(0/255) + + Returns: + 十字路なら True + """ + if not self._available: + return False + flat = (binary_image.flatten() / 255.0).astype( + np.float32, + ) + x = self._scaler.transform(flat.reshape(1, -1)) + pred = self._model.predict(x) + return bool(pred[0] == 1) diff --git a/src/pi/vision/line_detector.py b/src/pi/vision/line_detector.py new file mode 100644 index 0000000..6dca0be --- /dev/null +++ b/src/pi/vision/line_detector.py @@ -0,0 +1,229 @@ +""" +line_detector +カメラ画像から黒線の位置を検出するモジュール +Pi 側では現行手法(current)のみ使用する +""" + +from dataclasses import dataclass + +import cv2 +import numpy as np + +from common import config +from pi.vision.fitting import clean_and_fit + +# 検出領域の y 範囲(画像全体) +DETECT_Y_START: int = 0 +DETECT_Y_END: int = config.FRAME_HEIGHT + +# フィッティングに必要な最小数 +MIN_FIT_PIXELS: int = 50 +MIN_FIT_ROWS: int = 10 + + +@dataclass +class ImageParams: + """二値化パラメータ + + Attributes: + clahe_clip: CLAHE のコントラスト増幅上限 + clahe_grid: CLAHE の局所領域分割数 + blur_size: ガウシアンブラーのカーネルサイズ(奇数) + binary_thresh: 二値化の閾値 + open_size: オープニングのカーネルサイズ + close_width: クロージングの横幅 + close_height: クロージングの高さ + median_ksize: 中心点列の移動メディアンフィルタサイズ(0 で無効) + neighbor_thresh: 近傍外れ値除去の閾値(px,0 で無効) + residual_thresh: 残差反復除去の閾値(px,0 で無効) + """ + + clahe_clip: float = 2.0 + clahe_grid: int = 8 + blur_size: int = 5 + binary_thresh: int = 80 + open_size: int = 5 + close_width: int = 25 + close_height: int = 3 + median_ksize: int = 7 + neighbor_thresh: float = 10.0 + residual_thresh: float = 8.0 + + +@dataclass +class LineDetectResult: + """線検出の結果を格納するデータクラス + + Attributes: + detected: 線が検出できたか + position_error: 画像下端での位置偏差(-1.0~+1.0) + heading: 線の傾き(dx/dy,画像下端での値) + curvature: 線の曲率(d²x/dy²) + poly_coeffs: 多項式の係数(描画用,未検出時は None) + row_centers: 各行の線中心 x 座標(index=行番号, + NaN=その行に線なし,未検出時は None) + binary_image: 二値化後の画像(デバッグ用) + """ + + detected: bool + position_error: float + heading: float + curvature: float + poly_coeffs: np.ndarray | None + row_centers: np.ndarray | None + binary_image: np.ndarray | None + + +def no_detection( + binary: np.ndarray, +) -> LineDetectResult: + """未検出の結果を返す""" + return LineDetectResult( + detected=False, + position_error=0.0, + heading=0.0, + curvature=0.0, + poly_coeffs=None, + row_centers=None, + binary_image=binary, + ) + + +def _extract_row_centers( + binary: np.ndarray, +) -> np.ndarray | None: + """二値画像の最大連結領域から各行の線中心を求める + + Args: + binary: 二値画像 + + Returns: + 各行の中心 x 座標(NaN=その行に線なし), + 最大領域が見つからない場合は None + """ + h, w = binary.shape[:2] + num_labels, labels, stats, _ = ( + cv2.connectedComponentsWithStats(binary) + ) + + if num_labels <= 1: + return None + + areas = stats[1:, cv2.CC_STAT_AREA] + largest_label = int(np.argmax(areas)) + 1 + mask = (labels == largest_label).astype(np.uint8) + + centers = np.full(h, np.nan) + for y in range(h): + row = mask[y] + cols = np.where(row > 0)[0] + if len(cols) > 0: + centers[y] = (cols[0] + cols[-1]) / 2.0 + + return centers + + +def build_result( + coeffs: np.ndarray, + binary: np.ndarray, + row_centers: np.ndarray | None = None, +) -> LineDetectResult: + """多項式係数から LineDetectResult を構築する + + row_centers が None の場合は binary から自動抽出する + """ + poly = np.poly1d(coeffs) + center_x = config.FRAME_WIDTH / 2.0 + + x_bottom = poly(DETECT_Y_END) + position_error = (center_x - x_bottom) / center_x + + poly_deriv = poly.deriv() + heading = float(poly_deriv(DETECT_Y_END)) + + poly_deriv2 = poly_deriv.deriv() + curvature = float(poly_deriv2(DETECT_Y_END)) + + if row_centers is None: + row_centers = _extract_row_centers(binary) + + return LineDetectResult( + detected=True, + position_error=position_error, + heading=heading, + curvature=curvature, + poly_coeffs=coeffs, + row_centers=row_centers, + binary_image=binary, + ) + + +def detect_line( + frame: np.ndarray, + params: ImageParams | None = None, +) -> LineDetectResult: + """画像から黒線の位置を検出する(現行手法) + + Args: + frame: グレースケールのカメラ画像 + params: 二値化パラメータ(None でデフォルト) + + Returns: + 線検出の結果 + """ + if params is None: + params = ImageParams() + + # CLAHE でコントラスト強調 + clahe = cv2.createCLAHE( + clipLimit=params.clahe_clip, + tileGridSize=( + params.clahe_grid, + params.clahe_grid, + ), + ) + enhanced = clahe.apply(frame) + + # ガウシアンブラー + blur_k = params.blur_size | 1 + blurred = cv2.GaussianBlur( + enhanced, (blur_k, blur_k), 0, + ) + + # 固定閾値で二値化(黒線を白に反転) + _, binary = cv2.threshold( + blurred, params.binary_thresh, 255, + cv2.THRESH_BINARY_INV, + ) + + # オープニング(孤立ノイズ除去) + if params.open_size >= 3: + open_k = params.open_size | 1 + open_kernel = cv2.getStructuringElement( + cv2.MORPH_ELLIPSE, (open_k, open_k), + ) + binary = cv2.morphologyEx( + binary, cv2.MORPH_OPEN, open_kernel, + ) + + # 横方向クロージング(途切れ補間) + if params.close_width >= 3: + close_h = max(params.close_height | 1, 1) + close_kernel = cv2.getStructuringElement( + cv2.MORPH_ELLIPSE, + (params.close_width, close_h), + ) + binary = cv2.morphologyEx( + binary, cv2.MORPH_CLOSE, close_kernel, + ) + + # 全ピクセルフィッティング + region = binary[DETECT_Y_START:DETECT_Y_END, :] + ys_local, xs = np.where(region > 0) + + if len(xs) < MIN_FIT_PIXELS: + return no_detection(binary) + + ys = ys_local + DETECT_Y_START + coeffs = np.polyfit(ys, xs, 2) + return build_result(coeffs, binary) diff --git a/src/pi/vision/morphology.py b/src/pi/vision/morphology.py new file mode 100644 index 0000000..578dcb6 --- /dev/null +++ b/src/pi/vision/morphology.py @@ -0,0 +1,127 @@ +""" +morphology +二値画像の形態学的処理ユーティリティモジュール +""" + +import cv2 +import numpy as np + + +def apply_iso_closing( + binary: np.ndarray, size: int, +) -> np.ndarray: + """等方クロージングで穴を埋める + + Args: + binary: 二値画像 + size: カーネルサイズ + + Returns: + クロージング後の二値画像 + """ + if size < 3: + return binary + k = size | 1 + kernel = cv2.getStructuringElement( + cv2.MORPH_ELLIPSE, (k, k), + ) + return cv2.morphologyEx( + binary, cv2.MORPH_CLOSE, kernel, + ) + + +def apply_staged_closing( + binary: np.ndarray, + small_size: int, + min_area: int, + large_size: int, +) -> np.ndarray: + """段階クロージング: 小穴埋め → 孤立除去 → 大穴埋め + + Args: + binary: 二値画像 + small_size: 第1段クロージングのカーネルサイズ + min_area: 孤立領域除去の最小面積(0 で無効) + large_size: 第2段クロージングのカーネルサイズ(0 で無効) + + Returns: + 処理後の二値画像 + """ + result = apply_iso_closing(binary, small_size) + + if min_area > 0: + contours, _ = cv2.findContours( + result, cv2.RETR_EXTERNAL, + cv2.CHAIN_APPROX_SIMPLE, + ) + mask = np.zeros_like(result) + for cnt in contours: + if cv2.contourArea(cnt) >= min_area: + cv2.drawContours( + mask, [cnt], -1, 255, -1, + ) + result = mask + + result = apply_iso_closing(result, large_size) + + return result + + +def apply_width_filter( + binary: np.ndarray, + width_near: int, + width_far: int, + tolerance: float, +) -> np.ndarray: + """透視補正付き幅フィルタで広がりすぎた行を除外する + + Args: + binary: 二値画像 + width_near: 画像下端での期待線幅(px) + width_far: 画像上端での期待線幅(px) + tolerance: 上限倍率 + + Returns: + 幅フィルタ適用後の二値画像 + """ + result = binary.copy() + h = binary.shape[0] + denom = max(h - 1, 1) + + for y_local in range(h): + xs = np.where(binary[y_local] > 0)[0] + if len(xs) == 0: + continue + t = (h - 1 - y_local) / denom + expected = float(width_far) + ( + float(width_near) - float(width_far) + ) * t + max_w = expected * tolerance + actual_w = int(xs[-1]) - int(xs[0]) + 1 + if actual_w > max_w: + result[y_local] = 0 + + return result + + +def apply_dist_mask( + binary: np.ndarray, thresh: float, +) -> np.ndarray: + """距離変換で中心部のみを残す + + Args: + binary: 二値画像 + thresh: 距離の閾値(ピクセル) + + Returns: + 中心部のみの二値画像 + """ + if thresh <= 0: + return binary + dist = cv2.distanceTransform( + binary, cv2.DIST_L2, 5, + ) + _, mask = cv2.threshold( + dist, thresh, 255, cv2.THRESH_BINARY, + ) + return mask.astype(np.uint8)