summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2009-08-12 18:29:03 +0000
committerAlan Conway <aconway@apache.org>2009-08-12 18:29:03 +0000
commit61aa0a45a6d9311e72da05159377046e9654cf44 (patch)
treeac4d1b50b567fbafeb8f31a2327488100741ce8e /cpp/src/qpid/broker
parent01054223a90825cc65388397f5812b84eeba09a4 (diff)
downloadqpid-python-61aa0a45a6d9311e72da05159377046e9654cf44.tar.gz
Optimized handling of accepts and completions.
SemanticState::accept/completed now make a single pass through the SequenceSet of commands and the unacked DeliveryRecord list in parallel, rather than doing a pass through unacked for every range in the SequenceSet. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@803655 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker')
-rw-r--r--cpp/src/qpid/broker/SemanticState.cpp54
-rw-r--r--cpp/src/qpid/broker/SemanticState.h5
-rw-r--r--cpp/src/qpid/broker/SessionAdapter.cpp7
-rw-r--r--cpp/src/qpid/broker/SessionState.cpp3
4 files changed, 44 insertions, 25 deletions
diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp
index 37a9e9b4af..bdd5f33601 100644
--- a/cpp/src/qpid/broker/SemanticState.cpp
+++ b/cpp/src/qpid/broker/SemanticState.cpp
@@ -31,6 +31,8 @@
#include "qpid/broker/TxPublish.h"
#include "qpid/framing/reply_exceptions.h"
#include "qpid/framing/MessageTransferBody.h"
+#include "qpid/framing/SequenceSet.h"
+#include "qpid/framing/IsInSequenceSet.h"
#include "qpid/log/Statement.h"
#include "qpid/ptr_map.h"
#include "qpid/broker/AclModule.h"
@@ -49,8 +51,9 @@
namespace qpid {
namespace broker {
-using std::mem_fun_ref;
+using namespace std;
using boost::intrusive_ptr;
+using boost::bind;
using namespace qpid::broker;
using namespace qpid::framing;
using namespace qpid::sys;
@@ -631,13 +634,27 @@ void SemanticState::ConsumerImpl::notify()
}
-void SemanticState::accepted(DeliveryId first, DeliveryId last)
-{
- AckRange range = findRange(first, last);
+// Test that a DeliveryRecord's ID is in a sequence set and some other
+// predicate on DeliveryRecord holds.
+template <class Predicate> struct IsInSequenceSetAnd {
+ IsInSequenceSet isInSet;
+ Predicate predicate;
+ IsInSequenceSetAnd(const SequenceSet& s, Predicate p) : isInSet(s), predicate(p) {}
+ bool operator()(DeliveryRecord& dr) {
+ return isInSet(dr.getId()) && predicate(dr);
+ }
+};
+
+template<class Predicate> IsInSequenceSetAnd<Predicate>
+isInSequenceSetAnd(const SequenceSet& s, Predicate p) {
+ return IsInSequenceSetAnd<Predicate>(s,p);
+}
+
+void SemanticState::accepted(const SequenceSet& commands) {
if (txBuffer.get()) {
//in transactional mode, don't dequeue or remove, just
//maintain set of acknowledged messages:
- accumulatedAck.add(first, last);
+ accumulatedAck.add(commands);
if (dtxBuffer.get()) {
//if enlisted in a dtx, copy the relevant slice from
@@ -649,21 +666,28 @@ void SemanticState::accepted(DeliveryId first, DeliveryId last)
//mark the relevant messages as 'ended' in unacked
//if the messages are already completed, they can be
//removed from the record
- DeliveryRecords::iterator removed = remove_if(range.start, range.end, mem_fun_ref(&DeliveryRecord::setEnded));
- unacked.erase(removed, range.end);
+ DeliveryRecords::iterator removed =
+ remove_if(unacked.begin(), unacked.end(),
+ isInSequenceSetAnd(commands,
+ bind(&DeliveryRecord::setEnded, _1)));
+ unacked.erase(removed, unacked.end());
}
} else {
- DeliveryRecords::iterator removed = remove_if(range.start, range.end, boost::bind(&DeliveryRecord::accept, _1, (TransactionContext*) 0));
- unacked.erase(removed, range.end);
+ DeliveryRecords::iterator removed =
+ remove_if(unacked.begin(), unacked.end(),
+ isInSequenceSetAnd(commands,
+ bind(&DeliveryRecord::accept, _1,
+ (TransactionContext*) 0)));
+ unacked.erase(removed, unacked.end());
}
}
-void SemanticState::completed(DeliveryId first, DeliveryId last)
-{
- AckRange range = findRange(first, last);
-
- DeliveryRecords::iterator removed = remove_if(range.start, range.end, boost::bind(&SemanticState::complete, this, _1));
- unacked.erase(removed, range.end);
+void SemanticState::completed(const SequenceSet& commands) {
+ DeliveryRecords::iterator removed =
+ remove_if(unacked.begin(), unacked.end(),
+ isInSequenceSetAnd(commands,
+ bind(&SemanticState::complete, this, _1)));
+ unacked.erase(removed, unacked.end());
requestDispatch();
}
diff --git a/cpp/src/qpid/broker/SemanticState.h b/cpp/src/qpid/broker/SemanticState.h
index 4b87c59c68..da8383fc12 100644
--- a/cpp/src/qpid/broker/SemanticState.h
+++ b/cpp/src/qpid/broker/SemanticState.h
@@ -206,9 +206,8 @@ class SemanticState : private boost::noncopyable {
void reject(DeliveryId first, DeliveryId last);
void handle(boost::intrusive_ptr<Message> msg);
- //final 0-10 spec (completed and accepted are distinct):
- void completed(DeliveryId deliveryTag, DeliveryId endTag);
- void accepted(DeliveryId deliveryTag, DeliveryId endTag);
+ void completed(const framing::SequenceSet& commands);
+ void accepted(const framing::SequenceSet& commands);
void attached();
void detached();
diff --git a/cpp/src/qpid/broker/SessionAdapter.cpp b/cpp/src/qpid/broker/SessionAdapter.cpp
index 8eed673add..b0c5e9ea00 100644
--- a/cpp/src/qpid/broker/SessionAdapter.cpp
+++ b/cpp/src/qpid/broker/SessionAdapter.cpp
@@ -429,13 +429,11 @@ void SessionAdapter::QueueHandlerImpl::delete_(const string& queue, bool ifUnuse
}
}
-
SessionAdapter::MessageHandlerImpl::MessageHandlerImpl(SemanticState& s) :
HandlerHelper(s),
releaseRedeliveredOp(boost::bind(&SemanticState::release, &state, _1, _2, true)),
releaseOp(boost::bind(&SemanticState::release, &state, _1, _2, false)),
- rejectOp(boost::bind(&SemanticState::reject, &state, _1, _2)),
- acceptOp(boost::bind(&SemanticState::accepted, &state, _1, _2))
+ rejectOp(boost::bind(&SemanticState::reject, &state, _1, _2))
{}
//
@@ -547,8 +545,7 @@ void SessionAdapter::MessageHandlerImpl::stop(const std::string& destination)
void SessionAdapter::MessageHandlerImpl::accept(const framing::SequenceSet& commands)
{
-
- commands.for_each(acceptOp);
+ state.accepted(commands);
}
framing::MessageAcquireResult SessionAdapter::MessageHandlerImpl::acquire(const framing::SequenceSet& transfers)
diff --git a/cpp/src/qpid/broker/SessionState.cpp b/cpp/src/qpid/broker/SessionState.cpp
index d4e5cfaa67..4c5aaf7fc4 100644
--- a/cpp/src/qpid/broker/SessionState.cpp
+++ b/cpp/src/qpid/broker/SessionState.cpp
@@ -357,8 +357,7 @@ void SessionState::sendCompletion() {
void SessionState::senderCompleted(const SequenceSet& commands) {
qpid::SessionState::senderCompleted(commands);
- for (SequenceSet::RangeIterator i = commands.rangesBegin(); i != commands.rangesEnd(); i++)
- semanticState.completed(i->first(), i->last());
+ semanticState.completed(commands);
}
void SessionState::readyToSend() {