summaryrefslogtreecommitdiff
path: root/t/integration/common.py
diff options
context:
space:
mode:
authorAsif Saif Uddin <auvipy@gmail.com>2023-04-08 22:45:08 +0600
committerGitHub <noreply@github.com>2023-04-08 22:45:08 +0600
commit973dc3790ac25b9da7b6d2641ac72d95470f6ed8 (patch)
tree9e7ba02d8520994a06efc37dde05fba722138189 /t/integration/common.py
parent7ceb675bb69917fae182ebdaf9a2298a308c3fa4 (diff)
parent2de7f9f038dd62e097e490cb3fa609067c1c3c36 (diff)
downloadkombu-py310.tar.gz
Merge branch 'main' into py310py310
Diffstat (limited to 't/integration/common.py')
-rw-r--r--t/integration/common.py53
1 files changed, 51 insertions, 2 deletions
diff --git a/t/integration/common.py b/t/integration/common.py
index 84f44dd3..82dc3f1b 100644
--- a/t/integration/common.py
+++ b/t/integration/common.py
@@ -1,3 +1,5 @@
+from __future__ import annotations
+
import socket
from contextlib import closing
from time import sleep
@@ -136,14 +138,21 @@ class BaseExchangeTypes:
message.delivery_info['exchange'] == ''
assert message.payload == body
- def _consume(self, connection, queue):
+ def _create_consumer(self, connection, queue):
consumer = kombu.Consumer(
connection, [queue], accept=['pickle']
)
consumer.register_callback(self._callback)
+ return consumer
+
+ def _consume_from(self, connection, consumer):
with consumer:
connection.drain_events(timeout=1)
+ def _consume(self, connection, queue):
+ with self._create_consumer(connection, queue):
+ connection.drain_events(timeout=1)
+
def _publish(self, channel, exchange, queues=None, routing_key=None):
producer = kombu.Producer(channel, exchange=exchange)
if routing_key:
@@ -213,7 +222,6 @@ class BaseExchangeTypes:
channel, ex, [test_queue1, test_queue2, test_queue3],
routing_key='t.1'
)
-
self._consume(conn, test_queue1)
self._consume(conn, test_queue2)
with pytest.raises(socket.timeout):
@@ -398,6 +406,47 @@ class BasePriority:
assert msg.payload == data
+class BaseMessage:
+
+ def test_ack(self, connection):
+ with connection as conn:
+ with closing(conn.SimpleQueue('test_ack')) as queue:
+ queue.put({'Hello': 'World'}, headers={'k1': 'v1'})
+ message = queue.get_nowait()
+ message.ack()
+ with pytest.raises(queue.Empty):
+ queue.get_nowait()
+
+ def test_reject_no_requeue(self, connection):
+ with connection as conn:
+ with closing(conn.SimpleQueue('test_reject_no_requeue')) as queue:
+ queue.put({'Hello': 'World'}, headers={'k1': 'v1'})
+ message = queue.get_nowait()
+ message.reject(requeue=False)
+ with pytest.raises(queue.Empty):
+ queue.get_nowait()
+
+ def test_reject_requeue(self, connection):
+ with connection as conn:
+ with closing(conn.SimpleQueue('test_reject_requeue')) as queue:
+ queue.put({'Hello': 'World'}, headers={'k1': 'v1'})
+ message = queue.get_nowait()
+ message.reject(requeue=True)
+ message2 = queue.get_nowait()
+ assert message.body == message2.body
+ message2.ack()
+
+ def test_requeue(self, connection):
+ with connection as conn:
+ with closing(conn.SimpleQueue('test_requeue')) as queue:
+ queue.put({'Hello': 'World'}, headers={'k1': 'v1'})
+ message = queue.get_nowait()
+ message.requeue()
+ message2 = queue.get_nowait()
+ assert message.body == message2.body
+ message2.ack()
+
+
class BaseFailover(BasicFunctionality):
def test_connect(self, failover_connection):