summaryrefslogtreecommitdiff
path: root/cpp/tests/ClientChannelTest.cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2007-02-21 19:25:45 +0000
committerAlan Conway <aconway@apache.org>2007-02-21 19:25:45 +0000
commit876d0b94c37f252b08c81656386100fad18a8a46 (patch)
tree4840b0d697d4629fd5c518507b58fceb7de1578a /cpp/tests/ClientChannelTest.cpp
parentc36fb4454be5ce4311aa5f5d0e5683db713c5545 (diff)
downloadqpid-python-876d0b94c37f252b08c81656386100fad18a8a46.tar.gz
Thread safety fixes for race conditions on incoming messages.
* cpp/lib/client/MessageListener.h: const correctness. * cpp/tests/*: MessageListener const change. * cpp/lib/broker/Content.h: Removed out-of-date FIXME comments. * cpp/lib/client/ClientChannel.h/ .cpp(): - added locking for consumers map and other member access. - refactored implementations of Basic get, deliver, return: most logic now encapsulted in IncomingMessage class. - fix channel close problems. * cpp/lib/client/ClientMessage.h/.cpp: - const correctness & API convenience fixes. - getMethod/setMethod/getHeader: for new IncomingMessage * cpp/lib/client/Connection.h/.cpp: - Fixes to channel closure. * cpp/lib/client/IncomingMessage.h/.cpp: - Encapsulate *all* incoming message handling for client. - Moved handling of BasicGetOk to IncomingMessage to fix race. - Thread safety fixes. * cpp/lib/client/ResponseHandler.h/.cpp: - added getResponse for ClientChannel. * cpp/lib/common/Exception.h: - added missing throwSelf implementations. - added ShutdownException as general purpose shut-down indicator. - added EmptyException as general purpose "empty" indicator. * cpp/lib/common/sys/Condition|Monitor|Mutex.h|.cpp: - Condition variable abstraction extracted from Monitor for situations where a single lock is associated with multiple conditions. * cpp/tests/ClientChannelTest.cpp: - Test incoming message transfer, get, consume etc. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@510161 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/tests/ClientChannelTest.cpp')
-rw-r--r--cpp/tests/ClientChannelTest.cpp110
1 files changed, 96 insertions, 14 deletions
diff --git a/cpp/tests/ClientChannelTest.cpp b/cpp/tests/ClientChannelTest.cpp
index e1eec4402d..9ba67ef04c 100644
--- a/cpp/tests/ClientChannelTest.cpp
+++ b/cpp/tests/ClientChannelTest.cpp
@@ -18,6 +18,7 @@
* under the License.
*
*/
+#include <vector>
#include "qpid_test_plugin.h"
#include "InProcessBroker.h"
#include "ClientChannel.h"
@@ -28,6 +29,7 @@
using namespace std;
using namespace boost;
using namespace qpid::client;
+using namespace qpid::sys;
using namespace qpid::framing;
/**
@@ -36,45 +38,125 @@ using namespace qpid::framing;
class ClientChannelTest : public CppUnit::TestCase
{
CPPUNIT_TEST_SUITE(ClientChannelTest);
- CPPUNIT_TEST(testGet);
- CPPUNIT_TEST(testConsume);
+ CPPUNIT_TEST(testPublishGet);
+ CPPUNIT_TEST(testGetNoContent);
+ CPPUNIT_TEST(testConsumeCancel);
+ CPPUNIT_TEST(testConsumePublished);
CPPUNIT_TEST_SUITE_END();
+ struct Listener: public qpid::client::MessageListener {
+ vector<Message> messages;
+ Monitor monitor;
+ void received(Message& msg) {
+ Mutex::ScopedLock l(monitor);
+ messages.push_back(msg);
+ monitor.notifyAll();
+ }
+ };
+
InProcessBrokerClient connection; // client::connection + local broker
Channel channel;
- const std::string key;
+ const std::string qname;
const std::string data;
Queue queue;
Exchange exchange;
+ Listener listener;
public:
ClientChannelTest()
- : key("testq"), data("hello"),
- queue(key, true), exchange("", Exchange::DIRECT_EXCHANGE)
+ : qname("testq"), data("hello"),
+ queue(qname, true), exchange("", Exchange::DIRECT_EXCHANGE)
{
connection.openChannel(channel);
CPPUNIT_ASSERT(channel.getId() != 0);
channel.declareQueue(queue);
}
- void testGet() {
- // FIXME aconway 2007-02-16: Must fix thread safety bug
- // in ClientChannel::get for this to pass.
- return;
-
+ void testPublishGet() {
Message pubMsg(data);
- channel.publish(pubMsg, exchange, key);
+ pubMsg.getHeaders().setString("hello", "world");
+ channel.publish(pubMsg, exchange, qname);
Message getMsg;
- channel.get(getMsg, queue);
+ CPPUNIT_ASSERT(channel.get(getMsg, queue));
CPPUNIT_ASSERT_EQUAL(data, getMsg.getData());
+ CPPUNIT_ASSERT_EQUAL(string("world"),
+ getMsg.getHeaders().getString("hello"));
+ CPPUNIT_ASSERT(!channel.get(getMsg, queue)); // Empty queue
}
- void testConsume() {
+ void testGetNoContent() {
+ Message pubMsg;
+ pubMsg.getHeaders().setString("hello", "world");
+ channel.publish(pubMsg, exchange, qname);
+ Message getMsg;
+ CPPUNIT_ASSERT(channel.get(getMsg, queue));
+ CPPUNIT_ASSERT(getMsg.getData().empty());
+ CPPUNIT_ASSERT_EQUAL(string("world"),
+ getMsg.getHeaders().getString("hello"));
}
+
+ void testConsumeCancel() {
+ string tag; // Broker assigned
+ channel.consume(queue, tag, &listener);
+ channel.start();
+ CPPUNIT_ASSERT_EQUAL(size_t(0), listener.messages.size());
+ channel.publish(Message("a"), exchange, qname);
+ {
+ Mutex::ScopedLock l(listener.monitor);
+ Time deadline(now() + 1*TIME_SEC);
+ while (listener.messages.size() != 1) {
+ CPPUNIT_ASSERT(listener.monitor.wait(deadline));
+ }
+ }
+ CPPUNIT_ASSERT_EQUAL(size_t(1), listener.messages.size());
+ CPPUNIT_ASSERT_EQUAL(string("a"), listener.messages[0].getData());
+
+ channel.publish(Message("b"), exchange, qname);
+ channel.publish(Message("c"), exchange, qname);
+ {
+ Mutex::ScopedLock l(listener.monitor);
+ while (listener.messages.size() != 3) {
+ CPPUNIT_ASSERT(listener.monitor.wait(1*TIME_SEC));
+ }
+ }
+ CPPUNIT_ASSERT_EQUAL(size_t(3), listener.messages.size());
+ CPPUNIT_ASSERT_EQUAL(string("b"), listener.messages[1].getData());
+ CPPUNIT_ASSERT_EQUAL(string("c"), listener.messages[2].getData());
+ channel.cancel(tag);
+ channel.publish(Message("d"), exchange, qname);
+ CPPUNIT_ASSERT_EQUAL(size_t(3), listener.messages.size());
+ {
+ Mutex::ScopedLock l(listener.monitor);
+ CPPUNIT_ASSERT(!listener.monitor.wait(TIME_SEC/2));
+ }
+ Message msg;
+ CPPUNIT_ASSERT(channel.get(msg, queue));
+ CPPUNIT_ASSERT_EQUAL(string("d"), msg.getData());
+ }
+
+ // Consume already-published messages
+ void testConsumePublished() {
+ Message pubMsg("x");
+ pubMsg.getHeaders().setString("y", "z");
+ channel.publish(pubMsg, exchange, qname);
+ string tag;
+ channel.consume(queue, tag, &listener);
+ CPPUNIT_ASSERT_EQUAL(size_t(0), listener.messages.size());
+ channel.start();
+ {
+ Mutex::ScopedLock l(listener.monitor);
+ while (listener.messages.size() != 1)
+ CPPUNIT_ASSERT(listener.monitor.wait(1*TIME_SEC));
+ }
+ CPPUNIT_ASSERT_EQUAL(string("x"), listener.messages[0].getData());
+ CPPUNIT_ASSERT_EQUAL(string("z"),
+ listener.messages[0].getHeaders().getString("y"));
+ }
- // FIXME aconway 2007-02-15: Cover full channel API
+
+
};
// Make this test suite a plugin.