summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/client/Channel.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/client/Channel.cpp')
-rw-r--r--cpp/src/qpid/client/Channel.cpp51
1 files changed, 16 insertions, 35 deletions
diff --git a/cpp/src/qpid/client/Channel.cpp b/cpp/src/qpid/client/Channel.cpp
index a7b30f2f94..fad648f27d 100644
--- a/cpp/src/qpid/client/Channel.cpp
+++ b/cpp/src/qpid/client/Channel.cpp
@@ -17,7 +17,6 @@
*/
#include "qpid/client/Channel.h"
#include "qpid/sys/Monitor.h"
-#include "qpid/sys/ThreadFactory.h"
#include "qpid/client/Message.h"
#include "qpid/QpidError.h"
@@ -29,26 +28,15 @@ using namespace qpid::sys;
Channel::Channel(bool _transactional, u_int16_t _prefetch) :
id(0),
con(0),
- dispatcher(0),
out(0),
incoming(0),
closed(true),
prefetch(_prefetch),
transactional(_transactional)
-{
- threadFactory = new ThreadFactory();
- dispatchMonitor = new Monitor();
- retrievalMonitor = new Monitor();
-}
+{ }
Channel::~Channel(){
- if(dispatcher){
- stop();
- delete dispatcher;
- }
- delete retrievalMonitor;
- delete dispatchMonitor;
- delete threadFactory;
+ stop();
}
void Channel::setPrefetch(u_int16_t _prefetch){
@@ -176,9 +164,9 @@ void Channel::cancelAll(){
}
void Channel::retrieve(Message& msg){
- retrievalMonitor->acquire();
+ Monitor::ScopedLock l(retrievalMonitor);
while(retrieved == 0){
- retrievalMonitor->wait();
+ retrievalMonitor.wait();
}
msg.header = retrieved->getHeader();
@@ -186,8 +174,6 @@ void Channel::retrieve(Message& msg){
retrieved->getData(msg.data);
delete retrieved;
retrieved = 0;
-
- retrievalMonitor->release();
}
bool Channel::get(Message& msg, const Queue& queue, int ackMode){
@@ -315,18 +301,16 @@ void Channel::handleHeartbeat(AMQHeartbeatBody::shared_ptr /*body*/){
}
void Channel::start(){
- dispatcher = threadFactory->create(this);
- dispatcher->start();
+ dispatcher = Thread(this);
}
void Channel::stop(){
- closed = true;
- dispatchMonitor->acquire();
- dispatchMonitor->notify();
- dispatchMonitor->release();
- if(dispatcher){
- dispatcher->join();
+ {
+ Monitor::ScopedLock l(dispatchMonitor);
+ closed = true;
+ dispatchMonitor.notify();
}
+ dispatcher.join();
}
void Channel::run(){
@@ -335,30 +319,27 @@ void Channel::run(){
void Channel::enqueue(){
if(incoming->isResponse()){
- retrievalMonitor->acquire();
+ Monitor::ScopedLock l(retrievalMonitor);
retrieved = incoming;
- retrievalMonitor->notify();
- retrievalMonitor->release();
+ retrievalMonitor.notify();
}else{
- dispatchMonitor->acquire();
+ Monitor::ScopedLock l(dispatchMonitor);
messages.push(incoming);
- dispatchMonitor->notify();
- dispatchMonitor->release();
+ dispatchMonitor.notify();
}
incoming = 0;
}
IncomingMessage* Channel::dequeue(){
- dispatchMonitor->acquire();
+ Monitor::ScopedLock l(dispatchMonitor);
while(messages.empty() && !closed){
- dispatchMonitor->wait();
+ dispatchMonitor.wait();
}
IncomingMessage* msg = 0;
if(!messages.empty()){
msg = messages.front();
messages.pop();
}
- dispatchMonitor->release();
return msg;
}