From c1624ea57c4097b30c155844e66ebbbe2dddc20e Mon Sep 17 00:00:00 2001 From: Gordon Sim Date: Wed, 8 Jul 2009 11:48:57 +0000 Subject: QPID-1974: Fixes (and tests) for updating lvq state to new cluster members. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@792103 13f79535-47bb-0310-9956-ffa450edef68 --- cpp/src/qpid/broker/Message.cpp | 15 +++++++++++++++ cpp/src/qpid/broker/Message.h | 4 ++++ cpp/src/qpid/broker/Queue.cpp | 2 +- cpp/src/qpid/broker/Queue.h | 10 ++++++++-- 4 files changed, 28 insertions(+), 3 deletions(-) (limited to 'cpp/src/qpid/broker') diff --git a/cpp/src/qpid/broker/Message.cpp b/cpp/src/qpid/broker/Message.cpp index 2c3c444baa..b912cd3a1c 100644 --- a/cpp/src/qpid/broker/Message.cpp +++ b/cpp/src/qpid/broker/Message.cpp @@ -401,4 +401,19 @@ framing::FieldTable& Message::getOrInsertHeaders() return getProperties()->getApplicationHeaders(); } + +void Message::setUpdateDestination(const std::string& d) +{ + updateDestination = d; +} + + +bool Message::isUpdateMessage() +{ + return updateDestination.size() && isA() + && getMethod()->getDestination() == updateDestination; +} + +std::string Message::updateDestination; + }} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/Message.h b/cpp/src/qpid/broker/Message.h index 94ce7561a7..2c75d945fa 100644 --- a/cpp/src/qpid/broker/Message.h +++ b/cpp/src/qpid/broker/Message.h @@ -160,6 +160,9 @@ public: void setDequeueCompleteCallback(MessageCallback& cb); void resetDequeueCompleteCallback(); + bool isUpdateMessage(); + static void setUpdateDestination(const std::string&); + private: typedef std::map > Replacement; @@ -186,6 +189,7 @@ public: mutable boost::intrusive_ptr empty; MessageCallback* enqueueCallback; MessageCallback* dequeueCallback; + static std::string updateDestination; }; }} diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index 759a38d919..30be733f89 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -570,7 +570,7 @@ void Queue::push(boost::intrusive_ptr& msg, bool isRecovery){ string key = ft->getAsString(qpidVQMatchProperty); i = lvq.find(key); - if (i == lvq.end()){ + if (i == lvq.end() || msg->isUpdateMessage()){ messages.push_back(qm); listeners.populate(copy); lvq[key] = msg; diff --git a/cpp/src/qpid/broker/Queue.h b/cpp/src/qpid/broker/Queue.h index 56e349b06b..dbad5e12ed 100644 --- a/cpp/src/qpid/broker/Queue.h +++ b/cpp/src/qpid/broker/Queue.h @@ -300,9 +300,15 @@ namespace qpid { ManagementMethod (uint32_t methodId, management::Args& args, std::string& text); /** Apply f to each Message on the queue. */ - template void eachMessage(F f) const { + template void eachMessage(F f) { sys::Mutex::ScopedLock l(messageLock); - std::for_each(messages.begin(), messages.end(), f); + if (lastValueQueue) { + for (Messages::iterator i = messages.begin(); i != messages.end(); ++i) { + f(checkLvqReplace(*i)); + } + } else { + std::for_each(messages.begin(), messages.end(), f); + } } /** Apply f to each QueueBinding on the queue */ -- cgit v1.2.1