forked from a-agmon/rs-graph-llm
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathsimple_example.rs
More file actions
132 lines (110 loc) · 4.65 KB
/
Copy pathsimple_example.rs
File metadata and controls
132 lines (110 loc) · 4.65 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
use async_trait::async_trait;
use graph_flow::{
Context, ExecutionStatus, FlowRunner, GraphBuilder, GraphStorage, InMemoryGraphStorage,
InMemorySessionStorage, NextAction, Session, SessionStorage, Task, TaskResult,
};
use std::sync::Arc;
// We have 2 tasks in this simple example:
// 1. HelloTask - greets the user by name
// 2. ExcitementTask - adds excitement to the greeting
struct HelloTask;
#[async_trait]
impl Task for HelloTask {
async fn run(&self, context: Context) -> graph_flow::Result<TaskResult> {
let name: String = context.get_sync("name").unwrap();
let greeting = format!("Hello, {}", name);
// Store result for next task
context.set("greeting", greeting.clone()).await;
// using NextAction::Continue to indicate we want to proceed to the next task,
// but we want to advance just one step and give control back to the workflow manager
// We can also use NextAction::ContinueAndExecute if we want to execute the next task immediately
Ok(TaskResult::new(Some(greeting), NextAction::Continue))
}
}
// Define a task that adds excitement
struct ExcitementTask;
#[async_trait]
impl Task for ExcitementTask {
async fn run(&self, context: Context) -> graph_flow::Result<TaskResult> {
let greeting: String = context.get_sync("greeting").unwrap();
let excited = format!("{} !!!", greeting);
Ok(TaskResult::new(Some(excited), NextAction::End))
}
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Create storage instances
let session_storage = Arc::new(InMemorySessionStorage::new());
let graph_storage = Arc::new(InMemoryGraphStorage::new());
// Build a simple workflow
let hello_task = Arc::new(HelloTask);
let hello_task_id = hello_task.id().to_string();
let excitement_task = Arc::new(ExcitementTask);
let excitement_task_id = excitement_task.id().to_string();
let graph = Arc::new(
GraphBuilder::new("greeting_workflow")
.add_task(hello_task)
.add_task(excitement_task)
.add_edge(&hello_task_id, &excitement_task_id) // Connect the tasks
.build(),
);
// Store the graph in graph storage
graph_storage
.save("greeting_workflow".to_string(), graph.clone())
.await?;
// Create a session with initial context
let session_id = "session_001".to_string();
let session = Session::new_from_task(session_id.clone(), &hello_task_id);
// Set up context with input data
session.context.set("name", "Batman".to_string()).await;
// Save the session
session_storage.save(session.clone()).await?;
println!("Starting simple workflow with FlowRunner\n");
println!("Session ID: {}", session.id);
println!("Initial task: {}\n", session.current_task_id);
// Create a FlowRunner that hides the load / execute / save boilerplate
let runner = FlowRunner::new(graph.clone(), session_storage.clone());
// Execute until completion
loop {
let execution_result = runner.run(&session_id).await?;
if let Some(response) = &execution_result.response {
println!("Task response: {}", response);
}
println!("Execution status: {:?}", execution_result.status);
match execution_result.status {
ExecutionStatus::Completed => {
println!("Workflow completed successfully!");
break;
}
ExecutionStatus::Paused { next_task_id, reason } => {
println!("Workflow paused, will continue to task: {} (reason: {}) – continuing...\n", next_task_id, reason);
continue;
}
ExecutionStatus::WaitingForInput => {
println!("Workflow waiting for user input – continuing...\n");
continue;
}
ExecutionStatus::Error(err) => {
println!("Error occurred: {}", err);
break;
}
}
}
// Demonstrate session persistence by retrieving final session
let final_session = session_storage
.get(&session_id)
.await?
.ok_or("Session not found")?;
println!("\nFinal session state:");
println!("Session ID: {}", final_session.id);
println!("Current task: {}", final_session.current_task_id);
if let Some(status) = &final_session.status_message {
println!("Final status: {}", status);
}
// Demonstrate retrieving stored values from context
if let Some(greeting) = final_session.context.get::<String>("greeting").await {
println!("Stored greeting: {}", greeting);
}
println!("\nWorkflow execution finished");
Ok(())
}