Commit de86238

feat: adaptive concurrency
1 parent df96a54 commit de86238

2 files changed · 79 additions & 33 deletions

tardis_client/data_downloader.py · 78 additions & 32 deletions
@@ -19,6 +19,28 @@
 logger = logging.getLogger(__name__)
 
 
+class _AdaptiveConcurrency:
+    def __init__(self, maximum, minimum=1):
+        self._limit = maximum
+        self._minimum = minimum
+        self._maximum = maximum
+        self._last_throttle = 0.0
+
+    def on_success(self):
+        self._limit = min(self._maximum, self._limit + 1)
+
+    def on_throttle(self):
+        now = time()
+        if now - self._last_throttle < 2.0:
+            return
+        self._last_throttle = now
+        self._limit = max(self._minimum, self._limit * 7 // 10)
+
+    @property
+    def limit(self):
+        return self._limit
+
+
 async def fetch_data_to_replay(exchange, from_date, to_date, filters, endpoint, cache_dir, api_key, http_timeout, http_proxy):
     timeout = aiohttp.ClientTimeout(total=http_timeout)
     headers = {
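
The new class is a small AIMD-style controller (additive increase, multiplicative decrease): each drained download nudges the window up by one, a 429 cuts it to 70%, and the 2-second cooldown makes sure a burst of throttled responses shrinks the window only once. A rough trace of that behaviour (assuming _AdaptiveConcurrency as defined above is in scope, along with the module's existing `from time import time`):

    # illustrative trace, not part of the commit
    ac = _AdaptiveConcurrency(maximum=60)
    assert ac.limit == 60      # starts fully open

    ac.on_throttle()           # first 429: 60 * 7 // 10 -> 42
    ac.on_throttle()           # within the 2-second cooldown, ignored
    assert ac.limit == 42

    for _ in range(5):
        ac.on_success()        # additive increase, capped at `maximum`
    assert ac.limit == 47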
@@ -28,7 +50,7 @@ async def fetch_data_to_replay(exchange, from_date, to_date, filters, endpoint,
 
     minutes_diff = int(round((to_date - from_date).total_seconds() / 60))
     offset = 0
-    FETCH_DATA_CONCURRENCY_LIMIT = 60
+    ac = _AdaptiveConcurrency(maximum=60)
     fetch_data_tasks = set()
 
     start_time = time()
@@ -42,24 +64,40 @@ async def fetch_data_to_replay(exchange, from_date, to_date, filters, endpoint,
     )
 
     async with aiohttp.ClientSession(auto_decompress=False, timeout=timeout, headers=headers, trust_env=True) as session:
-        # loop below will fetch data slices if not cached already concurrently up to the conecurrency limit
-        while offset < minutes_diff:
-            if len(fetch_data_tasks) >= FETCH_DATA_CONCURRENCY_LIMIT:
-                # if there are going to be more pending fetch data downloads than concurrency limit
-                # wait before adding another one
-                done, fetch_data_tasks = await asyncio.wait(fetch_data_tasks, return_when=asyncio.FIRST_COMPLETED)
-                # need to check the result that may throw if task finished with an error
-                done.pop().result()
-
-            fetch_data_tasks.add(
-                asyncio.create_task(
-                    _fetch_data_if_not_cached(session, endpoint, cache_dir, exchange, from_date, offset, filters, http_proxy)
+        try:
+            # loop below will fetch data slices if not cached already concurrently up to the adaptive limit
+            while offset < minutes_diff:
+                while len(fetch_data_tasks) >= ac.limit:
+                    # drain until in-flight count is below the current adaptive limit
+                    done, fetch_data_tasks = await asyncio.wait(fetch_data_tasks, return_when=asyncio.FIRST_COMPLETED)
+                    # retrieve all results so no "exception was never retrieved" warnings
+                    first_error = None
+                    for task in done:
+                        try:
+                            task.result()
+                        except Exception as ex:
+                            if first_error is None:
+                                first_error = ex
+                        ac.on_success()
+                    if first_error is not None:
+                        raise first_error
+
+                fetch_data_tasks.add(
+                    asyncio.create_task(
+                        _fetch_data_if_not_cached(session, endpoint, cache_dir, exchange, from_date, offset, filters, http_proxy, ac)
+                    )
                 )
-            )
-            offset += 1
-
-        # finally wait for the remaining fetch data download tasks
-        await asyncio.gather(*fetch_data_tasks)
+                offset += 1
+
+            # finally wait for the remaining fetch data download tasks
+            await asyncio.gather(*fetch_data_tasks)
+        except BaseException:
+            # cancel all pending tasks so they don't keep making requests
+            for task in fetch_data_tasks:
+                task.cancel()
+            # await them to suppress "Task was destroyed but it is pending" warnings
+            await asyncio.gather(*fetch_data_tasks, return_exceptions=True)
+            raise
 
     end_time = time()
 
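Two details in this hunk are easy to miss. `asyncio.wait` can return several completed tasks at once, and each one's result must be retrieved, or asyncio logs a "Task exception was never retrieved" warning when the task is garbage collected; the `first_error` bookkeeping drains them all before re-raising. And the cleanup path catches `BaseException` rather than `Exception` because `asyncio.CancelledError` derives from `BaseException` on Python 3.8+, so outer cancellation also reaches the in-flight downloads. A minimal standalone script (not from the commit) showing the warning the retrieval loop avoids:

    import asyncio

    async def boom():
        raise RuntimeError("slice download failed")

    async def main():
        task = asyncio.create_task(boom())
        done, _ = await asyncio.wait({task})
        # comment out the loop below and asyncio logs
        # "Task exception was never retrieved" at shutdown
        for finished in done:
            try:
                finished.result()
            except RuntimeError:
                pass

    asyncio.run(main())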
@@ -73,19 +111,19 @@ async def fetch_data_to_replay(exchange, from_date, to_date, filters, endpoint,
     )
 
 
-async def _fetch_data_if_not_cached(session, endpoint, cache_dir, exchange, from_date, offset, filters, http_proxy):
+async def _fetch_data_if_not_cached(session, endpoint, cache_dir, exchange, from_date, offset, filters, http_proxy, ac):
     slice_date = from_date + timedelta(seconds=offset * 60)
     cache_path = get_slice_cache_path(cache_dir, exchange, slice_date, filters)
 
     # fetch and cache slice only if it's not cached already
     if os.path.isfile(cache_path) == False:
-        await _reliably_fetch_and_cache_slice(session, endpoint, exchange, from_date, offset, filters, cache_path, http_proxy)
+        await _reliably_fetch_and_cache_slice(session, endpoint, exchange, from_date, offset, filters, cache_path, http_proxy, ac)
         logger.debug("fetched data slice for date %s from the API and cached - %s", slice_date, cache_path)
     else:
         logger.debug("data slice for date %s already in local cache - %s", slice_date, cache_path)
 
 
-async def _reliably_fetch_and_cache_slice(session, endpoint, exchange, from_date, offset, filters, cache_path, http_proxy):
+async def _reliably_fetch_and_cache_slice(session, endpoint, exchange, from_date, offset, filters, cache_path, http_proxy, ac):
     fetch_url = f"{endpoint}/v1/data-feeds/{exchange}?from={from_date.isoformat()}&offset={offset}"
 
     if filters is not None and len(filters) > 0:
@@ -118,6 +156,7 @@ async def _reliably_fetch_and_cache_slice(session, endpoint, exchange, from_date
                     raise ex
                 if ex.code == 429:
                     too_many_requests = True
+                    ac.on_throttle()
 
             random_ingridient = random.random()
             attempts_delay = 2 ** attempts
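
For reference, the surrounding retry loop (unchanged by this commit) computes an exponential backoff of `2 ** attempts` seconds plus a sub-second random jitter, so `ac.on_throttle()` shrinks the concurrency window while the backoff spaces out the retries of the throttled slice. Assuming `attempts` counts up from 1 and the delay and jitter are summed before sleeping, as the variable names suggest, the schedule looks like:

    # sketch of the implied backoff schedule; jitter is random.random()
    import random

    for attempts in range(1, 5):
        delay = 2 ** attempts + random.random()
        print(f"attempt {attempts}: sleep ~{delay:.2f}s")   # ~2s, ~4s, ~8s, ~16s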
@@ -142,15 +181,22 @@ async def _fetch_and_cache_slice(session, url, cache_path, http_proxy):
         # ensure that directory where we want to cache data slice exists
         pathlib.Path(cache_path).parent.mkdir(parents=True, exist_ok=True)
         temp_cache_path = f"{cache_path}{secrets.token_hex(8)}.unconfirmed"
-        # write response stream to unconfirmed temp file
-        async with aiofiles.open(temp_cache_path, "wb") as temp_file:
-            async for data in response.content.iter_any():
-                await temp_file.write(data)
-
-        # rename temp file to desired name only if file has been fully and successfully saved
-        # it there is an error during renaming file it means that target file aready exists
-        # and we're fine as only successfully save files exist
         try:
-            os.rename(temp_cache_path, cache_path)
-        except Exception as ex:
-            logger.debug("_fetch_and_cache_slice rename error: %s", ex)
+            # write response stream to unconfirmed temp file
+            async with aiofiles.open(temp_cache_path, "wb") as temp_file:
+                async for data in response.content.iter_any():
+                    await temp_file.write(data)
+
+            # atomically replace temp file with the final cache path
+            try:
+                os.replace(temp_cache_path, cache_path)
+            except OSError as ex:
+                # if another task already wrote this file, that's fine
+                if os.path.isfile(cache_path):
+                    logger.debug("_fetch_and_cache_slice rename skipped, file already exists: %s", ex)
+                else:
+                    raise
+        finally:
+            # cleanup partial temp file on cancellation or error
+            if os.path.exists(temp_cache_path):
+                os.remove(temp_cache_path)
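
The switch from `os.rename` to `os.replace` is the substantive fix here: `os.replace` overwrites the destination atomically on POSIX and also overwrites on Windows, whereas `os.rename` raises `FileExistsError` on Windows when the target exists, and the old code swallowed every rename error with a debug log. The same write-to-temp-then-replace pattern in a generic, synchronous form (a hypothetical helper sketched for illustration, not from the commit):

    import os
    import pathlib
    import secrets

    def atomic_write(path, data):
        pathlib.Path(path).parent.mkdir(parents=True, exist_ok=True)
        temp_path = f"{path}{secrets.token_hex(8)}.unconfirmed"
        try:
            with open(temp_path, "wb") as temp_file:
                temp_file.write(data)
            # atomic on POSIX; also overwrites an existing target on Windows
            os.replace(temp_path, path)
        finally:
            # remove the partial file if writing or replacing failed
            if os.path.exists(temp_path):
                os.remove(temp_path)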

tardis_client/tardis_client.py · 1 addition & 1 deletion

@@ -159,7 +159,7 @@ async def reconstruct_market(
             yield market_response
 
     def clear_cache(self):
-        shutil.rmtree(self.cache_dir)
+        shutil.rmtree(self.cache_dir, ignore_errors=True)
 
     def _validate_payload(self, exchange, from_date, to_date, filters):
         if from_date is None or self._try_parse_as_iso_date(from_date) is False:
