"""
zmq_client
Pi 側の ZMQ 通信を担当するモジュール
テレメトリ送信(画像+検出結果+操舵量)と
コマンド受信(モード切替・パラメータ更新・手動操作)を行う
"""
import json
import struct
import time
import cv2
import numpy as np
import zmq
from common import config
class PiZmqClient:
"""Pi 側の ZMQ 通信クライアント
テレメトリ送信(PUB)とコマンド受信(SUB)の2チャネルを管理する
"""
def __init__(self) -> None:
self._context = zmq.Context()
self._telemetry_socket: zmq.Socket | None = None
self._command_socket: zmq.Socket | None = None
def start(self) -> None:
"""通信ソケットを初期化して接続する"""
# テレメトリ送信ソケット(PUB,PC へ画像+状態を送信)
self._telemetry_socket = self._context.socket(
zmq.PUB,
)
self._telemetry_socket.setsockopt(zmq.CONFLATE, 1)
self._telemetry_socket.connect(
config.image_connect_address(),
)
# コマンド受信ソケット(SUB,PC からのコマンドを受信)
self._command_socket = self._context.socket(
zmq.SUB,
)
self._command_socket.setsockopt(zmq.CONFLATE, 1)
self._command_socket.setsockopt_string(
zmq.SUBSCRIBE, "",
)
self._command_socket.connect(
config.control_connect_address(),
)
def send_telemetry(
self,
frame: np.ndarray,
throttle: float,
steer: float,
detected: bool,
position_error: float,
heading: float,
is_intersection: bool,
is_recovering: bool,
fps: float,
binary_image: np.ndarray | None = None,
) -> None:
"""テレメトリ(JSON ヘッダ + JPEG 画像)を送信する
メッセージ形式:
4 バイト: JSON 長(uint32 LE)
N バイト: JSON テレメトリ
残り: JPEG 画像(カメラ映像)
(binary_image がある場合はさらに続く)
Args:
frame: カメラから取得した画像の NumPy 配列
throttle: 現在の throttle 出力
steer: 現在の steer 出力
detected: 線が検出できたか
position_error: 位置偏差
heading: 線の傾き
is_intersection: 十字路と判定されたか
is_recovering: 復帰動作中か
fps: Pi 側の処理 FPS
binary_image: 二値画像(None で省略)
"""
if self._telemetry_socket is None:
return
telemetry: dict = {
"ts": time.time(),
"throttle": throttle,
"steer": steer,
"detected": detected,
"pos_error": position_error,
"heading": heading,
"is_intersection": is_intersection,
"is_recovering": is_recovering,
"fps": fps,
}
# JSON ヘッダをエンコード
json_bytes = json.dumps(telemetry).encode("utf-8")
json_len = struct.pack("<I", len(json_bytes))
# カメラ画像を JPEG 圧縮
_, cam_encoded = cv2.imencode(
".jpg",
frame,
[cv2.IMWRITE_JPEG_QUALITY, config.JPEG_QUALITY],
)
cam_bytes = cam_encoded.tobytes()
cam_len = struct.pack("<I", len(cam_bytes))
# 二値画像を JPEG 圧縮(ある場合)
bin_bytes = b""
if binary_image is not None:
_, bin_encoded = cv2.imencode(
".jpg",
binary_image,
[cv2.IMWRITE_JPEG_QUALITY, 80],
)
bin_bytes = bin_encoded.tobytes()
# メッセージ: JSON長 + JSON + CAM長 + CAM + BIN
msg = (
json_len + json_bytes
+ cam_len + cam_bytes
+ bin_bytes
)
self._telemetry_socket.send(msg, zmq.NOBLOCK)
def receive_command(self) -> dict | None:
"""PC からのコマンドを非ブロッキングで受信する
コマンド形式(JSON):
mode: "auto" | "manual" | "stop"
throttle: float(手動モード時のみ)
steer: float(手動モード時のみ)
image_params: dict(パラメータ更新,省略可)
steering_params: dict(パラメータ更新,省略可)
recovery_params: dict(パラメータ更新,省略可)
intersection_enabled: bool(省略可)
intersection_throttle: float(省略可)
Returns:
コマンド辞書,受信データがない場合は None
"""
if self._command_socket is None:
return None
try:
data = self._command_socket.recv(zmq.NOBLOCK)
return json.loads(data.decode("utf-8"))
except zmq.Again:
return None
def stop(self) -> None:
"""通信ソケットを閉じる"""
if self._telemetry_socket is not None:
self._telemetry_socket.close()
self._telemetry_socket = None
if self._command_socket is not None:
self._command_socket.close()
self._command_socket = None
self._context.term()