|
6 | 6 |
|
7 | 7 |
|
def func_wrapper(args):
    """Run one paginated fetch and tag each result with its absolute index.

    Packed into a single tuple because it is dispatched through
    ``ThreadPoolExecutor.map``, which passes exactly one argument per call.

    Args:
        args: tuple of ``(func, limit, offset, *extra_args)``; ``func`` is
            invoked as ``func(limit, offset, *extra_args)``.

    Returns:
        List of ``(absolute_index, item)`` pairs where
        ``absolute_index = offset + position_within_page``. On any failure
        of ``func`` the error is logged and an empty list is returned, so
        one bad page does not abort the whole paginated fetch.
    """
    func, limit, offset, *extra_args = args
    try:
        items = func(limit, offset, *extra_args)
    except Exception:
        # Logger.exception logs at ERROR level *and* appends the traceback,
        # so the previous separate log.error + log.exception pair was
        # redundant — one call carries both the context and the stack.
        log.exception(
            "Failed to run %s(limit=%d, offset=%d, args=%s)",
            func,
            limit,
            offset,
            extra_args,
        )
        items = []

    return [(i + offset, item) for i, item in enumerate(items)]
17 | 28 |
|
18 | 29 |
|
def get_items(
    func: Callable,
    total_count: int,
    *args,
    parse: Callable = lambda _: _,
    chunk_size: int = 50,
    processes: int = 2,
):
    """Fetch every page of a ``limit``/``offset`` paginated API in parallel.

    Splits ``total_count`` items into ``chunk_size``-sized pages, fetches
    them concurrently on ``processes`` worker threads, then reassembles
    the results in their original order.

    Args:
        func: callable accepting ``(limit, offset, *args)`` and returning
            one page of items.
        total_count: total number of items to page through.
        *args: extra positional arguments forwarded to ``func``.
        parse: applied to each item before returning (identity by default).
        chunk_size: page size passed as ``limit`` to ``func``.
        processes: number of worker threads.

    Returns:
        List of parsed items in absolute-index order.
    """
    indexed_items = []

    with ThreadPoolExecutor(processes) as pool:
        work = [
            (func, chunk_size, start, *args)
            for start in range(0, total_count, chunk_size)
        ]

        for page in pool.map(func_wrapper, work):
            indexed_items.extend(page)

    # Pages may finish out of order; the absolute index restores ordering.
    indexed_items.sort(key=lambda pair: pair[0])
    return [parse(item) for _, item in indexed_items]
0 commit comments