summaryrefslogtreecommitdiff
path: root/t/unit/test_connection.py
diff options
context:
space:
mode:
Diffstat (limited to 't/unit/test_connection.py')
-rw-r--r--t/unit/test_connection.py203
1 files changed, 191 insertions, 12 deletions
diff --git a/t/unit/test_connection.py b/t/unit/test_connection.py
index 0b184d3b..c2daee3b 100644
--- a/t/unit/test_connection.py
+++ b/t/unit/test_connection.py
@@ -1,3 +1,5 @@
+from __future__ import annotations
+
import pickle
import socket
from copy import copy, deepcopy
@@ -9,7 +11,7 @@ from kombu import Connection, Consumer, Producer, parse_url
from kombu.connection import Resource
from kombu.exceptions import OperationalError
from kombu.utils.functional import lazy
-from t.mocks import Transport
+from t.mocks import TimeoutingTransport, Transport
class test_connection_utils:
@@ -99,6 +101,19 @@ class test_connection_utils:
# see Appendix A of http://www.rabbitmq.com/uri-spec.html
self.assert_info(Connection(url), **expected)
+ @pytest.mark.parametrize('url,expected', [
+ ('sqs://user:pass@',
+ {'userid': None, 'password': None, 'hostname': None,
+ 'port': None, 'virtual_host': '/'}),
+ ('sqs://',
+ {'userid': None, 'password': None, 'hostname': None,
+ 'port': None, 'virtual_host': '/'}),
+ ])
+ def test_sqs_example_urls(self, url, expected, caplog):
+ pytest.importorskip('boto3')
+ self.assert_info(Connection('sqs://'), **expected)
+ assert not caplog.records
+
@pytest.mark.skip('TODO: urllib cannot parse ipv6 urls')
def test_url_IPV6(self):
self.assert_info(
@@ -293,7 +308,9 @@ class test_Connection:
assert not c.is_evented
def test_register_with_event_loop(self):
- c = Connection(transport=Mock)
+ transport = Mock(name='transport')
+ transport.connection_errors = []
+ c = Connection(transport=transport)
loop = Mock(name='loop')
c.register_with_event_loop(loop)
c.transport.register_with_event_loop.assert_called_with(
@@ -383,14 +400,12 @@ class test_Connection:
qsms.assert_called_with(self.conn.connection)
def test__enter____exit__(self):
- conn = self.conn
- context = conn.__enter__()
- assert context is conn
- conn.connect()
- assert conn.connection.connected
- conn.__exit__()
- assert conn.connection is None
- conn.close() # again
+ with self.conn as context:
+ assert context is self.conn
+ self.conn.connect()
+ assert self.conn.connection.connected
+ assert self.conn.connection is None
+ self.conn.close() # again
def test_close_survives_connerror(self):
@@ -477,15 +492,52 @@ class test_Connection:
def publish():
raise _ConnectionError('failed connection')
- self.conn.transport.connection_errors = (_ConnectionError,)
+ self.conn.get_transport_cls().connection_errors = (_ConnectionError,)
ensured = self.conn.ensure(self.conn, publish)
with pytest.raises(OperationalError):
ensured()
+ def test_ensure_retry_errors_is_not_looping_infinitely(self):
+ class _MessageNacked(Exception):
+ pass
+
+ def publish():
+ raise _MessageNacked('NACK')
+
+ with pytest.raises(ValueError):
+ self.conn.ensure(
+ self.conn,
+ publish,
+ retry_errors=(_MessageNacked,)
+ )
+
+ def test_ensure_retry_errors_is_limited_by_max_retries(self):
+ class _MessageNacked(Exception):
+ pass
+
+ tries = 0
+
+ def publish():
+ nonlocal tries
+ tries += 1
+ if tries <= 3:
+ raise _MessageNacked('NACK')
+ # On the 4th try, we let it pass
+ return 'ACK'
+
+ ensured = self.conn.ensure(
+ self.conn,
+ publish,
+ max_retries=3, # 3 retries + 1 initial try = 4 tries
+ retry_errors=(_MessageNacked,)
+ )
+
+ assert ensured() == 'ACK'
+
def test_autoretry(self):
myfun = Mock()
- self.conn.transport.connection_errors = (KeyError,)
+ self.conn.get_transport_cls().connection_errors = (KeyError,)
def on_call(*args, **kwargs):
myfun.side_effect = None
@@ -571,6 +623,18 @@ class test_Connection:
conn = Connection(transport=MyTransport)
assert conn.channel_errors == (KeyError, ValueError)
+ def test_channel_errors__exception_no_cache(self):
+ """Ensure the channel_errors can be retrieved without an initialized
+ transport.
+ """
+
+ class MyTransport(Transport):
+ channel_errors = (KeyError,)
+
+ conn = Connection(transport=MyTransport)
+ MyTransport.__init__ = Mock(side_effect=Exception)
+ assert conn.channel_errors == (KeyError,)
+
def test_connection_errors(self):
class MyTransport(Transport):
@@ -579,6 +643,80 @@ class test_Connection:
conn = Connection(transport=MyTransport)
assert conn.connection_errors == (KeyError, ValueError)
+ def test_connection_errors__exception_no_cache(self):
+ """Ensure the connection_errors can be retrieved without an
+ initialized transport.
+ """
+
+ class MyTransport(Transport):
+ connection_errors = (KeyError,)
+
+ conn = Connection(transport=MyTransport)
+ MyTransport.__init__ = Mock(side_effect=Exception)
+ assert conn.connection_errors == (KeyError,)
+
+ def test_recoverable_connection_errors(self):
+
+ class MyTransport(Transport):
+ recoverable_connection_errors = (KeyError, ValueError)
+
+ conn = Connection(transport=MyTransport)
+ assert conn.recoverable_connection_errors == (KeyError, ValueError)
+
+ def test_recoverable_connection_errors__fallback(self):
+ """Ensure missing recoverable_connection_errors on the Transport does
+ not cause a fatal error.
+ """
+
+ class MyTransport(Transport):
+ connection_errors = (KeyError,)
+ channel_errors = (ValueError,)
+
+ conn = Connection(transport=MyTransport)
+ assert conn.recoverable_connection_errors == (KeyError, ValueError)
+
+ def test_recoverable_connection_errors__exception_no_cache(self):
+ """Ensure the recoverable_connection_errors can be retrieved without
+ an initialized transport.
+ """
+
+ class MyTransport(Transport):
+ recoverable_connection_errors = (KeyError,)
+
+ conn = Connection(transport=MyTransport)
+ MyTransport.__init__ = Mock(side_effect=Exception)
+ assert conn.recoverable_connection_errors == (KeyError,)
+
+ def test_recoverable_channel_errors(self):
+
+ class MyTransport(Transport):
+ recoverable_channel_errors = (KeyError, ValueError)
+
+ conn = Connection(transport=MyTransport)
+ assert conn.recoverable_channel_errors == (KeyError, ValueError)
+
+ def test_recoverable_channel_errors__fallback(self):
+ """Ensure missing recoverable_channel_errors on the Transport does not
+ cause a fatal error.
+ """
+
+ class MyTransport(Transport):
+ pass
+
+ conn = Connection(transport=MyTransport)
+ assert conn.recoverable_channel_errors == ()
+
+ def test_recoverable_channel_errors__exception_no_cache(self):
+ """Ensure the recoverable_channel_errors can be retrieved without an
+ initialized transport.
+ """
+ class MyTransport(Transport):
+ recoverable_channel_errors = (KeyError,)
+
+ conn = Connection(transport=MyTransport)
+ MyTransport.__init__ = Mock(side_effect=Exception)
+ assert conn.recoverable_channel_errors == (KeyError,)
+
def test_multiple_urls_hostname(self):
conn = Connection(['example.com;amqp://example.com'])
assert conn.as_uri() == 'amqp://guest:**@example.com:5672//'
@@ -587,6 +725,47 @@ class test_Connection:
conn = Connection('example.com;example.com;')
assert conn.as_uri() == 'amqp://guest:**@example.com:5672//'
+ def test_connection_respect_its_timeout(self):
+ invalid_port = 1222
+ with Connection(
+ f'amqp://guest:guest@localhost:{invalid_port}//',
+ transport_options={'max_retries': 2},
+ connect_timeout=1
+ ) as conn:
+ with pytest.raises(OperationalError):
+ conn.default_channel
+
+ def test_connection_failover_without_total_timeout(self):
+ with Connection(
+ ['server1', 'server2'],
+ transport=TimeoutingTransport,
+ connect_timeout=1,
+ transport_options={'interval_start': 0, 'interval_step': 0},
+ ) as conn:
+ conn._establish_connection = Mock(
+ side_effect=conn._establish_connection
+ )
+ with pytest.raises(OperationalError):
+ conn.default_channel
+ # Never retried, because `retry_over_time` `timeout` is equal
+ # to `connect_timeout`
+ conn._establish_connection.assert_called_once()
+
+ def test_connection_failover_with_total_timeout(self):
+ with Connection(
+ ['server1', 'server2'],
+ transport=TimeoutingTransport,
+ connect_timeout=1,
+ transport_options={'connect_retries_timeout': 2,
+ 'interval_start': 0, 'interval_step': 0},
+ ) as conn:
+ conn._establish_connection = Mock(
+ side_effect=conn._establish_connection
+ )
+ with pytest.raises(OperationalError):
+ conn.default_channel
+ assert conn._establish_connection.call_count == 2
+
class test_Connection_with_transport_options: