-
Notifications
You must be signed in to change notification settings - Fork 151
Expand file tree
/
Copy pathlog_examples.py
More file actions
130 lines (112 loc) · 3.59 KB
/
log_examples.py
File metadata and controls
130 lines (112 loc) · 3.59 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
#!/usr/bin/env python
from __future__ import absolute_import, print_function
import csv
import json
import sys
from datetime import datetime, timedelta, timezone
from six.moves import input
import duo_client
# Iterator over the command-line arguments that follow the script name;
# get_next_arg() consumes it one value at a time.
argv_iter = iter(sys.argv[1:])
def get_next_arg(prompt, default=None):
    """Return the next command-line argument, prompting when none are left.

    Args:
        prompt: Text shown to the user when the value has to be typed in.
        default: Value returned when the user submits an empty line.

    Returns:
        The next unread item of ``argv_iter`` if there is one; otherwise the
        text entered at the prompt, or ``default`` if that text is empty.
    """
    exhausted = object()  # unique marker: no command-line arguments remain
    value = next(argv_iter, exhausted)
    if value is exhausted:
        value = input(prompt) or default
    return value
# Default query window, anchored on the current UTC time and reaching back
# 180 days.
# NOTE(review): default_mintime is expressed in whole seconds, while
# default_maxtime is expressed in milliseconds (less 120) - confirm the unit
# each log endpoint expects before relying on these defaults.
today = datetime.now(tz=timezone.utc)
_window_start = today - timedelta(days=180)
default_mintime = int(_window_start.timestamp())
_now_ms = int(today.timestamp() * 1000)
default_maxtime = _now_ms - 120
# Admin API client. The integration key, secret key and API hostname are
# read in that order, each from the command line or, failing that, a prompt.
integration_key = get_next_arg("Admin API integration key: ")
secret_key = get_next_arg("Integration secret key: ")
api_hostname = get_next_arg("API hostname: ")
admin_api = duo_client.Admin(
    ikey=integration_key,
    skey=secret_key,
    host=api_hostname,
)
# Query parameters for the log request. Each option is read in turn (command
# line first, prompt otherwise) and only kept when the resulting value is
# truthy, so options left blank are omitted from the request.
params = {}
for option_name, option_prompt, option_default in (
    ("mintime", "Mintime: ", default_mintime),
    ("maxtime", "Maxtime: ", default_maxtime),
    ("limit", "Limit (1000): ", None),
    ("next_offset", "Next_offset: ", None),
    ("sort", "Sort (ts:desc): ", None),
):
    option_value = get_next_arg(option_prompt, option_default)
    if option_value:
        params[option_name] = option_value
# Which log to export; an empty answer selects "telephony_v2".
log_type = get_next_arg("Log Type (telephony_v2): ", default="telephony_v2")
print("Fetching {} logs...".format(log_type))

# Rows are written to standard output as CSV, framed by separator lines.
reporter = csv.writer(sys.stdout)
print("==============================")
if log_type == "activity":
    # The activity query is sent with mintime scaled by 1000. A value that
    # came from the command line or the prompt is a str, and multiplying a
    # str by 1000 would repeat its text 1000 times rather than scale the
    # number, so convert to int first. The key is absent when an empty
    # value was supplied; in that case leave the request without it.
    if "mintime" in params:
        params["mintime"] = int(params["mintime"]) * 1000
    activity_logs = admin_api.get_activity_logs(**params)

    # Tolerate a response whose "metadata" entry is missing or null.
    activity_metadata = activity_logs.get("metadata") or {}
    print("Next offset from response: ", activity_metadata.get("next_offset"))

    reporter.writerow(
        ("activity_id", "ts", "action", "actor_name", "target_name", "application")
    )
    for log in activity_logs["items"]:
        # actor / target / application are optional nested objects; a key
        # that is missing or explicitly null yields an empty CSV cell.
        reporter.writerow(
            [
                log["activity_id"],
                log["ts"],
                log["action"],
                (log.get("actor") or {}).get("name"),
                (log.get("target") or {}).get("name"),
                (log.get("application") or {}).get("name"),
            ]
        )
if log_type == "telephony_v2":
    # NOTE(review): params is handed over as one keyword argument named
    # "kwargs", whereas the activity branch unpacks it with **params -
    # confirm against duo_client which form get_telephony_log expects.
    telephony_logs = admin_api.get_telephony_log(api_version=2, kwargs=params)

    # The same field names serve as the CSV header and as the keys read
    # from every log entry, in this order.
    telephony_fields = ("telephony_id", "txid", "credits", "context", "phone", "type")
    reporter.writerow(telephony_fields)
    for entry in telephony_logs["items"]:
        reporter.writerow([entry[field] for field in telephony_fields])
if log_type == "auth":
    # NOTE(review): params is handed over as one keyword argument named
    # "kwargs", whereas the activity branch unpacks it with **params -
    # confirm against duo_client which form get_authentication_log expects.
    auth_logs = admin_api.get_authentication_log(api_version=2, kwargs=params)
    auth_metadata = auth_logs.get("metadata")
    print("Next offset from response: ", auth_metadata.get("next_offset"))

    # Header labels, followed by the entry keys that fill each column; only
    # the first column's label differs from its key.
    reporter.writerow(("admin", "akey", "context", "phone", "provider"))
    auth_keys = ("admin_name", "akey", "context", "phone", "provider")
    for entry in auth_logs["authlogs"]:
        reporter.writerow([entry[key] for key in auth_keys])

# Closing separator, printed whichever log type was selected.
print("==============================")