This package looks promising. The concept of an ErrorSink is exactly what I was looking for. In my scenario I have a pipeline with multiple stages, two of which need to make network calls. Some errors are fatal, and the pipeline has to be canceled if one of them occurs. When I call ErrorSink.Fatal() in more than one stage, I get a panic:
panic: send on closed channel

goroutine 7 [running]:
github.com/splunk/pipelines.(*ErrorSink).Fatal(...)
	/home/ron/go/pkg/mod/github.com/splunk/pipelines@v1.0.2/pipelines.go:567
main.main.func1({0x4adc11, 0x1})
	/home/ron/go/src/bitbucket.org/innius/shared-script-pipeline/main.go:18 +0xdb
github.com/splunk/pipelines.doMap[...]({0x4cf300?, 0xc00002e080}, 0xc000070180, 0xc000014290, 0xc00002a180)
	/home/ron/go/pkg/mod/github.com/splunk/pipelines@v1.0.2/pipelines.go:78 +0x124
github.com/splunk/pipelines.Map[...].func1({0xc000012030?, 0x0?, 0x0?})
	/home/ron/go/pkg/mod/github.com/splunk/pipelines@v1.0.2/pipelines.go:62 +0x4c
github.com/splunk/pipelines.doWithConf[...].func1()
	/home/ron/go/pkg/mod/github.com/splunk/pipelines@v1.0.2/pipelines.go:444 +0x98
created by github.com/splunk/pipelines.doWithConf[...]
	/home/ron/go/pkg/mod/github.com/splunk/pipelines@v1.0.2/pipelines.go:436 +0x2ea
exit status 2
My test program looks like this:
package main

import (
	"context"
	"fmt"

	"github.com/splunk/pipelines"
)

func main() {
	ctx := context.Background()
	input := pipelines.Chan([]string{"a", "b", "c", "d", "e", "f"})
	ctx, errs := pipelines.NewErrorSink(ctx)

	// Stage 1 reports a fatal error for item "c".
	stage1 := pipelines.Map(ctx, input, func(item string) string {
		if item == "c" {
			errs.Fatal(fmt.Errorf("%s caused an error at stage 1", item))
		}
		return item
	})

	// Stage 2 reports a fatal error for item "b".
	stage2 := pipelines.Map(ctx, stage1, func(item string) string {
		if item == "b" {
			errs.Fatal(fmt.Errorf("%s caused an error at stage 2", item))
		}
		return item
	})

	pipelines.Drain(ctx, stage2)
}
Did I miss something, or is this something that should work?