Skip to content

Commit 6ccb2be

Browse files
committed
fix(kafka): fix converting AsyncAPI 2.0 to 3.0
test(mcp): improve test stability (map is not ordered
1 parent d52cf43 commit 6ccb2be

10 files changed

Lines changed: 124 additions & 87 deletions

File tree

config/dynamic/asyncApi/convert_test.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,8 @@ func TestConfig_Convert(t *testing.T) {
2727
err = cfg.Parse(c, &dynamictest.Reader{})
2828
require.NoError(t, err)
2929

30-
cfg3 := c.Data.(*asyncapi3.Config)
30+
cfg3, err := c.Data.(*asyncApi.Config).Convert()
31+
require.NoError(t, err)
3132

3233
require.Equal(t, "3.0.0", cfg3.Version)
3334
require.Equal(t, "urn:example:com:smartylighting:streetlights:server", cfg3.Id)
@@ -211,9 +212,9 @@ func TestConvert(t *testing.T) {
211212
c := &dynamic.Config{Data: tc.cfg}
212213
err := tc.cfg.Parse(c, &dynamictest.Reader{})
213214
require.NoError(t, err)
214-
require.IsType(t, &asyncapi3.Config{}, c.Data)
215215

216-
tc.test(t, c.Data.(*asyncapi3.Config), err)
216+
cfg, err := tc.cfg.Convert()
217+
tc.test(t, cfg, err)
217218
})
218219
}
219220
}

config/dynamic/asyncApi/parsing.go

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,12 +11,27 @@ func (c *Config) Parse(config *dynamic.Config, reader dynamic.Reader) error {
1111
return nil
1212
}
1313

14-
converted, err := c.Convert()
15-
if err != nil {
16-
return err
14+
for _, server := range c.Servers {
15+
if server == nil || len(server.Ref) == 0 {
16+
continue
17+
}
18+
resolved, err := server.Resolve(config, reader)
19+
if err != nil {
20+
return err
21+
}
22+
server.Value = resolved.Value
1723
}
18-
config.Data = converted
19-
return converted.Parse(config, reader)
24+
25+
for _, ch := range c.Channels {
26+
if ch == nil {
27+
continue
28+
}
29+
if err := ch.Parse(config, reader); err != nil {
30+
return err
31+
}
32+
}
33+
34+
return nil
2035
}
2136

2237
func (r *ChannelRef) Parse(config *dynamic.Config, reader dynamic.Reader) error {

config/dynamic/asyncApi/parsing_test.go

Lines changed: 20 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -146,8 +146,12 @@ func TestServerResolve(t *testing.T) {
146146
reader := &testReader{readFunc: tc.read}
147147
c := &dynamic.Config{Info: dynamic.ConfigInfo{Url: &url.URL{}}, Data: tc.cfg}
148148
err := tc.cfg.Parse(c, reader)
149-
require.IsType(t, &asyncapi3.Config{}, c.Data)
150-
tc.test(t, c.Data.(*asyncapi3.Config), err)
149+
if err != nil {
150+
tc.test(t, nil, err)
151+
} else {
152+
cfg, err := tc.cfg.Convert()
153+
tc.test(t, cfg, err)
154+
}
151155
})
152156
}
153157
}
@@ -238,9 +242,12 @@ func TestChannelResolve(t *testing.T) {
238242
reader := &testReader{readFunc: tc.read}
239243
c := &dynamic.Config{Info: dynamic.ConfigInfo{Url: &url.URL{}}, Data: tc.cfg}
240244
err := tc.cfg.Parse(c, reader)
241-
require.IsType(t, &asyncapi3.Config{}, c.Data)
242-
243-
tc.test(t, c.Data.(*asyncapi3.Config), err)
245+
if err != nil {
246+
tc.test(t, nil, err)
247+
} else {
248+
cfg, err := tc.cfg.Convert()
249+
tc.test(t, cfg, err)
250+
}
244251
})
245252
}
246253
}
@@ -366,8 +373,12 @@ func TestMessage(t *testing.T) {
366373
reader := &testReader{readFunc: tc.read}
367374
c := &dynamic.Config{Info: dynamic.ConfigInfo{Url: &url.URL{}}, Data: tc.cfg}
368375
err := tc.cfg.Parse(c, reader)
369-
require.IsType(t, &asyncapi3.Config{}, c.Data)
370-
tc.test(t, c.Data.(*asyncapi3.Config), err)
376+
if err != nil {
377+
tc.test(t, nil, err)
378+
} else {
379+
cfg, err := tc.cfg.Convert()
380+
tc.test(t, cfg, err)
381+
}
371382
})
372383
}
373384
}
@@ -402,8 +413,8 @@ func TestSchema(t *testing.T) {
402413
c := &dynamic.Config{Info: dynamic.ConfigInfo{Url: &url.URL{}}, Data: config}
403414
err := config.Parse(c, reader)
404415
require.NoError(t, err)
405-
require.IsType(t, &asyncapi3.Config{}, c.Data)
406-
s, err := c.Data.(*asyncapi3.Config).Channels["foo"].Value.Messages["publish"].Value.Payload.GetSchema()
416+
cfg, err := config.Convert()
417+
s, err := cfg.Channels["foo"].Value.Messages["publish"].Value.Payload.GetSchema()
407418
require.NoError(t, err)
408419
require.Equal(t, target, s)
409420
})

