Newer
Older
ss2502 / ss2502 / 2502-01.py
@刘老老 刘老老 on 16 Oct 10 KB project
import argparse
import os
import numpy as np
import pydicom
from typing import List, Tuple
import SimpleITK as sitk


class DICOMProcessor:
    def __init__(self):
        self.dicom_files = []
        self.volume_data = None
        self.metadata = {}

    def get_dicom_files(self, folder_path: str) -> List[str]:
        """Get list of DICOM files in specified folder"""
        print(f"Scanning folder: {folder_path}")
        dicom_files = []

        for file in os.listdir(folder_path):
            file_path = os.path.join(folder_path, file)
            if file.lower().endswith('.dcm') and os.path.isfile(file_path):
                dicom_files.append(file_path)
                print(f"Found DICOM file: {file}")

        if not dicom_files:
            raise ValueError(f"No DICOM files (*.dcm) found in folder {folder_path}")

        print(f"Total {len(dicom_files)} DICOM files found")
        return dicom_files

    def get_instance_number(self, dicom_file: str) -> int:
        """Read DICOM file and get Instance Number"""
        try:
            ds = pydicom.dcmread(dicom_file, stop_before_pixels=True)
            instance_number = int(getattr(ds, 'InstanceNumber', 0))
            return instance_number
        except Exception as e:
            print(f"Error reading file {dicom_file}: {e}")
            return 0

    def calculate_slice_spacing(self, ds1: pydicom.Dataset, ds2: pydicom.Dataset) -> float:
        """Calculate Z-direction resolution (slice spacing)"""

        # Method 1: Use SliceLocation difference
        slice_loc1 = getattr(ds1, 'SliceLocation', None)
        slice_loc2 = getattr(ds2, 'SliceLocation', None)

        if slice_loc1 is not None and slice_loc2 is not None:
            try:
                spacing_from_slice_loc = abs(float(slice_loc2) - float(slice_loc1))
                print(f"Spacing calculated from SliceLocation: {spacing_from_slice_loc:.4f} mm")
                return spacing_from_slice_loc
            except (ValueError, TypeError):
                pass

        # Method 2: Use Euclidean distance from ImagePositionPatient
        pos1 = getattr(ds1, 'ImagePositionPatient', None)
        pos2 = getattr(ds2, 'ImagePositionPatient', None)

        if pos1 is not None and pos2 is not None:
            try:
                pos1_array = np.array([float(pos1[0]), float(pos1[1]), float(pos1[2])])
                pos2_array = np.array([float(pos2[0]), float(pos2[1]), float(pos2[2])])
                spacing_from_position = np.linalg.norm(pos2_array - pos1_array)
                print(f"Spacing calculated from ImagePositionPatient: {spacing_from_position:.4f} mm")
                return spacing_from_position
            except (ValueError, TypeError, IndexError):
                pass

        print("Warning: Unable to calculate slice spacing, using default value 1.0 mm")
        return 1.0

    def extract_metadata(self, dicom_files: List[Tuple[str, int]]) -> dict:
        """Extract metadata from DICOM files"""
        if len(dicom_files) < 2:
            print("Warning: Insufficient files to calculate slice spacing")
            return {}

        file1, instance1 = dicom_files[0]
        file2, instance2 = dicom_files[1]

        ds1 = pydicom.dcmread(file1)
        ds2 = pydicom.dcmread(file2)

        metadata = {}

        # Get pixel spacing
        metadata['pixel_spacing'] = getattr(ds1, 'PixelSpacing', [1.0, 1.0])
        print(f"Pixel spacing: {metadata['pixel_spacing']} mm")

        # Calculate slice spacing
        metadata['slice_spacing'] = self.calculate_slice_spacing(ds1, ds2)

        # Get image orientation
        metadata['image_orientation'] = getattr(ds1, 'ImageOrientationPatient', [1, 0, 0, 0, 1, 0])

        # Get image position
        metadata['image_position'] = getattr(ds1, 'ImagePositionPatient', [0, 0, 0])

        # Get other metadata
        metadata['rows'] = getattr(ds1, 'Rows', 0)
        metadata['columns'] = getattr(ds1, 'Columns', 0)
        metadata['bits_allocated'] = getattr(ds1, 'BitsAllocated', 16)
        metadata['bits_stored'] = getattr(ds1, 'BitsStored', 12)
        metadata['pixel_representation'] = getattr(ds1, 'PixelRepresentation', 0)

        return metadata

    def create_volume_data(self, folder_path: str) -> np.ndarray:
        """Create 3D volume data"""
        self.dicom_files = self.get_dicom_files(folder_path)

        print("Reading Instance Numbers...")
        file_instance_pairs = []
        for file_path in self.dicom_files:
            instance_number = self.get_instance_number(file_path)
            file_instance_pairs.append((file_path, instance_number))

        file_instance_pairs.sort(key=lambda x: x[1])
        print("Files sorted by Instance Number completed")

        self.metadata = self.extract_metadata(file_instance_pairs)

        print("Reading pixel data and creating volume data...")
        slice_data = []
        for i, (file_path, instance_num) in enumerate(file_instance_pairs):
            if i % 10 == 0 or i == len(file_instance_pairs) - 1:
                print(f"Reading slice {i + 1}/{len(file_instance_pairs)}")

            ds = pydicom.dcmread(file_path)
            pixel_array = ds.pixel_array

            if hasattr(ds, 'PixelRepresentation') and ds.PixelRepresentation == 1:
                if hasattr(ds, 'RescaleSlope') and hasattr(ds, 'RescaleIntercept'):
                    pixel_array = pixel_array.astype(np.float32) * ds.RescaleSlope + ds.RescaleIntercept

            slice_data.append(pixel_array)

        volume_data = np.stack(slice_data, axis=0)
        print(f"Volume data creation completed: {volume_data.shape}")

        return volume_data

    def save_as_mhd(self, volume_data: np.ndarray, output_filename: str):
        """Save as MHD/RAW format"""
        try:
            image = sitk.GetImageFromArray(volume_data)

            if self.metadata:
                pixel_spacing = self.metadata.get('pixel_spacing', [1.0, 1.0])
                slice_spacing = self.metadata.get('slice_spacing', 1.0)
                image.SetSpacing([float(pixel_spacing[0]), float(pixel_spacing[1]), float(slice_spacing)])
                print(f"Setting voxel spacing: [{pixel_spacing[0]}, {pixel_spacing[1]}, {slice_spacing}]")

            direction = (1.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, -1.0)
            image.SetDirection(direction)
            print("Setting direction matrix: [1, 0, 0, 0, 1, 0, 0, 0, -1]")

            sitk.WriteImage(image, output_filename)
            print(f"MHD file saved: {output_filename}")

            self._enhance_mhd_file(output_filename)

        except Exception as e:
            print(f"Error saving MHD file: {e}")
            import traceback
            traceback.print_exc()

    def _enhance_mhd_file(self, mhd_filename: str):
        """Enhance MHD file by adding necessary metadata"""
        try:
            with open(mhd_filename, 'r') as f:
                content = f.read()

            if 'TransformMatrix' not in content:
                lines = content.split('\n')
                new_lines = []

                for line in lines:
                    new_lines.append(line)
                    if line.startswith('ElementSpacing'):
                        new_lines.append('TransformMatrix = 1 0 0 0 1 0 0 0 -1')
                        new_lines.append('Offset = 0 0 0')
                        new_lines.append('CenterOfRotation = 0 0 0')
                        new_lines.append('AnatomicalOrientation = RAS')

                with open(mhd_filename, 'w') as f:
                    f.write('\n'.join(new_lines))

                print("MHD file metadata enhanced")

        except Exception as e:
            print(f"Error enhancing MHD file: {e}")

    def save_as_nifti(self, volume_data: np.ndarray, output_filename: str):
        """Save as NIfTI format"""
        try:
            image = sitk.GetImageFromArray(volume_data)

            if self.metadata:
                pixel_spacing = self.metadata.get('pixel_spacing', [1.0, 1.0])
                slice_spacing = self.metadata.get('slice_spacing', 1.0)
                image.SetSpacing([float(pixel_spacing[0]), float(pixel_spacing[1]), float(slice_spacing)])
                print(f"Setting voxel spacing: [{pixel_spacing[0]}, {pixel_spacing[1]}, {slice_spacing}]")

            sitk.WriteImage(image, output_filename)
            print(f"NIfTI file saved: {output_filename}")

        except Exception as e:
            print(f"Error saving NIfTI file: {e}")
            import traceback
            traceback.print_exc()


