import cv2
import numpy as np
from pathlib import Path
import time
from tqdm import tqdm
import argparse
def mse(img1, img2):
    """Return the mean squared error between two equally-shaped images.

    Both inputs are cast to float64 before subtracting. The previous
    implementation used ``cv2.subtract``, which saturates unsigned integer
    differences at 0 (so for uint8 inputs any pixel where img2 > img1
    contributed nothing, biasing the error toward 0). A plain NumPy
    difference has no such clipping and works for any dtype.

    Args:
        img1: first image (any shape, typically 2-D grayscale float32).
        img2: second image, same shape as img1.

    Returns:
        The MSE as a float (0.0 for identical images).
    """
    diff = img1.astype(np.float64) - img2.astype(np.float64)
    return float(np.mean(diff ** 2))
def centerCrop(img: np.ndarray, size: tuple = None, pct_size: float = 0.3):
    """Crop the central region of an image.

    Fixes the annotation ``img: np`` (a module, not a type) to
    ``np.ndarray``; behavior is unchanged.

    Args:
        img: array of shape (H, W) or (H, W, C).
        size: explicit (height, width) of the crop. When None, the crop
            keeps ``(1 - pct_size)`` of each spatial dimension.
        pct_size: fraction of height/width to remove when size is None.

    Returns:
        A view into ``img`` covering the centered crop.
    """
    if size is None:
        size = int(img.shape[0] * (1 - pct_size)), int(img.shape[1] * (1 - pct_size))
    # Top-left corner so the crop is centered in both axes.
    x = (img.shape[1] - size[1]) // 2
    y = (img.shape[0] - size[0]) // 2
    return img[y:y + size[0], x:x + size[1]]
