#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Tue Mar 31 21:14:11 2020
@author: jingci
"""
from new_small_maze import Maze
from RL_brain_Robo1 import QLearningTable1
from RL_brain_Robo2 import QLearningTable2
from RL_brain_Robo3 import QLearningTable3
from returnBrain1 import ReturnQLearningTable1
from returnBrain2 import ReturnQLearningTable2
from returnBrain3 import ReturnQLearningTable3
import matplotlib.pyplot as plt
import pickle
'''
Add the possible human walking path here
UP: 0
DOWN: 1
RIGHT: 2
LEFT: 3
WAIT: 4
'''
HUMANWALK1 = [4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4]
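# Illustrative only (not part of the original script): with the encoding above,
# a human who walks five cells to the right, waits twice, then walks back left
# would be scripted as
# HUMANWALK1 = [2, 2, 2, 2, 2, 4, 4, 3, 3, 3, 3, 3]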
UNIT = 20
STATE0 = "RAW"
STATE1 = "MAP"
STATE2 = "PATH"
'''
Define the current state here
Different states correspond to different pre-trained Q-tables
'''
CURRENTSTATE = STATE2
'''
True to record this trial's Q-tables for future training
False to just test
'''
RECORDDATA = False
'''
Before running the code, please modify this file path as needed
'''
FILEPATH = "/Users/jingci/Desktop/RL/warehouseTest/WarehouseRobotPathPlanning-master/"
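# A more portable alternative (sketch, not the original author's setup): derive
# the directory from this file's own location instead of hard-coding it.
# import os
# FILEPATH = os.path.join(os.path.dirname(os.path.abspath(__file__)), '')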
def update():
    totalReward1 = 0
    totalReward2 = 0
    totalReward3 = 0
    rewardList1 = []
    rewardList2 = []
    rewardList3 = []
    totalRewardList = []
    # whether each agent is frozen (no longer allowed to move this episode)
    freeze1 = False
    freeze2 = False
    freeze3 = False
    # whether each agent has switched to its backup Q-table after being blocked
    backup1 = False
    backup2 = False
    backup3 = False
    for episode in range(300):
        # initial observations
        observation1, observation2, observation3 = env.resetRobot()
        #nearbyEnvironment(observation1)
        human1, human2 = env.resetHuman()
        humanWalkHelper = 0
        wait_time1 = 0
        wait_time2 = 0
        wait_time3 = 0
        while True:
            # refresh the environment
            env.render()
            # simulate one step of human walking
            human1 = humanWalk(humanWalkHelper)
            humanWalkHelper += 1
            # Robot 1: RL chooses an action based on the current observation
            if freeze1:
                action1 = 4
            else:
                if not backup1:
                    action1 = chooseAction(episode, RL1, observation1)
                    if stateChecking(human1, observation1, action1) == 'no_collision':
                        observation1_, reward1, done1 = env.step1(action1)
                    else:
                        if wait_time1 > 5:
                            # waited too long behind the human: step away in the
                            # opposite direction, then switch to the backup table
                            if stateChecking(human1, observation1, action1) == 'upward_collision':
                                observation1_, reward1, done1 = env.step1(1)
                            elif stateChecking(human1, observation1, action1) == 'downward_collision':
                                observation1_, reward1, done1 = env.step1(0)
                            elif stateChecking(human1, observation1, action1) == 'left_collision':
                                observation1_, reward1, done1 = env.step1(2)
                            elif stateChecking(human1, observation1, action1) == 'right_collision':
                                observation1_, reward1, done1 = env.step1(3)
                            backup1 = True
                            wait_time1 = 0
                        else:
                            # wait in place for the human to pass
                            observation1_, reward1, done1 = env.step1(4)
                            wait_time1 += 1
                    learn(episode, RL1, action1, reward1, observation1, observation1_)
                else:
                    action1 = chooseAction(episode, backupRL1, observation1)
                    observation1_, reward1, done1 = env.step1(action1, human1)
                    learn(episode, backupRL1, action1, reward1, observation1, observation1_)
                totalReward1 += reward1
                # swap observation
                observation1 = observation1_
            # Robot 2: same scheme as Robot 1
            if freeze2:
                action2 = 4
            else:
                if not backup2:
                    action2 = chooseAction(episode, RL2, observation2)
                    if stateChecking(human1, observation2, action2) == 'no_collision':
                        observation2_, reward2, done2 = env.step2(action2)
                    else:
                        if wait_time2 > 5:
                            if stateChecking(human1, observation2, action2) == 'upward_collision':
                                observation2_, reward2, done2 = env.step2(1)
                            elif stateChecking(human1, observation2, action2) == 'downward_collision':
                                observation2_, reward2, done2 = env.step2(0)
                            elif stateChecking(human1, observation2, action2) == 'left_collision':
                                observation2_, reward2, done2 = env.step2(2)
                            elif stateChecking(human1, observation2, action2) == 'right_collision':
                                observation2_, reward2, done2 = env.step2(3)
                            backup2 = True
                            wait_time2 = 0
                        else:
                            observation2_, reward2, done2 = env.step2(4)
                            wait_time2 += 1
                    learn(episode, RL2, action2, reward2, observation2, observation2_)
                else:
                    action2 = chooseAction(episode, backupRL2, observation2)
                    observation2_, reward2, done2 = env.step2(action2, human1)
                    learn(episode, backupRL2, action2, reward2, observation2, observation2_)
                totalReward2 += reward2
                # swap observation
                observation2 = observation2_
            # Robot 3: avoids the other two robots rather than the human
            if freeze3:
                action3 = 4
            else:
                action3 = chooseAction(episode, RL3, observation3)
                collisionType = ['upward_collision', 'downward_collision', 'left_collision', 'right_collision']
                # RL takes the action and gets the next observation and reward
                if (stateChecking(observation1, observation3, action3) == 'no_collision'
                        and stateChecking(observation2, observation3, action3) == 'no_collision'):
                    observation3_, reward3, done3 = env.step3(action3)
                    wait_time3 = 0
                elif (stateChecking(observation1, observation3, action3) in collisionType
                        or stateChecking(observation2, observation3, action3) in collisionType):
                    # wait in place while either robot blocks the chosen move
                    observation3_, reward3, done3 = env.step3(4)
                    wait_time3 += 1
                else:
                    observation3_, reward3, done3 = env.step3(action3)
                    wait_time3 = 0
                totalReward3 += reward3
                # RL learns from this transition
                learn(episode, RL3, action3, reward3, observation3, observation3_)
                # swap observation
                observation3 = observation3_
            # break the while loop at the end of this episode
            if ((done1 == 'hit' or done1 == 'arrive') and (done2 == 'hit' or done2 == 'arrive')
                    and (done3 == 'hit' or done3 == 'arrive')):
                print(episode, 'trial: ', 'Robot1: ', totalReward1, '; Robot2: ', totalReward2, '; Robot3: ', totalReward3)
                rewardList1.append(totalReward1)
                rewardList2.append(totalReward2)
                rewardList3.append(totalReward3)
                totalRewardList.append(totalReward1 + totalReward2 + totalReward3)
                totalReward1 = 0
                totalReward2 = 0
                totalReward3 = 0
                if done1 == 'arrive' and done2 == 'arrive' and done3 == 'arrive':
                    # all robots arrived: send each one back to its start with
                    # its pre-trained return Q-table (at most 50 attempts each)
                    for i in range(50):
                        if startReturnTable(episode, observation1, ReturnRL1, 1) == 'arrive':
                            break
                    for i in range(50):
                        if startReturnTable(episode, observation2, ReturnRL2, 2) == 'arrive':
                            break
                    for i in range(50):
                        if startReturnTable(episode, observation3, ReturnRL3, 3) == 'arrive':
                            break
                freeze1 = False
                freeze2 = False
                freeze3 = False
                break
            # freeze any robot that has already finished its run this episode
            if done1 == 'hit' or done1 == 'arrive':
                freeze1 = True
            if done2 == 'hit' or done2 == 'arrive':
                freeze2 = True
            if done3 == 'hit' or done3 == 'arrive':
                freeze3 = True
    # dump the trained Q-tables into pickle files
    if RECORDDATA:
        qtables = [
            ('Return_qtable1.txt', ReturnRL1.q_table),
            ('Return_qtable2.txt', ReturnRL2.q_table),
            ('Return_qtable3.txt', ReturnRL3.q_table),
            ('path_qtable1.txt', RL1.q_table),
            ('path_qtable2.txt', RL2.q_table),
            ('path_qtable3.txt', RL3.q_table),
        ]
        for name, table in qtables:
            with open(FILEPATH + name, 'wb') as f:
                pickle.dump(table, f)
    plot(rewardList1)
    plot(rewardList2)
    plot(rewardList3)
    plot(totalRewardList)
    # end of game
    print('game over')
    env.destroy()
def startReturnTable(episode, observation, RL, robotNumber):
    """Drive one robot back to its start position using its return Q-table."""
    #observation1, observation2, observation3 = env.resetRobot()
    while True:
        env.render()
        action = chooseNoRandomAction(RL, observation)
        if robotNumber == 1:
            observation_, reward, done = env.returnStep1(action)
        elif robotNumber == 2:
            observation_, reward, done = env.returnStep2(action)
        elif robotNumber == 3:
            observation_, reward, done = env.returnStep3(action)
        learn(episode, RL, action, reward, observation, observation_)
        observation = observation_
        #print(done)
        if done == 'arrive' or done == 'hit':
            break
    return done
def chooseNoRandomAction(RL, observation):
    # the second argument is the e-greedy parameter: 1 means act greedily
    return RL.choose_action(str(observation), 1)

def chooseAction(episode, RL, observation):
    # the annealed-exploration branch is disabled (episode < -1 never holds)
    if episode < -1:
        return RL.choose_action(str(observation), 0.9 + episode * 0.001)
    else:
        return RL.choose_action(str(observation), 1)
def learn(episode, RL, action, reward, observation, observation_):
    # Q-learning update with an episode-dependent learning rate and a
    # fixed discount factor of 0.9
    if episode < 500:
        RL.learn(str(observation), action, reward, str(observation_), 0.03, 0.9)
    elif episode < 1500:
        RL.learn(str(observation), action, reward, str(observation_), 0.3 - 0.0002 * (episode - 500), 0.9)
    else:
        RL.learn(str(observation), action, reward, str(observation_), 0.001, 0.9)
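# For reference, the schedule above gives (assuming RL.learn takes the learning
# rate as its fifth argument, as the calls here suggest):
#   episodes    0-499 : learning rate 0.03
#   episodes 500-1499 : learning rate decays linearly from 0.3 down to 0.1002
#   episodes 1500+    : learning rate 0.001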
def plot(reward):
    # note: newer matplotlib versions renamed this style to 'seaborn-v0_8-deep'
    plt.style.use('seaborn-deep')
    plt.plot(reward, linewidth=0.3)
    plt.title('Q Learning Total Reward')
    plt.xlabel('Trial')
    plt.ylabel('Reward')
    plt.show()
def stateChecking(alien_agent, key_agent, action):
    """Check whether taking `action` would move key_agent into (or diagonally
    adjacent to) the cell occupied by alien_agent; return the collision type."""
    directEnvironment = directNearbyEnvironment(key_agent)
    indirectEnvironment = indirectNearbyEnvironment(key_agent)
    if action == 0 and alien_agent in [directEnvironment[0], indirectEnvironment[0], indirectEnvironment[1]]:
        return 'upward_collision'
    elif action == 1 and alien_agent in [directEnvironment[1], indirectEnvironment[2], indirectEnvironment[3]]:
        return 'downward_collision'
    elif action == 2 and alien_agent in [directEnvironment[3], indirectEnvironment[1], indirectEnvironment[3]]:
        return 'right_collision'
    elif action == 3 and alien_agent in [directEnvironment[2], indirectEnvironment[0], indirectEnvironment[2]]:
        return 'left_collision'
    else:
        return 'no_collision'
def indirectNearbyEnvironment(coordinate):
    # the four diagonal neighbours, one UNIT away on each axis
    # (coordinates are four-element canvas rectangles)
    upleft = [coordinate[0] - UNIT, coordinate[1] - UNIT, coordinate[2] - UNIT, coordinate[3] - UNIT]
    upright = [coordinate[0] + UNIT, coordinate[1] - UNIT, coordinate[2] + UNIT, coordinate[3] - UNIT]
    downleft = [coordinate[0] - UNIT, coordinate[1] + UNIT, coordinate[2] - UNIT, coordinate[3] + UNIT]
    downright = [coordinate[0] + UNIT, coordinate[1] + UNIT, coordinate[2] + UNIT, coordinate[3] + UNIT]
    nearby = [upleft, upright, downleft, downright]
    return nearby

def directNearbyEnvironment(coordinate):
    # the four orthogonal neighbours, one UNIT away
    left = [coordinate[0] - UNIT, coordinate[1], coordinate[2] - UNIT, coordinate[3]]
    right = [coordinate[0] + UNIT, coordinate[1], coordinate[2] + UNIT, coordinate[3]]
    up = [coordinate[0], coordinate[1] - UNIT, coordinate[2], coordinate[3] - UNIT]
    down = [coordinate[0], coordinate[1] + UNIT, coordinate[2], coordinate[3] + UNIT]
    nearby = [up, down, left, right]
    return nearby
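# Worked example (illustrative values): with UNIT == 20 and a key agent at
# [100, 100, 120, 120], directNearbyEnvironment returns
#   up    = [100,  80, 120, 100]
#   down  = [100, 120, 120, 140]
#   left  = [ 80, 100, 100, 120]
#   right = [120, 100, 140, 120]
# so stateChecking(alien, [100, 100, 120, 120], 0) reports 'upward_collision'
# exactly when alien equals `up` or one of the two upper diagonals.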
def humanWalk(humanWalkHelper):
    # the human moves on even ticks only, following the scripted HUMANWALK1
    # path; on odd ticks, and once the script is exhausted, the human waits
    if humanWalkHelper % 2 == 0:
        if humanWalkHelper // 2 < len(HUMANWALK1):
            human = env.humanStep1(HUMANWALK1[humanWalkHelper // 2])
        else:
            human = env.humanStep1(4)
    else:
        human = env.humanStep1(4)
    return human
if __name__ == "__main__":
    env = Maze()
    # path-planning Q-tables for the three robots
    RL1 = QLearningTable1(actions=list(range(env.n_actions)), state=CURRENTSTATE)
    RL2 = QLearningTable2(actions=list(range(env.n_actions)), state=CURRENTSTATE)
    RL3 = QLearningTable3(actions=list(range(env.n_actions)), state=CURRENTSTATE)
    # backup tables used after a robot gets stuck behind the human
    backupRL1 = QLearningTable1(actions=list(range(env.n_actions)), state=STATE1)
    backupRL2 = QLearningTable2(actions=list(range(env.n_actions)), state=STATE1)
    backupRL3 = QLearningTable3(actions=list(range(env.n_actions)), state=STATE1)
    # return tables used to drive the robots back to their start positions
    ReturnRL1 = ReturnQLearningTable1(actions=list(range(env.n_actions)), state=CURRENTSTATE)
    ReturnRL2 = ReturnQLearningTable2(actions=list(range(env.n_actions)), state=CURRENTSTATE)
    ReturnRL3 = ReturnQLearningTable3(actions=list(range(env.n_actions)), state=CURRENTSTATE)
    env.after(300, update)
    env.mainloop()