summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2007-11-08 15:55:41 +0000
committerGordon Sim <gsim@apache.org>2007-11-08 15:55:41 +0000
commit3ca5d076d75d1b9b67694ea6599bc9b9d8598858 (patch)
tree738100e6d2759432fe335f1db2fa2bf636918cd0
parent6e1a188f6615cf9a3e302bb5821042290271398c (diff)
downloadqpid-python-3ca5d076d75d1b9b67694ea6599bc9b9d8598858.tar.gz
Exception handling for dispatch functor
Don't dequeue no-ack messages unless they have been acquired git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@593212 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.cpp15
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.h10
-rw-r--r--qpid/cpp/src/qpid/broker/SemanticState.cpp2
3 files changed, 17 insertions, 10 deletions
diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp
index a250009c77..1484fe464e 100644
--- a/qpid/cpp/src/qpid/broker/Queue.cpp
+++ b/qpid/cpp/src/qpid/broker/Queue.cpp
@@ -556,3 +556,18 @@ bool Queue::hasExclusiveConsumer() const
{
return exclusive;
}
+
+void Queue::DispatchFunctor::operator()()
+{
+ try {
+ if (consumer && !consumer->preAcquires()) {
+ queue.serviceBrowser(consumer);
+ }else{
+ queue.dispatch();
+ }
+ } catch (const std::exception& e) {
+ QPID_LOG(error, "Exception on dispatch: " << e.what());
+ }
+
+ if (sync) sync->completed();
+}
diff --git a/qpid/cpp/src/qpid/broker/Queue.h b/qpid/cpp/src/qpid/broker/Queue.h
index e554c1011a..20eb7c0800 100644
--- a/qpid/cpp/src/qpid/broker/Queue.h
+++ b/qpid/cpp/src/qpid/broker/Queue.h
@@ -71,15 +71,7 @@ namespace qpid {
DispatchFunctor(Queue& q, DispatchCompletion* s = 0) : queue(q), sync(s) {}
DispatchFunctor(Queue& q, Consumer::ptr c, DispatchCompletion* s = 0) : queue(q), consumer(c), sync(s) {}
- void operator()()
- {
- if (consumer && !consumer->preAcquires()) {
- queue.serviceBrowser(consumer);
- }else{
- queue.dispatch();
- }
- if (sync) sync->completed();
- }
+ void operator()();
};
const string name;
diff --git a/qpid/cpp/src/qpid/broker/SemanticState.cpp b/qpid/cpp/src/qpid/broker/SemanticState.cpp
index 8651b9034c..d844cc5086 100644
--- a/qpid/cpp/src/qpid/broker/SemanticState.cpp
+++ b/qpid/cpp/src/qpid/broker/SemanticState.cpp
@@ -280,7 +280,7 @@ bool SemanticState::ConsumerImpl::deliver(QueuedMessage& msg)
parent->deliveryAdapter.deliver(msg.payload, token);
if (windowing || ackExpected) {
parent->record(DeliveryRecord(msg, queue, name, token, deliveryTag, acquire, !ackExpected));
- } else if (!ackExpected) {
+ } else if (acquire && !ackExpected) {
queue->dequeue(0, msg.payload);
}
}