diff --git a/docs/UPGRADE-4.0.md b/docs/UPGRADE-4.0.md index cfdaddb68..bf0565e6a 100644 --- a/docs/UPGRADE-4.0.md +++ b/docs/UPGRADE-4.0.md @@ -243,6 +243,136 @@ final class CustomResolver implements ArgumentResolver } ``` +### Custom ArgumentResolver registration + +Custom argument resolvers are no longer passed to the `MetadataSubscriberAccessorRepository`. +They are now passed to the `DefaultSubscriptionEngine`, which forwards them to the message processor. + +Before: + +```php +$subscriberRepository = new MetadataSubscriberAccessorRepository( + [new MySubscriber()], + argumentResolvers: [new MyResolver()], +); + +$engine = new DefaultSubscriptionEngine( + $messageLoader, + $subscriptionStore, + $subscriberRepository, +); +``` + +After: + +```php +$subscriberRepository = new MetadataSubscriberAccessorRepository( + [new MySubscriber()], +); + +$engine = new DefaultSubscriptionEngine( + $messageLoader, + $subscriptionStore, + $subscriberRepository, + argumentResolvers: [new MyResolver()], +); +``` + +### Batchable Subscriber + +The `Patchlevel\EventSourcing\Subscription\Subscriber\BatchableSubscriber` interface has been removed. +Batching is now configured with attributes and the subscriber stays stateless: the data you collect +during a batch lives in a state object that the engine creates, keeps and hands back to your methods. + +* `beginBatch()` becomes a `#[BatchBegin]` method that returns the state object. +* `commitBatch()` becomes a `#[BatchFlush]` method that receives the state object. The batch size is now + configured on the attribute (`#[BatchFlush(afterMessages: 1000)]`). +* `rollbackBatch()` becomes a `#[BatchRollback]` method that receives the state object (optional). +* `forceCommit()` becomes a `#[BatchShouldFlush]` method that receives the state object (optional). +* The handler receives the state object through a parameter marked with `#[BatchState]`. + +Before: + +```php +use Patchlevel\EventSourcing\Subscription\Subscriber\BatchableSubscriber; + +#[Projector('profile_1')] +final class MigrationSubscriber implements BatchableSubscriber +{ + /** @var array */ + private array $nameChanged = []; + + #[Subscribe(NameChanged::class)] + public function handleNameChanged(NameChanged $event): void + { + $this->nameChanged[$event->userId] = $event->name; + } + + public function beginBatch(): void + { + $this->nameChanged = []; + } + + public function commitBatch(): void + { + // ... persist $this->nameChanged + $this->nameChanged = []; + } + + public function rollbackBatch(): void + { + } + + public function forceCommit(): bool + { + return count($this->nameChanged) > 1000; + } +} +``` + +After: + +```php +use Patchlevel\EventSourcing\Attribute\BatchState; +use Patchlevel\EventSourcing\Attribute\BatchBegin; +use Patchlevel\EventSourcing\Attribute\BatchFlush; +use Patchlevel\EventSourcing\Attribute\BatchRollback; +use Patchlevel\EventSourcing\Attribute\BatchShouldFlush; + +final class MigrationBatch +{ + /** @var array */ + public array $nameChanged = []; +} + +#[Projector('profile_1')] +final class MigrationSubscriber +{ + #[BatchBegin] + public function beginBatch(): MigrationBatch + { + return new MigrationBatch(); + } + + #[Subscribe(NameChanged::class)] + public function handleNameChanged(NameChanged $event, #[BatchState] MigrationBatch $batch): void + { + $batch->nameChanged[$event->userId] = $event->name; + } + + #[BatchFlush(afterMessages: 1000)] + public function flush(MigrationBatch $batch): void + { + // ... persist $batch->nameChanged + } + + #[BatchRollback] + public function rollback(MigrationBatch $batch): void + { + } +} +``` + ## Store ### StreamStore diff --git a/docs/subscription.md b/docs/subscription.md index 5451aab58..9863de149 100644 --- a/docs/subscription.md +++ b/docs/subscription.md @@ -282,6 +282,23 @@ final class PublicProfileProjection More information can be found in the [reducer](message.md#reducer) documentation. ::: +The lookup resolver is not registered by default. Pass a `LookupResolver` to the subscription engine +via the `argumentResolvers` argument; it needs the event store to read the previous messages from: + +```php +use Patchlevel\EventSourcing\Subscription\Engine\DefaultSubscriptionEngine; +use Patchlevel\EventSourcing\Subscription\Subscriber\ArgumentResolver\LookupResolver; + +$engine = new DefaultSubscriptionEngine( + $messageLoader, + $subscriptionStore, + $subscriberAccessorRepository, + argumentResolvers: [ + new LookupResolver($store), + ], +); +``` + ##### Recorded On Resolver The recorded on resolver resolves the recorded on date. @@ -341,28 +358,16 @@ $eventEmitter->linkTo('notifications', [new NotificationRequired($event->orderId The emitted events must be registered like any other event so the store can (de)serialize them. ::: -To configure the resolver, pass an `EventEmitterResolver` to the subscriber accessor repository, -the same way as the [lookup resolver](#lookup-resolver). It needs the event store to append to: - -```php -use Patchlevel\EventSourcing\Subscription\Subscriber\ArgumentResolver\EventEmitterResolver; -use Patchlevel\EventSourcing\Subscription\Subscriber\MetadataSubscriberAccessorRepository; - -$subscriberAccessorRepository = new MetadataSubscriberAccessorRepository( - $subscribers, - argumentResolvers: [ - new EventEmitterResolver($store), - ], -); -``` - -To also clean up the subscription stream when a subscription is removed, register the -`RemoveSubscriptionStreamListener` on the event dispatcher and pass it to the engine: +To configure the resolver, pass an `EventEmitterResolver` to the subscription engine via the +`argumentResolvers` argument; it needs the event store to append to. To also clean up the +subscription stream when a subscription is removed, register the `RemoveSubscriptionStreamListener` +on an event dispatcher and pass that to the engine as well: ```php use Patchlevel\EventSourcing\Subscription\Engine\DefaultSubscriptionEngine; use Patchlevel\EventSourcing\Subscription\Engine\Event\OnSubscriptionRemoved; use Patchlevel\EventSourcing\Subscription\Engine\Listener\RemoveSubscriptionStreamListener; +use Patchlevel\EventSourcing\Subscription\Subscriber\ArgumentResolver\EventEmitterResolver; use Symfony\Component\EventDispatcher\EventDispatcher; $eventDispatcher = new EventDispatcher(); @@ -376,6 +381,9 @@ $engine = new DefaultSubscriptionEngine( $subscriptionStore, $subscriberAccessorRepository, eventDispatcher: $eventDispatcher, + argumentResolvers: [ + new EventEmitterResolver($store), + ], ); ``` @@ -752,39 +760,53 @@ For more information, see the [retry strategy](#retry-strategy) documentation. You can also optimize the performance of your subscribers by processing a number of events in a batch. This is particularly useful when projections need to be rebuilt. -To achieve this, you can implement the `BatchableSubscriber` interface. +To achieve this, you mark the relevant methods with the batch attributes. +The subscriber itself stays stateless: the `#[BatchBegin]` method returns a state object that the +engine keeps for you and hands back to the handler (via a `#[BatchState]` parameter) and to the +flush and rollback methods. ```php use Doctrine\DBAL\Connection; +use Patchlevel\EventSourcing\Attribute\BatchState; +use Patchlevel\EventSourcing\Attribute\BatchBegin; +use Patchlevel\EventSourcing\Attribute\BatchFlush; use Patchlevel\EventSourcing\Attribute\Projector; -use Patchlevel\EventSourcing\Subscription\Subscriber\BatchableSubscriber; +use Patchlevel\EventSourcing\Attribute\BatchRollback; +use Patchlevel\EventSourcing\Attribute\BatchShouldFlush; +use Patchlevel\EventSourcing\Attribute\Subscribe; + +final class MigrationBatch +{ + /** @var array */ + public array $nameChanged = []; +} #[Projector('profile_1')] -final class MigrationSubscriber implements BatchableSubscriber +final class MigrationSubscriber { public function __construct( private readonly Connection $connection, ) { } - /** @var array */ - private array $nameChanged = []; - - #[Subscribe(NameChanged::class)] - public function handleNameChanged(NameChanged $event): void + #[BatchBegin] + public function beginBatch(): MigrationBatch { - $this->nameChanged[$event->userId] = $event->name; + $this->connection->beginTransaction(); + + return new MigrationBatch(); } - public function beginBatch(): void + #[Subscribe(NameChanged::class)] + public function handleNameChanged(NameChanged $event, #[BatchState] MigrationBatch $batch): void { - $this->nameChanged = []; - $this->connection->beginTransaction(); + $batch->nameChanged[$event->userId] = $event->name; } - public function commitBatch(): void + #[BatchFlush(afterMessages: 1000)] + public function flush(MigrationBatch $batch): void { - foreach ($this->nameChanged as $userId => $name) { + foreach ($batch->nameChanged as $userId => $name) { $this->connection->executeStatement( 'UPDATE user SET name = :name WHERE id = :id', ['name' => $name, 'id' => $userId], @@ -792,50 +814,53 @@ final class MigrationSubscriber implements BatchableSubscriber } $this->connection->commit(); - $this->nameChanged = []; } - public function rollbackBatch(): void + #[BatchShouldFlush] + public function shouldFlush(MigrationBatch $batch): bool { - $this->connection->rollBack(); + return count($batch->nameChanged) > 1000; } - public function forceCommit(): bool + #[BatchRollback] + public function rollback(MigrationBatch $batch): void { - return count($this->nameChanged) > 1000; + $this->connection->rollBack(); } } ``` -This interface provides you with all the options you need to process your data collectively. -The `beginBatch` method is called as soon as a subscriber wants to process an event. +The `#[BatchBegin]` method is optional and called as soon as a subscriber wants to process an event. If no suitable event is found in the stream, batching will not start, and this method will not be called. -Here, you can make all necessary preparations, such as opening a transaction or preparing variables. +Here, you can make all necessary preparations, such as opening a transaction, and optionally return the +state object that holds the data you collect during the batch. +If you do not define a `#[BatchBegin]` method, or it returns nothing, an empty `stdClass` object is used as the state. -The `commitBatch` method is called when batching was previously started, and one of the following conditions is met: +The `#[BatchFlush]` method is called when batching was previously started, and one of the following conditions is met: Either the Subscription Engine reaches its limit, or the stream is finished. -Alternatively, if the subscriber explicitly indicates using the `forceCommit` method that they want to process the data now. +You can also pass `afterMessages` to the attribute (`#[BatchFlush(afterMessages: 1000)]`) to flush automatically +after that many messages, or implement a `#[BatchShouldFlush]` method for custom logic. At this step, you must process all the data. -The `rollbackBatch` method is called when an error occurs and the batching needs to be aborted. +The `#[BatchRollback]` method is optional and called when an error occurs and the batching needs to be aborted. Here, you can respond to the error and potentially perform a database rollback. -The method `forceCommit` is called after each handled event, -and you can decide whether the batch commit process should start now. +The `#[BatchShouldFlush]` method is optional and called after each handled event, +and you can decide whether the flush process should start now. This helps to determine the batch size and thus avoid memory overflow. :::danger -Make sure to fully process the data in `commitBatch` and close any open transactions. +Make sure to fully process the data in the `#[BatchFlush]` method and close any open transactions. Otherwise, it may lead to inconsistent data. ::: :::note -The position of the subscriber is only updated after a successful commit. +The position of the subscriber is only updated after a successful flush. In case of an error, the position remains at the state before the batch started. ::: :::tip -Use `forceCommit` to prevent memory leaks. +Use `#[BatchFlush(afterMessages: ...)]` or a `#[BatchShouldFlush]` method to prevent memory leaks. This allows you to decide when it's suitable to process the data and then release the memory. ::: diff --git a/phpstan-baseline.neon b/phpstan-baseline.neon index 6c4abf6bf..b463caf87 100644 --- a/phpstan-baseline.neon +++ b/phpstan-baseline.neon @@ -589,7 +589,7 @@ parameters: path: tests/Unit/Message/Translator/ReplaceEventTranslatorTest.php - - message: '#^Method class@anonymous/tests/Unit/Metadata/Subscriber/AttributeSubscriberMetadataFactoryTest\.php\:266\:\:profileVisited\(\) has parameter \$message with no type specified\.$#' + message: '#^Method class@anonymous/tests/Unit/Metadata/Subscriber/AttributeSubscriberMetadataFactoryTest\.php\:278\:\:profileVisited\(\) has parameter \$message with no type specified\.$#' identifier: missingType.parameter count: 1 path: tests/Unit/Metadata/Subscriber/AttributeSubscriberMetadataFactoryTest.php diff --git a/src/Attribute/BatchBegin.php b/src/Attribute/BatchBegin.php new file mode 100644 index 000000000..cbb3c2746 --- /dev/null +++ b/src/Attribute/BatchBegin.php @@ -0,0 +1,12 @@ + $attributes */ public function __construct( public readonly string $name, public readonly Type $type, + public readonly array $attributes = [], ) { } + + /** @param class-string $attributeClass */ + public function hasAttribute(string $attributeClass): bool + { + foreach ($this->attributes as $attribute) { + if ($attribute instanceof $attributeClass) { + return true; + } + } + + return false; + } } diff --git a/src/Metadata/Subscriber/AttributeSubscriberMetadataFactory.php b/src/Metadata/Subscriber/AttributeSubscriberMetadataFactory.php index a959a255a..cf558df61 100644 --- a/src/Metadata/Subscriber/AttributeSubscriberMetadataFactory.php +++ b/src/Metadata/Subscriber/AttributeSubscriberMetadataFactory.php @@ -4,6 +4,11 @@ namespace Patchlevel\EventSourcing\Metadata\Subscriber; +use Patchlevel\EventSourcing\Attribute\BatchBegin; +use Patchlevel\EventSourcing\Attribute\BatchFlush; +use Patchlevel\EventSourcing\Attribute\BatchRollback; +use Patchlevel\EventSourcing\Attribute\BatchShouldFlush; +use Patchlevel\EventSourcing\Attribute\BatchState; use Patchlevel\EventSourcing\Attribute\Cleanup; use Patchlevel\EventSourcing\Attribute\DisableEventEmitting; use Patchlevel\EventSourcing\Attribute\EnableEventEmittingDuringBoot; @@ -19,6 +24,7 @@ use Symfony\Component\TypeInfo\TypeResolver\TypeResolver; use function array_key_exists; +use function array_map; use function count; final class AttributeSubscriberMetadataFactory implements SubscriberMetadataFactory @@ -57,6 +63,11 @@ public function metadata(string $subscriber): SubscriberMetadata $teardownMethod = null; $cleanupMethod = null; $failedMethod = null; + $beginBatchMethod = null; + $flushMethod = null; + $flushAfterMessages = null; + $shouldFlushMethod = null; + $rollbackBatchMethod = null; foreach ($methods as $method) { $attributes = $method->getAttributes(Subscribe::class); @@ -77,6 +88,57 @@ public function metadata(string $subscriber): SubscriberMetadata $subscribeMethods[$eventClass] = $this->subscribeMethod($method); } + if ($method->getAttributes(BatchBegin::class)) { + if ($beginBatchMethod !== null) { + throw new DuplicateBeginBatchMethod( + $subscriber, + $beginBatchMethod, + $method->getName(), + ); + } + + $beginBatchMethod = $method->getName(); + } + + $flushAttributes = $method->getAttributes(BatchFlush::class); + + if ($flushAttributes !== []) { + if ($flushMethod !== null) { + throw new DuplicateFlushMethod( + $subscriber, + $flushMethod, + $method->getName(), + ); + } + + $flushMethod = $method->getName(); + $flushAfterMessages = $flushAttributes[0]->newInstance()->afterMessages; + } + + if ($method->getAttributes(BatchShouldFlush::class)) { + if ($shouldFlushMethod !== null) { + throw new DuplicateShouldFlushMethod( + $subscriber, + $shouldFlushMethod, + $method->getName(), + ); + } + + $shouldFlushMethod = $method->getName(); + } + + if ($method->getAttributes(BatchRollback::class)) { + if ($rollbackBatchMethod !== null) { + throw new DuplicateRollbackBatchMethod( + $subscriber, + $rollbackBatchMethod, + $method->getName(), + ); + } + + $rollbackBatchMethod = $method->getName(); + } + if ($method->getAttributes(OnFailed::class)) { if ($failedMethod !== null) { throw new DuplicateFailedMethod( @@ -148,6 +210,28 @@ public function metadata(string $subscriber): SubscriberMetadata throw DuplicateSubscribeMethod::mixedWithAll($subscriber); } + $usesBatching = $beginBatchMethod !== null + || $flushMethod !== null + || $shouldFlushMethod !== null + || $rollbackBatchMethod !== null + || $this->hasBatchArgument($subscribeMethods); + + $batch = null; + + if ($usesBatching) { + if ($flushMethod === null) { + throw IncompleteBatchMethods::missingFlushMethod($subscriber); + } + + $batch = new BatchMetadata( + $flushMethod, + $beginBatchMethod, + $shouldFlushMethod, + $rollbackBatchMethod, + $flushAfterMessages, + ); + } + $metadata = new SubscriberMetadata( $subscriberInfo->id, $subscriberInfo->group, @@ -160,6 +244,7 @@ public function metadata(string $subscriber): SubscriberMetadata $cleanupMethod, $reflector->getAttributes(EnableEventEmittingDuringBoot::class) !== [], $reflector->getAttributes(DisableEventEmitting::class) !== [], + $batch, ); $this->subscriberMetadata[$subscriber] = $metadata; @@ -185,6 +270,10 @@ private function subscribeMethod(ReflectionMethod $method): SubscribeMethodMetad $arguments[] = new ArgumentMetadata( $parameter->getName(), $this->typeResolver->resolve($type), + array_map( + static fn (ReflectionAttribute $attribute): object => $attribute->newInstance(), + $parameter->getAttributes(), + ), ); } @@ -194,6 +283,20 @@ private function subscribeMethod(ReflectionMethod $method): SubscribeMethodMetad ); } + /** @param array $subscribeMethods */ + private function hasBatchArgument(array $subscribeMethods): bool + { + foreach ($subscribeMethods as $subscribeMethod) { + foreach ($subscribeMethod->arguments as $argument) { + if ($argument->hasAttribute(BatchState::class)) { + return true; + } + } + } + + return false; + } + private function retryStrategy(ReflectionClass $reflector): string|null { $attributes = $reflector->getAttributes(RetryStrategy::class); diff --git a/src/Metadata/Subscriber/BatchMetadata.php b/src/Metadata/Subscriber/BatchMetadata.php new file mode 100644 index 000000000..417ce8f59 --- /dev/null +++ b/src/Metadata/Subscriber/BatchMetadata.php @@ -0,0 +1,17 @@ +, Handler> */ private readonly array $handlers; + /** @param iterable $argumentResolvers */ public function __construct( private readonly MessageLoader $messageLoader, SubscriptionStore $subscriptionStore, @@ -62,6 +66,7 @@ public function __construct( private readonly LoggerInterface|null $logger = null, private readonly Cleaner|null $cleaner = null, private readonly EventDispatcherInterface $eventDispatcher = new EventDispatcher(), + iterable $argumentResolvers = [], ) { $this->subscriptionManager = new SubscriptionManager($subscriptionStore); @@ -80,9 +85,12 @@ public function __construct( $this->logger, ); + $batchManager = new BatchManager(); + $messageProcessor = new MessageProcessor( $this->subscriberRepository, $this->eventDispatcher, + [new BatchArgumentResolver($batchManager), ...$argumentResolvers], $this->logger, ); @@ -159,6 +167,7 @@ public function __construct( $this->eventDispatcher->addSubscriber( new BatchSubscriber( + $batchManager, $this->subscriberRepository, $this->logger, ), diff --git a/src/Subscription/Engine/Listener/BatchSubscriber.php b/src/Subscription/Engine/Listener/BatchSubscriber.php index 1f06d0fe3..a72cab462 100644 --- a/src/Subscription/Engine/Listener/BatchSubscriber.php +++ b/src/Subscription/Engine/Listener/BatchSubscriber.php @@ -4,28 +4,29 @@ namespace Patchlevel\EventSourcing\Subscription\Engine\Listener; +use InvalidArgumentException; use Patchlevel\EventSourcing\Subscription\Engine\Error; use Patchlevel\EventSourcing\Subscription\Engine\Event\OnCommand; use Patchlevel\EventSourcing\Subscription\Engine\Event\OnHandleMessage; use Patchlevel\EventSourcing\Subscription\Engine\Event\OnHandleMessageError; use Patchlevel\EventSourcing\Subscription\Engine\Event\OnHandleMessageSuccess; use Patchlevel\EventSourcing\Subscription\Engine\Event\OnProcessingFinished; -use Patchlevel\EventSourcing\Subscription\Subscriber\BatchableSubscriber; +use Patchlevel\EventSourcing\Subscription\Subscriber\Batch; +use Patchlevel\EventSourcing\Subscription\Subscriber\BatchManager; use Patchlevel\EventSourcing\Subscription\Subscriber\SubscriberAccessorRepository; -use Patchlevel\EventSourcing\Subscription\Subscription; use Psr\Log\LoggerInterface; +use stdClass; use Symfony\Component\EventDispatcher\EventSubscriberInterface; use Throwable; +use function is_object; use function sprintf; /** @internal */ final class BatchSubscriber implements EventSubscriberInterface { - /** @var array */ - private array $batching = []; - public function __construct( + private readonly BatchManager $batchManager, private readonly SubscriberAccessorRepository $subscriberRepository, private readonly LoggerInterface|null $logger = null, ) { @@ -33,41 +34,35 @@ public function __construct( public function onCommand(OnCommand $event): void { - $this->batching = []; + $this->batchManager->clear(); } public function onHandleMessage(OnHandleMessage $event): void { $subscriberId = $event->subscription->id(); - if (isset($this->batching[$subscriberId])) { + if ($this->batchManager->has($subscriberId)) { return; } $subscriber = $this->subscriberRepository->get($subscriberId); + $batchMetadata = $subscriber?->metadata()->batch; - if (!$subscriber) { + if ($subscriber === null || $batchMetadata === null) { return; } $realSubscriber = $subscriber->subscriber(); - if (!$realSubscriber instanceof BatchableSubscriber) { - return; - } - - $this->batching[$subscriberId] = [ - 'subscriber' => $realSubscriber, - 'subscription' => $event->subscription, - ]; - $this->logger?->debug(sprintf( 'Subscription Engine: Subscriber "%s" starts a new batch.', $subscriberId, )); + $beginMethod = $batchMetadata->beginMethod; + try { - $realSubscriber->beginBatch(); + $state = $beginMethod !== null ? $realSubscriber->$beginMethod() : null; } catch (Throwable $e) { $this->logger?->error(sprintf( 'Subscription Engine: Subscriber "%s" has an error in the begin batch method: %s', @@ -77,36 +72,59 @@ public function onHandleMessage(OnHandleMessage $event): void throw $e; } + + if ($state !== null && !is_object($state)) { + throw new InvalidArgumentException(sprintf( + 'Subscription Engine: Subscriber "%s" begin batch method must return an object or null.', + $subscriberId, + )); + } + + $this->batchManager->add(new Batch( + $event->subscription, + $subscriber, + $state ?? new stdClass(), + )); } public function onHandleMessageSuccess(OnHandleMessageSuccess $event): void { $subscriberId = $event->subscription->id(); - if (!isset($this->batching[$subscriberId])) { + if (!$this->batchManager->has($subscriberId)) { return; } - if (!$this->shouldCommitBatch($event->subscription)) { + $batch = $this->batchManager->get($subscriberId); + $batch->count++; + + if (!$this->shouldFlush($batch)) { $event->shouldChangePosition = false; return; } - $subscriber = $this->batching[$subscriberId]['subscriber']; - unset($this->batching[$subscriberId]); + $this->batchManager->remove($subscriberId); $this->logger?->debug(sprintf( - 'Subscription Engine: Subscriber "%s" commits the batch.', + 'Subscription Engine: Subscriber "%s" flushes the batch.', $subscriberId, )); + $flushMethod = $batch->accessor->metadata()->batch?->flushMethod; + + if ($flushMethod === null) { + $event->shouldChangePosition = true; + + return; + } + try { - $subscriber->commitBatch(); + $batch->accessor->subscriber()->$flushMethod($batch->state); $event->shouldChangePosition = true; } catch (Throwable $e) { $this->logger?->error(sprintf( - 'Subscription Engine: Subscriber "%s" has an error in the commit batch method: %s', + 'Subscription Engine: Subscriber "%s" has an error in the flush method: %s', $subscriberId, $e->getMessage(), )); @@ -123,46 +141,55 @@ public function onProcessingFinished(OnProcessingFinished $event): void return; } - foreach ($this->batching as $subscriberId => ['subscriber' => $subscriber, 'subscription' => $subscription]) { - unset($this->batching[$subscriberId]); + foreach ($this->batchManager->all() as $batch) { + $subscriberId = $batch->subscription->id(); + $this->batchManager->remove($subscriberId); + + $flushMethod = $batch->accessor->metadata()->batch?->flushMethod; + + if ($flushMethod === null) { + $batch->subscription->changePosition($lastIndex); + + continue; + } $this->logger?->debug(sprintf( - 'Subscription Engine: Subscriber "%s" commits the batch.', + 'Subscription Engine: Subscriber "%s" flushes the batch.', $subscriberId, )); try { - $subscriber->commitBatch(); - $subscription->changePosition($lastIndex); + $batch->accessor->subscriber()->$flushMethod($batch->state); + $batch->subscription->changePosition($lastIndex); } catch (Throwable $e) { $this->logger?->error(sprintf( - 'Subscription Engine: Subscriber "%s" has an error in the commit batch method: %s', + 'Subscription Engine: Subscriber "%s" has an error in the flush method: %s', $subscriberId, $e->getMessage(), )); - $subscription->error($e); + $batch->subscription->error($e); $event->errors[] = new Error($subscriberId, $e->getMessage(), $e); } } } - private function shouldCommitBatch(Subscription $subscription): bool - { - return $this->batching[$subscription->id()]['subscriber']->forceCommit(); - } - public function onError(OnHandleMessageError $event): void { $subscriptionId = $event->subscription->id(); - if (!isset($this->batching[$subscriptionId])) { + if (!$this->batchManager->has($subscriptionId)) { return; } - $subscriber = $this->batching[$subscriptionId]['subscriber']; + $batch = $this->batchManager->get($subscriptionId); + $this->batchManager->remove($subscriptionId); + + $rollbackMethod = $batch->accessor->metadata()->batch?->rollbackMethod; - unset($this->batching[$subscriptionId]); + if ($rollbackMethod === null) { + return; + } $this->logger?->debug(sprintf( 'Subscription Engine: Subscriber "%s" rollback the batch.', @@ -170,7 +197,7 @@ public function onError(OnHandleMessageError $event): void )); try { - $subscriber->rollbackBatch(); + $batch->accessor->subscriber()->$rollbackMethod($batch->state); } catch (Throwable $e) { $this->logger?->error(sprintf( 'Subscription Engine: Subscriber "%s" has an error in the rollback batch method: %s', @@ -180,6 +207,23 @@ public function onError(OnHandleMessageError $event): void } } + private function shouldFlush(Batch $batch): bool + { + $metadata = $batch->accessor->metadata()->batch; + + if ($metadata?->afterMessages !== null && $batch->count >= $metadata->afterMessages) { + return true; + } + + $shouldFlushMethod = $metadata?->shouldFlushMethod; + + if ($shouldFlushMethod === null) { + return false; + } + + return $batch->accessor->subscriber()->$shouldFlushMethod($batch->state) === true; + } + /** @return array */ public static function getSubscribedEvents(): array { diff --git a/src/Subscription/Engine/Listener/FailSubscriber.php b/src/Subscription/Engine/Listener/FailSubscriber.php index 48c7b63f1..5b48516a4 100644 --- a/src/Subscription/Engine/Listener/FailSubscriber.php +++ b/src/Subscription/Engine/Listener/FailSubscriber.php @@ -7,7 +7,6 @@ use Patchlevel\EventSourcing\Message\Message; use Patchlevel\EventSourcing\Subscription\Engine\Event\OnHandleMessageError; use Patchlevel\EventSourcing\Subscription\Engine\SubscriptionManager; -use Patchlevel\EventSourcing\Subscription\Subscriber\BatchableSubscriber; use Patchlevel\EventSourcing\Subscription\Subscriber\SubscriberAccessorRepository; use Patchlevel\EventSourcing\Subscription\Subscription; use Psr\Log\LoggerInterface; @@ -48,7 +47,7 @@ public function handleFailed( return; } - if ($subscriber->subscriber() instanceof BatchableSubscriber) { + if ($subscriber->metadata()->batch !== null) { $subscription->failed($throwable); $this->subscriptionManager->update($subscription); diff --git a/src/Subscription/Engine/MessageProcessor.php b/src/Subscription/Engine/MessageProcessor.php index c24bb5fcf..9709b9de6 100644 --- a/src/Subscription/Engine/MessageProcessor.php +++ b/src/Subscription/Engine/MessageProcessor.php @@ -5,25 +5,54 @@ namespace Patchlevel\EventSourcing\Subscription\Engine; use Patchlevel\EventSourcing\Message\Message; +use Patchlevel\EventSourcing\Metadata\Subscriber\ArgumentMetadata; +use Patchlevel\EventSourcing\Metadata\Subscriber\SubscribeMethodMetadata; use Patchlevel\EventSourcing\Subscription\Engine\Event\OnHandleMessage; use Patchlevel\EventSourcing\Subscription\Engine\Event\OnHandleMessageError; use Patchlevel\EventSourcing\Subscription\Engine\Event\OnHandleMessageSuccess; +use Patchlevel\EventSourcing\Subscription\Subscriber\ArgumentResolver\ArgumentResolver; +use Patchlevel\EventSourcing\Subscription\Subscriber\ArgumentResolver\ArgumentResolverContext; +use Patchlevel\EventSourcing\Subscription\Subscriber\ArgumentResolver\EventArgumentResolver; +use Patchlevel\EventSourcing\Subscription\Subscriber\ArgumentResolver\MessageArgumentResolver; +use Patchlevel\EventSourcing\Subscription\Subscriber\ArgumentResolver\RecordedOnArgumentResolver; +use Patchlevel\EventSourcing\Subscription\Subscriber\NoSuitableResolver; use Patchlevel\EventSourcing\Subscription\Subscriber\SubscriberAccessorRepository; use Patchlevel\EventSourcing\Subscription\Subscription; use Psr\Log\LoggerInterface; use Symfony\Component\EventDispatcher\EventDispatcherInterface; use Throwable; +use function array_merge; +use function array_values; +use function is_array; +use function iterator_to_array; use function sprintf; /** @internal */ final class MessageProcessor { + /** @var list */ + private readonly array $argumentResolvers; + + /** @var array>>> */ + private array $resolverCache = []; + + /** @param iterable|list $argumentResolvers */ public function __construct( private readonly SubscriberAccessorRepository $subscriberRepository, private readonly EventDispatcherInterface $eventDispatcher, + iterable $argumentResolvers = [], private readonly LoggerInterface|null $logger = null, ) { + $this->argumentResolvers = array_merge( + // the check for array is required before PHP 8.2 + array_values(is_array($argumentResolvers) ? $argumentResolvers : iterator_to_array($argumentResolvers)), + [ + new MessageArgumentResolver(), + new EventArgumentResolver(), + new RecordedOnArgumentResolver(), + ], + ); } public function process(int $index, Message $message, Subscription $subscription): Error|null @@ -91,8 +120,18 @@ public function process(int $index, Message $message, Subscription $subscription } try { + $context = new ArgumentResolverContext($message, $subscription, $subscriber->metadata()); + foreach ($subscribeMethods as $subscribeMethod) { - $subscribeMethod($message, $subscription); + $arguments = $this->resolveArguments( + $subscription->id(), + $message->event()::class, + $subscriber->subscriber()::class, + $subscribeMethod, + $context, + ); + + $subscriber->subscriber()->{$subscribeMethod->name}(...$arguments); } } catch (Throwable $e) { $this->logger?->error( @@ -144,4 +183,72 @@ public function process(int $index, Message $message, Subscription $subscription return null; } + + /** + * @param class-string $eventClass + * @param class-string $subscriberClass + * + * @return list + */ + private function resolveArguments( + string $subscriptionId, + string $eventClass, + string $subscriberClass, + SubscribeMethodMetadata $method, + ArgumentResolverContext $context, + ): array { + $resolvers = $this->resolversFor($subscriptionId, $eventClass, $subscriberClass, $method); + + $arguments = []; + + foreach ($method->arguments as $position => $argument) { + $arguments[] = $resolvers[$position]->resolve($argument, $context); + } + + return $arguments; + } + + /** + * @param class-string $eventClass + * @param class-string $subscriberClass + * + * @return list + */ + private function resolversFor( + string $subscriptionId, + string $eventClass, + string $subscriberClass, + SubscribeMethodMetadata $method, + ): array { + if (isset($this->resolverCache[$subscriptionId][$eventClass][$method->name])) { + return $this->resolverCache[$subscriptionId][$eventClass][$method->name]; + } + + $resolvers = []; + + foreach ($method->arguments as $argument) { + $resolvers[] = $this->resolverFor($argument, $eventClass, $subscriberClass, $method); + } + + return $this->resolverCache[$subscriptionId][$eventClass][$method->name] = $resolvers; + } + + /** + * @param class-string $eventClass + * @param class-string $subscriberClass + */ + private function resolverFor( + ArgumentMetadata $argument, + string $eventClass, + string $subscriberClass, + SubscribeMethodMetadata $method, + ): ArgumentResolver { + foreach ($this->argumentResolvers as $resolver) { + if ($resolver->support($argument, $eventClass)) { + return $resolver; + } + } + + throw new NoSuitableResolver($subscriberClass, $method->name, $argument->name); + } } diff --git a/src/Subscription/Subscriber/ArgumentResolver/BatchArgumentResolver.php b/src/Subscription/Subscriber/ArgumentResolver/BatchArgumentResolver.php new file mode 100644 index 000000000..661b86478 --- /dev/null +++ b/src/Subscription/Subscriber/ArgumentResolver/BatchArgumentResolver.php @@ -0,0 +1,27 @@ +batchManager->get($context->subscription->id())->state; + } + + public function support(ArgumentMetadata $argument, string $eventClass): bool + { + return $argument->hasAttribute(BatchState::class); + } +} diff --git a/src/Subscription/Subscriber/Batch.php b/src/Subscription/Subscriber/Batch.php new file mode 100644 index 000000000..a61fbfb0b --- /dev/null +++ b/src/Subscription/Subscriber/Batch.php @@ -0,0 +1,21 @@ + $accessor */ + public function __construct( + public readonly Subscription $subscription, + public readonly MetadataSubscriberAccessor $accessor, + public readonly object $state, + ) { + } +} diff --git a/src/Subscription/Subscriber/BatchManager.php b/src/Subscription/Subscriber/BatchManager.php new file mode 100644 index 000000000..d6e01242b --- /dev/null +++ b/src/Subscription/Subscriber/BatchManager.php @@ -0,0 +1,49 @@ + */ + private array $batches = []; + + public function has(string $subscriptionId): bool + { + return array_key_exists($subscriptionId, $this->batches); + } + + public function get(string $subscriptionId): Batch + { + if (!array_key_exists($subscriptionId, $this->batches)) { + throw new BatchNotFound($subscriptionId); + } + + return $this->batches[$subscriptionId]; + } + + public function add(Batch $batch): void + { + $this->batches[$batch->subscription->id()] = $batch; + } + + public function remove(string $subscriptionId): void + { + unset($this->batches[$subscriptionId]); + } + + /** @return list */ + public function all(): array + { + return array_values($this->batches); + } + + public function clear(): void + { + $this->batches = []; + } +} diff --git a/src/Subscription/Subscriber/BatchNotFound.php b/src/Subscription/Subscriber/BatchNotFound.php new file mode 100644 index 000000000..fd716dfb2 --- /dev/null +++ b/src/Subscription/Subscriber/BatchNotFound.php @@ -0,0 +1,17 @@ +> */ + /** @var array> */ private array $subscribeCache = []; - /** - * @param T $subscriber - * @param list $argumentResolvers - */ + /** @param T $subscriber */ public function __construct( private readonly object $subscriber, private readonly SubscriberMetadata $metadata, - private readonly array $argumentResolvers, ) { } @@ -101,7 +93,7 @@ public function events(): array /** * @param class-string $eventClass * - * @return list + * @return list */ public function subscribeMethods(string $eventClass): array { @@ -119,61 +111,6 @@ public function subscribeMethods(string $eventClass): array $methods[] = $this->metadata->subscribeMethods[Subscribe::ALL]; } - $this->subscribeCache[$eventClass] = array_map( - fn (SubscribeMethodMetadata $method): Closure => $this->createClosure($eventClass, $method), - $methods, - ); - - return $this->subscribeCache[$eventClass]; - } - - /** - * @param class-string $eventClass - * - * @return Closure(Message, Subscription):void - */ - private function createClosure(string $eventClass, SubscribeMethodMetadata $method): Closure - { - $resolvers = $this->resolvers($eventClass, $method); - $methodName = $method->name; - - return function (Message $message, Subscription $subscription) use ($methodName, $resolvers): void { - $arguments = []; - - foreach ($resolvers as $resolver) { - $arguments[] = $resolver($message, $subscription); - } - - $this->subscriber->$methodName(...$arguments); - }; - } - - /** - * @param class-string $eventClass - * - * @return list - */ - private function resolvers(string $eventClass, SubscribeMethodMetadata $method): array - { - $resolvers = []; - $metadata = $this->metadata; - - foreach ($method->arguments as $argument) { - foreach ($this->argumentResolvers as $resolver) { - if (!$resolver->support($argument, $eventClass)) { - continue; - } - - $resolvers[] = static function (Message $message, Subscription $subscription) use ($resolver, $argument, $metadata): mixed { - return $resolver->resolve($argument, new ArgumentResolverContext($message, $subscription, $metadata)); - }; - - continue 2; - } - - throw new NoSuitableResolver($this->subscriber::class, $method->name, $argument->name); - } - - return $resolvers; + return $this->subscribeCache[$eventClass] = $methods; } } diff --git a/src/Subscription/Subscriber/MetadataSubscriberAccessorRepository.php b/src/Subscription/Subscriber/MetadataSubscriberAccessorRepository.php index 88fe173f6..cba34b27e 100644 --- a/src/Subscription/Subscriber/MetadataSubscriberAccessorRepository.php +++ b/src/Subscription/Subscriber/MetadataSubscriberAccessorRepository.php @@ -6,43 +6,20 @@ use Patchlevel\EventSourcing\Metadata\Subscriber\AttributeSubscriberMetadataFactory; use Patchlevel\EventSourcing\Metadata\Subscriber\SubscriberMetadataFactory; -use Patchlevel\EventSourcing\Subscription\Subscriber\ArgumentResolver\ArgumentResolver; -use Patchlevel\EventSourcing\Subscription\Subscriber\ArgumentResolver\EventArgumentResolver; -use Patchlevel\EventSourcing\Subscription\Subscriber\ArgumentResolver\MessageArgumentResolver; -use Patchlevel\EventSourcing\Subscription\Subscriber\ArgumentResolver\RecordedOnArgumentResolver; use function array_key_exists; -use function array_merge; use function array_values; -use function is_array; -use function iterator_to_array; final class MetadataSubscriberAccessorRepository implements SubscriberAccessorRepository { /** @var array */ private array $subscribersMap = []; - /** @var list $argumentResolvers */ - private readonly array $argumentResolvers; - - /** - * @param iterable $subscribers - * @param iterable|list $argumentResolvers - */ + /** @param iterable $subscribers */ public function __construct( private readonly iterable $subscribers, private readonly SubscriberMetadataFactory $metadataFactory = new AttributeSubscriberMetadataFactory(), - iterable $argumentResolvers = [], ) { - $this->argumentResolvers = array_merge( - // the check for array is required before PHP 8.2 - array_values(is_array($argumentResolvers) ? $argumentResolvers : iterator_to_array($argumentResolvers)), - [ - new MessageArgumentResolver(), - new EventArgumentResolver(), - new RecordedOnArgumentResolver(), - ], - ); } /** @return iterable */ @@ -75,7 +52,6 @@ private function subscriberAccessorMap(): array $this->subscribersMap[$metadata->id] = new MetadataSubscriberAccessor( $subscriber, $metadata, - $this->argumentResolvers, ); } diff --git a/tests/Benchmark/BasicImplementation/Projection/BatchProfileProjector.php b/tests/Benchmark/BasicImplementation/Projection/BatchProfileProjector.php index 3b3cd8b76..a364b0578 100644 --- a/tests/Benchmark/BasicImplementation/Projection/BatchProfileProjector.php +++ b/tests/Benchmark/BasicImplementation/Projection/BatchProfileProjector.php @@ -5,22 +5,22 @@ namespace Patchlevel\EventSourcing\Tests\Benchmark\BasicImplementation\Projection; use Doctrine\DBAL\Connection; +use Patchlevel\EventSourcing\Attribute\BatchBegin; +use Patchlevel\EventSourcing\Attribute\BatchFlush; +use Patchlevel\EventSourcing\Attribute\BatchRollback; +use Patchlevel\EventSourcing\Attribute\BatchState; use Patchlevel\EventSourcing\Attribute\Projector; use Patchlevel\EventSourcing\Attribute\Setup; use Patchlevel\EventSourcing\Attribute\Subscribe; use Patchlevel\EventSourcing\Attribute\Teardown; -use Patchlevel\EventSourcing\Subscription\Subscriber\BatchableSubscriber; use Patchlevel\EventSourcing\Tests\Benchmark\BasicImplementation\Events\NameChanged; use Patchlevel\EventSourcing\Tests\Benchmark\BasicImplementation\Events\ProfileCreated; #[Projector(self::SUBSCRIBER_ID)] -final class BatchProfileProjector implements BatchableSubscriber +final class BatchProfileProjector { private const SUBSCRIBER_ID = 'profile'; - /** @var array */ - private array $nameChanged = []; - public function __construct( private Connection $connection, ) { @@ -51,9 +51,12 @@ public function onProfileCreated(ProfileCreated $profileCreated): void } #[Subscribe(NameChanged::class)] - public function onNameChanged(NameChanged $nameChanged): void - { - $this->nameChanged[$nameChanged->profileId->toString()] = $nameChanged->name; + public function onNameChanged( + NameChanged $nameChanged, + #[BatchState] + BatchProfileState $state, + ): void { + $state->nameChanged[$nameChanged->profileId->toString()] = $nameChanged->name; } public function table(): string @@ -61,35 +64,29 @@ public function table(): string return 'projection_' . self::SUBSCRIBER_ID; } - public function beginBatch(): void - { - $this->nameChanged = []; - } - - public function commitBatch(): void + #[BatchBegin] + public function beginBatch(): BatchProfileState { - try { - $this->connection->transactional(function (): void { - foreach ($this->nameChanged as $profileId => $name) { - $this->connection->update( - $this->table(), - ['name' => $name], - ['id' => $profileId], - ); - } - }); - } finally { - $this->nameChanged = []; - } + return new BatchProfileState(); } - public function rollbackBatch(): void + #[BatchFlush] + public function flush(BatchProfileState $state): void { - $this->nameChanged = []; + $this->connection->transactional(function () use ($state): void { + foreach ($state->nameChanged as $profileId => $name) { + $this->connection->update( + $this->table(), + ['name' => $name], + ['id' => $profileId], + ); + } + }); } - public function forceCommit(): bool + #[BatchRollback] + public function rollbackBatch(BatchProfileState $state): void { - return false; + $state->nameChanged = []; } } diff --git a/tests/Benchmark/BasicImplementation/Projection/BatchProfileState.php b/tests/Benchmark/BasicImplementation/Projection/BatchProfileState.php new file mode 100644 index 000000000..f22ac4dff --- /dev/null +++ b/tests/Benchmark/BasicImplementation/Projection/BatchProfileState.php @@ -0,0 +1,11 @@ + */ + public array $nameChanged = []; +} diff --git a/tests/Integration/Subscription/Subscriber/BatchProfileProjection.php b/tests/Integration/Subscription/Subscriber/BatchProfileProjection.php new file mode 100644 index 000000000..ea9b14669 --- /dev/null +++ b/tests/Integration/Subscription/Subscriber/BatchProfileProjection.php @@ -0,0 +1,119 @@ +tableName()); + $table->addColumn('id', 'string')->setLength(36); + $table->addColumn('name', 'string')->setLength(255); + $table->setPrimaryKey(['id']); + + $this->connection->createSchemaManager()->createTable($table); + } + + #[Teardown] + public function drop(): void + { + $this->connection->createSchemaManager()->dropTable($this->tableName()); + } + + #[Subscribe(ProfileCreated::class)] + public function handleProfileCreated( + ProfileCreated $event, + #[BatchState] + BatchProfileState $state, + ): void { + $this->connection->insert( + $this->tableName(), + [ + 'id' => $event->profileId->toString(), + 'name' => $event->name, + ], + ); + + $state->insertedIds[] = $event->profileId->toString(); + } + + #[Subscribe(NameChanged::class)] + public function handleNameChanged( + NameChanged $event, + #[BatchState] + BatchProfileState $state, + ): void { + if ($event->name === self::POISON) { + throw new RuntimeException('poisoned event'); + } + + $this->connection->update( + $this->tableName(), + ['name' => $event->name], + ['id' => $event->profileId->toString()], + ); + } + + private function tableName(): string + { + return 'projection_' . self::SUBSCRIBER_ID; + } + + #[BatchBegin] + public function beginBatch(): BatchProfileState + { + $this->beginCount++; + $this->connection->beginTransaction(); + + return new BatchProfileState(); + } + + #[BatchFlush] + public function flush(BatchProfileState $state): void + { + $this->flushCount++; + $this->connection->commit(); + } + + #[BatchRollback] + public function rollbackBatch(BatchProfileState $state): void + { + $this->rollbackCount++; + $this->connection->rollBack(); + } +} diff --git a/tests/Integration/Subscription/Subscriber/BatchProfileState.php b/tests/Integration/Subscription/Subscriber/BatchProfileState.php new file mode 100644 index 000000000..6e3f2269a --- /dev/null +++ b/tests/Integration/Subscription/Subscriber/BatchProfileState.php @@ -0,0 +1,11 @@ + */ + public array $insertedIds = []; +} diff --git a/tests/Integration/Subscription/Subscriber/ProfileProjection.php b/tests/Integration/Subscription/Subscriber/ProfileProjection.php index 1ca44af33..30c76d1d8 100644 --- a/tests/Integration/Subscription/Subscriber/ProfileProjection.php +++ b/tests/Integration/Subscription/Subscriber/ProfileProjection.php @@ -6,15 +6,17 @@ use Doctrine\DBAL\Connection; use Doctrine\DBAL\Schema\Table; +use Patchlevel\EventSourcing\Attribute\BatchBegin; +use Patchlevel\EventSourcing\Attribute\BatchFlush; +use Patchlevel\EventSourcing\Attribute\BatchRollback; use Patchlevel\EventSourcing\Attribute\Projector; use Patchlevel\EventSourcing\Attribute\Setup; use Patchlevel\EventSourcing\Attribute\Subscribe; use Patchlevel\EventSourcing\Attribute\Teardown; -use Patchlevel\EventSourcing\Subscription\Subscriber\BatchableSubscriber; use Patchlevel\EventSourcing\Tests\Integration\Subscription\Events\ProfileCreated; #[Projector(self::SUBSCRIBER_ID)] -final class ProfileProjection implements BatchableSubscriber +final class ProfileProjection { private const SUBSCRIBER_ID = 'profile_1'; @@ -57,23 +59,21 @@ private function tableName(): string return 'projection_' . self::SUBSCRIBER_ID; } + #[BatchBegin] public function beginBatch(): void { $this->connection->beginTransaction(); } - public function commitBatch(): void + #[BatchFlush] + public function flush(): void { $this->connection->commit(); } + #[BatchRollback] public function rollbackBatch(): void { $this->connection->rollBack(); } - - public function forceCommit(): bool - { - return false; - } } diff --git a/tests/Integration/Subscription/Subscriber/ProfileProjectionWithCleanup.php b/tests/Integration/Subscription/Subscriber/ProfileProjectionWithCleanup.php index 4aeed5d7e..c8d23f26c 100644 --- a/tests/Integration/Subscription/Subscriber/ProfileProjectionWithCleanup.php +++ b/tests/Integration/Subscription/Subscriber/ProfileProjectionWithCleanup.php @@ -7,16 +7,18 @@ use Doctrine\DBAL\Connection; use Doctrine\DBAL\Schema\Table; use Generator; +use Patchlevel\EventSourcing\Attribute\BatchBegin; +use Patchlevel\EventSourcing\Attribute\BatchFlush; +use Patchlevel\EventSourcing\Attribute\BatchRollback; use Patchlevel\EventSourcing\Attribute\Cleanup; use Patchlevel\EventSourcing\Attribute\Projector; use Patchlevel\EventSourcing\Attribute\Setup; use Patchlevel\EventSourcing\Attribute\Subscribe; use Patchlevel\EventSourcing\Subscription\Cleanup\Dbal\DropTableTask; -use Patchlevel\EventSourcing\Subscription\Subscriber\BatchableSubscriber; use Patchlevel\EventSourcing\Tests\Integration\Subscription\Events\ProfileCreated; #[Projector('profile_1')] -final class ProfileProjectionWithCleanup implements BatchableSubscriber +final class ProfileProjectionWithCleanup { private const TABLE_NAME = 'profile_1'; @@ -59,23 +61,21 @@ private function tableName(): string return 'projection_' . self::TABLE_NAME; } + #[BatchBegin] public function beginBatch(): void { $this->connection->beginTransaction(); } - public function commitBatch(): void + #[BatchFlush] + public function flush(): void { $this->connection->commit(); } + #[BatchRollback] public function rollbackBatch(): void { $this->connection->rollBack(); } - - public function forceCommit(): bool - { - return false; - } } diff --git a/tests/Integration/Subscription/SubscriptionTest.php b/tests/Integration/Subscription/SubscriptionTest.php index 2dd800487..567e2d6d9 100644 --- a/tests/Integration/Subscription/SubscriptionTest.php +++ b/tests/Integration/Subscription/SubscriptionTest.php @@ -52,6 +52,7 @@ use Patchlevel\EventSourcing\Subscription\Subscriber\MetadataSubscriberAccessorRepository; use Patchlevel\EventSourcing\Subscription\Subscription; use Patchlevel\EventSourcing\Tests\DbalManager; +use Patchlevel\EventSourcing\Tests\Integration\Subscription\Subscriber\BatchProfileProjection; use Patchlevel\EventSourcing\Tests\Integration\Subscription\Subscriber\ErrorProducerSubscriber; use Patchlevel\EventSourcing\Tests\Integration\Subscription\Subscriber\ErrorProducerWithSelfRecoverySubscriber; use Patchlevel\EventSourcing\Tests\Integration\Subscription\Subscriber\LookupSubscriber; @@ -1412,18 +1413,18 @@ public function testLookup(): void [ new LookupSubscriber($this->projectionConnection), ], - argumentResolvers: [ - new LookupResolver( - $store, - $eventRegistry, - ), - ], ); $engine = new DefaultSubscriptionEngine( new StoreMessageLoader($store), $subscriptionStore, $subscriberRepository, + argumentResolvers: [ + new LookupResolver( + $store, + $eventRegistry, + ), + ], ); $result = $engine->execute(new SetupCommand()); @@ -1509,9 +1510,6 @@ public function testEventEmitter(): void new NotificationEmittingProjection(), $collector, ], - argumentResolvers: [ - new EventEmitterResolver($store), - ], ); $eventDispatcher = new EventDispatcher(); @@ -1525,6 +1523,9 @@ public function testEventEmitter(): void $subscriptionStore, $subscriberRepository, eventDispatcher: $eventDispatcher, + argumentResolvers: [ + new EventEmitterResolver($store), + ], ); $engine->execute(new SetupCommand()); @@ -1620,6 +1621,199 @@ class { self::assertEquals(RunMode::FromNow, $subscriptions[0]->runMode()); } + public function testBatch(): void + { + $store = new StreamDoctrineDbalStore( + $this->connection, + DefaultEventSerializer::createFromPaths([__DIR__ . '/Events']), + ); + + $clock = new FrozenClock(new DateTimeImmutable('2021-01-01T00:00:00')); + + $subscriptionStore = new DoctrineSubscriptionStore( + $this->connection, + $clock, + ); + + $manager = new DefaultRepositoryManager( + new AggregateRootRegistry(['profile' => Profile::class]), + $store, + ); + + $repository = $manager->get(Profile::class); + + $schemaDirector = new DoctrineSchemaDirector( + $this->connection, + new ChainDoctrineSchemaConfigurator([ + $store, + $subscriptionStore, + ]), + ); + + $schemaDirector->create(); + + $projection = new BatchProfileProjection($this->projectionConnection); + $subscriberRepository = new MetadataSubscriberAccessorRepository([$projection]); + + $engine = new DefaultSubscriptionEngine( + new EventFilteredStoreMessageLoader($store, new AttributeEventMetadataFactory(), $subscriberRepository), + $subscriptionStore, + $subscriberRepository, + ); + + $result = $engine->execute(new SetupCommand()); + self::assertEquals([], $result->errors); + + $result = $engine->execute(new Boot()); + self::assertProcessedMessages(0, $result); + self::assertEquals([], $result->errors); + + $aliceId = ProfileId::generate(); + $bobId = ProfileId::generate(); + $charlieId = ProfileId::generate(); + + $repository->save(Profile::create($aliceId, 'Alice')); + $repository->save(Profile::create($bobId, 'Bob')); + $repository->save(Profile::create($charlieId, 'Charlie')); + + $result = $engine->execute(new Run()); + + self::assertProcessedMessages(3, $result); + self::assertEquals([], $result->errors); + + // all three events were processed in a single batch: one begin, one flush, no rollback + self::assertSame(1, $projection->beginCount); + self::assertSame(1, $projection->flushCount); + self::assertSame(0, $projection->rollbackCount); + + self::assertEquals( + [ + new Subscription( + 'batch_profile', + 'projector', + RunMode::FromBeginning, + Status::Active, + 3, + lastSavedAt: new DateTimeImmutable('2021-01-01T00:00:00'), + ), + ], + $engine->subscriptions(), + ); + + $aliceRow = $this->projectionConnection->fetchAssociative( + 'SELECT * FROM projection_batch_profile WHERE id = ?', + [$aliceId->toString()], + ); + + self::assertIsArray($aliceRow); + self::assertSame('Alice', $aliceRow['name']); + + $bobRow = $this->projectionConnection->fetchAssociative( + 'SELECT * FROM projection_batch_profile WHERE id = ?', + [$bobId->toString()], + ); + + self::assertIsArray($bobRow); + self::assertSame('Bob', $bobRow['name']); + + $charlieRow = $this->projectionConnection->fetchAssociative( + 'SELECT * FROM projection_batch_profile WHERE id = ?', + [$charlieId->toString()], + ); + + self::assertIsArray($charlieRow); + self::assertSame('Charlie', $charlieRow['name']); + } + + public function testBatchRollback(): void + { + $store = new StreamDoctrineDbalStore( + $this->connection, + DefaultEventSerializer::createFromPaths([__DIR__ . '/Events']), + ); + + $clock = new FrozenClock(new DateTimeImmutable('2021-01-01T00:00:00')); + + $subscriptionStore = new DoctrineSubscriptionStore( + $this->connection, + $clock, + ); + + $manager = new DefaultRepositoryManager( + new AggregateRootRegistry(['profile' => Profile::class]), + $store, + ); + + $repository = $manager->get(Profile::class); + + $schemaDirector = new DoctrineSchemaDirector( + $this->connection, + new ChainDoctrineSchemaConfigurator([ + $store, + $subscriptionStore, + ]), + ); + + $schemaDirector->create(); + + $projection = new BatchProfileProjection($this->projectionConnection); + $subscriberRepository = new MetadataSubscriberAccessorRepository([$projection]); + + $engine = new DefaultSubscriptionEngine( + new EventFilteredStoreMessageLoader($store, new AttributeEventMetadataFactory(), $subscriberRepository), + $subscriptionStore, + $subscriberRepository, + ); + + $engine->execute(new SetupCommand()); + $engine->execute(new Boot()); + + // first batch commits successfully + $aliceId = ProfileId::generate(); + $repository->save(Profile::create($aliceId, 'Alice')); + + $result = $engine->execute(new Run()); + + self::assertProcessedMessages(1, $result); + self::assertEquals([], $result->errors); + self::assertSame(1, $projection->flushCount); + + // second batch inserts Bob and then hits a poisoned event, which rolls the batch back + $bobId = ProfileId::generate(); + $bob = Profile::create($bobId, 'Bob'); + $bob->changeName(BatchProfileProjection::POISON); + $repository->save($bob); + + $result = $engine->execute(new Run()); + + self::assertCount(1, $result->errors); + self::assertSame(2, $projection->beginCount); + self::assertSame(1, $projection->flushCount); + self::assertSame(1, $projection->rollbackCount); + + $subscription = self::findSubscription($engine->subscriptions(), 'batch_profile'); + + // the position stays at the last successfully committed event + self::assertEquals(Status::Error, $subscription->status()); + self::assertEquals(1, $subscription->position()); + + // Alice (committed in the first batch) survives, Bob's insert was rolled back + $aliceRow = $this->projectionConnection->fetchAssociative( + 'SELECT * FROM projection_batch_profile WHERE id = ?', + [$aliceId->toString()], + ); + + self::assertIsArray($aliceRow); + self::assertSame('Alice', $aliceRow['name']); + + $bobRow = $this->projectionConnection->fetchAssociative( + 'SELECT * FROM projection_batch_profile WHERE id = ?', + [$bobId->toString()], + ); + + self::assertFalse($bobRow); + } + /** @phpstan-assert ProcessedResult $result */ private static function assertProcessedMessages(int $expected, Result $result): void { diff --git a/tests/Unit/Fixture/AfterMessagesBatchingSubscriber.php b/tests/Unit/Fixture/AfterMessagesBatchingSubscriber.php new file mode 100644 index 000000000..2543606ff --- /dev/null +++ b/tests/Unit/Fixture/AfterMessagesBatchingSubscriber.php @@ -0,0 +1,55 @@ + */ + public array $receivedMessages = []; + + /** @var list number of messages contained in each flushed batch */ + public array $flushedBatchSizes = []; + + public int $beginBatchCalled = 0; + public int $flushCalled = 0; + + #[BatchBegin] + public function begin(): BatchingState + { + $this->beginBatchCalled++; + + return new BatchingState(); + } + + #[Subscribe(ProfileVisited::class)] + public function handle( + Message $message, + #[BatchState] + BatchingState $state, + ): void { + $state->messages[] = $message; + $this->receivedMessages[] = $message; + } + + #[BatchFlush(afterMessages: 2)] + public function flush(BatchingState $state): void + { + $this->flushCalled++; + $this->flushedBatchSizes[] = count($state->messages); + } +} diff --git a/tests/Unit/Fixture/BatchingState.php b/tests/Unit/Fixture/BatchingState.php new file mode 100644 index 000000000..783567e9e --- /dev/null +++ b/tests/Unit/Fixture/BatchingState.php @@ -0,0 +1,13 @@ + */ + public array $messages = []; +} diff --git a/tests/Unit/Fixture/BatchingSubscriber.php b/tests/Unit/Fixture/BatchingSubscriber.php index a216cc39b..960027f29 100644 --- a/tests/Unit/Fixture/BatchingSubscriber.php +++ b/tests/Unit/Fixture/BatchingSubscriber.php @@ -4,17 +4,21 @@ namespace Patchlevel\EventSourcing\Tests\Unit\Fixture; +use Patchlevel\EventSourcing\Attribute\BatchBegin; +use Patchlevel\EventSourcing\Attribute\BatchFlush; +use Patchlevel\EventSourcing\Attribute\BatchRollback; +use Patchlevel\EventSourcing\Attribute\BatchShouldFlush; +use Patchlevel\EventSourcing\Attribute\BatchState; use Patchlevel\EventSourcing\Attribute\Subscribe; use Patchlevel\EventSourcing\Attribute\Subscriber; use Patchlevel\EventSourcing\Message\Message; use Patchlevel\EventSourcing\Subscription\RunMode; -use Patchlevel\EventSourcing\Subscription\Subscriber\BatchableSubscriber; use Throwable; use function count; #[Subscriber(BatchingSubscriber::ID, RunMode::FromBeginning)] -final class BatchingSubscriber implements BatchableSubscriber +final class BatchingSubscriber { public const ID = 'test'; @@ -22,21 +26,37 @@ final class BatchingSubscriber implements BatchableSubscriber public array $receivedMessages = []; public int $beginBatchCalled = 0; - public int $commitBatchCalled = 0; - public int $rollbackBatchCalled = 0; + public int $flushCalled = 0; + public int $rollbackCalled = 0; public function __construct( public readonly Throwable|null $throwForMessage = null, public readonly Throwable|null $throwForBeginBatch = null, - public readonly Throwable|null $throwForCommitBatch = null, - public readonly Throwable|null $throwForRollbackBatch = null, - public readonly int $forceCommitAfterMessages = 1_000, + public readonly Throwable|null $throwForFlush = null, + public readonly Throwable|null $throwForRollback = null, + public readonly int $flushAfterMessages = 1_000, ) { } - #[Subscribe(ProfileVisited::class)] - public function handle(Message $message): void + #[BatchBegin] + public function begin(): BatchingState { + $this->beginBatchCalled++; + + if ($this->throwForBeginBatch !== null) { + throw $this->throwForBeginBatch; + } + + return new BatchingState(); + } + + #[Subscribe(ProfileVisited::class)] + public function handle( + Message $message, + #[BatchState] + BatchingState $state, + ): void { + $state->messages[] = $message; $this->receivedMessages[] = $message; if ($this->throwForMessage !== null) { @@ -44,35 +64,29 @@ public function handle(Message $message): void } } - public function beginBatch(): void + #[BatchFlush] + public function flush(BatchingState $state): void { - $this->beginBatchCalled++; + $this->flushCalled++; - if ($this->throwForBeginBatch !== null) { - throw $this->throwForBeginBatch; + if ($this->throwForFlush !== null) { + throw $this->throwForFlush; } } - public function commitBatch(): void + #[BatchShouldFlush] + public function shouldFlush(BatchingState $state): bool { - $this->commitBatchCalled++; - - if ($this->throwForCommitBatch !== null) { - throw $this->throwForCommitBatch; - } + return $this->flushAfterMessages <= count($state->messages); } - public function rollbackBatch(): void + #[BatchRollback] + public function rollback(BatchingState $state): void { - $this->rollbackBatchCalled++; + $this->rollbackCalled++; - if ($this->throwForRollbackBatch !== null) { - throw $this->throwForRollbackBatch; + if ($this->throwForRollback !== null) { + throw $this->throwForRollback; } } - - public function forceCommit(): bool - { - return $this->forceCommitAfterMessages <= count($this->receivedMessages); - } } diff --git a/tests/Unit/Fixture/DefaultStateBatchingSubscriber.php b/tests/Unit/Fixture/DefaultStateBatchingSubscriber.php new file mode 100644 index 000000000..6272a9165 --- /dev/null +++ b/tests/Unit/Fixture/DefaultStateBatchingSubscriber.php @@ -0,0 +1,37 @@ +receivedState = $state; + } + + #[BatchFlush] + public function flush(object $state): void + { + $this->flushedState = $state; + } +} diff --git a/tests/Unit/Fixture/VoidBeginBatchingSubscriber.php b/tests/Unit/Fixture/VoidBeginBatchingSubscriber.php new file mode 100644 index 000000000..d59b0d160 --- /dev/null +++ b/tests/Unit/Fixture/VoidBeginBatchingSubscriber.php @@ -0,0 +1,43 @@ +beginCalled = true; + } + + #[Subscribe(ProfileVisited::class)] + public function handle( + Message $message, + #[BatchState] + object $state, + ): void { + $this->receivedState = $state; + } + + #[BatchFlush] + public function flush(object $state): void + { + } +} diff --git a/tests/Unit/Metadata/Subscriber/ArgumentMetadataTest.php b/tests/Unit/Metadata/Subscriber/ArgumentMetadataTest.php new file mode 100644 index 000000000..dfb52cc1a --- /dev/null +++ b/tests/Unit/Metadata/Subscriber/ArgumentMetadataTest.php @@ -0,0 +1,37 @@ +hasAttribute(BatchState::class)); + } + + public function testHasAttributeReturnsFalseForDifferentAttribute(): void + { + $argument = new ArgumentMetadata('state', Type::object(BatchState::class), [new BatchState()]); + + self::assertFalse($argument->hasAttribute(Subscribe::class)); + } + + public function testHasAttributeReturnsFalseWithoutAttributes(): void + { + $argument = new ArgumentMetadata('event', Type::object(BatchState::class)); + + self::assertFalse($argument->hasAttribute(BatchState::class)); + } +} diff --git a/tests/Unit/Metadata/Subscriber/AttributeSubscriberMetadataFactoryTest.php b/tests/Unit/Metadata/Subscriber/AttributeSubscriberMetadataFactoryTest.php index a7d239f4a..a31c05d62 100644 --- a/tests/Unit/Metadata/Subscriber/AttributeSubscriberMetadataFactoryTest.php +++ b/tests/Unit/Metadata/Subscriber/AttributeSubscriberMetadataFactoryTest.php @@ -4,6 +4,11 @@ namespace Patchlevel\EventSourcing\Tests\Unit\Metadata\Subscriber; +use Patchlevel\EventSourcing\Attribute\BatchBegin; +use Patchlevel\EventSourcing\Attribute\BatchFlush; +use Patchlevel\EventSourcing\Attribute\BatchRollback; +use Patchlevel\EventSourcing\Attribute\BatchShouldFlush; +use Patchlevel\EventSourcing\Attribute\BatchState; use Patchlevel\EventSourcing\Attribute\DisableEventEmitting; use Patchlevel\EventSourcing\Attribute\EnableEventEmittingDuringBoot; use Patchlevel\EventSourcing\Attribute\Processor; @@ -16,16 +21,23 @@ use Patchlevel\EventSourcing\Metadata\Subscriber\ArgumentMetadata; use Patchlevel\EventSourcing\Metadata\Subscriber\ArgumentTypeNotSupported; use Patchlevel\EventSourcing\Metadata\Subscriber\AttributeSubscriberMetadataFactory; +use Patchlevel\EventSourcing\Metadata\Subscriber\BatchMetadata; use Patchlevel\EventSourcing\Metadata\Subscriber\ClassIsNotASubscriber; +use Patchlevel\EventSourcing\Metadata\Subscriber\DuplicateBeginBatchMethod; +use Patchlevel\EventSourcing\Metadata\Subscriber\DuplicateFlushMethod; +use Patchlevel\EventSourcing\Metadata\Subscriber\DuplicateRollbackBatchMethod; use Patchlevel\EventSourcing\Metadata\Subscriber\DuplicateSetupMethod; +use Patchlevel\EventSourcing\Metadata\Subscriber\DuplicateShouldFlushMethod; use Patchlevel\EventSourcing\Metadata\Subscriber\DuplicateSubscribeMethod; use Patchlevel\EventSourcing\Metadata\Subscriber\DuplicateTeardownMethod; +use Patchlevel\EventSourcing\Metadata\Subscriber\IncompleteBatchMethods; use Patchlevel\EventSourcing\Metadata\Subscriber\SubscribeMethodMetadata; use Patchlevel\EventSourcing\Subscription\RunMode; use Patchlevel\EventSourcing\Tests\Unit\Fixture\ProfileCreated; use Patchlevel\EventSourcing\Tests\Unit\Fixture\ProfileVisited; use PHPUnit\Framework\Attributes\CoversClass; use PHPUnit\Framework\TestCase; +use stdClass; use Symfony\Component\TypeInfo\Type; #[CoversClass(AttributeSubscriberMetadataFactory::class)] @@ -360,4 +372,281 @@ public function drop2(): void $metadataFactory = new AttributeSubscriberMetadataFactory(); $metadataFactory->metadata($subscriber::class); } + + public function testBatchMetadata(): void + { + $subscriber = new #[Subscriber('foo', RunMode::FromBeginning)] + class { + #[Subscribe(ProfileVisited::class)] + public function profileVisited(ProfileVisited $event, #[BatchState] + object $state,): void + { + } + + #[BatchBegin] + public function begin(): object + { + return new stdClass(); + } + + #[BatchFlush(afterMessages: 100)] + public function flush(object $state): void + { + } + + #[BatchShouldFlush] + public function shouldFlush(object $state): bool + { + return false; + } + + #[BatchRollback] + public function rollback(object $state): void + { + } + }; + + $metadataFactory = new AttributeSubscriberMetadataFactory(); + $metadata = $metadataFactory->metadata($subscriber::class); + + self::assertEquals( + new BatchMetadata('flush', 'begin', 'shouldFlush', 'rollback', 100), + $metadata->batch, + ); + + self::assertTrue($metadata->subscribeMethods[ProfileVisited::class]->arguments[1]->hasAttribute(BatchState::class)); + self::assertFalse($metadata->subscribeMethods[ProfileVisited::class]->arguments[0]->hasAttribute(BatchState::class)); + } + + public function testBatchMinimalMetadata(): void + { + $subscriber = new #[Subscriber('foo', RunMode::FromBeginning)] + class { + #[BatchBegin] + public function begin(): object + { + return new stdClass(); + } + + #[BatchFlush] + public function flush(object $state): void + { + } + }; + + $metadataFactory = new AttributeSubscriberMetadataFactory(); + $metadata = $metadataFactory->metadata($subscriber::class); + + self::assertEquals(new BatchMetadata('flush', 'begin'), $metadata->batch); + } + + public function testBatchOnlyFlushMethod(): void + { + $subscriber = new #[Subscriber('foo', RunMode::FromBeginning)] + class { + #[BatchFlush] + public function flush(object $state): void + { + } + }; + + $metadataFactory = new AttributeSubscriberMetadataFactory(); + $metadata = $metadataFactory->metadata($subscriber::class); + + self::assertEquals(new BatchMetadata('flush'), $metadata->batch); + } + + public function testBatchOnlyShouldFlushMethodWithoutFlush(): void + { + $this->expectException(IncompleteBatchMethods::class); + + $subscriber = new #[Subscriber('foo', RunMode::FromBeginning)] + class { + #[BatchShouldFlush] + public function shouldFlush(object $state): bool + { + return false; + } + }; + + $metadataFactory = new AttributeSubscriberMetadataFactory(); + $metadataFactory->metadata($subscriber::class); + } + + public function testBatchOnlyRollbackMethodWithoutFlush(): void + { + $this->expectException(IncompleteBatchMethods::class); + + $subscriber = new #[Subscriber('foo', RunMode::FromBeginning)] + class { + #[BatchRollback] + public function rollback(object $state): void + { + } + }; + + $metadataFactory = new AttributeSubscriberMetadataFactory(); + $metadataFactory->metadata($subscriber::class); + } + + public function testBatchWithoutBeginMethod(): void + { + $subscriber = new #[Subscriber('foo', RunMode::FromBeginning)] + class { + #[Subscribe(ProfileVisited::class)] + public function profileVisited(ProfileVisited $event, #[BatchState] + object $state,): void + { + } + + #[BatchFlush] + public function flush(object $state): void + { + } + }; + + $metadataFactory = new AttributeSubscriberMetadataFactory(); + $metadata = $metadataFactory->metadata($subscriber::class); + + self::assertEquals(new BatchMetadata('flush'), $metadata->batch); + self::assertNull($metadata->batch?->beginMethod); + } + + public function testNoBatchMetadata(): void + { + $subscriber = new #[Subscriber('foo', RunMode::FromBeginning)] + class { + }; + + $metadataFactory = new AttributeSubscriberMetadataFactory(); + + self::assertNull($metadataFactory->metadata($subscriber::class)->batch); + } + + public function testBatchArgumentWithoutLifecycleMethods(): void + { + $this->expectException(IncompleteBatchMethods::class); + + $subscriber = new #[Subscriber('foo', RunMode::FromBeginning)] + class { + #[Subscribe(ProfileVisited::class)] + public function profileVisited(ProfileVisited $event, #[BatchState] + object $state,): void + { + } + }; + + $metadataFactory = new AttributeSubscriberMetadataFactory(); + $metadataFactory->metadata($subscriber::class); + } + + public function testBatchWithoutFlushMethod(): void + { + $this->expectException(IncompleteBatchMethods::class); + $this->expectExceptionMessage('uses batching but does not define a method marked with the #[BatchFlush] attribute'); + + $subscriber = new #[Subscriber('foo', RunMode::FromBeginning)] + class { + #[BatchBegin] + public function begin(): object + { + return new stdClass(); + } + }; + + $metadataFactory = new AttributeSubscriberMetadataFactory(); + $metadataFactory->metadata($subscriber::class); + } + + public function testDuplicateBeginBatchException(): void + { + $this->expectException(DuplicateBeginBatchMethod::class); + $this->expectExceptionMessage('have been marked as "begin batch" methods'); + + $subscriber = new #[Subscriber('foo', RunMode::FromBeginning)] + class { + #[BatchBegin] + public function begin1(): object + { + return new stdClass(); + } + + #[BatchBegin] + public function begin2(): object + { + return new stdClass(); + } + }; + + $metadataFactory = new AttributeSubscriberMetadataFactory(); + $metadataFactory->metadata($subscriber::class); + } + + public function testDuplicateFlushException(): void + { + $this->expectException(DuplicateFlushMethod::class); + $this->expectExceptionMessage('have been marked as "flush" methods'); + + $subscriber = new #[Subscriber('foo', RunMode::FromBeginning)] + class { + #[BatchFlush] + public function flush1(object $state): void + { + } + + #[BatchFlush] + public function flush2(object $state): void + { + } + }; + + $metadataFactory = new AttributeSubscriberMetadataFactory(); + $metadataFactory->metadata($subscriber::class); + } + + public function testDuplicateShouldFlushException(): void + { + $this->expectException(DuplicateShouldFlushMethod::class); + $this->expectExceptionMessage('have been marked as "should flush" methods'); + + $subscriber = new #[Subscriber('foo', RunMode::FromBeginning)] + class { + #[BatchShouldFlush] + public function shouldFlush1(object $state): bool + { + return false; + } + + #[BatchShouldFlush] + public function shouldFlush2(object $state): bool + { + return false; + } + }; + + $metadataFactory = new AttributeSubscriberMetadataFactory(); + $metadataFactory->metadata($subscriber::class); + } + + public function testDuplicateRollbackBatchException(): void + { + $this->expectException(DuplicateRollbackBatchMethod::class); + $this->expectExceptionMessage('have been marked as "rollback batch" methods'); + + $subscriber = new #[Subscriber('foo', RunMode::FromBeginning)] + class { + #[BatchRollback] + public function rollback1(object $state): void + { + } + + #[BatchRollback] + public function rollback2(object $state): void + { + } + }; + + $metadataFactory = new AttributeSubscriberMetadataFactory(); + $metadataFactory->metadata($subscriber::class); + } } diff --git a/tests/Unit/Subscription/Engine/Handler/BootHandlerTest.php b/tests/Unit/Subscription/Engine/Handler/BootHandlerTest.php index 2a9987e35..c91cf5c2f 100644 --- a/tests/Unit/Subscription/Engine/Handler/BootHandlerTest.php +++ b/tests/Unit/Subscription/Engine/Handler/BootHandlerTest.php @@ -4,6 +4,9 @@ namespace Patchlevel\EventSourcing\Tests\Unit\Subscription\Engine\Handler; +use Patchlevel\EventSourcing\Attribute\BatchBegin; +use Patchlevel\EventSourcing\Attribute\BatchFlush; +use Patchlevel\EventSourcing\Attribute\BatchRollback; use Patchlevel\EventSourcing\Attribute\OnFailed; use Patchlevel\EventSourcing\Attribute\RetryStrategy as RetryStrategyName; use Patchlevel\EventSourcing\Attribute\Subscribe; @@ -22,7 +25,6 @@ use Patchlevel\EventSourcing\Subscription\RetryStrategy\RetryStrategyRepository; use Patchlevel\EventSourcing\Subscription\RunMode; use Patchlevel\EventSourcing\Subscription\Status; -use Patchlevel\EventSourcing\Subscription\Subscriber\BatchableSubscriber; use Patchlevel\EventSourcing\Subscription\Subscriber\MetadataSubscriberAccessorRepository; use Patchlevel\EventSourcing\Subscription\Subscription; use Patchlevel\EventSourcing\Subscription\SubscriptionError; @@ -34,6 +36,7 @@ use PHPUnit\Framework\TestCase; use Psr\Log\NullLogger; use RuntimeException; +use stdClass; use Symfony\Component\EventDispatcher\EventDispatcher; #[CoversClass(BootHandler::class)] @@ -58,7 +61,7 @@ private function createHandler( $eventDispatcher->addSubscriber(new RetrySubscriber($subscriptionManager, $subscriberRepository, $retryStrategyRepository, new NullLogger())); $eventDispatcher->addSubscriber(new FailSubscriber($subscriptionManager, $subscriberRepository, new NullLogger())); - $messageProcessor = new MessageProcessor($subscriberRepository, $eventDispatcher, new NullLogger()); + $messageProcessor = new MessageProcessor($subscriberRepository, $eventDispatcher, [], new NullLogger()); return new BootHandler($messageLoader, $subscriptionManager, $subscriberRepository, $messageProcessor, $eventDispatcher, new NullLogger()); } @@ -330,7 +333,7 @@ public function testBootWithErrorAndRecoveryFailedBecauseBatching(): void $subscriptionId = 'test'; $subscriber = new #[Subscriber('test', RunMode::FromBeginning)] #[RetryStrategyName('no_retry')] - class implements BatchableSubscriber { + class { public function __construct( public readonly RuntimeException $exception = new RuntimeException('ERROR'), ) { @@ -347,21 +350,20 @@ public function onFailed(): void { } - public function beginBatch(): void - { - } - - public function commitBatch(): void + #[BatchBegin] + public function beginBatch(): object { + return new stdClass(); } - public function rollbackBatch(): void + #[BatchFlush] + public function flush(object $state): void { } - public function forceCommit(): bool + #[BatchRollback] + public function rollbackBatch(object $state): void { - return false; } }; diff --git a/tests/Unit/Subscription/Engine/Handler/RunHandlerTest.php b/tests/Unit/Subscription/Engine/Handler/RunHandlerTest.php index f7dd5b4e7..d16f5c406 100644 --- a/tests/Unit/Subscription/Engine/Handler/RunHandlerTest.php +++ b/tests/Unit/Subscription/Engine/Handler/RunHandlerTest.php @@ -64,7 +64,7 @@ private function createHandler( $eventDispatcher->addSubscriber(new FailSubscriber($subscriptionManager, $subscriberRepository, new NullLogger())); $eventDispatcher->addListener(OnCommand::class, new DetachListener($subscriptionManager, $subscriberRepository, new NullLogger()), 32); - $messageProcessor = new MessageProcessor($subscriberRepository, $eventDispatcher, new NullLogger()); + $messageProcessor = new MessageProcessor($subscriberRepository, $eventDispatcher, [], new NullLogger()); $handler = new RunHandler($messageLoader, $subscriptionManager, $messageProcessor, $eventDispatcher, new NullLogger()); diff --git a/tests/Unit/Subscription/Engine/Listener/BatchSubscriberTest.php b/tests/Unit/Subscription/Engine/Listener/BatchSubscriberTest.php index 2082c6841..5fc4f6333 100644 --- a/tests/Unit/Subscription/Engine/Listener/BatchSubscriberTest.php +++ b/tests/Unit/Subscription/Engine/Listener/BatchSubscriberTest.php @@ -4,6 +4,12 @@ namespace Patchlevel\EventSourcing\Tests\Unit\Subscription\Engine\Listener; +use InvalidArgumentException; +use Patchlevel\EventSourcing\Attribute\BatchBegin; +use Patchlevel\EventSourcing\Attribute\BatchFlush; +use Patchlevel\EventSourcing\Attribute\BatchState; +use Patchlevel\EventSourcing\Attribute\Subscribe; +use Patchlevel\EventSourcing\Attribute\Subscriber; use Patchlevel\EventSourcing\Message\Message; use Patchlevel\EventSourcing\Message\Stream; use Patchlevel\EventSourcing\Subscription\Engine\Command\Boot as BootCommand; @@ -23,18 +29,24 @@ use Patchlevel\EventSourcing\Subscription\RetryStrategy\RetryStrategyRepository; use Patchlevel\EventSourcing\Subscription\RunMode; use Patchlevel\EventSourcing\Subscription\Status; +use Patchlevel\EventSourcing\Subscription\Subscriber\ArgumentResolver\BatchArgumentResolver; +use Patchlevel\EventSourcing\Subscription\Subscriber\BatchManager; use Patchlevel\EventSourcing\Subscription\Subscriber\MetadataSubscriberAccessorRepository; use Patchlevel\EventSourcing\Subscription\Subscription; use Patchlevel\EventSourcing\Subscription\SubscriptionError; use Patchlevel\EventSourcing\Subscription\ThrowableToErrorContextTransformer; +use Patchlevel\EventSourcing\Tests\Unit\Fixture\AfterMessagesBatchingSubscriber; use Patchlevel\EventSourcing\Tests\Unit\Fixture\BatchingSubscriber; +use Patchlevel\EventSourcing\Tests\Unit\Fixture\DefaultStateBatchingSubscriber; use Patchlevel\EventSourcing\Tests\Unit\Fixture\ProfileId; use Patchlevel\EventSourcing\Tests\Unit\Fixture\ProfileVisited; +use Patchlevel\EventSourcing\Tests\Unit\Fixture\VoidBeginBatchingSubscriber; use Patchlevel\EventSourcing\Tests\Unit\Subscription\DummySubscriptionStore; use PHPUnit\Framework\Attributes\CoversClass; use PHPUnit\Framework\TestCase; use Psr\Log\NullLogger; use RuntimeException; +use stdClass; use Symfony\Component\EventDispatcher\EventDispatcher; #[CoversClass(BatchSubscriber::class)] @@ -54,12 +66,13 @@ private function createBootHandler( $subscriberRepository = new MetadataSubscriberAccessorRepository($subscribers); $subscriptionManager = new SubscriptionManager($store); $eventDispatcher = new EventDispatcher(); + $batchManager = new BatchManager(); - $eventDispatcher->addSubscriber(new BatchSubscriber($subscriberRepository, new NullLogger())); + $eventDispatcher->addSubscriber(new BatchSubscriber($batchManager, $subscriberRepository, new NullLogger())); $eventDispatcher->addSubscriber(new RetrySubscriber($subscriptionManager, $subscriberRepository, $retryStrategyRepository, new NullLogger())); $eventDispatcher->addSubscriber(new FailSubscriber($subscriptionManager, $subscriberRepository, new NullLogger())); - $messageProcessor = new MessageProcessor($subscriberRepository, $eventDispatcher, new NullLogger()); + $messageProcessor = new MessageProcessor($subscriberRepository, $eventDispatcher, [new BatchArgumentResolver($batchManager)], new NullLogger()); return new BootHandler($messageLoader, $subscriptionManager, $subscriberRepository, $messageProcessor, $eventDispatcher, new NullLogger()); } @@ -82,13 +95,14 @@ private function createRunHandler( $subscriberRepository = new MetadataSubscriberAccessorRepository($subscribers); $subscriptionManager = new SubscriptionManager($store); $eventDispatcher = new EventDispatcher(); + $batchManager = new BatchManager(); - $eventDispatcher->addSubscriber(new BatchSubscriber($subscriberRepository, new NullLogger())); + $eventDispatcher->addSubscriber(new BatchSubscriber($batchManager, $subscriberRepository, new NullLogger())); $eventDispatcher->addSubscriber(new RetrySubscriber($subscriptionManager, $subscriberRepository, $retryStrategyRepository, new NullLogger())); $eventDispatcher->addSubscriber(new FailSubscriber($subscriptionManager, $subscriberRepository, new NullLogger())); $eventDispatcher->addListener(OnCommand::class, new DetachListener($subscriptionManager, $subscriberRepository, new NullLogger()), 32); - $messageProcessor = new MessageProcessor($subscriberRepository, $eventDispatcher, new NullLogger()); + $messageProcessor = new MessageProcessor($subscriberRepository, $eventDispatcher, [new BatchArgumentResolver($batchManager)], new NullLogger()); $handler = new RunHandler($messageLoader, $subscriptionManager, $messageProcessor, $eventDispatcher, new NullLogger()); @@ -133,14 +147,68 @@ public function testBootBatchingSuccess(): void self::assertSame([$message], $subscriber->receivedMessages); self::assertSame(1, $subscriber->beginBatchCalled); - self::assertSame(1, $subscriber->commitBatchCalled); - self::assertSame(0, $subscriber->rollbackBatchCalled); + self::assertSame(1, $subscriber->flushCalled); + self::assertSame(0, $subscriber->rollbackCalled); + } + + public function testBootBatchingWithDefaultState(): void + { + $subscriber = new DefaultStateBatchingSubscriber(); + + $store = new DummySubscriptionStore([ + new Subscription( + $subscriber::ID, + Subscription::DEFAULT_GROUP, + RunMode::FromBeginning, + Status::Booting, + ), + ]); + + $message = new Message(new ProfileVisited(ProfileId::fromString('test'))); + + $messageLoader = $this->createMock(MessageLoader::class); + $messageLoader->expects($this->once())->method('load')->with(null)->willReturn(new Stream([1 => $message])); + + $handler = $this->createBootHandler($messageLoader, $store, [$subscriber]); + $result = $handler(new BootCommand()); + + self::assertEquals([], $result->errors); + + self::assertInstanceOf(stdClass::class, $subscriber->receivedState); + self::assertSame($subscriber->receivedState, $subscriber->flushedState); + } + + public function testBootBatchingWithVoidBeginUsesDefaultState(): void + { + $subscriber = new VoidBeginBatchingSubscriber(); + + $store = new DummySubscriptionStore([ + new Subscription( + $subscriber::ID, + Subscription::DEFAULT_GROUP, + RunMode::FromBeginning, + Status::Booting, + ), + ]); + + $message = new Message(new ProfileVisited(ProfileId::fromString('test'))); + + $messageLoader = $this->createMock(MessageLoader::class); + $messageLoader->expects($this->once())->method('load')->with(null)->willReturn(new Stream([1 => $message])); + + $handler = $this->createBootHandler($messageLoader, $store, [$subscriber]); + $result = $handler(new BootCommand()); + + self::assertEquals([], $result->errors); + + self::assertTrue($subscriber->beginCalled); + self::assertInstanceOf(stdClass::class, $subscriber->receivedState); } public function testBootBatchingSuccessForceCommit(): void { $subscriber = new BatchingSubscriber( - forceCommitAfterMessages: 1, + flushAfterMessages: 1, ); $store = new DummySubscriptionStore([ @@ -181,8 +249,8 @@ public function testBootBatchingSuccessForceCommit(): void self::assertSame([$message1, $message2], $subscriber->receivedMessages); self::assertSame(2, $subscriber->beginBatchCalled); - self::assertSame(2, $subscriber->commitBatchCalled); - self::assertSame(0, $subscriber->rollbackBatchCalled); + self::assertSame(2, $subscriber->flushCalled); + self::assertSame(0, $subscriber->rollbackCalled); } public function testBootBatchingWithHandleError(): void @@ -235,8 +303,8 @@ public function testBootBatchingWithHandleError(): void self::assertSame([$message], $subscriber->receivedMessages); self::assertSame(1, $subscriber->beginBatchCalled); - self::assertSame(0, $subscriber->commitBatchCalled); - self::assertSame(1, $subscriber->rollbackBatchCalled); + self::assertSame(0, $subscriber->flushCalled); + self::assertSame(1, $subscriber->rollbackCalled); } public function testBootBatchingWithBeginBatchError(): void @@ -289,8 +357,8 @@ public function testBootBatchingWithBeginBatchError(): void self::assertSame([], $subscriber->receivedMessages); self::assertSame(1, $subscriber->beginBatchCalled); - self::assertSame(0, $subscriber->commitBatchCalled); - self::assertSame(1, $subscriber->rollbackBatchCalled); + self::assertSame(0, $subscriber->flushCalled); + self::assertSame(0, $subscriber->rollbackCalled); } public function testBootBatchingWithCommitBatchError(): void @@ -298,7 +366,7 @@ public function testBootBatchingWithCommitBatchError(): void $exception = new RuntimeException('ERROR'); $subscriber = new BatchingSubscriber( - throwForCommitBatch: $exception, + throwForFlush: $exception, ); $store = new DummySubscriptionStore([ @@ -343,8 +411,8 @@ public function testBootBatchingWithCommitBatchError(): void self::assertSame([$message], $subscriber->receivedMessages); self::assertSame(1, $subscriber->beginBatchCalled); - self::assertSame(1, $subscriber->commitBatchCalled); - self::assertSame(0, $subscriber->rollbackBatchCalled); + self::assertSame(1, $subscriber->flushCalled); + self::assertSame(0, $subscriber->rollbackCalled); } public function testBootBatchingWithRollbackBatchError(): void @@ -353,7 +421,7 @@ public function testBootBatchingWithRollbackBatchError(): void $subscriber = new BatchingSubscriber( throwForMessage: $exception, - throwForRollbackBatch: new RuntimeException('ERROR'), + throwForRollback: new RuntimeException('ERROR'), ); $store = new DummySubscriptionStore([ @@ -398,8 +466,94 @@ public function testBootBatchingWithRollbackBatchError(): void self::assertSame([$message], $subscriber->receivedMessages); self::assertSame(1, $subscriber->beginBatchCalled); - self::assertSame(0, $subscriber->commitBatchCalled); - self::assertSame(1, $subscriber->rollbackBatchCalled); + self::assertSame(0, $subscriber->flushCalled); + self::assertSame(1, $subscriber->rollbackCalled); + } + + public function testBootBatchingFlushesAfterMessageThreshold(): void + { + $subscriber = new AfterMessagesBatchingSubscriber(); + + $store = new DummySubscriptionStore([ + new Subscription( + $subscriber::ID, + Subscription::DEFAULT_GROUP, + RunMode::FromBeginning, + Status::Booting, + ), + ]); + + $message1 = new Message(new ProfileVisited(ProfileId::fromString('test'))); + $message2 = new Message(new ProfileVisited(ProfileId::fromString('test'))); + $message3 = new Message(new ProfileVisited(ProfileId::fromString('test'))); + + $messageLoader = $this->createMock(MessageLoader::class); + $messageLoader->expects($this->once())->method('load')->with(null)->willReturn(new Stream([ + 1 => $message1, + 2 => $message2, + 3 => $message3, + ])); + + $handler = $this->createBootHandler($messageLoader, $store, [$subscriber]); + $result = $handler(new BootCommand()); + + self::assertEquals(3, $result->processedMessages); + self::assertEquals(true, $result->finished); + self::assertEquals([], $result->errors); + + // first batch flushes once the threshold of 2 is reached, the third + // message opens a new batch that is flushed when processing finishes + self::assertSame([$message1, $message2, $message3], $subscriber->receivedMessages); + self::assertSame(2, $subscriber->beginBatchCalled); + self::assertSame(2, $subscriber->flushCalled); + self::assertSame([2, 1], $subscriber->flushedBatchSizes); + } + + public function testBootBatchingWithNonObjectBeginStateFails(): void + { + $subscriber = new #[Subscriber('non-object', RunMode::FromBeginning)] + class { + #[BatchBegin] + public function begin(): string + { + return 'not-an-object'; + } + + #[Subscribe(ProfileVisited::class)] + public function handle( + Message $message, + #[BatchState] + object $state, + ): void { + } + + #[BatchFlush] + public function flush(object $state): void + { + } + }; + + $store = new DummySubscriptionStore([ + new Subscription( + 'non-object', + Subscription::DEFAULT_GROUP, + RunMode::FromBeginning, + Status::Booting, + ), + ]); + + $message = new Message(new ProfileVisited(ProfileId::fromString('test'))); + + $messageLoader = $this->createMock(MessageLoader::class); + $messageLoader->expects($this->once())->method('load')->with(null)->willReturn(new Stream([1 => $message])); + + $handler = $this->createBootHandler($messageLoader, $store, [$subscriber]); + $result = $handler(new BootCommand()); + + $error = $result->errors[0]; + self::assertEquals('non-object', $error->subscriptionId); + self::assertStringContainsString('begin batch method must return an object or null', $error->message); + self::assertInstanceOf(InvalidArgumentException::class, $error->throwable); } public function testRunningBatchingSuccess(): void @@ -441,14 +595,14 @@ public function testRunningBatchingSuccess(): void self::assertSame([$message], $subscriber->receivedMessages); self::assertSame(1, $subscriber->beginBatchCalled); - self::assertSame(1, $subscriber->commitBatchCalled); - self::assertSame(0, $subscriber->rollbackBatchCalled); + self::assertSame(1, $subscriber->flushCalled); + self::assertSame(0, $subscriber->rollbackCalled); } public function testRunningBatchingSuccessForceCommit(): void { $subscriber = new BatchingSubscriber( - forceCommitAfterMessages: 1, + flushAfterMessages: 1, ); $store = new DummySubscriptionStore([ @@ -490,8 +644,8 @@ public function testRunningBatchingSuccessForceCommit(): void self::assertSame([$message1, $message2], $subscriber->receivedMessages); self::assertSame(2, $subscriber->beginBatchCalled); - self::assertSame(2, $subscriber->commitBatchCalled); - self::assertSame(0, $subscriber->rollbackBatchCalled); + self::assertSame(2, $subscriber->flushCalled); + self::assertSame(0, $subscriber->rollbackCalled); } public function testRunningBatchingWithHandleError(): void @@ -545,8 +699,8 @@ public function testRunningBatchingWithHandleError(): void self::assertSame([$message], $subscriber->receivedMessages); self::assertSame(1, $subscriber->beginBatchCalled); - self::assertSame(0, $subscriber->commitBatchCalled); - self::assertSame(1, $subscriber->rollbackBatchCalled); + self::assertSame(0, $subscriber->flushCalled); + self::assertSame(1, $subscriber->rollbackCalled); } public function testRunningBatchingWithBeginBatchError(): void @@ -600,8 +754,8 @@ public function testRunningBatchingWithBeginBatchError(): void self::assertSame([], $subscriber->receivedMessages); self::assertSame(1, $subscriber->beginBatchCalled); - self::assertSame(0, $subscriber->commitBatchCalled); - self::assertSame(1, $subscriber->rollbackBatchCalled); + self::assertSame(0, $subscriber->flushCalled); + self::assertSame(0, $subscriber->rollbackCalled); } public function testRunningBatchingWithCommitBatchError(): void @@ -609,7 +763,7 @@ public function testRunningBatchingWithCommitBatchError(): void $exception = new RuntimeException('ERROR'); $subscriber = new BatchingSubscriber( - throwForCommitBatch: $exception, + throwForFlush: $exception, ); $store = new DummySubscriptionStore([ @@ -655,8 +809,8 @@ public function testRunningBatchingWithCommitBatchError(): void self::assertSame([$message], $subscriber->receivedMessages); self::assertSame(1, $subscriber->beginBatchCalled); - self::assertSame(1, $subscriber->commitBatchCalled); - self::assertSame(0, $subscriber->rollbackBatchCalled); + self::assertSame(1, $subscriber->flushCalled); + self::assertSame(0, $subscriber->rollbackCalled); } public function testRunningBatchingWithRollbackBatchError(): void @@ -665,7 +819,7 @@ public function testRunningBatchingWithRollbackBatchError(): void $subscriber = new BatchingSubscriber( throwForMessage: $exception, - throwForRollbackBatch: new RuntimeException('ERROR'), + throwForRollback: new RuntimeException('ERROR'), ); $store = new DummySubscriptionStore([ @@ -711,7 +865,7 @@ public function testRunningBatchingWithRollbackBatchError(): void self::assertSame([$message], $subscriber->receivedMessages); self::assertSame(1, $subscriber->beginBatchCalled); - self::assertSame(0, $subscriber->commitBatchCalled); - self::assertSame(1, $subscriber->rollbackBatchCalled); + self::assertSame(0, $subscriber->flushCalled); + self::assertSame(1, $subscriber->rollbackCalled); } } diff --git a/tests/Unit/Subscription/Engine/Listener/FailSubscriberTest.php b/tests/Unit/Subscription/Engine/Listener/FailSubscriberTest.php index 0cca7387f..409f90b97 100644 --- a/tests/Unit/Subscription/Engine/Listener/FailSubscriberTest.php +++ b/tests/Unit/Subscription/Engine/Listener/FailSubscriberTest.php @@ -4,6 +4,9 @@ namespace Patchlevel\EventSourcing\Tests\Unit\Subscription\Engine\Listener; +use Patchlevel\EventSourcing\Attribute\BatchBegin; +use Patchlevel\EventSourcing\Attribute\BatchFlush; +use Patchlevel\EventSourcing\Attribute\BatchRollback; use Patchlevel\EventSourcing\Attribute\OnFailed; use Patchlevel\EventSourcing\Attribute\RetryStrategy as RetryStrategyName; use Patchlevel\EventSourcing\Attribute\Subscribe; @@ -14,7 +17,6 @@ use Patchlevel\EventSourcing\Subscription\Engine\SubscriptionManager; use Patchlevel\EventSourcing\Subscription\RunMode; use Patchlevel\EventSourcing\Subscription\Status; -use Patchlevel\EventSourcing\Subscription\Subscriber\BatchableSubscriber; use Patchlevel\EventSourcing\Subscription\Subscriber\MetadataSubscriberAccessorRepository; use Patchlevel\EventSourcing\Subscription\Subscription; use Patchlevel\EventSourcing\Subscription\SubscriptionError; @@ -26,6 +28,7 @@ use PHPUnit\Framework\TestCase; use Psr\Log\NullLogger; use RuntimeException; +use stdClass; #[CoversClass(FailSubscriber::class)] final class FailSubscriberTest extends TestCase @@ -95,13 +98,13 @@ public function testFailsSubscriptionWhenNoSubscriberFound(): void ); } - public function testFailsSubscriptionForBatchableSubscriber(): void + public function testFailsSubscriptionForBatchingSubscriber(): void { $exception = new RuntimeException('ERROR'); $subscriber = new #[Subscriber('test', RunMode::FromBeginning)] #[RetryStrategyName('no_retry')] - class implements BatchableSubscriber { + class { #[Subscribe(ProfileVisited::class)] public function handle(): void { @@ -112,21 +115,20 @@ public function onFailed(): void { } - public function beginBatch(): void - { - } - - public function commitBatch(): void + #[BatchBegin] + public function beginBatch(): object { + return new stdClass(); } - public function rollbackBatch(): void + #[BatchFlush] + public function flush(object $state): void { } - public function forceCommit(): bool + #[BatchRollback] + public function rollbackBatch(object $state): void { - return false; } }; diff --git a/tests/Unit/Subscription/Engine/MessageProcessorTest.php b/tests/Unit/Subscription/Engine/MessageProcessorTest.php index 9d61a54b7..9f2b60061 100644 --- a/tests/Unit/Subscription/Engine/MessageProcessorTest.php +++ b/tests/Unit/Subscription/Engine/MessageProcessorTest.php @@ -11,6 +11,7 @@ use Patchlevel\EventSourcing\Subscription\RunMode; use Patchlevel\EventSourcing\Subscription\Status; use Patchlevel\EventSourcing\Subscription\Subscriber\MetadataSubscriberAccessorRepository; +use Patchlevel\EventSourcing\Subscription\Subscriber\NoSuitableResolver; use Patchlevel\EventSourcing\Subscription\Subscription; use Patchlevel\EventSourcing\Tests\Unit\Fixture\ProfileId; use Patchlevel\EventSourcing\Tests\Unit\Fixture\ProfileVisited; @@ -38,6 +39,7 @@ public function onProfileVisited(ProfileVisited $event): void $processor = new MessageProcessor( new MetadataSubscriberAccessorRepository([$subscriber]), new EventDispatcher(), + [], new NullLogger(), ); @@ -51,4 +53,92 @@ public function onProfileVisited(ProfileVisited $event): void self::assertSame($event, $subscriber->event); self::assertSame(1, $subscription->position()); } + + public function testProcessResolvesMessageArgument(): void + { + $subscriber = new #[Subscriber('test', RunMode::FromBeginning)] + class { + public Message|null $message = null; + + #[Subscribe(ProfileVisited::class)] + public function onProfileVisited(Message $message): void + { + $this->message = $message; + } + }; + + $processor = new MessageProcessor( + new MetadataSubscriberAccessorRepository([$subscriber]), + new EventDispatcher(), + [], + new NullLogger(), + ); + + $message = Message::create(new ProfileVisited(ProfileId::fromString('1'))); + $subscription = new Subscription('test', status: Status::Active); + + $error = $processor->process(1, $message, $subscription); + + self::assertNull($error); + self::assertSame($message, $subscriber->message); + } + + public function testProcessResolvesMultipleArguments(): void + { + $subscriber = new #[Subscriber('test', RunMode::FromBeginning)] + class { + public ProfileVisited|null $event = null; + public Message|null $message = null; + + #[Subscribe(ProfileVisited::class)] + public function onProfileVisited(ProfileVisited $event, Message $message): void + { + $this->event = $event; + $this->message = $message; + } + }; + + $processor = new MessageProcessor( + new MetadataSubscriberAccessorRepository([$subscriber]), + new EventDispatcher(), + [], + new NullLogger(), + ); + + $event = new ProfileVisited(ProfileId::fromString('1')); + $message = Message::create($event); + $subscription = new Subscription('test', status: Status::Active); + + $error = $processor->process(1, $message, $subscription); + + self::assertNull($error); + self::assertSame($event, $subscriber->event); + self::assertSame($message, $subscriber->message); + } + + public function testProcessReturnsErrorWhenArgumentCannotBeResolved(): void + { + $subscriber = new #[Subscriber('test', RunMode::FromBeginning)] + class { + #[Subscribe(ProfileVisited::class)] + public function onProfileVisited(ProfileVisited $event, int $unresolved): void + { + } + }; + + $processor = new MessageProcessor( + new MetadataSubscriberAccessorRepository([$subscriber]), + new EventDispatcher(), + [], + new NullLogger(), + ); + + $message = Message::create(new ProfileVisited(ProfileId::fromString('1'))); + $subscription = new Subscription('test', status: Status::Active); + + $error = $processor->process(1, $message, $subscription); + + self::assertNotNull($error); + self::assertInstanceOf(NoSuitableResolver::class, $error->throwable); + } } diff --git a/tests/Unit/Subscription/Subscriber/ArgumentResolver/BatchArgumentResolverTest.php b/tests/Unit/Subscription/Subscriber/ArgumentResolver/BatchArgumentResolverTest.php new file mode 100644 index 000000000..f55fc60cf --- /dev/null +++ b/tests/Unit/Subscription/Subscriber/ArgumentResolver/BatchArgumentResolverTest.php @@ -0,0 +1,80 @@ +support( + new ArgumentMetadata('foo', Type::object(stdClass::class), [new BatchState()]), + ProfileVisited::class, + ), + ); + + self::assertFalse( + $resolver->support( + new ArgumentMetadata('foo', Type::object(stdClass::class)), + ProfileVisited::class, + ), + ); + } + + public function testResolve(): void + { + $state = new stdClass(); + + $subscriber = new #[Subscriber('foo', RunMode::FromBeginning)] + class { + }; + + $batch = new Batch( + new Subscription('foo'), + new MetadataSubscriberAccessor( + $subscriber, + (new AttributeSubscriberMetadataFactory())->metadata($subscriber::class), + ), + $state, + ); + + $batchManager = new BatchManager(); + $batchManager->add($batch); + + $resolver = new BatchArgumentResolver($batchManager); + $message = new Message(new ProfileVisited(ProfileId::fromString('1'))); + + self::assertSame( + $state, + $resolver->resolve( + new ArgumentMetadata('foo', Type::object(stdClass::class), [new BatchState()]), + new ArgumentResolverContext($message, new Subscription('foo'), new SubscriberMetadata('foo')), + ), + ); + } +} diff --git a/tests/Unit/Subscription/Subscriber/BatchManagerTest.php b/tests/Unit/Subscription/Subscriber/BatchManagerTest.php new file mode 100644 index 000000000..8604d2dd5 --- /dev/null +++ b/tests/Unit/Subscription/Subscriber/BatchManagerTest.php @@ -0,0 +1,88 @@ +metadata($subscriber::class), + ), + new stdClass(), + ); + } + + public function testAddGetHasRemove(): void + { + $manager = new BatchManager(); + $batch = $this->batch('foo'); + + self::assertFalse($manager->has('foo')); + + $manager->add($batch); + + self::assertTrue($manager->has('foo')); + self::assertSame($batch, $manager->get('foo')); + + $manager->remove('foo'); + + self::assertFalse($manager->has('foo')); + } + + public function testGetMissing(): void + { + $this->expectException(BatchNotFound::class); + $this->expectExceptionMessage('No batch found for subscription "foo".'); + + (new BatchManager())->get('foo'); + } + + public function testAll(): void + { + $manager = new BatchManager(); + $foo = $this->batch('foo'); + $bar = $this->batch('bar'); + + $manager->add($foo); + $manager->add($bar); + + self::assertSame([$foo, $bar], $manager->all()); + } + + public function testClear(): void + { + $manager = new BatchManager(); + $manager->add($this->batch('foo')); + $manager->add($this->batch('bar')); + + $manager->clear(); + + self::assertFalse($manager->has('foo')); + self::assertFalse($manager->has('bar')); + self::assertSame([], $manager->all()); + } +} diff --git a/tests/Unit/Subscription/Subscriber/MetadataSubscriberAccessorRepositoryTest.php b/tests/Unit/Subscription/Subscriber/MetadataSubscriberAccessorRepositoryTest.php index 3c27b8dec..b80aa9bc5 100644 --- a/tests/Unit/Subscription/Subscriber/MetadataSubscriberAccessorRepositoryTest.php +++ b/tests/Unit/Subscription/Subscriber/MetadataSubscriberAccessorRepositoryTest.php @@ -4,12 +4,9 @@ namespace Patchlevel\EventSourcing\Tests\Unit\Subscription\Subscriber; -use ArrayIterator; use Patchlevel\EventSourcing\Attribute\Subscriber; -use Patchlevel\EventSourcing\Metadata\Subscriber\ArgumentMetadata; use Patchlevel\EventSourcing\Metadata\Subscriber\AttributeSubscriberMetadataFactory; use Patchlevel\EventSourcing\Subscription\RunMode; -use Patchlevel\EventSourcing\Subscription\Subscriber\ArgumentResolver; use Patchlevel\EventSourcing\Subscription\Subscriber\DuplicateSubscriberId; use Patchlevel\EventSourcing\Subscription\Subscriber\MetadataSubscriberAccessor; use Patchlevel\EventSourcing\Subscription\Subscriber\MetadataSubscriberAccessorRepository; @@ -36,68 +33,20 @@ class { }; $metadataFactory = new AttributeSubscriberMetadataFactory(); - $customResolver = new class implements ArgumentResolver\ArgumentResolver { - public function resolve(ArgumentMetadata $argument, ArgumentResolver\ArgumentResolverContext $context): mixed - { - return null; - } - - public function support(ArgumentMetadata $argument, string $eventClass): bool - { - return false; - } - }; - $repository = new MetadataSubscriberAccessorRepository( [$subscriber], $metadataFactory, - [$customResolver], ); $accessor = new MetadataSubscriberAccessor( $subscriber, $metadataFactory->metadata($subscriber::class), - [ - $customResolver, - new ArgumentResolver\MessageArgumentResolver(), - new ArgumentResolver\EventArgumentResolver(), - new ArgumentResolver\RecordedOnArgumentResolver(), - ], ); self::assertEquals([$accessor], $repository->all()); self::assertEquals($accessor, $repository->get('foo')); } - public function testArgumentResolversCanBeArraysAndIterators(): void - { - $customResolver = new class implements ArgumentResolver\ArgumentResolver { - public function resolve(ArgumentMetadata $argument, ArgumentResolver\ArgumentResolverContext $context): mixed - { - return null; - } - - public function support(ArgumentMetadata $argument, string $eventClass): bool - { - return false; - } - }; - - $repository = new MetadataSubscriberAccessorRepository( - [], - new AttributeSubscriberMetadataFactory(), - [$customResolver], - ); - - $repository2 = new MetadataSubscriberAccessorRepository( - [], - new AttributeSubscriberMetadataFactory(), - new ArrayIterator([$customResolver]), - ); - - self::assertEquals($repository, $repository2); - } - public function testDuplicateSubscriberId(): void { $this->expectException(DuplicateSubscriberId::class); diff --git a/tests/Unit/Subscription/Subscriber/MetadataSubscriberAccessorTest.php b/tests/Unit/Subscription/Subscriber/MetadataSubscriberAccessorTest.php index 4ce898a2a..9f3e5a424 100644 --- a/tests/Unit/Subscription/Subscriber/MetadataSubscriberAccessorTest.php +++ b/tests/Unit/Subscription/Subscriber/MetadataSubscriberAccessorTest.php @@ -11,12 +11,7 @@ use Patchlevel\EventSourcing\Message\Message; use Patchlevel\EventSourcing\Metadata\Subscriber\AttributeSubscriberMetadataFactory; use Patchlevel\EventSourcing\Subscription\RunMode; -use Patchlevel\EventSourcing\Subscription\Subscriber\ArgumentResolver\EventArgumentResolver; -use Patchlevel\EventSourcing\Subscription\Subscriber\ArgumentResolver\MessageArgumentResolver; use Patchlevel\EventSourcing\Subscription\Subscriber\MetadataSubscriberAccessor; -use Patchlevel\EventSourcing\Subscription\Subscriber\NoSuitableResolver; -use Patchlevel\EventSourcing\Subscription\Subscription; -use Patchlevel\EventSourcing\Tests\Unit\Fixture\ProfileId; use Patchlevel\EventSourcing\Tests\Unit\Fixture\ProfileVisited; use PHPUnit\Framework\Attributes\CoversClass; use PHPUnit\Framework\TestCase; @@ -24,122 +19,54 @@ #[CoversClass(MetadataSubscriberAccessor::class)] final class MetadataSubscriberAccessorTest extends TestCase { - public function testSubscribeMethod(): void + /** @return MetadataSubscriberAccessor */ + private function accessor(object $subscriber): MetadataSubscriberAccessor { - $subscriber = new #[Subscriber('profile', RunMode::FromBeginning)] - class { - public Message|null $message = null; - - #[Subscribe(ProfileVisited::class)] - public function onProfileVisited(Message $message): void - { - $this->message = $message; - } - }; - - $accessor = new MetadataSubscriberAccessor( + return new MetadataSubscriberAccessor( $subscriber, (new AttributeSubscriberMetadataFactory())->metadata($subscriber::class), - [ - new MessageArgumentResolver(), - ], ); - - $result = $accessor->subscribeMethods(ProfileVisited::class); - - self::assertArrayHasKey(0, $result); - - $message = new Message(new ProfileVisited(ProfileId::fromString('1'))); - - $result[0]($message, new Subscription('profile')); - - self::assertSame($message, $subscriber->message); } - public function testSubscribeAllMethod(): void + public function testSubscribeMethod(): void { $subscriber = new #[Subscriber('profile', RunMode::FromBeginning)] class { - public Message|null $message = null; - - #[Subscribe('*')] - public function on(Message $message): void + #[Subscribe(ProfileVisited::class)] + public function onProfileVisited(Message $message): void { - $this->message = $message; } }; - $accessor = new MetadataSubscriberAccessor( - $subscriber, - (new AttributeSubscriberMetadataFactory())->metadata($subscriber::class), - [ - new MessageArgumentResolver(), - ], - ); - - $result = $accessor->subscribeMethods(ProfileVisited::class); + $result = $this->accessor($subscriber)->subscribeMethods(ProfileVisited::class); self::assertArrayHasKey(0, $result); - - $message = new Message(new ProfileVisited(ProfileId::fromString('1'))); - - $result[0]($message, new Subscription('profile')); - - self::assertSame($message, $subscriber->message); + self::assertSame('onProfileVisited', $result[0]->name); } - public function testNoResolver(): void + public function testSubscribeAllMethod(): void { - $this->expectException(NoSuitableResolver::class); - $subscriber = new #[Subscriber('profile', RunMode::FromBeginning)] class { - #[Subscribe(ProfileVisited::class)] + #[Subscribe('*')] public function on(Message $message): void { } }; - $accessor = new MetadataSubscriberAccessor( - $subscriber, - (new AttributeSubscriberMetadataFactory())->metadata($subscriber::class), - [], - ); + $result = $this->accessor($subscriber)->subscribeMethods(ProfileVisited::class); - $accessor->subscribeMethods(ProfileVisited::class); + self::assertArrayHasKey(0, $result); + self::assertSame('on', $result[0]->name); } - public function testMultipleResolver(): void + public function testNoSubscribeMethod(): void { $subscriber = new #[Subscriber('profile', RunMode::FromBeginning)] class { - public Message|null $message = null; - - #[Subscribe(ProfileVisited::class)] - public function on(Message $message): void - { - $this->message = $message; - } }; - $accessor = new MetadataSubscriberAccessor( - $subscriber, - (new AttributeSubscriberMetadataFactory())->metadata($subscriber::class), - [ - new EventArgumentResolver(), - new MessageArgumentResolver(), - ], - ); - - $result = $accessor->subscribeMethods(ProfileVisited::class); - - self::assertArrayHasKey(0, $result); - - $message = new Message(new ProfileVisited(ProfileId::fromString('1'))); - - $result[0]($message, new Subscription('profile')); - - self::assertSame($message, $subscriber->message); + self::assertSame([], $this->accessor($subscriber)->subscribeMethods(ProfileVisited::class)); } public function testSetupMethod(): void @@ -152,15 +79,7 @@ public function method(): void } }; - $accessor = new MetadataSubscriberAccessor( - $subscriber, - (new AttributeSubscriberMetadataFactory())->metadata($subscriber::class), - [], - ); - - $result = $accessor->setupMethod(); - - self::assertEquals($subscriber->method(...), $result); + self::assertEquals($subscriber->method(...), $this->accessor($subscriber)->setupMethod()); } public function testNotSetupMethod(): void @@ -169,15 +88,7 @@ public function testNotSetupMethod(): void class { }; - $accessor = new MetadataSubscriberAccessor( - $subscriber, - (new AttributeSubscriberMetadataFactory())->metadata($subscriber::class), - [], - ); - - $result = $accessor->setupMethod(); - - self::assertNull($result); + self::assertNull($this->accessor($subscriber)->setupMethod()); } public function testTeardownMethod(): void @@ -190,15 +101,7 @@ public function method(): void } }; - $accessor = new MetadataSubscriberAccessor( - $subscriber, - (new AttributeSubscriberMetadataFactory())->metadata($subscriber::class), - [], - ); - - $result = $accessor->teardownMethod(); - - self::assertEquals($subscriber->method(...), $result); + self::assertEquals($subscriber->method(...), $this->accessor($subscriber)->teardownMethod()); } public function testNotTeardownMethod(): void @@ -207,15 +110,7 @@ public function testNotTeardownMethod(): void class { }; - $accessor = new MetadataSubscriberAccessor( - $subscriber, - (new AttributeSubscriberMetadataFactory())->metadata($subscriber::class), - [], - ); - - $result = $accessor->teardownMethod(); - - self::assertNull($result); + self::assertNull($this->accessor($subscriber)->teardownMethod()); } public function testRealSubscriber(): void @@ -224,12 +119,6 @@ public function testRealSubscriber(): void class { }; - $accessor = new MetadataSubscriberAccessor( - $subscriber, - (new AttributeSubscriberMetadataFactory())->metadata($subscriber::class), - [], - ); - - self::assertEquals($subscriber, $accessor->subscriber()); + self::assertEquals($subscriber, $this->accessor($subscriber)->subscriber()); } }