diff options
Diffstat (limited to 'cpp/src/qpid')
| -rw-r--r-- | cpp/src/qpid/broker/DeliveryRecord.cpp | 18 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/DeliveryRecord.h | 28 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/SemanticState.cpp | 16 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/TxAccept.cpp | 61 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/TxAccept.h | 24 |
5 files changed, 107 insertions, 40 deletions
diff --git a/cpp/src/qpid/broker/DeliveryRecord.cpp b/cpp/src/qpid/broker/DeliveryRecord.cpp index 65c1f0a1fa..31ccdd8260 100644 --- a/cpp/src/qpid/broker/DeliveryRecord.cpp +++ b/cpp/src/qpid/broker/DeliveryRecord.cpp @@ -171,6 +171,24 @@ void DeliveryRecord::cancel(const std::string& cancelledTag) cancelled = true; } +AckRange DeliveryRecord::findRange(DeliveryRecords& records, DeliveryId first, DeliveryId last) +{ + ack_iterator start = find_if(records.begin(), records.end(), boost::bind(&DeliveryRecord::matchOrAfter, _1, first)); + ack_iterator end = start; + + if (start != records.end()) { + if (first == last) { + //just acked single element (move end past it) + ++end; + } else { + //need to find end (position it just after the last record in range) + end = find_if(start, records.end(), boost::bind(&DeliveryRecord::after, _1, last)); + } + } + return AckRange(start, end); +} + + namespace qpid { namespace broker { diff --git a/cpp/src/qpid/broker/DeliveryRecord.h b/cpp/src/qpid/broker/DeliveryRecord.h index 6be6a9249a..f6ffb64697 100644 --- a/cpp/src/qpid/broker/DeliveryRecord.h +++ b/cpp/src/qpid/broker/DeliveryRecord.h @@ -34,11 +34,24 @@ namespace qpid { namespace broker { class SemanticState; +class DeliveryRecord; + +typedef std::list<DeliveryRecord> DeliveryRecords; +typedef std::list<DeliveryRecord>::iterator ack_iterator; + +struct AckRange +{ + ack_iterator start; + ack_iterator end; + AckRange(ack_iterator _start, ack_iterator _end) : start(_start), end(_end) {} +}; + /** * Record of a delivery for which an ack is outstanding. */ -class DeliveryRecord{ +class DeliveryRecord +{ QueuedMessage msg; mutable Queue::shared_ptr queue; const std::string tag; @@ -91,24 +104,14 @@ class DeliveryRecord{ void deliver(framing::FrameHandler& h, DeliveryId deliveryId, uint16_t framesize); void setId(DeliveryId _id) { id = _id; } + static AckRange findRange(DeliveryRecords& records, DeliveryId first, DeliveryId last); const QueuedMessage& getMessage() const { return msg; } framing::SequenceNumber getId() const { return id; } Queue::shared_ptr getQueue() const { return queue; } - friend bool operator<(const DeliveryRecord&, const DeliveryRecord&); friend std::ostream& operator<<(std::ostream&, const DeliveryRecord&); }; -typedef std::list<DeliveryRecord> DeliveryRecords; -typedef std::list<DeliveryRecord>::iterator ack_iterator; - -struct AckRange -{ - ack_iterator start; - ack_iterator end; - AckRange(ack_iterator _start, ack_iterator _end) : start(_start), end(_end) {} -}; - struct AcquireFunctor { DeliveryIds& results; @@ -120,7 +123,6 @@ struct AcquireFunctor record.acquire(results); } }; - } } diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp index 42ef8030a6..76eacb8808 100644 --- a/cpp/src/qpid/broker/SemanticState.cpp +++ b/cpp/src/qpid/broker/SemanticState.cpp @@ -549,20 +549,8 @@ Queue::shared_ptr SemanticState::getQueue(const string& name) const { } AckRange SemanticState::findRange(DeliveryId first, DeliveryId last) -{ - ack_iterator start = find_if(unacked.begin(), unacked.end(), boost::bind(&DeliveryRecord::matchOrAfter, _1, first)); - ack_iterator end = start; - - if (start != unacked.end()) { - if (first == last) { - //just acked single element (move end past it) - ++end; - } else { - //need to find end (position it just after the last record in range) - end = find_if(start, unacked.end(), boost::bind(&DeliveryRecord::after, _1, last)); - } - } - return AckRange(start, end); +{ + return DeliveryRecord::findRange(unacked, first, last); } void SemanticState::acquire(DeliveryId first, DeliveryId last, DeliveryIds& acquired) diff --git a/cpp/src/qpid/broker/TxAccept.cpp b/cpp/src/qpid/broker/TxAccept.cpp index 82acf61cd1..6d307bf735 100644 --- a/cpp/src/qpid/broker/TxAccept.cpp +++ b/cpp/src/qpid/broker/TxAccept.cpp @@ -26,19 +26,60 @@ using std::bind2nd; using std::mem_fun_ref; using namespace qpid::broker; using qpid::framing::SequenceSet; +using qpid::framing::SequenceNumber; + +TxAccept::RangeOp::RangeOp(const AckRange& r) : range(r) {} + +void TxAccept::RangeOp::prepare(TransactionContext* ctxt) +{ + for_each(range.start, range.end, bind(&DeliveryRecord::dequeue, _1, ctxt)); +} + +void TxAccept::RangeOp::commit() +{ + for_each(range.start, range.end, bind(&DeliveryRecord::setEnded, _1)); +} + +TxAccept::RangeOps::RangeOps(DeliveryRecords& u) : unacked(u) {} + +void TxAccept::RangeOps::operator()(SequenceNumber start, SequenceNumber end) +{ + ranges.push_back(RangeOp(DeliveryRecord::findRange(unacked, start, end))); +} + +void TxAccept::RangeOps::prepare(TransactionContext* ctxt) +{ + for_each(ranges.begin(), ranges.end(), bind(&RangeOp::prepare, _1, ctxt)); +} + +void TxAccept::RangeOps::commit() +{ + for_each(ranges.begin(), ranges.end(), bind(&RangeOp::commit, _1)); + //now remove if isRedundant(): + if (!ranges.empty()) { + ack_iterator i = ranges.front().range.start; + ack_iterator end = ranges.back().range.end; + while (i != end) { + if (i->isRedundant()) { + i = unacked.erase(i); + } else { + i++; + } + } + } +} TxAccept::TxAccept(SequenceSet& _acked, std::list<DeliveryRecord>& _unacked) : - acked(_acked), unacked(_unacked) {} + acked(_acked), unacked(_unacked), ops(unacked) +{ + //populate the ops + acked.for_each(ops); +} bool TxAccept::prepare(TransactionContext* ctxt) throw() { try{ - //dequeue messages from their respective queues: - for (ack_iterator i = unacked.begin(); i != unacked.end(); i++) { - if (i->coveredBy(&acked)) { - i->dequeue(ctxt); - } - } + ops.prepare(ctxt); return true; }catch(const std::exception& e){ QPID_LOG(error, "Failed to prepare: " << e.what()); @@ -51,11 +92,7 @@ bool TxAccept::prepare(TransactionContext* ctxt) throw() void TxAccept::commit() throw() { - for (ack_iterator i = unacked.begin(); i != unacked.end(); i++) { - if (i->coveredBy(&acked)) i->setEnded(); - } - - unacked.remove_if(mem_fun_ref(&DeliveryRecord::isRedundant)); + ops.commit(); } void TxAccept::rollback() throw() {} diff --git a/cpp/src/qpid/broker/TxAccept.h b/cpp/src/qpid/broker/TxAccept.h index 9548c50c2a..5474327f7c 100644 --- a/cpp/src/qpid/broker/TxAccept.h +++ b/cpp/src/qpid/broker/TxAccept.h @@ -34,9 +34,31 @@ namespace qpid { * Defines the transactional behaviour for accepts received by * a transactional channel. */ - class TxAccept : public TxOp{ + class TxAccept : public TxOp { + struct RangeOp + { + AckRange range; + + RangeOp(const AckRange& r); + void prepare(TransactionContext* ctxt); + void commit(); + }; + + struct RangeOps + { + std::vector<RangeOp> ranges; + DeliveryRecords& unacked; + + RangeOps(DeliveryRecords& u); + + void operator()(framing::SequenceNumber start, framing::SequenceNumber end); + void prepare(TransactionContext* ctxt); + void commit(); + }; + framing::SequenceSet& acked; std::list<DeliveryRecord>& unacked; + RangeOps ops; public: /** |
