Commit 95d2789

committed
adding codes
vae does not work
1 parent 7ff326f commit 95d2789

2 files changed

Lines changed: 313 additions & 0 deletions

Lines changed: 141 additions & 0 deletions

import numpy as np
import matplotlib.pyplot as plt
import gzip, pickle, os, urllib.request

# ===== Utility functions =====
def load_mnist():
    url = 'http://deeplearning.net/data/mnist/mnist.pkl.gz'
    fname = 'mnist.pkl.gz'
    if not os.path.exists(fname):
        urllib.request.urlretrieve(url, fname)
    with gzip.open(fname, 'rb') as f:
        train_set, _, _ = pickle.load(f, encoding='latin1')
    X, _ = train_set
    return X.astype(np.float32)

def linear_beta_schedule(timesteps, beta_start=1e-4, beta_end=0.02):
    return np.linspace(beta_start, beta_end, timesteps)

def sigmoid(x): return 1 / (1 + np.exp(-x))
def relu(x): return np.maximum(0, x)

# ===== Neural network for epsilon_theta =====
class Dense:
    def __init__(self, in_dim, out_dim, activation='relu'):
        self.W = np.random.randn(in_dim, out_dim) * 0.01
        self.b = np.zeros(out_dim)
        self.activation = activation

    def forward(self, x):
        self.input = x
        self.z = x @ self.W + self.b
        if self.activation == 'relu':
            self.out = relu(self.z)
        elif self.activation == 'linear':
            self.out = self.z
        return self.out

    def backward(self, grad_out, lr):
        if self.activation == 'relu':
            grad = grad_out * (self.z > 0).astype(float)
        else:
            grad = grad_out

        dW = self.input.T @ grad
        db = np.sum(grad, axis=0)
        self.W -= lr * dW
        self.b -= lr * db
        return grad @ self.W.T

class DenoiseMLP:
    def __init__(self, input_dim, hidden_dims):
        dims = [input_dim] + hidden_dims + [input_dim]
        self.layers = [Dense(dims[i], dims[i+1], 'relu' if i < len(dims)-2 else 'linear')
                       for i in range(len(dims)-1)]

    def forward(self, x):
        for layer in self.layers:
            x = layer.forward(x)
        return x

    def backward(self, grad, lr):
        for layer in reversed(self.layers):
            grad = layer.backward(grad, lr)
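
One limitation worth noting: DenoiseMLP predicts the noise from x_t alone and is never told the timestep t. A minimal (hypothetical, not in this commit) way to add time conditioning is to append t/T as one extra input feature; a full DDPM would use sinusoidal time embeddings instead:

# Hypothetical sketch: a time-conditioned denoiser, assuming the Dense
# layer defined above. Callers would have to pass t to forward().
class TimeConditionedMLP:
    def __init__(self, input_dim, hidden_dims, T):
        dims = [input_dim + 1] + hidden_dims + [input_dim]  # +1 input for t/T
        self.layers = [Dense(dims[i], dims[i+1], 'relu' if i < len(dims)-2 else 'linear')
                       for i in range(len(dims)-1)]
        self.T = T

    def forward(self, x, t):
        # scale t to [0, 1] and append it as a feature column
        t_feat = np.broadcast_to(np.asarray(t, dtype=float).reshape(-1, 1) / self.T,
                                 (x.shape[0], 1))
        h = np.concatenate([x, t_feat], axis=1)
        for layer in self.layers:
            h = layer.forward(h)
        return h

    def backward(self, grad, lr):
        for layer in reversed(self.layers):
            grad = layer.backward(grad, lr)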

# ===== Variational Diffusion Model =====
class DiffusionModel:
    def __init__(self, img_dim, timesteps=1000, hidden_dims=[512, 256], lr=1e-3):
        self.T = timesteps
        self.beta = linear_beta_schedule(self.T)
        self.alpha = 1.0 - self.beta
        self.alpha_bar = np.cumprod(self.alpha)

        self.model = DenoiseMLP(input_dim=img_dim, hidden_dims=hidden_dims)
        self.lr = lr
        self.img_dim = img_dim

    def q_sample(self, x0, t, noise=None):
        # closed-form forward process: x_t = sqrt(alpha_bar_t) * x_0 + sqrt(1 - alpha_bar_t) * eps
        if noise is None:
            noise = np.random.randn(*x0.shape)
        sqrt_alpha_bar = np.sqrt(self.alpha_bar[t])[:, None]
        sqrt_one_minus_alpha_bar = np.sqrt(1 - self.alpha_bar[t])[:, None]
        return sqrt_alpha_bar * x0 + sqrt_one_minus_alpha_bar * noise

    def train_step(self, x):
        N = x.shape[0]
        t = np.random.randint(0, self.T, size=N)
        noise = np.random.randn(*x.shape)
        xt = self.q_sample(x, t, noise)
        pred_noise = self.model.forward(xt)

        loss = np.mean((pred_noise - noise) ** 2)
        # gradient of the squared error summed over features, averaged over the
        # batch (proportional to the gradient of the mean reported above)
        grad = 2 * (pred_noise - noise) / N
        self.model.backward(grad, self.lr)
        return loss

    def train(self, data, epochs=10, batch_size=128):
        for epoch in range(epochs):
            perm = np.random.permutation(len(data))
            total_loss = 0
            for i in range(0, len(data), batch_size):
                x = data[perm[i:i+batch_size]]
                total_loss += self.train_step(x)
            print(f"Epoch {epoch+1} Loss: {total_loss:.4f}")

    def p_sample(self, xt, t):
        pred_noise = self.model.forward(xt)
        alpha = self.alpha[t]
        alpha_bar = self.alpha_bar[t]
        beta = self.beta[t]

        coef1 = 1 / np.sqrt(alpha)
        coef2 = (1 - alpha) / np.sqrt(1 - alpha_bar)
        mean = coef1 * (xt - coef2 * pred_noise)

        if t > 0:
            noise = np.random.randn(*xt.shape)
        else:
            noise = 0  # no noise at the final step
        return mean + np.sqrt(beta) * noise

    def sample(self, n=16):
        xt = np.random.randn(n, self.img_dim)
        for t in reversed(range(self.T)):
            xt = self.p_sample(xt, t)
        return xt
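
For reference, p_sample implements the standard DDPM ancestral-sampling step (Ho et al., 2020), with coef1 and coef2 spelling out the posterior mean:

    x_{t-1} = \frac{1}{\sqrt{\alpha_t}} \left( x_t - \frac{1 - \alpha_t}{\sqrt{1 - \bar{\alpha}_t}} \, \epsilon_\theta(x_t) \right) + \sqrt{\beta_t} \, z,
    \qquad z \sim \mathcal{N}(0, I) \text{ for } t > 0, \quad z = 0 \text{ at } t = 0.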

# ===== Visualization =====
def plot_images(samples, n=8):
    fig, axs = plt.subplots(1, n, figsize=(n, 1.5))
    for i in range(n):
        axs[i].imshow(samples[i].reshape(28, 28), cmap='gray')
        axs[i].axis('off')
    plt.suptitle("Generated Samples")
    plt.show()

# ===== Run full example =====
if __name__ == "__main__":
    X = load_mnist()[:5000]
    model = DiffusionModel(img_dim=784, timesteps=100, hidden_dims=[256, 128], lr=1e-3)
    model.train(X, epochs=10, batch_size=128)
    samples = model.sample(n=8)
    plot_images(samples)
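
A quick way to convince yourself that q_sample is consistent with the step-by-step noising process is a moment check on toy data; a minimal sketch (standalone, not part of the commit):

# Sanity check: the closed form x_t = sqrt(alpha_bar_t) x_0 + sqrt(1 - alpha_bar_t) eps
# should match iterating x_t = sqrt(1 - beta_t) x_{t-1} + sqrt(beta_t) eps_t in distribution.
import numpy as np

T = 100
beta = np.linspace(1e-4, 0.02, T)
alpha_bar = np.cumprod(1.0 - beta)

x0 = np.random.rand(100000, 1)          # toy scalar "data"
xt = x0.copy()
for t in range(T):                      # iterate the per-step transition
    xt = np.sqrt(1 - beta[t]) * xt + np.sqrt(beta[t]) * np.random.randn(*xt.shape)

# closed form, jumping straight to step T
xT = np.sqrt(alpha_bar[-1]) * x0 + np.sqrt(1 - alpha_bar[-1]) * np.random.randn(*x0.shape)

print(xt.mean(), xt.var())              # the two pairs should agree closely
print(xT.mean(), xT.var())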

doc/Programs/VAE/vae2.py

Lines changed: 172 additions & 0 deletions
import numpy as np
import matplotlib.pyplot as plt
import gzip
import pickle
import urllib.request
import os

# ----- Utility functions -----
def load_mnist(normalize=True):
    url = 'http://deeplearning.net/data/mnist/mnist.pkl.gz'
    filename = 'mnist.pkl.gz'
    if not os.path.exists(filename):
        urllib.request.urlretrieve(url, filename)
    with gzip.open(filename, 'rb') as f:
        train_set, _, _ = pickle.load(f, encoding='latin1')
    X, _ = train_set
    if normalize:
        # the pickled MNIST is already scaled to [0, 1]; only the dtype cast is applied here
        X = X.astype(np.float32)
    return X

def sigmoid(x):
    return 1 / (1 + np.exp(-x))

def sigmoid_deriv(x):
    s = sigmoid(x)
    return s * (1 - s)

# ----- Layer -----
class Dense:
    def __init__(self, in_dim, out_dim, activation='sigmoid'):
        self.W = np.random.randn(in_dim, out_dim) * 0.01
        self.b = np.zeros(out_dim)
        self.activation = activation
        self.input = None
        self.z = None

    def forward(self, x):
        self.input = x
        self.z = x @ self.W + self.b
        if self.activation == 'sigmoid':
            return sigmoid(self.z)
        elif self.activation == 'linear':
            return self.z
        elif self.activation == 'relu':
            return np.maximum(0, self.z)

    def backward(self, grad_output, learning_rate):
        if self.activation == 'sigmoid':
            grad = grad_output * sigmoid_deriv(self.z)
        elif self.activation == 'relu':
            grad = grad_output * (self.z > 0).astype(float)
        else:
            grad = grad_output
        grad_W = self.input.T @ grad
        grad_b = np.sum(grad, axis=0)
        self.W -= learning_rate * grad_W
        self.b -= learning_rate * grad_b
        return grad @ self.W.T

# ----- VAE -----
class VAE:
    def __init__(self, input_dim=784, hidden_dims=[256], latent_dim=2, learning_rate=0.01):
        self.encoder_layers = [Dense(input_dim, hidden_dims[0])]
        for i in range(1, len(hidden_dims)):
            self.encoder_layers.append(Dense(hidden_dims[i - 1], hidden_dims[i]))
        self.W_mu = Dense(hidden_dims[-1], latent_dim, activation='linear')
        self.W_logvar = Dense(hidden_dims[-1], latent_dim, activation='linear')

        self.decoder_layers = [Dense(latent_dim, hidden_dims[-1])]
        for i in range(len(hidden_dims) - 1, 0, -1):
            self.decoder_layers.append(Dense(hidden_dims[i], hidden_dims[i - 1]))
        self.decoder_layers.append(Dense(hidden_dims[0], input_dim, activation='sigmoid'))

        self.learning_rate = learning_rate

    def encode(self, x):
        h = x
        for layer in self.encoder_layers:
            h = layer.forward(h)
        mu = self.W_mu.forward(h)
        logvar = self.W_logvar.forward(h)
        return mu, logvar

    def reparameterize(self, mu, logvar):
        # cache std and eps: the backward pass needs them to route the
        # reconstruction gradient through z = mu + eps * std
        self.std = np.exp(0.5 * logvar)
        self.eps = np.random.randn(*mu.shape)
        return mu + self.eps * self.std

    def decode(self, z):
        h = z
        for layer in self.decoder_layers:
            h = layer.forward(h)
        return h

    def loss(self, recon_x, x, mu, logvar):
        mse = np.mean((recon_x - x) ** 2)
        kl = -0.5 * np.mean(1 + logvar - mu ** 2 - np.exp(logvar))
        return mse + kl
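
For reference, loss is the usual VAE objective: a reconstruction term plus the closed-form KL divergence between the Gaussian posterior and the standard-normal prior,

    \mathcal{L} = \lVert \hat{x} - x \rVert^2 - \tfrac{1}{2} \sum_j \left( 1 + \log \sigma_j^2 - \mu_j^2 - \sigma_j^2 \right).

Note that the code takes np.mean over both terms instead of sums, which rescales the KL term relative to the reconstruction term by input_dim / latent_dim (here 784 / 2 = 392); an over-weighted KL pushes the posterior toward the prior, which is one plausible contributor to "vae does not work".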

    def train_step(self, x):
        # Forward
        mu, logvar = self.encode(x)
        z = self.reparameterize(mu, logvar)
        x_recon = self.decode(z)
        loss = self.loss(x_recon, x, mu, logvar)

        # Backward through the decoder; after the loop, grad holds dL/dz
        grad = 2 * (x_recon - x) / x.shape[0]
        for layer in reversed(self.decoder_layers):
            grad = layer.backward(grad, self.learning_rate)

        # Route the reconstruction gradient through the reparameterization
        # (z = mu + eps * std, so dz/dmu = 1 and dz/dlogvar = 0.5 * eps * std)
        # and add the KL gradients w.r.t. mu and logvar
        grad_mu = grad + mu / x.shape[0]
        grad_logvar = grad * 0.5 * self.eps * self.std \
            + 0.5 * (np.exp(logvar) - 1) / x.shape[0]

        # both heads share the encoder output h, so their backpropagated
        # gradients are summed before flowing into the encoder stack
        grad_latent = self.W_mu.backward(grad_mu, self.learning_rate) \
            + self.W_logvar.backward(grad_logvar, self.learning_rate)

        for layer in reversed(self.encoder_layers):
            grad_latent = layer.backward(grad_latent, self.learning_rate)

        return loss

    def train(self, X, epochs=10, batch_size=64):
        for epoch in range(epochs):
            perm = np.random.permutation(X.shape[0])
            total_loss = 0
            for i in range(0, X.shape[0], batch_size):
                batch = X[perm[i:i+batch_size]]
                total_loss += self.train_step(batch)
            print(f"Epoch {epoch+1} Loss: {total_loss:.4f}")

    def reconstruct(self, x):
        mu, logvar = self.encode(x)
        z = self.reparameterize(mu, logvar)
        return self.decode(z)

    def sample(self, n_samples=10):
        # draw z from the standard-normal prior and decode
        z = np.random.randn(n_samples, self.W_mu.b.shape[0])
        return self.decode(z)

# ----- Visualize -----
def plot_reconstructions(vae, X, n=10):
    recon = vae.reconstruct(X[:n])
    fig, axs = plt.subplots(2, n, figsize=(n, 2))
    for i in range(n):
        axs[0, i].imshow(X[i].reshape(28, 28), cmap='gray')
        axs[0, i].axis('off')
        axs[1, i].imshow(recon[i].reshape(28, 28), cmap='gray')
        axs[1, i].axis('off')
    axs[0, 0].set_title('Original')
    axs[1, 0].set_title('Reconstructed')
    plt.show()

def plot_generated(vae, n=10):
    samples = vae.sample(n)
    fig, axs = plt.subplots(1, n, figsize=(n, 1.5))
    for i in range(n):
        axs[i].imshow(samples[i].reshape(28, 28), cmap='gray')
        axs[i].axis('off')
    plt.suptitle('Generated Samples')
    plt.show()

# ----- Run on MNIST -----
if __name__ == "__main__":
    X = load_mnist()[:10000]
    vae = VAE(input_dim=784, hidden_dims=[128, 64], latent_dim=2, learning_rate=0.05)
    vae.train(X, epochs=10, batch_size=128)
    plot_reconstructions(vae, X)
    plot_generated(vae)
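
Given the commit message, a finite-difference gradient check is the cheapest way to localize which backward pass is wrong; a debugging sketch (standalone, assuming Dense and sigmoid_deriv from above):

# Compare Dense's analytic dL/dW for L = sum(layer.forward(x)) against a
# central finite difference on one weight entry.
import numpy as np

np.random.seed(0)
layer = Dense(4, 3, activation='sigmoid')
x = np.random.randn(5, 4)

out = layer.forward(x)
grad_z = np.ones_like(out) * sigmoid_deriv(layer.z)  # dL/dz for L = sum(out)
analytic = x.T @ grad_z                              # same formula backward() uses for grad_W

i, j, h = 0, 0, 1e-5
layer.W[i, j] += h
lp = np.sum(layer.forward(x))
layer.W[i, j] -= 2 * h
lm = np.sum(layer.forward(x))
layer.W[i, j] += h                                   # restore the weight
numeric = (lp - lm) / (2 * h)

print(analytic[i, j], numeric)                       # should agree to ~1e-6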
