-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcore.py
96 lines (75 loc) · 3.54 KB
/
core.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
import numpy as np
import scipy.signal
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.distributions.normal import Normal
def combined_shape(length, shape=None):
if shape is None:
return (length,)
return (length, shape) if np.isscalar(shape) else (length, *shape)
def mlp(sizes, activation, output_activation=nn.Identity):
layers = []
for j in range(len(sizes)-1):
act = activation if j < len(sizes)-2 else output_activation
layers += [nn.Linear(sizes[j], sizes[j+1]), act()]
return nn.Sequential(*layers)
def count_vars(module):
return sum([np.prod(p.shape) for p in module.parameters()])
LOG_STD_MAX = 2
LOG_STD_MIN = -20
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
# device = "cpu"
class SquashedGaussianMLPActor(nn.Module):
def __init__(self, obs_dim, act_dim, hidden_sizes, activation, act_limit):
super().__init__()
self.net = mlp([obs_dim] + list(hidden_sizes), activation, activation)
self.mu_layer = nn.Linear(hidden_sizes[-1], act_dim)
self.log_std_layer = nn.Linear(hidden_sizes[-1], act_dim)
self.act_limit = act_limit
def forward(self, obs, deterministic=False, with_logprob=True):
net_out = self.net(obs)
mu = self.mu_layer(net_out)
log_std = self.log_std_layer(net_out)
log_std = torch.clamp(log_std, LOG_STD_MIN, LOG_STD_MAX)
std = torch.exp(log_std)
# Pre-squash distribution and sample
pi_distribution = Normal(mu, std)
if deterministic:
# Only used for evaluating policy at test time.
pi_action = mu
else:
pi_action = pi_distribution.rsample()
if with_logprob:
# Compute logprob from Gaussian, and then apply correction for Tanh squashing.
# NOTE: The correction formula is a little bit magic. To get an understanding
# of where it comes from, check out the original SAC paper (arXiv 1801.01290)
# and look in appendix C. This is a more numerically-stable equivalent to Eq 21.
# Try deriving it yourself as a (very difficult) exercise. :)
logp_pi = pi_distribution.log_prob(pi_action).sum(axis=-1)
logp_pi -= (2*(np.log(2) - pi_action - F.softplus(-2*pi_action))).sum(axis=1)
else:
logp_pi = None
pi_action = torch.tanh(pi_action)
# pi_action = torch.sigmoid(pi_action)
pi_action = self.act_limit * (pi_action+1)/2
return pi_action, logp_pi
class MLPQFunction(nn.Module):
def __init__(self, obs_dim, act_dim, hidden_sizes, activation):
super().__init__()
self.q = mlp([obs_dim + act_dim] + list(hidden_sizes) + [1], activation)
def forward(self, obs, act):
q = self.q(torch.cat([obs, act], dim=-1))
return torch.squeeze(q, -1) # Critical to ensure q has right shape.
class MLPActorCritic(nn.Module):
def __init__(self, obs_dim, act_dim, act_limit, hidden_sizes=(256,256),
activation=nn.ReLU, agent_num = 3):
super().__init__()
# build policy and value functions
self.pi = SquashedGaussianMLPActor(obs_dim, act_dim, hidden_sizes, activation, act_limit)
self.q1 = MLPQFunction(obs_dim*agent_num, act_dim*agent_num, hidden_sizes, activation)
self.q2 = MLPQFunction(obs_dim*agent_num, act_dim*agent_num, hidden_sizes, activation)
def act(self, obs, deterministic=False):
with torch.no_grad():
a, _ = self.pi(obs, deterministic, False)
return a