import csv
import datetime
import glob
import os
import sys
import threading
import time
import winsound
import numpy as np
import serial
import serial.tools.list_ports
from scipy.interpolate import interp1d
import config
from dobot import DobotMeasurer
class CM700D:
    """Serial-port client for a CM-700d spectrophotometer.

    The class talks to the instrument with comma-separated text commands,
    reads stored samples, triggers new measurements (manually, at a fixed
    interval, or with a Dobot arm positioning the head) and saves the
    results as CSV. Tristimulus and L*a*b* values are computed from the
    measured spectrum using tables loaded from ``cmf.csv`` and ``d65.csv``.
    """

    def __init__(self):
        # Defined up front so close_port() is safe to call even when
        # initialize()/open_port() never succeeded.
        self.ser = None
        self.dobot = None
        self.num_samples = 0

    def initialize(self):
        """Open the instrument port and load the colorimetric tables.

        Returns:
            bool: True when a device answered on one of the serial ports and
            both ``cmf.csv`` and ``d65.csv`` were loaded; False when no
            device was found.

        Raises:
            Exception: whatever loading the CSV tables raises. The serial
            port is closed before the exception propagates.
        """
        if not self.open_port():
            return False
        try:
            # Common sampling grid: 31 evenly spaced points from 400 to 700
            # (presumably nanometres -- confirm against the CSV tables).
            self.wl = np.linspace(400, 700, 31)
            self.cmf = self.read_spectrum_csv("cmf.csv")
            self.d65 = self.read_spectrum_csv("d65.csv")
            # Scale factor chosen so that the second component of the
            # illuminant's own tristimulus value (self.XYZw[1]) equals 100.
            self.K = 100.0 / (self.d65[:, 0] @ self.cmf[:, 1])
            self.XYZw = ((self.d65.T @ self.cmf) * self.K)[0]
        except Exception:
            # Do not leave the port locked when the data files are unusable.
            self.close_port()
            raise
        print("White point of D65 = ", self.XYZw)
        self.dobot = None
        return True

    def read_spectrum_csv(self, filename):
        """Load a spectral table from CSV and resample it onto ``self.wl``.

        The first CSV column holds the sample positions; every further
        column is linearly interpolated at the positions in ``self.wl``.
        Blank rows and a leading UTF-8 byte-order mark are ignored.

        Args:
            filename: path of the CSV file (numeric cells only).

        Returns:
            numpy.ndarray: array of shape ``(len(self.wl), columns - 1)``.

        Raises:
            ValueError: if a cell is not numeric, or (from ``interp1d``) if
            ``self.wl`` extends beyond the positions present in the file.
        """
        # "utf-8-sig" transparently drops a BOM, which would otherwise make
        # float() fail on the very first cell.
        with open(filename, newline="", encoding="utf-8-sig") as f:
            rows = [
                [float(val) for val in row]
                for row in csv.reader(f)
                if any(cell.strip() for cell in row)  # skip blank rows
            ]
        data_org = np.array(rows)
        wl_org = data_org[:, 0]
        data = np.zeros([len(self.wl), data_org.shape[-1] - 1])
        for i in range(data.shape[-1]):
            resampler = interp1d(wl_org, data_org[:, 1 + i], kind="linear")
            data[:, i] = resampler(self.wl)
        return data

    def open_port(self):
        """Probe every serial port and keep the one the instrument answers on.

        Each port is opened with the settings from ``config`` and sent an
        ``IDR`` command; the first port whose reply starts with ``OK00`` is
        kept open in ``self.ser``.

        Returns:
            bool: True if a device was found, False otherwise (``self.ser``
            is then reset to None).
        """
        ports = list(serial.tools.list_ports.comports())
        if not ports:
            print(f"シリアルデバイス {config.DEVICE_NAME} が見つかりません")
            return False
        for port in ports:
            candidate = None
            try:
                print(f"Checking {port.device} ... ", end="")
                candidate = serial.Serial(
                    port.device,
                    config.BAUD_RATE,
                    timeout=config.TIMEOUT,
                    write_timeout=config.TIMEOUT,
                )
                self.ser = candidate
                if candidate.is_open:
                    # Must go through self: relying on a module-level
                    # instance breaks as soon as the class is imported or
                    # instantiated under another name.
                    recv = self.send_command("IDR")
                    print(recv)
                    if recv[0] == "OK00":
                        print(f"CM-700dと接続しました ({port.device})")
                        return True
                candidate.close()
            except Exception as e:
                # Best effort: a port that cannot be opened or probed is
                # reported and skipped, never fatal.
                print(str(e))
                if candidate is not None:
                    candidate.close()
        self.ser = None
        return False

    def send_command(self, command, validate=True):
        """Send one command line and return the comma-split reply.

        Args:
            command: command text without line terminator; ``\\r\\n`` is
                appended before sending.
            validate: when True, print a failure message if the first reply
                field differs from ``config.CODE_OK``.

        Returns:
            list[str]: the reply fields with surrounding whitespace removed,
            or ``["error", message]`` if anything raised while talking to
            the port. This method itself never raises.
        """
        try:
            self.ser.write((command + "\r\n").encode())
            # Reads up to 1024 bytes; returns early only via the port timeout.
            data = self.ser.read(1024)
            recv_items = [x.strip() for x in data.decode().split(",")]
            if recv_items[0] != config.CODE_OK and validate:
                print(f"コマンド{command}に失敗しました {recv_items[0]}")
            return recv_items
        except Exception as e:
            return ["error", str(e)]

    def close_port(self):
        """Close the serial port if one has been opened; otherwise do nothing."""
        if self.ser is not None:
            self.ser.close()

    def get_num_samples(self):
        """Query the number of samples stored in the instrument.

        Sends ``STR`` and stores field 5 of the reply in
        ``self.num_samples``. If the reply is too short or not numeric the
        count is set to 0 and a message is printed instead of crashing.
        """
        recv = self.send_command("STR")
        try:
            self.num_samples = int(recv[5])
        except (IndexError, ValueError):
            self.num_samples = 0
            print("保存サンプル数を取得できませんでした")
            return
        print(f"本体保存サンプル数は{self.num_samples}個です")

    def read_all_spectrum(self):
        """Download every stored sample (numbers 1..num_samples) and save it."""
        self.get_num_samples()
        self.read_spectrum(1, self.num_samples)

    def read_partial_spectrum(self):
        """Ask for a start/end sample number and download that range.

        The end number is clamped to the stored sample count and the start
        number to 1 (the first sample number used by read_all_spectrum).
        """
        self.get_num_samples()
        start_no = int(input("開始サンプル番号>"))
        end_no = int(input("終了サンプル番号>"))
        start_no = max(start_no, 1)
        end_no = min(end_no, self.num_samples)
        self.read_spectrum(start_no, end_no)

    def read_spectrum(self, start_no, end_no):
        """Download stored samples ``start_no``..``end_no`` and save them as CSV.

        The CSV header is built from fields 5-7 of the ``IDR`` reply, used
        as start, end (inclusive) and step of the column labels. Each sample
        is fetched with ``SDR`` and its values are divided by 10000.
        Samples whose reply carries no numeric data are reported and skipped.

        Args:
            start_no: first sample number to read.
            end_no: last sample number to read (inclusive).
        """
        # Basic instrument data: gives the range used for the column labels.
        recv = self.send_command("IDR")
        try:
            wavelen = range(int(recv[5]), int(recv[6]) + 1, int(recv[7]))
        except (IndexError, ValueError):
            print("基本データを取得できませんでした")
            return
        spectrum = []
        num_read = end_no - start_no + 1
        for i in range(num_read):
            idx = i + start_no
            print("\r", "%3d/%3d" % (i + 1, num_read), f" Reading {idx}", end="")
            recv = self.send_command(
                f"SDR,{idx},{config.MEAS_TYPE_DICT[config.MEAS_TYPE]}"
            )
            try:
                data = np.asarray(recv[1:], dtype=np.float32) / 10000.0
            except ValueError:
                data = np.empty(0, dtype=np.float32)
            if data.size == 0:
                # An error reply would otherwise crash here or produce a
                # ragged table that np.savetxt cannot write.
                print(f" サンプル{idx}を読み込めませんでした")
                continue
            spectrum.append([idx] + data.tolist())
        print(" done")
        if len(spectrum) > 0:
            header = "no," + (",".join(map(str, wavelen)))
            self.save_csv(spectrum, header)

    @staticmethod
    def save_csv(data, header, filename=""):
        """Write ``data`` to a CSV file with four decimals per value.

        Args:
            data: 2-D sequence of numbers, one row per sample.
            header: header line written verbatim as the first line.
            filename: target path. When empty, a default name of the form
                ``tcc6-<1..5><A|B>_<date>_<time>.csv`` is proposed (the
                first stem not contained in any ``./*.csv`` name) and the
                user is asked to accept it, type another name, or enter
                ``q`` to skip saving.
        """
        if filename == "":
            cur_time = datetime.datetime.now()
            datestr = cur_time.strftime("%Y%m%d")
            timestr = cur_time.strftime('%H%M%S')
            file_list = glob.glob("./*.csv")
            print(file_list)
            stems = (
                f"tcc6-{i}{ab}_{datestr}"
                for i in range(1, 6)
                for ab in ["A", "B"]
            )
            # First stem that no existing CSV file name contains.
            free_stem = next(
                (fn for fn in stems if not any(fn in f for f in file_list)),
                None,
            )
            if free_stem is not None:
                filename = f"{free_stem}_{timestr}.csv"
            infn = input(f"csv保存ファイル名( '{filename}' はenter, 'q'で保存しない) >")
            if infn == "q":
                return
            if len(infn) > 0:
                filename = infn
            if len(filename) < 1:
                return
        np.savetxt(
            filename,
            data,
            delimiter=",",
            header=header,
            comments="",
            fmt="%.4f",
        )
        print(f"CSVファイル {filename} に保存しました")

    def mesuring_condition(self):
        """Query (``CPR``) and print the instrument's measurement conditions.

        Prints measurement diameter, measurement mode, wait time (reply
        field 3 divided by 10, in seconds) and the automatic/manual
        averaging counts. Nothing is printed if the command fails.
        """
        recv = self.send_command("CPR")
        if recv[0] == config.CODE_OK:
            print(
                f"測定条件 測定径 {config.MEAS_DIAMETER[recv[1]]}"
                + f" 測定モード {config.MEAS_MODE[recv[2]]}"
                + f" 待ち時間 {float(recv[3]) / 10}秒"
                + f" 自動平均回数 {recv[4]}回"
                + f" 手動平均回数 {recv[5]}回"
            )

    def _measurement_header(self):
        """Return the CSV header used for rows produced by measurement()."""
        return "no,X,Y,Z,L*,a*,b*," + (",".join(map(str, self.wl)))

    def manual_measurement(self):
        """Measure each time the user presses ENTER; ``q`` ends and saves."""
        self.mesuring_condition()
        data = []
        while True:
            winsound.Beep(1000, 100)
            ret = input(f"No. {len(data) + 1}: ENTERで計測('q'で終了)")
            if ret == "q":
                break
            self.measurement(data)
        if len(data) > 0:
            self.save_csv(data, self._measurement_header())

    def interval_measurement(self):
        """Take a user-given number of measurements, sleeping between them.

        The sleep of ``interval`` seconds follows every measurement,
        including the last one; a long beep marks the end.
        """
        self.mesuring_condition()
        num_samples = int(input("測定回数>"))
        interval = float(input("間隔(秒)>"))
        data = []
        for i in range(num_samples):
            print("\r", "%3d/%3d" % (i + 1, num_samples), end="")
            winsound.Beep(1500, 100)
            self.measurement(data)
            time.sleep(interval)
        winsound.Beep(1500, 1000)
        if len(data) > 0:
            self.save_csv(data, self._measurement_header())

    def dobot_measurement(self, filename=""):
        """Measure every position in ``config.CC_XY`` using the Dobot arm.

        Args:
            filename: CSV file to save to. When empty (interactive use) the
                measurement conditions are printed first, a beep marks the
                end and save_csv() asks for the file name.
        """
        if filename == "":
            self.mesuring_condition()
        self.dobot = DobotMeasurer()
        data = []
        try:
            self.move_next(0)  # Move to the first position
            num_samples = len(config.CC_XY)
            for i in range(num_samples):
                print("%2d/%2d " % (i + 1, num_samples), end="")
                # While the result of patch i is awaited, the arm already
                # travels to patch i + 1; -1 means "no further move".
                self.measurement(data, i + 1 if i < num_samples - 1 else -1)
            if filename == "":
                winsound.Beep(800, 1000)
            self.dobot.up(wait=True)
        finally:
            # Drop the reference even when a measurement raised, so a
            # failed run does not keep the Dobot object alive (presumably
            # this is what releases the connection -- confirm in dobot.py).
            self.dobot = None
        if len(data) > 0:
            self.save_csv(data, self._measurement_header(), filename)

    def repeat_dobot(self):
        """Run dobot_measurement() repeatedly at a fixed start-to-start interval.

        Asks for the number of runs and the interval in minutes. Each run
        is saved to ``dobot<NNN>_<timestamp>.csv``. The wait before the
        next run is shortened by the time the run itself took.
        """
        self.mesuring_condition()
        num_samples = int(input("測定回数>"))
        interval_min = float(input("間隔(分)>"))
        for i in range(num_samples):
            start_time = time.time()
            datestr = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
            print(f"\nDobot計測開始 {i + 1}/{num_samples} {datestr}")
            self.dobot_measurement(f"dobot{i + 1:03}_{datestr}.csv")
            datestr = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
            print(f"Dobot計測終了 {i + 1}/{num_samples} {datestr}")
            wait_sec = start_time + interval_min * 60 - time.time()
            if i < num_samples - 1 and wait_sec > 0:
                print(f"次の計測まで{wait_sec // 60}分待機します")
                time.sleep(wait_sec)

    def move_next(self, idx):
        """Lift the arm, move to ``config.CC_XY[idx]`` and lower it again."""
        self.dobot.up(wait=True)
        self.dobot.move(config.CC_XY[idx][0], config.CC_XY[idx][1], wait=True)
        self.dobot.down(wait=True)

    def measurement(self, data, idx=-1):
        """Trigger one measurement and append the result row to ``data``.

        Timeline, in seconds since the ``MES,1`` trigger:
        after ``config.MEAS_TO_MOVE`` the arm move to position ``idx`` is
        started in a background thread (only if ``idx >= 0``); after
        ``config.MEAS_TO_RECEIVE`` the result is polled with ``MDR,2``
        until the reply starts with ``config.CODE_OK``; polling is
        abandoned once ``config.MEAS_TIMEOUT`` has elapsed.

        Args:
            data: list that receives one row
                ``[count, X, Y, Z, L*, a*, b*, spectrum...]``; ``count`` is
                ``len(data) + 1``. Nothing is appended on timeout.
            idx: index into ``config.CC_XY`` of the position to move to
                while waiting; negative means no move.
        """
        print("測定中...", end="")
        self.send_command("MES,1")
        start_time = time.time()
        while time.time() - start_time < config.MEAS_TO_MOVE:
            time.sleep(0.1)
        mover = None
        if idx >= 0:
            mover = threading.Thread(target=self.move_next, args=(idx,))
            mover.start()
        try:
            while time.time() - start_time < config.MEAS_TO_RECEIVE:
                time.sleep(0.1)
            while True:
                elapsed_time = time.time() - start_time
                recv = self.send_command("MDR,2", validate=False)
                if recv[0] == config.CODE_OK:
                    break
                if elapsed_time > config.MEAS_TIMEOUT:
                    print("timeout")
                    return
        finally:
            # Always wait for the arm, also on timeout: otherwise the next
            # measurement could start while the arm is still travelling.
            if mover is not None:
                mover.join()
        print("done")
        spectrum = np.asarray(recv[1:], dtype=np.float32) / 10000.0
        # Weight the spectrum by the illuminant table, project it onto the
        # three colour-matching columns and apply the same scale as XYZw.
        XYZ = (((self.d65.T * spectrum) @ self.cmf) * self.K)[0]
        lab = self.xyz_to_lab(XYZ)
        print("Lab: ", lab)
        count = len(data) + 1
        data.append([count] + XYZ.tolist() + lab + spectrum.tolist())

    def xyz_to_lab(self, xyz):
        """Convert a tristimulus triple to ``[L*, a*, b*]``.

        Args:
            xyz: iterable of three numbers (X, Y, Z); each is normalised by
                the corresponding component of ``self.XYZw``.

        Returns:
            list[float]: ``[116*fy - 16, 500*(fx - fy), 200*(fy - fz)]``.
        """
        x, y, z = xyz
        xw, yw, zw = self.XYZw
        fx = self.f(x / xw)
        fy = self.f(y / yw)
        fz = self.f(z / zw)
        return [
            float((116.0 * fy - 16.0)),
            float(500.0 * (fx - fy)),
            float(200.0 * (fy - fz)),
        ]

    def f(self, t):
        """Piecewise helper of xyz_to_lab().

        Returns the cube root of ``t`` above ``(6/29)**3`` and the linear
        segment ``(841/108) * t + 4/29`` at or below it.
        """
        if t > (6.0 / 29.0) ** 3.0:
            return t ** (1.0 / 3.0)
        return (841.0 / 108.0) * t + (4.0 / 29.0)
# Script entry point: connect to the instrument and run the text menu.
if __name__ == "__main__":
    # Deliberately bound at module level under this exact name, so the
    # instance stays reachable as a global while the menu is running.
    cm700d = CM700D()
    if not cm700d.initialize():
        print("デバイスに接続できないため終了します")
        sys.exit()
    # Menu key -> action. Any other input (including "0") leaves the loop.
    menu_actions = {
        "1": cm700d.read_all_spectrum,
        "2": cm700d.read_partial_spectrum,
        "3": cm700d.manual_measurement,
        "4": cm700d.interval_measurement,
        "5": cm700d.dobot_measurement,
        "6": cm700d.repeat_dobot,
    }
    try:
        while True:
            print("\r\n----- CM700Reader 操作メニュー -----")
            print("1. 全ての本体データを取得")
            print("2. 一部の本体データを取得")
            print("3. 手動計測")
            print("4. インターバル計測")
            print("5. Dobot制御 単回計測")
            print("6. Dobot制御 繰り返し計測")
            print("0. 終了")
            n = input("選択>")
            action = menu_actions.get(n)
            if action is None:
                break
            action()
    finally:
        # Release the serial port even when an action raises or the user
        # interrupts the program with Ctrl-C.
        cm700d.close_port()