import cv2
import os
import csv
import re
import numpy as np
import pandas as pd
import pickle
import matplotlib.pyplot as plt
import torch
from PIL import Image
import argparse
from torchvision import transforms
from util.ears_ai import EarsAI
from util.calc_ste_position import CalcStethoscopePosition
from modules.EARSForDL.model import RegressionResNet
from dotenv import load_dotenv
# Pull overrides from a local .env file (if one exists) into the environment.
load_dotenv()
# Marker/trajectory colours, one per estimator. Each is read from an
# environment variable holding three comma-separated integers and falls back
# to the literal given here when the variable is unset.
# NOTE(review): these tuples are handed unchanged to OpenCV drawing calls on
# images loaded with cv2.imread, which use BGR channel order - so "255,0,0"
# renders blue and "0,0,255" renders red. Confirm the intended channel order.
CONV_COLOR = tuple(int(channel) for channel in os.getenv("CONV_COLOR", "0,255,0").split(","))
XGBOOST_COLOR = tuple(int(channel) for channel in os.getenv("XGBOOST_COLOR", "255,0,0").split(","))
LIGHTGBM_COLOR = tuple(int(channel) for channel in os.getenv("LIGHTGBM_COLOR", "0,0,255").split(","))
def load_model(model_path, model_type="lgb"):
    """Deserialize a pickled model from disk.

    Args:
        model_path: Path of the pickle file to read.
        model_type: Accepted for call compatibility; this function does not
            use it.

    Returns:
        The object stored in the pickle file.

    Note:
        Unpickling can execute arbitrary code, so only point this at files
        that come from a trusted source.
    """
    with open(model_path, "rb") as handle:
        model = pickle.load(handle)
    return model
def normalize_quadrilateral_with_point(points, extra_point):
    """Express a quadrilateral and one extra point in a normalized frame.

    The corner coordinates are centred on their mean, rotated by the negated
    average of the corner0->corner1 and corner2->corner3 directions, and
    divided by the longest edge of the (first four) rotated corners. The
    extra point goes through the same translation, rotation and scaling.

    Args:
        points: NumPy array of corner coordinates; it is reshaped to rows of
            (x, y). The caller in this file passes shoulders first, then hips.
        extra_point: A single (x, y) point appended after the corners.

    Returns:
        Array whose rows are the transformed corners followed by the
        transformed extra point.
    """
    # NOTE(review): the two angles are averaged arithmetically, so a pair that
    # straddles the +/-pi wraparound averages to roughly 0 instead of pi.
    # Confirm this matches how the consumers of this normalization expect it.
    corners = points.reshape(-1, 2)
    stacked = np.vstack([corners, extra_point])
    centered = stacked - np.mean(corners, axis=0)
    shoulder_angle = calculate_rotation_angle(centered[0], centered[1])
    hip_angle = calculate_rotation_angle(centered[2], centered[3])
    theta = -((shoulder_angle + hip_angle) / 2)
    cos_t, sin_t = np.cos(theta), np.sin(theta)
    rotation = np.array([[cos_t, -sin_t], [sin_t, cos_t]])
    rotated = np.dot(centered, rotation.T)
    quad = rotated[:4]
    # Edge i runs from corner i to corner i+1, wrapping back to corner 0.
    edge_lengths = np.linalg.norm(np.roll(quad, -1, axis=0) - quad, axis=1)
    return rotated / np.max(edge_lengths)
def calculate_rotation_angle(point1, point2):
    """Return the direction of the vector from point1 to point2, in radians.

    Args:
        point1: Start point; must support subtraction and indexing, with x at
            index 0 and y at index 1 (e.g. a NumPy array).
        point2: End point, same layout as ``point1``.

    Returns:
        ``np.arctan2(dy, dx)`` for the difference ``point2 - point1``.
    """
    delta = point2 - point1
    dx, dy = delta[0], delta[1]
    return np.arctan2(dy, dx)
def video_to_frames(video_path, output_dir):
    """Dump every frame of a video as a numbered PNG.

    Frames are written to ``output_dir`` as ``1-frame.png``, ``2-frame.png``,
    ... in the order they are decoded. The directory is created if missing.

    Args:
        video_path: Path of the video file to decode.
        output_dir: Directory that receives the PNG files.

    Raises:
        IOError: If OpenCV cannot open ``video_path``.
    """
    os.makedirs(output_dir, exist_ok=True)
    video = cv2.VideoCapture(video_path)
    try:
        if not video.isOpened():
            raise IOError(f"Could not open video file: {video_path}")
        frame_num = 0
        while True:
            success, frame = video.read()
            if not success:
                break
            frame_num += 1
            frame_path = os.path.join(output_dir, f"{frame_num}-frame.png")
            # cv2.imwrite signals an unsuccessful write by returning False, so
            # surface it instead of silently leaving a gap in the sequence.
            if not cv2.imwrite(frame_path, frame):
                print(f"Failed to write frame: {frame_path}")
    finally:
        # Release the capture even when opening or writing fails.
        video.release()
    print(f"All frames saved to {output_dir}")
