Skip to content
This repository was archived by the owner on Jan 23, 2026. It is now read-only.

Commit 856239f

Browse files
authored
Merge pull request #378 from jumpstarter-dev/keepalive
Make ssl_channel_credentials async
2 parents dbc50d2 + eb290db commit 856239f

5 files changed

Lines changed: 15 additions & 10 deletions

File tree

packages/jumpstarter/jumpstarter/common/grpc.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,13 @@
77
from urllib.parse import urlparse
88

99
import grpc
10+
from anyio import fail_after
11+
from anyio.to_thread import run_sync
1012

1113
from jumpstarter.common.exceptions import ConfigurationError, ConnectionError
1214

1315

14-
def ssl_channel_credentials(target: str, tls_config):
16+
async def ssl_channel_credentials(target: str, tls_config, timeout=5):
1517
configure_grpc_env()
1618
if tls_config.insecure or os.getenv("JUMPSTARTER_GRPC_INSECURE") == "1":
1719
try:
@@ -21,12 +23,15 @@ def ssl_channel_credentials(target: str, tls_config):
2123
raise ConfigurationError(f"Failed parsing {target}") from e
2224

2325
try:
24-
root_certificates = ssl.get_server_certificate((parsed.hostname, port))
26+
with fail_after(timeout):
27+
root_certificates = await run_sync(ssl.get_server_certificate, (parsed.hostname, port))
2528
return grpc.ssl_channel_credentials(root_certificates=root_certificates.encode())
2629
except socket.gaierror as e:
2730
raise ConnectionError(f"Failed resolving {parsed.hostname}") from e
2831
except ConnectionRefusedError as e:
2932
raise ConnectionError(f"Failed connecting to {parsed.hostname}:{port}") from e
33+
except TimeoutError as e:
34+
raise ConnectionError(f"Timeout connecting to {parsed.hostname}:{port}") from e
3035

3136
elif tls_config.ca != "":
3237
ca_certificate = base64.b64decode(tls_config.ca)

packages/jumpstarter/jumpstarter/common/streams.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ class StreamRequestMetadata(BaseModel):
3434
@asynccontextmanager
3535
async def connect_router_stream(endpoint, token, stream, tls_config, grpc_options):
3636
credentials = grpc.composite_channel_credentials(
37-
ssl_channel_credentials(endpoint, tls_config),
37+
await ssl_channel_credentials(endpoint, tls_config),
3838
grpc.access_token_call_credentials(token),
3939
)
4040

packages/jumpstarter/jumpstarter/config/client.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ class ClientConfigV1Alpha1(BaseModel):
5454

5555
async def channel(self):
5656
credentials = grpc.composite_channel_credentials(
57-
ssl_channel_credentials(self.endpoint, self.tls),
57+
await ssl_channel_credentials(self.endpoint, self.tls),
5858
call_credentials("Client", self.metadata, self.token),
5959
)
6060

packages/jumpstarter/jumpstarter/config/exporter.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -159,9 +159,9 @@ async def serve(self):
159159
# dynamic import to avoid circular imports
160160
from jumpstarter.exporter import Exporter
161161

162-
def channel_factory():
162+
async def channel_factory():
163163
credentials = grpc.composite_channel_credentials(
164-
ssl_channel_credentials(self.endpoint, self.tls),
164+
await ssl_channel_credentials(self.endpoint, self.tls),
165165
call_credentials("Exporter", self.metadata, self.token),
166166
)
167167
return aio_secure_channel(self.endpoint, credentials, self.grpcOptions)

packages/jumpstarter/jumpstarter/exporter/exporter.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ class Exporter(AbstractAsyncContextManager, Metadata):
2929
grpc_options: dict[str, str] = field(default_factory=dict)
3030

3131
async def __aexit__(self, exc_type, exc_value, traceback):
32-
controller = jumpstarter_pb2_grpc.ControllerServiceStub(self.channel_factory())
32+
controller = jumpstarter_pb2_grpc.ControllerServiceStub(await self.channel_factory())
3333
logger.info("Unregistering exporter with controller")
3434
await controller.Unregister(
3535
jumpstarter_pb2.UnregisterRequest(
@@ -47,7 +47,7 @@ async def __handle(self, path, endpoint, token, tls_config, grpc_options):
4747

4848
@asynccontextmanager
4949
async def session(self):
50-
controller = jumpstarter_pb2_grpc.ControllerServiceStub(self.channel_factory())
50+
controller = jumpstarter_pb2_grpc.ControllerServiceStub(await self.channel_factory())
5151
with Session(
5252
uuid=self.uuid,
5353
labels=self.labels,
@@ -76,7 +76,7 @@ async def listen(retries=5, backoff=3):
7676
retries_left = retries
7777
while True:
7878
try:
79-
controller = jumpstarter_pb2_grpc.ControllerServiceStub(self.channel_factory())
79+
controller = jumpstarter_pb2_grpc.ControllerServiceStub(await self.channel_factory())
8080
async for request in controller.Listen(jumpstarter_pb2.ListenRequest(lease_name=lease_name)):
8181
await listen_tx.send(request)
8282
except Exception as e:
@@ -113,7 +113,7 @@ async def status(retries=5, backoff=3):
113113
retries_left = retries
114114
while True:
115115
try:
116-
controller = jumpstarter_pb2_grpc.ControllerServiceStub(self.channel_factory())
116+
controller = jumpstarter_pb2_grpc.ControllerServiceStub(await self.channel_factory())
117117
async for status in controller.Status(jumpstarter_pb2.StatusRequest()):
118118
await status_tx.send(status)
119119
except Exception as e:

0 commit comments

Comments
 (0)