-
Notifications
You must be signed in to change notification settings - Fork 13
Expand file tree
/
Copy pathoperator.py
More file actions
336 lines (268 loc) · 11.4 KB
/
operator.py
File metadata and controls
336 lines (268 loc) · 11.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
#!/usr/bin/python
# -*- coding: utf-8 -*-
"""The operator expression module."""
import json
from typing import Callable, Generator, List, Union
from jsonpath2.expression import Expression
from jsonpath2.node import Node
import re
class OperatorExpression(Expression):
    """Common base class of the operator expressions in this module.

    Both methods below are placeholders whose bodies consist solely of a
    docstring; the concrete operator classes override them.
    """

    def __jsonpath__(self) -> Generator[str, None, None]:  # pragma: no cover abstract method
        """Placeholder: yield the string tokens of the JSONPath form."""

    def evaluate(self, root_value: object, current_value: object) -> bool:  # pragma: no cover abstract method
        """Placeholder: evaluate the expression for the given values."""
class BinaryOperatorExpression(OperatorExpression):
    """Operator expression that combines a left and a right operand.

    Each operand is either a ``Node`` (matched against the data to produce
    candidate values) or a plain literal value that is used as is.
    """

    def __init__(
        self,
        token: str,
        callback: Callable[[object, object], bool],
        left_node_or_value: Union[Node, object],
        right_node_or_value: Union[Node, object],
    ):
        """Store the operator token, the callback and both operands.

        Args:
            token: Operator text emitted between the two operands by
                ``__jsonpath__``.
            callback: Invoked as ``callback(left_value, right_value)`` by
                ``evaluate``.
            left_node_or_value: Left operand, a ``Node`` or a literal.
            right_node_or_value: Right operand, a ``Node`` or a literal.
        """
        super(BinaryOperatorExpression, self).__init__()
        self.token = token
        self.callback = callback
        self.left_node_or_value = left_node_or_value
        self.right_node_or_value = right_node_or_value

    def __jsonpath__(self) -> Generator[str, None, None]:
        """Yield the tokens ``<left> <token> <right>`` of this expression.

        A ``Node`` operand contributes its own ``__jsonpath__`` tokens; any
        other operand is rendered with ``json.dumps``.
        """
        if isinstance(self.left_node_or_value, Node):
            yield from self.left_node_or_value.__jsonpath__()
        else:
            yield json.dumps(self.left_node_or_value)

        yield " "
        yield self.token
        yield " "

        if isinstance(self.right_node_or_value, Node):
            yield from self.right_node_or_value.__jsonpath__()
        else:
            yield json.dumps(self.right_node_or_value)

    def evaluate(self, root_value: object, current_value: object) -> bool:
        """Return True if the callback holds for any (left, right) value pair.

        Args:
            root_value: Forwarded to ``Node.match`` for ``Node`` operands.
            current_value: Forwarded to ``Node.match`` for ``Node`` operands.

        Returns:
            ``True`` as soon as ``callback(left_value, right_value)`` is truthy
            for one pair of candidate values, otherwise ``False``.
        """
        if isinstance(self.left_node_or_value, Node):
            # The left side is walked exactly once, so it can stay lazy.
            left_values = (
                left_node_match_data.current_value
                for left_node_match_data in self.left_node_or_value.match(
                    root_value, current_value
                )
            )
        else:
            left_values = [self.left_node_or_value]

        if isinstance(self.right_node_or_value, Node):
            # The right side is re-iterated for every left value.  A generator
            # would be exhausted after the first left value, silently skipping
            # all remaining pairs, so the matches are collected in a list.
            right_values = [
                right_node_match_data.current_value
                for right_node_match_data in self.right_node_or_value.match(
                    root_value, current_value
                )
            ]
        else:
            right_values = [self.right_node_or_value]

        return any(
            self.callback(left_value, right_value)
            for left_value in left_values
            for right_value in right_values
        )
class EqualBinaryOperatorExpression(BinaryOperatorExpression):
    """Binary operator expression for the ``=`` token."""

    @staticmethod
    def __evaluate__(x_obj, y_obj):
        """Return the result of ``x_obj == y_obj``."""
        return x_obj == y_obj

    def __init__(self, *args, **kwargs):
        """Initialise the base class with the ``=`` token and callback."""
        callback = EqualBinaryOperatorExpression.__evaluate__
        super().__init__("=", callback, *args, **kwargs)
