diff options
| author | Alan Conway <aconway@apache.org> | 2008-08-06 21:17:19 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2008-08-06 21:17:19 +0000 |
| commit | 03c3f6edb2b17841053250bda46fde1054d32d67 (patch) | |
| tree | 09f7c1d5ddbde595164414f1c0ba1d860675a315 /cpp/src/qpid/broker | |
| parent | 31b2f1436808fc07f636dc02416386e44b23071c (diff) | |
| download | qpid-python-03c3f6edb2b17841053250bda46fde1054d32d67.tar.gz | |
- Added OutputTask::hasOutput() test.
- Cluster only sends doOutput events when hasOutput()
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@683416 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker')
| -rw-r--r-- | cpp/src/qpid/broker/Connection.cpp | 2 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/Connection.h | 1 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/Queue.cpp | 6 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/Queue.h | 6 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/SemanticState.cpp | 5 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/SemanticState.h | 2 |
6 files changed, 20 insertions, 2 deletions
diff --git a/cpp/src/qpid/broker/Connection.cpp b/cpp/src/qpid/broker/Connection.cpp index 2525fed864..ab18d1f035 100644 --- a/cpp/src/qpid/broker/Connection.cpp +++ b/cpp/src/qpid/broker/Connection.cpp @@ -193,6 +193,8 @@ void Connection::closedImpl(){ // Physically closed, suspend open sessions. } } +bool Connection::hasOutput() { return outputTasks.hasOutput(); } + bool Connection::doOutput() { return doOutputFn(); } bool Connection::doOutputImpl() { diff --git a/cpp/src/qpid/broker/Connection.h b/cpp/src/qpid/broker/Connection.h index ae8708861a..1367f3b9ca 100644 --- a/cpp/src/qpid/broker/Connection.h +++ b/cpp/src/qpid/broker/Connection.h @@ -76,6 +76,7 @@ class Connection : public sys::ConnectionInputHandler, void received(framing::AMQFrame& frame); void idleOut(); void idleIn(); + bool hasOutput(); bool doOutput(); void closed(); diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index ebb143a472..3b447e97f2 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -212,6 +212,11 @@ bool Queue::getNextMessage(QueuedMessage& m, Consumer& c) } } +bool Queue::empty() const { + Mutex::ScopedLock locker(messageLock); + return messages.empty(); +} + bool Queue::consumeNextMessage(QueuedMessage& m, Consumer& c) { while (true) { @@ -348,7 +353,6 @@ void Queue::consume(Consumer& c, bool requestExclusive){ } } consumerCount++; - if (mgmtObject != 0) mgmtObject->inc_consumerCount (); } diff --git a/cpp/src/qpid/broker/Queue.h b/cpp/src/qpid/broker/Queue.h index 2d238ff57d..e35b3ef7ee 100644 --- a/cpp/src/qpid/broker/Queue.h +++ b/cpp/src/qpid/broker/Queue.h @@ -107,7 +107,6 @@ namespace qpid { void notify(); void removeListener(Consumer&); - void addListener(Consumer&); bool isExcluded(boost::intrusive_ptr<Message>& msg); @@ -115,6 +114,9 @@ namespace qpid { void popAndDequeue(); public: + // FIXME aconway 2008-08-06: was private, verify if needed public. + void addListener(Consumer&); + virtual void notifyDurableIOComplete(); typedef boost::shared_ptr<Queue> shared_ptr; @@ -126,6 +128,8 @@ namespace qpid { management::Manageable* parent = 0); ~Queue(); + bool empty() const; + bool dispatch(Consumer&); void create(const qpid::framing::FieldTable& settings); diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp index 484a406c3b..bf034a0559 100644 --- a/cpp/src/qpid/broker/SemanticState.cpp +++ b/cpp/src/qpid/broker/SemanticState.cpp @@ -590,6 +590,11 @@ void SemanticState::reject(DeliveryId first, DeliveryId last) unacked.erase(range.start, range.end); } +bool SemanticState::ConsumerImpl::hasOutput() { + queue->addListener(*this); + return !queue->empty(); +} + bool SemanticState::ConsumerImpl::doOutput() { //TODO: think through properly diff --git a/cpp/src/qpid/broker/SemanticState.h b/cpp/src/qpid/broker/SemanticState.h index a0424bf747..e03d5ec89b 100644 --- a/cpp/src/qpid/broker/SemanticState.h +++ b/cpp/src/qpid/broker/SemanticState.h @@ -96,6 +96,7 @@ class SemanticState : public sys::OutputTask, Queue::shared_ptr getQueue() { return queue; } bool isBlocked() const { return blocked; } + bool hasOutput(); bool doOutput(); }; @@ -180,6 +181,7 @@ class SemanticState : public sys::OutputTask, void release(DeliveryId first, DeliveryId last, bool setRedelivered); void reject(DeliveryId first, DeliveryId last); void handle(boost::intrusive_ptr<Message> msg); + bool hasOutput() { return outputTasks.hasOutput(); } bool doOutput() { return outputTasks.doOutput(); } //final 0-10 spec (completed and accepted are distinct): |
