Skip to content

Commit 4e0ef19

Browse files
committed
Support the cancel_futures parameter to executor.shutdown()
1 parent 4e0195b commit 4e0ef19

2 files changed

Lines changed: 35 additions & 1 deletion

File tree

src/qasync/__init__.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -197,10 +197,17 @@ def submit(self, callback, *args, **kwargs):
197197
def map(self, func, *iterables, timeout=None):
198198
raise NotImplementedError("use as_completed on the event loop")
199199

200-
def shutdown(self, wait=True):
200+
def shutdown(self, wait=True, *, cancel_futures=False):
201201
with self.__shutdown_lock:
202202
self.__been_shutdown = True
203203
self._logger.debug("Shutting down")
204+
if cancel_futures:
205+
# pop all the futures and cancel them
206+
while not self.__queue.empty():
207+
item = self.__queue.get_nowait()
208+
if item is not None:
209+
future, _, _, _ = item
210+
future.cancel()
204211
for i in range(len(self.__workers)):
205212
# Signal workers to stop
206213
self.__queue.put(None)

tests/test_qthreadexec.py

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,9 @@
44
# BSD License
55
import logging
66
import threading
7+
import time
78
import weakref
9+
from concurrent.futures import CancelledError
810

911
import pytest
1012

@@ -118,3 +120,28 @@ def test_context(executor):
118120
# but will fail when we submit
119121
with pytest.raises(RuntimeError):
120122
executor.submit(lambda: 42)
123+
124+
125+
@pytest.mark.parametrize("cancel", [True, False])
126+
def test_shutdown_cancel_futures(executor, cancel):
127+
"""Test that shutdown with cancel_futures=True cancels all remaining futures in the queue."""
128+
129+
def task():
130+
time.sleep(0.01)
131+
132+
# Submit ten tasks to the executor
133+
futures = [executor.submit(task) for _ in range(10)]
134+
# shut it down
135+
executor.shutdown(cancel_futures=cancel)
136+
137+
cancels = 0
138+
for future in futures:
139+
try:
140+
future.result(timeout=0.01)
141+
except CancelledError:
142+
cancels += 1
143+
144+
if cancel:
145+
assert cancels > 0
146+
else:
147+
assert cancels == 0

0 commit comments

Comments
 (0)