Skip to content

Commit 47994a4

Browse files
Add EvalMap for pre-decoded JSON input (RecoLabs#2)
* Add EvalMap for pre-decoded JSON input Add StreamEvaluator.EvalMap that accepts map[string]json.RawMessage directly, avoiding the need to serialize data to []byte before evaluation. Internally, DecodeRawMap converts each value individually via DecodeJSON into an *OrderedMap, then delegates to the shared evalInternal method (refactored out of EvalMany). - Add DecodeRawMap to internal/evaluator/ordered_map.go - Refactor EvalMany into evalInternal + thin EvalMany wrapper - Add EvalMap with schemaKey support for GroupPlan caching - Add resolveGjsonPath for O(1) map key lookup in fast paths - Add tests for EvalMap (parity, multi-expr, nil/empty edge cases) Signed-off-by: NirBarak-RecoLabs <nirb@recolabs.ai> * Add EvalMap to README documentation - Add EvalMap code example to Tier 3 StreamEvaluator section - Add pre-decoded map input bullet to Streaming API key properties - Update project structure to mention EvalMap in stream.go Signed-off-by: NirBarak-RecoLabs <nirb@recolabs.ai> --------- Signed-off-by: NirBarak-RecoLabs <nirb@recolabs.ai>
1 parent 42e7b4d commit 47994a4

6 files changed

Lines changed: 250 additions & 18 deletions

File tree

README.md

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,9 @@ for i, result := range results {
122122
handleMatch(indices[i], result)
123123
}
124124
}
125+
126+
// Alternative: pre-decoded map input (avoids re-serialization)
127+
results, _ = se.EvalMap(ctx, fieldMap, schemaKey, indices)
125128
```
126129

127130
### Custom Function Registration
@@ -156,6 +159,7 @@ Hot Path (millions/day, lock-free)
156159
- **Schema-keyed caching** — the `GroupPlan` (merged paths, expression groupings, selective unmarshal targets) is computed once per schema key and reused immutably.
157160
- **Lock-free reads**`BoundedCache` publishes an `atomic.Pointer` snapshot on every write; reads scan the snapshot without acquiring a lock. Writes are serialised by a mutex.
158161
- **Selective unmarshal** — full-path expressions unmarshal only the subtrees they need (e.g., just the `items` array from a 10KB event), not the entire document.
162+
- **Pre-decoded map input**`EvalMap` accepts `map[string]json.RawMessage` directly, skipping full-document serialization when the caller already has individually-encoded fields. Fast paths resolve top-level keys via O(1) map lookup.
159163
- **Dynamic mutation**`Replace`, `Remove`, and `Reset` methods allow modifying registered expressions at runtime with automatic cache invalidation.
160164
- **Observability** — implement `MetricsHook` to receive per-evaluation callbacks for cache hits/misses, eval latency, fast-path usage, and errors.
161165

@@ -417,7 +421,7 @@ All standard regex features (character classes, quantifiers, alternation, groupi
417421
```
418422
gnata/
419423
├── gnata.go # Public API: Compile, Eval, EvalBytes, EvalWithVars, CustomFunc
420-
├── stream.go # StreamEvaluator, GroupPlan, EvalMany, MetricsHook
424+
├── stream.go # StreamEvaluator, GroupPlan, EvalMany, EvalMap, MetricsHook
421425
├── bounded_cache.go # Lock-free FIFO ring-buffer plan cache
422426
├── deep_equal.go # JSONata-compatible deep equality
423427
├── internal/

func_fast.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -72,8 +72,8 @@ var funcFastHandlers = map[parser.FuncFastKind]funcFastHandler{
7272
parser.FuncFastAverage: evalFuncAverage,
7373
}
7474