def main():
    parser = argparse.ArgumentParser(description='Convert DICOM files to 3D volume data')
    parser.add_argument('input_folder', help='Input DICOM folder path')
    parser.add_argument('output_file', help='Output file path (.mhd or .nii.gz)')

    args = parser.parse_args()

    # Check if input folder exists
    if not os.path.isdir(args.input_folder):
        print(f"Error: Folder {args.input_folder} does not exist")
        return

    # Check output file extension
    output_lower = args.output_file.lower()
    if output_lower.endswith('.nii.gz'):
        output_ext = '.nii.gz'
    else:
        output_ext = os.path.splitext(output_lower)[1]

    valid_extensions = ['.mhd', '.nii.gz']
    if output_ext not in valid_extensions:
        print(f"Error: Unsupported output format. Supported formats: {valid_extensions}")
        return

    try:
        processor = DICOMProcessor()

        print("=" * 50)
        print("Starting DICOM file processing...")
        print("=" * 50)

        volume_data = processor.create_volume_data(args.input_folder)

        print("\n" + "=" * 50)
        print("Volume data information:")
        print("=" * 50)
        print(f"  Shape (Z, Y, X): {volume_data.shape}")
        print(f"  Data type: {volume_data.dtype}")
        print(f"  Value range: {np.min(volume_data):.2f} - {np.max(volume_data):.2f}")

        print("\n" + "=" * 50)
        print("Saving file...")
        print("=" * 50)

        # Save file based on extension
        if output_ext == '.mhd':
            processor.save_as_mhd(volume_data, args.output_file)
        elif output_ext == '.nii.gz':
            processor.save_as_nifti(volume_data, args.output_file)

        print("\n" + "=" * 50)
        print("Conversion completed!")
        print("=" * 50)

    except Exception as e:
        print(f"\nError: Processing failed: {e}")
        import traceback
        traceback.print_exc()


if __name__ == "__main__":
    main()