# Import routines
import numpy as np
import random
from itertools import permutations

# Defining hyperparameters
m = 5   # number of cities, ranges from 0 ..... m-1
t = 24  # number of hours, ranges from 0 .... t-1
d = 7   # number of days, ranges from 0 ... d-1
C = 5   # per-hour fuel and other costs
R = 9   # per-hour revenue from a passenger
class CabDriver():

    def __init__(self):
        """Initialise the state and define the action space and state space."""
        # (0, 0) is the "refuse all requests" action; the rest are all
        # ordered (pickup, drop) pairs over the m locations.
        self.action_space = [(0, 0)] + list(permutations(range(m), 2))
        self.state_space = [[x, y, z]
                            for x in range(m) for y in range(t) for z in range(d)]
        self.state_init = random.choice(self.state_space)

        # Start the first round
        self.reset()
    ## Encoding state (or state-action) for NN input
    def state_encod_arch1(self, state):
        """Convert the state into a one-hot vector that can be fed to the NN.
        The vector is of size m + t + d."""
        state_encod = [0 for _ in range(m + t + d)]
        state_encod[self.state_get_loc(state)] = 1
        state_encod[m + self.state_get_time(state)] = 1
        state_encod[m + t + self.state_get_day(state)] = 1
        return state_encod
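
    # Worked example: with m=5, t=24, d=7, state [1, 5, 3] encodes to a
    # vector of length 36 with ones at indices 1 (location),
    # 5 + 5 = 10 (hour of day) and 5 + 24 + 3 = 32 (day of week).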
    # Use this function if you are using architecture-2
    def state_encod_arch2(self, state, action):
        """Convert the (state, action) pair into a one-hot vector that can be
        fed to the NN. The vector is of size m + t + d + m + m."""
        state_encod = [0 for _ in range(m + t + d + m + m)]
        state_encod[self.state_get_loc(state)] = 1
        state_encod[m + self.state_get_time(state)] = 1
        state_encod[m + t + self.state_get_day(state)] = 1
        # (0, 0) is the refuse-all action and carries no pickup/drop encoding;
        # location 0 is a valid pickup or drop for every other action.
        if action != (0, 0):
            state_encod[m + t + d + self.action_get_pickup(action)] = 1
            state_encod[m + t + d + m + self.action_get_drop(action)] = 1
        return state_encod
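
    # Worked example: the arch-2 vector has length m+t+d+m+m = 46; for the
    # state above and action (2, 3), the pickup sets index 36 + 2 = 38 and
    # the drop sets index 36 + 5 + 3 = 44.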
    ## Getting number of requests
    def requests(self, state):
        """Determine the number of requests based on the location,
        using the Poisson means specified in the MDP."""
        location = state[0]
        # Average number of requests at each of the m locations
        poisson_means = [2, 12, 4, 7, 8]
        requests = np.random.poisson(poisson_means[location])

        if requests > 15:
            requests = 15

        # (0, 0) is not a customer request, but the driver is free to refuse
        # all customer requests. Hence, add the index of action (0, 0).
        possible_actions_index = random.sample(range(1, (m-1)*m + 1), requests) + [0]
        actions = [self.action_space[i] for i in possible_actions_index]

        return possible_actions_index, actions
    def update_time_day(self, time, day, ride_duration):
        """
        Takes the current time and day plus the duration of the driver's
        journey, and returns the time and day once that journey is complete.
        """
        ride_duration = int(ride_duration)

        if (time + ride_duration) < 24:
            # Same day; only the hour advances
            time = time + ride_duration
        else:
            # The journey spills over into subsequent days.
            # Get the number of days passed (before wrapping the time)
            num_days = (time + ride_duration) // 24
            # Convert the time to the 0-23 range
            time = (time + ride_duration) % 24
            # Convert the day to the 0-6 range
            day = (day + num_days) % 7

        return time, day
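
    # Worked example: time=23, day=6 and ride_duration=3 gives
    # num_days = (23 + 3) // 24 = 1 and time = (23 + 3) % 24 = 2, so the
    # journey ends at hour 2 on day (6 + 1) % 7 = 0.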
    def next_state_func(self, state, action, Time_matrix):
        """Takes state, action and Time_matrix as input and returns the next
        state along with the wait, transit and ride durations."""
        next_state = []

        # Initialize various times
        total_time = 0
        transit_time = 0  # to go from the current location to the pickup location
        wait_time = 0     # in case the driver chooses to refuse all requests
        ride_time = 0     # from pickup to drop

        # Derive the current location, time, day and request locations
        curr_loc = self.state_get_loc(state)
        pickup_loc = self.action_get_pickup(action)
        drop_loc = self.action_get_drop(action)
        curr_time = self.state_get_time(state)
        curr_day = self.state_get_day(state)

        """
        3 scenarios:
        a) The driver refuses all requests.
        b) The driver is already at the pickup point.
        c) The driver is not at the pickup point.
        """
        if (pickup_loc == 0) and (drop_loc == 0):
            # Refuse all requests: wait time is 1 unit, next location is the
            # current location
            wait_time = 1
            next_loc = curr_loc
        elif curr_loc == pickup_loc:
            # The driver is already at the pickup point, so wait and transit
            # times are both 0
            ride_time = Time_matrix[curr_loc][drop_loc][curr_time][curr_day]
            # Next location is the drop location
            next_loc = drop_loc
        else:
            # The driver is not at the pickup point and needs to travel there
            # first. Time taken to reach the pickup point:
            transit_time = Time_matrix[curr_loc][pickup_loc][curr_time][curr_day]
            new_time, new_day = self.update_time_day(curr_time, curr_day, transit_time)

            # The driver is now at the pickup point.
            # Time taken to drop the passenger:
            ride_time = Time_matrix[pickup_loc][drop_loc][new_time][new_day]
            next_loc = drop_loc

        # Calculate total time as the sum of all durations
        total_time = wait_time + transit_time + ride_time
        next_time, next_day = self.update_time_day(curr_time, curr_day, total_time)

        # Construct next_state from next_loc and the new time state
        next_state = [next_loc, next_time, next_day]

        return next_state, wait_time, transit_time, ride_time
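
    # Worked example (scenario c): from state [0, 10, 2], action (1, 3) first
    # incurs transit_time = Time_matrix[0][1][10][2] to reach the pickup, then
    # ride_time is looked up at the post-transit time/day, and the next state
    # is location 3 at the correspondingly advanced time and day.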
    def reward_func(self, wait_time, transit_time, ride_time):
        """Takes in the wait, transit and ride durations and returns the reward."""
        # Transit and wait time yield no revenue, only fuel costs, so they
        # count as idle time.
        passenger_time = ride_time
        idle_time = wait_time + transit_time

        reward = (R * passenger_time) - (C * (passenger_time + idle_time))
        return reward
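
    # Worked example: ride_time=4, transit_time=1, wait_time=0 gives
    # reward = 9*4 - 5*(4 + 1) = 11.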
    def step(self, state, action, Time_matrix):
        """
        Takes one trip as the cab driver and returns the reward, the next
        state and the total time spent.
        """
        # Get the next state and the various time durations
        next_state, wait_time, transit_time, ride_time = self.next_state_func(
            state, action, Time_matrix)

        # Calculate the reward based on the different time durations
        rewards = self.reward_func(wait_time, transit_time, ride_time)
        total_time = wait_time + transit_time + ride_time

        return rewards, next_state, total_time
    def state_get_loc(self, state):
        return state[0]

    def state_get_time(self, state):
        return state[1]

    def state_get_day(self, state):
        return state[2]

    def action_get_pickup(self, action):
        return action[0]

    def action_get_drop(self, action):
        return action[1]

    def state_set_loc(self, state, loc):
        state[0] = loc

    def state_set_time(self, state, time):
        state[1] = time

    def state_set_day(self, state, day):
        state[2] = day

    def action_set_pickup(self, action, pickup):
        action[0] = pickup

    def action_set_drop(self, action, drop):
        action[1] = drop
    def reset(self):
        """Return the action space, the state space and the initial state."""
        return self.action_space, self.state_space, self.state_init
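
# A minimal usage sketch, assuming a Time_matrix of shape (m, m, t, d) that
# holds travel times in hours; the random matrix below is only a stand-in for
# the real travel-time matrix used with this environment.
if __name__ == "__main__":
    env = CabDriver()
    Time_matrix = np.random.randint(1, 5, size=(m, m, t, d))

    state = env.state_init
    possible_action_indices, actions = env.requests(state)
    action = random.choice(actions)

    reward, next_state, total_time = env.step(state, action, Time_matrix)
    print("state:", state, "action:", action)
    print("reward:", reward, "next state:", next_state, "total time:", total_time)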