diff options
Diffstat (limited to 'qpid/cpp/src/tests/resuming_receiver.cpp')
| -rw-r--r-- | qpid/cpp/src/tests/resuming_receiver.cpp | 69 |
1 files changed, 44 insertions, 25 deletions
diff --git a/qpid/cpp/src/tests/resuming_receiver.cpp b/qpid/cpp/src/tests/resuming_receiver.cpp index 807bd83bee..abd62f003e 100644 --- a/qpid/cpp/src/tests/resuming_receiver.cpp +++ b/qpid/cpp/src/tests/resuming_receiver.cpp @@ -27,7 +27,6 @@ #include <iostream> #include <fstream> - using namespace qpid; using namespace qpid::client; using namespace qpid::framing; @@ -35,6 +34,7 @@ using namespace qpid::framing; using namespace std; + namespace qpid { namespace tests { @@ -43,31 +43,34 @@ class Listener : public MessageListener, public FailoverManager::ReconnectionStrategy { public: - Listener ( int report_frequency = 1000, int verbosity = 0 ); + Listener ( int report_frequency = 1000, + int verbosity = 0, + char const * queue_name = "message_queue" ); void received(Message& message); void execute(AsyncSession& session, bool isRetry); void check(); - void editUrlList(std::vector<Url>& urls); + void editUrlList(vector<Url>& urls); private: Subscription subscription; uint count; - uint received_twice; + vector<int> received_twice; uint lastSn; bool gaps; uint reportFrequency; int verbosity; bool done; + string queueName; }; -Listener::Listener(int freq, int verbosity) +Listener::Listener ( int freq, int verbosity, char const * name ) : count(0), - received_twice(0), lastSn(0), gaps(false), reportFrequency(freq), verbosity(verbosity), - done(false) + done(false), + queueName ( name ) {} @@ -78,36 +81,51 @@ void Listener::received(Message & message) done = true; if(verbosity > 0 ) { - std::cout << "Shutting down listener for " - << message.getDestination() << std::endl; + cout << "Shutting down listener for " + << message.getDestination() << endl; - std::cout << "Listener received " + cout << "Listener received " << count << " messages (" - << received_twice + << received_twice.size() << " received_twice)" << endl; + } subscription.cancel(); if ( verbosity > 0 ) - std::cout << "LISTENER COMPLETED\n"; + cout << "LISTENER COMPLETED\n"; + + if ( ! gaps ) { + cout << "no gaps were detected\n"; + cout << received_twice.size() << " messages were received twice.\n"; + } + else { + cout << "gaps detected\n"; + for ( unsigned int i = 0; i < received_twice.size(); ++ i ) + cout << "received_twice " + << received_twice[i] + << endl; + } } else { uint sn = message.getHeaders().getAsInt("sn"); if (lastSn < sn) { if (sn - lastSn > 1) { - std::cerr << "Error: gap in sequence between " << lastSn << " and " << sn << std::endl; + cerr << "Error: gap in sequence between " << lastSn << " and " << sn << endl; gaps = true; } lastSn = sn; ++count; if ( ! ( count % reportFrequency ) ) { if ( verbosity > 0 ) - std::cout << "Listener has received " + cout << "Listener has received " << count - << " messages.\n"; + << " messages on queue " + << queueName + << endl; } } else { - ++received_twice; + received_twice.push_back ( sn ); } } } @@ -119,21 +137,21 @@ void Listener::check() void Listener::execute(AsyncSession& session, bool isRetry) { if (verbosity > 0) - std::cout << "resuming_receiver " << (isRetry ? "first " : "re-") << "connect." << endl; + cout << "resuming_receiver " << (isRetry ? "first " : "re-") << "connect." << endl; if (!done) { SubscriptionManager subs(session); - subscription = subs.subscribe(*this, "message_queue"); + subscription = subs.subscribe(*this, queueName); subs.run(); } } -void Listener::editUrlList(std::vector<Url>& urls) +void Listener::editUrlList(vector<Url>& urls) { /** * A more realistic algorithm would be to search through the list * for prefered hosts and ensure they come first in the list. */ - if (urls.size() > 1) std::rotate(urls.begin(), urls.begin() + 1, urls.end()); + if (urls.size() > 1) rotate(urls.begin(), urls.begin() + 1, urls.end()); } }} // namespace qpid::tests @@ -144,9 +162,9 @@ int main(int argc, char ** argv) { ConnectionSettings settings; - if ( argc != 5 ) + if ( argc != 6 ) { - std::cerr << "Usage: resuming_receiver host port report_frequency verbosity\n"; + cerr << "Usage: resuming_receiver host port report_frequency verbosity queue_name\n"; return 1; } @@ -154,8 +172,9 @@ int main(int argc, char ** argv) settings.port = atoi(argv[2]); int reportFrequency = atoi(argv[3]); int verbosity = atoi(argv[4]); + char * queue_name = argv[5]; - Listener listener(reportFrequency, verbosity); + Listener listener ( reportFrequency, verbosity, queue_name ); FailoverManager connection(settings, &listener); try { @@ -163,8 +182,8 @@ int main(int argc, char ** argv) connection.close(); listener.check(); return 0; - } catch(const std::exception& error) { - std::cerr << "Receiver failed: " << error.what() << std::endl; + } catch(const exception& error) { + cerr << "Receiver failed: " << error.what() << endl; } return 1; } |
