diff --git a/docs/UPGRADE-4.0.md b/docs/UPGRADE-4.0.md index ccb88ebc8..cfdaddb68 100644 --- a/docs/UPGRADE-4.0.md +++ b/docs/UPGRADE-4.0.md @@ -209,6 +209,40 @@ and a `bool $allowsNull` property. Instead it now has a single `Symfony\Componen If you implemented a custom `ArgumentResolver`, adjust it to read the type from the new `Type` object. +### ArgumentResolver + +The `resolve` method of `Patchlevel\EventSourcing\Subscription\Subscriber\ArgumentResolver\ArgumentResolver` +no longer receives the `Message` directly. It now receives an `ArgumentResolverContext` that bundles the +current `message`, `subscription` and `subscriber` metadata. + +Before: + +```php +final class CustomResolver implements ArgumentResolver +{ + public function resolve(ArgumentMetadata $argument, Message $message): mixed + { + return $message->header(CustomHeader::class); + } + + // ... support() +} +``` + +After: + +```php +final class CustomResolver implements ArgumentResolver +{ + public function resolve(ArgumentMetadata $argument, ArgumentResolverContext $context): mixed + { + return $context->message->header(CustomHeader::class); + } + + // ... support() +} +``` + ## Store ### StreamStore diff --git a/docs/subscription.md b/docs/subscription.md index cc461ba19..5451aab58 100644 --- a/docs/subscription.md +++ b/docs/subscription.md @@ -302,10 +302,135 @@ final class DoStuffSubscriber } } ``` +##### Event Emitter Resolver + +A subscriber can not only build a projection or trigger a side effect, it can also emit new events +back into the store. This is useful when one subscription should kick off another one: for example a +projection that, after it has been updated, wants a notification subscription to send a mail. + +If you type-hint an `EventEmitter` argument in a subscribe method, it is injected automatically. +By default the emitted events are written into a dedicated stream named `subscription_`. + +```php +use Patchlevel\EventSourcing\Attribute\Subscribe; +use Patchlevel\EventSourcing\Attribute\Subscriber; +use Patchlevel\EventSourcing\Subscription\RunMode; +use Patchlevel\EventSourcing\Subscription\Subscriber\EventEmitter\EventEmitter; + +#[Subscriber('order_projection', RunMode::FromBeginning)] +final class OrderProjection +{ + #[Subscribe(OrderPlaced::class)] + public function onOrderPlaced(OrderPlaced $event, EventEmitter $eventEmitter): void + { + // update the projection ... + + $eventEmitter->emit([new NotificationRequired($event->orderId)]); + } +} +``` + +The `emit` method writes the events into the subscriber's own `subscription_` stream. +If you want to target a different stream, use `linkTo`: + +```php +$eventEmitter->linkTo('notifications', [new NotificationRequired($event->orderId)]); +``` + +:::info +The emitted events must be registered like any other event so the store can (de)serialize them. +::: + +To configure the resolver, pass an `EventEmitterResolver` to the subscriber accessor repository, +the same way as the [lookup resolver](#lookup-resolver). It needs the event store to append to: + +```php +use Patchlevel\EventSourcing\Subscription\Subscriber\ArgumentResolver\EventEmitterResolver; +use Patchlevel\EventSourcing\Subscription\Subscriber\MetadataSubscriberAccessorRepository; + +$subscriberAccessorRepository = new MetadataSubscriberAccessorRepository( + $subscribers, + argumentResolvers: [ + new EventEmitterResolver($store), + ], +); +``` + +To also clean up the subscription stream when a subscription is removed, register the +`RemoveSubscriptionStreamListener` on the event dispatcher and pass it to the engine: + +```php +use Patchlevel\EventSourcing\Subscription\Engine\DefaultSubscriptionEngine; +use Patchlevel\EventSourcing\Subscription\Engine\Event\OnSubscriptionRemoved; +use Patchlevel\EventSourcing\Subscription\Engine\Listener\RemoveSubscriptionStreamListener; +use Symfony\Component\EventDispatcher\EventDispatcher; + +$eventDispatcher = new EventDispatcher(); +$eventDispatcher->addListener( + OnSubscriptionRemoved::class, + new RemoveSubscriptionStreamListener($store), +); + +$engine = new DefaultSubscriptionEngine( + $messageLoader, + $subscriptionStore, + $subscriberAccessorRepository, + eventDispatcher: $eventDispatcher, +); +``` + +###### When are events emitted + +Emitting events while a subscription is *booting* would create duplicates on every replay, so by +default events are only emitted during `run` and the emitter is a noop during `boot`. You can change +this per subscriber with two attributes: + +```php +use Patchlevel\EventSourcing\Attribute\EnableEventEmittingDuringBoot; +use Patchlevel\EventSourcing\Attribute\Subscriber; +use Patchlevel\EventSourcing\Subscription\RunMode; + +#[Subscriber('order_projection', RunMode::FromBeginning)] +#[EnableEventEmittingDuringBoot] +final class OrderProjection +{ + // ... +} +``` + +* `EnableEventEmittingDuringBoot` — events are also emitted during `boot`. +* `DisableEventEmitting` — events are never emitted, not even during `run`. + +:::info +When a subscription is removed, its `subscription_` stream is removed from the store as +well (as long as the `RemoveSubscriptionStreamListener` is registered). +::: + ##### Custom Resolvers You can provide your own argument resolvers by implementing the `ArgumentResolver` interface. This can be useful for providing direct access to custom headers or other data. +The `resolve` method receives an `ArgumentResolverContext` that gives you access to the current +`message`, the `subscription` and the `subscriber` metadata. + +```php +use Patchlevel\EventSourcing\Metadata\Subscriber\ArgumentMetadata; +use Patchlevel\EventSourcing\Subscription\Subscriber\ArgumentResolver\ArgumentResolver; +use Patchlevel\EventSourcing\Subscription\Subscriber\ArgumentResolver\ArgumentResolverContext; + +final class CustomResolver implements ArgumentResolver +{ + public function resolve(ArgumentMetadata $argument, ArgumentResolverContext $context): mixed + { + return $context->message->header(CustomHeader::class); + } + + public function support(ArgumentMetadata $argument, string $eventClass): bool + { + return $argument->type->isIdentifiedBy(CustomHeader::class); + } +} +``` ### Setup diff --git a/phpstan-baseline.neon b/phpstan-baseline.neon index bf453ba17..6c4abf6bf 100644 --- a/phpstan-baseline.neon +++ b/phpstan-baseline.neon @@ -589,7 +589,7 @@ parameters: path: tests/Unit/Message/Translator/ReplaceEventTranslatorTest.php - - message: '#^Method class@anonymous/tests/Unit/Metadata/Subscriber/AttributeSubscriberMetadataFactoryTest\.php\:234\:\:profileVisited\(\) has parameter \$message with no type specified\.$#' + message: '#^Method class@anonymous/tests/Unit/Metadata/Subscriber/AttributeSubscriberMetadataFactoryTest\.php\:266\:\:profileVisited\(\) has parameter \$message with no type specified\.$#' identifier: missingType.parameter count: 1 path: tests/Unit/Metadata/Subscriber/AttributeSubscriberMetadataFactoryTest.php diff --git a/src/Attribute/DisableEventEmitting.php b/src/Attribute/DisableEventEmitting.php new file mode 100644 index 000000000..529b62e8f --- /dev/null +++ b/src/Attribute/DisableEventEmitting.php @@ -0,0 +1,12 @@ +retryStrategy($reflector), $cleanupMethod, + $reflector->getAttributes(EnableEventEmittingDuringBoot::class) !== [], + $reflector->getAttributes(DisableEventEmitting::class) !== [], ); $this->subscriberMetadata[$subscriber] = $metadata; diff --git a/src/Metadata/Subscriber/SubscriberMetadata.php b/src/Metadata/Subscriber/SubscriberMetadata.php index f6926270f..893aedfb1 100644 --- a/src/Metadata/Subscriber/SubscriberMetadata.php +++ b/src/Metadata/Subscriber/SubscriberMetadata.php @@ -20,6 +20,8 @@ public function __construct( public readonly string|null $failedMethod = null, public readonly string|null $retryStrategy = null, public readonly string|null $cleanupMethod = null, + public readonly bool $enableEventEmittingDuringBoot = false, + public readonly bool $disableEventEmitting = false, ) { } } diff --git a/src/Subscription/Engine/DefaultSubscriptionEngine.php b/src/Subscription/Engine/DefaultSubscriptionEngine.php index 9a9ada233..2f99e039b 100644 --- a/src/Subscription/Engine/DefaultSubscriptionEngine.php +++ b/src/Subscription/Engine/DefaultSubscriptionEngine.php @@ -113,6 +113,7 @@ public function __construct( $this->subscriptionManager, $this->subscriberRepository, $cleanupRunner, + $this->eventDispatcher, $this->logger, ), Run::class => new RunHandler( @@ -133,6 +134,7 @@ public function __construct( $this->subscriptionManager, $this->subscriberRepository, $cleanupRunner, + $this->eventDispatcher, $this->logger, ), ]; diff --git a/src/Subscription/Engine/Event/OnSubscriptionRemoved.php b/src/Subscription/Engine/Event/OnSubscriptionRemoved.php new file mode 100644 index 000000000..20104c365 --- /dev/null +++ b/src/Subscription/Engine/Event/OnSubscriptionRemoved.php @@ -0,0 +1,15 @@ +isNew()) { - $this->subscriptionManager->remove($subscription); + $this->remove($subscription); $this->logger?->info( sprintf( @@ -65,13 +69,16 @@ function (SubscriptionCollection $subscriptions): Result { $errors[] = $error; } + // the cleanup runner removes the subscription (forced, even on error) + $this->eventDispatcher->dispatch(new OnSubscriptionRemoved($subscription)); + continue; } $subscriber = $this->subscriberRepository->get($subscription->id()); if (!$subscriber) { - $this->subscriptionManager->remove($subscription); + $this->remove($subscription); $this->logger?->info( sprintf( @@ -86,7 +93,7 @@ function (SubscriptionCollection $subscriptions): Result { $teardownMethod = $subscriber->teardownMethod(); if (!$teardownMethod) { - $this->subscriptionManager->remove($subscription); + $this->remove($subscription); $this->logger?->info( sprintf('Subscription Engine: Subscription "%s" removed.', $subscription->id()), @@ -113,7 +120,7 @@ function (SubscriptionCollection $subscriptions): Result { ); } - $this->subscriptionManager->remove($subscription); + $this->remove($subscription); $this->logger?->info( sprintf('Subscription Engine: Subscription "%s" removed.', $subscription->id()), @@ -124,4 +131,10 @@ function (SubscriptionCollection $subscriptions): Result { }, ); } + + private function remove(Subscription $subscription): void + { + $this->subscriptionManager->remove($subscription); + $this->eventDispatcher->dispatch(new OnSubscriptionRemoved($subscription)); + } } diff --git a/src/Subscription/Engine/Handler/TeardownHandler.php b/src/Subscription/Engine/Handler/TeardownHandler.php index 51a14c593..9a2eb52f4 100644 --- a/src/Subscription/Engine/Handler/TeardownHandler.php +++ b/src/Subscription/Engine/Handler/TeardownHandler.php @@ -8,13 +8,16 @@ use Patchlevel\EventSourcing\Subscription\Engine\Command\Command; use Patchlevel\EventSourcing\Subscription\Engine\Command\Teardown; use Patchlevel\EventSourcing\Subscription\Engine\Error; +use Patchlevel\EventSourcing\Subscription\Engine\Event\OnSubscriptionRemoved; use Patchlevel\EventSourcing\Subscription\Engine\Result; use Patchlevel\EventSourcing\Subscription\Engine\SubscriptionCollection; use Patchlevel\EventSourcing\Subscription\Engine\SubscriptionManager; use Patchlevel\EventSourcing\Subscription\Status; use Patchlevel\EventSourcing\Subscription\Store\SubscriptionCriteria; use Patchlevel\EventSourcing\Subscription\Subscriber\SubscriberAccessorRepository; +use Patchlevel\EventSourcing\Subscription\Subscription; use Psr\Log\LoggerInterface; +use Symfony\Component\EventDispatcher\EventDispatcherInterface; use Throwable; use function sprintf; @@ -30,6 +33,7 @@ public function __construct( private readonly SubscriptionManager $subscriptionManager, private readonly SubscriberAccessorRepository $subscriberRepository, private readonly CleanupRunner $cleanupRunner, + private readonly EventDispatcherInterface $eventDispatcher, private readonly LoggerInterface|null $logger = null, ) { } @@ -52,8 +56,13 @@ function (SubscriptionCollection $subscriptions): Result { if ($error) { $errors[] = $error; + + continue; } + // the cleanup runner removed the subscription + $this->eventDispatcher->dispatch(new OnSubscriptionRemoved($subscription)); + continue; } @@ -73,7 +82,7 @@ function (SubscriptionCollection $subscriptions): Result { $teardownMethod = $subscriber->teardownMethod(); if (!$teardownMethod) { - $this->subscriptionManager->remove($subscription); + $this->remove($subscription); $this->logger?->info( sprintf( @@ -113,7 +122,7 @@ function (SubscriptionCollection $subscriptions): Result { continue; } - $this->subscriptionManager->remove($subscription); + $this->remove($subscription); $this->logger?->info( sprintf( @@ -127,4 +136,10 @@ function (SubscriptionCollection $subscriptions): Result { }, ); } + + private function remove(Subscription $subscription): void + { + $this->subscriptionManager->remove($subscription); + $this->eventDispatcher->dispatch(new OnSubscriptionRemoved($subscription)); + } } diff --git a/src/Subscription/Engine/Listener/RemoveSubscriptionStreamListener.php b/src/Subscription/Engine/Listener/RemoveSubscriptionStreamListener.php new file mode 100644 index 000000000..161b8faea --- /dev/null +++ b/src/Subscription/Engine/Listener/RemoveSubscriptionStreamListener.php @@ -0,0 +1,42 @@ +subscription->id(); + + $this->store->remove( + new Criteria( + new StreamCriterion($streamName), + ), + ); + + $this->logger?->debug( + sprintf( + 'Subscription Engine: Subscription stream "%s" for subscription "%s" has been removed.', + $streamName, + $event->subscription->id(), + ), + ); + } +} diff --git a/src/Subscription/Engine/MessageProcessor.php b/src/Subscription/Engine/MessageProcessor.php index 534f76a0a..c24bb5fcf 100644 --- a/src/Subscription/Engine/MessageProcessor.php +++ b/src/Subscription/Engine/MessageProcessor.php @@ -92,7 +92,7 @@ public function process(int $index, Message $message, Subscription $subscription try { foreach ($subscribeMethods as $subscribeMethod) { - $subscribeMethod($message); + $subscribeMethod($message, $subscription); } } catch (Throwable $e) { $this->logger?->error( diff --git a/src/Subscription/Subscriber/ArgumentResolver/ArgumentResolver.php b/src/Subscription/Subscriber/ArgumentResolver/ArgumentResolver.php index 2f0411af2..77db678d4 100644 --- a/src/Subscription/Subscriber/ArgumentResolver/ArgumentResolver.php +++ b/src/Subscription/Subscriber/ArgumentResolver/ArgumentResolver.php @@ -4,12 +4,11 @@ namespace Patchlevel\EventSourcing\Subscription\Subscriber\ArgumentResolver; -use Patchlevel\EventSourcing\Message\Message; use Patchlevel\EventSourcing\Metadata\Subscriber\ArgumentMetadata; interface ArgumentResolver { - public function resolve(ArgumentMetadata $argument, Message $message): mixed; + public function resolve(ArgumentMetadata $argument, ArgumentResolverContext $context): mixed; public function support(ArgumentMetadata $argument, string $eventClass): bool; } diff --git a/src/Subscription/Subscriber/ArgumentResolver/ArgumentResolverContext.php b/src/Subscription/Subscriber/ArgumentResolver/ArgumentResolverContext.php new file mode 100644 index 000000000..2bc1833e0 --- /dev/null +++ b/src/Subscription/Subscriber/ArgumentResolver/ArgumentResolverContext.php @@ -0,0 +1,19 @@ +event(); + return $context->message->event(); } public function support(ArgumentMetadata $argument, string $eventClass): bool diff --git a/src/Subscription/Subscriber/ArgumentResolver/EventEmitterResolver.php b/src/Subscription/Subscriber/ArgumentResolver/EventEmitterResolver.php new file mode 100644 index 000000000..018fdd341 --- /dev/null +++ b/src/Subscription/Subscriber/ArgumentResolver/EventEmitterResolver.php @@ -0,0 +1,45 @@ +subscriber; + + $enabled = match (true) { + $subscriber->disableEventEmitting => false, + $subscriber->enableEventEmittingDuringBoot => true, + // by default events are only emitted during run, not while booting + default => $context->subscription->isActive(), + }; + + if (!$enabled) { + return new NoopEventEmitter(); + } + + return new StoreEventEmitter( + $this->store, + 'subscription_' . $context->subscription->id(), + ); + } + + public function support(ArgumentMetadata $argument, string $eventClass): bool + { + return $argument->type->isIdentifiedBy(EventEmitter::class); + } +} diff --git a/src/Subscription/Subscriber/ArgumentResolver/LookupResolver.php b/src/Subscription/Subscriber/ArgumentResolver/LookupResolver.php index 6b466e148..90af8e0a8 100644 --- a/src/Subscription/Subscriber/ArgumentResolver/LookupResolver.php +++ b/src/Subscription/Subscriber/ArgumentResolver/LookupResolver.php @@ -4,7 +4,6 @@ namespace Patchlevel\EventSourcing\Subscription\Subscriber\ArgumentResolver; -use Patchlevel\EventSourcing\Message\Message; use Patchlevel\EventSourcing\Metadata\Event\EventRegistry; use Patchlevel\EventSourcing\Metadata\Subscriber\ArgumentMetadata; use Patchlevel\EventSourcing\Store\Store; @@ -18,11 +17,11 @@ public function __construct( ) { } - public function resolve(ArgumentMetadata $argument, Message $message): Lookup + public function resolve(ArgumentMetadata $argument, ArgumentResolverContext $context): Lookup { return new Lookup( $this->store, - $message, + $context->message, $this->eventRegistry, ); } diff --git a/src/Subscription/Subscriber/ArgumentResolver/MessageArgumentResolver.php b/src/Subscription/Subscriber/ArgumentResolver/MessageArgumentResolver.php index 50cdb848f..c511ab022 100644 --- a/src/Subscription/Subscriber/ArgumentResolver/MessageArgumentResolver.php +++ b/src/Subscription/Subscriber/ArgumentResolver/MessageArgumentResolver.php @@ -9,9 +9,9 @@ final class MessageArgumentResolver implements ArgumentResolver { - public function resolve(ArgumentMetadata $argument, Message $message): Message + public function resolve(ArgumentMetadata $argument, ArgumentResolverContext $context): Message { - return $message; + return $context->message; } public function support(ArgumentMetadata $argument, string $eventClass): bool diff --git a/src/Subscription/Subscriber/ArgumentResolver/RecordedOnArgumentResolver.php b/src/Subscription/Subscriber/ArgumentResolver/RecordedOnArgumentResolver.php index ddc71f5b1..41853e1f9 100644 --- a/src/Subscription/Subscriber/ArgumentResolver/RecordedOnArgumentResolver.php +++ b/src/Subscription/Subscriber/ArgumentResolver/RecordedOnArgumentResolver.php @@ -5,15 +5,14 @@ namespace Patchlevel\EventSourcing\Subscription\Subscriber\ArgumentResolver; use DateTimeImmutable; -use Patchlevel\EventSourcing\Message\Message; use Patchlevel\EventSourcing\Metadata\Subscriber\ArgumentMetadata; use Patchlevel\EventSourcing\Store\Header\RecordedOnHeader; final class RecordedOnArgumentResolver implements ArgumentResolver { - public function resolve(ArgumentMetadata $argument, Message $message): DateTimeImmutable + public function resolve(ArgumentMetadata $argument, ArgumentResolverContext $context): DateTimeImmutable { - return $message->header(RecordedOnHeader::class)->recordedOn; + return $context->message->header(RecordedOnHeader::class)->recordedOn; } public function support(ArgumentMetadata $argument, string $eventClass): bool diff --git a/src/Subscription/Subscriber/EventEmitter/EventEmitter.php b/src/Subscription/Subscriber/EventEmitter/EventEmitter.php new file mode 100644 index 000000000..f2534d6f5 --- /dev/null +++ b/src/Subscription/Subscriber/EventEmitter/EventEmitter.php @@ -0,0 +1,22 @@ + $events + */ + public function emit(array $events): void; + + /** + * Emit events into the given stream. + * + * @param list $events + */ + public function linkTo(string $streamName, array $events): void; +} diff --git a/src/Subscription/Subscriber/EventEmitter/NoopEventEmitter.php b/src/Subscription/Subscriber/EventEmitter/NoopEventEmitter.php new file mode 100644 index 000000000..328ac2683 --- /dev/null +++ b/src/Subscription/Subscriber/EventEmitter/NoopEventEmitter.php @@ -0,0 +1,18 @@ + $events */ + public function emit(array $events): void + { + } + + /** @param list $events */ + public function linkTo(string $streamName, array $events): void + { + } +} diff --git a/src/Subscription/Subscriber/EventEmitter/StoreEventEmitter.php b/src/Subscription/Subscriber/EventEmitter/StoreEventEmitter.php new file mode 100644 index 000000000..3af8d551f --- /dev/null +++ b/src/Subscription/Subscriber/EventEmitter/StoreEventEmitter.php @@ -0,0 +1,41 @@ + $events */ + public function emit(array $events): void + { + $this->linkTo($this->defaultStream, $events); + } + + /** @param list $events */ + public function linkTo(string $streamName, array $events): void + { + if ($events === []) { + return; + } + + $messages = []; + + foreach ($events as $event) { + $messages[] = Message::create($event) + ->withHeader(new StreamNameHeader($streamName)); + } + + $this->store->save(...$messages); + } +} diff --git a/src/Subscription/Subscriber/MetadataSubscriberAccessor.php b/src/Subscription/Subscriber/MetadataSubscriberAccessor.php index 4d7d04e00..8eaccf0b0 100644 --- a/src/Subscription/Subscriber/MetadataSubscriberAccessor.php +++ b/src/Subscription/Subscriber/MetadataSubscriberAccessor.php @@ -10,6 +10,8 @@ use Patchlevel\EventSourcing\Metadata\Subscriber\SubscribeMethodMetadata; use Patchlevel\EventSourcing\Metadata\Subscriber\SubscriberMetadata; use Patchlevel\EventSourcing\Subscription\Subscriber\ArgumentResolver\ArgumentResolver; +use Patchlevel\EventSourcing\Subscription\Subscriber\ArgumentResolver\ArgumentResolverContext; +use Patchlevel\EventSourcing\Subscription\Subscription; use Throwable; use function array_key_exists; @@ -19,7 +21,7 @@ /** @template T of object */ final class MetadataSubscriberAccessor { - /** @var array> */ + /** @var array> */ private array $subscribeCache = []; /** @@ -99,7 +101,7 @@ public function events(): array /** * @param class-string $eventClass * - * @return list + * @return list */ public function subscribeMethods(string $eventClass): array { @@ -128,18 +130,18 @@ public function subscribeMethods(string $eventClass): array /** * @param class-string $eventClass * - * @return Closure(Message):void + * @return Closure(Message, Subscription):void */ private function createClosure(string $eventClass, SubscribeMethodMetadata $method): Closure { $resolvers = $this->resolvers($eventClass, $method); $methodName = $method->name; - return function (Message $message) use ($methodName, $resolvers): void { + return function (Message $message, Subscription $subscription) use ($methodName, $resolvers): void { $arguments = []; foreach ($resolvers as $resolver) { - $arguments[] = $resolver($message); + $arguments[] = $resolver($message, $subscription); } $this->subscriber->$methodName(...$arguments); @@ -149,11 +151,12 @@ private function createClosure(string $eventClass, SubscribeMethodMetadata $meth /** * @param class-string $eventClass * - * @return list + * @return list */ private function resolvers(string $eventClass, SubscribeMethodMetadata $method): array { $resolvers = []; + $metadata = $this->metadata; foreach ($method->arguments as $argument) { foreach ($this->argumentResolvers as $resolver) { @@ -161,8 +164,8 @@ private function resolvers(string $eventClass, SubscribeMethodMetadata $method): continue; } - $resolvers[] = static function (Message $message) use ($resolver, $argument): mixed { - return $resolver->resolve($argument, $message); + $resolvers[] = static function (Message $message, Subscription $subscription) use ($resolver, $argument, $metadata): mixed { + return $resolver->resolve($argument, new ArgumentResolverContext($message, $subscription, $metadata)); }; continue 2; diff --git a/tests/Benchmark/NoopSubscriptionEngineBench.php b/tests/Benchmark/NoopSubscriptionEngineBench.php index 210c08d3b..b2c4b67fb 100644 --- a/tests/Benchmark/NoopSubscriptionEngineBench.php +++ b/tests/Benchmark/NoopSubscriptionEngineBench.php @@ -4,15 +4,18 @@ namespace Patchlevel\EventSourcing\Tests\Benchmark; -use Patchlevel\EventSourcing\Aggregate\AggregateRootId; +use Patchlevel\EventSourcing\Identifier\Identifier; use Patchlevel\EventSourcing\Metadata\Event\AttributeEventMetadataFactory; use Patchlevel\EventSourcing\Repository\DefaultRepository; use Patchlevel\EventSourcing\Repository\Repository; use Patchlevel\EventSourcing\Schema\ChainDoctrineSchemaConfigurator; use Patchlevel\EventSourcing\Schema\DoctrineSchemaDirector; use Patchlevel\EventSourcing\Serializer\DefaultEventSerializer; -use Patchlevel\EventSourcing\Store\DoctrineDbalStore; use Patchlevel\EventSourcing\Store\Store; +use Patchlevel\EventSourcing\Store\StreamDoctrineDbalStore; +use Patchlevel\EventSourcing\Subscription\Engine\Command\Boot; +use Patchlevel\EventSourcing\Subscription\Engine\Command\Remove; +use Patchlevel\EventSourcing\Subscription\Engine\Command\Setup; use Patchlevel\EventSourcing\Subscription\Engine\DefaultSubscriptionEngine; use Patchlevel\EventSourcing\Subscription\Engine\EventFilteredStoreMessageLoader; use Patchlevel\EventSourcing\Subscription\Engine\SubscriptionEngine; @@ -33,13 +36,13 @@ final class NoopSubscriptionEngineBench private SubscriptionEngine $subscriptionEngine; - private AggregateRootId $id; + private Identifier $id; public function setUp(): void { $connection = DbalManager::createConnection(); - $this->store = new DoctrineDbalStore( + $this->store = new StreamDoctrineDbalStore( $connection, DefaultEventSerializer::createFromPaths([__DIR__ . '/BasicImplementation/Events']), ); @@ -89,8 +92,8 @@ public function setUp(): void #[Bench\Revs(10)] public function benchHandle10000Events(): void { - $this->subscriptionEngine->setup(); - $this->subscriptionEngine->boot(); - $this->subscriptionEngine->remove(); + $this->subscriptionEngine->execute(new Setup()); + $this->subscriptionEngine->execute(new Boot()); + $this->subscriptionEngine->execute(new Remove()); } } diff --git a/tests/Integration/Subscription/Events/NotificationSent.php b/tests/Integration/Subscription/Events/NotificationSent.php new file mode 100644 index 000000000..a0f5fcf01 --- /dev/null +++ b/tests/Integration/Subscription/Events/NotificationSent.php @@ -0,0 +1,17 @@ + */ + public array $notifications = []; + + #[Subscribe(NotificationSent::class)] + public function onNotificationSent(NotificationSent $event): void + { + $this->notifications[] = $event; + } +} diff --git a/tests/Integration/Subscription/Subscriber/NotificationEmittingProjection.php b/tests/Integration/Subscription/Subscriber/NotificationEmittingProjection.php new file mode 100644 index 000000000..4a039928a --- /dev/null +++ b/tests/Integration/Subscription/Subscriber/NotificationEmittingProjection.php @@ -0,0 +1,22 @@ +emit([new NotificationSent($event->profileId)]); + } +} diff --git a/tests/Integration/Subscription/SubscriptionTest.php b/tests/Integration/Subscription/SubscriptionTest.php index 6da6843d2..2dd800487 100644 --- a/tests/Integration/Subscription/SubscriptionTest.php +++ b/tests/Integration/Subscription/SubscriptionTest.php @@ -19,6 +19,8 @@ use Patchlevel\EventSourcing\Schema\ChainDoctrineSchemaConfigurator; use Patchlevel\EventSourcing\Schema\DoctrineSchemaDirector; use Patchlevel\EventSourcing\Serializer\DefaultEventSerializer; +use Patchlevel\EventSourcing\Store\Criteria\Criteria; +use Patchlevel\EventSourcing\Store\Criteria\StreamCriterion; use Patchlevel\EventSourcing\Store\StreamDoctrineDbalStore; use Patchlevel\EventSourcing\Subscription\Cleanup\Dbal\DbalCleanupTaskHandler; use Patchlevel\EventSourcing\Subscription\Cleanup\Dbal\DropTableTask; @@ -32,8 +34,10 @@ use Patchlevel\EventSourcing\Subscription\Engine\Command\Setup as SetupCommand; use Patchlevel\EventSourcing\Subscription\Engine\Command\Teardown as TeardownCommand; use Patchlevel\EventSourcing\Subscription\Engine\DefaultSubscriptionEngine; +use Patchlevel\EventSourcing\Subscription\Engine\Event\OnSubscriptionRemoved; use Patchlevel\EventSourcing\Subscription\Engine\EventFilteredStoreMessageLoader; use Patchlevel\EventSourcing\Subscription\Engine\GapResolverStoreMessageLoader; +use Patchlevel\EventSourcing\Subscription\Engine\Listener\RemoveSubscriptionStreamListener; use Patchlevel\EventSourcing\Subscription\Engine\MessageLoader; use Patchlevel\EventSourcing\Subscription\Engine\ProcessedResult; use Patchlevel\EventSourcing\Subscription\Engine\Result; @@ -43,6 +47,7 @@ use Patchlevel\EventSourcing\Subscription\RunMode; use Patchlevel\EventSourcing\Subscription\Status; use Patchlevel\EventSourcing\Subscription\Store\DoctrineSubscriptionStore; +use Patchlevel\EventSourcing\Subscription\Subscriber\ArgumentResolver\EventEmitterResolver; use Patchlevel\EventSourcing\Subscription\Subscriber\ArgumentResolver\LookupResolver; use Patchlevel\EventSourcing\Subscription\Subscriber\MetadataSubscriberAccessorRepository; use Patchlevel\EventSourcing\Subscription\Subscription; @@ -50,6 +55,8 @@ use Patchlevel\EventSourcing\Tests\Integration\Subscription\Subscriber\ErrorProducerSubscriber; use Patchlevel\EventSourcing\Tests\Integration\Subscription\Subscriber\ErrorProducerWithSelfRecoverySubscriber; use Patchlevel\EventSourcing\Tests\Integration\Subscription\Subscriber\LookupSubscriber; +use Patchlevel\EventSourcing\Tests\Integration\Subscription\Subscriber\NotificationCollector; +use Patchlevel\EventSourcing\Tests\Integration\Subscription\Subscriber\NotificationEmittingProjection; use Patchlevel\EventSourcing\Tests\Integration\Subscription\Subscriber\ProfileNewProjection; use Patchlevel\EventSourcing\Tests\Integration\Subscription\Subscriber\ProfileProcessor; use Patchlevel\EventSourcing\Tests\Integration\Subscription\Subscriber\ProfileProjection; @@ -57,6 +64,7 @@ use PHPUnit\Framework\Attributes\CoversNothing; use PHPUnit\Framework\TestCase; use RuntimeException; +use Symfony\Component\EventDispatcher\EventDispatcher; use function gc_collect_cycles; use function iterator_to_array; @@ -1463,6 +1471,89 @@ public function testLookup(): void self::assertSame('Hans', $result['name']); } + public function testEventEmitter(): void + { + $store = new StreamDoctrineDbalStore( + $this->connection, + DefaultEventSerializer::createFromPaths([__DIR__ . '/Events']), + ); + + $clock = new FrozenClock(new DateTimeImmutable('2021-01-01T00:00:00')); + + $subscriptionStore = new DoctrineSubscriptionStore( + $this->connection, + $clock, + ); + + $manager = new DefaultRepositoryManager( + new AggregateRootRegistry(['profile' => Profile::class]), + $store, + ); + + $repository = $manager->get(Profile::class); + + $schemaDirector = new DoctrineSchemaDirector( + $this->connection, + new ChainDoctrineSchemaConfigurator([ + $store, + $subscriptionStore, + ]), + ); + + $schemaDirector->create(); + + $collector = new NotificationCollector(); + + $subscriberRepository = new MetadataSubscriberAccessorRepository( + [ + new NotificationEmittingProjection(), + $collector, + ], + argumentResolvers: [ + new EventEmitterResolver($store), + ], + ); + + $eventDispatcher = new EventDispatcher(); + $eventDispatcher->addListener( + OnSubscriptionRemoved::class, + new RemoveSubscriptionStreamListener($store), + ); + + $engine = new DefaultSubscriptionEngine( + new StoreMessageLoader($store), + $subscriptionStore, + $subscriberRepository, + eventDispatcher: $eventDispatcher, + ); + + $engine->execute(new SetupCommand()); + $engine->execute(new Boot()); + + $profileId = ProfileId::generate(); + $repository->save(Profile::create($profileId, 'John')); + + // the emitting projection reacts to ProfileCreated and emits a NotificationSent event + // into its projection stream, which the notification subscriber then consumes. Depending + // on the database driver this can happen in one or two runs, so we drain the engine. + do { + $result = $engine->execute(new Run()); + + self::assertInstanceOf(ProcessedResult::class, $result); + self::assertEquals([], $result->errors); + } while ($result->processedMessages > 0); + + self::assertSame(1, $store->count(new Criteria(new StreamCriterion('subscription_emitting')))); + self::assertCount(1, $collector->notifications); + self::assertEquals($profileId, $collector->notifications[0]->profileId); + + // removing the subscriptions also removes the projection stream + $engine->execute(new Remove()); + + self::assertNotContains('subscription_emitting', $store->streams()); + self::assertSame(0, $store->count(new Criteria(new StreamCriterion('subscription_emitting')))); + } + public function testRefreshSubscriptions(): void { $store = new StreamDoctrineDbalStore( diff --git a/tests/Unit/Metadata/Subscriber/AttributeSubscriberMetadataFactoryTest.php b/tests/Unit/Metadata/Subscriber/AttributeSubscriberMetadataFactoryTest.php index 144335160..a7d239f4a 100644 --- a/tests/Unit/Metadata/Subscriber/AttributeSubscriberMetadataFactoryTest.php +++ b/tests/Unit/Metadata/Subscriber/AttributeSubscriberMetadataFactoryTest.php @@ -4,6 +4,8 @@ namespace Patchlevel\EventSourcing\Tests\Unit\Metadata\Subscriber; +use Patchlevel\EventSourcing\Attribute\DisableEventEmitting; +use Patchlevel\EventSourcing\Attribute\EnableEventEmittingDuringBoot; use Patchlevel\EventSourcing\Attribute\Processor; use Patchlevel\EventSourcing\Attribute\Projector; use Patchlevel\EventSourcing\Attribute\Setup; @@ -53,6 +55,36 @@ class { self::assertNull($metadata->setupMethod); self::assertNull($metadata->teardownMethod); self::assertSame('foo', $metadata->id); + self::assertFalse($metadata->enableEventEmittingDuringBoot); + self::assertFalse($metadata->disableEventEmitting); + } + + public function testEnableEventEmittingDuringBoot(): void + { + $subscriber = new #[Subscriber('foo', RunMode::FromBeginning)] + #[EnableEventEmittingDuringBoot] + class { + }; + + $metadataFactory = new AttributeSubscriberMetadataFactory(); + $metadata = $metadataFactory->metadata($subscriber::class); + + self::assertTrue($metadata->enableEventEmittingDuringBoot); + self::assertFalse($metadata->disableEventEmitting); + } + + public function testDisableEventEmitting(): void + { + $subscriber = new #[Subscriber('foo', RunMode::FromBeginning)] + #[DisableEventEmitting] + class { + }; + + $metadataFactory = new AttributeSubscriberMetadataFactory(); + $metadata = $metadataFactory->metadata($subscriber::class); + + self::assertTrue($metadata->disableEventEmitting); + self::assertFalse($metadata->enableEventEmittingDuringBoot); } public function testProjector(): void diff --git a/tests/Unit/Metadata/Subscriber/SubscriberMetadataTest.php b/tests/Unit/Metadata/Subscriber/SubscriberMetadataTest.php new file mode 100644 index 000000000..eef513993 --- /dev/null +++ b/tests/Unit/Metadata/Subscriber/SubscriberMetadataTest.php @@ -0,0 +1,21 @@ +enableEventEmittingDuringBoot); + self::assertFalse($metadata->disableEventEmitting); + } +} diff --git a/tests/Unit/Subscription/Engine/Handler/RemoveHandlerTest.php b/tests/Unit/Subscription/Engine/Handler/RemoveHandlerTest.php index f70eb96d0..d1b3a1ced 100644 --- a/tests/Unit/Subscription/Engine/Handler/RemoveHandlerTest.php +++ b/tests/Unit/Subscription/Engine/Handler/RemoveHandlerTest.php @@ -13,6 +13,7 @@ use Patchlevel\EventSourcing\Subscription\Engine\CleanerNotConfigured; use Patchlevel\EventSourcing\Subscription\Engine\CleanupRunner; use Patchlevel\EventSourcing\Subscription\Engine\Command\Remove as RemoveCommand; +use Patchlevel\EventSourcing\Subscription\Engine\Event\OnSubscriptionRemoved; use Patchlevel\EventSourcing\Subscription\Engine\Handler\RemoveHandler; use Patchlevel\EventSourcing\Subscription\Engine\SubscriptionManager; use Patchlevel\EventSourcing\Subscription\RunMode; @@ -24,10 +25,32 @@ use PHPUnit\Framework\TestCase; use Psr\Log\NullLogger; use RuntimeException; +use Symfony\Component\EventDispatcher\EventDispatcher; #[CoversClass(RemoveHandler::class)] final class RemoveHandlerTest extends TestCase { + /** @var list */ + private array $removedSubscriptions = []; + + protected function setUp(): void + { + $this->removedSubscriptions = []; + } + + private function recordingDispatcher(): EventDispatcher + { + $dispatcher = new EventDispatcher(); + $dispatcher->addListener( + OnSubscriptionRemoved::class, + function (OnSubscriptionRemoved $event): void { + $this->removedSubscriptions[] = $event->subscription; + }, + ); + + return $dispatcher; + } + /** @param list $subscribers */ private function createHandler( DummySubscriptionStore $store, @@ -40,6 +63,7 @@ private function createHandler( $subscriptionManager, new MetadataSubscriberAccessorRepository($subscribers), $cleanupRunner ?? new CleanupRunner($subscriptionManager, null, new NullLogger()), + $this->recordingDispatcher(), new NullLogger(), ); } @@ -68,6 +92,7 @@ public function drop(): void $store->assertNoUpdated(); $store->assertRemoved($subscription); self::assertTrue($subscriber->dropped); + self::assertSame([$subscription], $this->removedSubscriptions); } public function testRemoveWithoutDropMethod(): void @@ -86,6 +111,7 @@ class { self::assertEquals([], $result->errors); $store->assertNoUpdated(); $store->assertRemoved($subscription); + self::assertSame([$subscription], $this->removedSubscriptions); } public function testRemoveWithSubscriberAndError(): void @@ -115,6 +141,7 @@ public function drop(): void $store->assertNoUpdated(); $store->assertRemoved($subscription); + self::assertSame([$subscription], $this->removedSubscriptions); } public function testRemoveNewSubscriber(): void @@ -141,6 +168,7 @@ public function drop(): void $store->assertNoUpdated(); $store->assertRemoved($subscription); self::assertFalse($subscriber->dropped); + self::assertSame([$subscription], $this->removedSubscriptions); } public function testRemoveWithoutSubscriber(): void @@ -156,6 +184,7 @@ public function testRemoveWithoutSubscriber(): void self::assertEquals([], $result->errors); $store->assertNoUpdated(); $store->assertRemoved($subscription); + self::assertSame([$subscription], $this->removedSubscriptions); } public function testRemoveWithCleanupAndWithoutCleaner(): void @@ -206,6 +235,7 @@ class { $subscriptionManager, new MetadataSubscriberAccessorRepository([$subscriber]), new CleanupRunner($subscriptionManager, new DefaultCleaner([$cleanupHandler]), new NullLogger()), + $this->recordingDispatcher(), new NullLogger(), ); @@ -214,6 +244,7 @@ class { self::assertEquals([], $result->errors); $store->assertNoUpdated(); $store->assertRemoved($subscription); + self::assertSame([$subscription], $this->removedSubscriptions); } public function testRemoveWithCleanupHandlerError(): void @@ -239,6 +270,7 @@ public function testRemoveWithCleanupHandlerError(): void $subscriptionManager, new MetadataSubscriberAccessorRepository([]), new CleanupRunner($subscriptionManager, new DefaultCleaner([$cleanupHandler]), new NullLogger()), + $this->recordingDispatcher(), new NullLogger(), ); @@ -251,5 +283,6 @@ public function testRemoveWithCleanupHandlerError(): void self::assertInstanceOf(CleanupFailed::class, $error->throwable); $store->assertRemoved($subscription); + self::assertSame([$subscription], $this->removedSubscriptions); } } diff --git a/tests/Unit/Subscription/Engine/Handler/TeardownHandlerTest.php b/tests/Unit/Subscription/Engine/Handler/TeardownHandlerTest.php index 880864f26..934e3e7f4 100644 --- a/tests/Unit/Subscription/Engine/Handler/TeardownHandlerTest.php +++ b/tests/Unit/Subscription/Engine/Handler/TeardownHandlerTest.php @@ -13,6 +13,7 @@ use Patchlevel\EventSourcing\Subscription\Engine\CleanerNotConfigured; use Patchlevel\EventSourcing\Subscription\Engine\CleanupRunner; use Patchlevel\EventSourcing\Subscription\Engine\Command\Teardown as TeardownCommand; +use Patchlevel\EventSourcing\Subscription\Engine\Event\OnSubscriptionRemoved; use Patchlevel\EventSourcing\Subscription\Engine\Handler\TeardownHandler; use Patchlevel\EventSourcing\Subscription\Engine\SubscriptionManager; use Patchlevel\EventSourcing\Subscription\RunMode; @@ -24,10 +25,32 @@ use PHPUnit\Framework\TestCase; use Psr\Log\NullLogger; use RuntimeException; +use Symfony\Component\EventDispatcher\EventDispatcher; #[CoversClass(TeardownHandler::class)] final class TeardownHandlerTest extends TestCase { + /** @var list */ + private array $removedSubscriptions = []; + + protected function setUp(): void + { + $this->removedSubscriptions = []; + } + + private function recordingDispatcher(): EventDispatcher + { + $dispatcher = new EventDispatcher(); + $dispatcher->addListener( + OnSubscriptionRemoved::class, + function (OnSubscriptionRemoved $event): void { + $this->removedSubscriptions[] = $event->subscription; + }, + ); + + return $dispatcher; + } + /** @param list $subscribers */ private function createHandler( DummySubscriptionStore $store, @@ -40,6 +63,7 @@ private function createHandler( $subscriptionManager, new MetadataSubscriberAccessorRepository($subscribers), $cleanupRunner ?? new CleanupRunner($subscriptionManager, null, new NullLogger()), + $this->recordingDispatcher(), new NullLogger(), ); } @@ -60,6 +84,7 @@ class { self::assertEquals([], $result->errors); $store->assertNoUpdated(); $store->assertRemoved($subscription); + self::assertSame([$subscription], $this->removedSubscriptions); } public function testTeardownWithSubscriber(): void @@ -86,6 +111,7 @@ public function drop(): void $store->assertNoUpdated(); $store->assertRemoved($subscription); self::assertTrue($subscriber->dropped); + self::assertSame([$subscription], $this->removedSubscriptions); } public function testTeardownWithSubscriberAndError(): void @@ -115,6 +141,7 @@ public function drop(): void self::assertInstanceOf(RuntimeException::class, $error->throwable); $store->assertNoChanges(); + self::assertSame([], $this->removedSubscriptions); } public function testTeardownWithoutSubscriber(): void @@ -130,6 +157,7 @@ public function testTeardownWithoutSubscriber(): void self::assertEquals([], $result->errors); $store->assertNoChanges(); + self::assertSame([], $this->removedSubscriptions); } public function testTeardownWithCleanupAndWithoutCleaner(): void @@ -180,6 +208,7 @@ class { $subscriptionManager, new MetadataSubscriberAccessorRepository([$subscriber]), new CleanupRunner($subscriptionManager, new DefaultCleaner([$cleanupHandler]), new NullLogger()), + $this->recordingDispatcher(), new NullLogger(), ); @@ -188,6 +217,7 @@ class { self::assertEquals([], $result->errors); $store->assertNoUpdated(); $store->assertRemoved($subscription); + self::assertSame([$subscription], $this->removedSubscriptions); } public function testTeardownWithCleanupAndWithoutSubscriber(): void @@ -213,6 +243,7 @@ public function testTeardownWithCleanupAndWithoutSubscriber(): void $subscriptionManager, new MetadataSubscriberAccessorRepository([]), new CleanupRunner($subscriptionManager, new DefaultCleaner([$cleanupHandler]), new NullLogger()), + $this->recordingDispatcher(), new NullLogger(), ); @@ -221,6 +252,7 @@ public function testTeardownWithCleanupAndWithoutSubscriber(): void self::assertEquals([], $result->errors); $store->assertNoUpdated(); $store->assertRemoved($subscription); + self::assertSame([$subscription], $this->removedSubscriptions); } public function testTeardownWithCleanupHandlerError(): void @@ -247,6 +279,7 @@ public function testTeardownWithCleanupHandlerError(): void $subscriptionManager, new MetadataSubscriberAccessorRepository([]), new CleanupRunner($subscriptionManager, new DefaultCleaner([$cleanupHandler]), new NullLogger()), + $this->recordingDispatcher(), new NullLogger(), ); @@ -259,5 +292,42 @@ public function testTeardownWithCleanupHandlerError(): void self::assertInstanceOf(CleanupFailed::class, $error->throwable); $store->assertNoChanges(); + self::assertSame([], $this->removedSubscriptions); + } + + public function testTeardownContinuesWithNextSubscriptionAfterCleanupError(): void + { + $failingTask = new DropTableTask('failing'); + $passingTask = new DropTableTask('passing'); + + $failing = new Subscription('failing', Subscription::DEFAULT_GROUP, RunMode::FromBeginning, Status::Detached, cleanupTasks: [$failingTask]); + $passing = new Subscription('passing', Subscription::DEFAULT_GROUP, RunMode::FromBeginning, Status::Detached, cleanupTasks: [$passingTask]); + $store = new DummySubscriptionStore([$failing, $passing]); + + $cleanupHandler = $this->createMock(CleanupTaskHandler::class); + $cleanupHandler->method('supports')->willReturn(true); + $cleanupHandler->method('__invoke')->willReturnCallback( + static function (object $task) use ($failingTask): void { + if ($task === $failingTask) { + throw new RuntimeException('ERROR'); + } + }, + ); + + $subscriptionManager = new SubscriptionManager($store); + $handler = new TeardownHandler( + $subscriptionManager, + new MetadataSubscriberAccessorRepository([]), + new CleanupRunner($subscriptionManager, new DefaultCleaner([$cleanupHandler]), new NullLogger()), + $this->recordingDispatcher(), + new NullLogger(), + ); + + $result = $handler(new TeardownCommand()); + + self::assertCount(1, $result->errors); + // the second subscription is still processed after the first one failed + $store->assertRemoved($passing); + self::assertSame([$passing], $this->removedSubscriptions); } } diff --git a/tests/Unit/Subscription/Engine/Listener/RemoveSubscriptionStreamListenerTest.php b/tests/Unit/Subscription/Engine/Listener/RemoveSubscriptionStreamListenerTest.php new file mode 100644 index 000000000..a1f924328 --- /dev/null +++ b/tests/Unit/Subscription/Engine/Listener/RemoveSubscriptionStreamListenerTest.php @@ -0,0 +1,29 @@ +createMock(Store::class); + $store->expects($this->once()) + ->method('remove') + ->with(new Criteria(new StreamCriterion('subscription_foo'))); + + $listener = new RemoveSubscriptionStreamListener($store); + $listener(new OnSubscriptionRemoved(new Subscription('foo'))); + } +} diff --git a/tests/Unit/Subscription/Engine/MessageProcessorTest.php b/tests/Unit/Subscription/Engine/MessageProcessorTest.php new file mode 100644 index 000000000..9d61a54b7 --- /dev/null +++ b/tests/Unit/Subscription/Engine/MessageProcessorTest.php @@ -0,0 +1,54 @@ +event = $event; + } + }; + + $processor = new MessageProcessor( + new MetadataSubscriberAccessorRepository([$subscriber]), + new EventDispatcher(), + new NullLogger(), + ); + + $event = new ProfileVisited(ProfileId::fromString('1')); + $message = Message::create($event); + $subscription = new Subscription('test', status: Status::Active); + + $error = $processor->process(1, $message, $subscription); + + self::assertNull($error); + self::assertSame($event, $subscriber->event); + self::assertSame(1, $subscription->position()); + } +} diff --git a/tests/Unit/Subscription/Subscriber/ArgumentResolver/EventArgumentResolverTest.php b/tests/Unit/Subscription/Subscriber/ArgumentResolver/EventArgumentResolverTest.php index d530a7a94..916b10920 100644 --- a/tests/Unit/Subscription/Subscriber/ArgumentResolver/EventArgumentResolverTest.php +++ b/tests/Unit/Subscription/Subscriber/ArgumentResolver/EventArgumentResolverTest.php @@ -6,7 +6,10 @@ use Patchlevel\EventSourcing\Message\Message; use Patchlevel\EventSourcing\Metadata\Subscriber\ArgumentMetadata; +use Patchlevel\EventSourcing\Metadata\Subscriber\SubscriberMetadata; +use Patchlevel\EventSourcing\Subscription\Subscriber\ArgumentResolver\ArgumentResolverContext; use Patchlevel\EventSourcing\Subscription\Subscriber\ArgumentResolver\EventArgumentResolver; +use Patchlevel\EventSourcing\Subscription\Subscription; use Patchlevel\EventSourcing\Tests\Unit\Fixture\ProfileCreated; use Patchlevel\EventSourcing\Tests\Unit\Fixture\ProfileId; use Patchlevel\EventSourcing\Tests\Unit\Fixture\ProfileVisited; @@ -66,7 +69,7 @@ public function testResolve(): void $event, $resolver->resolve( new ArgumentMetadata('foo', Type::object(ProfileVisited::class)), - $message, + new ArgumentResolverContext($message, new Subscription('foo'), new SubscriberMetadata('foo')), ), ); } diff --git a/tests/Unit/Subscription/Subscriber/ArgumentResolver/EventEmitterResolverTest.php b/tests/Unit/Subscription/Subscriber/ArgumentResolver/EventEmitterResolverTest.php new file mode 100644 index 000000000..50bd8504c --- /dev/null +++ b/tests/Unit/Subscription/Subscriber/ArgumentResolver/EventEmitterResolverTest.php @@ -0,0 +1,125 @@ +createMock(Store::class)); + + self::assertTrue( + $resolver->support( + new ArgumentMetadata('emitter', Type::object(EventEmitter::class)), + ProfileVisited::class, + ), + ); + + self::assertFalse( + $resolver->support( + new ArgumentMetadata('foo', Type::object(ProfileVisited::class)), + ProfileVisited::class, + ), + ); + } + + public function testResolveDuringRunReturnsDefaultEmitter(): void + { + $resolver = new EventEmitterResolver($this->createMock(Store::class)); + + $emitter = $resolver->resolve( + new ArgumentMetadata('emitter', Type::object(EventEmitter::class)), + $this->context(Status::Active), + ); + + self::assertInstanceOf(StoreEventEmitter::class, $emitter); + } + + public function testResolvedEmitterEmitsToSubscriptionStream(): void + { + $store = new InMemoryStore(); + $resolver = new EventEmitterResolver($store); + + $emitter = $resolver->resolve( + new ArgumentMetadata('emitter', Type::object(EventEmitter::class)), + $this->context(Status::Active), + ); + + $emitter->emit([new stdClass()]); + + self::assertSame(['subscription_foo'], $store->streams()); + } + + public function testResolveDuringBootReturnsNoopByDefault(): void + { + $resolver = new EventEmitterResolver($this->createMock(Store::class)); + + $emitter = $resolver->resolve( + new ArgumentMetadata('emitter', Type::object(EventEmitter::class)), + $this->context(Status::Booting), + ); + + self::assertInstanceOf(NoopEventEmitter::class, $emitter); + } + + public function testEnableEventEmittingDuringBoot(): void + { + $resolver = new EventEmitterResolver($this->createMock(Store::class)); + + $emitter = $resolver->resolve( + new ArgumentMetadata('emitter', Type::object(EventEmitter::class)), + $this->context(Status::Booting, enableEventEmittingDuringBoot: true), + ); + + self::assertInstanceOf(StoreEventEmitter::class, $emitter); + } + + public function testDisableEventEmitting(): void + { + $resolver = new EventEmitterResolver($this->createMock(Store::class)); + + $emitter = $resolver->resolve( + new ArgumentMetadata('emitter', Type::object(EventEmitter::class)), + $this->context(Status::Active, disableEventEmitting: true), + ); + + self::assertInstanceOf(NoopEventEmitter::class, $emitter); + } + + private function context( + Status $status, + bool $enableEventEmittingDuringBoot = false, + bool $disableEventEmitting = false, + ): ArgumentResolverContext { + return new ArgumentResolverContext( + new Message(new stdClass()), + new Subscription('foo', status: $status), + new SubscriberMetadata( + 'foo', + enableEventEmittingDuringBoot: $enableEventEmittingDuringBoot, + disableEventEmitting: $disableEventEmitting, + ), + ); + } +} diff --git a/tests/Unit/Subscription/Subscriber/ArgumentResolver/LookupResolverTest.php b/tests/Unit/Subscription/Subscriber/ArgumentResolver/LookupResolverTest.php index 2e62b07b9..425e2a59a 100644 --- a/tests/Unit/Subscription/Subscriber/ArgumentResolver/LookupResolverTest.php +++ b/tests/Unit/Subscription/Subscriber/ArgumentResolver/LookupResolverTest.php @@ -7,10 +7,13 @@ use Patchlevel\EventSourcing\Message\Message; use Patchlevel\EventSourcing\Metadata\Event\EventRegistry; use Patchlevel\EventSourcing\Metadata\Subscriber\ArgumentMetadata; +use Patchlevel\EventSourcing\Metadata\Subscriber\SubscriberMetadata; use Patchlevel\EventSourcing\Store\Header\IndexHeader; use Patchlevel\EventSourcing\Store\Store; use Patchlevel\EventSourcing\Subscription\Lookup\Lookup; +use Patchlevel\EventSourcing\Subscription\Subscriber\ArgumentResolver\ArgumentResolverContext; use Patchlevel\EventSourcing\Subscription\Subscriber\ArgumentResolver\LookupResolver; +use Patchlevel\EventSourcing\Subscription\Subscription; use Patchlevel\EventSourcing\Tests\Unit\Fixture\ProfileCreated; use Patchlevel\EventSourcing\Tests\Unit\Fixture\ProfileId; use Patchlevel\EventSourcing\Tests\Unit\Fixture\ProfileVisited; @@ -58,7 +61,7 @@ public function testResolve(): void $lookup = $resolver->resolve( new ArgumentMetadata('foo', Type::object(Lookup::class)), - $message, + new ArgumentResolverContext($message, new Subscription('foo'), new SubscriberMetadata('foo')), ); self::assertInstanceOf(Lookup::class, $lookup); diff --git a/tests/Unit/Subscription/Subscriber/ArgumentResolver/MessageArgumentResolverTest.php b/tests/Unit/Subscription/Subscriber/ArgumentResolver/MessageArgumentResolverTest.php index 0d76a7c0f..facfbc1c6 100644 --- a/tests/Unit/Subscription/Subscriber/ArgumentResolver/MessageArgumentResolverTest.php +++ b/tests/Unit/Subscription/Subscriber/ArgumentResolver/MessageArgumentResolverTest.php @@ -6,7 +6,10 @@ use Patchlevel\EventSourcing\Message\Message; use Patchlevel\EventSourcing\Metadata\Subscriber\ArgumentMetadata; +use Patchlevel\EventSourcing\Metadata\Subscriber\SubscriberMetadata; +use Patchlevel\EventSourcing\Subscription\Subscriber\ArgumentResolver\ArgumentResolverContext; use Patchlevel\EventSourcing\Subscription\Subscriber\ArgumentResolver\MessageArgumentResolver; +use Patchlevel\EventSourcing\Subscription\Subscription; use PHPUnit\Framework\Attributes\CoversClass; use PHPUnit\Framework\TestCase; use stdClass; @@ -43,7 +46,7 @@ public function testResolve(): void $message, $resolver->resolve( new ArgumentMetadata('foo', Type::object(Message::class)), - $message, + new ArgumentResolverContext($message, new Subscription('foo'), new SubscriberMetadata('foo')), ), ); } diff --git a/tests/Unit/Subscription/Subscriber/ArgumentResolver/RecordedOnArgumentResolverTest.php b/tests/Unit/Subscription/Subscriber/ArgumentResolver/RecordedOnArgumentResolverTest.php index c3697ef0e..2f4e26160 100644 --- a/tests/Unit/Subscription/Subscriber/ArgumentResolver/RecordedOnArgumentResolverTest.php +++ b/tests/Unit/Subscription/Subscriber/ArgumentResolver/RecordedOnArgumentResolverTest.php @@ -7,8 +7,11 @@ use DateTimeImmutable; use Patchlevel\EventSourcing\Message\Message; use Patchlevel\EventSourcing\Metadata\Subscriber\ArgumentMetadata; +use Patchlevel\EventSourcing\Metadata\Subscriber\SubscriberMetadata; use Patchlevel\EventSourcing\Store\Header\RecordedOnHeader; +use Patchlevel\EventSourcing\Subscription\Subscriber\ArgumentResolver\ArgumentResolverContext; use Patchlevel\EventSourcing\Subscription\Subscriber\ArgumentResolver\RecordedOnArgumentResolver; +use Patchlevel\EventSourcing\Subscription\Subscription; use PHPUnit\Framework\Attributes\CoversClass; use PHPUnit\Framework\TestCase; use stdClass; @@ -49,7 +52,7 @@ public function testResolveFromAggregateHeader(): void $date, $resolver->resolve( new ArgumentMetadata('foo', Type::object(DateTimeImmutable::class)), - $message, + new ArgumentResolverContext($message, new Subscription('foo'), new SubscriberMetadata('foo')), ), ); } @@ -65,7 +68,7 @@ public function testResolveFromRecordedOnHeader(): void $date, $resolver->resolve( new ArgumentMetadata('foo', Type::object(DateTimeImmutable::class)), - $message, + new ArgumentResolverContext($message, new Subscription('foo'), new SubscriberMetadata('foo')), ), ); } diff --git a/tests/Unit/Subscription/Subscriber/EventEmitter/NoopEventEmitterTest.php b/tests/Unit/Subscription/Subscriber/EventEmitter/NoopEventEmitterTest.php new file mode 100644 index 000000000..7cb295671 --- /dev/null +++ b/tests/Unit/Subscription/Subscriber/EventEmitter/NoopEventEmitterTest.php @@ -0,0 +1,24 @@ +emit([new stdClass()]); + $emitter->linkTo('foo', [new stdClass()]); + + $this->expectNotToPerformAssertions(); + } +} diff --git a/tests/Unit/Subscription/Subscriber/EventEmitter/StoreEventEmitterTest.php b/tests/Unit/Subscription/Subscriber/EventEmitter/StoreEventEmitterTest.php new file mode 100644 index 000000000..2195abc40 --- /dev/null +++ b/tests/Unit/Subscription/Subscriber/EventEmitter/StoreEventEmitterTest.php @@ -0,0 +1,55 @@ +emit([new stdClass(), new stdClass()]); + + self::assertSame(['subscription_foo'], $store->streams()); + self::assertSame(2, $store->count(new Criteria(new StreamCriterion('subscription_foo')))); + } + + public function testLinkToWritesToGivenStream(): void + { + $store = new InMemoryStore(); + $emitter = new StoreEventEmitter($store, 'subscription_foo'); + + $emitter->linkTo('other_stream', [new stdClass()]); + + self::assertSame(['other_stream'], $store->streams()); + + $message = $store->load(new Criteria(new StreamCriterion('other_stream')))->current(); + + self::assertNotNull($message); + self::assertSame('other_stream', $message->header(StreamNameHeader::class)->streamName); + } + + public function testEmitWithoutEventsDoesNotTouchTheStore(): void + { + $store = $this->createMock(Store::class); + $store->expects($this->never())->method('save'); + + $emitter = new StoreEventEmitter($store, 'subscription_foo'); + + $emitter->emit([]); + } +} diff --git a/tests/Unit/Subscription/Subscriber/MetadataSubscriberAccessorRepositoryTest.php b/tests/Unit/Subscription/Subscriber/MetadataSubscriberAccessorRepositoryTest.php index 757a10362..3c27b8dec 100644 --- a/tests/Unit/Subscription/Subscriber/MetadataSubscriberAccessorRepositoryTest.php +++ b/tests/Unit/Subscription/Subscriber/MetadataSubscriberAccessorRepositoryTest.php @@ -6,7 +6,6 @@ use ArrayIterator; use Patchlevel\EventSourcing\Attribute\Subscriber; -use Patchlevel\EventSourcing\Message\Message; use Patchlevel\EventSourcing\Metadata\Subscriber\ArgumentMetadata; use Patchlevel\EventSourcing\Metadata\Subscriber\AttributeSubscriberMetadataFactory; use Patchlevel\EventSourcing\Subscription\RunMode; @@ -38,7 +37,7 @@ class { $metadataFactory = new AttributeSubscriberMetadataFactory(); $customResolver = new class implements ArgumentResolver\ArgumentResolver { - public function resolve(ArgumentMetadata $argument, Message $message): mixed + public function resolve(ArgumentMetadata $argument, ArgumentResolver\ArgumentResolverContext $context): mixed { return null; } @@ -73,7 +72,7 @@ public function support(ArgumentMetadata $argument, string $eventClass): bool public function testArgumentResolversCanBeArraysAndIterators(): void { $customResolver = new class implements ArgumentResolver\ArgumentResolver { - public function resolve(ArgumentMetadata $argument, Message $message): mixed + public function resolve(ArgumentMetadata $argument, ArgumentResolver\ArgumentResolverContext $context): mixed { return null; } diff --git a/tests/Unit/Subscription/Subscriber/MetadataSubscriberAccessorTest.php b/tests/Unit/Subscription/Subscriber/MetadataSubscriberAccessorTest.php index 2e162417a..4ce898a2a 100644 --- a/tests/Unit/Subscription/Subscriber/MetadataSubscriberAccessorTest.php +++ b/tests/Unit/Subscription/Subscriber/MetadataSubscriberAccessorTest.php @@ -15,6 +15,7 @@ use Patchlevel\EventSourcing\Subscription\Subscriber\ArgumentResolver\MessageArgumentResolver; use Patchlevel\EventSourcing\Subscription\Subscriber\MetadataSubscriberAccessor; use Patchlevel\EventSourcing\Subscription\Subscriber\NoSuitableResolver; +use Patchlevel\EventSourcing\Subscription\Subscription; use Patchlevel\EventSourcing\Tests\Unit\Fixture\ProfileId; use Patchlevel\EventSourcing\Tests\Unit\Fixture\ProfileVisited; use PHPUnit\Framework\Attributes\CoversClass; @@ -50,7 +51,7 @@ public function onProfileVisited(Message $message): void $message = new Message(new ProfileVisited(ProfileId::fromString('1'))); - $result[0]($message); + $result[0]($message, new Subscription('profile')); self::assertSame($message, $subscriber->message); } @@ -82,7 +83,7 @@ public function on(Message $message): void $message = new Message(new ProfileVisited(ProfileId::fromString('1'))); - $result[0]($message); + $result[0]($message, new Subscription('profile')); self::assertSame($message, $subscriber->message); } @@ -136,7 +137,7 @@ public function on(Message $message): void $message = new Message(new ProfileVisited(ProfileId::fromString('1'))); - $result[0]($message); + $result[0]($message, new Subscription('profile')); self::assertSame($message, $subscriber->message); }