-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathlog_parser.py
More file actions
184 lines (151 loc) · 6.72 KB
/
log_parser.py
File metadata and controls
184 lines (151 loc) · 6.72 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
"""
Log Parser Module
Handles parsing of text and JSON log files with support for background processing.
"""
import json
import re
from datetime import datetime
from typing import List, Dict, Any, Optional
from enum import Enum
class LogLevel(Enum):
    """Severity levels recognized by the parser.

    Member values are the upper-case strings as they appear in log files,
    so a level token can be matched by direct string comparison against
    ``level.value``. UNKNOWN is the fallback used when a line's level
    cannot be identified.
    """
    DEBUG = "DEBUG"
    INFO = "INFO"
    WARNING = "WARNING"
    ERROR = "ERROR"
    CRITICAL = "CRITICAL"
    UNKNOWN = "UNKNOWN"  # fallback for unrecognized or absent levels
class LogEntry:
    """A single parsed log record.

    Carries the fields extracted from one log line plus the original raw
    text. ``is_anomaly`` always starts False; anomaly detection may flip
    it later.
    """

    def __init__(self, timestamp: Optional[datetime] = None, level: LogLevel = LogLevel.UNKNOWN,
                 source: str = "", message: str = "", raw_line: str = ""):
        self.raw_line = raw_line
        self.message = message
        self.source = source
        self.level = level
        self.timestamp = timestamp
        self.is_anomaly = False  # set by detection, never by parsing

    def to_dict(self) -> Dict[str, Any]:
        """Serialize this entry to a plain dict.

        The timestamp becomes an ISO-8601 string (or None); ``raw_line``
        is deliberately not included.
        """
        ts = self.timestamp
        return dict(
            timestamp=ts.isoformat() if ts is not None else None,
            level=self.level.value,
            source=self.source,
            message=self.message,
            is_anomaly=self.is_anomaly,
        )
