# CM700Reader / cm700reader.py
import csv
import datetime
import os
import sys
import threading
import time
import winsound

import numpy as np
import serial
import serial.tools.list_ports
from scipy.interpolate import interp1d

import config
from dobot import DobotMeasurer


class CM700D:
    # 初期化
    def initialize(self):
        if not self.open_port():
            return False

        self.wl = np.linspace(400, 700, 31)
        self.cmf = self.read_spectrum_csv("cmf.csv")
        # print(self.cmf)
        self.d65 = self.read_spectrum_csv("d65.csv")
        # print(self.d65)
        self.K = 100.0 / (self.d65[:, 0] @ self.cmf[:, 1])
        self.XYZw = ((self.d65.T @ self.cmf) * self.K)[0]
        print("White point of D65 = ", self.XYZw)
        self.dobot = None

        return True

    # 分光分布csv読み込み
    def read_spectrum_csv(self, filename):
        # csv読み込み
        with open(filename, newline="") as f:
            csvreader = csv.reader(f)
            data_org = np.array([[float(val) for val in row] for row in csvreader])
        wl_org = data_org[:, 0]

        # return wl_org, data[:, 1:]
        data = np.zeros([len(self.wl), data_org.shape[-1] - 1])
        for i in range(data.shape[-1]):
            resampler = interp1d(wl_org, data_org[:, 1 + i], kind="linear")
            data[:, i] = resampler(self.wl)
        return data

    # COMポートを開く
    def open_port(self):
        """Probe every serial port and keep the first one that answers as a CM-700d.

        Each port is opened with the baud rate and timeouts from ``config``
        and sent an ``IDR`` command. A reply whose first field is ``"OK00"``
        identifies the instrument; that port is left open in ``self.ser``.
        Every other port is closed again before the next one is tried.

        Returns:
            bool: True when an instrument was found, False otherwise.
        """
        ports = list(serial.tools.list_ports.comports())
        if not ports:
            print(f"シリアルデバイス {config.DEVICE_NAME} が見つかりません")
            return False

        for port in ports:
            opened = None
            try:
                print(f"Checking {port.device} ... ", end="")
                opened = serial.Serial(
                    port.device,
                    config.BAUD_RATE,
                    timeout=config.TIMEOUT,
                    write_timeout=config.TIMEOUT,
                )
                self.ser = opened
                if opened.is_open:
                    # Must go through self: the previous code used the
                    # module-level ``cm700d`` global, which only exists when
                    # this file is run as a script.
                    recv = self.send_command("IDR")
                    print(recv)
                    if recv[0] == "OK00":
                        print(f"CM-700dと接続しました ({port.device})")
                        return True
            except Exception as e:
                # Best-effort probing: report the problem and try the next port.
                print(str(e))

            # Not the instrument (or probing failed): release the port so it
            # is not left open behind our back.
            if opened is not None:
                try:
                    opened.close()
                except Exception as e:
                    print(str(e))
        return False

    # コマンド送受信
    def send_command(self, command, validate=True):
        """Send one command line to the instrument and return the parsed reply.

        The command is written with a trailing CR LF, up to 1024 bytes are
        read back, and the reply is split on commas with each field stripped.

        Args:
            command: Command text without the line terminator.
            validate: When true, print a message if the first reply field is
                not ``config.CODE_OK``.

        Returns:
            list[str]: The reply fields, or ``["error", <message>]`` when any
            exception is raised while sending, receiving or decoding.
        """
        try:
            line = command + "\r\n"
            self.ser.write(line.encode())
            reply = self.ser.read(1024).decode()
            fields = [field.strip() for field in reply.split(",")]
            status = fields[0]
            if status != config.CODE_OK and validate:
                print(f"コマンド{command}に失敗しました {status}")
            return fields
        except Exception as err:
            # Callers inspect the first field, so failures are reported in-band.
            return ["error", str(err)]

    # COMポートを閉じる
    def close_port(self):
        """Close the serial port held in ``self.ser``."""
        port = self.ser
        port.close()

    # 保存されている計測サンプル数を取得
    def get_num_samples(self):
        """Query the instrument with ``STR`` and cache the stored-sample count.

        The sixth reply field is parsed as an integer, stored in
        ``self.num_samples`` and reported on stdout.
        """
        reply = self.send_command("STR")
        count = int(reply[5])
        self.num_samples = count
        print(f"本体保存サンプル数は{count}個です")

    # 保存されている計測サンプルを全て取得
    def read_all_spectrum(self):
        """Download every stored sample, from number 1 up to the stored count."""
        self.get_num_samples()
        first, last = 1, self.num_samples
        self.read_spectrum(first, last)

    # 一部の計測サンプルを取得
    def read_partial_spectrum(self):
        """Download a user-selected range of stored samples.

        Prompts for the first and last sample numbers; the last number is
        clamped to the count reported by the instrument.
        """
        self.get_num_samples()
        first = int(input("開始サンプル番号>"))
        last = min(int(input("終了サンプル番号>")), self.num_samples)
        self.read_spectrum(first, last)

    # 計測サンプルを読み込む
    def read_spectrum(self, start_no, end_no):
        """Download stored samples ``start_no``..``end_no`` and save them as CSV.

        Each sample is requested with ``SDR``; the reply fields after the
        status are converted to float and divided by 10000. The rows are
        passed to ``save_csv`` (which prompts for a file name) when at least
        one sample was read.

        Args:
            start_no: First sample number to read.
            end_no: Last sample number to read (inclusive).
        """
        # The IDR reply supplies the header grid: fields 5, 6 and 7 are used
        # as start, inclusive end and step.
        info = self.send_command("IDR")
        grid_start, grid_end, grid_step = (int(info[k]) for k in (5, 6, 7))
        wavelen = range(grid_start, grid_end + 1, grid_step)

        total = end_no - start_no + 1
        rows = []
        for done, idx in enumerate(range(start_no, end_no + 1), start=1):
            print("\r", f"{done:3d}/{total:3d}", f" Reading {idx}", end="")
            reply = self.send_command(
                f"SDR,{idx},{config.MEAS_TYPE_DICT[config.MEAS_TYPE]}"
            )
            values = np.asarray(reply[1:], dtype=np.float32) / 10000.0
            rows.append([idx, *values.tolist()])
        print("    done")

        # Write the CSV only when something was actually read.
        if rows:
            self.save_csv(rows, "no," + ",".join(str(w) for w in wavelen))

    # CSVファイルに保存
    @staticmethod
    def save_csv(data, header, filename=""):
        # 保存ファイル名の生成
        if filename == "":
            datestr = datetime.datetime.now().strftime("%m%d")
            filename = ""
            for i in range(2, 6):
                for ab in ["a", "b"]:
                    fn = f"tcc6-{i}_{datestr}{ab}.csv"
                    if (filename == "") and (not os.path.exists(fn)):
                        filename = fn
            infn = input(f"csv保存ファイル名( '{filename}' はenter, 'q'で保存しない) >")
            if infn == "q":
                return
            if len(infn) > 0:
                filename = infn
            if len(filename) < 1:
                return

        # csv保存
        np.savetxt(
            filename,
            data,
            delimiter=",",
            header=header,
            comments="",
            fmt="%.4f",
        )
        print(f"CSVファイル {filename} に保存しました")

    # 測定条件
    def mesuring_condition(self):
        """Query the instrument's measurement settings (``CPR``) and print them.

        Reply fields 1 and 2 are looked up in ``config.MEAS_DIAMETER`` and
        ``config.MEAS_MODE``, field 3 is divided by 10, and fields 4 and 5 are
        shown as received. Nothing is printed when the reply status is not
        ``config.CODE_OK``.
        """
        reply = self.send_command("CPR")
        if reply[0] != config.CODE_OK:
            return

        diameter = config.MEAS_DIAMETER[reply[1]]
        mode = config.MEAS_MODE[reply[2]]
        wait_time = float(reply[3]) / 10
        auto_average, manual_average = reply[4], reply[5]
        print(
            f"測定条件 測定径 {diameter}"
            f" 測定モード {mode}"
            f" 待ち時間 {wait_time}秒"
            f" 自動平均回数 {auto_average}回"
            f" 手動平均回数 {manual_average}回"
        )

    # 手動計測
    def manual_measurement(self):
        """Measure repeatedly on ENTER until the user types ``q``, then save.

        A short beep precedes every prompt. The collected rows are handed to
        ``save_csv`` (which prompts for a file name) when at least one
        measurement succeeded.
        """
        self.mesuring_condition()

        rows = []
        while True:
            winsound.Beep(1000, 100)
            answer = input(f"No. {len(rows) + 1}: ENTERで計測('q'で終了)")
            if answer == "q":
                break
            # measurement() appends to rows only when it succeeds.
            self.measurement(rows)

        if rows:
            wavelengths = ",".join(str(w) for w in self.wl)
            self.save_csv(rows, "no,X,Y,Z,L*,a*,b*," + wavelengths)

    # インターバル計測
    def interval_measurement(self):
        """Take a fixed number of measurements with a pause between them.

        Prompts for the number of measurements and the pause in seconds. A
        short beep precedes each measurement and a long beep marks the end.
        The pause is applied only between measurements: waiting after the
        final one would merely delay the finish signal and the save prompt.
        Collected rows are handed to ``save_csv`` when at least one
        measurement succeeded.
        """
        self.mesuring_condition()

        num_samples = int(input("測定回数>"))
        interval = float(input("間隔(秒)>"))

        data = []
        for i in range(num_samples):
            print("\r", "%3d/%3d" % (i + 1, num_samples), end="")
            winsound.Beep(1500, 100)
            self.measurement(data)
            # Same rule as repeat_dobot: no wait after the last run.
            if i < num_samples - 1:
                time.sleep(interval)
        winsound.Beep(1500, 1000)

        if data:
            header = "no,X,Y,Z,L*,a*,b*," + ",".join(map(str, self.wl))
            self.save_csv(data, header)

    # Dobotを使った計測
    def dobot_measurement(self, filename=""):
        """Measure every position listed in ``config.CC_XY`` using the Dobot arm.

        The arm is first moved to position 0. Each call to ``measurement`` is
        given the index of the following position so the arm can move while
        the result is being fetched; the last call gets -1 (no movement).

        Args:
            filename: CSV path for the results. When empty the run is treated
                as interactive: the measuring conditions are printed first, a
                beep marks the end, and ``save_csv`` prompts for a file name.
        """
        interactive = filename == ""
        if interactive:
            self.mesuring_condition()

        self.dobot = DobotMeasurer()
        data = []
        try:
            self.move_next(0)  # Move to the first position

            # Visit every configured position. A leftover ``num_samples = 3``
            # override used to cap the run at three positions.
            num_samples = len(config.CC_XY)
            for i in range(num_samples):
                print("\r", "%3d/%3d" % (i + 1, num_samples), end="")
                next_idx = i + 1 if i < num_samples - 1 else -1
                self.measurement(data, next_idx)
            if interactive:
                winsound.Beep(800, 1000)
            self.dobot.up(wait=True)
        finally:
            # Drop the reference even when a step above raised. The original
            # comment says this closes the Dobot connection; presumably
            # DobotMeasurer releases it on finalisation (TODO confirm).
            self.dobot = None

        if data:
            header = "no,X,Y,Z,L*,a*,b*," + ",".join(map(str, self.wl))
            self.save_csv(data, header, filename)

    # Dobotを使った繰り返し計測
    def repeat_dobot(self):
        """Run ``dobot_measurement`` several times at a fixed period.

        Prompts for the number of runs and the period in minutes. Each run is
        saved to ``dobot_<run>_<timestamp>.csv``. The period is counted from
        the start of a run, so only the time left after the run is slept, and
        there is no wait after the last run.
        """
        self.mesuring_condition()

        total = int(input("測定回数>"))
        period_sec = float(input("間隔(分)>")) * 60
        stamp_format = "%Y%m%d_%H%M%S"

        for run in range(1, total + 1):
            began = time.time()
            stamp = datetime.datetime.now().strftime(stamp_format)
            print(f"\nDobot計測開始 {run}/{total} {stamp}")
            self.dobot_measurement(f"dobot_{run}_{stamp}.csv")
            stamp = datetime.datetime.now().strftime(stamp_format)
            print(f"Dobot計測終了 {run}/{total} {stamp}")

            remaining = began + period_sec - time.time()
            if run < total and remaining > 0:
                print(f"次の計測まで{remaining // 60}分待機します")
                time.sleep(remaining)

    # Dobotを使って次の位置へ移動
    def move_next(self, idx):
        """Lift the Dobot, move it to position ``idx`` and lower it again.

        Args:
            idx: Index into ``config.CC_XY``; elements 0 and 1 of that entry
                are passed to ``dobot.move`` as the target.
        """
        self.dobot.up(wait=True)
        target = config.CC_XY[idx]
        self.dobot.move(target[0], target[1], wait=True)
        self.dobot.down(wait=True)

    # 測定
    def measurement(self, data, idx=-1):
        """Trigger one measurement, convert it to XYZ / L*a*b* and append it.

        Timeline, with every delay counted from the moment ``MES,1`` returns:

        1. wait ``config.MEAS_TO_MOVE`` seconds, then, if ``idx >= 0``, start
           moving the Dobot to position ``idx`` in a background thread;
        2. wait until ``config.MEAS_TO_RECEIVE`` seconds have elapsed;
        3. poll ``MDR,2`` until the reply status is ``config.CODE_OK`` or
           more than ``config.MEAS_TIMEOUT`` seconds have elapsed.

        The mover thread is always joined before this method returns, also on
        timeout, so the caller never talks to the Dobot (or clears
        ``self.dobot``) while ``move_next`` is still running.

        Args:
            data: List that receives one row
                ``[count, X, Y, Z, L*, a*, b*, *spectrum]`` on success; it is
                left untouched on timeout.
            idx: Index of the next Dobot position, or a negative value for no
                movement.
        """
        print("測定中...", end="")
        self.send_command("MES,1")
        start_time = time.time()
        while time.time() - start_time < config.MEAS_TO_MOVE:
            time.sleep(0.1)

        mover = None
        if idx >= 0:
            mover = threading.Thread(target=self.move_next, args=(idx,))
            mover.start()

        try:
            while time.time() - start_time < config.MEAS_TO_RECEIVE:
                time.sleep(0.1)

            while True:
                # Sampled before the request so a slow reply still gets one
                # last chance to succeed.
                elapsed_time = time.time() - start_time
                recv = self.send_command("MDR,2", validate=False)
                if recv[0] == config.CODE_OK:
                    break
                if elapsed_time > config.MEAS_TIMEOUT:
                    print("timeout")
                    return
        finally:
            # Previously skipped on the timeout path, leaving the arm moving
            # behind the caller's back.
            if mover is not None:
                mover.join()
        print("done")

        # Reply fields after the status are divided by 10000.
        spectrum = np.asarray(recv[1:], dtype=np.float32) / 10000.0
        XYZ = (((self.d65.T * spectrum) @ self.cmf) * self.K)[0]
        lab = self.xyz_to_lab(XYZ)
        print("Lab: ", lab)

        count = len(data) + 1
        data.append([count] + XYZ.tolist() + lab + spectrum.tolist())

    # XYZからLabへ変換
    def xyz_to_lab(self, xyz):
        """Convert an XYZ triple to L*a*b* relative to the white point ``self.XYZw``.

        Args:
            xyz: Three values (X, Y, Z).

        Returns:
            list[float]: ``[L*, a*, b*]`` as plain Python floats.
        """
        x, y, z = xyz
        white_x, white_y, white_z = self.XYZw
        fx = self.f(x / white_x)
        fy = self.f(y / white_y)
        fz = self.f(z / white_z)

        lightness = 116.0 * fy - 16.0
        a_star = 500.0 * (fx - fy)
        b_star = 200.0 * (fy - fz)
        return [float(lightness), float(a_star), float(b_star)]

    # f関数
    def f(self, t):
        """Return the L*a*b* helper value for the ratio ``t``.

        Above ``(6/29)**3`` this is the cube root of ``t``; otherwise it is
        the linear segment ``(841/108) * t + 4/29``.
        """
        threshold = (6.0 / 29.0) ** 3.0
        linear = (841.0 / 108.0) * t + (4.0 / 29.0)
        return t ** (1.0 / 3.0) if t > threshold else linear


# メイン
if __name__ == "__main__":
    # Script entry point: connect to the instrument, then serve the text menu
    # until the user picks anything other than 1-6.
    cm700d = CM700D()

    if not cm700d.initialize():
        print("デバイスに接続できないため終了します")
        sys.exit()

    # Menu key -> action to run.
    actions = {
        "1": cm700d.read_all_spectrum,
        "2": cm700d.read_partial_spectrum,
        "3": cm700d.manual_measurement,
        "4": cm700d.interval_measurement,
        "5": cm700d.dobot_measurement,
        "6": cm700d.repeat_dobot,
    }

    try:
        while True:
            print("\r\n----- CM700Reader 操作メニュー -----")
            print("1. 全ての本体データを取得")
            print("2. 一部の本体データを取得")
            print("3. 手動計測")
            print("4. インターバル計測")
            print("5. Dobot制御 単回計測")
            print("6. Dobot制御 繰り返し計測")

            print("0. 終了")
            n = input("選択>")
            action = actions.get(n)
            if action is None:
                break
            action()
    finally:
        # Release the serial port even when an action raises or the user
        # interrupts the program; previously it stayed open in that case.
        cm700d.close_port()