From 5d2b66b3169f0e2fe6ea6e2a52cd4f6d6e687280 Mon Sep 17 00:00:00 2001 From: Alan Conway Date: Mon, 20 Oct 2008 19:37:06 +0000 Subject: cluster: DumpClient replicates session MessageBuilder. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@706381 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/src/tests/cluster_test.cpp | 54 +++++++++++++++++++++++++++++++++++++ 1 file changed, 54 insertions(+) (limited to 'qpid/cpp/src/tests/cluster_test.cpp') diff --git a/qpid/cpp/src/tests/cluster_test.cpp b/qpid/cpp/src/tests/cluster_test.cpp index 99ca5c7161..72440bbe88 100644 --- a/qpid/cpp/src/tests/cluster_test.cpp +++ b/qpid/cpp/src/tests/cluster_test.cpp @@ -31,6 +31,7 @@ #include "qpid/framing/AMQBody.h" #include "qpid/framing/Uuid.h" #include "qpid/framing/reply_exceptions.h" +#include "qpid/framing/enum.h" #include "qpid/log/Logger.h" #include @@ -201,6 +202,59 @@ template std::set knownBrokerPorts(T& source, int n=-1) { return s; } +class Sender { + public: + Sender(boost::shared_ptr ci, uint16_t ch) : connection(ci), channel(ch) {} + void send(const AMQBody& body, bool firstSeg, bool lastSeg, bool firstFrame, bool lastFrame) { + AMQFrame f(body); + f.setChannel(channel); + f.setFirstSegment(firstSeg); + f.setLastSegment(lastSeg); + f.setFirstFrame(firstFrame); + f.setLastFrame(lastFrame); + connection->handle(f); + } + + private: + boost::shared_ptr connection; + uint16_t channel; +}; + +QPID_AUTO_TEST_CASE(testDumpMessageBuilder) { + // Verify that we dump a partially recieved message to a new member. + ClusterFixture cluster(1); + Client c0(cluster[0], "c0"); + c0.session.queueDeclare("q"); + Sender sender(ConnectionAccess::getImpl(c0.connection), c0.session.getChannel()); + + // Send first 2 frames of message. + MessageTransferBody transfer( + ProtocolVersion(), std::string(), // default exchange. + framing::message::ACCEPT_MODE_NONE, + framing::message::ACQUIRE_MODE_PRE_ACQUIRED); + sender.send(transfer, true, false, true, true); + AMQHeaderBody header; + header.get(true)->setRoutingKey("q"); + sender.send(header, false, false, true, true); + + // No reliable way to ensure the partial message has arrived + // before we start the new broker, so we sleep. + ::usleep(250); + cluster.add(); + + // Send final 2 frames of message. + sender.send(AMQContentBody("ab"), false, true, true, false); + sender.send(AMQContentBody("cd"), false, true, false, true); + + // Verify message is enqued correctly on second member. + Message m; + Client c1(cluster[1], "c1"); + BOOST_CHECK(c1.subs.get(m, "q", TIME_SEC)); + BOOST_CHECK_EQUAL(m.getData(), "abcd"); + + BOOST_CHECK_EQUAL(2u, getGlobalCluster().getUrls().size()); +} + QPID_AUTO_TEST_CASE(testConnectionKnownHosts) { ClusterFixture cluster(1); Client c0(cluster[0], "c0"); -- cgit v1.2.1