Skip to content

Commit eee7bbc

Browse files
committed
Add explicit sad paths for workflow and event delivery
1 parent 75797a2 commit eee7bbc

3 files changed

Lines changed: 154 additions & 26 deletions

File tree

CHANGELOG.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,13 @@ All notable changes to `devloop` will be recorded in this file.
55
## [Unreleased]
66

77
### Fixed
8+
- Runtime requests for missing workflows now fail explicitly instead of
9+
being logged and skipped, and external events return `503` if their
10+
workflow trigger cannot be dispatched.
11+
- Watcher callback delivery failures are now surfaced as errors instead
12+
of being dropped silently.
13+
- Unexpected watcher and external-event channel disconnects now fail the
14+
engine explicitly instead of silently disabling those input paths.
815
- Accepted macOS `notify` event paths reported under `/private/...`
916
for watched roots configured under `/var/...`, so file changes in
1017
temp directories are no longer dropped by the watch classifier.

src/engine.rs

Lines changed: 76 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use notify::{
1010
use serde_json::{Map, Value};
1111
use tokio::signal;
1212
use tokio::time::{Instant, sleep};
13-
use tracing::{info, warn};
13+
use tracing::{error, info};
1414
use unicode_width::UnicodeWidthStr;
1515

1616
use crate::config::{CompiledWatchGroup, Config, LogStyle};
@@ -85,7 +85,12 @@ impl Engine {
8585
let tx_watcher = tx.clone();
8686
let mut watcher = RecommendedWatcher::new(
8787
move |result| {
88-
let _ = tx_watcher.send(result);
88+
if let Err(error) = tx_watcher.send(result) {
89+
error!(
90+
"failed to forward watcher event into runtime loop: {}",
91+
error
92+
);
93+
}
8994
},
9095
NotifyConfig::default(),
9196
)?;
@@ -125,9 +130,7 @@ impl Engine {
125130
}
126131
}
127132
batch = next_batch(&rx, self.config.debounce()) => {
128-
let Some(events) = batch? else {
129-
continue;
130-
};
133+
let events = batch?;
131134
let workflows = classify_events(&self.config.root, &watch_groups, &events);
132135
if !workflows.is_empty() {
133136
runtime.handle_event(RuntimeEvent::WatchChanges { workflows });
@@ -136,12 +139,17 @@ impl Engine {
136139
}
137140
}
138141
}
139-
Some(event) = external_event_rx.recv() => {
140-
runtime.handle_event(RuntimeEvent::WorkflowTrigger {
141-
workflow_name: event.workflow_name,
142-
});
143-
if execute_runtime_effects(&mut runtime, &mut adapter).await? {
144-
return Ok(());
142+
event = external_event_rx.recv() => {
143+
match event {
144+
Some(event) => {
145+
runtime.handle_event(RuntimeEvent::WorkflowTrigger {
146+
workflow_name: event.workflow_name,
147+
});
148+
if execute_runtime_effects(&mut runtime, &mut adapter).await? {
149+
return Ok(());
150+
}
151+
}
152+
None => return Err(anyhow!("external event channel disconnected")),
145153
}
146154
}
147155
}
@@ -225,10 +233,10 @@ impl RuntimeEffectAdapter for LiveRuntimeAdapter<'_, '_> {
225233

226234
async fn run_workflow(&mut self, workflow_name: &str, changed_files: &[String]) -> Result<()> {
227235
info!("running workflow {}", workflow_name);
228-
let Some(_) = self.config.workflow.get(workflow_name) else {
229-
warn!("skipping missing workflow {}", workflow_name);
230-
return Ok(());
231-
};
236+
self.config
237+
.workflow
238+
.get(workflow_name)
239+
.ok_or_else(|| anyhow!("runtime requested missing workflow '{workflow_name}'"))?;
232240
let mut adapter = LiveWorkflowAdapter {
233241
processes: self.processes,
234242
state: self.state,
@@ -306,10 +314,10 @@ async fn run_workflow(
306314
changed_files: &[String],
307315
) -> Result<()> {
308316
info!("running workflow {}", workflow_name);
309-
let Some(_) = config.workflow.get(workflow_name) else {
310-
warn!("skipping missing workflow {}", workflow_name);
311-
return Ok(());
312-
};
317+
config
318+
.workflow
319+
.get(workflow_name)
320+
.ok_or_else(|| anyhow!("runtime requested missing workflow '{workflow_name}'"))?;
313321
let mut adapter = LiveWorkflowAdapter { processes, state };
314322
run_workflow_machine(config, &mut adapter, workflow_name, changed_files).await
315323
}
@@ -381,10 +389,10 @@ fn boxed_banner_lines(message: &str) -> [String; 3] {
381389
async fn next_batch(
382390
rx: &mpsc::Receiver<notify::Result<Event>>,
383391
debounce: Duration,
384-
) -> Result<Option<Vec<Event>>> {
392+
) -> Result<Vec<Event>> {
385393
let first = match rx.recv() {
386394
Ok(result) => result?,
387-
Err(_) => return Ok(None),
395+
Err(_) => return Err(anyhow!("watcher event channel disconnected")),
388396
};
389397
let start = Instant::now();
390398
let mut events = vec![first];
@@ -395,7 +403,7 @@ async fn next_batch(
395403
Err(mpsc::TryRecvError::Disconnected) => break,
396404
}
397405
}
398-
Ok(Some(events))
406+
Ok(events)
399407
}
400408

401409
fn classify_events(
@@ -495,6 +503,22 @@ mod tests {
495503
assert_eq!(grouped["content"], vec!["content/posts/example.md"]);
496504
}
497505

506+
#[tokio::test]
507+
async fn next_batch_errors_when_watcher_channel_disconnects() {
508+
let (_tx, rx) = mpsc::channel();
509+
drop(_tx);
510+
511+
let error = next_batch(&rx, Duration::from_millis(10))
512+
.await
513+
.expect_err("channel disconnect should error");
514+
515+
assert!(
516+
error
517+
.to_string()
518+
.contains("watcher event channel disconnected")
519+
);
520+
}
521+
498522
#[test]
499523
fn classify_changes_by_workflow_accepts_private_var_event_paths() {
500524
let root = PathBuf::from("/var/folders/example/tmp");
@@ -1081,4 +1105,34 @@ mod tests {
10811105
]
10821106
);
10831107
}
1108+
1109+
#[tokio::test]
1110+
async fn missing_runtime_workflow_returns_error() {
1111+
let config = Config {
1112+
root: PathBuf::from("."),
1113+
debounce_ms: 100,
1114+
state_file: Some(PathBuf::from("./state.json")),
1115+
startup_workflows: vec![],
1116+
watch: BTreeMap::new(),
1117+
process: BTreeMap::new(),
1118+
hook: BTreeMap::new(),
1119+
event_server: crate::config::EventServerConfig::default(),
1120+
event: BTreeMap::new(),
1121+
workflow: BTreeMap::new(),
1122+
};
1123+
let state_path = unique_state_path();
1124+
let state = SessionState::load(state_path.clone()).expect("load state");
1125+
let mut processes = ProcessManager::new(&config);
1126+
1127+
let error = run_workflow(&config, &mut processes, &state, "missing", &[])
1128+
.await
1129+
.expect_err("missing workflow should error");
1130+
1131+
assert!(
1132+
error
1133+
.to_string()
1134+
.contains("runtime requested missing workflow 'missing'")
1135+
);
1136+
let _ = std::fs::remove_file(state_path);
1137+
}
10841138
}

src/external_events.rs

Lines changed: 71 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ impl ExternalEventServer {
7575
.with_state(Arc::new(app_state));
7676
let task = tokio::spawn(async move {
7777
if let Err(error) = axum::serve(listener, app).await {
78-
info!("external event server stopped: {}", error);
78+
error!("external event server stopped unexpectedly: {}", error);
7979
}
8080
});
8181

@@ -137,10 +137,18 @@ async fn handle_event(
137137
.set_if_changed(&spec.state_key, Value::String(payload.value))
138138
{
139139
Ok(true) => {
140-
let _ = state.sender.send(ExternalEventMessage {
140+
match state.sender.send(ExternalEventMessage {
141141
workflow_name: spec.workflow.clone(),
142-
});
143-
StatusCode::NO_CONTENT
142+
}) {
143+
Ok(()) => StatusCode::NO_CONTENT,
144+
Err(error) => {
145+
error!(
146+
"failed to dispatch external event '{}' to workflow '{}': {}",
147+
name, spec.workflow, error
148+
);
149+
StatusCode::SERVICE_UNAVAILABLE
150+
}
151+
}
144152
}
145153
Ok(false) => StatusCode::NO_CONTENT,
146154
Err(err) => {
@@ -468,4 +476,63 @@ mod tests {
468476

469477
let _ = std::fs::remove_file(state_path);
470478
}
479+
480+
#[tokio::test]
481+
async fn event_server_fails_loudly_when_workflow_dispatch_is_unavailable() {
482+
let mut config = base_config();
483+
config.workflow.insert(
484+
"publish_post_url".into(),
485+
WorkflowSpec {
486+
steps: vec![WorkflowStep::Log {
487+
message: "ok".into(),
488+
style: crate::config::LogStyle::Plain,
489+
}],
490+
},
491+
);
492+
config.event.insert(
493+
"browser_path".into(),
494+
EventSpec {
495+
state_key: "current_browser_path".into(),
496+
workflow: "publish_post_url".into(),
497+
pattern: None,
498+
},
499+
);
500+
let state_path = std::env::temp_dir().join(format!(
501+
"devloop-external-events-dispatch-{}.json",
502+
std::process::id()
503+
));
504+
let state = SessionState::load(state_path.clone()).expect("load state");
505+
let (sender, receiver) = mpsc::unbounded_channel();
506+
drop(receiver);
507+
let server = ExternalEventServer::start(&config, state.clone(), sender)
508+
.await
509+
.expect("start external event server")
510+
.expect("event server");
511+
512+
let client = reqwest::Client::new();
513+
let response = client
514+
.post(
515+
server
516+
.environment()
517+
.event_urls
518+
.get("browser_path")
519+
.expect("browser path url"),
520+
)
521+
.bearer_auth(&server.environment().token)
522+
.json(&serde_json::json!({ "value": "/posts/example-post" }))
523+
.send()
524+
.await
525+
.expect("send request");
526+
527+
assert_eq!(response.status(), StatusCode::SERVICE_UNAVAILABLE);
528+
assert_eq!(
529+
state
530+
.get_string("current_browser_path")
531+
.expect("get current_browser_path")
532+
.as_deref(),
533+
Some("/posts/example-post")
534+
);
535+
536+
let _ = std::fs::remove_file(state_path);
537+
}
471538
}

0 commit comments

Comments
 (0)