summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/Queue.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/Queue.cpp')
-rw-r--r--cpp/src/qpid/broker/Queue.cpp26
1 files changed, 13 insertions, 13 deletions
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp
index 000552715b..46b14a23f5 100644
--- a/cpp/src/qpid/broker/Queue.cpp
+++ b/cpp/src/qpid/broker/Queue.cpp
@@ -38,7 +38,7 @@ Queue::Queue(const string& _name, u_int32_t _autodelete,
exclusive(0),
persistenceId(0)
{
- if(autodelete) lastUsed = Time::now().msecs();
+ if(autodelete) lastUsed = getTimeMsecs();
}
Queue::~Queue(){
@@ -58,7 +58,7 @@ void Queue::deliver(Message::shared_ptr& msg){
}
void Queue::process(Message::shared_ptr& msg){
- Locker locker(lock);
+ Mutex::ScopedLock locker(lock);
if(queueing || !dispatch(msg)){
queueing = true;
messages.push(msg);
@@ -90,7 +90,7 @@ bool Queue::dispatch(Message::shared_ptr& msg){
}
bool Queue::startDispatching(){
- Locker locker(lock);
+ Mutex::ScopedLock locker(lock);
if(queueing && !dispatching){
dispatching = true;
return true;
@@ -102,7 +102,7 @@ bool Queue::startDispatching(){
void Queue::dispatch(){
bool proceed = startDispatching();
while(proceed){
- Locker locker(lock);
+ Mutex::ScopedLock locker(lock);
if(!messages.empty() && dispatch(messages.front())){
messages.pop();
}else{
@@ -114,7 +114,7 @@ void Queue::dispatch(){
}
void Queue::consume(Consumer* c, bool requestExclusive){
- Locker locker(lock);
+ Mutex::ScopedLock locker(lock);
if(exclusive) throw ExclusiveAccessException();
if(requestExclusive){
if(!consumers.empty()) throw ExclusiveAccessException();
@@ -126,14 +126,14 @@ void Queue::consume(Consumer* c, bool requestExclusive){
}
void Queue::cancel(Consumer* c){
- Locker locker(lock);
+ Mutex::ScopedLock locker(lock);
consumers.erase(find(consumers.begin(), consumers.end(), c));
- if(autodelete && consumers.empty()) lastUsed = Time::now().msecs();
+ if(autodelete && consumers.empty()) lastUsed = getTimeMsecs();
if(exclusive == c) exclusive = 0;
}
Message::shared_ptr Queue::dequeue(){
- Locker locker(lock);
+ Mutex::ScopedLock locker(lock);
Message::shared_ptr msg;
if(!messages.empty()){
msg = messages.front();
@@ -143,25 +143,25 @@ Message::shared_ptr Queue::dequeue(){
}
u_int32_t Queue::purge(){
- Locker locker(lock);
+ Mutex::ScopedLock locker(lock);
int count = messages.size();
while(!messages.empty()) messages.pop();
return count;
}
u_int32_t Queue::getMessageCount() const{
- Locker locker(lock);
+ Mutex::ScopedLock locker(lock);
return messages.size();
}
u_int32_t Queue::getConsumerCount() const{
- Locker locker(lock);
+ Mutex::ScopedLock locker(lock);
return consumers.size();
}
bool Queue::canAutoDelete() const{
- Locker locker(lock);
- return lastUsed && (Time::now().msecs() - lastUsed > autodelete);
+ Mutex::ScopedLock locker(lock);
+ return lastUsed && (getTimeMsecs() - lastUsed > autodelete);
}
void Queue::enqueue(TransactionContext* ctxt, Message::shared_ptr& msg, const string * const xid)