Skip to content

Commit a7eae35

Browse files
committed
Extract DICOM conversion to util module and add proper affine matrix (#396)
- Move is_dicom_file_or_directory() and convert_dicom_to_nifti() from database.py to new util/volume_conversion.py - Implement extract_affine_from_dicom() using ImageOrientationPatient, ImagePositionPatient, and PixelSpacing from DICOM headers - Create tests/test_volume_conversion.py with unit tests for conversion functions - Remove conversion function tests from test_database.py, keep integration tests for write_volume - Update imports in database.py and test_database.py
1 parent 6cfacf3 commit a7eae35

4 files changed

Lines changed: 254 additions & 113 deletions

File tree

src/openlifu/db/database.py

Lines changed: 4 additions & 91 deletions
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,15 @@
1212

1313
import h5py
1414
import nibabel as nib
15-
import numpy as np
16-
import pydicom
1715

1816
from openlifu.nav.photoscan import Photoscan, load_data_from_photoscan
1917
from openlifu.plan import Protocol, Run, Solution
2018
from openlifu.util.json import PYFUSEncoder
2119
from openlifu.util.types import PathLike
20+
from openlifu.util.volume_conversion import (
21+
convert_dicom_to_nifti,
22+
is_dicom_file_or_directory,
23+
)
2224
from openlifu.xdc import Transducer, TransducerArray
2325
from openlifu.xdc.util import load_transducer_from_file
2426

@@ -29,94 +31,6 @@
2931
OnConflictOpts = Enum('OnConflictOpts', ['ERROR', 'OVERWRITE', 'SKIP'])
3032

3133

32-
def is_dicom_file_or_directory(path: PathLike) -> bool:
33-
"""
34-
Check if a path is a DICOM file or directory containing DICOM files.
35-
36-
Args:
37-
path: Path to check
38-
39-
Returns:
40-
True if path is a DICOM file or directory with DICOM files, False otherwise
41-
"""
42-
path = Path(path)
43-
44-
if path.is_file():
45-
# check for 'DICM' magic bytes at offset 128
46-
try:
47-
with open(path, 'rb') as f:
48-
f.seek(128)
49-
return f.read(4) == b'DICM'
50-
except OSError:
51-
return False
52-
53-
elif path.is_dir():
54-
for file in path.iterdir():
55-
if file.is_file():
56-
try:
57-
with open(file, 'rb') as f:
58-
f.seek(128)
59-
if f.read(4) == b'DICM':
60-
return True
61-
except OSError:
62-
continue
63-
64-
return False
65-
66-
67-
def convert_dicom_to_nifti(input_path: PathLike, output_filepath: PathLike) -> None:
68-
"""
69-
Convert DICOM file(s) to NIfTI format using pydicom and nibabel.
70-
71-
Args:
72-
input_path: Path to either a DICOM file or directory containing DICOM files
73-
output_filepath: Path where the output NIfTI file should be saved
74-
75-
Raises:
76-
RuntimeError: If the conversion fails
77-
"""
78-
input_path = Path(input_path)
79-
output_filepath = Path(output_filepath)
80-
81-
try:
82-
if input_path.is_file():
83-
dicom_files = [input_path]
84-
else:
85-
# dicom files may not have .dcm extension
86-
dicom_files = [f for f in input_path.iterdir() if f.is_file()]
87-
88-
if not dicom_files:
89-
raise RuntimeError("No DICOM files found")
90-
91-
slices = []
92-
for dcm_file in dicom_files:
93-
try:
94-
ds = pydicom.dcmread(dcm_file)
95-
slices.append((ds.get('InstanceNumber', 0), ds.pixel_array))
96-
except Exception:
97-
# skip files that aren't valid dicom
98-
continue
99-
100-
if not slices:
101-
raise RuntimeError("No valid DICOM files found")
102-
103-
# sort by instance number - this is the slice order in the series
104-
# so we reconstruct the 3D volume in the right order
105-
slices.sort(key=lambda x: x[0])
106-
107-
# stack into 3D volume (handles both single and multiple slices)
108-
volume = np.stack([s[1] for s in slices], axis=-1)
109-
110-
# identity affine for now - could extract from dicom headers in the future
111-
affine = np.eye(4)
112-
113-
nifti_img = nib.Nifti1Image(volume, affine)
114-
nib.save(nifti_img, str(output_filepath))
115-
116-
except Exception as e:
117-
raise RuntimeError(f"DICOM to NIfTI conversion failed: {e}") from e
118-
119-
12034
class Database:
12135
def __init__(self, path: str | None = None):
12236
if path is None:
@@ -1096,7 +1010,6 @@ def load_volume(self, subject, volume_id):
10961010
if not volume_data_filepath.exists() or not volume_data_filepath.is_file():
10971011
self.logger.error(f"Volume data file not found for volume {volume_id}, subject {subject.id}")
10981012
raise FileNotFoundError(f"Volume data file not found for volume {volume_id}, subject {subject.id}")
1099-
import nibabel as nib
11001013
# Load the volume data using nibabel
11011014
volume_data = nib.load(volume_data_filepath)
11021015
self.logger.info(f"Loaded volume {volume_id} for subject {subject.id}")
Lines changed: 162 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,162 @@
1+
"""Utilities for converting between different medical imaging formats."""
2+
3+
from __future__ import annotations
4+
5+
from pathlib import Path
6+
7+
import nibabel as nib
8+
import numpy as np
9+
import pydicom
10+
11+
from openlifu.util.types import PathLike
12+
13+
14+
def is_dicom_file_or_directory(path: PathLike) -> bool:
15+
"""
16+
Check if a path is a DICOM file or directory containing DICOM files.
17+
18+
Args:
19+
path: Path to check
20+
21+
Returns:
22+
True if path is a DICOM file or directory with DICOM files, False otherwise
23+
"""
24+
path = Path(path)
25+
26+
if path.is_file():
27+
# check for 'DICM' magic bytes at offset 128
28+
try:
29+
with open(path, 'rb') as f:
30+
f.seek(128)
31+
return f.read(4) == b'DICM'
32+
except OSError:
33+
return False
34+
35+
elif path.is_dir():
36+
for file in path.iterdir():
37+
if file.is_file():
38+
try:
39+
with open(file, 'rb') as f:
40+
f.seek(128)
41+
if f.read(4) == b'DICM':
42+
return True
43+
except OSError:
44+
continue
45+
46+
return False
47+
48+
49+
def extract_affine_from_dicom(dicom_slices: list) -> np.ndarray:
50+
"""
51+
Extract the affine transformation matrix from DICOM header information.
52+
53+
Args:
54+
dicom_slices: List of tuples (instance_number, pixel_array, dicom_dataset)
55+
56+
Returns:
57+
4x4 affine transformation matrix mapping voxel coordinates to world coordinates
58+
59+
Raises:
60+
RuntimeError: If required DICOM tags are missing
61+
"""
62+
# use the first slice to extract most parameters
63+
first_ds = dicom_slices[0][2]
64+
65+
try:
66+
# ImageOrientationPatient (0020,0037): direction cosines for row and column
67+
orientation = np.array(first_ds.ImageOrientationPatient, dtype=float)
68+
row_cosine = orientation[:3] # direction cosines for rows
69+
col_cosine = orientation[3:] # direction cosines for columns
70+
71+
# ImagePositionPatient (0020,0032): position of the upper-left voxel
72+
position = np.array(first_ds.ImagePositionPatient, dtype=float)
73+
74+
# PixelSpacing (0028,0030): spacing between pixels [row_spacing, col_spacing]
75+
pixel_spacing = np.array(first_ds.PixelSpacing, dtype=float)
76+
row_spacing = pixel_spacing[0]
77+
col_spacing = pixel_spacing[1]
78+
79+
except AttributeError as e:
80+
raise RuntimeError(
81+
f"Missing required DICOM tag for affine calculation: {e}"
82+
) from e
83+
84+
# calculate slice direction as cross product of row and column directions
85+
slice_cosine = np.cross(row_cosine, col_cosine)
86+
87+
# calculate slice spacing
88+
if len(dicom_slices) > 1:
89+
# calculate from the distance between first two slices
90+
first_pos = np.array(dicom_slices[0][2].ImagePositionPatient, dtype=float)
91+
second_pos = np.array(dicom_slices[1][2].ImagePositionPatient, dtype=float)
92+
slice_spacing = np.linalg.norm(second_pos - first_pos)
93+
else:
94+
# single slice - try to get from SliceThickness or default to 1.0
95+
slice_spacing = float(getattr(first_ds, 'SliceThickness', 1.0))
96+
97+
# construct the affine matrix
98+
# The affine maps from voxel indices (i, j, k) to physical coordinates (x, y, z)
99+
affine = np.eye(4)
100+
affine[:3, 0] = row_cosine * row_spacing
101+
affine[:3, 1] = col_cosine * col_spacing
102+
affine[:3, 2] = slice_cosine * slice_spacing
103+
affine[:3, 3] = position
104+
105+
return affine
106+
107+
108+
def convert_dicom_to_nifti(input_path: PathLike, output_filepath: PathLike) -> None:
109+
"""
110+
Convert DICOM file(s) to NIfTI format using pydicom and nibabel.
111+
112+
The affine transformation matrix is extracted from DICOM headers to properly
113+
map voxel coordinates to scanner coordinates in the NIfTI output.
114+
115+
Args:
116+
input_path: Path to either a DICOM file or directory containing DICOM files
117+
output_filepath: Path where the output NIfTI file should be saved
118+
119+
Raises:
120+
RuntimeError: If the conversion fails
121+
"""
122+
input_path = Path(input_path)
123+
output_filepath = Path(output_filepath)
124+
125+
try:
126+
if input_path.is_file():
127+
dicom_files = [input_path]
128+
else:
129+
# dicom files may not have .dcm extension
130+
dicom_files = [f for f in input_path.iterdir() if f.is_file()]
131+
132+
if not dicom_files:
133+
raise RuntimeError("No DICOM files found")
134+
135+
slices = []
136+
for dcm_file in dicom_files:
137+
try:
138+
ds = pydicom.dcmread(dcm_file)
139+
# store instance number, pixel array, and dataset for affine extraction
140+
slices.append((ds.get('InstanceNumber', 0), ds.pixel_array, ds))
141+
except Exception:
142+
# skip files that aren't valid dicom
143+
continue
144+
145+
if not slices:
146+
raise RuntimeError("No valid DICOM files found")
147+
148+
# sort by instance number - this is the slice order in the series
149+
# so we reconstruct the 3D volume in the right order
150+
slices.sort(key=lambda x: x[0])
151+
152+
# stack into 3D volume (handles both single and multiple slices)
153+
volume = np.stack([s[1] for s in slices], axis=-1)
154+
155+
# extract affine from DICOM headers
156+
affine = extract_affine_from_dicom(slices)
157+
158+
nifti_img = nib.Nifti1Image(volume, affine)
159+
nib.save(nifti_img, str(output_filepath))
160+
161+
except Exception as e:
162+
raise RuntimeError(f"DICOM to NIfTI conversion failed: {e}") from e

tests/test_database.py

Lines changed: 2 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,12 @@
1515

1616
from openlifu import Solution
1717
from openlifu.db import Session, Subject, User
18-
from openlifu.db.database import Database, OnConflictOpts, is_dicom_file_or_directory
18+
from openlifu.db.database import Database, OnConflictOpts
1919
from openlifu.db.session import TransducerTrackingResult
2020
from openlifu.geo import ArrayTransform, Point
2121
from openlifu.nav.photoscan import Photoscan
2222
from openlifu.plan import Protocol, Run
23+
from openlifu.util.volume_conversion import is_dicom_file_or_directory
2324
from openlifu.xdc import Transducer
2425

2526

@@ -744,27 +745,6 @@ def test_get_transducer_absolute_filepaths(example_database, tmp_path: Path, reg
744745
else:
745746
assert absolute_file_paths["transducer_body_abspath"] is None
746747

747-
def test_is_dicom_file_or_directory(tmp_path: Path):
748-
test_dicom_file = Path(__file__).parent / "resources" / "CT_small.dcm"
749-
assert test_dicom_file.exists(), "CT_small.dcm test file should exist"
750-
assert is_dicom_file_or_directory(test_dicom_file)
751-
752-
non_dicom_file = tmp_path / "test.nii"
753-
non_dicom_file.write_bytes(b'not a dicom file')
754-
assert not is_dicom_file_or_directory(non_dicom_file)
755-
756-
dicom_dir = tmp_path / "dicom_series"
757-
dicom_dir.mkdir()
758-
shutil.copy(test_dicom_file, dicom_dir / "CT_small.dcm")
759-
assert is_dicom_file_or_directory(dicom_dir)
760-
761-
non_dicom_dir = tmp_path / "non_dicom"
762-
non_dicom_dir.mkdir()
763-
(non_dicom_dir / "file.txt").write_text("not dicom")
764-
assert not is_dicom_file_or_directory(non_dicom_dir)
765-
766-
assert not is_dicom_file_or_directory(tmp_path / "nonexistent.dcm")
767-
768748
def test_write_volume_dicom(example_database: Database):
769749
"""Test writing a volume from DICOM file - conversion to NIfTI and storage"""
770750
subject_id = "example_subject"

0 commit comments

Comments
 (0)