class NotEqualBinaryOperatorExpression(BinaryOperatorExpression):
    """Binary operator expression for the ``!=`` token."""

    @staticmethod
    def __evaluate__(x_obj, y_obj):
        """Return the result of ``x_obj != y_obj``."""
        return x_obj != y_obj

    def __init__(self, *args, **kwargs):
        """Initialise the base class with the ``!=`` token and callback."""
        callback = NotEqualBinaryOperatorExpression.__evaluate__
        super().__init__("!=", callback, *args, **kwargs)
class LessThanBinaryOperatorExpression(BinaryOperatorExpression):
    """Binary operator expression for the ``<`` token."""

    @staticmethod
    def __evaluate__(x_obj, y_obj):
        """Return ``x_obj < y_obj`` for int/float operands, else ``False``."""
        numeric_types = (float, int)
        if not isinstance(x_obj, numeric_types) or not isinstance(
            y_obj, numeric_types
        ):
            return False
        return x_obj < y_obj

    def __init__(self, *args, **kwargs):
        """Initialise the base class with the ``<`` token and callback."""
        callback = LessThanBinaryOperatorExpression.__evaluate__
        super().__init__("<", callback, *args, **kwargs)
class LessThanOrEqualToBinaryOperatorExpression(BinaryOperatorExpression):
    """Binary operator expression for the ``<=`` token."""

    @staticmethod
    def __evaluate__(x_obj, y_obj):
        """Return ``x_obj <= y_obj`` for int/float operands, else ``False``."""
        numeric_types = (float, int)
        if not isinstance(x_obj, numeric_types) or not isinstance(
            y_obj, numeric_types
        ):
            return False
        return x_obj <= y_obj

    def __init__(self, *args, **kwargs):
        """Initialise the base class with the ``<=`` token and callback."""
        callback = LessThanOrEqualToBinaryOperatorExpression.__evaluate__
        super().__init__("<=", callback, *args, **kwargs)
class GreaterThanBinaryOperatorExpression(BinaryOperatorExpression):
    """Binary operator expression for the ``>`` token."""

    @staticmethod
    def __evaluate__(x_obj, y_obj):
        """Return ``x_obj > y_obj`` for int/float operands, else ``False``."""
        numeric_types = (float, int)
        if not isinstance(x_obj, numeric_types) or not isinstance(
            y_obj, numeric_types
        ):
            return False
        return x_obj > y_obj

    def __init__(self, *args, **kwargs):
        """Initialise the base class with the ``>`` token and callback."""
        callback = GreaterThanBinaryOperatorExpression.__evaluate__
        super().__init__(">", callback, *args, **kwargs)
class GreaterThanOrEqualToBinaryOperatorExpression(BinaryOperatorExpression):
    """Binary operator expression for the ``>=`` token."""

    @staticmethod
    def __evaluate__(x_obj, y_obj):
        """Return ``x_obj >= y_obj`` for int/float operands, else ``False``."""
        numeric_types = (float, int)
        if not isinstance(x_obj, numeric_types) or not isinstance(
            y_obj, numeric_types
        ):
            return False
        return x_obj >= y_obj

    def __init__(self, *args, **kwargs):
        """Initialise the base class with the ``>=`` token and callback."""
        callback = GreaterThanOrEqualToBinaryOperatorExpression.__evaluate__
        super().__init__(">=", callback, *args, **kwargs)
class ContainsBinaryOperatorExpression(BinaryOperatorExpression):
    """Binary operator expression for the ``contains`` token."""

    def __init__(self, *args, **kwargs):
        """Initialise the base class with the ``contains`` token and callback."""
        super(ContainsBinaryOperatorExpression, self).__init__(
            "contains", ContainsBinaryOperatorExpression.__evaluate__, *args, **kwargs
        )

    @staticmethod
    def __evaluate__(x_obj, y_obj):
        """Return whether ``y_obj in x_obj`` holds.

        Args:
            x_obj: The candidate container (left operand).
            y_obj: The item looked for (right operand).

        Returns:
            The result of ``y_obj in x_obj``, or ``False`` when the membership
            test is not defined for the operand types.
        """
        try:
            return y_obj in x_obj
        except TypeError:
            # Raised e.g. when x_obj is not a container, when a non-string is
            # looked up in a string, or when an unhashable item is looked up
            # in a dict.  Such a pair simply does not match, mirroring how the
            # ordering operators in this module return False on a type
            # mismatch instead of aborting the evaluation.
            return False
