Skip to content

Commit 3192f0e

Browse files
committed
clean up
1 parent dade621 commit 3192f0e

3 files changed

Lines changed: 18 additions & 134 deletions

File tree

event.go

Lines changed: 0 additions & 126 deletions
Original file line numberDiff line numberDiff line change
@@ -90,129 +90,3 @@ func Event(ctx context.Context, fn ManagerEventApplier, opts ...eo.Options) erro
9090
<-ctx.Done()
9191
return nil
9292
}
93-
94-
/*
95-
type ProcMgr struct {
96-
js nats.JetStreamContext
97-
nc *nats.Conn
98-
aggregate string
99-
subject string
100-
durable string
101-
handler ManagerEventApplier
102-
cfg *eo.Config
103-
}
104-
105-
// Event is a process manager that listens for events and depending on situaion emits command events.
106-
func Event(ctx context.Context, handler ManagerEventApplier, opts ...eo.Options) {
107-
cfg := &eo.Config{}
108-
109-
for _, opt := range opts {
110-
opt(cfg)
111-
}
112-
113-
if cfg.Aggregate == "" {
114-
panic("Aggregate name is required for command processor")
115-
}
116-
117-
subject := fmt.Sprintf(CommandsPrefix+".%s", cfg.Aggregate)
118-
if cfg.Subject != "" {
119-
subject = cfg.Subject
120-
}
121-
122-
nc, _ := Nats(ctx)
123-
js, _ := JetStream(ctx)
124-
cp := &ProcMgr{js: js, nc: nc, subject: subject, aggregate: cfg.Aggregate, durable: "default", handler: handler, cfg: cfg}
125-
c, cancel := context.WithCancel(ctx)
126-
go func() {
127-
for {
128-
err := cp.init(c, cancel)
129-
if err != nil {
130-
log.Printf("Error initializing command processor: %v", err)
131-
continue
132-
}
133-
}
134-
}()
135-
<-c.Done()
136-
}
137-
138-
func (cp *ProcMgr) init(ctx context.Context, cancel context.CancelFunc) error {
139-
140-
_, _ = cp.js.AddStream(&nats.StreamConfig{
141-
Name: commandsStream,
142-
Subjects: []string{CommandsPrefix + ".>"},
143-
Retention: nats.WorkQueuePolicy,
144-
Storage: nats.FileStorage,
145-
Replicas: 1,
146-
Duplicates: 5 * time.Minute,
147-
})
148-
149-
_, err := cp.js.AddConsumer(commandsStream, &nats.ConsumerConfig{
150-
Name: cp.aggregate + "_" + cp.durable + "_cmd",
151-
Durable: cp.aggregate + "_" + cp.durable + "_cmd",
152-
FilterSubject: cp.subject,
153-
AckPolicy: nats.AckExplicitPolicy,
154-
})
155-
if err != nil {
156-
return err
157-
}
158-
159-
sub, err := cp.js.PullSubscribe(cp.subject, cp.aggregate+"_"+cp.durable+"_cmd", nats.BindStream(commandsStream), nats.ManualAck(), nats.AckExplicit(), nats.DeliverAll())
160-
161-
if err != nil {
162-
log.Printf("Error subscribing to commands: %v", err)
163-
cancel()
164-
}
165-
defer sub.Unsubscribe()
166-
167-
for {
168-
if ctx.Err() != nil {
169-
cancel()
170-
return nil
171-
}
172-
msg, _ := sub.Fetch(1, nats.MaxWait(60*time.Second))
173-
if len(msg) == 0 {
174-
time.Sleep(50 * time.Millisecond)
175-
continue
176-
}
177-
for _, msg := range msg {
178-
var evt gen.EventEnvelope
179-
err := proto.Unmarshal(msg.Data, &evt)
180-
if err != nil {
181-
log.Printf("Error unmarshalling message: %v", err)
182-
msg.Ack()
183-
continue
184-
}
185-
186-
commands, err := cp.handler.Handle(&evt)
187-
if err != nil {
188-
msg.Ack()
189-
continue
190-
}
191-
192-
for _, command := range commands {
193-
if command.Aggregate == "" {
194-
command.Aggregate = evt.AggregateType
195-
}
196-
if command.AggregateId == "" {
197-
command.AggregateId = evt.AggregateId
198-
}
199-
eventSubject := fmt.Sprintf(CommandsPrefix+".%s.%s.%s", command.Aggregate, command.AggregateId, command.CommandType)
200-
if len(command.Parents) > 0 {
201-
var parents []string
202-
for _, parent := range command.Parents {
203-
parents = append(parents, fmt.Sprintf("%s.%s", parent.AggregateType, parent.AggregateId))
204-
}
205-
eventSubject = fmt.Sprintf("events.%s.%s.%s.%s", strings.Join(parents, "."), command.Aggregate, command.AggregateId, command.CommandType)
206-
}
207-
208-
b, _ := proto.Marshal(command)
209-
if _, err := cp.js.Publish(eventSubject, b); err != nil {
210-
log.Printf("Error publishing event %v", err)
211-
}
212-
}
213-
214-
_ = msg.Ack()
215-
}
216-
}
217-
}
218-
*/

