diff options
Diffstat (limited to 'cpp/src/tests')
| -rw-r--r-- | cpp/src/tests/Cluster.cpp | 25 | ||||
| -rw-r--r-- | cpp/src/tests/Cluster.h | 30 | ||||
| -rw-r--r-- | cpp/src/tests/Cluster_child.cpp | 21 | ||||
| -rw-r--r-- | cpp/src/tests/Makefile.am | 3 | ||||
| -rw-r--r-- | cpp/src/tests/ProducerConsumerTest.cpp | 284 | ||||
| -rw-r--r-- | cpp/src/tests/cluster.mk | 1 |
6 files changed, 33 insertions, 331 deletions
diff --git a/cpp/src/tests/Cluster.cpp b/cpp/src/tests/Cluster.cpp index b22f312038..5ace48b736 100644 --- a/cpp/src/tests/Cluster.cpp +++ b/cpp/src/tests/Cluster.cpp @@ -19,8 +19,8 @@ #include "Cluster.h" #include "test_tools.h" -#include "qpid/framing/ChannelPingBody.h" -#include "qpid/framing/ChannelOkBody.h" +#include "qpid/framing/SessionPingBody.h" +#include "qpid/framing/SessionPongBody.h" #include "qpid/cluster/ClassifierHandler.h" #define BOOST_AUTO_TEST_MAIN // Must come before #include<boost/test/*> @@ -33,16 +33,16 @@ static const ProtocolVersion VER; /** Verify membership in a cluster with one member. */ BOOST_AUTO_TEST_CASE(testClusterOne) { TestCluster cluster("clusterOne", "amqp:one:1"); - AMQFrame frame(VER, 1, new ChannelPingBody(VER)); + AMQFrame frame(VER, 1, new SessionPingBody(VER)); Uuid id(true); SessionFrame send(id, frame, true); cluster.handle(send); - BOOST_REQUIRE(cluster.received.waitFor(1)); + SessionFrame sf; + BOOST_REQUIRE(cluster.received.waitPop(sf)); - SessionFrame& sf=cluster.received[0]; BOOST_CHECK(sf.isIncoming); BOOST_CHECK_EQUAL(id, sf.uuid); - BOOST_CHECK_TYPEID_EQUAL(ChannelPingBody, *sf.frame.getBody()); + BOOST_CHECK_TYPEID_EQUAL(SessionPingBody, *sf.frame.getBody()); BOOST_CHECK_EQUAL(1u, cluster.size()); Cluster::MemberList members = cluster.getMembers(); @@ -65,17 +65,18 @@ BOOST_AUTO_TEST_CASE(testClusterTwo) { BOOST_REQUIRE(cluster.waitFor(2)); // Myself and child. // Exchange frames with child. - AMQFrame frame(VER, 1, new ChannelPingBody(VER)); + AMQFrame frame(VER, 1, new SessionPingBody(VER)); Uuid id(true); SessionFrame send(id, frame, true); cluster.handle(send); - BOOST_REQUIRE(cluster.received.waitFor(1)); - SessionFrame& sf=cluster.received[0]; + SessionFrame sf; + BOOST_REQUIRE(cluster.received.waitPop(sf)); BOOST_CHECK_EQUAL(id, sf.uuid); BOOST_CHECK(sf.isIncoming); - BOOST_CHECK_TYPEID_EQUAL(ChannelPingBody, *sf.frame.getBody()); - BOOST_REQUIRE(cluster.received.waitFor(2)); - BOOST_CHECK_TYPEID_EQUAL(ChannelOkBody, *cluster.received[1].frame.getBody()); + BOOST_CHECK_TYPEID_EQUAL(SessionPingBody, *sf.frame.getBody()); + + BOOST_REQUIRE(cluster.received.waitPop(sf)); + BOOST_CHECK_TYPEID_EQUAL(SessionPongBody, *sf.frame.getBody()); if (!nofork) { // Wait for child to exit. diff --git a/cpp/src/tests/Cluster.h b/cpp/src/tests/Cluster.h index e896fccafe..02e642f641 100644 --- a/cpp/src/tests/Cluster.h +++ b/cpp/src/tests/Cluster.h @@ -20,16 +20,13 @@ */ #include "qpid/cluster/Cluster.h" +#include "qpid/sys/ConcurrentQueue.h" #include "qpid/framing/AMQFrame.h" -#include "qpid/framing/ChannelOkBody.h" -#include "qpid/framing/BasicGetOkBody.h" -#include "qpid/log/Logger.h" #include <boost/bind.hpp> #include <boost/test/test_tools.hpp> #include <iostream> -#include <vector> #include <functional> /** @@ -48,26 +45,12 @@ using namespace boost; void null_deleter(void*) {} template <class T> -class TestHandler : public Handler<T&>, public vector<T> +class TestHandler : public Handler<T&>, public ConcurrentQueue<T> { - Monitor lock; - public: - void handle(T& frame) { - Mutex::ScopedLock l(lock); - push_back(frame); - BOOST_MESSAGE(getpid()<<" TestHandler::handle: " << this->size()); - lock.notifyAll(); - } - - bool waitFor(size_t n) { - Mutex::ScopedLock l(lock); - BOOST_MESSAGE(getpid()<<" TestHandler::waitFor("<<n<<") "<<this->size()); - AbsTime deadline(now(), 2*TIME_SEC); - while (this->size() < n && lock.wait(deadline)) - ; - return this->size() >= n; - } + void handle(T& frame) { push(frame); } + bool waitPop(T& x) { return waitPop(x, TIME_SEC); } + using ConcurrentQueue<T>::waitPop; }; typedef TestHandler<AMQFrame> TestFrameHandler; @@ -83,7 +66,8 @@ struct TestCluster : public Cluster /** Wait for cluster to be of size n. */ bool waitFor(size_t n) { BOOST_CHECKPOINT("About to call Cluster::wait"); - return wait(boost::bind(equal_to<size_t>(), bind(&Cluster::size,this), n)); + return wait(boost::bind( + equal_to<size_t>(), bind(&Cluster::size,this), n)); } TestSessionFrameHandler received; diff --git a/cpp/src/tests/Cluster_child.cpp b/cpp/src/tests/Cluster_child.cpp index c509dc1950..fd4eb42e7b 100644 --- a/cpp/src/tests/Cluster_child.cpp +++ b/cpp/src/tests/Cluster_child.cpp @@ -20,6 +20,8 @@ #include "Cluster.h" #include "test_tools.h" +#include "qpid/framing/SessionPingBody.h" +#include "qpid/framing/SessionPongBody.h" using namespace std; using namespace qpid; @@ -33,17 +35,18 @@ static const ProtocolVersion VER; /** Chlid part of Cluster::clusterTwo test */ void clusterTwo() { TestCluster cluster("clusterTwo", "amqp:child:2"); - BOOST_REQUIRE(cluster.received.waitFor(1)); // Frame from parent. - BOOST_CHECK(cluster.received[0].isIncoming); - BOOST_CHECK_TYPEID_EQUAL(ChannelPingBody, *cluster.received[0].frame.getBody()); + SessionFrame sf; + BOOST_REQUIRE(cluster.received.waitPop(sf)); // Frame from parent. + BOOST_CHECK(sf.isIncoming); + BOOST_CHECK_TYPEID_EQUAL(SessionPingBody, *sf.frame.getBody()); BOOST_CHECK_EQUAL(2u, cluster.size()); // Me and parent - AMQFrame frame(VER, 1, new ChannelOkBody(VER)); - SessionFrame sf(cluster.received[0].uuid, frame, false); - cluster.handle(sf); - BOOST_REQUIRE(cluster.received.waitFor(2)); - BOOST_CHECK(!cluster.received[1].isIncoming); - BOOST_CHECK_TYPEID_EQUAL(ChannelOkBody, *cluster.received[1].frame.getBody()); + AMQFrame frame(VER, 1, new SessionPongBody(VER)); + SessionFrame sendframe(sf.uuid, frame, false); + cluster.handle(sendframe); + BOOST_REQUIRE(cluster.received.waitPop(sf)); + BOOST_CHECK(!sf.isIncoming); + BOOST_CHECK_TYPEID_EQUAL(SessionPongBody, *sf.frame.getBody()); } int test_main(int, char**) { diff --git a/cpp/src/tests/Makefile.am b/cpp/src/tests/Makefile.am index 4f1e7e1ec3..edb4f5b375 100644 --- a/cpp/src/tests/Makefile.am +++ b/cpp/src/tests/Makefile.am @@ -87,9 +87,6 @@ framing_unit_tests = \ HeaderTest \ SequenceNumberTest -misc_unit_tests = \ - ProducerConsumerTest - posix_unit_tests = \ EventChannelTest \ EventChannelThreadsTest diff --git a/cpp/src/tests/ProducerConsumerTest.cpp b/cpp/src/tests/ProducerConsumerTest.cpp deleted file mode 100644 index 789e365a85..0000000000 --- a/cpp/src/tests/ProducerConsumerTest.cpp +++ /dev/null @@ -1,284 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include <vector> -#include <iostream> - -#include <boost/bind.hpp> - -#include "qpid_test_plugin.h" -#include "InProcessBroker.h" -#include "qpid/sys/ProducerConsumer.h" -#include "qpid/sys/Thread.h" -#include "qpid/framing/AMQP_HighestVersion.h" -#include "qpid/sys/AtomicCount.h" - -using namespace qpid; -using namespace sys; -using namespace framing; -using namespace boost; -using namespace std; - -/** A counter that notifies a monitor when changed */ -class WatchedCounter : public Monitor { - public: - WatchedCounter(int i=0) : count(i) {} - WatchedCounter(const WatchedCounter& c) : Monitor(), count(int(c)) {} - - WatchedCounter& operator=(const WatchedCounter& x) { - return *this = int(x); - } - - WatchedCounter& operator=(int i) { - Lock l(*this); - count = i; - return *this; - } - - int operator++() { - Lock l(*this); - notifyAll(); - return ++count; - } - - int operator++(int) { - Lock l(*this); - notifyAll(); - return count++; - } - - bool operator==(int i) const { - Lock l(const_cast<WatchedCounter&>(*this)); - return i == count; - } - - operator int() const { - Lock l(const_cast<WatchedCounter&>(*this)); - return count; - } - - bool waitFor(int i, Duration timeout=TIME_SEC) { - Lock l(*this); - AbsTime deadline(now(), timeout); - while (count != i) { - if (!wait(deadline)) - return false; - } - assert(count == i); - return true; - } - - private: - typedef Mutex::ScopedLock Lock; - int count; -}; - -class ProducerConsumerTest : public CppUnit::TestCase -{ - CPPUNIT_TEST_SUITE(ProducerConsumerTest); - CPPUNIT_TEST(testProduceConsume); - CPPUNIT_TEST(testTimeout); - CPPUNIT_TEST(testShutdown); - CPPUNIT_TEST(testCancel); - CPPUNIT_TEST_SUITE_END(); - - public: - client::InProcessBrokerClient client; - ProducerConsumer pc; - - WatchedCounter shutdown; - WatchedCounter timeout; - WatchedCounter consumed; - WatchedCounter produced; - - struct ConsumeRunnable : public Runnable { - ProducerConsumerTest& test; - ConsumeRunnable(ProducerConsumerTest& test_) : test(test_) {} - void run() { test.consume(); } - }; - - struct ConsumeTimeoutRunnable : public Runnable { - ProducerConsumerTest& test; - Duration timeout; - ConsumeTimeoutRunnable(ProducerConsumerTest& test_, const Duration& t) - : test(test_), timeout(t) {} - void run() { test.consumeTimeout(timeout); } - }; - - - void consumeInternal(ProducerConsumer::ConsumerLock& consumer) { - if (pc.isShutdown()) { - ++shutdown; - return; - } - if (consumer.isTimedOut()) { - ++timeout; - return; - } - CPPUNIT_ASSERT(consumer.isOk()); - CPPUNIT_ASSERT(pc.available() > 0); - consumer.confirm(); - consumed++; - } - - void consume() { - ProducerConsumer::ConsumerLock consumer(pc); - consumeInternal(consumer); - }; - - void consumeTimeout(const Duration& timeout) { - ProducerConsumer::ConsumerLock consumer(pc, timeout); - consumeInternal(consumer); - }; - - void produce() { - ProducerConsumer::ProducerLock producer(pc); - CPPUNIT_ASSERT(producer.isOk()); - producer.confirm(); - produced++; - } - - void join(vector<Thread>& threads) { - for_each(threads.begin(), threads.end(), bind(&Thread::join,_1)); - } - - vector<Thread> startThreads(size_t n, Runnable& runnable) { - vector<Thread> threads(n); - while (n > 0) - threads[--n] = Thread(runnable); - return threads; - } - -public: - ProducerConsumerTest() : client() {} - - void testProduceConsume() { - ConsumeRunnable runMe(*this); - produce(); - produce(); - CPPUNIT_ASSERT(produced.waitFor(2)); - vector<Thread> threads = startThreads(1, runMe); - CPPUNIT_ASSERT(consumed.waitFor(1)); - join(threads); - - threads = startThreads(1, runMe); - CPPUNIT_ASSERT(consumed.waitFor(2)); - join(threads); - - threads = startThreads(3, runMe); - produce(); - produce(); - CPPUNIT_ASSERT(consumed.waitFor(4)); - produce(); - CPPUNIT_ASSERT(consumed.waitFor(5)); - join(threads); - CPPUNIT_ASSERT_EQUAL(0, int(shutdown)); - } - - void testTimeout() { - try { - // 0 timeout no items available throws exception - ProducerConsumer::ConsumerLock consumer(pc, 0); - CPPUNIT_FAIL("Expected exception"); - } catch(...){} - - produce(); - CPPUNIT_ASSERT(produced.waitFor(1)); - CPPUNIT_ASSERT_EQUAL(1, int(pc.available())); - { - // 0 timeout succeeds if there's an item available. - ProducerConsumer::ConsumerLock consume(pc, 0); - CPPUNIT_ASSERT(consume.isOk()); - consume.confirm(); - } - CPPUNIT_ASSERT_EQUAL(0, int(pc.available())); - - // Produce an item within the timeout. - ConsumeTimeoutRunnable runMe(*this, 2*TIME_SEC); - vector<Thread> threads = startThreads(1, runMe); - produce(); - CPPUNIT_ASSERT(consumed.waitFor(1)); - join(threads); - } - - - void testShutdown() { - ConsumeRunnable runMe(*this); - vector<Thread> threads = startThreads(2, runMe); - while (pc.consumers() != 2) - Thread::yield(); - pc.shutdown(); - CPPUNIT_ASSERT(shutdown.waitFor(2)); - join(threads); - - threads = startThreads(1, runMe); // Should shutdown immediately. - CPPUNIT_ASSERT(shutdown.waitFor(3)); - join(threads); - - // Produce/consume while shutdown should return isShutdown and - // throw on confirm. - try { - ProducerConsumer::ProducerLock p(pc); - CPPUNIT_ASSERT(pc.isShutdown()); - CPPUNIT_FAIL("Expected exception"); - } - catch (...) {} // Expected - try { - ProducerConsumer::ConsumerLock c(pc); - CPPUNIT_ASSERT(pc.isShutdown()); - CPPUNIT_FAIL("Expected exception"); - } - catch (...) {} // Expected - } - - void testCancel() { - CPPUNIT_ASSERT_EQUAL(size_t(0), pc.available()); - { - ProducerConsumer::ProducerLock p(pc); - CPPUNIT_ASSERT(p.isOk()); - p.cancel(); - } - // Nothing was produced. - CPPUNIT_ASSERT_EQUAL(size_t(0), pc.available()); - { - ProducerConsumer::ConsumerLock c(pc, 0); - CPPUNIT_ASSERT(c.isTimedOut()); - } - // Now produce but cancel the consume - { - ProducerConsumer::ProducerLock p(pc); - CPPUNIT_ASSERT(p.isOk()); - p.confirm(); - } - CPPUNIT_ASSERT_EQUAL(size_t(1), pc.available()); - { - ProducerConsumer::ConsumerLock c(pc); - CPPUNIT_ASSERT(c.isOk()); - c.cancel(); - } - CPPUNIT_ASSERT_EQUAL(size_t(1), pc.available()); - } -}; - - -// Make this test suite a plugin. -CPPUNIT_PLUGIN_IMPLEMENT(); -CPPUNIT_TEST_SUITE_REGISTRATION(ProducerConsumerTest); - diff --git a/cpp/src/tests/cluster.mk b/cpp/src/tests/cluster.mk index 765aa02eb0..68725758fa 100644 --- a/cpp/src/tests/cluster.mk +++ b/cpp/src/tests/cluster.mk @@ -20,6 +20,7 @@ check_PROGRAMS+=Cpg Cpg_SOURCES=Cpg.cpp Cpg_LDADD=$(lib_cluster) -lboost_unit_test_framework +# TODO aconway 2007-07-26: Fix this test. #TESTS+=Cluster check_PROGRAMS+=Cluster Cluster_SOURCES=Cluster.cpp Cluster.h |
