44from dataclasses import dataclass , field
55
66import grpc
7- from anyio import connect_unix , create_task_group
7+ from anyio import connect_unix , create_memory_object_stream , create_task_group , sleep
88from google .protobuf import empty_pb2
99from jumpstarter_protocol import (
1010 jumpstarter_pb2 ,
@@ -38,9 +38,12 @@ async def __aexit__(self, exc_type, exc_value, traceback):
3838 )
3939
async def __handle(self, path, endpoint, token, tls_config, grpc_options):
    """Serve one connection request, logging any failure instead of raising.

    Opens a Unix socket connection to ``path`` and hands the resulting
    stream to ``connect_router_stream`` together with ``endpoint``,
    ``token``, ``tls_config`` and ``grpc_options``. Both contexts are
    held open until the router stream context exits.

    Any ``Exception`` raised while connecting or while the contexts are
    active is logged at info level and suppressed, so a single failed
    connection does not propagate to the caller.
    """
    try:
        # Entered left to right and exited in reverse, exactly like two
        # nested ``async with`` statements.
        async with (
            await connect_unix(path) as stream,
            connect_router_stream(endpoint, token, stream, tls_config, grpc_options),
        ):
            pass
    except Exception as exc:
        logger.info("failed to handle connection: {}".format(exc))
4447
4548 @asynccontextmanager
4649 async def session (self ):
@@ -65,23 +68,71 @@ async def session(self):
6568 yield path
6669
async def handle(self, lease_name, tg):
    """Accept connection requests for a lease and start a handler for each.

    A background task started on ``tg`` keeps a ``Listen`` call open
    against the controller and forwards every request it yields into an
    in-memory stream. This coroutine opens ``self.session()`` once and,
    for every forwarded request, starts ``self.__handle`` on ``tg`` with
    the request's ``router_endpoint`` and ``router_token``.

    Args:
        lease_name: Lease name placed in the ``ListenRequest``.
        tg: Object exposing ``start_soon``; presumably an anyio task
            group owned by the caller -- TODO confirm.

    Note:
        When the listener runs out of retries it re-raises the last
        exception inside ``tg``, not from this coroutine directly.
    """
    logger.info("Listening for incoming connection requests on lease %s", lease_name)

    listen_tx, listen_rx = create_memory_object_stream()

    async def listen(retries=5, backoff=3):
        """Forward ``Listen`` requests into ``listen_tx``, reconnecting on errors.

        Up to ``retries`` consecutive failures are tolerated, with a
        pause of ``backoff`` seconds after each; the next failure is
        re-raised.
        """
        retries_left = retries
        while True:
            try:
                # Build a fresh stub (and channel) for every attempt.
                controller = jumpstarter_pb2_grpc.ControllerServiceStub(self.channel_factory())
                async for request in controller.Listen(jumpstarter_pb2.ListenRequest(lease_name=lease_name)):
                    # Receiving a request proves the stream is healthy, so
                    # restore the full budget here. Otherwise unrelated
                    # transient failures spread over a long-lived session
                    # would add up and eventually stop the listener.
                    retries_left = retries
                    await listen_tx.send(request)
            except Exception as e:
                if retries_left > 0:
                    retries_left -= 1
                    logger.info(
                        "Listen stream interrupted, restarting in {}s, {} retries left: {}".format(
                            backoff, retries_left, e
                        )
                    )
                    await sleep(backoff)
                else:
                    raise
            else:
                # Stream ended without an error: reconnect with a full budget.
                retries_left = retries

    tg.start_soon(listen)

    async with self.session() as path:
        async for request in listen_rx:
            logger.info("Handling new connection request on lease %s", lease_name)
            tg.start_soon(
                self.__handle, path, request.router_endpoint, request.router_token, self.tls, self.grpc_options
            )
76104
77- async def serve (self ):
78- controller = jumpstarter_pb2_grpc .ControllerServiceStub (self .channel_factory ())
105+ async def serve (self ): # noqa: C901
79106 # initial registration
80107 async with self .session ():
81108 pass
82109 started = False
110+ status_tx , status_rx = create_memory_object_stream ()
111+
112+ async def status (retries = 5 , backoff = 3 ):
113+ retries_left = retries
114+ while True :
115+ try :
116+ controller = jumpstarter_pb2_grpc .ControllerServiceStub (self .channel_factory ())
117+ async for status in controller .Status (jumpstarter_pb2 .StatusRequest ()):
118+ await status_tx .send (status )
119+ except Exception as e :
120+ if retries_left > 0 :
121+ retries_left -= 1
122+ logger .info (
123+ "Status stream interrupted, restarting in {}s, {} retries left: {}" .format (
124+ backoff , retries_left , e
125+ )
126+ )
127+ await sleep (backoff )
128+ else :
129+ raise
130+ else :
131+ retries_left = retries
132+
83133 async with create_task_group () as tg :
84- async for status in controller .Status (jumpstarter_pb2 .StatusRequest ()):
134+ tg .start_soon (status )
135+ async for status in status_rx :
85136 if self .lease_name != "" and self .lease_name != status .lease_name :
86137 self .lease_name = status .lease_name
87138 logger .info ("Lease status changed, killing existing connections" )
0 commit comments