-
Notifications
You must be signed in to change notification settings - Fork 10
Expand file tree
/
Copy pathcapsulenet.py
More file actions
203 lines (174 loc) · 10.6 KB
/
capsulenet.py
File metadata and controls
203 lines (174 loc) · 10.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
import numpy as np
from keras import layers,optimizers
from keras.models import *
from keras import backend as K
from keras.utils import to_categorical
from keras import callbacks
from keras.callbacks import EarlyStopping
from keras.layers import Dropout,Activation
K.set_image_data_format('channels_last')
from capsulelayers import CapsuleLayer,CapsuleLayer_nogradient_stop,PrimaryCap, Length, Mask
from keras.engine.topology import Layer
from keras.regularizers import l1,l2,l1_l2
from keras.layers.normalization import BatchNormalization
class LearningRate(callbacks.Callback):
    """Keras callback that records the optimizer's decayed learning rate.

    After every epoch the value ``lr / (1 + decay * iterations)`` is
    evaluated from the attached model's optimizer and stored on
    ``self.learningrate`` so that it can be inspected after training.
    """

    def on_train_begin(self, logs=None):
        """Reset the recorded learning rate to 0 when training starts."""
        self.learningrate = 0

    def on_epoch_end(self, epoch, logs=None):
        """Evaluate the current decayed learning rate and store it.

        :param epoch: index of the epoch that just finished (unused).
        :param logs: metrics dict supplied by Keras (unused).
        """
        optimizer = self.model.optimizer
        # The step counter may be an integer variable while `decay` is a
        # float one; cast it so the product is well-typed. When both already
        # share a dtype the cast is a no-op.
        iterations = K.cast(optimizer.iterations, K.dtype(optimizer.decay))
        lr = K.eval(optimizer.lr * (1. / (1. + optimizer.decay * iterations)))
        self.learningrate = lr
class Extract_outputs(Layer):
    """Layer that keeps the first ``outputdim`` entries of the last axis.

    The input is sliced as ``x[:, :, :outputdim]``, so it must be indexable
    along three axes.

    :param outputdim: number of leading entries of the last axis to keep.
    """

    def __init__(self, outputdim, **kwargs):
        self.outputdim = outputdim
        super(Extract_outputs, self).__init__(**kwargs)

    def compute_output_shape(self, input_shape):
        """Return ``(None, input_shape[1], outputdim)``."""
        return tuple([None, input_shape[1], self.outputdim])

    def call(self, x, mask=None):
        """Return ``x[:, :, :outputdim]``; ``mask`` is accepted but ignored."""
        x = x[:, :, :self.outputdim]
        return x

    def get_config(self):
        """Include ``outputdim`` so the layer can be rebuilt from its config."""
        config = super(Extract_outputs, self).get_config()
        config['outputdim'] = self.outputdim
        return config
class Extract_weight_c(Layer):
    """Layer that drops the first ``outputdim`` entries of the last axis.

    The input is sliced as ``x[:, :, outputdim:]``, i.e. it returns whatever
    follows the leading ``outputdim`` entries of the last axis.

    :param outputdim: number of leading entries of the last axis to skip.
    """

    def __init__(self, outputdim, **kwargs):
        self.outputdim = outputdim
        super(Extract_weight_c, self).__init__(**kwargs)

    def compute_output_shape(self, input_shape):
        """Return ``(None, input_shape[1], input_shape[-1] - outputdim)``."""
        return tuple([None, input_shape[1], input_shape[-1] - self.outputdim])

    def call(self, x, mask=None):
        """Return ``x[:, :, outputdim:]``; ``mask`` is accepted but ignored."""
        x = x[:, :, self.outputdim:]
        return x

    def get_config(self):
        """Include ``outputdim`` so the layer can be rebuilt from its config."""
        config = super(Extract_weight_c, self).get_config()
        config['outputdim'] = self.outputdim
        return config
def custom_binary_crossentropy(y_true, y_pred):
    """Binary cross-entropy computed over the flattened tensors.

    :param y_true: ground-truth tensor.
    :param y_pred: predicted tensor, same number of elements as ``y_true``.
    :return: mean of the element-wise binary cross-entropy.
    """
    # The positional order of `target` and `output` in the backend function
    # differs between Keras releases; passing them by keyword keeps the roles
    # correct regardless of the installed version.
    elementwise = K.binary_crossentropy(target=K.flatten(y_true),
                                        output=K.flatten(y_pred))
    return K.mean(elementwise, axis=-1)