def preprocess_image(image_path):
    """Load an image file as a normalized, batched tensor.

    The image is converted to RGB, resized to 224x224, turned into a tensor,
    normalized with the per-channel mean/std listed below, and given a leading
    batch dimension of size 1.

    Args:
        image_path: Path of the image file to load.

    Returns:
        The transformed image with a new dimension inserted at position 0.
    """
    pipeline = transforms.Compose(
        [
            transforms.Resize((224, 224)),
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
        ]
    )
    rgb_image = Image.open(image_path).convert("RGB")
    tensor = pipeline(rgb_image)
    return tensor.unsqueeze(0)
def process_images(base_dir):
    """Run pose/stethoscope detection over extracted frames and export results.

    Every ``.png`` in ``base_dir`` whose name contains a number is processed in
    ascending order of that number. Each frame is passed to
    ``EarsAI.pose_detect`` and ``EarsAI.ssd_detect``; both overlay images are
    saved and the landmark and stethoscope coordinates are collected. Each
    collected row is then extended with the stethoscope position estimated by
    the affine calculation (``conv``) and by the pickled LightGBM and XGBoost
    models (loaded from ``./models`` relative to the working directory),
    written to ``results.csv`` and finally handed to
    ``generate_visualizations``.

    All outputs go to a ``results`` directory under
    ``os.path.dirname(base_dir)``.

    Args:
        base_dir: Directory containing the frame PNGs.
    """
    ears_ai = EarsAI()
    calc_position = CalcStethoscopePosition()
    results_dir = os.path.join(os.path.dirname(base_dir), "results")
    csv_path = os.path.join(results_dir, "results.csv")
    pose_overlay_dir = os.path.join(results_dir, "pose_overlay_image")
    stethoscope_overlay_dir = os.path.join(results_dir, "stethoscope_overlay_image")
    os.makedirs(results_dir, exist_ok=True)
    os.makedirs(pose_overlay_dir, exist_ok=True)
    os.makedirs(stethoscope_overlay_dir, exist_ok=True)

    # Order frames by the first number in the file name. A PNG without any
    # digits cannot be placed in the sequence, so it is skipped with a message
    # instead of crashing the sort.
    numbered_files = []
    for file_name in os.listdir(base_dir):
        if not file_name.lower().endswith(".png"):
            continue
        number_match = re.search(r"(\d+)", file_name)
        if number_match is None:
            print(f"Skipping image without a frame number: {file_name}")
            continue
        numbered_files.append((int(number_match.group(1)), file_name))
    numbered_files.sort(key=lambda item: item[0])
    png_files = [file_name for _, file_name in numbered_files]

    rows = []
    for image_file_name in png_files:
        image_path = os.path.join(base_dir, image_file_name)
        frame = cv2.imread(image_path)
        if frame is None:
            print(f"Failed to load image: {image_path}")
            continue
        pose_overlay_img, *landmarks = ears_ai.pose_detect(frame, None)
        stethoscope_overlay_img, stethoscope_x, stethoscope_y = ears_ai.ssd_detect(frame, None)
        cv2.imwrite(os.path.join(pose_overlay_dir, image_file_name), cv2.cvtColor(pose_overlay_img, cv2.COLOR_RGB2BGR))
        cv2.imwrite(
            os.path.join(stethoscope_overlay_dir, image_file_name),
            cv2.cvtColor(stethoscope_overlay_img, cv2.COLOR_RGB2BGR),
        )
        # Index 1 of each landmark is stored as x and index 0 as y
        # (presumably pose_detect returns (row, col) pairs - TODO confirm).
        rows.append(
            {
                "image_file_name": image_file_name,
                "left_shoulder_x": landmarks[0][1],
                "left_shoulder_y": landmarks[0][0],
                "right_shoulder_x": landmarks[1][1],
                "right_shoulder_y": landmarks[1][0],
                "left_hip_x": landmarks[2][1],
                "left_hip_y": landmarks[2][0],
                "right_hip_x": landmarks[3][1],
                "right_hip_y": landmarks[3][0],
                "stethoscope_x": stethoscope_x,
                "stethoscope_y": stethoscope_y,
            }
        )

    if not rows:
        print("No data to write to CSV.")
        return

    fieldnames = list(rows[0].keys()) + [
        "conv_stethoscope_x",
        "conv_stethoscope_y",
        "Xgboost_stethoscope_x",
        "Xgboost_stethoscope_y",
        "lightGBM_stethoscope_x",
        "lightGBM_stethoscope_y",
    ]
    lgb_model_x = load_model("./models/lgb_stethoscope_calc_x_best_model-Fold4.pkl")
    lgb_model_y = load_model("./models/lgb_stethoscope_calc_y_best_model-Fold4.pkl")
    xg_model_x = load_model("./models/xg_stethoscope_calc_x_best_model-Fold4.pkl")
    xg_model_y = load_model("./models/xg_stethoscope_calc_y_best_model-Fold4.pkl")
    landmark_names = ["left_shoulder", "right_shoulder", "left_hip", "right_hip"]

    with open(csv_path, "w", newline="") as csvfile:
        writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
        writer.writeheader()
        # Estimates carried forward for frames without a stethoscope
        # detection; (180, 180) is used until the first detected frame.
        prev_values = {"conv": (180, 180), "lightGBM": (180, 180), "Xgboost": (180, 180)}
        for row in rows:
            source_points = np.array(
                [[float(row[f"{pos}_x"]), float(row[f"{pos}_y"])] for pos in landmark_names],
                dtype=np.float32,
            )
            stethoscope_point = np.array([float(row["stethoscope_x"]), float(row["stethoscope_y"])])
            if stethoscope_point[0] == 0 and stethoscope_point[1] == 0:
                # (0, 0) is treated as "no detection" (assumed to be what
                # ssd_detect reports in that case - TODO confirm): reuse the
                # previous frame's estimates.
                for key in prev_values:
                    row[f"{key}_stethoscope_x"], row[f"{key}_stethoscope_y"] = prev_values[key]
            else:
                conv_stethoscope = calc_position.calc_affine(source_points, *stethoscope_point)
                row["conv_stethoscope_x"], row["conv_stethoscope_y"] = conv_stethoscope
                normalized_points = normalize_quadrilateral_with_point(source_points.flatten(), stethoscope_point)
                row_convert = {
                    f"{pos}_{coord}": normalized_points[i, j]
                    for i, pos in enumerate(landmark_names + ["stethoscope"])
                    for j, coord in enumerate(["x", "y"])
                }
                # One-row frame whose columns follow row_convert's key order.
                features = pd.DataFrame([row_convert])
                for model_name, models in [
                    ("lightGBM", (lgb_model_x, lgb_model_y)),
                    ("Xgboost", (xg_model_x, xg_model_y)),
                ]:
                    row[f"{model_name}_stethoscope_x"] = int(models[0].predict(features)[0])
                    row[f"{model_name}_stethoscope_y"] = int(models[1].predict(features)[0])
            for key in prev_values:
                prev_values[key] = (row[f"{key}_stethoscope_x"], row[f"{key}_stethoscope_y"])
            writer.writerow(row)

    # The CSV is closed (and therefore flushed) before it is read back.
    print(f"Processed and saved results to: {csv_path}")
    generate_visualizations(csv_path, base_dir)
