diff --git a/docs/source/configuration.rst b/docs/source/configuration.rst index e647032..5066ba1 100644 --- a/docs/source/configuration.rst +++ b/docs/source/configuration.rst @@ -124,6 +124,8 @@ Top level Name Type Default Description =================== ======= ========= ========================================================== ``default_cocos`` int 3 COCOS convention assumed when the input does not specify one. +``debug_plots`` bool false Draw (blocking) matplotlib debug plots when the equilibrium + initialization fails. Keep disabled in headless environments. =================== ======= ========= ========================================================== ``[grid]`` — default computational grids @@ -267,3 +269,34 @@ Old name New name 700/1200"); the new fields default to 700/1200 directly. Unprefixed environment variables (e.g. ``NPSI_GRID``) are no longer read — use the ``PLEQUE_`` prefix. + + +.. _logging: + +Logging +------- + +PLEQUE logs through the standard :mod:`logging` machinery under the +``pleque`` logger hierarchy (one logger per module, e.g. +``pleque.core.equilibrium``). As a library, PLEQUE does not configure any +handler by default; to see the messages either use the convenience helper + +.. code-block:: python + + import logging + import pleque + + pleque.set_log_level(logging.DEBUG) # or logging.INFO + +or configure the logger yourself: + +.. code-block:: python + + import logging + + logging.basicConfig() + logging.getLogger("pleque").setLevel(logging.INFO) + +The ``verbose=True`` argument of :class:`pleque.Equilibrium` is kept for +backward compatibility and simply calls +``pleque.set_log_level(logging.DEBUG)``. diff --git a/docs/source/index.rst b/docs/source/index.rst index 72b52ab..c42f230 100644 --- a/docs/source/index.rst +++ b/docs/source/index.rst @@ -12,6 +12,7 @@ PLEQUE is a code which allows easy and quick access to tokamak equilibria obtain first_steps examples + initialization coordinates flux_expansion naming_convention diff --git a/docs/source/initialization.rst b/docs/source/initialization.rst new file mode 100644 index 0000000..659fd48 --- /dev/null +++ b/docs/source/initialization.rst @@ -0,0 +1,75 @@ +Equilibrium initialisation +========================== + +The central object of PLEQUE is the :class:`pleque.Equilibrium` class. It is +usually constructed by one of the readers in :mod:`pleque.io` (e.g. +:func:`pleque.io.readers.read_geqdsk`), which build an ``xarray.Dataset`` with +the poloidal flux function :math:`\psi(R, Z)` on a rectangular grid and 1D +profiles of the toroidal field function :math:`F` and pressure :math:`p` (or +their :math:`\psi`-derivatives ``FFprime`` and ``pprime``) as functions of +:math:`\psi_\mathrm{N}`. + +Initialisation sequence +----------------------- + +#. **Input parsing.** Spatial data, profiles and optional hints are read from + the dataset and constructor arguments. If no first wall is provided, an + artificial rectangular wall slightly inset with respect to the + :math:`\psi`-grid is synthesized (``grid.synthetic_wall_margin`` and + ``grid.synthetic_wall_points_per_side`` settings). +#. **2D spline construction.** :math:`\psi(R, Z)` is interpolated by a + bivariate spline (``splines.psi_order`` / ``splines.psi_smooth`` settings). +#. **Critical points.** Candidate extremes of :math:`|\nabla\psi|^2` are + located on a regular grid and classified by the determinant of the Hessian + of :math:`\psi` into O-points (local extrema) and X-points (saddle points). + The magnetic axis and the relevant X-points are recognized among the + candidates, and the plasma configuration (limiter vs. diverted) is + determined together with the limiter point and :math:`\psi` on the last + closed flux surface. +#. **Plasma boundary.** Strike points are found as intersections of the + LCFS-:math:`\psi` contour with the first wall. The LCFS contour is located + by the marching-squares algorithm on a search grid and refined by an + iterative downhill (gradient-step) method until the relative :math:`\psi` + error drops below ``lcfs.refinement_tolerance``; for diverted plasmas the + X-point is inserted into the contour. Poloidal field-line tracing of the + boundary is *not* used during the initialisation; it is available + separately via :meth:`pleque.Equilibrium.lcfs_field_line`. +#. **1D profile splines.** If derivatives (``pprime``, ``FFprime``) are + provided — the preferred input, as it avoids differentiating noisy data — + the :math:`p` and :math:`F` profiles are obtained by integration (this + requires :math:`\psi_\mathrm{axis}` and :math:`\psi_\mathrm{LCFS}`, which + is why this step runs after the critical-point search). Otherwise the + derivatives are computed from the provided profiles. If neither pressure + nor :math:`F` is available, a vacuum equilibrium (:math:`p = F = 0`) is + constructed. +#. **Midplane mapping.** A 1D spline mapping the outer-midplane radius to + :math:`\psi` is built. + +Hints and ``init_method`` +------------------------- + +The optional arguments ``mg_axis``, ``psi_lcfs``, ``x_points`` and +``strike_points`` act as *hints*. Each may alternatively be provided as a +dataset variable of the same name; an explicitly passed argument takes +precedence (with a logged warning). The ``init_method`` argument controls how +the hints are used: + +``"full"`` + All hints are ignored; the module recognizes all critical points itself. +``"hints"`` (default) + The hints assist the recognition of the magnetic axis and X-points. +``"fast_forward"`` + The ``mg_axis`` and ``x_points`` hints are taken as final and the + critical-point search is skipped; ``psi_lcfs`` and ``strike_points`` + hints are likewise used as final when given. If the required hints are + missing, the initialisation falls back to ``"hints"`` with a logged + warning. + +Diagnostics +----------- + +PLEQUE logs through the standard :mod:`logging` machinery under the +``pleque`` logger; see :ref:`logging` for how to enable it. If the +initialisation fails, the exception is logged and re-raised; enabling the +``debug_plots`` setting (e.g. ``PLEQUE_DEBUG_PLOTS=1``) additionally draws a +debug plot of the partially initialised equilibrium. diff --git a/pleque/__init__.py b/pleque/__init__.py index f63e97d..f3b55cb 100644 --- a/pleque/__init__.py +++ b/pleque/__init__.py @@ -1,3 +1,33 @@ +import logging + __version__ = '0.0.10' -from pleque.core import * +logging.getLogger(__name__).addHandler(logging.NullHandler()) + +_LOG_HANDLER_ATTR = "_pleque_handler" + + +def set_log_level(level=logging.INFO): + """ + Convenience helper to make PLEQUE log messages visible. + + Sets the level of the ``pleque`` logger and attaches a single + :class:`logging.StreamHandler` to it (idempotent — repeated calls reuse + the same handler). For full control configure the ``pleque`` logger + directly with the standard :mod:`logging` machinery instead. + + :param level: logging level, e.g. ``logging.DEBUG`` (default ``logging.INFO``) + """ + logger = logging.getLogger(__name__) + logger.setLevel(level) + + handler = next((h for h in logger.handlers if getattr(h, _LOG_HANDLER_ATTR, False)), None) + if handler is None: + handler = logging.StreamHandler() + handler.setFormatter(logging.Formatter("%(levelname)s:%(name)s: %(message)s")) + setattr(handler, _LOG_HANDLER_ATTR, True) + logger.addHandler(handler) + handler.setLevel(level) + + +from pleque.core import * # noqa: E402 diff --git a/pleque/config/settings.py b/pleque/config/settings.py index 7604349..157d55d 100644 --- a/pleque/config/settings.py +++ b/pleque/config/settings.py @@ -321,6 +321,11 @@ class Settings(BaseSettings): ) default_cocos: int = Field(3, description="COCOS convention assumed when the input does not specify one.") + debug_plots: bool = Field( + False, + description="Draw (blocking) matplotlib debug plots when the equilibrium initialization fails. " + "Keep disabled in headless environments; enable e.g. with PLEQUE_DEBUG_PLOTS=1.", + ) grid: GridSettings = Field(default_factory=GridSettings) splines: SplineSettings = Field(default_factory=SplineSettings) critical_points: CriticalPointSettings = Field(default_factory=CriticalPointSettings) diff --git a/pleque/core/coordinates.py b/pleque/core/coordinates.py index 6bc80ea..0a42510 100644 --- a/pleque/core/coordinates.py +++ b/pleque/core/coordinates.py @@ -1,3 +1,4 @@ +import logging import warnings from collections.abc import Iterable from typing import ClassVar, Union @@ -10,6 +11,8 @@ from .cocos import cocos_coefs +logger = logging.getLogger(__name__) + #: Canonical description of the coordinate input syntax accepted across PLEQUE. #: It is appended (via :func:`pleque.utils.decorators.append_to_doc`) to the #: docstrings of :meth:`Coordinates.from_coords` and @@ -244,8 +247,8 @@ def _init_fast_path(self, coordinates, coord_type, coords): self.x3 = self._x3_input = xs[2] if self.grid and n != 2: - print('WARNING: grid == True is not allowed for dim != 2 (yet).' - 'Turning grid = False.') + logger.warning('grid == True is not allowed for dim != 2 (yet).' + 'Turning grid = False.') self.grid = False return True @@ -723,8 +726,8 @@ def _evaluate_input(self, *coordinates, coord_type=None, **coords): xy = coordinates[0] if self.grid: - print('WARNING: grid == True is not allowed for this coordinates input. ' - 'Turning grid = False.') + logger.warning('grid == True is not allowed for this coordinates input. ' + 'Turning grid = False.') self.grid = False if isinstance(xy, Iterable): @@ -779,8 +782,8 @@ def _evaluate_input(self, *coordinates, coord_type=None, **coords): self._convert_to_default_coord_type() if self.dim != 2 and self.grid: - print('WARNING: grid == True is not allowed for dim != 2 (yet).' - 'Turning grid = False.') + logger.warning('grid == True is not allowed for dim != 2 (yet).' + 'Turning grid = False.') self.grid = False def _verify_coord_type(self, coord_type): @@ -796,10 +799,8 @@ def _verify_coord_type(self, coord_type): ret_coord_type = tuple(coord_type) else: ret_coord_type = ('psi_n',) - print("WARNING: _coord_type_input is not correct. \n" - f"{tuple(coord_type)} is not allowed \n" - "Force set _coord_type_input = ('psi_n',)" - ) + logger.warning("_coord_type_input is not correct. %s is not allowed. " + "Force set _coord_type_input = ('psi_n',)", tuple(coord_type)) elif self.dim == 2: if coord_type is None: ret_coord_type = ('R', 'Z') @@ -809,10 +810,8 @@ def _verify_coord_type(self, coord_type): ret_coord_type = tuple(coord_type[::-1]) else: ret_coord_type = ('R', 'Z') - print("WARNING: _coord_type_input is not correct. \n" - f"{tuple(coord_type)} is not allowed \n" - "Force set _coord_type_input = ('R', 'Z')" - ) + logger.warning("_coord_type_input is not correct. %s is not allowed. " + "Force set _coord_type_input = ('R', 'Z')", tuple(coord_type)) elif self.dim == 3: if coord_type is None: ret_coord_type = ('R', 'Z', 'phi') @@ -820,10 +819,8 @@ def _verify_coord_type(self, coord_type): ret_coord_type = tuple(coord_type) else: ret_coord_type = ('R', 'Z', 'phi') - print("WARNING: _coord_type_input is not correct. \n" - f"{tuple(coord_type)} is not allowed \n" - "Force set _coord_type_input = ('R', 'Z', 'phi')" - ) + logger.warning("_coord_type_input is not correct. %s is not allowed. " + "Force set _coord_type_input = ('R', 'Z', 'phi')", tuple(coord_type)) else: raise ValueError(f'Operation in {self.dim} space has not be en allowed yet. Sorry.' diff --git a/pleque/core/equilibrium.py b/pleque/core/equilibrium.py index 70ca9b2..4e7b774 100644 --- a/pleque/core/equilibrium.py +++ b/pleque/core/equilibrium.py @@ -1,4 +1,5 @@ import copy +import logging from collections.abc import Sequence import numpy as np @@ -24,10 +25,49 @@ from pleque.utils.surfaces import track_plasma_boundary from pleque.utils.tools import arglis, xp_sections +logger = logging.getLogger(__name__) + class Equilibrium: """ - Equilibrium class ... + Tokamak plasma equilibrium based on the poloidal flux function psi(R, Z). + + The equilibrium is initialised from an `xarray.Dataset` with psi on a + rectangular (R, Z) grid and 1D profiles of the toroidal field function F + and pressure (or their psi-derivatives) as functions of psi_n. The + initialisation proceeds in the following steps: + + 1. *Input parsing.* Spatial data, profiles and optional hints are read from + the dataset and constructor arguments. If no first wall is given, an + artificial rectangular wall slightly inset with respect to the psi grid + is synthesized. + 2. *2D spline construction.* psi(R, Z) is interpolated by a + `RectBivariateSpline` (order and smoothing from the PLEQUE settings). + 3. *Critical points.* Candidate extremes of |grad psi|^2 are located on a + regular grid and classified by the determinant of the Hessian of psi + into O-points (det > 0) and X-points (det < 0); the magnetic axis and + the relevant X-points are then recognized among the candidates and the + plasma configuration (limiter vs. X-point/diverted) is determined, + including the limiter point and psi on the last closed flux surface. + 4. *Plasma boundary.* Strike points are found as intersections of the + LCFS-psi contour with the first wall. The LCFS contour itself is found + by the marching-squares algorithm on a search grid and refined by an + iterative downhill (gradient-step) method until the relative psi error + drops below the `lcfs.refinement_tolerance` setting; for diverted + plasmas the X-point is inserted into the contour. (Note: poloidal + field-line tracing of the boundary is *not* used during the + initialisation; it is available separately via `lcfs_field_line`.) + 5. *1D profile splines.* If derivatives (pprime, FFprime) are provided, + the p and F profiles are obtained by integration — this requires + psi_axis and psi_lcfs, which is why this step runs *after* the + critical-point search. Otherwise the derivatives are computed from the + provided p and F profiles. If neither pressure nor F is available a + vacuum equilibrium (p = F = 0) is constructed. + 6. *Midplane mapping.* A 1D spline mapping the outer-midplane radius to + psi is built. + + Known limitations: COCOS handling is not complete (`_Bpol_sign` is not yet + resolved from the input convention). """ @staticmethod @@ -60,34 +100,51 @@ def __init__(self, Equilibrium class instance should be obtained generally by functions in pleque.io package. - Optional arguments may help the initialization. + The hint arguments (`mg_axis`, `psi_lcfs`, `x_points`, `strike_points`) can be used + to speed up the initialisation or to make it more robust; each of them may + alternatively be provided as a `basedata` variable of the same name (an explicitly + passed argument takes precedence, with a logged warning). How the hints are used is + controlled by `init_method`. + + If the initialisation fails, the exception is logged (and re-raised); set the + `debug_plots` PLEQUE setting (e.g. ``PLEQUE_DEBUG_PLOTS=1``) to also draw a debug + plot of the partially initialised equilibrium. - :param basedata: xarray.Dataset with psi(R, Z) on a rectangular R, Z grid, f(psi_norm), p(psi_norm) - f = B_tor * R + :param basedata: xarray.Dataset with psi(R, Z) on a rectangular R, Z grid, F(psi_n), p(psi_n) + (or their derivatives FFprime(psi_n), pprime(psi_n)); F = B_tor * R :param first_wall: array-like (Nwall, 2) required for initialization in case of limiter configuration. - :param mg_axis: suspected position of the o-point - :param psi_lcfs: - :param x_points: - :param strike_points: - :param init_method: str On of ("full", "hints", "fast_forward"). - If "full" no hints are taken and module tries to recognize all critical points itself. - If "hints" module use given optional arguments as a help with initialization. - If "fast-forward" module use given optional arguments as final and doesn't try to correct. - *Note:* Only "hints" method is currently tested. + If not given (nor present in basedata), an artificial rectangular wall slightly + inset with respect to the psi grid is synthesized. + :param mg_axis: array-like (2) suspected (R, Z) position of the magnetic axis + :param psi_lcfs: float, suspected value of psi on the last closed flux surface + :param x_points: array-like (n, 2) suspected (R, Z) positions of the X-points + :param strike_points: array-like (n, 2) suspected (R, Z) positions of the strike points + :param init_method: str One of ("full", "hints", "fast_forward"): + "full" — all hints are ignored and the module recognizes all critical + points itself; + "hints" (default) — the given hints are used to assist the recognition; + "fast_forward" — the `mg_axis` and `x_points` hints are taken as final + and the critical-point search is skipped (`psi_lcfs` and `strike_points` + hints are likewise used as final when given); if the required hints are + missing, the method falls back to "hints" with a logged warning. :param spline_order: Order of the 2D psi spline. Defaults to the value from PLEQUE settings. :param spline_smooth: Smoothing factor of the 2D psi spline. Defaults to the value from PLEQUE settings. :param find_extremes_order: Define number of points on internal grid used for identifying magnetic axis and x-points. Defaults to the value from PLEQUE settings. :param cocos: At the moment module assume cocos to be 3 (no other option). The implemetnation is not fully working. Be aware of signs in the module! - :param verbose: - + :param verbose: Superseded by logging: `verbose=True` calls + ``pleque.set_log_level(logging.DEBUG)`` (process-global). Prefer configuring + the ``pleque`` logger directly. """ if verbose: - print('---------------------------------') - print('Equilibrium module initialization') - print('---------------------------------') + # Backward-compatible alias for configuring the `pleque` logger. + from pleque import set_log_level + + set_log_level(logging.DEBUG) + + logger.debug('Equilibrium module initialization') cfg = get_settings() if spline_order is None: @@ -105,23 +162,23 @@ def __init__(self, else: cocos = cfg.default_cocos + if init_method == "fast": + init_method = "fast_forward" + if init_method not in ("full", "hints", "fast_forward"): + raise ValueError(f"Unknown init_method {init_method!r}; expected one of ('full', 'hints', 'fast_forward').") + self._basedata = basedata self._verbose = verbose - self._mg_axis = mg_axis - self._psi_lcfs = psi_lcfs - self._x_points = x_points - self._strike_points = strike_points + self._load_hints(basedata, mg_axis, psi_lcfs, x_points, strike_points) self._spline_order = spline_order - # TODO TODO TODO self._init_method = init_method self._cocos = cocos self._cocosdic = cc.cocos_coefs(cocos) - # todo: resolve this from input (for COCOS time) TODO TODO TODO + # todo: resolve this from the input COCOS convention self._Bpol_sign = 1 - if self._verbose: - print(f"Equilibrium initialized with cocos={self._cocos} and init_method={self._init_method}") + logger.debug("Equilibrium initialized with cocos=%s and init_method=%s", self._cocos, self._init_method) self._flux_surfaces = None @@ -129,85 +186,93 @@ def __init__(self, r, z, psi = self._load_spatial_data(basedata, first_wall) psi_n, pressure, pprime, F, FFprime = self._load_profiles(basedata) - if verbose: - print('--- Generate 2D spline ---') + logger.debug('Generating 2D spline') self._setup_psi_spline(r, z, psi, spline_order, spline_smooth) - if verbose: - print('--- Looking for critical points ---') + logger.debug('Looking for critical points') self._find_critical_points(r, z, find_extremes_order) - if verbose: - print('--- Recognizing equilibrium type ---') + logger.debug('Recognizing equilibrium type') self._setup_plasma_boundary() - if verbose: - print('--- Generate 1D splines ---') + logger.debug('Generating 1D splines') self._setup_1d_profiles(psi_n, F, FFprime, pressure, pprime) - if verbose: - print('--- Mapping midplane to psi_n ---') + logger.debug('Mapping midplane to psi_n') self._map_midplane2psi() - except: + except Exception: + logger.exception("Equilibrium initialization failed.") - import matplotlib.pyplot as plt + if get_settings().debug_plots: + try: + import matplotlib.pyplot as plt - from pleque.utils.plotting import _plot_debug + from pleque.utils.plotting import _plot_debug - plt.figure() - _plot_debug(self) - plt.show() + plt.figure() + _plot_debug(self) + plt.show() + except Exception: + logger.exception("Debug plot of the failed initialization could not be drawn.") raise + def _load_hints(self, basedata: xarray.Dataset, mg_axis, psi_lcfs, x_points, strike_points): + """ + Resolve initialization hints from constructor arguments with fallback to `basedata` variables. + + An explicitly passed argument takes precedence over a `basedata` variable of the same + name; a warning is logged when both are given. + """ + + def resolve(name, value): + if value is not None: + if name in basedata: + logger.warning("%s specified both in basedata and as an argument. Using the argument value.", name) + return value + if name in basedata: + return basedata[name].values + return None + + self._mg_axis = resolve('mg_axis', mg_axis) + self._psi_lcfs = resolve('psi_lcfs', psi_lcfs) + self._x_points = resolve('x_points', x_points) + self._strike_points = resolve('strike_points', strike_points) + + def _resolve_first_wall(self, basedata: xarray.Dataset, first_wall, r, z): + """ + Resolve the first wall from the argument, `basedata` variables, or synthesize one. + + NaN rows are dropped from the result. + """ + if first_wall is not None: + logger.debug("First wall taken from the constructor argument.") + elif 'first_wall' in basedata: + first_wall = basedata["first_wall"].values + logger.debug("First wall taken from the 'first_wall' basedata variable.") + elif 'R_first_wall' in basedata and 'Z_first_wall' in basedata: + first_wall = np.array([basedata.R_first_wall.values, + basedata.Z_first_wall.values]).T + logger.debug("First wall taken from the 'R_first_wall'/'Z_first_wall' basedata variables.") + elif 'r_lim' in basedata and 'z_lim' in basedata: + first_wall = np.array([basedata.r_lim.values, + basedata.z_lim.values]).T + logger.debug("First wall taken from the 'r_lim'/'z_lim' basedata variables.") + else: + first_wall = eq_tools.synthesize_rectangular_wall(r, z) + logger.info("No first wall given; a rectangular wall around the psi grid was synthesized.") + + first_wall = np.asarray(first_wall) + return first_wall[~np.isnan(first_wall).any(axis=1)] + def _load_spatial_data(self, basedata: xarray.Dataset, first_wall) -> tuple: """Load psi grid, first wall, and spatial metadata from dataset.""" r = basedata.R.values z = basedata.Z.values psi = basedata.psi.transpose('R', 'Z').values - if first_wall is None: - if 'first_wall' in basedata: - self._first_wall = basedata["first_wall"].values - elif 'R_first_wall' in basedata and 'Z_first_wall' in basedata: - self._first_wall = np.array([basedata.R_first_wall.values, - basedata.Z_first_wall.values]).T - elif 'r_lim' in basedata and 'z_lim' in basedata: - self._first_wall = np.array([basedata.r_lim.values, - basedata.z_lim.values]).T - else: - rwall_min = np.min(r) - rwall_max = np.max(r) - zwall_min = np.min(z) - zwall_max = np.max(z) - - dr = rwall_max - rwall_min - dz = zwall_max - zwall_min - - # todo: remove this if possible - # lets reduce the wall a bit to be have some plasma behind the wall - wall_cfg = get_settings().grid - rwall_min += dr * wall_cfg.synthetic_wall_margin - rwall_max -= dr * wall_cfg.synthetic_wall_margin - zwall_min += dz * wall_cfg.synthetic_wall_margin - zwall_max -= dz * wall_cfg.synthetic_wall_margin - - corners = np.array( - [[rwall_min, zwall_max], [rwall_max, zwall_max], [rwall_max, zwall_min], - [rwall_min, zwall_min]]) - newwall_r = [] - newwall_z = [] - for i in range(-1, 3): - rs = np.linspace(corners[i, 0], corners[i + 1, 0], wall_cfg.synthetic_wall_points_per_side) - zs = np.linspace(corners[i, 1], corners[i + 1, 1], wall_cfg.synthetic_wall_points_per_side) - newwall_r += list(rs) - newwall_z += list(zs) - self._first_wall = np.stack((newwall_r, newwall_z)).T - else: - self._first_wall = first_wall - - self._first_wall = self._first_wall[~np.isnan(self._first_wall).any(axis=1)] + self._first_wall = self._resolve_first_wall(basedata, first_wall, r, z) if 'time' in basedata: self.time = basedata['time'].values @@ -276,35 +341,61 @@ def _setup_psi_spline(self, r, z, psi, spline_order: int, spline_smooth: float): s=spline_smooth) def _find_critical_points(self, r, z, find_extremes_order: int): - """Find and classify all critical points; set plasma type and psi at LCFS.""" - x_points, o_points = eq_tools.find_extremes(r, z, self._spl_psi, order=find_extremes_order) - - # TODO: Raise warning if no o_point was found! + """ + Find and classify all critical points; set plasma type and psi at LCFS. + + Hint usage depends on `init_method`: "full" ignores all hints, "hints" + (default) uses them to assist the recognition, and "fast_forward" takes + the magnetic-axis and X-point hints as final, skipping the critical-point + search entirely. + """ + use_hints = self._init_method in ("hints", "fast_forward") + mg_axis_hint = self._mg_axis if use_hints else None + psi_lcfs_hint = self._psi_lcfs if use_hints else None + x_points_hint = self._x_points if use_hints else None + + trust_hints = self._init_method == "fast_forward" + if trust_hints and (mg_axis_hint is None or x_points_hint is None): + logger.warning("init_method='fast_forward' requires mg_axis and x_points hints; " + "falling back to 'hints' behaviour.") + trust_hints = False + + if trust_hints: + self._mg_axis = np.asarray(mg_axis_hint, dtype=float) + self._psi_axis = np.asarray(self._spl_psi(self._mg_axis[0], self._mg_axis[1], grid=False)).item() + self._o_points = self._mg_axis[np.newaxis, :] + + self._x_points = np.asarray(x_points_hint, dtype=float).reshape(-1, 2) + xp1 = self._x_points[0] if len(self._x_points) > 0 else None + self._x_point = xp1 + self._x_point2 = self._x_points[1] if len(self._x_points) > 1 else None + self._psi_xp = self._spl_psi(*xp1, grid=False) if xp1 is not None else None + else: + x_points, o_points = eq_tools.find_extremes(r, z, self._spl_psi, order=find_extremes_order) - r_lim = (self.R_min, self.R_max) - z_lim = (self.Z_min, self.Z_max) + r_lim = (self.R_min, self.R_max) + z_lim = (self.Z_min, self.Z_max) - self._mg_axis, sortidx = eq_tools.recognize_mg_axis(o_points, self._spl_psi, r_lim, z_lim, - first_wall=self._first_wall, - mg_axis_candidate=self._mg_axis) - self._psi_axis = np.asarray(self._spl_psi(self._mg_axis[0], self._mg_axis[1], grid=False)).item() - self._o_points = o_points[sortidx] - self._o_points[0] = self._mg_axis + self._mg_axis, sortidx = eq_tools.recognize_mg_axis(o_points, self._spl_psi, r_lim, z_lim, + first_wall=self._first_wall, + mg_axis_candidate=mg_axis_hint) + self._psi_axis = np.asarray(self._spl_psi(self._mg_axis[0], self._mg_axis[1], grid=False)).item() + self._o_points = o_points[sortidx] + self._o_points[0] = self._mg_axis - # todo: use these two x-points in the future - (xp1, xp2), sortidx = eq_tools.recognize_x_points(x_points, self._mg_axis, self._psi_axis, - self._spl_psi, - r_lim, z_lim, self._psi_lcfs, self._x_points) + (xp1, xp2), sortidx = eq_tools.recognize_x_points(x_points, self._mg_axis, self._psi_axis, + self._spl_psi, + r_lim, z_lim, psi_lcfs_hint, x_points_hint) - self._x_point = xp1 - self._x_point2 = xp2 - self._psi_xp = self._spl_psi(*xp1, grid=False) if xp1 is not None else None + self._x_point = xp1 + self._x_point2 = xp2 + self._psi_xp = self._spl_psi(*xp1, grid=False) if xp1 is not None else None - self._x_points = x_points[sortidx] - if xp1 is not None: - self._x_points[0] = xp1 - if xp2 is not None: - self._x_points[1] = xp2 + self._x_points = x_points[sortidx] + if xp1 is not None: + self._x_points[0] = xp1 + if xp2 is not None: + self._x_points[1] = xp2 limiter_plasma, limiter_point = eq_tools.recognize_plasma_type(self._x_point, self._first_wall, self._mg_axis, self._psi_axis, @@ -312,10 +403,12 @@ def _find_critical_points(self, r, z, find_extremes_order: int): self._limiter_plasma = limiter_plasma self._limiter_point = limiter_point - if self._verbose: - print(">> Limiter plasma found." if limiter_plasma else ">> X-point plasma found.") + logger.info("Limiter plasma found." if limiter_plasma else "X-point plasma found.") - self._psi_lcfs = self._spl_psi(*limiter_point, grid=False) + if trust_hints and psi_lcfs_hint is not None: + self._psi_lcfs = float(np.asarray(psi_lcfs_hint).item()) + else: + self._psi_lcfs = self._spl_psi(*limiter_point, grid=False) def _setup_plasma_boundary(self): """Find LCFS contour and strike points using the recognized plasma type.""" @@ -330,14 +423,15 @@ def _setup_plasma_boundary(self): self._contact_point = self._limiter_point else: self._contact_point = None - if len(self._first_wall) < 4: + if self._init_method == "fast_forward" and self._strike_points is not None: + self._strike_points = np.asarray(self._strike_points, dtype=float).reshape(-1, 2) + elif len(self._first_wall) < 4: self._strike_points = None else: self._strike_points = eq_tools.find_strike_points(self._spl_psi, rs, zs, self._psi_lcfs, self._first_wall) - if self._verbose: - print("--- Looking for LCFS: ---") + logger.debug("Looking for LCFS") # sometimes this close_lcfs is empty - investigate! close_lcfs = eq_tools.find_close_lcfs(self._psi_lcfs, rs, zs, self._spl_psi, @@ -346,8 +440,8 @@ def _setup_plasma_boundary(self): while surf.fluxsurf_error(self._spl_psi, close_lcfs, self._psi_lcfs) > lcfs_cfg.refinement_tolerance: close_lcfs = eq_tools.find_surface_step(self._spl_psi, self._psi_lcfs, close_lcfs) - if self._verbose: - print(f"Relative LCFS error: {surf.fluxsurf_error(self._spl_psi, close_lcfs, self._psi_lcfs)}") + if logger.isEnabledFor(logging.INFO): + logger.info("Relative LCFS error: %s", surf.fluxsurf_error(self._spl_psi, close_lcfs, self._psi_lcfs)) if not self._limiter_plasma: close_lcfs = surf.add_xpoint(self._x_point, close_lcfs, self._mg_axis) @@ -364,7 +458,10 @@ def _setup_1d_profiles(self, psi_n, F, FFprime, pressure, pprime): Fprime = None if FFprime is not None: F = eq_tools.ffprime2f(FFprime, self._psi_axis, self._psi_lcfs, self.F0) - Fprime = FFprime / F + F_arr = np.asarray(F) + if np.any(F_arr == 0): + logger.warning("F profile contains zeros; F' is set to zero there.") + Fprime = np.divide(FFprime, F_arr, out=np.zeros_like(F_arr, dtype=float), where=F_arr != 0) if pprime is not None: pressure = eq_tools.pprime2p(pprime, self._psi_axis, self._psi_lcfs) @@ -842,7 +939,6 @@ def outter_parallel_fl_expansion_coef(self, *coordinates, R=None, Z=None, coord_ midplane. """ target = self.coordinates(*coordinates, R=R, Z=Z, coord_type=coord_type, grid=grid, **coords) - # print(coord.r_mid) B_midplane = self.B_abs(r=target.r_mid, theta=np.zeros_like(target.r_mid), grid=False) B_coord = self.B_abs(target) @@ -856,7 +952,6 @@ def outter_poloidal_fl_expansion_coef(self, *coordinates, R=None, Z=None, coord_ midplane. """ target = self.coordinates(*coordinates, R=R, Z=Z, coord_type=coord_type, grid=grid, **coords) - # print(coord.r_mid) B_midplane = self.B_pol(r=target.r_mid, theta=np.zeros_like(target.r_mid), grid=False) B_coord = self.B_pol(target) @@ -898,7 +993,7 @@ def plot_geometry(self, axs=None, **kwargs): fw = self.first_wall if len(fw) < 4: - print('Warning: first wall is not sufficient. LCFS is used instead.') + logger.warning('First wall is not sufficient. LCFS is used instead.') fw = self.lcfs R_min = np.min(fw.R) @@ -1717,45 +1812,36 @@ def trace_field_line(self, *coordinates, R: np.array = None, Z: np.array = None, xp_dist = np.sqrt(np.sum((xp - y0) ** 2)) atol = np.minimum(xp_dist * flt_cfg.x_point_atol_scale, atol) - if self._verbose: - print(f'>>> tracing from: {y0[0]:3f},{y0[1]:3f},{phi0:3f}') - print(f'>>> atol = {atol}') + logger.debug('tracing from: %3f,%3f,%3f', y0[0], y0[1], phi0) + logger.debug('atol = %s', atol) stopper = None if stopper_method is None: if coords.psi_n[i] <= 1: # todo: determine the direction (now -1) !! - if self._verbose: - print('>>> poloidal stopper is used') + logger.debug('poloidal stopper is used') # XXX Direction (TODO) # XXX add these values to cocos dict! # sign(dtheta/dphi) = sigma_pol * sign(I * B) # dphidtheta = self._cocosdic['sigma_pol'] * np.sign(self.I_plasma) * np.sign(self.F0) - # print('dir: {}\nsigma_pol: {}\nsigma_tor: {}\nIp: {}\nF0: {}'.format( - # direction, self._cocosdic['sigma_pol'], self._cocosdic['sigma_cyl'], self.I_plasma, self.F0 - # )) - # print('------------------') dphidtheta = np.sign(self.F0) * self._cocosdic['sigma_pol'] * self._cocosdic['sigma_cyl'] - print(f'direction: {direction}') - print(f'dphidtheta: {dphidtheta}') + logger.debug('direction: %s', direction) + logger.debug('dphidtheta: %s', dphidtheta) stopper = flt.poloidal_angle_stopper_factory(y0, self.magnetic_axis.as_array()[0], dphidtheta * direction, stop_res=flt_cfg.poloidal_stop_resolution) else: - if self._verbose: - print('>>> z-lim stopper is used') + logger.debug('z-lim stopper is used') stopper = flt.rz_coordinate_stopper_factory(r_lims, z_lims) elif stopper_method == 'z-stopper': - if self._verbose: - print('>>> z-lim stopper is used') + logger.debug('z-lim stopper is used') stopper = flt.rz_coordinate_stopper_factory(r_lims, z_lims) elif stopper_method == 'poloidal': - if self._verbose: - print('>>> poloidal stopper is used') + logger.debug('poloidal stopper is used') dphidtheta = np.sign(self.F0) * self._cocosdic['sigma_pol'] * self._cocosdic['sigma_cyl'] stopper = flt.poloidal_angle_stopper_factory(y0, self.magnetic_axis.as_array()[0], @@ -1774,8 +1860,7 @@ def trace_field_line(self, *coordinates, R: np.array = None, Z: np.array = None, rtol=flt_cfg.rtol, ) - if self._verbose: - print(f"{sol.message}, {sol.nfev}") + logger.debug("%s, %s", sol.message, sol.nfev) phi = sol.t R, Z = sol.y @@ -2002,11 +2087,10 @@ def _init_q(self): psi_n = np.arange(fs_cfg.q_psi_n_min, 1, fs_cfg.q_psi_n_step) qs = [] - if self._verbose: - print("--- Generating q-splines ---") + logger.debug("Generating q-splines") for i, pn in enumerate(psi_n): - if self._verbose and np.mod(i, 20) == 0: - print(f"{pn / np.max(psi_n) * 100:.0f}%\r") + if np.mod(i, 20) == 0: + logger.debug("q-spline progress: %.0f%%", pn / np.max(psi_n) * 100) surface = self._flux_surface(psi_n=pn) c = surface[0] qs.append(c.eval_q) diff --git a/pleque/core/surfacefunctions.py b/pleque/core/surfacefunctions.py index 775c729..95158fb 100644 --- a/pleque/core/surfacefunctions.py +++ b/pleque/core/surfacefunctions.py @@ -26,11 +26,8 @@ def add_surface_func(self, name, data, *coordinates, R=None, Z=None, psi_n=None, coord = self.coordinates(*coordinates, R=R, Z=Z, psi_n=psi_n, coord_type=coord_type, **coords) indr, indz = self.evalcoord(coord) - #print(coord.R[indr].shape) - #print(coord.Z[indz].shape) data[indr, :] data = data[:, indz] - #print(data.shape) self._func_names.append(name) interp2d = RectBivariateSpline(coord.R[indr], coord.Z[indz], data, kx=spline_order, ky=spline_order, diff --git a/pleque/io/_geqdsk.py b/pleque/io/_geqdsk.py index 7a9558b..7ca7c87 100644 --- a/pleque/io/_geqdsk.py +++ b/pleque/io/_geqdsk.py @@ -20,6 +20,7 @@ """ +import logging from datetime import date import numpy as np @@ -30,6 +31,8 @@ from ._fileutils import ChunkOutput, f2s, next_value, write_1d, write_2d +logger = logging.getLogger(__name__) + def write(data, fh, label=None, shot=None, time=None): """ @@ -65,7 +68,7 @@ def write(data, fh, label=None, shot=None, time=None): label = "PLEQUE" if len(label) > 11: label = label[0:12] - print(f'WARNING: label too long, it will be shortened to {label}') + logger.warning('label too long, it will be shortened to %s', label) creation_date = date.today().strftime("%d/%m/%Y") @@ -187,7 +190,7 @@ def read(fh): nx = int(words[-2]) ny = int(words[-1]) - print(f" nx = {nx}, ny = {ny}") + logger.debug("nx = %s, ny = %s", nx, ny) # Dictionary to hold result data = {"nx": nx, "ny": ny} @@ -238,7 +241,7 @@ def read_2d(n, m): nbdry = next(values) nlim = next(values) - print(nbdry, nlim) + logger.debug("nbdry = %s, nlim = %s", nbdry, nlim) if nbdry > 0: # Read (R,Z) pairs @@ -291,12 +294,13 @@ def data_as_ds(data): return eq_xarray -def read_as_equilibrium(fh, cocos=3, first_wall=None): +def read_as_equilibrium(fh, cocos=3, first_wall=None, init_method="hints"): """ Read the eqdsk file and open it as `pleque.Equilibrium`. :param fh: file handler :param cocos: Tokamak coordinates convension. Default cocos = 3 (EFIT). + :param init_method: One of ("full", "hints", "fast_forward"), see `pleque.Equilibrium`. :return: instance of `Equilibrium` """ @@ -308,5 +312,5 @@ def read_as_equilibrium(fh, cocos=3, first_wall=None): else: fw = first_wall - eq = pleque.Equilibrium(ds, fw, cocos=cocos) + eq = pleque.Equilibrium(ds, fw, cocos=cocos, init_method=init_method) return eq diff --git a/pleque/io/compass.py b/pleque/io/compass.py index 90475f1..0aae005 100644 --- a/pleque/io/compass.py +++ b/pleque/io/compass.py @@ -10,8 +10,7 @@ from pleque.io._geqdsk import data_as_ds, read from pleque.io.tools import EquilibriaTimeSlices -logger = (logging.getLogger(__name__)) -# logging.basicConfig(level=logging.DEBUG) +logger = logging.getLogger(__name__) def cdb(shot=None, time=1060, revision=1, variant=''): """ @@ -179,7 +178,7 @@ def get_ds_from_cudb(shot, time=None, revision=-1, variant='', time_unit='s', fi if isinstance(first_wall, str) and first_wall == 'IBAv3.1': resource_package = 'pleque' - print('--- No limiter specified. The IBA v3.1 limiter will be used.') + logger.info('No limiter specified. The IBA v3.1 limiter will be used.') first_wall_resource = 'resources/limiter_v3_1_iba_v2.dat' first_wall_path = resources.files(resource_package).joinpath(*first_wall_resource.split("/")) first_wall = np.loadtxt(str(first_wall_path)) @@ -297,7 +296,7 @@ def read_fiesta_equilibrium(filepath, first_wall=None): first_wall = np.stack((ds.r_lim.values, ds.z_lim.values)).T if first_wall is None: - print('--- No limiter specified. The IBA v3.1 limiter will be used.') + logger.info('No limiter specified. The IBA v3.1 limiter will be used.') first_wall_resource = 'resources/limiter_v3_1_iba_v2.dat' first_wall = str(resources.files(resource_package).joinpath(*first_wall_resource.split("/"))) diff --git a/pleque/io/geqdsk.py b/pleque/io/geqdsk.py index 2983a71..3c4b1bb 100644 --- a/pleque/io/geqdsk.py +++ b/pleque/io/geqdsk.py @@ -8,6 +8,8 @@ from ._geqdsk import read_as_equilibrium from ._geqdsk import write as write_geqdsk +logger = logging.getLogger(__name__) + # Sentinel distinguishing "argument not given" from an explicit None (None has its own # meaning for `nbdry` in `write`). _UNSET = object() @@ -33,10 +35,9 @@ def _is_1dprofile_in_basedata(basedata, name, nx): if name in basedata: if basedata[name].shape[0] == nx and basedata[name].ndim == 1: return True - logging.info(f"Basedata {name} has wrong shape or dimension.\n" - f"Shape: {basedata[name].shape}, expected: ({nx},), " - f"ndim: {basedata[name].ndim}, expected: 1.") - logging.info(f"Basedata {name} not found.") + logger.info("Basedata %s has wrong shape or dimension. Shape: %s, expected: (%s,), ndim: %s, expected: 1.", + name, basedata[name].shape, nx, basedata[name].ndim) + logger.info("Basedata %s not found.", name) return False diff --git a/pleque/io/jet/reader_jet.py b/pleque/io/jet/reader_jet.py index 3687fa7..870e280 100644 --- a/pleque/io/jet/reader_jet.py +++ b/pleque/io/jet/reader_jet.py @@ -7,6 +7,8 @@ """ +import logging + import numpy as np import xarray as xr from jet.data import sal @@ -14,6 +16,8 @@ from pleque.core import Equilibrium +logger = logging.getLogger(__name__) + def deltapsi_calc(pulse): """ @@ -132,7 +136,7 @@ def sal_jet(pulse, timex=47.0, time_unit="s"): except NodeNotFound: limiter_r = sal.get(data_path.format(94508, 'rlim', sequence)).data.T limiter_z = sal.get(data_path.format(94508, 'zlim', sequence)).data.T - print(f"Limiter points not present in #{pulse}, loaded from #94508") + logger.warning("Limiter points not present in #%s, loaded from #94508", pulse) limiter = np.column_stack([limiter_r, limiter_z]) diff --git a/pleque/io/omas.py b/pleque/io/omas.py index b2b6279..e82f7d7 100644 --- a/pleque/io/omas.py +++ b/pleque/io/omas.py @@ -1,3 +1,5 @@ +import logging + import numpy as np import omas import xarray as xr @@ -5,6 +7,8 @@ from pleque.config.settings import get_settings from pleque.core import Equilibrium +logger = logging.getLogger(__name__) + def read(ods: omas.ODS, time=None, time_unit="s"): """ @@ -34,7 +38,7 @@ def read(ods: omas.ODS, time=None, time_unit="s"): from omas.omas_physics import add_wall add_wall(omas) except ImportError: - print("The newest OMAS is required to add wall to IDS.") + logger.warning("The newest OMAS is required to add wall to IDS.") except KeyError: pass @@ -149,7 +153,8 @@ def write(equilibrium: Equilibrium, grid_1d=None, grid_2d=None, gridtype=1, ods= if equilibrium.time_unit == "ms": shot_time *= 1e3 elif equilibrium.time_unit != "s": - print(f"WARNING: unknown time unit ({equilibrium.time_unit}) is used. Seconds will be used insted for saving.") + logger.warning("Unknown time unit (%s) is used. Seconds will be used instead for saving.", + equilibrium.time_unit) # ods["info"]["shot"] = equilibrium.shot ods["dataset_description"]["data_entry"]["pulse"] = equilibrium.shot diff --git a/pleque/io/readers.py b/pleque/io/readers.py index d5fb2af..da5a8c7 100644 --- a/pleque/io/readers.py +++ b/pleque/io/readers.py @@ -2,7 +2,7 @@ from ._geqdsk import read_as_equilibrium -def read_geqdsk(filename, cocos=3): +def read_geqdsk(filename, cocos=3, init_method="hints"): """ Read a G-EQDSK formatted equilibrium file @@ -15,11 +15,12 @@ def read_geqdsk(filename, cocos=3): COordinate COnventions. Not fully handled yet, only whether psi is divided by 2pi or not. if < 10 then psi is divided by 2pi, otherwise not. + :param str init_method: One of ("full", "hints", "fast_forward"), see `pleque.Equilibrium`. :return: instance of `Equilibrium` """ with open(filename) as f: - eq = read_as_equilibrium(f, cocos) + eq = read_as_equilibrium(f, cocos, init_method=init_method) return eq diff --git a/pleque/io/tools.py b/pleque/io/tools.py index f7d3597..705357c 100644 --- a/pleque/io/tools.py +++ b/pleque/io/tools.py @@ -1,3 +1,4 @@ +import logging from collections import OrderedDict import numpy as np @@ -5,6 +6,8 @@ from pleque import Equilibrium +logger = logging.getLogger(__name__) + class EquilibriaTimeSlices: """ @@ -22,13 +25,14 @@ def __init__(self, eqs_dataset, limiter=None, cocos=3): self.limiter = limiter self.cocos = cocos - def get_time_slice(self, time: float, tolerance=None): + def get_time_slice(self, time: float, tolerance=None, init_method="hints"): """ Creates an Equilibrium from the slice nearest to the specified time :param time: float, time in ms :param tolerance: float or None, raise ValueError if the selected time slice is outside the tolerance. If None the warning is shown if the time difference is more then 10 ms. + :param init_method: str One of ("full", "hints", "fast_forward"), see `pleque.Equilibrium`. """ ds: xr.Dataset = self.eqs_dataset.dropna(dim='time', how='all') \ .sel(time=time, method='nearest') @@ -40,13 +44,11 @@ def get_time_slice(self, time: float, tolerance=None): ) elif np.abs(ds.time - time) > 10: - print('!!!!!!!!!!!') - print(f'WARNING: Insufficient time slice found! Delta time: {ds.time.item() - time:.1f} ms\n' - f' Required time: {time:.1f} ms, selected time {ds.time.item():.1f} ms. ' - ) - print('!!!!!!!!!!!') + logger.warning('Insufficient time slice found! Delta time: %.1f ms. ' + 'Required time: %.1f ms, selected time %.1f ms.', + ds.time.item() - time, time, ds.time.item()) - eq = Equilibrium(ds, self.limiter, cocos=self.cocos) + eq = Equilibrium(ds, self.limiter, init_method=init_method, cocos=self.cocos) return eq diff --git a/pleque/tests/utils.py b/pleque/tests/utils.py index c7611a3..8e55510 100644 --- a/pleque/tests/utils.py +++ b/pleque/tests/utils.py @@ -79,3 +79,71 @@ def get_test_divertor(): limiterfile = 'resources/limiter_v3_1_iba.dat' limiter = [resource_path(resource_package, limiterfile)] return limiter + + +def synthetic_test_wall(): + """First wall matching the grid of :func:`synthetic_dataset`.""" + return np.array([ + [1.0, -1.0], + [2.0, -1.0], + [2.0, -0.2], + [2.0, 0], + [2.0, 0.2], + [2.0, 1.1], + [1.0, 1.1], + [1.0, 0.2], + [1.0, 0], + [1.0, -0.2], + [1.0, -1.0] + ]) + + +def synthetic_dataset(profiles="pprime_ffprime"): + """ + Build a simple analytic equilibrium dataset for testing. + + psi = (R - 1.5)^2 + (Z - 0.2)^2, i.e. a circular limiter plasma with the + magnetic axis at (1.5, 0.2) and psi_axis = 0. + + :param profiles: "pprime_ffprime" (default) provides the profile derivatives, + "p_f" provides the integrated pressure and F profiles, + "none" provides no profiles (vacuum equilibrium). + :return: xarray.Dataset suitable for direct `Equilibrium` construction + """ + R = np.linspace(0.9, 2.1, 15) + Z = np.linspace(-1.15, 1.2, 30) + R_mesh, Z_mesh = np.meshgrid(R, Z) + + psi = (R_mesh - 1.5) ** 2 + (Z_mesh - 0.2) ** 2 + + F0 = 1 + psi_n = np.linspace(0.0, 1.0, 10) + + data_vars = {'psi': (['Z', 'R'], psi)} + + if profiles == "pprime_ffprime": + data_vars['pprime'] = (['psi_n'], np.linspace(-1.0, 0.0, 10)) + data_vars['FFprime'] = (['psi_n'], np.linspace(0.0, 1.0, 10)) + elif profiles == "p_f": + import pleque.utils.equi_tools as eq_tools + + pprime = np.linspace(-1.0, 0.0, 10) + ffprime = np.linspace(0.0, 1.0, 10) + # psi_axis = 0; psi_lcfs given by the contact point of synthetic_test_wall() + psi_lcfs = (2.0 - 1.5) ** 2 + data_vars['pressure'] = (['psi_n'], np.asarray(eq_tools.pprime2p(pprime, 0.0, psi_lcfs))) + data_vars['F'] = (['psi_n'], np.asarray(eq_tools.ffprime2f(ffprime, 0.0, psi_lcfs, F0))) + elif profiles != "none": + raise ValueError(f"Unknown profiles option: {profiles!r}") + + return xr.Dataset( + data_vars=data_vars, + coords={ + 'R': R, + 'Z': Z, + 'psi_n': psi_n, + }, + attrs={ + "F0": F0, + } + ) diff --git a/pleque/utils/equi_tools.py b/pleque/utils/equi_tools.py index bfd16bd..b7dca2b 100644 --- a/pleque/utils/equi_tools.py +++ b/pleque/utils/equi_tools.py @@ -1,3 +1,4 @@ +import logging from collections.abc import Iterable from scipy.optimize import brentq, minimize @@ -15,6 +16,49 @@ from pleque.config.settings import get_settings from pleque.utils.surfaces import find_contour, points_inside_curve +logger = logging.getLogger(__name__) + + +def synthesize_rectangular_wall(rs, zs): + """ + Synthesize an artificial rectangular first wall around the psi grid. + + The rectangle is slightly inset with respect to the grid extent + (`grid.synthetic_wall_margin` setting) so that some plasma can be found + behind the wall; each side is sampled with + `grid.synthetic_wall_points_per_side` points. + + :param rs: array-like, R coordinates of the psi grid + :param zs: array-like, Z coordinates of the psi grid + :return: array (4 * points_per_side, 2) of wall points + """ + wall_cfg = get_settings().grid + + rwall_min = np.min(rs) + rwall_max = np.max(rs) + zwall_min = np.min(zs) + zwall_max = np.max(zs) + + dr = rwall_max - rwall_min + dz = zwall_max - zwall_min + + rwall_min += dr * wall_cfg.synthetic_wall_margin + rwall_max -= dr * wall_cfg.synthetic_wall_margin + zwall_min += dz * wall_cfg.synthetic_wall_margin + zwall_max -= dz * wall_cfg.synthetic_wall_margin + + corners = np.array( + [[rwall_min, zwall_max], [rwall_max, zwall_max], [rwall_max, zwall_min], + [rwall_min, zwall_min]]) + newwall_r = [] + newwall_z = [] + for i in range(-1, 3): + side_r = np.linspace(corners[i, 0], corners[i + 1, 0], wall_cfg.synthetic_wall_points_per_side) + side_z = np.linspace(corners[i, 1], corners[i + 1, 1], wall_cfg.synthetic_wall_points_per_side) + newwall_r += list(side_r) + newwall_z += list(side_z) + return np.stack((newwall_r, newwall_z)).T + def _make_psi_grad_sq(psi_spl): """Return a closure that evaluates the squared gradient magnitude of psi at point x=(R,Z).""" @@ -170,23 +214,24 @@ def psi_2nd_derivatives(r_coord, z_coord): x_points.append((r_ex, z_ex)) - print(f"Found {len(o_points)} o-points and {len(x_points)} x-points with order {order}") + logger.debug("Found %d o-points and %d x-points with order %d", len(o_points), len(x_points), order) order = order // 2 if len(o_points) == 0 and order == 0: - - import matplotlib.pyplot as plt - _fig, ax = plt.subplots() - ax.contourf(rs, zs, psi_xysq.T) - ax.plot(rs[mins0[0]], zs[mins0[1]], 'rx') - ax.plot(rs[mins1[0]], zs[mins1[1]], 'b+') - ax.set_aspect('equal') - ax.set_title(f"DEBUG plot of d2psi_dxdy with argrelmin order {order}") - ax.set_xlabel('R') - ax.set_ylabel('Z') - - plt.show() - + logger.warning("No O-point candidate found on the grid; magnetic-axis recognition will likely fail.") + + if get_settings().debug_plots: + import matplotlib.pyplot as plt + _fig, ax = plt.subplots() + ax.contourf(rs, zs, psi_xysq.T) + ax.plot(rs[mins0[0]], zs[mins0[1]], 'rx') + ax.plot(rs[mins1[0]], zs[mins1[1]], 'b+') + ax.set_aspect('equal') + ax.set_title(f"DEBUG plot of d2psi_dxdy with argrelmin order {order}") + ax.set_xlabel('R') + ax.set_ylabel('Z') + + plt.show() o_points = np.array(o_points) x_points = np.array(x_points) diff --git a/pleque/utils/field_line_tracers.py b/pleque/utils/field_line_tracers.py index 0e82cc8..dfacc8b 100644 --- a/pleque/utils/field_line_tracers.py +++ b/pleque/utils/field_line_tracers.py @@ -4,6 +4,8 @@ from pleque.config.settings import get_settings +logger = logging.getLogger(__name__) + def dphi_tracer_factory(BR_func, BZ_func, Bphi_func, BR_pert_func=None, BZ_pert_func=None, direction=1): """Factory for function $d[R,Z]/d\\phi=f(\\phi, [R,Z])$ @@ -27,7 +29,7 @@ def dphi_tracer_factory(BR_func, BZ_func, Bphi_func, BR_pert_func=None, BZ_pert_ This function is mostly useful when the full spatial coordinates of the field line are required. """ if direction < 0: - logging.warning("Tracing field lines in the opposite direction of the magnetic field.") + logger.warning("Tracing field lines in the opposite direction of the magnetic field.") if BR_pert_func and BR_pert_func: diff --git a/pleque/utils/plotting.py b/pleque/utils/plotting.py index 1fd7a0c..7a82698 100644 --- a/pleque/utils/plotting.py +++ b/pleque/utils/plotting.py @@ -1,3 +1,5 @@ +import logging + import matplotlib.pyplot as plt import numpy as np @@ -5,6 +7,8 @@ from pleque.config.settings import get_settings from pleque.utils.equi_tools import get_psi_n_on_q +logger = logging.getLogger(__name__) + def _plot_extremes(o_points, x_points, ax: plt.Axes = None, all_o_points=True, all_x_points=True, **kwargs): @@ -38,51 +42,51 @@ def _plot_debug(eq: pleque.Equilibrium, ax: plt.Axes = None, levels=None, colorb if colorbar: plt.contour(cl) except Exception: - print("WARNING: Something wrong with psi spline.") + logger.warning("Something wrong with psi spline.") try: ax.plot(eq._first_wall[:, 0], eq._first_wall[:, 1], "k+-", label='first wall') except Exception: - print("WARNING: No first wall?!") + logger.warning("No first wall?!") try: ax.plot(eq._lcfs[:, 0], eq._lcfs[:, 1], "C0", label='LCFS') except Exception: - print("WARNING: LCFS in troubles?!") + logger.warning("LCFS in troubles?!") try: ax.contour(rs, zs, eq._spl_psi(rs, zs).T, [eq._psi_lcfs], colors="C1", linestyles="--") except Exception: - print("WARNING: LCFS contour problem.") + logger.warning("LCFS contour problem.") try: ax.plot(eq._o_points[:, 0], eq._o_points[:, 1], "C0o", label='o-points') except Exception: - print("WARNING: O-points in trouble") + logger.warning("O-points in trouble") try: ax.plot(*eq._mg_axis, "C1o", label='mg axis') except Exception: - print("WARNING: mg. axis in trouble") + logger.warning("mg. axis in trouble") try: ax.plot(eq._x_points[:, 0], eq._x_points[:, 1], "C2x", label='x-points') except Exception: - print("WARNING: X-points in trouble") + logger.warning("X-points in trouble") try: ax.plot(eq._x_point[0], eq._x_point[1], "rx", lw=2, label='x-point') except Exception: - print("WARNING: THE X-point in trouble") + logger.warning("THE X-point in trouble") try: ax.plot(eq._limiter_point[0], eq._limiter_point[1], "g+", lw=3, label='limiter point') except Exception: - print("WARNING: Limiter point is in trouble.") + logger.warning("Limiter point is in trouble.") try: ax.plot(eq._strike_points[:, 0], eq._strike_points[:, 1], "C3+", lw=2, label='strike points') except Exception: - print("WARNING: Strike-points in trouble.") + logger.warning("Strike-points in trouble.") ax.set_title("DEBUG PLOT") ax.legend() diff --git a/tests/test_equilibria.py b/tests/test_equilibria.py index c649a09..fc4f8f9 100644 --- a/tests/test_equilibria.py +++ b/tests/test_equilibria.py @@ -13,22 +13,20 @@ def test_critical(equilibrium): """ - Test if all critical points are properly set. It differ for x-point and limter plasma. + Test if all critical points are properly set. It differs for x-point and limiter plasma. """ - - # # x-point plasma: - # This code will be added soon - # if equilibrium.is_xpoint_plasma: - # assert equilibrium.x_point == equilibrium.limiter_point - # assert np.isclose(equilibrium._psi_lcfs, equilibrium._psi_xp) - # assert equilibrium.contact_point is None - # if len(equilibrium.first_wall) > 4: - # assert len(equilibrium.strike_points) > 1 - # - # else: - # assert equilibrium.contact_point == equilibrium.strike_points - # assert equilibrium.contact_point == equilibrium.limiter_point - pass + if not equilibrium._limiter_plasma: + # x-point plasma: the plasma is limited by the x-point + assert equilibrium._x_point is not None + assert np.allclose(equilibrium._x_point, equilibrium._limiter_point) + assert np.isclose(np.asarray(equilibrium._psi_lcfs).item(), np.asarray(equilibrium._psi_xp).item()) + assert equilibrium.contact_point is None + if len(equilibrium.first_wall) > 4: + assert len(equilibrium.strike_points) > 1 + else: + assert np.allclose(equilibrium._contact_point, equilibrium._limiter_point) + assert np.allclose(equilibrium._strike_points[0], equilibrium._contact_point) + assert isinstance(equilibrium.contact_point, Coordinates) o_points = [array([0.90891258, 0.01440663]), array([0.9083923, 0.06746012]), diff --git a/tests/test_equilibrium_init.py b/tests/test_equilibrium_init.py new file mode 100644 index 0000000..997a6f5 --- /dev/null +++ b/tests/test_equilibrium_init.py @@ -0,0 +1,216 @@ +"""Tests of the `Equilibrium` initialisation: hints, init_method, synthetic wall, +vacuum equilibria, profile handling, logging and the failure path.""" + +import logging + +import numpy as np +import pytest + +import pleque.utils.equi_tools as eq_tools +from pleque import Equilibrium +from pleque.config.settings import reload_settings +from pleque.io.readers import read_geqdsk +from pleque.tests.utils import get_test_equilibria_filenames, synthetic_dataset, synthetic_test_wall + +MG_AXIS = (1.5, 0.2) +PSI_LCFS = 0.25 + + +@pytest.fixture +def synthetic_eq(): + return Equilibrium(synthetic_dataset(), first_wall=synthetic_test_wall()) + + +def test_basic_construction(synthetic_eq): + eq = synthetic_eq + assert np.allclose(eq._mg_axis, MG_AXIS, atol=1e-3) + assert np.isclose(eq._psi_axis, 0.0, atol=1e-8) + assert np.isclose(np.asarray(eq._psi_lcfs).item(), PSI_LCFS, atol=1e-6) + assert eq._limiter_plasma + assert not eq._vacuum + assert len(eq._lcfs) > 10 + + +def test_synthetic_wall_generation(): + from pleque.config.settings import get_settings + + ds = synthetic_dataset() + eq = Equilibrium(ds) + + wall_cfg = get_settings().grid + fw = eq._first_wall + assert len(fw) == 4 * wall_cfg.synthetic_wall_points_per_side + # the synthesized wall is slightly inset with respect to the grid + assert np.min(fw[:, 0]) > np.min(ds.R.values) + assert np.max(fw[:, 0]) < np.max(ds.R.values) + assert np.min(fw[:, 1]) > np.min(ds.Z.values) + assert np.max(fw[:, 1]) < np.max(ds.Z.values) + + +def test_synthesize_rectangular_wall_unit(): + rs = np.linspace(1.0, 2.0, 10) + zs = np.linspace(-1.0, 1.0, 20) + wall = eq_tools.synthesize_rectangular_wall(rs, zs) + assert wall.ndim == 2 and wall.shape[1] == 2 + assert np.min(wall[:, 0]) > 1.0 and np.max(wall[:, 0]) < 2.0 + assert np.min(wall[:, 1]) > -1.0 and np.max(wall[:, 1]) < 1.0 + + +def test_vacuum_equilibrium(): + eq = Equilibrium(synthetic_dataset(profiles="none"), first_wall=synthetic_test_wall()) + assert eq._vacuum + psi_n = np.linspace(0, 1, 5) + assert np.allclose(eq.pressure(psi_n=psi_n), 0) + # construction completed: boundary and critical points exist + assert eq._limiter_plasma + assert np.allclose(eq._mg_axis, MG_AXIS, atol=1e-3) + + +def test_profile_input_equivalence(): + """Profiles given as derivatives (pprime, FFprime) vs integrated (p, F) give the same equilibrium.""" + eq_deriv = Equilibrium(synthetic_dataset(profiles="pprime_ffprime"), first_wall=synthetic_test_wall()) + eq_direct = Equilibrium(synthetic_dataset(profiles="p_f"), first_wall=synthetic_test_wall()) + + psi_n = np.linspace(0, 1, 20) + assert np.allclose(eq_deriv.pressure(psi_n=psi_n), eq_direct.pressure(psi_n=psi_n), rtol=1e-3, atol=1e-4) + assert np.allclose(eq_deriv.F(psi_n=psi_n), eq_direct.F(psi_n=psi_n), rtol=1e-3, atol=1e-4) + + +def test_hint_kwargs(): + eq = Equilibrium(synthetic_dataset(), first_wall=synthetic_test_wall(), + mg_axis=MG_AXIS, psi_lcfs=PSI_LCFS, + x_points=np.empty((0, 2)), strike_points=None) + assert np.allclose(eq._mg_axis, MG_AXIS, atol=1e-3) + + +def test_hints_from_basedata(): + ds = synthetic_dataset() + ds['mg_axis'] = (('rz',), np.array(MG_AXIS)) + ds['psi_lcfs'] = PSI_LCFS + eq = Equilibrium(ds, first_wall=synthetic_test_wall()) + assert np.allclose(eq._mg_axis, MG_AXIS, atol=1e-3) + + +def test_hint_duplicity_warning(caplog): + ds = synthetic_dataset() + ds['mg_axis'] = (('rz',), np.array([1.4, 0.1])) + with caplog.at_level(logging.WARNING, logger="pleque"): + Equilibrium(ds, first_wall=synthetic_test_wall(), mg_axis=MG_AXIS) + assert any("mg_axis specified both in basedata and as an argument" in r.message for r in caplog.records) + + +def test_fast_forward_trusts_hints(): + eq = Equilibrium(synthetic_dataset(), first_wall=synthetic_test_wall(), + mg_axis=MG_AXIS, psi_lcfs=PSI_LCFS, x_points=np.empty((0, 2)), + init_method="fast_forward") + # hints are taken as final: no critical-point search, exact hint values kept + assert np.array_equal(eq._mg_axis, np.asarray(MG_AXIS, dtype=float)) + assert eq._x_point is None + assert len(eq._o_points) == 1 + assert np.asarray(eq._psi_lcfs).item() == PSI_LCFS + assert eq._limiter_plasma + + +def test_fast_forward_fallback_warning(caplog): + with caplog.at_level(logging.WARNING, logger="pleque"): + eq = Equilibrium(synthetic_dataset(), first_wall=synthetic_test_wall(), init_method="fast_forward") + assert any("falling back to 'hints'" in r.message for r in caplog.records) + assert np.allclose(eq._mg_axis, MG_AXIS, atol=1e-3) + + +def test_invalid_init_method(): + with pytest.raises(ValueError, match="init_method"): + Equilibrium(synthetic_dataset(), first_wall=synthetic_test_wall(), init_method="bogus") + + +@pytest.mark.parametrize("case", [0, 3]) +def test_init_method_consistency(case): + """'full', 'hints' and 'fast_forward' (without hints: fallback) agree on bundled cases.""" + gfile = get_test_equilibria_filenames()[case] + eq_full = read_geqdsk(gfile, init_method="full") + eq_hints = read_geqdsk(gfile, init_method="hints") + eq_fast = read_geqdsk(gfile, init_method="fast_forward") + + for eq in (eq_full, eq_fast): + assert np.allclose(eq._mg_axis, eq_hints._mg_axis) + assert np.isclose(eq._psi_axis, eq_hints._psi_axis) + assert np.isclose(np.asarray(eq._psi_lcfs).item(), np.asarray(eq_hints._psi_lcfs).item()) + if eq_hints._x_point is not None: + assert np.allclose(eq._x_point, eq_hints._x_point) + if eq_hints._strike_points is not None and eq._strike_points is not None: + assert np.allclose(eq._strike_points, eq_hints._strike_points) + + +def test_double_null_x_points(): + gfile = get_test_equilibria_filenames()[2] + eq = read_geqdsk(gfile) + assert eq._x_point is not None + assert eq._x_point2 is not None + psi1 = np.asarray(eq._spl_psi(*eq._x_point, grid=False)).item() + psi2 = np.asarray(eq._spl_psi(*eq._x_point2, grid=False)).item() + psi_span = abs(np.asarray(eq._psi_lcfs).item() - eq._psi_axis) + assert abs(psi1 - psi2) / psi_span < 0.05 + + +def test_spline_order_override(): + eq = Equilibrium(synthetic_dataset(), first_wall=synthetic_test_wall(), spline_order=5, spline_smooth=0) + assert eq._spl_psi.degrees == (5, 5) + + +def test_init_logging(caplog): + with caplog.at_level(logging.DEBUG, logger="pleque"): + Equilibrium(synthetic_dataset()) + messages = [r.message for r in caplog.records] + assert any("Limiter plasma found." in m for m in messages) + assert any("rectangular wall" in m for m in messages) + + +def test_verbose_smoke(): + eq = Equilibrium(synthetic_dataset(), first_wall=synthetic_test_wall(), verbose=True) + assert eq._verbose + # restore the default level changed by verbose=True + logging.getLogger("pleque").setLevel(logging.WARNING) + + +def _dataset_without_profiles_coord(): + """Dataset that fails the initialisation after the spatial data are loaded (no psi_n).""" + ds = synthetic_dataset() + return ds.drop_vars(["pprime", "FFprime"]).drop_vars("psi_n") + + +def test_failure_is_logged_and_no_debug_plot(caplog, monkeypatch): + import matplotlib + + matplotlib.use("Agg") + import matplotlib.pyplot as plt + + shown = [] + monkeypatch.setattr(plt, "show", lambda *a, **k: shown.append(True)) + + with caplog.at_level(logging.ERROR, logger="pleque"): + with pytest.raises(AttributeError): + Equilibrium(_dataset_without_profiles_coord(), first_wall=synthetic_test_wall()) + + assert any("Equilibrium initialization failed." in r.message for r in caplog.records) + assert shown == [] + + +def test_failure_debug_plot_enabled(monkeypatch): + import matplotlib + + matplotlib.use("Agg") + import matplotlib.pyplot as plt + + shown = [] + monkeypatch.setattr(plt, "show", lambda *a, **k: shown.append(True)) + + monkeypatch.setenv("PLEQUE_DEBUG_PLOTS", "1") + reload_settings() + try: + with pytest.raises(AttributeError): + Equilibrium(_dataset_without_profiles_coord(), first_wall=synthetic_test_wall()) + assert shown == [True] + finally: + monkeypatch.delenv("PLEQUE_DEBUG_PLOTS") + reload_settings() + plt.close("all") diff --git a/tests/test_invert_equilibrium.py b/tests/test_invert_equilibrium.py index 42ddf50..1ef1e9b 100644 --- a/tests/test_invert_equilibrium.py +++ b/tests/test_invert_equilibrium.py @@ -1,64 +1,12 @@ import numpy as np -import xarray as xr from pleque import Equilibrium -from pleque.tests.utils import load_testing_equilibrium +from pleque.tests.utils import load_testing_equilibrium, synthetic_dataset, synthetic_test_wall def create_test_equilibrium(): """Create a simple test equilibrium for testing inversion.""" - # Create a simple grid - R = np.linspace(0.9, 2.1, 15) - Z = np.linspace(-1.15, 1.2, 30) - R_mesh, Z_mesh = np.meshgrid(R, Z) - - # Create a simple psi function: (R-1.5)^2 + Z^2 - psi = (R_mesh - 1.5)**2 + (Z_mesh-0.2)**2 - - # Create a simple F function - F0 = 1 - psi_n = np.linspace(0.0, 1.0, 10) - - # Create a simple pprime function - pprime = np.linspace(-1.0, 0.0, 10) - - # Create a simple FFprime function - FFprime = np.linspace(0.0, 1.0, 10) - - # Create a simple first wall - first_wall = np.array([ - [1.0, -1.0], - [2.0, -1.0], - [2.0, -0.2], - [2.0, 0], - [2.0, 0.2], - [2.0, 1.1], - [1.0, 1.1], - [1.0, 0.2], - [1.0, 0], - [1.0, -0.2], - [1.0, -1.0] - ]) - - # Create a dataset - ds = xr.Dataset( - data_vars={ - 'psi': (['Z', 'R'], psi), - 'pprime': (['psi_n'], pprime), - 'FFprime': (['psi_n'], FFprime), - }, - coords={ - 'R': R, - 'Z': Z, - 'psi_n': psi_n, - }, - attrs={ - "F0": F0, - } - ) - - # Create an equilibrium - eq = Equilibrium(ds, first_wall=first_wall) + eq = Equilibrium(synthetic_dataset(), first_wall=synthetic_test_wall()) return eq