|
5 | 5 | import logging |
6 | 6 | import os |
7 | 7 | import shutil |
| 8 | +import tempfile |
8 | 9 | from enum import Enum |
9 | 10 | from pathlib import Path |
10 | 11 | from typing import Dict, List |
11 | 12 |
|
12 | 13 | import h5py |
| 14 | +import nibabel as nib |
| 15 | +import numpy as np |
| 16 | +import pydicom |
13 | 17 |
|
14 | 18 | from openlifu.nav.photoscan import Photoscan, load_data_from_photoscan |
15 | 19 | from openlifu.plan import Protocol, Run, Solution |
|
24 | 28 |
|
25 | 29 | OnConflictOpts = Enum('OnConflictOpts', ['ERROR', 'OVERWRITE', 'SKIP']) |
26 | 30 |
|
| 31 | + |
def _has_dicom_magic(filepath: Path) -> bool:
    """Return True if *filepath* carries the 'DICM' magic bytes at offset 128."""
    try:
        with open(filepath, 'rb') as f:
            f.seek(128)
            return f.read(4) == b'DICM'
    except OSError:
        # unreadable / vanished file: treat as not-DICOM
        return False


def is_dicom_file_or_directory(path: PathLike) -> bool:
    """
    Check if a path is a DICOM file or directory containing DICOM files.

    Detection looks for the standard 'DICM' marker at byte offset 128 (the
    end of the DICOM preamble). NOTE(review): legacy DICOM files without a
    preamble will not be detected. The directory scan is non-recursive.

    Args:
        path: Path to check

    Returns:
        True if path is a DICOM file, or a directory containing at least one
        DICOM file at its top level; False otherwise (including when the
        path does not exist)
    """
    path = Path(path)

    if path.is_file():
        return _has_dicom_magic(path)

    if path.is_dir():
        return any(_has_dicom_magic(f) for f in path.iterdir() if f.is_file())

    return False
| 66 | + |
def convert_dicom_to_nifti(input_path: PathLike, output_filepath: PathLike) -> None:
    """
    Convert DICOM file(s) to NIfTI format using pydicom and nibabel.

    Slices are sorted by their DICOM InstanceNumber and stacked along the
    LAST axis, so the output volume always has shape (rows, cols, n_slices)
    regardless of whether the input holds one slice or many.

    Args:
        input_path: Path to either a DICOM file or directory containing DICOM files
        output_filepath: Path where the output NIfTI file should be saved

    Raises:
        RuntimeError: If the conversion fails (no readable DICOM data, or any
            error raised by pydicom/nibabel during read or write)
    """
    input_path = Path(input_path)
    output_filepath = Path(output_filepath)

    try:
        if input_path.is_file():
            dicom_files = [input_path]
        else:
            # dicom files may not have a .dcm extension, so consider every file
            dicom_files = [f for f in input_path.iterdir() if f.is_file()]

        if not dicom_files:
            raise RuntimeError("No DICOM files found")

        slices = []
        for dcm_file in dicom_files:
            try:
                ds = pydicom.dcmread(dcm_file)
                slices.append((ds.get('InstanceNumber', 0), ds.pixel_array))
            except Exception:
                # skip files that aren't valid dicom
                continue

        if not slices:
            raise RuntimeError("No valid DICOM files found")

        # sort by instance number - this is the slice order in the series
        # so we reconstruct the 3D volume in the right order
        slices.sort(key=lambda x: x[0])

        # stack into a 3D volume with the slice dimension LAST, consistently:
        # previously a single 2D slice received its new axis first
        # ((1, rows, cols)) while multiple slices were stacked last
        # ((rows, cols, n)), giving shape layouts that disagreed by case
        if len(slices) == 1:
            pixels = slices[0][1]
            volume = pixels[:, :, np.newaxis] if pixels.ndim == 2 else pixels
        else:
            volume = np.stack([s[1] for s in slices], axis=-1)

        # identity affine for now - orientation/spacing could be extracted
        # from the DICOM headers (ImageOrientationPatient etc.) in the future
        affine = np.eye(4)

        nifti_img = nib.Nifti1Image(volume, affine)
        nib.save(nifti_img, str(output_filepath))

    except Exception as e:
        raise RuntimeError(f"DICOM to NIfTI conversion failed: {e}") from e
