From 7a15f04bab303acbce8017ccc2ad13ab21fad2de Mon Sep 17 00:00:00 2001 From: Noah Stapp Date: Thu, 1 May 2025 15:58:31 -0400 Subject: [PATCH 1/8] PYTHON-5369 - Re-raise socket.timeout errors if the deadline has already been execeeded --- pymongo/network_layer.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pymongo/network_layer.py b/pymongo/network_layer.py index 3fa180bf7a..0e7c037106 100644 --- a/pymongo/network_layer.py +++ b/pymongo/network_layer.py @@ -357,7 +357,7 @@ def receive_data(conn: Connection, length: int, deadline: Optional[float]) -> me except socket.timeout: if conn.cancel_context.cancelled: raise _OperationCancelled("operation cancelled") from None - if _PYPY: + if _PYPY or deadline is not None and deadline - time.monotonic() < 0: # We reached the true deadline. raise continue From 59e1897757d930344067548358019eba2eaa75d6 Mon Sep 17 00:00:00 2001 From: Noah Stapp Date: Fri, 2 May 2025 08:33:11 -0400 Subject: [PATCH 2/8] Try detecting faas --- pymongo/network_layer.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/pymongo/network_layer.py b/pymongo/network_layer.py index 0e7c037106..4c17a2875b 100644 --- a/pymongo/network_layer.py +++ b/pymongo/network_layer.py @@ -36,6 +36,7 @@ from pymongo.compression_support import decompress from pymongo.errors import ProtocolError, _OperationCancelled from pymongo.message import _UNPACK_REPLY, _OpMsg, _OpReply +from pymongo.pool_options import _is_faas from pymongo.socket_checker import _errno_from_exception try: @@ -357,7 +358,12 @@ def receive_data(conn: Connection, length: int, deadline: Optional[float]) -> me except socket.timeout: if conn.cancel_context.cancelled: raise _OperationCancelled("operation cancelled") from None - if _PYPY or deadline is not None and deadline - time.monotonic() < 0: + if ( + _PYPY + or not _is_faas() + and deadline is not None + and deadline - time.monotonic() < 0 + ): # We reached the true deadline. raise continue From 7765b7d6c11cb2469e83207f0e558135ae342d90 Mon Sep 17 00:00:00 2001 From: Noah Stapp Date: Fri, 2 May 2025 10:26:50 -0400 Subject: [PATCH 3/8] Only run test_sigstop_sigint on FaaS --- test/asynchronous/test_client.py | 3 ++- test/test_client.py | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/test/asynchronous/test_client.py b/test/asynchronous/test_client.py index 1e1faf0a2a..dd69af3746 100644 --- a/test/asynchronous/test_client.py +++ b/test/asynchronous/test_client.py @@ -117,7 +117,7 @@ WriteConcernError, ) from pymongo.monitoring import ServerHeartbeatListener, ServerHeartbeatStartedEvent -from pymongo.pool_options import _MAX_METADATA_SIZE, _METADATA, ENV_VAR_K8S, PoolOptions +from pymongo.pool_options import _MAX_METADATA_SIZE, _METADATA, ENV_VAR_K8S, PoolOptions, _is_faas from pymongo.read_preferences import ReadPreference from pymongo.server_description import ServerDescription from pymongo.server_selectors import readable_server_selector, writable_server_selector @@ -2010,6 +2010,7 @@ async def test_srv_max_hosts_kwarg(self): "loadBalanced clients do not run SDAM", ) @unittest.skipIf(sys.platform == "win32", "Windows does not support SIGSTOP") + @unittest.skipUnless(_is_faas(), "Non-FaaS environments raise timeouts faster") @async_client_context.require_sync def test_sigstop_sigcont(self): test_dir = os.path.dirname(os.path.realpath(__file__)) diff --git a/test/test_client.py b/test/test_client.py index 189e58e803..416eba9e6e 100644 --- a/test/test_client.py +++ b/test/test_client.py @@ -106,7 +106,7 @@ WriteConcernError, ) from pymongo.monitoring import ServerHeartbeatListener, ServerHeartbeatStartedEvent -from pymongo.pool_options import _MAX_METADATA_SIZE, _METADATA, ENV_VAR_K8S, PoolOptions +from pymongo.pool_options import _MAX_METADATA_SIZE, _METADATA, ENV_VAR_K8S, PoolOptions, _is_faas from pymongo.read_preferences import ReadPreference from pymongo.server_description import ServerDescription from pymongo.server_selectors import readable_server_selector, writable_server_selector @@ -1967,6 +1967,7 @@ def test_srv_max_hosts_kwarg(self): "loadBalanced clients do not run SDAM", ) @unittest.skipIf(sys.platform == "win32", "Windows does not support SIGSTOP") + @unittest.skipUnless(_is_faas(), "Non-FaaS environments raise timeouts faster") @client_context.require_sync def test_sigstop_sigcont(self): test_dir = os.path.dirname(os.path.realpath(__file__)) From 55324d4821c41500945993d04c00687e0d38ed29 Mon Sep 17 00:00:00 2001 From: Noah Stapp Date: Tue, 6 May 2025 09:38:21 -0400 Subject: [PATCH 4/8] Mock running on FaaS --- test/asynchronous/test_client.py | 2 +- test/test_client.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/test/asynchronous/test_client.py b/test/asynchronous/test_client.py index dd69af3746..cf0b6d6e04 100644 --- a/test/asynchronous/test_client.py +++ b/test/asynchronous/test_client.py @@ -2010,8 +2010,8 @@ async def test_srv_max_hosts_kwarg(self): "loadBalanced clients do not run SDAM", ) @unittest.skipIf(sys.platform == "win32", "Windows does not support SIGSTOP") - @unittest.skipUnless(_is_faas(), "Non-FaaS environments raise timeouts faster") @async_client_context.require_sync + @mock.patch.dict("os.environ", {"AWS_LAMBDA_RUNTIME_API": "1"}) def test_sigstop_sigcont(self): test_dir = os.path.dirname(os.path.realpath(__file__)) script = os.path.join(test_dir, "sigstop_sigcont.py") diff --git a/test/test_client.py b/test/test_client.py index 416eba9e6e..4e28d01e73 100644 --- a/test/test_client.py +++ b/test/test_client.py @@ -1967,8 +1967,8 @@ def test_srv_max_hosts_kwarg(self): "loadBalanced clients do not run SDAM", ) @unittest.skipIf(sys.platform == "win32", "Windows does not support SIGSTOP") - @unittest.skipUnless(_is_faas(), "Non-FaaS environments raise timeouts faster") @client_context.require_sync + @mock.patch.dict("os.environ", {"AWS_LAMBDA_RUNTIME_API": "1"}) def test_sigstop_sigcont(self): test_dir = os.path.dirname(os.path.realpath(__file__)) script = os.path.join(test_dir, "sigstop_sigcont.py") From 40058216bc9cf6eef702ed2cbfc59ca5844fb5c7 Mon Sep 17 00:00:00 2001 From: Noah Stapp Date: Thu, 8 May 2025 13:38:48 -0400 Subject: [PATCH 5/8] Remove unused import --- test/asynchronous/test_client.py | 2 +- test/test_client.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/test/asynchronous/test_client.py b/test/asynchronous/test_client.py index cf0b6d6e04..d8c7d66ff3 100644 --- a/test/asynchronous/test_client.py +++ b/test/asynchronous/test_client.py @@ -117,7 +117,7 @@ WriteConcernError, ) from pymongo.monitoring import ServerHeartbeatListener, ServerHeartbeatStartedEvent -from pymongo.pool_options import _MAX_METADATA_SIZE, _METADATA, ENV_VAR_K8S, PoolOptions, _is_faas +from pymongo.pool_options import _MAX_METADATA_SIZE, _METADATA, ENV_VAR_K8S, PoolOptions from pymongo.read_preferences import ReadPreference from pymongo.server_description import ServerDescription from pymongo.server_selectors import readable_server_selector, writable_server_selector diff --git a/test/test_client.py b/test/test_client.py index 4e28d01e73..54c82a6d21 100644 --- a/test/test_client.py +++ b/test/test_client.py @@ -106,7 +106,7 @@ WriteConcernError, ) from pymongo.monitoring import ServerHeartbeatListener, ServerHeartbeatStartedEvent -from pymongo.pool_options import _MAX_METADATA_SIZE, _METADATA, ENV_VAR_K8S, PoolOptions, _is_faas +from pymongo.pool_options import _MAX_METADATA_SIZE, _METADATA, ENV_VAR_K8S, PoolOptions from pymongo.read_preferences import ReadPreference from pymongo.server_description import ServerDescription from pymongo.server_selectors import readable_server_selector, writable_server_selector From d851d85e7198d46e03d40b383847b570fe2f23f8 Mon Sep 17 00:00:00 2001 From: Noah Stapp Date: Thu, 8 May 2025 14:19:34 -0400 Subject: [PATCH 6/8] Rename handshake to is_sdamm and use to detect when to re-raise network timeouts early --- pymongo/asynchronous/pool.py | 17 ++++++++++------- pymongo/asynchronous/topology.py | 2 +- pymongo/network_layer.py | 3 +-- pymongo/synchronous/pool.py | 17 ++++++++++------- pymongo/synchronous/topology.py | 2 +- test/asynchronous/utils.py | 3 ++- test/utils.py | 3 ++- 7 files changed, 27 insertions(+), 20 deletions(-) diff --git a/pymongo/asynchronous/pool.py b/pymongo/asynchronous/pool.py index f4d5b174fa..947b3124a5 100644 --- a/pymongo/asynchronous/pool.py +++ b/pymongo/asynchronous/pool.py @@ -131,6 +131,7 @@ class AsyncConnection: :param pool: a Pool instance :param address: the server's (host, port) :param id: the id of this socket in it's pool + :param is_sdam: SDAM connections do not call hello on creation """ def __init__( @@ -139,11 +140,13 @@ def __init__( pool: Pool, address: tuple[str, int], id: int, + is_sdam: bool, ): self.pool_ref = weakref.ref(pool) self.conn = conn self.address = address self.id = id + self.is_sdam = is_sdam self.closed = False self.last_checkin_time = time.monotonic() self.performed_handshake = False @@ -711,13 +714,13 @@ def __init__( self, address: _Address, options: PoolOptions, - handshake: bool = True, + is_sdam: bool = True, client_id: Optional[ObjectId] = None, ): """ :param address: a (hostname, port) tuple :param options: a PoolOptions instance - :param handshake: whether to call hello for each new AsyncConnection + :param is_sdam: whether to call hello for each new AsyncConnection """ if options.pause_enabled: self.state = PoolState.PAUSED @@ -746,14 +749,14 @@ def __init__( self.pid = os.getpid() self.address = address self.opts = options - self.handshake = handshake + self.is_sdam = is_sdam # Don't publish events or logs in Monitor pools. self.enabled_for_cmap = ( - self.handshake + not self.is_sdam and self.opts._event_listeners is not None and self.opts._event_listeners.enabled_for_cmap ) - self.enabled_for_logging = self.handshake + self.enabled_for_logging = not self.is_sdam # The first portion of the wait queue. # Enforces: maxPoolSize @@ -1058,14 +1061,14 @@ async def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> A raise - conn = AsyncConnection(networking_interface, self, self.address, conn_id) # type: ignore[arg-type] + conn = AsyncConnection(networking_interface, self, self.address, conn_id, self.is_sdam) # type: ignore[arg-type] async with self.lock: self.active_contexts.add(conn.cancel_context) self.active_contexts.discard(tmp_context) if tmp_context.cancelled: conn.cancel_context.cancel() try: - if self.handshake: + if not self.is_sdam: await conn.hello() self.is_writable = conn.is_writable if handler: diff --git a/pymongo/asynchronous/topology.py b/pymongo/asynchronous/topology.py index 438dd1e352..942945598f 100644 --- a/pymongo/asynchronous/topology.py +++ b/pymongo/asynchronous/topology.py @@ -985,7 +985,7 @@ def _create_pool_for_monitor(self, address: _Address) -> Pool: ) return self._settings.pool_class( - address, monitor_pool_options, handshake=False, client_id=self._topology_id + address, monitor_pool_options, is_sdam=True, client_id=self._topology_id ) def _error_message(self, selector: Callable[[Selection], Selection]) -> str: diff --git a/pymongo/network_layer.py b/pymongo/network_layer.py index 4c17a2875b..0a02b39a17 100644 --- a/pymongo/network_layer.py +++ b/pymongo/network_layer.py @@ -36,7 +36,6 @@ from pymongo.compression_support import decompress from pymongo.errors import ProtocolError, _OperationCancelled from pymongo.message import _UNPACK_REPLY, _OpMsg, _OpReply -from pymongo.pool_options import _is_faas from pymongo.socket_checker import _errno_from_exception try: @@ -360,7 +359,7 @@ def receive_data(conn: Connection, length: int, deadline: Optional[float]) -> me raise _OperationCancelled("operation cancelled") from None if ( _PYPY - or not _is_faas() + or not conn.is_sdam() and deadline is not None and deadline - time.monotonic() < 0 ): diff --git a/pymongo/synchronous/pool.py b/pymongo/synchronous/pool.py index 44aec31a86..8d2a0e9d75 100644 --- a/pymongo/synchronous/pool.py +++ b/pymongo/synchronous/pool.py @@ -131,6 +131,7 @@ class Connection: :param pool: a Pool instance :param address: the server's (host, port) :param id: the id of this socket in it's pool + :param is_sdam: SDAM connections do not call hello on creation """ def __init__( @@ -139,11 +140,13 @@ def __init__( pool: Pool, address: tuple[str, int], id: int, + is_sdam: bool, ): self.pool_ref = weakref.ref(pool) self.conn = conn self.address = address self.id = id + self.is_sdam = is_sdam self.closed = False self.last_checkin_time = time.monotonic() self.performed_handshake = False @@ -709,13 +712,13 @@ def __init__( self, address: _Address, options: PoolOptions, - handshake: bool = True, + is_sdam: bool = True, client_id: Optional[ObjectId] = None, ): """ :param address: a (hostname, port) tuple :param options: a PoolOptions instance - :param handshake: whether to call hello for each new Connection + :param is_sdam: whether to call hello for each new Connection """ if options.pause_enabled: self.state = PoolState.PAUSED @@ -744,14 +747,14 @@ def __init__( self.pid = os.getpid() self.address = address self.opts = options - self.handshake = handshake + self.is_sdam = is_sdam # Don't publish events or logs in Monitor pools. self.enabled_for_cmap = ( - self.handshake + not self.is_sdam and self.opts._event_listeners is not None and self.opts._event_listeners.enabled_for_cmap ) - self.enabled_for_logging = self.handshake + self.enabled_for_logging = not self.is_sdam # The first portion of the wait queue. # Enforces: maxPoolSize @@ -1054,14 +1057,14 @@ def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> Connect raise - conn = Connection(networking_interface, self, self.address, conn_id) # type: ignore[arg-type] + conn = Connection(networking_interface, self, self.address, conn_id, self.is_sdam) # type: ignore[arg-type] with self.lock: self.active_contexts.add(conn.cancel_context) self.active_contexts.discard(tmp_context) if tmp_context.cancelled: conn.cancel_context.cancel() try: - if self.handshake: + if not self.is_sdam: conn.hello() self.is_writable = conn.is_writable if handler: diff --git a/pymongo/synchronous/topology.py b/pymongo/synchronous/topology.py index 1e99adf726..011eedf111 100644 --- a/pymongo/synchronous/topology.py +++ b/pymongo/synchronous/topology.py @@ -983,7 +983,7 @@ def _create_pool_for_monitor(self, address: _Address) -> Pool: ) return self._settings.pool_class( - address, monitor_pool_options, handshake=False, client_id=self._topology_id + address, monitor_pool_options, is_sdam=True, client_id=self._topology_id ) def _error_message(self, selector: Callable[[Selection], Selection]) -> str: diff --git a/test/asynchronous/utils.py b/test/asynchronous/utils.py index f653c575e9..ca80d1f6dd 100644 --- a/test/asynchronous/utils.py +++ b/test/asynchronous/utils.py @@ -159,6 +159,7 @@ def __init__(self): self.cancel_context = _CancellationContext() self.more_to_come = False self.id = random.randint(0, 100) + self.is_sdam = False self.server_connection_id = random.randint(0, 100) def close_conn(self, reason): @@ -172,7 +173,7 @@ def __aexit__(self, exc_type, exc_val, exc_tb): class AsyncMockPool: - def __init__(self, address, options, handshake=True, client_id=None): + def __init__(self, address, options, is_sdam=False, client_id=None): self.gen = _PoolGeneration() self._lock = _async_create_lock() self.opts = options diff --git a/test/utils.py b/test/utils.py index 3027ed7517..25d95d1d3c 100644 --- a/test/utils.py +++ b/test/utils.py @@ -157,6 +157,7 @@ def __init__(self): self.cancel_context = _CancellationContext() self.more_to_come = False self.id = random.randint(0, 100) + self.is_sdam = False self.server_connection_id = random.randint(0, 100) def close_conn(self, reason): @@ -170,7 +171,7 @@ def __exit__(self, exc_type, exc_val, exc_tb): class MockPool: - def __init__(self, address, options, handshake=True, client_id=None): + def __init__(self, address, options, is_sdam=False, client_id=None): self.gen = _PoolGeneration() self._lock = _create_lock() self.opts = options From e775c653117ca9fac7438d31bbc14e07129bf0fd Mon Sep 17 00:00:00 2001 From: Noah Stapp Date: Thu, 8 May 2025 14:30:24 -0400 Subject: [PATCH 7/8] Fix default is_sdam setting --- pymongo/asynchronous/pool.py | 2 +- pymongo/network_layer.py | 2 +- pymongo/synchronous/pool.py | 2 +- test/asynchronous/test_client.py | 1 - test/test_client.py | 1 - 5 files changed, 3 insertions(+), 5 deletions(-) diff --git a/pymongo/asynchronous/pool.py b/pymongo/asynchronous/pool.py index 947b3124a5..9a39883fc2 100644 --- a/pymongo/asynchronous/pool.py +++ b/pymongo/asynchronous/pool.py @@ -714,7 +714,7 @@ def __init__( self, address: _Address, options: PoolOptions, - is_sdam: bool = True, + is_sdam: bool = False, client_id: Optional[ObjectId] = None, ): """ diff --git a/pymongo/network_layer.py b/pymongo/network_layer.py index 0a02b39a17..6f1bb9a357 100644 --- a/pymongo/network_layer.py +++ b/pymongo/network_layer.py @@ -359,7 +359,7 @@ def receive_data(conn: Connection, length: int, deadline: Optional[float]) -> me raise _OperationCancelled("operation cancelled") from None if ( _PYPY - or not conn.is_sdam() + or not conn.is_sdam and deadline is not None and deadline - time.monotonic() < 0 ): diff --git a/pymongo/synchronous/pool.py b/pymongo/synchronous/pool.py index 8d2a0e9d75..505f58c60f 100644 --- a/pymongo/synchronous/pool.py +++ b/pymongo/synchronous/pool.py @@ -712,7 +712,7 @@ def __init__( self, address: _Address, options: PoolOptions, - is_sdam: bool = True, + is_sdam: bool = False, client_id: Optional[ObjectId] = None, ): """ diff --git a/test/asynchronous/test_client.py b/test/asynchronous/test_client.py index d8c7d66ff3..1e1faf0a2a 100644 --- a/test/asynchronous/test_client.py +++ b/test/asynchronous/test_client.py @@ -2011,7 +2011,6 @@ async def test_srv_max_hosts_kwarg(self): ) @unittest.skipIf(sys.platform == "win32", "Windows does not support SIGSTOP") @async_client_context.require_sync - @mock.patch.dict("os.environ", {"AWS_LAMBDA_RUNTIME_API": "1"}) def test_sigstop_sigcont(self): test_dir = os.path.dirname(os.path.realpath(__file__)) script = os.path.join(test_dir, "sigstop_sigcont.py") diff --git a/test/test_client.py b/test/test_client.py index 54c82a6d21..189e58e803 100644 --- a/test/test_client.py +++ b/test/test_client.py @@ -1968,7 +1968,6 @@ def test_srv_max_hosts_kwarg(self): ) @unittest.skipIf(sys.platform == "win32", "Windows does not support SIGSTOP") @client_context.require_sync - @mock.patch.dict("os.environ", {"AWS_LAMBDA_RUNTIME_API": "1"}) def test_sigstop_sigcont(self): test_dir = os.path.dirname(os.path.realpath(__file__)) script = os.path.join(test_dir, "sigstop_sigcont.py") From 968fc56b1928a0c738910fc9a937a39b83402e5d Mon Sep 17 00:00:00 2001 From: Noah Stapp Date: Thu, 8 May 2025 14:39:21 -0400 Subject: [PATCH 8/8] Fix test_timeout_configuration --- test/test_topology.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/test_topology.py b/test/test_topology.py index 22e94739ee..141b2d7f21 100644 --- a/test/test_topology.py +++ b/test/test_topology.py @@ -121,7 +121,7 @@ def test_timeout_configuration(self): self.assertEqual(1, monitor._pool.opts.socket_timeout) # The monitor, not its pool, is responsible for calling hello. - self.assertFalse(monitor._pool.handshake) + self.assertTrue(monitor._pool.is_sdam) class TestSingleServerTopology(TopologyTest):