summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/SemanticState.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/SemanticState.cpp')
-rw-r--r--cpp/src/qpid/broker/SemanticState.cpp87
1 files changed, 51 insertions, 36 deletions
diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp
index 1d49e08eb0..09f5b8ce98 100644
--- a/cpp/src/qpid/broker/SemanticState.cpp
+++ b/cpp/src/qpid/broker/SemanticState.cpp
@@ -346,38 +346,40 @@ void SemanticState::ackRange(DeliveryId first, DeliveryId last)
void SemanticState::ack(DeliveryId first, DeliveryId last, bool cumulative)
{
- Mutex::ScopedLock locker(deliveryLock);//need to synchronize with possible concurrent delivery
-
- ack_iterator start = cumulative ? unacked.begin() :
- find_if(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::matchOrAfter), first));
- ack_iterator end = start;
-
- if (cumulative || first != last) {
- //need to find end (position it just after the last record in range)
- end = find_if(start, unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::after), last));
- } else {
- //just acked single element (move end past it)
- ++end;
- }
-
- for_each(start, end, boost::bind(&SemanticState::acknowledged, this, _1));
-
- if (txBuffer.get()) {
- //in transactional mode, don't dequeue or remove, just
- //maintain set of acknowledged messages:
- accumulatedAck.update(cumulative ? accumulatedAck.mark : first, last);
+ {
+ Mutex::ScopedLock locker(deliveryLock);//need to synchronize with possible concurrent delivery
+
+ ack_iterator start = cumulative ? unacked.begin() :
+ find_if(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::matchOrAfter), first));
+ ack_iterator end = start;
- if (dtxBuffer.get()) {
- //if enlisted in a dtx, remove the relevant slice from
- //unacked and record it against that transaction
- TxOp::shared_ptr txAck(new DtxAck(accumulatedAck, unacked));
- accumulatedAck.clear();
- dtxBuffer->enlist(txAck);
+ if (cumulative || first != last) {
+ //need to find end (position it just after the last record in range)
+ end = find_if(start, unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::after), last));
+ } else {
+ //just acked single element (move end past it)
+ ++end;
}
- } else {
- for_each(start, end, bind2nd(mem_fun_ref(&DeliveryRecord::dequeue), 0));
- unacked.erase(start, end);
- }
+
+ for_each(start, end, boost::bind(&SemanticState::acknowledged, this, _1));
+
+ if (txBuffer.get()) {
+ //in transactional mode, don't dequeue or remove, just
+ //maintain set of acknowledged messages:
+ accumulatedAck.update(cumulative ? accumulatedAck.mark : first, last);
+
+ if (dtxBuffer.get()) {
+ //if enlisted in a dtx, remove the relevant slice from
+ //unacked and record it against that transaction
+ TxOp::shared_ptr txAck(new DtxAck(accumulatedAck, unacked));
+ accumulatedAck.clear();
+ dtxBuffer->enlist(txAck);
+ }
+ } else {
+ for_each(start, end, bind2nd(mem_fun_ref(&DeliveryRecord::dequeue), 0));
+ unacked.erase(start, end);
+ }
+ }//end of lock scope for delivery lock (TODO this is ugly, make it prettier)
//if the prefetch limit had previously been reached, or credit
//had expired in windowing mode there may be messages that can
@@ -525,12 +527,10 @@ void SemanticState::ConsumerImpl::addMessageCredit(uint32_t value)
void SemanticState::ConsumerImpl::flush()
{
//need to prevent delivery after requestDispatch returns but
- //before credit is reduced to zero; TODO: come up with better
- //implementation of flush.
- Mutex::ScopedLock l(lock);
- queue->requestDispatch(this, true);
- byteCredit = 0;
- msgCredit = 0;
+ //before credit is reduced to zero
+ FlushCompletion completion(*this);
+ queue->flush(completion);
+ completion.wait();
}
void SemanticState::ConsumerImpl::stop()
@@ -599,4 +599,19 @@ void SemanticState::reject(DeliveryId first, DeliveryId last)
unacked.erase(range.start, range.end);
}
+
+void SemanticState::FlushCompletion::wait()
+{
+ Monitor::ScopedLock locker(lock);
+ while (!complete) lock.wait();
+}
+
+void SemanticState::FlushCompletion::completed()
+{
+ Monitor::ScopedLock locker(lock);
+ consumer.stop();
+ complete = true;
+ lock.notifyAll();
+}
+
}} // namespace qpid::broker