def generate_visualizations(csv_path, original_images_dir):
    """Render per-frame overlay images and videos from a results CSV.

    For each CSV row the original frame is loaded from
    ``original_images_dir`` and saved with the four landmarks and the detected
    stethoscope position marked. For each estimator (``conv``, ``Xgboost``,
    ``lightGBM``) the estimated position is drawn on a copy of the body
    template ``images/body/BodyF.png`` twice: once with the trajectory of all
    positions so far and once without. Every image set is then assembled into
    a video. All outputs are written under ``images/body/results``.

    Args:
        csv_path: Path of the results CSV to read.
        original_images_dir: Directory holding the frames named in the CSV's
            ``image_file_name`` column.

    Raises:
        IOError: If the body template image cannot be read.
    """
    df = pd.read_csv(csv_path)
    body_image_path = "images/body/BodyF.png"
    body_image = cv2.imread(body_image_path)
    if body_image is None:
        # cv2.imread returns None instead of raising; fail here with a clear
        # message rather than later on body_image.copy().
        raise IOError(f"Could not load body image: {body_image_path}")
    results_dir = "images/body/results"
    os.makedirs(results_dir, exist_ok=True)
    dirs = {"marked": "marked_images", "conv": "conv", "Xgboost": "Xgboost", "lightGBM": "lightGBM"}
    # Create all necessary directories (each entry also gets
    # *_with_trajectory and *_without_trajectory siblings).
    for dir_name in dirs.values():
        os.makedirs(os.path.join(results_dir, dir_name), exist_ok=True)
        os.makedirs(os.path.join(results_dir, f"{dir_name}_with_trajectory"), exist_ok=True)
        os.makedirs(os.path.join(results_dir, f"{dir_name}_without_trajectory"), exist_ok=True)
    points = {key: [] for key in ["conv", "Xgboost", "lightGBM"]}
    colors = {"conv": CONV_COLOR, "Xgboost": XGBOOST_COLOR, "lightGBM": LIGHTGBM_COLOR}
    for _, row in df.iterrows():
        original_path = os.path.join(original_images_dir, row["image_file_name"])
        original_image = cv2.imread(original_path)
        if original_image is None:
            print(f"Failed to load image: {original_path}")
            continue
        # Draw markers on original image
        for landmark in ["left_shoulder", "right_shoulder", "left_hip", "right_hip", "stethoscope"]:
            cv2.circle(
                original_image,
                (int(row[f"{landmark}_x"]), int(row[f"{landmark}_y"])),
                10,
                (255, 255, 0),
                -1,
            )
        cv2.imwrite(os.path.join(results_dir, dirs["marked"], row["image_file_name"]), original_image)
        for key in points:
            x, y = int(row[f"{key}_stethoscope_x"]), int(row[f"{key}_stethoscope_y"])
            points[key].append((x, y))
            # Create image with trajectory
            image_with_trajectory = body_image.copy()
            if len(points[key]) > 1:
                # OpenCV's polygon drawing works on 32-bit integer points, so
                # pin the dtype rather than rely on the platform default.
                trajectory = np.array(points[key], dtype=np.int32)
                cv2.polylines(image_with_trajectory, [trajectory], False, colors[key], 2)
            cv2.circle(image_with_trajectory, (x, y), 10, colors[key], -1)
            cv2.imwrite(
                os.path.join(results_dir, f"{dirs[key]}_with_trajectory", row["image_file_name"]),
                image_with_trajectory,
            )
            # Create image without trajectory
            image_without_trajectory = body_image.copy()
            cv2.circle(image_without_trajectory, (x, y), 10, colors[key], -1)
            cv2.imwrite(
                os.path.join(results_dir, f"{dirs[key]}_without_trajectory", row["image_file_name"]),
                image_without_trajectory,
            )
    # Generate videos
    for key in dirs:
        if key != "marked":
            create_video_from_images(
                os.path.join(results_dir, f"{dirs[key]}_with_trajectory"),
                os.path.join(results_dir, f"{key}_video_with_trajectory.mp4"),
            )
            create_video_from_images(
                os.path.join(results_dir, f"{dirs[key]}_without_trajectory"),
                os.path.join(results_dir, f"{key}_video_without_trajectory.mp4"),
            )
    # Generate video for marked images
    create_video_from_images(os.path.join(results_dir, dirs["marked"]), os.path.join(results_dir, "marked_video.mp4"))
