|
22 | 22 | import time |
23 | 23 | from concurrent.futures import Future |
24 | 24 | from queue import Queue |
| 25 | +from threading import Lock |
25 | 26 | from typing import TYPE_CHECKING, Literal, Tuple, cast, get_args |
26 | 27 |
|
27 | 28 | logger = logging.getLogger(__name__) |
@@ -172,45 +173,42 @@ def __init__(self, max_workers=10, stack_size=None): |
172 | 173 | self.__workers = [ |
173 | 174 | _QThreadWorker(self.__queue, i + 1, stack_size) for i in range(max_workers) |
174 | 175 | ] |
| 176 | + self.__shutdown_lock = Lock() |
175 | 177 | self.__been_shutdown = False |
176 | 178 |
|
177 | 179 | for w in self.__workers: |
178 | 180 | w.start() |
179 | 181 |
|
180 | 182 | def submit(self, callback, *args, **kwargs): |
181 | | - if self.__been_shutdown: |
182 | | - raise RuntimeError("QThreadExecutor has been shutdown") |
| 183 | + with self.__shutdown_lock: |
| 184 | + if self.__been_shutdown: |
| 185 | + raise RuntimeError("QThreadExecutor has been shutdown") |
183 | 186 |
|
184 | | - future = Future() |
185 | | - self._logger.debug( |
186 | | - "Submitting callback %s with args %s and kwargs %s to thread worker queue", |
187 | | - callback, |
188 | | - args, |
189 | | - kwargs, |
190 | | - ) |
191 | | - self.__queue.put((future, callback, args, kwargs)) |
192 | | - return future |
| 187 | + future = Future() |
| 188 | + self._logger.debug( |
| 189 | + "Submitting callback %s with args %s and kwargs %s to thread worker queue", |
| 190 | + callback, |
| 191 | + args, |
| 192 | + kwargs, |
| 193 | + ) |
| 194 | + self.__queue.put((future, callback, args, kwargs)) |
| 195 | + return future |
193 | 196 |
|
194 | 197 | def map(self, func, *iterables, timeout=None): |
195 | 198 | raise NotImplementedError("use as_completed on the event loop") |
196 | 199 |
|
197 | 200 | def shutdown(self, wait=True): |
198 | | - if self.__been_shutdown: |
199 | | - raise RuntimeError("QThreadExecutor has been shutdown") |
200 | | - |
201 | | - self.__been_shutdown = True |
202 | | - |
203 | | - self._logger.debug("Shutting down") |
204 | | - for i in range(len(self.__workers)): |
205 | | - # Signal workers to stop |
206 | | - self.__queue.put(None) |
207 | | - if wait: |
208 | | - for w in self.__workers: |
209 | | - w.wait() |
| 201 | + with self.__shutdown_lock: |
| 202 | + self.__been_shutdown = True |
| 203 | + self._logger.debug("Shutting down") |
| 204 | + for i in range(len(self.__workers)): |
| 205 | + # Signal workers to stop |
| 206 | + self.__queue.put(None) |
| 207 | + if wait: |
| 208 | + for w in self.__workers: |
| 209 | + w.wait() |
210 | 210 |
|
211 | 211 | def __enter__(self, *args): |
212 | | - if self.__been_shutdown: |
213 | | - raise RuntimeError("QThreadExecutor has been shutdown") |
214 | 212 | return self |
215 | 213 |
|
216 | 214 | def __exit__(self, *args): |
|
0 commit comments