Skip to content

Replace BatchableSubscriber interface with batch attributes#865

Open
DavidBadura wants to merge 6 commits into
4.0.xfrom
feature/batch-subscriber-attributes
Open

Replace BatchableSubscriber interface with batch attributes#865
DavidBadura wants to merge 6 commits into
4.0.xfrom
feature/batch-subscriber-attributes

Conversation

@DavidBadura

@DavidBadura DavidBadura commented Jun 16, 2026

Copy link
Copy Markdown
Member

Batching for subscribers no longer relies on the BatchableSubscriber interface. The batch lifecycle is now declared with attributes, in line with the rest of the subscriber API which is already attribute and argument-resolver driven.

A subscriber opts into batching with #[BatchBegin], #[BatchFlush], #[BatchRollback] and optionally #[BatchShouldFlush]. The begin method may return a state object, which is injected into subscribe handlers via the #[BatchState] parameter attribute and passed to flush/rollback.

Subscribers no longer implement the BatchableSubscriber interface to take
part in batching. Instead the batch lifecycle is declared with attributes:
#[BatchBegin], #[BatchFlush], #[BatchRollback] and #[BatchShouldFlush], with
#[BatchState] to inject the per-batch state into a subscribe handler. This
keeps batching consistent with the rest of the subscriber API, which is
already attribute and argument-resolver driven.

The metadata factory now reads these into a BatchMetadata and rejects
duplicate or incomplete batch method sets. Argument resolution for handlers
moved into the MessageProcessor, and a BatchManager/Batch pair tracks the
open batch per subscription. Upgrade notes and the subscription docs are
updated accordingly.
ArgumentMetadata now carries the list of attribute instances attached to a
subscribe handler parameter, with an attribute() lookup helper, rather than a
dedicated bool for BatchState. The batch argument resolver and the metadata
factory ask for the BatchState attribute through that list, which keeps the
metadata generic for future parameter attributes.

Also fixes the nullsafe chain in BatchSubscriber, where metadata() was called
on a possibly-null accessor before the null check.
@github-actions

github-actions Bot commented Jun 16, 2026

Copy link
Copy Markdown

Hello 👋

here is the most recent benchmark result:

SplitStreamBench
================

+-------------------------+--------------------+--------------------+-----------+-----------------+------------+-------------+
|                         | time (kde mode)                                     | memory                                     |
+-------------------------+--------------------+--------------------+-----------+-----------------+------------+-------------+
| subject                 | Tag: <current>     | Tag: base          | time-diff | Tag: <current>  | Tag: base  | memory-diff |
+-------------------------+--------------------+--------------------+-----------+-----------------+------------+-------------+
| benchLoad10000Events () | 3.486ms (±0.00%)   | 3.334ms (±0.00%)   | +4.57%    | 35.769mb        | 36.288mb   | -1.43%      |
| benchSave10000Events () | 504.569ms (±0.00%) | 510.278ms (±0.00%) | -1.12%    | 35.904mb        | 36.423mb   | -1.43%      |
+-------------------------+--------------------+--------------------+-----------+-----------------+------------+-------------+

SimpleSetupStreamStoreBench
===========================

+----------------------------------------+--------------------+--------------------+-----------+-----------------+------------+-------------+
|                                        | time (kde mode)                                     | memory                                     |
+----------------------------------------+--------------------+--------------------+-----------+-----------------+------------+-------------+
| subject                                | Tag: <current>     | Tag: base          | time-diff | Tag: <current>  | Tag: base  | memory-diff |
+----------------------------------------+--------------------+--------------------+-----------+-----------------+------------+-------------+
| benchLoad1Event ()                     | 1.159ms (±0.00%)   | 1.149ms (±0.00%)   | +0.89%    | 35.141mb        | 35.141mb   | 0.00%       |
| benchLoad10000Events ()                | 70.840ms (±0.00%)  | 73.486ms (±0.00%)  | -3.60%    | 35.141mb        | 35.141mb   | 0.00%       |
| benchSave1Event ()                     | 1.184ms (±0.00%)   | 1.173ms (±0.00%)   | +0.94%    | 35.141mb        | 35.141mb   | 0.00%       |
| benchSave10000Events ()                | 287.761ms (±0.00%) | 293.522ms (±0.00%) | -1.96%    | 35.141mb        | 35.141mb   | 0.00%       |
| benchSave10000Aggregates ()            | 8.683s (±0.00%)    | 8.592s (±0.00%)    | +1.07%    | 35.141mb        | 35.141mb   | 0.00%       |
| benchSave10000AggregatesTransaction () | 5.565s (±0.00%)    | 5.569s (±0.00%)    | -0.07%    | 35.141mb        | 35.141mb   | 0.00%       |
+----------------------------------------+--------------------+--------------------+-----------+-----------------+------------+-------------+

