#######################################################################
# Copyright (C)                                                       #
# 2017-2018 Shangtong Zhang([email protected])                        #
# Permission given to modify the code as long as you keep this        #
# declaration at the top                                              #
#######################################################################
import numpy as np
import matplotlib
matplotlib.use('Agg')
import matplotlib.pyplot as plt
from math import floor
from tqdm import tqdm
#######################################################################
# Following are some utilities for tile coding from Rich.
# To make each file self-contained, I copied them from
# http://incompleteideas.net/tiles/tiles3.py-remove
# with some naming convention changes
#
# Tile coding starts
class IHT:
    """Structure to handle collisions"""

    def __init__(self, size_val):
        self.size = size_val
        self.overfull_count = 0
        self.dictionary = {}

    def count(self):
        return len(self.dictionary)

    def full(self):
        return len(self.dictionary) >= self.size

    def get_index(self, obj, read_only=False):
        d = self.dictionary
        if obj in d:
            return d[obj]
        elif read_only:
            return None
        size = self.size
        count = self.count()
        if count >= size:
            if self.overfull_count == 0:
                print('IHT full, starting to allow collisions')
            self.overfull_count += 1
            return hash(obj) % self.size
        else:
            d[obj] = count
            return count

def hash_coords(coordinates, m, read_only=False):
    if isinstance(m, IHT):
        return m.get_index(tuple(coordinates), read_only)
    if isinstance(m, int):
        return hash(tuple(coordinates)) % m
    if m is None:
        return coordinates

def tiles(iht_or_size, num_tilings, floats, ints=None, read_only=False):
    """returns num_tilings tile indices corresponding to the floats and ints"""
    if ints is None:
        ints = []
    qfloats = [floor(f * num_tilings) for f in floats]
    tiles = []
    for tiling in range(num_tilings):
        tiling_x2 = tiling * 2
        coords = [tiling]
        b = tiling
        for q in qfloats:
            coords.append((q + b) // num_tilings)
            b += tiling_x2
        coords.extend(ints)
        tiles.append(hash_coords(coords, iht_or_size, read_only))
    return tiles
# Tile coding ends
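
# A minimal usage sketch (added for illustration; _tile_coding_demo and its
# inputs are hypothetical, not part of the original experiments). It shows
# how the tile coder maps scaled floats plus an integer (here an action) to
# one active tile index per tiling.
def _tile_coding_demo():
    iht = IHT(2048)
    # 8 tilings over a 2-D continuous state, with the action appended as an int
    indices = tiles(iht, 8, [3.2, 4.1], [1])
    assert len(indices) == 8  # one index per tiling, each in [0, 2048)
    return indices
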
#######################################################################
# all possible actions
ACTION_REVERSE = -1
ACTION_ZERO = 0
ACTION_FORWARD = 1
# order is important
ACTIONS = [ACTION_REVERSE, ACTION_ZERO, ACTION_FORWARD]
# bound for position and velocity
POSITION_MIN = -1.2
POSITION_MAX = 0.5
VELOCITY_MIN = -0.07
VELOCITY_MAX = 0.07
# discount is always 1.0 in these experiments
DISCOUNT = 1.0
# use optimistic initial value, so it's ok to set epsilon to 0
EPSILON = 0
# maximum steps per episode
STEP_LIMIT = 5000

# take an @action at @position and @velocity
# @return: new position, new velocity, reward (always -1)
def step(position, velocity, action):
    new_velocity = velocity + 0.001 * action - 0.0025 * np.cos(3 * position)
    new_velocity = min(max(VELOCITY_MIN, new_velocity), VELOCITY_MAX)
    new_position = position + new_velocity
    new_position = min(max(POSITION_MIN, new_position), POSITION_MAX)
    reward = -1.0
    # the car hits the left bound: reset velocity to zero
    if new_position == POSITION_MIN:
        new_velocity = 0.0
    return new_position, new_velocity, reward
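
# Worked numeric check of the dynamics above (illustrative, values rounded):
# from position -0.5 with zero velocity and full forward throttle,
#   new_velocity = 0 + 0.001 * 1 - 0.0025 * cos(-1.5) ~= 0.00082
#   new_position = -0.5 + 0.00082 ~= -0.49918
# so step(-0.5, 0.0, ACTION_FORWARD) returns roughly (-0.49918, 0.00082, -1.0).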

# accumulating trace update rule
# @trace: old trace (will be modified)
# @active_tiles: current active tile indices
# @lam: lambda
# @return: new trace for convenience
def accumulating_trace(trace, active_tiles, lam):
    trace *= lam * DISCOUNT
    trace[active_tiles] += 1
    return trace

# replacing trace update rule
# @trace: old trace (will be modified)
# @active_tiles: current active tile indices
# @lam: lambda
# @return: new trace for convenience
def replacing_trace(trace, active_tiles, lam):
    active = np.in1d(np.arange(len(trace)), active_tiles)
    trace[active] = 1
    trace[~active] *= lam * DISCOUNT
    return trace

# replacing trace update rule; 'clearing' means setting the tiles of all
# non-selected actions to 0
# @trace: old trace (will be modified)
# @active_tiles: current active tile indices
# @lam: lambda
# @clearing_tiles: tiles to be cleared
# @return: new trace for convenience
def replacing_trace_with_clearing(trace, active_tiles, lam, clearing_tiles):
    active = np.in1d(np.arange(len(trace)), active_tiles)
    trace[~active] *= lam * DISCOUNT
    trace[clearing_tiles] = 0
    trace[active] = 1
    return trace

# dutch trace update rule
# @trace: old trace (will be modified)
# @active_tiles: current active tile indices
# @lam: lambda
# @alpha: step size for all tiles
# @return: new trace for convenience
def dutch_trace(trace, active_tiles, lam, alpha):
    coef = 1 - alpha * DISCOUNT * lam * np.sum(trace[active_tiles])
    trace *= DISCOUNT * lam
    trace[active_tiles] += coef
    return trace
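
# A small sketch (added for illustration; _trace_demo is hypothetical) that
# contrasts the two simplest rules on a toy 4-tile trace with lambda = 0.9:
# the accumulating trace adds 1 on top of the decayed value, while the
# replacing trace resets the active tile to exactly 1.
def _trace_demo():
    acc = accumulating_trace(np.ones(4), [0], 0.9)  # -> [1.9, 0.9, 0.9, 0.9]
    rep = replacing_trace(np.ones(4), [0], 0.9)     # -> [1.0, 0.9, 0.9, 0.9]
    return acc, rep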

# wrapper class for Sarsa(lambda)
class Sarsa:
    # In this example I use the tiling software above instead of implementing
    # standard tiling myself. The key point is that tiling is only a map from
    # (state, action) to a series of indices; it doesn't matter whether the
    # indices have an intrinsic meaning, as long as the map satisfies certain
    # properties. View the following webpage for more information:
    # http://incompleteideas.net/sutton/tiles/tiles3.html
    # @max_size: the maximum # of indices
    def __init__(self, step_size, lam, trace_update=accumulating_trace,
                 num_of_tilings=8, max_size=2048):
        self.max_size = max_size
        self.num_of_tilings = num_of_tilings
        self.trace_update = trace_update
        self.lam = lam
        # divide step size equally among the tilings
        self.step_size = step_size / num_of_tilings
        self.hash_table = IHT(max_size)
        # weight for each tile
        self.weights = np.zeros(max_size)
        # trace for each tile
        self.trace = np.zeros(max_size)
        # position and velocity need scaling to suit the tile-coding software
        self.position_scale = self.num_of_tilings / (POSITION_MAX - POSITION_MIN)
        self.velocity_scale = self.num_of_tilings / (VELOCITY_MAX - VELOCITY_MIN)

    # get indices of active tiles for given state and action
    def get_active_tiles(self, position, velocity, action):
        # position_scale * (position - POSITION_MIN) would be a natural
        # normalization, but position_scale * POSITION_MIN is a constant,
        # so it's ok to ignore it.
        active_tiles = tiles(self.hash_table, self.num_of_tilings,
                             [self.position_scale * position, self.velocity_scale * velocity],
                             [action])
        return active_tiles

    # estimate the value of given state and action
    def value(self, position, velocity, action):
        if position == POSITION_MAX:
            return 0.0
        active_tiles = self.get_active_tiles(position, velocity, action)
        return np.sum(self.weights[active_tiles])

    # learn with given state, action and target
    def learn(self, position, velocity, action, target):
        active_tiles = self.get_active_tiles(position, velocity, action)
        estimation = np.sum(self.weights[active_tiles])
        delta = target - estimation
        if self.trace_update == accumulating_trace or self.trace_update == replacing_trace:
            self.trace_update(self.trace, active_tiles, self.lam)
        elif self.trace_update == dutch_trace:
            self.trace_update(self.trace, active_tiles, self.lam, self.step_size)
        elif self.trace_update == replacing_trace_with_clearing:
            # clear the traces of tiles activated by the non-selected actions
            clearing_tiles = []
            for act in ACTIONS:
                if act != action:
                    clearing_tiles.extend(self.get_active_tiles(position, velocity, act))
            self.trace_update(self.trace, active_tiles, self.lam, clearing_tiles)
        else:
            raise Exception('Unexpected Trace Type')
        self.weights += self.step_size * delta * self.trace

    # estimate the # of steps to reach the goal from the given state,
    # i.e. the negated greedy action value
    def cost_to_go(self, position, velocity):
        costs = []
        for action in ACTIONS:
            costs.append(self.value(position, velocity, action))
        return -np.max(costs)
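
# Construction sketch (added for illustration; _sarsa_demo is hypothetical).
# With all weights initialized to zero and rewards of -1 per step, 0.0 is an
# optimistic initial value, which is why EPSILON = 0 still explores enough.
def _sarsa_demo():
    agent = Sarsa(0.5, 0.9, replacing_trace)  # alpha is split across 8 tilings
    return agent.value(-0.5, 0.0, ACTION_FORWARD)  # -> 0.0 before any learning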

# get action at @position and @velocity based on an epsilon-greedy policy
# over @value_function
def get_action(position, velocity, value_function):
    if np.random.binomial(1, EPSILON) == 1:
        return np.random.choice(ACTIONS)
    values = []
    for action in ACTIONS:
        values.append(value_function.value(position, velocity, action))
    # ACTIONS is [-1, 0, 1], so subtracting 1 maps the argmax index to an action
    return np.argmax(values) - 1

# play Mountain Car for one episode based on given method @evaluator
# @return: total steps in this episode
def play(evaluator):
    position = np.random.uniform(-0.6, -0.4)
    velocity = 0.0
    action = get_action(position, velocity, evaluator)
    steps = 0
    while True:
        next_position, next_velocity, reward = step(position, velocity, action)
        next_action = get_action(next_position, next_velocity, evaluator)
        steps += 1
        target = reward + DISCOUNT * evaluator.value(next_position, next_velocity, next_action)
        evaluator.learn(position, velocity, action, target)
        position = next_position
        velocity = next_velocity
        action = next_action
        if next_position == POSITION_MAX:
            break
        if steps >= STEP_LIMIT:
            print('Step Limit Exceeded!')
            break
    return steps

# figure 12.10, effect of lambda and alpha on the early performance of Sarsa(lambda)
def figure_12_10():
    runs = 30
    episodes = 50
    alphas = np.arange(1, 8) / 4.0
    lams = [0.99, 0.95, 0.5, 0]
    steps = np.zeros((len(lams), len(alphas), runs, episodes))
    for lam_ind, lam in enumerate(lams):
        for alpha_ind, alpha in enumerate(alphas):
            for run in tqdm(range(runs)):
                evaluator = Sarsa(alpha, lam, replacing_trace)
                for ep in range(episodes):
                    # named episode_steps to avoid shadowing the step() function
                    episode_steps = play(evaluator)
                    steps[lam_ind, alpha_ind, run, ep] = episode_steps
    # average over episodes
    steps = np.mean(steps, axis=3)
    # average over runs
    steps = np.mean(steps, axis=2)
    for lam_ind, lam in enumerate(lams):
        plt.plot(alphas, steps[lam_ind, :], label='lambda = %s' % (str(lam)))
    plt.xlabel('alpha * # of tilings (8)')
    plt.ylabel('averaged steps per episode')
    plt.ylim([180, 300])
    plt.legend()
    plt.savefig('../images/figure_12_10.png')
    plt.close()

# figure 12.11, summary comparison of Sarsa(lambda) algorithms
# I use 8 tilings rather than 10 tilings
def figure_12_11():
    trace_types = [dutch_trace, replacing_trace, replacing_trace_with_clearing, accumulating_trace]
    alphas = np.arange(0.2, 2.2, 0.2)
    episodes = 20
    runs = 30
    lam = 0.9
    rewards = np.zeros((len(trace_types), len(alphas), runs, episodes))
    for trace_ind, trace in enumerate(trace_types):
        for alpha_ind, alpha in enumerate(alphas):
            for run in tqdm(range(runs)):
                evaluator = Sarsa(alpha, lam, trace)
                for ep in range(episodes):
                    if trace == accumulating_trace and alpha > 0.6:
                        # accumulating traces blow up for large alpha, so skip the run
                        steps = STEP_LIMIT
                    else:
                        steps = play(evaluator)
                    rewards[trace_ind, alpha_ind, run, ep] = -steps
    # average over episodes
    rewards = np.mean(rewards, axis=3)
    # average over runs
    rewards = np.mean(rewards, axis=2)
    for trace_ind, trace in enumerate(trace_types):
        plt.plot(alphas, rewards[trace_ind, :], label=trace.__name__)
    plt.xlabel('alpha * # of tilings (8)')
    plt.ylabel('averaged rewards per episode')
    plt.ylim([-550, -150])
    plt.legend()
    plt.savefig('../images/figure_12_11.png')
    plt.close()

if __name__ == '__main__':
    figure_12_10()
    figure_12_11()