Skip to content
This repository was archived by the owner on Jan 23, 2026. It is now read-only.

Commit f9dbbad

Browse files
mangelajoNickCao
authored and committed
Allow grpc option tweaking, i.e. keepalive settings
1 parent ab363e6 commit f9dbbad

9 files changed

Lines changed: 46 additions & 16 deletions

File tree

docs/source/cli/clients.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,9 @@ metadata:
3232
name: john
3333
endpoint: grpc.jumpstarter.192.168.1.10.nip.io:8082
3434
token: <<token>>
35+
grpcOptions:
36+
# Any gRPC channel argument can be set here; see https://grpc.github.io/grpc/core/group__grpc__arg__keys.html for the available keys
37+
grpc.keepalive_time_ms: 20000
3538
tls:
3639
ca: ''
3740
insecure: False

docs/source/introduction/exporters.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,9 @@ metadata:
3030
name: demo
3131
endpoint: grpc.jumpstarter.example.com:443
3232
token: xxxxx
33+
grpcOptions:
34+
# Any gRPC channel argument can be set here; see https://grpc.github.io/grpc/core/group__grpc__arg__keys.html for the available keys
35+
grpc.keepalive_time_ms: 20000
3336
export:
3437
power:
3538
type: jumpstarter_driver_yepkit.driver.Ykush

packages/jumpstarter/jumpstarter/client/lease.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
)
99
from dataclasses import dataclass, field
1010
from datetime import datetime, timedelta
11+
from typing import Any
1112

1213
from anyio import create_task_group, fail_after, sleep
1314
from anyio.from_thread import BlockingPortal
@@ -39,6 +40,7 @@ class Lease(AbstractContextManager, AbstractAsyncContextManager):
3940
release: bool = True # release on contexts exit
4041
controller: jumpstarter_pb2_grpc.ControllerServiceStub = field(init=False)
4142
tls_config: TLSConfigV1Alpha1 = field(default_factory=TLSConfigV1Alpha1)
43+
grpc_options: dict[str, Any] = field(default_factory=dict)
4244

4345
def __post_init__(self):
4446
if hasattr(super(), "__post_init__"):
@@ -149,7 +151,9 @@ def __exit__(self, exc_type, exc_value, traceback):
149151
async def handle_async(self, stream):
150152
logger.debug("Connecting to Lease with name %s", self.name)
151153
response = await self.controller.Dial(jumpstarter_pb2.DialRequest(lease_name=self.name))
152-
async with connect_router_stream(response.router_endpoint, response.router_token, stream, self.tls_config):
154+
async with connect_router_stream(
155+
response.router_endpoint, response.router_token, stream, self.tls_config, self.grpc_options
156+
):
153157
pass
154158

155159
@asynccontextmanager

packages/jumpstarter/jumpstarter/common/grpc.py

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import socket
44
import ssl
55
from contextlib import contextmanager
6+
from typing import Any, Sequence, Tuple
67
from urllib.parse import urlparse
78

89
import grpc
@@ -34,20 +35,28 @@ def ssl_channel_credentials(target: str, tls_config):
3435
return grpc.ssl_channel_credentials()
3536

3637

37-
def aio_secure_channel(target: str, credentials: grpc.ChannelCredentials):
38+
def aio_secure_channel(target: str, credentials: grpc.ChannelCredentials, grpc_options: dict[str, Any] | None):
3839
return grpc.aio.secure_channel(
3940
target,
4041
credentials,
41-
options=(
42-
("grpc.lb_policy_name", "round_robin"),
43-
("grpc.keepalive_time_ms", 350000),
44-
("grpc.keepalive_timeout_ms", 5000),
45-
("grpc.http2.max_pings_without_data", 5),
46-
("grpc.keepalive_permit_without_calls", 1),
47-
),
42+
options=_override_default_grpc_options(grpc_options),
4843
)
4944

5045

46+
def _override_default_grpc_options(grpc_options: dict[str, str | int] | None) -> Sequence[Tuple[str, Any]]:
47+
defaults = (
48+
("grpc.lb_policy_name", "round_robin"),
49+
# we keep a low keepalive time to avoid idle timeouts on cloud load balancers
50+
("grpc.keepalive_time_ms", 20000),
51+
("grpc.keepalive_timeout_ms", 5000),
52+
("grpc.http2.max_pings_without_data", 5),
53+
("grpc.keepalive_permit_without_calls", 1),
54+
)
55+
options = dict(defaults)
56+
options.update(grpc_options or {})
57+
return tuple(options.items())
58+
59+
5160
def configure_grpc_env():
5261
# disable informative logs by default, i.e.:
5362
# WARNING: All log messages before absl::InitializeLog() is called are written to STDERR

packages/jumpstarter/jumpstarter/common/streams.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,13 +32,13 @@ class StreamRequestMetadata(BaseModel):
3232

3333

3434
@asynccontextmanager
35-
async def connect_router_stream(endpoint, token, stream, tls_config):
35+
async def connect_router_stream(endpoint, token, stream, tls_config, grpc_options):
3636
credentials = grpc.composite_channel_credentials(
3737
ssl_channel_credentials(endpoint, tls_config),
3838
grpc.access_token_call_credentials(token),
3939
)
4040

41-
async with aio_secure_channel(endpoint, credentials) as channel:
41+
async with aio_secure_channel(endpoint, credentials, grpc_options) as channel:
4242
router = router_pb2_grpc.RouterServiceStub(channel)
4343
context = router.Stream(metadata=())
4444
async with RouterStream(context=context) as s:

packages/jumpstarter/jumpstarter/config/client.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ class ClientConfigV1Alpha1(BaseModel):
4848
endpoint: str
4949
tls: TLSConfigV1Alpha1 = Field(default_factory=TLSConfigV1Alpha1)
5050
token: str
51+
grpcOptions: dict[str, str | int] | None = Field(default_factory=dict)
5152

