|
| 1 | +""" Manages events of temporal extent. """ |
| 2 | + |
| 3 | +from hed.tools.analysis.temporal_event import TemporalEvent |
| 4 | +from hed.models.model_constants import DefTagNames |
| 5 | + |
| 6 | + |
| 7 | +class EventManagerCopy: |
| 8 | + |
| 9 | + def __init__(self, input_data, hed_schema, extra_def_dict=None): |
| 10 | + """ Create an event manager for an events file. Manages events of temporal extent. This |
| 11 | +
|
| 12 | + Parameters: |
| 13 | + hed_strings (list): A list of HED strings |
| 14 | + onsets (list): A list of onset times that is the same length as hed_strings |
| 15 | + def_dict (DefinitionDict): Contains the definitions for this dataset. |
| 16 | +
|
| 17 | + :raises HedFileError: |
| 18 | + - if there are any unmatched offsets. |
| 19 | +
|
| 20 | + Notes: Keeps the events of temporal extend by their starting index in events file. These events |
| 21 | + are separated from the rest of the annotations. |
| 22 | +
|
| 23 | + """ |
| 24 | + |
| 25 | + self.event_list = [[] for _ in range(len(onsets))] |
| 26 | + self.onsets = onsets |
| 27 | + self.hed_strings = hed_strings ## copy.deepcopy(hed_strings) |
| 28 | + self.def_dict = def_dict |
| 29 | + self.anchor_dict ={} |
| 30 | + self._create_event_list() |
| 31 | + self._create_anchor_list() |
| 32 | + |
| 33 | + # def iter_context(self): |
| 34 | + # """ Iterate rows of context. |
| 35 | + # |
| 36 | + # Yields: |
| 37 | + # int: position in the dataFrame |
| 38 | + # HedString: Context |
| 39 | + # |
| 40 | + # """ |
| 41 | + # |
| 42 | + # for index in range(len(self.contexts)): |
| 43 | + # yield index, self.contexts[index] |
| 44 | + |
| 45 | + def _create_anchor_list(self): |
| 46 | + """ Populate the dictionary of def names to list of temporal events. |
| 47 | +
|
| 48 | + :raises HedFileError: |
| 49 | + - If the hed_strings contain unmatched offsets. |
| 50 | +
|
| 51 | + Notes: |
| 52 | +
|
| 53 | + """ |
| 54 | + for index, events in enumerate(self.event_list): |
| 55 | + for event in events: |
| 56 | + index_list = self.anchor_dict.get(event.anchor, []) |
| 57 | + index_list.append(event) |
| 58 | + self.anchor_dict[event.anchor] = index_list |
| 59 | + |
| 60 | + def _create_event_list(self): |
| 61 | + """ Populate the event_list with the events with temporal extent indexed by event number. |
| 62 | +
|
| 63 | + :raises HedFileError: |
| 64 | + - If the hed_strings contain unmatched offsets. |
| 65 | +
|
| 66 | + Notes: |
| 67 | +
|
| 68 | + """ |
| 69 | + onset_dict = {} # Temporary dictionary keeping track of temporal events that haven't ended yet. |
| 70 | + for event_index, hed in enumerate(self.hed_strings): |
| 71 | + self._extract_temporal_events(hed, event_index, onset_dict) |
| 72 | + # Now handle the events that extend to end of list |
| 73 | + for item in onset_dict.values(): |
| 74 | + item.set_end(len(self.onsets), None) |
| 75 | + |
| 76 | + def _extract_temporal_events(self, hed, event_index, onset_dict): |
| 77 | + """ Extract the temporal events and remove them from the other HED strings. |
| 78 | +
|
| 79 | + Parameters: |
| 80 | + hed (HedString): The assembled HedString at position event_index in the data. |
| 81 | + event_index (int): The position of this string in the data. |
| 82 | + onset_dict (dict): Running dict that keeps track of temporal events that haven't yet ended. |
| 83 | +
|
| 84 | + Note: |
| 85 | + This removes the events of temporal extent from the HED string. |
| 86 | +
|
| 87 | + """ |
| 88 | + if not hed: |
| 89 | + return |
| 90 | + group_tuples = hed.find_top_level_tags(anchor_tags={DefTagNames.ONSET_KEY, DefTagNames.OFFSET_KEY}, |
| 91 | + include_groups=2) |
| 92 | + to_remove = [] |
| 93 | + for tup in group_tuples: |
| 94 | + anchor_tag = tup[1].find_def_tags(recursive=False, include_groups=0)[0] |
| 95 | + anchor = anchor_tag.extension.lower() |
| 96 | + if anchor in onset_dict or tup[0].short_base_tag.lower() == DefTagNames.OFFSET_KEY: |
| 97 | + temporal_event = onset_dict.pop(anchor) |
| 98 | + temporal_event.set_end(event_index, self.onsets[event_index]) |
| 99 | + if tup[0] == DefTagNames.ONSET_KEY: |
| 100 | + new_event = TemporalEvent(tup[1], event_index, self.onsets[event_index]) |
| 101 | + self.event_list[event_index].append(new_event) |
| 102 | + onset_dict[anchor] = new_event |
| 103 | + to_remove.append(tup[1]) |
| 104 | + hed.remove(to_remove) |
| 105 | + |
| 106 | + def _set_event_contexts(self): |
| 107 | + """ Creates an event context for each hed string. |
| 108 | +
|
| 109 | + Notes: |
| 110 | + The event context would be placed in an event context group, but is kept in a separate array without the |
| 111 | + event context group or tag. |
| 112 | +
|
| 113 | + """ |
| 114 | + # contexts = [[] for _ in range(len(self.hed_strings))] |
| 115 | + # for onset in self.onset_list: |
| 116 | + # for i in range(onset.start_index+1, onset.end_index): |
| 117 | + # contexts[i].append(onset.contents) |
| 118 | + # for i in range(len(self.hed_strings)): |
| 119 | + # contexts[i] = HedString(",".join(contexts[i]), hed_schema=self.hed_schema) |
| 120 | + # self.contexts = contexts |
| 121 | + print("_set_event_contexts not implemented yet") |
| 122 | + |
| 123 | + def _update_onset_list(self, group, onset_dict, event_index): |
| 124 | + """ Process one onset or offset group to create onset_list. |
| 125 | +
|
| 126 | + Parameters: |
| 127 | + group (HedGroup): The HedGroup containing the onset or offset. |
| 128 | + onset_dict (dict): A dictionary of OnsetGroup objects that keep track of span of an event. |
| 129 | + event_index (int): The event number in the list. |
| 130 | +
|
| 131 | + :raises HedFileError: |
| 132 | + - if an unmatched offset is encountered. |
| 133 | +
|
| 134 | + Notes: |
| 135 | + - Modifies onset_dict and onset_list. |
| 136 | + """ |
| 137 | + # def_tags = group.find_def_tags(recursive=False, include_groups=0) |
| 138 | + # name = def_tags[0].extension |
| 139 | + # onset_element = onset_dict.pop(name, None) |
| 140 | + # if onset_element: |
| 141 | + # onset_element.end_index = event_index |
| 142 | + # self.onset_list.append(onset_element) |
| 143 | + # elif is_offset: |
| 144 | + # raise HedFileError("UnmatchedOffset", f"Unmatched {name} offset at event {event_index}", " ") |
| 145 | + # if not is_offset: |
| 146 | + # onset_element = TemporalEvent(name, group, event_index) |
| 147 | + # onset_dict[name] = onset_element |
0 commit comments