summaryrefslogtreecommitdiff
path: root/t/integration
diff options
context:
space:
mode:
authorAsif Saif Uddin <auvipy@gmail.com>2023-04-08 22:45:08 +0600
committerGitHub <noreply@github.com>2023-04-08 22:45:08 +0600
commit973dc3790ac25b9da7b6d2641ac72d95470f6ed8 (patch)
tree9e7ba02d8520994a06efc37dde05fba722138189 /t/integration
parent7ceb675bb69917fae182ebdaf9a2298a308c3fa4 (diff)
parent2de7f9f038dd62e097e490cb3fa609067c1c3c36 (diff)
downloadkombu-py310.tar.gz
Merge branch 'main' into py310py310
Diffstat (limited to 't/integration')
-rw-r--r--t/integration/__init__.py2
-rw-r--r--t/integration/common.py53
-rw-r--r--t/integration/test_kafka.py69
-rw-r--r--t/integration/test_mongodb.py115
-rw-r--r--t/integration/test_py_amqp.py12
-rw-r--r--t/integration/test_redis.py34
6 files changed, 279 insertions, 6 deletions
diff --git a/t/integration/__init__.py b/t/integration/__init__.py
index 1bea4880..cfce4f85 100644
--- a/t/integration/__init__.py
+++ b/t/integration/__init__.py
@@ -1,3 +1,5 @@
+from __future__ import annotations
+
import os
import sys
diff --git a/t/integration/common.py b/t/integration/common.py
index 84f44dd3..82dc3f1b 100644
--- a/t/integration/common.py
+++ b/t/integration/common.py
@@ -1,3 +1,5 @@
+from __future__ import annotations
+
import socket
from contextlib import closing
from time import sleep
@@ -136,14 +138,21 @@ class BaseExchangeTypes:
message.delivery_info['exchange'] == ''
assert message.payload == body
- def _consume(self, connection, queue):
+ def _create_consumer(self, connection, queue):
consumer = kombu.Consumer(
connection, [queue], accept=['pickle']
)
consumer.register_callback(self._callback)
+ return consumer
+
+ def _consume_from(self, connection, consumer):
with consumer:
connection.drain_events(timeout=1)
+ def _consume(self, connection, queue):
+ with self._create_consumer(connection, queue):
+ connection.drain_events(timeout=1)
+
def _publish(self, channel, exchange, queues=None, routing_key=None):
producer = kombu.Producer(channel, exchange=exchange)
if routing_key:
@@ -213,7 +222,6 @@ class BaseExchangeTypes:
channel, ex, [test_queue1, test_queue2, test_queue3],
routing_key='t.1'
)
-
self._consume(conn, test_queue1)
self._consume(conn, test_queue2)
with pytest.raises(socket.timeout):
@@ -398,6 +406,47 @@ class BasePriority:
assert msg.payload == data
+class BaseMessage:
+
+ def test_ack(self, connection):
+ with connection as conn:
+ with closing(conn.SimpleQueue('test_ack')) as queue:
+ queue.put({'Hello': 'World'}, headers={'k1': 'v1'})
+ message = queue.get_nowait()
+ message.ack()
+ with pytest.raises(queue.Empty):
+ queue.get_nowait()
+
+ def test_reject_no_requeue(self, connection):
+ with connection as conn:
+ with closing(conn.SimpleQueue('test_reject_no_requeue')) as queue:
+ queue.put({'Hello': 'World'}, headers={'k1': 'v1'})
+ message = queue.get_nowait()
+ message.reject(requeue=False)
+ with pytest.raises(queue.Empty):
+ queue.get_nowait()
+
+ def test_reject_requeue(self, connection):
+ with connection as conn:
+ with closing(conn.SimpleQueue('test_reject_requeue')) as queue:
+ queue.put({'Hello': 'World'}, headers={'k1': 'v1'})
+ message = queue.get_nowait()
+ message.reject(requeue=True)
+ message2 = queue.get_nowait()
+ assert message.body == message2.body
+ message2.ack()
+
+ def test_requeue(self, connection):
+ with connection as conn:
+ with closing(conn.SimpleQueue('test_requeue')) as queue:
+ queue.put({'Hello': 'World'}, headers={'k1': 'v1'})
+ message = queue.get_nowait()
+ message.requeue()
+ message2 = queue.get_nowait()
+ assert message.body == message2.body
+ message2.ack()
+
+
class BaseFailover(BasicFunctionality):
def test_connect(self, failover_connection):
diff --git a/t/integration/test_kafka.py b/t/integration/test_kafka.py
new file mode 100644
index 00000000..2303d887
--- /dev/null
+++ b/t/integration/test_kafka.py
@@ -0,0 +1,69 @@
+from __future__ import annotations
+
+import pytest
+
+import kombu
+
+from .common import (BaseExchangeTypes, BaseFailover, BaseMessage,
+ BasicFunctionality)
+
+
+def get_connection(hostname, port):
+ return kombu.Connection(
+ f'confluentkafka://{hostname}:{port}',
+ )
+
+
+def get_failover_connection(hostname, port):
+ return kombu.Connection(
+ f'confluentkafka://localhost:12345;confluentkafka://{hostname}:{port}',
+ connect_timeout=10,
+ )
+
+
+@pytest.fixture()
+def invalid_connection():
+ return kombu.Connection('confluentkafka://localhost:12345')
+
+
+@pytest.fixture()
+def connection():
+ return get_connection(
+ hostname='localhost',
+ port='9092'
+ )
+
+
+@pytest.fixture()
+def failover_connection():
+ return get_failover_connection(
+ hostname='localhost',
+ port='9092'
+ )
+
+
+@pytest.mark.env('kafka')
+@pytest.mark.flaky(reruns=5, reruns_delay=2)
+class test_KafkaBasicFunctionality(BasicFunctionality):
+ pass
+
+
+@pytest.mark.env('kafka')
+@pytest.mark.flaky(reruns=5, reruns_delay=2)
+class test_KafkaBaseExchangeTypes(BaseExchangeTypes):
+
+ @pytest.mark.skip('fanout is not implemented')
+ def test_fanout(self, connection):
+ pass
+
+
+@pytest.mark.env('kafka')
+@pytest.mark.flaky(reruns=5, reruns_delay=2)
+class test_KafkaFailover(BaseFailover):
+ pass
+
+
+@pytest.mark.env('kafka')
+@pytest.mark.flaky(reruns=5, reruns_delay=2)
+class test_KafkaMessage(BaseMessage):
+ pass
diff --git a/t/integration/test_mongodb.py b/t/integration/test_mongodb.py
new file mode 100644
index 00000000..445f1389
--- /dev/null
+++ b/t/integration/test_mongodb.py
@@ -0,0 +1,115 @@
+from __future__ import annotations
+
+import os
+
+import pytest
+
+import kombu
+
+from .common import (BaseExchangeTypes, BaseMessage, BasePriority,
+ BasicFunctionality)
+
+
+def get_connection(hostname, port, vhost):
+ return kombu.Connection(
+ f'mongodb://{hostname}:{port}/{vhost}',
+ transport_options={'ttl': True},
+ )
+
+
+@pytest.fixture()
+def invalid_connection():
+ return kombu.Connection('mongodb://localhost:12345?connectTimeoutMS=1')
+
+
+@pytest.fixture()
+def connection(request):
+ return get_connection(
+ hostname=os.environ.get('MONGODB_HOST', 'localhost'),
+ port=os.environ.get('MONGODB_27017_TCP', '27017'),
+ vhost=getattr(
+ request.config, "slaveinput", {}
+ ).get("slaveid", 'tests'),
+ )
+
+
+@pytest.mark.env('mongodb')
+@pytest.mark.flaky(reruns=5, reruns_delay=2)
+class test_MongoDBBasicFunctionality(BasicFunctionality):
+ pass
+
+
+@pytest.mark.env('mongodb')
+@pytest.mark.flaky(reruns=5, reruns_delay=2)
+class test_MongoDBBaseExchangeTypes(BaseExchangeTypes):
+
+ # MongoDB consumer skips old messages upon initialization.
+ # Ensure that it's created before test messages are published.
+
+ def test_fanout(self, connection):
+ ex = kombu.Exchange('test_fanout', type='fanout')
+ test_queue1 = kombu.Queue('fanout1', exchange=ex)
+ consumer1 = self._create_consumer(connection, test_queue1)
+ test_queue2 = kombu.Queue('fanout2', exchange=ex)
+ consumer2 = self._create_consumer(connection, test_queue2)
+
+ with connection as conn:
+ with conn.channel() as channel:
+ self._publish(channel, ex, [test_queue1, test_queue2])
+
+ self._consume_from(conn, consumer1)
+ self._consume_from(conn, consumer2)
+
+
+@pytest.mark.env('mongodb')
+@pytest.mark.flaky(reruns=5, reruns_delay=2)
+class test_MongoDBPriority(BasePriority):
+
+ # drain_events() consumes only one value unlike in py-amqp.
+
+ def test_publish_consume(self, connection):
+ test_queue = kombu.Queue(
+ 'priority_test', routing_key='priority_test', max_priority=10
+ )
+
+ received_messages = []
+
+ def callback(body, message):
+ received_messages.append(body)
+ message.ack()
+
+ with connection as conn:
+ with conn.channel() as channel:
+ producer = kombu.Producer(channel)
+ for msg, prio in [
+ [{'msg': 'first'}, 3],
+ [{'msg': 'second'}, 6],
+ [{'msg': 'third'}, 3],
+ ]:
+ producer.publish(
+ msg,
+ retry=True,
+ exchange=test_queue.exchange,
+ routing_key=test_queue.routing_key,
+ declare=[test_queue],
+ serializer='pickle',
+ priority=prio
+ )
+ consumer = kombu.Consumer(
+ conn, [test_queue], accept=['pickle']
+ )
+ consumer.register_callback(callback)
+ with consumer:
+ conn.drain_events(timeout=1)
+ conn.drain_events(timeout=1)
+ conn.drain_events(timeout=1)
+ # Second message must be received first
+ assert received_messages[0] == {'msg': 'second'}
+ assert received_messages[1] == {'msg': 'first'}
+ assert received_messages[2] == {'msg': 'third'}
+
+
+@pytest.mark.env('mongodb')
+@pytest.mark.flaky(reruns=5, reruns_delay=2)
+class test_MongoDBMessage(BaseMessage):
+ pass
diff --git a/t/integration/test_py_amqp.py b/t/integration/test_py_amqp.py
index 88ff0ac7..260f164d 100644
--- a/t/integration/test_py_amqp.py
+++ b/t/integration/test_py_amqp.py
@@ -1,11 +1,13 @@
+from __future__ import annotations
+
import os
import pytest
import kombu
-from .common import (BaseExchangeTypes, BaseFailover, BasePriority,
- BaseTimeToLive, BasicFunctionality)
+from .common import (BaseExchangeTypes, BaseFailover, BaseMessage,
+ BasePriority, BaseTimeToLive, BasicFunctionality)
def get_connection(hostname, port, vhost):
@@ -73,3 +75,9 @@ class test_PyAMQPPriority(BasePriority):
@pytest.mark.flaky(reruns=5, reruns_delay=2)
class test_PyAMQPFailover(BaseFailover):
pass
+
+
+@pytest.mark.env('py-amqp')
+@pytest.mark.flaky(reruns=5, reruns_delay=2)
+class test_PyAMQPMessage(BaseMessage):
+ pass
diff --git a/t/integration/test_redis.py b/t/integration/test_redis.py
index 72ba803f..b2ae5ab8 100644
--- a/t/integration/test_redis.py
+++ b/t/integration/test_redis.py
@@ -1,12 +1,17 @@
+from __future__ import annotations
+
import os
+import socket
from time import sleep
import pytest
import redis
import kombu
+from kombu.transport.redis import Transport
-from .common import BaseExchangeTypes, BasePriority, BasicFunctionality
+from .common import (BaseExchangeTypes, BaseMessage, BasePriority,
+ BasicFunctionality)
def get_connection(
@@ -55,7 +60,11 @@ def test_failed_credentials():
@pytest.mark.env('redis')
@pytest.mark.flaky(reruns=5, reruns_delay=2)
class test_RedisBasicFunctionality(BasicFunctionality):
- pass
+ def test_failed_connection__ConnectionError(self, invalid_connection):
+ # method raises transport exception
+ with pytest.raises(redis.exceptions.ConnectionError) as ex:
+ invalid_connection.connection
+ assert ex.type in Transport.connection_errors
@pytest.mark.env('redis')
@@ -120,3 +129,24 @@ class test_RedisPriority(BasePriority):
assert received_messages[0] == {'msg': 'second'}
assert received_messages[1] == {'msg': 'first'}
assert received_messages[2] == {'msg': 'third'}
+
+
+@pytest.mark.env('redis')
+@pytest.mark.flaky(reruns=5, reruns_delay=2)
+class test_RedisMessage(BaseMessage):
+ pass
+
+
+@pytest.mark.env('redis')
+def test_RedisConnectTimeout(monkeypatch):
+ # simulate a connection timeout for a new connection
+ def connect_timeout(self):
+ raise socket.timeout
+ monkeypatch.setattr(
+ redis.connection.Connection, "_connect", connect_timeout)
+
+ # ensure the timeout raises a TimeoutError
+ with pytest.raises(redis.exceptions.TimeoutError):
+ # note the host/port here is irrelevant because
+ # connect will raise a socket.timeout
+ kombu.Connection('redis://localhost:12345').connect()