diff options
Diffstat (limited to 'cpp/src/qpid/client/Channel.cpp')
| -rw-r--r-- | cpp/src/qpid/client/Channel.cpp | 51 |
1 files changed, 16 insertions, 35 deletions
diff --git a/cpp/src/qpid/client/Channel.cpp b/cpp/src/qpid/client/Channel.cpp index a7b30f2f94..fad648f27d 100644 --- a/cpp/src/qpid/client/Channel.cpp +++ b/cpp/src/qpid/client/Channel.cpp @@ -17,7 +17,6 @@ */ #include "qpid/client/Channel.h" #include "qpid/sys/Monitor.h" -#include "qpid/sys/ThreadFactory.h" #include "qpid/client/Message.h" #include "qpid/QpidError.h" @@ -29,26 +28,15 @@ using namespace qpid::sys; Channel::Channel(bool _transactional, u_int16_t _prefetch) : id(0), con(0), - dispatcher(0), out(0), incoming(0), closed(true), prefetch(_prefetch), transactional(_transactional) -{ - threadFactory = new ThreadFactory(); - dispatchMonitor = new Monitor(); - retrievalMonitor = new Monitor(); -} +{ } Channel::~Channel(){ - if(dispatcher){ - stop(); - delete dispatcher; - } - delete retrievalMonitor; - delete dispatchMonitor; - delete threadFactory; + stop(); } void Channel::setPrefetch(u_int16_t _prefetch){ @@ -176,9 +164,9 @@ void Channel::cancelAll(){ } void Channel::retrieve(Message& msg){ - retrievalMonitor->acquire(); + Monitor::ScopedLock l(retrievalMonitor); while(retrieved == 0){ - retrievalMonitor->wait(); + retrievalMonitor.wait(); } msg.header = retrieved->getHeader(); @@ -186,8 +174,6 @@ void Channel::retrieve(Message& msg){ retrieved->getData(msg.data); delete retrieved; retrieved = 0; - - retrievalMonitor->release(); } bool Channel::get(Message& msg, const Queue& queue, int ackMode){ @@ -315,18 +301,16 @@ void Channel::handleHeartbeat(AMQHeartbeatBody::shared_ptr /*body*/){ } void Channel::start(){ - dispatcher = threadFactory->create(this); - dispatcher->start(); + dispatcher = Thread(this); } void Channel::stop(){ - closed = true; - dispatchMonitor->acquire(); - dispatchMonitor->notify(); - dispatchMonitor->release(); - if(dispatcher){ - dispatcher->join(); + { + Monitor::ScopedLock l(dispatchMonitor); + closed = true; + dispatchMonitor.notify(); } + dispatcher.join(); } void Channel::run(){ @@ -335,30 +319,27 @@ void Channel::run(){ void Channel::enqueue(){ if(incoming->isResponse()){ - retrievalMonitor->acquire(); + Monitor::ScopedLock l(retrievalMonitor); retrieved = incoming; - retrievalMonitor->notify(); - retrievalMonitor->release(); + retrievalMonitor.notify(); }else{ - dispatchMonitor->acquire(); + Monitor::ScopedLock l(dispatchMonitor); messages.push(incoming); - dispatchMonitor->notify(); - dispatchMonitor->release(); + dispatchMonitor.notify(); } incoming = 0; } IncomingMessage* Channel::dequeue(){ - dispatchMonitor->acquire(); + Monitor::ScopedLock l(dispatchMonitor); while(messages.empty() && !closed){ - dispatchMonitor->wait(); + dispatchMonitor.wait(); } IncomingMessage* msg = 0; if(!messages.empty()){ msg = messages.front(); messages.pop(); } - dispatchMonitor->release(); return msg; } |
