"""
main
Pi 側アプリケーションのエントリーポイント
カメラ画像の取得・画像処理・操舵量計算・モーター制御を
すべて Pi 上で完結させる
PC にはテレメトリ(画像+状態)を送信し,
PC からはコマンド(モード切替・パラメータ更新)を受信する
"""
import dataclasses
import time
from typing import Any
from pi.camera.capture import CameraCapture
from pi.comm.zmq_client import PiZmqClient
from pi.motor.driver import MotorDriver
from common import config
from common.steering.base import SteeringBase, SteeringOutput
from common.steering.pd_control import PdControl
from common.steering.pursuit_control import PursuitControl
from common.steering.recovery import RecoveryController
from common.steering.ts_pd_control import TsPdControl
from common.vision.intersection import IntersectionClassifier
def main() -> None:
"""Pi 側のメインループを実行する"""
camera = CameraCapture()
zmq_client = PiZmqClient()
motor = MotorDriver()
# 操舵制御(3手法)
pd_control = PdControl()
pursuit_control = PursuitControl()
ts_pd_control = TsPdControl()
controllers: dict[str, SteeringBase] = {
"pd": pd_control,
"pursuit": pursuit_control,
"ts_pd": ts_pd_control,
}
steering: SteeringBase = ts_pd_control
steering_method = "ts_pd"
recovery = RecoveryController()
# 十字路分類器(遅延読み込み)
intersection_clf = IntersectionClassifier()
intersection_enabled = False
intersection_throttle = 0.3
# モード管理
mode = "stop" # "auto", "manual", "stop"
manual_throttle = 0.0
manual_steer = 0.0
# FPS 計測用
frame_count = 0
fps_start = time.time()
current_fps = 0.0
compute_ms_avg = 0.0
compute_ms_sum = 0.0
compute_count = 0
compute_start = time.time()
try:
camera.start()
zmq_client.start()
motor.start()
print("Pi: カメラ・通信・モーターを開始(自律モード)")
while True:
# ── コマンド受信 ────────────────────────
cmd = zmq_client.receive_command()
if cmd is not None:
# 制御手法の切り替え
new_method = cmd.get("steering_method")
if (
new_method is not None
and new_method != steering_method
):
if new_method in controllers:
steering_method = new_method
steering = controllers[
steering_method
]
else:
steering_method = "ts_pd"
steering = controllers["ts_pd"]
print(
f"Pi: 制御手法変更 → "
f"{steering_method}"
)
_apply_command(
cmd, controllers, recovery,
intersection_clf,
)
# モード更新
if "mode" in cmd:
new_mode = cmd["mode"]
if new_mode != mode:
if new_mode == "auto":
steering.reset()
recovery.reset()
mode = new_mode
print(f"Pi: モード変更 → {mode}")
# 手動操作値
if mode == "manual":
manual_throttle = cmd.get(
"throttle", 0.0,
)
manual_steer = cmd.get("steer", 0.0)
# 十字路設定
if "intersection_enabled" in cmd:
intersection_enabled = cmd[
"intersection_enabled"
]
if "intersection_throttle" in cmd:
intersection_throttle = cmd[
"intersection_throttle"
]
# ── カメラ画像取得 ──────────────────────
frame = camera.capture()
frame_count += 1
# ── 操舵量決定 ─────────────────────────
throttle = 0.0
steer = 0.0
detected = False
position_error = 0.0
heading = 0.0
is_intersection = False
is_recovering = False
binary_image = None
if mode == "auto":
# 線検出 + 制御
t_start = time.time()
output = steering.compute(frame)
det = steering.last_detect_result
detected = (
det is not None and det.detected
)
position_error = (
det.position_error
if detected and det is not None
else 0.0
)
heading = (
det.heading
if detected and det is not None
else 0.0
)
binary_image = (
det.binary_image
if det is not None else None
)
# 十字路判定
if (
intersection_enabled
and intersection_clf.available
and det is not None
and det.binary_image is not None
):
is_intersection = (
intersection_clf.predict(
det.binary_image,
)
)
if is_intersection:
output = SteeringOutput(
throttle=intersection_throttle,
steer=0.0,
)
# コースアウト復帰
recovery_output = recovery.update(
detected, position_error,
)
if recovery_output is not None:
output = recovery_output
is_recovering = recovery.is_recovering
throttle = output.throttle
steer = output.steer
compute_ms_sum += (
(time.time() - t_start) * 1000.0
)
compute_count += 1
ce = time.time() - compute_start
if ce >= 1.0:
compute_ms_avg = (
compute_ms_sum / compute_count
)
compute_ms_sum = 0.0
compute_count = 0
compute_start = time.time()
elif mode == "manual":
throttle = manual_throttle
steer = manual_steer
# mode == "stop" なら throttle=0, steer=0
# ── モーター制御 ───────────────────────
if mode == "stop":
motor.stop()
else:
motor.set_drive(throttle, steer)
# ── FPS 計測 ───────────────────────────
elapsed = time.time() - fps_start
if elapsed >= config.LOG_INTERVAL_SEC:
current_fps = frame_count / elapsed
print(f"Pi: FPS={current_fps:.1f}")
frame_count = 0
fps_start = time.time()
# ── テレメトリ送信 ─────────────────────
zmq_client.send_telemetry(
frame=frame,
throttle=throttle,
steer=steer,
detected=detected,
position_error=position_error,
heading=heading,
is_intersection=is_intersection,
is_recovering=is_recovering,
intersection_available=(
intersection_clf.available
),
compute_ms=compute_ms_avg,
fps=current_fps,
binary_image=binary_image,
)
except KeyboardInterrupt:
print("\nPi: 終了")
finally:
motor.cleanup()
camera.stop()
zmq_client.stop()
def _safe_update_dataclass(
target: Any,
updates: dict[str, Any],
) -> None:
"""dataclass のフィールドを型チェック付きで更新する
Args:
target: 更新対象の dataclass インスタンス
updates: フィールド名と値の辞書
"""
field_names = {
f.name for f in dataclasses.fields(target)
}
for key, value in updates.items():
if key not in field_names:
continue
current = getattr(target, key)
expected = type(current)
# int フィールドに float が来た場合は変換を許容
if expected is int and isinstance(value, float):
value = int(value)
elif expected is float and isinstance(value, int):
value = float(value)
elif not isinstance(value, expected):
print(
f"Pi: パラメータ型エラー "
f"{key}: 期待={expected.__name__}, "
f"実際={type(value).__name__}"
)
continue
setattr(target, key, value)
def _apply_command(
cmd: dict,
controllers: dict[str, SteeringBase],
recovery: RecoveryController,
intersection_clf: IntersectionClassifier,
) -> None:
"""コマンドからパラメータ更新を適用する
Args:
cmd: 受信したコマンド辞書
controllers: 制御手法名とインスタンスの辞書
recovery: 復帰制御クラス
intersection_clf: 十字路分類器
"""
# 画像処理パラメータの更新(全制御クラスに反映)
if "image_params" in cmd:
ip = cmd["image_params"]
for ctrl in controllers.values():
_safe_update_dataclass(
ctrl.image_params, ip,
)
# 制御手法固有パラメータの更新
_PARAM_KEYS: dict[str, str] = {
"pd": "pd_params",
"pursuit": "pursuit_params",
"ts_pd": "steering_params",
}
for name, cmd_key in _PARAM_KEYS.items():
if cmd_key in cmd and name in controllers:
_safe_update_dataclass(
controllers[name].params,
cmd[cmd_key],
)
# 復帰パラメータの更新
if "recovery_params" in cmd:
_safe_update_dataclass(
recovery.params,
cmd["recovery_params"],
)
# 十字路分類器の遅延読み込み
if (
cmd.get("intersection_enabled", False)
and not intersection_clf.available
):
print("Pi: 十字路分類器を読み込み中...")
intersection_clf.load()
if intersection_clf.available:
print("Pi: 十字路分類器の読み込み完了")
else:
print("Pi: 十字路分類器が見つかりません")
if __name__ == "__main__":
main()