summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/PersistableMessage.h
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/PersistableMessage.h')
-rw-r--r--cpp/src/qpid/broker/PersistableMessage.h78
1 files changed, 42 insertions, 36 deletions
diff --git a/cpp/src/qpid/broker/PersistableMessage.h b/cpp/src/qpid/broker/PersistableMessage.h
index 06fc59107e..30d8d37409 100644
--- a/cpp/src/qpid/broker/PersistableMessage.h
+++ b/cpp/src/qpid/broker/PersistableMessage.h
@@ -26,6 +26,7 @@
#include <boost/shared_ptr.hpp>
#include "Persistable.h"
#include "qpid/framing/amqp_types.h"
+#include "qpid/sys/Monitor.h"
namespace qpid {
namespace broker {
@@ -36,31 +37,23 @@ namespace broker {
*/
class PersistableMessage : public Persistable
{
+ sys::Monitor asyncEnqueueLock;
-
- /**
- * Needs to be set false on Message construction, then
- * set once the broker has taken responsibility for the
- * message. For transient, once enqueued, for durable, once
- * stored.
- */
- bool enqueueCompleted;
-
/**
- * Counts the number of times the message has been processed
- * async - thus when it == 0 the broker knows it has ownership
- * -> an async store can increment this counter if it writes a
- * copy to each queue, and case use this counter to know when all
- * the write are complete
- */
- int asyncCounter;
+ * Tracks the number of outstanding asynchronous enqueue
+ * operations. When the message is enqueued asynchronously the
+ * count is incremented; when that enqueue completes it is
+ * decremented. Thus when it is 0, there are no outstanding
+ * enqueues.
+ */
+ int asyncEnqueueCounter;
/**
- * Needs to be set false on Message construction, then
- * set once the dequeueis complete, it gets set
- * For transient, once dequeued, for durable, once
- * dequeue record has been stored.
- */
+ * Needs to be set false on Message construction, then
+ * set once the dequeueis complete, it gets set
+ * For transient, once dequeued, for durable, once
+ * dequeue record has been stored.
+ */
bool dequeueCompleted;
public:
@@ -73,23 +66,36 @@ public:
virtual ~PersistableMessage() {};
- PersistableMessage():
- enqueueCompleted(false),
- asyncCounter(0),
- dequeueCompleted(false){};
+ PersistableMessage(): asyncEnqueueCounter(0), dequeueCompleted(false) {}
- inline bool isEnqueueComplete() {return enqueueCompleted;};
+ inline void waitForEnqueueComplete() {
+ sys::ScopedLock<sys::Monitor> l(asyncEnqueueLock);
+ while (asyncEnqueueCounter > 0) {
+ asyncEnqueueLock.wait();
+ }
+ }
+
+ inline bool isEnqueueComplete() {
+ sys::ScopedLock<sys::Monitor> l(asyncEnqueueLock);
+ return asyncEnqueueCounter == 0;
+ }
+
inline void enqueueComplete() {
- if (asyncCounter<=1) {
- asyncCounter =0;
- enqueueCompleted = true;
- }else{
- asyncCounter--;
- }
- };
- inline void enqueueAsync() {asyncCounter++;};
- inline bool isDequeueComplete() {return dequeueCompleted;};
- inline void dequeueComplete() {dequeueCompleted = true;};
+ sys::ScopedLock<sys::Monitor> l(asyncEnqueueLock);
+ if (asyncEnqueueCounter > 0) {
+ if (--asyncEnqueueCounter == 0) {
+ asyncEnqueueLock.notify();
+ }
+ }
+ }
+
+ inline void enqueueAsync() {
+ sys::ScopedLock<sys::Monitor> l(asyncEnqueueLock);
+ asyncEnqueueCounter++;
+ }
+
+ inline bool isDequeueComplete() { return dequeueCompleted; }
+ inline void dequeueComplete() { dequeueCompleted = true; }
};