summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/BrokerQueue.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/BrokerQueue.cpp')
-rw-r--r--cpp/src/qpid/broker/BrokerQueue.cpp38
1 files changed, 17 insertions, 21 deletions
diff --git a/cpp/src/qpid/broker/BrokerQueue.cpp b/cpp/src/qpid/broker/BrokerQueue.cpp
index 7311d043d0..553f6016d2 100644
--- a/cpp/src/qpid/broker/BrokerQueue.cpp
+++ b/cpp/src/qpid/broker/BrokerQueue.cpp
@@ -102,10 +102,10 @@ void Queue::process(Message::shared_ptr& msg){
}
-void Queue::requeue(Message::shared_ptr& msg){
+void Queue::requeue(const QueuedMessage& msg){
{
Mutex::ScopedLock locker(messageLock);
- msg->enqueueComplete(); // mark the message as enqueued
+ msg.payload->enqueueComplete(); // mark the message as enqueued
messages.push_front(msg);
}
serializer.execute(dispatchCallback);
@@ -118,7 +118,7 @@ void Queue::requestDispatch(){
}
-bool Queue::dispatch(Message::shared_ptr& msg){
+bool Queue::dispatch(QueuedMessage& msg){
RWlock::ScopedWlock locker(consumerLock); /// lock scope to wide....
@@ -144,21 +144,19 @@ bool Queue::dispatch(Message::shared_ptr& msg){
void Queue::dispatch(){
-
-
- Message::shared_ptr msg;
+ QueuedMessage msg;
while(true){
{
Mutex::ScopedLock locker(messageLock);
if (messages.empty()) break;
msg = messages.front();
}
- if( msg->isEnqueueComplete() && dispatch(msg) ){
+ if( msg.payload->isEnqueueComplete() && dispatch(msg) ) {
pop();
- }else break;
-
- }
-
+ } else {
+ break;
+ }
+ }
}
void Queue::consume(Consumer* c, bool requestExclusive){
@@ -185,18 +183,16 @@ void Queue::cancel(Consumer* c){
if(exclusive == c) exclusive = 0;
}
-Message::shared_ptr Queue::dequeue(){
+QueuedMessage Queue::dequeue(){
Mutex::ScopedLock locker(messageLock);
- Message::shared_ptr msg;
+ QueuedMessage msg;
if(!messages.empty()){
msg = messages.front();
- if (msg->isEnqueueComplete()){
+ if (msg.payload->isEnqueueComplete()){
pop();
- return msg;
}
}
- Message::shared_ptr msg_empty;
- return msg_empty;
+ return msg;
}
uint32_t Queue::purge(){
@@ -208,13 +204,13 @@ uint32_t Queue::purge(){
void Queue::pop(){
Mutex::ScopedLock locker(messageLock);
- if (policy.get()) policy->dequeued(messages.front()->contentSize());
+ if (policy.get()) policy->dequeued(messages.front().payload->contentSize());
messages.pop_front();
}
void Queue::push(Message::shared_ptr& msg){
Mutex::ScopedLock locker(messageLock);
- messages.push_back(msg);
+ messages.push_back(QueuedMessage(msg, ++sequence));
if (policy.get()) {
policy->enqueued(msg->contentSize());
if (policy->limitExceeded()) {
@@ -229,7 +225,7 @@ uint32_t Queue::getMessageCount() const{
uint32_t count =0;
for ( Messages::const_iterator i = messages.begin(); i != messages.end(); ++i ) {
- if ( (*i)->isEnqueueComplete() ) count ++;
+ if ( i->payload->isEnqueueComplete() ) count ++;
}
return count;
@@ -296,7 +292,7 @@ void Queue::destroy()
if (alternateExchange.get()) {
Mutex::ScopedLock locker(messageLock);
while(!messages.empty()){
- DeliverableMessage msg(messages.front());
+ DeliverableMessage msg(messages.front().payload);
alternateExchange->route(msg, msg.getMessage().getRoutingKey(),
&(msg.getMessage().getApplicationHeaders()));
pop();