summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2008-07-10 14:44:01 +0000
committerGordon Sim <gsim@apache.org>2008-07-10 14:44:01 +0000
commit13ba086a8edc90d5e0d415e5116d748cec459822 (patch)
treec0c8a7d69d57ff60a61983359c45b1ea819ea2c3
parentcc8e46324bee3299678e7a9fbd4e3e5726a153b5 (diff)
downloadqpid-python-13ba086a8edc90d5e0d415e5116d748cec459822.tar.gz
Add a get() method to subscription manager that retrieves one message from the specified queue if available, returns false otherwise.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-10@675598 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--cpp/src/Makefile.am1
-rw-r--r--cpp/src/qpid/client/FlowControl.h73
-rw-r--r--cpp/src/qpid/client/LocalQueue.cpp21
-rw-r--r--cpp/src/qpid/client/LocalQueue.h22
-rw-r--r--cpp/src/qpid/client/SubscriptionManager.cpp45
-rw-r--r--cpp/src/qpid/client/SubscriptionManager.h68
-rw-r--r--cpp/src/qpid/sys/BlockingQueue.h97
-rw-r--r--cpp/src/qpid/sys/Waitable.h10
-rw-r--r--cpp/src/tests/ClientSessionTest.cpp32
-rw-r--r--cpp/src/tests/ConcurrentQueue.cpp208
-rwxr-xr-xcpp/src/tests/ais_run15
11 files changed, 280 insertions, 312 deletions
diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am
index cc594b0f3b..931167b3ac 100644
--- a/cpp/src/Makefile.am
+++ b/cpp/src/Makefile.am
@@ -443,6 +443,7 @@ nobase_include_HEADERS = \
qpid/client/Demux.h \
qpid/client/Dispatcher.h \
qpid/client/Execution.h \
+ qpid/client/FlowControl.h \
qpid/client/Future.h \
qpid/client/FutureCompletion.h \
qpid/client/FutureResult.h \
diff --git a/cpp/src/qpid/client/FlowControl.h b/cpp/src/qpid/client/FlowControl.h
new file mode 100644
index 0000000000..a4ed9879f4
--- /dev/null
+++ b/cpp/src/qpid/client/FlowControl.h
@@ -0,0 +1,73 @@
+#ifndef QPID_CLIENT_FLOWCONTROL_H
+#define QPID_CLIENT_FLOWCONTROL_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+namespace qpid {
+namespace client {
+
+/**
+ * Flow control works by associating a finite amount of "credit"
+ * associated with a subscription.
+ *
+ * Credit includes a message count and a byte count. Each message
+ * received decreases the message count by one, and the byte count by
+ * the size of the message. Either count can have the special value
+ * UNLIMITED which is never decreased.
+ *
+ * A subscription's credit is exhausted when the message count is 0 or
+ * the byte count is too small for the next available message. The
+ * subscription will not receive any further messages until is credit
+ * is renewed.
+ *
+ * In "window mode" credit is automatically renewed when a message is
+ * acknowledged (@see AckPolicy) In non-window mode credit is not
+ * automatically renewed, it must be explicitly re-set (@see
+ * SubscriptionManager)
+ */
+struct FlowControl {
+ static const uint32_t UNLIMITED=0xFFFFFFFF;
+ FlowControl(uint32_t messages_=0, uint32_t bytes_=0, bool window_=false)
+ : messages(messages_), bytes(bytes_), window(window_) {}
+
+ static FlowControl messageCredit(uint32_t messages_) { return FlowControl(messages_,UNLIMITED,false); }
+ static FlowControl messageWindow(uint32_t messages_) { return FlowControl(messages_,UNLIMITED,true); }
+ static FlowControl byteCredit(uint32_t bytes_) { return FlowControl(UNLIMITED,bytes_,false); }
+ static FlowControl byteWindow(uint32_t bytes_) { return FlowControl(UNLIMITED,bytes_,true); }
+ static FlowControl unlimited() { return FlowControl(UNLIMITED, UNLIMITED, false); }
+ static FlowControl zero() { return FlowControl(0, 0, false); }
+
+ /** Message credit: subscription can accept up to this many messages. */
+ uint32_t messages;
+ /** Byte credit: subscription can accept up to this many bytes of message content. */
+ uint32_t bytes;
+ /** Window mode. If true credit is automatically renewed as messages are acknowledged. */
+ bool window;
+
+ bool operator==(const FlowControl& x) {
+ return messages == x.messages && bytes == x.bytes && window == x.window;
+ };
+};
+
+}} // namespace qpid::client
+
+#endif /*!QPID_CLIENT_FLOWCONTROL_H*/
diff --git a/cpp/src/qpid/client/LocalQueue.cpp b/cpp/src/qpid/client/LocalQueue.cpp
index 04cee40a37..99ab6f0133 100644
--- a/cpp/src/qpid/client/LocalQueue.cpp
+++ b/cpp/src/qpid/client/LocalQueue.cpp
@@ -31,14 +31,25 @@ using namespace framing;
LocalQueue::LocalQueue(AckPolicy a) : autoAck(a) {}
LocalQueue::~LocalQueue() {}
-Message LocalQueue::pop() {
+Message LocalQueue::pop() { return get(); }
+
+Message LocalQueue::get() {
+ Message result;
+ bool ok = get(result, sys::TIME_INFINITE);
+ assert(ok); (void) ok;
+ return result;
+}
+
+bool LocalQueue::get(Message& result, sys::Duration timeout) {
if (!queue)
throw ClosedException();
- FrameSet::shared_ptr content = queue->pop();
+ FrameSet::shared_ptr content;
+ bool ok = queue->pop(content, timeout);
+ if (!ok) return false;
if (content->isA<MessageTransferBody>()) {
- Message m(*content);
- autoAck.ack(m, session);
- return m;
+ result = Message(*content);
+ autoAck.ack(result, session);
+ return true;
}
else
throw CommandInvalidException(
diff --git a/cpp/src/qpid/client/LocalQueue.h b/cpp/src/qpid/client/LocalQueue.h
index c76d0756eb..f81065ef3c 100644
--- a/cpp/src/qpid/client/LocalQueue.h
+++ b/cpp/src/qpid/client/LocalQueue.h
@@ -25,6 +25,7 @@
#include "qpid/client/Message.h"
#include "qpid/client/Demux.h"
#include "qpid/client/AckPolicy.h"
+#include "qpid/sys/Time.h"
namespace qpid {
namespace client {
@@ -42,7 +43,7 @@ class LocalQueue
public:
/** Create a local queue. Subscribe the local queue to a remote broker
* queue with a SubscriptionManager.
- *
+ *
* LocalQueue is an alternative to implementing a MessageListener.
*
*@param ackPolicy Policy for acknowledging messages. @see AckPolicy.
@@ -51,14 +52,22 @@ class LocalQueue
~LocalQueue();
- /** Pop the next message off the local queue.
+ /** Wait up to timeout for the next message from the local queue.
+ *@param result Set to the message from the queue.
+ *@param timeout wait up this timeout for a message to appear.
+ *@return true if result was set, false if queue was empty after timeout.
+ */
+ bool get(Message& result, sys::Duration timeout=0);
+
+ /** Get the next message off the local queue, or wait for a
+ * message from the broker queue.
*@exception ClosedException if subscription has been closed.
*/
- Message pop();
+ Message get();
/** Synonym for get(). */
- Message get() { return pop(); }
-
+ Message pop();
+
/** Return true if local queue is empty. */
bool empty() const;
@@ -72,10 +81,11 @@ class LocalQueue
AckPolicy& getAckPolicy();
private:
- friend class SubscriptionManager;
Session session;
Demux::QueuePtr queue;
AckPolicy autoAck;
+
+ friend class SubscriptionManager;
};
}} // namespace qpid::client
diff --git a/cpp/src/qpid/client/SubscriptionManager.cpp b/cpp/src/qpid/client/SubscriptionManager.cpp
index 3fa75a54ac..9bb75f9a49 100644
--- a/cpp/src/qpid/client/SubscriptionManager.cpp
+++ b/cpp/src/qpid/client/SubscriptionManager.cpp
@@ -25,6 +25,7 @@
#include <qpid/client/Dispatcher.h>
#include <qpid/client/Session.h>
#include <qpid/client/MessageListener.h>
+#include <qpid/framing/Uuid.h>
#include <set>
#include <sstream>
@@ -34,35 +35,48 @@ namespace client {
SubscriptionManager::SubscriptionManager(const Session& s)
: dispatcher(s), session(s),
- messages(UNLIMITED), bytes(UNLIMITED), window(true),
+ flowControl(UNLIMITED, UNLIMITED, false),
acceptMode(0), acquireMode(0),
autoStop(true)
{}
void SubscriptionManager::subscribeInternal(
- const std::string& q, const std::string& dest)
+ const std::string& q, const std::string& dest, const FlowControl& fc)
{
session.messageSubscribe(
arg::queue=q, arg::destination=dest,
arg::acceptMode=acceptMode, arg::acquireMode=acquireMode);
- setFlowControl(dest, messages, bytes, window);
+ if (fc.messages || fc.bytes) // No need to set if all 0.
+ setFlowControl(dest, fc);
}
void SubscriptionManager::subscribe(
MessageListener& listener, const std::string& q, const std::string& d)
{
+ subscribe(listener, q, getFlowControl(), d);
+}
+
+void SubscriptionManager::subscribe(
+ MessageListener& listener, const std::string& q, const FlowControl& fc, const std::string& d)
+{
std::string dest=d.empty() ? q:d;
dispatcher.listen(dest, &listener, autoAck);
- return subscribeInternal(q, dest);
+ return subscribeInternal(q, dest, fc);
}
void SubscriptionManager::subscribe(
LocalQueue& lq, const std::string& q, const std::string& d)
{
+ subscribe(lq, q, getFlowControl(), d);
+}
+
+void SubscriptionManager::subscribe(
+ LocalQueue& lq, const std::string& q, const FlowControl& fc, const std::string& d)
+{
std::string dest=d.empty() ? q:d;
lq.session=session;
lq.queue=session.getExecution().getDemux().add(dest, ByTransferDest(dest));
- return subscribeInternal(q, dest);
+ return subscribeInternal(q, dest, fc);
}
void SubscriptionManager::setFlowControl(
@@ -74,14 +88,20 @@ void SubscriptionManager::setFlowControl(
session.sync();
}
+void SubscriptionManager::setFlowControl(const std::string& dest, const FlowControl& fc) {
+ setFlowControl(dest, fc.messages, fc.bytes, fc.window);
+}
+
+void SubscriptionManager::setFlowControl(const FlowControl& fc) { flowControl=fc; }
+
void SubscriptionManager::setFlowControl(
uint32_t messages_, uint32_t bytes_, bool window_)
{
- messages=messages_;
- bytes=bytes_;
- window=window_;
+ setFlowControl(FlowControl(messages_, bytes_, window_));
}
+const FlowControl& SubscriptionManager::getFlowControl() const { return flowControl; }
+
void SubscriptionManager::setAcceptMode(bool c) { acceptMode=c; }
void SubscriptionManager::setAcquireMode(bool a) { acquireMode=a; }
@@ -109,6 +129,15 @@ void SubscriptionManager::stop()
dispatcher.stop();
}
+bool SubscriptionManager::get(Message& result, const std::string& queue, sys::Duration timeout) {
+ LocalQueue lq;
+ std::string unique = framing::Uuid(true).str();
+ subscribe(lq, queue, FlowControl::messageCredit(1), unique);
+ AutoCancel ac(*this, unique);
+ sync(session).messageFlush(unique);
+ return lq.get(result, timeout);
+}
+
}} // namespace qpid::client
#endif
diff --git a/cpp/src/qpid/client/SubscriptionManager.h b/cpp/src/qpid/client/SubscriptionManager.h
index 930175564e..3dad15fd29 100644
--- a/cpp/src/qpid/client/SubscriptionManager.h
+++ b/cpp/src/qpid/client/SubscriptionManager.h
@@ -27,8 +27,8 @@
#include <qpid/client/Session.h>
#include <qpid/client/MessageListener.h>
#include <qpid/client/LocalQueue.h>
+#include <qpid/client/FlowControl.h>
#include <qpid/sys/Runnable.h>
-
#include <set>
#include <sstream>
@@ -48,13 +48,11 @@ class SubscriptionManager : public sys::Runnable
typedef sys::Mutex::ScopedLock Lock;
typedef sys::Mutex::ScopedUnlock Unlock;
- void subscribeInternal(const std::string& q, const std::string& dest);
+ void subscribeInternal(const std::string& q, const std::string& dest, const FlowControl&);
qpid::client::Dispatcher dispatcher;
qpid::client::AsyncSession session;
- uint32_t messages;
- uint32_t bytes;
- bool window;
+ FlowControl flowControl;
AckPolicy autoAck;
bool acceptMode;
bool acquireMode;
@@ -72,6 +70,38 @@ class SubscriptionManager : public sys::Runnable
*
*@param listener Listener object to receive messages.
*@param queue Name of the queue to subscribe to.
+ *@param flow initial FlowControl for the subscription.
+ *@param tag Unique destination tag for the listener.
+ * If not specified, the queue name is used.
+ */
+ void subscribe(MessageListener& listener,
+ const std::string& queue,
+ const FlowControl& flow,
+ const std::string& tag=std::string());
+
+ /**
+ * Subscribe a LocalQueue to receive messages from queue.
+ *
+ * Incoming messages are stored in the queue for you to retrieve.
+ *
+ *@param queue Name of the queue to subscribe to.
+ *@param flow initial FlowControl for the subscription.
+ *@param tag Unique destination tag for the listener.
+ * If not specified, the queue name is used.
+ */
+ void subscribe(LocalQueue& localQueue,
+ const std::string& queue,
+ const FlowControl& flow,
+ const std::string& tag=std::string());
+
+ /**
+ * Subscribe a MessagesListener to receive messages from queue.
+ *
+ * Provide your own subclass of MessagesListener to process
+ * incoming messages. It will be called for each message received.
+ *
+ *@param listener Listener object to receive messages.
+ *@param queue Name of the queue to subscribe to.
*@param tag Unique destination tag for the listener.
* If not specified, the queue name is used.
*/
@@ -92,6 +122,15 @@ class SubscriptionManager : public sys::Runnable
const std::string& queue,
const std::string& tag=std::string());
+
+ /** Get a single message from a queue.
+ *@param result is set to the message from the queue.
+ *@
+ *@param timeout wait up this timeout for a message to appear.
+ *@return true if result was set, false if no message available after timeout.
+ */
+ bool get(Message& result, const std::string& queue, sys::Duration timeout=0);
+
/** Cancel a subscription. */
void cancel(const std::string tag);
@@ -107,9 +146,17 @@ class SubscriptionManager : public sys::Runnable
/** Cause run() to return */
void stop();
-
static const uint32_t UNLIMITED=0xFFFFFFFF;
+ /** Set the flow control for destination. */
+ void setFlowControl(const std::string& destintion, const FlowControl& flow);
+
+ /** Set the default initial flow control for subscriptions that do not specify it. */
+ void setFlowControl(const FlowControl& flow);
+
+ /** Get the default flow control for new subscriptions that do not specify it. */
+ const FlowControl& getFlowControl() const;
+
/** Set the flow control for destination tag.
*@param tag: name of the destination.
*@param messages: message credit.
@@ -148,6 +195,15 @@ class SubscriptionManager : public sys::Runnable
AckPolicy& getAckPolicy();
};
+/** AutoCancel cancels a subscription in its destructor */
+class AutoCancel {
+ public:
+ AutoCancel(SubscriptionManager& sm_, const std::string& tag_) : sm(sm_), tag(tag_) {}
+ ~AutoCancel() { sm.cancel(tag); }
+ private:
+ SubscriptionManager& sm;
+ std::string tag;
+};
}} // namespace qpid::client
diff --git a/cpp/src/qpid/sys/BlockingQueue.h b/cpp/src/qpid/sys/BlockingQueue.h
index 86020fad81..4402c242e6 100644
--- a/cpp/src/qpid/sys/BlockingQueue.h
+++ b/cpp/src/qpid/sys/BlockingQueue.h
@@ -35,47 +35,45 @@ namespace sys {
template <class T>
class BlockingQueue
{
- mutable sys::Waitable lock;
+ mutable sys::Waitable waitable;
std::queue<T> queue;
- bool closed;
public:
- BlockingQueue() : closed(false) {}
+ BlockingQueue() {}
~BlockingQueue() { close(); }
- /** Block until there is a value to pop */
- T pop()
- {
- Waitable::ScopedLock l(lock);
- if (!queueWait()) throw ClosedException();
- return popInternal();
- }
-
- /** Non-blocking pop. If there is a value set outValue and return
- * true, else return false;
+ /** Pop from the queue, block up to timeout if empty.
+ *@param result Set to value popped from queue.
+ *@param timeout Defaults to infinite.
+ *@return true if result was set, false if queue empty after timeout.
*/
- bool tryPop(T& outValue) {
- Waitable::ScopedLock l(lock);
+ bool pop(T& result, Duration timeout=TIME_INFINITE) {
+ Mutex::ScopedLock l(waitable);
+ {
+ Waitable::ScopedWait w(waitable);
+ AbsTime deadline(now(),timeout);
+ while (queue.empty() && deadline > now()) waitable.wait(deadline);
+ }
if (queue.empty()) return false;
- outValue = popInternal();
+ result = queue.front();
+ queue.pop();
+ if (!queue.empty())
+ waitable.notify(); // Notify another waiter.
return true;
}
- /** Non-blocking pop. If there is a value return it, else return
- * valueIfEmpty.
- */
- T tryPop(const T& valueIfEmpty=T()) {
- T result=valueIfEmpty;
- tryPop(result);
+ T pop() {
+ T result;
+ bool ok = pop(result);
+ assert(ok); (void) ok; // Infinite wait.
return result;
}
-
+
/** Push a value onto the queue */
- void push(const T& t)
- {
- Waitable::ScopedLock l(lock);
+ void push(const T& t) {
+ Mutex::ScopedLock l(waitable);
queue.push(t);
- queueNotify(0);
+ waitable.notify(); // Notify a waiter.
}
/**
@@ -84,56 +82,33 @@ public:
*/
void close(const ExceptionHolder& ex=ExceptionHolder(new ClosedException()))
{
- Waitable::ScopedLock l(lock);
- if (!closed) {
- lock.setException(ex);
- closed = true;
- lock.notifyAll();
- lock.waitWaiters(); // Ensure no threads are still waiting.
+ Mutex::ScopedLock l(waitable);
+ if (!waitable.hasException()) {
+ waitable.setException(ex);
+ waitable.notifyAll();
+ waitable.waitWaiters(); // Ensure no threads are still waiting.
}
}
/** Open a closed queue. */
void open() {
- Waitable::ScopedLock l(lock);
- closed=false;
+ Mutex::ScopedLock l(waitable);
+ waitable.resetException();
}
bool isClosed() const {
- Waitable::ScopedLock l(lock);
- return closed;
+ Mutex::ScopedLock l(waitable);
+ return waitable.hasException();
}
bool empty() const {
- Waitable::ScopedLock l(lock);
+ Mutex::ScopedLock l(waitable);
return queue.empty();
}
size_t size() const {
- Waitable::ScopedLock l(lock);
+ Mutex::ScopedLock l(waitable);
return queue.size();
}
-
- private:
-
- void queueNotify(size_t ignore) {
- if (!queue.empty() && lock.hasWaiters()>ignore)
- lock.notify(); // Notify another waiter.
- }
-
- bool queueWait() {
- Waitable::ScopedWait w(lock);
- while (!closed && queue.empty())
- lock.wait();
- return !queue.empty();
- }
-
- T popInternal() {
- T t=queue.front();
- queue.pop();
- queueNotify(1);
- return t;
- }
-
};
}}
diff --git a/cpp/src/qpid/sys/Waitable.h b/cpp/src/qpid/sys/Waitable.h
index 61b7e7d82b..7701b6f97d 100644
--- a/cpp/src/qpid/sys/Waitable.h
+++ b/cpp/src/qpid/sys/Waitable.h
@@ -76,6 +76,12 @@ class Waitable : public Monitor {
}
+ /** True if the waitable has an exception */
+ bool hasException() const { return exception; }
+
+ /** Clear the exception if any */
+ void resetException() { exception.reset(); }
+
/** Throws an exception if one is set before or during the wait. */
void wait() {
ExCheck e(exception);
@@ -88,8 +94,6 @@ class Waitable : public Monitor {
return Monitor::wait(absoluteTime);
}
- ExceptionHolder exception;
-
private:
struct ExCheck {
const ExceptionHolder& exception;
@@ -98,6 +102,8 @@ class Waitable : public Monitor {
};
size_t waiters;
+ ExceptionHolder exception;
+
friend struct ScopedWait;
};
diff --git a/cpp/src/tests/ClientSessionTest.cpp b/cpp/src/tests/ClientSessionTest.cpp
index 83c3317094..5861dec9fc 100644
--- a/cpp/src/tests/ClientSessionTest.cpp
+++ b/cpp/src/tests/ClientSessionTest.cpp
@@ -41,6 +41,7 @@ using namespace qpid::client::arg;
using namespace qpid::framing;
using namespace qpid;
using qpid::sys::Monitor;
+using qpid::sys::TIME_SEC;
using std::string;
using std::cout;
using std::endl;
@@ -203,7 +204,7 @@ QPID_AUTO_TEST_CASE(testSendToSelf) {
ClientSessionFixture fix;
SimpleListener mylistener;
fix.session.queueDeclare(queue="myq", exclusive=true, autoDelete=true);
- fix.subs.subscribe(mylistener, "myq", "myq");
+ fix.subs.subscribe(mylistener, "myq");
sys::Thread runner(fix.subs);//start dispatcher thread
string data("msg");
Message msg(data, "myq");
@@ -222,5 +223,34 @@ QPID_AUTO_TEST_CASE(testSendToSelf) {
}
}
+QPID_AUTO_TEST_CASE(testLocalQueue) {
+ ClientSessionFixture fix;
+ fix.session.queueDeclare(queue="lq", exclusive=true, autoDelete=true);
+ LocalQueue lq;
+ fix.subs.subscribe(lq, "lq", FlowControl(2, FlowControl::UNLIMITED, false));
+ fix.session.messageTransfer(content=Message("foo0", "lq"));
+ fix.session.messageTransfer(content=Message("foo1", "lq"));
+ fix.session.messageTransfer(content=Message("foo2", "lq"));
+ BOOST_CHECK_EQUAL("foo0", lq.pop().getData());
+ BOOST_CHECK_EQUAL("foo1", lq.pop().getData());
+ BOOST_CHECK(lq.empty()); // Credit exhausted.
+ fix.subs.setFlowControl("lq", FlowControl::unlimited());
+ BOOST_CHECK_EQUAL("foo2", lq.pop().getData());
+}
+
+QPID_AUTO_TEST_CASE(testGet) {
+ ClientSessionFixture fix;
+ fix.session.queueDeclare(queue="getq", exclusive=true, autoDelete=true);
+ fix.session.messageTransfer(content=Message("foo0", "getq"));
+ fix.session.messageTransfer(content=Message("foo1", "getq"));
+ Message got;
+ BOOST_CHECK(fix.subs.get(got, "getq", TIME_SEC));
+ BOOST_CHECK_EQUAL("foo0", got.getData());
+ BOOST_CHECK(fix.subs.get(got, "getq", TIME_SEC));
+ BOOST_CHECK_EQUAL("foo1", got.getData());
+ BOOST_CHECK(!fix.subs.get(got, "getq"));
+}
+
QPID_AUTO_TEST_SUITE_END()
+
diff --git a/cpp/src/tests/ConcurrentQueue.cpp b/cpp/src/tests/ConcurrentQueue.cpp
deleted file mode 100644
index c6ca40e897..0000000000
--- a/cpp/src/tests/ConcurrentQueue.cpp
+++ /dev/null
@@ -1,208 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-/**@file
- * Compare alternative implementations for BlockingQueue.
- */
-
-#include "qpid/sys/BlockingQueue.h"
-#include "qpid/sys/Thread.h"
-#include "qpid/sys/Monitor.h"
-#include "qpid/sys/Runnable.h"
-#include "qpid/sys/Time.h"
-
-#include <boost/test/test_tools.hpp>
-#include <boost/bind.hpp>
-
-#include <deque>
-#include <vector>
-#include <iostream>
-
-#include "time.h"
-
-using namespace qpid::sys;
-using namespace std;
-
-template <class T> class DualVectorDualLockQueue {
- public:
- /** Optionally specify initial capacity of the queue to minimize
- * re-allocation.
- */
- DualVectorDualLockQueue(size_t capacity=16) {
- pushVec.reserve(capacity);
- popVec.reserve(capacity);
- popIter = popVec.end();
- }
-
- /** Push a data item onto the back of the queue */
- void push(const T& data) {
- Mutex::ScopedLock l(pushLock);
- pushVec.push_back(data);
- }
-
- /** If the queue is non-empty, pop the front item into data and
- * return true. If the queue is empty, return false
- */
- bool tryPop(T& data) {
- Mutex::ScopedLock l(popLock);
- if (popIter == popVec.end()) {
- popVec.clear();
- Mutex::ScopedLock l(pushLock);
- pushVec.swap(popVec);
- popIter = popVec.begin();
- }
- if (popIter == popVec.end())
- return false;
- else {
- data = *popIter++;
- return true;
- }
- }
-
- private:
- Mutex pushLock, popLock;
- std::vector<T> pushVec, popVec;
- typename std::vector<T>::iterator popIter;
-};
-
-template <class T> struct LockedDequeQueue : public BlockingQueue<T> {
- /** size_t ignored, can't pre-allocate space in a dequeue */
- LockedDequeQueue(size_t=0) {};
-};
-
-// ================ Test code.
-
-/** Pause by sleeping */
-void nsleep(const Duration& delay) {
- static Monitor m;
- AbsTime stop(now(), delay);
- while (now() < stop)
- m.wait(stop);
-}
-
-/** Pause by spinning */
-void nspin(const Duration& delay) {
- AbsTime stop(now(), delay);
- while (now() < stop)
- ;
-}
-
-/** Unlocked fake queue for comparison */
-struct NullQueue {
- NullQueue(int items=0) : npush(items), npop(items) {}
- void push(int) { --npush; }
- bool tryPop(int& n) {
- if (npop == 0)
- return false;
- else {
- n=npop--;
- return true;
- }
- }
- volatile int npush, npop;
-};
-
-
-// Global test parameters.
-int items;
-Duration delay(0);
-boost::function<void()> npause;
-
-template <class Q>
-struct Pusher : public Runnable {
- Pusher(Q& q) : queue(q) {}
- void run() {
- for (int i=items; i > 0; i--) {
- queue.push(i);
- npause();
- }
- }
- Q& queue;
-};
-
-template <class Q>
-struct Popper : public Runnable {
- Popper(Q& q) : queue(q) {}
- void run() {
- for (int i=items; i > 0; i--) {
- int n;
- if (queue.tryPop(n))
- BOOST_REQUIRE_EQUAL(i,n);
- npause();
- }
- }
- Q& queue;
-};
-
-ostream& operator<<(ostream& out, const Duration& d) {
- return out << double(d)/TIME_MSEC << " msecs";
-}
-
-void report(const char* s, const Duration &d) {
- cout << s << ": " << d
- << " (" << (double(items)*TIME_SEC)/d << " push-pops/sec" << ")"
- << endl;
-}
-
-template <class Q, class PusherT=Pusher<Q>, class PopperT=Popper<Q> >
-struct Timer {
- static Duration time() {
- cout << endl << "==" << typeid(Q).name() << endl;
-
- Q queue(items);
- PusherT pusher(queue);
- PopperT popper(queue);
-
- // Serial
- AbsTime start=now();
- pusher.run();
- popper.run();
- Duration serial(start,now());
- report ("Serial", serial);
-
- // Concurrent
- start=now();
- Thread pushThread(pusher);
- Thread popThread(popper);
- pushThread.join();
- popThread.join();
- Duration concurrent(start,now());
- report ("Concurrent", concurrent);
-
- cout << "Serial/concurrent: " << double(serial)/concurrent << endl;
- return concurrent;
- }
-};
-
-int test_main(int argc, char** argv) {
- items = (argc > 1) ? atoi(argv[1]) : 250*1000;
- delay = (argc > 2) ? atoi(argv[2]) : 4*1000;
- npause=boost::bind(nspin, delay);
-
- cout << "Push/pop " << items << " items, delay=" << delay << endl;
- Timer<NullQueue>::time();
- Duration dv = Timer<DualVectorDualLockQueue<int> >::time();
- Duration d = Timer<LockedDequeQueue<int> >::time();
- cout << endl;
- cout << "Ratio deque/dual vector=" << double(d)/dv << endl;
- return 0;
-}
-// namespace
diff --git a/cpp/src/tests/ais_run b/cpp/src/tests/ais_run
deleted file mode 100755
index 0f45edc39c..0000000000
--- a/cpp/src/tests/ais_run
+++ /dev/null
@@ -1,15 +0,0 @@
-#!/bin/sh
-#
-# Run AIS tests, assumes that ais_check has passed and we are
-# running with the ais group ID.
-#
-
-# FIXME aconway 2008-01-30: we should valgrind the cluster brokers.
-
-srcdir=`dirname $0`
-$srcdir/start_cluster 4
-./ais_test
-ret=$?
-$srcdir/stop_cluster
-exit $ret
-