Newer
Older
RobotCar / src / pi / steering / pd_control.py
"""
pd_control
PD 制御による操舵量計算モジュール
多項式フィッティングの位置・傾き・曲率から操舵量と速度を算出する
"""

import time
from dataclasses import dataclass

import numpy as np

from pi.steering.base import SteeringBase, SteeringOutput
from pi.vision.line_detector import (
    ImageParams,
    LineDetectResult,
    detect_line,
    reset_valley_tracker,
)


@dataclass
class PdParams:
    """PD 制御のパラメータ

    Attributes:
        kp: 位置偏差ゲイン
        kh: 傾き(ヘディング)ゲイン
        kd: 微分ゲイン
        max_steer_rate: 1フレームあたりの最大操舵変化量
        max_throttle: 直線での最大速度
        speed_k: 曲率ベースの減速係数
    """
    kp: float = 0.5
    kh: float = 0.3
    kd: float = 0.1
    max_steer_rate: float = 0.1
    max_throttle: float = 0.4
    speed_k: float = 0.3


class PdControl(SteeringBase):
    """PD 制御による操舵量計算クラス"""

    def __init__(
        self,
        params: PdParams | None = None,
        image_params: ImageParams | None = None,
    ) -> None:
        self.params: PdParams = params or PdParams()
        self.image_params: ImageParams = (
            image_params or ImageParams()
        )
        self._prev_error: float = 0.0
        self._prev_time: float = 0.0
        self._prev_steer: float = 0.0
        self._last_result: LineDetectResult | None = None

    def compute(
        self, frame: np.ndarray,
    ) -> SteeringOutput:
        """カメラ画像から PD 制御で操舵量を計算する

        Args:
            frame: グレースケールのカメラ画像

        Returns:
            計算された操舵量
        """
        p = self.params

        # 線検出
        result = detect_line(frame, self.image_params)
        self._last_result = result

        # 線が検出できなかった場合は停止
        if not result.detected:
            return SteeringOutput(
                throttle=0.0, steer=0.0,
            )

        # 位置偏差 + 傾きによる操舵量
        error = (
            p.kp * result.position_error
            + p.kh * result.heading
        )

        # 時間差分の計算
        now = time.time()
        dt = (
            now - self._prev_time
            if self._prev_time > 0
            else 0.033
        )
        dt = max(dt, 0.001)

        # 微分項
        derivative = (error - self._prev_error) / dt
        steer = error + p.kd * derivative

        # 操舵量のクランプ
        steer = max(-1.0, min(1.0, steer))

        # レートリミッター(急な操舵変化を制限)
        delta = steer - self._prev_steer
        max_delta = p.max_steer_rate
        delta = max(-max_delta, min(max_delta, delta))
        steer = self._prev_steer + delta

        # 速度制御(曲率連動)
        throttle = (
            p.max_throttle
            - p.speed_k * abs(result.curvature)
        )
        throttle = max(0.0, throttle)

        # 状態の更新
        self._prev_error = error
        self._prev_time = now
        self._prev_steer = steer

        return SteeringOutput(
            throttle=throttle, steer=steer,
        )

    def reset(self) -> None:
        """内部状態をリセットする"""
        self._prev_error = 0.0
        self._prev_time = 0.0
        self._prev_steer = 0.0
        self._last_result = None
        reset_valley_tracker()

    @property
    def last_detect_result(
        self,
    ) -> LineDetectResult | None:
        """直近の線検出結果を取得する"""
        return self._last_result