import torch
from torch.utils.data import DataLoader
import torchvision
from torchvision import transforms
import torchmetrics
import lightning as L
import lightning.pytorch.callbacks as callbk
from lightning.pytorch.loggers import TensorBoardLogger
from tqdm.notebook import tqdm
import numpy as np
import matplotlib.pyplot as plt
from pathlib import Path
from CustomCallback import StepDropout
import Loaders
import defs
import argparse
import seaborn as sn
import Models as M
import pandas as pd
import warnings
from ultralytics import YOLO
# Allow float32 matrix multiplications to use faster, reduced-precision kernels
# where the hardware provides them (see torch.set_float32_matmul_precision).
torch.set_float32_matmul_precision('high')
# Request deterministic cuDNN algorithms so repeated runs are comparable.
torch.backends.cudnn.deterministic = True
def Calc_Eval_table_New(TrainModel:M.RARP_NVB_Model):
    """Evaluate ``TrainModel`` on the module-level ``Test_DataLoader``.

    The model is moved to the module-level ``device`` and switched to eval
    mode. Instances of ``M.RARP_NVB_Model_test2`` are scored as a two-class
    softmax classifier; every other model is scored as a single-logit
    (sigmoid) binary classifier.

    Args:
        TrainModel: model to evaluate.

    Returns:
        ``[accuracy, precision, recall, f1, auroc]`` as Python floats.
    """
    TrainModel.to(device)
    TrainModel.eval()
    multiclass = isinstance(TrainModel, M.RARP_NVB_Model_test2)

    Predictions = []
    Labels = []
    with torch.no_grad():
        for img, label in tqdm(iter(Test_DataLoader)):
            img = img.float().to(device)
            pred = TrainModel(img)
            if multiclass:
                Predictions.append(torch.softmax(pred, dim=1))
            else:
                Predictions.append(torch.sigmoid(pred.squeeze(1)))
            Labels.append(label.to(device))
    Predictions = torch.cat(Predictions)
    # Targets must stay integer: torchmetrics' AUROC validation rejects
    # floating-point targets, which the previous float cast produced.
    Labels = torch.cat(Labels).long()
    print(Predictions, Labels)

    if multiclass:
        metric_kwargs = {"task": "multiclass", "num_classes": 2}
    else:
        metric_kwargs = {"task": "binary"}

    # Order of this tuple defines the order of the returned list.
    metric_types = (
        torchmetrics.Accuracy,
        torchmetrics.Precision,
        torchmetrics.Recall,
        torchmetrics.F1Score,
        torchmetrics.AUROC,
    )
    return [
        metric_cls(**metric_kwargs).to(device)(Predictions, Labels).item()
        for metric_cls in metric_types
    ]
