diff --git a/Lib/test/test_importlib/test_locks.py b/Lib/test/test_importlib/test_locks.py index b63655838c..d2577d403b 100644 --- a/Lib/test/test_importlib/test_locks.py +++ b/Lib/test/test_importlib/test_locks.py @@ -96,6 +96,7 @@ def test_deadlock(self): self.assertGreaterEqual(nb_deadlocks, 1) self.assertEqual(results.count((True, True)), len(results) - nb_deadlocks) + @unittest.skip("TODO: RUSTPYTHON, flaky test") def test_no_deadlock(self): results = self.run_deadlock_avoidance_test(False) self.assertEqual(results.count((True, False)), 0) diff --git a/Lib/test/test_threading.py b/Lib/test/test_threading.py index bdaa79bf31..b9537e4981 100644 --- a/Lib/test/test_threading.py +++ b/Lib/test/test_threading.py @@ -1329,6 +1329,11 @@ class PyRLockTests(lock_tests.RLockTests): class CRLockTests(lock_tests.RLockTests): locktype = staticmethod(threading._CRLock) + # TODO: RUSTPYTHON + @unittest.skip("TODO: RUSTPYTHON, flaky test") + def test_different_thread(self): + super().test_different_thread() + # TODO: RUSTPYTHON @unittest.expectedFailure def test_release_save_unacquired(self): diff --git a/Lib/test/test_utf8_mode.py b/Lib/test/test_utf8_mode.py new file mode 100644 index 0000000000..e5e9568702 --- /dev/null +++ b/Lib/test/test_utf8_mode.py @@ -0,0 +1,271 @@ +""" +Test the implementation of the PEP 540: the UTF-8 Mode. +""" + +import locale +import sys +import textwrap +import unittest +from test import support +from test.support.script_helper import assert_python_ok, assert_python_failure + + +MS_WINDOWS = (sys.platform == 'win32') +POSIX_LOCALES = ('C', 'POSIX') +VXWORKS = (sys.platform == "vxworks") + +class UTF8ModeTests(unittest.TestCase): + DEFAULT_ENV = { + 'PYTHONUTF8': '', + 'PYTHONLEGACYWINDOWSFSENCODING': '', + 'PYTHONCOERCECLOCALE': '0', + } + + def posix_locale(self): + loc = locale.setlocale(locale.LC_CTYPE, None) + return (loc in POSIX_LOCALES) + + def get_output(self, *args, failure=False, **kw): + kw = dict(self.DEFAULT_ENV, **kw) + if failure: + out = assert_python_failure(*args, **kw) + out = out[2] + else: + out = assert_python_ok(*args, **kw) + out = out[1] + return out.decode().rstrip("\n\r") + + @unittest.skipIf(MS_WINDOWS, 'Windows has no POSIX locale') + def test_posix_locale(self): + code = 'import sys; print(sys.flags.utf8_mode)' + + for loc in POSIX_LOCALES: + with self.subTest(LC_ALL=loc): + out = self.get_output('-c', code, LC_ALL=loc) + self.assertEqual(out, '1') + + # TODO: RUSTPYTHON + @unittest.expectedFailure + def test_xoption(self): + code = 'import sys; print(sys.flags.utf8_mode)' + + out = self.get_output('-X', 'utf8', '-c', code) + self.assertEqual(out, '1') + + # undocumented but accepted syntax: -X utf8=1 + out = self.get_output('-X', 'utf8=1', '-c', code) + self.assertEqual(out, '1') + + out = self.get_output('-X', 'utf8=0', '-c', code) + self.assertEqual(out, '0') + + if MS_WINDOWS: + # PYTHONLEGACYWINDOWSFSENCODING disables the UTF-8 Mode + # and has the priority over -X utf8 + out = self.get_output('-X', 'utf8', '-c', code, + PYTHONLEGACYWINDOWSFSENCODING='1') + self.assertEqual(out, '0') + + # TODO: RUSTPYTHON + @unittest.expectedFailure + def test_env_var(self): + code = 'import sys; print(sys.flags.utf8_mode)' + + out = self.get_output('-c', code, PYTHONUTF8='1') + self.assertEqual(out, '1') + + out = self.get_output('-c', code, PYTHONUTF8='0') + self.assertEqual(out, '0') + + # -X utf8 has the priority over PYTHONUTF8 + out = self.get_output('-X', 'utf8=0', '-c', code, PYTHONUTF8='1') + self.assertEqual(out, '0') + + if MS_WINDOWS: + # PYTHONLEGACYWINDOWSFSENCODING disables the UTF-8 mode + # and has the priority over PYTHONUTF8 + out = self.get_output('-X', 'utf8', '-c', code, PYTHONUTF8='1', + PYTHONLEGACYWINDOWSFSENCODING='1') + self.assertEqual(out, '0') + + # Cannot test with the POSIX locale, since the POSIX locale enables + # the UTF-8 mode + if not self.posix_locale(): + # PYTHONUTF8 should be ignored if -E is used + out = self.get_output('-E', '-c', code, PYTHONUTF8='1') + self.assertEqual(out, '0') + + # invalid mode + out = self.get_output('-c', code, PYTHONUTF8='xxx', failure=True) + self.assertIn('invalid PYTHONUTF8 environment variable value', + out.rstrip()) + + def test_filesystemencoding(self): + code = textwrap.dedent(''' + import sys + print("{}/{}".format(sys.getfilesystemencoding(), + sys.getfilesystemencodeerrors())) + ''') + + if MS_WINDOWS: + expected = 'utf-8/surrogatepass' + else: + expected = 'utf-8/surrogateescape' + + out = self.get_output('-X', 'utf8', '-c', code) + self.assertEqual(out, expected) + + if MS_WINDOWS: + # PYTHONLEGACYWINDOWSFSENCODING disables the UTF-8 mode + # and has the priority over -X utf8 and PYTHONUTF8 + out = self.get_output('-X', 'utf8', '-c', code, + PYTHONUTF8='strict', + PYTHONLEGACYWINDOWSFSENCODING='1') + self.assertEqual(out, 'mbcs/replace') + + # TODO: RUSTPYTHON + if MS_WINDOWS: + test_filesystemencoding = unittest.expectedFailure(test_filesystemencoding) + + # TODO: RUSTPYTHON + @unittest.expectedFailure + def test_stdio(self): + code = textwrap.dedent(''' + import sys + print(f"stdin: {sys.stdin.encoding}/{sys.stdin.errors}") + print(f"stdout: {sys.stdout.encoding}/{sys.stdout.errors}") + print(f"stderr: {sys.stderr.encoding}/{sys.stderr.errors}") + ''') + + out = self.get_output('-X', 'utf8', '-c', code, + PYTHONIOENCODING='') + self.assertEqual(out.splitlines(), + ['stdin: utf-8/surrogateescape', + 'stdout: utf-8/surrogateescape', + 'stderr: utf-8/backslashreplace']) + + # PYTHONIOENCODING has the priority over PYTHONUTF8 + out = self.get_output('-X', 'utf8', '-c', code, + PYTHONIOENCODING="latin1") + self.assertEqual(out.splitlines(), + ['stdin: iso8859-1/strict', + 'stdout: iso8859-1/strict', + 'stderr: iso8859-1/backslashreplace']) + + out = self.get_output('-X', 'utf8', '-c', code, + PYTHONIOENCODING=":namereplace") + self.assertEqual(out.splitlines(), + ['stdin: utf-8/namereplace', + 'stdout: utf-8/namereplace', + 'stderr: utf-8/backslashreplace']) + + # TODO: RUSTPYTHON + @unittest.expectedFailure + def test_io(self): + code = textwrap.dedent(''' + import sys + filename = sys.argv[1] + with open(filename) as fp: + print(f"{fp.encoding}/{fp.errors}") + ''') + filename = __file__ + + out = self.get_output('-c', code, filename, PYTHONUTF8='1') + self.assertEqual(out, 'UTF-8/strict') + + def _check_io_encoding(self, module, encoding=None, errors=None): + filename = __file__ + + # Encoding explicitly set + args = [] + if encoding: + args.append(f'encoding={encoding!r}') + if errors: + args.append(f'errors={errors!r}') + code = textwrap.dedent(''' + import sys + from %s import open + filename = sys.argv[1] + with open(filename, %s) as fp: + print(f"{fp.encoding}/{fp.errors}") + ''') % (module, ', '.join(args)) + out = self.get_output('-c', code, filename, + PYTHONUTF8='1') + + if not encoding: + encoding = 'UTF-8' + if not errors: + errors = 'strict' + self.assertEqual(out, f'{encoding}/{errors}') + + def check_io_encoding(self, module): + self._check_io_encoding(module, encoding="latin1") + self._check_io_encoding(module, errors="namereplace") + self._check_io_encoding(module, + encoding="latin1", errors="namereplace") + + # TODO: RUSTPYTHON + @unittest.expectedFailure + def test_io_encoding(self): + self.check_io_encoding('io') + + def test_pyio_encoding(self): + self.check_io_encoding('_pyio') + + def test_locale_getpreferredencoding(self): + code = 'import locale; print(locale.getpreferredencoding(False), locale.getpreferredencoding(True))' + out = self.get_output('-X', 'utf8', '-c', code) + self.assertEqual(out, 'UTF-8 UTF-8') + + for loc in POSIX_LOCALES: + with self.subTest(LC_ALL=loc): + out = self.get_output('-X', 'utf8', '-c', code, LC_ALL=loc) + self.assertEqual(out, 'UTF-8 UTF-8') + + # TODO: RUSTPYTHON + @unittest.expectedFailure + @unittest.skipIf(MS_WINDOWS, 'test specific to Unix') + def test_cmd_line(self): + arg = 'h\xe9\u20ac'.encode('utf-8') + arg_utf8 = arg.decode('utf-8') + arg_ascii = arg.decode('ascii', 'surrogateescape') + code = 'import locale, sys; print("%s:%s" % (locale.getpreferredencoding(), ascii(sys.argv[1:])))' + + def check(utf8_opt, expected, **kw): + out = self.get_output('-X', utf8_opt, '-c', code, arg, **kw) + args = out.partition(':')[2].rstrip() + self.assertEqual(args, ascii(expected), out) + + check('utf8', [arg_utf8]) + for loc in POSIX_LOCALES: + with self.subTest(LC_ALL=loc): + check('utf8', [arg_utf8], LC_ALL=loc) + + if sys.platform == 'darwin' or support.is_android or VXWORKS: + c_arg = arg_utf8 + elif sys.platform.startswith("aix"): + c_arg = arg.decode('iso-8859-1') + else: + c_arg = arg_ascii + for loc in POSIX_LOCALES: + with self.subTest(LC_ALL=loc): + check('utf8=0', [c_arg], LC_ALL=loc) + + def test_optim_level(self): + # CPython: check that Py_Main() doesn't increment Py_OptimizeFlag + # twice when -X utf8 requires to parse the configuration twice (when + # the encoding changes after reading the configuration, the + # configuration is read again with the new encoding). + code = 'import sys; print(sys.flags.optimize)' + out = self.get_output('-X', 'utf8', '-O', '-c', code) + self.assertEqual(out, '1') + out = self.get_output('-X', 'utf8', '-OO', '-c', code) + self.assertEqual(out, '2') + + code = 'import sys; print(sys.flags.ignore_environment)' + out = self.get_output('-X', 'utf8', '-E', '-c', code) + self.assertEqual(out, '1') + + +if __name__ == "__main__": + unittest.main() diff --git a/Lib/test/test_utf8source.py b/Lib/test/test_utf8source.py new file mode 100644 index 0000000000..e2f745c259 --- /dev/null +++ b/Lib/test/test_utf8source.py @@ -0,0 +1,47 @@ +# This file is marked as binary in the CVS, to prevent MacCVS from recoding it. + +import unittest + +class PEP3120Test(unittest.TestCase): + + def test_pep3120(self): + self.assertEqual( + "Питон".encode("utf-8"), + b'\xd0\x9f\xd0\xb8\xd1\x82\xd0\xbe\xd0\xbd' + ) + self.assertEqual( + "\П".encode("utf-8"), + b'\\\xd0\x9f' + ) + + # TODO: RUSTPYTHON + @unittest.expectedFailure # "badsyntax_pep3120.py" may make the WASM CI fail + def test_badsyntax(self): + try: + import test.badsyntax_pep3120 + except SyntaxError as msg: + msg = str(msg).lower() + self.assertTrue('utf-8' in msg) + else: + self.fail("expected exception didn't occur") + + +class BuiltinCompileTests(unittest.TestCase): + + # TODO: RUSTPYTHON + @unittest.expectedFailure + # Issue 3574. + def test_latin1(self): + # Allow compile() to read Latin-1 source. + source_code = '# coding: Latin-1\nu = "Ç"\n'.encode("Latin-1") + try: + code = compile(source_code, '', 'exec') + except SyntaxError: + self.fail("compile() cannot handle Latin-1 source") + ns = {} + exec(code, ns) + self.assertEqual('Ç', ns['u']) + + +if __name__ == "__main__": + unittest.main() diff --git a/Lib/test/test_uu.py b/Lib/test/test_uu.py new file mode 100644 index 0000000000..55e075ad9a --- /dev/null +++ b/Lib/test/test_uu.py @@ -0,0 +1,275 @@ +""" +Tests for uu module. +Nick Mathewson +""" + +import unittest +from test import support + +import os +import stat +import sys +import uu +import io + +plaintext = b"The symbols on top of your keyboard are !@#$%^&*()_+|~\n" + +encodedtext = b"""\ +M5&AE('-Y;6)O;',@;VX@=&]P(&]F('EO=7(@:V5Y8F]A" % self.arg + def __eq__(self, other): + if isinstance(other, Object): + return self.arg == other.arg + return NotImplemented + def __lt__(self, other): + if isinstance(other, Object): + return self.arg < other.arg + return NotImplemented + def __hash__(self): + return hash(self.arg) + def some_method(self): + return 4 + def other_method(self): + return 5 + + +class RefCycle: + def __init__(self): + self.cycle = self + + +class TestBase(unittest.TestCase): + + def setUp(self): + self.cbcalled = 0 + + def callback(self, ref): + self.cbcalled += 1 + + +@contextlib.contextmanager +def collect_in_thread(period=0.0001): + """ + Ensure GC collections happen in a different thread, at a high frequency. + """ + please_stop = False + + def collect(): + while not please_stop: + time.sleep(period) + gc.collect() + + with support.disable_gc(): + t = threading.Thread(target=collect) + t.start() + try: + yield + finally: + please_stop = True + t.join() + + +class ReferencesTestCase(TestBase): + + def test_basic_ref(self): + self.check_basic_ref(C) + self.check_basic_ref(create_function) + self.check_basic_ref(create_bound_method) + + # Just make sure the tp_repr handler doesn't raise an exception. + # Live reference: + o = C() + wr = weakref.ref(o) + repr(wr) + # Dead reference: + del o + repr(wr) + + # TODO: RUSTPYTHON + @unittest.expectedFailure + def test_basic_callback(self): + self.check_basic_callback(C) + self.check_basic_callback(create_function) + self.check_basic_callback(create_bound_method) + + @support.cpython_only + def test_cfunction(self): + import _testcapi + create_cfunction = _testcapi.create_cfunction + f = create_cfunction() + wr = weakref.ref(f) + self.assertIs(wr(), f) + del f + self.assertIsNone(wr()) + self.check_basic_ref(create_cfunction) + self.check_basic_callback(create_cfunction) + + # TODO: RUSTPYTHON + @unittest.expectedFailure + def test_multiple_callbacks(self): + o = C() + ref1 = weakref.ref(o, self.callback) + ref2 = weakref.ref(o, self.callback) + del o + self.assertIsNone(ref1(), "expected reference to be invalidated") + self.assertIsNone(ref2(), "expected reference to be invalidated") + self.assertEqual(self.cbcalled, 2, + "callback not called the right number of times") + + def test_multiple_selfref_callbacks(self): + # Make sure all references are invalidated before callbacks are called + # + # What's important here is that we're using the first + # reference in the callback invoked on the second reference + # (the most recently created ref is cleaned up first). This + # tests that all references to the object are invalidated + # before any of the callbacks are invoked, so that we only + # have one invocation of _weakref.c:cleanup_helper() active + # for a particular object at a time. + # + def callback(object, self=self): + self.ref() + c = C() + self.ref = weakref.ref(c, callback) + ref1 = weakref.ref(c, callback) + del c + + def test_constructor_kwargs(self): + c = C() + self.assertRaises(TypeError, weakref.ref, c, callback=None) + + @unittest.skip("TODO: RUSTPYTHON, thread 'main' panicked at 'Passed a callback to weakproxy, but weakproxy does not yet support proxies.'") + def test_proxy_ref(self): + o = C() + o.bar = 1 + ref1 = weakref.proxy(o, self.callback) + ref2 = weakref.proxy(o, self.callback) + del o + + def check(proxy): + proxy.bar + + self.assertRaises(ReferenceError, check, ref1) + self.assertRaises(ReferenceError, check, ref2) + self.assertRaises(ReferenceError, bool, weakref.proxy(C())) + self.assertEqual(self.cbcalled, 2) + + def check_basic_ref(self, factory): + o = factory() + ref = weakref.ref(o) + self.assertIsNotNone(ref(), + "weak reference to live object should be live") + o2 = ref() + self.assertIs(o, o2, + "() should return original object if live") + + def check_basic_callback(self, factory): + self.cbcalled = 0 + o = factory() + ref = weakref.ref(o, self.callback) + del o + self.assertEqual(self.cbcalled, 1, + "callback did not properly set 'cbcalled'") + self.assertIsNone(ref(), + "ref2 should be dead after deleting object reference") + + # TODO: RUSTPYTHON + @unittest.expectedFailure + def test_ref_reuse(self): + o = C() + ref1 = weakref.ref(o) + # create a proxy to make sure that there's an intervening creation + # between these two; it should make no difference + proxy = weakref.proxy(o) + ref2 = weakref.ref(o) + self.assertIs(ref1, ref2, + "reference object w/out callback should be re-used") + + o = C() + proxy = weakref.proxy(o) + ref1 = weakref.ref(o) + ref2 = weakref.ref(o) + self.assertIs(ref1, ref2, + "reference object w/out callback should be re-used") + self.assertEqual(weakref.getweakrefcount(o), 2, + "wrong weak ref count for object") + del proxy + self.assertEqual(weakref.getweakrefcount(o), 1, + "wrong weak ref count for object after deleting proxy") + + # TODO: RUSTPYTHON + @unittest.expectedFailure + def test_proxy_reuse(self): + o = C() + proxy1 = weakref.proxy(o) + ref = weakref.ref(o) + proxy2 = weakref.proxy(o) + self.assertIs(proxy1, proxy2, + "proxy object w/out callback should have been re-used") + + # TODO: RUSTPYTHON + @unittest.expectedFailure + def test_basic_proxy(self): + o = C() + self.check_proxy(o, weakref.proxy(o)) + + L = collections.UserList() + p = weakref.proxy(L) + self.assertFalse(p, "proxy for empty UserList should be false") + p.append(12) + self.assertEqual(len(L), 1) + self.assertTrue(p, "proxy for non-empty UserList should be true") + p[:] = [2, 3] + self.assertEqual(len(L), 2) + self.assertEqual(len(p), 2) + self.assertIn(3, p, "proxy didn't support __contains__() properly") + p[1] = 5 + self.assertEqual(L[1], 5) + self.assertEqual(p[1], 5) + L2 = collections.UserList(L) + p2 = weakref.proxy(L2) + self.assertEqual(p, p2) + ## self.assertEqual(repr(L2), repr(p2)) + L3 = collections.UserList(range(10)) + p3 = weakref.proxy(L3) + self.assertEqual(L3[:], p3[:]) + self.assertEqual(L3[5:], p3[5:]) + self.assertEqual(L3[:5], p3[:5]) + self.assertEqual(L3[2:5], p3[2:5]) + + # TODO: RUSTPYTHON + @unittest.expectedFailure + def test_proxy_unicode(self): + # See bug 5037 + class C(object): + def __str__(self): + return "string" + def __bytes__(self): + return b"bytes" + instance = C() + self.assertIn("__bytes__", dir(weakref.proxy(instance))) + self.assertEqual(bytes(weakref.proxy(instance)), b"bytes") + + # TODO: RUSTPYTHON + @unittest.expectedFailure + def test_proxy_index(self): + class C: + def __index__(self): + return 10 + o = C() + p = weakref.proxy(o) + self.assertEqual(operator.index(p), 10) + + # TODO: RUSTPYTHON + @unittest.expectedFailure + def test_proxy_div(self): + class C: + def __floordiv__(self, other): + return 42 + def __ifloordiv__(self, other): + return 21 + o = C() + p = weakref.proxy(o) + self.assertEqual(p // 5, 42) + p //= 5 + self.assertEqual(p, 21) + + # TODO: RUSTPYTHON + @unittest.expectedFailure + def test_proxy_matmul(self): + class C: + def __matmul__(self, other): + return 1729 + def __rmatmul__(self, other): + return -163 + def __imatmul__(self, other): + return 561 + o = C() + p = weakref.proxy(o) + self.assertEqual(p @ 5, 1729) + self.assertEqual(5 @ p, -163) + p @= 5 + self.assertEqual(p, 561) + + # The PyWeakref_* C API is documented as allowing either NULL or + # None as the value for the callback, where either means "no + # callback". The "no callback" ref and proxy objects are supposed + # to be shared so long as they exist by all callers so long as + # they are active. In Python 2.3.3 and earlier, this guarantee + # was not honored, and was broken in different ways for + # PyWeakref_NewRef() and PyWeakref_NewProxy(). (Two tests.) + + # TODO: RUSTPYTHON + @unittest.expectedFailure + def test_shared_ref_without_callback(self): + self.check_shared_without_callback(weakref.ref) + + @unittest.skip("TODO: RUSTPYTHON, thread 'main' panicked at 'Passed a callback to weakproxy, but weakproxy does not yet support proxies.'") + def test_shared_proxy_without_callback(self): + self.check_shared_without_callback(weakref.proxy) + + def check_shared_without_callback(self, makeref): + o = Object(1) + p1 = makeref(o, None) + p2 = makeref(o, None) + self.assertIs(p1, p2, "both callbacks were None in the C API") + del p1, p2 + p1 = makeref(o) + p2 = makeref(o, None) + self.assertIs(p1, p2, "callbacks were NULL, None in the C API") + del p1, p2 + p1 = makeref(o) + p2 = makeref(o) + self.assertIs(p1, p2, "both callbacks were NULL in the C API") + del p1, p2 + p1 = makeref(o, None) + p2 = makeref(o) + self.assertIs(p1, p2, "callbacks were None, NULL in the C API") + + # TODO: RUSTPYTHON + @unittest.expectedFailure + def test_callable_proxy(self): + o = Callable() + ref1 = weakref.proxy(o) + + self.check_proxy(o, ref1) + + self.assertIs(type(ref1), weakref.CallableProxyType, + "proxy is not of callable type") + ref1('twinkies!') + self.assertEqual(o.bar, 'twinkies!', + "call through proxy not passed through to original") + ref1(x='Splat.') + self.assertEqual(o.bar, 'Splat.', + "call through proxy not passed through to original") + + # expect due to too few args + self.assertRaises(TypeError, ref1) + + # expect due to too many args + self.assertRaises(TypeError, ref1, 1, 2, 3) + + def check_proxy(self, o, proxy): + o.foo = 1 + self.assertEqual(proxy.foo, 1, + "proxy does not reflect attribute addition") + o.foo = 2 + self.assertEqual(proxy.foo, 2, + "proxy does not reflect attribute modification") + del o.foo + self.assertFalse(hasattr(proxy, 'foo'), + "proxy does not reflect attribute removal") + + proxy.foo = 1 + self.assertEqual(o.foo, 1, + "object does not reflect attribute addition via proxy") + proxy.foo = 2 + self.assertEqual(o.foo, 2, + "object does not reflect attribute modification via proxy") + del proxy.foo + self.assertFalse(hasattr(o, 'foo'), + "object does not reflect attribute removal via proxy") + + # TODO: RUSTPYTHON + @unittest.expectedFailure + def test_proxy_deletion(self): + # Test clearing of SF bug #762891 + class Foo: + result = None + def __delitem__(self, accessor): + self.result = accessor + g = Foo() + f = weakref.proxy(g) + del f[0] + self.assertEqual(f.result, 0) + + # TODO: RUSTPYTHON + @unittest.expectedFailure + def test_proxy_bool(self): + # Test clearing of SF bug #1170766 + class List(list): pass + lyst = List() + self.assertEqual(bool(weakref.proxy(lyst)), bool(lyst)) + + def test_proxy_iter(self): + # Test fails with a debug build of the interpreter + # (see bpo-38395). + + obj = None + + class MyObj: + def __iter__(self): + nonlocal obj + del obj + return NotImplemented + + obj = MyObj() + p = weakref.proxy(obj) + with self.assertRaises(TypeError): + # "blech" in p calls MyObj.__iter__ through the proxy, + # without keeping a reference to the real object, so it + # can be killed in the middle of the call + "blech" in p + + @unittest.skip("TODO: RUSTPYTHON, thread 'main' panicked at 'Passed a callback to weakproxy, but weakproxy does not yet support proxies.'") + def test_getweakrefcount(self): + o = C() + ref1 = weakref.ref(o) + ref2 = weakref.ref(o, self.callback) + self.assertEqual(weakref.getweakrefcount(o), 2, + "got wrong number of weak reference objects") + + proxy1 = weakref.proxy(o) + proxy2 = weakref.proxy(o, self.callback) + self.assertEqual(weakref.getweakrefcount(o), 4, + "got wrong number of weak reference objects") + + del ref1, ref2, proxy1, proxy2 + self.assertEqual(weakref.getweakrefcount(o), 0, + "weak reference objects not unlinked from" + " referent when discarded.") + + # assumes ints do not support weakrefs + self.assertEqual(weakref.getweakrefcount(1), 0, + "got wrong number of weak reference objects for int") + + # TODO: RUSTPYTHON + @unittest.expectedFailure + def test_getweakrefs(self): + o = C() + ref1 = weakref.ref(o, self.callback) + ref2 = weakref.ref(o, self.callback) + del ref1 + self.assertEqual(weakref.getweakrefs(o), [ref2], + "list of refs does not match") + + o = C() + ref1 = weakref.ref(o, self.callback) + ref2 = weakref.ref(o, self.callback) + del ref2 + self.assertEqual(weakref.getweakrefs(o), [ref1], + "list of refs does not match") + + del ref1 + self.assertEqual(weakref.getweakrefs(o), [], + "list of refs not cleared") + + # assumes ints do not support weakrefs + self.assertEqual(weakref.getweakrefs(1), [], + "list of refs does not match for int") + + # TODO: RUSTPYTHON + @unittest.expectedFailure + def test_newstyle_number_ops(self): + class F(float): + pass + f = F(2.0) + p = weakref.proxy(f) + self.assertEqual(p + 1.0, 3.0) + self.assertEqual(1.0 + p, 3.0) # this used to SEGV + + def test_callbacks_protected(self): + # Callbacks protected from already-set exceptions? + # Regression test for SF bug #478534. + class BogusError(Exception): + pass + data = {} + def remove(k): + del data[k] + def encapsulate(): + f = lambda : () + data[weakref.ref(f, remove)] = None + raise BogusError + try: + encapsulate() + except BogusError: + pass + else: + self.fail("exception not properly restored") + try: + encapsulate() + except BogusError: + pass + else: + self.fail("exception not properly restored") + + def test_sf_bug_840829(self): + # "weakref callbacks and gc corrupt memory" + # subtype_dealloc erroneously exposed a new-style instance + # already in the process of getting deallocated to gc, + # causing double-deallocation if the instance had a weakref + # callback that triggered gc. + # If the bug exists, there probably won't be an obvious symptom + # in a release build. In a debug build, a segfault will occur + # when the second attempt to remove the instance from the "list + # of all objects" occurs. + + import gc + + class C(object): + pass + + c = C() + wr = weakref.ref(c, lambda ignore: gc.collect()) + del c + + # There endeth the first part. It gets worse. + del wr + + c1 = C() + c1.i = C() + wr = weakref.ref(c1.i, lambda ignore: gc.collect()) + + c2 = C() + c2.c1 = c1 + del c1 # still alive because c2 points to it + + # Now when subtype_dealloc gets called on c2, it's not enough just + # that c2 is immune from gc while the weakref callbacks associated + # with c2 execute (there are none in this 2nd half of the test, btw). + # subtype_dealloc goes on to call the base classes' deallocs too, + # so any gc triggered by weakref callbacks associated with anything + # torn down by a base class dealloc can also trigger double + # deallocation of c2. + del c2 + + def test_callback_in_cycle_1(self): + import gc + + class J(object): + pass + + class II(object): + def acallback(self, ignore): + self.J + + I = II() + I.J = J + I.wr = weakref.ref(J, I.acallback) + + # Now J and II are each in a self-cycle (as all new-style class + # objects are, since their __mro__ points back to them). I holds + # both a weak reference (I.wr) and a strong reference (I.J) to class + # J. I is also in a cycle (I.wr points to a weakref that references + # I.acallback). When we del these three, they all become trash, but + # the cycles prevent any of them from getting cleaned up immediately. + # Instead they have to wait for cyclic gc to deduce that they're + # trash. + # + # gc used to call tp_clear on all of them, and the order in which + # it does that is pretty accidental. The exact order in which we + # built up these things manages to provoke gc into running tp_clear + # in just the right order (I last). Calling tp_clear on II leaves + # behind an insane class object (its __mro__ becomes NULL). Calling + # tp_clear on J breaks its self-cycle, but J doesn't get deleted + # just then because of the strong reference from I.J. Calling + # tp_clear on I starts to clear I's __dict__, and just happens to + # clear I.J first -- I.wr is still intact. That removes the last + # reference to J, which triggers the weakref callback. The callback + # tries to do "self.J", and instances of new-style classes look up + # attributes ("J") in the class dict first. The class (II) wants to + # search II.__mro__, but that's NULL. The result was a segfault in + # a release build, and an assert failure in a debug build. + del I, J, II + gc.collect() + + def test_callback_in_cycle_2(self): + import gc + + # This is just like test_callback_in_cycle_1, except that II is an + # old-style class. The symptom is different then: an instance of an + # old-style class looks in its own __dict__ first. 'J' happens to + # get cleared from I.__dict__ before 'wr', and 'J' was never in II's + # __dict__, so the attribute isn't found. The difference is that + # the old-style II doesn't have a NULL __mro__ (it doesn't have any + # __mro__), so no segfault occurs. Instead it got: + # test_callback_in_cycle_2 (__main__.ReferencesTestCase) ... + # Exception exceptions.AttributeError: + # "II instance has no attribute 'J'" in > ignored + + class J(object): + pass + + class II: + def acallback(self, ignore): + self.J + + I = II() + I.J = J + I.wr = weakref.ref(J, I.acallback) + + del I, J, II + gc.collect() + + def test_callback_in_cycle_3(self): + import gc + + # This one broke the first patch that fixed the last two. In this + # case, the objects reachable from the callback aren't also reachable + # from the object (c1) *triggering* the callback: you can get to + # c1 from c2, but not vice-versa. The result was that c2's __dict__ + # got tp_clear'ed by the time the c2.cb callback got invoked. + + class C: + def cb(self, ignore): + self.me + self.c1 + self.wr + + c1, c2 = C(), C() + + c2.me = c2 + c2.c1 = c1 + c2.wr = weakref.ref(c1, c2.cb) + + del c1, c2 + gc.collect() + + def test_callback_in_cycle_4(self): + import gc + + # Like test_callback_in_cycle_3, except c2 and c1 have different + # classes. c2's class (C) isn't reachable from c1 then, so protecting + # objects reachable from the dying object (c1) isn't enough to stop + # c2's class (C) from getting tp_clear'ed before c2.cb is invoked. + # The result was a segfault (C.__mro__ was NULL when the callback + # tried to look up self.me). + + class C(object): + def cb(self, ignore): + self.me + self.c1 + self.wr + + class D: + pass + + c1, c2 = D(), C() + + c2.me = c2 + c2.c1 = c1 + c2.wr = weakref.ref(c1, c2.cb) + + del c1, c2, C, D + gc.collect() + + # TODO: RUSTPYTHON + @unittest.expectedFailure + @support.requires_type_collecting + def test_callback_in_cycle_resurrection(self): + import gc + + # Do something nasty in a weakref callback: resurrect objects + # from dead cycles. For this to be attempted, the weakref and + # its callback must also be part of the cyclic trash (else the + # objects reachable via the callback couldn't be in cyclic trash + # to begin with -- the callback would act like an external root). + # But gc clears trash weakrefs with callbacks early now, which + # disables the callbacks, so the callbacks shouldn't get called + # at all (and so nothing actually gets resurrected). + + alist = [] + class C(object): + def __init__(self, value): + self.attribute = value + + def acallback(self, ignore): + alist.append(self.c) + + c1, c2 = C(1), C(2) + c1.c = c2 + c2.c = c1 + c1.wr = weakref.ref(c2, c1.acallback) + c2.wr = weakref.ref(c1, c2.acallback) + + def C_went_away(ignore): + alist.append("C went away") + wr = weakref.ref(C, C_went_away) + + del c1, c2, C # make them all trash + self.assertEqual(alist, []) # del isn't enough to reclaim anything + + gc.collect() + # c1.wr and c2.wr were part of the cyclic trash, so should have + # been cleared without their callbacks executing. OTOH, the weakref + # to C is bound to a function local (wr), and wasn't trash, so that + # callback should have been invoked when C went away. + self.assertEqual(alist, ["C went away"]) + # The remaining weakref should be dead now (its callback ran). + self.assertEqual(wr(), None) + + del alist[:] + gc.collect() + self.assertEqual(alist, []) + + # TODO: RUSTPYTHON + @unittest.expectedFailure + def test_callbacks_on_callback(self): + import gc + + # Set up weakref callbacks *on* weakref callbacks. + alist = [] + def safe_callback(ignore): + alist.append("safe_callback called") + + class C(object): + def cb(self, ignore): + alist.append("cb called") + + c, d = C(), C() + c.other = d + d.other = c + callback = c.cb + c.wr = weakref.ref(d, callback) # this won't trigger + d.wr = weakref.ref(callback, d.cb) # ditto + external_wr = weakref.ref(callback, safe_callback) # but this will + self.assertIs(external_wr(), callback) + + # The weakrefs attached to c and d should get cleared, so that + # C.cb is never called. But external_wr isn't part of the cyclic + # trash, and no cyclic trash is reachable from it, so safe_callback + # should get invoked when the bound method object callback (c.cb) + # -- which is itself a callback, and also part of the cyclic trash -- + # gets reclaimed at the end of gc. + + del callback, c, d, C + self.assertEqual(alist, []) # del isn't enough to clean up cycles + gc.collect() + self.assertEqual(alist, ["safe_callback called"]) + self.assertEqual(external_wr(), None) + + del alist[:] + gc.collect() + self.assertEqual(alist, []) + + # TODO: RUSTPYTHON + @unittest.expectedFailure + def test_gc_during_ref_creation(self): + self.check_gc_during_creation(weakref.ref) + + # TODO: RUSTPYTHON + @unittest.expectedFailure + def test_gc_during_proxy_creation(self): + self.check_gc_during_creation(weakref.proxy) + + def check_gc_during_creation(self, makeref): + thresholds = gc.get_threshold() + gc.set_threshold(1, 1, 1) + gc.collect() + class A: + pass + + def callback(*args): + pass + + referenced = A() + + a = A() + a.a = a + a.wr = makeref(referenced) + + try: + # now make sure the object and the ref get labeled as + # cyclic trash: + a = A() + weakref.ref(referenced, callback) + + finally: + gc.set_threshold(*thresholds) + + def test_ref_created_during_del(self): + # Bug #1377858 + # A weakref created in an object's __del__() would crash the + # interpreter when the weakref was cleaned up since it would refer to + # non-existent memory. This test should not segfault the interpreter. + class Target(object): + def __del__(self): + global ref_from_del + ref_from_del = weakref.ref(self) + + w = Target() + + # TODO: RUSTPYTHON + @unittest.expectedFailure + def test_init(self): + # Issue 3634 + # .__init__() doesn't check errors correctly + r = weakref.ref(Exception) + self.assertRaises(TypeError, r.__init__, 0, 0, 0, 0, 0) + # No exception should be raised here + gc.collect() + + # TODO: RUSTPYTHON + @unittest.expectedFailure + def test_classes(self): + # Check that classes are weakrefable. + class A(object): + pass + l = [] + weakref.ref(int) + a = weakref.ref(A, l.append) + A = None + gc.collect() + self.assertEqual(a(), None) + self.assertEqual(l, [a]) + + def test_equality(self): + # Alive weakrefs defer equality testing to their underlying object. + x = Object(1) + y = Object(1) + z = Object(2) + a = weakref.ref(x) + b = weakref.ref(y) + c = weakref.ref(z) + d = weakref.ref(x) + # Note how we directly test the operators here, to stress both + # __eq__ and __ne__. + self.assertTrue(a == b) + self.assertFalse(a != b) + self.assertFalse(a == c) + self.assertTrue(a != c) + self.assertTrue(a == d) + self.assertFalse(a != d) + del x, y, z + gc.collect() + for r in a, b, c: + # Sanity check + self.assertIs(r(), None) + # Dead weakrefs compare by identity: whether `a` and `d` are the + # same weakref object is an implementation detail, since they pointed + # to the same original object and didn't have a callback. + # (see issue #16453). + self.assertFalse(a == b) + self.assertTrue(a != b) + self.assertFalse(a == c) + self.assertTrue(a != c) + self.assertEqual(a == d, a is d) + self.assertEqual(a != d, a is not d) + + def test_ordering(self): + # weakrefs cannot be ordered, even if the underlying objects can. + ops = [operator.lt, operator.gt, operator.le, operator.ge] + x = Object(1) + y = Object(1) + a = weakref.ref(x) + b = weakref.ref(y) + for op in ops: + self.assertRaises(TypeError, op, a, b) + # Same when dead. + del x, y + gc.collect() + for op in ops: + self.assertRaises(TypeError, op, a, b) + + def test_hashing(self): + # Alive weakrefs hash the same as the underlying object + x = Object(42) + y = Object(42) + a = weakref.ref(x) + b = weakref.ref(y) + self.assertEqual(hash(a), hash(42)) + del x, y + gc.collect() + # Dead weakrefs: + # - retain their hash is they were hashed when alive; + # - otherwise, cannot be hashed. + self.assertEqual(hash(a), hash(42)) + self.assertRaises(TypeError, hash, b) + + def test_trashcan_16602(self): + # Issue #16602: when a weakref's target was part of a long + # deallocation chain, the trashcan mechanism could delay clearing + # of the weakref and make the target object visible from outside + # code even though its refcount had dropped to 0. A crash ensued. + class C: + def __init__(self, parent): + if not parent: + return + wself = weakref.ref(self) + def cb(wparent): + o = wself() + self.wparent = weakref.ref(parent, cb) + + d = weakref.WeakKeyDictionary() + root = c = C(None) + for n in range(100): + d[c] = c = C(c) + del root + gc.collect() + + # TODO: RUSTPYTHON + @unittest.expectedFailure + def test_callback_attribute(self): + x = Object(1) + callback = lambda ref: None + ref1 = weakref.ref(x, callback) + self.assertIs(ref1.__callback__, callback) + + ref2 = weakref.ref(x) + self.assertIsNone(ref2.__callback__) + + # TODO: RUSTPYTHON + @unittest.expectedFailure + def test_callback_attribute_after_deletion(self): + x = Object(1) + ref = weakref.ref(x, self.callback) + self.assertIsNotNone(ref.__callback__) + del x + support.gc_collect() + self.assertIsNone(ref.__callback__) + + def test_set_callback_attribute(self): + x = Object(1) + callback = lambda ref: None + ref1 = weakref.ref(x, callback) + with self.assertRaises(AttributeError): + ref1.__callback__ = lambda ref: None + + def test_callback_gcs(self): + class ObjectWithDel(Object): + def __del__(self): pass + x = ObjectWithDel(1) + ref1 = weakref.ref(x, lambda ref: support.gc_collect()) + del x + support.gc_collect() + + +class SubclassableWeakrefTestCase(TestBase): + + # TODO: RUSTPYTHON + @unittest.expectedFailure + def test_subclass_refs(self): + class MyRef(weakref.ref): + def __init__(self, ob, callback=None, value=42): + self.value = value + super().__init__(ob, callback) + def __call__(self): + self.called = True + return super().__call__() + o = Object("foo") + mr = MyRef(o, value=24) + self.assertIs(mr(), o) + self.assertTrue(mr.called) + self.assertEqual(mr.value, 24) + del o + self.assertIsNone(mr()) + self.assertTrue(mr.called) + + # TODO: RUSTPYTHON + @unittest.expectedFailure + def test_subclass_refs_dont_replace_standard_refs(self): + class MyRef(weakref.ref): + pass + o = Object(42) + r1 = MyRef(o) + r2 = weakref.ref(o) + self.assertIsNot(r1, r2) + self.assertEqual(weakref.getweakrefs(o), [r2, r1]) + self.assertEqual(weakref.getweakrefcount(o), 2) + r3 = MyRef(o) + self.assertEqual(weakref.getweakrefcount(o), 3) + refs = weakref.getweakrefs(o) + self.assertEqual(len(refs), 3) + self.assertIs(r2, refs[0]) + self.assertIn(r1, refs[1:]) + self.assertIn(r3, refs[1:]) + + # TODO: RUSTPYTHON + @unittest.expectedFailure + def test_subclass_refs_dont_conflate_callbacks(self): + class MyRef(weakref.ref): + pass + o = Object(42) + r1 = MyRef(o, id) + r2 = MyRef(o, str) + self.assertIsNot(r1, r2) + refs = weakref.getweakrefs(o) + self.assertIn(r1, refs) + self.assertIn(r2, refs) + + # TODO: RUSTPYTHON + @unittest.expectedFailure + def test_subclass_refs_with_slots(self): + class MyRef(weakref.ref): + __slots__ = "slot1", "slot2" + def __new__(type, ob, callback, slot1, slot2): + return weakref.ref.__new__(type, ob, callback) + def __init__(self, ob, callback, slot1, slot2): + self.slot1 = slot1 + self.slot2 = slot2 + def meth(self): + return self.slot1 + self.slot2 + o = Object(42) + r = MyRef(o, None, "abc", "def") + self.assertEqual(r.slot1, "abc") + self.assertEqual(r.slot2, "def") + self.assertEqual(r.meth(), "abcdef") + self.assertFalse(hasattr(r, "__dict__")) + + def test_subclass_refs_with_cycle(self): + """Confirm https://bugs.python.org/issue3100 is fixed.""" + # An instance of a weakref subclass can have attributes. + # If such a weakref holds the only strong reference to the object, + # deleting the weakref will delete the object. In this case, + # the callback must not be called, because the ref object is + # being deleted. + class MyRef(weakref.ref): + pass + + # Use a local callback, for "regrtest -R::" + # to detect refcounting problems + def callback(w): + self.cbcalled += 1 + + o = C() + r1 = MyRef(o, callback) + r1.o = o + del o + + del r1 # Used to crash here + + self.assertEqual(self.cbcalled, 0) + + # Same test, with two weakrefs to the same object + # (since code paths are different) + o = C() + r1 = MyRef(o, callback) + r2 = MyRef(o, callback) + r1.r = r2 + r2.o = o + del o + del r2 + + del r1 # Used to crash here + + self.assertEqual(self.cbcalled, 0) + + +class WeakMethodTestCase(unittest.TestCase): + + def _subclass(self): + """Return an Object subclass overriding `some_method`.""" + class C(Object): + def some_method(self): + return 6 + return C + + def test_alive(self): + o = Object(1) + r = weakref.WeakMethod(o.some_method) + self.assertIsInstance(r, weakref.ReferenceType) + self.assertIsInstance(r(), type(o.some_method)) + self.assertIs(r().__self__, o) + self.assertIs(r().__func__, o.some_method.__func__) + self.assertEqual(r()(), 4) + + def test_object_dead(self): + o = Object(1) + r = weakref.WeakMethod(o.some_method) + del o + gc.collect() + self.assertIs(r(), None) + + def test_method_dead(self): + C = self._subclass() + o = C(1) + r = weakref.WeakMethod(o.some_method) + del C.some_method + gc.collect() + self.assertIs(r(), None) + + # TODO: RUSTPYTHON + @unittest.expectedFailure + def test_callback_when_object_dead(self): + # Test callback behaviour when object dies first. + C = self._subclass() + calls = [] + def cb(arg): + calls.append(arg) + o = C(1) + r = weakref.WeakMethod(o.some_method, cb) + del o + gc.collect() + self.assertEqual(calls, [r]) + # Callback is only called once. + C.some_method = Object.some_method + gc.collect() + self.assertEqual(calls, [r]) + + # TODO: RUSTPYTHON + @unittest.expectedFailure + def test_callback_when_method_dead(self): + # Test callback behaviour when method dies first. + C = self._subclass() + calls = [] + def cb(arg): + calls.append(arg) + o = C(1) + r = weakref.WeakMethod(o.some_method, cb) + del C.some_method + gc.collect() + self.assertEqual(calls, [r]) + # Callback is only called once. + del o + gc.collect() + self.assertEqual(calls, [r]) + + @support.cpython_only + def test_no_cycles(self): + # A WeakMethod doesn't create any reference cycle to itself. + o = Object(1) + def cb(_): + pass + r = weakref.WeakMethod(o.some_method, cb) + wr = weakref.ref(r) + del r + self.assertIs(wr(), None) + + # TODO: RUSTPYTHON + @unittest.expectedFailure + def test_equality(self): + def _eq(a, b): + self.assertTrue(a == b) + self.assertFalse(a != b) + def _ne(a, b): + self.assertTrue(a != b) + self.assertFalse(a == b) + x = Object(1) + y = Object(1) + a = weakref.WeakMethod(x.some_method) + b = weakref.WeakMethod(y.some_method) + c = weakref.WeakMethod(x.other_method) + d = weakref.WeakMethod(y.other_method) + # Objects equal, same method + _eq(a, b) + _eq(c, d) + # Objects equal, different method + _ne(a, c) + _ne(a, d) + _ne(b, c) + _ne(b, d) + # Objects unequal, same or different method + z = Object(2) + e = weakref.WeakMethod(z.some_method) + f = weakref.WeakMethod(z.other_method) + _ne(a, e) + _ne(a, f) + _ne(b, e) + _ne(b, f) + del x, y, z + gc.collect() + # Dead WeakMethods compare by identity + refs = a, b, c, d, e, f + for q in refs: + for r in refs: + self.assertEqual(q == r, q is r) + self.assertEqual(q != r, q is not r) + + def test_hashing(self): + # Alive WeakMethods are hashable if the underlying object is + # hashable. + x = Object(1) + y = Object(1) + a = weakref.WeakMethod(x.some_method) + b = weakref.WeakMethod(y.some_method) + c = weakref.WeakMethod(y.other_method) + # Since WeakMethod objects are equal, the hashes should be equal. + self.assertEqual(hash(a), hash(b)) + ha = hash(a) + # Dead WeakMethods retain their old hash value + del x, y + gc.collect() + self.assertEqual(hash(a), ha) + self.assertEqual(hash(b), ha) + # If it wasn't hashed when alive, a dead WeakMethod cannot be hashed. + self.assertRaises(TypeError, hash, c) + + +class MappingTestCase(TestBase): + + COUNT = 10 + + def check_len_cycles(self, dict_type, cons): + N = 20 + items = [RefCycle() for i in range(N)] + dct = dict_type(cons(o) for o in items) + # Keep an iterator alive + it = dct.items() + try: + next(it) + except StopIteration: + pass + del items + gc.collect() + n1 = len(dct) + del it + gc.collect() + n2 = len(dct) + # one item may be kept alive inside the iterator + self.assertIn(n1, (0, 1)) + self.assertEqual(n2, 0) + + # TODO: RUSTPYTHON + @unittest.expectedFailure + def test_weak_keyed_len_cycles(self): + self.check_len_cycles(weakref.WeakKeyDictionary, lambda k: (k, 1)) + + # TODO: RUSTPYTHON + @unittest.expectedFailure + def test_weak_valued_len_cycles(self): + self.check_len_cycles(weakref.WeakValueDictionary, lambda k: (1, k)) + + def check_len_race(self, dict_type, cons): + # Extended sanity checks for len() in the face of cyclic collection + self.addCleanup(gc.set_threshold, *gc.get_threshold()) + for th in range(1, 100): + N = 20 + gc.collect(0) + gc.set_threshold(th, th, th) + items = [RefCycle() for i in range(N)] + dct = dict_type(cons(o) for o in items) + del items + # All items will be collected at next garbage collection pass + it = dct.items() + try: + next(it) + except StopIteration: + pass + n1 = len(dct) + del it + n2 = len(dct) + self.assertGreaterEqual(n1, 0) + self.assertLessEqual(n1, N) + self.assertGreaterEqual(n2, 0) + self.assertLessEqual(n2, n1) + + # TODO: RUSTPYTHON + @unittest.expectedFailure + def test_weak_keyed_len_race(self): + self.check_len_race(weakref.WeakKeyDictionary, lambda k: (k, 1)) + + # TODO: RUSTPYTHON + @unittest.expectedFailure + def test_weak_valued_len_race(self): + self.check_len_race(weakref.WeakValueDictionary, lambda k: (1, k)) + + # TODO: RUSTPYTHON + @unittest.expectedFailure + def test_weak_values(self): + # + # This exercises d.copy(), d.items(), d[], del d[], len(d). + # + dict, objects = self.make_weak_valued_dict() + for o in objects: + self.assertEqual(weakref.getweakrefcount(o), 1) + self.assertIs(o, dict[o.arg], + "wrong object returned by weak dict!") + items1 = list(dict.items()) + items2 = list(dict.copy().items()) + items1.sort() + items2.sort() + self.assertEqual(items1, items2, + "cloning of weak-valued dictionary did not work!") + del items1, items2 + self.assertEqual(len(dict), self.COUNT) + del objects[0] + self.assertEqual(len(dict), self.COUNT - 1, + "deleting object did not cause dictionary update") + del objects, o + self.assertEqual(len(dict), 0, + "deleting the values did not clear the dictionary") + # regression on SF bug #447152: + dict = weakref.WeakValueDictionary() + self.assertRaises(KeyError, dict.__getitem__, 1) + dict[2] = C() + self.assertRaises(KeyError, dict.__getitem__, 2) + + # TODO: RUSTPYTHON + @unittest.expectedFailure + def test_weak_keys(self): + # + # This exercises d.copy(), d.items(), d[] = v, d[], del d[], + # len(d), k in d. + # + dict, objects = self.make_weak_keyed_dict() + for o in objects: + self.assertEqual(weakref.getweakrefcount(o), 1, + "wrong number of weak references to %r!" % o) + self.assertIs(o.arg, dict[o], + "wrong object returned by weak dict!") + items1 = dict.items() + items2 = dict.copy().items() + self.assertEqual(set(items1), set(items2), + "cloning of weak-keyed dictionary did not work!") + del items1, items2 + self.assertEqual(len(dict), self.COUNT) + del objects[0] + self.assertEqual(len(dict), (self.COUNT - 1), + "deleting object did not cause dictionary update") + del objects, o + self.assertEqual(len(dict), 0, + "deleting the keys did not clear the dictionary") + o = Object(42) + dict[o] = "What is the meaning of the universe?" + self.assertIn(o, dict) + self.assertNotIn(34, dict) + + def test_weak_keyed_iters(self): + dict, objects = self.make_weak_keyed_dict() + self.check_iters(dict) + + # Test keyrefs() + refs = dict.keyrefs() + self.assertEqual(len(refs), len(objects)) + objects2 = list(objects) + for wr in refs: + ob = wr() + self.assertIn(ob, dict) + self.assertIn(ob, dict) + self.assertEqual(ob.arg, dict[ob]) + objects2.remove(ob) + self.assertEqual(len(objects2), 0) + + # Test iterkeyrefs() + objects2 = list(objects) + self.assertEqual(len(list(dict.keyrefs())), len(objects)) + for wr in dict.keyrefs(): + ob = wr() + self.assertIn(ob, dict) + self.assertIn(ob, dict) + self.assertEqual(ob.arg, dict[ob]) + objects2.remove(ob) + self.assertEqual(len(objects2), 0) + + def test_weak_valued_iters(self): + dict, objects = self.make_weak_valued_dict() + self.check_iters(dict) + + # Test valuerefs() + refs = dict.valuerefs() + self.assertEqual(len(refs), len(objects)) + objects2 = list(objects) + for wr in refs: + ob = wr() + self.assertEqual(ob, dict[ob.arg]) + self.assertEqual(ob.arg, dict[ob.arg].arg) + objects2.remove(ob) + self.assertEqual(len(objects2), 0) + + # Test itervaluerefs() + objects2 = list(objects) + self.assertEqual(len(list(dict.itervaluerefs())), len(objects)) + for wr in dict.itervaluerefs(): + ob = wr() + self.assertEqual(ob, dict[ob.arg]) + self.assertEqual(ob.arg, dict[ob.arg].arg) + objects2.remove(ob) + self.assertEqual(len(objects2), 0) + + def check_iters(self, dict): + # item iterator: + items = list(dict.items()) + for item in dict.items(): + items.remove(item) + self.assertFalse(items, "items() did not touch all items") + + # key iterator, via __iter__(): + keys = list(dict.keys()) + for k in dict: + keys.remove(k) + self.assertFalse(keys, "__iter__() did not touch all keys") + + # key iterator, via iterkeys(): + keys = list(dict.keys()) + for k in dict.keys(): + keys.remove(k) + self.assertFalse(keys, "iterkeys() did not touch all keys") + + # value iterator: + values = list(dict.values()) + for v in dict.values(): + values.remove(v) + self.assertFalse(values, + "itervalues() did not touch all values") + + def check_weak_destroy_while_iterating(self, dict, objects, iter_name): + n = len(dict) + it = iter(getattr(dict, iter_name)()) + next(it) # Trigger internal iteration + # Destroy an object + del objects[-1] + gc.collect() # just in case + # We have removed either the first consumed object, or another one + self.assertIn(len(list(it)), [len(objects), len(objects) - 1]) + del it + # The removal has been committed + self.assertEqual(len(dict), n - 1) + + def check_weak_destroy_and_mutate_while_iterating(self, dict, testcontext): + # Check that we can explicitly mutate the weak dict without + # interfering with delayed removal. + # `testcontext` should create an iterator, destroy one of the + # weakref'ed objects and then return a new key/value pair corresponding + # to the destroyed object. + with testcontext() as (k, v): + self.assertNotIn(k, dict) + with testcontext() as (k, v): + self.assertRaises(KeyError, dict.__delitem__, k) + self.assertNotIn(k, dict) + with testcontext() as (k, v): + self.assertRaises(KeyError, dict.pop, k) + self.assertNotIn(k, dict) + with testcontext() as (k, v): + dict[k] = v + self.assertEqual(dict[k], v) + ddict = copy.copy(dict) + with testcontext() as (k, v): + dict.update(ddict) + self.assertEqual(dict, ddict) + with testcontext() as (k, v): + dict.clear() + self.assertEqual(len(dict), 0) + + def check_weak_del_and_len_while_iterating(self, dict, testcontext): + # Check that len() works when both iterating and removing keys + # explicitly through various means (.pop(), .clear()...), while + # implicit mutation is deferred because an iterator is alive. + # (each call to testcontext() should schedule one item for removal + # for this test to work properly) + o = Object(123456) + with testcontext(): + n = len(dict) + # Since underlaying dict is ordered, first item is popped + dict.pop(next(dict.keys())) + self.assertEqual(len(dict), n - 1) + dict[o] = o + self.assertEqual(len(dict), n) + # last item in objects is removed from dict in context shutdown + with testcontext(): + self.assertEqual(len(dict), n - 1) + # Then, (o, o) is popped + dict.popitem() + self.assertEqual(len(dict), n - 2) + with testcontext(): + self.assertEqual(len(dict), n - 3) + del dict[next(dict.keys())] + self.assertEqual(len(dict), n - 4) + with testcontext(): + self.assertEqual(len(dict), n - 5) + dict.popitem() + self.assertEqual(len(dict), n - 6) + with testcontext(): + dict.clear() + self.assertEqual(len(dict), 0) + self.assertEqual(len(dict), 0) + + # TODO: RUSTPYTHON + @unittest.expectedFailure + def test_weak_keys_destroy_while_iterating(self): + # Issue #7105: iterators shouldn't crash when a key is implicitly removed + dict, objects = self.make_weak_keyed_dict() + self.check_weak_destroy_while_iterating(dict, objects, 'keys') + self.check_weak_destroy_while_iterating(dict, objects, 'items') + self.check_weak_destroy_while_iterating(dict, objects, 'values') + self.check_weak_destroy_while_iterating(dict, objects, 'keyrefs') + dict, objects = self.make_weak_keyed_dict() + @contextlib.contextmanager + def testcontext(): + try: + it = iter(dict.items()) + next(it) + # Schedule a key/value for removal and recreate it + v = objects.pop().arg + gc.collect() # just in case + yield Object(v), v + finally: + it = None # should commit all removals + gc.collect() + self.check_weak_destroy_and_mutate_while_iterating(dict, testcontext) + # Issue #21173: len() fragile when keys are both implicitly and + # explicitly removed. + dict, objects = self.make_weak_keyed_dict() + self.check_weak_del_and_len_while_iterating(dict, testcontext) + + # TODO: RUSTPYTHON + @unittest.expectedFailure + def test_weak_values_destroy_while_iterating(self): + # Issue #7105: iterators shouldn't crash when a key is implicitly removed + dict, objects = self.make_weak_valued_dict() + self.check_weak_destroy_while_iterating(dict, objects, 'keys') + self.check_weak_destroy_while_iterating(dict, objects, 'items') + self.check_weak_destroy_while_iterating(dict, objects, 'values') + self.check_weak_destroy_while_iterating(dict, objects, 'itervaluerefs') + self.check_weak_destroy_while_iterating(dict, objects, 'valuerefs') + dict, objects = self.make_weak_valued_dict() + @contextlib.contextmanager + def testcontext(): + try: + it = iter(dict.items()) + next(it) + # Schedule a key/value for removal and recreate it + k = objects.pop().arg + gc.collect() # just in case + yield k, Object(k) + finally: + it = None # should commit all removals + gc.collect() + self.check_weak_destroy_and_mutate_while_iterating(dict, testcontext) + dict, objects = self.make_weak_valued_dict() + self.check_weak_del_and_len_while_iterating(dict, testcontext) + + def test_make_weak_keyed_dict_from_dict(self): + o = Object(3) + dict = weakref.WeakKeyDictionary({o:364}) + self.assertEqual(dict[o], 364) + + def test_make_weak_keyed_dict_from_weak_keyed_dict(self): + o = Object(3) + dict = weakref.WeakKeyDictionary({o:364}) + dict2 = weakref.WeakKeyDictionary(dict) + self.assertEqual(dict[o], 364) + + def make_weak_keyed_dict(self): + dict = weakref.WeakKeyDictionary() + objects = list(map(Object, range(self.COUNT))) + for o in objects: + dict[o] = o.arg + return dict, objects + + def test_make_weak_valued_dict_from_dict(self): + o = Object(3) + dict = weakref.WeakValueDictionary({364:o}) + self.assertEqual(dict[364], o) + + def test_make_weak_valued_dict_from_weak_valued_dict(self): + o = Object(3) + dict = weakref.WeakValueDictionary({364:o}) + dict2 = weakref.WeakValueDictionary(dict) + self.assertEqual(dict[364], o) + + def test_make_weak_valued_dict_misc(self): + # errors + self.assertRaises(TypeError, weakref.WeakValueDictionary.__init__) + self.assertRaises(TypeError, weakref.WeakValueDictionary, {}, {}) + self.assertRaises(TypeError, weakref.WeakValueDictionary, (), ()) + # special keyword arguments + o = Object(3) + for kw in 'self', 'dict', 'other', 'iterable': + d = weakref.WeakValueDictionary(**{kw: o}) + self.assertEqual(list(d.keys()), [kw]) + self.assertEqual(d[kw], o) + + def make_weak_valued_dict(self): + dict = weakref.WeakValueDictionary() + objects = list(map(Object, range(self.COUNT))) + for o in objects: + dict[o.arg] = o + return dict, objects + + def check_popitem(self, klass, key1, value1, key2, value2): + weakdict = klass() + weakdict[key1] = value1 + weakdict[key2] = value2 + self.assertEqual(len(weakdict), 2) + k, v = weakdict.popitem() + self.assertEqual(len(weakdict), 1) + if k is key1: + self.assertIs(v, value1) + else: + self.assertIs(v, value2) + k, v = weakdict.popitem() + self.assertEqual(len(weakdict), 0) + if k is key1: + self.assertIs(v, value1) + else: + self.assertIs(v, value2) + + def test_weak_valued_dict_popitem(self): + self.check_popitem(weakref.WeakValueDictionary, + "key1", C(), "key2", C()) + + def test_weak_keyed_dict_popitem(self): + self.check_popitem(weakref.WeakKeyDictionary, + C(), "value 1", C(), "value 2") + + def check_setdefault(self, klass, key, value1, value2): + self.assertIsNot(value1, value2, + "invalid test" + " -- value parameters must be distinct objects") + weakdict = klass() + o = weakdict.setdefault(key, value1) + self.assertIs(o, value1) + self.assertIn(key, weakdict) + self.assertIs(weakdict.get(key), value1) + self.assertIs(weakdict[key], value1) + + o = weakdict.setdefault(key, value2) + self.assertIs(o, value1) + self.assertIn(key, weakdict) + self.assertIs(weakdict.get(key), value1) + self.assertIs(weakdict[key], value1) + + def test_weak_valued_dict_setdefault(self): + self.check_setdefault(weakref.WeakValueDictionary, + "key", C(), C()) + + def test_weak_keyed_dict_setdefault(self): + self.check_setdefault(weakref.WeakKeyDictionary, + C(), "value 1", "value 2") + + def check_update(self, klass, dict): + # + # This exercises d.update(), len(d), d.keys(), k in d, + # d.get(), d[]. + # + weakdict = klass() + weakdict.update(dict) + self.assertEqual(len(weakdict), len(dict)) + for k in weakdict.keys(): + self.assertIn(k, dict, "mysterious new key appeared in weak dict") + v = dict.get(k) + self.assertIs(v, weakdict[k]) + self.assertIs(v, weakdict.get(k)) + for k in dict.keys(): + self.assertIn(k, weakdict, "original key disappeared in weak dict") + v = dict[k] + self.assertIs(v, weakdict[k]) + self.assertIs(v, weakdict.get(k)) + + def test_weak_valued_dict_update(self): + self.check_update(weakref.WeakValueDictionary, + {1: C(), 'a': C(), C(): C()}) + # errors + self.assertRaises(TypeError, weakref.WeakValueDictionary.update) + d = weakref.WeakValueDictionary() + self.assertRaises(TypeError, d.update, {}, {}) + self.assertRaises(TypeError, d.update, (), ()) + self.assertEqual(list(d.keys()), []) + # special keyword arguments + o = Object(3) + for kw in 'self', 'dict', 'other', 'iterable': + d = weakref.WeakValueDictionary() + d.update(**{kw: o}) + self.assertEqual(list(d.keys()), [kw]) + self.assertEqual(d[kw], o) + + def test_weak_keyed_dict_update(self): + self.check_update(weakref.WeakKeyDictionary, + {C(): 1, C(): 2, C(): 3}) + + def test_weak_keyed_delitem(self): + d = weakref.WeakKeyDictionary() + o1 = Object('1') + o2 = Object('2') + d[o1] = 'something' + d[o2] = 'something' + self.assertEqual(len(d), 2) + del d[o1] + self.assertEqual(len(d), 1) + self.assertEqual(list(d.keys()), [o2]) + + def test_weak_valued_delitem(self): + d = weakref.WeakValueDictionary() + o1 = Object('1') + o2 = Object('2') + d['something'] = o1 + d['something else'] = o2 + self.assertEqual(len(d), 2) + del d['something'] + self.assertEqual(len(d), 1) + self.assertEqual(list(d.items()), [('something else', o2)]) + + # TODO: RUSTPYTHON + @unittest.expectedFailure + def test_weak_keyed_bad_delitem(self): + d = weakref.WeakKeyDictionary() + o = Object('1') + # An attempt to delete an object that isn't there should raise + # KeyError. It didn't before 2.3. + self.assertRaises(KeyError, d.__delitem__, o) + self.assertRaises(KeyError, d.__getitem__, o) + + # If a key isn't of a weakly referencable type, __getitem__ and + # __setitem__ raise TypeError. __delitem__ should too. + self.assertRaises(TypeError, d.__delitem__, 13) + self.assertRaises(TypeError, d.__getitem__, 13) + self.assertRaises(TypeError, d.__setitem__, 13, 13) + + # TODO: RUSTPYTHON + @unittest.expectedFailure + def test_weak_keyed_cascading_deletes(self): + # SF bug 742860. For some reason, before 2.3 __delitem__ iterated + # over the keys via self.data.iterkeys(). If things vanished from + # the dict during this (or got added), that caused a RuntimeError. + + d = weakref.WeakKeyDictionary() + mutate = False + + class C(object): + def __init__(self, i): + self.value = i + def __hash__(self): + return hash(self.value) + def __eq__(self, other): + if mutate: + # Side effect that mutates the dict, by removing the + # last strong reference to a key. + del objs[-1] + return self.value == other.value + + objs = [C(i) for i in range(4)] + for o in objs: + d[o] = o.value + del o # now the only strong references to keys are in objs + # Find the order in which iterkeys sees the keys. + objs = list(d.keys()) + # Reverse it, so that the iteration implementation of __delitem__ + # has to keep looping to find the first object we delete. + objs.reverse() + + # Turn on mutation in C.__eq__. The first time through the loop, + # under the iterkeys() business the first comparison will delete + # the last item iterkeys() would see, and that causes a + # RuntimeError: dictionary changed size during iteration + # when the iterkeys() loop goes around to try comparing the next + # key. After this was fixed, it just deletes the last object *our* + # "for o in obj" loop would have gotten to. + mutate = True + count = 0 + for o in objs: + count += 1 + del d[o] + self.assertEqual(len(d), 0) + self.assertEqual(count, 2) + + def test_make_weak_valued_dict_repr(self): + dict = weakref.WeakValueDictionary() + self.assertRegex(repr(dict), '') + + def test_make_weak_keyed_dict_repr(self): + dict = weakref.WeakKeyDictionary() + self.assertRegex(repr(dict), '') + + def test_threaded_weak_valued_setdefault(self): + d = weakref.WeakValueDictionary() + with collect_in_thread(): + for i in range(100000): + x = d.setdefault(10, RefCycle()) + self.assertIsNot(x, None) # we never put None in there! + del x + + def test_threaded_weak_valued_pop(self): + d = weakref.WeakValueDictionary() + with collect_in_thread(): + for i in range(100000): + d[10] = RefCycle() + x = d.pop(10, 10) + self.assertIsNot(x, None) # we never put None in there! + + def test_threaded_weak_valued_consistency(self): + # Issue #28427: old keys should not remove new values from + # WeakValueDictionary when collecting from another thread. + d = weakref.WeakValueDictionary() + with collect_in_thread(): + for i in range(200000): + o = RefCycle() + d[10] = o + # o is still alive, so the dict can't be empty + self.assertEqual(len(d), 1) + o = None # lose ref + + def check_threaded_weak_dict_copy(self, type_, deepcopy): + # `type_` should be either WeakKeyDictionary or WeakValueDictionary. + # `deepcopy` should be either True or False. + exc = [] + + class DummyKey: + def __init__(self, ctr): + self.ctr = ctr + + class DummyValue: + def __init__(self, ctr): + self.ctr = ctr + + def dict_copy(d, exc): + try: + if deepcopy is True: + _ = copy.deepcopy(d) + else: + _ = d.copy() + except Exception as ex: + exc.append(ex) + + def pop_and_collect(lst): + gc_ctr = 0 + while lst: + i = random.randint(0, len(lst) - 1) + gc_ctr += 1 + lst.pop(i) + if gc_ctr % 10000 == 0: + gc.collect() # just in case + + self.assertIn(type_, (weakref.WeakKeyDictionary, weakref.WeakValueDictionary)) + + d = type_() + keys = [] + values = [] + # Initialize d with many entries + for i in range(70000): + k, v = DummyKey(i), DummyValue(i) + keys.append(k) + values.append(v) + d[k] = v + del k + del v + + t_copy = threading.Thread(target=dict_copy, args=(d, exc,)) + if type_ is weakref.WeakKeyDictionary: + t_collect = threading.Thread(target=pop_and_collect, args=(keys,)) + else: # weakref.WeakValueDictionary + t_collect = threading.Thread(target=pop_and_collect, args=(values,)) + + t_copy.start() + t_collect.start() + + t_copy.join() + t_collect.join() + + # Test exceptions + if exc: + raise exc[0] + + def test_threaded_weak_key_dict_copy(self): + # Issue #35615: Weakref keys or values getting GC'ed during dict + # copying should not result in a crash. + self.check_threaded_weak_dict_copy(weakref.WeakKeyDictionary, False) + + def test_threaded_weak_key_dict_deepcopy(self): + # Issue #35615: Weakref keys or values getting GC'ed during dict + # copying should not result in a crash. + self.check_threaded_weak_dict_copy(weakref.WeakKeyDictionary, True) + + def test_threaded_weak_value_dict_copy(self): + # Issue #35615: Weakref keys or values getting GC'ed during dict + # copying should not result in a crash. + self.check_threaded_weak_dict_copy(weakref.WeakValueDictionary, False) + + def test_threaded_weak_value_dict_deepcopy(self): + # Issue #35615: Weakref keys or values getting GC'ed during dict + # copying should not result in a crash. + self.check_threaded_weak_dict_copy(weakref.WeakValueDictionary, True) + + @support.cpython_only + def test_remove_closure(self): + d = weakref.WeakValueDictionary() + self.assertIsNone(d._remove.__closure__) + + +from test import mapping_tests + +class WeakValueDictionaryTestCase(mapping_tests.BasicTestMappingProtocol): + """Check that WeakValueDictionary conforms to the mapping protocol""" + __ref = {"key1":Object(1), "key2":Object(2), "key3":Object(3)} + type2test = weakref.WeakValueDictionary + def _reference(self): + return self.__ref.copy() + +class WeakKeyDictionaryTestCase(mapping_tests.BasicTestMappingProtocol): + """Check that WeakKeyDictionary conforms to the mapping protocol""" + __ref = {Object("key1"):1, Object("key2"):2, Object("key3"):3} + type2test = weakref.WeakKeyDictionary + def _reference(self): + return self.__ref.copy() + + +class FinalizeTestCase(unittest.TestCase): + + class A: + pass + + def _collect_if_necessary(self): + # we create no ref-cycles so in CPython no gc should be needed + if sys.implementation.name != 'cpython': + support.gc_collect() + + # TODO: RUSTPYTHON + @unittest.expectedFailure + def test_finalize(self): + def add(x,y,z): + res.append(x + y + z) + return x + y + z + + a = self.A() + + res = [] + f = weakref.finalize(a, add, 67, 43, z=89) + self.assertEqual(f.alive, True) + self.assertEqual(f.peek(), (a, add, (67,43), {'z':89})) + self.assertEqual(f(), 199) + self.assertEqual(f(), None) + self.assertEqual(f(), None) + self.assertEqual(f.peek(), None) + self.assertEqual(f.detach(), None) + self.assertEqual(f.alive, False) + self.assertEqual(res, [199]) + + res = [] + f = weakref.finalize(a, add, 67, 43, 89) + self.assertEqual(f.peek(), (a, add, (67,43,89), {})) + self.assertEqual(f.detach(), (a, add, (67,43,89), {})) + self.assertEqual(f(), None) + self.assertEqual(f(), None) + self.assertEqual(f.peek(), None) + self.assertEqual(f.detach(), None) + self.assertEqual(f.alive, False) + self.assertEqual(res, []) + + res = [] + f = weakref.finalize(a, add, x=67, y=43, z=89) + del a + self._collect_if_necessary() + self.assertEqual(f(), None) + self.assertEqual(f(), None) + self.assertEqual(f.peek(), None) + self.assertEqual(f.detach(), None) + self.assertEqual(f.alive, False) + self.assertEqual(res, [199]) + + # TODO: RUSTPYTHON + @unittest.expectedFailure + def test_arg_errors(self): + def fin(*args, **kwargs): + res.append((args, kwargs)) + + a = self.A() + + res = [] + f = weakref.finalize(a, fin, 1, 2, func=3, obj=4) + self.assertEqual(f.peek(), (a, fin, (1, 2), {'func': 3, 'obj': 4})) + f() + self.assertEqual(res, [((1, 2), {'func': 3, 'obj': 4})]) + + res = [] + with self.assertWarns(DeprecationWarning): + f = weakref.finalize(a, func=fin, arg=1) + self.assertEqual(f.peek(), (a, fin, (), {'arg': 1})) + f() + self.assertEqual(res, [((), {'arg': 1})]) + + res = [] + with self.assertWarns(DeprecationWarning): + f = weakref.finalize(obj=a, func=fin, arg=1) + self.assertEqual(f.peek(), (a, fin, (), {'arg': 1})) + f() + self.assertEqual(res, [((), {'arg': 1})]) + + self.assertRaises(TypeError, weakref.finalize, a) + self.assertRaises(TypeError, weakref.finalize) + + # TODO: RUSTPYTHON + @unittest.expectedFailure + def test_order(self): + a = self.A() + res = [] + + f1 = weakref.finalize(a, res.append, 'f1') + f2 = weakref.finalize(a, res.append, 'f2') + f3 = weakref.finalize(a, res.append, 'f3') + f4 = weakref.finalize(a, res.append, 'f4') + f5 = weakref.finalize(a, res.append, 'f5') + + # make sure finalizers can keep themselves alive + del f1, f4 + + self.assertTrue(f2.alive) + self.assertTrue(f3.alive) + self.assertTrue(f5.alive) + + self.assertTrue(f5.detach()) + self.assertFalse(f5.alive) + + f5() # nothing because previously unregistered + res.append('A') + f3() # => res.append('f3') + self.assertFalse(f3.alive) + res.append('B') + f3() # nothing because previously called + res.append('C') + del a + self._collect_if_necessary() + # => res.append('f4') + # => res.append('f2') + # => res.append('f1') + self.assertFalse(f2.alive) + res.append('D') + f2() # nothing because previously called by gc + + expected = ['A', 'f3', 'B', 'C', 'f4', 'f2', 'f1', 'D'] + self.assertEqual(res, expected) + + # TODO: RUSTPYTHON + @unittest.expectedFailure + def test_all_freed(self): + # we want a weakrefable subclass of weakref.finalize + class MyFinalizer(weakref.finalize): + pass + + a = self.A() + res = [] + def callback(): + res.append(123) + f = MyFinalizer(a, callback) + + wr_callback = weakref.ref(callback) + wr_f = weakref.ref(f) + del callback, f + + self.assertIsNotNone(wr_callback()) + self.assertIsNotNone(wr_f()) + + del a + self._collect_if_necessary() + + self.assertIsNone(wr_callback()) + self.assertIsNone(wr_f()) + self.assertEqual(res, [123]) + + @classmethod + def run_in_child(cls): + def error(): + # Create an atexit finalizer from inside a finalizer called + # at exit. This should be the next to be run. + g1 = weakref.finalize(cls, print, 'g1') + print('f3 error') + 1/0 + + # cls should stay alive till atexit callbacks run + f1 = weakref.finalize(cls, print, 'f1', _global_var) + f2 = weakref.finalize(cls, print, 'f2', _global_var) + f3 = weakref.finalize(cls, error) + f4 = weakref.finalize(cls, print, 'f4', _global_var) + + assert f1.atexit == True + f2.atexit = False + assert f3.atexit == True + assert f4.atexit == True + + # TODO: RUSTPYTHON + @unittest.expectedFailure + def test_atexit(self): + prog = ('from test.test_weakref import FinalizeTestCase;'+ + 'FinalizeTestCase.run_in_child()') + rc, out, err = script_helper.assert_python_ok('-c', prog) + out = out.decode('ascii').splitlines() + self.assertEqual(out, ['f4 foobar', 'f3 error', 'g1', 'f1 foobar']) + self.assertTrue(b'ZeroDivisionError' in err) + + +libreftest = """ Doctest for examples in the library reference: weakref.rst + +>>> import weakref +>>> class Dict(dict): +... pass +... +>>> obj = Dict(red=1, green=2, blue=3) # this object is weak referencable +>>> r = weakref.ref(obj) +>>> print(r() is obj) +True + +>>> import weakref +>>> class Object: +... pass +... +>>> o = Object() +>>> r = weakref.ref(o) +>>> o2 = r() +>>> o is o2 +True +>>> del o, o2 +>>> print(r()) +None + +>>> import weakref +>>> class ExtendedRef(weakref.ref): +... def __init__(self, ob, callback=None, **annotations): +... super().__init__(ob, callback) +... self.__counter = 0 +... for k, v in annotations.items(): +... setattr(self, k, v) +... def __call__(self): +... '''Return a pair containing the referent and the number of +... times the reference has been called. +... ''' +... ob = super().__call__() +... if ob is not None: +... self.__counter += 1 +... ob = (ob, self.__counter) +... return ob +... +>>> class A: # not in docs from here, just testing the ExtendedRef +... pass +... +>>> a = A() +>>> r = ExtendedRef(a, foo=1, bar="baz") +>>> r.foo +1 +>>> r.bar +'baz' +>>> r()[1] +1 +>>> r()[1] +2 +>>> r()[0] is a +True + + +>>> import weakref +>>> _id2obj_dict = weakref.WeakValueDictionary() +>>> def remember(obj): +... oid = id(obj) +... _id2obj_dict[oid] = obj +... return oid +... +>>> def id2obj(oid): +... return _id2obj_dict[oid] +... +>>> a = A() # from here, just testing +>>> a_id = remember(a) +>>> id2obj(a_id) is a +True +>>> del a +>>> try: +... id2obj(a_id) +... except KeyError: +... print('OK') +... else: +... print('WeakValueDictionary error') +OK + +""" + +__test__ = {'libreftest' : libreftest} + +def test_main(): + support.run_unittest( + ReferencesTestCase, + WeakMethodTestCase, + MappingTestCase, + WeakValueDictionaryTestCase, + WeakKeyDictionaryTestCase, + SubclassableWeakrefTestCase, + FinalizeTestCase, + ) + # TODO: RUSTPYTHON + # support.run_doctest(sys.modules[__name__]) + + +if __name__ == "__main__": + test_main() diff --git a/Lib/test/test_weakset.py b/Lib/test/test_weakset.py new file mode 100644 index 0000000000..40a3bee49f --- /dev/null +++ b/Lib/test/test_weakset.py @@ -0,0 +1,464 @@ +import unittest +from weakref import WeakSet +import string +from collections import UserString as ustr +import gc +import contextlib + + +class Foo: + pass + +class RefCycle: + def __init__(self): + self.cycle = self + + +class TestWeakSet(unittest.TestCase): + + def setUp(self): + # need to keep references to them + self.items = [ustr(c) for c in ('a', 'b', 'c')] + self.items2 = [ustr(c) for c in ('x', 'y', 'z')] + self.ab_items = [ustr(c) for c in 'ab'] + self.abcde_items = [ustr(c) for c in 'abcde'] + self.def_items = [ustr(c) for c in 'def'] + self.ab_weakset = WeakSet(self.ab_items) + self.abcde_weakset = WeakSet(self.abcde_items) + self.def_weakset = WeakSet(self.def_items) + self.letters = [ustr(c) for c in string.ascii_letters] + self.s = WeakSet(self.items) + self.d = dict.fromkeys(self.items) + self.obj = ustr('F') + self.fs = WeakSet([self.obj]) + + def test_methods(self): + weaksetmethods = dir(WeakSet) + for method in dir(set): + if method == 'test_c_api' or method.startswith('_'): + continue + self.assertIn(method, weaksetmethods, + "WeakSet missing method " + method) + + def test_new_or_init(self): + self.assertRaises(TypeError, WeakSet, [], 2) + + # TODO: RUSTPYTHON + @unittest.expectedFailure + def test_len(self): + self.assertEqual(len(self.s), len(self.d)) + self.assertEqual(len(self.fs), 1) + del self.obj + self.assertEqual(len(self.fs), 0) + + def test_contains(self): + for c in self.letters: + self.assertEqual(c in self.s, c in self.d) + # 1 is not weakref'able, but that TypeError is caught by __contains__ + self.assertNotIn(1, self.s) + self.assertIn(self.obj, self.fs) + del self.obj + self.assertNotIn(ustr('F'), self.fs) + + # TODO: RUSTPYTHON + @unittest.expectedFailure + def test_union(self): + u = self.s.union(self.items2) + for c in self.letters: + self.assertEqual(c in u, c in self.d or c in self.items2) + self.assertEqual(self.s, WeakSet(self.items)) + self.assertEqual(type(u), WeakSet) + self.assertRaises(TypeError, self.s.union, [[]]) + for C in set, frozenset, dict.fromkeys, list, tuple: + x = WeakSet(self.items + self.items2) + c = C(self.items2) + self.assertEqual(self.s.union(c), x) + del c + self.assertEqual(len(u), len(self.items) + len(self.items2)) + self.items2.pop() + gc.collect() + self.assertEqual(len(u), len(self.items) + len(self.items2)) + + def test_or(self): + i = self.s.union(self.items2) + self.assertEqual(self.s | set(self.items2), i) + self.assertEqual(self.s | frozenset(self.items2), i) + + # TODO: RUSTPYTHON + @unittest.expectedFailure + def test_intersection(self): + s = WeakSet(self.letters) + i = s.intersection(self.items2) + for c in self.letters: + self.assertEqual(c in i, c in self.items2 and c in self.letters) + self.assertEqual(s, WeakSet(self.letters)) + self.assertEqual(type(i), WeakSet) + for C in set, frozenset, dict.fromkeys, list, tuple: + x = WeakSet([]) + self.assertEqual(i.intersection(C(self.items)), x) + self.assertEqual(len(i), len(self.items2)) + self.items2.pop() + gc.collect() + self.assertEqual(len(i), len(self.items2)) + + def test_isdisjoint(self): + self.assertTrue(self.s.isdisjoint(WeakSet(self.items2))) + self.assertTrue(not self.s.isdisjoint(WeakSet(self.letters))) + + def test_and(self): + i = self.s.intersection(self.items2) + self.assertEqual(self.s & set(self.items2), i) + self.assertEqual(self.s & frozenset(self.items2), i) + + def test_difference(self): + i = self.s.difference(self.items2) + for c in self.letters: + self.assertEqual(c in i, c in self.d and c not in self.items2) + self.assertEqual(self.s, WeakSet(self.items)) + self.assertEqual(type(i), WeakSet) + self.assertRaises(TypeError, self.s.difference, [[]]) + + def test_sub(self): + i = self.s.difference(self.items2) + self.assertEqual(self.s - set(self.items2), i) + self.assertEqual(self.s - frozenset(self.items2), i) + + # TODO: RUSTPYTHON + @unittest.expectedFailure + def test_symmetric_difference(self): + i = self.s.symmetric_difference(self.items2) + for c in self.letters: + self.assertEqual(c in i, (c in self.d) ^ (c in self.items2)) + self.assertEqual(self.s, WeakSet(self.items)) + self.assertEqual(type(i), WeakSet) + self.assertRaises(TypeError, self.s.symmetric_difference, [[]]) + self.assertEqual(len(i), len(self.items) + len(self.items2)) + self.items2.pop() + gc.collect() + self.assertEqual(len(i), len(self.items) + len(self.items2)) + + def test_xor(self): + i = self.s.symmetric_difference(self.items2) + self.assertEqual(self.s ^ set(self.items2), i) + self.assertEqual(self.s ^ frozenset(self.items2), i) + + def test_sub_and_super(self): + self.assertTrue(self.ab_weakset <= self.abcde_weakset) + self.assertTrue(self.abcde_weakset <= self.abcde_weakset) + self.assertTrue(self.abcde_weakset >= self.ab_weakset) + self.assertFalse(self.abcde_weakset <= self.def_weakset) + self.assertFalse(self.abcde_weakset >= self.def_weakset) + self.assertTrue(set('a').issubset('abc')) + self.assertTrue(set('abc').issuperset('a')) + self.assertFalse(set('a').issubset('cbs')) + self.assertFalse(set('cbs').issuperset('a')) + + def test_lt(self): + self.assertTrue(self.ab_weakset < self.abcde_weakset) + self.assertFalse(self.abcde_weakset < self.def_weakset) + self.assertFalse(self.ab_weakset < self.ab_weakset) + self.assertFalse(WeakSet() < WeakSet()) + + def test_gt(self): + self.assertTrue(self.abcde_weakset > self.ab_weakset) + self.assertFalse(self.abcde_weakset > self.def_weakset) + self.assertFalse(self.ab_weakset > self.ab_weakset) + self.assertFalse(WeakSet() > WeakSet()) + + def test_gc(self): + # Create a nest of cycles to exercise overall ref count check + s = WeakSet(Foo() for i in range(1000)) + for elem in s: + elem.cycle = s + elem.sub = elem + elem.set = WeakSet([elem]) + + def test_subclass_with_custom_hash(self): + # Bug #1257731 + class H(WeakSet): + def __hash__(self): + return int(id(self) & 0x7fffffff) + s=H() + f=set() + f.add(s) + self.assertIn(s, f) + f.remove(s) + f.add(s) + f.discard(s) + + def test_init(self): + s = WeakSet() + s.__init__(self.items) + self.assertEqual(s, self.s) + s.__init__(self.items2) + self.assertEqual(s, WeakSet(self.items2)) + self.assertRaises(TypeError, s.__init__, s, 2); + self.assertRaises(TypeError, s.__init__, 1); + + def test_constructor_identity(self): + s = WeakSet(self.items) + t = WeakSet(s) + self.assertNotEqual(id(s), id(t)) + + # TODO: RUSTPYTHON + @unittest.expectedFailure + def test_hash(self): + self.assertRaises(TypeError, hash, self.s) + + def test_clear(self): + self.s.clear() + self.assertEqual(self.s, WeakSet([])) + self.assertEqual(len(self.s), 0) + + def test_copy(self): + dup = self.s.copy() + self.assertEqual(self.s, dup) + self.assertNotEqual(id(self.s), id(dup)) + + # TODO: RUSTPYTHON + @unittest.expectedFailure + def test_add(self): + x = ustr('Q') + self.s.add(x) + self.assertIn(x, self.s) + dup = self.s.copy() + self.s.add(x) + self.assertEqual(self.s, dup) + self.assertRaises(TypeError, self.s.add, []) + self.fs.add(Foo()) + self.assertTrue(len(self.fs) == 1) + self.fs.add(self.obj) + self.assertTrue(len(self.fs) == 1) + + def test_remove(self): + x = ustr('a') + self.s.remove(x) + self.assertNotIn(x, self.s) + self.assertRaises(KeyError, self.s.remove, x) + self.assertRaises(TypeError, self.s.remove, []) + + def test_discard(self): + a, q = ustr('a'), ustr('Q') + self.s.discard(a) + self.assertNotIn(a, self.s) + self.s.discard(q) + self.assertRaises(TypeError, self.s.discard, []) + + def test_pop(self): + for i in range(len(self.s)): + elem = self.s.pop() + self.assertNotIn(elem, self.s) + self.assertRaises(KeyError, self.s.pop) + + def test_update(self): + retval = self.s.update(self.items2) + self.assertEqual(retval, None) + for c in (self.items + self.items2): + self.assertIn(c, self.s) + self.assertRaises(TypeError, self.s.update, [[]]) + + def test_update_set(self): + self.s.update(set(self.items2)) + for c in (self.items + self.items2): + self.assertIn(c, self.s) + + def test_ior(self): + self.s |= set(self.items2) + for c in (self.items + self.items2): + self.assertIn(c, self.s) + + def test_intersection_update(self): + retval = self.s.intersection_update(self.items2) + self.assertEqual(retval, None) + for c in (self.items + self.items2): + if c in self.items2 and c in self.items: + self.assertIn(c, self.s) + else: + self.assertNotIn(c, self.s) + self.assertRaises(TypeError, self.s.intersection_update, [[]]) + + def test_iand(self): + self.s &= set(self.items2) + for c in (self.items + self.items2): + if c in self.items2 and c in self.items: + self.assertIn(c, self.s) + else: + self.assertNotIn(c, self.s) + + def test_difference_update(self): + retval = self.s.difference_update(self.items2) + self.assertEqual(retval, None) + for c in (self.items + self.items2): + if c in self.items and c not in self.items2: + self.assertIn(c, self.s) + else: + self.assertNotIn(c, self.s) + self.assertRaises(TypeError, self.s.difference_update, [[]]) + self.assertRaises(TypeError, self.s.symmetric_difference_update, [[]]) + + def test_isub(self): + self.s -= set(self.items2) + for c in (self.items + self.items2): + if c in self.items and c not in self.items2: + self.assertIn(c, self.s) + else: + self.assertNotIn(c, self.s) + + def test_symmetric_difference_update(self): + retval = self.s.symmetric_difference_update(self.items2) + self.assertEqual(retval, None) + for c in (self.items + self.items2): + if (c in self.items) ^ (c in self.items2): + self.assertIn(c, self.s) + else: + self.assertNotIn(c, self.s) + self.assertRaises(TypeError, self.s.symmetric_difference_update, [[]]) + + def test_ixor(self): + self.s ^= set(self.items2) + for c in (self.items + self.items2): + if (c in self.items) ^ (c in self.items2): + self.assertIn(c, self.s) + else: + self.assertNotIn(c, self.s) + + # TODO: RUSTPYTHON + @unittest.expectedFailure + def test_inplace_on_self(self): + t = self.s.copy() + t |= t + self.assertEqual(t, self.s) + t &= t + self.assertEqual(t, self.s) + t -= t + self.assertEqual(t, WeakSet()) + t = self.s.copy() + t ^= t + self.assertEqual(t, WeakSet()) + + def test_eq(self): + # issue 5964 + self.assertTrue(self.s == self.s) + self.assertTrue(self.s == WeakSet(self.items)) + self.assertFalse(self.s == set(self.items)) + self.assertFalse(self.s == list(self.items)) + self.assertFalse(self.s == tuple(self.items)) + self.assertFalse(self.s == WeakSet([Foo])) + self.assertFalse(self.s == 1) + + def test_ne(self): + self.assertTrue(self.s != set(self.items)) + s1 = WeakSet() + s2 = WeakSet() + self.assertFalse(s1 != s2) + + # TODO: RUSTPYTHON + @unittest.expectedFailure + def test_weak_destroy_while_iterating(self): + # Issue #7105: iterators shouldn't crash when a key is implicitly removed + # Create new items to be sure no-one else holds a reference + items = [ustr(c) for c in ('a', 'b', 'c')] + s = WeakSet(items) + it = iter(s) + next(it) # Trigger internal iteration + # Destroy an item + del items[-1] + gc.collect() # just in case + # We have removed either the first consumed items, or another one + self.assertIn(len(list(it)), [len(items), len(items) - 1]) + del it + # The removal has been committed + self.assertEqual(len(s), len(items)) + + # TODO: RUSTPYTHON + @unittest.expectedFailure + def test_weak_destroy_and_mutate_while_iterating(self): + # Issue #7105: iterators shouldn't crash when a key is implicitly removed + items = [ustr(c) for c in string.ascii_letters] + s = WeakSet(items) + @contextlib.contextmanager + def testcontext(): + try: + it = iter(s) + # Start iterator + yielded = ustr(str(next(it))) + # Schedule an item for removal and recreate it + u = ustr(str(items.pop())) + if yielded == u: + # The iterator still has a reference to the removed item, + # advance it (issue #20006). + next(it) + gc.collect() # just in case + yield u + finally: + it = None # should commit all removals + + with testcontext() as u: + self.assertNotIn(u, s) + with testcontext() as u: + self.assertRaises(KeyError, s.remove, u) + self.assertNotIn(u, s) + with testcontext() as u: + s.add(u) + self.assertIn(u, s) + t = s.copy() + with testcontext() as u: + s.update(t) + self.assertEqual(len(s), len(t)) + with testcontext() as u: + s.clear() + self.assertEqual(len(s), 0) + + # TODO: RUSTPYTHON + @unittest.expectedFailure + def test_len_cycles(self): + N = 20 + items = [RefCycle() for i in range(N)] + s = WeakSet(items) + del items + it = iter(s) + try: + next(it) + except StopIteration: + pass + gc.collect() + n1 = len(s) + del it + gc.collect() + n2 = len(s) + # one item may be kept alive inside the iterator + self.assertIn(n1, (0, 1)) + self.assertEqual(n2, 0) + + # TODO: RUSTPYTHON + @unittest.expectedFailure + def test_len_race(self): + # Extended sanity checks for len() in the face of cyclic collection + self.addCleanup(gc.set_threshold, *gc.get_threshold()) + for th in range(1, 100): + N = 20 + gc.collect(0) + gc.set_threshold(th, th, th) + items = [RefCycle() for i in range(N)] + s = WeakSet(items) + del items + # All items will be collected at next garbage collection pass + it = iter(s) + try: + next(it) + except StopIteration: + pass + n1 = len(s) + del it + n2 = len(s) + self.assertGreaterEqual(n1, 0) + self.assertLessEqual(n1, N) + self.assertGreaterEqual(n2, 0) + self.assertLessEqual(n2, n1) + + def test_repr(self): + assert repr(self.s) == repr(self.s.data) + + +if __name__ == "__main__": + unittest.main() diff --git a/extra_tests/not_impl_gen.py b/extra_tests/not_impl_gen.py index b5037c1c71..77ecd92ba1 100644 --- a/extra_tests/not_impl_gen.py +++ b/extra_tests/not_impl_gen.py @@ -17,6 +17,39 @@ from pydoc import ModuleScanner +# modules suggested for deprecation by PEP 594 (www.python.org/dev/peps/pep-0594/) +# some of these might be implemented, but they are not a priority +PEP_594_MODULES = { + "aifc", + "asynchat", + "asyncore", + "audioop", + "binhex", + "cgi", + "cgitb", + "chunk", + "crypt", + "formatter", + "fpectl", + "imghdr", + "imp", + "macpath", + "msilib", + "nntplib", + "nis", + "ossaudiodev", + "parser", + "pipes", + "smtpd", + "sndhdr", + "spwd", + "sunau", + "telnetlib", + "uu", + "xdrlib", +} + + sys.path = [ path for path in sys.path @@ -163,7 +196,7 @@ def gen_modules(): # e.g. printing something or opening a webpage modules = {} for mod_name in scan_modules(): - if mod_name in ("this", "antigravity"): + if mod_name in ("this", "antigravity") or mod_name in PEP_594_MODULES: continue dir_result = dir_of_mod_or_error(mod_name) if isinstance(dir_result, Exception): diff --git a/vm/src/stdlib/os.rs b/vm/src/stdlib/os.rs index a8dc67f9a0..412a91d40f 100644 --- a/vm/src/stdlib/os.rs +++ b/vm/src/stdlib/os.rs @@ -1563,8 +1563,12 @@ mod _os { // Safety: p_offset_src and p_offset_dst is a unique pointer for offset_src and offset_dst respectively, // and will only be freed after this function ends. + // + // Why not use `libc::copy_file_range`: On `musl-libc`, `libc::copy_file_range` is not provided. Therefore + // we use syscalls directly instead. let ret = unsafe { - libc::copy_file_range( + libc::syscall( + libc::SYS_copy_file_range, args.src, p_offset_src as *mut i64, args.dst,