summaryrefslogtreecommitdiff
path: root/cpp/src/tests
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/tests')
-rw-r--r--cpp/src/tests/BrokerChannelTest.cpp97
-rw-r--r--cpp/src/tests/Cluster.cpp8
-rw-r--r--cpp/src/tests/Cluster_child.cpp2
-rw-r--r--cpp/src/tests/ExchangeTest.cpp3
-rw-r--r--cpp/src/tests/FramingTest.cpp5
-rw-r--r--cpp/src/tests/HeaderTest.cpp27
-rw-r--r--cpp/src/tests/InMemoryContentTest.cpp91
-rw-r--r--cpp/src/tests/LazyLoadedContentTest.cpp113
-rw-r--r--cpp/src/tests/Makefile.am4
-rw-r--r--cpp/src/tests/MessageBuilderTest.cpp284
-rw-r--r--cpp/src/tests/MessageTest.cpp65
-rw-r--r--cpp/src/tests/MessageUtils.h53
-rw-r--r--cpp/src/tests/QueueTest.cpp9
-rw-r--r--cpp/src/tests/ReferenceTest.cpp94
-rw-r--r--cpp/src/tests/TxAckTest.cpp12
-rw-r--r--cpp/src/tests/TxPublishTest.cpp7
16 files changed, 302 insertions, 572 deletions
diff --git a/cpp/src/tests/BrokerChannelTest.cpp b/cpp/src/tests/BrokerChannelTest.cpp
index 3253a3d27a..1e5a30f157 100644
--- a/cpp/src/tests/BrokerChannelTest.cpp
+++ b/cpp/src/tests/BrokerChannelTest.cpp
@@ -19,12 +19,14 @@
*
*/
#include "qpid/broker/BrokerChannel.h"
-#include "qpid/broker/BrokerMessage.h"
#include "qpid/broker/BrokerQueue.h"
#include "qpid/broker/FanOutExchange.h"
+#include "qpid/broker/Message.h"
+#include "qpid/broker/MessageDelivery.h"
#include "qpid/broker/NullMessageStore.h"
#include "qpid_test_plugin.h"
#include <iostream>
+#include <sstream>
#include <memory>
#include "qpid/framing/AMQP_HighestVersion.h"
#include "qpid/framing/AMQFrame.h"
@@ -72,7 +74,6 @@ class BrokerChannelTest : public CppUnit::TestCase
CPPUNIT_TEST_SUITE(BrokerChannelTest);
CPPUNIT_TEST(testConsumerMgmt);;
CPPUNIT_TEST(testDeliveryNoAck);
- CPPUNIT_TEST(testStaging);
CPPUNIT_TEST(testQueuePolicy);
CPPUNIT_TEST(testFlow);
CPPUNIT_TEST(testAsyncMesgToMoreThanOneQueue);
@@ -155,7 +156,16 @@ class BrokerChannelTest : public CppUnit::TestCase
void check()
{
- CPPUNIT_ASSERT(expected.empty());
+ if (!expected.empty()) {
+ std::stringstream error;
+ error << "Expected: ";
+ while (!expected.empty()) {
+ MethodCall& m = expected.front();
+ error << m.name << "(" << m.msg << ", '" << m.data << "'); ";
+ expected.pop();
+ }
+ CPPUNIT_FAIL(error.str());
+ }
}
};
@@ -173,7 +183,7 @@ class BrokerChannelTest : public CppUnit::TestCase
void testConsumerMgmt(){
Queue::shared_ptr queue(new Queue("my_queue"));
- Channel channel(connection, recorder, 0, 0);
+ Channel channel(connection, recorder, 0);
channel.open();
CPPUNIT_ASSERT(!channel.exists("my_consumer"));
@@ -203,7 +213,7 @@ class BrokerChannelTest : public CppUnit::TestCase
Message::shared_ptr msg(createMessage("test", "my_routing_key", "my_message_id", 14));
Queue::shared_ptr queue(new Queue("my_queue"));
string tag("test");
- DeliveryToken::shared_ptr token(BasicMessage::createConsumeToken("my-token"));
+ DeliveryToken::shared_ptr token(MessageDelivery::getBasicConsumeToken("my-token"));
channel.consume(token, tag, queue, false, false, 0);
queue->deliver(msg);
sleep(2);
@@ -213,48 +223,6 @@ class BrokerChannelTest : public CppUnit::TestCase
CPPUNIT_ASSERT_EQUAL(token, recorder.delivered.front().second);
}
- void testStaging(){
- MockMessageStore store;
- connection.setFrameMax(1000);
- connection.setStagingThreshold(10);
- Channel channel(connection, recorder, 1, &store);
- const string data[] = {"abcde", "fghij", "klmno"};
-
- Message* msg = new BasicMessage(0, "my_exchange", "my_routing_key", false, false);
-
- store.expect();
- store.stage(*msg);
- for (int i = 0; i < 3; i++) {
- store.appendContent(*msg, data[i]);
- }
- store.destroy(*msg);
- store.test();
-
- Exchange::shared_ptr exchange =
- broker->getExchanges().declare("my_exchange", "fanout").first;
- Queue::shared_ptr queue(new Queue("my_queue"));
- exchange->bind(queue, "", 0);
-
- AMQHeaderBody header(BASIC);
- uint64_t contentSize(0);
- for (int i = 0; i < 3; i++) {
- contentSize += data[i].size();
- }
- header.setContentSize(contentSize);
- channel.handlePublish(msg);
- channel.handleHeader(&header);
-
- for (int i = 0; i < 3; i++) {
- AMQContentBody body(data[i]);
- channel.handleContent(&body);
- }
- Message::shared_ptr msg2 = queue->dequeue();
- CPPUNIT_ASSERT_EQUAL(msg, msg2.get());
- msg2.reset();//should trigger destroy call
-
- store.check();
- }
-
//NOTE: strictly speaking this should/could be part of QueueTest,
//but as it can usefully use the same utility classes as this
@@ -279,7 +247,6 @@ class BrokerChannelTest : public CppUnit::TestCase
store.expect();
store.stage(*msg3);
- store.destroy(*msg3);
store.test();
Queue::shared_ptr queue(new Queue("my_queue", false, &store, 0));
@@ -348,16 +315,17 @@ class BrokerChannelTest : public CppUnit::TestCase
CPPUNIT_ASSERT_EQUAL(ChannelId(0), handler.frames[0].getChannel());
CPPUNIT_ASSERT(dynamic_cast<ConnectionStartBody*>(handler.frames[0].getBody()));
- const string data("abcdefghijklmn");
-
- Message::shared_ptr msg(createMessage("test", "my_routing_key", "my_message_id", 14));
- addContent(msg, data);
Queue::shared_ptr queue(new Queue("my_queue"));
string tag("test");
- DeliveryToken::shared_ptr token(BasicMessage::createConsumeToken("my-token"));
+ DeliveryToken::shared_ptr token(MessageDelivery::getBasicConsumeToken("my-token"));
channel.consume(token, tag, queue, false, false, 0);
channel.flow(false);
+
+ //'publish' a message
+ Message::shared_ptr msg(createMessage("test", "my_routing_key", "my_message_id", 14));
+ addContent(msg, "abcdefghijklmn");
queue->deliver(msg);
+
//ensure no messages have been delivered
CPPUNIT_ASSERT_EQUAL((size_t) 0, recorder.delivered.size());
@@ -369,21 +337,26 @@ class BrokerChannelTest : public CppUnit::TestCase
CPPUNIT_ASSERT_EQUAL(token, recorder.delivered.front().second);
}
- Message* createMessage(const string& exchange, const string& routingKey, const string& messageId, uint64_t contentSize)
+ Message::shared_ptr createMessage(const string& exchange, const string& routingKey, const string& messageId, uint64_t contentSize)
{
- BasicMessage* msg = new BasicMessage(
- 0, exchange, routingKey, false, false);
- AMQHeaderBody header(BASIC);
- header.setContentSize(contentSize);
- msg->setHeader(&header);
- msg->getHeaderProperties()->setMessageId(messageId);
+ Message::shared_ptr msg(new Message());
+
+ AMQFrame method(0, MessageTransferBody(ProtocolVersion(), 0, exchange, 0, 0));
+ AMQFrame header(0, AMQHeaderBody());
+
+ msg->getFrames().append(method);
+ msg->getFrames().append(header);
+ MessageProperties* props = msg->getFrames().getHeaders()->get<MessageProperties>(true);
+ props->setContentLength(contentSize);
+ props->setMessageId(messageId);
+ msg->getFrames().getHeaders()->get<DeliveryProperties>(true)->setRoutingKey(routingKey);
return msg;
}
void addContent(Message::shared_ptr msg, const string& data)
{
- AMQContentBody body(data);
- msg->addContent(&body);
+ AMQFrame content(0, AMQContentBody(data));
+ msg->getFrames().append(content);
}
};
diff --git a/cpp/src/tests/Cluster.cpp b/cpp/src/tests/Cluster.cpp
index a9caa89321..b3a6a745b8 100644
--- a/cpp/src/tests/Cluster.cpp
+++ b/cpp/src/tests/Cluster.cpp
@@ -34,7 +34,7 @@ static const ProtocolVersion VER;
/** Verify membership in a cluster with one member. */
BOOST_AUTO_TEST_CASE(testClusterOne) {
TestCluster cluster("clusterOne", "amqp:one:1");
- AMQFrame send(VER, 1, SessionOpenBody(VER));
+ AMQFrame send(1, SessionOpenBody(VER));
cluster.handle(send);
AMQFrame received;
BOOST_REQUIRE(cluster.received.waitPop(received));
@@ -60,7 +60,7 @@ BOOST_AUTO_TEST_CASE(testClusterTwo) {
BOOST_REQUIRE(cluster.waitFor(2)); // Myself and child.
// Exchange frames with child.
- AMQFrame send(VER, 1, SessionOpenBody(VER));
+ AMQFrame send(1, SessionOpenBody(VER));
cluster.handle(send);
AMQFrame received;
BOOST_REQUIRE(cluster.received.waitPop(received));
@@ -91,8 +91,8 @@ struct CountHandler : public FrameHandler {
/** Test the ClassifierHandler */
BOOST_AUTO_TEST_CASE(testClassifierHandlerWiring) {
- AMQFrame queueDecl(VER, 0, QueueDeclareBody(VER));
- AMQFrame messageTrans(VER, 0, MessageTransferBody(VER));
+ AMQFrame queueDecl(0, QueueDeclareBody(VER));
+ AMQFrame messageTrans(0, MessageTransferBody(VER));
shared_ptr<CountHandler> wiring(new CountHandler());
shared_ptr<CountHandler> other(new CountHandler());
diff --git a/cpp/src/tests/Cluster_child.cpp b/cpp/src/tests/Cluster_child.cpp
index bd76e58127..c03d7396f0 100644
--- a/cpp/src/tests/Cluster_child.cpp
+++ b/cpp/src/tests/Cluster_child.cpp
@@ -40,7 +40,7 @@ void clusterTwo() {
BOOST_CHECK_TYPEID_EQUAL(SessionOpenBody, *frame.getBody());
BOOST_CHECK_EQUAL(2u, cluster.size()); // Me and parent
- AMQFrame send(VER, 1, SessionAttachedBody(VER));
+ AMQFrame send(1, SessionAttachedBody(VER));
cluster.handle(send);
BOOST_REQUIRE(cluster.received.waitPop(frame));
BOOST_CHECK_TYPEID_EQUAL(SessionAttachedBody, *frame.getBody());
diff --git a/cpp/src/tests/ExchangeTest.cpp b/cpp/src/tests/ExchangeTest.cpp
index ef2646519d..59941864e2 100644
--- a/cpp/src/tests/ExchangeTest.cpp
+++ b/cpp/src/tests/ExchangeTest.cpp
@@ -31,6 +31,7 @@
#include "qpid_test_plugin.h"
#include <iostream>
#include "qpid/framing/BasicGetBody.h"
+#include "MessageUtils.h"
using namespace qpid::broker;
using namespace qpid::framing;
@@ -63,7 +64,7 @@ class ExchangeTest : public CppUnit::TestCase
queue.reset();
queue2.reset();
- Message::shared_ptr msgPtr(new BasicMessage(0, "e", "A", true, true));
+ Message::shared_ptr msgPtr(MessageUtils::createMessage("exchange", "key", "id"));
DeliverableMessage msg(msgPtr);
topic.route(msg, "abc", 0);
direct.route(msg, "abc", 0);
diff --git a/cpp/src/tests/FramingTest.cpp b/cpp/src/tests/FramingTest.cpp
index a0dd8d37f6..1b843defc1 100644
--- a/cpp/src/tests/FramingTest.cpp
+++ b/cpp/src/tests/FramingTest.cpp
@@ -137,8 +137,7 @@ class FramingTest : public CppUnit::TestCase
{
std::string a = "hostA";
std::string b = "hostB";
- AMQFrame in(version, 999,
- ConnectionRedirectBody(version, a, b));
+ AMQFrame in(999, ConnectionRedirectBody(version, a, b));
in.encode(buffer);
buffer.flip();
AMQFrame out;
@@ -149,7 +148,7 @@ class FramingTest : public CppUnit::TestCase
void testBasicConsumeOkBodyFrame()
{
std::string s = "hostA";
- AMQFrame in(version, 999, BasicConsumeOkBody(version, s));
+ AMQFrame in(999, BasicConsumeOkBody(version, s));
in.encode(buffer);
buffer.flip();
AMQFrame out;
diff --git a/cpp/src/tests/HeaderTest.cpp b/cpp/src/tests/HeaderTest.cpp
index 17381cc868..df2230342c 100644
--- a/cpp/src/tests/HeaderTest.cpp
+++ b/cpp/src/tests/HeaderTest.cpp
@@ -36,8 +36,8 @@ public:
void testGenericProperties()
{
- AMQHeaderBody body(BASIC);
- dynamic_cast<BasicHeaderProperties*>(body.getProperties())->getHeaders().setString("A", "BCDE");
+ AMQHeaderBody body;
+ body.get<BasicHeaderProperties>(true)->getHeaders().setString("A", "BCDE");
Buffer buffer(100);
body.encode(buffer);
@@ -45,7 +45,7 @@ public:
AMQHeaderBody body2;
body2.decode(buffer, body.size());
BasicHeaderProperties* props =
- dynamic_cast<BasicHeaderProperties*>(body2.getProperties());
+ body2.get<BasicHeaderProperties>(true);
CPPUNIT_ASSERT_EQUAL(std::string("BCDE"),
props->getHeaders().getString("A"));
}
@@ -64,10 +64,11 @@ public:
string userId("guest");
string appId("just testing");
string clusterId("no clustering required");
+ uint64_t contentLength(54321);
- AMQHeaderBody body(BASIC);
+ AMQFrame out(0, AMQHeaderBody());
BasicHeaderProperties* properties =
- dynamic_cast<BasicHeaderProperties*>(body.getProperties());
+ out.castBody<AMQHeaderBody>()->get<BasicHeaderProperties>(true);
properties->setContentType(contentType);
properties->getHeaders().setString("A", "BCDE");
properties->setDeliveryMode(deliveryMode);
@@ -81,13 +82,14 @@ public:
properties->setUserId(userId);
properties->setAppId(appId);
properties->setClusterId(clusterId);
+ properties->setContentLength(contentLength);
Buffer buffer(10000);
- body.encode(buffer);
+ out.encode(buffer);
buffer.flip();
- AMQHeaderBody temp;
- temp.decode(buffer, body.size());
- properties = dynamic_cast<BasicHeaderProperties*>(temp.getProperties());
+ AMQFrame in;
+ in.decode(buffer);
+ properties = in.castBody<AMQHeaderBody>()->get<BasicHeaderProperties>(true);
CPPUNIT_ASSERT_EQUAL(contentType, properties->getContentType());
CPPUNIT_ASSERT_EQUAL(std::string("BCDE"), properties->getHeaders().getString("A"));
@@ -102,6 +104,7 @@ public:
CPPUNIT_ASSERT_EQUAL(userId, properties->getUserId());
CPPUNIT_ASSERT_EQUAL(appId, properties->getAppId());
CPPUNIT_ASSERT_EQUAL(clusterId, properties->getClusterId());
+ CPPUNIT_ASSERT_EQUAL(contentLength, properties->getContentLength());
}
void testSomeSpecificProperties(){
@@ -111,9 +114,9 @@ public:
string expiration("Z");
uint64_t timestamp(0xabe4a34a);
- AMQHeaderBody body(BASIC);
+ AMQHeaderBody body;
BasicHeaderProperties* properties =
- dynamic_cast<BasicHeaderProperties*>(body.getProperties());
+ body.get<BasicHeaderProperties>(true);
properties->setContentType(contentType);
properties->setDeliveryMode(deliveryMode);
properties->setPriority(priority);
@@ -125,7 +128,7 @@ public:
buffer.flip();
AMQHeaderBody temp;
temp.decode(buffer, body.size());
- properties = dynamic_cast<BasicHeaderProperties*>(temp.getProperties());
+ properties = temp.get<BasicHeaderProperties>(true);
CPPUNIT_ASSERT_EQUAL(contentType, properties->getContentType());
CPPUNIT_ASSERT_EQUAL((int) deliveryMode, (int) properties->getDeliveryMode());
diff --git a/cpp/src/tests/InMemoryContentTest.cpp b/cpp/src/tests/InMemoryContentTest.cpp
deleted file mode 100644
index bc95548d45..0000000000
--- a/cpp/src/tests/InMemoryContentTest.cpp
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include "qpid/broker/InMemoryContent.h"
-#include "qpid_test_plugin.h"
-#include "qpid/framing/AMQP_HighestVersion.h"
-#include <iostream>
-#include <list>
-#include "qpid/framing/AMQFrame.h"
-#include "MockChannel.h"
-
-using std::list;
-using std::string;
-using boost::dynamic_pointer_cast;
-using namespace qpid::broker;
-using namespace qpid::framing;
-
-
-class InMemoryContentTest : public CppUnit::TestCase
-{
- CPPUNIT_TEST_SUITE(InMemoryContentTest);
- CPPUNIT_TEST(testRefragmentation);
- CPPUNIT_TEST_SUITE_END();
-
-public:
- void testRefragmentation()
- {
- {//no remainder
- string out[] = {"abcde", "fghij", "klmno", "pqrst"};
- string in[] = {out[0] + out[1], out[2] + out[3]};
- refragment(2, in, 4, out);
- }
- {//remainder for last frame
- string out[] = {"abcde", "fghij", "klmno", "pqrst", "uvw"};
- string in[] = {out[0] + out[1], out[2] + out[3] + out[4]};
- refragment(2, in, 5, out);
- }
- }
-
-
- void refragment(size_t inCount, string* in, size_t outCount, string* out, uint32_t framesize = 5)
- {
- InMemoryContent content;
- MockChannel channel(3);
-
- addframes(content, inCount, in);
- content.send(channel, framesize);
- CPPUNIT_ASSERT_EQUAL(outCount, channel.out.frames.size());
-
- for (unsigned int i = 0; i < outCount; i++) {
- AMQContentBody* chunk = dynamic_cast<AMQContentBody*>(
- channel.out.frames[i].getBody());
- CPPUNIT_ASSERT(chunk);
- CPPUNIT_ASSERT_EQUAL(out[i], chunk->getData());
- CPPUNIT_ASSERT_EQUAL(
- ChannelId(3), channel.out.frames[i].getChannel());
- }
- }
-
- void addframes(InMemoryContent& content, size_t frameCount, string* frameData)
- {
- for (unsigned int i = 0; i < frameCount; i++) {
- AMQContentBody frame(frameData[i]);
- content.add(&frame);
- }
- }
-
-
-};
-
-// Make this test suite a plugin.
-CPPUNIT_PLUGIN_IMPLEMENT();
-CPPUNIT_TEST_SUITE_REGISTRATION(InMemoryContentTest);
-
diff --git a/cpp/src/tests/LazyLoadedContentTest.cpp b/cpp/src/tests/LazyLoadedContentTest.cpp
deleted file mode 100644
index df46f6b48e..0000000000
--- a/cpp/src/tests/LazyLoadedContentTest.cpp
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include "qpid/broker/LazyLoadedContent.h"
-#include "qpid/framing/AMQP_HighestVersion.h"
-#include "qpid/broker/NullMessageStore.h"
-#include "qpid_test_plugin.h"
-#include <iostream>
-#include <list>
-#include <sstream>
-#include "qpid/framing/AMQFrame.h"
-#include "MockChannel.h"
-using std::list;
-using std::string;
-using boost::dynamic_pointer_cast;
-using namespace qpid::broker;
-using namespace qpid::framing;
-
-
-
-class LazyLoadedContentTest : public CppUnit::TestCase
-{
- CPPUNIT_TEST_SUITE(LazyLoadedContentTest);
- CPPUNIT_TEST(testFragmented);
- CPPUNIT_TEST(testWhole);
- CPPUNIT_TEST(testHalved);
- CPPUNIT_TEST_SUITE_END();
-
- class TestMessageStore : public NullMessageStore
- {
- const string content;
-
- public:
- TestMessageStore(const string& _content) : content(_content) {}
-
- void loadContent(PersistableMessage&, string& data, uint64_t offset, uint32_t length)
- {
- if (offset + length <= content.size()) {
- data = content.substr(offset, length);
- } else{
- std::stringstream error;
- error << "Invalid segment: offset=" << offset << ", length=" << length << ", content_length=" << content.size();
- throw qpid::Exception(error.str());
- }
- }
- };
-
-
-public:
- void testFragmented()
- {
- string data = "abcdefghijklmnopqrstuvwxyz";
- uint32_t framesize = 5;
- string out[] = {"abcde", "fghij", "klmno", "pqrst", "uvwxy", "z"};
- load(data, 6, out, framesize);
- }
-
- void testWhole()
- {
- string data = "abcdefghijklmnopqrstuvwxyz";
- uint32_t framesize = 50;
- string out[] = {data};
- load(data, 1, out, framesize);
- }
-
- void testHalved()
- {
- string data = "abcdefghijklmnopqrstuvwxyz";
- uint32_t framesize = 13;
- string out[] = {"abcdefghijklm", "nopqrstuvwxyz"};
- load(data, 2, out, framesize);
- }
-
- void load(string& in, size_t outCount, string* out, uint32_t framesize)
- {
- TestMessageStore store(in);
- LazyLoadedContent content(&store, 0, in.size());
- MockChannel channel(3);
- content.send(channel, framesize);
- CPPUNIT_ASSERT_EQUAL(outCount, channel.out.frames.size());
-
- for (unsigned int i = 0; i < outCount; i++) {
- AMQContentBody* chunk(dynamic_cast<AMQContentBody*>(
- channel.out.frames[i].getBody()));
- CPPUNIT_ASSERT(chunk);
- CPPUNIT_ASSERT_EQUAL(out[i], chunk->getData());
- CPPUNIT_ASSERT_EQUAL(
- ChannelId(3), channel.out.frames[i].getChannel());
- }
- }
-};
-
-// Make this test suite a plugin.
-CPPUNIT_PLUGIN_IMPLEMENT();
-CPPUNIT_TEST_SUITE_REGISTRATION(LazyLoadedContentTest);
-
diff --git a/cpp/src/tests/Makefile.am b/cpp/src/tests/Makefile.am
index 34e7e973ac..7ff6a843a9 100644
--- a/cpp/src/tests/Makefile.am
+++ b/cpp/src/tests/Makefile.am
@@ -82,11 +82,8 @@ broker_unit_tests = \
DtxWorkRecordTest \
ExchangeTest \
HeadersExchangeTest \
- InMemoryContentTest \
- LazyLoadedContentTest \
MessageBuilderTest \
MessageTest \
- ReferenceTest \
QueueRegistryTest \
QueueTest \
QueuePolicyTest \
@@ -142,6 +139,7 @@ EXTRA_DIST += \
.valgrind.supp-default \
.valgrindrc-default \
InProcessBroker.h \
+ MessageUtils.h \
MockChannel.h \
MockConnectionInputHandler.h \
TxMocks.h \
diff --git a/cpp/src/tests/MessageBuilderTest.cpp b/cpp/src/tests/MessageBuilderTest.cpp
index a12fc603ce..341fdf56f5 100644
--- a/cpp/src/tests/MessageBuilderTest.cpp
+++ b/cpp/src/tests/MessageBuilderTest.cpp
@@ -18,15 +18,13 @@
* under the License.
*
*/
-#include "qpid/Exception.h"
-#include "qpid/broker/BrokerMessage.h"
+#include "qpid/broker/Message.h"
#include "qpid/broker/MessageBuilder.h"
#include "qpid/broker/NullMessageStore.h"
-#include "qpid/framing/Buffer.h"
+#include "qpid/framing/frame_functors.h"
+#include "qpid/framing/TypeFilter.h"
#include "qpid_test_plugin.h"
-#include <iostream>
-#include <memory>
-#include "MockChannel.h"
+#include <list>
using namespace boost;
using namespace qpid::broker;
@@ -35,72 +33,55 @@ using namespace qpid::sys;
class MessageBuilderTest : public CppUnit::TestCase
{
- struct MockHandler : CompletionHandler {
- Message::shared_ptr msg;
+ class MockMessageStore : public NullMessageStore
+ {
+ enum Op {STAGE=1, APPEND=2};
- virtual void complete(Message::shared_ptr _msg){
- msg = _msg;
+ uint64_t id;
+ PersistableMessage* expectedMsg;
+ string expectedData;
+ std::list<Op> ops;
+
+ void checkExpectation(Op actual)
+ {
+ CPPUNIT_ASSERT_EQUAL(ops.front(), actual);
+ ops.pop_front();
}
- };
- class TestMessageStore : public NullMessageStore
- {
- Buffer* header;
- Buffer* content;
- const uint32_t contentBufferSize;
-
- public:
+ public:
+ MockMessageStore() : id(0), expectedMsg(0) {}
- void stage(PersistableMessage& msg)
- {
- if (msg.getPersistenceId() == 0) {
- header = new Buffer(msg.encodedSize());
- msg.encode(*header);
- content = new Buffer(contentBufferSize);
- msg.setPersistenceId(1);
- } else {
- throw qpid::Exception("Message already staged!");
- }
+ void expectStage(PersistableMessage& msg)
+ {
+ expectedMsg = &msg;
+ ops.push_back(STAGE);
}
- void appendContent(PersistableMessage& msg, const string& data)
- {
- if (msg.getPersistenceId() == 1) {
- content->putRawData(data);
- } else {
- throw qpid::Exception("Invalid message id!");
- }
+ void expectAppendContent(PersistableMessage& msg, const string& data)
+ {
+ expectedMsg = &msg;
+ expectedData = data;
+ ops.push_back(APPEND);
}
- using NullMessageStore::destroy;
+ void stage(PersistableMessage& msg)
+ {
+ checkExpectation(STAGE);
+ CPPUNIT_ASSERT_EQUAL(expectedMsg, &msg);
+ msg.setPersistenceId(++id);
+ }
- void destroy(PersistableMessage& msg)
+ void appendContent(PersistableMessage& msg, const string& data)
{
- CPPUNIT_ASSERT(msg.getPersistenceId());
+ checkExpectation(APPEND);
+ CPPUNIT_ASSERT_EQUAL(expectedMsg, &msg);
+ CPPUNIT_ASSERT_EQUAL(expectedData, data);
}
- BasicMessage::shared_ptr getRestoredMessage()
+ bool expectationsMet()
{
- BasicMessage::shared_ptr msg(new BasicMessage());
- if (header) {
- header->flip();
- msg->decodeHeader(*header);
- delete header;
- header = 0;
- if (content) {
- content->flip();
- msg->decodeContent(*content);
- delete content;
- content = 0;
- }
- }
- return msg;
+ return ops.empty();
}
-
- //dont care about any of the other methods:
- TestMessageStore(uint32_t _contentBufferSize) : NullMessageStore(), header(0), content(0),
- contentBufferSize(_contentBufferSize) {}
- ~TestMessageStore(){}
};
CPPUNIT_TEST_SUITE(MessageBuilderTest);
@@ -113,106 +94,115 @@ class MessageBuilderTest : public CppUnit::TestCase
public:
void testHeaderOnly(){
- MockHandler handler;
- MessageBuilder builder(&handler);
-
- Message::shared_ptr message(
- new BasicMessage(
- 0, "test", "my_routing_key", false, false));
- AMQHeaderBody header(BASIC);
- header.setContentSize(0);
-
- builder.initialise(message);
- CPPUNIT_ASSERT(!handler.msg);
- builder.setHeader(&header);
- CPPUNIT_ASSERT(handler.msg);
- CPPUNIT_ASSERT_EQUAL(message, handler.msg);
+ MessageBuilder builder;
+ builder.start(SequenceNumber());
+
+ std::string exchange("builder-exchange");
+ std::string key("builder-exchange");
+
+ AMQFrame method(0, MessageTransferBody(ProtocolVersion(), 0, exchange, 0, 0));
+ AMQFrame header(0, AMQHeaderBody());
+
+ header.castBody<AMQHeaderBody>()->get<MessageProperties>(true)->setContentLength(0);
+ header.castBody<AMQHeaderBody>()->get<DeliveryProperties>(true)->setRoutingKey(key);
+
+ builder.handle(method);
+ builder.handle(header);
+
+ CPPUNIT_ASSERT(builder.getMessage());
+ CPPUNIT_ASSERT_EQUAL(exchange, builder.getMessage()->getExchangeName());
+ CPPUNIT_ASSERT_EQUAL(key, builder.getMessage()->getRoutingKey());
+ CPPUNIT_ASSERT(builder.getMessage()->getFrames().isComplete());
}
void test1ContentFrame(){
- MockHandler handler;
- MessageBuilder builder(&handler);
+ MessageBuilder builder;
+ builder.start(SequenceNumber());
- string data1("abcdefg");
+ std::string data("abcdefg");
+ std::string exchange("builder-exchange");
+ std::string key("builder-exchange");
- Message::shared_ptr message(
- new BasicMessage(0, "test", "my_routing_key", false, false));
- AMQHeaderBody header(BASIC);
- header.setContentSize(7);
- AMQContentBody part1(data1);
-
- builder.initialise(message);
- CPPUNIT_ASSERT(!handler.msg);
- builder.setHeader(&header);
- CPPUNIT_ASSERT(!handler.msg);
- builder.addContent(&part1);
- CPPUNIT_ASSERT(handler.msg);
- CPPUNIT_ASSERT_EQUAL(message, handler.msg);
+ AMQFrame method(0, MessageTransferBody(ProtocolVersion(), 0, exchange, 0, 0));
+ AMQFrame header(0, AMQHeaderBody());
+ AMQFrame content(0, AMQContentBody(data));
+
+ header.castBody<AMQHeaderBody>()->get<MessageProperties>(true)->setContentLength(data.size());
+ header.castBody<AMQHeaderBody>()->get<DeliveryProperties>(true)->setRoutingKey(key);
+
+ builder.handle(method);
+ CPPUNIT_ASSERT(builder.getMessage());
+ CPPUNIT_ASSERT(!builder.getMessage()->getFrames().isComplete());
+
+ builder.handle(header);
+ CPPUNIT_ASSERT(builder.getMessage());
+ CPPUNIT_ASSERT(!builder.getMessage()->getFrames().isComplete());
+
+ builder.handle(content);
+ CPPUNIT_ASSERT(builder.getMessage());
+ CPPUNIT_ASSERT(builder.getMessage()->getFrames().isComplete());
}
void test2ContentFrames(){
- MockHandler handler;
- MessageBuilder builder(&handler);
-
- string data1("abcdefg");
- string data2("hijklmn");
-
- Message::shared_ptr message(
- new BasicMessage(0, "test", "my_routing_key", false, false));
- AMQHeaderBody header(BASIC);
- header.setContentSize(14);
- AMQContentBody part1(data1);
- AMQContentBody part2(data2);
-
- builder.initialise(message);
- CPPUNIT_ASSERT(!handler.msg);
- builder.setHeader(&header);
- CPPUNIT_ASSERT(!handler.msg);
- builder.addContent(&part1);
- CPPUNIT_ASSERT(!handler.msg);
- builder.addContent(&part2);
- CPPUNIT_ASSERT(handler.msg);
- CPPUNIT_ASSERT_EQUAL(message, handler.msg);
+ MessageBuilder builder;
+ builder.start(SequenceNumber());
+
+ std::string data1("abcdefg");
+ std::string data2("hijklmn");
+ std::string exchange("builder-exchange");
+ std::string key("builder-exchange");
+
+ AMQFrame method(0, MessageTransferBody(ProtocolVersion(), 0, exchange, 0, 0));
+ AMQFrame header(0, AMQHeaderBody());
+ AMQFrame content1(0, AMQContentBody(data1));
+ AMQFrame content2(0, AMQContentBody(data2));
+
+ header.castBody<AMQHeaderBody>()->get<MessageProperties>(true)->setContentLength(data1.size() + data2.size());
+ header.castBody<AMQHeaderBody>()->get<DeliveryProperties>(true)->setRoutingKey(key);
+
+ builder.handle(method);
+ builder.handle(header);
+ builder.handle(content1);
+ CPPUNIT_ASSERT(builder.getMessage());
+ CPPUNIT_ASSERT(!builder.getMessage()->getFrames().isComplete());
+
+ builder.handle(content2);
+ CPPUNIT_ASSERT(builder.getMessage());
+ CPPUNIT_ASSERT(builder.getMessage()->getFrames().isComplete());
}
void testStaging(){
- //store must be the last thing to be destroyed or destructor
- //of Message fails (it uses the store to call destroy if lazy
- //loaded content is in use)
- TestMessageStore store(14);
- {
- MockHandler handler;
- MessageBuilder builder(&handler, &store, 5);
-
- string data1("abcdefg");
- string data2("hijklmn");
-
- Message::shared_ptr message(
- new BasicMessage(0, "test", "my_routing_key", false, false));
- AMQHeaderBody header(BASIC);
- header.setContentSize(14);
- BasicHeaderProperties* properties = dynamic_cast<BasicHeaderProperties*>(header.getProperties());
- properties->setMessageId("MyMessage");
- properties->getHeaders().setString("abc", "xyz");
-
- AMQContentBody part1(data1);
- AMQContentBody part2(data2);
-
- builder.initialise(message);
- builder.setHeader(&header);
- builder.addContent(&part1);
- builder.addContent(&part2);
- CPPUNIT_ASSERT(handler.msg);
- CPPUNIT_ASSERT_EQUAL(message, handler.msg);
-
- BasicMessage::shared_ptr restored = store.getRestoredMessage();
- CPPUNIT_ASSERT_EQUAL(message->getExchange(), restored->getExchange());
- CPPUNIT_ASSERT_EQUAL(message->getRoutingKey(), restored->getRoutingKey());
- CPPUNIT_ASSERT_EQUAL(message->getHeaderProperties()->getMessageId(), restored->getHeaderProperties()->getMessageId());
- CPPUNIT_ASSERT_EQUAL(message->getHeaderProperties()->getHeaders().getString("abc"),
- restored->getHeaderProperties()->getHeaders().getString("abc"));
- CPPUNIT_ASSERT_EQUAL((uint64_t) 14, restored->contentSize());
- }
+ MockMessageStore store;
+ MessageBuilder builder(&store, 5);
+ builder.start(SequenceNumber());
+
+ std::string data1("abcdefg");
+ std::string data2("hijklmn");
+ std::string exchange("builder-exchange");
+ std::string key("builder-exchange");
+
+ AMQFrame method(0, MessageTransferBody(ProtocolVersion(), 0, exchange, 0, 0));
+ AMQFrame header(0, AMQHeaderBody());
+ AMQFrame content1(0, AMQContentBody(data1));
+ AMQFrame content2(0, AMQContentBody(data2));
+
+ header.castBody<AMQHeaderBody>()->get<MessageProperties>(true)->setContentLength(data1.size() + data2.size());
+ header.castBody<AMQHeaderBody>()->get<DeliveryProperties>(true)->setRoutingKey(key);
+
+ builder.handle(method);
+ builder.handle(header);
+
+ store.expectStage(*builder.getMessage());
+ builder.handle(content1);
+ CPPUNIT_ASSERT(store.expectationsMet());
+ CPPUNIT_ASSERT_EQUAL((uint64_t) 1, builder.getMessage()->getPersistenceId());
+
+ store.expectAppendContent(*builder.getMessage(), data2);
+ builder.handle(content2);
+ CPPUNIT_ASSERT(store.expectationsMet());
+
+ //were the content frames dropped?
+ CPPUNIT_ASSERT_EQUAL((uint64_t) 0, builder.getMessage()->contentSize());
}
};
diff --git a/cpp/src/tests/MessageTest.cpp b/cpp/src/tests/MessageTest.cpp
index 1fbb18b7d3..3d080ef3dc 100644
--- a/cpp/src/tests/MessageTest.cpp
+++ b/cpp/src/tests/MessageTest.cpp
@@ -18,7 +18,7 @@
* under the License.
*
*/
-#include "qpid/broker/BrokerMessage.h"
+#include "qpid/broker/Message.h"
#include "qpid_test_plugin.h"
#include <iostream>
#include "qpid/framing/AMQP_HighestVersion.h"
@@ -45,40 +45,45 @@ class MessageTest : public CppUnit::TestCase
string data1("abcdefg");
string data2("hijklmn");
- BasicMessage::shared_ptr msg(
- new BasicMessage(0, exchange, routingKey, false, false));
- AMQHeaderBody header(BASIC);
- header.setContentSize(14);
- AMQContentBody part1(data1);
- AMQContentBody part2(data2);
- msg->setHeader(&header);
- msg->addContent(&part1);
- msg->addContent(&part2);
+ Message::shared_ptr msg(new Message());
+
+ AMQFrame method(0, MessageTransferBody(ProtocolVersion(), 0, exchange, 0, 0));
+ AMQFrame header(0, AMQHeaderBody());
+ AMQFrame content1(0, AMQContentBody(data1));
+ AMQFrame content2(0, AMQContentBody(data2));
+
+ msg->getFrames().append(method);
+ msg->getFrames().append(header);
+ msg->getFrames().append(content1);
+ msg->getFrames().append(content2);
+
+ MessageProperties* mProps = msg->getFrames().getHeaders()->get<MessageProperties>(true);
+ mProps->setContentLength(data1.size() + data2.size());
+ mProps->setMessageId(messageId);
+ FieldTable applicationHeaders;
+ applicationHeaders.setString("abc", "xyz");
+ mProps->setApplicationHeaders(applicationHeaders);
+ DeliveryProperties* dProps = msg->getFrames().getHeaders()->get<DeliveryProperties>(true);
+ dProps->setRoutingKey(routingKey);
+ dProps->setDeliveryMode(PERSISTENT);
+ CPPUNIT_ASSERT(msg->isPersistent());
- msg->getHeaderProperties()->setMessageId(messageId);
- msg->getHeaderProperties()->setDeliveryMode(PERSISTENT);
- msg->getHeaderProperties()->getHeaders().setString("abc", "xyz");
Buffer buffer(msg->encodedSize());
msg->encode(buffer);
- buffer.flip();
-
- msg.reset(new BasicMessage());
- msg->decode(buffer);
- CPPUNIT_ASSERT_EQUAL(exchange, msg->getExchange());
- CPPUNIT_ASSERT_EQUAL(routingKey, msg->getRoutingKey());
- CPPUNIT_ASSERT_EQUAL(messageId, msg->getHeaderProperties()->getMessageId());
- CPPUNIT_ASSERT_EQUAL(PERSISTENT, msg->getHeaderProperties()->getDeliveryMode());
- CPPUNIT_ASSERT_EQUAL(string("xyz"), msg->getHeaderProperties()->getHeaders().getString("abc"));
- CPPUNIT_ASSERT_EQUAL((uint64_t) 14, msg->contentSize());
+ buffer.flip();
+ msg.reset(new Message());
+ msg->decodeHeader(buffer);
+ msg->decodeContent(buffer);
- MockChannel channel(1);
- msg->deliver(channel, "ignore", 0, 100);
- CPPUNIT_ASSERT_EQUAL((size_t) 3, channel.out.frames.size());
- AMQContentBody* contentBody(
- dynamic_cast<AMQContentBody*>(channel.out.frames[2].getBody()));
- CPPUNIT_ASSERT(contentBody);
- CPPUNIT_ASSERT_EQUAL(data1 + data2, contentBody->getData());
+ CPPUNIT_ASSERT_EQUAL(exchange, msg->getExchangeName());
+ CPPUNIT_ASSERT_EQUAL(routingKey, msg->getRoutingKey());
+ CPPUNIT_ASSERT_EQUAL((uint64_t) data1.size() + data2.size(), msg->contentSize());
+ CPPUNIT_ASSERT_EQUAL((uint64_t) data1.size() + data2.size(), msg->getProperties<MessageProperties>()->getContentLength());
+ CPPUNIT_ASSERT_EQUAL(messageId, msg->getProperties<MessageProperties>()->getMessageId());
+ CPPUNIT_ASSERT_EQUAL(string("xyz"), msg->getProperties<MessageProperties>()->getApplicationHeaders().getString("abc"));
+ CPPUNIT_ASSERT_EQUAL((uint8_t) PERSISTENT, msg->getProperties<DeliveryProperties>()->getDeliveryMode());
+ CPPUNIT_ASSERT(msg->isPersistent());
}
};
diff --git a/cpp/src/tests/MessageUtils.h b/cpp/src/tests/MessageUtils.h
new file mode 100644
index 0000000000..7fb1755c4b
--- /dev/null
+++ b/cpp/src/tests/MessageUtils.h
@@ -0,0 +1,53 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/broker/Message.h"
+#include "qpid/broker/MessageDelivery.h"
+#include "qpid/framing/AMQFrame.h"
+
+using namespace qpid::broker;
+using namespace qpid::framing;
+
+struct MessageUtils
+{
+ static Message::shared_ptr createMessage(const string& exchange, const string& routingKey,
+ const string& messageId, uint64_t contentSize = 0)
+ {
+ Message::shared_ptr msg(new Message());
+
+ AMQFrame method(0, MessageTransferBody(ProtocolVersion(), 0, exchange, 0, 0));
+ AMQFrame header(0, AMQHeaderBody());
+
+ msg->getFrames().append(method);
+ msg->getFrames().append(header);
+ MessageProperties* props = msg->getFrames().getHeaders()->get<MessageProperties>(true);
+ props->setContentLength(contentSize);
+ props->setMessageId(messageId);
+ msg->getFrames().getHeaders()->get<DeliveryProperties>(true)->setRoutingKey(routingKey);
+ return msg;
+ }
+
+ static void addContent(Message::shared_ptr msg, const string& data)
+ {
+ AMQFrame content(0, AMQContentBody(data));
+ msg->getFrames().append(content);
+ }
+};
diff --git a/cpp/src/tests/QueueTest.cpp b/cpp/src/tests/QueueTest.cpp
index e7ca124631..ef1518af4c 100644
--- a/cpp/src/tests/QueueTest.cpp
+++ b/cpp/src/tests/QueueTest.cpp
@@ -70,8 +70,13 @@ class QueueTest : public CppUnit::TestCase
public:
Message::shared_ptr message(std::string exchange, std::string routingKey) {
- return Message::shared_ptr(
- new BasicMessage(0, exchange, routingKey, false, false));
+ Message::shared_ptr msg(new Message());
+ AMQFrame method(0, MessageTransferBody(ProtocolVersion(), 0, exchange, 0, 0));
+ AMQFrame header(0, AMQHeaderBody());
+ msg->getFrames().append(method);
+ msg->getFrames().append(header);
+ msg->getFrames().getHeaders()->get<DeliveryProperties>(true)->setRoutingKey(routingKey);
+ return msg;
}
diff --git a/cpp/src/tests/ReferenceTest.cpp b/cpp/src/tests/ReferenceTest.cpp
deleted file mode 100644
index 411462564a..0000000000
--- a/cpp/src/tests/ReferenceTest.cpp
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include <iostream>
-#include <memory>
-#include "qpid_test_plugin.h"
-#include "qpid/broker/Reference.h"
-#include "qpid/broker/BrokerMessageMessage.h"
-#include "qpid/framing/MessageTransferBody.h"
-#include "qpid/framing/MessageAppendBody.h"
-#include "qpid/broker/CompletionHandler.h"
-
-using namespace boost;
-using namespace qpid;
-using namespace qpid::broker;
-using namespace qpid::framing;
-using namespace std;
-
-class ReferenceTest : public CppUnit::TestCase
-{
- CPPUNIT_TEST_SUITE(ReferenceTest);
- CPPUNIT_TEST(testRegistry);
- CPPUNIT_TEST(testReference);
- CPPUNIT_TEST_SUITE_END();
-
- ProtocolVersion v;
- ReferenceRegistry registry;
-
- public:
- void testRegistry() {
- Reference::shared_ptr ref = registry.open("foo");
- CPPUNIT_ASSERT_EQUAL(string("foo"), ref->getId());
- CPPUNIT_ASSERT(ref == registry.get("foo"));
- try {
- registry.get("none");
- CPPUNIT_FAIL("Expected exception");
- } catch (...) {}
- try {
- registry.open("foo");
- CPPUNIT_FAIL("Expected exception");
- } catch(...) {}
- ref->close();
- try {
- registry.get("foo");
- CPPUNIT_FAIL("Expected exception");
- } catch(...) {}
- }
-
- void testReference() {
-
- Reference::shared_ptr r1(registry.open("bar"));
-
- MessageTransferBody t1(v);
- // TODO aconway 2007-04-03: hack around lack of generated setters. Clean this up.
- const_cast<framing::Content&>(t1.getBody()) = framing::Content(REFERENCE,"bar");
- MessageMessage::shared_ptr m1(new MessageMessage(0, &t1, r1));
-
- MessageTransferBody t2(v);
- const_cast<framing::Content&>(t2.getBody()) = framing::Content(REFERENCE,"bar");
- MessageMessage::shared_ptr m2(new MessageMessage(0, &t2, r1));
-
- MessageAppendBody a1(v);
- MessageAppendBody a2(v);
-
- r1->addMessage(m1);
- r1->addMessage(m2);
- CPPUNIT_ASSERT_EQUAL(size_t(2), r1->getMessages().size());
- r1->append(a1);
- r1->append(a2);
- CPPUNIT_ASSERT_EQUAL(size_t(2), r1->getAppends().size());
- r1->close();
- }
-};
-
-// Make this test suite a plugin.
-CPPUNIT_PLUGIN_IMPLEMENT();
-CPPUNIT_TEST_SUITE_REGISTRATION(ReferenceTest);
diff --git a/cpp/src/tests/TxAckTest.cpp b/cpp/src/tests/TxAckTest.cpp
index 24e8aac701..89a907d495 100644
--- a/cpp/src/tests/TxAckTest.cpp
+++ b/cpp/src/tests/TxAckTest.cpp
@@ -68,11 +68,13 @@ public:
TxAckTest() : acked(0), queue(new Queue("my_queue", false, &store, 0)), op(acked, deliveries)
{
for(int i = 0; i < 10; i++){
- Message::shared_ptr msg(
- new BasicMessage(0, "exchange", "routing_key", false, false));
- AMQHeaderBody body(BASIC);
- msg->setHeader(&body);
- msg->getHeaderProperties()->setDeliveryMode(PERSISTENT);
+ Message::shared_ptr msg(new Message());
+ AMQFrame method(0, MessageTransferBody(ProtocolVersion(), 0, "exchange", 0, 0));
+ AMQFrame header(0, AMQHeaderBody());
+ msg->getFrames().append(method);
+ msg->getFrames().append(header);
+ msg->getProperties<DeliveryProperties>()->setDeliveryMode(PERSISTENT);
+ msg->getProperties<DeliveryProperties>()->setRoutingKey("routing_key");
messages.push_back(msg);
deliveries.push_back(DeliveryRecord(msg, queue, "xyz", (i+1)));
}
diff --git a/cpp/src/tests/TxPublishTest.cpp b/cpp/src/tests/TxPublishTest.cpp
index d009dd9112..5628cf1d1c 100644
--- a/cpp/src/tests/TxPublishTest.cpp
+++ b/cpp/src/tests/TxPublishTest.cpp
@@ -26,6 +26,7 @@
#include <list>
#include <vector>
#include "MockChannel.h"
+#include "MessageUtils.h"
using std::list;
using std::pair;
@@ -70,12 +71,10 @@ public:
TxPublishTest() :
queue1(new Queue("queue1", false, &store, 0)),
queue2(new Queue("queue2", false, &store, 0)),
- msg(new BasicMessage(0, "exchange", "routing_key", false, false)),
+ msg(MessageUtils::createMessage("exchange", "routing_key", "id")),
op(msg)
{
- AMQHeaderBody body(BASIC);
- msg->setHeader(&body);
- msg->getHeaderProperties()->setDeliveryMode(PERSISTENT);
+ msg->getProperties<DeliveryProperties>()->setDeliveryMode(PERSISTENT);
op.deliverTo(queue1);
op.deliverTo(queue2);
}