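# Demo-Maker / main.py
# @mikado-4410 on 10 Oct 2024, 21 KB, commit message: "Initial commit" (最初のコミット)
# ("Newer" / "Older": repository-viewer navigation text that was captured along with the file.)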
import cv2
import os
import csv
import re
import numpy as np
import pandas as pd
import joblib
import math
import lightgbm
import xgboost
from util.ears_ai import EarsAI
from util.calc_ste_position import CalcStethoscopePosition
import torch
import torch.nn as nn
from torchvision import transforms
from PIL import Image
import argparse
from modules.EARSForDL.model import RegressionResNet  # モデル定義をインポート
import pickle
import matplotlib.pyplot as plt


def normalize_quadrilateral_with_point(points, extra_point):
    """Express four quadrilateral vertices and one extra point in a pose-relative frame.

    The vertices are read from ``points`` as (x, y) pairs and ``extra_point`` is
    appended as a final row. All rows are then

    1. translated so the centroid of the quadrilateral vertices is the origin,
    2. rotated so the vector from vertex 0 to vertex 1 (called the left and
       right shoulder here) points along the positive x axis, and
    3. divided by the longest distance between cyclically consecutive vertices
       (0-1, 1-2, 2-3, 3-0) among the first four rows.

    Args:
        points: NumPy array holding the vertices; it is reshaped to ``(-1, 2)``.
        extra_point: Additional (x, y) point carried through the same transform.

    Returns:
        NumPy array with one transformed (x, y) row per input point, the extra
        point last.
    """
    quad = points.reshape(-1, 2)
    origin = np.mean(quad, axis=0)
    shifted = np.vstack([quad, extra_point]) - origin

    # Tilt of the shoulder line; rotating by its negative levels the shoulders.
    shoulder_line = shifted[1] - shifted[0]
    tilt = np.arctan2(shoulder_line[1], shoulder_line[0])

    cos_t = np.cos(-tilt)
    sin_t = np.sin(-tilt)
    rotation = np.array([[cos_t, -sin_t], [sin_t, cos_t]])
    leveled = np.dot(shifted, rotation.T)

    # Scale reference: longest "edge" when walking the first four rows in order.
    corners = leveled[:4]
    edge_vectors = np.roll(corners, -1, axis=0) - corners
    longest_edge = np.linalg.norm(edge_vectors, axis=1).max()

    return leveled / longest_edge


def normalize_quadrilateral_with_point_average_rotation(points, extra_point):
    """Normalise torso key points and one extra point using the mean shoulder/hip tilt.

    ``points`` must hold exactly four (x, y) vertices in the order left shoulder,
    right shoulder, left hip, right hip. ``extra_point`` is appended as a fifth
    row. All five points are then

    1. translated so the centroid of the four vertices is the origin,
    2. rotated by the negative of the angle halfway between the
       left-to-right shoulder direction and the left-to-right hip direction, and
    3. divided by the longest distance between cyclically consecutive vertices
       (0-1, 1-2, 2-3, 3-0).

    The halfway angle is taken along the shorter arc between the two
    directions. A plain ``(a + b) / 2`` breaks when the two angles sit on
    opposite sides of the +/-pi branch cut of ``arctan2`` (for example +174 and
    -174 degrees average to 0 instead of 180), which flips the result by half a
    turn. For angle pairs that do not straddle the cut the value equals the
    arithmetic mean up to floating-point rounding.

    Args:
        points: NumPy array with the four vertices; reshaped to ``(-1, 2)``.
        extra_point: Additional (x, y) point carried through the same transform.

    Returns:
        NumPy array of shape ``(5, 2)``: the four vertices followed by the extra
        point, in the normalised frame.
    """
    quad = points.reshape(-1, 2)
    center = np.mean(quad, axis=0)
    centered_points = np.vstack([quad, extra_point]) - center

    left_shoulder, right_shoulder, left_hip, right_hip = centered_points[:4]

    shoulder_vector = right_shoulder - left_shoulder
    hip_vector = right_hip - left_hip
    shoulder_angle = np.arctan2(shoulder_vector[1], shoulder_vector[0])
    hip_angle = np.arctan2(hip_vector[1], hip_vector[0])

    # Signed difference wrapped into [-pi, pi) so the midpoint follows the
    # shorter arc instead of jumping across the branch cut.
    angle_difference = (hip_angle - shoulder_angle + np.pi) % (2 * np.pi) - np.pi
    average_angle = shoulder_angle + angle_difference / 2

    cos_a = np.cos(-average_angle)
    sin_a = np.sin(-average_angle)
    rotation_matrix = np.array([[cos_a, -sin_a], [sin_a, cos_a]])

    rotated_points = np.dot(centered_points, rotation_matrix.T)
    corners = rotated_points[:4]
    max_edge_length = np.max(np.linalg.norm(np.roll(corners, -1, axis=0) - corners, axis=1))

    return rotated_points / max_edge_length


def calculate_rotation_angle(point1, point2):
    """Return the angle, in radians, of the vector pointing from point1 to point2.

    The angle is measured with ``np.arctan2`` from the first two components of
    ``point2 - point1``.
    """
    offset = point2 - point1
    dx = offset[0]
    dy = offset[1]
    return np.arctan2(dy, dx)


def video_to_frames(video_path, output_dir):
    """Decode a video and save every frame as a numbered PNG.

    Frames are written to ``output_dir`` as ``1-frame.png``, ``2-frame.png``, ...
    in the order they are read. The directory is created if it does not exist.

    Args:
        video_path: Path of the video file to open with OpenCV.
        output_dir: Directory that receives the PNG frames.

    Raises:
        IOError: If OpenCV cannot open ``video_path``.
    """
    # exist_ok avoids the check-then-create race of a separate os.path.exists() test.
    os.makedirs(output_dir, exist_ok=True)

    video = cv2.VideoCapture(video_path)
    try:
        if not video.isOpened():
            raise IOError(f"動画ファイルを開けませんでした: {video_path}")

        frame_num = 0
        while True:
            success, frame = video.read()
            if not success:
                break

            frame_num += 1
            frame_path = os.path.join(output_dir, f"{frame_num}-frame.png")
            # cv2.imwrite reports failure through its return value, not an exception.
            if not cv2.imwrite(frame_path, frame):
                print(f"Failed to save frame: {frame_path}")
    finally:
        # Release the capture even when opening or writing fails part-way.
        video.release()

    print(f"全てのフレームを {output_dir} に保存しました。")


def lgb_load_model(model_path):
    """Unpickle and return the LightGBM model stored at ``model_path``.

    Unpickling can execute arbitrary code, so only load files from a trusted source.
    """
    with open(model_path, "rb") as handle:
        return pickle.load(handle)


def xg_load_model(model_path):
    """Unpickle and return the XGBoost model stored at ``model_path``.

    Unpickling can execute arbitrary code, so only load files from a trusted source.
    """
    with open(model_path, "rb") as handle:
        serialized = handle.read()
    return pickle.loads(serialized)


def CNN_load_model(model_path, device, resnet_depth=18):
    """Build a ``RegressionResNet`` and restore its weights for inference.

    Args:
        model_path: File passed to ``torch.load`` to obtain the state dict.
        device: Device the weights are mapped to and the model is moved to.
        resnet_depth: Depth argument forwarded to ``RegressionResNet``.

    Returns:
        The model, moved to ``device`` and switched to evaluation mode.
    """
    network = RegressionResNet(resnet_depth)
    weights = torch.load(model_path, map_location=device)
    network.load_state_dict(weights)
    network.to(device)
    network.eval()
    return network


def predict(model, data):
    """Return whatever ``model.predict`` produces for ``data``."""
    prediction = model.predict(data)
    return prediction


def calculate_distance(point1, point2):
    """Return the Euclidean distance between two points, using only indices 0 and 1."""
    delta_x = point1[0] - point2[0]
    delta_y = point1[1] - point2[1]
    squared_length = delta_x ** 2 + delta_y ** 2
    return math.sqrt(squared_length)


def preprocess_image(image_path):
    """Load an image file and convert it into a normalised single-image batch.

    The image is opened, converted to RGB, resized to 224x224, turned into a
    tensor, normalised per channel with the fixed mean/std values below, and
    given a leading batch dimension via ``unsqueeze(0)``.

    Args:
        image_path: Path of the image file to open.

    Returns:
        The transformed image tensor with a leading batch dimension of 1.
    """
    steps = [
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ]
    to_model_input = transforms.Compose(steps)

    rgb_image = Image.open(image_path).convert("RGB")
    tensor = to_model_input(rgb_image)
    return tensor.unsqueeze(0)


def cnn_predict(model, image_tensor, device):
    """Run ``model`` on ``image_tensor`` without gradient tracking.

    Args:
        model: Callable applied to the tensor.
        image_tensor: Input tensor; moved to ``device`` before the call.
        device: Device to run the forward pass on.

    Returns:
        The first element along axis 0 of the model output, as a NumPy array on
        the CPU.
    """
    with torch.no_grad():
        batch = image_tensor.to(device)
        raw_output = model(batch)
    first_prediction = raw_output.cpu().numpy()[0]
    return first_prediction


# Torso key points, in the order used for the affine source points and model features.
_TORSO_JOINTS = ("left_shoulder", "right_shoulder", "left_hip", "right_hip")

# Feature columns, in the order they are handed to the LightGBM / XGBoost models.
_FEATURE_COLUMNS = [
    "left_shoulder_x",
    "left_shoulder_y",
    "right_shoulder_x",
    "right_shoulder_y",
    "left_hip_x",
    "left_hip_y",
    "right_hip_x",
    "right_hip_y",
    "stethoscope_x",
    "stethoscope_y",
]

# Columns appended to every CSV row by the position-estimation step.
_PREDICTION_COLUMNS = [
    "conv_stethoscope_x",
    "conv_stethoscope_y",
    "Xgboost_stethoscope_x",
    "Xgboost_stethoscope_y",
    "lightGBM_stethoscope_x",
    "lightGBM_stethoscope_y",
]


def process_images(base_dir, draw_trajectory=True):
    """Run detection and position estimation on a directory of frames and render the results.

    The work happens in three phases:

    1. Every ``*.png`` in ``base_dir`` (sorted by the first number in its file
       name) is passed to the pose and stethoscope detectors; the overlay
       images are saved under ``<parent of base_dir>/results``.
    2. For each frame the stethoscope position is mapped with the affine
       converter, LightGBM and XGBoost, and all values are written to
       ``results/results.csv``.
    3. The CSV is read back and, for each estimator, a per-frame image of the
       body diagram with the estimated position (and optionally its
       trajectory) is written below ``images/body/results``, followed by the
       videos built from those images.

    Args:
        base_dir: Directory containing the PNG frames.
        draw_trajectory: When true, the path of all previous positions is
            drawn on the body-diagram images in addition to the current point.

    Raises:
        FileNotFoundError: If ``images/body/BodyF.png`` cannot be loaded.
    """
    ears_ai = EarsAI()
    calc_position = CalcStethoscopePosition()

    results_dir = os.path.join(os.path.dirname(base_dir), "results")
    csv_path = os.path.join(results_dir, "results.csv")
    pose_overlay_dir = os.path.join(results_dir, "pose_overlay_image")
    stethoscope_overlay_dir = os.path.join(results_dir, "stethoscope_overlay_image")
    for directory in (results_dir, pose_overlay_dir, stethoscope_overlay_dir):
        os.makedirs(directory, exist_ok=True)

    rows = _detect_keypoints(ears_ai, base_dir, pose_overlay_dir, stethoscope_overlay_dir)
    if not rows:
        print("No data to write to CSV.")
        return

    _write_predictions_csv(rows, csv_path, calc_position)
    print(f"Processed and saved results to: {csv_path}")

    _render_body_maps(csv_path, base_dir, draw_trajectory)


def _detect_keypoints(ears_ai, images_dir, pose_overlay_dir, stethoscope_overlay_dir):
    """Detect torso key points and the stethoscope in each PNG frame; return one dict per frame."""
    png_files = [f for f in os.listdir(images_dir) if f.lower().endswith(".png")]
    png_files.sort(key=lambda x: int(re.search(r"(\d+)", x).group(1)))

    rows = []
    for image_file_name in png_files:
        image_path = os.path.join(images_dir, image_file_name)
        print(f"Processing image: {image_path}")

        frame = cv2.imread(image_path)
        if frame is None:
            print(f"Failed to load image: {image_path}")
            continue

        pose_overlay_img, left_shoulder, right_shoulder, left_hip, right_hip = ears_ai.pose_detect(frame, None)
        stethoscope_overlay_img, stethoscope_x, stethoscope_y = ears_ai.ssd_detect(frame, None)

        pose_overlay_path = os.path.join(pose_overlay_dir, image_file_name)
        cv2.imwrite(pose_overlay_path, cv2.cvtColor(pose_overlay_img, cv2.COLOR_RGB2BGR))
        print(f"Saved pose overlay image: {pose_overlay_path}")

        stethoscope_overlay_path = os.path.join(stethoscope_overlay_dir, image_file_name)
        cv2.imwrite(stethoscope_overlay_path, cv2.cvtColor(stethoscope_overlay_img, cv2.COLOR_RGB2BGR))
        print(f"Saved stethoscope overlay image: {stethoscope_overlay_path}")

        # Key points are indexed [1] for x and [0] for y
        # (presumably pose_detect returns (y, x) pairs; confirm against EarsAI).
        rows.append(
            {
                "image_file_name": image_file_name,
                "left_shoulder_x": left_shoulder[1],
                "left_shoulder_y": left_shoulder[0],
                "right_shoulder_x": right_shoulder[1],
                "right_shoulder_y": right_shoulder[0],
                "left_hip_x": left_hip[1],
                "left_hip_y": left_hip[0],
                "right_hip_x": right_hip[1],
                "right_hip_y": right_hip[0],
                "stethoscope_x": stethoscope_x,
                "stethoscope_y": stethoscope_y,
            }
        )
    return rows


def _write_predictions_csv(rows, csv_path, calc_position):
    """Add the affine / XGBoost / LightGBM position estimates to each row and write the CSV."""
    fieldnames = list(rows[0].keys()) + _PREDICTION_COLUMNS

    lgb_model_x = lgb_load_model("./models/lgb_stethoscope_calc_x_best_model.pkl")
    lgb_model_y = lgb_load_model("./models/lgb_stethoscope_calc_y_best_model.pkl")
    xg_model_x = xg_load_model("./models/xg_stethoscope_calc_x_best_model.pkl")
    xg_model_y = xg_load_model("./models/xg_stethoscope_calc_y_best_model.pkl")

    # Estimates reported for a frame without a stethoscope detection: the most
    # recent successful estimates, or 180 before the first detection.
    previous = dict.fromkeys(_PREDICTION_COLUMNS, 180)

    # UTF-8 matches the default encoding pandas uses when the file is read back.
    with open(csv_path, "w", newline="", encoding="utf-8") as csvfile:
        writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
        writer.writeheader()

        for row in rows:
            stethoscope_x = float(row["stethoscope_x"])
            stethoscope_y = float(row["stethoscope_y"])

            # A position of exactly (0, 0) is treated as a missed detection
            # (presumably what ssd_detect returns when it finds nothing; confirm).
            if stethoscope_x == 0 and stethoscope_y == 0:
                row.update(previous)
            else:
                torso = [[float(row[f"{joint}_x"]), float(row[f"{joint}_y"])] for joint in _TORSO_JOINTS]

                source_points = np.array(torso, dtype=np.float32)
                conv_stethoscope = calc_position.calc_affine(source_points, stethoscope_x, stethoscope_y)
                row["conv_stethoscope_x"], row["conv_stethoscope_y"] = conv_stethoscope

                normalized_points = normalize_quadrilateral_with_point_average_rotation(
                    np.array(torso), np.array([stethoscope_x, stethoscope_y])
                )
                # Rows of normalized_points are the four joints then the
                # stethoscope, so flattening yields the feature column order.
                features = pd.DataFrame([normalized_points.ravel()], columns=_FEATURE_COLUMNS)
                row["lightGBM_stethoscope_x"] = int(predict(lgb_model_x, features)[0])
                row["lightGBM_stethoscope_y"] = int(predict(lgb_model_y, features)[0])
                row["Xgboost_stethoscope_x"] = int(predict(xg_model_x, features)[0])
                row["Xgboost_stethoscope_y"] = int(predict(xg_model_y, features)[0])

                previous = {column: row[column] for column in _PREDICTION_COLUMNS}

            writer.writerow(row)


def _render_body_maps(csv_path, original_images_dir, draw_trajectory):
    """Draw the estimated positions on the body diagram for every CSV row and build the videos."""
    df = pd.read_csv(csv_path)

    body_image_path = "images/body/BodyF.png"
    body_image = cv2.imread(body_image_path)
    if body_image is None:
        raise FileNotFoundError(f"Failed to load image: {body_image_path}")

    results_dir = "images/body/results"
    marked_images_dir = os.path.join(results_dir, "marked_images")

    # (output name, CSV column prefix, BGR colour). The "cnn" output name is
    # kept for compatibility with existing paths; it carries the XGBoost estimates.
    estimators = (
        ("conv", "conv_stethoscope", (0, 255, 0)),
        ("cnn", "Xgboost_stethoscope", (255, 0, 0)),
        ("lgbm", "lightGBM_stethoscope", (0, 0, 255)),
    )
    output_dirs = {name: os.path.join(results_dir, name) for name, _, _ in estimators}
    trajectories = {name: [] for name, _, _ in estimators}

    os.makedirs(results_dir, exist_ok=True)
    os.makedirs(marked_images_dir, exist_ok=True)
    for directory in output_dirs.values():
        os.makedirs(directory, exist_ok=True)

    for _, row in df.iterrows():
        image_file_name = row["image_file_name"]
        positions = {
            name: (int(row[f"{prefix}_x"]), int(row[f"{prefix}_y"])) for name, prefix, _ in estimators
        }

        original_image_path = os.path.join(original_images_dir, image_file_name)
        original_image = cv2.imread(original_image_path)
        if original_image is None:
            print(f"Failed to load image: {original_image_path}")
            continue

        # Mark the detected key points on the original frame in cyan (BGR 255, 255, 0).
        for keypoint in (*_TORSO_JOINTS, "stethoscope"):
            marker_center = (int(row[f"{keypoint}_x"]), int(row[f"{keypoint}_y"]))
            cv2.circle(original_image, marker_center, 10, (255, 255, 0), -1)
        cv2.imwrite(os.path.join(marked_images_dir, image_file_name), original_image)

        for name, _, color in estimators:
            point = positions[name]
            trajectories[name].append(point)

            canvas = body_image.copy()
            if draw_trajectory and len(trajectories[name]) > 1:
                # cv2.polylines needs 32-bit integer points; np.array() on
                # Python ints yields 64-bit integers on most platforms.
                path_points = np.array(trajectories[name], dtype=np.int32)
                cv2.polylines(canvas, [path_points], False, color, 2)
            cv2.circle(canvas, point, 10, color, -1)
            cv2.imwrite(os.path.join(output_dirs[name], image_file_name), canvas)

    for name, _, _ in estimators:
        create_video_from_images(output_dirs[name], os.path.join(results_dir, f"{name}_video_with_trajectory.mp4"))

    for name, _, _ in estimators:
        create_video_from_images(
            output_dirs[name], os.path.join(results_dir, f"{name}_video_without_trajectory.mp4"), False
        )

    create_video_from_images(marked_images_dir, os.path.join(results_dir, "marked_video.mp4"))


def create_video_from_images(image_dir, output_path, with_trajectory=True):
    """Assemble the numbered PNG images of a directory into a 30 fps MP4.

    Images are ordered by the first number in their file name, and the video
    size is taken from the first image.

    Args:
        image_dir: Directory containing the ``.png`` frames.
        output_path: Path of the video file to write.
        with_trajectory: When false, each frame is merged with the body
            diagram ``images/body/BodyF.png`` before it is written.

    Raises:
        IOError: If the first image cannot be read.
        FileNotFoundError: If ``with_trajectory`` is false and the body diagram
            cannot be loaded.
    """
    images = [img for img in os.listdir(image_dir) if img.endswith(".png")]
    images.sort(key=lambda x: int(re.search(r"(\d+)", x).group()))

    if not images:
        print(f"No images found in {image_dir}")
        return

    first_path = os.path.join(image_dir, images[0])
    first_frame = cv2.imread(first_path)
    if first_frame is None:
        raise IOError(f"Failed to load image: {first_path}")
    height, width = first_frame.shape[:2]

    # The background is the same for every frame, so load it once up front.
    background = None
    if not with_trajectory:
        background_path = "images/body/BodyF.png"
        background = cv2.imread(background_path)
        if background is None:
            raise FileNotFoundError(f"Failed to load image: {background_path}")

    video = cv2.VideoWriter(output_path, cv2.VideoWriter_fourcc(*"mp4v"), 30, (width, height))
    try:
        for image in images:
            frame_path = os.path.join(image_dir, image)
            img = cv2.imread(frame_path)
            if img is None:
                print(f"Failed to load image: {frame_path}")
                continue

            if not with_trajectory:
                # Keep pixels brighter than 10 from the frame and take the rest
                # from the background.
                # NOTE(review): this does not appear to remove coloured
                # trajectory lines, which are brighter than the threshold;
                # confirm the intended effect.
                mask = cv2.threshold(cv2.cvtColor(img, cv2.COLOR_BGR2GRAY), 10, 255, cv2.THRESH_BINARY)[1]
                foreground = cv2.bitwise_and(img, img, mask=mask)
                backdrop = cv2.bitwise_and(background, background, mask=cv2.bitwise_not(mask))
                img = cv2.add(foreground, backdrop)

            video.write(img)
    finally:
        # No GUI windows are opened here, so cv2.destroyAllWindows() is not
        # called: it can raise on OpenCV builds without GUI support, which
        # would skip the release below and leave the file unfinalised.
        video.release()

    print(f"Created video: {output_path}")


if __name__ == "__main__":
    # Command-line entry point: split the video into frames, then analyse them.
    cli = argparse.ArgumentParser(description="Process video and generate results.")
    cli.add_argument("--video_path", default="./video/Test3-1.mp4", help="Path to the input video file")
    cli.add_argument("--output_dir", default="images", help="Directory to save output images and results")
    cli.add_argument("--draw_trajectory", action="store_true", help="Draw trajectory in the output video")
    options = cli.parse_args()

    frames_dir = options.output_dir

    # Step 1: dump every frame of the video into frames_dir.
    video_to_frames(options.video_path, frames_dir)

    # Step 2: run detection, estimation and rendering on those frames.
    process_images(frames_dir, options.draw_trajectory)