Newer
Older
RobotCar / src / pi / main.py
"""
main
Pi 側アプリケーションのエントリーポイント
カメラ画像の取得・画像処理・操舵量計算・モーター制御を
すべて Pi 上で完結させる
PC にはテレメトリ(画像+状態)を送信し,
PC からはコマンド(モード切替・パラメータ更新)を受信する
"""

import dataclasses
import time

from pi.camera.capture import CameraCapture
from pi.comm.zmq_client import PiZmqClient
from pi.motor.driver import MotorDriver
from common.steering.base import SteeringBase, SteeringOutput
from common.steering.pd_control import PdControl, PdParams
from common.steering.pursuit_control import (
    PursuitControl,
    PursuitParams,
)
from common.steering.recovery import (
    RecoveryController,
    RecoveryParams,
)
from common.steering.ts_pd_control import (
    TsPdControl,
    TsPdParams,
)
from common.vision.intersection import IntersectionClassifier
from common.vision.line_detector import ImageParams


def main() -> None:
    """Pi 側のメインループを実行する"""
    camera = CameraCapture()
    zmq_client = PiZmqClient()
    motor = MotorDriver()

    # 操舵制御(3手法)
    pd_control = PdControl()
    pursuit_control = PursuitControl()
    ts_pd_control = TsPdControl()
    steering: SteeringBase = ts_pd_control
    steering_method = "ts_pd"

    recovery = RecoveryController()

    # 十字路分類器(遅延読み込み)
    intersection_clf = IntersectionClassifier()
    intersection_enabled = False
    intersection_throttle = 0.3

    # モード管理
    mode = "stop"  # "auto", "manual", "stop"
    manual_throttle = 0.0
    manual_steer = 0.0

    # FPS 計測用
    frame_count = 0
    fps_start = time.time()
    current_fps = 0.0
    LOG_INTERVAL_SEC = 3.0

    try:
        camera.start()
        zmq_client.start()
        motor.start()
        print("Pi: カメラ・通信・モーターを開始(自律モード)")

        while True:
            # ── コマンド受信 ────────────────────────
            cmd = zmq_client.receive_command()
            if cmd is not None:
                # 制御手法の切り替え
                new_method = cmd.get("steering_method")
                if (
                    new_method is not None
                    and new_method != steering_method
                ):
                    steering_method = new_method
                    if steering_method == "pd":
                        steering = pd_control
                    elif steering_method == "pursuit":
                        steering = pursuit_control
                    else:
                        steering = ts_pd_control
                        steering_method = "ts_pd"
                    print(
                        f"Pi: 制御手法変更 → "
                        f"{steering_method}"
                    )

                _apply_command(
                    cmd, pd_control, pursuit_control,
                    ts_pd_control, recovery,
                    intersection_clf,
                )
                # モード更新
                if "mode" in cmd:
                    new_mode = cmd["mode"]
                    if new_mode != mode:
                        if new_mode == "auto":
                            steering.reset()
                            recovery.reset()
                        mode = new_mode
                        print(f"Pi: モード変更 → {mode}")
                # 手動操作値
                if mode == "manual":
                    manual_throttle = cmd.get(
                        "throttle", 0.0,
                    )
                    manual_steer = cmd.get("steer", 0.0)
                # 十字路設定
                if "intersection_enabled" in cmd:
                    intersection_enabled = cmd[
                        "intersection_enabled"
                    ]
                if "intersection_throttle" in cmd:
                    intersection_throttle = cmd[
                        "intersection_throttle"
                    ]

            # ── カメラ画像取得 ──────────────────────
            frame = camera.capture()
            frame_count += 1

            # ── 操舵量決定 ─────────────────────────
            throttle = 0.0
            steer = 0.0
            detected = False
            position_error = 0.0
            heading = 0.0
            is_intersection = False
            is_recovering = False
            binary_image = None

            if mode == "auto":
                # 線検出 + 制御
                output = steering.compute(frame)
                det = steering.last_detect_result

                detected = (
                    det is not None and det.detected
                )
                position_error = (
                    det.position_error
                    if detected and det is not None
                    else 0.0
                )
                heading = (
                    det.heading
                    if detected and det is not None
                    else 0.0
                )
                binary_image = (
                    det.binary_image
                    if det is not None else None
                )

                # 十字路判定
                if (
                    intersection_enabled
                    and intersection_clf.available
                    and det is not None
                    and det.binary_image is not None
                ):
                    is_intersection = (
                        intersection_clf.predict(
                            det.binary_image,
                        )
                    )
                if is_intersection:
                    output = SteeringOutput(
                        throttle=intersection_throttle,
                        steer=0.0,
                    )

                # コースアウト復帰
                recovery_output = recovery.update(
                    detected, position_error,
                )
                if recovery_output is not None:
                    output = recovery_output
                is_recovering = recovery.is_recovering

                throttle = output.throttle
                steer = output.steer

            elif mode == "manual":
                throttle = manual_throttle
                steer = manual_steer

            # mode == "stop" なら throttle=0, steer=0

            # ── モーター制御 ───────────────────────
            if mode == "stop":
                motor.stop()
            else:
                motor.set_drive(throttle, steer)

            # ── FPS 計測 ───────────────────────────
            elapsed = time.time() - fps_start
            if elapsed >= LOG_INTERVAL_SEC:
                current_fps = frame_count / elapsed
                print(f"Pi: FPS={current_fps:.1f}")
                frame_count = 0
                fps_start = time.time()

            # ── テレメトリ送信 ─────────────────────
            zmq_client.send_telemetry(
                frame=frame,
                throttle=throttle,
                steer=steer,
                detected=detected,
                position_error=position_error,
                heading=heading,
                is_intersection=is_intersection,
                is_recovering=is_recovering,
                fps=current_fps,
                binary_image=binary_image,
            )

    except KeyboardInterrupt:
        print("\nPi: 終了")
    finally:
        motor.cleanup()
        camera.stop()
        zmq_client.stop()


