Skip to content

Commit 313419c

Browse files
authored
HardwareManager: Rework to use a watch channel instead of broadcasting updates (#10194)
This is an attempt to make hardware monitoring somewhat more level-triggered and somewhat less edge-triggered. In #10187, we had a case where the two different sources of `HardwareUpdate` notifications got out of sync, resulting in the polling source failing to send a `TofinoAvailable` update even though it realized the Tofino was, in fact, available. On this branch, we keep the current `HardwareView` in a watch channel; any changes made to it will result in a `.changed()` notification firing, removing the possibility of mismatched updates. Fixes #10187.
1 parent be164f1 commit 313419c

5 files changed

Lines changed: 317 additions & 410 deletions

File tree

sled-agent/config-reconciler/src/raw_disks.rs

Lines changed: 11 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66
//! [`RawDisk`]s sled-agent is aware of.
77
88
use iddqd::IdOrdMap;
9-
use omicron_common::disk::DiskIdentity;
109
use sled_agent_types::inventory::InventoryDisk;
1110
use sled_storage::disk::RawDisk;
1211
use slog::Logger;
@@ -55,10 +54,15 @@ impl RawDisksSender {
5554
// disks that shouldn't be removed even if they're not present in
5655
// `new_disks`; check for that first.
5756
for old_disk in disks.iter() {
58-
if !new_disks.contains_key(old_disk.identity())
59-
&& !can_remove_disk(old_disk, log)
60-
{
61-
new_disks.insert_overwrite(old_disk.clone());
57+
if !new_disks.contains_key(old_disk.identity()) {
58+
if can_remove_disk(old_disk, log) {
59+
info!(
60+
log, "Removing disk";
61+
"identity" => ?old_disk.identity(),
62+
);
63+
} else {
64+
new_disks.insert_overwrite(old_disk.clone());
65+
}
6266
}
6367
}
6468

@@ -98,31 +102,6 @@ impl RawDisksSender {
98102
})
99103
}
100104

101-
/// Remove a raw disk that is no longer visible to sled-agent.
102-
pub fn remove_raw_disk(
103-
&self,
104-
identity: &DiskIdentity,
105-
log: &Logger,
106-
) -> bool {
107-
self.0.send_if_modified(|disks| {
108-
let Some(disk) = disks.get(identity) else {
109-
info!(
110-
log, "Ignoring request to remove nonexistent disk";
111-
"identity" => ?identity,
112-
);
113-
return false;
114-
};
115-
116-
if !can_remove_disk(disk, log) {
117-
return false;
118-
}
119-
120-
info!(log, "Removing disk"; "identity" => ?identity);
121-
Arc::make_mut(disks).remove(identity);
122-
true
123-
})
124-
}
125-
126105
pub(crate) fn to_inventory(&self) -> Vec<InventoryDisk> {
127106
self.0
128107
.borrow()
@@ -163,6 +142,7 @@ fn can_remove_disk(disk: &RawDisk, log: &Logger) -> bool {
163142
mod tests {
164143
use super::*;
165144
use camino::Utf8PathBuf;
145+
use omicron_common::disk::DiskIdentity;
166146
use omicron_common::disk::DiskVariant;
167147
use omicron_test_utils::dev;
168148
use proptest::collection::btree_map;
@@ -252,9 +232,6 @@ mod tests {
252232
// Change the active firmware slot of the disk at the given index, then
253233
// pass it to `RawDisksSender::add_or_update_raw_disk`
254234
Update(Index),
255-
// Call `RawDisksSender::remove_raw_disk` with the disk at the given
256-
// index
257-
Remove(Index),
258235
// Call `RawDisksSender::set_raw_disks` using the set of disks in the
259236
// range `[start, start + num)`
260237
Set { start: Index, num: usize },
@@ -305,19 +282,6 @@ mod tests {
305282
states[index].present = true;
306283
true
307284
}
308-
Operation::Remove(index) => {
309-
let index = index.index(states.len());
310-
eprintln!("removing disk {index}");
311-
let was_present = states[index].present;
312-
tx.remove_raw_disk(disks[index].identity(), log);
313-
// Synthetic disks should never be removed
314-
if disks[index].is_synthetic() {
315-
false
316-
} else {
317-
states[index].present = false;
318-
was_present
319-
}
320-
}
321285
Operation::Set { start, num } => {
322286
let start = start.index(states.len());
323287
let end =
@@ -381,7 +345,7 @@ mod tests {
381345
assert_eq!(rx.has_changed().expect("channel open"), expect_changes);
382346

383347
// After this operation, check that the disk is either present or
384-
// not (the thing changed by our add/remove/set operations) and has
348+
// not (the thing changed by our add or set operations) and has
385349
// the expected firmware slot (the thing changed by our update
386350
// operation).
387351
let current = rx.borrow_and_update();

sled-agent/src/hardware_monitor.rs

Lines changed: 34 additions & 91 deletions
Original file line numberDiff line numberDiff line change
@@ -2,20 +2,20 @@
22
// License, v. 2.0. If a copy of the MPL was not distributed with this
33
// file, You can obtain one at https://mozilla.org/MPL/2.0/.
44

5-
//! A task that listens for hardware events from the
5+
//! A task that listens for changes in the [`HardwareView`] from the
66
//! [`sled_hardware::HardwareManager`] and dispatches them to other parts
77
//! of the bootstrap agent and sled-agent code.
88
99
use crate::services::ServiceManager;
1010
use crate::sled_agent::SledAgent;
1111
use sled_agent_config_reconciler::RawDisksSender;
1212
use sled_agent_types::debug::OperatorSwitchZonePolicy;
13-
use sled_hardware::{HardwareManager, HardwareUpdate};
13+
use sled_hardware::{HardwareManager, HardwareView};
1414
use sled_hardware_types::Baseboard;
1515
use sled_storage::disk::RawDisk;
1616
use slog::Logger;
17-
use tokio::sync::broadcast::error::RecvError;
18-
use tokio::sync::{broadcast, oneshot, watch};
17+
use tokio::sync::oneshot;
18+
use tokio::sync::watch;
1919

2020
/// A handle controlling the behavior of a [`HardwareMonitor`]
2121
#[derive(Debug, Clone)]
@@ -52,15 +52,12 @@ pub struct HardwareMonitor {
5252
// Receive a onetime notification that the ServiceManager is ready
5353
service_manager_ready_rx: oneshot::Receiver<ServiceManager>,
5454

55-
// Receive messages from the [`HardwareManager`]
56-
hardware_rx: broadcast::Receiver<HardwareUpdate>,
55+
// Receive current view of hardware from the [`HardwareManager`]
56+
hardware_view_rx: watch::Receiver<HardwareView>,
5757

5858
// Receive the operator's policy controlling the switch zone
5959
switch_zone_policy_rx: watch::Receiver<OperatorSwitchZonePolicy>,
6060

61-
// A reference to the hardware manager
62-
hardware_manager: HardwareManager,
63-
6461
// A handle to send raw disk updates to the config-reconciler system.
6562
raw_disks_tx: RawDisksSender,
6663

@@ -96,8 +93,8 @@ impl HardwareMonitor {
9693
let (sled_agent_started_tx, sled_agent_started_rx) = oneshot::channel();
9794
let (service_manager_ready_tx, service_manager_ready_rx) =
9895
oneshot::channel();
99-
let baseboard = hardware_manager.baseboard();
100-
let hardware_rx = hardware_manager.monitor();
96+
let hardware_view_rx = hardware_manager.subscribe();
97+
let baseboard = hardware_view_rx.borrow().baseboard();
10198
let log = log.new(o!("component" => "HardwareMonitor"));
10299
let (switch_zone_policy_tx, switch_zone_policy_rx) =
103100
watch::channel(OperatorSwitchZonePolicy::StartIfSwitchPresent);
@@ -106,9 +103,8 @@ impl HardwareMonitor {
106103
baseboard,
107104
sled_agent_started_rx,
108105
service_manager_ready_rx,
109-
hardware_rx,
106+
hardware_view_rx,
110107
switch_zone_policy_rx,
111-
hardware_manager: hardware_manager.clone(),
112108
raw_disks_tx,
113109
sled_agent: None,
114110
service_manager: None,
@@ -150,14 +146,25 @@ impl HardwareMonitor {
150146
policy,
151147
).await;
152148
}
153-
update = self.hardware_rx.recv() => {
154-
info!(
155-
self.log,
156-
"Received hardware update message";
157-
"update" => ?update,
158-
);
159-
self.handle_hardware_update(update.clone()).await
160-
},
149+
result = self.hardware_view_rx.changed() => {
150+
match result {
151+
Ok(()) => {
152+
info!(
153+
self.log,
154+
"Received notification hardware \
155+
view has changed"
156+
);
157+
self.check_latest_hardware_snapshot().await;
158+
}
159+
Err(_recv_error) => {
160+
// The `HardwareManager` monitoring task is an
161+
// infinite loop - the only way for us to get
162+
// an error from `changed()` here is if it panicked,
163+
// so we will propagate such a panic.
164+
panic!("Hardware manager monitor task panicked");
165+
}
166+
}
167+
}
161168
Ok(()) = self.switch_zone_policy_rx.changed() => {
162169
let policy = self.current_switch_zone_policy();
163170
info!(
@@ -179,70 +186,6 @@ impl HardwareMonitor {
179186
*self.switch_zone_policy_rx.borrow_and_update()
180187
}
181188

182-
// Handle an update from the [`HardwareMonitor`]
183-
async fn handle_hardware_update(
184-
&mut self,
185-
update: Result<HardwareUpdate, RecvError>,
186-
) {
187-
match update {
188-
Ok(update) => match update {
189-
HardwareUpdate::TofinoAvailable => {
190-
info!(
191-
self.log,
192-
"Hardware monitor got TofinoAvailable message"
193-
);
194-
let policy = self.current_switch_zone_policy();
195-
self.ensure_switch_zone_activated_or_deactivated(
196-
true, policy,
197-
)
198-
.await
199-
}
200-
HardwareUpdate::TofinoUnavailable => {
201-
info!(
202-
self.log,
203-
"Hardware monitor got TofinoUnavailable message"
204-
);
205-
let policy = self.current_switch_zone_policy();
206-
self.ensure_switch_zone_activated_or_deactivated(
207-
false, policy,
208-
)
209-
.await
210-
}
211-
HardwareUpdate::TofinoDeviceChange => {
212-
info!(
213-
self.log,
214-
"Hardware monitor got TofinoDeviceChange message"
215-
);
216-
if let Some(sled_agent) = &mut self.sled_agent {
217-
sled_agent.notify_nexus_about_self(&self.log).await;
218-
}
219-
}
220-
HardwareUpdate::DiskAdded(disk) => {
221-
self.raw_disks_tx
222-
.add_or_update_raw_disk(disk.into(), &self.log);
223-
}
224-
HardwareUpdate::DiskRemoved(disk) => {
225-
self.raw_disks_tx
226-
.remove_raw_disk(disk.identity(), &self.log);
227-
}
228-
HardwareUpdate::DiskUpdated(disk) => {
229-
self.raw_disks_tx
230-
.add_or_update_raw_disk(disk.into(), &self.log);
231-
}
232-
},
233-
Err(broadcast::error::RecvError::Lagged(count)) => {
234-
warn!(self.log, "Hardware monitor missed {count} messages");
235-
self.check_latest_hardware_snapshot().await;
236-
}
237-
Err(broadcast::error::RecvError::Closed) => {
238-
// The `HardwareManager` monitoring task is an infinite loop -
239-
// the only way for us to get `Closed` here is if it panicked,
240-
// so we will propagate such a panic.
241-
panic!("Hardware manager monitor task panicked");
242-
}
243-
}
244-
}
245-
246189
async fn ensure_switch_zone_activated_or_deactivated(
247190
&mut self,
248191
is_tofino_available: bool,
@@ -296,29 +239,29 @@ impl HardwareMonitor {
296239
}
297240
}
298241

299-
// Observe the current hardware state manually.
242+
// Act on the current hardware snapshot.
300243
//
301-
// We use this when we're monitoring hardware for the first
302-
// time, and if we miss notifications.
244+
// We use this on startup and any time the snapshot changes.
303245
async fn check_latest_hardware_snapshot(&mut self) {
304246
if let Some(sled_agent) = &self.sled_agent {
305247
sled_agent.notify_nexus_about_self(&self.log).await;
306248
}
307249

250+
let snapshot = self.hardware_view_rx.borrow_and_update().clone();
308251
info!(
309252
self.log, "Checking current full hardware snapshot";
310-
"disks" => ?self.hardware_manager.disks(),
253+
"snapshot" => ?snapshot,
311254
);
312255

313256
let policy = self.current_switch_zone_policy();
314257
self.ensure_switch_zone_activated_or_deactivated(
315-
self.hardware_manager.is_scrimlet_asic_available(),
258+
snapshot.is_scrimlet_asic_available(),
316259
policy,
317260
)
318261
.await;
319262

320263
self.raw_disks_tx.set_raw_disks(
321-
self.hardware_manager.disks().into_values().map(RawDisk::from),
264+
snapshot.into_disks().into_values().map(RawDisk::from),
322265
&self.log,
323266
);
324267
}

0 commit comments

Comments
 (0)