This repository was archived by the owner on Jan 23, 2026. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 18
Expand file tree
/
Copy pathprint.py
More file actions
180 lines (153 loc) · 6.57 KB
/
print.py
File metadata and controls
180 lines (153 loc) · 6.57 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
import asyncclick as click
from jumpstarter_cli_common import (
OutputMode,
OutputType,
make_table,
time_since,
)
from jumpstarter_kubernetes import (
V1Alpha1Client,
V1Alpha1ClientList,
V1Alpha1Exporter,
V1Alpha1ExporterList,
V1Alpha1Lease,
V1Alpha1LeaseList,
)
# Table headers used when rendering clients with make_table; each entry is a
# key of the dict built by make_client_row.
CLIENT_COLUMNS = ["NAME", "ENDPOINT", "AGE"]
def make_client_row(client: V1Alpha1Client):
    """Build the table row for one client.

    The returned dict is keyed by the client column headers. The endpoint
    cell is an empty string when the client carries no status.
    """
    metadata = client.metadata
    row = {"NAME": metadata.name}
    row["ENDPOINT"] = "" if client.status is None else client.status.endpoint
    row["AGE"] = time_since(metadata.creation_timestamp)
    return row
def print_client(client: V1Alpha1Client, output: OutputType):
    """Echo a single client in the format selected by ``output``.

    JSON and YAML modes dump the object, NAME mode emits
    ``client.jumpstarter.dev/<name>``, and every other mode renders a
    one-row table.
    """
    if output == OutputMode.JSON:
        text = client.dump_json()
    elif output == OutputMode.YAML:
        text = client.dump_yaml()
    elif output == OutputMode.NAME:
        text = f"client.jumpstarter.dev/{client.metadata.name}"
    else:
        text = make_table(CLIENT_COLUMNS, [make_client_row(client)])
    click.echo(text)
def print_clients(clients: V1Alpha1ClientList, namespace: str, output: OutputType):
    """Echo a list of clients in the format selected by ``output``.

    JSON and YAML modes dump the whole list, NAME mode emits one
    ``client.jumpstarter.dev/<name>`` line per item, and every other mode
    renders a table.

    Raises:
        click.ClickException: if table output is requested and the list is
            empty.
    """
    if output == OutputMode.JSON:
        click.echo(clients.dump_json())
        return
    if output == OutputMode.YAML:
        click.echo(clients.dump_yaml())
        return
    if output == OutputMode.NAME:
        for entry in clients.items:
            click.echo(f"client.jumpstarter.dev/{entry.metadata.name}")
        return

    # Only the table view treats an empty list as an error.
    if len(clients.items) == 0:
        raise click.ClickException(f'No resources found in "{namespace}" namespace')

    rows = [make_client_row(entry) for entry in clients.items]
    click.echo(make_table(CLIENT_COLUMNS, rows))
# Table headers for the exporter summary view; each entry is a key of the
# dict built by make_exporter_row.
EXPORTER_COLUMNS = ["NAME", "ENDPOINT", "DEVICES", "AGE"]
# Table headers for the per-device view (used when the ``devices`` flag is
# set); each entry is a key of the dicts built by get_device_rows.
DEVICE_COLUMNS = ["NAME", "ENDPOINT", "AGE", "LABELS", "UUID"]
def make_exporter_row(exporter: V1Alpha1Exporter):
    """Make an exporter row to print.

    The returned dict is keyed by the exporter column headers. An exporter
    without a status renders an empty endpoint and a device count of ``0``
    instead of raising, matching how client rows treat a missing status.
    """
    status = exporter.status
    # The device list is only reachable through the status; treat a missing
    # status the same as a missing or empty device list.
    devices = status.devices if status is not None else None
    return {
        "NAME": exporter.metadata.name,
        "ENDPOINT": status.endpoint if status is not None else "",
        "DEVICES": str(len(devices) if devices else 0),
        "AGE": time_since(exporter.metadata.creation_timestamp),
    }
def get_device_rows(exporters: list[V1Alpha1Exporter]):
    """Get the device rows to print from the exporters.

    Produces one dict per device, keyed by the device column headers. The
    name, endpoint and age cells come from the owning exporter; the labels
    cell is a comma-separated list of ``label:value`` pairs.

    Exporters that have no status, or whose status has no devices,
    contribute no rows.
    """
    rows = []
    for exporter in exporters:
        status = exporter.status
        # Skip exporters with nothing to list rather than failing on a
        # missing status or a missing device list.
        if status is None or not status.devices:
            continue

        # Identical for every device of this exporter, so compute it once.
        age = time_since(exporter.metadata.creation_timestamp)

        for device in status.devices:
            labels = device.labels if device.labels is not None else {}
            rows.append(
                {
                    "NAME": exporter.metadata.name,
                    "ENDPOINT": status.endpoint,
                    "AGE": age,
                    "LABELS": ",".join(f"{key}:{labels[key]!s}" for key in labels),
                    "UUID": device.uuid,
                }
            )
    return rows
def print_exporter(exporter: V1Alpha1Exporter, devices: bool, output: OutputType):
    """Echo a single exporter in the format selected by ``output``.

    JSON and YAML modes dump the object and NAME mode emits
    ``exporter.jumpstarter.dev/<name>``. Otherwise a table is rendered: the
    exporter's devices when ``devices`` is set, else a one-row summary.
    """
    if output == OutputMode.JSON:
        text = exporter.dump_json()
    elif output == OutputMode.YAML:
        text = exporter.dump_yaml()
    elif output == OutputMode.NAME:
        text = f"exporter.jumpstarter.dev/{exporter.metadata.name}"
    elif devices:
        # Device view: one row per device of this exporter.
        text = make_table(DEVICE_COLUMNS, get_device_rows([exporter]))
    else:
        text = make_table(EXPORTER_COLUMNS, [make_exporter_row(exporter)])
    click.echo(text)
