# neuralynxrawio.py
"""
Class for reading data from Neuralynx files.
This IO supports NCS, NEV, NSE and NTT file formats.
NCS contains the sampled signal for one channel
NEV contains events
NSE contains spikes and waveforms for mono electrodes
NTT contains spikes and waveforms for tetrodes
All Neuralynx files contain a 16 kilobyte text header followed by zero or more fixed-length records.
The format of the header has never been formally specified, however, the Neuralynx programs which
write them have followed a set of conventions which have varied over the years. Additionally,
other programs like Pegasus write files with somewhat varying headers. This variation requires
parsing to determine the exact version and type which is handled within this RawIO by the
NlxHeader class.
Ncs files contain a series of 1044 byte records, each of which contains 512 16-bit samples and
header information which includes a 64-bit timestamp in microseconds, a 32-bit channel number,
the sampling frequency in integral Hz, and a count of how many of the 512 samples are considered
valid (the remaining samples within the record are invalid). The Ncs file header usually
contains a specification of the sampling frequency, which may be rounded to an integral number
of Hz or may be fractional. The actual sampling frequency in terms of the underlying clock is
physically determined by the spacing of the timestamps between records.
These variations of header format and possible differences between the stated sampling frequency
and actual sampling frequency can create apparent time discrepancies in .Ncs files. Additionally,
the Neuralynx recording software can start and stop recording while continuing to write samples
to a single .Ncs file, which creates larger gaps in the time sequences of the samples.
This RawIO attempts to correct for these deviations where possible and present a single section of
contiguous samples with one sampling frequency, t_start, and length for each .Ncs file. These
sections are determined by the NcsSectionsFactory class. When the gaps between sections are
too large to bridge, this RawIO provides only the samples from the first section as belonging
to one Segment.
If .Ncs files are loaded these determine the Segments of data to be loaded. Events and spiking data
outside of Segments defined by .Ncs files will be ignored. To access all time point data in a
single Segment load a session excluding .Ncs files.
Continuous data streams are ordered by descending sampling rate.
This RawIO presents only a single Block.
Author: Julia Sprenger, Carlos Canova, Samuel Garcia, Peter N. Steinmetz.
"""
from ..baserawio import (
BaseRawIO,
_signal_channel_dtype,
_signal_stream_dtype,
_signal_buffer_dtype,
_spike_channel_dtype,
_event_channel_dtype,
)
import numpy as np
import os
from pathlib import Path
import copy
import warnings
from collections import namedtuple, OrderedDict
from neo.rawio.neuralynxrawio.ncssections import NcsSection, NcsSectionsFactory
from neo.rawio.neuralynxrawio.nlxheader import NlxHeader
# Named tuple for stream identification keys
StreamKey = namedtuple("StreamKey", ["sampling_rate", "input_range", "filter_params"])
class NeuralynxRawIO(BaseRawIO):
"""
Class for reading datasets recorded by Neuralynx.
This version works with rawmode of one-dir for a single directory of files or one-file for
a single file.
Parameters
----------
dirname: str, default: ''
Name of directory containing all files for a dataset. If provided, filename is
ignored. But one of either dirname or filename is required.
include_filenames: str | list | None, default: None
Name of a single ncs, nse, nev or ntt file, or a list of such files. Only file names
in the list will be included. These can be plain filenames, full paths, or paths
relative to dirname.
All files must be located in a single directory.
exclude_filenames: str | list | None, default: None
Name of a single ncs, nse, nev or ntt file, or a list of such files, to exclude.
Expects plain filenames (without directory path).
If None, no files are excluded and all file types are searched for.
keep_original_times: bool, default: False
If True, keep the original start time as stored in the files;
otherwise, time 0 is set to the first time in the dataset.
gap_tolerance_ms : float | None, default: None
Controls how timestamp gaps in NCS files are handled.
If None (default), a ValueError is raised when gaps are detected, with a
detailed gap report showing the number, size, and location of each gap.
If a float value is provided, gaps smaller than this threshold (in milliseconds)
are ignored, and gaps larger than this threshold create new segments.
Use gap_tolerance_ms=0.0 to segment on all detected gaps.
strict_gap_mode : bool | None, default: None
.. deprecated::
Use ``gap_tolerance_ms`` instead. Will be removed in version 0.16.
If explicitly set, uses legacy gap detection behavior:
strict_gap_mode=True uses tight tolerance (0.2 sample intervals),
strict_gap_mode=False uses loose tolerance (quarter of 512-sample packet).
Notes
-----
* This IO supports NCS, NEV, NSE and NTT file formats (not NVT or NRD yet)
* These variations of header format and possible differences between the stated sampling frequency
and actual sampling frequency can create apparent time discrepancies in .Ncs files. Additionally,
the Neuralynx recording software can start and stop recording while continuing to write samples
to a single .Ncs file, which creates larger gaps in the time sequences of the samples.
* This RawIO attempts to correct for these deviations where possible and present a single section of
contiguous samples with one sampling frequency, t_start, and length for each .Ncs file. These
sections are determined by the NcsSectionsFactory class. When the gaps between sections are
too large to bridge, this RawIO provides only the samples from the first section as belonging
to one Segment.
Examples
--------
>>> import neo.rawio
>>> reader = neo.rawio.NeuralynxRawIO(dirname='Cheetah_v5.5.1/original_data')
>>> reader.parse_header()
Inspect all files in the directory.
>>> print(reader)
Display all information about signal channels, units, segment size....
"""
extensions = ["nse", "ncs", "nev", "ntt", "nvt", "nrd"] # nvt and nrd are not yet supported
rawmode = "one-dir"
_ncs_dtype = [
("timestamp", "uint64"),
("channel_id", "uint32"),
("sample_rate", "uint32"),
("nb_valid", "uint32"),
("samples", "int16", NcsSection._RECORD_SIZE),
]
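As a standalone sanity check (a sketch, independent of this class), the dtype above describes a 20-byte per-record header followed by 512 two-byte samples, which matches the 1044-byte record size stated in the module docstring:

```python
import numpy as np

# Same field layout as NeuralynxRawIO._ncs_dtype: 8 + 4 + 4 + 4 header bytes,
# then 512 int16 samples (2 bytes each).
ncs_dtype = np.dtype(
    [
        ("timestamp", "uint64"),
        ("channel_id", "uint32"),
        ("sample_rate", "uint32"),
        ("nb_valid", "uint32"),
        ("samples", "int16", 512),
    ]
)

assert ncs_dtype.itemsize == 1044  # 20-byte header + 1024 bytes of samples
```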
# Filter parameter keys used for stream differentiation
_filter_keys = [
"DSPLowCutFilterEnabled",
"DspLowCutFrequency",
"DspLowCutFilterType",
"DspLowCutNumTaps",
"DSPHighCutFilterEnabled",
"DspHighCutFrequency",
"DspHighCutFilterType",
"DspHighCutNumTaps",
"DspDelayCompensation",
"DspFilterDelay_µs",
]
def __init__(
self,
dirname="",
include_filenames=None,
exclude_filenames=None,
keep_original_times=False,
gap_tolerance_ms=None,
strict_gap_mode=None,
filename=None,
exclude_filename=None,
**kargs,
):
if not dirname:
raise ValueError("`dirname` cannot be empty.")
if filename is not None:
include_filenames = [filename]
warnings.warn(
"`filename` is deprecated and will be removed in version 1.0. Please use `include_filenames` instead"
)
if exclude_filename is not None:
if isinstance(exclude_filename, str):
exclude_filenames = [exclude_filename]
else:
exclude_filenames = exclude_filename
warnings.warn(
"`exclude_filename` is deprecated and will be removed in version 1.0. Please use `exclude_filenames` instead"
)
if include_filenames is None:
include_filenames = []
elif isinstance(include_filenames, str):
include_filenames = [include_filenames]
if exclude_filenames is None:
exclude_filenames = []
elif isinstance(exclude_filenames, str):
exclude_filenames = [exclude_filenames]
if include_filenames:
self.rawmode = "multiple-files"
else:
self.rawmode = "one-dir"
# Handle gap_tolerance_ms and deprecated strict_gap_mode
if strict_gap_mode is not None:
warnings.warn(
"`strict_gap_mode` is deprecated and will be removed in version 0.16. "
"Use `gap_tolerance_ms` instead to control gap handling. "
"See issue #1773 for details.",
DeprecationWarning,
stacklevel=2,
)
if gap_tolerance_ms is not None:
warnings.warn(
"Both `gap_tolerance_ms` and `strict_gap_mode` were provided. "
"`gap_tolerance_ms` takes precedence.",
UserWarning,
stacklevel=2,
)
self._use_legacy_gap_mode = gap_tolerance_ms is None
else:
self._use_legacy_gap_mode = False
self.dirname = dirname
self.include_filenames = include_filenames
self.exclude_filenames = exclude_filenames
self.keep_original_times = keep_original_times
self.gap_tolerance_ms = gap_tolerance_ms
self.strict_gap_mode = strict_gap_mode if strict_gap_mode is not None else True
BaseRawIO.__init__(self, **kargs)
def _source_name(self):
return self.dirname
def _parse_header(self):
stream_channels = []
signal_channels = []
spike_channels = []
event_channels = []
self.ncs_filenames = OrderedDict() # (chan_name, chan_id): filename
self.nse_ntt_filenames = OrderedDict() # (chan_name, chan_id): filename
self.nev_filenames = OrderedDict() # chan_id: filename
self.file_headers = OrderedDict() # filename: file header dict
self._nev_memmap = {}
self._spike_memmap = {}
self.internal_unit_ids = [] # channel_index > ((channel_name, channel_id), unit_id)
self.internal_event_ids = []
self._empty_ncs = [] # this list contains filenames of empty files
self._empty_nev = []
self._empty_nse_ntt = []
# Explore the directory looking for ncs, nev, nse and ntt
# and construct channels headers.
signal_annotations = []
unit_annotations = []
event_annotations = []
# 1) Get file paths based on mode and validate existence for multiple-files mode
if self.rawmode == "multiple-files":
# For multiple-files mode, validate that all explicitly provided files exist
file_paths = []
for filename in self.include_filenames:
full_path = Path(self.dirname) / filename
if not full_path.is_file():
raise ValueError(
f"Provided filename is not a file: {full_path}. "
f"If you want to provide a directory use the `dirname` keyword"
)
file_paths.append(full_path)
else: # one-dir mode
# For one-dir mode, get all files from directory
dir_path = Path(self.dirname)
file_paths = [p for p in dir_path.iterdir() if p.is_file()]
file_paths = sorted(file_paths, key=lambda p: p.name)
# 2) Filter by exclude filenames
file_paths = [fp for fp in file_paths if fp.name not in self.exclude_filenames]
# 3) Filter to keep only files with correct extensions
# Note: suffix[1:] removes the leading dot from file extension (e.g., ".ncs" -> "ncs")
valid_file_paths = [fp for fp in file_paths if fp.suffix[1:].lower() in self.extensions]
# Convert back to strings for backwards compatibility with existing code
full_filenames = [str(fp) for fp in valid_file_paths]
stream_props = {}  # {StreamKey: {"stream_id": int, "filenames": [...], "channels": set()}}
for filename in full_filenames:
_, ext = os.path.splitext(filename)
ext = ext[1:].lower() # remove dot and make lower case
# Skip Ncs files with only header. Other empty file types
# will have an empty dataset constructed later.
if (os.path.getsize(filename) <= NlxHeader.HEADER_SIZE) and ext in ["ncs"]:
self._empty_ncs.append(filename)
continue
# All files have more or less the same header structure
info = NlxHeader(filename)
self.file_headers[filename] = info
chan_names = info["channel_names"]
chan_ids = info["channel_ids"]
for idx, chan_id in enumerate(chan_ids):
chan_name = chan_names[idx]
chan_uid = (chan_name, str(chan_id))
if ext == "ncs":
# Calculate gain for this channel
gain = info["bit_to_microVolt"][idx]
if info.get("input_inverted", False):
gain *= -1
# Build stream key from acquisition parameters
sampling_rate = float(info["sampling_rate"])
# Get InputRange for this specific channel
# Normalized by NlxHeader to always be a list
input_range = info.get("InputRange")
if isinstance(input_range, list):
input_range = input_range[idx] if idx < len(input_range) else input_range[0]
# Build filter parameters tuple
filter_params = []
for key in self._filter_keys:
if key in info:
filter_params.append((key, info[key]))
# Create stream key (channels with same key go in same stream)
stream_key = StreamKey(
sampling_rate=sampling_rate,
input_range=input_range,
filter_params=tuple(sorted(filter_params)),
)
if stream_key not in stream_props:
stream_props[stream_key] = {
"stream_id": len(stream_props),
"filenames": [filename],
"channels": set(),
}
else:
stream_props[stream_key]["filenames"].append(filename)
stream_id = stream_props[stream_key]["stream_id"]
stream_props[stream_key]["channels"].add((chan_name, str(chan_id)))
# @zach @ramon : we need to discuss this split by channel buffer
buffer_id = ""
# a sampled signal channel
units = "uV"
offset = 0.0
signal_channels.append(
(
chan_name,
str(chan_id),
info["sampling_rate"],
"int16",
units,
gain,
offset,
stream_id,
buffer_id,
)
)
self.ncs_filenames[chan_uid] = filename
keys = [
"DspFilterDelay_µs",
"recording_opened",
"FileType",
"DspDelayCompensation",
"recording_closed",
"DspLowCutFilterType",
"HardwareSubSystemName",
"DspLowCutNumTaps",
"DSPLowCutFilterEnabled",
"HardwareSubSystemType",
"DspHighCutNumTaps",
"ADMaxValue",
"DspLowCutFrequency",
"DSPHighCutFilterEnabled",
"RecordSize",
"InputRange",
"DspHighCutFrequency",
"input_inverted",
"NumADChannels",
"DspHighCutFilterType",
]
d = {k: info[k] for k in keys if k in info}
signal_annotations.append(d)
elif ext in ("nse", "ntt"):
# nse and ntt are pretty similar except for the waveform shape.
# A file can contain several unit_id (so several unit channel).
assert chan_uid not in self.nse_ntt_filenames, "Several nse or ntt files contain the same channel (name, id)!"
self.nse_ntt_filenames[chan_uid] = filename
data = self._get_file_map(filename)
self._spike_memmap[chan_uid] = data
unit_ids = np.unique(data["unit_id"])
for unit_id in unit_ids:
# a spike channel for each (chan_id, unit_id)
self.internal_unit_ids.append((chan_uid, unit_id))
unit_name = "ch{}#{}#{}".format(chan_name, chan_id, unit_id)
unit_id = "{}".format(unit_id)
wf_units = "uV"
wf_gain = info["bit_to_microVolt"][idx]
if info.get("input_inverted", False):
wf_gain *= -1
wf_offset = 0.0
wf_left_sweep = -1 # NOT KNOWN
wf_sampling_rate = info["sampling_rate"]
spike_channels.append(
(
unit_name,
"{}".format(unit_id),
wf_units,
wf_gain,
wf_offset,
wf_left_sweep,
wf_sampling_rate,
)
)
unit_annotations.append(dict(file_origin=filename))
elif ext == "nev":
# an event channel
# each ('event_id', 'ttl_input') give a new event channel
self.nev_filenames[chan_id] = filename
if os.path.getsize(filename) <= NlxHeader.HEADER_SIZE:
self._empty_nev.append(filename)
data = np.zeros((0,), dtype=nev_dtype)
internal_ids = []
else:
data = self._get_file_map(filename)
if data.shape[0] == 0: # empty file
self._empty_nev.append(filename)
internal_ids = np.unique(data[["event_id", "ttl_input"]]).tolist()
for internal_event_id in internal_ids:
if internal_event_id not in self.internal_event_ids:
event_id, ttl_input = internal_event_id
name = "{} event_id={} ttl={}".format(chan_name, event_id, ttl_input)
event_channels.append((name, chan_id, "event"))
self.internal_event_ids.append(internal_event_id)
self._nev_memmap[chan_id] = data
signal_channels = np.array(signal_channels, dtype=_signal_channel_dtype)
spike_channels = np.array(spike_channels, dtype=_spike_channel_dtype)
event_channels = np.array(event_channels, dtype=_event_channel_dtype)
if signal_channels.size > 0:
# Build DSP filter configuration registry: filter_id -> filter parameters dict
# Extract unique filter configurations from stream keys
unique_filter_tuples = {stream_key.filter_params for stream_key in stream_props.keys()}
_dsp_filter_configurations = {i: dict(filt) for i, filt in enumerate(sorted(unique_filter_tuples))}
# Create reverse mapping for looking up filter IDs during stream name construction
seen_filters = {filt: i for i, filt in enumerate(sorted(unique_filter_tuples))}
# Store DSP filter configurations as private instance attribute
# Keeping private for now - may expose via annotations or public API in future
self._dsp_filter_configurations = _dsp_filter_configurations
# Order streams by sampling rate (high to low)
# so that stream ordering is stable across runs
ordered_stream_keys = sorted(stream_props.keys(), reverse=True, key=lambda x: x.sampling_rate)
stream_names = []
stream_ids = []
buffer_ids = []
for stream_id, stream_key in enumerate(ordered_stream_keys):
# Format stream name using namedtuple fields
dsp_filter_id = seen_filters[stream_key.filter_params]
voltage_mv = int(stream_key.input_range / 1000) if stream_key.input_range is not None else 0
stream_name = (
f"stream{stream_id}_{int(stream_key.sampling_rate)}Hz_{voltage_mv}mVRange_DSPFilter{dsp_filter_id}"
)
stream_names.append(stream_name)
stream_ids.append(str(stream_id))
buffer_ids.append("")
signal_streams = list(zip(stream_names, stream_ids, buffer_ids))
else:
signal_streams = []
self._dsp_filter_configurations = {}
signal_buffers = np.array([], dtype=_signal_buffer_dtype)
signal_streams = np.array(signal_streams, dtype=_signal_stream_dtype)
# set 2 attributes needed later for header in case there are no ncs files in dataset,
# e.g. Pegasus
self._timestamp_limits = None
self._nb_segment = 1
stream_infos = {}
# Read ncs files of each stream for gap detection and nb_segment computation.
for stream_id in np.unique(signal_channels["stream_id"]):
stream_channels = signal_channels[signal_channels["stream_id"] == stream_id]
stream_chan_uids = zip(stream_channels["name"], stream_channels["id"])
stream_filenames = [self.ncs_filenames[chuid] for chuid in stream_chan_uids]
_sigs_memmaps, ncsSegTimestampLimits, section_structure = self.scan_stream_ncs_files(stream_filenames)
stream_infos[stream_id] = {
"segment_sig_memmaps": _sigs_memmaps,
"ncs_segment_infos": ncsSegTimestampLimits,
"section_structure": section_structure,
}
# check if section structure across streams is compatible and merge infos
ref_sec_structure = None
for stream_id, stream_info in stream_infos.items():
ref_stream_id = list(stream_infos.keys())[0]
ref_sec_structure = stream_infos[ref_stream_id]["section_structure"]
sec_structure = stream_info["section_structure"]
# check if section structure of streams are compatible
# using tolerance of one data packet (512 samples)
tolerance = 512 / min(ref_sec_structure.sampFreqUsed, sec_structure.sampFreqUsed) * 1e6
if not ref_sec_structure.is_equivalent(sec_structure, abs_tol=tolerance):
ref_chan_ids = signal_channels[signal_channels["stream_id"] == ref_stream_id]["name"]
chan_ids = signal_channels[signal_channels["stream_id"] == stream_id]["name"]
raise ValueError(
"Incompatible section structures across streams: "
f"Stream id {ref_stream_id}:{ref_chan_ids} and "
f"{stream_id}:{chan_ids}."
)
if ref_sec_structure is not None:
self._nb_segment = len(ref_sec_structure.sects)
else:
# Use only a single segment if no ncs data is present
self._nb_segment = 1
def min_max_tuple(tuple1, tuple2):
"""Merge tuple by selecting min for first and max for 2nd entry"""
mins, maxs = zip(tuple1, tuple2)
result = (min(m for m in mins if m is not None), max(m for m in maxs if m is not None))
return result
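The merge helper above can be exercised standalone (a sketch mirroring the nested function): it combines per-stream (start, stop) limits by taking the earliest start and latest stop, skipping uninitialized None entries.

```python
def min_max_tuple(tuple1, tuple2):
    """Merge two (start, stop) tuples: min of starts, max of stops, ignoring None."""
    mins, maxs = zip(tuple1, tuple2)
    return (
        min(m for m in mins if m is not None),
        max(m for m in maxs if m is not None),
    )

# Merging with an uninitialized (None, None) entry keeps the real limits.
assert min_max_tuple((None, None), (10.0, 20.0)) == (10.0, 20.0)
# Merging two streams widens the limits to cover both.
assert min_max_tuple((10.0, 20.0), (5.0, 15.0)) == (5.0, 20.0)
```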
# merge stream memmaps since streams are compatible
self._sigs_memmaps = [{} for seg_idx in range(self._nb_segment)]
# time limits of integer timestamps in ncs files
self._timestamp_limits = [(None, None) for seg_idx in range(self._nb_segment)]
# time limits of physical times in ncs files
self._signal_limits = [(None, None) for seg_idx in range(self._nb_segment)]
for stream_id, stream_info in stream_infos.items():
stream_mmaps = stream_info["segment_sig_memmaps"]
for seg_idx, signal_dict in enumerate(stream_mmaps):
self._sigs_memmaps[seg_idx].update(signal_dict)
ncs_segment_info = stream_info["ncs_segment_infos"]
for seg_idx, (t_start, t_stop) in enumerate(ncs_segment_info.timestamp_limits):
self._timestamp_limits[seg_idx] = min_max_tuple(self._timestamp_limits[seg_idx], (t_start, t_stop))
for seg_idx in range(ncs_segment_info.nb_segment):
t_start = ncs_segment_info.t_start[seg_idx]
t_stop = ncs_segment_info.t_stop[seg_idx]
self._signal_limits[seg_idx] = min_max_tuple(self._signal_limits[seg_idx], (t_start, t_stop))
# precompute signal lengths within segments
self._sigs_length = []
if self._sigs_memmaps:
for seg_idx, sig_container in enumerate(self._sigs_memmaps):
self._sigs_length.append({})
for chan_uid, sig_infos in sig_container.items():
self._sigs_length[seg_idx][chan_uid] = int(sig_infos["nb_valid"].sum())
# Determine timestamp limits in nev, nse, ntt files by scanning them.
ts0, ts1 = None, None
for _data_memmap in (self._spike_memmap, self._nev_memmap):
for _, data in _data_memmap.items():
ts = data["timestamp"]
if ts.size == 0:
continue
if ts0 is None:
ts0 = ts[0]
ts1 = ts[-1]
ts0 = min(ts0, ts[0])
ts1 = max(ts1, ts[-1])
# rescaling for comparison with signal times
if ts0 is not None:
timestamps_start, timestamps_stop = ts0 / 1e6, ts1 / 1e6
# decide on segment and global start and stop times based on files available
if self._timestamp_limits is None:
# case NO ncs but HAVE nev or nse -> single segment covering all spikes & events
self._timestamp_limits = [(ts0, ts1)]
self._seg_t_starts = [timestamps_start]
self._seg_t_stops = [timestamps_stop]
self.global_t_start = timestamps_start
self.global_t_stop = timestamps_stop
elif ts0 is not None:
# case HAVE ncs AND HAVE nev or nse -> multi segments based on ncs segmentation
# ignoring nev/nse/ntt time limits, loading only data within ncs segments
global_events_limits = (timestamps_start, timestamps_stop)
global_signal_limits = (self._signal_limits[0][0], self._signal_limits[-1][-1])
self.global_t_start, self.global_t_stop = min_max_tuple(global_events_limits, global_signal_limits)
self._seg_t_starts = [limits[0] for limits in self._signal_limits]
self._seg_t_stops = [limits[1] for limits in self._signal_limits]
self._seg_t_starts[0] = self.global_t_start
self._seg_t_stops[-1] = self.global_t_stop
else:
# case HAVE ncs but NO nev or nse ->
self._seg_t_starts = [limits[0] for limits in self._signal_limits]
self._seg_t_stops = [limits[1] for limits in self._signal_limits]
self.global_t_start = self._signal_limits[0][0]
self.global_t_stop = self._signal_limits[-1][-1]
if self.keep_original_times:
self.global_t_stop = self.global_t_stop - self.global_t_start
self.global_t_start = 0
# fill header dictionary
self.header = {}
self.header["nb_block"] = 1
self.header["nb_segment"] = [self._nb_segment]
self.header["signal_buffers"] = signal_buffers
self.header["signal_streams"] = signal_streams
self.header["signal_channels"] = signal_channels
self.header["spike_channels"] = spike_channels
self.header["event_channels"] = event_channels
# Annotations
self._generate_minimal_annotations()
bl_annotations = self.raw_annotations["blocks"][0]
for seg_index in range(self._nb_segment):
seg_annotations = bl_annotations["segments"][seg_index]
for stream_id in range(signal_streams.size):
# one or no signal stream
stream_ann = seg_annotations["signals"][stream_id]
# handle array annotations
for key in signal_annotations[0].keys():
values = []
# only collect values from channels belonging to current stream
for d in np.where(signal_channels["stream_id"] == f"{stream_id}")[0]:
value = signal_annotations[d][key]
values.append(value)
values = np.array(values)
if values.ndim == 1:
# skip 2D annotations such as 'InputRange', which break array annotations
stream_ann["__array_annotations__"][key] = values
for c in range(spike_channels.size):
unit_ann = seg_annotations["spikes"][c]
unit_ann.update(unit_annotations[c])
for c in range(event_channels.size):
# annotations for channel events
event_id, ttl_input = self.internal_event_ids[c]
chan_id = event_channels[c]["id"]
ev_ann = seg_annotations["events"][c]
ev_ann["file_origin"] = self.nev_filenames[chan_id]
# ~ ev_ann['marker_id'] =
# ~ ev_ann['nttl'] =
# ~ ev_ann['digital_marker'] =
# ~ ev_ann['analog_marker'] =
@staticmethod
def _get_file_map(filename):
"""
Create memory maps when needed
see also https://github.com/numpy/numpy/issues/19340
"""
filename = Path(filename)
suffix = filename.suffix.lower()[1:]
if suffix == "ncs":
return np.memmap(filename, dtype=NeuralynxRawIO._ncs_dtype, mode="r", offset=NlxHeader.HEADER_SIZE)
elif suffix in ["nse", "ntt"]:
info = NlxHeader(filename)
dtype = get_nse_or_ntt_dtype(info, suffix)
# return empty map if file does not contain data
if os.path.getsize(filename) <= NlxHeader.HEADER_SIZE:
return np.zeros((0,), dtype=dtype)
return np.memmap(filename, dtype=dtype, mode="r", offset=NlxHeader.HEADER_SIZE)
elif suffix == "nev":
return np.memmap(filename, dtype=nev_dtype, mode="r", offset=NlxHeader.HEADER_SIZE)
else:
raise ValueError(f"Unknown file suffix {suffix}")
# Accessors for segment times which are offset by appropriate global start time
def _segment_t_start(self, block_index, seg_index):
return self._seg_t_starts[seg_index] - self.global_t_start
def _segment_t_stop(self, block_index, seg_index):
return self._seg_t_stops[seg_index] - self.global_t_start
def _get_signal_size(self, block_index, seg_index, stream_index):
stream_id = self.header["signal_streams"][stream_index]["id"]
stream_mask = self.header["signal_channels"]["stream_id"] == stream_id
signals = self.header["signal_channels"][stream_mask]
if len(signals):
sig = signals[0]
return self._sigs_length[seg_index][(sig["name"], sig["id"])]
else:
raise ValueError(
f"No signals present for block {block_index}, segment {seg_index}, stream {stream_index}"
)
def _get_signal_t_start(self, block_index, seg_index, stream_index):
stream_id = self.header["signal_streams"][stream_index]["id"]
stream_mask = self.header["signal_channels"]["stream_id"] == stream_id
# use first channel of stream as all channels in stream have a common t_start
channel = self.header["signal_channels"][stream_mask][0]
data = self._sigs_memmaps[seg_index][(channel["name"], channel["id"])]
absolute_t_start = data["timestamp"][0]
return absolute_t_start / 1e6 - self.global_t_start
def _get_analogsignal_chunk(self, block_index, seg_index, i_start, i_stop, stream_index, channel_indexes):
"""
Retrieve chunk of analog signal, a chunk being a set of contiguous samples.
Parameters
----------
block_index:
index of block in dataset, ignored as only 1 block in this implementation
seg_index:
index of segment to use
i_start:
sample index of first sample within segment to retrieve
i_stop:
index of the sample just past the last sample within the segment to retrieve (exclusive)
channel_indexes:
list of channel indices to return data for
Returns
-------
array of samples, with each requested channel in a column
"""
if i_start is None:
i_start = 0
if i_stop is None:
i_stop = self.get_signal_size(block_index=block_index, seg_index=seg_index, stream_index=stream_index)
block_start = i_start // NcsSection._RECORD_SIZE
block_stop = i_stop // NcsSection._RECORD_SIZE + 1
sl0 = i_start % NcsSection._RECORD_SIZE
sl1 = sl0 + (i_stop - i_start)
if channel_indexes is None:
channel_indexes = slice(None)
stream_id = self.header["signal_streams"][stream_index]["id"]
stream_mask = self.header["signal_channels"]["stream_id"] == stream_id
channel_ids = self.header["signal_channels"][stream_mask][channel_indexes]["id"]
channel_names = self.header["signal_channels"][stream_mask][channel_indexes]["name"]
# create buffer for samples
sigs_chunk = np.zeros((i_stop - i_start, len(channel_ids)), dtype="int16")
for i, chan_uid in enumerate(zip(channel_names, channel_ids)):
data = self._sigs_memmaps[seg_index][chan_uid]
sub = data[block_start:block_stop]
sigs_chunk[:, i] = sub["samples"].flatten()[sl0:sl1]
return sigs_chunk
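The record/offset arithmetic in the chunk reader can be checked in isolation. This is a sketch using a record size of 4 instead of the real 512 (`NcsSection._RECORD_SIZE`), with synthetic data standing in for the memmapped records:

```python
import numpy as np

RECORD_SIZE = 4  # stand-in for NcsSection._RECORD_SIZE (512 in real NCS files)
# 3 records of 4 samples each -> 12 contiguous samples 0..11
data = np.arange(12, dtype="int16").reshape(3, RECORD_SIZE)

i_start, i_stop = 5, 10
block_start = i_start // RECORD_SIZE    # first record touched
block_stop = i_stop // RECORD_SIZE + 1  # one past the last record touched
sl0 = i_start % RECORD_SIZE             # offset of i_start within its record
sl1 = sl0 + (i_stop - i_start)          # end offset in the flattened records

chunk = data[block_start:block_stop].flatten()[sl0:sl1]
assert chunk.tolist() == [5, 6, 7, 8, 9]
```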
def _spike_count(self, block_index, seg_index, unit_index):
chan_uid, unit_id = self.internal_unit_ids[unit_index]
data = self._spike_memmap[chan_uid]
ts = data["timestamp"]
ts0 = self.segment_t_start(block_index, seg_index)
ts1 = self.segment_t_stop(block_index, seg_index)
# rescale to integer sampling of time
ts0 = int((ts0 + self.global_t_start) * 1e6)
ts1 = int((ts1 + self.global_t_start) * 1e6)
# only count spikes inside the timestamp limits, inclusive, and for the specified unit
keep = (ts >= ts0) & (ts <= ts1) & (unit_id == data["unit_id"])
nb_spike = int(data[keep].size)
return nb_spike
def _get_spike_timestamps(self, block_index, seg_index, unit_index, t_start, t_stop):
"""
Extract spike timestamps (in microseconds) within a segment whose bounds are
defined by the ncs timestamps; t_start/t_stop optionally narrow the range.
"""
chan_uid, unit_id = self.internal_unit_ids[unit_index]
data = self._spike_memmap[chan_uid]
ts = data["timestamp"]
ts0, ts1 = t_start, t_stop
if ts0 is None:
ts0 = self.segment_t_start(block_index, seg_index)
if ts1 is None:
ts1 = self.segment_t_stop(block_index, seg_index)
# rescale to integer sampling of time
ts0 = int((ts0 + self.global_t_start) * 1e6)
ts1 = int((ts1 + self.global_t_start) * 1e6)
keep = (ts >= ts0) & (ts <= ts1) & (unit_id == data["unit_id"])
timestamps = ts[keep]
return timestamps
def _rescale_spike_timestamp(self, spike_timestamps, dtype):
spike_times = spike_timestamps.astype(dtype)
spike_times /= 1e6
spike_times -= self.global_t_start
return spike_times
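The microsecond-to-seconds rescaling pattern used by `_rescale_spike_timestamp` (and `_rescale_event_timestamp`) can be sketched standalone; `rescale_timestamps` is a hypothetical free function, and `global_t_start` is assumed to be in seconds, as in the methods above.

```python
import numpy as np

def rescale_timestamps(timestamps_us, global_t_start, dtype="float64"):
    """Convert integer microsecond timestamps to seconds relative to global_t_start."""
    times = timestamps_us.astype(dtype)
    times /= 1e6               # microseconds -> seconds
    times -= global_t_start    # shift so the recording starts near t = 0
    return times

ts_us = np.array([2_000_000, 2_500_000], dtype="int64")
times = rescale_timestamps(ts_us, global_t_start=1.0)
```

Dividing in-place after `astype` avoids an extra temporary array, which matters for long spike trains.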
def _get_spike_raw_waveforms(self, block_index, seg_index, unit_index, t_start, t_stop):
chan_uid, unit_id = self.internal_unit_ids[unit_index]
data = self._spike_memmap[chan_uid]
ts = data["timestamp"]
ts0, ts1 = t_start, t_stop
if ts0 is None:
ts0 = self.segment_t_start(block_index, seg_index)
if ts1 is None:
ts1 = self.segment_t_stop(block_index, seg_index)
# rescale to integer sampling of time
ts0 = int((ts0 + self.global_t_start) * 1e6)
ts1 = int((ts1 + self.global_t_start) * 1e6)
keep = (ts >= ts0) & (ts <= ts1) & (unit_id == data["unit_id"])
wfs = data[keep]["samples"]
if wfs.ndim == 2:
# nse: single electrode, add a channel axis -> (n_spikes, 1, n_samples)
waveforms = wfs[:, None, :]
else:
# ntt: reorder (n_spikes, 32, 4) to (n_spikes, 4, 32), i.e. channels before samples
waveforms = wfs.swapaxes(1, 2)
return waveforms
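The axis normalization above can be demonstrated with dummy arrays. This is a sketch assuming the shapes described in the comments (`.nse` single-electrode waveforms as `(n_spikes, n_samples)`, `.ntt` tetrode waveforms as `(n_spikes, n_samples, n_channels)`); `normalize_waveforms` is a hypothetical helper.

```python
import numpy as np

def normalize_waveforms(wfs):
    """Return waveforms with a uniform (n_spikes, n_channels, n_samples) layout."""
    if wfs.ndim == 2:
        # single-electrode (.nse): insert a length-1 channel axis
        return wfs[:, None, :]
    # tetrode (.ntt): swap the sample and channel axes
    return wfs.swapaxes(1, 2)

nse = np.zeros((10, 32))     # 10 spikes, 32 samples each
ntt = np.zeros((10, 32, 4))  # 10 spikes, 32 samples, 4 channels
```

Both branches return views where possible, so no waveform data is copied.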
def _event_count(self, block_index, seg_index, event_channel_index):
event_id, ttl_input = self.internal_event_ids[event_channel_index]
chan_id = self.header["event_channels"][event_channel_index]["id"]
data = self._nev_memmap[chan_id]
ts0 = self.segment_t_start(block_index, seg_index)
ts1 = self.segment_t_stop(block_index, seg_index)
# rescale to integer sampling of time
ts0 = int((ts0 + self.global_t_start) * 1e6)
ts1 = int((ts1 + self.global_t_start) * 1e6)
ts = data["timestamp"]
keep = (ts >= ts0) & (ts <= ts1) & (data["event_id"] == event_id) & (data["ttl_input"] == ttl_input)
nb_event = int(data[keep].size)
return nb_event
def _get_event_timestamps(self, block_index, seg_index, event_channel_index, t_start, t_stop):
event_id, ttl_input = self.internal_event_ids[event_channel_index]
chan_id = self.header["event_channels"][event_channel_index]["id"]
data = self._nev_memmap[chan_id]
ts0, ts1 = t_start, t_stop
if ts0 is None:
ts0 = self.segment_t_start(block_index, seg_index)
if ts1 is None:
ts1 = self.segment_t_stop(block_index, seg_index)
# rescale to integer sampling of time
ts0 = int((ts0 + self.global_t_start) * 1e6)
ts1 = int((ts1 + self.global_t_start) * 1e6)
ts = data["timestamp"]
keep = (ts >= ts0) & (ts <= ts1) & (data["event_id"] == event_id) & (data["ttl_input"] == ttl_input)
subdata = data[keep]
timestamps = subdata["timestamp"]
labels = subdata["event_string"].astype("U")
durations = None
return timestamps, durations, labels
def _rescale_event_timestamp(self, event_timestamps, dtype, event_channel_index):
event_times = event_timestamps.astype(dtype)
event_times /= 1e6
event_times -= self.global_t_start
return event_times
def _format_gap_report(self, detected_gaps, sampling_frequency, filename):
"""
Format a detailed gap report showing where timestamp discontinuities occur.
Parameters
----------
detected_gaps : list of (int, int)
List of (record_index, gap_size_us) tuples from NcsSections.detected_gaps.
sampling_frequency : float
Sampling frequency in Hz used for the file.
filename : str
Path to the NCS file for the report header.
Returns
-------
str
Formatted gap report with table.
"""
gap_durations_ms = [abs(gap_size) / 1000.0 for _, gap_size in detected_gaps]
gap_positions_seconds = [
record_index * NcsSection._RECORD_SIZE / sampling_frequency
for record_index, _ in detected_gaps
]
gap_detail_lines = [
f"| {record_index:>15,} | {pos:>21.6f} | {dur:>21.3f} |\n"
for (record_index, _), pos, dur in zip(detected_gaps, gap_positions_seconds, gap_durations_ms)
]
return (
f"Gap Report for {os.path.basename(filename)}:\n"
f"Found {len(detected_gaps)} timestamp gaps "
f"(detection threshold: {NcsSectionsFactory._maxGapSampFrac} sample intervals)\n\n"
"Gap Details:\n"
"+-----------------+-----------------------+-----------------------+\n"
"| Record Index | Record at (Seconds) | Gap Size (ms) |\n"
"+-----------------+-----------------------+-----------------------+\n"
+ "".join(gap_detail_lines)
+ "+-----------------+-----------------------+-----------------------+\n"
)
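The per-gap arithmetic behind the report table can be sketched separately: a gap's position is the record index times the record size divided by the sampling rate, and its duration converts microseconds to milliseconds. `gap_rows` is a hypothetical helper, and the record size of 512 mirrors `NcsSection._RECORD_SIZE`.

```python
RECORD_SIZE = 512  # samples per NCS record (mirrors NcsSection._RECORD_SIZE)

def gap_rows(detected_gaps, sampling_frequency):
    """Compute (record_index, position_s, duration_ms) for each detected gap."""
    return [
        (idx, idx * RECORD_SIZE / sampling_frequency, abs(gap_us) / 1000.0)
        for idx, gap_us in detected_gaps
    ]

# two gaps in a hypothetical 32 kHz recording: +2.5 ms at record 1000, -1.5 ms at 4000
rows = gap_rows([(1000, 2500), (4000, -1500)], sampling_frequency=32000.0)
```

Taking `abs()` of the gap size matches the report above, which tabulates the magnitude of both forward and backward timestamp jumps.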
def _get_neuralynx_timestamps(self, block_index, seg_index, stream_index):
"""
Return original NCS record timestamps for the first channel in a stream/segment.
These are the raw hardware timestamps from the NCS file records, one timestamp
per 512-sample record, in microseconds.
Parameters
----------
block_index : int
Block index (always 0 for Neuralynx).
seg_index : int
Segment index.
stream_index : int
Stream index.
Returns
-------
np.ndarray
Timestamps in microseconds from the NCS records, one per 512-sample record.
"""
stream_id = self.header["signal_streams"][stream_index]["id"]
stream_mask = self.header["signal_channels"]["stream_id"] == stream_id
channel = self.header["signal_channels"][stream_mask][0]
chan_uid = (channel["name"], channel["id"])
data = self._sigs_memmaps[seg_index][chan_uid]
return data["timestamp"].copy()
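The per-record timestamps returned above can be expanded to per-sample times by broadcasting, since samples within a record are evenly spaced at the sampling interval. A minimal sketch, with a hypothetical 32 kHz rate and three contiguous records (512 samples at 32 kHz span exactly 16 000 µs):

```python
import numpy as np

RECORD_SIZE = 512        # samples per NCS record
SAMPLING_RATE = 32000.0  # Hz, hypothetical

record_ts = np.array([0, 16000, 32000], dtype="int64")  # µs, one per record
# within a record, samples are spaced 1 / SAMPLING_RATE apart
offsets_us = np.arange(RECORD_SIZE) * 1e6 / SAMPLING_RATE
# broadcast (n_records, 1) + (1, RECORD_SIZE) and flatten to one time per sample
sample_ts = (record_ts[:, None] + offsets_us[None, :]).ravel()
```

On real files with gaps, the record timestamps are not contiguous, which is exactly what this expansion preserves and an assumed uniform rate would hide.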
def scan_stream_ncs_files(self, ncs_filenames):
"""
Given a list of ncs files, read their basic structure.
Ncs files must share a common sampling_rate, number of packets, and t_start
(i.e. belong to a single stream).
Parameters
----------
ncs_filenames: list
List of ncs filenames to scan.
Returns
-------
memmaps
List of dicts, one per segment, mapping chan_uid to that channel's record
memmap: [ {} for seg_index in range(self._nb_segment) ][chan_uid]
seg_time_limits
SegmentTimeLimits for sections in scanned Ncs files
section_structure
Section structure common to the ncs files
Files will be scanned to determine the sections of records. If file is a single