From 1848a057295d9b9ea3f0ef33ea0c9db7abf1ff5b Mon Sep 17 00:00:00 2001 From: Haris Osmanagic Date: Mon, 21 Nov 2022 14:05:01 +0100 Subject: [PATCH 01/12] Stream inspector for destinations --- go.mod | 3 + go.sum | 7 ++ pkg/conduit/runtime.go | 17 ++- pkg/connector/builder.go | 4 + pkg/connector/connector.go | 5 + pkg/connector/destination.go | 8 ++ pkg/connector/mock/connector.go | 29 +++++ pkg/connector/source.go | 6 + pkg/foundation/grpcutil/gateway.go | 5 + pkg/inspector/inspector.go | 142 ++++++++++++++++++++++ pkg/inspector/inspector_benchmark_test.go | 32 +++++ pkg/inspector/inspector_test.go | 142 ++++++++++++++++++++++ pkg/orchestrator/connectors.go | 10 ++ pkg/web/api/connector_v1.go | 31 ++++- pkg/web/api/connector_v1_test.go | 116 ++++++++++++++++++ pkg/web/api/mock/connector.go | 16 +++ pkg/web/api/mock/connector_service.go | 133 ++++++++++++++++++++ pkg/web/api/toproto/record.go | 98 +++++++++++++++ 18 files changed, 799 insertions(+), 5 deletions(-) create mode 100644 pkg/inspector/inspector.go create mode 100644 pkg/inspector/inspector_benchmark_test.go create mode 100644 pkg/inspector/inspector_test.go create mode 100644 pkg/web/api/mock/connector_service.go create mode 100644 pkg/web/api/toproto/record.go diff --git a/go.mod b/go.mod index eafc7850c..e04318c55 100644 --- a/go.mod +++ b/go.mod @@ -32,6 +32,7 @@ require ( github.com/prometheus/client_model v0.3.0 github.com/prometheus/common v0.37.0 github.com/rs/zerolog v1.28.0 + github.com/tmc/grpc-websocket-proxy v0.0.0-20220101234140-673ab2c3ae75 go.buf.build/grpc/go/conduitio/conduit-connector-protocol v1.4.3 go.buf.build/protocolbuffers/go/grpc-ecosystem/grpc-gateway v1.3.49 golang.org/x/exp v0.0.0-20220827204233-334a2380cb91 @@ -83,6 +84,7 @@ require ( github.com/golang/protobuf v1.5.2 // indirect github.com/golang/snappy v0.0.4 // indirect github.com/google/flatbuffers v2.0.0+incompatible // indirect + github.com/gorilla/websocket v1.4.2 // indirect github.com/hashicorp/yamux v0.1.1 // indirect github.com/jackc/chunkreader/v2 v2.0.1 // indirect github.com/jackc/pgconn v1.13.0 // indirect @@ -107,6 +109,7 @@ require ( github.com/pkg/errors v0.9.1 // indirect github.com/prometheus/procfs v0.8.0 // indirect github.com/segmentio/kafka-go v0.4.35 // indirect + github.com/sirupsen/logrus v1.8.1 // indirect github.com/xdg/scram v1.0.5 // indirect github.com/xdg/stringprep v1.0.3 // indirect github.com/xitongsys/parquet-go v1.6.2 // indirect diff --git a/go.sum b/go.sum index c78ec48c7..b139d3824 100644 --- a/go.sum +++ b/go.sum @@ -307,6 +307,8 @@ github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg= github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk= +github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc= +github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw= github.com/grpc-ecosystem/grpc-gateway/v2 v2.14.0 h1:t7uX3JBHdVwAi3G7sSSdbsk8NfgA+LnUS88V/2EKaA0= github.com/grpc-ecosystem/grpc-gateway/v2 v2.14.0/go.mod h1:4OGVnY4qf2+gw+ssiHbW+pq4mo2yko94YxxMmXZ7jCA= @@ -529,6 +531,8 @@ github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPx github.com/sirupsen/logrus v1.4.1/go.mod h1:ni0Sbl8bgC9z8RoU9G6nDWqqs/fq4eDPysMBDgk/93Q= github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrfsX/uA88= +github.com/sirupsen/logrus v1.8.1 h1:dJKuHgqk1NNQlqoA6BTlM1Wf9DOH3NBjQyu0h9+AZZE= +github.com/sirupsen/logrus v1.8.1/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0= github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0bLI= github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= @@ -554,6 +558,8 @@ github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/ github.com/stretchr/testify v1.7.2/go.mod h1:R6va5+xMeoiuVRoj+gSkQ7d3FALtqAAGI1FQKckRals= github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +github.com/tmc/grpc-websocket-proxy v0.0.0-20220101234140-673ab2c3ae75 h1:6fotK7otjonDflCTK0BCfls4SPy3NcCVb5dqqmbRknE= +github.com/tmc/grpc-websocket-proxy v0.0.0-20220101234140-673ab2c3ae75/go.mod h1:KO6IkyS8Y3j8OdNO85qEYBsRPuteD+YciPomcXdrMnk= github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0= github.com/xdg/scram v1.0.5 h1:TuS0RFmt5Is5qm9Tm2SoD89OPqe4IRiFtyFY4iwWXsw= github.com/xdg/scram v1.0.5/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I= @@ -703,6 +709,7 @@ golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96b golang.org/x/net v0.0.0-20210525063256-abc453219eb5/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20210614182718-04defd469f4e/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= +golang.org/x/net v0.0.0-20211123203042-d83791d6bcd9/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= golang.org/x/net v0.0.0-20220225172249-27dd8689420f/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= golang.org/x/net v0.0.0-20220425223048-2871e0cb64e4/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= diff --git a/pkg/conduit/runtime.go b/pkg/conduit/runtime.go index ff8dff79d..3de42a7c8 100644 --- a/pkg/conduit/runtime.go +++ b/pkg/conduit/runtime.go @@ -57,6 +57,7 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/health/grpc_health_v1" + "google.golang.org/grpc/reflection" "google.golang.org/grpc/stats" "gopkg.in/tomb.v2" @@ -320,6 +321,10 @@ func (r *Runtime) serveGRPCAPI(ctx context.Context, t *tomb.Tomb) (net.Addr, err info := api.NewInformation(Version(false)) info.Register(grpcServer) + // Makes it easier to use command line tools to interact + // with the gRPC API. + // https://github.com/grpc/grpc/blob/master/doc/server-reflection.md + reflection.Register(grpcServer) healthService := api.NewHealthChecker() grpc_health_v1.RegisterHealthServer(grpcServer, healthService) @@ -436,14 +441,18 @@ func (r *Runtime) serveHTTPAPI( return nil, cerrors.Errorf("failed to register metrics handler: %w", err) } + handler := grpcutil.WithWebsockets( + grpcutil.WithDefaultGatewayMiddleware( + r.logger, allowCORS(gwmux, "http://localhost:4200"), + ), + ) + return r.serveHTTP( ctx, t, &http.Server{ - Addr: r.Config.HTTP.Address, - Handler: grpcutil.WithDefaultGatewayMiddleware( - r.logger, allowCORS(gwmux, "http://localhost:4200"), - ), + Addr: r.Config.HTTP.Address, + Handler: handler, ReadHeaderTimeout: 10 * time.Second, }, ) diff --git a/pkg/connector/builder.go b/pkg/connector/builder.go index 9d8aa8c43..1d8c9f4c0 100644 --- a/pkg/connector/builder.go +++ b/pkg/connector/builder.go @@ -20,9 +20,12 @@ import ( "github.com/conduitio/conduit/pkg/foundation/cerrors" "github.com/conduitio/conduit/pkg/foundation/log" + "github.com/conduitio/conduit/pkg/inspector" "github.com/conduitio/conduit/pkg/plugin" ) +const inspectorBufferSize = 1000 + // Builder represents an object that can build a connector. // The main use of this interface is to be able to switch out the connector // implementations for mocks in tests. @@ -99,6 +102,7 @@ func (b *DefaultBuilder) Init(c Connector, id string, config Config) error { v.persister = b.persister v.pluginDispenser = p v.errs = make(chan error) + v.inspector = inspector.New(v.logger, inspectorBufferSize) default: return ErrInvalidConnectorType } diff --git a/pkg/connector/connector.go b/pkg/connector/connector.go index c56037f54..0bd9bb61a 100644 --- a/pkg/connector/connector.go +++ b/pkg/connector/connector.go @@ -21,6 +21,7 @@ import ( "context" "time" + "github.com/conduitio/conduit/pkg/inspector" "github.com/conduitio/conduit/pkg/record" ) @@ -65,6 +66,10 @@ type Connector interface { // asynchronously (e.g. persisting state). Errors() <-chan error + // Inspect returns an inspector.Session which exposes the records + // coming into or out of this connector (depending on the connector type). + Inspect(context.Context) *inspector.Session + // Open will start the plugin process and call the Open method on the // plugin. After the connector has been successfully opened it is considered // as running (IsRunning returns true) and can be stopped again with diff --git a/pkg/connector/destination.go b/pkg/connector/destination.go index c62163839..0393f965a 100644 --- a/pkg/connector/destination.go +++ b/pkg/connector/destination.go @@ -21,6 +21,7 @@ import ( "github.com/conduitio/conduit/pkg/foundation/cerrors" "github.com/conduitio/conduit/pkg/foundation/log" + "github.com/conduitio/conduit/pkg/inspector" "github.com/conduitio/conduit/pkg/plugin" "github.com/conduitio/conduit/pkg/record" ) @@ -57,6 +58,8 @@ type destination struct { // stopStream is a function that closes the context of the stream stopStream context.CancelFunc + inspector *inspector.Inspector + // m can lock a destination from concurrent access (e.g. in connector persister). m sync.Mutex // wg tracks the number of in flight calls to the plugin. @@ -115,6 +118,10 @@ func (d *destination) Errors() <-chan error { return d.errs } +func (d *destination) Inspect(ctx context.Context) *inspector.Session { + return d.inspector.NewSession(ctx) +} + func (d *destination) Validate(ctx context.Context, settings map[string]string) (err error) { dest, err := d.pluginDispenser.DispenseDestination() if err != nil { @@ -227,6 +234,7 @@ func (d *destination) Write(ctx context.Context, r record.Record) error { return err } + d.inspector.Send(ctx, r) err = d.plugin.Write(ctx, r) if err != nil { return cerrors.Errorf("error writing record: %w", err) diff --git a/pkg/connector/mock/connector.go b/pkg/connector/mock/connector.go index 06a0b2a4e..00fdb6633 100644 --- a/pkg/connector/mock/connector.go +++ b/pkg/connector/mock/connector.go @@ -10,6 +10,7 @@ import ( time "time" connector "github.com/conduitio/conduit/pkg/connector" + inspector "github.com/conduitio/conduit/pkg/inspector" record "github.com/conduitio/conduit/pkg/record" gomock "github.com/golang/mock/gomock" ) @@ -107,6 +108,20 @@ func (mr *SourceMockRecorder) ID() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ID", reflect.TypeOf((*Source)(nil).ID)) } +// Inspect mocks base method. +func (m *Source) Inspect(arg0 context.Context) *inspector.Session { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Inspect", arg0) + ret0, _ := ret[0].(*inspector.Session) + return ret0 +} + +// Inspect indicates an expected call of Inspect. +func (mr *SourceMockRecorder) Inspect(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Inspect", reflect.TypeOf((*Source)(nil).Inspect), arg0) +} + // IsRunning mocks base method. func (m *Source) IsRunning() bool { m.ctrl.T.Helper() @@ -379,6 +394,20 @@ func (mr *DestinationMockRecorder) ID() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ID", reflect.TypeOf((*Destination)(nil).ID)) } +// Inspect mocks base method. +func (m *Destination) Inspect(arg0 context.Context) *inspector.Session { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Inspect", arg0) + ret0, _ := ret[0].(*inspector.Session) + return ret0 +} + +// Inspect indicates an expected call of Inspect. +func (mr *DestinationMockRecorder) Inspect(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Inspect", reflect.TypeOf((*Destination)(nil).Inspect), arg0) +} + // IsRunning mocks base method. func (m *Destination) IsRunning() bool { m.ctrl.T.Helper() diff --git a/pkg/connector/source.go b/pkg/connector/source.go index 7e1bacfaa..da4c51bdf 100644 --- a/pkg/connector/source.go +++ b/pkg/connector/source.go @@ -21,6 +21,7 @@ import ( "github.com/conduitio/conduit/pkg/foundation/cerrors" "github.com/conduitio/conduit/pkg/foundation/log" + "github.com/conduitio/conduit/pkg/inspector" "github.com/conduitio/conduit/pkg/plugin" "github.com/conduitio/conduit/pkg/record" ) @@ -114,6 +115,11 @@ func (s *source) Errors() <-chan error { return s.errs } +func (s *source) Inspect(_ context.Context) *inspector.Session { + // TODO implement me + panic("implement me") +} + func (s *source) Validate(ctx context.Context, settings map[string]string) (err error) { src, err := s.pluginDispenser.DispenseSource() if err != nil { diff --git a/pkg/foundation/grpcutil/gateway.go b/pkg/foundation/grpcutil/gateway.go index 9d08220a5..77c9c6275 100644 --- a/pkg/foundation/grpcutil/gateway.go +++ b/pkg/foundation/grpcutil/gateway.go @@ -21,6 +21,7 @@ import ( "github.com/conduitio/conduit/pkg/foundation/log" "github.com/google/uuid" "github.com/grpc-ecosystem/grpc-gateway/v2/runtime" + "github.com/tmc/grpc-websocket-proxy/wsproxy" "google.golang.org/protobuf/encoding/protojson" ) @@ -109,6 +110,10 @@ func WithHTTPEndpointHeader(h http.Handler) http.Handler { }) } +func WithWebsockets(h http.Handler) http.Handler { + return wsproxy.WebsocketProxy(h) +} + func extractEndpoint(r *http.Request) string { return r.Method + " " + r.URL.Path } diff --git a/pkg/inspector/inspector.go b/pkg/inspector/inspector.go new file mode 100644 index 000000000..b00757e9a --- /dev/null +++ b/pkg/inspector/inspector.go @@ -0,0 +1,142 @@ +// Copyright © 2022 Meroxa, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package inspector + +import ( + "context" + "sync" + + "github.com/conduitio/conduit/pkg/foundation/log" + "github.com/conduitio/conduit/pkg/record" + "github.com/google/uuid" +) + +// Session wraps a channel of records and provides: +// 1. a way to send records to it asynchronously +// 2. a way to know if it's closed or not +type Session struct { + C chan record.Record + + logger log.CtxLogger + onClose func() +} + +func (s *Session) Close() { + s.onClose() + close(s.C) +} + +// send sends a record to the session's channel. +// If the channel has already reached its capacity, +// the record will be ignored. +func (s *Session) send(ctx context.Context, r record.Record) { + select { + case s.C <- r: + default: + s.logger. + Warn(ctx). + Msg("session buffer full, record will be dropped") + } +} + +// Inspector is attached to an inspectable pipeline component +// and makes returns records coming in or out of the component. +// An Inspector is a "proxy" between the pipeline component being +// inspected and the API, which broadcasts records to all clients. +type Inspector struct { + // sessions is a map of sessions. + // keys are sessions IDs. + sessions map[string]*Session + // guards access to sessions + lock sync.Mutex + logger log.CtxLogger + bufferSize int +} + +func New(logger log.CtxLogger, bufferSize int) *Inspector { + return &Inspector{ + sessions: make(map[string]*Session), + logger: logger.WithComponent("inspector.Inspector"), + bufferSize: bufferSize, + } +} + +// Send sends the given record to all registered sessions. +// The method does not wait for consumers to get the records. +func (i *Inspector) Send(ctx context.Context, r record.Record) { + // copy metadata, to prevent issues when concurrently accessing the metadata + var meta record.Metadata + if len(r.Metadata) != 0 { + meta = make(record.Metadata, len(r.Metadata)) + for k, v := range r.Metadata { + meta[k] = v + } + } + + // todo optimize this, as we have locks for every record. + // locks are needed to make sure the `sessions` slice + // is not modified as we're iterating over it + i.lock.Lock() + defer i.lock.Unlock() + for _, s := range i.sessions { + s.send(ctx, record.Record{ + Position: r.Position, + Operation: r.Operation, + Metadata: meta, + Key: r.Key, + Payload: r.Payload, + }) + } +} + +func (i *Inspector) NewSession(ctx context.Context) *Session { + id := uuid.NewString() + s := &Session{ + C: make(chan record.Record, i.bufferSize), + logger: i.logger.WithComponent("inspector.Session"), + onClose: func() { + i.remove(id) + }, + } + go func() { + <-ctx.Done() + s.logger. + Info(context.Background()). + Msgf("context canceled: %v", ctx.Err()) + s.Close() + }() + + i.lock.Lock() + defer i.lock.Unlock() + + i.sessions[id] = s + i.logger. + Info(context.Background()). + Str("session_id", id). + Msg("session created") + return s +} + +// remove removes a session with given ID from this Inspector. +func (i *Inspector) remove(id string) { + i.lock.Lock() + defer i.lock.Unlock() + + delete(i.sessions, id) + i.logger. + Info(context.Background()). + Str("session_id", id). + Msg("session removed") +} diff --git a/pkg/inspector/inspector_benchmark_test.go b/pkg/inspector/inspector_benchmark_test.go new file mode 100644 index 000000000..78c41cb5f --- /dev/null +++ b/pkg/inspector/inspector_benchmark_test.go @@ -0,0 +1,32 @@ +// Copyright © 2022 Meroxa, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package inspector + +import ( + "context" + "testing" + + "github.com/conduitio/conduit/pkg/foundation/log" + "github.com/conduitio/conduit/pkg/record" +) + +func BenchmarkInspector_SingleSession_Send(b *testing.B) { + ins := New(log.Nop(), 10) + ins.NewSession(context.Background()) + + for i := 0; i < b.N; i++ { + ins.Send(context.Background(), record.Record{Position: record.Position("test-pos")}) + } +} diff --git a/pkg/inspector/inspector_test.go b/pkg/inspector/inspector_test.go new file mode 100644 index 000000000..63e6576c9 --- /dev/null +++ b/pkg/inspector/inspector_test.go @@ -0,0 +1,142 @@ +// Copyright © 2022 Meroxa, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package inspector + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/conduitio/conduit/pkg/foundation/cchan" + "github.com/conduitio/conduit/pkg/foundation/cerrors" + "github.com/conduitio/conduit/pkg/foundation/log" + "github.com/conduitio/conduit/pkg/record" + "github.com/matryer/is" +) + +func TestInspector_Send_NoSessions(t *testing.T) { + underTest := New(log.Nop(), 10) + underTest.Send(context.Background(), record.Record{}) +} + +func TestInspector_Send_SingleSession(t *testing.T) { + underTest := New(log.Nop(), 10) + s := underTest.NewSession(context.Background()) + + r := record.Record{ + Position: record.Position("test-pos"), + } + underTest.Send(context.Background(), r) + assertGotRecord(is.New(t), s, r) +} + +func TestInspector_Send_MultipleSessions(t *testing.T) { + is := is.New(t) + + underTest := New(log.Nop(), 10) + s1 := underTest.NewSession(context.Background()) + s2 := underTest.NewSession(context.Background()) + + r := record.Record{ + Position: record.Position("test-pos"), + } + underTest.Send(context.Background(), r) + assertGotRecord(is, s1, r) + assertGotRecord(is, s2, r) +} + +func TestInspector_Send_SessionClosed(t *testing.T) { + is := is.New(t) + + underTest := New(log.Nop(), 10) + s := underTest.NewSession(context.Background()) + + r := record.Record{ + Position: record.Position("test-pos"), + } + underTest.Send(context.Background(), r) + assertGotRecord(is.New(t), s, r) + + s.Close() + underTest.Send( + context.Background(), + record.Record{ + Position: record.Position("test-pos-2"), + }, + ) +} + +func TestInspector_Send_SessionCtxCanceled(t *testing.T) { + is := is.New(t) + + underTest := New(log.Nop(), 10) + ctx, cancel := context.WithCancel(context.Background()) + s := underTest.NewSession(ctx) + + r := record.Record{ + Position: record.Position("test-pos"), + } + underTest.Send(context.Background(), r) + assertGotRecord(is.New(t), s, r) + + cancel() + + _, got, _ := cchan.Chan[record.Record](s.C).RecvTimeout(context.Background(), 100*time.Millisecond) + is.True(!got) // expected no record +} + +func TestInspector_Send_SlowConsumer(t *testing.T) { + // When a session's buffer is full, then further records will be dropped. + // In this test we set up an inspector with a buffer size of 10. + // Then we send 11 records (without reading them immediately). + // The expected behavior is that we will be able to get the first 10, + // and that the last record never arrives. + is := is.New(t) + + bufferSize := 10 + underTest := New(log.Nop(), bufferSize) + s := underTest.NewSession(context.Background()) + + for i := 0; i < bufferSize+1; i++ { + s.send( + context.Background(), + record.Record{ + Position: record.Position(fmt.Sprintf("test-pos-%v", i)), + }, + ) + } + + for i := 0; i < bufferSize; i++ { + assertGotRecord( + is, + s, + record.Record{ + Position: record.Position(fmt.Sprintf("test-pos-%v", i)), + }, + ) + } + + _, got, err := cchan.Chan[record.Record](s.C).RecvTimeout(context.Background(), 100*time.Millisecond) + is.True(cerrors.Is(err, context.DeadlineExceeded)) + is.True(!got) +} + +func assertGotRecord(is *is.I, s *Session, recWant record.Record) { + recGot, got, err := cchan.Chan[record.Record](s.C).RecvTimeout(context.Background(), 100*time.Millisecond) + is.NoErr(err) + is.True(got) + is.Equal(recWant, recGot) +} diff --git a/pkg/orchestrator/connectors.go b/pkg/orchestrator/connectors.go index 265711b05..9c48745d8 100644 --- a/pkg/orchestrator/connectors.go +++ b/pkg/orchestrator/connectors.go @@ -21,11 +21,21 @@ import ( "github.com/conduitio/conduit/pkg/foundation/cerrors" "github.com/conduitio/conduit/pkg/foundation/rollback" "github.com/conduitio/conduit/pkg/pipeline" + "github.com/conduitio/conduit/pkg/record" "github.com/google/uuid" ) type ConnectorOrchestrator base +func (c *ConnectorOrchestrator) Inspect(ctx context.Context, id string) (chan record.Record, error) { + conn, err := c.Get(ctx, id) + if err != nil { + return nil, cerrors.Errorf("failed to get connector by ID %v: %w", id, err) + } + + return conn.Inspect(ctx).C, nil +} + func (c *ConnectorOrchestrator) Create( ctx context.Context, t connector.Type, diff --git a/pkg/web/api/connector_v1.go b/pkg/web/api/connector_v1.go index d63b1cd08..ba56fe56f 100644 --- a/pkg/web/api/connector_v1.go +++ b/pkg/web/api/connector_v1.go @@ -13,13 +13,14 @@ // limitations under the License. //go:generate mockgen -destination=mock/connector.go -package=mock -mock_names=ConnectorOrchestrator=ConnectorOrchestrator . ConnectorOrchestrator +//go:generate mockgen -destination=mock/connector_service.go -package=mock -mock_names=ConnectorService_InspectConnectorServer=ConnectorService_InspectConnectorServer github.com/conduitio/conduit/proto/gen/api/v1 ConnectorService_InspectConnectorServer package api import ( "context" - "github.com/conduitio/conduit/pkg/connector" "github.com/conduitio/conduit/pkg/foundation/cerrors" + "github.com/conduitio/conduit/pkg/record" "github.com/conduitio/conduit/pkg/web/api/fromproto" "github.com/conduitio/conduit/pkg/web/api/status" "github.com/conduitio/conduit/pkg/web/api/toproto" @@ -34,6 +35,7 @@ type ConnectorOrchestrator interface { Delete(ctx context.Context, id string) error Update(ctx context.Context, id string, config connector.Config) (connector.Connector, error) Validate(ctx context.Context, t connector.Type, config connector.Config) error + Inspect(ctx context.Context, id string) (chan record.Record, error) } type ConnectorAPIv1 struct { @@ -65,6 +67,33 @@ func (c *ConnectorAPIv1) ListConnectors( return &apiv1.ListConnectorsResponse{Connectors: clist}, nil } +func (c *ConnectorAPIv1) InspectConnector(req *apiv1.InspectConnectorRequest, server apiv1.ConnectorService_InspectConnectorServer) error { + if req.Id == "" { + return status.ConnectorError(cerrors.ErrEmptyID) + } + + records, err := c.cs.Inspect(server.Context(), req.Id) + if err != nil { + return status.ConnectorError(cerrors.Errorf("failed to get connector by ID %v: %w", req.Id, err)) + } + + for rec := range records { + recProto, err2 := toproto.Record(rec) + if err2 != nil { + return cerrors.Errorf("failed converting record: %w", err2) + } + + err2 = server.Send(&apiv1.InspectConnectorResponse{ + Record: recProto, + }) + if err2 != nil { + return cerrors.Errorf("failed sending record: %w", err2) + } + } + + return cerrors.New("records channel closed") +} + // GetConnector returns a single Connector proto response or an error. func (c *ConnectorAPIv1) GetConnector( ctx context.Context, diff --git a/pkg/web/api/connector_v1_test.go b/pkg/web/api/connector_v1_test.go index 64ed8543e..5d82f3899 100644 --- a/pkg/web/api/connector_v1_test.go +++ b/pkg/web/api/connector_v1_test.go @@ -16,6 +16,7 @@ package api import ( "context" + "fmt" "sort" "testing" "time" @@ -23,9 +24,11 @@ import ( "github.com/conduitio/conduit/pkg/connector" connmock "github.com/conduitio/conduit/pkg/connector/mock" "github.com/conduitio/conduit/pkg/foundation/assert" + "github.com/conduitio/conduit/pkg/foundation/cchan" "github.com/conduitio/conduit/pkg/foundation/cerrors" "github.com/conduitio/conduit/pkg/record" apimock "github.com/conduitio/conduit/pkg/web/api/mock" + "github.com/conduitio/conduit/pkg/web/api/toproto" apiv1 "github.com/conduitio/conduit/proto/gen/api/v1" "github.com/golang/mock/gomock" "github.com/google/uuid" @@ -203,6 +206,119 @@ func TestConnectorAPIv1_CreateConnector(t *testing.T) { assert.Equal(t, want, got) } +func TestConnectorAPIv1_InspectConnector_SendRecord(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + ctrl := gomock.NewController(t) + csMock := apimock.NewConnectorOrchestrator(ctrl) + api := NewConnectorAPIv1(csMock) + + id := uuid.NewString() + rec := generateTestRecord() + recProto, err := toproto.Record(rec) + assert.Ok(t, err) + + records := make(chan record.Record) + + csMock.EXPECT(). + Inspect(ctx, id). + Return(records, nil). + Times(1) + + inspectServer := apimock.NewConnectorService_InspectConnectorServer(ctrl) + inspectServer.EXPECT().Send(gomock.Eq(&apiv1.InspectConnectorResponse{Record: recProto})) + inspectServer.EXPECT().Context().Return(ctx).AnyTimes() + + go func() { + _ = api.InspectConnector( + &apiv1.InspectConnectorRequest{Id: id}, + inspectServer, + ) + }() + records <- rec + + time.Sleep(100 * time.Millisecond) +} + +func TestConnectorAPIv1_InspectConnector_SendErr(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + ctrl := gomock.NewController(t) + csMock := apimock.NewConnectorOrchestrator(ctrl) + api := NewConnectorAPIv1(csMock) + id := uuid.NewString() + records := make(chan record.Record) + + csMock.EXPECT(). + Inspect(ctx, id). + Return(records, nil). + Times(1) + + inspectServer := apimock.NewConnectorService_InspectConnectorServer(ctrl) + inspectServer.EXPECT().Context().Return(ctx).AnyTimes() + errSend := cerrors.New("I'm sorry, but no.") + inspectServer.EXPECT().Send(gomock.Any()).Return(errSend) + + errC := make(chan error) + go func() { + err := api.InspectConnector( + &apiv1.InspectConnectorRequest{Id: id}, + inspectServer, + ) + errC <- err + }() + records <- generateTestRecord() + + err, b, err2 := cchan.Chan[error](errC).RecvTimeout(context.Background(), 100*time.Millisecond) + assert.Ok(t, err2) + assert.True(t, b, "expected to receive an error") + assert.True(t, cerrors.Is(err, errSend), "expected 'I'm sorry, but no.' error") +} + +func TestConnectorAPIv1_InspectConnector_Err(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + ctrl := gomock.NewController(t) + csMock := apimock.NewConnectorOrchestrator(ctrl) + api := NewConnectorAPIv1(csMock) + id := uuid.NewString() + err := cerrors.New("not found, sorry") + + csMock.EXPECT(). + Inspect(ctx, gomock.Any()). + Return(nil, err). + Times(1) + + inspectServer := apimock.NewConnectorService_InspectConnectorServer(ctrl) + inspectServer.EXPECT().Context().Return(ctx).AnyTimes() + + errAPI := api.InspectConnector( + &apiv1.InspectConnectorRequest{Id: id}, + inspectServer, + ) + assert.NotNil(t, errAPI) + assert.Equal( + t, + fmt.Sprintf( + "rpc error: code = Internal desc = failed to get connector by ID %v: not found, sorry", + id, + ), + errAPI.Error(), + ) +} + +func generateTestRecord() record.Record { + return record.Record{ + Position: record.Position("test-position"), + Operation: record.OperationCreate, + Metadata: record.Metadata{"metadata-key": "metadata-value"}, + Key: record.RawData{Raw: []byte("test-key")}, + Payload: record.Change{ + After: record.RawData{Raw: []byte("test-payload")}, + }, + } +} + func TestConnectorAPIv1_GetConnector(t *testing.T) { ctx := context.Background() ctrl := gomock.NewController(t) diff --git a/pkg/web/api/mock/connector.go b/pkg/web/api/mock/connector.go index fade1a74e..f9e559ba9 100644 --- a/pkg/web/api/mock/connector.go +++ b/pkg/web/api/mock/connector.go @@ -9,6 +9,7 @@ import ( reflect "reflect" connector "github.com/conduitio/conduit/pkg/connector" + record "github.com/conduitio/conduit/pkg/record" gomock "github.com/golang/mock/gomock" ) @@ -79,6 +80,21 @@ func (mr *ConnectorOrchestratorMockRecorder) Get(arg0, arg1 interface{}) *gomock return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Get", reflect.TypeOf((*ConnectorOrchestrator)(nil).Get), arg0, arg1) } +// Inspect mocks base method. +func (m *ConnectorOrchestrator) Inspect(arg0 context.Context, arg1 string) (chan record.Record, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Inspect", arg0, arg1) + ret0, _ := ret[0].(chan record.Record) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Inspect indicates an expected call of Inspect. +func (mr *ConnectorOrchestratorMockRecorder) Inspect(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Inspect", reflect.TypeOf((*ConnectorOrchestrator)(nil).Inspect), arg0, arg1) +} + // List mocks base method. func (m *ConnectorOrchestrator) List(arg0 context.Context) map[string]connector.Connector { m.ctrl.T.Helper() diff --git a/pkg/web/api/mock/connector_service.go b/pkg/web/api/mock/connector_service.go new file mode 100644 index 000000000..fc296e0e5 --- /dev/null +++ b/pkg/web/api/mock/connector_service.go @@ -0,0 +1,133 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: github.com/conduitio/conduit/proto/gen/api/v1 (interfaces: ConnectorService_InspectConnectorServer) + +// Package mock is a generated GoMock package. +package mock + +import ( + context "context" + reflect "reflect" + + apiv1 "github.com/conduitio/conduit/proto/gen/api/v1" + gomock "github.com/golang/mock/gomock" + metadata "google.golang.org/grpc/metadata" +) + +// ConnectorService_InspectConnectorServer is a mock of ConnectorService_InspectConnectorServer interface. +type ConnectorService_InspectConnectorServer struct { + ctrl *gomock.Controller + recorder *ConnectorService_InspectConnectorServerMockRecorder +} + +// ConnectorService_InspectConnectorServerMockRecorder is the mock recorder for ConnectorService_InspectConnectorServer. +type ConnectorService_InspectConnectorServerMockRecorder struct { + mock *ConnectorService_InspectConnectorServer +} + +// NewConnectorService_InspectConnectorServer creates a new mock instance. +func NewConnectorService_InspectConnectorServer(ctrl *gomock.Controller) *ConnectorService_InspectConnectorServer { + mock := &ConnectorService_InspectConnectorServer{ctrl: ctrl} + mock.recorder = &ConnectorService_InspectConnectorServerMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *ConnectorService_InspectConnectorServer) EXPECT() *ConnectorService_InspectConnectorServerMockRecorder { + return m.recorder +} + +// Context mocks base method. +func (m *ConnectorService_InspectConnectorServer) Context() context.Context { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Context") + ret0, _ := ret[0].(context.Context) + return ret0 +} + +// Context indicates an expected call of Context. +func (mr *ConnectorService_InspectConnectorServerMockRecorder) Context() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Context", reflect.TypeOf((*ConnectorService_InspectConnectorServer)(nil).Context)) +} + +// RecvMsg mocks base method. +func (m *ConnectorService_InspectConnectorServer) RecvMsg(arg0 interface{}) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "RecvMsg", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// RecvMsg indicates an expected call of RecvMsg. +func (mr *ConnectorService_InspectConnectorServerMockRecorder) RecvMsg(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RecvMsg", reflect.TypeOf((*ConnectorService_InspectConnectorServer)(nil).RecvMsg), arg0) +} + +// Send mocks base method. +func (m *ConnectorService_InspectConnectorServer) Send(arg0 *apiv1.InspectConnectorResponse) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Send", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// Send indicates an expected call of Send. +func (mr *ConnectorService_InspectConnectorServerMockRecorder) Send(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Send", reflect.TypeOf((*ConnectorService_InspectConnectorServer)(nil).Send), arg0) +} + +// SendHeader mocks base method. +func (m *ConnectorService_InspectConnectorServer) SendHeader(arg0 metadata.MD) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SendHeader", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// SendHeader indicates an expected call of SendHeader. +func (mr *ConnectorService_InspectConnectorServerMockRecorder) SendHeader(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SendHeader", reflect.TypeOf((*ConnectorService_InspectConnectorServer)(nil).SendHeader), arg0) +} + +// SendMsg mocks base method. +func (m *ConnectorService_InspectConnectorServer) SendMsg(arg0 interface{}) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SendMsg", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// SendMsg indicates an expected call of SendMsg. +func (mr *ConnectorService_InspectConnectorServerMockRecorder) SendMsg(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SendMsg", reflect.TypeOf((*ConnectorService_InspectConnectorServer)(nil).SendMsg), arg0) +} + +// SetHeader mocks base method. +func (m *ConnectorService_InspectConnectorServer) SetHeader(arg0 metadata.MD) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SetHeader", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// SetHeader indicates an expected call of SetHeader. +func (mr *ConnectorService_InspectConnectorServerMockRecorder) SetHeader(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetHeader", reflect.TypeOf((*ConnectorService_InspectConnectorServer)(nil).SetHeader), arg0) +} + +// SetTrailer mocks base method. +func (m *ConnectorService_InspectConnectorServer) SetTrailer(arg0 metadata.MD) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "SetTrailer", arg0) +} + +// SetTrailer indicates an expected call of SetTrailer. +func (mr *ConnectorService_InspectConnectorServerMockRecorder) SetTrailer(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetTrailer", reflect.TypeOf((*ConnectorService_InspectConnectorServer)(nil).SetTrailer), arg0) +} diff --git a/pkg/web/api/toproto/record.go b/pkg/web/api/toproto/record.go new file mode 100644 index 000000000..e1a2b4a15 --- /dev/null +++ b/pkg/web/api/toproto/record.go @@ -0,0 +1,98 @@ +// Copyright © 2022 Meroxa, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package toproto + +import ( + "fmt" + + "github.com/conduitio/conduit/pkg/foundation/cerrors" + "github.com/conduitio/conduit/pkg/record" + opencdcv1 "go.buf.build/grpc/go/conduitio/conduit-connector-protocol/opencdc/v1" + "google.golang.org/protobuf/types/known/structpb" +) + +func _() { + // An "invalid array index" compiler error signifies that the constant values have changed. + var cTypes [1]struct{} + _ = cTypes[int(record.OperationCreate)-int(opencdcv1.Operation_OPERATION_CREATE)] + _ = cTypes[int(record.OperationUpdate)-int(opencdcv1.Operation_OPERATION_UPDATE)] + _ = cTypes[int(record.OperationDelete)-int(opencdcv1.Operation_OPERATION_DELETE)] + _ = cTypes[int(record.OperationSnapshot)-int(opencdcv1.Operation_OPERATION_SNAPSHOT)] +} + +func Record(in record.Record) (*opencdcv1.Record, error) { + key, err := Data(in.Key) + if err != nil { + return nil, err + } + payload, err := Change(in.Payload) + if err != nil { + return nil, err + } + + out := &opencdcv1.Record{ + Position: in.Position, + Operation: opencdcv1.Operation(in.Operation), + Metadata: in.Metadata, + Key: key, + Payload: payload, + } + return out, nil +} + +func Change(in record.Change) (*opencdcv1.Change, error) { + before, err := Data(in.Before) + if err != nil { + return nil, fmt.Errorf("error converting before: %w", err) + } + + after, err := Data(in.After) + if err != nil { + return nil, fmt.Errorf("error converting after: %w", err) + } + + out := opencdcv1.Change{ + Before: before, + After: after, + } + return &out, nil +} + +func Data(in record.Data) (*opencdcv1.Data, error) { + if in == nil { + return nil, nil + } + + switch v := in.(type) { + case record.RawData: + return &opencdcv1.Data{ + Data: &opencdcv1.Data_RawData{ + RawData: v.Raw, + }, + }, nil + case record.StructuredData: + data, err := structpb.NewStruct(v) + if err != nil { + return nil, err + } + return &opencdcv1.Data{ + Data: &opencdcv1.Data_StructuredData{ + StructuredData: data, + }, + }, nil + default: + return nil, cerrors.Errorf("invalid Data type '%T'", in) + } +} From ae1408f319ac44adebb6d0d9a3e1dc4821b1d3b5 Mon Sep 17 00:00:00 2001 From: Haris Osmanagic Date: Mon, 21 Nov 2022 14:05:46 +0100 Subject: [PATCH 02/12] linter --- pkg/web/api/connector_v1.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/web/api/connector_v1.go b/pkg/web/api/connector_v1.go index ba56fe56f..a43cc851f 100644 --- a/pkg/web/api/connector_v1.go +++ b/pkg/web/api/connector_v1.go @@ -18,6 +18,7 @@ package api import ( "context" + "github.com/conduitio/conduit/pkg/connector" "github.com/conduitio/conduit/pkg/foundation/cerrors" "github.com/conduitio/conduit/pkg/record" From 1a944582e6ebad730221383f8b8a1bbb9c93a6e6 Mon Sep 17 00:00:00 2001 From: Haris Osmanagic Date: Mon, 21 Nov 2022 14:06:27 +0100 Subject: [PATCH 03/12] linter --- pkg/web/api/connector_v1.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/web/api/connector_v1.go b/pkg/web/api/connector_v1.go index a43cc851f..51ef7e2f5 100644 --- a/pkg/web/api/connector_v1.go +++ b/pkg/web/api/connector_v1.go @@ -18,7 +18,7 @@ package api import ( "context" - + "github.com/conduitio/conduit/pkg/connector" "github.com/conduitio/conduit/pkg/foundation/cerrors" "github.com/conduitio/conduit/pkg/record" From 3a0b31bd20f3d4ded935e656f74419f72ca97248 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Haris=20Osmanagi=C4=87?= Date: Tue, 22 Nov 2022 10:58:40 +0100 Subject: [PATCH 04/12] Update pkg/inspector/inspector_test.go MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Lovro Mažgon --- pkg/inspector/inspector_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/inspector/inspector_test.go b/pkg/inspector/inspector_test.go index 63e6576c9..e8e780597 100644 --- a/pkg/inspector/inspector_test.go +++ b/pkg/inspector/inspector_test.go @@ -68,7 +68,7 @@ func TestInspector_Send_SessionClosed(t *testing.T) { Position: record.Position("test-pos"), } underTest.Send(context.Background(), r) - assertGotRecord(is.New(t), s, r) + assertGotRecord(is, s, r) s.Close() underTest.Send( From e7b3306af1a1b6815746d1b6c950cde39b5facad Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Haris=20Osmanagi=C4=87?= Date: Tue, 22 Nov 2022 11:08:17 +0100 Subject: [PATCH 05/12] Update pkg/inspector/inspector_test.go MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Lovro Mažgon --- pkg/inspector/inspector_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/inspector/inspector_test.go b/pkg/inspector/inspector_test.go index e8e780597..9106ba45d 100644 --- a/pkg/inspector/inspector_test.go +++ b/pkg/inspector/inspector_test.go @@ -90,7 +90,7 @@ func TestInspector_Send_SessionCtxCanceled(t *testing.T) { Position: record.Position("test-pos"), } underTest.Send(context.Background(), r) - assertGotRecord(is.New(t), s, r) + assertGotRecord(is, s, r) cancel() From 5067241b30c6168e6bce63a8983b813c202f3e72 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Haris=20Osmanagi=C4=87?= Date: Tue, 22 Nov 2022 11:08:57 +0100 Subject: [PATCH 06/12] Update pkg/inspector/inspector_test.go MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Lovro Mažgon --- pkg/inspector/inspector_test.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pkg/inspector/inspector_test.go b/pkg/inspector/inspector_test.go index 9106ba45d..9e8afa52e 100644 --- a/pkg/inspector/inspector_test.go +++ b/pkg/inspector/inspector_test.go @@ -94,7 +94,8 @@ func TestInspector_Send_SessionCtxCanceled(t *testing.T) { cancel() - _, got, _ := cchan.Chan[record.Record](s.C).RecvTimeout(context.Background(), 100*time.Millisecond) + _, got, err := cchan.Chan[record.Record](s.C).RecvTimeout(context.Background(), 100*time.Millisecond) + is.NoError(err) is.True(!got) // expected no record } From 8f40c222cfe965a7f4639df6a4de266836194b50 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Haris=20Osmanagi=C4=87?= Date: Tue, 22 Nov 2022 11:18:58 +0100 Subject: [PATCH 07/12] Update pkg/web/api/connector_v1_test.go MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Lovro Mažgon --- pkg/web/api/connector_v1_test.go | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/pkg/web/api/connector_v1_test.go b/pkg/web/api/connector_v1_test.go index 5d82f3899..0b296fb8a 100644 --- a/pkg/web/api/connector_v1_test.go +++ b/pkg/web/api/connector_v1_test.go @@ -259,15 +259,13 @@ func TestConnectorAPIv1_InspectConnector_SendErr(t *testing.T) { errSend := cerrors.New("I'm sorry, but no.") inspectServer.EXPECT().Send(gomock.Any()).Return(errSend) - errC := make(chan error) go func() { - err := api.InspectConnector( - &apiv1.InspectConnectorRequest{Id: id}, - inspectServer, - ) - errC <- err + records <- generateTestRecord() }() - records <- generateTestRecord() + err := api.InspectConnector( + &apiv1.InspectConnectorRequest{Id: id}, + inspectServer, + ) err, b, err2 := cchan.Chan[error](errC).RecvTimeout(context.Background(), 100*time.Millisecond) assert.Ok(t, err2) From 7865b60463dae19df0261b6abab4109eacdfffe7 Mon Sep 17 00:00:00 2001 From: Haris Osmanagic Date: Tue, 22 Nov 2022 11:26:07 +0100 Subject: [PATCH 08/12] pr feedback 2 --- pkg/foundation/log/fields.go | 2 ++ pkg/inspector/inspector.go | 17 ++++++++++------- pkg/inspector/inspector_test.go | 2 +- pkg/orchestrator/connectors.go | 2 +- pkg/web/api/connector_v1.go | 2 +- 5 files changed, 15 insertions(+), 10 deletions(-) diff --git a/pkg/foundation/log/fields.go b/pkg/foundation/log/fields.go index 432406381..e70c7fc96 100644 --- a/pkg/foundation/log/fields.go +++ b/pkg/foundation/log/fields.go @@ -32,4 +32,6 @@ const ( PluginTypeField = "plugin_type" PluginNameField = "plugin_name" PluginPathField = "plugin_path" + + InspectorSessionID = "inspector_session_id" ) diff --git a/pkg/inspector/inspector.go b/pkg/inspector/inspector.go index b00757e9a..5b729b193 100644 --- a/pkg/inspector/inspector.go +++ b/pkg/inspector/inspector.go @@ -29,16 +29,17 @@ import ( type Session struct { C chan record.Record + id string logger log.CtxLogger onClose func() } -func (s *Session) Close() { +func (s *Session) close() { s.onClose() close(s.C) } -// send sends a record to the session's channel. +// send a record to the session's channel. // If the channel has already reached its capacity, // the record will be ignored. func (s *Session) send(ctx context.Context, r record.Record) { @@ -47,6 +48,7 @@ func (s *Session) send(ctx context.Context, r record.Record) { default: s.logger. Warn(ctx). + Str(log.InspectorSessionID, s.id). Msg("session buffer full, record will be dropped") } } @@ -73,7 +75,7 @@ func New(logger log.CtxLogger, bufferSize int) *Inspector { } } -// Send sends the given record to all registered sessions. +// Send the given record to all registered sessions. // The method does not wait for consumers to get the records. func (i *Inspector) Send(ctx context.Context, r record.Record) { // copy metadata, to prevent issues when concurrently accessing the metadata @@ -105,6 +107,7 @@ func (i *Inspector) NewSession(ctx context.Context) *Session { id := uuid.NewString() s := &Session{ C: make(chan record.Record, i.bufferSize), + id: id, logger: i.logger.WithComponent("inspector.Session"), onClose: func() { i.remove(id) @@ -115,7 +118,7 @@ func (i *Inspector) NewSession(ctx context.Context) *Session { s.logger. Info(context.Background()). Msgf("context canceled: %v", ctx.Err()) - s.Close() + s.close() }() i.lock.Lock() @@ -124,12 +127,12 @@ func (i *Inspector) NewSession(ctx context.Context) *Session { i.sessions[id] = s i.logger. Info(context.Background()). - Str("session_id", id). + Str(log.InspectorSessionID, id). Msg("session created") return s } -// remove removes a session with given ID from this Inspector. +// remove a session with given ID from this Inspector. func (i *Inspector) remove(id string) { i.lock.Lock() defer i.lock.Unlock() @@ -137,6 +140,6 @@ func (i *Inspector) remove(id string) { delete(i.sessions, id) i.logger. Info(context.Background()). - Str("session_id", id). + Str(log.InspectorSessionID, id). Msg("session removed") } diff --git a/pkg/inspector/inspector_test.go b/pkg/inspector/inspector_test.go index e8e780597..986e58f8d 100644 --- a/pkg/inspector/inspector_test.go +++ b/pkg/inspector/inspector_test.go @@ -70,7 +70,7 @@ func TestInspector_Send_SessionClosed(t *testing.T) { underTest.Send(context.Background(), r) assertGotRecord(is, s, r) - s.Close() + s.close() underTest.Send( context.Background(), record.Record{ diff --git a/pkg/orchestrator/connectors.go b/pkg/orchestrator/connectors.go index 9c48745d8..51d5d68c1 100644 --- a/pkg/orchestrator/connectors.go +++ b/pkg/orchestrator/connectors.go @@ -30,7 +30,7 @@ type ConnectorOrchestrator base func (c *ConnectorOrchestrator) Inspect(ctx context.Context, id string) (chan record.Record, error) { conn, err := c.Get(ctx, id) if err != nil { - return nil, cerrors.Errorf("failed to get connector by ID %v: %w", id, err) + return nil, err } return conn.Inspect(ctx).C, nil diff --git a/pkg/web/api/connector_v1.go b/pkg/web/api/connector_v1.go index 51ef7e2f5..c1de022d9 100644 --- a/pkg/web/api/connector_v1.go +++ b/pkg/web/api/connector_v1.go @@ -75,7 +75,7 @@ func (c *ConnectorAPIv1) InspectConnector(req *apiv1.InspectConnectorRequest, se records, err := c.cs.Inspect(server.Context(), req.Id) if err != nil { - return status.ConnectorError(cerrors.Errorf("failed to get connector by ID %v: %w", req.Id, err)) + return status.ConnectorError(cerrors.Errorf("failed to get connector: %w", err)) } for rec := range records { From dd8b1b48c8a2c721989a349dfa1168396c460cf5 Mon Sep 17 00:00:00 2001 From: Haris Osmanagic Date: Tue, 22 Nov 2022 11:28:27 +0100 Subject: [PATCH 09/12] linter --- pkg/web/api/connector_v1_test.go | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/pkg/web/api/connector_v1_test.go b/pkg/web/api/connector_v1_test.go index 5d82f3899..d27beeb01 100644 --- a/pkg/web/api/connector_v1_test.go +++ b/pkg/web/api/connector_v1_test.go @@ -16,7 +16,6 @@ package api import ( "context" - "fmt" "sort" "testing" "time" @@ -299,10 +298,7 @@ func TestConnectorAPIv1_InspectConnector_Err(t *testing.T) { assert.NotNil(t, errAPI) assert.Equal( t, - fmt.Sprintf( - "rpc error: code = Internal desc = failed to get connector by ID %v: not found, sorry", - id, - ), + "rpc error: code = Internal desc = failed to get connector: not found, sorry", errAPI.Error(), ) } From 83c6152588fac5cd55be60b6acb0cfb0ad98c22d Mon Sep 17 00:00:00 2001 From: Haris Osmanagic Date: Tue, 22 Nov 2022 11:34:29 +0100 Subject: [PATCH 10/12] Revert "Update pkg/web/api/connector_v1_test.go" This reverts commit 8f40c222cfe965a7f4639df6a4de266836194b50. --- pkg/web/api/connector_v1_test.go | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/pkg/web/api/connector_v1_test.go b/pkg/web/api/connector_v1_test.go index d37243b71..d27beeb01 100644 --- a/pkg/web/api/connector_v1_test.go +++ b/pkg/web/api/connector_v1_test.go @@ -258,13 +258,15 @@ func TestConnectorAPIv1_InspectConnector_SendErr(t *testing.T) { errSend := cerrors.New("I'm sorry, but no.") inspectServer.EXPECT().Send(gomock.Any()).Return(errSend) + errC := make(chan error) go func() { - records <- generateTestRecord() + err := api.InspectConnector( + &apiv1.InspectConnectorRequest{Id: id}, + inspectServer, + ) + errC <- err }() - err := api.InspectConnector( - &apiv1.InspectConnectorRequest{Id: id}, - inspectServer, - ) + records <- generateTestRecord() err, b, err2 := cchan.Chan[error](errC).RecvTimeout(context.Background(), 100*time.Millisecond) assert.Ok(t, err2) From 623ca25cb7db84be1ecc4689f0c631e9ad20ab3e Mon Sep 17 00:00:00 2001 From: Haris Osmanagic Date: Tue, 22 Nov 2022 12:10:43 +0100 Subject: [PATCH 11/12] pr feedback 3 --- pkg/inspector/inspector.go | 21 ++++++++++++++++----- pkg/inspector/inspector_test.go | 4 ++-- pkg/orchestrator/connectors.go | 6 +++--- pkg/web/api/connector_v1.go | 10 +++++----- pkg/web/api/connector_v1_test.go | 17 +++++++++++------ pkg/web/api/mock/connector.go | 6 +++--- 6 files changed, 40 insertions(+), 24 deletions(-) diff --git a/pkg/inspector/inspector.go b/pkg/inspector/inspector.go index 5b729b193..af0de6305 100644 --- a/pkg/inspector/inspector.go +++ b/pkg/inspector/inspector.go @@ -29,14 +29,25 @@ import ( type Session struct { C chan record.Record - id string - logger log.CtxLogger - onClose func() + id string + logger log.CtxLogger + onClose func() + closeReason error + lock sync.Mutex } -func (s *Session) close() { +func (s *Session) close(reason error) { s.onClose() close(s.C) + s.lock.Lock() + defer s.lock.Unlock() + s.closeReason = reason +} + +func (s *Session) CloseReason() error { + s.lock.Lock() + defer s.lock.Unlock() + return s.closeReason } // send a record to the session's channel. @@ -118,7 +129,7 @@ func (i *Inspector) NewSession(ctx context.Context) *Session { s.logger. Info(context.Background()). Msgf("context canceled: %v", ctx.Err()) - s.close() + s.close(ctx.Err()) }() i.lock.Lock() diff --git a/pkg/inspector/inspector_test.go b/pkg/inspector/inspector_test.go index 89a072f08..30631d1fd 100644 --- a/pkg/inspector/inspector_test.go +++ b/pkg/inspector/inspector_test.go @@ -70,7 +70,7 @@ func TestInspector_Send_SessionClosed(t *testing.T) { underTest.Send(context.Background(), r) assertGotRecord(is, s, r) - s.close() + s.close(nil) underTest.Send( context.Background(), record.Record{ @@ -95,7 +95,7 @@ func TestInspector_Send_SessionCtxCanceled(t *testing.T) { cancel() _, got, err := cchan.Chan[record.Record](s.C).RecvTimeout(context.Background(), 100*time.Millisecond) - is.NoError(err) + is.NoErr(err) is.True(!got) // expected no record } diff --git a/pkg/orchestrator/connectors.go b/pkg/orchestrator/connectors.go index 51d5d68c1..e2cdb1c1c 100644 --- a/pkg/orchestrator/connectors.go +++ b/pkg/orchestrator/connectors.go @@ -20,20 +20,20 @@ import ( "github.com/conduitio/conduit/pkg/connector" "github.com/conduitio/conduit/pkg/foundation/cerrors" "github.com/conduitio/conduit/pkg/foundation/rollback" + "github.com/conduitio/conduit/pkg/inspector" "github.com/conduitio/conduit/pkg/pipeline" - "github.com/conduitio/conduit/pkg/record" "github.com/google/uuid" ) type ConnectorOrchestrator base -func (c *ConnectorOrchestrator) Inspect(ctx context.Context, id string) (chan record.Record, error) { +func (c *ConnectorOrchestrator) Inspect(ctx context.Context, id string) (*inspector.Session, error) { conn, err := c.Get(ctx, id) if err != nil { return nil, err } - return conn.Inspect(ctx).C, nil + return conn.Inspect(ctx), nil } func (c *ConnectorOrchestrator) Create( diff --git a/pkg/web/api/connector_v1.go b/pkg/web/api/connector_v1.go index c1de022d9..677d66d5d 100644 --- a/pkg/web/api/connector_v1.go +++ b/pkg/web/api/connector_v1.go @@ -21,7 +21,7 @@ import ( "github.com/conduitio/conduit/pkg/connector" "github.com/conduitio/conduit/pkg/foundation/cerrors" - "github.com/conduitio/conduit/pkg/record" + "github.com/conduitio/conduit/pkg/inspector" "github.com/conduitio/conduit/pkg/web/api/fromproto" "github.com/conduitio/conduit/pkg/web/api/status" "github.com/conduitio/conduit/pkg/web/api/toproto" @@ -36,7 +36,7 @@ type ConnectorOrchestrator interface { Delete(ctx context.Context, id string) error Update(ctx context.Context, id string, config connector.Config) (connector.Connector, error) Validate(ctx context.Context, t connector.Type, config connector.Config) error - Inspect(ctx context.Context, id string) (chan record.Record, error) + Inspect(ctx context.Context, id string) (*inspector.Session, error) } type ConnectorAPIv1 struct { @@ -73,12 +73,12 @@ func (c *ConnectorAPIv1) InspectConnector(req *apiv1.InspectConnectorRequest, se return status.ConnectorError(cerrors.ErrEmptyID) } - records, err := c.cs.Inspect(server.Context(), req.Id) + session, err := c.cs.Inspect(server.Context(), req.Id) if err != nil { return status.ConnectorError(cerrors.Errorf("failed to get connector: %w", err)) } - for rec := range records { + for rec := range session.C { recProto, err2 := toproto.Record(rec) if err2 != nil { return cerrors.Errorf("failed converting record: %w", err2) @@ -92,7 +92,7 @@ func (c *ConnectorAPIv1) InspectConnector(req *apiv1.InspectConnectorRequest, se } } - return cerrors.New("records channel closed") + return cerrors.Errorf("inspector session closed: %w", session.CloseReason()) } // GetConnector returns a single Connector proto response or an error. diff --git a/pkg/web/api/connector_v1_test.go b/pkg/web/api/connector_v1_test.go index d27beeb01..4e7ebeb5c 100644 --- a/pkg/web/api/connector_v1_test.go +++ b/pkg/web/api/connector_v1_test.go @@ -25,6 +25,8 @@ import ( "github.com/conduitio/conduit/pkg/foundation/assert" "github.com/conduitio/conduit/pkg/foundation/cchan" "github.com/conduitio/conduit/pkg/foundation/cerrors" + "github.com/conduitio/conduit/pkg/foundation/log" + "github.com/conduitio/conduit/pkg/inspector" "github.com/conduitio/conduit/pkg/record" apimock "github.com/conduitio/conduit/pkg/web/api/mock" "github.com/conduitio/conduit/pkg/web/api/toproto" @@ -217,11 +219,12 @@ func TestConnectorAPIv1_InspectConnector_SendRecord(t *testing.T) { recProto, err := toproto.Record(rec) assert.Ok(t, err) - records := make(chan record.Record) + ins := inspector.New(log.Nop(), 10) + session := ins.NewSession(ctx) csMock.EXPECT(). Inspect(ctx, id). - Return(records, nil). + Return(session, nil). Times(1) inspectServer := apimock.NewConnectorService_InspectConnectorServer(ctrl) @@ -234,7 +237,7 @@ func TestConnectorAPIv1_InspectConnector_SendRecord(t *testing.T) { inspectServer, ) }() - records <- rec + ins.Send(ctx, rec) time.Sleep(100 * time.Millisecond) } @@ -246,11 +249,13 @@ func TestConnectorAPIv1_InspectConnector_SendErr(t *testing.T) { csMock := apimock.NewConnectorOrchestrator(ctrl) api := NewConnectorAPIv1(csMock) id := uuid.NewString() - records := make(chan record.Record) + + ins := inspector.New(log.Nop(), 10) + session := ins.NewSession(ctx) csMock.EXPECT(). Inspect(ctx, id). - Return(records, nil). + Return(session, nil). Times(1) inspectServer := apimock.NewConnectorService_InspectConnectorServer(ctrl) @@ -266,7 +271,7 @@ func TestConnectorAPIv1_InspectConnector_SendErr(t *testing.T) { ) errC <- err }() - records <- generateTestRecord() + ins.Send(ctx, generateTestRecord()) err, b, err2 := cchan.Chan[error](errC).RecvTimeout(context.Background(), 100*time.Millisecond) assert.Ok(t, err2) diff --git a/pkg/web/api/mock/connector.go b/pkg/web/api/mock/connector.go index f9e559ba9..5b0dee76c 100644 --- a/pkg/web/api/mock/connector.go +++ b/pkg/web/api/mock/connector.go @@ -9,7 +9,7 @@ import ( reflect "reflect" connector "github.com/conduitio/conduit/pkg/connector" - record "github.com/conduitio/conduit/pkg/record" + inspector "github.com/conduitio/conduit/pkg/inspector" gomock "github.com/golang/mock/gomock" ) @@ -81,10 +81,10 @@ func (mr *ConnectorOrchestratorMockRecorder) Get(arg0, arg1 interface{}) *gomock } // Inspect mocks base method. -func (m *ConnectorOrchestrator) Inspect(arg0 context.Context, arg1 string) (chan record.Record, error) { +func (m *ConnectorOrchestrator) Inspect(arg0 context.Context, arg1 string) (*inspector.Session, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "Inspect", arg0, arg1) - ret0, _ := ret[0].(chan record.Record) + ret0, _ := ret[0].(*inspector.Session) ret1, _ := ret[1].(error) return ret0, ret1 } From f485aa5eda6bac1f5d4e7e2f3f4071f283c1b4c4 Mon Sep 17 00:00:00 2001 From: Haris Osmanagic Date: Wed, 30 Nov 2022 13:13:21 +0100 Subject: [PATCH 12/12] remove field --- pkg/inspector/inspector.go | 21 +++++---------------- pkg/inspector/inspector_test.go | 2 +- pkg/web/api/connector_v1.go | 2 +- 3 files changed, 7 insertions(+), 18 deletions(-) diff --git a/pkg/inspector/inspector.go b/pkg/inspector/inspector.go index af0de6305..5b729b193 100644 --- a/pkg/inspector/inspector.go +++ b/pkg/inspector/inspector.go @@ -29,25 +29,14 @@ import ( type Session struct { C chan record.Record - id string - logger log.CtxLogger - onClose func() - closeReason error - lock sync.Mutex + id string + logger log.CtxLogger + onClose func() } -func (s *Session) close(reason error) { +func (s *Session) close() { s.onClose() close(s.C) - s.lock.Lock() - defer s.lock.Unlock() - s.closeReason = reason -} - -func (s *Session) CloseReason() error { - s.lock.Lock() - defer s.lock.Unlock() - return s.closeReason } // send a record to the session's channel. @@ -129,7 +118,7 @@ func (i *Inspector) NewSession(ctx context.Context) *Session { s.logger. Info(context.Background()). Msgf("context canceled: %v", ctx.Err()) - s.close(ctx.Err()) + s.close() }() i.lock.Lock() diff --git a/pkg/inspector/inspector_test.go b/pkg/inspector/inspector_test.go index 30631d1fd..328bbb104 100644 --- a/pkg/inspector/inspector_test.go +++ b/pkg/inspector/inspector_test.go @@ -70,7 +70,7 @@ func TestInspector_Send_SessionClosed(t *testing.T) { underTest.Send(context.Background(), r) assertGotRecord(is, s, r) - s.close(nil) + s.close() underTest.Send( context.Background(), record.Record{ diff --git a/pkg/web/api/connector_v1.go b/pkg/web/api/connector_v1.go index 677d66d5d..68d3c1be3 100644 --- a/pkg/web/api/connector_v1.go +++ b/pkg/web/api/connector_v1.go @@ -92,7 +92,7 @@ func (c *ConnectorAPIv1) InspectConnector(req *apiv1.InspectConnectorRequest, se } } - return cerrors.Errorf("inspector session closed: %w", session.CloseReason()) + return cerrors.New("inspector session closed") } // GetConnector returns a single Connector proto response or an error.