diff options
| author | Alan Conway <aconway@apache.org> | 2007-02-14 15:02:10 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2007-02-14 15:02:10 +0000 |
| commit | 8189a1f1f3d27d9ad7e0de50ed9e924e63d74aec (patch) | |
| tree | f43ed248e99d67639f4330e604a0ed718f736d22 /cpp/tests | |
| parent | 20a5f81e8bbf8d4429d55fffb47278e7ade81c17 (diff) | |
| download | qpid-python-8189a1f1f3d27d9ad7e0de50ed9e924e63d74aec.tar.gz | |
* cpp/lib/common/sys/ProducerConsumer.h:
General-purpose producer-consumer synchronization. Anywhere we have
producer/consumer threads in qpid we should re-use this sync object
rather than re-inventing the synchronization each time.
* cpp/lib/common/sys/AtomicCount.h: Separated ScopedIncrement/ScopedDecrement
into ScopedIncrement.h
* cpp/tests/InProcessBroker.h: Added class InProcessBrokerClient, a
self contained in-process client + broker convenience for tests.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@507560 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/tests')
| -rw-r--r-- | cpp/tests/InProcessBroker.h | 167 | ||||
| -rw-r--r-- | cpp/tests/Makefile.am | 5 | ||||
| -rw-r--r-- | cpp/tests/ProducerConsumerTest.cpp | 283 |
3 files changed, 304 insertions, 151 deletions
diff --git a/cpp/tests/InProcessBroker.h b/cpp/tests/InProcessBroker.h index cf2b9df8b0..8ace039bfe 100644 --- a/cpp/tests/InProcessBroker.h +++ b/cpp/tests/InProcessBroker.h @@ -26,6 +26,7 @@ #include "broker/Broker.h" #include "broker/Connection.h" #include "client/Connector.h" +#include "client/Connection.h" namespace qpid { namespace broker { @@ -54,6 +55,7 @@ framing::AMQFrame copy(framing::AMQFrame& from) { class InProcessBroker : public client::Connector { public: enum Sender {CLIENT,BROKER}; + struct Frame : public framing::AMQFrame { Frame(Sender e, const AMQFrame& f) : AMQFrame(f), from(e) {} bool fromBroker() const { return from == BROKER; } @@ -68,7 +70,7 @@ class InProcessBroker : public client::Connector { }; typedef std::vector<Frame> Conversation; - InProcessBroker(const framing::ProtocolVersion& ver) : + InProcessBroker(framing::ProtocolVersion ver) : Connector(ver), protocolInit(ver), broker(broker::Broker::create()), @@ -77,6 +79,8 @@ class InProcessBroker : public client::Connector { clientOut(CLIENT, conversation, &brokerConnection) {} + ~InProcessBroker() { broker->shutdown(); } + void connect(const std::string& /*host*/, int /*port*/) {} void init() { brokerConnection.initiated(&protocolInit); } void close() {} @@ -141,157 +145,22 @@ std::ostream& operator<<( }} // namespace qpid::broker -#endif /*!_tests_InProcessBroker_h*/ -#ifndef _tests_InProcessBroker_h -#define _tests_InProcessBroker_h - -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#include <vector> -#include <iostream> -#include <algorithm> - -#include "framing/AMQFrame.h" -#include "broker/Broker.h" -#include "broker/Connection.h" -#include "client/Connector.h" - -namespace qpid { -namespace broker { - -/** Make a copy of a frame body. Inefficient, only intended for tests. */ -// TODO aconway 2007-01-29: from should be const, need to fix -// AMQPFrame::encode as const. -framing::AMQFrame copy(framing::AMQFrame& from) { - framing::Buffer buffer(from.size()); - from.encode(buffer); - buffer.flip(); - framing::AMQFrame result; - result.decode(buffer); - return result; -} - -/** - * A broker that implements client::Connector allowing direct - * in-process connection of client to broker. Used to write round-trip - * tests without requiring an external broker process. - * - * Also allows you to "snoop" on frames exchanged between client & broker. - * - * Use as follows: - * - \code - broker::InProcessBroker ibroker(version); - client::Connection clientConnection; - clientConnection.setConnector(ibroker); - clientConnection.open(""); - ... use as normal - \endcode - * - */ -class InProcessBroker : public client::Connector { +/** An in-process client+broker all in one. */ +class InProcessBrokerClient : public qpid::client::Connection { public: - enum Sender {CLIENT,BROKER}; - struct Frame : public framing::AMQFrame { - Frame(Sender e, const AMQFrame& f) : AMQFrame(f), from(e) {} - bool fromBroker() const { return from == BROKER; } - bool fromClient() const { return from == CLIENT; } - - template <class MethodType> - MethodType* asMethod() { - return dynamic_cast<MethodType*>(getBody().get()); - } - - Sender from; - }; - typedef std::vector<Frame> Conversation; - - InProcessBroker(const framing::ProtocolVersion& ver) : - Connector(ver), - protocolInit(ver), - broker(broker::Broker::create()), - brokerOut(BROKER, conversation), - brokerConnection(&brokerOut, *broker), - clientOut(CLIENT, conversation, &brokerConnection) - {} - - void connect(const std::string& /*host*/, int /*port*/) {} - void init() { brokerConnection.initiated(&protocolInit); } - void close() {} - - /** Client's input handler. */ - void setInputHandler(framing::InputHandler* handler) { - brokerOut.in = handler; + qpid::broker::InProcessBroker broker; + + /** Constructor creates broker and opens client connection. */ + InProcessBrokerClient(qpid::framing::ProtocolVersion version) + : broker(version) + { + setConnector(broker); + open(""); } - /** Called by client to send a frame */ - void send(framing::AMQFrame* frame) { - clientOut.send(frame); + ~InProcessBrokerClient() { + close(); // close before broker is deleted. } - - /** Entire client-broker conversation is recorded here */ - Conversation conversation; - - private: - /** OutputHandler that forwards data to an InputHandler */ - struct OutputToInputHandler : public sys::ConnectionOutputHandler { - OutputToInputHandler( - Sender from_, Conversation& conversation_, - framing::InputHandler* ih=0 - ) : from(from_), conversation(conversation_), in(ih) {} - - void send(framing::AMQFrame* frame) { - conversation.push_back(Frame(from, copy(*frame))); - in->received(frame); - } - - void close() {} - - Sender from; - Conversation& conversation; - framing::InputHandler* in; - }; - - framing::ProtocolInitiation protocolInit; - Broker::shared_ptr broker; - OutputToInputHandler brokerOut; - broker::Connection brokerConnection; - OutputToInputHandler clientOut; }; -std::ostream& operator<<( - std::ostream& out, const InProcessBroker::Frame& frame) -{ - return out << (frame.fromBroker()? "BROKER: ":"CLIENT: ") << - static_cast<const framing::AMQFrame&>(frame); -} -std::ostream& operator<<( - std::ostream& out, const InProcessBroker::Conversation& conv) -{ - for (InProcessBroker::Conversation::const_iterator i = conv.begin(); - i != conv.end(); ++i) - { - out << *i << std::endl; - } - return out; -} - - -}} // namespace qpid::broker - -#endif /*!_tests_InProcessBroker_h*/ +#endif // _tests_InProcessBroker_h diff --git a/cpp/tests/Makefile.am b/cpp/tests/Makefile.am index 2b51a5b125..768558f219 100644 --- a/cpp/tests/Makefile.am +++ b/cpp/tests/Makefile.am @@ -47,14 +47,15 @@ framing_tests = \ HeaderTest misc_tests = \ - ExceptionTest + ExceptionTest \ + ProducerConsumerTest posix_tests = \ EventChannelTest \ EventChannelThreadsTest unit_tests = \ - $(broker_tests) \ +b $(broker_tests) \ $(framing_tests) \ $(misc_tests) \ $(round_trip_tests) diff --git a/cpp/tests/ProducerConsumerTest.cpp b/cpp/tests/ProducerConsumerTest.cpp new file mode 100644 index 0000000000..e6d4090596 --- /dev/null +++ b/cpp/tests/ProducerConsumerTest.cpp @@ -0,0 +1,283 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <vector> +#include <iostream> + +#include <boost/bind.hpp> + +#include <qpid_test_plugin.h> +#include "InProcessBroker.h" +#include "sys/ProducerConsumer.h" +#include "sys/Thread.h" +#include "AMQP_HighestVersion.h" +#include "sys/AtomicCount.h" + +using namespace qpid::sys; +using namespace qpid::framing; +using namespace boost; +using namespace std; + +/** A counter that notifies a monitor when changed */ +class WatchedCounter : public Monitor { + public: + WatchedCounter(int i=0) : count(i) {} + WatchedCounter(const WatchedCounter& c) : Monitor(), count(int(c)) {} + + WatchedCounter& operator=(const WatchedCounter& x) { + return *this = int(x); + } + + WatchedCounter& operator=(int i) { + Lock l(*this); + count = i; + return *this; + } + + int operator++() { + Lock l(*this); + notifyAll(); + return ++count; + } + + int operator++(int) { + Lock l(*this); + notifyAll(); + return count++; + } + + bool operator==(int i) const { + Lock l(const_cast<WatchedCounter&>(*this)); + return i == count; + } + + operator int() const { + Lock l(const_cast<WatchedCounter&>(*this)); + return count; + } + + bool waitFor(int i, Time timeout=TIME_SEC) { + Lock l(*this); + Time deadline = timeout+now(); + while (count != i) { + if (!wait(deadline)) + return false; + } + assert(count == i); + return true; + } + + private: + typedef Mutex::ScopedLock Lock; + int count; +}; + +class ProducerConsumerTest : public CppUnit::TestCase +{ + CPPUNIT_TEST_SUITE(ProducerConsumerTest); + CPPUNIT_TEST(testProduceConsume); + CPPUNIT_TEST(testTimeout); + CPPUNIT_TEST(testStop); + CPPUNIT_TEST(testCancel); + CPPUNIT_TEST_SUITE_END(); + + public: + InProcessBrokerClient client; + ProducerConsumer pc; + + WatchedCounter stopped; + WatchedCounter timeout; + WatchedCounter consumed; + WatchedCounter produced; + + struct ConsumeRunnable : public Runnable { + ProducerConsumerTest& test; + ConsumeRunnable(ProducerConsumerTest& test_) : test(test_) {} + void run() { test.consume(); } + }; + + struct ConsumeTimeoutRunnable : public Runnable { + ProducerConsumerTest& test; + Time timeout; + ConsumeTimeoutRunnable(ProducerConsumerTest& test_, const Time& t) + : test(test_), timeout(t) {} + void run() { test.consumeTimeout(timeout); } + }; + + + void consumeInternal(ProducerConsumer::ConsumerLock& consumer) { + if (pc.isStopped()) { + ++stopped; + return; + } + if (consumer.isTimedOut()) { + ++timeout; + return; + } + CPPUNIT_ASSERT(consumer.isOk()); + CPPUNIT_ASSERT(pc.available() > 0); + consumer.confirm(); + consumed++; + } + + void consume() { + ProducerConsumer::ConsumerLock consumer(pc); + consumeInternal(consumer); + }; + + void consumeTimeout(const Time& timeout) { + ProducerConsumer::ConsumerLock consumer(pc, timeout); + consumeInternal(consumer); + }; + + void produce() { + ProducerConsumer::ProducerLock producer(pc); + CPPUNIT_ASSERT(producer.isOk()); + producer.confirm(); + produced++; + } + + void join(vector<Thread>& threads) { + for_each(threads.begin(), threads.end(), bind(&Thread::join,_1)); + } + + vector<Thread> startThreads(size_t n, Runnable& runnable) { + vector<Thread> threads(n); + while (n > 0) + threads[--n] = Thread(runnable); + return threads; + } + +public: + ProducerConsumerTest() : client(highestProtocolVersion) {} + + void testProduceConsume() { + ConsumeRunnable runMe(*this); + produce(); + produce(); + CPPUNIT_ASSERT(produced.waitFor(2)); + vector<Thread> threads = startThreads(1, runMe); + CPPUNIT_ASSERT(consumed.waitFor(1)); + join(threads); + + threads = startThreads(1, runMe); + CPPUNIT_ASSERT(consumed.waitFor(2)); + join(threads); + + threads = startThreads(3, runMe); + produce(); + produce(); + CPPUNIT_ASSERT(consumed.waitFor(4)); + produce(); + CPPUNIT_ASSERT(consumed.waitFor(5)); + join(threads); + CPPUNIT_ASSERT_EQUAL(0, int(stopped)); + } + + void testTimeout() { + try { + // 0 timeout no items available throws exception + ProducerConsumer::ConsumerLock consumer(pc, 0); + CPPUNIT_FAIL("Expected exception"); + } catch(...){} + + produce(); + CPPUNIT_ASSERT(produced.waitFor(1)); + CPPUNIT_ASSERT_EQUAL(1, int(pc.available())); + { + // 0 timeout succeeds if there's an item available. + ProducerConsumer::ConsumerLock consume(pc, 0); + CPPUNIT_ASSERT(consume.isOk()); + consume.confirm(); + } + CPPUNIT_ASSERT_EQUAL(0, int(pc.available())); + + // Produce an item within the timeout. + ConsumeTimeoutRunnable runMe(*this, 2*TIME_SEC); + vector<Thread> threads = startThreads(1, runMe); + produce(); + CPPUNIT_ASSERT(consumed.waitFor(1)); + join(threads); + } + + + void testStop() { + ConsumeRunnable runMe(*this); + vector<Thread> threads = startThreads(2, runMe); + while (pc.consumers() != 2) + Thread::yield(); + pc.stop(); + CPPUNIT_ASSERT(stopped.waitFor(2)); + join(threads); + + threads = startThreads(1, runMe); // Should stop immediately. + CPPUNIT_ASSERT(stopped.waitFor(3)); + join(threads); + + // Produce/consume while stopped should return isStopped and + // throw on confirm. + try { + ProducerConsumer::ProducerLock p(pc); + CPPUNIT_ASSERT(pc.isStopped()); + CPPUNIT_FAIL("Expected exception"); + } + catch (...) {} // Expected + try { + ProducerConsumer::ConsumerLock c(pc); + CPPUNIT_ASSERT(pc.isStopped()); + CPPUNIT_FAIL("Expected exception"); + } + catch (...) {} // Expected + } + + void testCancel() { + CPPUNIT_ASSERT_EQUAL(size_t(0), pc.available()); + { + ProducerConsumer::ProducerLock p(pc); + CPPUNIT_ASSERT(p.isOk()); + p.cancel(); + } + // Nothing was produced. + CPPUNIT_ASSERT_EQUAL(size_t(0), pc.available()); + { + ProducerConsumer::ConsumerLock c(pc, 0); + CPPUNIT_ASSERT(c.isTimedOut()); + } + // Now produce but cancel the consume + { + ProducerConsumer::ProducerLock p(pc); + CPPUNIT_ASSERT(p.isOk()); + p.confirm(); + } + CPPUNIT_ASSERT_EQUAL(size_t(1), pc.available()); + { + ProducerConsumer::ConsumerLock c(pc); + CPPUNIT_ASSERT(c.isOk()); + c.cancel(); + } + CPPUNIT_ASSERT_EQUAL(size_t(1), pc.available()); + } +}; + + +// Make this test suite a plugin. +CPPUNIT_PLUGIN_IMPLEMENT(); +CPPUNIT_TEST_SUITE_REGISTRATION(ProducerConsumerTest); + |
