diff options
| author | Asif Saif Uddin <auvipy@gmail.com> | 2023-04-08 22:45:08 +0600 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2023-04-08 22:45:08 +0600 |
| commit | 973dc3790ac25b9da7b6d2641ac72d95470f6ed8 (patch) | |
| tree | 9e7ba02d8520994a06efc37dde05fba722138189 /t/integration | |
| parent | 7ceb675bb69917fae182ebdaf9a2298a308c3fa4 (diff) | |
| parent | 2de7f9f038dd62e097e490cb3fa609067c1c3c36 (diff) | |
| download | kombu-py310.tar.gz | |
Merge branch 'main' into py310py310
Diffstat (limited to 't/integration')
| -rw-r--r-- | t/integration/__init__.py | 2 | ||||
| -rw-r--r-- | t/integration/common.py | 53 | ||||
| -rw-r--r-- | t/integration/test_kafka.py | 69 | ||||
| -rw-r--r-- | t/integration/test_mongodb.py | 115 | ||||
| -rw-r--r-- | t/integration/test_py_amqp.py | 12 | ||||
| -rw-r--r-- | t/integration/test_redis.py | 34 |
6 files changed, 279 insertions, 6 deletions
diff --git a/t/integration/__init__.py b/t/integration/__init__.py index 1bea4880..cfce4f85 100644 --- a/t/integration/__init__.py +++ b/t/integration/__init__.py @@ -1,3 +1,5 @@ +from __future__ import annotations + import os import sys diff --git a/t/integration/common.py b/t/integration/common.py index 84f44dd3..82dc3f1b 100644 --- a/t/integration/common.py +++ b/t/integration/common.py @@ -1,3 +1,5 @@ +from __future__ import annotations + import socket from contextlib import closing from time import sleep @@ -136,14 +138,21 @@ class BaseExchangeTypes: message.delivery_info['exchange'] == '' assert message.payload == body - def _consume(self, connection, queue): + def _create_consumer(self, connection, queue): consumer = kombu.Consumer( connection, [queue], accept=['pickle'] ) consumer.register_callback(self._callback) + return consumer + + def _consume_from(self, connection, consumer): with consumer: connection.drain_events(timeout=1) + def _consume(self, connection, queue): + with self._create_consumer(connection, queue): + connection.drain_events(timeout=1) + def _publish(self, channel, exchange, queues=None, routing_key=None): producer = kombu.Producer(channel, exchange=exchange) if routing_key: @@ -213,7 +222,6 @@ class BaseExchangeTypes: channel, ex, [test_queue1, test_queue2, test_queue3], routing_key='t.1' ) - self._consume(conn, test_queue1) self._consume(conn, test_queue2) with pytest.raises(socket.timeout): @@ -398,6 +406,47 @@ class BasePriority: assert msg.payload == data +class BaseMessage: + + def test_ack(self, connection): + with connection as conn: + with closing(conn.SimpleQueue('test_ack')) as queue: + queue.put({'Hello': 'World'}, headers={'k1': 'v1'}) + message = queue.get_nowait() + message.ack() + with pytest.raises(queue.Empty): + queue.get_nowait() + + def test_reject_no_requeue(self, connection): + with connection as conn: + with closing(conn.SimpleQueue('test_reject_no_requeue')) as queue: + queue.put({'Hello': 'World'}, headers={'k1': 'v1'}) + message = queue.get_nowait() + message.reject(requeue=False) + with pytest.raises(queue.Empty): + queue.get_nowait() + + def test_reject_requeue(self, connection): + with connection as conn: + with closing(conn.SimpleQueue('test_reject_requeue')) as queue: + queue.put({'Hello': 'World'}, headers={'k1': 'v1'}) + message = queue.get_nowait() + message.reject(requeue=True) + message2 = queue.get_nowait() + assert message.body == message2.body + message2.ack() + + def test_requeue(self, connection): + with connection as conn: + with closing(conn.SimpleQueue('test_requeue')) as queue: + queue.put({'Hello': 'World'}, headers={'k1': 'v1'}) + message = queue.get_nowait() + message.requeue() + message2 = queue.get_nowait() + assert message.body == message2.body + message2.ack() + + class BaseFailover(BasicFunctionality): def test_connect(self, failover_connection): diff --git a/t/integration/test_kafka.py b/t/integration/test_kafka.py new file mode 100644 index 00000000..2303d887 --- /dev/null +++ b/t/integration/test_kafka.py @@ -0,0 +1,69 @@ +from __future__ import annotations + +import pytest + +import kombu + +from .common import (BaseExchangeTypes, BaseFailover, BaseMessage, + BasicFunctionality) + + +def get_connection(hostname, port): + return kombu.Connection( + f'confluentkafka://{hostname}:{port}', + ) + + +def get_failover_connection(hostname, port): + return kombu.Connection( + f'confluentkafka://localhost:12345;confluentkafka://{hostname}:{port}', + connect_timeout=10, + ) + + +@pytest.fixture() +def invalid_connection(): + return kombu.Connection('confluentkafka://localhost:12345') + + +@pytest.fixture() +def connection(): + return get_connection( + hostname='localhost', + port='9092' + ) + + +@pytest.fixture() +def failover_connection(): + return get_failover_connection( + hostname='localhost', + port='9092' + ) + + +@pytest.mark.env('kafka') +@pytest.mark.flaky(reruns=5, reruns_delay=2) +class test_KafkaBasicFunctionality(BasicFunctionality): + pass + + +@pytest.mark.env('kafka') +@pytest.mark.flaky(reruns=5, reruns_delay=2) +class test_KafkaBaseExchangeTypes(BaseExchangeTypes): + + @pytest.mark.skip('fanout is not implemented') + def test_fanout(self, connection): + pass + + +@pytest.mark.env('kafka') +@pytest.mark.flaky(reruns=5, reruns_delay=2) +class test_KafkaFailover(BaseFailover): + pass + + +@pytest.mark.env('kafka') +@pytest.mark.flaky(reruns=5, reruns_delay=2) +class test_KafkaMessage(BaseMessage): + pass diff --git a/t/integration/test_mongodb.py b/t/integration/test_mongodb.py new file mode 100644 index 00000000..445f1389 --- /dev/null +++ b/t/integration/test_mongodb.py @@ -0,0 +1,115 @@ +from __future__ import annotations + +import os + +import pytest + +import kombu + +from .common import (BaseExchangeTypes, BaseMessage, BasePriority, + BasicFunctionality) + + +def get_connection(hostname, port, vhost): + return kombu.Connection( + f'mongodb://{hostname}:{port}/{vhost}', + transport_options={'ttl': True}, + ) + + +@pytest.fixture() +def invalid_connection(): + return kombu.Connection('mongodb://localhost:12345?connectTimeoutMS=1') + + +@pytest.fixture() +def connection(request): + return get_connection( + hostname=os.environ.get('MONGODB_HOST', 'localhost'), + port=os.environ.get('MONGODB_27017_TCP', '27017'), + vhost=getattr( + request.config, "slaveinput", {} + ).get("slaveid", 'tests'), + ) + + +@pytest.mark.env('mongodb') +@pytest.mark.flaky(reruns=5, reruns_delay=2) +class test_MongoDBBasicFunctionality(BasicFunctionality): + pass + + +@pytest.mark.env('mongodb') +@pytest.mark.flaky(reruns=5, reruns_delay=2) +class test_MongoDBBaseExchangeTypes(BaseExchangeTypes): + + # MongoDB consumer skips old messages upon initialization. + # Ensure that it's created before test messages are published. + + def test_fanout(self, connection): + ex = kombu.Exchange('test_fanout', type='fanout') + test_queue1 = kombu.Queue('fanout1', exchange=ex) + consumer1 = self._create_consumer(connection, test_queue1) + test_queue2 = kombu.Queue('fanout2', exchange=ex) + consumer2 = self._create_consumer(connection, test_queue2) + + with connection as conn: + with conn.channel() as channel: + self._publish(channel, ex, [test_queue1, test_queue2]) + + self._consume_from(conn, consumer1) + self._consume_from(conn, consumer2) + + +@pytest.mark.env('mongodb') +@pytest.mark.flaky(reruns=5, reruns_delay=2) +class test_MongoDBPriority(BasePriority): + + # drain_events() consumes only one value unlike in py-amqp. + + def test_publish_consume(self, connection): + test_queue = kombu.Queue( + 'priority_test', routing_key='priority_test', max_priority=10 + ) + + received_messages = [] + + def callback(body, message): + received_messages.append(body) + message.ack() + + with connection as conn: + with conn.channel() as channel: + producer = kombu.Producer(channel) + for msg, prio in [ + [{'msg': 'first'}, 3], + [{'msg': 'second'}, 6], + [{'msg': 'third'}, 3], + ]: + producer.publish( + msg, + retry=True, + exchange=test_queue.exchange, + routing_key=test_queue.routing_key, + declare=[test_queue], + serializer='pickle', + priority=prio + ) + consumer = kombu.Consumer( + conn, [test_queue], accept=['pickle'] + ) + consumer.register_callback(callback) + with consumer: + conn.drain_events(timeout=1) + conn.drain_events(timeout=1) + conn.drain_events(timeout=1) + # Second message must be received first + assert received_messages[0] == {'msg': 'second'} + assert received_messages[1] == {'msg': 'first'} + assert received_messages[2] == {'msg': 'third'} + + +@pytest.mark.env('mongodb') +@pytest.mark.flaky(reruns=5, reruns_delay=2) +class test_MongoDBMessage(BaseMessage): + pass diff --git a/t/integration/test_py_amqp.py b/t/integration/test_py_amqp.py index 88ff0ac7..260f164d 100644 --- a/t/integration/test_py_amqp.py +++ b/t/integration/test_py_amqp.py @@ -1,11 +1,13 @@ +from __future__ import annotations + import os import pytest import kombu -from .common import (BaseExchangeTypes, BaseFailover, BasePriority, - BaseTimeToLive, BasicFunctionality) +from .common import (BaseExchangeTypes, BaseFailover, BaseMessage, + BasePriority, BaseTimeToLive, BasicFunctionality) def get_connection(hostname, port, vhost): @@ -73,3 +75,9 @@ class test_PyAMQPPriority(BasePriority): @pytest.mark.flaky(reruns=5, reruns_delay=2) class test_PyAMQPFailover(BaseFailover): pass + + +@pytest.mark.env('py-amqp') +@pytest.mark.flaky(reruns=5, reruns_delay=2) +class test_PyAMQPMessage(BaseMessage): + pass diff --git a/t/integration/test_redis.py b/t/integration/test_redis.py index 72ba803f..b2ae5ab8 100644 --- a/t/integration/test_redis.py +++ b/t/integration/test_redis.py @@ -1,12 +1,17 @@ +from __future__ import annotations + import os +import socket from time import sleep import pytest import redis import kombu +from kombu.transport.redis import Transport -from .common import BaseExchangeTypes, BasePriority, BasicFunctionality +from .common import (BaseExchangeTypes, BaseMessage, BasePriority, + BasicFunctionality) def get_connection( @@ -55,7 +60,11 @@ def test_failed_credentials(): @pytest.mark.env('redis') @pytest.mark.flaky(reruns=5, reruns_delay=2) class test_RedisBasicFunctionality(BasicFunctionality): - pass + def test_failed_connection__ConnectionError(self, invalid_connection): + # method raises transport exception + with pytest.raises(redis.exceptions.ConnectionError) as ex: + invalid_connection.connection + assert ex.type in Transport.connection_errors @pytest.mark.env('redis') @@ -120,3 +129,24 @@ class test_RedisPriority(BasePriority): assert received_messages[0] == {'msg': 'second'} assert received_messages[1] == {'msg': 'first'} assert received_messages[2] == {'msg': 'third'} + + +@pytest.mark.env('redis') +@pytest.mark.flaky(reruns=5, reruns_delay=2) +class test_RedisMessage(BaseMessage): + pass + + +@pytest.mark.env('redis') +def test_RedisConnectTimeout(monkeypatch): + # simulate a connection timeout for a new connection + def connect_timeout(self): + raise socket.timeout + monkeypatch.setattr( + redis.connection.Connection, "_connect", connect_timeout) + + # ensure the timeout raises a TimeoutError + with pytest.raises(redis.exceptions.TimeoutError): + # note the host/port here is irrelevant because + # connect will raise a socket.timeout + kombu.Connection('redis://localhost:12345').connect() |