def CapsNet(input_shape, n_class, routings, modeltype, power=2):
    """Build the capsule-network models for the requested variant.

    :param input_shape: shape of one input sample (without the batch axis).
    :param n_class: number of output classes / capsules.
    :param routings: number of routing iterations passed to the capsule layer.
    :param modeltype: ``"nogradientstop"`` or ``"nogradientstop_crossentropy"``.
    :param power: accepted for interface compatibility; not used here.
    :return: whatever the selected builder returns, i.e. the tuple
        ``(train_model, eval_model, manipulate_model, weight_c_model)``.
    :raises ValueError: if ``modeltype`` is not one of the supported names.
    """
    if modeltype == "nogradientstop":
        return CapsNet_nogradientstop(input_shape, n_class, routings)
    if modeltype == "nogradientstop_crossentropy":
        return CapsNet_nogradientstop_crossentropy(input_shape, n_class, routings)
    # Falling through used to return None, which only surfaced later as an
    # unrelated unpacking error in the caller.
    raise ValueError(
        "unknown modeltype %r; expected 'nogradientstop' or "
        "'nogradientstop_crossentropy'" % (modeltype,)
    )
def _build_capsnet(input_shape, n_class, routings, softmax_output):
    """Shared builder behind both ``CapsNet_nogradientstop*`` variants.

    Layers are created in a fixed order so that auto-generated layer names
    and the weight order stay the same as when each variant had its own copy
    of this code.

    :param input_shape: shape of one input sample; it is fed to ``Conv1D``
        (presumably ``(steps, channels)`` -- confirm against the caller).
    :param n_class: number of class capsules.
    :param routings: number of routing iterations for the capsule layer.
    :param softmax_output: when true, the capsule lengths are passed through
        a softmax activation named ``'capsnet'``; otherwise the ``Length``
        layer itself is named ``'capsnet'``.
    :return: ``(train_model, eval_model, manipulate_model, weight_c_model)``.
    """
    x = layers.Input(shape=input_shape)
    conv1 = layers.Conv1D(filters=200, kernel_size=1, strides=1, padding='valid',
                          kernel_initializer='he_normal', activation='relu',
                          name='conv1')(x)
    conv1 = Dropout(0.7)(conv1)
    conv2 = layers.Conv1D(filters=200, kernel_size=9, strides=1, padding='valid',
                          kernel_initializer='he_normal', activation='relu',
                          name='conv2')(conv1)
    # Original author's note: "0.75 valx loss has 0.1278!"
    conv2 = Dropout(0.75)(conv2)
    primarycaps = PrimaryCap(conv2, dim_capsule=8, n_channels=60, kernel_size=20,
                             kernel_initializer='he_normal', strides=1,
                             padding='valid', dropout=0.2)
    dim_capsule_dim2 = 10

    # Capsule layer. Routing algorithm works here. (`CapsuleLayer` was the
    # alternative previously tried in place of `CapsuleLayer_nogradient_stop`.)
    digitcaps_c = CapsuleLayer_nogradient_stop(
        num_capsule=n_class, dim_capsule=dim_capsule_dim2, num_routing=routings,
        name='digitcaps', kernel_initializer='he_normal', dropout=0.1)(primarycaps)
    # The capsule layer output is split along its last axis: the first
    # `dim_capsule_dim2` entries and everything after them.
    digitcaps = Extract_outputs(dim_capsule_dim2)(digitcaps_c)
    weight_c = Extract_weight_c(dim_capsule_dim2)(digitcaps_c)

    # Classification head: the only part that differs between the variants.
    if softmax_output:
        out_caps = Length()(digitcaps)
        out_caps = Activation('softmax', name='capsnet')(out_caps)
    else:
        out_caps = Length(name='capsnet')(digitcaps)

    # Decoder network.
    y = layers.Input(shape=(n_class,))
    # The true label is used to mask the output of capsule layer. For training
    masked_by_y = Mask()([digitcaps, y])
    # Mask using the capsule with maximal length. For prediction
    masked = Mask()(digitcaps)

    # Shared Decoder model in training and prediction
    decoder = Sequential(name='decoder')
    decoder.add(layers.Dense(512, activation='relu',
                             input_dim=dim_capsule_dim2 * n_class))
    decoder.add(layers.Dense(1024, activation='relu'))
    # np.prod yields a NumPy integer; hand Dense a plain Python int.
    decoder.add(layers.Dense(int(np.prod(input_shape)), activation='sigmoid'))
    decoder.add(layers.Reshape(target_shape=input_shape, name='out_recon'))

    # Models for training and evaluation (prediction)
    train_model = Model([x, y], [out_caps, decoder(masked_by_y)])
    eval_model = Model(x, [out_caps, decoder(masked)])
    weight_c_model = Model(x, weight_c)

    # manipulate model: adds a noise input to the capsule outputs before decoding
    noise = layers.Input(shape=(n_class, dim_capsule_dim2))
    noised_digitcaps = layers.Add()([digitcaps, noise])
    masked_noised_y = Mask()([noised_digitcaps, y])
    manipulate_model = Model([x, y, noise], decoder(masked_noised_y))
    return train_model, eval_model, manipulate_model, weight_c_model


