summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2008-07-25 13:50:32 +0000
committerGordon Sim <gsim@apache.org>2008-07-25 13:50:32 +0000
commit1ce91a21e53358cfdb88203a7d00f9e99c4f5487 (patch)
treed8c3ee61c703c79e54185f22f07bc63f88302fb2 /cpp/src
parentc063c7b4a350ce64abb4075b28b0de16fd5ec71c (diff)
downloadqpid-python-1ce91a21e53358cfdb88203a7d00f9e99c4f5487.tar.gz
Only reduce count and size maintained for queue plicy when messages are actually dequeued (i.e. acked).
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@679805 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/qpid/broker/Queue.cpp68
-rw-r--r--cpp/src/qpid/broker/Queue.h9
-rw-r--r--cpp/src/qpid/broker/QueuePolicy.cpp15
-rw-r--r--cpp/src/qpid/broker/QueuePolicy.h2
-rw-r--r--cpp/src/qpid/broker/SemanticState.cpp2
-rw-r--r--cpp/src/tests/QueueTest.cpp8
-rw-r--r--cpp/src/tests/TxPublishTest.cpp4
7 files changed, 68 insertions, 40 deletions
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp
index bf64760fc7..d718acff03 100644
--- a/cpp/src/qpid/broker/Queue.cpp
+++ b/cpp/src/qpid/broker/Queue.cpp
@@ -230,7 +230,7 @@ bool Queue::consumeNextMessage(QueuedMessage& m, Consumer& c)
if (c.filter(msg.payload)) {
if (c.accept(msg.payload)) {
m = msg;
- pop();
+ messages.pop_front();
return true;
} else {
//message(s) are available but consumer hasn't got enough credit
@@ -361,13 +361,13 @@ void Queue::cancel(Consumer& c){
mgmtObject->dec_consumerCount ();
}
-QueuedMessage Queue::dequeue(){
+QueuedMessage Queue::get(){
Mutex::ScopedLock locker(messageLock);
QueuedMessage msg(this);
if(!messages.empty()){
msg = messages.front();
- pop();
+ messages.pop_front();
}
return msg;
}
@@ -376,35 +376,11 @@ uint32_t Queue::purge(){
Mutex::ScopedLock locker(messageLock);
int count = messages.size();
while(!messages.empty()) {
- QueuedMessage& msg = messages.front();
- if (store && msg.payload->isPersistent()) {
- boost::intrusive_ptr<PersistableMessage> pmsg =
- boost::static_pointer_cast<PersistableMessage>(msg.payload);
- store->dequeue(0, pmsg, *this);
- }
- pop();
+ popAndDequeue();
}
return count;
}
-/**
- * Assumes messageLock is held
- */
-void Queue::pop(){
- QueuedMessage& msg = messages.front();
-
- if (policy.get()) policy->dequeued(msg.payload->contentSize());
- if (mgmtObject != 0){
- mgmtObject->inc_msgTotalDequeues ();
- mgmtObject->inc_byteTotalDequeues (msg.payload->contentSize());
- if (msg.payload->isPersistent ()){
- mgmtObject->inc_msgPersistDequeues ();
- mgmtObject->inc_bytePersistDequeues (msg.payload->contentSize());
- }
- }
- messages.pop_front();
-}
-
void Queue::push(boost::intrusive_ptr<Message>& msg){
Mutex::ScopedLock locker(messageLock);
messages.push_back(QueuedMessage(this, msg, ++sequence));
@@ -421,7 +397,7 @@ void Queue::push(boost::intrusive_ptr<Message>& msg){
} else {
QPID_LOG(error, "Message " << msg << " on " << name
<< " exceeds the policy for the queue but can't be released from memory as the queue is not durable");
- throw ResourceLimitExceededException(QPID_MSG("Policy exceeded for " << name));
+ throw ResourceLimitExceededException(QPID_MSG("Policy exceeded for " << name << " " << *policy));
}
} else {
if (policyExceeded) {
@@ -475,6 +451,10 @@ bool Queue::enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message> msg)
// return true if store exists,
bool Queue::dequeue(TransactionContext* ctxt, boost::intrusive_ptr<Message> msg)
{
+ {
+ Mutex::ScopedLock locker(messageLock);
+ dequeued(msg);
+ }
if (msg->isPersistent() && store) {
msg->dequeueAsync(shared_from_this(), store); //increment to async counter -- for message sent to more than one queue
boost::intrusive_ptr<PersistableMessage> pmsg = boost::static_pointer_cast<PersistableMessage>(msg);
@@ -485,6 +465,34 @@ bool Queue::dequeue(TransactionContext* ctxt, boost::intrusive_ptr<Message> msg)
return false;
}
+/**
+ * Removes a message from the in-memory delivery queue as well
+ * dequeing it from the logical (and persistent if applicable) queue
+ */
+void Queue::popAndDequeue()
+{
+ boost::intrusive_ptr<Message> msg = messages.front().payload;
+ messages.pop_front();
+ dequeue(0, msg);
+}
+
+/**
+ * Updates policy and management when a message has been dequeued,
+ * expects messageLock to be held
+ */
+void Queue::dequeued(boost::intrusive_ptr<Message>& msg)
+{
+ if (policy.get()) policy->dequeued(msg->contentSize());
+ if (mgmtObject != 0){
+ mgmtObject->inc_msgTotalDequeues ();
+ mgmtObject->inc_byteTotalDequeues (msg->contentSize());
+ if (msg->isPersistent ()){
+ mgmtObject->inc_msgPersistDequeues ();
+ mgmtObject->inc_bytePersistDequeues (msg->contentSize());
+ }
+ }
+}
+
namespace
{
@@ -534,7 +542,7 @@ void Queue::destroy()
DeliverableMessage msg(messages.front().payload);
alternateExchange->route(msg, msg.getMessage().getRoutingKey(),
msg.getMessage().getApplicationHeaders());
- pop();
+ popAndDequeue();
}
alternateExchange->decAlternateUsers();
}
diff --git a/cpp/src/qpid/broker/Queue.h b/cpp/src/qpid/broker/Queue.h
index 5b2311ce2c..f1694eb5a4 100644
--- a/cpp/src/qpid/broker/Queue.h
+++ b/cpp/src/qpid/broker/Queue.h
@@ -98,7 +98,6 @@ namespace qpid {
framing::SequenceNumber sequence;
management::Queue* mgmtObject;
- void pop();
void push(boost::intrusive_ptr<Message>& msg);
void setPolicy(std::auto_ptr<QueuePolicy> policy);
bool seek(QueuedMessage& msg, Consumer& position);
@@ -112,6 +111,9 @@ namespace qpid {
bool isExcluded(boost::intrusive_ptr<Message>& msg);
+ void dequeued(boost::intrusive_ptr<Message>& msg);
+ void popAndDequeue();
+
public:
virtual void notifyDurableIOComplete();
typedef boost::shared_ptr<Queue> shared_ptr;
@@ -178,10 +180,11 @@ namespace qpid {
* dequeue from store (only done once messages is acknowledged)
*/
bool dequeue(TransactionContext* ctxt, boost::intrusive_ptr<Message> msg);
+
/**
- * dequeues from memory only
+ * Gets the next available message
*/
- QueuedMessage dequeue();
+ QueuedMessage get();
const QueuePolicy* getPolicy();
diff --git a/cpp/src/qpid/broker/QueuePolicy.cpp b/cpp/src/qpid/broker/QueuePolicy.cpp
index de84362f8f..08838aac79 100644
--- a/cpp/src/qpid/broker/QueuePolicy.cpp
+++ b/cpp/src/qpid/broker/QueuePolicy.cpp
@@ -71,3 +71,18 @@ const std::string QueuePolicy::maxCountKey("qpid.max_count");
const std::string QueuePolicy::maxSizeKey("qpid.max_size");
uint64_t QueuePolicy::defaultMaxSize(0);
+namespace qpid {
+ namespace broker {
+
+std::ostream& operator<<(std::ostream& out, const QueuePolicy& p)
+{
+ if (p.maxSize) out << "size: max=" << p.maxSize << ", current=" << p.size;
+ else out << "size unlimited, current=" << p.size;
+ out << "; ";
+ if (p.maxCount) out << "count: max=" << p.maxCount << ", current=" << p.count;
+ else out << "count unlimited, current=" << p.count;
+ return out;
+}
+
+ }
+}
diff --git a/cpp/src/qpid/broker/QueuePolicy.h b/cpp/src/qpid/broker/QueuePolicy.h
index 2135e327a7..4511a63b64 100644
--- a/cpp/src/qpid/broker/QueuePolicy.h
+++ b/cpp/src/qpid/broker/QueuePolicy.h
@@ -21,6 +21,7 @@
#ifndef _QueuePolicy_
#define _QueuePolicy_
+#include <iostream>
#include "qpid/framing/FieldTable.h"
namespace qpid {
@@ -50,6 +51,7 @@ namespace qpid {
uint64_t getMaxSize() const { return maxSize; }
static void setDefaultMaxSize(uint64_t);
+ friend std::ostream& operator<<(std::ostream&, const QueuePolicy&);
};
}
}
diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp
index 085578295d..1cbde08630 100644
--- a/cpp/src/qpid/broker/SemanticState.cpp
+++ b/cpp/src/qpid/broker/SemanticState.cpp
@@ -429,7 +429,7 @@ void SemanticState::recover(bool requeue)
bool SemanticState::get(DeliveryToken::shared_ptr token, Queue::shared_ptr queue, bool ackExpected)
{
- QueuedMessage msg = queue->dequeue();
+ QueuedMessage msg = queue->get();
if(msg.payload){
DeliveryId myDeliveryTag = deliveryAdapter.deliver(msg, token);
if(ackExpected){
diff --git a/cpp/src/tests/QueueTest.cpp b/cpp/src/tests/QueueTest.cpp
index 39696707f4..20b3d90eb6 100644
--- a/cpp/src/tests/QueueTest.cpp
+++ b/cpp/src/tests/QueueTest.cpp
@@ -91,7 +91,7 @@ QPID_AUTO_TEST_CASE(testAsyncMessage) {
BOOST_CHECK(!c1.received);
msg1->enqueueComplete();
- received = queue->dequeue().payload;
+ received = queue->get().payload;
BOOST_CHECK_EQUAL(msg1.get(), received.get());
}
@@ -179,11 +179,11 @@ QPID_AUTO_TEST_CASE(testDequeue){
BOOST_CHECK_EQUAL(uint32_t(3), queue->getMessageCount());
- received = queue->dequeue().payload;
+ received = queue->get().payload;
BOOST_CHECK_EQUAL(msg1.get(), received.get());
BOOST_CHECK_EQUAL(uint32_t(2), queue->getMessageCount());
- received = queue->dequeue().payload;
+ received = queue->get().payload;
BOOST_CHECK_EQUAL(msg2.get(), received.get());
BOOST_CHECK_EQUAL(uint32_t(1), queue->getMessageCount());
@@ -196,7 +196,7 @@ QPID_AUTO_TEST_CASE(testDequeue){
BOOST_CHECK_EQUAL(msg3.get(), consumer.last.get());
BOOST_CHECK_EQUAL(uint32_t(0), queue->getMessageCount());
- received = queue->dequeue().payload;
+ received = queue->get().payload;
BOOST_CHECK(!received);
BOOST_CHECK_EQUAL(uint32_t(0), queue->getMessageCount());
diff --git a/cpp/src/tests/TxPublishTest.cpp b/cpp/src/tests/TxPublishTest.cpp
index 5c4686c905..9e9715c987 100644
--- a/cpp/src/tests/TxPublishTest.cpp
+++ b/cpp/src/tests/TxPublishTest.cpp
@@ -82,13 +82,13 @@ QPID_AUTO_TEST_CASE(testCommit)
t.op.prepare(0);
t.op.commit();
BOOST_CHECK_EQUAL((uint32_t) 1, t.queue1->getMessageCount());
- intrusive_ptr<Message> msg_dequeue = t.queue1->dequeue().payload;
+ intrusive_ptr<Message> msg_dequeue = t.queue1->get().payload;
BOOST_CHECK_EQUAL( true, (static_pointer_cast<PersistableMessage>(msg_dequeue))->isEnqueueComplete());
BOOST_CHECK_EQUAL(t.msg, msg_dequeue);
BOOST_CHECK_EQUAL((uint32_t) 1, t.queue2->getMessageCount());
- BOOST_CHECK_EQUAL(t.msg, t.queue2->dequeue().payload);
+ BOOST_CHECK_EQUAL(t.msg, t.queue2->get().payload);
}
QPID_AUTO_TEST_SUITE_END()