diff options
Diffstat (limited to 't/unit/test_connection.py')
-rw-r--r-- | t/unit/test_connection.py | 203 |
1 files changed, 191 insertions, 12 deletions
diff --git a/t/unit/test_connection.py b/t/unit/test_connection.py index 0b184d3b..c2daee3b 100644 --- a/t/unit/test_connection.py +++ b/t/unit/test_connection.py @@ -1,3 +1,5 @@ +from __future__ import annotations + import pickle import socket from copy import copy, deepcopy @@ -9,7 +11,7 @@ from kombu import Connection, Consumer, Producer, parse_url from kombu.connection import Resource from kombu.exceptions import OperationalError from kombu.utils.functional import lazy -from t.mocks import Transport +from t.mocks import TimeoutingTransport, Transport class test_connection_utils: @@ -99,6 +101,19 @@ class test_connection_utils: # see Appendix A of http://www.rabbitmq.com/uri-spec.html self.assert_info(Connection(url), **expected) + @pytest.mark.parametrize('url,expected', [ + ('sqs://user:pass@', + {'userid': None, 'password': None, 'hostname': None, + 'port': None, 'virtual_host': '/'}), + ('sqs://', + {'userid': None, 'password': None, 'hostname': None, + 'port': None, 'virtual_host': '/'}), + ]) + def test_sqs_example_urls(self, url, expected, caplog): + pytest.importorskip('boto3') + self.assert_info(Connection('sqs://'), **expected) + assert not caplog.records + @pytest.mark.skip('TODO: urllib cannot parse ipv6 urls') def test_url_IPV6(self): self.assert_info( @@ -293,7 +308,9 @@ class test_Connection: assert not c.is_evented def test_register_with_event_loop(self): - c = Connection(transport=Mock) + transport = Mock(name='transport') + transport.connection_errors = [] + c = Connection(transport=transport) loop = Mock(name='loop') c.register_with_event_loop(loop) c.transport.register_with_event_loop.assert_called_with( @@ -383,14 +400,12 @@ class test_Connection: qsms.assert_called_with(self.conn.connection) def test__enter____exit__(self): - conn = self.conn - context = conn.__enter__() - assert context is conn - conn.connect() - assert conn.connection.connected - conn.__exit__() - assert conn.connection is None - conn.close() # again + with self.conn as context: + assert context is self.conn + self.conn.connect() + assert self.conn.connection.connected + assert self.conn.connection is None + self.conn.close() # again def test_close_survives_connerror(self): @@ -477,15 +492,52 @@ class test_Connection: def publish(): raise _ConnectionError('failed connection') - self.conn.transport.connection_errors = (_ConnectionError,) + self.conn.get_transport_cls().connection_errors = (_ConnectionError,) ensured = self.conn.ensure(self.conn, publish) with pytest.raises(OperationalError): ensured() + def test_ensure_retry_errors_is_not_looping_infinitely(self): + class _MessageNacked(Exception): + pass + + def publish(): + raise _MessageNacked('NACK') + + with pytest.raises(ValueError): + self.conn.ensure( + self.conn, + publish, + retry_errors=(_MessageNacked,) + ) + + def test_ensure_retry_errors_is_limited_by_max_retries(self): + class _MessageNacked(Exception): + pass + + tries = 0 + + def publish(): + nonlocal tries + tries += 1 + if tries <= 3: + raise _MessageNacked('NACK') + # On the 4th try, we let it pass + return 'ACK' + + ensured = self.conn.ensure( + self.conn, + publish, + max_retries=3, # 3 retries + 1 initial try = 4 tries + retry_errors=(_MessageNacked,) + ) + + assert ensured() == 'ACK' + def test_autoretry(self): myfun = Mock() - self.conn.transport.connection_errors = (KeyError,) + self.conn.get_transport_cls().connection_errors = (KeyError,) def on_call(*args, **kwargs): myfun.side_effect = None @@ -571,6 +623,18 @@ class test_Connection: conn = Connection(transport=MyTransport) assert conn.channel_errors == (KeyError, ValueError) + def test_channel_errors__exception_no_cache(self): + """Ensure the channel_errors can be retrieved without an initialized + transport. + """ + + class MyTransport(Transport): + channel_errors = (KeyError,) + + conn = Connection(transport=MyTransport) + MyTransport.__init__ = Mock(side_effect=Exception) + assert conn.channel_errors == (KeyError,) + def test_connection_errors(self): class MyTransport(Transport): @@ -579,6 +643,80 @@ class test_Connection: conn = Connection(transport=MyTransport) assert conn.connection_errors == (KeyError, ValueError) + def test_connection_errors__exception_no_cache(self): + """Ensure the connection_errors can be retrieved without an + initialized transport. + """ + + class MyTransport(Transport): + connection_errors = (KeyError,) + + conn = Connection(transport=MyTransport) + MyTransport.__init__ = Mock(side_effect=Exception) + assert conn.connection_errors == (KeyError,) + + def test_recoverable_connection_errors(self): + + class MyTransport(Transport): + recoverable_connection_errors = (KeyError, ValueError) + + conn = Connection(transport=MyTransport) + assert conn.recoverable_connection_errors == (KeyError, ValueError) + + def test_recoverable_connection_errors__fallback(self): + """Ensure missing recoverable_connection_errors on the Transport does + not cause a fatal error. + """ + + class MyTransport(Transport): + connection_errors = (KeyError,) + channel_errors = (ValueError,) + + conn = Connection(transport=MyTransport) + assert conn.recoverable_connection_errors == (KeyError, ValueError) + + def test_recoverable_connection_errors__exception_no_cache(self): + """Ensure the recoverable_connection_errors can be retrieved without + an initialized transport. + """ + + class MyTransport(Transport): + recoverable_connection_errors = (KeyError,) + + conn = Connection(transport=MyTransport) + MyTransport.__init__ = Mock(side_effect=Exception) + assert conn.recoverable_connection_errors == (KeyError,) + + def test_recoverable_channel_errors(self): + + class MyTransport(Transport): + recoverable_channel_errors = (KeyError, ValueError) + + conn = Connection(transport=MyTransport) + assert conn.recoverable_channel_errors == (KeyError, ValueError) + + def test_recoverable_channel_errors__fallback(self): + """Ensure missing recoverable_channel_errors on the Transport does not + cause a fatal error. + """ + + class MyTransport(Transport): + pass + + conn = Connection(transport=MyTransport) + assert conn.recoverable_channel_errors == () + + def test_recoverable_channel_errors__exception_no_cache(self): + """Ensure the recoverable_channel_errors can be retrieved without an + initialized transport. + """ + class MyTransport(Transport): + recoverable_channel_errors = (KeyError,) + + conn = Connection(transport=MyTransport) + MyTransport.__init__ = Mock(side_effect=Exception) + assert conn.recoverable_channel_errors == (KeyError,) + def test_multiple_urls_hostname(self): conn = Connection(['example.com;amqp://example.com']) assert conn.as_uri() == 'amqp://guest:**@example.com:5672//' @@ -587,6 +725,47 @@ class test_Connection: conn = Connection('example.com;example.com;') assert conn.as_uri() == 'amqp://guest:**@example.com:5672//' + def test_connection_respect_its_timeout(self): + invalid_port = 1222 + with Connection( + f'amqp://guest:guest@localhost:{invalid_port}//', + transport_options={'max_retries': 2}, + connect_timeout=1 + ) as conn: + with pytest.raises(OperationalError): + conn.default_channel + + def test_connection_failover_without_total_timeout(self): + with Connection( + ['server1', 'server2'], + transport=TimeoutingTransport, + connect_timeout=1, + transport_options={'interval_start': 0, 'interval_step': 0}, + ) as conn: + conn._establish_connection = Mock( + side_effect=conn._establish_connection + ) + with pytest.raises(OperationalError): + conn.default_channel + # Never retried, because `retry_over_time` `timeout` is equal + # to `connect_timeout` + conn._establish_connection.assert_called_once() + + def test_connection_failover_with_total_timeout(self): + with Connection( + ['server1', 'server2'], + transport=TimeoutingTransport, + connect_timeout=1, + transport_options={'connect_retries_timeout': 2, + 'interval_start': 0, 'interval_step': 0}, + ) as conn: + conn._establish_connection = Mock( + side_effect=conn._establish_connection + ) + with pytest.raises(OperationalError): + conn.default_channel + assert conn._establish_connection.call_count == 2 + class test_Connection_with_transport_options: |