diff options
| author | Alan Conway <aconway@apache.org> | 2007-02-21 19:25:45 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2007-02-21 19:25:45 +0000 |
| commit | 876d0b94c37f252b08c81656386100fad18a8a46 (patch) | |
| tree | 4840b0d697d4629fd5c518507b58fceb7de1578a /cpp/tests/ClientChannelTest.cpp | |
| parent | c36fb4454be5ce4311aa5f5d0e5683db713c5545 (diff) | |
| download | qpid-python-876d0b94c37f252b08c81656386100fad18a8a46.tar.gz | |
Thread safety fixes for race conditions on incoming messages.
* cpp/lib/client/MessageListener.h: const correctness.
* cpp/tests/*: MessageListener const change.
* cpp/lib/broker/Content.h: Removed out-of-date FIXME comments.
* cpp/lib/client/ClientChannel.h/ .cpp():
- added locking for consumers map and other member access.
- refactored implementations of Basic get, deliver, return:
most logic now encapsulted in IncomingMessage class.
- fix channel close problems.
* cpp/lib/client/ClientMessage.h/.cpp:
- const correctness & API convenience fixes.
- getMethod/setMethod/getHeader: for new IncomingMessage
* cpp/lib/client/Connection.h/.cpp:
- Fixes to channel closure.
* cpp/lib/client/IncomingMessage.h/.cpp:
- Encapsulate *all* incoming message handling for client.
- Moved handling of BasicGetOk to IncomingMessage to fix race.
- Thread safety fixes.
* cpp/lib/client/ResponseHandler.h/.cpp:
- added getResponse for ClientChannel.
* cpp/lib/common/Exception.h:
- added missing throwSelf implementations.
- added ShutdownException as general purpose shut-down indicator.
- added EmptyException as general purpose "empty" indicator.
* cpp/lib/common/sys/Condition|Monitor|Mutex.h|.cpp:
- Condition variable abstraction extracted from Monitor for situations
where a single lock is associated with multiple conditions.
* cpp/tests/ClientChannelTest.cpp:
- Test incoming message transfer, get, consume etc.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@510161 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/tests/ClientChannelTest.cpp')
| -rw-r--r-- | cpp/tests/ClientChannelTest.cpp | 110 |
1 files changed, 96 insertions, 14 deletions
diff --git a/cpp/tests/ClientChannelTest.cpp b/cpp/tests/ClientChannelTest.cpp index e1eec4402d..9ba67ef04c 100644 --- a/cpp/tests/ClientChannelTest.cpp +++ b/cpp/tests/ClientChannelTest.cpp @@ -18,6 +18,7 @@ * under the License. * */ +#include <vector> #include "qpid_test_plugin.h" #include "InProcessBroker.h" #include "ClientChannel.h" @@ -28,6 +29,7 @@ using namespace std; using namespace boost; using namespace qpid::client; +using namespace qpid::sys; using namespace qpid::framing; /** @@ -36,45 +38,125 @@ using namespace qpid::framing; class ClientChannelTest : public CppUnit::TestCase { CPPUNIT_TEST_SUITE(ClientChannelTest); - CPPUNIT_TEST(testGet); - CPPUNIT_TEST(testConsume); + CPPUNIT_TEST(testPublishGet); + CPPUNIT_TEST(testGetNoContent); + CPPUNIT_TEST(testConsumeCancel); + CPPUNIT_TEST(testConsumePublished); CPPUNIT_TEST_SUITE_END(); + struct Listener: public qpid::client::MessageListener { + vector<Message> messages; + Monitor monitor; + void received(Message& msg) { + Mutex::ScopedLock l(monitor); + messages.push_back(msg); + monitor.notifyAll(); + } + }; + InProcessBrokerClient connection; // client::connection + local broker Channel channel; - const std::string key; + const std::string qname; const std::string data; Queue queue; Exchange exchange; + Listener listener; public: ClientChannelTest() - : key("testq"), data("hello"), - queue(key, true), exchange("", Exchange::DIRECT_EXCHANGE) + : qname("testq"), data("hello"), + queue(qname, true), exchange("", Exchange::DIRECT_EXCHANGE) { connection.openChannel(channel); CPPUNIT_ASSERT(channel.getId() != 0); channel.declareQueue(queue); } - void testGet() { - // FIXME aconway 2007-02-16: Must fix thread safety bug - // in ClientChannel::get for this to pass. - return; - + void testPublishGet() { Message pubMsg(data); - channel.publish(pubMsg, exchange, key); + pubMsg.getHeaders().setString("hello", "world"); + channel.publish(pubMsg, exchange, qname); Message getMsg; - channel.get(getMsg, queue); + CPPUNIT_ASSERT(channel.get(getMsg, queue)); CPPUNIT_ASSERT_EQUAL(data, getMsg.getData()); + CPPUNIT_ASSERT_EQUAL(string("world"), + getMsg.getHeaders().getString("hello")); + CPPUNIT_ASSERT(!channel.get(getMsg, queue)); // Empty queue } - void testConsume() { + void testGetNoContent() { + Message pubMsg; + pubMsg.getHeaders().setString("hello", "world"); + channel.publish(pubMsg, exchange, qname); + Message getMsg; + CPPUNIT_ASSERT(channel.get(getMsg, queue)); + CPPUNIT_ASSERT(getMsg.getData().empty()); + CPPUNIT_ASSERT_EQUAL(string("world"), + getMsg.getHeaders().getString("hello")); } + + void testConsumeCancel() { + string tag; // Broker assigned + channel.consume(queue, tag, &listener); + channel.start(); + CPPUNIT_ASSERT_EQUAL(size_t(0), listener.messages.size()); + channel.publish(Message("a"), exchange, qname); + { + Mutex::ScopedLock l(listener.monitor); + Time deadline(now() + 1*TIME_SEC); + while (listener.messages.size() != 1) { + CPPUNIT_ASSERT(listener.monitor.wait(deadline)); + } + } + CPPUNIT_ASSERT_EQUAL(size_t(1), listener.messages.size()); + CPPUNIT_ASSERT_EQUAL(string("a"), listener.messages[0].getData()); + + channel.publish(Message("b"), exchange, qname); + channel.publish(Message("c"), exchange, qname); + { + Mutex::ScopedLock l(listener.monitor); + while (listener.messages.size() != 3) { + CPPUNIT_ASSERT(listener.monitor.wait(1*TIME_SEC)); + } + } + CPPUNIT_ASSERT_EQUAL(size_t(3), listener.messages.size()); + CPPUNIT_ASSERT_EQUAL(string("b"), listener.messages[1].getData()); + CPPUNIT_ASSERT_EQUAL(string("c"), listener.messages[2].getData()); + channel.cancel(tag); + channel.publish(Message("d"), exchange, qname); + CPPUNIT_ASSERT_EQUAL(size_t(3), listener.messages.size()); + { + Mutex::ScopedLock l(listener.monitor); + CPPUNIT_ASSERT(!listener.monitor.wait(TIME_SEC/2)); + } + Message msg; + CPPUNIT_ASSERT(channel.get(msg, queue)); + CPPUNIT_ASSERT_EQUAL(string("d"), msg.getData()); + } + + // Consume already-published messages + void testConsumePublished() { + Message pubMsg("x"); + pubMsg.getHeaders().setString("y", "z"); + channel.publish(pubMsg, exchange, qname); + string tag; + channel.consume(queue, tag, &listener); + CPPUNIT_ASSERT_EQUAL(size_t(0), listener.messages.size()); + channel.start(); + { + Mutex::ScopedLock l(listener.monitor); + while (listener.messages.size() != 1) + CPPUNIT_ASSERT(listener.monitor.wait(1*TIME_SEC)); + } + CPPUNIT_ASSERT_EQUAL(string("x"), listener.messages[0].getData()); + CPPUNIT_ASSERT_EQUAL(string("z"), + listener.messages[0].getHeaders().getString("y")); + } - // FIXME aconway 2007-02-15: Cover full channel API + + }; // Make this test suite a plugin. |