def CapsNet_nogradientstop(input_shape, n_class, routings):
    """Build the capsule network whose ``'capsnet'`` output is the raw capsule length.

    Original author's note (it does not match the current hyper-parameters,
    so treat it as historical): "best testing results! val 0.13xx testX
    cnn1 200 1 cnn2 150 9 drop1 0.68 drop20.68 n_channels 50 kernel_size
    20,dropout1".

    :param input_shape: shape of one input sample (without the batch axis).
    :param n_class: number of class capsules.
    :param routings: number of routing iterations.
    :return: ``(train_model, eval_model, manipulate_model, weight_c_model)``.
    """
    return _build_capsnet(input_shape, n_class, routings, softmax_output=False)


def CapsNet_nogradientstop_crossentropy(input_shape, n_class, routings):
    """Build the capsule network whose ``'capsnet'`` output is a softmax over capsule lengths.

    Identical to ``CapsNet_nogradientstop`` except that the capsule lengths
    are passed through a softmax activation, which is the layer named
    ``'capsnet'`` in this variant.

    :param input_shape: shape of one input sample (without the batch axis).
    :param n_class: number of class capsules.
    :param routings: number of routing iterations.
    :return: ``(train_model, eval_model, manipulate_model, weight_c_model)``.
    """
    return _build_capsnet(input_shape, n_class, routings, softmax_output=True)
def margin_loss(y_true, y_pred):
    """
    Margin loss, referred to as Eq.(4) by the original author.

    Entries where ``y_true`` is 1 are penalised when the prediction falls
    below 0.9; entries where it is 0 are penalised (down-weighted by 0.5)
    when the prediction rises above 0.1. Rows of ``y_true`` holding more than
    one `1` should work as well, but that case has not been tested.
    :param y_true: [None, n_classes]
    :param y_pred: [None, num_capsule]
    :return: a scalar loss value.
    """
    present_term = y_true * K.square(K.maximum(0., 0.9 - y_pred))
    absent_term = 0.5 * (1 - y_true) * K.square(K.maximum(0., y_pred - 0.1))
    # Sum over axis 1 for every sample, then average over the batch.
    per_sample = K.sum(present_term + absent_term, 1)
    return K.mean(per_sample)
def speard_loss(m):
    """
    Create a loss function parameterised by the margin ``m``.

    The returned function computes ``max(0, m - (y_true - y_pred))**2``
    element-wise, sums it over axis 1, averages over the batch and finally
    subtracts ``m**2``.
    NOTE(review): the name suggests a "spread loss"; whether this element-wise
    form is the intended formula should be confirmed.
    :param m: margin value.
    :return: a function ``loss(y_true, y_pred)`` returning a scalar.
    """
    def loss(y_true, y_pred):
        shortfall = m - (y_true - y_pred)
        squared_hinge = K.square(K.maximum(0., shortfall))
        batch_mean = K.mean(K.sum(squared_hinge, 1))
        return batch_mean - m ** 2

    return loss
