diff options
author | Gordon Sim <gsim@apache.org> | 2008-04-20 12:10:37 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2008-04-20 12:10:37 +0000 |
commit | 0637677cf6653256b67c82dcb74f35133601220c (patch) | |
tree | 8507bb8373e8b6dfd8c9b96fcb4b262fd4d61501 /cpp/src/tests | |
parent | 48dab065ef526f68a5a7d4c4ba22c5b8b2e2e026 (diff) | |
download | qpid-python-0637677cf6653256b67c82dcb74f35133601220c.tar.gz |
QPID-920: converted c++ client to use final 0-10 protocol
* connection handler converted to using invoker & proxy and updated to final method defs
* SessionCore & ExecutionHandler replace by SessionImpl
* simplified handling of completion & results, removed handling of responses
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@649915 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/tests')
-rw-r--r-- | cpp/src/tests/ClientSessionTest.cpp | 8 | ||||
-rw-r--r-- | cpp/src/tests/ExchangeTest.cpp | 1 | ||||
-rw-r--r-- | cpp/src/tests/FramingTest.cpp | 108 | ||||
-rw-r--r-- | cpp/src/tests/HeaderTest.cpp | 8 | ||||
-rw-r--r-- | cpp/src/tests/MessageBuilderTest.cpp | 9 | ||||
-rw-r--r-- | cpp/src/tests/MessageTest.cpp | 6 | ||||
-rw-r--r-- | cpp/src/tests/MessageUtils.h | 8 | ||||
-rw-r--r-- | cpp/src/tests/QueueTest.cpp | 3 | ||||
-rw-r--r-- | cpp/src/tests/TxAckTest.cpp | 9 | ||||
-rw-r--r-- | cpp/src/tests/client_test.cpp | 11 | ||||
-rw-r--r-- | cpp/src/tests/exception_test.cpp | 3 | ||||
-rw-r--r-- | cpp/src/tests/latencytest.cpp | 9 | ||||
-rw-r--r-- | cpp/src/tests/perftest.cpp | 30 | ||||
-rw-r--r-- | cpp/src/tests/topic_listener.cpp | 6 | ||||
-rw-r--r-- | cpp/src/tests/topic_publisher.cpp | 6 | ||||
-rw-r--r-- | cpp/src/tests/txtest.cpp | 12 |
16 files changed, 85 insertions, 152 deletions
diff --git a/cpp/src/tests/ClientSessionTest.cpp b/cpp/src/tests/ClientSessionTest.cpp index 44d5ed4650..9b6e0dce21 100644 --- a/cpp/src/tests/ClientSessionTest.cpp +++ b/cpp/src/tests/ClientSessionTest.cpp @@ -118,13 +118,13 @@ QPID_AUTO_TEST_CASE(testTransfer) ClientSessionFixture fix; fix.session=fix.connection.newSession(ASYNC); fix.declareSubscribe(); - fix.session.messageTransfer(content=TransferContent("my-message", "my-queue")); + fix.session.messageTransfer(acceptMode=1, content=TransferContent("my-message", "my-queue")); //get & test the message: FrameSet::shared_ptr msg = fix.session.get(); BOOST_CHECK(msg->isA<MessageTransferBody>()); BOOST_CHECK_EQUAL(string("my-message"), msg->getContent()); //confirm receipt: - fix.session.getExecution().completed(msg->getId(), true, true); + fix.session.getExecution().markCompleted(msg->getId(), true, true); } QPID_AUTO_TEST_CASE(testDispatcher) @@ -161,6 +161,8 @@ BOOST_FIXTURE_TEST_CASE(testDispatcherThread, ClientSessionFixture) } */ +/* + * GS (18-APR-2008): disabled resume tests until resumption for 0-10 final spec is implemented QPID_AUTO_TEST_CASE(_FIXTURE) { ClientSessionFixture fix; @@ -195,7 +197,7 @@ QPID_AUTO_TEST_CASE(testSuspendResume) FrameSet::shared_ptr msg = fix.session.get(); BOOST_CHECK_EQUAL(string("my-message"), msg->getContent()); } - +*/ /** * Currently broken due to a deadlock in SessionCore * diff --git a/cpp/src/tests/ExchangeTest.cpp b/cpp/src/tests/ExchangeTest.cpp index 2904424d5c..94e2c025d6 100644 --- a/cpp/src/tests/ExchangeTest.cpp +++ b/cpp/src/tests/ExchangeTest.cpp @@ -30,7 +30,6 @@ #include "qpid/broker/TopicExchange.h" #include "qpid_test_plugin.h" #include <iostream> -#include "qpid/framing/BasicGetBody.h" #include "MessageUtils.h" using boost::intrusive_ptr; diff --git a/cpp/src/tests/FramingTest.cpp b/cpp/src/tests/FramingTest.cpp index 0c7adb2af8..275d32acfe 100644 --- a/cpp/src/tests/FramingTest.cpp +++ b/cpp/src/tests/FramingTest.cpp @@ -23,8 +23,6 @@ #include "qpid/client/Connection.h" #include "qpid/client/Connector.h" #include "qpid/framing/AMQP_HighestVersion.h" -#include "qpid/framing/BasicGetOkBody.h" -#include "qpid/framing/ConnectionRedirectBody.h" #include "qpid/framing/ProtocolVersion.h" #include "qpid/framing/all_method_bodies.h" #include "qpid/framing/amqp_framing.h" @@ -54,16 +52,12 @@ std::string tostring(const T& x) class FramingTest : public CppUnit::TestCase { CPPUNIT_TEST_SUITE(FramingTest); - CPPUNIT_TEST(testBasicQosBody); + CPPUNIT_TEST(testMessageTransferBody); CPPUNIT_TEST(testConnectionSecureBody); CPPUNIT_TEST(testConnectionRedirectBody); - CPPUNIT_TEST(testAccessRequestBody); - CPPUNIT_TEST(testBasicConsumeBody); + CPPUNIT_TEST(testQueueDeclareBody); CPPUNIT_TEST(testConnectionRedirectBodyFrame); - CPPUNIT_TEST(testBasicConsumeOkBodyFrame); - CPPUNIT_TEST(testInlineContent); - CPPUNIT_TEST(testContentReference); - CPPUNIT_TEST(testContentValidation); + CPPUNIT_TEST(testMessageCancelBodyFrame); CPPUNIT_TEST_SUITE_END(); private: @@ -74,14 +68,14 @@ class FramingTest : public CppUnit::TestCase FramingTest() : version(highestProtocolVersion) {} - void testBasicQosBody() + void testMessageTransferBody() { Buffer wbuff(buffer, sizeof(buffer)); - BasicQosBody in(version, 0xCAFEBABE, 0xABBA, true); + MessageTransferBody in(version, "my-exchange", 1, 1); in.encode(wbuff); Buffer rbuff(buffer, sizeof(buffer)); - BasicQosBody out(version); + MessageTransferBody out(version); out.decode(rbuff); CPPUNIT_ASSERT_EQUAL(tostring(in), tostring(out)); } @@ -104,7 +98,11 @@ class FramingTest : public CppUnit::TestCase Buffer wbuff(buffer, sizeof(buffer)); std::string a = "hostA"; std::string b = "hostB"; - ConnectionRedirectBody in(version, a, b); + Array hosts(0x95); + hosts.add(boost::shared_ptr<FieldValue>(new Str16Value(a))); + hosts.add(boost::shared_ptr<FieldValue>(new Str16Value(b))); + + ConnectionRedirectBody in(version, a, hosts); in.encode(wbuff); Buffer rbuff(buffer, sizeof(buffer)); @@ -113,41 +111,28 @@ class FramingTest : public CppUnit::TestCase CPPUNIT_ASSERT_EQUAL(tostring(in), tostring(out)); } - void testAccessRequestBody() - { - Buffer wbuff(buffer, sizeof(buffer)); - std::string s = "text"; - AccessRequestBody in(version, s, true, false, true, false, true); - in.encode(wbuff); - - Buffer rbuff(buffer, sizeof(buffer)); - AccessRequestBody out(version); - out.decode(rbuff); - CPPUNIT_ASSERT_EQUAL(tostring(in), tostring(out)); - } - - void testBasicConsumeBody() + void testQueueDeclareBody() { Buffer wbuff(buffer, sizeof(buffer)); - std::string q = "queue"; - std::string t = "tag"; - BasicConsumeBody in(version, 0, q, t, false, true, false, false, - FieldTable()); + QueueDeclareBody in(version, "name", "dlq", true, false, true, false, FieldTable()); in.encode(wbuff); Buffer rbuff(buffer, sizeof(buffer)); - BasicConsumeBody out(version); + QueueDeclareBody out(version); out.decode(rbuff); CPPUNIT_ASSERT_EQUAL(tostring(in), tostring(out)); } - void testConnectionRedirectBodyFrame() { Buffer wbuff(buffer, sizeof(buffer)); std::string a = "hostA"; std::string b = "hostB"; - AMQFrame in(in_place<ConnectionRedirectBody>(version, a, b)); + Array hosts(0x95); + hosts.add(boost::shared_ptr<FieldValue>(new Str16Value(a))); + hosts.add(boost::shared_ptr<FieldValue>(new Str16Value(b))); + + AMQFrame in(in_place<ConnectionRedirectBody>(version, a, hosts)); in.setChannel(999); in.encode(wbuff); @@ -157,11 +142,10 @@ class FramingTest : public CppUnit::TestCase CPPUNIT_ASSERT_EQUAL(tostring(in), tostring(out)); } - void testBasicConsumeOkBodyFrame() + void testMessageCancelBodyFrame() { Buffer wbuff(buffer, sizeof(buffer)); - std::string s = "hostA"; - AMQFrame in(in_place<BasicConsumeOkBody>(version, s)); + AMQFrame in(in_place<MessageCancelBody>(version, "tag")); in.setChannel(999); in.encode(wbuff); @@ -171,56 +155,6 @@ class FramingTest : public CppUnit::TestCase CPPUNIT_ASSERT_EQUAL(tostring(in), tostring(out)); } - void testInlineContent() { - Buffer wbuff(buffer, sizeof(buffer)); - Content content(INLINE, "MyData"); - CPPUNIT_ASSERT(content.isInline()); - content.encode(wbuff); - - Buffer rbuff(buffer, sizeof(buffer)); - Content recovered; - recovered.decode(rbuff); - CPPUNIT_ASSERT(recovered.isInline()); - CPPUNIT_ASSERT_EQUAL(content.getValue(), recovered.getValue()); - } - - void testContentReference() { - Buffer wbuff(buffer, sizeof(buffer)); - Content content(REFERENCE, "MyRef"); - CPPUNIT_ASSERT(content.isReference()); - content.encode(wbuff); - - Buffer rbuff(buffer, sizeof(buffer)); - Content recovered; - recovered.decode(rbuff); - CPPUNIT_ASSERT(recovered.isReference()); - CPPUNIT_ASSERT_EQUAL(content.getValue(), recovered.getValue()); - } - - void testContentValidation() { - try { - Content content(REFERENCE, ""); - CPPUNIT_ASSERT(false);//fail, expected exception - } catch (const InvalidArgumentException& e) {} - - try { - Content content(2, "Blah"); - CPPUNIT_ASSERT(false);//fail, expected exception - } catch (const SyntaxErrorException& e) {} - - try { - Buffer wbuff(buffer, sizeof(buffer)); - wbuff.putOctet(2); - wbuff.putLongString("blah, blah"); - - Buffer rbuff(buffer, sizeof(buffer)); - Content content; - content.decode(rbuff); - CPPUNIT_FAIL("Expected exception"); - } catch (Exception& e) {} - - } - }; diff --git a/cpp/src/tests/HeaderTest.cpp b/cpp/src/tests/HeaderTest.cpp index 9e2bddb4de..56be38a302 100644 --- a/cpp/src/tests/HeaderTest.cpp +++ b/cpp/src/tests/HeaderTest.cpp @@ -61,16 +61,13 @@ class HeaderTest : public CppUnit::TestCase out.castBody<AMQHeaderBody>()->get<MessageProperties>(true); props1->setContentLength(42); - props1->setMessageId("messageId"); + props1->setMessageId(Uuid(true)); props1->setCorrelationId("correlationId"); props1->setReplyTo(ReplyTo("ex","key")); props1->setContentType("contentType"); props1->setContentEncoding("contentEncoding"); - props1->setType("type"); props1->setUserId("userId"); props1->setAppId("appId"); - props1->setTransactionId("transactionId"); - props1->setSecurityToken("securityToken"); char buff[10000]; Buffer wbuffer(buff, 10000); @@ -87,11 +84,8 @@ class HeaderTest : public CppUnit::TestCase CPPUNIT_ASSERT_EQUAL(props1->getCorrelationId(), props2->getCorrelationId()); CPPUNIT_ASSERT_EQUAL(props1->getContentType(), props2->getContentType()); CPPUNIT_ASSERT_EQUAL(props1->getContentEncoding(), props2->getContentEncoding()); - CPPUNIT_ASSERT_EQUAL(props1->getType(), props2->getType()); CPPUNIT_ASSERT_EQUAL(props1->getUserId(), props2->getUserId()); CPPUNIT_ASSERT_EQUAL(props1->getAppId(), props2->getAppId()); - CPPUNIT_ASSERT_EQUAL(props1->getTransactionId(), props2->getTransactionId()); - CPPUNIT_ASSERT_EQUAL(props1->getSecurityToken(), props2->getSecurityToken()); } diff --git a/cpp/src/tests/MessageBuilderTest.cpp b/cpp/src/tests/MessageBuilderTest.cpp index 092e02cc2f..7149ec50c7 100644 --- a/cpp/src/tests/MessageBuilderTest.cpp +++ b/cpp/src/tests/MessageBuilderTest.cpp @@ -22,6 +22,7 @@ #include "qpid/broker/MessageBuilder.h" #include "qpid/broker/NullMessageStore.h" #include "qpid/framing/frame_functors.h" +#include "qpid/framing/MessageTransferBody.h" #include "qpid/framing/TypeFilter.h" #include "qpid_test_plugin.h" #include <list> @@ -101,7 +102,7 @@ class MessageBuilderTest : public CppUnit::TestCase std::string key("builder-exchange"); AMQFrame method(in_place<MessageTransferBody>( - ProtocolVersion(), 0, exchange, 0, 0)); + ProtocolVersion(), exchange, 0, 0)); AMQFrame header(in_place<AMQHeaderBody>()); header.castBody<AMQHeaderBody>()->get<MessageProperties>(true)->setContentLength(0); @@ -124,7 +125,7 @@ class MessageBuilderTest : public CppUnit::TestCase std::string exchange("builder-exchange"); std::string key("builder-exchange"); - AMQFrame method(in_place<MessageTransferBody>(ProtocolVersion(), 0, exchange, 0, 0)); + AMQFrame method(in_place<MessageTransferBody>(ProtocolVersion(), exchange, 0, 0)); AMQFrame header(in_place<AMQHeaderBody>()); AMQFrame content(in_place<AMQContentBody>(data)); method.setEof(false); @@ -158,7 +159,7 @@ class MessageBuilderTest : public CppUnit::TestCase std::string key("builder-exchange"); AMQFrame method(in_place<MessageTransferBody>( - ProtocolVersion(), 0, exchange, 0, 0)); + ProtocolVersion(), exchange, 0, 0)); AMQFrame header(in_place<AMQHeaderBody>()); AMQFrame content1(in_place<AMQContentBody>(data1)); AMQFrame content2(in_place<AMQContentBody>(data2)); @@ -194,7 +195,7 @@ class MessageBuilderTest : public CppUnit::TestCase std::string key("builder-exchange"); AMQFrame method(in_place<MessageTransferBody>( - ProtocolVersion(), 0, exchange, 0, 0)); + ProtocolVersion(), exchange, 0, 0)); AMQFrame header(in_place<AMQHeaderBody>()); AMQFrame content1(in_place<AMQContentBody>(data1)); AMQFrame content2(in_place<AMQContentBody>(data2)); diff --git a/cpp/src/tests/MessageTest.cpp b/cpp/src/tests/MessageTest.cpp index a19080e1ce..d7688c74a9 100644 --- a/cpp/src/tests/MessageTest.cpp +++ b/cpp/src/tests/MessageTest.cpp @@ -21,7 +21,9 @@ #include "qpid/broker/Message.h" #include "qpid/framing/AMQP_HighestVersion.h" #include "qpid/framing/AMQFrame.h" +#include "qpid/framing/MessageTransferBody.h" #include "qpid/framing/FieldValue.h" +#include "qpid/framing/Uuid.h" #include "qpid_test_plugin.h" @@ -44,14 +46,14 @@ class MessageTest : public CppUnit::TestCase { string exchange = "MyExchange"; string routingKey = "MyRoutingKey"; - string messageId = "MyMessage"; + Uuid messageId(true); string data1("abcdefg"); string data2("hijklmn"); intrusive_ptr<Message> msg(new Message()); AMQFrame method(in_place<MessageTransferBody>( - ProtocolVersion(), 0, exchange, 0, 0)); + ProtocolVersion(), exchange, 0, 0)); AMQFrame header(in_place<AMQHeaderBody>()); AMQFrame content1(in_place<AMQContentBody>(data1)); AMQFrame content2(in_place<AMQContentBody>(data2)); diff --git a/cpp/src/tests/MessageUtils.h b/cpp/src/tests/MessageUtils.h index 3def8cd41b..21ee834ba7 100644 --- a/cpp/src/tests/MessageUtils.h +++ b/cpp/src/tests/MessageUtils.h @@ -22,6 +22,8 @@ #include "qpid/broker/Message.h" #include "qpid/broker/MessageDelivery.h" #include "qpid/framing/AMQFrame.h" +#include "qpid/framing/MessageTransferBody.h" +#include "qpid/framing/Uuid.h" using namespace qpid; using namespace broker; @@ -29,12 +31,12 @@ using namespace framing; struct MessageUtils { - static boost::intrusive_ptr<Message> createMessage(const string& exchange, const string& routingKey, - const string& messageId, uint64_t contentSize = 0) + static boost::intrusive_ptr<Message> createMessage(const string& exchange="", const string& routingKey="", + const Uuid& messageId=Uuid(true), uint64_t contentSize = 0) { boost::intrusive_ptr<Message> msg(new Message()); - AMQFrame method(in_place<MessageTransferBody>(ProtocolVersion(), 0, exchange, 0, 0)); + AMQFrame method(in_place<MessageTransferBody>(ProtocolVersion(), exchange, 0, 0)); AMQFrame header(in_place<AMQHeaderBody>()); msg->getFrames().append(method); diff --git a/cpp/src/tests/QueueTest.cpp b/cpp/src/tests/QueueTest.cpp index 70132bce76..1d454d9f4a 100644 --- a/cpp/src/tests/QueueTest.cpp +++ b/cpp/src/tests/QueueTest.cpp @@ -23,6 +23,7 @@ #include "qpid/broker/Deliverable.h" #include "qpid/broker/ExchangeRegistry.h" #include "qpid/broker/QueueRegistry.h" +#include "qpid/framing/MessageTransferBody.h" #include "qpid_test_plugin.h" #include <iostream> #include "boost/format.hpp" @@ -75,7 +76,7 @@ class QueueTest : public CppUnit::TestCase intrusive_ptr<Message> message(std::string exchange, std::string routingKey) { intrusive_ptr<Message> msg(new Message()); AMQFrame method(in_place<MessageTransferBody>( - ProtocolVersion(), 0, exchange, 0, 0)); + ProtocolVersion(), exchange, 0, 0)); AMQFrame header(in_place<AMQHeaderBody>()); msg->getFrames().append(method); msg->getFrames().append(header); diff --git a/cpp/src/tests/TxAckTest.cpp b/cpp/src/tests/TxAckTest.cpp index 89b98cb93c..b86f3d75e0 100644 --- a/cpp/src/tests/TxAckTest.cpp +++ b/cpp/src/tests/TxAckTest.cpp @@ -18,6 +18,7 @@ * under the License. * */ +#include "MessageUtils.h" #include "qpid/broker/NullMessageStore.h" #include "qpid/broker/RecoveryManager.h" #include "qpid/broker/TxAck.h" @@ -69,14 +70,8 @@ public: TxAckTest() : acked(0), queue(new Queue("my_queue", false, &store, 0)), op(acked, deliveries) { for(int i = 0; i < 10; i++){ - intrusive_ptr<Message> msg(new Message()); - AMQFrame method(in_place<MessageTransferBody>( - ProtocolVersion(), 0, "exchange", 0, 0)); - AMQFrame header(in_place<AMQHeaderBody>()); - msg->getFrames().append(method); - msg->getFrames().append(header); + intrusive_ptr<Message> msg(MessageUtils::createMessage("exchange", "routing_key")); msg->getProperties<DeliveryProperties>()->setDeliveryMode(PERSISTENT); - msg->getProperties<DeliveryProperties>()->setRoutingKey("routing_key"); messages.push_back(msg); QueuedMessage qm(queue.get()); qm.payload = msg; diff --git a/cpp/src/tests/client_test.cpp b/cpp/src/tests/client_test.cpp index bd2a541c92..011dcd4678 100644 --- a/cpp/src/tests/client_test.cpp +++ b/cpp/src/tests/client_test.cpp @@ -33,12 +33,11 @@ #include "qpid/client/Message.h" #include "qpid/client/Session.h" #include "qpid/framing/FrameSet.h" -#include "qpid/framing/MessageTransferBody.h" +#include "qpid/framing/all_method_bodies.h" using namespace qpid; using namespace qpid::client; -using qpid::framing::FrameSet; -using qpid::framing::MessageTransferBody; +using namespace qpid::framing; using std::string; struct Args : public qpid::TestOptions { @@ -104,14 +103,14 @@ int main(int argc, char** argv) if (opts.trace) std::cout << "Declared queue." << std::endl; //now bind the queue to the exchange - session.queueBind(arg::exchange="MyExchange", arg::queue="MyQueue", arg::routingKey="MyKey"); + session.exchangeBind(arg::exchange="MyExchange", arg::queue="MyQueue", arg::bindingKey="MyKey"); if (opts.trace) std::cout << "Bound queue to exchange." << std::endl; //create and send a message to the exchange using the routing //key we bound our queue with: Message msgOut(generateData(opts.msgSize)); msgOut.getDeliveryProperties().setRoutingKey("MyKey"); - session.messageTransfer(arg::destination="MyExchange", arg::content=msgOut); + session.messageTransfer(arg::destination="MyExchange", arg::content=msgOut, arg::acceptMode=1); if (opts.trace) print("Published message: ", msgOut); //subscribe to the queue, add sufficient credit and then get @@ -132,6 +131,8 @@ int main(int argc, char** argv) } else { print("Received an unexepected message: ", msgIn); } + } else { + throw Exception("Unexpected command received"); } //close the session & connection diff --git a/cpp/src/tests/exception_test.cpp b/cpp/src/tests/exception_test.cpp index f7a91f662f..7c68973d4d 100644 --- a/cpp/src/tests/exception_test.cpp +++ b/cpp/src/tests/exception_test.cpp @@ -96,7 +96,8 @@ QPID_AUTO_TEST_CASE(DisconnectedListen) { QPID_AUTO_TEST_CASE(NoSuchQueueTest) { ProxySessionFixture fix; - BOOST_CHECK_THROW(fix.subs.subscribe(fix.lq, "no such queue").sync(), NotFoundException); + fix.session.setSynchronous(true); + BOOST_CHECK_THROW(fix.subs.subscribe(fix.lq, "no such queue"), NotFoundException); } QPID_AUTO_TEST_SUITE_END() diff --git a/cpp/src/tests/latencytest.cpp b/cpp/src/tests/latencytest.cpp index 2b44a5477a..a61a4b2e42 100644 --- a/cpp/src/tests/latencytest.cpp +++ b/cpp/src/tests/latencytest.cpp @@ -192,7 +192,7 @@ Receiver::Receiver(const string& q, Stats& s) : Client(q), mgr(session), count(0 mgr.setAckPolicy(AckPolicy(opts.ack ? opts.ack : (opts.prefetch / 2))); mgr.setFlowControl(opts.prefetch, SubscriptionManager::UNLIMITED, true); } else { - mgr.setConfirmMode(false); + mgr.setAcceptMode(1/*not-required*/); mgr.setFlowControl(SubscriptionManager::UNLIMITED, SubscriptionManager::UNLIMITED, false); } mgr.subscribe(*this, queue); @@ -257,14 +257,13 @@ void Sender::sendByCount() msg.getDeliveryProperties().setDeliveryMode(framing::PERSISTENT); } - Completion c; for (uint i = 0; i < opts.count; i++) { uint64_t sentAt(current_time()); msg.getDeliveryProperties().setTimestamp(sentAt); //msg.getHeaders().setTimestamp("sent-at", sentAt);//TODO add support for uint64_t to field tables - c = session.messageTransfer(arg::content=msg); + session.messageTransfer(arg::content=msg, arg::acceptMode=1); } - c.sync(); + session.sync(); } void Sender::sendByRate() @@ -283,7 +282,7 @@ void Sender::sendByRate() uint64_t sentAt(current_time()); msg.getDeliveryProperties().setTimestamp(sentAt); //msg.getHeaders().setTimestamp("sent-at", sentAt);//TODO add support for uint64_t to field tables - session.messageTransfer(arg::content=msg); + session.messageTransfer(arg::content=msg, arg::acceptMode=1); } uint64_t timeTaken = (current_time() - start) / TIME_USEC; if (timeTaken < 1000) { diff --git a/cpp/src/tests/perftest.cpp b/cpp/src/tests/perftest.cpp index 3dd4e876fc..966d708ff6 100644 --- a/cpp/src/tests/perftest.cpp +++ b/cpp/src/tests/perftest.cpp @@ -210,7 +210,8 @@ struct Setup : public Client { void queueInit(string name, bool durable=false, const framing::FieldTable& settings=framing::FieldTable()) { session.queueDeclare(arg::queue=name, arg::durable=durable, arg::arguments=settings); - session.queuePurge(arg::queue=name).sync(); + session.queuePurge(arg::queue=name); + session.sync(); } void run() { @@ -334,7 +335,7 @@ struct Controller : public Client { << endl; Message msg(data, queue); for (size_t i = 0; i < n; ++i) - session.messageTransfer(arg::content=msg); + session.messageTransfer(arg::content=msg, arg::acceptMode=1); } void run() { // Controller @@ -421,7 +422,6 @@ struct PublishThread : public Client { } void run() { // Publisher - Completion completion; try { string data; size_t offset(0); @@ -459,19 +459,19 @@ struct PublishThread : public Client { // any heap allocation. const_cast<std::string&>(msg.getData()).replace(offset, sizeof(uint32_t), reinterpret_cast<const char*>(&i), sizeof(uint32_t)); - completion = session.messageTransfer( + session.messageTransfer( arg::destination=destination, arg::content=msg, - arg::confirmMode=opts.confirm); - if (opts.intervalPub) ::usleep(opts.intervalPub*1000); + arg::acceptMode=1); + if (opts.intervalPub) ::usleep(opts.intervalPub*1000); } - if (opts.confirm) completion.sync(); + if (opts.confirm) session.sync(); AbsTime end=now(); double time=secs(start,end); // Send result to controller. Message report(lexical_cast<string>(opts.count/time), "pub_done"); - session.messageTransfer(arg::content=report); + session.messageTransfer(arg::content=report, arg::acceptMode=1); } session.close(); } @@ -496,9 +496,9 @@ struct SubscribeThread : public Client { arg::exclusive=true, arg::autoDelete=true, arg::durable=opts.durable); - session.queueBind(arg::queue=queue, - arg::exchange=ex, - arg::routingKey=key); + session.exchangeBind(arg::queue=queue, + arg::exchange=ex, + arg::bindingKey=key); } void verify(bool cond, const char* test, uint32_t expect, uint32_t actual) { @@ -506,7 +506,7 @@ struct SubscribeThread : public Client { Message error( QPID_MSG("Sequence error: expected n" << test << expect << " but got " << actual), "sub_done"); - session.messageTransfer(arg::content=error); + session.messageTransfer(arg::content=error, arg::acceptMode=1); throw Exception(error.getData()); } } @@ -515,12 +515,12 @@ struct SubscribeThread : public Client { try { SubscriptionManager subs(session); LocalQueue lq(AckPolicy(opts.ack)); - subs.setConfirmMode(opts.ack > 0); + subs.setAcceptMode(opts.ack > 0 ? 0 : 1); subs.setFlowControl(opts.subQuota, SubscriptionManager::UNLIMITED, false); subs.subscribe(lq, queue); // Notify controller we are ready. - session.messageTransfer(arg::content=Message("ready", "sub_ready")); + session.messageTransfer(arg::content=Message("ready", "sub_ready"), arg::acceptMode=1); for (size_t j = 0; j < opts.iterations; ++j) { @@ -556,7 +556,7 @@ struct SubscribeThread : public Client { // Report to publisher. Message result(lexical_cast<string>(opts.subQuota/secs(start,end)), "sub_done"); - session.messageTransfer(arg::content=result); + session.messageTransfer(arg::content=result, arg::acceptMode=1); } session.close(); } diff --git a/cpp/src/tests/topic_listener.cpp b/cpp/src/tests/topic_listener.cpp index e5e7d24112..5208b67445 100644 --- a/cpp/src/tests/topic_listener.cpp +++ b/cpp/src/tests/topic_listener.cpp @@ -114,7 +114,7 @@ int main(int argc, char** argv){ } else { session.queueDeclare(arg::queue=control, arg::exclusive=true, arg::autoDelete=true); } - session.queueBind(arg::exchange="amq.topic", arg::queue=control, arg::routingKey="topic_control"); + session.exchangeBind(arg::exchange="amq.topic", arg::queue=control, arg::bindingKey="topic_control"); //set up listener SubscriptionManager mgr(session); @@ -123,7 +123,7 @@ int main(int argc, char** argv){ mgr.setAckPolicy(AckPolicy(args.ack ? args.ack : (args.prefetch / 2))); mgr.setFlowControl(args.prefetch, SubscriptionManager::UNLIMITED, true); } else { - mgr.setConfirmMode(false); + mgr.setAcceptMode(1/*-not-required*/); mgr.setFlowControl(SubscriptionManager::UNLIMITED, SubscriptionManager::UNLIMITED, false); } mgr.subscribe(listener, control); @@ -181,7 +181,7 @@ void Listener::report(){ << time/TIME_MSEC << " ms."; Message msg(reportstr.str(), responseQueue); msg.getHeaders().setString("TYPE", "REPORT"); - session.messageTransfer(arg::content=msg); + session.messageTransfer(arg::content=msg, arg::acceptMode=1); if(transactional){ session.txCommit(); } diff --git a/cpp/src/tests/topic_publisher.cpp b/cpp/src/tests/topic_publisher.cpp index 2271849c35..8242530db1 100644 --- a/cpp/src/tests/topic_publisher.cpp +++ b/cpp/src/tests/topic_publisher.cpp @@ -164,12 +164,12 @@ int64_t Publisher::publish(int msgs, int listeners, int size){ AbsTime start = now(); for(int i = 0; i < msgs; i++){ - session.messageTransfer(arg::content=msg, arg::destination="amq.topic"); + session.messageTransfer(arg::content=msg, arg::destination="amq.topic", arg::acceptMode=1); } //send report request Message reportRequest("", controlTopic); reportRequest.getHeaders().setString("TYPE", "REPORT_REQUEST"); - session.messageTransfer(arg::content=reportRequest, arg::destination="amq.topic"); + session.messageTransfer(arg::content=reportRequest, arg::destination="amq.topic", arg::acceptMode=1); if(transactional){ session.txCommit(); } @@ -198,7 +198,7 @@ void Publisher::terminate(){ //send termination request Message terminationRequest("", controlTopic); terminationRequest.getHeaders().setString("TYPE", "TERMINATION_REQUEST"); - session.messageTransfer(arg::content=terminationRequest, arg::destination="amq.topic"); + session.messageTransfer(arg::content=terminationRequest, arg::destination="amq.topic", arg::acceptMode=1); if(transactional){ session.txCommit(); } diff --git a/cpp/src/tests/txtest.cpp b/cpp/src/tests/txtest.cpp index 4c5814986c..5030b24070 100644 --- a/cpp/src/tests/txtest.cpp +++ b/cpp/src/tests/txtest.cpp @@ -142,7 +142,7 @@ struct Transfer : public Client, public Runnable out.setData(in.getData()); out.getMessageProperties().setCorrelationId(in.getMessageProperties().getCorrelationId()); out.getDeliveryProperties().setDeliveryMode(in.getDeliveryProperties().getDeliveryMode()); - session.messageTransfer(arg::content=out); + session.messageTransfer(arg::content=out, arg::acceptMode=1); } in.acknowledge(); session.txCommit(); @@ -168,7 +168,8 @@ struct Controller : public Client { //declare queues for (StringSet::iterator i = queues.begin(); i != queues.end(); i++) { - session.queueDeclare(arg::queue=*i, arg::durable=opts.durable).sync(); + session.queueDeclare(arg::queue=*i, arg::durable=opts.durable); + session.sync(); } Message msg(generateData(opts.size), *queues.begin()); @@ -179,7 +180,7 @@ struct Controller : public Client //publish messages for (StringSet::iterator i = ids.begin(); i != ids.end(); i++) { msg.getMessageProperties().setCorrelationId(*i); - session.messageTransfer(arg::content=msg); + session.messageTransfer(arg::content=msg, arg::acceptMode=1); } } @@ -205,7 +206,7 @@ struct Controller : public Client { SubscriptionManager subs(session); subs.setFlowControl(SubscriptionManager::UNLIMITED, SubscriptionManager::UNLIMITED, false); - subs.setConfirmMode(false); + subs.setAcceptMode(1/*not-required*/); StringSet drained; //drain each queue and verify the correct set of messages are available @@ -213,7 +214,8 @@ struct Controller : public Client //subscribe, allocate credit and flush LocalQueue lq(AckPolicy(0));//manual acking subs.subscribe(lq, *i, *i); - session.messageFlush(arg::destination=*i).sync(); + session.messageFlush(arg::destination=*i); + session.sync(); uint count(0); while (!lq.empty()) { |