From 4b30320e3a2ee6e05740b79d27231290a09703e6 Mon Sep 17 00:00:00 2001 From: ebonnal Date: Sun, 13 Oct 2024 00:31:39 +0100 Subject: [PATCH 01/21] wip working test_pool --- Lib/concurrent/futures/_base.py | 41 +++++++++++++++++- Lib/concurrent/futures/process.py | 6 ++- Lib/concurrent/futures/test.py | 11 +++++ Lib/test/test_concurrent_futures/test_pool.py | 42 +++++++++++++++++++ .../test_thread_pool.py | 1 + _test.py | 32 ++++++++++++++ test_buffered_map.py | 11 +++++ 7 files changed, 141 insertions(+), 3 deletions(-) create mode 100644 Lib/concurrent/futures/test.py create mode 100644 Lib/test/test_concurrent_futures/test_pool.py create mode 100644 _test.py create mode 100644 test_buffered_map.py diff --git a/Lib/concurrent/futures/_base.py b/Lib/concurrent/futures/_base.py index 707fcdfde79acdb..7b66638c0c02419 100644 --- a/Lib/concurrent/futures/_base.py +++ b/Lib/concurrent/futures/_base.py @@ -4,11 +4,15 @@ __author__ = 'Brian Quinlan (brian@sweetapp.com)' import collections +import itertools import logging +from multiprocessing import Queue import threading import time import types +from contextlib import suppress + FIRST_COMPLETED = 'FIRST_COMPLETED' FIRST_EXCEPTION = 'FIRST_EXCEPTION' ALL_COMPLETED = 'ALL_COMPLETED' @@ -572,7 +576,35 @@ def submit(self, fn, /, *args, **kwargs): """ raise NotImplementedError() - def map(self, fn, *iterables, timeout=None, chunksize=1): + def _buffered_map(self, fn, timeout, buffersize, *iterables): + if timeout is not None: + end_time = timeout + time.monotonic() + + zip_iterator = iter(zip(*iterables)) + fs = collections.deque( + (self.submit(fn, *args) for args in itertools.islice(zip_iterator, buffersize)), + maxlen=buffersize, + ) + + # Yield must be hidden in closure so that the futures are submitted + # before the first iterator value is required. + def result_iterator(): + try: + while fs: + # Careful not to keep a reference to the popped future + if timeout is None: + result = _result_or_cancel(fs.popleft()) + else: + result = _result_or_cancel(fs.popleft(), end_time - time.monotonic()) + with suppress(StopIteration): + fs.append(self.submit(fn, *next(zip_iterator))) + yield result + finally: + for future in fs: + future.cancel() + return result_iterator() + + def map(self, fn, *iterables, timeout=None, chunksize=1, buffersize=None): """Returns an iterator equivalent to map(fn, iter). Args: @@ -584,6 +616,9 @@ def map(self, fn, *iterables, timeout=None, chunksize=1): before being passed to a child process. This argument is only used by ProcessPoolExecutor; it is ignored by ThreadPoolExecutor. + buffersize: The number of not-yet-yielded results buffered. + If the buffer is full, then iteration over `iterables` is paused + until an element is yielded out of the buffer. Returns: An iterator equivalent to: map(func, *iterables) but the calls may @@ -594,6 +629,10 @@ def map(self, fn, *iterables, timeout=None, chunksize=1): before the given timeout. Exception: If fn(*args) raises for any values. """ + if buffersize is not None: + if buffersize < 1: + raise ValueError("buffersize must be None or >= 1.") + return self._buffered_map(fn, timeout, buffersize, *iterables) if timeout is not None: end_time = timeout + time.monotonic() diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py index 7092b4757b5429f..3bb2d10733f3267 100644 --- a/Lib/concurrent/futures/process.py +++ b/Lib/concurrent/futures/process.py @@ -813,7 +813,7 @@ def submit(self, fn, /, *args, **kwargs): return f submit.__doc__ = _base.Executor.submit.__doc__ - def map(self, fn, *iterables, timeout=None, chunksize=1): + def map(self, fn, *iterables, timeout=None, chunksize=1, buffersize=None): """Returns an iterator equivalent to map(fn, iter). Args: @@ -839,7 +839,8 @@ def map(self, fn, *iterables, timeout=None, chunksize=1): results = super().map(partial(_process_chunk, fn), itertools.batched(zip(*iterables), chunksize), - timeout=timeout) + timeout=timeout, + buffersize=buffersize) return _chain_from_iterable_of_lists(results) def shutdown(self, wait=True, *, cancel_futures=False): @@ -863,3 +864,4 @@ def shutdown(self, wait=True, *, cancel_futures=False): self._executor_manager_thread_wakeup = None shutdown.__doc__ = _base.Executor.shutdown.__doc__ + # type: ignore \ No newline at end of file diff --git a/Lib/concurrent/futures/test.py b/Lib/concurrent/futures/test.py new file mode 100644 index 000000000000000..f2c278d24374185 --- /dev/null +++ b/Lib/concurrent/futures/test.py @@ -0,0 +1,11 @@ +from thread import ThreadPoolExecutor +import time +from typing import List + +concurrency = 8 +with ThreadPoolExecutor(max_workers=concurrency) as executor: + l: List[int] = [] + executor.map(l.append, range(1000)) + print(l) + time.sleep(1) + assert len(l) <= concurrency + 1 \ No newline at end of file diff --git a/Lib/test/test_concurrent_futures/test_pool.py b/Lib/test/test_concurrent_futures/test_pool.py new file mode 100644 index 000000000000000..80ef1b8292c10be --- /dev/null +++ b/Lib/test/test_concurrent_futures/test_pool.py @@ -0,0 +1,42 @@ +from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor +from multiprocessing import Manager +import time +import unittest + +from .executor import ExecutorTest +from .util import BaseTestCase, setup_module + + +class PoolExecutorTest(ExecutorTest, BaseTestCase): + def test_map_buffersize(self): + manager = Manager() + for ExecutorType in (ThreadPoolExecutor, ProcessPoolExecutor): + with ExecutorType(max_workers=1) as pool: + with self.assertRaisesRegex( + ValueError, "buffersize must be None or >= 1." + ): + pool.map(bool, [], buffersize=0) + + for buffersize in [1, 5]: + iterable = range(10) + processed_elements = manager.list() + + with ExecutorType(max_workers=1) as pool: + iterator = pool.map( + processed_elements.append, iterable, buffersize=buffersize + ) + time.sleep(0.2) # wait for buffered futures to finish + self.assertSetEqual(set(processed_elements), set(range(buffersize))) + next(iterator) + time.sleep(0.1) # wait for the created future to finish + self.assertSetEqual( + set(processed_elements), set(range(buffersize + 1)) + ) + + +def setUpModule(): + setup_module() + + +if __name__ == "__main__": + unittest.main() diff --git a/Lib/test/test_concurrent_futures/test_thread_pool.py b/Lib/test/test_concurrent_futures/test_thread_pool.py index 2b5bea9f4055a29..b35f55ca701b871 100644 --- a/Lib/test/test_concurrent_futures/test_thread_pool.py +++ b/Lib/test/test_concurrent_futures/test_thread_pool.py @@ -4,6 +4,7 @@ import multiprocessing.util import os import threading +import time import unittest from concurrent import futures from test import support diff --git a/_test.py b/_test.py new file mode 100644 index 000000000000000..88498113e3deda1 --- /dev/null +++ b/_test.py @@ -0,0 +1,32 @@ +import math +from streamable import Stream + +import time + +def factors(n: int): + return [i for i in range(1, math.floor(n // 2) + 1) if not n % i] +assert factors(10) == [1, 2, 5] + +N = 1000000 +if __name__ == "__main__": + stream = Stream([N] * 100) + # mono thread + start_time = time.time() + print("sum", sum(stream.map(factors).map(len))) + print(time.time() - start_time) + + # concurrent 8 threads + start_time = time.time() + print("sum", sum(stream.map(factors, concurrency=8).map(len))) + print(time.time() - start_time) + + # concurrent 8 processes + start_time = time.time() + print("sum", sum(stream.map(factors, concurrency=8, via_processes=True).map(len))) + print(time.time() - start_time) + +""" +./configure --disable-gil +make -j +sudo make install +""" \ No newline at end of file diff --git a/test_buffered_map.py b/test_buffered_map.py new file mode 100644 index 000000000000000..04ae4cfae899a6b --- /dev/null +++ b/test_buffered_map.py @@ -0,0 +1,11 @@ +from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor + +concurrency = 8 +def printing_identity(_): + print(_) + return _ +if __name__ == "__main__": + with ProcessPoolExecutor(concurrency) as executor: + it = executor.map(int, map(printing_identity, range(15)), buffersize=4) + print("next", next(it), "endnext") + print("next", next(it), "endnext") \ No newline at end of file From 8d8be23a7c700fe15db006d3b2586b46fc305836 Mon Sep 17 00:00:00 2001 From: ebonnal Date: Sun, 13 Oct 2024 00:50:55 +0100 Subject: [PATCH 02/21] wip map itself edited --- Lib/concurrent/futures/_base.py | 57 +++++++++++---------------------- 1 file changed, 19 insertions(+), 38 deletions(-) diff --git a/Lib/concurrent/futures/_base.py b/Lib/concurrent/futures/_base.py index 7b66638c0c02419..c7f43900670a7e0 100644 --- a/Lib/concurrent/futures/_base.py +++ b/Lib/concurrent/futures/_base.py @@ -7,6 +7,7 @@ import itertools import logging from multiprocessing import Queue +import sys import threading import time import types @@ -576,35 +577,7 @@ def submit(self, fn, /, *args, **kwargs): """ raise NotImplementedError() - def _buffered_map(self, fn, timeout, buffersize, *iterables): - if timeout is not None: - end_time = timeout + time.monotonic() - - zip_iterator = iter(zip(*iterables)) - fs = collections.deque( - (self.submit(fn, *args) for args in itertools.islice(zip_iterator, buffersize)), - maxlen=buffersize, - ) - - # Yield must be hidden in closure so that the futures are submitted - # before the first iterator value is required. - def result_iterator(): - try: - while fs: - # Careful not to keep a reference to the popped future - if timeout is None: - result = _result_or_cancel(fs.popleft()) - else: - result = _result_or_cancel(fs.popleft(), end_time - time.monotonic()) - with suppress(StopIteration): - fs.append(self.submit(fn, *next(zip_iterator))) - yield result - finally: - for future in fs: - future.cancel() - return result_iterator() - - def map(self, fn, *iterables, timeout=None, chunksize=1, buffersize=None): + def map(self, fn, *iterables, timeout=None, chunksize=1, buffersize=sys.maxsize): """Returns an iterator equivalent to map(fn, iter). Args: @@ -629,27 +602,35 @@ def map(self, fn, *iterables, timeout=None, chunksize=1, buffersize=None): before the given timeout. Exception: If fn(*args) raises for any values. """ - if buffersize is not None: - if buffersize < 1: - raise ValueError("buffersize must be None or >= 1.") - return self._buffered_map(fn, timeout, buffersize, *iterables) + if buffersize < 1: + raise ValueError("buffersize must be None or >= 1.") + if timeout is not None: end_time = timeout + time.monotonic() - fs = [self.submit(fn, *args) for args in zip(*iterables)] + zip_iterator = iter(zip(*iterables)) + + fs = collections.deque( + (self.submit(fn, *args) for args in itertools.islice(zip_iterator, buffersize)), + maxlen=buffersize, + ) # Yield must be hidden in closure so that the futures are submitted # before the first iterator value is required. def result_iterator(): + has_next = True try: - # reverse to keep finishing order - fs.reverse() while fs: + if has_next: + try: + fs.append(self.submit(fn, *next(zip_iterator))) + except StopIteration: + has_next = False # Careful not to keep a reference to the popped future if timeout is None: - yield _result_or_cancel(fs.pop()) + yield _result_or_cancel(fs.popleft()) else: - yield _result_or_cancel(fs.pop(), end_time - time.monotonic()) + yield _result_or_cancel(fs.popleft(), end_time - time.monotonic()) finally: for future in fs: future.cancel() From 09bd5db26efc00ddf3d8069ddb16dc0d35e3f50e Mon Sep 17 00:00:00 2001 From: ebonnal Date: Sun, 13 Oct 2024 01:09:01 +0100 Subject: [PATCH 03/21] wip --- Lib/concurrent/futures/_base.py | 5 ++--- Lib/test/test_concurrent_futures/test_pool.py | 3 +-- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/Lib/concurrent/futures/_base.py b/Lib/concurrent/futures/_base.py index c7f43900670a7e0..137f9519388f63b 100644 --- a/Lib/concurrent/futures/_base.py +++ b/Lib/concurrent/futures/_base.py @@ -577,7 +577,7 @@ def submit(self, fn, /, *args, **kwargs): """ raise NotImplementedError() - def map(self, fn, *iterables, timeout=None, chunksize=1, buffersize=sys.maxsize): + def map(self, fn, *iterables, timeout=None, chunksize=1, buffersize=None): """Returns an iterator equivalent to map(fn, iter). Args: @@ -602,7 +602,7 @@ def map(self, fn, *iterables, timeout=None, chunksize=1, buffersize=sys.maxsize) before the given timeout. Exception: If fn(*args) raises for any values. """ - if buffersize < 1: + if buffersize is not None and buffersize < 1: raise ValueError("buffersize must be None or >= 1.") if timeout is not None: @@ -612,7 +612,6 @@ def map(self, fn, *iterables, timeout=None, chunksize=1, buffersize=sys.maxsize) fs = collections.deque( (self.submit(fn, *args) for args in itertools.islice(zip_iterator, buffersize)), - maxlen=buffersize, ) # Yield must be hidden in closure so that the futures are submitted diff --git a/Lib/test/test_concurrent_futures/test_pool.py b/Lib/test/test_concurrent_futures/test_pool.py index 80ef1b8292c10be..f0400edd48c0abd 100644 --- a/Lib/test/test_concurrent_futures/test_pool.py +++ b/Lib/test/test_concurrent_futures/test_pool.py @@ -3,11 +3,10 @@ import time import unittest -from .executor import ExecutorTest from .util import BaseTestCase, setup_module -class PoolExecutorTest(ExecutorTest, BaseTestCase): +class PoolExecutorTest(BaseTestCase): def test_map_buffersize(self): manager = Manager() for ExecutorType in (ThreadPoolExecutor, ProcessPoolExecutor): From af40616b7b40003a84ef1d8028df61cd0ee96061 Mon Sep 17 00:00:00 2001 From: ebonnal Date: Sun, 13 Oct 2024 01:13:05 +0100 Subject: [PATCH 04/21] wip --- Lib/concurrent/futures/_base.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/Lib/concurrent/futures/_base.py b/Lib/concurrent/futures/_base.py index 137f9519388f63b..1954684622d40ac 100644 --- a/Lib/concurrent/futures/_base.py +++ b/Lib/concurrent/futures/_base.py @@ -613,11 +613,10 @@ def map(self, fn, *iterables, timeout=None, chunksize=1, buffersize=None): fs = collections.deque( (self.submit(fn, *args) for args in itertools.islice(zip_iterator, buffersize)), ) - # Yield must be hidden in closure so that the futures are submitted # before the first iterator value is required. def result_iterator(): - has_next = True + has_next = len(fs) == buffersize try: while fs: if has_next: From 619415592a4aa6bab073d40789a47e0b9c98e658 Mon Sep 17 00:00:00 2001 From: ebonnal Date: Sun, 13 Oct 2024 01:17:46 +0100 Subject: [PATCH 05/21] wip --- Lib/concurrent/futures/_base.py | 8 +++----- Lib/concurrent/futures/process.py | 4 +++- Lib/test/test_concurrent_futures/test_thread_pool.py | 3 --- 3 files changed, 6 insertions(+), 9 deletions(-) diff --git a/Lib/concurrent/futures/_base.py b/Lib/concurrent/futures/_base.py index 1954684622d40ac..73c4d2d01d60630 100644 --- a/Lib/concurrent/futures/_base.py +++ b/Lib/concurrent/futures/_base.py @@ -6,13 +6,10 @@ import collections import itertools import logging -from multiprocessing import Queue -import sys import threading import time import types -from contextlib import suppress FIRST_COMPLETED = 'FIRST_COMPLETED' FIRST_EXCEPTION = 'FIRST_EXCEPTION' @@ -589,8 +586,8 @@ def map(self, fn, *iterables, timeout=None, chunksize=1, buffersize=None): before being passed to a child process. This argument is only used by ProcessPoolExecutor; it is ignored by ThreadPoolExecutor. - buffersize: The number of not-yet-yielded results buffered. - If the buffer is full, then iteration over `iterables` is paused + buffersize: The maximum number of not-yet-yielded results buffered. + If the buffer is full, then iteration over `iterables` is paused until an element is yielded out of the buffer. Returns: @@ -613,6 +610,7 @@ def map(self, fn, *iterables, timeout=None, chunksize=1, buffersize=None): fs = collections.deque( (self.submit(fn, *args) for args in itertools.islice(zip_iterator, buffersize)), ) + # Yield must be hidden in closure so that the futures are submitted # before the first iterator value is required. def result_iterator(): diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py index 3bb2d10733f3267..f60de51877f9619 100644 --- a/Lib/concurrent/futures/process.py +++ b/Lib/concurrent/futures/process.py @@ -51,7 +51,6 @@ import multiprocessing as mp # This import is required to load the multiprocessing.connection submodule # so that it can be accessed later as `mp.connection` -import multiprocessing.connection from multiprocessing.queues import Queue import threading import weakref @@ -824,6 +823,9 @@ def map(self, fn, *iterables, timeout=None, chunksize=1, buffersize=None): chunksize: If greater than one, the iterables will be chopped into chunks of size chunksize and submitted to the process pool. If set to one, the items in the list will be sent one at a time. + buffersize: The maximum number of not-yet-yielded results buffered. + If the buffer is full, then iteration over `iterables` is paused + until an element is yielded out of the buffer. Returns: An iterator equivalent to: map(func, *iterables) but the calls may diff --git a/Lib/test/test_concurrent_futures/test_thread_pool.py b/Lib/test/test_concurrent_futures/test_thread_pool.py index b35f55ca701b871..6c80d05572e6fb8 100644 --- a/Lib/test/test_concurrent_futures/test_thread_pool.py +++ b/Lib/test/test_concurrent_futures/test_thread_pool.py @@ -1,10 +1,7 @@ import contextlib import multiprocessing as mp -import multiprocessing.process -import multiprocessing.util import os import threading -import time import unittest from concurrent import futures from test import support From 004b7ae63e7fd9838ea92fd956f0d7e10ba909a7 Mon Sep 17 00:00:00 2001 From: ebonnal Date: Sun, 13 Oct 2024 01:18:44 +0100 Subject: [PATCH 06/21] wip --- Lib/concurrent/futures/test.py | 11 ----------- _test.py | 32 -------------------------------- test_buffered_map.py | 11 ----------- 3 files changed, 54 deletions(-) delete mode 100644 Lib/concurrent/futures/test.py delete mode 100644 _test.py delete mode 100644 test_buffered_map.py diff --git a/Lib/concurrent/futures/test.py b/Lib/concurrent/futures/test.py deleted file mode 100644 index f2c278d24374185..000000000000000 --- a/Lib/concurrent/futures/test.py +++ /dev/null @@ -1,11 +0,0 @@ -from thread import ThreadPoolExecutor -import time -from typing import List - -concurrency = 8 -with ThreadPoolExecutor(max_workers=concurrency) as executor: - l: List[int] = [] - executor.map(l.append, range(1000)) - print(l) - time.sleep(1) - assert len(l) <= concurrency + 1 \ No newline at end of file diff --git a/_test.py b/_test.py deleted file mode 100644 index 88498113e3deda1..000000000000000 --- a/_test.py +++ /dev/null @@ -1,32 +0,0 @@ -import math -from streamable import Stream - -import time - -def factors(n: int): - return [i for i in range(1, math.floor(n // 2) + 1) if not n % i] -assert factors(10) == [1, 2, 5] - -N = 1000000 -if __name__ == "__main__": - stream = Stream([N] * 100) - # mono thread - start_time = time.time() - print("sum", sum(stream.map(factors).map(len))) - print(time.time() - start_time) - - # concurrent 8 threads - start_time = time.time() - print("sum", sum(stream.map(factors, concurrency=8).map(len))) - print(time.time() - start_time) - - # concurrent 8 processes - start_time = time.time() - print("sum", sum(stream.map(factors, concurrency=8, via_processes=True).map(len))) - print(time.time() - start_time) - -""" -./configure --disable-gil -make -j -sudo make install -""" \ No newline at end of file diff --git a/test_buffered_map.py b/test_buffered_map.py deleted file mode 100644 index 04ae4cfae899a6b..000000000000000 --- a/test_buffered_map.py +++ /dev/null @@ -1,11 +0,0 @@ -from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor - -concurrency = 8 -def printing_identity(_): - print(_) - return _ -if __name__ == "__main__": - with ProcessPoolExecutor(concurrency) as executor: - it = executor.map(int, map(printing_identity, range(15)), buffersize=4) - print("next", next(it), "endnext") - print("next", next(it), "endnext") \ No newline at end of file From 2a47f053fa987ed7d07ce3ea396e06d07a43a1d8 Mon Sep 17 00:00:00 2001 From: ebonnal Date: Sun, 13 Oct 2024 01:20:15 +0100 Subject: [PATCH 07/21] wip --- Lib/concurrent/futures/process.py | 2 +- Lib/test/test_concurrent_futures/test_thread_pool.py | 2 ++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py index f60de51877f9619..548f376b021d7b7 100644 --- a/Lib/concurrent/futures/process.py +++ b/Lib/concurrent/futures/process.py @@ -51,6 +51,7 @@ import multiprocessing as mp # This import is required to load the multiprocessing.connection submodule # so that it can be accessed later as `mp.connection` +import multiprocessing.connection from multiprocessing.queues import Queue import threading import weakref @@ -866,4 +867,3 @@ def shutdown(self, wait=True, *, cancel_futures=False): self._executor_manager_thread_wakeup = None shutdown.__doc__ = _base.Executor.shutdown.__doc__ - # type: ignore \ No newline at end of file diff --git a/Lib/test/test_concurrent_futures/test_thread_pool.py b/Lib/test/test_concurrent_futures/test_thread_pool.py index 6c80d05572e6fb8..2b5bea9f4055a29 100644 --- a/Lib/test/test_concurrent_futures/test_thread_pool.py +++ b/Lib/test/test_concurrent_futures/test_thread_pool.py @@ -1,5 +1,7 @@ import contextlib import multiprocessing as mp +import multiprocessing.process +import multiprocessing.util import os import threading import unittest From 5096a766e81998422d48246458d0106525077082 Mon Sep 17 00:00:00 2001 From: ebonnal Date: Sun, 13 Oct 2024 01:20:39 +0100 Subject: [PATCH 08/21] wip --- Lib/concurrent/futures/_base.py | 1 - 1 file changed, 1 deletion(-) diff --git a/Lib/concurrent/futures/_base.py b/Lib/concurrent/futures/_base.py index 73c4d2d01d60630..94ebed97de5443a 100644 --- a/Lib/concurrent/futures/_base.py +++ b/Lib/concurrent/futures/_base.py @@ -10,7 +10,6 @@ import time import types - FIRST_COMPLETED = 'FIRST_COMPLETED' FIRST_EXCEPTION = 'FIRST_EXCEPTION' ALL_COMPLETED = 'ALL_COMPLETED' From c9d44c4a1813c2153a459f14cb2d9883e1264704 Mon Sep 17 00:00:00 2001 From: ebonnal Date: Sun, 13 Oct 2024 14:40:00 +0100 Subject: [PATCH 09/21] back to list for general use case --- Lib/concurrent/futures/_base.py | 16 ++++++++++------ Lib/test/test_concurrent_futures/test_pool.py | 2 -- 2 files changed, 10 insertions(+), 8 deletions(-) diff --git a/Lib/concurrent/futures/_base.py b/Lib/concurrent/futures/_base.py index 94ebed97de5443a..dece128d2df4ebd 100644 --- a/Lib/concurrent/futures/_base.py +++ b/Lib/concurrent/futures/_base.py @@ -606,9 +606,13 @@ def map(self, fn, *iterables, timeout=None, chunksize=1, buffersize=None): zip_iterator = iter(zip(*iterables)) - fs = collections.deque( - (self.submit(fn, *args) for args in itertools.islice(zip_iterator, buffersize)), - ) + if buffersize: + fs = collections.deque( + self.submit(fn, *args) for args in itertools.islice(zip_iterator, buffersize) + ) + else: + fs = [self.submit(fn, *args) for args in zip_iterator] + fs.reverse() # Yield must be hidden in closure so that the futures are submitted # before the first iterator value is required. @@ -618,14 +622,14 @@ def result_iterator(): while fs: if has_next: try: - fs.append(self.submit(fn, *next(zip_iterator))) + fs.appendleft(self.submit(fn, *next(zip_iterator))) except StopIteration: has_next = False # Careful not to keep a reference to the popped future if timeout is None: - yield _result_or_cancel(fs.popleft()) + yield _result_or_cancel(fs.pop()) else: - yield _result_or_cancel(fs.popleft(), end_time - time.monotonic()) + yield _result_or_cancel(fs.pop(), end_time - time.monotonic()) finally: for future in fs: future.cancel() diff --git a/Lib/test/test_concurrent_futures/test_pool.py b/Lib/test/test_concurrent_futures/test_pool.py index f0400edd48c0abd..91a704e89818e51 100644 --- a/Lib/test/test_concurrent_futures/test_pool.py +++ b/Lib/test/test_concurrent_futures/test_pool.py @@ -15,11 +15,9 @@ def test_map_buffersize(self): ValueError, "buffersize must be None or >= 1." ): pool.map(bool, [], buffersize=0) - for buffersize in [1, 5]: iterable = range(10) processed_elements = manager.list() - with ExecutorType(max_workers=1) as pool: iterator = pool.map( processed_elements.append, iterable, buffersize=buffersize From eaad1f225cd9607cf30d1d371ad203c32388c6cc Mon Sep 17 00:00:00 2001 From: ebonnal Date: Sun, 13 Oct 2024 15:33:08 +0100 Subject: [PATCH 10/21] wip args_iter --- Lib/concurrent/futures/_base.py | 21 +++++++++++-------- Lib/test/test_concurrent_futures/test_pool.py | 12 +++++++---- 2 files changed, 20 insertions(+), 13 deletions(-) diff --git a/Lib/concurrent/futures/_base.py b/Lib/concurrent/futures/_base.py index dece128d2df4ebd..72dac778d948dab 100644 --- a/Lib/concurrent/futures/_base.py +++ b/Lib/concurrent/futures/_base.py @@ -604,27 +604,30 @@ def map(self, fn, *iterables, timeout=None, chunksize=1, buffersize=None): if timeout is not None: end_time = timeout + time.monotonic() - zip_iterator = iter(zip(*iterables)) - + args_iter = iter(zip(*iterables)) if buffersize: fs = collections.deque( - self.submit(fn, *args) for args in itertools.islice(zip_iterator, buffersize) + self.submit(fn, *args) for args in itertools.islice(args_iter, buffersize) ) else: - fs = [self.submit(fn, *args) for args in zip_iterator] - fs.reverse() + fs = [self.submit(fn, *args) for args in args_iter] # Yield must be hidden in closure so that the futures are submitted # before the first iterator value is required. def result_iterator(): - has_next = len(fs) == buffersize try: + # reverse to have the first future on the right + fs.reverse() + # args may be remaining if futures buffer is full + args_iter_has_next = len(fs) == buffersize + while fs: - if has_next: + if args_iter_has_next: try: - fs.appendleft(self.submit(fn, *next(zip_iterator))) + fs.appendleft(self.submit(fn, *next(args_iter))) except StopIteration: - has_next = False + args_iter_has_next = False + # Careful not to keep a reference to the popped future if timeout is None: yield _result_or_cancel(fs.pop()) diff --git a/Lib/test/test_concurrent_futures/test_pool.py b/Lib/test/test_concurrent_futures/test_pool.py index 91a704e89818e51..4934845aeaa6e43 100644 --- a/Lib/test/test_concurrent_futures/test_pool.py +++ b/Lib/test/test_concurrent_futures/test_pool.py @@ -15,19 +15,23 @@ def test_map_buffersize(self): ValueError, "buffersize must be None or >= 1." ): pool.map(bool, [], buffersize=0) - for buffersize in [1, 5]: - iterable = range(10) + for buffersize, iterable_size in [ + (1, 5), + (5, 5), + (10, 5), + ]: + iterable = range(iterable_size) processed_elements = manager.list() with ExecutorType(max_workers=1) as pool: iterator = pool.map( processed_elements.append, iterable, buffersize=buffersize ) time.sleep(0.2) # wait for buffered futures to finish - self.assertSetEqual(set(processed_elements), set(range(buffersize))) + self.assertSetEqual(set(processed_elements), set(range(min(buffersize, iterable_size)))) next(iterator) time.sleep(0.1) # wait for the created future to finish self.assertSetEqual( - set(processed_elements), set(range(buffersize + 1)) + set(processed_elements), set(range(min(buffersize + 1, iterable_size))) ) From f009f40c1496c1a3d7bd68a996061a9f740af9e5 Mon Sep 17 00:00:00 2001 From: ebonnal Date: Sun, 13 Oct 2024 15:47:19 +0100 Subject: [PATCH 11/21] wip islice short, comment --- Lib/concurrent/futures/_base.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/Lib/concurrent/futures/_base.py b/Lib/concurrent/futures/_base.py index 72dac778d948dab..0eb80129f8f2509 100644 --- a/Lib/concurrent/futures/_base.py +++ b/Lib/concurrent/futures/_base.py @@ -4,7 +4,7 @@ __author__ = 'Brian Quinlan (brian@sweetapp.com)' import collections -import itertools +from itertools import islice import logging import threading import time @@ -607,7 +607,7 @@ def map(self, fn, *iterables, timeout=None, chunksize=1, buffersize=None): args_iter = iter(zip(*iterables)) if buffersize: fs = collections.deque( - self.submit(fn, *args) for args in itertools.islice(args_iter, buffersize) + self.submit(fn, *args) for args in islice(args_iter, buffersize) ) else: fs = [self.submit(fn, *args) for args in args_iter] @@ -616,9 +616,9 @@ def map(self, fn, *iterables, timeout=None, chunksize=1, buffersize=None): # before the first iterator value is required. def result_iterator(): try: - # reverse to have the first future on the right + # reverse so that pop is FIFO fs.reverse() - # args may be remaining if futures buffer is full + # args may be remaining if buffersize has been reached args_iter_has_next = len(fs) == buffersize while fs: From ba02d7a8ecccbd764d658777f62de550e618ea5a Mon Sep 17 00:00:00 2001 From: ebonnal Date: Mon, 14 Oct 2024 00:46:58 +0100 Subject: [PATCH 12/21] args_iter in non buffer --- Lib/concurrent/futures/_base.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Lib/concurrent/futures/_base.py b/Lib/concurrent/futures/_base.py index 0eb80129f8f2509..862c856700a880a 100644 --- a/Lib/concurrent/futures/_base.py +++ b/Lib/concurrent/futures/_base.py @@ -604,13 +604,13 @@ def map(self, fn, *iterables, timeout=None, chunksize=1, buffersize=None): if timeout is not None: end_time = timeout + time.monotonic() - args_iter = iter(zip(*iterables)) if buffersize: + args_iter = iter(zip(*iterables)) fs = collections.deque( self.submit(fn, *args) for args in islice(args_iter, buffersize) ) else: - fs = [self.submit(fn, *args) for args in args_iter] + fs = [self.submit(fn, *args) for args in zip(*iterables)] # Yield must be hidden in closure so that the futures are submitted # before the first iterator value is required. From b0c9737983c1db04603a15f1adb6c8fccc2b2eb3 Mon Sep 17 00:00:00 2001 From: ebonnal Date: Mon, 14 Oct 2024 14:43:58 +0100 Subject: [PATCH 13/21] launch_next --- Lib/concurrent/futures/_base.py | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/Lib/concurrent/futures/_base.py b/Lib/concurrent/futures/_base.py index 862c856700a880a..d0d7e16e3b6f2ad 100644 --- a/Lib/concurrent/futures/_base.py +++ b/Lib/concurrent/futures/_base.py @@ -609,8 +609,12 @@ def map(self, fn, *iterables, timeout=None, chunksize=1, buffersize=None): fs = collections.deque( self.submit(fn, *args) for args in islice(args_iter, buffersize) ) + def launch_next(): + if (args := next(args_iter, None)) is not None: + fs.appendleft(self.submit(fn, *args)) else: fs = [self.submit(fn, *args) for args in zip(*iterables)] + launch_next = lambda: None # Yield must be hidden in closure so that the futures are submitted # before the first iterator value is required. @@ -618,16 +622,9 @@ def result_iterator(): try: # reverse so that pop is FIFO fs.reverse() - # args may be remaining if buffersize has been reached - args_iter_has_next = len(fs) == buffersize while fs: - if args_iter_has_next: - try: - fs.appendleft(self.submit(fn, *next(args_iter))) - except StopIteration: - args_iter_has_next = False - + launch_next() # Careful not to keep a reference to the popped future if timeout is None: yield _result_or_cancel(fs.pop()) From 7dff5aa55d9c17a777278b0a2652db6ee7d924eb Mon Sep 17 00:00:00 2001 From: ebonnal Date: Mon, 14 Oct 2024 14:45:29 +0100 Subject: [PATCH 14/21] revert comment change --- Lib/concurrent/futures/_base.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/Lib/concurrent/futures/_base.py b/Lib/concurrent/futures/_base.py index d0d7e16e3b6f2ad..bb647ddb76fa05f 100644 --- a/Lib/concurrent/futures/_base.py +++ b/Lib/concurrent/futures/_base.py @@ -620,9 +620,8 @@ def launch_next(): # before the first iterator value is required. def result_iterator(): try: - # reverse so that pop is FIFO + # reverse to keep finishing order fs.reverse() - while fs: launch_next() # Careful not to keep a reference to the popped future From 49223c4acd4159e1558ba486a187f80c1a7f7789 Mon Sep 17 00:00:00 2001 From: ebonnal Date: Mon, 14 Oct 2024 14:46:46 +0100 Subject: [PATCH 15/21] wip --- Lib/concurrent/futures/_base.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Lib/concurrent/futures/_base.py b/Lib/concurrent/futures/_base.py index bb647ddb76fa05f..28fe3695121a964 100644 --- a/Lib/concurrent/futures/_base.py +++ b/Lib/concurrent/futures/_base.py @@ -623,8 +623,8 @@ def result_iterator(): # reverse to keep finishing order fs.reverse() while fs: - launch_next() # Careful not to keep a reference to the popped future + launch_next() if timeout is None: yield _result_or_cancel(fs.pop()) else: From 7096d5d03ccf9dfb64752a90402440b9f1a2a41c Mon Sep 17 00:00:00 2001 From: ebonnal Date: Mon, 14 Oct 2024 15:39:57 +0100 Subject: [PATCH 16/21] weakref --- Lib/concurrent/futures/_base.py | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/Lib/concurrent/futures/_base.py b/Lib/concurrent/futures/_base.py index 28fe3695121a964..3b7a40f3d2e5d96 100644 --- a/Lib/concurrent/futures/_base.py +++ b/Lib/concurrent/futures/_base.py @@ -9,6 +9,7 @@ import threading import time import types +import weakref FIRST_COMPLETED = 'FIRST_COMPLETED' FIRST_EXCEPTION = 'FIRST_EXCEPTION' @@ -604,17 +605,14 @@ def map(self, fn, *iterables, timeout=None, chunksize=1, buffersize=None): if timeout is not None: end_time = timeout + time.monotonic() + executor = weakref.ref(self) + args_iter = iter(zip(*iterables)) if buffersize: - args_iter = iter(zip(*iterables)) fs = collections.deque( self.submit(fn, *args) for args in islice(args_iter, buffersize) ) - def launch_next(): - if (args := next(args_iter, None)) is not None: - fs.appendleft(self.submit(fn, *args)) else: - fs = [self.submit(fn, *args) for args in zip(*iterables)] - launch_next = lambda: None + fs = [self.submit(fn, *args) for args in args_iter] # Yield must be hidden in closure so that the futures are submitted # before the first iterator value is required. @@ -624,7 +622,8 @@ def result_iterator(): fs.reverse() while fs: # Careful not to keep a reference to the popped future - launch_next() + if executor() and (args := next(args_iter, None)) is not None: + fs.appendleft(executor().submit(fn, *args)) if timeout is None: yield _result_or_cancel(fs.pop()) else: From 747ba46a1645941e888fbf039dbe87e4faeae93e Mon Sep 17 00:00:00 2001 From: ebonnal Date: Mon, 14 Oct 2024 15:58:36 +0100 Subject: [PATCH 17/21] weakref --- Lib/concurrent/futures/_base.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/Lib/concurrent/futures/_base.py b/Lib/concurrent/futures/_base.py index 3b7a40f3d2e5d96..5e43033333f38b0 100644 --- a/Lib/concurrent/futures/_base.py +++ b/Lib/concurrent/futures/_base.py @@ -605,7 +605,6 @@ def map(self, fn, *iterables, timeout=None, chunksize=1, buffersize=None): if timeout is not None: end_time = timeout + time.monotonic() - executor = weakref.ref(self) args_iter = iter(zip(*iterables)) if buffersize: fs = collections.deque( @@ -614,6 +613,8 @@ def map(self, fn, *iterables, timeout=None, chunksize=1, buffersize=None): else: fs = [self.submit(fn, *args) for args in args_iter] + executor_weakref = weakref.ref(self) + # Yield must be hidden in closure so that the futures are submitted # before the first iterator value is required. def result_iterator(): @@ -622,8 +623,8 @@ def result_iterator(): fs.reverse() while fs: # Careful not to keep a reference to the popped future - if executor() and (args := next(args_iter, None)) is not None: - fs.appendleft(executor().submit(fn, *args)) + if (args := next(args_iter, None)) and (executor := executor_weakref()): + fs.appendleft(executor.submit(fn, *args)) if timeout is None: yield _result_or_cancel(fs.pop()) else: From 5150fdf01fda197d3ea8bf9fa8a6f9f91a662098 Mon Sep 17 00:00:00 2001 From: ebonnal Date: Tue, 15 Oct 2024 08:44:20 +0100 Subject: [PATCH 18/21] if buffersize --- Lib/concurrent/futures/_base.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Lib/concurrent/futures/_base.py b/Lib/concurrent/futures/_base.py index 5e43033333f38b0..fb8a671bc09f8a1 100644 --- a/Lib/concurrent/futures/_base.py +++ b/Lib/concurrent/futures/_base.py @@ -623,7 +623,7 @@ def result_iterator(): fs.reverse() while fs: # Careful not to keep a reference to the popped future - if (args := next(args_iter, None)) and (executor := executor_weakref()): + if buffersize and (args := next(args_iter, None)) and (executor := executor_weakref()): fs.appendleft(executor.submit(fn, *args)) if timeout is None: yield _result_or_cancel(fs.pop()) From 92074e4cf77c4f1a54c5c5ba2cd5bd5c6f3c7b4e Mon Sep 17 00:00:00 2001 From: ebonnal Date: Tue, 15 Oct 2024 09:04:17 +0100 Subject: [PATCH 19/21] wip --- Lib/concurrent/futures/_base.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Lib/concurrent/futures/_base.py b/Lib/concurrent/futures/_base.py index fb8a671bc09f8a1..b804ea3cf1187ae 100644 --- a/Lib/concurrent/futures/_base.py +++ b/Lib/concurrent/futures/_base.py @@ -623,7 +623,7 @@ def result_iterator(): fs.reverse() while fs: # Careful not to keep a reference to the popped future - if buffersize and (args := next(args_iter, None)) and (executor := executor_weakref()): + if buffersize and (executor := executor_weakref()) and (args := next(args_iter, None)): fs.appendleft(executor.submit(fn, *args)) if timeout is None: yield _result_or_cancel(fs.pop()) From aae0baf37944c88c25159aff6604d49245563c35 Mon Sep 17 00:00:00 2001 From: ebonnal Date: Thu, 17 Oct 2024 21:26:20 +0100 Subject: [PATCH 20/21] wip --- Doc/library/concurrent.futures.rst | 9 +++++++-- Lib/test/test_concurrent_futures/test_pool.py | 1 + 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/Doc/library/concurrent.futures.rst b/Doc/library/concurrent.futures.rst index ce72127127c7a61..a6d11a7ec882ff6 100644 --- a/Doc/library/concurrent.futures.rst +++ b/Doc/library/concurrent.futures.rst @@ -39,11 +39,13 @@ Executor Objects future = executor.submit(pow, 323, 1235) print(future.result()) - .. method:: map(fn, *iterables, timeout=None, chunksize=1) + .. method:: map(fn, *iterables, timeout=None, chunksize=1, buffersize=None) Similar to :func:`map(fn, *iterables) ` except: - * the *iterables* are collected immediately rather than lazily; + * the *iterables* are collected immediately rather than lazily, except if + a *buffersize* is specified: If the buffer is full, then the iteration + over *iterables* is paused until a result is yielded out of the buffer. * *fn* is executed asynchronously and several calls to *fn* may be made concurrently. @@ -68,6 +70,9 @@ Executor Objects .. versionchanged:: 3.5 Added the *chunksize* argument. + .. versionchanged:: 3.15 + Added the *buffersize* argument. + .. method:: shutdown(wait=True, *, cancel_futures=False) Signal the executor that it should free any resources that it is using diff --git a/Lib/test/test_concurrent_futures/test_pool.py b/Lib/test/test_concurrent_futures/test_pool.py index 4934845aeaa6e43..adaf29c67fa121e 100644 --- a/Lib/test/test_concurrent_futures/test_pool.py +++ b/Lib/test/test_concurrent_futures/test_pool.py @@ -15,6 +15,7 @@ def test_map_buffersize(self): ValueError, "buffersize must be None or >= 1." ): pool.map(bool, [], buffersize=0) + for buffersize, iterable_size in [ (1, 5), (5, 5), From 0e5d3dc727959095274fb2bdf1eb136ea83aada6 Mon Sep 17 00:00:00 2001 From: ebonnal Date: Thu, 17 Oct 2024 21:29:40 +0100 Subject: [PATCH 21/21] wip --- Doc/library/concurrent.futures.rst | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/Doc/library/concurrent.futures.rst b/Doc/library/concurrent.futures.rst index a6d11a7ec882ff6..85ad025cad4f54a 100644 --- a/Doc/library/concurrent.futures.rst +++ b/Doc/library/concurrent.futures.rst @@ -43,9 +43,9 @@ Executor Objects Similar to :func:`map(fn, *iterables) ` except: - * the *iterables* are collected immediately rather than lazily, except if - a *buffersize* is specified: If the buffer is full, then the iteration - over *iterables* is paused until a result is yielded out of the buffer. + * the *iterables* are collected immediately rather than lazily, unless a + *buffersize* is specified: If the buffer is full, then the iteration + over *iterables* is paused until a result is yielded from the buffer. * *fn* is executed asynchronously and several calls to *fn* may be made concurrently.