main.py
#! /usr/bin/env python
__author__ = 'Ning Shi'
__email__ = '[email protected]'
# import dependencies
import gym # OpenAI game environment
import gym.envs.toy_text # customized map
import numpy as np
from tqdm import trange # progress bar
import matplotlib.pyplot as plt
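# NOTE: this script assumes the classic gym API, where env.reset() returns the
# initial state and env.step() returns a 4-tuple (state, reward, done, info);
# gym >= 0.26 and gymnasium changed both signatures, so an older gym release is assumed.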
class FrozenLake(object):
    """FrozenLake environment wrapper with a random agent and a SARSA agent."""
    def __init__(self, amap='SFFFHFFFG'):
        super(FrozenLake, self).__init__()
        print('Initialize environment...')
        self.env = self.initialize_env(amap)
        self.n_states, self.n_actions = self.env.observation_space.n, self.env.action_space.n
        self.RESULT_IMG_PATH = 'img/result_img_{}.png'
    def initialize_env(self, amap):
        # np.int was removed from recent NumPy releases; use the builtin int instead
        grid_shape = int(np.sqrt(len(amap)))
        custom_map = np.array(list(amap)).reshape(grid_shape, grid_shape)
        env = gym.envs.toy_text.frozen_lake.FrozenLakeEnv(desc=custom_map).unwrapped
        env.render()
        return env
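    # Map legend (standard FrozenLake encoding): S = start, F = frozen (safe),
    # H = hole (episode ends, reward 0), G = goal (episode ends, reward 1).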
    def random_agent(self, n_episodes):
        # performance of an agent taking random actions
        t = trange(n_episodes)
        # to record reward for each episode
        reward_array = np.zeros(n_episodes)
        # for each episode
        for i in t:
            # reset environment
            self.env.reset()
            # done flag
            done = False
            while not done:
                # randomly pick an action
                action = np.random.randint(self.n_actions)
                # get feedback from the environment
                _, reward, done, _ = self.env.step(action)
                if done:
                    # update progress bar
                    t.set_description('Episode {} Reward {}'.format(i + 1, reward))
                    t.refresh()
                    reward_array[i] = reward
                    break
        self.env.close()
        # show average reward
        avg_reward = round(np.mean(reward_array), 4)
        print('Average reward per episode: {}'.format(avg_reward))
        # generate output image
        title = 'Random Strategy\nReward Per Episode for {} Episodes - Average: {:.2f}'.format(n_episodes, avg_reward)
        self.gen_img(reward_array, title, 0)
    # initialize the agent's Q-table to zeros
    def init_q(self, s, a):
        """
        s: number of states
        a: number of actions
        """
        return np.zeros((s, a))
    # epsilon-greedy exploration strategy
    def epsilon_greedy(self, Q, epsilon, s):
        """
        Q: Q table
        epsilon: exploration parameter
        s: state
        """
        # select a random action with probability epsilon
        if np.random.random() <= epsilon:
            return np.random.randint(self.n_actions)
        else:
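            # otherwise exploit: take the greedy action for state s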
            return np.argmax(Q[s, :])
    # SARSA process
    def sarsa_agent(self, alpha, gamma, epsilon, n_episodes):
        """
        alpha: learning rate
        gamma: discount factor
        epsilon: exploration parameter
        n_episodes: number of episodes
        """
        # initialize Q table
        Q = self.init_q(self.n_states, self.n_actions)
        # initialize progress bar
        t = trange(n_episodes)
        # to record reward for each episode
        reward_array = np.zeros(n_episodes)
        for i in t:
            # initial state
            s = self.env.reset()
            # initial action
            a = self.epsilon_greedy(Q, epsilon, s)
            done = False
            while not done:
                s_, reward, done, _ = self.env.step(a)
                a_ = self.epsilon_greedy(Q, epsilon, s_)
                # update Q table
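                # SARSA TD update: Q(s, a) <- Q(s, a) + alpha * [r + gamma * Q(s', a') - Q(s, a)]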
                Q[s, a] += alpha * (reward + (gamma * Q[s_, a_]) - Q[s, a])
                # update progress bar
                if done:
                    t.set_description('Episode {} Reward {}'.format(i + 1, reward))
                    t.refresh()
                    reward_array[i] = reward
                    break
                s, a = s_, a_
        self.env.close()
        # show Q table
        print('Trained Q Table:')
        print(Q)
        # show average reward
        avg_reward = round(np.mean(reward_array), 4)
        print('Training average reward per episode: {}'.format(avg_reward))
        return Q
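    # evaluate the greedy policy induced by a trained Q table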
    def eva(self, Q, n_episodes):
        """
        Q: trained Q table
        n_episodes: number of episodes
        """
        t = trange(n_episodes)
        # to record reward for each episode
        reward_array = np.zeros(n_episodes)
        # for each episode
        for i in t:
            # initial state
            s = self.env.reset()
            # initial action
            a = np.argmax(Q[s])
            done = False
            while not done:
                s_, reward, done, _ = self.env.step(a)
                # pick an action according to the state and the trained Q table
                a_ = np.argmax(Q[s_])
                if done:
                    t.set_description('Episode {} Reward {}'.format(i + 1, reward))
                    t.refresh()
                    reward_array[i] = reward
                    break
                s, a = s_, a_
        self.env.close()
        # show average reward
        avg_reward = round(np.mean(reward_array), 4)
        print('Evaluation average reward per episode: {}'.format(avg_reward))
        # generate output image
        title = 'SARSA Agent\nReward Per Episode for {} Episodes - Average: {:.2f}'.format(n_episodes, avg_reward)
        self.gen_img(reward_array, title, 1)
        return reward_array
    def gen_img(self, reward_array, title, idx):
        # show reward per episode
        plt.subplots(figsize=(6, 6), dpi=100)
        plt.plot(reward_array, color='black', linewidth=0.5)
        plt.ylabel('Reward', fontsize=12)
        plt.xlabel('Episode', fontsize=12)
        plt.xticks(fontsize=12)
        plt.yticks(fontsize=12)
        plt.title(title, fontsize=12)
        plt.savefig(self.RESULT_IMG_PATH.format(idx), dpi=100, bbox_inches='tight')
        print('Saving output to ' + self.RESULT_IMG_PATH.format(idx))
def main():
    fl = FrozenLake()
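    # a larger map could be passed instead, e.g. the standard gym 4x4 layout:
    # fl = FrozenLake(amap='SFFFFHFHFFFHHFFG')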
    print('\nAn agent taking random actions:')
    fl.random_agent(100)
    print('\nSARSA agent:')
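    # hyperparameters: alpha (learning rate) = 0.1, gamma (discount) = 0.9,
    # epsilon (exploration) = 0.5, trained for 1000 episodes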
    Q = fl.sarsa_agent(0.1, 0.9, 0.5, 1000)
    reward_array = fl.eva(Q, 100)
    print('Done.')
if __name__ == '__main__':
    main()