summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/tests/resuming_receiver.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/tests/resuming_receiver.cpp')
-rw-r--r--qpid/cpp/src/tests/resuming_receiver.cpp69
1 files changed, 44 insertions, 25 deletions
diff --git a/qpid/cpp/src/tests/resuming_receiver.cpp b/qpid/cpp/src/tests/resuming_receiver.cpp
index 807bd83bee..abd62f003e 100644
--- a/qpid/cpp/src/tests/resuming_receiver.cpp
+++ b/qpid/cpp/src/tests/resuming_receiver.cpp
@@ -27,7 +27,6 @@
#include <iostream>
#include <fstream>
-
using namespace qpid;
using namespace qpid::client;
using namespace qpid::framing;
@@ -35,6 +34,7 @@ using namespace qpid::framing;
using namespace std;
+
namespace qpid {
namespace tests {
@@ -43,31 +43,34 @@ class Listener : public MessageListener,
public FailoverManager::ReconnectionStrategy
{
public:
- Listener ( int report_frequency = 1000, int verbosity = 0 );
+ Listener ( int report_frequency = 1000,
+ int verbosity = 0,
+ char const * queue_name = "message_queue" );
void received(Message& message);
void execute(AsyncSession& session, bool isRetry);
void check();
- void editUrlList(std::vector<Url>& urls);
+ void editUrlList(vector<Url>& urls);
private:
Subscription subscription;
uint count;
- uint received_twice;
+ vector<int> received_twice;
uint lastSn;
bool gaps;
uint reportFrequency;
int verbosity;
bool done;
+ string queueName;
};
-Listener::Listener(int freq, int verbosity)
+Listener::Listener ( int freq, int verbosity, char const * name )
: count(0),
- received_twice(0),
lastSn(0),
gaps(false),
reportFrequency(freq),
verbosity(verbosity),
- done(false)
+ done(false),
+ queueName ( name )
{}
@@ -78,36 +81,51 @@ void Listener::received(Message & message)
done = true;
if(verbosity > 0 )
{
- std::cout << "Shutting down listener for "
- << message.getDestination() << std::endl;
+ cout << "Shutting down listener for "
+ << message.getDestination() << endl;
- std::cout << "Listener received "
+ cout << "Listener received "
<< count
<< " messages ("
- << received_twice
+ << received_twice.size()
<< " received_twice)"
<< endl;
+
}
subscription.cancel();
if ( verbosity > 0 )
- std::cout << "LISTENER COMPLETED\n";
+ cout << "LISTENER COMPLETED\n";
+
+ if ( ! gaps ) {
+ cout << "no gaps were detected\n";
+ cout << received_twice.size() << " messages were received twice.\n";
+ }
+ else {
+ cout << "gaps detected\n";
+ for ( unsigned int i = 0; i < received_twice.size(); ++ i )
+ cout << "received_twice "
+ << received_twice[i]
+ << endl;
+ }
} else {
uint sn = message.getHeaders().getAsInt("sn");
if (lastSn < sn) {
if (sn - lastSn > 1) {
- std::cerr << "Error: gap in sequence between " << lastSn << " and " << sn << std::endl;
+ cerr << "Error: gap in sequence between " << lastSn << " and " << sn << endl;
gaps = true;
}
lastSn = sn;
++count;
if ( ! ( count % reportFrequency ) ) {
if ( verbosity > 0 )
- std::cout << "Listener has received "
+ cout << "Listener has received "
<< count
- << " messages.\n";
+ << " messages on queue "
+ << queueName
+ << endl;
}
} else {
- ++received_twice;
+ received_twice.push_back ( sn );
}
}
}
@@ -119,21 +137,21 @@ void Listener::check()
void Listener::execute(AsyncSession& session, bool isRetry) {
if (verbosity > 0)
- std::cout << "resuming_receiver " << (isRetry ? "first " : "re-") << "connect." << endl;
+ cout << "resuming_receiver " << (isRetry ? "first " : "re-") << "connect." << endl;
if (!done) {
SubscriptionManager subs(session);
- subscription = subs.subscribe(*this, "message_queue");
+ subscription = subs.subscribe(*this, queueName);
subs.run();
}
}
-void Listener::editUrlList(std::vector<Url>& urls)
+void Listener::editUrlList(vector<Url>& urls)
{
/**
* A more realistic algorithm would be to search through the list
* for prefered hosts and ensure they come first in the list.
*/
- if (urls.size() > 1) std::rotate(urls.begin(), urls.begin() + 1, urls.end());
+ if (urls.size() > 1) rotate(urls.begin(), urls.begin() + 1, urls.end());
}
}} // namespace qpid::tests
@@ -144,9 +162,9 @@ int main(int argc, char ** argv)
{
ConnectionSettings settings;
- if ( argc != 5 )
+ if ( argc != 6 )
{
- std::cerr << "Usage: resuming_receiver host port report_frequency verbosity\n";
+ cerr << "Usage: resuming_receiver host port report_frequency verbosity queue_name\n";
return 1;
}
@@ -154,8 +172,9 @@ int main(int argc, char ** argv)
settings.port = atoi(argv[2]);
int reportFrequency = atoi(argv[3]);
int verbosity = atoi(argv[4]);
+ char * queue_name = argv[5];
- Listener listener(reportFrequency, verbosity);
+ Listener listener ( reportFrequency, verbosity, queue_name );
FailoverManager connection(settings, &listener);
try {
@@ -163,8 +182,8 @@ int main(int argc, char ** argv)
connection.close();
listener.check();
return 0;
- } catch(const std::exception& error) {
- std::cerr << "Receiver failed: " << error.what() << std::endl;
+ } catch(const exception& error) {
+ cerr << "Receiver failed: " << error.what() << endl;
}
return 1;
}