diff options
| author | Gordon Sim <gsim@apache.org> | 2013-07-17 08:58:57 +0000 |
|---|---|---|
| committer | Gordon Sim <gsim@apache.org> | 2013-07-17 08:58:57 +0000 |
| commit | 850e80ea566fc4d9662ce236351a88f32440517a (patch) | |
| tree | 68a416c823b00026f7cc101c375b9908cadb0739 /qpid/cpp/src | |
| parent | 206e8c68e5a81816e265cf0f2465d8ff394c6675 (diff) | |
| download | qpid-python-850e80ea566fc4d9662ce236351a88f32440517a.tar.gz | |
QPID-4993: reroute dropped messages in ring queue if alternate exchange is specified
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1504058 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
| -rw-r--r-- | qpid/cpp/src/qpid/broker/LossyQueue.cpp | 2 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/Queue.cpp | 24 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/Queue.h | 3 | ||||
| -rw-r--r-- | qpid/cpp/src/tests/MessagingSessionTests.cpp | 19 |
4 files changed, 35 insertions, 13 deletions
diff --git a/qpid/cpp/src/qpid/broker/LossyQueue.cpp b/qpid/cpp/src/qpid/broker/LossyQueue.cpp index 4104503dac..ba7dfd11a1 100644 --- a/qpid/cpp/src/qpid/broker/LossyQueue.cpp +++ b/qpid/cpp/src/qpid/broker/LossyQueue.cpp @@ -52,7 +52,7 @@ bool LossyQueue::checkDepth(const QueueDepth& increment, const Message& message) QPID_LOG(debug, "purging " << name << ": current depth is [" << current << "], max depth is [" << settings.maxDepth << "], new message has size " << increment.getSize()); qpid::sys::Mutex::ScopedUnlock u(messageLock); //TODO: arguably we should try and purge expired messages first but that is potentially expensive - if (remove(1, settings.priorities ? boost::bind(&isLowerPriorityThan, message.getPriority(), _1) : MessagePredicate(), MessageFunctor(), PURGE, false)) { + if (remove(1, settings.priorities ? boost::bind(&isLowerPriorityThan, message.getPriority(), _1) : MessagePredicate(), boost::bind(&reroute, alternateExchange, _1), PURGE, false)) { if (mgmtObject) { mgmtObject->inc_discardsRing(1); if (brokerMgmtObject) diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp index 1d0a8017ef..339cdb7f9d 100644 --- a/qpid/cpp/src/qpid/broker/Queue.cpp +++ b/qpid/cpp/src/qpid/broker/Queue.cpp @@ -677,17 +677,6 @@ namespace { return new MessageFilter(); } - bool reroute(boost::shared_ptr<Exchange> e, const Message& m) - { - if (e) { - DeliverableMessage d(m, 0); - d.getMessage().clearTrace(); - e->routeWithAlternate(d); - return true; - } else { - return false; - } - } void moveTo(boost::shared_ptr<Queue> q, Message& m) { if (q) { @@ -1684,6 +1673,19 @@ void Queue::setMgmtRedirectState( std::string peer, bool enabled, bool isSrc ) { mgmtObject->set_redirectSource(isSrc); } } + +bool Queue::reroute(boost::shared_ptr<Exchange> e, const Message& m) +{ + if (e) { + DeliverableMessage d(m, 0); + d.getMessage().clearTrace(); + e->routeWithAlternate(d); + return true; + } else { + return false; + } +} + Queue::QueueUsers::QueueUsers() : consumers(0), browsers(0), others(0), controller(false) {} void Queue::QueueUsers::addConsumer() { ++consumers; } void Queue::QueueUsers::addBrowser() { ++browsers; } diff --git a/qpid/cpp/src/qpid/broker/Queue.h b/qpid/cpp/src/qpid/broker/Queue.h index 5598ee5d13..e66a2171e9 100644 --- a/qpid/cpp/src/qpid/broker/Queue.h +++ b/qpid/cpp/src/qpid/broker/Queue.h @@ -501,6 +501,9 @@ class Queue : public boost::enable_shared_from_this<Queue>, QPID_BROKER_EXTERN bool isRedirectSource() const { return redirectSource; } QPID_BROKER_EXTERN void setMgmtRedirectState( std::string peer, bool enabled, bool isSrc ); + //utility function + static bool reroute(boost::shared_ptr<Exchange> e, const Message& m); + friend class QueueFactory; }; } diff --git a/qpid/cpp/src/tests/MessagingSessionTests.cpp b/qpid/cpp/src/tests/MessagingSessionTests.cpp index 9791cf9f38..2cf9648be4 100644 --- a/qpid/cpp/src/tests/MessagingSessionTests.cpp +++ b/qpid/cpp/src/tests/MessagingSessionTests.cpp @@ -1292,7 +1292,6 @@ QPID_AUTO_TEST_CASE(testSimpleRequestResponse) QPID_AUTO_TEST_CASE(testSelfDestructQueue) { MessagingFixture fix; - //create receiver on temp queue for responses (using shorthand for temp queue) Session other = fix.connection.createSession(); Receiver r1 = other.createReceiver("amq.fanout; {link:{reliability:at-least-once, x-declare:{arguments:{qpid.max_count:10,qpid.policy_type:self-destruct}}}}"); Receiver r2 = fix.session.createReceiver("amq.fanout"); @@ -1315,6 +1314,24 @@ QPID_AUTO_TEST_CASE(testSelfDestructQueue) } } +QPID_AUTO_TEST_CASE(testReroutingRingQueue) +{ + MessagingFixture fix; + Receiver r1 = fix.session.createReceiver("my-queue; {create:always, node:{x-declare:{alternate-exchange:amq.fanout, auto-delete:True, arguments:{qpid.max_count:10,qpid.policy_type:ring}}}}"); + Receiver r2 = fix.session.createReceiver("amq.fanout"); + + Sender s = fix.session.createSender("my-queue"); + for (uint i = 0; i < 20; ++i) { + s.send(Message((boost::format("MSG_%1%") % (i+1)).str())); + } + for (uint i = 10; i < 20; ++i) { + BOOST_CHECK_EQUAL(r1.fetch(Duration::SECOND).getContent(), (boost::format("MSG_%1%") % (i+1)).str()); + } + for (uint i = 0; i < 10; ++i) { + BOOST_CHECK_EQUAL(r2.fetch(Duration::SECOND).getContent(), (boost::format("MSG_%1%") % (i+1)).str()); + } +} + QPID_AUTO_TEST_SUITE_END() }} // namespace qpid::tests |
