summaryrefslogtreecommitdiff
path: root/t/integration/common.py
diff options
context:
space:
mode:
Diffstat (limited to 't/integration/common.py')
-rw-r--r--t/integration/common.py53
1 files changed, 51 insertions, 2 deletions
diff --git a/t/integration/common.py b/t/integration/common.py
index 84f44dd3..82dc3f1b 100644
--- a/t/integration/common.py
+++ b/t/integration/common.py
@@ -1,3 +1,5 @@
+from __future__ import annotations
+
import socket
from contextlib import closing
from time import sleep
@@ -136,14 +138,21 @@ class BaseExchangeTypes:
message.delivery_info['exchange'] == ''
assert message.payload == body
- def _consume(self, connection, queue):
+ def _create_consumer(self, connection, queue):
consumer = kombu.Consumer(
connection, [queue], accept=['pickle']
)
consumer.register_callback(self._callback)
+ return consumer
+
+ def _consume_from(self, connection, consumer):
with consumer:
connection.drain_events(timeout=1)
+ def _consume(self, connection, queue):
+ with self._create_consumer(connection, queue):
+ connection.drain_events(timeout=1)
+
def _publish(self, channel, exchange, queues=None, routing_key=None):
producer = kombu.Producer(channel, exchange=exchange)
if routing_key:
@@ -213,7 +222,6 @@ class BaseExchangeTypes:
channel, ex, [test_queue1, test_queue2, test_queue3],
routing_key='t.1'
)
-
self._consume(conn, test_queue1)
self._consume(conn, test_queue2)
with pytest.raises(socket.timeout):
@@ -398,6 +406,47 @@ class BasePriority:
assert msg.payload == data
+class BaseMessage:
+
+ def test_ack(self, connection):
+ with connection as conn:
+ with closing(conn.SimpleQueue('test_ack')) as queue:
+ queue.put({'Hello': 'World'}, headers={'k1': 'v1'})
+ message = queue.get_nowait()
+ message.ack()
+ with pytest.raises(queue.Empty):
+ queue.get_nowait()
+
+ def test_reject_no_requeue(self, connection):
+ with connection as conn:
+ with closing(conn.SimpleQueue('test_reject_no_requeue')) as queue:
+ queue.put({'Hello': 'World'}, headers={'k1': 'v1'})
+ message = queue.get_nowait()
+ message.reject(requeue=False)
+ with pytest.raises(queue.Empty):
+ queue.get_nowait()
+
+ def test_reject_requeue(self, connection):
+ with connection as conn:
+ with closing(conn.SimpleQueue('test_reject_requeue')) as queue:
+ queue.put({'Hello': 'World'}, headers={'k1': 'v1'})
+ message = queue.get_nowait()
+ message.reject(requeue=True)
+ message2 = queue.get_nowait()
+ assert message.body == message2.body
+ message2.ack()
+
+ def test_requeue(self, connection):
+ with connection as conn:
+ with closing(conn.SimpleQueue('test_requeue')) as queue:
+ queue.put({'Hello': 'World'}, headers={'k1': 'v1'})
+ message = queue.get_nowait()
+ message.requeue()
+ message2 = queue.get_nowait()
+ assert message.body == message2.body
+ message2.ack()
+
+
class BaseFailover(BasicFunctionality):
def test_connect(self, failover_connection):