diff options
| -rw-r--r-- | cpp/src/qpid/broker/Consumer.h | 2 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/DeliveryRecord.cpp | 64 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/DeliveryRecord.h | 23 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/MultiVersionConnectionInputHandler.cpp | 13 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/MultiVersionConnectionInputHandler.h | 2 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/SemanticState.cpp | 24 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/SemanticState.h | 4 | ||||
| -rw-r--r-- | python/cpp_failing_0-10.txt | 4 | ||||
| -rw-r--r-- | python/tests_0-10/message.py | 60 | ||||
| -rw-r--r-- | python/tests_0-10/queue.py | 62 |
10 files changed, 149 insertions, 109 deletions
diff --git a/cpp/src/qpid/broker/Consumer.h b/cpp/src/qpid/broker/Consumer.h index ed4bb176f6..65c60182b8 100644 --- a/cpp/src/qpid/broker/Consumer.h +++ b/cpp/src/qpid/broker/Consumer.h @@ -35,7 +35,7 @@ namespace qpid { { intrusive_ptr<Message> payload; framing::SequenceNumber position; - Queue* queue; + Queue* queue; QueuedMessage(Queue* q, intrusive_ptr<Message> msg, framing::SequenceNumber sn) : payload(msg), position(sn), queue(q) {} diff --git a/cpp/src/qpid/broker/DeliveryRecord.cpp b/cpp/src/qpid/broker/DeliveryRecord.cpp index 154394e5de..ca90f32a5d 100644 --- a/cpp/src/qpid/broker/DeliveryRecord.cpp +++ b/cpp/src/qpid/broker/DeliveryRecord.cpp @@ -32,16 +32,20 @@ DeliveryRecord::DeliveryRecord(const QueuedMessage& _msg, const std::string _tag, DeliveryToken::shared_ptr _token, const DeliveryId _id, - bool _acquired, bool _confirmed) : msg(_msg), - queue(_queue), - tag(_tag), - token(_token), - id(_id), - acquired(_acquired), - confirmed(_confirmed), - pull(false), - cancelled(false) + bool _acquired, bool accepted) : msg(_msg), + queue(_queue), + tag(_tag), + token(_token), + id(_id), + acquired(_acquired), + pull(false), + cancelled(false), + credit(msg.payload ? msg.payload->getRequiredCredit() : 0), + size(msg.payload ? msg.payload->contentSize() : 0), + completed(false), + ended(accepted) { + if (accepted) setEnded(); } DeliveryRecord::DeliveryRecord(const QueuedMessage& _msg, @@ -50,14 +54,23 @@ DeliveryRecord::DeliveryRecord(const QueuedMessage& _msg, queue(_queue), id(_id), acquired(true), - confirmed(false), pull(true), - cancelled(false) + cancelled(false), + credit(msg.payload ? msg.payload->getRequiredCredit() : 0), + size(msg.payload ? msg.payload->contentSize() : 0), + completed(false), + ended(false) {} +void DeliveryRecord::setEnded() +{ + ended = true; + //reset msg pointer, don't need to hold on to it anymore + msg.payload = boost::intrusive_ptr<Message>(); +} void DeliveryRecord::dequeue(TransactionContext* ctxt) const{ - if (acquired && !confirmed) { + if (acquired && !ended) { queue->dequeue(ctxt, msg.payload); } } @@ -79,7 +92,7 @@ bool DeliveryRecord::coveredBy(const framing::AccumulatedAck* const range) const } void DeliveryRecord::redeliver(SemanticState* const session) { - if (!confirmed) { + if (!ended) { if(pull || cancelled){ //if message was originally sent as response to get, we must requeue it @@ -96,7 +109,7 @@ void DeliveryRecord::redeliver(SemanticState* const session) { void DeliveryRecord::requeue() const { - if (acquired && !confirmed) { + if (acquired && !ended) { msg.payload->redeliver(); queue->requeue(msg); } @@ -104,9 +117,22 @@ void DeliveryRecord::requeue() const void DeliveryRecord::release() { - if (acquired && !confirmed) { + if (acquired && !ended) { queue->requeue(msg); acquired = false; + setEnded(); + } +} + +void DeliveryRecord::complete() +{ + completed = true; +} + +void DeliveryRecord::accept(TransactionContext* ctxt) { + if (acquired && !ended) { + queue->dequeue(ctxt, msg.payload); + setEnded(); } } @@ -124,9 +150,9 @@ void DeliveryRecord::reject() } } -void DeliveryRecord::updateByteCredit(uint32_t& credit) const +uint32_t DeliveryRecord::getCredit() const { - credit += msg.payload->getRequiredCredit(); + return credit; } @@ -134,7 +160,7 @@ void DeliveryRecord::addTo(Prefetch& prefetch) const{ if(!pull){ //ignore 'pulled' messages (i.e. those that were sent in //response to get) when calculating prefetch - prefetch.size += msg.payload->contentSize(); + prefetch.size += size; prefetch.count++; } } @@ -143,7 +169,7 @@ void DeliveryRecord::subtractFrom(Prefetch& prefetch) const{ if(!pull){ //ignore 'pulled' messages (i.e. those that were sent in //response to get) when calculating prefetch - prefetch.size -= msg.payload->contentSize(); + prefetch.size -= size; prefetch.count--; } } diff --git a/cpp/src/qpid/broker/DeliveryRecord.h b/cpp/src/qpid/broker/DeliveryRecord.h index eeb363bcfc..b2672345b4 100644 --- a/cpp/src/qpid/broker/DeliveryRecord.h +++ b/cpp/src/qpid/broker/DeliveryRecord.h @@ -47,32 +47,45 @@ class DeliveryRecord{ DeliveryToken::shared_ptr token; DeliveryId id; bool acquired; - const bool confirmed; const bool pull; bool cancelled; + const uint32_t credit; + const uint64_t size; + + bool completed; + bool ended; + + void setEnded(); public: DeliveryRecord(const QueuedMessage& msg, Queue::shared_ptr queue, const std::string tag, DeliveryToken::shared_ptr token, const DeliveryId id, bool acquired, bool confirmed = false); DeliveryRecord(const QueuedMessage& msg, Queue::shared_ptr queue, const DeliveryId id); - void dequeue(TransactionContext* ctxt = 0) const; bool matches(DeliveryId tag) const; bool matchOrAfter(DeliveryId tag) const; bool after(DeliveryId tag) const; bool coveredBy(const framing::AccumulatedAck* const range) const; + + void dequeue(TransactionContext* ctxt = 0) const; void requeue() const; void release(); void reject(); void cancel(const std::string& tag); void redeliver(SemanticState* const); - void updateByteCredit(uint32_t& credit) const; + void acquire(DeliveryIds& results); + void complete(); + void accept(TransactionContext* ctxt); + + bool isAcquired() const { return acquired; } + bool isComplete() const { return completed; } + bool isRedundant() const { return ended && completed; } + + uint32_t getCredit() const; void addTo(Prefetch&) const; void subtractFrom(Prefetch&) const; const std::string& getTag() const { return tag; } bool isPull() const { return pull; } - bool isAcquired() const { return acquired; } - void acquire(DeliveryIds& results); friend bool operator<(const DeliveryRecord&, const DeliveryRecord&); friend std::ostream& operator<<(std::ostream&, const DeliveryRecord&); }; diff --git a/cpp/src/qpid/broker/MultiVersionConnectionInputHandler.cpp b/cpp/src/qpid/broker/MultiVersionConnectionInputHandler.cpp index 676f9e4b3d..6c3d960d1f 100644 --- a/cpp/src/qpid/broker/MultiVersionConnectionInputHandler.cpp +++ b/cpp/src/qpid/broker/MultiVersionConnectionInputHandler.cpp @@ -64,7 +64,7 @@ void MultiVersionConnectionInputHandler::idleIn() bool MultiVersionConnectionInputHandler::doOutput() { - return check(false) && handler->doOutput(); + return handler.get() && handler->doOutput(); } qpid::framing::ProtocolInitiation MultiVersionConnectionInputHandler::getInitiation() @@ -74,17 +74,14 @@ qpid::framing::ProtocolInitiation MultiVersionConnectionInputHandler::getInitiat void MultiVersionConnectionInputHandler::closed() { - check(); - handler->closed(); + if (handler.get()) handler->closed(); + //else closed before initiated, nothing to do } -bool MultiVersionConnectionInputHandler::check(bool fail) +void MultiVersionConnectionInputHandler::check() { if (!handler.get()) { - if (fail) throw qpid::framing::InternalErrorException("Handler not initialised!"); - else return false; - } else { - return true; + throw qpid::framing::InternalErrorException("Handler not initialised!"); } } diff --git a/cpp/src/qpid/broker/MultiVersionConnectionInputHandler.h b/cpp/src/qpid/broker/MultiVersionConnectionInputHandler.h index 4301eba57c..440c00c09a 100644 --- a/cpp/src/qpid/broker/MultiVersionConnectionInputHandler.h +++ b/cpp/src/qpid/broker/MultiVersionConnectionInputHandler.h @@ -38,7 +38,7 @@ class MultiVersionConnectionInputHandler : public qpid::sys::ConnectionInputHand Broker& broker; const std::string id; - bool check(bool fail = true); + void check(); public: MultiVersionConnectionInputHandler(qpid::sys::ConnectionOutputHandler* out, Broker& broker, const std::string& id); diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp index 5851eeeafb..f372c60044 100644 --- a/cpp/src/qpid/broker/SemanticState.cpp +++ b/cpp/src/qpid/broker/SemanticState.cpp @@ -393,7 +393,7 @@ void SemanticState::ack(DeliveryId first, DeliveryId last, bool cumulative) ++end; } - for_each(start, end, boost::bind(&SemanticState::adjustFlow, this, _1)); + for_each(start, end, boost::bind(&SemanticState::complete, this, _1)); if (txBuffer.get()) { //in transactional mode, don't dequeue or remove, just @@ -433,20 +433,23 @@ void SemanticState::requestDispatch(ConsumerImpl& c) } } -void SemanticState::adjustFlow(const DeliveryRecord& delivery) +void SemanticState::complete(DeliveryRecord& delivery) { delivery.subtractFrom(outstanding); ConsumerImplMap::iterator i = consumers.find(delivery.getTag()); if (i != consumers.end()) { - get_pointer(i)->adjustFlow(delivery); + get_pointer(i)->complete(delivery); } } -void SemanticState::ConsumerImpl::adjustFlow(const DeliveryRecord& delivery) +void SemanticState::ConsumerImpl::complete(DeliveryRecord& delivery) { - if (windowing) { - if (msgCredit != 0xFFFFFFFF) msgCredit++; - if (byteCredit != 0xFFFFFFFF) delivery.updateByteCredit(byteCredit); + if (!delivery.isComplete()) { + delivery.complete(); + if (windowing) { + if (msgCredit != 0xFFFFFFFF) msgCredit++; + if (byteCredit != 0xFFFFFFFF) byteCredit += delivery.getCredit(); + } } } @@ -662,15 +665,16 @@ void SemanticState::accepted(DeliveryId first, DeliveryId last) dtxBuffer->enlist(txAck); } } else { - for_each(range.start, range.end, bind2nd(mem_fun_ref(&DeliveryRecord::dequeue), 0)); - unacked.erase(range.start, range.end); + for_each(range.start, range.end, bind2nd(mem_fun_ref(&DeliveryRecord::accept), 0)); + unacked.remove_if(mem_fun_ref(&DeliveryRecord::isRedundant)); } } void SemanticState::completed(DeliveryId first, DeliveryId last) { AckRange range = findRange(first, last); - for_each(range.start, range.end, boost::bind(&SemanticState::adjustFlow, this, _1)); + for_each(range.start, range.end, boost::bind(&SemanticState::complete, this, _1)); + unacked.remove_if(mem_fun_ref(&DeliveryRecord::isRedundant)); requestDispatch(); } diff --git a/cpp/src/qpid/broker/SemanticState.h b/cpp/src/qpid/broker/SemanticState.h index 8648135cae..3d31d5a5a2 100644 --- a/cpp/src/qpid/broker/SemanticState.h +++ b/cpp/src/qpid/broker/SemanticState.h @@ -88,7 +88,7 @@ class SemanticState : public framing::FrameHandler::Chains, void addMessageCredit(uint32_t value); void flush(); void stop(); - void adjustFlow(const DeliveryRecord&); + void complete(DeliveryRecord&); Queue::shared_ptr getQueue() { return queue; } bool isBlocked() const { return blocked; } @@ -122,7 +122,7 @@ class SemanticState : public framing::FrameHandler::Chains, void checkDtxTimeout(); ConsumerImpl& find(const std::string& destination); void ack(DeliveryId deliveryTag, DeliveryId endTag, bool cumulative); - void adjustFlow(const DeliveryRecord&); + void complete(DeliveryRecord&); AckRange findRange(DeliveryId first, DeliveryId last); void requestDispatch(); void requestDispatch(ConsumerImpl&); diff --git a/python/cpp_failing_0-10.txt b/python/cpp_failing_0-10.txt index 5329c0f6e2..1b937563c6 100644 --- a/python/cpp_failing_0-10.txt +++ b/python/cpp_failing_0-10.txt @@ -52,8 +52,6 @@ tests_0-10.message.MessageTests.test_consume_no_local tests_0-10.message.MessageTests.test_consume_no_local_awkward tests_0-10.message.MessageTests.test_consume_queue_errors tests_0-10.message.MessageTests.test_consume_unique_consumers -tests_0-10.message.MessageTests.test_credit_flow_bytes -tests_0-10.message.MessageTests.test_credit_flow_messages tests_0-10.message.MessageTests.test_no_size tests_0-10.message.MessageTests.test_qos_prefetch_count tests_0-10.message.MessageTests.test_qos_prefetch_size @@ -63,8 +61,6 @@ tests_0-10.message.MessageTests.test_recover_requeue tests_0-10.message.MessageTests.test_subscribe_not_acquired tests_0-10.message.MessageTests.test_subscribe_not_acquired_2 tests_0-10.message.MessageTests.test_subscribe_not_acquired_3 -tests_0-10.message.MessageTests.test_window_flow_bytes -tests_0-10.message.MessageTests.test_window_flow_messages tests_0-10.testlib.TestBaseTest.testAssertEmptyFail tests_0-10.testlib.TestBaseTest.testAssertEmptyPass tests_0-10.testlib.TestBaseTest.testMessageProperties diff --git a/python/tests_0-10/message.py b/python/tests_0-10/message.py index 5f97d6c705..64e2bc44c4 100644 --- a/python/tests_0-10/message.py +++ b/python/tests_0-10/message.py @@ -464,10 +464,10 @@ class MessageTests(TestBase010): session.queue_declare(queue = "q", exclusive=True, auto_delete=True) #create consumer (for now that defaults to infinite credit) session.message_subscribe(queue = "q", destination = "c") - session.message_flow_mode(mode = 0, destination = "c") + session.message_set_flow_mode(flow_mode = 0, destination = "c") #send batch of messages to queue for i in range(1, 11): - session.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "Message %d" % i)) + session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "Message %d" % i)) #set message credit to finite amount (less than enough for all messages) session.message_flow(unit = 0, value = 5, destination = "c") @@ -494,13 +494,13 @@ class MessageTests(TestBase010): session.queue_declare(queue = "q", exclusive=True, auto_delete=True) #create consumer (for now that defaults to infinite credit) session.message_subscribe(queue = "q", destination = "c") - session.message_flow_mode(mode = 0, destination = "c") + session.message_set_flow_mode(flow_mode = 0, destination = "c") #send batch of messages to queue for i in range(10): - session.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "abcdefgh")) + session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "abcdefgh")) #each message is currently interpreted as requiring msg_size bytes of credit - msg_size = 35 + msg_size = 21 #set byte credit to finite amount (less than enough for all messages) session.message_flow(unit = 1, value = msg_size*5, destination = "c") @@ -527,11 +527,11 @@ class MessageTests(TestBase010): session = self.session session.queue_declare(queue = "q", exclusive=True, auto_delete=True) #create consumer (for now that defaults to infinite credit) - session.message_subscribe(queue = "q", destination = "c", confirm_mode = 1) - session.message_flow_mode(mode = 1, destination = "c") + session.message_subscribe(queue = "q", destination = "c") + session.message_set_flow_mode(flow_mode = 1, destination = "c") #send batch of messages to queue for i in range(1, 11): - session.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "Message %d" % i)) + session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "Message %d" % i)) #set message credit to finite amount (less than enough for all messages) session.message_flow(unit = 0, value = 5, destination = "c") @@ -539,13 +539,16 @@ class MessageTests(TestBase010): session.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "c") #check that expected number were received q = session.incoming("c") - for i in range(1, 6): + for i in range(1, 6): msg = q.get(timeout = 1) + session.receiver._completed.add(msg.id)#TODO: this may be done automatically self.assertDataEquals(session, msg, "Message %d" % i) self.assertEmpty(q) #acknowledge messages and check more are received - msg.complete(cumulative=True) + #TODO: there may be a nicer way of doing this + session.channel.session_completed(session.receiver._completed) + for i in range(6, 11): self.assertDataEquals(session, q.get(timeout = 1), "Message %d" % i) self.assertEmpty(q) @@ -559,14 +562,14 @@ class MessageTests(TestBase010): session = self.session session.queue_declare(queue = "q", exclusive=True, auto_delete=True) #create consumer (for now that defaults to infinite credit) - session.message_subscribe(queue = "q", destination = "c", confirm_mode = 1) - session.message_flow_mode(mode = 1, destination = "c") + session.message_subscribe(queue = "q", destination = "c") + session.message_set_flow_mode(flow_mode = 1, destination = "c") #send batch of messages to queue for i in range(10): - session.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "abcdefgh")) + session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "abcdefgh")) #each message is currently interpreted as requiring msg_size bytes of credit - msg_size = 40 + msg_size = 19 #set byte credit to finite amount (less than enough for all messages) session.message_flow(unit = 1, value = msg_size*5, destination = "c") @@ -584,7 +587,9 @@ class MessageTests(TestBase010): #ack each message individually and check more are received for i in range(5): msg = msgs.pop() - msg.complete(cumulative=False) + #TODO: there may be a nicer way of doing this + session.receiver._completed.add(msg.id) + session.channel.session_completed(session.receiver._completed) self.assertDataEquals(session, q.get(timeout = 1), "abcdefgh") self.assertEmpty(q) @@ -595,13 +600,17 @@ class MessageTests(TestBase010): session = self.session session.queue_declare(queue = "q", exclusive=True, auto_delete=True) for i in range(1, 6): - session.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "Message %s" % i)) + session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "Message %s" % i)) - self.subscribe(queue = "q", destination = "a", acquire_mode = 1) - self.subscribe(queue = "q", destination = "b", acquire_mode = 1) + session.message_subscribe(queue = "q", destination = "a", acquire_mode = 1) + session.message_flow(unit = 0, value = 0xFFFFFFFF, destination = "a") + session.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "a") + session.message_subscribe(queue = "q", destination = "b", acquire_mode = 1) + session.message_flow(unit = 0, value = 0xFFFFFFFF, destination = "b") + session.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "b") for i in range(6, 11): - session.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "Message %s" % i)) + session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "Message %s" % i)) #both subscribers should see all messages qA = session.incoming("a") @@ -610,8 +619,9 @@ class MessageTests(TestBase010): for q in [qA, qB]: msg = q.get(timeout = 1) self.assertEquals("Message %s" % i, msg.body) - msg.complete() + session.receiver._completed.add(msg.id) + session.channel.session_completed(session.receiver._completed) #messages should still be on the queue: self.assertEquals(10, session.queue_query(queue = "q").message_count) @@ -625,7 +635,7 @@ class MessageTests(TestBase010): #use fanout for now: session.exchange_bind(exchange="amq.fanout", queue="q") session.message_transfer(destination="amq.fanout", message=Message("acquire me")) - #session.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "acquire me")) + #session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "acquire me")) session.message_subscribe(queue = "q", destination = "a", acquire_mode = 1) session.message_flow(destination="a", unit=0, value=0xFFFFFFFF) @@ -724,11 +734,11 @@ class MessageTests(TestBase010): #publish some messages self.queue_declare(queue = "q", exclusive=True, auto_delete=True) for i in range(1, 11): - session.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "message-%d" % (i))) + session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "message-%d" % (i))) #consume some of them session.message_subscribe(queue = "q", destination = "a", confirm_mode = 1) - session.message_flow_mode(mode = 0, destination = "a") + session.message_set_flow_mode(flow_mode = 0, destination = "a") session.message_flow(unit = 0, value = 5, destination = "a") session.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "a") @@ -762,7 +772,7 @@ class MessageTests(TestBase010): #publish some messages self.queue_declare(queue = "q", exclusive=True, auto_delete=True) for i in range(1, 11): - session.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "message-%d" % (i))) + session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "message-%d" % (i))) #create a not-acquired subscriber session.message_subscribe(queue = "q", destination = "a", confirm_mode = 1, acquire_mode=1) @@ -861,7 +871,7 @@ class MessageTests(TestBase010): def assertEmpty(self, queue): try: extra = queue.get(timeout=1) - self.fail("Queue not empty, contains: " + extra.content.body) + self.fail("Queue not empty, contains: " + extra.body) except Empty: None class SizelessContent(Content): diff --git a/python/tests_0-10/queue.py b/python/tests_0-10/queue.py index f192a2af90..b972166325 100644 --- a/python/tests_0-10/queue.py +++ b/python/tests_0-10/queue.py @@ -18,10 +18,10 @@ # from qpid.client import Client, Closed from qpid.queue import Empty -from qpid.content import Content -from qpid.testlib import testrunner, TestBase +from qpid.testlib import TestBase010 +from qpid.datatypes import Message -class QueueTests(TestBase): +class QueueTests(TestBase010): """Tests for 'methods' on the amqp queue 'class'""" def test_purge(self): @@ -31,9 +31,9 @@ class QueueTests(TestBase): session = self.session #setup, declare a queue and add some messages to it: session.queue_declare(queue="test-queue", exclusive=True, auto_delete=True) - session.message_transfer(content=Content("one", properties={'routing_key':"test-queue"})) - session.message_transfer(content=Content("two", properties={'routing_key':"test-queue"})) - session.message_transfer(content=Content("three", properties={'routing_key':"test-queue"})) + session.message_transfer(message=Message(session.delivery_properties(routing_key="test-queue"), "one")) + session.message_transfer(message=Message(session.delivery_properties(routing_key="test-queue"), "two")) + session.message_transfer(message=Message(session.delivery_properties(routing_key="test-queue"), "three")) #check that the queue now reports 3 messages: session.queue_declare(queue="test-queue") @@ -46,15 +46,16 @@ class QueueTests(TestBase): self.assertEqual(0, reply.message_count) #send a further message and consume it, ensuring that the other messages are really gone - session.message_transfer(content=Content("four", properties={'routing_key':"test-queue"})) - self.subscribe(queue="test-queue", destination="tag") - queue = self.client.queue("tag") + session.message_transfer(message=Message(session.delivery_properties(routing_key="test-queue"), "four")) + session.message_subscribe(queue="test-queue", destination="tag") + session.message_flow(destination="tag", unit=0, value=0xFFFFFFFF) + session.message_flow(destination="tag", unit=1, value=0xFFFFFFFF) + queue = session.incoming("tag") msg = queue.get(timeout=1) - self.assertEqual("four", msg.content.body) + self.assertEqual("four", msg.body) #check error conditions (use new sessions): - session = self.client.session(2) - session.session_open() + session = self.conn.session("error-checker") try: #queue specified but doesn't exist: session.queue_purge(queue="invalid-queue") @@ -62,8 +63,7 @@ class QueueTests(TestBase): except Closed, e: self.assertChannelException(404, e.args[0]) - session = self.client.session(3) - session.session_open() + session = self.conn.session("error-checker") try: #queue not specified and none previously declared for channel: session.queue_purge() @@ -71,12 +71,6 @@ class QueueTests(TestBase): except Closed, e: self.assertConnectionException(530, e.args[0]) - #cleanup - other = self.connect() - session = other.session(1) - session.session_open() - session.exchange_delete(exchange="test-exchange") - def test_declare_exclusive(self): """ Test that the exclusive field is honoured in queue.declare @@ -167,32 +161,32 @@ class QueueTests(TestBase): self.subscribe(queue="queue-1", destination="queue-1") self.subscribe(queue="queue-2", destination="queue-2") - queue1 = self.client.queue("queue-1") - queue2 = self.client.queue("queue-2") + queue1 = session.incoming("queue-1") + queue2 = session.incoming("queue-2") session.queue_bind(exchange=exchange, queue="queue-1", routing_key=routing_key, arguments=args) session.queue_bind(exchange=exchange, queue="queue-2", routing_key=routing_key, arguments=args) #send a message that will match both bindings session.message_transfer(destination=exchange, - content=Content("one", properties={'routing_key':routing_key, 'application_headers':headers})) + message=Message(session.delivery_properties(routing_key=routing_key, application_headers=headers), "one")) #unbind first queue session.queue_unbind(exchange=exchange, queue="queue-1", routing_key=routing_key, arguments=args) #send another message session.message_transfer(destination=exchange, - content=Content("two", properties={'routing_key':routing_key, 'application_headers':headers})) + message=Message(session.delivery_properties(routing_key=routing_key, application_headers=headers), "two", )) #check one queue has both messages and the other has only one - self.assertEquals("one", queue1.get(timeout=1).content.body) + self.assertEquals("one", queue1.get(timeout=1).body) try: msg = queue1.get(timeout=1) - self.fail("Got extra message: %s" % msg.content.body) + self.fail("Got extra message: %s" % msg.body) except Empty: pass - self.assertEquals("one", queue2.get(timeout=1).content.body) - self.assertEquals("two", queue2.get(timeout=1).content.body) + self.assertEquals("one", queue2.get(timeout=1).body) + self.assertEquals("two", queue2.get(timeout=1).body) try: msg = queue2.get(timeout=1) self.fail("Got extra message: " + msg) @@ -207,9 +201,9 @@ class QueueTests(TestBase): #straight-forward case: session.queue_declare(queue="delete-me") - session.message_transfer(content=Content("a", properties={'routing_key':"delete-me"})) - session.message_transfer(content=Content("b", properties={'routing_key':"delete-me"})) - session.message_transfer(content=Content("c", properties={'routing_key':"delete-me"})) + session.message_transfer(message=Message("a", session.delivery_properties(routing_key="delete-me"))) + session.message_transfer(message=Message("b", session.delivery_properties(routing_key="delete-me"))) + session.message_transfer(message=Message("c", session.delivery_properties(routing_key="delete-me"))) session.queue_delete(queue="delete-me") #check that it has gone be declaring passively try: @@ -238,7 +232,7 @@ class QueueTests(TestBase): #create a queue and add a message to it (use default binding): session.queue_declare(queue="delete-me-2") session.queue_declare(queue="delete-me-2", passive=True) - session.message_transfer(content=Content("message", properties={'routing_key':"delete-me-2"})) + session.message_transfer(message=Message("message", session.delivery_properties(routing_key="delete-me-2"))) #try to delete, but only if empty: try: @@ -253,9 +247,9 @@ class QueueTests(TestBase): #empty queue: self.subscribe(session, destination="consumer_tag", queue="delete-me-2") - queue = self.client.queue("consumer_tag") + queue = session.incoming("consumer_tag") msg = queue.get(timeout=1) - self.assertEqual("message", msg.content.body) + self.assertEqual("message", msg.body) session.message_cancel(destination="consumer_tag") #retry deletion on empty queue: |
