Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions docs/UPGRADE-4.0.md
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,40 @@ and a `bool $allowsNull` property. Instead it now has a single `Symfony\Componen

If you implemented a custom `ArgumentResolver`, adjust it to read the type from the new `Type` object.

### ArgumentResolver

The `resolve` method of `Patchlevel\EventSourcing\Subscription\Subscriber\ArgumentResolver\ArgumentResolver`
no longer receives the `Message` directly. It now receives an `ArgumentResolverContext` that bundles the
current `message`, `subscription` and `subscriber` metadata.

Before:

```php
final class CustomResolver implements ArgumentResolver
{
public function resolve(ArgumentMetadata $argument, Message $message): mixed
{
return $message->header(CustomHeader::class);
}

// ... support()
}
```

After:

```php
final class CustomResolver implements ArgumentResolver
{
public function resolve(ArgumentMetadata $argument, ArgumentResolverContext $context): mixed
{
return $context->message->header(CustomHeader::class);
}

// ... support()
}
```

## Store

### StreamStore
Expand Down
125 changes: 125 additions & 0 deletions docs/subscription.md
Original file line number Diff line number Diff line change
Expand Up @@ -302,10 +302,135 @@ final class DoStuffSubscriber
}
}
```
##### Event Emitter Resolver

A subscriber can not only build a projection or trigger a side effect, it can also emit new events
back into the store. This is useful when one subscription should kick off another one: for example a
projection that, after it has been updated, wants a notification subscription to send a mail.

If you type-hint an `EventEmitter` argument in a subscribe method, it is injected automatically.
By default the emitted events are written into a dedicated stream named `subscription_<subscription-id>`.

```php
use Patchlevel\EventSourcing\Attribute\Subscribe;
use Patchlevel\EventSourcing\Attribute\Subscriber;
use Patchlevel\EventSourcing\Subscription\RunMode;
use Patchlevel\EventSourcing\Subscription\Subscriber\EventEmitter\EventEmitter;

#[Subscriber('order_projection', RunMode::FromBeginning)]
final class OrderProjection
{
#[Subscribe(OrderPlaced::class)]
public function onOrderPlaced(OrderPlaced $event, EventEmitter $eventEmitter): void
{
// update the projection ...

$eventEmitter->emit([new NotificationRequired($event->orderId)]);
}
}
```

The `emit` method writes the events into the subscriber's own `subscription_<subscription-id>` stream.
If you want to target a different stream, use `linkTo`:

```php
$eventEmitter->linkTo('notifications', [new NotificationRequired($event->orderId)]);
```

:::info
The emitted events must be registered like any other event so the store can (de)serialize them.
:::

