|
| 1 | +from copy import deepcopy |
| 2 | +from pathlib import Path |
| 3 | +from tempfile import TemporaryDirectory |
| 4 | + |
| 5 | +import numpy as np |
| 6 | +import pytest |
| 7 | + |
| 8 | +from kwave.data import Vector |
| 9 | +from kwave.kgrid import kWaveGrid |
| 10 | +from kwave.kmedium import kWaveMedium |
| 11 | +from kwave.ksensor import kSensor |
| 12 | +from kwave.ksource import kSource |
| 13 | +from kwave.kspaceFirstOrder3D import kspaceFirstOrder3D |
| 14 | +from kwave.options.simulation_execution_options import SimulationExecutionOptions |
| 15 | +from kwave.options.simulation_options import SimulationOptions |
| 16 | +from kwave.utils.filters import smooth |
| 17 | +from kwave.utils.mapgen import make_ball |
| 18 | + |
| 19 | + |
| 20 | +def make_simulation_parameters(directory: Path): |
| 21 | + # create the computational grid |
| 22 | + PML_size = 10 # size of the PML in grid points |
| 23 | + N = Vector([32, 64, 64]) - 2 * PML_size # number of grid points |
| 24 | + d = Vector([0.2e-3, 0.2e-3, 0.2e-3]) # grid point spacing [m] |
| 25 | + kgrid = kWaveGrid(N, d) |
| 26 | + |
| 27 | + # define the properties of the propagation medium |
| 28 | + medium = kWaveMedium(sound_speed=1500) # [m/s] |
| 29 | + |
| 30 | + # create initial pressure distribution using makeBall |
| 31 | + ball_magnitude = 10 # [Pa] |
| 32 | + ball_radius = 3 # [grid points] |
| 33 | + p0_array = ball_magnitude * make_ball(N, N / 2, ball_radius) |
| 34 | + p0_array = smooth(p0_array, restore_max=True) |
| 35 | + |
| 36 | + source = kSource() |
| 37 | + source.p0 = p0_array |
| 38 | + |
| 39 | + # define a binary planar sensor |
| 40 | + sensor = kSensor() |
| 41 | + sensor_mask_array = np.zeros(N) |
| 42 | + sensor_mask_array[0, :, :] = 1 # Corrected to be a plane for 3D |
| 43 | + sensor.mask = sensor_mask_array |
| 44 | + |
| 45 | + input_filename = directory / "kwave_input.h5" |
| 46 | + output_filename = directory / "kwave_output.h5" |
| 47 | + checkpoint_filename = directory / "kwave_checkpoint.h5" |
| 48 | + |
| 49 | + simulation_options = SimulationOptions( |
| 50 | + save_to_disk=True, # Must be true for kspaceFirstOrder3D |
| 51 | + pml_size=PML_size, |
| 52 | + pml_inside=False, |
| 53 | + smooth_p0=False, # p0 is already smoothed |
| 54 | + data_cast="single", |
| 55 | + input_filename=input_filename, |
| 56 | + output_filename=output_filename, |
| 57 | + ) |
| 58 | + |
| 59 | + checkpoint_timesteps = 300 |
| 60 | + |
| 61 | + execution_options = SimulationExecutionOptions( |
| 62 | + is_gpu_simulation=False, # Assuming CPU for basic test |
| 63 | + checkpoint_file=checkpoint_filename, |
| 64 | + checkpoint_timesteps=checkpoint_timesteps, |
| 65 | + verbose_level=0 # Keep test output clean |
| 66 | + ) |
| 67 | + return kgrid, medium, source, sensor, simulation_options, execution_options |
| 68 | + |
| 69 | + |
| 70 | +def test_kspaceFirstOrder3D_input_state_preservation(): |
| 71 | + with TemporaryDirectory() as tmpdir_str: |
| 72 | + tmpdir = Path(tmpdir_str) |
| 73 | + kgrid, medium, source, sensor, simulation_options, execution_options = make_simulation_parameters(tmpdir) |
| 74 | + |
| 75 | + # Store original states of critical attributes for comparison |
| 76 | + original_source_p0 = deepcopy(source.p0) |
| 77 | + original_sensor_mask = deepcopy(sensor.mask) |
| 78 | + |
| 79 | + # If source.p or source.u were time-varying, store their initial states too. |
| 80 | + # For this test, p0 is the main source attribute. |
| 81 | + |
| 82 | + # First run |
| 83 | + try: |
| 84 | + _ = kspaceFirstOrder3D(kgrid, medium, source, sensor, simulation_options, execution_options) |
| 85 | + except Exception as e: |
| 86 | + pytest.fail(f"First call to kspaceFirstOrder3D failed: {e}") |
| 87 | + |
| 88 | + # Check if original source and sensor attributes are unchanged |
| 89 | + assert np.array_equal(source.p0, original_source_p0), "source.p0 was modified after first run" |
| 90 | + assert np.array_equal(sensor.mask, original_sensor_mask), "sensor.mask was modified after first run" |
| 91 | + |
| 92 | + # Second run (should not fail if state is preserved) |
| 93 | + # For the second run, we need new input/output filenames or to ensure the C++ code can overwrite. |
| 94 | + # Easiest is to use new filenames for the test. |
| 95 | + simulation_options_run2 = deepcopy(simulation_options) |
| 96 | + simulation_options_run2.input_filename = tmpdir / "kwave_input_run2.h5" |
| 97 | + simulation_options_run2.output_filename = tmpdir / "kwave_output_run2.h5" |
| 98 | + |
| 99 | + execution_options_run2 = deepcopy(execution_options) |
| 100 | + if execution_options_run2.checkpoint_file: # Only change if it exists |
| 101 | + execution_options_run2.checkpoint_file = tmpdir / "kwave_checkpoint_run2.h5" |
| 102 | + |
| 103 | + try: |
| 104 | + _ = kspaceFirstOrder3D(kgrid, medium, source, sensor, simulation_options_run2, execution_options_run2) |
| 105 | + except Exception as e: |
| 106 | + pytest.fail(f"Second call to kspaceFirstOrder3D with original objects failed: {e}") |
| 107 | + |
| 108 | + # Final check that attributes are still the same as the initial state |
| 109 | + assert np.array_equal(source.p0, original_source_p0), "source.p0 was modified after second run" |
| 110 | + assert np.array_equal(sensor.mask, original_sensor_mask), "sensor.mask was modified after second run" |
0 commit comments