summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/client/ExecutionHandler.cpp
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2007-10-01 10:24:25 +0000
committerGordon Sim <gsim@apache.org>2007-10-01 10:24:25 +0000
commit01647c7581b2a9549555c2c2f306c8b072b571a2 (patch)
treebfe73f8ddb1b606f9f7b5f89db7ae1e791913c2d /cpp/src/qpid/client/ExecutionHandler.cpp
parentd7bc99dfca05ce5eb8029282c7d09776af2a0f2c (diff)
downloadqpid-python-01647c7581b2a9549555c2c2f306c8b072b571a2.tar.gz
Make ExecutionHandler threadsafe for calls that can be made by application threads.
Added generic listener for completion changes. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@580915 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/client/ExecutionHandler.cpp')
-rw-r--r--cpp/src/qpid/client/ExecutionHandler.cpp43
1 files changed, 32 insertions, 11 deletions
diff --git a/cpp/src/qpid/client/ExecutionHandler.cpp b/cpp/src/qpid/client/ExecutionHandler.cpp
index 7e4926bc25..25813dd623 100644
--- a/cpp/src/qpid/client/ExecutionHandler.cpp
+++ b/cpp/src/qpid/client/ExecutionHandler.cpp
@@ -29,6 +29,7 @@
using namespace qpid::client;
using namespace qpid::framing;
using namespace boost;
+using qpid::sys::Mutex;
bool isMessageMethod(AMQMethodBody* method)
{
@@ -81,9 +82,12 @@ void ExecutionHandler::complete(uint32_t cumulative, const SequenceNumberSet& ra
throw ConnectionException(530, "Received odd number of elements in ranged mark");
} else {
SequenceNumber mark(cumulative);
- outgoingCompletionStatus.update(mark, range);
+ {
+ Mutex::ScopedLock l(lock);
+ outgoingCompletionStatus.update(mark, range);
+ }
+ if (completionListener) completionListener();
completion.completed(outgoingCompletionStatus.mark);
-
//TODO: signal listeners of early notification?
}
}
@@ -111,6 +115,7 @@ void ExecutionHandler::sync()
void ExecutionHandler::flushTo(const framing::SequenceNumber& point)
{
+ Mutex::ScopedLock l(lock);
if (point > outgoingCompletionStatus.mark) {
sendFlushRequest();
}
@@ -118,12 +123,14 @@ void ExecutionHandler::flushTo(const framing::SequenceNumber& point)
void ExecutionHandler::sendFlushRequest()
{
+ Mutex::ScopedLock l(lock);
AMQFrame frame(0, ExecutionFlushBody());
out(frame);
}
void ExecutionHandler::syncTo(const framing::SequenceNumber& point)
{
+ Mutex::ScopedLock l(lock);
if (point > outgoingCompletionStatus.mark) {
sendSyncRequest();
}
@@ -132,17 +139,21 @@ void ExecutionHandler::syncTo(const framing::SequenceNumber& point)
void ExecutionHandler::sendSyncRequest()
{
+ Mutex::ScopedLock l(lock);
AMQFrame frame(0, ExecutionSyncBody());
out(frame);
}
void ExecutionHandler::completed(const SequenceNumber& id, bool cumulative, bool send)
{
- if (id > incomingCompletionStatus.mark) {
- if (cumulative) {
- incomingCompletionStatus.update(incomingCompletionStatus.mark, id);
- } else {
- incomingCompletionStatus.update(id, id);
+ {
+ Mutex::ScopedLock l(lock);
+ if (id > incomingCompletionStatus.mark) {
+ if (cumulative) {
+ incomingCompletionStatus.update(incomingCompletionStatus.mark, id);
+ } else {
+ incomingCompletionStatus.update(id, id);
+ }
}
}
if (send) {
@@ -153,15 +164,17 @@ void ExecutionHandler::completed(const SequenceNumber& id, bool cumulative, bool
void ExecutionHandler::sendCompletion()
{
+ Mutex::ScopedLock l(lock);
SequenceNumberSet range;
incomingCompletionStatus.collectRanges(range);
AMQFrame frame(0, ExecutionCompleteBody(version, incomingCompletionStatus.mark.getValue(), range));
out(frame);
}
-SequenceNumber ExecutionHandler::send(const AMQBody& command, CompletionTracker::ResultListener l)
+SequenceNumber ExecutionHandler::send(const AMQBody& command, CompletionTracker::ResultListener listener)
{
- return send(command, l, false);
+ Mutex::ScopedLock l(lock);
+ return send(command, listener, false);
}
SequenceNumber ExecutionHandler::send(const AMQBody& command, CompletionTracker::ResultListener l, bool hasContent)
@@ -179,9 +192,10 @@ SequenceNumber ExecutionHandler::send(const AMQBody& command, CompletionTracker:
}
SequenceNumber ExecutionHandler::send(const AMQBody& command, const MethodContent& content,
- CompletionTracker::ResultListener l)
+ CompletionTracker::ResultListener listener)
{
- SequenceNumber id = send(command, l, true);
+ Mutex::ScopedLock l(lock);
+ SequenceNumber id = send(command, listener, true);
sendContent(content);
return id;
}
@@ -227,10 +241,17 @@ void ExecutionHandler::sendContent(const MethodContent& content)
bool ExecutionHandler::isComplete(const SequenceNumber& id)
{
+ Mutex::ScopedLock l(lock);
return outgoingCompletionStatus.covers(id);
}
bool ExecutionHandler::isCompleteUpTo(const SequenceNumber& id)
{
+ Mutex::ScopedLock l(lock);
return outgoingCompletionStatus.mark >= id;
}
+
+void ExecutionHandler::setCompletionListener(boost::function<void()> l)
+{
+ completionListener = l;
+}