Skip to content

Commit

Permalink
[SPARK-37457][PYTHON] Update cloudpickle to v2.0.0
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?

This PR proposes to upgrade cloudpickle from 1.6.0 to 2.0.0 (see also cloudpipe/cloudpickle@v1.6.0...v2.0.0).

### Why are the changes needed?

To leverage bug fixes from the cloudpickle upstream.
More importantly, 2.0.0 added the official support of Python 3.8 and 3.9.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Jenkins build and GitHub actions build will test it out.

Closes apache#34705 from HyukjinKwon/cloudpickle-2.0.

Authored-by: Hyukjin Kwon <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
  • Loading branch information
HyukjinKwon committed Nov 26, 2021
1 parent 69e1151 commit 95fc4c5
Show file tree
Hide file tree
Showing 3 changed files with 218 additions and 48 deletions.
2 changes: 1 addition & 1 deletion python/pyspark/cloudpickle/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,4 @@
# expose their Pickler subclass at top-level under the "Pickler" name.
Pickler = CloudPickler

__version__ = '1.6.0'
__version__ = '2.0.0'
195 changes: 163 additions & 32 deletions python/pyspark/cloudpickle/cloudpickle.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import warnings

from .compat import pickle
from collections import OrderedDict
from typing import Generic, Union, Tuple, Callable
from pickle import _getattribute
from importlib._bootstrap import _find_spec
Expand Down Expand Up @@ -87,8 +88,11 @@ def g():
# communication speed over compatibility:
DEFAULT_PROTOCOL = pickle.HIGHEST_PROTOCOL

# Names of modules whose resources should be treated as dynamic.
_PICKLE_BY_VALUE_MODULES = set()

# Track the provenance of reconstructed dynamic classes to make it possible to
# recontruct instances from the matching singleton class definition when
# reconstruct instances from the matching singleton class definition when
# appropriate and preserve the usual "isinstance" semantics of Python objects.
_DYNAMIC_CLASS_TRACKER_BY_CLASS = weakref.WeakKeyDictionary()
_DYNAMIC_CLASS_TRACKER_BY_ID = weakref.WeakValueDictionary()
Expand Down Expand Up @@ -123,6 +127,77 @@ def _lookup_class_or_track(class_tracker_id, class_def):
return class_def


def register_pickle_by_value(module):
"""Register a module to make it functions and classes picklable by value.
By default, functions and classes that are attributes of an importable
module are to be pickled by reference, that is relying on re-importing
the attribute from the module at load time.
If `register_pickle_by_value(module)` is called, all its functions and
classes are subsequently to be pickled by value, meaning that they can
be loaded in Python processes where the module is not importable.
This is especially useful when developing a module in a distributed
execution environment: restarting the client Python process with the new
source code is enough: there is no need to re-install the new version
of the module on all the worker nodes nor to restart the workers.
Note: this feature is considered experimental. See the cloudpickle
README.md file for more details and limitations.
"""
if not isinstance(module, types.ModuleType):
raise ValueError(
f"Input should be a module object, got {str(module)} instead"
)
# In the future, cloudpickle may need a way to access any module registered
# for pickling by value in order to introspect relative imports inside
# functions pickled by value. (see
# https://github.com/cloudpipe/cloudpickle/pull/417#issuecomment-873684633).
# This access can be ensured by checking that module is present in
# sys.modules at registering time and assuming that it will still be in
# there when accessed during pickling. Another alternative would be to
# store a weakref to the module. Even though cloudpickle does not implement
# this introspection yet, in order to avoid a possible breaking change
# later, we still enforce the presence of module inside sys.modules.
if module.__name__ not in sys.modules:
raise ValueError(
f"{module} was not imported correctly, have you used an "
f"`import` statement to access it?"
)
_PICKLE_BY_VALUE_MODULES.add(module.__name__)


def unregister_pickle_by_value(module):
"""Unregister that the input module should be pickled by value."""
if not isinstance(module, types.ModuleType):
raise ValueError(
f"Input should be a module object, got {str(module)} instead"
)
if module.__name__ not in _PICKLE_BY_VALUE_MODULES:
raise ValueError(f"{module} is not registered for pickle by value")
else:
_PICKLE_BY_VALUE_MODULES.remove(module.__name__)


def list_registry_pickle_by_value():
return _PICKLE_BY_VALUE_MODULES.copy()


def _is_registered_pickle_by_value(module):
module_name = module.__name__
if module_name in _PICKLE_BY_VALUE_MODULES:
return True
while True:
parent_name = module_name.rsplit(".", 1)[0]
if parent_name == module_name:
break
if parent_name in _PICKLE_BY_VALUE_MODULES:
return True
module_name = parent_name
return False


def _whichmodule(obj, name):
"""Find the module an object belongs to.
Expand All @@ -136,11 +211,14 @@ def _whichmodule(obj, name):
# Workaround bug in old Python versions: prior to Python 3.7,
# T.__module__ would always be set to "typing" even when the TypeVar T
# would be defined in a different module.
#
# For such older Python versions, we ignore the __module__ attribute of
# TypeVar instances and instead exhaustively lookup those instances in
# all currently imported modules.
module_name = None
if name is not None and getattr(typing, name, None) is obj:
# Built-in TypeVar defined in typing such as AnyStr
return 'typing'
else:
# User defined or third-party TypeVar: __module__ attribute is
# irrelevant, thus trigger a exhaustive search for obj in all
# modules.
module_name = None
else:
module_name = getattr(obj, '__module__', None)

Expand All @@ -166,18 +244,35 @@ def _whichmodule(obj, name):
return None


def _is_importable(obj, name=None):
"""Dispatcher utility to test the importability of various constructs."""
if isinstance(obj, types.FunctionType):
return _lookup_module_and_qualname(obj, name=name) is not None
elif issubclass(type(obj), type):
return _lookup_module_and_qualname(obj, name=name) is not None
def _should_pickle_by_reference(obj, name=None):
"""Test whether an function or a class should be pickled by reference
Pickling by reference means by that the object (typically a function or a
class) is an attribute of a module that is assumed to be importable in the
target Python environment. Loading will therefore rely on importing the
module and then calling `getattr` on it to access the function or class.
Pickling by reference is the only option to pickle functions and classes
in the standard library. In cloudpickle the alternative option is to
pickle by value (for instance for interactively or locally defined
functions and classes or for attributes of modules that have been
explicitly registered to be pickled by value.
"""
if isinstance(obj, types.FunctionType) or issubclass(type(obj), type):
module_and_name = _lookup_module_and_qualname(obj, name=name)
if module_and_name is None:
return False
module, name = module_and_name
return not _is_registered_pickle_by_value(module)

elif isinstance(obj, types.ModuleType):
# We assume that sys.modules is primarily used as a cache mechanism for
# the Python import machinery. Checking if a module has been added in
# is sys.modules therefore a cheap and simple heuristic to tell us whether
# we can assume that a given module could be imported by name in
# another Python process.
# is sys.modules therefore a cheap and simple heuristic to tell us
# whether we can assume that a given module could be imported by name
# in another Python process.
if _is_registered_pickle_by_value(obj):
return False
return obj.__name__ in sys.modules
else:
raise TypeError(
Expand Down Expand Up @@ -233,18 +328,21 @@ def _extract_code_globals(co):
out_names = _extract_code_globals_cache.get(co)
if out_names is None:
names = co.co_names
out_names = {names[oparg] for _, oparg in _walk_global_ops(co)}
# We use a dict with None values instead of a set to get a
# deterministic order (assuming Python 3.6+) and avoid introducing
# non-deterministic pickle bytes as a results.
out_names = {names[oparg]: None for _, oparg in _walk_global_ops(co)}

# Declaring a function inside another one using the "def ..."
# syntax generates a constant code object corresonding to the one
# syntax generates a constant code object corresponding to the one
# of the nested function's As the nested function may itself need
# global variables, we need to introspect its code, extract its
# globals, (look for code object in it's co_consts attribute..) and
# add the result to code_globals
if co.co_consts:
for const in co.co_consts:
if isinstance(const, types.CodeType):
out_names |= _extract_code_globals(const)
out_names.update(_extract_code_globals(const))

_extract_code_globals_cache[co] = out_names

Expand Down Expand Up @@ -452,15 +550,31 @@ def _extract_class_dict(cls):

if sys.version_info[:2] < (3, 7): # pragma: no branch
def _is_parametrized_type_hint(obj):
# This is very cheap but might generate false positives.
# This is very cheap but might generate false positives. So try to
# narrow it down is good as possible.
type_module = getattr(type(obj), '__module__', None)
from_typing_extensions = type_module == 'typing_extensions'
from_typing = type_module == 'typing'

# general typing Constructs
is_typing = getattr(obj, '__origin__', None) is not None

# typing_extensions.Literal
is_litteral = getattr(obj, '__values__', None) is not None
is_literal = (
(getattr(obj, '__values__', None) is not None)
and from_typing_extensions
)

# typing_extensions.Final
is_final = getattr(obj, '__type__', None) is not None
is_final = (
(getattr(obj, '__type__', None) is not None)
and from_typing_extensions
)

# typing.ClassVar
is_classvar = (
(getattr(obj, '__type__', None) is not None) and from_typing
)

# typing.Union/Tuple for old Python 3.5
is_union = getattr(obj, '__union_params__', None) is not None
Expand All @@ -469,8 +583,8 @@ def _is_parametrized_type_hint(obj):
getattr(obj, '__result__', None) is not None and
getattr(obj, '__args__', None) is not None
)
return any((is_typing, is_litteral, is_final, is_union, is_tuple,
is_callable))
return any((is_typing, is_literal, is_final, is_classvar, is_union,
is_tuple, is_callable))

def _create_parametrized_type_hint(origin, args):
return origin[args]
Expand Down Expand Up @@ -557,8 +671,11 @@ def _rebuild_tornado_coroutine(func):
loads = pickle.loads


# hack for __import__ not working as desired
def subimport(name):
# We cannot do simply: `return __import__(name)`: Indeed, if ``name`` is
# the name of a submodule, __import__ will return the top-level root module
# of this submodule. For instance, __import__('os.path') returns the `os`
# module.
__import__(name)
return sys.modules[name]

Expand Down Expand Up @@ -699,7 +816,7 @@ def _make_skel_func(code, cell_count, base_globals=None):
"""
# This function is deprecated and should be removed in cloudpickle 1.7
warnings.warn(
"A pickle file created using an old (<=1.4.1) version of cloudpicke "
"A pickle file created using an old (<=1.4.1) version of cloudpickle "
"is currently being loaded. This is not supported by cloudpickle and "
"will break in cloudpickle 1.7", category=UserWarning
)
Expand Down Expand Up @@ -813,10 +930,15 @@ def _decompose_typevar(obj):


def _typevar_reduce(obj):
# TypeVar instances have no __qualname__ hence we pass the name explicitly.
# TypeVar instances require the module information hence why we
# are not using the _should_pickle_by_reference directly
module_and_name = _lookup_module_and_qualname(obj, name=obj.__name__)

if module_and_name is None:
return (_make_typevar, _decompose_typevar(obj))
elif _is_registered_pickle_by_value(module_and_name[0]):
return (_make_typevar, _decompose_typevar(obj))

return (getattr, module_and_name)


Expand All @@ -830,13 +952,22 @@ def _get_bases(typ):
return getattr(typ, bases_attr)


def _make_dict_keys(obj):
return dict.fromkeys(obj).keys()
def _make_dict_keys(obj, is_ordered=False):
if is_ordered:
return OrderedDict.fromkeys(obj).keys()
else:
return dict.fromkeys(obj).keys()


def _make_dict_values(obj):
return {i: _ for i, _ in enumerate(obj)}.values()
def _make_dict_values(obj, is_ordered=False):
if is_ordered:
return OrderedDict((i, _) for i, _ in enumerate(obj)).values()
else:
return {i: _ for i, _ in enumerate(obj)}.values()


def _make_dict_items(obj):
return obj.items()
def _make_dict_items(obj, is_ordered=False):
if is_ordered:
return OrderedDict(obj).items()
else:
return obj.items()
Loading

0 comments on commit 95fc4c5

Please sign in to comment.