config/dynamic/resolve.go

Lines changed: 0 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,6 @@ type Converter interface {
1919
ConvertTo(i any) (any, error)
2020
}
2121

22-
type FromConverter interface {
23-
ConvertFrom(i any) (any, error)
24-
}
25-
2622
func resolve[T any](ref string, config *Config, reader Reader) (T, error) {
2723
var err error
2824
var result T
@@ -351,17 +347,5 @@ func convert[T any](val any) any {
351347
}
352348
}
353349

354-
v := reflect.ValueOf(target)
355-
if !v.IsValid() || !v.CanInterface() {
356-
return val
357-
}
358-
c, ok := v.Interface().(FromConverter)
359-
if !ok {
360-
return val
361-
}
362-
result, err := c.ConvertFrom(val)
363-
if err == nil {
364-
return result
365-
}
366350
return val
367351
}

mcp/run_kafka.go

Lines changed: 46 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@ package mcp
22

33
import (
44
"fmt"
5+
"mokapi/engine"
6+
"mokapi/engine/common"
57
"mokapi/kafka"
68
"mokapi/runtime"
79
"slices"
@@ -14,7 +16,8 @@ type Kafka struct {
1416
Type string `json:"type"`
1517
Brokers []Broker `json:"brokers"`
1618

17-
info *runtime.KafkaInfo
19+
info *runtime.KafkaInfo
20+
client *engine.KafkaClient
1821
}
1922

2023
type Broker struct {
@@ -36,7 +39,8 @@ type Topic struct {
3639

3740
Operations []KafkaOperation `json:"operations,omitempty"`
3841

39-
info *runtime.KafkaInfo
42+
info *runtime.KafkaInfo
43+
client *engine.KafkaClient
4044
}
4145

4246
type KafkaPartition struct {
@@ -74,9 +78,10 @@ func (m *mokapi) getKafkaApi(name string) any {
7478
for _, api := range m.app.Kafka.List() {
7579
if api.Info.Name == name {
7680
result := &Kafka{
77-
Name: name,
78-
Type: "kafka",
79-
info: api,
81+
Name: name,
82+
Type: "kafka",
83+
info: api,
84+
client: engine.NewKafkaClient(m.app),
8085
}
8186
for it := api.Servers.Iter(); it.Next(); {
8287
b := it.Value()
@@ -128,6 +133,7 @@ func (k *Kafka) GetTopic(name string) (Topic, error) {
128133
},
129134
Description: ch.Value.Description,
130135
info: k.info,
136+
client: k.client,
131137
}
132138

133139
topic := k.info.Store.Topic(name)
@@ -180,45 +186,55 @@ func (k *Kafka) GetTopic(name string) (Topic, error) {
180186
t.Operations = append(t.Operations, result)
181187
}
182188

189+
slices.SortStableFunc(t.Operations, func(a, b KafkaOperation) int {
190+
r := strings.Compare(a.Action, b.Action)
191+
if r != 0 {
192+
return r
193+
}
194+
return strings.Compare(a.Title, b.Title)
195+
})
196+
183197
return t, nil
184198
}
185199

186200
func (t *Topic) Produce(partition int, value string, key string, headers map[string]string) error {
187-
topic := t.info.Store.Topic(t.Name)
188-
if topic == nil {
189-
return fmt.Errorf("topic '%s' not found", t.Name)
190-
}
191-
p := topic.Partition(partition)
192-
if p == nil {
193-
return fmt.Errorf("partition '%d' not found", partition)
194-
}
195-
196-
r := &kafka.Record{
197-
Time: time.Now(),
198-
Key: kafka.NewBytes([]byte(key)),
199-
Value: kafka.NewBytes([]byte(value)),
201+
msg := common.KafkaMessage{
202+
Value: []byte(value),
203+
Data: nil,
204+
Headers: headers,
205+
Partition: partition,
200206
}
201-
if headers != nil {
202-
for k, v := range headers {
203-
r.Headers = append(r.Headers, kafka.RecordHeader{
204-
Key: k,
205-
Value: []byte(v),
206-
})
207-
}
207+
if key != "" {
208+
msg.Key = []byte(key)
208209
}
209210

210-
result, err := p.Write(kafka.RecordBatch{
211-
Records: []*kafka.Record{r},
211+
_, err := t.client.Produce(&common.KafkaProduceArgs{
212+
Cluster: t.info.Info.Name,
213+
Topic: t.Name,
214+
Messages: []common.KafkaMessage{msg},
215+
Retry: common.KafkaProduceRetry{
216+
MaxRetryTime: 3 * time.Minute,
217+
InitialRetryTime: 500 * time.Millisecond,
218+
Retries: 10,
219+
Factor: 2,
220+
},
221+
ClientId: "mokapi-mcp",
212222
})
213223
if err != nil {
214224
return err
215225
}
216-
if result.ErrorCode != kafka.None {
217-
return fmt.Errorf("%d: %s", result.ErrorCode, result.ErrorMessage)
226+
227+
topic := t.info.Store.Topic(t.Name)
228+
if topic == nil {
229+
return fmt.Errorf("topic '%s' not found", t.Name)
230+
}
231+
p := topic.Partition(partition)
232+
if p == nil {
233+
return fmt.Errorf("partition '%s' not found", t.Name)
218234
}
219235
for _, pt := range t.Partitions {
220236
if pt.Index == p.Index {
221-
pt.Offset += 1
237+
pt.Offset = p.Offset()
222238
}
223239
}
224240
return nil

mcp/run_kafka_test.go

Lines changed: 15 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -167,21 +167,22 @@ func TestService_Run_Kafka(t *testing.T) {
167167
require.Equal(t, "channel-1 summary", topic.Summary)
168168
require.Equal(t, "description", topic.Description)
169169
require.Len(t, topic.Operations, 2)
170-
require.Equal(t, "send", topic.Operations[0].Action)
171-
require.Equal(t, "op-title-1", topic.Operations[0].Title)
172-
require.Equal(t, "op-summary-1", topic.Operations[0].Summary)
173-
require.Equal(t, "op-description-1", topic.Operations[0].Description)
174-
require.Len(t, topic.Operations[0].Messages, 1)
175-
require.Equal(t, "msg-name-1", topic.Operations[0].Messages[0].Name)
176-
require.Equal(t, "msg-title-1", topic.Operations[0].Messages[0].Title)
177-
require.Equal(t, "msg-summary-1", topic.Operations[0].Messages[0].Summary)
178-
require.Equal(t, "msg-description-1", topic.Operations[0].Messages[0].Description)
179-
require.Equal(t, "application/json", topic.Operations[0].Messages[0].ContentType)
180-
require.IsType(t, &jsonSchema.Schema{}, topic.Operations[0].Messages[0].Payload)
181-
payload := topic.Operations[0].Messages[0].Payload.(*jsonSchema.Schema)
182-
require.Equal(t, "object", payload.Type.String())
183170

184-
require.Equal(t, "receive", topic.Operations[1].Action)
171+
require.Equal(t, "receive", topic.Operations[0].Action)
172+
173+
require.Equal(t, "send", topic.Operations[1].Action)
174+
require.Equal(t, "op-title-1", topic.Operations[1].Title)
175+
require.Equal(t, "op-summary-1", topic.Operations[1].Summary)
176+
require.Equal(t, "op-description-1", topic.Operations[1].Description)
177+
require.Len(t, topic.Operations[1].Messages, 1)
178+
require.Equal(t, "msg-name-1", topic.Operations[1].Messages[0].Name)
179+
require.Equal(t, "msg-title-1", topic.Operations[1].Messages[0].Title)
180+
require.Equal(t, "msg-summary-1", topic.Operations[1].Messages[0].Summary)
181+
require.Equal(t, "msg-description-1", topic.Operations[1].Messages[0].Description)
182+
require.Equal(t, "application/json", topic.Operations[1].Messages[0].ContentType)
183+
require.IsType(t, &jsonSchema.Schema{}, topic.Operations[1].Messages[0].Payload)
184+
payload := topic.Operations[1].Messages[0].Payload.(*jsonSchema.Schema)
185+
require.Equal(t, "object", payload.Type.String())
185186
},
186187
},
187188
{

providers/asyncapi3/kafka/store/partition_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -210,7 +210,7 @@ func TestPartition_Write_Value_Validator(t *testing.T) {
210210
require.Equal(t, int64(0), wr.BaseOffset)
211211
require.Equal(t, int64(1), p.Offset())
212212
require.Equal(t, int64(0), p.StartOffset())
213-
r := p.Segments[p.ActiveSegment].record(0)
213+
r := p.Segments[p.ActiveSegment].Record(0)
214214
require.NotNil(t, r)
215215
require.Len(t, r.Headers, 1)
216216
require.Equal(t, "bar-1", r.Headers[0].Key)

providers/asyncapi3/schema.go

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -118,13 +118,6 @@ func (r *SchemaRef) Marshal(v any, ct media.ContentType) ([]byte, error) {
118118
}
119119
}
120120

121-
//func (r *SchemaRef) ConvertFrom(val any) (any, error) {
122-
// if s, ok := val.(Schema); ok {
123-
// return &SchemaRef{Value: s}, nil
124-
// }
125-
// return nil, fmt.Errorf("unsupported type: %T", val)
126-
//}
127-
128121
func (r *SchemaRef) GetSchema() (Schema, error) {
129122
if r.Value == nil {
130123
return nil, nil
@@ -134,7 +127,7 @@ func (r *SchemaRef) GetSchema() (Schema, error) {
134127
case *jsonSchema.Schema, *avro.Schema, *openapi.Schema:
135128
return s, nil
136129
case *SchemaRef:
137-
return r.Value, nil
130+
return s.GetSchema()
138131
case *MultiSchemaFormat:
139132
return s.Schema.GetSchema()
140133
default:

runtime/runtime_kafka.go

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package runtime
22

33
import (
44
"mokapi/config/dynamic"
5+
"mokapi/config/dynamic/asyncApi"
56
"mokapi/config/static"
67
"mokapi/engine/common"
78
"mokapi/kafka"
@@ -225,13 +226,24 @@ func IsAsyncApiConfig(c *dynamic.Config) (*asyncapi3.Config, bool) {
225226
switch v := c.Data.(type) {
226227
case *asyncapi3.Config:
227228
return v, true
229+
case *asyncApi.Config:
230+
conv, err := v.Convert()
231+
if err != nil {
232+
log.Errorf("failed to convert asyncapi 2.0 config: %s", err)
233+
return nil, false
234+
}
235+
return conv, true
228236
default:
229237
return nil, false
230238
}
231239
}
232240

233241
func getKafkaConfig(c *dynamic.Config) *asyncapi3.Config {
234-
return c.Data.(*asyncapi3.Config)
242+
cfg, ok := IsAsyncApiConfig(c)
243+
if !ok {
244+
return nil
245+
}
246+
return cfg
235247
}
236248

237249
func (s *KafkaStore) updateEventStore(k *KafkaInfo) {

schema/json/schema/string.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,10 @@ import (
66
)
77

88
func (s *Schema) String() string {
9+
if s == nil {
10+
return ""
11+
}
12+
913
var sb strings.Builder
1014

1115
if s.Boolean != nil {

0 commit comments

Comments
 (0)