Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 40 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ go-taskflow is a general-purpose task-parallel programming framework for Go, ins
| ![](image/simple.svg) | ![](image/subflow.svg) | ![](image/condition.svg) | ![](image/loop.svg) |

- **Priority Task Scheduling**: Assign task priorities to ensure higher-priority tasks are executed first.
- **Built-in Visualization and Profiling Tools**: Generate visual representations of tasks and profile task execution performance using integrated tools, simplifying debugging and optimization.
- **Built-in Visualization, Profiling, and Tracing**: Generate visual representations of tasks, profile task execution with flamegraphs, and capture Chrome Trace events for timeline analysis.

## Use Cases

Expand All @@ -40,8 +40,11 @@ Import the latest version of go-taskflow using:
```bash
go get -u github.com/noneback/go-taskflow
```

## Documentation

[DeepWiki Page](https://deepwiki.com/noneback/go-taskflow)

## Example

Below is an example of using go-taskflow to implement a parallel merge sort:
Expand Down Expand Up @@ -118,7 +121,7 @@ func main() {
}
done.Succeed(sortTasks...)

executor := gtf.NewExecutor(1000)
executor := gtf.NewExecutor(1000, gtf.WithProfiler())

executor.Run(tf).Wait()

Expand Down Expand Up @@ -156,6 +159,22 @@ ok github.com/noneback/go-taskflow/benchmark 5.606s

Conditional nodes in go-taskflow behave similarly to those in [taskflow-cpp](https://github.com/taskflow/taskflow). They participate in both conditional control and looping. To avoid common pitfalls, refer to the [Conditional Tasking documentation](https://taskflow.github.io/taskflow/ConditionalTasking.html).

## Executor Options

`NewExecutor` accepts functional options to configure behavior:

```go
executor := gtf.NewExecutor(1000,
gtf.WithProfiler(), // enable flamegraph profiling
gtf.WithTracer(), // enable Chrome Trace recording
)
```

| Option | Description |
|:---|:---|
| `WithProfiler()` | Enable flamegraph profiling. Required before calling `executor.Profile()`. |
| `WithTracer()` | Enable Chrome Trace recording. Required before calling `executor.Trace()`. |

## Error Handling in go-taskflow

In Go, `errors` are values, and it is the user's responsibility to handle them appropriately. Only unrecovered `panic` events are managed by the framework. If a `panic` occurs, the entire parent graph is canceled, leaving the remaining tasks incomplete. This behavior may evolve in the future. If you have suggestions, feel free to share them.
Expand Down Expand Up @@ -189,9 +208,12 @@ The `Dump` method generates raw strings in DOT format. Use the `dot` tool to cre

## Profiling Taskflows

To profile a taskflow, use the `Profile` method:
To profile a taskflow, first enable the profiler with `WithProfiler()`, then call `Profile`:

```go
executor := gtf.NewExecutor(1000, gtf.WithProfiler())
executor.Run(tf).Wait()

if err := executor.Profile(os.Stdout); err != nil {
log.Fatal(err)
}
Expand All @@ -201,6 +223,21 @@ The `Profile` method generates raw strings in flamegraph format. Use the `flameg

![flg](image/fl.svg)

## Tracing Taskflows

To trace a taskflow, first enable the tracer with `WithTracer()`, then call `Trace`:

```go
executor := gtf.NewExecutor(1000, gtf.WithTracer())
executor.Run(tf).Wait()

if err := executor.Trace(os.Stdout); err != nil {
log.Fatal(err)
}
```

The `Trace` method outputs JSON in [Chrome Trace Event format](https://docs.google.com/document/d/1CvAClvFfyA5R-PhYUmn5OOQtYMH4h6I0nSsKchNAySU). Open it in `chrome://tracing` or [Perfetto UI](https://ui.perfetto.dev/) for visualization.

## Stargazer

[![Star History Chart](https://api.star-history.com/svg?repos=noneback/go-taskflow&type=Date)](https://star-history.com/#noneback/go-taskflow&Date)
Expand Down
60 changes: 60 additions & 0 deletions examples/tracing/tracing.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package main

import (
"fmt"
"log"
"os"
"runtime"
"time"

gotaskflow "github.com/noneback/go-taskflow"
)

// This example demonstrates how to capture Chrome Trace Event data
// using WithTracer(). The output can be visualized in chrome://tracing
// or Perfetto UI (https://ui.perfetto.dev/).
func main() {
	// work builds a task body that prints its name and then sleeps
	// for d, simulating a unit of work of a known duration.
	work := func(name string, d time.Duration) func() {
		return func() {
			fmt.Println(name)
			time.Sleep(d)
		}
	}

	exec := gotaskflow.NewExecutor(uint(runtime.NumCPU()*4), gotaskflow.WithTracer())
	flow := gotaskflow.NewTaskFlow("tracing-demo")

	// Stage 1: "fetch" and "load" have no dependencies and run in parallel.
	fetchTask := flow.NewTask("fetch", work("fetch", 10*time.Millisecond))
	loadTask := flow.NewTask("load", work("load", 8*time.Millisecond))

	// Stage 2: "process" is a subflow gated on both stage-1 tasks;
	// inside it, "transform" must finish before "enrich" starts.
	processFlow := flow.NewSubflow("process", func(sf *gotaskflow.Subflow) {
		transformTask := sf.NewTask("transform", work("transform", 5*time.Millisecond))
		enrichTask := sf.NewTask("enrich", work("enrich", 4*time.Millisecond))
		transformTask.Precede(enrichTask)
	})

	// Stage 3: "output" runs only after the whole subflow completes.
	outputTask := flow.NewTask("output", work("output", 3*time.Millisecond))

	fetchTask.Precede(processFlow)
	loadTask.Precede(processFlow)
	processFlow.Precede(outputTask)

	exec.Run(flow).Wait()

	fmt.Println("--- Chrome Trace JSON (open in chrome://tracing or https://ui.perfetto.dev) ---")
	if err := exec.Trace(os.Stdout); err != nil {
		log.Fatal(err)
	}
}
4 changes: 2 additions & 2 deletions examples/word_count/word_count.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
gotaskflow "github.com/noneback/go-taskflow"
)

// 配置参数结构体
// MRConfig holds configuration parameters for the MapReduce word count job.
type MRConfig struct {
NumMappers int
NumReducers int
Expand Down Expand Up @@ -255,7 +255,7 @@ Chapter 8, Streaming SQL, which investigates the meaning of streaming within the
Chapter 9, Streaming Joins, which surveys a variety of different join types, analyzes their behavior within the context of streaming, and finally looks in detail at a useful but ill-supported streaming join use case: temporal validity windows.
Finally, closing out the book is Chapter 10, The Evolution of Large-Scale Data Processing, which strolls through a focused history of the MapReduce lineage of data processing systems, examining some of the important contributions that have evolved streaming systems into what they are today.`

// 配置参数
// configure the job parameters
cfg := MRConfig{
NumMappers: 4,
NumReducers: 2,
Expand Down
88 changes: 61 additions & 27 deletions executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
type Executor interface {
Wait() // Wait block until all tasks finished
Profile(w io.Writer) error // Profile write flame graph raw text into w
Trace(w io.Writer) error // Trace write Chrome Trace Event data into w
Run(tf *TaskFlow) Executor // Run start to schedule and execute taskflow
}

Expand All @@ -26,23 +27,27 @@ type innerExecutorImpl struct {
wq *utils.Queue[*innerNode]
wg *sync.WaitGroup
profiler *profiler
tracer *tracer
mu *sync.Mutex
}

// NewExecutor return a Executor with a specified max goroutine concurrency(recommend a value bigger than Runtime.NumCPU, **MUST** bigger than num(subflows). )
func NewExecutor(concurrency uint) Executor {
// NewExecutor returns an Executor with the specified concurrency and options.
// concurrency must be > 0. Recommend concurrency > runtime.NumCPU and MUST > num(subflows).
func NewExecutor(concurrency uint, opts ...Option) Executor {
if concurrency == 0 {
panic("executor concurrency cannot be zero")
}
t := newProfiler()
return &innerExecutorImpl{
e := &innerExecutorImpl{
concurrency: concurrency,
pool: utils.NewCopool(concurrency),
wq: utils.NewQueue[*innerNode](false),
wg: &sync.WaitGroup{},
profiler: t,
mu: &sync.Mutex{},
}
for _, opt := range opts {
opt(e)
}
e.pool = utils.NewCopool(e.concurrency)
return e
}

// Run start to schedule and execute taskflow
Expand Down Expand Up @@ -97,22 +102,44 @@ func (e *innerExecutorImpl) sche_successors(node *innerNode) {
e.schedule(candidate...)
}

// record submits the span to active observers.
// ok=true means the node completed without panic; profiler only records successful spans.
func (e *innerExecutorImpl) record(s *span, ok bool) {
if ok && e.profiler != nil {
e.profiler.AddSpan(s)
}
if e.tracer != nil {
e.tracer.AddEvent(s)
}
}

// getDependentNames extracts predecessor task names from a node.
func getDependentNames(node *innerNode) []string {
if len(node.dependents) == 0 {
return nil
}
names := make([]string, len(node.dependents))
for i, dep := range node.dependents {
names[i] = dep.name
}
return names
}

func (e *innerExecutorImpl) invokeStatic(node *innerNode, parentSpan *span, p *Static) func() {
return func() {
span := span{extra: attr{
typ: nodeStatic,
name: node.name,
}, begin: time.Now(), parent: parentSpan}
}, begin: time.Now(), parent: parentSpan, dependents: getDependentNames(node)}

defer func() {
span.cost = time.Since(span.begin)
if r := recover(); r != nil {
r := recover()
if r != nil {
node.g.canceled.Store(true)
log.Printf("graph %v is canceled, since static node %v panics", node.g.name, node.name)
log.Printf("[recovered] static node %s, panic: %v, stack: %s", node.name, r, debug.Stack())
} else {
e.profiler.AddSpan(&span) // remove canceled node span
log.Printf("[go-taskflow] graph %q canceled: static task %q panicked: %v\n%s", node.g.name, node.name, r, debug.Stack())
}
e.record(&span, r == nil)

node.drop()
e.sche_successors(node)
Expand All @@ -132,18 +159,17 @@ func (e *innerExecutorImpl) invokeSubflow(node *innerNode, parentSpan *span, p *
span := span{extra: attr{
typ: nodeSubflow,
name: node.name,
}, begin: time.Now(), parent: parentSpan}
}, begin: time.Now(), parent: parentSpan, dependents: getDependentNames(node)}

defer func() {
span.cost = time.Since(span.begin)
if r := recover(); r != nil {
log.Printf("graph %v is canceled, since subflow %v panics", node.g.name, node.name)
log.Printf("[recovered] subflow %s, panic: %v, stack: %s", node.name, r, debug.Stack())
r := recover()
if r != nil {
log.Printf("[go-taskflow] graph %q canceled: subflow %q panicked: %v\n%s", node.g.name, node.name, r, debug.Stack())
node.g.canceled.Store(true)
p.g.canceled.Store(true)
} else {
e.profiler.AddSpan(&span) // remove canceled node span
}
e.record(&span, r == nil)

e.scheduleGraph(node.g, p.g, &span)
node.drop()
Expand All @@ -168,17 +194,16 @@ func (e *innerExecutorImpl) invokeCondition(node *innerNode, parentSpan *span, p
span := span{extra: attr{
typ: nodeCondition,
name: node.name,
}, begin: time.Now(), parent: parentSpan}
}, begin: time.Now(), parent: parentSpan, dependents: getDependentNames(node)}

defer func() {
span.cost = time.Since(span.begin)
if r := recover(); r != nil {
r := recover()
if r != nil {
node.g.canceled.Store(true)
log.Printf("graph %v is canceled, since condition node %v panics", node.g.name, node.name)
log.Printf("[recovered] condition node %s, panic: %v, stack: %s", node.name, r, debug.Stack())
} else {
e.profiler.AddSpan(&span) // remove canceled node span
log.Printf("[go-taskflow] graph %q canceled: condition task %q panicked: %v\n%s", node.g.name, node.name, r, debug.Stack())
}
e.record(&span, r == nil)
node.drop()
// e.sche_successors(node)
node.g.deref()
Expand Down Expand Up @@ -222,11 +247,10 @@ func (e *innerExecutorImpl) pushIntoQueue(node *innerNode) {
func (e *innerExecutorImpl) schedule(nodes ...*innerNode) {
for _, node := range nodes {
if node.g.canceled.Load() {
// no need
// graph already canceled, skip scheduling
node.g.scheCond.L.Lock()
node.g.scheCond.Signal()
node.g.scheCond.L.Unlock()
log.Printf("node %v is not scheduled, since graph %v is canceled\n", node.name, node.g.name)
return
}
e.wg.Add(1)
Expand All @@ -248,7 +272,6 @@ func (e *innerExecutorImpl) scheduleGraph(parentg, g *eGraph, parentSpan *span)
e.schedule(g.entries...)
if !e.invokeGraph(g, parentSpan) && parentg != nil {
parentg.canceled.Store(true)
log.Printf("graph %s canceled, since subgraph %s is canceled\n", parentg.name, g.name)
}

g.scheCond.Signal()
Expand All @@ -261,5 +284,16 @@ func (e *innerExecutorImpl) Wait() {

// Profile write flame graph raw text into w
func (e *innerExecutorImpl) Profile(w io.Writer) error {
if e.profiler == nil {
return nil
}
return e.profiler.draw(w)
}

// Trace write Chrome Trace Event data into w
func (e *innerExecutorImpl) Trace(w io.Writer) error {
if e.tracer == nil {
return nil
}
return e.tracer.draw(w)
}
2 changes: 1 addition & 1 deletion executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
)

func TestExecutor(t *testing.T) {
executor := gotaskflow.NewExecutor(uint(runtime.NumCPU()))
executor := gotaskflow.NewExecutor(uint(runtime.NumCPU()), gotaskflow.WithProfiler())
tf := gotaskflow.NewTaskFlow("G")
A, B, C :=
tf.NewTask("A", func() {
Expand Down
Loading
Loading