Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions crates/openlogi-agent-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,5 @@ workspace = true

[dev-dependencies]
bincode = "1.3"
# Only to construct an `InventoryError::Hid` in the watcher's classify tests.
async-hid = { workspace = true }
143 changes: 115 additions & 28 deletions crates/openlogi-agent-core/src/watchers/inventory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,54 @@ pub enum InventoryEvent {
SystemWake,
}

/// The watcher's cross-tick memory, factored out of the poll loop so the
/// tick → event decision is unit-testable without spawning the thread or
/// touching real HID.
#[derive(Default)]
struct WatchState {
/// Set once any enumeration has completed. After that, a failed tick keeps
/// the last good snapshot forever instead of ever reporting `Unavailable`.
succeeded: bool,
/// Consecutive failures, counted only before the first success.
initial_failures: u8,
}

impl WatchState {
/// Decide what (if anything) a watch tick emits.
///
/// - `Ok(snapshot)` — a completed enumeration (an empty one included: that's
/// a genuine disconnect) — is forwarded so the agent's device set tracks
/// reality. A transient per-node probe miss never reaches here as an empty
/// `Ok`: `openlogi_hid`'s `NodeLedger` replays the node's last inventory
/// (#218/#222).
/// - `Err(..)` means enumeration itself failed (OS-level HID enumerate
/// error): emit nothing, so the agent keeps its last good device set and
/// live bindings instead of wiping them for ~one period. Before the *first*
/// success there is no good set to keep, so persistent initial failure is
/// reported once as [`InventoryEvent::Unavailable`]; the loop keeps
/// retrying and a later success recovers.
fn classify(
&mut self,
result: Result<Vec<DeviceInventory>, openlogi_hid::InventoryError>,
) -> Option<InventoryEvent> {
match result {
Ok(inv) => {
self.succeeded = true;
Some(InventoryEvent::Snapshot(inv))
}
Err(e) => {
warn!(error = ?e, "enumerate failed during watch tick — keeping last snapshot");
if self.succeeded {
return None;
}
self.initial_failures = self.initial_failures.saturating_add(1);
(self.initial_failures == INITIAL_FAILURE_LIMIT)
.then_some(InventoryEvent::Unavailable)
}
}
}
}

/// Spawn the watcher and return a receiver of inventory events. The
/// channel is unbounded so a slow consumer cannot back-pressure the HID
/// poll loop into stalling on a real device disconnect.
Expand Down Expand Up @@ -76,8 +124,7 @@ pub fn spawn(period: Duration) -> mpsc::UnboundedReceiver<InventoryEvent> {
// across ticks — a known device's immutable data (model, features)
// is reused instead of being re-handshaked every poll.
let mut enumerator = openlogi_hid::Enumerator::default();
let mut succeeded = false;
let mut initial_failures: u8 = 0;
let mut state = WatchState::default();
let mut last_tick = SystemTime::now();
loop {
// A tick arriving far past its period means the system slept;
Expand All @@ -93,32 +140,12 @@ pub fn spawn(period: Duration) -> mpsc::UnboundedReceiver<InventoryEvent> {
}
}
last_tick = now;
match rt.block_on(enumerator.enumerate()) {
Ok(inv) => {
succeeded = true;
if worker_tx.send(InventoryEvent::Snapshot(inv)).is_err() {
debug!("inventory watcher receiver dropped — exiting");
return;
}
}
// A failed enumerate means "couldn't check", NOT "no devices":
// skip the tick so the agent keeps its last good device set
// and live bindings instead of wiping them for ~one period. A
// genuine disconnect comes back as an `Ok` empty snapshot,
// which we DO forward. Before the *first* success there is no
// good set to keep, so persistent failure is reported once —
// the loop keeps retrying, and a later success recovers.
Err(e) => {
warn!(error = ?e, "enumerate failed during watch tick — keeping last snapshot");
if !succeeded {
initial_failures = initial_failures.saturating_add(1);
if initial_failures == INITIAL_FAILURE_LIMIT
&& worker_tx.send(InventoryEvent::Unavailable).is_err()
{
return;
}
}
}
let result = rt.block_on(enumerator.enumerate());
if let Some(event) = state.classify(result)
&& worker_tx.send(event).is_err()
{
debug!("inventory watcher receiver dropped — exiting");
return;
}
thread::sleep(period);
}
Expand All @@ -133,3 +160,63 @@ pub fn spawn(period: Duration) -> mpsc::UnboundedReceiver<InventoryEvent> {
}
rx
}

