summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/TxAccept.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/TxAccept.cpp')
-rw-r--r--cpp/src/qpid/broker/TxAccept.cpp61
1 files changed, 49 insertions, 12 deletions
diff --git a/cpp/src/qpid/broker/TxAccept.cpp b/cpp/src/qpid/broker/TxAccept.cpp
index 82acf61cd1..6d307bf735 100644
--- a/cpp/src/qpid/broker/TxAccept.cpp
+++ b/cpp/src/qpid/broker/TxAccept.cpp
@@ -26,19 +26,60 @@ using std::bind2nd;
using std::mem_fun_ref;
using namespace qpid::broker;
using qpid::framing::SequenceSet;
+using qpid::framing::SequenceNumber;
+
+TxAccept::RangeOp::RangeOp(const AckRange& r) : range(r) {}
+
+void TxAccept::RangeOp::prepare(TransactionContext* ctxt)
+{
+ for_each(range.start, range.end, bind(&DeliveryRecord::dequeue, _1, ctxt));
+}
+
+void TxAccept::RangeOp::commit()
+{
+ for_each(range.start, range.end, bind(&DeliveryRecord::setEnded, _1));
+}
+
+TxAccept::RangeOps::RangeOps(DeliveryRecords& u) : unacked(u) {}
+
+void TxAccept::RangeOps::operator()(SequenceNumber start, SequenceNumber end)
+{
+ ranges.push_back(RangeOp(DeliveryRecord::findRange(unacked, start, end)));
+}
+
+void TxAccept::RangeOps::prepare(TransactionContext* ctxt)
+{
+ for_each(ranges.begin(), ranges.end(), bind(&RangeOp::prepare, _1, ctxt));
+}
+
+void TxAccept::RangeOps::commit()
+{
+ for_each(ranges.begin(), ranges.end(), bind(&RangeOp::commit, _1));
+ //now remove if isRedundant():
+ if (!ranges.empty()) {
+ ack_iterator i = ranges.front().range.start;
+ ack_iterator end = ranges.back().range.end;
+ while (i != end) {
+ if (i->isRedundant()) {
+ i = unacked.erase(i);
+ } else {
+ i++;
+ }
+ }
+ }
+}
TxAccept::TxAccept(SequenceSet& _acked, std::list<DeliveryRecord>& _unacked) :
- acked(_acked), unacked(_unacked) {}
+ acked(_acked), unacked(_unacked), ops(unacked)
+{
+ //populate the ops
+ acked.for_each(ops);
+}
bool TxAccept::prepare(TransactionContext* ctxt) throw()
{
try{
- //dequeue messages from their respective queues:
- for (ack_iterator i = unacked.begin(); i != unacked.end(); i++) {
- if (i->coveredBy(&acked)) {
- i->dequeue(ctxt);
- }
- }
+ ops.prepare(ctxt);
return true;
}catch(const std::exception& e){
QPID_LOG(error, "Failed to prepare: " << e.what());
@@ -51,11 +92,7 @@ bool TxAccept::prepare(TransactionContext* ctxt) throw()
void TxAccept::commit() throw()
{
- for (ack_iterator i = unacked.begin(); i != unacked.end(); i++) {
- if (i->coveredBy(&acked)) i->setEnded();
- }
-
- unacked.remove_if(mem_fun_ref(&DeliveryRecord::isRedundant));
+ ops.commit();
}
void TxAccept::rollback() throw() {}