SubscriptionEngineBatchBench
============================

+---------------------------+--------------------+--------------------+-----------+-----------------+------------+-------------+
|                           | time (kde mode)                                     | memory                                     |
+---------------------------+--------------------+--------------------+-----------+-----------------+------------+-------------+
| subject                   | Tag: <current>     | Tag: base          | time-diff | Tag: <current>  | Tag: base  | memory-diff |
+---------------------------+--------------------+--------------------+-----------+-----------------+------------+-------------+
| benchHandle10000Events () | 109.478ms (±0.00%) | 103.430ms (±0.00%) | +5.85%    | 35.720mb        | 35.720mb   | 0.00%       |
+---------------------------+--------------------+--------------------+-----------+-----------------+------------+-------------+

NoopSubscriptionEngineBench
===========================

+---------------------------+--------------------+--------------------+-----------+-----------------+------------+-------------+
|                           | time (kde mode)                                     | memory                                     |
+---------------------------+--------------------+--------------------+-----------+-----------------+------------+-------------+
| subject                   | Tag: <current>     | Tag: base          | time-diff | Tag: <current>  | Tag: base  | memory-diff |
+---------------------------+--------------------+--------------------+-----------+-----------------+------------+-------------+
| benchHandle10000Events () | 108.680ms (±0.00%) | 101.233ms (±0.00%) | +7.36%    | 48.296mb        | 48.296mb   | 0.00%       |
+---------------------------+--------------------+--------------------+-----------+-----------------+------------+-------------+

SubscriptionEngineBench
=======================

+---------------------------+-----------------+-----------------+-----------+-----------------+------------+-------------+
|                           | time (kde mode)                               | memory                                     |
+---------------------------+-----------------+-----------------+-----------+-----------------+------------+-------------+
| subject                   | Tag: <current>  | Tag: base       | time-diff | Tag: <current>  | Tag: base  | memory-diff |
+---------------------------+-----------------+-----------------+-----------+-----------------+------------+-------------+
| benchHandle10000Events () | 3.435s (±0.00%) | 3.423s (±0.00%) | +0.38%    | 48.296mb        | 48.296mb   | 0.00%       |
+---------------------------+-----------------+-----------------+-----------+-----------------+------------+-------------+

CommandToQueryBench
===================

+----------------+------------------+------------------+-----------+-----------------+------------+-------------+
|                | time (kde mode)                                 | memory                                     |
+----------------+------------------+------------------+-----------+-----------------+------------+-------------+
| subject        | Tag: <current>   | Tag: base        | time-diff | Tag: <current>  | Tag: base  | memory-diff |
+----------------+------------------+------------------+-----------+-----------------+------------+-------------+
| benchCreate () | 3.016ms (±0.00%) | 2.940ms (±0.00%) | +2.58%    | 5.060mb         | 4.881mb    | +3.68%      |
| benchUpdate () | 4.296ms (±0.00%) | 4.187ms (±0.00%) | +2.60%    | 5.051mb         | 5.067mb    | -0.32%      |
| benchBoth ()   | 7.177ms (±0.00%) | 7.042ms (±0.00%) | +1.91%    | 5.750mb         | 5.072mb    | +13.36%     |
+----------------+------------------+------------------+-----------+-----------------+------------+-------------+

PersonalDataBench
=================

+----------------------------------------+--------------------+--------------------+-----------+-----------------+------------+-------------+
|                                        | time (kde mode)                                     | memory                                     |
+----------------------------------------+--------------------+--------------------+-----------+-----------------+------------+-------------+
| subject                                | Tag: <current>     | Tag: base          | time-diff | Tag: <current>  | Tag: base  | memory-diff |
+----------------------------------------+--------------------+--------------------+-----------+-----------------+------------+-------------+
| benchLoad1Event ()                     | 1.207ms (±0.00%)   | 1.192ms (±0.00%)   | +1.25%    | 35.805mb        | 35.805mb   | 0.00%       |
| benchLoad10000Events ()                | 110.351ms (±0.00%) | 108.585ms (±0.00%) | +1.63%    | 35.805mb        | 35.805mb   | 0.00%       |
| benchSave1Event ()                     | 2.120ms (±0.00%)   | 2.193ms (±0.00%)   | -3.35%    | 35.805mb        | 35.805mb   | 0.00%       |
| benchSave10000Events ()                | 322.726ms (±0.00%) | 321.292ms (±0.00%) | +0.45%    | 35.807mb        | 35.807mb   | 0.00%       |
| benchSave10000Aggregates ()            | 13.240s (±0.00%)   | 13.384s (±0.00%)   | -1.08%    | 35.805mb        | 35.805mb   | 0.00%       |
| benchSave10000AggregatesTransaction () | 10.185s (±0.00%)   | 10.119s (±0.00%)   | +0.65%    | 36.045mb        | 36.045mb   | 0.00%       |
+----------------------------------------+--------------------+--------------------+-----------+-----------------+------------+-------------+

