|
8 | 8 | import bisect |
9 | 9 | import logging |
10 | 10 |
|
11 | | -# Used to prevent sorting by anything other than position |
| 11 | +# Prevents sorting on this field: NaN is not less than, greater than, or equal to itself |
12 | 12 | NO_SORT = float("nan") |
13 | 13 |
|
14 | 14 | # Block status codes |
|
19 | 19 | STATUS_SLOW = "*" # Non-trimmed, non-scraped (slow reads) |
20 | 20 | STATUS_SCRAPED = "#" # Non-trimmed, scraped (slow reads completed) |
21 | 21 |
|
| 22 | +# Helper sets for fast status categorization |
| 23 | +CACHED = {STATUS_OK, STATUS_SLOW, STATUS_SCRAPED} # Have data |
| 24 | +UNCACHED = {STATUS_UNTRIED} # Need data |
| 25 | +ERROR = {STATUS_ERROR, STATUS_TRIMMED} # Can't get data |
| 26 | +STATUSES = CACHED | UNCACHED | ERROR # All valid statuses |
| 27 | + |
22 | 28 | log = logging.getLogger(__name__) |
23 | 29 |
|
24 | 30 |
|
@@ -58,20 +64,70 @@ def __setitem__(self, key, status): |
58 | 64 | # Single offset |
59 | 65 | self._set_status_range(key, key, status) |
60 | 66 |
|
61 | | - def __iter__(self): |
62 | | - """Iterate over transitions yielding (pos, size, status) tuples.""" |
63 | | - if not self.transitions: |
64 | | - return |
| 67 | + def __getitem__(self, key): |
| 68 | + """Return the status at a single offset, or for a slice filemap[start:end] the list of transitions covering that range""" |
| 69 | + if isinstance(key, slice): |
| 70 | + # Check bounds before calling indices() which clamps values |
| 71 | + if key.start is not None and key.start < 0: |
| 72 | + raise ValueError(f"Negative start index: {key.start}") |
| 73 | + if key.stop is not None and key.stop > self.size: |
| 74 | + raise ValueError(f"Stop index beyond device size: {key.stop} > {self.size}") |
| 75 | + |
| 76 | + start, stop, step = key.indices(self.size) |
| 77 | + if step != 1: |
| 78 | + raise ValueError("Step not supported") |
| 79 | + |
| 80 | + # Return empty list for empty range |
| 81 | + if start >= stop: |
| 82 | + return [] |
| 83 | + |
| 84 | + return self._get_transitions_range(start, stop - 1) |
| 85 | + else: |
| 86 | + # Single offset |
| 87 | + return self._get_status_at(key) |
| 88 | + |
| 89 | + def _get_transitions_range(self, start: int, end: int) -> list[tuple]: |
| 90 | + """Get transitions covering range with synthetic start/end positions.""" |
| 91 | + # Find transitions that fall within our range [start, end] |
| 92 | + # We want transitions where start < transition.pos <= end |
| 93 | + result = [] |
| 94 | + |
| 95 | + # Get status at start position |
| 96 | + start_search = (start + 1, NO_SORT, "") |
| 97 | + start_idx = bisect.bisect_left(self.transitions, start_search) |
| 98 | + start_transition_idx = max(0, start_idx - 1) |
| 99 | + start_status = self.transitions[start_transition_idx][2] |
| 100 | + |
| 101 | + # Add synthetic start |
| 102 | + result.append((start, NO_SORT, start_status)) |
| 103 | + |
| 104 | + # Add all transitions that fall within (start, end] |
| 105 | + for i in range(len(self.transitions)): |
| 106 | + pos = self.transitions[i][0] |
| 107 | + if start < pos <= end: |
| 108 | + result.append(self.transitions[i]) |
| 109 | + |
| 110 | + # Get status at end position (might be different from start if we crossed transitions) |
| 111 | + end_search = (end + 1, NO_SORT, "") |
| 112 | + end_idx = bisect.bisect_left(self.transitions, end_search) |
| 113 | + end_transition_idx = max(0, end_idx - 1) |
| 114 | + end_status = self.transitions[end_transition_idx][2] |
| 115 | + |
| 116 | + # Add synthetic end |
| 117 | + result.append((end, NO_SORT, end_status)) |
| 118 | + |
| 119 | + return result |
| 120 | + |
| 121 | + def _get_status_at(self, offset: int) -> str: |
| 122 | + """Get status at single offset using efficient bisect lookup.""" |
| 123 | + # Search for (offset + 1, ...) to find the transition that starts after offset |
| 124 | + search_key = (offset + 1, NO_SORT, "") |
| 125 | + idx = bisect.bisect_left(self.transitions, search_key) |
65 | 126 |
|
66 | | - # Process transitions to yield ranges |
67 | | - for i in range(len(self.transitions) - 1): |
68 | | - start = self.transitions[i][0] |
69 | | - end = self.transitions[i + 1][0] - 1 |
70 | | - status = self.transitions[i][2] |
| 127 | + # The transition covering offset is at idx-1 (or 0 if idx is 0) |
| 128 | + transition_idx = max(0, idx - 1) |
71 | 129 |
|
72 | | - size = end - start + 1 |
73 | | - if size > 0: # Skip zero-length ranges |
74 | | - yield (start, size, status) |
| 130 | + return self.transitions[transition_idx][2] |
75 | 131 |
|
76 | 132 | def _set_status_range(self, start: int, end: int, status: str) -> None: |
77 | 133 | """Set the status for a range of bytes.""" |
@@ -108,7 +164,7 @@ def _set_status_range(self, start: int, end: int, status: str) -> None: |
108 | 164 | # if the status is different, we need to add a new entry |
109 | 165 | splice.append(start_key) |
110 | 166 |
|
111 | | - if before_end_status != status: |
| 167 | + if before_end_status != status or end_idx == len(self.transitions): |
112 | 168 | splice.append((end + 1, NO_SORT, before_end_status)) |
113 | 169 | if end + 1 < after_pos: |
114 | 170 | splice.append((after_pos, NO_SORT, after_status)) |
|
0 commit comments