summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/sys
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2007-07-26 15:47:23 +0000
committerAlan Conway <aconway@apache.org>2007-07-26 15:47:23 +0000
commit233cc9184c758702d8fa4a83d1bf8ec7dc0b3474 (patch)
tree6a73a6dfb117218e8fd94c8b447def68e0ab9de0 /cpp/src/qpid/sys
parent89a8765ee2bac1d77be65f1011ffeeb2cbbabe2d (diff)
downloadqpid-python-233cc9184c758702d8fa4a83d1bf8ec7dc0b3474.tar.gz
* README: Instructions for openais install.
* configure.ac: Enable clustering if suitable openais is present. * src/tests/Cluster.cpp, .h, Cluster_child: Updated for 0-10 * src/qpid/sys/ConcurrentQueue.h: Added waitPop() * src/Makefile.am, src/qpid/sys/ThreadSafeQueue.h, ProducerConsumer.h: Removed unused code, ConcurrentQueue provides same functionality. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@559859 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/sys')
-rw-r--r--cpp/src/qpid/sys/ConcurrentQueue.h53
-rw-r--r--cpp/src/qpid/sys/ProducerConsumer.cpp141
-rw-r--r--cpp/src/qpid/sys/ProducerConsumer.h165
-rw-r--r--cpp/src/qpid/sys/ThreadSafeQueue.h98
4 files changed, 50 insertions, 407 deletions
diff --git a/cpp/src/qpid/sys/ConcurrentQueue.h b/cpp/src/qpid/sys/ConcurrentQueue.h
index dd7689666b..917afc5704 100644
--- a/cpp/src/qpid/sys/ConcurrentQueue.h
+++ b/cpp/src/qpid/sys/ConcurrentQueue.h
@@ -22,7 +22,10 @@
*
*/
-#include "qpid/sys/Mutex.h"
+#include "qpid/sys/Monitor.h"
+#include "qpid/sys/ScopedIncrement.h"
+
+#include <boost/bind.hpp>
#include <deque>
@@ -33,9 +36,24 @@ namespace sys {
* Thread-safe queue that allows threads to push items onto
* the queue concurrently with threads popping items off the
* queue.
+ *
+ * Also allows consuming threads to wait until an item is available.
*/
template <class T> class ConcurrentQueue {
public:
+ ConcurrentQueue() : waiters(0), shutdown(false) {}
+
+ /** Threads in wait() are woken with ShutdownException before
+ * destroying the queue.
+ */
+ ~ConcurrentQueue() {
+ Mutex::ScopedLock l(lock);
+ shutdown = true;
+ lock.notifyAll();
+ while (waiters > 0)
+ lock.wait();
+ }
+
/** Push a data item onto the back of the queue */
void push(const T& data) {
Mutex::ScopedLock l(lock);
@@ -47,6 +65,28 @@ template <class T> class ConcurrentQueue {
*/
bool pop(T& data) {
Mutex::ScopedLock l(lock);
+ return popInternal(data);
+ }
+
+ /** Wait up to deadline for a data item to be available.
+ *@return true if data was available, false if timed out.
+ *@throws ShutdownException if the queue is destroyed.
+ */
+ bool waitPop(T& data, Duration timeout) {
+ Mutex::ScopedLock l(lock);
+ ScopedIncrement<size_t> w(
+ waiters, boost::bind(&ConcurrentQueue::noWaiters, this));
+ AbsTime deadline(now(), timeout);
+ while (queue.empty() && lock.wait(deadline))
+ ;
+ return popInternal(data);
+ }
+
+ private:
+
+ bool popInternal(T& data) {
+ if (shutdown)
+ throw ShutdownException();
if (queue.empty())
return false;
else {
@@ -56,9 +96,16 @@ template <class T> class ConcurrentQueue {
}
}
- private:
- Mutex lock;
+ void noWaiters() {
+ assert(waiters == 0);
+ if (shutdown)
+ lock.notify(); // Notify dtor thread.
+ }
+
+ Monitor lock;
std::deque<T> queue;
+ size_t waiters;
+ bool shutdown;
};
}} // namespace qpid::sys
diff --git a/cpp/src/qpid/sys/ProducerConsumer.cpp b/cpp/src/qpid/sys/ProducerConsumer.cpp
deleted file mode 100644
index e892f60794..0000000000
--- a/cpp/src/qpid/sys/ProducerConsumer.cpp
+++ /dev/null
@@ -1,141 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-
-#include "qpid/QpidError.h"
-#include "ScopedIncrement.h"
-#include "ProducerConsumer.h"
-
-namespace qpid {
-namespace sys {
-
-// // ================ ProducerConsumer
-
-ProducerConsumer::ProducerConsumer(size_t init_items)
- : items(init_items), waiters(0), shutdownFlag(false)
-{}
-
-void ProducerConsumer::shutdown() {
- Mutex::ScopedLock l(monitor);
- shutdownFlag = true;
- monitor.notifyAll();
- // Wait for waiting consumers to wake up.
- while (waiters > 0)
- monitor.wait();
-}
-
-size_t ProducerConsumer::available() const {
- Mutex::ScopedLock l(monitor);
- return items;
-}
-
-size_t ProducerConsumer::consumers() const {
- Mutex::ScopedLock l(monitor);
- return waiters;
-}
-
-// ================ Lock
-
-ProducerConsumer::Lock::Lock(ProducerConsumer& p)
- : pc(p), lock(p.monitor), status(INCOMPLETE) {}
-
-bool ProducerConsumer::Lock::isOk() const {
- return !pc.isShutdown() && status==INCOMPLETE;
-}
-
-void ProducerConsumer::Lock::checkOk() const {
- assert(!pc.isShutdown());
- assert(status == INCOMPLETE);
-}
-
-ProducerConsumer::Lock::~Lock() {
- assert(status != INCOMPLETE || pc.isShutdown());
-}
-
-void ProducerConsumer::Lock::confirm() {
- checkOk();
- status = CONFIRMED;
-}
-
-void ProducerConsumer::Lock::cancel() {
- checkOk();
- status = CANCELLED;
-}
-
-// ================ ProducerLock
-
-ProducerConsumer::ProducerLock::ProducerLock(ProducerConsumer& p) : Lock(p)
-{}
-
-
-ProducerConsumer::ProducerLock::~ProducerLock() {
- if (status == CONFIRMED) {
- pc.items++;
- pc.monitor.notify(); // Notify a consumer.
- }
-}
-
-// ================ ConsumerLock
-
-ProducerConsumer::ConsumerLock::ConsumerLock(ProducerConsumer& p) : Lock(p)
-{
- if (isOk()) {
- ScopedIncrement<size_t> inc(pc.waiters);
- while (pc.items == 0 && !pc.shutdownFlag) {
- pc.monitor.wait();
- }
- }
-}
-
-ProducerConsumer::ConsumerLock::ConsumerLock(
- ProducerConsumer& p, const Duration& timeout) : Lock(p)
-{
- if (isOk()) {
- // Don't wait if timeout==0
- if (timeout == 0) {
- if (pc.items == 0)
- status = TIMEOUT;
- return;
- }
- else {
- AbsTime deadline(now(), timeout);
- ScopedIncrement<size_t> inc(pc.waiters);
- while (pc.items == 0 && !pc.shutdownFlag) {
- if (!pc.monitor.wait(deadline)) {
- status = TIMEOUT;
- return;
- }
- }
- }
- }
-}
-
-ProducerConsumer::ConsumerLock::~ConsumerLock() {
- if (pc.isShutdown()) {
- if (pc.waiters == 0)
- pc.monitor.notifyAll(); // Notify shutdown thread(s)
- }
- else if (status==CONFIRMED) {
- pc.items--;
- if (pc.items > 0)
- pc.monitor.notify(); // Notify another consumer.
- }
-}
-
-
-}} // namespace qpid::sys
diff --git a/cpp/src/qpid/sys/ProducerConsumer.h b/cpp/src/qpid/sys/ProducerConsumer.h
deleted file mode 100644
index 2a02dab503..0000000000
--- a/cpp/src/qpid/sys/ProducerConsumer.h
+++ /dev/null
@@ -1,165 +0,0 @@
-#ifndef _sys_ProducerConsumer_h
-#define _sys_ProducerConsumer_h
-
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#include <boost/noncopyable.hpp>
-#include "qpid/Exception.h"
-#include "Monitor.h"
-
-namespace qpid {
-namespace sys {
-
-/**
- * Producer-consumer synchronisation.
- *
- * Producers increase the number of available items, consumers reduce it.
- * Consumers wait till an item is available. Waiting threads can be
- * woken for shutdown using shutdown().
- *
- * Note: Currently implements unbounded producer-consumer, i.e. no limit
- * to available items, producers never block. Can be extended to support
- * bounded PC if required.
- *
- // TODO aconway 2007-02-13: example, from tests.
-*/
-class ProducerConsumer
-{
- public:
- ProducerConsumer(size_t init_items=0);
-
- ~ProducerConsumer() { shutdown(); }
-
- /**
- * Wake any threads waiting for ProducerLock or ConsumerLock.
- *@post No threads are waiting in Producer or Consumer locks.
- */
- void shutdown();
-
- /** True if queue is shutdown */
- bool isShutdown() { return shutdownFlag; }
-
- /** Number of items available for consumers */
- size_t available() const;
-
- /** Number of consumers waiting for items */
- size_t consumers() const;
-
- /** True if available == 0 */
- bool empty() const { return available() == 0; }
-
- /**
- * Base class for producer and consumer locks.
- */
- class Lock : private boost::noncopyable {
- public:
-
- /**
- * You must call isOk() after creating a lock to verify its state.
- *
- *@return true means the lock succeeded. You MUST call either
- *confirm() or cancel() before the lock goes out of scope.
- *
- * false means the lock failed - timed out or the
- * ProducerConsumer is shutdown. You should not do anything in
- * the scope of the lock.
- */
- bool isOk() const;
-
- /**
- * Confirm that an item was produced/consumed.
- *@pre isOk()
- */
- void confirm();
-
- /**
- * Cancel the lock to indicate nothing was produced/consumed.
- * Note that locks are not actually released until destroyed.
- *
- *@pre isOk()
- */
- void cancel();
-
- /** True if this lock experienced a timeout */
- bool isTimedOut() const { return status == TIMEOUT; }
-
- /** True if we have been shutdown */
- bool isShutdown() const { return pc.isShutdown(); }
-
- ProducerConsumer& pc;
-
- protected:
- /** Lock status */
- enum Status { INCOMPLETE, CONFIRMED, CANCELLED, TIMEOUT };
-
- Lock(ProducerConsumer& p);
- ~Lock();
- void checkOk() const;
- Mutex::ScopedLock lock;
- Status status;
- };
-
- /** Lock for code that produces items. */
- struct ProducerLock : public Lock {
- /**
- * Acquire locks to produce an item.
- *@post If isOk() the calling thread has exclusive access
- * to produce an item.
- */
- ProducerLock(ProducerConsumer& p);
-
- /** Release locks, signal waiting consumers if confirm() was called. */
- ~ProducerLock();
- };
-
- /** Lock for code that consumes items */
- struct ConsumerLock : public Lock {
- /**
- * Wait for an item to consume and acquire locks.
- *
- *@post If isOk() there is at least one item available and the
- *calling thread has exclusive access to consume it.
- */
- ConsumerLock(ProducerConsumer& p);
-
- /**
- * Wait up to timeout to acquire lock.
- *@post If isOk() caller has a producer lock.
- * If isTimedOut() there was a timeout.
- * If neither then we were shutdown.
- */
- ConsumerLock(ProducerConsumer& p, const Duration& timeout);
-
- /** Release locks */
- ~ConsumerLock();
- };
-
- private:
- mutable Monitor monitor;
- size_t items;
- size_t waiters;
- bool shutdownFlag;
-
- friend class Lock;
- friend class ProducerLock;
- friend class ConsumerLock;
-};
-
-}} // namespace qpid::sys
-
-#endif /*!_sys_ProducerConsumer_h*/
diff --git a/cpp/src/qpid/sys/ThreadSafeQueue.h b/cpp/src/qpid/sys/ThreadSafeQueue.h
deleted file mode 100644
index 8f11c42051..0000000000
--- a/cpp/src/qpid/sys/ThreadSafeQueue.h
+++ /dev/null
@@ -1,98 +0,0 @@
-#ifndef _sys_ThreadSafeQueue_h
-#define _sys_ThreadSafeQueue_h
-
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#include <deque>
-#include "ProducerConsumer.h"
-#include "qpid/Exception.h"
-
-namespace qpid {
-namespace sys {
-
-/**
- * A thread safe queue template.
- */
-template <class T, class ContainerType=std::deque<T> >
-class ThreadSafeQueue
-{
- public:
-
- ThreadSafeQueue() {}
-
- /** Push a value onto the back of the queue */
- void push(const T& value) {
- ProducerConsumer::ProducerLock producer(pc);
- if (producer.isOk()) {
- producer.confirm();
- container.push_back(value);
- }
- }
-
- /** Pop a value from the front of the queue. Waits till value is available.
- *@throw ShutdownException if queue is shutdown while waiting.
- */
- T pop() {
- ProducerConsumer::ConsumerLock consumer(pc);
- if (consumer.isOk()) {
- consumer.confirm();
- T value(container.front());
- container.pop_front();
- return value;
- }
- throw ShutdownException();
- }
-
- /**
- * If a value becomes available within the timeout, set outValue
- * and return true. Otherwise return false;
- */
- bool pop(T& outValue, const Time& timeout) {
- ProducerConsumer::ConsumerLock consumer(pc, timeout);
- if (consumer.isOk()) {
- consumer.confirm();
- outValue = container.front();
- container.pop_front();
- return true;
- }
- return false;
- }
-
- /** Interrupt threads waiting in pop() */
- void shutdown() { pc.shutdown(); }
-
- /** True if queue is shutdown */
- bool isShutdown() { return pc.isShutdown(); }
-
- /** Size of the queue */
- size_t size() { ProducerConsumer::Lock l(pc); return container.size(); }
-
- /** True if queue is empty */
- bool empty() { ProducerConsumer::Lock l(pc); return container.empty(); }
-
- private:
- ProducerConsumer pc;
- ContainerType container;
-};
-
-}} // namespace qpid::sys
-
-
-
-#endif /*!_sys_ThreadSafeQueue_h*/