Skip to content

Commit 87eb82f

Browse files
feat: Add support for Measurement Remote Execution by populating deployment_target (#1194)
* feat: implement client API for EnumerateComputeNodes * feat: populate deployment_target with remote node Url * tests: add automation tests for EnumerateComputeNodes API * fix: automated tests * fix: handle unimplemented exception for EnumerateComputeNodes * doc: rephrase the info comment --------- Co-authored-by: Avinash2 Suresh <avinash2.suresh@emerson.com>
1 parent 3d23777 commit 87eb82f

8 files changed

Lines changed: 142 additions & 4 deletions

File tree

packages/service/ni_measurement_plugin_sdk_service/_drivers/_grpcdevice.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,14 @@ def get_grpc_device_server_location(
3636
)
3737
return service_location
3838

39+
compute_nodes = discovery_client.enumerate_compute_nodes()
40+
remote_compute_nodes = [node for node in compute_nodes if not node.is_local]
41+
# Use remote node URL as deployment target if only one remote node is found.
42+
# If more than one remote node exists, use empty string for deployment target.
43+
first_remote_node_url = remote_compute_nodes[0].url if len(remote_compute_nodes) == 1 else ""
3944
service_location = discovery_client.resolve_service(
4045
provided_interface=provided_interface,
46+
deployment_target=first_remote_node_url,
4147
service_class=SERVICE_CLASS,
4248
)
4349
_logger.debug(
Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
"""Public API for accessing the NI Discovery Service."""
22

33
from ni_measurement_plugin_sdk_service.discovery._client import DiscoveryClient
4-
from ni_measurement_plugin_sdk_service.discovery._types import ServiceLocation
4+
from ni_measurement_plugin_sdk_service.discovery._types import (
5+
ComputeNodeDescriptor,
6+
ServiceLocation,
7+
)
58

6-
__all__ = ["DiscoveryClient", "ServiceLocation"]
9+
__all__ = ["DiscoveryClient", "ServiceLocation", "ComputeNodeDescriptor"]

packages/service/ni_measurement_plugin_sdk_service/discovery/_client.py

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,10 @@
1919
from ni_measurement_plugin_sdk_service.discovery._support import (
2020
_get_discovery_service_address,
2121
)
22-
from ni_measurement_plugin_sdk_service.discovery._types import ServiceLocation
22+
from ni_measurement_plugin_sdk_service.discovery._types import (
23+
ComputeNodeDescriptor,
24+
ServiceLocation,
25+
)
2326
from ni_measurement_plugin_sdk_service.grpc.channelpool import GrpcChannelPool
2427
from ni_measurement_plugin_sdk_service.measurement.info import (
2528
MeasurementInfo,
@@ -304,3 +307,20 @@ def enumerate_services(self, provided_interface: str) -> Sequence[ServiceInfo]:
304307
response = self._get_stub().EnumerateServices(request)
305308

306309
return [ServiceInfo._from_grpc(service) for service in response.available_services]
310+
311+
def enumerate_compute_nodes(self) -> Sequence[ComputeNodeDescriptor]:
312+
"""Enumerates all the compute nodes registered with the discovery service.
313+
314+
Returns:
315+
The list of information describing the compute nodes.
316+
"""
317+
request = discovery_service_pb2.EnumerateComputeNodesRequest()
318+
319+
try:
320+
response = self._get_stub().EnumerateComputeNodes(request)
321+
except grpc.RpcError as e:
322+
if e.code() == grpc.StatusCode.UNIMPLEMENTED:
323+
return []
324+
raise
325+
326+
return [ComputeNodeDescriptor._from_grpc(node) for node in response.compute_nodes]

packages/service/ni_measurement_plugin_sdk_service/discovery/_types.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,3 +33,22 @@ def _from_grpc(cls, other: discovery_service_pb2.ServiceLocation) -> ServiceLoca
3333
insecure_port=other.insecure_port,
3434
ssl_authenticated_port=other.ssl_authenticated_port,
3535
)
36+
37+
38+
class ComputeNodeDescriptor(typing.NamedTuple):
39+
"""Represents a compute node."""
40+
41+
url: str
42+
"""The resolvable name (URL) of the compute node."""
43+
44+
is_local: bool
45+
"""Indicates whether the compute node is local node."""
46+
47+
@classmethod
48+
def _from_grpc(
49+
cls, other: discovery_service_pb2.ComputeNodeDescriptor
50+
) -> ComputeNodeDescriptor:
51+
return ComputeNodeDescriptor(
52+
url=other.url,
53+
is_local=other.is_local,
54+
)

packages/service/ni_measurement_plugin_sdk_service/pin_map/_client.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,8 +60,16 @@ def _get_stub(self) -> pin_map_service_pb2_grpc.PinMapServiceStub:
6060
grpc_channel_pool=self._grpc_channel_pool
6161
)
6262
if self._stub is None:
63+
compute_nodes = self._discovery_client.enumerate_compute_nodes()
64+
remote_compute_nodes = [node for node in compute_nodes if not node.is_local]
65+
# Use remote node URL as deployment target if only one remote node is found.
66+
# If more than one remote node exists, use empty string for deployment target.
67+
first_remote_node_url = (
68+
remote_compute_nodes[0].url if len(remote_compute_nodes) == 1 else ""
69+
)
6370
service_location = self._discovery_client.resolve_service(
6471
provided_interface=GRPC_SERVICE_INTERFACE_NAME,
72+
deployment_target=first_remote_node_url,
6573
service_class=GRPC_SERVICE_CLASS,
6674
)
6775
channel = self._grpc_channel_pool.get_channel(service_location.insecure_address)

packages/service/ni_measurement_plugin_sdk_service/session_management/_client.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,8 +75,16 @@ def _get_stub(self) -> session_management_service_pb2_grpc.SessionManagementServ
7575
grpc_channel_pool=self._grpc_channel_pool
7676
)
7777
if self._stub is None:
78+
compute_nodes = self._discovery_client.enumerate_compute_nodes()
79+
remote_compute_nodes = [node for node in compute_nodes if not node.is_local]
80+
# Use remote node URL as deployment target if only one remote node is found.
81+
# If more than one remote node exists, use empty string for deployment target.
82+
first_remote_node_url = (
83+
remote_compute_nodes[0].url if len(remote_compute_nodes) == 1 else ""
84+
)
7885
service_location = self._discovery_client.resolve_service(
7986
provided_interface=GRPC_SERVICE_INTERFACE_NAME,
87+
deployment_target=first_remote_node_url,
8088
service_class=GRPC_SERVICE_CLASS,
8189
)
8290
channel = self._grpc_channel_pool.get_channel(service_location.insecure_address)

packages/service/tests/unit/_drivers/test_grpcdevice.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ def test___default_configuration___get_grpc_device_server_location___resolves_se
2323

2424
discovery_client.resolve_service.assert_called_with(
2525
provided_interface=fake_driver.GRPC_SERVICE_INTERFACE_NAME,
26+
deployment_target="",
2627
service_class=SERVICE_CLASS,
2728
)
2829
assert service_location == ServiceLocation("localhost", "1234", "")
@@ -136,6 +137,7 @@ def test___default_configuration___get_insecure_grpc_device_server_channel___res
136137

137138
discovery_client.resolve_service.assert_called_with(
138139
provided_interface=fake_driver.GRPC_SERVICE_INTERFACE_NAME,
140+
deployment_target="",
139141
service_class=SERVICE_CLASS,
140142
)
141143
grpc_channel_pool.get_channel.assert_called_with("localhost:1234")

packages/service/tests/unit/test_discovery_client.py

Lines changed: 73 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919
SERVICE_PROGRAMMINGLANGUAGE_KEY,
2020
)
2121
from ni_measurement_plugin_sdk_service._internal.stubs.ni.measurementlink.discovery.v1.discovery_service_pb2 import (
22+
ComputeNodeDescriptor as GrpcComputeNodeDescriptor,
23+
EnumerateComputeNodesResponse,
2224
EnumerateServicesRequest,
2325
EnumerateServicesResponse,
2426
RegisterServiceRequest,
@@ -34,7 +36,11 @@
3436
from ni_measurement_plugin_sdk_service._internal.stubs.ni.measurementlink.discovery.v1.discovery_service_pb2_grpc import (
3537
DiscoveryServiceStub,
3638
)
37-
from ni_measurement_plugin_sdk_service.discovery import DiscoveryClient, ServiceLocation
39+
from ni_measurement_plugin_sdk_service.discovery import (
40+
ComputeNodeDescriptor,
41+
DiscoveryClient,
42+
ServiceLocation,
43+
)
3844
from ni_measurement_plugin_sdk_service.discovery._support import (
3945
_get_discovery_service_address,
4046
_open_key_file,
@@ -459,6 +465,71 @@ def test___no_registered_measurements___resolve_service_with_information___raise
459465
assert exc_info.value.code() == grpc.StatusCode.NOT_FOUND
460466

461467

468+
def test___registered_compute_node___enumerate_compute_nodes___returns_node(
469+
discovery_client: DiscoveryClient, discovery_service_stub: Mock
470+
):
471+
expected_node = ComputeNodeDescriptor(url="http://remotehost:42000", is_local=False)
472+
discovery_service_stub.EnumerateComputeNodes.return_value = EnumerateComputeNodesResponse(
473+
compute_nodes=[
474+
GrpcComputeNodeDescriptor(url=expected_node.url, is_local=expected_node.is_local),
475+
]
476+
)
477+
478+
compute_nodes = discovery_client.enumerate_compute_nodes()
479+
480+
discovery_service_stub.EnumerateComputeNodes.assert_called_once()
481+
assert compute_nodes[0] == expected_node
482+
483+
484+
def test___multiple_registered_compute_nodes___enumerate_compute_nodes___returns_all_nodes(
485+
discovery_client: DiscoveryClient, discovery_service_stub: Mock
486+
):
487+
expected_nodes = [
488+
ComputeNodeDescriptor(url="http://localhost:42000", is_local=True),
489+
ComputeNodeDescriptor(url="http://remotehost:42001", is_local=False),
490+
]
491+
discovery_service_stub.EnumerateComputeNodes.return_value = EnumerateComputeNodesResponse(
492+
compute_nodes=[
493+
GrpcComputeNodeDescriptor(
494+
url=expected_nodes[0].url, is_local=expected_nodes[0].is_local
495+
),
496+
GrpcComputeNodeDescriptor(
497+
url=expected_nodes[1].url, is_local=expected_nodes[1].is_local
498+
),
499+
]
500+
)
501+
502+
compute_nodes = discovery_client.enumerate_compute_nodes()
503+
504+
discovery_service_stub.EnumerateComputeNodes.assert_called_once()
505+
assert compute_nodes == expected_nodes
506+
507+
508+
def test___no_registered_compute_nodes___enumerate_compute_nodes___returns_empty_list(
509+
discovery_client: DiscoveryClient, discovery_service_stub: Mock
510+
):
511+
discovery_service_stub.EnumerateComputeNodes.return_value = EnumerateComputeNodesResponse()
512+
513+
compute_nodes = discovery_client.enumerate_compute_nodes()
514+
515+
discovery_service_stub.EnumerateComputeNodes.assert_called_once()
516+
assert compute_nodes == []
517+
518+
519+
def test___enumerate_compute_nodes___grpc_error___raises_rpc_error(
520+
discovery_client: DiscoveryClient, discovery_service_stub: Mock
521+
):
522+
discovery_service_stub.EnumerateComputeNodes.side_effect = FakeRpcError(
523+
grpc.StatusCode.UNAVAILABLE, details="Test service unavailable"
524+
)
525+
526+
with pytest.raises(grpc.RpcError) as exc_info:
527+
discovery_client.enumerate_compute_nodes()
528+
529+
discovery_service_stub.EnumerateComputeNodes.assert_called_once()
530+
assert exc_info.value.code() == grpc.StatusCode.UNAVAILABLE
531+
532+
462533
@pytest.fixture(scope="module")
463534
def subprocess_popen_kwargs() -> dict[str, Any]:
464535
kwargs: dict[str, Any] = {}
@@ -488,6 +559,7 @@ def discovery_service_stub(mocker: MockerFixture) -> Mock:
488559
stub.EnumerateServices = mocker.create_autospec(grpc.UnaryUnaryMultiCallable)
489560
stub.ResolveService = mocker.create_autospec(grpc.UnaryUnaryMultiCallable)
490561
stub.ResolveServiceWithInformation = mocker.create_autospec(grpc.UnaryUnaryMultiCallable)
562+
stub.EnumerateComputeNodes = mocker.create_autospec(grpc.UnaryUnaryMultiCallable)
491563
return stub
492564

493565

0 commit comments

Comments
 (0)