Newer
Older
RobotCar / src / pi / comm / zmq_client.py
"""
zmq_client
Pi 側の ZMQ 通信を担当するモジュール
テレメトリ送信(画像+検出結果+操舵量)と
コマンド受信(モード切替・パラメータ更新・手動操作)を行う
"""

import json
import struct
import time

import cv2
import numpy as np
import zmq

from common import config


class PiZmqClient:
    """Pi 側の ZMQ 通信クライアント

    テレメトリ送信(PUB)とコマンド受信(SUB)の2チャネルを管理する
    """

    def __init__(self) -> None:
        self._context = zmq.Context()
        self._telemetry_socket: zmq.Socket | None = None
        self._command_socket: zmq.Socket | None = None

    def start(self) -> None:
        """通信ソケットを初期化して接続する"""

        # テレメトリ送信ソケット(PUB,PC へ画像+状態を送信)
        self._telemetry_socket = self._context.socket(
            zmq.PUB,
        )
        self._telemetry_socket.setsockopt(zmq.CONFLATE, 1)
        self._telemetry_socket.connect(
            config.image_connect_address(),
        )

        # コマンド受信ソケット(SUB,PC からのコマンドを受信)
        self._command_socket = self._context.socket(
            zmq.SUB,
        )
        self._command_socket.setsockopt(zmq.CONFLATE, 1)
        self._command_socket.setsockopt_string(
            zmq.SUBSCRIBE, "",
        )
        self._command_socket.connect(
            config.control_connect_address(),
        )

    def send_telemetry(
        self,
        frame: np.ndarray,
        throttle: float,
        steer: float,
        detected: bool,
        position_error: float,
        heading: float,
        is_intersection: bool,
        is_recovering: bool,
        fps: float,
        binary_image: np.ndarray | None = None,
    ) -> None:
        """テレメトリ(JSON ヘッダ + JPEG 画像)を送信する

        メッセージ形式:
            4 バイト: JSON 長(uint32 LE)
            N バイト: JSON テレメトリ
            残り: JPEG 画像(カメラ映像)
            (binary_image がある場合はさらに続く)

        Args:
            frame: カメラから取得した画像の NumPy 配列
            throttle: 現在の throttle 出力
            steer: 現在の steer 出力
            detected: 線が検出できたか
            position_error: 位置偏差
            heading: 線の傾き
            is_intersection: 十字路と判定されたか
            is_recovering: 復帰動作中か
            fps: Pi 側の処理 FPS
            binary_image: 二値画像(None で省略)
        """
        if self._telemetry_socket is None:
            return

        telemetry: dict = {
            "ts": time.time(),
            "throttle": throttle,
            "steer": steer,
            "detected": detected,
            "pos_error": position_error,
            "heading": heading,
            "is_intersection": is_intersection,
            "is_recovering": is_recovering,
            "fps": fps,
        }

        # JSON ヘッダをエンコード
        json_bytes = json.dumps(telemetry).encode("utf-8")
        json_len = struct.pack("<I", len(json_bytes))

        # カメラ画像を JPEG 圧縮
        _, cam_encoded = cv2.imencode(
            ".jpg",
            frame,
            [cv2.IMWRITE_JPEG_QUALITY, config.JPEG_QUALITY],
        )
        cam_bytes = cam_encoded.tobytes()
        cam_len = struct.pack("<I", len(cam_bytes))

        # 二値画像を JPEG 圧縮(ある場合)
        bin_bytes = b""
        if binary_image is not None:
            _, bin_encoded = cv2.imencode(
                ".jpg",
                binary_image,
                [cv2.IMWRITE_JPEG_QUALITY, 80],
            )
            bin_bytes = bin_encoded.tobytes()

        # メッセージ: JSON長 + JSON + CAM長 + CAM + BIN
        msg = (
            json_len + json_bytes
            + cam_len + cam_bytes
            + bin_bytes
        )
        self._telemetry_socket.send(msg, zmq.NOBLOCK)

    def receive_command(self) -> dict | None:
        """PC からのコマンドを非ブロッキングで受信する

        コマンド形式(JSON):
            mode: "auto" | "manual" | "stop"
            throttle: float(手動モード時のみ)
            steer: float(手動モード時のみ)
            steering_method: "pd" | "pursuit" | "ts_pd"
            image_params: dict(二値化パラメータ更新,省略可)
            pd_params: dict(PD 制御パラメータ,省略可)
            pursuit_params: dict(Pursuit パラメータ,省略可)
            steering_params: dict(TS-PD パラメータ,省略可)
            recovery_params: dict(復帰パラメータ,省略可)
            intersection_enabled: bool(省略可)
            intersection_throttle: float(省略可)

        Returns:
            コマンド辞書,受信データがない場合は None
        """
        if self._command_socket is None:
            return None
        try:
            data = self._command_socket.recv(zmq.NOBLOCK)
            return json.loads(data.decode("utf-8"))
        except zmq.Again:
            return None

    def stop(self) -> None:
        """通信ソケットを閉じる"""
        if self._telemetry_socket is not None:
            self._telemetry_socket.close()
            self._telemetry_socket = None
        if self._command_socket is not None:
            self._command_socket.close()
            self._command_socket = None
        self._context.term()