def removeBlackBorder(image):
    """Crop a BGR image to the bounding box of its largest content region.

    Pixels whose hue falls inside the (25, 175) band are masked out; the
    remaining pixels are thresholded to a binary map, cleaned with a
    morphological opening, and the image is cropped to the bounding rect
    of the largest contour found.

    Returns:
        (crop, (x, y, w, h)): the cropped image and the bounding rect,
        so callers can re-apply the same crop to subsequent frames.
    """
    copyImg = cv2.cvtColor(image.copy(), cv2.COLOR_BGR2HSV)
    h = copyImg[:,:,0]  # hue channel
    # Start fully kept (255 everywhere), then suppress the hue band below.
    mask = np.ones(h.shape, dtype=np.uint8) * 255
    # NOTE(review): hues strictly inside (25, 175) are discarded — presumably
    # tuned for this footage; confirm against representative inputs.
    th = (25, 175)
    mask[(h > th[0]) & (h < th[1])] = 0
    copyImg = cv2.cvtColor(copyImg, cv2.COLOR_HSV2BGR)
    resROI = cv2.bitwise_and(copyImg, copyImg, mask=mask)
    image_gray = cv2.cvtColor(resROI, cv2.COLOR_BGR2GRAY)
    # With thresh=0 and THRESH_BINARY, any non-black pixel becomes foreground.
    _, thresh = cv2.threshold(image_gray, 0, 255, cv2.THRESH_BINARY)
    # Opening with a 15x15 rect removes small speckles before contour search.
    kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (15, 15))
    morph = cv2.morphologyEx(thresh, cv2.MORPH_OPEN, kernel)
    contours = cv2.findContours(morph, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
    # findContours returns 2 values on OpenCV 2/4 and 3 values on OpenCV 3.
    contours = contours[0] if len(contours) == 2 else contours[1]
    bigCont = max(contours, key=cv2.contourArea)
    x, y, w, h = cv2.boundingRect(bigCont)
    # Crop the ORIGINAL image (not the masked copy) to the detected rect.
    crop = image[y : y + h, x : x + w]
    return crop, (x, y, w, h)
def normalization(img, mean = 0.458971, std = 0.225609):
    """Convert a BGR image to a standardized, lightly blurred grayscale map.

    Steps: BGR -> grayscale float32 in [0, 1], subtract ``mean`` and divide
    by ``std``, then apply a 5x5 Gaussian blur (sigma=1).

    Args:
        img: BGR image (uint8).
        mean: dataset mean used for standardization.
        std: dataset standard deviation used for standardization.

    Returns:
        float32 2-D array of standardized, blurred intensities.
    """
    gray = cv2.cvtColor(img.copy(), cv2.COLOR_BGR2GRAY).astype(np.float32) / 255.0
    standardized = (gray - mean) / std
    return cv2.GaussianBlur(standardized, (5, 5), 1)
def seconds_to_hms(seconds):
    """Format a duration in seconds as ``H:MM:SS`` (hours unpadded)."""
    total_minutes, secs = divmod(seconds, 60)
    hours, minutes = divmod(total_minutes, 60)
    return f'{int(hours)}:{int(minutes):02}:{int(secs):02}'
def fft_cross_correlation(img1, img2):
    """Cross-correlate two equally-shaped single-channel images via the DFT.

    Uses the correlation theorem: IDFT(F1 * conj(F2)) is the cross-
    correlation of the two images. Both are zero-padded to an optimal DFT
    size of at least twice their dimensions so the correlation is linear
    (no circular wrap-around).

    Returns:
        corr: the fftshift-ed real correlation surface.
        sim: correlation peak divided by the product of the inputs'
            Frobenius norms; by Cauchy-Schwarz this is <= 1, reaching 1
            only for perfectly aligned proportional images.
    """
    h, w = img1.shape
    # Pad to >= 2x each dimension so the circular DFT behaves linearly.
    H, W = cv2.getOptimalDFTSize(2*h), cv2.getOptimalDFTSize(2*w)
    pad1 = np.zeros((H, W), dtype=np.float32)
    pad2 = np.zeros((H, W), dtype=np.float32)
    pad1[:h, :w] = img1
    pad2[:h, :w] = img2
    dtf1 = cv2.dft(pad1, flags=cv2.DFT_COMPLEX_OUTPUT)
    dtf2 = cv2.dft(pad2, flags=cv2.DFT_COMPLEX_OUTPUT)
    # conjB=True yields F1 * conj(F2): the cross-correlation spectrum.
    cross_power = cv2.mulSpectrums(dtf1, dtf2, 0, conjB=True)
    corr = cv2.idft(cross_power, flags=cv2.DFT_SCALE | cv2.DFT_REAL_OUTPUT)
    corr = np.fft.fftshift(corr)  # center the zero-displacement peak
    max_val = corr.max()
    # Normalize the peak by the L2 norms so sim is invariant to intensity scale.
    n_img1 = np.linalg.norm(img1)
    n_img2 = np.linalg.norm(img2)
    sim = max_val / (n_img1 * n_img2)
    return corr, sim
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("-i", "--Input", type=str)
parser.add_argument("-o", "--Output", type=str, default="output.mp4")
parser.add_argument("-t", "--Target", type=str)
parser.add_argument("--Sim_Check", type=bool, default=False)
parser.add_argument("-p", "--Patience", type=int, default=None, help="Seconds analyzed after the threshold is reached")
parser.add_argument("-br","--Remove_Blackbar", type=bool, default=False)
parser.add_argument("-f","--Forze_resize", type=bool, default=False)
args = parser.parse_args()
start_time = time.time()
img = cv2.imread(str(Path(args.Target)), cv2.IMREAD_COLOR)
if args.Remove_Blackbar:
img, _ = removeBlackBorder(img)
img = normalization(img)
H, W = img.shape
cap = cv2.VideoCapture(str(Path(args.Input)))
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
video_fps = cap.get(cv2.CAP_PROP_FPS)
print(f"Video FPS: {video_fps:.2f}, Total frames: {total_frames}, Video length: {seconds_to_hms(total_frames / video_fps)}")
pbar = tqdm(
total=total_frames,
bar_format="{desc}", # only render the description
desc="aFPS: 0.00" # initial text
)
errorList = []
prev = time.perf_counter()
best_match = 0.99
frameCouter = 0
best_match_frame = 0
ROI_rect = None
resizeFrame = False
Max_Sim = 0.90
early_Stop_count = 0
sim = 0
sim_str = ""
prev_sim = 0
while cap.isOpened():
if early_Stop_count == args.Patience:
break
cap.set(cv2.CAP_PROP_POS_FRAMES, frameCouter)
ret, frame = cap.read()
if not ret:
break
try:
if (frameCouter == 0):
if args.Remove_Blackbar:
frame, ROI_rect = removeBlackBorder(frame)
img_size = img.shape[0] * img.shape[1]
frame_size = frame.shape[0] * frame.shape[1]
if (img_size > frame_size):
img = cv2.resize(img, (frame.shape[1], frame.shape[0]) if not args.Forze_resize else (512, 512), interpolation=cv2.INTER_CUBIC)
else:
resizeFrame = True
if args.Remove_Blackbar and frameCouter > 0:
x, y, w, h = ROI_rect
frame = frame[y : y + h, x : x + w]
if resizeFrame or args.Forze_resize:
frame = cv2.resize(frame, (W, H) if not args.Forze_resize else (512, 512))
frame = normalization(frame)
error = mse(img, frame)
errorList.append(error)
if error < best_match:
best_match = error
best_match_frame = frameCouter
if (args.Sim_Check):
#_, sim = fft_cross_correlation(img, frame)
sim = cv2.matchTemplate(img, frame, cv2.TM_CCORR_NORMED)[0][0]
sim_str = f"frame #{best_match_frame} similarity: {sim:.6f}"
early_Stop_count = 0 if prev_sim > sim else early_Stop_count
prev_sim = sim
cv2.imwrite(f"./output/{Path(args.Target).name}", frame)
early_Stop_count += 1 if (args.Patience is not None) and (sim >= Max_Sim) else 0
except Exception as e:
continue
finally:
frameCouter += round(video_fps)
now = time.perf_counter()
fps = 1.0 / (now - prev)
prev = now
pbar.set_description(f"Progress: {(frameCouter * 100 / total_frames):.2f}% {frameCouter} / {total_frames}, Time: {seconds_to_hms(frameCouter / video_fps)} FPS: {fps:.2f} best match {best_match:.6f}/{error:.6f} Aprox timestamp {seconds_to_hms(best_match_frame / video_fps)} {sim_str}")
pbar.update(1)
pbar.close()
cap.release()
end_time = time.time()
print(f"{seconds_to_hms(end_time - start_time)} seconds, mean error: {np.array(errorList).mean()}, std: {np.array(errorList).std()}")