from gdx import gdx
import threading
import time
import csv
from abc import abstractmethod
import numpy as np
class DataWorker(threading.Thread):
def __init__(self, out_filename=""):
threading.Thread.__init__(self)
self._running = False
self._start_time = 0
@abstractmethod
def preprocess(self):
pass
@abstractmethod
def read(self):
pass
@abstractmethod
def postprocess(self):
pass
def run(self):
self.preprocess()
self._running = True
self._data = np.array([tuple([0] * self._cols)])
self._start_time = time.perf_counter()
while self._running:
measurements = self.read()
current_time = time.perf_counter() - self._start_time
measurements.insert(0, current_time)
if len(measurements) == self._cols:
self._data = np.vstack([self._data, measurements])
# print(measurements)
def stop(self):
self._running = False
self.join(1.0)
self.postprocess()
self._data = self._data[1:, :]
class GoDirectWorker(DataWorker):
def __init__(self, out_filename):
super().__init__(out_filename)
# Connect GoDirect device
self._gdx = gdx.gdx()
self._gdx.open(connection="usb")
self._gdx.select_sensors([1])
self._cols = 2
print("GoDirect connected.")
def preprocess(self):
self._gdx.start(50)
def read(self):
return self._gdx.read()
def postprocess(self):
self._gdx.stop()
self._gdx.close()
if __name__ == "__main__":
godirect_worker = GoDirectWorker("")
godirect_worker.start()
# Wait for stop
print("measurement start.")
recode_dulation = 3 # sec
for i in range(recode_dulation):
time.sleep(1)
print(i + 1, "/", recode_dulation, "sec")
# input("Press ENTER to stop")
print("measurement end.")
print(godirect_worker._data)
# Closing
godirect_worker.stop()