@@ -487,11 +487,18 @@ async def test_parallel_dispatch(fxt: Fxt, num_jobs: int) -> None:
487487 @pytest .mark .timeout (DEFAULT_TIMEOUT )
488488 @pytest .mark .parametrize ("num_jobs" , [5 , 10 , 20 ])
489489 @pytest .mark .parametrize ("max_parallel" , [1 , 5 , 15 , 25 ])
490- async def test_max_parallel (fxt : Fxt , num_jobs : int , max_parallel : int ) -> None :
491- """Test that max parallel limits of launchers are used & respected."""
490+ @pytest .mark .parametrize ("on_scheduler" , [True , False ])
491+ async def test_max_parallel (
492+ fxt : Fxt , num_jobs : int , max_parallel : int , * , on_scheduler : bool
493+ ) -> None :
494+ """Test that max parallel limits of launchers & the scheduler are used & respected."""
492495 jobs = make_many_jobs (fxt .tmp_path , num_jobs )
493- fxt .mock_legacy_backend .max_parallelism = max_parallel
494- scheduler = Scheduler (jobs , fxt .backends , MOCK_BACKEND )
496+ if on_scheduler :
497+ fxt .mock_legacy_backend .max_parallelism = 0
498+ scheduler = Scheduler (jobs , fxt .backends , MOCK_BACKEND , max_parallelism = max_parallel )
499+ else :
500+ fxt .mock_legacy_backend .max_parallelism = max_parallel
501+ scheduler = Scheduler (jobs , fxt .backends , MOCK_BACKEND )
495502 assert_that (fxt .mock_ctx .max_concurrent , equal_to (0 ))
496503 result = await scheduler .run ()
497504 _assert_result_status (result , num_jobs )
@@ -893,16 +900,22 @@ async def test_blocked_weight_starvation(fxt: Fxt) -> None:
893900 assert_that (fxt .mock_ctx .order_started [0 ], equal_to (start_job ))
894901 assert_that (fxt .mock_ctx .order_started [- 1 ], equal_to (high ))
895902
896- # TODO: we do not currently test the logic to schedule multiple queued jobs per target
897- # across different targets based on the weights of those jobs/targets, because this
898- # will require the test to be quite complex and specific to the intricacies of the
899- # current DVSim scheduler due to the current implementation. Due to only one successor
900- # in another target being discovered at once, we must carefully construct a dependency
901- # tree of jobs with specially modelled delays which relies on this implementation
902- # detail. Instead, for now at least, we leave this untested.
903- #
904- # Note also that DVSim currently assumes weights within a target are constant,
905- # which may not be the case with the current JobSpec model.
903+ @staticmethod
904+ @pytest .mark .asyncio
905+ @pytest .mark .timeout (DEFAULT_TIMEOUT )
906+ async def test_custom_priority (fxt : Fxt ) -> None :
907+ """Test that a custom prioritization function can be given to and used by the scheduler."""
908+ jobs = make_many_jobs (
909+ fxt .tmp_path , n = 5 , per_job = lambda n : {"name" : str (n ), "weight" : n + 1 }
910+ )
911+ # Prioritizes jobs via their names (lower names have higher priority, so come first).
912+ # So jobs should be scheduled in the order created, instead of the opposite default order
913+ # by decreasing weight.
914+ result = await Scheduler (
915+ jobs , fxt .backends , MOCK_BACKEND , priority_fn = lambda job : - int (job .spec .name )
916+ ).run ()
917+ _assert_result_status (result , len (jobs ))
918+ assert_that (fxt .mock_ctx .order_started , equal_to (jobs ))
906919
907920
908921class TestSignals :
0 commit comments