import argparse
import csv
import os
import pickle
import re
import time
from threading import Thread
import cv2
import numpy as np
import pandas as pd
from mmdet.apis import DetInferencer, inference_detector, init_detector
# RTMpose
from mmpose.apis import inference_topdown
from mmpose.apis import init_model as init_pose_estimator
from mmpose.evaluation.functional import nms
from mmpose.registry import VISUALIZERS
from mmpose.structures import merge_data_samples
from mmpose.utils import adapt_mmdet_pipeline
# Pillow
from PIL import Image, ImageDraw
import config
# EARSNet
from modules.EARSNet.predictor import EARSNetPredictor
# Utilities
from util.calc_ste_position import CalcStethoscopePosition
from util.ears_ai import EarsAI
###############################################################################
# Config 値を参照
###############################################################################
CONV_COLOR = config.CONV_COLOR
XGBOOST_COLOR = config.XGBOOST_COLOR
LIGHTGBM_COLOR = config.LIGHTGBM_COLOR
EARSNET_COLOR = config.EARSNET_COLOR
CATBOOST_COLOR = config.CATBOOST_COLOR
NGBOOST_COLOR = config.NGBOOST_COLOR
CONV_ENABLED = config.CONV_ENABLED
XGBOOST_ENABLED = config.XGBOOST_ENABLED
LIGHTGBM_ENABLED = config.LIGHTGBM_ENABLED
CATBOOST_ENABLED = config.CATBOOST_ENABLED
NGBOOST_ENABLED = config.NGBOOST_ENABLED
POSENET_ENABLED = config.POSENET_ENABLED
RTMPOSE_ENABLED = config.RTMPOSE_ENABLED
MobileNetV1SSD_ENABLED = config.MOBILENETV1SSD_ENABLED
YOLOX_ENABLED = config.YOLOX_ENABLED
EARSNET_ENABLED = config.EARSNET_ENABLED
EARSNET_CROP_ENABLED = config.EARSNET_CROP_ENABLED
NORMALIZE_ENABLED = config.NORMALIZE_ENABLED
DEVICE = config.DEVICE # "cuda" or "cpu" など
###############################################################################
# リアルタイムFPS計測用のグローバル変数&スレッド定義 (描画時間は含まない)
###############################################################################
processed_frames = 0 # 推論処理が完了したフレーム数(メインスレッドでインクリメント)
stop_fps_thread = False # スレッド終了フラグ
fps_history = []
def fps_monitor(interval=1.0):
"""推論処理完了したフレーム数を定期的に見てFPSを算出する。描画時間は含まない。"""
global processed_frames, stop_fps_thread, fps_history
last_count = 0
last_time = time.time()
while not stop_fps_thread:
time.sleep(interval)
now = time.time()
current_count = processed_frames
frames_delta = current_count - last_count
time_delta = now - last_time
if time_delta > 0:
current_fps = frames_delta / time_delta
else:
current_fps = 0.0
print(
f"[FPS Monitor] Real-time FPS: {current_fps:.2f} (frames: +{frames_delta})"
)
fps_history.append((now, current_fps))
last_count = current_count
last_time = now
###############################################################################
# モデルロード系
###############################################################################
def load_model(model_path, model_type="lgb"):
with open(model_path, "rb") as model_file:
return pickle.load(model_file)
def load_scaler(scaler_path):
with open(scaler_path, "rb") as f:
return pickle.load(f)
###############################################################################
# YOLOX
###############################################################################
def init_yolox():
try:
from mmengine.registry import DefaultScope
DefaultScope.get_instance("mmdet", scope_name="mmdet")
init_args = {
"model": config.YOLOX_CONFIG_FILE,
"weights": config.YOLOX_CHECKPOINT_FILE,
"device": DEVICE,
}
yolox_inferencer = DetInferencer(**init_args)
return yolox_inferencer
except Exception as e:
print(f"Error initializing YOLOX: {str(e)}")
return None
###############################################################################
# Pillow-based drawing helpers
###############################################################################
def pillow_draw_circle(draw, center, radius, fill=None, outline=None, width=1):
x, y = int(center[0]), int(center[1])
left_up = (x - radius, y - radius)
right_down = (x + radius, y + radius)
if fill is not None:
draw.ellipse([left_up, right_down], fill=fill, outline=outline, width=width)
else:
draw.ellipse([left_up, right_down], outline=outline, width=width)
def pillow_draw_polygon(draw, vertices, outline=(0, 255, 0), width=2):
int_vertices = [(int(v[0]), int(v[1])) for v in vertices]
if len(int_vertices) > 1:
for i in range(len(int_vertices)):
j = (i + 1) % len(int_vertices)
draw.line([int_vertices[i], int_vertices[j]], fill=outline, width=width)
def pillow_draw_polyline(draw, points, color=(255, 0, 0), width=2):
if len(points) < 2:
return
int_points = [(int(p[0]), int(p[1])) for p in points]
for i in range(len(int_points) - 1):
draw.line([int_points[i], int_points[i + 1]], fill=color, width=width)
def draw_polygon_and_detection_pillow(
image, polygon_vertices, stethoscope_x, stethoscope_y
):
"""Draw polygon & stethoscope location with Pillow, then return BGR np.array."""
pil_img = Image.fromarray(cv2.cvtColor(image, cv2.COLOR_BGR2RGB))
draw = ImageDraw.Draw(pil_img)
vertices = [(int(v[0]), int(v[1])) for v in polygon_vertices]
pillow_draw_polygon(draw, vertices, outline=(0, 255, 0), width=2)
if stethoscope_x is not None and stethoscope_y is not None:
x, y = int(stethoscope_x), int(stethoscope_y)
pillow_draw_circle(draw, (x, y), 10, fill=(255, 0, 0))
pillow_draw_circle(
draw, (x, y), 12, fill=None, outline=(255, 255, 255), width=2
)
out_img_rgb = np.array(pil_img)
out_img_bgr = cv2.cvtColor(out_img_rgb, cv2.COLOR_RGB2BGR)
return out_img_bgr
def yolox_detector_inference(frame, yolox_inferencer, pose_keypoints, score_thr=0.3):
frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
result = yolox_inferencer(inputs=frame_rgb, return_vis=True)
predictions = result["predictions"][0]
stethoscope_x = None
stethoscope_y = None
max_score = -1
nose = pose_keypoints[0]
left_shoulder = pose_keypoints[5]
right_shoulder = pose_keypoints[6]
left_hip = pose_keypoints[11]
right_hip = pose_keypoints[12]
expanded_left_shoulder, expanded_right_shoulder = expand_points(
left_shoulder, right_shoulder
)
expanded_left_hip, expanded_right_hip = expand_points(left_hip, right_hip)
polygon_vertices = np.array(
[
nose,
expanded_left_shoulder,
expanded_left_hip,
expanded_right_hip,
expanded_right_shoulder,
]
)
for i, (label, score) in enumerate(
zip(predictions["labels"], predictions["scores"])
):
if score >= score_thr and label == 0:
bbox = predictions["bboxes"][i]
center_x = (bbox[0] + bbox[2]) / 2
center_y = (bbox[1] + bbox[3]) / 2
if point_in_polygon([center_x, center_y], polygon_vertices):
if score > max_score:
stethoscope_x = center_x
stethoscope_y = center_y
max_score = score
if stethoscope_x is None or stethoscope_y is None:
stethoscope_x = 0
stethoscope_y = 0
stethoscope_overlay_img = result["visualization"][0]
if (
len(stethoscope_overlay_img.shape) == 3
and stethoscope_overlay_img.shape[2] == 3
):
stethoscope_overlay_img = cv2.cvtColor(
stethoscope_overlay_img, cv2.COLOR_RGB2BGR
)
stethoscope_overlay_img = draw_polygon_and_detection_pillow(
stethoscope_overlay_img, polygon_vertices, stethoscope_x, stethoscope_y
)
return stethoscope_overlay_img, stethoscope_x, stethoscope_y
def expand_points(p1, p2):
mid_x = (p1[0] + p2[0]) / 2
mid_y = (p1[1] + p2[1]) / 2
vec_x = p1[0] - mid_x
vec_y = p1[1] - mid_y
new_p1 = [mid_x + vec_x * 2, mid_y + vec_y * 2]
new_p2 = [mid_x - vec_x * 2, mid_y - vec_y * 2]
return np.array(new_p1), np.array(new_p2)
def point_in_polygon(point, vertices):
x, y = point
n = len(vertices)
inside = False
j = n - 1
for i in range(n):
if (vertices[i][1] > y) != (vertices[j][1] > y):
slope = (vertices[j][0] - vertices[i][0]) / (
vertices[j][1] - vertices[i][1]
)
intersect_x = slope * (y - vertices[i][1]) + vertices[i][0]
if x < intersect_x:
inside = not inside
j = i
return inside
###############################################################################
# 各種座標変換
###############################################################################
def normalize_quadrilateral_with_point(points, extra_point):
all_points = np.vstack([points.reshape(-1, 2), extra_point])
center = np.mean(points.reshape(-1, 2), axis=0)
centered_points = all_points - center
shoulder_angle = calculate_rotation_angle(centered_points[0], centered_points[1])
hip_angle = calculate_rotation_angle(centered_points[2], centered_points[3])
average_angle = (shoulder_angle + hip_angle) / 2
rotation_matrix = np.array(
[
[np.cos(-average_angle), -np.sin(-average_angle)],
[np.sin(-average_angle), np.cos(-average_angle)],
]
)
rotated_points = np.dot(centered_points, rotation_matrix.T)
max_edge_length = np.max(
np.linalg.norm(
np.roll(rotated_points[:4], -1, axis=0) - rotated_points[:4], axis=1
)
)
if max_edge_length == 0:
return rotated_points
return rotated_points / max_edge_length
def calculate_rotation_angle(point1, point2):
vector = point2 - point1
return np.arctan2(vector[1], vector[0])
def video_to_frames(video_path, output_dir):
os.makedirs(output_dir, exist_ok=True)
video = cv2.VideoCapture(video_path)
if not video.isOpened():
raise IOError(f"Could not open video file: {video_path}")
frame_num = 0
while True:
success, frame = video.read()
if not success:
break
frame_num += 1
cv2.imwrite(os.path.join(output_dir, f"{frame_num}-frame.png"), frame)
video.release()
print(f"All frames saved to {output_dir}")
###############################################################################
# RTMpose キーポイント抽出
###############################################################################
def extract_keypoints_rtmpose(pose_results):
if not pose_results:
print("No pose results found.")
return None
max_avg_visible = 0
best_instance = None
for result in pose_results:
pred_instances = result.pred_instances
for instance in pred_instances:
avg_visible = np.mean(instance.keypoints_visible)
if avg_visible > max_avg_visible:
max_avg_visible = avg_visible
best_instance = instance
if best_instance is None:
print("No valid instances found.")
return None
keypoints = best_instance.keypoints[0]
return keypoints
###############################################################################
# 胴体クロップ生成
###############################################################################
def crop_body_from_keypoints(frame, left_shoulder, right_shoulder, left_hip, right_hip):
h, w, _ = frame.shape
xs = [left_shoulder[0], right_shoulder[0], left_hip[0], right_hip[0]]
ys = [left_shoulder[1], right_shoulder[1], left_hip[1], right_hip[1]]
xmin = int(min(xs))
xmax = int(max(xs))
ymin = int(min(ys))
ymax = int(max(ys))
margin = 20
xmin = max(0, xmin - margin)
xmax = min(w, xmax + margin)
ymin = max(0, ymin - margin)
ymax = min(h, ymax + margin)
cropped_frame = frame[ymin:ymax, xmin:xmax].copy()
return cropped_frame, (xmin, ymin)
###############################################################################
# メイン処理 (推論 & 座標計算のみ) -> FPS計測対象
###############################################################################
def process_images(args, detector, pose_estimator, visualizer):
global processed_frames
ears_ai = EarsAI()
calc_position = CalcStethoscopePosition()
base_dir = os.path.join(args.output_dir, "frames")
results_dir = os.path.join(args.output_dir, "results")
csv_path = os.path.join(results_dir, "results.csv")
normalized_csv_path = os.path.join(results_dir, "results-convert.csv")
pose_overlay_dir = os.path.join(results_dir, "pose_overlay_image")
stethoscope_overlay_dir = os.path.join(results_dir, "stethoscope_overlay_image")
cropped_dir = os.path.join(results_dir, "cropped_images")
os.makedirs(results_dir, exist_ok=True)
os.makedirs(pose_overlay_dir, exist_ok=True)
os.makedirs(stethoscope_overlay_dir, exist_ok=True)
os.makedirs(cropped_dir, exist_ok=True)
png_files = sorted(
[f for f in os.listdir(base_dir) if f.lower().endswith(".png")],
key=lambda x: int(re.search(r"(\d+)", x).group(1)),
)
print(f"Found {len(png_files)} PNG files in {base_dir}.")
rows = []
normalized_rows = []
# YOLOX初期化
yolox_inferencer = None
if YOLOX_ENABLED:
yolox_inferencer = init_yolox()
# 時間計測用 dict (描画の時間は含まない)
timings = {
"rtmpose_single": [],
"yolox_single": [],
"conv_single": [],
"lightgbm_single": [],
"xgboost_single": [],
"earsnet_single": [],
"earsnet_cropped_single": [],
"pipeline_rtmpose_yolox_conv": [],
"pipeline_rtmpose_yolox_lightgbm": [],
"pipeline_rtmpose_yolox_xgboost": [],
"pipeline_earsnet": [],
"pipeline_earsnet_cropped": [],
}
# 各モデルの事前ロード
if LIGHTGBM_ENABLED:
lgb_model_x = load_model("./models/LightGBM/stethoscope_calc_x_best_model.pkl")
lgb_model_y = load_model("./models/LightGBM/stethoscope_calc_y_best_model.pkl")
lgb_scaler_x = load_scaler("./models/LightGBM/scaler-x.pkl")
lgb_scaler_y = load_scaler("./models/LightGBM/scaler-y.pkl")
if XGBOOST_ENABLED:
xg_model_x = load_model("./models/XGBoost/stethoscope_calc_x_best_model.pkl")
xg_model_y = load_model("./models/XGBoost/stethoscope_calc_y_best_model.pkl")
xg_scaler_x = load_scaler("./models/XGBoost/scaler-x.pkl")
xg_scaler_y = load_scaler("./models/XGBoost/scaler-y.pkl")
if CATBOOST_ENABLED:
catboost_model_x = load_model(
"./models/CatBoost/stethoscope_calc_x_best_model.pkl"
)
catboost_model_y = load_model(
"./models/CatBoost/stethoscope_calc_y_best_model.pkl"
)
if NGBOOST_ENABLED:
ngboost_model_x = load_model(
"./models/NGBoost/stethoscope_calc_x_best_model.pkl"
)
ngboost_model_y = load_model(
"./models/NGBoost/stethoscope_calc_y_best_model.pkl"
)
if EARSNET_ENABLED:
earsnet_predictor = EARSNetPredictor(
weight_path="models/EARSNet/best_model.pth",
resnet_depth="18",
pretrained=True,
device=DEVICE,
)
if EARSNET_CROP_ENABLED:
earsnet_cropped_predictor = EARSNetPredictor(
weight_path="models/EARSNet/crop/best_model.pth",
resnet_depth="18",
pretrained=True,
device=DEVICE,
)
input_columns = [
"left_shoulder_x",
"left_shoulder_y",
"right_shoulder_x",
"right_shoulder_y",
"left_hip_x",
"left_hip_y",
"right_hip_x",
"right_hip_y",
"stethoscope_x",
"stethoscope_y",
]
# -----------------------------
# メインループ:推論 & 座標計算のみ
# -----------------------------
for image_file_name in png_files:
image_path = os.path.join(base_dir, image_file_name)
frame = cv2.imread(image_path)
if frame is None:
print(f"Failed to load image: {image_path}")
continue
# (A) RTMPose or PoseNet
rtmpose_time = 0.0
if RTMPOSE_ENABLED:
start_time_rtmpose = time.time()
frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
det_result = inference_detector(detector, frame_rgb)
pred_instance = det_result.pred_instances.cpu().numpy()
bboxes = np.concatenate(
(pred_instance.bboxes, pred_instance.scores[:, None]), axis=1
)
# 人物のみ
bboxes = bboxes[
np.logical_and(pred_instance.labels == 0, pred_instance.scores > 0.3)
]
bboxes = bboxes[nms(bboxes, 0.3), :4]
pose_results = inference_topdown(pose_estimator, frame_rgb, bboxes)
data_samples = merge_data_samples(pose_results)
pose_keypoints = extract_keypoints_rtmpose(pose_results)
end_time_rtmpose = time.time()
rtmpose_time = end_time_rtmpose - start_time_rtmpose
timings["rtmpose_single"].append(rtmpose_time)
if pose_keypoints is None:
print(f"Failed to extract keypoints for image: {image_path}")
processed_frames += 1
continue
# PoseOverlay(可視化) → 時間計測には含めない
if visualizer is not None:
visualizer.add_datasample(
"result",
frame_rgb,
data_sample=data_samples,
draw_gt=False,
draw_heatmap=False,
draw_bbox=False,
show_kpt_idx=False,
skeleton_style="mmpose",
show=False,
wait_time=0,
kpt_thr=0.3,
)
pose_overlay_img = visualizer.get_image()
pose_overlay_bgr = cv2.cvtColor(pose_overlay_img, cv2.COLOR_RGB2BGR)
cv2.imwrite(
os.path.join(pose_overlay_dir, image_file_name), pose_overlay_bgr
)
left_shoulder = (pose_keypoints[5][0], pose_keypoints[5][1])
right_shoulder = (pose_keypoints[6][0], pose_keypoints[6][1])
left_hip = (pose_keypoints[11][0], pose_keypoints[11][1])
right_hip = (pose_keypoints[12][0], pose_keypoints[12][1])
elif POSENET_ENABLED:
start_time_rtmpose = time.time()
pose_overlay_img, *landmarks = ears_ai.pose_detect(frame, None)
end_time_rtmpose = time.time()
rtmpose_time = end_time_rtmpose - start_time_rtmpose
timings["rtmpose_single"].append(rtmpose_time)
left_shoulder = landmarks[0]
right_shoulder = landmarks[1]
left_hip = landmarks[2]
right_hip = landmarks[3]
cv2.imwrite(
os.path.join(pose_overlay_dir, image_file_name), pose_overlay_img
)
else:
left_shoulder = (0, 0)
right_shoulder = (0, 0)
left_hip = (0, 0)
right_hip = (0, 0)
# (B) YOLOX
yolox_time = 0.0
stethoscope_x, stethoscope_y = 0, 0
if YOLOX_ENABLED:
if (
RTMPOSE_ENABLED
and "pose_keypoints" in locals()
and pose_keypoints is not None
):
start_time_yolox = time.time()
stethoscope_overlay_img, stethoscope_x, stethoscope_y = (
yolox_detector_inference(frame, yolox_inferencer, pose_keypoints)
)
end_time_yolox = time.time()
yolox_time = end_time_yolox - start_time_yolox
timings["yolox_single"].append(yolox_time)
cv2.imwrite(
os.path.join(stethoscope_overlay_dir, image_file_name),
stethoscope_overlay_img,
)
elif POSENET_ENABLED:
pose_keypoints_pose_net = [[0, 0]] * 13
pose_keypoints_pose_net[5] = (left_shoulder[0], left_shoulder[1])
pose_keypoints_pose_net[6] = (right_shoulder[0], right_shoulder[1])
pose_keypoints_pose_net[11] = (left_hip[0], left_hip[1])
pose_keypoints_pose_net[12] = (right_hip[0], right_hip[1])
start_time_yolox = time.time()
stethoscope_overlay_img, stethoscope_x, stethoscope_y = (
yolox_detector_inference(
frame, yolox_inferencer, pose_keypoints_pose_net
)
)
end_time_yolox = time.time()
yolox_time = end_time_yolox - start_time_yolox
timings["yolox_single"].append(yolox_time)
cv2.imwrite(
os.path.join(stethoscope_overlay_dir, image_file_name),
stethoscope_overlay_img,
)
detection_time_rtmpose_yolox = rtmpose_time + yolox_time
row = {
"image_file_name": image_file_name,
"left_shoulder_x": left_shoulder[0],
"left_shoulder_y": left_shoulder[1],
"right_shoulder_x": right_shoulder[0],
"right_shoulder_y": right_shoulder[1],
"left_hip_x": left_hip[0],
"left_hip_y": left_hip[1],
"right_hip_x": right_hip[0],
"right_hip_y": right_hip[1],
"stethoscope_x": stethoscope_x,
"stethoscope_y": stethoscope_y,
}
# (C) EARSNet (単体)
if EARSNET_ENABLED:
start_time_earsnet = time.time()
earsnet_x, earsnet_y = earsnet_predictor.predict(image_path)
end_time_earsnet = time.time()
earsnet_time = end_time_earsnet - start_time_earsnet
timings["earsnet_single"].append(earsnet_time)
timings["pipeline_earsnet"].append(earsnet_time)
row["earsnet_stethoscope_x"] = earsnet_x
row["earsnet_stethoscope_y"] = earsnet_y
# (D) EARSNet (クロップ)
if EARSNET_CROP_ENABLED:
cropped_img, (crop_xmin, crop_ymin) = crop_body_from_keypoints(
frame, left_shoulder, right_shoulder, left_hip, right_hip
)
cropped_filename = os.path.splitext(image_file_name)[0] + "_cropped.png"
cv2.imwrite(os.path.join(cropped_dir, cropped_filename), cropped_img)
start_time_earsnet_cropped = time.time()
earsnet_cropped_x, earsnet_cropped_y = earsnet_cropped_predictor.predict(
os.path.join(cropped_dir, cropped_filename)
)
end_time_earsnet_cropped = time.time()
earsnet_cropped_time = end_time_earsnet_cropped - start_time_earsnet_cropped
timings["earsnet_cropped_single"].append(earsnet_cropped_time)
pipeline_earsnet_cropped_time = rtmpose_time + earsnet_cropped_time
timings["pipeline_earsnet_cropped"].append(pipeline_earsnet_cropped_time)
row["earsnet_crop_stethoscope_x"] = earsnet_cropped_x
row["earsnet_crop_stethoscope_y"] = earsnet_cropped_y
# (E) 正規化
source_points = np.array(
[
[float(row["left_shoulder_x"]), float(row["left_shoulder_y"])],
[float(row["right_shoulder_x"]), float(row["right_shoulder_y"])],
[float(row["left_hip_x"]), float(row["left_hip_y"])],
[float(row["right_hip_x"]), float(row["right_hip_y"])],
],
dtype=np.float32,
)
stethoscope_point = np.array(
[float(row["stethoscope_x"]), float(row["stethoscope_y"])]
)
normalized_points = normalize_quadrilateral_with_point(
source_points.flatten(), stethoscope_point
)
normalized_row = {
"image_file_name": image_file_name,
"left_shoulder_x": normalized_points[0, 0],
"left_shoulder_y": normalized_points[0, 1],
"right_shoulder_x": normalized_points[1, 0],
"right_shoulder_y": normalized_points[1, 1],
"left_hip_x": normalized_points[2, 0],
"left_hip_y": normalized_points[2, 1],
"right_hip_x": normalized_points[3, 0],
"right_hip_y": normalized_points[3, 1],
"stethoscope_x": normalized_points[4, 0],
"stethoscope_y": normalized_points[4, 1],
}
# --- EARSNet の Normalized
if EARSNET_ENABLED:
stetho_point_earsnet = np.array(
[
float(row.get("earsnet_stethoscope_x", 0)),
float(row.get("earsnet_stethoscope_y", 0)),
]
)
norm_earsnet = normalize_quadrilateral_with_point(
source_points.flatten(), stetho_point_earsnet
)
normalized_row["earsnet_stethoscope_x"] = norm_earsnet[4, 0]
normalized_row["earsnet_stethoscope_y"] = norm_earsnet[4, 1]
if EARSNET_CROP_ENABLED:
stetho_point_crop = np.array(
[
float(row.get("earsnet_crop_stethoscope_x", 0)),
float(row.get("earsnet_crop_stethoscope_y", 0)),
]
)
norm_earsnet_crop = normalize_quadrilateral_with_point(
source_points.flatten(), stetho_point_crop
)
normalized_row["earsnet_crop_stethoscope_x"] = norm_earsnet_crop[4, 0]
normalized_row["earsnet_crop_stethoscope_y"] = norm_earsnet_crop[4, 1]
# --- Conv (Affine)
if RTMPOSE_ENABLED and YOLOX_ENABLED and CONV_ENABLED:
# すでに start_conv, end_conv は timings計測用のみ
# → ここでピクセル座標を row に書き込む
source_pts = np.array(
[
[float(row[f"{pos}_x"]), float(row[f"{pos}_y"])]
for pos in [
"left_shoulder",
"right_shoulder",
"left_hip",
"right_hip",
]
],
dtype=np.float32,
)
stetho_pt = np.array(
[float(row["stethoscope_x"]), float(row["stethoscope_y"])]
)
calc_x, calc_y = calc_position.calc_affine(source_pts, *stetho_pt)
# conv_stethoscope_x/y を row に書き込み
row["conv_stethoscope_x"] = calc_x
row["conv_stethoscope_y"] = calc_y
# --- XGBoost
if RTMPOSE_ENABLED and YOLOX_ENABLED and XGBOOST_ENABLED:
if NORMALIZE_ENABLED:
input_data_xg = pd.DataFrame([normalized_row])
else:
input_data_xg = pd.DataFrame([row])
X_scaled_x = xg_scaler_x.transform(input_data_xg[input_columns])
x_pred = xg_model_x.predict(X_scaled_x)[0]
X_scaled_y = xg_scaler_y.transform(input_data_xg[input_columns])
y_pred = xg_model_y.predict(X_scaled_y)[0]
row["Xgboost_stethoscope_x"] = x_pred
row["Xgboost_stethoscope_y"] = y_pred
# --- LightGBM
if RTMPOSE_ENABLED and YOLOX_ENABLED and LIGHTGBM_ENABLED:
if NORMALIZE_ENABLED:
input_data_lgb = pd.DataFrame([normalized_row])
else:
input_data_lgb = pd.DataFrame([row])
X_scaled_x = lgb_scaler_x.transform(input_data_lgb[input_columns])
lgb_x_pred = lgb_model_x.predict(X_scaled_x)[0]
X_scaled_y = lgb_scaler_y.transform(input_data_lgb[input_columns])
lgb_y_pred = lgb_model_y.predict(X_scaled_y)[0]
row["lightGBM_stethoscope_x"] = lgb_x_pred
row["lightGBM_stethoscope_y"] = lgb_y_pred
# row, normalized_row を最終リストに追加
rows.append(row)
normalized_rows.append(normalized_row)
processed_frames += 1
# (G) CSV書き込み
if rows:
fieldnames = list(rows[0].keys())
csvfile_path = os.path.join(results_dir, "results.csv")
normfile_path = os.path.join(results_dir, "results-convert.csv")
with (
open(csvfile_path, "w", newline="") as csvfile,
open(normfile_path, "w", newline="") as norm_csvfile,
):
writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
writer.writeheader()
norm_fieldnames = list(normalized_rows[0].keys())
norm_writer = csv.DictWriter(norm_csvfile, fieldnames=norm_fieldnames)
norm_writer.writeheader()
for row_, norm_row_ in zip(rows, normalized_rows):
writer.writerow(row_)
norm_writer.writerow(norm_row_)
print(f"Processed and saved results to: {csvfile_path}")
print(f"Processed and saved normalized results to: {normfile_path}")
# ★★★推論結果が出そろった後で描画 (描画時間はFPSに含めない)★★★
generate_visualizations(csvfile_path, base_dir, results_dir)
else:
print("No data to write to CSV.")
# (H) FPS計算結果をCSV保存
fps_data = []
for method_name, time_list in timings.items():
if not time_list:
continue
total_time = sum(time_list)
num_calls = len(time_list)
avg_time = total_time / num_calls if num_calls > 0 else 0
fps = 1.0 / avg_time if avg_time > 0 else 0
fps_data.append(
{
"method_name": method_name,
"num_calls": num_calls,
"total_time_sec": f"{total_time:.6f}",
"avg_time_sec": f"{avg_time:.6f}",
"fps": f"{fps:.2f}",
}
)
fps_csv_path = os.path.join(results_dir, "fps_results.csv")
with open(fps_csv_path, "w", newline="") as f:
writer = csv.DictWriter(
f,
fieldnames=[
"method_name",
"num_calls",
"total_time_sec",
"avg_time_sec",
"fps",
],
)
writer.writeheader()
for rowf in fps_data:
writer.writerow(rowf)
print("\n===== FPS Results (subcomponent & pipeline) =====")
for rowf in fps_data:
print(
f"{rowf['method_name']}: calls={rowf['num_calls']}, "
f"total={rowf['total_time_sec']}s, avg={rowf['avg_time_sec']}s, FPS={rowf['fps']}"
)
###############################################################################
# 可視化・動画化 (描画時間はFPSに含めない)
###############################################################################
def generate_visualizations(csv_path, original_images_dir, results_dir):
"""
CSVを読み込み、BodyF.png上や元フレーム上に各手法の結果を描画 → 動画化。
描画時間はFPSに含めず、ここでまとめて行う。
手法が True になっているものについては、
'_with_trajectory' と '_without_trajectory' の両動画を生成。
"""
df = pd.read_csv(csv_path)
body_image_path = "./images/body/BodyF.png"
if not os.path.exists(body_image_path):
print(f"Warning: {body_image_path} not found.")
return
# Pillow(RGB)で開く
body_img_pil = Image.open(body_image_path).convert("RGB")
body_np_rgb = np.array(body_img_pil) # RGB順
# 結果格納用ディレクトリの準備
dirs = {"marked": "marked_images"}
if CONV_ENABLED:
dirs["conv"] = "conv"
if XGBOOST_ENABLED:
dirs["Xgboost"] = "Xgboost"
if LIGHTGBM_ENABLED:
dirs["lightGBM"] = "lightGBM"
if CATBOOST_ENABLED:
dirs["catboost"] = "catboost"
if NGBOOST_ENABLED:
dirs["ngboost"] = "ngboost"
if EARSNET_ENABLED:
dirs["earsnet"] = "earsnet"
if EARSNET_CROP_ENABLED:
dirs["earsnet_crop"] = "earsnet_crop"
# 常に combined も作成
dirs["combined"] = "combined"
os.makedirs(os.path.join(results_dir, "marked_images"), exist_ok=True)
for key in dirs:
if key != "marked":
os.makedirs(
os.path.join(results_dir, f"{dirs[key]}_with_trajectory"), exist_ok=True
)
os.makedirs(
os.path.join(results_dir, f"{dirs[key]}_without_trajectory"),
exist_ok=True,
)
# ★ポイントを各メソッド別に保持(1動画につき1回リセット)
points = {key: [] for key in dirs.keys() if key not in ["marked", "combined"]}
# 色設定
colors = {
"conv": CONV_COLOR,
"Xgboost": XGBOOST_COLOR,
"lightGBM": LIGHTGBM_COLOR,
"catboost": CATBOOST_COLOR,
"ngboost": NGBOOST_COLOR,
"earsnet": EARSNET_COLOR,
"earsnet_crop": (255, 51, 255),
}
# (1) 各フレームにマーキング + BodyF.pngへ軌跡描画
for _, row in df.iterrows():
original_image_path = os.path.join(original_images_dir, row["image_file_name"])
if not os.path.exists(original_image_path):
continue
original_image = cv2.imread(original_image_path)
if original_image is None:
continue
# ---- (1-A) 各フレームへのマーキング (肩・腰・聴診器)
pil_marked = Image.fromarray(cv2.cvtColor(original_image, cv2.COLOR_BGR2RGB))
draw_marked = ImageDraw.Draw(pil_marked)
for point in [
"left_shoulder",
"right_shoulder",
"left_hip",
"right_hip",
"stethoscope",
]:
col_x = f"{point}_x"
col_y = f"{point}_y"
if (
col_x in row
and col_y in row
and not pd.isna(row[col_x])
and not pd.isna(row[col_y])
):
x, y = int(row[col_x]), int(row[col_y])
pillow_draw_circle(draw_marked, (x, y), 5, fill=(255, 255, 0))
marked_rgb = np.array(pil_marked)
marked_bgr = cv2.cvtColor(marked_rgb, cv2.COLOR_RGB2BGR)
marked_dir = os.path.join(results_dir, "marked_images")
cv2.imwrite(os.path.join(marked_dir, row["image_file_name"]), marked_bgr)
# ---- (1-B) BodyF.png へ各メソッドの軌跡を描画
combined_image_with_traj_rgb = body_np_rgb.copy()
combined_image_without_traj_rgb = body_np_rgb.copy()
pil_with_traj = Image.fromarray(combined_image_with_traj_rgb)
pil_without_traj = Image.fromarray(combined_image_without_traj_rgb)
draw_with_traj = ImageDraw.Draw(pil_with_traj)
draw_without_traj = ImageDraw.Draw(pil_without_traj)
# 各メソッドの推定結果(earsnet, conv, xgboost, lightGBM等)を取得
for key in points.keys():
col_x = f"{key}_stethoscope_x"
col_y = f"{key}_stethoscope_y"
if col_x not in row or col_y not in row:
continue
if pd.isna(row[col_x]) or pd.isna(row[col_y]):
continue
x, y = int(row[col_x]), int(row[col_y])
points[key].append((x, y))
color = colors[key] if key in colors else (0, 0, 255)
# (A) 個別 with trajectory
indiv_with_traj_rgb = body_np_rgb.copy()
pil_indiv_with = Image.fromarray(indiv_with_traj_rgb)
draw_indiv_with = ImageDraw.Draw(pil_indiv_with)
if len(points[key]) > 1:
pillow_draw_polyline(draw_indiv_with, points[key], color=color, width=2)
pillow_draw_circle(draw_indiv_with, (x, y), 10, fill=color)
indiv_with_traj_np = np.array(pil_indiv_with)
indiv_with_traj_bgr = cv2.cvtColor(indiv_with_traj_np, cv2.COLOR_RGB2BGR)
out_path_with = os.path.join(
results_dir, f"{dirs[key]}_with_trajectory", row["image_file_name"]
)
cv2.imwrite(out_path_with, indiv_with_traj_bgr)
# (B) 個別 without trajectory
indiv_without_traj_rgb = body_np_rgb.copy()
pil_indiv_without = Image.fromarray(indiv_without_traj_rgb)
draw_indiv_without = ImageDraw.Draw(pil_indiv_without)
pillow_draw_circle(draw_indiv_without, (x, y), 10, fill=color)
indiv_without_traj_np = np.array(pil_indiv_without)
indiv_without_traj_bgr = cv2.cvtColor(
indiv_without_traj_np, cv2.COLOR_RGB2BGR
)
out_path_without = os.path.join(
results_dir, f"{dirs[key]}_without_trajectory", row["image_file_name"]
)
cv2.imwrite(out_path_without, indiv_without_traj_bgr)
# (C) combined with trajectory
if len(points[key]) > 1:
pillow_draw_polyline(draw_with_traj, points[key], color=color, width=2)
pillow_draw_circle(draw_with_traj, (x, y), 10, fill=color)
# (D) combined without trajectory
pillow_draw_circle(draw_without_traj, (x, y), 10, fill=color)
# 結果 (pil_with_traj / pil_without_traj) を BGR に変換して保存
cwt_np = np.array(pil_with_traj)
cwt_bgr = cv2.cvtColor(cwt_np, cv2.COLOR_RGB2BGR)
cwd = os.path.join(results_dir, "combined_with_trajectory")
os.makedirs(cwd, exist_ok=True)
cv2.imwrite(os.path.join(cwd, row["image_file_name"]), cwt_bgr)
cwo_np = np.array(pil_without_traj)
cwo_bgr = cv2.cvtColor(cwo_np, cv2.COLOR_RGB2BGR)
cwod = os.path.join(results_dir, "combined_without_trajectory")
os.makedirs(cwod, exist_ok=True)
cv2.imwrite(os.path.join(cwod, row["image_file_name"]), cwo_bgr)
# (2) 動画化
create_video_from_images(
os.path.join(results_dir, "marked_images"),
os.path.join(results_dir, "marked_video.mp4"),
)
for key in dirs:
if key not in ["marked", "combined"]:
create_video_from_images(
os.path.join(results_dir, f"{dirs[key]}_with_trajectory"),
os.path.join(results_dir, f"{key}_video_with_trajectory.mp4"),
)
create_video_from_images(
os.path.join(results_dir, f"{dirs[key]}_without_trajectory"),
os.path.join(results_dir, f"{key}_video_without_trajectory.mp4"),
)
def create_video_from_images(image_dir, output_path):
if not os.path.exists(image_dir):
return
images = sorted(
[img for img in os.listdir(image_dir) if img.endswith(".png")],
key=lambda x: int(re.search(r"(\d+)", x).group()),
)
if not images:
print(f"No images found in {image_dir}")
return
frame = cv2.imread(os.path.join(image_dir, images[0]))
if frame is None:
print(f"Failed to read the first image in {image_dir}")
return
height, width, _ = frame.shape
video = cv2.VideoWriter(
output_path, cv2.VideoWriter_fourcc(*"mp4v"), 30, (width, height)
)
for image in images:
img_path = os.path.join(image_dir, image)
img = cv2.imread(img_path)
if img is not None:
video.write(img)
video.release()
print(f"Created video: {output_path}")
###############################################################################
# メイン
###############################################################################
def main():
parser = argparse.ArgumentParser(description="Process video and generate results.")
parser.add_argument(
"--video_path",
default="./video/tes.mp4",
help="Path to the input video file",
)
parser.add_argument(
"--output_dir",
default="output",
help="Directory to save output images and results",
)
# RTMpose用
det_config = "modules/rtmpose/mmdetection_cfg/rtmdet_m_640-8xb32_coco-person.py"
det_checkpoint = (
"models/rtmpose/rtmdet_m_8xb32-100e_coco-obj365-person-235e8209.pth"
)
pose_config = (
"modules/rtmpose/configs/body_2d_keypoint/rtmpose/body8/"
"rtmpose-l_8xb256-420e_body8-256x192.py"
)
pose_checkpoint = "models/rtmpose/rtmpose-l_simcc-aic-coco_pt-aic-coco_420e-256x192-f016ffe0_20230126.pth"
args = parser.parse_args()
os.makedirs(args.output_dir, exist_ok=True)
# (1) FPSモニタースレッド (推論のみ計測)
fps_thread = Thread(target=fps_monitor, args=(1.0,), daemon=True)
fps_thread.start()
# (2) 動画→フレーム
frames_dir = os.path.join(args.output_dir, "frames")
video_to_frames(args.video_path, frames_dir)
# (3) RTMposeの初期化 (必要であれば)
if RTMPOSE_ENABLED:
detector = init_detector(det_config, det_checkpoint, device=DEVICE)
detector.cfg = adapt_mmdet_pipeline(detector.cfg)
pose_estimator = init_pose_estimator(
pose_config, pose_checkpoint, device=DEVICE
)
visualizer = VISUALIZERS.build(pose_estimator.cfg.visualizer)
visualizer.set_dataset_meta(
pose_estimator.dataset_meta, skeleton_style="mmpose"
)
process_images(args, detector, pose_estimator, visualizer)
else:
# PoseNet or no keypoints usage
process_images(args, None, None, None)
# (4) スレッド終了
global stop_fps_thread
stop_fps_thread = True
fps_thread.join()
print("All done.")
if __name__ == "__main__":
main()