Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
130 changes: 130 additions & 0 deletions docs/UPGRADE-4.0.md
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,136 @@ final class CustomResolver implements ArgumentResolver
}
```

### Custom ArgumentResolver registration

Custom argument resolvers are no longer passed to the `MetadataSubscriberAccessorRepository`.
They are now passed to the `DefaultSubscriptionEngine`, which forwards them to the message processor.

Before:

```php
$subscriberRepository = new MetadataSubscriberAccessorRepository(
[new MySubscriber()],
argumentResolvers: [new MyResolver()],
);

$engine = new DefaultSubscriptionEngine(
$messageLoader,
$subscriptionStore,
$subscriberRepository,
);
```

After:

```php
$subscriberRepository = new MetadataSubscriberAccessorRepository(
[new MySubscriber()],
);

$engine = new DefaultSubscriptionEngine(
$messageLoader,
$subscriptionStore,
$subscriberRepository,
argumentResolvers: [new MyResolver()],
);
```

### Batchable Subscriber

The `Patchlevel\EventSourcing\Subscription\Subscriber\BatchableSubscriber` interface has been removed.
Batching is now configured with attributes and the subscriber stays stateless: the data you collect
during a batch lives in a state object that the engine creates, keeps and hands back to your methods.

* `beginBatch()` becomes a `#[BatchBegin]` method that returns the state object.
* `commitBatch()` becomes a `#[BatchFlush]` method that receives the state object. The batch size is now
configured on the attribute (`#[BatchFlush(afterMessages: 1000)]`).
* `rollbackBatch()` becomes a `#[BatchRollback]` method that receives the state object (optional).
* `forceCommit()` becomes a `#[BatchShouldFlush]` method that receives the state object (optional).
* The handler receives the state object through a parameter marked with `#[BatchState]`.

Before:

```php
use Patchlevel\EventSourcing\Subscription\Subscriber\BatchableSubscriber;

#[Projector('profile_1')]
final class MigrationSubscriber implements BatchableSubscriber
{
/** @var array<string, string> */
private array $nameChanged = [];

#[Subscribe(NameChanged::class)]
public function handleNameChanged(NameChanged $event): void
{
$this->nameChanged[$event->userId] = $event->name;
}

public function beginBatch(): void
{
$this->nameChanged = [];
}

public function commitBatch(): void
{
// ... persist $this->nameChanged
$this->nameChanged = [];
}

public function rollbackBatch(): void
{
}

public function forceCommit(): bool
{
return count($this->nameChanged) > 1000;
}
}
```

After:

```php
use Patchlevel\EventSourcing\Attribute\BatchState;
use Patchlevel\EventSourcing\Attribute\BatchBegin;
use Patchlevel\EventSourcing\Attribute\BatchFlush;
use Patchlevel\EventSourcing\Attribute\BatchRollback;
use Patchlevel\EventSourcing\Attribute\BatchShouldFlush;

final class MigrationBatch
{
/** @var array<string, string> */
public array $nameChanged = [];
}

#[Projector('profile_1')]
final class MigrationSubscriber
{
#[BatchBegin]
public function beginBatch(): MigrationBatch
{
return new MigrationBatch();
}

#[Subscribe(NameChanged::class)]
public function handleNameChanged(NameChanged $event, #[BatchState] MigrationBatch $batch): void
{
$batch->nameChanged[$event->userId] = $event->name;
}

#[BatchFlush(afterMessages: 1000)]
public function flush(MigrationBatch $batch): void
{
// ... persist $batch->nameChanged
}

#[BatchRollback]
public function rollback(MigrationBatch $batch): void
{
}
}
```

## Store

