-
Notifications
You must be signed in to change notification settings - Fork 4
Expand file tree
/
Copy pathadmin.py
More file actions
69 lines (53 loc) · 1.9 KB
/
admin.py
File metadata and controls
69 lines (53 loc) · 1.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
import json
import logging
from auth_lib.fastapi import UnionAuth
from fastapi import APIRouter, Depends
from redis import Redis
from print_service.exceptions import TerminalTokenNotFound
from print_service.schema import BaseModel
from print_service.settings import Settings, get_settings
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Settings are resolved once, at import time.
# NOTE(review): nothing visible in this file reads this module-level value;
# InstantCommandSender calls get_settings() itself - confirm whether other
# modules import it before removing.
settings: Settings = get_settings()
# Router on which the endpoints of this module are registered via @router.post.
router = APIRouter()
class UpdateInput(BaseModel):
    """Payload accepted by the ``/update`` endpoint."""

    # Token identifying the target terminal; the endpoint hands it to
    # InstantCommandSender.update, which uses it as the Redis key.
    terminal_token: str
class RebootInput(BaseModel):
    """Payload accepted by the ``/reboot`` endpoint."""

    # Token identifying the target terminal; the endpoint hands it to
    # InstantCommandSender.reboot, which uses it as the Redis key.
    terminal_token: str
class InstantCommandSender:
    """Store one-shot terminal commands in Redis, keyed by terminal token.

    A command is a small JSON object (``{"manual_update": true}`` or
    ``{"reboot": true}``) written under the terminal token. A command is only
    written when the token key does not exist yet; an existing key is never
    overwritten.

    The caller owns ``self.redis`` and is responsible for closing it.
    """

    def __init__(self, settings: "Settings | None" = None) -> None:
        """Create the Redis client.

        Args:
            settings: Object providing ``REDIS_DSN``. When omitted (or falsy),
                ``get_settings()`` is used instead.
        """
        settings = settings or get_settings()
        self.redis: Redis = Redis.from_url(str(settings.REDIS_DSN))

    def _send(self, terminal_token: str, command: dict) -> "bool | None":
        """Write ``command`` under ``terminal_token`` unless the key exists.

        Returns:
            ``True`` when the command was stored, ``None`` when the key was
            already present and nothing was written.
        """
        payload = json.dumps(command)
        # SET ... NX performs the "only if absent" check and the write as one
        # atomic server-side operation. A separate GET followed by SET lets two
        # concurrent requests both see "absent" and overwrite each other.
        # NOTE: unlike a truthiness check on GET, NX also refuses to overwrite
        # a key that holds an empty string.
        stored = self.redis.set(terminal_token, payload, nx=True)
        return True if stored else None

    def update(self, terminal_token: str):
        """Queue a manual-update command for the terminal.

        Returns:
            ``True`` if the command was stored, ``None`` if the token key
            already existed.
        """
        return self._send(terminal_token, {'manual_update': True})

    def reboot(self, terminal_token: str):
        """Queue a reboot command for the terminal.

        Returns:
            ``True`` if the command was stored, ``None`` if the token key
            already existed.
        """
        return self._send(terminal_token, {'reboot': True})
@router.post("/update")
async def manual_update_terminal(
    input: UpdateInput, user=Depends(UnionAuth(scopes=["print.terminal.service"]))
):
    """Queue a manual-update command for the terminal named in the request.

    Args:
        input: Request payload carrying ``terminal_token``.
        user: Result of the ``UnionAuth`` dependency, requested with the
            ``print.terminal.service`` scope; only used for logging here.

    Returns:
        ``{'status': 'ok'}`` when the sender accepted the command.

    Raises:
        TerminalTokenNotFound: When ``InstantCommandSender.update`` returns a
            falsy value, which it does when the token key already exists in
            Redis.
            NOTE(review): the exception name suggests the opposite condition
            (token missing) - confirm the intended semantics.
    """
    logger.info(f"User {user} updated terminal")
    sender = InstantCommandSender()
    try:
        accepted = sender.update(input.terminal_token)
    finally:
        # Always release the Redis connection, including when the call above
        # raises (previously the close was skipped on exceptions).
        sender.redis.close()
    if not accepted:
        raise TerminalTokenNotFound()
    return {'status': 'ok'}
@router.post("/reboot")
async def reboot_terminal(
    input: RebootInput, user=Depends(UnionAuth(scopes=["print.terminal.service"]))
):
    """Queue a reboot command for the terminal named in the request.

    Args:
        input: Request payload carrying ``terminal_token``.
        user: Result of the ``UnionAuth`` dependency, requested with the
            ``print.terminal.service`` scope; only used for logging here.

    Returns:
        ``{'status': 'ok'}`` when the sender accepted the command.

    Raises:
        TerminalTokenNotFound: When ``InstantCommandSender.reboot`` returns a
            falsy value, which it does when the token key already exists in
            Redis.
            NOTE(review): the exception name suggests the opposite condition
            (token missing) - confirm the intended semantics.
    """
    logger.info(f"User {user} rebooted terminal")
    sender = InstantCommandSender()
    try:
        accepted = sender.reboot(input.terminal_token)
    finally:
        # Always release the Redis connection, including when the call above
        # raises (previously the close was skipped on exceptions).
        sender.redis.close()
    if not accepted:
        raise TerminalTokenNotFound()
    return {'status': 'ok'}