@@ -208,6 +208,28 @@ class ExecutionTimeLimitExceeded(Exception):
208208 pass
209209
210210
211+ # -----------------------------------------------
212+ # Local setups where public S3 URLs are not directly reachable from workers
213+ # -----------------------------------------------
214+ def rewrite_bundle_url_if_needed (url ):
215+ """
216+ Optionally rewrite presigned bundle URLs for worker networking.
217+
218+ Controlled by env: WORKER_BUNDLE_URL_REWRITE=FROM|TO
219+
220+ Example: http://localhost:9000|http://minio:9000
221+ """
222+ rule = os .getenv ("WORKER_BUNDLE_URL_REWRITE" , "" ).strip ()
223+ if not rule or "|" not in rule :
224+ return url
225+ src , dst = rule .split ("|" , 1 )
226+ if url .startswith (src ):
227+ new_url = dst + url [len (src ):]
228+ logger .info (f"Rewriting bundle URL for worker: { url } -> { new_url } " )
229+ return new_url
230+ return url
231+
232+
211233# -----------------------------------------------------------------------------
212234# The main compute worker entrypoint, this is how a job is ran at the highest
213235# level.
@@ -616,6 +638,7 @@ def _get_bundle(self, url, destination, cache=True):
616638 if download_needed :
617639 try :
618640 # Download the bundle
641+ url = rewrite_bundle_url_if_needed (url )
619642 urlretrieve (url , bundle_file )
620643 except HTTPError :
621644 raise SubmissionException (
@@ -1023,6 +1046,7 @@ def _put_dir(self, url, directory):
10231046
10241047 def _put_file (self , url , file = None , raw_data = None , content_type = "application/zip" ):
10251048 """Send the file in the storage."""
1049+ url = rewrite_bundle_url_if_needed (url )
10261050 if file and raw_data :
10271051 raise Exception ("Cannot put both a file and raw_data" )
10281052
0 commit comments