gru.py · 73 lines (58 loc) · 3.36 KB
#pylint: skip-file
import numpy as np
import theano
import theano.tensor as T
from utils_pg import *

class GRULayer(object):
    # A single GRU layer. `shape` is (in_size, out_size), `X` is the symbolic
    # input of shape (seq_len, batch_size * in_size), `mask` marks valid time
    # steps for variable-length sequences, and `p` is the dropout rate.
    def __init__(self, rng, layer_id, shape, X, mask, is_train = 1, batch_size = 1, p = 0.5):
        prefix = "GRU_"
        layer_id = "_" + layer_id
        self.in_size, self.out_size = shape

        # Reset gate parameters.
        self.W_xr = init_weights((self.in_size, self.out_size), prefix + "W_xr" + layer_id)
        self.W_hr = init_weights((self.out_size, self.out_size), prefix + "W_hr" + layer_id)
        self.b_r = init_bias(self.out_size, prefix + "b_r" + layer_id)
        # Update gate parameters.
        self.W_xz = init_weights((self.in_size, self.out_size), prefix + "W_xz" + layer_id)
        self.W_hz = init_weights((self.out_size, self.out_size), prefix + "W_hz" + layer_id)
        self.b_z = init_bias(self.out_size, prefix + "b_z" + layer_id)
        # Candidate hidden state parameters.
        self.W_xh = init_weights((self.in_size, self.out_size), prefix + "W_xh" + layer_id)
        self.W_hh = init_weights((self.out_size, self.out_size), prefix + "W_hh" + layer_id)
        self.b_h = init_bias(self.out_size, prefix + "b_h" + layer_id)

        self.X = X
        self.M = mask
        def _active_mask(x, m, pre_h):
            # One recurrence step over a batch: x is a flattened input row,
            # m is the mask for this time step, pre_h is the previous state.
            x = T.reshape(x, (batch_size, self.in_size))
            pre_h = T.reshape(pre_h, (batch_size, self.out_size))

            r = T.nnet.sigmoid(T.dot(x, self.W_xr) + T.dot(pre_h, self.W_hr) + self.b_r)  # reset gate
            z = T.nnet.sigmoid(T.dot(x, self.W_xz) + T.dot(pre_h, self.W_hz) + self.b_z)  # update gate
            gh = T.tanh(T.dot(x, self.W_xh) + T.dot(r * pre_h, self.W_hh) + self.b_h)     # candidate state
            h = (1 - z) * pre_h + z * gh

            # Where the mask is 0 (padding), carry the previous hidden state forward.
            h = h * m[:, None] + (1 - m[:, None]) * pre_h
            h = T.reshape(h, (1, batch_size * self.out_size))
            return h
        h, updates = theano.scan(_active_mask, sequences = [self.X, self.M],
                                 outputs_info = [T.alloc(floatX(0.), 1, batch_size * self.out_size)])
        # Stack the per-step outputs into a (seq_len, batch_size * out_size) matrix.
        h = T.reshape(h, (self.X.shape[0], batch_size * self.out_size))
        # Dropout: apply a binary mask in training, scale by the keep probability at test time.
        if p > 0:
            srng = T.shared_randomstreams.RandomStreams(rng.randint(999999))
            drop_mask = srng.binomial(n = 1, p = 1 - p, size = h.shape, dtype = theano.config.floatX)
            self.activation = T.switch(T.eq(is_train, 1), h * drop_mask, h * (1 - p))
        else:
            self.activation = h
        self.params = [self.W_xr, self.W_hr, self.b_r,
                       self.W_xz, self.W_hz, self.b_z,
                       self.W_xh, self.W_hh, self.b_h]

    def _active(self, x, pre_h):
        # Single-step GRU activation without batching or masking. Note that the
        # update-gate convention is flipped relative to _active_mask; both are
        # common GRU formulations.
        r = T.nnet.sigmoid(T.dot(x, self.W_xr) + T.dot(pre_h, self.W_hr) + self.b_r)
        z = T.nnet.sigmoid(T.dot(x, self.W_xz) + T.dot(pre_h, self.W_hz) + self.b_z)
        gh = T.tanh(T.dot(x, self.W_xh) + T.dot(r * pre_h, self.W_hh) + self.b_h)
        h = z * pre_h + (1 - z) * gh
        return h

class BdGRU(object):
    # Bidirectional GRU layer: one forward GRU over X and one backward GRU over
    # the time-reversed input; their outputs are concatenated.
    def __init__(self, rng, layer_id, shape, X, mask, is_train = 1, batch_size = 1, p = 0.5):
        fwd = GRULayer(rng, "_fwd_" + layer_id, shape, X, mask, is_train, batch_size, p)
        bwd = GRULayer(rng, "_bwd_" + layer_id, shape, X[::-1], mask[::-1], is_train, batch_size, p)
        self.params = fwd.params + bwd.params
        # Re-reverse the backward pass so both directions align in time,
        # then concatenate along the feature axis.
        self.activation = T.concatenate([fwd.activation, bwd.activation[::-1]], axis=1)
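
# ---------------------------------------------------------------------------
# Usage sketch (not part of the original file): builds a bidirectional GRU
# over random data to show the expected tensor shapes. It assumes Theano is
# installed and that utils_pg provides init_weights, init_bias and floatX as
# imported above; the sizes below are arbitrary illustrations.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    seq_len, batch_size, in_size, hidden_size = 10, 4, 8, 16

    # Symbolic inputs follow the layer's convention: the batch is flattened
    # into the second axis, so X has shape (seq_len, batch_size * in_size).
    X = T.matrix("X")
    mask = T.matrix("mask")           # (seq_len, batch_size): 1 = real step, 0 = padding
    is_train = T.iscalar("is_train")  # 1 = training (dropout on), 0 = test

    rng = np.random.RandomState(1234)
    layer = BdGRU(rng, "0", (in_size, hidden_size), X, mask,
                  is_train = is_train, batch_size = batch_size, p = 0.5)

    f = theano.function([X, mask, is_train], layer.activation,
                        allow_input_downcast = True)

    x_val = rng.randn(seq_len, batch_size * in_size).astype(theano.config.floatX)
    m_val = np.ones((seq_len, batch_size), dtype = theano.config.floatX)
    out = f(x_val, m_val, 0)  # test mode: activations are scaled by (1 - p)
    print(out.shape)          # expected: (seq_len, 2 * batch_size * hidden_size)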