class LogParser:
    """Parser for log files supporting plain-text and JSON-lines formats."""

    # Common text-log patterns, tried in order; the first match wins.
    PATTERNS = [
        # Simple: timestamp - level - message (without milliseconds)
        r'^(?P<timestamp>\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2})\s+-\s+(?P<level>\w+)\s+-\s+(?P<message>.*)$',
        # Simple: timestamp - level - message (with milliseconds)
        r'^(?P<timestamp>\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2}[.,]\d+)\s+-\s+(?P<level>\w+)\s+-\s+(?P<message>.*)$',
        # Apache/Nginx style: [timestamp] level: message
        r'^\[(?P<timestamp>[^\]]+)\]\s+(?P<level>\w+):\s+(?P<message>.*)$',
        # Syslog style: timestamp source level: message
        r'^(?P<timestamp>\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2})\s+(?P<source>\S+)\s+(?P<level>\w+):\s+(?P<message>.*)$',
        # Java style: level timestamp [source] message
        r'^(?P<level>\w+)\s+(?P<timestamp>\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2})\s+\[(?P<source>[^\]]+)\]\s+(?P<message>.*)$',
        # Generic: level: message
        r'^(?P<level>\w+):\s+(?P<message>.*)$'
    ]

    # Timestamp layouts tried in order by parse_timestamp; hoisted to a
    # class constant so the list is not rebuilt on every call.
    TIMESTAMP_FORMATS = [
        '%Y-%m-%d %H:%M:%S',
        '%Y-%m-%d %H:%M:%S.%f',
        '%Y-%m-%d %H:%M:%S,%f',
        '%d/%b/%Y:%H:%M:%S',
        '%Y/%m/%d %H:%M:%S',
        '%b %d %H:%M:%S',  # syslog-style; NOTE: strptime defaults the year to 1900
    ]

    def __init__(self):
        # Compile once; the patterns are reused for every parsed line.
        self.compiled_patterns = [re.compile(pattern) for pattern in self.PATTERNS]

    def parse_timestamp(self, timestamp_str: str) -> Optional[datetime]:
        """Parse *timestamp_str* using the first matching known format.

        Returns:
            A ``datetime``, or None when no format matches.
        """
        text = timestamp_str.strip()
        for fmt in self.TIMESTAMP_FORMATS:
            try:
                return datetime.strptime(text, fmt)
            except ValueError:
                continue
        return None

    def parse_level(self, level_str: str) -> LogLevel:
        """Map a level string (any case) to a LogLevel; UNKNOWN if unrecognized."""
        try:
            # Enum lookup-by-value replaces the original manual scan;
            # raises ValueError for values that are not members.
            return LogLevel(level_str.upper())
        except ValueError:
            return LogLevel.UNKNOWN

    def parse_text_line(self, line: str) -> LogEntry:
        """Parse one text log line into a LogEntry.

        Tries each compiled pattern in order; on no match, falls back to
        an UNKNOWN-level entry carrying the whole line as the message.
        """
        line = line.strip()
        if not line:
            # Blank line: empty entry (message stays "", unlike the
            # no-match fallback below which copies the line into message).
            return LogEntry(raw_line=line)
        for pattern in self.compiled_patterns:
            match = pattern.match(line)
            if not match:
                continue
            groups = match.groupdict()
            timestamp = self.parse_timestamp(groups['timestamp']) if 'timestamp' in groups else None
            level = self.parse_level(groups['level']) if 'level' in groups else LogLevel.UNKNOWN
            source = groups.get('source', '')
            message = groups.get('message', line)
            return LogEntry(timestamp, level, source, message, line)
        # No pattern matched: keep the raw line as the message.
        return LogEntry(message=line, raw_line=line)

    def parse_json_line(self, line: str) -> LogEntry:
        """Parse one JSON-lines record into a LogEntry.

        Falls back to a raw-text entry when the line is not valid JSON or
        is valid JSON but not an object. The line is stripped first so
        raw_line is consistent with parse_text_line (fix: the original
        kept the trailing newline).
        """
        line = line.strip()
        try:
            data = json.loads(line)
        except json.JSONDecodeError:
            return LogEntry(message=line, raw_line=line)
        if not isinstance(data, dict):
            # Fix: valid JSON such as a bare number or array previously
            # crashed with AttributeError on data.get(); treat it like
            # unparseable text instead.
            return LogEntry(message=line, raw_line=line)
        timestamp = None
        for key in ('timestamp', 'time'):
            if key in data:
                timestamp = self.parse_timestamp(str(data[key]))
                break
        level = LogLevel.UNKNOWN
        for key in ('level', 'severity'):
            if key in data:
                level = self.parse_level(str(data[key]))
                break
        source = data.get('source', data.get('logger', ''))
        message = data.get('message', data.get('msg', json.dumps(data)))
        return LogEntry(timestamp, level, str(source), str(message), line)

    def parse_file(self, filepath: str, is_json: bool = False) -> List[LogEntry]:
        """Parse a whole log file into LogEntry objects.

        Undecodable bytes are replaced rather than raised. Any other
        failure is reported to stdout and the partial result returned —
        this best-effort, never-raise contract is preserved from the
        original.
        """
        entries: List[LogEntry] = []
        parse = self.parse_json_line if is_json else self.parse_text_line
        try:
            with open(filepath, 'r', encoding='utf-8', errors='replace') as f:
                for line in f:
                    entries.append(parse(line))
        except Exception as e:
            # Deliberately broad: callers rely on getting a list back,
            # not an exception, even for missing/unreadable files.
            print(f"Error parsing file: {e}")
        return entries

    def detect_anomalies(self, entries: List[LogEntry]) -> None:
        """Flag anomalous entries in place (simple heuristic).

        Every ERROR/CRITICAL entry is marked as an anomaly. The original
        additionally ran a frequency-threshold pass, but that pass only
        re-examined the same ERROR/CRITICAL entries — all already flagged
        by the first pass — so it could never change the result; the dead
        work is removed with identical observable behavior.
        """
        for entry in entries:
            if entry.level in (LogLevel.ERROR, LogLevel.CRITICAL):
                entry.is_anomaly = True