Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
211 changes: 198 additions & 13 deletions config/schema/artifacts/datastore_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1236,6 +1236,9 @@ index_templates:
type: integer
nested_fields2|the_seasons:
type: integer
__nested_sourced_data:
type: object
dynamic: 'false'
__sources:
type: keyword
__versions:
Expand Down Expand Up @@ -1310,6 +1313,9 @@ index_templates:
type: integer
widget_options|colors:
type: integer
__nested_sourced_data:
type: object
dynamic: 'false'
__sources:
type: keyword
__versions:
Expand Down Expand Up @@ -1476,6 +1482,9 @@ index_templates:
type: integer
fees|amount_cents:
type: integer
__nested_sourced_data:
type: object
dynamic: 'false'
__sources:
type: keyword
__versions:
Expand Down Expand Up @@ -1527,6 +1536,9 @@ indices:
type: integer
shapes|coordinates:
type: integer
__nested_sourced_data:
type: object
dynamic: 'false'
__sources:
type: keyword
__versions:
Expand Down Expand Up @@ -1593,6 +1605,9 @@ indices:
type: integer
owner_ids:
type: integer
__nested_sourced_data:
type: object
dynamic: 'false'
__sources:
type: keyword
__versions:
Expand Down Expand Up @@ -1625,6 +1640,9 @@ indices:
type: keyword
__typename:
type: keyword
__nested_sourced_data:
type: object
dynamic: 'false'
__sources:
type: keyword
__versions:
Expand Down Expand Up @@ -1654,6 +1672,9 @@ indices:
type: integer
manufacturer_id:
type: keyword
__nested_sourced_data:
type: object
dynamic: 'false'
__sources:
type: keyword
__versions:
Expand Down Expand Up @@ -1690,6 +1711,9 @@ indices:
type: keyword
nationality:
type: keyword
__nested_sourced_data:
type: object
dynamic: 'false'
__sources:
type: keyword
__versions:
Expand Down Expand Up @@ -1722,6 +1746,9 @@ indices:
type: keyword
manufacturer_id:
type: keyword
__nested_sourced_data:
type: object
dynamic: 'false'
__sources:
type: keyword
__versions:
Expand Down Expand Up @@ -1753,6 +1780,9 @@ indices:
type: keyword
__typename:
type: keyword
__nested_sourced_data:
type: object
dynamic: 'false'
__sources:
type: keyword
__versions:
Expand All @@ -1778,6 +1808,9 @@ indices:
format: strict_date
active:
type: boolean
__nested_sourced_data:
type: object
dynamic: 'false'
__sources:
type: keyword
__versions:
Expand All @@ -1803,6 +1836,9 @@ indices:
type: keyword
name:
type: keyword
__nested_sourced_data:
type: object
dynamic: 'false'
__sources:
type: keyword
__versions:
Expand Down Expand Up @@ -1835,6 +1871,9 @@ indices:
created_at:
type: date
format: strict_date_time
__nested_sourced_data:
type: object
dynamic: 'false'
__sources:
type: keyword
__versions:
Expand Down Expand Up @@ -2156,13 +2195,107 @@ scripts:

