feat: multi-source ingestion#3517

Open
Ziinc wants to merge 6 commits into main from claude/focused-fermat-ZdeFU

@Ziinc Ziinc commented May 22, 2026

Contributor

This allows source routing at the payload level, with __LF_SOURCE provided as a special routing key.
This lets separate HTTP pipelines be merged into a single multi-source request while maintaining backwards compatibility with query parameter source routing.

Internally, nothing changes at LogEvent.make/2. All changes are made at the controller level.
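
To make the payload-level routing concrete, here is a minimal sketch of what a multi-source batch might look like. It assumes the value of __LF_SOURCE is a source identifier string; the UUIDs, the "message" field, and the module and function names are placeholders for illustration, not taken from this PR.

```elixir
defmodule MultiSourcePayloadExample do
  # Placeholder identifiers, for illustration only.
  @source_a "11111111-1111-1111-1111-111111111111"
  @source_b "22222222-2222-2222-2222-222222222222"

  # One batch carrying events destined for two different sources.
  def batch do
    [
      %{"__LF_SOURCE" => @source_a, "message" => "from pipeline A"},
      %{"__LF_SOURCE" => @source_b, "message" => "from pipeline B"},
      %{"__LF_SOURCE" => @source_a, "message" => "also from pipeline A"}
    ]
  end

  # Unique source identifiers declared across the batch, in first-seen order.
  def declared_sources(events) do
    events
    |> Enum.map(&Map.get(&1, "__LF_SOURCE"))
    |> Enum.reject(&is_nil/1)
    |> Enum.uniq()
  end
end
```

Events without the key are simply skipped by `declared_sources/1` in this sketch; how the real controller treats them is described in the commit messages.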

Pre-merge todos:

  • test locally
  • verify performance
  • docs update

Closes O11Y-1838

Here I am sending a multi-source payload to all sources named loadfest.

Benchmark results for 500 events:

##### With input default #####
Name                                                                  ips        average  deviation         median         99th %
VerifyDeclaredSources - multi-source (1 source, 500 events)       16.80 K       59.51 μs    ±99.27%       53.04 μs      121.76 μs
VerifyDeclaredSources - passthrough (no __LF_SOURCE)              12.80 K       78.14 μs    ±34.03%       78.37 μs      122.00 μs

Comparison: 
VerifyDeclaredSources - multi-source (1 source, 500 events)       16.80 K
VerifyDeclaredSources - passthrough (no __LF_SOURCE)              12.80 K - 1.31x slower +18.63 μs

Reduction count statistics:

Name                                                        Reduction count
VerifyDeclaredSources - multi-source (1 source, 500 events)          9.63 K
VerifyDeclaredSources - passthrough (no __LF_SOURCE)              0.00400 K - 0.00x reduction count -9.62100 K

##### With input default #####
Name                                                              ips        average  deviation         median         99th %
Full ingest pipeline - single source (baseline)                1.67 K      598.03 μs  ±3055.95%      244.88 μs     3814.65 μs
Full ingest pipeline - multi source (1 declared source)        1.64 K      610.24 μs   ±621.85%      315.63 μs     2450.91 μs

Comparison: 
Full ingest pipeline - single source (baseline)                1.67 K
Full ingest pipeline - multi source (1 declared source)        1.64 K - 1.02x slower +12.21 μs

Reduction count statistics:

Name                                                            average  deviation         median         99th %
Full ingest pipeline - single source (baseline)                 28.36 K     ±0.00%        28.36 K        28.36 K
Full ingest pipeline - multi source (1 declared source)         51.70 K     ±0.00%        51.70 K        51.70 K

Comparison: 
Full ingest pipeline - single source (baseline)                 28.36 K
Full ingest pipeline - multi source (1 declared source)         51.70 K - 1.82x reduction count +23.35 K

The performance impact is marginal: with one declared source, the full ingest pipeline is about 1.02x slower (+12.21 μs on average). In the grand scheme of things this should speed things up, since fewer HTTP requests are made overall.

CleanShot.2026-05-22.at.14.13.42.mp4

claude added 3 commits May 22, 2026 01:56
Ingest requests to POST /, /api/logs, and /api/events can now declare a
source per event by setting __LF_SOURCE to the source UUID on each event.
The ?source= query param becomes optional and a single batch can fan out
to multiple sources.

A new VerifyDeclaredSources plug detects multi-source mode by pattern
matching on the first event, collects unique declared UUIDs, resolves
them via Sources.Cache, and verifies the caller owns each one (including
OAuth scope check). VerifyResourceAccess now passes through when the
query-param source is nil but declared sources are present.

LogController.create dispatches events per declared source, strips
__LF_SOURCE before ingest, and aggregates per-source results. Events
with malformed/missing __LF_SOURCE and no default source surface as
errors in the response while valid events are still ingested.
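
The dispatch step described above could be sketched roughly as follows. This is not the PR's implementation: the module name, function names, and error message are hypothetical, events are assumed to be maps with string keys, and the actual ingest call is left out.

```elixir
defmodule GroupBySourceSketch do
  # Returns {groups, errors}:
  #   groups - map of source token => events (routing key stripped, input order kept)
  #   errors - one message per event that names no source when no default exists
  # Assumes every event is a map.
  def group(events, default_token \\ nil) do
    {groups, errors} =
      for event <- events, reduce: {%{}, []} do
        {groups, errors} ->
          case token_for(event, default_token) do
            nil ->
              {groups, ["event is missing a valid __LF_SOURCE" | errors]}

            token ->
              # Strip the routing key so it is not ingested as event data.
              stripped = Map.delete(event, "__LF_SOURCE")
              {Map.update(groups, token, [stripped], &[stripped | &1]), errors}
          end
      end

    # Lists were built by prepending, so reverse them to restore input order.
    ordered = Map.new(groups, fn {token, evs} -> {token, Enum.reverse(evs)} end)
    {ordered, Enum.reverse(errors)}
  end

  # A binary key value wins; anything else falls back to the default (possibly nil).
  defp token_for(%{"__LF_SOURCE" => token}, _default) when is_binary(token), do: token
  defp token_for(_event, default), do: default
end
```

Valid events still end up in their groups even when other events in the same batch produce errors, which matches the partial-success behaviour the commit message describes.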

https://claude.ai/code/session_012GiTAWivtpRQfB1bUHnpW2
- group_events_by_source and aggregate_results: use for/reduce instead
  of Enum.reduce for consistency with codebase idioms
- event_source_token: use pattern matching function clauses instead of
  Map.get
- resolve_and_verify token collection: use for/into instead of
  reduce + filter
- log_controller tests: remove unsupported single-event test, add _json
  fan-out case, use source.token directly, move multi_source_setup to
  top of helpers, insert test-specific sources at test level, drop
  reject_context_functions from multi-source describe (test-level sources
  can't be pre-warmed before Mimic stubs run)
- verify_declared_sources tests: insert sources at test level, combine
  passthrough tests into one parameterized assertion, combine auth
  failure cases into one parameterized test

https://claude.ai/code/session_012GiTAWivtpRQfB1bUHnpW2
@Ziinc Ziinc requested review from Baishan, amokan and djwhitt May 22, 2026 06:42
@Ziinc Ziinc marked this pull request as ready for review May 22, 2026 06:43
def call(%{assigns: %{resource_type: :source, user: %User{} = user}} = conn, _opts) do
  case extract_events(conn.body_params) do
    [first | _] = events
    when is_map(first) and (is_map_key(first, :__LF_SOURCE) or is_map_key(first, "__LF_SOURCE")) ->

Could we make this more explicit by using a header to identify these? I worry about mistakes with the more implicit check on the first event.
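
For illustration, an explicit header check along these lines might look like the sketch below. The header name `x-lf-multi-source` and the accepted values are hypothetical; nothing in this PR defines them. The function takes a plain list of `{name, value}` tuples with lowercase names, which is the shape Plug exposes as `conn.req_headers`.

```elixir
defmodule MultiSourceHeaderSketch do
  # Hypothetical header name; the PR does not define one.
  @header "x-lf-multi-source"

  # True only when the request explicitly opts in to multi-source mode.
  def multi_source?(headers) do
    case List.keyfind(headers, @header, 0) do
      {_name, value} -> String.downcase(value) in ["1", "true"]
      nil -> false
    end
  end
end
```

With an explicit opt-in like this, a batch whose first event happens to lack the key would no longer silently fall back to single-source handling.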

@spec call(Plug.Conn.t(), opts()) :: Plug.Conn.t()
def call(%{assigns: %{declared_sources: declared}} = conn, _opts)
    when map_size(declared) > 0 do
  conn

Is the limiter bypass here intentional?

}
)

assert json_response(conn, 406)

Is 406 right? Seems more like 400.

{:ok, n} -> {acc + n, errs}
:ok -> {acc, errs}
{:error, more} when is_list(more) -> {acc, errs ++ more}
{:error, err} -> {acc, errs ++ [err]}

Nit: pretty sure each errs ++ ... here is O(N) in the length of errs, since ++ copies the left-hand list. Could prepend and reverse after.
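
A minimal sketch of the prepend-and-reverse variant, assuming the four result shapes shown in the snippet above; the module and function names are hypothetical and the surrounding controller code is omitted.

```elixir
defmodule AggregateResultsSketch do
  # Folds per-source results into {ingested_count, errors}, keeping error order.
  def aggregate(results) do
    {count, reversed_errs} =
      for result <- results, reduce: {0, []} do
        {acc, errs} ->
          case result do
            {:ok, n} -> {acc + n, errs}
            :ok -> {acc, errs}
            # Enum.reverse/2 reverses `more` onto the front of `errs`,
            # so the cost depends on `more`, not on the accumulated list.
            {:error, more} when is_list(more) -> {acc, Enum.reverse(more, errs)}
            {:error, err} -> {acc, [err | errs]}
          end
      end

    # A single reversal at the end restores the original order.
    {count, Enum.reverse(reversed_errs)}
  end
end
```

For example, `AggregateResultsSketch.aggregate([{:ok, 2}, {:error, [:a, :b]}, :ok, {:error, :c}, {:ok, 3}])` returns `{5, [:a, :b, :c]}`.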

3 participants