diff options
Diffstat (limited to 'cpp/src/qpid/broker/SemanticState.cpp')
| -rw-r--r-- | cpp/src/qpid/broker/SemanticState.cpp | 54 |
1 files changed, 39 insertions, 15 deletions
diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp index 37a9e9b4af..bdd5f33601 100644 --- a/cpp/src/qpid/broker/SemanticState.cpp +++ b/cpp/src/qpid/broker/SemanticState.cpp @@ -31,6 +31,8 @@ #include "qpid/broker/TxPublish.h" #include "qpid/framing/reply_exceptions.h" #include "qpid/framing/MessageTransferBody.h" +#include "qpid/framing/SequenceSet.h" +#include "qpid/framing/IsInSequenceSet.h" #include "qpid/log/Statement.h" #include "qpid/ptr_map.h" #include "qpid/broker/AclModule.h" @@ -49,8 +51,9 @@ namespace qpid { namespace broker { -using std::mem_fun_ref; +using namespace std; using boost::intrusive_ptr; +using boost::bind; using namespace qpid::broker; using namespace qpid::framing; using namespace qpid::sys; @@ -631,13 +634,27 @@ void SemanticState::ConsumerImpl::notify() } -void SemanticState::accepted(DeliveryId first, DeliveryId last) -{ - AckRange range = findRange(first, last); +// Test that a DeliveryRecord's ID is in a sequence set and some other +// predicate on DeliveryRecord holds. +template <class Predicate> struct IsInSequenceSetAnd { + IsInSequenceSet isInSet; + Predicate predicate; + IsInSequenceSetAnd(const SequenceSet& s, Predicate p) : isInSet(s), predicate(p) {} + bool operator()(DeliveryRecord& dr) { + return isInSet(dr.getId()) && predicate(dr); + } +}; + +template<class Predicate> IsInSequenceSetAnd<Predicate> +isInSequenceSetAnd(const SequenceSet& s, Predicate p) { + return IsInSequenceSetAnd<Predicate>(s,p); +} + +void SemanticState::accepted(const SequenceSet& commands) { if (txBuffer.get()) { //in transactional mode, don't dequeue or remove, just //maintain set of acknowledged messages: - accumulatedAck.add(first, last); + accumulatedAck.add(commands); if (dtxBuffer.get()) { //if enlisted in a dtx, copy the relevant slice from @@ -649,21 +666,28 @@ void SemanticState::accepted(DeliveryId first, DeliveryId last) //mark the relevant messages as 'ended' in unacked //if the messages are already completed, they can be //removed from the record - DeliveryRecords::iterator removed = remove_if(range.start, range.end, mem_fun_ref(&DeliveryRecord::setEnded)); - unacked.erase(removed, range.end); + DeliveryRecords::iterator removed = + remove_if(unacked.begin(), unacked.end(), + isInSequenceSetAnd(commands, + bind(&DeliveryRecord::setEnded, _1))); + unacked.erase(removed, unacked.end()); } } else { - DeliveryRecords::iterator removed = remove_if(range.start, range.end, boost::bind(&DeliveryRecord::accept, _1, (TransactionContext*) 0)); - unacked.erase(removed, range.end); + DeliveryRecords::iterator removed = + remove_if(unacked.begin(), unacked.end(), + isInSequenceSetAnd(commands, + bind(&DeliveryRecord::accept, _1, + (TransactionContext*) 0))); + unacked.erase(removed, unacked.end()); } } -void SemanticState::completed(DeliveryId first, DeliveryId last) -{ - AckRange range = findRange(first, last); - - DeliveryRecords::iterator removed = remove_if(range.start, range.end, boost::bind(&SemanticState::complete, this, _1)); - unacked.erase(removed, range.end); +void SemanticState::completed(const SequenceSet& commands) { + DeliveryRecords::iterator removed = + remove_if(unacked.begin(), unacked.end(), + isInSequenceSetAnd(commands, + bind(&SemanticState::complete, this, _1))); + unacked.erase(removed, unacked.end()); requestDispatch(); } |
