summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/Queue.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/Queue.cpp')
-rw-r--r--cpp/src/qpid/broker/Queue.cpp68
1 files changed, 31 insertions, 37 deletions
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp
index befc5c4eff..8bbccda844 100644
--- a/cpp/src/qpid/broker/Queue.cpp
+++ b/cpp/src/qpid/broker/Queue.cpp
@@ -186,6 +186,8 @@ void Queue::process(boost::intrusive_ptr<Message>& msg){
}
void Queue::requeue(const QueuedMessage& msg){
+ if (policy.get() && !policy->isEnqueued(msg)) return;
+
Listeners copy;
{
Mutex::ScopedLock locker(messageLock);
@@ -415,29 +417,10 @@ void Queue::push(boost::intrusive_ptr<Message>& msg){
Listeners copy;
{
Mutex::ScopedLock locker(messageLock);
- messages.push_back(QueuedMessage(this, msg, ++sequence));
- if (policy.get()) {
- policy->enqueued(msg->contentSize());
- if (policy->limitExceeded()) {
- if (!policyExceeded) {
- policyExceeded = true;
- QPID_LOG(info, "Queue size exceeded policy for " << name);
- }
- if (store) {
- QPID_LOG(debug, "Message " << msg << " on " << name << " released from memory");
- msg->releaseContent(store);
- } else {
- QPID_LOG(error, "Message " << msg << " on " << name
- << " exceeds the policy for the queue but can't be released from memory as the queue is not durable");
- throw ResourceLimitExceededException(QPID_MSG("Policy exceeded for " << name << " " << *policy));
- }
- } else {
- if (policyExceeded) {
- policyExceeded = false;
- QPID_LOG(info, "Queue size within policy for " << name);
- }
- }
- }
+ QueuedMessage qm(this, msg, ++sequence);
+ if (policy.get()) policy->tryEnqueue(qm);
+
+ messages.push_back(qm);
listeners.swap(copy);
}
for_each(copy.begin(), copy.end(), boost::mem_fn(&Consumer::notify));
@@ -486,15 +469,16 @@ bool Queue::enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message> msg)
}
// return true if store exists,
-bool Queue::dequeue(TransactionContext* ctxt, boost::intrusive_ptr<Message> msg)
+bool Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg)
{
+ if (policy.get() && !policy->isEnqueued(msg)) return false;
{
Mutex::ScopedLock locker(messageLock);
dequeued(msg);
}
- if (msg->isPersistent() && store) {
- msg->dequeueAsync(shared_from_this(), store); //increment to async counter -- for message sent to more than one queue
- boost::intrusive_ptr<PersistableMessage> pmsg = boost::static_pointer_cast<PersistableMessage>(msg);
+ if (msg.payload->isPersistent() && store) {
+ msg.payload->dequeueAsync(shared_from_this(), store); //increment to async counter -- for message sent to more than one queue
+ boost::intrusive_ptr<PersistableMessage> pmsg = boost::static_pointer_cast<PersistableMessage>(msg.payload);
store->dequeue(ctxt, pmsg, *this);
return true;
}
@@ -508,7 +492,7 @@ bool Queue::dequeue(TransactionContext* ctxt, boost::intrusive_ptr<Message> msg)
*/
void Queue::popAndDequeue()
{
- boost::intrusive_ptr<Message> msg = messages.front().payload;
+ QueuedMessage msg = messages.front();
messages.pop_front();
dequeue(0, msg);
}
@@ -517,15 +501,15 @@ void Queue::popAndDequeue()
* Updates policy and management when a message has been dequeued,
* expects messageLock to be held
*/
-void Queue::dequeued(boost::intrusive_ptr<Message>& msg)
+void Queue::dequeued(const QueuedMessage& msg)
{
- if (policy.get()) policy->dequeued(msg->contentSize());
+ if (policy.get()) policy->dequeued(msg);
if (mgmtObject != 0){
mgmtObject->inc_msgTotalDequeues ();
- mgmtObject->inc_byteTotalDequeues (msg->contentSize());
- if (msg->isPersistent ()){
+ mgmtObject->inc_byteTotalDequeues (msg.payload->contentSize());
+ if (msg.payload->isPersistent ()){
mgmtObject->inc_msgPersistDequeues ();
- mgmtObject->inc_bytePersistDequeues (msg->contentSize());
+ mgmtObject->inc_bytePersistDequeues (msg.payload->contentSize());
}
}
}
@@ -551,10 +535,7 @@ void Queue::create(const FieldTable& _settings)
void Queue::configure(const FieldTable& _settings)
{
- std::auto_ptr<QueuePolicy> _policy(new QueuePolicy(_settings));
- if (_policy->getMaxCount() || _policy->getMaxSize()) {
- setPolicy(_policy);
- }
+ setPolicy(QueuePolicy::createQueuePolicy(_settings));
//set this regardless of owner to allow use of no-local with exclusive consumers also
noLocal = _settings.get(qpidNoLocal);
QPID_LOG(debug, "Configured queue with no-local=" << noLocal);
@@ -720,6 +701,19 @@ void Queue::setExternalQueueStore(ExternalQueueStore* inst) {
}
}
+bool Queue::releaseMessageContent(const QueuedMessage& m)
+{
+ if (store) {
+ QPID_LOG(debug, "Message " << m.position << " on " << name << " released from memory");
+ m.payload->releaseContent(store);
+ return true;
+ } else {
+ QPID_LOG(warning, "Message " << m.position << " on " << name
+ << " cannot be released from memory as the queue is not durable");
+ return false;
+ }
+}
+
ManagementObject* Queue::GetManagementObject (void) const
{
return (ManagementObject*) mgmtObject;