"""
zmq_client
PC 側の ZMQ 通信を担当するモジュール
テレメトリ受信(画像+検出結果+操舵量)と
コマンド送信(モード切替・パラメータ更新・手動操作)を行う
"""
import json
import struct
import cv2
import numpy as np
import zmq
from common import config
class PcZmqClient:
"""PC 側の ZMQ 通信クライアント
テレメトリ受信(SUB)とコマンド送信(PUB)の2チャネルを管理する
"""
def __init__(self) -> None:
self._context: zmq.Context | None = None
self._telemetry_socket: zmq.Socket | None = None
self._command_socket: zmq.Socket | None = None
def start(self) -> None:
"""通信ソケットを初期化してバインドする"""
self._context = zmq.Context()
# テレメトリ受信ソケット(SUB,Pi からの画像+状態を受信)
self._telemetry_socket = self._context.socket(
zmq.SUB,
)
self._telemetry_socket.setsockopt(
zmq.CONFLATE, 1,
)
self._telemetry_socket.setsockopt_string(
zmq.SUBSCRIBE, "",
)
self._telemetry_socket.bind(
config.image_bind_address(),
)
# コマンド送信ソケット(PUB,Pi へコマンドを送信)
self._command_socket = self._context.socket(
zmq.PUB,
)
self._command_socket.bind(
config.control_bind_address(),
)
def receive_telemetry(
self,
) -> tuple[dict, np.ndarray, np.ndarray | None] | None:
"""テレメトリを非ブロッキングで受信する
Returns:
(telemetry_dict, camera_frame, binary_image) のタプル,
受信データがない場合は None.
binary_image はデータがない場合 None
"""
if self._telemetry_socket is None:
return None
try:
raw = self._telemetry_socket.recv(
zmq.NOBLOCK,
)
offset = 0
# JSON ヘッダを読み取り
json_len = struct.unpack_from(
"<I", raw, offset,
)[0]
offset += 4
telemetry = json.loads(
raw[offset:offset + json_len]
.decode("utf-8"),
)
offset += json_len
# バージョンチェック
msg_ver = telemetry.get("v", 0)
if msg_ver != config.TELEMETRY_VERSION:
print(
f"PC: テレメトリバージョン不一致 "
f"(受信={msg_ver}, "
f"期待={config.TELEMETRY_VERSION})"
)
return None
# カメラ画像を読み取り
cam_len = struct.unpack_from(
"<I", raw, offset,
)[0]
offset += 4
cam_data = raw[offset:offset + cam_len]
offset += cam_len
frame = cv2.imdecode(
np.frombuffer(cam_data, dtype=np.uint8),
cv2.IMREAD_GRAYSCALE,
)
# 二値画像(残りがあれば)
binary = None
if offset < len(raw):
bin_data = raw[offset:]
binary = cv2.imdecode(
np.frombuffer(
bin_data, dtype=np.uint8,
),
cv2.IMREAD_GRAYSCALE,
)
return (telemetry, frame, binary)
except zmq.Again:
return None
def send_command(self, command: dict) -> None:
"""コマンドを Pi に送信する
Args:
command: コマンド辞書
"""
if self._command_socket is None:
return
payload = json.dumps(command).encode("utf-8")
self._command_socket.send(
payload, zmq.NOBLOCK,
)
def stop(self) -> None:
"""通信ソケットを閉じる"""
if self._telemetry_socket is not None:
self._telemetry_socket.close()
self._telemetry_socket = None
if self._command_socket is not None:
self._command_socket.close()
self._command_socket = None
if self._context is not None:
self._context.term()
self._context = None