diff --git a/.gitignore b/.gitignore index b72b7f9..1c2a7e9 100644 --- a/.gitignore +++ b/.gitignore @@ -176,4 +176,5 @@ results/ video/ models/ -output/ \ No newline at end of file +output/ +data/ \ No newline at end of file diff --git a/calc_circle.py b/calc_circle.py new file mode 100644 index 0000000..bd527d4 --- /dev/null +++ b/calc_circle.py @@ -0,0 +1,90 @@ +import csv +import math +import random + + +class Point: + def __init__(self, x, y): + self.x = float(x) + self.y = float(y) + + +def read_csv_file(filename): + data = [] + with open(filename, "r") as file: + csv_reader = csv.DictReader(file) + for row in csv_reader: + data.append(row) + return data + + +def dist(a, b): + return math.sqrt((a.x - b.x) ** 2 + (a.y - b.y) ** 2) + + +def circle_center(bx, by, cx, cy): + B = bx * bx + by * by + C = cx * cx + cy * cy + D = bx * cy - by * cx + return Point((cy * B - by * C) / (2 * D), (bx * C - cx * B) / (2 * D)) + + +def is_inside(c, r, p): + return dist(c, p) <= r + + +def get_circle_center(p1, p2, p3=None): + if p3 is None: + return Point((p1.x + p2.x) / 2, (p1.y + p2.y) / 2), dist(p1, p2) / 2 + + center = circle_center(p2.x - p1.x, p2.y - p1.y, p3.x - p1.x, p3.y - p1.y) + center.x += p1.x + center.y += p1.y + return center, dist(center, p1) + + +def min_circle(points): + if len(points) == 0: + return Point(0, 0), 0 + elif len(points) == 1: + return points[0], 0 + + shuffle = random.sample(points, len(points)) + c, r = get_circle_center(shuffle[0], shuffle[1]) + + for i in range(2, len(shuffle)): + if not is_inside(c, r, shuffle[i]): + c, r = get_circle_center(shuffle[0], shuffle[i]) + for j in range(1, i): + if not is_inside(c, r, shuffle[j]): + c, r = get_circle_center(shuffle[j], shuffle[i]) + for k in range(j): + if not is_inside(c, r, shuffle[k]): + c, r = get_circle_center(shuffle[j], shuffle[i], shuffle[k]) + + return c, r + + +def process_data(data, model_name): + points = [ + Point(row[f"{model_name}_stethoscope_x"], 
row[f"{model_name}_stethoscope_y"]) + for row in data + ] + center, radius = min_circle(points) + return radius * 2 # diameter + + +# Main execution +result_data = read_csv_file("output/results/results.csv") +result_convert_data = read_csv_file("output/results/results-convert.csv") + +models = ["conv", "Xgboost", "lightGBM"] + +print("Minimum enclosing circle diameters for each model:") +for model in models: + result_diameter = process_data(result_data, model) + result_convert_diameter = process_data(result_convert_data, model) + print(f"{model}:") + print(f" results.csv: {result_diameter:.2f}") + print(f" results-convert.csv: {result_convert_diameter:.2f}") + print() diff --git a/check_normalized.py b/check_normalized.py new file mode 100644 index 0000000..1f9228f --- /dev/null +++ b/check_normalized.py @@ -0,0 +1,85 @@ +import os + +import matplotlib.pyplot as plt +import pandas as pd + + +def plot_coordinates(df, output_folder): + os.makedirs(output_folder, exist_ok=True) + + for _, row in df.iterrows(): + fig, ax = plt.subplots(figsize=(10, 10)) + + # Plot points + ax.scatter( + row["left_shoulder_x"], + row["left_shoulder_y"], + color="red", + label="Left Shoulder", + ) + ax.scatter( + row["right_shoulder_x"], + row["right_shoulder_y"], + color="blue", + label="Right Shoulder", + ) + ax.scatter( + row["left_hip_x"], row["left_hip_y"], color="green", label="Left Hip" + ) + ax.scatter( + row["right_hip_x"], row["right_hip_y"], color="purple", label="Right Hip" + ) + ax.scatter( + row["stethoscope_x"], + row["stethoscope_y"], + color="orange", + label="Stethoscope", + ) + + # Connect shoulders and hips + ax.plot( + [row["left_shoulder_x"], row["right_shoulder_x"]], + [row["left_shoulder_y"], row["right_shoulder_y"]], + "k-", + ) + ax.plot( + [row["left_hip_x"], row["right_hip_x"]], + [row["left_hip_y"], row["right_hip_y"]], + "k-", + ) + ax.plot( + [row["left_shoulder_x"], row["left_hip_x"]], + [row["left_shoulder_y"], row["left_hip_y"]], + "k-", + ) + ax.plot( + 
[row["right_shoulder_x"], row["right_hip_x"]], + [row["right_shoulder_y"], row["right_hip_y"]], + "k-", + ) + ax.set_aspect("equal") + # Invert y-axis to match image coordinates (origin at top-left) + ax.invert_yaxis() + ax.legend() + ax.set_title(f"Coordinates for {row['image_file_name']}") + + plt.savefig( + os.path.join(output_folder, f"{row['image_file_name'].split('.')[0]}.png") + ) + plt.close() + + +# Read CSV files +result_df = pd.read_csv("output/results/results.csv") +result_convert_df = pd.read_csv("output/results/results-convert.csv") + +# Plot and save results +plot_coordinates(result_df, "result_plot") +plot_coordinates(result_convert_df, "result-convert-plot") + +""" result_df = pd.read_csv("data(卓球玉正規化座標)/01/results-convert.csv") +plot_coordinates(result_df, "data-confirm") """ + +print( + "Plotting completed. Results saved in 'result_plot' and 'result-convert-plot' folders." +) diff --git a/main-cnn.py b/main-cnn.py new file mode 100644 index 0000000..abb7445 --- /dev/null +++ b/main-cnn.py @@ -0,0 +1,348 @@ +import argparse +import csv +import os +import re + +import cv2 +import numpy as np +import pandas as pd +import torch +from dotenv import load_dotenv +from PIL import Image +from torchvision import transforms + +from modules.EARSForDL.EfficientNet import RegressionEfficientNet +from modules.EARSForDL.MobileNetV2 import RegressionMobileNetV2 +from modules.EARSForDL.ResNet import RegressionResNet +from modules.EARSForDL.SqueezeNet import RegressionSqueezeNet + +# RTMPose imports + +# Load environment variables +load_dotenv() + +# Get colors from environment variables +RESNET_COLOR = tuple( + map(int, os.getenv("RESNET_COLOR", "255,165,0").split(",")) +) # Orange for ResNet +EFFICIENTNET_COLOR = tuple( + map(int, os.getenv("EFFICIENTNET_COLOR", "0,0,255").split(",")) +) # Blue for EfficientNet +MOBILENET_COLOR = tuple( + map(int, os.getenv("MOBILENET_COLOR", "255,0,0").split(",")) +) # Red for MobileNet +SQUEEZENET_COLOR = tuple( + map(int, 
os.getenv("SQUEEZENET_COLOR", "128,0,128").split(",")) +) # Purple for SqueezeNet + +# Get model execution settings from environment variables +RESNET_ENABLED = os.getenv("RESNET_ENABLED", "True").lower() == "true" +EFFICIENTNET_ENABLED = os.getenv("EFFICIENTNET_ENABLED", "True").lower() == "true" +MOBILENET_ENABLED = os.getenv("MOBILENET_ENABLED", "True").lower() == "true" +SQUEEZENET_ENABLED = os.getenv("SQUEEZENET_ENABLED", "True").lower() == "true" + +# Get normalization setting +NORMALIZE_ENABLED = os.getenv("NORMALIZE_ENABLED", "False").lower() == "true" + + +def normalize_quadrilateral_with_point(points, extra_point): + all_points = np.vstack([points.reshape(-1, 2), extra_point]) + center = np.mean(points.reshape(-1, 2), axis=0) + centered_points = all_points - center + + shoulder_angle = calculate_rotation_angle(centered_points[0], centered_points[1]) + hip_angle = calculate_rotation_angle(centered_points[2], centered_points[3]) + average_angle = (shoulder_angle + hip_angle) / 2 + + rotation_matrix = np.array( + [ + [np.cos(-average_angle), -np.sin(-average_angle)], + [np.sin(-average_angle), np.cos(-average_angle)], + ] + ) + + rotated_points = np.dot(centered_points, rotation_matrix.T) + max_edge_length = np.max( + np.linalg.norm( + np.roll(rotated_points[:4], -1, axis=0) - rotated_points[:4], axis=1 + ) + ) + return rotated_points / max_edge_length + + +def calculate_rotation_angle(point1, point2): + vector = point2 - point1 + return np.arctan2(vector[1], vector[0]) + + +def video_to_frames(video_path, output_dir): + os.makedirs(output_dir, exist_ok=True) + video = cv2.VideoCapture(video_path) + if not video.isOpened(): + raise IOError(f"Could not open video file: {video_path}") + + frame_num = 0 + while True: + success, frame = video.read() + if not success: + break + frame_num += 1 + cv2.imwrite(os.path.join(output_dir, f"{frame_num}-frame.png"), frame) + + video.release() + print(f"All frames saved to {output_dir}") + + +def 
preprocess_image(image_path): + transform = transforms.Compose( + [ + transforms.Resize((224, 224)), + transforms.ToTensor(), + transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]), + ] + ) + return transform(Image.open(image_path).convert("RGB")).unsqueeze(0) + + +def extract_keypoints_rtmpose(pose_results): + if not pose_results: + print("No pose results found.") + return None + + max_avg_visible = 0 + best_instance = None + for result in pose_results: + pred_instances = result.pred_instances + for instance in pred_instances: + avg_visible = np.mean(instance.keypoints_visible) + if avg_visible > max_avg_visible: + max_avg_visible = avg_visible + best_instance = instance + + if best_instance is None: + print("No valid instances found.") + return None + + keypoints = best_instance.keypoints[0] + return keypoints + + +def process_images(args): + print("Starting process_images function...") + base_dir = os.path.join(args.output_dir, "frames") + results_dir = os.path.join(args.output_dir, "results") + csv_path = os.path.join(results_dir, "results.csv") + + # Load enabled models + device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu") + models = {} + + if RESNET_ENABLED: + resnet_model = RegressionResNet(resnet_depth=18) + resnet_model.load_state_dict( + torch.load("./models/best_model-resnet.pth", map_location=device) + ) + resnet_model.to(device) + resnet_model.eval() + models["resnet"] = resnet_model + + if EFFICIENTNET_ENABLED: + efficientnet_model = RegressionEfficientNet("b1") + efficientnet_model.load_state_dict( + torch.load("./models/best_model-efficient.pth", map_location=device) + ) + efficientnet_model.to(device) + efficientnet_model.eval() + models["efficientnet"] = efficientnet_model + + if MOBILENET_ENABLED: + mobilenet_model = RegressionMobileNetV2() + mobilenet_model.load_state_dict( + torch.load("./models/best_model-mobilenetV2.pth", map_location=device) + ) + mobilenet_model.to(device) + mobilenet_model.eval() 
+ models["mobilenet"] = mobilenet_model + + if SQUEEZENET_ENABLED: + squeezenet_model = RegressionSqueezeNet("1_1") + squeezenet_model.load_state_dict( + torch.load("./models/best_model-squeeze.pth", map_location=device) + ) + squeezenet_model.to(device) + squeezenet_model.eval() + models["squeezenet"] = squeezenet_model + + os.makedirs(results_dir, exist_ok=True) + + png_files = sorted( + [f for f in os.listdir(base_dir) if f.lower().endswith(".png")], + key=lambda x: int(re.search(r"(\d+)", x).group(1)), + ) + print(f"Found {len(png_files)} PNG files.") + + rows = [] + for image_file_name in png_files: + print(f"Processing image: {image_file_name}") + image_path = os.path.join(base_dir, image_file_name) + frame = cv2.imread(image_path) + if frame is None: + print(f"Failed to load image: {image_path}") + continue + + # Get predictions from all enabled models + processed_image = preprocess_image(image_path).to(device) + row = {"image_file_name": image_file_name} + + with torch.no_grad(): + for model_name, model in models.items(): + output = model(processed_image) + coords = output[0].cpu().numpy() + row[f"{model_name}_stethoscope_x"] = int(coords[0]) + row[f"{model_name}_stethoscope_y"] = int(coords[1]) + + rows.append(row) + + if rows: + fieldnames = list(rows[0].keys()) + with open(csv_path, "w", newline="") as csvfile: + writer = csv.DictWriter(csvfile, fieldnames=fieldnames) + writer.writeheader() + for row in rows: + writer.writerow(row) + + print(f"Processed and saved results to: {csv_path}") + generate_visualizations(csv_path, base_dir, results_dir) + else: + print("No data to write to CSV.") + + +def generate_visualizations(csv_path, original_images_dir, results_dir): + df = pd.read_csv(csv_path) + body_image = cv2.imread("./images/body/BodyF.png") + + # Define directories and colors for enabled models + dirs = {} + colors = {} + + if RESNET_ENABLED: + dirs["resnet"] = "resnet" + colors["resnet"] = RESNET_COLOR + if EFFICIENTNET_ENABLED: + 
dirs["efficientnet"] = "efficientnet" + colors["efficientnet"] = EFFICIENTNET_COLOR + if MOBILENET_ENABLED: + dirs["mobilenet"] = "mobilenet" + colors["mobilenet"] = MOBILENET_COLOR + if SQUEEZENET_ENABLED: + dirs["squeezenet"] = "squeezenet" + colors["squeezenet"] = SQUEEZENET_COLOR + + # Create output directories + for key in dirs: + os.makedirs( + os.path.join(results_dir, f"{dirs[key]}_with_trajectory"), exist_ok=True + ) + os.makedirs( + os.path.join(results_dir, f"{dirs[key]}_without_trajectory"), exist_ok=True + ) + + points = {key: [] for key in dirs.keys()} + + for _, row in df.iterrows(): + # Process each prediction method + for key in points: + x = int(row[f"{key}_stethoscope_x"]) + y = int(row[f"{key}_stethoscope_y"]) + points[key].append((x, y)) + + # Draw with trajectory + image_with_trajectory = body_image.copy() + if len(points[key]) > 1: + cv2.polylines( + image_with_trajectory, + [np.array(points[key])], + False, + colors[key], + 2, + ) + cv2.circle(image_with_trajectory, (x, y), 10, colors[key], -1) + cv2.imwrite( + os.path.join( + results_dir, f"{dirs[key]}_with_trajectory", row["image_file_name"] + ), + image_with_trajectory, + ) + + # Draw without trajectory + image_without_trajectory = body_image.copy() + cv2.circle(image_without_trajectory, (x, y), 10, colors[key], -1) + cv2.imwrite( + os.path.join( + results_dir, + f"{dirs[key]}_without_trajectory", + row["image_file_name"], + ), + image_without_trajectory, + ) + + # Create videos for all methods + for key in dirs: + create_video_from_images( + os.path.join(results_dir, f"{dirs[key]}_with_trajectory"), + os.path.join(results_dir, f"{key}_video_with_trajectory.mp4"), + ) + create_video_from_images( + os.path.join(results_dir, f"{dirs[key]}_without_trajectory"), + os.path.join(results_dir, f"{key}_video_without_trajectory.mp4"), + ) + + +def create_video_from_images(image_dir, output_path): + images = sorted( + [img for img in os.listdir(image_dir) if img.endswith(".png")], + key=lambda x: 
int(re.search(r"(\d+)", x).group()), + ) + + if not images: + print(f"No images found in {image_dir}") + return + + frame = cv2.imread(os.path.join(image_dir, images[0])) + height, width, _ = frame.shape + + video = cv2.VideoWriter( + output_path, cv2.VideoWriter_fourcc(*"mp4v"), 30, (width, height) + ) + + for image in images: + img = cv2.imread(os.path.join(image_dir, image)) + video.write(img) + + video.release() + print(f"Created video: {output_path}") + + +def main(): + parser = argparse.ArgumentParser(description="Process video and generate results.") + parser.add_argument( + "--video_path", + default="./video/Test3-1.mp4", + help="Path to the input video file", + ) + parser.add_argument( + "--output_dir", + default="output-cnn", + help="Directory to save output images and results", + ) + + args = parser.parse_args() + os.makedirs(args.output_dir, exist_ok=True) + + frames_dir = os.path.join(args.output_dir, "frames") + video_to_frames(args.video_path, frames_dir) + process_images(args) + + +if __name__ == "__main__": + main() diff --git a/main.py b/main.py index 290202b..016aa33 100644 --- a/main.py +++ b/main.py @@ -1,36 +1,41 @@ -import cv2 -import os +import argparse import csv +import os +import pickle import re + +import cv2 import numpy as np import pandas as pd -import pickle -import matplotlib.pyplot as plt -import torch -from PIL import Image -import argparse -from torchvision import transforms -from util.ears_ai import EarsAI -from util.calc_ste_position import CalcStethoscopePosition -from modules.EARSForDL.model import RegressionResNet from dotenv import load_dotenv +from mmdet.apis import inference_detector, init_detector # New imports for RTMPose from mmpose.apis import inference_topdown from mmpose.apis import init_model as init_pose_estimator from mmpose.evaluation.functional import nms from mmpose.registry import VISUALIZERS -from mmpose.structures import merge_data_samples, split_instances +from mmpose.structures import merge_data_samples 
from mmpose.utils import adapt_mmdet_pipeline -from mmdet.apis import inference_detector, init_detector +from PIL import Image +from torchvision import transforms + +from util.calc_ste_position import CalcStethoscopePosition +from util.ears_ai import EarsAI # Load environment variables load_dotenv() # Get colors from environment variables -CONV_COLOR = tuple(map(int, os.getenv("CONV_COLOR", "0,255,0").split(","))) # Default: Green -XGBOOST_COLOR = tuple(map(int, os.getenv("XGBOOST_COLOR", "255,0,0").split(","))) # Default: Red -LIGHTGBM_COLOR = tuple(map(int, os.getenv("LIGHTGBM_COLOR", "0,0,255").split(","))) # Default: Blue +CONV_COLOR = tuple( + map(int, os.getenv("CONV_COLOR", "0,255,0").split(",")) +) # Default: Green +XGBOOST_COLOR = tuple( + map(int, os.getenv("XGBOOST_COLOR", "255,0,0").split(",")) +) # Default: Red +LIGHTGBM_COLOR = tuple( + map(int, os.getenv("LIGHTGBM_COLOR", "0,0,255").split(",")) +) # Default: Blue # Get model execution settings CONV_ENABLED = os.getenv("CONV_ENABLED", "True").lower() == "true" @@ -58,11 +63,18 @@ average_angle = (shoulder_angle + hip_angle) / 2 rotation_matrix = np.array( - [[np.cos(-average_angle), -np.sin(-average_angle)], [np.sin(-average_angle), np.cos(-average_angle)]] + [ + [np.cos(-average_angle), -np.sin(-average_angle)], + [np.sin(-average_angle), np.cos(-average_angle)], + ] ) rotated_points = np.dot(centered_points, rotation_matrix.T) - max_edge_length = np.max(np.linalg.norm(np.roll(rotated_points[:4], -1, axis=0) - rotated_points[:4], axis=1)) + max_edge_length = np.max( + np.linalg.norm( + np.roll(rotated_points[:4], -1, axis=0) - rotated_points[:4], axis=1 + ) + ) return rotated_points / max_edge_length @@ -164,8 +176,12 @@ elif RTMPOSE_ENABLED: det_result = inference_detector(detector, frame) pred_instance = det_result.pred_instances.cpu().numpy() - bboxes = np.concatenate((pred_instance.bboxes, pred_instance.scores[:, None]), axis=1) - bboxes = bboxes[np.logical_and(pred_instance.labels == 0, 
pred_instance.scores > 0.3)] + bboxes = np.concatenate( + (pred_instance.bboxes, pred_instance.scores[:, None]), axis=1 + ) + bboxes = bboxes[ + np.logical_and(pred_instance.labels == 0, pred_instance.scores > 0.3) + ] bboxes = bboxes[nms(bboxes, 0.3), :4] pose_results = inference_topdown(pose_estimator, frame, bboxes) data_samples = merge_data_samples(pose_results) @@ -173,10 +189,14 @@ if keypoints is None: print(f"Failed to extract keypoints for image: {image_path}") continue - left_shoulder = keypoints[5] + """ left_shoulder = keypoints[5] right_shoulder = keypoints[6] left_hip = keypoints[11] - right_hip = keypoints[12] + right_hip = keypoints[12] テレコ確認""" + left_shoulder = keypoints[6] + right_shoulder = keypoints[5] + left_hip = keypoints[12] + right_hip = keypoints[11] if visualizer is not None: visualizer.add_datasample( "result", @@ -193,11 +213,18 @@ ) pose_overlay_img = visualizer.get_image() else: - print("No pose estimation method enabled. Please enable either PoseNet or RTMPose.") + print( + "No pose estimation method enabled. Please enable either PoseNet or RTMPose." + ) continue - stethoscope_overlay_img, stethoscope_x, stethoscope_y = ears_ai.ssd_detect(frame, None) + stethoscope_overlay_img, stethoscope_x, stethoscope_y = ears_ai.ssd_detect( + frame, None + ) - cv2.imwrite(os.path.join(pose_overlay_dir, image_file_name), cv2.cvtColor(pose_overlay_img, cv2.COLOR_RGB2BGR)) + cv2.imwrite( + os.path.join(pose_overlay_dir, image_file_name), + cv2.cvtColor(pose_overlay_img, cv2.COLOR_RGB2BGR), + ) cv2.imwrite( os.path.join(stethoscope_overlay_dir, image_file_name), cv2.cvtColor(stethoscope_overlay_img, cv2.COLOR_RGB2BGR), @@ -232,7 +259,9 @@ "stethoscope_y": stethoscope_y, } else: - print("No pose estimation method enabled. Please enable either PoseNet or RTMPose.") + print( + "No pose estimation method enabled. Please enable either PoseNet or RTMPose." 
+ ) continue rows.append(row) @@ -244,8 +273,12 @@ ], dtype=np.float32, ) - stethoscope_point = np.array([float(row["stethoscope_x"]), float(row["stethoscope_y"])]) - normalized_points = normalize_quadrilateral_with_point(source_points.flatten(), stethoscope_point) + stethoscope_point = np.array( + [float(row["stethoscope_x"]), float(row["stethoscope_y"])] + ) + normalized_points = normalize_quadrilateral_with_point( + source_points.flatten(), stethoscope_point + ) normalized_row = { "image_file_name": image_file_name, @@ -273,13 +306,23 @@ fieldnames.extend(["lightGBM_stethoscope_x", "lightGBM_stethoscope_y"]) if LIGHTGBM_ENABLED: - lgb_model_x = load_model("./models/lgb_stethoscope_calc_x_best_model-Fold4.pkl") - lgb_model_y = load_model("./models/lgb_stethoscope_calc_y_best_model-Fold4.pkl") + lgb_model_x = load_model( + "./models/lgb_stethoscope_calc_x_best_model-Fold4.pkl" + ) + lgb_model_y = load_model( + "./models/lgb_stethoscope_calc_y_best_model-Fold4.pkl" + ) if XGBOOST_ENABLED: - xg_model_x = load_model("./models/xg_stethoscope_calc_x_best_model-Fold4.pkl") - xg_model_y = load_model("./models/xg_stethoscope_calc_y_best_model-Fold4.pkl") + xg_model_x = load_model( + "./models/xg_stethoscope_calc_x_best_model-Fold4.pkl" + ) + xg_model_y = load_model( + "./models/xg_stethoscope_calc_y_best_model-Fold4.pkl" + ) - with open(csv_path, "w", newline="") as csvfile, open(normalized_csv_path, "w", newline="") as norm_csvfile: + with open(csv_path, "w", newline="") as csvfile, open( + normalized_csv_path, "w", newline="" + ) as norm_csvfile: writer = csv.DictWriter(csvfile, fieldnames=fieldnames) norm_writer = csv.DictWriter(norm_csvfile, fieldnames=fieldnames) writer.writeheader() @@ -297,21 +340,40 @@ source_points = np.array( [ [float(row[f"{pos}_x"]), float(row[f"{pos}_y"])] - for pos in ["left_shoulder", "right_shoulder", "left_hip", "right_hip"] + for pos in [ + "left_shoulder", + "right_shoulder", + "left_hip", + "right_hip", + ] ], dtype=np.float32, ) - 
stethoscope_point = np.array([float(row["stethoscope_x"]), float(row["stethoscope_y"])]) + stethoscope_point = np.array( + [float(row["stethoscope_x"]), float(row["stethoscope_y"])] + ) if stethoscope_point[0] == 0 and stethoscope_point[1] == 0: for key in prev_values: - row[f"{key}_stethoscope_x"], row[f"{key}_stethoscope_y"] = prev_values[key] - norm_row[f"{key}_stethoscope_x"], norm_row[f"{key}_stethoscope_y"] = prev_values[key] + row[f"{key}_stethoscope_x"], row[f"{key}_stethoscope_y"] = ( + prev_values[key] + ) + ( + norm_row[f"{key}_stethoscope_x"], + norm_row[f"{key}_stethoscope_y"], + ) = prev_values[key] else: if CONV_ENABLED: - conv_stethoscope = calc_position.calc_affine(source_points, *stethoscope_point) - row["conv_stethoscope_x"], row["conv_stethoscope_y"] = conv_stethoscope - norm_row["conv_stethoscope_x"], norm_row["conv_stethoscope_y"] = conv_stethoscope + conv_stethoscope = calc_position.calc_affine( + source_points, *stethoscope_point + ) + row["conv_stethoscope_x"], row["conv_stethoscope_y"] = ( + conv_stethoscope + ) + ( + norm_row["conv_stethoscope_x"], + norm_row["conv_stethoscope_y"], + ) = conv_stethoscope if NORMALIZE_ENABLED: input_data = pd.DataFrame([norm_row]) @@ -320,24 +382,45 @@ input_columns = [ f"{pos}_{coord}" - for pos in ["left_shoulder", "right_shoulder", "left_hip", "right_hip", "stethoscope"] + for pos in [ + "left_shoulder", + "right_shoulder", + "left_hip", + "right_hip", + "stethoscope", + ] for coord in ["x", "y"] ] if LIGHTGBM_ENABLED: lgb_x = int(lgb_model_x.predict(input_data[input_columns])[0]) lgb_y = int(lgb_model_y.predict(input_data[input_columns])[0]) - row["lightGBM_stethoscope_x"], row["lightGBM_stethoscope_y"] = lgb_x, lgb_y - norm_row["lightGBM_stethoscope_x"], norm_row["lightGBM_stethoscope_y"] = lgb_x, lgb_y + row["lightGBM_stethoscope_x"], row["lightGBM_stethoscope_y"] = ( + lgb_x, + lgb_y, + ) + ( + norm_row["lightGBM_stethoscope_x"], + norm_row["lightGBM_stethoscope_y"], + ) = lgb_x, lgb_y if 
XGBOOST_ENABLED: xg_x = int(xg_model_x.predict(input_data[input_columns])[0]) xg_y = int(xg_model_y.predict(input_data[input_columns])[0]) - row["Xgboost_stethoscope_x"], row["Xgboost_stethoscope_y"] = xg_x, xg_y - norm_row["Xgboost_stethoscope_x"], norm_row["Xgboost_stethoscope_y"] = xg_x, xg_y + row["Xgboost_stethoscope_x"], row["Xgboost_stethoscope_y"] = ( + xg_x, + xg_y, + ) + ( + norm_row["Xgboost_stethoscope_x"], + norm_row["Xgboost_stethoscope_y"], + ) = xg_x, xg_y for key in prev_values: - prev_values[key] = (row[f"{key}_stethoscope_x"], row[f"{key}_stethoscope_y"]) + prev_values[key] = ( + row[f"{key}_stethoscope_x"], + row[f"{key}_stethoscope_y"], + ) writer.writerow(row) norm_writer.writerow(norm_row) @@ -365,22 +448,46 @@ os.makedirs(os.path.join(results_dir, "marked_images"), exist_ok=True) for key in dirs: if key != "marked": - os.makedirs(os.path.join(results_dir, f"{dirs[key]}_with_trajectory"), exist_ok=True) - os.makedirs(os.path.join(results_dir, f"{dirs[key]}_without_trajectory"), exist_ok=True) + os.makedirs( + os.path.join(results_dir, f"{dirs[key]}_with_trajectory"), exist_ok=True + ) + os.makedirs( + os.path.join(results_dir, f"{dirs[key]}_without_trajectory"), + exist_ok=True, + ) points = {key: [] for key in dirs.keys() if key != "marked"} colors = {"conv": CONV_COLOR, "Xgboost": XGBOOST_COLOR, "lightGBM": LIGHTGBM_COLOR} for _, row in df.iterrows(): - original_image = cv2.imread(os.path.join(original_images_dir, row["image_file_name"])) + original_image = cv2.imread( + os.path.join(original_images_dir, row["image_file_name"]) + ) if original_image is None: - print(f"Failed to load image: {os.path.join(original_images_dir, row['image_file_name'])}") + print( + f"Failed to load image: {os.path.join(original_images_dir, row['image_file_name'])}" + ) continue - for point in ["left_shoulder", "right_shoulder", "left_hip", "right_hip", "stethoscope"]: - cv2.circle(original_image, (int(row[f"{point}_x"]), int(row[f"{point}_y"])), 10, (255, 255, 
0), -1) + for point in [ + "left_shoulder", + "right_shoulder", + "left_hip", + "right_hip", + "stethoscope", + ]: + cv2.circle( + original_image, + (int(row[f"{point}_x"]), int(row[f"{point}_y"])), + 10, + (255, 255, 0), + -1, + ) - cv2.imwrite(os.path.join(results_dir, "marked_images", row["image_file_name"]), original_image) + cv2.imwrite( + os.path.join(results_dir, "marked_images", row["image_file_name"]), + original_image, + ) for key in points: x, y = int(row[f"{key}_stethoscope_x"]), int(row[f"{key}_stethoscope_y"]) @@ -388,20 +495,36 @@ image_with_trajectory = body_image.copy() if len(points[key]) > 1: - cv2.polylines(image_with_trajectory, [np.array(points[key])], False, colors[key], 2) + cv2.polylines( + image_with_trajectory, + [np.array(points[key])], + False, + colors[key], + 2, + ) cv2.circle(image_with_trajectory, (x, y), 10, colors[key], -1) cv2.imwrite( - os.path.join(results_dir, f"{dirs[key]}_with_trajectory", row["image_file_name"]), image_with_trajectory + os.path.join( + results_dir, f"{dirs[key]}_with_trajectory", row["image_file_name"] + ), + image_with_trajectory, ) image_without_trajectory = body_image.copy() cv2.circle(image_without_trajectory, (x, y), 10, colors[key], -1) cv2.imwrite( - os.path.join(results_dir, f"{dirs[key]}_without_trajectory", row["image_file_name"]), + os.path.join( + results_dir, + f"{dirs[key]}_without_trajectory", + row["image_file_name"], + ), image_without_trajectory, ) - create_video_from_images(os.path.join(results_dir, "marked_images"), os.path.join(results_dir, "marked_video.mp4")) + create_video_from_images( + os.path.join(results_dir, "marked_images"), + os.path.join(results_dir, "marked_video.mp4"), + ) for key in dirs: if key != "marked": @@ -428,7 +551,9 @@ frame = cv2.imread(os.path.join(image_dir, images[0])) height, width, _ = frame.shape - video = cv2.VideoWriter(output_path, cv2.VideoWriter_fourcc(*"mp4v"), 30, (width, height)) + video = cv2.VideoWriter( + output_path, 
cv2.VideoWriter_fourcc(*"mp4v"), 30, (width, height) + ) for image in images: img = cv2.imread(os.path.join(image_dir, image)) @@ -441,12 +566,18 @@ def main(): parser = argparse.ArgumentParser(description="Process video and generate results.") parser = argparse.ArgumentParser(description="Process video and generate results.") - parser.add_argument("--video_path", default="./video/Test3-1.mp4", help="Path to the input video file") - parser.add_argument("--output_dir", default="output", help="Directory to save output images and results") - det_config = "modules/rtmpose/mmdetection_cfg/rtmdet_m_640-8xb32_coco-person.py" - det_checkpoint = ( - "https://download.openmmlab.com/mmpose/v1/projects/rtmpose/rtmdet_m_8xb32-100e_coco-obj365-person-235e8209.pth" + parser.add_argument( + "--video_path", + default="./video/Test3-1.mp4", + help="Path to the input video file", ) + parser.add_argument( + "--output_dir", + default="output", + help="Directory to save output images and results", + ) + det_config = "modules/rtmpose/mmdetection_cfg/rtmdet_m_640-8xb32_coco-person.py" + det_checkpoint = "https://download.openmmlab.com/mmpose/v1/projects/rtmpose/rtmdet_m_8xb32-100e_coco-obj365-person-235e8209.pth" pose_config = "modules/rtmpose/configs/body_2d_keypoint/rtmpose/body8/rtmpose-m_8xb256-420e_body8-256x192.py" pose_checkpoint = "https://download.openmmlab.com/mmpose/v1/projects/rtmposev1/rtmpose-m_simcc-body7_pt-body7_420e-256x192-e48f03d0_20230504.pth" @@ -460,9 +591,13 @@ if RTMPOSE_ENABLED: detector = init_detector(det_config, det_checkpoint, device="cuda:0") detector.cfg = adapt_mmdet_pipeline(detector.cfg) - pose_estimator = init_pose_estimator(pose_config, pose_checkpoint, device="cuda:0") + pose_estimator = init_pose_estimator( + pose_config, pose_checkpoint, device="cuda:0" + ) visualizer = VISUALIZERS.build(pose_estimator.cfg.visualizer) - visualizer.set_dataset_meta(pose_estimator.dataset_meta, skeleton_style="mmpose") + visualizer.set_dataset_meta( + 
pose_estimator.dataset_meta, skeleton_style="mmpose" + ) process_images(args, detector, pose_estimator, visualizer) else:
diff --git a/modules/EARSForDL/EfficientNet.py b/modules/EARSForDL/EfficientNet.py
new file mode 100644
index 0000000..7395171
--- /dev/null
+++ b/modules/EARSForDL/EfficientNet.py
@@ -0,0 +1,72 @@
+"""EfficientNet backbones adapted for two-value regression."""
+
+import torch.nn as nn
+from torchvision.models import (
+    EfficientNet_B0_Weights,
+    EfficientNet_B1_Weights,
+    EfficientNet_B2_Weights,
+    EfficientNet_B3_Weights,
+    EfficientNet_B4_Weights,
+    EfficientNet_B5_Weights,
+    EfficientNet_B6_Weights,
+    EfficientNet_B7_Weights,
+    efficientnet_b0,
+    efficientnet_b1,
+    efficientnet_b2,
+    efficientnet_b3,
+    efficientnet_b4,
+    efficientnet_b5,
+    efficientnet_b6,
+    efficientnet_b7,
+)
+
+
+class RegressionEfficientNet(nn.Module):
+    """EfficientNet (b0-b7) with a 2-output regression head.
+
+    The torchvision classifier is replaced by dropout followed by a linear
+    layer with two outputs.
+
+    Args:
+        efficientnet_version: Variant name, one of "b0" through "b7".
+        pretrained: If True (default), load the IMAGENET1K_V1 weights; if
+            False, build the backbone without pretrained weights.
+
+    Raises:
+        ValueError: If ``efficientnet_version`` is not a supported variant.
+    """
+
+    # Variant name -> (torchvision constructor, ImageNet weights).
+    _VARIANTS = {
+        "b0": (efficientnet_b0, EfficientNet_B0_Weights.IMAGENET1K_V1),
+        "b1": (efficientnet_b1, EfficientNet_B1_Weights.IMAGENET1K_V1),
+        "b2": (efficientnet_b2, EfficientNet_B2_Weights.IMAGENET1K_V1),
+        "b3": (efficientnet_b3, EfficientNet_B3_Weights.IMAGENET1K_V1),
+        "b4": (efficientnet_b4, EfficientNet_B4_Weights.IMAGENET1K_V1),
+        "b5": (efficientnet_b5, EfficientNet_B5_Weights.IMAGENET1K_V1),
+        "b6": (efficientnet_b6, EfficientNet_B6_Weights.IMAGENET1K_V1),
+        "b7": (efficientnet_b7, EfficientNet_B7_Weights.IMAGENET1K_V1),
+    }
+
+    def __init__(self, efficientnet_version, pretrained=True):
+        super().__init__()
+
+        try:
+            build, weights = self._VARIANTS[efficientnet_version]
+        except (KeyError, TypeError):
+            # TypeError: an unhashable argument is an invalid version too.
+            raise ValueError(
+                "Invalid EfficientNet version. Choose from 'b0' to 'b7'."
+            ) from None
+        self.model = build(weights=weights if pretrained else None)
+
+        # Replace the ImageNet classifier with a 2-output regression head that
+        # keeps the input width of the original final linear layer.
+        num_features = self.model.classifier[1].in_features
+        self.model.classifier = nn.Sequential(
+            nn.Dropout(p=0.2, inplace=True), nn.Linear(num_features, 2)
+        )
+
+    def forward(self, x):
+        """Run ``x`` through the network and return its output."""
+        return self.model(x)
diff --git a/modules/EARSForDL/MobileNetV2.py b/modules/EARSForDL/MobileNetV2.py
new file mode 100644
index 0000000..f7a9673
--- /dev/null
+++ b/modules/EARSForDL/MobileNetV2.py
@@ -0,0 +1,34 @@
+"""MobileNetV2 backbone adapted for two-value regression."""
+
+import torch.nn as nn
+import torchvision.models as models
+from torchvision.models import MobileNet_V2_Weights
+
+
+class RegressionMobileNetV2(nn.Module):
+    """MobileNetV2 with a 2-output regression head.
+
+    The torchvision classifier is replaced by dropout followed by a linear
+    layer with two outputs.
+
+    Args:
+        pretrained: If True (default), load the IMAGENET1K_V1 weights; if
+            False, build the backbone without pretrained weights.
+    """
+
+    def __init__(self, pretrained=True):
+        super().__init__()
+
+        weights = MobileNet_V2_Weights.IMAGENET1K_V1 if pretrained else None
+        self.model = models.mobilenet_v2(weights=weights)
+
+        # Replace the ImageNet classifier with a 2-output regression head that
+        # keeps the input width of the original final linear layer.
+        num_features = self.model.classifier[1].in_features
+        self.model.classifier = nn.Sequential(
+            nn.Dropout(p=0.2), nn.Linear(num_features, 2)
+        )
+
+    def forward(self, x):
+        """Run ``x`` through the network and return its output."""
+        return self.model(x)
diff --git a/modules/EARSForDL/ResNet.py b/modules/EARSForDL/ResNet.py
new file mode 100644
index 0000000..a2b1214
--- /dev/null
+++ b/modules/EARSForDL/ResNet.py
@@ -0,0 +1,57 @@
+"""ResNet backbones adapted for two-value regression."""
+
+import torch.nn as nn
+import torchvision.models as models
+from torchvision.models import (
+    ResNet18_Weights,
+    ResNet34_Weights,
+    ResNet50_Weights,
+    ResNet101_Weights,
+    ResNet152_Weights,
+)
+
+
+class RegressionResNet(nn.Module):
+    """ResNet (18/34/50/101/152) with a 2-output regression head.
+
+    The final fully connected layer is replaced by a linear layer with two
+    outputs.
+
+    Args:
+        resnet_depth: Network depth, one of 18, 34, 50, 101 or 152.
+        pretrained: If True (default), load the IMAGENET1K_V1 weights; if
+            False, build the backbone without pretrained weights.
+
+    Raises:
+        ValueError: If ``resnet_depth`` is not a supported depth.
+    """
+
+    # Depth -> (torchvision constructor, ImageNet weights).
+    _VARIANTS = {
+        18: (models.resnet18, ResNet18_Weights.IMAGENET1K_V1),
+        34: (models.resnet34, ResNet34_Weights.IMAGENET1K_V1),
+        50: (models.resnet50, ResNet50_Weights.IMAGENET1K_V1),
+        101: (models.resnet101, ResNet101_Weights.IMAGENET1K_V1),
+        152: (models.resnet152, ResNet152_Weights.IMAGENET1K_V1),
+    }
+
+    def __init__(self, resnet_depth, pretrained=True):
+        super().__init__()
+
+        try:
+            build, weights = self._VARIANTS[resnet_depth]
+        except (KeyError, TypeError):
+            # TypeError: an unhashable argument is an invalid depth too.
+            raise ValueError(
+                "Invalid ResNet depth. Choose from 18, 34, 50, 101, 152."
+            ) from None
+        self.model = build(weights=weights if pretrained else None)
+
+        # Replace the final fully connected layer with a 2-output linear
+        # layer that keeps the original input width.
+        num_features = self.model.fc.in_features
+        self.model.fc = nn.Linear(num_features, 2)
+
+    def forward(self, x):
+        """Run ``x`` through the network and return its output."""
+        return self.model(x)
diff --git a/modules/EARSForDL/SqueezeNet.py b/modules/EARSForDL/SqueezeNet.py
new file mode 100644
index 0000000..5c519e9
--- /dev/null
+++ b/modules/EARSForDL/SqueezeNet.py
@@ -0,0 +1,60 @@
+"""SqueezeNet backbones adapted for two-value regression."""
+
+import torch
+import torch.nn as nn
+import torchvision.models as models
+from torchvision.models import SqueezeNet1_0_Weights, SqueezeNet1_1_Weights
+
+
+class RegressionSqueezeNet(nn.Module):
+    """SqueezeNet (1.0 or 1.1) with a 2-channel convolutional regression head.
+
+    Args:
+        version: Variant name, "1_0" (default) or "1_1".
+        pretrained: If True (default), load the IMAGENET1K_V1 weights; if
+            False, build the backbone without pretrained weights.
+
+    Raises:
+        ValueError: If ``version`` is not a supported variant.
+    """
+
+    # Variant name -> (torchvision constructor, ImageNet weights).
+    _VARIANTS = {
+        "1_0": (models.squeezenet1_0, SqueezeNet1_0_Weights.IMAGENET1K_V1),
+        "1_1": (models.squeezenet1_1, SqueezeNet1_1_Weights.IMAGENET1K_V1),
+    }
+
+    def __init__(self, version="1_0", pretrained=True):
+        super().__init__()
+
+        try:
+            build, weights = self._VARIANTS[version]
+        except (KeyError, TypeError):
+            # TypeError: an unhashable argument is an invalid version too.
+            raise ValueError(
+                "Invalid SqueezeNet version. Choose from '1_0' or '1_1'."
+            ) from None
+        self.model = build(weights=weights if pretrained else None)
+
+        # Replace the ImageNet classifier with a 1x1 convolution producing two
+        # channels, followed by ReLU and global average pooling.
+        # NOTE(review): the ReLU keeps both outputs non-negative; confirm the
+        # regression targets are never negative.
+        self.model.classifier = nn.Sequential(
+            nn.Dropout(p=0.5),
+            nn.Conv2d(512, 2, kernel_size=1),
+            nn.ReLU(inplace=True),
+            nn.AdaptiveAvgPool2d((1, 1)),
+        )
+
+        # Initialize the new convolution explicitly.
+        for m in self.model.classifier:
+            if isinstance(m, nn.Conv2d):
+                nn.init.kaiming_uniform_(m.weight)
+                if m.bias is not None:
+                    nn.init.constant_(m.bias, 0)
+
+    def forward(self, x):
+        """Run ``x`` through the network and flatten each sample's output."""
+        x = self.model(x)
+        return x.view(x.size(0), -1)
diff --git a/modules/EARSForDL/model.py b/modules/EARSForDL/model.py
deleted file mode 100644
index aee252e..0000000
--- a/modules/EARSForDL/model.py
+++ /dev/null
@@ -1,34 +0,0 @@
-import torch
-import torch.nn as nn
-import torchvision.models as models
-from torchvision.models import (
-    ResNet18_Weights,
-    ResNet34_Weights,
-    ResNet50_Weights,
-    ResNet101_Weights,
-    ResNet152_Weights,
-)
-
-
-class RegressionResNet(nn.Module):
-    def __init__(self, resnet_depth):
-        super(RegressionResNet, self).__init__()
-        if resnet_depth == 18:
-            self.model = models.resnet18(weights=ResNet18_Weights.IMAGENET1K_V1)
-        elif resnet_depth == 34:
-            self.model = models.resnet34(weights=ResNet34_Weights.IMAGENET1K_V1)
-        elif resnet_depth == 50:
-            self.model = models.resnet50(weights=ResNet50_Weights.IMAGENET1K_V1)
-        elif resnet_depth == 101:
-            self.model = models.resnet101(weights=ResNet101_Weights.IMAGENET1K_V1)
-        elif resnet_depth == 152:
-            self.model = models.resnet152(weights=ResNet152_Weights.IMAGENET1K_V1)
-        else:
-            raise ValueError("Invalid ResNet depth. Choose from 18, 34, 50, 101, 152.")
-
-        # Modify the final fully connected layer
-        num_features = self.model.fc.in_features
-        self.model.fc = nn.Linear(num_features, 2)
-
-    def forward(self, x):
-        return self.model(x)