summaryrefslogtreecommitdiff
path: root/cpp/src/tests/topic_publisher.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/tests/topic_publisher.cpp')
-rw-r--r--cpp/src/tests/topic_publisher.cpp59
1 files changed, 30 insertions, 29 deletions
diff --git a/cpp/src/tests/topic_publisher.cpp b/cpp/src/tests/topic_publisher.cpp
index 1c5b51309b..1354dc1435 100644
--- a/cpp/src/tests/topic_publisher.cpp
+++ b/cpp/src/tests/topic_publisher.cpp
@@ -35,11 +35,10 @@
*/
#include "TestOptions.h"
-#include "qpid/client/Channel.h"
#include "qpid/client/Connection.h"
-#include "qpid/client/Exchange.h"
#include "qpid/client/MessageListener.h"
-#include "qpid/client/Queue.h"
+#include "qpid/client/Session_0_10.h"
+#include "qpid/client/SubscriptionManager.h"
#include "qpid/sys/Monitor.h"
#include <unistd.h>
#include "qpid/sys/Time.h"
@@ -57,7 +56,7 @@ using namespace std;
* back by the subscribers.
*/
class Publisher : public MessageListener{
- Channel* const channel;
+ Session_0_10& session;
const string controlTopic;
const bool transactional;
Monitor monitor;
@@ -67,7 +66,7 @@ class Publisher : public MessageListener{
string generateData(int size);
public:
- Publisher(Channel* channel, const string& controlTopic, bool tx);
+ Publisher(Session_0_10& session, const string& controlTopic, bool tx);
virtual void received(Message& msg);
int64_t publish(int msgs, int listeners, int size);
void terminate();
@@ -79,7 +78,7 @@ public:
struct Args : public TestOptions {
int messages;
int subscribers;
- int ackmode;
+ int ack;
bool transactional;
int prefetch;
int batches;
@@ -87,13 +86,13 @@ struct Args : public TestOptions {
int size;
Args() : messages(1000), subscribers(1),
- ackmode(NO_ACK), transactional(false), prefetch(1000),
+ ack(500), transactional(false), prefetch(1000),
batches(1), delay(0), size(256)
{
addOptions()
("messages", optValue(messages, "N"), "how many messages to send")
("subscribers", optValue(subscribers, "N"), "how many subscribers to expect reports from")
- ("ackmode", optValue(ackmode, "MODE"), "Acknowledgement mode:0=NO_ACK, 1=AUTO_ACK, 2=LAZY_ACK")
+ ("ack", optValue(ack, "MODE"), "Acknowledgement mode:0=NO_ACK, 1=AUTO_ACK, 2=LAZY_ACK")
("transactional", optValue(transactional), "client should use transactions")
("prefetch", optValue(prefetch, "N"), "prefetch count")
("batches", optValue(batches, "N"), "how many batches to run")
@@ -110,18 +109,21 @@ int main(int argc, char** argv) {
cout << args << endl;
else {
Connection connection(args.trace);
- connection.open(args.host, args.port, args.username, args.password, args.virtualhost);
- Channel channel(args.transactional, args.prefetch);
- connection.openChannel(channel);
+ args.open(connection);
+ Session_0_10 session = connection.newSession();
+ if (args.transactional) {
+ session.txSelect();
+ }
+
//declare queue (relying on default binding):
- Queue response("response");
- channel.declareQueue(response);
+ session.queueDeclare(arg::queue="response");
//set up listener
- Publisher publisher(&channel, "topic_control", args.transactional);
- channel.consume(response, "mytag", &publisher, AckMode(args.ackmode));
- channel.start();
+ SubscriptionManager mgr(session);
+ Publisher publisher(session, "topic_control", args.transactional);
+ mgr.subscribe(publisher, "response");
+ mgr.start();
int batchSize(args.batches);
int64_t max(0);
@@ -140,12 +142,13 @@ int main(int argc, char** argv) {
<< " in " << msecs << "ms" << endl;
}
publisher.terminate();
+ mgr.stop();
int64_t avg = sum / batchSize;
if(batchSize > 1){
cout << batchSize << " batches completed. avg=" << avg <<
", max=" << max << ", min=" << min << endl;
}
- channel.close();
+ session.close();
connection.close();
}
return 0;
@@ -155,8 +158,8 @@ int main(int argc, char** argv) {
return 1;
}
-Publisher::Publisher(Channel* _channel, const string& _controlTopic, bool tx) :
- channel(_channel), controlTopic(_controlTopic), transactional(tx){}
+Publisher::Publisher(Session_0_10& _session, const string& _controlTopic, bool tx) :
+ session(_session), controlTopic(_controlTopic), transactional(tx){}
void Publisher::received(Message& ){
//count responses and when all are received end the current batch
@@ -172,21 +175,19 @@ void Publisher::waitForCompletion(int msgs){
}
int64_t Publisher::publish(int msgs, int listeners, int size){
- Message msg;
- msg.setData(generateData(size));
+ Message msg(generateData(size), controlTopic);
AbsTime start = now();
{
Monitor::ScopedLock l(monitor);
for(int i = 0; i < msgs; i++){
- channel->publish(
- msg, Exchange::STANDARD_TOPIC_EXCHANGE, controlTopic);
+ session.messageTransfer(arg::content=msg, arg::destination="amq.topic");
}
//send report request
- Message reportRequest;
+ Message reportRequest("", controlTopic);
reportRequest.getHeaders().setString("TYPE", "REPORT_REQUEST");
- channel->publish(reportRequest, Exchange::STANDARD_TOPIC_EXCHANGE, controlTopic);
+ session.messageTransfer(arg::content=reportRequest, arg::destination="amq.topic");
if(transactional){
- channel->commit();
+ session.txCommit();
}
waitForCompletion(listeners);
@@ -206,11 +207,11 @@ string Publisher::generateData(int size){
void Publisher::terminate(){
//send termination request
- Message terminationRequest;
+ Message terminationRequest("", controlTopic);
terminationRequest.getHeaders().setString("TYPE", "TERMINATION_REQUEST");
- channel->publish(terminationRequest, Exchange::STANDARD_TOPIC_EXCHANGE, controlTopic);
+ session.messageTransfer(arg::content=terminationRequest, arg::destination="amq.topic");
if(transactional){
- channel->commit();
+ session.txCommit();
}
}