|
2 | 2 |
|
3 | 3 | import logging |
4 | 4 | import pathlib |
5 | | -import types |
6 | | -from typing import Any, Callable, List, NamedTuple, Optional, Tuple, TypeVar, Union |
| 5 | +from typing import Any, Callable, TypeVar |
7 | 6 |
|
8 | 7 | import click |
9 | 8 | import grpc |
10 | | -import ni_measurementlink_service as nims |
11 | | -from ni_measurementlink_service import session_management |
12 | 9 | from ni_measurementlink_service._internal.stubs.ni.measurementlink.pinmap.v1 import ( |
13 | 10 | pin_map_service_pb2, |
14 | 11 | pin_map_service_pb2_grpc, |
15 | 12 | ) |
16 | 13 | from ni_measurementlink_service.discovery import DiscoveryClient |
17 | | -from ni_measurementlink_service.measurement.service import ( |
18 | | - GrpcChannelPool, |
19 | | - MeasurementService, |
20 | | -) |
21 | | - |
22 | | - |
23 | | -class ServiceOptions(NamedTuple): |
24 | | - """Service options specified on the command line.""" |
25 | | - |
26 | | - use_grpc_device: bool = False |
27 | | - grpc_device_address: str = "" |
28 | | - |
29 | | - use_simulation: bool = False |
30 | | - |
31 | | - |
32 | | -def get_service_options(**kwargs: Any) -> ServiceOptions: |
33 | | - """Get service options from keyword arguments.""" |
34 | | - return ServiceOptions( |
35 | | - use_grpc_device=kwargs.get("use_grpc_device", False), |
36 | | - grpc_device_address=kwargs.get("grpc_device_address", ""), |
37 | | - use_simulation=kwargs.get("use_simulation", False), |
38 | | - ) |
| 14 | +from ni_measurementlink_service.measurement.service import GrpcChannelPool |
39 | 15 |
|
40 | 16 |
|
41 | 17 | class PinMapClient(object): |
@@ -86,30 +62,6 @@ def pin_map_channel(self) -> grpc.Channel: |
86 | 62 | ).insecure_address |
87 | 63 | ) |
88 | 64 |
|
89 | | - @property |
90 | | - def session_management_channel(self) -> grpc.Channel: |
91 | | - """Return gRPC channel to session management service.""" |
92 | | - return self.get_channel( |
93 | | - self._discovery_client.resolve_service( |
94 | | - provided_interface=session_management.GRPC_SERVICE_INTERFACE_NAME, |
95 | | - service_class=session_management.GRPC_SERVICE_CLASS, |
96 | | - ).insecure_address |
97 | | - ) |
98 | | - |
99 | | - def get_grpc_device_channel(self, provided_interface: str) -> grpc.Channel: |
100 | | - """Return gRPC channel to specified NI gRPC Device service. |
101 | | -
|
102 | | - Args: |
103 | | - provided_interface (str): The gRPC Full Name of the service. |
104 | | -
|
105 | | - """ |
106 | | - return self.get_channel( |
107 | | - self._discovery_client.resolve_service( |
108 | | - provided_interface=provided_interface, |
109 | | - service_class="ni.measurementlink.v1.grpcdeviceserver", |
110 | | - ).insecure_address |
111 | | - ) |
112 | | - |
113 | 65 |
|
114 | 66 | class TestStandSupport(object): |
115 | 67 | """Class that communicates with TestStand.""" |
@@ -194,96 +146,3 @@ def verbosity_option(func: F) -> F: |
194 | 146 | count=True, |
195 | 147 | help="Enable verbose logging. Repeat to increase verbosity.", |
196 | 148 | )(func) |
197 | | - |
198 | | - |
199 | | -def grpc_device_options(func: F) -> F: |
200 | | - """Decorator for NI gRPC Device Server command line options.""" |
201 | | - use_grpc_device_option = click.option( |
202 | | - "--use-grpc-device/--no-use-grpc-device", |
203 | | - default=True, |
204 | | - is_flag=True, |
205 | | - help="Use the NI gRPC Device Server.", |
206 | | - ) |
207 | | - grpc_device_address_option = click.option( |
208 | | - "--grpc-device-address", |
209 | | - default="", |
210 | | - help="NI gRPC Device Server address (e.g. localhost:31763). If unspecified, use the discovery service to resolve the address.", |
211 | | - ) |
212 | | - return grpc_device_address_option(use_grpc_device_option(func)) |
213 | | - |
214 | | - |
215 | | -def use_simulation_option(default: bool) -> Callable[[F], F]: |
216 | | - """Decorator for --use-simulation command line option.""" |
217 | | - return click.option( |
218 | | - "--use-simulation/--no-use-simulation", |
219 | | - default=default, |
220 | | - is_flag=True, |
221 | | - help="Use simulated instruments.", |
222 | | - ) |
223 | | - |
224 | | - |
225 | | -def get_grpc_device_channel( |
226 | | - measurement_service: MeasurementService, |
227 | | - driver_module: types.ModuleType, |
228 | | - service_options: ServiceOptions, |
229 | | -) -> Optional[grpc.Channel]: |
230 | | - """Returns driver specific grpc device channel.""" |
231 | | - if service_options.use_grpc_device: |
232 | | - if service_options.grpc_device_address: |
233 | | - return measurement_service.channel_pool.get_channel(service_options.grpc_device_address) |
234 | | - |
235 | | - return measurement_service.get_channel( |
236 | | - provided_interface=getattr(driver_module, "GRPC_SERVICE_INTERFACE_NAME"), |
237 | | - service_class="ni.measurementlink.v1.grpcdeviceserver", |
238 | | - ) |
239 | | - return None |
240 | | - |
241 | | - |
242 | | -def create_session_management_client( |
243 | | - measurement_service: MeasurementService, |
244 | | -) -> nims.session_management.Client: |
245 | | - """Return created session management client.""" |
246 | | - return nims.session_management.Client( |
247 | | - grpc_channel=measurement_service.get_channel( |
248 | | - provided_interface=nims.session_management.GRPC_SERVICE_INTERFACE_NAME, |
249 | | - service_class=nims.session_management.GRPC_SERVICE_CLASS, |
250 | | - ) |
251 | | - ) |
252 | | - |
253 | | - |
254 | | -def get_session_and_channel_for_pin( |
255 | | - session_info: List[nims.session_management.SessionInformation], |
256 | | - pin: str, |
257 | | - site: Optional[int] = None, |
258 | | -) -> Tuple[int, List[str]]: |
259 | | - """Returns the session information based on the given pin names.""" |
260 | | - session_and_channel_info = get_sessions_and_channels_for_pins( |
261 | | - session_info=session_info, pins=[pin], site=site |
262 | | - ) |
263 | | - |
264 | | - if len(session_and_channel_info) != 1: |
265 | | - raise ValueError(f"Unsupported number of sessions for {pin}: {len(session_info)}") |
266 | | - return session_and_channel_info[0] |
267 | | - |
268 | | - |
269 | | -def get_sessions_and_channels_for_pins( |
270 | | - session_info: List[nims.session_management.SessionInformation], |
271 | | - pins: Union[str, List[str]], |
272 | | - site: Optional[int] = None, |
273 | | -) -> List[Tuple[int, List[str]]]: |
274 | | - """Returns the session information based on the given pin names.""" |
275 | | - pin_names = [pins] if isinstance(pins, str) else pins |
276 | | - session_and_channel_info = [] |
277 | | - for session_index, session_details in enumerate(session_info): |
278 | | - channel_list = [ |
279 | | - mapping.channel |
280 | | - for mapping in session_details.channel_mappings |
281 | | - if mapping.pin_or_relay_name in pin_names and (site is None or mapping.site == site) |
282 | | - ] |
283 | | - if len(channel_list) != 0: |
284 | | - session_and_channel_info.append((session_index, channel_list)) |
285 | | - |
286 | | - if len(session_and_channel_info) == 0: |
287 | | - raise KeyError(f"Pin(s) {pins} and site {site} not found") |
288 | | - |
289 | | - return session_and_channel_info |
0 commit comments