# CM700Reader / cm700reader.py
import csv
import sys
import time
import winsound

import numpy as np
import serial
import serial.tools.list_ports
from scipy.interpolate import interp1d

import config


class CM700D:
    def initialize(self):
        """Connect to the instrument and prepare the colorimetric tables.

        Opens the serial port, builds the working wavelength grid, loads
        ``cmf.csv`` and ``d65.csv`` resampled onto that grid, and derives the
        normalisation constant ``self.K`` and the white point ``self.XYZw``.

        Returns:
            ``False`` when the serial port could not be opened, otherwise
            ``True``.
        """
        port_ready = self.open_port()
        if not port_ready:
            return False

        # 31 evenly spaced points from 400 to 700, i.e. a step of 10.
        # Must be set before read_spectrum_csv(), which resamples onto it.
        self.wl = np.linspace(400, 700, 31)
        self.cmf = self.read_spectrum_csv("cmf.csv")
        self.d65 = self.read_spectrum_csv("d65.csv")

        # K scales the product of the illuminant with cmf column 1 to 100
        # (presumably the y-bar column, so Y of the white point is 100 --
        # confirm against the layout of cmf.csv).
        illuminant = self.d65[:, 0]
        second_cmf_column = self.cmf[:, 1]
        self.K = 100.0 / (illuminant @ second_cmf_column)

        white = (self.d65.T @ self.cmf) * self.K
        self.XYZw = white[0]
        print("White point of D65 = ", self.XYZw)

        return True

    def read_spectrum_csv(self, filename):
        """Load a spectral table from CSV and resample it onto ``self.wl``.

        The file must contain numeric cells only: column 0 is the wavelength
        and every further column is one data series.  Blank rows and a
        leading UTF-8 byte-order mark are tolerated.

        Args:
            filename: Path of the CSV file to read.

        Returns:
            Array of shape ``(len(self.wl), n_series)`` with every series
            linearly interpolated at the wavelengths in ``self.wl``.

        Raises:
            ValueError: If a cell is not numeric, if the file has no data
                rows or fewer than two columns, or if ``self.wl`` reaches
                outside the wavelength range of the file (from ``interp1d``).
        """
        # "utf-8-sig" drops a BOM when present and is plain UTF-8 otherwise;
        # without it the first cell of a BOM-prefixed file fails float().
        with open(filename, newline="", encoding="utf-8-sig") as f:
            rows = [
                [float(cell) for cell in row]
                for row in csv.reader(f)
                if any(cell.strip() for cell in row)  # skip blank rows
            ]

        table = np.array(rows)
        if table.ndim != 2 or table.shape[1] < 2:
            raise ValueError(
                f"{filename}: expected a wavelength column plus at least one "
                "data column"
            )

        wl_org = table[:, 0]
        # Interpolate all data columns at once along the wavelength axis.
        resampler = interp1d(wl_org, table[:, 1:], kind="linear", axis=0)
        return np.asarray(resampler(self.wl), dtype=float)

    # デバイス名でCOMポート番号を検索
    @staticmethod
    def find_port(device_str):
        """Return the device name of the first port matching *device_str*.

        Args:
            device_str: Substring to look for in each port's description.

        Returns:
            The ``device`` attribute of the first listed serial port whose
            description contains *device_str*, or ``""`` when none matches.
        """
        for port_info in serial.tools.list_ports.comports():
            if device_str in port_info.description:
                return port_info.device
        return ""

    # COMポートを開く
    def open_port(self):
        """Locate the instrument's serial port and open it.

        The port is looked up by ``config.DEVICE_NAME`` and opened with
        ``config.BAUD_RATE`` and ``config.TIMEOUT``; the open port is stored
        in ``self.ser``.

        Returns:
            ``True`` when the port is open, ``False`` when no matching device
            was found or the port could not be opened.  A message is printed
            in either failure case.
        """
        port = self.find_port(config.DEVICE_NAME)
        if port == "":
            print(f"シリアルデバイス {config.DEVICE_NAME} が見つかりません")
            return False

        # serial.Serial() raises SerialException when the port cannot be
        # opened or configured (e.g. already in use), so handle that here
        # instead of letting the program die with a traceback.
        try:
            self.ser = serial.Serial(
                port, config.BAUD_RATE, timeout=config.TIMEOUT
            )
        except serial.SerialException as exc:
            print(f"COMポート{port}を開くことができませんでした: {exc}")
            return False

        if not self.ser.is_open:
            print(f"COMポート{port}を開くことができませんでした")
            return False
        print(f"CM-700dと接続しました ({port})")
        return True

    # コマンド送受信
    def send_command(self, command, validate=True):
        """Send one command line and return the reply split into fields.

        Args:
            command: Command text without line terminator; CR LF is appended.
            validate: When true, print a failure message if the first reply
                field differs from ``config.CODE_OK``.

        Returns:
            The reply decoded to text, split on commas, with surrounding
            whitespace stripped from every field.  The list is returned even
            when the status check fails.
        """
        payload = (command + "\r\n").encode()
        self.ser.write(payload)

        # Ask for up to 1024 bytes in a single read call.
        raw_reply = self.ser.read(1024)
        fields = [field.strip() for field in raw_reply.decode().split(",")]

        status = fields[0]
        if status != config.CODE_OK and validate:
            print(f"コマンド{command}に失敗しました {status}")
        return fields

    # COMポートを閉じる
    def close_port(self):
        """Close the serial port held in ``self.ser``."""
        port = self.ser
        port.close()

    def get_num_samples(self):
        """Query the instrument for its stored-sample count.

        Sends the ``STR`` command, converts field 5 of the reply to an
        integer, stores it in ``self.num_samples`` and prints it.
        """
        reply = self.send_command("STR")
        stored = int(reply[5])
        self.num_samples = stored
        print(f"本体保存サンプル数は{self.num_samples}個です")

    def read_all_spectrum(self):
        """Download every stored sample, from number 1 up to the stored count.

        The count is refreshed via ``get_num_samples()`` before reading.
        """
        self.get_num_samples()
        first_no, last_no = 1, self.num_samples
        self.read_spectrum(first_no, last_no)

    def read_partial_spectrum(self):
        """Download a user-chosen range of stored samples.

        Prompts for a start and an end sample number, clamps both to the
        valid range ``1..self.num_samples`` and passes them to
        ``read_spectrum()``.

        Raises:
            ValueError: If either answer is not an integer.
        """
        self.get_num_samples()
        start_no = int(input("開始サンプル番号>"))
        end_no = int(input("終了サンプル番号>"))

        # Stored samples are numbered from 1 (read_all_spectrum starts at 1),
        # so keep both ends inside 1..num_samples before talking to the
        # device.
        start_no = max(start_no, 1)
        end_no = min(end_no, self.num_samples)
        self.read_spectrum(start_no, end_no)

    def read_spectrum(self, start_no, end_no):
        """Download stored samples *start_no*..*end_no* and offer to save them.

        Args:
            start_no: Number of the first sample to read.
            end_no: Number of the last sample to read (inclusive).

        Each sample is requested with an ``SDR`` command; the reply fields
        after the status are converted to floats and divided by 10000.  When
        at least one sample was read, the rows are handed to ``save_csv()``.
        """
        # Fields 5, 6 and 7 of the IDR reply are used as first wavelength,
        # last wavelength and step of the CSV header.
        info = self.send_command("IDR")
        wl_first, wl_last, wl_step = (int(info[k]) for k in (5, 6, 7))
        wavelen = range(wl_first, wl_last + 1, wl_step)

        total = end_no - start_no + 1
        rows = []
        sample_numbers = range(start_no, start_no + total)
        for done, sample_no in enumerate(sample_numbers, start=1):
            print("\r", f"{done:3d}/{total:3d}", f" Reading {sample_no}", end="")
            reply = self.send_command(
                f"SDR,{sample_no},{config.MEAS_TYPE_DICT[config.MEAS_TYPE]}"
            )
            values = np.asarray(reply[1:], dtype=np.float32) / 10000.0
            rows.append([sample_no, *values.tolist()])
        print("    done")

        # Only prompt for a file name when something was read.
        if rows:
            header = "no," + ",".join(str(w) for w in wavelen)
            self.save_csv(rows, header)

    # CSVファイルに保存
    @staticmethod
    def save_csv(data, header):
        # if len(self.data) > 0:
        filename = input("csv保存ファイル名(未記入で保存しない) >")
        if len(filename) > 0:
            np.savetxt(
                filename,
                data,
                delimiter=",",
                header=header,
                comments="",
                fmt="%.4f",
            )
            print(f"CSVファイル {filename} に保存しました")

    # 手動計測
    def manual_measurement(self):
        """Take measurements one at a time, each triggered by the operator.

        Prints the reply to the ``CPR`` command, then repeatedly beeps and
        waits for ENTER; entering ``q`` ends the session.  Collected rows are
        handed to ``save_csv()`` when at least one measurement was stored.
        """
        conditions = self.send_command("CPR")
        print("測定条件:", conditions)

        results = []
        while True:
            # Short beep signals that the next measurement can be triggered.
            winsound.Beep(1000, 100)
            if input(f"No. {len(results) + 1}: ENTERで計測('q'で終了)") == "q":
                break
            self.measurement(results)

        if results:
            wavelength_columns = ",".join(str(w) for w in self.wl)
            header = "no,X,Y,Z,L*,a*,b*," + wavelength_columns
            self.save_csv(results, header)

    # インターバル計測
    def interval_measurement(self):
        """Take a fixed number of measurements with a pause between them.

        Prints the reply to the ``CPR`` command, asks for the number of
        measurements and the pause in seconds, then measures repeatedly.  A
        short beep precedes every measurement and a long beep marks the end.
        Collected rows are handed to ``save_csv()`` when any were stored.

        Raises:
            ValueError: If the count is not an integer or the interval is not
                a number.
        """
        recv = self.send_command("CPR")
        print("測定条件:", recv)

        num_samples = int(input("測定回数>"))
        interval = float(input("間隔(秒)>"))
        # time.sleep() rejects negative values; treat them as "no pause"
        # rather than failing after the first measurement.
        interval = max(interval, 0.0)

        data = []
        for i in range(num_samples):
            # Pause only BETWEEN measurements, so the completion beep is not
            # delayed by one extra interval after the last one.
            if i > 0:
                time.sleep(interval)
            print("\r", "%3d/%3d" % (i + 1, num_samples), end="")
            winsound.Beep(1500, 100)
            self.measurement(data)
        winsound.Beep(1500, 1000)

        if len(data) > 0:
            header = "no,X,Y,Z,L*,a*,b*," + (",".join(map(str, self.wl)))
            self.save_csv(data, header)

    # 測定
    def measurement(self, data):
        """Trigger one measurement and append its result row to *data*.

        Args:
            data: List that receives one row per successful measurement, laid
                out as ``[count, X, Y, Z, L*, a*, b*, spectrum...]`` where
                ``count`` is the 1-based position of the row in *data*.

        Sends ``MES,1``, then polls ``MDR,2`` every 0.5 s until the reply
        status equals ``config.CODE_OK``.  After 12 unsuccessful polls it
        prints ``timeout`` and returns without touching *data*.
        """
        print("測定中...", end="")
        self.send_command("MES,1")

        # Poll for the result; the else branch runs only when no poll
        # succeeded.
        for _attempt in range(12):
            time.sleep(0.5)
            recv = self.send_command("MDR,2", validate=False)
            if recv[0] == config.CODE_OK:
                break
        else:
            print("timeout")
            return
        print("done")

        # Reply fields after the status, divided by 10000.
        spectrum = np.asarray(recv[1:], dtype=np.float32) / 10000.0

        # XYZ = K * sum over wavelengths of (d65 * spectrum * cmf).
        weighted = self.d65.T * spectrum
        XYZ = ((weighted @ self.cmf) * self.K)[0]
        lab = self.xyz_to_lab(XYZ)
        print("Lab: ", lab)

        # NOTE(review): the previous revision carried commented-out code that
        # read colour values via a "COR" command instead of computing them
        # here -- presumably a device-side alternative; confirm if needed.
        row = [len(data) + 1, *XYZ.tolist(), *lab, *spectrum.tolist()]
        data.append(row)

    def xyz_to_lab(self, xyz):
        """Convert tristimulus values to L*, a*, b* relative to ``self.XYZw``.

        Args:
            xyz: Three values ``(X, Y, Z)``.

        Returns:
            ``[L*, a*, b*]`` as a list of Python floats.
        """
        x, y, z = xyz
        xw, yw, zw = self.XYZw

        # Each component is normalised by the white point before applying f().
        fx, fy, fz = self.f(x / xw), self.f(y / yw), self.f(z / zw)

        lightness = 116.0 * fy - 16.0
        a_star = 500.0 * (fx - fy)
        b_star = 200.0 * (fy - fz)
        return [float(lightness), float(a_star), float(b_star)]

    def f(self, t):
        """Return the piecewise helper used by ``xyz_to_lab()``.

        Args:
            t: A white-point-normalised tristimulus ratio.

        Returns:
            The cube root of *t* when ``t > (6/29)**3``; otherwise the linear
            segment ``(841/108) * t + 4/29``.
        """
        threshold = (6.0 / 29.0) ** 3.0
        cube_root_branch = t > threshold
        return (
            t ** (1.0 / 3.0)
            if cube_root_branch
            else (841.0 / 108.0) * t + (4.0 / 29.0)
        )


# メイン
if __name__ == "__main__":
    # Interactive entry point: connect, then loop over the operation menu
    # until the user picks anything other than 1-4.
    cm700d = CM700D()

    if not cm700d.initialize():
        sys.exit()

    # Menu choice -> bound method to run.
    actions = {
        "1": cm700d.read_all_spectrum,
        "2": cm700d.read_partial_spectrum,
        "3": cm700d.manual_measurement,
        "4": cm700d.interval_measurement,
    }

    # try/finally releases the serial port even when a menu action raises
    # (e.g. a non-numeric answer to a numeric prompt, or Ctrl+C).
    try:
        while True:
            print("\r\n----- CM700Reader 操作メニュー -----")
            print("1. 全ての本体データを取得")
            print("2. 一部の本体データを取得")
            print("3. 手動計測")
            print("4. インターバル計測")
            print("0. 終了")
            n = input("選択>")
            action = actions.get(n)
            if action is None:
                break
            action()
    finally:
        cm700d.close_port()