From 240548ac9d645e34f3d36ea5bfcacf038c938e8c Mon Sep 17 00:00:00 2001 From: David Badura Date: Tue, 16 Jun 2026 19:02:15 +0200 Subject: [PATCH 1/7] Replace BatchableSubscriber interface with batch attributes Subscribers no longer implement the BatchableSubscriber interface to take part in batching. Instead the batch lifecycle is declared with attributes: #[BatchBegin], #[BatchFlush], #[BatchRollback] and #[BatchShouldFlush], with #[BatchState] to inject the per-batch state into a subscribe handler. This keeps batching consistent with the rest of the subscriber API, which is already attribute and argument-resolver driven. The metadata factory now reads these into a BatchMetadata and rejects duplicate or incomplete batch method sets. Argument resolution for handlers moved into the MessageProcessor, and a BatchManager/Batch pair tracks the open batch per subscription. Upgrade notes and the subscription docs are updated accordingly. --- docs/UPGRADE-4.0.md | 130 ++++++++++ docs/subscription.md | 77 +++--- phpstan-baseline.neon | 2 +- src/Attribute/BatchBegin.php | 12 + src/Attribute/BatchFlush.php | 16 ++ src/Attribute/BatchRollback.php | 12 + src/Attribute/BatchShouldFlush.php | 12 + src/Attribute/BatchState.php | 12 + src/Metadata/Subscriber/ArgumentMetadata.php | 1 + .../AttributeSubscriberMetadataFactory.php | 99 ++++++++ src/Metadata/Subscriber/BatchMetadata.php | 17 ++ .../Subscriber/DuplicateBeginBatchMethod.php | 25 ++ .../Subscriber/DuplicateFlushMethod.php | 25 ++ .../DuplicateRollbackBatchMethod.php | 25 ++ .../Subscriber/DuplicateShouldFlushMethod.php | 25 ++ .../Subscriber/IncompleteBatchMethods.php | 23 ++ .../Subscriber/SubscriberMetadata.php | 1 + .../Engine/DefaultSubscriptionEngine.php | 9 + .../Engine/Listener/BatchSubscriber.php | 126 +++++++--- .../Engine/Listener/FailSubscriber.php | 3 +- src/Subscription/Engine/MessageProcessor.php | 112 ++++++++- .../BatchArgumentResolver.php | 26 ++ src/Subscription/Subscriber/Batch.php | 21 ++ src/Subscription/Subscriber/BatchManager.php | 49 ++++ src/Subscription/Subscriber/BatchNotFound.php | 17 ++ .../Subscriber/BatchableSubscriber.php | 16 -- .../Subscriber/MetadataSubscriberAccessor.php | 73 +----- .../MetadataSubscriberAccessorRepository.php | 26 +- .../Projection/BatchProfileProjector.php | 59 +++-- .../Projection/BatchProfileState.php | 11 + .../Subscriber/BatchProfileProjection.php | 119 +++++++++ .../Subscriber/BatchProfileState.php | 11 + .../Subscriber/ProfileProjection.php | 16 +- .../ProfileProjectionWithCleanup.php | 16 +- .../Subscription/SubscriptionTest.php | 212 +++++++++++++++- tests/Unit/Fixture/BatchingState.php | 13 + tests/Unit/Fixture/BatchingSubscriber.php | 70 +++--- .../DefaultStateBatchingSubscriber.php | 37 +++ .../Fixture/VoidBeginBatchingSubscriber.php | 43 ++++ ...AttributeSubscriberMetadataFactoryTest.php | 235 ++++++++++++++++++ .../Engine/Handler/BootHandlerTest.php | 24 +- .../Engine/Handler/RunHandlerTest.php | 2 +- .../Engine/Listener/BatchSubscriberTest.php | 129 +++++++--- .../Engine/Listener/FailSubscriberTest.php | 24 +- .../Engine/MessageProcessorTest.php | 1 + .../BatchArgumentResolverTest.php | 79 ++++++ .../Subscriber/BatchManagerTest.php | 87 +++++++ ...tadataSubscriberAccessorRepositoryTest.php | 51 ---- .../MetadataSubscriberAccessorTest.php | 151 ++--------- 49 files changed, 1875 insertions(+), 507 deletions(-) create mode 100644 src/Attribute/BatchBegin.php create mode 100644 src/Attribute/BatchFlush.php create mode 100644 src/Attribute/BatchRollback.php create mode 100644 src/Attribute/BatchShouldFlush.php create mode 100644 src/Attribute/BatchState.php create mode 100644 src/Metadata/Subscriber/BatchMetadata.php create mode 100644 src/Metadata/Subscriber/DuplicateBeginBatchMethod.php create mode 100644 src/Metadata/Subscriber/DuplicateFlushMethod.php create mode 100644 src/Metadata/Subscriber/DuplicateRollbackBatchMethod.php create mode 100644 src/Metadata/Subscriber/DuplicateShouldFlushMethod.php create mode 100644 src/Metadata/Subscriber/IncompleteBatchMethods.php create mode 100644 src/Subscription/Subscriber/ArgumentResolver/BatchArgumentResolver.php create mode 100644 src/Subscription/Subscriber/Batch.php create mode 100644 src/Subscription/Subscriber/BatchManager.php create mode 100644 src/Subscription/Subscriber/BatchNotFound.php delete mode 100644 src/Subscription/Subscriber/BatchableSubscriber.php create mode 100644 tests/Benchmark/BasicImplementation/Projection/BatchProfileState.php create mode 100644 tests/Integration/Subscription/Subscriber/BatchProfileProjection.php create mode 100644 tests/Integration/Subscription/Subscriber/BatchProfileState.php create mode 100644 tests/Unit/Fixture/BatchingState.php create mode 100644 tests/Unit/Fixture/DefaultStateBatchingSubscriber.php create mode 100644 tests/Unit/Fixture/VoidBeginBatchingSubscriber.php create mode 100644 tests/Unit/Subscription/Subscriber/ArgumentResolver/BatchArgumentResolverTest.php create mode 100644 tests/Unit/Subscription/Subscriber/BatchManagerTest.php diff --git a/docs/UPGRADE-4.0.md b/docs/UPGRADE-4.0.md index cfdaddb68..bf0565e6a 100644 --- a/docs/UPGRADE-4.0.md +++ b/docs/UPGRADE-4.0.md @@ -243,6 +243,136 @@ final class CustomResolver implements ArgumentResolver } ``` +### Custom ArgumentResolver registration + +Custom argument resolvers are no longer passed to the `MetadataSubscriberAccessorRepository`. +They are now passed to the `DefaultSubscriptionEngine`, which forwards them to the message processor. + +Before: + +```php +$subscriberRepository = new MetadataSubscriberAccessorRepository( + [new MySubscriber()], + argumentResolvers: [new MyResolver()], +); + +$engine = new DefaultSubscriptionEngine( + $messageLoader, + $subscriptionStore, + $subscriberRepository, +); +``` + +After: + +```php +$subscriberRepository = new MetadataSubscriberAccessorRepository( + [new MySubscriber()], +); + +$engine = new DefaultSubscriptionEngine( + $messageLoader, + $subscriptionStore, + $subscriberRepository, + argumentResolvers: [new MyResolver()], +); +``` + +### Batchable Subscriber + +The `Patchlevel\EventSourcing\Subscription\Subscriber\BatchableSubscriber` interface has been removed. +Batching is now configured with attributes and the subscriber stays stateless: the data you collect +during a batch lives in a state object that the engine creates, keeps and hands back to your methods. + +* `beginBatch()` becomes a `#[BatchBegin]` method that returns the state object. +* `commitBatch()` becomes a `#[BatchFlush]` method that receives the state object. The batch size is now + configured on the attribute (`#[BatchFlush(afterMessages: 1000)]`). +* `rollbackBatch()` becomes a `#[BatchRollback]` method that receives the state object (optional). +* `forceCommit()` becomes a `#[BatchShouldFlush]` method that receives the state object (optional). +* The handler receives the state object through a parameter marked with `#[BatchState]`. + +Before: + +```php +use Patchlevel\EventSourcing\Subscription\Subscriber\BatchableSubscriber; + +#[Projector('profile_1')] +final class MigrationSubscriber implements BatchableSubscriber +{ + /** @var array */ + private array $nameChanged = []; + + #[Subscribe(NameChanged::class)] + public function handleNameChanged(NameChanged $event): void + { + $this->nameChanged[$event->userId] = $event->name; + } + + public function beginBatch(): void + { + $this->nameChanged = []; + } + + public function commitBatch(): void + { + // ... persist $this->nameChanged + $this->nameChanged = []; + } + + public function rollbackBatch(): void + { + } + + public function forceCommit(): bool + { + return count($this->nameChanged) > 1000; + } +} +``` + +After: + +```php +use Patchlevel\EventSourcing\Attribute\BatchState; +use Patchlevel\EventSourcing\Attribute\BatchBegin; +use Patchlevel\EventSourcing\Attribute\BatchFlush; +use Patchlevel\EventSourcing\Attribute\BatchRollback; +use Patchlevel\EventSourcing\Attribute\BatchShouldFlush; + +final class MigrationBatch +{ + /** @var array */ + public array $nameChanged = []; +} + +#[Projector('profile_1')] +final class MigrationSubscriber +{ + #[BatchBegin] + public function beginBatch(): MigrationBatch + { + return new MigrationBatch(); + } + + #[Subscribe(NameChanged::class)] + public function handleNameChanged(NameChanged $event, #[BatchState] MigrationBatch $batch): void + { + $batch->nameChanged[$event->userId] = $event->name; + } + + #[BatchFlush(afterMessages: 1000)] + public function flush(MigrationBatch $batch): void + { + // ... persist $batch->nameChanged + } + + #[BatchRollback] + public function rollback(MigrationBatch $batch): void + { + } +} +``` + ## Store ### StreamStore diff --git a/docs/subscription.md b/docs/subscription.md index 5451aab58..05060fa13 100644 --- a/docs/subscription.md +++ b/docs/subscription.md @@ -752,39 +752,53 @@ For more information, see the [retry strategy](#retry-strategy) documentation. You can also optimize the performance of your subscribers by processing a number of events in a batch. This is particularly useful when projections need to be rebuilt. -To achieve this, you can implement the `BatchableSubscriber` interface. +To achieve this, you mark the relevant methods with the batch attributes. +The subscriber itself stays stateless: the `#[BatchBegin]` method returns a state object that the +engine keeps for you and hands back to the handler (via a `#[BatchState]` parameter) and to the +flush and rollback methods. ```php use Doctrine\DBAL\Connection; +use Patchlevel\EventSourcing\Attribute\BatchState; +use Patchlevel\EventSourcing\Attribute\BatchBegin; +use Patchlevel\EventSourcing\Attribute\BatchFlush; use Patchlevel\EventSourcing\Attribute\Projector; -use Patchlevel\EventSourcing\Subscription\Subscriber\BatchableSubscriber; +use Patchlevel\EventSourcing\Attribute\BatchRollback; +use Patchlevel\EventSourcing\Attribute\BatchShouldFlush; +use Patchlevel\EventSourcing\Attribute\Subscribe; + +final class MigrationBatch +{ + /** @var array */ + public array $nameChanged = []; +} #[Projector('profile_1')] -final class MigrationSubscriber implements BatchableSubscriber +final class MigrationSubscriber { public function __construct( private readonly Connection $connection, ) { } - /** @var array */ - private array $nameChanged = []; - - #[Subscribe(NameChanged::class)] - public function handleNameChanged(NameChanged $event): void + #[BatchBegin] + public function beginBatch(): MigrationBatch { - $this->nameChanged[$event->userId] = $event->name; + $this->connection->beginTransaction(); + + return new MigrationBatch(); } - public function beginBatch(): void + #[Subscribe(NameChanged::class)] + public function handleNameChanged(NameChanged $event, #[BatchState] MigrationBatch $batch): void { - $this->nameChanged = []; - $this->connection->beginTransaction(); + $batch->nameChanged[$event->userId] = $event->name; } - public function commitBatch(): void + #[BatchFlush(afterMessages: 1000)] + public function flush(MigrationBatch $batch): void { - foreach ($this->nameChanged as $userId => $name) { + foreach ($batch->nameChanged as $userId => $name) { $this->connection->executeStatement( 'UPDATE user SET name = :name WHERE id = :id', ['name' => $name, 'id' => $userId], @@ -792,50 +806,53 @@ final class MigrationSubscriber implements BatchableSubscriber } $this->connection->commit(); - $this->nameChanged = []; } - public function rollbackBatch(): void + #[BatchShouldFlush] + public function shouldFlush(MigrationBatch $batch): bool { - $this->connection->rollBack(); + return count($batch->nameChanged) > 1000; } - public function forceCommit(): bool + #[BatchRollback] + public function rollback(MigrationBatch $batch): void { - return count($this->nameChanged) > 1000; + $this->connection->rollBack(); } } ``` -This interface provides you with all the options you need to process your data collectively. -The `beginBatch` method is called as soon as a subscriber wants to process an event. +The `#[BatchBegin]` method is optional and called as soon as a subscriber wants to process an event. If no suitable event is found in the stream, batching will not start, and this method will not be called. -Here, you can make all necessary preparations, such as opening a transaction or preparing variables. +Here, you can make all necessary preparations, such as opening a transaction, and optionally return the +state object that holds the data you collect during the batch. +If you do not define a `#[BatchBegin]` method, or it returns nothing, an empty `stdClass` object is used as the state. -The `commitBatch` method is called when batching was previously started, and one of the following conditions is met: +The `#[BatchFlush]` method is called when batching was previously started, and one of the following conditions is met: Either the Subscription Engine reaches its limit, or the stream is finished. -Alternatively, if the subscriber explicitly indicates using the `forceCommit` method that they want to process the data now. +You can also pass `afterMessages` to the attribute (`#[BatchFlush(afterMessages: 1000)]`) to flush automatically +after that many messages, or implement a `#[BatchShouldFlush]` method for custom logic. At this step, you must process all the data. -The `rollbackBatch` method is called when an error occurs and the batching needs to be aborted. +The `#[BatchRollback]` method is optional and called when an error occurs and the batching needs to be aborted. Here, you can respond to the error and potentially perform a database rollback. -The method `forceCommit` is called after each handled event, -and you can decide whether the batch commit process should start now. +The `#[BatchShouldFlush]` method is optional and called after each handled event, +and you can decide whether the flush process should start now. This helps to determine the batch size and thus avoid memory overflow. :::danger -Make sure to fully process the data in `commitBatch` and close any open transactions. +Make sure to fully process the data in the `#[BatchFlush]` method and close any open transactions. Otherwise, it may lead to inconsistent data. ::: :::note -The position of the subscriber is only updated after a successful commit. +The position of the subscriber is only updated after a successful flush. In case of an error, the position remains at the state before the batch started. ::: :::tip -Use `forceCommit` to prevent memory leaks. +Use `#[BatchFlush(afterMessages: ...)]` or a `#[BatchShouldFlush]` method to prevent memory leaks. This allows you to decide when it's suitable to process the data and then release the memory. ::: diff --git a/phpstan-baseline.neon b/phpstan-baseline.neon index 6c4abf6bf..b463caf87 100644 --- a/phpstan-baseline.neon +++ b/phpstan-baseline.neon @@ -589,7 +589,7 @@ parameters: path: tests/Unit/Message/Translator/ReplaceEventTranslatorTest.php - - message: '#^Method class@anonymous/tests/Unit/Metadata/Subscriber/AttributeSubscriberMetadataFactoryTest\.php\:266\:\:profileVisited\(\) has parameter \$message with no type specified\.$#' + message: '#^Method class@anonymous/tests/Unit/Metadata/Subscriber/AttributeSubscriberMetadataFactoryTest\.php\:278\:\:profileVisited\(\) has parameter \$message with no type specified\.$#' identifier: missingType.parameter count: 1 path: tests/Unit/Metadata/Subscriber/AttributeSubscriberMetadataFactoryTest.php diff --git a/src/Attribute/BatchBegin.php b/src/Attribute/BatchBegin.php new file mode 100644 index 000000000..cbb3c2746 --- /dev/null +++ b/src/Attribute/BatchBegin.php @@ -0,0 +1,12 @@ +getAttributes(Subscribe::class); @@ -77,6 +87,57 @@ public function metadata(string $subscriber): SubscriberMetadata $subscribeMethods[$eventClass] = $this->subscribeMethod($method); } + if ($method->getAttributes(BatchBegin::class)) { + if ($beginBatchMethod !== null) { + throw new DuplicateBeginBatchMethod( + $subscriber, + $beginBatchMethod, + $method->getName(), + ); + } + + $beginBatchMethod = $method->getName(); + } + + $flushAttributes = $method->getAttributes(BatchFlush::class); + + if ($flushAttributes !== []) { + if ($flushMethod !== null) { + throw new DuplicateFlushMethod( + $subscriber, + $flushMethod, + $method->getName(), + ); + } + + $flushMethod = $method->getName(); + $flushAfterMessages = $flushAttributes[0]->newInstance()->afterMessages; + } + + if ($method->getAttributes(BatchShouldFlush::class)) { + if ($shouldFlushMethod !== null) { + throw new DuplicateShouldFlushMethod( + $subscriber, + $shouldFlushMethod, + $method->getName(), + ); + } + + $shouldFlushMethod = $method->getName(); + } + + if ($method->getAttributes(BatchRollback::class)) { + if ($rollbackBatchMethod !== null) { + throw new DuplicateRollbackBatchMethod( + $subscriber, + $rollbackBatchMethod, + $method->getName(), + ); + } + + $rollbackBatchMethod = $method->getName(); + } + if ($method->getAttributes(OnFailed::class)) { if ($failedMethod !== null) { throw new DuplicateFailedMethod( @@ -148,6 +209,28 @@ public function metadata(string $subscriber): SubscriberMetadata throw DuplicateSubscribeMethod::mixedWithAll($subscriber); } + $usesBatching = $beginBatchMethod !== null + || $flushMethod !== null + || $shouldFlushMethod !== null + || $rollbackBatchMethod !== null + || $this->hasBatchArgument($subscribeMethods); + + $batch = null; + + if ($usesBatching) { + if ($flushMethod === null) { + throw IncompleteBatchMethods::missingFlushMethod($subscriber); + } + + $batch = new BatchMetadata( + $flushMethod, + $beginBatchMethod, + $shouldFlushMethod, + $rollbackBatchMethod, + $flushAfterMessages, + ); + } + $metadata = new SubscriberMetadata( $subscriberInfo->id, $subscriberInfo->group, @@ -160,6 +243,7 @@ public function metadata(string $subscriber): SubscriberMetadata $cleanupMethod, $reflector->getAttributes(EnableEventEmittingDuringBoot::class) !== [], $reflector->getAttributes(DisableEventEmitting::class) !== [], + $batch, ); $this->subscriberMetadata[$subscriber] = $metadata; @@ -185,6 +269,7 @@ private function subscribeMethod(ReflectionMethod $method): SubscribeMethodMetad $arguments[] = new ArgumentMetadata( $parameter->getName(), $this->typeResolver->resolve($type), + $parameter->getAttributes(BatchState::class) !== [], ); } @@ -194,6 +279,20 @@ private function subscribeMethod(ReflectionMethod $method): SubscribeMethodMetad ); } + /** @param array $subscribeMethods */ + private function hasBatchArgument(array $subscribeMethods): bool + { + foreach ($subscribeMethods as $subscribeMethod) { + foreach ($subscribeMethod->arguments as $argument) { + if ($argument->batch) { + return true; + } + } + } + + return false; + } + private function retryStrategy(ReflectionClass $reflector): string|null { $attributes = $reflector->getAttributes(RetryStrategy::class); diff --git a/src/Metadata/Subscriber/BatchMetadata.php b/src/Metadata/Subscriber/BatchMetadata.php new file mode 100644 index 000000000..417ce8f59 --- /dev/null +++ b/src/Metadata/Subscriber/BatchMetadata.php @@ -0,0 +1,17 @@ +, Handler> */ private readonly array $handlers; + /** @param iterable $argumentResolvers */ public function __construct( private readonly MessageLoader $messageLoader, SubscriptionStore $subscriptionStore, @@ -62,6 +66,7 @@ public function __construct( private readonly LoggerInterface|null $logger = null, private readonly Cleaner|null $cleaner = null, private readonly EventDispatcherInterface $eventDispatcher = new EventDispatcher(), + iterable $argumentResolvers = [], ) { $this->subscriptionManager = new SubscriptionManager($subscriptionStore); @@ -80,9 +85,12 @@ public function __construct( $this->logger, ); + $batchManager = new BatchManager(); + $messageProcessor = new MessageProcessor( $this->subscriberRepository, $this->eventDispatcher, + [new BatchArgumentResolver($batchManager), ...$argumentResolvers], $this->logger, ); @@ -159,6 +167,7 @@ public function __construct( $this->eventDispatcher->addSubscriber( new BatchSubscriber( + $batchManager, $this->subscriberRepository, $this->logger, ), diff --git a/src/Subscription/Engine/Listener/BatchSubscriber.php b/src/Subscription/Engine/Listener/BatchSubscriber.php index 1f06d0fe3..c887e712c 100644 --- a/src/Subscription/Engine/Listener/BatchSubscriber.php +++ b/src/Subscription/Engine/Listener/BatchSubscriber.php @@ -4,28 +4,29 @@ namespace Patchlevel\EventSourcing\Subscription\Engine\Listener; +use InvalidArgumentException; use Patchlevel\EventSourcing\Subscription\Engine\Error; use Patchlevel\EventSourcing\Subscription\Engine\Event\OnCommand; use Patchlevel\EventSourcing\Subscription\Engine\Event\OnHandleMessage; use Patchlevel\EventSourcing\Subscription\Engine\Event\OnHandleMessageError; use Patchlevel\EventSourcing\Subscription\Engine\Event\OnHandleMessageSuccess; use Patchlevel\EventSourcing\Subscription\Engine\Event\OnProcessingFinished; -use Patchlevel\EventSourcing\Subscription\Subscriber\BatchableSubscriber; +use Patchlevel\EventSourcing\Subscription\Subscriber\Batch; +use Patchlevel\EventSourcing\Subscription\Subscriber\BatchManager; use Patchlevel\EventSourcing\Subscription\Subscriber\SubscriberAccessorRepository; -use Patchlevel\EventSourcing\Subscription\Subscription; use Psr\Log\LoggerInterface; +use stdClass; use Symfony\Component\EventDispatcher\EventSubscriberInterface; use Throwable; +use function is_object; use function sprintf; /** @internal */ final class BatchSubscriber implements EventSubscriberInterface { - /** @var array */ - private array $batching = []; - public function __construct( + private readonly BatchManager $batchManager, private readonly SubscriberAccessorRepository $subscriberRepository, private readonly LoggerInterface|null $logger = null, ) { @@ -33,41 +34,35 @@ public function __construct( public function onCommand(OnCommand $event): void { - $this->batching = []; + $this->batchManager->clear(); } public function onHandleMessage(OnHandleMessage $event): void { $subscriberId = $event->subscription->id(); - if (isset($this->batching[$subscriberId])) { + if ($this->batchManager->has($subscriberId)) { return; } $subscriber = $this->subscriberRepository->get($subscriberId); + $batchMetadata = $subscriber->metadata()?->batch; - if (!$subscriber) { + if ($subscriber === null || $batchMetadata === null) { return; } $realSubscriber = $subscriber->subscriber(); - if (!$realSubscriber instanceof BatchableSubscriber) { - return; - } - - $this->batching[$subscriberId] = [ - 'subscriber' => $realSubscriber, - 'subscription' => $event->subscription, - ]; - $this->logger?->debug(sprintf( 'Subscription Engine: Subscriber "%s" starts a new batch.', $subscriberId, )); + $beginMethod = $batchMetadata->beginMethod; + try { - $realSubscriber->beginBatch(); + $state = $beginMethod !== null ? $realSubscriber->$beginMethod() : null; } catch (Throwable $e) { $this->logger?->error(sprintf( 'Subscription Engine: Subscriber "%s" has an error in the begin batch method: %s', @@ -77,36 +72,59 @@ public function onHandleMessage(OnHandleMessage $event): void throw $e; } + + if ($state !== null && !is_object($state)) { + throw new InvalidArgumentException(sprintf( + 'Subscription Engine: Subscriber "%s" begin batch method must return an object or null.', + $subscriberId, + )); + } + + $this->batchManager->add(new Batch( + $event->subscription, + $subscriber, + $state ?? new stdClass(), + )); } public function onHandleMessageSuccess(OnHandleMessageSuccess $event): void { $subscriberId = $event->subscription->id(); - if (!isset($this->batching[$subscriberId])) { + if (!$this->batchManager->has($subscriberId)) { return; } - if (!$this->shouldCommitBatch($event->subscription)) { + $batch = $this->batchManager->get($subscriberId); + $batch->count++; + + if (!$this->shouldFlush($batch)) { $event->shouldChangePosition = false; return; } - $subscriber = $this->batching[$subscriberId]['subscriber']; - unset($this->batching[$subscriberId]); + $this->batchManager->remove($subscriberId); $this->logger?->debug(sprintf( - 'Subscription Engine: Subscriber "%s" commits the batch.', + 'Subscription Engine: Subscriber "%s" flushes the batch.', $subscriberId, )); + $flushMethod = $batch->accessor->metadata()->batch?->flushMethod; + + if ($flushMethod === null) { + $event->shouldChangePosition = true; + + return; + } + try { - $subscriber->commitBatch(); + $batch->accessor->subscriber()->$flushMethod($batch->state); $event->shouldChangePosition = true; } catch (Throwable $e) { $this->logger?->error(sprintf( - 'Subscription Engine: Subscriber "%s" has an error in the commit batch method: %s', + 'Subscription Engine: Subscriber "%s" has an error in the flush method: %s', $subscriberId, $e->getMessage(), )); @@ -123,46 +141,55 @@ public function onProcessingFinished(OnProcessingFinished $event): void return; } - foreach ($this->batching as $subscriberId => ['subscriber' => $subscriber, 'subscription' => $subscription]) { - unset($this->batching[$subscriberId]); + foreach ($this->batchManager->all() as $batch) { + $subscriberId = $batch->subscription->id(); + $this->batchManager->remove($subscriberId); + + $flushMethod = $batch->accessor->metadata()->batch?->flushMethod; + + if ($flushMethod === null) { + $batch->subscription->changePosition($lastIndex); + + continue; + } $this->logger?->debug(sprintf( - 'Subscription Engine: Subscriber "%s" commits the batch.', + 'Subscription Engine: Subscriber "%s" flushes the batch.', $subscriberId, )); try { - $subscriber->commitBatch(); - $subscription->changePosition($lastIndex); + $batch->accessor->subscriber()->$flushMethod($batch->state); + $batch->subscription->changePosition($lastIndex); } catch (Throwable $e) { $this->logger?->error(sprintf( - 'Subscription Engine: Subscriber "%s" has an error in the commit batch method: %s', + 'Subscription Engine: Subscriber "%s" has an error in the flush method: %s', $subscriberId, $e->getMessage(), )); - $subscription->error($e); + $batch->subscription->error($e); $event->errors[] = new Error($subscriberId, $e->getMessage(), $e); } } } - private function shouldCommitBatch(Subscription $subscription): bool - { - return $this->batching[$subscription->id()]['subscriber']->forceCommit(); - } - public function onError(OnHandleMessageError $event): void { $subscriptionId = $event->subscription->id(); - if (!isset($this->batching[$subscriptionId])) { + if (!$this->batchManager->has($subscriptionId)) { return; } - $subscriber = $this->batching[$subscriptionId]['subscriber']; + $batch = $this->batchManager->get($subscriptionId); + $this->batchManager->remove($subscriptionId); + + $rollbackMethod = $batch->accessor->metadata()->batch?->rollbackMethod; - unset($this->batching[$subscriptionId]); + if ($rollbackMethod === null) { + return; + } $this->logger?->debug(sprintf( 'Subscription Engine: Subscriber "%s" rollback the batch.', @@ -170,7 +197,7 @@ public function onError(OnHandleMessageError $event): void )); try { - $subscriber->rollbackBatch(); + $batch->accessor->subscriber()->$rollbackMethod($batch->state); } catch (Throwable $e) { $this->logger?->error(sprintf( 'Subscription Engine: Subscriber "%s" has an error in the rollback batch method: %s', @@ -180,6 +207,23 @@ public function onError(OnHandleMessageError $event): void } } + private function shouldFlush(Batch $batch): bool + { + $metadata = $batch->accessor->metadata()->batch; + + if ($metadata?->afterMessages !== null && $batch->count >= $metadata->afterMessages) { + return true; + } + + $shouldFlushMethod = $metadata?->shouldFlushMethod; + + if ($shouldFlushMethod === null) { + return false; + } + + return $batch->accessor->subscriber()->$shouldFlushMethod($batch->state) === true; + } + /** @return array */ public static function getSubscribedEvents(): array { diff --git a/src/Subscription/Engine/Listener/FailSubscriber.php b/src/Subscription/Engine/Listener/FailSubscriber.php index 48c7b63f1..5b48516a4 100644 --- a/src/Subscription/Engine/Listener/FailSubscriber.php +++ b/src/Subscription/Engine/Listener/FailSubscriber.php @@ -7,7 +7,6 @@ use Patchlevel\EventSourcing\Message\Message; use Patchlevel\EventSourcing\Subscription\Engine\Event\OnHandleMessageError; use Patchlevel\EventSourcing\Subscription\Engine\SubscriptionManager; -use Patchlevel\EventSourcing\Subscription\Subscriber\BatchableSubscriber; use Patchlevel\EventSourcing\Subscription\Subscriber\SubscriberAccessorRepository; use Patchlevel\EventSourcing\Subscription\Subscription; use Psr\Log\LoggerInterface; @@ -48,7 +47,7 @@ public function handleFailed( return; } - if ($subscriber->subscriber() instanceof BatchableSubscriber) { + if ($subscriber->metadata()->batch !== null) { $subscription->failed($throwable); $this->subscriptionManager->update($subscription); diff --git a/src/Subscription/Engine/MessageProcessor.php b/src/Subscription/Engine/MessageProcessor.php index c24bb5fcf..9c9320744 100644 --- a/src/Subscription/Engine/MessageProcessor.php +++ b/src/Subscription/Engine/MessageProcessor.php @@ -5,25 +5,55 @@ namespace Patchlevel\EventSourcing\Subscription\Engine; use Patchlevel\EventSourcing\Message\Message; +use Patchlevel\EventSourcing\Metadata\Subscriber\ArgumentMetadata; +use Patchlevel\EventSourcing\Metadata\Subscriber\SubscribeMethodMetadata; use Patchlevel\EventSourcing\Subscription\Engine\Event\OnHandleMessage; use Patchlevel\EventSourcing\Subscription\Engine\Event\OnHandleMessageError; use Patchlevel\EventSourcing\Subscription\Engine\Event\OnHandleMessageSuccess; +use Patchlevel\EventSourcing\Subscription\Subscriber\ArgumentResolver\ArgumentResolver; +use Patchlevel\EventSourcing\Subscription\Subscriber\ArgumentResolver\ArgumentResolverContext; +use Patchlevel\EventSourcing\Subscription\Subscriber\ArgumentResolver\EventArgumentResolver; +use Patchlevel\EventSourcing\Subscription\Subscriber\ArgumentResolver\MessageArgumentResolver; +use Patchlevel\EventSourcing\Subscription\Subscriber\ArgumentResolver\RecordedOnArgumentResolver; +use Patchlevel\EventSourcing\Subscription\Subscriber\NoSuitableResolver; use Patchlevel\EventSourcing\Subscription\Subscriber\SubscriberAccessorRepository; use Patchlevel\EventSourcing\Subscription\Subscription; use Psr\Log\LoggerInterface; use Symfony\Component\EventDispatcher\EventDispatcherInterface; use Throwable; +use function array_key_exists; +use function array_merge; +use function array_values; +use function is_array; +use function iterator_to_array; use function sprintf; /** @internal */ final class MessageProcessor { + /** @var list */ + private readonly array $argumentResolvers; + + /** @var array> */ + private array $resolverCache = []; + + /** @param iterable|list $argumentResolvers */ public function __construct( private readonly SubscriberAccessorRepository $subscriberRepository, private readonly EventDispatcherInterface $eventDispatcher, + iterable $argumentResolvers = [], private readonly LoggerInterface|null $logger = null, ) { + $this->argumentResolvers = array_merge( + // the check for array is required before PHP 8.2 + array_values(is_array($argumentResolvers) ? $argumentResolvers : iterator_to_array($argumentResolvers)), + [ + new MessageArgumentResolver(), + new EventArgumentResolver(), + new RecordedOnArgumentResolver(), + ], + ); } public function process(int $index, Message $message, Subscription $subscription): Error|null @@ -91,8 +121,18 @@ public function process(int $index, Message $message, Subscription $subscription } try { + $context = new ArgumentResolverContext($message, $subscription, $subscriber->metadata()); + foreach ($subscribeMethods as $subscribeMethod) { - $subscribeMethod($message, $subscription); + $arguments = $this->resolveArguments( + $subscription->id(), + $message->event()::class, + $subscriber->subscriber()::class, + $subscribeMethod, + $context, + ); + + $subscriber->subscriber()->{$subscribeMethod->name}(...$arguments); } } catch (Throwable $e) { $this->logger?->error( @@ -144,4 +184,74 @@ public function process(int $index, Message $message, Subscription $subscription return null; } + + /** + * @param class-string $eventClass + * @param class-string $subscriberClass + * + * @return list + */ + private function resolveArguments( + string $subscriptionId, + string $eventClass, + string $subscriberClass, + SubscribeMethodMetadata $method, + ArgumentResolverContext $context, + ): array { + $resolvers = $this->resolversFor($subscriptionId, $eventClass, $subscriberClass, $method); + + $arguments = []; + + foreach ($method->arguments as $position => $argument) { + $arguments[] = $resolvers[$position]->resolve($argument, $context); + } + + return $arguments; + } + + /** + * @param class-string $eventClass + * @param class-string $subscriberClass + * + * @return list + */ + private function resolversFor( + string $subscriptionId, + string $eventClass, + string $subscriberClass, + SubscribeMethodMetadata $method, + ): array { + $key = $subscriptionId . '::' . $eventClass . '::' . $method->name; + + if (array_key_exists($key, $this->resolverCache)) { + return $this->resolverCache[$key]; + } + + $resolvers = []; + + foreach ($method->arguments as $argument) { + $resolvers[] = $this->resolverFor($argument, $eventClass, $subscriberClass, $method); + } + + return $this->resolverCache[$key] = $resolvers; + } + + /** + * @param class-string $eventClass + * @param class-string $subscriberClass + */ + private function resolverFor( + ArgumentMetadata $argument, + string $eventClass, + string $subscriberClass, + SubscribeMethodMetadata $method, + ): ArgumentResolver { + foreach ($this->argumentResolvers as $resolver) { + if ($resolver->support($argument, $eventClass)) { + return $resolver; + } + } + + throw new NoSuitableResolver($subscriberClass, $method->name, $argument->name); + } } diff --git a/src/Subscription/Subscriber/ArgumentResolver/BatchArgumentResolver.php b/src/Subscription/Subscriber/ArgumentResolver/BatchArgumentResolver.php new file mode 100644 index 000000000..401f2a1a2 --- /dev/null +++ b/src/Subscription/Subscriber/ArgumentResolver/BatchArgumentResolver.php @@ -0,0 +1,26 @@ +batchManager->get($context->subscription->id())->state; + } + + public function support(ArgumentMetadata $argument, string $eventClass): bool + { + return $argument->batch; + } +} diff --git a/src/Subscription/Subscriber/Batch.php b/src/Subscription/Subscriber/Batch.php new file mode 100644 index 000000000..a61fbfb0b --- /dev/null +++ b/src/Subscription/Subscriber/Batch.php @@ -0,0 +1,21 @@ + $accessor */ + public function __construct( + public readonly Subscription $subscription, + public readonly MetadataSubscriberAccessor $accessor, + public readonly object $state, + ) { + } +} diff --git a/src/Subscription/Subscriber/BatchManager.php b/src/Subscription/Subscriber/BatchManager.php new file mode 100644 index 000000000..d6e01242b --- /dev/null +++ b/src/Subscription/Subscriber/BatchManager.php @@ -0,0 +1,49 @@ + */ + private array $batches = []; + + public function has(string $subscriptionId): bool + { + return array_key_exists($subscriptionId, $this->batches); + } + + public function get(string $subscriptionId): Batch + { + if (!array_key_exists($subscriptionId, $this->batches)) { + throw new BatchNotFound($subscriptionId); + } + + return $this->batches[$subscriptionId]; + } + + public function add(Batch $batch): void + { + $this->batches[$batch->subscription->id()] = $batch; + } + + public function remove(string $subscriptionId): void + { + unset($this->batches[$subscriptionId]); + } + + /** @return list */ + public function all(): array + { + return array_values($this->batches); + } + + public function clear(): void + { + $this->batches = []; + } +} diff --git a/src/Subscription/Subscriber/BatchNotFound.php b/src/Subscription/Subscriber/BatchNotFound.php new file mode 100644 index 000000000..fd716dfb2 --- /dev/null +++ b/src/Subscription/Subscriber/BatchNotFound.php @@ -0,0 +1,17 @@ +> */ + /** @var array> */ private array $subscribeCache = []; - /** - * @param T $subscriber - * @param list $argumentResolvers - */ + /** @param T $subscriber */ public function __construct( private readonly object $subscriber, private readonly SubscriberMetadata $metadata, - private readonly array $argumentResolvers, ) { } @@ -101,7 +93,7 @@ public function events(): array /** * @param class-string $eventClass * - * @return list + * @return list */ public function subscribeMethods(string $eventClass): array { @@ -119,61 +111,6 @@ public function subscribeMethods(string $eventClass): array $methods[] = $this->metadata->subscribeMethods[Subscribe::ALL]; } - $this->subscribeCache[$eventClass] = array_map( - fn (SubscribeMethodMetadata $method): Closure => $this->createClosure($eventClass, $method), - $methods, - ); - - return $this->subscribeCache[$eventClass]; - } - - /** - * @param class-string $eventClass - * - * @return Closure(Message, Subscription):void - */ - private function createClosure(string $eventClass, SubscribeMethodMetadata $method): Closure - { - $resolvers = $this->resolvers($eventClass, $method); - $methodName = $method->name; - - return function (Message $message, Subscription $subscription) use ($methodName, $resolvers): void { - $arguments = []; - - foreach ($resolvers as $resolver) { - $arguments[] = $resolver($message, $subscription); - } - - $this->subscriber->$methodName(...$arguments); - }; - } - - /** - * @param class-string $eventClass - * - * @return list - */ - private function resolvers(string $eventClass, SubscribeMethodMetadata $method): array - { - $resolvers = []; - $metadata = $this->metadata; - - foreach ($method->arguments as $argument) { - foreach ($this->argumentResolvers as $resolver) { - if (!$resolver->support($argument, $eventClass)) { - continue; - } - - $resolvers[] = static function (Message $message, Subscription $subscription) use ($resolver, $argument, $metadata): mixed { - return $resolver->resolve($argument, new ArgumentResolverContext($message, $subscription, $metadata)); - }; - - continue 2; - } - - throw new NoSuitableResolver($this->subscriber::class, $method->name, $argument->name); - } - - return $resolvers; + return $this->subscribeCache[$eventClass] = $methods; } } diff --git a/src/Subscription/Subscriber/MetadataSubscriberAccessorRepository.php b/src/Subscription/Subscriber/MetadataSubscriberAccessorRepository.php index 88fe173f6..cba34b27e 100644 --- a/src/Subscription/Subscriber/MetadataSubscriberAccessorRepository.php +++ b/src/Subscription/Subscriber/MetadataSubscriberAccessorRepository.php @@ -6,43 +6,20 @@ use Patchlevel\EventSourcing\Metadata\Subscriber\AttributeSubscriberMetadataFactory; use Patchlevel\EventSourcing\Metadata\Subscriber\SubscriberMetadataFactory; -use Patchlevel\EventSourcing\Subscription\Subscriber\ArgumentResolver\ArgumentResolver; -use Patchlevel\EventSourcing\Subscription\Subscriber\ArgumentResolver\EventArgumentResolver; -use Patchlevel\EventSourcing\Subscription\Subscriber\ArgumentResolver\MessageArgumentResolver; -use Patchlevel\EventSourcing\Subscription\Subscriber\ArgumentResolver\RecordedOnArgumentResolver; use function array_key_exists; -use function array_merge; use function array_values; -use function is_array; -use function iterator_to_array; final class MetadataSubscriberAccessorRepository implements SubscriberAccessorRepository { /** @var array */ private array $subscribersMap = []; - /** @var list $argumentResolvers */ - private readonly array $argumentResolvers; - - /** - * @param iterable $subscribers - * @param iterable|list $argumentResolvers - */ + /** @param iterable $subscribers */ public function __construct( private readonly iterable $subscribers, private readonly SubscriberMetadataFactory $metadataFactory = new AttributeSubscriberMetadataFactory(), - iterable $argumentResolvers = [], ) { - $this->argumentResolvers = array_merge( - // the check for array is required before PHP 8.2 - array_values(is_array($argumentResolvers) ? $argumentResolvers : iterator_to_array($argumentResolvers)), - [ - new MessageArgumentResolver(), - new EventArgumentResolver(), - new RecordedOnArgumentResolver(), - ], - ); } /** @return iterable */ @@ -75,7 +52,6 @@ private function subscriberAccessorMap(): array $this->subscribersMap[$metadata->id] = new MetadataSubscriberAccessor( $subscriber, $metadata, - $this->argumentResolvers, ); } diff --git a/tests/Benchmark/BasicImplementation/Projection/BatchProfileProjector.php b/tests/Benchmark/BasicImplementation/Projection/BatchProfileProjector.php index 3b3cd8b76..a364b0578 100644 --- a/tests/Benchmark/BasicImplementation/Projection/BatchProfileProjector.php +++ b/tests/Benchmark/BasicImplementation/Projection/BatchProfileProjector.php @@ -5,22 +5,22 @@ namespace Patchlevel\EventSourcing\Tests\Benchmark\BasicImplementation\Projection; use Doctrine\DBAL\Connection; +use Patchlevel\EventSourcing\Attribute\BatchBegin; +use Patchlevel\EventSourcing\Attribute\BatchFlush; +use Patchlevel\EventSourcing\Attribute\BatchRollback; +use Patchlevel\EventSourcing\Attribute\BatchState; use Patchlevel\EventSourcing\Attribute\Projector; use Patchlevel\EventSourcing\Attribute\Setup; use Patchlevel\EventSourcing\Attribute\Subscribe; use Patchlevel\EventSourcing\Attribute\Teardown; -use Patchlevel\EventSourcing\Subscription\Subscriber\BatchableSubscriber; use Patchlevel\EventSourcing\Tests\Benchmark\BasicImplementation\Events\NameChanged; use Patchlevel\EventSourcing\Tests\Benchmark\BasicImplementation\Events\ProfileCreated; #[Projector(self::SUBSCRIBER_ID)] -final class BatchProfileProjector implements BatchableSubscriber +final class BatchProfileProjector { private const SUBSCRIBER_ID = 'profile'; - /** @var array */ - private array $nameChanged = []; - public function __construct( private Connection $connection, ) { @@ -51,9 +51,12 @@ public function onProfileCreated(ProfileCreated $profileCreated): void } #[Subscribe(NameChanged::class)] - public function onNameChanged(NameChanged $nameChanged): void - { - $this->nameChanged[$nameChanged->profileId->toString()] = $nameChanged->name; + public function onNameChanged( + NameChanged $nameChanged, + #[BatchState] + BatchProfileState $state, + ): void { + $state->nameChanged[$nameChanged->profileId->toString()] = $nameChanged->name; } public function table(): string @@ -61,35 +64,29 @@ public function table(): string return 'projection_' . self::SUBSCRIBER_ID; } - public function beginBatch(): void - { - $this->nameChanged = []; - } - - public function commitBatch(): void + #[BatchBegin] + public function beginBatch(): BatchProfileState { - try { - $this->connection->transactional(function (): void { - foreach ($this->nameChanged as $profileId => $name) { - $this->connection->update( - $this->table(), - ['name' => $name], - ['id' => $profileId], - ); - } - }); - } finally { - $this->nameChanged = []; - } + return new BatchProfileState(); } - public function rollbackBatch(): void + #[BatchFlush] + public function flush(BatchProfileState $state): void { - $this->nameChanged = []; + $this->connection->transactional(function () use ($state): void { + foreach ($state->nameChanged as $profileId => $name) { + $this->connection->update( + $this->table(), + ['name' => $name], + ['id' => $profileId], + ); + } + }); } - public function forceCommit(): bool + #[BatchRollback] + public function rollbackBatch(BatchProfileState $state): void { - return false; + $state->nameChanged = []; } } diff --git a/tests/Benchmark/BasicImplementation/Projection/BatchProfileState.php b/tests/Benchmark/BasicImplementation/Projection/BatchProfileState.php new file mode 100644 index 000000000..f22ac4dff --- /dev/null +++ b/tests/Benchmark/BasicImplementation/Projection/BatchProfileState.php @@ -0,0 +1,11 @@ + */ + public array $nameChanged = []; +} diff --git a/tests/Integration/Subscription/Subscriber/BatchProfileProjection.php b/tests/Integration/Subscription/Subscriber/BatchProfileProjection.php new file mode 100644 index 000000000..ea9b14669 --- /dev/null +++ b/tests/Integration/Subscription/Subscriber/BatchProfileProjection.php @@ -0,0 +1,119 @@ +tableName()); + $table->addColumn('id', 'string')->setLength(36); + $table->addColumn('name', 'string')->setLength(255); + $table->setPrimaryKey(['id']); + + $this->connection->createSchemaManager()->createTable($table); + } + + #[Teardown] + public function drop(): void + { + $this->connection->createSchemaManager()->dropTable($this->tableName()); + } + + #[Subscribe(ProfileCreated::class)] + public function handleProfileCreated( + ProfileCreated $event, + #[BatchState] + BatchProfileState $state, + ): void { + $this->connection->insert( + $this->tableName(), + [ + 'id' => $event->profileId->toString(), + 'name' => $event->name, + ], + ); + + $state->insertedIds[] = $event->profileId->toString(); + } + + #[Subscribe(NameChanged::class)] + public function handleNameChanged( + NameChanged $event, + #[BatchState] + BatchProfileState $state, + ): void { + if ($event->name === self::POISON) { + throw new RuntimeException('poisoned event'); + } + + $this->connection->update( + $this->tableName(), + ['name' => $event->name], + ['id' => $event->profileId->toString()], + ); + } + + private function tableName(): string + { + return 'projection_' . self::SUBSCRIBER_ID; + } + + #[BatchBegin] + public function beginBatch(): BatchProfileState + { + $this->beginCount++; + $this->connection->beginTransaction(); + + return new BatchProfileState(); + } + + #[BatchFlush] + public function flush(BatchProfileState $state): void + { + $this->flushCount++; + $this->connection->commit(); + } + + #[BatchRollback] + public function rollbackBatch(BatchProfileState $state): void + { + $this->rollbackCount++; + $this->connection->rollBack(); + } +} diff --git a/tests/Integration/Subscription/Subscriber/BatchProfileState.php b/tests/Integration/Subscription/Subscriber/BatchProfileState.php new file mode 100644 index 000000000..6e3f2269a --- /dev/null +++ b/tests/Integration/Subscription/Subscriber/BatchProfileState.php @@ -0,0 +1,11 @@ + */ + public array $insertedIds = []; +} diff --git a/tests/Integration/Subscription/Subscriber/ProfileProjection.php b/tests/Integration/Subscription/Subscriber/ProfileProjection.php index 1ca44af33..30c76d1d8 100644 --- a/tests/Integration/Subscription/Subscriber/ProfileProjection.php +++ b/tests/Integration/Subscription/Subscriber/ProfileProjection.php @@ -6,15 +6,17 @@ use Doctrine\DBAL\Connection; use Doctrine\DBAL\Schema\Table; +use Patchlevel\EventSourcing\Attribute\BatchBegin; +use Patchlevel\EventSourcing\Attribute\BatchFlush; +use Patchlevel\EventSourcing\Attribute\BatchRollback; use Patchlevel\EventSourcing\Attribute\Projector; use Patchlevel\EventSourcing\Attribute\Setup; use Patchlevel\EventSourcing\Attribute\Subscribe; use Patchlevel\EventSourcing\Attribute\Teardown; -use Patchlevel\EventSourcing\Subscription\Subscriber\BatchableSubscriber; use Patchlevel\EventSourcing\Tests\Integration\Subscription\Events\ProfileCreated; #[Projector(self::SUBSCRIBER_ID)] -final class ProfileProjection implements BatchableSubscriber +final class ProfileProjection { private const SUBSCRIBER_ID = 'profile_1'; @@ -57,23 +59,21 @@ private function tableName(): string return 'projection_' . self::SUBSCRIBER_ID; } + #[BatchBegin] public function beginBatch(): void { $this->connection->beginTransaction(); } - public function commitBatch(): void + #[BatchFlush] + public function flush(): void { $this->connection->commit(); } + #[BatchRollback] public function rollbackBatch(): void { $this->connection->rollBack(); } - - public function forceCommit(): bool - { - return false; - } } diff --git a/tests/Integration/Subscription/Subscriber/ProfileProjectionWithCleanup.php b/tests/Integration/Subscription/Subscriber/ProfileProjectionWithCleanup.php index 4aeed5d7e..c8d23f26c 100644 --- a/tests/Integration/Subscription/Subscriber/ProfileProjectionWithCleanup.php +++ b/tests/Integration/Subscription/Subscriber/ProfileProjectionWithCleanup.php @@ -7,16 +7,18 @@ use Doctrine\DBAL\Connection; use Doctrine\DBAL\Schema\Table; use Generator; +use Patchlevel\EventSourcing\Attribute\BatchBegin; +use Patchlevel\EventSourcing\Attribute\BatchFlush; +use Patchlevel\EventSourcing\Attribute\BatchRollback; use Patchlevel\EventSourcing\Attribute\Cleanup; use Patchlevel\EventSourcing\Attribute\Projector; use Patchlevel\EventSourcing\Attribute\Setup; use Patchlevel\EventSourcing\Attribute\Subscribe; use Patchlevel\EventSourcing\Subscription\Cleanup\Dbal\DropTableTask; -use Patchlevel\EventSourcing\Subscription\Subscriber\BatchableSubscriber; use Patchlevel\EventSourcing\Tests\Integration\Subscription\Events\ProfileCreated; #[Projector('profile_1')] -final class ProfileProjectionWithCleanup implements BatchableSubscriber +final class ProfileProjectionWithCleanup { private const TABLE_NAME = 'profile_1'; @@ -59,23 +61,21 @@ private function tableName(): string return 'projection_' . self::TABLE_NAME; } + #[BatchBegin] public function beginBatch(): void { $this->connection->beginTransaction(); } - public function commitBatch(): void + #[BatchFlush] + public function flush(): void { $this->connection->commit(); } + #[BatchRollback] public function rollbackBatch(): void { $this->connection->rollBack(); } - - public function forceCommit(): bool - { - return false; - } } diff --git a/tests/Integration/Subscription/SubscriptionTest.php b/tests/Integration/Subscription/SubscriptionTest.php index 2dd800487..567e2d6d9 100644 --- a/tests/Integration/Subscription/SubscriptionTest.php +++ b/tests/Integration/Subscription/SubscriptionTest.php @@ -52,6 +52,7 @@ use Patchlevel\EventSourcing\Subscription\Subscriber\MetadataSubscriberAccessorRepository; use Patchlevel\EventSourcing\Subscription\Subscription; use Patchlevel\EventSourcing\Tests\DbalManager; +use Patchlevel\EventSourcing\Tests\Integration\Subscription\Subscriber\BatchProfileProjection; use Patchlevel\EventSourcing\Tests\Integration\Subscription\Subscriber\ErrorProducerSubscriber; use Patchlevel\EventSourcing\Tests\Integration\Subscription\Subscriber\ErrorProducerWithSelfRecoverySubscriber; use Patchlevel\EventSourcing\Tests\Integration\Subscription\Subscriber\LookupSubscriber; @@ -1412,18 +1413,18 @@ public function testLookup(): void [ new LookupSubscriber($this->projectionConnection), ], - argumentResolvers: [ - new LookupResolver( - $store, - $eventRegistry, - ), - ], ); $engine = new DefaultSubscriptionEngine( new StoreMessageLoader($store), $subscriptionStore, $subscriberRepository, + argumentResolvers: [ + new LookupResolver( + $store, + $eventRegistry, + ), + ], ); $result = $engine->execute(new SetupCommand()); @@ -1509,9 +1510,6 @@ public function testEventEmitter(): void new NotificationEmittingProjection(), $collector, ], - argumentResolvers: [ - new EventEmitterResolver($store), - ], ); $eventDispatcher = new EventDispatcher(); @@ -1525,6 +1523,9 @@ public function testEventEmitter(): void $subscriptionStore, $subscriberRepository, eventDispatcher: $eventDispatcher, + argumentResolvers: [ + new EventEmitterResolver($store), + ], ); $engine->execute(new SetupCommand()); @@ -1620,6 +1621,199 @@ class { self::assertEquals(RunMode::FromNow, $subscriptions[0]->runMode()); } + public function testBatch(): void + { + $store = new StreamDoctrineDbalStore( + $this->connection, + DefaultEventSerializer::createFromPaths([__DIR__ . '/Events']), + ); + + $clock = new FrozenClock(new DateTimeImmutable('2021-01-01T00:00:00')); + + $subscriptionStore = new DoctrineSubscriptionStore( + $this->connection, + $clock, + ); + + $manager = new DefaultRepositoryManager( + new AggregateRootRegistry(['profile' => Profile::class]), + $store, + ); + + $repository = $manager->get(Profile::class); + + $schemaDirector = new DoctrineSchemaDirector( + $this->connection, + new ChainDoctrineSchemaConfigurator([ + $store, + $subscriptionStore, + ]), + ); + + $schemaDirector->create(); + + $projection = new BatchProfileProjection($this->projectionConnection); + $subscriberRepository = new MetadataSubscriberAccessorRepository([$projection]); + + $engine = new DefaultSubscriptionEngine( + new EventFilteredStoreMessageLoader($store, new AttributeEventMetadataFactory(), $subscriberRepository), + $subscriptionStore, + $subscriberRepository, + ); + + $result = $engine->execute(new SetupCommand()); + self::assertEquals([], $result->errors); + + $result = $engine->execute(new Boot()); + self::assertProcessedMessages(0, $result); + self::assertEquals([], $result->errors); + + $aliceId = ProfileId::generate(); + $bobId = ProfileId::generate(); + $charlieId = ProfileId::generate(); + + $repository->save(Profile::create($aliceId, 'Alice')); + $repository->save(Profile::create($bobId, 'Bob')); + $repository->save(Profile::create($charlieId, 'Charlie')); + + $result = $engine->execute(new Run()); + + self::assertProcessedMessages(3, $result); + self::assertEquals([], $result->errors); + + // all three events were processed in a single batch: one begin, one flush, no rollback + self::assertSame(1, $projection->beginCount); + self::assertSame(1, $projection->flushCount); + self::assertSame(0, $projection->rollbackCount); + + self::assertEquals( + [ + new Subscription( + 'batch_profile', + 'projector', + RunMode::FromBeginning, + Status::Active, + 3, + lastSavedAt: new DateTimeImmutable('2021-01-01T00:00:00'), + ), + ], + $engine->subscriptions(), + ); + + $aliceRow = $this->projectionConnection->fetchAssociative( + 'SELECT * FROM projection_batch_profile WHERE id = ?', + [$aliceId->toString()], + ); + + self::assertIsArray($aliceRow); + self::assertSame('Alice', $aliceRow['name']); + + $bobRow = $this->projectionConnection->fetchAssociative( + 'SELECT * FROM projection_batch_profile WHERE id = ?', + [$bobId->toString()], + ); + + self::assertIsArray($bobRow); + self::assertSame('Bob', $bobRow['name']); + + $charlieRow = $this->projectionConnection->fetchAssociative( + 'SELECT * FROM projection_batch_profile WHERE id = ?', + [$charlieId->toString()], + ); + + self::assertIsArray($charlieRow); + self::assertSame('Charlie', $charlieRow['name']); + } + + public function testBatchRollback(): void + { + $store = new StreamDoctrineDbalStore( + $this->connection, + DefaultEventSerializer::createFromPaths([__DIR__ . '/Events']), + ); + + $clock = new FrozenClock(new DateTimeImmutable('2021-01-01T00:00:00')); + + $subscriptionStore = new DoctrineSubscriptionStore( + $this->connection, + $clock, + ); + + $manager = new DefaultRepositoryManager( + new AggregateRootRegistry(['profile' => Profile::class]), + $store, + ); + + $repository = $manager->get(Profile::class); + + $schemaDirector = new DoctrineSchemaDirector( + $this->connection, + new ChainDoctrineSchemaConfigurator([ + $store, + $subscriptionStore, + ]), + ); + + $schemaDirector->create(); + + $projection = new BatchProfileProjection($this->projectionConnection); + $subscriberRepository = new MetadataSubscriberAccessorRepository([$projection]); + + $engine = new DefaultSubscriptionEngine( + new EventFilteredStoreMessageLoader($store, new AttributeEventMetadataFactory(), $subscriberRepository), + $subscriptionStore, + $subscriberRepository, + ); + + $engine->execute(new SetupCommand()); + $engine->execute(new Boot()); + + // first batch commits successfully + $aliceId = ProfileId::generate(); + $repository->save(Profile::create($aliceId, 'Alice')); + + $result = $engine->execute(new Run()); + + self::assertProcessedMessages(1, $result); + self::assertEquals([], $result->errors); + self::assertSame(1, $projection->flushCount); + + // second batch inserts Bob and then hits a poisoned event, which rolls the batch back + $bobId = ProfileId::generate(); + $bob = Profile::create($bobId, 'Bob'); + $bob->changeName(BatchProfileProjection::POISON); + $repository->save($bob); + + $result = $engine->execute(new Run()); + + self::assertCount(1, $result->errors); + self::assertSame(2, $projection->beginCount); + self::assertSame(1, $projection->flushCount); + self::assertSame(1, $projection->rollbackCount); + + $subscription = self::findSubscription($engine->subscriptions(), 'batch_profile'); + + // the position stays at the last successfully committed event + self::assertEquals(Status::Error, $subscription->status()); + self::assertEquals(1, $subscription->position()); + + // Alice (committed in the first batch) survives, Bob's insert was rolled back + $aliceRow = $this->projectionConnection->fetchAssociative( + 'SELECT * FROM projection_batch_profile WHERE id = ?', + [$aliceId->toString()], + ); + + self::assertIsArray($aliceRow); + self::assertSame('Alice', $aliceRow['name']); + + $bobRow = $this->projectionConnection->fetchAssociative( + 'SELECT * FROM projection_batch_profile WHERE id = ?', + [$bobId->toString()], + ); + + self::assertFalse($bobRow); + } + /** @phpstan-assert ProcessedResult $result */ private static function assertProcessedMessages(int $expected, Result $result): void { diff --git a/tests/Unit/Fixture/BatchingState.php b/tests/Unit/Fixture/BatchingState.php new file mode 100644 index 000000000..783567e9e --- /dev/null +++ b/tests/Unit/Fixture/BatchingState.php @@ -0,0 +1,13 @@ + */ + public array $messages = []; +} diff --git a/tests/Unit/Fixture/BatchingSubscriber.php b/tests/Unit/Fixture/BatchingSubscriber.php index a216cc39b..960027f29 100644 --- a/tests/Unit/Fixture/BatchingSubscriber.php +++ b/tests/Unit/Fixture/BatchingSubscriber.php @@ -4,17 +4,21 @@ namespace Patchlevel\EventSourcing\Tests\Unit\Fixture; +use Patchlevel\EventSourcing\Attribute\BatchBegin; +use Patchlevel\EventSourcing\Attribute\BatchFlush; +use Patchlevel\EventSourcing\Attribute\BatchRollback; +use Patchlevel\EventSourcing\Attribute\BatchShouldFlush; +use Patchlevel\EventSourcing\Attribute\BatchState; use Patchlevel\EventSourcing\Attribute\Subscribe; use Patchlevel\EventSourcing\Attribute\Subscriber; use Patchlevel\EventSourcing\Message\Message; use Patchlevel\EventSourcing\Subscription\RunMode; -use Patchlevel\EventSourcing\Subscription\Subscriber\BatchableSubscriber; use Throwable; use function count; #[Subscriber(BatchingSubscriber::ID, RunMode::FromBeginning)] -final class BatchingSubscriber implements BatchableSubscriber +final class BatchingSubscriber { public const ID = 'test'; @@ -22,21 +26,37 @@ final class BatchingSubscriber implements BatchableSubscriber public array $receivedMessages = []; public int $beginBatchCalled = 0; - public int $commitBatchCalled = 0; - public int $rollbackBatchCalled = 0; + public int $flushCalled = 0; + public int $rollbackCalled = 0; public function __construct( public readonly Throwable|null $throwForMessage = null, public readonly Throwable|null $throwForBeginBatch = null, - public readonly Throwable|null $throwForCommitBatch = null, - public readonly Throwable|null $throwForRollbackBatch = null, - public readonly int $forceCommitAfterMessages = 1_000, + public readonly Throwable|null $throwForFlush = null, + public readonly Throwable|null $throwForRollback = null, + public readonly int $flushAfterMessages = 1_000, ) { } - #[Subscribe(ProfileVisited::class)] - public function handle(Message $message): void + #[BatchBegin] + public function begin(): BatchingState { + $this->beginBatchCalled++; + + if ($this->throwForBeginBatch !== null) { + throw $this->throwForBeginBatch; + } + + return new BatchingState(); + } + + #[Subscribe(ProfileVisited::class)] + public function handle( + Message $message, + #[BatchState] + BatchingState $state, + ): void { + $state->messages[] = $message; $this->receivedMessages[] = $message; if ($this->throwForMessage !== null) { @@ -44,35 +64,29 @@ public function handle(Message $message): void } } - public function beginBatch(): void + #[BatchFlush] + public function flush(BatchingState $state): void { - $this->beginBatchCalled++; + $this->flushCalled++; - if ($this->throwForBeginBatch !== null) { - throw $this->throwForBeginBatch; + if ($this->throwForFlush !== null) { + throw $this->throwForFlush; } } - public function commitBatch(): void + #[BatchShouldFlush] + public function shouldFlush(BatchingState $state): bool { - $this->commitBatchCalled++; - - if ($this->throwForCommitBatch !== null) { - throw $this->throwForCommitBatch; - } + return $this->flushAfterMessages <= count($state->messages); } - public function rollbackBatch(): void + #[BatchRollback] + public function rollback(BatchingState $state): void { - $this->rollbackBatchCalled++; + $this->rollbackCalled++; - if ($this->throwForRollbackBatch !== null) { - throw $this->throwForRollbackBatch; + if ($this->throwForRollback !== null) { + throw $this->throwForRollback; } } - - public function forceCommit(): bool - { - return $this->forceCommitAfterMessages <= count($this->receivedMessages); - } } diff --git a/tests/Unit/Fixture/DefaultStateBatchingSubscriber.php b/tests/Unit/Fixture/DefaultStateBatchingSubscriber.php new file mode 100644 index 000000000..6272a9165 --- /dev/null +++ b/tests/Unit/Fixture/DefaultStateBatchingSubscriber.php @@ -0,0 +1,37 @@ +receivedState = $state; + } + + #[BatchFlush] + public function flush(object $state): void + { + $this->flushedState = $state; + } +} diff --git a/tests/Unit/Fixture/VoidBeginBatchingSubscriber.php b/tests/Unit/Fixture/VoidBeginBatchingSubscriber.php new file mode 100644 index 000000000..d59b0d160 --- /dev/null +++ b/tests/Unit/Fixture/VoidBeginBatchingSubscriber.php @@ -0,0 +1,43 @@ +beginCalled = true; + } + + #[Subscribe(ProfileVisited::class)] + public function handle( + Message $message, + #[BatchState] + object $state, + ): void { + $this->receivedState = $state; + } + + #[BatchFlush] + public function flush(object $state): void + { + } +} diff --git a/tests/Unit/Metadata/Subscriber/AttributeSubscriberMetadataFactoryTest.php b/tests/Unit/Metadata/Subscriber/AttributeSubscriberMetadataFactoryTest.php index a7d239f4a..ae2566e26 100644 --- a/tests/Unit/Metadata/Subscriber/AttributeSubscriberMetadataFactoryTest.php +++ b/tests/Unit/Metadata/Subscriber/AttributeSubscriberMetadataFactoryTest.php @@ -4,6 +4,11 @@ namespace Patchlevel\EventSourcing\Tests\Unit\Metadata\Subscriber; +use Patchlevel\EventSourcing\Attribute\BatchBegin; +use Patchlevel\EventSourcing\Attribute\BatchFlush; +use Patchlevel\EventSourcing\Attribute\BatchRollback; +use Patchlevel\EventSourcing\Attribute\BatchShouldFlush; +use Patchlevel\EventSourcing\Attribute\BatchState; use Patchlevel\EventSourcing\Attribute\DisableEventEmitting; use Patchlevel\EventSourcing\Attribute\EnableEventEmittingDuringBoot; use Patchlevel\EventSourcing\Attribute\Processor; @@ -16,16 +21,23 @@ use Patchlevel\EventSourcing\Metadata\Subscriber\ArgumentMetadata; use Patchlevel\EventSourcing\Metadata\Subscriber\ArgumentTypeNotSupported; use Patchlevel\EventSourcing\Metadata\Subscriber\AttributeSubscriberMetadataFactory; +use Patchlevel\EventSourcing\Metadata\Subscriber\BatchMetadata; use Patchlevel\EventSourcing\Metadata\Subscriber\ClassIsNotASubscriber; +use Patchlevel\EventSourcing\Metadata\Subscriber\DuplicateBeginBatchMethod; +use Patchlevel\EventSourcing\Metadata\Subscriber\DuplicateFlushMethod; +use Patchlevel\EventSourcing\Metadata\Subscriber\DuplicateRollbackBatchMethod; use Patchlevel\EventSourcing\Metadata\Subscriber\DuplicateSetupMethod; +use Patchlevel\EventSourcing\Metadata\Subscriber\DuplicateShouldFlushMethod; use Patchlevel\EventSourcing\Metadata\Subscriber\DuplicateSubscribeMethod; use Patchlevel\EventSourcing\Metadata\Subscriber\DuplicateTeardownMethod; +use Patchlevel\EventSourcing\Metadata\Subscriber\IncompleteBatchMethods; use Patchlevel\EventSourcing\Metadata\Subscriber\SubscribeMethodMetadata; use Patchlevel\EventSourcing\Subscription\RunMode; use Patchlevel\EventSourcing\Tests\Unit\Fixture\ProfileCreated; use Patchlevel\EventSourcing\Tests\Unit\Fixture\ProfileVisited; use PHPUnit\Framework\Attributes\CoversClass; use PHPUnit\Framework\TestCase; +use stdClass; use Symfony\Component\TypeInfo\Type; #[CoversClass(AttributeSubscriberMetadataFactory::class)] @@ -360,4 +372,227 @@ public function drop2(): void $metadataFactory = new AttributeSubscriberMetadataFactory(); $metadataFactory->metadata($subscriber::class); } + + public function testBatchMetadata(): void + { + $subscriber = new #[Subscriber('foo', RunMode::FromBeginning)] + class { + #[Subscribe(ProfileVisited::class)] + public function profileVisited(ProfileVisited $event, #[BatchState] + object $state,): void + { + } + + #[BatchBegin] + public function begin(): object + { + return new stdClass(); + } + + #[BatchFlush(afterMessages: 100)] + public function flush(object $state): void + { + } + + #[BatchShouldFlush] + public function shouldFlush(object $state): bool + { + return false; + } + + #[BatchRollback] + public function rollback(object $state): void + { + } + }; + + $metadataFactory = new AttributeSubscriberMetadataFactory(); + $metadata = $metadataFactory->metadata($subscriber::class); + + self::assertEquals( + new BatchMetadata('flush', 'begin', 'shouldFlush', 'rollback', 100), + $metadata->batch, + ); + + self::assertTrue($metadata->subscribeMethods[ProfileVisited::class]->arguments[1]->batch); + self::assertFalse($metadata->subscribeMethods[ProfileVisited::class]->arguments[0]->batch); + } + + public function testBatchMinimalMetadata(): void + { + $subscriber = new #[Subscriber('foo', RunMode::FromBeginning)] + class { + #[BatchBegin] + public function begin(): object + { + return new stdClass(); + } + + #[BatchFlush] + public function flush(object $state): void + { + } + }; + + $metadataFactory = new AttributeSubscriberMetadataFactory(); + $metadata = $metadataFactory->metadata($subscriber::class); + + self::assertEquals(new BatchMetadata('flush', 'begin'), $metadata->batch); + } + + public function testBatchWithoutBeginMethod(): void + { + $subscriber = new #[Subscriber('foo', RunMode::FromBeginning)] + class { + #[Subscribe(ProfileVisited::class)] + public function profileVisited(ProfileVisited $event, #[BatchState] + object $state,): void + { + } + + #[BatchFlush] + public function flush(object $state): void + { + } + }; + + $metadataFactory = new AttributeSubscriberMetadataFactory(); + $metadata = $metadataFactory->metadata($subscriber::class); + + self::assertEquals(new BatchMetadata('flush'), $metadata->batch); + self::assertNull($metadata->batch?->beginMethod); + } + + public function testNoBatchMetadata(): void + { + $subscriber = new #[Subscriber('foo', RunMode::FromBeginning)] + class { + }; + + $metadataFactory = new AttributeSubscriberMetadataFactory(); + + self::assertNull($metadataFactory->metadata($subscriber::class)->batch); + } + + public function testBatchArgumentWithoutLifecycleMethods(): void + { + $this->expectException(IncompleteBatchMethods::class); + + $subscriber = new #[Subscriber('foo', RunMode::FromBeginning)] + class { + #[Subscribe(ProfileVisited::class)] + public function profileVisited(ProfileVisited $event, #[BatchState] + object $state,): void + { + } + }; + + $metadataFactory = new AttributeSubscriberMetadataFactory(); + $metadataFactory->metadata($subscriber::class); + } + + public function testBatchWithoutFlushMethod(): void + { + $this->expectException(IncompleteBatchMethods::class); + + $subscriber = new #[Subscriber('foo', RunMode::FromBeginning)] + class { + #[BatchBegin] + public function begin(): object + { + return new stdClass(); + } + }; + + $metadataFactory = new AttributeSubscriberMetadataFactory(); + $metadataFactory->metadata($subscriber::class); + } + + public function testDuplicateBeginBatchException(): void + { + $this->expectException(DuplicateBeginBatchMethod::class); + + $subscriber = new #[Subscriber('foo', RunMode::FromBeginning)] + class { + #[BatchBegin] + public function begin1(): object + { + return new stdClass(); + } + + #[BatchBegin] + public function begin2(): object + { + return new stdClass(); + } + }; + + $metadataFactory = new AttributeSubscriberMetadataFactory(); + $metadataFactory->metadata($subscriber::class); + } + + public function testDuplicateFlushException(): void + { + $this->expectException(DuplicateFlushMethod::class); + + $subscriber = new #[Subscriber('foo', RunMode::FromBeginning)] + class { + #[BatchFlush] + public function flush1(object $state): void + { + } + + #[BatchFlush] + public function flush2(object $state): void + { + } + }; + + $metadataFactory = new AttributeSubscriberMetadataFactory(); + $metadataFactory->metadata($subscriber::class); + } + + public function testDuplicateShouldFlushException(): void + { + $this->expectException(DuplicateShouldFlushMethod::class); + + $subscriber = new #[Subscriber('foo', RunMode::FromBeginning)] + class { + #[BatchShouldFlush] + public function shouldFlush1(object $state): bool + { + return false; + } + + #[BatchShouldFlush] + public function shouldFlush2(object $state): bool + { + return false; + } + }; + + $metadataFactory = new AttributeSubscriberMetadataFactory(); + $metadataFactory->metadata($subscriber::class); + } + + public function testDuplicateRollbackBatchException(): void + { + $this->expectException(DuplicateRollbackBatchMethod::class); + + $subscriber = new #[Subscriber('foo', RunMode::FromBeginning)] + class { + #[BatchRollback] + public function rollback1(object $state): void + { + } + + #[BatchRollback] + public function rollback2(object $state): void + { + } + }; + + $metadataFactory = new AttributeSubscriberMetadataFactory(); + $metadataFactory->metadata($subscriber::class); + } } diff --git a/tests/Unit/Subscription/Engine/Handler/BootHandlerTest.php b/tests/Unit/Subscription/Engine/Handler/BootHandlerTest.php index 2a9987e35..c91cf5c2f 100644 --- a/tests/Unit/Subscription/Engine/Handler/BootHandlerTest.php +++ b/tests/Unit/Subscription/Engine/Handler/BootHandlerTest.php @@ -4,6 +4,9 @@ namespace Patchlevel\EventSourcing\Tests\Unit\Subscription\Engine\Handler; +use Patchlevel\EventSourcing\Attribute\BatchBegin; +use Patchlevel\EventSourcing\Attribute\BatchFlush; +use Patchlevel\EventSourcing\Attribute\BatchRollback; use Patchlevel\EventSourcing\Attribute\OnFailed; use Patchlevel\EventSourcing\Attribute\RetryStrategy as RetryStrategyName; use Patchlevel\EventSourcing\Attribute\Subscribe; @@ -22,7 +25,6 @@ use Patchlevel\EventSourcing\Subscription\RetryStrategy\RetryStrategyRepository; use Patchlevel\EventSourcing\Subscription\RunMode; use Patchlevel\EventSourcing\Subscription\Status; -use Patchlevel\EventSourcing\Subscription\Subscriber\BatchableSubscriber; use Patchlevel\EventSourcing\Subscription\Subscriber\MetadataSubscriberAccessorRepository; use Patchlevel\EventSourcing\Subscription\Subscription; use Patchlevel\EventSourcing\Subscription\SubscriptionError; @@ -34,6 +36,7 @@ use PHPUnit\Framework\TestCase; use Psr\Log\NullLogger; use RuntimeException; +use stdClass; use Symfony\Component\EventDispatcher\EventDispatcher; #[CoversClass(BootHandler::class)] @@ -58,7 +61,7 @@ private function createHandler( $eventDispatcher->addSubscriber(new RetrySubscriber($subscriptionManager, $subscriberRepository, $retryStrategyRepository, new NullLogger())); $eventDispatcher->addSubscriber(new FailSubscriber($subscriptionManager, $subscriberRepository, new NullLogger())); - $messageProcessor = new MessageProcessor($subscriberRepository, $eventDispatcher, new NullLogger()); + $messageProcessor = new MessageProcessor($subscriberRepository, $eventDispatcher, [], new NullLogger()); return new BootHandler($messageLoader, $subscriptionManager, $subscriberRepository, $messageProcessor, $eventDispatcher, new NullLogger()); } @@ -330,7 +333,7 @@ public function testBootWithErrorAndRecoveryFailedBecauseBatching(): void $subscriptionId = 'test'; $subscriber = new #[Subscriber('test', RunMode::FromBeginning)] #[RetryStrategyName('no_retry')] - class implements BatchableSubscriber { + class { public function __construct( public readonly RuntimeException $exception = new RuntimeException('ERROR'), ) { @@ -347,21 +350,20 @@ public function onFailed(): void { } - public function beginBatch(): void - { - } - - public function commitBatch(): void + #[BatchBegin] + public function beginBatch(): object { + return new stdClass(); } - public function rollbackBatch(): void + #[BatchFlush] + public function flush(object $state): void { } - public function forceCommit(): bool + #[BatchRollback] + public function rollbackBatch(object $state): void { - return false; } }; diff --git a/tests/Unit/Subscription/Engine/Handler/RunHandlerTest.php b/tests/Unit/Subscription/Engine/Handler/RunHandlerTest.php index f7dd5b4e7..d16f5c406 100644 --- a/tests/Unit/Subscription/Engine/Handler/RunHandlerTest.php +++ b/tests/Unit/Subscription/Engine/Handler/RunHandlerTest.php @@ -64,7 +64,7 @@ private function createHandler( $eventDispatcher->addSubscriber(new FailSubscriber($subscriptionManager, $subscriberRepository, new NullLogger())); $eventDispatcher->addListener(OnCommand::class, new DetachListener($subscriptionManager, $subscriberRepository, new NullLogger()), 32); - $messageProcessor = new MessageProcessor($subscriberRepository, $eventDispatcher, new NullLogger()); + $messageProcessor = new MessageProcessor($subscriberRepository, $eventDispatcher, [], new NullLogger()); $handler = new RunHandler($messageLoader, $subscriptionManager, $messageProcessor, $eventDispatcher, new NullLogger()); diff --git a/tests/Unit/Subscription/Engine/Listener/BatchSubscriberTest.php b/tests/Unit/Subscription/Engine/Listener/BatchSubscriberTest.php index 2082c6841..5dcd3297f 100644 --- a/tests/Unit/Subscription/Engine/Listener/BatchSubscriberTest.php +++ b/tests/Unit/Subscription/Engine/Listener/BatchSubscriberTest.php @@ -23,18 +23,23 @@ use Patchlevel\EventSourcing\Subscription\RetryStrategy\RetryStrategyRepository; use Patchlevel\EventSourcing\Subscription\RunMode; use Patchlevel\EventSourcing\Subscription\Status; +use Patchlevel\EventSourcing\Subscription\Subscriber\ArgumentResolver\BatchArgumentResolver; +use Patchlevel\EventSourcing\Subscription\Subscriber\BatchManager; use Patchlevel\EventSourcing\Subscription\Subscriber\MetadataSubscriberAccessorRepository; use Patchlevel\EventSourcing\Subscription\Subscription; use Patchlevel\EventSourcing\Subscription\SubscriptionError; use Patchlevel\EventSourcing\Subscription\ThrowableToErrorContextTransformer; use Patchlevel\EventSourcing\Tests\Unit\Fixture\BatchingSubscriber; +use Patchlevel\EventSourcing\Tests\Unit\Fixture\DefaultStateBatchingSubscriber; use Patchlevel\EventSourcing\Tests\Unit\Fixture\ProfileId; use Patchlevel\EventSourcing\Tests\Unit\Fixture\ProfileVisited; +use Patchlevel\EventSourcing\Tests\Unit\Fixture\VoidBeginBatchingSubscriber; use Patchlevel\EventSourcing\Tests\Unit\Subscription\DummySubscriptionStore; use PHPUnit\Framework\Attributes\CoversClass; use PHPUnit\Framework\TestCase; use Psr\Log\NullLogger; use RuntimeException; +use stdClass; use Symfony\Component\EventDispatcher\EventDispatcher; #[CoversClass(BatchSubscriber::class)] @@ -54,12 +59,13 @@ private function createBootHandler( $subscriberRepository = new MetadataSubscriberAccessorRepository($subscribers); $subscriptionManager = new SubscriptionManager($store); $eventDispatcher = new EventDispatcher(); + $batchManager = new BatchManager(); - $eventDispatcher->addSubscriber(new BatchSubscriber($subscriberRepository, new NullLogger())); + $eventDispatcher->addSubscriber(new BatchSubscriber($batchManager, $subscriberRepository, new NullLogger())); $eventDispatcher->addSubscriber(new RetrySubscriber($subscriptionManager, $subscriberRepository, $retryStrategyRepository, new NullLogger())); $eventDispatcher->addSubscriber(new FailSubscriber($subscriptionManager, $subscriberRepository, new NullLogger())); - $messageProcessor = new MessageProcessor($subscriberRepository, $eventDispatcher, new NullLogger()); + $messageProcessor = new MessageProcessor($subscriberRepository, $eventDispatcher, [new BatchArgumentResolver($batchManager)], new NullLogger()); return new BootHandler($messageLoader, $subscriptionManager, $subscriberRepository, $messageProcessor, $eventDispatcher, new NullLogger()); } @@ -82,13 +88,14 @@ private function createRunHandler( $subscriberRepository = new MetadataSubscriberAccessorRepository($subscribers); $subscriptionManager = new SubscriptionManager($store); $eventDispatcher = new EventDispatcher(); + $batchManager = new BatchManager(); - $eventDispatcher->addSubscriber(new BatchSubscriber($subscriberRepository, new NullLogger())); + $eventDispatcher->addSubscriber(new BatchSubscriber($batchManager, $subscriberRepository, new NullLogger())); $eventDispatcher->addSubscriber(new RetrySubscriber($subscriptionManager, $subscriberRepository, $retryStrategyRepository, new NullLogger())); $eventDispatcher->addSubscriber(new FailSubscriber($subscriptionManager, $subscriberRepository, new NullLogger())); $eventDispatcher->addListener(OnCommand::class, new DetachListener($subscriptionManager, $subscriberRepository, new NullLogger()), 32); - $messageProcessor = new MessageProcessor($subscriberRepository, $eventDispatcher, new NullLogger()); + $messageProcessor = new MessageProcessor($subscriberRepository, $eventDispatcher, [new BatchArgumentResolver($batchManager)], new NullLogger()); $handler = new RunHandler($messageLoader, $subscriptionManager, $messageProcessor, $eventDispatcher, new NullLogger()); @@ -133,14 +140,68 @@ public function testBootBatchingSuccess(): void self::assertSame([$message], $subscriber->receivedMessages); self::assertSame(1, $subscriber->beginBatchCalled); - self::assertSame(1, $subscriber->commitBatchCalled); - self::assertSame(0, $subscriber->rollbackBatchCalled); + self::assertSame(1, $subscriber->flushCalled); + self::assertSame(0, $subscriber->rollbackCalled); + } + + public function testBootBatchingWithDefaultState(): void + { + $subscriber = new DefaultStateBatchingSubscriber(); + + $store = new DummySubscriptionStore([ + new Subscription( + $subscriber::ID, + Subscription::DEFAULT_GROUP, + RunMode::FromBeginning, + Status::Booting, + ), + ]); + + $message = new Message(new ProfileVisited(ProfileId::fromString('test'))); + + $messageLoader = $this->createMock(MessageLoader::class); + $messageLoader->expects($this->once())->method('load')->with(null)->willReturn(new Stream([1 => $message])); + + $handler = $this->createBootHandler($messageLoader, $store, [$subscriber]); + $result = $handler(new BootCommand()); + + self::assertEquals([], $result->errors); + + self::assertInstanceOf(stdClass::class, $subscriber->receivedState); + self::assertSame($subscriber->receivedState, $subscriber->flushedState); + } + + public function testBootBatchingWithVoidBeginUsesDefaultState(): void + { + $subscriber = new VoidBeginBatchingSubscriber(); + + $store = new DummySubscriptionStore([ + new Subscription( + $subscriber::ID, + Subscription::DEFAULT_GROUP, + RunMode::FromBeginning, + Status::Booting, + ), + ]); + + $message = new Message(new ProfileVisited(ProfileId::fromString('test'))); + + $messageLoader = $this->createMock(MessageLoader::class); + $messageLoader->expects($this->once())->method('load')->with(null)->willReturn(new Stream([1 => $message])); + + $handler = $this->createBootHandler($messageLoader, $store, [$subscriber]); + $result = $handler(new BootCommand()); + + self::assertEquals([], $result->errors); + + self::assertTrue($subscriber->beginCalled); + self::assertInstanceOf(stdClass::class, $subscriber->receivedState); } public function testBootBatchingSuccessForceCommit(): void { $subscriber = new BatchingSubscriber( - forceCommitAfterMessages: 1, + flushAfterMessages: 1, ); $store = new DummySubscriptionStore([ @@ -181,8 +242,8 @@ public function testBootBatchingSuccessForceCommit(): void self::assertSame([$message1, $message2], $subscriber->receivedMessages); self::assertSame(2, $subscriber->beginBatchCalled); - self::assertSame(2, $subscriber->commitBatchCalled); - self::assertSame(0, $subscriber->rollbackBatchCalled); + self::assertSame(2, $subscriber->flushCalled); + self::assertSame(0, $subscriber->rollbackCalled); } public function testBootBatchingWithHandleError(): void @@ -235,8 +296,8 @@ public function testBootBatchingWithHandleError(): void self::assertSame([$message], $subscriber->receivedMessages); self::assertSame(1, $subscriber->beginBatchCalled); - self::assertSame(0, $subscriber->commitBatchCalled); - self::assertSame(1, $subscriber->rollbackBatchCalled); + self::assertSame(0, $subscriber->flushCalled); + self::assertSame(1, $subscriber->rollbackCalled); } public function testBootBatchingWithBeginBatchError(): void @@ -289,8 +350,8 @@ public function testBootBatchingWithBeginBatchError(): void self::assertSame([], $subscriber->receivedMessages); self::assertSame(1, $subscriber->beginBatchCalled); - self::assertSame(0, $subscriber->commitBatchCalled); - self::assertSame(1, $subscriber->rollbackBatchCalled); + self::assertSame(0, $subscriber->flushCalled); + self::assertSame(0, $subscriber->rollbackCalled); } public function testBootBatchingWithCommitBatchError(): void @@ -298,7 +359,7 @@ public function testBootBatchingWithCommitBatchError(): void $exception = new RuntimeException('ERROR'); $subscriber = new BatchingSubscriber( - throwForCommitBatch: $exception, + throwForFlush: $exception, ); $store = new DummySubscriptionStore([ @@ -343,8 +404,8 @@ public function testBootBatchingWithCommitBatchError(): void self::assertSame([$message], $subscriber->receivedMessages); self::assertSame(1, $subscriber->beginBatchCalled); - self::assertSame(1, $subscriber->commitBatchCalled); - self::assertSame(0, $subscriber->rollbackBatchCalled); + self::assertSame(1, $subscriber->flushCalled); + self::assertSame(0, $subscriber->rollbackCalled); } public function testBootBatchingWithRollbackBatchError(): void @@ -353,7 +414,7 @@ public function testBootBatchingWithRollbackBatchError(): void $subscriber = new BatchingSubscriber( throwForMessage: $exception, - throwForRollbackBatch: new RuntimeException('ERROR'), + throwForRollback: new RuntimeException('ERROR'), ); $store = new DummySubscriptionStore([ @@ -398,8 +459,8 @@ public function testBootBatchingWithRollbackBatchError(): void self::assertSame([$message], $subscriber->receivedMessages); self::assertSame(1, $subscriber->beginBatchCalled); - self::assertSame(0, $subscriber->commitBatchCalled); - self::assertSame(1, $subscriber->rollbackBatchCalled); + self::assertSame(0, $subscriber->flushCalled); + self::assertSame(1, $subscriber->rollbackCalled); } public function testRunningBatchingSuccess(): void @@ -441,14 +502,14 @@ public function testRunningBatchingSuccess(): void self::assertSame([$message], $subscriber->receivedMessages); self::assertSame(1, $subscriber->beginBatchCalled); - self::assertSame(1, $subscriber->commitBatchCalled); - self::assertSame(0, $subscriber->rollbackBatchCalled); + self::assertSame(1, $subscriber->flushCalled); + self::assertSame(0, $subscriber->rollbackCalled); } public function testRunningBatchingSuccessForceCommit(): void { $subscriber = new BatchingSubscriber( - forceCommitAfterMessages: 1, + flushAfterMessages: 1, ); $store = new DummySubscriptionStore([ @@ -490,8 +551,8 @@ public function testRunningBatchingSuccessForceCommit(): void self::assertSame([$message1, $message2], $subscriber->receivedMessages); self::assertSame(2, $subscriber->beginBatchCalled); - self::assertSame(2, $subscriber->commitBatchCalled); - self::assertSame(0, $subscriber->rollbackBatchCalled); + self::assertSame(2, $subscriber->flushCalled); + self::assertSame(0, $subscriber->rollbackCalled); } public function testRunningBatchingWithHandleError(): void @@ -545,8 +606,8 @@ public function testRunningBatchingWithHandleError(): void self::assertSame([$message], $subscriber->receivedMessages); self::assertSame(1, $subscriber->beginBatchCalled); - self::assertSame(0, $subscriber->commitBatchCalled); - self::assertSame(1, $subscriber->rollbackBatchCalled); + self::assertSame(0, $subscriber->flushCalled); + self::assertSame(1, $subscriber->rollbackCalled); } public function testRunningBatchingWithBeginBatchError(): void @@ -600,8 +661,8 @@ public function testRunningBatchingWithBeginBatchError(): void self::assertSame([], $subscriber->receivedMessages); self::assertSame(1, $subscriber->beginBatchCalled); - self::assertSame(0, $subscriber->commitBatchCalled); - self::assertSame(1, $subscriber->rollbackBatchCalled); + self::assertSame(0, $subscriber->flushCalled); + self::assertSame(0, $subscriber->rollbackCalled); } public function testRunningBatchingWithCommitBatchError(): void @@ -609,7 +670,7 @@ public function testRunningBatchingWithCommitBatchError(): void $exception = new RuntimeException('ERROR'); $subscriber = new BatchingSubscriber( - throwForCommitBatch: $exception, + throwForFlush: $exception, ); $store = new DummySubscriptionStore([ @@ -655,8 +716,8 @@ public function testRunningBatchingWithCommitBatchError(): void self::assertSame([$message], $subscriber->receivedMessages); self::assertSame(1, $subscriber->beginBatchCalled); - self::assertSame(1, $subscriber->commitBatchCalled); - self::assertSame(0, $subscriber->rollbackBatchCalled); + self::assertSame(1, $subscriber->flushCalled); + self::assertSame(0, $subscriber->rollbackCalled); } public function testRunningBatchingWithRollbackBatchError(): void @@ -665,7 +726,7 @@ public function testRunningBatchingWithRollbackBatchError(): void $subscriber = new BatchingSubscriber( throwForMessage: $exception, - throwForRollbackBatch: new RuntimeException('ERROR'), + throwForRollback: new RuntimeException('ERROR'), ); $store = new DummySubscriptionStore([ @@ -711,7 +772,7 @@ public function testRunningBatchingWithRollbackBatchError(): void self::assertSame([$message], $subscriber->receivedMessages); self::assertSame(1, $subscriber->beginBatchCalled); - self::assertSame(0, $subscriber->commitBatchCalled); - self::assertSame(1, $subscriber->rollbackBatchCalled); + self::assertSame(0, $subscriber->flushCalled); + self::assertSame(1, $subscriber->rollbackCalled); } } diff --git a/tests/Unit/Subscription/Engine/Listener/FailSubscriberTest.php b/tests/Unit/Subscription/Engine/Listener/FailSubscriberTest.php index 0cca7387f..409f90b97 100644 --- a/tests/Unit/Subscription/Engine/Listener/FailSubscriberTest.php +++ b/tests/Unit/Subscription/Engine/Listener/FailSubscriberTest.php @@ -4,6 +4,9 @@ namespace Patchlevel\EventSourcing\Tests\Unit\Subscription\Engine\Listener; +use Patchlevel\EventSourcing\Attribute\BatchBegin; +use Patchlevel\EventSourcing\Attribute\BatchFlush; +use Patchlevel\EventSourcing\Attribute\BatchRollback; use Patchlevel\EventSourcing\Attribute\OnFailed; use Patchlevel\EventSourcing\Attribute\RetryStrategy as RetryStrategyName; use Patchlevel\EventSourcing\Attribute\Subscribe; @@ -14,7 +17,6 @@ use Patchlevel\EventSourcing\Subscription\Engine\SubscriptionManager; use Patchlevel\EventSourcing\Subscription\RunMode; use Patchlevel\EventSourcing\Subscription\Status; -use Patchlevel\EventSourcing\Subscription\Subscriber\BatchableSubscriber; use Patchlevel\EventSourcing\Subscription\Subscriber\MetadataSubscriberAccessorRepository; use Patchlevel\EventSourcing\Subscription\Subscription; use Patchlevel\EventSourcing\Subscription\SubscriptionError; @@ -26,6 +28,7 @@ use PHPUnit\Framework\TestCase; use Psr\Log\NullLogger; use RuntimeException; +use stdClass; #[CoversClass(FailSubscriber::class)] final class FailSubscriberTest extends TestCase @@ -95,13 +98,13 @@ public function testFailsSubscriptionWhenNoSubscriberFound(): void ); } - public function testFailsSubscriptionForBatchableSubscriber(): void + public function testFailsSubscriptionForBatchingSubscriber(): void { $exception = new RuntimeException('ERROR'); $subscriber = new #[Subscriber('test', RunMode::FromBeginning)] #[RetryStrategyName('no_retry')] - class implements BatchableSubscriber { + class { #[Subscribe(ProfileVisited::class)] public function handle(): void { @@ -112,21 +115,20 @@ public function onFailed(): void { } - public function beginBatch(): void - { - } - - public function commitBatch(): void + #[BatchBegin] + public function beginBatch(): object { + return new stdClass(); } - public function rollbackBatch(): void + #[BatchFlush] + public function flush(object $state): void { } - public function forceCommit(): bool + #[BatchRollback] + public function rollbackBatch(object $state): void { - return false; } }; diff --git a/tests/Unit/Subscription/Engine/MessageProcessorTest.php b/tests/Unit/Subscription/Engine/MessageProcessorTest.php index 9d61a54b7..571f5daf2 100644 --- a/tests/Unit/Subscription/Engine/MessageProcessorTest.php +++ b/tests/Unit/Subscription/Engine/MessageProcessorTest.php @@ -38,6 +38,7 @@ public function onProfileVisited(ProfileVisited $event): void $processor = new MessageProcessor( new MetadataSubscriberAccessorRepository([$subscriber]), new EventDispatcher(), + [], new NullLogger(), ); diff --git a/tests/Unit/Subscription/Subscriber/ArgumentResolver/BatchArgumentResolverTest.php b/tests/Unit/Subscription/Subscriber/ArgumentResolver/BatchArgumentResolverTest.php new file mode 100644 index 000000000..b43562254 --- /dev/null +++ b/tests/Unit/Subscription/Subscriber/ArgumentResolver/BatchArgumentResolverTest.php @@ -0,0 +1,79 @@ +support( + new ArgumentMetadata('foo', Type::object(stdClass::class), true), + ProfileVisited::class, + ), + ); + + self::assertFalse( + $resolver->support( + new ArgumentMetadata('foo', Type::object(stdClass::class)), + ProfileVisited::class, + ), + ); + } + + public function testResolve(): void + { + $state = new stdClass(); + + $subscriber = new #[Subscriber('foo', RunMode::FromBeginning)] + class { + }; + + $batch = new Batch( + new Subscription('foo'), + new MetadataSubscriberAccessor( + $subscriber, + (new AttributeSubscriberMetadataFactory())->metadata($subscriber::class), + ), + $state, + ); + + $batchManager = new BatchManager(); + $batchManager->add($batch); + + $resolver = new BatchArgumentResolver($batchManager); + $message = new Message(new ProfileVisited(ProfileId::fromString('1'))); + + self::assertSame( + $state, + $resolver->resolve( + new ArgumentMetadata('foo', Type::object(stdClass::class), true), + new ArgumentResolverContext($message, new Subscription('foo'), new SubscriberMetadata('foo')), + ), + ); + } +} diff --git a/tests/Unit/Subscription/Subscriber/BatchManagerTest.php b/tests/Unit/Subscription/Subscriber/BatchManagerTest.php new file mode 100644 index 000000000..ec568499d --- /dev/null +++ b/tests/Unit/Subscription/Subscriber/BatchManagerTest.php @@ -0,0 +1,87 @@ +metadata($subscriber::class), + ), + new stdClass(), + ); + } + + public function testAddGetHasRemove(): void + { + $manager = new BatchManager(); + $batch = $this->batch('foo'); + + self::assertFalse($manager->has('foo')); + + $manager->add($batch); + + self::assertTrue($manager->has('foo')); + self::assertSame($batch, $manager->get('foo')); + + $manager->remove('foo'); + + self::assertFalse($manager->has('foo')); + } + + public function testGetMissing(): void + { + $this->expectException(BatchNotFound::class); + + (new BatchManager())->get('foo'); + } + + public function testAll(): void + { + $manager = new BatchManager(); + $foo = $this->batch('foo'); + $bar = $this->batch('bar'); + + $manager->add($foo); + $manager->add($bar); + + self::assertSame([$foo, $bar], $manager->all()); + } + + public function testClear(): void + { + $manager = new BatchManager(); + $manager->add($this->batch('foo')); + $manager->add($this->batch('bar')); + + $manager->clear(); + + self::assertFalse($manager->has('foo')); + self::assertFalse($manager->has('bar')); + self::assertSame([], $manager->all()); + } +} diff --git a/tests/Unit/Subscription/Subscriber/MetadataSubscriberAccessorRepositoryTest.php b/tests/Unit/Subscription/Subscriber/MetadataSubscriberAccessorRepositoryTest.php index 3c27b8dec..b80aa9bc5 100644 --- a/tests/Unit/Subscription/Subscriber/MetadataSubscriberAccessorRepositoryTest.php +++ b/tests/Unit/Subscription/Subscriber/MetadataSubscriberAccessorRepositoryTest.php @@ -4,12 +4,9 @@ namespace Patchlevel\EventSourcing\Tests\Unit\Subscription\Subscriber; -use ArrayIterator; use Patchlevel\EventSourcing\Attribute\Subscriber; -use Patchlevel\EventSourcing\Metadata\Subscriber\ArgumentMetadata; use Patchlevel\EventSourcing\Metadata\Subscriber\AttributeSubscriberMetadataFactory; use Patchlevel\EventSourcing\Subscription\RunMode; -use Patchlevel\EventSourcing\Subscription\Subscriber\ArgumentResolver; use Patchlevel\EventSourcing\Subscription\Subscriber\DuplicateSubscriberId; use Patchlevel\EventSourcing\Subscription\Subscriber\MetadataSubscriberAccessor; use Patchlevel\EventSourcing\Subscription\Subscriber\MetadataSubscriberAccessorRepository; @@ -36,68 +33,20 @@ class { }; $metadataFactory = new AttributeSubscriberMetadataFactory(); - $customResolver = new class implements ArgumentResolver\ArgumentResolver { - public function resolve(ArgumentMetadata $argument, ArgumentResolver\ArgumentResolverContext $context): mixed - { - return null; - } - - public function support(ArgumentMetadata $argument, string $eventClass): bool - { - return false; - } - }; - $repository = new MetadataSubscriberAccessorRepository( [$subscriber], $metadataFactory, - [$customResolver], ); $accessor = new MetadataSubscriberAccessor( $subscriber, $metadataFactory->metadata($subscriber::class), - [ - $customResolver, - new ArgumentResolver\MessageArgumentResolver(), - new ArgumentResolver\EventArgumentResolver(), - new ArgumentResolver\RecordedOnArgumentResolver(), - ], ); self::assertEquals([$accessor], $repository->all()); self::assertEquals($accessor, $repository->get('foo')); } - public function testArgumentResolversCanBeArraysAndIterators(): void - { - $customResolver = new class implements ArgumentResolver\ArgumentResolver { - public function resolve(ArgumentMetadata $argument, ArgumentResolver\ArgumentResolverContext $context): mixed - { - return null; - } - - public function support(ArgumentMetadata $argument, string $eventClass): bool - { - return false; - } - }; - - $repository = new MetadataSubscriberAccessorRepository( - [], - new AttributeSubscriberMetadataFactory(), - [$customResolver], - ); - - $repository2 = new MetadataSubscriberAccessorRepository( - [], - new AttributeSubscriberMetadataFactory(), - new ArrayIterator([$customResolver]), - ); - - self::assertEquals($repository, $repository2); - } - public function testDuplicateSubscriberId(): void { $this->expectException(DuplicateSubscriberId::class); diff --git a/tests/Unit/Subscription/Subscriber/MetadataSubscriberAccessorTest.php b/tests/Unit/Subscription/Subscriber/MetadataSubscriberAccessorTest.php index 4ce898a2a..9f3e5a424 100644 --- a/tests/Unit/Subscription/Subscriber/MetadataSubscriberAccessorTest.php +++ b/tests/Unit/Subscription/Subscriber/MetadataSubscriberAccessorTest.php @@ -11,12 +11,7 @@ use Patchlevel\EventSourcing\Message\Message; use Patchlevel\EventSourcing\Metadata\Subscriber\AttributeSubscriberMetadataFactory; use Patchlevel\EventSourcing\Subscription\RunMode; -use Patchlevel\EventSourcing\Subscription\Subscriber\ArgumentResolver\EventArgumentResolver; -use Patchlevel\EventSourcing\Subscription\Subscriber\ArgumentResolver\MessageArgumentResolver; use Patchlevel\EventSourcing\Subscription\Subscriber\MetadataSubscriberAccessor; -use Patchlevel\EventSourcing\Subscription\Subscriber\NoSuitableResolver; -use Patchlevel\EventSourcing\Subscription\Subscription; -use Patchlevel\EventSourcing\Tests\Unit\Fixture\ProfileId; use Patchlevel\EventSourcing\Tests\Unit\Fixture\ProfileVisited; use PHPUnit\Framework\Attributes\CoversClass; use PHPUnit\Framework\TestCase; @@ -24,122 +19,54 @@ #[CoversClass(MetadataSubscriberAccessor::class)] final class MetadataSubscriberAccessorTest extends TestCase { - public function testSubscribeMethod(): void + /** @return MetadataSubscriberAccessor */ + private function accessor(object $subscriber): MetadataSubscriberAccessor { - $subscriber = new #[Subscriber('profile', RunMode::FromBeginning)] - class { - public Message|null $message = null; - - #[Subscribe(ProfileVisited::class)] - public function onProfileVisited(Message $message): void - { - $this->message = $message; - } - }; - - $accessor = new MetadataSubscriberAccessor( + return new MetadataSubscriberAccessor( $subscriber, (new AttributeSubscriberMetadataFactory())->metadata($subscriber::class), - [ - new MessageArgumentResolver(), - ], ); - - $result = $accessor->subscribeMethods(ProfileVisited::class); - - self::assertArrayHasKey(0, $result); - - $message = new Message(new ProfileVisited(ProfileId::fromString('1'))); - - $result[0]($message, new Subscription('profile')); - - self::assertSame($message, $subscriber->message); } - public function testSubscribeAllMethod(): void + public function testSubscribeMethod(): void { $subscriber = new #[Subscriber('profile', RunMode::FromBeginning)] class { - public Message|null $message = null; - - #[Subscribe('*')] - public function on(Message $message): void + #[Subscribe(ProfileVisited::class)] + public function onProfileVisited(Message $message): void { - $this->message = $message; } }; - $accessor = new MetadataSubscriberAccessor( - $subscriber, - (new AttributeSubscriberMetadataFactory())->metadata($subscriber::class), - [ - new MessageArgumentResolver(), - ], - ); - - $result = $accessor->subscribeMethods(ProfileVisited::class); + $result = $this->accessor($subscriber)->subscribeMethods(ProfileVisited::class); self::assertArrayHasKey(0, $result); - - $message = new Message(new ProfileVisited(ProfileId::fromString('1'))); - - $result[0]($message, new Subscription('profile')); - - self::assertSame($message, $subscriber->message); + self::assertSame('onProfileVisited', $result[0]->name); } - public function testNoResolver(): void + public function testSubscribeAllMethod(): void { - $this->expectException(NoSuitableResolver::class); - $subscriber = new #[Subscriber('profile', RunMode::FromBeginning)] class { - #[Subscribe(ProfileVisited::class)] + #[Subscribe('*')] public function on(Message $message): void { } }; - $accessor = new MetadataSubscriberAccessor( - $subscriber, - (new AttributeSubscriberMetadataFactory())->metadata($subscriber::class), - [], - ); + $result = $this->accessor($subscriber)->subscribeMethods(ProfileVisited::class); - $accessor->subscribeMethods(ProfileVisited::class); + self::assertArrayHasKey(0, $result); + self::assertSame('on', $result[0]->name); } - public function testMultipleResolver(): void + public function testNoSubscribeMethod(): void { $subscriber = new #[Subscriber('profile', RunMode::FromBeginning)] class { - public Message|null $message = null; - - #[Subscribe(ProfileVisited::class)] - public function on(Message $message): void - { - $this->message = $message; - } }; - $accessor = new MetadataSubscriberAccessor( - $subscriber, - (new AttributeSubscriberMetadataFactory())->metadata($subscriber::class), - [ - new EventArgumentResolver(), - new MessageArgumentResolver(), - ], - ); - - $result = $accessor->subscribeMethods(ProfileVisited::class); - - self::assertArrayHasKey(0, $result); - - $message = new Message(new ProfileVisited(ProfileId::fromString('1'))); - - $result[0]($message, new Subscription('profile')); - - self::assertSame($message, $subscriber->message); + self::assertSame([], $this->accessor($subscriber)->subscribeMethods(ProfileVisited::class)); } public function testSetupMethod(): void @@ -152,15 +79,7 @@ public function method(): void } }; - $accessor = new MetadataSubscriberAccessor( - $subscriber, - (new AttributeSubscriberMetadataFactory())->metadata($subscriber::class), - [], - ); - - $result = $accessor->setupMethod(); - - self::assertEquals($subscriber->method(...), $result); + self::assertEquals($subscriber->method(...), $this->accessor($subscriber)->setupMethod()); } public function testNotSetupMethod(): void @@ -169,15 +88,7 @@ public function testNotSetupMethod(): void class { }; - $accessor = new MetadataSubscriberAccessor( - $subscriber, - (new AttributeSubscriberMetadataFactory())->metadata($subscriber::class), - [], - ); - - $result = $accessor->setupMethod(); - - self::assertNull($result); + self::assertNull($this->accessor($subscriber)->setupMethod()); } public function testTeardownMethod(): void @@ -190,15 +101,7 @@ public function method(): void } }; - $accessor = new MetadataSubscriberAccessor( - $subscriber, - (new AttributeSubscriberMetadataFactory())->metadata($subscriber::class), - [], - ); - - $result = $accessor->teardownMethod(); - - self::assertEquals($subscriber->method(...), $result); + self::assertEquals($subscriber->method(...), $this->accessor($subscriber)->teardownMethod()); } public function testNotTeardownMethod(): void @@ -207,15 +110,7 @@ public function testNotTeardownMethod(): void class { }; - $accessor = new MetadataSubscriberAccessor( - $subscriber, - (new AttributeSubscriberMetadataFactory())->metadata($subscriber::class), - [], - ); - - $result = $accessor->teardownMethod(); - - self::assertNull($result); + self::assertNull($this->accessor($subscriber)->teardownMethod()); } public function testRealSubscriber(): void @@ -224,12 +119,6 @@ public function testRealSubscriber(): void class { }; - $accessor = new MetadataSubscriberAccessor( - $subscriber, - (new AttributeSubscriberMetadataFactory())->metadata($subscriber::class), - [], - ); - - self::assertEquals($subscriber, $accessor->subscriber()); + self::assertEquals($subscriber, $this->accessor($subscriber)->subscriber()); } } From d68d32886e912543d53aa54c6067f06631a7e483 Mon Sep 17 00:00:00 2001 From: David Badura Date: Tue, 16 Jun 2026 19:14:09 +0200 Subject: [PATCH 2/7] Hold parameter attributes on ArgumentMetadata instead of a batch flag ArgumentMetadata now carries the list of attribute instances attached to a subscribe handler parameter, with an attribute() lookup helper, rather than a dedicated bool for BatchState. The batch argument resolver and the metadata factory ask for the BatchState attribute through that list, which keeps the metadata generic for future parameter attributes. Also fixes the nullsafe chain in BatchSubscriber, where metadata() was called on a possibly-null accessor before the null check. --- src/Metadata/Subscriber/ArgumentMetadata.php | 21 ++++++++++++++++++- .../AttributeSubscriberMetadataFactory.php | 8 +++++-- .../Engine/Listener/BatchSubscriber.php | 2 +- .../BatchArgumentResolver.php | 3 ++- ...AttributeSubscriberMetadataFactoryTest.php | 4 ++-- .../BatchArgumentResolverTest.php | 5 +++-- 6 files changed, 34 insertions(+), 9 deletions(-) diff --git a/src/Metadata/Subscriber/ArgumentMetadata.php b/src/Metadata/Subscriber/ArgumentMetadata.php index bac5225ef..e543ec8f4 100644 --- a/src/Metadata/Subscriber/ArgumentMetadata.php +++ b/src/Metadata/Subscriber/ArgumentMetadata.php @@ -8,10 +8,29 @@ final class ArgumentMetadata { + /** @param list $attributes */ public function __construct( public readonly string $name, public readonly Type $type, - public readonly bool $batch = false, + public readonly array $attributes = [], ) { } + + /** + * @param class-string $attributeClass + * + * @return T|null + * + * @template T of object + */ + public function attribute(string $attributeClass): object|null + { + foreach ($this->attributes as $attribute) { + if ($attribute instanceof $attributeClass) { + return $attribute; + } + } + + return null; + } } diff --git a/src/Metadata/Subscriber/AttributeSubscriberMetadataFactory.php b/src/Metadata/Subscriber/AttributeSubscriberMetadataFactory.php index 706167194..f69f2e858 100644 --- a/src/Metadata/Subscriber/AttributeSubscriberMetadataFactory.php +++ b/src/Metadata/Subscriber/AttributeSubscriberMetadataFactory.php @@ -24,6 +24,7 @@ use Symfony\Component\TypeInfo\TypeResolver\TypeResolver; use function array_key_exists; +use function array_map; use function count; final class AttributeSubscriberMetadataFactory implements SubscriberMetadataFactory @@ -269,7 +270,10 @@ private function subscribeMethod(ReflectionMethod $method): SubscribeMethodMetad $arguments[] = new ArgumentMetadata( $parameter->getName(), $this->typeResolver->resolve($type), - $parameter->getAttributes(BatchState::class) !== [], + array_map( + static fn (ReflectionAttribute $attribute): object => $attribute->newInstance(), + $parameter->getAttributes(), + ), ); } @@ -284,7 +288,7 @@ private function hasBatchArgument(array $subscribeMethods): bool { foreach ($subscribeMethods as $subscribeMethod) { foreach ($subscribeMethod->arguments as $argument) { - if ($argument->batch) { + if ($argument->attribute(BatchState::class) !== null) { return true; } } diff --git a/src/Subscription/Engine/Listener/BatchSubscriber.php b/src/Subscription/Engine/Listener/BatchSubscriber.php index c887e712c..a72cab462 100644 --- a/src/Subscription/Engine/Listener/BatchSubscriber.php +++ b/src/Subscription/Engine/Listener/BatchSubscriber.php @@ -46,7 +46,7 @@ public function onHandleMessage(OnHandleMessage $event): void } $subscriber = $this->subscriberRepository->get($subscriberId); - $batchMetadata = $subscriber->metadata()?->batch; + $batchMetadata = $subscriber?->metadata()->batch; if ($subscriber === null || $batchMetadata === null) { return; diff --git a/src/Subscription/Subscriber/ArgumentResolver/BatchArgumentResolver.php b/src/Subscription/Subscriber/ArgumentResolver/BatchArgumentResolver.php index 401f2a1a2..d82494eb6 100644 --- a/src/Subscription/Subscriber/ArgumentResolver/BatchArgumentResolver.php +++ b/src/Subscription/Subscriber/ArgumentResolver/BatchArgumentResolver.php @@ -4,6 +4,7 @@ namespace Patchlevel\EventSourcing\Subscription\Subscriber\ArgumentResolver; +use Patchlevel\EventSourcing\Attribute\BatchState; use Patchlevel\EventSourcing\Metadata\Subscriber\ArgumentMetadata; use Patchlevel\EventSourcing\Subscription\Subscriber\BatchManager; @@ -21,6 +22,6 @@ public function resolve(ArgumentMetadata $argument, ArgumentResolverContext $con public function support(ArgumentMetadata $argument, string $eventClass): bool { - return $argument->batch; + return $argument->attribute(BatchState::class) !== null; } } diff --git a/tests/Unit/Metadata/Subscriber/AttributeSubscriberMetadataFactoryTest.php b/tests/Unit/Metadata/Subscriber/AttributeSubscriberMetadataFactoryTest.php index ae2566e26..e665dfb68 100644 --- a/tests/Unit/Metadata/Subscriber/AttributeSubscriberMetadataFactoryTest.php +++ b/tests/Unit/Metadata/Subscriber/AttributeSubscriberMetadataFactoryTest.php @@ -414,8 +414,8 @@ public function rollback(object $state): void $metadata->batch, ); - self::assertTrue($metadata->subscribeMethods[ProfileVisited::class]->arguments[1]->batch); - self::assertFalse($metadata->subscribeMethods[ProfileVisited::class]->arguments[0]->batch); + self::assertInstanceOf(BatchState::class, $metadata->subscribeMethods[ProfileVisited::class]->arguments[1]->attribute(BatchState::class)); + self::assertNull($metadata->subscribeMethods[ProfileVisited::class]->arguments[0]->attribute(BatchState::class)); } public function testBatchMinimalMetadata(): void diff --git a/tests/Unit/Subscription/Subscriber/ArgumentResolver/BatchArgumentResolverTest.php b/tests/Unit/Subscription/Subscriber/ArgumentResolver/BatchArgumentResolverTest.php index b43562254..f55fc60cf 100644 --- a/tests/Unit/Subscription/Subscriber/ArgumentResolver/BatchArgumentResolverTest.php +++ b/tests/Unit/Subscription/Subscriber/ArgumentResolver/BatchArgumentResolverTest.php @@ -4,6 +4,7 @@ namespace Patchlevel\EventSourcing\Tests\Unit\Subscription\Subscriber\ArgumentResolver; +use Patchlevel\EventSourcing\Attribute\BatchState; use Patchlevel\EventSourcing\Attribute\Subscriber; use Patchlevel\EventSourcing\Message\Message; use Patchlevel\EventSourcing\Metadata\Subscriber\ArgumentMetadata; @@ -32,7 +33,7 @@ public function testSupport(): void self::assertTrue( $resolver->support( - new ArgumentMetadata('foo', Type::object(stdClass::class), true), + new ArgumentMetadata('foo', Type::object(stdClass::class), [new BatchState()]), ProfileVisited::class, ), ); @@ -71,7 +72,7 @@ class { self::assertSame( $state, $resolver->resolve( - new ArgumentMetadata('foo', Type::object(stdClass::class), true), + new ArgumentMetadata('foo', Type::object(stdClass::class), [new BatchState()]), new ArgumentResolverContext($message, new Subscription('foo'), new SubscriberMetadata('foo')), ), ); From 2d5d8ffdeeda136fa8355c07d881f9f396767bd6 Mon Sep 17 00:00:00 2001 From: David Badura Date: Tue, 16 Jun 2026 19:18:33 +0200 Subject: [PATCH 3/7] Fix argument resolver registration in subscription docs The event emitter section passed argumentResolvers to MetadataSubscriberAccessorRepository, which no longer takes them. Resolvers are registered on the DefaultSubscriptionEngine, matching the upgrade guide. --- docs/subscription.md | 25 ++++++++----------------- 1 file changed, 8 insertions(+), 17 deletions(-) diff --git a/docs/subscription.md b/docs/subscription.md index 05060fa13..714b031a8 100644 --- a/docs/subscription.md +++ b/docs/subscription.md @@ -341,28 +341,16 @@ $eventEmitter->linkTo('notifications', [new NotificationRequired($event->orderId The emitted events must be registered like any other event so the store can (de)serialize them. ::: -To configure the resolver, pass an `EventEmitterResolver` to the subscriber accessor repository, -the same way as the [lookup resolver](#lookup-resolver). It needs the event store to append to: - -```php -use Patchlevel\EventSourcing\Subscription\Subscriber\ArgumentResolver\EventEmitterResolver; -use Patchlevel\EventSourcing\Subscription\Subscriber\MetadataSubscriberAccessorRepository; - -$subscriberAccessorRepository = new MetadataSubscriberAccessorRepository( - $subscribers, - argumentResolvers: [ - new EventEmitterResolver($store), - ], -); -``` - -To also clean up the subscription stream when a subscription is removed, register the -`RemoveSubscriptionStreamListener` on the event dispatcher and pass it to the engine: +To configure the resolver, pass an `EventEmitterResolver` to the subscription engine via the +`argumentResolvers` argument; it needs the event store to append to. To also clean up the +subscription stream when a subscription is removed, register the `RemoveSubscriptionStreamListener` +on an event dispatcher and pass that to the engine as well: ```php use Patchlevel\EventSourcing\Subscription\Engine\DefaultSubscriptionEngine; use Patchlevel\EventSourcing\Subscription\Engine\Event\OnSubscriptionRemoved; use Patchlevel\EventSourcing\Subscription\Engine\Listener\RemoveSubscriptionStreamListener; +use Patchlevel\EventSourcing\Subscription\Subscriber\ArgumentResolver\EventEmitterResolver; use Symfony\Component\EventDispatcher\EventDispatcher; $eventDispatcher = new EventDispatcher(); @@ -376,6 +364,9 @@ $engine = new DefaultSubscriptionEngine( $subscriptionStore, $subscriberAccessorRepository, eventDispatcher: $eventDispatcher, + argumentResolvers: [ + new EventEmitterResolver($store), + ], ); ``` From 531ea3ca34b5d066db20c71709ce8591ffd5ef61 Mon Sep 17 00:00:00 2001 From: David Badura Date: Tue, 16 Jun 2026 19:20:00 +0200 Subject: [PATCH 4/7] Document lookup resolver registration The lookup resolver is not registered by default, so show how to pass it to the subscription engine, the same as the event emitter resolver. --- docs/subscription.md | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/docs/subscription.md b/docs/subscription.md index 714b031a8..9863de149 100644 --- a/docs/subscription.md +++ b/docs/subscription.md @@ -282,6 +282,23 @@ final class PublicProfileProjection More information can be found in the [reducer](message.md#reducer) documentation. ::: +The lookup resolver is not registered by default. Pass a `LookupResolver` to the subscription engine +via the `argumentResolvers` argument; it needs the event store to read the previous messages from: + +```php +use Patchlevel\EventSourcing\Subscription\Engine\DefaultSubscriptionEngine; +use Patchlevel\EventSourcing\Subscription\Subscriber\ArgumentResolver\LookupResolver; + +$engine = new DefaultSubscriptionEngine( + $messageLoader, + $subscriptionStore, + $subscriberAccessorRepository, + argumentResolvers: [ + new LookupResolver($store), + ], +); +``` + ##### Recorded On Resolver The recorded on resolver resolves the recorded on date. From 94d507f82706c6eb8c32084a298e597382c1fd1a Mon Sep 17 00:00:00 2001 From: David Badura Date: Tue, 16 Jun 2026 19:32:43 +0200 Subject: [PATCH 5/7] Rename ArgumentMetadata::attribute() to hasAttribute() Every caller only checks for presence, so a bool hasAttribute() reads better than returning the attribute instance. --- src/Metadata/Subscriber/ArgumentMetadata.php | 14 ++++---------- .../AttributeSubscriberMetadataFactory.php | 2 +- .../ArgumentResolver/BatchArgumentResolver.php | 2 +- .../AttributeSubscriberMetadataFactoryTest.php | 4 ++-- 4 files changed, 8 insertions(+), 14 deletions(-) diff --git a/src/Metadata/Subscriber/ArgumentMetadata.php b/src/Metadata/Subscriber/ArgumentMetadata.php index e543ec8f4..b687faa18 100644 --- a/src/Metadata/Subscriber/ArgumentMetadata.php +++ b/src/Metadata/Subscriber/ArgumentMetadata.php @@ -16,21 +16,15 @@ public function __construct( ) { } - /** - * @param class-string $attributeClass - * - * @return T|null - * - * @template T of object - */ - public function attribute(string $attributeClass): object|null + /** @param class-string $attributeClass */ + public function hasAttribute(string $attributeClass): bool { foreach ($this->attributes as $attribute) { if ($attribute instanceof $attributeClass) { - return $attribute; + return true; } } - return null; + return false; } } diff --git a/src/Metadata/Subscriber/AttributeSubscriberMetadataFactory.php b/src/Metadata/Subscriber/AttributeSubscriberMetadataFactory.php index f69f2e858..cf558df61 100644 --- a/src/Metadata/Subscriber/AttributeSubscriberMetadataFactory.php +++ b/src/Metadata/Subscriber/AttributeSubscriberMetadataFactory.php @@ -288,7 +288,7 @@ private function hasBatchArgument(array $subscribeMethods): bool { foreach ($subscribeMethods as $subscribeMethod) { foreach ($subscribeMethod->arguments as $argument) { - if ($argument->attribute(BatchState::class) !== null) { + if ($argument->hasAttribute(BatchState::class)) { return true; } } diff --git a/src/Subscription/Subscriber/ArgumentResolver/BatchArgumentResolver.php b/src/Subscription/Subscriber/ArgumentResolver/BatchArgumentResolver.php index d82494eb6..661b86478 100644 --- a/src/Subscription/Subscriber/ArgumentResolver/BatchArgumentResolver.php +++ b/src/Subscription/Subscriber/ArgumentResolver/BatchArgumentResolver.php @@ -22,6 +22,6 @@ public function resolve(ArgumentMetadata $argument, ArgumentResolverContext $con public function support(ArgumentMetadata $argument, string $eventClass): bool { - return $argument->attribute(BatchState::class) !== null; + return $argument->hasAttribute(BatchState::class); } } diff --git a/tests/Unit/Metadata/Subscriber/AttributeSubscriberMetadataFactoryTest.php b/tests/Unit/Metadata/Subscriber/AttributeSubscriberMetadataFactoryTest.php index e665dfb68..f08945f41 100644 --- a/tests/Unit/Metadata/Subscriber/AttributeSubscriberMetadataFactoryTest.php +++ b/tests/Unit/Metadata/Subscriber/AttributeSubscriberMetadataFactoryTest.php @@ -414,8 +414,8 @@ public function rollback(object $state): void $metadata->batch, ); - self::assertInstanceOf(BatchState::class, $metadata->subscribeMethods[ProfileVisited::class]->arguments[1]->attribute(BatchState::class)); - self::assertNull($metadata->subscribeMethods[ProfileVisited::class]->arguments[0]->attribute(BatchState::class)); + self::assertTrue($metadata->subscribeMethods[ProfileVisited::class]->arguments[1]->hasAttribute(BatchState::class)); + self::assertFalse($metadata->subscribeMethods[ProfileVisited::class]->arguments[0]->hasAttribute(BatchState::class)); } public function testBatchMinimalMetadata(): void From fe3984b0179dfd919b73df337f96e93337dc89e4 Mon Sep 17 00:00:00 2001 From: David Badura Date: Wed, 17 Jun 2026 16:50:43 +0200 Subject: [PATCH 6/7] Improve mutation coverage for batch subscriber code Add tests for ArgumentMetadata::hasAttribute, the usesBatching attribute detection, message argument resolving in MessageProcessor and the afterMessages flush path of the batch subscriber. --- .../AfterMessagesBatchingSubscriber.php | 55 +++++++++++ .../Subscriber/ArgumentMetadataTest.php | 37 ++++++++ ...AttributeSubscriberMetadataFactoryTest.php | 54 +++++++++++ .../Engine/Listener/BatchSubscriberTest.php | 93 +++++++++++++++++++ .../Engine/MessageProcessorTest.php | 89 ++++++++++++++++++ .../Subscriber/BatchManagerTest.php | 1 + 6 files changed, 329 insertions(+) create mode 100644 tests/Unit/Fixture/AfterMessagesBatchingSubscriber.php create mode 100644 tests/Unit/Metadata/Subscriber/ArgumentMetadataTest.php diff --git a/tests/Unit/Fixture/AfterMessagesBatchingSubscriber.php b/tests/Unit/Fixture/AfterMessagesBatchingSubscriber.php new file mode 100644 index 000000000..2543606ff --- /dev/null +++ b/tests/Unit/Fixture/AfterMessagesBatchingSubscriber.php @@ -0,0 +1,55 @@ + */ + public array $receivedMessages = []; + + /** @var list number of messages contained in each flushed batch */ + public array $flushedBatchSizes = []; + + public int $beginBatchCalled = 0; + public int $flushCalled = 0; + + #[BatchBegin] + public function begin(): BatchingState + { + $this->beginBatchCalled++; + + return new BatchingState(); + } + + #[Subscribe(ProfileVisited::class)] + public function handle( + Message $message, + #[BatchState] + BatchingState $state, + ): void { + $state->messages[] = $message; + $this->receivedMessages[] = $message; + } + + #[BatchFlush(afterMessages: 2)] + public function flush(BatchingState $state): void + { + $this->flushCalled++; + $this->flushedBatchSizes[] = count($state->messages); + } +} diff --git a/tests/Unit/Metadata/Subscriber/ArgumentMetadataTest.php b/tests/Unit/Metadata/Subscriber/ArgumentMetadataTest.php new file mode 100644 index 000000000..dfb52cc1a --- /dev/null +++ b/tests/Unit/Metadata/Subscriber/ArgumentMetadataTest.php @@ -0,0 +1,37 @@ +hasAttribute(BatchState::class)); + } + + public function testHasAttributeReturnsFalseForDifferentAttribute(): void + { + $argument = new ArgumentMetadata('state', Type::object(BatchState::class), [new BatchState()]); + + self::assertFalse($argument->hasAttribute(Subscribe::class)); + } + + public function testHasAttributeReturnsFalseWithoutAttributes(): void + { + $argument = new ArgumentMetadata('event', Type::object(BatchState::class)); + + self::assertFalse($argument->hasAttribute(BatchState::class)); + } +} diff --git a/tests/Unit/Metadata/Subscriber/AttributeSubscriberMetadataFactoryTest.php b/tests/Unit/Metadata/Subscriber/AttributeSubscriberMetadataFactoryTest.php index f08945f41..a31c05d62 100644 --- a/tests/Unit/Metadata/Subscriber/AttributeSubscriberMetadataFactoryTest.php +++ b/tests/Unit/Metadata/Subscriber/AttributeSubscriberMetadataFactoryTest.php @@ -440,6 +440,55 @@ public function flush(object $state): void self::assertEquals(new BatchMetadata('flush', 'begin'), $metadata->batch); } + public function testBatchOnlyFlushMethod(): void + { + $subscriber = new #[Subscriber('foo', RunMode::FromBeginning)] + class { + #[BatchFlush] + public function flush(object $state): void + { + } + }; + + $metadataFactory = new AttributeSubscriberMetadataFactory(); + $metadata = $metadataFactory->metadata($subscriber::class); + + self::assertEquals(new BatchMetadata('flush'), $metadata->batch); + } + + public function testBatchOnlyShouldFlushMethodWithoutFlush(): void + { + $this->expectException(IncompleteBatchMethods::class); + + $subscriber = new #[Subscriber('foo', RunMode::FromBeginning)] + class { + #[BatchShouldFlush] + public function shouldFlush(object $state): bool + { + return false; + } + }; + + $metadataFactory = new AttributeSubscriberMetadataFactory(); + $metadataFactory->metadata($subscriber::class); + } + + public function testBatchOnlyRollbackMethodWithoutFlush(): void + { + $this->expectException(IncompleteBatchMethods::class); + + $subscriber = new #[Subscriber('foo', RunMode::FromBeginning)] + class { + #[BatchRollback] + public function rollback(object $state): void + { + } + }; + + $metadataFactory = new AttributeSubscriberMetadataFactory(); + $metadataFactory->metadata($subscriber::class); + } + public function testBatchWithoutBeginMethod(): void { $subscriber = new #[Subscriber('foo', RunMode::FromBeginning)] @@ -494,6 +543,7 @@ public function profileVisited(ProfileVisited $event, #[BatchState] public function testBatchWithoutFlushMethod(): void { $this->expectException(IncompleteBatchMethods::class); + $this->expectExceptionMessage('uses batching but does not define a method marked with the #[BatchFlush] attribute'); $subscriber = new #[Subscriber('foo', RunMode::FromBeginning)] class { @@ -511,6 +561,7 @@ public function begin(): object public function testDuplicateBeginBatchException(): void { $this->expectException(DuplicateBeginBatchMethod::class); + $this->expectExceptionMessage('have been marked as "begin batch" methods'); $subscriber = new #[Subscriber('foo', RunMode::FromBeginning)] class { @@ -534,6 +585,7 @@ public function begin2(): object public function testDuplicateFlushException(): void { $this->expectException(DuplicateFlushMethod::class); + $this->expectExceptionMessage('have been marked as "flush" methods'); $subscriber = new #[Subscriber('foo', RunMode::FromBeginning)] class { @@ -555,6 +607,7 @@ public function flush2(object $state): void public function testDuplicateShouldFlushException(): void { $this->expectException(DuplicateShouldFlushMethod::class); + $this->expectExceptionMessage('have been marked as "should flush" methods'); $subscriber = new #[Subscriber('foo', RunMode::FromBeginning)] class { @@ -578,6 +631,7 @@ public function shouldFlush2(object $state): bool public function testDuplicateRollbackBatchException(): void { $this->expectException(DuplicateRollbackBatchMethod::class); + $this->expectExceptionMessage('have been marked as "rollback batch" methods'); $subscriber = new #[Subscriber('foo', RunMode::FromBeginning)] class { diff --git a/tests/Unit/Subscription/Engine/Listener/BatchSubscriberTest.php b/tests/Unit/Subscription/Engine/Listener/BatchSubscriberTest.php index 5dcd3297f..5fc4f6333 100644 --- a/tests/Unit/Subscription/Engine/Listener/BatchSubscriberTest.php +++ b/tests/Unit/Subscription/Engine/Listener/BatchSubscriberTest.php @@ -4,6 +4,12 @@ namespace Patchlevel\EventSourcing\Tests\Unit\Subscription\Engine\Listener; +use InvalidArgumentException; +use Patchlevel\EventSourcing\Attribute\BatchBegin; +use Patchlevel\EventSourcing\Attribute\BatchFlush; +use Patchlevel\EventSourcing\Attribute\BatchState; +use Patchlevel\EventSourcing\Attribute\Subscribe; +use Patchlevel\EventSourcing\Attribute\Subscriber; use Patchlevel\EventSourcing\Message\Message; use Patchlevel\EventSourcing\Message\Stream; use Patchlevel\EventSourcing\Subscription\Engine\Command\Boot as BootCommand; @@ -29,6 +35,7 @@ use Patchlevel\EventSourcing\Subscription\Subscription; use Patchlevel\EventSourcing\Subscription\SubscriptionError; use Patchlevel\EventSourcing\Subscription\ThrowableToErrorContextTransformer; +use Patchlevel\EventSourcing\Tests\Unit\Fixture\AfterMessagesBatchingSubscriber; use Patchlevel\EventSourcing\Tests\Unit\Fixture\BatchingSubscriber; use Patchlevel\EventSourcing\Tests\Unit\Fixture\DefaultStateBatchingSubscriber; use Patchlevel\EventSourcing\Tests\Unit\Fixture\ProfileId; @@ -463,6 +470,92 @@ public function testBootBatchingWithRollbackBatchError(): void self::assertSame(1, $subscriber->rollbackCalled); } + public function testBootBatchingFlushesAfterMessageThreshold(): void + { + $subscriber = new AfterMessagesBatchingSubscriber(); + + $store = new DummySubscriptionStore([ + new Subscription( + $subscriber::ID, + Subscription::DEFAULT_GROUP, + RunMode::FromBeginning, + Status::Booting, + ), + ]); + + $message1 = new Message(new ProfileVisited(ProfileId::fromString('test'))); + $message2 = new Message(new ProfileVisited(ProfileId::fromString('test'))); + $message3 = new Message(new ProfileVisited(ProfileId::fromString('test'))); + + $messageLoader = $this->createMock(MessageLoader::class); + $messageLoader->expects($this->once())->method('load')->with(null)->willReturn(new Stream([ + 1 => $message1, + 2 => $message2, + 3 => $message3, + ])); + + $handler = $this->createBootHandler($messageLoader, $store, [$subscriber]); + $result = $handler(new BootCommand()); + + self::assertEquals(3, $result->processedMessages); + self::assertEquals(true, $result->finished); + self::assertEquals([], $result->errors); + + // first batch flushes once the threshold of 2 is reached, the third + // message opens a new batch that is flushed when processing finishes + self::assertSame([$message1, $message2, $message3], $subscriber->receivedMessages); + self::assertSame(2, $subscriber->beginBatchCalled); + self::assertSame(2, $subscriber->flushCalled); + self::assertSame([2, 1], $subscriber->flushedBatchSizes); + } + + public function testBootBatchingWithNonObjectBeginStateFails(): void + { + $subscriber = new #[Subscriber('non-object', RunMode::FromBeginning)] + class { + #[BatchBegin] + public function begin(): string + { + return 'not-an-object'; + } + + #[Subscribe(ProfileVisited::class)] + public function handle( + Message $message, + #[BatchState] + object $state, + ): void { + } + + #[BatchFlush] + public function flush(object $state): void + { + } + }; + + $store = new DummySubscriptionStore([ + new Subscription( + 'non-object', + Subscription::DEFAULT_GROUP, + RunMode::FromBeginning, + Status::Booting, + ), + ]); + + $message = new Message(new ProfileVisited(ProfileId::fromString('test'))); + + $messageLoader = $this->createMock(MessageLoader::class); + $messageLoader->expects($this->once())->method('load')->with(null)->willReturn(new Stream([1 => $message])); + + $handler = $this->createBootHandler($messageLoader, $store, [$subscriber]); + $result = $handler(new BootCommand()); + + $error = $result->errors[0]; + self::assertEquals('non-object', $error->subscriptionId); + self::assertStringContainsString('begin batch method must return an object or null', $error->message); + self::assertInstanceOf(InvalidArgumentException::class, $error->throwable); + } + public function testRunningBatchingSuccess(): void { $subscriber = new BatchingSubscriber(); diff --git a/tests/Unit/Subscription/Engine/MessageProcessorTest.php b/tests/Unit/Subscription/Engine/MessageProcessorTest.php index 571f5daf2..9f2b60061 100644 --- a/tests/Unit/Subscription/Engine/MessageProcessorTest.php +++ b/tests/Unit/Subscription/Engine/MessageProcessorTest.php @@ -11,6 +11,7 @@ use Patchlevel\EventSourcing\Subscription\RunMode; use Patchlevel\EventSourcing\Subscription\Status; use Patchlevel\EventSourcing\Subscription\Subscriber\MetadataSubscriberAccessorRepository; +use Patchlevel\EventSourcing\Subscription\Subscriber\NoSuitableResolver; use Patchlevel\EventSourcing\Subscription\Subscription; use Patchlevel\EventSourcing\Tests\Unit\Fixture\ProfileId; use Patchlevel\EventSourcing\Tests\Unit\Fixture\ProfileVisited; @@ -52,4 +53,92 @@ public function onProfileVisited(ProfileVisited $event): void self::assertSame($event, $subscriber->event); self::assertSame(1, $subscription->position()); } + + public function testProcessResolvesMessageArgument(): void + { + $subscriber = new #[Subscriber('test', RunMode::FromBeginning)] + class { + public Message|null $message = null; + + #[Subscribe(ProfileVisited::class)] + public function onProfileVisited(Message $message): void + { + $this->message = $message; + } + }; + + $processor = new MessageProcessor( + new MetadataSubscriberAccessorRepository([$subscriber]), + new EventDispatcher(), + [], + new NullLogger(), + ); + + $message = Message::create(new ProfileVisited(ProfileId::fromString('1'))); + $subscription = new Subscription('test', status: Status::Active); + + $error = $processor->process(1, $message, $subscription); + + self::assertNull($error); + self::assertSame($message, $subscriber->message); + } + + public function testProcessResolvesMultipleArguments(): void + { + $subscriber = new #[Subscriber('test', RunMode::FromBeginning)] + class { + public ProfileVisited|null $event = null; + public Message|null $message = null; + + #[Subscribe(ProfileVisited::class)] + public function onProfileVisited(ProfileVisited $event, Message $message): void + { + $this->event = $event; + $this->message = $message; + } + }; + + $processor = new MessageProcessor( + new MetadataSubscriberAccessorRepository([$subscriber]), + new EventDispatcher(), + [], + new NullLogger(), + ); + + $event = new ProfileVisited(ProfileId::fromString('1')); + $message = Message::create($event); + $subscription = new Subscription('test', status: Status::Active); + + $error = $processor->process(1, $message, $subscription); + + self::assertNull($error); + self::assertSame($event, $subscriber->event); + self::assertSame($message, $subscriber->message); + } + + public function testProcessReturnsErrorWhenArgumentCannotBeResolved(): void + { + $subscriber = new #[Subscriber('test', RunMode::FromBeginning)] + class { + #[Subscribe(ProfileVisited::class)] + public function onProfileVisited(ProfileVisited $event, int $unresolved): void + { + } + }; + + $processor = new MessageProcessor( + new MetadataSubscriberAccessorRepository([$subscriber]), + new EventDispatcher(), + [], + new NullLogger(), + ); + + $message = Message::create(new ProfileVisited(ProfileId::fromString('1'))); + $subscription = new Subscription('test', status: Status::Active); + + $error = $processor->process(1, $message, $subscription); + + self::assertNotNull($error); + self::assertInstanceOf(NoSuitableResolver::class, $error->throwable); + } } diff --git a/tests/Unit/Subscription/Subscriber/BatchManagerTest.php b/tests/Unit/Subscription/Subscriber/BatchManagerTest.php index ec568499d..8604d2dd5 100644 --- a/tests/Unit/Subscription/Subscriber/BatchManagerTest.php +++ b/tests/Unit/Subscription/Subscriber/BatchManagerTest.php @@ -56,6 +56,7 @@ public function testAddGetHasRemove(): void public function testGetMissing(): void { $this->expectException(BatchNotFound::class); + $this->expectExceptionMessage('No batch found for subscription "foo".'); (new BatchManager())->get('foo'); } From 599fc4ff7def1a83a0b5854a881684c86170c471 Mon Sep 17 00:00:00 2001 From: David Badura Date: Wed, 17 Jun 2026 17:28:58 +0200 Subject: [PATCH 7/7] Use nested array for argument resolver cache The string-concatenated cache key was fragile against key collisions between the subscription id, event class and method name. A nested array keyed by those three parts cannot collide and drops the brittle key building. --- src/Subscription/Engine/MessageProcessor.php | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/src/Subscription/Engine/MessageProcessor.php b/src/Subscription/Engine/MessageProcessor.php index 9c9320744..9709b9de6 100644 --- a/src/Subscription/Engine/MessageProcessor.php +++ b/src/Subscription/Engine/MessageProcessor.php @@ -22,7 +22,6 @@ use Symfony\Component\EventDispatcher\EventDispatcherInterface; use Throwable; -use function array_key_exists; use function array_merge; use function array_values; use function is_array; @@ -35,7 +34,7 @@ final class MessageProcessor /** @var list */ private readonly array $argumentResolvers; - /** @var array> */ + /** @var array>>> */ private array $resolverCache = []; /** @param iterable|list $argumentResolvers */ @@ -221,10 +220,8 @@ private function resolversFor( string $subscriberClass, SubscribeMethodMetadata $method, ): array { - $key = $subscriptionId . '::' . $eventClass . '::' . $method->name; - - if (array_key_exists($key, $this->resolverCache)) { - return $this->resolverCache[$key]; + if (isset($this->resolverCache[$subscriptionId][$eventClass][$method->name])) { + return $this->resolverCache[$subscriptionId][$eventClass][$method->name]; } $resolvers = []; @@ -233,7 +230,7 @@ private function resolversFor( $resolvers[] = $this->resolverFor($argument, $eventClass, $subscriberClass, $method); } - return $this->resolverCache[$key] = $resolvers; + return $this->resolverCache[$subscriptionId][$eventClass][$method->name] = $resolvers; } /**