Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions imap_processing/mag/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,9 @@ class ModeFlags(Enum):
RANGE_BIT_WIDTH = 2
MAX_COMPRESSED_VECTOR_BITS = 60
FILLVAL = -1e31
# Relative tolerance (7.5%) for L1C cadence checks; allows small clock-drift
# variation around the expected cadence before a spacing is treated as a gap.
L1C_CADENCE_TOLERANCE = 0.075


def vectors_per_second_from_string(vecsec_string: str) -> dict:
Expand Down
33 changes: 24 additions & 9 deletions imap_processing/mag/l1c/interpolation_methods.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ def quadratic(
output_timestamps: np.ndarray,
input_rate: VecSec | None = None,
output_rate: VecSec | None = None,
extrapolate: bool = False,
) -> tuple[np.ndarray, np.ndarray]:
"""
Quadratic interpolation of input vectors to output timestamps.
Expand All @@ -108,16 +109,20 @@ def quadratic(
Not required for this interpolation method.
output_rate : VecSec, optional
Not required for this interpolation method.
extrapolate : bool, optional
Whether to allow extrapolation of output timestamps outside the range of input
timestamps. Default is False.

Returns
-------
numpy.ndarray
Interpolated vectors of shape (m, 3) where m is equal to the number of output
timestamps. Contains x, y, z components of the vector.
"""
output_timestamps = remove_invalid_output_timestamps(
input_timestamps, output_timestamps
)
if not extrapolate:
output_timestamps = remove_invalid_output_timestamps(
input_timestamps, output_timestamps
)
spline = make_interp_spline(input_timestamps, input_vectors, k=2)
return output_timestamps, spline(output_timestamps)

Expand All @@ -128,6 +133,7 @@ def cubic(
output_timestamps: np.ndarray,
input_rate: VecSec | None = None,
output_rate: VecSec | None = None,
extrapolate: bool = False,
) -> tuple[np.ndarray, np.ndarray]:
"""
Cubic interpolation of input vectors to output timestamps.
Expand All @@ -146,16 +152,20 @@ def cubic(
Not required for this interpolation method.
output_rate : VecSec, optional
Not required for this interpolation method.
extrapolate : bool, optional
Whether to allow extrapolation of output timestamps outside the range of input
timestamps. Default is False.

Returns
-------
numpy.ndarray
Interpolated vectors of shape (m, 3) where m is equal to the number of output
timestamps. Contains x, y, z components of the vector.
"""
output_timestamps = remove_invalid_output_timestamps(
input_timestamps, output_timestamps
)
if not extrapolate:
output_timestamps = remove_invalid_output_timestamps(
input_timestamps, output_timestamps
)
spline = make_interp_spline(input_timestamps, input_vectors, k=3)
return output_timestamps, spline(output_timestamps)

