-
Notifications
You must be signed in to change notification settings - Fork 4
Expand file tree
/
Copy path__init__.py
More file actions
386 lines (310 loc) · 15.3 KB
/
__init__.py
File metadata and controls
386 lines (310 loc) · 15.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
"""Code supporting the LabThings server.
LabThings wraps the `fastapi.FastAPI` application in a `.ThingServer`, which
provides the tools to serve and manage `.Thing` instances.
See the :ref:`tutorial` for examples of how to set up a `.ThingServer`.
"""
from __future__ import annotations
from typing import Any, AsyncGenerator, Optional, TypeVar
from typing_extensions import Self
import os
import logging
from fastapi import APIRouter, FastAPI, Request
from fastapi.middleware.cors import CORSMiddleware
from anyio.from_thread import BlockingPortal
from contextlib import asynccontextmanager, AsyncExitStack
from collections.abc import Mapping, Sequence
from types import MappingProxyType
from ..middleware.url_for import url_for_middleware
from ..thing_slots import ThingSlot
from ..utilities import class_attributes
from ..actions import ActionManager
from ..logs import configure_thing_logger
from ..thing import Thing
from ..thing_server_interface import ThingServerInterface
from ..thing_description._model import ThingDescription
from .config_model import (
ThingsConfig,
ThingServerConfig,
normalise_things_config as normalise_things_config,
)
# `_thing_servers` is used as a global from `ThingServer.__init__`
from labthings_fastapi.outputs import blob
__all__ = ["ThingServer"]
ThingSubclass = TypeVar("ThingSubclass", bound=Thing)
class ThingServer:
"""Use FastAPI to serve `.Thing` instances.
The `.ThingServer` sets up a `fastapi.FastAPI` application and uses it
to expose the capabilities of `.Thing` instances over HTTP.
There are several functions of a `.ThingServer`:
* Manage where settings are stored, to allow `.Thing` instances to
load and save their settings from disk.
* Configure the server to allow cross-origin requests (required if
we use a web app that is not served from the `.ThingServer`).
* Manage the threads used to run :ref:`actions`.
* Manage :ref:`blobs` to allow binary data to be returned.
* Allow threaded code to call functions in the event loop, by providing
an `anyio.from_thread.BlockingPortal`.
"""
def __init__(
self,
things: ThingsConfig,
settings_folder: Optional[str] = None,
api_prefix: str = "",
application_config: Optional[Mapping[str, Any]] = None,
debug: bool = False,
) -> None:
r"""Initialise a LabThings server.
Setting up the `.ThingServer` involves creating the underlying
`fastapi.FastAPI` app, setting its lifespan function (used to
set up and shut down the `.Thing` instances), and configuring it
to allow cross-origin requests.
We also create the `.ActionManager` to manage :ref:`actions` and the
`.BlobManager` to manage the downloading of :ref:`blobs`.
:param things: A mapping of Thing names to `.Thing` subclasses, or
`.ThingConfig` objects specifying the subclass, its initialisation
arguments, and any connections to other `.Thing`\ s.
:param settings_folder: the location on disk where `.Thing`
settings will be saved.
:param api_prefix: An optional prefix for all API routes. This must either
be empty, or start with a slash and not end with a slash.
:param application_config: A mapping containing custom configuration for the
application. This is not processed by LabThings. Each `.Thing` can access
this via the Thing-Server interface.
:param debug: If ``True``, set the log level for `.Thing` instances to
DEBUG.
"""
self.startup_failure: dict | None = None
# Note: this is safe to call multiple times.
configure_thing_logger(logging.DEBUG if debug else None)
self._config = ThingServerConfig(
things=things,
settings_folder=settings_folder,
api_prefix=api_prefix,
application_config=application_config,
)
self.app = FastAPI(lifespan=self.lifespan)
self._set_cors_middleware()
self._set_url_for_middleware()
self.settings_folder = settings_folder or "./settings"
self.action_manager = ActionManager()
self.app.include_router(self.action_manager.router(), prefix=self._api_prefix)
self.app.include_router(blob.router, prefix=self._api_prefix)
self.app.include_router(self._things_view_router(), prefix=self._api_prefix)
self.blocking_portal: Optional[BlockingPortal] = None
self.startup_status: dict[str, str | dict] = {"things": {}}
global _thing_servers # noqa: F824
# The function calls below create and set up the Things.
self._things = self._create_things()
self._connect_things()
self._attach_things_to_server()
@classmethod
def from_config(cls, config: ThingServerConfig, debug: bool = False) -> Self:
r"""Create a ThingServer from a configuration model.
This is equivalent to ``ThingServer(**dict(config))``\ .
:param config: The configuration parameters for the server.
:param debug: If ``True``, set the log level for `.Thing` instances to
DEBUG.
:return: A `.ThingServer` configured as per the model.
"""
return cls(**dict(config), debug=debug)
def _set_cors_middleware(self) -> None:
"""Configure the server to allow requests from other origins.
This is required to allow web applications access to the HTTP API,
if they are not served from the same origin (i.e. if they are not
served as part of the `.ThingServer`.).
This is usually needed during development, and may be needed at
other times depending on how you are using LabThings.
"""
self.app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
def _set_url_for_middleware(self) -> None:
"""Add middleware to support `url_for` in Pydantic models.
This middleware adds a request state variable that allows
`labthings_fastapi.server.URLFor` instances to be serialised
using FastAPI's `url_for` function.
"""
self.app.middleware("http")(url_for_middleware)
@property
def things(self) -> Mapping[str, Thing]:
"""Return a dictionary of all the things.
:return: a dictionary mapping thing paths to `.Thing` instances.
"""
return MappingProxyType(self._things)
@property
def application_config(self) -> Mapping[str, Any] | None:
"""Return the application configuration from the config file.
:return: The custom configuration as specified in the configuration
file.
"""
return self._config.application_config
@property
def _api_prefix(self) -> str:
"""A string that prefixes all URLs in the application.
This must either be empty, or start with a slash and not
end with a slash.
"""
return self._config.api_prefix
ThingInstance = TypeVar("ThingInstance", bound=Thing)
def things_by_class(self, cls: type[ThingInstance]) -> Sequence[ThingInstance]:
"""Return all Things attached to this server matching a class.
Return all instances of ``cls`` attached to this server.
:param cls: A `.Thing` subclass.
:return: all instances of ``cls`` that have been added to this server.
"""
return [t for t in self.things.values() if isinstance(t, cls)]
def thing_by_class(self, cls: type[ThingInstance]) -> ThingInstance:
"""Return the instance of ``cls`` attached to this server.
This function calls `.ThingServer.things_by_class`, but asserts that
there is exactly one match.
:param cls: a `.Thing` subclass.
:return: the instance of ``cls`` attached to this server.
:raise RuntimeError: if there is not exactly one matching Thing.
"""
instances = self.things_by_class(cls)
if len(instances) == 1:
return instances[0]
raise RuntimeError(
f"There are {len(instances)} Things of class {cls}, expected 1."
)
def path_for_thing(self, name: str) -> str:
"""Return the path for a thing with the given name.
:param name: The name of the thing.
:return: The path at which the thing is served.
:raise KeyError: if no thing with the given name has been added.
"""
if name not in self._things:
raise KeyError(f"No thing named {name} has been added to this server.")
return f"{self._api_prefix}/{name}/"
def _create_things(self) -> Mapping[str, Thing]:
r"""Create the Things, add them to the server, and connect them up if needed.
This method is responsible for creating instances of `.Thing` subclasses
and adding them to the server. It also ensures the `.Thing`\ s are connected
together if required.
The Things are defined in ``self._config.thing_configs`` which in turn is
generated from the ``things`` argument to ``__init__``\ .
:return: A mapping of names to `.Thing` instances.
:raise TypeError: if ``cls`` is not a subclass of `.Thing`.
"""
things: dict[str, Thing] = {}
for name, config in self._config.thing_configs.items():
if not issubclass(config.cls, Thing):
raise TypeError(f"{config.cls} is not a Thing subclass.")
interface = ThingServerInterface(name=name, server=self)
os.makedirs(interface.settings_folder, exist_ok=True)
# This is where we instantiate the Thing
things[name] = config.cls(
*config.args,
**config.kwargs,
thing_server_interface=interface,
)
return things
def _connect_things(self) -> None:
r"""Connect the `thing_slot` attributes of Things.
A `.Thing` may have attributes defined as ``lt.thing_slot()``, which
will be populated after all `.Thing` instances are loaded on the server.
This function is responsible for supplying the `.Thing` instances required
for each connection. This will be done by using the name specified either
in the connection's default, or in the configuration of the server.
`.ThingSlotError` will be raised by code called by this method if
the connection cannot be provided. See `.ThingSlot.connect` for more
details.
"""
for thing_name, thing in self.things.items():
config = self._config.thing_configs[thing_name].thing_slots
for attr_name, attr in class_attributes(thing):
if not isinstance(attr, ThingSlot):
continue
target = config.get(attr_name, ...)
attr.connect(thing, self.things, target)
def _attach_things_to_server(self) -> None:
"""Add the Things to the FastAPI App.
This calls `.Thing.attach_to_server` on each `.Thing` that is a part of
this `.ThingServer` in order to add the HTTP endpoints and load settings.
"""
for thing in self.things.values():
thing.attach_to_server(self)
@asynccontextmanager
async def lifespan(self, app: FastAPI) -> AsyncGenerator[None, None]:
"""Manage set up and tear down of the server and Things.
This method is used as a lifespan function for the FastAPI app. See
the lifespan_ page in FastAPI's documentation.
.. _lifespan: https://fastapi.tiangolo.com/advanced/events/#lifespan-function
This does two important things:
* It sets up the blocking portal so background threads can run async code
(this is required for events, streams, etc.).
* It runs setup/teardown code for Things by calling them as context
managers.
:param app: The FastAPI application wrapped by the server.
:yield: no value. The FastAPI application will serve requests while this
function yields.
:raises BaseException: Reraises any errors that are caught when calling
``__enter__`` on each Thing. The error is also saved to
``self.startup_failure`` for post mortem, as otherwise uvicorn will swallow
it and replace it with SystemExit(3) and no traceback.
"""
async with BlockingPortal() as portal:
# We create a blocking portal to allow threaded code to call async code
# in the event loop.
self.blocking_portal = portal
# we __aenter__ and __aexit__ each Thing, which will in turn call the
# synchronous __enter__ and __exit__ methods if they exist, to initialise
# and shut down the hardware. NB we must make sure the blocking portal
# is present when this happens, in case we are dealing with threads.
async with AsyncExitStack() as stack:
for thing in self.things.values():
try:
await stack.enter_async_context(thing)
except BaseException as e:
self.startup_failure = {
"thing": thing.name,
"exception": e,
}
raise
yield
self.blocking_portal = None
def _things_view_router(self) -> APIRouter:
"""Create a router for the endpoint that shows the list of attached things.
:returns: an APIRouter with the `thing_descriptions` endpoint.
"""
router = APIRouter()
thing_server = self
@router.get(
"/thing_descriptions/",
response_model_exclude_none=True,
response_model_by_alias=True,
)
def thing_descriptions(request: Request) -> Mapping[str, ThingDescription]:
"""Describe all the things available from this server.
This returns a dictionary, where the keys are the paths to each
`.Thing` attached to the server, and the values are :ref:`wot_td` documents
represented as `.ThingDescription` objects. These should enable
clients to see all the capabilities of the `.Thing` instances and
access them over HTTP.
:param request: is supplied automatically by FastAPI.
:return: a dictionary mapping Thing paths to :ref:`wot_td` objects, which
are `pydantic.BaseModel` subclasses that get serialised to
dictionaries.
"""
return {
name: thing.thing_description(
path=f"{self._api_prefix}/{name}/", base=str(request.base_url)
)
for name, thing in thing_server.things.items()
}
@router.get("/things/")
def thing_paths(request: Request) -> Mapping[str, str]:
"""URLs pointing to the Thing Descriptions of each Thing.
:param request: is supplied automatically by FastAPI.
:return: a list of paths pointing to `.Thing` instances. These
URLs will return the :ref:`wot_td` of one `.Thing` each.
""" # noqa: D403 (URLs is correct capitalisation)
return {
t: str(request.url_for(f"things.{t}"))
for t in thing_server.things.keys()
}
return router