Skip to content

Commit 8ff1e1a

Browse files
committed
parallel workflow
Signed-off-by: Jean-Laurent de Morlhon <jeanlaurent@morlhon.net>
1 parent 1d6956c commit 8ff1e1a

11 files changed

Lines changed: 504 additions & 84 deletions

File tree

cagent-schema.json

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -428,15 +428,36 @@
428428
"type": {
429429
"type": "string",
430430
"description": "Type of workflow step",
431-
"enum": ["agent"]
431+
"enum": ["agent", "parallel"]
432432
},
433433
"name": {
434434
"type": "string",
435-
"description": "Name of the agent to run"
435+
"description": "Name of the agent to run (for type 'agent')"
436+
},
437+
"steps": {
438+
"type": "array",
439+
"description": "List of agent names to run in parallel (for type 'parallel')",
440+
"items": {
441+
"type": "string"
442+
}
436443
}
437444
},
438-
"required": ["type", "name"],
439-
"additionalProperties": false
445+
"required": ["type"],
446+
"additionalProperties": false,
447+
"oneOf": [
448+
{
449+
"properties": {
450+
"type": { "const": "agent" }
451+
},
452+
"required": ["name"]
453+
},
454+
{
455+
"properties": {
456+
"type": { "const": "parallel" }
457+
},
458+
"required": ["steps"]
459+
}
460+
]
440461
}
441462
}
442463
}

examples/README.md

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -46,19 +46,22 @@ These are more advanced examples, most of them involve some sort of MCP server t
4646

4747
## **Workflow Configurations**
4848

49-
These examples demonstrate sequential workflow execution where multiple agents are chained together. Each agent processes the output from the previous agent, creating a pipeline of transformations. Workflows don't require a root agent - they execute agents in the order defined in the workflow section.
49+
These examples demonstrate workflow execution where multiple agents are chained together in sequential or parallel patterns. Workflows support both sequential execution (one agent after another) and parallel execution (multiple agents running concurrently). Workflows don't require a root agent - they execute agents in the order defined in the workflow section.
5050

5151
See [WORKFLOW_README.md](WORKFLOW_README.md) for detailed documentation.
5252

53-
| Name | Description/Purpose | Steps | Models Used |
54-
|------------------------------------------------------------------|-------------------------------------------|-------|-------------|
55-
| [story_workflow.yaml](story_workflow.yaml) | Creative writing workflow | 3 | OpenAI GPT-4o |
56-
| [product_description_workflow.yaml](product_description_workflow.yaml) | Marketing content generation | 3 | OpenAI GPT-4o |
57-
| [joke_workflow.yaml](joke_workflow.yaml) | Joke creation, translation, and formatting | 3 | OpenAI GPT-4o |
53+
| Name | Description/Purpose | Steps | Execution Pattern | Models Used |
54+
|------------------------------------------------------------------|-------------------------------------------|-------|-------------------|-------------|
55+
| [story_workflow.yaml](story_workflow.yaml) | Creative writing workflow | 3 | Sequential | OpenAI GPT-4o |
56+
| [product_description_workflow.yaml](product_description_workflow.yaml) | Marketing content generation | 3 | Sequential | OpenAI GPT-4o |
57+
| [joke_workflow.yaml](joke_workflow.yaml) | Joke creation, translation, and formatting | 3 | Sequential | OpenAI GPT-4o |
58+
| [parallel_translation_workflow.yaml](parallel_translation_workflow.yaml) | Multi-language translation in parallel | 3 (1+3+1) | Mixed (Sequential + Parallel) | OpenAI GPT-4o |
59+
| [parallel_sorting_workflow.yaml](parallel_sorting_workflow.yaml) | Compare sorting algorithms concurrently | 3 (1+4+1) | Mixed (Sequential + Parallel) | OpenAI GPT-4o |
5860

5961
**How workflows work:**
60-
- Agents execute sequentially in the order defined
61-
- Output from each agent becomes input for the next
62+
- **Sequential**: Agents execute one after another in order defined
63+
- **Parallel**: Multiple agents run concurrently, processing the same input
64+
- Output from each step becomes input for the next step
6265
- No root agent required
6366
- Run with: `./bin/cagent run examples/story_workflow.yaml`
6467

examples/WORKFLOW_README.md

Lines changed: 103 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
1-
# Sequential Workflow Execution
1+
# Workflow Execution
22

