diff --git a/Loaders.py b/Loaders.py index 83e0bb7..0b9d64b 100644 --- a/Loaders.py +++ b/Loaders.py @@ -4,11 +4,14 @@ from pathlib import Path from torch.utils.data import Dataset, DataLoader, Subset from lightning.pytorch import LightningDataModule +import lightning as L import numpy as np import cv2 from typing import Optional, Sequence, Tuple, Union, List, Any, Callable from sklearn.model_selection import KFold, StratifiedKFold import torchvision +import torchvision.transforms as T +import json import math import defs import csv @@ -17,6 +20,8 @@ import shutil from ultralytics import YOLO import itertools +import albumentations as A +from albumentations.pytorch import ToTensorV2 class NVB_Classes(Enum): NOT_NVB = 0 @@ -79,6 +84,48 @@ k, m = divmod(len(a), n) return (a[i*k+min(i, m):(i+1)*k+min(i+1, m)] for i in range(n)) + def _ROI_Mask(self, input_img:np.ndarray): + transform = A.Compose( + [ + A.Resize(224, 224, interpolation=cv2.INTER_CUBIC), + A.Normalize(self.mean, self.std), + ToTensorV2() + ] + ) + + device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu") + + self.Masked_roi_model.to(device) + self.Masked_roi_model.eval() + + h, w, _ = input_img.shape + sample = input_img.copy() + + input_img = transform(image=input_img) + input_img = input_img["image"] + input_img = input_img.repeat(1, 1, 1, 1) + input_img = input_img.to(device) + + with torch.no_grad(): + mask = torch.sigmoid(self.Masked_roi_model(input_img)) + + mask = mask[0].cpu().numpy().transpose((1, 2, 0)) + mask = cv2.resize(mask, (w, h), interpolation=cv2.INTER_CUBIC) + mask = cv2.GaussianBlur(mask, (5, 5), 0) + + _, th_mask = cv2.threshold(mask, 0.5, 1, cv2.THRESH_BINARY) + th_mask = cv2.morphologyEx(th_mask, cv2.MORPH_OPEN, cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (15, 15))) + th_mask = th_mask.astype(np.uint8) + + contours = cv2.findContours(th_mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE) + contours = contours[0] if len(contours) == 2 else contours[1] + + roi = max(contours, key=cv2.contourArea) + x, y, w, h = cv2.boundingRect(roi) + + return cv2.bitwise_and(sample, sample, mask=th_mask)[y : y + h, x : x + w] + + def ROI_Extract_YOLO(self, YoloModel:YOLO, image, threshold=0.75): listROIs = [] res = YoloModel(image, stream=True) @@ -110,7 +157,8 @@ colorSpace:int=None, ROI_Yolo:YOLO=None, thresholdYolo_Accuracy:float=0.75, - Num_Labels:int = None + Num_Labels:int = None, + ROI_Mask:L.LightningModule=None ) -> None: root = Path(RootPath) @@ -118,6 +166,10 @@ lStd = [] NO_NVB = [] NVB = [] + self.Num_Labels = None + + self.Masked_roi_model = ROI_Mask + if Num_Labels is not None: MultiClasses = [[] for _ in range(Num_Labels)] self.Num_Labels = Num_Labels @@ -145,7 +197,6 @@ tempImg = cv2.cvtColor(tempImg, colorSpace) if preResize is not None: tempImg = cv2.resize(tempImg, preResize, interpolation=cv2.INTER_AREA)#Cambio de resolucion - tempImgList = [tempImg] #ROI w/ YOLOv8 @@ -294,7 +345,14 @@ root = self.CVS_File.parent.parent database = pd.read_csv(self.CVS_File, usecols=["id", "label", "class", "path"]) arrFolds = np.load(self.Folds_File, allow_pickle=True) - + + if self.Masked_roi_model is not None: + database.reset_index() + for _, row in database.iterrows(): + img = np.load(Path(row["path"])) + np.save(Path(row["path"]), self._ROI_Mask(img)) + + database.reset_index() for foldNum, splits in enumerate(np.array_split(arrFolds, len(arrFolds)/3)): foldPath = root/f"fold_{foldNum}" for datasetType, subSet in enumerate(splits): @@ -345,6 +403,146 @@ target = self.target_transform(target) return (sample, Extra_data), target + +class RARP_DatasetFolder_RoiExtractor(torchvision.datasets.DatasetFolder): + def __init__( + self, + root, + loader, + extensions = None, + transform = None, + target_transform = None, + is_valid_file = None, + create_mask:bool = False + ): + super().__init__(root, loader, extensions, transform, target_transform, is_valid_file) + + self.create_mask = create_mask + + def _removeBlackBorder_mask(self, image:np.ndarray, input_mask:np.ndarray): + #image = np.array(image) + + copyImg = cv2.cvtColor(image.copy(), cv2.COLOR_BGR2HSV) + h = copyImg[:,:,0] + mask = np.ones(h.shape, dtype=np.uint8) * 255 + th = (25, 175) + mask[(h > th[0]) & (h < th[1])] = 0 + copyImg = cv2.cvtColor(copyImg, cv2.COLOR_HSV2BGR) + resROI = cv2.bitwise_and(copyImg, copyImg, mask=mask) + + image_gray = cv2.cvtColor(resROI, cv2.COLOR_BGR2GRAY) + _, thresh = cv2.threshold(image_gray, 0, 255, cv2.THRESH_BINARY) + kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (15, 15)) + morph = cv2.morphologyEx(thresh, cv2.MORPH_OPEN, kernel) + contours = cv2.findContours(morph, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE) + contours = contours[0] if len(contours) == 2 else contours[1] + bigCont = max(contours, key=cv2.contourArea) + x, y, w, h = cv2.boundingRect(bigCont) + + return image[y : y + h, x : x + w], input_mask[y : y + h, x : x + w] + + def _catmull_rom_spline(self, P0, P1, P2, P3, n_points=20): + points = [] + for t in np.linspace(0, 1, n_points): + # Catmull-Rom formula + t2 = t * t + t3 = t2 * t + x = 0.5 * ((2 * P1[0]) + + (-P0[0] + P2[0]) * t + + (2 * P0[0] - 5 * P1[0] + 4 * P2[0] - P3[0]) * t2 + + (-P0[0] + 3 * P1[0] - 3 * P2[0] + P3[0]) * t3) + + y = 0.5 * ((2 * P1[1]) + + (-P0[1] + P2[1]) * t + + (2 * P0[1] - 5 * P1[1] + 4 * P2[1] - P3[1]) * t2 + + (-P0[1] + 3 * P1[1] - 3 * P2[1] + P3[1]) * t3) + + points.append((x, y)) + + return points + + def _catmull_rom_closed_loop(self, points, n_points=20): + spline_points = [] + n = len(points) + + for i in range(n): + P0 = points[(i - 1) % n] + P1 = points[i] + P2 = points[(i + 1) % n] + P3 = points[(i + 2) % n] + spline_points += self._catmull_rom_spline(P0, P1, P2, P3, n_points) + + return np.array(spline_points) + + def _create_mask_from_contour(self, spline_points:np, mask_size:Tuple = (0, 0)): + smooth_curve_int = np.round(spline_points).astype(np.int32) + mask = np.zeros(mask_size, dtype=np.uint8) + + return cv2.fillPoly(mask, [smooth_curve_int], 1) + + def __getitem__(self, index: int) -> Tuple[Any, Any]: + path, _ = self.samples[index] + + pth = Path(path) + newPth = pth.parent / f"{pth.name.split('.')[0]}.json" + + data = json.load(open(newPth)) + kpts = data["shapes"][0]["points"] + + img = self.loader(path) + if self.transform is not None: + if not self.create_mask: + sample = self.transform(image=img, keypoints=kpts) + + img = sample["image"] + kpts = torch.tensor(sample["keypoints"]) + _, h, w = img.shape + kpts = kpts / torch.tensor([h, w]) + else: + h, w, _ = img.shape + smood_perimeter = self._catmull_rom_closed_loop(kpts, n_points=15) + roi_mask = self._create_mask_from_contour(smood_perimeter, (h, w)) + + crop_img, crop_mask = self._removeBlackBorder_mask(img, roi_mask) + crop_mask = crop_mask.astype(np.float32) + + sample = self.transform(image=crop_img, mask=crop_mask) + img = sample["image"] + kpts = sample["mask"] + + + return img, kpts + + +class RARP_DatasetFolder_ExtraLabel(torchvision.datasets.DatasetFolder): + def __init__( + self, + root: str, + loader: Callable[[str], Any], + Extra_Data: pd.DataFrame, + extensions: Tuple[str, ...] | None = None, + transform: Callable[..., Any] | None = None, + target_transform: Callable[..., Any] | None = None, + is_valid_file: Callable[[str], bool] | None = None + ) -> None: + super().__init__(root, loader, extensions, transform, target_transform, is_valid_file) + self.Extra_Data = Extra_Data + + def __getitem__(self, index: int) -> Tuple[Any, Any]: + path, _ = self.samples[index] + + name = Path(path).name + Extra_data = [int(x) for x in str(self.Extra_Data[self.Extra_Data["raw_name"] == name]["encode_l1"].values[0]).split("|")] + Extra_data = torch.tensor(Extra_data) + + #Loadres is 2 values + sample = self.loader(path) + if self.transform is not None: + sample = self.transform(sample) + if self.target_transform is not None: + Extra_data = self.target_transform(Extra_data) + + return sample, Extra_data class RARP_DatasetFolder_DobleTransform(torchvision.datasets.DatasetFolder): def __init__(self, @@ -376,8 +574,7 @@ return (sample1, sample2, self.PassOrigianlImage(sample)), target else: return (sample1, sample2), target - - + class RARP_PreprocessCreator(): def removeBlackBorder(self, image): image = np.array(image) @@ -762,78 +959,94 @@ device = None, mean:float = None, std:float = None, - Tranform_0 = None + Tranform_0 = None, + Init_Resize = (512,512) ) -> None: self.NumLocal_Crops= NumLocalCrops self.globalCrop1 = torch.nn.Sequential( - torchvision.transforms.RandomResizedCrop( + T.Resize(Init_Resize, antialias=True, interpolation=T.InterpolationMode.BICUBIC), + T.RandomRotation( + degrees=(-15, 15), + fill=5 + ), + T.RandomResizedCrop( Size, scale=GloblaCropsScale, antialias=True, - interpolation=torchvision.transforms.InterpolationMode.BICUBIC + interpolation=T.InterpolationMode.BICUBIC ), - torchvision.transforms.RandomHorizontalFlip(0.5), - torchvision.transforms.RandomErasing(0.2, value="random"), - #torchvision.transforms.RandomApply([ + #T.RandomHorizontalFlip(0.5), + T.RandomErasing(0.2, value="random"), + #T.RandomApply([ # RARP_ChannelSwap() #]), - torchvision.transforms.GaussianBlur(kernel_size=5, sigma=(0.1, 2)), - torchvision.transforms.Normalize(mean, std) + T.GaussianBlur(kernel_size=5, sigma=(0.1, 2.0)), + T.Normalize(mean, std) ).to(device) self.globalCrop2 = torch.nn.Sequential( - torchvision.transforms.RandomResizedCrop( + T.Resize(Init_Resize, antialias=True, interpolation=T.InterpolationMode.BICUBIC), + T.RandomRotation( + degrees=(-15, 15), + fill=5 + ), + T.RandomResizedCrop( Size, scale=GloblaCropsScale, antialias=True, - interpolation=torchvision.transforms.InterpolationMode.BICUBIC + interpolation=T.InterpolationMode.BICUBIC ), - torchvision.transforms.RandomHorizontalFlip(0.5), - torchvision.transforms.RandomErasing(0.8, value="random"), - #torchvision.transforms.RandomApply([ - # RARP_ChannelSwap() - #]), - torchvision.transforms.RandomApply([ - torchvision.transforms.GaussianBlur(kernel_size=5, sigma=(0.1, 2)) + #T.RandomHorizontalFlip(0.5), + T.RandomErasing(0.8, value="random"), + T.RandomApply([ + RARP_ChannelSwap() + ]), + T.RandomApply([ + T.GaussianBlur(kernel_size=5, sigma=(0.1, 2.0)) ], 0.1), - torchvision.transforms.RandomApply([ + T.RandomApply([ RARP_Invert() ], 0.2), - torchvision.transforms.Normalize(mean, std) + T.Normalize(mean, std) ).to(device) self.local = torch.nn.Sequential( - torchvision.transforms.RandomResizedCrop( + T.Resize(Init_Resize, antialias=True, interpolation=T.InterpolationMode.BICUBIC), + T.RandomRotation( + degrees=(-5, 5), + fill=5 + ), + T.RandomResizedCrop( Size, scale=LocalCropsScale, antialias=True, - interpolation=torchvision.transforms.InterpolationMode.BICUBIC + interpolation=T.InterpolationMode.BICUBIC ), - torchvision.transforms.RandomHorizontalFlip(0.5), - torchvision.transforms.RandomErasing(0.5, value="random"), - #torchvision.transforms.RandomApply([ - # RARP_ChannelSwap() - #]), - torchvision.transforms.RandomApply([ - torchvision.transforms.GaussianBlur(kernel_size=5, sigma=(0.1, 2)) + #T.RandomHorizontalFlip(0.5), + T.RandomErasing(0.1, value="random"), + T.RandomApply([ + RARP_ChannelSwap() ], 0.5), - torchvision.transforms.Normalize(mean, std) + T.RandomApply([ + T.GaussianBlur(kernel_size=5, sigma=(0.1, 2.0)) + ], 0.5), + T.Normalize(mean, std) ).to(device) InitResize = (256,256) self.classification = torch.nn.Sequential( - torchvision.transforms.Resize( + T.Resize( InitResize, antialias=True, - interpolation=torchvision.transforms.InterpolationMode.BICUBIC + interpolation=T.InterpolationMode.BICUBIC ), - torchvision.transforms.CenterCrop(224), - torchvision.transforms.RandomAffine( + T.CenterCrop(224), + T.RandomAffine( degrees=(-5, 5), scale=(0.9, 1.1), fill=5 ), - torchvision.transforms.RandomHorizontalFlip(1.0), - torchvision.transforms.Normalize(mean, std), + T.RandomHorizontalFlip(1.0), + T.Normalize(mean, std), ).to(device) if Tranform_0 == None else Tranform_0 def __call__(self, img):