-
Notifications
You must be signed in to change notification settings - Fork 61
Expand file tree
/
Copy pathauth.py
More file actions
102 lines (77 loc) · 3.58 KB
/
auth.py
File metadata and controls
102 lines (77 loc) · 3.58 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
from .errors import AuthError
import enum
import os
# The auth interface here is unstable. I would like to eventually open this up
# for people to define their own custom authentication protocols, but I'm not
# familiar with what's needed for that exactly. To work with any message bus
# implementation would require abstracting out all the IO. Async operations
# might be challenging because different IO backends have different ways of
# doing that. I might just end up giving the raw socket and leaving it all up
# to the user, but it would be nice to have a little guidance in the interface
# since a lot of it is strongly specified. If you have a need for this, contact
# the project maintainer to help stabilize this interface.
class _AuthResponse(enum.Enum):
    """Response keywords recognised when parsing a line of the auth exchange.

    Each member's value is the literal keyword expected as the first
    space-separated word of the line.
    """

    OK = 'OK'
    REJECTED = 'REJECTED'
    DATA = 'DATA'
    ERROR = 'ERROR'
    AGREE_UNIX_FD = 'AGREE_UNIX_FD'

    @classmethod
    def parse(klass, line):
        """Split ``line`` into its response keyword and trailing arguments.

        :param line: The text to parse; it is split on single spaces.
        :returns: A ``(response, args)`` tuple where ``response`` is the enum
            member matching the first word and ``args`` is the list of the
            remaining words (empty when there are none).
        :raises ValueError: If the first word is not the value of any member.
        """
        # split(' ') always yields at least one element, so the unpacking
        # below cannot fail even for an empty line.
        keyword, *arguments = line.split(' ')
        return klass(keyword), arguments
# UNSTABLE
class Authenticator:
    """Base class for the authenticators used by
    :class:`MessageBus <dbus_next.message_bus.BaseMessageBus>`.

    Subclasses supply ``_authentication_start`` and ``_receive_line``. The
    interface is unstable; a future version of the library may allow
    extending this class for custom authentication protocols.

    :seealso: https://dbus.freedesktop.org/doc/dbus-specification.html#auth-protocol
    """

    def _authentication_start(self, negotiate_unix_fd=False):
        """Placeholder that subclasses must override.

        :raises NotImplementedError: Always, in this base class.
        """
        message = 'authentication_start() must be implemented in the inheriting class'
        raise NotImplementedError(message)

    def _receive_line(self, line):
        """Placeholder that subclasses must override.

        :raises NotImplementedError: Always, in this base class.
        """
        message = 'receive_line() must be implemented in the inheriting class'
        raise NotImplementedError(message)

    @staticmethod
    def _format_line(line):
        """Return ``line`` followed by CRLF, encoded to bytes.

        :param line: The value to format; it is rendered with ``format()``.
        :returns: The encoded line, terminated with ``\\r\\n``.
        """
        terminated = '{}\r\n'.format(line)
        return terminated.encode()
class AuthExternal(Authenticator):
    """Authenticator implementing the external auth protocol for use with
    :class:`MessageBus <dbus_next.message_bus.BaseMessageBus>`.

    :seealso: https://dbus.freedesktop.org/doc/dbus-specification.html#auth-protocol
    """

    def __init__(self):
        # Set to True once NEGOTIATE_UNIX_FD has been issued by _receive_line.
        self.negotiating_fds = False
        # Whether unix fd negotiation was requested; updated by
        # _authentication_start.
        self.negotiate_unix_fd = False

    def _authentication_start(self, negotiate_unix_fd=False) -> str:
        """Record the fd-negotiation preference and build the opening command.

        :param negotiate_unix_fd: Stored on the instance and consulted later
            by ``_receive_line``.
        :returns: ``AUTH EXTERNAL`` followed by the hex encoding of the
            current uid's decimal string.
        """
        self.negotiate_unix_fd = negotiate_unix_fd
        uid_digits = str(os.getuid())
        return 'AUTH EXTERNAL ' + uid_digits.encode().hex()

    def _receive_line(self, line: str):
        """Choose the next command to send in reply to ``line``.

        :param line: A line of the auth exchange, parsed with
            ``_AuthResponse.parse``.
        :returns: ``"BEGIN"`` after ``AGREE_UNIX_FD``, or after ``OK`` when
            fd negotiation was not requested; ``"NEGOTIATE_UNIX_FD"`` after
            ``OK`` when it was.
        :raises AuthError: For every other recognised response.
        """
        response, args = _AuthResponse.parse(line)

        if response is _AuthResponse.AGREE_UNIX_FD:
            return "BEGIN"

        if response is not _AuthResponse.OK:
            raise AuthError(f'authentication failed: {response.value}: {args}')

        if not self.negotiate_unix_fd:
            return "BEGIN"

        self.negotiating_fds = True
        return "NEGOTIATE_UNIX_FD"
class AuthAnnonymous(Authenticator):
    """Authenticator implementing the anonymous auth protocol for use with
    :class:`MessageBus <dbus_next.message_bus.BaseMessageBus>`.

    :seealso: https://dbus.freedesktop.org/doc/dbus-specification.html#auth-protocol
    """

    def _authentication_start(self, negotiate_unix_fd=False) -> str:
        """Build the opening command of the anonymous exchange.

        :param negotiate_unix_fd: Must be falsy; fd negotiation is rejected.
        :returns: The literal ``AUTH ANONYMOUS`` command.
        :raises AuthError: If ``negotiate_unix_fd`` is truthy.
        """
        if not negotiate_unix_fd:
            return 'AUTH ANONYMOUS'

        raise AuthError(
            'annonymous authentication does not support negotiating unix fds right now')

    def _receive_line(self, line: str) -> str:
        """Choose the next command to send in reply to ``line``.

        :param line: A line of the auth exchange, parsed with
            ``_AuthResponse.parse``.
        :returns: ``'BEGIN'`` after ``OK`` and ``'DATA'`` after ``DATA``.
        :raises AuthError: For every other recognised response.
        """
        response, args = _AuthResponse.parse(line)

        if response == _AuthResponse.OK:
            return 'BEGIN'
        if response == _AuthResponse.DATA:
            return 'DATA'

        raise AuthError(f'authentication failed: {response.value}: {args}')