3-
This directory contains examples of sequential workflow execution in cagent. Workflows allow you to chain multiple agents together, where each agent processes the output from the previous agent.
3+
This directory contains examples of workflow execution in cagent. Workflows allow you to chain multiple agents together in sequential or parallel execution patterns, where agents process and transform data through a defined pipeline.
44

55
## Examples
66

@@ -39,14 +39,54 @@ The `joke_workflow.yaml` demonstrates a simple two-step comedy workflow:
3939
./bin/cagent run examples/joke_workflow.yaml
4040
```
4141

42+
### Parallel Translation Workflow
43+
44+
The `parallel_translation_workflow.yaml` demonstrates parallel execution where multiple agents process the same input concurrently:
45+
46+
1. **source_text** - Generates a technical explanation of Docker containers
47+
2. **Parallel Step** - Three translation agents run simultaneously:
48+
- **translate_spanish** - Translates to Spanish
49+
- **translate_french** - Translates to French
50+
- **translate_japanese** - Translates to Japanese
51+
3. **formatter** - Combines all translations into a formatted output
52+
53+
```bash
54+
./bin/cagent run examples/parallel_translation_workflow.yaml
55+
```
56+
57+
### Parallel Sorting Workflow
58+
59+
The `parallel_sorting_workflow.yaml` shows parallel processing with compute-intensive tasks:
60+
61+
1. **generate_array** - Creates a random array of 100 integers
62+
2. **Parallel Step** - Four sorting agents run concurrently:
63+
- **bubble_sort** - Sorts using Bubble Sort
64+
- **insertion_sort** - Sorts using Insertion Sort
65+
- **merge_sort** - Sorts using Merge Sort
66+
- **quicksort** - Sorts using QuickSort
67+
3. **analyzer** - Compares and analyzes all sorting results
68+
69+
```bash
70+
./bin/cagent run examples/parallel_sorting_workflow.yaml
71+
```
72+
4273
## How It Works
4374

4475
The `run` command automatically detects workflows by checking if the configuration file contains a `workflow` section. No special command is needed!
4576

77+
### Execution Patterns
78+
79+
Workflows support two execution patterns:
80+
81+
1. **Sequential (`type: agent`)** - Agents run one after another, each receiving the previous agent's output
82+
2. **Parallel (`type: parallel`)** - Multiple agents run concurrently, all receiving the same input, with outputs combined for the next step
83+
4684
## Workflow Configuration
4785

4886
### Basic Structure
4987

88+
#### Sequential Workflow
89+
5090
```yaml
5191
version: "2"
5292

