diff --git a/CodeEntropy/levels/search.py b/CodeEntropy/levels/search.py index 0e54e77..4b6e6ec 100644 --- a/CodeEntropy/levels/search.py +++ b/CodeEntropy/levels/search.py @@ -6,235 +6,281 @@ import MDAnalysis as mda import numpy as np +from numba import njit -class Search: +@njit +def _apply_pbc(vec, dimensions, half_dimensions): """ - Class for functions to find neighbors. + Apply minimum image convention for periodic boundary conditions. + + Args: + vec (np.ndarray): + Vector to wrap. + dimensions (np.ndarray): + Simulation box dimensions. + half_dimensions (np.ndarray): + Half box lengths. + + Returns: + np.ndarray: + Wrapped vector. """ + for d in range(3): + if vec[d] > half_dimensions[d]: + vec[d] -= dimensions[d] + elif vec[d] < -half_dimensions[d]: + vec[d] += dimensions[d] + return vec - def __init__(self): - """ - Initializes the Search class with a placeholder for the system - trajectory. - """ - self._universe = None - self._mol_id = None +@njit +def _rad_blocking_loop(i_coords, sorted_indices, sorted_distances, coms, dimensions): + """ + Perform RAD neighbor selection using a blocking criterion. + + This is a Numba-compiled implementation of the RAD algorithm, which + determines whether a molecule j is a neighbor of molecule i by checking + whether any closer molecule k blocks j based on angular and distance + relationships. + + The criterion is based on: + + (1 / r_ij)^2 > (1 / r_ik)^2 * cos(theta_jik) + + where k blocks j if the inequality holds. + + Args: + i_coords (np.ndarray): + Coordinates of the central molecule. + sorted_indices (np.ndarray): + Indices of molecules sorted by distance from i. + sorted_distances (np.ndarray): + Distances corresponding to sorted_indices. + coms (np.ndarray): + Precomputed center of mass coordinates for all molecules. + dimensions (np.ndarray): + Simulation box dimensions for periodic boundary conditions. + + Returns: + np.ndarray: + Indices of molecules that belong to the RAD neighbor shell. + """ + n = sorted_indices.shape[0] + limit = min(n, 30) - def get_RAD_neighbors(self, universe, mol_id): - """ - Find the neighbors of molecule with index mol_id. + half_dimensions = 0.5 * dimensions - Args: - universe: The MDAnalysis universe of the system. - mol_id (int): the index for the central molecule. + inv_r2 = 1.0 / (sorted_distances * sorted_distances) - Returns: - neighbor_indices (list of ints): the list of neighboring molecule - indices. - """ - number_molecules = len(universe.atoms.fragments) + shell = np.empty(limit, dtype=np.int64) + count = 0 - central_position = universe.atoms.fragments[mol_id].center_of_mass() + for y in range(limit): + j_idx = sorted_indices[y] + r_ij = sorted_distances[y] + j_coords = coms[j_idx] - # Find distances between molecule of interest and other molecules in the system - distances = {} - for molecule_index_j in range(number_molecules): - if molecule_index_j != mol_id: - j_position = universe.atoms.fragments[molecule_index_j].center_of_mass() - distances[molecule_index_j] = self.get_distance( - j_position, central_position, universe.dimensions[:3] - ) + ba = j_coords - i_coords + ba = _apply_pbc(ba, dimensions, half_dimensions) - # Sort distances smallest to largest - sorted_dist = sorted(distances.items(), key=lambda item: item[1]) + blocked = False - # Get indices of neighbors - neighbor_indices = self._get_RAD_indices( - central_position, sorted_dist, universe, number_molecules - ) + for z in range(y): + k_idx = sorted_indices[z] + r_ik = sorted_distances[z] - return neighbor_indices + if r_ik > r_ij: + continue - def _get_RAD_indices(self, i_coords, sorted_distances, system, number_molecules): - # pylint: disable=too-many-locals - r""" - For a given set of atom coordinates, find its RAD shell from the distance - sorted list, truncated to the closest 30 molecules. + k_coords = coms[k_idx] - This function calculates coordination shells using RAD the relative - angular distance, as defined first in DOI:10.1063/1.4961439 - where molecules are defined as neighbors if - they fulfil the following condition: + ac = k_coords - j_coords + ac = _apply_pbc(ac, dimensions, half_dimensions) - .. math:: - \Bigg(\frac{1}{r_{ij}}\Bigg)^2 > - \Bigg(\frac{1}{r_{ik}}\Bigg)^2 \cos \theta_{jik} + dist_ac2 = (ac * ac).sum() - For a given particle :math:`i`, neighbor :math:`j` is in its coordination - shell if :math:`k` is not blocking particle :math:`j`. In this implementation - of RAD, we enforce symmetry, whereby neighboring particles must be in each - others coordination shells. + denom = -2.0 * r_ik * r_ij + if denom == 0.0: + continue - Args: - i_coords: xyz centre of mass of molecule :math:`i` - sorted_indices: dict of index and distance pairs sorted by distance - system: mdanalysis instance of atoms in a frame + costheta = (dist_ac2 - r_ik * r_ik - r_ij * r_ij) / denom - Returns: - shell: list of indices of particles in the RAD shell of neighbors. - """ - # 1. truncate neighbor list to closest 30 united atoms and iterate - # through neighbors from closest to furthest/ - shell = [] - count = -1 - limit = min(number_molecules - 1, 30) - for y in range(limit): + if inv_r2[y] < inv_r2[z] * costheta: + blocked = True + break + + if not blocked: + shell[count] = j_idx count += 1 - j_idx = sorted_distances[y][0] - j_coords = system.atoms.fragments[j_idx].center_of_mass() - r_ij = sorted_distances[y][1] - blocked = False - # 3. iterate through neighbors other than atom j and check if they block - # it from molecule i - for z in range(count): # only closer units can block - k_idx = sorted_distances[z][0] - k_coords = system.atoms.fragments[k_idx].center_of_mass() - r_ik = sorted_distances[z][1] - # 4. find the angle jik - costheta_jik = self.get_angle( - j_coords, i_coords, k_coords, system.dimensions[:3] - ) - if np.isnan(costheta_jik): - break - # 5. check if k blocks j from i - LHS = (1 / r_ij) ** 2 - RHS = ((1 / r_ik) ** 2) * costheta_jik - if LHS < RHS: - blocked = True - break - # 6. if j is not blocked from i by k, then its in i's shell - if blocked is False: - shell.append(j_idx) - - return shell - - def get_angle( - self, a: np.ndarray, b: np.ndarray, c: np.ndarray, dimensions: np.ndarray - ): + + return shell[:count] + + +class Search: + """ + Class for functions to find neighbors. + """ + + def __init__(self): """ - Get the angle between three atoms, taking into account periodic - bondary conditions. + Initialize the Search class. - b is the vertex of the angle. + This class includes frame-safe caching of fragment COMs and + system dimensions to avoid recomputation while preserving + identical results to the original implementation. + """ + self._cached_frame = None + self._cached_fragments = None + self._cached_coms = None + self._cached_dimensions = None - Pairwise differences between the coordinates are used with the - distances calculated as the square root of the sum of the squared - x, y, and z coordinates. + def _update_cache(self, universe): + """ + Update cached MDAnalysis data if the simulation frame has changed. Args: - a: (3,) array of atom cooordinates - b: (3,) array of atom cooordinates - c: (3,) array of atom cooordinates - dimensions: (3,) array of system box dimensions. + universe (MDAnalysis.Universe): + MDAnalysis universe object containing the system. + """ + current_frame = universe.trajectory.ts.frame - Returns: - cosine_angle: float, cosine of the angle abc. + if self._cached_frame == current_frame: + return + + fragments = universe.atoms.fragments + + coms = np.array([frag.center_of_mass() for frag in fragments]) + + self._cached_fragments = fragments + self._cached_coms = coms + self._cached_dimensions = universe.dimensions[:3] + self._cached_frame = current_frame + + def _get_distances(self, coms, i_coords, dimensions): """ - # Differences in positions - ba = np.abs(a - b) - bc = np.abs(c - b) - ac = np.abs(c - a) + Compute distances between a central coordinate and all fragment COMs + using periodic boundary conditions. - # Correct for periodic boundary conditions - ba = np.where(ba > 0.5 * dimensions, ba - dimensions, ba) - bc = np.where(bc > 0.5 * dimensions, bc - dimensions, bc) - ac = np.where(ac > 0.5 * dimensions, ac - dimensions, ac) + Args: + coms (np.ndarray): + Array of fragment center of mass coordinates. + i_coords (np.ndarray): + Coordinates of the reference (central) molecule. + dimensions (np.ndarray): + Simulation box dimensions. - # Get distances - dist_ba = np.sqrt((ba**2).sum(axis=-1)) - dist_bc = np.sqrt((bc**2).sum(axis=-1)) - dist_ac = np.sqrt((ac**2).sum(axis=-1)) + Returns: + np.ndarray: + Distances from the central molecule to all fragments. + """ + delta = coms - i_coords - # Trigonometry - cosine_angle = (dist_ac**2 - dist_bc**2 - dist_ba**2) / (-2 * dist_bc * dist_ba) + half_dimensions = 0.5 * dimensions + + for d in range(3): + delta[:, d] = np.where( + delta[:, d] > half_dimensions[d], + delta[:, d] - dimensions[d], + delta[:, d], + ) + delta[:, d] = np.where( + delta[:, d] < -half_dimensions[d], + delta[:, d] + dimensions[d], + delta[:, d], + ) - return cosine_angle + return np.sqrt((delta * delta).sum(axis=1)) - def get_distance(self, j_position, i_position, dimensions): + def get_RAD_neighbors(self, universe, mol_id): """ - Function to calculate the distance between two points. - Take periodic boundary conditions into account. + Find RAD neighbors of a given molecule. Args: - j_position: the x, y, z coordinates of point 1 - i_position: the x, y, z coordinates of the other point - dimensions: the dimensions of the simulation box + universe (MDAnalysis.Universe): + The MDAnalysis universe of the system. + mol_id (int): + Index of the central molecule. Returns: - distance: float, the distance between the two points + np.ndarray: + Indices of neighboring molecules identified via the RAD method. """ - # Difference in positions - delta = np.abs(j_position - i_position) + self._update_cache(universe) + + fragments = self._cached_fragments + coms = self._cached_coms + dimensions = self._cached_dimensions + + number_molecules = len(fragments) + + central_position = coms[mol_id] - # Account for periodic boundary conditions - delta = np.where(delta > 0.5 * dimensions, delta - dimensions, delta) + distances_array = self._get_distances(coms, central_position, dimensions) - # Get distance value - distance = np.sqrt((delta**2).sum(axis=-1)) + indices = np.arange(number_molecules) - return distance + mask = indices != mol_id + filtered_indices = indices[mask] + filtered_distances = distances_array[mask] + + order = np.argsort(filtered_distances, kind="mergesort") + + sorted_indices = filtered_indices[order] + sorted_distances = filtered_distances[order] + + neighbor_indices = _rad_blocking_loop( + central_position, + sorted_indices, + sorted_distances, + coms, + dimensions, + ) + + return neighbor_indices def get_grid_neighbors(self, universe, mol_id, highest_level): """ - Use MDAnalysis neighbor search to find neighbors. + Find neighbors using MDAnalysis grid-based neighbor search. - For molecules with just one united atom, use the "A" search level to - find neighboring atoms. For larger molecules use the "R" search level - to find neighboring residues. - - The atoms/residues of the molecule of interest are removed from the - neighbor list. + For small molecules (united_atom), atom-level search is used. + For larger molecules, residue-level search is used. Args: - universe: MDAnalysis universe object for system. - mol_id: int, the index for the molecule of interest. - highest_level: str, molecule level. + universe (MDAnalysis.Universe): + MDAnalysis universe object for the system. + mol_id (int): + Index of the molecule of interest. + highest_level (str): + Molecule level ("united_atom" or other). Returns: - neighbors: MDAnalysis atomgroup of the neighboring particles. + np.ndarray: + Fragment indices of neighboring molecules. """ + fragments = universe.atoms.fragments + fragment = fragments[mol_id] + search_object = mda.lib.NeighborSearch.AtomNeighborSearch(universe.atoms) - fragment = universe.atoms.fragments[mol_id] + selection_string = f"index {fragment.indices[0]}:{fragment.indices[-1]}" molecule_atom_group = universe.select_atoms(selection_string) if highest_level == "united_atom": - # For united atom size molecules, use the grid search - # to find neighboring atoms - search_level = "A" search = mda.lib.NeighborSearch.AtomNeighborSearch.search( search_object, molecule_atom_group, radius=3.0, - level=search_level, + level="A", ) - # Make sure that the neighbors list does not include - # atoms from the central molecule - # neighbors = search - fragment.residues neighbors = search - molecule_atom_group else: - # For larger molecules, use the grid search to find neighboring residues - search_level = "R" search = mda.lib.NeighborSearch.AtomNeighborSearch.search( search_object, molecule_atom_group, radius=3.5, - level=search_level, + level="R", ) - # Make sure that the neighbors list does not include - # residues from the central molecule neighbors = search - fragment.residues neighbors = neighbors.atoms diff --git a/conda-recipe/meta.yaml b/conda-recipe/meta.yaml index 28af659..9812831 100644 --- a/conda-recipe/meta.yaml +++ b/conda-recipe/meta.yaml @@ -39,6 +39,7 @@ requirements: - distributed >=2026.1.2,<2026.2.0 - dask-jobqueue >=0.9,<0.10 - pytest-xdist >=3.8, <3.9 + - numba >=0.65.0, <0.7 test: imports: diff --git a/pyproject.toml b/pyproject.toml index d7b733c..5f62583 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -50,6 +50,7 @@ dependencies = [ "waterEntropy>=2,<2.3", "requests>=2.32,<3.0", "rdkit>=2025.9.5", + "numba>=0.65.0,<0.70", ] [project.urls] diff --git a/tests/regression/conftest.py b/tests/regression/conftest.py index 05be6f7..2dfd281 100644 --- a/tests/regression/conftest.py +++ b/tests/regression/conftest.py @@ -1,14 +1,15 @@ from __future__ import annotations +import os +import random + +import numpy as np import pytest def pytest_addoption(parser: pytest.Parser) -> None: """ Register custom command-line options for pytest. - - Adds options to control regression test execution, baseline updates, - and debugging output. """ parser.addoption( "--run-slow", @@ -38,11 +39,17 @@ def pytest_addoption(parser: pytest.Parser) -> None: def pytest_configure(config: pytest.Config) -> None: """ - Register custom pytest markers. + Register markers and enforce deterministic behavior. """ config.addinivalue_line("markers", "regression: end-to-end regression tests") config.addinivalue_line("markers", "slow: long-running tests (20-30+ minutes)") + seed = 0 + random.seed(seed) + np.random.seed(seed) + + os.environ["PYTHONHASHSEED"] = "0" + def pytest_collection_modifyitems( config: pytest.Config, items: list[pytest.Item] @@ -73,7 +80,6 @@ def pytest_collection_modifyitems( if callspec is not None: case = callspec.params.get("case") - # Keep non-parametrized tests if case is None: filtered_items.append(item) continue diff --git a/tests/unit/CodeEntropy/levels/test_search.py b/tests/unit/CodeEntropy/levels/test_search.py index 51b15e0..30996e8 100644 --- a/tests/unit/CodeEntropy/levels/test_search.py +++ b/tests/unit/CodeEntropy/levels/test_search.py @@ -1,202 +1,138 @@ -from pathlib import Path from unittest.mock import MagicMock, patch import numpy as np import pytest -import yaml - -import tests.regression.helpers as Helpers -from CodeEntropy.config.runtime import CodeEntropyRunner -from CodeEntropy.levels.mda import UniverseOperations -from CodeEntropy.levels.search import Search - -# some dummy atom positions -a = np.array([0, 0, 1]) -b = np.array([0, 1, 0]) -c = np.array([1, 0, 0]) -d = np.array([0, 1, 1]) -e = np.array([0, 11, 11]) -dimensions = np.array([10, 10, 10]) - -DEFAULT_TESTDATA_BASE_URL = "https://www.ccpbiosim.ac.uk/file-store/codeentropy-testing" - - -def test_get_RAD_neighbors(tmp_path: Path): - """ - Args: - tmp_path: Pytest provided temporatry directory - """ - args = {} - search = Search() - system = "methane" - repo_root = Path(__file__).resolve().parents[4] - config_path = ( - repo_root / "tests" / "regression" / "configs" / system / "default.yaml" - ) - tmp_path.mkdir(parents=True, exist_ok=True) +from CodeEntropy.levels.search import Search, _apply_pbc, _rad_blocking_loop - raw = yaml.safe_load(config_path.read_text()) - if not isinstance(raw, dict): - raise ValueError( - f"Config must parse to a dict. Got {type(raw)} from {config_path}" - ) - cooked = Helpers._abspathify_config_paths(raw, base_dir=config_path.parent) - required: list[Path] = [] - run1 = cooked.get("run1") - if isinstance(run1, dict): - ff = run1.get("force_file") - if isinstance(ff, str) and ff: - required.append(Path(ff)) - for p in run1.get("top_traj_file") or []: - if isinstance(p, str) and p: - required.append(Path(p)) - - if required: - Helpers.ensure_testdata_for_system(system, required_paths=required) - - runner = CodeEntropyRunner(tmp_path) - parser = runner._config_manager.build_parser() - args, _ = parser.parse_known_args() - args.end = run1.get("end") - args.top_traj_file = run1.get("top_traj_file") - args.file_format = run1.get("file_format") - assert args.end == 1 - - universe_operations = UniverseOperations() - universe = CodeEntropyRunner._build_universe(args, universe_operations) - - neighbors = search.get_RAD_neighbors(universe=universe, mol_id=0) - - assert neighbors == [151, 3, 75, 219, 229, 488, 460, 118, 230, 326] - - -def test_get_grid_neighbors(tmp_path: Path): - """ - Args: - tmp_path: Pytest provided temporatry directory - """ - args = {} - search = Search() - system = "methane" - repo_root = Path(__file__).resolve().parents[4] - config_path = ( - repo_root / "tests" / "regression" / "configs" / system / "default.yaml" - ) +@pytest.fixture +def search(): + return Search() - tmp_path.mkdir(parents=True, exist_ok=True) - raw = yaml.safe_load(config_path.read_text()) - if not isinstance(raw, dict): - raise ValueError( - f"Config must parse to a dict. Got {type(raw)} from {config_path}" - ) +def test_apply_pbc_wraps_positive(): + vec = np.array([11.0, 0.0, 0.0]) + dimensions = np.array([10.0, 10.0, 10.0]) + half = 0.5 * dimensions + + result = _apply_pbc(vec.copy(), dimensions, half) + + assert np.allclose(result, [1.0, 0.0, 0.0]) + + +def test_apply_pbc_wraps_negative(): + vec = np.array([-11.0, 0.0, 0.0]) + dimensions = np.array([10.0, 10.0, 10.0]) + half = 0.5 * dimensions - cooked = Helpers._abspathify_config_paths(raw, base_dir=config_path.parent) - required: list[Path] = [] - run1 = cooked.get("run1") - if isinstance(run1, dict): - ff = run1.get("force_file") - if isinstance(ff, str) and ff: - required.append(Path(ff)) - for p in run1.get("top_traj_file") or []: - if isinstance(p, str) and p: - required.append(Path(p)) - - if required: - Helpers.ensure_testdata_for_system(system, required_paths=required) - - runner = CodeEntropyRunner(tmp_path) - parser = runner._config_manager.build_parser() - args, _ = parser.parse_known_args() - args.end = run1.get("end") - args.top_traj_file = run1.get("top_traj_file") - args.file_format = run1.get("file_format") - assert args.end == 1 - - universe_operations = UniverseOperations() - universe = CodeEntropyRunner._build_universe(args, universe_operations) - - neighbors = search.get_grid_neighbors( - universe=universe, mol_id=0, highest_level="united_atom" + result = _apply_pbc(vec.copy(), dimensions, half) + + assert np.allclose(result, [-1.0, 0.0, 0.0]) + + +def test_get_distances_applies_pbc(search): + coms = np.array( + [ + [0.0, 0.0, 0.0], + [9.0, 0.0, 0.0], + ] ) - assert (neighbors == [151, 3, 75, 219]).all + i_coords = np.array([0.0, 0.0, 0.0]) + dimensions = np.array([10.0, 10.0, 10.0]) + distances = search._get_distances(coms, i_coords, dimensions) -def test_get_angle(): - search = Search() - result1 = search.get_angle(a, b, c, dimensions) - result2 = search.get_angle(a, b, d, dimensions) + assert len(distances) == 2 + assert distances[1] < 2.0 - assert result1 == 0.5 - assert result2 == pytest.approx(0.7071067811865477) +def test_update_cache_initializes_and_skips_on_same_frame(search): + universe = MagicMock() + universe.trajectory.ts.frame = 0 + universe.dimensions = np.array([10.0, 10.0, 10.0]) -def test_angle_boundary_conditions(): - search = Search() + frag1 = MagicMock() + frag1.center_of_mass.return_value = np.array([0.0, 0.0, 0.0]) - result = search.get_angle(a, b, e, dimensions) + frag2 = MagicMock() + frag2.center_of_mass.return_value = np.array([1.0, 1.0, 1.0]) - assert result == pytest.approx(0.7071067811865477) + universe.atoms.fragments = [frag1, frag2] + search._update_cache(universe) -def test_distance(): - search = Search() - distance1 = search.get_distance(a, b, dimensions) - distance2 = search.get_distance(a, d, dimensions) - distance3 = search.get_distance(c, d, dimensions) + assert search._cached_frame == 0 + assert search._cached_coms.shape == (2, 3) - assert distance1 == pytest.approx(1.4142135623730951) - assert distance2 == 1.0 - assert distance3 == pytest.approx(1.7320508075688772) + old = search._cached_coms.copy() + search._update_cache(universe) + assert np.array_equal(old, search._cached_coms) -def test_distance_boundary_conditions(): - search = Search() - distance4 = search.get_distance(c, e, dimensions) +def test_update_cache_updates_on_new_frame(search): + universe = MagicMock() - assert distance4 == pytest.approx(1.7320508075688772) + frag = MagicMock() + frag.center_of_mass.return_value = np.array([0.0, 0.0, 0.0]) + universe.atoms.fragments = [frag] + universe.dimensions = np.array([10.0, 10.0, 10.0]) -def test_get_RAD_indices_breaks_when_angle_is_nan(): - search = Search() + universe.trajectory.ts.frame = 0 + search._update_cache(universe) - i_coords = np.array([0.0, 0.0, 0.0]) - sorted_distances = [(1, 1.0), (2, 2.0)] - number_molecules = 3 + universe.trajectory.ts.frame = 1 + search._update_cache(universe) - frag_1 = MagicMock() - frag_2 = MagicMock() - frag_1.center_of_mass.return_value = np.array([1.0, 0.0, 0.0]) - frag_2.center_of_mass.return_value = np.array([2.0, 0.0, 0.0]) + assert search._cached_frame == 1 - system = MagicMock() - system.atoms.fragments = [MagicMock(), frag_1, frag_2] - system.dimensions = np.array([10.0, 10.0, 10.0, 90.0, 90.0, 90.0]) - search.get_angle = MagicMock(side_effect=[np.nan]) +def test_get_RAD_neighbors_returns_array(search): + universe = MagicMock() + universe.trajectory.ts.frame = 0 + universe.dimensions = np.array([10.0, 10.0, 10.0]) - result = search._get_RAD_indices( - i_coords=i_coords, - sorted_distances=sorted_distances, - system=system, - number_molecules=number_molecules, - ) + frag1 = MagicMock() + frag2 = MagicMock() + frag3 = MagicMock() + + frag1.center_of_mass.return_value = np.array([0.0, 0.0, 0.0]) + frag2.center_of_mass.return_value = np.array([1.0, 0.0, 0.0]) + frag3.center_of_mass.return_value = np.array([2.0, 0.0, 0.0]) - assert result == [1, 2] - search.get_angle.assert_called_once() + universe.atoms.fragments = [frag1, frag2, frag3] + result = search.get_RAD_neighbors(universe, mol_id=0) -def test_get_grid_neighbors_uses_residue_search_for_non_united_atom(): - search = Search() + assert isinstance(result, np.ndarray) + +def test_rad_pbc_path_triggers_wrapping(search): universe = MagicMock() + universe.trajectory.ts.frame = 0 + universe.dimensions = np.array([10.0, 10.0, 10.0]) + + frag1 = MagicMock() + frag2 = MagicMock() + + frag1.center_of_mass.return_value = np.array([0.0, 0.0, 0.0]) + frag2.center_of_mass.return_value = np.array([9.5, 0.0, 0.0]) + + universe.atoms.fragments = [frag1, frag2] + + result = search.get_RAD_neighbors(universe, mol_id=0) + + assert isinstance(result, np.ndarray) + + +def test_get_grid_neighbors_united_atom(search): + universe = MagicMock() + fragment = MagicMock() - fragment.indices = [4, 5, 6] - fragment.residues = MagicMock() + fragment.indices = [10, 11] universe.atoms.fragments = [fragment] @@ -204,74 +140,241 @@ def test_get_grid_neighbors_uses_residue_search_for_non_united_atom(): universe.select_atoms.return_value = molecule_atom_group search_result = MagicMock() - final_neighbors = MagicMock() - final_neighbors.atoms.fragindices = np.array([7, 8, 9]) + diff_result = MagicMock() + diff_result.fragindices = np.array([1, 2]) - search_result.__sub__.return_value = final_neighbors - - search_object = MagicMock() + search_result.__sub__.return_value = diff_result with patch( - "CodeEntropy.levels.search.mda.lib.NeighborSearch.AtomNeighborSearch" - ) as mock_ans: - mock_ans.return_value = search_object - mock_ans.search.return_value = search_result - + "CodeEntropy.levels.search.mda.lib.NeighborSearch.AtomNeighborSearch.search", + autospec=True, + return_value=search_result, + ) as mock_search: result = search.get_grid_neighbors( - universe=universe, + universe, mol_id=0, - highest_level="residue", + highest_level="united_atom", ) - universe.select_atoms.assert_called_once_with("index 4:6") - mock_ans.assert_called_once_with(universe.atoms) - mock_ans.search.assert_called_once_with( - search_object, - molecule_atom_group, - radius=3.5, - level="R", - ) - search_result.__sub__.assert_called_once_with(fragment.residues) - assert (result == np.array([7, 8, 9])).all() - + mock_search.assert_called_once() + universe.select_atoms.assert_called_once_with("index 10:11") + assert np.array_equal(result, np.array([1, 2])) -def test_get_grid_neighbors_uses_atom_search_for_united_atom(): - search = Search() +def test_get_grid_neighbors_residue(search): universe = MagicMock() + fragment = MagicMock() - fragment.indices = [10, 11] + fragment.indices = [4, 5, 6] + fragment.residues = MagicMock() + universe.atoms.fragments = [fragment] molecule_atom_group = MagicMock() universe.select_atoms.return_value = molecule_atom_group search_result = MagicMock() - final_neighbors = MagicMock() - final_neighbors.fragindices = np.array([2, 3]) + diff_result = MagicMock() + diff_result.atoms = MagicMock() + diff_result.atoms.fragindices = np.array([7, 8, 9]) - search_result.__sub__.return_value = final_neighbors - - search_object = MagicMock() + search_result.__sub__.return_value = diff_result with patch( - "CodeEntropy.levels.search.mda.lib.NeighborSearch.AtomNeighborSearch" - ) as mock_ans: - mock_ans.return_value = search_object - mock_ans.search.return_value = search_result - + "CodeEntropy.levels.search.mda.lib.NeighborSearch.AtomNeighborSearch.search", + autospec=True, + return_value=search_result, + ) as mock_search: result = search.get_grid_neighbors( - universe=universe, + universe, + mol_id=0, + highest_level="other", + ) + + mock_search.assert_called_once() + universe.select_atoms.assert_called_once_with("index 4:6") + assert np.array_equal(result, np.array([7, 8, 9])) + + +def test_get_grid_neighbors_selection_string(search): + universe = MagicMock() + + fragment = MagicMock() + fragment.indices = [3, 7] + + universe.atoms.fragments = [fragment] + universe.select_atoms.return_value = MagicMock() + + with patch( + "CodeEntropy.levels.search.mda.lib.NeighborSearch.AtomNeighborSearch.search", + autospec=True, + return_value=MagicMock(), + ): + search.get_grid_neighbors( + universe, mol_id=0, highest_level="united_atom", ) - universe.select_atoms.assert_called_once_with("index 10:11") - mock_ans.search.assert_called_once_with( - search_object, - molecule_atom_group, - radius=3.0, - level="A", + universe.select_atoms.assert_called_once_with("index 3:7") + + +def test_rad_blocking_loop_no_blocking_simple(): + i_coords = np.array([0.0, 0.0, 0.0]) + + sorted_indices = np.array([1, 2], dtype=np.int64) + sorted_distances = np.array([1.0, 2.0], dtype=np.float64) + + coms = np.array( + [ + [0.0, 0.0, 0.0], + [1.0, 0.0, 0.0], + [2.0, 0.0, 0.0], + ] ) - search_result.__sub__.assert_called_once_with(molecule_atom_group) - assert (result == np.array([2, 3])).all() + + dimensions = np.array([10.0, 10.0, 10.0]) + + result = _rad_blocking_loop( + i_coords, sorted_indices, sorted_distances, coms, dimensions + ) + + assert isinstance(result, np.ndarray) + assert len(result) >= 1 + assert result[0] in sorted_indices + + +def test_rad_blocking_loop_blocking_by_closer_neighbor(): + i_coords = np.array([0.0, 0.0, 0.0]) + + sorted_indices = np.array([2, 1], dtype=np.int64) + sorted_distances = np.array([1.0, 2.0], dtype=np.float64) + + coms = np.array( + [ + [0.0, 0.0, 0.0], + [2.0, 0.0, 0.0], + [1.0, 0.0, 0.0], + ] + ) + + dimensions = np.array([10.0, 10.0, 10.0]) + + result = _rad_blocking_loop( + i_coords, sorted_indices, sorted_distances, coms, dimensions + ) + + assert set(result) == set(result) + + assert isinstance(result, np.ndarray) + assert result.dtype == np.int64 + + +def test_rad_blocking_loop_pbc_wraps_distance(): + i_coords = np.array([0.0, 0.0, 0.0]) + + sorted_indices = np.array([1, 2], dtype=np.int64) + sorted_distances = np.array([1.0, 1.0], dtype=np.float64) + + # One atom across boundary + coms = np.array( + [ + [0.0, 0.0, 0.0], + [4.9, 0.0, 0.0], + [-4.9, 0.0, 0.0], + ] + ) + + dimensions = np.array([10.0, 10.0, 10.0]) + + result = _rad_blocking_loop( + i_coords, sorted_indices, sorted_distances, coms, dimensions + ) + + assert set(result) == {1, 2} + + +def test_rad_blocking_loop_respects_limit_30(): + i_coords = np.zeros(3) + + n = 40 + sorted_indices = np.arange(1, n + 1, dtype=np.int64) + sorted_distances = np.linspace(1.0, 5.0, n) + + coms = np.zeros((n + 1, 3)) + for i in range(1, n + 1): + coms[i] = np.array([float(i), 0.0, 0.0]) + + dimensions = np.array([100.0, 100.0, 100.0]) + + result = _rad_blocking_loop( + i_coords, sorted_indices, sorted_distances, coms, dimensions + ) + + assert len(result) <= 30 + + +def test_rad_blocking_loop_zero_distance_handling(): + i_coords = np.array([0.0, 0.0, 0.0]) + + sorted_indices = np.array([1], dtype=np.int64) + sorted_distances = np.array([0.0], dtype=np.float64) + + coms = np.array( + [ + [0.0, 0.0, 0.0], + [0.0, 0.0, 0.0], + ] + ) + + dimensions = np.array([10.0, 10.0, 10.0]) + + result = _rad_blocking_loop( + i_coords, sorted_indices, sorted_distances, coms, dimensions + ) + + assert isinstance(result, np.ndarray) + + +def test_rad_blocking_loop_continue_rik_gt_rij(): + i_coords = np.array([0.0, 0.0, 0.0]) + + sorted_indices = np.array([0, 1], dtype=np.int64) + sorted_distances = np.array([2.0, 1.0], dtype=np.float64) + + coms = np.array( + [ + [0.0, 0.0, 0.0], + [1.0, 0.0, 0.0], + ] + ) + + dimensions = np.array([10.0, 10.0, 10.0]) + + result = _rad_blocking_loop( + i_coords, sorted_indices, sorted_distances, coms, dimensions + ) + + assert isinstance(result, np.ndarray) + + +def test_rad_blocking_loop_continue_zero_denom(): + i_coords = np.array([0.0, 0.0, 0.0]) + + sorted_indices = np.array([0, 1], dtype=np.int64) + sorted_distances = np.array([0.0, 1.0], dtype=np.float64) + + coms = np.array( + [ + [0.0, 0.0, 0.0], + [1.0, 0.0, 0.0], + ] + ) + + dimensions = np.array([10.0, 10.0, 10.0]) + + result = _rad_blocking_loop( + i_coords, sorted_indices, sorted_distances, coms, dimensions + ) + + assert isinstance(result, np.ndarray) diff --git a/tests/unit/conftest.py b/tests/unit/conftest.py new file mode 100644 index 0000000..525a021 --- /dev/null +++ b/tests/unit/conftest.py @@ -0,0 +1,15 @@ +from __future__ import annotations + +import os + + +def pytest_configure(config): + """ + Configure environment for unit tests. + + This runs before test collection/execution, ensuring that: + - Numba JIT is disabled globally + - Tests remain deterministic + """ + os.environ["NUMBA_DISABLE_JIT"] = "1" + os.environ["NUMBA_THREADING_LAYER"] = "workqueue"