def _apply_command(
    cmd: dict,
    pd_control: PdControl,
    pursuit_control: PursuitControl,
    ts_pd_control: TsPdControl,
    recovery: RecoveryController,
    intersection_clf: IntersectionClassifier,
) -> None:
    """コマンドからパラメータ更新を適用する

    Args:
        cmd: 受信したコマンド辞書
        pd_control: PD 制御クラス
        pursuit_control: Pursuit 制御クラス
        ts_pd_control: Theil-Sen PD 制御クラス
        recovery: 復帰制御クラス
        intersection_clf: 十字路分類器
    """
    # 画像処理パラメータの更新(全制御クラスに反映)
    if "image_params" in cmd:
        ip = cmd["image_params"]
        for ctrl in [
            pd_control, pursuit_control, ts_pd_control,
        ]:
            current = ctrl.image_params
            for key, value in ip.items():
                if hasattr(current, key):
                    setattr(current, key, value)

    # PD パラメータの更新
    if "pd_params" in cmd:
        sp = cmd["pd_params"]
        current = pd_control.params
        for key, value in sp.items():
            if hasattr(current, key):
                setattr(current, key, value)

    # Pursuit パラメータの更新
    if "pursuit_params" in cmd:
        pp = cmd["pursuit_params"]
        current = pursuit_control.params
        for key, value in pp.items():
            if hasattr(current, key):
                setattr(current, key, value)

    # Theil-Sen PD パラメータの更新
    if "steering_params" in cmd:
        sp = cmd["steering_params"]
        current = ts_pd_control.params
        for key, value in sp.items():
            if hasattr(current, key):
                setattr(current, key, value)

    # 復帰パラメータの更新
    if "recovery_params" in cmd:
        rp = cmd["recovery_params"]
        current = recovery.params
        for key, value in rp.items():
            if hasattr(current, key):
                setattr(current, key, value)

    # 十字路分類器の遅延読み込み
    if (
        cmd.get("intersection_enabled", False)
        and not intersection_clf.available
    ):
        print("Pi: 十字路分類器を読み込み中...")
        intersection_clf.load()
        if intersection_clf.available:
            print("Pi: 十字路分類器の読み込み完了")
        else:
            print("Pi: 十字路分類器が見つかりません")


if __name__ == "__main__":
    main()