diff options
| -rw-r--r-- | cpp/src/qpid/broker/DeliveryAdapter.h | 1 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/DeliveryRecord.cpp | 25 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/DeliveryRecord.h | 19 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/SemanticHandler.cpp | 6 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/SemanticHandler.h | 1 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/SemanticState.cpp | 22 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/SemanticState.h | 3 | ||||
| -rw-r--r-- | cpp/src/tests/DeliveryRecordTest.cpp | 68 | ||||
| -rw-r--r-- | cpp/src/tests/Makefile.am | 1 | ||||
| -rw-r--r-- | cpp/src/tests/TxAckTest.cpp | 2 | ||||
| -rw-r--r-- | python/tests_0-10/message.py | 46 |
11 files changed, 152 insertions, 42 deletions
diff --git a/cpp/src/qpid/broker/DeliveryAdapter.h b/cpp/src/qpid/broker/DeliveryAdapter.h index f645b37c23..28e2e3eb93 100644 --- a/cpp/src/qpid/broker/DeliveryAdapter.h +++ b/cpp/src/qpid/broker/DeliveryAdapter.h @@ -43,7 +43,6 @@ namespace broker { { public: virtual DeliveryId deliver(Message::shared_ptr& msg, DeliveryToken::shared_ptr token) = 0; - virtual void redeliver(Message::shared_ptr& msg, DeliveryToken::shared_ptr token, DeliveryId tag) = 0; virtual ~DeliveryAdapter(){} }; diff --git a/cpp/src/qpid/broker/DeliveryRecord.cpp b/cpp/src/qpid/broker/DeliveryRecord.cpp index df700cd15a..62c2002801 100644 --- a/cpp/src/qpid/broker/DeliveryRecord.cpp +++ b/cpp/src/qpid/broker/DeliveryRecord.cpp @@ -27,13 +27,15 @@ using namespace qpid::broker; using std::string; -DeliveryRecord::DeliveryRecord(QueuedMessage& _msg, +DeliveryRecord::DeliveryRecord(const QueuedMessage& _msg, Queue::shared_ptr _queue, - const string _consumerTag, + const std::string _tag, + DeliveryToken::shared_ptr _token, const DeliveryId _id, bool _acquired, bool _confirmed) : msg(_msg), queue(_queue), - consumerTag(_consumerTag), + tag(_tag), + token(_token), id(_id), acquired(_acquired), confirmed(_confirmed), @@ -41,11 +43,10 @@ DeliveryRecord::DeliveryRecord(QueuedMessage& _msg, { } -DeliveryRecord::DeliveryRecord(QueuedMessage& _msg, +DeliveryRecord::DeliveryRecord(const QueuedMessage& _msg, Queue::shared_ptr _queue, const DeliveryId _id) : msg(_msg), queue(_queue), - consumerTag(""), id(_id), acquired(true), confirmed(false), @@ -74,13 +75,13 @@ bool DeliveryRecord::coveredBy(const framing::AccumulatedAck* const range) const return range->covers(id); } -void DeliveryRecord::redeliver(SemanticState* const session) const{ +void DeliveryRecord::redeliver(SemanticState* const session) { if (!confirmed) { if(pull){ //if message was originally sent as response to get, we must requeue it requeue(); }else{ - session->deliver(msg.payload, consumerTag, id); + id = session->redeliver(msg.payload, token); } } } @@ -151,11 +152,17 @@ void DeliveryRecord::acquire(std::vector<DeliveryId>& results) { namespace qpid { namespace broker { -std::ostream& operator<<(std::ostream& out, const DeliveryRecord& r) { +std::ostream& operator<<(std::ostream& out, const DeliveryRecord& r) +{ out << "{" << "id=" << r.id.getValue(); - out << ", consumer=" << r.consumerTag; + out << ", tag=" << r.tag << "}"; out << ", queue=" << r.queue->getName() << "}"; return out; } +bool operator<(const DeliveryRecord& a, const DeliveryRecord& b) +{ + return a.id < b.id; +} + }} diff --git a/cpp/src/qpid/broker/DeliveryRecord.h b/cpp/src/qpid/broker/DeliveryRecord.h index f2c343e27a..21944cd553 100644 --- a/cpp/src/qpid/broker/DeliveryRecord.h +++ b/cpp/src/qpid/broker/DeliveryRecord.h @@ -29,6 +29,7 @@ #include "Queue.h" #include "Consumer.h" #include "DeliveryId.h" +#include "DeliveryToken.h" #include "Message.h" #include "Prefetch.h" @@ -42,16 +43,17 @@ class SemanticState; class DeliveryRecord{ mutable QueuedMessage msg; mutable Queue::shared_ptr queue; - const std::string consumerTag; - const DeliveryId id; + const std::string tag; + DeliveryToken::shared_ptr token; + DeliveryId id; bool acquired; const bool confirmed; const bool pull; public: - DeliveryRecord(QueuedMessage& msg, Queue::shared_ptr queue, const std::string consumerTag, + DeliveryRecord(const QueuedMessage& msg, Queue::shared_ptr queue, const std::string tag, DeliveryToken::shared_ptr token, const DeliveryId id, bool acquired, bool confirmed = false); - DeliveryRecord(QueuedMessage& msg, Queue::shared_ptr queue, const DeliveryId id); + DeliveryRecord(const QueuedMessage& msg, Queue::shared_ptr queue, const DeliveryId id); void dequeue(TransactionContext* ctxt = 0) const; bool matches(DeliveryId tag) const; @@ -61,17 +63,16 @@ class DeliveryRecord{ void requeue() const; void release(); void reject(); - void redeliver(SemanticState* const) const; + void redeliver(SemanticState* const); void updateByteCredit(uint32_t& credit) const; void addTo(Prefetch&) const; void subtractFrom(Prefetch&) const; - const std::string& getConsumerTag() const { return consumerTag; } + const std::string& getTag() const { return tag; } bool isPull() const { return pull; } bool isAcquired() const { return acquired; } - //void setAcquired(bool isAcquired) { acquired = isAcquired; } void acquire(std::vector<DeliveryId>& results); - - friend std::ostream& operator<<(std::ostream&, const DeliveryRecord&); + friend bool operator<(const DeliveryRecord&, const DeliveryRecord&); + friend std::ostream& operator<<(std::ostream&, const DeliveryRecord&); }; typedef std::list<DeliveryRecord>::iterator ack_iterator; diff --git a/cpp/src/qpid/broker/SemanticHandler.cpp b/cpp/src/qpid/broker/SemanticHandler.cpp index ae5810684d..9cacb4ccf7 100644 --- a/cpp/src/qpid/broker/SemanticHandler.cpp +++ b/cpp/src/qpid/broker/SemanticHandler.cpp @@ -176,12 +176,6 @@ DeliveryId SemanticHandler::deliver(Message::shared_ptr& msg, DeliveryToken::sha return outgoing.hwm; } -void SemanticHandler::redeliver(Message::shared_ptr& msg, DeliveryToken::shared_ptr token, DeliveryId tag) -{ - MessageDelivery::deliver(msg, session.getHandler().out, tag, token, - session.getConnection().getFrameMax()); -} - SemanticHandler::TrackId SemanticHandler::getTrack(const AMQFrame& frame) { //will be replaced by field in 0-10 frame header diff --git a/cpp/src/qpid/broker/SemanticHandler.h b/cpp/src/qpid/broker/SemanticHandler.h index d6dbf878c9..dc7f21ac34 100644 --- a/cpp/src/qpid/broker/SemanticHandler.h +++ b/cpp/src/qpid/broker/SemanticHandler.h @@ -71,7 +71,6 @@ class SemanticHandler : public DeliveryAdapter, //delivery adapter methods: DeliveryId deliver(Message::shared_ptr& msg, DeliveryToken::shared_ptr token); - void redeliver(Message::shared_ptr& msg, DeliveryToken::shared_ptr token, DeliveryId tag); framing::AMQP_ClientProxy& getProxy() { return session.getProxy(); } Connection& getConnection() { return session.getConnection(); } diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp index a3858d0989..11bce282eb 100644 --- a/cpp/src/qpid/broker/SemanticState.cpp +++ b/cpp/src/qpid/broker/SemanticState.cpp @@ -250,7 +250,7 @@ bool SemanticState::ConsumerImpl::deliver(QueuedMessage& msg) DeliveryId deliveryTag = parent->deliveryAdapter.deliver(msg.payload, token); if (windowing || ackExpected) { - parent->record(DeliveryRecord(msg, queue, name, deliveryTag, acquire, !ackExpected)); + parent->record(DeliveryRecord(msg, queue, name, token, deliveryTag, acquire, !ackExpected)); } } return !blocked; @@ -273,11 +273,6 @@ bool SemanticState::ConsumerImpl::checkCredit(Message::shared_ptr& msg) } } -void SemanticState::ConsumerImpl::redeliver(Message::shared_ptr& msg, DeliveryId deliveryTag) { - Mutex::ScopedLock locker(parent->deliveryLock); - parent->deliveryAdapter.redeliver(msg, token, deliveryTag); -} - SemanticState::ConsumerImpl::~ConsumerImpl() { cancel(); } @@ -384,7 +379,7 @@ void SemanticState::ack(DeliveryId first, DeliveryId last, bool cumulative) void SemanticState::acknowledged(const DeliveryRecord& delivery) { delivery.subtractFrom(outstanding); - ConsumerImplMap::iterator i = consumers.find(delivery.getConsumerTag()); + ConsumerImplMap::iterator i = consumers.find(delivery.getTag()); if (i != consumers.end()) { i->acknowledged(delivery); } @@ -411,6 +406,10 @@ void SemanticState::recover(bool requeue) for_each(copy.rbegin(), copy.rend(), mem_fun_ref(&DeliveryRecord::requeue)); }else{ for_each(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::redeliver), this)); + //unconfirmed messages re redelivered and therefore have their + //id adjusted, confirmed messages are not and so the ordering + //w.r.t id is lost + unacked.sort(); } } @@ -429,13 +428,10 @@ bool SemanticState::get(DeliveryToken::shared_ptr token, Queue::shared_ptr queue } } -void SemanticState::deliver(Message::shared_ptr& msg, const string& consumerTag, - DeliveryId deliveryTag) +DeliveryId SemanticState::redeliver(Message::shared_ptr& msg, DeliveryToken::shared_ptr token) { - ConsumerImplMap::iterator i = consumers.find(consumerTag); - if (i != consumers.end()){ - i->redeliver(msg, deliveryTag); - } + Mutex::ScopedLock locker(deliveryLock); + return deliveryAdapter.deliver(msg, token); } void SemanticState::flow(bool active) diff --git a/cpp/src/qpid/broker/SemanticState.h b/cpp/src/qpid/broker/SemanticState.h index 6147380714..8372f345ad 100644 --- a/cpp/src/qpid/broker/SemanticState.h +++ b/cpp/src/qpid/broker/SemanticState.h @@ -77,7 +77,6 @@ class SemanticState : public framing::FrameHandler::Chains, bool ack, bool nolocal, bool acquire); ~ConsumerImpl(); bool deliver(QueuedMessage& msg); - void redeliver(Message::shared_ptr& msg, DeliveryId deliveryTag); void cancel(); void requestDispatch(); @@ -169,7 +168,7 @@ class SemanticState : public framing::FrameHandler::Chains, void ackRange(DeliveryId deliveryTag, DeliveryId endTag); void recover(bool requeue); void flow(bool active); - void deliver(Message::shared_ptr& msg, const string& consumerTag, DeliveryId deliveryTag); + DeliveryId redeliver(Message::shared_ptr& msg, DeliveryToken::shared_ptr token); void acquire(DeliveryId first, DeliveryId last, std::vector<DeliveryId>& acquired); void release(DeliveryId first, DeliveryId last); void reject(DeliveryId first, DeliveryId last); diff --git a/cpp/src/tests/DeliveryRecordTest.cpp b/cpp/src/tests/DeliveryRecordTest.cpp new file mode 100644 index 0000000000..011d8bf694 --- /dev/null +++ b/cpp/src/tests/DeliveryRecordTest.cpp @@ -0,0 +1,68 @@ + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/broker/DeliveryRecord.h" +#include "qpid_test_plugin.h" +#include <iostream> +#include <memory> +#include <boost/format.hpp> + +using namespace qpid::broker; +using namespace qpid::sys; +using namespace qpid::framing; +using boost::dynamic_pointer_cast; +using std::list; + +class DeliveryRecordTest : public CppUnit::TestCase +{ + CPPUNIT_TEST_SUITE(DeliveryRecordTest); + CPPUNIT_TEST(testSort); + CPPUNIT_TEST_SUITE_END(); + +public: + + void testSort() + { + list<SequenceNumber> ids; + ids.push_back(SequenceNumber(6)); + ids.push_back(SequenceNumber(2)); + ids.push_back(SequenceNumber(4)); + ids.push_back(SequenceNumber(5)); + ids.push_back(SequenceNumber(1)); + ids.push_back(SequenceNumber(3)); + + list<DeliveryRecord> records; + for (list<SequenceNumber>::iterator i = ids.begin(); i != ids.end(); i++) { + records.push_back(DeliveryRecord(QueuedMessage(), Queue::shared_ptr(), "tag", DeliveryToken::shared_ptr(), *i, false, false)); + } + records.sort(); + + SequenceNumber expected(0); + for (list<DeliveryRecord>::iterator i = records.begin(); i != records.end(); i++) { + CPPUNIT_ASSERT(i->matches(++expected)); + } + } +}; + +// Make this test suite a plugin. +CPPUNIT_PLUGIN_IMPLEMENT(); +CPPUNIT_TEST_SUITE_REGISTRATION(DeliveryRecordTest); + diff --git a/cpp/src/tests/Makefile.am b/cpp/src/tests/Makefile.am index 9c4d5c0fd0..b8622fc925 100644 --- a/cpp/src/tests/Makefile.am +++ b/cpp/src/tests/Makefile.am @@ -86,6 +86,7 @@ perftest_LDADD=$(lib_client) broker_unit_tests = \ AccumulatedAckTest \ DtxWorkRecordTest \ + DeliveryRecordTest \ ExchangeTest \ HeadersExchangeTest \ MessageTest \ diff --git a/cpp/src/tests/TxAckTest.cpp b/cpp/src/tests/TxAckTest.cpp index 34d0bcd156..73628f25b5 100644 --- a/cpp/src/tests/TxAckTest.cpp +++ b/cpp/src/tests/TxAckTest.cpp @@ -78,7 +78,7 @@ public: messages.push_back(msg); QueuedMessage qm; qm.payload = msg; - deliveries.push_back(DeliveryRecord(qm, queue, "xyz", (i+1), true)); + deliveries.push_back(DeliveryRecord(qm, queue, "xyz", DeliveryToken::shared_ptr(), (i+1), true)); } //assume msgs 1-5, 7 and 9 are all acked (i.e. 6, 8 & 10 are not) diff --git a/python/tests_0-10/message.py b/python/tests_0-10/message.py index b5b058340f..967c03bbea 100644 --- a/python/tests_0-10/message.py +++ b/python/tests_0-10/message.py @@ -187,6 +187,46 @@ class MessageTests(TestBase): self.fail("Got unexpected message: " + extra.content.body) except Empty: None + + def test_recover(self): + """ + Test recover behaviour + """ + channel = self.channel + channel.queue_declare(queue="queue-a", exclusive=True) + channel.queue_bind(exchange="amq.fanout", queue="queue-a") + channel.queue_declare(queue="queue-b", exclusive=True) + channel.queue_bind(exchange="amq.fanout", queue="queue-b") + + self.subscribe(queue="queue-a", destination="unconfirmed", confirm_mode=1) + self.subscribe(queue="queue-b", destination="confirmed", confirm_mode=0) + confirmed = self.client.queue("confirmed") + unconfirmed = self.client.queue("unconfirmed") + + data = ["One", "Two", "Three", "Four", "Five"] + for d in data: + channel.message_transfer(destination="amq.fanout", content=Content(body=d)) + + for q in [confirmed, unconfirmed]: + for d in data: + self.assertEqual(d, q.get(timeout=1).content.body) + self.assertEmpty(q) + + channel.message_recover(requeue=False) + + self.assertEmpty(confirmed) + + while len(data): + msg = None + for d in data: + msg = unconfirmed.get(timeout=1) + self.assertEqual(d, msg.content.body) + self.assertEmpty(unconfirmed) + data.remove(msg.content.body) + msg.complete(cumulative=False) + channel.message_recover(requeue=False) + + def test_recover_requeue(self): """ Test requeing on recovery @@ -551,3 +591,9 @@ class MessageTests(TestBase): def assertDataEquals(self, channel, msg, expected): self.assertEquals(expected, msg.content.body) + + def assertEmpty(self, queue): + try: + extra = queue.get(timeout=1) + self.fail("Queue not empty, contains: " + extra.content.body) + except Empty: None |
