diff --git a/main.py b/main.py index 1bb4624..2ebd93f 100644 --- a/main.py +++ b/main.py @@ -158,7 +158,6 @@ draw.ellipse([left_up, right_down], fill=fill, outline=outline, width=width) -### 変更点 1) カスタム描画用の関数を用意 def draw_custom_circle(draw, center, style): """ style辞書により,丸の描画スタイルを変える。 @@ -180,8 +179,7 @@ x, y = int(center[0]), int(center[1]) outer_radius = radius + outline_width # 外枠込みの半径 - # まず外枠用の円(太めの枠)を描く(fillはなし) - # fill=None なので枠線のみ + # 外枠用の円(太めの枠)を描く pillow_draw_circle( draw, (x, y), @@ -199,16 +197,10 @@ pillow_draw_circle(draw, (x, y), radius, fill=None, outline=color, width=2) elif fill_type == "striped": # 簡易的な縞々を実装 (円内に斜線を引く) - # 円のバウンディングボックスで少しピッチ小さめの線を引く - # fillなし下地に線だけ。 pillow_draw_circle(draw, (x, y), radius, fill=None, outline=color, width=1) - # 縞々を引く - spacing = 4 # 縞の間隔 + spacing = 4 left_up = (x - radius, y - radius) right_down = (x + radius, y + radius) - - # PILで斜線を引き,円の外にはみ出す部分はあえてそのままに(簡易実装) - # 正確に円内のみ線を引く場合はマスクを使うなど工夫が必要 lx, ly = left_up rx, ry = right_down for i in range(int(rx - lx)): @@ -247,7 +239,6 @@ if stethoscope_x is not None and stethoscope_y is not None: x, y = int(stethoscope_x), int(stethoscope_y) - # ここでは簡易的に fill_type='fill' の赤色マーカーを描画 style_temp = { "radius": 8, "fill_type": "fill", @@ -504,9 +495,9 @@ "pipeline_rtmpose_yolox_conv": [], "pipeline_rtmpose_yolox_lightgbm": [], "pipeline_rtmpose_yolox_xgboost": [], - "pipeline_rtmpose_ssd_conv": [], # SSD 用 追加 - "pipeline_rtmpose_ssd_lightgbm": [], # SSD 用 追加 - "pipeline_rtmpose_ssd_xgboost": [], # SSD 用 追加 + "pipeline_rtmpose_ssd_conv": [], + "pipeline_rtmpose_ssd_lightgbm": [], + "pipeline_rtmpose_ssd_xgboost": [], "pipeline_earsnet": [], "pipeline_earsnet_cropped": [], } @@ -524,6 +515,10 @@ xg_scaler_x = load_scaler("./models/XGBoost/scaler-x.pkl") xg_scaler_y = load_scaler("./models/XGBoost/scaler-y.pkl") + # (catboost や NGBoost も使う場合はここでロードする想定) + # if CATBOOST_ENABLED: ... + # if NGBOOST_ENABLED: ... + if EARSNET_ENABLED: earsnet_predictor = EARSNetPredictor( weight_path="models/EARSNet/best_model.pth", @@ -553,7 +548,6 @@ "stethoscope_y", ] - # メインループ for image_file_name in png_files: image_path = os.path.join(base_dir, image_file_name) frame = cv2.imread(image_path) @@ -561,7 +555,7 @@ print(f"Failed to load image: {image_path}") continue - # ============= (A) RTMpose or PoseNet で人体キーポイント ============= + # (A) RTMpose or PoseNet rtmpose_time = 0.0 pose_keypoints = None @@ -626,7 +620,6 @@ timings["rtmpose_single"].append(rtmpose_time) # landmarks = [right_shoulder, left_shoulder, right_hip, left_hip] (例) - # 環境に合わせてインデックスを調整する left_shoulder = (landmarks[0][1], landmarks[0][0]) right_shoulder = (landmarks[1][1], landmarks[1][0]) left_hip = (landmarks[2][1], landmarks[2][0]) @@ -643,28 +636,24 @@ left_hip = (0, 0) right_hip = (0, 0) - # ============= (B) 聴診器検出: YOLOX or SSD ============= + # (B) 聴診器検出: YOLOX or SSD stethoscope_x, stethoscope_y = 0, 0 detection_time = 0.0 - # 1) YOLOX - if YOLOX_ENABLED: - if pose_keypoints is not None: - start_t = time.time() - from_yolox_img, stethoscope_x, stethoscope_y = yolox_detector_inference( - frame, yolox_inferencer, pose_keypoints - ) - end_t = time.time() - detection_time = end_t - start_t - timings["yolox_single"].append(detection_time) + if YOLOX_ENABLED and pose_keypoints is not None: + start_t = time.time() + from_yolox_img, stethoscope_x, stethoscope_y = yolox_detector_inference( + frame, yolox_inferencer, pose_keypoints + ) + end_t = time.time() + detection_time = end_t - start_t + timings["yolox_single"].append(detection_time) - # 可視化 - cv2.imwrite( - os.path.join(stethoscope_overlay_dir, image_file_name), - from_yolox_img, - ) + cv2.imwrite( + os.path.join(stethoscope_overlay_dir, image_file_name), + from_yolox_img, + ) - # 2) SSD (MobileNetV1) elif MOBILENETV1SSD_ENABLED: start_t = time.time() from_ssd_img, stethoscope_x, stethoscope_y = ssd_detector_inference( @@ -674,16 +663,14 @@ detection_time = end_t - start_t timings["ssd_single"].append(detection_time) - # 可視化 cv2.imwrite( os.path.join(stethoscope_overlay_dir, image_file_name), from_ssd_img, ) - # Pipeline で合計検出時間 detection_time_rtmpose_detector = rtmpose_time + detection_time - # ============= (C) EARSNet (単体) ============= + # (C) EARSNet (単体) if EARSNET_ENABLED: start_time_earsnet = time.time() earsnet_x, earsnet_y = earsnet_predictor.predict(image_path) @@ -694,7 +681,7 @@ else: earsnet_x, earsnet_y = 0, 0 - # ============= (D) EARSNet (クロップ) ============= + # (D) EARSNet (クロップ) if EARSNET_CROP_ENABLED: cropped_img, (crop_xmin, crop_ymin) = crop_body_from_keypoints( frame, left_shoulder, right_shoulder, left_hip, right_hip @@ -715,7 +702,7 @@ else: earsnet_cropped_x, earsnet_cropped_y = 0, 0 - # ============= (E) リザルト保存用 row を組み立て ============= + # ============= リザルト保存用 row を組み立て ============= row = { "image_file_name": image_file_name, "left_shoulder_x": left_shoulder[0], @@ -732,15 +719,16 @@ "earsnet_stethoscope_y": earsnet_y, "earsnet_crop_stethoscope_x": earsnet_cropped_x, "earsnet_crop_stethoscope_y": earsnet_cropped_y, + # 後で "conv_stethoscope_x", "lightGBM_stethoscope_x" などを追加 } # ============= (F) 正規化 ============= source_points = np.array( [ - [float(row["left_shoulder_x"]), float(row["left_shoulder_y"])], - [float(row["right_shoulder_x"]), float(row["right_shoulder_y"])], - [float(row["left_hip_x"]), float(row["left_hip_y"])], - [float(row["right_hip_x"]), float(row["right_hip_y"])], + [row["left_shoulder_x"], row["left_shoulder_y"]], + [row["right_shoulder_x"], row["right_shoulder_y"]], + [row["left_hip_x"], row["left_hip_y"]], + [row["right_hip_x"], row["right_hip_y"]], ], dtype=np.float32, ) @@ -784,128 +772,110 @@ normalized_row["earsnet_crop_stethoscope_x"] = norm_earsnet_crop[4, 0] normalized_row["earsnet_crop_stethoscope_y"] = norm_earsnet_crop[4, 1] - # ============= (G) パイプライン (Conv/XGBoost/LightGBM) ============= - # YOLOX vs. SSD で結果保存先のキーが違うのでここで使う変数を決める - # RTMPose + YOLOX or SSD - if RTMPOSE_ENABLED: - # YOLOX - if YOLOX_ENABLED and pose_keypoints is not None: - # conv - if CONV_ENABLED: - start_conv = time.time() - sp = np.array( - [ - [row["left_shoulder_x"], row["left_shoulder_y"]], - [row["right_shoulder_x"], row["right_shoulder_y"]], - [row["left_hip_x"], row["left_hip_y"]], - [row["right_hip_x"], row["right_hip_y"]], - ], - dtype=np.float32, - ) - stp = np.array([row["stethoscope_x"], row["stethoscope_y"]]) - _ = calc_position.calc_affine(sp, *stp) - end_conv = time.time() - conv_time = end_conv - start_conv - timings["conv_single"].append(conv_time) + # (G) パイプライン (Conv / XGBoost / LightGBM など) + # ------------------------------------------------ + # 例: conv + if RTMPOSE_ENABLED and (YOLOX_ENABLED or MOBILENETV1SSD_ENABLED): + # Conv + if CONV_ENABLED: + start_conv = time.time() + sp = np.array( + [ + [row["left_shoulder_x"], row["left_shoulder_y"]], + [row["right_shoulder_x"], row["right_shoulder_y"]], + [row["left_hip_x"], row["left_hip_y"]], + [row["right_hip_x"], row["right_hip_y"]], + ], + dtype=np.float32, + ) + stp = np.array([row["stethoscope_x"], row["stethoscope_y"]]) + # ここで実際に計算 + conv_calc_x, conv_calc_y = calc_position.calc_affine(sp, *stp) + end_conv = time.time() + + # 時間と結果を記録 + row["conv_stethoscope_x"] = conv_calc_x + row["conv_stethoscope_y"] = conv_calc_y + + conv_time = end_conv - start_conv + timings["conv_single"].append(conv_time) + if YOLOX_ENABLED: timings["pipeline_rtmpose_yolox_conv"].append( rtmpose_time + detection_time + conv_time ) - - # XGBoost - if XGBOOST_ENABLED: - xg_start = time.time() - if NORMALIZE_ENABLED: - input_data_xg = pd.DataFrame([normalized_row]) - else: - input_data_xg = pd.DataFrame([row]) - X_scaled_x = xg_scaler_x.transform(input_data_xg[input_columns]) - _ = xg_model_x.predict(X_scaled_x)[0] - X_scaled_y = xg_scaler_y.transform(input_data_xg[input_columns]) - _ = xg_model_y.predict(X_scaled_y)[0] - xg_end = time.time() - xg_time = xg_end - xg_start - timings["xgboost_single"].append(xg_time) - timings["pipeline_rtmpose_yolox_xgboost"].append( - rtmpose_time + detection_time + xg_time - ) - - # LightGBM - if LIGHTGBM_ENABLED: - lgb_start = time.time() - if NORMALIZE_ENABLED: - input_data_lgb = pd.DataFrame([normalized_row]) - else: - input_data_lgb = pd.DataFrame([row]) - X_scaled_x = lgb_scaler_x.transform(input_data_lgb[input_columns]) - _ = lgb_model_x.predict(X_scaled_x)[0] - X_scaled_y = lgb_scaler_y.transform(input_data_lgb[input_columns]) - _ = lgb_model_y.predict(X_scaled_y)[0] - lgb_end = time.time() - lgb_time = lgb_end - lgb_start - timings["lightgbm_single"].append(lgb_time) - timings["pipeline_rtmpose_yolox_lightgbm"].append( - rtmpose_time + detection_time + lgb_time - ) - - # SSD - elif MOBILENETV1SSD_ENABLED: - # conv - if CONV_ENABLED: - start_conv = time.time() - sp = np.array( - [ - [row["left_shoulder_x"], row["left_shoulder_y"]], - [row["right_shoulder_x"], row["right_shoulder_y"]], - [row["left_hip_x"], row["left_hip_y"]], - [row["right_hip_x"], row["right_hip_y"]], - ], - dtype=np.float32, - ) - stp = np.array([row["stethoscope_x"], row["stethoscope_y"]]) - _ = calc_position.calc_affine(sp, *stp) - end_conv = time.time() - conv_time = end_conv - start_conv - timings["conv_single"].append(conv_time) + else: timings["pipeline_rtmpose_ssd_conv"].append( rtmpose_time + detection_time + conv_time ) - # XGBoost - if XGBOOST_ENABLED: - xg_start = time.time() - if NORMALIZE_ENABLED: - input_data_xg = pd.DataFrame([normalized_row]) - else: - input_data_xg = pd.DataFrame([row]) - X_scaled_x = xg_scaler_x.transform(input_data_xg[input_columns]) - _ = xg_model_x.predict(X_scaled_x)[0] - X_scaled_y = xg_scaler_y.transform(input_data_xg[input_columns]) - _ = xg_model_y.predict(X_scaled_y)[0] - xg_end = time.time() - xg_time = xg_end - xg_start - timings["xgboost_single"].append(xg_time) + # XGBoost + if XGBOOST_ENABLED: + xg_start = time.time() + if NORMALIZE_ENABLED: + input_data_xg = pd.DataFrame([normalized_row]) + else: + input_data_xg = pd.DataFrame([row]) + + X_scaled_x = xg_scaler_x.transform(input_data_xg[input_columns]) + xg_calc_x = xg_model_x.predict(X_scaled_x)[0] + X_scaled_y = xg_scaler_y.transform(input_data_xg[input_columns]) + xg_calc_y = xg_model_y.predict(X_scaled_y)[0] + + # 推論結果を row に格納 + row["Xgboost_stethoscope_x"] = xg_calc_x + row["Xgboost_stethoscope_y"] = xg_calc_y + + xg_end = time.time() + xg_time = xg_end - xg_start + timings["xgboost_single"].append(xg_time) + if YOLOX_ENABLED: + timings["pipeline_rtmpose_yolox_xgboost"].append( + rtmpose_time + detection_time + xg_time + ) + else: timings["pipeline_rtmpose_ssd_xgboost"].append( rtmpose_time + detection_time + xg_time ) - # LightGBM - if LIGHTGBM_ENABLED: - lgb_start = time.time() - if NORMALIZE_ENABLED: - input_data_lgb = pd.DataFrame([normalized_row]) - else: - input_data_lgb = pd.DataFrame([row]) - X_scaled_x = lgb_scaler_x.transform(input_data_lgb[input_columns]) - _ = lgb_model_x.predict(X_scaled_x)[0] - X_scaled_y = lgb_scaler_y.transform(input_data_lgb[input_columns]) - _ = lgb_model_y.predict(X_scaled_y)[0] - lgb_end = time.time() - lgb_time = lgb_end - lgb_start - timings["lightgbm_single"].append(lgb_time) + # LightGBM + if LIGHTGBM_ENABLED: + lgb_start = time.time() + if NORMALIZE_ENABLED: + input_data_lgb = pd.DataFrame([normalized_row]) + else: + input_data_lgb = pd.DataFrame([row]) + + X_scaled_x = lgb_scaler_x.transform(input_data_lgb[input_columns]) + lgb_calc_x = lgb_model_x.predict(X_scaled_x)[0] + X_scaled_y = lgb_scaler_y.transform(input_data_lgb[input_columns]) + lgb_calc_y = lgb_model_y.predict(X_scaled_y)[0] + + # 推論結果を row に格納 + row["lightGBM_stethoscope_x"] = lgb_calc_x + row["lightGBM_stethoscope_y"] = lgb_calc_y + + lgb_end = time.time() + lgb_time = lgb_end - lgb_start + timings["lightgbm_single"].append(lgb_time) + if YOLOX_ENABLED: + timings["pipeline_rtmpose_yolox_lightgbm"].append( + rtmpose_time + detection_time + lgb_time + ) + else: timings["pipeline_rtmpose_ssd_lightgbm"].append( rtmpose_time + detection_time + lgb_time ) + # (catboost, NGBoost も同様に実装する想定) + # if CATBOOST_ENABLED: + # ... + # if NGBOOST_ENABLED: + # ... + + # ------------------------------------------------ + # 正規化後の値も同様に xg_calc_x, lgb_calc_x などから計算したい場合は同じく格納 + # ただしここでは簡易的にraw座標のみを格納している。 + rows.append(row) # 正規化後データ normalized_rows.append(normalized_row) @@ -997,6 +967,7 @@ ・聴診器検出だけ描画した `marked_stethoscope_images` も生成&動画化する。 """ + import pandas as pd df = pd.read_csv(csv_path) body_image_path = "./images/body/BodyF.png" @@ -1004,11 +975,9 @@ print(f"Warning: {body_image_path} not found.") return - # Pillow(RGB)で開く body_img_pil = Image.open(body_image_path).convert("RGB") body_np_rgb = np.array(body_img_pil) # RGB順 - # 出力先フォルダ dirs = {"marked": "marked_images"} if CONV_ENABLED: dirs["conv"] = "conv" @@ -1026,7 +995,6 @@ dirs["earsnet_crop"] = "earsnet_crop" dirs["combined"] = "combined" - # 追加フォルダ pose_only_dir = "marked_pose_images" stetho_only_dir = "marked_stethoscope_images" @@ -1044,18 +1012,15 @@ exist_ok=True, ) - # 各手法の描画スタイル設定 (色や半径,塗りつぶしの有無など) - ### 変更点 2) ここで各手法ごとの描画スタイルを指定 + # 手法ごとの描画スタイル marker_styles = { - # 例: conv手法は赤い塗りつぶし "conv": { "radius": 8, - "fill_type": "fill", # 'fill', 'outline', 'striped' - "color": CONV_COLOR, # config.CONV_COLOR を使う + "fill_type": "fill", + "color": CONV_COLOR, "outline_color": CONV_COLOR, "outline_width": 2, }, - # 例: Xgboostは青の枠のみ "Xgboost": { "radius": 8, "fill_type": "outline", @@ -1063,7 +1028,6 @@ "outline_color": XGBOOST_COLOR, "outline_width": 2, }, - # 例: lightGBMは緑の縞々 "lightGBM": { "radius": 8, "fill_type": "striped", @@ -1081,13 +1045,26 @@ "earsnet_crop": { "radius": 8, "fill_type": "outline", - "color": EARSNET_CROP_COLOR, # 例としてピンク色 + "color": EARSNET_CROP_COLOR, "outline_color": EARSNET_CROP_COLOR, "outline_width": 2, }, + "catboost": { + "radius": 8, + "fill_type": "fill", + "color": CATBOOST_COLOR, + "outline_color": CATBOOST_COLOR, + "outline_width": 2, + }, + "ngboost": { + "radius": 8, + "fill_type": "fill", + "color": NGBOOST_COLOR, + "outline_color": NGBOOST_COLOR, + "outline_width": 2, + }, } - # デフォルトスタイル(該当keyが無い場合用) default_style = { "radius": 8, "fill_type": "fill", @@ -1096,28 +1073,19 @@ "outline_width": 2, } - # 各手法の過去フレーム座標(軌跡描画用) points = {key: [] for key in dirs.keys() if key not in ["marked", "combined"]} - # 色固定 pose_color_rgb = (33, 95, 154) stetho_color_rgb = (19, 80, 27) def draw_glow_marker(draw, center, main_color, radius=5): - """ - 旧実装の姿勢や聴診器用マーカー(特大に光彩付き)。 - ここはポーズ・聴診器用(デモ)として残しています。 - """ outer_radius = radius + 3 x, y = int(center[0]), int(center[1]) - # 白枠 pillow_draw_circle( draw, (x, y), outer_radius, fill=None, outline=(255, 255, 255), width=2 ) - # 中心塗りつぶし pillow_draw_circle(draw, (x, y), radius, fill=main_color) - # CSV行ごとに画像を生成 for _, row in df.iterrows(): original_image_path = os.path.join(original_images_dir, row["image_file_name"]) if not os.path.exists(original_image_path): @@ -1126,11 +1094,10 @@ if original_image is None: continue - # 1) marked_images(肩/腰/聴診器) + # (1) marked_images: 肩/腰/聴診器 pil_marked = Image.fromarray(cv2.cvtColor(original_image, cv2.COLOR_BGR2RGB)) draw_marked = ImageDraw.Draw(pil_marked) - # 肩・腰・聴診器を一括で for point in [ "left_shoulder", "right_shoulder", @@ -1147,7 +1114,6 @@ and not pd.isna(row[col_y]) ): x, y = int(row[col_x]), int(row[col_y]) - # ここはシンプルに塗りつぶし丸などにする draw_glow_marker( draw_marked, (x, y), main_color=(255, 255, 0), radius=5 ) @@ -1157,7 +1123,7 @@ marked_dir = os.path.join(results_dir, "marked_images") cv2.imwrite(os.path.join(marked_dir, row["image_file_name"]), marked_bgr) - # 2) marked_pose_images(姿勢のみ) + # (2) marked_pose_images: 肩/腰のみ pil_pose = Image.fromarray(cv2.cvtColor(original_image, cv2.COLOR_BGR2RGB)) draw_pose = ImageDraw.Draw(pil_pose) @@ -1180,7 +1146,7 @@ pose_dir_path = os.path.join(results_dir, pose_only_dir) cv2.imwrite(os.path.join(pose_dir_path, row["image_file_name"]), pose_bgr) - # 3) marked_stethoscope_images(聴診器のみ) + # (3) marked_stethoscope_images: 聴診器のみ pil_stetho = Image.fromarray(cv2.cvtColor(original_image, cv2.COLOR_BGR2RGB)) draw_stetho = ImageDraw.Draw(pil_stetho) @@ -1201,7 +1167,7 @@ stetho_dir_path = os.path.join(results_dir, stetho_only_dir) cv2.imwrite(os.path.join(stetho_dir_path, row["image_file_name"]), stetho_bgr) - # 4) combined系 + # (4) combined系 combined_image_with_traj_rgb = body_np_rgb.copy() combined_image_without_traj_rgb = body_np_rgb.copy() @@ -1210,7 +1176,7 @@ draw_with_traj = ImageDraw.Draw(pil_with_traj) draw_without_traj = ImageDraw.Draw(pil_without_traj) - # 各手法の聴診器推定座標を描画 + # 各手法: conv, Xgboost, lightGBM, earsnet, earsnet_crop, catboost, ngboost etc. for key in points.keys(): col_x = f"{key}_stethoscope_x" col_y = f"{key}_stethoscope_y" @@ -1229,12 +1195,10 @@ pil_indiv_with = Image.fromarray(indiv_with_traj_rgb) draw_indiv_with = ImageDraw.Draw(pil_indiv_with) - # 軌跡 (過去フレーム分) if len(points[key]) > 1: pillow_draw_polyline( draw_indiv_with, points[key], color=style["color"], width=2 ) - # 現フレームのマーカー draw_custom_circle(draw_indiv_with, (x, y), style) indiv_with_traj_np = np.array(pil_indiv_with)