diff options
| author | Alan Conway <aconway@apache.org> | 2006-11-08 17:07:44 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2006-11-08 17:07:44 +0000 |
| commit | 5be658a8817b8092a7b53b116f622412a5d0aef6 (patch) | |
| tree | bbee363911f47bad6abe245629cacba0ebf3b9be /cpp/src/qpid/client | |
| parent | 85fef3d1e669b240deae82cdd353620667a85e08 (diff) | |
| download | qpid-python-5be658a8817b8092a7b53b116f622412a5d0aef6.tar.gz | |
More reorg to separate APR/posix code, work in progress.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@472545 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/client')
| -rw-r--r-- | cpp/src/qpid/client/Channel.cpp | 51 | ||||
| -rw-r--r-- | cpp/src/qpid/client/Channel.h | 10 | ||||
| -rw-r--r-- | cpp/src/qpid/client/ResponseHandler.cpp | 25 | ||||
| -rw-r--r-- | cpp/src/qpid/client/ResponseHandler.h | 4 |
4 files changed, 31 insertions, 59 deletions
diff --git a/cpp/src/qpid/client/Channel.cpp b/cpp/src/qpid/client/Channel.cpp index a7b30f2f94..fad648f27d 100644 --- a/cpp/src/qpid/client/Channel.cpp +++ b/cpp/src/qpid/client/Channel.cpp @@ -17,7 +17,6 @@ */ #include "qpid/client/Channel.h" #include "qpid/sys/Monitor.h" -#include "qpid/sys/ThreadFactory.h" #include "qpid/client/Message.h" #include "qpid/QpidError.h" @@ -29,26 +28,15 @@ using namespace qpid::sys; Channel::Channel(bool _transactional, u_int16_t _prefetch) : id(0), con(0), - dispatcher(0), out(0), incoming(0), closed(true), prefetch(_prefetch), transactional(_transactional) -{ - threadFactory = new ThreadFactory(); - dispatchMonitor = new Monitor(); - retrievalMonitor = new Monitor(); -} +{ } Channel::~Channel(){ - if(dispatcher){ - stop(); - delete dispatcher; - } - delete retrievalMonitor; - delete dispatchMonitor; - delete threadFactory; + stop(); } void Channel::setPrefetch(u_int16_t _prefetch){ @@ -176,9 +164,9 @@ void Channel::cancelAll(){ } void Channel::retrieve(Message& msg){ - retrievalMonitor->acquire(); + Monitor::ScopedLock l(retrievalMonitor); while(retrieved == 0){ - retrievalMonitor->wait(); + retrievalMonitor.wait(); } msg.header = retrieved->getHeader(); @@ -186,8 +174,6 @@ void Channel::retrieve(Message& msg){ retrieved->getData(msg.data); delete retrieved; retrieved = 0; - - retrievalMonitor->release(); } bool Channel::get(Message& msg, const Queue& queue, int ackMode){ @@ -315,18 +301,16 @@ void Channel::handleHeartbeat(AMQHeartbeatBody::shared_ptr /*body*/){ } void Channel::start(){ - dispatcher = threadFactory->create(this); - dispatcher->start(); + dispatcher = Thread(this); } void Channel::stop(){ - closed = true; - dispatchMonitor->acquire(); - dispatchMonitor->notify(); - dispatchMonitor->release(); - if(dispatcher){ - dispatcher->join(); + { + Monitor::ScopedLock l(dispatchMonitor); + closed = true; + dispatchMonitor.notify(); } + dispatcher.join(); } void Channel::run(){ @@ -335,30 +319,27 @@ void Channel::run(){ void Channel::enqueue(){ if(incoming->isResponse()){ - retrievalMonitor->acquire(); + Monitor::ScopedLock l(retrievalMonitor); retrieved = incoming; - retrievalMonitor->notify(); - retrievalMonitor->release(); + retrievalMonitor.notify(); }else{ - dispatchMonitor->acquire(); + Monitor::ScopedLock l(dispatchMonitor); messages.push(incoming); - dispatchMonitor->notify(); - dispatchMonitor->release(); + dispatchMonitor.notify(); } incoming = 0; } IncomingMessage* Channel::dequeue(){ - dispatchMonitor->acquire(); + Monitor::ScopedLock l(dispatchMonitor); while(messages.empty() && !closed){ - dispatchMonitor->wait(); + dispatchMonitor.wait(); } IncomingMessage* msg = 0; if(!messages.empty()){ msg = messages.front(); messages.pop(); } - dispatchMonitor->release(); return msg; } diff --git a/cpp/src/qpid/client/Channel.h b/cpp/src/qpid/client/Channel.h index fa8cd3afe0..daf2b6f9d9 100644 --- a/cpp/src/qpid/client/Channel.h +++ b/cpp/src/qpid/client/Channel.h @@ -24,9 +24,6 @@ #define _Channel_ #include "qpid/framing/amqp_framing.h" - -#include "qpid/sys/ThreadFactory.h" - #include "qpid/client/Connection.h" #include "qpid/client/Exchange.h" #include "qpid/client/IncomingMessage.h" @@ -51,15 +48,14 @@ namespace client { u_int16_t id; Connection* con; - qpid::sys::ThreadFactory* threadFactory; - qpid::sys::Thread* dispatcher; + qpid::sys::Thread dispatcher; qpid::framing::OutputHandler* out; IncomingMessage* incoming; ResponseHandler responses; std::queue<IncomingMessage*> messages;//holds returned messages or those delivered for a consume IncomingMessage* retrieved;//holds response to basic.get - qpid::sys::Monitor* dispatchMonitor; - qpid::sys::Monitor* retrievalMonitor; + qpid::sys::Monitor dispatchMonitor; + qpid::sys::Monitor retrievalMonitor; std::map<std::string, Consumer*> consumers; ReturnedMessageHandler* returnsHandler; bool closed; diff --git a/cpp/src/qpid/client/ResponseHandler.cpp b/cpp/src/qpid/client/ResponseHandler.cpp index 16989e2c25..5d2e03c9d9 100644 --- a/cpp/src/qpid/client/ResponseHandler.cpp +++ b/cpp/src/qpid/client/ResponseHandler.cpp @@ -19,40 +19,35 @@ #include "qpid/sys/Monitor.h" #include "qpid/QpidError.h" -qpid::client::ResponseHandler::ResponseHandler() : waiting(false){ - monitor = new qpid::sys::Monitor(); -} +using namespace qpid::sys; -qpid::client::ResponseHandler::~ResponseHandler(){ - delete monitor; -} +qpid::client::ResponseHandler::ResponseHandler() : waiting(false){} + +qpid::client::ResponseHandler::~ResponseHandler(){} bool qpid::client::ResponseHandler::validate(const qpid::framing::AMQMethodBody& expected){ return expected.match(response.get()); } void qpid::client::ResponseHandler::waitForResponse(){ - monitor->acquire(); + Monitor::ScopedLock l(monitor); if(waiting){ - monitor->wait(); + monitor.wait(); } - monitor->release(); } void qpid::client::ResponseHandler::signalResponse(qpid::framing::AMQMethodBody::shared_ptr _response){ response = _response; - monitor->acquire(); + Monitor::ScopedLock l(monitor); waiting = false; - monitor->notify(); - monitor->release(); + monitor.notify(); } void qpid::client::ResponseHandler::receive(const qpid::framing::AMQMethodBody& expected){ - monitor->acquire(); + Monitor::ScopedLock l(monitor); if(waiting){ - monitor->wait(); + monitor.wait(); } - monitor->release(); if(!validate(expected)){ THROW_QPID_ERROR(PROTOCOL_ERROR, "Protocol Error"); } diff --git a/cpp/src/qpid/client/ResponseHandler.h b/cpp/src/qpid/client/ResponseHandler.h index ac4c351211..247c974c14 100644 --- a/cpp/src/qpid/client/ResponseHandler.h +++ b/cpp/src/qpid/client/ResponseHandler.h @@ -17,7 +17,7 @@ */ #include <string> #include "qpid/framing/amqp_framing.h" -#include "qpid/sys/Monitor.h" +#include <qpid/sys/Monitor.h> #ifndef _ResponseHandler_ #define _ResponseHandler_ @@ -28,7 +28,7 @@ namespace qpid { class ResponseHandler{ bool waiting; qpid::framing::AMQMethodBody::shared_ptr response; - qpid::sys::Monitor* monitor; + qpid::sys::Monitor monitor; public: ResponseHandler(); |
