-
Notifications
You must be signed in to change notification settings - Fork 10
Expand file tree
/
Copy pathrun.py
More file actions
215 lines (160 loc) · 8.56 KB
/
run.py
File metadata and controls
215 lines (160 loc) · 8.56 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
import argparse
from datetime import datetime
from keras.models import model_from_json, load_model, save_model
from utils import load_MNIST, load_CIFAR
from utils import filter_val_set, get_trainable_layers
from utils import generate_adversarial, filter_correct_classifications
from coverages.idc import ImportanceDrivenCoverage
from coverages.neuron_cov import NeuronCoverage
from coverages.tkn import DeepGaugeLayerLevelCoverage
from coverages.kmn import DeepGaugePercentCoverage
from coverages.ss import SSCover
from coverages.sa import SurpriseAdequacy
__version__ = 0.9
def scale(intermediate_layer_output, rmax=1, rmin=0):
X_std = (intermediate_layer_output - intermediate_layer_output.min()) / (
intermediate_layer_output.max() - intermediate_layer_output.min())
X_scaled = X_std * (rmax - rmin) + rmin
return X_scaled
def by_indices(outs, indices):
return [[outs[i][0][indices]] for i in range(len(outs))]
def parse_arguments():
"""
Parse command line argument and construct the DNN
:return: a dictionary comprising the command-line arguments
"""
# define the program description
text = 'Coverage Analyzer for DNNs'
# initiate the parser
parser = argparse.ArgumentParser(description=text)
# add new command-line arguments
parser.add_argument("-V", "--version", help="show program version",
action="version", version="DeepFault %f" % __version__)
parser.add_argument("-M", "--model", help="Path to the model to be loaded.\
The specified model will be used.")#, required=True)
# choices=['lenet1','lenet4', 'lenet5'], required=True)
parser.add_argument("-DS", "--dataset", help="The dataset to be used (mnist\
or cifar10).", choices=["mnist","cifar10"])#, required=True)
parser.add_argument("-A", "--approach", help="the approach to be employed \
to measure coverage", choices=['idc','nc','kmnc',
'nbc','snac','tknc','ssc', 'lsa', 'dsa'])
parser.add_argument("-C", "--class", help="the selected class", type=int)
parser.add_argument("-Q", "--quantize", help="quantization granularity for \
combinatorial other_coverage_metrics.", type= int)
parser.add_argument("-L", "--layer", help="the subject layer's index for \
combinatorial cov. NOTE THAT ONLY TRAINABLE LAYERS CAN \
BE SELECTED", type= int)
parser.add_argument("-KS", "--k_sections", help="number of sections used in \
k multisection other_coverage_metrics", type=int)
parser.add_argument("-KN", "--k_neurons", help="number of neurons used in \
top k neuron other_coverage_metrics", type=int)
parser.add_argument("-RN", "--rel_neurons", help="number of neurons considered\
as relevant in combinatorial other_coverage_metrics", type=int)
parser.add_argument("-AT", "--act_threshold", help="a threshold value used\
to consider if a neuron is activated or not.", type=float)
parser.add_argument("-R", "--repeat", help="index of the repeating. (for\
the cases where you need to run the same experiments \
multiple times)", type=int)
parser.add_argument("-LOG", "--logfile", help="path to log file")
parser.add_argument("-ADV", "--advtype", help="path to log file")
# parse command-line arguments
# parse command-line arguments
args = parser.parse_args()
return vars(args)
if __name__ == "__main__":
args = parse_arguments()
model_path = args['model'] if args['model'] else 'neural_networks/LeNet5'
dataset = args['dataset'] if args['dataset'] else 'mnist'
approach = args['approach'] if args['approach'] else 'idc'
num_rel_neurons= args['rel_neurons'] if args['rel_neurons'] else 2
act_threshold = args['act_threshold'] if args['act_threshold'] else 0
top_k = args['k_neurons'] if args['k_neurons'] else 3
k_sect = args['k_sections'] if args['k_sections'] else 1000
selected_class = args['class'] if not args['class']==None else -1 #ALL CLASSES
repeat = args['repeat'] if args['repeat'] else 1
logfile_name = args['logfile'] if args['logfile'] else 'result.log'
quantization_granularity = args['quantize'] if args['quantize'] else 3
adv_type = args['advtype'] if args['advtype'] else 'fgsm'
logfile = open(logfile_name, 'a')
####################
# 0) Load data
if dataset == 'mnist':
X_train, Y_train, X_test, Y_test = load_MNIST(channel_first=False)
img_rows, img_cols = 28, 28
else:
X_train, Y_train, X_test, Y_test = load_CIFAR()
img_rows, img_cols = 32, 32
if not selected_class == -1:
X_train, Y_train = filter_val_set(selected_class, X_train, Y_train) #Get training input for selected_class
X_test, Y_test = filter_val_set(selected_class, X_test, Y_test) #Get testing input for selected_class
####################
# 1) Setup the model
model_name = model_path.split('/')[-1]
try:
json_file = open(model_path + '.json', 'r') #Read Keras model parameters (stored in JSON file)
file_content = json_file.read()
json_file.close()
model = model_from_json(file_content)
model.load_weights(model_path + '.h5')
# Compile the model before using
model.compile(loss='categorical_crossentropy',
optimizer='adam',
metrics=['accuracy'])
except:
model = load_model(model_path + '.h5')
# 2) Load necessary information
trainable_layers = get_trainable_layers(model)
non_trainable_layers = list(set(range(len(model.layers))) - set(trainable_layers))
print('Trainable layers: ' + str(trainable_layers))
print('Non trainable layers: ' + str(non_trainable_layers))
experiment_folder = 'experiments'
#Investigate the penultimate layer
subject_layer = args['layer'] if not args['layer'] == None else -1
subject_layer = trainable_layers[subject_layer]
skip_layers = [0] #SKIP LAYERS FOR NC, KMNC, NBC etc.
for idx, lyr in enumerate(model.layers):
if 'flatten' in lyr.__class__.__name__.lower(): skip_layers.append(idx)
print("Skipping layers:", skip_layers)
####################
# 3) Analyze Coverages
if approach == 'nc':
nc = NeuronCoverage(model, threshold=.75, skip_layers = skip_layers) #SKIP ONLY INPUT AND FLATTEN LAYERS
coverage, _, _, _, _ = nc.test(X_test)
print("Your test set's coverage is: ", coverage)
nc.set_measure_state(nc.get_measure_state())
elif approach == 'idc':
print("\nRunning IDC for %d relevant neurons" % (num_rel_neurons))
X_train_corr, Y_train_corr, _, _, = filter_correct_classifications(model,
X_train,
Y_train)
idc = ImportanceDrivenCoverage(model, model_name, num_rel_neurons, selected_class,
subject_layer, X_train_corr, Y_train_corr)
coverage, covered_combinations, max_comb = idc.test(X_test)
print("Analysed %d test inputs" % len(Y_train_corr))
print("IDC test set coverage: %.2f%% " % (coverage))
print("Covered combinations: ", len(covered_combinations))
print("Total combinations: ", max_comb)
idc.set_measure_state(covered_combinations)
elif approach == 'kmnc' or approach == 'nbc' or approach == 'snac':
res = {
'model_name': model_name,
'num_section': k_sect,
}
dg = DeepGaugePercentCoverage(model, k_sect, X_train, None, skip_layers)
score = dg.test(X_test)
elif approach == 'tknc':
dg = DeepGaugeLayerLevelCoverage(model, top_k, skip_layers=skip_layers)
orig_coverage, _, _, _, _, orig_incrs = dg.test(X_test)
_, orig_acc = model.evaluate(X_test, Y_test)
elif approach == 'ssc':
ss = SSCover(model, skip_layers=non_trainable_layers)
score = ss.test(X_test)
print("Your test set's coverage is: ", score)
elif approach == 'lsa' or approach == 'dsa':
upper_bound = 2000
layer_names = [model.layers[-3].name]
#for lyr in model.layers:
# layer_names.append(lyr.name)
sa = SurpriseAdequacy(model, X_train, layer_names, upper_bound, dataset)
sa.test(X_test, approach)
logfile.close()