Newer
Older
RobotCar / src / pc / comm / zmq_client.py
"""
zmq_client
PC 側の ZMQ 通信を担当するモジュール
画像の受信と操舵量の送信を行う
"""

import json
import struct

import cv2
import numpy as np
import zmq

from common import config


class PcZmqClient:
    """PC 側の ZMQ 通信クライアント

    画像受信(SUB)と操舵量送信(PUB)の2チャネルを管理する
    """

    def __init__(self) -> None:
        self._context: zmq.Context | None = None
        self._image_socket: zmq.Socket | None = None
        self._control_socket: zmq.Socket | None = None
        self._last_image_ts: float | None = None

    def start(self) -> None:
        """通信ソケットを初期化してバインドする"""
        self._context = zmq.Context()

        # 画像受信ソケット(SUB,Pi からの画像を受信)
        self._image_socket = self._context.socket(zmq.SUB)
        self._image_socket.setsockopt(zmq.CONFLATE, 1)
        self._image_socket.setsockopt_string(zmq.SUBSCRIBE, "")
        self._image_socket.bind(config.image_bind_address())

        # 操舵量送信ソケット(PUB,Pi へ操舵量を送信)
        self._control_socket = self._context.socket(zmq.PUB)
        self._control_socket.bind(config.control_bind_address())

    def receive_image(self) -> np.ndarray | None:
        """画像をタイムスタンプ付きで非ブロッキング受信する

        Returns:
            受信したグレースケール画像の NumPy 配列,受信データがない場合は None
        """
        if self._image_socket is None:
            return None
        try:
            raw = self._image_socket.recv(zmq.NOBLOCK)
            ts_size = struct.calcsize("d")
            if len(raw) > ts_size:
                self._last_image_ts = struct.unpack(
                    "d", raw[:ts_size],
                )[0]
                data = raw[ts_size:]
            else:
                self._last_image_ts = None
                data = raw
            frame = cv2.imdecode(
                np.frombuffer(data, dtype=np.uint8),
                cv2.IMREAD_GRAYSCALE,
            )
            return frame
        except zmq.Again:
            return None

    def send_control(
        self, throttle: float, steer: float,
    ) -> None:
        """操舵量をタイムスタンプ付きで送信する

        Args:
            throttle: 前後方向の出力 (-1.0 ~ +1.0)
            steer: 左右方向の出力 (-1.0 ~ +1.0)
        """
        if self._control_socket is None:
            return
        msg: dict = {
            "throttle": throttle,
            "steer": steer,
        }
        if self._last_image_ts is not None:
            msg["ts"] = self._last_image_ts
        payload = json.dumps(msg).encode("utf-8")
        self._control_socket.send(payload, zmq.NOBLOCK)

    @property
    def last_image_ts(self) -> float | None:
        """最後に受信した画像のタイムスタンプを返す"""
        return self._last_image_ts

    def stop(self) -> None:
        """通信ソケットを閉じる"""
        if self._image_socket is not None:
            self._image_socket.close()
            self._image_socket = None
        if self._control_socket is not None:
            self._control_socket.close()
            self._control_socket = None
        if self._context is not None:
            self._context.term()
            self._context = None