To configure the resolver, pass an `EventEmitterResolver` to the subscriber accessor repository,
the same way as the [lookup resolver](#lookup-resolver). It needs the event store to append to:

```php
use Patchlevel\EventSourcing\Subscription\Subscriber\ArgumentResolver\EventEmitterResolver;
use Patchlevel\EventSourcing\Subscription\Subscriber\MetadataSubscriberAccessorRepository;

$subscriberAccessorRepository = new MetadataSubscriberAccessorRepository(
$subscribers,
argumentResolvers: [
new EventEmitterResolver($store),
],
);
```

To also clean up the subscription stream when a subscription is removed, register the
`RemoveSubscriptionStreamListener` on the event dispatcher and pass it to the engine:

```php
use Patchlevel\EventSourcing\Subscription\Engine\DefaultSubscriptionEngine;
use Patchlevel\EventSourcing\Subscription\Engine\Event\OnSubscriptionRemoved;
use Patchlevel\EventSourcing\Subscription\Engine\Listener\RemoveSubscriptionStreamListener;
use Symfony\Component\EventDispatcher\EventDispatcher;

$eventDispatcher = new EventDispatcher();
$eventDispatcher->addListener(
OnSubscriptionRemoved::class,
new RemoveSubscriptionStreamListener($store),
);

$engine = new DefaultSubscriptionEngine(
$messageLoader,
$subscriptionStore,
$subscriberAccessorRepository,
eventDispatcher: $eventDispatcher,
);
```

###### When are events emitted

Emitting events while a subscription is *booting* would create duplicates on every replay, so by
default events are only emitted during `run` and the emitter is a noop during `boot`. You can change
this per subscriber with two attributes:

```php
use Patchlevel\EventSourcing\Attribute\EnableEventEmittingDuringBoot;
use Patchlevel\EventSourcing\Attribute\Subscriber;
use Patchlevel\EventSourcing\Subscription\RunMode;

#[Subscriber('order_projection', RunMode::FromBeginning)]
#[EnableEventEmittingDuringBoot]
final class OrderProjection
{
// ...
}
```

* `EnableEventEmittingDuringBoot` — events are also emitted during `boot`.
* `DisableEventEmitting` — events are never emitted, not even during `run`.

:::info
When a subscription is removed, its `subscription_<subscription-id>` stream is removed from the store as
well (as long as the `RemoveSubscriptionStreamListener` is registered).
:::

##### Custom Resolvers

You can provide your own argument resolvers by implementing the `ArgumentResolver` interface.
This can be useful for providing direct access to custom headers or other data.
The `resolve` method receives an `ArgumentResolverContext` that gives you access to the current
`message`, the `subscription` and the `subscriber` metadata.

```php
use Patchlevel\EventSourcing\Metadata\Subscriber\ArgumentMetadata;
use Patchlevel\EventSourcing\Subscription\Subscriber\ArgumentResolver\ArgumentResolver;
use Patchlevel\EventSourcing\Subscription\Subscriber\ArgumentResolver\ArgumentResolverContext;

final class CustomResolver implements ArgumentResolver
{
public function resolve(ArgumentMetadata $argument, ArgumentResolverContext $context): mixed
{
return $context->message->header(CustomHeader::class);
}

public function support(ArgumentMetadata $argument, string $eventClass): bool
{
return $argument->type->isIdentifiedBy(CustomHeader::class);
}
}
```

### Setup

Expand Down
2 changes: 1 addition & 1 deletion phpstan-baseline.neon
Original file line number Diff line number Diff line change
Expand Up @@ -589,7 +589,7 @@ parameters:
path: tests/Unit/Message/Translator/ReplaceEventTranslatorTest.php

-
message: '#^Method class@anonymous/tests/Unit/Metadata/Subscriber/AttributeSubscriberMetadataFactoryTest\.php\:234\:\:profileVisited\(\) has parameter \$message with no type specified\.$#'
message: '#^Method class@anonymous/tests/Unit/Metadata/Subscriber/AttributeSubscriberMetadataFactoryTest\.php\:266\:\:profileVisited\(\) has parameter \$message with no type specified\.$#'
identifier: missingType.parameter
count: 1
path: tests/Unit/Metadata/Subscriber/AttributeSubscriberMetadataFactoryTest.php
Expand Down
12 changes: 12 additions & 0 deletions src/Attribute/DisableEventEmitting.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
<?php

declare(strict_types=1);

namespace Patchlevel\EventSourcing\Attribute;

use Attribute;

#[Attribute(Attribute::TARGET_CLASS)]
final class DisableEventEmitting
{
}
12 changes: 12 additions & 0 deletions src/Attribute/EnableEventEmittingDuringBoot.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
<?php

declare(strict_types=1);

namespace Patchlevel\EventSourcing\Attribute;

use Attribute;

#[Attribute(Attribute::TARGET_CLASS)]
final class EnableEventEmittingDuringBoot
{
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
namespace Patchlevel\EventSourcing\Metadata\Subscriber;

use Patchlevel\EventSourcing\Attribute\Cleanup;
use Patchlevel\EventSourcing\Attribute\DisableEventEmitting;
use Patchlevel\EventSourcing\Attribute\EnableEventEmittingDuringBoot;
use Patchlevel\EventSourcing\Attribute\OnFailed;
use Patchlevel\EventSourcing\Attribute\RetryStrategy;
use Patchlevel\EventSourcing\Attribute\Setup;
Expand Down Expand Up @@ -156,6 +158,8 @@ public function metadata(string $subscriber): SubscriberMetadata
$failedMethod,
$this->retryStrategy($reflector),
$cleanupMethod,
$reflector->getAttributes(EnableEventEmittingDuringBoot::class) !== [],
$reflector->getAttributes(DisableEventEmitting::class) !== [],
);

$this->subscriberMetadata[$subscriber] = $metadata;
Expand Down
2 changes: 2 additions & 0 deletions src/Metadata/Subscriber/SubscriberMetadata.php
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ public function __construct(
public readonly string|null $failedMethod = null,
public readonly string|null $retryStrategy = null,
public readonly string|null $cleanupMethod = null,
public readonly bool $enableEventEmittingDuringBoot = false,
public readonly bool $disableEventEmitting = false,
) {
}
}
2 changes: 2 additions & 0 deletions src/Subscription/Engine/DefaultSubscriptionEngine.php
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ public function __construct(
$this->subscriptionManager,
$this->subscriberRepository,
$cleanupRunner,
$this->eventDispatcher,
$this->logger,
),
Run::class => new RunHandler(
Expand All @@ -133,6 +134,7 @@ public function __construct(
$this->subscriptionManager,
$this->subscriberRepository,
$cleanupRunner,
$this->eventDispatcher,
$this->logger,
),
];
Expand Down
15 changes: 15 additions & 0 deletions src/Subscription/Engine/Event/OnSubscriptionRemoved.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
<?php

