diff options
author | Asif Saif Uddin <auvipy@gmail.com> | 2023-04-08 22:45:08 +0600 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-04-08 22:45:08 +0600 |
commit | 973dc3790ac25b9da7b6d2641ac72d95470f6ed8 (patch) | |
tree | 9e7ba02d8520994a06efc37dde05fba722138189 /t/integration/test_mongodb.py | |
parent | 7ceb675bb69917fae182ebdaf9a2298a308c3fa4 (diff) | |
parent | 2de7f9f038dd62e097e490cb3fa609067c1c3c36 (diff) | |
download | kombu-py310.tar.gz |
Merge branch 'main' into py310py310
Diffstat (limited to 't/integration/test_mongodb.py')
-rw-r--r-- | t/integration/test_mongodb.py | 115 |
1 files changed, 115 insertions, 0 deletions
diff --git a/t/integration/test_mongodb.py b/t/integration/test_mongodb.py new file mode 100644 index 00000000..445f1389 --- /dev/null +++ b/t/integration/test_mongodb.py @@ -0,0 +1,115 @@ +from __future__ import annotations + +import os + +import pytest + +import kombu + +from .common import (BaseExchangeTypes, BaseMessage, BasePriority, + BasicFunctionality) + + +def get_connection(hostname, port, vhost): + return kombu.Connection( + f'mongodb://{hostname}:{port}/{vhost}', + transport_options={'ttl': True}, + ) + + +@pytest.fixture() +def invalid_connection(): + return kombu.Connection('mongodb://localhost:12345?connectTimeoutMS=1') + + +@pytest.fixture() +def connection(request): + return get_connection( + hostname=os.environ.get('MONGODB_HOST', 'localhost'), + port=os.environ.get('MONGODB_27017_TCP', '27017'), + vhost=getattr( + request.config, "slaveinput", {} + ).get("slaveid", 'tests'), + ) + + +@pytest.mark.env('mongodb') +@pytest.mark.flaky(reruns=5, reruns_delay=2) +class test_MongoDBBasicFunctionality(BasicFunctionality): + pass + + +@pytest.mark.env('mongodb') +@pytest.mark.flaky(reruns=5, reruns_delay=2) +class test_MongoDBBaseExchangeTypes(BaseExchangeTypes): + + # MongoDB consumer skips old messages upon initialization. + # Ensure that it's created before test messages are published. + + def test_fanout(self, connection): + ex = kombu.Exchange('test_fanout', type='fanout') + test_queue1 = kombu.Queue('fanout1', exchange=ex) + consumer1 = self._create_consumer(connection, test_queue1) + test_queue2 = kombu.Queue('fanout2', exchange=ex) + consumer2 = self._create_consumer(connection, test_queue2) + + with connection as conn: + with conn.channel() as channel: + self._publish(channel, ex, [test_queue1, test_queue2]) + + self._consume_from(conn, consumer1) + self._consume_from(conn, consumer2) + + +@pytest.mark.env('mongodb') +@pytest.mark.flaky(reruns=5, reruns_delay=2) +class test_MongoDBPriority(BasePriority): + + # drain_events() consumes only one value unlike in py-amqp. + + def test_publish_consume(self, connection): + test_queue = kombu.Queue( + 'priority_test', routing_key='priority_test', max_priority=10 + ) + + received_messages = [] + + def callback(body, message): + received_messages.append(body) + message.ack() + + with connection as conn: + with conn.channel() as channel: + producer = kombu.Producer(channel) + for msg, prio in [ + [{'msg': 'first'}, 3], + [{'msg': 'second'}, 6], + [{'msg': 'third'}, 3], + ]: + producer.publish( + msg, + retry=True, + exchange=test_queue.exchange, + routing_key=test_queue.routing_key, + declare=[test_queue], + serializer='pickle', + priority=prio + ) + consumer = kombu.Consumer( + conn, [test_queue], accept=['pickle'] + ) + consumer.register_callback(callback) + with consumer: + conn.drain_events(timeout=1) + conn.drain_events(timeout=1) + conn.drain_events(timeout=1) + # Second message must be received first + assert received_messages[0] == {'msg': 'second'} + assert received_messages[1] == {'msg': 'first'} + assert received_messages[2] == {'msg': 'third'} + + +@pytest.mark.env('mongodb') +@pytest.mark.flaky(reruns=5, reruns_delay=2) +class test_MongoDBMessage(BaseMessage): + pass |