Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 5 additions & 16 deletions compare.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
package testkit

import (
"reflect"

"github.com/dogmatiq/dogma"
"google.golang.org/protobuf/proto"
"github.com/dogmatiq/testkit/internal/compare"
)

// A MessageComparator is a function that returns true if two messages are
Expand All @@ -16,9 +14,11 @@ type MessageComparator func(a, b dogma.Message) bool
// It is the default implementation of the MessageComparator type.
//
// It supports comparison of protocol buffers messages using the proto.Equal()
// function. All other types are compared using reflect.DeepEqual().
// function. All other types are compared using semantics equivalent to
// reflect.DeepEqual(), except that function values are compared by their
// definition site rather than by pointer identity.
func DefaultMessageComparator(a, b dogma.Message) bool {
return equal(a, b)
return compare.Equal(a, b)
}

// WithMessageComparator returns a test option that sets the comparator
Expand All @@ -32,14 +32,3 @@ func WithMessageComparator(c MessageComparator) TestOption {
t.predicateOptions.MessageComparator = c
})
}

// equal returns true if a and b are considered equal.
func equal(a, b any) bool {
if pa, ok := a.(proto.Message); ok {
if pb, ok := b.(proto.Message); ok {
return proto.Equal(pa, pb)
}
}

return reflect.DeepEqual(a, b)
}
129 changes: 82 additions & 47 deletions engine/internal/aggregate/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/dogmatiq/testkit/engine/internal/panicx"
"github.com/dogmatiq/testkit/envelope"
"github.com/dogmatiq/testkit/fact"
"github.com/dogmatiq/testkit/internal/compare"
"github.com/dogmatiq/testkit/internal/x/xreflect"
"github.com/dogmatiq/testkit/location"
)
Expand Down Expand Up @@ -73,7 +74,7 @@ func (c *Controller) Handle(
}

id := c.route(env, mt)
inst, root := c.instanceByID(obs, env, id)
inst, root, shadowRoot := c.instanceByID(obs, env, id)

s := &scope{
instanceID: id,
Expand All @@ -82,11 +83,24 @@ func (c *Controller) Handle(
observer: obs,
now: now,
root: root,
shadowRoot: shadowRoot,
command: env,
streamID: uuidpb.Derive(c.Config.Identity().GetKey(), id).AsString(),
offset: uint64(len(inst.history)),
}

if inst.snapshotted && !compare.Equal(root, shadowRoot) {
panic(panicx.UnexpectedBehavior{
Handler: c.Config,
Interface: "AggregateRoot",
Method: "UnmarshalBinary",
Implementation: root,
Message: env.Message,
Description: "aggregate root state differs when built from events versus snapshot",
Location: location.OfMethod(root, "UnmarshalBinary"),
})
}

panicx.EnrichUnexpectedMessage(
c.Config,
"AggregateMessageHandler",
Expand All @@ -102,12 +116,10 @@ func (c *Controller) Handle(
},
)

s.guardAgainstDirectMutation("", location.Location{})

if len(s.events) != 0 {
if c.instances == nil {
c.instances = map[string]*instance{}
}
inst.history = append(inst.history, s.events...)
c.instances[id] = inst
c.takeSnapshot(root, inst, env)
}

Expand Down Expand Up @@ -150,15 +162,18 @@ func (c *Controller) route(env *envelope.Envelope, mt message.Type) string {
return id
}

// instanceByID returns the instance and root for the given instance ID.
// instanceByID returns the instance, root, and shadow root for the given
// instance ID. The shadow root is built by replaying the full event history
// from New(), ignoring any snapshot.
func (c *Controller) instanceByID(
obs fact.Observer,
env *envelope.Envelope,
id string,
) (*instance, dogma.AggregateRoot) {
root := c.Config.Source.Get().New()
) (inst *instance, root, shadowRoot dogma.AggregateRoot) {
root = c.Config.Source.Get().New()
shadowRoot = c.Config.Source.Get().New()

if xreflect.IsNil(root) {
if xreflect.IsNil(root) || xreflect.IsNil(shadowRoot) {
panic(panicx.UnexpectedBehavior{
Handler: c.Config,
Interface: "AggregateMessageHandler",
Expand All @@ -170,53 +185,73 @@ func (c *Controller) instanceByID(
})
}

if inst, ok := c.instances[id]; ok {
if inst.snapshotted {
if err := root.UnmarshalBinary(inst.snapshot); err != nil {
panic(panicx.UnexpectedBehavior{
Handler: c.Config,
Interface: "AggregateRoot",
Method: "UnmarshalBinary",
Implementation: root,
Message: env.Message,
Description: fmt.Sprintf("unable to unmarshal the aggregate root: %s", err),
Location: location.OfMethod(root, "UnmarshalBinary"),
})
}
}

for _, ev := range inst.history[inst.snapshotOffset:] {
panicx.EnrichUnexpectedMessage(
c.Config,
"AggregateRoot",
"ApplyEvent",
root,
ev.Message,
func() {
root.ApplyEvent(
ev.Message.(dogma.Event),
)
},
)
}

obs.Notify(fact.AggregateInstanceLoaded{
inst, ok := c.instances[id]
if !ok {
obs.Notify(fact.AggregateInstanceNotFound{
Handler: c.Config,
InstanceID: id,
Root: root,
Envelope: env,
})

return inst, root
if c.instances == nil {
c.instances = map[string]*instance{}
}

inst = &instance{}
c.instances[id] = inst
Comment on lines +196 to +201

return inst, root, shadowRoot
}

for _, ev := range inst.history {
panicx.EnrichUnexpectedMessage(
c.Config,
"AggregateRoot",
"ApplyEvent",
shadowRoot,
ev.Message,
func() {
shadowRoot.ApplyEvent(ev.Message.(dogma.Event))
},
)
}

if inst.snapshotted {
if err := root.UnmarshalBinary(inst.snapshot); err != nil {
panic(panicx.UnexpectedBehavior{
Handler: c.Config,
Interface: "AggregateRoot",
Method: "UnmarshalBinary",
Implementation: root,
Message: env.Message,
Description: fmt.Sprintf("unable to unmarshal the aggregate root: %s", err),
Location: location.OfMethod(root, "UnmarshalBinary"),
})
}
}

for _, ev := range inst.history[inst.snapshotOffset:] {
panicx.EnrichUnexpectedMessage(
c.Config,
"AggregateRoot",
"ApplyEvent",
root,
ev.Message,
func() {
root.ApplyEvent(ev.Message.(dogma.Event))
},
)
}

obs.Notify(fact.AggregateInstanceNotFound{
Handler: c.Config,
InstanceID: id,
Envelope: env,
obs.Notify(fact.AggregateInstanceLoaded{
Handler: c.Config,
InstanceID: id,
Root: root,
Envelope: env,
SnapshotOffset: inst.snapshotOffset,
})

return &instance{}, root
return inst, root, shadowRoot
}

// takeSnapshot attempts to store a snapshot of the aggregate root.
Expand Down
Loading