summaryrefslogtreecommitdiff
path: root/cpp/tests
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2007-02-14 15:02:10 +0000
committerAlan Conway <aconway@apache.org>2007-02-14 15:02:10 +0000
commit8189a1f1f3d27d9ad7e0de50ed9e924e63d74aec (patch)
treef43ed248e99d67639f4330e604a0ed718f736d22 /cpp/tests
parent20a5f81e8bbf8d4429d55fffb47278e7ade81c17 (diff)
downloadqpid-python-8189a1f1f3d27d9ad7e0de50ed9e924e63d74aec.tar.gz
* cpp/lib/common/sys/ProducerConsumer.h:
General-purpose producer-consumer synchronization. Anywhere we have producer/consumer threads in qpid we should re-use this sync object rather than re-inventing the synchronization each time. * cpp/lib/common/sys/AtomicCount.h: Separated ScopedIncrement/ScopedDecrement into ScopedIncrement.h * cpp/tests/InProcessBroker.h: Added class InProcessBrokerClient, a self contained in-process client + broker convenience for tests. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@507560 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/tests')
-rw-r--r--cpp/tests/InProcessBroker.h167
-rw-r--r--cpp/tests/Makefile.am5
-rw-r--r--cpp/tests/ProducerConsumerTest.cpp283
3 files changed, 304 insertions, 151 deletions
diff --git a/cpp/tests/InProcessBroker.h b/cpp/tests/InProcessBroker.h
index cf2b9df8b0..8ace039bfe 100644
--- a/cpp/tests/InProcessBroker.h
+++ b/cpp/tests/InProcessBroker.h
@@ -26,6 +26,7 @@
#include "broker/Broker.h"
#include "broker/Connection.h"
#include "client/Connector.h"
+#include "client/Connection.h"
namespace qpid {
namespace broker {
@@ -54,6 +55,7 @@ framing::AMQFrame copy(framing::AMQFrame& from) {
class InProcessBroker : public client::Connector {
public:
enum Sender {CLIENT,BROKER};
+
struct Frame : public framing::AMQFrame {
Frame(Sender e, const AMQFrame& f) : AMQFrame(f), from(e) {}
bool fromBroker() const { return from == BROKER; }
@@ -68,7 +70,7 @@ class InProcessBroker : public client::Connector {
};
typedef std::vector<Frame> Conversation;
- InProcessBroker(const framing::ProtocolVersion& ver) :
+ InProcessBroker(framing::ProtocolVersion ver) :
Connector(ver),
protocolInit(ver),
broker(broker::Broker::create()),
@@ -77,6 +79,8 @@ class InProcessBroker : public client::Connector {
clientOut(CLIENT, conversation, &brokerConnection)
{}
+ ~InProcessBroker() { broker->shutdown(); }
+
void connect(const std::string& /*host*/, int /*port*/) {}
void init() { brokerConnection.initiated(&protocolInit); }
void close() {}
@@ -141,157 +145,22 @@ std::ostream& operator<<(
}} // namespace qpid::broker
-#endif /*!_tests_InProcessBroker_h*/
-#ifndef _tests_InProcessBroker_h
-#define _tests_InProcessBroker_h
-
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#include <vector>
-#include <iostream>
-#include <algorithm>
-
-#include "framing/AMQFrame.h"
-#include "broker/Broker.h"
-#include "broker/Connection.h"
-#include "client/Connector.h"
-
-namespace qpid {
-namespace broker {
-
-/** Make a copy of a frame body. Inefficient, only intended for tests. */
-// TODO aconway 2007-01-29: from should be const, need to fix
-// AMQPFrame::encode as const.
-framing::AMQFrame copy(framing::AMQFrame& from) {
- framing::Buffer buffer(from.size());
- from.encode(buffer);
- buffer.flip();
- framing::AMQFrame result;
- result.decode(buffer);
- return result;
-}
-
-/**
- * A broker that implements client::Connector allowing direct
- * in-process connection of client to broker. Used to write round-trip
- * tests without requiring an external broker process.
- *
- * Also allows you to "snoop" on frames exchanged between client & broker.
- *
- * Use as follows:
- *
- \code
- broker::InProcessBroker ibroker(version);
- client::Connection clientConnection;
- clientConnection.setConnector(ibroker);
- clientConnection.open("");
- ... use as normal
- \endcode
- *
- */
-class InProcessBroker : public client::Connector {
+/** An in-process client+broker all in one. */
+class InProcessBrokerClient : public qpid::client::Connection {
public:
- enum Sender {CLIENT,BROKER};
- struct Frame : public framing::AMQFrame {
- Frame(Sender e, const AMQFrame& f) : AMQFrame(f), from(e) {}
- bool fromBroker() const { return from == BROKER; }
- bool fromClient() const { return from == CLIENT; }
-
- template <class MethodType>
- MethodType* asMethod() {
- return dynamic_cast<MethodType*>(getBody().get());
- }
-
- Sender from;
- };
- typedef std::vector<Frame> Conversation;
-
- InProcessBroker(const framing::ProtocolVersion& ver) :
- Connector(ver),
- protocolInit(ver),
- broker(broker::Broker::create()),
- brokerOut(BROKER, conversation),
- brokerConnection(&brokerOut, *broker),
- clientOut(CLIENT, conversation, &brokerConnection)
- {}
-
- void connect(const std::string& /*host*/, int /*port*/) {}
- void init() { brokerConnection.initiated(&protocolInit); }
- void close() {}
-
- /** Client's input handler. */
- void setInputHandler(framing::InputHandler* handler) {
- brokerOut.in = handler;
+ qpid::broker::InProcessBroker broker;
+
+ /** Constructor creates broker and opens client connection. */
+ InProcessBrokerClient(qpid::framing::ProtocolVersion version)
+ : broker(version)
+ {
+ setConnector(broker);
+ open("");
}
- /** Called by client to send a frame */
- void send(framing::AMQFrame* frame) {
- clientOut.send(frame);
+ ~InProcessBrokerClient() {
+ close(); // close before broker is deleted.
}
-
- /** Entire client-broker conversation is recorded here */
- Conversation conversation;
-
- private:
- /** OutputHandler that forwards data to an InputHandler */
- struct OutputToInputHandler : public sys::ConnectionOutputHandler {
- OutputToInputHandler(
- Sender from_, Conversation& conversation_,
- framing::InputHandler* ih=0
- ) : from(from_), conversation(conversation_), in(ih) {}
-
- void send(framing::AMQFrame* frame) {
- conversation.push_back(Frame(from, copy(*frame)));
- in->received(frame);
- }
-
- void close() {}
-
- Sender from;
- Conversation& conversation;
- framing::InputHandler* in;
- };
-
- framing::ProtocolInitiation protocolInit;
- Broker::shared_ptr broker;
- OutputToInputHandler brokerOut;
- broker::Connection brokerConnection;
- OutputToInputHandler clientOut;
};
-std::ostream& operator<<(
- std::ostream& out, const InProcessBroker::Frame& frame)
-{
- return out << (frame.fromBroker()? "BROKER: ":"CLIENT: ") <<
- static_cast<const framing::AMQFrame&>(frame);
-}
-std::ostream& operator<<(
- std::ostream& out, const InProcessBroker::Conversation& conv)
-{
- for (InProcessBroker::Conversation::const_iterator i = conv.begin();
- i != conv.end(); ++i)
- {
- out << *i << std::endl;
- }
- return out;
-}
-
-
-}} // namespace qpid::broker
-
-#endif /*!_tests_InProcessBroker_h*/
+#endif // _tests_InProcessBroker_h
diff --git a/cpp/tests/Makefile.am b/cpp/tests/Makefile.am
index 2b51a5b125..768558f219 100644
--- a/cpp/tests/Makefile.am
+++ b/cpp/tests/Makefile.am
@@ -47,14 +47,15 @@ framing_tests = \
HeaderTest
misc_tests = \
- ExceptionTest
+ ExceptionTest \
+ ProducerConsumerTest
posix_tests = \
EventChannelTest \
EventChannelThreadsTest
unit_tests = \
- $(broker_tests) \
+b $(broker_tests) \
$(framing_tests) \
$(misc_tests) \
$(round_trip_tests)
diff --git a/cpp/tests/ProducerConsumerTest.cpp b/cpp/tests/ProducerConsumerTest.cpp
new file mode 100644
index 0000000000..e6d4090596
--- /dev/null
+++ b/cpp/tests/ProducerConsumerTest.cpp
@@ -0,0 +1,283 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <vector>
+#include <iostream>
+
+#include <boost/bind.hpp>
+
+#include <qpid_test_plugin.h>
+#include "InProcessBroker.h"
+#include "sys/ProducerConsumer.h"
+#include "sys/Thread.h"
+#include "AMQP_HighestVersion.h"
+#include "sys/AtomicCount.h"
+
+using namespace qpid::sys;
+using namespace qpid::framing;
+using namespace boost;
+using namespace std;
+
+/** A counter that notifies a monitor when changed */
+class WatchedCounter : public Monitor {
+ public:
+ WatchedCounter(int i=0) : count(i) {}
+ WatchedCounter(const WatchedCounter& c) : Monitor(), count(int(c)) {}
+
+ WatchedCounter& operator=(const WatchedCounter& x) {
+ return *this = int(x);
+ }
+
+ WatchedCounter& operator=(int i) {
+ Lock l(*this);
+ count = i;
+ return *this;
+ }
+
+ int operator++() {
+ Lock l(*this);
+ notifyAll();
+ return ++count;
+ }
+
+ int operator++(int) {
+ Lock l(*this);
+ notifyAll();
+ return count++;
+ }
+
+ bool operator==(int i) const {
+ Lock l(const_cast<WatchedCounter&>(*this));
+ return i == count;
+ }
+
+ operator int() const {
+ Lock l(const_cast<WatchedCounter&>(*this));
+ return count;
+ }
+
+ bool waitFor(int i, Time timeout=TIME_SEC) {
+ Lock l(*this);
+ Time deadline = timeout+now();
+ while (count != i) {
+ if (!wait(deadline))
+ return false;
+ }
+ assert(count == i);
+ return true;
+ }
+
+ private:
+ typedef Mutex::ScopedLock Lock;
+ int count;
+};
+
+class ProducerConsumerTest : public CppUnit::TestCase
+{
+ CPPUNIT_TEST_SUITE(ProducerConsumerTest);
+ CPPUNIT_TEST(testProduceConsume);
+ CPPUNIT_TEST(testTimeout);
+ CPPUNIT_TEST(testStop);
+ CPPUNIT_TEST(testCancel);
+ CPPUNIT_TEST_SUITE_END();
+
+ public:
+ InProcessBrokerClient client;
+ ProducerConsumer pc;
+
+ WatchedCounter stopped;
+ WatchedCounter timeout;
+ WatchedCounter consumed;
+ WatchedCounter produced;
+
+ struct ConsumeRunnable : public Runnable {
+ ProducerConsumerTest& test;
+ ConsumeRunnable(ProducerConsumerTest& test_) : test(test_) {}
+ void run() { test.consume(); }
+ };
+
+ struct ConsumeTimeoutRunnable : public Runnable {
+ ProducerConsumerTest& test;
+ Time timeout;
+ ConsumeTimeoutRunnable(ProducerConsumerTest& test_, const Time& t)
+ : test(test_), timeout(t) {}
+ void run() { test.consumeTimeout(timeout); }
+ };
+
+
+ void consumeInternal(ProducerConsumer::ConsumerLock& consumer) {
+ if (pc.isStopped()) {
+ ++stopped;
+ return;
+ }
+ if (consumer.isTimedOut()) {
+ ++timeout;
+ return;
+ }
+ CPPUNIT_ASSERT(consumer.isOk());
+ CPPUNIT_ASSERT(pc.available() > 0);
+ consumer.confirm();
+ consumed++;
+ }
+
+ void consume() {
+ ProducerConsumer::ConsumerLock consumer(pc);
+ consumeInternal(consumer);
+ };
+
+ void consumeTimeout(const Time& timeout) {
+ ProducerConsumer::ConsumerLock consumer(pc, timeout);
+ consumeInternal(consumer);
+ };
+
+ void produce() {
+ ProducerConsumer::ProducerLock producer(pc);
+ CPPUNIT_ASSERT(producer.isOk());
+ producer.confirm();
+ produced++;
+ }
+
+ void join(vector<Thread>& threads) {
+ for_each(threads.begin(), threads.end(), bind(&Thread::join,_1));
+ }
+
+ vector<Thread> startThreads(size_t n, Runnable& runnable) {
+ vector<Thread> threads(n);
+ while (n > 0)
+ threads[--n] = Thread(runnable);
+ return threads;
+ }
+
+public:
+ ProducerConsumerTest() : client(highestProtocolVersion) {}
+
+ void testProduceConsume() {
+ ConsumeRunnable runMe(*this);
+ produce();
+ produce();
+ CPPUNIT_ASSERT(produced.waitFor(2));
+ vector<Thread> threads = startThreads(1, runMe);
+ CPPUNIT_ASSERT(consumed.waitFor(1));
+ join(threads);
+
+ threads = startThreads(1, runMe);
+ CPPUNIT_ASSERT(consumed.waitFor(2));
+ join(threads);
+
+ threads = startThreads(3, runMe);
+ produce();
+ produce();
+ CPPUNIT_ASSERT(consumed.waitFor(4));
+ produce();
+ CPPUNIT_ASSERT(consumed.waitFor(5));
+ join(threads);
+ CPPUNIT_ASSERT_EQUAL(0, int(stopped));
+ }
+
+ void testTimeout() {
+ try {
+ // 0 timeout no items available throws exception
+ ProducerConsumer::ConsumerLock consumer(pc, 0);
+ CPPUNIT_FAIL("Expected exception");
+ } catch(...){}
+
+ produce();
+ CPPUNIT_ASSERT(produced.waitFor(1));
+ CPPUNIT_ASSERT_EQUAL(1, int(pc.available()));
+ {
+ // 0 timeout succeeds if there's an item available.
+ ProducerConsumer::ConsumerLock consume(pc, 0);
+ CPPUNIT_ASSERT(consume.isOk());
+ consume.confirm();
+ }
+ CPPUNIT_ASSERT_EQUAL(0, int(pc.available()));
+
+ // Produce an item within the timeout.
+ ConsumeTimeoutRunnable runMe(*this, 2*TIME_SEC);
+ vector<Thread> threads = startThreads(1, runMe);
+ produce();
+ CPPUNIT_ASSERT(consumed.waitFor(1));
+ join(threads);
+ }
+
+
+ void testStop() {
+ ConsumeRunnable runMe(*this);
+ vector<Thread> threads = startThreads(2, runMe);
+ while (pc.consumers() != 2)
+ Thread::yield();
+ pc.stop();
+ CPPUNIT_ASSERT(stopped.waitFor(2));
+ join(threads);
+
+ threads = startThreads(1, runMe); // Should stop immediately.
+ CPPUNIT_ASSERT(stopped.waitFor(3));
+ join(threads);
+
+ // Produce/consume while stopped should return isStopped and
+ // throw on confirm.
+ try {
+ ProducerConsumer::ProducerLock p(pc);
+ CPPUNIT_ASSERT(pc.isStopped());
+ CPPUNIT_FAIL("Expected exception");
+ }
+ catch (...) {} // Expected
+ try {
+ ProducerConsumer::ConsumerLock c(pc);
+ CPPUNIT_ASSERT(pc.isStopped());
+ CPPUNIT_FAIL("Expected exception");
+ }
+ catch (...) {} // Expected
+ }
+
+ void testCancel() {
+ CPPUNIT_ASSERT_EQUAL(size_t(0), pc.available());
+ {
+ ProducerConsumer::ProducerLock p(pc);
+ CPPUNIT_ASSERT(p.isOk());
+ p.cancel();
+ }
+ // Nothing was produced.
+ CPPUNIT_ASSERT_EQUAL(size_t(0), pc.available());
+ {
+ ProducerConsumer::ConsumerLock c(pc, 0);
+ CPPUNIT_ASSERT(c.isTimedOut());
+ }
+ // Now produce but cancel the consume
+ {
+ ProducerConsumer::ProducerLock p(pc);
+ CPPUNIT_ASSERT(p.isOk());
+ p.confirm();
+ }
+ CPPUNIT_ASSERT_EQUAL(size_t(1), pc.available());
+ {
+ ProducerConsumer::ConsumerLock c(pc);
+ CPPUNIT_ASSERT(c.isOk());
+ c.cancel();
+ }
+ CPPUNIT_ASSERT_EQUAL(size_t(1), pc.available());
+ }
+};
+
+
+// Make this test suite a plugin.
+CPPUNIT_PLUGIN_IMPLEMENT();
+CPPUNIT_TEST_SUITE_REGISTRATION(ProducerConsumerTest);
+