-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathparser.py
More file actions
128 lines (97 loc) · 4.02 KB
/
parser.py
File metadata and controls
128 lines (97 loc) · 4.02 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
from Evtx.Evtx import Evtx
import xml.etree.ElementTree as ET
import datetime
# Namespace of Windows Event Log XML, written in ElementTree's "{uri}" tag
# prefix form so it can be prepended directly to element names.
NS = "{http://schemas.microsoft.com/win/2004/08/events/event}"

# Event IDs (as strings) worth keeping for RDP / DFIR work, mapped to a short
# human-readable label. This table only selects evidence to extract; it does
# not encode any detection logic.
RDP_RELEVANT_EVENTS = {
    # Logon / logoff activity
    "4624": "Successful Logon",  # RDP when LogonType = 10
    "4625": "Failed Logon",
    "4634": "Logoff",

    # RDP session lifecycle
    "1149": "RDP Authentication Successful",
    "21": "RDP Session Connect",
    "22": "Shell Start",
    "24": "Session Disconnect",

    # Account lifecycle
    "4720": "User Account Created",
    "4722": "User Account Enabled",
    "4723": "Password Change Attempt",
    "4724": "Password Reset Attempt",
    "4725": "User Account Disabled",

    # Group membership changes
    "4732": "User Added To Privileged Group",
    "4728": "User Added To Security Group",

    # Services, scheduled tasks and log clearing
    "7045": "New Service Installed",
    "4698": "Scheduled Task Created",
    "1102": "Security Logs Cleared",
    # NOTE(review): the Task Scheduler Operational channel is commonly
    # documented with 106 = "task registered" and 129 = "task process
    # created" -- confirm this ID/label pairing is the intended one.
    "129": "Scheduled Task Registered (TaskScheduler)",
}
class RDPEventParser:
    """Extract RDP/DFIR-relevant events from Windows EVTX logs.

    Parsed events accumulate in ``self.events`` across calls to
    :meth:`parse_evtx`. Each event is a dict with the keys ``event_id``,
    ``event_name``, ``timestamp`` (raw string), ``parsed_time``
    (``datetime`` or ``None``), ``source`` and ``details`` (field name ->
    text taken from the record's EventData / UserData sections).
    """

    def __init__(self):
        self.events = []  # Collected parsed events, in the order they were read

    def _convert_time(self, ts):
        """Convert an ISO timestamp string to a ``datetime``.

        Args:
            ts: Timestamp text such as ``2026-01-01T00:11:56.007219Z``. A
                trailing ``Z`` is rewritten to ``+00:00`` so the result is
                timezone-aware; input without any offset yields a naive
                ``datetime``.

        Returns:
            The parsed ``datetime``, or ``None`` when *ts* is empty, the
            ``"N/A"`` placeholder, not a string, or not parseable.
        """
        if not isinstance(ts, str) or not ts or ts == "N/A":
            return None
        try:
            return datetime.datetime.fromisoformat(ts.replace("Z", "+00:00"))
        except ValueError:
            # Malformed timestamps are ignored; the raw string is still kept
            # by the caller as evidence.
            return None

    @staticmethod
    def _local_name(tag):
        """Return an ElementTree tag without its ``{namespace}`` prefix."""
        return tag.rsplit("}", 1)[-1]

    def _build_event(self, xml_text, source_name):
        """Turn one record's XML into an event dict.

        Args:
            xml_text: XML document of a single event record.
            source_name: Label stored in the event's ``source`` field.

        Returns:
            The event dict, or ``None`` when the XML is malformed, lacks a
            System/EventID element, or its ID is not in
            ``RDP_RELEVANT_EVENTS``.
        """
        try:
            root = ET.fromstring(xml_text)
        except (ET.ParseError, ValueError, TypeError):
            # Best-effort parsing: a malformed record is skipped, not fatal.
            return None

        system = root.find(f"{NS}System")
        if system is None:
            return None

        eid_node = system.find(f"{NS}EventID")
        if eid_node is None:
            return None
        # An empty <EventID/> has text None; treat it as "no usable ID".
        event_id = (eid_node.text or "").strip()

        # Drop irrelevant events before doing any further work.
        if event_id not in RDP_RELEVANT_EVENTS:
            return None

        # "N/A" is the single placeholder for "no timestamp", whether the
        # element or just its SystemTime attribute is missing.
        time_node = system.find(f"{NS}TimeCreated")
        if time_node is None:
            timestamp = "N/A"
        else:
            timestamp = time_node.attrib.get("SystemTime", "N/A")

        details = {}

        # EventData fields (Security / System / RDP logs).
        event_data = root.find(f"{NS}EventData")
        if event_data is not None:
            for index, item in enumerate(event_data):
                name = item.attrib.get("Name")
                if not name:
                    # Unnamed children get a positional key (e.g. "Data_2")
                    # so several of them do not overwrite one another.
                    name = f"{self._local_name(item.tag)}_{index}"
                details[name] = item.text

        # UserData fields (TaskScheduler and others).
        user_data = root.find(f"{NS}UserData")
        if user_data is not None:
            for elem in user_data.iter():
                if not isinstance(elem.tag, str):
                    continue
                text = elem.text
                # Container elements carry only layout whitespace; skip them.
                if text is None or not text.strip():
                    continue
                # The payload may live in a namespace other than NS, so strip
                # whatever namespace prefix the tag has.
                details[self._local_name(elem.tag)] = text

        return {
            "event_id": event_id,
            "event_name": RDP_RELEVANT_EVENTS[event_id],
            "timestamp": timestamp,  # raw evidence
            "parsed_time": self._convert_time(timestamp),  # analysis-friendly
            "source": source_name,
            "details": details,
        }

    def parse_evtx(self, path, source_name):
        """Parse an EVTX file and collect its relevant DFIR events.

        Matching events are appended to ``self.events``; progress is printed
        to stdout. Records that cannot be rendered or parsed are skipped.

        Args:
            path: Location of the EVTX file, passed to ``Evtx``.
            source_name: Label recorded on every event from this file.
        """
        print(f"[+] Parsing {source_name}: {path}")
        count = 0
        with Evtx(path) as log:
            for record in log.records():
                try:
                    xml_text = record.xml()
                except Exception:
                    # Rendering is done by the EVTX library and can fail in
                    # library-specific ways on damaged records; skip those
                    # without masking KeyboardInterrupt / SystemExit.
                    continue
                event = self._build_event(xml_text, source_name)
                if event is None:
                    continue
                self.events.append(event)
                count += 1
        print(f"[OK] Extracted {count} relevant events from {source_name}")

    def get_events(self):
        """Return the list of all events parsed so far (not a copy)."""
        return self.events