Newer
Older
RobotCar / src / common / steering / base.py
"""
base
操舵量計算の共通インターフェースを定義するモジュール
全ての操舵量計算クラスはこのインターフェースに従う
"""

from abc import ABC, abstractmethod
from dataclasses import dataclass

import numpy as np

from common.vision.line_detector import (
    ImageParams,
    LineDetectResult,
    detect_line,
    reset_valley_tracker,
)


@dataclass
class SteeringOutput:
    """操舵量計算の出力を格納するデータクラス

    Attributes:
        throttle: 前後方向の出力 (-1.0 ~ +1.0)
        steer: 左右方向の出力 (-1.0 ~ +1.0)
    """
    throttle: float
    steer: float


class SteeringBase(ABC):
    """操舵量計算の基底クラス

    線検出・レートリミッター・状態管理の共通ロジックを提供し,
    サブクラスは _compute_from_result で操舵計算のみ実装する
    """

    def __init__(
        self,
        image_params: ImageParams | None = None,
    ) -> None:
        self.image_params: ImageParams = (
            image_params or ImageParams()
        )
        self._prev_steer: float = 0.0
        self._last_result: LineDetectResult | None = None

    def compute(
        self, frame: np.ndarray,
    ) -> SteeringOutput:
        """カメラ画像から操舵量を計算する

        線検出 → サブクラスの操舵計算 → レートリミッター
        の共通フローを実行する

        Args:
            frame: グレースケールのカメラ画像

        Returns:
            計算された操舵量
        """
        result = detect_line(frame, self.image_params)
        self._last_result = result

        output = self._compute_from_result(result)

        # レートリミッター
        max_rate = self._max_steer_rate()
        delta = output.steer - self._prev_steer
        delta = max(-max_rate, min(max_rate, delta))
        output.steer = self._prev_steer + delta
        self._prev_steer = output.steer

        return output

    @abstractmethod
    def _compute_from_result(
        self, result: LineDetectResult,
    ) -> SteeringOutput:
        """線検出結果から操舵量を計算する

        サブクラスで操舵アルゴリズムを実装する.
        レートリミッターは基底クラスが適用するため,
        ここでは素の操舵量を返せばよい

        Args:
            result: 線検出の結果

        Returns:
            計算された操舵量(レートリミッター適用前)
        """

    @abstractmethod
    def _max_steer_rate(self) -> float:
        """1フレームあたりの最大操舵変化量を返す"""

    def reset(self) -> None:
        """内部状態をリセットする"""
        self._prev_steer = 0.0
        self._last_result = None
        reset_valley_tracker()

    @property
    def last_detect_result(
        self,
    ) -> LineDetectResult | None:
        """直近の線検出結果を取得する"""
        return self._last_result