diff options
Diffstat (limited to 'cpp/src/qpid')
| -rw-r--r-- | cpp/src/qpid/broker/Queue.cpp | 9 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/Queue.h | 9 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/Connection.cpp | 3 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/Connection.h | 1 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/UpdateClient.cpp | 1 |
5 files changed, 20 insertions, 3 deletions
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index bbdbf19e92..ffe743bac1 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -202,7 +202,7 @@ void Queue::process(boost::intrusive_ptr<Message>& msg){ } void Queue::requeue(const QueuedMessage& msg){ - if (policy.get() && !policy->isEnqueued(msg)) return; + if (!isEnqueued(msg)) return; QueueListeners::NotificationSet copy; { @@ -691,7 +691,7 @@ bool Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg) { { Mutex::ScopedLock locker(messageLock); - if (policy.get() && !policy->isEnqueued(msg)) return false; + if (!isEnqueued(msg)) return false; if (!ctxt) { dequeued(msg); } @@ -1019,3 +1019,8 @@ void Queue::enqueued(const QueuedMessage& m) QPID_LOG(warning, "Queue informed of enqueued message that has no payload"); } } + +bool Queue::isEnqueued(const QueuedMessage& msg) +{ + return policy.get() && policy->isEnqueued(msg); +} diff --git a/cpp/src/qpid/broker/Queue.h b/cpp/src/qpid/broker/Queue.h index de60362854..a8b775cba7 100644 --- a/cpp/src/qpid/broker/Queue.h +++ b/cpp/src/qpid/broker/Queue.h @@ -257,6 +257,15 @@ namespace qpid { * clustered broker. */ void enqueued(const QueuedMessage& msg); + + /** + * Test whether the specified message (identified by its + * sequence/position), is still enqueued (note this + * doesn't mean it is available for delivery as it may + * have been delievered to a subscriber who has not yet + * accepted it). + */ + bool isEnqueued(const QueuedMessage& msg); /** * Gets the next available message diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp index f2904c6734..be6281b4e3 100644 --- a/cpp/src/qpid/cluster/Connection.cpp +++ b/cpp/src/qpid/cluster/Connection.cpp @@ -325,6 +325,7 @@ void Connection::deliveryRecord(const string& qname, bool completed, bool ended, bool windowing, + bool enqueued, uint32_t credit) { broker::QueuedMessage m; @@ -333,7 +334,7 @@ void Connection::deliveryRecord(const string& qname, if (acquired) { // Message is on the update queue m = getUpdateMessage(); m.queue = queue.get(); - queue->enqueued(m); //inform queue of the message + if (enqueued) queue->enqueued(m); //inform queue of the message } else { // Message at original position in original queue m = queue->find(position); } diff --git a/cpp/src/qpid/cluster/Connection.h b/cpp/src/qpid/cluster/Connection.h index a0be2203e4..8e3b0ad337 100644 --- a/cpp/src/qpid/cluster/Connection.h +++ b/cpp/src/qpid/cluster/Connection.h @@ -129,6 +129,7 @@ class Connection : bool completed, bool ended, bool windowing, + bool enqueued, uint32_t credit); void queuePosition(const std::string&, const framing::SequenceNumber&); diff --git a/cpp/src/qpid/cluster/UpdateClient.cpp b/cpp/src/qpid/cluster/UpdateClient.cpp index 7fdbe73926..332e74c512 100644 --- a/cpp/src/qpid/cluster/UpdateClient.cpp +++ b/cpp/src/qpid/cluster/UpdateClient.cpp @@ -372,6 +372,7 @@ void UpdateClient::updateUnacked(const broker::DeliveryRecord& dr) { dr.isComplete(), dr.isEnded(), dr.isWindowing(), + dr.getQueue()->isEnqueued(dr.getMessage()), dr.getCredit() ); } |
