diff options
Diffstat (limited to 'cpp/src/qpid/broker/PersistableMessage.h')
| -rw-r--r-- | cpp/src/qpid/broker/PersistableMessage.h | 78 |
1 files changed, 42 insertions, 36 deletions
diff --git a/cpp/src/qpid/broker/PersistableMessage.h b/cpp/src/qpid/broker/PersistableMessage.h index 06fc59107e..30d8d37409 100644 --- a/cpp/src/qpid/broker/PersistableMessage.h +++ b/cpp/src/qpid/broker/PersistableMessage.h @@ -26,6 +26,7 @@ #include <boost/shared_ptr.hpp> #include "Persistable.h" #include "qpid/framing/amqp_types.h" +#include "qpid/sys/Monitor.h" namespace qpid { namespace broker { @@ -36,31 +37,23 @@ namespace broker { */ class PersistableMessage : public Persistable { + sys::Monitor asyncEnqueueLock; - - /** - * Needs to be set false on Message construction, then - * set once the broker has taken responsibility for the - * message. For transient, once enqueued, for durable, once - * stored. - */ - bool enqueueCompleted; - /** - * Counts the number of times the message has been processed - * async - thus when it == 0 the broker knows it has ownership - * -> an async store can increment this counter if it writes a - * copy to each queue, and case use this counter to know when all - * the write are complete - */ - int asyncCounter; + * Tracks the number of outstanding asynchronous enqueue + * operations. When the message is enqueued asynchronously the + * count is incremented; when that enqueue completes it is + * decremented. Thus when it is 0, there are no outstanding + * enqueues. + */ + int asyncEnqueueCounter; /** - * Needs to be set false on Message construction, then - * set once the dequeueis complete, it gets set - * For transient, once dequeued, for durable, once - * dequeue record has been stored. - */ + * Needs to be set false on Message construction, then + * set once the dequeueis complete, it gets set + * For transient, once dequeued, for durable, once + * dequeue record has been stored. + */ bool dequeueCompleted; public: @@ -73,23 +66,36 @@ public: virtual ~PersistableMessage() {}; - PersistableMessage(): - enqueueCompleted(false), - asyncCounter(0), - dequeueCompleted(false){}; + PersistableMessage(): asyncEnqueueCounter(0), dequeueCompleted(false) {} - inline bool isEnqueueComplete() {return enqueueCompleted;}; + inline void waitForEnqueueComplete() { + sys::ScopedLock<sys::Monitor> l(asyncEnqueueLock); + while (asyncEnqueueCounter > 0) { + asyncEnqueueLock.wait(); + } + } + + inline bool isEnqueueComplete() { + sys::ScopedLock<sys::Monitor> l(asyncEnqueueLock); + return asyncEnqueueCounter == 0; + } + inline void enqueueComplete() { - if (asyncCounter<=1) { - asyncCounter =0; - enqueueCompleted = true; - }else{ - asyncCounter--; - } - }; - inline void enqueueAsync() {asyncCounter++;}; - inline bool isDequeueComplete() {return dequeueCompleted;}; - inline void dequeueComplete() {dequeueCompleted = true;}; + sys::ScopedLock<sys::Monitor> l(asyncEnqueueLock); + if (asyncEnqueueCounter > 0) { + if (--asyncEnqueueCounter == 0) { + asyncEnqueueLock.notify(); + } + } + } + + inline void enqueueAsync() { + sys::ScopedLock<sys::Monitor> l(asyncEnqueueLock); + asyncEnqueueCounter++; + } + + inline bool isDequeueComplete() { return dequeueCompleted; } + inline void dequeueComplete() { dequeueCompleted = true; } }; |
