|
| 1 | +from __future__ import annotations |
| 2 | + |
| 3 | +from collections.abc import Iterable |
| 4 | +from typing import TYPE_CHECKING, cast |
| 5 | + |
| 6 | +from hyperbase.hyperedge import Atom, Hyperedge, UniqueAtom |
| 7 | + |
| 8 | +if TYPE_CHECKING: |
| 9 | + from hyperbase.parsers.parse_result import ParseResult |
| 10 | + |
| 11 | + |
# Characters that may not appear literally in an atom, mapped to their
# percent-escaped replacements. "%" itself is included so that the escape
# marker is never ambiguous.
_ATOM_ESCAPES = str.maketrans(
    {
        "%": "%25",
        "/": "%2f",
        " ": "%20",
        "(": "%28",
        ")": "%29",
        ".": "%2e",
        "*": "%2a",
        "&": "%26",
        "@": "%40",
        "\n": "%0a",
        "\r": "%0d",
    }
)


def str_to_atom(s: str) -> str:
    """Convert a string into a valid atom.

    The string is lower-cased and every reserved character (``%``, ``/``,
    space, parentheses, ``.``, ``*``, ``&``, ``@``, line feed and carriage
    return) is replaced by its lowercase percent-escape.
    """
    # A single translate pass gives the same result as escaping "%" first and
    # the other characters afterwards: no replacement text contains a
    # character that is itself escaped, other than the leading "%".
    return s.lower().translate(_ATOM_ESCAPES)
| 29 | + |
| 30 | + |
def _edge_str_has_outer_parens(edge_str: str) -> bool:
    """Tell whether the string representation of an edge opens with an
    outer parenthesis.

    Only the first character is inspected; strings shorter than two
    characters are never considered parenthesised.
    """
    return len(edge_str) >= 2 and edge_str.startswith("(")
| 38 | + |
| 39 | + |
def split_edge_str(edge_str: str) -> tuple[str, ...]:
    """Split the string representation of an edge (given without its outer
    parenthesis) into its top-level tokens.

    Tokens are separated by spaces at nesting depth zero. A parenthesised
    group is returned as a single token, including its parentheses; nested
    groups are not split any further.

    Note that a top-level "(" always starts a new token, so text that
    directly precedes it without a space is discarded ("a(b)" yields only
    "(b)").

    Raises:
        ValueError: if the parentheses in ``edge_str`` are not balanced.
    """
    pieces: list[str] = []
    nesting = 0
    begin = 0
    in_token = False

    for idx, ch in enumerate(edge_str):
        if ch == "(":
            if nesting == 0:
                # Opening a top-level group: the token starts right here.
                in_token = True
                begin = idx
            nesting += 1
        elif ch == ")":
            nesting -= 1
            if nesting < 0:
                raise ValueError(f"Unbalanced parenthesis in edge string: '{edge_str}'")
            if nesting == 0:
                # Closing a top-level group: emit it with both parentheses.
                pieces.append(edge_str[begin : idx + 1])
                in_token = False
        elif ch == " ":
            # Spaces only separate tokens outside of any group.
            if in_token and nesting == 0:
                pieces.append(edge_str[begin:idx])
                in_token = False
        elif not in_token:
            in_token = True
            begin = idx

    if in_token:
        if nesting > 0:
            raise ValueError(f"Unbalanced parenthesis in edge string: '{edge_str}'")
        # Trailing token that runs to the end of the string.
        pieces.append(edge_str[begin:])

    return tuple(pieces)
| 79 | + |
| 80 | + |
def _parsed_token(token: str) -> Hyperedge:
    """Turn a single token into an edge.

    A token that opens with a parenthesis is parsed recursively through
    ``hedge``; any other token becomes an ``Atom``.
    """
    if not _edge_str_has_outer_parens(token):
        return Atom(token)
    return hedge(token)
| 86 | + |
| 87 | + |
def _collect_positions(tok_pos: Hyperedge) -> list[int]:
    """Gather every valid (>= 0) token position found in a tok_pos tree.

    Positions are returned in traversal order; negative positions are
    left out.
    """
    if not tok_pos.atom:
        # Non-atomic node: flatten the positions of all children, in order.
        return [
            position
            for child in tok_pos
            for position in _collect_positions(child)
        ]

    # Atomic node: its string form is the integer token position.
    index = int(str(tok_pos))
    return [] if index < 0 else [index]
