summaryrefslogtreecommitdiff
path: root/cpp/examples
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2008-10-17 16:45:24 +0000
committerAlan Conway <aconway@apache.org>2008-10-17 16:45:24 +0000
commit7db0c0970eac260626263314c30f0e20d4ef6c21 (patch)
tree231024436b5b7185f63972d90318acce97816c22 /cpp/examples
parenta039e57108ed06586e73a255dc824ed27fc6de2a (diff)
downloadqpid-python-7db0c0970eac260626263314c30f0e20d4ef6c21.tar.gz
QPID-1367 Mick Goulish: improvements to client-side failover.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@705668 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/examples')
-rw-r--r--cpp/examples/direct/direct_producer.cpp6
-rw-r--r--cpp/examples/failover/direct_producer.cpp33
-rw-r--r--cpp/examples/failover/listener.cpp176
3 files changed, 46 insertions, 169 deletions
diff --git a/cpp/examples/direct/direct_producer.cpp b/cpp/examples/direct/direct_producer.cpp
index 40fc644bf3..baa8d9092b 100644
--- a/cpp/examples/direct/direct_producer.cpp
+++ b/cpp/examples/direct/direct_producer.cpp
@@ -64,6 +64,7 @@ using std::string;
int main(int argc, char** argv) {
const char* host = argc>1 ? argv[1] : "127.0.0.1";
int port = argc>2 ? atoi(argv[2]) : 5672;
+ int count = argc>3 ? atoi(argv[3]) : 10;
Connection connection;
Message message;
try {
@@ -81,14 +82,15 @@ int main(int argc, char** argv) {
// Now send some messages ...
- for (int i=0; i<10; i++) {
+ for (int i=0; i<count; i++) {
stringstream message_data;
message_data << "Message " << i;
message.setData(message_data.str());
// Asynchronous transfer sends messages as quickly as
// possible without waiting for confirmation.
- async(session).messageTransfer(arg::content=message, arg::destination="amq.direct");
+ // async(session).messageTransfer(arg::content=message, arg::destination="amq.direct");
+ session.messageTransfer(arg::content=message, arg::destination="amq.direct");
}
// And send a final message to indicate termination.
diff --git a/cpp/examples/failover/direct_producer.cpp b/cpp/examples/failover/direct_producer.cpp
index 1bee56e164..513971197e 100644
--- a/cpp/examples/failover/direct_producer.cpp
+++ b/cpp/examples/failover/direct_producer.cpp
@@ -36,12 +36,13 @@ using namespace std;
int
main ( int argc, char ** argv)
{
+
const char* host = argc>1 ? argv[1] : "127.0.0.1";
int port = argc>2 ? atoi(argv[2]) : 5672;
int count = argc>3 ? atoi(argv[3]) : 30;
- int delayMs = argc>4 ? atoi(argv[4]) : 1000;
string program_name = "PRODUCER";
+
try {
FailoverConnection connection;
FailoverSession * session;
@@ -49,14 +50,23 @@ main ( int argc, char ** argv)
connection.open ( host, port );
session = connection.newSession();
+ bool report = true;
int sent = 0;
while ( sent < count ) {
+
message.getDeliveryProperties().setRoutingKey("routing_key");
- std::cout << "sending message "
- << sent
- << " of "
- << count
- << ".\n";
+
+
+ if ( count > 1000 )
+ report = !(sent % 1000);
+
+ if ( report )
+ {
+ std::cout << "sending message "
+ << sent
+ << ".\n";
+ }
+
stringstream message_data;
message_data << sent;
message.setData(message_data.str());
@@ -70,12 +80,12 @@ main ( int argc, char ** argv)
0,
message
);
- usleep ( 1000*delayMs );
+
++ sent;
}
message.setData ( "That's all, folks!" );
- /* MICK FIXME
+ /* FIXME mgoulish 16 Oct 08
session.messageTransfer ( arg::content=message,
arg::destination="amq.direct"
);
@@ -88,10 +98,17 @@ main ( int argc, char ** argv)
session->sync();
connection.close();
+ std::cout << program_name
+ << " sent "
+ << sent
+ << " messages.\n";
+
std::cout << program_name << ": " << " completed without error." << std::endl;
return 0;
} catch(const std::exception& error) {
std::cout << program_name << ": " << error.what() << std::endl;
+ std::cout << program_name << "Exiting.\n";
+ return 1;
}
return 1;
}
diff --git a/cpp/examples/failover/listener.cpp b/cpp/examples/failover/listener.cpp
index 1c47127389..d8cb78c9ce 100644
--- a/cpp/examples/failover/listener.cpp
+++ b/cpp/examples/failover/listener.cpp
@@ -34,150 +34,26 @@ using namespace qpid::framing;
using namespace std;
-struct Recorder
-{
- unsigned int max_messages;
- unsigned int * messages_received;
-
- Recorder ( )
- {
- max_messages = 1000;
- messages_received = new unsigned int [ max_messages ];
- memset ( messages_received, 0, max_messages * sizeof(int) );
- }
-
-
- void
- received ( int i )
- {
- messages_received[i] ++;
- }
-
-
-
- void
- report ( )
- {
- int i;
-
- int last_received_message = 0;
-
- vector<unsigned int> missed_messages,
- multiple_messages;
-
- /*----------------------------------------------------
- Collect indices of missed and multiple messages.
- ----------------------------------------------------*/
- bool seen_first_message = false;
- for ( i = max_messages - 1; i >= 0; -- i )
- {
- if ( ! seen_first_message )
- {
- if ( messages_received [i] > 0 )
- {
- seen_first_message = true;
- last_received_message = i;
- }
- }
- else
- {
- if ( messages_received [i] == 0 )
- missed_messages.push_back ( i );
- else
- if ( messages_received [i] > 1 )
- {
- multiple_messages.push_back ( i );
- }
- }
- }
-
- /*--------------------------------------------
- Report missed messages.
- --------------------------------------------*/
- char const * verb = ( missed_messages.size() == 1 )
- ? " was "
- : " were ";
-
- char const * plural = ( missed_messages.size() == 1 )
- ? "."
- : "s.";
-
- std::cerr << "Listener::shutdown: There"
- << verb
- << missed_messages.size()
- << " missed message"
- << plural
- << endl;
-
- for ( i = 0; i < int(missed_messages.size()); ++ i )
- {
- std::cerr << " " << i << " was missed.\n";
- }
-
-
- /*--------------------------------------------
- Report multiple messages.
- --------------------------------------------*/
- verb = ( multiple_messages.size() == 1 )
- ? " was "
- : " were ";
-
- plural = ( multiple_messages.size() == 1 )
- ? "."
- : "s.";
-
- std::cerr << "Listener::shutdown: There"
- << verb
- << multiple_messages.size()
- << " multiple message"
- << plural
- << endl;
-
- for ( i = 0; i < int(multiple_messages.size()); ++ i )
- {
- std::cerr << " "
- << multiple_messages[i]
- << " was received "
- << messages_received [ multiple_messages[i] ]
- << " times.\n";
- }
-
- /*
- for ( i = 0; i < last_received_message; ++ i )
- {
- std::cerr << "Message " << i << ": " << messages_received[i] << std::endl;
- }
- */
- }
-
-};
-
-
-
-
struct Listener : public MessageListener
{
FailoverSubscriptionManager & subscriptionManager;
- Recorder & recorder;
+ Listener ( FailoverSubscriptionManager& subs );
- Listener ( FailoverSubscriptionManager& subs,
- Recorder & recorder
- );
-
- void shutdown() { recorder.report(); }
- void parse_message ( std::string const & msg );
+ void shutdown() { subscriptionManager.stop(); }
virtual void received ( Message & message );
+
+ int count;
};
-Listener::Listener ( FailoverSubscriptionManager & s, Recorder & r ) :
+Listener::Listener ( FailoverSubscriptionManager & s ) :
subscriptionManager(s),
- recorder(r)
+ count(0)
{
}
@@ -188,18 +64,19 @@ Listener::Listener ( FailoverSubscriptionManager & s, Recorder & r ) :
void
Listener::received ( Message & message )
{
- std::cerr << "Listener received: " << message.getData() << std::endl;
+ if(! (count%1000))
+ std::cerr << "\t\tListener received: " << message.getData() << std::endl;
+
+ ++ count;
+
if (message.getData() == "That's all, folks!")
{
std::cout << "Shutting down listener for " << message.getDestination()
<< std::endl;
- subscriptionManager.cancel(message.getDestination());
- shutdown();
- }
- else
- {
- parse_message ( message.getData() );
+ std::cout << "Listener received " << count << " messages.\n";
+ subscriptionManager.cancel(message.getDestination());
+ shutdown ( );
}
}
@@ -207,21 +84,6 @@ Listener::received ( Message & message )
-void
-Listener::parse_message ( const std::string & msg )
-{
- int msg_number;
- if(1 != sscanf ( msg.c_str(), "%d", & msg_number ) )
- {
- std::cerr << "Listener::parse_message error: Can't read message number from this message: |" << msg_number << "|\n";
- return;
- }
- recorder.received ( msg_number );
-}
-
-
-
-
int
@@ -235,17 +97,12 @@ main ( int argc, char ** argv )
FailoverConnection connection;
FailoverSession * session;
- Recorder recorder;
-
- connection.name = program_name;
connection.open ( host, port );
session = connection.newSession();
- session->name = program_name;
FailoverSubscriptionManager subscriptions ( session );
- subscriptions.name = program_name;
- Listener listener ( subscriptions, recorder );
+ Listener listener ( subscriptions );
subscriptions.subscribe ( listener, "message_queue" );
subscriptions.run ( );
@@ -256,7 +113,8 @@ main ( int argc, char ** argv )
} catch(const std::exception& error) {
std::cout << program_name << ": " << error.what() << std::endl;
}
- return 1;
+
+ return 0;
}