def create_video_from_images(image_dir, output_path):
    """Assemble the numbered PNGs of a directory into a 30 fps ``mp4v`` video.

    Images are ordered by the first number in their file name. The extension
    check is case-insensitive. PNGs without a number in their name and images
    OpenCV cannot read are skipped with a message. The frame size is taken
    from the first readable image.

    Args:
        image_dir: Directory containing the PNG frames.
        output_path: Path of the video file to create.

    Returns:
        None. Nothing is written when the directory holds no usable image.
    """
    numbered = []
    for name in os.listdir(image_dir):
        if not name.lower().endswith(".png"):
            continue
        number_match = re.search(r"(\d+)", name)
        if number_match is None:
            print(f"Skipping image without a frame number: {name}")
            continue
        numbered.append((int(number_match.group()), name))
    numbered.sort(key=lambda item: item[0])
    images = [name for _, name in numbered]
    if not images:
        print(f"No images found in {image_dir}")
        return
    video = None
    try:
        for name in images:
            image_path = os.path.join(image_dir, name)
            frame = cv2.imread(image_path)
            if frame is None:
                # cv2.imread returns None for unreadable files.
                print(f"Failed to load image: {image_path}")
                continue
            if video is None:
                # Open the writer lazily so its size comes from a real frame.
                height, width = frame.shape[:2]
                video = cv2.VideoWriter(output_path, cv2.VideoWriter_fourcc(*"mp4v"), 30, (width, height))
            video.write(frame)
    finally:
        # Finalize the file even if reading or writing a frame raises.
        if video is not None:
            video.release()
    if video is None:
        print(f"No readable images found in {image_dir}")
        return
    print(f"Created video: {output_path}")
def main():
    """Parse the command line, extract the video's frames, then process them."""
    cli = argparse.ArgumentParser(description="Process video and generate results.")
    cli.add_argument("--video_path", default="./video/Test3-1.mp4", help="Path to the input video file")
    cli.add_argument("--output_dir", default="output", help="Directory to save output images and results")
    options = cli.parse_args()
    # The frames written in the first step are the input of the second.
    video_to_frames(options.video_path, options.output_dir)
    process_images(options.output_dir)


if __name__ == "__main__":
    main()