summaryrefslogtreecommitdiff
path: root/cpp/src/tests/topic_publisher.cpp
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2007-12-04 14:19:06 +0000
committerGordon Sim <gsim@apache.org>2007-12-04 14:19:06 +0000
commit6ee7a743ee2a306b9e34f3dc471046b68b21680a (patch)
treea745136f3d3517a8dd89a555c1af1fbd86d9c0f1 /cpp/src/tests/topic_publisher.cpp
parentb4e4f652d251c66b2ca487ad8d347130c874173d (diff)
downloadqpid-python-6ee7a743ee2a306b9e34f3dc471046b68b21680a.tar.gz
Updates to topic test
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@600962 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/tests/topic_publisher.cpp')
-rw-r--r--cpp/src/tests/topic_publisher.cpp61
1 files changed, 24 insertions, 37 deletions
diff --git a/cpp/src/tests/topic_publisher.cpp b/cpp/src/tests/topic_publisher.cpp
index f678b0eb21..80c9bf6607 100644
--- a/cpp/src/tests/topic_publisher.cpp
+++ b/cpp/src/tests/topic_publisher.cpp
@@ -55,20 +55,18 @@ using namespace std;
* message listener and can therfore be used to receive messages sent
* back by the subscribers.
*/
-class Publisher : public MessageListener{
+class Publisher {
Session_0_10& session;
+ SubscriptionManager mgr;
+ LocalQueue queue;
const string controlTopic;
const bool transactional;
const bool durable;
- Monitor monitor;
- int count;
- void waitForCompletion(int msgs);
string generateData(int size);
public:
Publisher(Session_0_10& session, const string& controlTopic, bool tx, bool durable);
- virtual void received(Message& msg);
int64_t publish(int msgs, int listeners, int size);
void terminate();
};
@@ -118,11 +116,7 @@ int main(int argc, char** argv) {
//declare queue (relying on default binding):
session.queueDeclare(arg::queue="response");
- //set up listener
- SubscriptionManager mgr(session);
Publisher publisher(session, "topic_control", args.transactional, args.durable);
- mgr.subscribe(publisher, "response");
- mgr.start();
int batchSize(args.batches);
int64_t max(0);
@@ -141,7 +135,6 @@ int main(int argc, char** argv) {
<< " in " << msecs << "ms" << endl;
}
publisher.terminate();
- mgr.stop();
int64_t avg = sum / batchSize;
if(batchSize > 1){
cout << batchSize << " batches completed. avg=" << avg <<
@@ -158,19 +151,9 @@ int main(int argc, char** argv) {
}
Publisher::Publisher(Session_0_10& _session, const string& _controlTopic, bool tx, bool d) :
- session(_session), controlTopic(_controlTopic), transactional(tx), durable(d), count(0) {}
-
-void Publisher::received(Message& ){
- //count responses and when all are received end the current batch
- Monitor::ScopedLock l(monitor);
- if(--count == 0){
- monitor.notify();
- }
-}
-
-void Publisher::waitForCompletion(int msgs){
- count = msgs;
- monitor.wait();
+ session(_session), mgr(session), controlTopic(_controlTopic), transactional(tx), durable(d)
+{
+ mgr.subscribe(queue, "response");
}
int64_t Publisher::publish(int msgs, int listeners, int size){
@@ -179,22 +162,26 @@ int64_t Publisher::publish(int msgs, int listeners, int size){
msg.getDeliveryProperties().setDeliveryMode(framing::PERSISTENT);
}
AbsTime start = now();
- {
- Monitor::ScopedLock l(monitor);
- for(int i = 0; i < msgs; i++){
- session.messageTransfer(arg::content=msg, arg::destination="amq.topic");
- }
- //send report request
- Message reportRequest("", controlTopic);
- reportRequest.getHeaders().setString("TYPE", "REPORT_REQUEST");
- session.messageTransfer(arg::content=reportRequest, arg::destination="amq.topic");
- if(transactional){
- session.txCommit();
- }
-
- waitForCompletion(listeners);
+
+ for(int i = 0; i < msgs; i++){
+ session.messageTransfer(arg::content=msg, arg::destination="amq.topic");
+ }
+ //send report request
+ Message reportRequest("", controlTopic);
+ reportRequest.getHeaders().setString("TYPE", "REPORT_REQUEST");
+ session.messageTransfer(arg::content=reportRequest, arg::destination="amq.topic");
+ if(transactional){
+ session.txCommit();
+ }
+ //wait for a response from each listener (TODO, could log these)
+ for (int i = 0; i < listeners; i++) {
+ Message report = queue.pop();
}
+ if(transactional){
+ session.txCommit();
+ }
+
AbsTime finish = now();
return Duration(start, finish);
}