summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/client
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2006-11-08 17:07:44 +0000
committerAlan Conway <aconway@apache.org>2006-11-08 17:07:44 +0000
commit5be658a8817b8092a7b53b116f622412a5d0aef6 (patch)
treebbee363911f47bad6abe245629cacba0ebf3b9be /cpp/src/qpid/client
parent85fef3d1e669b240deae82cdd353620667a85e08 (diff)
downloadqpid-python-5be658a8817b8092a7b53b116f622412a5d0aef6.tar.gz
More reorg to separate APR/posix code, work in progress.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@472545 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/client')
-rw-r--r--cpp/src/qpid/client/Channel.cpp51
-rw-r--r--cpp/src/qpid/client/Channel.h10
-rw-r--r--cpp/src/qpid/client/ResponseHandler.cpp25
-rw-r--r--cpp/src/qpid/client/ResponseHandler.h4
4 files changed, 31 insertions, 59 deletions
diff --git a/cpp/src/qpid/client/Channel.cpp b/cpp/src/qpid/client/Channel.cpp
index a7b30f2f94..fad648f27d 100644
--- a/cpp/src/qpid/client/Channel.cpp
+++ b/cpp/src/qpid/client/Channel.cpp
@@ -17,7 +17,6 @@
*/
#include "qpid/client/Channel.h"
#include "qpid/sys/Monitor.h"
-#include "qpid/sys/ThreadFactory.h"
#include "qpid/client/Message.h"
#include "qpid/QpidError.h"
@@ -29,26 +28,15 @@ using namespace qpid::sys;
Channel::Channel(bool _transactional, u_int16_t _prefetch) :
id(0),
con(0),
- dispatcher(0),
out(0),
incoming(0),
closed(true),
prefetch(_prefetch),
transactional(_transactional)
-{
- threadFactory = new ThreadFactory();
- dispatchMonitor = new Monitor();
- retrievalMonitor = new Monitor();
-}
+{ }
Channel::~Channel(){
- if(dispatcher){
- stop();
- delete dispatcher;
- }
- delete retrievalMonitor;
- delete dispatchMonitor;
- delete threadFactory;
+ stop();
}
void Channel::setPrefetch(u_int16_t _prefetch){
@@ -176,9 +164,9 @@ void Channel::cancelAll(){
}
void Channel::retrieve(Message& msg){
- retrievalMonitor->acquire();
+ Monitor::ScopedLock l(retrievalMonitor);
while(retrieved == 0){
- retrievalMonitor->wait();
+ retrievalMonitor.wait();
}
msg.header = retrieved->getHeader();
@@ -186,8 +174,6 @@ void Channel::retrieve(Message& msg){
retrieved->getData(msg.data);
delete retrieved;
retrieved = 0;
-
- retrievalMonitor->release();
}
bool Channel::get(Message& msg, const Queue& queue, int ackMode){
@@ -315,18 +301,16 @@ void Channel::handleHeartbeat(AMQHeartbeatBody::shared_ptr /*body*/){
}
void Channel::start(){
- dispatcher = threadFactory->create(this);
- dispatcher->start();
+ dispatcher = Thread(this);
}
void Channel::stop(){
- closed = true;
- dispatchMonitor->acquire();
- dispatchMonitor->notify();
- dispatchMonitor->release();
- if(dispatcher){
- dispatcher->join();
+ {
+ Monitor::ScopedLock l(dispatchMonitor);
+ closed = true;
+ dispatchMonitor.notify();
}
+ dispatcher.join();
}
void Channel::run(){
@@ -335,30 +319,27 @@ void Channel::run(){
void Channel::enqueue(){
if(incoming->isResponse()){
- retrievalMonitor->acquire();
+ Monitor::ScopedLock l(retrievalMonitor);
retrieved = incoming;
- retrievalMonitor->notify();
- retrievalMonitor->release();
+ retrievalMonitor.notify();
}else{
- dispatchMonitor->acquire();
+ Monitor::ScopedLock l(dispatchMonitor);
messages.push(incoming);
- dispatchMonitor->notify();
- dispatchMonitor->release();
+ dispatchMonitor.notify();
}
incoming = 0;
}
IncomingMessage* Channel::dequeue(){
- dispatchMonitor->acquire();
+ Monitor::ScopedLock l(dispatchMonitor);
while(messages.empty() && !closed){
- dispatchMonitor->wait();
+ dispatchMonitor.wait();
}
IncomingMessage* msg = 0;
if(!messages.empty()){
msg = messages.front();
messages.pop();
}
- dispatchMonitor->release();
return msg;
}
diff --git a/cpp/src/qpid/client/Channel.h b/cpp/src/qpid/client/Channel.h
index fa8cd3afe0..daf2b6f9d9 100644
--- a/cpp/src/qpid/client/Channel.h
+++ b/cpp/src/qpid/client/Channel.h
@@ -24,9 +24,6 @@
#define _Channel_
#include "qpid/framing/amqp_framing.h"
-
-#include "qpid/sys/ThreadFactory.h"
-
#include "qpid/client/Connection.h"
#include "qpid/client/Exchange.h"
#include "qpid/client/IncomingMessage.h"
@@ -51,15 +48,14 @@ namespace client {
u_int16_t id;
Connection* con;
- qpid::sys::ThreadFactory* threadFactory;
- qpid::sys::Thread* dispatcher;
+ qpid::sys::Thread dispatcher;
qpid::framing::OutputHandler* out;
IncomingMessage* incoming;
ResponseHandler responses;
std::queue<IncomingMessage*> messages;//holds returned messages or those delivered for a consume
IncomingMessage* retrieved;//holds response to basic.get
- qpid::sys::Monitor* dispatchMonitor;
- qpid::sys::Monitor* retrievalMonitor;
+ qpid::sys::Monitor dispatchMonitor;
+ qpid::sys::Monitor retrievalMonitor;
std::map<std::string, Consumer*> consumers;
ReturnedMessageHandler* returnsHandler;
bool closed;
diff --git a/cpp/src/qpid/client/ResponseHandler.cpp b/cpp/src/qpid/client/ResponseHandler.cpp
index 16989e2c25..5d2e03c9d9 100644
--- a/cpp/src/qpid/client/ResponseHandler.cpp
+++ b/cpp/src/qpid/client/ResponseHandler.cpp
@@ -19,40 +19,35 @@
#include "qpid/sys/Monitor.h"
#include "qpid/QpidError.h"
-qpid::client::ResponseHandler::ResponseHandler() : waiting(false){
- monitor = new qpid::sys::Monitor();
-}
+using namespace qpid::sys;
-qpid::client::ResponseHandler::~ResponseHandler(){
- delete monitor;
-}
+qpid::client::ResponseHandler::ResponseHandler() : waiting(false){}
+
+qpid::client::ResponseHandler::~ResponseHandler(){}
bool qpid::client::ResponseHandler::validate(const qpid::framing::AMQMethodBody& expected){
return expected.match(response.get());
}
void qpid::client::ResponseHandler::waitForResponse(){
- monitor->acquire();
+ Monitor::ScopedLock l(monitor);
if(waiting){
- monitor->wait();
+ monitor.wait();
}
- monitor->release();
}
void qpid::client::ResponseHandler::signalResponse(qpid::framing::AMQMethodBody::shared_ptr _response){
response = _response;
- monitor->acquire();
+ Monitor::ScopedLock l(monitor);
waiting = false;
- monitor->notify();
- monitor->release();
+ monitor.notify();
}
void qpid::client::ResponseHandler::receive(const qpid::framing::AMQMethodBody& expected){
- monitor->acquire();
+ Monitor::ScopedLock l(monitor);
if(waiting){
- monitor->wait();
+ monitor.wait();
}
- monitor->release();
if(!validate(expected)){
THROW_QPID_ERROR(PROTOCOL_ERROR, "Protocol Error");
}
diff --git a/cpp/src/qpid/client/ResponseHandler.h b/cpp/src/qpid/client/ResponseHandler.h
index ac4c351211..247c974c14 100644
--- a/cpp/src/qpid/client/ResponseHandler.h
+++ b/cpp/src/qpid/client/ResponseHandler.h
@@ -17,7 +17,7 @@
*/
#include <string>
#include "qpid/framing/amqp_framing.h"
-#include "qpid/sys/Monitor.h"
+#include <qpid/sys/Monitor.h>
#ifndef _ResponseHandler_
#define _ResponseHandler_
@@ -28,7 +28,7 @@ namespace qpid {
class ResponseHandler{
bool waiting;
qpid::framing::AMQMethodBody::shared_ptr response;
- qpid::sys::Monitor* monitor;
+ qpid::sys::Monitor monitor;
public:
ResponseHandler();