summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2011-07-25 13:12:29 +0000
committerAlan Conway <aconway@apache.org>2011-07-25 13:12:29 +0000
commitda62c2bfa9d7f0382620432b68cf372a9118a6fd (patch)
treef3d968c65e4408d9c676ca90143fac41ad03175d
parent2acf99898514e6c44c8817f2962a64258c6e301a (diff)
downloadqpid-python-da62c2bfa9d7f0382620432b68cf372a9118a6fd.tar.gz
QPID-2920: Allow stopping consumers on queues.
Stop consumers from dispatching and wait for already dispatching consumers to exit. git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-2920@1150685 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.cpp27
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.h47
-rw-r--r--qpid/cpp/src/qpid/sys/Stoppable.h91
-rw-r--r--qpid/cpp/src/tests/QueueTest.cpp49
4 files changed, 184 insertions, 30 deletions
diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp
index 48ba320e39..6f2d1f95ce 100644
--- a/qpid/cpp/src/qpid/broker/Queue.cpp
+++ b/qpid/cpp/src/qpid/broker/Queue.cpp
@@ -373,11 +373,18 @@ void Queue::removeListener(Consumer::shared_ptr c)
bool Queue::dispatch(Consumer::shared_ptr c)
{
- QueuedMessage msg(this);
- if (getNextMessage(msg, c)) {
- c->deliver(msg);
- return true;
- } else {
+ Stoppable::Scope doDispatch(dispatching);
+ if (doDispatch) {
+ QueuedMessage msg(this);
+ if (getNextMessage(msg, c)) {
+ c->deliver(msg);
+ return true;
+ } else {
+ return false;
+ }
+ } else { // Dispatching is stopped
+ Mutex::ScopedLock locker(messageLock);
+ listeners.addListener(c); // FIXME aconway 2011-05-05:
return false;
}
}
@@ -1257,3 +1264,13 @@ void Queue::UsageBarrier::destroy()
parent.deleted = true;
while (count) parent.messageLock.wait();
}
+
+// FIXME aconway 2011-05-06: naming - only affects consumers. stopDispatch()?
+void Queue::stop() {
+ dispatching.stop();
+}
+
+void Queue::start() {
+ dispatching.start();
+ notifyListener();
+}
diff --git a/qpid/cpp/src/qpid/broker/Queue.h b/qpid/cpp/src/qpid/broker/Queue.h
index 73d52ec9ca..1588ae1171 100644
--- a/qpid/cpp/src/qpid/broker/Queue.h
+++ b/qpid/cpp/src/qpid/broker/Queue.h
@@ -10,9 +10,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -35,7 +35,7 @@
#include "qpid/broker/RateTracker.h"
#include "qpid/framing/FieldTable.h"
-#include "qpid/sys/Monitor.h"
+#include "qpid/sys/Stoppable.h"
#include "qpid/sys/Timer.h"
#include "qpid/management/Manageable.h"
#include "qmf/org/apache/qpid/broker/Queue.h"
@@ -70,17 +70,18 @@ class Exchange;
class Queue : public boost::enable_shared_from_this<Queue>,
public PersistableQueue, public management::Manageable {
+ // Used to prevent destruction of the queue while it is in use.
struct UsageBarrier
{
Queue& parent;
uint count;
-
+
UsageBarrier(Queue&);
bool acquire();
void release();
void destroy();
};
-
+
struct ScopedUse
{
UsageBarrier& barrier;
@@ -88,7 +89,7 @@ class Queue : public boost::enable_shared_from_this<Queue>,
ScopedUse(UsageBarrier& b) : barrier(b), acquired(barrier.acquire()) {}
~ScopedUse() { if (acquired) barrier.release(); }
};
-
+
typedef std::set< boost::shared_ptr<QueueObserver> > Observers;
enum ConsumeCode {NO_MESSAGES=0, CANT_CONSUME=1, CONSUMED=2};
@@ -129,6 +130,8 @@ class Queue : public boost::enable_shared_from_this<Queue>,
UsageBarrier barrier;
int autoDeleteTimeout;
boost::intrusive_ptr<qpid::sys::TimerTask> autoDeleteTask;
+ // Allow dispatching consumer threads to be stopped.
+ sys::Stoppable dispatching;
void push(boost::intrusive_ptr<Message>& msg, bool isRecovery=false);
void setPolicy(std::auto_ptr<QueuePolicy> policy);
@@ -184,8 +187,8 @@ class Queue : public boost::enable_shared_from_this<Queue>,
typedef std::vector<shared_ptr> vector;
QPID_BROKER_EXTERN Queue(const std::string& name,
- bool autodelete = false,
- MessageStore* const store = 0,
+ bool autodelete = false,
+ MessageStore* const store = 0,
const OwnershipToken* const owner = 0,
management::Manageable* parent = 0,
Broker* broker = 0);
@@ -245,11 +248,11 @@ class Queue : public boost::enable_shared_from_this<Queue>,
bool exclusive = false);
QPID_BROKER_EXTERN void cancel(Consumer::shared_ptr c);
- uint32_t purge(const uint32_t purge_request=0, boost::shared_ptr<Exchange> dest=boost::shared_ptr<Exchange>()); //defaults to all messages
+ uint32_t purge(const uint32_t purge_request=0, boost::shared_ptr<Exchange> dest=boost::shared_ptr<Exchange>()); //defaults to all messages
QPID_BROKER_EXTERN void purgeExpired();
//move qty # of messages to destination Queue destq
- uint32_t move(const Queue::shared_ptr destq, uint32_t qty);
+ uint32_t move(const Queue::shared_ptr destq, uint32_t qty);
QPID_BROKER_EXTERN uint32_t getMessageCount() const;
QPID_BROKER_EXTERN uint32_t getEnqueueCompleteMessageCount() const;
@@ -288,8 +291,8 @@ class Queue : public boost::enable_shared_from_this<Queue>,
* Inform queue of messages that were enqueued, have since
* been acquired but not yet accepted or released (and
* thus are still logically on the queue) - used in
- * clustered broker.
- */
+ * clustered broker.
+ */
void updateEnqueued(const QueuedMessage& msg);
/**
@@ -300,9 +303,9 @@ class Queue : public boost::enable_shared_from_this<Queue>,
* accepted it).
*/
bool isEnqueued(const QueuedMessage& msg);
-
+
/**
- * Gets the next available message
+ * Gets the next available message
*/
QPID_BROKER_EXTERN QueuedMessage get();
@@ -377,9 +380,21 @@ class Queue : public boost::enable_shared_from_this<Queue>,
void flush();
const Broker* getBroker();
+
+ /** Stop consumers. Return when all consumer threads are stopped.
+ *@pre Queue is active and not already stopping.
+ */
+ void stop();
+
+ /** Start consumers.
+ *@pre Queue is stopped and idle: no thread in dispatch.
+ */
+ void start();
+
+ /** Context data attached and used by cluster code. */
+ boost::intrusive_ptr<qpid::RefCounted> clusterContext;
};
-}
-}
+}} // qpid::broker
#endif /*!_broker_Queue_h*/
diff --git a/qpid/cpp/src/qpid/sys/Stoppable.h b/qpid/cpp/src/qpid/sys/Stoppable.h
new file mode 100644
index 0000000000..af21af46ba
--- /dev/null
+++ b/qpid/cpp/src/qpid/sys/Stoppable.h
@@ -0,0 +1,91 @@
+#ifndef QPID_SYS_STOPPABLE_H
+#define QPID_SYS_STOPPABLE_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+namespace qpid {
+namespace sys {
+
+/**
+ * An activity that may be executed by multiple threads, and can be stopped.
+ * Stopping prevents new threads from entering and waits till exiting busy threads leave.
+ */
+class Stoppable {
+ public:
+ Stoppable() : busy(0), stopped(false) {}
+ ~Stoppable() { stop(); }
+
+ /** Mark the scope of a busy thread like this:
+ * <pre>
+ * {
+ * Stoppable::Scope working(stoppable);
+ * if (working) { do stuff }
+ * }
+ * </pre>
+ */
+ class Scope {
+ Stoppable& state;
+ bool entered;
+ public:
+ Scope(Stoppable& s) : state(s) { entered = s.enter(); }
+ ~Scope() { if (entered) state.exit(); }
+ operator bool() const { return entered; }
+ };
+
+ friend class Scope;
+
+ /** Mark stopped, wait for all threads to leave their busy scope. */
+ void stop() {
+ sys::Monitor::ScopedLock l(lock);
+ stopped = true;
+ while (busy > 0) lock.wait();
+ }
+
+ /** Set the state to started.
+ *@pre state is stopped and no theads are busy.
+ */
+ void start() {
+ sys::Monitor::ScopedLock l(lock);
+ assert(stopped && busy == 0); // FIXME aconway 2011-05-06: error handling.
+ stopped = false;
+ }
+
+ private:
+ uint busy;
+ bool stopped;
+ sys::Monitor lock;
+
+ bool enter() {
+ sys::Monitor::ScopedLock l(lock);
+ if (!stopped) ++busy;
+ return !stopped;
+ }
+
+ void exit() {
+ sys::Monitor::ScopedLock l(lock);
+ assert(busy > 0);
+ if (--busy == 0) lock.notifyAll();
+ }
+};
+
+}} // namespace qpid::sys
+
+#endif /*!QPID_SYS_STOPPABLE_H*/
diff --git a/qpid/cpp/src/tests/QueueTest.cpp b/qpid/cpp/src/tests/QueueTest.cpp
index 2059727e7b..21139331d9 100644
--- a/qpid/cpp/src/tests/QueueTest.cpp
+++ b/qpid/cpp/src/tests/QueueTest.cpp
@@ -1,4 +1,4 @@
- /*
+/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@@ -41,6 +41,7 @@
#include <iostream>
#include "boost/format.hpp"
+#include <boost/enable_shared_from_this.hpp>
using boost::intrusive_ptr;
using namespace qpid;
@@ -57,16 +58,22 @@ public:
typedef boost::shared_ptr<TestConsumer> shared_ptr;
intrusive_ptr<Message> last;
- bool received;
- TestConsumer(bool acquire = true):Consumer(acquire), received(false) {};
+ bool received, notified;
+
+ TestConsumer(bool acquire = true):
+ Consumer(acquire), received(false), notified(false) {};
virtual bool deliver(QueuedMessage& msg){
last = msg.payload;
received = true;
return true;
};
- void notify() {}
+ void notify() {
+ notified = true;
+ }
+
OwnershipToken* getSession() { return 0; }
+ void reset() { last = intrusive_ptr<Message>(); received = false; }
};
class FailOnDeliver : public Deliverable
@@ -303,11 +310,11 @@ QPID_AUTO_TEST_CASE(testSeek){
QueuedMessage qm;
queue->dispatch(consumer);
-
+
BOOST_CHECK_EQUAL(msg3.get(), consumer->last.get());
queue->dispatch(consumer);
queue->dispatch(consumer); // make sure over-run is safe
-
+
}
QPID_AUTO_TEST_CASE(testSearch){
@@ -325,15 +332,15 @@ QPID_AUTO_TEST_CASE(testSearch){
SequenceNumber seq(2);
QueuedMessage qm = queue->find(seq);
-
+
BOOST_CHECK_EQUAL(seq.getValue(), qm.position.getValue());
-
+
queue->acquire(qm);
BOOST_CHECK_EQUAL(queue->getMessageCount(), 2u);
SequenceNumber seq1(3);
QueuedMessage qm1 = queue->find(seq1);
BOOST_CHECK_EQUAL(seq1.getValue(), qm1.position.getValue());
-
+
}
const std::string nullxid = "";
@@ -1106,6 +1113,30 @@ QPID_AUTO_TEST_CASE(testFlowToDiskBlocking){
BOOST_CHECK_EQUAL(5u, tq9->getMessageCount());
}
+QPID_AUTO_TEST_CASE(testStopStart) {
+ boost::shared_ptr<Queue> q(new Queue("foo"));
+ boost::shared_ptr<TestConsumer> c(new TestConsumer);
+ intrusive_ptr<Message> m = create_message("x","y");
+ q->consume(c);
+ // Initially q is started.
+ q->deliver(m);
+ BOOST_CHECK(q->dispatch(c));
+ BOOST_CHECK(c->received);
+ c->reset();
+ // Stop q, should not receive message
+ q->stop();
+ q->deliver(m);
+ BOOST_CHECK(!q->dispatch(c));
+ BOOST_CHECK(!c->received);
+ BOOST_CHECK(!c->notified);
+ // Start q, should be notified and delivered
+ q->start();
+ q->deliver(m);
+ BOOST_CHECK(c->notified);
+ BOOST_CHECK(q->dispatch(c));
+ BOOST_CHECK(c->received);
+}
+
QPID_AUTO_TEST_SUITE_END()