77 contextmanager ,
88)
99from dataclasses import dataclass , field
10- from datetime import timedelta
10+ from datetime import datetime , timedelta
1111
12- from anyio import fail_after , sleep
12+ from anyio import create_task_group , fail_after , sleep
1313from anyio .from_thread import BlockingPortal
1414from grpc .aio import Channel
1515from jumpstarter_protocol import jumpstarter_pb2 , jumpstarter_pb2_grpc
@@ -62,6 +62,11 @@ async def _create(self):
6262 ).name
6363 logger .info ("Created lease request for selector %s for duration %s" , selector , duration )
6464
65+ async def get (self ):
66+ with translate_grpc_exceptions ():
67+ svc = ClientService (channel = self .channel , namespace = self .namespace )
68+ return await svc .GetLease (name = self .name )
69+
6570 def request (self ):
6671 """Request a lease, or verifies a lease which was already created.
6772
@@ -96,11 +101,7 @@ async def _acquire(self):
96101 with fail_after (300 ): # TODO: configurable timeout
97102 while True :
98103 logger .debug ("Polling Lease %s" , self .name )
99- with translate_grpc_exceptions ():
100- result = await self .svc .GetLease (
101- name = self .name ,
102- )
103-
104+ result = await self .get ()
104105 # lease ready
105106 if condition_true (result .conditions , "Ready" ):
106107 logger .debug ("Lease %s acquired" , self .name )
@@ -156,6 +157,29 @@ async def serve_unix_async(self):
156157 async with TemporaryUnixListener (self .handle_async ) as path :
157158 yield path
158159
160+ @asynccontextmanager
161+ async def monitor_async (self , threshold : timedelta = timedelta (minutes = 5 )):
162+ async def _monitor ():
163+ while True :
164+ lease = await self .get ()
165+ if lease .effective_begin_time :
166+ end_time = lease .effective_begin_time + lease .duration
167+ remain = end_time - datetime .now (tz = datetime .now ().astimezone ().tzinfo )
168+ if remain < threshold :
169+ logger .info ("Lease {} ending soon in {} at {}" .format (self .name , remain , end_time ))
170+ await sleep (threshold .total_seconds ())
171+ else :
172+ await sleep (5 )
173+ else :
174+ await sleep (1 )
175+
176+ async with create_task_group () as tg :
177+ tg .start_soon (_monitor )
178+ try :
179+ yield
180+ finally :
181+ tg .cancel_scope .cancel ()
182+
159183 @asynccontextmanager
160184 async def connect_async (self , stack ):
161185 async with self .serve_unix_async () as path :
@@ -172,3 +196,8 @@ def connect(self):
172196 def serve_unix (self ):
173197 with self .portal .wrap_async_context_manager (self .serve_unix_async ()) as path :
174198 yield path
199+
200+ @contextmanager
201+ def monitor (self , threshold : timedelta = timedelta (minutes = 5 )):
202+ with self .portal .wrap_async_context_manager (self .monitor_async (threshold )):
203+ yield
0 commit comments