Skip to content

Commit a0b8501

Browse files
authored
Add option to load only specific streams (#24)
* Add option to load only specific streams * Update changelog * Add more details and examples in load_only description * Rename load_only -> stream_ids * Add stream_info function to extract information on streams contained in file * Rename stream_info -> resolve_streams, add docstring * Add match_streaminfo function * Factor out opening of XDF files and use _read_varlen_int * Allow list of dicts as stream_ids argument * Update docstring * Rename stream_ids parameter -> select_streams * Correctly handle default case of select_streams=None * Fix arg name in docstring * Rephrase select_streams doc
1 parent 4726204 commit a0b8501

3 files changed

Lines changed: 188 additions & 25 deletions

File tree

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
## [1.15.2] - 2019-06-07
22
### Added
33
- Store unique stream ID inside the `["info"]["stream_id"]` dict value ([#19](https://github.com/xdf-modules/xdf-Python/pull/19) by [Clemens Brunner](https://github.com/cbrnr)).
4+
- Add option to load only specific streams ([#24](https://github.com/xdf-modules/xdf-Python/pull/24) by [Clemens Brunner](https://github.com/cbrnr)).
45

56
## [1.15.1] - 2019-04-26
67
### Added

pyxdf/__init__.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
11
# Authors: Christian Kothe & the Intheon pyxdf team
2+
# Clemens Brunner
23
#
34
# License: BSD (2-clause)
45

56
from pkg_resources import get_distribution, DistributionNotFound
67
try:
78
__version__ = get_distribution(__name__).version
8-
except DistributionNotFound:
9-
# package is not installed
9+
except DistributionNotFound: # package is not installed
1010
__version__ = None
11-
from .pyxdf import load_xdf
11+
from .pyxdf import load_xdf, resolve_streams, match_streaminfos
1212

pyxdf/pyxdf.py

Lines changed: 184 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ def __init__(self, xml):
6767

6868

6969
def load_xdf(filename,
70+
select_streams=None,
7071
on_chunk=None,
7172
synchronize_clocks=True,
7273
handle_clock_resets=True,
@@ -94,6 +95,20 @@ def load_xdf(filename,
9495
Args:
9596
filename : name of the file to import (*.xdf or *.xdfz)
9697
98+
select_streams : int | list[int] | list[dict] | None
99+
One or more stream IDs to load. Accepted values are:
100+
- int or list[int]: load only specified stream IDs, e.g.
101+
select_streams=5 loads only the stream with stream ID 5, whereas
102+
select_streams=[2, 4] loads only streams with stream IDs 2 and 4.
103+
- list[dict]: load only streams matching the query, e.g.
104+
select_streams=[{'type': 'EEG'}] loads all streams of type 'EEG'.
105+
Entries within a dict must all match a stream, e.g.
106+
select_streams=[{'type': 'EEG', 'name': 'TestAMP'}] matches streams
107+
with both type 'EEG' *and* name 'TestAMP'. If
108+
select_streams=[{'type': 'EEG'}, {'name': 'TestAMP'}], streams
109+
matching either the type *or* the name will be loaded.
110+
- None: load all streams (default).
111+
97112
synchronize_clocks : Whether to enable clock synchronization based on
98113
ClockOffset chunks. (default: true)
99114
@@ -155,10 +170,10 @@ def load_xdf(filename,
155170
streams : list of dicts, one for each stream; the dicts
156171
have the following content:
157172
['time_series'] entry: contains the stream's time series
158-
[#Channels x #Samples] this matrix is of the type declared in
159-
['info']['channel_format']
160-
['time_stamps'] entry: contains the time stamps for each sample
161-
(synced across streams)
173+
[#Channels x #Samples] this matrix is of the type declared
174+
in ['info']['channel_format']
175+
['time_stamps'] entry: contains the time stamps for each
176+
sample (synced across streams)
162177
163178
['info'] field: contains the meta-data of the stream
164179
(all values are strings)
@@ -179,14 +194,31 @@ def load_xdf(filename,
179194
180195
Examples:
181196
load the streams contained in a given XDF file
182-
>>> streams, fileheader = load_xdf('C:\Recordings\myrecording.xdf')
197+
>>> streams, fileheader = load_xdf('myrecording.xdf')
183198
"""
184199

185200
logger.info('Importing XDF file %s...' % filename)
186201
if not os.path.exists(filename):
187202
raise Exception('file %s does not exist.' % filename)
188203

189-
# dict of returned streams, in order of apparance, indexed by stream id
204+
# if select_streams is an int or a list of int, load only streams
205+
# associated with the corresponding stream IDs
206+
# if select_streams is a list of dicts, use this to query and load streams
207+
# associated with these properties
208+
if select_streams is None:
209+
pass
210+
elif isinstance(select_streams, int):
211+
select_streams = [select_streams]
212+
elif all([isinstance(elem, dict) for elem in select_streams]):
213+
select_streams = match_streaminfos(resolve_streams(filename),
214+
select_streams)
215+
if not select_streams: # no streams found
216+
raise ValueError("No matching streams found.")
217+
elif not all([isinstance(elem, int) for elem in select_streams]):
218+
raise ValueError("Argument 'select_streams' must be an int, a list of "
219+
"ints or a list of dicts.")
220+
221+
# dict of returned streams, in order of appearance, indexed by stream id
190222
streams = OrderedDict()
191223
# dict of per-stream temporary data (StreamData), indexed by stream id
192224
temp = {}
@@ -195,22 +227,9 @@ def load_xdf(filename,
195227
# number of bytes in the file for fault tolerance
196228
filesize = os.path.getsize(filename)
197229

198-
# read file contents ([SomeText] below refers to items in the XDF Spec)
199-
filename = Path(filename) # convert to pathlib object
200-
if filename.suffix == '.xdfz' or filename.suffixes == ['.xdf', '.gz']:
201-
f_open = gzip.open
202-
else:
203-
f_open = open
204-
205-
with f_open(filename, 'rb') as f:
206-
# read [MagicCode]
207-
if f.read(4) != b'XDF:':
208-
raise Exception('not a valid XDF file: %s' % filename)
209-
210-
# for each chunk...
211-
StreamId = None
230+
with open_xdf(filename) as f:
231+
# for each chunk
212232
while True:
213-
214233
# noinspection PyBroadException
215234
try:
216235
# read [NumLengthBytes], [Length]
@@ -231,9 +250,16 @@ def load_xdf(filename,
231250
if tag in [2, 3, 4, 6]:
232251
StreamId = struct.unpack('<I', f.read(4))[0]
233252
log_str += ', StreamId={}'.format(StreamId)
253+
else:
254+
StreamId = None
234255

235256
logger.debug(log_str)
236257

258+
if StreamId is not None and select_streams is not None:
259+
if StreamId not in select_streams:
260+
f.read(chunklen - 2 - 4) # skip remaining chunk contents
261+
continue
262+
237263
# read the chunk's [Content]...
238264
if tag == 1:
239265
# read [FileHeader] chunk
@@ -260,7 +286,7 @@ def load_xdf(filename,
260286
# optionally send through the on_chunk function
261287
if on_chunk is not None:
262288
values, stamps, streams[StreamId] = on_chunk(values, stamps,
263-
streams[StreamId], StreamId)
289+
streams[StreamId], StreamId)
264290
# append to the time series...
265291
temp[StreamId].time_series.append(values)
266292
temp[StreamId].time_stamps.append(stamps)
@@ -336,6 +362,18 @@ def load_xdf(filename,
336362
return streams, fileheader
337363

338364

365+
def open_xdf(filename):
    """Open an XDF file for reading.

    Transparently handles gzip-compressed files (suffix ``.xdfz`` or
    ``.xdf.gz``) and validates the magic bytes at the start of the file.

    Parameters
    ----------
    filename : str or pathlib.Path
        Name of the XDF file.

    Returns
    -------
    f : file object
        Binary file handle positioned just after the 4 magic bytes.

    Raises
    ------
    IOError
        If the file does not start with the ``b'XDF:'`` magic bytes.
    """
    filename = Path(filename)  # convert to pathlib object
    if filename.suffix == '.xdfz' or filename.suffixes == ['.xdf', '.gz']:
        f = gzip.open(filename, 'rb')
    else:
        f = open(filename, 'rb')
    try:
        if f.read(4) != b'XDF:':  # magic bytes
            raise IOError('Invalid XDF file {}'.format(filename))
    except Exception:
        f.close()  # don't leak the handle when validation fails
        raise
    return f
375+
376+
339377
def _read_chunk3(f, s):
340378
# read [NumSampleBytes], [NumSamples]
341379
nsamples = _read_varlen_int(f)
@@ -387,6 +425,8 @@ def _read_varlen_int(f):
387425
return struct.unpack('<I', f.read(4))[0]
388426
elif nbytes == b'\x08':
389427
return struct.unpack('<Q', f.read(8))[0]
428+
elif not nbytes: # EOF
429+
raise EOFError
390430
else:
391431
raise RuntimeError('invalid variable-length integer encountered.')
392432

@@ -584,3 +624,125 @@ def _robust_fit(A, y, rho=1, iters=1000):
584624
z = rho / (1 + rho) * d + 1 / (1 + rho) * tmp * d
585625
u = d - z
586626
return x
627+
628+
629+
def match_streaminfos(stream_infos, parameters):
    """Find stream IDs matching specified criteria.

    Parameters
    ----------
    stream_infos : list of dicts
        List of dicts containing information on each stream. This information
        can be obtained using the function resolve_streams.
    parameters : list of dicts
        List of dicts containing key/values that should be present in streams.
        Examples: [{"name": "Keyboard"}] matches all streams with a "name"
        field equal to "Keyboard".
        [{"name": "Keyboard"}, {"type": "EEG"}] matches all streams
        with a "name" field equal to "Keyboard" and all streams with
        a "type" field equal to "EEG".

    Returns
    -------
    list of int
        Unique IDs of all matching streams (order is unspecified).
    """
    matches = []
    for request in parameters:
        for info in stream_infos:
            # All key/value pairs within a single request must match.
            # all() of an empty iterable is True, so an empty request
            # matches every stream (the previous loop-based version left
            # its `match` flag unbound and raised NameError in that case).
            if all(info[key] == value for key, value in request.items()):
                matches.append(info['stream_id'])

    return list(set(matches))  # return unique values
656+
657+
658+
def resolve_streams(fname):
    """Resolve streams in given XDF file.

    Parameters
    ----------
    fname : str
        Name of the XDF file.

    Returns
    -------
    stream_infos : list of dicts
        List of dicts containing information on each stream.
    """
    # Read every chunk from the file, then distill per-stream metadata
    # from the StreamHeader chunks.
    chunks = parse_xdf(fname)
    return parse_chunks(chunks)
672+
673+
674+
def parse_xdf(fname):
    """Parse and return chunks contained in an XDF file.

    Parameters
    ----------
    fname : str
        Name of the XDF file.

    Returns
    -------
    chunks : list
        List of all chunks contained in the XDF file.
    """
    # open_xdf validates the magic bytes; _read_chunks lazily yields
    # one dict per chunk, which we materialize into a list here.
    with open_xdf(fname) as f:
        return list(_read_chunks(f))
692+
693+
694+
def parse_chunks(chunks):
    """Parse chunks and extract information on individual streams.

    Only StreamHeader chunks (tag 2) carry stream metadata; all other
    chunks are ignored. Optional header fields default to None.
    """
    streams = []
    for chunk in chunks:
        if chunk["tag"] != 2:  # not a stream header chunk
            continue
        info = {"stream_id": chunk["stream_id"]}
        # Optional fields may be absent from the header XML.
        for key in ("name", "type", "source_id", "created_at", "uid",
                    "session_id", "hostname"):
            info[key] = chunk.get(key)
        # Mandatory fields arrive as strings and are converted here.
        info["channel_count"] = int(chunk["channel_count"])
        info["channel_format"] = chunk["channel_format"]
        info["nominal_srate"] = int(chunk["nominal_srate"])
        streams.append(info)
    return streams
711+
712+
713+
def _read_chunks(f):
    """Read and yield XDF chunks.

    Parameters
    ----------
    f : file handle
        File handle of XDF file.

    Yields
    ------
    chunk : dict
        XDF chunk.
    """
    while True:
        try:
            nbytes = _read_varlen_int(f)  # chunk length ([NumLengthBytes])
        except EOFError:
            return  # end of file reached cleanly
        chunk = {"nbytes": nbytes,
                 "tag": struct.unpack('<H', f.read(2))[0]}
        if chunk["tag"] in (2, 3, 4, 6):
            # These chunk types carry a 4-byte stream ID after the tag.
            chunk["stream_id"] = struct.unpack("<I", f.read(4))[0]
            if chunk["tag"] == 2:  # parse StreamHeader chunk
                xml = ET.fromstring(f.read(chunk["nbytes"] - 6).decode())
                chunk = {**chunk, **_parse_streamheader(xml)}
            else:  # skip remaining chunk contents
                f.seek(chunk["nbytes"] - 6, 1)
        else:
            f.seek(chunk["nbytes"] - 2, 1)  # skip remaining chunk contents
        yield chunk
744+
745+
746+
def _parse_streamheader(xml):
747+
"""Parse stream header XML."""
748+
return {el.tag: el.text for el in xml if el.tag != "desc"}

0 commit comments

Comments
 (0)