summaryrefslogtreecommitdiff
path: root/cpp/src/tests/topic_publisher.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/tests/topic_publisher.cpp')
-rw-r--r--cpp/src/tests/topic_publisher.cpp161
1 files changed, 46 insertions, 115 deletions
diff --git a/cpp/src/tests/topic_publisher.cpp b/cpp/src/tests/topic_publisher.cpp
index 9384053e68..f792540c09 100644
--- a/cpp/src/tests/topic_publisher.cpp
+++ b/cpp/src/tests/topic_publisher.cpp
@@ -34,6 +34,7 @@
* subscriber shutdown.
*/
+#include "TestOptions.h"
#include "qpid/QpidError.h"
#include "qpid/client/ClientChannel.h"
#include "qpid/client/Connection.h"
@@ -46,9 +47,10 @@
#include <cstdlib>
#include <iostream>
+using namespace qpid;
using namespace qpid::client;
using namespace qpid::sys;
-using std::string;
+using namespace std;
/**
* The publishing logic is defined in this class. It implements
@@ -57,7 +59,7 @@ using std::string;
*/
class Publisher : public MessageListener{
Channel* const channel;
- const std::string controlTopic;
+ const string controlTopic;
const bool transactional;
Monitor monitor;
int count;
@@ -66,7 +68,7 @@ class Publisher : public MessageListener{
string generateData(int size);
public:
- Publisher(Channel* channel, const std::string& controlTopic, bool tx);
+ Publisher(Channel* channel, const string& controlTopic, bool tx);
virtual void received(Message& msg);
int64_t publish(int msgs, int listeners, int size);
void terminate();
@@ -75,51 +77,42 @@ public:
/**
* A utility class for managing the options passed in to the test
*/
-class Args{
- string host;
- int port;
+struct Args : public TestOptions {
int messages;
int subscribers;
- AckMode ackMode;
+ int ackmode;
bool transactional;
int prefetch;
int batches;
int delay;
int size;
- bool trace;
- bool help;
-public:
- inline Args() : host("localhost"), port(5672), messages(1000), subscribers(1),
- ackMode(NO_ACK), transactional(false), prefetch(1000), batches(1),
- delay(0), size(256), trace(false), help(false){}
-
- void parse(int argc, char** argv);
- void usage();
- const string& getHost() const { return host;}
- int getPort() const { return port; }
- int getMessages() const { return messages; }
- int getSubscribers() const { return subscribers; }
- AckMode getAckMode(){ return ackMode; }
- bool getTransactional() const { return transactional; }
- int getPrefetch(){ return prefetch; }
- int getBatches(){ return batches; }
- int getDelay(){ return delay; }
- int getSize(){ return size; }
- bool getTrace() const { return trace; }
- bool getHelp() const { return help; }
+ Args() : messages(1000), subscribers(1),
+ ackmode(NO_ACK), transactional(false), prefetch(1000),
+ batches(1), delay(0), size(256)
+ {
+ addOptions()
+ ("messages", optValue(messages, "N"), "how many messages to send")
+ ("subscribers", optValue(subscribers, "N"), "how many subscribers to expect reports from")
+ ("ackmode", optValue(ackmode, "MODE"), "Acknowledgement mode:0=NO_ACK, 1=AUTO_ACK, 2=LAZY_ACK")
+ ("transactional", optValue(transactional), "client should use transactions")
+ ("prefetch", optValue(prefetch, "N"), "prefetch count")
+ ("batches", optValue(batches, "N"), "how many batches to run")
+ ("delay", optValue(delay, "SECONDS"), "Causes a delay between each batch")
+ ("size", optValue(size, "BYTES"), "size of the published messages");
+ }
};
int main(int argc, char** argv) {
- Args args;
- args.parse(argc, argv);
- if(args.getHelp()){
- args.usage();
- } else {
- try{
- Connection connection(args.getTrace());
- connection.open(args.getHost(), args.getPort(), "guest", "guest", "/test");
- Channel channel(args.getTransactional(), args.getPrefetch());
+ try{
+ Args args;
+ args.parse(argc, argv);
+ if(args.help)
+ cout << args << endl;
+ else {
+ Connection connection(args.trace);
+ connection.open(args.host, args.port, args.username, args.password, args.virtualhost);
+ Channel channel(args.transactional, args.prefetch);
connection.openChannel(channel);
//declare queue (relying on default binding):
@@ -127,44 +120,44 @@ int main(int argc, char** argv) {
channel.declareQueue(response);
//set up listener
- Publisher publisher(&channel, "topic_control", args.getTransactional());
- std::string tag("mytag");
- channel.consume(response, tag, &publisher, args.getAckMode());
+ Publisher publisher(&channel, "topic_control", args.transactional);
+ string tag("mytag");
+ channel.consume(response, tag, &publisher, AckMode(args.ackmode));
channel.start();
- int batchSize(args.getBatches());
+ int batchSize(args.batches);
int64_t max(0);
int64_t min(0);
int64_t sum(0);
for(int i = 0; i < batchSize; i++){
- if(i > 0 && args.getDelay()) sleep(args.getDelay());
+ if(i > 0 && args.delay) sleep(args.delay);
int64_t msecs =
- publisher.publish(args.getMessages(),
- args.getSubscribers(),
- args.getSize()) / TIME_MSEC;
+ publisher.publish(args.messages,
+ args.subscribers,
+ args.size) / TIME_MSEC;
if(!max || msecs > max) max = msecs;
if(!min || msecs < min) min = msecs;
sum += msecs;
- std::cout << "Completed " << (i+1) << " of " << batchSize
- << " in " << msecs << "ms" << std::endl;
+ cout << "Completed " << (i+1) << " of " << batchSize
+ << " in " << msecs << "ms" << endl;
}
publisher.terminate();
int64_t avg = sum / batchSize;
if(batchSize > 1){
- std::cout << batchSize << " batches completed. avg=" << avg <<
- ", max=" << max << ", min=" << min << std::endl;
+ cout << batchSize << " batches completed. avg=" << avg <<
+ ", max=" << max << ", min=" << min << endl;
}
channel.close();
connection.close();
- return 0;
- }catch(std::exception& error) {
- std::cout << error.what() << std::endl;
}
+ return 0;
+ }catch(exception& error) {
+ cout << error.what() << endl;
}
return 1;
}
-Publisher::Publisher(Channel* _channel, const std::string& _controlTopic, bool tx) :
+Publisher::Publisher(Channel* _channel, const string& _controlTopic, bool tx) :
channel(_channel), controlTopic(_controlTopic), transactional(tx){}
void Publisher::received(Message& ){
@@ -223,65 +216,3 @@ void Publisher::terminate(){
}
}
-void Args::parse(int argc, char** argv){
- for(int i = 1; i < argc; i++){
- string name(argv[i]);
- if("-help" == name){
- help = true;
- break;
- }else if("-host" == name){
- host = argv[++i];
- }else if("-port" == name){
- port = atoi(argv[++i]);
- }else if("-messages" == name){
- messages = atoi(argv[++i]);
- }else if("-subscribers" == name){
- subscribers = atoi(argv[++i]);
- }else if("-ack_mode" == name){
- ackMode = AckMode(atoi(argv[++i]));
- }else if("-transactional" == name){
- transactional = true;
- }else if("-prefetch" == name){
- prefetch = atoi(argv[++i]);
- }else if("-batches" == name){
- batches = atoi(argv[++i]);
- }else if("-delay" == name){
- delay = atoi(argv[++i]);
- }else if("-size" == name){
- size = atoi(argv[++i]);
- }else if("-trace" == name){
- trace = true;
- }else{
- std::cout << "Warning: unrecognised option " << name << std::endl;
- }
- }
-}
-
-void Args::usage(){
- std::cout << "Options:" << std::endl;
- std::cout << " -help" << std::endl;
- std::cout << " Prints this usage message" << std::endl;
- std::cout << " -host <host>" << std::endl;
- std::cout << " Specifies host to connect to (default is localhost)" << std::endl;
- std::cout << " -port <port>" << std::endl;
- std::cout << " Specifies port to conect to (default is 5762)" << std::endl;
- std::cout << " -messages <count>" << std::endl;
- std::cout << " Specifies how many messages to send" << std::endl;
- std::cout << " -subscribers <count>" << std::endl;
- std::cout << " Specifies how many subscribers to expect reports from" << std::endl;
- std::cout << " -ack_mode <mode>" << std::endl;
- std::cout << " Sets the acknowledgement mode" << std::endl;
- std::cout << " 0=NO_ACK (default), 1=AUTO_ACK, 2=LAZY_ACK" << std::endl;
- std::cout << " -transactional" << std::endl;
- std::cout << " Indicates the client should use transactions" << std::endl;
- std::cout << " -prefetch <count>" << std::endl;
- std::cout << " Specifies the prefetch count (default is 1000)" << std::endl;
- std::cout << " -batches <count>" << std::endl;
- std::cout << " Specifies how many batches to run" << std::endl;
- std::cout << " -delay <seconds>" << std::endl;
- std::cout << " Causes a delay between each batch" << std::endl;
- std::cout << " -size <bytes>" << std::endl;
- std::cout << " Sets the size of the published messages (default is 256 bytes)" << std::endl;
- std::cout << " -trace" << std::endl;
- std::cout << " Indicates that the frames sent and received should be logged" << std::endl;
-}