diff options
Diffstat (limited to 'qpid/cpp/src/tests/qpid-txtest2.cpp')
| -rw-r--r-- | qpid/cpp/src/tests/qpid-txtest2.cpp | 22 |
1 files changed, 15 insertions, 7 deletions
diff --git a/qpid/cpp/src/tests/qpid-txtest2.cpp b/qpid/cpp/src/tests/qpid-txtest2.cpp index cdd263a081..a744d07a12 100644 --- a/qpid/cpp/src/tests/qpid-txtest2.cpp +++ b/qpid/cpp/src/tests/qpid-txtest2.cpp @@ -88,6 +88,7 @@ struct Options : public qpid::Options { ("help", qpid::optValue(help), "print this usage statement"); add(log); } + bool parse(int argc, char** argv) { try { @@ -109,9 +110,11 @@ struct Options : public qpid::Options { std::cout << *this << std::endl << std::endl << "Transactionally moves messages between queues" << std::endl; return false; - } else { - return true; } + if (totalMsgCount < msgsPerTx) { + totalMsgCount = msgsPerTx; // Must have at least msgsPerTx total messages. + } + return true; } catch (const std::exception& e) { std::cerr << *this << std::endl << std::endl << e.what() << std::endl; return false; @@ -158,6 +161,7 @@ struct Client virtual ~Client() { try { + session.sync(); session.close(); connection.close(); } catch(const std::exception& e) { @@ -177,12 +181,14 @@ struct Transfer : public TransactionalClient, public Runnable const std::string target; const std::string source; Thread thread; + bool failed; - Transfer(const std::string& to, const std::string& from, const Options& opts) : TransactionalClient(opts), target(to), source(from) {} + Transfer(const std::string& to, const std::string& from, const Options& opts) : TransactionalClient(opts), target(to), source(from), failed(false) {} void run() { try { + Sender sender(session.createSender(target)); Receiver receiver(session.createReceiver(source)); receiver.setCapacity(opts.capacity); @@ -211,7 +217,8 @@ struct Transfer : public TransactionalClient, public Runnable sender.close(); receiver.close(); } catch(const std::exception& e) { - std::cout << "Transfer interrupted: " << e.what() << std::endl; + failed = true; + QPID_LOG(error, "Transfer " << source << " to " << target << " interrupted: " << e.what()); } } }; @@ -263,9 +270,11 @@ struct Controller : public Client agents.back().thread = Thread(agents.back()); } - for (boost::ptr_vector<Transfer>::iterator i = agents.begin(); i != agents.end(); i++) { + for (boost::ptr_vector<Transfer>::iterator i = agents.begin(); i != agents.end(); i++) i->thread.join(); - } + for (boost::ptr_vector<Transfer>::iterator i = agents.begin(); i != agents.end(); i++) + if (i->failed) + throw std::runtime_error("Transfer agents failed"); } int check() @@ -285,7 +294,6 @@ struct Controller : public Client receiver.close(); if (!opts.quiet) std::cout << "Drained " << count << " messages from " << *i << std::endl; } - sort(ids.begin(), ids.end()); sort(drained.begin(), drained.end()); |