def Calc_Eval_table(TrainModel:M.RARP_NVB_Model,TestDataLoadre:DataLoader, Youden=False, modelName="", Add_TestDataset:DataLoader=None ):
    """Build a table of binary-classification metrics for ``TrainModel``.

    Predictions are the sigmoid of the flattened model output, collected over
    ``TestDataLoadre`` and then, when given, over ``Add_TestDataset``. The
    model is moved to the module-level ``device`` and put in eval mode.

    Args:
        TrainModel: model to evaluate.
        TestDataLoadre: main test loader yielding ``(data, label)`` batches.
        Youden: when true, append two extra rows evaluated at alternative
            decision thresholds (see Returns).
        modelName: text stored in the last column of the extra rows.
        Add_TestDataset: optional second loader whose samples are appended
            to those of the main loader.

    Returns:
        A list of rows ``[threshold, acc, precision, recall, f1, auroc,
        specificity, name]``, every numeric cell formatted with four decimals.
        The first row uses the default 0.5 threshold and an empty name. With
        ``Youden`` two rows follow: one at the ROC threshold maximising
        ``tpr - fpr``, and one whose threshold is the value
        ``recall + specificity - 1`` measured at 0.5 (replaced by 0.5 when
        that value is not positive). The AUROC cell is the same in all rows.
    """
    TrainModel.to(device)
    TrainModel.eval()

    loaders = [TestDataLoadre]
    if Add_TestDataset is not None:
        loaders.append(Add_TestDataset)

    Predictions = []
    Labels = []
    with torch.no_grad():
        for loader in loaders:
            for data, label in tqdm(iter(loader)):
                data = data.float().to(device)
                pred = TrainModel(data).flatten()
                Predictions.append(torch.sigmoid(pred))
                Labels.append(label.to(device))
    Predictions = torch.cat(Predictions)
    Labels = torch.cat(Labels)
    print(Predictions, Labels)

    def _score(metric_cls, **kwargs):
        # Instantiate a binary torchmetrics metric and evaluate it once.
        return metric_cls('binary', **kwargs).to(device)(Predictions, Labels)

    def _fmt(value):
        return f"{value.item():.4f}"

    acc = _score(torchmetrics.Accuracy)
    precision = _score(torchmetrics.Precision)
    recall = _score(torchmetrics.Recall)
    auc = _score(torchmetrics.AUROC)
    f1Score = _score(torchmetrics.F1Score)
    specificty = _score(torchmetrics.Specificity)

    table = [
        ["0.5000", _fmt(acc), _fmt(precision), _fmt(recall), _fmt(f1Score), _fmt(auc), _fmt(specificty), ""]
    ]

    if Youden:
        # The ROC curve does not depend on the threshold under test, so it is
        # computed once instead of once per extra row.
        fpr, tpr, thresholds = _score(torchmetrics.ROC)
        rocThreshold = thresholds[torch.argmax(tpr - fpr)].item()

        jThreshold = (recall + specificty - 1).item()
        if jThreshold <= 0:
            jThreshold = 0.5

        for th in (rocThreshold, jThreshold):
            accY = _score(torchmetrics.Accuracy, threshold=th)
            precisionY = _score(torchmetrics.Precision, threshold=th)
            recallY = _score(torchmetrics.Recall, threshold=th)
            specifictyY = _score(torchmetrics.Specificity, threshold=th)
            f1ScoreY = _score(torchmetrics.F1Score, threshold=th)
            table.append([f"{th:.4f}", _fmt(accY), _fmt(precisionY), _fmt(recallY), _fmt(f1ScoreY), _fmt(auc), _fmt(specifictyY), modelName])

    return table
def setup_seed(seed):
    """Seed every random number generator this script may draw from.

    Seeds Python's ``random`` module, NumPy and PyTorch (CPU and all CUDA
    devices), and configures cuDNN for deterministic behaviour.

    Args:
        seed: integer seed shared by all generators.
    """
    import random

    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    torch.backends.cudnn.deterministic = True
    # Autotuning may select different kernels from run to run.
    torch.backends.cudnn.benchmark = False
def CAM(model:M.RARP_NVB_Model, img:torch.Tensor, device):
    """Compute a class activation map for a single image.

    The model is expected to return ``(pred, feature)`` where ``feature`` is a
    4-D tensor for a batch of one. The map is the product of the first
    parameter of the model's classification head with the flattened feature
    map, rescaled to ``[0, 1]``.

    Args:
        model: CAM-capable model (ResNet18/ResNet50 or MobileNetV2 variant).
        img: one image tensor without a batch dimension.
        device: device on which the forward pass runs.

    Returns:
        ``(cam_img, prob)``: the ``(h, w)`` map on the CPU and the sigmoid of
        the model prediction.

    Raises:
        NotImplementedError: if the classification head of ``model`` is not
            known to this function.
    """
    # Resolve the head first so unsupported models fail with a clear message
    # instead of an unbound local variable after the forward pass.
    if isinstance(model, (M.RARP_NVB_ResNet18_CAM, M.RARP_NVB_ResNet50_CAM)):
        head = model.model.fc
    elif isinstance(model, M.RARP_NVB_MobileNetV2_CAM):
        head = model.model.classifier
    else:
        raise NotImplementedError(f"CAM is not implemented for {type(model).__name__}")

    with torch.no_grad():
        img = img.to(device).float().unsqueeze(0)
        pred, feature = model(img)
        _, c, h, w = feature.shape
        feature = feature.reshape((c, h*w))
        weights = next(iter(head.parameters())).detach()
        cam = torch.matmul(weights, feature)
        cam = cam - torch.min(cam)
        peak = torch.max(cam)
        # A flat map has peak == 0 after the shift; return zeros, not NaNs.
        cam_img = cam / peak if peak > 0 else torch.zeros_like(cam)
        cam_img = cam_img.reshape(h, w).cpu()
    return cam_img, torch.sigmoid(pred)
