diff --git a/config.py b/config.py index 18aa43a..9de56cc 100644 --- a/config.py +++ b/config.py @@ -4,7 +4,7 @@ DEVICE = "cuda:0" # Colors for different models (R,G,B format) -CONV_COLOR = (0, 255, 0) # Green +CONV_COLOR = (255, 165, 0) # Orange XGBOOST_COLOR = (255, 0, 0) # Red LIGHTGBM_COLOR = (0, 0, 255) # Blue RESNET_COLOR = (255, 165, 0) # Orange @@ -17,10 +17,10 @@ XGBOOST_ENABLED = False LIGHTGBM_ENABLED = False NORMALIZE_ENABLED = False -POSENET_ENABLED = True -RTMPOSE_ENABLED = False -MOBILENETV1SSD_ENABLED = True -YOLOX_ENABLED = False +POSENET_ENABLED = False +RTMPOSE_ENABLED = True +MOBILENETV1SSD_ENABLED = False +YOLOX_ENABLED = True # Neural network model settings RESNET_ENABLED = False diff --git a/main-cnn.py b/main-cnn.py deleted file mode 100644 index abb7445..0000000 --- a/main-cnn.py +++ /dev/null @@ -1,348 +0,0 @@ -import argparse -import csv -import os -import re - -import cv2 -import numpy as np -import pandas as pd -import torch -from dotenv import load_dotenv -from PIL import Image -from torchvision import transforms - -from modules.EARSForDL.EfficientNet import RegressionEfficientNet -from modules.EARSForDL.MobileNetV2 import RegressionMobileNetV2 -from modules.EARSForDL.ResNet import RegressionResNet -from modules.EARSForDL.SqueezeNet import RegressionSqueezeNet - -# RTMPose imports - -# Load environment variables -load_dotenv() - -# Get colors from environment variables -RESNET_COLOR = tuple( - map(int, os.getenv("RESNET_COLOR", "255,165,0").split(",")) -) # Orange for ResNet -EFFICIENTNET_COLOR = tuple( - map(int, os.getenv("EFFICIENTNET_COLOR", "0,0,255").split(",")) -) # Blue for EfficientNet -MOBILENET_COLOR = tuple( - map(int, os.getenv("MOBILENET_COLOR", "255,0,0").split(",")) -) # Red for MobileNet -SQUEEZENET_COLOR = tuple( - map(int, os.getenv("SQUEEZENET_COLOR", "128,0,128").split(",")) -) # Purple for SqueezeNet - -# Get model execution settings from environment variables -RESNET_ENABLED = os.getenv("RESNET_ENABLED", 
"True").lower() == "true" -EFFICIENTNET_ENABLED = os.getenv("EFFICIENTNET_ENABLED", "True").lower() == "true" -MOBILENET_ENABLED = os.getenv("MOBILENET_ENABLED", "True").lower() == "true" -SQUEEZENET_ENABLED = os.getenv("SQUEEZENET_ENABLED", "True").lower() == "true" - -# Get normalization setting -NORMALIZE_ENABLED = os.getenv("NORMALIZE_ENABLED", "False").lower() == "true" - - -def normalize_quadrilateral_with_point(points, extra_point): - all_points = np.vstack([points.reshape(-1, 2), extra_point]) - center = np.mean(points.reshape(-1, 2), axis=0) - centered_points = all_points - center - - shoulder_angle = calculate_rotation_angle(centered_points[0], centered_points[1]) - hip_angle = calculate_rotation_angle(centered_points[2], centered_points[3]) - average_angle = (shoulder_angle + hip_angle) / 2 - - rotation_matrix = np.array( - [ - [np.cos(-average_angle), -np.sin(-average_angle)], - [np.sin(-average_angle), np.cos(-average_angle)], - ] - ) - - rotated_points = np.dot(centered_points, rotation_matrix.T) - max_edge_length = np.max( - np.linalg.norm( - np.roll(rotated_points[:4], -1, axis=0) - rotated_points[:4], axis=1 - ) - ) - return rotated_points / max_edge_length - - -def calculate_rotation_angle(point1, point2): - vector = point2 - point1 - return np.arctan2(vector[1], vector[0]) - - -def video_to_frames(video_path, output_dir): - os.makedirs(output_dir, exist_ok=True) - video = cv2.VideoCapture(video_path) - if not video.isOpened(): - raise IOError(f"Could not open video file: {video_path}") - - frame_num = 0 - while True: - success, frame = video.read() - if not success: - break - frame_num += 1 - cv2.imwrite(os.path.join(output_dir, f"{frame_num}-frame.png"), frame) - - video.release() - print(f"All frames saved to {output_dir}") - - -def preprocess_image(image_path): - transform = transforms.Compose( - [ - transforms.Resize((224, 224)), - transforms.ToTensor(), - transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]), - ] - ) - 
return transform(Image.open(image_path).convert("RGB")).unsqueeze(0) - - -def extract_keypoints_rtmpose(pose_results): - if not pose_results: - print("No pose results found.") - return None - - max_avg_visible = 0 - best_instance = None - for result in pose_results: - pred_instances = result.pred_instances - for instance in pred_instances: - avg_visible = np.mean(instance.keypoints_visible) - if avg_visible > max_avg_visible: - max_avg_visible = avg_visible - best_instance = instance - - if best_instance is None: - print("No valid instances found.") - return None - - keypoints = best_instance.keypoints[0] - return keypoints - - -def process_images(args): - print("Starting process_images function...") - base_dir = os.path.join(args.output_dir, "frames") - results_dir = os.path.join(args.output_dir, "results") - csv_path = os.path.join(results_dir, "results.csv") - - # Load enabled models - device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu") - models = {} - - if RESNET_ENABLED: - resnet_model = RegressionResNet(resnet_depth=18) - resnet_model.load_state_dict( - torch.load("./models/best_model-resnet.pth", map_location=device) - ) - resnet_model.to(device) - resnet_model.eval() - models["resnet"] = resnet_model - - if EFFICIENTNET_ENABLED: - efficientnet_model = RegressionEfficientNet("b1") - efficientnet_model.load_state_dict( - torch.load("./models/best_model-efficient.pth", map_location=device) - ) - efficientnet_model.to(device) - efficientnet_model.eval() - models["efficientnet"] = efficientnet_model - - if MOBILENET_ENABLED: - mobilenet_model = RegressionMobileNetV2() - mobilenet_model.load_state_dict( - torch.load("./models/best_model-mobilenetV2.pth", map_location=device) - ) - mobilenet_model.to(device) - mobilenet_model.eval() - models["mobilenet"] = mobilenet_model - - if SQUEEZENET_ENABLED: - squeezenet_model = RegressionSqueezeNet("1_1") - squeezenet_model.load_state_dict( - torch.load("./models/best_model-squeeze.pth", 
map_location=device) - ) - squeezenet_model.to(device) - squeezenet_model.eval() - models["squeezenet"] = squeezenet_model - - os.makedirs(results_dir, exist_ok=True) - - png_files = sorted( - [f for f in os.listdir(base_dir) if f.lower().endswith(".png")], - key=lambda x: int(re.search(r"(\d+)", x).group(1)), - ) - print(f"Found {len(png_files)} PNG files.") - - rows = [] - for image_file_name in png_files: - print(f"Processing image: {image_file_name}") - image_path = os.path.join(base_dir, image_file_name) - frame = cv2.imread(image_path) - if frame is None: - print(f"Failed to load image: {image_path}") - continue - - # Get predictions from all enabled models - processed_image = preprocess_image(image_path).to(device) - row = {"image_file_name": image_file_name} - - with torch.no_grad(): - for model_name, model in models.items(): - output = model(processed_image) - coords = output[0].cpu().numpy() - row[f"{model_name}_stethoscope_x"] = int(coords[0]) - row[f"{model_name}_stethoscope_y"] = int(coords[1]) - - rows.append(row) - - if rows: - fieldnames = list(rows[0].keys()) - with open(csv_path, "w", newline="") as csvfile: - writer = csv.DictWriter(csvfile, fieldnames=fieldnames) - writer.writeheader() - for row in rows: - writer.writerow(row) - - print(f"Processed and saved results to: {csv_path}") - generate_visualizations(csv_path, base_dir, results_dir) - else: - print("No data to write to CSV.") - - -def generate_visualizations(csv_path, original_images_dir, results_dir): - df = pd.read_csv(csv_path) - body_image = cv2.imread("./images/body/BodyF.png") - - # Define directories and colors for enabled models - dirs = {} - colors = {} - - if RESNET_ENABLED: - dirs["resnet"] = "resnet" - colors["resnet"] = RESNET_COLOR - if EFFICIENTNET_ENABLED: - dirs["efficientnet"] = "efficientnet" - colors["efficientnet"] = EFFICIENTNET_COLOR - if MOBILENET_ENABLED: - dirs["mobilenet"] = "mobilenet" - colors["mobilenet"] = MOBILENET_COLOR - if SQUEEZENET_ENABLED: - 
dirs["squeezenet"] = "squeezenet" - colors["squeezenet"] = SQUEEZENET_COLOR - - # Create output directories - for key in dirs: - os.makedirs( - os.path.join(results_dir, f"{dirs[key]}_with_trajectory"), exist_ok=True - ) - os.makedirs( - os.path.join(results_dir, f"{dirs[key]}_without_trajectory"), exist_ok=True - ) - - points = {key: [] for key in dirs.keys()} - - for _, row in df.iterrows(): - # Process each prediction method - for key in points: - x = int(row[f"{key}_stethoscope_x"]) - y = int(row[f"{key}_stethoscope_y"]) - points[key].append((x, y)) - - # Draw with trajectory - image_with_trajectory = body_image.copy() - if len(points[key]) > 1: - cv2.polylines( - image_with_trajectory, - [np.array(points[key])], - False, - colors[key], - 2, - ) - cv2.circle(image_with_trajectory, (x, y), 10, colors[key], -1) - cv2.imwrite( - os.path.join( - results_dir, f"{dirs[key]}_with_trajectory", row["image_file_name"] - ), - image_with_trajectory, - ) - - # Draw without trajectory - image_without_trajectory = body_image.copy() - cv2.circle(image_without_trajectory, (x, y), 10, colors[key], -1) - cv2.imwrite( - os.path.join( - results_dir, - f"{dirs[key]}_without_trajectory", - row["image_file_name"], - ), - image_without_trajectory, - ) - - # Create videos for all methods - for key in dirs: - create_video_from_images( - os.path.join(results_dir, f"{dirs[key]}_with_trajectory"), - os.path.join(results_dir, f"{key}_video_with_trajectory.mp4"), - ) - create_video_from_images( - os.path.join(results_dir, f"{dirs[key]}_without_trajectory"), - os.path.join(results_dir, f"{key}_video_without_trajectory.mp4"), - ) - - -def create_video_from_images(image_dir, output_path): - images = sorted( - [img for img in os.listdir(image_dir) if img.endswith(".png")], - key=lambda x: int(re.search(r"(\d+)", x).group()), - ) - - if not images: - print(f"No images found in {image_dir}") - return - - frame = cv2.imread(os.path.join(image_dir, images[0])) - height, width, _ = frame.shape - - 
video = cv2.VideoWriter( - output_path, cv2.VideoWriter_fourcc(*"mp4v"), 30, (width, height) - ) - - for image in images: - img = cv2.imread(os.path.join(image_dir, image)) - video.write(img) - - video.release() - print(f"Created video: {output_path}") - - -def main(): - parser = argparse.ArgumentParser(description="Process video and generate results.") - parser.add_argument( - "--video_path", - default="./video/Test3-1.mp4", - help="Path to the input video file", - ) - parser.add_argument( - "--output_dir", - default="output-cnn", - help="Directory to save output images and results", - ) - - args = parser.parse_args() - os.makedirs(args.output_dir, exist_ok=True) - - frames_dir = os.path.join(args.output_dir, "frames") - video_to_frames(args.video_path, frames_dir) - process_images(args) - - -if __name__ == "__main__": - main() diff --git a/stethoscope_analysis.py b/stethoscope_analysis.py new file mode 100644 index 0000000..f3f22ed --- /dev/null +++ b/stethoscope_analysis.py @@ -0,0 +1,264 @@ +import cv2 +import matplotlib.pyplot as plt +import numpy as np +import pandas as pd + + +def calculate_welzl_circle(points: np.ndarray) -> tuple[tuple[float, float], float]: + """ + Welzlのアルゴリズムを使用して最小包含円を計算 + Args: + points: shape=(n, 2) のnumpy配列 + Returns: + ((center_x, center_y), radius) + """ + + def make_circle_3points( + p1: np.ndarray, p2: np.ndarray, p3: np.ndarray + ) -> tuple[np.ndarray, float]: + """3点から円を計算""" + # 3点から円の中心と半径を計算 + temp = p2[0] * p2[0] + p2[1] * p2[1] + bc = (p1[0] * p1[0] + p1[1] * p1[1] - temp) / 2.0 + cd = (temp - p3[0] * p3[0] - p3[1] * p3[1]) / 2.0 + det = (p1[0] - p2[0]) * (p2[1] - p3[1]) - (p2[0] - p3[0]) * (p1[1] - p2[1]) + + if abs(det) < 1e-10: + return None + + center_x = (bc * (p2[1] - p3[1]) - cd * (p1[1] - p2[1])) / det + center_y = ((p1[0] - p2[0]) * cd - (p2[0] - p3[0]) * bc) / det + center = np.array([center_x, center_y]) + radius = np.sqrt(np.sum((p1 - center) ** 2)) + + return center, radius + + def make_circle_2points(p1: 
np.ndarray, p2: np.ndarray) -> tuple[np.ndarray, float]: + """2点から円を計算(直径を使用)""" + center = (p1 + p2) / 2 + radius = np.sqrt(np.sum((p1 - p2) ** 2)) / 2 + return center, radius + + def is_point_in_circle( + point: np.ndarray, center: np.ndarray, radius: float + ) -> bool: + """点が円の中にあるかチェック""" + return np.sum((point - center) ** 2) <= radius * radius * (1 + 1e-10) + + def minimal_enclosing_circle(points: np.ndarray) -> tuple[np.ndarray, float]: + """最小包含円を再帰的に計算""" + if len(points) <= 1: + if len(points) == 1: + return points[0], 0 + return None + + # 2点の場合 + if len(points) == 2: + return make_circle_2points(points[0], points[1]) + + # ランダムに2点を選び、それらを通る円を作成 + for i in range(3): + for j in range(i + 1, len(points)): + center, radius = make_circle_2points(points[i], points[j]) + all_points_in_circle = all( + is_point_in_circle(p, center, radius) for p in points + ) + + if all_points_in_circle: + return center, radius + + # 3点を使用して円を作成 + for i in range(len(points) - 2): + for j in range(i + 1, len(points) - 1): + for k in range(j + 1, len(points)): + result = make_circle_3points(points[i], points[j], points[k]) + if result is None: + continue + center, radius = result + if all(is_point_in_circle(p, center, radius) for p in points): + return center, radius + + raise ValueError("No valid circle found") + + # 最小包含円の計算 + points = points.copy() + np.random.shuffle(points) # ランダム化により計算を効率化 + center, radius = minimal_enclosing_circle(points) + + return tuple(center), radius + + +def calculate_metrics_for_sequence( + points: np.ndarray, +) -> tuple[float, float, float, tuple[float, float]]: + """ + 時系列データの点列から標準偏差と最小包含円の半径を計算 + Args: + points: shape=(n, 2) のnumpy配列 + Returns: + (std_x, std_y, min_circle_radius, (center_x, center_y)) + """ + # 標準偏差の計算 + std_x = np.std(points[:, 0]) + std_y = np.std(points[:, 1]) + + # 最小包含円の計算 + center, radius = calculate_welzl_circle(points) + + return std_x, std_y, radius, center + + +def draw_circles_on_body(body_image: np.ndarray, 
circles_info: dict) -> np.ndarray: + """ + 体の画像上に最小包含円を描画 + Args: + body_image: 体の画像 + circles_info: 各手法の円の情報 {method_name: (center_x, center_y, radius)} + Returns: + 描画された画像 + """ + # 色の定義 + colors = { + "stethoscope": (0, 255, 255), # 黄色 + "conv": (0, 255, 0), # 緑 + "xgboost": (255, 0, 0), # 青 + "lightgbm": (0, 0, 255), # 赤 + } + + result_image = body_image.copy() + + # 各手法の円を描画 + for method, (center_x, center_y, radius) in circles_info.items(): + color = colors.get(method, (255, 255, 255)) # デフォルトは白 + # 円を描画 + cv2.circle(result_image, (int(center_x), int(center_y)), int(radius), color, 2) + # 中心点を描画 + cv2.circle(result_image, (int(center_x), int(center_y)), 3, color, -1) + # 手法名を描画 + cv2.putText( + result_image, + method, + (int(center_x), int(center_y - radius - 10)), + cv2.FONT_HERSHEY_SIMPLEX, + 0.5, + color, + 2, + ) + + return result_image + + +def main(): + # CSVの読み込み + df = pd.read_csv("output/results-rtmpose+yolox/results.csv") + + # 体の画像を読み込み + body_image = cv2.imread("images/body/BodyF.png") + if body_image is None: + raise FileNotFoundError("Body image not found") + + # 各手法のカラム名のペア + method_columns = { + "conv": ["conv_stethoscope_x", "conv_stethoscope_y"], + } + + # XGBoostとLightGBMのカラムが存在する場合は追加 + if "Xgboost_stethoscope_x" in df.columns: + method_columns["xgboost"] = ["Xgboost_stethoscope_x", "Xgboost_stethoscope_y"] + if "lightGBM_stethoscope_x" in df.columns: + method_columns["lightgbm"] = [ + "lightGBM_stethoscope_x", + "lightGBM_stethoscope_y", + ] + + # 結果を格納する辞書 + results = {} + circles_info = {} + + # 各手法について計算 + for method_name, (x_col, y_col) in method_columns.items(): + # 有効な点のみを抽出 (0や欠損値を除外) + valid_points = df[[x_col, y_col]].values + valid_mask = ~( + np.isnan(valid_points).any(axis=1) | (valid_points == 0).all(axis=1) + ) + valid_points = valid_points[valid_mask] + + if len(valid_points) > 0: + std_x, std_y, radius, center = calculate_metrics_for_sequence(valid_points) + results[method_name] = { + "points_count": len(valid_points), + 
"std_x": std_x, + "std_y": std_y, + "min_circle_radius": radius, + } + circles_info[method_name] = (center[0], center[1], radius) + + # 結果をDataFrameに変換 + results_df = pd.DataFrame(results).T + + # 結果の表示 + print("\n=== 手法別分析結果 ===") + print(results_df.round(3)) + + # 結果をCSVに保存 + results_df.to_csv("stethoscope_analysis_results.csv") + + # 体の画像上に円を描画 + result_image = draw_circles_on_body(body_image, circles_info) + + # 結果の画像を保存 + cv2.imwrite("stethoscope_circles_on_body.png", result_image) + + # オリジナルの数値データの可視化も保持 + plt.figure(figsize=(15, 10)) + + # サブプロット1: STDの比較 + plt.subplot(2, 1, 1) + x = np.arange(len(results_df)) + width = 0.35 + plt.bar( + x - width / 2, results_df["std_x"], width, label="STD X", color="red", alpha=0.7 + ) + plt.bar( + x + width / 2, + results_df["std_y"], + width, + label="STD Y", + color="blue", + alpha=0.7, + ) + plt.xticks(x, results_df.index, rotation=45) + plt.title("Standard Deviation by Method") + plt.ylabel("Value (pixels)") + plt.legend() + plt.grid(True) + + # サブプロット2: 最小包含円の半径 + plt.subplot(2, 1, 2) + plt.bar(results_df.index, results_df["min_circle_radius"], color="green", alpha=0.7) + plt.title("Minimum Enclosing Circle Radius by Method") + plt.ylabel("Radius (pixels)") + plt.xticks(rotation=45) + plt.grid(True) + + plt.tight_layout() + plt.savefig("stethoscope_metrics_comparison.png") + plt.close() + + # 詳細な統計情報の出力 + print("\n=== 詳細統計情報 ===") + for method_name in results: + print(f"\n{method_name}:") + circle_info = circles_info[method_name] + print(f"データ点数: {results[method_name]['points_count']}") + print(f"中心座標: ({circle_info[0]:.2f}, {circle_info[1]:.2f})") + print(f"X方向の標準偏差: {results[method_name]['std_x']:.2f} pixels") + print(f"Y方向の標準偏差: {results[method_name]['std_y']:.2f} pixels") + print( + f"最小包含円の半径: {results[method_name]['min_circle_radius']:.2f} pixels" + ) + + +if __name__ == "__main__": + main()