summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/SemanticState.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/SemanticState.cpp')
-rw-r--r--cpp/src/qpid/broker/SemanticState.cpp54
1 files changed, 39 insertions, 15 deletions
diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp
index 37a9e9b4af..bdd5f33601 100644
--- a/cpp/src/qpid/broker/SemanticState.cpp
+++ b/cpp/src/qpid/broker/SemanticState.cpp
@@ -31,6 +31,8 @@
#include "qpid/broker/TxPublish.h"
#include "qpid/framing/reply_exceptions.h"
#include "qpid/framing/MessageTransferBody.h"
+#include "qpid/framing/SequenceSet.h"
+#include "qpid/framing/IsInSequenceSet.h"
#include "qpid/log/Statement.h"
#include "qpid/ptr_map.h"
#include "qpid/broker/AclModule.h"
@@ -49,8 +51,9 @@
namespace qpid {
namespace broker {
-using std::mem_fun_ref;
+using namespace std;
using boost::intrusive_ptr;
+using boost::bind;
using namespace qpid::broker;
using namespace qpid::framing;
using namespace qpid::sys;
@@ -631,13 +634,27 @@ void SemanticState::ConsumerImpl::notify()
}
-void SemanticState::accepted(DeliveryId first, DeliveryId last)
-{
- AckRange range = findRange(first, last);
+// Test that a DeliveryRecord's ID is in a sequence set and some other
+// predicate on DeliveryRecord holds.
+template <class Predicate> struct IsInSequenceSetAnd {
+ IsInSequenceSet isInSet;
+ Predicate predicate;
+ IsInSequenceSetAnd(const SequenceSet& s, Predicate p) : isInSet(s), predicate(p) {}
+ bool operator()(DeliveryRecord& dr) {
+ return isInSet(dr.getId()) && predicate(dr);
+ }
+};
+
+template<class Predicate> IsInSequenceSetAnd<Predicate>
+isInSequenceSetAnd(const SequenceSet& s, Predicate p) {
+ return IsInSequenceSetAnd<Predicate>(s,p);
+}
+
+void SemanticState::accepted(const SequenceSet& commands) {
if (txBuffer.get()) {
//in transactional mode, don't dequeue or remove, just
//maintain set of acknowledged messages:
- accumulatedAck.add(first, last);
+ accumulatedAck.add(commands);
if (dtxBuffer.get()) {
//if enlisted in a dtx, copy the relevant slice from
@@ -649,21 +666,28 @@ void SemanticState::accepted(DeliveryId first, DeliveryId last)
//mark the relevant messages as 'ended' in unacked
//if the messages are already completed, they can be
//removed from the record
- DeliveryRecords::iterator removed = remove_if(range.start, range.end, mem_fun_ref(&DeliveryRecord::setEnded));
- unacked.erase(removed, range.end);
+ DeliveryRecords::iterator removed =
+ remove_if(unacked.begin(), unacked.end(),
+ isInSequenceSetAnd(commands,
+ bind(&DeliveryRecord::setEnded, _1)));
+ unacked.erase(removed, unacked.end());
}
} else {
- DeliveryRecords::iterator removed = remove_if(range.start, range.end, boost::bind(&DeliveryRecord::accept, _1, (TransactionContext*) 0));
- unacked.erase(removed, range.end);
+ DeliveryRecords::iterator removed =
+ remove_if(unacked.begin(), unacked.end(),
+ isInSequenceSetAnd(commands,
+ bind(&DeliveryRecord::accept, _1,
+ (TransactionContext*) 0)));
+ unacked.erase(removed, unacked.end());
}
}
-void SemanticState::completed(DeliveryId first, DeliveryId last)
-{
- AckRange range = findRange(first, last);
-
- DeliveryRecords::iterator removed = remove_if(range.start, range.end, boost::bind(&SemanticState::complete, this, _1));
- unacked.erase(removed, range.end);
+void SemanticState::completed(const SequenceSet& commands) {
+ DeliveryRecords::iterator removed =
+ remove_if(unacked.begin(), unacked.end(),
+ isInSequenceSetAnd(commands,
+ bind(&SemanticState::complete, this, _1)));
+ unacked.erase(removed, unacked.end());
requestDispatch();
}