diff options
Diffstat (limited to 'cpp')
| -rw-r--r-- | cpp/lib/broker/BrokerChannel.cpp | 4 | ||||
| -rw-r--r-- | cpp/lib/broker/BrokerChannel.h | 1 | ||||
| -rw-r--r-- | cpp/lib/client/ClientChannel.cpp | 6 | ||||
| -rw-r--r-- | cpp/lib/client/ClientChannel.h | 8 | ||||
| -rw-r--r-- | cpp/lib/common/Makefile.am | 6 | ||||
| -rw-r--r-- | cpp/lib/common/sys/AtomicCount.h | 24 | ||||
| -rw-r--r-- | cpp/lib/common/sys/ProducerConsumer.cpp | 141 | ||||
| -rw-r--r-- | cpp/lib/common/sys/ProducerConsumer.h | 165 | ||||
| -rw-r--r-- | cpp/lib/common/sys/ScopedIncrement.h | 59 | ||||
| -rw-r--r-- | cpp/tests/InProcessBroker.h | 167 | ||||
| -rw-r--r-- | cpp/tests/Makefile.am | 5 | ||||
| -rw-r--r-- | cpp/tests/ProducerConsumerTest.cpp | 283 |
12 files changed, 684 insertions, 185 deletions
diff --git a/cpp/lib/broker/BrokerChannel.cpp b/cpp/lib/broker/BrokerChannel.cpp index 674d0e9505..fc82e3111d 100644 --- a/cpp/lib/broker/BrokerChannel.cpp +++ b/cpp/lib/broker/BrokerChannel.cpp @@ -327,9 +327,9 @@ void Channel::handleMethodInContext( ) { try{ - if(id != 0 && !method->isA<ChannelOpenBody>() && !isOpen()) { + if(getId() != 0 && !method->isA<ChannelOpenBody>() && !isOpen()) { std::stringstream out; - out << "Attempt to use unopened channel: " << id; + out << "Attempt to use unopened channel: " << getId(); throw ConnectionException(504, out.str()); } else { method->invoke(*adapter, context); diff --git a/cpp/lib/broker/BrokerChannel.h b/cpp/lib/broker/BrokerChannel.h index 1a1c4dabba..18e833b85c 100644 --- a/cpp/lib/broker/BrokerChannel.h +++ b/cpp/lib/broker/BrokerChannel.h @@ -78,7 +78,6 @@ class Channel : public framing::ChannelAdapter, typedef boost::ptr_map<string,ConsumerImpl> ConsumerImplMap; Connection& connection; - u_int16_t id; u_int64_t currentDeliveryTag; Queue::shared_ptr defaultQueue; bool transactional; diff --git a/cpp/lib/client/ClientChannel.cpp b/cpp/lib/client/ClientChannel.cpp index 52910f5161..613469c4ba 100644 --- a/cpp/lib/client/ClientChannel.cpp +++ b/cpp/lib/client/ClientChannel.cpp @@ -54,11 +54,6 @@ AMQP_ServerProxy& Channel::brokerProxy() { return *proxy; } -AMQMethodBody::shared_ptr Channel::brokerResponse() { - // FIXME aconway 2007-02-08: implement responses. - return AMQMethodBody::shared_ptr(); -} - void Channel::open(ChannelId id, Connection& con) { if (isOpen()) @@ -482,7 +477,6 @@ void Channel::close( u_int16_t code, const std::string& text, ClassId classId, MethodId methodId) { - // FIXME aconway 2007-01-26: Locking? if (getId() != 0 && isOpen()) { try { sendAndReceive<ChannelCloseOkBody>( diff --git a/cpp/lib/client/ClientChannel.h b/cpp/lib/client/ClientChannel.h index 1c082f3b59..d6ec1d9772 100644 --- a/cpp/lib/client/ClientChannel.h +++ b/cpp/lib/client/ClientChannel.h @@ -44,6 +44,7 @@ namespace qpid { namespace framing { class ChannelCloseBody; class AMQP_ServerProxy; +class AMQMethodBody; } namespace client { @@ -89,10 +90,13 @@ class Channel : public framing::ChannelAdapter, u_int64_t lastDeliveryTag; }; typedef std::map<std::string, Consumer> ConsumerMap; + typedef std::queue<boost::shared_ptr<framing::AMQMethodBody> > IncomingMethods; + static const std::string OK; Connection* connection; sys::Thread dispatcher; + IncomingMethods incomingMethods; IncomingMessage* incoming; ResponseHandler responses; std::queue<IncomingMessage*> messages;//holds returned messages or those delivered for a consume @@ -367,12 +371,12 @@ class Channel : public framing::ChannelAdapter, * Returns a proxy for the "raw" AMQP broker protocol. Only for use by * protocol experts. */ - framing::AMQP_ServerProxy& brokerProxy(); + /** * Wait for the next method from the broker. */ - framing::AMQMethodBody::shared_ptr brokerResponse(); + framing::AMQMethodBody::shared_ptr receive(); }; }} diff --git a/cpp/lib/common/Makefile.am b/cpp/lib/common/Makefile.am index c44480bddf..971571089f 100644 --- a/cpp/lib/common/Makefile.am +++ b/cpp/lib/common/Makefile.am @@ -85,7 +85,8 @@ libqpidcommon_la_SOURCES = \ ExceptionHolder.cpp \ QpidError.cpp \ sys/Runnable.cpp \ - sys/Time.cpp + sys/Time.cpp \ + sys/ProducerConsumer.cpp nobase_pkginclude_HEADERS = \ $(gen)/AMQP_HighestVersion.h \ @@ -132,7 +133,8 @@ nobase_pkginclude_HEADERS = \ sys/Socket.h \ sys/Thread.h \ sys/Time.h \ - sys/TimeoutHandler.h + sys/TimeoutHandler.h \ + sys/ProducerConsumer.h # Force build during dist phase so help2man will work. diff --git a/cpp/lib/common/sys/AtomicCount.h b/cpp/lib/common/sys/AtomicCount.h index b625b2c9b0..63670cbf00 100644 --- a/cpp/lib/common/sys/AtomicCount.h +++ b/cpp/lib/common/sys/AtomicCount.h @@ -20,7 +20,7 @@ */ #include <boost/detail/atomic_count.hpp> -#include <boost/noncopyable.hpp> +#include "ScopedIncrement.h" namespace qpid { namespace sys { @@ -30,26 +30,8 @@ namespace sys { */ class AtomicCount : boost::noncopyable { public: - class ScopedDecrement : boost::noncopyable { - public: - /** Decrement counter in constructor and increment in destructor. */ - ScopedDecrement(AtomicCount& c) : count(c) { value = --count; } - ~ScopedDecrement() { ++count; } - /** Return the value returned by the decrement. */ - operator long() { return value; } - private: - AtomicCount& count; - long value; - }; - - class ScopedIncrement : boost::noncopyable { - public: - /** Increment counter in constructor and increment in destructor. */ - ScopedIncrement(AtomicCount& c) : count(c) { ++count; } - ~ScopedIncrement() { --count; } - private: - AtomicCount& count; - }; + typedef ScopedDecrement<AtomicCount> ScopedDecrement; + typedef ScopedIncrement<AtomicCount> ScopedIncrement; AtomicCount(long value = 0) : count(value) {} diff --git a/cpp/lib/common/sys/ProducerConsumer.cpp b/cpp/lib/common/sys/ProducerConsumer.cpp new file mode 100644 index 0000000000..3f6156f230 --- /dev/null +++ b/cpp/lib/common/sys/ProducerConsumer.cpp @@ -0,0 +1,141 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + + +#include "QpidError.h" +#include "ScopedIncrement.h" +#include "ProducerConsumer.h" + +namespace qpid { +namespace sys { + +// // ================ ProducerConsumer + +ProducerConsumer::ProducerConsumer(size_t init_items) + : items(init_items), waiters(0), stopped(false) +{} + +void ProducerConsumer::stop() { + Mutex::ScopedLock l(monitor); + stopped = true; + monitor.notifyAll(); + // Wait for waiting consumers to wake up. + while (waiters > 0) + monitor.wait(); +} + +size_t ProducerConsumer::available() const { + Mutex::ScopedLock l(monitor); + return items; +} + +size_t ProducerConsumer::consumers() const { + Mutex::ScopedLock l(monitor); + return waiters; +} + +// ================ Lock + +ProducerConsumer::Lock::Lock(ProducerConsumer& p) + : pc(p), lock(p.monitor), status(INCOMPLETE) {} + +bool ProducerConsumer::Lock::isOk() const { + return !pc.isStopped() && status==INCOMPLETE; +} + +void ProducerConsumer::Lock::checkOk() const { + assert(!pc.isStopped()); + assert(status == INCOMPLETE); +} + +ProducerConsumer::Lock::~Lock() { + assert(status != INCOMPLETE || pc.isStopped()); +} + +void ProducerConsumer::Lock::confirm() { + checkOk(); + status = CONFIRMED; +} + +void ProducerConsumer::Lock::cancel() { + checkOk(); + status = CANCELLED; +} + +// ================ ProducerLock + +ProducerConsumer::ProducerLock::ProducerLock(ProducerConsumer& p) : Lock(p) +{} + + +ProducerConsumer::ProducerLock::~ProducerLock() { + if (status == CONFIRMED) { + pc.items++; + pc.monitor.notify(); // Notify a consumer. + } +} + +// ================ ConsumerLock + +ProducerConsumer::ConsumerLock::ConsumerLock(ProducerConsumer& p) : Lock(p) +{ + if (isOk()) { + ScopedIncrement<size_t> inc(pc.waiters); + while (pc.items == 0 && !pc.stopped) { + pc.monitor.wait(); + } + } +} + +ProducerConsumer::ConsumerLock::ConsumerLock( + ProducerConsumer& p, const Time& timeout) : Lock(p) +{ + if (isOk()) { + // Don't wait if timeout==0 + if (timeout == 0) { + if (pc.items == 0) + status = TIMEOUT; + return; + } + else { + Time deadline = now() + timeout; + ScopedIncrement<size_t> inc(pc.waiters); + while (pc.items == 0 && !pc.stopped) { + if (!pc.monitor.wait(deadline)) { + status = TIMEOUT; + return; + } + } + } + } +} + +ProducerConsumer::ConsumerLock::~ConsumerLock() { + if (pc.isStopped()) { + if (pc.waiters == 0) + pc.monitor.notifyAll(); // All waiters woken, notify stop thread(s) + } + else if (status==CONFIRMED) { + pc.items--; + if (pc.items > 0) + pc.monitor.notify(); // Notify another consumer. + } +} + + +}} // namespace qpid::sys diff --git a/cpp/lib/common/sys/ProducerConsumer.h b/cpp/lib/common/sys/ProducerConsumer.h new file mode 100644 index 0000000000..742639323b --- /dev/null +++ b/cpp/lib/common/sys/ProducerConsumer.h @@ -0,0 +1,165 @@ +#ifndef _sys_ProducerConsumer_h +#define _sys_ProducerConsumer_h + +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include <boost/noncopyable.hpp> +#include "Exception.h" +#include "sys/Monitor.h" + +namespace qpid { +namespace sys { + +/** + * Producer-consumer synchronisation. + * + * Producers increase the number of available items, consumers reduce it. + * Consumers wait till an item is available. Waiting threads can be + * woken for shutdown using stop(). + * + * Note: Currently implements unbounded producer-consumer, i.e. no limit + * to available items, producers never block. Can be extended to support + * bounded PC if required. + * + // TODO aconway 2007-02-13: example, from tests. +*/ +class ProducerConsumer +{ + public: + ProducerConsumer(size_t init_items=0); + + ~ProducerConsumer() { stop(); } + + /** + * Wake any threads waiting for ProducerLock or ConsumerLock. + *@post No threads are waiting in Producer or Consumer locks. + */ + void stop(); + + /** True if queue is stopped */ + bool isStopped() { return stopped; } + + /** Number of items available for consumers */ + size_t available() const; + + /** Number of consumers waiting for items */ + size_t consumers() const; + + /** True if available == 0 */ + bool empty() const { return available() == 0; } + + /** + * Base class for producer and consumer locks. + */ + class Lock : private boost::noncopyable { + public: + + /** + * You must call isOk() after creating a lock to verify its state. + * + *@return true means the lock succeeded. You MUST call either + *confirm() or cancel() before the lock goes out of scope. + * + * false means the lock failed - timed out or the + * ProducerConsumer is stopped. You should not do anything in + * the scope of the lock. + */ + bool isOk() const; + + /** + * Confirm that an item was produced/consumed. + *@pre isOk() + */ + void confirm(); + + /** + * Cancel the lock to indicate nothing was produced/consumed. + * Note that locks are not actually released until destroyed. + * + *@pre isOk() + */ + void cancel(); + + /** True if this lock experienced a timeout */ + bool isTimedOut() const { return status == TIMEOUT; } + + /** True if we have been stopped */ + bool isStopped() const { return pc.isStopped(); } + + ProducerConsumer& pc; + + protected: + /** Lock status */ + enum Status { INCOMPLETE, CONFIRMED, CANCELLED, TIMEOUT }; + + Lock(ProducerConsumer& p); + ~Lock(); + void checkOk() const; + Mutex::ScopedLock lock; + Status status; + }; + + /** Lock for code that produces items. */ + struct ProducerLock : public Lock { + /** + * Acquire locks to produce an item. + *@post If isOk() the calling thread has exclusive access + * to produce an item. + */ + ProducerLock(ProducerConsumer& p); + + /** Release locks, signal waiting consumers if confirm() was called. */ + ~ProducerLock(); + }; + + /** Lock for code that consumes items */ + struct ConsumerLock : public Lock { + /** + * Wait for an item to consume and acquire locks. + * + *@post If isOk() there is at least one item available and the + *calling thread has exclusive access to consume it. + */ + ConsumerLock(ProducerConsumer& p); + + /** + * Wait up to timeout to acquire lock. + *@post If isOk() caller has a producer lock. + * If isTimedOut() there was a timeout. + * If neither then we were stopped. + */ + ConsumerLock(ProducerConsumer& p, const Time& timeout); + + /** Release locks */ + ~ConsumerLock(); + }; + + private: + mutable Monitor monitor; + size_t items; + size_t waiters; + bool stopped; + + friend class Lock; + friend class ProducerLock; + friend class ConsumerLock; +}; + +}} // namespace qpid::sys + +#endif /*!_sys_ProducerConsumer_h*/ diff --git a/cpp/lib/common/sys/ScopedIncrement.h b/cpp/lib/common/sys/ScopedIncrement.h new file mode 100644 index 0000000000..f14461ddaf --- /dev/null +++ b/cpp/lib/common/sys/ScopedIncrement.h @@ -0,0 +1,59 @@ +#ifndef _posix_ScopedIncrement_h +#define _posix_ScopedIncrement_h + +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include <boost/noncopyable.hpp> + +namespace qpid { +namespace sys { + +/** Increment counter in constructor and decrement in destructor. */ +template <class T> +class ScopedIncrement : boost::noncopyable +{ + public: + ScopedIncrement(T& c) : count(c) { ++count; } + ~ScopedIncrement() { --count; } + private: + T& count; +}; + + +/** Decrement counter in constructor and increment in destructor. */ +template <class T> +class ScopedDecrement : boost::noncopyable +{ + public: + ScopedDecrement(T& c) : count(c) { value = --count; } + ~ScopedDecrement() { ++count; } + + /** Return the value after the decrement. */ + operator long() { return value; } + + private: + T& count; + long value; +}; + + +}} + + +#endif // _posix_ScopedIncrement_h diff --git a/cpp/tests/InProcessBroker.h b/cpp/tests/InProcessBroker.h index cf2b9df8b0..8ace039bfe 100644 --- a/cpp/tests/InProcessBroker.h +++ b/cpp/tests/InProcessBroker.h @@ -26,6 +26,7 @@ #include "broker/Broker.h" #include "broker/Connection.h" #include "client/Connector.h" +#include "client/Connection.h" namespace qpid { namespace broker { @@ -54,6 +55,7 @@ framing::AMQFrame copy(framing::AMQFrame& from) { class InProcessBroker : public client::Connector { public: enum Sender {CLIENT,BROKER}; + struct Frame : public framing::AMQFrame { Frame(Sender e, const AMQFrame& f) : AMQFrame(f), from(e) {} bool fromBroker() const { return from == BROKER; } @@ -68,7 +70,7 @@ class InProcessBroker : public client::Connector { }; typedef std::vector<Frame> Conversation; - InProcessBroker(const framing::ProtocolVersion& ver) : + InProcessBroker(framing::ProtocolVersion ver) : Connector(ver), protocolInit(ver), broker(broker::Broker::create()), @@ -77,6 +79,8 @@ class InProcessBroker : public client::Connector { clientOut(CLIENT, conversation, &brokerConnection) {} + ~InProcessBroker() { broker->shutdown(); } + void connect(const std::string& /*host*/, int /*port*/) {} void init() { brokerConnection.initiated(&protocolInit); } void close() {} @@ -141,157 +145,22 @@ std::ostream& operator<<( }} // namespace qpid::broker -#endif /*!_tests_InProcessBroker_h*/ -#ifndef _tests_InProcessBroker_h -#define _tests_InProcessBroker_h - -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#include <vector> -#include <iostream> -#include <algorithm> - -#include "framing/AMQFrame.h" -#include "broker/Broker.h" -#include "broker/Connection.h" -#include "client/Connector.h" - -namespace qpid { -namespace broker { - -/** Make a copy of a frame body. Inefficient, only intended for tests. */ -// TODO aconway 2007-01-29: from should be const, need to fix -// AMQPFrame::encode as const. -framing::AMQFrame copy(framing::AMQFrame& from) { - framing::Buffer buffer(from.size()); - from.encode(buffer); - buffer.flip(); - framing::AMQFrame result; - result.decode(buffer); - return result; -} - -/** - * A broker that implements client::Connector allowing direct - * in-process connection of client to broker. Used to write round-trip - * tests without requiring an external broker process. - * - * Also allows you to "snoop" on frames exchanged between client & broker. - * - * Use as follows: - * - \code - broker::InProcessBroker ibroker(version); - client::Connection clientConnection; - clientConnection.setConnector(ibroker); - clientConnection.open(""); - ... use as normal - \endcode - * - */ -class InProcessBroker : public client::Connector { +/** An in-process client+broker all in one. */ +class InProcessBrokerClient : public qpid::client::Connection { public: - enum Sender {CLIENT,BROKER}; - struct Frame : public framing::AMQFrame { - Frame(Sender e, const AMQFrame& f) : AMQFrame(f), from(e) {} - bool fromBroker() const { return from == BROKER; } - bool fromClient() const { return from == CLIENT; } - - template <class MethodType> - MethodType* asMethod() { - return dynamic_cast<MethodType*>(getBody().get()); - } - - Sender from; - }; - typedef std::vector<Frame> Conversation; - - InProcessBroker(const framing::ProtocolVersion& ver) : - Connector(ver), - protocolInit(ver), - broker(broker::Broker::create()), - brokerOut(BROKER, conversation), - brokerConnection(&brokerOut, *broker), - clientOut(CLIENT, conversation, &brokerConnection) - {} - - void connect(const std::string& /*host*/, int /*port*/) {} - void init() { brokerConnection.initiated(&protocolInit); } - void close() {} - - /** Client's input handler. */ - void setInputHandler(framing::InputHandler* handler) { - brokerOut.in = handler; + qpid::broker::InProcessBroker broker; + + /** Constructor creates broker and opens client connection. */ + InProcessBrokerClient(qpid::framing::ProtocolVersion version) + : broker(version) + { + setConnector(broker); + open(""); } - /** Called by client to send a frame */ - void send(framing::AMQFrame* frame) { - clientOut.send(frame); + ~InProcessBrokerClient() { + close(); // close before broker is deleted. } - - /** Entire client-broker conversation is recorded here */ - Conversation conversation; - - private: - /** OutputHandler that forwards data to an InputHandler */ - struct OutputToInputHandler : public sys::ConnectionOutputHandler { - OutputToInputHandler( - Sender from_, Conversation& conversation_, - framing::InputHandler* ih=0 - ) : from(from_), conversation(conversation_), in(ih) {} - - void send(framing::AMQFrame* frame) { - conversation.push_back(Frame(from, copy(*frame))); - in->received(frame); - } - - void close() {} - - Sender from; - Conversation& conversation; - framing::InputHandler* in; - }; - - framing::ProtocolInitiation protocolInit; - Broker::shared_ptr broker; - OutputToInputHandler brokerOut; - broker::Connection brokerConnection; - OutputToInputHandler clientOut; }; -std::ostream& operator<<( - std::ostream& out, const InProcessBroker::Frame& frame) -{ - return out << (frame.fromBroker()? "BROKER: ":"CLIENT: ") << - static_cast<const framing::AMQFrame&>(frame); -} -std::ostream& operator<<( - std::ostream& out, const InProcessBroker::Conversation& conv) -{ - for (InProcessBroker::Conversation::const_iterator i = conv.begin(); - i != conv.end(); ++i) - { - out << *i << std::endl; - } - return out; -} - - -}} // namespace qpid::broker - -#endif /*!_tests_InProcessBroker_h*/ +#endif // _tests_InProcessBroker_h diff --git a/cpp/tests/Makefile.am b/cpp/tests/Makefile.am index 2b51a5b125..768558f219 100644 --- a/cpp/tests/Makefile.am +++ b/cpp/tests/Makefile.am @@ -47,14 +47,15 @@ framing_tests = \ HeaderTest misc_tests = \ - ExceptionTest + ExceptionTest \ + ProducerConsumerTest posix_tests = \ EventChannelTest \ EventChannelThreadsTest unit_tests = \ - $(broker_tests) \ +b $(broker_tests) \ $(framing_tests) \ $(misc_tests) \ $(round_trip_tests) diff --git a/cpp/tests/ProducerConsumerTest.cpp b/cpp/tests/ProducerConsumerTest.cpp new file mode 100644 index 0000000000..e6d4090596 --- /dev/null +++ b/cpp/tests/ProducerConsumerTest.cpp @@ -0,0 +1,283 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <vector> +#include <iostream> + +#include <boost/bind.hpp> + +#include <qpid_test_plugin.h> +#include "InProcessBroker.h" +#include "sys/ProducerConsumer.h" +#include "sys/Thread.h" +#include "AMQP_HighestVersion.h" +#include "sys/AtomicCount.h" + +using namespace qpid::sys; +using namespace qpid::framing; +using namespace boost; +using namespace std; + +/** A counter that notifies a monitor when changed */ +class WatchedCounter : public Monitor { + public: + WatchedCounter(int i=0) : count(i) {} + WatchedCounter(const WatchedCounter& c) : Monitor(), count(int(c)) {} + + WatchedCounter& operator=(const WatchedCounter& x) { + return *this = int(x); + } + + WatchedCounter& operator=(int i) { + Lock l(*this); + count = i; + return *this; + } + + int operator++() { + Lock l(*this); + notifyAll(); + return ++count; + } + + int operator++(int) { + Lock l(*this); + notifyAll(); + return count++; + } + + bool operator==(int i) const { + Lock l(const_cast<WatchedCounter&>(*this)); + return i == count; + } + + operator int() const { + Lock l(const_cast<WatchedCounter&>(*this)); + return count; + } + + bool waitFor(int i, Time timeout=TIME_SEC) { + Lock l(*this); + Time deadline = timeout+now(); + while (count != i) { + if (!wait(deadline)) + return false; + } + assert(count == i); + return true; + } + + private: + typedef Mutex::ScopedLock Lock; + int count; +}; + +class ProducerConsumerTest : public CppUnit::TestCase +{ + CPPUNIT_TEST_SUITE(ProducerConsumerTest); + CPPUNIT_TEST(testProduceConsume); + CPPUNIT_TEST(testTimeout); + CPPUNIT_TEST(testStop); + CPPUNIT_TEST(testCancel); + CPPUNIT_TEST_SUITE_END(); + + public: + InProcessBrokerClient client; + ProducerConsumer pc; + + WatchedCounter stopped; + WatchedCounter timeout; + WatchedCounter consumed; + WatchedCounter produced; + + struct ConsumeRunnable : public Runnable { + ProducerConsumerTest& test; + ConsumeRunnable(ProducerConsumerTest& test_) : test(test_) {} + void run() { test.consume(); } + }; + + struct ConsumeTimeoutRunnable : public Runnable { + ProducerConsumerTest& test; + Time timeout; + ConsumeTimeoutRunnable(ProducerConsumerTest& test_, const Time& t) + : test(test_), timeout(t) {} + void run() { test.consumeTimeout(timeout); } + }; + + + void consumeInternal(ProducerConsumer::ConsumerLock& consumer) { + if (pc.isStopped()) { + ++stopped; + return; + } + if (consumer.isTimedOut()) { + ++timeout; + return; + } + CPPUNIT_ASSERT(consumer.isOk()); + CPPUNIT_ASSERT(pc.available() > 0); + consumer.confirm(); + consumed++; + } + + void consume() { + ProducerConsumer::ConsumerLock consumer(pc); + consumeInternal(consumer); + }; + + void consumeTimeout(const Time& timeout) { + ProducerConsumer::ConsumerLock consumer(pc, timeout); + consumeInternal(consumer); + }; + + void produce() { + ProducerConsumer::ProducerLock producer(pc); + CPPUNIT_ASSERT(producer.isOk()); + producer.confirm(); + produced++; + } + + void join(vector<Thread>& threads) { + for_each(threads.begin(), threads.end(), bind(&Thread::join,_1)); + } + + vector<Thread> startThreads(size_t n, Runnable& runnable) { + vector<Thread> threads(n); + while (n > 0) + threads[--n] = Thread(runnable); + return threads; + } + +public: + ProducerConsumerTest() : client(highestProtocolVersion) {} + + void testProduceConsume() { + ConsumeRunnable runMe(*this); + produce(); + produce(); + CPPUNIT_ASSERT(produced.waitFor(2)); + vector<Thread> threads = startThreads(1, runMe); + CPPUNIT_ASSERT(consumed.waitFor(1)); + join(threads); + + threads = startThreads(1, runMe); + CPPUNIT_ASSERT(consumed.waitFor(2)); + join(threads); + + threads = startThreads(3, runMe); + produce(); + produce(); + CPPUNIT_ASSERT(consumed.waitFor(4)); + produce(); + CPPUNIT_ASSERT(consumed.waitFor(5)); + join(threads); + CPPUNIT_ASSERT_EQUAL(0, int(stopped)); + } + + void testTimeout() { + try { + // 0 timeout no items available throws exception + ProducerConsumer::ConsumerLock consumer(pc, 0); + CPPUNIT_FAIL("Expected exception"); + } catch(...){} + + produce(); + CPPUNIT_ASSERT(produced.waitFor(1)); + CPPUNIT_ASSERT_EQUAL(1, int(pc.available())); + { + // 0 timeout succeeds if there's an item available. + ProducerConsumer::ConsumerLock consume(pc, 0); + CPPUNIT_ASSERT(consume.isOk()); + consume.confirm(); + } + CPPUNIT_ASSERT_EQUAL(0, int(pc.available())); + + // Produce an item within the timeout. + ConsumeTimeoutRunnable runMe(*this, 2*TIME_SEC); + vector<Thread> threads = startThreads(1, runMe); + produce(); + CPPUNIT_ASSERT(consumed.waitFor(1)); + join(threads); + } + + + void testStop() { + ConsumeRunnable runMe(*this); + vector<Thread> threads = startThreads(2, runMe); + while (pc.consumers() != 2) + Thread::yield(); + pc.stop(); + CPPUNIT_ASSERT(stopped.waitFor(2)); + join(threads); + + threads = startThreads(1, runMe); // Should stop immediately. + CPPUNIT_ASSERT(stopped.waitFor(3)); + join(threads); + + // Produce/consume while stopped should return isStopped and + // throw on confirm. + try { + ProducerConsumer::ProducerLock p(pc); + CPPUNIT_ASSERT(pc.isStopped()); + CPPUNIT_FAIL("Expected exception"); + } + catch (...) {} // Expected + try { + ProducerConsumer::ConsumerLock c(pc); + CPPUNIT_ASSERT(pc.isStopped()); + CPPUNIT_FAIL("Expected exception"); + } + catch (...) {} // Expected + } + + void testCancel() { + CPPUNIT_ASSERT_EQUAL(size_t(0), pc.available()); + { + ProducerConsumer::ProducerLock p(pc); + CPPUNIT_ASSERT(p.isOk()); + p.cancel(); + } + // Nothing was produced. + CPPUNIT_ASSERT_EQUAL(size_t(0), pc.available()); + { + ProducerConsumer::ConsumerLock c(pc, 0); + CPPUNIT_ASSERT(c.isTimedOut()); + } + // Now produce but cancel the consume + { + ProducerConsumer::ProducerLock p(pc); + CPPUNIT_ASSERT(p.isOk()); + p.confirm(); + } + CPPUNIT_ASSERT_EQUAL(size_t(1), pc.available()); + { + ProducerConsumer::ConsumerLock c(pc); + CPPUNIT_ASSERT(c.isOk()); + c.cancel(); + } + CPPUNIT_ASSERT_EQUAL(size_t(1), pc.available()); + } +}; + + +// Make this test suite a plugin. +CPPUNIT_PLUGIN_IMPLEMENT(); +CPPUNIT_TEST_SUITE_REGISTRATION(ProducerConsumerTest); + |
