Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion imap_processing/mag/l1c/interpolation_methods.py
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ def linear_filtered(
input_filtered, vectors_filtered = cic_filter(
input_vectors, input_timestamps, output_timestamps, input_rate, output_rate
)
return linear(vectors_filtered, input_filtered, output_timestamps)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably, we should add extrapolate=True to all the _filtered methods if we're doing it for one.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch, you're right, the same post-CIC tail-clipping bug exists in quadratic_filtered and cubic_filtered. It's dormant because L1C_INTERPOLATION_METHOD = "linear_filtered", but it'd bite us if the config changed. I'll add it to the other methods, and add an edge-case test that hits all three at the tail boundary.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

return linear(vectors_filtered, input_filtered, output_timestamps, extrapolate=True)


def quadratic_filtered(
Expand Down
161 changes: 129 additions & 32 deletions imap_processing/mag/l1c/mag_l1c.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@

logger = logging.getLogger(__name__)

GAP_TOLERANCE = 0.075
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should go in mag/constants.py. Might need a clearer name too (L1C_GAP_TOLERANCE) and a comment describing what this is.

Copy link
Copy Markdown
Author

@sapols sapols Apr 9, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. Moved to constants.py under the new name L1C_CADENCE_TOLERANCE (to make clear that it applies to cadence matching generally, not just gap detection) and I added this comment:

# Relative tolerance (7.5%) for L1C cadence checks; allows small clock-drift
# variation around the expected cadence before a spacing is treated as a gap.

(If you like L1C_GAP_TOLERANCE better we can use it just lmk)



def mag_l1c(
first_input_dataset: xr.Dataset,
Expand Down Expand Up @@ -461,6 +463,8 @@ def interpolate_gaps(
6-7 - compression flags.
"""
burst_epochs = burst_dataset["epoch"].data
norm_epochs = filled_norm_timeline[:, 0]
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you rename this variable to make it clear it's the full or filled timeline?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Renamed norm_epochs to filled_timeline_epochs

has_norm_context = np.any(filled_norm_timeline[:, 5] == ModeFlags.NORM.value)
# Exclude range values
burst_vectors = burst_dataset["vectors"].data
# Default to two vectors per second
Expand Down Expand Up @@ -497,14 +501,18 @@ def interpolate_gaps(
burst_buffer = int(required_seconds * burst_rate.value)

burst_start = max(0, burst_gap_start - burst_buffer)
burst_end = min(len(burst_epochs) - 1, burst_gap_end + burst_buffer)
burst_end = min(len(burst_epochs), burst_gap_end + burst_buffer + 1)

gap_timeline = filled_norm_timeline[
(filled_norm_timeline > gap[0]) & (filled_norm_timeline < gap[1])
]
gap_timeline = norm_epochs[(norm_epochs > gap[0]) & (norm_epochs < gap[1])]

short_end = burst_epochs[burst_end - 1]
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The name of this variable is kind of confusing to me

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Totally fair. It was called short_end because it’s the shortened upper bound used for the short mask after the optional one-cadence trim in the burst-only fallback, but that name wasn’t clear about what was being shortened or that it was an epoch. So I renamed it to usable_burst_end_epoch.

if not has_norm_context:
# In the burst-only fallback, CIC delay compensation shortens the usable
# filtered range at the trailing edge by roughly one output cadence.
short_end -= int(1e9 / norm_rate.value)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See my comment in the main section about my concerns about this section.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Assuming this refers to your question 3, I replied above. Happy to expand here too if useful.


short = (gap_timeline >= burst_epochs[burst_start]) & (
gap_timeline <= burst_epochs[burst_end]
gap_timeline <= short_end
)
num_short = int(short.sum())

Expand All @@ -524,7 +532,7 @@ def interpolate_gaps(

# gaps should not have data in timeline, still check it
for index, timestamp in enumerate(adjusted_gap_timeline):
timeline_index = np.searchsorted(filled_norm_timeline[:, 0], timestamp)
timeline_index = np.searchsorted(norm_epochs, timestamp)
if sum(
filled_norm_timeline[timeline_index, 1:4]
) == 0 and burst_gap_start + index < len(burst_vectors):
Expand All @@ -542,13 +550,12 @@ def interpolate_gaps(
missing_timeline = np.setdiff1d(gap_timeline, adjusted_gap_timeline)

for timestamp in missing_timeline:
timeline_index = np.searchsorted(filled_norm_timeline[:, 0], timestamp)
timeline_index = np.searchsorted(norm_epochs, timestamp)
if filled_norm_timeline[timeline_index, 5] != ModeFlags.MISSING.value:
raise RuntimeError(
"Self-inconsistent data. "
"Gaps not included in final timeline should be missing."
)
np.delete(filled_norm_timeline, timeline_index)

return filled_norm_timeline

Expand All @@ -557,8 +564,8 @@ def generate_timeline(epoch_data: np.ndarray, gaps: np.ndarray) -> np.ndarray:
"""
Generate a new timeline from existing, gap-filled timeline and gaps.

The gaps are generated at a .5 second cadence, regardless of the cadence of the
existing data.
The gaps are generated at the cadence implied by the gap rate. If no rate is
provided, a default cadence of 0.5 seconds is used.

Parameters
----------
Expand All @@ -573,7 +580,8 @@ def generate_timeline(epoch_data: np.ndarray, gaps: np.ndarray) -> np.ndarray:
numpy.ndarray
The new timeline, filled with the existing data and the generated gaps.
"""
full_timeline: np.ndarray = np.array([])
epoch_data = np.asarray(epoch_data)
full_timeline: np.ndarray = np.array([], dtype=epoch_data.dtype)
last_index = 0
for gap in gaps:
epoch_start_index = np.searchsorted(epoch_data, gap[0], side="left")
Expand All @@ -582,6 +590,7 @@ def generate_timeline(epoch_data: np.ndarray, gaps: np.ndarray) -> np.ndarray:
)
generated_timestamps = generate_missing_timestamps(gap)
if generated_timestamps.size == 0:
last_index = int(np.searchsorted(epoch_data, gap[1], side="left"))
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice

continue

# Remove any generated timestamps that are already in the timeline
Expand Down Expand Up @@ -639,37 +648,48 @@ def find_all_gaps(
specified as (start, end, vector_rate) where start and end both exist in the
timeline.
"""
gaps: np.ndarray = np.zeros((0, 3))
gaps: np.ndarray = np.empty((0, 3), dtype=np.int64)

# TODO: when we go back to the previous file, also retrieve expected
# vectors per second

vecsec_dict = {0: VecSec.TWO_VECS_PER_S.value} | (vecsec_dict or {})

end_index = epoch_data.shape[0]
rate_segments = _find_rate_segments(epoch_data, vecsec_dict)
if rate_segments:
first_rate = rate_segments[0][1]
last_rate = rate_segments[-1][1]
else:
default_rate = next(iter(vecsec_dict.values()))
first_rate = default_rate
last_rate = default_rate

if start_of_day_ns is not None and epoch_data[0] > start_of_day_ns:
# Add a gap from the start of the day to the first timestamp
gaps = np.concatenate(
(gaps, np.array([[start_of_day_ns, epoch_data[0], vecsec_dict[0]]]))
)

for start_time in reversed(sorted(vecsec_dict.keys())):
# Find the start index that is equal to or immediately after start_time
start_index = np.searchsorted(epoch_data, start_time, side="left")
gaps = np.concatenate(
(
find_gaps(
epoch_data[start_index : end_index + 1], vecsec_dict[start_time]
),
gaps,
np.array(
[[start_of_day_ns, epoch_data[0], first_rate]], dtype=np.int64
),
)
)
end_index = start_index

for index, (start_index, vectors_per_second) in enumerate(rate_segments):
next_start_index = (
rate_segments[index + 1][0]
if index + 1 < len(rate_segments)
else epoch_data.shape[0] - 1
)
epoch_slice = epoch_data[start_index : next_start_index + 1]
gaps = np.concatenate((gaps, find_gaps(epoch_slice, vectors_per_second)))

if end_of_day_ns is not None and epoch_data[-1] < end_of_day_ns:
gaps = np.concatenate(
(gaps, np.array([[epoch_data[-1], end_of_day_ns, vecsec_dict[start_time]]]))
(
gaps,
np.array([[epoch_data[-1], end_of_day_ns, last_rate]], dtype=np.int64),
)
)

return gaps
Expand All @@ -696,14 +716,19 @@ def find_gaps(timeline_data: np.ndarray, vectors_per_second: int) -> np.ndarray:
end_gap, as well as vectors_per_second. Start_gap and end_gap both correspond
to points in timeline_data.
"""
if timeline_data.shape[0] < 2:
return np.empty((0, 3), dtype=np.int64)

# Expected difference between timestamps in nanoseconds.
expected_gap = 1 / vectors_per_second * 1e9

diffs = abs(np.diff(timeline_data))

# Gap can be up to 7.5% larger than expected vectors per second due to clock drift
gap_index = np.asarray(diffs - expected_gap > expected_gap * 0.075).nonzero()[0]
output: np.ndarray = np.zeros((len(gap_index), 3))
gap_index = np.asarray(
diffs - expected_gap > expected_gap * GAP_TOLERANCE
).nonzero()[0]
output: np.ndarray = np.zeros((len(gap_index), 3), dtype=np.int64)

for index, gap in enumerate(gap_index):
output[index, :] = [
def generate_missing_timestamps(gap: np.ndarray) -> np.ndarray:
    """
    Generate new timestamps to fill one gap.

    The gap is filled with timestamps at the gap's declared rate. If the gap does
    not include a rate, the default cadence is 0.5 seconds.

    Parameters
    ----------
    gap : numpy.ndarray
        The gap to fill, given as ``(start, end)`` or ``(start, end, rate)``.
        Start and end are epoch times in nanoseconds; rate is in vectors per
        second.

    Returns
    -------
    output : numpy.ndarray
        Generated int64 nanosecond timestamps spanning ``[start, end)`` at the
        selected cadence; empty when start >= end.
    """
    # Support both legacy (start, end) gaps, which use the historical 0.5 s
    # cadence, and newer (start, end, rate) gaps, which use the declared cadence.
    difference_ns = int(0.5 * 1e9)
    if len(gap) > 2:
        difference_ns = int(1e9 / int(gap[2]))
    # Round float inputs to the nearest nanosecond so the generated timeline is
    # exact integer nanoseconds.
    output: np.ndarray = np.arange(
        int(np.rint(gap[0])),
        int(np.rint(gap[1])),
        difference_ns,
        dtype=np.int64,
    )
    return output


def _is_expected_rate(timestamp_difference: float, vectors_per_second: int) -> bool:
    """
    Check whether an observed timestamp spacing matches an expected cadence.

    Parameters
    ----------
    timestamp_difference : float
        Observed spacing between two adjacent timestamps, in nanoseconds.
    vectors_per_second : int
        Expected vectors-per-second rate for the cadence being checked.

    Returns
    -------
    bool
        True if the spacing deviates from the nominal cadence by no more than
        ``GAP_TOLERANCE`` (a relative tolerance).
    """
    # Nominal spacing in nanoseconds implied by the vectors-per-second rate.
    nominal_spacing_ns = 1 / vectors_per_second * 1e9
    deviation = abs(timestamp_difference - nominal_spacing_ns)
    return deviation <= nominal_spacing_ns * GAP_TOLERANCE


def _find_rate_segments(
    epoch_data: np.ndarray, vecsec_dict: dict[int, int]
) -> list[tuple[int, int]]:
    """
    Build contiguous rate segments using observed cadence near each transition.

    Each configured transition is walked backward while the observed spacing
    already matches the new rate, so gaps stay attached to the correct segment
    instead of producing spurious single-sample micro-gaps at delayed Config
    boundaries.

    Parameters
    ----------
    epoch_data : numpy.ndarray
        Sorted epoch timestamps for the current timeline, in nanoseconds.
    vecsec_dict : dict[int, int]
        Mapping of transition start time to expected vectors-per-second rate.

    Returns
    -------
    list[tuple[int, int]]
        ``(start_index, vectors_per_second)`` pairs describing contiguous rate
        segments in ``epoch_data``.
    """
    n_epochs = epoch_data.shape[0]
    if n_epochs == 0:
        return []

    segments: list[tuple[int, int]] = []
    for transition_time, rate in sorted(vecsec_dict.items()):
        # Nominal transition index, clamped into the valid index range.
        index = int(np.searchsorted(epoch_data, transition_time, side="left"))
        index = min(index, n_epochs - 1)
        # Never walk back past the previous segment's start.
        floor = segments[-1][0] if segments else 0

        # Walk backward while the observed spacing already matches the new rate.
        while index > floor:
            spacing = epoch_data[index] - epoch_data[index - 1]
            if not _is_expected_rate(spacing, rate):
                break
            index -= 1

        if segments and segments[-1][0] == index:
            # This transition collapsed onto the previous segment's start:
            # the newer rate replaces the older one.
            segments[-1] = (index, rate)
        else:
            segments.append((index, rate))

    return segments


def vectors_per_second_from_string(vecsec_string: str) -> dict:
"""
Extract the vectors per second from a string into a dictionary.
Expand Down
Loading
Loading