badrbm.py · executable file · 128 lines (91 loc) · 3.8 KB
#! /usr/bin/python3
import numpy
import scipy.special # for expit (a quick numpy-array-aware sigmoid)
# consider http://stackoverflow.com/questions/3985619/how-to-calculate-a-logistic-sigmoid-function-in-python

class RbmError(Exception):
    pass
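
# A small binary/binary restricted Boltzmann machine.  Training uses one step of
# contrastive divergence (CD-1); see get_updates() and apply_update() below.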
class rbm:
    def __init__(self, i, j, rate, p=None): # i = |v|, j = |h|, p -> momentum
        if i < 1:
            raise RbmError("There must be at least one visible unit")
        if j < 1:
            raise RbmError("There must be at least one hidden unit")
        if p is not None:
            if p > 0.5:
                raise RbmError("Momentum greater than 0.5 doesn't make sense.")
            elif p <= 0.0:
                raise RbmError("Zero-or-less momentum doesn't make sense (use momentum=None to turn momentum off).")
        self.i = i # record |v| for error-checking
        self.j = j # record |h| for error-checking
        self.a = numpy.random.rand(i, 1) # visible biases
        self.b = numpy.random.rand(j, 1) # hidden biases
        self.W = numpy.random.rand(i, j)
        self.rate = rate
        self.p = p
        self.last_dW = None
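
    # Energy of a joint configuration: E(v, h) = -a^T v - b^T h - v^T W h.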
    def get_energy(self, v, h):
        if v.shape != (self.i, 1):
            raise RbmError("wrong shape for v, should be (i, 1).")
        if h.shape != (self.j, 1):
            raise RbmError("wrong shape for h, should be (j, 1).")
        E = -1.0 * self.a.T @ v
        E -= self.b.T @ h
        E -= v.T @ self.W @ h
        return E
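
    # Sample a binary visible vector given hidden h: p(v_k = 1 | h) = sigmoid(a_k + (W h)_k),
    # then each unit is Bernoulli-sampled against a uniform draw.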
    def get_v(self, h):
        v_probs = scipy.special.expit(self.a + (self.W @ h))
        v_vals = numpy.random.rand(self.i, 1)
        # there has to be some craftier way to do this
        # possibly using numpy.vectorize?
        v_res = numpy.zeros((self.i, 1))
        for i in range(self.i):
            if v_vals[i] <= v_probs[i]:
                v_res[i] = 1.0
        return v_res
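
    # Sample a binary hidden vector given visible v: p(h_k = 1 | v) = sigmoid(b_k + (W^T v)_k).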
    def get_h(self, v):
        h_probs = scipy.special.expit(self.b + (self.W.T @ v))
        #h_probs = scipy.special.expit(self.b + (v.T @ self.W).T) # trying to avoid an explicit matrix transpose
        h_vals = numpy.random.rand(self.j, 1)
        h_res = numpy.zeros((self.j, 1))
        for i in range(self.j):
            if h_vals[i] < h_probs[i]:
                h_res[i] = 1.0
        return h_res
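
    # One step of contrastive divergence (CD-1): sample h from v, reconstruct v' from h,
    # sample h' from v', and return the statistics (v h^T - v' h'^T) plus the bias differences.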
    def get_updates(self, v):
        h = self.get_h(v)
        vprime = self.get_v(h)
        hprime = self.get_h(vprime)
        w_update = (v @ h.T) - (vprime @ hprime.T)
        a_update = (v - vprime)
        b_update = (h - hprime)
        return w_update, a_update, b_update
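
    # Scale the CD-1 statistics by the learning rate and apply them; when momentum p is set,
    # the weight step blends the current update with the previous one.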
    def apply_update(self, v, rate=None):
        if rate is None:
            rate = self.rate
        w_update, a_update, b_update = self.get_updates(v)
        if self.p is not None:
            self.W += (1.0 - self.p) * w_update * rate
            if self.last_dW is not None:
                self.W += self.p * self.last_dW * rate
            self.last_dW = w_update
        else:
            self.W += w_update * rate
        self.a += a_update * (rate / len(v))
        self.b += b_update * (rate / len(v))
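
    # Draw `count` visible samples by running a Gibbs chain (v -> h -> v),
    # starting from initial_visible or from a random visible vector.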
    def get_samples(self, count, initial_visible=None):
        samples = []
        if initial_visible is None:
            current_v = numpy.random.rand(self.i, 1)
        else:
            current_v = initial_visible
        generated = 0
        while generated < count:
            current_h = self.get_h(current_v)
            current_v = self.get_v(current_h)
            samples.append(current_v)
            generated += 1
        return samples

# I determined that W.T @ v = (v.T @ W).T in the laziest and dumbest way possible:
# I asked my brother, who is a mathematician.
# With one proviso: at least in numpy, v.T @ W produces a row vector, and W.T @ v produces a column vector.
# A simple test shows that it's actually faster with the explicit matrix transpose? It is a small matrix in the test, hmm.
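
# A minimal usage sketch (not part of the original file): the toy data, unit counts,
# learning rate, and momentum below are illustrative assumptions, just to show how the
# class is driven: construct an rbm, call apply_update() per training vector, then sample.
if __name__ == "__main__":
    data = [numpy.array([[1.0], [0.0], [1.0], [0.0]]),
            numpy.array([[0.0], [1.0], [0.0], [1.0]])]
    machine = rbm(i=4, j=2, rate=0.1, p=0.3)
    for epoch in range(200):
        for v in data:
            machine.apply_update(v)
    for sample in machine.get_samples(3, initial_visible=data[0]):
        print(sample.ravel())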