diff --git a/.gitignore b/.gitignore index cfc8fdb..da62c6d 100644 --- a/.gitignore +++ b/.gitignore @@ -174,5 +174,5 @@ pyrightconfig.json # End of https://www.toptal.com/developers/gitignore/api/python -Rec*.csv -Rec*.xlsx +*.csv +*.xlsx diff --git a/measurementGD.py b/measurementGD.py new file mode 100644 index 0000000..0b4ed41 --- /dev/null +++ b/measurementGD.py @@ -0,0 +1,119 @@ +# -*- coding: utf-8 -*- +# GoDirectデバイス データ取得プログラム + +import csv +import datetime +import threading +import time +import winsound +from abc import abstractmethod + +import numpy as np + +from gdx import gdx + + +class DataWorker(threading.Thread): + def __init__(self, out_filename=""): + """初期化 + + Args: + out_filename (str, optional): データ出力のファイル名,空文字指定で出力なし. Defaults to "". + """ + threading.Thread.__init__(self) + self._running = False + self._f = None + self._writer = None + if out_filename: + self._f = open(out_filename, "w", newline="") + self._writer = csv.writer(self._f) + + @abstractmethod + def preprocess(self): + pass + + @abstractmethod + def read(self): + pass + + @abstractmethod + def postprocess(self): + pass + + def run(self): + """データ取得開始""" + self.preprocess() + self._running = True + self._data = np.array([tuple([0] * self._cols)]) + self._start_time = datetime.datetime.now() + self._start = time.perf_counter() + while self._running: + measurements = self.read() + elapse = time.perf_counter() - self._start + current = self._start_time + datetime.timedelta(seconds=elapse) + measurements.insert(0, current.strftime("%H:%M:%S.%f")) + if self._writer: + self._writer.writerow(measurements) + if len(measurements) == self._cols: + self._data = np.vstack([self._data, measurements]) + # print(measurements) + + def stop(self): + """停止""" + self._running = False + self.join(1.0) + self.postprocess() + if self._f: + self._f.close() + self._data = self._data[1:, :] + + +class GoDirectWorker(DataWorker): + def __init__(self, out_filename): + super().__init__(out_filename) + # Connect GoDirect device + self._gdx = gdx.gdx() + self._gdx.open(connection="usb") + info = self._gdx.device_info() + if info is None: + raise "GoDirect device not found" + self._gdx.select_sensors([1]) + self._cols = 2 + print("GoDirect connected.") + + def preprocess(self): + self._gdx.start(50) + if self._writer: + self._writer.writerow(["time", "force"]) + + def read(self): + return self._gdx.read() + + def postprocess(self): + self._gdx.stop() + self._gdx.close() + + +if __name__ == "__main__": + # Initialize devices + filename = datetime.datetime.now().strftime("GD_%Y%m%d_%H%M%S.csv") + godirect_worker = GoDirectWorker(out_filename=filename) + + # Launch worker threads and start measurements + godirect_worker.start() + + # Wait for stop + print("measurement start.") + winsound.Beep(1000, 200) + recode_dulation = 3 # sec + for i in range(recode_dulation): + time.sleep(1) + print(i + 1, "/", recode_dulation, "sec") + # input("Press ENTER to stop") + print("measurement end.") + winsound.Beep(1000, 150) + time.sleep(0.15) + winsound.Beep(1000, 150) + + # Closing + godirect_worker.stop()