# Newer
# Older
# RespDoppler / mes_gui.py

from gdx import gdx
import threading
import time
import csv
from abc import abstractmethod
import numpy as np


class DataWorker(threading.Thread):
    """Background thread that polls a data source and stores timestamped rows.

    Subclasses provide the device-specific hooks ``preprocess``, ``read`` and
    ``postprocess`` and must set ``self._cols`` (the row width *including*
    the leading timestamp column) before the thread is started.

    Accepted samples are kept in ``self._data``, a 2-D NumPy array with one
    row per sample laid out as ``[elapsed_seconds, *measurements]``.

    Note: the hooks carry ``@abstractmethod`` but this class does not use
    ``ABCMeta``, so the decorators are documentation only; a subclass that
    omits a hook simply inherits the no-op below.
    """

    def __init__(self, out_filename=""):
        """Initialise the thread state.

        Args:
            out_filename: Accepted for interface compatibility; this class
                does not use it.
        """
        threading.Thread.__init__(self)
        self._running = False
        self._stopped = False  # guards stop() against repeated clean-up
        self._start_time = 0
        # Exists from construction so readers never hit AttributeError;
        # replaced with a correctly sized array when run() starts.
        self._data = np.empty((0, 0))

    @abstractmethod
    def preprocess(self):
        """Hook called once at the start of ``run`` before polling begins."""
        pass

    @abstractmethod
    def read(self):
        """Hook returning one sample as a sequence of values, or ``None``."""
        pass

    @abstractmethod
    def postprocess(self):
        """Hook called once from ``stop`` to release the data source."""
        pass

    def run(self):
        """Poll ``read`` until ``stop`` is requested, appending each sample.

        Every sample is prefixed with the seconds elapsed since polling
        started. Samples whose width (timestamp included) differs from
        ``self._cols`` are dropped, as are ``None`` results from ``read``.
        """
        self.preprocess()
        self._running = True
        # Start from a zero-row array so no placeholder row is ever visible.
        self._data = np.empty((0, self._cols))
        self._start_time = time.perf_counter()
        while self._running:
            measurements = self.read()
            if measurements is None:
                # The source had nothing to deliver; poll again.
                continue
            current_time = time.perf_counter() - self._start_time
            # Build a new list rather than mutating the object returned by
            # read(), which also lets read() return a tuple.
            row = [current_time, *measurements]
            if len(row) == self._cols:
                self._data = np.vstack([self._data, row])

    def stop(self):
        """Request the polling loop to end and release the data source.

        Waits up to one second for the thread to finish, then calls
        ``postprocess``. Calling ``stop`` more than once is harmless: the
        clean-up runs only the first time.
        """
        self._running = False
        if self._stopped:
            return
        self._stopped = True
        # join() raises on a thread that was never started, so only wait
        # when there is actually a running thread.
        if self.is_alive():
            self.join(1.0)
        self.postprocess()

class GoDirectWorker(DataWorker):
    """``DataWorker`` that samples a Go Direct device through the ``gdx`` module."""

    def __init__(self, out_filename, sensors=(1,), period=50, connection="usb"):
        """Open the device and enable the requested sensors.

        Args:
            out_filename: Forwarded unchanged to ``DataWorker.__init__``.
            sensors: Sensor numbers handed to ``gdx.select_sensors``.
                Defaults to sensor 1 only.
            period: Value handed to ``gdx.start`` when sampling begins.
                NOTE(review): presumably the sample period in milliseconds;
                confirm against the gdx documentation.
            connection: Connection type handed to ``gdx.open``.
        """
        super().__init__(out_filename)
        self._sensors = list(sensors)
        self._period = period
        # Connect GoDirect device
        self._gdx = gdx.gdx()
        self._gdx.open(connection=connection)
        self._gdx.select_sensors(self._sensors)
        # One timestamp column plus one column per enabled sensor.
        # NOTE(review): assumes gdx.read() yields one value per selected
        # sensor; confirm against the gdx documentation.
        self._cols = len(self._sensors) + 1
        print("GoDirect connected.")

    def preprocess(self):
        """Start data collection on the device with the configured period."""
        self._gdx.start(self._period)

    def read(self):
        """Return whatever ``gdx.read`` delivers for the current sample."""
        return self._gdx.read()

    def postprocess(self):
        """Stop data collection and close the device connection."""
        try:
            self._gdx.stop()
        finally:
            # Close the connection even if stopping collection failed.
            self._gdx.close()

if __name__ == "__main__":
    # Demo: record from the Go Direct device for a few seconds and print
    # the collected rows.
    godirect_worker = GoDirectWorker("")
    godirect_worker.start()

    try:
        # Wait for stop
        print("measurement start.")
        record_duration = 3  # sec
        for elapsed in range(1, record_duration + 1):
            time.sleep(1)
            print(elapsed, "/", record_duration, "sec")
        print("measurement end.")
    finally:
        # Closing: always release the device, even if the wait is interrupted.
        godirect_worker.stop()

    # Print only after the worker has stopped so the array is no longer
    # being replaced by the acquisition thread.
    print(godirect_worker._data)