import threading
import time
from abc import abstractmethod
import cv2
import numpy as np
from gdx import gdx
class DataWorker(threading.Thread):
    """Background thread that polls a data source and records timestamped rows.

    Subclasses provide ``preprocess()``, ``read()`` and ``postprocess()`` and
    must set ``self._num_channels`` (number of values expected per reading)
    before the thread is started.

    Each accepted row has the form ``[elapsed_seconds, ch_1, ..., ch_n]``,
    where ``elapsed_seconds`` is measured with ``time.perf_counter()`` from
    the start of acquisition. Rows are collected while the thread runs and
    assembled into ``self._data`` (a 2-D NumPy array with
    ``_num_channels + 1`` columns) when ``stop()`` is called.

    NOTE: the ``@abstractmethod`` decorators are advisory only; this class
    does not use ``abc.ABCMeta``, so instantiation is not blocked.
    """

    def __init__(self):
        threading.Thread.__init__(self)
        self._running = False
        self._start_time = 0
        # Set by stop(); checked by run() so that a stop request issued
        # before the loop has started is never lost.
        self._stop_requested = threading.Event()
        # Guards stop() against running its teardown more than once.
        self._finalized = False
        # Rows accepted so far; a list append is O(1) per sample.
        self._rows = []
        # Placeholder until stop() assembles the recorded rows.
        self._data = np.empty((0, 0))

    @abstractmethod
    def preprocess(self):
        """Prepare the data source; called once at the start of ``run()``."""
        pass

    @abstractmethod
    def read(self):
        """Return one reading as a sequence of ``_num_channels`` values.

        A return value of ``None`` is treated as "no data" and skipped.
        """
        pass

    @abstractmethod
    def postprocess(self):
        """Release the data source; called once from ``stop()``."""
        pass

    def run(self):
        """Acquisition loop executed in the worker thread."""
        if self._stop_requested.is_set():
            # stop() was called before the thread got going; do not touch
            # a source that has already been torn down.
            return
        self.preprocess()
        self._running = True
        expected_len = self._num_channels + 1
        self._start_time = time.perf_counter()
        while self._running and not self._stop_requested.is_set():
            measurements = self.read()
            elapsed = time.perf_counter() - self._start_time
            if measurements is None:
                continue
            # Build a new row instead of mutating the object read() returned.
            row = [elapsed, *measurements]
            # Silently drop readings with an unexpected number of channels.
            if len(row) == expected_len:
                self._rows.append(row)

    def stop(self):
        """Stop acquisition, release the source and finalize ``self._data``.

        Safe to call more than once; only the first call has an effect.
        Waits up to one second for the worker thread to exit. If the thread
        is still blocked in ``read()`` after that, ``postprocess()`` is called
        anyway and ``_data`` contains the rows recorded so far.
        """
        if self._finalized:
            return
        self._finalized = True
        self._stop_requested.set()
        self._running = False
        # join() raises RuntimeError on a thread that was never started.
        if self.is_alive():
            self.join(1.0)
        try:
            self.postprocess()
        finally:
            # Snapshot the list in case the worker has not exited yet.
            recorded = list(self._rows)
            self._data = np.array(recorded, dtype=float).reshape(
                -1, self._num_channels + 1
            )
class GoDirectWorker(DataWorker):
    """DataWorker that records readings from a Vernier Go Direct device.

    Args:
        sensors: Sensor numbers to enable on the device; one channel is
            recorded per entry. Defaults to sensor 1 only.
        period: Sampling period passed to ``gdx.start()``. NOTE(review):
            the gdx library documents this as milliseconds; confirm
            against the installed version.
        connection: Connection type passed to ``gdx.open()``.
    """

    def __init__(self, sensors=(1,), period=50, connection="usb"):
        super().__init__()
        self._sensors = list(sensors)
        self._period = period
        # Connect GoDirect device
        self._gdx = gdx.gdx()
        self._gdx.open(connection=connection)
        self._gdx.select_sensors(self._sensors)
        self._num_channels = len(self._sensors)
        print("GoDirect connected.")

    def preprocess(self):
        """Start data collection on the device at the configured period."""
        self._gdx.start(self._period)

    def read(self):
        """Return the latest readings, or an empty list if none were returned.

        ``gdx.read()`` may return ``None`` (presumably when no data is
        available or the device is gone; verify against the gdx docs). An
        empty list has the wrong length for a row, so the acquisition loop
        skips it instead of crashing on ``None``.
        """
        readings = self._gdx.read()
        return [] if readings is None else readings

    def postprocess(self):
        """Stop data collection and close the device connection."""
        try:
            self._gdx.stop()
        finally:
            # Always release the connection, even if stopping failed.
            self._gdx.close()
def main():
    """Record from a Go Direct device until ESC is pressed in the window."""
    esc_key = 27
    win_name = "measurement"
    cv2.namedWindow(win_name, cv2.WINDOW_NORMAL)
    display_width = 800
    display_height = 600
    # The displayed frame never changes, so allocate it once.
    disp = np.zeros((display_height, display_width, 3), np.uint8)

    godirect_worker = GoDirectWorker()
    godirect_worker.start()

    # Wait for stop
    print("measurement start.")
    try:
        while True:
            cv2.imshow(win_name, disp)
            # Mask to the low byte: some backends set higher bits in the
            # value returned by waitKey.
            if cv2.waitKey(30) & 0xFF == esc_key:
                break
    finally:
        print("measurement end.")
        # Closing: always stop the worker so the non-daemon thread and the
        # device are released even on an exception or Ctrl-C.
        godirect_worker.stop()
        cv2.destroyAllWindows()


if __name__ == "__main__":
    main()