diff options
Diffstat (limited to 'cpp/src/qpid/broker/BrokerQueue.cpp')
-rw-r--r-- | cpp/src/qpid/broker/BrokerQueue.cpp | 38 |
1 files changed, 17 insertions, 21 deletions
diff --git a/cpp/src/qpid/broker/BrokerQueue.cpp b/cpp/src/qpid/broker/BrokerQueue.cpp index 7311d043d0..553f6016d2 100644 --- a/cpp/src/qpid/broker/BrokerQueue.cpp +++ b/cpp/src/qpid/broker/BrokerQueue.cpp @@ -102,10 +102,10 @@ void Queue::process(Message::shared_ptr& msg){ } -void Queue::requeue(Message::shared_ptr& msg){ +void Queue::requeue(const QueuedMessage& msg){ { Mutex::ScopedLock locker(messageLock); - msg->enqueueComplete(); // mark the message as enqueued + msg.payload->enqueueComplete(); // mark the message as enqueued messages.push_front(msg); } serializer.execute(dispatchCallback); @@ -118,7 +118,7 @@ void Queue::requestDispatch(){ } -bool Queue::dispatch(Message::shared_ptr& msg){ +bool Queue::dispatch(QueuedMessage& msg){ RWlock::ScopedWlock locker(consumerLock); /// lock scope to wide.... @@ -144,21 +144,19 @@ bool Queue::dispatch(Message::shared_ptr& msg){ void Queue::dispatch(){ - - - Message::shared_ptr msg; + QueuedMessage msg; while(true){ { Mutex::ScopedLock locker(messageLock); if (messages.empty()) break; msg = messages.front(); } - if( msg->isEnqueueComplete() && dispatch(msg) ){ + if( msg.payload->isEnqueueComplete() && dispatch(msg) ) { pop(); - }else break; - - } - + } else { + break; + } + } } void Queue::consume(Consumer* c, bool requestExclusive){ @@ -185,18 +183,16 @@ void Queue::cancel(Consumer* c){ if(exclusive == c) exclusive = 0; } -Message::shared_ptr Queue::dequeue(){ +QueuedMessage Queue::dequeue(){ Mutex::ScopedLock locker(messageLock); - Message::shared_ptr msg; + QueuedMessage msg; if(!messages.empty()){ msg = messages.front(); - if (msg->isEnqueueComplete()){ + if (msg.payload->isEnqueueComplete()){ pop(); - return msg; } } - Message::shared_ptr msg_empty; - return msg_empty; + return msg; } uint32_t Queue::purge(){ @@ -208,13 +204,13 @@ uint32_t Queue::purge(){ void Queue::pop(){ Mutex::ScopedLock locker(messageLock); - if (policy.get()) policy->dequeued(messages.front()->contentSize()); + if (policy.get()) policy->dequeued(messages.front().payload->contentSize()); messages.pop_front(); } void Queue::push(Message::shared_ptr& msg){ Mutex::ScopedLock locker(messageLock); - messages.push_back(msg); + messages.push_back(QueuedMessage(msg, ++sequence)); if (policy.get()) { policy->enqueued(msg->contentSize()); if (policy->limitExceeded()) { @@ -229,7 +225,7 @@ uint32_t Queue::getMessageCount() const{ uint32_t count =0; for ( Messages::const_iterator i = messages.begin(); i != messages.end(); ++i ) { - if ( (*i)->isEnqueueComplete() ) count ++; + if ( i->payload->isEnqueueComplete() ) count ++; } return count; @@ -296,7 +292,7 @@ void Queue::destroy() if (alternateExchange.get()) { Mutex::ScopedLock locker(messageLock); while(!messages.empty()){ - DeliverableMessage msg(messages.front()); + DeliverableMessage msg(messages.front().payload); alternateExchange->route(msg, msg.getMessage().getRoutingKey(), &(msg.getMessage().getApplicationHeaders())); pop(); |