forked from ray-project/ray
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathserialization.py
418 lines (349 loc) · 13.1 KB
/
serialization.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
import base64
from collections import OrderedDict
import importlib
import io
import zlib
from typing import Any, Dict, Optional, Sequence, Type, Union
import gymnasium as gym
import numpy as np
import ray
from ray.rllib.utils.annotations import DeveloperAPI
from ray.rllib.utils.error import NotSerializable
from ray.rllib.utils.spaces.flexdict import FlexDict
from ray.rllib.utils.spaces.repeated import Repeated
from ray.rllib.utils.spaces.simplex import Simplex
# NOTE(review): presumably a marker/placeholder string for values that cannot
# be serialized (inferred from the name only). It is not referenced anywhere in
# the visible part of this module -- confirm its meaning against its callers.
NOT_SERIALIZABLE = "__not_serializable__"
@DeveloperAPI
def convert_numpy_to_python_primitives(obj: Any):
    """Convert an object that is a numpy type to a python type.

    If the object is not a numpy type, it is returned unchanged.

    Args:
        obj: The object to convert. Handled numpy types are integer, floating,
            bool and str scalars as well as ndarrays (including 0-d arrays).

    Returns:
        The python equivalent of `obj`: an int, float, bool or str for numpy
        scalars, a (possibly nested) list for ndarrays with at least one
        dimension, a python scalar for 0-d ndarrays, or `obj` itself if it is
        not one of the handled numpy types.
    """
    if isinstance(obj, np.integer):
        return int(obj)
    if isinstance(obj, np.floating):
        return float(obj)
    if isinstance(obj, np.bool_):
        return bool(obj)
    if isinstance(obj, np.str_):
        return str(obj)
    if isinstance(obj, np.ndarray):
        converted = obj.tolist()
        # A 0-d array's `tolist()` yields a bare scalar rather than a list, so
        # it must not be iterated; convert that single value instead.
        if not isinstance(converted, list):
            return convert_numpy_to_python_primitives(converted)
        # `tolist()` leaves the elements of object arrays as they are, so run
        # the conversion over the top-level entries as well.
        return [convert_numpy_to_python_primitives(item) for item in converted]
    return obj
def _serialize_ndarray(array: np.ndarray) -> str:
    """Encode a numpy ndarray as a compressed, Base64 escaped ASCII string.

    The array is written in npy format through numpy.save() (instead of being
    pickled directly) to ensure compatibility. The resulting bytes are then
    zlib-compressed and Base64 encoded.

    Args:
        array: The numpy ndarray to encode.

    Returns:
        The b64 escaped string.
    """
    with io.BytesIO() as stream:
        np.save(stream, array)
        npy_bytes = stream.getvalue()
    compressed = zlib.compress(npy_bytes)
    encoded = base64.b64encode(compressed)
    return encoded.decode("ascii")
def _deserialize_ndarray(b64_string: str) -> np.ndarray:
    """Decode a b64 escaped string back into a numpy ndarray.

    The string is Base64 decoded and zlib-decompressed; the resulting bytes
    are assumed to be in npy format.

    Args:
        b64_string: Base64 escaped string.

    Returns:
        The decoded numpy ndarray.
    """
    compressed = base64.b64decode(b64_string)
    npy_stream = io.BytesIO(zlib.decompress(compressed))
    # NOTE(review): `allow_pickle=True` lets numpy unpickle object arrays,
    # which can execute arbitrary code if `b64_string` comes from an untrusted
    # source. Confirm whether object arrays are ever expected here before
    # tightening this.
    return np.load(npy_stream, allow_pickle=True)