75-
func evalFuncBytes(f *parser.FuncFastPath, data json.RawMessage) (result any, handled bool, err error) {
76-
r := gjson.GetBytes(data, f.Path)
75+
func evalFunc(f *parser.FuncFastPath, data json.RawMessage, mapData map[string]json.RawMessage) (result any, handled bool, err error) {
76+
r := resolveGjsonPath(data, mapData, f.Path)
7777
if !r.Exists() {
7878
// Fall through to full evaluator — gjson doesn't auto-map through
7979
// arrays, so the path might still resolve via the AST walker.

gnata.go

Lines changed: 39 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
"encoding/json"
1515
"fmt"
1616
"slices"
17+
"strings"
1718

1819
"github.com/recolabs/gnata/functions"
1920
"github.com/recolabs/gnata/internal/evaluator"
@@ -206,12 +207,12 @@ func (e *Expression) EvalBytes(ctx context.Context, data json.RawMessage) (resul
206207
// which handles auto-mapping through arrays correctly.
207208
}
208209
if e.cmpFast != nil {
209-
if res, handled, evalErr := evalComparisonBytes(e.cmpFast, data); handled || evalErr != nil {
210+
if res, handled, evalErr := evalComparison(e.cmpFast, data, nil); handled || evalErr != nil {
210211
return res, evalErr
211212
}
212213
}
213214
if e.funcFast != nil {
214-
if res, handled, evalErr := evalFuncBytes(e.funcFast, data); handled || evalErr != nil {
215+
if res, handled, evalErr := evalFunc(e.funcFast, data, nil); handled || evalErr != nil {
215216
return res, evalErr
216217
}
217218
}
@@ -222,14 +223,44 @@ func (e *Expression) EvalBytes(ctx context.Context, data json.RawMessage) (resul
222223
return e.Eval(ctx, v)
223224
}
224225

225-
// evalComparisonBytes evaluates a pre-compiled comparison fast path against raw JSON bytes.
226-
// Returns (result, true, nil) on success. Returns (nil, false, nil) when the expression
227-
// cannot safely short-circuit (e.g. the LHS is a JSON array that requires auto-mapping),
228-
// signalling the caller to fall back to full evaluation.
226+
// resolveGjsonPath resolves a gjson path from either raw bytes or a pre-decoded map.
227+
// When data is available (EvalMany), it delegates to gjson.GetBytes on the full blob.
228+
// When mapData is available (EvalMap), it does an O(1) map lookup for the top-level
229+
// key, then uses gjson on the nested value bytes — skipping sibling keys entirely.
230+
// Paths containing gjson special characters fall through to return an empty result,
231+
// letting the caller fall back to full AST evaluation.
232+
func resolveGjsonPath(data json.RawMessage, mapData map[string]json.RawMessage, path string) gjson.Result {
233+
if data != nil {
234+
return gjson.GetBytes(data, path)
235+
}
236+
if mapData == nil {
237+
return gjson.Result{}
238+
}
239+
if strings.ContainsAny(path, `\#*?@`) {
240+
return gjson.Result{}
241+
}
242+
key, rest, hasDot := strings.Cut(path, ".")
243+
raw, ok := mapData[key]
244+
if !ok {
245+
return gjson.Result{}
246+
}
247+
if !hasDot {
248+
return gjson.ParseBytes(raw)
249+
}
250+
return gjson.GetBytes(raw, rest)
251+
}
252+
253+
// evalComparison evaluates a pre-compiled comparison fast path against raw JSON
254+
// bytes or a pre-decoded map. Returns (result, true, nil) on success. Returns
255+
// (nil, false, nil) when the expression cannot safely short-circuit (e.g. the
256+
// LHS is a JSON array that requires auto-mapping), signalling the caller to
257+
// fall back to full evaluation.
229258
//
230259
//nolint:unparam // err is part of the funcFastHandler contract; always nil for now
231-
func evalComparisonBytes(c *parser.ComparisonFastPath, data json.RawMessage) (result any, handled bool, err error) {
232-
lhs := gjson.GetBytes(data, c.LHSPath)
260+
func evalComparison(
261+
c *parser.ComparisonFastPath, data json.RawMessage, mapData map[string]json.RawMessage,
262+
) (result any, handled bool, err error) {
263+
lhs := resolveGjsonPath(data, mapData, c.LHSPath)
233264
if !lhs.Exists() {
234265
// gjson couldn't resolve the path. This could be because the path is
235266
// truly undefined OR because an intermediate element is a JSON array

internal/evaluator/ordered_map.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package evaluator
33
import (
44
"bytes"
55
"encoding/json"
6+
"fmt"
67
"maps"
78
"slices"
89
)
@@ -147,6 +148,21 @@ func DecodeJSON(b json.RawMessage) (any, error) {
147148
return decodeValue(dec)
148149
}
149150

151+
// DecodeRawMap converts a map of field names to raw JSON values into an
152+
// *OrderedMap by decoding each value individually via DecodeJSON. Objects
153+
// in the values preserve key insertion order, consistent with DecodeJSON.
154+
func DecodeRawMap(m map[string]json.RawMessage) (*OrderedMap, error) {
155+
om := NewOrderedMapWithCapacity(len(m))
156+
for key, raw := range m {
157+
val, err := DecodeJSON(raw)
158+
if err != nil {
159+
return nil, fmt.Errorf("decode key %q: %w", key, err)
160+
}
161+
om.Set(key, val)
162+
}
163+
return om, nil
164+
}
165+
150166
func decodeValue(dec *json.Decoder) (any, error) {
151167
tok, err := dec.Token()
152168
if err != nil {

stream.go

Lines changed: 29 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ import (
1111

1212
"github.com/recolabs/gnata/internal/evaluator"
1313
"github.com/recolabs/gnata/internal/parser"
14-
"github.com/tidwall/gjson"
1514
)
1615

1716
// StreamEvaluator manages compiled expressions for high-throughput streaming evaluation.
@@ -216,6 +215,25 @@ func (se *StreamEvaluator) Reset() {
216215
// Returns results[i] = evaluation of expressions[exprIndices[i]], or nil for undefined.
217216
func (se *StreamEvaluator) EvalMany(
218217
ctx context.Context, data json.RawMessage, schemaKey string, exprIndices []int,
218+
) ([]any, error) {
219+
return se.evalInternal(ctx, data, nil, nil, schemaKey, exprIndices)
220+
}
221+
222+
// EvalMap evaluates the specified expressions against a map of raw JSON values.
223+
// - data: map of field names to raw JSON-encoded values (decoded individually).
224+
// - schemaKey: external key identifying the event schema. On first encounter, builds
225+
// and caches a GroupPlan. Subsequent calls are lock-free. Pass "" to disable caching.
226+
// - exprIndices: which compiled expressions to evaluate.
227+
//
228+
// Returns results[i] = evaluation of expressions[exprIndices[i]], or nil for undefined.
229+
func (se *StreamEvaluator) EvalMap(
230+
ctx context.Context, data map[string]json.RawMessage, schemaKey string, exprIndices []int,
231+
) ([]any, error) {
232+
return se.evalInternal(ctx, nil, nil, data, schemaKey, exprIndices)
233+
}
234+
235+
func (se *StreamEvaluator) evalInternal(
236+
ctx context.Context, data json.RawMessage, preparsed any, mapData map[string]json.RawMessage, schemaKey string, exprIndices []int,
219237
) (results []any, err error) {
220238
defer recoverEvalPanic(&err)
221239
if len(exprIndices) == 0 {
@@ -247,7 +265,7 @@ func (se *StreamEvaluator) EvalMany(
247265
}
248266

249267
results = make([]any, len(exprIndices))
250-
batch := evalBatch{se: se, plan: plan, data: data}
268+
batch := evalBatch{se: se, plan: plan, data: data, mapData: mapData, parsed: preparsed, parseAttempted: preparsed != nil}
251269
for i, idx := range exprIndices {
252270
if err := ctx.Err(); err != nil {
253271
return nil, err
@@ -275,6 +293,7 @@ type evalBatch struct {
275293
se *StreamEvaluator
276294
plan *GroupPlan
277295
data json.RawMessage
296+
mapData map[string]json.RawMessage
278297
parsed any
279298
parsedErr error
280299
parseAttempted bool
@@ -300,18 +319,17 @@ func (b *evalBatch) evalSingleExpr(ctx context.Context, i, idx int, expr *Expres
300319
// or (nil, true, err) on error.
301320
func (b *evalBatch) tryFastPaths(i, idx int, start time.Time) (result any, done bool, err error) {
302321
if b.plan != nil && i < len(b.plan.ExprFastPath) && b.plan.ExprFastPath[i] && b.plan.FastPaths[i] != "" {
303-
r := gjson.GetBytes(b.data, b.plan.FastPaths[i])
322+
r := resolveGjsonPath(b.data, b.mapData, b.plan.FastPaths[i])
304323
if r.Exists() {
305324
if b.se.metrics != nil {
306325
b.se.metrics.OnEval(idx, true, time.Since(start), nil)
307326
}
308327
return gjsonValueToAny(&r), true, nil
309328
}
310-
// gjson miss — fall through to full evaluator (handles array auto-mapping)
311329
}
312330

313331
if b.plan != nil && i < len(b.plan.CmpFast) && b.plan.CmpFast[i] != nil {
314-
if result, handled, err := evalComparisonBytes(b.plan.CmpFast[i], b.data); err != nil {
332+
if result, handled, err := evalComparison(b.plan.CmpFast[i], b.data, b.mapData); err != nil {
315333
if b.se.metrics != nil {
316334
b.se.metrics.OnEval(idx, true, time.Since(start), err)
317335
}
@@ -325,7 +343,7 @@ func (b *evalBatch) tryFastPaths(i, idx int, start time.Time) (result any, done
325343
}
326344

327345
if b.plan != nil && i < len(b.plan.FuncFast) && b.plan.FuncFast[i] != nil {
328-
if result, handled, err := evalFuncBytes(b.plan.FuncFast[i], b.data); err != nil {
346+
if result, handled, err := evalFunc(b.plan.FuncFast[i], b.data, b.mapData); err != nil {
329347
if b.se.metrics != nil {
330348
b.se.metrics.OnEval(idx, true, time.Since(start), err)
331349
}
@@ -345,7 +363,11 @@ func (b *evalBatch) tryFastPaths(i, idx int, start time.Time) (result any, done
345363
func (b *evalBatch) fullEval(ctx context.Context, idx int, expr *Expression, start time.Time) (any, error) {
346364
if !b.parseAttempted {
347365
b.parseAttempted = true
348-
b.parsed, b.parsedErr = evaluator.DecodeJSON(b.data)
366+
if len(b.mapData) > 0 {
367+
b.parsed, b.parsedErr = evaluator.DecodeRawMap(b.mapData)
368+
} else if len(b.data) > 0 {
369+
b.parsed, b.parsedErr = evaluator.DecodeJSON(b.data)
370+
}
349371
}
350372
if b.parsedErr != nil {
351373
return nil, b.parsedErr

0 commit comments

Comments
 (0)