diff --git a/abxbus-go/event_bus.go b/abxbus-go/event_bus.go index c2045c6b..87059aef 100644 --- a/abxbus-go/event_bus.go +++ b/abxbus-go/event_bus.go @@ -298,7 +298,120 @@ func (b *EventBus) notifyBusHandlersChange(handler *EventHandler, registered boo } } -func (b *EventBus) On(event_pattern string, handler_name string, handler any, options *EventHandler) *EventHandler { +func (b *EventBus) On(args ...any) *EventHandler { + if err := b.rejectIfDestroyed("On"); err != nil { + panic(err) + } + event_name, handler_name, handler, options, err := parseOnArgs(args) + if err != nil { + panic(err) + } + if event_name == "" || event_name == "*" { + panic(`EventBus.On registers handlers for one concrete event name; use EventBus.OnEventName for wildcard event-name handlers`) + } + normalizedHandler, err := normalizeEventHandlerCallable(handler) + if err != nil { + panic(err) + } + return b.registerHandler(event_name, handler_name, normalizedHandler, options) +} + +func parseOnArgs(args []any) (string, string, any, *EventHandler, error) { + if len(args) == 0 || len(args) > 4 { + return "", "", nil, nil, fmt.Errorf("EventBus.On expects handler, event name + handler, or event name + handler name + handler") + } + handlerName := "handler" + var eventName string + var handler any + var options *EventHandler + if first, ok := args[0].(string); ok { + eventName = first + if len(args) >= 2 { + if second, ok := args[1].(string); ok { + handlerName = second + if len(args) < 3 { + return "", "", nil, nil, fmt.Errorf("EventBus.On missing handler callback") + } + handler = args[2] + if len(args) == 4 { + parsedOptions, err := parseEventHandlerOptions(args[3]) + if err != nil { + return "", "", nil, nil, err + } + options = parsedOptions + } + } else { + if len(args) > 3 { + return "", "", nil, nil, fmt.Errorf("EventBus.On exact-name form expects event name, handler, and optional *EventHandler") + } + handler = args[1] + if len(args) == 3 { + parsedOptions, err := parseEventHandlerOptions(args[2]) + if err != nil { + return "", "", nil, nil, err + } + options = parsedOptions + } + } + } + } else { + if len(args) > 2 { + return "", "", nil, nil, fmt.Errorf("EventBus.On inferred form expects handler and optional *EventHandler") + } + handler = args[0] + inferred, err := inferEventTypeFromHandler(handler) + if err != nil { + return "", "", nil, nil, err + } + eventName = inferred + if len(args) >= 2 { + parsedOptions, err := parseEventHandlerOptions(args[1]) + if err != nil { + return "", "", nil, nil, err + } + options = parsedOptions + } + } + if handler == nil { + return "", "", nil, nil, fmt.Errorf("EventBus.On missing handler callback") + } + return eventName, handlerName, handler, options, nil +} + +func parseEventHandlerOptions(value any) (*EventHandler, error) { + if value == nil { + return nil, nil + } + options, ok := value.(*EventHandler) + if !ok { + return nil, fmt.Errorf("EventBus.On options must be *EventHandler, got %T", value) + } + return options, nil +} + +func inferEventTypeFromHandler(handler any) (string, error) { + value := reflect.ValueOf(handler) + if !value.IsValid() || value.Kind() != reflect.Func || value.IsNil() { + return "", unsupportedHandlerSignatureError(handler) + } + handlerType := value.Type() + if handlerType.NumIn() == 0 { + return "", unsupportedHandlerSignatureError(handler) + } + payloadType := handlerType.In(0) + if payloadType == baseEventPointerType { + return "", fmt.Errorf("EventBus.On cannot infer an event type from *BaseEvent handlers; pass an exact event name or use a typed payload handler") + } + for payloadType.Kind() == reflect.Pointer { + payloadType = payloadType.Elem() + } + if payloadType.Kind() != reflect.Struct || payloadType.Name() == "" { + return "", fmt.Errorf("EventBus.On cannot infer an event type from handler payload %s; pass an exact event name", payloadType) + } + return payloadType.Name(), nil +} + +func (b *EventBus) OnEventName(event_pattern string, handler_name string, handler any, options *EventHandler) *EventHandler { if err := b.rejectIfDestroyed("On"); err != nil { panic(err) } @@ -309,6 +422,10 @@ func (b *EventBus) On(event_pattern string, handler_name string, handler any, op if err != nil { panic(err) } + return b.registerHandler(event_pattern, handler_name, normalizedHandler, options) +} + +func (b *EventBus) registerHandler(event_pattern string, handler_name string, normalizedHandler EventHandlerCallable, options *EventHandler) *EventHandler { h := NewEventHandler(b.Name, b.ID, event_pattern, handler_name, normalizedHandler) explicitID := false if options != nil { @@ -1704,7 +1821,23 @@ func (b *EventBus) eventMatchesEquals(event *BaseEvent, equals map[string]any) b return eventMatchesEquals(event, equals) } -func (b *EventBus) Find(event_pattern string, where func(event *BaseEvent) bool, options *FindOptions) (*BaseEvent, error) { +func (b *EventBus) Find(input any, where any, options *FindOptions) (*BaseEvent, error) { + event, err := baseEventFromAny(input) + if err != nil { + return nil, err + } + matches, err := normalizeTypedFindPredicate(where) + if err != nil { + return nil, err + } + return b.findEventName(event.EventType, matches, options) +} + +func (b *EventBus) FindEventName(event_pattern string, where func(event *BaseEvent) bool, options *FindOptions) (*BaseEvent, error) { + return b.findEventName(event_pattern, where, options) +} + +func (b *EventBus) findEventName(event_pattern string, where func(event *BaseEvent) bool, options *FindOptions) (*BaseEvent, error) { if err := b.rejectIfDestroyed("Find"); err != nil { return nil, err } @@ -1780,7 +1913,23 @@ func (b *EventBus) Find(event_pattern string, where func(event *BaseEvent) bool, } } -func (b *EventBus) Filter(event_pattern string, where func(event *BaseEvent) bool, options *FilterOptions) ([]*BaseEvent, error) { +func (b *EventBus) Filter(input any, where any, options *FilterOptions) ([]*BaseEvent, error) { + event, err := baseEventFromAny(input) + if err != nil { + return nil, err + } + matches, err := normalizeTypedFindPredicate(where) + if err != nil { + return nil, err + } + return b.filterEventName(event.EventType, matches, options) +} + +func (b *EventBus) FilterEventName(event_pattern string, where func(event *BaseEvent) bool, options *FilterOptions) ([]*BaseEvent, error) { + return b.filterEventName(event_pattern, where, options) +} + +func (b *EventBus) filterEventName(event_pattern string, where func(event *BaseEvent) bool, options *FilterOptions) ([]*BaseEvent, error) { if err := b.rejectIfDestroyed("Filter"); err != nil { return nil, err } diff --git a/abxbus-go/event_handler.go b/abxbus-go/event_handler.go index 1c0102dc..52818834 100644 --- a/abxbus-go/event_handler.go +++ b/abxbus-go/event_handler.go @@ -5,6 +5,8 @@ import ( "encoding/json" "fmt" "reflect" + + "github.com/ArchiveBox/abxbus/abxbus-go/v2/jsonschema" ) type EventHandlerCallable func(event *BaseEvent, ctx context.Context) (any, error) @@ -132,27 +134,30 @@ func normalizeEventHandlerCallable(handler any) (EventHandlerCallable, error) { }, nil default: if !value.IsValid() || value.Kind() != reflect.Func || value.IsNil() { - return nil, fmt.Errorf("handler must be one of: func(*BaseEvent), func(*BaseEvent) error, func(*BaseEvent) (any, error), or the same forms with context.Context as the second argument; got %T", handler) + return nil, unsupportedHandlerSignatureError(handler) } handlerType := value.Type() if handlerType.NumIn() != 1 && handlerType.NumIn() != 2 { - return nil, fmt.Errorf("handler must be one of: func(*BaseEvent), func(*BaseEvent) error, func(*BaseEvent) (any, error), or the same forms with context.Context as the second argument; got %T", handler) - } - if handlerType.In(0) != baseEventPointerType { - return nil, fmt.Errorf("handler must be one of: func(*BaseEvent), func(*BaseEvent) error, func(*BaseEvent) (any, error), or the same forms with context.Context as the second argument; got %T", handler) + return nil, unsupportedHandlerSignatureError(handler) } withContext := handlerType.NumIn() == 2 if withContext && handlerType.In(1) != contextInterfaceType { - return nil, fmt.Errorf("handler must be one of: func(*BaseEvent), func(*BaseEvent) error, func(*BaseEvent) (any, error), or the same forms with context.Context as the second argument; got %T", handler) + return nil, unsupportedHandlerSignatureError(handler) } if handlerType.NumOut() > 2 { - return nil, fmt.Errorf("handler must be one of: func(*BaseEvent), func(*BaseEvent) error, func(*BaseEvent) (any, error), or the same forms with context.Context as the second argument; got %T", handler) + return nil, unsupportedHandlerSignatureError(handler) } if handlerType.NumOut() == 1 && !handlerType.Out(0).Implements(errorInterfaceType) { - return nil, fmt.Errorf("handler must be one of: func(*BaseEvent), func(*BaseEvent) error, func(*BaseEvent) (any, error), or the same forms with context.Context as the second argument; got %T", handler) + if handlerType.In(0) == baseEventPointerType { + return nil, unsupportedHandlerSignatureError(handler) + } + return normalizeTypedEventHandlerReflectCallable(value, handlerType), nil } if handlerType.NumOut() == 2 && !handlerType.Out(1).Implements(errorInterfaceType) { - return nil, fmt.Errorf("handler must be one of: func(*BaseEvent), func(*BaseEvent) error, func(*BaseEvent) (any, error), or the same forms with context.Context as the second argument; got %T", handler) + return nil, unsupportedHandlerSignatureError(handler) + } + if handlerType.In(0) != baseEventPointerType { + return normalizeTypedEventHandlerReflectCallable(value, handlerType), nil } return func(event *BaseEvent, ctx context.Context) (any, error) { args := []reflect.Value{reflect.ValueOf(event)} @@ -178,3 +183,128 @@ func normalizeEventHandlerCallable(handler any) (EventHandlerCallable, error) { }, nil } } + +func normalizeTypedEventHandlerCallable(handler any) (EventHandlerCallable, error) { + value := reflect.ValueOf(handler) + if value.IsValid() && value.Kind() == reflect.Func && value.IsNil() { + return nil, nil + } + if !value.IsValid() || value.Kind() != reflect.Func || value.IsNil() { + return nil, unsupportedHandlerSignatureError(handler) + } + handlerType := value.Type() + if handlerType.NumIn() != 1 && handlerType.NumIn() != 2 { + return nil, unsupportedHandlerSignatureError(handler) + } + if handlerType.In(0) == baseEventPointerType { + return nil, unsupportedHandlerSignatureError(handler) + } + if handlerType.NumIn() == 2 && handlerType.In(1) != contextInterfaceType { + return nil, unsupportedHandlerSignatureError(handler) + } + if handlerType.NumOut() > 2 { + return nil, unsupportedHandlerSignatureError(handler) + } + if handlerType.NumOut() == 2 && !handlerType.Out(1).Implements(errorInterfaceType) { + return nil, unsupportedHandlerSignatureError(handler) + } + return normalizeTypedEventHandlerReflectCallable(value, handlerType), nil +} + +func unsupportedHandlerSignatureError(handler any) error { + return fmt.Errorf("handler must be one of: func(*BaseEvent), func(*BaseEvent) error, func(*BaseEvent) (any, error), a typed payload handler func(TPayload), func(TPayload) error, func(TPayload) TResult, func(TPayload) (TResult, error), or the same forms with context.Context as the second argument; got %T", handler) +} + +func normalizeTypedEventHandlerReflectCallable(value reflect.Value, handlerType reflect.Type) EventHandlerCallable { + payloadType := handlerType.In(0) + payloadSchema := jsonSchemaForType(payloadType) + withContext := handlerType.NumIn() == 2 + return func(event *BaseEvent, ctx context.Context) (any, error) { + if err := jsonschema.Validate(payloadSchema, event.Payload); err != nil { + return nil, fmt.Errorf("EventHandlerPayloadSchemaError: Event payload did not match declared handler payload type: %w", err) + } + payload, err := eventPayloadAsReflectValue(event, payloadType) + if err != nil { + return nil, err + } + args := []reflect.Value{payload} + if withContext { + args = append(args, reflect.ValueOf(ctx)) + } + results := value.Call(args) + switch len(results) { + case 0: + return nil, nil + case 1: + if results[0].Type().Implements(errorInterfaceType) { + if reflectValueIsNil(results[0]) { + return nil, nil + } + return nil, results[0].Interface().(error) + } + return reflectResultInterface(results[0]), nil + default: + var err error + if !reflectValueIsNil(results[1]) { + err = results[1].Interface().(error) + } + return reflectResultInterface(results[0]), err + } + } +} + +func eventPayloadAsReflectValue(event *BaseEvent, payloadType reflect.Type) (reflect.Value, error) { + if event == nil { + return reflect.Value{}, fmt.Errorf("event is nil") + } + data, err := json.Marshal(event.Payload) + if err != nil { + return reflect.Value{}, err + } + payloadPtr := reflect.New(payloadType) + if err := json.Unmarshal(data, payloadPtr.Interface()); err != nil { + return reflect.Value{}, err + } + return payloadPtr.Elem(), nil +} + +func reflectResultInterface(value reflect.Value) any { + if reflectValueIsNil(value) { + return nil + } + return value.Interface() +} + +func normalizeTypedFindPredicate(where any) (func(*BaseEvent) bool, error) { + if where == nil { + return nil, nil + } + if typed, ok := where.(func(*BaseEvent) bool); ok { + return typed, nil + } + value := reflect.ValueOf(where) + if !value.IsValid() || value.Kind() != reflect.Func || value.IsNil() { + return nil, fmt.Errorf("where must be func(*BaseEvent) bool or func(TPayload) bool; got %T", where) + } + predicateType := value.Type() + if predicateType.NumIn() != 1 || predicateType.NumOut() != 1 || predicateType.Out(0).Kind() != reflect.Bool { + return nil, fmt.Errorf("where must be func(*BaseEvent) bool or func(TPayload) bool; got %T", where) + } + if predicateType.In(0) == baseEventPointerType { + return func(event *BaseEvent) bool { + return value.Call([]reflect.Value{reflect.ValueOf(event)})[0].Bool() + }, nil + } + payloadType := predicateType.In(0) + payloadSchema := jsonSchemaForType(payloadType) + return func(event *BaseEvent) bool { + if err := jsonschema.Validate(payloadSchema, event.Payload); err != nil { + return false + } + payload, err := eventPayloadAsReflectValue(event, payloadType) + if err != nil { + return false + } + return value.Call([]reflect.Value{payload})[0].Bool() + }, nil +} diff --git a/abxbus-go/jsonl_bridge.go b/abxbus-go/jsonl_bridge.go index e99df9a1..47492353 100644 --- a/abxbus-go/jsonl_bridge.go +++ b/abxbus-go/jsonl_bridge.go @@ -46,9 +46,14 @@ func NewJSONLEventBridge(path string, pollIntervalSeconds float64, name string) } } -func (b *JSONLEventBridge) On(eventPattern string, handlerName string, handler any, options *EventHandler) *EventHandler { +func (b *JSONLEventBridge) OnEventName(eventPattern string, handlerName string, handler any, options *EventHandler) *EventHandler { _ = b.Start() - return b.inboundBus.On(eventPattern, handlerName, handler, options) + return b.inboundBus.OnEventName(eventPattern, handlerName, handler, options) +} + +func (b *JSONLEventBridge) On(eventName string, handlerName string, handler any, options *EventHandler) *EventHandler { + _ = b.Start() + return b.inboundBus.On(eventName, handlerName, handler, options) } func (b *JSONLEventBridge) Emit(event *BaseEvent) (*BaseEvent, error) { diff --git a/abxbus-go/jsonschema/jsonschema.go b/abxbus-go/jsonschema/jsonschema.go index 7c831417..e7e97629 100644 --- a/abxbus-go/jsonschema/jsonschema.go +++ b/abxbus-go/jsonschema/jsonschema.go @@ -52,6 +52,28 @@ func SchemaForValue(value any) map[string]any { // SchemaForType returns a small JSON Schema object for t. func SchemaForType(t reflect.Type) map[string]any { + state := schemaForState{ + defs: map[string]any{}, + inProgress: map[reflect.Type]string{}, + typeNames: map[reflect.Type]string{}, + usedNames: map[string]reflect.Type{}, + } + schema := state.schemaForType(t) + if len(state.defs) > 0 { + schema = cloneSchemaMap(schema) + schema["$defs"] = state.defs + } + return schema +} + +type schemaForState struct { + defs map[string]any + inProgress map[reflect.Type]string + typeNames map[reflect.Type]string + usedNames map[string]reflect.Type +} + +func (state *schemaForState) schemaForType(t reflect.Type) map[string]any { for t != nil && t.Kind() == reflect.Pointer { t = t.Elem() } @@ -60,31 +82,20 @@ func SchemaForType(t reflect.Type) map[string]any { } switch t.Kind() { case reflect.Struct: - properties := map[string]any{} - required := []any{} - for i := 0; i < t.NumField(); i++ { - field := t.Field(i) - if field.PkgPath != "" { - continue - } - name, omitEmpty, skip := jsonFieldName(field) - if skip { - continue + if name := state.schemaRefName(t); name != "" { + if _, ok := state.defs[name]; ok { + return map[string]any{"$ref": "#/$defs/" + name} } - properties[name] = SchemaForType(field.Type) - if !omitEmpty && !isOptionalType(field.Type) { - required = append(required, name) + if _, ok := state.inProgress[t]; ok { + return map[string]any{"$ref": "#/$defs/" + name} } + state.inProgress[t] = name + schema := state.schemaForStruct(t) + delete(state.inProgress, t) + state.defs[name] = schema + return schema } - schema := map[string]any{ - "type": "object", - "properties": properties, - "additionalProperties": false, - } - if len(required) > 0 { - schema["required"] = required - } - return schema + return state.schemaForStruct(t) case reflect.String: return map[string]any{"type": "string"} case reflect.Bool: @@ -95,14 +106,82 @@ func SchemaForType(t reflect.Type) map[string]any { case reflect.Float32, reflect.Float64: return map[string]any{"type": "number"} case reflect.Slice, reflect.Array: - return map[string]any{"type": "array", "items": SchemaForType(t.Elem())} + return map[string]any{"type": "array", "items": state.schemaForType(t.Elem())} case reflect.Map: - return map[string]any{"type": "object", "additionalProperties": SchemaForType(t.Elem())} + return map[string]any{"type": "object", "additionalProperties": state.schemaForType(t.Elem())} default: return map[string]any{} } } +func (state *schemaForState) schemaForStruct(t reflect.Type) map[string]any { + properties := map[string]any{} + required := []any{} + for i := 0; i < t.NumField(); i++ { + field := t.Field(i) + if field.PkgPath != "" { + continue + } + name, omitEmpty, skip := jsonFieldName(field) + if skip { + continue + } + properties[name] = state.schemaForType(field.Type) + if !omitEmpty && !isOptionalType(field.Type) { + required = append(required, name) + } + } + schema := map[string]any{ + "type": "object", + "properties": properties, + "additionalProperties": false, + } + if len(required) > 0 { + schema["required"] = required + } + return schema +} + +func (state *schemaForState) schemaRefName(t reflect.Type) string { + if t.Name() == "" { + return "" + } + if name, ok := state.typeNames[t]; ok { + return name + } + base := sanitizeSchemaRefName(t.PkgPath() + "." + t.Name()) + name := base + for i := 2; ; i++ { + existing, ok := state.usedNames[name] + if !ok || existing == t { + state.usedNames[name] = t + state.typeNames[t] = name + return name + } + name = fmt.Sprintf("%s_%d", base, i) + } +} + +func sanitizeSchemaRefName(name string) string { + var builder strings.Builder + for _, r := range name { + if r >= 'a' && r <= 'z' || r >= 'A' && r <= 'Z' || r >= '0' && r <= '9' || r == '_' || r == '-' || r == '.' { + builder.WriteRune(r) + } else { + builder.WriteByte('_') + } + } + return builder.String() +} + +func cloneSchemaMap(schema map[string]any) map[string]any { + clone := make(map[string]any, len(schema)+1) + for key, value := range schema { + clone[key] = value + } + return clone +} + func jsonFieldName(field reflect.StructField) (string, bool, bool) { tag := field.Tag.Get("json") if tag == "-" { diff --git a/abxbus-go/jsonschema/jsonschema_test.go b/abxbus-go/jsonschema/jsonschema_test.go index dd5ef372..52c767bc 100644 --- a/abxbus-go/jsonschema/jsonschema_test.go +++ b/abxbus-go/jsonschema/jsonschema_test.go @@ -86,3 +86,28 @@ func TestSchemaForStructUsesJSONNamesAndValidates(t *testing.T) { t.Fatalf("expected generated schema to reject wrong id type, got %v", err) } } + +func TestSchemaForRecursiveStructUsesRefsAndValidates(t *testing.T) { + type Node struct { + ID string `json:"id"` + Children []Node `json:"children,omitempty"` + Parent *Node `json:"parent,omitempty"` + } + + schema := jsonschema.SchemaFor[Node]() + if _, ok := schema["$defs"].(map[string]any); !ok { + t.Fatalf("expected recursive schema defs, got %#v", schema) + } + if err := jsonschema.Validate(schema, Node{ID: "root", Children: []Node{{ID: "child"}}}); err != nil { + t.Fatalf("expected recursive struct value to validate: %v", err) + } + payload := map[string]any{ + "id": "root", + "children": []any{ + map[string]any{"id": 3}, + }, + } + if err := jsonschema.Validate(schema, payload); err == nil || !strings.Contains(err.Error(), "expected string") { + t.Fatalf("expected generated recursive schema to reject wrong child id type, got %v", err) + } +} diff --git a/abxbus-go/tests/base_event_test.go b/abxbus-go/tests/base_event_test.go index 0a78b4ce..bb69e700 100644 --- a/abxbus-go/tests/base_event_test.go +++ b/abxbus-go/tests/base_event_test.go @@ -1096,7 +1096,7 @@ func TestWaitIsPassiveInsideHandlersAndTimesOutForSerialEvents(t *testing.T) { record("parent_start") emitted := e.Emit(abxbus.NewBaseEvent("PassiveSerialEmittedEvent", nil)) foundSource := e.Emit(abxbus.NewBaseEvent("PassiveSerialFoundEvent", nil)) - found, err := bus.Find("PassiveSerialFoundEvent", nil, &abxbus.FindOptions{Past: true, Future: false}) + found, err := bus.FindEventName("PassiveSerialFoundEvent", nil, &abxbus.FindOptions{Past: true, Future: false}) if err != nil { return nil, err } @@ -1179,7 +1179,7 @@ func TestWaitSerialWaitInsideHandlerTimesOutAndWarnsAboutSlowHandler(t *testing. bus.On("EventCompletedSerialDeadlockWarningParentEvent", "parent", func(e *abxbus.BaseEvent, ctx context.Context) (any, error) { record("parent_start") child := e.Emit(abxbus.NewBaseEvent("EventCompletedSerialDeadlockWarningChildEvent", nil)) - found, err := bus.Find("EventCompletedSerialDeadlockWarningChildEvent", nil, &abxbus.FindOptions{Past: true, Future: false}) + found, err := bus.FindEventName("EventCompletedSerialDeadlockWarningChildEvent", nil, &abxbus.FindOptions{Past: true, Future: false}) if err != nil { return nil, err } @@ -1301,7 +1301,7 @@ func TestWaitWaitsForNormalParallelProcessingInsideHandlers(t *testing.T) { foundEvent := abxbus.NewBaseEvent("PassiveParallelFoundEvent", nil) foundEvent.EventConcurrency = abxbus.EventConcurrencyParallel foundSource := e.Emit(foundEvent) - found, err := bus.Find("PassiveParallelFoundEvent", nil, &abxbus.FindOptions{Past: true, Future: false}) + found, err := bus.FindEventName("PassiveParallelFoundEvent", nil, &abxbus.FindOptions{Past: true, Future: false}) if err != nil { return nil, err } @@ -1379,7 +1379,7 @@ func TestAwaitedParallelQueueJumpChildDoesNotPauseLaterParallelChildEvents(t *te e.Emit(newChild("bg")) appendLocked(&mu, &order, "parent_after_bg_emit") - found, err := bus.Find("ParallelPauseObservedEvent", func(event *abxbus.BaseEvent) bool { + found, err := bus.FindEventName("ParallelPauseObservedEvent", func(event *abxbus.BaseEvent) bool { return event.Payload["name"] == "bg" }, &abxbus.FindOptions{Past: true, Future: 0.2}) if err != nil { @@ -1492,7 +1492,7 @@ func TestWaitWaitsForFutureParallelEventFoundAfterHandlerStarts(t *testing.T) { bus.On("FutureParallelSomeOtherEvent", "other", func(e *abxbus.BaseEvent, ctx context.Context) (any, error) { close(otherStarted) <-releaseFind - found, err := bus.Find("FutureParallelEvent", nil, &abxbus.FindOptions{Past: true, Future: false}) + found, err := bus.FindEventName("FutureParallelEvent", nil, &abxbus.FindOptions{Past: true, Future: false}) if err != nil { return nil, err } @@ -1619,7 +1619,7 @@ func TestBaseEventCarriesEventBusReferenceDuringDispatch(t *testing.T) { func TestBaseEventBusReferenceReflectsForwardedProcessingBus(t *testing.T) { source := abxbus.NewEventBus("ProxySourceBus", nil) target := abxbus.NewEventBus("ProxyTargetBus", nil) - source.On("*", "forward", func(event *abxbus.BaseEvent, ctx context.Context) (any, error) { + source.OnEventName("*", "forward", func(event *abxbus.BaseEvent, ctx context.Context) (any, error) { target.Emit(event) return "forwarded", nil }, nil) @@ -1653,7 +1653,7 @@ func TestBaseEventBusReferenceReflectsForwardedProcessingBus(t *testing.T) { func TestEventEmitFromForwardedHandlerDispatchesChildOnTargetBus(t *testing.T) { source := abxbus.NewEventBus("ProxyChildSourceBus", nil) target := abxbus.NewEventBus("ProxyChildTargetBus", nil) - source.On("*", "forward", func(event *abxbus.BaseEvent, ctx context.Context) (any, error) { + source.OnEventName("*", "forward", func(event *abxbus.BaseEvent, ctx context.Context) (any, error) { target.Emit(event) return "forwarded", nil }, nil) diff --git a/abxbus-go/tests/comprehensive_patterns_test.go b/abxbus-go/tests/comprehensive_patterns_test.go index 96f4fefd..be8a41c9 100644 --- a/abxbus-go/tests/comprehensive_patterns_test.go +++ b/abxbus-go/tests/comprehensive_patterns_test.go @@ -85,12 +85,12 @@ func TestComprehensivePatternsForwardingDispatchAndParentTracking(t *testing.T) results = append(results, fmt.Sprintf("%04d:%s", sequence, label)) } - bus2.On("*", "child_bus2_event_handler", func(e *abxbus.BaseEvent, ctx context.Context) (any, error) { + bus2.OnEventName("*", "child_bus2_event_handler", func(e *abxbus.BaseEvent, ctx context.Context) (any, error) { eventTypeShort := strings.TrimSuffix(e.EventType, "Event") next("bus2_handler_" + eventTypeShort) return "forwarded bus result", nil }, nil) - bus1.On("*", "emit", func(e *abxbus.BaseEvent, ctx context.Context) (any, error) { + bus1.OnEventName("*", "emit", func(e *abxbus.BaseEvent, ctx context.Context) (any, error) { return bus2.Emit(e), nil }, nil) @@ -176,7 +176,7 @@ func TestComprehensiveRaceConditionStress(t *testing.T) { var mu sync.Mutex results := []string{} - bus1.On("*", "forward_to_bus2", func(e *abxbus.BaseEvent, ctx context.Context) (any, error) { + bus1.OnEventName("*", "forward_to_bus2", func(e *abxbus.BaseEvent, ctx context.Context) (any, error) { return bus2.Emit(e), nil }, nil) diff --git a/abxbus-go/tests/event_handler_test.go b/abxbus-go/tests/event_handler_test.go index 74311c3c..46554d69 100644 --- a/abxbus-go/tests/event_handler_test.go +++ b/abxbus-go/tests/event_handler_test.go @@ -49,37 +49,40 @@ func TestEventHandlerJSONRoundtrip(t *testing.T) { func TestEventBusOnSupportsEventFirstOptionalContextHandlerSignatures(t *testing.T) { bus := abxbus.NewEventBus("HandlerSignatureBus", nil) seen := []string{} + type handlerSignatureResult struct { + Source string + } - bus.On("HandlerSignatureEvent", "value_error", func(event *abxbus.BaseEvent) (any, error) { - seen = append(seen, event.EventType+":value_error") - return "value_error", nil + bus.On("HandlerSignatureEvent", "handler_a", func(event *abxbus.BaseEvent) (handlerSignatureResult, error) { + seen = append(seen, event.EventType+":handler_a") + return handlerSignatureResult{Source: "event"}, nil }, nil) - bus.On("HandlerSignatureEvent", "value_error_ctx", func(event *abxbus.BaseEvent, ctx context.Context) (any, error) { + bus.On("HandlerSignatureEvent", "handler_b", func(event *abxbus.BaseEvent, ctx context.Context) (handlerSignatureResult, error) { if ctx == nil { t.Fatal("context should be available when requested") } - seen = append(seen, event.EventType+":value_error_ctx") - return "value_error_ctx", nil + seen = append(seen, event.EventType+":handler_b") + return handlerSignatureResult{Source: "context"}, nil }, nil) - bus.On("HandlerSignatureEvent", "error_only", func(event *abxbus.BaseEvent) error { - seen = append(seen, event.EventType+":error_only") + bus.On("HandlerSignatureEvent", "handler_c", func(event *abxbus.BaseEvent) error { + seen = append(seen, event.EventType+":handler_c") return nil }, nil) - bus.On("HandlerSignatureEvent", "error_only_ctx", func(event *abxbus.BaseEvent, ctx context.Context) error { + bus.On("HandlerSignatureEvent", "handler_d", func(event *abxbus.BaseEvent, ctx context.Context) error { if ctx == nil { t.Fatal("context should be available when requested") } - seen = append(seen, event.EventType+":error_only_ctx") + seen = append(seen, event.EventType+":handler_d") return nil }, nil) - bus.On("HandlerSignatureEvent", "void", func(event *abxbus.BaseEvent) { - seen = append(seen, event.EventType+":void") + bus.On("HandlerSignatureEvent", "handler_e", func(event *abxbus.BaseEvent) { + seen = append(seen, event.EventType+":handler_e") }, nil) - bus.On("HandlerSignatureEvent", "void_ctx", func(event *abxbus.BaseEvent, ctx context.Context) { + bus.On("HandlerSignatureEvent", "handler_f", func(event *abxbus.BaseEvent, ctx context.Context) { if ctx == nil { t.Fatal("context should be available when requested") } - seen = append(seen, event.EventType+":void_ctx") + seen = append(seen, event.EventType+":handler_f") }, nil) values, err := bus.Emit(abxbus.NewBaseEvent("HandlerSignatureEvent", nil)).EventResultsList(&abxbus.EventResultOptions{ @@ -89,7 +92,9 @@ func TestEventBusOnSupportsEventFirstOptionalContextHandlerSignatures(t *testing if err != nil { t.Fatal(err) } - if len(values) != 2 || values[0] != "value_error" || values[1] != "value_error_ctx" { + if len(values) != 2 || + values[0] != (handlerSignatureResult{Source: "event"}) || + values[1] != (handlerSignatureResult{Source: "context"}) { t.Fatalf("expected only non-null handler values, got %#v", values) } if len(seen) != 6 { @@ -112,12 +117,12 @@ func TestEventBusOnTreatsTypedNilHandlersAsNoop(t *testing.T) { var named nilNamedHandler bus.On("TypedNilHandlerEvent", "direct", direct, nil) - bus.On("TypedNilHandlerEvent", "value_error", valueError, nil) - bus.On("TypedNilHandlerEvent", "value_error_ctx", valueErrorCtx, nil) - bus.On("TypedNilHandlerEvent", "error_only", errorOnly, nil) - bus.On("TypedNilHandlerEvent", "error_only_ctx", errorOnlyCtx, nil) - bus.On("TypedNilHandlerEvent", "void", voidOnly, nil) - bus.On("TypedNilHandlerEvent", "void_ctx", voidCtx, nil) + bus.On("TypedNilHandlerEvent", "handler_a", valueError, nil) + bus.On("TypedNilHandlerEvent", "handler_b", valueErrorCtx, nil) + bus.On("TypedNilHandlerEvent", "handler_c", errorOnly, nil) + bus.On("TypedNilHandlerEvent", "handler_d", errorOnlyCtx, nil) + bus.On("TypedNilHandlerEvent", "handler_e", voidOnly, nil) + bus.On("TypedNilHandlerEvent", "handler_f", voidCtx, nil) bus.On("TypedNilHandlerEvent", "named", named, nil) values, err := bus.Emit(abxbus.NewBaseEvent("TypedNilHandlerEvent", nil)).EventResultsList(&abxbus.EventResultOptions{ @@ -132,6 +137,30 @@ func TestEventBusOnTreatsTypedNilHandlersAsNoop(t *testing.T) { } } +func TestEventBusOnAndEmitInferTypesFromHandlerAndEvent(t *testing.T) { + type InferredOnEvent struct { + A int `json:"a"` + B int `json:"b"` + } + + bus := abxbus.NewEventBus("InferredOnBus", nil) + bus.On(func(payload InferredOnEvent) (int, error) { + return payload.A + payload.B, nil + }) + + event := bus.Emit(InferredOnEvent{A: 2, B: 3}) + if event.EventType != "InferredOnEvent" { + t.Fatalf("event type should be inferred from emitted struct, got %s", event.EventType) + } + result, err := event.EventResult() + if err != nil { + t.Fatal(err) + } + if result != 5 { + t.Fatalf("handler result should come from typed callback return, got %#v", result) + } +} + // Folded from event_handler_ids_test.go to keep test layout class-based. func TestBusAndEventIDsAreUUIDv7(t *testing.T) { bus := abxbus.NewEventBus("BusId", nil) diff --git a/abxbus-go/tests/event_result_test.go b/abxbus-go/tests/event_result_test.go index 6c0db71e..c8a828e6 100644 --- a/abxbus-go/tests/event_result_test.go +++ b/abxbus-go/tests/event_result_test.go @@ -1009,11 +1009,11 @@ func TestEmitAcceptsTypedStructAndDerivesPayloadAndConfig(t *testing.T) { func TestTypedEventPayloadAndResultHelpers(t *testing.T) { bus := abxbus.NewEventBus("TypedBus", nil) - abxbus.OnTyped[addPayload, addResult](bus, "AddEvent", "add", func(payload addPayload) (addResult, error) { + bus.On("AddEvent", "add", func(payload addPayload) (addResult, error) { return addResult{Sum: payload.A + payload.B}, nil }, nil) - event := abxbus.MustNewTypedEventWithResult[addPayload, addResult]("AddEvent", addPayload{A: 4, B: 9}) + event := abxbus.MustNewEvent("AddEvent", addPayload{A: 4, B: 9}, abxbus.ResultType[addResult]()) result, err := bus.Emit(event).EventResult() if err != nil { t.Fatal(err) @@ -1035,19 +1035,19 @@ func TestTypedEventPayloadAndResultHelpers(t *testing.T) { } } -func TestOnTypedSupportsOptionalContextHandlerSignatures(t *testing.T) { +func TestOnSupportsOptionalContextHandlerSignatures(t *testing.T) { bus := abxbus.NewEventBus("TypedOptionalContextBus", nil) - abxbus.OnTyped[addPayload, addResult](bus, "TypedNoContextEvent", "typed", func(payload addPayload) addResult { + bus.On("TypedNoContextEvent", "typed", func(payload addPayload) addResult { return addResult{Sum: payload.A + payload.B} }, nil) gotCtx := false - abxbus.OnTyped[addPayload, addResult](bus, "TypedWithContextEvent", "typed", func(payload addPayload, ctx context.Context) (addResult, error) { + bus.On("TypedWithContextEvent", "typed", func(payload addPayload, ctx context.Context) (addResult, error) { gotCtx = ctx != nil return addResult{Sum: payload.A + payload.B}, nil }, nil) - noCtxEvent := bus.Emit(abxbus.MustNewTypedEventWithResult[addPayload, addResult]("TypedNoContextEvent", addPayload{A: 2, B: 3})) + noCtxEvent := bus.Emit(abxbus.MustNewEvent("TypedNoContextEvent", addPayload{A: 2, B: 3}, abxbus.ResultType[addResult]())) if _, err := noCtxEvent.Now(); err != nil { t.Fatal(err) } @@ -1063,7 +1063,7 @@ func TestOnTypedSupportsOptionalContextHandlerSignatures(t *testing.T) { t.Fatalf("expected typed no-context result sum=5, got %#v", noCtxTypedResult) } - ctxEvent := bus.Emit(abxbus.MustNewTypedEventWithResult[addPayload, addResult]("TypedWithContextEvent", addPayload{A: 4, B: 6})) + ctxEvent := bus.Emit(abxbus.MustNewEvent("TypedWithContextEvent", addPayload{A: 4, B: 6}, abxbus.ResultType[addResult]())) if _, err := ctxEvent.Now(); err != nil { t.Fatal(err) } @@ -1115,19 +1115,19 @@ func TestEventBusOnSupportsNamedHandlerFunctionTypes(t *testing.T) { } } -func TestOnTypedSupportsNamedHandlerFunctionTypes(t *testing.T) { +func TestOnSupportsNamedHandlerFunctionTypes(t *testing.T) { bus := abxbus.NewEventBus("TypedNamedHandlerFunctionTypesBus", nil) gotCtx := false - abxbus.OnTyped[addPayload, addResult](bus, "TypedNamedNoContextEvent", "typed", namedTypedNoContextHandler(func(payload addPayload) addResult { + bus.On("TypedNamedNoContextEvent", "typed", namedTypedNoContextHandler(func(payload addPayload) addResult { return addResult{Sum: payload.A + payload.B} }), nil) - abxbus.OnTyped[addPayload, addResult](bus, "TypedNamedWithContextEvent", "typed", namedTypedWithContextHandler(func(payload addPayload, ctx context.Context) (addResult, error) { + bus.On("TypedNamedWithContextEvent", "typed", namedTypedWithContextHandler(func(payload addPayload, ctx context.Context) (addResult, error) { gotCtx = ctx != nil return addResult{Sum: payload.A + payload.B}, nil }), nil) - noContextEvent := bus.Emit(abxbus.MustNewTypedEventWithResult[addPayload, addResult]("TypedNamedNoContextEvent", addPayload{A: 3, B: 4})) + noContextEvent := bus.Emit(abxbus.MustNewEvent("TypedNamedNoContextEvent", addPayload{A: 3, B: 4}, abxbus.ResultType[addResult]())) if _, err := noContextEvent.Now(); err != nil { t.Fatal(err) } @@ -1143,7 +1143,7 @@ func TestOnTypedSupportsNamedHandlerFunctionTypes(t *testing.T) { t.Fatalf("expected named typed no-context result sum=7, got %#v", noContextTypedResult) } - contextEvent := bus.Emit(abxbus.MustNewTypedEventWithResult[addPayload, addResult]("TypedNamedWithContextEvent", addPayload{A: 5, B: 8})) + contextEvent := bus.Emit(abxbus.MustNewEvent("TypedNamedWithContextEvent", addPayload{A: 5, B: 8}, abxbus.ResultType[addResult]())) if _, err := contextEvent.Now(); err != nil { t.Fatal(err) } @@ -1160,11 +1160,11 @@ func TestOnTypedSupportsNamedHandlerFunctionTypes(t *testing.T) { } } -func TestOnTypedNamedAnyHandlerAcceptsNilPayload(t *testing.T) { +func TestOnNamedAnyHandlerAcceptsNilPayload(t *testing.T) { bus := abxbus.NewEventBus("TypedNamedAnyNilPayloadBus", nil) called := false - abxbus.OnTyped[any, addResult](bus, "TypedNamedAnyNilPayloadEvent", "typed", namedTypedAnyHandler(func(payload any) (addResult, error) { + bus.On("TypedNamedAnyNilPayloadEvent", "typed", namedTypedAnyHandler(func(payload any) (addResult, error) { called = true if payload != nil { t.Fatalf("expected nil payload, got %#v", payload) @@ -1199,16 +1199,16 @@ func TestTypedEventWithResultSchemaValidatesHandlerReturnAtRuntime(t *testing.T) return map[string]any{"sum": "not-an-int"}, nil }, nil) - event := abxbus.MustNewTypedEventWithResult[addPayload, addResult]("TypedSchemaEvent", addPayload{A: 1, B: 2}) + event := abxbus.MustNewEvent("TypedSchemaEvent", addPayload{A: 1, B: 2}, abxbus.ResultType[addResult]()) if _, err := bus.Emit(event).EventResult(); err == nil || !strings.Contains(err.Error(), "EventHandlerResultSchemaError") { t.Fatalf("expected typed result schema error, got %v", err) } } -func TestOnTypedValidatesPayloadBeforeCallingHandler(t *testing.T) { +func TestOnValidatesPayloadBeforeCallingHandler(t *testing.T) { bus := abxbus.NewEventBus("TypedPayloadSchemaBus", nil) called := false - abxbus.OnTyped[addPayload, addResult](bus, "TypedPayloadSchemaEvent", "typed", func(payload addPayload, ctx context.Context) (addResult, error) { + bus.On("TypedPayloadSchemaEvent", "typed", func(payload addPayload, ctx context.Context) (addResult, error) { called = true return addResult{Sum: payload.A + payload.B}, nil }, nil) @@ -1218,17 +1218,17 @@ func TestOnTypedValidatesPayloadBeforeCallingHandler(t *testing.T) { t.Fatal(err) } if called { - t.Fatal("typed handler should not be called when a required payload field is missing") + t.Fatal("handler should not be called when a required payload field is missing") } if _, err := event.EventResult(); err == nil || !strings.Contains(err.Error(), "EventHandlerPayloadSchemaError") { t.Fatalf("expected typed payload schema error, got %v", err) } } -func TestOnTypedRejectsWrongPayloadFieldType(t *testing.T) { +func TestOnRejectsWrongPayloadFieldType(t *testing.T) { bus := abxbus.NewEventBus("TypedPayloadTypeBus", nil) called := false - abxbus.OnTyped[addPayload, addResult](bus, "TypedPayloadTypeEvent", "typed", func(payload addPayload, ctx context.Context) (addResult, error) { + bus.On("TypedPayloadTypeEvent", "typed", func(payload addPayload, ctx context.Context) (addResult, error) { called = true return addResult{Sum: payload.A + payload.B}, nil }, nil) @@ -1238,7 +1238,7 @@ func TestOnTypedRejectsWrongPayloadFieldType(t *testing.T) { t.Fatal(err) } if called { - t.Fatal("typed handler should not be called when a payload field has the wrong type") + t.Fatal("handler should not be called when a payload field has the wrong type") } if _, err := event.EventResult(); err == nil || !strings.Contains(err.Error(), "EventHandlerPayloadSchemaError") { t.Fatalf("expected typed payload schema error, got %v", err) diff --git a/abxbus-go/tests/eventbus_cross_runtime_features_test.go b/abxbus-go/tests/eventbus_cross_runtime_features_test.go index 097d4030..b19c1ffe 100644 --- a/abxbus-go/tests/eventbus_cross_runtime_features_test.go +++ b/abxbus-go/tests/eventbus_cross_runtime_features_test.go @@ -93,7 +93,7 @@ func TestQueueJumpPreservesParentChildLineageAndFindVisibility(t *testing.T) { t.Fatal("child handler did not capture child event id") } - foundChild, err := bus.Find("QueueJumpChildEvent", nil, &abxbus.FindOptions{Past: true, Future: false, ChildOf: root}) + foundChild, err := bus.FindEventName("QueueJumpChildEvent", nil, &abxbus.FindOptions{Past: true, Future: false, ChildOf: root}) if err != nil { t.Fatal(err) } @@ -273,7 +273,7 @@ func TestZeroHistoryBackpressureWithFindFutureStillResolvesNewEvents(t *testing. if bus.EventHistory.Has(first.EventID) { t.Fatal("max_history_size=0 should drop completed events from history") } - past, err := bus.Find("ZeroHistoryEvent", nil, &abxbus.FindOptions{Past: true, Future: false}) + past, err := bus.FindEventName("ZeroHistoryEvent", nil, &abxbus.FindOptions{Past: true, Future: false}) if err != nil { t.Fatal(err) } @@ -287,7 +287,7 @@ func TestZeroHistoryBackpressureWithFindFutureStillResolvesNewEvents(t *testing. futureEvent := bus.Emit(abxbus.NewBaseEvent("ZeroHistoryEvent", map[string]any{"value": "future"})) capturedFutureID <- futureEvent.EventID }() - futureMatch, err := bus.Find("ZeroHistoryEvent", func(event *abxbus.BaseEvent) bool { + futureMatch, err := bus.FindEventName("ZeroHistoryEvent", func(event *abxbus.BaseEvent) bool { return event.Payload["value"] == "future" }, &abxbus.FindOptions{Past: false, Future: 1.0}) if err != nil { @@ -319,7 +319,7 @@ func TestContextPropagatesThroughForwardingAndChildDispatchWithLineageIntact(t * parentEventID := "" childParentID := "" - busA.On("*", "forward_to_b", func(event *abxbus.BaseEvent, ctx context.Context) (any, error) { + busA.OnEventName("*", "forward_to_b", func(event *abxbus.BaseEvent, ctx context.Context) (any, error) { return busB.Emit(event), nil }, nil) busB.On("ContextParentEvent", "on_parent", func(event *abxbus.BaseEvent, ctx context.Context) (any, error) { @@ -371,7 +371,7 @@ func TestContextPropagatesThroughForwardingAndChildDispatchWithLineageIntact(t * t.Fatalf("parent event path did not include target bus: %#v", parent.EventPath) } - foundChild, err := busB.Find("ContextChildEvent", nil, &abxbus.FindOptions{Past: true, Future: false, ChildOf: parent}) + foundChild, err := busB.FindEventName("ContextChildEvent", nil, &abxbus.FindOptions{Past: true, Future: false, ChildOf: parent}) if err != nil { t.Fatal(err) } @@ -408,7 +408,7 @@ func TestPendingQueueFindVisibilityTransitionsToCompletedAfterRelease(t *testing queued := bus.Emit(abxbus.NewBaseEvent("PendingVisibilityEvent", map[string]any{"tag": "queued"})) time.Sleep(10 * time.Millisecond) - foundQueued, err := bus.Find("PendingVisibilityEvent", func(event *abxbus.BaseEvent) bool { + foundQueued, err := bus.FindEventName("PendingVisibilityEvent", func(event *abxbus.BaseEvent) bool { return event.Payload["tag"] == "queued" }, &abxbus.FindOptions{Past: true, Future: false}) if err != nil { @@ -451,7 +451,7 @@ func TestHistoryBackpressureRejectsOverflowAndPreservesFindableHistory(t *testin if _, err := second.Now(); err != nil { t.Fatal(err) } - foundFirst, err := bus.Find("BackpressureEvent", nil, &abxbus.FindOptions{ + foundFirst, err := bus.FindEventName("BackpressureEvent", nil, &abxbus.FindOptions{ Past: true, Future: false, Equals: map[string]any{"value": "first"}, diff --git a/abxbus-go/tests/eventbus_debounce_test.go b/abxbus-go/tests/eventbus_debounce_test.go index a677dbc3..121cc7c8 100644 --- a/abxbus-go/tests/eventbus_debounce_test.go +++ b/abxbus-go/tests/eventbus_debounce_test.go @@ -33,7 +33,7 @@ func TestSimpleDebounceWithChildOfReusesRecentEvent(t *testing.T) { t.Fatal(err) } - found, err := bus.Find("ScreenshotEvent", nil, &abxbus.FindOptions{ + found, err := bus.FindEventName("ScreenshotEvent", nil, &abxbus.FindOptions{ Past: 10.0, Future: false, ChildOf: parent, @@ -75,7 +75,7 @@ func TestReturnsExistingFreshEvent(t *testing.T) { } return time.Since(completedAt) < 5*time.Second } - found, err := bus.Find("ScreenshotEvent", func(event *abxbus.BaseEvent) bool { + found, err := bus.FindEventName("ScreenshotEvent", func(event *abxbus.BaseEvent) bool { return event.Payload["target_id"] == debounceTargetID1 && isFresh(event) }, &abxbus.FindOptions{Past: true, Future: false}) if err != nil { @@ -96,7 +96,7 @@ func TestAdvancedDebouncePrefersHistoryThenWaitsFutureThenDispatches(t *testing. errs := make(chan error, 1) go func() { - found, err := bus.Find("SyncEvent", nil, &abxbus.FindOptions{Past: false, Future: 0.5}) + found, err := bus.FindEventName("SyncEvent", nil, &abxbus.FindOptions{Past: false, Future: 0.5}) if err != nil { errs <- err return @@ -110,7 +110,7 @@ func TestAdvancedDebouncePrefersHistoryThenWaitsFutureThenDispatches(t *testing. ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() - historyMatch, err := bus.Find("SyncEvent", nil, &abxbus.FindOptions{Past: true, Future: false}) + historyMatch, err := bus.FindEventName("SyncEvent", nil, &abxbus.FindOptions{Past: true, Future: false}) if err != nil { t.Fatal(err) } @@ -143,7 +143,7 @@ func TestDispatchesNewWhenNoMatch(t *testing.T) { return "done", nil }, nil) - found, err := bus.Find("ScreenshotEvent", func(event *abxbus.BaseEvent) bool { + found, err := bus.FindEventName("ScreenshotEvent", func(event *abxbus.BaseEvent) bool { return event.Payload["target_id"] == debounceTargetID1 }, &abxbus.FindOptions{Past: true, Future: false}) if err != nil { @@ -171,7 +171,7 @@ func TestDispatchesNewWhenStale(t *testing.T) { if _, err := original.Now(); err != nil { t.Fatal(err) } - found, err := bus.Find("ScreenshotEvent", func(event *abxbus.BaseEvent) bool { + found, err := bus.FindEventName("ScreenshotEvent", func(event *abxbus.BaseEvent) bool { return event.Payload["target_id"] == debounceTargetID1 && false }, &abxbus.FindOptions{Past: true, Future: false}) if err != nil { @@ -196,7 +196,7 @@ func TestDispatchesNewWhenStale(t *testing.T) { func TestFindPastOnlyReturnsImmediatelyWithoutWaiting(t *testing.T) { bus := abxbus.NewEventBus("DebouncePastOnlyBus", nil) start := time.Now() - result, err := bus.Find("ParentEvent", nil, &abxbus.FindOptions{Past: true, Future: false}) + result, err := bus.FindEventName("ParentEvent", nil, &abxbus.FindOptions{Past: true, Future: false}) if err != nil { t.Fatal(err) } @@ -212,7 +212,7 @@ func TestFindPastOnlyReturnsImmediatelyWithoutWaiting(t *testing.T) { func TestFindPastFloatReturnsImmediatelyWithoutWaiting(t *testing.T) { bus := abxbus.NewEventBus("DebouncePastWindowBus", nil) start := time.Now() - result, err := bus.Find("ParentEvent", nil, &abxbus.FindOptions{Past: 5.0, Future: false}) + result, err := bus.FindEventName("ParentEvent", nil, &abxbus.FindOptions{Past: 5.0, Future: false}) if err != nil { t.Fatal(err) } @@ -237,7 +237,7 @@ func TestOrChainWithoutWaitingFindsExisting(t *testing.T) { } start := time.Now() - found, err := bus.Find("ScreenshotEvent", func(event *abxbus.BaseEvent) bool { + found, err := bus.FindEventName("ScreenshotEvent", func(event *abxbus.BaseEvent) bool { return event.Payload["target_id"] == debounceTargetID1 }, &abxbus.FindOptions{Past: true, Future: false}) if err != nil { @@ -262,7 +262,7 @@ func TestOrChainWithoutWaitingDispatchesWhenNoMatch(t *testing.T) { }, nil) start := time.Now() - found, err := bus.Find("ScreenshotEvent", func(event *abxbus.BaseEvent) bool { + found, err := bus.FindEventName("ScreenshotEvent", func(event *abxbus.BaseEvent) bool { return event.Payload["target_id"] == debounceTargetID1 }, &abxbus.FindOptions{Past: true, Future: false}) if err != nil { @@ -287,7 +287,7 @@ func TestOrChainMultipleSequentialLookups(t *testing.T) { }, nil) start := time.Now() - found1, err := bus.Find("ScreenshotEvent", func(event *abxbus.BaseEvent) bool { + found1, err := bus.FindEventName("ScreenshotEvent", func(event *abxbus.BaseEvent) bool { return event.Payload["target_id"] == debounceTargetID1 }, &abxbus.FindOptions{Past: true, Future: false}) if err != nil { @@ -295,7 +295,7 @@ func TestOrChainMultipleSequentialLookups(t *testing.T) { } result1 := debounceEmitFallback(bus, "ScreenshotEvent", map[string]any{"target_id": debounceTargetID1}, found1) - found2, err := bus.Find("ScreenshotEvent", func(event *abxbus.BaseEvent) bool { + found2, err := bus.FindEventName("ScreenshotEvent", func(event *abxbus.BaseEvent) bool { return event.Payload["target_id"] == debounceTargetID1 }, &abxbus.FindOptions{Past: true, Future: false}) if err != nil { @@ -303,7 +303,7 @@ func TestOrChainMultipleSequentialLookups(t *testing.T) { } result2 := debounceEmitFallback(bus, "ScreenshotEvent", map[string]any{"target_id": debounceTargetID1}, found2) - found3, err := bus.Find("ScreenshotEvent", func(event *abxbus.BaseEvent) bool { + found3, err := bus.FindEventName("ScreenshotEvent", func(event *abxbus.BaseEvent) bool { return event.Payload["target_id"] == debounceTargetID2 }, &abxbus.FindOptions{Past: true, Future: false}) if err != nil { diff --git a/abxbus-go/tests/eventbus_find_test.go b/abxbus-go/tests/eventbus_find_test.go index 734f1e85..de54a02e 100644 --- a/abxbus-go/tests/eventbus_find_test.go +++ b/abxbus-go/tests/eventbus_find_test.go @@ -8,6 +8,11 @@ import ( abxbus "github.com/ArchiveBox/abxbus/abxbus-go/v2" ) +type TypedFindEvent struct { + RequestID string `json:"request_id"` + Count int `json:"count"` +} + func TestFindHistoryAndFuture(t *testing.T) { bus := abxbus.NewEventBus("FindBus", nil) seed := bus.Emit(abxbus.NewBaseEvent("ResponseEvent", map[string]any{"request_id": "abc"})) @@ -15,7 +20,7 @@ func TestFindHistoryAndFuture(t *testing.T) { t.Fatal(err) } - match, err := bus.Find("ResponseEvent", func(e *abxbus.BaseEvent) bool { + match, err := bus.FindEventName("ResponseEvent", func(e *abxbus.BaseEvent) bool { return e.Payload["request_id"] == "abc" }, &abxbus.FindOptions{Past: true, Future: false}) if err != nil { @@ -29,7 +34,7 @@ func TestFindHistoryAndFuture(t *testing.T) { time.Sleep(20 * time.Millisecond) bus.Emit(abxbus.NewBaseEvent("FutureEvent", map[string]any{"request_id": "future"})) }() - future, err := bus.Find("FutureEvent", nil, &abxbus.FindOptions{Past: false, Future: 1.0}) + future, err := bus.FindEventName("FutureEvent", nil, &abxbus.FindOptions{Past: false, Future: 1.0}) if err != nil { t.Fatal(err) } @@ -38,9 +43,52 @@ func TestFindHistoryAndFuture(t *testing.T) { } } +func TestFindAndFilterDefaultToTypedEvents(t *testing.T) { + bus := abxbus.NewEventBus("TypedFindFilterBus", nil) + first := bus.Emit(TypedFindEvent{RequestID: "one", Count: 1}) + second := bus.Emit(TypedFindEvent{RequestID: "two", Count: 2}) + if _, err := first.Now(); err != nil { + t.Fatal(err) + } + if _, err := second.Now(); err != nil { + t.Fatal(err) + } + + found, err := bus.Find(TypedFindEvent{}, func(payload TypedFindEvent) bool { + return payload.RequestID == "two" + }, &abxbus.FindOptions{Past: true, Future: false}) + if err != nil { + t.Fatal(err) + } + if found == nil || found.EventID != second.EventID { + t.Fatalf("expected typed find to match second event, got %#v", found) + } + + matches, err := bus.Filter(TypedFindEvent{}, func(payload TypedFindEvent) bool { + return payload.Count >= 1 + }, &abxbus.FilterOptions{Past: true, Future: false}) + if err != nil { + t.Fatal(err) + } + if len(matches) != 2 || matches[0].EventID != second.EventID || matches[1].EventID != first.EventID { + t.Fatalf("expected typed filter to return newest-first matches, got %#v", matches) + } +} + +func TestEmitRequiresEventObject(t *testing.T) { + bus := abxbus.NewEventBus("EmitRequiresEventObjectBus", nil) + event := bus.Emit(abxbus.NewBaseEvent("RawStringEvent", map[string]any{"ok": true})) + if _, err := event.Now(); err != nil { + t.Fatal(err) + } + if event.EventType != "RawStringEvent" || event.Payload["ok"] != true { + t.Fatalf("unexpected raw event emission: %#v", event) + } +} + func TestFindReturnsNilWhenNoMatch(t *testing.T) { bus := abxbus.NewEventBus("FindNilBus", nil) - match, err := bus.Find("MissingEvent", nil, &abxbus.FindOptions{Past: true, Future: false}) + match, err := bus.FindEventName("MissingEvent", nil, &abxbus.FindOptions{Past: true, Future: false}) if err != nil { t.Fatal(err) } @@ -55,7 +103,7 @@ func TestFindDefaultPastOnlyNoFutureWait(t *testing.T) { if _, err := seed.Now(); err != nil { t.Fatal(err) } - match, err := bus.Find("DefaultEvent", nil, nil) + match, err := bus.FindEventName("DefaultEvent", nil, nil) if err != nil { t.Fatal(err) } @@ -71,7 +119,7 @@ func TestFindFutureIgnoresPastEvents(t *testing.T) { t.Fatal(err) } - found, err := bus.Find("ParentEvent", nil, &abxbus.FindOptions{Past: false, Future: 0.03}) + found, err := bus.FindEventName("ParentEvent", nil, &abxbus.FindOptions{Past: false, Future: 0.03}) if err != nil { t.Fatal(err) } @@ -83,7 +131,7 @@ func TestFindFutureIgnoresPastEvents(t *testing.T) { func TestFindPastFalseFutureFalseReturnsNilImmediately(t *testing.T) { bus := abxbus.NewEventBus("FindNeitherBus", nil) start := time.Now() - found, err := bus.Find("ParentEvent", nil, &abxbus.FindOptions{Past: false, Future: false}) + found, err := bus.FindEventName("ParentEvent", nil, &abxbus.FindOptions{Past: false, Future: false}) if err != nil { t.Fatal(err) } @@ -104,7 +152,7 @@ func TestFindPastAndFutureWindowsAreIndependent(t *testing.T) { time.Sleep(120 * time.Millisecond) start := time.Now() - found, err := bus.Find("ParentEvent", nil, &abxbus.FindOptions{Past: 0.03, Future: 0.03}) + found, err := bus.FindEventName("ParentEvent", nil, &abxbus.FindOptions{Past: 0.03, Future: 0.03}) if err != nil { t.Fatal(err) } @@ -130,7 +178,7 @@ func TestFindPastWindowAndEqualsFiltering(t *testing.T) { t.Fatal(err) } - recent, err := bus.Find("WindowEvent", nil, &abxbus.FindOptions{Past: 0.5, Future: false, Equals: map[string]any{"event_type": "WindowEvent", "event_status": "completed"}}) + recent, err := bus.FindEventName("WindowEvent", nil, &abxbus.FindOptions{Past: 0.5, Future: false, Equals: map[string]any{"event_type": "WindowEvent", "event_status": "completed"}}) if err != nil { t.Fatal(err) } @@ -138,7 +186,7 @@ func TestFindPastWindowAndEqualsFiltering(t *testing.T) { t.Fatalf("expected past-window filter to return recent event, got %#v", recent) } - equalsMatch, err := bus.Find("WindowEvent", nil, &abxbus.FindOptions{Past: true, Future: false, Equals: map[string]any{"request_id": "new"}}) + equalsMatch, err := bus.FindEventName("WindowEvent", nil, &abxbus.FindOptions{Past: true, Future: false, Equals: map[string]any{"request_id": "new"}}) if err != nil { t.Fatal(err) } @@ -161,7 +209,7 @@ func TestFindSupportsMetadataAndPayloadEqualityFilters(t *testing.T) { } } - foundA, err := bus.Find("FieldFilterEvent", nil, &abxbus.FindOptions{ + foundA, err := bus.FindEventName("FieldFilterEvent", nil, &abxbus.FindOptions{ Past: true, Future: false, Equals: map[string]any{ @@ -179,7 +227,7 @@ func TestFindSupportsMetadataAndPayloadEqualityFilters(t *testing.T) { t.Fatalf("expected metadata and payload filters to match event A, got %#v", foundA) } - mismatch, err := bus.Find("FieldFilterEvent", nil, &abxbus.FindOptions{ + mismatch, err := bus.FindEventName("FieldFilterEvent", nil, &abxbus.FindOptions{ Past: true, Future: false, Equals: map[string]any{ @@ -194,7 +242,7 @@ func TestFindSupportsMetadataAndPayloadEqualityFilters(t *testing.T) { t.Fatalf("expected mismatched metadata filters to return nil, got %#v", mismatch) } - foundPayload, err := bus.Find("FieldFilterEvent", nil, &abxbus.FindOptions{ + foundPayload, err := bus.FindEventName("FieldFilterEvent", nil, &abxbus.FindOptions{ Past: true, Future: false, Equals: map[string]any{ @@ -222,7 +270,7 @@ func TestFindWherePredicateAndBusScopedHistory(t *testing.T) { t.Fatal(err) } - foundA, err := busA.Find("ScopedEvent", func(event *abxbus.BaseEvent) bool { + foundA, err := busA.FindEventName("ScopedEvent", func(event *abxbus.BaseEvent) bool { return event.Payload["source"] == "A" && event.Payload["value"] == 1 }, &abxbus.FindOptions{Past: true, Future: false}) if err != nil { @@ -232,7 +280,7 @@ func TestFindWherePredicateAndBusScopedHistory(t *testing.T) { t.Fatalf("expected bus A to find only its own event, got %#v", foundA) } - foundB, err := busB.Find("ScopedEvent", func(event *abxbus.BaseEvent) bool { + foundB, err := busB.FindEventName("ScopedEvent", func(event *abxbus.BaseEvent) bool { return event.Payload["source"] == "B" }, &abxbus.FindOptions{Past: true, Future: false}) if err != nil { @@ -281,7 +329,7 @@ func TestFindChildOfFilteringAndLineageTraversal(t *testing.T) { t.Fatal("event should not be child of itself") } - found, err := bus.Find("Grandchild", nil, &abxbus.FindOptions{Past: true, Future: false, ChildOf: parent}) + found, err := bus.FindEventName("Grandchild", nil, &abxbus.FindOptions{Past: true, Future: false, ChildOf: parent}) if err != nil { t.Fatal(err) } @@ -308,7 +356,7 @@ func TestFindCanSeeInProgressEventInHistory(t *testing.T) { t.Fatal("timed out waiting for slow handler start") } - match, err := bus.Find("SlowFindEvent", nil, &abxbus.FindOptions{Past: true, Future: false}) + match, err := bus.FindEventName("SlowFindEvent", nil, &abxbus.FindOptions{Past: true, Future: false}) if err != nil { close(release) t.Fatal(err) @@ -347,7 +395,7 @@ func TestFindFutureIgnoresAlreadyDispatchedInFlightEventsWhenPastFalse(t *testin t.Fatal("timed out waiting for in-flight event") } - match, err := bus.Find("FutureInflightEvent", nil, &abxbus.FindOptions{Past: false, Future: 0.03}) + match, err := bus.FindEventName("FutureInflightEvent", nil, &abxbus.FindOptions{Past: false, Future: 0.03}) close(release) if err != nil { t.Fatal(err) @@ -379,7 +427,7 @@ func TestFindFutureResolvesOnDispatchBeforeHandlersComplete(t *testing.T) { time.Sleep(20 * time.Millisecond) bus.Emit(abxbus.NewBaseEvent("DispatchVisibleEvent", nil)) }() - match, err := bus.Find("DispatchVisibleEvent", nil, &abxbus.FindOptions{Past: false, Future: 1.0}) + match, err := bus.FindEventName("DispatchVisibleEvent", nil, &abxbus.FindOptions{Past: false, Future: 1.0}) if err != nil { close(release) t.Fatal(err) @@ -412,7 +460,7 @@ func TestMultipleConcurrentFutureFindWaitersResolveCorrectEvents(t *testing.T) { errs := make(chan error, 2) go func() { - event, err := bus.Find("ConcurrentFindA", nil, &abxbus.FindOptions{Past: false, Future: 1.0}) + event, err := bus.FindEventName("ConcurrentFindA", nil, &abxbus.FindOptions{Past: false, Future: 1.0}) if err != nil { errs <- err return @@ -420,7 +468,7 @@ func TestMultipleConcurrentFutureFindWaitersResolveCorrectEvents(t *testing.T) { resultA <- event }() go func() { - event, err := bus.Find("ConcurrentFindB", nil, &abxbus.FindOptions{Past: false, Future: 1.0}) + event, err := bus.FindEventName("ConcurrentFindB", nil, &abxbus.FindOptions{Past: false, Future: 1.0}) if err != nil { errs <- err return @@ -475,7 +523,7 @@ func TestMaxHistorySizeZeroDisablesPastSearchButFutureFindStillResolves(t *testi if bus.EventHistory.Size() != 0 { t.Fatalf("zero history should drop completed event, got size=%d", bus.EventHistory.Size()) } - past, err := bus.Find("ZeroHistoryEvent", nil, &abxbus.FindOptions{Past: true, Future: false}) + past, err := bus.FindEventName("ZeroHistoryEvent", nil, &abxbus.FindOptions{Past: true, Future: false}) if err != nil { t.Fatal(err) } @@ -487,7 +535,7 @@ func TestMaxHistorySizeZeroDisablesPastSearchButFutureFindStillResolves(t *testi time.Sleep(20 * time.Millisecond) bus.Emit(abxbus.NewBaseEvent("ZeroHistoryEvent", map[string]any{"value": "future"})) }() - future, err := bus.Find("ZeroHistoryEvent", func(event *abxbus.BaseEvent) bool { + future, err := bus.FindEventName("ZeroHistoryEvent", func(event *abxbus.BaseEvent) bool { return event.Payload["value"] == "future" }, &abxbus.FindOptions{Past: false, Future: 1.0}) if err != nil { @@ -515,12 +563,12 @@ func TestFindReturnsFirstFilterResult(t *testing.T) { t.Fatal(err) } - found, err := bus.Find("ParentEvent", nil, &abxbus.FindOptions{Past: true, Future: false}) + found, err := bus.FindEventName("ParentEvent", nil, &abxbus.FindOptions{Past: true, Future: false}) if err != nil { t.Fatal(err) } limit := 1 - filtered, err := bus.Filter("ParentEvent", nil, &abxbus.FilterOptions{Past: true, Future: false, Limit: &limit}) + filtered, err := bus.FilterEventName("ParentEvent", nil, &abxbus.FilterOptions{Past: true, Future: false, Limit: &limit}) if err != nil { t.Fatal(err) } @@ -543,7 +591,7 @@ func TestFindSupportsPayloadFieldNamedLimitViaEquals(t *testing.T) { t.Fatal(err) } - match, err := bus.Find("LimitFieldEvent", nil, &abxbus.FindOptions{ + match, err := bus.FindEventName("LimitFieldEvent", nil, &abxbus.FindOptions{ Past: true, Future: false, Equals: map[string]any{"limit": 5}, @@ -564,7 +612,7 @@ func TestFilterLimitZeroAndNegativeReturnImmediatelyWithoutFutureWait(t *testing t.Cleanup(bus.Destroy) for _, limit := range []int{0, -1} { start := time.Now() - matches, err := bus.Filter("NeverDispatched", nil, &abxbus.FilterOptions{Past: false, Future: 1.0, Limit: &limit}) + matches, err := bus.FilterEventName("NeverDispatched", nil, &abxbus.FilterOptions{Past: false, Future: 1.0, Limit: &limit}) if err != nil { t.Fatal(err) } @@ -581,7 +629,7 @@ func TestFilterFutureOnlyTimesOutToEmptyList(t *testing.T) { bus := abxbus.NewEventBus("FilterFutureTimeoutBus", nil) t.Cleanup(bus.Destroy) start := time.Now() - matches, err := bus.Filter("MissingFutureFilterEvent", nil, &abxbus.FilterOptions{Past: false, Future: 0.03}) + matches, err := bus.FilterEventName("MissingFutureFilterEvent", nil, &abxbus.FilterOptions{Past: false, Future: 0.03}) if err != nil { t.Fatal(err) } @@ -596,7 +644,7 @@ func TestFilterFutureOnlyTimesOutToEmptyList(t *testing.T) { func TestFilterReturnsEmptyArrayWhenNoMatches(t *testing.T) { bus := abxbus.NewEventBus("FilterEmptyBus", nil) - matches, err := bus.Filter("ParentEvent", nil, &abxbus.FilterOptions{Past: true, Future: false}) + matches, err := bus.FilterEventName("ParentEvent", nil, &abxbus.FilterOptions{Past: true, Future: false}) if err != nil { t.Fatal(err) } @@ -617,7 +665,7 @@ func TestFilterReturnsPastMatchesNewestFirstAndRespectsLimit(t *testing.T) { } limit := 2 - matches, err := bus.Filter("Work", nil, &abxbus.FilterOptions{Past: true, Future: false, Limit: &limit}) + matches, err := bus.FilterEventName("Work", nil, &abxbus.FilterOptions{Past: true, Future: false, Limit: &limit}) if err != nil { t.Fatal(err) } @@ -637,7 +685,7 @@ func TestFilterRespectsWherePredicateNewestFirst(t *testing.T) { } } - matches, err := bus.Filter("ScreenshotEvent", func(event *abxbus.BaseEvent) bool { + matches, err := bus.FilterEventName("ScreenshotEvent", func(event *abxbus.BaseEvent) bool { return event.Payload["target_id"] == "same" }, &abxbus.FilterOptions{Past: true, Future: false}) if err != nil { @@ -659,7 +707,7 @@ func TestFilterWildcardMatchesAllEventTypesNewestFirst(t *testing.T) { t.Fatal(err) } - matches, err := bus.Filter("*", nil, &abxbus.FilterOptions{Past: true, Future: false}) + matches, err := bus.FilterEventName("*", nil, &abxbus.FilterOptions{Past: true, Future: false}) if err != nil { t.Fatal(err) } @@ -680,7 +728,7 @@ func TestFilterPastWindowFiltersByAge(t *testing.T) { t.Fatal(err) } - matches, err := bus.Filter("ParentEvent", nil, &abxbus.FilterOptions{Past: 0.1, Future: false}) + matches, err := bus.FilterEventName("ParentEvent", nil, &abxbus.FilterOptions{Past: 0.1, Future: false}) if err != nil { t.Fatal(err) } @@ -700,7 +748,7 @@ func TestFilterFutureAppendsMatchAfterPastResults(t *testing.T) { time.Sleep(20 * time.Millisecond) bus.Emit(abxbus.NewBaseEvent("ParentEvent", nil)) }() - matches, err := bus.Filter("ParentEvent", nil, &abxbus.FilterOptions{Past: true, Future: 0.5}) + matches, err := bus.FilterEventName("ParentEvent", nil, &abxbus.FilterOptions{Past: true, Future: 0.5}) if err != nil { t.Fatal(err) } @@ -726,7 +774,7 @@ func TestFilterSupportsWhereEqualsWildcardChildAndFuture(t *testing.T) { } bus.Emit(abxbus.NewBaseEvent("Other", map[string]any{"kind": "target"})) - childMatches, err := bus.Filter("*", func(event *abxbus.BaseEvent) bool { + childMatches, err := bus.FilterEventName("*", func(event *abxbus.BaseEvent) bool { return event.Payload["kind"] == "target" }, &abxbus.FilterOptions{Past: true, Future: false, ChildOf: parent, Equals: map[string]any{"kind": "target"}}) if err != nil { @@ -740,7 +788,7 @@ func TestFilterSupportsWhereEqualsWildcardChildAndFuture(t *testing.T) { time.Sleep(20 * time.Millisecond) bus.Emit(abxbus.NewBaseEvent("FutureWork", map[string]any{"kind": "future"})) }() - futureMatches, err := bus.Filter("FutureWork", nil, &abxbus.FilterOptions{ + futureMatches, err := bus.FilterEventName("FutureWork", nil, &abxbus.FilterOptions{ Past: false, Future: 1.0, Equals: map[string]any{"kind": "future"}, @@ -752,7 +800,7 @@ func TestFilterSupportsWhereEqualsWildcardChildAndFuture(t *testing.T) { t.Fatalf("expected one future match, got %#v", futureMatches) } - none, err := bus.Filter("Missing", nil, &abxbus.FilterOptions{Past: false, Future: false}) + none, err := bus.FilterEventName("Missing", nil, &abxbus.FilterOptions{Past: false, Future: false}) if err != nil { t.Fatal(err) } @@ -775,7 +823,7 @@ func TestFilterSupportsMetadataEqualityAndFutureLimitShortCircuit(t *testing.T) } } - matches, err := bus.Filter("NumberedEvent", nil, &abxbus.FilterOptions{ + matches, err := bus.FilterEventName("NumberedEvent", nil, &abxbus.FilterOptions{ Past: true, Future: false, Equals: map[string]any{ @@ -792,7 +840,7 @@ func TestFilterSupportsMetadataEqualityAndFutureLimitShortCircuit(t *testing.T) limit := 1 start := time.Now() - limited, err := bus.Filter("NumberedEvent", nil, &abxbus.FilterOptions{Past: true, Future: 2.0, Limit: &limit}) + limited, err := bus.FilterEventName("NumberedEvent", nil, &abxbus.FilterOptions{Past: true, Future: 2.0, Limit: &limit}) if err != nil { t.Fatal(err) } diff --git a/abxbus-go/tests/eventbus_forwarding_test.go b/abxbus-go/tests/eventbus_forwarding_test.go index db2d4ae6..f71931fb 100644 --- a/abxbus-go/tests/eventbus_forwarding_test.go +++ b/abxbus-go/tests/eventbus_forwarding_test.go @@ -33,11 +33,11 @@ func TestEventsForwardBetweenBusesWithoutDuplication(t *testing.T) { seenC = append(seenC, event.EventID) return "c", nil }, nil) - busA.On("*", "forward_to_b", func(event *abxbus.BaseEvent, ctx context.Context) (any, error) { + busA.OnEventName("*", "forward_to_b", func(event *abxbus.BaseEvent, ctx context.Context) (any, error) { busB.Emit(event) return nil, nil }, nil) - busB.On("*", "forward_to_c", func(event *abxbus.BaseEvent, ctx context.Context) (any, error) { + busB.OnEventName("*", "forward_to_c", func(event *abxbus.BaseEvent, ctx context.Context) (any, error) { busC.Emit(event) return nil, nil }, nil) @@ -85,11 +85,11 @@ func TestTreeLevelHierarchyBubbling(t *testing.T) { seenSubchild = append(seenSubchild, event.EventID) return nil, nil }, nil) - childBus.On("*", "forward_to_parent", func(event *abxbus.BaseEvent, ctx context.Context) (any, error) { + childBus.OnEventName("*", "forward_to_parent", func(event *abxbus.BaseEvent, ctx context.Context) (any, error) { parentBus.Emit(event) return nil, nil }, nil) - subchildBus.On("*", "forward_to_child", func(event *abxbus.BaseEvent, ctx context.Context) (any, error) { + subchildBus.OnEventName("*", "forward_to_child", func(event *abxbus.BaseEvent, ctx context.Context) (any, error) { childBus.Emit(event) return nil, nil }, nil) @@ -142,7 +142,7 @@ func TestForwardingDisambiguatesBusesThatShareTheSameName(t *testing.T) { seenB = append(seenB, event.EventID) return "b", nil }, nil) - busA.On("*", "forward_to_b", func(event *abxbus.BaseEvent, ctx context.Context) (any, error) { + busA.OnEventName("*", "forward_to_b", func(event *abxbus.BaseEvent, ctx context.Context) (any, error) { busB.Emit(event) return nil, nil }, nil) @@ -196,11 +196,11 @@ func TestAwaitEventNowWaitsForHandlersOnForwardedBuses(t *testing.T) { record("C") return "c", nil }, nil) - busA.On("*", "forward_to_b", func(event *abxbus.BaseEvent, ctx context.Context) (any, error) { + busA.OnEventName("*", "forward_to_b", func(event *abxbus.BaseEvent, ctx context.Context) (any, error) { busB.Emit(event) return nil, nil }, nil) - busB.On("*", "forward_to_c", func(event *abxbus.BaseEvent, ctx context.Context) (any, error) { + busB.OnEventName("*", "forward_to_c", func(event *abxbus.BaseEvent, ctx context.Context) (any, error) { busC.Emit(event) return nil, nil }, nil) @@ -305,7 +305,7 @@ func TestAwaitEventNowWaitsWhenForwardingHandlerIsAsyncDelayed(t *testing.T) { busBDone = true return nil, nil }, nil) - busA.On("*", "delayed_forward_to_b", func(event *abxbus.BaseEvent, ctx context.Context) (any, error) { + busA.OnEventName("*", "delayed_forward_to_b", func(event *abxbus.BaseEvent, ctx context.Context) (any, error) { time.Sleep(30 * time.Millisecond) busB.Emit(event) return nil, nil @@ -340,7 +340,7 @@ func TestForwardingSameEventDoesNotSetSelfParentID(t *testing.T) { target.On("SelfParentForwardEvent", "target_handler", func(event *abxbus.BaseEvent, ctx context.Context) (any, error) { return "target-ok", nil }, nil) - origin.On("*", "forward_to_target", func(event *abxbus.BaseEvent, ctx context.Context) (any, error) { + origin.OnEventName("*", "forward_to_target", func(event *abxbus.BaseEvent, ctx context.Context) (any, error) { target.Emit(event) return nil, nil }, nil) @@ -546,7 +546,7 @@ func TestForwardedFirstModeUsesProcessingBusHandlerConcurrencyDefaults(t *testin log = append(log, v) } - busA.On("*", "forward_to_b", func(event *abxbus.BaseEvent, ctx context.Context) (any, error) { + busA.OnEventName("*", "forward_to_b", func(event *abxbus.BaseEvent, ctx context.Context) (any, error) { busB.Emit(event) return nil, nil }, nil) @@ -684,15 +684,15 @@ func registerCycle(t *testing.T, peer1, peer2, peer3 *abxbus.EventBus) (*[]strin seen3 = append(seen3, event.EventID) return "p3", nil }, nil) - peer1.On("*", "forward_to_2", func(event *abxbus.BaseEvent, ctx context.Context) (any, error) { + peer1.OnEventName("*", "forward_to_2", func(event *abxbus.BaseEvent, ctx context.Context) (any, error) { peer2.Emit(event) return nil, nil }, nil) - peer2.On("*", "forward_to_3", func(event *abxbus.BaseEvent, ctx context.Context) (any, error) { + peer2.OnEventName("*", "forward_to_3", func(event *abxbus.BaseEvent, ctx context.Context) (any, error) { peer3.Emit(event) return nil, nil }, nil) - peer3.On("*", "forward_to_1", func(event *abxbus.BaseEvent, ctx context.Context) (any, error) { + peer3.OnEventName("*", "forward_to_1", func(event *abxbus.BaseEvent, ctx context.Context) (any, error) { peer1.Emit(event) return nil, nil }, nil) diff --git a/abxbus-go/tests/eventbus_on_off_test.go b/abxbus-go/tests/eventbus_on_off_test.go index ed47f29b..da1d45cd 100644 --- a/abxbus-go/tests/eventbus_on_off_test.go +++ b/abxbus-go/tests/eventbus_on_off_test.go @@ -21,7 +21,7 @@ func TestOnOffByEntryByIDAndRemoveAll(t *testing.T) { eventCalls.Add(1) return "h2", nil }, nil) - bus.On("*", "all", func(e *abxbus.BaseEvent, ctx context.Context) (any, error) { + bus.OnEventName("*", "all", func(e *abxbus.BaseEvent, ctx context.Context) (any, error) { wildcardCalls.Add(1) return "all", nil }, nil) diff --git a/abxbus-go/tests/eventbus_test.go b/abxbus-go/tests/eventbus_test.go index 4f23796c..00ba9ab9 100644 --- a/abxbus-go/tests/eventbus_test.go +++ b/abxbus-go/tests/eventbus_test.go @@ -1153,7 +1153,7 @@ func TestEventBusMiddlewareHooksCoverStringAndWildcardPatterns(t *testing.T) { stringHandler := bus.On("MiddlewarePatternEvent", "string", func(event *abxbus.BaseEvent, ctx context.Context) (any, error) { return "string:" + event.EventType, nil }, nil) - wildcardHandler := bus.On("*", "wildcard", func(event *abxbus.BaseEvent, ctx context.Context) (any, error) { + wildcardHandler := bus.OnEventName("*", "wildcard", func(event *abxbus.BaseEvent, ctx context.Context) (any, error) { return "wildcard:" + event.EventType, nil }, nil) @@ -1283,7 +1283,7 @@ func TestDestroyClearFalsePreservesHandlersAndHistoryResolvesWaitersAndIsTermina bus.DestroyWithOptions(&abxbus.EventBusDestroyOptions{Clear: false}) close(destroyed) }() - match, err := bus.Find("NeverHappens", nil, &abxbus.FindOptions{Past: false, Future: true}) + match, err := bus.FindEventName("NeverHappens", nil, &abxbus.FindOptions{Past: false, Future: true}) if err != nil { t.Fatal(err) } @@ -1336,7 +1336,11 @@ func TestDestroyClearFalsePreservesHandlersAndHistoryResolvesWaitersAndIsTermina return nil, nil }, nil) }) - if _, err := bus.Find("Evt", nil, nil); !errors.Is(err, abxbus.ErrEventBusDestroyed) { + assertDestroyedPanic("On", func() { + type ClearFalseDestroyedEvent struct{} + bus.On(func(payload ClearFalseDestroyedEvent) {}) + }) + if _, err := bus.FindEventName("Evt", nil, nil); !errors.Is(err, abxbus.ErrEventBusDestroyed) { t.Fatalf("Find should reject with ErrEventBusDestroyed, got %v", err) } } @@ -1483,11 +1487,15 @@ func TestDestroyDefaultClearIsTerminalAndFreesBusState(t *testing.T) { assertDestroyedPanic("On", func() { bus.On("Evt", "new", func(e *abxbus.BaseEvent, ctx context.Context) (any, error) { return nil, nil }, nil) }) + assertDestroyedPanic("On", func() { + type TerminalDestroyedEvent struct{} + bus.On(func(payload TerminalDestroyedEvent) {}) + }) - if _, err := bus.Find("Evt", nil, nil); !errors.Is(err, abxbus.ErrEventBusDestroyed) { + if _, err := bus.FindEventName("Evt", nil, nil); !errors.Is(err, abxbus.ErrEventBusDestroyed) { t.Fatalf("Find should reject with ErrEventBusDestroyed, got %v", err) } - if _, err := bus.Filter("Evt", nil, nil); !errors.Is(err, abxbus.ErrEventBusDestroyed) { + if _, err := bus.FilterEventName("Evt", nil, nil); !errors.Is(err, abxbus.ErrEventBusDestroyed) { t.Fatalf("Filter should reject with ErrEventBusDestroyed, got %v", err) } } @@ -1503,7 +1511,7 @@ func TestDestroyingOneBusDoesNotBreakSharedHandlersOrForwardTargets(t *testing.T return "shared", nil } source.On("SharedDestroyEvent", "shared_source", shared, nil) - source.On("*", "forward", func(event *abxbus.BaseEvent, ctx context.Context) (any, error) { + source.OnEventName("*", "forward", func(event *abxbus.BaseEvent, ctx context.Context) (any, error) { return target.Emit(event), nil }, nil) target.On("SharedDestroyEvent", "shared_target", shared, nil) diff --git a/abxbus-go/tests/eventbus_timeout_test.go b/abxbus-go/tests/eventbus_timeout_test.go index 1463a848..4fd2b763 100644 --- a/abxbus-go/tests/eventbus_timeout_test.go +++ b/abxbus-go/tests/eventbus_timeout_test.go @@ -249,7 +249,7 @@ func TestHandlerTimeoutIgnoresLateHandlerResultAndLateEmits(t *testing.T) { if slowResult.Result != nil { t.Fatalf("slow handler late result should be ignored, got %#v", slowResult.Result) } - found, err := bus.Find("LateAfterTimeoutEvent", nil, &abxbus.FindOptions{Past: true, Future: false}) + found, err := bus.FindEventName("LateAfterTimeoutEvent", nil, &abxbus.FindOptions{Past: true, Future: false}) if err != nil { t.Fatal(err) } @@ -302,7 +302,7 @@ func TestEventTimeoutIgnoresLateHandlerResultAndLateEmits(t *testing.T) { if slowResult.Result != nil { t.Fatalf("slow handler late result should be ignored, got %#v", slowResult.Result) } - found, err := bus.Find("LateAfterEventTimeoutEvent", nil, &abxbus.FindOptions{Past: true, Future: false}) + found, err := bus.FindEventName("LateAfterEventTimeoutEvent", nil, &abxbus.FindOptions{Past: true, Future: false}) if err != nil { t.Fatal(err) } @@ -636,7 +636,7 @@ func TestForwardedTimeoutPathDoesNotStallFollowupEvents(t *testing.T) { busATailRuns++ return "tail_a", nil }, nil) - busA.On("*", "forward_to_b", func(event *abxbus.BaseEvent, ctx context.Context) (any, error) { + busA.OnEventName("*", "forward_to_b", func(event *abxbus.BaseEvent, ctx context.Context) (any, error) { return busB.Emit(event), nil }, nil) busB.On("TimeoutRecoveryChildEvent", "slow_child", func(event *abxbus.BaseEvent, ctx context.Context) (any, error) { diff --git a/abxbus-go/tests/roundtrip_cli/main.go b/abxbus-go/tests/roundtrip_cli/main.go index 854477de..a3e6b6e6 100644 --- a/abxbus-go/tests/roundtrip_cli/main.go +++ b/abxbus-go/tests/roundtrip_cli/main.go @@ -104,7 +104,7 @@ func runJSONLListener(configPath string) { received := make(chan struct{}) receivedOnce := sync.Once{} - bridge.On("*", "capture", func(event *abxbus.BaseEvent, ctx context.Context) (any, error) { + bridge.OnEventName("*", "capture", func(event *abxbus.BaseEvent, ctx context.Context) (any, error) { data, err := json.Marshal(event) if err != nil { return nil, err diff --git a/abxbus-go/typed.go b/abxbus-go/typed.go index 955d5539..e4dd9495 100644 --- a/abxbus-go/typed.go +++ b/abxbus-go/typed.go @@ -1,14 +1,11 @@ package abxbus import ( - "context" "encoding/json" "fmt" "reflect" "regexp" "strings" - - "github.com/ArchiveBox/abxbus/abxbus-go/v2/jsonschema" ) func Event[T any](payload T) (*BaseEvent, error) { @@ -166,7 +163,7 @@ func normalizeReflectValue(value reflect.Value) any { return value.Interface() } -func NewTypedEvent[T any](eventType string, payload T) (*BaseEvent, error) { +func newEventFromPayload[T any](eventType string, payload T) (*BaseEvent, error) { normalized := map[string]any{} data, err := json.Marshal(payload) if err != nil { @@ -180,155 +177,35 @@ func NewTypedEvent[T any](eventType string, payload T) (*BaseEvent, error) { return NewBaseEvent(eventType, normalized), nil } -func NewTypedEventWithResult[TPayload any, TResult any](eventType string, payload TPayload) (*BaseEvent, error) { - event, err := NewTypedEvent(eventType, payload) - if err != nil { - return nil, err +type EventOption func(*BaseEvent) + +func ResultType[T any]() EventOption { + return func(event *BaseEvent) { + event.EventResultType = JSONSchemaFor[T]() } - event.EventResultType = JSONSchemaFor[TResult]() - return event, nil } -func MustNewTypedEvent[T any](eventType string, payload T) *BaseEvent { - event, err := NewTypedEvent(eventType, payload) +func NewEvent[T any](eventType string, payload T, options ...EventOption) (*BaseEvent, error) { + event, err := newEventFromPayload(eventType, payload) if err != nil { - panic(err) + return nil, err } - return event + for _, option := range options { + if option != nil { + option(event) + } + } + return event, nil } -func MustNewTypedEventWithResult[TPayload any, TResult any](eventType string, payload TPayload) *BaseEvent { - event, err := NewTypedEventWithResult[TPayload, TResult](eventType, payload) +func MustNewEvent[T any](eventType string, payload T, options ...EventOption) *BaseEvent { + event, err := NewEvent(eventType, payload, options...) if err != nil { panic(err) } return event } -type TypedEventHandler[TPayload any, TResult any] interface { - ~func(TPayload) (TResult, error) | - ~func(TPayload, context.Context) (TResult, error) | - ~func(TPayload) TResult | - ~func(TPayload, context.Context) TResult | - ~func(TPayload) error | - ~func(TPayload, context.Context) error | - ~func(TPayload) | - ~func(TPayload, context.Context) -} - -func OnTyped[TPayload any, TResult any, THandler TypedEventHandler[TPayload, TResult]]( - bus *EventBus, - eventPattern string, - handlerName string, - handler THandler, - options *EventHandler, -) *EventHandler { - payloadSchema := JSONSchemaFor[TPayload]() - return bus.On(eventPattern, handlerName, func(event *BaseEvent, ctx context.Context) (any, error) { - if err := jsonschema.Validate(payloadSchema, event.Payload); err != nil { - var zero TResult - return zero, fmt.Errorf("EventHandlerPayloadSchemaError: Event payload did not match declared handler payload type: %w", err) - } - payload, err := EventPayloadAs[TPayload](event) - if err != nil { - var zero TResult - return zero, err - } - switch typed := any(handler).(type) { - case func(TPayload) (TResult, error): - return typed(payload) - case func(TPayload, context.Context) (TResult, error): - return typed(payload, ctx) - case func(TPayload) TResult: - return typed(payload), nil - case func(TPayload, context.Context) TResult: - return typed(payload, ctx), nil - case func(TPayload) error: - var zero TResult - return zero, typed(payload) - case func(TPayload, context.Context) error: - var zero TResult - return zero, typed(payload, ctx) - case func(TPayload): - typed(payload) - var zero TResult - return zero, nil - case func(TPayload, context.Context): - typed(payload, ctx) - var zero TResult - return zero, nil - default: - value := reflect.ValueOf(handler) - var zero TResult - if !value.IsValid() || value.Kind() != reflect.Func || value.IsNil() { - return zero, fmt.Errorf("unsupported typed handler signature: %T", handler) - } - handlerType := value.Type() - if handlerType.NumIn() != 1 && handlerType.NumIn() != 2 { - return zero, fmt.Errorf("unsupported typed handler signature: %T", handler) - } - payloadValue, ok := reflectValueForTypedPayload(payload, handlerType.In(0)) - if !ok { - return zero, fmt.Errorf("unsupported typed handler signature: %T", handler) - } - withContext := handlerType.NumIn() == 2 - if withContext && handlerType.In(1) != contextInterfaceType { - return zero, fmt.Errorf("unsupported typed handler signature: %T", handler) - } - if handlerType.NumOut() > 2 { - return zero, fmt.Errorf("unsupported typed handler signature: %T", handler) - } - args := []reflect.Value{payloadValue} - if withContext { - args = append(args, reflect.ValueOf(ctx)) - } - results := value.Call(args) - switch len(results) { - case 0: - return zero, nil - case 1: - if results[0].Type().Implements(errorInterfaceType) { - if reflectValueIsNil(results[0]) { - return zero, nil - } - return zero, results[0].Interface().(error) - } - typedResult, ok := results[0].Interface().(TResult) - if !ok { - return zero, fmt.Errorf("unsupported typed handler signature: %T", handler) - } - return typedResult, nil - default: - if !results[1].Type().Implements(errorInterfaceType) { - return zero, fmt.Errorf("unsupported typed handler signature: %T", handler) - } - var err error - if !reflectValueIsNil(results[1]) { - err = results[1].Interface().(error) - } - typedResult, ok := results[0].Interface().(TResult) - if !ok { - return zero, fmt.Errorf("unsupported typed handler signature: %T", handler) - } - return typedResult, err - } - } - }, options) -} - -func reflectValueForTypedPayload(payload any, targetType reflect.Type) (reflect.Value, bool) { - payloadValue := reflect.ValueOf(payload) - if payloadValue.IsValid() { - return payloadValue, payloadValue.Type().AssignableTo(targetType) - } - switch targetType.Kind() { - case reflect.Chan, reflect.Func, reflect.Interface, reflect.Map, reflect.Pointer, reflect.Slice: - return reflect.Zero(targetType), true - default: - return reflect.Value{}, false - } -} - func EventPayloadAs[T any](event *BaseEvent) (T, error) { var payload T if event == nil { diff --git a/abxbus-go/version.go b/abxbus-go/version.go index e99ade2e..f815a07e 100644 --- a/abxbus-go/version.go +++ b/abxbus-go/version.go @@ -1,3 +1,3 @@ package abxbus -const Version = "2.5.0" +const Version = "2.5.2" diff --git a/abxbus-rust/Cargo.lock b/abxbus-rust/Cargo.lock index 9a5f1edd..4721daf1 100644 --- a/abxbus-rust/Cargo.lock +++ b/abxbus-rust/Cargo.lock @@ -4,7 +4,7 @@ version = 4 [[package]] name = "abxbus-rust" -version = "2.5.0" +version = "2.5.2" dependencies = [ "chrono", "dcontext", diff --git a/abxbus-rust/Cargo.toml b/abxbus-rust/Cargo.toml index 727de697..73714d5f 100644 --- a/abxbus-rust/Cargo.toml +++ b/abxbus-rust/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "abxbus-rust" -version = "2.5.0" +version = "2.5.2" edition = "2021" license = "MIT" description = "Rust implementation of the abxbus Event Bus library" diff --git a/abxbus-ts/package.json b/abxbus-ts/package.json index b6ddd328..73a065e8 100644 --- a/abxbus-ts/package.json +++ b/abxbus-ts/package.json @@ -1,6 +1,6 @@ { "name": "abxbus", - "version": "2.5.0", + "version": "2.5.2", "description": "Event bus library for browsers and ESM Node.js", "type": "module", "sideEffects": false, diff --git a/abxbus-ts/src/BaseEvent.ts b/abxbus-ts/src/BaseEvent.ts index 9aafbf86..a7b52977 100644 --- a/abxbus-ts/src/BaseEvent.ts +++ b/abxbus-ts/src/BaseEvent.ts @@ -161,6 +161,7 @@ type ResultTypeFromEventResultTypeInput = TInput extends z.ZodTypeAny : unknown type ResultSchemaFromShape = TShape extends { event_result_type: infer S } ? ResultTypeFromEventResultTypeInput : unknown +type ResultSchemaFromEventSchema = TSchema extends z.ZodObject ? ResultSchemaFromShape : unknown export type EventResultInclude = ( result: EventResult['result'], event_result: EventResult @@ -465,7 +466,10 @@ export class BaseEvent { // main entry point for users to define their own event types // BaseEvent.extend("MyEvent", { some_custom_field: z.string(), event_result_type: z.string(), event_timeout: 25, ... }) -> MyEvent - static extend>(event_type: string, event_schema: TSchema): SchemaEventFactory + static extend>( + event_type: string, + event_schema: TSchema + ): SchemaEventFactory> static extend(event_type: string, shape?: TShape): EventFactory> static extend>( event_type: string, diff --git a/abxbus-ts/src/type_inference.test.ts b/abxbus-ts/src/type_inference.test.ts index 6b52a15d..dd02cb16 100644 --- a/abxbus-ts/src/type_inference.test.ts +++ b/abxbus-ts/src/type_inference.test.ts @@ -17,8 +17,18 @@ const InferableResultEvent = BaseEvent.extend('InferableResultEvent', { event_result_type: z.object({ ok: z.boolean() }), }) +const InferableZodObjectResultEvent = BaseEvent.extend( + 'InferableZodObjectResultEvent', + z.object({ + target_id: z.string(), + event_result_type: z.object({ ok: z.boolean() }), + }) +) + type InferableResult = EventResultType> type _assert_inferable_result = Assert> +type InferableZodObjectResult = EventResultType> +type _assert_inferable_zod_object_result = Assert> type InferableEventResultEntry = InstanceType['event_results'] extends Map ? TResultEntry : never type _assert_inferable_event_result_entry = Assert< @@ -96,6 +106,14 @@ bus.on(InferableResultEvent, () => undefined) // @ts-expect-error non-void return must match event_result_type for inferable event keys bus.on(InferableResultEvent, () => 'not-ok') +bus.on(InferableZodObjectResultEvent, (event) => { + const target: string = event.target_id + return { ok: target.length > 0 } +}) + +// @ts-expect-error z.object event_result_type must also enforce handler return shape +bus.on(InferableZodObjectResultEvent, () => 'not-ok') + // String/wildcard keys remain best-effort and do not strongly enforce return shapes. bus.on('InferableResultEvent', () => 'anything') bus.on('*', () => 123) diff --git a/docs/api/baseevent.mdx b/docs/api/baseevent.mdx index e45e5dbb..acdcddea 100644 --- a/docs/api/baseevent.mdx +++ b/docs/api/baseevent.mdx @@ -40,15 +40,13 @@ const FooCreateEvent = BaseEvent.extend('FooCreateEvent', { ```go -type FooPayload struct { +type FooCreateEvent struct { ID *string `json:"id,omitempty"` Name string `json:"name"` Age int `json:"age"` } -type FooResult string - -event, err := abxbus.NewBaseEventWithResult[FooPayload, FooResult]("FooCreateEvent", FooPayload{ +event, err := abxbus.Event(FooCreateEvent{ Name: "Ada", Age: 37, }) @@ -149,7 +147,7 @@ const value = await completed.eventResult() ```go type MyEvent struct{} -pending := bus.Emit(abxbus.MustNewTypedEvent[MyEvent]("MyEvent", MyEvent{})) +pending := bus.Emit(MyEvent{}) completed, err := pending.Now() if err != nil { return err diff --git a/docs/api/eventbus.mdx b/docs/api/eventbus.mdx index b5413c47..33de2688 100644 --- a/docs/api/eventbus.mdx +++ b/docs/api/eventbus.mdx @@ -155,7 +155,7 @@ bus.on('*', wildcardHandler) ```go bus.On("UserEvent", "handler", handler, nil) -bus.On("*", "wildcard_handler", wildcardHandler, nil) +bus.OnEventName("*", "wildcard_handler", wildcardHandler, nil) timeout := 5.0 bus.On("UserEvent", "custom", handler, &abxbus.EventHandler{ HandlerTimeout: &timeout, @@ -266,7 +266,11 @@ const result = await event.now({ first_result: true }).eventResult() ```go -event := bus.Emit(abxbus.NewBaseEvent("MyEvent", map[string]any{"data": "x"})) +type MyEvent struct { + Data string `json:"data"` +} + +event := bus.Emit(MyEvent{Data: "x"}) result, err := event.EventResult() if err != nil { return err @@ -321,12 +325,12 @@ const child = await bus.find(ChildEvent, { child_of: parentEvent, future: 5 }) ```go -event, err := bus.Find("ResponseEvent", nil, nil) -future, err := bus.Find("ResponseEvent", nil, &abxbus.FindOptions{ +event, err := bus.FindEventName("ResponseEvent", nil, nil) +future, err := bus.FindEventName("ResponseEvent", nil, &abxbus.FindOptions{ Past: false, Future: 5.0, }) -child, err := bus.Find("ChildEvent", nil, &abxbus.FindOptions{ +child, err := bus.FindEventName("ChildEvent", nil, &abxbus.FindOptions{ ChildOf: parentEvent, Future: 5.0, }) @@ -379,13 +383,13 @@ const recent = await bus.filter(ResponseEvent, { past: 10, future: false, limit: ```go limit := 5 -recent, err := bus.Filter("ResponseEvent", nil, &abxbus.FilterOptions{ +recent, err := bus.FilterEventName("ResponseEvent", nil, &abxbus.FilterOptions{ Past: 10.0, Future: false, Limit: &limit, }) -recentWithPredicate, err := bus.Filter( +recentWithPredicate, err := bus.FilterEventName( "ResponseEvent", func(event *abxbus.BaseEvent) bool { return event.Payload["ok"] == true @@ -597,7 +601,10 @@ await bus.destroy({ clear: true }) ```go type ResumeTickEvent struct{} -resumeTick := abxbus.MustNewTypedEvent[ResumeTickEvent]("ResumeTickEvent", ResumeTickEvent{}) +resumeTick, err := abxbus.Event(ResumeTickEvent{}) +if err != nil { + panic(err) +} // 1) Serialize full bus state payload, err := bus.ToJSON() diff --git a/docs/api/eventhandler.mdx b/docs/api/eventhandler.mdx index 2aab8a37..8903aab7 100644 --- a/docs/api/eventhandler.mdx +++ b/docs/api/eventhandler.mdx @@ -40,7 +40,10 @@ bus.off(MyEvent, entry) ```go -entry := bus.On("MyEvent", "handler", handler, nil) +type MyEvent struct{} + +handler := func(event MyEvent) {} +entry := bus.On(handler) bus.Off("MyEvent", entry) ``` diff --git a/docs/features/event-debouncing.mdx b/docs/features/event-debouncing.mdx index 7fed4238..eb7a627f 100644 --- a/docs/features/event-debouncing.mdx +++ b/docs/features/event-debouncing.mdx @@ -101,7 +101,7 @@ let result = block_on(event.event_result())?; ```go -existing, err := bus.Find("ScreenshotEvent", func(event *abxbus.BaseEvent) bool { +existing, err := bus.FindEventName("ScreenshotEvent", func(event *abxbus.BaseEvent) bool { return event.Payload["url"] == url }, &abxbus.FindOptions{ Past: 10.0, // look back 10s @@ -186,7 +186,7 @@ let result = block_on(event.event_result())?; ```go -inFlight, err := bus.Find("ScreenshotEvent", func(event *abxbus.BaseEvent) bool { +inFlight, err := bus.FindEventName("ScreenshotEvent", func(event *abxbus.BaseEvent) bool { return event.Payload["url"] == url }, &abxbus.FindOptions{ Past: false, // skip history @@ -285,12 +285,12 @@ sameURL := func(event *abxbus.BaseEvent) bool { return event.Payload["url"] == url } -event, err := bus.Find("ScreenshotEvent", sameURL, &abxbus.FindOptions{Past: 10.0, Future: false}) +event, err := bus.FindEventName("ScreenshotEvent", sameURL, &abxbus.FindOptions{Past: 10.0, Future: false}) if err != nil { panic(err) } if event == nil { - event, err = bus.Find("ScreenshotEvent", sameURL, &abxbus.FindOptions{Past: false, Future: 2.0}) + event, err = bus.FindEventName("ScreenshotEvent", sameURL, &abxbus.FindOptions{Past: false, Future: 2.0}) if err != nil { panic(err) } @@ -385,12 +385,12 @@ func emitDebouncedScreenshot(ctx context.Context, bus *abxbus.EventBus, url stri return event.Payload["url"] == url } - event, err := bus.Find("ScreenshotEvent", sameURL, &abxbus.FindOptions{Past: 15.0, Future: false}) + event, err := bus.FindEventName("ScreenshotEvent", sameURL, &abxbus.FindOptions{Past: 15.0, Future: false}) if err != nil { return nil, err } if event == nil { - event, err = bus.Find("ScreenshotEvent", sameURL, &abxbus.FindOptions{Past: false, Future: 3.0}) + event, err = bus.FindEventName("ScreenshotEvent", sameURL, &abxbus.FindOptions{Past: false, Future: 3.0}) if err != nil { return nil, err } diff --git a/docs/features/event-pattern-matching.mdx b/docs/features/event-pattern-matching.mdx index c2bab860..af77031d 100644 --- a/docs/features/event-pattern-matching.mdx +++ b/docs/features/event-pattern-matching.mdx @@ -14,7 +14,7 @@ Both accept the same pattern forms: - string event type name - `'*'` wildcard (match everything) -Go and Rust use string event type names for low-level registration/lookup, plus typed helpers (`OnTyped[...]` in Go, `EventSpec`/`BaseEvent` in Rust) when you want static payload/result typing. +Go and Rust use string event type names for low-level registration/lookup, plus typed payload/event APIs (`bus.On(...)` in Go, `EventSpec`/`BaseEvent` in Rust) when you want payload/result validation. ## Supported pattern forms @@ -154,22 +154,16 @@ import ( abxbus "github.com/ArchiveBox/abxbus/abxbus-go/v2" ) -type UserActionPayload struct { +type UserActionEvent struct { Action string `json:"action"` } func main() { bus := abxbus.NewEventBus("AppBus", nil) - abxbus.OnTyped[UserActionPayload, string]( - bus, - "UserActionEvent", - "on", - func(payload UserActionPayload) (string, error) { - return fmt.Sprintf("action:%s", payload.Action), nil - }, - nil, - ) + bus.On(func(event UserActionEvent) (string, error) { + return fmt.Sprintf("action:%s", event.Action), nil + }) bus.On("UserActionEvent", "on_by_name", func(event *abxbus.BaseEvent) (any, error) { fmt.Println("by-name", event.EventType, event.Payload["action"]) @@ -177,29 +171,22 @@ func main() { return nil, nil }, nil) - bus.On("*", "on_any", func(event *abxbus.BaseEvent) (any, error) { + bus.OnEventName("*", "on_any", func(event *abxbus.BaseEvent) (any, error) { fmt.Println("wildcard", event.EventType) // wildcard UserActionEvent return nil, nil }, nil) - event, err := abxbus.NewBaseEventWithResult[UserActionPayload, string]( - "UserActionEvent", - UserActionPayload{Action: "click"}, - ) - if err != nil { - panic(err) - } - _, err = bus.Emit(event).EventResult() + _, err := bus.Emit(UserActionEvent{Action: "click"}).EventResult() if err != nil { panic(err) } - namedMatch, err := bus.Find("UserActionEvent", nil, nil) + namedMatch, err := bus.FindEventName("UserActionEvent", nil, nil) if err != nil { panic(err) } - wildcardMatch, err := bus.Find("*", nil, &abxbus.FindOptions{Future: 5.0}) + wildcardMatch, err := bus.FindEventName("*", nil, &abxbus.FindOptions{Future: 5.0}) if err != nil { panic(err) } @@ -223,6 +210,6 @@ String keys and `'*'` are intentionally looser: - Python: treat as `BaseEvent[Any]` - TypeScript: typed as base `BaseEvent`/unknown-oriented handler return checks - Rust: typed handlers use `bus.on(MyEvent, ...)`; raw string/wildcard registration is reserved for low-level forwarding internals. -- Go: typed handlers use `OnTyped[...]`; string/wildcard handlers receive `*BaseEvent` +- Go: typed handlers use `bus.On(...)`; string/wildcard handlers use `bus.OnEventName(...)` and receive `*BaseEvent` Use string/wildcard patterns when you need dynamic behavior. Use classes whenever you want strict payload/result type hints through handlers and lookups. diff --git a/docs/features/find-events.mdx b/docs/features/find-events.mdx index 1e51e617..567677ed 100644 --- a/docs/features/find-events.mdx +++ b/docs/features/find-events.mdx @@ -41,7 +41,7 @@ await bus.find(event_pattern, where, options?) ```go -bus.Find( +bus.FindEventName( "ResponseEvent", // event type string or "*" func(event *abxbus.BaseEvent) bool { // optional predicate return true @@ -130,7 +130,7 @@ const existing = await bus.find(ResponseEvent) ```go -existing, err := bus.Find("ResponseEvent", nil, nil) +existing, err := bus.FindEventName("ResponseEvent", nil, nil) ``` @@ -163,7 +163,7 @@ const future = await bus.find(ResponseEvent, { past: false, future: 5 }) ```go -future, err := bus.Find("ResponseEvent", nil, &abxbus.FindOptions{ +future, err := bus.FindEventName("ResponseEvent", nil, &abxbus.FindOptions{ Past: false, Future: 5.0, }) @@ -199,7 +199,7 @@ const match = await bus.find(ResponseEvent, { past: 5, future: 5 }) ```go -match, err := bus.Find("ResponseEvent", nil, &abxbus.FindOptions{ +match, err := bus.FindEventName("ResponseEvent", nil, &abxbus.FindOptions{ Past: 5.0, Future: 5.0, }) @@ -251,7 +251,7 @@ const match = await bus.find( ```go -match, err := bus.Find( +match, err := bus.FindEventName( "ResponseEvent", func(event *abxbus.BaseEvent) bool { return event.Payload["request_id"] == myID @@ -312,7 +312,7 @@ const anyCompleted = await bus.find( ```go -anyCompleted, err := bus.Find( +anyCompleted, err := bus.FindEventName( "*", func(event *abxbus.BaseEvent) bool { return strings.HasSuffix(event.EventType, "ResultEvent") @@ -379,7 +379,7 @@ parentEvent := bus.Emit(abxbus.NewBaseEvent("NavigateToUrlEvent", map[string]any if _, err := parentEvent.Now(); err != nil { panic(err) } -child, err := bus.Find("TabCreatedEvent", nil, &abxbus.FindOptions{ +child, err := bus.FindEventName("TabCreatedEvent", nil, &abxbus.FindOptions{ ChildOf: parentEvent, Past: 5.0, }) @@ -452,12 +452,12 @@ await event.now() ```go -event, err := bus.Find("ScreenshotEvent", nil, &abxbus.FindOptions{Past: 10.0, Future: false}) +event, err := bus.FindEventName("ScreenshotEvent", nil, &abxbus.FindOptions{Past: 10.0, Future: false}) if err != nil { panic(err) } if event == nil { - event, err = bus.Find("ScreenshotEvent", nil, &abxbus.FindOptions{Past: false, Future: 5.0}) + event, err = bus.FindEventName("ScreenshotEvent", nil, &abxbus.FindOptions{Past: false, Future: 5.0}) if err != nil { panic(err) } @@ -540,7 +540,7 @@ const recent = await bus.filter(ResponseEvent, { past: 10, future: false, limit: ```go limit := 5 -recent, err := bus.Filter("ResponseEvent", nil, &abxbus.FilterOptions{ +recent, err := bus.FilterEventName("ResponseEvent", nil, &abxbus.FilterOptions{ Past: 10.0, Future: false, Limit: &limit, diff --git a/docs/features/forwarding-between-buses.mdx b/docs/features/forwarding-between-buses.mdx index ecede09b..48efe417 100644 --- a/docs/features/forwarding-between-buses.mdx +++ b/docs/features/forwarding-between-buses.mdx @@ -281,10 +281,10 @@ func main() { return fmt.Sprintf("billing-ok:%s", event.Payload["user_id"]), nil }, nil) - auth.bus.On("*", "forward_to_relay", func(event *abxbus.BaseEvent) (any, error) { + auth.bus.OnEventName("*", "forward_to_relay", func(event *abxbus.BaseEvent) (any, error) { return relay.bus.Emit(event), nil }, nil) - relay.bus.On("*", "forward_to_billing", func(event *abxbus.BaseEvent) (any, error) { + relay.bus.OnEventName("*", "forward_to_billing", func(event *abxbus.BaseEvent) (any, error) { return billing.bus.Emit(event), nil }, nil) @@ -378,12 +378,12 @@ left := abxbus.NewEventBus("LeftBus", nil) right := abxbus.NewEventBus("RightBus", nil) // uni-directional -left.On("*", "forward_to_right", func(event *abxbus.BaseEvent) (any, error) { +left.OnEventName("*", "forward_to_right", func(event *abxbus.BaseEvent) (any, error) { return right.Emit(event), nil }, nil) // bi-directional (add reverse path) -right.On("*", "forward_to_left", func(event *abxbus.BaseEvent) (any, error) { +right.OnEventName("*", "forward_to_left", func(event *abxbus.BaseEvent) (any, error) { return left.Emit(event), nil }, nil) ``` @@ -509,13 +509,13 @@ busB := abxbus.NewEventBus("BusB", nil) busC := abxbus.NewEventBus("BusC", nil) // cycle: A -> B -> C -> A -busA.On("*", "forward_to_b", func(event *abxbus.BaseEvent) (any, error) { +busA.OnEventName("*", "forward_to_b", func(event *abxbus.BaseEvent) (any, error) { return busB.Emit(event), nil }, nil) -busB.On("*", "forward_to_c", func(event *abxbus.BaseEvent) (any, error) { +busB.OnEventName("*", "forward_to_c", func(event *abxbus.BaseEvent) (any, error) { return busC.Emit(event), nil }, nil) -busC.On("*", "forward_to_a", func(event *abxbus.BaseEvent) (any, error) { +busC.OnEventName("*", "forward_to_a", func(event *abxbus.BaseEvent) (any, error) { return busA.Emit(event), nil }, nil) diff --git a/docs/features/parent-child-tracking.mdx b/docs/features/parent-child-tracking.mdx index 2f2b465e..0f4e7279 100644 --- a/docs/features/parent-child-tracking.mdx +++ b/docs/features/parent-child-tracking.mdx @@ -44,9 +44,9 @@ In Rust, the same split is explicit: In Go, the same split is explicit: -- `child := event.Emit(abxbus.NewBaseEvent(...))` creates linked child work. +- `child := event.Emit(ChildEvent{...})` creates linked child work. - `child.Now()` queue-jumps and waits for that linked child. -- `bus.Emit(abxbus.NewBaseEvent(...))` creates detached top-level work. +- `bus.Emit(ParentEvent{...})` creates detached top-level work. - calling `Now()` on a bus-emitted event waits for it without adding parent-child lineage. ## Works across forwarded buses too diff --git a/docs/features/return-value-handling.mdx b/docs/features/return-value-handling.mdx index d9debaf9..2e476487 100644 --- a/docs/features/return-value-handling.mdx +++ b/docs/features/return-value-handling.mdx @@ -87,30 +87,17 @@ let result = block_on(event.event_result())?; ```go -type DoMathPayload struct { +type DoMathEvent struct { A int `json:"a"` B int `json:"b"` } bus := abxbus.NewEventBus("AppBus", nil) -abxbus.OnTyped[DoMathPayload, int]( - bus, - "DoMathEvent", - "add", - func(payload DoMathPayload) (int, error) { - return payload.A + payload.B, nil - }, - nil, -) +bus.On(func(event DoMathEvent) (int, error) { + return event.A + event.B, nil +}) -event, err := abxbus.NewBaseEventWithResult[DoMathPayload, int]( - "DoMathEvent", - DoMathPayload{A: 2, B: 3}, -) -if err != nil { - panic(err) -} -emitted := bus.Emit(event) +emitted := bus.Emit(DoMathEvent{A: 2, B: 3}) result, err := emitted.EventResult() ``` @@ -177,7 +164,9 @@ let values = block_on(event.event_results_list_with_options(EventResultOptions { ```go -event := bus.Emit(abxbus.NewBaseEvent("GetConfigEvent", nil)) +type GetConfigEvent struct{} + +event := bus.Emit(GetConfigEvent{}) values, err := event.EventResultsList( &abxbus.EventResultOptions{RaiseIfAny: false, RaiseIfNone: false}, ) diff --git a/docs/features/typed-events.mdx b/docs/features/typed-events.mdx index bcbce473..a96695db 100644 --- a/docs/features/typed-events.mdx +++ b/docs/features/typed-events.mdx @@ -92,7 +92,7 @@ let typed_result = block_on(event.event_result())? ```go -type OrderCreatedPayload struct { +type OrderCreatedEvent struct { OrderID string `json:"order_id"` CustomerID string `json:"customer_id"` TotalAmount float64 `json:"total_amount"` @@ -103,26 +103,15 @@ type OrderResult struct { } bus := abxbus.NewEventBus("OrdersBus", nil) -abxbus.OnTyped[OrderCreatedPayload, OrderResult]( - bus, - "OrderCreatedEvent", - "handle_order", - func(payload OrderCreatedPayload) (OrderResult, error) { - return OrderResult{OK: payload.TotalAmount > 0}, nil - }, - nil, -) - -event, err := abxbus.NewBaseEventWithResult[OrderCreatedPayload, OrderResult]("OrderCreatedEvent", OrderCreatedPayload{ +bus.On(func(event OrderCreatedEvent) (OrderResult, error) { + return OrderResult{OK: event.TotalAmount > 0}, nil +}) + +result, err := bus.Emit(OrderCreatedEvent{ OrderID: "order-123", CustomerID: "customer-456", TotalAmount: 42.50, -}) -if err != nil { - panic(err) -} - -result, err := bus.Emit(event).EventResult() +}).EventResult() if err != nil { panic(err) } diff --git a/docs/further-reading/events-suck.mdx b/docs/further-reading/events-suck.mdx index 5a1f27fb..0a0100ff 100644 --- a/docs/further-reading/events-suck.mdx +++ b/docs/further-reading/events-suck.mdx @@ -179,7 +179,7 @@ const updated = await client.update({ id: user_id ?? 'fallback-id', age: 46 }, { ```go // events_suck.wrap/make_events/make_handler are not implemented in Go yet. -// For typed event APIs, use OnTyped[...] and NewBaseEventWithResult[...]. +// For typed event APIs, use bus.On(func(MyEvent) ...) and bus.Emit(MyEvent{...}). ``` diff --git a/docs/index.mdx b/docs/index.mdx index 08b71bda..55a1925e 100644 --- a/docs/index.mdx +++ b/docs/index.mdx @@ -99,13 +99,16 @@ import ( ) func main() { + type SomeEvent struct { + SomeData int `json:"some_data"` + } + bus := abxbus.NewEventBus("MyBus", nil) - bus.On("SomeEvent", "on_some_event", func(event *abxbus.BaseEvent) (any, error) { - fmt.Println(event.Payload["some_data"]) - return nil, nil - }, nil) + bus.On(func(event SomeEvent) { + fmt.Println(event.SomeData) + }) - event := bus.Emit(abxbus.NewBaseEvent("SomeEvent", map[string]any{"some_data": 132})) + event := bus.Emit(SomeEvent{SomeData: 132}) _, _ = event.Now() } ``` diff --git a/docs/integrations/bridge-jsonl.mdx b/docs/integrations/bridge-jsonl.mdx index 50ccd88c..ab17acf5 100644 --- a/docs/integrations/bridge-jsonl.mdx +++ b/docs/integrations/bridge-jsonl.mdx @@ -88,10 +88,10 @@ bridge.on('*', bus.emit) bus := abxbus.NewEventBus("AppBus", nil) bridge := abxbus.NewJSONLEventBridge("/tmp/abxbus_events.jsonl", 0.25, "JsonlBridge") -bus.On("*", "jsonl_emit", func(event *abxbus.BaseEvent) (any, error) { +bus.OnEventName("*", "jsonl_emit", func(event *abxbus.BaseEvent) (any, error) { return bridge.Emit(event) }, nil) -bridge.On("*", "bus_emit", func(event *abxbus.BaseEvent) (any, error) { +bridge.OnEventName("*", "bus_emit", func(event *abxbus.BaseEvent) (any, error) { return bus.Emit(event), nil }, nil) ``` diff --git a/docs/integrations/bridges.mdx b/docs/integrations/bridges.mdx index dafdc99b..3c0285a3 100644 --- a/docs/integrations/bridges.mdx +++ b/docs/integrations/bridges.mdx @@ -48,10 +48,10 @@ bridge.on('*', bus.emit) bus := abxbus.NewEventBus("AppBus", nil) bridge := abxbus.NewJSONLEventBridge("/tmp/abxbus_events.jsonl", 0.25, "JsonlBridge") -bus.On("*", "jsonl_emit", func(event *abxbus.BaseEvent) (any, error) { +bus.OnEventName("*", "jsonl_emit", func(event *abxbus.BaseEvent) (any, error) { return bridge.Emit(event) }, nil) -bridge.On("*", "bus_emit", func(event *abxbus.BaseEvent) (any, error) { +bridge.OnEventName("*", "bus_emit", func(event *abxbus.BaseEvent) (any, error) { return bus.Emit(event), nil }, nil) ``` diff --git a/docs/quickstart.mdx b/docs/quickstart.mdx index 28b75f4f..abbb3858 100644 --- a/docs/quickstart.mdx +++ b/docs/quickstart.mdx @@ -134,14 +134,20 @@ block_on(async { ```go +type CreateUserEvent struct { + Email string `json:"email"` +} + +type CreateUserResult struct { + UserID string `json:"user_id"` +} + bus := abxbus.NewEventBus("MyAuthEventBus", nil) -bus.On("CreateUserEvent", "on_create_user", func(event *abxbus.BaseEvent) (any, error) { - return map[string]any{"user_id": "some-user-uuid"}, nil -}, nil) +bus.On(func(event CreateUserEvent) (CreateUserResult, error) { + return CreateUserResult{UserID: "some-user-uuid"}, nil +}) -event := bus.Emit(abxbus.NewBaseEvent("CreateUserEvent", map[string]any{ - "email": "someuser@example.com", -})) +event := bus.Emit(CreateUserEvent{Email: "someuser@example.com"}) result, err := event.EventResult() if err != nil { panic(err) diff --git a/pyproject.toml b/pyproject.toml index ebf5c178..c7830d19 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -2,7 +2,7 @@ name = "abxbus" description = "Advanced Pydantic-powered event bus with async support" authors = [{ name = "Nick Sweeting" }, { name = "ArchiveBox" }] -version = "2.5.0" +version = "2.5.2" readme = "README.md" requires-python = ">=3.11" classifiers = [