| 124 | + |
27 | 125 | class Database: |
28 | 126 | def __init__(self, path: str | None = None): |
29 | 127 | if path is None: |
@@ -378,35 +476,60 @@ def write_volume(self, subject_id, volume_id, volume_name, volume_data_filepath, |
378 | 476 | if not Path(volume_data_filepath).exists(): |
379 | 477 | raise ValueError(f'Volume data filepath does not exist: {volume_data_filepath}') |
380 | 478 |
|
381 | | - volume_ids = self.get_volume_ids(subject_id) |
382 | | - if volume_id in volume_ids: |
383 | | - if on_conflict == OnConflictOpts.ERROR: |
384 | | - raise ValueError(f"Volume with ID {volume_id} already exists for subject {subject_id}.") |
385 | | - elif on_conflict == OnConflictOpts.OVERWRITE: |
386 | | - self.logger.info(f"Overwriting volume with ID {volume_id} for subject {subject_id}.") |
387 | | - elif on_conflict == OnConflictOpts.SKIP: |
388 | | - self.logger.info(f"Skipping volume with ID {volume_id} for subject {subject_id} as it already exists.") |
389 | | - return |
390 | | - else: |
391 | | - raise ValueError("Invalid 'on_conflict' option. Use 'error', 'overwrite', or 'skip'.") |
392 | | - |
393 | | - # Create volume metadata |
394 | | - volume_metadata_dict = {"id": volume_id, "name": volume_name, "data_filename": Path(volume_data_filepath).name} |
395 | | - volume_metadata_json = json.dumps(volume_metadata_dict, separators=(',', ':'), cls=PYFUSEncoder) |
396 | | - |
397 | | - # Save the volume metadata to a JSON file and copy volume data file to database |
398 | | - volume_metadata_filepath = self.get_volume_metadata_filepath(subject_id, volume_id) #subject_id/volume/volume_id/volume_id.json |
399 | | - Path(volume_metadata_filepath).parent.parent.mkdir(exist_ok=True) # volume directory |
400 | | - Path(volume_metadata_filepath).parent.mkdir(exist_ok=True) |
401 | | - with open(volume_metadata_filepath, 'w') as file: |
402 | | - file.write(volume_metadata_json) |
403 | | - shutil.copy(Path(volume_data_filepath), Path(volume_metadata_filepath).parent) |
404 | | - |
405 | | - if volume_id not in volume_ids: |
406 | | - volume_ids.append(volume_id) |
407 | | - self.write_volume_ids(subject_id, volume_ids) |
408 | | - |
409 | | - self.logger.info(f"Added volume with ID {volume_id} for subject {subject_id} to the database.") |
| 479 | + path = Path(volume_data_filepath) |
| 480 | + if path.is_dir() and not is_dicom_file_or_directory(volume_data_filepath): |
| 481 | + raise ValueError(f'Volume data filepath is a directory without DICOM files: {volume_data_filepath}') |
| 482 | + |
| 483 | + # convert dicom to nifti if needed |
| 484 | + temp_nifti_file = None |
| 485 | + if is_dicom_file_or_directory(volume_data_filepath): |
| 486 | + self.logger.info(f"Detected DICOM input for volume {volume_id}, converting to NIfTI format") |
| 487 | + temp_nifti_file = tempfile.NamedTemporaryFile(suffix='.nii.gz', delete=False) |
| 488 | + temp_nifti_path = Path(temp_nifti_file.name) |
| 489 | + temp_nifti_file.close() |
| 490 | + |
| 491 | + try: |
| 492 | + convert_dicom_to_nifti(volume_data_filepath, temp_nifti_path) |
| 493 | + volume_data_filepath = temp_nifti_path |
| 494 | + except Exception as e: |
| 495 | + if temp_nifti_path.exists(): |
| 496 | + temp_nifti_path.unlink() |
| 497 | + raise RuntimeError(f"Failed to convert DICOM to NIfTI: {e}") from e |
| 498 | + |
| 499 | + try: |
| 500 | + volume_ids = self.get_volume_ids(subject_id) |
| 501 | + if volume_id in volume_ids: |
| 502 | + if on_conflict == OnConflictOpts.ERROR: |
| 503 | + raise ValueError(f"Volume with ID {volume_id} already exists for subject {subject_id}.") |
| 504 | + elif on_conflict == OnConflictOpts.OVERWRITE: |
| 505 | + self.logger.info(f"Overwriting volume with ID {volume_id} for subject {subject_id}.") |
| 506 | + elif on_conflict == OnConflictOpts.SKIP: |
| 507 | + self.logger.info(f"Skipping volume with ID {volume_id} for subject {subject_id} as it already exists.") |
| 508 | + return |
| 509 | + else: |
| 510 | + raise ValueError("Invalid 'on_conflict' option. Use 'error', 'overwrite', or 'skip'.") |
| 511 | + |
| 512 | + volume_metadata_dict = {"id": volume_id, "name": volume_name, "data_filename": Path(volume_data_filepath).name} |
| 513 | + volume_metadata_json = json.dumps(volume_metadata_dict, separators=(',', ':'), cls=PYFUSEncoder) |
| 514 | + |
| 515 | + volume_metadata_filepath = self.get_volume_metadata_filepath(subject_id, volume_id) |
| 516 | + Path(volume_metadata_filepath).parent.parent.mkdir(exist_ok=True) |
| 517 | + Path(volume_metadata_filepath).parent.mkdir(exist_ok=True) |
| 518 | + with open(volume_metadata_filepath, 'w') as file: |
| 519 | + file.write(volume_metadata_json) |
| 520 | + shutil.copy(Path(volume_data_filepath), Path(volume_metadata_filepath).parent) |
| 521 | + |
| 522 | + if volume_id not in volume_ids: |
| 523 | + volume_ids.append(volume_id) |
| 524 | + self.write_volume_ids(subject_id, volume_ids) |
| 525 | + |
| 526 | + self.logger.info(f"Added volume with ID {volume_id} for subject {subject_id} to the database.") |
| 527 | + finally: |
| 528 | + # cleanup temp nifti file |
| 529 | + if temp_nifti_file is not None: |
| 530 | + temp_path = Path(temp_nifti_file.name) |
| 531 | + if temp_path.exists(): |
| 532 | + temp_path.unlink() |
410 | 533 |
|
411 | 534 | def write_photocollection(self, subject_id, session_id, reference_number: str, photo_paths: List[PathLike], on_conflict=OnConflictOpts.ERROR): |
412 | 535 | """ Writes a photocollection to database and copies the associated |
|
0 commit comments