11# mypy: ignore-errors
22"""Module containing interpolation methods for MAG L1C."""
33
4+ import logging
45from enum import Enum
6+ from typing import Optional
57
68import numpy as np
79from scipy .interpolate import make_interp_spline
10+ from scipy .signal import lfilter
11+
12+ from imap_processing .mag .constants import POSSIBLE_RATES , VecSec
13+
14+ logger = logging .getLogger (__name__ )
815
916
1017def linear (
1118 input_vectors : np .ndarray ,
1219 input_timestamps : np .ndarray ,
1320 output_timestamps : np .ndarray ,
21+ input_rate : Optional [VecSec ] = None ,
22+ output_rate : Optional [VecSec ] = None ,
1423) -> np .ndarray :
1524 """
1625 Linear interpolation of input vectors to output timestamps.
@@ -25,6 +34,10 @@ def linear(
2534 vectors.
2635 output_timestamps : numpy.ndarray
2736 Output timestamps of shape (m,) to generate interpolated vectors for.
37+ input_rate : VecSec, optional
38+ Not required for this interpolation method.
39+ output_rate : VecSec, optional
40+ Not required for this interpolation method.
2841
2942 Returns
3043 -------
@@ -40,6 +53,8 @@ def quadratic(
4053 input_vectors : np .ndarray ,
4154 input_timestamps : np .ndarray ,
4255 output_timestamps : np .ndarray ,
56+ input_rate : Optional [VecSec ] = None ,
57+ output_rate : Optional [VecSec ] = None ,
4358) -> np .ndarray :
4459 """
4560 Quadratic interpolation of input vectors to output timestamps.
@@ -54,6 +69,10 @@ def quadratic(
5469 vectors.
5570 output_timestamps : numpy.ndarray
5671 Output timestamps of shape (m,) to generate interpolated vectors for.
72+ input_rate : VecSec, optional
73+ Not required for this interpolation method.
74+ output_rate : VecSec, optional
75+ Not required for this interpolation method.
5776
5877 Returns
5978 -------
@@ -69,6 +88,8 @@ def cubic(
6988 input_vectors : np .ndarray ,
7089 input_timestamps : np .ndarray ,
7190 output_timestamps : np .ndarray ,
91+ input_rate : Optional [VecSec ] = None ,
92+ output_rate : Optional [VecSec ] = None ,
7293) -> np .ndarray :
7394 """
7495 Cubic interpolation of input vectors to output timestamps.
@@ -83,6 +104,10 @@ def cubic(
83104 vectors.
84105 output_timestamps : numpy.ndarray
85106 Output timestamps of shape (m,) to generate interpolated vectors for.
107+ input_rate : VecSec, optional
108+ Not required for this interpolation method.
109+ output_rate : VecSec, optional
110+ Not required for this interpolation method.
86111
87112 Returns
88113 -------
@@ -94,10 +119,105 @@ def cubic(
94119 return spline (output_timestamps )
95120
96121
122+ def estimate_rate (timestamps : np .ndarray ) -> VecSec :
123+ """
124+ Given a set of timestamps, estimate the rate of the timestamps.
125+
126+ This rate will be one of the defined rates in the VecSec enum. The calculation
127+ assumes there are no significant gaps in the timestamps.
128+
129+ Parameters
130+ ----------
131+ timestamps : numpy.ndarray
132+ 1D array of timestamps to estimate the rate of.
133+
134+ Returns
135+ -------
136+ VecSec
137+ Estimated rate of the timestamps.
138+ """
139+ samples_per_second = timestamps .shape [0 ] / (timestamps [- 1 ] - timestamps [0 ]) * 1e9
140+ per_second = VecSec (
141+ POSSIBLE_RATES [(np .abs (POSSIBLE_RATES - samples_per_second )).argmin ()]
142+ )
143+
144+ return per_second
145+
146+
147+ def cic_filter (
148+ input_vectors : np .ndarray ,
149+ input_timestamps : np .ndarray ,
150+ output_timestamps : np .ndarray ,
151+ input_rate : Optional [VecSec ],
152+ output_rate : Optional [VecSec ],
153+ ):
154+ """
155+ Apply CIC filter to data before interpolating.
156+
157+ The filtering uses a Cascaded integrator-comb (CIC) filter which is used in FSW to
158+ filter down the raw data to telemetered data.
159+
160+ This assumes that the input_vectors and input_timestamps are downsampled to
161+ the output_timestamps rate. Neither input_timestamps nor output_timestamps should
162+ have significant gaps.
163+
164+ After the CIC filter is applied, the timestamps at the beginning and end of the
165+ output are invalid. Therefore, we must pass in extra values and remove them after
166+ applying the filter. This needs about double the input values to work.
167+
168+ Parameters
169+ ----------
170+ input_vectors : numpy.ndarray
171+ Input vectors of shape (n, 3) where n is equal to the number of input
172+ timestamps. Contains x, y, z components of the vector.
173+ input_timestamps : numpy.ndarray
174+ Input timestamps of shape (n,) which correspond to the timestamps of the input
175+ vectors.
176+ output_timestamps : numpy.ndarray
177+ Output timestamps of shape (m,) to generate interpolated vectors for.
178+ input_rate : VecSec, optional
179+ Expected rate of input timestamps.
180+ output_rate : VecSec, optional
181+ Expected rate of output timestamps.
182+
183+ Returns
184+ -------
185+ input_filtered, vectors_filtered : tuple[numpy.ndarray]
186+ Filtered input timestamps and filtered input vectors.
187+ """
188+ # output rate should always be higher
189+ input_rate = estimate_rate (input_timestamps ) if input_rate is None else input_rate
190+ output_rate = (
191+ estimate_rate (output_timestamps ) if output_rate is None else output_rate
192+ )
193+
194+ if input_rate .value <= output_rate .value :
195+ raise ValueError (
196+ f"Burst mode input rate { input_rate } should never be less than "
197+ f"the normal mode output rate { output_rate } . "
198+ f"Both rates are required"
199+ )
200+
201+ decimation_factor = int (input_rate .value / output_rate .value )
202+ cic1 = np .ones (decimation_factor )
203+ cic1 = cic1 / decimation_factor
204+ cic2 = np .convolve (cic1 , cic1 )
205+ delay = (len (cic2 ) - 1 ) // 2
206+ input_filtered = input_timestamps
207+ if delay != 0 :
208+ input_filtered = input_timestamps [:- delay ]
209+
210+ vectors_filtered = lfilter (cic2 , 1 , input_vectors , axis = 0 )[delay :]
211+
212+ return input_filtered , vectors_filtered
213+
214+
97215def linear_filtered (
98216 input_vectors : np .ndarray ,
99217 input_timestamps : np .ndarray ,
100218 output_timestamps : np .ndarray ,
219+ input_rate : Optional [VecSec ] = None ,
220+ output_rate : Optional [VecSec ] = None ,
101221) -> np .ndarray :
102222 """
103223 Linear filtered interpolation of input vectors to output timestamps.
@@ -112,20 +232,31 @@ def linear_filtered(
112232 vectors.
113233 output_timestamps : numpy.ndarray
114234 Output timestamps of shape (m,) to generate interpolated vectors for.
235+ input_rate : VecSec, optional
236+ Expected rate of input timestamps to be passed into the CIC filter. If not
237+ provided, this will be estimated.
238+ output_rate : VecSec, optional
239+ Expected rate of output timestamps to be passed into the CIC filter. If not
240+ provided, this will be estimated.
115241
116242 Returns
117243 -------
118244 numpy.ndarray
119245 Interpolated vectors of shape (m, 3) where m is equal to the number of output
120246 timestamps. Contains x, y, z components of the vector.
121247 """
122- pass
248+ input_filtered , vectors_filtered = cic_filter (
249+ input_vectors , input_timestamps , output_timestamps , input_rate , output_rate
250+ )
251+ return linear (vectors_filtered , input_filtered , output_timestamps )
123252
124253
125254def quadratic_filtered (
126255 input_vectors : np .ndarray ,
127256 input_timestamps : np .ndarray ,
128257 output_timestamps : np .ndarray ,
258+ input_rate : Optional [VecSec ] = None ,
259+ output_rate : Optional [VecSec ] = None ,
129260) -> np .ndarray :
130261 """
131262 Quadratic filtered interpolation of input vectors to output timestamps.
@@ -140,20 +271,31 @@ def quadratic_filtered(
140271 vectors.
141272 output_timestamps : numpy.ndarray
142273 Output timestamps of shape (m,) to generate interpolated vectors for.
274+ input_rate : VecSec, optional
275+ Expected rate of input timestamps to be passed into the CIC filter. If not
276+ provided, this will be estimated.
277+ output_rate : VecSec, optional
278+ Expected rate of output timestamps to be passed into the CIC filter. If not
279+ provided, this will be estimated.
143280
144281 Returns
145282 -------
146283 numpy.ndarray
147284 Interpolated vectors of shape (m, 3) where m is equal to the number of output
148285 timestamps. Contains x, y, z components of the vector.
149286 """
150- pass
287+ input_filtered , vectors_filtered = cic_filter (
288+ input_vectors , input_timestamps , output_timestamps , input_rate , output_rate
289+ )
290+ return quadratic (vectors_filtered , input_filtered , output_timestamps )
151291
152292
153293def cubic_filtered (
154294 input_vectors : np .ndarray ,
155295 input_timestamps : np .ndarray ,
156296 output_timestamps : np .ndarray ,
297+ input_rate : Optional [VecSec ] = None ,
298+ output_rate : Optional [VecSec ] = None ,
157299) -> np .ndarray :
158300 """
159301 Cubic filtered interpolation of input vectors to output timestamps.
@@ -168,14 +310,23 @@ def cubic_filtered(
168310 vectors.
169311 output_timestamps : numpy.ndarray
170312 Output timestamps of shape (m,) to generate interpolated vectors for.
313+ input_rate : VecSec, optional
314+ Expected rate of input timestamps to be passed into the CIC filter. If not
315+ provided, this will be estimated.
316+ output_rate : VecSec, optional
317+ Expected rate of output timestamps to be passed into the CIC filter. If not
318+ provided, this will be estimated.
171319
172320 Returns
173321 -------
174322 numpy.ndarray
175323 Interpolated vectors of shape (m, 3) where m is equal to the number of output
176324 timestamps. Contains x, y, z components of the vector.
177325 """
178- pass
326+ input_filtered , vectors_filtered = cic_filter (
327+ input_vectors , input_timestamps , output_timestamps , input_rate , output_rate
328+ )
329+ return cubic (vectors_filtered , input_filtered , output_timestamps )
179330
180331
181332class InterpolationFunction (Enum ):
0 commit comments