-
Notifications
You must be signed in to change notification settings - Fork 20
Expand file tree
/
Copy pathclient.py
More file actions
171 lines (135 loc) · 5.44 KB
/
client.py
File metadata and controls
171 lines (135 loc) · 5.44 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
"""Client multihost host."""
from __future__ import annotations
from pathlib import PurePosixPath
from typing import Any
from pytest_mh.conn import ProcessLogLevel
from .base import BaseHost, BaseLinuxHost
__all__ = [
"ClientHost",
]
class ClientHost(BaseHost, BaseLinuxHost):
"""
SSSD client host object.
Provides features specific to SSSD client.
.. note::
Full backup and restore of SSSD state is supported.
"""
def __init__(self, *args, **kwargs) -> None:
super().__init__(*args, **kwargs)
self._features: dict[str, bool] | None = None
self.sssd_service_user: str = ""
""" SSSD service user configured by default install """
def pytest_setup(self) -> None:
super().pytest_setup()
self.sssd_service_user = self.svc.get_property("sssd", "User")
@property
def features(self) -> dict[str, bool]:
"""
Features supported by the host.
"""
if self._features is not None:
return self._features
self.logger.info(f"Detecting SSSD's features on {self.hostname}")
result = self.conn.run(
"""
set -ex
[ -f "/usr/lib64/sssd/libsss_files.so" ] && echo "files-provider" || :
[ -f "/usr/libexec/sssd/passkey_child" ] && echo "passkey" || :
[ -f "/usr/bin/sss_ssh_knownhosts" ] && echo "knownhosts" || :
[ -f "/usr/lib64/sssd/libsss_idp.so" ] && echo "idp-provider" || :
[ -f /opt/test_venv/bin/scauto ] && /opt/test_venv/bin/scauto gui done 2> /dev/null && echo "gdm" || :
systemctl cat sssd.service | grep -q "If service configured to be run under" && echo "non-privileged" || :
strings /usr/lib64/sssd/libsss_ldap_common.so | grep ldap_use_ppolicy && echo "ldap_use_ppolicy" || :
# enumerate (bool) Feature is only supported for domains with id_provider = ldap or id_provider = proxy.
MANWIDTH=10000 man sssd.conf | grep -q "id_provider = ldap or id_provider = proxy" && \
echo "limited_enumeration" || :
[ -f "/usr/bin/vicc" ] && echo "virtualsmartcard" || :
[ -f "/usr/bin/umockdev-run" ] && echo "umockdev" || :
lsmod | grep -q "vhci_hcd" && [ -f "/opt/test_venv/bin/vfido.py" ] && echo "vfido" || :
""",
log_level=ProcessLogLevel.Error,
)
# Set default values
self._features = {
"files-provider": False,
"passkey": False,
"non-privileged": False,
"ldap_use_ppolicy": False,
"knownhosts": False,
"limited_enumeration": False,
"idp-provider": False,
"gdm": False,
"virtualsmartcard": False,
"umockdev": False,
"vfido": False,
}
self._features.update({k: True for k in result.stdout_lines})
self.logger.info("Detected features:", extra={"data": {"Features": self._features}})
return self._features
def start(self) -> None:
"""
Not supported.
:raises NotImplementedError: _description_
"""
# SSSD might not be configured properly at this time. We start and stop SSSD in tests.
raise NotImplementedError("Starting Client service is not implemented.")
def stop(self) -> None:
self.svc.stop("sssd.service")
def backup(self) -> Any:
"""
Backup all SSSD data.
:return: Backup data.
:rtype: Any
"""
self.logger.info("Creating backup of SSSD client")
result = self.conn.run(
"""
set -ex
function backup {
if [ -d "$1" ] || [ -f "$1" ]; then
cp --force --archive "$1" "$2"
fi
}
path=`mktemp -d`
backup /etc/krb5.conf "$path/krb5.conf"
backup /etc/krb5.keytab "$path/krb5.keytab"
backup /etc/sssd "$path/config"
backup /var/log/sssd "$path/logs"
backup /var/lib/sss "$path/lib"
backup /home "$path/home"
echo $path
""",
log_level=ProcessLogLevel.Error,
)
return PurePosixPath(result.stdout_lines[-1].strip())
def restore(self, backup_data: Any | None) -> None:
"""
Restore all SSSD data.
:return: Backup data.
:rtype: Any
"""
if backup_data is None:
return
if not isinstance(backup_data, PurePosixPath):
raise TypeError(f"Expected PurePosixPath, got {type(backup_data)}")
backup_path = str(backup_data)
self.logger.info(f"Restoring SSSD data from {backup_path}")
self.conn.run(
f"""
set -ex
function restore {{
rm --force --recursive "$2"
if [ -d "$1" ] || [ -f "$1" ]; then
cp --force --archive "$1" "$2"
fi
}}
rm --force --recursive /etc/sssd /var/lib/sss /var/log/sssd /home/*
restore "{backup_path}/krb5.conf" /etc/krb5.conf
restore "{backup_path}/krb5.keytab" /etc/krb5.keytab
restore "{backup_path}/config" /etc/sssd
restore "{backup_path}/logs" /var/log/sssd
restore "{backup_path}/lib" /var/lib/sss
cp --force --archive "{backup_path}/home/*" /home/ || :
""",
log_level=ProcessLogLevel.Error,
)