summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2008-08-06 21:17:19 +0000
committerAlan Conway <aconway@apache.org>2008-08-06 21:17:19 +0000
commit03c3f6edb2b17841053250bda46fde1054d32d67 (patch)
tree09f7c1d5ddbde595164414f1c0ba1d860675a315 /cpp/src/qpid/broker
parent31b2f1436808fc07f636dc02416386e44b23071c (diff)
downloadqpid-python-03c3f6edb2b17841053250bda46fde1054d32d67.tar.gz
- Added OutputTask::hasOutput() test.
- Cluster only sends doOutput events when hasOutput() git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@683416 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker')
-rw-r--r--cpp/src/qpid/broker/Connection.cpp2
-rw-r--r--cpp/src/qpid/broker/Connection.h1
-rw-r--r--cpp/src/qpid/broker/Queue.cpp6
-rw-r--r--cpp/src/qpid/broker/Queue.h6
-rw-r--r--cpp/src/qpid/broker/SemanticState.cpp5
-rw-r--r--cpp/src/qpid/broker/SemanticState.h2
6 files changed, 20 insertions, 2 deletions
diff --git a/cpp/src/qpid/broker/Connection.cpp b/cpp/src/qpid/broker/Connection.cpp
index 2525fed864..ab18d1f035 100644
--- a/cpp/src/qpid/broker/Connection.cpp
+++ b/cpp/src/qpid/broker/Connection.cpp
@@ -193,6 +193,8 @@ void Connection::closedImpl(){ // Physically closed, suspend open sessions.
}
}
+bool Connection::hasOutput() { return outputTasks.hasOutput(); }
+
bool Connection::doOutput() { return doOutputFn(); }
bool Connection::doOutputImpl() {
diff --git a/cpp/src/qpid/broker/Connection.h b/cpp/src/qpid/broker/Connection.h
index ae8708861a..1367f3b9ca 100644
--- a/cpp/src/qpid/broker/Connection.h
+++ b/cpp/src/qpid/broker/Connection.h
@@ -76,6 +76,7 @@ class Connection : public sys::ConnectionInputHandler,
void received(framing::AMQFrame& frame);
void idleOut();
void idleIn();
+ bool hasOutput();
bool doOutput();
void closed();
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp
index ebb143a472..3b447e97f2 100644
--- a/cpp/src/qpid/broker/Queue.cpp
+++ b/cpp/src/qpid/broker/Queue.cpp
@@ -212,6 +212,11 @@ bool Queue::getNextMessage(QueuedMessage& m, Consumer& c)
}
}
+bool Queue::empty() const {
+ Mutex::ScopedLock locker(messageLock);
+ return messages.empty();
+}
+
bool Queue::consumeNextMessage(QueuedMessage& m, Consumer& c)
{
while (true) {
@@ -348,7 +353,6 @@ void Queue::consume(Consumer& c, bool requestExclusive){
}
}
consumerCount++;
-
if (mgmtObject != 0)
mgmtObject->inc_consumerCount ();
}
diff --git a/cpp/src/qpid/broker/Queue.h b/cpp/src/qpid/broker/Queue.h
index 2d238ff57d..e35b3ef7ee 100644
--- a/cpp/src/qpid/broker/Queue.h
+++ b/cpp/src/qpid/broker/Queue.h
@@ -107,7 +107,6 @@ namespace qpid {
void notify();
void removeListener(Consumer&);
- void addListener(Consumer&);
bool isExcluded(boost::intrusive_ptr<Message>& msg);
@@ -115,6 +114,9 @@ namespace qpid {
void popAndDequeue();
public:
+ // FIXME aconway 2008-08-06: was private, verify if needed public.
+ void addListener(Consumer&);
+
virtual void notifyDurableIOComplete();
typedef boost::shared_ptr<Queue> shared_ptr;
@@ -126,6 +128,8 @@ namespace qpid {
management::Manageable* parent = 0);
~Queue();
+ bool empty() const;
+
bool dispatch(Consumer&);
void create(const qpid::framing::FieldTable& settings);
diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp
index 484a406c3b..bf034a0559 100644
--- a/cpp/src/qpid/broker/SemanticState.cpp
+++ b/cpp/src/qpid/broker/SemanticState.cpp
@@ -590,6 +590,11 @@ void SemanticState::reject(DeliveryId first, DeliveryId last)
unacked.erase(range.start, range.end);
}
+bool SemanticState::ConsumerImpl::hasOutput() {
+ queue->addListener(*this);
+ return !queue->empty();
+}
+
bool SemanticState::ConsumerImpl::doOutput()
{
//TODO: think through properly
diff --git a/cpp/src/qpid/broker/SemanticState.h b/cpp/src/qpid/broker/SemanticState.h
index a0424bf747..e03d5ec89b 100644
--- a/cpp/src/qpid/broker/SemanticState.h
+++ b/cpp/src/qpid/broker/SemanticState.h
@@ -96,6 +96,7 @@ class SemanticState : public sys::OutputTask,
Queue::shared_ptr getQueue() { return queue; }
bool isBlocked() const { return blocked; }
+ bool hasOutput();
bool doOutput();
};
@@ -180,6 +181,7 @@ class SemanticState : public sys::OutputTask,
void release(DeliveryId first, DeliveryId last, bool setRedelivered);
void reject(DeliveryId first, DeliveryId last);
void handle(boost::intrusive_ptr<Message> msg);
+ bool hasOutput() { return outputTasks.hasOutput(); }
bool doOutput() { return outputTasks.doOutput(); }
//final 0-10 spec (completed and accepted are distinct):