summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/tests/qpid-txtest2.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/tests/qpid-txtest2.cpp')
-rw-r--r--qpid/cpp/src/tests/qpid-txtest2.cpp22
1 files changed, 15 insertions, 7 deletions
diff --git a/qpid/cpp/src/tests/qpid-txtest2.cpp b/qpid/cpp/src/tests/qpid-txtest2.cpp
index cdd263a081..a744d07a12 100644
--- a/qpid/cpp/src/tests/qpid-txtest2.cpp
+++ b/qpid/cpp/src/tests/qpid-txtest2.cpp
@@ -88,6 +88,7 @@ struct Options : public qpid::Options {
("help", qpid::optValue(help), "print this usage statement");
add(log);
}
+
bool parse(int argc, char** argv)
{
try {
@@ -109,9 +110,11 @@ struct Options : public qpid::Options {
std::cout << *this << std::endl << std::endl
<< "Transactionally moves messages between queues" << std::endl;
return false;
- } else {
- return true;
}
+ if (totalMsgCount < msgsPerTx) {
+ totalMsgCount = msgsPerTx; // Must have at least msgsPerTx total messages.
+ }
+ return true;
} catch (const std::exception& e) {
std::cerr << *this << std::endl << std::endl << e.what() << std::endl;
return false;
@@ -158,6 +161,7 @@ struct Client
virtual ~Client()
{
try {
+ session.sync();
session.close();
connection.close();
} catch(const std::exception& e) {
@@ -177,12 +181,14 @@ struct Transfer : public TransactionalClient, public Runnable
const std::string target;
const std::string source;
Thread thread;
+ bool failed;
- Transfer(const std::string& to, const std::string& from, const Options& opts) : TransactionalClient(opts), target(to), source(from) {}
+ Transfer(const std::string& to, const std::string& from, const Options& opts) : TransactionalClient(opts), target(to), source(from), failed(false) {}
void run()
{
try {
+
Sender sender(session.createSender(target));
Receiver receiver(session.createReceiver(source));
receiver.setCapacity(opts.capacity);
@@ -211,7 +217,8 @@ struct Transfer : public TransactionalClient, public Runnable
sender.close();
receiver.close();
} catch(const std::exception& e) {
- std::cout << "Transfer interrupted: " << e.what() << std::endl;
+ failed = true;
+ QPID_LOG(error, "Transfer " << source << " to " << target << " interrupted: " << e.what());
}
}
};
@@ -263,9 +270,11 @@ struct Controller : public Client
agents.back().thread = Thread(agents.back());
}
- for (boost::ptr_vector<Transfer>::iterator i = agents.begin(); i != agents.end(); i++) {
+ for (boost::ptr_vector<Transfer>::iterator i = agents.begin(); i != agents.end(); i++)
i->thread.join();
- }
+ for (boost::ptr_vector<Transfer>::iterator i = agents.begin(); i != agents.end(); i++)
+ if (i->failed)
+ throw std::runtime_error("Transfer agents failed");
}
int check()
@@ -285,7 +294,6 @@ struct Controller : public Client
receiver.close();
if (!opts.quiet) std::cout << "Drained " << count << " messages from " << *i << std::endl;
}
-
sort(ids.begin(), ids.end());
sort(drained.begin(), drained.end());