From 876d0b94c37f252b08c81656386100fad18a8a46 Mon Sep 17 00:00:00 2001 From: Alan Conway Date: Wed, 21 Feb 2007 19:25:45 +0000 Subject: Thread safety fixes for race conditions on incoming messages. * cpp/lib/client/MessageListener.h: const correctness. * cpp/tests/*: MessageListener const change. * cpp/lib/broker/Content.h: Removed out-of-date FIXME comments. * cpp/lib/client/ClientChannel.h/ .cpp(): - added locking for consumers map and other member access. - refactored implementations of Basic get, deliver, return: most logic now encapsulted in IncomingMessage class. - fix channel close problems. * cpp/lib/client/ClientMessage.h/.cpp: - const correctness & API convenience fixes. - getMethod/setMethod/getHeader: for new IncomingMessage * cpp/lib/client/Connection.h/.cpp: - Fixes to channel closure. * cpp/lib/client/IncomingMessage.h/.cpp: - Encapsulate *all* incoming message handling for client. - Moved handling of BasicGetOk to IncomingMessage to fix race. - Thread safety fixes. * cpp/lib/client/ResponseHandler.h/.cpp: - added getResponse for ClientChannel. * cpp/lib/common/Exception.h: - added missing throwSelf implementations. - added ShutdownException as general purpose shut-down indicator. - added EmptyException as general purpose "empty" indicator. * cpp/lib/common/sys/Condition|Monitor|Mutex.h|.cpp: - Condition variable abstraction extracted from Monitor for situations where a single lock is associated with multiple conditions. * cpp/tests/ClientChannelTest.cpp: - Test incoming message transfer, get, consume etc. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@510161 13f79535-47bb-0310-9956-ffa450edef68 --- cpp/tests/ClientChannelTest.cpp | 110 +++++++++++++++++++++++++++++++++++----- 1 file changed, 96 insertions(+), 14 deletions(-) (limited to 'cpp/tests/ClientChannelTest.cpp') diff --git a/cpp/tests/ClientChannelTest.cpp b/cpp/tests/ClientChannelTest.cpp index e1eec4402d..9ba67ef04c 100644 --- a/cpp/tests/ClientChannelTest.cpp +++ b/cpp/tests/ClientChannelTest.cpp @@ -18,6 +18,7 @@ * under the License. * */ +#include #include "qpid_test_plugin.h" #include "InProcessBroker.h" #include "ClientChannel.h" @@ -28,6 +29,7 @@ using namespace std; using namespace boost; using namespace qpid::client; +using namespace qpid::sys; using namespace qpid::framing; /** @@ -36,45 +38,125 @@ using namespace qpid::framing; class ClientChannelTest : public CppUnit::TestCase { CPPUNIT_TEST_SUITE(ClientChannelTest); - CPPUNIT_TEST(testGet); - CPPUNIT_TEST(testConsume); + CPPUNIT_TEST(testPublishGet); + CPPUNIT_TEST(testGetNoContent); + CPPUNIT_TEST(testConsumeCancel); + CPPUNIT_TEST(testConsumePublished); CPPUNIT_TEST_SUITE_END(); + struct Listener: public qpid::client::MessageListener { + vector messages; + Monitor monitor; + void received(Message& msg) { + Mutex::ScopedLock l(monitor); + messages.push_back(msg); + monitor.notifyAll(); + } + }; + InProcessBrokerClient connection; // client::connection + local broker Channel channel; - const std::string key; + const std::string qname; const std::string data; Queue queue; Exchange exchange; + Listener listener; public: ClientChannelTest() - : key("testq"), data("hello"), - queue(key, true), exchange("", Exchange::DIRECT_EXCHANGE) + : qname("testq"), data("hello"), + queue(qname, true), exchange("", Exchange::DIRECT_EXCHANGE) { connection.openChannel(channel); CPPUNIT_ASSERT(channel.getId() != 0); channel.declareQueue(queue); } - void testGet() { - // FIXME aconway 2007-02-16: Must fix thread safety bug - // in ClientChannel::get for this to pass. - return; - + void testPublishGet() { Message pubMsg(data); - channel.publish(pubMsg, exchange, key); + pubMsg.getHeaders().setString("hello", "world"); + channel.publish(pubMsg, exchange, qname); Message getMsg; - channel.get(getMsg, queue); + CPPUNIT_ASSERT(channel.get(getMsg, queue)); CPPUNIT_ASSERT_EQUAL(data, getMsg.getData()); + CPPUNIT_ASSERT_EQUAL(string("world"), + getMsg.getHeaders().getString("hello")); + CPPUNIT_ASSERT(!channel.get(getMsg, queue)); // Empty queue } - void testConsume() { + void testGetNoContent() { + Message pubMsg; + pubMsg.getHeaders().setString("hello", "world"); + channel.publish(pubMsg, exchange, qname); + Message getMsg; + CPPUNIT_ASSERT(channel.get(getMsg, queue)); + CPPUNIT_ASSERT(getMsg.getData().empty()); + CPPUNIT_ASSERT_EQUAL(string("world"), + getMsg.getHeaders().getString("hello")); } + + void testConsumeCancel() { + string tag; // Broker assigned + channel.consume(queue, tag, &listener); + channel.start(); + CPPUNIT_ASSERT_EQUAL(size_t(0), listener.messages.size()); + channel.publish(Message("a"), exchange, qname); + { + Mutex::ScopedLock l(listener.monitor); + Time deadline(now() + 1*TIME_SEC); + while (listener.messages.size() != 1) { + CPPUNIT_ASSERT(listener.monitor.wait(deadline)); + } + } + CPPUNIT_ASSERT_EQUAL(size_t(1), listener.messages.size()); + CPPUNIT_ASSERT_EQUAL(string("a"), listener.messages[0].getData()); + + channel.publish(Message("b"), exchange, qname); + channel.publish(Message("c"), exchange, qname); + { + Mutex::ScopedLock l(listener.monitor); + while (listener.messages.size() != 3) { + CPPUNIT_ASSERT(listener.monitor.wait(1*TIME_SEC)); + } + } + CPPUNIT_ASSERT_EQUAL(size_t(3), listener.messages.size()); + CPPUNIT_ASSERT_EQUAL(string("b"), listener.messages[1].getData()); + CPPUNIT_ASSERT_EQUAL(string("c"), listener.messages[2].getData()); + channel.cancel(tag); + channel.publish(Message("d"), exchange, qname); + CPPUNIT_ASSERT_EQUAL(size_t(3), listener.messages.size()); + { + Mutex::ScopedLock l(listener.monitor); + CPPUNIT_ASSERT(!listener.monitor.wait(TIME_SEC/2)); + } + Message msg; + CPPUNIT_ASSERT(channel.get(msg, queue)); + CPPUNIT_ASSERT_EQUAL(string("d"), msg.getData()); + } + + // Consume already-published messages + void testConsumePublished() { + Message pubMsg("x"); + pubMsg.getHeaders().setString("y", "z"); + channel.publish(pubMsg, exchange, qname); + string tag; + channel.consume(queue, tag, &listener); + CPPUNIT_ASSERT_EQUAL(size_t(0), listener.messages.size()); + channel.start(); + { + Mutex::ScopedLock l(listener.monitor); + while (listener.messages.size() != 1) + CPPUNIT_ASSERT(listener.monitor.wait(1*TIME_SEC)); + } + CPPUNIT_ASSERT_EQUAL(string("x"), listener.messages[0].getData()); + CPPUNIT_ASSERT_EQUAL(string("z"), + listener.messages[0].getHeaders().getString("y")); + } - // FIXME aconway 2007-02-15: Cover full channel API + + }; // Make this test suite a plugin. -- cgit v1.2.1