{
"cells": [
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import torch\n",
"from torch.utils.data import DataLoader\n",
"import torchvision\n",
"from torchvision import transforms\n",
"import torchmetrics\n",
"import lightning as L\n",
"import lightning.pytorch.callbacks as callbk\n",
"from lightning.pytorch.loggers import TensorBoardLogger\n",
"from tqdm.notebook import tqdm\n",
"import numpy as np\n",
"import matplotlib.pyplot as plt\n",
"from pathlib import Path\n",
"import Loaders\n",
"import defs\n",
"import Models as M\n",
"import pandas as pd\n",
"import van\n",
"\n",
"def setup_seed(seed):\n",
"    \"\"\"Seed every random number generator used in this notebook.\n",
"\n",
"    Seeds Python's ``random`` module, NumPy and PyTorch (CPU and all CUDA\n",
"    devices) and puts cuDNN into deterministic, non-benchmarking mode.\n",
"\n",
"    Args:\n",
"        seed: Integer seed shared by all generators.\n",
"    \"\"\"\n",
"    import random  # local import: the notebook's import cell does not bring in `random`\n",
"\n",
"    random.seed(seed)\n",
"    np.random.seed(seed)\n",
"    torch.manual_seed(seed)\n",
"    torch.cuda.manual_seed_all(seed)\n",
"    torch.backends.cudnn.deterministic = True\n",
"    # With benchmarking enabled cuDNN may select a different kernel on each run,\n",
"    # which undermines the deterministic flag set above.\n",
"    torch.backends.cudnn.benchmark = False\n",
"\n",
"# Mini-batch size and DataLoader worker count shared by every loader below.\n",
"batchSize, numWorkers = 8, 0"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Fold creator for the cropped data set; every argument is passed through unchanged.\n",
"Dataset = Loaders.RARP_DatasetCreator(\n",
"    \"./DataSet_AndoCrop\", FoldSeed=505, createFile=True, SavePath=\"./DataSet_SB_Ando_Crop\", Fold=5\n",
")\n",
"Dataset.CreateFolds()\n",
"\n",
"# Fold index used further down to locate the train/val/test directories.\n",
"Fold = 0\n",
"\n",
"# Hard-coded normalisation statistics, one value per channel.\n",
"Dataset.mean = [30.38144216, 42.03988769, 97.8896116]\n",
"Dataset.std = [40.63141752, 44.26910074, 50.29294373]\n",
"\n",
"setup_seed(2023)\n",
"\n",
"# Use the first CUDA device when one is available, otherwise fall back to the CPU.\n",
"if torch.cuda.is_available():\n",
"    device = torch.device(\"cuda:0\")\n",
"else:\n",
"    device = torch.device(\"cpu\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Size images are resized to first, and the size of the crop fed to the network.\n",
"InitResize = (256, 256)\n",
"ImgResize = (224, 224)\n",
"\n",
"# Keep the ten checkpoints with the highest val_acc and the two with the lowest val_loss.\n",
"checkPtCallback = callbk.ModelCheckpoint(monitor='val_acc', filename=\"RARP-{epoch}\", save_top_k=10, mode='max')\n",
"ckpLossBest = callbk.ModelCheckpoint(monitor=\"val_loss\", filename=\"RARP-{epoch}-{val_loss:.2f}\", save_top_k=2, mode='min')\n",
"\n",
"traintransform = torch.nn.Sequential(\n",
"    transforms.Resize(InitResize, antialias=True),\n",
"    # Changed here on 2024/05/10: CenterCrop replaces the former RandomCrop(ImgResize).\n",
"    transforms.CenterCrop(ImgResize),\n",
"    transforms.RandomAffine(degrees=(-5, 5), scale=(0.9, 1.1), fill=5),\n",
"    # p=1.0 mirrored every training image while validation/test images stay\n",
"    # unflipped; p=0.5 makes the flip an actual random augmentation.\n",
"    transforms.RandomHorizontalFlip(p=0.5),\n",
"    transforms.Normalize(Dataset.mean, Dataset.std),\n",
").to(device)\n",
"\n",
"# NOTE(review): Resize(256) scales the shorter edge to 256 and keeps the aspect\n",
"# ratio, whereas training resizes to exactly InitResize - confirm this is intended.\n",
"valtransform = torch.nn.Sequential(\n",
"    transforms.Resize(256, antialias=True),\n",
"    transforms.CenterCrop(ImgResize),\n",
"    transforms.Normalize(Dataset.mean, Dataset.std),\n",
").to(device)\n",
"\n",
"# Test-time preprocessing mirrors the validation pipeline.\n",
"testtransform = torch.nn.Sequential(\n",
"    transforms.Resize(256, antialias=True),\n",
"    transforms.CenterCrop(ImgResize),\n",
"    transforms.Normalize(Dataset.mean, Dataset.std),\n",
").to(device)\n",
"\n",
"# Directory of the selected fold, two levels above the data set's CSV file.\n",
"rootFile = Dataset.CVS_File.parent.parent/f\"fold_{Fold}\""
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def _split_dataset(split, transform):\n",
"    \"\"\"Wrap ``rootFile/<split>`` as a DatasetFolder over ``.npy`` files.\"\"\"\n",
"    return torchvision.datasets.DatasetFolder(\n",
"        str(rootFile / split),\n",
"        loader=defs.load_file_tensor,\n",
"        extensions=\"npy\",\n",
"        transform=transform,\n",
"    )\n",
"\n",
"\n",
"# One dataset per split, each paired with its own preprocessing pipeline.\n",
"trainDataset = _split_dataset(\"train\", traintransform)\n",
"valDataset = _split_dataset(\"val\", valtransform)\n",
"testDataset = _split_dataset(\"test\", testtransform)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Settings common to the three loaders; only the training loader shuffles.\n",
"_loaderArgs = dict(batch_size=batchSize, num_workers=numWorkers, pin_memory=True)\n",
"\n",
"Train_DataLoader = DataLoader(trainDataset, shuffle=True, **_loaderArgs)\n",
"Val_DataLoader = DataLoader(valDataset, shuffle=False, **_loaderArgs)\n",
"Test_DataLoader = DataLoader(testDataset, shuffle=False, **_loaderArgs)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# ResNet-50 with torchvision's default pretrained weights; its final fully\n",
"# connected layer is swapped for a fresh two-output linear layer.\n",
"resnetWeights = torchvision.models.ResNet50_Weights.DEFAULT\n",
"TestModel = torchvision.models.resnet50(weights=resnetWeights)\n",
"tempFC_ft = TestModel.fc.in_features\n",
"TestModel.fc = torch.nn.Linear(tempFC_ft, 2)\n",
"\n",
"# Wrap the backbone in the project's multi-class module.\n",
"Model = M.RARP_NVB_MultiClassModel(None, Model=TestModel)\n",
"NameModel = Model.__class__.__name__\n",
"print(f\"Model Used: {NameModel}\")\n",
"\n",
"# Upper bound on training epochs handed to the Trainer.\n",
"MaxEpochs = 150"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# NOTE: this rebinds `Model`, replacing the ResNet-50 wrapper built in the previous cell.\n",
"# Alternative checkpoint kept for reference:\n",
"#   M.RARP_NVB_ResNet50.load_from_checkpoint(\"log_ResNet50_X10/lightning_logs/version_0/checkpoints/RARP-epoch=10.ckpt\")\n",
"vanCheckpoint = \"log_VAN_X10/lightning_logs/version_0/checkpoints/RARP-epoch=0.ckpt\"\n",
"Model = M.RARP_NVB_VAN.load_from_checkpoint(vanCheckpoint)\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from pytorch_grad_cam import GradCAM\n",
"from pytorch_grad_cam.utils.model_targets import ClassifierOutputTarget\n",
"from pytorch_grad_cam.utils.image import show_cam_on_image\n",
"\n",
"# Grad-CAM is computed on the last module of the model's `block4` stage.\n",
"targetL = [Model.model.block4[-1]]\n",
"\n",
"CAM = GradCAM(model=Model, target_layers=targetL)\n",
"\n",
"# Each iteration overwrites `data`, `label` and `gi`, so only the final batch is\n",
"# still available to the visualisation cell that follows.\n",
"for data, label in tqdm(Test_DataLoader):\n",
"    # pytorch-grad-cam pairs the targets with the per-image outputs one by one;\n",
"    # a single-element list would give a target (class 0) to the first image of\n",
"    # the batch only, so one target is supplied per image.\n",
"    tar = [ClassifierOutputTarget(0)] * len(data)\n",
"    gi = CAM(data, targets=tar)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"%matplotlib inline\n",
"import matplotlib.pyplot as plt\n",
"\n",
"# Same per-channel statistics that an earlier cell assigns to Dataset.mean / Dataset.std.\n",
"mean = [30.38144216, 42.03988769, 97.8896116]\n",
"std = [40.63141752, 44.26910074, 50.29294373]\n",
"\n",
"# Position of the displayed sample inside the last batch left over by the Grad-CAM loop.\n",
"b = 0\n",
"\n",
"# Move axis 0 to the end, undo the normalisation and scale into [0, 1].\n",
"hwc = np.transpose(data[b, :].numpy(), (1, 2, 0))\n",
"rgb_img = np.clip((std * hwc + mean) / 255, 0, 1)\n",
"# Reverse the channel axis (presumably BGR -> RGB; confirm against defs.load_file_tensor).\n",
"rgb_img = rgb_img[..., ::-1].copy()\n",
"\n",
"# Overlay the sample's CAM on the recovered image.\n",
"grayscale_cam = gi[b, :]\n",
"visualization = show_cam_on_image(rgb_img, grayscale_cam, use_rgb=True)\n",
"\n",
"plt.imshow(visualization)\n",
"label"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Directory (relative to the notebook) that receives the TensorBoard logs and checkpoints.\n",
"LogFileName = \"log_MultiClass\"\n",
"\n",
"tbLogger = TensorBoardLogger(save_dir=f\"./{LogFileName}\")\n",
"# Stop once val_loss has not improved for five validation rounds.\n",
"earlyStop = callbk.EarlyStopping(monitor=\"val_loss\", mode=\"min\", patience=5)\n",
"\n",
"trainer = L.Trainer(\n",
"    accelerator='gpu',\n",
"    devices=1,\n",
"    logger=tbLogger,\n",
"    log_every_n_steps=5,\n",
"    callbacks=[checkPtCallback, ckpLossBest, earlyStop],\n",
"    max_epochs=MaxEpochs,\n",
")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"print(\"Train Phase\")\n",
"trainer.fit(model=Model, train_dataloaders=Train_DataLoader, val_dataloaders=Val_DataLoader)\n",
"\n",
"# NOTE(review): two ModelCheckpoint callbacks are registered on the trainer;\n",
"# confirm which of them ckpt_path=\"best\" resolves to.\n",
"trainer.test(model=Model, dataloaders=Test_DataLoader, ckpt_path=\"best\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def Calc_EvalMulticlass_table(TrainModel:M.RARP_NVB_Model, TestDataLoadre:DataLoader, Youden=False, modelName=\"\"):\n",
"    \"\"\"Evaluate a two-class model on a data loader and tabulate its metrics.\n",
"\n",
"    The model outputs are turned into probabilities with a softmax and the\n",
"    probability of class 1 is scored with torchmetrics' binary metrics, so\n",
"    Precision, Recall, F1 and Specificity describe class 1. As a side effect a\n",
"    confusion matrix titled \"NVB Classifier\" is plotted.\n",
"\n",
"    Args:\n",
"        TrainModel: Model to evaluate; it is moved to the notebook-level\n",
"            ``device`` and switched to eval mode.\n",
"        TestDataLoadre: Loader yielding ``(data, label)`` batches.\n",
"        Youden: When True, two extra rows are appended: one at the threshold\n",
"            that maximises ``tpr - fpr`` on the ROC curve and one at the legacy\n",
"            threshold ``recall + specificity - 1`` (0.5 when that is <= 0).\n",
"        modelName: Text stored in the last column of every row.\n",
"\n",
"    Returns:\n",
"        A list of rows, each a list of eight strings: threshold, accuracy,\n",
"        precision, recall, F1, AUROC, specificity and model name. The first\n",
"        row uses threshold 0.5 and wraps ``modelName`` in asterisks.\n",
"    \"\"\"\n",
"    TrainModel.to(device)\n",
"    TrainModel.eval()\n",
"\n",
"    Predictions = []\n",
"    Labels = []\n",
"    with torch.no_grad():\n",
"        for data, label in tqdm(TestDataLoadre):\n",
"            logits = TrainModel(data.float().to(device))\n",
"            Predictions.append(torch.softmax(logits, dim=1))\n",
"            Labels.append(label.to(device))\n",
"\n",
"    Predictions = torch.cat(Predictions)\n",
"    Labels = torch.cat(Labels)\n",
"\n",
"    # Probability of class 1. Micro-averaged multiclass metrics all collapse to\n",
"    # plain accuracy when there are only two classes, and they ignore the\n",
"    # `threshold` argument, so the binary task is used on this column instead.\n",
"    posProb = Predictions[:, 1]\n",
"\n",
"    auc = torchmetrics.AUROC(task=\"binary\").to(device)(posProb, Labels).item()\n",
"\n",
"    def _scoresAt(th):\n",
"        \"\"\"Return (accuracy, precision, recall, F1, specificity) at threshold ``th``.\"\"\"\n",
"        metricTypes = (\n",
"            torchmetrics.Accuracy,\n",
"            torchmetrics.Precision,\n",
"            torchmetrics.Recall,\n",
"            torchmetrics.F1Score,\n",
"            torchmetrics.Specificity,\n",
"        )\n",
"        return tuple(\n",
"            metric(task=\"binary\", threshold=th).to(device)(posProb, Labels).item()\n",
"            for metric in metricTypes\n",
"        )\n",
"\n",
"    def _row(th, name):\n",
"        \"\"\"Format the metrics obtained at threshold ``th`` as one table row.\"\"\"\n",
"        acc, precision, recall, f1Score, specificty = _scoresAt(th)\n",
"        return [\n",
"            f\"{th:.4f}\", f\"{acc:.4f}\", f\"{precision:.4f}\", f\"{recall:.4f}\",\n",
"            f\"{f1Score:.4f}\", f\"{auc:.4f}\", f\"{specificty:.4f}\", name,\n",
"        ]\n",
"\n",
"    table = [_row(0.5, f\"*{modelName}*\")]\n",
"\n",
"    cm2 = torchmetrics.ConfusionMatrix(\"multiclass\", num_classes=2).to(device)\n",
"    cm2.update(Predictions, Labels)\n",
"    _, ax = cm2.plot()\n",
"    ax.set_title(f\"NVB Classifier\")\n",
"\n",
"    if Youden:\n",
"        # The binary ROC returns plain tensors, so the curve can be searched directly.\n",
"        fpr, tpr, thhols = torchmetrics.ROC(task=\"binary\").to(device)(posProb, Labels)\n",
"        thYouden = thhols[torch.argmax(tpr - fpr)].item()\n",
"\n",
"        # NOTE(review): the second row keeps the original behaviour of using the\n",
"        # value recall + specificity - 1 (measured at 0.5) as a threshold;\n",
"        # confirm that this row is still wanted.\n",
"        _, _, recall, _, specificty = _scoresAt(0.5)\n",
"        th2 = recall + specificty - 1\n",
"        th2 = 0.5 if th2 <= 0 else th2\n",
"\n",
"        for th1 in (thYouden, th2):\n",
"            table.append(_row(th1, modelName))\n",
"\n",
"    return table"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"print(\"Evaluation Phase\")\n",
"Log_version = 0\n",
"rows = []\n",
"pathCkptFile = Path(f\"./{LogFileName}/lightning_logs/version_{Log_version}/checkpoints/\")\n",
"# Sorted so the table rows come out in a reproducible order.\n",
"for ckpFile in sorted(pathCkptFile.glob(\"*.ckpt\")):\n",
"    print(ckpFile.name)\n",
"    # Load this checkpoint's weights into the model; without this step every row\n",
"    # would report the same in-memory weights under a different checkpoint name.\n",
"    # weights_only=False unpickles the whole file: only use it on checkpoints\n",
"    # written by this notebook's own trainer, never on untrusted files.\n",
"    ckptState = torch.load(ckpFile, map_location=device, weights_only=False)\n",
"    Model.load_state_dict(ckptState[\"state_dict\"])\n",
"    rows += Calc_EvalMulticlass_table(Model, Test_DataLoader, modelName=ckpFile.name)\n",
"df = pd.DataFrame(rows, columns=[\"Youden\", \"Acc\",\"Precision\",\"Recall\",\"F1\",\"AUROC\",\"Specificity\",\"CheckPoint\"])\n",
"print(df)\n",
"plt.show()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import torch\n",
"import numpy as np\n",
"import torchvision.transforms\n",
"\n",
"# Scratch tensors: random values scaled by 255 and a constant tensor of 255s.\n",
"t = 255 * torch.rand(3, 256, 256)\n",
"invT = 255 * torch.ones(3, 256, 256)\n",
"\n",
"t.shape\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Display the scratch tensor `t` created in the previous cell.\n",
"t"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Subtract every element of `t` from 255 and display the result.\n",
"255 - t "
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Reverse the order of the three entries along the first axis.\n",
"channelOrder = [2, 1, 0]\n",
"t2 = t[channelOrder]\n",
"t2.shape"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import torch\n",
"import torchvision\n",
"\n",
"# Stand-alone ResNet-50 with torchvision's default pretrained weights, used for inspection below.\n",
"inspectWeights = torchvision.models.ResNet50_Weights.DEFAULT\n",
"model = torchvision.models.resnet50(weights=inspectWeights)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Number of direct child modules of the first convolution layer.\n",
"len(list(model.conv1.children()))"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Inspect the first convolution's weights and a few derived shapes without tracking gradients.\n",
"with torch.no_grad():\n",
"    w = model.conv1.weight\n",
"    inspected = (\n",
"        w,\n",
"        w.shape,\n",
"        w.sum(2).shape,\n",
"        w.unsqueeze(2).shape,\n",
"        model.layer1[0].conv2.bias,\n",
"    )\n",
"    for item in inspected:\n",
"        print(item)"
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [],
"source": [
"import decord\n",
"from pathlib import Path\n",
"import torch\n",
"import cv2\n",
"import numpy as np\n",
"\n",
"def _removeBlackBorder(image):\n",
"    \"\"\"Crop a frame to the bounding box of its largest retained region.\n",
"\n",
"    Pixels whose hue lies strictly between 25 and 175 (OpenCV 8-bit hue scale)\n",
"    are masked out, the remaining non-black pixels are cleaned with a 15x15\n",
"    morphological opening, and the frame is cropped to the bounding rectangle\n",
"    of the largest external contour.\n",
"\n",
"    Args:\n",
"        image: Frame as an array-like; it is converted with\n",
"            ``cv2.COLOR_BGR2HSV``, i.e. handled as a BGR image.\n",
"\n",
"    Returns:\n",
"        The cropped region of ``image`` as a NumPy array, or the whole frame\n",
"        when no region survives the masking (e.g. an all-black frame).\n",
"    \"\"\"\n",
"    image = np.array(image)\n",
"\n",
"    hsv = cv2.cvtColor(image.copy(), cv2.COLOR_BGR2HSV)\n",
"    hue = hsv[:, :, 0]\n",
"    th = (25, 175)\n",
"    mask = np.ones(hue.shape, dtype=np.uint8) * 255\n",
"    mask[(hue > th[0]) & (hue < th[1])] = 0\n",
"    bgr = cv2.cvtColor(hsv, cv2.COLOR_HSV2BGR)\n",
"    resROI = cv2.bitwise_and(bgr, bgr, mask=mask)\n",
"\n",
"    image_gray = cv2.cvtColor(resROI, cv2.COLOR_BGR2GRAY)\n",
"    _, thresh = cv2.threshold(image_gray, 0, 255, cv2.THRESH_BINARY)\n",
"    kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (15, 15))\n",
"    morph = cv2.morphologyEx(thresh, cv2.MORPH_OPEN, kernel)\n",
"    contours = cv2.findContours(morph, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)\n",
"    # findContours returns two or three values depending on the OpenCV version.\n",
"    contours = contours[0] if len(contours) == 2 else contours[1]\n",
"    if len(contours) == 0:\n",
"        # Nothing to crop to: max() over an empty sequence would raise ValueError.\n",
"        return image\n",
"\n",
"    bigCont = max(contours, key=cv2.contourArea)\n",
"    x, y, w, boxH = cv2.boundingRect(bigCont)\n",
"    return image[y : y + boxH, x : x + w]\n",
"\n",
"def VideoPrePros_BackBars(VideoIN_Path:str, VideoOUT_Path:str, size=(1269, 1007), fps=30):\n",
"    \"\"\"Re-encode a video with every frame cropped by ``_removeBlackBorder``.\n",
"\n",
"    Each frame is cropped, resized to ``size`` with bicubic interpolation and\n",
"    written to ``VideoOUT_Path`` using the 'mp4v' codec.\n",
"\n",
"    Args:\n",
"        VideoIN_Path: Path of the video to read.\n",
"        VideoOUT_Path: Path of the video to write.\n",
"        size: Output frame size as ``(width, height)``.\n",
"        fps: Frame rate declared for the output video.\n",
"\n",
"    Raises:\n",
"        IOError: If the input video cannot be opened.\n",
"    \"\"\"\n",
"    cap = cv2.VideoCapture(VideoIN_Path)\n",
"    if not cap.isOpened():\n",
"        cap.release()\n",
"        raise IOError(f\"Cannot open input video: {VideoIN_Path}\")\n",
"\n",
"    fourcc = cv2.VideoWriter_fourcc(*'mp4v')\n",
"    video = cv2.VideoWriter(VideoOUT_Path, fourcc, fps, size)\n",
"    try:\n",
"        while True:\n",
"            ret, frame = cap.read()\n",
"            if not ret:\n",
"                break\n",
"\n",
"            img = _removeBlackBorder(frame)\n",
"            img = cv2.resize(img, size, interpolation=cv2.INTER_CUBIC)\n",
"            video.write(img)\n",
"    finally:\n",
"        # Release both handles even when a frame fails, so the output file is closed.\n",
"        video.release()\n",
"        cap.release()"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [],
"source": [
"# Source clip, handed over as an absolute path string; the result goes to outVideo.mp4.\n",
"videoPath = Path(\"./Video3D/dataset/NOT_NVB/181.mp4\")\n",
"inputVideo = str(videoPath.absolute())\n",
"VideoPrePros_BackBars(inputVideo, \"outVideo.mp4\")"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"FPS:30.0\n",
"Video Length: 15.0 seg.\n"
]
}
],
"source": [
"# Open the re-encoded clip on the CPU and report its frame rate and duration.\n",
"vr = decord.VideoReader(\"outVideo.mp4\", decord.cpu(0))\n",
"avgFps = vr.get_avg_fps()\n",
"print(f\"FPS:{avgFps}\")\n",
"print(f\"Video Length: {len(vr)/avgFps} seg.\")\n"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"(90, 1006, 1268, 3)"
]
},
"execution_count": 4,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"# Take every 5th frame of the whole clip; len(vr) replaces the hard-coded 450\n",
"# so clips of a different length neither overrun nor get truncated.\n",
"frames = vr.get_batch(list(range(0, len(vr), 5))).asnumpy()\n",
"frames.shape"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Shape of a single sampled frame (index 5 of the batch).\n",
"frames[5].shape"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"True"
]
},
"execution_count": 5,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"# Convert the sampled frame from RGB to BGR channel order before saving it with OpenCV.\n",
"frameBGR = cv2.cvtColor(frames[5], cv2.COLOR_RGB2BGR)\n",
"cv2.imwrite(\"salida.jpg\", frameBGR)"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "pyRARP",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.11.4"
}
},
"nbformat": 4,
"nbformat_minor": 2
}