Expand Down Expand Up @@ -296,7 +306,7 @@ def linear_filtered(
input_filtered, vectors_filtered = cic_filter(
input_vectors, input_timestamps, output_timestamps, input_rate, output_rate
)
return linear(vectors_filtered, input_filtered, output_timestamps)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably, we should add extrapolate=True to all the _filtered methods if we're doing it for one.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch, you're right, the same post-CIC tail-clipping bug exists in quadratic_filtered and cubic_filtered. It's dormant because L1C_INTERPOLATION_METHOD = "linear_filtered", but it'd bite us if the config changed. I'll add it to the other methods, and add an edge-case test that hits all three at the tail boundary.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

return linear(vectors_filtered, input_filtered, output_timestamps, extrapolate=True)


def quadratic_filtered(
Expand Down Expand Up @@ -338,7 +348,12 @@ def quadratic_filtered(
input_filtered, vectors_filtered = cic_filter(
input_vectors, input_timestamps, output_timestamps, input_rate, output_rate
)
return quadratic(vectors_filtered, input_filtered, output_timestamps)
return quadratic(
vectors_filtered,
input_filtered,
output_timestamps,
extrapolate=True,
)


def cubic_filtered(
Expand Down Expand Up @@ -380,7 +395,7 @@ def cubic_filtered(
input_filtered, vectors_filtered = cic_filter(
input_vectors, input_timestamps, output_timestamps, input_rate, output_rate
)
return cubic(vectors_filtered, input_filtered, output_timestamps)
return cubic(vectors_filtered, input_filtered, output_timestamps, extrapolate=True)


class InterpolationFunction(Enum):
Expand Down
169 changes: 137 additions & 32 deletions imap_processing/mag/l1c/mag_l1c.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,11 @@

from imap_processing.cdf.imap_cdf_manager import ImapCdfAttributes
from imap_processing.mag import imap_mag_sdc_configuration_v001 as configuration
from imap_processing.mag.constants import ModeFlags, VecSec
from imap_processing.mag.constants import (
L1C_CADENCE_TOLERANCE,
ModeFlags,
VecSec,
)
from imap_processing.mag.l1c.interpolation_methods import InterpolationFunction
from imap_processing.spice.time import et_to_ttj2000ns, str_to_et

Expand Down Expand Up @@ -461,6 +465,8 @@ def interpolate_gaps(
6-7 - compression flags.
"""
burst_epochs = burst_dataset["epoch"].data
filled_timeline_epochs = filled_norm_timeline[:, 0]
has_norm_context = np.any(filled_norm_timeline[:, 5] == ModeFlags.NORM.value)
# Exclude range values
burst_vectors = burst_dataset["vectors"].data
# Default to two vectors per second
Expand Down Expand Up @@ -497,14 +503,20 @@ def interpolate_gaps(
burst_buffer = int(required_seconds * burst_rate.value)

burst_start = max(0, burst_gap_start - burst_buffer)
burst_end = min(len(burst_epochs) - 1, burst_gap_end + burst_buffer)
burst_end = min(len(burst_epochs), burst_gap_end + burst_buffer + 1)

gap_timeline = filled_norm_timeline[
(filled_norm_timeline > gap[0]) & (filled_norm_timeline < gap[1])
gap_timeline = filled_timeline_epochs[
(filled_timeline_epochs > gap[0]) & (filled_timeline_epochs < gap[1])
]

usable_burst_end_epoch = burst_epochs[burst_end - 1]
if not has_norm_context:
# In the burst-only fallback, CIC delay compensation shortens the usable
# filtered range at the trailing edge by roughly one output cadence.
usable_burst_end_epoch -= int(1e9 / norm_rate.value)

short = (gap_timeline >= burst_epochs[burst_start]) & (
gap_timeline <= burst_epochs[burst_end]
gap_timeline <= usable_burst_end_epoch
)
num_short = int(short.sum())

Expand All @@ -524,7 +536,7 @@ def interpolate_gaps(

# gaps should not have data in timeline, still check it
for index, timestamp in enumerate(adjusted_gap_timeline):
timeline_index = np.searchsorted(filled_norm_timeline[:, 0], timestamp)
timeline_index = np.searchsorted(filled_timeline_epochs, timestamp)
if sum(
filled_norm_timeline[timeline_index, 1:4]
) == 0 and burst_gap_start + index < len(burst_vectors):
Expand All @@ -542,13 +554,12 @@ def interpolate_gaps(
missing_timeline = np.setdiff1d(gap_timeline, adjusted_gap_timeline)

for timestamp in missing_timeline:
timeline_index = np.searchsorted(filled_norm_timeline[:, 0], timestamp)
timeline_index = np.searchsorted(filled_timeline_epochs, timestamp)
if filled_norm_timeline[timeline_index, 5] != ModeFlags.MISSING.value:
raise RuntimeError(
"Self-inconsistent data. "
"Gaps not included in final timeline should be missing."
)
np.delete(filled_norm_timeline, timeline_index)

return filled_norm_timeline

Expand All @@ -557,8 +568,8 @@ def generate_timeline(epoch_data: np.ndarray, gaps: np.ndarray) -> np.ndarray:
"""
Generate a new timeline from existing, gap-filled timeline and gaps.

The gaps are generated at a .5 second cadence, regardless of the cadence of the
existing data.
The gaps are generated at the cadence implied by the gap rate. If no rate is
provided, a default cadence of 0.5 seconds is used.

Parameters
----------
Expand All @@ -573,7 +584,8 @@ def generate_timeline(epoch_data: np.ndarray, gaps: np.ndarray) -> np.ndarray:
numpy.ndarray
The new timeline, filled with the existing data and the generated gaps.
"""
full_timeline: np.ndarray = np.array([])
epoch_data = np.asarray(epoch_data)
full_timeline: np.ndarray = np.array([], dtype=epoch_data.dtype)
last_index = 0
for gap in gaps:
epoch_start_index = np.searchsorted(epoch_data, gap[0], side="left")
Expand All @@ -582,6 +594,7 @@ def generate_timeline(epoch_data: np.ndarray, gaps: np.ndarray) -> np.ndarray:
)
generated_timestamps = generate_missing_timestamps(gap)
if generated_timestamps.size == 0:
last_index = int(np.searchsorted(epoch_data, gap[1], side="left"))
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice

continue

# Remove any generated timestamps that are already in the timeline
Expand Down Expand Up @@ -639,37 +652,48 @@ def find_all_gaps(
specified as (start, end, vector_rate) where start and end both exist in the
timeline.
"""
gaps: np.ndarray = np.zeros((0, 3))
gaps: np.ndarray = np.empty((0, 3), dtype=np.int64)

# TODO: when we go back to the previous file, also retrieve expected
# vectors per second

vecsec_dict = {0: VecSec.TWO_VECS_PER_S.value} | (vecsec_dict or {})

end_index = epoch_data.shape[0]
rate_segments = _find_rate_segments(epoch_data, vecsec_dict)
if rate_segments:
first_rate = rate_segments[0][1]
last_rate = rate_segments[-1][1]
else:
default_rate = next(iter(vecsec_dict.values()))
first_rate = default_rate
last_rate = default_rate

if start_of_day_ns is not None and epoch_data[0] > start_of_day_ns:
# Add a gap from the start of the day to the first timestamp
gaps = np.concatenate(
(gaps, np.array([[start_of_day_ns, epoch_data[0], vecsec_dict[0]]]))
)

for start_time in reversed(sorted(vecsec_dict.keys())):
# Find the start index that is equal to or immediately after start_time
start_index = np.searchsorted(epoch_data, start_time, side="left")
gaps = np.concatenate(
(
find_gaps(
epoch_data[start_index : end_index + 1], vecsec_dict[start_time]
),
gaps,
np.array(
[[start_of_day_ns, epoch_data[0], first_rate]], dtype=np.int64
),
)
)
end_index = start_index

for index, (start_index, vectors_per_second) in enumerate(rate_segments):
next_start_index = (
rate_segments[index + 1][0]
if index + 1 < len(rate_segments)
else epoch_data.shape[0] - 1
)
epoch_slice = epoch_data[start_index : next_start_index + 1]
gaps = np.concatenate((gaps, find_gaps(epoch_slice, vectors_per_second)))

if end_of_day_ns is not None and epoch_data[-1] < end_of_day_ns:
gaps = np.concatenate(
(gaps, np.array([[epoch_data[-1], end_of_day_ns, vecsec_dict[start_time]]]))
(
gaps,
np.array([[epoch_data[-1], end_of_day_ns, last_rate]], dtype=np.int64),
)
)

return gaps
Expand All @@ -696,14 +720,19 @@ def find_gaps(timeline_data: np.ndarray, vectors_per_second: int) -> np.ndarray:
end_gap, as well as vectors_per_second. Start_gap and end_gap both correspond
to points in timeline_data.
"""
if timeline_data.shape[0] < 2:
return np.empty((0, 3), dtype=np.int64)

# Expected difference between timestamps in nanoseconds.
expected_gap = 1 / vectors_per_second * 1e9

diffs = abs(np.diff(timeline_data))

# Gap can be up to 7.5% larger than expected vectors per second due to clock drift
gap_index = np.asarray(diffs - expected_gap > expected_gap * 0.075).nonzero()[0]
output: np.ndarray = np.zeros((len(gap_index), 3))
gap_index = np.asarray(
diffs - expected_gap > expected_gap * L1C_CADENCE_TOLERANCE
).nonzero()[0]
output: np.ndarray = np.zeros((len(gap_index), 3), dtype=np.int64)

for index, gap in enumerate(gap_index):
output[index, :] = [
Expand All @@ -719,8 +748,8 @@ def generate_missing_timestamps(gap: np.ndarray) -> np.ndarray:
"""
Generate a new timeline from input gaps.

Any gaps specified in gaps will be filled with timestamps that are 0.5 seconds
apart.
Any gaps specified in gaps will be filled with timestamps at the gap rate. If the
gap rate is not included, the default cadence is 0.5 seconds.

Parameters
----------
Expand All @@ -734,12 +763,88 @@ def generate_missing_timestamps(gap: np.ndarray) -> np.ndarray:
full_timeline: numpy.ndarray
Completed timeline.
"""
# Generated timestamps should always be 0.5 seconds apart
difference_ns = 0.5 * 1e9
output: np.ndarray = np.arange(gap[0], gap[1], difference_ns)
difference_ns = int(0.5 * 1e9)
# Support both legacy (start, end) gaps, which use the historical 0.5 s cadence,
# and newer (start, end, rate) gaps, which use the declared cadence.
if len(gap) > 2:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the reasoning for this if statement? Add a comment plz

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The if len(gap) > 2 is there to support both legacy (start, end) gaps, which keep the historical 0.5 s cadence, and newer (start, end, rate) gaps, which use the declared cadence.

I added this comment:

# Support both legacy (start, end) gaps, which use the historical 0.5 s cadence,
# and newer (start, end, rate) gaps, which use the declared cadence.

difference_ns = int(1e9 / int(gap[2]))
output: np.ndarray = np.arange(
int(np.rint(gap[0])),
int(np.rint(gap[1])),
difference_ns,
dtype=np.int64,
)
return output


def _is_expected_rate(timestamp_difference: float, vectors_per_second: int) -> bool:
    """
    Check whether two adjacent timestamps are spaced at the expected cadence.

    Parameters
    ----------
    timestamp_difference : float
        Observed spacing between two neighboring timestamps, in nanoseconds.
    vectors_per_second : int
        Expected vector rate whose nominal spacing is being tested.

    Returns
    -------
    bool
        True if the observed spacing deviates from the nominal spacing by no
        more than the `L1C_CADENCE_TOLERANCE` fraction (allows for clock drift).
    """
    # Nominal spacing in nanoseconds for this vector rate.
    nominal_spacing = 1 / vectors_per_second * 1e9
    deviation = abs(timestamp_difference - nominal_spacing)
    return deviation <= nominal_spacing * L1C_CADENCE_TOLERANCE


def _find_rate_segments(
    epoch_data: np.ndarray, vecsec_dict: dict[int, int]
) -> list[tuple[int, int]]:
    """
    Resolve configured rate transitions into contiguous rate segments.

    Each configured transition is located in the timeline, then walked backward
    for as long as the observed spacing already matches the new rate. This keeps
    gaps attached to the segment whose cadence actually applies, avoiding
    spurious single-sample micro-gaps when a configured boundary lands slightly
    after the cadence actually changed.

    Parameters
    ----------
    epoch_data : numpy.ndarray
        Sorted epoch timestamps of the current timeline, in nanoseconds.
    vecsec_dict : dict[int, int]
        Maps each transition start time to its expected vectors-per-second rate.

    Returns
    -------
    list[tuple[int, int]]
        `(start_index, vectors_per_second)` pairs describing contiguous rate
        segments within `epoch_data`.
    """
    if epoch_data.shape[0] == 0:
        return []

    last_valid_index = epoch_data.shape[0] - 1
    segments: list[tuple[int, int]] = []

    for transition_time, rate in sorted(vecsec_dict.items()):
        # Locate the transition; clamp so the index stays addressable.
        idx = int(np.searchsorted(epoch_data, transition_time, side="left"))
        if idx > last_valid_index:
            idx = last_valid_index

        # Never walk back past the start of the previous segment.
        floor = segments[-1][0] if segments else 0
        while idx > floor:
            spacing = epoch_data[idx] - epoch_data[idx - 1]
            if not _is_expected_rate(spacing, rate):
                break
            idx -= 1

        if segments and segments[-1][0] == idx:
            # A transition collapsing onto the previous segment's start simply
            # overrides that segment's rate instead of creating a new one.
            segments[-1] = (idx, rate)
        else:
            segments.append((idx, rate))

    return segments


def vectors_per_second_from_string(vecsec_string: str) -> dict:
"""
Extract the vectors per second from a string into a dictionary.
Expand Down
Loading
Loading