def CAMVisualizer(img, heatmap, pred, label, mean, std, ax, row):
    """Draw one image and its activation-map overlay into a grid of axes.

    The image is converted to channel-last layout, de-normalised as
    ``(std * img + mean) / 255`` clipped to ``[0, 1]``, and its channel order
    is reversed. The heatmap is resized to the image size. Sample ``row`` is
    placed on grid row ``row % 4``; samples 0-3 use columns 0-1, samples 4-7
    columns 2-3 and later samples columns 4-5.

    Args:
        img: image tensor in channel-first layout.
        heatmap: 2-D activation map tensor.
        pred: single-element tensor shown in the title.
        label: ground-truth label shown in the title.
        mean, std: per-channel statistics used to undo the normalisation.
        ax: 2-D grid of matplotlib axes.
        row: running index of the sample being drawn.
    """
    picture = img.numpy().transpose((1, 2, 0))
    height, width = picture.shape[0], picture.shape[1]
    overlay = transforms.functional.resize(heatmap.unsqueeze(0), (height, width), antialias=True)[0]
    picture = np.clip((std * picture + mean) / 255, 0, 1)
    picture = picture[..., ::-1].copy()

    # Each block of four samples moves two columns to the right.
    if row > 7:
        col = 4
    elif row > 3:
        col = 2
    else:
        col = 0
    gridRow = row % 4

    plainAx = ax[gridRow][col + 0]
    plainAx.imshow(picture)
    plainAx.axis('off')

    overlayAx = ax[gridRow][col + 1]
    overlayAx.imshow(picture)
    overlayAx.imshow(overlay, alpha=0.5, cmap="jet")
    overlayAx.axis('off')
    overlayAx.set_title(f"Pred.: {pred.item():.4f}; Label: {label}")
def ShowCAM(TrainedModel:M.RARP_NVB_Model, DataSet, mean, std, title=""):
    """Plot class activation maps for up to twelve samples of ``DataSet``.

    A 4x6 figure is created. When the dataset holds more than twelve samples,
    six shuffled indices are taken from the first ``count[0]`` positions and
    six from the following ``count[1]`` positions, where the counts come from
    ``np.unique(DataSet.targets)``. Otherwise every sample is drawn in order.

    Args:
        TrainedModel: CAM-capable model; moved to the module-level ``device``
            and put in eval mode.
        DataSet: indexable dataset of ``(img, label)`` pairs with ``targets``.
        mean, std: statistics forwarded to ``CAMVisualizer``.
        title: figure title.
    """
    TrainedModel.to(device)
    TrainedModel.eval()

    layout = dict(left=0, bottom=0.01, right=1, top=0.914, wspace=0, hspace=0.164)
    fig, axis = plt.subplots(4, 6, gridspec_kw=layout)

    slot = 0
    with torch.no_grad():
        if len(DataSet) > 12:
            counts = np.unique(DataSet.targets, return_counts=True)[1]
            NOTNVB_Indexs = np.asarray(range(counts[0]))
            NVB_Indexs = np.asarray(range(counts[0], counts[0] + counts[1]))
            np.random.shuffle(NOTNVB_Indexs)
            np.random.shuffle(NVB_Indexs)
            # At most six samples per group, first group drawn first.
            for group in (NOTNVB_Indexs, NVB_Indexs):
                for index in group[:6]:
                    img, label = DataSet[index]
                    cam, pred = CAM(TrainedModel, img, device)
                    CAMVisualizer(img, cam, pred, label, mean, std, axis, slot)
                    slot += 1
        else:
            for img, label in tqdm(DataSet):
                cam, pred = CAM(TrainedModel, img, device)
                CAMVisualizer(img, cam, pred, label, mean, std, axis, slot)
                slot += 1
    fig.suptitle(title)