### StreamStore
Expand Down
119 changes: 72 additions & 47 deletions docs/subscription.md
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,23 @@ final class PublicProfileProjection
More information can be found in the [reducer](message.md#reducer) documentation.
:::

The lookup resolver is not registered by default. Pass a `LookupResolver` to the subscription engine
via the `argumentResolvers` argument; it needs the event store to read the previous messages from:

```php
use Patchlevel\EventSourcing\Subscription\Engine\DefaultSubscriptionEngine;
use Patchlevel\EventSourcing\Subscription\Subscriber\ArgumentResolver\LookupResolver;

$engine = new DefaultSubscriptionEngine(
$messageLoader,
$subscriptionStore,
$subscriberAccessorRepository,
argumentResolvers: [
new LookupResolver($store),
],
);
```

##### Recorded On Resolver

The recorded on resolver resolves the recorded on date.
Expand Down Expand Up @@ -341,28 +358,16 @@ $eventEmitter->linkTo('notifications', [new NotificationRequired($event->orderId
The emitted events must be registered like any other event so the store can (de)serialize them.
:::

To configure the resolver, pass an `EventEmitterResolver` to the subscriber accessor repository,
the same way as the [lookup resolver](#lookup-resolver). It needs the event store to append to:

```php
use Patchlevel\EventSourcing\Subscription\Subscriber\ArgumentResolver\EventEmitterResolver;
use Patchlevel\EventSourcing\Subscription\Subscriber\MetadataSubscriberAccessorRepository;

$subscriberAccessorRepository = new MetadataSubscriberAccessorRepository(
$subscribers,
argumentResolvers: [
new EventEmitterResolver($store),
],
);
```

To also clean up the subscription stream when a subscription is removed, register the
`RemoveSubscriptionStreamListener` on the event dispatcher and pass it to the engine:
To configure the resolver, pass an `EventEmitterResolver` to the subscription engine via the
`argumentResolvers` argument; it needs the event store to append to. To also clean up the
subscription stream when a subscription is removed, register the `RemoveSubscriptionStreamListener`
on an event dispatcher and pass that to the engine as well:

```php
use Patchlevel\EventSourcing\Subscription\Engine\DefaultSubscriptionEngine;
use Patchlevel\EventSourcing\Subscription\Engine\Event\OnSubscriptionRemoved;
use Patchlevel\EventSourcing\Subscription\Engine\Listener\RemoveSubscriptionStreamListener;
use Patchlevel\EventSourcing\Subscription\Subscriber\ArgumentResolver\EventEmitterResolver;
use Symfony\Component\EventDispatcher\EventDispatcher;

$eventDispatcher = new EventDispatcher();
Expand All @@ -376,6 +381,9 @@ $engine = new DefaultSubscriptionEngine(
$subscriptionStore,
$subscriberAccessorRepository,
eventDispatcher: $eventDispatcher,
argumentResolvers: [
new EventEmitterResolver($store),
],
);
```

Expand Down Expand Up @@ -752,90 +760,107 @@ For more information, see the [retry strategy](#retry-strategy) documentation.

You can also optimize the performance of your subscribers by processing a number of events in a batch.
This is particularly useful when projections need to be rebuilt.
To achieve this, you can implement the `BatchableSubscriber` interface.
To achieve this, you mark the relevant methods with the batch attributes.
The subscriber itself stays stateless: the `#[BatchBegin]` method returns a state object that the
engine keeps for you and hands back to the handler (via a `#[BatchState]` parameter) and to the
flush and rollback methods.

```php
use Doctrine\DBAL\Connection;
use Patchlevel\EventSourcing\Attribute\BatchState;
use Patchlevel\EventSourcing\Attribute\BatchBegin;
use Patchlevel\EventSourcing\Attribute\BatchFlush;
use Patchlevel\EventSourcing\Attribute\Projector;
use Patchlevel\EventSourcing\Subscription\Subscriber\BatchableSubscriber;
use Patchlevel\EventSourcing\Attribute\BatchRollback;
use Patchlevel\EventSourcing\Attribute\BatchShouldFlush;
use Patchlevel\EventSourcing\Attribute\Subscribe;

final class MigrationBatch
{
/** @var array<string, string> */
public array $nameChanged = [];
}

#[Projector('profile_1')]
final class MigrationSubscriber implements BatchableSubscriber
final class MigrationSubscriber
{
public function __construct(
private readonly Connection $connection,
) {
}

/** @var array<string, int> */
private array $nameChanged = [];

#[Subscribe(NameChanged::class)]
public function handleNameChanged(NameChanged $event): void
#[BatchBegin]
public function beginBatch(): MigrationBatch
{
$this->nameChanged[$event->userId] = $event->name;
$this->connection->beginTransaction();

return new MigrationBatch();
}

public function beginBatch(): void
#[Subscribe(NameChanged::class)]
public function handleNameChanged(NameChanged $event, #[BatchState] MigrationBatch $batch): void
{
$this->nameChanged = [];
$this->connection->beginTransaction();
$batch->nameChanged[$event->userId] = $event->name;
}

public function commitBatch(): void
#[BatchFlush(afterMessages: 1000)]
public function flush(MigrationBatch $batch): void
{
foreach ($this->nameChanged as $userId => $name) {
foreach ($batch->nameChanged as $userId => $name) {
$this->connection->executeStatement(
'UPDATE user SET name = :name WHERE id = :id',
['name' => $name, 'id' => $userId],
);
}

$this->connection->commit();
$this->nameChanged = [];
}

public function rollbackBatch(): void
#[BatchShouldFlush]
public function shouldFlush(MigrationBatch $batch): bool
{
$this->connection->rollBack();
return count($batch->nameChanged) > 1000;
}

public function forceCommit(): bool
#[BatchRollback]
public function rollback(MigrationBatch $batch): void
{
return count($this->nameChanged) > 1000;
$this->connection->rollBack();
}
}
```
This interface provides you with all the options you need to process your data collectively.

The `beginBatch` method is called as soon as a subscriber wants to process an event.
The `#[BatchBegin]` method is optional and called as soon as a subscriber wants to process an event.
If no suitable event is found in the stream, batching will not start, and this method will not be called.
Here, you can make all necessary preparations, such as opening a transaction or preparing variables.
Here, you can make all necessary preparations, such as opening a transaction, and optionally return the
state object that holds the data you collect during the batch.
If you do not define a `#[BatchBegin]` method, or it returns nothing, an empty `stdClass` object is used as the state.

The `commitBatch` method is called when batching was previously started, and one of the following conditions is met:
The `#[BatchFlush]` method is called when batching was previously started, and one of the following conditions is met:
Either the Subscription Engine reaches its limit, or the stream is finished.
Alternatively, if the subscriber explicitly indicates using the `forceCommit` method that they want to process the data now.
You can also pass `afterMessages` to the attribute (`#[BatchFlush(afterMessages: 1000)]`) to flush automatically
after that many messages, or implement a `#[BatchShouldFlush]` method for custom logic.
At this step, you must process all the data.

The `rollbackBatch` method is called when an error occurs and the batching needs to be aborted.
The `#[BatchRollback]` method is optional and called when an error occurs and the batching needs to be aborted.
Here, you can respond to the error and potentially perform a database rollback.

The method `forceCommit` is called after each handled event,
and you can decide whether the batch commit process should start now.
The `#[BatchShouldFlush]` method is optional and called after each handled event,
and you can decide whether the flush process should start now.
This helps to determine the batch size and thus avoid memory overflow.

:::danger
Make sure to fully process the data in `commitBatch` and close any open transactions.
Make sure to fully process the data in the `#[BatchFlush]` method and close any open transactions.
Otherwise, it may lead to inconsistent data.
:::

:::note
The position of the subscriber is only updated after a successful commit.
The position of the subscriber is only updated after a successful flush.
In case of an error, the position remains at the state before the batch started.
:::

:::tip
Use `forceCommit` to prevent memory leaks.
Use `#[BatchFlush(afterMessages: ...)]` or a `#[BatchShouldFlush]` method to prevent memory leaks.
This allows you to decide when it's suitable to process the data and then release the memory.
:::

Expand Down
2 changes: 1 addition & 1 deletion phpstan-baseline.neon
Original file line number Diff line number Diff line change
Expand Up @@ -589,7 +589,7 @@ parameters:
path: tests/Unit/Message/Translator/ReplaceEventTranslatorTest.php

-
message: '#^Method class@anonymous/tests/Unit/Metadata/Subscriber/AttributeSubscriberMetadataFactoryTest\.php\:266\:\:profileVisited\(\) has parameter \$message with no type specified\.$#'
message: '#^Method class@anonymous/tests/Unit/Metadata/Subscriber/AttributeSubscriberMetadataFactoryTest\.php\:278\:\:profileVisited\(\) has parameter \$message with no type specified\.$#'
identifier: missingType.parameter
count: 1
path: tests/Unit/Metadata/Subscriber/AttributeSubscriberMetadataFactoryTest.php
Expand Down
12 changes: 12 additions & 0 deletions src/Attribute/BatchBegin.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
<?php

declare(strict_types=1);

namespace Patchlevel\EventSourcing\Attribute;

use Attribute;

#[Attribute(Attribute::TARGET_METHOD)]
final class BatchBegin
{
}
16 changes: 16 additions & 0 deletions src/Attribute/BatchFlush.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
<?php

declare(strict_types=1);

namespace Patchlevel\EventSourcing\Attribute;

use Attribute;

#[Attribute(Attribute::TARGET_METHOD)]
final class BatchFlush
{
public function __construct(
public readonly int|null $afterMessages = null,
) {
}
}
12 changes: 12 additions & 0 deletions src/Attribute/BatchRollback.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
<?php

declare(strict_types=1);

namespace Patchlevel\EventSourcing\Attribute;

use Attribute;

#[Attribute(Attribute::TARGET_METHOD)]
final class BatchRollback
{
}
Loading
Loading