#[cfg(test)]
mod tests {
use openlogi_hid::InventoryError;

use super::{INITIAL_FAILURE_LIMIT, InventoryEvent, WatchState};

/// A transport-level enumerate failure — what the watcher's `Err` arm now
/// sees (a partial per-node read is replayed by the hid ledger as `Ok`).
fn enumerate_failed() -> InventoryError {
InventoryError::Hid(async_hid::HidError::Disconnected)
}

#[test]
fn completed_enumeration_is_forwarded_even_when_empty() {
let mut state = WatchState::default();
// A genuine "checked, nothing there" still propagates as a disconnect —
// the resilience must not swallow a real empty.
assert!(matches!(
state.classify(Ok(vec![])),
Some(InventoryEvent::Snapshot(snap)) if snap.is_empty()
));
assert!(state.succeeded);
}

#[test]
fn failure_after_a_success_keeps_the_last_snapshot() {
let mut state = WatchState::default();
// A good tick first, so there is a last-known-good set to preserve.
assert!(matches!(
state.classify(Ok(vec![])),
Some(InventoryEvent::Snapshot(_))
));
// Then transient enumerate failures emit nothing — the agent keeps the
// last snapshot instead of flapping to "No devices" (#218).
assert!(state.classify(Err(enumerate_failed())).is_none());
assert!(state.classify(Err(enumerate_failed())).is_none());
}

#[test]
fn persistent_initial_failure_reports_unavailable_once_then_recovers() {
let mut state = WatchState::default();
// No snapshot has ever landed, so repeated failure must eventually stop
// looking like "still scanning".
for _ in 0..INITIAL_FAILURE_LIMIT - 1 {
assert!(state.classify(Err(enumerate_failed())).is_none());
}
assert!(matches!(
state.classify(Err(enumerate_failed())),
Some(InventoryEvent::Unavailable)
));
// Reported once, not on every later failure.
assert!(state.classify(Err(enumerate_failed())).is_none());
// …and a later success recovers with a live snapshot.
assert!(matches!(
state.classify(Ok(vec![])),
Some(InventoryEvent::Snapshot(_))
));
}
}
52 changes: 50 additions & 2 deletions crates/openlogi-hid/src/inventory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -255,9 +255,39 @@ struct CachedChannel {
/// We merge the two so an MX Master that's been asleep still shows up with
/// its codename and kind even before you click it.
pub async fn enumerate() -> Result<Vec<DeviceInventory>, InventoryError> {
Enumerator::default().enumerate().await
// The polling [`Enumerator`] keeps a per-node ledger across ticks, so a
// transient probe miss replays the node's last good inventory. A one-shot
// caller (CLI `list` / `diag`) builds a fresh `Enumerator` whose ledger is
// empty, so a miss has nothing to replay and would surface as an empty or
// partial list — the two isolated runs in #218 read 3 devices and 0. Retry a
// few times instead, reusing the same enumerator so its ledger accumulates a
// snapshot a later attempt can replay and the opened channel stays warm.
// #226's 5 s request timeout inside `HidppChannel::send` makes a dead probe
// fail fast, so a short bounded retry is cheap.
let mut enumerator = Enumerator::default();
let mut attempt = 1u8;
loop {
let (inventories, all_healthy) = enumerator.enumerate_reporting_health().await?;
if all_healthy || attempt >= ONESHOT_ATTEMPTS {
return Ok(inventories);
}
debug!(
attempt,
"one-shot enumerate saw an unhealthy node — retrying"
);
tokio::time::sleep(ONESHOT_RETRY_DELAY).await;
attempt += 1;
}
}

/// Attempts a one-shot [`enumerate`] makes before returning whatever it last
/// read, when a node keeps coming back unhealthy.
const ONESHOT_ATTEMPTS: u8 = 4;

/// Delay between one-shot [`enumerate`] retries. A first probe usually wakes an
/// asleep device, so a short pause lets the next attempt read it cleanly.
const ONESHOT_RETRY_DELAY: Duration = Duration::from_millis(300);

impl Enumerator {
/// One enumeration pass, reusing the cache from prior passes. Probes every
/// HID candidate concurrently (so one asleep node that burns the whole
Expand All @@ -270,6 +300,19 @@ impl Enumerator {
/// channel is reopened, so a transient HID++ glitch can't masquerade as
/// "no devices" (#218) — see [`crate::node_ledger`].
pub async fn enumerate(&mut self) -> Result<Vec<DeviceInventory>, InventoryError> {
self.enumerate_reporting_health().await.map(|(inv, _)| inv)
}

/// [`Self::enumerate`] plus whether every probed node answered cleanly this
/// pass — `false` if any probe timed out, failed to open, or read short of a
/// receiver's pairing count. The polling watcher ignores the flag (the ledger
/// already replays a node through a transient miss), but the one-shot
/// [`enumerate`] free fn uses it to retry: a fresh `Enumerator` has no ledger
/// history to replay, so a transient miss would otherwise surface as an
/// empty/partial list (#218).
async fn enumerate_reporting_health(
&mut self,
) -> Result<(Vec<DeviceInventory>, bool), InventoryError> {
self.tick = self.tick.wrapping_add(1);
let tick = self.tick;
let candidates = enumerate_hidpp_devices().await?;
Expand Down Expand Up @@ -336,6 +379,9 @@ impl Enumerator {

let mut inventories = Vec::new();
let mut outcomes = Vec::new();
// Whether every node answered cleanly this pass. Drives the one-shot
// `enumerate` retry; the ledger's own per-node replay is unaffected.
let mut all_healthy = true;
for (node, result) in results {
let probe = if let Ok(probe) = result {
probe
Expand All @@ -347,6 +393,7 @@ impl Enumerator {
warn!(budget = ?PROBE_BUDGET, "device probe timed out — treating as a failed probe");
NodeProbe::failed()
};
all_healthy &= probe.healthy;
outcomes.extend(probe.outcomes);
let settled = self.ledger.settle(&node, probe.healthy, probe.inventory);
if settled.evict_channel && self.channels.remove(&node).is_some() {
Expand All @@ -357,6 +404,7 @@ impl Enumerator {
// Nodes that wouldn't open this tick still replay their last snapshot
// (they have no cached channel to evict).
for node in open_failures {
all_healthy = false;
let settled = self.ledger.settle(&node, false, None);
inventories.extend(settled.inventory);
}
Expand All @@ -376,7 +424,7 @@ impl Enumerator {
}
}
self.evict_unseen(&seen_keys);
Ok(inventories)
Ok((inventories, all_healthy))
}

/// Drop cache entries for devices not seen this tick, after a short grace so
Expand Down