diff options
Diffstat (limited to 'cpp/src/qpid/broker/SemanticState.cpp')
| -rw-r--r-- | cpp/src/qpid/broker/SemanticState.cpp | 87 |
1 files changed, 51 insertions, 36 deletions
diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp index 1d49e08eb0..09f5b8ce98 100644 --- a/cpp/src/qpid/broker/SemanticState.cpp +++ b/cpp/src/qpid/broker/SemanticState.cpp @@ -346,38 +346,40 @@ void SemanticState::ackRange(DeliveryId first, DeliveryId last) void SemanticState::ack(DeliveryId first, DeliveryId last, bool cumulative) { - Mutex::ScopedLock locker(deliveryLock);//need to synchronize with possible concurrent delivery - - ack_iterator start = cumulative ? unacked.begin() : - find_if(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::matchOrAfter), first)); - ack_iterator end = start; - - if (cumulative || first != last) { - //need to find end (position it just after the last record in range) - end = find_if(start, unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::after), last)); - } else { - //just acked single element (move end past it) - ++end; - } - - for_each(start, end, boost::bind(&SemanticState::acknowledged, this, _1)); - - if (txBuffer.get()) { - //in transactional mode, don't dequeue or remove, just - //maintain set of acknowledged messages: - accumulatedAck.update(cumulative ? accumulatedAck.mark : first, last); + { + Mutex::ScopedLock locker(deliveryLock);//need to synchronize with possible concurrent delivery + + ack_iterator start = cumulative ? unacked.begin() : + find_if(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::matchOrAfter), first)); + ack_iterator end = start; - if (dtxBuffer.get()) { - //if enlisted in a dtx, remove the relevant slice from - //unacked and record it against that transaction - TxOp::shared_ptr txAck(new DtxAck(accumulatedAck, unacked)); - accumulatedAck.clear(); - dtxBuffer->enlist(txAck); + if (cumulative || first != last) { + //need to find end (position it just after the last record in range) + end = find_if(start, unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::after), last)); + } else { + //just acked single element (move end past it) + ++end; } - } else { - for_each(start, end, bind2nd(mem_fun_ref(&DeliveryRecord::dequeue), 0)); - unacked.erase(start, end); - } + + for_each(start, end, boost::bind(&SemanticState::acknowledged, this, _1)); + + if (txBuffer.get()) { + //in transactional mode, don't dequeue or remove, just + //maintain set of acknowledged messages: + accumulatedAck.update(cumulative ? accumulatedAck.mark : first, last); + + if (dtxBuffer.get()) { + //if enlisted in a dtx, remove the relevant slice from + //unacked and record it against that transaction + TxOp::shared_ptr txAck(new DtxAck(accumulatedAck, unacked)); + accumulatedAck.clear(); + dtxBuffer->enlist(txAck); + } + } else { + for_each(start, end, bind2nd(mem_fun_ref(&DeliveryRecord::dequeue), 0)); + unacked.erase(start, end); + } + }//end of lock scope for delivery lock (TODO this is ugly, make it prettier) //if the prefetch limit had previously been reached, or credit //had expired in windowing mode there may be messages that can @@ -525,12 +527,10 @@ void SemanticState::ConsumerImpl::addMessageCredit(uint32_t value) void SemanticState::ConsumerImpl::flush() { //need to prevent delivery after requestDispatch returns but - //before credit is reduced to zero; TODO: come up with better - //implementation of flush. - Mutex::ScopedLock l(lock); - queue->requestDispatch(this, true); - byteCredit = 0; - msgCredit = 0; + //before credit is reduced to zero + FlushCompletion completion(*this); + queue->flush(completion); + completion.wait(); } void SemanticState::ConsumerImpl::stop() @@ -599,4 +599,19 @@ void SemanticState::reject(DeliveryId first, DeliveryId last) unacked.erase(range.start, range.end); } + +void SemanticState::FlushCompletion::wait() +{ + Monitor::ScopedLock locker(lock); + while (!complete) lock.wait(); +} + +void SemanticState::FlushCompletion::completed() +{ + Monitor::ScopedLock locker(lock); + consumer.stop(); + complete = true; + lock.notifyAll(); +} + }} // namespace qpid::broker |
