diff options
author | Alan Conway <aconway@apache.org> | 2008-05-26 18:10:05 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2008-05-26 18:10:05 +0000 |
commit | ce7678789fe3e8c5caebb59a26aa418fbb95e5d3 (patch) | |
tree | affd8e2de460cba285e7c25e15f5c3d94444f905 /cpp/src/tests | |
parent | 0b56077cbb8b6e9cdd982cbdeaa3ec6fe1bd5434 (diff) | |
download | qpid-python-ce7678789fe3e8c5caebb59a26aa418fbb95e5d3.tar.gz |
Changes to Session API:
- Session is synchronous, no futures.
- AsyncSession is async, returns futures.
- Conversion functions sync(s) async(s) return a sync/async view of session s.
- Connection::newSession - takes name, no timeout
- SessionBase::getId - returns SessionId not UUID.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@660258 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/tests')
-rw-r--r-- | cpp/src/tests/BrokerFixture.h | 2 | ||||
-rw-r--r-- | cpp/src/tests/ClientSessionTest.cpp | 84 | ||||
-rw-r--r-- | cpp/src/tests/XmlClientSessionTest.cpp | 2 | ||||
-rw-r--r-- | cpp/src/tests/client_test.cpp | 2 | ||||
-rw-r--r-- | cpp/src/tests/consume.cpp | 2 | ||||
-rw-r--r-- | cpp/src/tests/exception_test.cpp | 1 | ||||
-rw-r--r-- | cpp/src/tests/latencytest.cpp | 10 | ||||
-rw-r--r-- | cpp/src/tests/perftest.cpp | 8 | ||||
-rw-r--r-- | cpp/src/tests/publish.cpp | 6 | ||||
-rw-r--r-- | cpp/src/tests/topic_listener.cpp | 11 | ||||
-rw-r--r-- | cpp/src/tests/topic_publisher.cpp | 10 | ||||
-rwxr-xr-x | cpp/src/tests/topictest | 2 | ||||
-rw-r--r-- | cpp/src/tests/txtest.cpp | 6 |
13 files changed, 74 insertions, 72 deletions
diff --git a/cpp/src/tests/BrokerFixture.h b/cpp/src/tests/BrokerFixture.h index 83b3f621c7..31f63d71a0 100644 --- a/cpp/src/tests/BrokerFixture.h +++ b/cpp/src/tests/BrokerFixture.h @@ -92,7 +92,7 @@ struct SessionFixtureT : BrokerFixture { qpid::client::LocalQueue lq; SessionFixtureT() : connection(broker->getPort()), - session(connection.newSession(qpid::client::ASYNC)), + session(connection.newSession("SessionFixture")), subs(session) {} diff --git a/cpp/src/tests/ClientSessionTest.cpp b/cpp/src/tests/ClientSessionTest.cpp index 801e33d412..1dade47ee9 100644 --- a/cpp/src/tests/ClientSessionTest.cpp +++ b/cpp/src/tests/ClientSessionTest.cpp @@ -106,19 +106,19 @@ struct ClientSessionFixture : public ProxySessionFixture QPID_AUTO_TEST_CASE(testQueueQuery) { ClientSessionFixture fix; - fix.session = fix.connection.newSession(ASYNC); + fix.session = fix.connection.newSession(); fix.session.queueDeclare(queue="my-queue", alternateExchange="amq.fanout", exclusive=true, autoDelete=true); - TypedResult<QueueQueryResult> result = fix.session.queueQuery(string("my-queue")); - BOOST_CHECK_EQUAL(false, result.get().getDurable()); - BOOST_CHECK_EQUAL(true, result.get().getExclusive()); + QueueQueryResult result = fix.session.queueQuery(string("my-queue")); + BOOST_CHECK_EQUAL(false, result.getDurable()); + BOOST_CHECK_EQUAL(true, result.getExclusive()); BOOST_CHECK_EQUAL(string("amq.fanout"), - result.get().getAlternateExchange()); + result.getAlternateExchange()); } QPID_AUTO_TEST_CASE(testTransfer) { ClientSessionFixture fix; - fix.session=fix.connection.newSession(ASYNC); + fix.session=fix.connection.newSession(); fix.declareSubscribe(); fix.session.messageTransfer(acceptMode=1, content=TransferContent("my-message", "my-queue")); //get & test the message: @@ -133,7 +133,7 @@ QPID_AUTO_TEST_CASE(testTransfer) QPID_AUTO_TEST_CASE(testDispatcher) { ClientSessionFixture fix; - fix.session =fix.connection.newSession(ASYNC); + fix.session =fix.connection.newSession(); fix.declareSubscribe(); size_t count = 100; for (size_t i = 0; i < count; ++i) @@ -148,7 +148,7 @@ QPID_AUTO_TEST_CASE(testDispatcher) QPID_AUTO_TEST_CASE(testDispatcherThread) { ClientSessionFixture fix; - fix.session =fix.connection.newSession(ASYNC); + fix.session =fix.connection.newSession(); fix.declareSubscribe(); size_t count = 10; DummyListener listener(fix.session, "my-dest", count); @@ -162,40 +162,42 @@ QPID_AUTO_TEST_CASE(testDispatcherThread) BOOST_CHECK_EQUAL(lexical_cast<string>(i), listener.messages[i].getData()); } -QPID_AUTO_TEST_CASE_EXPECTED_FAILURES(testSuspend0Timeout, 1) -{ - ClientSessionFixture fix; - fix.session =fix.connection.newSession(ASYNC, 0); - fix.session.suspend(); // session has 0 timeout. - try { - fix.connection.resume(fix.session); - BOOST_FAIL("Expected InvalidArgumentException."); - } catch(const InternalErrorException&) {} -} - -QPID_AUTO_TEST_CASE_EXPECTED_FAILURES(testUseSuspendedError, 1) -{ - ClientSessionFixture fix; - fix.session =fix.connection.newSession(ASYNC, 60); - fix.session.suspend(); - try { - fix.session.exchangeQuery(name="amq.fanout"); - BOOST_FAIL("Expected session suspended exception"); - } catch(const CommandInvalidException&) {} -} +// FIXME aconway 2008-05-26: Re-enable with final resume implementation. +// +// QPID_AUTO_TEST_CASE_EXPECTED_FAILURES(testSuspend0Timeout, 1) +// { +// ClientSessionFixture fix; +// fix.session.suspend(); // session has 0 timeout. +// try { +// fix.connection.resume(fix.session); +// BOOST_FAIL("Expected InvalidArgumentException."); +// } catch(const InternalErrorException&) {} +// } + +// QPID_AUTO_TEST_CASE_EXPECTED_FAILURES(testUseSuspendedError, 1) +// { +// ClientSessionFixture fix; +// fix.session =fix.session.timeout(60); +// fix.session.suspend(); +// try { +// fix.session.exchangeQuery(name="amq.fanout"); +// BOOST_FAIL("Expected session suspended exception"); +// } catch(const CommandInvalidException&) {} +// } + +// QPID_AUTO_TEST_CASE_EXPECTED_FAILURES(testSuspendResume, 1) +// { +// ClientSessionFixture fix; +// fix.session.timeout(60); +// fix.declareSubscribe(); +// fix.session.suspend(); +// // Make sure we are still subscribed after resume. +// fix.connection.resume(fix.session); +// fix.session.messageTransfer(content=TransferContent("my-message", "my-queue")); +// FrameSet::shared_ptr msg = fix.session.get(); +// BOOST_CHECK_EQUAL(string("my-message"), msg->getContent()); +// } -QPID_AUTO_TEST_CASE_EXPECTED_FAILURES(testSuspendResume, 1) -{ - ClientSessionFixture fix; - fix.session =fix.connection.newSession(ASYNC, 60); - fix.declareSubscribe(); - fix.session.suspend(); - // Make sure we are still subscribed after resume. - fix.connection.resume(fix.session); - fix.session.messageTransfer(content=TransferContent("my-message", "my-queue")); - FrameSet::shared_ptr msg = fix.session.get(); - BOOST_CHECK_EQUAL(string("my-message"), msg->getContent()); -} QPID_AUTO_TEST_CASE(testSendToSelf) { ClientSessionFixture fix; diff --git a/cpp/src/tests/XmlClientSessionTest.cpp b/cpp/src/tests/XmlClientSessionTest.cpp index dc6cce24fc..9b0c035f37 100644 --- a/cpp/src/tests/XmlClientSessionTest.cpp +++ b/cpp/src/tests/XmlClientSessionTest.cpp @@ -121,7 +121,7 @@ struct ClientSessionFixture : public ProxySessionFixture QPID_AUTO_TEST_CASE(testXmlBinding) { ClientSessionFixture f; - Session session = f.connection.newSession(ASYNC); + Session session = f.connection.newSession(); SubscriptionManager subscriptions(session); SubscribedLocalQueue localQueue(subscriptions); diff --git a/cpp/src/tests/client_test.cpp b/cpp/src/tests/client_test.cpp index 20e8b21a3a..04269b299d 100644 --- a/cpp/src/tests/client_test.cpp +++ b/cpp/src/tests/client_test.cpp @@ -92,7 +92,7 @@ int main(int argc, char** argv) //Create and open a session on the connection through which //most functionality is exposed: - Session session = connection.newSession(ASYNC); + Session session = connection.newSession(); if (opts.verbose) std::cout << "Opened session." << std::endl; diff --git a/cpp/src/tests/consume.cpp b/cpp/src/tests/consume.cpp index 6d2f0a7413..43e08a80b7 100644 --- a/cpp/src/tests/consume.cpp +++ b/cpp/src/tests/consume.cpp @@ -62,7 +62,7 @@ struct Client Client() { opts.open(connection); - session = connection.newSession(ASYNC); + session = connection.newSession(); } void consume() diff --git a/cpp/src/tests/exception_test.cpp b/cpp/src/tests/exception_test.cpp index f75269c959..a656e0cf1a 100644 --- a/cpp/src/tests/exception_test.cpp +++ b/cpp/src/tests/exception_test.cpp @@ -96,7 +96,6 @@ QPID_AUTO_TEST_CASE(DisconnectedListen) { QPID_AUTO_TEST_CASE(NoSuchQueueTest) { ProxySessionFixture fix; - fix.session.setSynchronous(true); BOOST_CHECK_THROW(fix.subs.subscribe(fix.lq, "no such queue"), NotFoundException); } diff --git a/cpp/src/tests/latencytest.cpp b/cpp/src/tests/latencytest.cpp index 0b343d0243..f4cbade36b 100644 --- a/cpp/src/tests/latencytest.cpp +++ b/cpp/src/tests/latencytest.cpp @@ -30,7 +30,7 @@ #include "TestOptions.h" #include "qpid/client/Connection.h" #include "qpid/client/Message.h" -#include "qpid/client/Session.h" +#include "qpid/client/AsyncSession.h" #include "qpid/client/SubscriptionManager.h" using namespace qpid; @@ -99,7 +99,7 @@ class Client : public Runnable { protected: Connection connection; - Session session; + AsyncSession session; Thread thread; string queue; @@ -157,7 +157,7 @@ public: Client::Client(const string& q) : queue(q) { opts.open(connection); - session = connection.newSession(ASYNC); + session = connection.newSession(); } void Client::start() @@ -262,7 +262,7 @@ void Sender::sendByCount() uint64_t sentAt(current_time()); msg.getDeliveryProperties().setTimestamp(sentAt); //msg.getHeaders().setTimestamp("sent-at", sentAt);//TODO add support for uint64_t to field tables - session.messageTransfer(arg::content=msg, arg::acceptMode=1); + async(session).messageTransfer(arg::content=msg, arg::acceptMode=1); } session.sync(); } @@ -283,7 +283,7 @@ void Sender::sendByRate() uint64_t sentAt(current_time()); msg.getDeliveryProperties().setTimestamp(sentAt); //msg.getHeaders().setTimestamp("sent-at", sentAt);//TODO add support for uint64_t to field tables - session.messageTransfer(arg::content=msg, arg::acceptMode=1); + async(session).messageTransfer(arg::content=msg, arg::acceptMode=1); } uint64_t timeTaken = (current_time() - start) / TIME_USEC; if (timeTaken < 1000) { diff --git a/cpp/src/tests/perftest.cpp b/cpp/src/tests/perftest.cpp index 2a8a9ec17c..91ecd83f50 100644 --- a/cpp/src/tests/perftest.cpp +++ b/cpp/src/tests/perftest.cpp @@ -21,7 +21,7 @@ #include "TestOptions.h" -#include "qpid/client/Session.h" +#include "qpid/client/AsyncSession.h" #include "qpid/client/SubscriptionManager.h" #include "qpid/client/Connection.h" #include "qpid/client/Completion.h" @@ -194,12 +194,12 @@ Opts opts; struct Client : public Runnable { Connection connection; - Session session; + AsyncSession session; Thread thread; Client() { opts.open(connection); - session = connection.newSession(ASYNC); + session = connection.newSession(); } ~Client() { @@ -431,7 +431,7 @@ struct PublishThread : public Client { offset = 5; data += "data:";//marker (requested for latency testing tool scripts) data += string(sizeof(size_t), 'X');//space for seq no - data += string(reinterpret_cast<const char*>(session.getId().data()), session.getId().size()); + data += session.getId().str(); if (opts.size > data.size()) { data += string(opts.size - data.size(), 'X'); } else if(opts.size < data.size()) { diff --git a/cpp/src/tests/publish.cpp b/cpp/src/tests/publish.cpp index 17e3d4e104..b78f3fdf6d 100644 --- a/cpp/src/tests/publish.cpp +++ b/cpp/src/tests/publish.cpp @@ -28,7 +28,7 @@ #include "TestOptions.h" #include "qpid/client/Connection.h" #include "qpid/client/Message.h" -#include "qpid/client/Session.h" +#include "qpid/client/AsyncSession.h" #include "qpid/client/SubscriptionManager.h" using namespace qpid; @@ -61,12 +61,12 @@ Args opts; struct Client { Connection connection; - Session session; + AsyncSession session; Client() { opts.open(connection); - session = connection.newSession(ASYNC); + session = connection.newSession(); } std::string id(uint i) diff --git a/cpp/src/tests/topic_listener.cpp b/cpp/src/tests/topic_listener.cpp index 8f0e290070..6daf928401 100644 --- a/cpp/src/tests/topic_listener.cpp +++ b/cpp/src/tests/topic_listener.cpp @@ -53,7 +53,7 @@ using namespace std; * defined. */ class Listener : public MessageListener{ - Session& session; + Session session; SubscriptionManager& mgr; const string responseQueue; const bool transactional; @@ -64,7 +64,7 @@ class Listener : public MessageListener{ void shutdown(); void report(); public: - Listener(Session& session, SubscriptionManager& mgr, const string& reponseQueue, bool tx); + Listener(const Session& session, SubscriptionManager& mgr, const string& reponseQueue, bool tx); virtual void received(Message& msg); }; @@ -101,7 +101,7 @@ int main(int argc, char** argv){ else { Connection connection; args.open(connection); - Session session = connection.newSession(ASYNC); + AsyncSession session = connection.newSession(); if (args.transactional) { session.txSelect(); } @@ -127,7 +127,8 @@ int main(int argc, char** argv){ mgr.setFlowControl(SubscriptionManager::UNLIMITED, SubscriptionManager::UNLIMITED, false); } mgr.subscribe(listener, control); - + session.sync(); + cout << "topic_listener: listening..." << endl; mgr.run(); if (args.durable) { @@ -144,7 +145,7 @@ int main(int argc, char** argv){ return 1; } -Listener::Listener(Session& s, SubscriptionManager& m, const string& _responseq, bool tx) : +Listener::Listener(const Session& s, SubscriptionManager& m, const string& _responseq, bool tx) : session(s), mgr(m), responseQueue(_responseq), transactional(tx), init(false), count(0){} void Listener::received(Message& message){ diff --git a/cpp/src/tests/topic_publisher.cpp b/cpp/src/tests/topic_publisher.cpp index a6a7b4d80d..c8f0d543ec 100644 --- a/cpp/src/tests/topic_publisher.cpp +++ b/cpp/src/tests/topic_publisher.cpp @@ -37,7 +37,7 @@ #include "TestOptions.h" #include "qpid/client/Connection.h" #include "qpid/client/MessageListener.h" -#include "qpid/client/Session.h" +#include "qpid/client/AsyncSession.h" #include "qpid/client/SubscriptionManager.h" #include "qpid/sys/Monitor.h" #include <unistd.h> @@ -56,7 +56,7 @@ using namespace std; * back by the subscribers. */ class Publisher { - Session& session; + AsyncSession session; SubscriptionManager mgr; LocalQueue queue; const string controlTopic; @@ -66,7 +66,7 @@ class Publisher { string generateData(int size); public: - Publisher(Session& session, const string& controlTopic, bool tx, bool durable); + Publisher(const AsyncSession& session, const string& controlTopic, bool tx, bool durable); int64_t publish(int msgs, int listeners, int size); void terminate(); }; @@ -107,7 +107,7 @@ int main(int argc, char** argv) { else { Connection connection; args.open(connection); - Session session = connection.newSession(ASYNC); + AsyncSession session = connection.newSession(); if (args.transactional) { session.txSelect(); } @@ -150,7 +150,7 @@ int main(int argc, char** argv) { return 1; } -Publisher::Publisher(Session& _session, const string& _controlTopic, bool tx, bool d) : +Publisher::Publisher(const AsyncSession& _session, const string& _controlTopic, bool tx, bool d) : session(_session), mgr(session), controlTopic(_controlTopic), transactional(tx), durable(d) { mgr.subscribe(queue, "response"); diff --git a/cpp/src/tests/topictest b/cpp/src/tests/topictest index c36aa319ba..ad7c5df693 100755 --- a/cpp/src/tests/topictest +++ b/cpp/src/tests/topictest @@ -36,5 +36,5 @@ for ((i=$SUBSCRIBERS ; i--; )); do subscribe $i & done # FIXME aconway 2007-03-27: Hack around startup race. Fix topic test. -sleep 1 +sleep 2 publish 2>&1 || exit 1 diff --git a/cpp/src/tests/txtest.cpp b/cpp/src/tests/txtest.cpp index a8369df759..6eb812738d 100644 --- a/cpp/src/tests/txtest.cpp +++ b/cpp/src/tests/txtest.cpp @@ -28,7 +28,7 @@ #include "TestOptions.h" #include "qpid/client/Connection.h" #include "qpid/client/Message.h" -#include "qpid/client/Session.h" +#include "qpid/client/AsyncSession.h" #include "qpid/client/SubscriptionManager.h" using namespace qpid; @@ -96,12 +96,12 @@ Args opts; struct Client { Connection connection; - Session session; + AsyncSession session; Client() { opts.open(connection); - session = connection.newSession(ASYNC); + session = connection.newSession(); } ~Client() |