def Capsnet_main(trainX, trainY, valX=None, valY=None, nb_classes=2, nb_epoch=500,
                 earlystop=None, weights=None, compiletimes=0, compilemodels=None,
                 lr=0.001, lrdecay=1, batch_size=500, lam_recon=0.392, routings=3,
                 modeltype=5, class_weight=None, activefun='linear', power=2,
                 predict=False):
    """Build (or reuse) the capsule-network models and optionally train them.

    :param trainX: training inputs; a 4-D array is reshaped IN PLACE to 3-D
        by dropping axis 1 (NumPy rejects this unless the element count is
        unchanged, i.e. axis 1 has length 1).
    :param trainY: training labels, also fed to the decoder as mask input.
    :param valX: optional validation inputs, reshaped in place like ``trainX``.
    :param valY: optional validation labels.
    :param nb_classes: number of class capsules.
    :param nb_epoch: number of epochs; replaced by 10000 when early stopping
        is active.
    :param earlystop: patience for ``EarlyStopping`` on ``val_capsnet_loss``;
        only takes effect when validation data is supplied.
    :param weights: path of a weights file loaded before training (only when
        ``compiletimes == 0`` and ``predict`` is False).
    :param compiletimes: 0 to build and compile fresh models, anything else
        to reuse ``compilemodels``.
    :param compilemodels: sequence whose first four items are
        ``(model, eval_model, manipulate_model, weight_c_model)``.
    :param lr: initial learning rate.
    :param lrdecay: per-epoch multiplicative factor, ``lr * lrdecay ** epoch``.
    :param batch_size: training batch size.
    :param lam_recon: weight of the reconstruction (mse) loss.
    :param routings: number of routing iterations.
    :param modeltype: model variant name understood by ``CapsNet``. The
        default ``5`` is not a valid variant, so callers must pass a name
        when ``compiletimes == 0``.
    :param class_weight: forwarded to ``model.fit``.
    :param activefun: accepted for interface compatibility; not used here.
    :param power: accepted for interface compatibility; not used here.
    :param predict: when exactly ``False`` the model is trained; otherwise
        training is skipped.
    :return: ``(model, eval_model, manipulate_model, weight_c_model, history)``
        after training, or ``(model, eval_model)`` when training is skipped.
    :raises ValueError: if ``CapsNet`` does not recognise ``modeltype``.
    """
    print(trainX.shape)
    if len(trainX.shape) > 3:
        trainX.shape = (trainX.shape[0], trainX.shape[2], trainX.shape[3])
    if valX is not None:
        print(valX.shape)
        if len(valX.shape) > 3:
            valX.shape = (valX.shape[0], valX.shape[2], valX.shape[3])

    # Early stopping monitors a validation metric, so it is only meaningful
    # with validation data. Without it the epoch count used to be raised to
    # 10000 with no stopping callback attached; now nb_epoch is kept.
    use_early_stopping = earlystop is not None and valX is not None
    if use_early_stopping:
        early_stopping = EarlyStopping(monitor='val_capsnet_loss', patience=earlystop)
        nb_epoch = 10000
    elif earlystop is not None:
        print("earlystop ignored: no validation data provided")
    lr_decay = callbacks.LearningRateScheduler(schedule=lambda epoch: lr * (lrdecay ** epoch))

    if compiletimes == 0:
        built = CapsNet(input_shape=trainX.shape[1:], n_class=nb_classes,
                        routings=routings, modeltype=modeltype)
        if built is None:
            # Guard against an unrecognised variant (including the default 5)
            # instead of failing later with an unrelated unpacking error.
            raise ValueError("unknown modeltype %r" % (modeltype,))
        model, eval_model, manipulate_model, weight_c_model = built
        if "crossentropy" in modeltype:
            classification_loss = 'binary_crossentropy'
        else:
            classification_loss = margin_loss
        model.compile(optimizer=optimizers.Adam(lr=lr, epsilon=1e-08),
                      loss=[classification_loss, 'mse'],
                      loss_weights=[1., lam_recon],
                      metrics={'capsnet': 'accuracy'})
    else:
        model = compilemodels[0]
        eval_model = compilemodels[1]
        manipulate_model = compilemodels[2]
        weight_c_model = compilemodels[3]

    if predict is False:
        if weights is not None and compiletimes == 0:
            print("load weights:" + weights)
            model.load_weights(weights)

        # One fit call; only the validation data and callbacks vary.
        fit_kwargs = dict(batch_size=batch_size, epochs=nb_epoch,
                          class_weight=class_weight, callbacks=[lr_decay])
        if valX is not None:
            fit_kwargs['validation_data'] = [[valX, valY], [valY, valX]]
            if use_early_stopping:
                fit_kwargs['callbacks'] = [early_stopping, lr_decay]
        history = model.fit([trainX, trainY], [trainY, trainX], **fit_kwargs)
        return model, eval_model, manipulate_model, weight_c_model, history

    return model, eval_model