declare(strict_types=1);

namespace Patchlevel\EventSourcing\Subscription\Engine\Event;

use Patchlevel\EventSourcing\Subscription\Subscription;

final class OnSubscriptionRemoved
{
public function __construct(
public readonly Subscription $subscription,
) {
}
}
21 changes: 17 additions & 4 deletions src/Subscription/Engine/Handler/RemoveHandler.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,15 @@
use Patchlevel\EventSourcing\Subscription\Engine\Command\Command;
use Patchlevel\EventSourcing\Subscription\Engine\Command\Remove;
use Patchlevel\EventSourcing\Subscription\Engine\Error;
use Patchlevel\EventSourcing\Subscription\Engine\Event\OnSubscriptionRemoved;
use Patchlevel\EventSourcing\Subscription\Engine\Result;
use Patchlevel\EventSourcing\Subscription\Engine\SubscriptionCollection;
use Patchlevel\EventSourcing\Subscription\Engine\SubscriptionManager;
use Patchlevel\EventSourcing\Subscription\Store\SubscriptionCriteria;
use Patchlevel\EventSourcing\Subscription\Subscriber\SubscriberAccessorRepository;
use Patchlevel\EventSourcing\Subscription\Subscription;
use Psr\Log\LoggerInterface;
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
use Throwable;

use function sprintf;
Expand All @@ -29,6 +32,7 @@ public function __construct(
private readonly SubscriptionManager $subscriptionManager,
private readonly SubscriberAccessorRepository $subscriberRepository,
private readonly CleanupRunner $cleanupRunner,
private readonly EventDispatcherInterface $eventDispatcher,
private readonly LoggerInterface|null $logger = null,
) {
}
Expand All @@ -46,7 +50,7 @@ function (SubscriptionCollection $subscriptions): Result {

foreach ($subscriptions as $subscription) {
if ($subscription->isNew()) {
$this->subscriptionManager->remove($subscription);
$this->remove($subscription);

$this->logger?->info(
sprintf(
Expand All @@ -65,13 +69,16 @@ function (SubscriptionCollection $subscriptions): Result {
$errors[] = $error;
}

// the cleanup runner removes the subscription (forced, even on error)
$this->eventDispatcher->dispatch(new OnSubscriptionRemoved($subscription));

continue;
}

$subscriber = $this->subscriberRepository->get($subscription->id());

if (!$subscriber) {
$this->subscriptionManager->remove($subscription);
$this->remove($subscription);

$this->logger?->info(
sprintf(
Expand All @@ -86,7 +93,7 @@ function (SubscriptionCollection $subscriptions): Result {
$teardownMethod = $subscriber->teardownMethod();

if (!$teardownMethod) {
$this->subscriptionManager->remove($subscription);
$this->remove($subscription);

$this->logger?->info(
sprintf('Subscription Engine: Subscription "%s" removed.', $subscription->id()),
Expand All @@ -113,7 +120,7 @@ function (SubscriptionCollection $subscriptions): Result {
);
}

$this->subscriptionManager->remove($subscription);
$this->remove($subscription);

$this->logger?->info(
sprintf('Subscription Engine: Subscription "%s" removed.', $subscription->id()),
Expand All @@ -124,4 +131,10 @@ function (SubscriptionCollection $subscriptions): Result {
},
);
}

private function remove(Subscription $subscription): void
{
$this->subscriptionManager->remove($subscription);
$this->eventDispatcher->dispatch(new OnSubscriptionRemoved($subscription));
}
}
Loading
Loading