import argparse
import os
import numpy as np
import pydicom
from typing import List, Tuple
import SimpleITK as sitk
class DICOMProcessor:
def __init__(self):
self.dicom_files = []
self.volume_data = None
self.metadata = {}
def get_dicom_files(self, folder_path: str) -> List[str]:
"""Get list of DICOM files in specified folder"""
print(f"Scanning folder: {folder_path}")
dicom_files = []
for file in os.listdir(folder_path):
file_path = os.path.join(folder_path, file)
if file.lower().endswith('.dcm') and os.path.isfile(file_path):
dicom_files.append(file_path)
print(f"Found DICOM file: {file}")
if not dicom_files:
raise ValueError(f"No DICOM files (*.dcm) found in folder {folder_path}")
print(f"Total {len(dicom_files)} DICOM files found")
return dicom_files
def get_instance_number(self, dicom_file: str) -> int:
"""Read DICOM file and get Instance Number"""
try:
ds = pydicom.dcmread(dicom_file, stop_before_pixels=True)
instance_number = int(getattr(ds, 'InstanceNumber', 0))
return instance_number
except Exception as e:
print(f"Error reading file {dicom_file}: {e}")
return 0
def calculate_slice_spacing(self, ds1: pydicom.Dataset, ds2: pydicom.Dataset) -> float:
"""Calculate Z-direction resolution (slice spacing)"""
# Method 1: Use SliceLocation difference
slice_loc1 = getattr(ds1, 'SliceLocation', None)
slice_loc2 = getattr(ds2, 'SliceLocation', None)
if slice_loc1 is not None and slice_loc2 is not None:
try:
spacing_from_slice_loc = abs(float(slice_loc2) - float(slice_loc1))
print(f"Spacing calculated from SliceLocation: {spacing_from_slice_loc:.4f} mm")
return spacing_from_slice_loc
except (ValueError, TypeError):
pass
# Method 2: Use Euclidean distance from ImagePositionPatient
pos1 = getattr(ds1, 'ImagePositionPatient', None)
pos2 = getattr(ds2, 'ImagePositionPatient', None)
if pos1 is not None and pos2 is not None:
try:
pos1_array = np.array([float(pos1[0]), float(pos1[1]), float(pos1[2])])
pos2_array = np.array([float(pos2[0]), float(pos2[1]), float(pos2[2])])
spacing_from_position = np.linalg.norm(pos2_array - pos1_array)
print(f"Spacing calculated from ImagePositionPatient: {spacing_from_position:.4f} mm")
return spacing_from_position
except (ValueError, TypeError, IndexError):
pass
print("Warning: Unable to calculate slice spacing, using default value 1.0 mm")
return 1.0
def extract_metadata(self, dicom_files: List[Tuple[str, int]]) -> dict:
"""Extract metadata from DICOM files"""
if len(dicom_files) < 2:
print("Warning: Insufficient files to calculate slice spacing")
return {}
file1, instance1 = dicom_files[0]
file2, instance2 = dicom_files[1]
ds1 = pydicom.dcmread(file1)
ds2 = pydicom.dcmread(file2)
metadata = {}
# Get pixel spacing
metadata['pixel_spacing'] = getattr(ds1, 'PixelSpacing', [1.0, 1.0])
print(f"Pixel spacing: {metadata['pixel_spacing']} mm")
# Calculate slice spacing
metadata['slice_spacing'] = self.calculate_slice_spacing(ds1, ds2)
# Get image orientation
metadata['image_orientation'] = getattr(ds1, 'ImageOrientationPatient', [1, 0, 0, 0, 1, 0])
# Get image position
metadata['image_position'] = getattr(ds1, 'ImagePositionPatient', [0, 0, 0])
# Get other metadata
metadata['rows'] = getattr(ds1, 'Rows', 0)
metadata['columns'] = getattr(ds1, 'Columns', 0)
metadata['bits_allocated'] = getattr(ds1, 'BitsAllocated', 16)
metadata['bits_stored'] = getattr(ds1, 'BitsStored', 12)
metadata['pixel_representation'] = getattr(ds1, 'PixelRepresentation', 0)
return metadata
def create_volume_data(self, folder_path: str) -> np.ndarray:
"""Create 3D volume data"""
self.dicom_files = self.get_dicom_files(folder_path)
print("Reading Instance Numbers...")
file_instance_pairs = []
for file_path in self.dicom_files:
instance_number = self.get_instance_number(file_path)
file_instance_pairs.append((file_path, instance_number))
file_instance_pairs.sort(key=lambda x: x[1])
print("Files sorted by Instance Number completed")
self.metadata = self.extract_metadata(file_instance_pairs)
print("Reading pixel data and creating volume data...")
slice_data = []
for i, (file_path, instance_num) in enumerate(file_instance_pairs):
if i % 10 == 0 or i == len(file_instance_pairs) - 1:
print(f"Reading slice {i + 1}/{len(file_instance_pairs)}")
ds = pydicom.dcmread(file_path)
pixel_array = ds.pixel_array
if hasattr(ds, 'PixelRepresentation') and ds.PixelRepresentation == 1:
if hasattr(ds, 'RescaleSlope') and hasattr(ds, 'RescaleIntercept'):
pixel_array = pixel_array.astype(np.float32) * ds.RescaleSlope + ds.RescaleIntercept
slice_data.append(pixel_array)
volume_data = np.stack(slice_data, axis=0)
print(f"Volume data creation completed: {volume_data.shape}")
return volume_data
def save_as_mhd(self, volume_data: np.ndarray, output_filename: str):
"""Save as MHD/RAW format"""
try:
image = sitk.GetImageFromArray(volume_data)
if self.metadata:
pixel_spacing = self.metadata.get('pixel_spacing', [1.0, 1.0])
slice_spacing = self.metadata.get('slice_spacing', 1.0)
image.SetSpacing([float(pixel_spacing[0]), float(pixel_spacing[1]), float(slice_spacing)])
print(f"Setting voxel spacing: [{pixel_spacing[0]}, {pixel_spacing[1]}, {slice_spacing}]")
direction = (1.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, -1.0)
image.SetDirection(direction)
print("Setting direction matrix: [1, 0, 0, 0, 1, 0, 0, 0, -1]")
sitk.WriteImage(image, output_filename)
print(f"MHD file saved: {output_filename}")
self._enhance_mhd_file(output_filename)
except Exception as e:
print(f"Error saving MHD file: {e}")
import traceback
traceback.print_exc()
def _enhance_mhd_file(self, mhd_filename: str):
"""Enhance MHD file by adding necessary metadata"""
try:
with open(mhd_filename, 'r') as f:
content = f.read()
if 'TransformMatrix' not in content:
lines = content.split('\n')
new_lines = []
for line in lines:
new_lines.append(line)
if line.startswith('ElementSpacing'):
new_lines.append('TransformMatrix = 1 0 0 0 1 0 0 0 -1')
new_lines.append('Offset = 0 0 0')
new_lines.append('CenterOfRotation = 0 0 0')
new_lines.append('AnatomicalOrientation = RAS')
with open(mhd_filename, 'w') as f:
f.write('\n'.join(new_lines))
print("MHD file metadata enhanced")
except Exception as e:
print(f"Error enhancing MHD file: {e}")
def save_as_nifti(self, volume_data: np.ndarray, output_filename: str):
"""Save as NIfTI format"""
try:
image = sitk.GetImageFromArray(volume_data)
if self.metadata:
pixel_spacing = self.metadata.get('pixel_spacing', [1.0, 1.0])
slice_spacing = self.metadata.get('slice_spacing', 1.0)
image.SetSpacing([float(pixel_spacing[0]), float(pixel_spacing[1]), float(slice_spacing)])
print(f"Setting voxel spacing: [{pixel_spacing[0]}, {pixel_spacing[1]}, {slice_spacing}]")
sitk.WriteImage(image, output_filename)
print(f"NIfTI file saved: {output_filename}")
except Exception as e:
print(f"Error saving NIfTI file: {e}")
import traceback
traceback.print_exc()
def main():
parser = argparse.ArgumentParser(description='Convert DICOM files to 3D volume data')
parser.add_argument('input_folder', help='Input DICOM folder path')
parser.add_argument('output_file', help='Output file path (.mhd or .nii.gz)')
args = parser.parse_args()
# Check if input folder exists
if not os.path.isdir(args.input_folder):
print(f"Error: Folder {args.input_folder} does not exist")
return
# Check output file extension
output_lower = args.output_file.lower()
if output_lower.endswith('.nii.gz'):
output_ext = '.nii.gz'
else:
output_ext = os.path.splitext(output_lower)[1]
valid_extensions = ['.mhd', '.nii.gz']
if output_ext not in valid_extensions:
print(f"Error: Unsupported output format. Supported formats: {valid_extensions}")
return
try:
processor = DICOMProcessor()
print("=" * 50)
print("Starting DICOM file processing...")
print("=" * 50)
volume_data = processor.create_volume_data(args.input_folder)
print("\n" + "=" * 50)
print("Volume data information:")
print("=" * 50)
print(f" Shape (Z, Y, X): {volume_data.shape}")
print(f" Data type: {volume_data.dtype}")
print(f" Value range: {np.min(volume_data):.2f} - {np.max(volume_data):.2f}")
print("\n" + "=" * 50)
print("Saving file...")
print("=" * 50)
# Save file based on extension
if output_ext == '.mhd':
processor.save_as_mhd(volume_data, args.output_file)
elif output_ext == '.nii.gz':
processor.save_as_nifti(volume_data, args.output_file)
print("\n" + "=" * 50)
print("Conversion completed!")
print("=" * 50)
except Exception as e:
print(f"\nError: Processing failed: {e}")
import traceback
traceback.print_exc()
if __name__ == "__main__":
main()