diff options
| author | Gordon Sim <gsim@apache.org> | 2007-10-01 10:24:25 +0000 |
|---|---|---|
| committer | Gordon Sim <gsim@apache.org> | 2007-10-01 10:24:25 +0000 |
| commit | 01647c7581b2a9549555c2c2f306c8b072b571a2 (patch) | |
| tree | bfe73f8ddb1b606f9f7b5f89db7ae1e791913c2d /cpp/src/qpid/client/ExecutionHandler.cpp | |
| parent | d7bc99dfca05ce5eb8029282c7d09776af2a0f2c (diff) | |
| download | qpid-python-01647c7581b2a9549555c2c2f306c8b072b571a2.tar.gz | |
Make ExecutionHandler threadsafe for calls that can be made by application threads.
Added generic listener for completion changes.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@580915 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/client/ExecutionHandler.cpp')
| -rw-r--r-- | cpp/src/qpid/client/ExecutionHandler.cpp | 43 |
1 files changed, 32 insertions, 11 deletions
diff --git a/cpp/src/qpid/client/ExecutionHandler.cpp b/cpp/src/qpid/client/ExecutionHandler.cpp index 7e4926bc25..25813dd623 100644 --- a/cpp/src/qpid/client/ExecutionHandler.cpp +++ b/cpp/src/qpid/client/ExecutionHandler.cpp @@ -29,6 +29,7 @@ using namespace qpid::client; using namespace qpid::framing; using namespace boost; +using qpid::sys::Mutex; bool isMessageMethod(AMQMethodBody* method) { @@ -81,9 +82,12 @@ void ExecutionHandler::complete(uint32_t cumulative, const SequenceNumberSet& ra throw ConnectionException(530, "Received odd number of elements in ranged mark"); } else { SequenceNumber mark(cumulative); - outgoingCompletionStatus.update(mark, range); + { + Mutex::ScopedLock l(lock); + outgoingCompletionStatus.update(mark, range); + } + if (completionListener) completionListener(); completion.completed(outgoingCompletionStatus.mark); - //TODO: signal listeners of early notification? } } @@ -111,6 +115,7 @@ void ExecutionHandler::sync() void ExecutionHandler::flushTo(const framing::SequenceNumber& point) { + Mutex::ScopedLock l(lock); if (point > outgoingCompletionStatus.mark) { sendFlushRequest(); } @@ -118,12 +123,14 @@ void ExecutionHandler::flushTo(const framing::SequenceNumber& point) void ExecutionHandler::sendFlushRequest() { + Mutex::ScopedLock l(lock); AMQFrame frame(0, ExecutionFlushBody()); out(frame); } void ExecutionHandler::syncTo(const framing::SequenceNumber& point) { + Mutex::ScopedLock l(lock); if (point > outgoingCompletionStatus.mark) { sendSyncRequest(); } @@ -132,17 +139,21 @@ void ExecutionHandler::syncTo(const framing::SequenceNumber& point) void ExecutionHandler::sendSyncRequest() { + Mutex::ScopedLock l(lock); AMQFrame frame(0, ExecutionSyncBody()); out(frame); } void ExecutionHandler::completed(const SequenceNumber& id, bool cumulative, bool send) { - if (id > incomingCompletionStatus.mark) { - if (cumulative) { - incomingCompletionStatus.update(incomingCompletionStatus.mark, id); - } else { - incomingCompletionStatus.update(id, id); + { + Mutex::ScopedLock l(lock); + if (id > incomingCompletionStatus.mark) { + if (cumulative) { + incomingCompletionStatus.update(incomingCompletionStatus.mark, id); + } else { + incomingCompletionStatus.update(id, id); + } } } if (send) { @@ -153,15 +164,17 @@ void ExecutionHandler::completed(const SequenceNumber& id, bool cumulative, bool void ExecutionHandler::sendCompletion() { + Mutex::ScopedLock l(lock); SequenceNumberSet range; incomingCompletionStatus.collectRanges(range); AMQFrame frame(0, ExecutionCompleteBody(version, incomingCompletionStatus.mark.getValue(), range)); out(frame); } -SequenceNumber ExecutionHandler::send(const AMQBody& command, CompletionTracker::ResultListener l) +SequenceNumber ExecutionHandler::send(const AMQBody& command, CompletionTracker::ResultListener listener) { - return send(command, l, false); + Mutex::ScopedLock l(lock); + return send(command, listener, false); } SequenceNumber ExecutionHandler::send(const AMQBody& command, CompletionTracker::ResultListener l, bool hasContent) @@ -179,9 +192,10 @@ SequenceNumber ExecutionHandler::send(const AMQBody& command, CompletionTracker: } SequenceNumber ExecutionHandler::send(const AMQBody& command, const MethodContent& content, - CompletionTracker::ResultListener l) + CompletionTracker::ResultListener listener) { - SequenceNumber id = send(command, l, true); + Mutex::ScopedLock l(lock); + SequenceNumber id = send(command, listener, true); sendContent(content); return id; } @@ -227,10 +241,17 @@ void ExecutionHandler::sendContent(const MethodContent& content) bool ExecutionHandler::isComplete(const SequenceNumber& id) { + Mutex::ScopedLock l(lock); return outgoingCompletionStatus.covers(id); } bool ExecutionHandler::isCompleteUpTo(const SequenceNumber& id) { + Mutex::ScopedLock l(lock); return outgoingCompletionStatus.mark >= id; } + +void ExecutionHandler::setCompletionListener(boost::function<void()> l) +{ + completionListener = l; +} |
