|
6 | 6 |
|
7 | 7 | import click |
8 | 8 | import grpc |
9 | | -from ni_measurementlink_service._internal.stubs.ni.measurementlink.pinmap.v1 import ( |
10 | | - pin_map_service_pb2, |
11 | | - pin_map_service_pb2_grpc, |
12 | | -) |
13 | 9 | from ni_measurementlink_service.discovery import DiscoveryClient |
14 | 10 | from ni_measurementlink_service.measurement.service import GrpcChannelPool |
15 | 11 |
|
16 | 12 |
|
17 | | -class PinMapClient(object): |
18 | | - """Class that communicates with the pin map service.""" |
19 | | - |
20 | | - def __init__(self, *, grpc_channel: grpc.Channel): |
21 | | - """Initialize pin map client.""" |
22 | | - self._client: pin_map_service_pb2_grpc.PinMapServiceStub = ( |
23 | | - pin_map_service_pb2_grpc.PinMapServiceStub(grpc_channel) |
24 | | - ) |
25 | | - |
26 | | - def update_pin_map(self, pin_map_path: str) -> str: |
27 | | - """Update registered pin map contents. |
28 | | -
|
29 | | - Create and register a pin map if a pin map resource for the specified pin map id is not |
30 | | - found. |
31 | | -
|
32 | | - Args: |
33 | | - pin_map_path: The file path of the pin map to register as a pin map resource. |
34 | | -
|
35 | | - Returns: |
36 | | - The resource id of the pin map that is registered to the pin map service. |
37 | | - """ |
38 | | - pin_map_path_obj = pathlib.Path(pin_map_path) |
39 | | - # By convention, the pin map id is the .pinmap file path. |
40 | | - request = pin_map_service_pb2.UpdatePinMapFromXmlRequest( |
41 | | - pin_map_id=pin_map_path, pin_map_xml=pin_map_path_obj.read_text(encoding="utf-8") |
42 | | - ) |
43 | | - response: pin_map_service_pb2.PinMap = self._client.UpdatePinMapFromXml(request) |
44 | | - return response.pin_map_id |
45 | | - |
46 | | - |
47 | 13 | class GrpcChannelPoolHelper(GrpcChannelPool): |
48 | 14 | """Class that manages gRPC channel lifetimes.""" |
49 | 15 |
|
@@ -77,26 +43,15 @@ def __init__(self, sequence_context: Any) -> None: |
77 | 43 | self._sequence_context = sequence_context |
78 | 44 |
|
79 | 45 | def get_active_pin_map_id(self) -> str: |
80 | | - """Get the active pin map id from the NI.MeasurementLink.PinMapId temporary global variable. |
| 46 | + """Get the active pin map id from the NI.MeasurementLink.PinMapId runtime variable. |
81 | 47 |
|
82 | 48 | Returns: |
83 | 49 | The resource id of the pin map that is registered to the pin map service. |
84 | 50 | """ |
85 | | - return self._sequence_context.Engine.TemporaryGlobals.GetValString( |
| 51 | + return self._sequence_context.Execution.RunTimeVariables.GetValString( |
86 | 52 | "NI.MeasurementLink.PinMapId", 0x0 |
87 | 53 | ) |
88 | 54 |
|
89 | | - def set_active_pin_map_id(self, pin_map_id: str) -> None: |
90 | | - """Set the NI.MeasurementLink.PinMapId temporary global variable to the specified id. |
91 | | -
|
92 | | - Args: |
93 | | - pin_map_id: |
94 | | - The resource id of the pin map that is registered to the pin map service. |
95 | | - """ |
96 | | - self._sequence_context.Engine.TemporaryGlobals.SetValString( |
97 | | - "NI.MeasurementLink.PinMapId", 0x1, pin_map_id |
98 | | - ) |
99 | | - |
100 | 55 | def resolve_file_path(self, file_path: str) -> str: |
101 | 56 | """Resolve the absolute path to a file using the TestStand search directories. |
102 | 57 |
|
|
0 commit comments