// No timestamp values matched the params, so return `false`.
return false;
update_index_data_b9e2b105d736d8d16ae269ab6ff81e4d:
update_index_data_f8fc933145913e9a98574ede2f3881ac:
context: update
script:
lang: painless
source: |-
// --- Helper Functions --- //
void setup(Map source, String relationship, Map counts) {

// Serializes `parts` (a list of strings) into one string by writing, for each part in order,
// its character count, a ':' separator, and then the part itself (e.g. ["ab", "c"] -> "2:ab1:c").
// The length prefix keeps the encoding unambiguous even when a part itself contains ':'.
// An empty list encodes to "".
String encodeKey(List parts) {
  StringBuilder encoded = new StringBuilder();
  for (int index = 0; index < parts.size(); index++) {
    String value = (String) parts.get(index);
    encoded.append(value.length()).append(':').append(value);
  }
  return encoded.toString();
}

// Splits a string produced by `encodeKey` back into its list of parts.
// Repeatedly reads the decimal length up to the next ':' and then consumes exactly that many
// characters as the part value, so ':' characters inside a value are never treated as separators.
// The empty string decodes to an empty list.
List decodeKey(String key) {
  List decoded = new ArrayList();
  int cursor = 0;
  int total = key.length();
  while (cursor < total) {
    int separator = key.indexOf(":", cursor);
    int partLength = Integer.parseInt(key.substring(cursor, separator));
    int partEnd = separator + 1 + partLength;
    decoded.add(key.substring(separator + 1, partEnd));
    cursor = partEnd;
  }
  return decoded;
}

// Builds the encoded key identifying the nested element targeted by this event.
// Returns "" when `sourcedFromNestedPaths` has no entry for `relationship` (a top-level event).
// Otherwise every path segment contributes exactly one part, in order: a segment that has a
// `sourceField` contributes the value stored under that name in `sourcedFromNestedPathIdentifiers`;
// any other segment contributes its own `field` name. The parts are then run through `encodeKey`,
// so a path with at least one segment always yields a non-empty key.
String buildNestedElementKey(String relationship, Map sourcedFromNestedPaths, Map sourcedFromNestedPathIdentifiers) {
  List segments = (List) sourcedFromNestedPaths.get(relationship);
  if (segments == null) {
    return "";
  }

  List keyParts = new ArrayList();
  for (int index = 0; index < segments.size(); index++) {
    Map segment = (Map) segments.get(index);
    if (segment.containsKey("sourceField")) {
      keyParts.add(sourcedFromNestedPathIdentifiers.get(segment.get("sourceField")));
    } else {
      keyParts.add(segment.get("field"));
    }
  }
  return encodeKey(keyParts);
}

// Computes the key used under `__versions` for this event.
// A top-level event (empty `nestedElementKey`) uses the bare relationship name. A nested event
// uses the encoding of the relationship name followed by the decoded parts of `nestedElementKey`.
String buildVersionsKey(String relationship, String nestedElementKey) {
  if (!nestedElementKey.isEmpty()) {
    List parts = new ArrayList();
    parts.add(relationship);
    parts.addAll(decodeKey(nestedElementKey));
    return encodeKey(parts);
  }
  return relationship;
}

// Returns the first element of `elements` whose `id` entry equals `matchValue`, or null when
// there is no such element.
//
// `elements` comes straight out of the stored document, where a list field may be present but
// explicitly null and a list may contain null entries. Both are treated as "no match" rather
// than raising a NullPointerException.
def findInList(List elements, String matchValue) {
  if (elements == null) {
    return null;
  }

  for (Map element : elements) {
    // Skip null list entries; they cannot carry an `id`.
    if (element != null && matchValue.equals(element.get("id"))) {
      return element;
    }
  }
  return null;
}

// Walks from `source` down through `pathSegments` and returns the map reached at the end, or
// null as soon as a hop cannot be resolved (the field is absent, or the lookup yields null).
// `keyParts` is aligned with `pathSegments` by index. For a segment that has a `sourceField`,
// the field is read as a list and the element whose `id` equals the corresponding key part is
// selected via `findInList`. For any other segment, the field value itself is the next map.
def navigateToNestedElement(Map source, List pathSegments, List keyParts) {
  Map node = source;
  int hop = 0;

  while (hop < pathSegments.size()) {
    Map segment = (Map) pathSegments.get(hop);
    String field = (String) segment.get("field");

    if (!node.containsKey(field)) {
      return null;
    }

    def next = node.get(field);
    if (segment.containsKey("sourceField")) {
      // List hop: pick the element matching this hop's key part.
      next = findInList((List) next, (String) keyParts.get(hop));
    }

    if (next == null) {
      return null;
    }

    node = (Map) next;
    hop++;
  }

  return node;
}

// --- Main Functions --- //

void setup(Map source, String versionsKey, String relationship, String nestedElementKey, Map counts) {
if (source.__sources == null) {
source.__sources = [];
}
Expand All @@ -2171,18 +2304,27 @@ scripts:
source.__versions = [:];
}

if (source.__versions[relationship] == null) {
source.__versions[relationship] = [:];
if (source.__versions[versionsKey] == null) {
source.__versions[versionsKey] = [:];
}

if (!nestedElementKey.isEmpty()) {
if (source.__nested_sourced_data == null) {
source.__nested_sourced_data = [:];
}
if (source.__nested_sourced_data[relationship] == null) {
source.__nested_sourced_data[relationship] = [:];
}
}

if (counts != null && source.__counts == null) {
source.__counts = [:];
}
}

void validateSource(Map source, String id, String relationship, String sourceId, long eventVersion) {
Map relationshipVersionsMap = source.__versions.get(relationship);
List previousSourceIdsForRelationship = relationshipVersionsMap.keySet().stream().filter(key -> key != sourceId).collect(Collectors.toList());
void validateSource(Map source, String id, String relationship, String sourceId, long eventVersion, String versionsKey) {
Map versionsMap = source.__versions[versionsKey];
List previousSourceIdsForRelationship = versionsMap.keySet().stream().filter(key -> key != sourceId).collect(Collectors.toList());

if (previousSourceIdsForRelationship.size() > 0) {
throw new IllegalArgumentException(
Expand All @@ -2194,7 +2336,7 @@ scripts:
);
}

Number maybeDocVersion = relationshipVersionsMap.get(sourceId);
Number maybeDocVersion = versionsMap.get(sourceId);

// Our JSON schema requires event versions to be non-negative, so we can safely use Long.MIN_VALUE as a stand-in when the value is null.
long docVersion = maybeDocVersion == null ? Long.MIN_VALUE : maybeDocVersion.longValue();
Expand All @@ -2216,8 +2358,43 @@ scripts:
}
}

void recordSource(Map source, String relationship, String sourceId, long eventVersion) {
source.__versions[relationship][sourceId] = eventVersion;
// Buffers this event's nested sourced fields under
// `source.__nested_sourced_data[relationship][nestedElementKey]`, replacing whatever was
// previously stored for that element.
//
// Does nothing when `sourcedFromNestedFields` is null or empty. The value is taken directly
// from the script params, so an event that carries no nested fields may omit it entirely.
//
// NOTE(review): this expects `source.__nested_sourced_data[relationship]` to already exist;
// confirm that `setup` creates it for every event that reaches this point with fields to store.
void storeNestedSourcedData(Map source, String relationship, Map sourcedFromNestedFields, String nestedElementKey) {
  if (sourcedFromNestedFields == null || sourcedFromNestedFields.isEmpty()) {
    return;
  }

  ((Map) source.__nested_sourced_data[relationship]).put(nestedElementKey, sourcedFromNestedFields);
}

// Copies every buffered entry in `source.__nested_sourced_data` back onto the nested element it
// targets. Per the original author's note, this runs on every event so that a self-event's
// `putAll` (which overwrites nested arrays with fresh data) does not drop previously sourced fields.
//
// Relationships with no entry in `sourcedFromNestedPaths` are skipped, as are buffered elements
// whose target can no longer be located in `source`.
void applyNestedSourcedData(Map source, Map sourcedFromNestedPaths) {
  def buffered = source.__nested_sourced_data;
  if (buffered != null) {
    for (def relationshipEntry : buffered.entrySet()) {
      String relationship = (String) relationshipEntry.getKey();
      Map fieldsByElementKey = (Map) relationshipEntry.getValue();
      List pathSegments = (List) sourcedFromNestedPaths.get(relationship);

      if (pathSegments != null) {
        for (def elementEntry : fieldsByElementKey.entrySet()) {
          // The buffered key is the encoded path to the element; decode it to drive navigation.
          List keyParts = decodeKey((String) elementEntry.getKey());
          Map target = (Map) navigateToNestedElement(source, pathSegments, keyParts);
          if (target != null) {
            target.putAll((Map) elementEntry.getValue());
          }
        }
      }
    }
  }
}

void recordSource(Map source, String versionsKey, String relationship, String sourceId, long eventVersion) {
source.__versions[versionsKey][sourceId] = eventVersion;

// Record the relationship in `__sources` if it's not already there. We maintain it as an append-only set using a sorted list.
// This ensures deterministic ordering of its elements regardless of event ingestion order, and lets us check membership in O(log N) time.
Expand All @@ -2241,8 +2418,16 @@ scripts:
String sourceId = params.sourceId;
long eventVersion = (long) params.version; // Cast to long since JSON parses numbers as doubles
Map counts = params.__counts;
Map sourcedFromNestedFields = params.sourcedFromNestedFields;
Map sourcedFromNestedPathIdentifiers = params.sourcedFromNestedPathIdentifiers;
Map sourcedFromNestedPaths = params.sourcedFromNestedPaths;

String nestedElementKey = buildNestedElementKey(relationship, sourcedFromNestedPaths, sourcedFromNestedPathIdentifiers);
String versionsKey = buildVersionsKey(relationship, nestedElementKey);

setup(source, relationship, counts);
validateSource(source, id, relationship, sourceId, eventVersion);
setup(source, versionsKey, relationship, nestedElementKey, counts);
validateSource(source, id, relationship, sourceId, eventVersion, versionsKey);
applyTopLevelFields(source, id, params.topLevelFields, counts);
recordSource(source, relationship, sourceId, eventVersion);
storeNestedSourcedData(source, relationship, sourcedFromNestedFields, nestedElementKey);
applyNestedSourcedData(source, sourcedFromNestedPaths);
recordSource(source, versionsKey, relationship, sourceId, eventVersion);
Loading