diff options
Diffstat (limited to 't/integration/common.py')
-rw-r--r-- | t/integration/common.py | 53 |
1 files changed, 51 insertions, 2 deletions
diff --git a/t/integration/common.py b/t/integration/common.py index 84f44dd3..82dc3f1b 100644 --- a/t/integration/common.py +++ b/t/integration/common.py @@ -1,3 +1,5 @@ +from __future__ import annotations + import socket from contextlib import closing from time import sleep @@ -136,14 +138,21 @@ class BaseExchangeTypes: message.delivery_info['exchange'] == '' assert message.payload == body - def _consume(self, connection, queue): + def _create_consumer(self, connection, queue): consumer = kombu.Consumer( connection, [queue], accept=['pickle'] ) consumer.register_callback(self._callback) + return consumer + + def _consume_from(self, connection, consumer): with consumer: connection.drain_events(timeout=1) + def _consume(self, connection, queue): + with self._create_consumer(connection, queue): + connection.drain_events(timeout=1) + def _publish(self, channel, exchange, queues=None, routing_key=None): producer = kombu.Producer(channel, exchange=exchange) if routing_key: @@ -213,7 +222,6 @@ class BaseExchangeTypes: channel, ex, [test_queue1, test_queue2, test_queue3], routing_key='t.1' ) - self._consume(conn, test_queue1) self._consume(conn, test_queue2) with pytest.raises(socket.timeout): @@ -398,6 +406,47 @@ class BasePriority: assert msg.payload == data +class BaseMessage: + + def test_ack(self, connection): + with connection as conn: + with closing(conn.SimpleQueue('test_ack')) as queue: + queue.put({'Hello': 'World'}, headers={'k1': 'v1'}) + message = queue.get_nowait() + message.ack() + with pytest.raises(queue.Empty): + queue.get_nowait() + + def test_reject_no_requeue(self, connection): + with connection as conn: + with closing(conn.SimpleQueue('test_reject_no_requeue')) as queue: + queue.put({'Hello': 'World'}, headers={'k1': 'v1'}) + message = queue.get_nowait() + message.reject(requeue=False) + with pytest.raises(queue.Empty): + queue.get_nowait() + + def test_reject_requeue(self, connection): + with connection as conn: + with closing(conn.SimpleQueue('test_reject_requeue')) as queue: + queue.put({'Hello': 'World'}, headers={'k1': 'v1'}) + message = queue.get_nowait() + message.reject(requeue=True) + message2 = queue.get_nowait() + assert message.body == message2.body + message2.ack() + + def test_requeue(self, connection): + with connection as conn: + with closing(conn.SimpleQueue('test_requeue')) as queue: + queue.put({'Hello': 'World'}, headers={'k1': 'v1'}) + message = queue.get_nowait() + message.requeue() + message2 = queue.get_nowait() + assert message.body == message2.body + message2.ack() + + class BaseFailover(BasicFunctionality): def test_connect(self, failover_connection): |