from collections import defaultdict
from typing import DefaultDict, List, Optional, Set

import numpy as np
import tree  # pip install dm_tree

from ray.rllib.policy.sample_batch import DEFAULT_POLICY_ID, SampleBatch
from ray.rllib.utils.annotations import DeveloperAPI
from ray.util.debug import _test_some_code_for_memory_leaks, Suspect


@DeveloperAPI
def check_memory_leaks(
algorithm,
to_check: Optional[Set[str]] = None,
repeats: Optional[int] = None,
max_num_trials: int = 3,
) -> DefaultDict[str, List[Suspect]]:
"""Diagnoses the given Algorithm for possible memory leaks.
Isolates single components inside the Algorithm's local worker, e.g. the env,
policy, etc.. and calls some of their methods repeatedly, while checking
the memory footprints and keeping track of which lines in the code add
un-GC'd items to memory.
Args:
algorithm: The Algorithm instance to test.
to_check: Set of strings to indentify components to test. Allowed strings
are: "env", "policy", "model", "rollout_worker". By default, check all
of these.
repeats: Number of times the test code block should get executed (per trial).
If a trial fails, a new trial may get started with a larger number of
repeats: actual_repeats = `repeats` * (trial + 1) (1st trial == 0).
max_num_trials: The maximum number of trials to run each check for.
Raises:
A defaultdict(list) with keys being the `to_check` strings and values being
lists of Suspect instances that were found.
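
    Example:
        A minimal usage sketch (hypothetical; assumes a small PPO setup and that
        `PPOConfig` is importable from `ray.rllib.algorithms.ppo` in your Ray
        version):

        .. code-block:: python

            from ray.rllib.algorithms.ppo import PPOConfig

            algo = PPOConfig().environment("CartPole-v1").build()
            # Only check the env here; few repeats for a quick smoke test.
            leaks = check_memory_leaks(algo, to_check={"env"}, repeats=10)
            assert not leaks, f"Suspected leaks: {leaks}"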
"""
    local_worker = algorithm.env_runner

    # Which components should we test?
    to_check = to_check or {"env", "model", "policy", "rollout_worker"}
    results_per_category = defaultdict(list)

# Test a single sub-env (first in the VectorEnv)?
if "env" in to_check:
assert local_worker.async_env is not None, (
"ERROR: Cannot test 'env' since given Algorithm does not have one "
"in its local worker. Try setting `create_env_on_driver=True`."
)
# Isolate the first sub-env in the vectorized setup and test it.
env = local_worker.async_env.get_sub_environments()[0]
action_space = env.action_space
        # Always use the same (pre-sampled) action to avoid false leak alarms
        # caused by numpy's random number generator.
        action_sample = action_space.sample()

        def code():
ts = 0
env.reset()
while True:
# If masking is used, try something like this:
# np.random.choice(
# action_space.n, p=(obs["action_mask"] / sum(obs["action_mask"])))
                # The gymnasium step API returns
                # (obs, reward, terminated, truncated, info).
                _, _, terminated, truncated, _ = env.step(action_sample)
                ts += 1
                if terminated or truncated:
                    break

        test = _test_some_code_for_memory_leaks(
desc="Looking for leaks in env, running through episodes.",
init=None,
code=code,
# How many times to repeat the function call?
repeats=repeats or 200,
max_num_trials=max_num_trials,
)
if test:
results_per_category["env"].extend(test)

    # Test the policy (single-agent case only so far).
if "policy" in to_check:
policy = local_worker.policy_map[DEFAULT_POLICY_ID]
# Get a fixed obs (B=10).
obs = tree.map_structure(
lambda s: np.stack([s] * 10, axis=0), policy.observation_space.sample()
)
print("Looking for leaks in Policy")

        def code():
policy.compute_actions_from_input_dict(
{
"obs": obs,
}
)

        # Call `compute_actions_from_input_dict()` n times.
test = _test_some_code_for_memory_leaks(
desc="Calling `compute_actions_from_input_dict()`.",
init=None,
code=code,
# How many times to repeat the function call?
repeats=repeats or 400,
# How many times to re-try if we find a suspicious memory
# allocation?
max_num_trials=max_num_trials,
)
if test:
results_per_category["policy"].extend(test)

        # Testing this only makes sense if the learner API is disabled.
if not policy.config.get("enable_rl_module_and_learner", False):
# Call `learn_on_batch()` n times.
dummy_batch = policy._get_dummy_batch_from_view_requirements(batch_size=16)
test = _test_some_code_for_memory_leaks(
desc="Calling `learn_on_batch()`.",
init=None,
code=lambda: policy.learn_on_batch(dummy_batch),
# How many times to repeat the function call?
repeats=repeats or 100,
max_num_trials=max_num_trials,
)
if test:
results_per_category["policy"].extend(test)

    # Test only the model.
if "model" in to_check:
policy = local_worker.policy_map[DEFAULT_POLICY_ID]
# Get a fixed obs.
obs = tree.map_structure(lambda s: s[None], policy.observation_space.sample())
print("Looking for leaks in Model")

        # Call the model (a direct forward pass) n times.
test = _test_some_code_for_memory_leaks(
desc="Calling `[model]()`.",
init=None,
code=lambda: policy.model({SampleBatch.OBS: obs}),
# How many times to repeat the function call?
repeats=repeats or 400,
# How many times to re-try if we find a suspicious memory
# allocation?
max_num_trials=max_num_trials,
)
if test:
results_per_category["model"].extend(test)

    # Test the RolloutWorker.
if "rollout_worker" in to_check:
print("Looking for leaks in local RolloutWorker")

        def code():
local_worker.sample()
local_worker.get_metrics()

        # Call `sample()` and `get_metrics()` n times.
test = _test_some_code_for_memory_leaks(
desc="Calling `sample()` and `get_metrics()`.",
init=None,
code=code,
# How many times to repeat the function call?
repeats=repeats or 50,
# How many times to re-try if we find a suspicious memory
# allocation?
max_num_trials=max_num_trials,
)
if test:
results_per_category["rollout_worker"].extend(test)
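
    # Test the Learner (only relevant if the new RLModule/Learner API is enabled).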
if "learner" in to_check and algorithm.config.get(
"enable_rl_module_and_learner", False
):
learner_group = algorithm.learner_group
assert learner_group._is_local, (
"This test will miss leaks hidden in remote "
"workers. Please make sure that there is a "
"local learner inside the learner group for "
"this test."
)
dummy_batch = (
algorithm.get_policy()
._get_dummy_batch_from_view_requirements(batch_size=16)
.as_multi_agent()
)
print("Looking for leaks in Learner")

        def code():
learner_group.update(dummy_batch)

        # Call `LearnerGroup.update()` n times.
test = _test_some_code_for_memory_leaks(
desc="Calling `LearnerGroup.update()`.",
init=None,
code=code,
# How many times to repeat the function call?
repeats=repeats or 400,
# How many times to re-try if we find a suspicious memory
# allocation?
max_num_trials=max_num_trials,
)
if test:
results_per_category["learner"].extend(test)

    return results_per_category
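

if __name__ == "__main__":
    # Minimal smoke-test sketch (hypothetical, not part of the original module):
    # build a small algorithm and run the leak checks on it. Assumes `PPOConfig`
    # is importable from `ray.rllib.algorithms.ppo` in your Ray version.
    from ray.rllib.algorithms.ppo import PPOConfig

    algo = PPOConfig().environment("CartPole-v1").build()
    try:
        suspects = check_memory_leaks(algo, to_check={"env"}, repeats=10)
        for category, items in suspects.items():
            print(f"{category}: {len(items)} suspect(s) found")
    finally:
        algo.stop()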