# -*- coding: utf-8 -*-
# GoDirectデバイス データ取得プログラム
import csv
import datetime
import threading
import time
from abc import abstractmethod
import cv2
import numpy as np
from gdx import gdx
disp = np.zeros((600, 1200, 3)) + 0.2
cv2.imshow("disp", disp)
cv2.waitKey(1)
x = 0
lx = x
ly = 0
class DataWorker(threading.Thread):
def __init__(self, out_filename=""):
"""初期化
Args:
out_filename (str, optional): データ出力のファイル名,空文字指定で出力なし. Defaults to "".
"""
threading.Thread.__init__(self)
self._running = False
self._f = None
self._writer = None
if out_filename:
self._f = open(out_filename, "w", newline="")
self._writer = csv.writer(self._f)
@abstractmethod
def preprocess(self):
pass
@abstractmethod
def read(self):
pass
@abstractmethod
def postprocess(self):
pass
def run(self):
"""データ取得開始"""
self.preprocess()
self._running = True
self._data = np.array([tuple([0] * self._cols)])
self._start_time = datetime.datetime.now()
self._start = time.perf_counter()
while self._running:
measurements = self.read()
print(measurements[0])
global x, disp, lx, ly
x += 2
if x >= 1200:
x = 0
disp = np.zeros((600, 1200, 3)) + 0.2
y = int(-measurements[0] * 50 + 500)
cv2.line(disp, (lx, ly), (x, y), (0, 1.0, 1.0), 2)
ly = y
lx = x
# cv2.circle(disp, (x, y), 3, (0, 1.0, 1.0), 2)
elapse = time.perf_counter() - self._start
current = self._start_time + datetime.timedelta(seconds=elapse)
measurements.insert(0, current.strftime("%H:%M:%S.%f"))
if self._writer:
self._writer.writerow(measurements)
if len(measurements) == self._cols:
self._data = np.vstack([self._data, measurements])
# print(measurements)
def stop(self):
"""停止"""
self._running = False
self.join(1.0)
self.postprocess()
if self._f:
self._f.close()
self._data = self._data[1:, :]
class GoDirectWorker(DataWorker):
def __init__(self, out_filename):
super().__init__(out_filename)
# Connect GoDirect device
self._gdx = gdx.gdx()
self._gdx.open(connection="usb")
info = self._gdx.device_info()
if info is None:
raise "GoDirect device not found"
self._gdx.select_sensors([1])
self._cols = 2
print("GoDirect connected.")
def preprocess(self):
self._gdx.start(50)
if self._writer:
self._writer.writerow(["time", "force"])
def read(self):
return self._gdx.read()
def postprocess(self):
self._gdx.stop()
self._gdx.close()
if __name__ == "__main__":
# Initialize devices
filename = datetime.datetime.now().strftime("GD_%Y%m%d_%H%M%S.csv")
godirect_worker = GoDirectWorker(out_filename=filename)
# Launch worker threads and start measurements
godirect_worker.start()
# Wait for stop
print("measurement start.")
while 1:
cv2.imshow("disp", disp)
cv2.waitKey(1)
# winsound.Beep(1000, 200)
# recode_dulation = 5 # sec
# for i in range(recode_dulation):
# time.sleep(1)
# print(i + 1, "/", recode_dulation, "sec")
# # input("Press ENTER to stop")
# print("measurement end.")
# winsound.Beep(1000, 150)
# time.sleep(0.15)
# winsound.Beep(1000, 150)
# Closing
# godirect_worker.stop()