diff options
author | Carl C. Trieloff <cctrieloff@apache.org> | 2009-11-09 18:41:36 +0000 |
---|---|---|
committer | Carl C. Trieloff <cctrieloff@apache.org> | 2009-11-09 18:41:36 +0000 |
commit | 5801ede4cd403953c0b809c8b2efdbc77fe0e371 (patch) | |
tree | da155eea89966133c075e247a06295220bb01df6 /cpp/src | |
parent | d67b869a335e14a6713fcf59cb65b0f05669bb40 (diff) | |
download | qpid-python-5801ede4cd403953c0b809c8b2efdbc77fe0e371.tar.gz |
remove looping for position search and replace with stl algorithms for better performance
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@834172 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/qpid/broker/Queue.cpp | 95 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Queue.h | 3 | ||||
-rw-r--r-- | cpp/src/qpid/broker/QueuedMessage.h | 2 | ||||
-rw-r--r-- | cpp/src/tests/QueueTest.cpp | 3 |
4 files changed, 65 insertions, 38 deletions
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index 11b2682575..a3a7336f35 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -247,18 +247,18 @@ bool Queue::acquireMessageAt(const SequenceNumber& position, QueuedMessage& mess { Mutex::ScopedLock locker(messageLock); QPID_LOG(debug, "Attempting to acquire message at " << position); - for (Messages::iterator i = messages.begin(); i != messages.end(); i++) { - if (i->position == position) { - message = *i; - if (lastValueQueue) { - clearLVQIndex(*i); - } - QPID_LOG(debug, - "Acquired message at " << i->position << " from " << name); - messages.erase(i); - return true; + + Messages::iterator i = findAt(position); + if (i != messages.end() ) { + message = *i; + if (lastValueQueue) { + clearLVQIndex(*i); } - } + QPID_LOG(debug, + "Acquired message at " << i->position << " from " << name); + messages.erase(i); + return true; + } QPID_LOG(debug, "Could not acquire message at " << position << " from " << name << "; no message at that position"); return false; } @@ -266,21 +266,21 @@ bool Queue::acquireMessageAt(const SequenceNumber& position, QueuedMessage& mess bool Queue::acquire(const QueuedMessage& msg) { Mutex::ScopedLock locker(messageLock); QPID_LOG(debug, "attempting to acquire " << msg.position); - for (Messages::iterator i = messages.begin(); i != messages.end(); i++) { - if ((i->position == msg.position && !lastValueQueue) // note that in some cases payload not be set - || (lastValueQueue && (i->position == msg.position) && - msg.payload.get() == checkLvqReplace(*i).payload.get()) ) { - - clearLVQIndex(msg); - QPID_LOG(debug, - "Match found, acquire succeeded: " << - i->position << " == " << msg.position); - messages.erase(i); - return true; - } else { - QPID_LOG(debug, "No match: " << i->position << " != " << msg.position); - } + Messages::iterator i = findAt(msg.position); + if ((i != messages.end() && !lastValueQueue) // note that in some cases payload not be set + || (lastValueQueue && (i->position == msg.position) && + msg.payload.get() == checkLvqReplace(*i).payload.get()) ) { + + clearLVQIndex(msg); + QPID_LOG(debug, + "Match found, acquire succeeded: " << + i->position << " == " << msg.position); + messages.erase(i); + return true; + } else { + QPID_LOG(debug, "No match: " << i->position << " != " << msg.position); } + QPID_LOG(debug, "Acquire failed for " << msg.position); return false; } @@ -449,19 +449,35 @@ bool Queue::seek(QueuedMessage& msg, Consumer::shared_ptr c) { return false; } -namespace { -struct PositionEquals { - SequenceNumber pos; - PositionEquals(SequenceNumber p) : pos(p) {} - bool operator()(const QueuedMessage& msg) const { return msg.position == pos; } -}; -}// namespace +Queue::Messages::iterator Queue::findAt(SequenceNumber pos) { + + if(!messages.empty()){ + QueuedMessage compM; + compM.position = pos; + unsigned long diff = pos.getValue() - messages.front().position.getValue(); + long maxEnd = diff < messages.size()? diff : messages.size(); + + Messages::iterator i = lower_bound(messages.begin(),messages.begin()+maxEnd,compM); + if (i!= messages.end() && i->position == pos) + return i; + } + return messages.end(); // no match found. +} + QueuedMessage Queue::find(SequenceNumber pos) const { + Mutex::ScopedLock locker(messageLock); - Messages::const_iterator i = std::find_if(messages.begin(), messages.end(), PositionEquals(pos)); - if (i != messages.end()) - return *i; + if(!messages.empty()){ + QueuedMessage compM; + compM.position = pos; + unsigned long diff = pos.getValue() - messages.front().position.getValue(); + long maxEnd = diff < messages.size()? diff : messages.size(); + + Messages::const_iterator i = lower_bound(messages.begin(),messages.begin()+maxEnd,compM); + if (i != messages.end()) + return *i; + } return QueuedMessage(); } @@ -646,10 +662,9 @@ QueuedMessage& Queue::checkLvqReplace(QueuedMessage& msg) } /** function only provided for unit tests, or code not in critical message path */ -uint32_t Queue::getMessageCount() const +uint32_t Queue::getEnqueueCompleteMessageCount() const { Mutex::ScopedLock locker(messageLock); - uint32_t count = 0; for ( Messages::const_iterator i = messages.begin(); i != messages.end(); ++i ) { //NOTE: don't need to use checkLvqReplace() here as it @@ -661,6 +676,12 @@ uint32_t Queue::getMessageCount() const return count; } +uint32_t Queue::getMessageCount() const +{ + Mutex::ScopedLock locker(messageLock); + return messages.size(); +} + uint32_t Queue::getConsumerCount() const { Mutex::ScopedLock locker(consumerLock); diff --git a/cpp/src/qpid/broker/Queue.h b/cpp/src/qpid/broker/Queue.h index 5d9fbebc7d..f0d02b8cad 100644 --- a/cpp/src/qpid/broker/Queue.h +++ b/cpp/src/qpid/broker/Queue.h @@ -148,6 +148,8 @@ namespace qpid { } } } + + Messages::iterator findAt(framing::SequenceNumber pos); public: @@ -221,6 +223,7 @@ namespace qpid { uint32_t move(const Queue::shared_ptr destq, uint32_t qty); QPID_BROKER_EXTERN uint32_t getMessageCount() const; + QPID_BROKER_EXTERN uint32_t getEnqueueCompleteMessageCount() const; QPID_BROKER_EXTERN uint32_t getConsumerCount() const; inline const string& getName() const { return name; } bool isExclusiveOwner(const OwnershipToken* const o) const; diff --git a/cpp/src/qpid/broker/QueuedMessage.h b/cpp/src/qpid/broker/QueuedMessage.h index ccc9b539df..35e48b11f3 100644 --- a/cpp/src/qpid/broker/QueuedMessage.h +++ b/cpp/src/qpid/broker/QueuedMessage.h @@ -38,7 +38,9 @@ struct QueuedMessage QueuedMessage(Queue* q, boost::intrusive_ptr<Message> msg, framing::SequenceNumber sn) : payload(msg), position(sn), queue(q) {} QueuedMessage(Queue* q) : queue(q) {} + }; + inline bool operator<(const QueuedMessage& a, const QueuedMessage& b) { return a.position < b.position; } }} diff --git a/cpp/src/tests/QueueTest.cpp b/cpp/src/tests/QueueTest.cpp index 3cfaa763ca..e1645d97e3 100644 --- a/cpp/src/tests/QueueTest.cpp +++ b/cpp/src/tests/QueueTest.cpp @@ -120,9 +120,10 @@ QPID_AUTO_TEST_CASE(testAsyncMessageCount){ queue->process(msg1); sleep(2); uint32_t compval=0; - BOOST_CHECK_EQUAL(compval, queue->getMessageCount()); + BOOST_CHECK_EQUAL(compval, queue->getEnqueueCompleteMessageCount()); msg1->enqueueComplete(); compval=1; + BOOST_CHECK_EQUAL(compval, queue->getEnqueueCompleteMessageCount()); BOOST_CHECK_EQUAL(compval, queue->getMessageCount()); } |