From a496df01eec8f7c9989c6ec9fe89ea864a6b9cf9 Mon Sep 17 00:00:00 2001 From: Kim van der Riet Date: Mon, 24 May 2010 15:48:18 +0000 Subject: Changed the names of tests which are installed in /usr/bin/ to be prefixed with "qpid-". This will make these generic names easier to associate with qpid. (BZ577353) git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@947678 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/src/tests/CMakeLists.txt | 56 +- qpid/cpp/src/tests/Makefile.am | 60 +- qpid/cpp/src/tests/client_test.cpp | 139 ---- qpid/cpp/src/tests/cluster_authentication_soak.cpp | 22 +- qpid/cpp/src/tests/cluster_tests.py | 2 +- qpid/cpp/src/tests/latencytest.cpp | 469 ------------- qpid/cpp/src/tests/perftest.cpp | 741 --------------------- qpid/cpp/src/tests/qpid-client-test.cpp | 139 ++++ qpid/cpp/src/tests/qpid-latency-test.cpp | 469 +++++++++++++ qpid/cpp/src/tests/qpid-perftest.cpp | 741 +++++++++++++++++++++ qpid/cpp/src/tests/qpid-topic-listener.cpp | 209 ++++++ qpid/cpp/src/tests/qpid-topic-publisher.cpp | 230 +++++++ qpid/cpp/src/tests/qpid-txtest.cpp | 340 ++++++++++ qpid/cpp/src/tests/quick_perftest | 2 +- qpid/cpp/src/tests/quick_txtest | 2 +- qpid/cpp/src/tests/run_perftest | 6 +- qpid/cpp/src/tests/ssl_test | 2 +- qpid/cpp/src/tests/topic_listener.cpp | 209 ------ qpid/cpp/src/tests/topic_publisher.cpp | 230 ------- qpid/cpp/src/tests/topictest | 4 +- qpid/cpp/src/tests/txtest.cpp | 340 ---------- 21 files changed, 2206 insertions(+), 2206 deletions(-) delete mode 100644 qpid/cpp/src/tests/client_test.cpp delete mode 100644 qpid/cpp/src/tests/latencytest.cpp delete mode 100644 qpid/cpp/src/tests/perftest.cpp create mode 100644 qpid/cpp/src/tests/qpid-client-test.cpp create mode 100644 qpid/cpp/src/tests/qpid-latency-test.cpp create mode 100644 qpid/cpp/src/tests/qpid-perftest.cpp create mode 100644 qpid/cpp/src/tests/qpid-topic-listener.cpp create mode 100644 qpid/cpp/src/tests/qpid-topic-publisher.cpp create mode 100644 qpid/cpp/src/tests/qpid-txtest.cpp delete mode 100644 qpid/cpp/src/tests/topic_listener.cpp delete mode 100644 qpid/cpp/src/tests/topic_publisher.cpp delete mode 100644 qpid/cpp/src/tests/txtest.cpp (limited to 'qpid/cpp/src') diff --git a/qpid/cpp/src/tests/CMakeLists.txt b/qpid/cpp/src/tests/CMakeLists.txt index c645815989..47714ac5fe 100644 --- a/qpid/cpp/src/tests/CMakeLists.txt +++ b/qpid/cpp/src/tests/CMakeLists.txt @@ -182,40 +182,40 @@ endif (BUILD_CLUSTER) # # Other test programs # -add_executable (perftest perftest.cpp ${platform_test_additions}) -target_link_libraries (perftest qpidclient) -#perftest_SOURCES=perftest.cpp test_tools.h TestOptions.h ConnectionOptions.h -remember_location(perftest) +add_executable (qpid-perftest qpid-perftest.cpp ${platform_test_additions}) +target_link_libraries (qpid-perftest qpidclient) +#qpid_perftest_SOURCES=qpid-perftest.cpp test_tools.h TestOptions.h ConnectionOptions.h +remember_location(qpid-perftest) -add_executable (txtest txtest.cpp ${platform_test_additions}) -target_link_libraries (txtest qpidclient) -#txtest_SOURCES=txtest.cpp TestOptions.h ConnectionOptions.h -remember_location(txtest) +add_executable (qpid-txtest qpid-txtest.cpp ${platform_test_additions}) +target_link_libraries (qpid-txtest qpidclient) +#qpid_txtest_SOURCES=qpid-txtest.cpp TestOptions.h ConnectionOptions.h +remember_location(qpid-txtest) -add_executable (latencytest latencytest.cpp ${platform_test_additions}) -target_link_libraries (latencytest qpidclient) -#latencytest_SOURCES=latencytest.cpp TestOptions.h ConnectionOptions.h -remember_location(latencytest) +add_executable (qpid-latency-test qpid-latency-test.cpp ${platform_test_additions}) +target_link_libraries (qpid-latency-test qpidclient) +#qpid_latencytest_SOURCES=qpid-latency-test.cpp TestOptions.h ConnectionOptions.h +remember_location(qpid-latency-test) add_executable (echotest echotest.cpp ${platform_test_additions}) target_link_libraries (echotest qpidclient) #echotest_SOURCES=echotest.cpp TestOptions.h ConnectionOptions.h remember_location(echotest) -add_executable (client_test client_test.cpp ${platform_test_additions}) -target_link_libraries (client_test qpidclient) -#client_test_SOURCES=client_test.cpp TestOptions.h ConnectionOptions.h -remember_location(client_test) +add_executable (qpid-client-test qpid-client-test.cpp ${platform_test_additions}) +target_link_libraries (qpid-client-test qpidclient) +#qpid_client_test_SOURCES=qpid-client-test.cpp TestOptions.h ConnectionOptions.h +remember_location(qpid-client-test) -add_executable (topic_listener topic_listener.cpp ${platform_test_additions}) -target_link_libraries (topic_listener qpidclient) -#topic_listener_SOURCES=topic_listener.cpp TestOptions.h ConnectionOptions.h -remember_location(topic_listener) +add_executable (qpid-topic-listener qpid-topic-listener.cpp ${platform_test_additions}) +target_link_libraries (qpid-topic-listener qpidclient) +#qpid_topic_listener_SOURCES=qpid-topic-listener.cpp TestOptions.h ConnectionOptions.h +remember_location(qpid-topic-listener) -add_executable (topic_publisher topic_publisher.cpp ${platform_test_additions}) -target_link_libraries (topic_publisher qpidclient) -#topic_publisher_SOURCES=topic_publisher.cpp TestOptions.h ConnectionOptions.h -remember_location(topic_publisher) +add_executable (qpid-topic-publisher qpid-topic-publisher.cpp ${platform_test_additions}) +target_link_libraries (qpid-topic-publisher qpidclient) +#qpid_topic_publisher_SOURCES=qpid-topic-publisher.cpp TestOptions.h ConnectionOptions.h +remember_location(qpid-topic-publisher) add_executable (publish publish.cpp ${platform_test_additions}) target_link_libraries (publish qpidclient) @@ -272,8 +272,8 @@ add_executable (qpid_send qpid_send.cpp Statistics.cpp ${platform_test_additions target_link_libraries (qpid_send qpidmessaging) remember_location(qpid_send) -# perftest and latencytest are generally useful so install them -install (TARGETS perftest latencytest RUNTIME +# qpid-perftest and qpid-latency-test are generally useful so install them +install (TARGETS qpid-perftest qpid-latency-test RUNTIME DESTINATION ${QPID_INSTALL_BINDIR}) if (CMAKE_SYSTEM_NAME STREQUAL Windows) @@ -286,8 +286,8 @@ set(test_wrap ${shell} ${CMAKE_CURRENT_SOURCE_DIR}/run_test${test_script_suffix} add_test (unit_test ${test_wrap} ${unit_test_LOCATION}) add_test (start_broker ${shell} ${CMAKE_CURRENT_SOURCE_DIR}/start_broker${test_script_suffix}) -add_test (client_test ${test_wrap} ${client_test_LOCATION}) -add_test (quick_perftest ${test_wrap} ${perftest_LOCATION} --summary --count 100) +add_test (qpid-client-test ${test_wrap} ${client_test_LOCATION}) +add_test (quick_perftest ${test_wrap} ${qpid-perftest_LOCATION} --summary --count 100) add_test (quick_topictest ${test_wrap} ${CMAKE_CURRENT_SOURCE_DIR}/quick_topictest${test_script_suffix}) add_test (quick_txtest ${test_wrap} ${txtest_LOCATION} --queues 4 --tx-count 10 --quiet) if (PYTHON_EXECUTABLE) diff --git a/qpid/cpp/src/tests/Makefile.am b/qpid/cpp/src/tests/Makefile.am index 235e6fed04..92e4e858f7 100644 --- a/qpid/cpp/src/tests/Makefile.am +++ b/qpid/cpp/src/tests/Makefile.am @@ -184,35 +184,35 @@ qpid_send_SOURCES = \ Statistics.cpp qpid_send_LDADD = $(lib_messaging) -qpidtest_PROGRAMS+=perftest -perftest_SOURCES=perftest.cpp test_tools.h TestOptions.h ConnectionOptions.h -perftest_INCLUDES=$(PUBLIC_INCLUDES) -perftest_LDADD=$(lib_client) - -qpidtest_PROGRAMS+=txtest -txtest_INCLUDES=$(PUBLIC_INCLUDES) -txtest_SOURCES=txtest.cpp TestOptions.h ConnectionOptions.h -txtest_LDADD=$(lib_client) - -qpidtest_PROGRAMS+=latencytest -latencytest_INCLUDES=$(PUBLIC_INCLUDES) -latencytest_SOURCES=latencytest.cpp TestOptions.h ConnectionOptions.h -latencytest_LDADD=$(lib_client) - -qpidtest_PROGRAMS+=client_test -client_test_INCLUDES=$(PUBLIC_INCLUDES) -client_test_SOURCES=client_test.cpp TestOptions.h ConnectionOptions.h -client_test_LDADD=$(lib_client) - -qpidtest_PROGRAMS+=topic_listener -topic_listener_INCLUDES=$(PUBLIC_INCLUDES) -topic_listener_SOURCES=topic_listener.cpp TestOptions.h ConnectionOptions.h -topic_listener_LDADD=$(lib_client) - -qpidtest_PROGRAMS+=topic_publisher -topic_publisher_INCLUDES=$(PUBLIC_INCLUDES) -topic_publisher_SOURCES=topic_publisher.cpp TestOptions.h ConnectionOptions.h -topic_publisher_LDADD=$(lib_client) +qpidtest_PROGRAMS+=qpid-perftest +qpid_perftest_SOURCES=qpid-perftest.cpp test_tools.h TestOptions.h ConnectionOptions.h +qpid_perftest_INCLUDES=$(PUBLIC_INCLUDES) +qpid_perftest_LDADD=$(lib_client) + +qpidtest_PROGRAMS+=qpid-txtest +qpid_txtest_INCLUDES=$(PUBLIC_INCLUDES) +qpid_txtest_SOURCES=qpid-txtest.cpp TestOptions.h ConnectionOptions.h +qpid_txtest_LDADD=$(lib_client) + +qpidtest_PROGRAMS+=qpid-latency-test +qpid_latency_test_INCLUDES=$(PUBLIC_INCLUDES) +qpid_latency_test_SOURCES=qpid-latency-test.cpp TestOptions.h ConnectionOptions.h +qpid_latency_test_LDADD=$(lib_client) + +qpidtest_PROGRAMS+=qpid-client-test +qpid_client_test_INCLUDES=$(PUBLIC_INCLUDES) +qpid_client_test_SOURCES=qpid-client-test.cpp TestOptions.h ConnectionOptions.h +qpid_client_test_LDADD=$(lib_client) + +qpidtest_PROGRAMS+=qpid-topic-listener +qpid_topic_listener_INCLUDES=$(PUBLIC_INCLUDES) +qpid_topic_listener_SOURCES=qpid-topic-listener.cpp TestOptions.h ConnectionOptions.h +qpid_topic_listener_LDADD=$(lib_client) + +qpidtest_PROGRAMS+=qpid-topic-publisher +qpid_topic_publisher_INCLUDES=$(PUBLIC_INCLUDES) +qpid_topic_publisher_SOURCES=qpid-topic-publisher.cpp TestOptions.h ConnectionOptions.h +qpid_topic_publisher_LDADD=$(lib_client) qpidtest_PROGRAMS+=qpid_ping qpid_ping_INCLUDES=$(PUBLIC_INCLUDES) @@ -313,7 +313,7 @@ TESTS_ENVIRONMENT = \ QPID_DATA_DIR= \ $(srcdir)/run_test -system_tests = client_test quick_perftest quick_topictest run_header_test quick_txtest +system_tests = qpid-client-test quick_perftest quick_topictest run_header_test quick_txtest TESTS += start_broker $(system_tests) python_tests stop_broker run_federation_tests run_acl_tests run_cli_tests replication_test EXTRA_DIST += \ diff --git a/qpid/cpp/src/tests/client_test.cpp b/qpid/cpp/src/tests/client_test.cpp deleted file mode 100644 index 2f5e8e5afe..0000000000 --- a/qpid/cpp/src/tests/client_test.cpp +++ /dev/null @@ -1,139 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -/** - * This file provides a simple test (and example) of basic - * functionality including declaring an exchange and a queue, binding - * these together, publishing a message and receiving that message - * asynchronously. - */ - -#include - -#include "TestOptions.h" -#include "qpid/client/Connection.h" -#include "qpid/client/Message.h" -#include "qpid/client/Session.h" -#include "qpid/client/SubscriptionManager.h" - - -using namespace qpid; -using namespace qpid::client; -using namespace qpid::framing; -using std::string; - -namespace qpid { -namespace tests { - -struct Args : public TestOptions { - uint msgSize; - bool verbose; - - Args() : TestOptions("Simple test of Qpid c++ client; sends and receives a single message."), msgSize(26) - { - addOptions() - ("size", optValue(msgSize, "N"), "message size") - ("verbose", optValue(verbose), "print out some status messages"); - } -}; - -const std::string chars("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"); - -std::string generateData(uint size) -{ - if (size < chars.length()) { - return chars.substr(0, size); - } - std::string data; - for (uint i = 0; i < (size / chars.length()); i++) { - data += chars; - } - data += chars.substr(0, size % chars.length()); - return data; -} - -void print(const std::string& text, const Message& msg) -{ - std::cout << text; - if (msg.getData().size() > 16) { - std::cout << msg.getData().substr(0, 16) << "..."; - } else { - std::cout << msg.getData(); - } - std::cout << std::endl; -} - -}} // namespace qpid::tests - -using namespace qpid::tests; - -int main(int argc, char** argv) -{ - try { - Args opts; - opts.parse(argc, argv); - - //Connect to the broker: - Connection connection; - opts.open(connection); - if (opts.verbose) std::cout << "Opened connection." << std::endl; - - //Create and open a session on the connection through which - //most functionality is exposed: - Session session = connection.newSession(); - if (opts.verbose) std::cout << "Opened session." << std::endl; - - - //'declare' the exchange and the queue, which will create them - //as they don't exist - session.exchangeDeclare(arg::exchange="MyExchange", arg::type="direct"); - if (opts.verbose) std::cout << "Declared exchange." << std::endl; - session.queueDeclare(arg::queue="MyQueue", arg::autoDelete=true, arg::exclusive=true); - if (opts.verbose) std::cout << "Declared queue." << std::endl; - - //now bind the queue to the exchange - session.exchangeBind(arg::exchange="MyExchange", arg::queue="MyQueue", arg::bindingKey="MyKey"); - if (opts.verbose) std::cout << "Bound queue to exchange." << std::endl; - - //create and send a message to the exchange using the routing - //key we bound our queue with: - Message msgOut(generateData(opts.msgSize)); - msgOut.getDeliveryProperties().setRoutingKey("MyKey"); - session.messageTransfer(arg::destination="MyExchange", arg::content=msgOut, arg::acceptMode=1); - if (opts.verbose) print("Published message: ", msgOut); - - // Using the SubscriptionManager, get the message from the queue. - SubscriptionManager subs(session); - Message msgIn = subs.get("MyQueue"); - if (msgIn.getData() == msgOut.getData()) - if (opts.verbose) std::cout << "Received the exepected message." << std::endl; - - //close the session & connection - session.close(); - if (opts.verbose) std::cout << "Closed session." << std::endl; - connection.close(); - if (opts.verbose) std::cout << "Closed connection." << std::endl; - return 0; - } catch(const std::exception& e) { - std::cout << e.what() << std::endl; - } - return 1; -} diff --git a/qpid/cpp/src/tests/cluster_authentication_soak.cpp b/qpid/cpp/src/tests/cluster_authentication_soak.cpp index 985c3aa52a..ccf4d278c0 100644 --- a/qpid/cpp/src/tests/cluster_authentication_soak.cpp +++ b/qpid/cpp/src/tests/cluster_authentication_soak.cpp @@ -104,10 +104,10 @@ runPerftest ( ) { stringstream portSs; portSs << newbiePort; - char const * path = "./perftest"; + char const * path = "./qpid-perftest"; vector argv; - argv.push_back ( "./perftest" ); + argv.push_back ( "./qpid-perftest" ); argv.push_back ( "-p" ); argv.push_back ( portSs.str().c_str() ); argv.push_back ( "--username" ); @@ -129,7 +129,7 @@ runPerftest ( ) { execv ( path, const_cast(&argv[0]) ); // The exec failed: we are still in parent process. - perror ( "error running perftest: " ); + perror ( "error running qpid-perftest: " ); return false; } else { @@ -146,19 +146,19 @@ runPerftest ( ) { if ( returned_pid == pid ) { int exit_status = WEXITSTATUS(status); if ( exit_status ) { - cerr << "Perftest failed. exit_status was: " << exit_status; + cerr << "qpid-perftest failed. exit_status was: " << exit_status; return false; } else { - return true; // perftest succeeded. + return true; // qpid-perftest succeeded. } } - else { // perftest has not yet completed. + else { // qpid-perftest has not yet completed. gettimeofday ( & currentTime, 0 ); timersub ( & currentTime, & startTime, & duration ); if ( duration.tv_sec > 60 ) { kill ( pid, 9 ); - cerr << "Perftest pid " << pid << " hanging: killed.\n"; + cerr << "qpid-perftest pid " << pid << " hanging: killed.\n"; return false; } } @@ -214,7 +214,7 @@ main ( int argc, char ** argv ) sleep ( 3 ); - /* Run all perftest iterations, and only then check for brokers + /* Run all qpid-perftest iterations, and only then check for brokers * still being up. If you just want a quick check for the failure * mode in which a single iteration would kill all brokers except * the client-connected one, just run it with the iterations arg @@ -222,14 +222,14 @@ main ( int argc, char ** argv ) */ for ( int iteration = 0; iteration < n_iterations; ++ iteration ) { if ( ! runPerftest ( ) ) { - cerr << "Perftest " << iteration << " failed.\n"; + cerr << "qpid-perftest " << iteration << " failed.\n"; return 1; } if ( ! ( iteration % 10 ) ) { - cerr << "perftest " << iteration << " complete. -------------- \n"; + cerr << "qpid-perftest " << iteration << " complete. -------------- \n"; } } - cerr << "\nperftest " << n_iterations << " iterations complete. -------------- \n\n"; + cerr << "\nqpid-perftest " << n_iterations << " iterations complete. -------------- \n\n"; if ( ! allBrokersAreAlive ( brokers ) ) { cerr << "not all brokers are alive.\n"; diff --git a/qpid/cpp/src/tests/cluster_tests.py b/qpid/cpp/src/tests/cluster_tests.py index 893fb9928d..f36cde9ecc 100755 --- a/qpid/cpp/src/tests/cluster_tests.py +++ b/qpid/cpp/src/tests/cluster_tests.py @@ -211,7 +211,7 @@ class LongTests(BrokerTest): """Start ordinary clients for a broker. Start one client per broker. Round-robin on a colllection of different clients.""" cmds=[ - ["perftest", "--count", 50000, + ["qpid-perftest", "--count", 50000, "--base-name", str(qpid.datatypes.uuid4()), "--port", broker.port()], ["qpid-queue-stats", "-a", "localhost:%s" %(broker.port())], ["testagent", "localhost", str(broker.port())] ] diff --git a/qpid/cpp/src/tests/latencytest.cpp b/qpid/cpp/src/tests/latencytest.cpp deleted file mode 100644 index 20eb4568f3..0000000000 --- a/qpid/cpp/src/tests/latencytest.cpp +++ /dev/null @@ -1,469 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - - -#include -#include -#include -#include -#include -#include - -#include "TestOptions.h" -#include "qpid/sys/Thread.h" -#include "qpid/client/Connection.h" -#include "qpid/client/Message.h" -#include "qpid/client/AsyncSession.h" -#include "qpid/client/SubscriptionManager.h" -#include "qpid/sys/Time.h" - -using namespace qpid; -using namespace qpid::client; -using namespace qpid::sys; -using std::string; - -namespace qpid { -namespace tests { - -typedef std::vector StringSet; - -struct Args : public qpid::TestOptions { - uint size; - uint count; - uint rate; - bool sync; - uint reportFrequency; - uint timeLimit; - uint concurrentConnections; - uint prefetch; - uint ack; - bool cumulative; - bool csv; - bool durable; - string base; - bool singleConnect; - - Args() : size(256), count(1000), rate(0), reportFrequency(1000), - timeLimit(0), concurrentConnections(1), - prefetch(100), ack(0), - durable(false), base("latency-test"), singleConnect(false) - - { - addOptions() - - ("size", optValue(size, "N"), "message size") - ("concurrentTests", optValue(concurrentConnections, "N"), "number of concurrent test setups, will create another publisher,\ - subcriber, queue, and connections") - ("single-connection", optValue(singleConnect, "yes|no"), "Use one connection for multiple sessions.") - ("count", optValue(count, "N"), "number of messages to send") - ("rate", optValue(rate, "N"), "target message rate (causes count to be ignored)") - ("sync", optValue(sync), "send messages synchronously") - ("report-frequency", optValue(reportFrequency, "N"), - "number of milliseconds to wait between reports (ignored unless rate specified)") - ("time-limit", optValue(timeLimit, "N"), - "test duration, in seconds") - ("prefetch", optValue(prefetch, "N"), "prefetch count (0 implies no flow control, and no acking)") - ("ack", optValue(ack, "N"), "Ack frequency in messages (defaults to half the prefetch value)") - ("durable", optValue(durable, "yes|no"), "use durable messages") - ("csv", optValue(csv), "print stats in csv format (rate,min,max,avg)") - ("cumulative", optValue(cumulative), "cumulative stats in csv format") - ("queue-base-name", optValue(base, ""), "base name for queues"); - } -}; - -const std::string chars("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"); - -Args opts; -double c_min, c_avg, c_max; -Connection globalConnection; - -uint64_t current_time() -{ - Duration t(EPOCH, now()); - return t; -} - -struct Stats -{ - Mutex lock; - uint count; - double minLatency; - double maxLatency; - double totalLatency; - - Stats(); - void update(double l); - void print(); - void reset(); -}; - -class Client : public Runnable -{ -protected: - Connection* connection; - Connection localConnection; - AsyncSession session; - Thread thread; - string queue; - -public: - Client(const string& q); - virtual ~Client(); - - void start(); - void join(); - void run(); - virtual void test() = 0; -}; - -class Receiver : public Client, public MessageListener -{ - SubscriptionManager mgr; - uint count; - Stats& stats; - -public: - Receiver(const string& queue, Stats& stats); - void test(); - void received(Message& msg); - Stats getStats(); - uint getCount() { return count; } - void stop() { mgr.stop(); mgr.cancel(queue); } -}; - - -class Sender : public Client -{ - string generateData(uint size); - void sendByRate(); - void sendByCount(); - Receiver& receiver; - const string data; - -public: - Sender(const string& queue, Receiver& receiver); - void test(); -}; - - -class Test -{ - const string queue; - Stats stats; - Receiver receiver; - Sender sender; - AbsTime begin; - -public: - Test(const string& q) : queue(q), receiver(queue, stats), sender(queue, receiver), begin(now()) {} - void start(); - void join(); - void report(); -}; - - -Client::Client(const string& q) : queue(q) -{ - if (opts.singleConnect){ - connection = &globalConnection; - if (!globalConnection.isOpen()) opts.open(globalConnection); - }else{ - connection = &localConnection; - opts.open(localConnection); - } - session = connection->newSession(); -} - -void Client::start() -{ - thread = Thread(this); -} - -void Client::join() -{ - thread.join(); -} - -void Client::run() -{ - try{ - test(); - } catch(const std::exception& e) { - std::cout << "Error in receiver: " << e.what() << std::endl; - } -} - -Client::~Client() -{ - try{ - session.close(); - connection->close(); - } catch(const std::exception& e) { - std::cout << "Error in receiver: " << e.what() << std::endl; - } -} - -Receiver::Receiver(const string& q, Stats& s) : Client(q), mgr(session), count(0), stats(s) -{ - session.queueDeclare(arg::queue=queue, arg::durable=opts.durable, arg::autoDelete=true); - uint msgCount = session.queueQuery(arg::queue=queue).get().getMessageCount(); - if (msgCount) { - std::cout << "Warning: found " << msgCount << " msgs on " << queue << ". Purging..." << std::endl; - session.queuePurge(arg::queue=queue); - session.sync(); - } - SubscriptionSettings settings; - if (opts.prefetch) { - settings.autoAck = (opts.ack ? opts.ack : (opts.prefetch / 2)); - settings.flowControl = FlowControl::messageWindow(opts.prefetch); - } else { - settings.acceptMode = ACCEPT_MODE_NONE; - settings.flowControl = FlowControl::unlimited(); - } - mgr.subscribe(*this, queue, settings); -} - -void Receiver::test() -{ - mgr.run(); - mgr.cancel(queue); -} - -void Receiver::received(Message& msg) -{ - ++count; - uint64_t receivedAt = current_time(); - uint64_t sentAt = msg.getDeliveryProperties().getTimestamp(); - - stats.update(((double) (receivedAt - sentAt)) / TIME_MSEC); - - if (!opts.rate && count >= opts.count) { - mgr.stop(); - } -} - -void Stats::update(double latency) -{ - Mutex::ScopedLock l(lock); - count++; - minLatency = std::min(minLatency, latency); - maxLatency = std::max(maxLatency, latency); - totalLatency += latency; -} - -Stats::Stats() : count(0), minLatency(std::numeric_limits::max()), maxLatency(0), totalLatency(0) {} - -void Stats::print() -{ - static bool already_have_stats = false; - uint value; - - if (opts.rate) - value = opts.rate; - else - value = opts.count; - Mutex::ScopedLock l(lock); - double aux_avg = (totalLatency / count); - if (!opts.cumulative) { - if (!opts.csv) { - if (count) { - std::cout << "Latency(ms): min=" << minLatency << ", max=" << - maxLatency << ", avg=" << aux_avg; - } else { - std::cout << "Stalled: no samples for interval"; - } - } else { - if (count) { - std::cout << value << "," << minLatency << "," << maxLatency << - "," << aux_avg; - } else { - std::cout << value << "," << minLatency << "," << maxLatency << - ", Stalled"; - } - } - } else { - if (count) { - if (already_have_stats) { - c_avg = (c_min + aux_avg) / 2; - if (c_min > minLatency) c_min = minLatency; - if (c_max < maxLatency) c_max = maxLatency; - } else { - c_avg = aux_avg; - c_min = minLatency; - c_max = maxLatency; - already_have_stats = true; - } - std::cout << value << "," << c_min << "," << c_max << - "," << c_avg; - } else { - std::cout << "Stalled: no samples for interval"; - } - } -} - -void Stats::reset() -{ - Mutex::ScopedLock l(lock); - count = 0; - totalLatency = maxLatency = 0; - minLatency = std::numeric_limits::max(); -} - -Sender::Sender(const string& q, Receiver& receiver) : Client(q), receiver(receiver), data(generateData(opts.size)) {} - -void Sender::test() -{ - if (opts.rate) sendByRate(); - else sendByCount(); -} - -void Sender::sendByCount() -{ - Message msg(data, queue); - if (opts.durable) { - msg.getDeliveryProperties().setDeliveryMode(framing::PERSISTENT); - } - - for (uint i = 0; i < opts.count; i++) { - uint64_t sentAt(current_time()); - msg.getDeliveryProperties().setTimestamp(sentAt); - async(session).messageTransfer(arg::content=msg, arg::acceptMode=1); - if (opts.sync) session.sync(); - } - session.sync(); -} - -void Sender::sendByRate() -{ - Message msg(data, queue); - if (opts.durable) { - msg.getDeliveryProperties().setDeliveryMode(framing::PERSISTENT); - } - uint64_t interval = TIME_SEC/opts.rate; - int64_t timeLimit = opts.timeLimit * TIME_SEC; - uint64_t sent = 0, missedRate = 0; - AbsTime start = now(); - while (true) { - AbsTime sentAt=now(); - msg.getDeliveryProperties().setTimestamp(Duration(EPOCH, sentAt)); - async(session).messageTransfer(arg::content=msg, arg::acceptMode=1); - if (opts.sync) session.sync(); - ++sent; - AbsTime waitTill(start, sent*interval); - Duration delay(sentAt, waitTill); - if (delay < 0) - ++missedRate; - else - sys::usleep(delay / TIME_USEC); - if (timeLimit != 0 && Duration(start, now()) > timeLimit) { - session.sync(); - receiver.stop(); - break; - } - } -} - -string Sender::generateData(uint size) -{ - if (size < chars.length()) { - return chars.substr(0, size); - } - std::string data; - for (uint i = 0; i < (size / chars.length()); i++) { - data += chars; - } - data += chars.substr(0, size % chars.length()); - return data; -} - - -void Test::start() -{ - receiver.start(); - begin = AbsTime(now()); - sender.start(); -} - -void Test::join() -{ - sender.join(); - receiver.join(); - AbsTime end = now(); - Duration time(begin, end); - double msecs(time / TIME_MSEC); - if (!opts.csv) { - std::cout << "Sent " << receiver.getCount() << " msgs through " << queue - << " in " << msecs << "ms (" << (receiver.getCount() * 1000 / msecs) << " msgs/s) "; - } - stats.print(); - std::cout << std::endl; -} - -void Test::report() -{ - stats.print(); - std::cout << std::endl; - stats.reset(); -} - -}} // namespace qpid::tests - -using namespace qpid::tests; - -int main(int argc, char** argv) -{ - try { - opts.parse(argc, argv); - if (opts.cumulative) - opts.csv = true; - - Connection localConnection; - AsyncSession session; - - boost::ptr_vector tests(opts.concurrentConnections); - for (uint i = 0; i < opts.concurrentConnections; i++) { - std::ostringstream out; - out << opts.base << "-" << (i+1); - tests.push_back(new Test(out.str())); - } - for (boost::ptr_vector::iterator i = tests.begin(); i != tests.end(); i++) { - i->start(); - } - if (opts.rate && !opts.timeLimit) { - while (true) { - qpid::sys::usleep(opts.reportFrequency * 1000); - //print latency report: - for (boost::ptr_vector::iterator i = tests.begin(); i != tests.end(); i++) { - i->report(); - } - } - } else { - for (boost::ptr_vector::iterator i = tests.begin(); i != tests.end(); i++) { - i->join(); - } - } - - return 0; - } catch(const std::exception& e) { - std::cout << e.what() << std::endl; - } - return 1; -} diff --git a/qpid/cpp/src/tests/perftest.cpp b/qpid/cpp/src/tests/perftest.cpp deleted file mode 100644 index 88d9fd15cb..0000000000 --- a/qpid/cpp/src/tests/perftest.cpp +++ /dev/null @@ -1,741 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "TestOptions.h" - -#include "qpid/client/AsyncSession.h" -#include "qpid/client/SubscriptionManager.h" -#include "qpid/client/Connection.h" -#include "qpid/client/Completion.h" -#include "qpid/client/Message.h" -#include "qpid/framing/FieldTable.h" -#include "qpid/sys/Time.h" -#include "qpid/sys/Thread.h" - -#include -#include -#include -#include - -#include -#include -#include -#include -#include - - -using namespace std; -using namespace qpid; -using namespace client; -using namespace sys; -using boost::lexical_cast; -using boost::bind; - -namespace qpid { -namespace tests { - -enum Mode { SHARED, FANOUT, TOPIC }; -const char* modeNames[] = { "shared", "fanout", "topic" }; - -// istream/ostream ops so Options can read/display Mode. -istream& operator>>(istream& in, Mode& mode) { - string s; - in >> s; - int i = find(modeNames, modeNames+3, s) - modeNames; - if (i >= 3) throw Exception("Invalid mode: "+s); - mode = Mode(i); - return in; -} - -ostream& operator<<(ostream& out, Mode mode) { - return out << modeNames[mode]; -} - - -struct Opts : public TestOptions { - - // Actions - bool setup, control, publish, subscribe; - - // Queue policy - uint32_t queueMaxCount; - uint64_t queueMaxSize; - std::string baseName; - bool queueDurable; - - // Publisher - size_t pubs; - size_t count ; - size_t size; - bool confirm; - bool durable; - bool uniqueData; - bool syncPub; - - // Subscriber - size_t subs; - size_t ack; - - // General - size_t qt; - bool singleConnect; - size_t iterations; - Mode mode; - bool summary; - uint32_t intervalSub; - uint32_t intervalPub; - size_t tx; - size_t txPub; - size_t txSub; - bool commitAsync; - - static const std::string helpText; - - Opts() : - TestOptions(helpText), - setup(false), control(false), publish(false), subscribe(false), baseName("perftest"), - pubs(1), count(500000), size(1024), confirm(true), durable(false), uniqueData(false), syncPub(false), - subs(1), ack(0), - qt(1),singleConnect(false), iterations(1), mode(SHARED), summary(false), - intervalSub(0), intervalPub(0), tx(0), txPub(0), txSub(0), commitAsync(false) - { - addOptions() - ("setup", optValue(setup), "Create shared queues.") - ("control", optValue(control), "Run test, print report.") - ("publish", optValue(publish), "Publish messages.") - ("subscribe", optValue(subscribe), "Subscribe for messages.") - - ("mode", optValue(mode, "shared|fanout|topic"), "Test mode." - "\nshared: --qt queues, --npubs publishers and --nsubs subscribers per queue.\n" - "\nfanout: --npubs publishers, --nsubs subscribers, fanout exchange." - "\ntopic: --qt topics, --npubs publishers and --nsubs subscribers per topic.\n") - - ("npubs", optValue(pubs, "N"), "Create N publishers.") - ("count", optValue(count, "N"), "Each publisher sends N messages.") - ("size", optValue(size, "BYTES"), "Size of messages in bytes.") - ("pub-confirm", optValue(confirm, "yes|no"), "Publisher use confirm-mode.") - ("durable", optValue(durable, "yes|no"), "Publish messages as durable.") - ("unique-data", optValue(uniqueData, "yes|no"), "Make data for each message unique.") - ("sync-publish", optValue(syncPub, "yes|no"), "Wait for confirmation of each message before sending the next one.") - - ("nsubs", optValue(subs, "N"), "Create N subscribers.") - ("sub-ack", optValue(ack, "N"), "N>0: Subscriber acks batches of N.\n" - "N==0: Subscriber uses unconfirmed mode") - - ("qt", optValue(qt, "N"), "Create N queues or topics.") - ("single-connection", optValue(singleConnect, "yes|no"), "Use one connection for multiple sessions.") - - ("iterations", optValue(iterations, "N"), "Desired number of iterations of the test.") - ("summary,s", optValue(summary), "Summary output: pubs/sec subs/sec transfers/sec Mbytes/sec") - - ("queue-max-count", optValue(queueMaxCount, "N"), "queue policy: count to trigger 'flow to disk'") - ("queue-max-size", optValue(queueMaxSize, "N"), "queue policy: accumulated size to trigger 'flow to disk'") - ("base-name", optValue(baseName, "NAME"), "base name used for queues or topics") - ("queue-durable", optValue(queueDurable, "N"), "Make queue durable (implied if durable set)") - - ("interval_sub", optValue(intervalSub, "ms"), ">=0 delay between msg consume") - ("interval_pub", optValue(intervalPub, "ms"), ">=0 delay between msg publish") - - ("tx", optValue(tx, "N"), "if non-zero, the transaction batch size for publishing and consuming") - ("pub-tx", optValue(txPub, "N"), "if non-zero, the transaction batch size for publishing") - ("async-commit", optValue(commitAsync, "yes|no"), "Don't wait for completion of commit") - ("sub-tx", optValue(txSub, "N"), "if non-zero, the transaction batch size for consuming"); - } - - // Computed values - size_t totalPubs; - size_t totalSubs; - size_t transfers; - size_t subQuota; - - void parse(int argc, char** argv) { - TestOptions::parse(argc, argv); - switch (mode) { - case SHARED: - if (count % subs) { - count += subs - (count % subs); - cout << "WARNING: Adjusted --count to " << count - << " the nearest multiple of --nsubs" << endl; - } - totalPubs = pubs*qt; - totalSubs = subs*qt; - subQuota = (pubs*count)/subs; - break; - case FANOUT: - if (qt != 1) cerr << "WARNING: Fanout mode, ignoring --qt=" - << qt << endl; - qt=1; - totalPubs = pubs; - totalSubs = subs; - subQuota = totalPubs*count; - break; - case TOPIC: - totalPubs = pubs*qt; - totalSubs = subs*qt; - subQuota = pubs*count; - break; - } - transfers=(totalPubs*count) + (totalSubs*subQuota); - if (tx) { - if (txPub) { - cerr << "WARNING: Using overriden tx value for publishers: " << txPub << std::endl; - } else { - txPub = tx; - } - if (txSub) { - cerr << "WARNING: Using overriden tx value for subscribers: " << txSub << std::endl; - } else { - txSub = tx; - } - } - } -}; - -const std::string Opts::helpText= -"There are two ways to use perftest: single process or multi-process.\n\n" -"If none of the --setup, --publish, --subscribe or --control options\n" -"are given perftest will run a single-process test.\n" -"For a multi-process test first run:\n" -" perftest --setup \n" -"and wait for it to complete. The remaining process should run concurrently::\n" -"Run --npubs times: perftest --publish \n" -"Run --nsubs times: perftest --subscribe \n" -"Run once: perftest --control \n" -"Note the must be identical for all processes.\n"; - -Opts opts; -Connection globalConnection; - -std::string fqn(const std::string& name) -{ - ostringstream fqn; - fqn << opts.baseName << "_" << name; - return fqn.str(); -} - -struct Client : public Runnable { - Connection* connection; - Connection localConnection; - AsyncSession session; - Thread thread; - - Client() { - if (opts.singleConnect){ - connection = &globalConnection; - if (!globalConnection.isOpen()) opts.open(globalConnection); - }else{ - connection = &localConnection; - opts.open(localConnection); - } - session = connection->newSession(); - } - - ~Client() { - try { - if (connection->isOpen()) { - session.close(); - connection->close(); - } - } catch (const std::exception& e) { - std::cerr << "Error in shutdown: " << e.what() << std::endl; - } - } -}; - -struct Setup : public Client { - - void queueInit(string name, bool durable=false, const framing::FieldTable& settings=framing::FieldTable()) { - session.queueDeclare(arg::queue=name, arg::durable=durable, arg::arguments=settings); - session.queuePurge(arg::queue=name); - session.sync(); - } - - void run() { - queueInit(fqn("pub_start")); - queueInit(fqn("pub_done")); - queueInit(fqn("sub_ready")); - queueInit(fqn("sub_done")); - if (opts.iterations > 1) queueInit(fqn("sub_iteration")); - if (opts.mode==SHARED) { - framing::FieldTable settings;//queue policy settings - settings.setInt("qpid.max_count", opts.queueMaxCount); - settings.setInt("qpid.max_size", opts.queueMaxSize); - for (size_t i = 0; i < opts.qt; ++i) { - ostringstream qname; - qname << opts.baseName << i; - queueInit(qname.str(), opts.durable || opts.queueDurable, settings); - } - } - } -}; - -void expect(string actual, string expect) { - if (expect != actual) - throw Exception("Expecting "+expect+" but received "+actual); - -} - -double secs(Duration d) { return double(d)/TIME_SEC; } -double secs(AbsTime start, AbsTime finish) { - return secs(Duration(start,finish)); -} - - -// Collect rates & print stats. -class Stats { - vector values; - double sum; - - public: - Stats() : sum(0) {} - - // Functor to collect rates. - void operator()(const string& data) { - try { - double d=lexical_cast(data); - values.push_back(d); - sum += d; - } catch (const std::exception&) { - throw Exception("Bad report: "+data); - } - } - - double mean() const { - return sum/values.size(); - } - - double stdev() const { - if (values.size() <= 1) return 0; - double avg = mean(); - double ssq = 0; - for (vector::const_iterator i = values.begin(); - i != values.end(); ++i) { - double x=*i; - x -= avg; - ssq += x*x; - } - return sqrt(ssq/(values.size()-1)); - } - - ostream& print(ostream& out) { - ostream_iterator o(out, "\n"); - copy(values.begin(), values.end(), o); - out << "Average: " << mean(); - if (values.size() > 1) - out << " (std.dev. " << stdev() << ")"; - return out << endl; - } -}; - - -// Manage control queues, collect and print reports. -struct Controller : public Client { - - SubscriptionManager subs; - - Controller() : subs(session) {} - - /** Process messages from queue by applying a functor. */ - void process(size_t n, string queue, - boost::function msgFn) - { - if (!opts.summary) - cout << "Processing " << n << " messages from " - << queue << " " << flush; - LocalQueue lq; - subs.setFlowControl(n, SubscriptionManager::UNLIMITED, false); - subs.subscribe(lq, queue); - for (size_t i = 0; i < n; ++i) { - if (!opts.summary) cout << "." << flush; - msgFn(lq.pop().getData()); - } - if (!opts.summary) cout << " done." << endl; - } - - void process(size_t n, LocalQueue lq, string queue, - boost::function msgFn) - { - session.messageFlow(queue, 0, n); - if (!opts.summary) - cout << "Processing " << n << " messages from " - << queue << " " << flush; - for (size_t i = 0; i < n; ++i) { - if (!opts.summary) cout << "." << flush; - msgFn(lq.pop().getData()); - } - if (!opts.summary) cout << " done." << endl; - } - - void send(size_t n, string queue, string data) { - if (!opts.summary) - cout << "Sending " << data << " " << n << " times to " << queue - << endl; - Message msg(data, queue); - for (size_t i = 0; i < n; ++i) - session.messageTransfer(arg::content=msg, arg::acceptMode=1); - } - - void run() { // Controller - try { - // Wait for subscribers to be ready. - process(opts.totalSubs, fqn("sub_ready"), bind(expect, _1, "ready")); - - LocalQueue pubDone; - LocalQueue subDone; - subs.setFlowControl(0, SubscriptionManager::UNLIMITED, false); - subs.subscribe(pubDone, fqn("pub_done")); - subs.subscribe(subDone, fqn("sub_done")); - - double txrateTotal(0); - double mbytesTotal(0); - double pubRateTotal(0); - double subRateTotal(0); - - for (size_t j = 0; j < opts.iterations; ++j) { - AbsTime start=now(); - send(opts.totalPubs, fqn("pub_start"), "start"); // Start publishers - if (j) { - send(opts.totalPubs, fqn("sub_iteration"), "next"); // Start subscribers on next iteration - } - - Stats pubRates; - Stats subRates; - - process(opts.totalPubs, pubDone, fqn("pub_done"), boost::ref(pubRates)); - process(opts.totalSubs, subDone, fqn("sub_done"), boost::ref(subRates)); - - AbsTime end=now(); - - double time=secs(start, end); - double txrate=opts.transfers/time; - double mbytes=(txrate*opts.size)/(1024*1024); - - if (!opts.summary) { - cout << endl << "Total " << opts.transfers << " transfers of " - << opts.size << " bytes in " - << time << " seconds." << endl; - cout << endl << "Publish transfers/sec: " << endl; - pubRates.print(cout); - cout << endl << "Subscribe transfers/sec: " << endl; - subRates.print(cout); - cout << endl - << "Total transfers/sec: " << txrate << endl - << "Total Mbytes/sec: " << mbytes << endl; - } - else { - cout << pubRates.mean() << "\t" - << subRates.mean() << "\t" - << txrate << "\t" - << mbytes << endl; - } - - txrateTotal += txrate; - mbytesTotal += mbytes; - pubRateTotal += pubRates.mean(); - subRateTotal += subRates.mean(); - } - if (opts.iterations > 1) { - cout << "Averages: "<< endl - << (pubRateTotal / opts.iterations) << "\t" - << (subRateTotal / opts.iterations) << "\t" - << (txrateTotal / opts.iterations) << "\t" - << (mbytesTotal / opts.iterations) << endl; - } - } - catch (const std::exception& e) { - cout << "Controller exception: " << e.what() << endl; - } - } -}; - - -struct PublishThread : public Client { - string destination; - string routingKey; - - PublishThread() {}; - - PublishThread(string key, string dest=string()) { - destination=dest; - routingKey=key; - } - - void run() { // Publisher - try { - string data; - size_t offset(0); - if (opts.uniqueData) { - offset = 5; - data += "data:";//marker (requested for latency testing tool scripts) - data += string(sizeof(size_t), 'X');//space for seq no - data += session.getId().str(); - if (opts.size > data.size()) { - data += string(opts.size - data.size(), 'X'); - } else if(opts.size < data.size()) { - cout << "WARNING: Increased --size to " << data.size() - << " to honour --unique-data" << endl; - } - } else { - size_t msgSize=max(opts.size, sizeof(size_t)); - data = string(msgSize, 'X'); - } - - Message msg(data, routingKey); - if (opts.durable) - msg.getDeliveryProperties().setDeliveryMode(framing::PERSISTENT); - - - if (opts.txPub){ - session.txSelect(); - } - SubscriptionManager subs(session); - LocalQueue lq; - subs.setFlowControl(1, SubscriptionManager::UNLIMITED, true); - subs.subscribe(lq, fqn("pub_start")); - - for (size_t j = 0; j < opts.iterations; ++j) { - expect(lq.pop().getData(), "start"); - AbsTime start=now(); - for (size_t i=0; i(msg.getData()).replace(offset, sizeof(size_t), - reinterpret_cast(&i), sizeof(size_t)); - if (opts.syncPub) { - sync(session).messageTransfer( - arg::destination=destination, - arg::content=msg, - arg::acceptMode=1); - } else { - session.messageTransfer( - arg::destination=destination, - arg::content=msg, - arg::acceptMode=1); - } - if (opts.txPub && ((i+1) % opts.txPub == 0)){ - if (opts.commitAsync){ - session.txCommit(); - } else { - sync(session).txCommit(); - } - } - if (opts.intervalPub) - qpid::sys::usleep(opts.intervalPub*1000); - } - if (opts.confirm) session.sync(); - AbsTime end=now(); - double time=secs(start,end); - - // Send result to controller. - Message report(lexical_cast(opts.count/time), fqn("pub_done")); - session.messageTransfer(arg::content=report, arg::acceptMode=1); - if (opts.txPub){ - sync(session).txCommit(); - } - } - session.close(); - } - catch (const std::exception& e) { - cout << "PublishThread exception: " << e.what() << endl; - } - } -}; - -struct SubscribeThread : public Client { - - string queue; - - SubscribeThread() {} - - SubscribeThread(string q) { queue = q; } - - SubscribeThread(string key, string ex) { - queue=session.getId().str(); // Unique name. - session.queueDeclare(arg::queue=queue, - arg::exclusive=true, - arg::autoDelete=true, - arg::durable=opts.durable); - session.exchangeBind(arg::queue=queue, - arg::exchange=ex, - arg::bindingKey=key); - } - - void verify(bool cond, const char* test, uint32_t expect, uint32_t actual) { - if (!cond) { - Message error( - QPID_MSG("Sequence error: expected n" << test << expect << " but got " << actual), - "sub_done"); - session.messageTransfer(arg::content=error, arg::acceptMode=1); - throw Exception(error.getData()); - } - } - - void run() { // Subscribe - try { - if (opts.txSub) sync(session).txSelect(); - SubscriptionManager subs(session); - SubscriptionSettings settings; - settings.autoAck = opts.txSub ? opts.txSub : opts.ack; - settings.acceptMode = (opts.txSub || opts.ack ? ACCEPT_MODE_EXPLICIT : ACCEPT_MODE_NONE); - settings.flowControl = FlowControl::messageCredit(opts.subQuota); - LocalQueue lq; - Subscription subscription = subs.subscribe(lq, queue, settings); - // Notify controller we are ready. - session.messageTransfer(arg::content=Message("ready", fqn("sub_ready")), arg::acceptMode=1); - if (opts.txSub) { - if (opts.commitAsync) session.txCommit(); - else sync(session).txCommit(); - } - - LocalQueue iterationControl; - if (opts.iterations > 1) { - subs.subscribe(iterationControl, fqn("sub_iteration"), SubscriptionSettings(FlowControl::messageCredit(0))); - } - - for (size_t j = 0; j < opts.iterations; ++j) { - if (j > 0) { - //need to wait here until all subs are done - session.messageFlow(fqn("sub_iteration"), 0, 1); - iterationControl.pop(); - - //need to allocate some more credit for subscription - session.messageFlow(queue, 0, opts.subQuota); - } - Message msg; - AbsTime start=now(); - size_t expect=0; - for (size_t i = 0; i < opts.subQuota; ++i) { - msg=lq.pop(); - if (opts.txSub && ((i+1) % opts.txSub == 0)) { - if (opts.commitAsync) session.txCommit(); - else sync(session).txCommit(); - } - if (opts.intervalSub) - qpid::sys::usleep(opts.intervalSub*1000); - // TODO aconway 2007-11-23: check message order for. - // multiple publishers. Need an array of counters, - // one per publisher and a publisher ID in the - // message. Careful not to introduce a lot of overhead - // here, e.g. no std::map, std::string etc. - // - // For now verify order only for a single publisher. - size_t offset = opts.uniqueData ? 5 /*marker is 'data:'*/ : 0; - size_t n = *reinterpret_cast(msg.getData().data() + offset); - if (opts.pubs == 1) { - if (opts.subs == 1 || opts.mode == FANOUT) verify(n==expect, "==", expect, n); - else verify(n>=expect, ">=", expect, n); - expect = n+1; - } - } - if (opts.txSub || opts.ack) - subscription.accept(subscription.getUnaccepted()); - if (opts.txSub) { - if (opts.commitAsync) session.txCommit(); - else sync(session).txCommit(); - } - AbsTime end=now(); - - // Report to publisher. - Message result(lexical_cast(opts.subQuota/secs(start,end)), - fqn("sub_done")); - session.messageTransfer(arg::content=result, arg::acceptMode=1); - if (opts.txSub) sync(session).txCommit(); - } - session.close(); - } - catch (const std::exception& e) { - cout << "SubscribeThread exception: " << e.what() << endl; - } - } -}; - -}} // namespace qpid::tests - -using namespace qpid::tests; - -int main(int argc, char** argv) { - int exitCode = 0; - boost::ptr_vector subs(opts.subs); - boost::ptr_vector pubs(opts.pubs); - - try { - opts.parse(argc, argv); - - string exchange; - switch (opts.mode) { - case FANOUT: exchange="amq.fanout"; break; - case TOPIC: exchange="amq.topic"; break; - case SHARED: break; - } - - bool singleProcess= - (!opts.setup && !opts.control && !opts.publish && !opts.subscribe); - if (singleProcess) - opts.setup = opts.control = opts.publish = opts.subscribe = true; - - if (opts.setup) Setup().run(); // Set up queues - - // Start pubs/subs for each queue/topic. - for (size_t i = 0; i < opts.qt; ++i) { - ostringstream key; - key << opts.baseName << i; // Queue or topic name. - if (opts.publish) { - size_t n = singleProcess ? opts.pubs : 1; - for (size_t j = 0; j < n; ++j) { - pubs.push_back(new PublishThread(key.str(), exchange)); - pubs.back().thread=Thread(pubs.back()); - } - } - if (opts.subscribe) { - size_t n = singleProcess ? opts.subs : 1; - for (size_t j = 0; j < n; ++j) { - if (opts.mode==SHARED) - subs.push_back(new SubscribeThread(key.str())); - else - subs.push_back(new SubscribeThread(key.str(),exchange)); - subs.back().thread=Thread(subs.back()); - } - } - } - - if (opts.control) Controller().run(); - } - catch (const std::exception& e) { - cout << endl << e.what() << endl; - exitCode = 1; - } - - // Wait for started threads. - if (opts.publish) { - for (boost::ptr_vector::iterator i=pubs.begin(); - i != pubs.end(); - ++i) - i->thread.join(); - } - - if (opts.subscribe) { - for (boost::ptr_vector::iterator i=subs.begin(); - i != subs.end(); - ++i) - i->thread.join(); - } - return exitCode; -} diff --git a/qpid/cpp/src/tests/qpid-client-test.cpp b/qpid/cpp/src/tests/qpid-client-test.cpp new file mode 100644 index 0000000000..2f5e8e5afe --- /dev/null +++ b/qpid/cpp/src/tests/qpid-client-test.cpp @@ -0,0 +1,139 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +/** + * This file provides a simple test (and example) of basic + * functionality including declaring an exchange and a queue, binding + * these together, publishing a message and receiving that message + * asynchronously. + */ + +#include + +#include "TestOptions.h" +#include "qpid/client/Connection.h" +#include "qpid/client/Message.h" +#include "qpid/client/Session.h" +#include "qpid/client/SubscriptionManager.h" + + +using namespace qpid; +using namespace qpid::client; +using namespace qpid::framing; +using std::string; + +namespace qpid { +namespace tests { + +struct Args : public TestOptions { + uint msgSize; + bool verbose; + + Args() : TestOptions("Simple test of Qpid c++ client; sends and receives a single message."), msgSize(26) + { + addOptions() + ("size", optValue(msgSize, "N"), "message size") + ("verbose", optValue(verbose), "print out some status messages"); + } +}; + +const std::string chars("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"); + +std::string generateData(uint size) +{ + if (size < chars.length()) { + return chars.substr(0, size); + } + std::string data; + for (uint i = 0; i < (size / chars.length()); i++) { + data += chars; + } + data += chars.substr(0, size % chars.length()); + return data; +} + +void print(const std::string& text, const Message& msg) +{ + std::cout << text; + if (msg.getData().size() > 16) { + std::cout << msg.getData().substr(0, 16) << "..."; + } else { + std::cout << msg.getData(); + } + std::cout << std::endl; +} + +}} // namespace qpid::tests + +using namespace qpid::tests; + +int main(int argc, char** argv) +{ + try { + Args opts; + opts.parse(argc, argv); + + //Connect to the broker: + Connection connection; + opts.open(connection); + if (opts.verbose) std::cout << "Opened connection." << std::endl; + + //Create and open a session on the connection through which + //most functionality is exposed: + Session session = connection.newSession(); + if (opts.verbose) std::cout << "Opened session." << std::endl; + + + //'declare' the exchange and the queue, which will create them + //as they don't exist + session.exchangeDeclare(arg::exchange="MyExchange", arg::type="direct"); + if (opts.verbose) std::cout << "Declared exchange." << std::endl; + session.queueDeclare(arg::queue="MyQueue", arg::autoDelete=true, arg::exclusive=true); + if (opts.verbose) std::cout << "Declared queue." << std::endl; + + //now bind the queue to the exchange + session.exchangeBind(arg::exchange="MyExchange", arg::queue="MyQueue", arg::bindingKey="MyKey"); + if (opts.verbose) std::cout << "Bound queue to exchange." << std::endl; + + //create and send a message to the exchange using the routing + //key we bound our queue with: + Message msgOut(generateData(opts.msgSize)); + msgOut.getDeliveryProperties().setRoutingKey("MyKey"); + session.messageTransfer(arg::destination="MyExchange", arg::content=msgOut, arg::acceptMode=1); + if (opts.verbose) print("Published message: ", msgOut); + + // Using the SubscriptionManager, get the message from the queue. + SubscriptionManager subs(session); + Message msgIn = subs.get("MyQueue"); + if (msgIn.getData() == msgOut.getData()) + if (opts.verbose) std::cout << "Received the exepected message." << std::endl; + + //close the session & connection + session.close(); + if (opts.verbose) std::cout << "Closed session." << std::endl; + connection.close(); + if (opts.verbose) std::cout << "Closed connection." << std::endl; + return 0; + } catch(const std::exception& e) { + std::cout << e.what() << std::endl; + } + return 1; +} diff --git a/qpid/cpp/src/tests/qpid-latency-test.cpp b/qpid/cpp/src/tests/qpid-latency-test.cpp new file mode 100644 index 0000000000..20eb4568f3 --- /dev/null +++ b/qpid/cpp/src/tests/qpid-latency-test.cpp @@ -0,0 +1,469 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + + +#include +#include +#include +#include +#include +#include + +#include "TestOptions.h" +#include "qpid/sys/Thread.h" +#include "qpid/client/Connection.h" +#include "qpid/client/Message.h" +#include "qpid/client/AsyncSession.h" +#include "qpid/client/SubscriptionManager.h" +#include "qpid/sys/Time.h" + +using namespace qpid; +using namespace qpid::client; +using namespace qpid::sys; +using std::string; + +namespace qpid { +namespace tests { + +typedef std::vector StringSet; + +struct Args : public qpid::TestOptions { + uint size; + uint count; + uint rate; + bool sync; + uint reportFrequency; + uint timeLimit; + uint concurrentConnections; + uint prefetch; + uint ack; + bool cumulative; + bool csv; + bool durable; + string base; + bool singleConnect; + + Args() : size(256), count(1000), rate(0), reportFrequency(1000), + timeLimit(0), concurrentConnections(1), + prefetch(100), ack(0), + durable(false), base("latency-test"), singleConnect(false) + + { + addOptions() + + ("size", optValue(size, "N"), "message size") + ("concurrentTests", optValue(concurrentConnections, "N"), "number of concurrent test setups, will create another publisher,\ + subcriber, queue, and connections") + ("single-connection", optValue(singleConnect, "yes|no"), "Use one connection for multiple sessions.") + ("count", optValue(count, "N"), "number of messages to send") + ("rate", optValue(rate, "N"), "target message rate (causes count to be ignored)") + ("sync", optValue(sync), "send messages synchronously") + ("report-frequency", optValue(reportFrequency, "N"), + "number of milliseconds to wait between reports (ignored unless rate specified)") + ("time-limit", optValue(timeLimit, "N"), + "test duration, in seconds") + ("prefetch", optValue(prefetch, "N"), "prefetch count (0 implies no flow control, and no acking)") + ("ack", optValue(ack, "N"), "Ack frequency in messages (defaults to half the prefetch value)") + ("durable", optValue(durable, "yes|no"), "use durable messages") + ("csv", optValue(csv), "print stats in csv format (rate,min,max,avg)") + ("cumulative", optValue(cumulative), "cumulative stats in csv format") + ("queue-base-name", optValue(base, ""), "base name for queues"); + } +}; + +const std::string chars("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"); + +Args opts; +double c_min, c_avg, c_max; +Connection globalConnection; + +uint64_t current_time() +{ + Duration t(EPOCH, now()); + return t; +} + +struct Stats +{ + Mutex lock; + uint count; + double minLatency; + double maxLatency; + double totalLatency; + + Stats(); + void update(double l); + void print(); + void reset(); +}; + +class Client : public Runnable +{ +protected: + Connection* connection; + Connection localConnection; + AsyncSession session; + Thread thread; + string queue; + +public: + Client(const string& q); + virtual ~Client(); + + void start(); + void join(); + void run(); + virtual void test() = 0; +}; + +class Receiver : public Client, public MessageListener +{ + SubscriptionManager mgr; + uint count; + Stats& stats; + +public: + Receiver(const string& queue, Stats& stats); + void test(); + void received(Message& msg); + Stats getStats(); + uint getCount() { return count; } + void stop() { mgr.stop(); mgr.cancel(queue); } +}; + + +class Sender : public Client +{ + string generateData(uint size); + void sendByRate(); + void sendByCount(); + Receiver& receiver; + const string data; + +public: + Sender(const string& queue, Receiver& receiver); + void test(); +}; + + +class Test +{ + const string queue; + Stats stats; + Receiver receiver; + Sender sender; + AbsTime begin; + +public: + Test(const string& q) : queue(q), receiver(queue, stats), sender(queue, receiver), begin(now()) {} + void start(); + void join(); + void report(); +}; + + +Client::Client(const string& q) : queue(q) +{ + if (opts.singleConnect){ + connection = &globalConnection; + if (!globalConnection.isOpen()) opts.open(globalConnection); + }else{ + connection = &localConnection; + opts.open(localConnection); + } + session = connection->newSession(); +} + +void Client::start() +{ + thread = Thread(this); +} + +void Client::join() +{ + thread.join(); +} + +void Client::run() +{ + try{ + test(); + } catch(const std::exception& e) { + std::cout << "Error in receiver: " << e.what() << std::endl; + } +} + +Client::~Client() +{ + try{ + session.close(); + connection->close(); + } catch(const std::exception& e) { + std::cout << "Error in receiver: " << e.what() << std::endl; + } +} + +Receiver::Receiver(const string& q, Stats& s) : Client(q), mgr(session), count(0), stats(s) +{ + session.queueDeclare(arg::queue=queue, arg::durable=opts.durable, arg::autoDelete=true); + uint msgCount = session.queueQuery(arg::queue=queue).get().getMessageCount(); + if (msgCount) { + std::cout << "Warning: found " << msgCount << " msgs on " << queue << ". Purging..." << std::endl; + session.queuePurge(arg::queue=queue); + session.sync(); + } + SubscriptionSettings settings; + if (opts.prefetch) { + settings.autoAck = (opts.ack ? opts.ack : (opts.prefetch / 2)); + settings.flowControl = FlowControl::messageWindow(opts.prefetch); + } else { + settings.acceptMode = ACCEPT_MODE_NONE; + settings.flowControl = FlowControl::unlimited(); + } + mgr.subscribe(*this, queue, settings); +} + +void Receiver::test() +{ + mgr.run(); + mgr.cancel(queue); +} + +void Receiver::received(Message& msg) +{ + ++count; + uint64_t receivedAt = current_time(); + uint64_t sentAt = msg.getDeliveryProperties().getTimestamp(); + + stats.update(((double) (receivedAt - sentAt)) / TIME_MSEC); + + if (!opts.rate && count >= opts.count) { + mgr.stop(); + } +} + +void Stats::update(double latency) +{ + Mutex::ScopedLock l(lock); + count++; + minLatency = std::min(minLatency, latency); + maxLatency = std::max(maxLatency, latency); + totalLatency += latency; +} + +Stats::Stats() : count(0), minLatency(std::numeric_limits::max()), maxLatency(0), totalLatency(0) {} + +void Stats::print() +{ + static bool already_have_stats = false; + uint value; + + if (opts.rate) + value = opts.rate; + else + value = opts.count; + Mutex::ScopedLock l(lock); + double aux_avg = (totalLatency / count); + if (!opts.cumulative) { + if (!opts.csv) { + if (count) { + std::cout << "Latency(ms): min=" << minLatency << ", max=" << + maxLatency << ", avg=" << aux_avg; + } else { + std::cout << "Stalled: no samples for interval"; + } + } else { + if (count) { + std::cout << value << "," << minLatency << "," << maxLatency << + "," << aux_avg; + } else { + std::cout << value << "," << minLatency << "," << maxLatency << + ", Stalled"; + } + } + } else { + if (count) { + if (already_have_stats) { + c_avg = (c_min + aux_avg) / 2; + if (c_min > minLatency) c_min = minLatency; + if (c_max < maxLatency) c_max = maxLatency; + } else { + c_avg = aux_avg; + c_min = minLatency; + c_max = maxLatency; + already_have_stats = true; + } + std::cout << value << "," << c_min << "," << c_max << + "," << c_avg; + } else { + std::cout << "Stalled: no samples for interval"; + } + } +} + +void Stats::reset() +{ + Mutex::ScopedLock l(lock); + count = 0; + totalLatency = maxLatency = 0; + minLatency = std::numeric_limits::max(); +} + +Sender::Sender(const string& q, Receiver& receiver) : Client(q), receiver(receiver), data(generateData(opts.size)) {} + +void Sender::test() +{ + if (opts.rate) sendByRate(); + else sendByCount(); +} + +void Sender::sendByCount() +{ + Message msg(data, queue); + if (opts.durable) { + msg.getDeliveryProperties().setDeliveryMode(framing::PERSISTENT); + } + + for (uint i = 0; i < opts.count; i++) { + uint64_t sentAt(current_time()); + msg.getDeliveryProperties().setTimestamp(sentAt); + async(session).messageTransfer(arg::content=msg, arg::acceptMode=1); + if (opts.sync) session.sync(); + } + session.sync(); +} + +void Sender::sendByRate() +{ + Message msg(data, queue); + if (opts.durable) { + msg.getDeliveryProperties().setDeliveryMode(framing::PERSISTENT); + } + uint64_t interval = TIME_SEC/opts.rate; + int64_t timeLimit = opts.timeLimit * TIME_SEC; + uint64_t sent = 0, missedRate = 0; + AbsTime start = now(); + while (true) { + AbsTime sentAt=now(); + msg.getDeliveryProperties().setTimestamp(Duration(EPOCH, sentAt)); + async(session).messageTransfer(arg::content=msg, arg::acceptMode=1); + if (opts.sync) session.sync(); + ++sent; + AbsTime waitTill(start, sent*interval); + Duration delay(sentAt, waitTill); + if (delay < 0) + ++missedRate; + else + sys::usleep(delay / TIME_USEC); + if (timeLimit != 0 && Duration(start, now()) > timeLimit) { + session.sync(); + receiver.stop(); + break; + } + } +} + +string Sender::generateData(uint size) +{ + if (size < chars.length()) { + return chars.substr(0, size); + } + std::string data; + for (uint i = 0; i < (size / chars.length()); i++) { + data += chars; + } + data += chars.substr(0, size % chars.length()); + return data; +} + + +void Test::start() +{ + receiver.start(); + begin = AbsTime(now()); + sender.start(); +} + +void Test::join() +{ + sender.join(); + receiver.join(); + AbsTime end = now(); + Duration time(begin, end); + double msecs(time / TIME_MSEC); + if (!opts.csv) { + std::cout << "Sent " << receiver.getCount() << " msgs through " << queue + << " in " << msecs << "ms (" << (receiver.getCount() * 1000 / msecs) << " msgs/s) "; + } + stats.print(); + std::cout << std::endl; +} + +void Test::report() +{ + stats.print(); + std::cout << std::endl; + stats.reset(); +} + +}} // namespace qpid::tests + +using namespace qpid::tests; + +int main(int argc, char** argv) +{ + try { + opts.parse(argc, argv); + if (opts.cumulative) + opts.csv = true; + + Connection localConnection; + AsyncSession session; + + boost::ptr_vector tests(opts.concurrentConnections); + for (uint i = 0; i < opts.concurrentConnections; i++) { + std::ostringstream out; + out << opts.base << "-" << (i+1); + tests.push_back(new Test(out.str())); + } + for (boost::ptr_vector::iterator i = tests.begin(); i != tests.end(); i++) { + i->start(); + } + if (opts.rate && !opts.timeLimit) { + while (true) { + qpid::sys::usleep(opts.reportFrequency * 1000); + //print latency report: + for (boost::ptr_vector::iterator i = tests.begin(); i != tests.end(); i++) { + i->report(); + } + } + } else { + for (boost::ptr_vector::iterator i = tests.begin(); i != tests.end(); i++) { + i->join(); + } + } + + return 0; + } catch(const std::exception& e) { + std::cout << e.what() << std::endl; + } + return 1; +} diff --git a/qpid/cpp/src/tests/qpid-perftest.cpp b/qpid/cpp/src/tests/qpid-perftest.cpp new file mode 100644 index 0000000000..7058851e15 --- /dev/null +++ b/qpid/cpp/src/tests/qpid-perftest.cpp @@ -0,0 +1,741 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "TestOptions.h" + +#include "qpid/client/AsyncSession.h" +#include "qpid/client/SubscriptionManager.h" +#include "qpid/client/Connection.h" +#include "qpid/client/Completion.h" +#include "qpid/client/Message.h" +#include "qpid/framing/FieldTable.h" +#include "qpid/sys/Time.h" +#include "qpid/sys/Thread.h" + +#include +#include +#include +#include + +#include +#include +#include +#include +#include + + +using namespace std; +using namespace qpid; +using namespace client; +using namespace sys; +using boost::lexical_cast; +using boost::bind; + +namespace qpid { +namespace tests { + +enum Mode { SHARED, FANOUT, TOPIC }; +const char* modeNames[] = { "shared", "fanout", "topic" }; + +// istream/ostream ops so Options can read/display Mode. +istream& operator>>(istream& in, Mode& mode) { + string s; + in >> s; + int i = find(modeNames, modeNames+3, s) - modeNames; + if (i >= 3) throw Exception("Invalid mode: "+s); + mode = Mode(i); + return in; +} + +ostream& operator<<(ostream& out, Mode mode) { + return out << modeNames[mode]; +} + + +struct Opts : public TestOptions { + + // Actions + bool setup, control, publish, subscribe; + + // Queue policy + uint32_t queueMaxCount; + uint64_t queueMaxSize; + std::string baseName; + bool queueDurable; + + // Publisher + size_t pubs; + size_t count ; + size_t size; + bool confirm; + bool durable; + bool uniqueData; + bool syncPub; + + // Subscriber + size_t subs; + size_t ack; + + // General + size_t qt; + bool singleConnect; + size_t iterations; + Mode mode; + bool summary; + uint32_t intervalSub; + uint32_t intervalPub; + size_t tx; + size_t txPub; + size_t txSub; + bool commitAsync; + + static const std::string helpText; + + Opts() : + TestOptions(helpText), + setup(false), control(false), publish(false), subscribe(false), baseName("qpid-perftest"), + pubs(1), count(500000), size(1024), confirm(true), durable(false), uniqueData(false), syncPub(false), + subs(1), ack(0), + qt(1),singleConnect(false), iterations(1), mode(SHARED), summary(false), + intervalSub(0), intervalPub(0), tx(0), txPub(0), txSub(0), commitAsync(false) + { + addOptions() + ("setup", optValue(setup), "Create shared queues.") + ("control", optValue(control), "Run test, print report.") + ("publish", optValue(publish), "Publish messages.") + ("subscribe", optValue(subscribe), "Subscribe for messages.") + + ("mode", optValue(mode, "shared|fanout|topic"), "Test mode." + "\nshared: --qt queues, --npubs publishers and --nsubs subscribers per queue.\n" + "\nfanout: --npubs publishers, --nsubs subscribers, fanout exchange." + "\ntopic: --qt topics, --npubs publishers and --nsubs subscribers per topic.\n") + + ("npubs", optValue(pubs, "N"), "Create N publishers.") + ("count", optValue(count, "N"), "Each publisher sends N messages.") + ("size", optValue(size, "BYTES"), "Size of messages in bytes.") + ("pub-confirm", optValue(confirm, "yes|no"), "Publisher use confirm-mode.") + ("durable", optValue(durable, "yes|no"), "Publish messages as durable.") + ("unique-data", optValue(uniqueData, "yes|no"), "Make data for each message unique.") + ("sync-publish", optValue(syncPub, "yes|no"), "Wait for confirmation of each message before sending the next one.") + + ("nsubs", optValue(subs, "N"), "Create N subscribers.") + ("sub-ack", optValue(ack, "N"), "N>0: Subscriber acks batches of N.\n" + "N==0: Subscriber uses unconfirmed mode") + + ("qt", optValue(qt, "N"), "Create N queues or topics.") + ("single-connection", optValue(singleConnect, "yes|no"), "Use one connection for multiple sessions.") + + ("iterations", optValue(iterations, "N"), "Desired number of iterations of the test.") + ("summary,s", optValue(summary), "Summary output: pubs/sec subs/sec transfers/sec Mbytes/sec") + + ("queue-max-count", optValue(queueMaxCount, "N"), "queue policy: count to trigger 'flow to disk'") + ("queue-max-size", optValue(queueMaxSize, "N"), "queue policy: accumulated size to trigger 'flow to disk'") + ("base-name", optValue(baseName, "NAME"), "base name used for queues or topics") + ("queue-durable", optValue(queueDurable, "N"), "Make queue durable (implied if durable set)") + + ("interval_sub", optValue(intervalSub, "ms"), ">=0 delay between msg consume") + ("interval_pub", optValue(intervalPub, "ms"), ">=0 delay between msg publish") + + ("tx", optValue(tx, "N"), "if non-zero, the transaction batch size for publishing and consuming") + ("pub-tx", optValue(txPub, "N"), "if non-zero, the transaction batch size for publishing") + ("async-commit", optValue(commitAsync, "yes|no"), "Don't wait for completion of commit") + ("sub-tx", optValue(txSub, "N"), "if non-zero, the transaction batch size for consuming"); + } + + // Computed values + size_t totalPubs; + size_t totalSubs; + size_t transfers; + size_t subQuota; + + void parse(int argc, char** argv) { + TestOptions::parse(argc, argv); + switch (mode) { + case SHARED: + if (count % subs) { + count += subs - (count % subs); + cout << "WARNING: Adjusted --count to " << count + << " the nearest multiple of --nsubs" << endl; + } + totalPubs = pubs*qt; + totalSubs = subs*qt; + subQuota = (pubs*count)/subs; + break; + case FANOUT: + if (qt != 1) cerr << "WARNING: Fanout mode, ignoring --qt=" + << qt << endl; + qt=1; + totalPubs = pubs; + totalSubs = subs; + subQuota = totalPubs*count; + break; + case TOPIC: + totalPubs = pubs*qt; + totalSubs = subs*qt; + subQuota = pubs*count; + break; + } + transfers=(totalPubs*count) + (totalSubs*subQuota); + if (tx) { + if (txPub) { + cerr << "WARNING: Using overriden tx value for publishers: " << txPub << std::endl; + } else { + txPub = tx; + } + if (txSub) { + cerr << "WARNING: Using overriden tx value for subscribers: " << txSub << std::endl; + } else { + txSub = tx; + } + } + } +}; + +const std::string Opts::helpText= +"There are two ways to use qpid-perftest: single process or multi-process.\n\n" +"If none of the --setup, --publish, --subscribe or --control options\n" +"are given qpid-perftest will run a single-process test.\n" +"For a multi-process test first run:\n" +" qpid-perftest --setup \n" +"and wait for it to complete. The remaining process should run concurrently::\n" +"Run --npubs times: pqid-perftest --publish \n" +"Run --nsubs times: qpid-perftest --subscribe \n" +"Run once: qpid-perftest --control \n" +"Note the must be identical for all processes.\n"; + +Opts opts; +Connection globalConnection; + +std::string fqn(const std::string& name) +{ + ostringstream fqn; + fqn << opts.baseName << "_" << name; + return fqn.str(); +} + +struct Client : public Runnable { + Connection* connection; + Connection localConnection; + AsyncSession session; + Thread thread; + + Client() { + if (opts.singleConnect){ + connection = &globalConnection; + if (!globalConnection.isOpen()) opts.open(globalConnection); + }else{ + connection = &localConnection; + opts.open(localConnection); + } + session = connection->newSession(); + } + + ~Client() { + try { + if (connection->isOpen()) { + session.close(); + connection->close(); + } + } catch (const std::exception& e) { + std::cerr << "Error in shutdown: " << e.what() << std::endl; + } + } +}; + +struct Setup : public Client { + + void queueInit(string name, bool durable=false, const framing::FieldTable& settings=framing::FieldTable()) { + session.queueDeclare(arg::queue=name, arg::durable=durable, arg::arguments=settings); + session.queuePurge(arg::queue=name); + session.sync(); + } + + void run() { + queueInit(fqn("pub_start")); + queueInit(fqn("pub_done")); + queueInit(fqn("sub_ready")); + queueInit(fqn("sub_done")); + if (opts.iterations > 1) queueInit(fqn("sub_iteration")); + if (opts.mode==SHARED) { + framing::FieldTable settings;//queue policy settings + settings.setInt("qpid.max_count", opts.queueMaxCount); + settings.setInt("qpid.max_size", opts.queueMaxSize); + for (size_t i = 0; i < opts.qt; ++i) { + ostringstream qname; + qname << opts.baseName << i; + queueInit(qname.str(), opts.durable || opts.queueDurable, settings); + } + } + } +}; + +void expect(string actual, string expect) { + if (expect != actual) + throw Exception("Expecting "+expect+" but received "+actual); + +} + +double secs(Duration d) { return double(d)/TIME_SEC; } +double secs(AbsTime start, AbsTime finish) { + return secs(Duration(start,finish)); +} + + +// Collect rates & print stats. +class Stats { + vector values; + double sum; + + public: + Stats() : sum(0) {} + + // Functor to collect rates. + void operator()(const string& data) { + try { + double d=lexical_cast(data); + values.push_back(d); + sum += d; + } catch (const std::exception&) { + throw Exception("Bad report: "+data); + } + } + + double mean() const { + return sum/values.size(); + } + + double stdev() const { + if (values.size() <= 1) return 0; + double avg = mean(); + double ssq = 0; + for (vector::const_iterator i = values.begin(); + i != values.end(); ++i) { + double x=*i; + x -= avg; + ssq += x*x; + } + return sqrt(ssq/(values.size()-1)); + } + + ostream& print(ostream& out) { + ostream_iterator o(out, "\n"); + copy(values.begin(), values.end(), o); + out << "Average: " << mean(); + if (values.size() > 1) + out << " (std.dev. " << stdev() << ")"; + return out << endl; + } +}; + + +// Manage control queues, collect and print reports. +struct Controller : public Client { + + SubscriptionManager subs; + + Controller() : subs(session) {} + + /** Process messages from queue by applying a functor. */ + void process(size_t n, string queue, + boost::function msgFn) + { + if (!opts.summary) + cout << "Processing " << n << " messages from " + << queue << " " << flush; + LocalQueue lq; + subs.setFlowControl(n, SubscriptionManager::UNLIMITED, false); + subs.subscribe(lq, queue); + for (size_t i = 0; i < n; ++i) { + if (!opts.summary) cout << "." << flush; + msgFn(lq.pop().getData()); + } + if (!opts.summary) cout << " done." << endl; + } + + void process(size_t n, LocalQueue lq, string queue, + boost::function msgFn) + { + session.messageFlow(queue, 0, n); + if (!opts.summary) + cout << "Processing " << n << " messages from " + << queue << " " << flush; + for (size_t i = 0; i < n; ++i) { + if (!opts.summary) cout << "." << flush; + msgFn(lq.pop().getData()); + } + if (!opts.summary) cout << " done." << endl; + } + + void send(size_t n, string queue, string data) { + if (!opts.summary) + cout << "Sending " << data << " " << n << " times to " << queue + << endl; + Message msg(data, queue); + for (size_t i = 0; i < n; ++i) + session.messageTransfer(arg::content=msg, arg::acceptMode=1); + } + + void run() { // Controller + try { + // Wait for subscribers to be ready. + process(opts.totalSubs, fqn("sub_ready"), bind(expect, _1, "ready")); + + LocalQueue pubDone; + LocalQueue subDone; + subs.setFlowControl(0, SubscriptionManager::UNLIMITED, false); + subs.subscribe(pubDone, fqn("pub_done")); + subs.subscribe(subDone, fqn("sub_done")); + + double txrateTotal(0); + double mbytesTotal(0); + double pubRateTotal(0); + double subRateTotal(0); + + for (size_t j = 0; j < opts.iterations; ++j) { + AbsTime start=now(); + send(opts.totalPubs, fqn("pub_start"), "start"); // Start publishers + if (j) { + send(opts.totalPubs, fqn("sub_iteration"), "next"); // Start subscribers on next iteration + } + + Stats pubRates; + Stats subRates; + + process(opts.totalPubs, pubDone, fqn("pub_done"), boost::ref(pubRates)); + process(opts.totalSubs, subDone, fqn("sub_done"), boost::ref(subRates)); + + AbsTime end=now(); + + double time=secs(start, end); + double txrate=opts.transfers/time; + double mbytes=(txrate*opts.size)/(1024*1024); + + if (!opts.summary) { + cout << endl << "Total " << opts.transfers << " transfers of " + << opts.size << " bytes in " + << time << " seconds." << endl; + cout << endl << "Publish transfers/sec: " << endl; + pubRates.print(cout); + cout << endl << "Subscribe transfers/sec: " << endl; + subRates.print(cout); + cout << endl + << "Total transfers/sec: " << txrate << endl + << "Total Mbytes/sec: " << mbytes << endl; + } + else { + cout << pubRates.mean() << "\t" + << subRates.mean() << "\t" + << txrate << "\t" + << mbytes << endl; + } + + txrateTotal += txrate; + mbytesTotal += mbytes; + pubRateTotal += pubRates.mean(); + subRateTotal += subRates.mean(); + } + if (opts.iterations > 1) { + cout << "Averages: "<< endl + << (pubRateTotal / opts.iterations) << "\t" + << (subRateTotal / opts.iterations) << "\t" + << (txrateTotal / opts.iterations) << "\t" + << (mbytesTotal / opts.iterations) << endl; + } + } + catch (const std::exception& e) { + cout << "Controller exception: " << e.what() << endl; + } + } +}; + + +struct PublishThread : public Client { + string destination; + string routingKey; + + PublishThread() {}; + + PublishThread(string key, string dest=string()) { + destination=dest; + routingKey=key; + } + + void run() { // Publisher + try { + string data; + size_t offset(0); + if (opts.uniqueData) { + offset = 5; + data += "data:";//marker (requested for latency testing tool scripts) + data += string(sizeof(size_t), 'X');//space for seq no + data += session.getId().str(); + if (opts.size > data.size()) { + data += string(opts.size - data.size(), 'X'); + } else if(opts.size < data.size()) { + cout << "WARNING: Increased --size to " << data.size() + << " to honour --unique-data" << endl; + } + } else { + size_t msgSize=max(opts.size, sizeof(size_t)); + data = string(msgSize, 'X'); + } + + Message msg(data, routingKey); + if (opts.durable) + msg.getDeliveryProperties().setDeliveryMode(framing::PERSISTENT); + + + if (opts.txPub){ + session.txSelect(); + } + SubscriptionManager subs(session); + LocalQueue lq; + subs.setFlowControl(1, SubscriptionManager::UNLIMITED, true); + subs.subscribe(lq, fqn("pub_start")); + + for (size_t j = 0; j < opts.iterations; ++j) { + expect(lq.pop().getData(), "start"); + AbsTime start=now(); + for (size_t i=0; i(msg.getData()).replace(offset, sizeof(size_t), + reinterpret_cast(&i), sizeof(size_t)); + if (opts.syncPub) { + sync(session).messageTransfer( + arg::destination=destination, + arg::content=msg, + arg::acceptMode=1); + } else { + session.messageTransfer( + arg::destination=destination, + arg::content=msg, + arg::acceptMode=1); + } + if (opts.txPub && ((i+1) % opts.txPub == 0)){ + if (opts.commitAsync){ + session.txCommit(); + } else { + sync(session).txCommit(); + } + } + if (opts.intervalPub) + qpid::sys::usleep(opts.intervalPub*1000); + } + if (opts.confirm) session.sync(); + AbsTime end=now(); + double time=secs(start,end); + + // Send result to controller. + Message report(lexical_cast(opts.count/time), fqn("pub_done")); + session.messageTransfer(arg::content=report, arg::acceptMode=1); + if (opts.txPub){ + sync(session).txCommit(); + } + } + session.close(); + } + catch (const std::exception& e) { + cout << "PublishThread exception: " << e.what() << endl; + } + } +}; + +struct SubscribeThread : public Client { + + string queue; + + SubscribeThread() {} + + SubscribeThread(string q) { queue = q; } + + SubscribeThread(string key, string ex) { + queue=session.getId().str(); // Unique name. + session.queueDeclare(arg::queue=queue, + arg::exclusive=true, + arg::autoDelete=true, + arg::durable=opts.durable); + session.exchangeBind(arg::queue=queue, + arg::exchange=ex, + arg::bindingKey=key); + } + + void verify(bool cond, const char* test, uint32_t expect, uint32_t actual) { + if (!cond) { + Message error( + QPID_MSG("Sequence error: expected n" << test << expect << " but got " << actual), + "sub_done"); + session.messageTransfer(arg::content=error, arg::acceptMode=1); + throw Exception(error.getData()); + } + } + + void run() { // Subscribe + try { + if (opts.txSub) sync(session).txSelect(); + SubscriptionManager subs(session); + SubscriptionSettings settings; + settings.autoAck = opts.txSub ? opts.txSub : opts.ack; + settings.acceptMode = (opts.txSub || opts.ack ? ACCEPT_MODE_EXPLICIT : ACCEPT_MODE_NONE); + settings.flowControl = FlowControl::messageCredit(opts.subQuota); + LocalQueue lq; + Subscription subscription = subs.subscribe(lq, queue, settings); + // Notify controller we are ready. + session.messageTransfer(arg::content=Message("ready", fqn("sub_ready")), arg::acceptMode=1); + if (opts.txSub) { + if (opts.commitAsync) session.txCommit(); + else sync(session).txCommit(); + } + + LocalQueue iterationControl; + if (opts.iterations > 1) { + subs.subscribe(iterationControl, fqn("sub_iteration"), SubscriptionSettings(FlowControl::messageCredit(0))); + } + + for (size_t j = 0; j < opts.iterations; ++j) { + if (j > 0) { + //need to wait here until all subs are done + session.messageFlow(fqn("sub_iteration"), 0, 1); + iterationControl.pop(); + + //need to allocate some more credit for subscription + session.messageFlow(queue, 0, opts.subQuota); + } + Message msg; + AbsTime start=now(); + size_t expect=0; + for (size_t i = 0; i < opts.subQuota; ++i) { + msg=lq.pop(); + if (opts.txSub && ((i+1) % opts.txSub == 0)) { + if (opts.commitAsync) session.txCommit(); + else sync(session).txCommit(); + } + if (opts.intervalSub) + qpid::sys::usleep(opts.intervalSub*1000); + // TODO aconway 2007-11-23: check message order for. + // multiple publishers. Need an array of counters, + // one per publisher and a publisher ID in the + // message. Careful not to introduce a lot of overhead + // here, e.g. no std::map, std::string etc. + // + // For now verify order only for a single publisher. + size_t offset = opts.uniqueData ? 5 /*marker is 'data:'*/ : 0; + size_t n = *reinterpret_cast(msg.getData().data() + offset); + if (opts.pubs == 1) { + if (opts.subs == 1 || opts.mode == FANOUT) verify(n==expect, "==", expect, n); + else verify(n>=expect, ">=", expect, n); + expect = n+1; + } + } + if (opts.txSub || opts.ack) + subscription.accept(subscription.getUnaccepted()); + if (opts.txSub) { + if (opts.commitAsync) session.txCommit(); + else sync(session).txCommit(); + } + AbsTime end=now(); + + // Report to publisher. + Message result(lexical_cast(opts.subQuota/secs(start,end)), + fqn("sub_done")); + session.messageTransfer(arg::content=result, arg::acceptMode=1); + if (opts.txSub) sync(session).txCommit(); + } + session.close(); + } + catch (const std::exception& e) { + cout << "SubscribeThread exception: " << e.what() << endl; + } + } +}; + +}} // namespace qpid::tests + +using namespace qpid::tests; + +int main(int argc, char** argv) { + int exitCode = 0; + boost::ptr_vector subs(opts.subs); + boost::ptr_vector pubs(opts.pubs); + + try { + opts.parse(argc, argv); + + string exchange; + switch (opts.mode) { + case FANOUT: exchange="amq.fanout"; break; + case TOPIC: exchange="amq.topic"; break; + case SHARED: break; + } + + bool singleProcess= + (!opts.setup && !opts.control && !opts.publish && !opts.subscribe); + if (singleProcess) + opts.setup = opts.control = opts.publish = opts.subscribe = true; + + if (opts.setup) Setup().run(); // Set up queues + + // Start pubs/subs for each queue/topic. + for (size_t i = 0; i < opts.qt; ++i) { + ostringstream key; + key << opts.baseName << i; // Queue or topic name. + if (opts.publish) { + size_t n = singleProcess ? opts.pubs : 1; + for (size_t j = 0; j < n; ++j) { + pubs.push_back(new PublishThread(key.str(), exchange)); + pubs.back().thread=Thread(pubs.back()); + } + } + if (opts.subscribe) { + size_t n = singleProcess ? opts.subs : 1; + for (size_t j = 0; j < n; ++j) { + if (opts.mode==SHARED) + subs.push_back(new SubscribeThread(key.str())); + else + subs.push_back(new SubscribeThread(key.str(),exchange)); + subs.back().thread=Thread(subs.back()); + } + } + } + + if (opts.control) Controller().run(); + } + catch (const std::exception& e) { + cout << endl << e.what() << endl; + exitCode = 1; + } + + // Wait for started threads. + if (opts.publish) { + for (boost::ptr_vector::iterator i=pubs.begin(); + i != pubs.end(); + ++i) + i->thread.join(); + } + + if (opts.subscribe) { + for (boost::ptr_vector::iterator i=subs.begin(); + i != subs.end(); + ++i) + i->thread.join(); + } + return exitCode; +} diff --git a/qpid/cpp/src/tests/qpid-topic-listener.cpp b/qpid/cpp/src/tests/qpid-topic-listener.cpp new file mode 100644 index 0000000000..c42e76d760 --- /dev/null +++ b/qpid/cpp/src/tests/qpid-topic-listener.cpp @@ -0,0 +1,209 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +/** + * This file provides one half of a test and example of a pub-sub + * style of interaction. See qpid-topic-publisher.cpp for the other half, + * in which the logic for publishing is defined. + * + * This file contains the listener logic. A listener will subscribe to + * a logical 'topic'. It will count the number of messages it receives + * and the time elapsed between the first one and the last one. It + * recognises two types of 'special' message that tell it to (a) send + * a report containing this information, (b) shutdown (i.e. stop + * listening). + */ + +#include "TestOptions.h" +#include "qpid/client/Connection.h" +#include "qpid/client/MessageListener.h" +#include "qpid/client/Session.h" +#include "qpid/client/AsyncSession.h" +#include "qpid/client/SubscriptionManager.h" +#include "qpid/sys/SystemInfo.h" +#include "qpid/sys/Time.h" +#include "qpid/framing/FieldValue.h" +#include +#include + +using namespace qpid; +using namespace qpid::client; +using namespace qpid::sys; +using namespace qpid::framing; +using namespace std; + +namespace qpid { +namespace tests { + +/** + * A message listener implementation in which the runtime logic is + * defined. + */ +class Listener : public MessageListener{ + Session session; + SubscriptionManager& mgr; + const string responseQueue; + const bool transactional; + bool init; + int count; + AbsTime start; + + void shutdown(); + void report(); +public: + Listener(const Session& session, SubscriptionManager& mgr, const string& reponseQueue, bool tx); + virtual void received(Message& msg); + Subscription subscription; +}; + +/** + * A utility class for managing the options passed in. + */ +struct Args : public qpid::TestOptions { + int ack; + bool transactional; + bool durable; + int prefetch; + string statusqueue; + + Args() : ack(0), transactional(false), durable(false), prefetch(0) { + addOptions() + ("ack", optValue(ack, "MODE"), "Ack frequency in messages (defaults to half the prefetch value)") + ("transactional", optValue(transactional), "Use transactions") + ("durable", optValue(durable), "subscribers should use durable queues") + ("prefetch", optValue(prefetch, "N"), "prefetch count (0 implies no flow control, and no acking)") + ("status-queue", optValue(statusqueue, "QUEUE-NAME"), "Message queue to put status messages on"); + } +}; + +Listener::Listener(const Session& s, SubscriptionManager& m, const string& _responseq, bool tx) : + session(s), mgr(m), responseQueue(_responseq), transactional(tx), init(false), count(0){} + +void Listener::received(Message& message){ + if(!init){ + start = now(); + count = 0; + init = true; + cout << "Batch started." << endl; + } + string type = message.getHeaders().getAsString("TYPE"); + + if(string("TERMINATION_REQUEST") == type){ + shutdown(); + }else if(string("REPORT_REQUEST") == type){ + subscription.accept(subscription.getUnaccepted()); // Accept everything upto this point + cout <<"Batch ended, sending report." << endl; + //send a report: + report(); + init = false; + }else if (++count % 1000 == 0){ + cout <<"Received " << count << " messages." << endl; + } +} + +void Listener::shutdown(){ + mgr.stop(); +} + +void Listener::report(){ + AbsTime finish = now(); + Duration time(start, finish); + stringstream reportstr; + reportstr << "Received " << count << " messages in " + << time/TIME_MSEC << " ms."; + Message msg(reportstr.str(), responseQueue); + msg.getHeaders().setString("TYPE", "REPORT"); + session.messageTransfer(arg::destination="amq.direct", arg::content=msg, arg::acceptMode=1); + if(transactional){ + sync(session).txCommit(); + } +} + +}} // namespace qpid::tests + +using namespace qpid::tests; + +/** + * The main routine creates a Listener instance and sets it up to + * consume from a private queue bound to the exchange with the + * appropriate topic name. + */ +int main(int argc, char** argv){ + try{ + Args args; + args.parse(argc, argv); + if(args.help) + cout << args << endl; + else { + Connection connection; + args.open(connection); + AsyncSession session = connection.newSession(); + + //declare exchange, queue and bind them: + session.queueDeclare(arg::queue="response"); + std::string control = "control_" + session.getId().str(); + if (args.durable) { + session.queueDeclare(arg::queue=control, arg::durable=true); + } else { + session.queueDeclare(arg::queue=control, arg::exclusive=true, arg::autoDelete=true); + } + session.exchangeBind(arg::exchange="amq.topic", arg::queue=control, arg::bindingKey="topic_control"); + + //set up listener + SubscriptionManager mgr(session); + Listener listener(session, mgr, "response", args.transactional); + SubscriptionSettings settings; + if (args.prefetch) { + settings.autoAck = (args.ack ? args.ack : (args.prefetch / 2)); + settings.flowControl = FlowControl::messageCredit(args.prefetch); + } else { + settings.acceptMode = ACCEPT_MODE_NONE; + settings.flowControl = FlowControl::unlimited(); + } + listener.subscription = mgr.subscribe(listener, control, settings); + session.sync(); + + if( args.statusqueue.length() > 0 ) { + stringstream msg_str; + msg_str << "qpid-topic-listener: " << qpid::sys::SystemInfo::getProcessId(); + session.messageTransfer(arg::content=Message(msg_str.str(), args.statusqueue)); + cout << "Ready status put on queue '" << args.statusqueue << "'" << endl; + } + + if (args.transactional) { + session.txSelect(); + } + + cout << "qpid-topic-listener: listening..." << endl; + mgr.run(); + if (args.durable) { + session.queueDelete(arg::queue=control); + } + session.close(); + cout << "closing connection" << endl; + connection.close(); + } + return 0; + } catch (const std::exception& error) { + cout << "qpid-topic-listener: " << error.what() << endl; + } + return 1; +} diff --git a/qpid/cpp/src/tests/qpid-topic-publisher.cpp b/qpid/cpp/src/tests/qpid-topic-publisher.cpp new file mode 100644 index 0000000000..f9107b90d0 --- /dev/null +++ b/qpid/cpp/src/tests/qpid-topic-publisher.cpp @@ -0,0 +1,230 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +/** + * This file provides one half of a test and example of a pub-sub + * style of interaction. See qpid-topic-listener.cpp for the other half, in + * which the logic for subscribers is defined. + * + * This file contains the publisher logic. The publisher will send a + * number of messages to the exchange with the appropriate routing key + * for the logical 'topic'. Once it has done this it will then send a + * request that each subscriber report back with the number of message + * it has received and the time that elapsed between receiving the + * first one and receiving the report request. Once the expected + * number of reports are received, it sends out a request that each + * subscriber shutdown. + */ + +#include "TestOptions.h" +#include "qpid/client/Connection.h" +#include "qpid/client/MessageListener.h" +#include "qpid/client/AsyncSession.h" +#include "qpid/client/SubscriptionManager.h" +#include "qpid/sys/Monitor.h" +#include "qpid/sys/Time.h" +#include +#include + +using namespace qpid; +using namespace qpid::client; +using namespace qpid::sys; +using namespace std; + +namespace qpid { +namespace tests { + +/** + * The publishing logic is defined in this class. It implements + * message listener and can therfore be used to receive messages sent + * back by the subscribers. + */ +class Publisher { + AsyncSession session; + SubscriptionManager mgr; + LocalQueue queue; + const string controlTopic; + const bool transactional; + const bool durable; + + string generateData(int size); + +public: + Publisher(const AsyncSession& session, const string& controlTopic, bool tx, bool durable); + int64_t publish(int msgs, int listeners, int size); + void terminate(); +}; + +/** + * A utility class for managing the options passed in to the test + */ +struct Args : public TestOptions { + int messages; + int subscribers; + bool transactional; + bool durable; + int batches; + int delay; + int size; + string statusqueue; + + Args() : messages(1000), subscribers(1), + transactional(false), durable(false), + batches(1), delay(0), size(256) + { + addOptions() + ("messages", optValue(messages, "N"), "how many messages to send") + ("subscribers", optValue(subscribers, "N"), "how many subscribers to expect reports from") + ("transactional", optValue(transactional), "client should use transactions") + ("durable", optValue(durable), "messages should be durable") + ("batches", optValue(batches, "N"), "how many batches to run") + ("delay", optValue(delay, "SECONDS"), "Causes a delay between each batch") + ("size", optValue(size, "BYTES"), "size of the published messages") + ("status-queue", optValue(statusqueue, "QUEUE-NAME"), "Message queue to read status messages from"); + } +}; + +Publisher::Publisher(const AsyncSession& _session, const string& _controlTopic, bool tx, bool d) : + session(_session), mgr(session), controlTopic(_controlTopic), transactional(tx), durable(d) +{ + mgr.subscribe(queue, "response"); +} + +int64_t Publisher::publish(int msgs, int listeners, int size){ + Message msg(generateData(size), controlTopic); + if (durable) { + msg.getDeliveryProperties().setDeliveryMode(framing::PERSISTENT); + } + AbsTime start = now(); + + for(int i = 0; i < msgs; i++){ + session.messageTransfer(arg::content=msg, arg::destination="amq.topic", arg::acceptMode=1); + } + //send report request + Message reportRequest("", controlTopic); + reportRequest.getHeaders().setString("TYPE", "REPORT_REQUEST"); + session.messageTransfer(arg::content=reportRequest, arg::destination="amq.topic", arg::acceptMode=1); + if(transactional){ + sync(session).txCommit(); + } + //wait for a response from each listener (TODO, could log these) + for (int i = 0; i < listeners; i++) { + Message report = queue.pop(); + } + + if(transactional){ + sync(session).txCommit(); + } + + AbsTime finish = now(); + return Duration(start, finish); +} + +string Publisher::generateData(int size){ + string data; + for(int i = 0; i < size; i++){ + data += ('A' + (i / 26)); + } + return data; +} + +void Publisher::terminate(){ + //send termination request + Message terminationRequest("", controlTopic); + terminationRequest.getHeaders().setString("TYPE", "TERMINATION_REQUEST"); + session.messageTransfer(arg::content=terminationRequest, arg::destination="amq.topic", arg::acceptMode=1); + if(transactional){ + session.txCommit(); + } +} + +}} // namespace qpid::tests + +using namespace qpid::tests; + +int main(int argc, char** argv) { + try{ + Args args; + args.parse(argc, argv); + if(args.help) + cout << args << endl; + else { + Connection connection; + args.open(connection); + AsyncSession session = connection.newSession(); + + // If status-queue is defined, wait for all expected listeners to join in before we start + if( args.statusqueue.length() > 0 ) { + cout << "Waiting for " << args.subscribers << " listeners..." << endl; + SubscriptionManager statusSubs(session); + LocalQueue statusQ; + statusSubs.subscribe(statusQ, args.statusqueue); + for (int i = 0; i < args.subscribers; i++) { + Message m = statusQ.get(); + if( m.getData().find("topic_listener: ", 0) == 0 ) { + cout << "Listener " << (i+1) << " of " << args.subscribers + << " is ready (pid " << m.getData().substr(16, m.getData().length() - 16) + << ")" << endl; + } else { + throw Exception(QPID_MSG("Unexpected message received on status queue: " << m.getData())); + } + } + } + + if (args.transactional) { + session.txSelect(); + } + session.queueDeclare(arg::queue="response"); + session.exchangeBind(arg::exchange="amq.direct", arg::queue="response", arg::bindingKey="response"); + + Publisher publisher(session, "topic_control", args.transactional, args.durable); + + int batchSize(args.batches); + int64_t max(0); + int64_t min(0); + int64_t sum(0); + for(int i = 0; i < batchSize; i++){ + if(i > 0 && args.delay) qpid::sys::sleep(args.delay); + int64_t msecs = + publisher.publish(args.messages, + args.subscribers, + args.size) / TIME_MSEC; + if(!max || msecs > max) max = msecs; + if(!min || msecs < min) min = msecs; + sum += msecs; + cout << "Completed " << (i+1) << " of " << batchSize + << " in " << msecs << "ms" << endl; + } + publisher.terminate(); + int64_t avg = sum / batchSize; + if(batchSize > 1){ + cout << batchSize << " batches completed. avg=" << avg << + ", max=" << max << ", min=" << min << endl; + } + session.close(); + connection.close(); + } + return 0; + }catch(exception& error) { + cout << error.what() << endl; + } + return 1; +} diff --git a/qpid/cpp/src/tests/qpid-txtest.cpp b/qpid/cpp/src/tests/qpid-txtest.cpp new file mode 100644 index 0000000000..d0ba2f1245 --- /dev/null +++ b/qpid/cpp/src/tests/qpid-txtest.cpp @@ -0,0 +1,340 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include +#include +#include +#include +#include +#include + +#include "TestOptions.h" +#include "qpid/client/Connection.h" +#include "qpid/client/Message.h" +#include "qpid/client/AsyncSession.h" +#include "qpid/client/SubscriptionManager.h" +#include "qpid/framing/Array.h" +#include "qpid/framing/Buffer.h" +#include "qpid/framing/Uuid.h" +#include "qpid/sys/Thread.h" + +using namespace qpid; +using namespace qpid::client; +using namespace qpid::sys; +using std::string; + +namespace qpid { +namespace tests { + +typedef std::vector StringSet; + +struct Args : public qpid::TestOptions { + bool init, transfer, check;//actions + uint size; + bool durable; + uint queues; + string base; + uint msgsPerTx; + uint txCount; + uint totalMsgCount; + bool dtx; + bool quiet; + + Args() : init(true), transfer(true), check(true), + size(256), durable(true), queues(2), + base("tx-test"), msgsPerTx(1), txCount(1), totalMsgCount(10), + dtx(false), quiet(false) + { + addOptions() + + ("init", optValue(init, "yes|no"), "Declare queues and populate one with the initial set of messages.") + ("transfer", optValue(transfer, "yes|no"), "'Move' messages from one queue to another using transactions to ensure no message loss.") + ("check", optValue(check, "yes|no"), "Check that the initial messages are all still available.") + ("size", optValue(size, "N"), "message size") + ("durable", optValue(durable, "yes|no"), "use durable messages") + ("queues", optValue(queues, "N"), "number of queues") + ("queue-base-name", optValue(base, ""), "base name for queues") + ("messages-per-tx", optValue(msgsPerTx, "N"), "number of messages transferred per transaction") + ("tx-count", optValue(txCount, "N"), "number of transactions per 'agent'") + ("total-messages", optValue(totalMsgCount, "N"), "total number of messages in 'circulation'") + ("dtx", optValue(dtx, "yes|no"), "use distributed transactions") + ("quiet", optValue(quiet), "reduce output from test"); + } +}; + +const std::string chars("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"); + +std::string generateData(uint size) +{ + if (size < chars.length()) { + return chars.substr(0, size); + } + std::string data; + for (uint i = 0; i < (size / chars.length()); i++) { + data += chars; + } + data += chars.substr(0, size % chars.length()); + return data; +} + +void generateSet(const std::string& base, uint count, StringSet& collection) +{ + for (uint i = 0; i < count; i++) { + std::ostringstream out; + out << base << "-" << (i+1); + collection.push_back(out.str()); + } +} + +Args opts; + +struct Client +{ + Connection connection; + AsyncSession session; + + Client() + { + opts.open(connection); + session = connection.newSession(); + } + + ~Client() + { + try{ + session.close(); + connection.close(); + } catch(const std::exception& e) { + std::cout << e.what() << std::endl; + } + } +}; + +struct Transfer : public Client, public Runnable +{ + std::string src; + std::string dest; + Thread thread; + framing::Xid xid; + + Transfer(const std::string& to, const std::string& from) : src(to), dest(from), xid(0x4c414e47, "", from) {} + + void run() + { + try { + + if (opts.dtx) session.dtxSelect(); + else session.txSelect(); + SubscriptionManager subs(session); + + LocalQueue lq; + SubscriptionSettings settings(FlowControl::messageWindow(opts.msgsPerTx)); + settings.autoAck = 0; // Disabled + Subscription sub = subs.subscribe(lq, src, settings); + + for (uint t = 0; t < opts.txCount; t++) { + Message in; + Message out("", dest); + if (opts.dtx) { + setNewXid(xid); + session.dtxStart(arg::xid=xid); + } + for (uint m = 0; m < opts.msgsPerTx; m++) { + in = lq.pop(); + std::string& data = in.getData(); + if (data.size() != opts.size) { + std::ostringstream oss; + oss << "Message size incorrect: size=" << in.getData().size() << "; expected " << opts.size; + throw std::runtime_error(oss.str()); + } + out.setData(data); + out.getMessageProperties().setCorrelationId(in.getMessageProperties().getCorrelationId()); + out.getDeliveryProperties().setDeliveryMode(in.getDeliveryProperties().getDeliveryMode()); + session.messageTransfer(arg::content=out, arg::acceptMode=1); + } + sub.accept(sub.getUnaccepted()); + if (opts.dtx) { + session.dtxEnd(arg::xid=xid); + session.dtxPrepare(arg::xid=xid); + session.dtxCommit(arg::xid=xid); + } else { + session.txCommit(); + } + } + } catch(const std::exception& e) { + std::cout << "Transfer interrupted: " << e.what() << std::endl; + } + } + + void setNewXid(framing::Xid& xid) { + framing::Uuid uuid(true); + xid.setGlobalId(uuid.str()); + } +}; + +struct Controller : public Client +{ + StringSet ids; + StringSet queues; + + Controller() + { + generateSet(opts.base, opts.queues, queues); + generateSet("msg", opts.totalMsgCount, ids); + } + + void init() + { + //declare queues + for (StringSet::iterator i = queues.begin(); i != queues.end(); i++) { + session.queueDeclare(arg::queue=*i, arg::durable=opts.durable); + session.sync(); + } + + Message msg(generateData(opts.size), *queues.begin()); + if (opts.durable) { + msg.getDeliveryProperties().setDeliveryMode(framing::PERSISTENT); + } + + //publish messages + for (StringSet::iterator i = ids.begin(); i != ids.end(); i++) { + msg.getMessageProperties().setCorrelationId(*i); + session.messageTransfer(arg::content=msg, arg::acceptMode=1); + } + } + + void transfer() + { + boost::ptr_vector agents(opts.queues); + //launch transfer agents + for (StringSet::iterator i = queues.begin(); i != queues.end(); i++) { + StringSet::iterator next = i + 1; + if (next == queues.end()) next = queues.begin(); + + if (!opts.quiet) std::cout << "Transfering from " << *i << " to " << *next << std::endl; + agents.push_back(new Transfer(*i, *next)); + agents.back().thread = Thread(agents.back()); + } + + for (boost::ptr_vector::iterator i = agents.begin(); i != agents.end(); i++) { + i->thread.join(); + } + } + + int check() + { + SubscriptionManager subs(session); + + // Recover DTX transactions (if any) + if (opts.dtx) { + std::vector inDoubtXids; + framing::DtxRecoverResult dtxRes = session.dtxRecover().get(); + const framing::Array& xidArr = dtxRes.getInDoubt(); + xidArr.collect(inDoubtXids); + + if (inDoubtXids.size()) { + if (!opts.quiet) std::cout << "Recovering DTX in-doubt transaction(s):" << std::endl; + framing::StructHelper decoder; + framing::Xid xid; + // abort even, commit odd transactions + for (unsigned i = 0; i < inDoubtXids.size(); i++) { + decoder.decode(xid, inDoubtXids[i]); + if (!opts.quiet) std::cout << (i%2 ? " * aborting " : " * committing "); + xid.print(std::cout); + std::cout << std::endl; + if (i%2) { + session.dtxRollback(arg::xid=xid); + } else { + session.dtxCommit(arg::xid=xid); + } + } + } + } + + StringSet drained; + //drain each queue and verify the correct set of messages are available + for (StringSet::iterator i = queues.begin(); i != queues.end(); i++) { + //subscribe, allocate credit and flushn + LocalQueue lq; + SubscriptionSettings settings(FlowControl::unlimited(), ACCEPT_MODE_NONE); + subs.subscribe(lq, *i, settings); + session.messageFlush(arg::destination=*i); + session.sync(); + + uint count(0); + while (!lq.empty()) { + Message m = lq.pop(); + //add correlation ids of received messages to drained + drained.push_back(m.getMessageProperties().getCorrelationId()); + ++count; + } + if (!opts.quiet) std::cout << "Drained " << count << " messages from " << *i << std::endl; + } + + sort(ids.begin(), ids.end()); + sort(drained.begin(), drained.end()); + + //check that drained == ids + StringSet missing; + set_difference(ids.begin(), ids.end(), drained.begin(), drained.end(), back_inserter(missing)); + + StringSet extra; + set_difference(drained.begin(), drained.end(), ids.begin(), ids.end(), back_inserter(extra)); + + if (missing.empty() && extra.empty()) { + std::cout << "All expected messages were retrieved." << std::endl; + return 0; + } else { + if (!missing.empty()) { + std::cout << "The following ids were missing:" << std::endl; + for (StringSet::iterator i = missing.begin(); i != missing.end(); i++) { + std::cout << " '" << *i << "'" << std::endl; + } + } + if (!extra.empty()) { + std::cout << "The following extra ids were encountered:" << std::endl; + for (StringSet::iterator i = extra.begin(); i != extra.end(); i++) { + std::cout << " '" << *i << "'" << std::endl; + } + } + return 1; + } + } +}; + +}} // namespace qpid::tests + +using namespace qpid::tests; + +int main(int argc, char** argv) +{ + try { + opts.parse(argc, argv); + Controller controller; + if (opts.init) controller.init(); + if (opts.transfer) controller.transfer(); + if (opts.check) return controller.check(); + return 0; + } catch(const std::exception& e) { + std::cout << e.what() << std::endl; + } + return 2; +} diff --git a/qpid/cpp/src/tests/quick_perftest b/qpid/cpp/src/tests/quick_perftest index 4f7cf3cb54..362f9ee96a 100755 --- a/qpid/cpp/src/tests/quick_perftest +++ b/qpid/cpp/src/tests/quick_perftest @@ -19,4 +19,4 @@ # under the License. # -exec `dirname $0`/run_test ./perftest --summary --count 100 +exec `dirname $0`/run_test ./qpid-perftest --summary --count 100 diff --git a/qpid/cpp/src/tests/quick_txtest b/qpid/cpp/src/tests/quick_txtest index 938e3805d8..c872fcec12 100755 --- a/qpid/cpp/src/tests/quick_txtest +++ b/qpid/cpp/src/tests/quick_txtest @@ -19,4 +19,4 @@ # under the License. # -exec `dirname $0`/run_test ./txtest --queues 4 --tx-count 10 --quiet +exec `dirname $0`/run_test ./qpid-txtest --queues 4 --tx-count 10 --quiet diff --git a/qpid/cpp/src/tests/run_perftest b/qpid/cpp/src/tests/run_perftest index 1a9b934641..5ad7c1ff4f 100755 --- a/qpid/cpp/src/tests/run_perftest +++ b/qpid/cpp/src/tests/run_perftest @@ -19,10 +19,10 @@ # under the License. # -# Args: count [perftest options...] -# Run a perftest with count multiplied. +# Args: count [qpid-perftest options...] +# Run a qpid-perftest with count multiplied. # MULTIPLIER=3 COUNT=`expr $1 \* $MULTIPLIER` shift -exec `dirname $0`/run_test ./perftest --summary --count $COUNT "$@" +exec `dirname $0`/run_test ./qpid-perftest --summary --count $COUNT "$@" diff --git a/qpid/cpp/src/tests/ssl_test b/qpid/cpp/src/tests/ssl_test index 4863eb90c7..a03341ec5b 100755 --- a/qpid/cpp/src/tests/ssl_test +++ b/qpid/cpp/src/tests/ssl_test @@ -73,7 +73,7 @@ export QPID_SSL_CERT_DB=${CERT_DIR} export QPID_SSL_CERT_PASSWORD_FILE=${CERT_PW_FILE} ## Test connection via connection settings -./perftest --count ${COUNT} --port ${PORT} -P ssl -b $TEST_HOSTNAME --summary +./qpid-perftest --count ${COUNT} --port ${PORT} -P ssl -b $TEST_HOSTNAME --summary ## Test connection with a URL URL=amqp:ssl:$TEST_HOSTNAME:$PORT diff --git a/qpid/cpp/src/tests/topic_listener.cpp b/qpid/cpp/src/tests/topic_listener.cpp deleted file mode 100644 index aa8c19df99..0000000000 --- a/qpid/cpp/src/tests/topic_listener.cpp +++ /dev/null @@ -1,209 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -/** - * This file provides one half of a test and example of a pub-sub - * style of interaction. See topic_publisher.cpp for the other half, - * in which the logic for publishing is defined. - * - * This file contains the listener logic. A listener will subscribe to - * a logical 'topic'. It will count the number of messages it receives - * and the time elapsed between the first one and the last one. It - * recognises two types of 'special' message that tell it to (a) send - * a report containing this information, (b) shutdown (i.e. stop - * listening). - */ - -#include "TestOptions.h" -#include "qpid/client/Connection.h" -#include "qpid/client/MessageListener.h" -#include "qpid/client/Session.h" -#include "qpid/client/AsyncSession.h" -#include "qpid/client/SubscriptionManager.h" -#include "qpid/sys/SystemInfo.h" -#include "qpid/sys/Time.h" -#include "qpid/framing/FieldValue.h" -#include -#include - -using namespace qpid; -using namespace qpid::client; -using namespace qpid::sys; -using namespace qpid::framing; -using namespace std; - -namespace qpid { -namespace tests { - -/** - * A message listener implementation in which the runtime logic is - * defined. - */ -class Listener : public MessageListener{ - Session session; - SubscriptionManager& mgr; - const string responseQueue; - const bool transactional; - bool init; - int count; - AbsTime start; - - void shutdown(); - void report(); -public: - Listener(const Session& session, SubscriptionManager& mgr, const string& reponseQueue, bool tx); - virtual void received(Message& msg); - Subscription subscription; -}; - -/** - * A utility class for managing the options passed in. - */ -struct Args : public qpid::TestOptions { - int ack; - bool transactional; - bool durable; - int prefetch; - string statusqueue; - - Args() : ack(0), transactional(false), durable(false), prefetch(0) { - addOptions() - ("ack", optValue(ack, "MODE"), "Ack frequency in messages (defaults to half the prefetch value)") - ("transactional", optValue(transactional), "Use transactions") - ("durable", optValue(durable), "subscribers should use durable queues") - ("prefetch", optValue(prefetch, "N"), "prefetch count (0 implies no flow control, and no acking)") - ("status-queue", optValue(statusqueue, "QUEUE-NAME"), "Message queue to put status messages on"); - } -}; - -Listener::Listener(const Session& s, SubscriptionManager& m, const string& _responseq, bool tx) : - session(s), mgr(m), responseQueue(_responseq), transactional(tx), init(false), count(0){} - -void Listener::received(Message& message){ - if(!init){ - start = now(); - count = 0; - init = true; - cout << "Batch started." << endl; - } - string type = message.getHeaders().getAsString("TYPE"); - - if(string("TERMINATION_REQUEST") == type){ - shutdown(); - }else if(string("REPORT_REQUEST") == type){ - subscription.accept(subscription.getUnaccepted()); // Accept everything upto this point - cout <<"Batch ended, sending report." << endl; - //send a report: - report(); - init = false; - }else if (++count % 1000 == 0){ - cout <<"Received " << count << " messages." << endl; - } -} - -void Listener::shutdown(){ - mgr.stop(); -} - -void Listener::report(){ - AbsTime finish = now(); - Duration time(start, finish); - stringstream reportstr; - reportstr << "Received " << count << " messages in " - << time/TIME_MSEC << " ms."; - Message msg(reportstr.str(), responseQueue); - msg.getHeaders().setString("TYPE", "REPORT"); - session.messageTransfer(arg::destination="amq.direct", arg::content=msg, arg::acceptMode=1); - if(transactional){ - sync(session).txCommit(); - } -} - -}} // namespace qpid::tests - -using namespace qpid::tests; - -/** - * The main routine creates a Listener instance and sets it up to - * consume from a private queue bound to the exchange with the - * appropriate topic name. - */ -int main(int argc, char** argv){ - try{ - Args args; - args.parse(argc, argv); - if(args.help) - cout << args << endl; - else { - Connection connection; - args.open(connection); - AsyncSession session = connection.newSession(); - - //declare exchange, queue and bind them: - session.queueDeclare(arg::queue="response"); - std::string control = "control_" + session.getId().str(); - if (args.durable) { - session.queueDeclare(arg::queue=control, arg::durable=true); - } else { - session.queueDeclare(arg::queue=control, arg::exclusive=true, arg::autoDelete=true); - } - session.exchangeBind(arg::exchange="amq.topic", arg::queue=control, arg::bindingKey="topic_control"); - - //set up listener - SubscriptionManager mgr(session); - Listener listener(session, mgr, "response", args.transactional); - SubscriptionSettings settings; - if (args.prefetch) { - settings.autoAck = (args.ack ? args.ack : (args.prefetch / 2)); - settings.flowControl = FlowControl::messageCredit(args.prefetch); - } else { - settings.acceptMode = ACCEPT_MODE_NONE; - settings.flowControl = FlowControl::unlimited(); - } - listener.subscription = mgr.subscribe(listener, control, settings); - session.sync(); - - if( args.statusqueue.length() > 0 ) { - stringstream msg_str; - msg_str << "topic_listener: " << qpid::sys::SystemInfo::getProcessId(); - session.messageTransfer(arg::content=Message(msg_str.str(), args.statusqueue)); - cout << "Ready status put on queue '" << args.statusqueue << "'" << endl; - } - - if (args.transactional) { - session.txSelect(); - } - - cout << "topic_listener: listening..." << endl; - mgr.run(); - if (args.durable) { - session.queueDelete(arg::queue=control); - } - session.close(); - cout << "closing connection" << endl; - connection.close(); - } - return 0; - } catch (const std::exception& error) { - cout << "topic_listener: " << error.what() << endl; - } - return 1; -} diff --git a/qpid/cpp/src/tests/topic_publisher.cpp b/qpid/cpp/src/tests/topic_publisher.cpp deleted file mode 100644 index 3381132b1a..0000000000 --- a/qpid/cpp/src/tests/topic_publisher.cpp +++ /dev/null @@ -1,230 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -/** - * This file provides one half of a test and example of a pub-sub - * style of interaction. See topic_listener.cpp for the other half, in - * which the logic for subscribers is defined. - * - * This file contains the publisher logic. The publisher will send a - * number of messages to the exchange with the appropriate routing key - * for the logical 'topic'. Once it has done this it will then send a - * request that each subscriber report back with the number of message - * it has received and the time that elapsed between receiving the - * first one and receiving the report request. Once the expected - * number of reports are received, it sends out a request that each - * subscriber shutdown. - */ - -#include "TestOptions.h" -#include "qpid/client/Connection.h" -#include "qpid/client/MessageListener.h" -#include "qpid/client/AsyncSession.h" -#include "qpid/client/SubscriptionManager.h" -#include "qpid/sys/Monitor.h" -#include "qpid/sys/Time.h" -#include -#include - -using namespace qpid; -using namespace qpid::client; -using namespace qpid::sys; -using namespace std; - -namespace qpid { -namespace tests { - -/** - * The publishing logic is defined in this class. It implements - * message listener and can therfore be used to receive messages sent - * back by the subscribers. - */ -class Publisher { - AsyncSession session; - SubscriptionManager mgr; - LocalQueue queue; - const string controlTopic; - const bool transactional; - const bool durable; - - string generateData(int size); - -public: - Publisher(const AsyncSession& session, const string& controlTopic, bool tx, bool durable); - int64_t publish(int msgs, int listeners, int size); - void terminate(); -}; - -/** - * A utility class for managing the options passed in to the test - */ -struct Args : public TestOptions { - int messages; - int subscribers; - bool transactional; - bool durable; - int batches; - int delay; - int size; - string statusqueue; - - Args() : messages(1000), subscribers(1), - transactional(false), durable(false), - batches(1), delay(0), size(256) - { - addOptions() - ("messages", optValue(messages, "N"), "how many messages to send") - ("subscribers", optValue(subscribers, "N"), "how many subscribers to expect reports from") - ("transactional", optValue(transactional), "client should use transactions") - ("durable", optValue(durable), "messages should be durable") - ("batches", optValue(batches, "N"), "how many batches to run") - ("delay", optValue(delay, "SECONDS"), "Causes a delay between each batch") - ("size", optValue(size, "BYTES"), "size of the published messages") - ("status-queue", optValue(statusqueue, "QUEUE-NAME"), "Message queue to read status messages from"); - } -}; - -Publisher::Publisher(const AsyncSession& _session, const string& _controlTopic, bool tx, bool d) : - session(_session), mgr(session), controlTopic(_controlTopic), transactional(tx), durable(d) -{ - mgr.subscribe(queue, "response"); -} - -int64_t Publisher::publish(int msgs, int listeners, int size){ - Message msg(generateData(size), controlTopic); - if (durable) { - msg.getDeliveryProperties().setDeliveryMode(framing::PERSISTENT); - } - AbsTime start = now(); - - for(int i = 0; i < msgs; i++){ - session.messageTransfer(arg::content=msg, arg::destination="amq.topic", arg::acceptMode=1); - } - //send report request - Message reportRequest("", controlTopic); - reportRequest.getHeaders().setString("TYPE", "REPORT_REQUEST"); - session.messageTransfer(arg::content=reportRequest, arg::destination="amq.topic", arg::acceptMode=1); - if(transactional){ - sync(session).txCommit(); - } - //wait for a response from each listener (TODO, could log these) - for (int i = 0; i < listeners; i++) { - Message report = queue.pop(); - } - - if(transactional){ - sync(session).txCommit(); - } - - AbsTime finish = now(); - return Duration(start, finish); -} - -string Publisher::generateData(int size){ - string data; - for(int i = 0; i < size; i++){ - data += ('A' + (i / 26)); - } - return data; -} - -void Publisher::terminate(){ - //send termination request - Message terminationRequest("", controlTopic); - terminationRequest.getHeaders().setString("TYPE", "TERMINATION_REQUEST"); - session.messageTransfer(arg::content=terminationRequest, arg::destination="amq.topic", arg::acceptMode=1); - if(transactional){ - session.txCommit(); - } -} - -}} // namespace qpid::tests - -using namespace qpid::tests; - -int main(int argc, char** argv) { - try{ - Args args; - args.parse(argc, argv); - if(args.help) - cout << args << endl; - else { - Connection connection; - args.open(connection); - AsyncSession session = connection.newSession(); - - // If status-queue is defined, wait for all expected listeners to join in before we start - if( args.statusqueue.length() > 0 ) { - cout << "Waiting for " << args.subscribers << " listeners..." << endl; - SubscriptionManager statusSubs(session); - LocalQueue statusQ; - statusSubs.subscribe(statusQ, args.statusqueue); - for (int i = 0; i < args.subscribers; i++) { - Message m = statusQ.get(); - if( m.getData().find("topic_listener: ", 0) == 0 ) { - cout << "Listener " << (i+1) << " of " << args.subscribers - << " is ready (pid " << m.getData().substr(16, m.getData().length() - 16) - << ")" << endl; - } else { - throw Exception(QPID_MSG("Unexpected message received on status queue: " << m.getData())); - } - } - } - - if (args.transactional) { - session.txSelect(); - } - session.queueDeclare(arg::queue="response"); - session.exchangeBind(arg::exchange="amq.direct", arg::queue="response", arg::bindingKey="response"); - - Publisher publisher(session, "topic_control", args.transactional, args.durable); - - int batchSize(args.batches); - int64_t max(0); - int64_t min(0); - int64_t sum(0); - for(int i = 0; i < batchSize; i++){ - if(i > 0 && args.delay) qpid::sys::sleep(args.delay); - int64_t msecs = - publisher.publish(args.messages, - args.subscribers, - args.size) / TIME_MSEC; - if(!max || msecs > max) max = msecs; - if(!min || msecs < min) min = msecs; - sum += msecs; - cout << "Completed " << (i+1) << " of " << batchSize - << " in " << msecs << "ms" << endl; - } - publisher.terminate(); - int64_t avg = sum / batchSize; - if(batchSize > 1){ - cout << batchSize << " batches completed. avg=" << avg << - ", max=" << max << ", min=" << min << endl; - } - session.close(); - connection.close(); - } - return 0; - }catch(exception& error) { - cout << error.what() << endl; - } - return 1; -} diff --git a/qpid/cpp/src/tests/topictest b/qpid/cpp/src/tests/topictest index 8fd680ee35..257c24bd81 100755 --- a/qpid/cpp/src/tests/topictest +++ b/qpid/cpp/src/tests/topictest @@ -46,11 +46,11 @@ done subscribe() { echo Start subscriber $1 LOG="subscriber_$1.log" - ./topic_listener $TRANSACTIONAL > $LOG 2>&1 && rm -f $LOG + ./qpid-topic-listener $TRANSACTIONAL > $LOG 2>&1 && rm -f $LOG } publish() { - ./topic_publisher --messages $MESSAGES --batches $BATCHES --subscribers $SUBSCRIBERS $HOST $TRANSACTIONAL + ./qpid-topic-publisher --messages $MESSAGES --batches $BATCHES --subscribers $SUBSCRIBERS $HOST $TRANSACTIONAL } for ((i=$SUBSCRIBERS ; i--; )); do diff --git a/qpid/cpp/src/tests/txtest.cpp b/qpid/cpp/src/tests/txtest.cpp deleted file mode 100644 index d0ba2f1245..0000000000 --- a/qpid/cpp/src/tests/txtest.cpp +++ /dev/null @@ -1,340 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include -#include -#include -#include -#include -#include - -#include "TestOptions.h" -#include "qpid/client/Connection.h" -#include "qpid/client/Message.h" -#include "qpid/client/AsyncSession.h" -#include "qpid/client/SubscriptionManager.h" -#include "qpid/framing/Array.h" -#include "qpid/framing/Buffer.h" -#include "qpid/framing/Uuid.h" -#include "qpid/sys/Thread.h" - -using namespace qpid; -using namespace qpid::client; -using namespace qpid::sys; -using std::string; - -namespace qpid { -namespace tests { - -typedef std::vector StringSet; - -struct Args : public qpid::TestOptions { - bool init, transfer, check;//actions - uint size; - bool durable; - uint queues; - string base; - uint msgsPerTx; - uint txCount; - uint totalMsgCount; - bool dtx; - bool quiet; - - Args() : init(true), transfer(true), check(true), - size(256), durable(true), queues(2), - base("tx-test"), msgsPerTx(1), txCount(1), totalMsgCount(10), - dtx(false), quiet(false) - { - addOptions() - - ("init", optValue(init, "yes|no"), "Declare queues and populate one with the initial set of messages.") - ("transfer", optValue(transfer, "yes|no"), "'Move' messages from one queue to another using transactions to ensure no message loss.") - ("check", optValue(check, "yes|no"), "Check that the initial messages are all still available.") - ("size", optValue(size, "N"), "message size") - ("durable", optValue(durable, "yes|no"), "use durable messages") - ("queues", optValue(queues, "N"), "number of queues") - ("queue-base-name", optValue(base, ""), "base name for queues") - ("messages-per-tx", optValue(msgsPerTx, "N"), "number of messages transferred per transaction") - ("tx-count", optValue(txCount, "N"), "number of transactions per 'agent'") - ("total-messages", optValue(totalMsgCount, "N"), "total number of messages in 'circulation'") - ("dtx", optValue(dtx, "yes|no"), "use distributed transactions") - ("quiet", optValue(quiet), "reduce output from test"); - } -}; - -const std::string chars("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"); - -std::string generateData(uint size) -{ - if (size < chars.length()) { - return chars.substr(0, size); - } - std::string data; - for (uint i = 0; i < (size / chars.length()); i++) { - data += chars; - } - data += chars.substr(0, size % chars.length()); - return data; -} - -void generateSet(const std::string& base, uint count, StringSet& collection) -{ - for (uint i = 0; i < count; i++) { - std::ostringstream out; - out << base << "-" << (i+1); - collection.push_back(out.str()); - } -} - -Args opts; - -struct Client -{ - Connection connection; - AsyncSession session; - - Client() - { - opts.open(connection); - session = connection.newSession(); - } - - ~Client() - { - try{ - session.close(); - connection.close(); - } catch(const std::exception& e) { - std::cout << e.what() << std::endl; - } - } -}; - -struct Transfer : public Client, public Runnable -{ - std::string src; - std::string dest; - Thread thread; - framing::Xid xid; - - Transfer(const std::string& to, const std::string& from) : src(to), dest(from), xid(0x4c414e47, "", from) {} - - void run() - { - try { - - if (opts.dtx) session.dtxSelect(); - else session.txSelect(); - SubscriptionManager subs(session); - - LocalQueue lq; - SubscriptionSettings settings(FlowControl::messageWindow(opts.msgsPerTx)); - settings.autoAck = 0; // Disabled - Subscription sub = subs.subscribe(lq, src, settings); - - for (uint t = 0; t < opts.txCount; t++) { - Message in; - Message out("", dest); - if (opts.dtx) { - setNewXid(xid); - session.dtxStart(arg::xid=xid); - } - for (uint m = 0; m < opts.msgsPerTx; m++) { - in = lq.pop(); - std::string& data = in.getData(); - if (data.size() != opts.size) { - std::ostringstream oss; - oss << "Message size incorrect: size=" << in.getData().size() << "; expected " << opts.size; - throw std::runtime_error(oss.str()); - } - out.setData(data); - out.getMessageProperties().setCorrelationId(in.getMessageProperties().getCorrelationId()); - out.getDeliveryProperties().setDeliveryMode(in.getDeliveryProperties().getDeliveryMode()); - session.messageTransfer(arg::content=out, arg::acceptMode=1); - } - sub.accept(sub.getUnaccepted()); - if (opts.dtx) { - session.dtxEnd(arg::xid=xid); - session.dtxPrepare(arg::xid=xid); - session.dtxCommit(arg::xid=xid); - } else { - session.txCommit(); - } - } - } catch(const std::exception& e) { - std::cout << "Transfer interrupted: " << e.what() << std::endl; - } - } - - void setNewXid(framing::Xid& xid) { - framing::Uuid uuid(true); - xid.setGlobalId(uuid.str()); - } -}; - -struct Controller : public Client -{ - StringSet ids; - StringSet queues; - - Controller() - { - generateSet(opts.base, opts.queues, queues); - generateSet("msg", opts.totalMsgCount, ids); - } - - void init() - { - //declare queues - for (StringSet::iterator i = queues.begin(); i != queues.end(); i++) { - session.queueDeclare(arg::queue=*i, arg::durable=opts.durable); - session.sync(); - } - - Message msg(generateData(opts.size), *queues.begin()); - if (opts.durable) { - msg.getDeliveryProperties().setDeliveryMode(framing::PERSISTENT); - } - - //publish messages - for (StringSet::iterator i = ids.begin(); i != ids.end(); i++) { - msg.getMessageProperties().setCorrelationId(*i); - session.messageTransfer(arg::content=msg, arg::acceptMode=1); - } - } - - void transfer() - { - boost::ptr_vector agents(opts.queues); - //launch transfer agents - for (StringSet::iterator i = queues.begin(); i != queues.end(); i++) { - StringSet::iterator next = i + 1; - if (next == queues.end()) next = queues.begin(); - - if (!opts.quiet) std::cout << "Transfering from " << *i << " to " << *next << std::endl; - agents.push_back(new Transfer(*i, *next)); - agents.back().thread = Thread(agents.back()); - } - - for (boost::ptr_vector::iterator i = agents.begin(); i != agents.end(); i++) { - i->thread.join(); - } - } - - int check() - { - SubscriptionManager subs(session); - - // Recover DTX transactions (if any) - if (opts.dtx) { - std::vector inDoubtXids; - framing::DtxRecoverResult dtxRes = session.dtxRecover().get(); - const framing::Array& xidArr = dtxRes.getInDoubt(); - xidArr.collect(inDoubtXids); - - if (inDoubtXids.size()) { - if (!opts.quiet) std::cout << "Recovering DTX in-doubt transaction(s):" << std::endl; - framing::StructHelper decoder; - framing::Xid xid; - // abort even, commit odd transactions - for (unsigned i = 0; i < inDoubtXids.size(); i++) { - decoder.decode(xid, inDoubtXids[i]); - if (!opts.quiet) std::cout << (i%2 ? " * aborting " : " * committing "); - xid.print(std::cout); - std::cout << std::endl; - if (i%2) { - session.dtxRollback(arg::xid=xid); - } else { - session.dtxCommit(arg::xid=xid); - } - } - } - } - - StringSet drained; - //drain each queue and verify the correct set of messages are available - for (StringSet::iterator i = queues.begin(); i != queues.end(); i++) { - //subscribe, allocate credit and flushn - LocalQueue lq; - SubscriptionSettings settings(FlowControl::unlimited(), ACCEPT_MODE_NONE); - subs.subscribe(lq, *i, settings); - session.messageFlush(arg::destination=*i); - session.sync(); - - uint count(0); - while (!lq.empty()) { - Message m = lq.pop(); - //add correlation ids of received messages to drained - drained.push_back(m.getMessageProperties().getCorrelationId()); - ++count; - } - if (!opts.quiet) std::cout << "Drained " << count << " messages from " << *i << std::endl; - } - - sort(ids.begin(), ids.end()); - sort(drained.begin(), drained.end()); - - //check that drained == ids - StringSet missing; - set_difference(ids.begin(), ids.end(), drained.begin(), drained.end(), back_inserter(missing)); - - StringSet extra; - set_difference(drained.begin(), drained.end(), ids.begin(), ids.end(), back_inserter(extra)); - - if (missing.empty() && extra.empty()) { - std::cout << "All expected messages were retrieved." << std::endl; - return 0; - } else { - if (!missing.empty()) { - std::cout << "The following ids were missing:" << std::endl; - for (StringSet::iterator i = missing.begin(); i != missing.end(); i++) { - std::cout << " '" << *i << "'" << std::endl; - } - } - if (!extra.empty()) { - std::cout << "The following extra ids were encountered:" << std::endl; - for (StringSet::iterator i = extra.begin(); i != extra.end(); i++) { - std::cout << " '" << *i << "'" << std::endl; - } - } - return 1; - } - } -}; - -}} // namespace qpid::tests - -using namespace qpid::tests; - -int main(int argc, char** argv) -{ - try { - opts.parse(argc, argv); - Controller controller; - if (opts.init) controller.init(); - if (opts.transfer) controller.transfer(); - if (opts.check) return controller.check(); - return 0; - } catch(const std::exception& e) { - std::cout << e.what() << std::endl; - } - return 2; -} -- cgit v1.2.1