5253
drivers: ClientConfigV1Alpha1Drivers
5354

@@ -57,7 +58,7 @@ async def channel(self):
5758
call_credentials("Client", self.metadata, self.token),
5859
)
5960

60-
return aio_secure_channel(self.endpoint, credentials)
61+
return aio_secure_channel(self.endpoint, credentials, self.grpcOptions)
6162

6263
@contextmanager
6364
def lease(self, metadata_filter: MetadataFilter, lease_name: str | None = None):
@@ -122,6 +123,7 @@ async def request_lease_async(
122123
allow=self.drivers.allow,
123124
unsafe=self.drivers.unsafe,
124125
tls_config=self.tls,
126+
grpc_options=self.grpcOptions,
125127
)
126128
with translate_grpc_exceptions():
127129
return await lease.request_async()
@@ -161,6 +163,7 @@ async def lease_async(
161163
unsafe=self.drivers.unsafe,
162164
release=release_lease,
163165
tls_config=self.tls,
166+
grpc_options=self.grpcOptions,
164167
) as lease:
165168
yield lease
166169

packages/jumpstarter/jumpstarter/config/client_config_test.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -206,6 +206,7 @@ def test_client_config_save(monkeypatch: pytest.MonkeyPatch):
206206
ca: ''
207207
insecure: false
208208
token: dGhpc2lzYXRva2VuLTEyMzQxMjM0MTIzNEyMzQtc2Rxd3Jxd2VycXdlcnF3ZXJxd2VyLTEyMzQxMjM0MTIz
209+
grpcOptions: {}
209210
drivers:
210211
allow:
211212
- jumpstarter.drivers.*
@@ -241,6 +242,7 @@ def test_client_config_save_explicit_path():
241242
ca: ''
242243
insecure: false
243244
token: dGhpc2lzYXRva2VuLTEyMzQxMjM0MTIzNEyMzQtc2Rxd3Jxd2VycXdlcnF3ZXJxd2VyLTEyMzQxMjM0MTIz
245+
grpcOptions: {}
244246
drivers:
245247
allow:
246248
- jumpstarter.drivers.*
@@ -274,6 +276,7 @@ def test_client_config_save_unsafe_drivers():
274276
ca: ''
275277
insecure: false
276278
token: dGhpc2lzYXRva2VuLTEyMzQxMjM0MTIzNEyMzQtc2Rxd3Jxd2VycXdlcnF3ZXJxd2VyLTEyMzQxMjM0MTIz
279+
grpcOptions: {}
277280
drivers:
278281
allow: []
279282
unsafe: true

packages/jumpstarter/jumpstarter/config/exporter.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ class ExporterConfigV1Alpha1(BaseModel):
8181
endpoint: str
8282
tls: TLSConfigV1Alpha1 = Field(default_factory=TLSConfigV1Alpha1)
8383
token: str
84+
grpcOptions: dict[str, str | int] | None = Field(default_factory=dict)
8485

8586
export: dict[str, ExporterConfigV1Alpha1DriverInstance] = Field(default_factory=dict)
8687

@@ -163,12 +164,13 @@ def channel_factory():
163164
ssl_channel_credentials(self.endpoint, self.tls),
164165
call_credentials("Exporter", self.metadata, self.token),
165166
)
166-
return aio_secure_channel(self.endpoint, credentials)
167+
return aio_secure_channel(self.endpoint, credentials, self.grpcOptions)
167168

168169
async with Exporter(
169170
channel_factory=channel_factory,
170171
device_factory=ExporterConfigV1Alpha1DriverInstance(children=self.export).instantiate,
171172
tls=self.tls,
173+
grpc_options=self.grpcOptions,
172174
) as exporter:
173175
await exporter.serve()
174176

packages/jumpstarter/jumpstarter/exporter/exporter.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ class Exporter(AbstractAsyncContextManager, Metadata):
2626
device_factory: Callable[[], Driver]
2727
lease_name: str = field(init=False, default="")
2828
tls: TLSConfigV1Alpha1 = field(default_factory=TLSConfigV1Alpha1)
29+
grpc_options: dict[str, str] = field(default_factory=dict)
2930

3031
async def __aexit__(self, exc_type, exc_value, traceback):
3132
controller = jumpstarter_pb2_grpc.ControllerServiceStub(self.channel_factory())
@@ -36,9 +37,9 @@ async def __aexit__(self, exc_type, exc_value, traceback):
3637
)
3738
)
3839

39-
async def __handle(self, path, endpoint, token, tls_config):
40+
async def __handle(self, path, endpoint, token, tls_config, grpc_options):
4041
async with await connect_unix(path) as stream:
41-
async with connect_router_stream(endpoint, token, stream, tls_config):
42+
async with connect_router_stream(endpoint, token, stream, tls_config, grpc_options):
4243
pass
4344

4445
@asynccontextmanager
@@ -69,7 +70,9 @@ async def handle(self, lease_name, tg):
6970
async with self.session() as path:
7071
async for request in controller.Listen(jumpstarter_pb2.ListenRequest(lease_name=lease_name)):
7172
logger.info("Handling new connection request on lease %s", lease_name)
72-
tg.start_soon(self.__handle, path, request.router_endpoint, request.router_token, self.tls)
73+
tg.start_soon(
74+
self.__handle, path, request.router_endpoint, request.router_token, self.tls, self.grpc_options
75+
)
7376

7477
async def serve(self):
7578
controller = jumpstarter_pb2_grpc.ControllerServiceStub(self.channel_factory())

0 commit comments

Comments
 (0)