diff options
author | Asif Saif Uddin <auvipy@gmail.com> | 2023-04-08 22:45:08 +0600 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-04-08 22:45:08 +0600 |
commit | 973dc3790ac25b9da7b6d2641ac72d95470f6ed8 (patch) | |
tree | 9e7ba02d8520994a06efc37dde05fba722138189 /t/integration/common.py | |
parent | 7ceb675bb69917fae182ebdaf9a2298a308c3fa4 (diff) | |
parent | 2de7f9f038dd62e097e490cb3fa609067c1c3c36 (diff) | |
download | kombu-py310.tar.gz |
Merge branch 'main' into py310py310
Diffstat (limited to 't/integration/common.py')
-rw-r--r-- | t/integration/common.py | 53 |
1 files changed, 51 insertions, 2 deletions
diff --git a/t/integration/common.py b/t/integration/common.py index 84f44dd3..82dc3f1b 100644 --- a/t/integration/common.py +++ b/t/integration/common.py @@ -1,3 +1,5 @@ +from __future__ import annotations + import socket from contextlib import closing from time import sleep @@ -136,14 +138,21 @@ class BaseExchangeTypes: message.delivery_info['exchange'] == '' assert message.payload == body - def _consume(self, connection, queue): + def _create_consumer(self, connection, queue): consumer = kombu.Consumer( connection, [queue], accept=['pickle'] ) consumer.register_callback(self._callback) + return consumer + + def _consume_from(self, connection, consumer): with consumer: connection.drain_events(timeout=1) + def _consume(self, connection, queue): + with self._create_consumer(connection, queue): + connection.drain_events(timeout=1) + def _publish(self, channel, exchange, queues=None, routing_key=None): producer = kombu.Producer(channel, exchange=exchange) if routing_key: @@ -213,7 +222,6 @@ class BaseExchangeTypes: channel, ex, [test_queue1, test_queue2, test_queue3], routing_key='t.1' ) - self._consume(conn, test_queue1) self._consume(conn, test_queue2) with pytest.raises(socket.timeout): @@ -398,6 +406,47 @@ class BasePriority: assert msg.payload == data +class BaseMessage: + + def test_ack(self, connection): + with connection as conn: + with closing(conn.SimpleQueue('test_ack')) as queue: + queue.put({'Hello': 'World'}, headers={'k1': 'v1'}) + message = queue.get_nowait() + message.ack() + with pytest.raises(queue.Empty): + queue.get_nowait() + + def test_reject_no_requeue(self, connection): + with connection as conn: + with closing(conn.SimpleQueue('test_reject_no_requeue')) as queue: + queue.put({'Hello': 'World'}, headers={'k1': 'v1'}) + message = queue.get_nowait() + message.reject(requeue=False) + with pytest.raises(queue.Empty): + queue.get_nowait() + + def test_reject_requeue(self, connection): + with connection as conn: + with closing(conn.SimpleQueue('test_reject_requeue')) as queue: + queue.put({'Hello': 'World'}, headers={'k1': 'v1'}) + message = queue.get_nowait() + message.reject(requeue=True) + message2 = queue.get_nowait() + assert message.body == message2.body + message2.ack() + + def test_requeue(self, connection): + with connection as conn: + with closing(conn.SimpleQueue('test_requeue')) as queue: + queue.put({'Hello': 'World'}, headers={'k1': 'v1'}) + message = queue.get_nowait() + message.requeue() + message2 = queue.get_nowait() + assert message.body == message2.body + message2.ack() + + class BaseFailover(BasicFunctionality): def test_connect(self, failover_connection): |