Newer
Older
RespDoppler / measurementGD.py
# -*- coding: utf-8 -*-
# GoDirectデバイス データ取得プログラム

import csv
import datetime
import threading
import time
from abc import abstractmethod

import cv2
import numpy as np

from gdx import gdx

disp = np.zeros((600, 1200, 3)) + 0.2
cv2.imshow("disp", disp)
cv2.waitKey(1)
x = 0
lx = x
ly = 0


class DataWorker(threading.Thread):
    def __init__(self, out_filename=""):
        """初期化

        Args:
            out_filename (str, optional): データ出力のファイル名,空文字指定で出力なし. Defaults to "".
        """
        threading.Thread.__init__(self)
        self._running = False
        self._f = None
        self._writer = None
        if out_filename:
            self._f = open(out_filename, "w", newline="")
            self._writer = csv.writer(self._f)

    @abstractmethod
    def preprocess(self):
        pass

    @abstractmethod
    def read(self):
        pass

    @abstractmethod
    def postprocess(self):
        pass

    def run(self):
        """データ取得開始"""
        self.preprocess()
        self._running = True
        self._data = np.array([tuple([0] * self._cols)])
        self._start_time = datetime.datetime.now()
        self._start = time.perf_counter()
        while self._running:
            measurements = self.read()
            print(measurements[0])
            global x, disp, lx, ly
            x += 2
            if x >= 1200:
                x = 0
                disp = np.zeros((600, 1200, 3)) + 0.2
            y = int(-measurements[0] * 50 + 500)
            cv2.line(disp, (lx, ly), (x, y), (0, 1.0, 1.0), 2)
            ly = y
            lx = x
            # cv2.circle(disp, (x, y), 3, (0, 1.0, 1.0), 2)
            elapse = time.perf_counter() - self._start
            current = self._start_time + datetime.timedelta(seconds=elapse)
            measurements.insert(0, current.strftime("%H:%M:%S.%f"))
            if self._writer:
                self._writer.writerow(measurements)
            if len(measurements) == self._cols:
                self._data = np.vstack([self._data, measurements])
            # print(measurements)

    def stop(self):
        """停止"""
        self._running = False
        self.join(1.0)
        self.postprocess()
        if self._f:
            self._f.close()
        self._data = self._data[1:, :]


class GoDirectWorker(DataWorker):
    def __init__(self, out_filename):
        super().__init__(out_filename)
        # Connect GoDirect device
        self._gdx = gdx.gdx()
        self._gdx.open(connection="usb")
        info = self._gdx.device_info()
        if info is None:
            raise "GoDirect device not found"
        self._gdx.select_sensors([1])
        self._cols = 2
        print("GoDirect connected.")

    def preprocess(self):
        self._gdx.start(50)
        if self._writer:
            self._writer.writerow(["time", "force"])

    def read(self):
        return self._gdx.read()

    def postprocess(self):
        self._gdx.stop()
        self._gdx.close()


if __name__ == "__main__":
    # Initialize devices
    filename = datetime.datetime.now().strftime("GD_%Y%m%d_%H%M%S.csv")
    godirect_worker = GoDirectWorker(out_filename=filename)

    # Launch worker threads and start measurements
    godirect_worker.start()

    # Wait for stop
    print("measurement start.")
    while 1:
        cv2.imshow("disp", disp)
        cv2.waitKey(1)

    # winsound.Beep(1000, 200)
    # recode_dulation = 5  # sec
    # for i in range(recode_dulation):
    #     time.sleep(1)
    #     print(i + 1, "/", recode_dulation, "sec")
    # # input("Press ENTER to stop")
    # print("measurement end.")
    # winsound.Beep(1000, 150)
    # time.sleep(0.15)
    # winsound.Beep(1000, 150)

    # Closing
    # godirect_worker.stop()