@DeveloperAPI
def gym_space_to_dict(space: gym.spaces.Space) -> Optional[Dict]:
    """Serialize a gym Space into a dict.

    Every returned dict carries a "space" key naming the space type, plus the
    type-specific entries needed to rebuild the space. Numpy arrays of Box and
    MultiDiscrete spaces are stored as Base64 strings.

    Args:
        space: The gym.spaces.Space to serialize. May be None.

    Returns:
        The serialized dict, or None if `space` is None.

    Raises:
        ValueError: If the type of `space` is not supported, or if a Text
            space exposes neither a `character_set` nor a `charset` attribute.
    """
    if space is None:
        return None

    def _box(sp: gym.spaces.Box) -> Dict:
        return {
            "space": "box",
            "low": _serialize_ndarray(sp.low),
            "high": _serialize_ndarray(sp.high),
            "shape": sp._shape,  # shape is a tuple.
            "dtype": sp.dtype.str,
        }

    def _discrete(sp: gym.spaces.Discrete) -> Dict:
        d = {
            "space": "discrete",
            "n": int(sp.n),
        }
        # Offset is a relatively new Discrete space feature.
        if hasattr(sp, "start"):
            d["start"] = int(sp.start)
        return d

    def _multi_binary(sp: gym.spaces.MultiBinary) -> Dict:
        return {
            "space": "multi-binary",
            "n": sp.n,
        }

    def _multi_discrete(sp: gym.spaces.MultiDiscrete) -> Dict:
        return {
            "space": "multi-discrete",
            "nvec": _serialize_ndarray(sp.nvec),
            "dtype": sp.dtype.str,
        }

    def _tuple(sp: gym.spaces.Tuple) -> Dict:
        return {
            "space": "tuple",
            "spaces": [gym_space_to_dict(child) for child in sp.spaces],
        }

    def _dict(sp: gym.spaces.Dict) -> Dict:
        return {
            "space": "dict",
            "spaces": {
                key: gym_space_to_dict(child) for key, child in sp.spaces.items()
            },
        }

    def _simplex(sp: Simplex) -> Dict:
        return {
            "space": "simplex",
            "shape": sp._shape,  # shape is a tuple.
            "concentration": sp.concentration,
            "dtype": sp.dtype.str,
        }

    def _repeated(sp: Repeated) -> Dict:
        return {
            "space": "repeated",
            "child_space": gym_space_to_dict(sp.child_space),
            "max_len": sp.max_len,
        }

    def _flex_dict(sp: FlexDict) -> Dict:
        d = {
            "space": "flex_dict",
        }
        # Child spaces are stored next to the "space" key (not nested under a
        # "spaces" entry). Iterate over `.items()`: iterating the mapping
        # itself only yields the keys and cannot be unpacked into (key, space).
        for key, child in sp.spaces.items():
            d[key] = gym_space_to_dict(child)
        return d

    def _text(sp: "gym.spaces.Text") -> Dict:
        # Note (Kourosh): This only works in gym >= 0.25.0
        # The attribute holding the characters is looked up under both names.
        charset = getattr(sp, "character_set", None)
        if charset is None:
            charset = getattr(sp, "charset", None)
        if charset is None:
            raise ValueError(
                "Text space must have a character_set or charset attribute"
            )
        return {
            "space": "text",
            "min_length": sp.min_length,
            "max_length": sp.max_length,
            "charset": charset,
        }

    if isinstance(space, gym.spaces.Box):
        return _box(space)
    elif isinstance(space, gym.spaces.Discrete):
        return _discrete(space)
    elif isinstance(space, gym.spaces.MultiBinary):
        return _multi_binary(space)
    elif isinstance(space, gym.spaces.MultiDiscrete):
        return _multi_discrete(space)
    elif isinstance(space, gym.spaces.Tuple):
        return _tuple(space)
    elif isinstance(space, gym.spaces.Dict):
        return _dict(space)
    elif isinstance(space, gym.spaces.Text):
        return _text(space)
    elif isinstance(space, Simplex):
        return _simplex(space)
    elif isinstance(space, Repeated):
        return _repeated(space)
    elif isinstance(space, FlexDict):
        # NOTE(review): if FlexDict subclasses gym.spaces.Dict, the Dict check
        # above already matches and this branch is never reached -- confirm.
        return _flex_dict(space)
    else:
        raise ValueError(f"Unknown space type for serialization: {type(space)}")
@DeveloperAPI
def space_to_dict(space: gym.spaces.Space) -> Dict:
    """Serialize a gym Space, including its `original_space`, into a dict.

    Args:
        space: The gym.spaces.Space to serialize. May be None.

    Returns:
        A dict with the serialized space under the "space" key. If `space`
        carries an `original_space` instance attribute, that space is
        serialized recursively under the "original_space" key.
    """
    d = {"space": gym_space_to_dict(space)}
    # Only an `original_space` set on the instance itself counts, hence the
    # `__dict__` lookup. The `getattr` default keeps `space=None` (which
    # `gym_space_to_dict` accepts) from raising an AttributeError here.
    if "original_space" in getattr(space, "__dict__", {}):
        d["original_space"] = space_to_dict(space.original_space)
    return d
