diff options
| author | Gordon Sim <gsim@apache.org> | 2007-10-01 10:24:25 +0000 |
|---|---|---|
| committer | Gordon Sim <gsim@apache.org> | 2007-10-01 10:24:25 +0000 |
| commit | 01647c7581b2a9549555c2c2f306c8b072b571a2 (patch) | |
| tree | bfe73f8ddb1b606f9f7b5f89db7ae1e791913c2d /cpp/src | |
| parent | d7bc99dfca05ce5eb8029282c7d09776af2a0f2c (diff) | |
| download | qpid-python-01647c7581b2a9549555c2c2f306c8b072b571a2.tar.gz | |
Make ExecutionHandler threadsafe for calls that can be made by application threads.
Added generic listener for completion changes.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@580915 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
| -rw-r--r-- | cpp/src/qpid/client/CompletionTracker.h | 3 | ||||
| -rw-r--r-- | cpp/src/qpid/client/Execution.h | 1 | ||||
| -rw-r--r-- | cpp/src/qpid/client/ExecutionHandler.cpp | 43 | ||||
| -rw-r--r-- | cpp/src/qpid/client/ExecutionHandler.h | 5 | ||||
| -rw-r--r-- | cpp/src/qpid/client/SessionCore.h | 2 |
5 files changed, 40 insertions, 14 deletions
diff --git a/cpp/src/qpid/client/CompletionTracker.h b/cpp/src/qpid/client/CompletionTracker.h index 05cdc45c9f..b2697f399f 100644 --- a/cpp/src/qpid/client/CompletionTracker.h +++ b/cpp/src/qpid/client/CompletionTracker.h @@ -34,8 +34,7 @@ namespace client { class CompletionTracker { public: - //typedef boost::function<void()> CompletionListener; - typedef boost::function0<void> CompletionListener; + typedef boost::function<void()> CompletionListener; typedef boost::function<void(const std::string&)> ResultListener; CompletionTracker(); diff --git a/cpp/src/qpid/client/Execution.h b/cpp/src/qpid/client/Execution.h index 9caac45790..66a720a699 100644 --- a/cpp/src/qpid/client/Execution.h +++ b/cpp/src/qpid/client/Execution.h @@ -37,6 +37,7 @@ public: virtual Demux& getDemux() = 0; virtual bool isComplete(const framing::SequenceNumber& id) = 0; virtual bool isCompleteUpTo(const framing::SequenceNumber& id) = 0; + virtual void setCompletionListener(boost::function<void()>) = 0; }; }} diff --git a/cpp/src/qpid/client/ExecutionHandler.cpp b/cpp/src/qpid/client/ExecutionHandler.cpp index 7e4926bc25..25813dd623 100644 --- a/cpp/src/qpid/client/ExecutionHandler.cpp +++ b/cpp/src/qpid/client/ExecutionHandler.cpp @@ -29,6 +29,7 @@ using namespace qpid::client; using namespace qpid::framing; using namespace boost; +using qpid::sys::Mutex; bool isMessageMethod(AMQMethodBody* method) { @@ -81,9 +82,12 @@ void ExecutionHandler::complete(uint32_t cumulative, const SequenceNumberSet& ra throw ConnectionException(530, "Received odd number of elements in ranged mark"); } else { SequenceNumber mark(cumulative); - outgoingCompletionStatus.update(mark, range); + { + Mutex::ScopedLock l(lock); + outgoingCompletionStatus.update(mark, range); + } + if (completionListener) completionListener(); completion.completed(outgoingCompletionStatus.mark); - //TODO: signal listeners of early notification? } } @@ -111,6 +115,7 @@ void ExecutionHandler::sync() void ExecutionHandler::flushTo(const framing::SequenceNumber& point) { + Mutex::ScopedLock l(lock); if (point > outgoingCompletionStatus.mark) { sendFlushRequest(); } @@ -118,12 +123,14 @@ void ExecutionHandler::flushTo(const framing::SequenceNumber& point) void ExecutionHandler::sendFlushRequest() { + Mutex::ScopedLock l(lock); AMQFrame frame(0, ExecutionFlushBody()); out(frame); } void ExecutionHandler::syncTo(const framing::SequenceNumber& point) { + Mutex::ScopedLock l(lock); if (point > outgoingCompletionStatus.mark) { sendSyncRequest(); } @@ -132,17 +139,21 @@ void ExecutionHandler::syncTo(const framing::SequenceNumber& point) void ExecutionHandler::sendSyncRequest() { + Mutex::ScopedLock l(lock); AMQFrame frame(0, ExecutionSyncBody()); out(frame); } void ExecutionHandler::completed(const SequenceNumber& id, bool cumulative, bool send) { - if (id > incomingCompletionStatus.mark) { - if (cumulative) { - incomingCompletionStatus.update(incomingCompletionStatus.mark, id); - } else { - incomingCompletionStatus.update(id, id); + { + Mutex::ScopedLock l(lock); + if (id > incomingCompletionStatus.mark) { + if (cumulative) { + incomingCompletionStatus.update(incomingCompletionStatus.mark, id); + } else { + incomingCompletionStatus.update(id, id); + } } } if (send) { @@ -153,15 +164,17 @@ void ExecutionHandler::completed(const SequenceNumber& id, bool cumulative, bool void ExecutionHandler::sendCompletion() { + Mutex::ScopedLock l(lock); SequenceNumberSet range; incomingCompletionStatus.collectRanges(range); AMQFrame frame(0, ExecutionCompleteBody(version, incomingCompletionStatus.mark.getValue(), range)); out(frame); } -SequenceNumber ExecutionHandler::send(const AMQBody& command, CompletionTracker::ResultListener l) +SequenceNumber ExecutionHandler::send(const AMQBody& command, CompletionTracker::ResultListener listener) { - return send(command, l, false); + Mutex::ScopedLock l(lock); + return send(command, listener, false); } SequenceNumber ExecutionHandler::send(const AMQBody& command, CompletionTracker::ResultListener l, bool hasContent) @@ -179,9 +192,10 @@ SequenceNumber ExecutionHandler::send(const AMQBody& command, CompletionTracker: } SequenceNumber ExecutionHandler::send(const AMQBody& command, const MethodContent& content, - CompletionTracker::ResultListener l) + CompletionTracker::ResultListener listener) { - SequenceNumber id = send(command, l, true); + Mutex::ScopedLock l(lock); + SequenceNumber id = send(command, listener, true); sendContent(content); return id; } @@ -227,10 +241,17 @@ void ExecutionHandler::sendContent(const MethodContent& content) bool ExecutionHandler::isComplete(const SequenceNumber& id) { + Mutex::ScopedLock l(lock); return outgoingCompletionStatus.covers(id); } bool ExecutionHandler::isCompleteUpTo(const SequenceNumber& id) { + Mutex::ScopedLock l(lock); return outgoingCompletionStatus.mark >= id; } + +void ExecutionHandler::setCompletionListener(boost::function<void()> l) +{ + completionListener = l; +} diff --git a/cpp/src/qpid/client/ExecutionHandler.h b/cpp/src/qpid/client/ExecutionHandler.h index 427c39b61f..7205983cf7 100644 --- a/cpp/src/qpid/client/ExecutionHandler.h +++ b/cpp/src/qpid/client/ExecutionHandler.h @@ -27,6 +27,7 @@ #include "qpid/framing/FrameSet.h" #include "qpid/framing/MethodContent.h" #include "qpid/framing/SequenceNumber.h" +#include "qpid/sys/Mutex.h" #include "ChainableFrameHandler.h" #include "CompletionTracker.h" #include "Correlator.h" @@ -49,8 +50,10 @@ class ExecutionHandler : Correlator correlation; CompletionTracker completion; Demux demux; + sys::Mutex lock; framing::ProtocolVersion version; uint64_t maxFrameSize; + boost::function<void()> completionListener; void complete(uint32_t mark, const framing::SequenceNumberSet& range); void flush(); @@ -90,6 +93,8 @@ public: Correlator& getCorrelator() { return correlation; } CompletionTracker& getCompletionTracker() { return completion; } Demux& getDemux() { return demux; } + + void setCompletionListener(boost::function<void()>); }; }} diff --git a/cpp/src/qpid/client/SessionCore.h b/cpp/src/qpid/client/SessionCore.h index 74b15a11da..ac109e1f5c 100644 --- a/cpp/src/qpid/client/SessionCore.h +++ b/cpp/src/qpid/client/SessionCore.h @@ -56,7 +56,7 @@ class SessionCore : public framing::FrameHandler::InOutHandler SessionHandler l2; ExecutionHandler l3; framing::Uuid uuid; - bool sync; + volatile bool sync; Reason reason; protected: |
