From f836ac53afd5a22d019475c791ae5d7463610467 Mon Sep 17 00:00:00 2001 From: David Badura Date: Tue, 16 Jun 2026 10:45:04 +0200 Subject: [PATCH 1/7] Add event emitter for subscriptions Subscribers can now emit events back into the store by type-hinting an EventEmitter argument on a subscribe method. Emitted events go into a dedicated subscription_ stream by default, or an explicit stream via linkTo, so other subscriptions can react to them. By default events are only emitted during run; while booting the emitter is a noop to avoid duplicates on replay. This can be overridden per subscriber with the OverrideEventEmitting attribute. When a subscription is removed an OnSubscriptionRemoved event is dispatched; registering the RemoveSubscriptionStreamListener removes the subscription stream from the store. ArgumentResolver::resolve() now receives an ArgumentResolverContext (message, subscription and subscriber metadata) instead of just the message. See UPGRADE-4.0. --- docs/UPGRADE-4.0.md | 34 +++++ docs/subscription.md | 125 ++++++++++++++++++ phpstan-baseline.neon | 2 +- src/Attribute/OverrideEventEmitting.php | 16 +++ .../AttributeSubscriberMetadataFactory.php | 13 ++ .../Subscriber/SubscriberMetadata.php | 1 + .../Engine/DefaultSubscriptionEngine.php | 2 + .../Engine/Event/OnSubscriptionRemoved.php | 15 +++ .../Engine/Handler/RemoveHandler.php | 21 ++- .../Engine/Handler/TeardownHandler.php | 19 ++- .../RemoveSubscriptionStreamListener.php | 43 ++++++ src/Subscription/Engine/MessageProcessor.php | 2 +- .../ArgumentResolver/ArgumentResolver.php | 3 +- .../ArgumentResolverContext.php | 19 +++ .../EventArgumentResolver.php | 5 +- .../ArgumentResolver/EventEmitterResolver.php | 46 +++++++ .../ArgumentResolver/LookupResolver.php | 5 +- .../MessageArgumentResolver.php | 4 +- .../RecordedOnArgumentResolver.php | 5 +- .../EventEmitter/DefaultEventEmitter.php | 41 ++++++ .../Subscriber/EventEmitter/EventEmitter.php | 22 +++ .../EventEmitter/NoopEventEmitter.php | 18 +++ .../EventEmitter/SubscriptionStream.php | 15 +++ .../Subscriber/MetadataSubscriberAccessor.php | 19 +-- .../Subscription/Events/NotificationSent.php | 17 +++ .../Subscriber/NotificationCollector.php | 23 ++++ .../NotificationEmittingProjection.php | 22 +++ .../Subscription/SubscriptionTest.php | 91 +++++++++++++ ...AttributeSubscriberMetadataFactoryTest.php | 15 +++ .../Engine/Handler/RemoveHandlerTest.php | 4 + .../Engine/Handler/TeardownHandlerTest.php | 5 + .../RemoveSubscriptionStreamListenerTest.php | 29 ++++ .../EventArgumentResolverTest.php | 5 +- .../EventEmitterResolverTest.php | 102 ++++++++++++++ .../ArgumentResolver/LookupResolverTest.php | 5 +- .../MessageArgumentResolverTest.php | 5 +- .../RecordedOnArgumentResolverTest.php | 7 +- .../EventEmitter/DefaultEventEmitterTest.php | 54 ++++++++ .../EventEmitter/NoopEventEmitterTest.php | 24 ++++ ...tadataSubscriberAccessorRepositoryTest.php | 5 +- .../MetadataSubscriberAccessorTest.php | 7 +- 41 files changed, 875 insertions(+), 40 deletions(-) create mode 100644 src/Attribute/OverrideEventEmitting.php create mode 100644 src/Subscription/Engine/Event/OnSubscriptionRemoved.php create mode 100644 src/Subscription/Engine/Listener/RemoveSubscriptionStreamListener.php create mode 100644 src/Subscription/Subscriber/ArgumentResolver/ArgumentResolverContext.php create mode 100644 src/Subscription/Subscriber/ArgumentResolver/EventEmitterResolver.php create mode 100644 src/Subscription/Subscriber/EventEmitter/DefaultEventEmitter.php create mode 100644 src/Subscription/Subscriber/EventEmitter/EventEmitter.php create mode 100644 src/Subscription/Subscriber/EventEmitter/NoopEventEmitter.php create mode 100644 src/Subscription/Subscriber/EventEmitter/SubscriptionStream.php create mode 100644 tests/Integration/Subscription/Events/NotificationSent.php create mode 100644 tests/Integration/Subscription/Subscriber/NotificationCollector.php create mode 100644 tests/Integration/Subscription/Subscriber/NotificationEmittingProjection.php create mode 100644 tests/Unit/Subscription/Engine/Listener/RemoveSubscriptionStreamListenerTest.php create mode 100644 tests/Unit/Subscription/Subscriber/ArgumentResolver/EventEmitterResolverTest.php create mode 100644 tests/Unit/Subscription/Subscriber/EventEmitter/DefaultEventEmitterTest.php create mode 100644 tests/Unit/Subscription/Subscriber/EventEmitter/NoopEventEmitterTest.php diff --git a/docs/UPGRADE-4.0.md b/docs/UPGRADE-4.0.md index ccb88ebc8..cfdaddb68 100644 --- a/docs/UPGRADE-4.0.md +++ b/docs/UPGRADE-4.0.md @@ -209,6 +209,40 @@ and a `bool $allowsNull` property. Instead it now has a single `Symfony\Componen If you implemented a custom `ArgumentResolver`, adjust it to read the type from the new `Type` object. +### ArgumentResolver + +The `resolve` method of `Patchlevel\EventSourcing\Subscription\Subscriber\ArgumentResolver\ArgumentResolver` +no longer receives the `Message` directly. It now receives an `ArgumentResolverContext` that bundles the +current `message`, `subscription` and `subscriber` metadata. + +Before: + +```php +final class CustomResolver implements ArgumentResolver +{ + public function resolve(ArgumentMetadata $argument, Message $message): mixed + { + return $message->header(CustomHeader::class); + } + + // ... support() +} +``` + +After: + +```php +final class CustomResolver implements ArgumentResolver +{ + public function resolve(ArgumentMetadata $argument, ArgumentResolverContext $context): mixed + { + return $context->message->header(CustomHeader::class); + } + + // ... support() +} +``` + ## Store ### StreamStore diff --git a/docs/subscription.md b/docs/subscription.md index cc461ba19..d107c73c1 100644 --- a/docs/subscription.md +++ b/docs/subscription.md @@ -302,10 +302,135 @@ final class DoStuffSubscriber } } ``` +##### Event Emitter Resolver + +A subscriber can not only build a projection or trigger a side effect, it can also emit new events +back into the store. This is useful when one subscription should kick off another one: for example a +projection that, after it has been updated, wants a notification subscription to send a mail. + +If you type-hint an `EventEmitter` argument in a subscribe method, it is injected automatically. +By default the emitted events are written into a dedicated stream named `subscription_`. + +```php +use Patchlevel\EventSourcing\Attribute\Subscribe; +use Patchlevel\EventSourcing\Attribute\Subscriber; +use Patchlevel\EventSourcing\Subscription\RunMode; +use Patchlevel\EventSourcing\Subscription\Subscriber\EventEmitter\EventEmitter; + +#[Subscriber('order_projection', RunMode::FromBeginning)] +final class OrderProjection +{ + #[Subscribe(OrderPlaced::class)] + public function onOrderPlaced(OrderPlaced $event, EventEmitter $eventEmitter): void + { + // update the projection ... + + $eventEmitter->emit([new NotificationRequired($event->orderId)]); + } +} +``` + +The `emit` method writes the events into the subscriber's own `subscription_` stream. +If you want to target a different stream, use `linkTo`: + +```php +$eventEmitter->linkTo('notifications', [new NotificationRequired($event->orderId)]); +``` + +:::info +The emitted events must be registered like any other event so the store can (de)serialize them. +::: + +To configure the resolver, pass an `EventEmitterResolver` to the subscriber accessor repository, +the same way as the [lookup resolver](#lookup-resolver). It needs the event store to append to: + +```php +use Patchlevel\EventSourcing\Subscription\Subscriber\ArgumentResolver\EventEmitterResolver; +use Patchlevel\EventSourcing\Subscription\Subscriber\MetadataSubscriberAccessorRepository; + +$subscriberAccessorRepository = new MetadataSubscriberAccessorRepository( + $subscribers, + argumentResolvers: [ + new EventEmitterResolver($store), + ], +); +``` + +To also clean up the subscription stream when a subscription is removed, register the +`RemoveSubscriptionStreamListener` on the event dispatcher and pass it to the engine: + +```php +use Patchlevel\EventSourcing\Subscription\Engine\DefaultSubscriptionEngine; +use Patchlevel\EventSourcing\Subscription\Engine\Event\OnSubscriptionRemoved; +use Patchlevel\EventSourcing\Subscription\Engine\Listener\RemoveSubscriptionStreamListener; +use Symfony\Component\EventDispatcher\EventDispatcher; + +$eventDispatcher = new EventDispatcher(); +$eventDispatcher->addListener( + OnSubscriptionRemoved::class, + new RemoveSubscriptionStreamListener($store), +); + +$engine = new DefaultSubscriptionEngine( + $messageLoader, + $subscriptionStore, + $subscriberAccessorRepository, + eventDispatcher: $eventDispatcher, +); +``` + +###### When are events emitted + +Emitting events while a subscription is *booting* would create duplicates on every replay, so by +default events are only emitted during `run` and the emitter is a noop during `boot`. You can change +this per subscriber with the `OverrideEventEmitting` attribute: + +```php +use Patchlevel\EventSourcing\Attribute\OverrideEventEmitting; +use Patchlevel\EventSourcing\Attribute\Subscriber; +use Patchlevel\EventSourcing\Subscription\RunMode; + +#[Subscriber('order_projection', RunMode::FromBeginning)] +#[OverrideEventEmitting(true)] // also emit during boot +final class OrderProjection +{ + // ... +} +``` + +* `OverrideEventEmitting(true)` — events are also emitted during `boot`. +* `OverrideEventEmitting(false)` — events are never emitted, not even during `run`. + +:::info +When a subscription is removed, its `subscription_` stream is removed from the store as +well (as long as the `RemoveSubscriptionStreamListener` is registered). +::: + ##### Custom Resolvers You can provide your own argument resolvers by implementing the `ArgumentResolver` interface. This can be useful for providing direct access to custom headers or other data. +The `resolve` method receives an `ArgumentResolverContext` that gives you access to the current +`message`, the `subscription` and the `subscriber` metadata. + +```php +use Patchlevel\EventSourcing\Metadata\Subscriber\ArgumentMetadata; +use Patchlevel\EventSourcing\Subscription\Subscriber\ArgumentResolver\ArgumentResolver; +use Patchlevel\EventSourcing\Subscription\Subscriber\ArgumentResolver\ArgumentResolverContext; + +final class CustomResolver implements ArgumentResolver +{ + public function resolve(ArgumentMetadata $argument, ArgumentResolverContext $context): mixed + { + return $context->message->header(CustomHeader::class); + } + + public function support(ArgumentMetadata $argument, string $eventClass): bool + { + return $argument->type->isIdentifiedBy(CustomHeader::class); + } +} +``` ### Setup diff --git a/phpstan-baseline.neon b/phpstan-baseline.neon index bf453ba17..e6ef23b53 100644 --- a/phpstan-baseline.neon +++ b/phpstan-baseline.neon @@ -589,7 +589,7 @@ parameters: path: tests/Unit/Message/Translator/ReplaceEventTranslatorTest.php - - message: '#^Method class@anonymous/tests/Unit/Metadata/Subscriber/AttributeSubscriberMetadataFactoryTest\.php\:234\:\:profileVisited\(\) has parameter \$message with no type specified\.$#' + message: '#^Method class@anonymous/tests/Unit/Metadata/Subscriber/AttributeSubscriberMetadataFactoryTest\.php\:249\:\:profileVisited\(\) has parameter \$message with no type specified\.$#' identifier: missingType.parameter count: 1 path: tests/Unit/Metadata/Subscriber/AttributeSubscriberMetadataFactoryTest.php diff --git a/src/Attribute/OverrideEventEmitting.php b/src/Attribute/OverrideEventEmitting.php new file mode 100644 index 000000000..5c6e52f45 --- /dev/null +++ b/src/Attribute/OverrideEventEmitting.php @@ -0,0 +1,16 @@ +retryStrategy($reflector), $cleanupMethod, + $this->overrideEventEmitting($reflector), ); $this->subscriberMetadata[$subscriber] = $metadata; @@ -202,4 +204,15 @@ private function retryStrategy(ReflectionClass $reflector): string|null return $instance->name; } + + private function overrideEventEmitting(ReflectionClass $reflector): bool|null + { + $attributes = $reflector->getAttributes(OverrideEventEmitting::class); + + if ($attributes === []) { + return null; + } + + return $attributes[0]->newInstance()->enabled; + } } diff --git a/src/Metadata/Subscriber/SubscriberMetadata.php b/src/Metadata/Subscriber/SubscriberMetadata.php index f6926270f..69c02ab99 100644 --- a/src/Metadata/Subscriber/SubscriberMetadata.php +++ b/src/Metadata/Subscriber/SubscriberMetadata.php @@ -20,6 +20,7 @@ public function __construct( public readonly string|null $failedMethod = null, public readonly string|null $retryStrategy = null, public readonly string|null $cleanupMethod = null, + public readonly bool|null $overrideEventEmitting = null, ) { } } diff --git a/src/Subscription/Engine/DefaultSubscriptionEngine.php b/src/Subscription/Engine/DefaultSubscriptionEngine.php index 9a9ada233..2f99e039b 100644 --- a/src/Subscription/Engine/DefaultSubscriptionEngine.php +++ b/src/Subscription/Engine/DefaultSubscriptionEngine.php @@ -113,6 +113,7 @@ public function __construct( $this->subscriptionManager, $this->subscriberRepository, $cleanupRunner, + $this->eventDispatcher, $this->logger, ), Run::class => new RunHandler( @@ -133,6 +134,7 @@ public function __construct( $this->subscriptionManager, $this->subscriberRepository, $cleanupRunner, + $this->eventDispatcher, $this->logger, ), ]; diff --git a/src/Subscription/Engine/Event/OnSubscriptionRemoved.php b/src/Subscription/Engine/Event/OnSubscriptionRemoved.php new file mode 100644 index 000000000..20104c365 --- /dev/null +++ b/src/Subscription/Engine/Event/OnSubscriptionRemoved.php @@ -0,0 +1,15 @@ +isNew()) { - $this->subscriptionManager->remove($subscription); + $this->remove($subscription); $this->logger?->info( sprintf( @@ -65,13 +69,16 @@ function (SubscriptionCollection $subscriptions): Result { $errors[] = $error; } + // the cleanup runner removes the subscription (forced, even on error) + $this->eventDispatcher->dispatch(new OnSubscriptionRemoved($subscription)); + continue; } $subscriber = $this->subscriberRepository->get($subscription->id()); if (!$subscriber) { - $this->subscriptionManager->remove($subscription); + $this->remove($subscription); $this->logger?->info( sprintf( @@ -86,7 +93,7 @@ function (SubscriptionCollection $subscriptions): Result { $teardownMethod = $subscriber->teardownMethod(); if (!$teardownMethod) { - $this->subscriptionManager->remove($subscription); + $this->remove($subscription); $this->logger?->info( sprintf('Subscription Engine: Subscription "%s" removed.', $subscription->id()), @@ -113,7 +120,7 @@ function (SubscriptionCollection $subscriptions): Result { ); } - $this->subscriptionManager->remove($subscription); + $this->remove($subscription); $this->logger?->info( sprintf('Subscription Engine: Subscription "%s" removed.', $subscription->id()), @@ -124,4 +131,10 @@ function (SubscriptionCollection $subscriptions): Result { }, ); } + + private function remove(Subscription $subscription): void + { + $this->subscriptionManager->remove($subscription); + $this->eventDispatcher->dispatch(new OnSubscriptionRemoved($subscription)); + } } diff --git a/src/Subscription/Engine/Handler/TeardownHandler.php b/src/Subscription/Engine/Handler/TeardownHandler.php index 51a14c593..9a2eb52f4 100644 --- a/src/Subscription/Engine/Handler/TeardownHandler.php +++ b/src/Subscription/Engine/Handler/TeardownHandler.php @@ -8,13 +8,16 @@ use Patchlevel\EventSourcing\Subscription\Engine\Command\Command; use Patchlevel\EventSourcing\Subscription\Engine\Command\Teardown; use Patchlevel\EventSourcing\Subscription\Engine\Error; +use Patchlevel\EventSourcing\Subscription\Engine\Event\OnSubscriptionRemoved; use Patchlevel\EventSourcing\Subscription\Engine\Result; use Patchlevel\EventSourcing\Subscription\Engine\SubscriptionCollection; use Patchlevel\EventSourcing\Subscription\Engine\SubscriptionManager; use Patchlevel\EventSourcing\Subscription\Status; use Patchlevel\EventSourcing\Subscription\Store\SubscriptionCriteria; use Patchlevel\EventSourcing\Subscription\Subscriber\SubscriberAccessorRepository; +use Patchlevel\EventSourcing\Subscription\Subscription; use Psr\Log\LoggerInterface; +use Symfony\Component\EventDispatcher\EventDispatcherInterface; use Throwable; use function sprintf; @@ -30,6 +33,7 @@ public function __construct( private readonly SubscriptionManager $subscriptionManager, private readonly SubscriberAccessorRepository $subscriberRepository, private readonly CleanupRunner $cleanupRunner, + private readonly EventDispatcherInterface $eventDispatcher, private readonly LoggerInterface|null $logger = null, ) { } @@ -52,8 +56,13 @@ function (SubscriptionCollection $subscriptions): Result { if ($error) { $errors[] = $error; + + continue; } + // the cleanup runner removed the subscription + $this->eventDispatcher->dispatch(new OnSubscriptionRemoved($subscription)); + continue; } @@ -73,7 +82,7 @@ function (SubscriptionCollection $subscriptions): Result { $teardownMethod = $subscriber->teardownMethod(); if (!$teardownMethod) { - $this->subscriptionManager->remove($subscription); + $this->remove($subscription); $this->logger?->info( sprintf( @@ -113,7 +122,7 @@ function (SubscriptionCollection $subscriptions): Result { continue; } - $this->subscriptionManager->remove($subscription); + $this->remove($subscription); $this->logger?->info( sprintf( @@ -127,4 +136,10 @@ function (SubscriptionCollection $subscriptions): Result { }, ); } + + private function remove(Subscription $subscription): void + { + $this->subscriptionManager->remove($subscription); + $this->eventDispatcher->dispatch(new OnSubscriptionRemoved($subscription)); + } } diff --git a/src/Subscription/Engine/Listener/RemoveSubscriptionStreamListener.php b/src/Subscription/Engine/Listener/RemoveSubscriptionStreamListener.php new file mode 100644 index 000000000..d5eaa4baf --- /dev/null +++ b/src/Subscription/Engine/Listener/RemoveSubscriptionStreamListener.php @@ -0,0 +1,43 @@ +subscription->id()); + + $this->store->remove( + new Criteria( + new StreamCriterion($streamName), + ), + ); + + $this->logger?->debug( + sprintf( + 'Subscription Engine: Subscription stream "%s" for subscription "%s" has been removed.', + $streamName, + $event->subscription->id(), + ), + ); + } +} diff --git a/src/Subscription/Engine/MessageProcessor.php b/src/Subscription/Engine/MessageProcessor.php index 534f76a0a..c24bb5fcf 100644 --- a/src/Subscription/Engine/MessageProcessor.php +++ b/src/Subscription/Engine/MessageProcessor.php @@ -92,7 +92,7 @@ public function process(int $index, Message $message, Subscription $subscription try { foreach ($subscribeMethods as $subscribeMethod) { - $subscribeMethod($message); + $subscribeMethod($message, $subscription); } } catch (Throwable $e) { $this->logger?->error( diff --git a/src/Subscription/Subscriber/ArgumentResolver/ArgumentResolver.php b/src/Subscription/Subscriber/ArgumentResolver/ArgumentResolver.php index 2f0411af2..77db678d4 100644 --- a/src/Subscription/Subscriber/ArgumentResolver/ArgumentResolver.php +++ b/src/Subscription/Subscriber/ArgumentResolver/ArgumentResolver.php @@ -4,12 +4,11 @@ namespace Patchlevel\EventSourcing\Subscription\Subscriber\ArgumentResolver; -use Patchlevel\EventSourcing\Message\Message; use Patchlevel\EventSourcing\Metadata\Subscriber\ArgumentMetadata; interface ArgumentResolver { - public function resolve(ArgumentMetadata $argument, Message $message): mixed; + public function resolve(ArgumentMetadata $argument, ArgumentResolverContext $context): mixed; public function support(ArgumentMetadata $argument, string $eventClass): bool; } diff --git a/src/Subscription/Subscriber/ArgumentResolver/ArgumentResolverContext.php b/src/Subscription/Subscriber/ArgumentResolver/ArgumentResolverContext.php new file mode 100644 index 000000000..2bc1833e0 --- /dev/null +++ b/src/Subscription/Subscriber/ArgumentResolver/ArgumentResolverContext.php @@ -0,0 +1,19 @@ +event(); + return $context->message->event(); } public function support(ArgumentMetadata $argument, string $eventClass): bool diff --git a/src/Subscription/Subscriber/ArgumentResolver/EventEmitterResolver.php b/src/Subscription/Subscriber/ArgumentResolver/EventEmitterResolver.php new file mode 100644 index 000000000..483a52985 --- /dev/null +++ b/src/Subscription/Subscriber/ArgumentResolver/EventEmitterResolver.php @@ -0,0 +1,46 @@ +subscriber->overrideEventEmitting; + + $enabled = match (true) { + $override === false => false, + $override === true => true, + // by default events are only emitted during run, not while booting + default => $context->subscription->isActive(), + }; + + if (!$enabled) { + return new NoopEventEmitter(); + } + + return new DefaultEventEmitter( + $this->store, + SubscriptionStream::name($context->subscription->id()), + ); + } + + public function support(ArgumentMetadata $argument, string $eventClass): bool + { + return $argument->type->isIdentifiedBy(EventEmitter::class); + } +} diff --git a/src/Subscription/Subscriber/ArgumentResolver/LookupResolver.php b/src/Subscription/Subscriber/ArgumentResolver/LookupResolver.php index 6b466e148..90af8e0a8 100644 --- a/src/Subscription/Subscriber/ArgumentResolver/LookupResolver.php +++ b/src/Subscription/Subscriber/ArgumentResolver/LookupResolver.php @@ -4,7 +4,6 @@ namespace Patchlevel\EventSourcing\Subscription\Subscriber\ArgumentResolver; -use Patchlevel\EventSourcing\Message\Message; use Patchlevel\EventSourcing\Metadata\Event\EventRegistry; use Patchlevel\EventSourcing\Metadata\Subscriber\ArgumentMetadata; use Patchlevel\EventSourcing\Store\Store; @@ -18,11 +17,11 @@ public function __construct( ) { } - public function resolve(ArgumentMetadata $argument, Message $message): Lookup + public function resolve(ArgumentMetadata $argument, ArgumentResolverContext $context): Lookup { return new Lookup( $this->store, - $message, + $context->message, $this->eventRegistry, ); } diff --git a/src/Subscription/Subscriber/ArgumentResolver/MessageArgumentResolver.php b/src/Subscription/Subscriber/ArgumentResolver/MessageArgumentResolver.php index 50cdb848f..c511ab022 100644 --- a/src/Subscription/Subscriber/ArgumentResolver/MessageArgumentResolver.php +++ b/src/Subscription/Subscriber/ArgumentResolver/MessageArgumentResolver.php @@ -9,9 +9,9 @@ final class MessageArgumentResolver implements ArgumentResolver { - public function resolve(ArgumentMetadata $argument, Message $message): Message + public function resolve(ArgumentMetadata $argument, ArgumentResolverContext $context): Message { - return $message; + return $context->message; } public function support(ArgumentMetadata $argument, string $eventClass): bool diff --git a/src/Subscription/Subscriber/ArgumentResolver/RecordedOnArgumentResolver.php b/src/Subscription/Subscriber/ArgumentResolver/RecordedOnArgumentResolver.php index ddc71f5b1..41853e1f9 100644 --- a/src/Subscription/Subscriber/ArgumentResolver/RecordedOnArgumentResolver.php +++ b/src/Subscription/Subscriber/ArgumentResolver/RecordedOnArgumentResolver.php @@ -5,15 +5,14 @@ namespace Patchlevel\EventSourcing\Subscription\Subscriber\ArgumentResolver; use DateTimeImmutable; -use Patchlevel\EventSourcing\Message\Message; use Patchlevel\EventSourcing\Metadata\Subscriber\ArgumentMetadata; use Patchlevel\EventSourcing\Store\Header\RecordedOnHeader; final class RecordedOnArgumentResolver implements ArgumentResolver { - public function resolve(ArgumentMetadata $argument, Message $message): DateTimeImmutable + public function resolve(ArgumentMetadata $argument, ArgumentResolverContext $context): DateTimeImmutable { - return $message->header(RecordedOnHeader::class)->recordedOn; + return $context->message->header(RecordedOnHeader::class)->recordedOn; } public function support(ArgumentMetadata $argument, string $eventClass): bool diff --git a/src/Subscription/Subscriber/EventEmitter/DefaultEventEmitter.php b/src/Subscription/Subscriber/EventEmitter/DefaultEventEmitter.php new file mode 100644 index 000000000..a6a30ef5c --- /dev/null +++ b/src/Subscription/Subscriber/EventEmitter/DefaultEventEmitter.php @@ -0,0 +1,41 @@ + $events */ + public function emit(array $events): void + { + $this->linkTo($this->subscriptionStream, $events); + } + + /** @param list $events */ + public function linkTo(string $streamName, array $events): void + { + if ($events === []) { + return; + } + + $messages = []; + + foreach ($events as $event) { + $messages[] = Message::create($event) + ->withHeader(new StreamNameHeader($streamName)); + } + + $this->store->save(...$messages); + } +} diff --git a/src/Subscription/Subscriber/EventEmitter/EventEmitter.php b/src/Subscription/Subscriber/EventEmitter/EventEmitter.php new file mode 100644 index 000000000..f2534d6f5 --- /dev/null +++ b/src/Subscription/Subscriber/EventEmitter/EventEmitter.php @@ -0,0 +1,22 @@ + $events + */ + public function emit(array $events): void; + + /** + * Emit events into the given stream. + * + * @param list $events + */ + public function linkTo(string $streamName, array $events): void; +} diff --git a/src/Subscription/Subscriber/EventEmitter/NoopEventEmitter.php b/src/Subscription/Subscriber/EventEmitter/NoopEventEmitter.php new file mode 100644 index 000000000..328ac2683 --- /dev/null +++ b/src/Subscription/Subscriber/EventEmitter/NoopEventEmitter.php @@ -0,0 +1,18 @@ + $events */ + public function emit(array $events): void + { + } + + /** @param list $events */ + public function linkTo(string $streamName, array $events): void + { + } +} diff --git a/src/Subscription/Subscriber/EventEmitter/SubscriptionStream.php b/src/Subscription/Subscriber/EventEmitter/SubscriptionStream.php new file mode 100644 index 000000000..37c7bfe7e --- /dev/null +++ b/src/Subscription/Subscriber/EventEmitter/SubscriptionStream.php @@ -0,0 +1,15 @@ +> */ + /** @var array> */ private array $subscribeCache = []; /** @@ -99,7 +101,7 @@ public function events(): array /** * @param class-string $eventClass * - * @return list + * @return list */ public function subscribeMethods(string $eventClass): array { @@ -128,18 +130,18 @@ public function subscribeMethods(string $eventClass): array /** * @param class-string $eventClass * - * @return Closure(Message):void + * @return Closure(Message, Subscription):void */ private function createClosure(string $eventClass, SubscribeMethodMetadata $method): Closure { $resolvers = $this->resolvers($eventClass, $method); $methodName = $method->name; - return function (Message $message) use ($methodName, $resolvers): void { + return function (Message $message, Subscription $subscription) use ($methodName, $resolvers): void { $arguments = []; foreach ($resolvers as $resolver) { - $arguments[] = $resolver($message); + $arguments[] = $resolver($message, $subscription); } $this->subscriber->$methodName(...$arguments); @@ -149,11 +151,12 @@ private function createClosure(string $eventClass, SubscribeMethodMetadata $meth /** * @param class-string $eventClass * - * @return list + * @return list */ private function resolvers(string $eventClass, SubscribeMethodMetadata $method): array { $resolvers = []; + $metadata = $this->metadata; foreach ($method->arguments as $argument) { foreach ($this->argumentResolvers as $resolver) { @@ -161,8 +164,8 @@ private function resolvers(string $eventClass, SubscribeMethodMetadata $method): continue; } - $resolvers[] = static function (Message $message) use ($resolver, $argument): mixed { - return $resolver->resolve($argument, $message); + $resolvers[] = static function (Message $message, Subscription $subscription) use ($resolver, $argument, $metadata): mixed { + return $resolver->resolve($argument, new ArgumentResolverContext($message, $subscription, $metadata)); }; continue 2; diff --git a/tests/Integration/Subscription/Events/NotificationSent.php b/tests/Integration/Subscription/Events/NotificationSent.php new file mode 100644 index 000000000..a0f5fcf01 --- /dev/null +++ b/tests/Integration/Subscription/Events/NotificationSent.php @@ -0,0 +1,17 @@ + */ + public array $notifications = []; + + #[Subscribe(NotificationSent::class)] + public function onNotificationSent(NotificationSent $event): void + { + $this->notifications[] = $event; + } +} diff --git a/tests/Integration/Subscription/Subscriber/NotificationEmittingProjection.php b/tests/Integration/Subscription/Subscriber/NotificationEmittingProjection.php new file mode 100644 index 000000000..4a039928a --- /dev/null +++ b/tests/Integration/Subscription/Subscriber/NotificationEmittingProjection.php @@ -0,0 +1,22 @@ +emit([new NotificationSent($event->profileId)]); + } +} diff --git a/tests/Integration/Subscription/SubscriptionTest.php b/tests/Integration/Subscription/SubscriptionTest.php index 6da6843d2..2dd800487 100644 --- a/tests/Integration/Subscription/SubscriptionTest.php +++ b/tests/Integration/Subscription/SubscriptionTest.php @@ -19,6 +19,8 @@ use Patchlevel\EventSourcing\Schema\ChainDoctrineSchemaConfigurator; use Patchlevel\EventSourcing\Schema\DoctrineSchemaDirector; use Patchlevel\EventSourcing\Serializer\DefaultEventSerializer; +use Patchlevel\EventSourcing\Store\Criteria\Criteria; +use Patchlevel\EventSourcing\Store\Criteria\StreamCriterion; use Patchlevel\EventSourcing\Store\StreamDoctrineDbalStore; use Patchlevel\EventSourcing\Subscription\Cleanup\Dbal\DbalCleanupTaskHandler; use Patchlevel\EventSourcing\Subscription\Cleanup\Dbal\DropTableTask; @@ -32,8 +34,10 @@ use Patchlevel\EventSourcing\Subscription\Engine\Command\Setup as SetupCommand; use Patchlevel\EventSourcing\Subscription\Engine\Command\Teardown as TeardownCommand; use Patchlevel\EventSourcing\Subscription\Engine\DefaultSubscriptionEngine; +use Patchlevel\EventSourcing\Subscription\Engine\Event\OnSubscriptionRemoved; use Patchlevel\EventSourcing\Subscription\Engine\EventFilteredStoreMessageLoader; use Patchlevel\EventSourcing\Subscription\Engine\GapResolverStoreMessageLoader; +use Patchlevel\EventSourcing\Subscription\Engine\Listener\RemoveSubscriptionStreamListener; use Patchlevel\EventSourcing\Subscription\Engine\MessageLoader; use Patchlevel\EventSourcing\Subscription\Engine\ProcessedResult; use Patchlevel\EventSourcing\Subscription\Engine\Result; @@ -43,6 +47,7 @@ use Patchlevel\EventSourcing\Subscription\RunMode; use Patchlevel\EventSourcing\Subscription\Status; use Patchlevel\EventSourcing\Subscription\Store\DoctrineSubscriptionStore; +use Patchlevel\EventSourcing\Subscription\Subscriber\ArgumentResolver\EventEmitterResolver; use Patchlevel\EventSourcing\Subscription\Subscriber\ArgumentResolver\LookupResolver; use Patchlevel\EventSourcing\Subscription\Subscriber\MetadataSubscriberAccessorRepository; use Patchlevel\EventSourcing\Subscription\Subscription; @@ -50,6 +55,8 @@ use Patchlevel\EventSourcing\Tests\Integration\Subscription\Subscriber\ErrorProducerSubscriber; use Patchlevel\EventSourcing\Tests\Integration\Subscription\Subscriber\ErrorProducerWithSelfRecoverySubscriber; use Patchlevel\EventSourcing\Tests\Integration\Subscription\Subscriber\LookupSubscriber; +use Patchlevel\EventSourcing\Tests\Integration\Subscription\Subscriber\NotificationCollector; +use Patchlevel\EventSourcing\Tests\Integration\Subscription\Subscriber\NotificationEmittingProjection; use Patchlevel\EventSourcing\Tests\Integration\Subscription\Subscriber\ProfileNewProjection; use Patchlevel\EventSourcing\Tests\Integration\Subscription\Subscriber\ProfileProcessor; use Patchlevel\EventSourcing\Tests\Integration\Subscription\Subscriber\ProfileProjection; @@ -57,6 +64,7 @@ use PHPUnit\Framework\Attributes\CoversNothing; use PHPUnit\Framework\TestCase; use RuntimeException; +use Symfony\Component\EventDispatcher\EventDispatcher; use function gc_collect_cycles; use function iterator_to_array; @@ -1463,6 +1471,89 @@ public function testLookup(): void self::assertSame('Hans', $result['name']); } + public function testEventEmitter(): void + { + $store = new StreamDoctrineDbalStore( + $this->connection, + DefaultEventSerializer::createFromPaths([__DIR__ . '/Events']), + ); + + $clock = new FrozenClock(new DateTimeImmutable('2021-01-01T00:00:00')); + + $subscriptionStore = new DoctrineSubscriptionStore( + $this->connection, + $clock, + ); + + $manager = new DefaultRepositoryManager( + new AggregateRootRegistry(['profile' => Profile::class]), + $store, + ); + + $repository = $manager->get(Profile::class); + + $schemaDirector = new DoctrineSchemaDirector( + $this->connection, + new ChainDoctrineSchemaConfigurator([ + $store, + $subscriptionStore, + ]), + ); + + $schemaDirector->create(); + + $collector = new NotificationCollector(); + + $subscriberRepository = new MetadataSubscriberAccessorRepository( + [ + new NotificationEmittingProjection(), + $collector, + ], + argumentResolvers: [ + new EventEmitterResolver($store), + ], + ); + + $eventDispatcher = new EventDispatcher(); + $eventDispatcher->addListener( + OnSubscriptionRemoved::class, + new RemoveSubscriptionStreamListener($store), + ); + + $engine = new DefaultSubscriptionEngine( + new StoreMessageLoader($store), + $subscriptionStore, + $subscriberRepository, + eventDispatcher: $eventDispatcher, + ); + + $engine->execute(new SetupCommand()); + $engine->execute(new Boot()); + + $profileId = ProfileId::generate(); + $repository->save(Profile::create($profileId, 'John')); + + // the emitting projection reacts to ProfileCreated and emits a NotificationSent event + // into its projection stream, which the notification subscriber then consumes. Depending + // on the database driver this can happen in one or two runs, so we drain the engine. + do { + $result = $engine->execute(new Run()); + + self::assertInstanceOf(ProcessedResult::class, $result); + self::assertEquals([], $result->errors); + } while ($result->processedMessages > 0); + + self::assertSame(1, $store->count(new Criteria(new StreamCriterion('subscription_emitting')))); + self::assertCount(1, $collector->notifications); + self::assertEquals($profileId, $collector->notifications[0]->profileId); + + // removing the subscriptions also removes the projection stream + $engine->execute(new Remove()); + + self::assertNotContains('subscription_emitting', $store->streams()); + self::assertSame(0, $store->count(new Criteria(new StreamCriterion('subscription_emitting')))); + } + public function testRefreshSubscriptions(): void { $store = new StreamDoctrineDbalStore( diff --git a/tests/Unit/Metadata/Subscriber/AttributeSubscriberMetadataFactoryTest.php b/tests/Unit/Metadata/Subscriber/AttributeSubscriberMetadataFactoryTest.php index 144335160..7d95731be 100644 --- a/tests/Unit/Metadata/Subscriber/AttributeSubscriberMetadataFactoryTest.php +++ b/tests/Unit/Metadata/Subscriber/AttributeSubscriberMetadataFactoryTest.php @@ -4,6 +4,7 @@ namespace Patchlevel\EventSourcing\Tests\Unit\Metadata\Subscriber; +use Patchlevel\EventSourcing\Attribute\OverrideEventEmitting; use Patchlevel\EventSourcing\Attribute\Processor; use Patchlevel\EventSourcing\Attribute\Projector; use Patchlevel\EventSourcing\Attribute\Setup; @@ -53,6 +54,20 @@ class { self::assertNull($metadata->setupMethod); self::assertNull($metadata->teardownMethod); self::assertSame('foo', $metadata->id); + self::assertNull($metadata->overrideEventEmitting); + } + + public function testOverrideEventEmitting(): void + { + $subscriber = new #[Subscriber('foo', RunMode::FromBeginning)] + #[OverrideEventEmitting(true)] + class { + }; + + $metadataFactory = new AttributeSubscriberMetadataFactory(); + $metadata = $metadataFactory->metadata($subscriber::class); + + self::assertTrue($metadata->overrideEventEmitting); } public function testProjector(): void diff --git a/tests/Unit/Subscription/Engine/Handler/RemoveHandlerTest.php b/tests/Unit/Subscription/Engine/Handler/RemoveHandlerTest.php index f70eb96d0..cc6264ecd 100644 --- a/tests/Unit/Subscription/Engine/Handler/RemoveHandlerTest.php +++ b/tests/Unit/Subscription/Engine/Handler/RemoveHandlerTest.php @@ -24,6 +24,7 @@ use PHPUnit\Framework\TestCase; use Psr\Log\NullLogger; use RuntimeException; +use Symfony\Component\EventDispatcher\EventDispatcher; #[CoversClass(RemoveHandler::class)] final class RemoveHandlerTest extends TestCase @@ -40,6 +41,7 @@ private function createHandler( $subscriptionManager, new MetadataSubscriberAccessorRepository($subscribers), $cleanupRunner ?? new CleanupRunner($subscriptionManager, null, new NullLogger()), + new EventDispatcher(), new NullLogger(), ); } @@ -206,6 +208,7 @@ class { $subscriptionManager, new MetadataSubscriberAccessorRepository([$subscriber]), new CleanupRunner($subscriptionManager, new DefaultCleaner([$cleanupHandler]), new NullLogger()), + new EventDispatcher(), new NullLogger(), ); @@ -239,6 +242,7 @@ public function testRemoveWithCleanupHandlerError(): void $subscriptionManager, new MetadataSubscriberAccessorRepository([]), new CleanupRunner($subscriptionManager, new DefaultCleaner([$cleanupHandler]), new NullLogger()), + new EventDispatcher(), new NullLogger(), ); diff --git a/tests/Unit/Subscription/Engine/Handler/TeardownHandlerTest.php b/tests/Unit/Subscription/Engine/Handler/TeardownHandlerTest.php index 880864f26..1c5a41983 100644 --- a/tests/Unit/Subscription/Engine/Handler/TeardownHandlerTest.php +++ b/tests/Unit/Subscription/Engine/Handler/TeardownHandlerTest.php @@ -24,6 +24,7 @@ use PHPUnit\Framework\TestCase; use Psr\Log\NullLogger; use RuntimeException; +use Symfony\Component\EventDispatcher\EventDispatcher; #[CoversClass(TeardownHandler::class)] final class TeardownHandlerTest extends TestCase @@ -40,6 +41,7 @@ private function createHandler( $subscriptionManager, new MetadataSubscriberAccessorRepository($subscribers), $cleanupRunner ?? new CleanupRunner($subscriptionManager, null, new NullLogger()), + new EventDispatcher(), new NullLogger(), ); } @@ -180,6 +182,7 @@ class { $subscriptionManager, new MetadataSubscriberAccessorRepository([$subscriber]), new CleanupRunner($subscriptionManager, new DefaultCleaner([$cleanupHandler]), new NullLogger()), + new EventDispatcher(), new NullLogger(), ); @@ -213,6 +216,7 @@ public function testTeardownWithCleanupAndWithoutSubscriber(): void $subscriptionManager, new MetadataSubscriberAccessorRepository([]), new CleanupRunner($subscriptionManager, new DefaultCleaner([$cleanupHandler]), new NullLogger()), + new EventDispatcher(), new NullLogger(), ); @@ -247,6 +251,7 @@ public function testTeardownWithCleanupHandlerError(): void $subscriptionManager, new MetadataSubscriberAccessorRepository([]), new CleanupRunner($subscriptionManager, new DefaultCleaner([$cleanupHandler]), new NullLogger()), + new EventDispatcher(), new NullLogger(), ); diff --git a/tests/Unit/Subscription/Engine/Listener/RemoveSubscriptionStreamListenerTest.php b/tests/Unit/Subscription/Engine/Listener/RemoveSubscriptionStreamListenerTest.php new file mode 100644 index 000000000..a1f924328 --- /dev/null +++ b/tests/Unit/Subscription/Engine/Listener/RemoveSubscriptionStreamListenerTest.php @@ -0,0 +1,29 @@ +createMock(Store::class); + $store->expects($this->once()) + ->method('remove') + ->with(new Criteria(new StreamCriterion('subscription_foo'))); + + $listener = new RemoveSubscriptionStreamListener($store); + $listener(new OnSubscriptionRemoved(new Subscription('foo'))); + } +} diff --git a/tests/Unit/Subscription/Subscriber/ArgumentResolver/EventArgumentResolverTest.php b/tests/Unit/Subscription/Subscriber/ArgumentResolver/EventArgumentResolverTest.php index d530a7a94..916b10920 100644 --- a/tests/Unit/Subscription/Subscriber/ArgumentResolver/EventArgumentResolverTest.php +++ b/tests/Unit/Subscription/Subscriber/ArgumentResolver/EventArgumentResolverTest.php @@ -6,7 +6,10 @@ use Patchlevel\EventSourcing\Message\Message; use Patchlevel\EventSourcing\Metadata\Subscriber\ArgumentMetadata; +use Patchlevel\EventSourcing\Metadata\Subscriber\SubscriberMetadata; +use Patchlevel\EventSourcing\Subscription\Subscriber\ArgumentResolver\ArgumentResolverContext; use Patchlevel\EventSourcing\Subscription\Subscriber\ArgumentResolver\EventArgumentResolver; +use Patchlevel\EventSourcing\Subscription\Subscription; use Patchlevel\EventSourcing\Tests\Unit\Fixture\ProfileCreated; use Patchlevel\EventSourcing\Tests\Unit\Fixture\ProfileId; use Patchlevel\EventSourcing\Tests\Unit\Fixture\ProfileVisited; @@ -66,7 +69,7 @@ public function testResolve(): void $event, $resolver->resolve( new ArgumentMetadata('foo', Type::object(ProfileVisited::class)), - $message, + new ArgumentResolverContext($message, new Subscription('foo'), new SubscriberMetadata('foo')), ), ); } diff --git a/tests/Unit/Subscription/Subscriber/ArgumentResolver/EventEmitterResolverTest.php b/tests/Unit/Subscription/Subscriber/ArgumentResolver/EventEmitterResolverTest.php new file mode 100644 index 000000000..f8cdce214 --- /dev/null +++ b/tests/Unit/Subscription/Subscriber/ArgumentResolver/EventEmitterResolverTest.php @@ -0,0 +1,102 @@ +createMock(Store::class)); + + self::assertTrue( + $resolver->support( + new ArgumentMetadata('emitter', Type::object(EventEmitter::class)), + ProfileVisited::class, + ), + ); + + self::assertFalse( + $resolver->support( + new ArgumentMetadata('foo', Type::object(ProfileVisited::class)), + ProfileVisited::class, + ), + ); + } + + public function testResolveDuringRunReturnsDefaultEmitter(): void + { + $resolver = new EventEmitterResolver($this->createMock(Store::class)); + + $emitter = $resolver->resolve( + new ArgumentMetadata('emitter', Type::object(EventEmitter::class)), + $this->context(Status::Active, null), + ); + + self::assertInstanceOf(DefaultEventEmitter::class, $emitter); + } + + public function testResolveDuringBootReturnsNoopByDefault(): void + { + $resolver = new EventEmitterResolver($this->createMock(Store::class)); + + $emitter = $resolver->resolve( + new ArgumentMetadata('emitter', Type::object(EventEmitter::class)), + $this->context(Status::Booting, null), + ); + + self::assertInstanceOf(NoopEventEmitter::class, $emitter); + } + + public function testOverrideTrueEmitsDuringBoot(): void + { + $resolver = new EventEmitterResolver($this->createMock(Store::class)); + + $emitter = $resolver->resolve( + new ArgumentMetadata('emitter', Type::object(EventEmitter::class)), + $this->context(Status::Booting, true), + ); + + self::assertInstanceOf(DefaultEventEmitter::class, $emitter); + } + + public function testOverrideFalseSuppressesDuringRun(): void + { + $resolver = new EventEmitterResolver($this->createMock(Store::class)); + + $emitter = $resolver->resolve( + new ArgumentMetadata('emitter', Type::object(EventEmitter::class)), + $this->context(Status::Active, false), + ); + + self::assertInstanceOf(NoopEventEmitter::class, $emitter); + } + + private function context(Status $status, bool|null $override): ArgumentResolverContext + { + return new ArgumentResolverContext( + new Message(new stdClass()), + new Subscription('foo', status: $status), + new SubscriberMetadata('foo', overrideEventEmitting: $override), + ); + } +} diff --git a/tests/Unit/Subscription/Subscriber/ArgumentResolver/LookupResolverTest.php b/tests/Unit/Subscription/Subscriber/ArgumentResolver/LookupResolverTest.php index 2e62b07b9..425e2a59a 100644 --- a/tests/Unit/Subscription/Subscriber/ArgumentResolver/LookupResolverTest.php +++ b/tests/Unit/Subscription/Subscriber/ArgumentResolver/LookupResolverTest.php @@ -7,10 +7,13 @@ use Patchlevel\EventSourcing\Message\Message; use Patchlevel\EventSourcing\Metadata\Event\EventRegistry; use Patchlevel\EventSourcing\Metadata\Subscriber\ArgumentMetadata; +use Patchlevel\EventSourcing\Metadata\Subscriber\SubscriberMetadata; use Patchlevel\EventSourcing\Store\Header\IndexHeader; use Patchlevel\EventSourcing\Store\Store; use Patchlevel\EventSourcing\Subscription\Lookup\Lookup; +use Patchlevel\EventSourcing\Subscription\Subscriber\ArgumentResolver\ArgumentResolverContext; use Patchlevel\EventSourcing\Subscription\Subscriber\ArgumentResolver\LookupResolver; +use Patchlevel\EventSourcing\Subscription\Subscription; use Patchlevel\EventSourcing\Tests\Unit\Fixture\ProfileCreated; use Patchlevel\EventSourcing\Tests\Unit\Fixture\ProfileId; use Patchlevel\EventSourcing\Tests\Unit\Fixture\ProfileVisited; @@ -58,7 +61,7 @@ public function testResolve(): void $lookup = $resolver->resolve( new ArgumentMetadata('foo', Type::object(Lookup::class)), - $message, + new ArgumentResolverContext($message, new Subscription('foo'), new SubscriberMetadata('foo')), ); self::assertInstanceOf(Lookup::class, $lookup); diff --git a/tests/Unit/Subscription/Subscriber/ArgumentResolver/MessageArgumentResolverTest.php b/tests/Unit/Subscription/Subscriber/ArgumentResolver/MessageArgumentResolverTest.php index 0d76a7c0f..facfbc1c6 100644 --- a/tests/Unit/Subscription/Subscriber/ArgumentResolver/MessageArgumentResolverTest.php +++ b/tests/Unit/Subscription/Subscriber/ArgumentResolver/MessageArgumentResolverTest.php @@ -6,7 +6,10 @@ use Patchlevel\EventSourcing\Message\Message; use Patchlevel\EventSourcing\Metadata\Subscriber\ArgumentMetadata; +use Patchlevel\EventSourcing\Metadata\Subscriber\SubscriberMetadata; +use Patchlevel\EventSourcing\Subscription\Subscriber\ArgumentResolver\ArgumentResolverContext; use Patchlevel\EventSourcing\Subscription\Subscriber\ArgumentResolver\MessageArgumentResolver; +use Patchlevel\EventSourcing\Subscription\Subscription; use PHPUnit\Framework\Attributes\CoversClass; use PHPUnit\Framework\TestCase; use stdClass; @@ -43,7 +46,7 @@ public function testResolve(): void $message, $resolver->resolve( new ArgumentMetadata('foo', Type::object(Message::class)), - $message, + new ArgumentResolverContext($message, new Subscription('foo'), new SubscriberMetadata('foo')), ), ); } diff --git a/tests/Unit/Subscription/Subscriber/ArgumentResolver/RecordedOnArgumentResolverTest.php b/tests/Unit/Subscription/Subscriber/ArgumentResolver/RecordedOnArgumentResolverTest.php index c3697ef0e..2f4e26160 100644 --- a/tests/Unit/Subscription/Subscriber/ArgumentResolver/RecordedOnArgumentResolverTest.php +++ b/tests/Unit/Subscription/Subscriber/ArgumentResolver/RecordedOnArgumentResolverTest.php @@ -7,8 +7,11 @@ use DateTimeImmutable; use Patchlevel\EventSourcing\Message\Message; use Patchlevel\EventSourcing\Metadata\Subscriber\ArgumentMetadata; +use Patchlevel\EventSourcing\Metadata\Subscriber\SubscriberMetadata; use Patchlevel\EventSourcing\Store\Header\RecordedOnHeader; +use Patchlevel\EventSourcing\Subscription\Subscriber\ArgumentResolver\ArgumentResolverContext; use Patchlevel\EventSourcing\Subscription\Subscriber\ArgumentResolver\RecordedOnArgumentResolver; +use Patchlevel\EventSourcing\Subscription\Subscription; use PHPUnit\Framework\Attributes\CoversClass; use PHPUnit\Framework\TestCase; use stdClass; @@ -49,7 +52,7 @@ public function testResolveFromAggregateHeader(): void $date, $resolver->resolve( new ArgumentMetadata('foo', Type::object(DateTimeImmutable::class)), - $message, + new ArgumentResolverContext($message, new Subscription('foo'), new SubscriberMetadata('foo')), ), ); } @@ -65,7 +68,7 @@ public function testResolveFromRecordedOnHeader(): void $date, $resolver->resolve( new ArgumentMetadata('foo', Type::object(DateTimeImmutable::class)), - $message, + new ArgumentResolverContext($message, new Subscription('foo'), new SubscriberMetadata('foo')), ), ); } diff --git a/tests/Unit/Subscription/Subscriber/EventEmitter/DefaultEventEmitterTest.php b/tests/Unit/Subscription/Subscriber/EventEmitter/DefaultEventEmitterTest.php new file mode 100644 index 000000000..110b5ae04 --- /dev/null +++ b/tests/Unit/Subscription/Subscriber/EventEmitter/DefaultEventEmitterTest.php @@ -0,0 +1,54 @@ +emit([new stdClass(), new stdClass()]); + + self::assertSame(['subscription_foo'], $store->streams()); + self::assertSame(2, $store->count(new Criteria(new StreamCriterion('subscription_foo')))); + } + + public function testLinkToWritesToGivenStream(): void + { + $store = new InMemoryStore(); + $emitter = new DefaultEventEmitter($store, 'subscription_foo'); + + $emitter->linkTo('other_stream', [new stdClass()]); + + self::assertSame(['other_stream'], $store->streams()); + + $message = $store->load(new Criteria(new StreamCriterion('other_stream')))->current(); + + self::assertNotNull($message); + self::assertSame('other_stream', $message->header(StreamNameHeader::class)->streamName); + } + + public function testEmitWithoutEventsDoesNothing(): void + { + $store = new InMemoryStore(); + $emitter = new DefaultEventEmitter($store, 'subscription_foo'); + + $emitter->emit([]); + + self::assertSame([], $store->streams()); + } +} diff --git a/tests/Unit/Subscription/Subscriber/EventEmitter/NoopEventEmitterTest.php b/tests/Unit/Subscription/Subscriber/EventEmitter/NoopEventEmitterTest.php new file mode 100644 index 000000000..7cb295671 --- /dev/null +++ b/tests/Unit/Subscription/Subscriber/EventEmitter/NoopEventEmitterTest.php @@ -0,0 +1,24 @@ +emit([new stdClass()]); + $emitter->linkTo('foo', [new stdClass()]); + + $this->expectNotToPerformAssertions(); + } +} diff --git a/tests/Unit/Subscription/Subscriber/MetadataSubscriberAccessorRepositoryTest.php b/tests/Unit/Subscription/Subscriber/MetadataSubscriberAccessorRepositoryTest.php index 757a10362..3c27b8dec 100644 --- a/tests/Unit/Subscription/Subscriber/MetadataSubscriberAccessorRepositoryTest.php +++ b/tests/Unit/Subscription/Subscriber/MetadataSubscriberAccessorRepositoryTest.php @@ -6,7 +6,6 @@ use ArrayIterator; use Patchlevel\EventSourcing\Attribute\Subscriber; -use Patchlevel\EventSourcing\Message\Message; use Patchlevel\EventSourcing\Metadata\Subscriber\ArgumentMetadata; use Patchlevel\EventSourcing\Metadata\Subscriber\AttributeSubscriberMetadataFactory; use Patchlevel\EventSourcing\Subscription\RunMode; @@ -38,7 +37,7 @@ class { $metadataFactory = new AttributeSubscriberMetadataFactory(); $customResolver = new class implements ArgumentResolver\ArgumentResolver { - public function resolve(ArgumentMetadata $argument, Message $message): mixed + public function resolve(ArgumentMetadata $argument, ArgumentResolver\ArgumentResolverContext $context): mixed { return null; } @@ -73,7 +72,7 @@ public function support(ArgumentMetadata $argument, string $eventClass): bool public function testArgumentResolversCanBeArraysAndIterators(): void { $customResolver = new class implements ArgumentResolver\ArgumentResolver { - public function resolve(ArgumentMetadata $argument, Message $message): mixed + public function resolve(ArgumentMetadata $argument, ArgumentResolver\ArgumentResolverContext $context): mixed { return null; } diff --git a/tests/Unit/Subscription/Subscriber/MetadataSubscriberAccessorTest.php b/tests/Unit/Subscription/Subscriber/MetadataSubscriberAccessorTest.php index 2e162417a..4ce898a2a 100644 --- a/tests/Unit/Subscription/Subscriber/MetadataSubscriberAccessorTest.php +++ b/tests/Unit/Subscription/Subscriber/MetadataSubscriberAccessorTest.php @@ -15,6 +15,7 @@ use Patchlevel\EventSourcing\Subscription\Subscriber\ArgumentResolver\MessageArgumentResolver; use Patchlevel\EventSourcing\Subscription\Subscriber\MetadataSubscriberAccessor; use Patchlevel\EventSourcing\Subscription\Subscriber\NoSuitableResolver; +use Patchlevel\EventSourcing\Subscription\Subscription; use Patchlevel\EventSourcing\Tests\Unit\Fixture\ProfileId; use Patchlevel\EventSourcing\Tests\Unit\Fixture\ProfileVisited; use PHPUnit\Framework\Attributes\CoversClass; @@ -50,7 +51,7 @@ public function onProfileVisited(Message $message): void $message = new Message(new ProfileVisited(ProfileId::fromString('1'))); - $result[0]($message); + $result[0]($message, new Subscription('profile')); self::assertSame($message, $subscriber->message); } @@ -82,7 +83,7 @@ public function on(Message $message): void $message = new Message(new ProfileVisited(ProfileId::fromString('1'))); - $result[0]($message); + $result[0]($message, new Subscription('profile')); self::assertSame($message, $subscriber->message); } @@ -136,7 +137,7 @@ public function on(Message $message): void $message = new Message(new ProfileVisited(ProfileId::fromString('1'))); - $result[0]($message); + $result[0]($message, new Subscription('profile')); self::assertSame($message, $subscriber->message); } From bdbcc7fc2b98fdf47d6c3976b996c4d7ff28a981 Mon Sep 17 00:00:00 2001 From: David Badura Date: Tue, 16 Jun 2026 10:54:32 +0200 Subject: [PATCH 2/7] Drop SubscriptionStream helper, inline stream name The default stream name (subscription_) is now built directly in the EventEmitterResolver and the RemoveSubscriptionStreamListener. Also rename the DefaultEventEmitter property to defaultStream. --- .../Listener/RemoveSubscriptionStreamListener.php | 3 +-- .../ArgumentResolver/EventEmitterResolver.php | 3 +-- .../EventEmitter/DefaultEventEmitter.php | 4 ++-- .../EventEmitter/SubscriptionStream.php | 15 --------------- 4 files changed, 4 insertions(+), 21 deletions(-) delete mode 100644 src/Subscription/Subscriber/EventEmitter/SubscriptionStream.php diff --git a/src/Subscription/Engine/Listener/RemoveSubscriptionStreamListener.php b/src/Subscription/Engine/Listener/RemoveSubscriptionStreamListener.php index d5eaa4baf..161b8faea 100644 --- a/src/Subscription/Engine/Listener/RemoveSubscriptionStreamListener.php +++ b/src/Subscription/Engine/Listener/RemoveSubscriptionStreamListener.php @@ -8,7 +8,6 @@ use Patchlevel\EventSourcing\Store\Criteria\StreamCriterion; use Patchlevel\EventSourcing\Store\Store; use Patchlevel\EventSourcing\Subscription\Engine\Event\OnSubscriptionRemoved; -use Patchlevel\EventSourcing\Subscription\Subscriber\EventEmitter\SubscriptionStream; use Psr\Log\LoggerInterface; use function sprintf; @@ -24,7 +23,7 @@ public function __construct( public function __invoke(OnSubscriptionRemoved $event): void { - $streamName = SubscriptionStream::name($event->subscription->id()); + $streamName = 'subscription_' . $event->subscription->id(); $this->store->remove( new Criteria( diff --git a/src/Subscription/Subscriber/ArgumentResolver/EventEmitterResolver.php b/src/Subscription/Subscriber/ArgumentResolver/EventEmitterResolver.php index 483a52985..863c9121d 100644 --- a/src/Subscription/Subscriber/ArgumentResolver/EventEmitterResolver.php +++ b/src/Subscription/Subscriber/ArgumentResolver/EventEmitterResolver.php @@ -9,7 +9,6 @@ use Patchlevel\EventSourcing\Subscription\Subscriber\EventEmitter\DefaultEventEmitter; use Patchlevel\EventSourcing\Subscription\Subscriber\EventEmitter\EventEmitter; use Patchlevel\EventSourcing\Subscription\Subscriber\EventEmitter\NoopEventEmitter; -use Patchlevel\EventSourcing\Subscription\Subscriber\EventEmitter\SubscriptionStream; final class EventEmitterResolver implements ArgumentResolver { @@ -35,7 +34,7 @@ public function resolve(ArgumentMetadata $argument, ArgumentResolverContext $con return new DefaultEventEmitter( $this->store, - SubscriptionStream::name($context->subscription->id()), + 'subscription_' . $context->subscription->id(), ); } diff --git a/src/Subscription/Subscriber/EventEmitter/DefaultEventEmitter.php b/src/Subscription/Subscriber/EventEmitter/DefaultEventEmitter.php index a6a30ef5c..e1103ad90 100644 --- a/src/Subscription/Subscriber/EventEmitter/DefaultEventEmitter.php +++ b/src/Subscription/Subscriber/EventEmitter/DefaultEventEmitter.php @@ -12,14 +12,14 @@ final class DefaultEventEmitter implements EventEmitter { public function __construct( private readonly Store $store, - private readonly string $subscriptionStream, + private readonly string $defaultStream, ) { } /** @param list $events */ public function emit(array $events): void { - $this->linkTo($this->subscriptionStream, $events); + $this->linkTo($this->defaultStream, $events); } /** @param list $events */ diff --git a/src/Subscription/Subscriber/EventEmitter/SubscriptionStream.php b/src/Subscription/Subscriber/EventEmitter/SubscriptionStream.php deleted file mode 100644 index 37c7bfe7e..000000000 --- a/src/Subscription/Subscriber/EventEmitter/SubscriptionStream.php +++ /dev/null @@ -1,15 +0,0 @@ - Date: Tue, 16 Jun 2026 11:41:24 +0200 Subject: [PATCH 3/7] Split OverrideEventEmitting into two attributes Replace the OverrideEventEmitting(bool) attribute with two dedicated marker attributes: EnableEventEmittingDuringBoot (also emit while booting) and DisableEventEmitting (never emit). The subscriber metadata now carries two booleans instead of a nullable one. --- docs/subscription.md | 10 +++---- phpstan-baseline.neon | 2 +- ...tEmitting.php => DisableEventEmitting.php} | 6 +---- .../EnableEventEmittingDuringBoot.php | 12 +++++++++ .../AttributeSubscriberMetadataFactory.php | 17 +++--------- .../Subscriber/SubscriberMetadata.php | 3 ++- .../ArgumentResolver/EventEmitterResolver.php | 6 ++--- ...AttributeSubscriberMetadataFactoryTest.php | 27 +++++++++++++++---- .../EventEmitterResolverTest.php | 25 ++++++++++------- 9 files changed, 66 insertions(+), 42 deletions(-) rename src/Attribute/{OverrideEventEmitting.php => DisableEventEmitting.php} (53%) create mode 100644 src/Attribute/EnableEventEmittingDuringBoot.php diff --git a/docs/subscription.md b/docs/subscription.md index d107c73c1..5451aab58 100644 --- a/docs/subscription.md +++ b/docs/subscription.md @@ -383,23 +383,23 @@ $engine = new DefaultSubscriptionEngine( Emitting events while a subscription is *booting* would create duplicates on every replay, so by default events are only emitted during `run` and the emitter is a noop during `boot`. You can change -this per subscriber with the `OverrideEventEmitting` attribute: +this per subscriber with two attributes: ```php -use Patchlevel\EventSourcing\Attribute\OverrideEventEmitting; +use Patchlevel\EventSourcing\Attribute\EnableEventEmittingDuringBoot; use Patchlevel\EventSourcing\Attribute\Subscriber; use Patchlevel\EventSourcing\Subscription\RunMode; #[Subscriber('order_projection', RunMode::FromBeginning)] -#[OverrideEventEmitting(true)] // also emit during boot +#[EnableEventEmittingDuringBoot] final class OrderProjection { // ... } ``` -* `OverrideEventEmitting(true)` — events are also emitted during `boot`. -* `OverrideEventEmitting(false)` — events are never emitted, not even during `run`. +* `EnableEventEmittingDuringBoot` — events are also emitted during `boot`. +* `DisableEventEmitting` — events are never emitted, not even during `run`. :::info When a subscription is removed, its `subscription_` stream is removed from the store as diff --git a/phpstan-baseline.neon b/phpstan-baseline.neon index e6ef23b53..6c4abf6bf 100644 --- a/phpstan-baseline.neon +++ b/phpstan-baseline.neon @@ -589,7 +589,7 @@ parameters: path: tests/Unit/Message/Translator/ReplaceEventTranslatorTest.php - - message: '#^Method class@anonymous/tests/Unit/Metadata/Subscriber/AttributeSubscriberMetadataFactoryTest\.php\:249\:\:profileVisited\(\) has parameter \$message with no type specified\.$#' + message: '#^Method class@anonymous/tests/Unit/Metadata/Subscriber/AttributeSubscriberMetadataFactoryTest\.php\:266\:\:profileVisited\(\) has parameter \$message with no type specified\.$#' identifier: missingType.parameter count: 1 path: tests/Unit/Metadata/Subscriber/AttributeSubscriberMetadataFactoryTest.php diff --git a/src/Attribute/OverrideEventEmitting.php b/src/Attribute/DisableEventEmitting.php similarity index 53% rename from src/Attribute/OverrideEventEmitting.php rename to src/Attribute/DisableEventEmitting.php index 5c6e52f45..529b62e8f 100644 --- a/src/Attribute/OverrideEventEmitting.php +++ b/src/Attribute/DisableEventEmitting.php @@ -7,10 +7,6 @@ use Attribute; #[Attribute(Attribute::TARGET_CLASS)] -final class OverrideEventEmitting +final class DisableEventEmitting { - public function __construct( - public readonly bool $enabled, - ) { - } } diff --git a/src/Attribute/EnableEventEmittingDuringBoot.php b/src/Attribute/EnableEventEmittingDuringBoot.php new file mode 100644 index 000000000..f15f97f79 --- /dev/null +++ b/src/Attribute/EnableEventEmittingDuringBoot.php @@ -0,0 +1,12 @@ +retryStrategy($reflector), $cleanupMethod, - $this->overrideEventEmitting($reflector), + $reflector->getAttributes(EnableEventEmittingDuringBoot::class) !== [], + $reflector->getAttributes(DisableEventEmitting::class) !== [], ); $this->subscriberMetadata[$subscriber] = $metadata; @@ -204,15 +206,4 @@ private function retryStrategy(ReflectionClass $reflector): string|null return $instance->name; } - - private function overrideEventEmitting(ReflectionClass $reflector): bool|null - { - $attributes = $reflector->getAttributes(OverrideEventEmitting::class); - - if ($attributes === []) { - return null; - } - - return $attributes[0]->newInstance()->enabled; - } } diff --git a/src/Metadata/Subscriber/SubscriberMetadata.php b/src/Metadata/Subscriber/SubscriberMetadata.php index 69c02ab99..893aedfb1 100644 --- a/src/Metadata/Subscriber/SubscriberMetadata.php +++ b/src/Metadata/Subscriber/SubscriberMetadata.php @@ -20,7 +20,8 @@ public function __construct( public readonly string|null $failedMethod = null, public readonly string|null $retryStrategy = null, public readonly string|null $cleanupMethod = null, - public readonly bool|null $overrideEventEmitting = null, + public readonly bool $enableEventEmittingDuringBoot = false, + public readonly bool $disableEventEmitting = false, ) { } } diff --git a/src/Subscription/Subscriber/ArgumentResolver/EventEmitterResolver.php b/src/Subscription/Subscriber/ArgumentResolver/EventEmitterResolver.php index 863c9121d..1050248ea 100644 --- a/src/Subscription/Subscriber/ArgumentResolver/EventEmitterResolver.php +++ b/src/Subscription/Subscriber/ArgumentResolver/EventEmitterResolver.php @@ -19,11 +19,11 @@ public function __construct( public function resolve(ArgumentMetadata $argument, ArgumentResolverContext $context): EventEmitter { - $override = $context->subscriber->overrideEventEmitting; + $subscriber = $context->subscriber; $enabled = match (true) { - $override === false => false, - $override === true => true, + $subscriber->disableEventEmitting => false, + $subscriber->enableEventEmittingDuringBoot => true, // by default events are only emitted during run, not while booting default => $context->subscription->isActive(), }; diff --git a/tests/Unit/Metadata/Subscriber/AttributeSubscriberMetadataFactoryTest.php b/tests/Unit/Metadata/Subscriber/AttributeSubscriberMetadataFactoryTest.php index 7d95731be..a7d239f4a 100644 --- a/tests/Unit/Metadata/Subscriber/AttributeSubscriberMetadataFactoryTest.php +++ b/tests/Unit/Metadata/Subscriber/AttributeSubscriberMetadataFactoryTest.php @@ -4,7 +4,8 @@ namespace Patchlevel\EventSourcing\Tests\Unit\Metadata\Subscriber; -use Patchlevel\EventSourcing\Attribute\OverrideEventEmitting; +use Patchlevel\EventSourcing\Attribute\DisableEventEmitting; +use Patchlevel\EventSourcing\Attribute\EnableEventEmittingDuringBoot; use Patchlevel\EventSourcing\Attribute\Processor; use Patchlevel\EventSourcing\Attribute\Projector; use Patchlevel\EventSourcing\Attribute\Setup; @@ -54,20 +55,36 @@ class { self::assertNull($metadata->setupMethod); self::assertNull($metadata->teardownMethod); self::assertSame('foo', $metadata->id); - self::assertNull($metadata->overrideEventEmitting); + self::assertFalse($metadata->enableEventEmittingDuringBoot); + self::assertFalse($metadata->disableEventEmitting); } - public function testOverrideEventEmitting(): void + public function testEnableEventEmittingDuringBoot(): void { $subscriber = new #[Subscriber('foo', RunMode::FromBeginning)] - #[OverrideEventEmitting(true)] + #[EnableEventEmittingDuringBoot] class { }; $metadataFactory = new AttributeSubscriberMetadataFactory(); $metadata = $metadataFactory->metadata($subscriber::class); - self::assertTrue($metadata->overrideEventEmitting); + self::assertTrue($metadata->enableEventEmittingDuringBoot); + self::assertFalse($metadata->disableEventEmitting); + } + + public function testDisableEventEmitting(): void + { + $subscriber = new #[Subscriber('foo', RunMode::FromBeginning)] + #[DisableEventEmitting] + class { + }; + + $metadataFactory = new AttributeSubscriberMetadataFactory(); + $metadata = $metadataFactory->metadata($subscriber::class); + + self::assertTrue($metadata->disableEventEmitting); + self::assertFalse($metadata->enableEventEmittingDuringBoot); } public function testProjector(): void diff --git a/tests/Unit/Subscription/Subscriber/ArgumentResolver/EventEmitterResolverTest.php b/tests/Unit/Subscription/Subscriber/ArgumentResolver/EventEmitterResolverTest.php index f8cdce214..013b8fd1d 100644 --- a/tests/Unit/Subscription/Subscriber/ArgumentResolver/EventEmitterResolverTest.php +++ b/tests/Unit/Subscription/Subscriber/ArgumentResolver/EventEmitterResolverTest.php @@ -49,7 +49,7 @@ public function testResolveDuringRunReturnsDefaultEmitter(): void $emitter = $resolver->resolve( new ArgumentMetadata('emitter', Type::object(EventEmitter::class)), - $this->context(Status::Active, null), + $this->context(Status::Active), ); self::assertInstanceOf(DefaultEventEmitter::class, $emitter); @@ -61,42 +61,49 @@ public function testResolveDuringBootReturnsNoopByDefault(): void $emitter = $resolver->resolve( new ArgumentMetadata('emitter', Type::object(EventEmitter::class)), - $this->context(Status::Booting, null), + $this->context(Status::Booting), ); self::assertInstanceOf(NoopEventEmitter::class, $emitter); } - public function testOverrideTrueEmitsDuringBoot(): void + public function testEnableEventEmittingDuringBoot(): void { $resolver = new EventEmitterResolver($this->createMock(Store::class)); $emitter = $resolver->resolve( new ArgumentMetadata('emitter', Type::object(EventEmitter::class)), - $this->context(Status::Booting, true), + $this->context(Status::Booting, enableEventEmittingDuringBoot: true), ); self::assertInstanceOf(DefaultEventEmitter::class, $emitter); } - public function testOverrideFalseSuppressesDuringRun(): void + public function testDisableEventEmitting(): void { $resolver = new EventEmitterResolver($this->createMock(Store::class)); $emitter = $resolver->resolve( new ArgumentMetadata('emitter', Type::object(EventEmitter::class)), - $this->context(Status::Active, false), + $this->context(Status::Active, disableEventEmitting: true), ); self::assertInstanceOf(NoopEventEmitter::class, $emitter); } - private function context(Status $status, bool|null $override): ArgumentResolverContext - { + private function context( + Status $status, + bool $enableEventEmittingDuringBoot = false, + bool $disableEventEmitting = false, + ): ArgumentResolverContext { return new ArgumentResolverContext( new Message(new stdClass()), new Subscription('foo', status: $status), - new SubscriberMetadata('foo', overrideEventEmitting: $override), + new SubscriberMetadata( + 'foo', + enableEventEmittingDuringBoot: $enableEventEmittingDuringBoot, + disableEventEmitting: $disableEventEmitting, + ), ); } } From 820442e9d7db4558d95f8a3ebe919b47d97ebd76 Mon Sep 17 00:00:00 2001 From: David Badura Date: Tue, 16 Jun 2026 11:44:16 +0200 Subject: [PATCH 4/7] Rename DefaultEventEmitter to StoreEventEmitter --- .../ArgumentResolver/EventEmitterResolver.php | 4 ++-- ...DefaultEventEmitter.php => StoreEventEmitter.php} | 2 +- .../ArgumentResolver/EventEmitterResolverTest.php | 6 +++--- ...ventEmitterTest.php => StoreEventEmitterTest.php} | 12 ++++++------ 4 files changed, 12 insertions(+), 12 deletions(-) rename src/Subscription/Subscriber/EventEmitter/{DefaultEventEmitter.php => StoreEventEmitter.php} (94%) rename tests/Unit/Subscription/Subscriber/EventEmitter/{DefaultEventEmitterTest.php => StoreEventEmitterTest.php} (81%) diff --git a/src/Subscription/Subscriber/ArgumentResolver/EventEmitterResolver.php b/src/Subscription/Subscriber/ArgumentResolver/EventEmitterResolver.php index 1050248ea..018fdd341 100644 --- a/src/Subscription/Subscriber/ArgumentResolver/EventEmitterResolver.php +++ b/src/Subscription/Subscriber/ArgumentResolver/EventEmitterResolver.php @@ -6,9 +6,9 @@ use Patchlevel\EventSourcing\Metadata\Subscriber\ArgumentMetadata; use Patchlevel\EventSourcing\Store\Store; -use Patchlevel\EventSourcing\Subscription\Subscriber\EventEmitter\DefaultEventEmitter; use Patchlevel\EventSourcing\Subscription\Subscriber\EventEmitter\EventEmitter; use Patchlevel\EventSourcing\Subscription\Subscriber\EventEmitter\NoopEventEmitter; +use Patchlevel\EventSourcing\Subscription\Subscriber\EventEmitter\StoreEventEmitter; final class EventEmitterResolver implements ArgumentResolver { @@ -32,7 +32,7 @@ public function resolve(ArgumentMetadata $argument, ArgumentResolverContext $con return new NoopEventEmitter(); } - return new DefaultEventEmitter( + return new StoreEventEmitter( $this->store, 'subscription_' . $context->subscription->id(), ); diff --git a/src/Subscription/Subscriber/EventEmitter/DefaultEventEmitter.php b/src/Subscription/Subscriber/EventEmitter/StoreEventEmitter.php similarity index 94% rename from src/Subscription/Subscriber/EventEmitter/DefaultEventEmitter.php rename to src/Subscription/Subscriber/EventEmitter/StoreEventEmitter.php index e1103ad90..3af8d551f 100644 --- a/src/Subscription/Subscriber/EventEmitter/DefaultEventEmitter.php +++ b/src/Subscription/Subscriber/EventEmitter/StoreEventEmitter.php @@ -8,7 +8,7 @@ use Patchlevel\EventSourcing\Store\Header\StreamNameHeader; use Patchlevel\EventSourcing\Store\Store; -final class DefaultEventEmitter implements EventEmitter +final class StoreEventEmitter implements EventEmitter { public function __construct( private readonly Store $store, diff --git a/tests/Unit/Subscription/Subscriber/ArgumentResolver/EventEmitterResolverTest.php b/tests/Unit/Subscription/Subscriber/ArgumentResolver/EventEmitterResolverTest.php index 013b8fd1d..bd48543db 100644 --- a/tests/Unit/Subscription/Subscriber/ArgumentResolver/EventEmitterResolverTest.php +++ b/tests/Unit/Subscription/Subscriber/ArgumentResolver/EventEmitterResolverTest.php @@ -11,9 +11,9 @@ use Patchlevel\EventSourcing\Subscription\Status; use Patchlevel\EventSourcing\Subscription\Subscriber\ArgumentResolver\ArgumentResolverContext; use Patchlevel\EventSourcing\Subscription\Subscriber\ArgumentResolver\EventEmitterResolver; -use Patchlevel\EventSourcing\Subscription\Subscriber\EventEmitter\DefaultEventEmitter; use Patchlevel\EventSourcing\Subscription\Subscriber\EventEmitter\EventEmitter; use Patchlevel\EventSourcing\Subscription\Subscriber\EventEmitter\NoopEventEmitter; +use Patchlevel\EventSourcing\Subscription\Subscriber\EventEmitter\StoreEventEmitter; use Patchlevel\EventSourcing\Subscription\Subscription; use Patchlevel\EventSourcing\Tests\Unit\Fixture\ProfileVisited; use PHPUnit\Framework\Attributes\CoversClass; @@ -52,7 +52,7 @@ public function testResolveDuringRunReturnsDefaultEmitter(): void $this->context(Status::Active), ); - self::assertInstanceOf(DefaultEventEmitter::class, $emitter); + self::assertInstanceOf(StoreEventEmitter::class, $emitter); } public function testResolveDuringBootReturnsNoopByDefault(): void @@ -76,7 +76,7 @@ public function testEnableEventEmittingDuringBoot(): void $this->context(Status::Booting, enableEventEmittingDuringBoot: true), ); - self::assertInstanceOf(DefaultEventEmitter::class, $emitter); + self::assertInstanceOf(StoreEventEmitter::class, $emitter); } public function testDisableEventEmitting(): void diff --git a/tests/Unit/Subscription/Subscriber/EventEmitter/DefaultEventEmitterTest.php b/tests/Unit/Subscription/Subscriber/EventEmitter/StoreEventEmitterTest.php similarity index 81% rename from tests/Unit/Subscription/Subscriber/EventEmitter/DefaultEventEmitterTest.php rename to tests/Unit/Subscription/Subscriber/EventEmitter/StoreEventEmitterTest.php index 110b5ae04..742ad5f10 100644 --- a/tests/Unit/Subscription/Subscriber/EventEmitter/DefaultEventEmitterTest.php +++ b/tests/Unit/Subscription/Subscriber/EventEmitter/StoreEventEmitterTest.php @@ -8,18 +8,18 @@ use Patchlevel\EventSourcing\Store\Criteria\StreamCriterion; use Patchlevel\EventSourcing\Store\Header\StreamNameHeader; use Patchlevel\EventSourcing\Store\InMemoryStore; -use Patchlevel\EventSourcing\Subscription\Subscriber\EventEmitter\DefaultEventEmitter; +use Patchlevel\EventSourcing\Subscription\Subscriber\EventEmitter\StoreEventEmitter; use PHPUnit\Framework\Attributes\CoversClass; use PHPUnit\Framework\TestCase; use stdClass; -#[CoversClass(DefaultEventEmitter::class)] -final class DefaultEventEmitterTest extends TestCase +#[CoversClass(StoreEventEmitter::class)] +final class StoreEventEmitterTest extends TestCase { public function testEmitWritesToSubscriptionStream(): void { $store = new InMemoryStore(); - $emitter = new DefaultEventEmitter($store, 'subscription_foo'); + $emitter = new StoreEventEmitter($store, 'subscription_foo'); $emitter->emit([new stdClass(), new stdClass()]); @@ -30,7 +30,7 @@ public function testEmitWritesToSubscriptionStream(): void public function testLinkToWritesToGivenStream(): void { $store = new InMemoryStore(); - $emitter = new DefaultEventEmitter($store, 'subscription_foo'); + $emitter = new StoreEventEmitter($store, 'subscription_foo'); $emitter->linkTo('other_stream', [new stdClass()]); @@ -45,7 +45,7 @@ public function testLinkToWritesToGivenStream(): void public function testEmitWithoutEventsDoesNothing(): void { $store = new InMemoryStore(); - $emitter = new DefaultEventEmitter($store, 'subscription_foo'); + $emitter = new StoreEventEmitter($store, 'subscription_foo'); $emitter->emit([]); From bb3b0c1f80f6e16fba17fce679d2d67672bb0463 Mon Sep 17 00:00:00 2001 From: David Badura Date: Tue, 16 Jun 2026 12:00:20 +0200 Subject: [PATCH 5/7] Assert event emitting behaviour to restore mutation coverage The handler tests now record dispatched OnSubscriptionRemoved events and assert them for every removal path (and assert none on the teardown error/no-removal paths). The resolver test now emits through the resolved emitter and asserts the subscription_ target stream, instead of only checking the returned type. --- .../Engine/Handler/RemoveHandlerTest.php | 35 ++++++++++++++++-- .../Engine/Handler/TeardownHandlerTest.php | 37 +++++++++++++++++-- .../EventEmitterResolverTest.php | 16 ++++++++ 3 files changed, 81 insertions(+), 7 deletions(-) diff --git a/tests/Unit/Subscription/Engine/Handler/RemoveHandlerTest.php b/tests/Unit/Subscription/Engine/Handler/RemoveHandlerTest.php index cc6264ecd..d1b3a1ced 100644 --- a/tests/Unit/Subscription/Engine/Handler/RemoveHandlerTest.php +++ b/tests/Unit/Subscription/Engine/Handler/RemoveHandlerTest.php @@ -13,6 +13,7 @@ use Patchlevel\EventSourcing\Subscription\Engine\CleanerNotConfigured; use Patchlevel\EventSourcing\Subscription\Engine\CleanupRunner; use Patchlevel\EventSourcing\Subscription\Engine\Command\Remove as RemoveCommand; +use Patchlevel\EventSourcing\Subscription\Engine\Event\OnSubscriptionRemoved; use Patchlevel\EventSourcing\Subscription\Engine\Handler\RemoveHandler; use Patchlevel\EventSourcing\Subscription\Engine\SubscriptionManager; use Patchlevel\EventSourcing\Subscription\RunMode; @@ -29,6 +30,27 @@ #[CoversClass(RemoveHandler::class)] final class RemoveHandlerTest extends TestCase { + /** @var list */ + private array $removedSubscriptions = []; + + protected function setUp(): void + { + $this->removedSubscriptions = []; + } + + private function recordingDispatcher(): EventDispatcher + { + $dispatcher = new EventDispatcher(); + $dispatcher->addListener( + OnSubscriptionRemoved::class, + function (OnSubscriptionRemoved $event): void { + $this->removedSubscriptions[] = $event->subscription; + }, + ); + + return $dispatcher; + } + /** @param list $subscribers */ private function createHandler( DummySubscriptionStore $store, @@ -41,7 +63,7 @@ private function createHandler( $subscriptionManager, new MetadataSubscriberAccessorRepository($subscribers), $cleanupRunner ?? new CleanupRunner($subscriptionManager, null, new NullLogger()), - new EventDispatcher(), + $this->recordingDispatcher(), new NullLogger(), ); } @@ -70,6 +92,7 @@ public function drop(): void $store->assertNoUpdated(); $store->assertRemoved($subscription); self::assertTrue($subscriber->dropped); + self::assertSame([$subscription], $this->removedSubscriptions); } public function testRemoveWithoutDropMethod(): void @@ -88,6 +111,7 @@ class { self::assertEquals([], $result->errors); $store->assertNoUpdated(); $store->assertRemoved($subscription); + self::assertSame([$subscription], $this->removedSubscriptions); } public function testRemoveWithSubscriberAndError(): void @@ -117,6 +141,7 @@ public function drop(): void $store->assertNoUpdated(); $store->assertRemoved($subscription); + self::assertSame([$subscription], $this->removedSubscriptions); } public function testRemoveNewSubscriber(): void @@ -143,6 +168,7 @@ public function drop(): void $store->assertNoUpdated(); $store->assertRemoved($subscription); self::assertFalse($subscriber->dropped); + self::assertSame([$subscription], $this->removedSubscriptions); } public function testRemoveWithoutSubscriber(): void @@ -158,6 +184,7 @@ public function testRemoveWithoutSubscriber(): void self::assertEquals([], $result->errors); $store->assertNoUpdated(); $store->assertRemoved($subscription); + self::assertSame([$subscription], $this->removedSubscriptions); } public function testRemoveWithCleanupAndWithoutCleaner(): void @@ -208,7 +235,7 @@ class { $subscriptionManager, new MetadataSubscriberAccessorRepository([$subscriber]), new CleanupRunner($subscriptionManager, new DefaultCleaner([$cleanupHandler]), new NullLogger()), - new EventDispatcher(), + $this->recordingDispatcher(), new NullLogger(), ); @@ -217,6 +244,7 @@ class { self::assertEquals([], $result->errors); $store->assertNoUpdated(); $store->assertRemoved($subscription); + self::assertSame([$subscription], $this->removedSubscriptions); } public function testRemoveWithCleanupHandlerError(): void @@ -242,7 +270,7 @@ public function testRemoveWithCleanupHandlerError(): void $subscriptionManager, new MetadataSubscriberAccessorRepository([]), new CleanupRunner($subscriptionManager, new DefaultCleaner([$cleanupHandler]), new NullLogger()), - new EventDispatcher(), + $this->recordingDispatcher(), new NullLogger(), ); @@ -255,5 +283,6 @@ public function testRemoveWithCleanupHandlerError(): void self::assertInstanceOf(CleanupFailed::class, $error->throwable); $store->assertRemoved($subscription); + self::assertSame([$subscription], $this->removedSubscriptions); } } diff --git a/tests/Unit/Subscription/Engine/Handler/TeardownHandlerTest.php b/tests/Unit/Subscription/Engine/Handler/TeardownHandlerTest.php index 1c5a41983..6c27a3959 100644 --- a/tests/Unit/Subscription/Engine/Handler/TeardownHandlerTest.php +++ b/tests/Unit/Subscription/Engine/Handler/TeardownHandlerTest.php @@ -13,6 +13,7 @@ use Patchlevel\EventSourcing\Subscription\Engine\CleanerNotConfigured; use Patchlevel\EventSourcing\Subscription\Engine\CleanupRunner; use Patchlevel\EventSourcing\Subscription\Engine\Command\Teardown as TeardownCommand; +use Patchlevel\EventSourcing\Subscription\Engine\Event\OnSubscriptionRemoved; use Patchlevel\EventSourcing\Subscription\Engine\Handler\TeardownHandler; use Patchlevel\EventSourcing\Subscription\Engine\SubscriptionManager; use Patchlevel\EventSourcing\Subscription\RunMode; @@ -29,6 +30,27 @@ #[CoversClass(TeardownHandler::class)] final class TeardownHandlerTest extends TestCase { + /** @var list */ + private array $removedSubscriptions = []; + + protected function setUp(): void + { + $this->removedSubscriptions = []; + } + + private function recordingDispatcher(): EventDispatcher + { + $dispatcher = new EventDispatcher(); + $dispatcher->addListener( + OnSubscriptionRemoved::class, + function (OnSubscriptionRemoved $event): void { + $this->removedSubscriptions[] = $event->subscription; + }, + ); + + return $dispatcher; + } + /** @param list $subscribers */ private function createHandler( DummySubscriptionStore $store, @@ -41,7 +63,7 @@ private function createHandler( $subscriptionManager, new MetadataSubscriberAccessorRepository($subscribers), $cleanupRunner ?? new CleanupRunner($subscriptionManager, null, new NullLogger()), - new EventDispatcher(), + $this->recordingDispatcher(), new NullLogger(), ); } @@ -62,6 +84,7 @@ class { self::assertEquals([], $result->errors); $store->assertNoUpdated(); $store->assertRemoved($subscription); + self::assertSame([$subscription], $this->removedSubscriptions); } public function testTeardownWithSubscriber(): void @@ -88,6 +111,7 @@ public function drop(): void $store->assertNoUpdated(); $store->assertRemoved($subscription); self::assertTrue($subscriber->dropped); + self::assertSame([$subscription], $this->removedSubscriptions); } public function testTeardownWithSubscriberAndError(): void @@ -117,6 +141,7 @@ public function drop(): void self::assertInstanceOf(RuntimeException::class, $error->throwable); $store->assertNoChanges(); + self::assertSame([], $this->removedSubscriptions); } public function testTeardownWithoutSubscriber(): void @@ -132,6 +157,7 @@ public function testTeardownWithoutSubscriber(): void self::assertEquals([], $result->errors); $store->assertNoChanges(); + self::assertSame([], $this->removedSubscriptions); } public function testTeardownWithCleanupAndWithoutCleaner(): void @@ -182,7 +208,7 @@ class { $subscriptionManager, new MetadataSubscriberAccessorRepository([$subscriber]), new CleanupRunner($subscriptionManager, new DefaultCleaner([$cleanupHandler]), new NullLogger()), - new EventDispatcher(), + $this->recordingDispatcher(), new NullLogger(), ); @@ -191,6 +217,7 @@ class { self::assertEquals([], $result->errors); $store->assertNoUpdated(); $store->assertRemoved($subscription); + self::assertSame([$subscription], $this->removedSubscriptions); } public function testTeardownWithCleanupAndWithoutSubscriber(): void @@ -216,7 +243,7 @@ public function testTeardownWithCleanupAndWithoutSubscriber(): void $subscriptionManager, new MetadataSubscriberAccessorRepository([]), new CleanupRunner($subscriptionManager, new DefaultCleaner([$cleanupHandler]), new NullLogger()), - new EventDispatcher(), + $this->recordingDispatcher(), new NullLogger(), ); @@ -225,6 +252,7 @@ public function testTeardownWithCleanupAndWithoutSubscriber(): void self::assertEquals([], $result->errors); $store->assertNoUpdated(); $store->assertRemoved($subscription); + self::assertSame([$subscription], $this->removedSubscriptions); } public function testTeardownWithCleanupHandlerError(): void @@ -251,7 +279,7 @@ public function testTeardownWithCleanupHandlerError(): void $subscriptionManager, new MetadataSubscriberAccessorRepository([]), new CleanupRunner($subscriptionManager, new DefaultCleaner([$cleanupHandler]), new NullLogger()), - new EventDispatcher(), + $this->recordingDispatcher(), new NullLogger(), ); @@ -264,5 +292,6 @@ public function testTeardownWithCleanupHandlerError(): void self::assertInstanceOf(CleanupFailed::class, $error->throwable); $store->assertNoChanges(); + self::assertSame([], $this->removedSubscriptions); } } diff --git a/tests/Unit/Subscription/Subscriber/ArgumentResolver/EventEmitterResolverTest.php b/tests/Unit/Subscription/Subscriber/ArgumentResolver/EventEmitterResolverTest.php index bd48543db..50bd8504c 100644 --- a/tests/Unit/Subscription/Subscriber/ArgumentResolver/EventEmitterResolverTest.php +++ b/tests/Unit/Subscription/Subscriber/ArgumentResolver/EventEmitterResolverTest.php @@ -7,6 +7,7 @@ use Patchlevel\EventSourcing\Message\Message; use Patchlevel\EventSourcing\Metadata\Subscriber\ArgumentMetadata; use Patchlevel\EventSourcing\Metadata\Subscriber\SubscriberMetadata; +use Patchlevel\EventSourcing\Store\InMemoryStore; use Patchlevel\EventSourcing\Store\Store; use Patchlevel\EventSourcing\Subscription\Status; use Patchlevel\EventSourcing\Subscription\Subscriber\ArgumentResolver\ArgumentResolverContext; @@ -55,6 +56,21 @@ public function testResolveDuringRunReturnsDefaultEmitter(): void self::assertInstanceOf(StoreEventEmitter::class, $emitter); } + public function testResolvedEmitterEmitsToSubscriptionStream(): void + { + $store = new InMemoryStore(); + $resolver = new EventEmitterResolver($store); + + $emitter = $resolver->resolve( + new ArgumentMetadata('emitter', Type::object(EventEmitter::class)), + $this->context(Status::Active), + ); + + $emitter->emit([new stdClass()]); + + self::assertSame(['subscription_foo'], $store->streams()); + } + public function testResolveDuringBootReturnsNoopByDefault(): void { $resolver = new EventEmitterResolver($this->createMock(Store::class)); From 6b665827723f75220e092fe38782ee7ea3f727ef Mon Sep 17 00:00:00 2001 From: David Badura Date: Tue, 16 Jun 2026 12:11:34 +0200 Subject: [PATCH 6/7] Cover remaining mutations on the event emitter diff Add a MessageProcessorTest that drives a real subscribe method through the processor (covers the previously uncovered foreach), assert StoreEventEmitter does not touch the store for empty event lists (mock instead of in-memory, to kill the return-removal mutant), and add a two-subscription teardown test so the continue after a cleanup error is not equivalent to a break. Also assert the SubscriberMetadata event-emitting defaults. --- .../Subscriber/SubscriberMetadataTest.php | 21 ++++++++ .../Engine/Handler/TeardownHandlerTest.php | 36 +++++++++++++ .../Engine/MessageProcessorTest.php | 54 +++++++++++++++++++ .../EventEmitter/StoreEventEmitterTest.php | 9 ++-- 4 files changed, 116 insertions(+), 4 deletions(-) create mode 100644 tests/Unit/Metadata/Subscriber/SubscriberMetadataTest.php create mode 100644 tests/Unit/Subscription/Engine/MessageProcessorTest.php diff --git a/tests/Unit/Metadata/Subscriber/SubscriberMetadataTest.php b/tests/Unit/Metadata/Subscriber/SubscriberMetadataTest.php new file mode 100644 index 000000000..eef513993 --- /dev/null +++ b/tests/Unit/Metadata/Subscriber/SubscriberMetadataTest.php @@ -0,0 +1,21 @@ +enableEventEmittingDuringBoot); + self::assertFalse($metadata->disableEventEmitting); + } +} diff --git a/tests/Unit/Subscription/Engine/Handler/TeardownHandlerTest.php b/tests/Unit/Subscription/Engine/Handler/TeardownHandlerTest.php index 6c27a3959..934e3e7f4 100644 --- a/tests/Unit/Subscription/Engine/Handler/TeardownHandlerTest.php +++ b/tests/Unit/Subscription/Engine/Handler/TeardownHandlerTest.php @@ -294,4 +294,40 @@ public function testTeardownWithCleanupHandlerError(): void $store->assertNoChanges(); self::assertSame([], $this->removedSubscriptions); } + + public function testTeardownContinuesWithNextSubscriptionAfterCleanupError(): void + { + $failingTask = new DropTableTask('failing'); + $passingTask = new DropTableTask('passing'); + + $failing = new Subscription('failing', Subscription::DEFAULT_GROUP, RunMode::FromBeginning, Status::Detached, cleanupTasks: [$failingTask]); + $passing = new Subscription('passing', Subscription::DEFAULT_GROUP, RunMode::FromBeginning, Status::Detached, cleanupTasks: [$passingTask]); + $store = new DummySubscriptionStore([$failing, $passing]); + + $cleanupHandler = $this->createMock(CleanupTaskHandler::class); + $cleanupHandler->method('supports')->willReturn(true); + $cleanupHandler->method('__invoke')->willReturnCallback( + static function (object $task) use ($failingTask): void { + if ($task === $failingTask) { + throw new RuntimeException('ERROR'); + } + }, + ); + + $subscriptionManager = new SubscriptionManager($store); + $handler = new TeardownHandler( + $subscriptionManager, + new MetadataSubscriberAccessorRepository([]), + new CleanupRunner($subscriptionManager, new DefaultCleaner([$cleanupHandler]), new NullLogger()), + $this->recordingDispatcher(), + new NullLogger(), + ); + + $result = $handler(new TeardownCommand()); + + self::assertCount(1, $result->errors); + // the second subscription is still processed after the first one failed + $store->assertRemoved($passing); + self::assertSame([$passing], $this->removedSubscriptions); + } } diff --git a/tests/Unit/Subscription/Engine/MessageProcessorTest.php b/tests/Unit/Subscription/Engine/MessageProcessorTest.php new file mode 100644 index 000000000..9d61a54b7 --- /dev/null +++ b/tests/Unit/Subscription/Engine/MessageProcessorTest.php @@ -0,0 +1,54 @@ +event = $event; + } + }; + + $processor = new MessageProcessor( + new MetadataSubscriberAccessorRepository([$subscriber]), + new EventDispatcher(), + new NullLogger(), + ); + + $event = new ProfileVisited(ProfileId::fromString('1')); + $message = Message::create($event); + $subscription = new Subscription('test', status: Status::Active); + + $error = $processor->process(1, $message, $subscription); + + self::assertNull($error); + self::assertSame($event, $subscriber->event); + self::assertSame(1, $subscription->position()); + } +} diff --git a/tests/Unit/Subscription/Subscriber/EventEmitter/StoreEventEmitterTest.php b/tests/Unit/Subscription/Subscriber/EventEmitter/StoreEventEmitterTest.php index 742ad5f10..2195abc40 100644 --- a/tests/Unit/Subscription/Subscriber/EventEmitter/StoreEventEmitterTest.php +++ b/tests/Unit/Subscription/Subscriber/EventEmitter/StoreEventEmitterTest.php @@ -8,6 +8,7 @@ use Patchlevel\EventSourcing\Store\Criteria\StreamCriterion; use Patchlevel\EventSourcing\Store\Header\StreamNameHeader; use Patchlevel\EventSourcing\Store\InMemoryStore; +use Patchlevel\EventSourcing\Store\Store; use Patchlevel\EventSourcing\Subscription\Subscriber\EventEmitter\StoreEventEmitter; use PHPUnit\Framework\Attributes\CoversClass; use PHPUnit\Framework\TestCase; @@ -42,13 +43,13 @@ public function testLinkToWritesToGivenStream(): void self::assertSame('other_stream', $message->header(StreamNameHeader::class)->streamName); } - public function testEmitWithoutEventsDoesNothing(): void + public function testEmitWithoutEventsDoesNotTouchTheStore(): void { - $store = new InMemoryStore(); + $store = $this->createMock(Store::class); + $store->expects($this->never())->method('save'); + $emitter = new StoreEventEmitter($store, 'subscription_foo'); $emitter->emit([]); - - self::assertSame([], $store->streams()); } } From 9b5ad2b1384d6b2a1859ad4bcb2ecb4296f701f5 Mon Sep 17 00:00:00 2001 From: David Badura Date: Tue, 16 Jun 2026 12:24:33 +0200 Subject: [PATCH 7/7] fix benchmark --- tests/Benchmark/NoopSubscriptionEngineBench.php | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/tests/Benchmark/NoopSubscriptionEngineBench.php b/tests/Benchmark/NoopSubscriptionEngineBench.php index 210c08d3b..b2c4b67fb 100644 --- a/tests/Benchmark/NoopSubscriptionEngineBench.php +++ b/tests/Benchmark/NoopSubscriptionEngineBench.php @@ -4,15 +4,18 @@ namespace Patchlevel\EventSourcing\Tests\Benchmark; -use Patchlevel\EventSourcing\Aggregate\AggregateRootId; +use Patchlevel\EventSourcing\Identifier\Identifier; use Patchlevel\EventSourcing\Metadata\Event\AttributeEventMetadataFactory; use Patchlevel\EventSourcing\Repository\DefaultRepository; use Patchlevel\EventSourcing\Repository\Repository; use Patchlevel\EventSourcing\Schema\ChainDoctrineSchemaConfigurator; use Patchlevel\EventSourcing\Schema\DoctrineSchemaDirector; use Patchlevel\EventSourcing\Serializer\DefaultEventSerializer; -use Patchlevel\EventSourcing\Store\DoctrineDbalStore; use Patchlevel\EventSourcing\Store\Store; +use Patchlevel\EventSourcing\Store\StreamDoctrineDbalStore; +use Patchlevel\EventSourcing\Subscription\Engine\Command\Boot; +use Patchlevel\EventSourcing\Subscription\Engine\Command\Remove; +use Patchlevel\EventSourcing\Subscription\Engine\Command\Setup; use Patchlevel\EventSourcing\Subscription\Engine\DefaultSubscriptionEngine; use Patchlevel\EventSourcing\Subscription\Engine\EventFilteredStoreMessageLoader; use Patchlevel\EventSourcing\Subscription\Engine\SubscriptionEngine; @@ -33,13 +36,13 @@ final class NoopSubscriptionEngineBench private SubscriptionEngine $subscriptionEngine; - private AggregateRootId $id; + private Identifier $id; public function setUp(): void { $connection = DbalManager::createConnection(); - $this->store = new DoctrineDbalStore( + $this->store = new StreamDoctrineDbalStore( $connection, DefaultEventSerializer::createFromPaths([__DIR__ . '/BasicImplementation/Events']), ); @@ -89,8 +92,8 @@ public function setUp(): void #[Bench\Revs(10)] public function benchHandle10000Events(): void { - $this->subscriptionEngine->setup(); - $this->subscriptionEngine->boot(); - $this->subscriptionEngine->remove(); + $this->subscriptionEngine->execute(new Setup()); + $this->subscriptionEngine->execute(new Boot()); + $this->subscriptionEngine->execute(new Remove()); } }