@DeveloperAPI
def gym_space_from_dict(d: Dict) -> gym.spaces.Space:
    """De-serialize a dict into gym Space.

    Args:
        d: The serialized space dict. Its "space" entry names the space type;
            the remaining entries are the type-specific settings. May be None.

    Returns:
        De-serialized gym space, or None if `d` is None.

    Raises:
        ValueError: If the "space" entry of `d` is not a known space type.
    """
    if d is None:
        return None

    def _common(d: Dict) -> Dict:
        """Common updates to the dict before we use it to construct spaces"""
        ret = d.copy()
        # "space" only names the type; it is not a constructor argument.
        del ret["space"]
        if "dtype" in ret:
            ret["dtype"] = np.dtype(ret["dtype"])
        return ret

    def _box(d: Dict) -> gym.spaces.Box:
        ret = d.copy()
        ret.update(
            {
                "low": _deserialize_ndarray(d["low"]),
                "high": _deserialize_ndarray(d["high"]),
            }
        )
        return gym.spaces.Box(**_common(ret))

    def _discrete(d: Dict) -> gym.spaces.Discrete:
        return gym.spaces.Discrete(**_common(d))

    def _multi_binary(d: Dict) -> gym.spaces.MultiBinary:
        return gym.spaces.MultiBinary(**_common(d))

    def _multi_discrete(d: Dict) -> gym.spaces.MultiDiscrete:
        ret = d.copy()
        ret.update(
            {
                "nvec": _deserialize_ndarray(ret["nvec"]),
            }
        )
        return gym.spaces.MultiDiscrete(**_common(ret))

    def _tuple(d: Dict) -> gym.spaces.Tuple:
        spaces = [gym_space_from_dict(child) for child in d["spaces"]]
        return gym.spaces.Tuple(spaces=spaces)

    def _dict(d: Dict) -> gym.spaces.Dict:
        # We need to always use an OrderedDict here to cover the following two ways, by
        # which a user might construct a Dict space originally. We need to restore this
        # original Dict space with the exact order of keys the user intended to.
        # - User provides an OrderedDict inside the gym.spaces.Dict constructor ->
        #  gymnasium should NOT further sort the keys. The same (user-provided) order
        #  must be restored.
        # - User provides a simple dict inside the gym.spaces.Dict constructor ->
        #  By its API definition, gymnasium automatically sorts all keys alphabetically.
        #  The same (alphabetical) order must thus be restored.
        spaces = OrderedDict(
            {key: gym_space_from_dict(child) for key, child in d["spaces"].items()}
        )
        return gym.spaces.Dict(spaces=spaces)

    def _simplex(d: Dict) -> Simplex:
        return Simplex(**_common(d))

    def _repeated(d: Dict) -> Repeated:
        child_space = gym_space_from_dict(d["child_space"])
        return Repeated(child_space=child_space, max_len=d["max_len"])

    def _flex_dict(d: Dict) -> FlexDict:
        # Every entry except the "space" type tag is a serialized child space.
        spaces = {k: gym_space_from_dict(s) for k, s in d.items() if k != "space"}
        return FlexDict(spaces=spaces)

    def _text(d: Dict) -> "gym.spaces.Text":
        return gym.spaces.Text(**_common(d))

    space_map = {
        "box": _box,
        "discrete": _discrete,
        "multi-binary": _multi_binary,
        "multi-discrete": _multi_discrete,
        "tuple": _tuple,
        "dict": _dict,
        "simplex": _simplex,
        "repeated": _repeated,
        "flex_dict": _flex_dict,
        "text": _text,
    }

    space_type = d["space"]
    if space_type not in space_map:
        raise ValueError(f"Unknown space type for de-serialization: {space_type}")

    return space_map[space_type](d)
