forked from mttga/pymarl_transformers
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpyplot_animator.py
175 lines (130 loc) · 6.05 KB
/
pyplot_animator.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
import numpy as np
# for plot
import logging
logging.getLogger('matplotlib').setLevel(logging.CRITICAL)
from matplotlib import pyplot as plt
from matplotlib import cm
import matplotlib.animation as animation
from matplotlib.ticker import MaxNLocator
import contextlib
"""
TODO
Adapt to mpe (currently is the uat animator)
"""
class MPEAnimator(animation.TimedAnimation):
    """Animate an MPE (multi-agent particle environment) episode.

    Renders a two-panel figure: the left axes shows agent heads, agent
    trajectories and landmark positions per timestep; the right axes shows
    the per-timestep episode reward curve. `save_animation` writes the
    result as a GIF plus a final-state PNG.

    NOTE(review): per the module TODO this is still partly the UAT animator —
    `_init_error_animation` reads `self.episode_errors` and
    `self.prediction_colors`, which are never set in this class, so calling
    it as-is raises AttributeError. The `mask_agents` parameter is accepted
    but never used.
    """
    def __init__(self,
                 agent_positions,
                 landmark_positions,
                 episode_rewards,
                 mask_agents=False):
        """Store episode data, build the figure/axes, and start the animation.

        Args:
            agent_positions: array of shape (n_agents, frames, 2) holding x/y
                coordinates per agent per timestep (the `[:,:,0]` / `[:,:,1]`
                indexing below requires exactly this layout).
            landmark_positions: array of shape (n_landmarks, frames, 2).
            episode_rewards: 1-D array of per-timestep rewards; assumed to
                have length `frames` — TODO confirm against caller.
            mask_agents: unused — presumably a leftover option from the UAT
                animator; verify before removing.
        """
        # general parameters
        self.frames = (agent_positions.shape[1])  # number of timesteps
        self.n_agents = len(agent_positions)
        self.n_landmarks = len(landmark_positions)
        # trajectory tail length: the whole episode (no fading window)
        self.lags = self.frames
        self.agent_positions = agent_positions
        self.landmark_positions = landmark_positions
        self.episode_rewards = episode_rewards
        # create the subplots
        self.fig = plt.figure(figsize=(20, 10), dpi=120)
        self.ax_episode = self.fig.add_subplot(1, 2, 1)
        self.ax_reward = self.fig.add_subplot(1, 2, 2)
        self.ax_episode.set_title('Episode')
        self.ax_reward.set_title('Reward')
        # colors
        self.agent_colors = cm.Dark2.colors
        self.landmark_colors = [cm.summer(l*10) for l in range(self.n_landmarks)] # pastel greens
        # init the lines
        self.lines_episode = self._init_episode_animation(self.ax_episode)
        self.lines_reward = self._init_reward_animation(self.ax_reward)
        # Must run last: TimedAnimation.__init__ starts the draw cycle
        # (_init_draw/_draw_frame), which needs the line artists above.
        animation.TimedAnimation.__init__(self, self.fig, interval=50, blit=True)

    def save_animation(self, savepath='episode'):
        """Write `<savepath>.gif` (full animation) and `<savepath>.png`.

        stdout is silenced because `Animation.save` prints progress output.
        """
        with contextlib.redirect_stdout(None):
            self.save(savepath+'.gif')
            self.fig.savefig(savepath+'.png')

    def _episode_update(self, data, line, frame, lags, name=None):
        """Put the last `lags` points of `data` (shape (frames, 2)) up to
        `frame` onto `line`; optionally (re)set the line's legend label."""
        line.set_data(data[max(0,frame-lags):frame, 0], data[max(0,frame-lags):frame, 1])
        if name is not None:
            line.set_label(name)

    def _frameline_update(self, data, line, frame, name=None):
        """Put the first `frame` values of the 1-D series `data` onto `line`
        against timesteps 1..frame; optionally (re)set its legend label."""
        line.set_data(np.arange(1,frame+1), data[:frame])
        if name is not None:
            line.set_label(name)

    def _draw_frame(self, frame):
        """TimedAnimation hook: refresh every artist for timestep `frame`."""
        # Update the episode subplot
        line_episode = 0  # running index into self.lines_episode
        # update agents heads (lags=1: only the current position is shown)
        for n in range(self.n_agents):
            self._episode_update(self.agent_positions[n], self.lines_episode[line_episode], frame, 1, f'Agent_{n+1}')
            line_episode += 1
        # update agents trajectories (tail up to the previous frame)
        for n in range(self.n_agents):
            self._episode_update(self.agent_positions[n], self.lines_episode[line_episode], max(0,frame-1), self.lags)
            line_episode += 1
        # landmark real positions
        for n in range(self.n_landmarks):
            self._episode_update(self.landmark_positions[n], self.lines_episode[line_episode], frame, self.lags, f'Landmark_{n+1}_real')
            line_episode += 1
        self.ax_episode.legend()
        # Update the reward subplot
        self._frameline_update(self.episode_rewards, self.lines_reward[0], frame)
        # hand the updated artists to the blitting machinery
        self._drawn_artists = self.lines_episode + self.lines_reward

    def _init_episode_animation(self, ax):
        """Configure the episode axes and create its (initially empty) lines.

        Returns the artists in exactly the order `_draw_frame` consumes them:
        agent heads, then agent trajectories, then landmarks.
        """
        # retrieve the episode dimensions
        x_max = max(self.agent_positions[:,:,0].max(),
                    self.landmark_positions[:,:,0].max())
        x_min = min(self.agent_positions[:,:,0].min(),
                    self.landmark_positions[:,:,0].min())
        y_max = max(self.agent_positions[:,:,1].max(),
                    self.landmark_positions[:,:,1].max())
        y_min = min(self.agent_positions[:,:,1].min(),
                    self.landmark_positions[:,:,1].min())
        # square limits (same range on both axes) padded by 1 unit
        abs_min = min(x_min, y_min)
        abs_max = max(x_max, y_max)
        ax.set_xlim(abs_min-1, abs_max+1)
        ax.set_ylim(abs_min-1,abs_max+1)
        ax.set_ylabel('Y Position')
        # remove frame
        ax.spines['top'].set_visible(False)
        ax.spines['right'].set_visible(False)
        ax.spines['bottom'].set_visible(False)
        ax.spines['left'].set_visible(False)
        # lines:
        # 1. agent head
        # 2. agent trajectory
        # 3. landmark real
        lines = [ax.plot([],[],'o',color=self.agent_colors[a], alpha=0.8,markersize=8)[0] for a in range(self.n_agents)] + \
                [ax.plot([],[],'o',color=self.agent_colors[a], alpha=0.2,markersize=4)[0] for a in range(self.n_agents)] + \
                [ax.plot([],[],'s',color=self.landmark_colors[l], alpha=0.8,markersize=8)[0] for l in range(self.n_landmarks)]
        return lines

    def _init_reward_animation(self, ax):
        """Configure the reward axes and create its single (empty) line."""
        ax.set_xlim(0, self.frames)
        ax.set_ylim(self.episode_rewards.min(), self.episode_rewards.max()+1)
        ax.set_xlabel('Timestep')
        ax.set_ylabel('Reward')
        ax.yaxis.set_major_locator(MaxNLocator(integer=True)) # force integer ticks
        # remove frame
        ax.spines['top'].set_visible(False)
        ax.spines['right'].set_visible(False)
        ax.spines['bottom'].set_visible(False)
        ax.spines['left'].set_visible(False)
        lines = [ax.plot([],[], color='green')[0]]
        return lines

    def _init_error_animation(self, ax):
        """Configure a prediction-error axes (legacy from the UAT animator).

        NOTE(review): depends on `self.episode_errors` and
        `self.prediction_colors`, neither of which is set anywhere in this
        class — calling this as-is raises AttributeError. Apparently kept for
        the pending MPE adaptation; confirm before wiring it up.
        """
        ax.set_xlim(0, self.frames)
        ax.set_ylim(self.episode_errors.min(), self.episode_errors.max())
        ax.set_xlabel('Timestep')
        ax.set_ylabel('Prediction error')
        # remove frame
        ax.spines['top'].set_visible(False)
        ax.spines['right'].set_visible(False)
        ax.spines['bottom'].set_visible(False)
        ax.spines['left'].set_visible(False)
        lines = [ax.plot([],[], color=self.prediction_colors[l])[0] for l in range(self.n_landmarks)]
        return lines

    def new_frame_seq(self):
        """TimedAnimation hook: yield the frame indices 0..frames-1."""
        return iter(range(self.frames))

    def _init_draw(self):
        """TimedAnimation hook: blank out all lines before the first frame."""
        lines = self.lines_episode + self.lines_reward
        for l in lines:
            l.set_data([], [])