SnapshotsBench
==============

+----------------------------------------+-------------------+-------------------+-----------+-----------------+------------+-------------+
|                                        | time (kde mode)                                   | memory                                     |
+----------------------------------------+-------------------+-------------------+-----------+-----------------+------------+-------------+
| subject                                | Tag: <current>    | Tag: base         | time-diff | Tag: <current>  | Tag: base  | memory-diff |
+----------------------------------------+-------------------+-------------------+-----------+-----------------+------------+-------------+
| benchLoad10000EventsMissingSnapshot () | 72.510ms (±0.00%) | 72.063ms (±0.00%) | +0.62%    | 35.146mb        | 35.146mb   | 0.00%       |
| benchLoad10000Events ()                | 1.343ms (±0.00%)  | 1.334ms (±0.00%)  | +0.70%    | 35.145mb        | 35.145mb   | 0.00%       |
+----------------------------------------+-------------------+-------------------+-----------+-----------------+------------+-------------+

SimpleSetupTaggableStoreBench
=============================

+----------------------------------------+--------------------+--------------------+-----------+-----------------+------------+-------------+
|                                        | time (kde mode)                                     | memory                                     |
+----------------------------------------+--------------------+--------------------+-----------+-----------------+------------+-------------+
| subject                                | Tag: <current>     | Tag: base          | time-diff | Tag: <current>  | Tag: base  | memory-diff |
+----------------------------------------+--------------------+--------------------+-----------+-----------------+------------+-------------+
| benchLoad1Event ()                     | 1.198ms (±0.00%)   | 1.193ms (±0.00%)   | +0.39%    | 36.306mb        | 36.306mb   | 0.00%       |
| benchLoad10000Events ()                | 74.879ms (±0.00%)  | 73.768ms (±0.00%)  | +1.51%    | 36.306mb        | 36.306mb   | 0.00%       |
| benchSave1Event ()                     | 1.222ms (±0.00%)   | 1.199ms (±0.00%)   | +1.90%    | 36.306mb        | 36.306mb   | 0.00%       |
| benchSave10000Events ()                | 321.238ms (±0.00%) | 314.813ms (±0.00%) | +2.04%    | 36.306mb        | 36.306mb   | 0.00%       |
| benchSave10000Aggregates ()            | 8.661s (±0.00%)    | 8.603s (±0.00%)    | +0.67%    | 36.307mb        | 36.307mb   | 0.00%       |
| benchSave10000AggregatesTransaction () | 5.634s (±0.00%)    | 5.627s (±0.00%)    | +0.12%    | 36.307mb        | 36.307mb   | 0.00%       |
| benchAppend1Event ()                   | 1.543ms (±0.00%)   | 1.542ms (±0.00%)   | +0.06%    | 36.306mb        | 36.306mb   | 0.00%       |
| benchAppend100Events ()                | 7.710ms (±0.00%)   | 7.827ms (±0.00%)   | -1.50%    | 36.306mb        | 36.306mb   | 0.00%       |
+----------------------------------------+--------------------+--------------------+-----------+-----------------+------------+-------------+

This comment gets update everytime a new commit comes in!

The event emitter section passed argumentResolvers to
MetadataSubscriberAccessorRepository, which no longer takes them. Resolvers
are registered on the DefaultSubscriptionEngine, matching the upgrade guide.
The lookup resolver is not registered by default, so show how to pass it to
the subscription engine, the same as the event emitter resolver.
@DavidBadura DavidBadura requested a review from DanielBadura June 16, 2026 17:24
Every caller only checks for presence, so a bool hasAttribute() reads better
than returning the attribute instance.
Add tests for ArgumentMetadata::hasAttribute, the usesBatching attribute
detection, message argument resolving in MessageProcessor and the
afterMessages flush path of the batch subscriber.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant