Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions elasticgraph-indexer/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,3 +48,29 @@ indexer = ElasticGraph::Indexer.from_yaml_file("config/settings/local.yaml")
events = [] # JSON events read from an async datastream
indexer.processor.process(events)
```

## Custom Payload Decoding

`ElasticGraph::Indexer` can be configured with an indexing event decoder extension. Decoders turn raw payload strings
from a transport into ElasticGraph indexing event hashes before the normal validation and indexing pipeline runs. The
default decoder expects JSON Lines.

```yaml
indexer:
indexing_event_decoder:
name: MyCompany::ElasticGraph::CSVIndexingEventDecoder
require_path: ./lib/my_company/elastic_graph/csv_indexing_event_decoder
config:
delimiter: ","
```

Decoder extensions must implement:

```ruby
def initialize(config:, schema_artifacts:, logger:)
end

def decode(payload)
# return an array of ElasticGraph indexing event hashes
end
```
12 changes: 12 additions & 0 deletions elasticgraph-indexer/lib/elastic_graph/indexer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,18 @@ def operation_factory
end
end

def indexing_event_decoder
@indexing_event_decoder ||= begin
extension = config.indexing_event_decoder
decoder_class = extension.extension_class # : untyped
decoder_class.new(
config: extension.config,
schema_artifacts: schema_artifacts,
logger: logger
)
end
end

def monotonic_clock
@monotonic_clock ||= begin
require "elastic_graph/support/monotonic_clock"
Expand Down
62 changes: 58 additions & 4 deletions elasticgraph-indexer/lib/elastic_graph/indexer/config.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,19 @@
#
# frozen_string_literal: true

require "elastic_graph/support/config"
require "elastic_graph/errors"
require "elastic_graph/indexer/indexing_event_decoder"
require "elastic_graph/schema_artifacts/runtime_metadata/extension_loader"
require "elastic_graph/support/config"

module ElasticGraph
class Indexer
class Config < Support::Config.define(:latency_slo_thresholds_by_timestamp_in_ms, :skip_derived_indexing_type_updates)
class Config < Support::Config.define(:latency_slo_thresholds_by_timestamp_in_ms, :skip_derived_indexing_type_updates, :indexing_event_decoder)
DEFAULT_INDEXING_EVENT_DECODER = {
"name" => "ElasticGraph::Indexer::IndexingEventDecoder::JSONLines",
"require_path" => "elastic_graph/indexer/indexing_event_decoder"
}

json_schema at: "indexer",
optional: false,
description: "Configuration for indexing operations and metrics used by `elasticgraph-indexer`.",
Expand Down Expand Up @@ -42,17 +49,64 @@ class Config < Support::Config.define(:latency_slo_thresholds_by_timestamp_in_ms
{}, # : untyped
{"WidgetWorkspace" => ["ABC12345678"]}
]
},
indexing_event_decoder: {
description: "Extension object used to decode raw indexing payloads into ElasticGraph indexing event hashes. The default decoder expects JSON Lines.",
type: "object",
properties: {
name: {
description: "The name of the indexing event decoder extension class.",
type: "string",
pattern: /^[A-Z]\w+(::[A-Z]\w+)*$/.source, # https://rubular.com/r/UuqAz4fR3kdMip
examples: ["MyCompany::ElasticGraph::CSVIndexingEventDecoder"]
},
require_path: {
description: "The path to require to load the indexing event decoder extension.",
type: "string",
minLength: 1,
examples: ["./lib/my_company/elastic_graph/csv_indexing_event_decoder"]
},
config: {
description: "Configuration for the indexing event decoder. Will be passed into the decoder's `#initialize` method.",
type: "object",
default: {}, # : untyped
examples: [
{}, # : untyped
{"delimiter" => ","}
]
}
},
required: ["name", "require_path"],
default: DEFAULT_INDEXING_EVENT_DECODER,
examples: [
DEFAULT_INDEXING_EVENT_DECODER,
{
"name" => "MyCompany::ElasticGraph::CSVIndexingEventDecoder",
"require_path" => "./lib/my_company/elastic_graph/csv_indexing_event_decoder",
"config" => {"delimiter" => ","}
}
]
}
}

private

def convert_values(skip_derived_indexing_type_updates:, latency_slo_thresholds_by_timestamp_in_ms:)
def convert_values(skip_derived_indexing_type_updates:, latency_slo_thresholds_by_timestamp_in_ms:, indexing_event_decoder:)
{
skip_derived_indexing_type_updates: skip_derived_indexing_type_updates.transform_values(&:to_set),
latency_slo_thresholds_by_timestamp_in_ms: latency_slo_thresholds_by_timestamp_in_ms
latency_slo_thresholds_by_timestamp_in_ms: latency_slo_thresholds_by_timestamp_in_ms,
indexing_event_decoder: load_indexing_event_decoder(indexing_event_decoder)
}
end

def load_indexing_event_decoder(config)
loader = SchemaArtifacts::RuntimeMetadata::ExtensionLoader.new(IndexingEventDecoder::Interface)
loader.load(
config.fetch("name"),
from: config.fetch("require_path"),
config: config["config"] || {}
)
end
end
end
end
4 changes: 2 additions & 2 deletions elasticgraph-indexer/lib/elastic_graph/indexer/event_id.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ module ElasticGraph
class Indexer
# A unique identifier for an event ingested by the indexer. As a string, takes the form of
# "[type]:[id]@v[version]", such as "Widget:123abc@v7". This format was designed to make it
# easy to put these ids in a comma-seperated list.
# easy to put these ids in a comma-separated list.
EventID = ::Data.define(:type, :id, :version) do
# @implements EventID
def self.from_event(event)
Expand All @@ -26,7 +26,7 @@ def to_s

# Steep weirdly expects them here...
# @dynamic initialize, config, datastore_core, schema_artifacts, datastore_router, monotonic_clock
# @dynamic record_preparer_factory, processor, operation_factory, logger
# @dynamic record_preparer_factory, processor, operation_factory, indexing_event_decoder, logger
# @dynamic self.from_parsed_yaml
end
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
# Copyright 2024 - 2026 Block, Inc.
#
# Use of this source code is governed by an MIT-style
# license that can be found in the LICENSE file or at
# https://opensource.org/licenses/MIT.
#
# frozen_string_literal: true

require "json"

module ElasticGraph
class Indexer
# Namespace for indexing event decoders, which turn raw payload strings from a transport into
# ElasticGraph indexing event hashes. The decoder to use is configured via the
# `indexer.indexing_event_decoder` setting.
module IndexingEventDecoder
# Defines the indexing event decoder interface, which our extension loader will validate against.
class Interface
# @param config [Hash<String, Object>] configuration from the `indexing_event_decoder.config` setting
# @param schema_artifacts [SchemaArtifacts::FromDisk] the schema artifacts
# @param logger [Logger] the ElasticGraph logger
def initialize(config:, schema_artifacts:, logger:)
# must be defined, but nothing to do
end

# @param payload [String] a raw payload from the transport
# @return [Array<Hash<String, Object>>] the decoded ElasticGraph indexing events
def decode(payload)
# :nocov: -- must return an array to satisfy Steep type checking but never called
[]
# :nocov:
end
end

# The default indexing event decoder, which expects newline-delimited JSON objects.
class JSONLines < Interface
# (see Interface#initialize)
def initialize(config:, schema_artifacts:, logger:)
# must be defined for extension interface verification, but nothing to do
end

# (see Interface#decode)
def decode(payload)
payload.split("\n").map { |event| JSON.parse(event) }
end
end
end
end
end
3 changes: 3 additions & 0 deletions elasticgraph-indexer/sig/elastic_graph/indexer.rbs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ module ElasticGraph
@operation_factory: Operation::Factory?
def operation_factory: () -> Operation::Factory

@indexing_event_decoder: indexingEventDecoder?
def indexing_event_decoder: () -> indexingEventDecoder

@monotonic_clock: Support::MonotonicClock?
def monotonic_clock: () -> Support::MonotonicClock

Expand Down
14 changes: 11 additions & 3 deletions elasticgraph-indexer/sig/elastic_graph/indexer/config.rbs
Original file line number Diff line number Diff line change
Expand Up @@ -5,25 +5,33 @@ module ElasticGraph

attr_reader latency_slo_thresholds_by_timestamp_in_ms: ::Hash[::String, ::Integer]
attr_reader skip_derived_indexing_type_updates: ::Hash[::String, ::Set[::String]]
attr_reader indexing_event_decoder: SchemaArtifacts::RuntimeMetadata::Extension

def initialize: (
?latency_slo_thresholds_by_timestamp_in_ms: ::Hash[::String, ::Integer],
?skip_derived_indexing_type_updates: ::Hash[::String, ::Set[::String]]) -> void
?skip_derived_indexing_type_updates: ::Hash[::String, ::Set[::String]],
?indexing_event_decoder: ::Hash[::String, untyped]) -> void

def with: (
?latency_slo_thresholds_by_timestamp_in_ms: ::Hash[::String, ::Integer],
?skip_derived_indexing_type_updates: ::Hash[::String, ::Set[::String]]) -> Config
?skip_derived_indexing_type_updates: ::Hash[::String, ::Set[::String]],
?indexing_event_decoder: SchemaArtifacts::RuntimeMetadata::Extension) -> Config

def self.members: () -> ::Array[::Symbol]
end

class Config < ConfigSupertype
DEFAULT_INDEXING_EVENT_DECODER: ::Hash[::String, untyped]

private

def convert_values: (
latency_slo_thresholds_by_timestamp_in_ms: untyped,
skip_derived_indexing_type_updates: untyped
skip_derived_indexing_type_updates: untyped,
indexing_event_decoder: untyped
) -> ::Hash[::Symbol, untyped]

def load_indexing_event_decoder: (::Hash[::String, untyped]) -> SchemaArtifacts::RuntimeMetadata::Extension
end
end
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
module ElasticGraph
class Indexer
type indexingEventDecoder = IndexingEventDecoder::Interface

module IndexingEventDecoder
class Interface
def initialize: (
config: ::Hash[::Symbol | ::String, untyped],
schema_artifacts: schemaArtifacts,
logger: ::Logger
) -> void

def decode: (::String) -> ::Array[event]
end

class JSONLines < Interface
end
end
end
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# Copyright 2024 - 2026 Block, Inc.
#
# Use of this source code is governed by an MIT-style
# license that can be found in the LICENSE file or at
# https://opensource.org/licenses/MIT.
#
# frozen_string_literal: true

class ExampleIndexingEventDecoder
attr_reader :config, :schema_artifacts, :logger

def initialize(config:, schema_artifacts:, logger:)
@config = config
@schema_artifacts = schema_artifacts
@logger = logger
end

def decode(payload)
payload.split(config.fetch("delimiter")).map { |value| {"value" => value} }
end
end

class InvalidIndexingEventDecoder
def initialize(config:, schema_artifacts:, logger:)
end
end
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,34 @@ class Indexer

expect(config.skip_derived_indexing_type_updates).to eq("WidgetCurrency" => ["USD"].to_set)
end

it "uses the JSON Lines indexing event decoder by default" do
expect(Config.new.indexing_event_decoder.extension_class).to be(IndexingEventDecoder::JSONLines)
end

it "loads a configured indexing event decoder" do
config = Config.from_parsed_yaml("indexer" => {
"indexing_event_decoder" => {
"name" => "ExampleIndexingEventDecoder",
"require_path" => "support/example_extensions/indexing_event_decoder",
"config" => {"delimiter" => "|"}
}
})

expect(config.indexing_event_decoder.extension_class).to be(ExampleIndexingEventDecoder)
expect(config.indexing_event_decoder.config).to eq({"delimiter" => "|"})
end

it "verifies that a configured indexing event decoder implements the expected interface" do
expect {
Config.from_parsed_yaml("indexer" => {
"indexing_event_decoder" => {
"name" => "InvalidIndexingEventDecoder",
"require_path" => "support/example_extensions/indexing_event_decoder"
}
})
}.to raise_error Errors::InvalidExtensionError, a_string_including("InvalidIndexingEventDecoder", "Missing instance methods: `decode`")
end
end
end
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
# Copyright 2024 - 2026 Block, Inc.
#
# Use of this source code is governed by an MIT-style
# license that can be found in the LICENSE file or at
# https://opensource.org/licenses/MIT.
#
# frozen_string_literal: true

require "elastic_graph/indexer/indexing_event_decoder"

module ElasticGraph
class Indexer
RSpec.describe IndexingEventDecoder::JSONLines, :capture_logs do
it "decodes newline-delimited JSON objects" do
decoder = described_class.new(config: {}, schema_artifacts: nil, logger: logger)
payload = <<~JSONL
{"op":"upsert","id":"1"}
{"op":"upsert","id":"2"}
JSONL

expect(decoder.decode(payload)).to eq([
{"op" => "upsert", "id" => "1"},
{"op" => "upsert", "id" => "2"}
])
end
end
end
end
22 changes: 22 additions & 0 deletions elasticgraph-indexer/spec/unit/elastic_graph/indexer_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,28 @@ module ElasticGraph

expect(indexer).to be_a(Indexer)
end

it "builds a configured indexing event decoder" do
config = Indexer::Config.from_parsed_yaml("indexer" => {
"indexing_event_decoder" => {
"name" => "ExampleIndexingEventDecoder",
"require_path" => "support/example_extensions/indexing_event_decoder",
"config" => {"delimiter" => "|"}
}
})
indexer = Indexer.new(config: config, datastore_core: build_datastore_core)

decoder = indexer.indexing_event_decoder

expect(decoder).to be_a(ExampleIndexingEventDecoder)
expect(decoder.config).to eq({"delimiter" => "|"})
expect(decoder.schema_artifacts).to be(indexer.schema_artifacts)
expect(decoder.logger).to be(indexer.logger)
expect(decoder.decode("one|two")).to eq([
{"value" => "one"},
{"value" => "two"}
])
end
end
end
end
Loading
Loading