def Calc_Eval(TrainModel:M.RARP_NVB_Model):
    """Print binary metrics and show a confusion matrix for one model.

    Each sample of the module-level ``testDataset`` is run through the model
    individually; the prediction is the sigmoid of the first element of the
    model output. Metrics are computed on the CPU, printed, and the confusion
    matrix is displayed as a heatmap titled with the fold number taken from
    the module-level ``args``.

    Args:
        TrainModel: model to evaluate; moved to the module-level ``device``
            and put in eval mode.
    """
    TrainModel.to(device)
    TrainModel.eval()

    scores, targets = [], []
    with torch.no_grad():
        for data, label in tqdm(testDataset):
            batch = data.to(device).float().unsqueeze(0)
            logit = TrainModel(batch)[0].cpu()
            scores.append(torch.sigmoid(logit))
            targets.append(label)

    Predictions = torch.cat(scores)
    Labels = torch.tensor(targets).int()
    print(Predictions, Labels)

    def _binary(metric_cls):
        # Evaluate one torchmetrics metric in binary mode on the CPU tensors.
        return metric_cls('binary')(Predictions, Labels)

    acc = _binary(torchmetrics.Accuracy)
    precision = _binary(torchmetrics.Precision)
    recall = _binary(torchmetrics.Recall)
    cm = _binary(torchmetrics.ConfusionMatrix)
    auc = _binary(torchmetrics.AUROC)
    f1Score = _binary(torchmetrics.F1Score)

    print(f"Val Accuracy: {acc:.4f}")
    print(f"Val Precision: {precision:.4f}")
    print(f"Val Recall: {recall:.4f}")
    print(f"F1 Score: {f1Score:.4f}")
    print(f"AUROC: {auc:.4f}")

    classNames = testDataset.classes
    print(classNames)
    ax = sn.heatmap(
        cm,
        cmap="Greens",
        cbar=False,
        annot=True,
        annot_kws={"size": 18},
        fmt='g',
        xticklabels=classNames,
        yticklabels=classNames,
    )
    ax.set_title(f"NVB Classifier Split #{args.Fold+1}")
    ax.set_xlabel('Predict')
    ax.set_ylabel('True')
    plt.show()
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--Phase", default="train", type=str, help="'train' or 'eval'")
parser.add_argument("--Fold", type=int, default=0)
parser.add_argument("--Workers", type=int, default=0)
parser.add_argument("--Log_Name", type=str, default="logs_debug", help="the name of the directory of the log chkp")
parser.add_argument("--Model", type=int, default=0, help="0 = ResNet18, 1 = ResNet50")
parser.add_argument("-lv", "--Log_version", type=int)
parser.add_argument("-le", "--Log_epoch", type=int)
parser.add_argument("-ls", "--Log_step", type=int)
parser.add_argument("--Remove_Blackbar", type=bool, default=True)
parser.add_argument("--BGR2RGB", type=bool, default=False)
parser.add_argument("--CAM", type=bool, default=False)
parser.add_argument("-roi", "--Use_ROI_Dataset", type=int, default=0)
parser.add_argument("-s", "--imgSlice_pct", type=float, default=None)
parser.add_argument("-ns", "--Num_Slices", type=int, default=4)
parser.add_argument("-wl", "--Wloss",type=bool, default=False)
parser.add_argument("--sClass",type=int, default=None)
parser.add_argument("-tl", "--TypeLoss", type=int, default=0)
parser.add_argument("-cs", "--ColorSpace", type=int, default=None)
parser.add_argument("--JIndex", type=bool, default=False)
parser.add_argument("-me", "--maxEpochs", type=int, default=None)
parser.add_argument("-lc", "--LoadChkpt", type=str, default=None)
parser.add_argument("--AddTestSet", type=str, default=None)
args = parser.parse_args()
if args.CAM and args.Phase == "train":
raise Exception("Clases Activation Clases only in eval o eval_all")
match args.Use_ROI_Dataset:
case 0:
Dataset = Loaders.RARP_DatasetCreator(
"./DataSet_main",
FoldSeed=505,
createFile=True,
SavePath="./DataSet",
Fold=5,
removeBlackBar=args.Remove_Blackbar,
RGBGama=args.BGR2RGB,
SegImage=args.imgSlice_pct,
Num_Img_Slices=args.Num_Slices,
SegmentClass=args.sClass,
colorSpace=args.ColorSpace
)
cropSize = 720
case 1:
Dataset = Loaders.RARP_DatasetCreator(
"./DataSet_Crop",
FoldSeed=505,
createFile=True,
SavePath="./DataSetCrop",
Fold=5,
removeBlackBar=args.Remove_Blackbar,
RGBGama=args.BGR2RGB,
SegImage=args.imgSlice_pct,
Num_Img_Slices=args.Num_Slices,
SegmentClass=args.sClass,
colorSpace=args.ColorSpace
)
cropSize = 256
case 2:
Dataset = Loaders.RARP_DatasetCreator(
"./DataSet_Crop1",
FoldSeed=505,
createFile=True,
SavePath="./DataSetCrop1",
Fold=5,
removeBlackBar=args.Remove_Blackbar,
RGBGama=args.BGR2RGB,
SegImage=args.imgSlice_pct,
Num_Img_Slices=args.Num_Slices,
SegmentClass=args.sClass,
colorSpace=args.ColorSpace
)
cropSize = 256
case 3:
YoloModel = YOLO(model="RARP_YoloV8_ROI.pt")
Dataset = Loaders.RARP_DatasetCreator(
"./DataSet_main",
FoldSeed=505,
createFile=True,
SavePath="./DataSet_YOLO",
Fold=5,
removeBlackBar=args.Remove_Blackbar,
RGBGama=args.BGR2RGB,
SegImage=args.imgSlice_pct,
Num_Img_Slices=args.Num_Slices,
SegmentClass=args.sClass,
colorSpace=args.ColorSpace,
ROI_Yolo=YoloModel
)
cropSize = 256
case 4:
Dataset = Loaders.RARP_DatasetCreator(
"./DataSet_big",
FoldSeed=505,
createFile=True,
SavePath="./DatasetBig",
Fold=5,
removeBlackBar=args.Remove_Blackbar,
RGBGama=args.BGR2RGB,
SegImage=args.imgSlice_pct,
Num_Img_Slices=args.Num_Slices,
SegmentClass=args.sClass,
colorSpace=args.ColorSpace
)
cropSize = 720
Dataset.mean, Dataset.std = ([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
case 6:
Dataset = Loaders.RARP_DatasetCreator(
"./DataSet_full",
FoldSeed=505,
createFile=True,
SavePath="./DatasetFull",
Fold=5,
removeBlackBar=args.Remove_Blackbar,
RGBGama=args.BGR2RGB,
SegImage=args.imgSlice_pct,
Num_Img_Slices=args.Num_Slices,
SegmentClass=args.sClass,
colorSpace=args.ColorSpace
)
cropSize = 720
Dataset.mean, Dataset.std = ([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
case 7:
Dataset = Loaders.RARP_DatasetCreator(
"./DataSet_smallBalaced",
FoldSeed=505,
createFile=True,
SavePath="./DatasetSmallBalanced",
Fold=5,
removeBlackBar=args.Remove_Blackbar,
RGBGama=args.BGR2RGB,
SegImage=args.imgSlice_pct,
Num_Img_Slices=args.Num_Slices,
SegmentClass=args.sClass,
colorSpace=args.ColorSpace
)
cropSize = 720
Dataset.mean, Dataset.std = ([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
case 5:
YoloModel = YOLO(model="RARP_YoloV8_ROI.pt")
Dataset = Loaders.RARP_DatasetCreator(
"./DataSet_big",
FoldSeed=505,
createFile=True,
SavePath="./DatasetBig_YOLO",
Fold=5,
removeBlackBar=args.Remove_Blackbar,
RGBGama=args.BGR2RGB,
SegImage=args.imgSlice_pct,
Num_Img_Slices=args.Num_Slices,
SegmentClass=args.sClass,
colorSpace=args.ColorSpace,
ROI_Yolo=YoloModel
)
cropSize = 256
Dataset.mean, Dataset.std = ([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
Dataset.CreateFolds()
setup_seed(2023)
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
batchSize = 17 #17 #8, 32
numWorkers = args.Workers
InitResize = (256,256)
checkPtCallback = callbk.ModelCheckpoint(monitor='val_acc', filename="RARP-{epoch}", save_top_k=10, mode='max')
ckpLossBest = callbk.ModelCheckpoint(monitor="val_loss", filename="RARP-{epoch}-{val_loss:.2f}", save_top_k=2, mode='min')
traintransform = torch.nn.Sequential(
transforms.Resize(InitResize, antialias=True),
transforms.CenterCrop(224), #test w/ Centrecrop
transforms.RandomAffine(
degrees=(-5, 5), scale=(0.9, 1.1), #translate=(0, 0.05),
fill=5
),
#transforms.RandomResizedCrop(224, scale=(0.35, 1), antialias=True),
transforms.RandomHorizontalFlip(1.0),
transforms.Normalize(Dataset.mean, Dataset.std),
).to(device)
valtransform = torch.nn.Sequential(
transforms.Resize(256, antialias=True),
transforms.CenterCrop(224),
transforms.Normalize(Dataset.mean, Dataset.std)
).to(device)
testtransform = torch.nn.Sequential(
transforms.Resize(256, antialias=True),
transforms.CenterCrop(224),
transforms.Normalize(Dataset.mean, Dataset.std)
).to(device)
rootFile = Dataset.CVS_File.parent.parent/f"fold_{args.Fold}"
Add_Test_DataLoader = None
if args.AddTestSet is not None:
Add_TestDataset = torchvision.datasets.DatasetFolder(
str (Path(args.AddTestSet)/f"fold_{args.Fold}"/"test"),
loader=defs.load_file_tensor_norm,
extensions="npy",
transform=testtransform
)
Add_Test_DataLoader = DataLoader(
Add_TestDataset,
batch_size=batchSize,
num_workers=numWorkers,
shuffle=False,
pin_memory=True
)
trainDataset = torchvision.datasets.DatasetFolder(
str (rootFile/"train"),
loader=defs.load_file_tensor_norm,
extensions="npy",
transform=traintransform
)
valDataset = torchvision.datasets.DatasetFolder(
str (rootFile/"val"),
loader=defs.load_file_tensor_norm,
extensions="npy",
transform=valtransform
)
testDataset = torchvision.datasets.DatasetFolder(
str (rootFile/"test"),
loader=defs.load_file_tensor_norm,
extensions="npy",
transform=testtransform
)
Train_DataLoader = DataLoader(
trainDataset,
batch_size=batchSize,
num_workers=numWorkers,
shuffle=True,
pin_memory=True
)
Val_DataLoader = DataLoader(
valDataset,
batch_size=batchSize,
num_workers=numWorkers,
shuffle=False,
pin_memory=True
)
Test_DataLoader = DataLoader(
testDataset,
batch_size=batchSize,
num_workers=numWorkers,
shuffle=False,
pin_memory=True
)
if args.CAM:
testCAMDataset = torchvision.datasets.DatasetFolder(
str (rootFile/"test"),
loader=defs.load_file_tensor_norm,
extensions="npy",
transform=torch.nn.Sequential(
transforms.Resize((224, 224), antialias=True),
transforms.Normalize(Dataset.mean, Dataset.std)
).to(device)
)
TestCAM_DataLoader = DataLoader(
testCAMDataset,
batch_size=batchSize,
num_workers=numWorkers,
shuffle=False,
pin_memory=True
)
print(f"Currtent Fold Splits {Dataset.Splits[args.Fold]}")
print(f"Unique Values in sets")
info = np.unique(trainDataset.targets, return_counts=True), np.unique(valDataset.targets, return_counts=True), np.unique(testDataset.targets, return_counts=True)
print(info)
neg = 0
pos = 0
for i in info:
neg += i[1][0]
pos += i[1][1]
total = neg + pos
factor = 2 if args.TypeLoss == 1 else 1
InitWeight = torch.tensor([total/(neg * factor), total/(pos * factor)]).to(device) if args.Wloss else None
if InitWeight is not None:
print(f"Weights {InitWeight}")
TypeLoss = M.TypeLossFunction(args.TypeLoss)
match args.Model:
case 0:
Model = M.RARP_NVB_ResNet50(InitWeight, TypeLoss)
ModelCAM = M.RARP_NVB_ResNet50_CAM()
case 1:
Model = M.RARP_NVB_ResNet18(InitWeight, TypeLoss)
ModelCAM = M.RARP_NVB_ResNet18_CAM()
case 2:
Model = M.RARP_NVB_MobileNetV2(InitWeight, TypeLoss)
ModelCAM = M.RARP_NVB_MobileNetV2_CAM()
case 3:
Model = M.RARP_NVB_EfficientNetV2(InitWeight, TypeLoss)
ModelCAM = M.RARP_NVB_EfficientNetV2_CAM()
case 4:
Model = M.RARP_NVB_Vit_b_16(InitWeight, TypeLoss)
ModelCAM = None
case 5:
Model = M.RARP_NVB_DenseNet169(InitWeight, TypeLoss)
ModelCAM = None
case _:
raise Exception("Model Not yet Implemented")
NameModel = type(Model).__name__
print(f"Model Used: {NameModel}")
LogFileName = f"{args.Log_Name}" #-{NameModel}
MaxEpochs = 50
if args.Model == 4:
MaxEpochs = 150
if args.maxEpochs is not None:
MaxEpochs = args.maxEpochs
warnings.simplefilter("ignore")
trainer = L.Trainer(
accelerator='gpu',
devices=1,
logger=TensorBoardLogger(save_dir=f"./{LogFileName}"),
log_every_n_steps=5,
#callbacks=[checkPtCallback, StepDropout(5, base_drop_rate=0.2, gamma=0.05, ascending=True)],#if args.Model == 4 else checkPtCallback,
callbacks=[checkPtCallback, ckpLossBest, callbk.EarlyStopping(monitor="val_loss", mode="min", patience=5, verbose=True)],
max_epochs=MaxEpochs,
)
if args.Phase == "train":
print("Train Phase")
trainer.fit(Model, train_dataloaders=Train_DataLoader, val_dataloaders=Val_DataLoader, ckpt_path=args.LoadChkpt)
#trainer.callbacks
trainer.test(Model, dataloaders=Test_DataLoader, ckpt_path="best")
elif args.Phase == "eval_all":
print("Evaluation Phase")
rows = []
pathCkptFile = Path(f"./{LogFileName}/lightning_logs/version_{args.Log_version}/checkpoints/")
for ckpFile in pathCkptFile.glob("*.ckpt"):
print(ckpFile.name)
temp = Calc_Eval_table(Model.load_from_checkpoint(ckpFile, strict=False), Test_DataLoader, args.JIndex, ckpFile.name, Add_TestDataset=Add_Test_DataLoader)
rows += temp
if args.CAM:
with warnings.catch_warnings():
warnings.simplefilter("ignore")
print("CAM")
ShowCAM(ModelCAM.load_from_checkpoint(ckpFile, strict=False), testCAMDataset, Dataset.mean, Dataset.std, ckpFile.name)
df = pd.DataFrame(rows, columns=["Youden", "Acc","Precision","Recall","F1","AUROC","Specificity","CheckPoint"])
df.style.highlight_max(color="red", axis=0)
print(df)
plt.show()
else:
print("Evaluation Phase")
trainLog = [args.Log_version, args.Log_epoch, args.Log_step]
pathCkptFile = Path(f"./{LogFileName}/lightning_logs/version_{trainLog[0]}/checkpoints/epoch={trainLog[1]}-step={trainLog[2]}.ckpt")
Calc_Eval(Model.load_from_checkpoint(pathCkptFile))
if args.CAM:
with warnings.catch_warnings():
warnings.simplefilter("ignore")
ShowCAM(ModelCAM.load_from_checkpoint(pathCkptFile, strict=False), testCAMDataset, Dataset.mean, Dataset.std, pathCkptFile.name)
plt.show()