class RegexBinaryOperatorExpression(BinaryOperatorExpression):
    """Binary operator expression for the ``~=`` (regex match) token."""

    def __init__(self, *args, **kwargs):
        """Initialise the base class with the ``~=`` token and callback."""
        super(RegexBinaryOperatorExpression, self).__init__(
            "~=", RegexBinaryOperatorExpression.__evaluate__, *args, **kwargs
        )

    @staticmethod
    def __evaluate__(x_obj, y_obj):
        """Return whether the pattern ``y_obj`` matches the string ``x_obj``.

        Args:
            x_obj: The subject to test (left operand).
            y_obj: The regular expression (right operand).  Every doubled
                backslash in it is collapsed to a single backslash before it
                is handed to ``re.match``.

        Returns:
            ``True`` if ``re.match`` finds a match (anchored at the start of
            ``x_obj``), otherwise ``False``.  ``False`` is also returned when
            either operand is not a ``str``.

        Raises:
            re.error: If the pattern is not a valid regular expression.
        """
        # Non-string operands cannot match; without this guard re.match or
        # str.replace would raise and abort the whole evaluation.
        if not isinstance(x_obj, str) or not isinstance(y_obj, str):
            return False
        pattern = y_obj.replace("\\\\", "\\")
        return re.match(pattern, x_obj) is not None
class UnaryOperatorExpression(OperatorExpression):
    """Base class for operators that apply to a single sub-expression."""

    def __init__(
        self, token: str, callback: Callable[[bool], bool], expression: Expression
    ):
        """Keep the operator token, the callback and the wrapped expression."""
        super().__init__()
        self.expression = expression
        self.callback = callback
        self.token = token

    def __jsonpath__(self) -> Generator[str, None, None]:
        """Yield the token, a space, then the operand's JSONPath tokens.

        The operand is wrapped in parentheses when it is itself a unary or
        variadic operator expression.
        """
        yield self.token
        yield " "

        parenthesise = isinstance(
            self.expression, (UnaryOperatorExpression, VariadicOperatorExpression)
        )
        if parenthesise:
            yield "("
        yield from self.expression.__jsonpath__()
        if parenthesise:
            yield ")"

    def evaluate(self, root_value: object, current_value: object) -> bool:
        """Evaluate the operand and return the callback applied to its result."""
        operand_result = self.expression.evaluate(root_value, current_value)
        return self.callback(operand_result)
class NotUnaryOperatorExpression(UnaryOperatorExpression):
    """Unary operator expression for the ``not`` token."""

    @staticmethod
    def __evaluate__(x_obj):
        """Return the logical negation of ``x_obj``."""
        return not x_obj

    def __init__(self, *args, **kwargs):
        """Initialise the base class with the ``not`` token and callback."""
        callback = NotUnaryOperatorExpression.__evaluate__
        super().__init__("not", callback, *args, **kwargs)
class VariadicOperatorExpression(OperatorExpression):
    """Base class for boolean operators over any number of sub-expressions."""

    def __init__(
        self,
        token: str,
        callback: Callable[[List[bool]], bool],
        expressions: List[Expression] = None,
    ):
        """Keep the token, the reducing callback and the operand expressions.

        A falsy ``expressions`` argument (``None`` or empty) is replaced by a
        new empty list.
        """
        super().__init__()
        self.token = token
        self.callback = callback
        self.expressions = expressions or []

    def __jsonpath__(self) -> Generator[str, None, None]:
        """Yield the JSONPath tokens of the operands joined by the token.

        Nothing is yielded for zero operands.  A single operand is emitted
        bare.  With two or more operands, each operand that is itself a
        variadic operator expression is wrapped in parentheses.
        """
        if len(self.expressions) == 1:
            yield from self.expressions[0].__jsonpath__()
            return

        # With zero operands the loop body never runs, so nothing is yielded.
        for position, operand in enumerate(self.expressions):
            if position > 0:
                yield " "
                yield self.token
                yield " "

            nested = isinstance(operand, VariadicOperatorExpression)
            if nested:
                yield "("
            yield from operand.__jsonpath__()
            if nested:
                yield ")"

    def evaluate(self, root_value: object, current_value: object) -> bool:
        """Return the callback applied to the lazily evaluated operands."""
        return self.callback(
            operand.evaluate(root_value, current_value)
            for operand in self.expressions
        )
class AndVariadicOperatorExpression(VariadicOperatorExpression):
    """Variadic operator expression for the ``and`` token."""

    def __init__(self, *args, **kwargs):
        """Initialise the base class with ``and`` and the builtin ``all``."""
        token, reducer = "and", all
        super().__init__(token, reducer, *args, **kwargs)
class OrVariadicOperatorExpression(VariadicOperatorExpression):
    """Variadic operator expression for the ``or`` token."""

    def __init__(self, *args, **kwargs):
        """Initialise the base class with ``or`` and the builtin ``any``."""
        token, reducer = "or", any
        super().__init__(token, reducer, *args, **kwargs)