forked from niklasvincent/ipplan2sqlite
-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathdiff.py
More file actions
104 lines (87 loc) · 3.17 KB
/
diff.py
File metadata and controls
104 lines (87 loc) · 3.17 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
import sys
class bcolors:
HEADER = '\033[95m'
OKBLUE = '\033[94m'
OKGREEN = '\033[92m'
WARNING = '\033[93m'
FAIL = '\033[91m'
ENDC = '\033[0m'
def get_tables(
c,
exclude=['meta_data',
'sqlite_sequence'],
include=['firewall_rule_ip_level']):
c.execute('''SELECT name FROM sqlite_master WHERE type = 'table';''')
tables = [t[0] for t in c.fetchall() if t[0] not in exclude]
tables += include
return tables
def get_counts(c):
tables = get_tables(c)
counts = {}
for table in tables:
c.execute('SELECT COUNT(*) FROM %s' % table)
count = c.fetchone()[0]
counts[table] = count
return counts
def get_object_sets(c):
tables = get_tables(c)
objects = {}
for table in tables:
c.execute('PRAGMA table_info(%s)' % table)
r = c.fetchall()
columns = [col[1] for col in r if not 'id' in col[1]]
if len(columns) == 0:
objects[table] = set()
continue
sql = 'SELECT %s FROM %s' % (','.join(columns), table)
c.execute(sql)
objects[table] = set(c.fetchall())
return objects
def get_state(c):
state = {}
state['tables'] = get_tables(c)
state['objects'] = get_object_sets(c)
state['counts'] = get_counts(c)
return state
def _print(color, msg, output):
output.write(color + msg + bcolors.ENDC + "\n")
def compare_states(before, after, logging, output=sys.stdout, limit=10000):
tables_before = set(before['tables'])
tables_after = set(after['tables'])
dropped_tables = tables_before - tables_after
new_tables = tables_after - tables_before
if len(dropped_tables) > 0:
output.write('Dropped %d table(s):' % (len(dropped_tables)))
for dropped_table in dropped_tables:
_print(bcolors.FAIL, dropped_table, output)
if len(new_tables) > 0:
output.write('Added %d table(s):' % (len(new_tables)))
for added_table in new_tables:
_print(bcolors.OKGREEN, added_table, output)
# We can only do diff magic on tables that were in both databases
tables = tables_after.intersection(tables_before)
# Diff tables
for table in tables:
count_before = before['counts'][table]
objects_before = before['objects'][table]
count_after = after['counts'][table]
objects_after = after['objects'][table]
delta_count = count_after - count_before
if delta_count != 0:
logging.info('%s %d' % (table, delta_count))
removed_objects = objects_before - objects_after
added_objects = objects_after - objects_before
if len(removed_objects) > 0:
i = 0
for removed_object in removed_objects:
_print(bcolors.FAIL, "- " + str(removed_object), output)
i += 1
if i >= limit:
break
if len(added_objects) > 0:
i = 0
for added_object in added_objects:
_print(bcolors.OKGREEN, "+ " + str(added_object), output)
i += 1
if i >= limit:
break