forked from IfcOpenShell/step-file-parser
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathparse.py
More file actions
185 lines (157 loc) · 6.3 KB
/
parse.py
File metadata and controls
185 lines (157 loc) · 6.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
from dataclasses import dataclass
from collections import defaultdict
import re
import sys
import builtins
from lark import Lark, UnexpectedCharacters, UnexpectedToken
# import transformer
from .transformer import (
Transformer,
entity_instance,
make_header_ent,
create_step_entity,
)
from .grammar import grammar, HEADER_FIELDS
from .errors import (
HeaderFieldError,
DuplicateNameError,
ErrorCollector,
SyntaxError,
InvalidNameError,
)
def validate_header_fields(header, error_collector, only_header=False):
for field in HEADER_FIELDS.keys():
observed = header.get(field.upper(), [])
expected = HEADER_FIELDS.get(field)._fields
if len(observed) != len(expected):
error_collector.add(
HeaderFieldError(field.upper(), len(observed), len(expected))
)
if only_header:
error_collector.raise_if_any()
@dataclass
class ParseResult:
header: dict
entities: dict[int, list[entity_instance]]
def process_tree(filecontent, file_tree, with_progress, error_collector):
ents = defaultdict(list)
header, data = file_tree.children
header = dict(map(make_header_ent, header.children[0].children))
validate_header_fields(header, error_collector)
n = len(data.children)
if n:
percentages = [i * 100.0 / n for i in range(n + 1)]
num_dots = [int(b) - int(a) for a, b in zip(percentages, percentages[1:])]
for idx, entity_tree in enumerate(data.children):
if with_progress:
sys.stdout.write(num_dots[idx] * ".")
sys.stdout.flush()
ent = create_step_entity(entity_tree)
id_ = int(ent["id"])
if id_ == 0:
error_collector.add(InvalidNameError(filecontent, ent["id"], ent["lines"]))
if ents[id_]:
error_collector.add(
DuplicateNameError(filecontent, ent["id"], ent["lines"])
)
else:
ents[id_].append(ent)
return header, ents
def parse(
*,
filename=None,
filecontent=None,
with_progress=False,
with_tree=True,
only_header=False,
) -> ParseResult:
error_collector = ErrorCollector()
if filename:
if filecontent:
raise ValueError("Cannot specify both filename and filecontent")
filecontent = builtins.open(filename, encoding=None).read()
# Match and remove the comments
p = r"/\*[\s\S]*?\*/"
def replace_fn(match):
return re.sub(r"[^\n]", " ", match.group(), flags=re.M)
filecontent_wo_comments = re.sub(p, replace_fn, filecontent)
if only_header:
# Extract just the HEADER section using regex
header_match = re.search(
r"ISO-10303-21;\s*HEADER;(.*?)ENDSEC;",
filecontent_wo_comments,
flags=re.DOTALL | re.IGNORECASE,
)
if not header_match:
error_collector.add(
HeaderFieldError("header", "", "No HEADER section found in file")
)
error_collector.raise_if_any()
header_text = f"HEADER;{header_match.group(1)}ENDSEC;"
full_header_text = f"ISO-10303-21;{header_text}DATA;ENDSEC;END-ISO-10303-21;"
parser = Lark(grammar, parser="lalr", start="file")
try:
ast = parser.parse(full_header_text)
except (UnexpectedToken, UnexpectedCharacters) as e:
error_collector.add(SyntaxError(filecontent, e))
error_collector.raise_if_any() # Immediately abort in case of critical error
header_tree = ast.children[0] # HEADER section
header = dict(map(make_header_ent, header_tree.children[0].children))
validate_header_fields(header, error_collector, only_header=True)
error_collector.raise_if_any()
return ParseResult(header=header, entities=defaultdict(list))
instance_identifiers = []
transformer = {}
if not with_tree:
# If we're not going to return the tree, we also don't need to
# keep in memory while parsing. So we build a transformer that
# just returns None for every rule. lark creates a dictionary
# of callbacks from the transformer type object, so we can't
# simply use __getattr__ we need an actual type objects with
# callback functions for the rules given in the
# Create a temporary parser just for analysing the grammar
temp = Lark(grammar, parser="lalr", start="file")
# Extract the rule names
rule_names = filter(
lambda s: not s.startswith("_"), set(r.origin.name for r in temp.rules)
)
null_function = lambda self, *args: None
# Create dictionary of methods for type() creation
methods = {r: null_function for r in rule_names}
# Even in this case we do want to report duplicate identifiers
# so these need to be captured
methods["id"] = lambda self, *args: args
methods["simple_entity_instance"] = (
lambda self, tree: instance_identifiers.append(
(int(tree[0][0][0][1:]), int(tree[0][0][0].line))
)
)
NT = type("NullTransformer", (Transformer,), methods)
transformer = {"transformer": NT()}
parser = Lark(grammar, parser="lalr", start="file", **transformer)
try:
ast = parser.parse(filecontent_wo_comments)
except (UnexpectedToken, UnexpectedCharacters) as e:
error_collector.add(SyntaxError(filecontent, e))
error_collector.raise_if_any() # Immediately abort in case of critical error
if with_tree:
header, data = process_tree(filecontent, ast, with_progress, error_collector)
error_collector.raise_if_any()
return ParseResult(header=header, entities=data)
else:
# process_tree() would take care of duplicate identifiers,
# but we need to do it ourselves now using our rudimentary
# transformer
seen = set()
for iden, lineno in instance_identifiers:
if iden == 0:
error_collector.add(
InvalidNameError(filecontent, iden, [lineno, lineno])
)
if iden in seen:
error_collector.add(
DuplicateNameError(filecontent, iden, [lineno, lineno])
)
else:
seen.add(iden)
error_collector.raise_if_any()