"""
zmq_client
PC 側の ZMQ 通信を担当するモジュール
画像の受信と操舵量の送信を行う
"""
import json
import struct
import cv2
import numpy as np
import zmq
from common import config
class PcZmqClient:
"""PC 側の ZMQ 通信クライアント
画像受信(SUB)と操舵量送信(PUB)の2チャネルを管理する
"""
def __init__(self) -> None:
self._context: zmq.Context | None = None
self._image_socket: zmq.Socket | None = None
self._control_socket: zmq.Socket | None = None
self._last_image_ts: float | None = None
def start(self) -> None:
"""通信ソケットを初期化してバインドする"""
self._context = zmq.Context()
# 画像受信ソケット(SUB,Pi からの画像を受信)
self._image_socket = self._context.socket(zmq.SUB)
self._image_socket.setsockopt(zmq.CONFLATE, 1)
self._image_socket.setsockopt_string(zmq.SUBSCRIBE, "")
self._image_socket.bind(config.image_bind_address())
# 操舵量送信ソケット(PUB,Pi へ操舵量を送信)
self._control_socket = self._context.socket(zmq.PUB)
self._control_socket.bind(config.control_bind_address())
def receive_image(self) -> np.ndarray | None:
"""画像をタイムスタンプ付きで非ブロッキング受信する
Returns:
受信したグレースケール画像の NumPy 配列,受信データがない場合は None
"""
if self._image_socket is None:
return None
try:
raw = self._image_socket.recv(zmq.NOBLOCK)
ts_size = struct.calcsize("d")
if len(raw) > ts_size:
self._last_image_ts = struct.unpack(
"d", raw[:ts_size],
)[0]
data = raw[ts_size:]
else:
self._last_image_ts = None
data = raw
frame = cv2.imdecode(
np.frombuffer(data, dtype=np.uint8),
cv2.IMREAD_GRAYSCALE,
)
return frame
except zmq.Again:
return None
def send_control(
self, throttle: float, steer: float,
) -> None:
"""操舵量をタイムスタンプ付きで送信する
Args:
throttle: 前後方向の出力 (-1.0 ~ +1.0)
steer: 左右方向の出力 (-1.0 ~ +1.0)
"""
if self._control_socket is None:
return
msg: dict = {
"throttle": throttle,
"steer": steer,
}
if self._last_image_ts is not None:
msg["ts"] = self._last_image_ts
payload = json.dumps(msg).encode("utf-8")
self._control_socket.send(payload, zmq.NOBLOCK)
@property
def last_image_ts(self) -> float | None:
"""最後に受信した画像のタイムスタンプを返す"""
return self._last_image_ts
def stop(self) -> None:
"""通信ソケットを閉じる"""
if self._image_socket is not None:
self._image_socket.close()
self._image_socket = None
if self._control_socket is not None:
self._control_socket.close()
self._control_socket = None
if self._context is not None:
self._context.term()
self._context = None