|
17 | 17 | from collections import OrderedDict, defaultdict |
18 | 18 | import logging |
19 | 19 | from pathlib import Path |
20 | | - |
| 20 | +import warnings |
21 | 21 | import numpy as np |
22 | 22 |
|
23 | 23 |
|
@@ -81,7 +81,9 @@ def load_xdf( |
81 | 81 | clock_reset_threshold_offset_seconds=1, |
82 | 82 | clock_reset_threshold_offset_stds=10, |
83 | 83 | winsor_threshold=0.0001, |
84 | | - verbose=None |
| 84 | + verbose=None, |
| 85 | + sync_timestamps=False, |
| 86 | + overlap_timestamps=False, |
85 | 87 | ): |
86 | 88 | """Import an XDF file. |
87 | 89 |
|
@@ -122,6 +124,27 @@ def load_xdf( |
122 | 124 | dejitter_timestamps : Whether to perform jitter removal for regularly |
123 | 125 | sampled streams. (default: true) |
124 | 126 |
|
 | 127 | +        sync_timestamps: {bool, str}
| 128 | + sync timestamps of all streams sample-wise with the stream to the |
| 129 | + highest effective sampling rate. Using sync_timestamps with any |
| 130 | + method other than linear has dependency on scipy, which is not a |
| 131 | + hard requirement of pyxdf. If scipy is not installed in your |
| 132 | + environment, the method supports linear interpolation with |
| 133 | + numpy. |
| 134 | +
|
| 135 | + False -> no syncing |
| 136 | + True -> linear syncing |
 | 137 | +        str: <'linear', 'nearest', 'zero', 'slinear', 'quadratic', 'cubic',
 | 138 | +        'previous', 'next'> for the method inherited from
 | 139 | +        scipy.interpolate.interp1d.
| 140 | +
|
 | 141 | +    overlap_timestamps: bool
 | 142 | +        If true, return only overlapping streams, i.e. all streams
 | 143 | +        are limited to periods where all streams have data. (default: False)
 | 144 | +        If false, behavior depends on whether sync_timestamps is set. If set,
 | 145 | +        it expands all streams to include the earliest and latest timestamp of
 | 146 | +        any stream; if not set, it simply returns the streams independently.
 | 147 | +
125 | 148 | on_chunk : Function that is called for each chunk of data as it is |
126 | 149 | being retrieved from the file; the function is allowed to modify |
127 | 150 | the data (for example, sub-sample it). The four input arguments |
@@ -402,6 +425,17 @@ def load_xdf( |
402 | 425 | stream["time_series"] = tmp.time_series |
403 | 426 | stream["time_stamps"] = tmp.time_stamps |
404 | 427 |
|
| 428 | + # sync sampling with the fastest timeseries by interpolation / shifting |
| 429 | + if sync_timestamps: |
| 430 | + if type(sync_timestamps) is not str: |
| 431 | + sync_timestamps = "linear" |
| 432 | + logger.warning('sync_timestamps defaults to "linear"') |
| 433 | + streams = _sync_timestamps(streams, kind=sync_timestamps) |
| 434 | + |
| 435 | + # limit streams to their overlapping periods |
| 436 | + if overlap_timestamps: |
| 437 | + streams = _limit_streams_to_overlap(streams) |
| 438 | + |
405 | 439 | streams = [s for s in streams.values()] |
406 | 440 | return streams, fileheader |
407 | 441 |
|
@@ -501,7 +535,7 @@ def _xml2dict(t): |
501 | 535 |
|
502 | 536 | def _scan_forward(f): |
503 | 537 | """Scan forward through file object until after the next boundary chunk.""" |
504 | | - blocklen = 2 ** 20 |
| 538 | + blocklen = 2**20 |
505 | 539 | signature = bytes( |
506 | 540 | [ |
507 | 541 | 0x43, |
@@ -859,3 +893,194 @@ def _read_chunks(f): |
859 | 893 | def _parse_streamheader(xml): |
860 | 894 | """Parse stream header XML.""" |
861 | 895 | return {el.tag: el.text for el in xml if el.tag != "desc"} |
| 896 | + |
| 897 | + |
| 898 | +def _interpolate( |
| 899 | + x: np.ndarray, y: np.ndarray, new_x: np.ndarray, kind="linear" |
| 900 | +) -> np.ndarray: |
| 901 | + """Perform interpolation for _sync_timestamps |
| 902 | +
|
| 903 | + If scipy is not installed, the method falls back to numpy, and then only |
| 904 | + supports linear interpolation. |
| 905 | + """ |
| 906 | + try: |
| 907 | + from scipy.interpolate import interp1d |
| 908 | + |
| 909 | + f = interp1d( |
| 910 | + x, |
| 911 | + y, |
| 912 | + kind=kind, |
| 913 | + axis=0, |
| 914 | + assume_sorted=True, # speed up |
| 915 | + bounds_error=False, |
| 916 | + ) |
| 917 | + return f(new_x) |
| 918 | + except ImportError as e: |
| 919 | + if kind != "linear": |
| 920 | + raise e |
| 921 | + else: |
| 922 | + return np.interp(new_x, xp=x, fp=y, left=np.NaN, right=np.NaN) |
| 923 | + |
| 924 | + |
def _sync_timestamps(streams, kind="linear"):
    """Sync all streams to the fastest sampling rate by shifting or upsampling.

    Depending on a stream's channel format, extrapolation is performed
    with NaNs (numerical formats) or with [''] (string format).

    Interpolation is only performed for numeric values, using the kind
    argument which is inherited from scipy.interpolate.interp1d. If the
    channel format is an integer type (i.e. 'int8', 'int16', 'int32', or
    'int64'), integer output is enforced by rounding the values.
    Additionally, consider that not all interpolation methods are convex,
    i.e. for some kinds you might receive values above or below the range
    of the desired integer type. There is no correction implemented for
    this, as it is assumed this is a desired behavior if you give the
    appropriate argument.

    For string formats, events are shifted towards the nearest feasible
    timepoint. Any time-stamps without a marker then get assigned an empty
    marker, i.e. [''].

    Parameters
    ----------
    streams : dict
        Mapping of stream-id -> stream dict; each stream needs "info"
        (with "effective_srate" and "channel_format"), "time_stamps" and
        "time_series" entries. Mutated in place.
    kind : str
        Interpolation kind passed through to _interpolate.

    Returns
    -------
    dict
        The same mapping, with every stream resampled onto the common
        timestamp grid of the fastest stream.
    """
    # selecting the stream with the highest effective sampling rate
    srate_key = "effective_srate"
    srates = [stream["info"][srate_key] for stream in streams.values()]
    max_fs = max(srates, default=0)

    if max_fs == 0:  # either no valid stream or all streams are async
        return streams
    if srates.count(max_fs) > 1:
        # highly unlikely, with floating point precision and sampling noise
        # but anyways: better safe than sorry
        logger.warning(
            "I found at least two streams with identical effective "
            "srate. I select one at random for syncing timestamps."
        )

    # selecting maximal time range of the whole recording;
    # streams with fs=0 might not be dejittered by default, and therefore
    # indexing first and last might miss the earliest/latest sample —
    # we therefore take the min and max timestamp
    stamps = [stream["time_stamps"] for stream in streams.values()]
    ts_first = min((min(s) for s in stamps))
    ts_last = max((max(s) for s in stamps))

    # generate new timestamps
    # based on extrapolation of the fastest stream's timestamps towards the
    # maximal time range of the whole recording
    fs_step = 1.0 / max_fs
    # srates.index(max_fs) picks the first stream at max_fs; with ties the
    # choice is arbitrary (warned above)
    new_timestamps = stamps[srates.index(max_fs)]
    num_steps = int((new_timestamps[0] - ts_first) / fs_step) + 1
    front_stamps = np.linspace(ts_first, new_timestamps[0], num_steps)
    num_steps = int((ts_last - new_timestamps[-1]) / fs_step) + 1
    end_stamps = np.linspace(new_timestamps[-1], ts_last, num_steps)

    # front/end segments each include the boundary sample, so drop the
    # first/last of the original grid to avoid duplicates
    new_timestamps = np.concatenate(
        (front_stamps, new_timestamps[1:-1], end_stamps), axis=0
    )

    # interpolate or shift all streams to the new timestamps
    for stream in streams.values():
        channel_format = stream["info"]["channel_format"][0]

        if (channel_format == "string") and (stream["info"][srate_key] == 0):
            # you can't really interpolate strings; and streams with srate=0
            # don't have a real sampling rate. One approach to sync them is to
            # shift their events to the nearest timestamp of the new
            # timestamps
            shifted_x = []
            for x in stream["time_stamps"]:
                argmin = (abs(new_timestamps - x)).argmin()
                shifted_x.append(new_timestamps[argmin])

            shifted_y = []
            for x in new_timestamps:
                try:
                    # NOTE(review): .index(x) finds only the FIRST marker
                    # shifted onto x — if two markers collapse onto the same
                    # grid point, later ones are silently dropped; confirm
                    # this is acceptable
                    idx = shifted_x.index(x)
                    y = stream["time_series"][idx]
                    shifted_y.append([y])
                except ValueError:
                    # no marker at this grid point -> empty marker
                    shifted_y.append([""])

            stream["time_series"] = np.asanyarray((shifted_y))
            stream["time_stamps"] = new_timestamps

        elif channel_format in [
            "float32",
            "double64",
            "int8",
            "int16",
            "int32",
            "int64",
        ]:
            # continuous interpolation is possible using interp1d;
            # discrete interpolation requires some finetuning below.
            # bounds_error=False replaces everything outside of interpolation
            # zone with NaNs
            y = stream["time_series"]
            x = stream["time_stamps"]

            stream["time_series"] = _interpolate(x, y, new_timestamps, kind)
            stream["time_stamps"] = new_timestamps

            if channel_format in ["int8", "int16", "int32", "int64"]:
                # i am stuck with float64s, as integers have no nans
                # therefore i round to the nearest integer instead
                stream["time_series"] = np.around(stream["time_series"], 0)
        elif (channel_format == "string") and (stream["info"][srate_key] != 0):
            # regularly sampled string streams can be neither interpolated
            # nor nearest-shifted; stream is left untouched
            warnings.warn(
                "Can't interpolate a channel with channel format string and an effective sampling rate != 0"
            )
        else:
            raise NotImplementedError(
                "Don't know how to sync sampling for "
                "channel_format="
                "{}".format(channel_format)
            )
        # NOTE(review): effective_srate is overwritten even for the warned
        # (untouched) string streams above — verify this is intended
        stream["info"]["effective_srate"] = max_fs

    return streams
| 1042 | + |
| 1043 | + |
| 1044 | +def _limit_streams_to_overlap(streams): |
| 1045 | + """takes streams, returns streams limited to time periods overlapping |
| 1046 | + between all streams |
| 1047 | +
|
| 1048 | + The overlapping periods start and end for each streams with the first and |
| 1049 | + last sample completely within the overlapping period. |
| 1050 | + If time_stamps have been synced, these are the same time-points for all |
| 1051 | + streams. Consider that in the case of unsynced time-stamps, the time-stamps |
| 1052 | + can not be exactly equal! |
| 1053 | + """ |
| 1054 | + ts_first, ts_last = [], [] |
| 1055 | + for stream in streams.values(): |
| 1056 | + # skip streams with fs=0 or if they send strings, because they might |
| 1057 | + # just not yet have send anything on purpose (i.e. markers) |
| 1058 | + # while other data was already being recorded. |
| 1059 | + if ( |
| 1060 | + stream["info"]["effective_srate"] != 0 |
| 1061 | + and stream["info"]["channel_format"][0] != "string" |
| 1062 | + ): |
| 1063 | + # extrapolation in _sync_timestamps is done with NaNs |
| 1064 | + not_extrapolated = np.where(~np.isnan(stream["time_series"]))[0] |
| 1065 | + ts_first.append(min(stream["time_stamps"][not_extrapolated])) |
| 1066 | + ts_last.append(max(stream["time_stamps"][not_extrapolated])) |
| 1067 | + |
| 1068 | + ts_first = max(ts_first) |
| 1069 | + ts_last = min(ts_last) |
| 1070 | + for stream in streams.values(): |
| 1071 | + # use np.around to prevent floating point hickups |
| 1072 | + around = np.around(stream["time_stamps"], 15) |
| 1073 | + a = np.where(around >= ts_first)[0] |
| 1074 | + b = np.where(around <= ts_last)[0] |
| 1075 | + select = np.intersect1d(a, b) |
| 1076 | + if type(stream["time_stamps"]) is list: |
| 1077 | + stream["time_stamps"] = [stream["time_stamps"][s] for s in select] |
| 1078 | + else: |
| 1079 | + stream["time_stamps"] = stream["time_stamps"][select] |
| 1080 | + |
| 1081 | + if type(stream["time_series"]) is list: |
| 1082 | + stream["time_series"] = [stream["time_series"][s] for s in select] |
| 1083 | + else: |
| 1084 | + stream["time_series"] = stream["time_series"][select] |
| 1085 | + |
| 1086 | + return streams |
0 commit comments