diff options
| author | Gordon Sim <gsim@apache.org> | 2008-07-10 14:44:01 +0000 |
|---|---|---|
| committer | Gordon Sim <gsim@apache.org> | 2008-07-10 14:44:01 +0000 |
| commit | 13ba086a8edc90d5e0d415e5116d748cec459822 (patch) | |
| tree | c0c8a7d69d57ff60a61983359c45b1ea819ea2c3 | |
| parent | cc8e46324bee3299678e7a9fbd4e3e5726a153b5 (diff) | |
| download | qpid-python-13ba086a8edc90d5e0d415e5116d748cec459822.tar.gz | |
Add a get() method to subscription manager that retrieves one message from the specified queue if available, returns false otherwise.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-10@675598 13f79535-47bb-0310-9956-ffa450edef68
| -rw-r--r-- | cpp/src/Makefile.am | 1 | ||||
| -rw-r--r-- | cpp/src/qpid/client/FlowControl.h | 73 | ||||
| -rw-r--r-- | cpp/src/qpid/client/LocalQueue.cpp | 21 | ||||
| -rw-r--r-- | cpp/src/qpid/client/LocalQueue.h | 22 | ||||
| -rw-r--r-- | cpp/src/qpid/client/SubscriptionManager.cpp | 45 | ||||
| -rw-r--r-- | cpp/src/qpid/client/SubscriptionManager.h | 68 | ||||
| -rw-r--r-- | cpp/src/qpid/sys/BlockingQueue.h | 97 | ||||
| -rw-r--r-- | cpp/src/qpid/sys/Waitable.h | 10 | ||||
| -rw-r--r-- | cpp/src/tests/ClientSessionTest.cpp | 32 | ||||
| -rw-r--r-- | cpp/src/tests/ConcurrentQueue.cpp | 208 | ||||
| -rwxr-xr-x | cpp/src/tests/ais_run | 15 |
11 files changed, 280 insertions, 312 deletions
diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am index cc594b0f3b..931167b3ac 100644 --- a/cpp/src/Makefile.am +++ b/cpp/src/Makefile.am @@ -443,6 +443,7 @@ nobase_include_HEADERS = \ qpid/client/Demux.h \ qpid/client/Dispatcher.h \ qpid/client/Execution.h \ + qpid/client/FlowControl.h \ qpid/client/Future.h \ qpid/client/FutureCompletion.h \ qpid/client/FutureResult.h \ diff --git a/cpp/src/qpid/client/FlowControl.h b/cpp/src/qpid/client/FlowControl.h new file mode 100644 index 0000000000..a4ed9879f4 --- /dev/null +++ b/cpp/src/qpid/client/FlowControl.h @@ -0,0 +1,73 @@ +#ifndef QPID_CLIENT_FLOWCONTROL_H +#define QPID_CLIENT_FLOWCONTROL_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +namespace qpid { +namespace client { + +/** + * Flow control works by associating a finite amount of "credit" + * associated with a subscription. + * + * Credit includes a message count and a byte count. Each message + * received decreases the message count by one, and the byte count by + * the size of the message. Either count can have the special value + * UNLIMITED which is never decreased. + * + * A subscription's credit is exhausted when the message count is 0 or + * the byte count is too small for the next available message. The + * subscription will not receive any further messages until is credit + * is renewed. + * + * In "window mode" credit is automatically renewed when a message is + * acknowledged (@see AckPolicy) In non-window mode credit is not + * automatically renewed, it must be explicitly re-set (@see + * SubscriptionManager) + */ +struct FlowControl { + static const uint32_t UNLIMITED=0xFFFFFFFF; + FlowControl(uint32_t messages_=0, uint32_t bytes_=0, bool window_=false) + : messages(messages_), bytes(bytes_), window(window_) {} + + static FlowControl messageCredit(uint32_t messages_) { return FlowControl(messages_,UNLIMITED,false); } + static FlowControl messageWindow(uint32_t messages_) { return FlowControl(messages_,UNLIMITED,true); } + static FlowControl byteCredit(uint32_t bytes_) { return FlowControl(UNLIMITED,bytes_,false); } + static FlowControl byteWindow(uint32_t bytes_) { return FlowControl(UNLIMITED,bytes_,true); } + static FlowControl unlimited() { return FlowControl(UNLIMITED, UNLIMITED, false); } + static FlowControl zero() { return FlowControl(0, 0, false); } + + /** Message credit: subscription can accept up to this many messages. */ + uint32_t messages; + /** Byte credit: subscription can accept up to this many bytes of message content. */ + uint32_t bytes; + /** Window mode. If true credit is automatically renewed as messages are acknowledged. */ + bool window; + + bool operator==(const FlowControl& x) { + return messages == x.messages && bytes == x.bytes && window == x.window; + }; +}; + +}} // namespace qpid::client + +#endif /*!QPID_CLIENT_FLOWCONTROL_H*/ diff --git a/cpp/src/qpid/client/LocalQueue.cpp b/cpp/src/qpid/client/LocalQueue.cpp index 04cee40a37..99ab6f0133 100644 --- a/cpp/src/qpid/client/LocalQueue.cpp +++ b/cpp/src/qpid/client/LocalQueue.cpp @@ -31,14 +31,25 @@ using namespace framing; LocalQueue::LocalQueue(AckPolicy a) : autoAck(a) {} LocalQueue::~LocalQueue() {} -Message LocalQueue::pop() { +Message LocalQueue::pop() { return get(); } + +Message LocalQueue::get() { + Message result; + bool ok = get(result, sys::TIME_INFINITE); + assert(ok); (void) ok; + return result; +} + +bool LocalQueue::get(Message& result, sys::Duration timeout) { if (!queue) throw ClosedException(); - FrameSet::shared_ptr content = queue->pop(); + FrameSet::shared_ptr content; + bool ok = queue->pop(content, timeout); + if (!ok) return false; if (content->isA<MessageTransferBody>()) { - Message m(*content); - autoAck.ack(m, session); - return m; + result = Message(*content); + autoAck.ack(result, session); + return true; } else throw CommandInvalidException( diff --git a/cpp/src/qpid/client/LocalQueue.h b/cpp/src/qpid/client/LocalQueue.h index c76d0756eb..f81065ef3c 100644 --- a/cpp/src/qpid/client/LocalQueue.h +++ b/cpp/src/qpid/client/LocalQueue.h @@ -25,6 +25,7 @@ #include "qpid/client/Message.h" #include "qpid/client/Demux.h" #include "qpid/client/AckPolicy.h" +#include "qpid/sys/Time.h" namespace qpid { namespace client { @@ -42,7 +43,7 @@ class LocalQueue public: /** Create a local queue. Subscribe the local queue to a remote broker * queue with a SubscriptionManager. - * + * * LocalQueue is an alternative to implementing a MessageListener. * *@param ackPolicy Policy for acknowledging messages. @see AckPolicy. @@ -51,14 +52,22 @@ class LocalQueue ~LocalQueue(); - /** Pop the next message off the local queue. + /** Wait up to timeout for the next message from the local queue. + *@param result Set to the message from the queue. + *@param timeout wait up this timeout for a message to appear. + *@return true if result was set, false if queue was empty after timeout. + */ + bool get(Message& result, sys::Duration timeout=0); + + /** Get the next message off the local queue, or wait for a + * message from the broker queue. *@exception ClosedException if subscription has been closed. */ - Message pop(); + Message get(); /** Synonym for get(). */ - Message get() { return pop(); } - + Message pop(); + /** Return true if local queue is empty. */ bool empty() const; @@ -72,10 +81,11 @@ class LocalQueue AckPolicy& getAckPolicy(); private: - friend class SubscriptionManager; Session session; Demux::QueuePtr queue; AckPolicy autoAck; + + friend class SubscriptionManager; }; }} // namespace qpid::client diff --git a/cpp/src/qpid/client/SubscriptionManager.cpp b/cpp/src/qpid/client/SubscriptionManager.cpp index 3fa75a54ac..9bb75f9a49 100644 --- a/cpp/src/qpid/client/SubscriptionManager.cpp +++ b/cpp/src/qpid/client/SubscriptionManager.cpp @@ -25,6 +25,7 @@ #include <qpid/client/Dispatcher.h> #include <qpid/client/Session.h> #include <qpid/client/MessageListener.h> +#include <qpid/framing/Uuid.h> #include <set> #include <sstream> @@ -34,35 +35,48 @@ namespace client { SubscriptionManager::SubscriptionManager(const Session& s) : dispatcher(s), session(s), - messages(UNLIMITED), bytes(UNLIMITED), window(true), + flowControl(UNLIMITED, UNLIMITED, false), acceptMode(0), acquireMode(0), autoStop(true) {} void SubscriptionManager::subscribeInternal( - const std::string& q, const std::string& dest) + const std::string& q, const std::string& dest, const FlowControl& fc) { session.messageSubscribe( arg::queue=q, arg::destination=dest, arg::acceptMode=acceptMode, arg::acquireMode=acquireMode); - setFlowControl(dest, messages, bytes, window); + if (fc.messages || fc.bytes) // No need to set if all 0. + setFlowControl(dest, fc); } void SubscriptionManager::subscribe( MessageListener& listener, const std::string& q, const std::string& d) { + subscribe(listener, q, getFlowControl(), d); +} + +void SubscriptionManager::subscribe( + MessageListener& listener, const std::string& q, const FlowControl& fc, const std::string& d) +{ std::string dest=d.empty() ? q:d; dispatcher.listen(dest, &listener, autoAck); - return subscribeInternal(q, dest); + return subscribeInternal(q, dest, fc); } void SubscriptionManager::subscribe( LocalQueue& lq, const std::string& q, const std::string& d) { + subscribe(lq, q, getFlowControl(), d); +} + +void SubscriptionManager::subscribe( + LocalQueue& lq, const std::string& q, const FlowControl& fc, const std::string& d) +{ std::string dest=d.empty() ? q:d; lq.session=session; lq.queue=session.getExecution().getDemux().add(dest, ByTransferDest(dest)); - return subscribeInternal(q, dest); + return subscribeInternal(q, dest, fc); } void SubscriptionManager::setFlowControl( @@ -74,14 +88,20 @@ void SubscriptionManager::setFlowControl( session.sync(); } +void SubscriptionManager::setFlowControl(const std::string& dest, const FlowControl& fc) { + setFlowControl(dest, fc.messages, fc.bytes, fc.window); +} + +void SubscriptionManager::setFlowControl(const FlowControl& fc) { flowControl=fc; } + void SubscriptionManager::setFlowControl( uint32_t messages_, uint32_t bytes_, bool window_) { - messages=messages_; - bytes=bytes_; - window=window_; + setFlowControl(FlowControl(messages_, bytes_, window_)); } +const FlowControl& SubscriptionManager::getFlowControl() const { return flowControl; } + void SubscriptionManager::setAcceptMode(bool c) { acceptMode=c; } void SubscriptionManager::setAcquireMode(bool a) { acquireMode=a; } @@ -109,6 +129,15 @@ void SubscriptionManager::stop() dispatcher.stop(); } +bool SubscriptionManager::get(Message& result, const std::string& queue, sys::Duration timeout) { + LocalQueue lq; + std::string unique = framing::Uuid(true).str(); + subscribe(lq, queue, FlowControl::messageCredit(1), unique); + AutoCancel ac(*this, unique); + sync(session).messageFlush(unique); + return lq.get(result, timeout); +} + }} // namespace qpid::client #endif diff --git a/cpp/src/qpid/client/SubscriptionManager.h b/cpp/src/qpid/client/SubscriptionManager.h index 930175564e..3dad15fd29 100644 --- a/cpp/src/qpid/client/SubscriptionManager.h +++ b/cpp/src/qpid/client/SubscriptionManager.h @@ -27,8 +27,8 @@ #include <qpid/client/Session.h> #include <qpid/client/MessageListener.h> #include <qpid/client/LocalQueue.h> +#include <qpid/client/FlowControl.h> #include <qpid/sys/Runnable.h> - #include <set> #include <sstream> @@ -48,13 +48,11 @@ class SubscriptionManager : public sys::Runnable typedef sys::Mutex::ScopedLock Lock; typedef sys::Mutex::ScopedUnlock Unlock; - void subscribeInternal(const std::string& q, const std::string& dest); + void subscribeInternal(const std::string& q, const std::string& dest, const FlowControl&); qpid::client::Dispatcher dispatcher; qpid::client::AsyncSession session; - uint32_t messages; - uint32_t bytes; - bool window; + FlowControl flowControl; AckPolicy autoAck; bool acceptMode; bool acquireMode; @@ -72,6 +70,38 @@ class SubscriptionManager : public sys::Runnable * *@param listener Listener object to receive messages. *@param queue Name of the queue to subscribe to. + *@param flow initial FlowControl for the subscription. + *@param tag Unique destination tag for the listener. + * If not specified, the queue name is used. + */ + void subscribe(MessageListener& listener, + const std::string& queue, + const FlowControl& flow, + const std::string& tag=std::string()); + + /** + * Subscribe a LocalQueue to receive messages from queue. + * + * Incoming messages are stored in the queue for you to retrieve. + * + *@param queue Name of the queue to subscribe to. + *@param flow initial FlowControl for the subscription. + *@param tag Unique destination tag for the listener. + * If not specified, the queue name is used. + */ + void subscribe(LocalQueue& localQueue, + const std::string& queue, + const FlowControl& flow, + const std::string& tag=std::string()); + + /** + * Subscribe a MessagesListener to receive messages from queue. + * + * Provide your own subclass of MessagesListener to process + * incoming messages. It will be called for each message received. + * + *@param listener Listener object to receive messages. + *@param queue Name of the queue to subscribe to. *@param tag Unique destination tag for the listener. * If not specified, the queue name is used. */ @@ -92,6 +122,15 @@ class SubscriptionManager : public sys::Runnable const std::string& queue, const std::string& tag=std::string()); + + /** Get a single message from a queue. + *@param result is set to the message from the queue. + *@ + *@param timeout wait up this timeout for a message to appear. + *@return true if result was set, false if no message available after timeout. + */ + bool get(Message& result, const std::string& queue, sys::Duration timeout=0); + /** Cancel a subscription. */ void cancel(const std::string tag); @@ -107,9 +146,17 @@ class SubscriptionManager : public sys::Runnable /** Cause run() to return */ void stop(); - static const uint32_t UNLIMITED=0xFFFFFFFF; + /** Set the flow control for destination. */ + void setFlowControl(const std::string& destintion, const FlowControl& flow); + + /** Set the default initial flow control for subscriptions that do not specify it. */ + void setFlowControl(const FlowControl& flow); + + /** Get the default flow control for new subscriptions that do not specify it. */ + const FlowControl& getFlowControl() const; + /** Set the flow control for destination tag. *@param tag: name of the destination. *@param messages: message credit. @@ -148,6 +195,15 @@ class SubscriptionManager : public sys::Runnable AckPolicy& getAckPolicy(); }; +/** AutoCancel cancels a subscription in its destructor */ +class AutoCancel { + public: + AutoCancel(SubscriptionManager& sm_, const std::string& tag_) : sm(sm_), tag(tag_) {} + ~AutoCancel() { sm.cancel(tag); } + private: + SubscriptionManager& sm; + std::string tag; +}; }} // namespace qpid::client diff --git a/cpp/src/qpid/sys/BlockingQueue.h b/cpp/src/qpid/sys/BlockingQueue.h index 86020fad81..4402c242e6 100644 --- a/cpp/src/qpid/sys/BlockingQueue.h +++ b/cpp/src/qpid/sys/BlockingQueue.h @@ -35,47 +35,45 @@ namespace sys { template <class T> class BlockingQueue { - mutable sys::Waitable lock; + mutable sys::Waitable waitable; std::queue<T> queue; - bool closed; public: - BlockingQueue() : closed(false) {} + BlockingQueue() {} ~BlockingQueue() { close(); } - /** Block until there is a value to pop */ - T pop() - { - Waitable::ScopedLock l(lock); - if (!queueWait()) throw ClosedException(); - return popInternal(); - } - - /** Non-blocking pop. If there is a value set outValue and return - * true, else return false; + /** Pop from the queue, block up to timeout if empty. + *@param result Set to value popped from queue. + *@param timeout Defaults to infinite. + *@return true if result was set, false if queue empty after timeout. */ - bool tryPop(T& outValue) { - Waitable::ScopedLock l(lock); + bool pop(T& result, Duration timeout=TIME_INFINITE) { + Mutex::ScopedLock l(waitable); + { + Waitable::ScopedWait w(waitable); + AbsTime deadline(now(),timeout); + while (queue.empty() && deadline > now()) waitable.wait(deadline); + } if (queue.empty()) return false; - outValue = popInternal(); + result = queue.front(); + queue.pop(); + if (!queue.empty()) + waitable.notify(); // Notify another waiter. return true; } - /** Non-blocking pop. If there is a value return it, else return - * valueIfEmpty. - */ - T tryPop(const T& valueIfEmpty=T()) { - T result=valueIfEmpty; - tryPop(result); + T pop() { + T result; + bool ok = pop(result); + assert(ok); (void) ok; // Infinite wait. return result; } - + /** Push a value onto the queue */ - void push(const T& t) - { - Waitable::ScopedLock l(lock); + void push(const T& t) { + Mutex::ScopedLock l(waitable); queue.push(t); - queueNotify(0); + waitable.notify(); // Notify a waiter. } /** @@ -84,56 +82,33 @@ public: */ void close(const ExceptionHolder& ex=ExceptionHolder(new ClosedException())) { - Waitable::ScopedLock l(lock); - if (!closed) { - lock.setException(ex); - closed = true; - lock.notifyAll(); - lock.waitWaiters(); // Ensure no threads are still waiting. + Mutex::ScopedLock l(waitable); + if (!waitable.hasException()) { + waitable.setException(ex); + waitable.notifyAll(); + waitable.waitWaiters(); // Ensure no threads are still waiting. } } /** Open a closed queue. */ void open() { - Waitable::ScopedLock l(lock); - closed=false; + Mutex::ScopedLock l(waitable); + waitable.resetException(); } bool isClosed() const { - Waitable::ScopedLock l(lock); - return closed; + Mutex::ScopedLock l(waitable); + return waitable.hasException(); } bool empty() const { - Waitable::ScopedLock l(lock); + Mutex::ScopedLock l(waitable); return queue.empty(); } size_t size() const { - Waitable::ScopedLock l(lock); + Mutex::ScopedLock l(waitable); return queue.size(); } - - private: - - void queueNotify(size_t ignore) { - if (!queue.empty() && lock.hasWaiters()>ignore) - lock.notify(); // Notify another waiter. - } - - bool queueWait() { - Waitable::ScopedWait w(lock); - while (!closed && queue.empty()) - lock.wait(); - return !queue.empty(); - } - - T popInternal() { - T t=queue.front(); - queue.pop(); - queueNotify(1); - return t; - } - }; }} diff --git a/cpp/src/qpid/sys/Waitable.h b/cpp/src/qpid/sys/Waitable.h index 61b7e7d82b..7701b6f97d 100644 --- a/cpp/src/qpid/sys/Waitable.h +++ b/cpp/src/qpid/sys/Waitable.h @@ -76,6 +76,12 @@ class Waitable : public Monitor { } + /** True if the waitable has an exception */ + bool hasException() const { return exception; } + + /** Clear the exception if any */ + void resetException() { exception.reset(); } + /** Throws an exception if one is set before or during the wait. */ void wait() { ExCheck e(exception); @@ -88,8 +94,6 @@ class Waitable : public Monitor { return Monitor::wait(absoluteTime); } - ExceptionHolder exception; - private: struct ExCheck { const ExceptionHolder& exception; @@ -98,6 +102,8 @@ class Waitable : public Monitor { }; size_t waiters; + ExceptionHolder exception; + friend struct ScopedWait; }; diff --git a/cpp/src/tests/ClientSessionTest.cpp b/cpp/src/tests/ClientSessionTest.cpp index 83c3317094..5861dec9fc 100644 --- a/cpp/src/tests/ClientSessionTest.cpp +++ b/cpp/src/tests/ClientSessionTest.cpp @@ -41,6 +41,7 @@ using namespace qpid::client::arg; using namespace qpid::framing; using namespace qpid; using qpid::sys::Monitor; +using qpid::sys::TIME_SEC; using std::string; using std::cout; using std::endl; @@ -203,7 +204,7 @@ QPID_AUTO_TEST_CASE(testSendToSelf) { ClientSessionFixture fix; SimpleListener mylistener; fix.session.queueDeclare(queue="myq", exclusive=true, autoDelete=true); - fix.subs.subscribe(mylistener, "myq", "myq"); + fix.subs.subscribe(mylistener, "myq"); sys::Thread runner(fix.subs);//start dispatcher thread string data("msg"); Message msg(data, "myq"); @@ -222,5 +223,34 @@ QPID_AUTO_TEST_CASE(testSendToSelf) { } } +QPID_AUTO_TEST_CASE(testLocalQueue) { + ClientSessionFixture fix; + fix.session.queueDeclare(queue="lq", exclusive=true, autoDelete=true); + LocalQueue lq; + fix.subs.subscribe(lq, "lq", FlowControl(2, FlowControl::UNLIMITED, false)); + fix.session.messageTransfer(content=Message("foo0", "lq")); + fix.session.messageTransfer(content=Message("foo1", "lq")); + fix.session.messageTransfer(content=Message("foo2", "lq")); + BOOST_CHECK_EQUAL("foo0", lq.pop().getData()); + BOOST_CHECK_EQUAL("foo1", lq.pop().getData()); + BOOST_CHECK(lq.empty()); // Credit exhausted. + fix.subs.setFlowControl("lq", FlowControl::unlimited()); + BOOST_CHECK_EQUAL("foo2", lq.pop().getData()); +} + +QPID_AUTO_TEST_CASE(testGet) { + ClientSessionFixture fix; + fix.session.queueDeclare(queue="getq", exclusive=true, autoDelete=true); + fix.session.messageTransfer(content=Message("foo0", "getq")); + fix.session.messageTransfer(content=Message("foo1", "getq")); + Message got; + BOOST_CHECK(fix.subs.get(got, "getq", TIME_SEC)); + BOOST_CHECK_EQUAL("foo0", got.getData()); + BOOST_CHECK(fix.subs.get(got, "getq", TIME_SEC)); + BOOST_CHECK_EQUAL("foo1", got.getData()); + BOOST_CHECK(!fix.subs.get(got, "getq")); +} + QPID_AUTO_TEST_SUITE_END() + diff --git a/cpp/src/tests/ConcurrentQueue.cpp b/cpp/src/tests/ConcurrentQueue.cpp deleted file mode 100644 index c6ca40e897..0000000000 --- a/cpp/src/tests/ConcurrentQueue.cpp +++ /dev/null @@ -1,208 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -/**@file - * Compare alternative implementations for BlockingQueue. - */ - -#include "qpid/sys/BlockingQueue.h" -#include "qpid/sys/Thread.h" -#include "qpid/sys/Monitor.h" -#include "qpid/sys/Runnable.h" -#include "qpid/sys/Time.h" - -#include <boost/test/test_tools.hpp> -#include <boost/bind.hpp> - -#include <deque> -#include <vector> -#include <iostream> - -#include "time.h" - -using namespace qpid::sys; -using namespace std; - -template <class T> class DualVectorDualLockQueue { - public: - /** Optionally specify initial capacity of the queue to minimize - * re-allocation. - */ - DualVectorDualLockQueue(size_t capacity=16) { - pushVec.reserve(capacity); - popVec.reserve(capacity); - popIter = popVec.end(); - } - - /** Push a data item onto the back of the queue */ - void push(const T& data) { - Mutex::ScopedLock l(pushLock); - pushVec.push_back(data); - } - - /** If the queue is non-empty, pop the front item into data and - * return true. If the queue is empty, return false - */ - bool tryPop(T& data) { - Mutex::ScopedLock l(popLock); - if (popIter == popVec.end()) { - popVec.clear(); - Mutex::ScopedLock l(pushLock); - pushVec.swap(popVec); - popIter = popVec.begin(); - } - if (popIter == popVec.end()) - return false; - else { - data = *popIter++; - return true; - } - } - - private: - Mutex pushLock, popLock; - std::vector<T> pushVec, popVec; - typename std::vector<T>::iterator popIter; -}; - -template <class T> struct LockedDequeQueue : public BlockingQueue<T> { - /** size_t ignored, can't pre-allocate space in a dequeue */ - LockedDequeQueue(size_t=0) {}; -}; - -// ================ Test code. - -/** Pause by sleeping */ -void nsleep(const Duration& delay) { - static Monitor m; - AbsTime stop(now(), delay); - while (now() < stop) - m.wait(stop); -} - -/** Pause by spinning */ -void nspin(const Duration& delay) { - AbsTime stop(now(), delay); - while (now() < stop) - ; -} - -/** Unlocked fake queue for comparison */ -struct NullQueue { - NullQueue(int items=0) : npush(items), npop(items) {} - void push(int) { --npush; } - bool tryPop(int& n) { - if (npop == 0) - return false; - else { - n=npop--; - return true; - } - } - volatile int npush, npop; -}; - - -// Global test parameters. -int items; -Duration delay(0); -boost::function<void()> npause; - -template <class Q> -struct Pusher : public Runnable { - Pusher(Q& q) : queue(q) {} - void run() { - for (int i=items; i > 0; i--) { - queue.push(i); - npause(); - } - } - Q& queue; -}; - -template <class Q> -struct Popper : public Runnable { - Popper(Q& q) : queue(q) {} - void run() { - for (int i=items; i > 0; i--) { - int n; - if (queue.tryPop(n)) - BOOST_REQUIRE_EQUAL(i,n); - npause(); - } - } - Q& queue; -}; - -ostream& operator<<(ostream& out, const Duration& d) { - return out << double(d)/TIME_MSEC << " msecs"; -} - -void report(const char* s, const Duration &d) { - cout << s << ": " << d - << " (" << (double(items)*TIME_SEC)/d << " push-pops/sec" << ")" - << endl; -} - -template <class Q, class PusherT=Pusher<Q>, class PopperT=Popper<Q> > -struct Timer { - static Duration time() { - cout << endl << "==" << typeid(Q).name() << endl; - - Q queue(items); - PusherT pusher(queue); - PopperT popper(queue); - - // Serial - AbsTime start=now(); - pusher.run(); - popper.run(); - Duration serial(start,now()); - report ("Serial", serial); - - // Concurrent - start=now(); - Thread pushThread(pusher); - Thread popThread(popper); - pushThread.join(); - popThread.join(); - Duration concurrent(start,now()); - report ("Concurrent", concurrent); - - cout << "Serial/concurrent: " << double(serial)/concurrent << endl; - return concurrent; - } -}; - -int test_main(int argc, char** argv) { - items = (argc > 1) ? atoi(argv[1]) : 250*1000; - delay = (argc > 2) ? atoi(argv[2]) : 4*1000; - npause=boost::bind(nspin, delay); - - cout << "Push/pop " << items << " items, delay=" << delay << endl; - Timer<NullQueue>::time(); - Duration dv = Timer<DualVectorDualLockQueue<int> >::time(); - Duration d = Timer<LockedDequeQueue<int> >::time(); - cout << endl; - cout << "Ratio deque/dual vector=" << double(d)/dv << endl; - return 0; -} -// namespace diff --git a/cpp/src/tests/ais_run b/cpp/src/tests/ais_run deleted file mode 100755 index 0f45edc39c..0000000000 --- a/cpp/src/tests/ais_run +++ /dev/null @@ -1,15 +0,0 @@ -#!/bin/sh -# -# Run AIS tests, assumes that ais_check has passed and we are -# running with the ais group ID. -# - -# FIXME aconway 2008-01-30: we should valgrind the cluster brokers. - -srcdir=`dirname $0` -$srcdir/start_cluster 4 -./ais_test -ret=$? -$srcdir/stop_cluster -exit $ret - |
