summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/SemanticState.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/SemanticState.cpp')
-rw-r--r--cpp/src/qpid/broker/SemanticState.cpp63
1 files changed, 3 insertions, 60 deletions
diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp
index ad617c1bc1..bdd8edac87 100644
--- a/cpp/src/qpid/broker/SemanticState.cpp
+++ b/cpp/src/qpid/broker/SemanticState.cpp
@@ -28,7 +28,6 @@
#include "Queue.h"
#include "SessionContext.h"
#include "TxAccept.h"
-#include "TxAck.h"
#include "TxPublish.h"
#include "qpid/framing/reply_exceptions.h"
#include "qpid/framing/MessageTransferBody.h"
@@ -63,7 +62,6 @@ SemanticState::SemanticState(DeliveryAdapter& da, SessionContext& ss)
prefetchCount(0),
tagGenerator("sgen"),
dtxSelected(false),
- accumulatedAck(0),
flowActive(true),
outputTasks(ss)
{
@@ -116,14 +114,12 @@ void SemanticState::startTx()
txBuffer = TxBuffer::shared_ptr(new TxBuffer());
}
-void SemanticState::commit(MessageStore* const store, bool completeOnCommit)
+void SemanticState::commit(MessageStore* const store)
{
if (!txBuffer) throw
CommandInvalidException(QPID_MSG("Session has not been selected for use with transactions"));
- TxOp::shared_ptr txAck(completeOnCommit ?
- static_cast<TxOp*>(new TxAck(accumulatedAck, unacked)) :
- static_cast<TxOp*>(new TxAccept(accumulatedAck, unacked)));
+ TxOp::shared_ptr txAck(static_cast<TxOp*>(new TxAccept(accumulatedAck, unacked)));
txBuffer->enlist(txAck);
if (txBuffer->commitLocal(store)) {
accumulatedAck.clear();
@@ -377,59 +373,6 @@ void SemanticState::route(intrusive_ptr<Message> msg, Deliverable& strategy) {
}
-void SemanticState::ackCumulative(DeliveryId id)
-{
- ack(id, id, true);
-}
-
-void SemanticState::ackRange(DeliveryId first, DeliveryId last)
-{
- ack(first, last, false);
-}
-
-void SemanticState::ack(DeliveryId first, DeliveryId last, bool cumulative)
-{
- {
- ack_iterator start = cumulative ? unacked.begin() :
- find_if(unacked.begin(), unacked.end(), boost::bind(&DeliveryRecord::matchOrAfter, _1, first));
- ack_iterator end = start;
-
- if (cumulative || first != last) {
- //need to find end (position it just after the last record in range)
- end = find_if(start, unacked.end(), boost::bind(&DeliveryRecord::after, _1, last));
- } else if (start != unacked.end()) {
- //just acked single element (move end past it)
- ++end;
- }
-
- for_each(start, end, boost::bind(&SemanticState::complete, this, _1));
-
- if (txBuffer.get()) {
- //in transactional mode, don't dequeue or remove, just
- //maintain set of acknowledged messages:
- accumulatedAck.update(cumulative ? accumulatedAck.mark : first, last);
-
- if (dtxBuffer.get()) {
- //if enlisted in a dtx, copy the relevant slice from
- //unacked and record it against that transaction:
- TxOp::shared_ptr txAck(new DtxAck(accumulatedAck, unacked));
- //then remove that slice from the unacked record:
- unacked.remove_if(boost::bind(&DeliveryRecord::coveredBy, _1, &accumulatedAck));
- accumulatedAck.clear();
- dtxBuffer->enlist(txAck);
- }
- } else {
- for_each(start, end, boost::bind(&DeliveryRecord::dequeue, _1, (TransactionContext*) 0));
- unacked.erase(start, end);
- }
- }//end of lock scope for delivery lock (TODO this is ugly, make it prettier)
-
- //if the prefetch limit had previously been reached, or credit
- //had expired in windowing mode there may be messages that can
- //be now be delivered
- requestDispatch();
-}
-
void SemanticState::requestDispatch()
{
for (ConsumerImplMap::iterator i = consumers.begin(); i != consumers.end(); i++) {
@@ -667,7 +610,7 @@ void SemanticState::accepted(DeliveryId first, DeliveryId last)
if (txBuffer.get()) {
//in transactional mode, don't dequeue or remove, just
//maintain set of acknowledged messages:
- accumulatedAck.update(first, last);//TODO convert accumulatedAck to SequenceSet
+ accumulatedAck.add(first, last);
if (dtxBuffer.get()) {
//if enlisted in a dtx, copy the relevant slice from