projector.go

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package bee
33
import (
44
"context"
55
"fmt"
6+
"log"
67

78
"github.com/blinkinglight/bee/gen"
89
"github.com/blinkinglight/bee/po"
@@ -44,7 +45,7 @@ func Project(ctx context.Context, fn EventApplier, opts ...po.Options) error {
4445
}
4546

4647
if cfg.Aggregate == "" {
47-
panic("aggregate is required for projection")
48+
return fmt.Errorf("aggregate is required for projection")
4849
}
4950

5051
if cfg.DurableName == "" {
@@ -60,9 +61,12 @@ func Project(ctx context.Context, fn EventApplier, opts ...po.Options) error {
6061
prefix = cfg.Prefix + "_"
6162
}
6263

63-
js, _ := JetStream(ctx)
64+
js, ok := JetStream(ctx)
65+
if !ok {
66+
return fmt.Errorf("JetStream not available in context")
67+
}
6468

65-
js.AddStream(&nats.StreamConfig{
69+
_, _ = js.AddStream(&nats.StreamConfig{
6670
Name: "EVENTS",
6771
Subjects: []string{EventsPrefix + ".>"},
6872
Storage: nats.FileStorage,
@@ -77,19 +81,24 @@ func Project(ctx context.Context, fn EventApplier, opts ...po.Options) error {
7781
}
7882
m := &gen.EventEnvelope{}
7983
if err := proto.Unmarshal(msg.Data, m); err != nil {
84+
log.Printf("Error unmarshalling event: aggregate=%s, aggregateID=%s, eventType=%s, error=%v",
85+
cfg.Aggregate, cfg.AggregateID, msg.Subject, err)
8086
msg.Ack()
8187
return
8288
}
8389

8490
if err := fn.ApplyEvent(m); err != nil {
91+
log.Printf("Error applying event: aggregate=%s, aggregateID=%s, eventType=%s, error=%v",
92+
m.AggregateType, m.AggregateId, m.EventType, err)
8593
msg.Ack()
8694
return
8795
}
8896
msg.Ack()
8997
}, nats.DeliverAll(), nats.ManualAck(), nats.Durable("events_"+prefix+cfg.DurableName), nats.BindStream("EVENTS"), nats.ConsumerName(cfg.DurableName))
9098
if err != nil {
9199
fmt.Printf("Error subscribing to events: %v\n", err)
92-
return err
100+
return fmt.Errorf("projector: failed to subscribe to subject %s: %w", cfg.Subject, err)
101+
93102
}
94103
defer sub.Unsubscribe()
95104

replay.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ type ReplayHandler interface {
2727
// ro.WithStartSeq - start from event (if you have snapshot)
2828
// ro.WtihParent - nests subjects
2929
// ro.WithTimeout - timeout if no events for stream
30-
func Replay(ctx context.Context, fn ReplayHandler, opts ...ro.Options) {
30+
func Replay(ctx context.Context, fn ReplayHandler, opts ...ro.Options) error {
3131

3232
cfg := &ro.Config{
3333
StartSeq: DeliverAll,
@@ -65,17 +65,17 @@ func Replay(ctx context.Context, fn ReplayHandler, opts ...ro.Options) {
6565
}, opt, nats.ManualAck())
6666
if err != nil {
6767
cancel()
68-
return
68+
return fmt.Errorf("projector: failed to subscribe to subject %s: %w", subject, err)
6969
}
7070
num, _, err := sub.MaxPending()
7171
if err != nil {
7272
cancel()
73-
return
73+
return fmt.Errorf("projector: failed to get max pending for subject %s: %w", subject, err)
7474
}
7575
_ = num
7676
if num <= 0 {
7777
cancel()
78-
return
78+
return fmt.Errorf("projector: no events found for subject %s", subject)
7979
}
8080

8181
defer close(msgs)
@@ -109,6 +109,7 @@ func Replay(ctx context.Context, fn ReplayHandler, opts ...ro.Options) {
109109
}
110110
}()
111111
<-lctx.Done()
112+
return nil
112113
}
113114

114115
// ReplayAndSubscribe replays events for a given aggregate and aggregate ID,

0 commit comments

Comments
 (0)