-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathDCRNNModel.py
executable file
·99 lines (93 loc) · 4.86 KB
/
DCRNNModel.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
import random
import torch
import torch.nn as nn
from DCRNNCell import DCGRUCell
class DCRNN(nn.Module):
    """Stack of DCGRU cells unrolled over the time axis (used as the encoder).

    The first cell consumes ``input_dim`` features; every following cell
    consumes the ``hidden_dim`` features emitted by the cell below it.
    """

    def __init__(self, supports, num_node, input_dim, hidden_dim, order, num_layers=1):
        super(DCRNN, self).__init__()
        assert num_layers >= 1, 'At least one DCRNN layer in the Encoder.'
        self.num_layers = num_layers
        # Per-layer input widths: raw features first, hidden features afterwards.
        layer_in_dims = [input_dim] + [hidden_dim] * (num_layers - 1)
        self.dcrnn_cells = nn.ModuleList([
            DCGRUCell(supports, num_node, in_dim, hidden_dim, order)
            for in_dim in layer_in_dims
        ])

    def forward(self, x, init_state):
        """Run every layer over the full sequence.

        Args:
            x: input sequence, documented by the file as (B, T, N, D).
            init_state: per-layer initial states, indexable by layer,
                documented as (num_layers, B, N, hidden_dim).

        Returns:
            A pair ``(layer_output, final_states)``: the top layer's states
            stacked along dim 1, and a Python list holding the last state of
            each layer (bottom layer first).
        """
        steps = x.shape[1]
        layer_input = x
        final_states = []
        for layer_idx, cell in enumerate(self.dcrnn_cells):
            hidden = init_state[layer_idx]
            per_step = []
            for step in range(steps):
                hidden = cell(layer_input[:, step, :, :], hidden)
                per_step.append(hidden)
            final_states.append(hidden)
            # The stacked states of this layer feed the next layer.
            layer_input = torch.stack(per_step, dim=1)
        return layer_input, final_states

    def init_hidden(self, batch_size):
        """Stack each cell's ``init_hidden_state(batch_size)`` along a new dim 0."""
        per_layer = [cell.init_hidden_state(batch_size) for cell in self.dcrnn_cells]
        return torch.stack(per_layer, dim=0)  # (num_layers, B, N, hidden_dim)
class Decoder(nn.Module):
    """Autoregressive DCGRU decoder with scheduled teacher forcing.

    At every step the stacked cells update their states, the top state is
    projected back to ``input_dim`` features, and the next input is either
    that prediction or the ground-truth frame, chosen at random.
    """

    def __init__(self, supports, num_node, input_dim, hidden_dim, order, num_layers):
        super(Decoder, self).__init__()
        # Message names the Decoder (it previously said "Encoder", a copy-paste slip).
        assert num_layers >= 1, 'At least one DCRNN layer in the Decoder.'
        self.num_node = num_node
        self.num_layers = num_layers
        self.input_dim = input_dim
        self.hidden_dim = hidden_dim
        self.decoder_cells = nn.ModuleList()
        self.decoder_cells.append(DCGRUCell(supports, num_node, input_dim, hidden_dim, order))
        for _ in range(1, num_layers):
            self.decoder_cells.append(DCGRUCell(supports, num_node, hidden_dim, hidden_dim, order))
        # Maps the top layer's hidden features back to the decoder's feature width.
        self.projection = nn.Linear(hidden_dim, input_dim)

    def forward(self, inputs, init_state, teacher_forcing_ratio=0.5):
        """Decode ``inputs.shape[1] - 1`` steps.

        Args:
            inputs: documented as (B, T, N, D); frame 0 is the first decoder
                input and frames 1.. are teacher-forcing candidates. Only the
                first ``input_dim`` features of each frame are used.
            init_state: per-layer initial states, indexable by layer,
                documented as (num_layers, B, N, hidden_dim).
            teacher_forcing_ratio: probability of feeding the ground-truth
                frame as the next input; 1 forces every step, 0 never does
                (the file notes 0 is used for validation and test).

        Returns:
            Predictions stacked along dim 1: (B, T - 1, N, input_dim).
        """
        seq_length = inputs.shape[1]
        layer_states = init_state  # keep the caller's argument unrebound
        outputs = []
        current_input = inputs[:, 0, :, :self.input_dim]
        for t in range(seq_length - 1):
            new_states = []
            for i, cell in enumerate(self.decoder_cells):
                state = cell(current_input, layer_states[i])
                current_input = state  # each layer feeds the one above it
                new_states.append(state)
            layer_states = torch.stack(new_states, dim=0)
            # Flatten to (B*N, hidden_dim), project, then restore (B, N, input_dim).
            flat = current_input.reshape(-1, self.hidden_dim)
            prediction = self.projection(flat).reshape(-1, self.num_node, self.input_dim)
            outputs.append(prediction)
            # One random draw per step decides teacher forcing for the whole batch.
            teacher_force = random.random() < teacher_forcing_ratio
            current_input = inputs[:, t + 1, :, :self.input_dim] if teacher_force else prediction
        return torch.stack(outputs, dim=1)  # B, T, N, dim_in
class DCRNNModel(nn.Module):
    """Sequence-to-sequence model: a DCRNN encoder feeding a DCGRU Decoder.

    ``seq_len`` and ``horizon`` are accepted for interface compatibility but
    are not used by this class; the sequence lengths come from the tensors
    passed to ``forward``.
    """

    def __init__(self, input_dim, out_dim, seq_len, horizon, num_node, hidden_dim, num_layers, supports, order):
        super(DCRNNModel, self).__init__()
        self.num_node = num_node
        self.input_dim = input_dim
        self.output_dim = out_dim
        self.encoder = DCRNN(supports, num_node, input_dim, hidden_dim, order, num_layers)
        self.decoder = Decoder(supports, num_node, out_dim, hidden_dim, order, num_layers)

    def forward(self, source, targets, teaching_force=0.5):
        """Encode ``source`` and decode one prediction per target step.

        Args:
            source: encoder input, documented as (B, T_1, N, D).
            targets: ground-truth sequence, documented as (B, T_2, N, D).
                It may carry more trailing features than ``out_dim``; the
                decoder only reads the first ``out_dim`` of them.
            teaching_force: teacher-forcing probability passed to the decoder.

        Returns:
            Whatever the decoder returns for the GO-prefixed targets
            (documented as B, T, N, D).
        """
        init_state = self.encoder.init_hidden(source.shape[0])
        _, encoder_hidden_state = self.encoder(source, init_state)
        # All-zero GO frame: matches the targets' feature width so the
        # concatenation works even when targets carry auxiliary features, and
        # is allocated directly on the targets' device.
        go_symbol = torch.zeros(
            targets.shape[0], 1, self.num_node, targets.shape[-1],
            device=targets.device,
        )
        decoder_inputs = torch.cat([go_symbol, targets], dim=1)
        return self.decoder(decoder_inputs, encoder_hidden_state, teaching_force)