Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
155 changes: 152 additions & 3 deletions abxbus-go/event_bus.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,120 @@ func (b *EventBus) notifyBusHandlersChange(handler *EventHandler, registered boo
}
}

func (b *EventBus) On(event_pattern string, handler_name string, handler any, options *EventHandler) *EventHandler {
func (b *EventBus) On(args ...any) *EventHandler {
if err := b.rejectIfDestroyed("On"); err != nil {
panic(err)
}
event_name, handler_name, handler, options, err := parseOnArgs(args)
if err != nil {
panic(err)
}
if event_name == "" || event_name == "*" {
panic(`EventBus.On registers handlers for one concrete event name; use EventBus.OnEventName for wildcard event-name handlers`)
}
normalizedHandler, err := normalizeEventHandlerCallable(handler)
if err != nil {
panic(err)
}
return b.registerHandler(event_name, handler_name, normalizedHandler, options)
}

func parseOnArgs(args []any) (string, string, any, *EventHandler, error) {
if len(args) == 0 || len(args) > 4 {
return "", "", nil, nil, fmt.Errorf("EventBus.On expects handler, event name + handler, or event name + handler name + handler")
}
handlerName := "handler"
var eventName string
var handler any
var options *EventHandler
if first, ok := args[0].(string); ok {
eventName = first
if len(args) >= 2 {
if second, ok := args[1].(string); ok {
handlerName = second
if len(args) < 3 {
return "", "", nil, nil, fmt.Errorf("EventBus.On missing handler callback")
}
handler = args[2]
if len(args) == 4 {
parsedOptions, err := parseEventHandlerOptions(args[3])
if err != nil {
return "", "", nil, nil, err
}
options = parsedOptions
}
} else {
if len(args) > 3 {
return "", "", nil, nil, fmt.Errorf("EventBus.On exact-name form expects event name, handler, and optional *EventHandler")
}
handler = args[1]
if len(args) == 3 {
parsedOptions, err := parseEventHandlerOptions(args[2])
if err != nil {
return "", "", nil, nil, err
}
options = parsedOptions
}
}
}
} else {
if len(args) > 2 {
return "", "", nil, nil, fmt.Errorf("EventBus.On inferred form expects handler and optional *EventHandler")
}
handler = args[0]
inferred, err := inferEventTypeFromHandler(handler)
if err != nil {
return "", "", nil, nil, err
}
eventName = inferred
if len(args) >= 2 {
parsedOptions, err := parseEventHandlerOptions(args[1])
if err != nil {
return "", "", nil, nil, err
}
options = parsedOptions
}
}
if handler == nil {
return "", "", nil, nil, fmt.Errorf("EventBus.On missing handler callback")
}
return eventName, handlerName, handler, options, nil
}

func parseEventHandlerOptions(value any) (*EventHandler, error) {
if value == nil {
return nil, nil
}
options, ok := value.(*EventHandler)
if !ok {
return nil, fmt.Errorf("EventBus.On options must be *EventHandler, got %T", value)
}
return options, nil
}

func inferEventTypeFromHandler(handler any) (string, error) {
value := reflect.ValueOf(handler)
if !value.IsValid() || value.Kind() != reflect.Func || value.IsNil() {
return "", unsupportedHandlerSignatureError(handler)
}
handlerType := value.Type()
if handlerType.NumIn() == 0 {
return "", unsupportedHandlerSignatureError(handler)
}
payloadType := handlerType.In(0)
if payloadType == baseEventPointerType {
return "", fmt.Errorf("EventBus.On cannot infer an event type from *BaseEvent handlers; pass an exact event name or use a typed payload handler")
}
for payloadType.Kind() == reflect.Pointer {
payloadType = payloadType.Elem()
}
if payloadType.Kind() != reflect.Struct || payloadType.Name() == "" {
return "", fmt.Errorf("EventBus.On cannot infer an event type from handler payload %s; pass an exact event name", payloadType)
}
return payloadType.Name(), nil
}

func (b *EventBus) OnEventName(event_pattern string, handler_name string, handler any, options *EventHandler) *EventHandler {
if err := b.rejectIfDestroyed("On"); err != nil {
panic(err)
}
Expand All @@ -309,6 +422,10 @@ func (b *EventBus) On(event_pattern string, handler_name string, handler any, op
if err != nil {
panic(err)
}
return b.registerHandler(event_pattern, handler_name, normalizedHandler, options)
}

func (b *EventBus) registerHandler(event_pattern string, handler_name string, normalizedHandler EventHandlerCallable, options *EventHandler) *EventHandler {
h := NewEventHandler(b.Name, b.ID, event_pattern, handler_name, normalizedHandler)
explicitID := false
if options != nil {
Expand Down Expand Up @@ -1704,7 +1821,23 @@ func (b *EventBus) eventMatchesEquals(event *BaseEvent, equals map[string]any) b
return eventMatchesEquals(event, equals)
}

func (b *EventBus) Find(event_pattern string, where func(event *BaseEvent) bool, options *FindOptions) (*BaseEvent, error) {
func (b *EventBus) Find(input any, where any, options *FindOptions) (*BaseEvent, error) {
event, err := baseEventFromAny(input)
if err != nil {
return nil, err
}
matches, err := normalizeTypedFindPredicate(where)
if err != nil {
return nil, err
}
return b.findEventName(event.EventType, matches, options)
}

func (b *EventBus) FindEventName(event_pattern string, where func(event *BaseEvent) bool, options *FindOptions) (*BaseEvent, error) {
return b.findEventName(event_pattern, where, options)
}

func (b *EventBus) findEventName(event_pattern string, where func(event *BaseEvent) bool, options *FindOptions) (*BaseEvent, error) {
if err := b.rejectIfDestroyed("Find"); err != nil {
return nil, err
}
Expand Down Expand Up @@ -1780,7 +1913,23 @@ func (b *EventBus) Find(event_pattern string, where func(event *BaseEvent) bool,
}
}

func (b *EventBus) Filter(event_pattern string, where func(event *BaseEvent) bool, options *FilterOptions) ([]*BaseEvent, error) {
func (b *EventBus) Filter(input any, where any, options *FilterOptions) ([]*BaseEvent, error) {
event, err := baseEventFromAny(input)
if err != nil {
return nil, err
}
matches, err := normalizeTypedFindPredicate(where)
if err != nil {
return nil, err
}
return b.filterEventName(event.EventType, matches, options)
}

func (b *EventBus) FilterEventName(event_pattern string, where func(event *BaseEvent) bool, options *FilterOptions) ([]*BaseEvent, error) {
return b.filterEventName(event_pattern, where, options)
}

func (b *EventBus) filterEventName(event_pattern string, where func(event *BaseEvent) bool, options *FilterOptions) ([]*BaseEvent, error) {
if err := b.rejectIfDestroyed("Filter"); err != nil {
return nil, err
}
Expand Down
148 changes: 139 additions & 9 deletions abxbus-go/event_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"encoding/json"
"fmt"
"reflect"

"github.com/ArchiveBox/abxbus/abxbus-go/v2/jsonschema"
)

type EventHandlerCallable func(event *BaseEvent, ctx context.Context) (any, error)
Expand Down Expand Up @@ -132,27 +134,30 @@ func normalizeEventHandlerCallable(handler any) (EventHandlerCallable, error) {
}, nil
default:
if !value.IsValid() || value.Kind() != reflect.Func || value.IsNil() {
return nil, fmt.Errorf("handler must be one of: func(*BaseEvent), func(*BaseEvent) error, func(*BaseEvent) (any, error), or the same forms with context.Context as the second argument; got %T", handler)
return nil, unsupportedHandlerSignatureError(handler)
}
handlerType := value.Type()
if handlerType.NumIn() != 1 && handlerType.NumIn() != 2 {
return nil, fmt.Errorf("handler must be one of: func(*BaseEvent), func(*BaseEvent) error, func(*BaseEvent) (any, error), or the same forms with context.Context as the second argument; got %T", handler)
}
if handlerType.In(0) != baseEventPointerType {
return nil, fmt.Errorf("handler must be one of: func(*BaseEvent), func(*BaseEvent) error, func(*BaseEvent) (any, error), or the same forms with context.Context as the second argument; got %T", handler)
return nil, unsupportedHandlerSignatureError(handler)
}
withContext := handlerType.NumIn() == 2
if withContext && handlerType.In(1) != contextInterfaceType {
return nil, fmt.Errorf("handler must be one of: func(*BaseEvent), func(*BaseEvent) error, func(*BaseEvent) (any, error), or the same forms with context.Context as the second argument; got %T", handler)
return nil, unsupportedHandlerSignatureError(handler)
}
if handlerType.NumOut() > 2 {
return nil, fmt.Errorf("handler must be one of: func(*BaseEvent), func(*BaseEvent) error, func(*BaseEvent) (any, error), or the same forms with context.Context as the second argument; got %T", handler)
return nil, unsupportedHandlerSignatureError(handler)
}
if handlerType.NumOut() == 1 && !handlerType.Out(0).Implements(errorInterfaceType) {
return nil, fmt.Errorf("handler must be one of: func(*BaseEvent), func(*BaseEvent) error, func(*BaseEvent) (any, error), or the same forms with context.Context as the second argument; got %T", handler)
if handlerType.In(0) == baseEventPointerType {
return nil, unsupportedHandlerSignatureError(handler)
}
return normalizeTypedEventHandlerReflectCallable(value, handlerType), nil
}
if handlerType.NumOut() == 2 && !handlerType.Out(1).Implements(errorInterfaceType) {
return nil, fmt.Errorf("handler must be one of: func(*BaseEvent), func(*BaseEvent) error, func(*BaseEvent) (any, error), or the same forms with context.Context as the second argument; got %T", handler)
return nil, unsupportedHandlerSignatureError(handler)
}
if handlerType.In(0) != baseEventPointerType {
return normalizeTypedEventHandlerReflectCallable(value, handlerType), nil
}
return func(event *BaseEvent, ctx context.Context) (any, error) {
args := []reflect.Value{reflect.ValueOf(event)}
Expand All @@ -178,3 +183,128 @@ func normalizeEventHandlerCallable(handler any) (EventHandlerCallable, error) {
}, nil
}
}

func normalizeTypedEventHandlerCallable(handler any) (EventHandlerCallable, error) {
value := reflect.ValueOf(handler)
if value.IsValid() && value.Kind() == reflect.Func && value.IsNil() {
return nil, nil
}
if !value.IsValid() || value.Kind() != reflect.Func || value.IsNil() {
return nil, unsupportedHandlerSignatureError(handler)
}
handlerType := value.Type()
if handlerType.NumIn() != 1 && handlerType.NumIn() != 2 {
return nil, unsupportedHandlerSignatureError(handler)
}
if handlerType.In(0) == baseEventPointerType {
return nil, unsupportedHandlerSignatureError(handler)
}
if handlerType.NumIn() == 2 && handlerType.In(1) != contextInterfaceType {
return nil, unsupportedHandlerSignatureError(handler)
}
if handlerType.NumOut() > 2 {
return nil, unsupportedHandlerSignatureError(handler)
}
if handlerType.NumOut() == 2 && !handlerType.Out(1).Implements(errorInterfaceType) {
return nil, unsupportedHandlerSignatureError(handler)
}
return normalizeTypedEventHandlerReflectCallable(value, handlerType), nil
}

func unsupportedHandlerSignatureError(handler any) error {
return fmt.Errorf("handler must be one of: func(*BaseEvent), func(*BaseEvent) error, func(*BaseEvent) (any, error), a typed payload handler func(TPayload), func(TPayload) error, func(TPayload) TResult, func(TPayload) (TResult, error), or the same forms with context.Context as the second argument; got %T", handler)
}

func normalizeTypedEventHandlerReflectCallable(value reflect.Value, handlerType reflect.Type) EventHandlerCallable {
payloadType := handlerType.In(0)
payloadSchema := jsonSchemaForType(payloadType)
withContext := handlerType.NumIn() == 2
return func(event *BaseEvent, ctx context.Context) (any, error) {
if err := jsonschema.Validate(payloadSchema, event.Payload); err != nil {
return nil, fmt.Errorf("EventHandlerPayloadSchemaError: Event payload did not match declared handler payload type: %w", err)
}
payload, err := eventPayloadAsReflectValue(event, payloadType)
if err != nil {
return nil, err
}
args := []reflect.Value{payload}
if withContext {
args = append(args, reflect.ValueOf(ctx))
}
results := value.Call(args)
switch len(results) {
case 0:
return nil, nil
case 1:
if results[0].Type().Implements(errorInterfaceType) {
if reflectValueIsNil(results[0]) {
return nil, nil
}
return nil, results[0].Interface().(error)
}
return reflectResultInterface(results[0]), nil
default:
var err error
if !reflectValueIsNil(results[1]) {
err = results[1].Interface().(error)
}
return reflectResultInterface(results[0]), err
}
}
}

func eventPayloadAsReflectValue(event *BaseEvent, payloadType reflect.Type) (reflect.Value, error) {
if event == nil {
return reflect.Value{}, fmt.Errorf("event is nil")
}
data, err := json.Marshal(event.Payload)
if err != nil {
return reflect.Value{}, err
}
payloadPtr := reflect.New(payloadType)
if err := json.Unmarshal(data, payloadPtr.Interface()); err != nil {
return reflect.Value{}, err
}
return payloadPtr.Elem(), nil
}

func reflectResultInterface(value reflect.Value) any {
if reflectValueIsNil(value) {
return nil
}
return value.Interface()
}

func normalizeTypedFindPredicate(where any) (func(*BaseEvent) bool, error) {
if where == nil {
return nil, nil
}
if typed, ok := where.(func(*BaseEvent) bool); ok {
return typed, nil
}
value := reflect.ValueOf(where)
if !value.IsValid() || value.Kind() != reflect.Func || value.IsNil() {
return nil, fmt.Errorf("where must be func(*BaseEvent) bool or func(TPayload) bool; got %T", where)
}
predicateType := value.Type()
if predicateType.NumIn() != 1 || predicateType.NumOut() != 1 || predicateType.Out(0).Kind() != reflect.Bool {
return nil, fmt.Errorf("where must be func(*BaseEvent) bool or func(TPayload) bool; got %T", where)
}
if predicateType.In(0) == baseEventPointerType {
return func(event *BaseEvent) bool {
return value.Call([]reflect.Value{reflect.ValueOf(event)})[0].Bool()
}, nil
}
payloadType := predicateType.In(0)
payloadSchema := jsonSchemaForType(payloadType)
return func(event *BaseEvent) bool {
if err := jsonschema.Validate(payloadSchema, event.Payload); err != nil {
return false
}
payload, err := eventPayloadAsReflectValue(event, payloadType)
if err != nil {
return false
}
return value.Call([]reflect.Value{payload})[0].Bool()
}, nil
}
9 changes: 7 additions & 2 deletions abxbus-go/jsonl_bridge.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,14 @@ func NewJSONLEventBridge(path string, pollIntervalSeconds float64, name string)
}
}

func (b *JSONLEventBridge) On(eventPattern string, handlerName string, handler any, options *EventHandler) *EventHandler {
func (b *JSONLEventBridge) OnEventName(eventPattern string, handlerName string, handler any, options *EventHandler) *EventHandler {
_ = b.Start()
return b.inboundBus.On(eventPattern, handlerName, handler, options)
return b.inboundBus.OnEventName(eventPattern, handlerName, handler, options)
}

func (b *JSONLEventBridge) On(eventName string, handlerName string, handler any, options *EventHandler) *EventHandler {
_ = b.Start()
return b.inboundBus.On(eventName, handlerName, handler, options)
}

func (b *JSONLEventBridge) Emit(event *BaseEvent) (*BaseEvent, error) {
Expand Down
Loading