Skip to content

Commit 8cfeade

Browse files
committed
Fix batch matching by allowing same worker accept multiple jobs.
1 parent 7cd29c9 commit 8cfeade

2 files changed

Lines changed: 32 additions & 33 deletions

File tree

nativelink-scheduler/src/api_worker_scheduler.rs

Lines changed: 31 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ pub struct SchedulerMetrics {
6363
}
6464

6565
use crate::platform_property_manager::PlatformPropertyManager;
66-
use crate::worker::{ActionInfoWithProps, Worker, WorkerState, WorkerTimestamp, WorkerUpdate};
66+
use crate::worker::{reduce_platform_properties, Worker, ActionInfoWithProps, WorkerState, WorkerTimestamp, WorkerUpdate};
6767
use crate::worker_capability_index::WorkerCapabilityIndex;
6868
use crate::worker_registry::SharedWorkerRegistry;
6969
use crate::worker_scheduler::WorkerScheduler;
@@ -311,21 +311,6 @@ impl ApiWorkerSchedulerImpl {
311311
&self,
312312
platform_properties: &PlatformProperties,
313313
full_worker_logging: bool,
314-
) -> Option<WorkerId> {
315-
self.inner_find_worker_for_action_excluding(
316-
platform_properties,
317-
&HashSet::new(),
318-
full_worker_logging,
319-
)
320-
}
321-
322-
/// Finds a worker for an action, excluding workers in the given set.
323-
/// This is used by batch matching to avoid assigning the same worker to multiple actions.
324-
fn inner_find_worker_for_action_excluding(
325-
&self,
326-
platform_properties: &PlatformProperties,
327-
excluded_workers: &HashSet<WorkerId>,
328-
full_worker_logging: bool,
329314
) -> Option<WorkerId> {
330315
// Use capability index to get candidate workers that match STATIC properties
331316
// (Exact, Unknown) and have the required property keys (Priority, Minimum).
@@ -345,11 +330,6 @@ impl ApiWorkerSchedulerImpl {
345330
// The index only does presence checks for Minimum properties since their
346331
// values change dynamically as jobs are assigned to workers.
347332
let worker_matches = |(worker_id, w): &(&WorkerId, &Worker)| -> bool {
348-
// Skip workers that are already assigned in this batch
349-
if excluded_workers.contains(worker_id) {
350-
return false;
351-
}
352-
353333
if !w.can_accept_work() {
354334
if full_worker_logging {
355335
info!(
@@ -390,23 +370,42 @@ impl ApiWorkerSchedulerImpl {
390370

391371
/// Batch finds workers for multiple actions in a single pass.
392372
/// This reduces lock contention by acquiring the lock once for all actions.
393-
/// Returns a vector of (action_index, worker_id) pairs for successful matches.
373+
/// Returns a map of (action_index, worker_id) pairs for successful matches.
394374
fn inner_batch_find_workers_for_actions(
395375
&self,
396376
actions: &[&PlatformProperties],
397377
full_worker_logging: bool,
398-
) -> Vec<(usize, WorkerId)> {
399-
let mut results = Vec::with_capacity(actions.len());
400-
let mut assigned_workers: HashSet<WorkerId> = HashSet::new();
378+
) -> HashMap<usize, WorkerId> {
379+
let mut results = HashMap::with_capacity(actions.len());
380+
let mut workers_platform_properties = HashMap::new();
401381

402382
for (idx, platform_properties) in actions.iter().enumerate() {
403-
if let Some(worker_id) = self.inner_find_worker_for_action_excluding(
404-
platform_properties,
405-
&assigned_workers,
406-
full_worker_logging,
407-
) {
408-
assigned_workers.insert(worker_id.clone());
409-
results.push((idx, worker_id));
383+
let candidates = self
384+
.capability_index
385+
.find_matching_workers(platform_properties);
386+
if candidates.is_empty() {
387+
continue;
388+
}
389+
390+
for worker_id in candidates {
391+
if let Some(worker) = self.workers.peek(&worker_id) {
392+
if !worker.can_accept_work() {
393+
continue;
394+
}
395+
396+
if !workers_platform_properties.contains_key(&worker_id) {
397+
workers_platform_properties.insert(worker_id.clone(), worker.platform_properties.clone());
398+
}
399+
400+
if !platform_properties.is_satisfied_by(&workers_platform_properties[&worker_id], full_worker_logging) {
401+
continue;
402+
}
403+
404+
reduce_platform_properties(workers_platform_properties.get_mut(&worker_id).unwrap(), platform_properties);
405+
406+
results.insert(idx, worker_id.clone());
407+
break;
408+
}
410409
}
411410
}
412411

nativelink-scheduler/src/worker.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,7 @@ fn send_msg_to_worker(
130130
/// Reduces the platform properties available on the worker based on the platform properties provided.
131131
/// This is used because we allow more than 1 job to run on a worker at a time, and this is how the
132132
/// scheduler knows if more jobs can run on a given worker.
133-
fn reduce_platform_properties(
133+
pub fn reduce_platform_properties(
134134
parent_props: &mut PlatformProperties,
135135
reduction_props: &PlatformProperties,
136136
) {

0 commit comments

Comments
 (0)