diff --git a/mosaic-image.py b/mosaic-image.py new file mode 100644 index 0000000..6313100 --- /dev/null +++ b/mosaic-image.py @@ -0,0 +1,266 @@ +import os + +import cv2 + + +def mosaic_face_with_yunet( + input_video_path: str, + model_path: str, + output_video_path: str, + frames_output_dir: str, + mosaic_scale: float = 0.1, + score_threshold: float = 0.9, + nms_threshold: float = 0.3, + top_k: int = 5000, +): + """ + YuNet (FaceDetectorYN) を使用して動画の顔のみをモザイク処理し, + モザイク済みの動画と全フレームを出力する関数. + + Parameters + ---------- + input_video_path : str + 入力動画ファイルのパス + model_path : str + YuNet の ONNX モデルファイルへのパス + output_video_path : str + モザイクをかけた結果の出力動画ファイルのパス + frames_output_dir : str + 全フレーム画像の出力先ディレクトリパス + mosaic_scale : float + モザイクの縮小率 (例: 0.1 なら幅高さを1/10に縮小して再拡大) + score_threshold : float + 顔検出におけるスコア閾値 (自信度がこれより低い検出結果は無視) + nms_threshold : float + NMS(非最大抑制)の閾値 + top_k : int + NMS 後に保持する最大検出数 + """ + + # 出力用ディレクトリ作成 + os.makedirs(frames_output_dir, exist_ok=True) + + # 動画を開く + cap = cv2.VideoCapture(input_video_path) + if not cap.isOpened(): + print(f"動画ファイルを開けませんでした: {input_video_path}") + return + + # 動画情報を取得 + width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) + height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) + fps = cap.get(cv2.CAP_PROP_FPS) + + # 動画出力の設定 (fourcc はmp4用にmp4vを使用) + fourcc = cv2.VideoWriter_fourcc(*"mp4v") + out = cv2.VideoWriter(output_video_path, fourcc, fps, (width, height)) + + # YuNet FaceDetectorYN の初期化 + # 入力サイズは推論用に一旦 (320, 320) など固定サイズを与えておき, + # 各フレーム処理時に setInputSize で動的に変更します. + face_detector = cv2.FaceDetectorYN.create( + model=model_path, + config="", + input_size=(320, 320), # 後ほどフレームサイズに合わせて上書き + score_threshold=score_threshold, + nms_threshold=nms_threshold, + top_k=top_k, + backend_id=cv2.dnn.DNN_BACKEND_DEFAULT, + target_id=cv2.dnn.DNN_TARGET_CPU, + ) + + frame_count = 0 + + while True: + ret, frame = cap.read() + if not ret: + break + + # フレームサイズが変化する場合は毎回 setInputSize を呼び出す + face_detector.setInputSize((frame.shape[1], frame.shape[0])) + + # 顔検出 + # detect() は (retval, faces) のタプルを返し, faces が検出結果 + # faces の列は [x, y, w, h, confidence, x1, y1, x2, y2, ... x5, y5] (ランドマーク) + _, faces = face_detector.detect(frame) + + if faces is not None: + for face in faces: + x, y, w, h = face[:4].astype(int) + # 必要に応じて座標補正(画像範囲外をはみ出さないようにクリップ) + x = max(0, x) + y = max(0, y) + w = min(w, frame.shape[1] - x) + h = min(h, frame.shape[0] - y) + + # 顔領域を取得 + face_roi = frame[y : y + h, x : x + w] + + # 縮小 + face_roi_small = cv2.resize( + face_roi, + (max(1, int(w * mosaic_scale)), max(1, int(h * mosaic_scale))), + interpolation=cv2.INTER_LINEAR, + ) + # 再拡大 (NEARESTで拡大するとドットが強めのモザイクに) + face_roi_mosaic = cv2.resize( + face_roi_small, (w, h), interpolation=cv2.INTER_NEAREST + ) + + # モザイク部分をフレームに貼り付け + frame[y : y + h, x : x + w] = face_roi_mosaic + + # フレーム画像を保存 + frame_filename = os.path.join(frames_output_dir, f"frame_{frame_count:06d}.jpg") + cv2.imwrite(frame_filename, frame) + + # モザイク処理済みフレームを動画ファイルに書き出し + out.write(frame) + + frame_count += 1 + + cap.release() + out.release() + print("処理が完了しました.") + + +def mosaic_faces_in_directory( + input_dir: str, + output_dir: str, + model_path: str, + mosaic_scale: float = 0.1, + score_threshold: float = 0.9, + nms_threshold: float = 0.3, + top_k: int = 5000, +): + """ + YuNet (FaceDetectorYN) を使用して, 指定ディレクトリ内の画像に対して + 顔モザイク処理を行い, モザイク済み画像を出力ディレクトリへ保存する関数. + + Parameters + ---------- + input_dir : str + 顔モザイクをかけたい画像が格納されているディレクトリのパス + output_dir : str + モザイク後の画像を保存するディレクトリのパス + model_path : str + YuNet の ONNX モデルファイルへのパス + mosaic_scale : float + モザイクの縮小率 (例: 0.1 なら幅高さを1/10に縮小して再拡大) + score_threshold : float + 顔検出におけるスコア閾値 (自信度がこれより低い検出結果は無視) + nms_threshold : float + NMS(非最大抑制)の閾値 + top_k : int + NMS 後に保持する最大検出数 + """ + + # 入出力ディレクトリの作成 + os.makedirs(output_dir, exist_ok=True) + + # YuNet FaceDetectorYN の初期化 + # (入力サイズはとりあえず (320, 320). 後で画像ごとに変更する) + face_detector = cv2.FaceDetectorYN.create( + model=model_path, + config="", + input_size=(320, 320), + score_threshold=score_threshold, + nms_threshold=nms_threshold, + top_k=top_k, + backend_id=cv2.dnn.DNN_BACKEND_DEFAULT, + target_id=cv2.dnn.DNN_TARGET_CPU, + ) + + # 指定ディレクトリ内のファイル一覧を取得 + for filename in os.listdir(input_dir): + # 画像ファイルと想定できる拡張子の場合のみ読み込む (jpg, png など). + # 必要に応じて拡張子を追加してください. + if not ( + filename.lower().endswith(".jpg") + or filename.lower().endswith(".jpeg") + or filename.lower().endswith(".png") + ): + continue + + input_path = os.path.join(input_dir, filename) + img = cv2.imread(input_path) + if img is None: + print(f"読み込みに失敗しました: {input_path}") + continue + + # 画像サイズに合わせて setInputSize を更新 + face_detector.setInputSize((img.shape[1], img.shape[0])) + + # 顔検出 + _, faces = face_detector.detect(img) + + if faces is not None: + for face in faces: + x, y, w, h = face[:4].astype(int) + x = max(0, x) + y = max(0, y) + w = min(w, img.shape[1] - x) + h = min(h, img.shape[0] - y) + + # 顔領域を取得 + face_roi = img[y : y + h, x : x + w] + + # 縮小 + face_roi_small = cv2.resize( + face_roi, + (max(1, int(w * mosaic_scale)), max(1, int(h * mosaic_scale))), + interpolation=cv2.INTER_LINEAR, + ) + # 再拡大 + face_roi_mosaic = cv2.resize( + face_roi_small, (w, h), interpolation=cv2.INTER_NEAREST + ) + + # モザイク部分を画像に貼り付け + img[y : y + h, x : x + w] = face_roi_mosaic + + # モザイク済み画像の保存 + output_path = os.path.join(output_dir, filename) + cv2.imwrite(output_path, img) + + print("ディレクトリ内の画像に対するモザイク処理が完了しました.") + + +def main(): + # --- 以下は動画に対する処理 (コメントアウトで無効化) --- + """ + input_video = "video/tes.mp4" + model_path = "./face_detection_yunet_2023mar.onnx" + output_video = "output_mosaic_yunet.mp4" + frames_dir = "frames" + + mosaic_face_with_yunet( + input_video_path=input_video, + model_path=model_path, + output_video_path=output_video, + frames_output_dir=frames_dir, + mosaic_scale=0.2, # モザイクの粗さを調整 + score_threshold=0.9, + nms_threshold=0.3, + top_k=5000, + ) + """ + + # --- 以下はディレクトリ内の画像に対する処理 (コメントアウトを外して使用) --- + model_path = "./face_detection_yunet_2023mar.onnx" + input_dir = "./pose-detect-frame" # モザイク処理前の画像ディレクトリ + output_dir = "./output_images" # モザイク処理後の画像出力先ディレクトリ + + mosaic_faces_in_directory( + input_dir=input_dir, + output_dir=output_dir, + model_path=model_path, + mosaic_scale=0.2, + score_threshold=0.5, + nms_threshold=0.3, + top_k=5000, + ) + + +if __name__ == "__main__": + main() diff --git a/pose-detect.py b/pose-detect.py new file mode 100644 index 0000000..1c712ae --- /dev/null +++ b/pose-detect.py @@ -0,0 +1,201 @@ +import argparse +import os +import re + +import cv2 +import numpy as np + +# mmdet / mmpose +from mmdet.apis import inference_detector, init_detector +from mmpose.apis import inference_topdown +from mmpose.apis import init_model as init_pose_estimator +from mmpose.evaluation.functional import nms +from mmpose.structures import merge_data_samples +from mmpose.utils import adapt_mmdet_pipeline +from PIL import Image, ImageDraw + + +def extract_keypoints_rtmpose(pose_results): + """ + RTMposeの推論結果 (pose_results) から、もっとも平均可視スコアの高い + インスタンスを選んで keypoints([17,2]) を返す。検出できなければ None。 + """ + if not pose_results: + return None + + max_avg_visible = 0 + best_instance = None + for result in pose_results: + pred_instances = result.pred_instances + for instance in pred_instances: + avg_visible = np.mean(instance.keypoints_visible) + if avg_visible > max_avg_visible: + max_avg_visible = avg_visible + best_instance = instance + + if best_instance is None: + return None + + # best_instance.keypoints の shape が (1, num_kpts, 2) の場合がある + return best_instance.keypoints[0] + + +def pillow_draw_circle(draw, center, radius, fill=None, outline=None, width=1): + """(center=(x,y)) を中心とする円を描画。""" + x, y = int(center[0]), int(center[1]) + left_up = (x - radius, y - radius) + right_down = (x + radius, y + radius) + draw.ellipse([left_up, right_down], fill=fill, outline=outline, width=width) + + +def draw_glow_marker(draw, center, main_color, radius=15): + """ + main_color で塗りつぶし、白枠を付けて光っているように描画 + """ + x, y = int(center[0]), int(center[1]) + # 白枠 (外周3ピクセル分) + outer_radius = radius + 3 + pillow_draw_circle( + draw, (x, y), outer_radius, fill=None, outline=(255, 255, 255), width=2 + ) + # 内部を塗りつぶす + pillow_draw_circle(draw, (x, y), radius, fill=main_color, outline=None, width=0) + + +def main(): + parser = argparse.ArgumentParser(description="RTMpose: detect shoulders & hips.") + parser.add_argument( + "--img_dir", + type=str, + required=True, + help="入力画像フォルダのパス(PNG/JPG 画像が格納されている)", + ) + parser.add_argument( + "--output_dir", + type=str, + default="pose_out", + help="肩・腰描画後の画像を保存するフォルダ", + ) + parser.add_argument( + "--device", + type=str, + default="cuda", + help="推論に使用するデバイス (cuda / cpu など)", + ) + + # ここは「以下のソースコードの設定のままでいいです」とあった部分をそのまま利用 + det_config = "modules/rtmpose/mmdetection_cfg/rtmdet_m_640-8xb32_coco-person.py" + det_checkpoint = ( + "models/rtmpose/rtmdet_m_8xb32-100e_coco-obj365-person-235e8209.pth" + ) + pose_config = ( + "modules/rtmpose/configs/body_2d_keypoint/rtmpose/body8/" + "rtmpose-l_8xb256-420e_body8-256x192.py" + ) + pose_checkpoint = "models/rtmpose/rtmpose-l_simcc-aic-coco_pt-aic-coco_420e-256x192-f016ffe0_20230126.pth" + + args = parser.parse_args() + + os.makedirs(args.output_dir, exist_ok=True) + + # (A) 人体検出器 (RTDet) の初期化 + print("[INFO] Initializing RTDet with config:") + print(" det_config:", det_config) + print(" det_checkpoint:", det_checkpoint) + detector = init_detector(det_config, det_checkpoint, device=args.device) + detector.cfg = adapt_mmdet_pipeline(detector.cfg) + + # (B) RTMpose (姿勢推定) の初期化 + print("[INFO] Initializing RTMpose with config:") + print(" pose_config:", pose_config) + print(" pose_checkpoint:", pose_checkpoint) + pose_estimator = init_pose_estimator( + pose_config, pose_checkpoint, device=args.device + ) + + # 画像ファイル一覧 + images = sorted( + [ + f + for f in os.listdir(args.img_dir) + if f.lower().endswith((".png", ".jpg", ".jpeg")) + ], + key=lambda x: int(re.search(r"(\d+)", x).group()) + if re.search(r"(\d+)", x) + else x, + ) + if not images: + print(f"[WARNING] 指定フォルダに画像が見つかりません: {args.img_dir}") + return + + # 元コードで「肩・腰」に使われていた色 (pose_color_rgb = (33, 95, 154)) + pose_color_rgb = (33, 95, 154) + + for img_name in images: + img_path = os.path.join(args.img_dir, img_name) + frame_bgr = cv2.imread(img_path) + if frame_bgr is None: + print(f"[WARNING] 画像の読み込みに失敗: {img_path}") + continue + + # (1) 物体検出 + frame_rgb = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2RGB) + det_result = inference_detector(detector, frame_rgb) + pred_instance = det_result.pred_instances.cpu().numpy() + + # label=0 (person), スコア>0.3 の bboxをフィルタ + bboxes = np.concatenate( + (pred_instance.bboxes, pred_instance.scores[:, None]), axis=1 + ) + person_bboxes = bboxes[ + np.logical_and(pred_instance.labels == 0, pred_instance.scores > 0.3) + ] + + # NMS (バージョンによって引数形式が違う場合あり) + keep_idx = nms(person_bboxes, 0.3) + person_bboxes = person_bboxes[keep_idx, :4] + + if len(person_bboxes) == 0: + # 人物が検出されなければ何も描画せず保存 + out_path = os.path.join(args.output_dir, img_name) + cv2.imwrite(out_path, frame_bgr) + continue + + # (2) RTMpose で姿勢推定 + pose_results = inference_topdown(pose_estimator, frame_rgb, person_bboxes) + data_samples = merge_data_samples(pose_results) + + # (3) 肩・腰 キーポイントだけを取り出す + keypoints = extract_keypoints_rtmpose(pose_results) + if keypoints is None: + out_path = os.path.join(args.output_dir, img_name) + cv2.imwrite(out_path, frame_bgr) + continue + + # index: left_shoulder=5, right_shoulder=6, left_hip=11, right_hip=12 (COCO) + left_shoulder = keypoints[5] + right_shoulder = keypoints[6] + left_hip = keypoints[11] + right_hip = keypoints[12] + + # (4) 肩と腰を元画像に描画 (Pillow) + pil_img = Image.fromarray(cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2RGB)) + draw = ImageDraw.Draw(pil_img) + + # 任意の可視性チェックなどは省略。ここでは座標がある4点を描画。 + for pt in [left_shoulder, right_shoulder, left_hip, right_hip]: + draw_glow_marker(draw, (pt[0], pt[1]), pose_color_rgb, radius=15) + + # Pillow → BGR + out_img_rgb = np.array(pil_img) + out_img_bgr = cv2.cvtColor(out_img_rgb, cv2.COLOR_RGB2BGR) + + # (5) 保存 + out_path = os.path.join(args.output_dir, img_name) + cv2.imwrite(out_path, out_img_bgr) + + print("All done. Results saved to:", args.output_dir) + + +if __name__ == "__main__": + main()