def print_exporters(exporters: V1Alpha1ExporterList, namespace: str, devices: bool, output: OutputType):
    """Echo a list of exporters in the format selected by ``output``.

    JSON and YAML modes dump the whole list and NAME mode emits one
    ``exporter.jumpstarter.dev/<name>`` line per item. Otherwise a table is
    rendered: per-device rows when ``devices`` is set, else one summary row
    per exporter.

    Raises:
        click.ClickException: if table output is requested and the list is
            empty.
    """
    if output == OutputMode.JSON:
        click.echo(exporters.dump_json())
        return
    if output == OutputMode.YAML:
        click.echo(exporters.dump_yaml())
        return
    if output == OutputMode.NAME:
        for entry in exporters.items:
            click.echo(f"exporter.jumpstarter.dev/{entry.metadata.name}")
        return

    # Only the table views treat an empty list as an error.
    if len(exporters.items) == 0:
        raise click.ClickException(f'No resources found in "{namespace}" namespace')

    if devices:
        table = make_table(DEVICE_COLUMNS, get_device_rows(exporters.items))
    else:
        summary_rows = [make_exporter_row(entry) for entry in exporters.items]
        table = make_table(EXPORTER_COLUMNS, summary_rows)
    click.echo(table)
# Table headers used when rendering leases with make_table; each entry is a
# key of the dict built by make_lease_row.
# NOTE(review): this list's order differs from the key order in
# make_lease_row (DURATION placement); presumably make_table orders columns by
# this list - confirm.
LEASE_COLUMNS = ["NAME", "CLIENT", "SELECTOR", "EXPORTER", "STATUS", "REASON", "BEGIN", "END", "DURATION", "AGE"]
def get_reason(lease: V1Alpha1Lease):
    """Derive the REASON cell for a lease from its last status condition.

    Only the final entry of ``lease.status.conditions`` is considered:

    * reason ``Ready``: ``Ready`` if its status is ``"True"``, else ``Waiting``
    * reason ``Expired``: ``Expired`` if its status is ``"True"``, else
      ``Complete``
    * any other reason is returned unchanged

    When there are no conditions the result is ``Unknown``.
    """
    conditions = lease.status.conditions
    latest = conditions[-1] if len(conditions) > 0 else None

    if latest is None:
        reason, status = "Unknown", "False"
    else:
        reason, status = latest.reason, latest.status

    is_true = status == "True"
    if reason == "Ready":
        return "Ready" if is_true else "Waiting"
    if reason == "Expired":
        return "Expired" if is_true else "Complete"
    return reason
def make_lease_row(lease: V1Alpha1Lease):
    """Build the table row for one lease.

    The returned dict is keyed by the lease column headers. The selector
    cell lists the lease's match labels as comma-separated ``label:value``
    pairs, or ``*`` when there are none. The client, exporter, begin and end
    cells are empty strings when the corresponding field is unset.
    """
    spec = lease.spec
    status = lease.status
    match_labels = spec.selector.match_labels
    selector_parts = [f"{key}:{match_labels[key]!s}" for key in match_labels]

    row = {"NAME": lease.metadata.name}
    row["CLIENT"] = "" if spec.client is None else spec.client.name
    row["SELECTOR"] = ",".join(selector_parts) if selector_parts else "*"
    row["EXPORTER"] = "" if status.exporter is None else status.exporter.name
    row["DURATION"] = spec.duration
    row["STATUS"] = "Ended" if status.ended else "InProgress"
    row["REASON"] = get_reason(lease)
    row["BEGIN"] = "" if status.begin_time is None else status.begin_time
    row["END"] = "" if status.end_time is None else status.end_time
    row["AGE"] = time_since(lease.metadata.creation_timestamp)
    return row
def print_lease(lease: V1Alpha1Lease, output: OutputType):
    """Echo a single lease in the format selected by ``output``.

    JSON and YAML modes dump the object, NAME mode emits
    ``lease.jumpstarter.dev/<name>``, and every other mode renders a one-row
    table.
    """
    if output == OutputMode.JSON:
        text = lease.dump_json()
    elif output == OutputMode.YAML:
        text = lease.dump_yaml()
    elif output == OutputMode.NAME:
        text = f"lease.jumpstarter.dev/{lease.metadata.name}"
    else:
        text = make_table(LEASE_COLUMNS, [make_lease_row(lease)])
    click.echo(text)
def print_leases(leases: V1Alpha1LeaseList, namespace: str, output: OutputType):
    """Echo a list of leases in the format selected by ``output``.

    JSON and YAML modes dump the whole list, NAME mode emits one
    ``lease.jumpstarter.dev/<name>`` line per item, and every other mode
    renders a table.

    Raises:
        click.ClickException: if table output is requested and the list is
            empty.
    """
    if output == OutputMode.JSON:
        click.echo(leases.dump_json())
        return
    if output == OutputMode.YAML:
        click.echo(leases.dump_yaml())
        return
    if output == OutputMode.NAME:
        for entry in leases.items:
            click.echo(f"lease.jumpstarter.dev/{entry.metadata.name}")
        return

    # Only the table view treats an empty list as an error.
    if len(leases.items) == 0:
        raise click.ClickException(f'No resources found in "{namespace}" namespace')

    rows = [make_lease_row(entry) for entry in leases.items]
    click.echo(make_table(LEASE_COLUMNS, rows))