summaryrefslogtreecommitdiff
path: root/cpp/src/tests
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2008-04-20 12:10:37 +0000
committerGordon Sim <gsim@apache.org>2008-04-20 12:10:37 +0000
commit0637677cf6653256b67c82dcb74f35133601220c (patch)
tree8507bb8373e8b6dfd8c9b96fcb4b262fd4d61501 /cpp/src/tests
parent48dab065ef526f68a5a7d4c4ba22c5b8b2e2e026 (diff)
downloadqpid-python-0637677cf6653256b67c82dcb74f35133601220c.tar.gz
QPID-920: converted c++ client to use final 0-10 protocol
* connection handler converted to using invoker & proxy and updated to final method defs * SessionCore & ExecutionHandler replace by SessionImpl * simplified handling of completion & results, removed handling of responses git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@649915 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/tests')
-rw-r--r--cpp/src/tests/ClientSessionTest.cpp8
-rw-r--r--cpp/src/tests/ExchangeTest.cpp1
-rw-r--r--cpp/src/tests/FramingTest.cpp108
-rw-r--r--cpp/src/tests/HeaderTest.cpp8
-rw-r--r--cpp/src/tests/MessageBuilderTest.cpp9
-rw-r--r--cpp/src/tests/MessageTest.cpp6
-rw-r--r--cpp/src/tests/MessageUtils.h8
-rw-r--r--cpp/src/tests/QueueTest.cpp3
-rw-r--r--cpp/src/tests/TxAckTest.cpp9
-rw-r--r--cpp/src/tests/client_test.cpp11
-rw-r--r--cpp/src/tests/exception_test.cpp3
-rw-r--r--cpp/src/tests/latencytest.cpp9
-rw-r--r--cpp/src/tests/perftest.cpp30
-rw-r--r--cpp/src/tests/topic_listener.cpp6
-rw-r--r--cpp/src/tests/topic_publisher.cpp6
-rw-r--r--cpp/src/tests/txtest.cpp12
16 files changed, 85 insertions, 152 deletions
diff --git a/cpp/src/tests/ClientSessionTest.cpp b/cpp/src/tests/ClientSessionTest.cpp
index 44d5ed4650..9b6e0dce21 100644
--- a/cpp/src/tests/ClientSessionTest.cpp
+++ b/cpp/src/tests/ClientSessionTest.cpp
@@ -118,13 +118,13 @@ QPID_AUTO_TEST_CASE(testTransfer)
ClientSessionFixture fix;
fix.session=fix.connection.newSession(ASYNC);
fix.declareSubscribe();
- fix.session.messageTransfer(content=TransferContent("my-message", "my-queue"));
+ fix.session.messageTransfer(acceptMode=1, content=TransferContent("my-message", "my-queue"));
//get & test the message:
FrameSet::shared_ptr msg = fix.session.get();
BOOST_CHECK(msg->isA<MessageTransferBody>());
BOOST_CHECK_EQUAL(string("my-message"), msg->getContent());
//confirm receipt:
- fix.session.getExecution().completed(msg->getId(), true, true);
+ fix.session.getExecution().markCompleted(msg->getId(), true, true);
}
QPID_AUTO_TEST_CASE(testDispatcher)
@@ -161,6 +161,8 @@ BOOST_FIXTURE_TEST_CASE(testDispatcherThread, ClientSessionFixture)
}
*/
+/*
+ * GS (18-APR-2008): disabled resume tests until resumption for 0-10 final spec is implemented
QPID_AUTO_TEST_CASE(_FIXTURE)
{
ClientSessionFixture fix;
@@ -195,7 +197,7 @@ QPID_AUTO_TEST_CASE(testSuspendResume)
FrameSet::shared_ptr msg = fix.session.get();
BOOST_CHECK_EQUAL(string("my-message"), msg->getContent());
}
-
+*/
/**
* Currently broken due to a deadlock in SessionCore
*
diff --git a/cpp/src/tests/ExchangeTest.cpp b/cpp/src/tests/ExchangeTest.cpp
index 2904424d5c..94e2c025d6 100644
--- a/cpp/src/tests/ExchangeTest.cpp
+++ b/cpp/src/tests/ExchangeTest.cpp
@@ -30,7 +30,6 @@
#include "qpid/broker/TopicExchange.h"
#include "qpid_test_plugin.h"
#include <iostream>
-#include "qpid/framing/BasicGetBody.h"
#include "MessageUtils.h"
using boost::intrusive_ptr;
diff --git a/cpp/src/tests/FramingTest.cpp b/cpp/src/tests/FramingTest.cpp
index 0c7adb2af8..275d32acfe 100644
--- a/cpp/src/tests/FramingTest.cpp
+++ b/cpp/src/tests/FramingTest.cpp
@@ -23,8 +23,6 @@
#include "qpid/client/Connection.h"
#include "qpid/client/Connector.h"
#include "qpid/framing/AMQP_HighestVersion.h"
-#include "qpid/framing/BasicGetOkBody.h"
-#include "qpid/framing/ConnectionRedirectBody.h"
#include "qpid/framing/ProtocolVersion.h"
#include "qpid/framing/all_method_bodies.h"
#include "qpid/framing/amqp_framing.h"
@@ -54,16 +52,12 @@ std::string tostring(const T& x)
class FramingTest : public CppUnit::TestCase
{
CPPUNIT_TEST_SUITE(FramingTest);
- CPPUNIT_TEST(testBasicQosBody);
+ CPPUNIT_TEST(testMessageTransferBody);
CPPUNIT_TEST(testConnectionSecureBody);
CPPUNIT_TEST(testConnectionRedirectBody);
- CPPUNIT_TEST(testAccessRequestBody);
- CPPUNIT_TEST(testBasicConsumeBody);
+ CPPUNIT_TEST(testQueueDeclareBody);
CPPUNIT_TEST(testConnectionRedirectBodyFrame);
- CPPUNIT_TEST(testBasicConsumeOkBodyFrame);
- CPPUNIT_TEST(testInlineContent);
- CPPUNIT_TEST(testContentReference);
- CPPUNIT_TEST(testContentValidation);
+ CPPUNIT_TEST(testMessageCancelBodyFrame);
CPPUNIT_TEST_SUITE_END();
private:
@@ -74,14 +68,14 @@ class FramingTest : public CppUnit::TestCase
FramingTest() : version(highestProtocolVersion) {}
- void testBasicQosBody()
+ void testMessageTransferBody()
{
Buffer wbuff(buffer, sizeof(buffer));
- BasicQosBody in(version, 0xCAFEBABE, 0xABBA, true);
+ MessageTransferBody in(version, "my-exchange", 1, 1);
in.encode(wbuff);
Buffer rbuff(buffer, sizeof(buffer));
- BasicQosBody out(version);
+ MessageTransferBody out(version);
out.decode(rbuff);
CPPUNIT_ASSERT_EQUAL(tostring(in), tostring(out));
}
@@ -104,7 +98,11 @@ class FramingTest : public CppUnit::TestCase
Buffer wbuff(buffer, sizeof(buffer));
std::string a = "hostA";
std::string b = "hostB";
- ConnectionRedirectBody in(version, a, b);
+ Array hosts(0x95);
+ hosts.add(boost::shared_ptr<FieldValue>(new Str16Value(a)));
+ hosts.add(boost::shared_ptr<FieldValue>(new Str16Value(b)));
+
+ ConnectionRedirectBody in(version, a, hosts);
in.encode(wbuff);
Buffer rbuff(buffer, sizeof(buffer));
@@ -113,41 +111,28 @@ class FramingTest : public CppUnit::TestCase
CPPUNIT_ASSERT_EQUAL(tostring(in), tostring(out));
}
- void testAccessRequestBody()
- {
- Buffer wbuff(buffer, sizeof(buffer));
- std::string s = "text";
- AccessRequestBody in(version, s, true, false, true, false, true);
- in.encode(wbuff);
-
- Buffer rbuff(buffer, sizeof(buffer));
- AccessRequestBody out(version);
- out.decode(rbuff);
- CPPUNIT_ASSERT_EQUAL(tostring(in), tostring(out));
- }
-
- void testBasicConsumeBody()
+ void testQueueDeclareBody()
{
Buffer wbuff(buffer, sizeof(buffer));
- std::string q = "queue";
- std::string t = "tag";
- BasicConsumeBody in(version, 0, q, t, false, true, false, false,
- FieldTable());
+ QueueDeclareBody in(version, "name", "dlq", true, false, true, false, FieldTable());
in.encode(wbuff);
Buffer rbuff(buffer, sizeof(buffer));
- BasicConsumeBody out(version);
+ QueueDeclareBody out(version);
out.decode(rbuff);
CPPUNIT_ASSERT_EQUAL(tostring(in), tostring(out));
}
-
void testConnectionRedirectBodyFrame()
{
Buffer wbuff(buffer, sizeof(buffer));
std::string a = "hostA";
std::string b = "hostB";
- AMQFrame in(in_place<ConnectionRedirectBody>(version, a, b));
+ Array hosts(0x95);
+ hosts.add(boost::shared_ptr<FieldValue>(new Str16Value(a)));
+ hosts.add(boost::shared_ptr<FieldValue>(new Str16Value(b)));
+
+ AMQFrame in(in_place<ConnectionRedirectBody>(version, a, hosts));
in.setChannel(999);
in.encode(wbuff);
@@ -157,11 +142,10 @@ class FramingTest : public CppUnit::TestCase
CPPUNIT_ASSERT_EQUAL(tostring(in), tostring(out));
}
- void testBasicConsumeOkBodyFrame()
+ void testMessageCancelBodyFrame()
{
Buffer wbuff(buffer, sizeof(buffer));
- std::string s = "hostA";
- AMQFrame in(in_place<BasicConsumeOkBody>(version, s));
+ AMQFrame in(in_place<MessageCancelBody>(version, "tag"));
in.setChannel(999);
in.encode(wbuff);
@@ -171,56 +155,6 @@ class FramingTest : public CppUnit::TestCase
CPPUNIT_ASSERT_EQUAL(tostring(in), tostring(out));
}
- void testInlineContent() {
- Buffer wbuff(buffer, sizeof(buffer));
- Content content(INLINE, "MyData");
- CPPUNIT_ASSERT(content.isInline());
- content.encode(wbuff);
-
- Buffer rbuff(buffer, sizeof(buffer));
- Content recovered;
- recovered.decode(rbuff);
- CPPUNIT_ASSERT(recovered.isInline());
- CPPUNIT_ASSERT_EQUAL(content.getValue(), recovered.getValue());
- }
-
- void testContentReference() {
- Buffer wbuff(buffer, sizeof(buffer));
- Content content(REFERENCE, "MyRef");
- CPPUNIT_ASSERT(content.isReference());
- content.encode(wbuff);
-
- Buffer rbuff(buffer, sizeof(buffer));
- Content recovered;
- recovered.decode(rbuff);
- CPPUNIT_ASSERT(recovered.isReference());
- CPPUNIT_ASSERT_EQUAL(content.getValue(), recovered.getValue());
- }
-
- void testContentValidation() {
- try {
- Content content(REFERENCE, "");
- CPPUNIT_ASSERT(false);//fail, expected exception
- } catch (const InvalidArgumentException& e) {}
-
- try {
- Content content(2, "Blah");
- CPPUNIT_ASSERT(false);//fail, expected exception
- } catch (const SyntaxErrorException& e) {}
-
- try {
- Buffer wbuff(buffer, sizeof(buffer));
- wbuff.putOctet(2);
- wbuff.putLongString("blah, blah");
-
- Buffer rbuff(buffer, sizeof(buffer));
- Content content;
- content.decode(rbuff);
- CPPUNIT_FAIL("Expected exception");
- } catch (Exception& e) {}
-
- }
-
};
diff --git a/cpp/src/tests/HeaderTest.cpp b/cpp/src/tests/HeaderTest.cpp
index 9e2bddb4de..56be38a302 100644
--- a/cpp/src/tests/HeaderTest.cpp
+++ b/cpp/src/tests/HeaderTest.cpp
@@ -61,16 +61,13 @@ class HeaderTest : public CppUnit::TestCase
out.castBody<AMQHeaderBody>()->get<MessageProperties>(true);
props1->setContentLength(42);
- props1->setMessageId("messageId");
+ props1->setMessageId(Uuid(true));
props1->setCorrelationId("correlationId");
props1->setReplyTo(ReplyTo("ex","key"));
props1->setContentType("contentType");
props1->setContentEncoding("contentEncoding");
- props1->setType("type");
props1->setUserId("userId");
props1->setAppId("appId");
- props1->setTransactionId("transactionId");
- props1->setSecurityToken("securityToken");
char buff[10000];
Buffer wbuffer(buff, 10000);
@@ -87,11 +84,8 @@ class HeaderTest : public CppUnit::TestCase
CPPUNIT_ASSERT_EQUAL(props1->getCorrelationId(), props2->getCorrelationId());
CPPUNIT_ASSERT_EQUAL(props1->getContentType(), props2->getContentType());
CPPUNIT_ASSERT_EQUAL(props1->getContentEncoding(), props2->getContentEncoding());
- CPPUNIT_ASSERT_EQUAL(props1->getType(), props2->getType());
CPPUNIT_ASSERT_EQUAL(props1->getUserId(), props2->getUserId());
CPPUNIT_ASSERT_EQUAL(props1->getAppId(), props2->getAppId());
- CPPUNIT_ASSERT_EQUAL(props1->getTransactionId(), props2->getTransactionId());
- CPPUNIT_ASSERT_EQUAL(props1->getSecurityToken(), props2->getSecurityToken());
}
diff --git a/cpp/src/tests/MessageBuilderTest.cpp b/cpp/src/tests/MessageBuilderTest.cpp
index 092e02cc2f..7149ec50c7 100644
--- a/cpp/src/tests/MessageBuilderTest.cpp
+++ b/cpp/src/tests/MessageBuilderTest.cpp
@@ -22,6 +22,7 @@
#include "qpid/broker/MessageBuilder.h"
#include "qpid/broker/NullMessageStore.h"
#include "qpid/framing/frame_functors.h"
+#include "qpid/framing/MessageTransferBody.h"
#include "qpid/framing/TypeFilter.h"
#include "qpid_test_plugin.h"
#include <list>
@@ -101,7 +102,7 @@ class MessageBuilderTest : public CppUnit::TestCase
std::string key("builder-exchange");
AMQFrame method(in_place<MessageTransferBody>(
- ProtocolVersion(), 0, exchange, 0, 0));
+ ProtocolVersion(), exchange, 0, 0));
AMQFrame header(in_place<AMQHeaderBody>());
header.castBody<AMQHeaderBody>()->get<MessageProperties>(true)->setContentLength(0);
@@ -124,7 +125,7 @@ class MessageBuilderTest : public CppUnit::TestCase
std::string exchange("builder-exchange");
std::string key("builder-exchange");
- AMQFrame method(in_place<MessageTransferBody>(ProtocolVersion(), 0, exchange, 0, 0));
+ AMQFrame method(in_place<MessageTransferBody>(ProtocolVersion(), exchange, 0, 0));
AMQFrame header(in_place<AMQHeaderBody>());
AMQFrame content(in_place<AMQContentBody>(data));
method.setEof(false);
@@ -158,7 +159,7 @@ class MessageBuilderTest : public CppUnit::TestCase
std::string key("builder-exchange");
AMQFrame method(in_place<MessageTransferBody>(
- ProtocolVersion(), 0, exchange, 0, 0));
+ ProtocolVersion(), exchange, 0, 0));
AMQFrame header(in_place<AMQHeaderBody>());
AMQFrame content1(in_place<AMQContentBody>(data1));
AMQFrame content2(in_place<AMQContentBody>(data2));
@@ -194,7 +195,7 @@ class MessageBuilderTest : public CppUnit::TestCase
std::string key("builder-exchange");
AMQFrame method(in_place<MessageTransferBody>(
- ProtocolVersion(), 0, exchange, 0, 0));
+ ProtocolVersion(), exchange, 0, 0));
AMQFrame header(in_place<AMQHeaderBody>());
AMQFrame content1(in_place<AMQContentBody>(data1));
AMQFrame content2(in_place<AMQContentBody>(data2));
diff --git a/cpp/src/tests/MessageTest.cpp b/cpp/src/tests/MessageTest.cpp
index a19080e1ce..d7688c74a9 100644
--- a/cpp/src/tests/MessageTest.cpp
+++ b/cpp/src/tests/MessageTest.cpp
@@ -21,7 +21,9 @@
#include "qpid/broker/Message.h"
#include "qpid/framing/AMQP_HighestVersion.h"
#include "qpid/framing/AMQFrame.h"
+#include "qpid/framing/MessageTransferBody.h"
#include "qpid/framing/FieldValue.h"
+#include "qpid/framing/Uuid.h"
#include "qpid_test_plugin.h"
@@ -44,14 +46,14 @@ class MessageTest : public CppUnit::TestCase
{
string exchange = "MyExchange";
string routingKey = "MyRoutingKey";
- string messageId = "MyMessage";
+ Uuid messageId(true);
string data1("abcdefg");
string data2("hijklmn");
intrusive_ptr<Message> msg(new Message());
AMQFrame method(in_place<MessageTransferBody>(
- ProtocolVersion(), 0, exchange, 0, 0));
+ ProtocolVersion(), exchange, 0, 0));
AMQFrame header(in_place<AMQHeaderBody>());
AMQFrame content1(in_place<AMQContentBody>(data1));
AMQFrame content2(in_place<AMQContentBody>(data2));
diff --git a/cpp/src/tests/MessageUtils.h b/cpp/src/tests/MessageUtils.h
index 3def8cd41b..21ee834ba7 100644
--- a/cpp/src/tests/MessageUtils.h
+++ b/cpp/src/tests/MessageUtils.h
@@ -22,6 +22,8 @@
#include "qpid/broker/Message.h"
#include "qpid/broker/MessageDelivery.h"
#include "qpid/framing/AMQFrame.h"
+#include "qpid/framing/MessageTransferBody.h"
+#include "qpid/framing/Uuid.h"
using namespace qpid;
using namespace broker;
@@ -29,12 +31,12 @@ using namespace framing;
struct MessageUtils
{
- static boost::intrusive_ptr<Message> createMessage(const string& exchange, const string& routingKey,
- const string& messageId, uint64_t contentSize = 0)
+ static boost::intrusive_ptr<Message> createMessage(const string& exchange="", const string& routingKey="",
+ const Uuid& messageId=Uuid(true), uint64_t contentSize = 0)
{
boost::intrusive_ptr<Message> msg(new Message());
- AMQFrame method(in_place<MessageTransferBody>(ProtocolVersion(), 0, exchange, 0, 0));
+ AMQFrame method(in_place<MessageTransferBody>(ProtocolVersion(), exchange, 0, 0));
AMQFrame header(in_place<AMQHeaderBody>());
msg->getFrames().append(method);
diff --git a/cpp/src/tests/QueueTest.cpp b/cpp/src/tests/QueueTest.cpp
index 70132bce76..1d454d9f4a 100644
--- a/cpp/src/tests/QueueTest.cpp
+++ b/cpp/src/tests/QueueTest.cpp
@@ -23,6 +23,7 @@
#include "qpid/broker/Deliverable.h"
#include "qpid/broker/ExchangeRegistry.h"
#include "qpid/broker/QueueRegistry.h"
+#include "qpid/framing/MessageTransferBody.h"
#include "qpid_test_plugin.h"
#include <iostream>
#include "boost/format.hpp"
@@ -75,7 +76,7 @@ class QueueTest : public CppUnit::TestCase
intrusive_ptr<Message> message(std::string exchange, std::string routingKey) {
intrusive_ptr<Message> msg(new Message());
AMQFrame method(in_place<MessageTransferBody>(
- ProtocolVersion(), 0, exchange, 0, 0));
+ ProtocolVersion(), exchange, 0, 0));
AMQFrame header(in_place<AMQHeaderBody>());
msg->getFrames().append(method);
msg->getFrames().append(header);
diff --git a/cpp/src/tests/TxAckTest.cpp b/cpp/src/tests/TxAckTest.cpp
index 89b98cb93c..b86f3d75e0 100644
--- a/cpp/src/tests/TxAckTest.cpp
+++ b/cpp/src/tests/TxAckTest.cpp
@@ -18,6 +18,7 @@
* under the License.
*
*/
+#include "MessageUtils.h"
#include "qpid/broker/NullMessageStore.h"
#include "qpid/broker/RecoveryManager.h"
#include "qpid/broker/TxAck.h"
@@ -69,14 +70,8 @@ public:
TxAckTest() : acked(0), queue(new Queue("my_queue", false, &store, 0)), op(acked, deliveries)
{
for(int i = 0; i < 10; i++){
- intrusive_ptr<Message> msg(new Message());
- AMQFrame method(in_place<MessageTransferBody>(
- ProtocolVersion(), 0, "exchange", 0, 0));
- AMQFrame header(in_place<AMQHeaderBody>());
- msg->getFrames().append(method);
- msg->getFrames().append(header);
+ intrusive_ptr<Message> msg(MessageUtils::createMessage("exchange", "routing_key"));
msg->getProperties<DeliveryProperties>()->setDeliveryMode(PERSISTENT);
- msg->getProperties<DeliveryProperties>()->setRoutingKey("routing_key");
messages.push_back(msg);
QueuedMessage qm(queue.get());
qm.payload = msg;
diff --git a/cpp/src/tests/client_test.cpp b/cpp/src/tests/client_test.cpp
index bd2a541c92..011dcd4678 100644
--- a/cpp/src/tests/client_test.cpp
+++ b/cpp/src/tests/client_test.cpp
@@ -33,12 +33,11 @@
#include "qpid/client/Message.h"
#include "qpid/client/Session.h"
#include "qpid/framing/FrameSet.h"
-#include "qpid/framing/MessageTransferBody.h"
+#include "qpid/framing/all_method_bodies.h"
using namespace qpid;
using namespace qpid::client;
-using qpid::framing::FrameSet;
-using qpid::framing::MessageTransferBody;
+using namespace qpid::framing;
using std::string;
struct Args : public qpid::TestOptions {
@@ -104,14 +103,14 @@ int main(int argc, char** argv)
if (opts.trace) std::cout << "Declared queue." << std::endl;
//now bind the queue to the exchange
- session.queueBind(arg::exchange="MyExchange", arg::queue="MyQueue", arg::routingKey="MyKey");
+ session.exchangeBind(arg::exchange="MyExchange", arg::queue="MyQueue", arg::bindingKey="MyKey");
if (opts.trace) std::cout << "Bound queue to exchange." << std::endl;
//create and send a message to the exchange using the routing
//key we bound our queue with:
Message msgOut(generateData(opts.msgSize));
msgOut.getDeliveryProperties().setRoutingKey("MyKey");
- session.messageTransfer(arg::destination="MyExchange", arg::content=msgOut);
+ session.messageTransfer(arg::destination="MyExchange", arg::content=msgOut, arg::acceptMode=1);
if (opts.trace) print("Published message: ", msgOut);
//subscribe to the queue, add sufficient credit and then get
@@ -132,6 +131,8 @@ int main(int argc, char** argv)
} else {
print("Received an unexepected message: ", msgIn);
}
+ } else {
+ throw Exception("Unexpected command received");
}
//close the session & connection
diff --git a/cpp/src/tests/exception_test.cpp b/cpp/src/tests/exception_test.cpp
index f7a91f662f..7c68973d4d 100644
--- a/cpp/src/tests/exception_test.cpp
+++ b/cpp/src/tests/exception_test.cpp
@@ -96,7 +96,8 @@ QPID_AUTO_TEST_CASE(DisconnectedListen) {
QPID_AUTO_TEST_CASE(NoSuchQueueTest) {
ProxySessionFixture fix;
- BOOST_CHECK_THROW(fix.subs.subscribe(fix.lq, "no such queue").sync(), NotFoundException);
+ fix.session.setSynchronous(true);
+ BOOST_CHECK_THROW(fix.subs.subscribe(fix.lq, "no such queue"), NotFoundException);
}
QPID_AUTO_TEST_SUITE_END()
diff --git a/cpp/src/tests/latencytest.cpp b/cpp/src/tests/latencytest.cpp
index 2b44a5477a..a61a4b2e42 100644
--- a/cpp/src/tests/latencytest.cpp
+++ b/cpp/src/tests/latencytest.cpp
@@ -192,7 +192,7 @@ Receiver::Receiver(const string& q, Stats& s) : Client(q), mgr(session), count(0
mgr.setAckPolicy(AckPolicy(opts.ack ? opts.ack : (opts.prefetch / 2)));
mgr.setFlowControl(opts.prefetch, SubscriptionManager::UNLIMITED, true);
} else {
- mgr.setConfirmMode(false);
+ mgr.setAcceptMode(1/*not-required*/);
mgr.setFlowControl(SubscriptionManager::UNLIMITED, SubscriptionManager::UNLIMITED, false);
}
mgr.subscribe(*this, queue);
@@ -257,14 +257,13 @@ void Sender::sendByCount()
msg.getDeliveryProperties().setDeliveryMode(framing::PERSISTENT);
}
- Completion c;
for (uint i = 0; i < opts.count; i++) {
uint64_t sentAt(current_time());
msg.getDeliveryProperties().setTimestamp(sentAt);
//msg.getHeaders().setTimestamp("sent-at", sentAt);//TODO add support for uint64_t to field tables
- c = session.messageTransfer(arg::content=msg);
+ session.messageTransfer(arg::content=msg, arg::acceptMode=1);
}
- c.sync();
+ session.sync();
}
void Sender::sendByRate()
@@ -283,7 +282,7 @@ void Sender::sendByRate()
uint64_t sentAt(current_time());
msg.getDeliveryProperties().setTimestamp(sentAt);
//msg.getHeaders().setTimestamp("sent-at", sentAt);//TODO add support for uint64_t to field tables
- session.messageTransfer(arg::content=msg);
+ session.messageTransfer(arg::content=msg, arg::acceptMode=1);
}
uint64_t timeTaken = (current_time() - start) / TIME_USEC;
if (timeTaken < 1000) {
diff --git a/cpp/src/tests/perftest.cpp b/cpp/src/tests/perftest.cpp
index 3dd4e876fc..966d708ff6 100644
--- a/cpp/src/tests/perftest.cpp
+++ b/cpp/src/tests/perftest.cpp
@@ -210,7 +210,8 @@ struct Setup : public Client {
void queueInit(string name, bool durable=false, const framing::FieldTable& settings=framing::FieldTable()) {
session.queueDeclare(arg::queue=name, arg::durable=durable, arg::arguments=settings);
- session.queuePurge(arg::queue=name).sync();
+ session.queuePurge(arg::queue=name);
+ session.sync();
}
void run() {
@@ -334,7 +335,7 @@ struct Controller : public Client {
<< endl;
Message msg(data, queue);
for (size_t i = 0; i < n; ++i)
- session.messageTransfer(arg::content=msg);
+ session.messageTransfer(arg::content=msg, arg::acceptMode=1);
}
void run() { // Controller
@@ -421,7 +422,6 @@ struct PublishThread : public Client {
}
void run() { // Publisher
- Completion completion;
try {
string data;
size_t offset(0);
@@ -459,19 +459,19 @@ struct PublishThread : public Client {
// any heap allocation.
const_cast<std::string&>(msg.getData()).replace(offset, sizeof(uint32_t),
reinterpret_cast<const char*>(&i), sizeof(uint32_t));
- completion = session.messageTransfer(
+ session.messageTransfer(
arg::destination=destination,
arg::content=msg,
- arg::confirmMode=opts.confirm);
- if (opts.intervalPub) ::usleep(opts.intervalPub*1000);
+ arg::acceptMode=1);
+ if (opts.intervalPub) ::usleep(opts.intervalPub*1000);
}
- if (opts.confirm) completion.sync();
+ if (opts.confirm) session.sync();
AbsTime end=now();
double time=secs(start,end);
// Send result to controller.
Message report(lexical_cast<string>(opts.count/time), "pub_done");
- session.messageTransfer(arg::content=report);
+ session.messageTransfer(arg::content=report, arg::acceptMode=1);
}
session.close();
}
@@ -496,9 +496,9 @@ struct SubscribeThread : public Client {
arg::exclusive=true,
arg::autoDelete=true,
arg::durable=opts.durable);
- session.queueBind(arg::queue=queue,
- arg::exchange=ex,
- arg::routingKey=key);
+ session.exchangeBind(arg::queue=queue,
+ arg::exchange=ex,
+ arg::bindingKey=key);
}
void verify(bool cond, const char* test, uint32_t expect, uint32_t actual) {
@@ -506,7 +506,7 @@ struct SubscribeThread : public Client {
Message error(
QPID_MSG("Sequence error: expected n" << test << expect << " but got " << actual),
"sub_done");
- session.messageTransfer(arg::content=error);
+ session.messageTransfer(arg::content=error, arg::acceptMode=1);
throw Exception(error.getData());
}
}
@@ -515,12 +515,12 @@ struct SubscribeThread : public Client {
try {
SubscriptionManager subs(session);
LocalQueue lq(AckPolicy(opts.ack));
- subs.setConfirmMode(opts.ack > 0);
+ subs.setAcceptMode(opts.ack > 0 ? 0 : 1);
subs.setFlowControl(opts.subQuota, SubscriptionManager::UNLIMITED,
false);
subs.subscribe(lq, queue);
// Notify controller we are ready.
- session.messageTransfer(arg::content=Message("ready", "sub_ready"));
+ session.messageTransfer(arg::content=Message("ready", "sub_ready"), arg::acceptMode=1);
for (size_t j = 0; j < opts.iterations; ++j) {
@@ -556,7 +556,7 @@ struct SubscribeThread : public Client {
// Report to publisher.
Message result(lexical_cast<string>(opts.subQuota/secs(start,end)),
"sub_done");
- session.messageTransfer(arg::content=result);
+ session.messageTransfer(arg::content=result, arg::acceptMode=1);
}
session.close();
}
diff --git a/cpp/src/tests/topic_listener.cpp b/cpp/src/tests/topic_listener.cpp
index e5e7d24112..5208b67445 100644
--- a/cpp/src/tests/topic_listener.cpp
+++ b/cpp/src/tests/topic_listener.cpp
@@ -114,7 +114,7 @@ int main(int argc, char** argv){
} else {
session.queueDeclare(arg::queue=control, arg::exclusive=true, arg::autoDelete=true);
}
- session.queueBind(arg::exchange="amq.topic", arg::queue=control, arg::routingKey="topic_control");
+ session.exchangeBind(arg::exchange="amq.topic", arg::queue=control, arg::bindingKey="topic_control");
//set up listener
SubscriptionManager mgr(session);
@@ -123,7 +123,7 @@ int main(int argc, char** argv){
mgr.setAckPolicy(AckPolicy(args.ack ? args.ack : (args.prefetch / 2)));
mgr.setFlowControl(args.prefetch, SubscriptionManager::UNLIMITED, true);
} else {
- mgr.setConfirmMode(false);
+ mgr.setAcceptMode(1/*-not-required*/);
mgr.setFlowControl(SubscriptionManager::UNLIMITED, SubscriptionManager::UNLIMITED, false);
}
mgr.subscribe(listener, control);
@@ -181,7 +181,7 @@ void Listener::report(){
<< time/TIME_MSEC << " ms.";
Message msg(reportstr.str(), responseQueue);
msg.getHeaders().setString("TYPE", "REPORT");
- session.messageTransfer(arg::content=msg);
+ session.messageTransfer(arg::content=msg, arg::acceptMode=1);
if(transactional){
session.txCommit();
}
diff --git a/cpp/src/tests/topic_publisher.cpp b/cpp/src/tests/topic_publisher.cpp
index 2271849c35..8242530db1 100644
--- a/cpp/src/tests/topic_publisher.cpp
+++ b/cpp/src/tests/topic_publisher.cpp
@@ -164,12 +164,12 @@ int64_t Publisher::publish(int msgs, int listeners, int size){
AbsTime start = now();
for(int i = 0; i < msgs; i++){
- session.messageTransfer(arg::content=msg, arg::destination="amq.topic");
+ session.messageTransfer(arg::content=msg, arg::destination="amq.topic", arg::acceptMode=1);
}
//send report request
Message reportRequest("", controlTopic);
reportRequest.getHeaders().setString("TYPE", "REPORT_REQUEST");
- session.messageTransfer(arg::content=reportRequest, arg::destination="amq.topic");
+ session.messageTransfer(arg::content=reportRequest, arg::destination="amq.topic", arg::acceptMode=1);
if(transactional){
session.txCommit();
}
@@ -198,7 +198,7 @@ void Publisher::terminate(){
//send termination request
Message terminationRequest("", controlTopic);
terminationRequest.getHeaders().setString("TYPE", "TERMINATION_REQUEST");
- session.messageTransfer(arg::content=terminationRequest, arg::destination="amq.topic");
+ session.messageTransfer(arg::content=terminationRequest, arg::destination="amq.topic", arg::acceptMode=1);
if(transactional){
session.txCommit();
}
diff --git a/cpp/src/tests/txtest.cpp b/cpp/src/tests/txtest.cpp
index 4c5814986c..5030b24070 100644
--- a/cpp/src/tests/txtest.cpp
+++ b/cpp/src/tests/txtest.cpp
@@ -142,7 +142,7 @@ struct Transfer : public Client, public Runnable
out.setData(in.getData());
out.getMessageProperties().setCorrelationId(in.getMessageProperties().getCorrelationId());
out.getDeliveryProperties().setDeliveryMode(in.getDeliveryProperties().getDeliveryMode());
- session.messageTransfer(arg::content=out);
+ session.messageTransfer(arg::content=out, arg::acceptMode=1);
}
in.acknowledge();
session.txCommit();
@@ -168,7 +168,8 @@ struct Controller : public Client
{
//declare queues
for (StringSet::iterator i = queues.begin(); i != queues.end(); i++) {
- session.queueDeclare(arg::queue=*i, arg::durable=opts.durable).sync();
+ session.queueDeclare(arg::queue=*i, arg::durable=opts.durable);
+ session.sync();
}
Message msg(generateData(opts.size), *queues.begin());
@@ -179,7 +180,7 @@ struct Controller : public Client
//publish messages
for (StringSet::iterator i = ids.begin(); i != ids.end(); i++) {
msg.getMessageProperties().setCorrelationId(*i);
- session.messageTransfer(arg::content=msg);
+ session.messageTransfer(arg::content=msg, arg::acceptMode=1);
}
}
@@ -205,7 +206,7 @@ struct Controller : public Client
{
SubscriptionManager subs(session);
subs.setFlowControl(SubscriptionManager::UNLIMITED, SubscriptionManager::UNLIMITED, false);
- subs.setConfirmMode(false);
+ subs.setAcceptMode(1/*not-required*/);
StringSet drained;
//drain each queue and verify the correct set of messages are available
@@ -213,7 +214,8 @@ struct Controller : public Client
//subscribe, allocate credit and flush
LocalQueue lq(AckPolicy(0));//manual acking
subs.subscribe(lq, *i, *i);
- session.messageFlush(arg::destination=*i).sync();
+ session.messageFlush(arg::destination=*i);
+ session.sync();
uint count(0);
while (!lq.empty()) {