| 98 | + |
| 99 | + |
def _rebuild_with_text(
    edge: Hyperedge,
    tok_pos: Hyperedge,
    tokens: list[str],
) -> Hyperedge:
    """Recursively rebuild ``edge`` with text taken from ``tokens``.

    ``tok_pos`` is walked in parallel with ``edge``. An atom receives the
    token at its position, or no text when the position is negative. A
    non-atomic edge receives the space-joined tokens from the smallest to the
    largest valid position found beneath it, or no text when there is none.
    """
    if not edge.atom:
        # Rebuild children first, pairing each with its tok_pos sub-tree.
        rebuilt = tuple(
            _rebuild_with_text(child, child_pos, tokens)
            for child, child_pos in zip(edge, tok_pos, strict=False)
        )
        covered = _collect_positions(tok_pos)
        # The text spans the whole token range covered by this sub-tree.
        span_text = (
            " ".join(tokens[min(covered) : max(covered) + 1]) if covered else None
        )
        return Hyperedge(rebuilt, text=span_text)

    leaf = cast(Atom, edge)
    index = int(str(tok_pos))
    leaf_text = None if index < 0 else tokens[index]
    return Atom(str(leaf), leaf.parens, text=leaf_text)
| 124 | + |
| 125 | + |
def hedge(
    source: str | Hyperedge | list | tuple | ParseResult,
) -> Hyperedge:
    """Create a hyperedge from one of several source representations.

    Accepted sources:

    * an object exposing ``tok_pos``, ``tokens`` and ``edge`` attributes
      (a parse result): its edge is rebuilt with per-node text, and the
      root's ``text`` is set to the parse result's ``text``;
    * a plain ``tuple`` or ``list``: every item is converted recursively;
    * a ``str``: parsed from the textual edge notation, with or without
      outer parentheses;
    * a ``Hyperedge``, ``Atom`` or ``UniqueAtom``: returned unchanged.

    Raises:
        ValueError: if a string source is empty or has unbalanced
            parentheses.
        TypeError: if the source is of an unsupported type.
    """
    # Recognise a parse result by duck typing, so that the parsers package
    # never has to be imported at runtime (avoids a circular import). The
    # string form of the cast target is resolved by type checkers only.
    if (
        hasattr(source, "tok_pos")
        and hasattr(source, "tokens")
        and hasattr(source, "edge")
    ):
        parsed = cast("ParseResult", source)
        edge = _rebuild_with_text(parsed.edge, parsed.tok_pos, parsed.tokens)
        # The root takes the parse result's own text rather than the
        # reconstructed token span; written via object.__setattr__ as in the
        # original code.
        object.__setattr__(edge, "text", parsed.text)
        return edge

    # Exact type checks are kept as in the original code (presumably so that
    # edge types derived from tuple are not treated as raw sequences -
    # TODO confirm).
    if type(source) in {tuple, list}:
        return Hyperedge(tuple(hedge(item) for item in cast(Iterable, source)))

    if type(source) is str:
        edge_str = source.strip().replace("\n", " ")

        parens = _edge_str_has_outer_parens(edge_str)
        if parens:
            # Only the opening parenthesis has been checked so far. Without
            # a matching closing one, slicing below would silently drop the
            # last character of the edge.
            if not edge_str.endswith(")"):
                raise ValueError(
                    f"Unbalanced parenthesis in edge string: '{edge_str}'"
                )
            edge_inner_str = edge_str[1:-1]
        else:
            edge_inner_str = edge_str

        tokens = split_edge_str(edge_inner_str)
        if not tokens:
            raise ValueError(f"Edge string is empty: '{source}'")

        edges = tuple(_parsed_token(token) for token in tokens)
        if len(edges) == 1 and isinstance(edges[0], Atom):
            # A single atom, possibly written inside parentheses.
            return Atom(str(edges[0]), parens)
        return Hyperedge(edges)

    if type(source) in {Hyperedge, Atom, UniqueAtom}:
        return source  # type: ignore

    raise TypeError(
        f"Cannot create hyperedge from {type(source).__name__}: {source!r}"
    )
| 169 | + |
| 170 | + |
def build_atom(text: str, *parts: str) -> Atom:
    """Build an atom from text and other parts.

    ``text`` is escaped with ``str_to_atom`` to form the atom root. The
    non-empty ``parts`` are appended to it, each separated by ``/``. When no
    non-empty part is given, the atom consists of the escaped root alone.
    """
    # Start from the bare root so the result is defined even without parts.
    atom_str = str_to_atom(text)
    parts_str = "/".join(part for part in parts if part)
    if parts_str:
        atom_str = f"{atom_str}/{parts_str}"
    return Atom(atom_str)
0 commit comments