diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..4856cc1 --- /dev/null +++ b/.gitignore @@ -0,0 +1,178 @@ +### Python ### +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + +# C extensions +*.so + +# Distribution / packaging +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +share/python-wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST + +# PyInstaller +# Usually these files are written by a python script from a template +# before PyInstaller builds the exe, so as to inject date/other infos into it. +*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.nox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*.cover +*.py,cover +.hypothesis/ +.pytest_cache/ +cover/ + +# Translations +*.mo +*.pot + +# Django stuff: +*.log +local_settings.py +db.sqlite3 +db.sqlite3-journal + +# Flask stuff: +instance/ +.webassets-cache + +# Scrapy stuff: +.scrapy + +# Sphinx documentation +docs/_build/ + +# PyBuilder +.pybuilder/ +target/ + +# Jupyter Notebook +.ipynb_checkpoints + +# IPython +profile_default/ +ipython_config.py + +# pyenv +# For a library or package, you might want to ignore these files since the code is +# intended to run in multiple environments; otherwise, check them in: +# .python-version + +# pipenv +# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control. +# However, in case of collaboration, if having platform-specific dependencies or dependencies +# having no cross-platform support, pipenv may install dependencies that don't work, or not +# install all needed dependencies. +#Pipfile.lock + +# poetry +# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control. 
+# This is especially recommended for binary packages to ensure reproducibility, and is more +# commonly ignored for libraries. +# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control +#poetry.lock + +# pdm +# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control. +#pdm.lock +# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it +# in version control. +# https://pdm.fming.dev/#use-with-ide +.pdm.toml + +# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm +__pypackages__/ + +# Celery stuff +celerybeat-schedule +celerybeat.pid + +# SageMath parsed files +*.sage.py + +# Environments +.env +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# Spyder project settings +.spyderproject +.spyproject + +# Rope project settings +.ropeproject + +# mkdocs documentation +/site + +# mypy +.mypy_cache/ +.dmypy.json +dmypy.json + +# Pyre type checker +.pyre/ + +# pytype static type analyzer +.pytype/ + +# Cython debug symbols +cython_debug/ + +# PyCharm +# JetBrains specific template is maintained in a separate JetBrains.gitignore that can +# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore +# and can be added to the global gitignore or merged into this file. For a more nuclear +# option (not recommended) you can uncomment the following to ignore the entire idea folder. 
+#.idea/ + +### Python Patch ### +# Poetry local configuration file - https://python-poetry.org/docs/configuration/#local-configuration +poetry.toml + +# ruff +.ruff_cache/ + +# LSP config files +pyrightconfig.json + +# Additional item +_models/ +images/ +results/ +video/ +models/ \ No newline at end of file diff --git a/main.py b/main.py new file mode 100644 index 0000000..db235f3 --- /dev/null +++ b/main.py @@ -0,0 +1,502 @@ +import cv2 +import os +import csv +import re +import numpy as np +import pandas as pd +import joblib +import math +import lightgbm +import xgboost +from util.ears_ai import EarsAI +from util.calc_ste_position import CalcStethoscopePosition +import torch +import torch.nn as nn +from torchvision import transforms +from PIL import Image +import argparse +from modules.EARSForDL.model import RegressionResNet # モデル定義をインポート +import pickle +import matplotlib.pyplot as plt + + +def normalize_quadrilateral_with_point(points, extra_point): + all_points = np.vstack([points.reshape(-1, 2), extra_point]) + center = np.mean(points.reshape(-1, 2), axis=0) + centered_points = all_points - center + + left_shoulder = centered_points[0] + right_shoulder = centered_points[1] + shoulder_vector = right_shoulder - left_shoulder + angle = np.arctan2(shoulder_vector[1], shoulder_vector[0]) + + rotation_matrix = np.array([[np.cos(-angle), -np.sin(-angle)], [np.sin(-angle), np.cos(-angle)]]) + + rotated_points = np.dot(centered_points, rotation_matrix.T) + max_edge_length = np.max(np.linalg.norm(np.roll(rotated_points[:4], -1, axis=0) - rotated_points[:4], axis=1)) + normalized_points = rotated_points / max_edge_length + + return normalized_points + + +def normalize_quadrilateral_with_point_average_rotation(points, extra_point): + all_points = np.vstack([points.reshape(-1, 2), extra_point]) + center = np.mean(points.reshape(-1, 2), axis=0) + centered_points = all_points - center + + left_shoulder, right_shoulder, left_hip, right_hip = centered_points[:4] + + 
shoulder_angle = calculate_rotation_angle(left_shoulder, right_shoulder) + hip_angle = calculate_rotation_angle(left_hip, right_hip) + + average_angle = (shoulder_angle + hip_angle) / 2 + + rotation_matrix = np.array( + [[np.cos(-average_angle), -np.sin(-average_angle)], [np.sin(-average_angle), np.cos(-average_angle)]] + ) + + rotated_points = np.dot(centered_points, rotation_matrix.T) + max_edge_length = np.max(np.linalg.norm(np.roll(rotated_points[:4], -1, axis=0) - rotated_points[:4], axis=1)) + normalized_points = rotated_points / max_edge_length + + return normalized_points + + +def calculate_rotation_angle(point1, point2): + vector = point2 - point1 + return np.arctan2(vector[1], vector[0]) + + +def video_to_frames(video_path, output_dir): + if not os.path.exists(output_dir): + os.makedirs(output_dir) + + video = cv2.VideoCapture(video_path) + + if not video.isOpened(): + raise IOError(f"動画ファイルを開けませんでした: {video_path}") + + frame_num = 0 + + while True: + success, frame = video.read() + if not success: + break + + frame_num += 1 + output_filename = f"{frame_num}-frame.png" + cv2.imwrite(os.path.join(output_dir, output_filename), frame) + + video.release() + print(f"全てのフレームを {output_dir} に保存しました。") + + +def lgb_load_model(model_path): + with open(model_path, 'rb') as model_file: + loaded_model = pickle.load(model_file) + return loaded_model + + +def xg_load_model(model_path): + with open(model_path, 'rb') as model_file: + loaded_model = pickle.load(model_file) + return loaded_model + + +def CNN_load_model(model_path, device, resnet_depth=18): + model = RegressionResNet(resnet_depth) + model.load_state_dict(torch.load(model_path, map_location=device)) + model.to(device) + model.eval() + return model + + +def predict(model, data): + return model.predict(data) + + +def calculate_distance(point1, point2): + return math.sqrt((point1[0] - point2[0]) ** 2 + (point1[1] - point2[1]) ** 2) + + +def preprocess_image(image_path): + transform = transforms.Compose( + [ + 
transforms.Resize((224, 224)), + transforms.ToTensor(), + transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]), + ] + ) + image = Image.open(image_path).convert("RGB") + return transform(image).unsqueeze(0) + + +def cnn_predict(model, image_tensor, device): + with torch.no_grad(): + output = model(image_tensor.to(device)) + return output.cpu().numpy()[0] + + +def process_images(base_dir, draw_trajectory=True): + ears_ai = EarsAI() + calc_position = CalcStethoscopePosition() + images_dir = base_dir + results_dir = os.path.join(os.path.dirname(base_dir), "results") + csv_path = os.path.join(results_dir, "results.csv") + pose_overlay_dir = os.path.join(results_dir, "pose_overlay_image") + stethoscope_overlay_dir = os.path.join(results_dir, "stethoscope_overlay_image") + + os.makedirs(results_dir, exist_ok=True) + os.makedirs(pose_overlay_dir, exist_ok=True) + os.makedirs(stethoscope_overlay_dir, exist_ok=True) + + png_files = [f for f in os.listdir(images_dir) if f.lower().endswith(".png")] + png_files.sort(key=lambda x: int(re.search(r"(\d+)", x).group(1))) + + rows = [] + + for image_file_name in png_files: + image_path = os.path.join(images_dir, image_file_name) + print(f"Processing image: {image_path}") + + frame = cv2.imread(image_path) + + if frame is None: + print(f"Failed to load image: {image_path}") + continue + + pose_overlay_img, left_shoulder, right_shoulder, left_hip, right_hip = ears_ai.pose_detect(frame, None) + stethoscope_overlay_img, stethoscope_x, stethoscope_y = ears_ai.ssd_detect(frame, None) + + pose_overlay_path = os.path.join(pose_overlay_dir, image_file_name) + cv2.imwrite(pose_overlay_path, cv2.cvtColor(pose_overlay_img, cv2.COLOR_RGB2BGR)) + print(f"Saved pose overlay image: {pose_overlay_path}") + + stethoscope_overlay_path = os.path.join(stethoscope_overlay_dir, image_file_name) + cv2.imwrite(stethoscope_overlay_path, cv2.cvtColor(stethoscope_overlay_img, cv2.COLOR_RGB2BGR)) + print(f"Saved stethoscope overlay 
image: {stethoscope_overlay_path}") + + # S5とS6の計算 + S5 = calculate_distance(right_shoulder, left_hip) + S6 = calculate_distance(left_shoulder, right_hip) + + # S5とS6の比率計算 + S5_standard = 215 + S6_standard = 204 + S5_ratio = S5 / S5_standard + S6_ratio = S6 / S6_standard + theta_1 = ((-4.5 * 100 * S5_ratio + 440) + (5.0 * 100 * S6_ratio - 500)) / 2 + theta_2 = ((53 * S5_ratio - 53) + (57 * S6_ratio - 57)) / 2 + x_e = 1.01 * theta_1 + 0.58 + y_e = 0.79 * theta_2 - 0.45 + + device = torch.device("cuda" if torch.cuda.is_available() else "cpu") + # モデルの読み込み + """ cnn_model = CNN_load_model("./models/best_model.pth", device, 18) + image_tensor = preprocess_image(image_path) + cnn_prediction = cnn_predict(cnn_model, image_tensor, device) + cnn_stethoscope_x = int(round(cnn_prediction[0])) + cnn_stethoscope_y = int(round(cnn_prediction[1])) """ + + row = { + "image_file_name": image_file_name, + "left_shoulder_x": left_shoulder[1], + "left_shoulder_y": left_shoulder[0], + "right_shoulder_x": right_shoulder[1], + "right_shoulder_y": right_shoulder[0], + "left_hip_x": left_hip[1], + "left_hip_y": left_hip[0], + "right_hip_x": right_hip[1], + "right_hip_y": right_hip[0], + "stethoscope_x": stethoscope_x, + "stethoscope_y": stethoscope_y, + } + """ row = { + "image_file_name": image_file_name, + "left_shoulder_x": left_shoulder[1], + "left_shoulder_y": left_shoulder[0], + "right_shoulder_x": right_shoulder[1], + "right_shoulder_y": right_shoulder[0], + "left_hip_x": left_hip[1], + "left_hip_y": left_hip[0], + "right_hip_x": right_hip[1], + "right_hip_y": right_hip[0], + "stethoscope_x": stethoscope_x, + "stethoscope_y": stethoscope_y, + "cnn_stethoscope_x": cnn_stethoscope_x, + "cnn_stethoscope_y": cnn_stethoscope_y, + } """ + rows.append(row) + + if rows: + """ fieldnames = list(rows[0].keys()) + [ + "conv_stethoscope_x", + "conv_stethoscope_y", + "formula_stethoscope_x", + "formula_stethoscope_y", + "lightGBM_stethoscope_x", + "lightGBM_stethoscope_y", + ] """ + fieldnames 
= list(rows[0].keys()) + [ + "conv_stethoscope_x", + "conv_stethoscope_y", + "Xgboost_stethoscope_x", + "Xgboost_stethoscope_y", + "lightGBM_stethoscope_x", + "lightGBM_stethoscope_y", + ] + + # Load models + + # LightGBMモデルをロード + + lgb_model_x = lgb_load_model("./models/lgb_stethoscope_calc_x_best_model.pkl") + lgb_model_y = lgb_load_model("./models/lgb_stethoscope_calc_y_best_model.pkl") + + # XGBoostをロード + xg_model_x = xg_load_model("./models/xg_stethoscope_calc_x_best_model.pkl") + xg_model_y = xg_load_model("./models/xg_stethoscope_calc_y_best_model.pkl") + + with open(csv_path, "w", newline="") as csvfile: + writer = csv.DictWriter(csvfile, fieldnames=fieldnames) + writer.writeheader() + + # 変数の初期化 + pre_conv_stethoscope_x = pre_conv_stethoscope_y = 180 + # pre_formula_stethoscope_x = pre_formula_stethoscope_y = 180 + pre_lightGBM_stethoscope_x = pre_lightGBM_stethoscope_y = 180 + pre_xgboost_stethoscope_x = pre_xgboost_stethoscope_y = 180 + if stethoscope_x == 0 and stethoscope_y == 0: + row["stethoscope_x"] = 320 + row["stethoscope_x"] = 240 + + for row in rows: + source_points = np.array( + [ + [float(row["left_shoulder_x"]), float(row["left_shoulder_y"])], + [float(row["right_shoulder_x"]), float(row["right_shoulder_y"])], + [float(row["left_hip_x"]), float(row["left_hip_y"])], + [float(row["right_hip_x"]), float(row["right_hip_y"])], + ], + dtype=np.float32, + ) + stethoscope_x = float(row["stethoscope_x"]) + stethoscope_y = float(row["stethoscope_y"]) + quadrilateral_points = np.array( + [ + float(row["left_shoulder_x"]), + float(row["left_shoulder_y"]), + float(row["right_shoulder_x"]), + float(row["right_shoulder_y"]), + float(row["left_hip_x"]), + float(row["left_hip_y"]), + float(row["right_hip_x"]), + float(row["right_hip_y"]), + ] + ) + stethoscope_point = np.array([float(row["stethoscope_x"]), float(row["stethoscope_y"])]) + + normalized_points = normalize_quadrilateral_with_point_average_rotation( + quadrilateral_points, stethoscope_point + ) + 
+ if stethoscope_x == 0 and stethoscope_y == 0: + row["conv_stethoscope_x"] = pre_conv_stethoscope_x + row["conv_stethoscope_y"] = pre_conv_stethoscope_y + # row["formula_stethoscope_x"] = pre_formula_stethoscope_x + # row["formula_stethoscope_y"] = pre_formula_stethoscope_y + row["lightGBM_stethoscope_x"] = pre_lightGBM_stethoscope_x + row["lightGBM_stethoscope_y"] = pre_lightGBM_stethoscope_y + row["Xgboost_stethoscope_x"] = pre_xgboost_stethoscope_x + row["Xgboost_stethoscope_y"] = pre_xgboost_stethoscope_y + else: + conv_stethoscope = calc_position.calc_affine(source_points, stethoscope_x, stethoscope_y) + row["conv_stethoscope_x"], row["conv_stethoscope_y"] = conv_stethoscope + + """ row["formula_stethoscope_x"], row["formula_stethoscope_y"] = int(conv_stethoscope[0] - x_e), int( + conv_stethoscope[1] - y_e + ) """ + row_convert = { + "left_shoulder_x": normalized_points[0, 0], + "left_shoulder_y": normalized_points[0, 1], + "right_shoulder_x": normalized_points[1, 0], + "right_shoulder_y": normalized_points[1, 1], + "left_hip_x": normalized_points[2, 0], + "left_hip_y": normalized_points[2, 1], + "right_hip_x": normalized_points[3, 0], + "right_hip_y": normalized_points[3, 1], + "stethoscope_x": normalized_points[4, 0], + "stethoscope_y": normalized_points[4, 1], + } + # 各点をプロット + plt.scatter(row_convert['left_shoulder_x'], row_convert['left_shoulder_y'], color='blue', s=100, label='Left Shoulder') + plt.scatter(row_convert['right_shoulder_x'], row_convert['right_shoulder_y'], color='blue', s=100, label='Right Shoulder') + plt.scatter(row_convert['left_hip_x'], row_convert['left_hip_y'], color='green', s=100, label='Left Hip') + plt.scatter(row_convert['right_hip_x'], row_convert['right_hip_y'], color='green', s=100, label='Right Hip') + plt.scatter(row_convert['stethoscope_x'], row_convert['stethoscope_y'], color='red', s=100, label='Stethoscope') + # Machine Learning prediction + input_columns = [ + "left_shoulder_x", + "left_shoulder_y", + 
"right_shoulder_x", + "right_shoulder_y", + "left_hip_x", + "left_hip_y", + "right_hip_x", + "right_hip_y", + "stethoscope_x", + "stethoscope_y", + ] + input_data = pd.DataFrame([row_convert]) + row["lightGBM_stethoscope_x"] = int(predict(lgb_model_x, input_data[input_columns])[0]) + row["lightGBM_stethoscope_y"] = int(predict(lgb_model_y, input_data[input_columns])[0]) + row["Xgboost_stethoscope_x"] = int(predict(xg_model_x, input_data[input_columns])[0]) + row["Xgboost_stethoscope_y"] = int(predict(xg_model_y, input_data[input_columns])[0]) + + pre_conv_stethoscope_x = row["conv_stethoscope_x"] + pre_conv_stethoscope_y = row["conv_stethoscope_y"] + # pre_formula_stethoscope_x = row["formula_stethoscope_x"] + # pre_formula_stethoscope_y = row["formula_stethoscope_y"] + pre_lightGBM_stethoscope_x = row["lightGBM_stethoscope_x"] + pre_lightGBM_stethoscope_y = row["lightGBM_stethoscope_y"] + pre_xgboost_stethoscope_x = row["Xgboost_stethoscope_x"] + pre_xgboost_stethoscope_y = row["Xgboost_stethoscope_y"] + + writer.writerow(row) + + print(f"Processed and saved results to: {csv_path}") + + # Add these new variables at the beginning of your script + conv_points = [] + cnn_points = [] + lgbm_points = [] + + # 新しい処理を追加 + df = pd.read_csv(csv_path) + original_images_dir = base_dir + body_image = cv2.imread("images/body/BodyF.png") + results_dir = "images/body/results" + os.makedirs(results_dir, exist_ok=True) + marked_images_dir = os.path.join(results_dir, "marked_images") + os.makedirs(marked_images_dir, exist_ok=True) + + # 新しいディレクトリを作成 + conv_dir = os.path.join(results_dir, "conv") + cnn_dir = os.path.join(results_dir, "cnn") + lgbm_dir = os.path.join(results_dir, "lgbm") + os.makedirs(conv_dir, exist_ok=True) + os.makedirs(cnn_dir, exist_ok=True) + os.makedirs(lgbm_dir, exist_ok=True) + + for _, row in df.iterrows(): + image_file_name = row["image_file_name"] + conv_x, conv_y = int(row["conv_stethoscope_x"]), int(row["conv_stethoscope_y"]) + cnn_x, cnn_y = 
int(row["Xgboost_stethoscope_x"]), int(row["Xgboost_stethoscope_x"]) + lgbm_x, lgbm_y = int(row["lightGBM_stethoscope_x"]), int(row["lightGBM_stethoscope_y"]) + + original_image_path = os.path.join(original_images_dir, image_file_name) + original_image = cv2.imread(original_image_path) + + if original_image is None: + print(f"Failed to load image: {original_image_path}") + continue + + # Draw markers in cyan color (BGR: 255, 255, 0) + cv2.circle( + original_image, (int(row["left_shoulder_x"]), int(row["left_shoulder_y"])), 10, (255, 255, 0), -1 + ) + cv2.circle( + original_image, (int(row["right_shoulder_x"]), int(row["right_shoulder_y"])), 10, (255, 255, 0), -1 + ) + cv2.circle(original_image, (int(row["left_hip_x"]), int(row["left_hip_y"])), 10, (255, 255, 0), -1) + cv2.circle(original_image, (int(row["right_hip_x"]), int(row["right_hip_y"])), 10, (255, 255, 0), -1) + cv2.circle(original_image, (int(row["stethoscope_x"]), int(row["stethoscope_y"])), 10, (255, 255, 0), -1) + + # Save marked image + marked_image_path = os.path.join(marked_images_dir, image_file_name) + cv2.imwrite(marked_image_path, original_image) + + conv_points.append((conv_x, conv_y)) + cnn_points.append((cnn_x, cnn_y)) + lgbm_points.append((lgbm_x, lgbm_y)) + + # Conv画像を生成 + conv_image = body_image.copy() + if draw_trajectory and len(conv_points) > 1: + cv2.polylines(conv_image, [np.array(conv_points)], False, (0, 255, 0), 2) + cv2.circle(conv_image, (conv_x, conv_y), 10, (0, 255, 0), -1) + cv2.imwrite(os.path.join(conv_dir, image_file_name), conv_image) + + # CNN画像を生成 + cnn_image = body_image.copy() + if draw_trajectory and len(cnn_points) > 1: + cv2.polylines(cnn_image, [np.array(cnn_points)], False, (255, 0, 0), 2) + cv2.circle(cnn_image, (cnn_x, cnn_y), 10, (255, 0, 0), -1) + cv2.imwrite(os.path.join(cnn_dir, image_file_name), cnn_image) + + # LightGBM画像を生成 + lgbm_image = body_image.copy() + if draw_trajectory and len(lgbm_points) > 1: + cv2.polylines(lgbm_image, [np.array(lgbm_points)], 
False, (0, 0, 255), 2) + cv2.circle(lgbm_image, (lgbm_x, lgbm_y), 10, (0, 0, 255), -1) + cv2.imwrite(os.path.join(lgbm_dir, image_file_name), lgbm_image) + + # 動画を生成 + create_video_from_images(conv_dir, os.path.join(results_dir, "conv_video_with_trajectory.mp4")) + create_video_from_images(cnn_dir, os.path.join(results_dir, "cnn_video_with_trajectory.mp4")) + create_video_from_images(lgbm_dir, os.path.join(results_dir, "lgbm_video_with_trajectory.mp4")) + + # 軌跡なしの動画を生成 + create_video_from_images(conv_dir, os.path.join(results_dir, "conv_video_without_trajectory.mp4"), False) + create_video_from_images(cnn_dir, os.path.join(results_dir, "cnn_video_without_trajectory.mp4"), False) + create_video_from_images(lgbm_dir, os.path.join(results_dir, "lgbm_video_without_trajectory.mp4"), False) + + # Create video from marked images + create_video_from_images(marked_images_dir, os.path.join(results_dir, "marked_video.mp4")) + + else: + print("No data to write to CSV.") + + +def create_video_from_images(image_dir, output_path, with_trajectory=True): + images = [img for img in os.listdir(image_dir) if img.endswith(".png")] + images.sort(key=lambda x: int(re.search(r"(\d+)", x).group())) + + if images: + frame = cv2.imread(os.path.join(image_dir, images[0])) + height, width, layers = frame.shape + + video = cv2.VideoWriter(output_path, cv2.VideoWriter_fourcc(*"mp4v"), 30, (width, height)) + + for image in images: + img = cv2.imread(os.path.join(image_dir, image)) + if not with_trajectory: + # 軌跡を消去(背景画像で上書き) + background = cv2.imread("images/body/BodyF.png") + mask = cv2.threshold(cv2.cvtColor(img, cv2.COLOR_BGR2GRAY), 10, 255, cv2.THRESH_BINARY)[1] + img = cv2.bitwise_and(img, img, mask=mask) + background = cv2.bitwise_and(background, background, mask=cv2.bitwise_not(mask)) + img = cv2.add(img, background) + video.write(img) + + cv2.destroyAllWindows() + video.release() + + print(f"Created video: {output_path}") + else: + print(f"No images found in {image_dir}") + + +if 
__name__ == "__main__": + parser = argparse.ArgumentParser(description="Process video and generate results.") + parser.add_argument("--video_path", default="./video/Test3-1.mp4", help="Path to the input video file") + parser.add_argument("--output_dir", default="images", help="Directory to save output images and results") + parser.add_argument("--draw_trajectory", action="store_true", help="Draw trajectory in the output video") + + args = parser.parse_args() + + # Step 1: Convert video to frames + video_to_frames(args.video_path, args.output_dir) + + # Step 2: Process the generated images + process_images(args.output_dir, args.draw_trajectory) diff --git a/modules/EARSForDL/model.py b/modules/EARSForDL/model.py new file mode 100644 index 0000000..aee252e --- /dev/null +++ b/modules/EARSForDL/model.py @@ -0,0 +1,34 @@ +import torch +import torch.nn as nn +import torchvision.models as models +from torchvision.models import ( + ResNet18_Weights, + ResNet34_Weights, + ResNet50_Weights, + ResNet101_Weights, + ResNet152_Weights, +) + + +class RegressionResNet(nn.Module): + def __init__(self, resnet_depth): + super(RegressionResNet, self).__init__() + if resnet_depth == 18: + self.model = models.resnet18(weights=ResNet18_Weights.IMAGENET1K_V1) + elif resnet_depth == 34: + self.model = models.resnet34(weights=ResNet34_Weights.IMAGENET1K_V1) + elif resnet_depth == 50: + self.model = models.resnet50(weights=ResNet50_Weights.IMAGENET1K_V1) + elif resnet_depth == 101: + self.model = models.resnet101(weights=ResNet101_Weights.IMAGENET1K_V1) + elif resnet_depth == 152: + self.model = models.resnet152(weights=ResNet152_Weights.IMAGENET1K_V1) + else: + raise ValueError("Invalid ResNet depth. 
Choose from 18, 34, 50, 101, 152.") + + # Modify the final fully connected layer + num_features = self.model.fc.in_features + self.model.fc = nn.Linear(num_features, 2) + + def forward(self, x): + return self.model(x) diff --git a/modules/PytorchSSD/nn/alexnet.py b/modules/PytorchSSD/nn/alexnet.py new file mode 100644 index 0000000..c7b6956 --- /dev/null +++ b/modules/PytorchSSD/nn/alexnet.py @@ -0,0 +1,60 @@ +import torch.nn as nn +import torch.utils.model_zoo as model_zoo + +# copied from torchvision (https://github.com/pytorch/vision/blob/master/torchvision/models/alexnet.py). +# The forward function is modified for model pruning. + +__all__ = ["AlexNet", "alexnet"] + + +model_urls = { + "alexnet": "https://download.pytorch.org/models/alexnet-owt-4df8aa71.pth", +} + + +class AlexNet(nn.Module): + def __init__(self, num_classes=1000): + super(AlexNet, self).__init__() + self.features = nn.Sequential( + nn.Conv2d(3, 64, kernel_size=11, stride=4, padding=2), + nn.ReLU(inplace=True), + nn.MaxPool2d(kernel_size=3, stride=2), + nn.Conv2d(64, 192, kernel_size=5, padding=2), + nn.ReLU(inplace=True), + nn.MaxPool2d(kernel_size=3, stride=2), + nn.Conv2d(192, 384, kernel_size=3, padding=1), + nn.ReLU(inplace=True), + nn.Conv2d(384, 256, kernel_size=3, padding=1), + nn.ReLU(inplace=True), + nn.Conv2d(256, 256, kernel_size=3, padding=1), + nn.ReLU(inplace=True), + nn.MaxPool2d(kernel_size=3, stride=2), + ) + self.classifier = nn.Sequential( + nn.Dropout(), + nn.Linear(256 * 6 * 6, 4096), + nn.ReLU(inplace=True), + nn.Dropout(), + nn.Linear(4096, 4096), + nn.ReLU(inplace=True), + nn.Linear(4096, num_classes), + ) + + def forward(self, x): + x = self.features(x) + x = x.view(x.size(0), -1) + x = self.classifier(x) + return x + + +def alexnet(pretrained=False, **kwargs): + r"""AlexNet model architecture from the + `"One weird trick..." `_ paper. 
+ + Args: + pretrained (bool): If True, returns a model pre-trained on ImageNet + """ + model = AlexNet(**kwargs) + if pretrained: + model.load_state_dict(model_zoo.load_url(model_urls["alexnet"])) + return model diff --git a/modules/PytorchSSD/nn/mobilenet.py b/modules/PytorchSSD/nn/mobilenet.py new file mode 100644 index 0000000..e122fdd --- /dev/null +++ b/modules/PytorchSSD/nn/mobilenet.py @@ -0,0 +1,49 @@ +# borrowed from "https://github.com/marvis/pytorch-mobilenet" + +import torch.nn as nn +import torch.nn.functional as F + + +class MobileNetV1(nn.Module): + def __init__(self, num_classes=1024): + super(MobileNetV1, self).__init__() + + def conv_bn(inp, oup, stride): + return nn.Sequential( + nn.Conv2d(inp, oup, 3, stride, 1, bias=False), nn.BatchNorm2d(oup), nn.ReLU(inplace=True) + ) + + def conv_dw(inp, oup, stride): + return nn.Sequential( + nn.Conv2d(inp, inp, 3, stride, 1, groups=inp, bias=False), + nn.BatchNorm2d(inp), + nn.ReLU(inplace=True), + nn.Conv2d(inp, oup, 1, 1, 0, bias=False), + nn.BatchNorm2d(oup), + nn.ReLU(inplace=True), + ) + + self.model = nn.Sequential( + conv_bn(3, 32, 2), + conv_dw(32, 64, 1), + conv_dw(64, 128, 2), + conv_dw(128, 128, 1), + conv_dw(128, 256, 2), + conv_dw(256, 256, 1), + conv_dw(256, 512, 2), + conv_dw(512, 512, 1), + conv_dw(512, 512, 1), + conv_dw(512, 512, 1), + conv_dw(512, 512, 1), + conv_dw(512, 512, 1), + conv_dw(512, 1024, 2), + conv_dw(1024, 1024, 1), + ) + self.fc = nn.Linear(1024, num_classes) + + def forward(self, x): + x = self.model(x) + x = F.avg_pool2d(x, 7) + x = x.view(-1, 1024) + x = self.fc(x) + return x diff --git a/modules/PytorchSSD/nn/mobilenet_v2.py b/modules/PytorchSSD/nn/mobilenet_v2.py new file mode 100644 index 0000000..3817933 --- /dev/null +++ b/modules/PytorchSSD/nn/mobilenet_v2.py @@ -0,0 +1,183 @@ +import torch.nn as nn +import math + +# Modified from https://github.com/tonylins/pytorch-mobilenet-v2/blob/master/MobileNetV2.py. 
+# In this version, Relu6 is replaced with Relu to make it ONNX compatible. +# BatchNorm Layer is optional to make it easy do batch norm confusion. + + +def conv_bn(inp, oup, stride, use_batch_norm=True, onnx_compatible=False): + ReLU = nn.ReLU if onnx_compatible else nn.ReLU6 + + if use_batch_norm: + return nn.Sequential(nn.Conv2d(inp, oup, 3, stride, 1, bias=False), nn.BatchNorm2d(oup), ReLU(inplace=True)) + else: + return nn.Sequential(nn.Conv2d(inp, oup, 3, stride, 1, bias=False), ReLU(inplace=True)) + + +def conv_1x1_bn(inp, oup, use_batch_norm=True, onnx_compatible=False): + ReLU = nn.ReLU if onnx_compatible else nn.ReLU6 + if use_batch_norm: + return nn.Sequential(nn.Conv2d(inp, oup, 1, 1, 0, bias=False), nn.BatchNorm2d(oup), ReLU(inplace=True)) + else: + return nn.Sequential(nn.Conv2d(inp, oup, 1, 1, 0, bias=False), ReLU(inplace=True)) + + +class InvertedResidual(nn.Module): + def __init__(self, inp, oup, stride, expand_ratio, use_batch_norm=True, onnx_compatible=False): + super(InvertedResidual, self).__init__() + ReLU = nn.ReLU if onnx_compatible else nn.ReLU6 + + self.stride = stride + assert stride in [1, 2] + + hidden_dim = round(inp * expand_ratio) + self.use_res_connect = self.stride == 1 and inp == oup + + if expand_ratio == 1: + if use_batch_norm: + self.conv = nn.Sequential( + # dw + nn.Conv2d(hidden_dim, hidden_dim, 3, stride, 1, groups=hidden_dim, bias=False), + nn.BatchNorm2d(hidden_dim), + ReLU(inplace=True), + # pw-linear + nn.Conv2d(hidden_dim, oup, 1, 1, 0, bias=False), + nn.BatchNorm2d(oup), + ) + else: + self.conv = nn.Sequential( + # dw + nn.Conv2d(hidden_dim, hidden_dim, 3, stride, 1, groups=hidden_dim, bias=False), + ReLU(inplace=True), + # pw-linear + nn.Conv2d(hidden_dim, oup, 1, 1, 0, bias=False), + ) + else: + if use_batch_norm: + self.conv = nn.Sequential( + # pw + nn.Conv2d(inp, hidden_dim, 1, 1, 0, bias=False), + nn.BatchNorm2d(hidden_dim), + ReLU(inplace=True), + # dw + nn.Conv2d(hidden_dim, hidden_dim, 3, stride, 1, 
groups=hidden_dim, bias=False), + nn.BatchNorm2d(hidden_dim), + ReLU(inplace=True), + # pw-linear + nn.Conv2d(hidden_dim, oup, 1, 1, 0, bias=False), + nn.BatchNorm2d(oup), + ) + else: + self.conv = nn.Sequential( + # pw + nn.Conv2d(inp, hidden_dim, 1, 1, 0, bias=False), + ReLU(inplace=True), + # dw + nn.Conv2d(hidden_dim, hidden_dim, 3, stride, 1, groups=hidden_dim, bias=False), + ReLU(inplace=True), + # pw-linear + nn.Conv2d(hidden_dim, oup, 1, 1, 0, bias=False), + ) + + def forward(self, x): + if self.use_res_connect: + return x + self.conv(x) + else: + return self.conv(x) + + +class MobileNetV2(nn.Module): + def __init__( + self, + n_class=1000, + input_size=224, + width_mult=1.0, + dropout_ratio=0.2, + use_batch_norm=True, + onnx_compatible=False, + ): + super(MobileNetV2, self).__init__() + block = InvertedResidual + input_channel = 32 + last_channel = 1280 + interverted_residual_setting = [ + # t, c, n, s + [1, 16, 1, 1], + [6, 24, 2, 2], + [6, 32, 3, 2], + [6, 64, 4, 2], + [6, 96, 3, 1], + [6, 160, 3, 2], + [6, 320, 1, 1], + ] + + # building first layer + assert input_size % 32 == 0 + input_channel = int(input_channel * width_mult) + self.last_channel = int(last_channel * width_mult) if width_mult > 1.0 else last_channel + self.features = [conv_bn(3, input_channel, 2, onnx_compatible=onnx_compatible)] + # building inverted residual blocks + for t, c, n, s in interverted_residual_setting: + output_channel = int(c * width_mult) + for i in range(n): + if i == 0: + self.features.append( + block( + input_channel, + output_channel, + s, + expand_ratio=t, + use_batch_norm=use_batch_norm, + onnx_compatible=onnx_compatible, + ) + ) + else: + self.features.append( + block( + input_channel, + output_channel, + 1, + expand_ratio=t, + use_batch_norm=use_batch_norm, + onnx_compatible=onnx_compatible, + ) + ) + input_channel = output_channel + # building last several layers + self.features.append( + conv_1x1_bn( + input_channel, self.last_channel, 
use_batch_norm=use_batch_norm, onnx_compatible=onnx_compatible + ) + ) + # make it nn.Sequential + self.features = nn.Sequential(*self.features) + + # building classifier + self.classifier = nn.Sequential( + nn.Dropout(dropout_ratio), + nn.Linear(self.last_channel, n_class), + ) + + self._initialize_weights() + + def forward(self, x): + x = self.features(x) + x = x.mean(3).mean(2) + x = self.classifier(x) + return x + + def _initialize_weights(self): + for m in self.modules(): + if isinstance(m, nn.Conv2d): + n = m.kernel_size[0] * m.kernel_size[1] * m.out_channels + m.weight.data.normal_(0, math.sqrt(2.0 / n)) + if m.bias is not None: + m.bias.data.zero_() + elif isinstance(m, nn.BatchNorm2d): + m.weight.data.fill_(1) + m.bias.data.zero_() + elif isinstance(m, nn.Linear): + n = m.weight.size(1) + m.weight.data.normal_(0, 0.01) + m.bias.data.zero_() diff --git a/modules/PytorchSSD/nn/mobilenetv3.py b/modules/PytorchSSD/nn/mobilenetv3.py new file mode 100644 index 0000000..d62df14 --- /dev/null +++ b/modules/PytorchSSD/nn/mobilenetv3.py @@ -0,0 +1,231 @@ +"""MobileNetV3 in PyTorch. + +See the paper "Inverted Residuals and Linear Bottlenecks: +Mobile Networks for Classification, Detection and Segmentation" for more details. 
+""" +import torch +import torch.nn as nn +import torch.nn.functional as F +from torch.nn import init + + +class hswish(nn.Module): + def forward(self, x): + out = x * F.relu6(x + 3, inplace=True) / 6 + return out + + +class hsigmoid(nn.Module): + def forward(self, x): + out = F.relu6(x + 3, inplace=True) / 6 + return out + + +class SeModule(nn.Module): + def __init__(self, in_size, reduction=4): + super(SeModule, self).__init__() + self.se = nn.Sequential( + nn.AdaptiveAvgPool2d(1), + nn.Conv2d(in_size, in_size // reduction, kernel_size=1, stride=1, padding=0, bias=False), + nn.BatchNorm2d(in_size // reduction), + nn.ReLU(inplace=True), + nn.Conv2d(in_size // reduction, in_size, kernel_size=1, stride=1, padding=0, bias=False), + nn.BatchNorm2d(in_size), + hsigmoid(), + ) + + def forward(self, x): + return x * self.se(x) + + +class Block(nn.Module): + """expand + depthwise + pointwise""" + + def __init__(self, kernel_size, in_size, expand_size, out_size, nolinear, semodule, stride): + super(Block, self).__init__() + self.stride = stride + self.se = semodule + + self.conv1 = nn.Conv2d(in_size, expand_size, kernel_size=1, stride=1, padding=0, bias=False) + self.bn1 = nn.BatchNorm2d(expand_size) + self.nolinear1 = nolinear + self.conv2 = nn.Conv2d( + expand_size, + expand_size, + kernel_size=kernel_size, + stride=stride, + padding=kernel_size // 2, + groups=expand_size, + bias=False, + ) + self.bn2 = nn.BatchNorm2d(expand_size) + self.nolinear2 = nolinear + self.conv3 = nn.Conv2d(expand_size, out_size, kernel_size=1, stride=1, padding=0, bias=False) + self.bn3 = nn.BatchNorm2d(out_size) + + self.shortcut = nn.Sequential() + if stride == 1 and in_size != out_size: + self.shortcut = nn.Sequential( + nn.Conv2d(in_size, out_size, kernel_size=1, stride=1, padding=0, bias=False), + nn.BatchNorm2d(out_size), + ) + + def forward(self, x): + out = self.nolinear1(self.bn1(self.conv1(x))) + out = self.nolinear2(self.bn2(self.conv2(out))) + out = self.bn3(self.conv3(out)) + if 
self.se != None: + out = self.se(out) + out = out + self.shortcut(x) if self.stride == 1 else out + return out + + +class MobileNetV3_Large(nn.Module): + def __init__(self, num_classes=1000): + super(MobileNetV3_Large, self).__init__() + + self.features = [] + + self.conv1 = nn.Conv2d(3, 16, kernel_size=3, stride=2, padding=1, bias=False) + self.features.append(self.conv1) + self.bn1 = nn.BatchNorm2d(16) + self.features.append(self.bn1) + self.hs1 = hswish() + self.features.append(self.hs1) + + self.bneck = nn.Sequential( + Block(3, 16, 16, 16, nn.ReLU(inplace=True), None, 1), + Block(3, 16, 64, 24, nn.ReLU(inplace=True), None, 2), + Block(3, 24, 72, 24, nn.ReLU(inplace=True), None, 1), + Block(5, 24, 72, 40, nn.ReLU(inplace=True), SeModule(40), 2), + Block(5, 40, 120, 40, nn.ReLU(inplace=True), SeModule(40), 1), + Block(5, 40, 120, 40, nn.ReLU(inplace=True), SeModule(40), 1), + Block(3, 40, 240, 80, hswish(), None, 2), + Block(3, 80, 200, 80, hswish(), None, 1), + Block(3, 80, 184, 80, hswish(), None, 1), + Block(3, 80, 184, 80, hswish(), None, 1), + Block(3, 80, 480, 112, hswish(), SeModule(112), 1), + Block(3, 112, 672, 112, hswish(), SeModule(112), 1), + Block(5, 112, 672, 160, hswish(), SeModule(160), 1), + Block(5, 160, 672, 160, hswish(), SeModule(160), 2), + Block(5, 160, 960, 160, hswish(), SeModule(160), 1), + ) + + self.features.extend([block for block in self.bneck]) + + self.conv2 = nn.Conv2d(160, 960, kernel_size=1, stride=1, padding=0, bias=False) + self.features.append(self.conv2) + self.bn2 = nn.BatchNorm2d(960) + self.features.append(self.bn2) + self.hs2 = hswish() + self.features.append(self.hs2) + + self.linear3 = nn.Linear(960, 1280) + self.bn3 = nn.BatchNorm1d(1280) + self.hs3 = hswish() + self.linear4 = nn.Linear(1280, num_classes) + self.init_params() + + self.features = nn.Sequential(*self.features) + + def init_params(self): + for m in self.modules(): + if isinstance(m, nn.Conv2d): + init.kaiming_normal_(m.weight, mode="fan_out") + if 
m.bias is not None: + init.constant_(m.bias, 0) + elif isinstance(m, nn.BatchNorm2d): + init.constant_(m.weight, 1) + init.constant_(m.bias, 0) + elif isinstance(m, nn.Linear): + init.normal_(m.weight, std=0.001) + if m.bias is not None: + init.constant_(m.bias, 0) + + def forward(self, x): + out = self.hs1(self.bn1(self.conv1(x))) + out = self.bneck(out) + out = self.hs2(self.bn2(self.conv2(out))) + out = F.avg_pool2d(out, 7) + out = out.view(out.size(0), -1) + out = self.hs3(self.bn3(self.linear3(out))) + out = self.linear4(out) + return out + + +class MobileNetV3_Small(nn.Module): + def __init__(self, num_classes=1000): + super(MobileNetV3_Small, self).__init__() + + self.features = [] + + self.conv1 = nn.Conv2d(3, 16, kernel_size=3, stride=2, padding=1, bias=False) + self.features.append(self.conv1) + self.bn1 = nn.BatchNorm2d(16) + self.features.append(self.bn1) + self.hs1 = hswish() + self.features.append(self.hs1) + + self.bneck = nn.Sequential( + Block(3, 16, 16, 16, nn.ReLU(inplace=True), SeModule(16), 2), + Block(3, 16, 72, 24, nn.ReLU(inplace=True), None, 2), + Block(3, 24, 88, 24, nn.ReLU(inplace=True), None, 1), + Block(5, 24, 96, 40, hswish(), SeModule(40), 2), + Block(5, 40, 240, 40, hswish(), SeModule(40), 1), + Block(5, 40, 240, 40, hswish(), SeModule(40), 1), + Block(5, 40, 120, 48, hswish(), SeModule(48), 1), + Block(5, 48, 144, 48, hswish(), SeModule(48), 1), + Block(5, 48, 288, 96, hswish(), SeModule(96), 2), + Block(5, 96, 576, 96, hswish(), SeModule(96), 1), + Block(5, 96, 576, 96, hswish(), SeModule(96), 1), + ) + + self.features.extend([block for block in self.bneck]) + + self.conv2 = nn.Conv2d(96, 576, kernel_size=1, stride=1, padding=0, bias=False) + self.features.append(self.conv2) + self.bn2 = nn.BatchNorm2d(576) + self.features.append(self.bn2) + self.hs2 = hswish() + self.features.append(self.hs2) + self.linear3 = nn.Linear(576, 1280) + self.bn3 = nn.BatchNorm1d(1280) + self.hs3 = hswish() + self.linear4 = nn.Linear(1280, num_classes) 
+        self.init_params()
+
+        self.features = nn.Sequential(*self.features)
+
+    def init_params(self):
+        for m in self.modules():
+            if isinstance(m, nn.Conv2d):
+                init.kaiming_normal_(m.weight, mode="fan_out")
+                if m.bias is not None:
+                    init.constant_(m.bias, 0)
+            elif isinstance(m, nn.BatchNorm2d):
+                init.constant_(m.weight, 1)
+                init.constant_(m.bias, 0)
+            elif isinstance(m, nn.Linear):
+                init.normal_(m.weight, std=0.001)
+                if m.bias is not None:
+                    init.constant_(m.bias, 0)
+
+    def forward(self, x):
+        out = self.hs1(self.bn1(self.conv1(x)))
+        out = self.bneck(out)
+        out = self.hs2(self.bn2(self.conv2(out)))
+        out = F.avg_pool2d(out, 7)
+        out = out.view(out.size(0), -1)
+        out = self.hs3(self.bn3(self.linear3(out)))
+        out = self.linear4(out)
+        return out
+
+
+def test():
+    net = MobileNetV3_Small()
+    x = torch.randn(2, 3, 224, 224)
+    y = net(x)
+    print(y.size())
+
+
+# test()
diff --git a/modules/PytorchSSD/nn/multibox_loss.py b/modules/PytorchSSD/nn/multibox_loss.py
new file mode 100644
index 0000000..e79db64
--- /dev/null
+++ b/modules/PytorchSSD/nn/multibox_loss.py
@@ -0,0 +1,46 @@
+import torch.nn as nn
+import torch.nn.functional as F
+import torch
+
+
+from ..utils import box_utils
+
+
+class MultiboxLoss(nn.Module):
+    def __init__(self, priors, iou_threshold, neg_pos_ratio, center_variance, size_variance, device):
+        """Implement SSD Multibox Loss.
+
+        Basically, Multibox loss combines classification loss
+        and Smooth L1 regression loss.
+        """
+        super(MultiboxLoss, self).__init__()
+        self.iou_threshold = iou_threshold
+        self.neg_pos_ratio = neg_pos_ratio
+        self.center_variance = center_variance
+        self.size_variance = size_variance
+        # BUGFIX: Tensor.to() is not in-place; keep its return value, otherwise priors stay on CPU.
+        self.priors = priors.to(device)
+
+    def forward(self, confidence, predicted_locations, labels, gt_locations):
+        """Compute classification loss and smooth l1 loss.
+
+        Args:
+            confidence (batch_size, num_priors, num_classes): class predictions.
+            locations (batch_size, num_priors, 4): predicted locations.
+            labels (batch_size, num_priors): real labels of all the priors.
+            boxes (batch_size, num_priors, 4): real boxes corresponding all the priors.
+        """
+        num_classes = confidence.size(2)
+        with torch.no_grad():
+            # derived from cross_entropy=sum(log(p))
+            loss = -F.log_softmax(confidence, dim=2)[:, :, 0]
+            mask = box_utils.hard_negative_mining(loss, labels, self.neg_pos_ratio)
+
+        confidence = confidence[mask, :]
+        classification_loss = F.cross_entropy(confidence.reshape(-1, num_classes), labels[mask], reduction="sum")  # size_average=False is deprecated
+        pos_mask = labels > 0
+        predicted_locations = predicted_locations[pos_mask, :].reshape(-1, 4)
+        gt_locations = gt_locations[pos_mask, :].reshape(-1, 4)
+        smooth_l1_loss = F.smooth_l1_loss(predicted_locations, gt_locations, reduction="sum")  # size_average=False is deprecated
+        num_pos = gt_locations.size(0)
+        return smooth_l1_loss / num_pos, classification_loss / num_pos
diff --git a/modules/PytorchSSD/nn/scaled_l2_norm.py b/modules/PytorchSSD/nn/scaled_l2_norm.py
new file mode 100644
index 0000000..6dba288
--- /dev/null
+++ b/modules/PytorchSSD/nn/scaled_l2_norm.py
@@ -0,0 +1,18 @@
+import torch.nn as nn
+import torch
+import torch.nn.functional as F
+
+
+class ScaledL2Norm(nn.Module):
+    def __init__(self, in_channels, initial_scale):
+        super(ScaledL2Norm, self).__init__()
+        self.in_channels = in_channels
+        self.scale = nn.Parameter(torch.Tensor(in_channels))
+        self.initial_scale = initial_scale
+        self.reset_parameters()
+
+    def forward(self, x):
+        return F.normalize(x, p=2, dim=1) * self.scale.unsqueeze(0).unsqueeze(2).unsqueeze(3)
+
+    def reset_parameters(self):
+        self.scale.data.fill_(self.initial_scale)
diff --git a/modules/PytorchSSD/nn/squeezenet.py b/modules/PytorchSSD/nn/squeezenet.py
new file mode 100644
index 0000000..712f1d8
--- /dev/null
+++ b/modules/PytorchSSD/nn/squeezenet.py
@@ -0,0 +1,120 @@
+import math
+import torch
+import torch.nn as nn
+import torch.nn.init as init
+import torch.utils.model_zoo as model_zoo
+
+
+__all__ = ["SqueezeNet", "squeezenet1_0", 
"squeezenet1_1"] + + +model_urls = { + "squeezenet1_0": "https://download.pytorch.org/models/squeezenet1_0-a815701f.pth", + "squeezenet1_1": "https://download.pytorch.org/models/squeezenet1_1-f364aa15.pth", +} + + +class Fire(nn.Module): + def __init__(self, inplanes, squeeze_planes, expand1x1_planes, expand3x3_planes): + super(Fire, self).__init__() + self.inplanes = inplanes + self.squeeze = nn.Conv2d(inplanes, squeeze_planes, kernel_size=1) + self.squeeze_activation = nn.ReLU(inplace=True) + self.expand1x1 = nn.Conv2d(squeeze_planes, expand1x1_planes, kernel_size=1) + self.expand1x1_activation = nn.ReLU(inplace=True) + self.expand3x3 = nn.Conv2d(squeeze_planes, expand3x3_planes, kernel_size=3, padding=1) + self.expand3x3_activation = nn.ReLU(inplace=True) + + def forward(self, x): + x = self.squeeze_activation(self.squeeze(x)) + return torch.cat( + [self.expand1x1_activation(self.expand1x1(x)), self.expand3x3_activation(self.expand3x3(x))], 1 + ) + + +class SqueezeNet(nn.Module): + def __init__(self, version=1.0, num_classes=1000): + super(SqueezeNet, self).__init__() + if version not in [1.0, 1.1]: + raise ValueError("Unsupported SqueezeNet version {version}:" "1.0 or 1.1 expected".format(version=version)) + self.num_classes = num_classes + if version == 1.0: + self.features = nn.Sequential( + nn.Conv2d(3, 96, kernel_size=7, stride=2), + nn.ReLU(inplace=True), + nn.MaxPool2d(kernel_size=3, stride=2, ceil_mode=True), + Fire(96, 16, 64, 64), + Fire(128, 16, 64, 64), + Fire(128, 32, 128, 128), + nn.MaxPool2d(kernel_size=3, stride=2, ceil_mode=True), + Fire(256, 32, 128, 128), + Fire(256, 48, 192, 192), + Fire(384, 48, 192, 192), + Fire(384, 64, 256, 256), + nn.MaxPool2d(kernel_size=3, stride=2, ceil_mode=True), + Fire(512, 64, 256, 256), + ) + else: + self.features = nn.Sequential( + nn.Conv2d(3, 64, kernel_size=3, stride=2), + nn.ReLU(inplace=True), + nn.MaxPool2d(kernel_size=3, stride=2), + Fire(64, 16, 64, 64), + Fire(128, 16, 64, 64), + 
nn.MaxPool2d(kernel_size=3, stride=2), + Fire(128, 32, 128, 128), + Fire(256, 32, 128, 128), + nn.MaxPool2d(kernel_size=3, stride=2), + Fire(256, 48, 192, 192), + Fire(384, 48, 192, 192), + Fire(384, 64, 256, 256), + Fire(512, 64, 256, 256), + ) + # Final convolution is initialized differently form the rest + final_conv = nn.Conv2d(512, self.num_classes, kernel_size=1) + self.classifier = nn.Sequential( + nn.Dropout(p=0.5), final_conv, nn.ReLU(inplace=True), nn.AvgPool2d(13, stride=1) + ) + + for m in self.modules(): + if isinstance(m, nn.Conv2d): + if m is final_conv: + init.normal_(m.weight, mean=0.0, std=0.01) + else: + init.kaiming_uniform_(m.weight) + if m.bias is not None: + init.constant_(m.bias, 0) + + def forward(self, x): + x = self.features(x) + x = self.classifier(x) + return x.view(x.size(0), self.num_classes) + + +def squeezenet1_0(pretrained=False, **kwargs): + r"""SqueezeNet model architecture from the `"SqueezeNet: AlexNet-level + accuracy with 50x fewer parameters and <0.5MB model size" + `_ paper. + + Args: + pretrained (bool): If True, returns a model pre-trained on ImageNet + """ + model = SqueezeNet(version=1.0, **kwargs) + if pretrained: + model.load_state_dict(model_zoo.load_url(model_urls["squeezenet1_0"])) + return model + + +def squeezenet1_1(pretrained=False, **kwargs): + r"""SqueezeNet 1.1 model from the `official SqueezeNet repo + `_. + SqueezeNet 1.1 has 2.4x less computation and slightly fewer parameters + than SqueezeNet 1.0, without sacrificing accuracy. 
+ + Args: + pretrained (bool): If True, returns a model pre-trained on ImageNet + """ + model = SqueezeNet(version=1.1, **kwargs) + if pretrained: + model.load_state_dict(model_zoo.load_url(model_urls["squeezenet1_1"])) + return model diff --git a/modules/PytorchSSD/nn/vgg.py b/modules/PytorchSSD/nn/vgg.py new file mode 100644 index 0000000..1428951 --- /dev/null +++ b/modules/PytorchSSD/nn/vgg.py @@ -0,0 +1,24 @@ +import torch.nn as nn + + +# borrowed from https://github.com/amdegroot/ssd.pytorch/blob/master/ssd.py +def vgg(cfg, batch_norm=False): + layers = [] + in_channels = 3 + for v in cfg: + if v == "M": + layers += [nn.MaxPool2d(kernel_size=2, stride=2)] + elif v == "C": + layers += [nn.MaxPool2d(kernel_size=2, stride=2, ceil_mode=True)] + else: + conv2d = nn.Conv2d(in_channels, v, kernel_size=3, padding=1) + if batch_norm: + layers += [conv2d, nn.BatchNorm2d(v), nn.ReLU(inplace=True)] + else: + layers += [conv2d, nn.ReLU(inplace=True)] + in_channels = v + pool5 = nn.MaxPool2d(kernel_size=3, stride=1, padding=1) + conv6 = nn.Conv2d(512, 1024, kernel_size=3, padding=6, dilation=6) + conv7 = nn.Conv2d(1024, 1024, kernel_size=1) + layers += [pool5, conv6, nn.ReLU(inplace=True), conv7, nn.ReLU(inplace=True)] + return layers diff --git a/modules/PytorchSSD/prunning/prunner.py b/modules/PytorchSSD/prunning/prunner.py new file mode 100644 index 0000000..c890af0 --- /dev/null +++ b/modules/PytorchSSD/prunning/prunner.py @@ -0,0 +1,248 @@ +import torch +import torch.nn as nn +import logging +from heapq import nsmallest + +from ..utils.model_book import ModelBook + + +class ModelPrunner: + def __init__(self, model, train_fun, ignored_paths=[]): + """Implement the pruning algorithm described in the paper https://arxiv.org/pdf/1611.06440.pdf . + + The prunning criteria is dC/dh * h, while C is the cost, h is the activation. 
+ """ + self.model = model + self.train_fun = train_fun + self.ignored_paths = ignored_paths + self.book = ModelBook(self.model) + self.outputs = {} + self.grads = {} + self.handles = [] + self.decendent_batch_norms = {} # descendants impacted by the conv layers. + self.last_conv_path = None # used to trace the graph + self.descendent_convs = {} # descendants impacted by the conv layers. + self.descendent_linears = {} # descendants impacted by the linear layers. + self.last_linear_path = None # used to trace the graph + + def _make_new_conv(self, conv, filter_index, channel_type="out"): + if not isinstance(conv, nn.Conv2d): + raise TypeError(f"The module is not Conv2d, but {type(conv)}.") + + if channel_type == "out": + new_conv = nn.Conv2d( + conv.in_channels, + conv.out_channels - 1, + conv.kernel_size, + conv.stride, + conv.padding, + conv.dilation, + conv.groups, + conv.bias is not None, + ) + mask = torch.ones(conv.out_channels, dtype=torch.uint8) + mask[filter_index] = 0 + new_conv.weight.data = conv.weight.data[mask, :, :, :] + if conv.bias is not None: + new_conv.bias.data = conv.bias.data[mask] + + elif channel_type == "in": + new_conv = nn.Conv2d( + conv.in_channels - 1, + conv.out_channels, + conv.kernel_size, + conv.stride, + conv.padding, + conv.dilation, + conv.groups, + conv.bias is not None, + ) + mask = torch.ones(conv.in_channels, dtype=torch.uint8) + mask[filter_index] = 0 + new_conv.weight.data = conv.weight.data[:, mask, :, :] + if conv.bias is not None: + new_conv.bias.data = conv.bias.data + else: + raise ValueError(f"{channel_type} should be either 'in' or 'out'.") + return new_conv + + def remove_conv_filter(self, path, filter_index): + conv = self.book.get_module(path) + logging.info(f'Prune Conv: {"/".join(path)}, Filter: {filter_index}, Layer: {conv}') + new_conv = self._make_new_conv(conv, filter_index, channel_type="out") + self._update_model(path, new_conv) + + next_conv_path = self.descendent_convs.get(path) + if next_conv_path: + 
next_conv = self.book.get_module(next_conv_path) + new_next_conv = self._make_new_conv(next_conv, filter_index, channel_type="in") + self._update_model(next_conv_path, new_next_conv) + + # reduce the num_features of batch norm + batch_norm_path = self.decendent_batch_norms.get(path) + if batch_norm_path: + batch_norm = self.book.get_module(batch_norm_path) + new_batch_norm = nn.BatchNorm2d(batch_norm.num_features - 1) + self._update_model(batch_norm_path, new_batch_norm) + + # reduce the in channels of linear layer + linear_path = self.descendent_linears.get(path) + if linear_path: + linear = self.book.get_module(linear_path) + new_linear = self._make_new_linear(linear, filter_index, conv, channel_type="in") + self._update_model(linear_path, new_linear) + + @staticmethod + def _make_new_linear(linear, feature_index, conv=None, channel_type="out"): + if channel_type == "out": + new_linear = nn.Linear(linear.in_features, linear.out_features - 1, bias=linear.bias is not None) + mask = torch.ones(linear.out_features, dtype=torch.uint8) + mask[feature_index] = 0 + new_linear.weight.data = linear.weight.data[mask, :] + if linear.bias is not None: + new_linear.bias.data = linear.bias.data[mask] + elif channel_type == "in": + if conv: + block = int(linear.in_features / conv.out_channels) + else: + block = 1 + new_linear = nn.Linear(linear.in_features - block, linear.out_features, bias=linear.bias is not None) + start_index = feature_index * block + end_index = (feature_index + 1) * block + mask = torch.ones(linear.in_features, dtype=torch.uint8) + mask[start_index:end_index] = 0 + new_linear.weight.data = linear.weight.data[:, mask] + if linear.bias is not None: + new_linear.bias.data = linear.bias.data + else: + raise ValueError(f"{channel_type} should be either 'in' or 'out'.") + return new_linear + + def prune_conv_layers(self, num=1): + """Prune one conv2d filter.""" + self.register_conv_hooks() + before_loss, before_accuracy = self.train_fun(self.model) + ranks = [] + 
for path, output in self.outputs.items(): + output = output.data + grad = self.grads[path].data + v = grad * output + v = v.sum(0).sum(1).sum(1) # sum to the channel axis. + v = torch.abs(v) + v = v / torch.sqrt(torch.sum(v * v)) # normalize + for i, e in enumerate(v): + ranks.append((path, i, e)) + to_prune = nsmallest(num, ranks, key=lambda t: t[2]) + to_prune = sorted( + to_prune, key=lambda t: (t[0], -t[1]) + ) # prune the filters with bigger indexes first to avoid rearrangement. + for path, filter_index, value in to_prune: + self.remove_conv_filter(path, filter_index) + self.deregister_hooks() + after_loss, after_accuracy = self.train_fun(self.model) + return after_loss - before_loss, after_accuracy - before_accuracy + + def register_conv_hooks(self): + """Run register before training for pruning.""" + self.outputs.clear() + self.grads.clear() + self.handles.clear() + self.last_conv_path = None + self.decendent_batch_norms.clear() + self.descendent_convs.clear() + self.descendent_linears.clear() + + def forward_hook(m, input, output): + path = self.book.get_path(m) + if isinstance(m, nn.Conv2d): + if path not in self.ignored_paths: + self.outputs[path] = output + if self.last_conv_path: + self.descendent_convs[self.last_conv_path] = path + self.last_conv_path = path + elif isinstance(m, nn.BatchNorm2d): + if self.last_conv_path: + self.decendent_batch_norms[self.last_conv_path] = path + elif isinstance(m, nn.Linear): + if self.last_conv_path: + self.descendent_linears[self.last_conv_path] = path + self.last_conv_path = None # after a linear layer the conv layer doesn't matter + + def backward_hook(m, input, output): + path = self.book.get_path(m) + self.grads[path] = output[0] + + for path, m in self.book.modules(module_type=(nn.Conv2d, nn.BatchNorm2d, nn.Linear)): + h = m.register_forward_hook(forward_hook) + self.handles.append(h) + h = m.register_backward_hook(backward_hook) + self.handles.append(h) + + def deregister_hooks(self): + """Run degresiter before 
retraining to recover the model""" + for handle in self.handles: + handle.remove() + + def prune_linear_layers(self, num=1): + self.register_linear_hooks() + before_loss, before_accuracy = self.train_fun(self.model) + ranks = [] + for path, output in self.outputs.items(): + output = output.data + grad = self.grads[path].data + v = grad * output + v = v.sum(0) # sum to the channel axis. + v = torch.abs(v) + v = v / torch.sqrt(torch.sum(v * v)) # normalize + for i, e in enumerate(v): + ranks.append((path, i, e)) + to_prune = nsmallest(num, ranks, key=lambda t: t[2]) + to_prune = sorted(to_prune, key=lambda t: (t[0], -t[1])) + for path, feature_index, value in to_prune: + self.remove_linear_feature(path, feature_index) + self.deregister_hooks() + after_loss, after_accuracy = self.train_fun(self.model) + return after_loss - before_loss, after_accuracy - before_accuracy + + def register_linear_hooks(self): + self.outputs.clear() + self.grads.clear() + self.handles.clear() + self.descendent_linears.clear() + self.last_linear_path = None + + def forward_hook(m, input, output): + path = self.book.get_path(m) + if path not in self.ignored_paths: + self.outputs[path] = output + if self.last_linear_path: + self.descendent_linears[self.last_linear_path] = path + self.last_linear_path = path + + def backward_hook(m, input, output): + path = self.book.get_path(m) + self.grads[path] = output[0] + + for _, m in self.book.linear_modules(): + h = m.register_forward_hook(forward_hook) + self.handles.append(h) + h = m.register_backward_hook(backward_hook) + self.handles.append(h) + + def remove_linear_feature(self, path, feature_index): + linear = self.book.get_module(path) + logging.info(f'Prune Linear: {"/".join(path)}, Filter: {feature_index}, Layer: {linear}') + new_linear = self._make_new_linear(linear, feature_index, channel_type="out") + self._update_model(path, new_linear) + + # update following linear layers + next_linear_path = self.descendent_linears.get(path) + if 
next_linear_path: + next_linear = self.book.get_module(next_linear_path) + new_next_linear = self._make_new_linear(next_linear, feature_index, channel_type="in") + self._update_model(next_linear_path, new_next_linear) + + def _update_model(self, path, module): + parent = self.book.get_module(path[:-1]) + parent._modules[path[-1]] = module + self.book.update(path, module) diff --git a/modules/PytorchSSD/ssd/config/mobilenetv1_ssd_config.py b/modules/PytorchSSD/ssd/config/mobilenetv1_ssd_config.py new file mode 100644 index 0000000..07781a8 --- /dev/null +++ b/modules/PytorchSSD/ssd/config/mobilenetv1_ssd_config.py @@ -0,0 +1,23 @@ +import numpy as np + +from modules.PytorchSSD.utils.box_utils import SSDSpec, SSDBoxSizes, generate_ssd_priors + + +image_size = 300 +image_mean = np.array([127, 127, 127]) # RGB layout +image_std = 128.0 +iou_threshold = 0.45 +center_variance = 0.1 +size_variance = 0.2 + +specs = [ + SSDSpec(19, 16, SSDBoxSizes(60, 105), [2, 3]), + SSDSpec(10, 32, SSDBoxSizes(105, 150), [2, 3]), + SSDSpec(5, 64, SSDBoxSizes(150, 195), [2, 3]), + SSDSpec(3, 100, SSDBoxSizes(195, 240), [2, 3]), + SSDSpec(2, 150, SSDBoxSizes(240, 285), [2, 3]), + SSDSpec(1, 300, SSDBoxSizes(285, 330), [2, 3]), +] + + +priors = generate_ssd_priors(specs, image_size) diff --git a/modules/PytorchSSD/ssd/config/squeezenet_ssd_config.py b/modules/PytorchSSD/ssd/config/squeezenet_ssd_config.py new file mode 100644 index 0000000..111383c --- /dev/null +++ b/modules/PytorchSSD/ssd/config/squeezenet_ssd_config.py @@ -0,0 +1,23 @@ +import numpy as np + +from vision.utils.box_utils import SSDSpec, SSDBoxSizes, generate_ssd_priors + + +image_size = 300 +image_mean = np.array([127, 127, 127]) # RGB layout +image_std = 128.0 +iou_threshold = 0.45 +center_variance = 0.1 +size_variance = 0.2 + +specs = [ + SSDSpec(17, 16, SSDBoxSizes(60, 105), [2, 3]), + SSDSpec(10, 32, SSDBoxSizes(105, 150), [2, 3]), + SSDSpec(5, 64, SSDBoxSizes(150, 195), [2, 3]), + SSDSpec(3, 100, SSDBoxSizes(195, 240), 
[2, 3]), + SSDSpec(2, 150, SSDBoxSizes(240, 285), [2, 3]), + SSDSpec(1, 300, SSDBoxSizes(285, 330), [2, 3]) +] + + +priors = generate_ssd_priors(specs, image_size) \ No newline at end of file diff --git a/modules/PytorchSSD/ssd/config/vgg_ssd_config.py b/modules/PytorchSSD/ssd/config/vgg_ssd_config.py new file mode 100644 index 0000000..a4d3de6 --- /dev/null +++ b/modules/PytorchSSD/ssd/config/vgg_ssd_config.py @@ -0,0 +1,24 @@ +import numpy as np + +from vision.utils.box_utils import SSDSpec, SSDBoxSizes, generate_ssd_priors + + +image_size = 300 +image_mean = np.array([123, 117, 104]) # RGB layout +image_std = 1.0 + +iou_threshold = 0.45 +center_variance = 0.1 +size_variance = 0.2 + +specs = [ + SSDSpec(38, 8, SSDBoxSizes(30, 60), [2]), + SSDSpec(19, 16, SSDBoxSizes(60, 111), [2, 3]), + SSDSpec(10, 32, SSDBoxSizes(111, 162), [2, 3]), + SSDSpec(5, 64, SSDBoxSizes(162, 213), [2, 3]), + SSDSpec(3, 100, SSDBoxSizes(213, 264), [2]), + SSDSpec(1, 300, SSDBoxSizes(264, 315), [2]) +] + + +priors = generate_ssd_priors(specs, image_size) \ No newline at end of file diff --git a/modules/PytorchSSD/ssd/data_preprocessing.py b/modules/PytorchSSD/ssd/data_preprocessing.py new file mode 100644 index 0000000..e138005 --- /dev/null +++ b/modules/PytorchSSD/ssd/data_preprocessing.py @@ -0,0 +1,69 @@ +from ..transforms.transforms import * + + +class ScaleByStd: + def __init__(self, std: float): + self.std = std + + def __call__(self, img, boxes=None, labels=None): + return (img / self.std, boxes, labels) + + +class TrainAugmentation: + def __init__(self, size, mean=0, std=1.0): + """ + Args: + size: the size the of final image. + mean: mean pixel value per channel. 
+ """ + self.mean = mean + self.size = size + self.augment = Compose( + [ + ConvertFromInts(), + PhotometricDistort(), + Expand(self.mean), + RandomSampleCrop(), + RandomMirror(), + ToPercentCoords(), + Resize(self.size), + SubtractMeans(self.mean), + ScaleByStd(std), + ToTensor(), + ] + ) + + def __call__(self, img, boxes, labels): + """ + + Args: + img: the output of cv.imread in RGB layout. + boxes: boundding boxes in the form of (x1, y1, x2, y2). + labels: labels of boxes. + """ + return self.augment(img, boxes, labels) + + +class TestTransform: + def __init__(self, size, mean=0.0, std=1.0): + self.transform = Compose( + [ + ToPercentCoords(), + Resize(size), + SubtractMeans(mean), + ScaleByStd(std), + ToTensor(), + ] + ) + + def __call__(self, image, boxes, labels): + return self.transform(image, boxes, labels) + + +class PredictionTransform: + def __init__(self, size, mean=0.0, std=1.0): + self.transform = Compose([Resize(size), SubtractMeans(mean), ScaleByStd(std), ToTensor()]) + + def __call__(self, image): + image, _, _ = self.transform(image) + return image diff --git a/modules/PytorchSSD/ssd/fpn_mobilenetv1_ssd.py b/modules/PytorchSSD/ssd/fpn_mobilenetv1_ssd.py new file mode 100644 index 0000000..0040025 --- /dev/null +++ b/modules/PytorchSSD/ssd/fpn_mobilenetv1_ssd.py @@ -0,0 +1,91 @@ +import torch +from torch.nn import Conv2d, Sequential, ModuleList, ReLU +from ..nn.mobilenet import MobileNetV1 + +from .fpn_ssd import FPNSSD +from .predictor import Predictor +from .config import mobilenetv1_ssd_config as config + + +def create_fpn_mobilenetv1_ssd(num_classes): + base_net = MobileNetV1(1001).features # disable dropout layer + + source_layer_indexes = [ + (69, Conv2d(in_channels=512, out_channels=256, kernel_size=1)), + (len(base_net), Conv2d(in_channels=1024, out_channels=256, kernel_size=1)), + ] + extras = ModuleList( + [ + Sequential( + Conv2d(in_channels=1024, out_channels=256, kernel_size=1), + ReLU(), + Conv2d(in_channels=256, out_channels=256, 
kernel_size=3, stride=2, padding=1), + ReLU(), + ), + Sequential( + Conv2d(in_channels=256, out_channels=128, kernel_size=1), + ReLU(), + Conv2d(in_channels=128, out_channels=256, kernel_size=3, stride=2, padding=1), + ReLU(), + ), + Sequential( + Conv2d(in_channels=256, out_channels=128, kernel_size=1), + ReLU(), + Conv2d(in_channels=128, out_channels=256, kernel_size=3, stride=2, padding=1), + ReLU(), + ), + Sequential( + Conv2d(in_channels=256, out_channels=128, kernel_size=1), + ReLU(), + Conv2d(in_channels=128, out_channels=256, kernel_size=3, stride=2, padding=1), + ReLU(), + ), + ] + ) + + regression_headers = ModuleList( + [ + Conv2d(in_channels=256, out_channels=6 * 4, kernel_size=3, padding=1), + Conv2d(in_channels=256, out_channels=6 * 4, kernel_size=3, padding=1), + Conv2d(in_channels=256, out_channels=6 * 4, kernel_size=3, padding=1), + Conv2d(in_channels=256, out_channels=6 * 4, kernel_size=3, padding=1), + Conv2d(in_channels=256, out_channels=6 * 4, kernel_size=3, padding=1), + Conv2d( + in_channels=256, out_channels=6 * 4, kernel_size=3, padding=1 + ), # TODO: change to kernel_size=1, padding=0? + ] + ) + + classification_headers = ModuleList( + [ + Conv2d(in_channels=256, out_channels=6 * num_classes, kernel_size=3, padding=1), + Conv2d(in_channels=256, out_channels=6 * num_classes, kernel_size=3, padding=1), + Conv2d(in_channels=256, out_channels=6 * num_classes, kernel_size=3, padding=1), + Conv2d(in_channels=256, out_channels=6 * num_classes, kernel_size=3, padding=1), + Conv2d(in_channels=256, out_channels=6 * num_classes, kernel_size=3, padding=1), + Conv2d( + in_channels=256, out_channels=6 * num_classes, kernel_size=3, padding=1 + ), # TODO: change to kernel_size=1, padding=0? 
+ ] + ) + + return FPNSSD(num_classes, base_net, source_layer_indexes, extras, classification_headers, regression_headers) + + +def create_fpn_mobilenetv1_ssd_predictor( + net, candidate_size=200, nms_method=None, sigma=0.5, device=torch.device("cpu") +): + predictor = Predictor( + net, + config.image_size, + config.image_mean, + config.priors, + config.center_variance, + config.size_variance, + nms_method=nms_method, + iou_threshold=config.iou_threshold, + candidate_size=candidate_size, + sigma=sigma, + device=device, + ) + return predictor diff --git a/modules/PytorchSSD/ssd/fpn_ssd.py b/modules/PytorchSSD/ssd/fpn_ssd.py new file mode 100644 index 0000000..cfa1b26 --- /dev/null +++ b/modules/PytorchSSD/ssd/fpn_ssd.py @@ -0,0 +1,148 @@ +import torch.nn as nn +import torch +import torch.nn.functional as F +import numpy as np +from typing import List, Tuple + +from ..utils import box_utils + + +class FPNSSD(nn.Module): + def __init__( + self, + num_classes: int, + base_net: nn.ModuleList, + source_layer_indexes: List[int], + extras: nn.ModuleList, + classification_headers: nn.ModuleList, + regression_headers: nn.ModuleList, + upsample_mode="nearest", + ): + """Compose a SSD model using the given components.""" + super(FPNSSD, self).__init__() + + self.num_classes = num_classes + self.base_net = base_net + self.source_layer_indexes = source_layer_indexes + self.extras = extras + self.classification_headers = classification_headers + self.regression_headers = regression_headers + self.upsample_mode = upsample_mode + + # register layers in source_layer_indexes by adding them to a module list + self.source_layer_add_ons = nn.ModuleList([t[1] for t in source_layer_indexes if isinstance(t, tuple)]) + self.upsamplers = [ + nn.Upsample(size=(19, 19), mode="bilinear"), + nn.Upsample(size=(10, 10), mode="bilinear"), + nn.Upsample(size=(5, 5), mode="bilinear"), + nn.Upsample(size=(3, 3), mode="bilinear"), + nn.Upsample(size=(2, 2), mode="bilinear"), + ] + + def forward(self, 
x: torch.Tensor) -> Tuple[torch.Tensor, torch.Tensor]: + confidences = [] + locations = [] + start_layer_index = 0 + header_index = 0 + features = [] + for end_layer_index in self.source_layer_indexes: + if isinstance(end_layer_index, tuple): + added_layer = end_layer_index[1] + end_layer_index = end_layer_index[0] + else: + added_layer = None + for layer in self.base_net[start_layer_index:end_layer_index]: + x = layer(x) + start_layer_index = end_layer_index + if added_layer: + y = added_layer(x) + else: + y = x + # confidence, location = self.compute_header(header_index, y) + features.append(y) + header_index += 1 + # confidences.append(confidence) + # locations.append(location) + + for layer in self.base_net[end_layer_index:]: + x = layer(x) + + for layer in self.extras: + x = layer(x) + # confidence, location = self.compute_header(header_index, x) + features.append(x) + header_index += 1 + # confidences.append(confidence) + # locations.append(location) + + upstream_feature = None + for i in range(len(features) - 1, -1, -1): + feature = features[i] + if upstream_feature is not None: + upstream_feature = self.upsamplers[i](upstream_feature) + upstream_feature += feature + else: + upstream_feature = feature + confidence, location = self.compute_header(i, upstream_feature) + confidences.append(confidence) + locations.append(location) + confidences = torch.cat(confidences, 1) + locations = torch.cat(locations, 1) + return confidences, locations + + def compute_header(self, i, x): + confidence = self.classification_headers[i](x) + confidence = confidence.permute(0, 2, 3, 1).contiguous() + confidence = confidence.view(confidence.size(0), -1, self.num_classes) + + location = self.regression_headers[i](x) + location = location.permute(0, 2, 3, 1).contiguous() + location = location.view(location.size(0), -1, 4) + + return confidence, location + + def init_from_base_net(self, model): + self.base_net.load_state_dict(torch.load(model, map_location=lambda storage, loc: 
storage), strict=False) + self.source_layer_add_ons.apply(_xavier_init_) + self.extras.apply(_xavier_init_) + self.classification_headers.apply(_xavier_init_) + self.regression_headers.apply(_xavier_init_) + + def init(self): + self.base_net.apply(_xavier_init_) + self.source_layer_add_ons.apply(_xavier_init_) + self.extras.apply(_xavier_init_) + self.classification_headers.apply(_xavier_init_) + self.regression_headers.apply(_xavier_init_) + + def load(self, model): + self.load_state_dict(torch.load(model, map_location=lambda storage, loc: storage)) + + def save(self, model_path): + torch.save(self.state_dict(), model_path) + + +class MatchPrior(object): + def __init__(self, center_form_priors, center_variance, size_variance, iou_threshold): + self.center_form_priors = center_form_priors + self.corner_form_priors = box_utils.center_form_to_corner_form(center_form_priors) + self.center_variance = center_variance + self.size_variance = size_variance + self.iou_threshold = iou_threshold + + def __call__(self, gt_boxes, gt_labels): + if type(gt_boxes) is np.ndarray: + gt_boxes = torch.from_numpy(gt_boxes) + if type(gt_labels) is np.ndarray: + gt_labels = torch.from_numpy(gt_labels) + boxes, labels = box_utils.assign_priors(gt_boxes, gt_labels, self.corner_form_priors, self.iou_threshold) + boxes = box_utils.corner_form_to_center_form(boxes) + locations = box_utils.convert_boxes_to_locations( + boxes, self.center_form_priors, self.center_variance, self.size_variance + ) + return locations, labels + + +def _xavier_init_(m: nn.Module): + if isinstance(m, nn.Conv2d): + nn.init.xavier_uniform_(m.weight) diff --git a/modules/PytorchSSD/ssd/mobilenet_v2_ssd_lite.py b/modules/PytorchSSD/ssd/mobilenet_v2_ssd_lite.py new file mode 100644 index 0000000..ffa2a1d --- /dev/null +++ b/modules/PytorchSSD/ssd/mobilenet_v2_ssd_lite.py @@ -0,0 +1,97 @@ +import torch +from torch.nn import Conv2d, Sequential, ModuleList, BatchNorm2d +from torch import nn +from ..nn.mobilenet_v2 import 
MobileNetV2, InvertedResidual + +from .ssd import SSD, GraphPath +from .predictor import Predictor +from .config import mobilenetv1_ssd_config as config + + +def SeperableConv2d(in_channels, out_channels, kernel_size=1, stride=1, padding=0, onnx_compatible=False): + """Replace Conv2d with a depthwise Conv2d and Pointwise Conv2d.""" + ReLU = nn.ReLU if onnx_compatible else nn.ReLU6 + return Sequential( + Conv2d( + in_channels=in_channels, + out_channels=in_channels, + kernel_size=kernel_size, + groups=in_channels, + stride=stride, + padding=padding, + ), + BatchNorm2d(in_channels), + ReLU(), + Conv2d(in_channels=in_channels, out_channels=out_channels, kernel_size=1), + ) + + +def create_mobilenetv2_ssd_lite(num_classes, width_mult=1.0, use_batch_norm=True, onnx_compatible=False, is_test=False): + base_net = MobileNetV2( + width_mult=width_mult, use_batch_norm=use_batch_norm, onnx_compatible=onnx_compatible + ).features + + source_layer_indexes = [ + GraphPath(14, "conv", 3), + 19, + ] + extras = ModuleList( + [ + InvertedResidual(1280, 512, stride=2, expand_ratio=0.2), + InvertedResidual(512, 256, stride=2, expand_ratio=0.25), + InvertedResidual(256, 256, stride=2, expand_ratio=0.5), + InvertedResidual(256, 64, stride=2, expand_ratio=0.25), + ] + ) + + regression_headers = ModuleList( + [ + SeperableConv2d( + in_channels=round(576 * width_mult), out_channels=6 * 4, kernel_size=3, padding=1, onnx_compatible=False + ), + SeperableConv2d(in_channels=1280, out_channels=6 * 4, kernel_size=3, padding=1, onnx_compatible=False), + SeperableConv2d(in_channels=512, out_channels=6 * 4, kernel_size=3, padding=1, onnx_compatible=False), + SeperableConv2d(in_channels=256, out_channels=6 * 4, kernel_size=3, padding=1, onnx_compatible=False), + SeperableConv2d(in_channels=256, out_channels=6 * 4, kernel_size=3, padding=1, onnx_compatible=False), + Conv2d(in_channels=64, out_channels=6 * 4, kernel_size=1), + ] + ) + + classification_headers = ModuleList( + [ + SeperableConv2d( + 
in_channels=round(576 * width_mult), out_channels=6 * num_classes, kernel_size=3, padding=1 + ), + SeperableConv2d(in_channels=1280, out_channels=6 * num_classes, kernel_size=3, padding=1), + SeperableConv2d(in_channels=512, out_channels=6 * num_classes, kernel_size=3, padding=1), + SeperableConv2d(in_channels=256, out_channels=6 * num_classes, kernel_size=3, padding=1), + SeperableConv2d(in_channels=256, out_channels=6 * num_classes, kernel_size=3, padding=1), + Conv2d(in_channels=64, out_channels=6 * num_classes, kernel_size=1), + ] + ) + + return SSD( + num_classes, + base_net, + source_layer_indexes, + extras, + classification_headers, + regression_headers, + is_test=is_test, + config=config, + ) + + +def create_mobilenetv2_ssd_lite_predictor(net, candidate_size=200, nms_method=None, sigma=0.5, device=None): + predictor = Predictor( + net, + config.image_size, + config.image_mean, + config.image_std, + nms_method=nms_method, + iou_threshold=config.iou_threshold, + candidate_size=candidate_size, + sigma=sigma, + device=device, + ) + return predictor diff --git a/modules/PytorchSSD/ssd/mobilenetv1_ssd.py b/modules/PytorchSSD/ssd/mobilenetv1_ssd.py new file mode 100644 index 0000000..4496d1d --- /dev/null +++ b/modules/PytorchSSD/ssd/mobilenetv1_ssd.py @@ -0,0 +1,96 @@ +import torch +from torch.nn import Conv2d, Sequential, ModuleList, ReLU +from ..nn.mobilenet import MobileNetV1 + +from .ssd import SSD +from .predictor import Predictor +from .config import mobilenetv1_ssd_config as config + + +def create_mobilenetv1_ssd(num_classes, is_test=False): + base_net = MobileNetV1(1001).model # disable dropout layer + + source_layer_indexes = [ + 12, + 14, + ] + extras = ModuleList( + [ + Sequential( + Conv2d(in_channels=1024, out_channels=256, kernel_size=1), + ReLU(), + Conv2d(in_channels=256, out_channels=512, kernel_size=3, stride=2, padding=1), + ReLU(), + ), + Sequential( + Conv2d(in_channels=512, out_channels=128, kernel_size=1), + ReLU(), + 
Conv2d(in_channels=128, out_channels=256, kernel_size=3, stride=2, padding=1), + ReLU(), + ), + Sequential( + Conv2d(in_channels=256, out_channels=128, kernel_size=1), + ReLU(), + Conv2d(in_channels=128, out_channels=256, kernel_size=3, stride=2, padding=1), + ReLU(), + ), + Sequential( + Conv2d(in_channels=256, out_channels=128, kernel_size=1), + ReLU(), + Conv2d(in_channels=128, out_channels=256, kernel_size=3, stride=2, padding=1), + ReLU(), + ), + ] + ) + + regression_headers = ModuleList( + [ + Conv2d(in_channels=512, out_channels=6 * 4, kernel_size=3, padding=1), + Conv2d(in_channels=1024, out_channels=6 * 4, kernel_size=3, padding=1), + Conv2d(in_channels=512, out_channels=6 * 4, kernel_size=3, padding=1), + Conv2d(in_channels=256, out_channels=6 * 4, kernel_size=3, padding=1), + Conv2d(in_channels=256, out_channels=6 * 4, kernel_size=3, padding=1), + Conv2d( + in_channels=256, out_channels=6 * 4, kernel_size=3, padding=1 + ), # TODO: change to kernel_size=1, padding=0? + ] + ) + + classification_headers = ModuleList( + [ + Conv2d(in_channels=512, out_channels=6 * num_classes, kernel_size=3, padding=1), + Conv2d(in_channels=1024, out_channels=6 * num_classes, kernel_size=3, padding=1), + Conv2d(in_channels=512, out_channels=6 * num_classes, kernel_size=3, padding=1), + Conv2d(in_channels=256, out_channels=6 * num_classes, kernel_size=3, padding=1), + Conv2d(in_channels=256, out_channels=6 * num_classes, kernel_size=3, padding=1), + Conv2d( + in_channels=256, out_channels=6 * num_classes, kernel_size=3, padding=1 + ), # TODO: change to kernel_size=1, padding=0? 
+ ] + ) + + return SSD( + num_classes, + base_net, + source_layer_indexes, + extras, + classification_headers, + regression_headers, + is_test=is_test, + config=config, + ) + + +def create_mobilenetv1_ssd_predictor(net, candidate_size=200, nms_method=None, sigma=0.5, device=None): + predictor = Predictor( + net, + config.image_size, + config.image_mean, + config.image_std, + nms_method=nms_method, + iou_threshold=config.iou_threshold, + candidate_size=candidate_size, + sigma=sigma, + device=device, + ) + return predictor diff --git a/modules/PytorchSSD/ssd/mobilenetv1_ssd_lite.py b/modules/PytorchSSD/ssd/mobilenetv1_ssd_lite.py new file mode 100644 index 0000000..f496623 --- /dev/null +++ b/modules/PytorchSSD/ssd/mobilenetv1_ssd_lite.py @@ -0,0 +1,104 @@ +import torch +from torch.nn import Conv2d, Sequential, ModuleList, ReLU, BatchNorm2d +from ..nn.mobilenet import MobileNetV1 + +from .ssd import SSD +from .predictor import Predictor +from .config import mobilenetv1_ssd_config as config + + +def SeperableConv2d(in_channels, out_channels, kernel_size=1, stride=1, padding=0): + """Replace Conv2d with a depthwise Conv2d and Pointwise Conv2d.""" + return Sequential( + Conv2d( + in_channels=in_channels, + out_channels=in_channels, + kernel_size=kernel_size, + groups=in_channels, + stride=stride, + padding=padding, + ), + ReLU(), + Conv2d(in_channels=in_channels, out_channels=out_channels, kernel_size=1), + ) + + +def create_mobilenetv1_ssd_lite(num_classes, is_test=False): + base_net = MobileNetV1(1001).model # disable dropout layer + + source_layer_indexes = [ + 12, + 14, + ] + extras = ModuleList( + [ + Sequential( + Conv2d(in_channels=1024, out_channels=256, kernel_size=1), + ReLU(), + SeperableConv2d(in_channels=256, out_channels=512, kernel_size=3, stride=2, padding=1), + ), + Sequential( + Conv2d(in_channels=512, out_channels=128, kernel_size=1), + ReLU(), + SeperableConv2d(in_channels=128, out_channels=256, kernel_size=3, stride=2, padding=1), + ), + Sequential( 
+ Conv2d(in_channels=256, out_channels=128, kernel_size=1), + ReLU(), + SeperableConv2d(in_channels=128, out_channels=256, kernel_size=3, stride=2, padding=1), + ), + Sequential( + Conv2d(in_channels=256, out_channels=128, kernel_size=1), + ReLU(), + SeperableConv2d(in_channels=128, out_channels=256, kernel_size=3, stride=2, padding=1), + ), + ] + ) + + regression_headers = ModuleList( + [ + SeperableConv2d(in_channels=512, out_channels=6 * 4, kernel_size=3, padding=1), + SeperableConv2d(in_channels=1024, out_channels=6 * 4, kernel_size=3, padding=1), + SeperableConv2d(in_channels=512, out_channels=6 * 4, kernel_size=3, padding=1), + SeperableConv2d(in_channels=256, out_channels=6 * 4, kernel_size=3, padding=1), + SeperableConv2d(in_channels=256, out_channels=6 * 4, kernel_size=3, padding=1), + Conv2d(in_channels=256, out_channels=6 * 4, kernel_size=1), + ] + ) + + classification_headers = ModuleList( + [ + SeperableConv2d(in_channels=512, out_channels=6 * num_classes, kernel_size=3, padding=1), + SeperableConv2d(in_channels=1024, out_channels=6 * num_classes, kernel_size=3, padding=1), + SeperableConv2d(in_channels=512, out_channels=6 * num_classes, kernel_size=3, padding=1), + SeperableConv2d(in_channels=256, out_channels=6 * num_classes, kernel_size=3, padding=1), + SeperableConv2d(in_channels=256, out_channels=6 * num_classes, kernel_size=3, padding=1), + Conv2d(in_channels=256, out_channels=6 * num_classes, kernel_size=1), + ] + ) + + return SSD( + num_classes, + base_net, + source_layer_indexes, + extras, + classification_headers, + regression_headers, + is_test=is_test, + config=config, + ) + + +def create_mobilenetv1_ssd_lite_predictor(net, candidate_size=200, nms_method=None, sigma=0.5, device=None): + predictor = Predictor( + net, + config.image_size, + config.image_mean, + config.image_std, + nms_method=nms_method, + iou_threshold=config.iou_threshold, + candidate_size=candidate_size, + sigma=sigma, + device=device, + ) + return predictor diff --git 
a/modules/PytorchSSD/ssd/mobilenetv3_ssd_lite.py b/modules/PytorchSSD/ssd/mobilenetv3_ssd_lite.py new file mode 100644 index 0000000..2d0f02b --- /dev/null +++ b/modules/PytorchSSD/ssd/mobilenetv3_ssd_lite.py @@ -0,0 +1,147 @@ +import torch +from torch.nn import Conv2d, Sequential, ModuleList, BatchNorm2d +from torch import nn +from ..nn.mobilenetv3 import MobileNetV3_Large, MobileNetV3_Small, Block, hswish + +from .ssd import SSD +from .predictor import Predictor +from .config import mobilenetv1_ssd_config as config + + +def SeperableConv2d(in_channels, out_channels, kernel_size=1, stride=1, padding=0, onnx_compatible=False): + """Replace Conv2d with a depthwise Conv2d and Pointwise Conv2d.""" + ReLU = nn.ReLU if onnx_compatible else nn.ReLU6 + return Sequential( + Conv2d( + in_channels=in_channels, + out_channels=in_channels, + kernel_size=kernel_size, + groups=in_channels, + stride=stride, + padding=padding, + ), + BatchNorm2d(in_channels), + ReLU(), + Conv2d(in_channels=in_channels, out_channels=out_channels, kernel_size=1), + ) + + +def create_mobilenetv3_large_ssd_lite( + num_classes, width_mult=1.0, use_batch_norm=True, onnx_compatible=False, is_test=False +): + base_net = MobileNetV3_Large().features + + source_layer_indexes = [15, 21] + extras = ModuleList( + [ + Block(3, 960, 256, 512, hswish(), None, stride=2), + Block(3, 512, 128, 256, hswish(), None, stride=2), + Block(3, 256, 128, 256, hswish(), None, stride=2), + Block(3, 256, 64, 64, hswish(), None, stride=2), + ] + ) + + regression_headers = ModuleList( + [ + SeperableConv2d( + in_channels=round(112 * width_mult), out_channels=6 * 4, kernel_size=3, padding=1, onnx_compatible=False + ), + SeperableConv2d(in_channels=960, out_channels=6 * 4, kernel_size=3, padding=1, onnx_compatible=False), + SeperableConv2d(in_channels=512, out_channels=6 * 4, kernel_size=3, padding=1, onnx_compatible=False), + SeperableConv2d(in_channels=256, out_channels=6 * 4, kernel_size=3, padding=1, onnx_compatible=False), + 
SeperableConv2d(in_channels=256, out_channels=6 * 4, kernel_size=3, padding=1, onnx_compatible=False), + Conv2d(in_channels=64, out_channels=6 * 4, kernel_size=1), + ] + ) + + classification_headers = ModuleList( + [ + SeperableConv2d( + in_channels=round(112 * width_mult), out_channels=6 * num_classes, kernel_size=3, padding=1 + ), + SeperableConv2d(in_channels=960, out_channels=6 * num_classes, kernel_size=3, padding=1), + SeperableConv2d(in_channels=512, out_channels=6 * num_classes, kernel_size=3, padding=1), + SeperableConv2d(in_channels=256, out_channels=6 * num_classes, kernel_size=3, padding=1), + SeperableConv2d(in_channels=256, out_channels=6 * num_classes, kernel_size=3, padding=1), + Conv2d(in_channels=64, out_channels=6 * num_classes, kernel_size=1), + ] + ) + + return SSD( + num_classes, + base_net, + source_layer_indexes, + extras, + classification_headers, + regression_headers, + is_test=is_test, + config=config, + ) + + +def create_mobilenetv3_small_ssd_lite( + num_classes, width_mult=1.0, use_batch_norm=True, onnx_compatible=False, is_test=False +): + base_net = MobileNetV3_Small().features + + source_layer_indexes = [11, 17] + extras = ModuleList( + [ + Block(3, 576, 256, 512, hswish(), None, stride=2), + Block(3, 512, 128, 256, hswish(), None, stride=2), + Block(3, 256, 128, 256, hswish(), None, stride=2), + Block(3, 256, 64, 64, hswish(), None, stride=2), + ] + ) + + regression_headers = ModuleList( + [ + SeperableConv2d( + in_channels=round(48 * width_mult), out_channels=6 * 4, kernel_size=3, padding=1, onnx_compatible=False + ), + SeperableConv2d(in_channels=576, out_channels=6 * 4, kernel_size=3, padding=1, onnx_compatible=False), + SeperableConv2d(in_channels=512, out_channels=6 * 4, kernel_size=3, padding=1, onnx_compatible=False), + SeperableConv2d(in_channels=256, out_channels=6 * 4, kernel_size=3, padding=1, onnx_compatible=False), + SeperableConv2d(in_channels=256, out_channels=6 * 4, kernel_size=3, padding=1, onnx_compatible=False), + 
Conv2d(in_channels=64, out_channels=6 * 4, kernel_size=1), + ] + ) + + classification_headers = ModuleList( + [ + SeperableConv2d(in_channels=round(48 * width_mult), out_channels=6 * num_classes, kernel_size=3, padding=1), + SeperableConv2d(in_channels=576, out_channels=6 * num_classes, kernel_size=3, padding=1), + SeperableConv2d(in_channels=512, out_channels=6 * num_classes, kernel_size=3, padding=1), + SeperableConv2d(in_channels=256, out_channels=6 * num_classes, kernel_size=3, padding=1), + SeperableConv2d(in_channels=256, out_channels=6 * num_classes, kernel_size=3, padding=1), + Conv2d(in_channels=64, out_channels=6 * num_classes, kernel_size=1), + ] + ) + + return SSD( + num_classes, + base_net, + source_layer_indexes, + extras, + classification_headers, + regression_headers, + is_test=is_test, + config=config, + ) + + +def create_mobilenetv3_ssd_lite_predictor( + net, candidate_size=200, nms_method=None, sigma=0.5, device=torch.device("cpu") +): + predictor = Predictor( + net, + config.image_size, + config.image_mean, + config.image_std, + nms_method=nms_method, + iou_threshold=config.iou_threshold, + candidate_size=candidate_size, + sigma=sigma, + device=device, + ) + return predictor diff --git a/modules/PytorchSSD/ssd/predictor.py b/modules/PytorchSSD/ssd/predictor.py new file mode 100644 index 0000000..5e75206 --- /dev/null +++ b/modules/PytorchSSD/ssd/predictor.py @@ -0,0 +1,85 @@ +import torch + +from ..utils import box_utils +from .data_preprocessing import PredictionTransform +from ..utils.misc import Timer + + +class Predictor: + def __init__( + self, + net, + size, + mean=0.0, + std=1.0, + nms_method=None, + iou_threshold=0.45, + filter_threshold=0.01, + candidate_size=200, + sigma=0.5, + device=None, + ): + self.net = net + self.transform = PredictionTransform(size, mean, std) + self.iou_threshold = iou_threshold + self.filter_threshold = filter_threshold + self.candidate_size = candidate_size + self.nms_method = nms_method + + self.sigma = 
sigma + if device: + self.device = device + else: + self.device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu") + + self.net.to(self.device) + self.net.eval() + + self.timer = Timer() + + def predict(self, image, top_k=-1, prob_threshold=None): + cpu_device = torch.device("cpu") + height, width, _ = image.shape + image = self.transform(image) + images = image.unsqueeze(0) + images = images.to(self.device) + with torch.no_grad(): + self.timer.start() + scores, boxes = self.net.forward(images) + # print("Inference time: ", self.timer.end()) + boxes = boxes[0] + scores = scores[0] + if not prob_threshold: + prob_threshold = self.filter_threshold + # this version of nms is slower on GPU, so we move data to CPU. + boxes = boxes.to(cpu_device) + scores = scores.to(cpu_device) + picked_box_probs = [] + picked_labels = [] + for class_index in range(1, scores.size(1)): + probs = scores[:, class_index] + mask = probs > prob_threshold + probs = probs[mask] + if probs.size(0) == 0: + continue + subset_boxes = boxes[mask, :] + box_probs = torch.cat([subset_boxes, probs.reshape(-1, 1)], dim=1) + box_probs = box_utils.nms( + box_probs, + self.nms_method, + score_threshold=prob_threshold, + iou_threshold=self.iou_threshold, + sigma=self.sigma, + top_k=top_k, + candidate_size=self.candidate_size, + ) + picked_box_probs.append(box_probs) + picked_labels.extend([class_index] * box_probs.size(0)) + if not picked_box_probs: + return torch.tensor([]), torch.tensor([]), torch.tensor([]) + picked_box_probs = torch.cat(picked_box_probs) + picked_box_probs[:, 0] *= width + picked_box_probs[:, 1] *= height + picked_box_probs[:, 2] *= width + picked_box_probs[:, 3] *= height + return picked_box_probs[:, :4], torch.tensor(picked_labels), picked_box_probs[:, 4] diff --git a/modules/PytorchSSD/ssd/squeezenet_ssd_lite.py b/modules/PytorchSSD/ssd/squeezenet_ssd_lite.py new file mode 100644 index 0000000..68cb458 --- /dev/null +++ b/modules/PytorchSSD/ssd/squeezenet_ssd_lite.py @@ 
-0,0 +1,108 @@ +import torch +from torch.nn import Conv2d, Sequential, ModuleList, ReLU +from ..nn.squeezenet import squeezenet1_1 + +from .ssd import SSD +from .predictor import Predictor +from .config import squeezenet_ssd_config as config + + +def SeperableConv2d(in_channels, out_channels, kernel_size=1, stride=1, padding=0): + """Replace Conv2d with a depthwise Conv2d and Pointwise Conv2d.""" + return Sequential( + Conv2d( + in_channels=in_channels, + out_channels=in_channels, + kernel_size=kernel_size, + groups=in_channels, + stride=stride, + padding=padding, + ), + ReLU(), + Conv2d(in_channels=in_channels, out_channels=out_channels, kernel_size=1), + ) + + +def create_squeezenet_ssd_lite(num_classes, is_test=False): + base_net = squeezenet1_1(False).features # disable dropout layer + + source_layer_indexes = [12] + extras = ModuleList( + [ + Sequential( + Conv2d(in_channels=512, out_channels=256, kernel_size=1), + ReLU(), + SeperableConv2d(in_channels=256, out_channels=512, kernel_size=3, stride=2, padding=2), + ), + Sequential( + Conv2d(in_channels=512, out_channels=256, kernel_size=1), + ReLU(), + SeperableConv2d(in_channels=256, out_channels=512, kernel_size=3, stride=2, padding=1), + ), + Sequential( + Conv2d(in_channels=512, out_channels=128, kernel_size=1), + ReLU(), + SeperableConv2d(in_channels=128, out_channels=256, kernel_size=3, stride=2, padding=1), + ), + Sequential( + Conv2d(in_channels=256, out_channels=128, kernel_size=1), + ReLU(), + SeperableConv2d(in_channels=128, out_channels=256, kernel_size=3, stride=2, padding=1), + ), + Sequential( + Conv2d(in_channels=256, out_channels=128, kernel_size=1), + ReLU(), + SeperableConv2d(in_channels=128, out_channels=256, kernel_size=3, stride=2, padding=1), + ), + ] + ) + + regression_headers = ModuleList( + [ + SeperableConv2d(in_channels=512, out_channels=6 * 4, kernel_size=3, padding=1), + SeperableConv2d(in_channels=512, out_channels=6 * 4, kernel_size=3, padding=1), + 
SeperableConv2d(in_channels=512, out_channels=6 * 4, kernel_size=3, padding=1), + SeperableConv2d(in_channels=256, out_channels=6 * 4, kernel_size=3, padding=1), + SeperableConv2d(in_channels=256, out_channels=6 * 4, kernel_size=3, padding=1), + Conv2d(in_channels=256, out_channels=6 * 4, kernel_size=1), + ] + ) + + classification_headers = ModuleList( + [ + SeperableConv2d(in_channels=512, out_channels=6 * num_classes, kernel_size=3, padding=1), + SeperableConv2d(in_channels=512, out_channels=6 * num_classes, kernel_size=3, padding=1), + SeperableConv2d(in_channels=512, out_channels=6 * num_classes, kernel_size=3, padding=1), + SeperableConv2d(in_channels=256, out_channels=6 * num_classes, kernel_size=3, padding=1), + SeperableConv2d(in_channels=256, out_channels=6 * num_classes, kernel_size=3, padding=1), + Conv2d(in_channels=256, out_channels=6 * num_classes, kernel_size=1), + ] + ) + + return SSD( + num_classes, + base_net, + source_layer_indexes, + extras, + classification_headers, + regression_headers, + is_test=is_test, + config=config, + ) + + +def create_squeezenet_ssd_lite_predictor( + net, candidate_size=200, nms_method=None, sigma=0.5, device=torch.device("cpu") +): + predictor = Predictor( + net, + config.image_size, + config.image_mean, + config.image_std, + nms_method=nms_method, + iou_threshold=config.iou_threshold, + candidate_size=candidate_size, + sigma=sigma, + device=device, + ) + return predictor diff --git a/modules/PytorchSSD/ssd/ssd.py b/modules/PytorchSSD/ssd/ssd.py new file mode 100644 index 0000000..365169d --- /dev/null +++ b/modules/PytorchSSD/ssd/ssd.py @@ -0,0 +1,178 @@ +import torch.nn as nn +import torch +import numpy as np +from typing import List, Tuple +import torch.nn.functional as F + +from ..utils import box_utils +from collections import namedtuple + +GraphPath = namedtuple("GraphPath", ["s0", "name", "s1"]) # + + +class SSD(nn.Module): + def __init__( + self, + num_classes: int, + base_net: nn.ModuleList, + 
source_layer_indexes: List[int], + extras: nn.ModuleList, + classification_headers: nn.ModuleList, + regression_headers: nn.ModuleList, + is_test=False, + config=None, + device=None, + ): + """Compose a SSD model using the given components.""" + super(SSD, self).__init__() + + self.num_classes = num_classes + self.base_net = base_net + self.source_layer_indexes = source_layer_indexes + self.extras = extras + self.classification_headers = classification_headers + self.regression_headers = regression_headers + self.is_test = is_test + self.config = config + + # register layers in source_layer_indexes by adding them to a module list + self.source_layer_add_ons = nn.ModuleList( + [t[1] for t in source_layer_indexes if isinstance(t, tuple) and not isinstance(t, GraphPath)] + ) + if device: + self.device = device + else: + self.device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu") + if is_test: + self.config = config + self.priors = config.priors.to(self.device) + + def forward(self, x: torch.Tensor) -> Tuple[torch.Tensor, torch.Tensor]: + confidences = [] + locations = [] + start_layer_index = 0 + header_index = 0 + for end_layer_index in self.source_layer_indexes: + if isinstance(end_layer_index, GraphPath): + path = end_layer_index + end_layer_index = end_layer_index.s0 + added_layer = None + elif isinstance(end_layer_index, tuple): + added_layer = end_layer_index[1] + end_layer_index = end_layer_index[0] + path = None + else: + added_layer = None + path = None + for layer in self.base_net[start_layer_index:end_layer_index]: + x = layer(x) + if added_layer: + y = added_layer(x) + else: + y = x + if path: + sub = getattr(self.base_net[end_layer_index], path.name) + for layer in sub[: path.s1]: + x = layer(x) + y = x + for layer in sub[path.s1 :]: + x = layer(x) + end_layer_index += 1 + start_layer_index = end_layer_index + confidence, location = self.compute_header(header_index, y) + header_index += 1 + confidences.append(confidence) + 
locations.append(location) + + for layer in self.base_net[end_layer_index:]: + x = layer(x) + + for layer in self.extras: + x = layer(x) + confidence, location = self.compute_header(header_index, x) + header_index += 1 + confidences.append(confidence) + locations.append(location) + + confidences = torch.cat(confidences, 1) + locations = torch.cat(locations, 1) + + if self.is_test: + confidences = F.softmax(confidences, dim=2) + boxes = box_utils.convert_locations_to_boxes( + locations, self.priors, self.config.center_variance, self.config.size_variance + ) + boxes = box_utils.center_form_to_corner_form(boxes) + return confidences, boxes + else: + return confidences, locations + + def compute_header(self, i, x): + confidence = self.classification_headers[i](x) + confidence = confidence.permute(0, 2, 3, 1).contiguous() + confidence = confidence.view(confidence.size(0), -1, self.num_classes) + + location = self.regression_headers[i](x) + location = location.permute(0, 2, 3, 1).contiguous() + location = location.view(location.size(0), -1, 4) + + return confidence, location + + def init_from_base_net(self, model): + self.base_net.load_state_dict(torch.load(model, map_location=lambda storage, loc: storage), strict=True) + self.source_layer_add_ons.apply(_xavier_init_) + self.extras.apply(_xavier_init_) + self.classification_headers.apply(_xavier_init_) + self.regression_headers.apply(_xavier_init_) + + def init_from_pretrained_ssd(self, model): + state_dict = torch.load(model, map_location=lambda storage, loc: storage) + state_dict = { + k: v + for k, v in state_dict.items() + if not (k.startswith("classification_headers") or k.startswith("regression_headers")) + } + model_dict = self.state_dict() + model_dict.update(state_dict) + self.load_state_dict(model_dict) + self.classification_headers.apply(_xavier_init_) + self.regression_headers.apply(_xavier_init_) + + def init(self): + self.base_net.apply(_xavier_init_) + self.source_layer_add_ons.apply(_xavier_init_) + 
self.extras.apply(_xavier_init_) + self.classification_headers.apply(_xavier_init_) + self.regression_headers.apply(_xavier_init_) + + def load(self, model): + self.load_state_dict(torch.load(model, map_location=lambda storage, loc: storage)) + + def save(self, model_path): + torch.save(self.state_dict(), model_path) + + +class MatchPrior(object): + def __init__(self, center_form_priors, center_variance, size_variance, iou_threshold): + self.center_form_priors = center_form_priors + self.corner_form_priors = box_utils.center_form_to_corner_form(center_form_priors) + self.center_variance = center_variance + self.size_variance = size_variance + self.iou_threshold = iou_threshold + + def __call__(self, gt_boxes, gt_labels): + if type(gt_boxes) is np.ndarray: + gt_boxes = torch.from_numpy(gt_boxes) + if type(gt_labels) is np.ndarray: + gt_labels = torch.from_numpy(gt_labels) + boxes, labels = box_utils.assign_priors(gt_boxes, gt_labels, self.corner_form_priors, self.iou_threshold) + boxes = box_utils.corner_form_to_center_form(boxes) + locations = box_utils.convert_boxes_to_locations( + boxes, self.center_form_priors, self.center_variance, self.size_variance + ) + return locations, labels + + +def _xavier_init_(m: nn.Module): + if isinstance(m, nn.Conv2d): + nn.init.xavier_uniform_(m.weight) diff --git a/modules/PytorchSSD/ssd/vgg_ssd.py b/modules/PytorchSSD/ssd/vgg_ssd.py new file mode 100644 index 0000000..0e3b187 --- /dev/null +++ b/modules/PytorchSSD/ssd/vgg_ssd.py @@ -0,0 +1,96 @@ +import torch +from torch.nn import Conv2d, Sequential, ModuleList, ReLU, BatchNorm2d +from ..nn.vgg import vgg + +from .ssd import SSD +from .predictor import Predictor +from .config import vgg_ssd_config as config + + +def create_vgg_ssd(num_classes, is_test=False): + vgg_config = [64, 64, "M", 128, 128, "M", 256, 256, 256, "C", 512, 512, 512, "M", 512, 512, 512] + base_net = ModuleList(vgg(vgg_config)) + + source_layer_indexes = [ + (23, BatchNorm2d(512)), + len(base_net), + ] + 
extras = ModuleList( + [ + Sequential( + Conv2d(in_channels=1024, out_channels=256, kernel_size=1), + ReLU(), + Conv2d(in_channels=256, out_channels=512, kernel_size=3, stride=2, padding=1), + ReLU(), + ), + Sequential( + Conv2d(in_channels=512, out_channels=128, kernel_size=1), + ReLU(), + Conv2d(in_channels=128, out_channels=256, kernel_size=3, stride=2, padding=1), + ReLU(), + ), + Sequential( + Conv2d(in_channels=256, out_channels=128, kernel_size=1), + ReLU(), + Conv2d(in_channels=128, out_channels=256, kernel_size=3), + ReLU(), + ), + Sequential( + Conv2d(in_channels=256, out_channels=128, kernel_size=1), + ReLU(), + Conv2d(in_channels=128, out_channels=256, kernel_size=3), + ReLU(), + ), + ] + ) + + regression_headers = ModuleList( + [ + Conv2d(in_channels=512, out_channels=4 * 4, kernel_size=3, padding=1), + Conv2d(in_channels=1024, out_channels=6 * 4, kernel_size=3, padding=1), + Conv2d(in_channels=512, out_channels=6 * 4, kernel_size=3, padding=1), + Conv2d(in_channels=256, out_channels=6 * 4, kernel_size=3, padding=1), + Conv2d(in_channels=256, out_channels=4 * 4, kernel_size=3, padding=1), + Conv2d( + in_channels=256, out_channels=4 * 4, kernel_size=3, padding=1 + ), # TODO: change to kernel_size=1, padding=0? + ] + ) + + classification_headers = ModuleList( + [ + Conv2d(in_channels=512, out_channels=4 * num_classes, kernel_size=3, padding=1), + Conv2d(in_channels=1024, out_channels=6 * num_classes, kernel_size=3, padding=1), + Conv2d(in_channels=512, out_channels=6 * num_classes, kernel_size=3, padding=1), + Conv2d(in_channels=256, out_channels=6 * num_classes, kernel_size=3, padding=1), + Conv2d(in_channels=256, out_channels=4 * num_classes, kernel_size=3, padding=1), + Conv2d( + in_channels=256, out_channels=4 * num_classes, kernel_size=3, padding=1 + ), # TODO: change to kernel_size=1, padding=0? 
+ ] + ) + + return SSD( + num_classes, + base_net, + source_layer_indexes, + extras, + classification_headers, + regression_headers, + is_test=is_test, + config=config, + ) + + +def create_vgg_ssd_predictor(net, candidate_size=200, nms_method=None, sigma=0.5, device=None): + predictor = Predictor( + net, + config.image_size, + config.image_mean, + nms_method=nms_method, + iou_threshold=config.iou_threshold, + candidate_size=candidate_size, + sigma=sigma, + device=device, + ) + return predictor diff --git a/modules/PytorchSSD/transforms/transforms.py b/modules/PytorchSSD/transforms/transforms.py new file mode 100644 index 0000000..398c1c2 --- /dev/null +++ b/modules/PytorchSSD/transforms/transforms.py @@ -0,0 +1,397 @@ +# from https://github.com/amdegroot/ssd.pytorch + + +import torch +from torchvision import transforms +import cv2 +import numpy as np +import types +from numpy import random + + +def intersect(box_a, box_b): + max_xy = np.minimum(box_a[:, 2:], box_b[2:]) + min_xy = np.maximum(box_a[:, :2], box_b[:2]) + inter = np.clip((max_xy - min_xy), a_min=0, a_max=np.inf) + return inter[:, 0] * inter[:, 1] + + +def jaccard_numpy(box_a, box_b): + """Compute the jaccard overlap of two sets of boxes. The jaccard overlap + is simply the intersection over union of two boxes. + E.g.: + A ∩ B / A ∪ B = A ∩ B / (area(A) + area(B) - A ∩ B) + Args: + box_a: Multiple bounding boxes, Shape: [num_boxes,4] + box_b: Single bounding box, Shape: [4] + Return: + jaccard overlap: Shape: [box_a.shape[0], box_a.shape[1]] + """ + inter = intersect(box_a, box_b) + area_a = (box_a[:, 2] - box_a[:, 0]) * (box_a[:, 3] - box_a[:, 1]) # [A,B] + area_b = (box_b[2] - box_b[0]) * (box_b[3] - box_b[1]) # [A,B] + union = area_a + area_b - inter + return inter / union # [A,B] + + +class Compose(object): + """Composes several augmentations together. + Args: + transforms (List[Transform]): list of transforms to compose. 
+ Example: + >>> augmentations.Compose([ + >>> transforms.CenterCrop(10), + >>> transforms.ToTensor(), + >>> ]) + """ + + def __init__(self, transforms): + self.transforms = transforms + + def __call__(self, img, boxes=None, labels=None): + for t in self.transforms: + img, boxes, labels = t(img, boxes, labels) + return img, boxes, labels + + +class Lambda(object): + """Applies a lambda as a transform.""" + + def __init__(self, lambd): + assert isinstance(lambd, types.LambdaType) + self.lambd = lambd + + def __call__(self, img, boxes=None, labels=None): + return self.lambd(img, boxes, labels) + + +class ConvertFromInts(object): + def __call__(self, image, boxes=None, labels=None): + return image.astype(np.float32), boxes, labels + + +class SubtractMeans(object): + def __init__(self, mean): + self.mean = np.array(mean, dtype=np.float32) + + def __call__(self, image, boxes=None, labels=None): + image = image.astype(np.float32) + image -= self.mean + return image.astype(np.float32), boxes, labels + + +class ToAbsoluteCoords(object): + def __call__(self, image, boxes=None, labels=None): + height, width, channels = image.shape + boxes[:, 0] *= width + boxes[:, 2] *= width + boxes[:, 1] *= height + boxes[:, 3] *= height + + return image, boxes, labels + + +class ToPercentCoords(object): + def __call__(self, image, boxes=None, labels=None): + height, width, channels = image.shape + boxes[:, 0] /= width + boxes[:, 2] /= width + boxes[:, 1] /= height + boxes[:, 3] /= height + + return image, boxes, labels + + +class Resize(object): + def __init__(self, size=300): + self.size = size + + def __call__(self, image, boxes=None, labels=None): + image = cv2.resize(image, (self.size, self.size)) + return image, boxes, labels + + +class RandomSaturation(object): + def __init__(self, lower=0.5, upper=1.5): + self.lower = lower + self.upper = upper + assert self.upper >= self.lower, "contrast upper must be >= lower." + assert self.lower >= 0, "contrast lower must be non-negative." 
+ + def __call__(self, image, boxes=None, labels=None): + if random.randint(2): + image[:, :, 1] *= random.uniform(self.lower, self.upper) + + return image, boxes, labels + + +class RandomHue(object): + def __init__(self, delta=18.0): + assert delta >= 0.0 and delta <= 360.0 + self.delta = delta + + def __call__(self, image, boxes=None, labels=None): + if random.randint(2): + image[:, :, 0] += random.uniform(-self.delta, self.delta) + image[:, :, 0][image[:, :, 0] > 360.0] -= 360.0 + image[:, :, 0][image[:, :, 0] < 0.0] += 360.0 + return image, boxes, labels + + +class RandomLightingNoise(object): + def __init__(self): + self.perms = ((0, 1, 2), (0, 2, 1), (1, 0, 2), (1, 2, 0), (2, 0, 1), (2, 1, 0)) + + def __call__(self, image, boxes=None, labels=None): + if random.randint(2): + swap = self.perms[random.randint(len(self.perms))] + shuffle = SwapChannels(swap) # shuffle channels + image = shuffle(image) + return image, boxes, labels + + +class ConvertColor(object): + def __init__(self, current, transform): + self.transform = transform + self.current = current + + def __call__(self, image, boxes=None, labels=None): + if self.current == "BGR" and self.transform == "HSV": + image = cv2.cvtColor(image, cv2.COLOR_BGR2HSV) + elif self.current == "RGB" and self.transform == "HSV": + image = cv2.cvtColor(image, cv2.COLOR_RGB2HSV) + elif self.current == "BGR" and self.transform == "RGB": + image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB) + elif self.current == "HSV" and self.transform == "BGR": + image = cv2.cvtColor(image, cv2.COLOR_HSV2BGR) + elif self.current == "HSV" and self.transform == "RGB": + image = cv2.cvtColor(image, cv2.COLOR_HSV2RGB) + else: + raise NotImplementedError + return image, boxes, labels + + +class RandomContrast(object): + def __init__(self, lower=0.5, upper=1.5): + self.lower = lower + self.upper = upper + assert self.upper >= self.lower, "contrast upper must be >= lower." + assert self.lower >= 0, "contrast lower must be non-negative." 
+ + # expects float image + def __call__(self, image, boxes=None, labels=None): + if random.randint(2): + alpha = random.uniform(self.lower, self.upper) + image *= alpha + return image, boxes, labels + + +class RandomBrightness(object): + def __init__(self, delta=32): + assert delta >= 0.0 + assert delta <= 255.0 + self.delta = delta + + def __call__(self, image, boxes=None, labels=None): + if random.randint(2): + delta = random.uniform(-self.delta, self.delta) + image += delta + return image, boxes, labels + + +class ToCV2Image(object): + def __call__(self, tensor, boxes=None, labels=None): + return tensor.cpu().numpy().astype(np.float32).transpose((1, 2, 0)), boxes, labels + + +class ToTensor(object): + def __call__(self, cvimage, boxes=None, labels=None): + return torch.from_numpy(cvimage.astype(np.float32)).permute(2, 0, 1), boxes, labels + + +class RandomSampleCrop(object): + """Crop + Arguments: + img (Image): the image being input during training + boxes (Tensor): the original bounding boxes in pt form + labels (Tensor): the class labels for each bbox + mode (float tuple): the min and max jaccard overlaps + Return: + (img, boxes, classes) + img (Image): the cropped image + boxes (Tensor): the adjusted bounding boxes in pt form + labels (Tensor): the class labels for each bbox + """ + + def __init__(self): + self.sample_options = ( + # using entire original input image + None, + # sample a patch s.t. 
MIN jaccard w/ obj in .1,.3,.4,.7,.9 + (0.1, None), + (0.3, None), + (0.7, None), + (0.9, None), + # randomly sample a patch + (None, None), + ) + + def __call__(self, image, boxes=None, labels=None): + height, width, _ = image.shape + while True: + # randomly choose a mode + random_idx = random.randint(0, len(self.sample_options) - 1) + mode = self.sample_options[random_idx] + if mode is None: + return image, boxes, labels + + min_iou, max_iou = mode + if min_iou is None: + min_iou = float("-inf") + if max_iou is None: + max_iou = float("inf") + + # max trails (50) + for _ in range(50): + current_image = image + + w = random.uniform(0.3 * width, width) + h = random.uniform(0.3 * height, height) + + # aspect ratio constraint b/t .5 & 2 + if h / w < 0.5 or h / w > 2: + continue + + left = random.uniform(width - w) + top = random.uniform(height - h) + + # convert to integer rect x1,y1,x2,y2 + rect = np.array([int(left), int(top), int(left + w), int(top + h)]) + + # calculate IoU (jaccard overlap) b/t the cropped and gt boxes + overlap = jaccard_numpy(boxes, rect) + + # is min and max overlap constraint satisfied? if not try again + if overlap.min() < min_iou and max_iou < overlap.max(): + continue + + # cut the crop from the image + current_image = current_image[rect[1] : rect[3], rect[0] : rect[2], :] + + # keep overlap with gt box IF center in sampled patch + centers = (boxes[:, :2] + boxes[:, 2:]) / 2.0 + + # mask in all gt boxes that above and to the left of centers + m1 = (rect[0] < centers[:, 0]) * (rect[1] < centers[:, 1]) + + # mask in all gt boxes that under and to the right of centers + m2 = (rect[2] > centers[:, 0]) * (rect[3] > centers[:, 1]) + + # mask in that both m1 and m2 are true + mask = m1 * m2 + + # have any valid boxes? 
try again if not + if not mask.any(): + continue + + # take only matching gt boxes + current_boxes = boxes[mask, :].copy() + + # take only matching gt labels + current_labels = labels[mask] + + # should we use the box left and top corner or the crop's + current_boxes[:, :2] = np.maximum(current_boxes[:, :2], rect[:2]) + # adjust to crop (by substracting crop's left,top) + current_boxes[:, :2] -= rect[:2] + + current_boxes[:, 2:] = np.minimum(current_boxes[:, 2:], rect[2:]) + # adjust to crop (by substracting crop's left,top) + current_boxes[:, 2:] -= rect[:2] + + return current_image, current_boxes, current_labels + + +class Expand(object): + def __init__(self, mean): + self.mean = mean + + def __call__(self, image, boxes, labels): + if random.randint(2): + return image, boxes, labels + + height, width, depth = image.shape + ratio = random.uniform(1, 4) + left = random.uniform(0, width * ratio - width) + top = random.uniform(0, height * ratio - height) + + expand_image = np.zeros((int(height * ratio), int(width * ratio), depth), dtype=image.dtype) + expand_image[:, :, :] = self.mean + expand_image[int(top) : int(top + height), int(left) : int(left + width)] = image + image = expand_image + + boxes = boxes.copy() + boxes[:, :2] += (int(left), int(top)) + boxes[:, 2:] += (int(left), int(top)) + + return image, boxes, labels + + +class RandomMirror(object): + def __call__(self, image, boxes, classes): + _, width, _ = image.shape + if random.randint(2): + image = image[:, ::-1] + boxes = boxes.copy() + boxes[:, 0::2] = width - boxes[:, 2::-2] + return image, boxes, classes + + +class SwapChannels(object): + """Transforms a tensorized image by swapping the channels in the order + specified in the swap tuple. 
+ Args: + swaps (int triple): final order of channels + eg: (2, 1, 0) + """ + + def __init__(self, swaps): + self.swaps = swaps + + def __call__(self, image): + """ + Args: + image (Tensor): image tensor to be transformed + Return: + a tensor with channels swapped according to swap + """ + # if torch.is_tensor(image): + # image = image.data.cpu().numpy() + # else: + # image = np.array(image) + image = image[:, :, self.swaps] + return image + + +class PhotometricDistort(object): + def __init__(self): + self.pd = [ + RandomContrast(), # RGB + ConvertColor(current="RGB", transform="HSV"), # HSV + RandomSaturation(), # HSV + RandomHue(), # HSV + ConvertColor(current="HSV", transform="RGB"), # RGB + RandomContrast(), # RGB + ] + self.rand_brightness = RandomBrightness() + self.rand_light_noise = RandomLightingNoise() + + def __call__(self, image, boxes, labels): + im = image.copy() + im, boxes, labels = self.rand_brightness(im, boxes, labels) + if random.randint(2): + distort = Compose(self.pd[:-1]) + else: + distort = Compose(self.pd[1:]) + im, boxes, labels = distort(im, boxes, labels) + return self.rand_light_noise(im, boxes, labels) diff --git a/modules/PytorchSSD/utils/__init__.py b/modules/PytorchSSD/utils/__init__.py new file mode 100644 index 0000000..0789bdb --- /dev/null +++ b/modules/PytorchSSD/utils/__init__.py @@ -0,0 +1 @@ +from .misc import * diff --git a/modules/PytorchSSD/utils/box_utils.py b/modules/PytorchSSD/utils/box_utils.py new file mode 100644 index 0000000..f5fea2a --- /dev/null +++ b/modules/PytorchSSD/utils/box_utils.py @@ -0,0 +1,273 @@ +import collections +import torch +import itertools +from typing import List +import math + +SSDBoxSizes = collections.namedtuple("SSDBoxSizes", ["min", "max"]) + +SSDSpec = collections.namedtuple("SSDSpec", ["feature_map_size", "shrinkage", "box_sizes", "aspect_ratios"]) + + +def generate_ssd_priors(specs: List[SSDSpec], image_size, clamp=True) -> torch.Tensor: + """Generate SSD Prior Boxes. 
+ + It returns the center, height and width of the priors. The values are relative to the image size + Args: + specs: SSDSpecs about the shapes of sizes of prior boxes. i.e. + specs = [ + SSDSpec(38, 8, SSDBoxSizes(30, 60), [2]), + SSDSpec(19, 16, SSDBoxSizes(60, 111), [2, 3]), + SSDSpec(10, 32, SSDBoxSizes(111, 162), [2, 3]), + SSDSpec(5, 64, SSDBoxSizes(162, 213), [2, 3]), + SSDSpec(3, 100, SSDBoxSizes(213, 264), [2]), + SSDSpec(1, 300, SSDBoxSizes(264, 315), [2]) + ] + image_size: image size. + clamp: if true, clamp the values to make fall between [0.0, 1.0] + Returns: + priors (num_priors, 4): The prior boxes represented as [[center_x, center_y, w, h]]. All the values + are relative to the image size. + """ + priors = [] + for spec in specs: + scale = image_size / spec.shrinkage + for j, i in itertools.product(range(spec.feature_map_size), repeat=2): + x_center = (i + 0.5) / scale + y_center = (j + 0.5) / scale + + # small sized square box + size = spec.box_sizes.min + h = w = size / image_size + priors.append([x_center, y_center, w, h]) + + # big sized square box + size = math.sqrt(spec.box_sizes.max * spec.box_sizes.min) + h = w = size / image_size + priors.append([x_center, y_center, w, h]) + + # change h/w ratio of the small sized box + size = spec.box_sizes.min + h = w = size / image_size + for ratio in spec.aspect_ratios: + ratio = math.sqrt(ratio) + priors.append([x_center, y_center, w * ratio, h / ratio]) + priors.append([x_center, y_center, w / ratio, h * ratio]) + + priors = torch.tensor(priors) + if clamp: + torch.clamp(priors, 0.0, 1.0, out=priors) + return priors + + +def convert_locations_to_boxes(locations, priors, center_variance, size_variance): + """Convert regressional location results of SSD into boxes in the form of (center_x, center_y, h, w). 
+ + The conversion: + $$predicted\_center * center_variance = \frac {real\_center - prior\_center} {prior\_hw}$$ + $$exp(predicted\_hw * size_variance) = \frac {real\_hw} {prior\_hw}$$ + We do it in the inverse direction here. + Args: + locations (batch_size, num_priors, 4): the regression output of SSD. It will contain the outputs as well. + priors (num_priors, 4) or (batch_size/1, num_priors, 4): prior boxes. + center_variance: a float used to change the scale of center. + size_variance: a float used to change of scale of size. + Returns: + boxes: priors: [[center_x, center_y, h, w]]. All the values + are relative to the image size. + """ + # priors can have one dimension less. + if priors.dim() + 1 == locations.dim(): + priors = priors.unsqueeze(0) + return torch.cat( + [ + locations[..., :2] * center_variance * priors[..., 2:] + priors[..., :2], + torch.exp(locations[..., 2:] * size_variance) * priors[..., 2:], + ], + dim=locations.dim() - 1, + ) + + +def convert_boxes_to_locations(center_form_boxes, center_form_priors, center_variance, size_variance): + # priors can have one dimension less + if center_form_priors.dim() + 1 == center_form_boxes.dim(): + center_form_priors = center_form_priors.unsqueeze(0) + return torch.cat( + [ + (center_form_boxes[..., :2] - center_form_priors[..., :2]) / center_form_priors[..., 2:] / center_variance, + torch.log(center_form_boxes[..., 2:] / center_form_priors[..., 2:]) / size_variance, + ], + dim=center_form_boxes.dim() - 1, + ) + + +def area_of(left_top, right_bottom) -> torch.Tensor: + """Compute the areas of rectangles given two corners. + + Args: + left_top (N, 2): left top corner. + right_bottom (N, 2): right bottom corner. + + Returns: + area (N): return the area. + """ + hw = torch.clamp(right_bottom - left_top, min=0.0) + return hw[..., 0] * hw[..., 1] + + +def iou_of(boxes0, boxes1, eps=1e-5): + """Return intersection-over-union (Jaccard index) of boxes. + + Args: + boxes0 (N, 4): ground truth boxes. 
+ boxes1 (N or 1, 4): predicted boxes. + eps: a small number to avoid 0 as denominator. + Returns: + iou (N): IoU values. + """ + overlap_left_top = torch.max(boxes0[..., :2], boxes1[..., :2]) + overlap_right_bottom = torch.min(boxes0[..., 2:], boxes1[..., 2:]) + + overlap_area = area_of(overlap_left_top, overlap_right_bottom) + area0 = area_of(boxes0[..., :2], boxes0[..., 2:]) + area1 = area_of(boxes1[..., :2], boxes1[..., 2:]) + return overlap_area / (area0 + area1 - overlap_area + eps) + + +def assign_priors(gt_boxes, gt_labels, corner_form_priors, iou_threshold): + """Assign ground truth boxes and targets to priors. + + Args: + gt_boxes (num_targets, 4): ground truth boxes. + gt_labels (num_targets): labels of targets. + priors (num_priors, 4): corner form priors + Returns: + boxes (num_priors, 4): real values for priors. + labels (num_priros): labels for priors. + """ + # size: num_priors x num_targets + ious = iou_of(gt_boxes.unsqueeze(0), corner_form_priors.unsqueeze(1)) + # size: num_priors + best_target_per_prior, best_target_per_prior_index = ious.max(1) + # size: num_targets + best_prior_per_target, best_prior_per_target_index = ious.max(0) + + for target_index, prior_index in enumerate(best_prior_per_target_index): + best_target_per_prior_index[prior_index] = target_index + # 2.0 is used to make sure every target has a prior assigned + best_target_per_prior.index_fill_(0, best_prior_per_target_index, 2) + # size: num_priors + labels = gt_labels[best_target_per_prior_index] + labels[best_target_per_prior < iou_threshold] = 0 # the backgournd id + boxes = gt_boxes[best_target_per_prior_index] + return boxes, labels + + +def hard_negative_mining(loss, labels, neg_pos_ratio): + """ + It used to suppress the presence of a large number of negative prediction. + It works on image level not batch level. 
+ For any example/image, it keeps all the positive predictions and + cut the number of negative predictions to make sure the ratio + between the negative examples and positive examples is no more + the given ratio for an image. + + Args: + loss (N, num_priors): the loss for each example. + labels (N, num_priors): the labels. + neg_pos_ratio: the ratio between the negative examples and positive examples. + """ + pos_mask = labels > 0 + num_pos = pos_mask.long().sum(dim=1, keepdim=True) + num_neg = num_pos * neg_pos_ratio + + loss[pos_mask] = -math.inf + _, indexes = loss.sort(dim=1, descending=True) + _, orders = indexes.sort(dim=1) + neg_mask = orders < num_neg + return pos_mask | neg_mask + + +def center_form_to_corner_form(locations): + return torch.cat( + [locations[..., :2] - locations[..., 2:] / 2, locations[..., :2] + locations[..., 2:] / 2], locations.dim() - 1 + ) + + +def corner_form_to_center_form(boxes): + return torch.cat([(boxes[..., :2] + boxes[..., 2:]) / 2, boxes[..., 2:] - boxes[..., :2]], boxes.dim() - 1) + + +def hard_nms(box_scores, iou_threshold, top_k=-1, candidate_size=200): + """ + + Args: + box_scores (N, 5): boxes in corner-form and probabilities. + iou_threshold: intersection over union threshold. + top_k: keep top_k results. If k <= 0, keep all the results. + candidate_size: only consider the candidates with the highest scores. 
+ Returns: + picked: a list of indexes of the kept boxes + """ + scores = box_scores[:, -1] + boxes = box_scores[:, :-1] + picked = [] + _, indexes = scores.sort(descending=True) + indexes = indexes[:candidate_size] + while len(indexes) > 0: + current = indexes[0] + picked.append(current.item()) + if 0 < top_k == len(picked) or len(indexes) == 1: + break + current_box = boxes[current, :] + indexes = indexes[1:] + rest_boxes = boxes[indexes, :] + iou = iou_of( + rest_boxes, + current_box.unsqueeze(0), + ) + indexes = indexes[iou <= iou_threshold] + + return box_scores[picked, :] + + +def nms(box_scores, nms_method=None, score_threshold=None, iou_threshold=None, sigma=0.5, top_k=-1, candidate_size=200): + if nms_method == "soft": + return soft_nms(box_scores, score_threshold, sigma, top_k) + else: + return hard_nms(box_scores, iou_threshold, top_k, candidate_size=candidate_size) + + +def soft_nms(box_scores, score_threshold, sigma=0.5, top_k=-1): + """Soft NMS implementation. + + References: + https://arxiv.org/abs/1704.04503 + https://github.com/facebookresearch/Detectron/blob/master/detectron/utils/cython_nms.pyx + + Args: + box_scores (N, 5): boxes in corner-form and probabilities. + score_threshold: boxes with scores less than value are not considered. + sigma: the parameter in score re-computation. + scores[i] = scores[i] * exp(-(iou_i)^2 / simga) + top_k: keep top_k results. If k <= 0, keep all the results. + Returns: + picked_box_scores (K, 5): results of NMS. 
+ """ + picked_box_scores = [] + while box_scores.size(0) > 0: + max_score_index = torch.argmax(box_scores[:, 4]) + cur_box_prob = torch.tensor(box_scores[max_score_index, :]) + picked_box_scores.append(cur_box_prob) + if len(picked_box_scores) == top_k > 0 or box_scores.size(0) == 1: + break + cur_box = cur_box_prob[:-1] + box_scores[max_score_index, :] = box_scores[-1, :] + box_scores = box_scores[:-1, :] + ious = iou_of(cur_box.unsqueeze(0), box_scores[:, :-1]) + box_scores[:, -1] = box_scores[:, -1] * torch.exp(-(ious * ious) / sigma) + box_scores = box_scores[box_scores[:, -1] > score_threshold, :] + if len(picked_box_scores) > 0: + return torch.stack(picked_box_scores) + else: + return torch.tensor([]) diff --git a/modules/PytorchSSD/utils/box_utils_numpy.py b/modules/PytorchSSD/utils/box_utils_numpy.py new file mode 100644 index 0000000..f13ef23 --- /dev/null +++ b/modules/PytorchSSD/utils/box_utils_numpy.py @@ -0,0 +1,224 @@ +from .box_utils import SSDSpec + +from typing import List +import itertools +import math +import numpy as np + + +def generate_ssd_priors(specs: List[SSDSpec], image_size, clamp=True): + """Generate SSD Prior Boxes. + + It returns the center, height and width of the priors. The values are relative to the image size + Args: + specs: SSDSpecs about the shapes of sizes of prior boxes. i.e. + specs = [ + SSDSpec(38, 8, SSDBoxSizes(30, 60), [2]), + SSDSpec(19, 16, SSDBoxSizes(60, 111), [2, 3]), + SSDSpec(10, 32, SSDBoxSizes(111, 162), [2, 3]), + SSDSpec(5, 64, SSDBoxSizes(162, 213), [2, 3]), + SSDSpec(3, 100, SSDBoxSizes(213, 264), [2]), + SSDSpec(1, 300, SSDBoxSizes(264, 315), [2]) + ] + image_size: image size. + clamp: if true, clamp the values to make fall between [0.0, 1.0] + Returns: + priors (num_priors, 4): The prior boxes represented as [[center_x, center_y, w, h]]. All the values + are relative to the image size. 
+ """ + priors = [] + for spec in specs: + scale = image_size / spec.shrinkage + for j, i in itertools.product(range(spec.feature_map_size), repeat=2): + x_center = (i + 0.5) / scale + y_center = (j + 0.5) / scale + + # small sized square box + size = spec.box_sizes.min + h = w = size / image_size + priors.append([x_center, y_center, w, h]) + + # big sized square box + size = math.sqrt(spec.box_sizes.max * spec.box_sizes.min) + h = w = size / image_size + priors.append([x_center, y_center, w, h]) + + # change h/w ratio of the small sized box + size = spec.box_sizes.min + h = w = size / image_size + for ratio in spec.aspect_ratios: + ratio = math.sqrt(ratio) + priors.append([x_center, y_center, w * ratio, h / ratio]) + priors.append([x_center, y_center, w / ratio, h * ratio]) + + priors = np.array(priors, dtype=np.float32) + if clamp: + np.clip(priors, 0.0, 1.0, out=priors) + return priors + + +def convert_locations_to_boxes(locations, priors, center_variance, size_variance): + """Convert regressional location results of SSD into boxes in the form of (center_x, center_y, h, w). + + The conversion: + $$predicted\_center * center_variance = \frac {real\_center - prior\_center} {prior\_hw}$$ + $$exp(predicted\_hw * size_variance) = \frac {real\_hw} {prior\_hw}$$ + We do it in the inverse direction here. + Args: + locations (batch_size, num_priors, 4): the regression output of SSD. It will contain the outputs as well. + priors (num_priors, 4) or (batch_size/1, num_priors, 4): prior boxes. + center_variance: a float used to change the scale of center. + size_variance: a float used to change of scale of size. + Returns: + boxes: priors: [[center_x, center_y, h, w]]. All the values + are relative to the image size. + """ + # priors can have one dimension less. 
+ if len(priors.shape) + 1 == len(locations.shape): + priors = np.expand_dims(priors, 0) + return np.concatenate( + [ + locations[..., :2] * center_variance * priors[..., 2:] + priors[..., :2], + np.exp(locations[..., 2:] * size_variance) * priors[..., 2:], + ], + axis=len(locations.shape) - 1, + ) + + +def convert_boxes_to_locations(center_form_boxes, center_form_priors, center_variance, size_variance): + # priors can have one dimension less + if len(center_form_priors.shape) + 1 == len(center_form_boxes.shape): + center_form_priors = np.expand_dims(center_form_priors, 0) + return np.concatenate( + [ + (center_form_boxes[..., :2] - center_form_priors[..., :2]) / center_form_priors[..., 2:] / center_variance, + np.log(center_form_boxes[..., 2:] / center_form_priors[..., 2:]) / size_variance, + ], + axis=len(center_form_boxes.shape) - 1, + ) + + +def area_of(left_top, right_bottom): + """Compute the areas of rectangles given two corners. + + Args: + left_top (N, 2): left top corner. + right_bottom (N, 2): right bottom corner. + + Returns: + area (N): return the area. + """ + hw = np.clip(right_bottom - left_top, 0.0, None) + return hw[..., 0] * hw[..., 1] + + +def iou_of(boxes0, boxes1, eps=1e-5): + """Return intersection-over-union (Jaccard index) of boxes. + + Args: + boxes0 (N, 4): ground truth boxes. + boxes1 (N or 1, 4): predicted boxes. + eps: a small number to avoid 0 as denominator. + Returns: + iou (N): IoU values. 
+ """ + overlap_left_top = np.maximum(boxes0[..., :2], boxes1[..., :2]) + overlap_right_bottom = np.minimum(boxes0[..., 2:], boxes1[..., 2:]) + + overlap_area = area_of(overlap_left_top, overlap_right_bottom) + area0 = area_of(boxes0[..., :2], boxes0[..., 2:]) + area1 = area_of(boxes1[..., :2], boxes1[..., 2:]) + return overlap_area / (area0 + area1 - overlap_area + eps) + + +def center_form_to_corner_form(locations): + return np.concatenate( + [locations[..., :2] - locations[..., 2:] / 2, locations[..., :2] + locations[..., 2:] / 2], + len(locations.shape) - 1, + ) + + +def corner_form_to_center_form(boxes): + return np.concatenate( + [(boxes[..., :2] + boxes[..., 2:]) / 2, boxes[..., 2:] - boxes[..., :2]], len(boxes.shape) - 1 + ) + + +def hard_nms(box_scores, iou_threshold, top_k=-1, candidate_size=200): + """ + + Args: + box_scores (N, 5): boxes in corner-form and probabilities. + iou_threshold: intersection over union threshold. + top_k: keep top_k results. If k <= 0, keep all the results. + candidate_size: only consider the candidates with the highest scores. 
+ Returns: + picked: a list of indexes of the kept boxes + """ + scores = box_scores[:, -1] + boxes = box_scores[:, :-1] + picked = [] + # _, indexes = scores.sort(descending=True) + indexes = np.argsort(scores) + # indexes = indexes[:candidate_size] + indexes = indexes[-candidate_size:] + while len(indexes) > 0: + # current = indexes[0] + current = indexes[-1] + picked.append(current) + if 0 < top_k == len(picked) or len(indexes) == 1: + break + current_box = boxes[current, :] + # indexes = indexes[1:] + indexes = indexes[:-1] + rest_boxes = boxes[indexes, :] + iou = iou_of( + rest_boxes, + np.expand_dims(current_box, axis=0), + ) + indexes = indexes[iou <= iou_threshold] + + return box_scores[picked, :] + + +# def nms(box_scores, nms_method=None, score_threshold=None, iou_threshold=None, +# sigma=0.5, top_k=-1, candidate_size=200): +# if nms_method == "soft": +# return soft_nms(box_scores, score_threshold, sigma, top_k) +# else: +# return hard_nms(box_scores, iou_threshold, top_k, candidate_size=candidate_size) + +# +# def soft_nms(box_scores, score_threshold, sigma=0.5, top_k=-1): +# """Soft NMS implementation. +# +# References: +# https://arxiv.org/abs/1704.04503 +# https://github.com/facebookresearch/Detectron/blob/master/detectron/utils/cython_nms.pyx +# +# Args: +# box_scores (N, 5): boxes in corner-form and probabilities. +# score_threshold: boxes with scores less than value are not considered. +# sigma: the parameter in score re-computation. +# scores[i] = scores[i] * exp(-(iou_i)^2 / simga) +# top_k: keep top_k results. If k <= 0, keep all the results. +# Returns: +# picked_box_scores (K, 5): results of NMS. 
+# """ +# picked_box_scores = [] +# while box_scores.size(0) > 0: +# max_score_index = torch.argmax(box_scores[:, 4]) +# cur_box_prob = torch.tensor(box_scores[max_score_index, :]) +# picked_box_scores.append(cur_box_prob) +# if len(picked_box_scores) == top_k > 0 or box_scores.size(0) == 1: +# break +# cur_box = cur_box_prob[:-1] +# box_scores[max_score_index, :] = box_scores[-1, :] +# box_scores = box_scores[:-1, :] +# ious = iou_of(cur_box.unsqueeze(0), box_scores[:, :-1]) +# box_scores[:, -1] = box_scores[:, -1] * torch.exp(-(ious * ious) / sigma) +# box_scores = box_scores[box_scores[:, -1] > score_threshold, :] +# if len(picked_box_scores) > 0: +# return torch.stack(picked_box_scores) +# else: +# return torch.tensor([]) diff --git a/modules/PytorchSSD/utils/measurements.py b/modules/PytorchSSD/utils/measurements.py new file mode 100644 index 0000000..81e1e3a --- /dev/null +++ b/modules/PytorchSSD/utils/measurements.py @@ -0,0 +1,32 @@ +import numpy as np + + +def compute_average_precision(precision, recall): + """ + It computes average precision based on the definition of Pascal Competition. It computes the under curve area + of precision and recall. Recall follows the normal definition. Precision is a variant. 
+ pascal_precision[i] = typical_precision[i:].max() + """ + # identical but faster version of new_precision[i] = old_precision[i:].max() + precision = np.concatenate([[0.0], precision, [0.0]]) + for i in range(len(precision) - 1, 0, -1): + precision[i - 1] = np.maximum(precision[i - 1], precision[i]) + + # find the index where the value changes + recall = np.concatenate([[0.0], recall, [1.0]]) + changing_points = np.where(recall[1:] != recall[:-1])[0] + + # compute under curve area + areas = (recall[changing_points + 1] - recall[changing_points]) * precision[changing_points + 1] + return areas.sum() + + +def compute_voc2007_average_precision(precision, recall): + ap = 0.0 + for t in np.arange(0.0, 1.1, 0.1): + if np.sum(recall >= t) == 0: + p = 0 + else: + p = np.max(precision[recall >= t]) + ap = ap + p / 11.0 + return ap diff --git a/modules/PytorchSSD/utils/misc.py b/modules/PytorchSSD/utils/misc.py new file mode 100644 index 0000000..abc76af --- /dev/null +++ b/modules/PytorchSSD/utils/misc.py @@ -0,0 +1,43 @@ +import time +import torch + + +def str2bool(s): + return s.lower() in ("true", "1") + + +class Timer: + def __init__(self): + self.clock = {} + + def start(self, key="default"): + self.clock[key] = time.time() + + def end(self, key="default"): + if key not in self.clock: + raise Exception(f"{key} is not in the clock.") + interval = time.time() - self.clock[key] + del self.clock[key] + return interval + + +def save_checkpoint(epoch, net_state_dict, optimizer_state_dict, best_score, checkpoint_path, model_path): + torch.save( + {"epoch": epoch, "model": net_state_dict, "optimizer": optimizer_state_dict, "best_score": best_score}, + checkpoint_path, + ) + torch.save(net_state_dict, model_path) + + +def load_checkpoint(checkpoint_path): + return torch.load(checkpoint_path) + + +def freeze_net_layers(net): + for param in net.parameters(): + param.requires_grad = False + + +def store_labels(path, labels): + with open(path, "w") as f: + 
f.write("\n".join(labels)) diff --git a/modules/PytorchSSD/utils/model_book.py b/modules/PytorchSSD/utils/model_book.py new file mode 100644 index 0000000..b1e9d17 --- /dev/null +++ b/modules/PytorchSSD/utils/model_book.py @@ -0,0 +1,81 @@ +from collections import OrderedDict +import torch.nn as nn + + +class ModelBook: + """Maintain the mapping between modules and their paths. + + Example: + book = ModelBook(model_ft) + for p, m in book.conv2d_modules(): + print('path:', p, 'num of filters:', m.out_channels) + assert m is book.get_module(p) + """ + + def __init__(self, model): + self._model = model + self._modules = OrderedDict() + self._paths = OrderedDict() + path = [] + self._construct(self._model, path) + + def _construct(self, module, path): + if not module._modules: + return + for name, m in module._modules.items(): + cur_path = tuple(path + [name]) + self._paths[m] = cur_path + self._modules[cur_path] = m + self._construct(m, path + [name]) + + def conv2d_modules(self): + return self.modules(nn.Conv2d) + + def linear_modules(self): + return self.modules(nn.Linear) + + def modules(self, module_type=None): + for p, m in self._modules.items(): + if not module_type or isinstance(m, module_type): + yield p, m + + def num_of_conv2d_modules(self): + return self.num_of_modules(nn.Conv2d) + + def num_of_conv2d_filters(self): + """Return the sum of out_channels of all conv2d layers. + + Here we treat the sub weight with size of [in_channels, h, w] as a single filter. 
+ """ + num_filters = 0 + for _, m in self.conv2d_modules(): + num_filters += m.out_channels + return num_filters + + def num_of_linear_modules(self): + return self.num_of_modules(nn.Linear) + + def num_of_linear_filters(self): + num_filters = 0 + for _, m in self.linear_modules(): + num_filters += m.out_features + return num_filters + + def num_of_modules(self, module_type=None): + num = 0 + for p, m in self._modules.items(): + if not module_type or isinstance(m, module_type): + num += 1 + return num + + def get_module(self, path): + return self._modules.get(path) + + def get_path(self, module): + return self._paths.get(module) + + def update(self, path, module): + old_module = self._modules[path] + del self._paths[old_module] + self._paths[module] = path + self._modules[path] = module diff --git a/modules/posenet/__init__.py b/modules/posenet/__init__.py new file mode 100644 index 0000000..24a198c --- /dev/null +++ b/modules/posenet/__init__.py @@ -0,0 +1,5 @@ +from modules.posenet.constants import * +from modules.posenet.decode_multi import decode_multiple_poses +from modules.posenet.models.model_factory import load_model +from modules.posenet.models import MobileNetV1, MOBILENET_V1_CHECKPOINTS +from modules.posenet.utils import * diff --git a/modules/posenet/constants.py b/modules/posenet/constants.py new file mode 100644 index 0000000..3eaeeae --- /dev/null +++ b/modules/posenet/constants.py @@ -0,0 +1,90 @@ +PART_NAMES = [ + "nose", + "leftEye", + "rightEye", + "leftEar", + "rightEar", + "leftShoulder", + "rightShoulder", + "leftElbow", + "rightElbow", + "leftWrist", + "rightWrist", + "leftHip", + "rightHip", + "leftKnee", + "rightKnee", + "leftAnkle", + "rightAnkle", +] + +NUM_KEYPOINTS = len(PART_NAMES) + +PART_IDS = {pn: pid for pid, pn in enumerate(PART_NAMES)} + +CONNECTED_PART_NAMES = [ + ("leftHip", "leftShoulder"), + ("leftElbow", "leftShoulder"), + ("leftElbow", "leftWrist"), + ("leftHip", "leftKnee"), + ("leftKnee", "leftAnkle"), + ("rightHip", 
"rightShoulder"), + ("rightElbow", "rightShoulder"), + ("rightElbow", "rightWrist"), + ("rightHip", "rightKnee"), + ("rightKnee", "rightAnkle"), + ("leftShoulder", "rightShoulder"), + ("leftHip", "rightHip"), +] + +CONNECTED_PART_INDICES = [(PART_IDS[a], PART_IDS[b]) for a, b in CONNECTED_PART_NAMES] + +LOCAL_MAXIMUM_RADIUS = 1 + +POSE_CHAIN = [ + ("nose", "leftEye"), + ("leftEye", "leftEar"), + ("nose", "rightEye"), + ("rightEye", "rightEar"), + ("nose", "leftShoulder"), + ("leftShoulder", "leftElbow"), + ("leftElbow", "leftWrist"), + ("leftShoulder", "leftHip"), + ("leftHip", "leftKnee"), + ("leftKnee", "leftAnkle"), + ("nose", "rightShoulder"), + ("rightShoulder", "rightElbow"), + ("rightElbow", "rightWrist"), + ("rightShoulder", "rightHip"), + ("rightHip", "rightKnee"), + ("rightKnee", "rightAnkle"), +] + +PARENT_CHILD_TUPLES = [(PART_IDS[parent], PART_IDS[child]) for parent, child in POSE_CHAIN] + +PART_CHANNELS = [ + "left_face", + "right_face", + "right_upper_leg_front", + "right_lower_leg_back", + "right_upper_leg_back", + "left_lower_leg_front", + "left_upper_leg_front", + "left_upper_leg_back", + "left_lower_leg_back", + "right_feet", + "right_lower_leg_front", + "left_feet", + "torso_front", + "torso_back", + "right_upper_arm_front", + "right_upper_arm_back", + "right_lower_arm_back", + "left_lower_arm_front", + "left_upper_arm_front", + "left_upper_arm_back", + "left_lower_arm_back", + "right_hand", + "right_lower_arm_front", + "left_hand", +] diff --git a/modules/posenet/converter/tfjs2pytorch.py b/modules/posenet/converter/tfjs2pytorch.py new file mode 100644 index 0000000..23c9d65 --- /dev/null +++ b/modules/posenet/converter/tfjs2pytorch.py @@ -0,0 +1,112 @@ +import json +import struct +import cv2 +import numpy as np +import os +import tempfile +import torch + +from modules.posenet import MobileNetV1, MOBILENET_V1_CHECKPOINTS + + +BASE_DIR = os.path.join(tempfile.gettempdir(), "_posenet_weights") + + +def to_torch_name(tf_name): + tf_name = 
tf_name.lower() + tf_split = tf_name.split("/") + tf_layer_split = tf_split[1].split("_") + tf_variable_type = tf_split[2] + if tf_variable_type == "weights" or tf_variable_type == "depthwise_weights": + variable_postfix = ".weight" + elif tf_variable_type == "biases": + variable_postfix = ".bias" + else: + variable_postfix = "" + + if tf_layer_split[0] == "conv2d": + torch_name = "features.conv" + tf_layer_split[1] + if len(tf_layer_split) > 2: + torch_name += "." + tf_layer_split[2] + else: + torch_name += ".conv" + torch_name += variable_postfix + else: + if tf_layer_split[0] in ["offset", "displacement", "heatmap"] and tf_layer_split[-1] == "2": + torch_name = "_".join(tf_layer_split[:-1]) + torch_name += variable_postfix + else: + torch_name = "" + + return torch_name + + +def load_variables(chkpoint, base_dir=BASE_DIR): + manifest_path = os.path.join(base_dir, chkpoint, "manifest.json") + if not os.path.exists(manifest_path): + print("Weights for checkpoint %s are not downloaded. Downloading to %s ..." 
def load_variables(chkpoint, base_dir=BASE_DIR):
    """Load TFJS checkpoint weights and return a PyTorch state dict.

    Downloads the checkpoint into *base_dir* first when it is not cached.
    """
    manifest_path = os.path.join(base_dir, chkpoint, "manifest.json")
    if not os.path.exists(manifest_path):
        print("Weights for checkpoint %s are not downloaded. Downloading to %s ..." % (chkpoint, base_dir))
        from modules.posenet.converter.wget import download

        download(chkpoint, base_dir)
        assert os.path.exists(manifest_path)

    # Use context managers so file handles are closed deterministically
    # (the previous version never closed the per-variable binary files).
    with open(manifest_path) as manifest:
        variables = json.load(manifest)

    state_dict = {}
    for tf_name in variables:
        torch_name = to_torch_name(tf_name)
        if not torch_name:
            continue  # variable has no PyTorch counterpart
        filename = variables[tf_name]["filename"]
        with open(os.path.join(base_dir, chkpoint, filename), "rb") as fp:
            raw = fp.read()
        # Each shard is a flat array of float32 values.
        fmt = str(int(len(raw) / struct.calcsize("f"))) + "f"
        values = np.array(struct.unpack(fmt, raw), dtype=np.float32)
        shape = variables[tf_name]["shape"]
        if len(shape) == 4:
            # NOTE(review): transpose assumes TFJS stores conv weights
            # channels-last (depthwise shards use a different axis order) —
            # matches the original (2,3,0,1)/(3,2,0,1) choice; confirm against
            # the TFJS export if layouts ever change.
            transpose = (2, 3, 0, 1) if "depthwise" in filename else (3, 2, 0, 1)
            values = np.reshape(values, shape).transpose(transpose)
        state_dict[torch_name] = torch.Tensor(values)

    return state_dict
print("Heatmaps") + print(heatmaps_result.shape) + print(heatmaps_result[:, 0:1, 0:1]) + print(torch.mean(heatmaps_result)) diff --git a/modules/posenet/converter/wget.py b/modules/posenet/converter/wget.py new file mode 100644 index 0000000..86c6d31 --- /dev/null +++ b/modules/posenet/converter/wget.py @@ -0,0 +1,50 @@ +import requests +import json +import posixpath +import os + +from modules.posenet import MOBILENET_V1_CHECKPOINTS + +GOOGLE_CLOUD_STORAGE_DIR = "https://storage.googleapis.com/tfjs-models/weights/posenet/" + + +def download_json(checkpoint, filename, base_dir): + url = posixpath.join(GOOGLE_CLOUD_STORAGE_DIR, checkpoint, filename) + response = requests.get(url) + data = json.loads(response.content) + + with open(os.path.join(base_dir, checkpoint, filename), "w") as outfile: + json.dump(data, outfile) + + +def download_file(checkpoint, filename, base_dir): + url = posixpath.join(GOOGLE_CLOUD_STORAGE_DIR, checkpoint, filename) + response = requests.get(url) + f = open(os.path.join(base_dir, checkpoint, filename), "wb") + f.write(response.content) + f.close() + + +def download(checkpoint, base_dir="./weights/"): + save_dir = os.path.join(base_dir, checkpoint) + if not os.path.exists(save_dir): + os.makedirs(save_dir) + + download_json(checkpoint, "manifest.json", base_dir) + + f = open(os.path.join(save_dir, "manifest.json"), "r") + json_dict = json.load(f) + + for x in json_dict: + filename = json_dict[x]["filename"] + print("Downloading", filename) + download_file(checkpoint, filename, base_dir) + + +def main(): + checkpoint = MOBILENET_V1_CHECKPOINTS[101] + download(checkpoint) + + +if __name__ == "__main__": + main() diff --git a/modules/posenet/decode.py b/modules/posenet/decode.py new file mode 100644 index 0000000..c6c3da9 --- /dev/null +++ b/modules/posenet/decode.py @@ -0,0 +1,73 @@ +import numpy as np + +from modules.posenet.constants import * + + +def traverse_to_targ_keypoint( + edge_id, source_keypoint, target_keypoint_id, scores, 
import numpy as np


def traverse_to_targ_keypoint(
    edge_id, source_keypoint, target_keypoint_id, scores, offsets, output_stride, displacements
):
    """Follow one displacement edge from a located keypoint to its neighbour.

    Returns ``(score, image_coord)`` for the target keypoint.
    """
    height, width = scores.shape[1], scores.shape[2]

    # Heatmap cell of the source keypoint, clamped to the map bounds.
    src_idx = np.clip(
        np.round(source_keypoint / output_stride), a_min=0, a_max=[height - 1, width - 1]
    ).astype(np.int32)

    # Displace the source point along this edge, then snap back to a cell.
    displaced = source_keypoint + displacements[edge_id, src_idx[0], src_idx[1]]
    dst_idx = np.clip(
        np.round(displaced / output_stride), a_min=0, a_max=[height - 1, width - 1]
    ).astype(np.int32)

    score = scores[target_keypoint_id, dst_idx[0], dst_idx[1]]
    # Refine the cell position with the sub-cell offset field.
    image_coord = dst_idx * output_stride + offsets[target_keypoint_id, dst_idx[0], dst_idx[1]]
    return score, image_coord


def decode_pose(
    root_score, root_id, root_image_coord, scores, offsets, output_stride, displacements_fwd, displacements_bwd
):
    """Grow a full pose outward from a single root keypoint.

    Walks the part tree backwards then forwards along PARENT_CHILD_TUPLES,
    filling in every keypoint reachable from the root. Keypoints that are
    already scored are never overwritten.
    """
    num_parts = scores.shape[0]
    num_edges = len(PARENT_CHILD_TUPLES)

    keypoint_scores = np.zeros(num_parts)
    keypoint_coords = np.zeros((num_parts, 2))
    keypoint_scores[root_id] = root_score
    keypoint_coords[root_id] = root_image_coord

    # Backward pass: traverse child -> parent edges in reverse order.
    for edge in reversed(range(num_edges)):
        target_id, source_id = PARENT_CHILD_TUPLES[edge]
        if keypoint_scores[source_id] > 0.0 and keypoint_scores[target_id] == 0.0:
            score, coord = traverse_to_targ_keypoint(
                edge, keypoint_coords[source_id], target_id, scores, offsets, output_stride, displacements_bwd
            )
            keypoint_scores[target_id] = score
            keypoint_coords[target_id] = coord

    # Forward pass: traverse parent -> child edges in order.
    for edge in range(num_edges):
        source_id, target_id = PARENT_CHILD_TUPLES[edge]
        if keypoint_scores[source_id] > 0.0 and keypoint_scores[target_id] == 0.0:
            score, coord = traverse_to_targ_keypoint(
                edge, keypoint_coords[source_id], target_id, scores, offsets, output_stride, displacements_fwd
            )
            keypoint_scores[target_id] = score
            keypoint_coords[target_id] = coord

    return keypoint_scores, keypoint_coords
import numpy as np
import torch
import torch.nn.functional as F


def within_nms_radius_fast(pose_coords, squared_nms_radius, point):
    """Return True when *point* lies within the NMS radius of any pose coord."""
    if not pose_coords.shape[0]:
        return False  # no poses accepted yet
    return np.any(np.sum((pose_coords - point) ** 2, axis=1) <= squared_nms_radius)


def get_instance_score_fast(exist_pose_coords, squared_nms_radius, keypoint_scores, keypoint_coords):
    """Mean keypoint score counting only keypoints not claimed by an existing pose."""
    if exist_pose_coords.shape[0]:
        # Keep a keypoint's score only if it is outside the NMS radius of
        # every previously accepted pose.
        far = np.sum((exist_pose_coords - keypoint_coords) ** 2, axis=2) > squared_nms_radius
        not_overlapped_scores = np.sum(keypoint_scores[np.all(far, axis=0)])
    else:
        not_overlapped_scores = np.sum(keypoint_scores)
    return not_overlapped_scores / len(keypoint_scores)


def build_part_with_score_torch(score_threshold, local_max_radius, scores):
    """Find heatmap cells that are local maxima at or above *score_threshold*.

    Returns ``(scores_vec, idx)`` sorted by descending score, where each idx
    row is ``(keypoint_id, y, x)``.
    """
    kernel_size = 2 * local_max_radius + 1
    # BUGFIX: padding must equal local_max_radius so the pooled map keeps the
    # same spatial size for any radius. The previous hard-coded padding=1 was
    # only correct for radius 1 (the value current callers pass), so this is
    # a backward-compatible generalization.
    max_vals = F.max_pool2d(scores, kernel_size, stride=1, padding=local_max_radius)
    is_peak = (scores == max_vals) & (scores >= score_threshold)
    peak_idx = is_peak.nonzero()
    peak_scores = scores[is_peak]
    order = torch.argsort(peak_scores, descending=True)
    return peak_scores[order], peak_idx[order]
def decode_multiple_poses(
    scores,
    offsets,
    displacements_fwd,
    displacements_bwd,
    output_stride,
    max_pose_detections=10,
    score_threshold=0.5,
    nms_radius=20,
    min_pose_score=0.5,
):
    """Decode up to *max_pose_detections* poses from the network outputs.

    Returns ``(pose_scores, pose_keypoint_scores, pose_keypoint_coords)`` as
    fixed-size numpy arrays; unused trailing slots remain zero.
    """
    # Part scoring runs on the GPU tensors as it's the expensive step;
    # everything after this point is cheap numpy work on the CPU.
    # TODO determine how much more of this would be worth performing on the GPU
    part_scores, part_idx = build_part_with_score_torch(score_threshold, LOCAL_MAXIMUM_RADIUS, scores)
    part_scores = part_scores.cpu().numpy()
    part_idx = part_idx.cpu().numpy()

    scores = scores.cpu().numpy()
    height, width = scores.shape[1], scores.shape[2]

    def _as_coord_field(t):
        # Reshape (2k, h, w) into (k, h, w, 2) so one index yields a full
        # (y, x) vector.
        return t.cpu().numpy().reshape(2, -1, height, width).transpose((1, 2, 3, 0))

    offsets = _as_coord_field(offsets)
    displacements_fwd = _as_coord_field(displacements_fwd)
    displacements_bwd = _as_coord_field(displacements_bwd)

    squared_nms_radius = nms_radius**2
    pose_count = 0
    pose_scores = np.zeros(max_pose_detections)
    pose_keypoint_scores = np.zeros((max_pose_detections, NUM_KEYPOINTS))
    pose_keypoint_coords = np.zeros((max_pose_detections, NUM_KEYPOINTS, 2))

    # Candidate roots arrive sorted by descending part score.
    for root_score, (root_id, root_y, root_x) in zip(part_scores, part_idx):
        root_image_coords = np.array([root_y, root_x]) * output_stride + offsets[root_id, root_y, root_x]

        # Suppress roots that fall inside an already-decoded pose's same keypoint.
        if within_nms_radius_fast(pose_keypoint_coords[:pose_count, root_id, :], squared_nms_radius, root_image_coords):
            continue

        keypoint_scores, keypoint_coords = decode_pose(
            root_score, root_id, root_image_coords, scores, offsets, output_stride, displacements_fwd, displacements_bwd
        )

        pose_score = get_instance_score_fast(
            pose_keypoint_coords[:pose_count, :, :], squared_nms_radius, keypoint_scores, keypoint_coords
        )

        # NOTE this isn't in the original implementation, but it appears that by
        # initially ordering by part scores, and having a max # of detections,
        # we can end up populating the returned poses with lower scored poses
        # than if we discard 'bad' ones and continue (higher pose scores can
        # still come later). Set min_pose_score to 0. to revert to original
        # behaviour.
        if min_pose_score == 0.0 or pose_score >= min_pose_score:
            pose_scores[pose_count] = pose_score
            pose_keypoint_scores[pose_count, :] = keypoint_scores
            pose_keypoint_coords[pose_count, :, :] = keypoint_coords
            pose_count += 1

        if pose_count >= max_pose_detections:
            break

    return pose_scores, pose_keypoint_scores, pose_keypoint_coords
def read_cap(cap, scale_factor=1.0, output_stride=16):
    """Grab one frame from *cap* and preprocess it for the network."""
    ok, frame = cap.read()
    if not ok:
        raise IOError("webcam failure")
    return _process_input(frame, scale_factor, output_stride)


def read_imgfile(img, scale_factor=1.0, output_stride=16):
    """Preprocess an already-loaded image array for the network."""
    return _process_input(img, scale_factor, output_stride)


def draw_keypoints(
    img, instance_scores, keypoint_scores, keypoint_coords, min_pose_confidence=0.5, min_part_confidence=0.5
):
    """Render confident keypoints of confident poses onto *img*."""
    cv_keypoints = []
    for pose_idx, pose_score in enumerate(instance_scores):
        if pose_score < min_pose_confidence:
            continue  # skip low-confidence poses entirely
        for part_score, coord in zip(keypoint_scores[pose_idx, :], keypoint_coords[pose_idx, :, :]):
            if part_score < min_part_confidence:
                continue
            # Coords are (y, x); cv2.KeyPoint wants (x, y). Marker size scales
            # with the part score.
            cv_keypoints.append(cv2.KeyPoint(coord[1], coord[0], 10.0 * part_score))
    return cv2.drawKeypoints(img, cv_keypoints, outImage=np.array([]))


def get_adjacent_keypoints(keypoint_scores, keypoint_coords, min_confidence=0.1):
    """Return (x, y) endpoint pairs for every skeleton edge whose parts are confident."""
    segments = []
    for left, right in modules.posenet.CONNECTED_PART_INDICES:
        if keypoint_scores[left] < min_confidence or keypoint_scores[right] < min_confidence:
            continue
        # Reverse (y, x) -> (x, y) for cv2 drawing primitives.
        segments.append(np.array([keypoint_coords[left][::-1], keypoint_coords[right][::-1]]).astype(np.int32))
    return segments
def draw_skel_and_kp(img, instance_scores, keypoint_scores, keypoint_coords, min_pose_score=0.5, min_part_score=0.5):
    """Draw both skeleton lines and keypoints for every confident pose."""
    out_img = img
    adjacent_keypoints = []
    cv_keypoints = []
    for pose_idx, pose_score in enumerate(instance_scores):
        if pose_score < min_pose_score:
            continue

        # Collect skeleton segments for this pose.
        adjacent_keypoints.extend(
            get_adjacent_keypoints(keypoint_scores[pose_idx, :], keypoint_coords[pose_idx, :, :], min_part_score)
        )

        # Collect confident individual keypoints ((y, x) -> cv2 (x, y)).
        for part_score, coord in zip(keypoint_scores[pose_idx, :], keypoint_coords[pose_idx, :, :]):
            if part_score < min_part_score:
                continue
            cv_keypoints.append(cv2.KeyPoint(coord[1], coord[0], 10.0 * part_score))

    if cv_keypoints:
        out_img = cv2.drawKeypoints(
            out_img,
            cv_keypoints,
            outImage=np.array([]),
            color=(0, 255, 255),
            flags=cv2.DRAW_MATCHES_FLAGS_DRAW_RICH_KEYPOINTS,
        )
    out_img = cv2.polylines(out_img, adjacent_keypoints, isClosed=False, color=(0, 255, 255))
    return out_img
class CalcStethoscopePosition:
    """Map a stethoscope position from camera space onto the fixed body map.

    The four torso landmarks (shoulders and hips) define a perspective
    transform onto the canonical map coordinates from `const`.
    """

    def __init__(self):
        # Canonical landmark positions on the body map, in the order:
        # left shoulder, right shoulder, left hip, right hip.
        self.target_points = np.array(
            [
                [const.LEFTSHOLDER_X, const.LEFTSHOLDER_Y],
                [const.RIGHTSHOLDER_X, const.RIGHTSHOLDER_Y],
                [const.LEFTHIP_X, const.LEFTHIP_Y],
                [const.RIGHTHIP_X, const.RIGHTHIP_Y],
            ],
            dtype=np.float32,
        )

    def calc_affine(self, source_points, stethoscope_x, stethoscope_y):
        """Project the stethoscope point through the perspective transform
        defined by *source_points* -> the canonical landmarks.

        NOTE(review): despite the name, this uses a perspective (homography)
        transform, not an affine one — kept for interface compatibility.
        Returns ``[0, 0]`` when the projected point falls outside the map.
        """
        mat = cv2.getPerspectiveTransform(source_points, self.target_points)
        # Homogeneous coordinates: (x', y', w) = M @ (x, y, 1).
        x_h = mat[0][0] * stethoscope_x + mat[0][1] * stethoscope_y + mat[0][2]
        y_h = mat[1][0] * stethoscope_x + mat[1][1] * stethoscope_y + mat[1][2]
        w_h = mat[2][0] * stethoscope_x + mat[2][1] * stethoscope_y + mat[2][2]
        projected = [int(x_h / w_h), int(y_h / w_h)]

        inside = (
            const.MINIMUM_SIZE <= projected[0] <= const.MAXIMAIUM_SIZE
            and const.MINIMUM_SIZE <= projected[1] <= const.MAXIMAIUM_SIZE
        )
        if not inside:
            projected = [0, 0]
        return projected
self.preview_button.pack(pady=10) + + self.select_id_button = Button(self.control_frame, text="カメラID確定", command=self.finalize_selection) + self.select_id_button.pack(pady=10) + + def get_cameras(self): + """CameraFinder.exeを使用して利用可能なカメラのIDと名前を取得する""" + result = subprocess.run([const.CAMERA_FINDER_PATH], capture_output=True, text=True) + lines = result.stdout.splitlines() + cameras = [] + for line in lines: + if "[" in line and "]" in line: + idx = line.index("[") + id_end = line.index("]") + cam_id = int(line[idx + 1 : id_end]) + cam_name = line[id_end + 2 :].strip() + cameras.append((cam_id, cam_name)) + return cameras + + def on_select_camera_id(self, event=None): + selected_idx = self.listbox.curselection() + if not selected_idx: + return + self.selected_cam_id = self.cameras[selected_idx[0]][0] + self.listbox.selection_set(selected_idx) + + def on_preview_camera(self): + if not hasattr(self, "selected_cam_id"): + messagebox.showinfo("情報", "カメラIDを選択してください。") + return + + if self.cap and self.cap.isOpened(): + self.cap.release() + + self.stop_thread = False + self.thread = threading.Thread(target=self.show_camera_feed, args=(self.selected_cam_id,)) + self.thread.start() + + def show_camera_feed(self, cam_id): + self.cap = cv2.VideoCapture(cam_id) + if not self.cap.isOpened(): + messagebox.showerror("エラー", "カメラを開けませんでした。") + return + + while not self.stop_thread: + ret, frame = self.cap.read() + if ret: + frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) + # アスペクト比を保ったままリサイズ + h, w, _ = frame.shape + aspect_ratio = w / h + new_width = int(self.feed_label.winfo_height() * aspect_ratio) + frame = cv2.resize(frame, (new_width, self.feed_label.winfo_height())) + image = Image.fromarray(frame) + photo = ImageTk.PhotoImage(image=image) + self.feed_label.config(image=photo) + self.feed_label.image = photo + + self.cap.release() + + def get_selected_camera_id(self): + """選択されたカメラIDを返す""" + if hasattr(self, "selected_cam_id"): + return self.selected_cam_id + return None + + 
def finalize_selection(self): + if not hasattr(self, "selected_cam_id"): + messagebox.showinfo("情報", "カメラIDを選択してください。") + return + self.selected_camera_id = self.get_selected_camera_id() + self.master.camera_id = self.selected_camera_id + messagebox.showinfo("情報", f"カメラID {self.selected_camera_id} が確定されました。") + self.destroy() + + def on_close(self): + self.stop_thread = True # スレッドを停止 + if hasattr(self, "thread"): + self.thread.join() # スレッドが終了するのを待つ + if self.cap and self.cap.isOpened(): + self.cap.release() + self.destroy() # ウィンドウを終了 + + @staticmethod + def get_default_camera_id(): + """デフォルトで "VGA Camera" という名前のカメラIDを返す""" + result = subprocess.run([const.CAMERA_FINDER_PATH], capture_output=True, text=True) + lines = result.stdout.splitlines() + for line in lines: + if "VGA Camera" in line: + idx = line.index("[") + id_end = line.index("]") + cam_id = int(line[idx + 1 : id_end]) + return cam_id + return None diff --git a/modules/util/const.py b/modules/util/const.py new file mode 100644 index 0000000..dc9341f --- /dev/null +++ b/modules/util/const.py @@ -0,0 +1,33 @@ +# CameraFinder.exeの場所 +CAMERA_FINDER_PATH = "bin/CameraFinder.exe" + +# 聴診デバイス +BAR_TOR = 64 +ADJUST_VALUE = 48 +VID = 1027 +PID = 24597 +BAUDRATE = 115200 + +# SSDモデルのセットアップ +MODEL_PATH = "./models/mb1-ssd-second.pth" +LABEL_PATH = "./models/voc-model-labels.txt" + +# EARS音源のセットアップ +EARS_MAP_PATH = "img/map/" +EARS_SOUND_PATH = "sound/" + +# 聴診位置計算 +LEFTSHOLDER_X = 290 +LEFTSHOLDER_Y = 90 +RIGHTSHOLDER_X = 100 +RIGHTSHOLDER_Y = 90 +LEFTHIP_X = 280 +LEFTHIP_Y = 390 +RIGHTHIP_X = 110 +RIGHTHIP_Y = 390 + +MAXIMAIUM_SIZE = 390 +MINIMUM_SIZE = 0 + +# logフォルダの場所 +LOG_PATH = "./log" diff --git a/modules/util/ears_ai.py b/modules/util/ears_ai.py new file mode 100644 index 0000000..784fbe1 --- /dev/null +++ b/modules/util/ears_ai.py @@ -0,0 +1,92 @@ +import cv2 +import numpy as np +import torch +import modules.posenet as posenet +from modules.PytorchSSD.ssd.mobilenetv1_ssd import create_mobilenetv1_ssd, 
create_mobilenetv1_ssd_predictor +from modules.util import const + + +class EarsAI: + def __init__(self): + self.model_path = const.MODEL_PATH + self.label_path = const.LABEL_PATH + + self.setup_ssd_model() + self.setup_posenet() + + def setup_ssd_model(self): + """SSDモデルのセットアップを行う""" + class_names = [name.strip() for name in open(self.label_path).readlines()] + net = create_mobilenetv1_ssd(len(class_names), is_test=True) + net.load(self.model_path) + self.predictor = create_mobilenetv1_ssd_predictor(net, candidate_size=200) + self.class_names = class_names + + def setup_posenet(self): + """PoseNetのセットアップを行う""" + self.posenet_model = posenet.load_model(101).cuda() + self.output_stride = self.posenet_model.output_stride + + def pose_detect(self, frame, vid): + """姿勢検出を行う""" + print("Entering pose_detect method") # デバッグ出力 + print(f"Frame type: {type(frame)}") # デバッグ出力 + print(f"Frame shape: {frame.shape if hasattr(frame, 'shape') else 'No shape attribute'}") # デバッグ出力 + + frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) + input_image, draw_image, output_scale = posenet.read_imgfile( + frame, scale_factor=0.7125, output_stride=self.output_stride + ) + with torch.no_grad(): + input_image = torch.Tensor(input_image).cuda() + heatmaps_result, offsets_result, displacement_fwd_result, displacement_bwd_result = self.posenet_model( + input_image + ) + pose_scores, keypoint_scores, keypoint_coords = posenet.decode_multiple_poses( + heatmaps_result.squeeze(0), + offsets_result.squeeze(0), + displacement_fwd_result.squeeze(0), + displacement_bwd_result.squeeze(0), + output_stride=self.output_stride, + max_pose_detections=10, + min_pose_score=0.30, + ) + + keypoint_coords *= output_scale + overlay_image = posenet.draw_skel_and_kp( + frame, pose_scores, keypoint_scores, keypoint_coords, min_pose_score=0.15, min_part_score=0.1 + ) + + # Extract keypoint coordinates + left_shoulder = keypoint_coords[0, posenet.PART_NAMES.index("leftShoulder"), :].astype(np.int32) + right_shoulder = 
keypoint_coords[0, posenet.PART_NAMES.index("rightShoulder"), :].astype(np.int32) + left_hip = keypoint_coords[0, posenet.PART_NAMES.index("leftHip"), :].astype(np.int32) + right_hip = keypoint_coords[0, posenet.PART_NAMES.index("rightHip"), :].astype(np.int32) + + return overlay_image, left_shoulder, right_shoulder, left_hip, right_hip + + def ssd_detect(self, frame, vid): + """SSDによる検出を行う""" + overlay_image = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR) + boxes, labels, probs = self.predictor.predict(overlay_image, 1, 0.20) + overlay_image = cv2.cvtColor(overlay_image, cv2.COLOR_BGR2RGB) + + stethoscope_x, stethoscope_y = 0, 0 + if len(probs) != 0: + max_index = np.argmax(probs) + box = boxes[max_index, :] + cv2.rectangle(overlay_image, (int(box[0]), int(box[1])), (int(box[2]), int(box[3])), (0, 255, 255), 2) + stethoscope_x = int((box[0] + box[2]) / 2) + stethoscope_y = int((box[1] + box[3]) / 2) + label = f"{self.class_names[labels[max_index]]}: {probs[max_index]:.2f}" + cv2.putText( + overlay_image, + label, + (int(box[0]) + 20, int(box[1]) + 40), + cv2.FONT_HERSHEY_SIMPLEX, + 0.5, + (255, 0, 255), + 1, + ) + + return overlay_image, stethoscope_x, stethoscope_y diff --git a/modules/util/ears_sound.py b/modules/util/ears_sound.py new file mode 100644 index 0000000..3860e2d --- /dev/null +++ b/modules/util/ears_sound.py @@ -0,0 +1,72 @@ +import cv2 +import numpy as np +import pygame +import os +import modules.util.const as const + + +class EarsSound: + def __init__(self, result): + pygame.mixer.init() + self.volume = 0.0 + self.min_volume = 0.0 + self.playing = False + + if result[0] == "none": + self.map_image = None + self.sound = None + else: + self.map_image = cv2.imread(os.path.join(const.EARS_MAP_PATH + result[0])) + self.sound_file = os.path.join(const.EARS_SOUND_PATH + result[1]) + self.sound = pygame.mixer.Sound(self.sound_file) + + def volume_change(self, stethoscope, flag=False, type=None): + """音量を変更する""" + if self.map_image is None: + return + + R, G, B = 
self.map_image[stethoscope[1], stethoscope[0]] + vol = R if R != 0 else B + VOLUME_CURVE = 9.5 + Y = pow((vol / 255.0), 1.0 / VOLUME_CURVE) + set_volume = Y + + if set_volume > 1: + set_volume = 1 + elif set_volume < 0: + set_volume = 0 + + if type == 1: + set_volume = set_volume * 0.5 + + self.volume = set_volume + if flag and self.volume != 0: + if stethoscope[0] > 195: + self.volume = 0.1 + self.sound.set_volume(self.volume) + + def get_length(self): + """音源の長さを秒単位で取得する""" + if not self.sound: + return 0 + + array = pygame.sndarray.array(self.sound) + sample_rate = pygame.mixer.get_init()[0] + duration = array.shape[0] / float(sample_rate) + return duration + + def set_volume(self, volume): + """音量を設定する""" + self.volume = volume + + def play(self): + """音を再生する""" + self.sound.set_volume(self.volume) + self.sound.play(-1) + + def stop(self): + self.sound.set_volume(self.min_volume) + + def close(self): + """リソースを解放する""" + pygame.mixer.quit() diff --git a/modules/util/logger.py b/modules/util/logger.py new file mode 100644 index 0000000..e04cf67 --- /dev/null +++ b/modules/util/logger.py @@ -0,0 +1,79 @@ +import logging +import datetime +import os +import cv2 +from modules.util import const + + +class SingletonMeta(type): + _instances = {} + + def __call__(cls, *args, **kwargs): + if cls not in cls._instances: + instance = super().__call__(*args, **kwargs) + cls._instances[cls] = instance + return cls._instances[cls] + + +class Logger(metaclass=SingletonMeta): + def __init__(self): + if not hasattr(self, "initialized"): + # Ensure the base log directory exists + if not os.path.exists(const.LOG_PATH): + os.makedirs(const.LOG_PATH) + + # Create a directory structure based on the current datetime + current_time = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") + self.current_log_dir = os.path.join(const.LOG_PATH, current_time) + + # Ensure the log directory exists + if not os.path.exists(self.current_log_dir): + os.makedirs(self.current_log_dir) + + # Ensure the 
image directory exists + self.img_path = os.path.join(self.current_log_dir, "img") + if not os.path.exists(self.img_path): + os.makedirs(self.img_path) + + # Create log filename + log_filename = os.path.join(self.current_log_dir, f"log_{current_time}.log") + + # Set up logging as before + self.logger = logging.getLogger(self.__class__.__name__) + self.logger.setLevel(logging.DEBUG) + + fh = logging.FileHandler(log_filename) + fh.setFormatter(logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")) + self.logger.addHandler(fh) + + ch = logging.StreamHandler() + ch.setFormatter(logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")) + self.logger.addHandler(ch) + + self.initialized = True + + def debug(self, message): + self.logger.debug(message) + + def info(self, message): + self.logger.info(message) + + def warning(self, message): + self.logger.warning(message) + + def error(self, message): + self.logger.error(message) + + def critical(self, message): + self.logger.critical(message) + + def save_image(self, frame, img_name=None): + # If img_name isn't provided, use the current time as the filename + if img_name is None: + file_name = datetime.datetime.now().strftime("%Y%m%d_%H%M%S%f") + ".jpg" # %f is microseconds + else: + file_name = ( + datetime.datetime.now().strftime("%Y%m%d_%H%M%S%f") + "_" + img_name + ".jpg" + ) # %f is microseconds + img_path = os.path.join(self.img_path, file_name) + cv2.imwrite(img_path, frame) diff --git a/modules/util/touch_sensor.py b/modules/util/touch_sensor.py new file mode 100644 index 0000000..fa524bd --- /dev/null +++ b/modules/util/touch_sensor.py @@ -0,0 +1,74 @@ +import serial +from serial.tools import list_ports +import modules.util.const as const + + +class TouchSensor: + def __init__(self): + self.ser = None + self.inhale_start = None + self.exhale_start = None + self.exhale_end = None + self.serial_command = "0" + self.previous = 0 + self.setup_sensor() + + def setup_sensor(self): + """センサーのセットアップを行う""" + 
self.ser = self.init_serial_communication() + + def init_serial_communication(self): + """シリアル通信の初期化を行う""" + com = "" + ports = list(list_ports.comports()) + for p in ports: + if p.vid == const.VID and p.pid == const.PID: + com = p.device + break + + if not com: + print("Sensor device not found!") + return None + + return serial.Serial(com, const.BAUDRATE, timeout=3) + + def read_sensor_value(self): + """センサーの値を読み取る""" + if not self.ser: + return None + + self.ser.write(self.serial_command.encode()) + dlen = self.ser.inWaiting() + d = self.ser.read(dlen) + strword = d.decode("utf-8", errors="ignore") + + if len(strword) != 0: + touchData = strword[-1:] + value = int(touchData[0]) + self.previous = value + return value + else: + return 2 + + def calc_serial_value(self, current_pos): + """シリアル値を計算する""" + bar = 0 + if not self.ser: + return None + + if int(current_pos) < self.inhale_start: + bar = current_pos * const.BAR_TOR / self.inhale_start + elif int(current_pos) < self.exhale_start: + bar = const.BAR_TOR + elif int(current_pos) < self.exhale_end: + bar = const.BAR_TOR - const.BAR_TOR * (current_pos - self.exhale_start) / ( + self.exhale_end - self.exhale_start + ) + + self.serial_command = chr(int(bar) + const.ADJUST_VALUE) + + def close_connection(self): + """シリアル接続を解除する""" + if self.ser: + self.ser.close() + self.ser = None diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..1d9772d --- /dev/null +++ b/requirements.txt @@ -0,0 +1,8 @@ +python-dotenv == 1.0.1 +opencv-python == 4.10.0.84 +torch == 2.4.0 +torchvision == 0.19.0 +requests == 2.32.3 +pandas == 2.2.2 +joblib == 1.4.2 +lightgbm == 4.5.0 \ No newline at end of file diff --git a/util/calc_ste_position.py b/util/calc_ste_position.py new file mode 100644 index 0000000..63721f8 --- /dev/null +++ b/util/calc_ste_position.py @@ -0,0 +1,33 @@ +import cv2 +import numpy as np +import modules.util.const as const + + +class CalcStethoscopePosition: + def __init__(self): + 
self.target_points = np.array( + [ + [const.LEFTSHOLDER_X, const.LEFTSHOLDER_Y], + [const.RIGHTSHOLDER_X, const.RIGHTSHOLDER_Y], + [const.LEFTHIP_X, const.LEFTHIP_Y], + [const.RIGHTHIP_X, const.RIGHTHIP_Y], + ], + dtype=np.float32, + ) + + def calc_affine(self, source_points, stethoscope_x, stethoscope_y): + mat = cv2.getPerspectiveTransform(source_points, self.target_points) + x_0 = mat[0][0] * stethoscope_x + mat[0][1] * stethoscope_y + mat[0][2] + y_0 = mat[1][0] * stethoscope_x + mat[1][1] * stethoscope_y + mat[1][2] + x_1_y_1 = mat[2][0] * stethoscope_x + mat[2][1] * stethoscope_y + mat[2][2] + stethoscope_calc = list((int(x_0 / x_1_y_1), int(y_0 / x_1_y_1))) + + if ( + stethoscope_calc[0] > const.MAXIMAIUM_SIZE + or stethoscope_calc[1] > const.MAXIMAIUM_SIZE + or stethoscope_calc[0] < const.MINIMUM_SIZE + or stethoscope_calc[1] < const.MINIMUM_SIZE + ): + stethoscope_calc = list((0, 0)) + + return stethoscope_calc diff --git a/util/const.py b/util/const.py new file mode 100644 index 0000000..dc9341f --- /dev/null +++ b/util/const.py @@ -0,0 +1,33 @@ +# CameraFinder.exeの場所 +CAMERA_FINDER_PATH = "bin/CameraFinder.exe" + +# 聴診デバイス +BAR_TOR = 64 +ADJUST_VALUE = 48 +VID = 1027 +PID = 24597 +BAUDRATE = 115200 + +# SSDモデルのセットアップ +MODEL_PATH = "./models/mb1-ssd-second.pth" +LABEL_PATH = "./models/voc-model-labels.txt" + +# EARS音源のセットアップ +EARS_MAP_PATH = "img/map/" +EARS_SOUND_PATH = "sound/" + +# 聴診位置計算 +LEFTSHOLDER_X = 290 +LEFTSHOLDER_Y = 90 +RIGHTSHOLDER_X = 100 +RIGHTSHOLDER_Y = 90 +LEFTHIP_X = 280 +LEFTHIP_Y = 390 +RIGHTHIP_X = 110 +RIGHTHIP_Y = 390 + +MAXIMAIUM_SIZE = 390 +MINIMUM_SIZE = 0 + +# logフォルダの場所 +LOG_PATH = "./log" diff --git a/util/ears_ai.py b/util/ears_ai.py new file mode 100644 index 0000000..669b924 --- /dev/null +++ b/util/ears_ai.py @@ -0,0 +1,93 @@ +import cv2 +import numpy as np +import torch +import modules.posenet as posenet +from modules.PytorchSSD.ssd.mobilenetv1_ssd import create_mobilenetv1_ssd, create_mobilenetv1_ssd_predictor +from 
modules.util import const + + +class EarsAI: + def __init__(self): + self.model_path = const.MODEL_PATH + self.label_path = const.LABEL_PATH + + self.setup_ssd_model() + self.setup_posenet() + + def setup_ssd_model(self): + """SSDモデルのセットアップを行う""" + class_names = [name.strip() for name in open(self.label_path).readlines()] + net = create_mobilenetv1_ssd(len(class_names), is_test=True) + net.load(self.model_path) + self.predictor = create_mobilenetv1_ssd_predictor(net, candidate_size=200) + self.class_names = class_names + + def setup_posenet(self): + """PoseNetのセットアップを行う""" + self.posenet_model = posenet.load_model(101).cuda() + self.output_stride = self.posenet_model.output_stride + + def pose_detect(self, frame, vid): + """姿勢検出を行う""" + if frame is None: + raise ValueError("Input frame is None") + + print(f"Pose detect - Input frame shape: {frame.shape}") + + frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) + input_image, draw_image, output_scale = posenet.read_imgfile( + frame, scale_factor=0.7125, output_stride=self.output_stride + ) + with torch.no_grad(): + input_image = torch.Tensor(input_image).cuda() + heatmaps_result, offsets_result, displacement_fwd_result, displacement_bwd_result = self.posenet_model( + input_image + ) + pose_scores, keypoint_scores, keypoint_coords = posenet.decode_multiple_poses( + heatmaps_result.squeeze(0), + offsets_result.squeeze(0), + displacement_fwd_result.squeeze(0), + displacement_bwd_result.squeeze(0), + output_stride=self.output_stride, + max_pose_detections=10, + min_pose_score=0.05, + ) + + keypoint_coords *= output_scale + overlay_image = posenet.draw_skel_and_kp( + frame, pose_scores, keypoint_scores, keypoint_coords, min_pose_score=0.15, min_part_score=0.1 + ) + + # Extract keypoint coordinates + left_shoulder = keypoint_coords[0, posenet.PART_NAMES.index("leftShoulder"), :].astype(np.int32) + right_shoulder = keypoint_coords[0, posenet.PART_NAMES.index("rightShoulder"), :].astype(np.int32) + left_hip = keypoint_coords[0, 
posenet.PART_NAMES.index("leftHip"), :].astype(np.int32) + right_hip = keypoint_coords[0, posenet.PART_NAMES.index("rightHip"), :].astype(np.int32) + + return overlay_image, left_shoulder, right_shoulder, left_hip, right_hip + + def ssd_detect(self, frame, vid): + """SSDによる検出を行う""" + overlay_image = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR) + boxes, labels, probs = self.predictor.predict(overlay_image, 1, 0.20) + overlay_image = cv2.cvtColor(overlay_image, cv2.COLOR_BGR2RGB) + + stethoscope_x, stethoscope_y = 0, 0 + if len(probs) != 0: + max_index = np.argmax(probs) + box = boxes[max_index, :] + cv2.rectangle(overlay_image, (int(box[0]), int(box[1])), (int(box[2]), int(box[3])), (0, 255, 255), 2) + stethoscope_x = int((box[0] + box[2]) / 2) + stethoscope_y = int((box[1] + box[3]) / 2) + label = f"{self.class_names[labels[max_index]]}: {probs[max_index]:.2f}" + cv2.putText( + overlay_image, + label, + (int(box[0]) + 20, int(box[1]) + 40), + cv2.FONT_HERSHEY_SIMPLEX, + 0.5, + (255, 0, 255), + 1, + ) + + return overlay_image, stethoscope_x, stethoscope_y