@@ -63,14 +103,51 @@ workflow:
63103
name: next_agent
64104
```
65105
106+
#### Parallel Workflow
107+
108+
```yaml
109+
version: "2"
110+
111+
agents:
112+
generator:
113+
model: openai/gpt-4o
114+
instruction: Generate initial data
115+
116+
processor1:
117+
model: openai/gpt-4o
118+
instruction: Process data using method 1
119+
120+
processor2:
121+
model: openai/gpt-4o
122+
instruction: Process data using method 2
123+
124+
combiner:
125+
model: openai/gpt-4o
126+
instruction: Combine and analyze results
127+
128+
workflow:
129+
- type: agent
130+
name: generator
131+
- type: parallel
132+
steps:
133+
- processor1
134+
- processor2
135+
- type: agent
136+
name: combiner
137+
```
138+
66139
### Key Features
67140
68141
1. **Sequential Execution**: Agents run in the order defined in the workflow
69-
2. **Data Piping**: The output of each agent becomes the input for the next agent
70-
3. **Automatic Context**: The first agent receives instructions to generate initial content, subsequent agents receive the previous output as input
71-
4. **No Root Agent Required**: Workflows don't need a "root" agent - just define the agents used in your workflow steps
142+
2. **Parallel Execution**: Multiple agents process the same input concurrently
143+
3. **Data Piping**: The output of each step becomes the input for the next step
144+
4. **Automatic Context**: The first agent receives instructions to generate initial content, subsequent agents receive the previous output as input
145+
5. **Output Combination**: Parallel step outputs are concatenated in the order specified and passed to the next step
146+
6. **No Root Agent Required**: Workflows don't need a "root" agent - just define the agents used in your workflow steps
72147
73-
### Example Flow
148+
### Example Flows
149+
150+
#### Sequential Flow
74151
75152
```
76153
Step 1: story_starter
@@ -83,6 +160,21 @@ Step 3: add_ending (receives previous output)
83160
→ Output: "...a bright future in the culinary world."
84161
```
85162
163+
#### Parallel Flow
164+
165+
```
166+
Step 1: source_text
167+
→ Output: "Docker containers are lightweight..."
168+
169+
Step 2: Parallel execution (all receive same input)
170+
├─ translate_spanish → "Los contenedores Docker son ligeros..."
171+
├─ translate_french → "Les conteneurs Docker sont légers..."
172+
└─ translate_japanese → "Dockerコンテナは軽量です..."
173+
174+
Step 3: formatter (receives all parallel outputs)
175+
→ Combined output with all three translations formatted
176+
```
177+
86178
## Command Options
87179

88180
Workflows support the same runtime configuration flags as regular agent runs:
@@ -111,8 +203,10 @@ Override specific agent models:
111203

112204
## Notes
113205

114-
- Each agent's output is passed as text to the next agent
206+
- Each agent's output is passed as text to the next step
207+
- For parallel steps, outputs are concatenated in the order specified in the `steps` array
115208
- The workflow stops immediately if any agent fails
116209
- Model overrides can be specified per agent using `--model agent_name=provider/model`
117-
- Currently only supports `type: agent` workflow steps (future: conditions, parallel execution)
118-
- The final output of the workflow is the output from the last agent in the sequence
210+
- Supports both `type: agent` (sequential) and `type: parallel` (concurrent) workflow steps
211+
- The final output of the workflow is the output from the last step in the sequence
212+
- Parallel execution provides true concurrency - agents run simultaneously, not sequentially
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
version: "2"
2+
3+
agents:
4+
generate_array:
5+
model: openai/gpt-4o
6+
instruction: |
7+
Generate a random array of 100 integers between 1 and 1000.
8+
Output format: just the numbers separated by commas.
9+
Example: 42, 789, 123, 456, ...
10+
11+
bubble_sort:
12+
model: openai/gpt-4o
13+
instruction: |
14+
Sort the given array using Bubble Sort algorithm.
15+
Explain your step-by-step process as you sort.
16+
Write out intermediate states every 10 swaps.
17+
Output the final sorted array and count how many comparisons you made.
18+
This should take significant work - be thorough and detailed.
19+
20+
insertion_sort:
21+
model: openai/gpt-4o
22+
instruction: |
23+
Sort the given array using Insertion Sort algorithm.
24+
Explain your step-by-step process.
25+
Show intermediate states every 10 insertions.
26+
Output the final sorted array and count how many comparisons you made.
27+
28+
merge_sort:
29+
model: openai/gpt-4o
30+
instruction: |
31+
Sort the given array using Merge Sort algorithm.
32+
Briefly explain the divide-and-conquer process.
33+
Show the merge tree structure (how you split and merged).
34+
Output the final sorted array.
35+
36+
quicksort:
37+
model: openai/gpt-4o
38+
instruction: |
39+
Sort the given array using QuickSort algorithm.
40+
Briefly explain your pivot selection and partitioning.
41+
Output the final sorted array.
42+
43+
analyzer:
44+
model: openai/gpt-4o
45+
instruction: |
46+
You received the same array sorted by 4 different algorithms:
47+
Bubble Sort, Insertion Sort, Merge Sort, and QuickSort.
48+
49+
Create a summary report:
50+
1. Verify all 4 produced the same sorted result
51+
2. Compare the complexity of explanations (which algorithm explained more steps)
52+
3. Note which algorithms mentioned their time complexity
53+
4. Provide a final "winner" based on explanation clarity
54+
55+
Format as a brief analysis report.
56+
57+
workflow:
58+
- type: agent
59+
name: generate_array
60+
- type: parallel
61+
steps:
62+
- bubble_sort
63+
- insertion_sort
64+
- merge_sort
65+
- quicksort
66+
- type: agent
67+
name: analyzer
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
version: "2"
2+
3+
agents:
4+
source_text:
5+
model: openai/gpt-4o
6+
instruction: |
7+
Write a 2-paragraph technical explanation of how Docker containers work.
8+
Use simple language but be technically accurate. About 80-100 words total.
9+
10+
translate_spanish:
11+
model: openai/gpt-4o
12+
instruction: |
13+
Translate the text to Spanish.
14+
Maintain technical accuracy and natural flow.
15+
16+
translate_french:
17+
model: openai/gpt-4o
18+
instruction: |
19+
Translate the text to French.
20+
Maintain technical accuracy and natural flow.
21+
22+
translate_japanese:
23+
model: openai/gpt-4o
24+
instruction: |
25+
Translate the text to Japanese.
26+
Maintain technical accuracy and natural flow.
27+
Use appropriate technical terminology in Japanese.
28+
29+
formatter:
30+
model: openai/gpt-4o
31+
instruction: |
32+
You received the same text translated into Spanish, French, and Japanese.
33+
Create a formatted output showing all three translations with language labels.
34+
Format: "🇪🇸 Spanish:\n[text]\n\n🇫🇷 French:\n[text]\n\n🇯🇵 Japanese:\n[text]"
35+
36+
workflow:
37+
- type: agent
38+
name: source_text
39+
- type: parallel
40+
steps:
41+
- translate_spanish
42+
- translate_french
43+
- translate_japanese
44+
- type: agent
45+
name: formatter

pkg/config/config.go

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -138,11 +138,22 @@ func validateConfig(cfg *latest.Config) error {
138138

139139
// Validate workflow steps
140140
for i, step := range cfg.Workflow {
141-
if step.Type != "agent" {
142-
return fmt.Errorf("workflow step %d: unsupported type '%s' (only 'agent' is supported)", i, step.Type)
143-
}
144-
if _, exists := cfg.Agents[step.Name]; !exists {
145-
return fmt.Errorf("workflow step %d: references non-existent agent '%s'", i, step.Name)
141+
switch step.Type {
142+
case "agent":
143+
if _, exists := cfg.Agents[step.Name]; !exists {
144+
return fmt.Errorf("workflow step %d: references non-existent agent '%s'", i, step.Name)
145+
}
146+
case "parallel":
147+
if len(step.Steps) == 0 {
148+
return fmt.Errorf("workflow step %d: parallel step must specify at least one agent in 'steps'", i)
149+
}
150+
for _, agentName := range step.Steps {
151+
if _, exists := cfg.Agents[agentName]; !exists {
152+
return fmt.Errorf("workflow step %d: parallel step references non-existent agent '%s'", i, agentName)
153+
}
154+
}
155+
default:
156+
return fmt.Errorf("workflow step %d: unsupported type '%s' (supported types: 'agent', 'parallel')", i, step.Type)
146157
}
147158
}
148159

pkg/config/examples_test.go

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,8 +48,17 @@ func TestParseExamples(t *testing.T) {
4848
require.NotEmpty(t, cfg.Workflow, "Workflow should not be empty in %s", file)
4949
// Verify all workflow agents exist and have instructions
5050
for _, step := range cfg.Workflow {
51-
require.Contains(t, cfg.Agents, step.Name, "Workflow agent '%s' not found in %s", step.Name, file)
52-
require.NotEmpty(t, cfg.Agents[step.Name].Instruction, "Instruction should not be empty for agent '%s' in %s", step.Name, file)
51+
switch step.Type {
52+
case "agent":
53+
require.Contains(t, cfg.Agents, step.Name, "Workflow agent '%s' not found in %s", step.Name, file)
54+
require.NotEmpty(t, cfg.Agents[step.Name].Instruction, "Instruction should not be empty for agent '%s' in %s", step.Name, file)
55+
case "parallel":
56+
require.NotEmpty(t, step.Steps, "Parallel step should have at least one agent in %s", file)
57+
for _, agentName := range step.Steps {
58+
require.Contains(t, cfg.Agents, agentName, "Parallel workflow agent '%s' not found in %s", agentName, file)
59+
require.NotEmpty(t, cfg.Agents[agentName].Instruction, "Instruction should not be empty for agent '%s' in %s", agentName, file)
60+
}
61+
}
5362
}
5463
} else {
5564
// For non-workflow examples, verify root agent

pkg/config/v1/types.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,7 @@ type Metadata struct {
149149

150150
// WorkflowStep represents a step in a workflow
151151
type WorkflowStep struct {
152-
Type string `json:"type" yaml:"type"` // Currently only "agent" is supported
153-
Name string `json:"name" yaml:"name"` // Name of the agent to run
152+
Type string `json:"type" yaml:"type"` // "agent" or "parallel"
153+
Name string `json:"name,omitempty" yaml:"name,omitempty"` // Name of the agent to run (for type "agent")
154+
Steps []string `json:"steps,omitempty" yaml:"steps,omitempty"` // List of agent names to run in parallel (for type "parallel")
154155
}

0 commit comments

Comments
 (0)