diff options
author | Gordon Sim <gsim@apache.org> | 2008-07-25 13:50:32 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2008-07-25 13:50:32 +0000 |
commit | 1ce91a21e53358cfdb88203a7d00f9e99c4f5487 (patch) | |
tree | d8c3ee61c703c79e54185f22f07bc63f88302fb2 /cpp/src | |
parent | c063c7b4a350ce64abb4075b28b0de16fd5ec71c (diff) | |
download | qpid-python-1ce91a21e53358cfdb88203a7d00f9e99c4f5487.tar.gz |
Only reduce count and size maintained for queue plicy when messages are actually dequeued (i.e. acked).
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@679805 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/qpid/broker/Queue.cpp | 68 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Queue.h | 9 | ||||
-rw-r--r-- | cpp/src/qpid/broker/QueuePolicy.cpp | 15 | ||||
-rw-r--r-- | cpp/src/qpid/broker/QueuePolicy.h | 2 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SemanticState.cpp | 2 | ||||
-rw-r--r-- | cpp/src/tests/QueueTest.cpp | 8 | ||||
-rw-r--r-- | cpp/src/tests/TxPublishTest.cpp | 4 |
7 files changed, 68 insertions, 40 deletions
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index bf64760fc7..d718acff03 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -230,7 +230,7 @@ bool Queue::consumeNextMessage(QueuedMessage& m, Consumer& c) if (c.filter(msg.payload)) { if (c.accept(msg.payload)) { m = msg; - pop(); + messages.pop_front(); return true; } else { //message(s) are available but consumer hasn't got enough credit @@ -361,13 +361,13 @@ void Queue::cancel(Consumer& c){ mgmtObject->dec_consumerCount (); } -QueuedMessage Queue::dequeue(){ +QueuedMessage Queue::get(){ Mutex::ScopedLock locker(messageLock); QueuedMessage msg(this); if(!messages.empty()){ msg = messages.front(); - pop(); + messages.pop_front(); } return msg; } @@ -376,35 +376,11 @@ uint32_t Queue::purge(){ Mutex::ScopedLock locker(messageLock); int count = messages.size(); while(!messages.empty()) { - QueuedMessage& msg = messages.front(); - if (store && msg.payload->isPersistent()) { - boost::intrusive_ptr<PersistableMessage> pmsg = - boost::static_pointer_cast<PersistableMessage>(msg.payload); - store->dequeue(0, pmsg, *this); - } - pop(); + popAndDequeue(); } return count; } -/** - * Assumes messageLock is held - */ -void Queue::pop(){ - QueuedMessage& msg = messages.front(); - - if (policy.get()) policy->dequeued(msg.payload->contentSize()); - if (mgmtObject != 0){ - mgmtObject->inc_msgTotalDequeues (); - mgmtObject->inc_byteTotalDequeues (msg.payload->contentSize()); - if (msg.payload->isPersistent ()){ - mgmtObject->inc_msgPersistDequeues (); - mgmtObject->inc_bytePersistDequeues (msg.payload->contentSize()); - } - } - messages.pop_front(); -} - void Queue::push(boost::intrusive_ptr<Message>& msg){ Mutex::ScopedLock locker(messageLock); messages.push_back(QueuedMessage(this, msg, ++sequence)); @@ -421,7 +397,7 @@ void Queue::push(boost::intrusive_ptr<Message>& msg){ } else { QPID_LOG(error, "Message " << msg << " on " << name << " exceeds the policy for the queue but can't be released from memory as the queue is not durable"); - throw ResourceLimitExceededException(QPID_MSG("Policy exceeded for " << name)); + throw ResourceLimitExceededException(QPID_MSG("Policy exceeded for " << name << " " << *policy)); } } else { if (policyExceeded) { @@ -475,6 +451,10 @@ bool Queue::enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message> msg) // return true if store exists, bool Queue::dequeue(TransactionContext* ctxt, boost::intrusive_ptr<Message> msg) { + { + Mutex::ScopedLock locker(messageLock); + dequeued(msg); + } if (msg->isPersistent() && store) { msg->dequeueAsync(shared_from_this(), store); //increment to async counter -- for message sent to more than one queue boost::intrusive_ptr<PersistableMessage> pmsg = boost::static_pointer_cast<PersistableMessage>(msg); @@ -485,6 +465,34 @@ bool Queue::dequeue(TransactionContext* ctxt, boost::intrusive_ptr<Message> msg) return false; } +/** + * Removes a message from the in-memory delivery queue as well + * dequeing it from the logical (and persistent if applicable) queue + */ +void Queue::popAndDequeue() +{ + boost::intrusive_ptr<Message> msg = messages.front().payload; + messages.pop_front(); + dequeue(0, msg); +} + +/** + * Updates policy and management when a message has been dequeued, + * expects messageLock to be held + */ +void Queue::dequeued(boost::intrusive_ptr<Message>& msg) +{ + if (policy.get()) policy->dequeued(msg->contentSize()); + if (mgmtObject != 0){ + mgmtObject->inc_msgTotalDequeues (); + mgmtObject->inc_byteTotalDequeues (msg->contentSize()); + if (msg->isPersistent ()){ + mgmtObject->inc_msgPersistDequeues (); + mgmtObject->inc_bytePersistDequeues (msg->contentSize()); + } + } +} + namespace { @@ -534,7 +542,7 @@ void Queue::destroy() DeliverableMessage msg(messages.front().payload); alternateExchange->route(msg, msg.getMessage().getRoutingKey(), msg.getMessage().getApplicationHeaders()); - pop(); + popAndDequeue(); } alternateExchange->decAlternateUsers(); } diff --git a/cpp/src/qpid/broker/Queue.h b/cpp/src/qpid/broker/Queue.h index 5b2311ce2c..f1694eb5a4 100644 --- a/cpp/src/qpid/broker/Queue.h +++ b/cpp/src/qpid/broker/Queue.h @@ -98,7 +98,6 @@ namespace qpid { framing::SequenceNumber sequence; management::Queue* mgmtObject; - void pop(); void push(boost::intrusive_ptr<Message>& msg); void setPolicy(std::auto_ptr<QueuePolicy> policy); bool seek(QueuedMessage& msg, Consumer& position); @@ -112,6 +111,9 @@ namespace qpid { bool isExcluded(boost::intrusive_ptr<Message>& msg); + void dequeued(boost::intrusive_ptr<Message>& msg); + void popAndDequeue(); + public: virtual void notifyDurableIOComplete(); typedef boost::shared_ptr<Queue> shared_ptr; @@ -178,10 +180,11 @@ namespace qpid { * dequeue from store (only done once messages is acknowledged) */ bool dequeue(TransactionContext* ctxt, boost::intrusive_ptr<Message> msg); + /** - * dequeues from memory only + * Gets the next available message */ - QueuedMessage dequeue(); + QueuedMessage get(); const QueuePolicy* getPolicy(); diff --git a/cpp/src/qpid/broker/QueuePolicy.cpp b/cpp/src/qpid/broker/QueuePolicy.cpp index de84362f8f..08838aac79 100644 --- a/cpp/src/qpid/broker/QueuePolicy.cpp +++ b/cpp/src/qpid/broker/QueuePolicy.cpp @@ -71,3 +71,18 @@ const std::string QueuePolicy::maxCountKey("qpid.max_count"); const std::string QueuePolicy::maxSizeKey("qpid.max_size"); uint64_t QueuePolicy::defaultMaxSize(0); +namespace qpid { + namespace broker { + +std::ostream& operator<<(std::ostream& out, const QueuePolicy& p) +{ + if (p.maxSize) out << "size: max=" << p.maxSize << ", current=" << p.size; + else out << "size unlimited, current=" << p.size; + out << "; "; + if (p.maxCount) out << "count: max=" << p.maxCount << ", current=" << p.count; + else out << "count unlimited, current=" << p.count; + return out; +} + + } +} diff --git a/cpp/src/qpid/broker/QueuePolicy.h b/cpp/src/qpid/broker/QueuePolicy.h index 2135e327a7..4511a63b64 100644 --- a/cpp/src/qpid/broker/QueuePolicy.h +++ b/cpp/src/qpid/broker/QueuePolicy.h @@ -21,6 +21,7 @@ #ifndef _QueuePolicy_ #define _QueuePolicy_ +#include <iostream> #include "qpid/framing/FieldTable.h" namespace qpid { @@ -50,6 +51,7 @@ namespace qpid { uint64_t getMaxSize() const { return maxSize; } static void setDefaultMaxSize(uint64_t); + friend std::ostream& operator<<(std::ostream&, const QueuePolicy&); }; } } diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp index 085578295d..1cbde08630 100644 --- a/cpp/src/qpid/broker/SemanticState.cpp +++ b/cpp/src/qpid/broker/SemanticState.cpp @@ -429,7 +429,7 @@ void SemanticState::recover(bool requeue) bool SemanticState::get(DeliveryToken::shared_ptr token, Queue::shared_ptr queue, bool ackExpected) { - QueuedMessage msg = queue->dequeue(); + QueuedMessage msg = queue->get(); if(msg.payload){ DeliveryId myDeliveryTag = deliveryAdapter.deliver(msg, token); if(ackExpected){ diff --git a/cpp/src/tests/QueueTest.cpp b/cpp/src/tests/QueueTest.cpp index 39696707f4..20b3d90eb6 100644 --- a/cpp/src/tests/QueueTest.cpp +++ b/cpp/src/tests/QueueTest.cpp @@ -91,7 +91,7 @@ QPID_AUTO_TEST_CASE(testAsyncMessage) { BOOST_CHECK(!c1.received); msg1->enqueueComplete(); - received = queue->dequeue().payload; + received = queue->get().payload; BOOST_CHECK_EQUAL(msg1.get(), received.get()); } @@ -179,11 +179,11 @@ QPID_AUTO_TEST_CASE(testDequeue){ BOOST_CHECK_EQUAL(uint32_t(3), queue->getMessageCount()); - received = queue->dequeue().payload; + received = queue->get().payload; BOOST_CHECK_EQUAL(msg1.get(), received.get()); BOOST_CHECK_EQUAL(uint32_t(2), queue->getMessageCount()); - received = queue->dequeue().payload; + received = queue->get().payload; BOOST_CHECK_EQUAL(msg2.get(), received.get()); BOOST_CHECK_EQUAL(uint32_t(1), queue->getMessageCount()); @@ -196,7 +196,7 @@ QPID_AUTO_TEST_CASE(testDequeue){ BOOST_CHECK_EQUAL(msg3.get(), consumer.last.get()); BOOST_CHECK_EQUAL(uint32_t(0), queue->getMessageCount()); - received = queue->dequeue().payload; + received = queue->get().payload; BOOST_CHECK(!received); BOOST_CHECK_EQUAL(uint32_t(0), queue->getMessageCount()); diff --git a/cpp/src/tests/TxPublishTest.cpp b/cpp/src/tests/TxPublishTest.cpp index 5c4686c905..9e9715c987 100644 --- a/cpp/src/tests/TxPublishTest.cpp +++ b/cpp/src/tests/TxPublishTest.cpp @@ -82,13 +82,13 @@ QPID_AUTO_TEST_CASE(testCommit) t.op.prepare(0); t.op.commit(); BOOST_CHECK_EQUAL((uint32_t) 1, t.queue1->getMessageCount()); - intrusive_ptr<Message> msg_dequeue = t.queue1->dequeue().payload; + intrusive_ptr<Message> msg_dequeue = t.queue1->get().payload; BOOST_CHECK_EQUAL( true, (static_pointer_cast<PersistableMessage>(msg_dequeue))->isEnqueueComplete()); BOOST_CHECK_EQUAL(t.msg, msg_dequeue); BOOST_CHECK_EQUAL((uint32_t) 1, t.queue2->getMessageCount()); - BOOST_CHECK_EQUAL(t.msg, t.queue2->dequeue().payload); + BOOST_CHECK_EQUAL(t.msg, t.queue2->get().payload); } QPID_AUTO_TEST_SUITE_END() |