summaryrefslogtreecommitdiff
path: root/cpp/src/tests
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/tests')
-rw-r--r--cpp/src/tests/Cluster.cpp25
-rw-r--r--cpp/src/tests/Cluster.h30
-rw-r--r--cpp/src/tests/Cluster_child.cpp21
-rw-r--r--cpp/src/tests/Makefile.am3
-rw-r--r--cpp/src/tests/ProducerConsumerTest.cpp284
-rw-r--r--cpp/src/tests/cluster.mk1
6 files changed, 33 insertions, 331 deletions
diff --git a/cpp/src/tests/Cluster.cpp b/cpp/src/tests/Cluster.cpp
index b22f312038..5ace48b736 100644
--- a/cpp/src/tests/Cluster.cpp
+++ b/cpp/src/tests/Cluster.cpp
@@ -19,8 +19,8 @@
#include "Cluster.h"
#include "test_tools.h"
-#include "qpid/framing/ChannelPingBody.h"
-#include "qpid/framing/ChannelOkBody.h"
+#include "qpid/framing/SessionPingBody.h"
+#include "qpid/framing/SessionPongBody.h"
#include "qpid/cluster/ClassifierHandler.h"
#define BOOST_AUTO_TEST_MAIN // Must come before #include<boost/test/*>
@@ -33,16 +33,16 @@ static const ProtocolVersion VER;
/** Verify membership in a cluster with one member. */
BOOST_AUTO_TEST_CASE(testClusterOne) {
TestCluster cluster("clusterOne", "amqp:one:1");
- AMQFrame frame(VER, 1, new ChannelPingBody(VER));
+ AMQFrame frame(VER, 1, new SessionPingBody(VER));
Uuid id(true);
SessionFrame send(id, frame, true);
cluster.handle(send);
- BOOST_REQUIRE(cluster.received.waitFor(1));
+ SessionFrame sf;
+ BOOST_REQUIRE(cluster.received.waitPop(sf));
- SessionFrame& sf=cluster.received[0];
BOOST_CHECK(sf.isIncoming);
BOOST_CHECK_EQUAL(id, sf.uuid);
- BOOST_CHECK_TYPEID_EQUAL(ChannelPingBody, *sf.frame.getBody());
+ BOOST_CHECK_TYPEID_EQUAL(SessionPingBody, *sf.frame.getBody());
BOOST_CHECK_EQUAL(1u, cluster.size());
Cluster::MemberList members = cluster.getMembers();
@@ -65,17 +65,18 @@ BOOST_AUTO_TEST_CASE(testClusterTwo) {
BOOST_REQUIRE(cluster.waitFor(2)); // Myself and child.
// Exchange frames with child.
- AMQFrame frame(VER, 1, new ChannelPingBody(VER));
+ AMQFrame frame(VER, 1, new SessionPingBody(VER));
Uuid id(true);
SessionFrame send(id, frame, true);
cluster.handle(send);
- BOOST_REQUIRE(cluster.received.waitFor(1));
- SessionFrame& sf=cluster.received[0];
+ SessionFrame sf;
+ BOOST_REQUIRE(cluster.received.waitPop(sf));
BOOST_CHECK_EQUAL(id, sf.uuid);
BOOST_CHECK(sf.isIncoming);
- BOOST_CHECK_TYPEID_EQUAL(ChannelPingBody, *sf.frame.getBody());
- BOOST_REQUIRE(cluster.received.waitFor(2));
- BOOST_CHECK_TYPEID_EQUAL(ChannelOkBody, *cluster.received[1].frame.getBody());
+ BOOST_CHECK_TYPEID_EQUAL(SessionPingBody, *sf.frame.getBody());
+
+ BOOST_REQUIRE(cluster.received.waitPop(sf));
+ BOOST_CHECK_TYPEID_EQUAL(SessionPongBody, *sf.frame.getBody());
if (!nofork) {
// Wait for child to exit.
diff --git a/cpp/src/tests/Cluster.h b/cpp/src/tests/Cluster.h
index e896fccafe..02e642f641 100644
--- a/cpp/src/tests/Cluster.h
+++ b/cpp/src/tests/Cluster.h
@@ -20,16 +20,13 @@
*/
#include "qpid/cluster/Cluster.h"
+#include "qpid/sys/ConcurrentQueue.h"
#include "qpid/framing/AMQFrame.h"
-#include "qpid/framing/ChannelOkBody.h"
-#include "qpid/framing/BasicGetOkBody.h"
-#include "qpid/log/Logger.h"
#include <boost/bind.hpp>
#include <boost/test/test_tools.hpp>
#include <iostream>
-#include <vector>
#include <functional>
/**
@@ -48,26 +45,12 @@ using namespace boost;
void null_deleter(void*) {}
template <class T>
-class TestHandler : public Handler<T&>, public vector<T>
+class TestHandler : public Handler<T&>, public ConcurrentQueue<T>
{
- Monitor lock;
-
public:
- void handle(T& frame) {
- Mutex::ScopedLock l(lock);
- push_back(frame);
- BOOST_MESSAGE(getpid()<<" TestHandler::handle: " << this->size());
- lock.notifyAll();
- }
-
- bool waitFor(size_t n) {
- Mutex::ScopedLock l(lock);
- BOOST_MESSAGE(getpid()<<" TestHandler::waitFor("<<n<<") "<<this->size());
- AbsTime deadline(now(), 2*TIME_SEC);
- while (this->size() < n && lock.wait(deadline))
- ;
- return this->size() >= n;
- }
+ void handle(T& frame) { push(frame); }
+ bool waitPop(T& x) { return waitPop(x, TIME_SEC); }
+ using ConcurrentQueue<T>::waitPop;
};
typedef TestHandler<AMQFrame> TestFrameHandler;
@@ -83,7 +66,8 @@ struct TestCluster : public Cluster
/** Wait for cluster to be of size n. */
bool waitFor(size_t n) {
BOOST_CHECKPOINT("About to call Cluster::wait");
- return wait(boost::bind(equal_to<size_t>(), bind(&Cluster::size,this), n));
+ return wait(boost::bind(
+ equal_to<size_t>(), bind(&Cluster::size,this), n));
}
TestSessionFrameHandler received;
diff --git a/cpp/src/tests/Cluster_child.cpp b/cpp/src/tests/Cluster_child.cpp
index c509dc1950..fd4eb42e7b 100644
--- a/cpp/src/tests/Cluster_child.cpp
+++ b/cpp/src/tests/Cluster_child.cpp
@@ -20,6 +20,8 @@
#include "Cluster.h"
#include "test_tools.h"
+#include "qpid/framing/SessionPingBody.h"
+#include "qpid/framing/SessionPongBody.h"
using namespace std;
using namespace qpid;
@@ -33,17 +35,18 @@ static const ProtocolVersion VER;
/** Chlid part of Cluster::clusterTwo test */
void clusterTwo() {
TestCluster cluster("clusterTwo", "amqp:child:2");
- BOOST_REQUIRE(cluster.received.waitFor(1)); // Frame from parent.
- BOOST_CHECK(cluster.received[0].isIncoming);
- BOOST_CHECK_TYPEID_EQUAL(ChannelPingBody, *cluster.received[0].frame.getBody());
+ SessionFrame sf;
+ BOOST_REQUIRE(cluster.received.waitPop(sf)); // Frame from parent.
+ BOOST_CHECK(sf.isIncoming);
+ BOOST_CHECK_TYPEID_EQUAL(SessionPingBody, *sf.frame.getBody());
BOOST_CHECK_EQUAL(2u, cluster.size()); // Me and parent
- AMQFrame frame(VER, 1, new ChannelOkBody(VER));
- SessionFrame sf(cluster.received[0].uuid, frame, false);
- cluster.handle(sf);
- BOOST_REQUIRE(cluster.received.waitFor(2));
- BOOST_CHECK(!cluster.received[1].isIncoming);
- BOOST_CHECK_TYPEID_EQUAL(ChannelOkBody, *cluster.received[1].frame.getBody());
+ AMQFrame frame(VER, 1, new SessionPongBody(VER));
+ SessionFrame sendframe(sf.uuid, frame, false);
+ cluster.handle(sendframe);
+ BOOST_REQUIRE(cluster.received.waitPop(sf));
+ BOOST_CHECK(!sf.isIncoming);
+ BOOST_CHECK_TYPEID_EQUAL(SessionPongBody, *sf.frame.getBody());
}
int test_main(int, char**) {
diff --git a/cpp/src/tests/Makefile.am b/cpp/src/tests/Makefile.am
index 4f1e7e1ec3..edb4f5b375 100644
--- a/cpp/src/tests/Makefile.am
+++ b/cpp/src/tests/Makefile.am
@@ -87,9 +87,6 @@ framing_unit_tests = \
HeaderTest \
SequenceNumberTest
-misc_unit_tests = \
- ProducerConsumerTest
-
posix_unit_tests = \
EventChannelTest \
EventChannelThreadsTest
diff --git a/cpp/src/tests/ProducerConsumerTest.cpp b/cpp/src/tests/ProducerConsumerTest.cpp
deleted file mode 100644
index 789e365a85..0000000000
--- a/cpp/src/tests/ProducerConsumerTest.cpp
+++ /dev/null
@@ -1,284 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include <vector>
-#include <iostream>
-
-#include <boost/bind.hpp>
-
-#include "qpid_test_plugin.h"
-#include "InProcessBroker.h"
-#include "qpid/sys/ProducerConsumer.h"
-#include "qpid/sys/Thread.h"
-#include "qpid/framing/AMQP_HighestVersion.h"
-#include "qpid/sys/AtomicCount.h"
-
-using namespace qpid;
-using namespace sys;
-using namespace framing;
-using namespace boost;
-using namespace std;
-
-/** A counter that notifies a monitor when changed */
-class WatchedCounter : public Monitor {
- public:
- WatchedCounter(int i=0) : count(i) {}
- WatchedCounter(const WatchedCounter& c) : Monitor(), count(int(c)) {}
-
- WatchedCounter& operator=(const WatchedCounter& x) {
- return *this = int(x);
- }
-
- WatchedCounter& operator=(int i) {
- Lock l(*this);
- count = i;
- return *this;
- }
-
- int operator++() {
- Lock l(*this);
- notifyAll();
- return ++count;
- }
-
- int operator++(int) {
- Lock l(*this);
- notifyAll();
- return count++;
- }
-
- bool operator==(int i) const {
- Lock l(const_cast<WatchedCounter&>(*this));
- return i == count;
- }
-
- operator int() const {
- Lock l(const_cast<WatchedCounter&>(*this));
- return count;
- }
-
- bool waitFor(int i, Duration timeout=TIME_SEC) {
- Lock l(*this);
- AbsTime deadline(now(), timeout);
- while (count != i) {
- if (!wait(deadline))
- return false;
- }
- assert(count == i);
- return true;
- }
-
- private:
- typedef Mutex::ScopedLock Lock;
- int count;
-};
-
-class ProducerConsumerTest : public CppUnit::TestCase
-{
- CPPUNIT_TEST_SUITE(ProducerConsumerTest);
- CPPUNIT_TEST(testProduceConsume);
- CPPUNIT_TEST(testTimeout);
- CPPUNIT_TEST(testShutdown);
- CPPUNIT_TEST(testCancel);
- CPPUNIT_TEST_SUITE_END();
-
- public:
- client::InProcessBrokerClient client;
- ProducerConsumer pc;
-
- WatchedCounter shutdown;
- WatchedCounter timeout;
- WatchedCounter consumed;
- WatchedCounter produced;
-
- struct ConsumeRunnable : public Runnable {
- ProducerConsumerTest& test;
- ConsumeRunnable(ProducerConsumerTest& test_) : test(test_) {}
- void run() { test.consume(); }
- };
-
- struct ConsumeTimeoutRunnable : public Runnable {
- ProducerConsumerTest& test;
- Duration timeout;
- ConsumeTimeoutRunnable(ProducerConsumerTest& test_, const Duration& t)
- : test(test_), timeout(t) {}
- void run() { test.consumeTimeout(timeout); }
- };
-
-
- void consumeInternal(ProducerConsumer::ConsumerLock& consumer) {
- if (pc.isShutdown()) {
- ++shutdown;
- return;
- }
- if (consumer.isTimedOut()) {
- ++timeout;
- return;
- }
- CPPUNIT_ASSERT(consumer.isOk());
- CPPUNIT_ASSERT(pc.available() > 0);
- consumer.confirm();
- consumed++;
- }
-
- void consume() {
- ProducerConsumer::ConsumerLock consumer(pc);
- consumeInternal(consumer);
- };
-
- void consumeTimeout(const Duration& timeout) {
- ProducerConsumer::ConsumerLock consumer(pc, timeout);
- consumeInternal(consumer);
- };
-
- void produce() {
- ProducerConsumer::ProducerLock producer(pc);
- CPPUNIT_ASSERT(producer.isOk());
- producer.confirm();
- produced++;
- }
-
- void join(vector<Thread>& threads) {
- for_each(threads.begin(), threads.end(), bind(&Thread::join,_1));
- }
-
- vector<Thread> startThreads(size_t n, Runnable& runnable) {
- vector<Thread> threads(n);
- while (n > 0)
- threads[--n] = Thread(runnable);
- return threads;
- }
-
-public:
- ProducerConsumerTest() : client() {}
-
- void testProduceConsume() {
- ConsumeRunnable runMe(*this);
- produce();
- produce();
- CPPUNIT_ASSERT(produced.waitFor(2));
- vector<Thread> threads = startThreads(1, runMe);
- CPPUNIT_ASSERT(consumed.waitFor(1));
- join(threads);
-
- threads = startThreads(1, runMe);
- CPPUNIT_ASSERT(consumed.waitFor(2));
- join(threads);
-
- threads = startThreads(3, runMe);
- produce();
- produce();
- CPPUNIT_ASSERT(consumed.waitFor(4));
- produce();
- CPPUNIT_ASSERT(consumed.waitFor(5));
- join(threads);
- CPPUNIT_ASSERT_EQUAL(0, int(shutdown));
- }
-
- void testTimeout() {
- try {
- // 0 timeout no items available throws exception
- ProducerConsumer::ConsumerLock consumer(pc, 0);
- CPPUNIT_FAIL("Expected exception");
- } catch(...){}
-
- produce();
- CPPUNIT_ASSERT(produced.waitFor(1));
- CPPUNIT_ASSERT_EQUAL(1, int(pc.available()));
- {
- // 0 timeout succeeds if there's an item available.
- ProducerConsumer::ConsumerLock consume(pc, 0);
- CPPUNIT_ASSERT(consume.isOk());
- consume.confirm();
- }
- CPPUNIT_ASSERT_EQUAL(0, int(pc.available()));
-
- // Produce an item within the timeout.
- ConsumeTimeoutRunnable runMe(*this, 2*TIME_SEC);
- vector<Thread> threads = startThreads(1, runMe);
- produce();
- CPPUNIT_ASSERT(consumed.waitFor(1));
- join(threads);
- }
-
-
- void testShutdown() {
- ConsumeRunnable runMe(*this);
- vector<Thread> threads = startThreads(2, runMe);
- while (pc.consumers() != 2)
- Thread::yield();
- pc.shutdown();
- CPPUNIT_ASSERT(shutdown.waitFor(2));
- join(threads);
-
- threads = startThreads(1, runMe); // Should shutdown immediately.
- CPPUNIT_ASSERT(shutdown.waitFor(3));
- join(threads);
-
- // Produce/consume while shutdown should return isShutdown and
- // throw on confirm.
- try {
- ProducerConsumer::ProducerLock p(pc);
- CPPUNIT_ASSERT(pc.isShutdown());
- CPPUNIT_FAIL("Expected exception");
- }
- catch (...) {} // Expected
- try {
- ProducerConsumer::ConsumerLock c(pc);
- CPPUNIT_ASSERT(pc.isShutdown());
- CPPUNIT_FAIL("Expected exception");
- }
- catch (...) {} // Expected
- }
-
- void testCancel() {
- CPPUNIT_ASSERT_EQUAL(size_t(0), pc.available());
- {
- ProducerConsumer::ProducerLock p(pc);
- CPPUNIT_ASSERT(p.isOk());
- p.cancel();
- }
- // Nothing was produced.
- CPPUNIT_ASSERT_EQUAL(size_t(0), pc.available());
- {
- ProducerConsumer::ConsumerLock c(pc, 0);
- CPPUNIT_ASSERT(c.isTimedOut());
- }
- // Now produce but cancel the consume
- {
- ProducerConsumer::ProducerLock p(pc);
- CPPUNIT_ASSERT(p.isOk());
- p.confirm();
- }
- CPPUNIT_ASSERT_EQUAL(size_t(1), pc.available());
- {
- ProducerConsumer::ConsumerLock c(pc);
- CPPUNIT_ASSERT(c.isOk());
- c.cancel();
- }
- CPPUNIT_ASSERT_EQUAL(size_t(1), pc.available());
- }
-};
-
-
-// Make this test suite a plugin.
-CPPUNIT_PLUGIN_IMPLEMENT();
-CPPUNIT_TEST_SUITE_REGISTRATION(ProducerConsumerTest);
-
diff --git a/cpp/src/tests/cluster.mk b/cpp/src/tests/cluster.mk
index 765aa02eb0..68725758fa 100644
--- a/cpp/src/tests/cluster.mk
+++ b/cpp/src/tests/cluster.mk
@@ -20,6 +20,7 @@ check_PROGRAMS+=Cpg
Cpg_SOURCES=Cpg.cpp
Cpg_LDADD=$(lib_cluster) -lboost_unit_test_framework
+# TODO aconway 2007-07-26: Fix this test.
#TESTS+=Cluster
check_PROGRAMS+=Cluster
Cluster_SOURCES=Cluster.cpp Cluster.h