@DeveloperAPI
def space_from_dict(d: Dict) -> gym.spaces.Space:
    """De-serialize a dict into a gym Space, restoring its `original_space`.

    Args:
        d: A dict holding the serialized space under the "space" key and,
            optionally, a serialized original space under "original_space".

    Returns:
        The de-serialized space. If `d` has an "original_space" entry, the
        restored original space is attached as the `original_space` attribute.
    """
    space = gym_space_from_dict(d["space"])
    if "original_space" not in d:
        return space

    original = d["original_space"]
    assert "space" in original
    # For backward compatibility reasons, if original["space"] is a string,
    # this original space was serialized by gym_space_to_dict. Otherwise, it
    # was serialized by space_to_dict.
    if isinstance(original["space"], str):
        restore = gym_space_from_dict
    else:
        restore = space_from_dict
    space.original_space = restore(original)
    return space
@DeveloperAPI
def check_if_args_kwargs_serializable(args: Sequence[Any], kwargs: Dict[str, Any]):
    """Check if parameters to a function are serializable by ray.

    Each positional argument and each keyword argument value is passed through
    `ray.put` followed by `ray.get`. Positional arguments are checked first.

    Args:
        args: arguments to be checked.
        kwargs: keyword arguments to be checked.

    Raises:
        NotSerializable: If either args or kwargs are not serializable
            by ray, i.e. if the put/get round trip raises a TypeError.
    """
    for arg in args:
        try:
            # if the object is truly serializable we should be able to
            # ray.put and ray.get it.
            ray.get(ray.put(arg))
        except TypeError as e:
            # Chain explicitly so the original serialization error is kept as
            # the direct cause of the NotSerializable error.
            raise NotSerializable(
                "RLModule constructor arguments must be serializable. "
                f"Found non-serializable argument: {arg}.\n"
                f"Original serialization error: {e}"
            ) from e
    for k, v in kwargs.items():
        try:
            # if the object is truly serializable we should be able to
            # ray.put and ray.get it.
            ray.get(ray.put(v))
        except TypeError as e:
            raise NotSerializable(
                "RLModule constructor arguments must be serializable. "
                f"Found non-serializable keyword argument: {k} = {v}.\n"
                f"Original serialization error: {e}"
            ) from e
@DeveloperAPI
def serialize_type(type_: Union[Type, str]) -> str:
    """Converts a type into its full classpath ([module file] + "." + [class name]).

    Args:
        type_: The type to convert. A string is treated as an already
            serialized classpath.

    Returns:
        The full classpath of the given type, e.g. "ray.rllib.algorithms.ppo.PPOConfig".
        If `type_` is a string, it is returned unchanged.
    """
    # TODO (avnishn): find a way to incorporate the tune registry here.
    if not isinstance(type_, str):
        return ".".join((type_.__module__, type_.__qualname__))
    # Already serialized.
    return type_
@DeveloperAPI
def deserialize_type(
    module: Union[str, Type], error: bool = False
) -> Optional[Union[str, Type]]:
    """Resolves a class path to a class.

    If the given module is already a class, it is returned as is.
    If the given module is a string, it is imported and the class is returned.

    Args:
        module: The classpath (str) or type to resolve.
        error: Whether to throw a ValueError if `module` could not be resolved into
            a class. If False and `module` is not resolvable, the given `module`
            string is returned unchanged.

    Returns:
        The resolved class or the original `module` string (if `error` is False
        and no resolution possible).

    Raises:
        ValueError: If `error` is True and `module` cannot be resolved, or if
            `module` is neither a type nor a string.
    """
    # Already a class, return as-is.
    if isinstance(module, type):
        return module

    if not isinstance(module, str):
        raise ValueError(f"`module` ({module}) must be type or string (classpath)!")

    # A string: Try interpreting (as classpath) and importing the given module.
    try:
        module_path, class_name = module.rsplit(".", 1)
        # Keep the imported module in its own variable. Rebinding `module`
        # here would make the fallback below return the module object (and
        # the error message print it) instead of the original classpath.
        imported_module = importlib.import_module(module_path)
        return getattr(imported_module, class_name)
    # Module not found OR not a module (but a registered string?).
    except (ModuleNotFoundError, ImportError, AttributeError, ValueError) as e:
        # Ignore if error=False.
        if error:
            raise ValueError(
                f"Could not deserialize the given classpath `module={module}` into "
                "a valid python class! Make sure you have all necessary pip "
                "packages installed and all custom modules are in your "
                "`PYTHONPATH` env variable."
            ) from e

    return module