"""
command_sender
パラメータの dirty 管理・コマンド辞書構築・ZMQ 送信を担当するモジュール
"""
import dataclasses
from common.steering.pd_control import PdParams
from common.steering.pursuit_control import PursuitParams
from common.steering.recovery import RecoveryParams
from common.steering.ts_pd_control import TsPdParams
from common.vision.line_detector import ImageParams
from pc.comm.zmq_client import PcZmqClient
class CommandSender:
"""コマンドを構築して Pi に送信する
パラメータの変更フラグを管理し,
モード・操舵量・パラメータをまとめて送信する
"""
def __init__(
self,
zmq_client: PcZmqClient,
) -> None:
self._zmq_client = zmq_client
self.params_dirty: bool = True
def send(
self,
*,
is_auto: bool,
throttle: float,
steer: float,
intersection_enabled: bool,
intersection_throttle: float,
steering_method: str,
image_params: ImageParams,
pd_params: PdParams,
pursuit_params: PursuitParams,
ts_pd_params: TsPdParams,
recovery_params: RecoveryParams,
) -> None:
"""コマンドを構築して送信する
Args:
is_auto: 自動操縦中か
throttle: 手動操作の throttle
steer: 手動操作の steer
intersection_enabled: 十字路判定の有効化
intersection_throttle: 十字路通過時の throttle
steering_method: 制御手法名
image_params: 二値化パラメータ
pd_params: PD 制御パラメータ
pursuit_params: Pursuit 制御パラメータ
ts_pd_params: Theil-Sen PD パラメータ
recovery_params: 復帰パラメータ
"""
cmd: dict = {}
# モード
if is_auto:
cmd["mode"] = "auto"
elif throttle != 0.0 or steer != 0.0:
cmd["mode"] = "manual"
cmd["throttle"] = throttle
cmd["steer"] = steer
else:
cmd["mode"] = "stop"
# 十字路設定
cmd["intersection_enabled"] = (
intersection_enabled
)
cmd["intersection_throttle"] = (
intersection_throttle
)
# 制御手法
cmd["steering_method"] = steering_method
# パラメータ更新(変更があった場合のみ)
if self.params_dirty:
cmd["image_params"] = dataclasses.asdict(
image_params,
)
cmd["pd_params"] = dataclasses.asdict(
pd_params,
)
cmd["pursuit_params"] = dataclasses.asdict(
pursuit_params,
)
cmd["steering_params"] = dataclasses.asdict(
ts_pd_params,
)
cmd["recovery_params"] = dataclasses.asdict(
recovery_params,
)
self.params_dirty = False
self._zmq_client.send_command(cmd)
def send_stop(self) -> None:
"""停止コマンドを送信する"""
self._zmq_client.send_command({"mode": "stop"})
def mark_dirty(self) -> None:
"""パラメータ変更フラグを立てる"""
self.params_dirty = True