"""
main
Pi 側アプリケーションのエントリーポイント
カメラ画像の取得・画像処理・操舵量計算・モーター制御を
すべて Pi 上で完結させる
PC にはテレメトリ(画像+状態)を送信し,
PC からはコマンド(モード切替・パラメータ更新)を受信する
"""
import dataclasses
import time
from typing import Any
from pi.camera.capture import CameraCapture
from pi.comm.zmq_client import PiZmqClient
from pi.motor.driver import MotorDriver
from common.steering.base import SteeringBase, SteeringOutput
from common.steering.pd_control import PdControl, PdParams
from common.steering.pursuit_control import (
PursuitControl,
PursuitParams,
)
from common.steering.recovery import (
RecoveryController,
RecoveryParams,
)
from common.steering.ts_pd_control import (
TsPdControl,
TsPdParams,
)
from common.vision.intersection import IntersectionClassifier
from common.vision.line_detector import ImageParams
def main() -> None:
"""Pi 側のメインループを実行する"""
camera = CameraCapture()
zmq_client = PiZmqClient()
motor = MotorDriver()
# 操舵制御(3手法)
pd_control = PdControl()
pursuit_control = PursuitControl()
ts_pd_control = TsPdControl()
steering: SteeringBase = ts_pd_control
steering_method = "ts_pd"
recovery = RecoveryController()
# 十字路分類器(遅延読み込み)
intersection_clf = IntersectionClassifier()
intersection_enabled = False
intersection_throttle = 0.3
# モード管理
mode = "stop" # "auto", "manual", "stop"
manual_throttle = 0.0
manual_steer = 0.0
# FPS 計測用
frame_count = 0
fps_start = time.time()
current_fps = 0.0
LOG_INTERVAL_SEC = 3.0
try:
camera.start()
zmq_client.start()
motor.start()
print("Pi: カメラ・通信・モーターを開始(自律モード)")
while True:
# ── コマンド受信 ────────────────────────
cmd = zmq_client.receive_command()
if cmd is not None:
# 制御手法の切り替え
new_method = cmd.get("steering_method")
if (
new_method is not None
and new_method != steering_method
):
steering_method = new_method
if steering_method == "pd":
steering = pd_control
elif steering_method == "pursuit":
steering = pursuit_control
else:
steering = ts_pd_control
steering_method = "ts_pd"
print(
f"Pi: 制御手法変更 → "
f"{steering_method}"
)
_apply_command(
cmd, pd_control, pursuit_control,
ts_pd_control, recovery,
intersection_clf,
)
# モード更新
if "mode" in cmd:
new_mode = cmd["mode"]
if new_mode != mode:
if new_mode == "auto":
steering.reset()
recovery.reset()
mode = new_mode
print(f"Pi: モード変更 → {mode}")
# 手動操作値
if mode == "manual":
manual_throttle = cmd.get(
"throttle", 0.0,
)
manual_steer = cmd.get("steer", 0.0)
# 十字路設定
if "intersection_enabled" in cmd:
intersection_enabled = cmd[
"intersection_enabled"
]
if "intersection_throttle" in cmd:
intersection_throttle = cmd[
"intersection_throttle"
]
# ── カメラ画像取得 ──────────────────────
frame = camera.capture()
frame_count += 1
# ── 操舵量決定 ─────────────────────────
throttle = 0.0
steer = 0.0
detected = False
position_error = 0.0
heading = 0.0
is_intersection = False
is_recovering = False
binary_image = None
if mode == "auto":
# 線検出 + 制御
output = steering.compute(frame)
det = steering.last_detect_result
detected = (
det is not None and det.detected
)
position_error = (
det.position_error
if detected and det is not None
else 0.0
)
heading = (
det.heading
if detected and det is not None
else 0.0
)
binary_image = (
det.binary_image
if det is not None else None
)
# 十字路判定
if (
intersection_enabled
and intersection_clf.available
and det is not None
and det.binary_image is not None
):
is_intersection = (
intersection_clf.predict(
det.binary_image,
)
)
if is_intersection:
output = SteeringOutput(
throttle=intersection_throttle,
steer=0.0,
)
# コースアウト復帰
recovery_output = recovery.update(
detected, position_error,
)
if recovery_output is not None:
output = recovery_output
is_recovering = recovery.is_recovering
throttle = output.throttle
steer = output.steer
elif mode == "manual":
throttle = manual_throttle
steer = manual_steer
# mode == "stop" なら throttle=0, steer=0
# ── モーター制御 ───────────────────────
if mode == "stop":
motor.stop()
else:
motor.set_drive(throttle, steer)
# ── FPS 計測 ───────────────────────────
elapsed = time.time() - fps_start
if elapsed >= LOG_INTERVAL_SEC:
current_fps = frame_count / elapsed
print(f"Pi: FPS={current_fps:.1f}")
frame_count = 0
fps_start = time.time()
# ── テレメトリ送信 ─────────────────────
zmq_client.send_telemetry(
frame=frame,
throttle=throttle,
steer=steer,
detected=detected,
position_error=position_error,
heading=heading,
is_intersection=is_intersection,
is_recovering=is_recovering,
fps=current_fps,
binary_image=binary_image,
)
except KeyboardInterrupt:
print("\nPi: 終了")
finally:
motor.cleanup()
camera.stop()
zmq_client.stop()
def _safe_update_dataclass(
target: Any,
updates: dict[str, Any],
) -> None:
"""dataclass のフィールドを型チェック付きで更新する
Args:
target: 更新対象の dataclass インスタンス
updates: フィールド名と値の辞書
"""
fields = {
f.name: f.type
for f in dataclasses.fields(target)
}
for key, value in updates.items():
if key not in fields:
continue
expected = fields[key]
# int フィールドに float が来た場合は変換を許容
if expected is int and isinstance(value, float):
value = int(value)
elif expected is float and isinstance(value, int):
value = float(value)
elif not isinstance(value, expected):
print(
f"Pi: パラメータ型エラー "
f"{key}: 期待={expected.__name__}, "
f"実際={type(value).__name__}"
)
continue
setattr(target, key, value)
def _apply_command(
cmd: dict,
pd_control: PdControl,
pursuit_control: PursuitControl,
ts_pd_control: TsPdControl,
recovery: RecoveryController,
intersection_clf: IntersectionClassifier,
) -> None:
"""コマンドからパラメータ更新を適用する
Args:
cmd: 受信したコマンド辞書
pd_control: PD 制御クラス
pursuit_control: Pursuit 制御クラス
ts_pd_control: Theil-Sen PD 制御クラス
recovery: 復帰制御クラス
intersection_clf: 十字路分類器
"""
# 画像処理パラメータの更新(全制御クラスに反映)
if "image_params" in cmd:
ip = cmd["image_params"]
for ctrl in [
pd_control, pursuit_control, ts_pd_control,
]:
_safe_update_dataclass(
ctrl.image_params, ip,
)
# PD パラメータの更新
if "pd_params" in cmd:
_safe_update_dataclass(
pd_control.params, cmd["pd_params"],
)
# Pursuit パラメータの更新
if "pursuit_params" in cmd:
_safe_update_dataclass(
pursuit_control.params,
cmd["pursuit_params"],
)
# Theil-Sen PD パラメータの更新
if "steering_params" in cmd:
_safe_update_dataclass(
ts_pd_control.params,
cmd["steering_params"],
)
# 復帰パラメータの更新
if "recovery_params" in cmd:
_safe_update_dataclass(
recovery.params,
cmd["recovery_params"],
)
# 十字路分類器の遅延読み込み
if (
cmd.get("intersection_enabled", False)
and not intersection_clf.available
):
print("Pi: 十字路分類器を読み込み中...")
intersection_clf.load()
if intersection_clf.available:
print("Pi: 十字路分類器の読み込み完了")
else:
print("Pi: 十字路分類器が見つかりません")
if __name__ == "__main__":
main()