summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2007-10-01 10:24:25 +0000
committerGordon Sim <gsim@apache.org>2007-10-01 10:24:25 +0000
commit01647c7581b2a9549555c2c2f306c8b072b571a2 (patch)
treebfe73f8ddb1b606f9f7b5f89db7ae1e791913c2d /cpp/src
parentd7bc99dfca05ce5eb8029282c7d09776af2a0f2c (diff)
downloadqpid-python-01647c7581b2a9549555c2c2f306c8b072b571a2.tar.gz
Make ExecutionHandler threadsafe for calls that can be made by application threads.
Added generic listener for completion changes. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@580915 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/qpid/client/CompletionTracker.h3
-rw-r--r--cpp/src/qpid/client/Execution.h1
-rw-r--r--cpp/src/qpid/client/ExecutionHandler.cpp43
-rw-r--r--cpp/src/qpid/client/ExecutionHandler.h5
-rw-r--r--cpp/src/qpid/client/SessionCore.h2
5 files changed, 40 insertions, 14 deletions
diff --git a/cpp/src/qpid/client/CompletionTracker.h b/cpp/src/qpid/client/CompletionTracker.h
index 05cdc45c9f..b2697f399f 100644
--- a/cpp/src/qpid/client/CompletionTracker.h
+++ b/cpp/src/qpid/client/CompletionTracker.h
@@ -34,8 +34,7 @@ namespace client {
class CompletionTracker
{
public:
- //typedef boost::function<void()> CompletionListener;
- typedef boost::function0<void> CompletionListener;
+ typedef boost::function<void()> CompletionListener;
typedef boost::function<void(const std::string&)> ResultListener;
CompletionTracker();
diff --git a/cpp/src/qpid/client/Execution.h b/cpp/src/qpid/client/Execution.h
index 9caac45790..66a720a699 100644
--- a/cpp/src/qpid/client/Execution.h
+++ b/cpp/src/qpid/client/Execution.h
@@ -37,6 +37,7 @@ public:
virtual Demux& getDemux() = 0;
virtual bool isComplete(const framing::SequenceNumber& id) = 0;
virtual bool isCompleteUpTo(const framing::SequenceNumber& id) = 0;
+ virtual void setCompletionListener(boost::function<void()>) = 0;
};
}}
diff --git a/cpp/src/qpid/client/ExecutionHandler.cpp b/cpp/src/qpid/client/ExecutionHandler.cpp
index 7e4926bc25..25813dd623 100644
--- a/cpp/src/qpid/client/ExecutionHandler.cpp
+++ b/cpp/src/qpid/client/ExecutionHandler.cpp
@@ -29,6 +29,7 @@
using namespace qpid::client;
using namespace qpid::framing;
using namespace boost;
+using qpid::sys::Mutex;
bool isMessageMethod(AMQMethodBody* method)
{
@@ -81,9 +82,12 @@ void ExecutionHandler::complete(uint32_t cumulative, const SequenceNumberSet& ra
throw ConnectionException(530, "Received odd number of elements in ranged mark");
} else {
SequenceNumber mark(cumulative);
- outgoingCompletionStatus.update(mark, range);
+ {
+ Mutex::ScopedLock l(lock);
+ outgoingCompletionStatus.update(mark, range);
+ }
+ if (completionListener) completionListener();
completion.completed(outgoingCompletionStatus.mark);
-
//TODO: signal listeners of early notification?
}
}
@@ -111,6 +115,7 @@ void ExecutionHandler::sync()
void ExecutionHandler::flushTo(const framing::SequenceNumber& point)
{
+ Mutex::ScopedLock l(lock);
if (point > outgoingCompletionStatus.mark) {
sendFlushRequest();
}
@@ -118,12 +123,14 @@ void ExecutionHandler::flushTo(const framing::SequenceNumber& point)
void ExecutionHandler::sendFlushRequest()
{
+ Mutex::ScopedLock l(lock);
AMQFrame frame(0, ExecutionFlushBody());
out(frame);
}
void ExecutionHandler::syncTo(const framing::SequenceNumber& point)
{
+ Mutex::ScopedLock l(lock);
if (point > outgoingCompletionStatus.mark) {
sendSyncRequest();
}
@@ -132,17 +139,21 @@ void ExecutionHandler::syncTo(const framing::SequenceNumber& point)
void ExecutionHandler::sendSyncRequest()
{
+ Mutex::ScopedLock l(lock);
AMQFrame frame(0, ExecutionSyncBody());
out(frame);
}
void ExecutionHandler::completed(const SequenceNumber& id, bool cumulative, bool send)
{
- if (id > incomingCompletionStatus.mark) {
- if (cumulative) {
- incomingCompletionStatus.update(incomingCompletionStatus.mark, id);
- } else {
- incomingCompletionStatus.update(id, id);
+ {
+ Mutex::ScopedLock l(lock);
+ if (id > incomingCompletionStatus.mark) {
+ if (cumulative) {
+ incomingCompletionStatus.update(incomingCompletionStatus.mark, id);
+ } else {
+ incomingCompletionStatus.update(id, id);
+ }
}
}
if (send) {
@@ -153,15 +164,17 @@ void ExecutionHandler::completed(const SequenceNumber& id, bool cumulative, bool
void ExecutionHandler::sendCompletion()
{
+ Mutex::ScopedLock l(lock);
SequenceNumberSet range;
incomingCompletionStatus.collectRanges(range);
AMQFrame frame(0, ExecutionCompleteBody(version, incomingCompletionStatus.mark.getValue(), range));
out(frame);
}
-SequenceNumber ExecutionHandler::send(const AMQBody& command, CompletionTracker::ResultListener l)
+SequenceNumber ExecutionHandler::send(const AMQBody& command, CompletionTracker::ResultListener listener)
{
- return send(command, l, false);
+ Mutex::ScopedLock l(lock);
+ return send(command, listener, false);
}
SequenceNumber ExecutionHandler::send(const AMQBody& command, CompletionTracker::ResultListener l, bool hasContent)
@@ -179,9 +192,10 @@ SequenceNumber ExecutionHandler::send(const AMQBody& command, CompletionTracker:
}
SequenceNumber ExecutionHandler::send(const AMQBody& command, const MethodContent& content,
- CompletionTracker::ResultListener l)
+ CompletionTracker::ResultListener listener)
{
- SequenceNumber id = send(command, l, true);
+ Mutex::ScopedLock l(lock);
+ SequenceNumber id = send(command, listener, true);
sendContent(content);
return id;
}
@@ -227,10 +241,17 @@ void ExecutionHandler::sendContent(const MethodContent& content)
bool ExecutionHandler::isComplete(const SequenceNumber& id)
{
+ Mutex::ScopedLock l(lock);
return outgoingCompletionStatus.covers(id);
}
bool ExecutionHandler::isCompleteUpTo(const SequenceNumber& id)
{
+ Mutex::ScopedLock l(lock);
return outgoingCompletionStatus.mark >= id;
}
+
+void ExecutionHandler::setCompletionListener(boost::function<void()> l)
+{
+ completionListener = l;
+}
diff --git a/cpp/src/qpid/client/ExecutionHandler.h b/cpp/src/qpid/client/ExecutionHandler.h
index 427c39b61f..7205983cf7 100644
--- a/cpp/src/qpid/client/ExecutionHandler.h
+++ b/cpp/src/qpid/client/ExecutionHandler.h
@@ -27,6 +27,7 @@
#include "qpid/framing/FrameSet.h"
#include "qpid/framing/MethodContent.h"
#include "qpid/framing/SequenceNumber.h"
+#include "qpid/sys/Mutex.h"
#include "ChainableFrameHandler.h"
#include "CompletionTracker.h"
#include "Correlator.h"
@@ -49,8 +50,10 @@ class ExecutionHandler :
Correlator correlation;
CompletionTracker completion;
Demux demux;
+ sys::Mutex lock;
framing::ProtocolVersion version;
uint64_t maxFrameSize;
+ boost::function<void()> completionListener;
void complete(uint32_t mark, const framing::SequenceNumberSet& range);
void flush();
@@ -90,6 +93,8 @@ public:
Correlator& getCorrelator() { return correlation; }
CompletionTracker& getCompletionTracker() { return completion; }
Demux& getDemux() { return demux; }
+
+ void setCompletionListener(boost::function<void()>);
};
}}
diff --git a/cpp/src/qpid/client/SessionCore.h b/cpp/src/qpid/client/SessionCore.h
index 74b15a11da..ac109e1f5c 100644
--- a/cpp/src/qpid/client/SessionCore.h
+++ b/cpp/src/qpid/client/SessionCore.h
@@ -56,7 +56,7 @@ class SessionCore : public framing::FrameHandler::InOutHandler
SessionHandler l2;
ExecutionHandler l3;
framing::Uuid uuid;
- bool sync;
+ volatile bool sync;
Reason reason;
protected: