diff options
Diffstat (limited to 'cpp/src/qpid/broker/TxAccept.cpp')
| -rw-r--r-- | cpp/src/qpid/broker/TxAccept.cpp | 61 |
1 files changed, 49 insertions, 12 deletions
diff --git a/cpp/src/qpid/broker/TxAccept.cpp b/cpp/src/qpid/broker/TxAccept.cpp index 82acf61cd1..6d307bf735 100644 --- a/cpp/src/qpid/broker/TxAccept.cpp +++ b/cpp/src/qpid/broker/TxAccept.cpp @@ -26,19 +26,60 @@ using std::bind2nd; using std::mem_fun_ref; using namespace qpid::broker; using qpid::framing::SequenceSet; +using qpid::framing::SequenceNumber; + +TxAccept::RangeOp::RangeOp(const AckRange& r) : range(r) {} + +void TxAccept::RangeOp::prepare(TransactionContext* ctxt) +{ + for_each(range.start, range.end, bind(&DeliveryRecord::dequeue, _1, ctxt)); +} + +void TxAccept::RangeOp::commit() +{ + for_each(range.start, range.end, bind(&DeliveryRecord::setEnded, _1)); +} + +TxAccept::RangeOps::RangeOps(DeliveryRecords& u) : unacked(u) {} + +void TxAccept::RangeOps::operator()(SequenceNumber start, SequenceNumber end) +{ + ranges.push_back(RangeOp(DeliveryRecord::findRange(unacked, start, end))); +} + +void TxAccept::RangeOps::prepare(TransactionContext* ctxt) +{ + for_each(ranges.begin(), ranges.end(), bind(&RangeOp::prepare, _1, ctxt)); +} + +void TxAccept::RangeOps::commit() +{ + for_each(ranges.begin(), ranges.end(), bind(&RangeOp::commit, _1)); + //now remove if isRedundant(): + if (!ranges.empty()) { + ack_iterator i = ranges.front().range.start; + ack_iterator end = ranges.back().range.end; + while (i != end) { + if (i->isRedundant()) { + i = unacked.erase(i); + } else { + i++; + } + } + } +} TxAccept::TxAccept(SequenceSet& _acked, std::list<DeliveryRecord>& _unacked) : - acked(_acked), unacked(_unacked) {} + acked(_acked), unacked(_unacked), ops(unacked) +{ + //populate the ops + acked.for_each(ops); +} bool TxAccept::prepare(TransactionContext* ctxt) throw() { try{ - //dequeue messages from their respective queues: - for (ack_iterator i = unacked.begin(); i != unacked.end(); i++) { - if (i->coveredBy(&acked)) { - i->dequeue(ctxt); - } - } + ops.prepare(ctxt); return true; }catch(const std::exception& e){ QPID_LOG(error, "Failed to prepare: " << e.what()); @@ -51,11 +92,7 @@ bool TxAccept::prepare(TransactionContext* ctxt) throw() void TxAccept::commit() throw() { - for (ack_iterator i = unacked.begin(); i != unacked.end(); i++) { - if (i->coveredBy(&acked)) i->setEnded(); - } - - unacked.remove_if(mem_fun_ref(&DeliveryRecord::isRedundant)); + ops.commit(); } void TxAccept::rollback() throw() {} |
