diff options
Diffstat (limited to 'cpp/examples/failover/direct_producer.cpp')
-rw-r--r-- | cpp/examples/failover/direct_producer.cpp | 148 |
1 files changed, 148 insertions, 0 deletions
diff --git a/cpp/examples/failover/direct_producer.cpp b/cpp/examples/failover/direct_producer.cpp new file mode 100644 index 0000000000..9a510b6fb3 --- /dev/null +++ b/cpp/examples/failover/direct_producer.cpp @@ -0,0 +1,148 @@ +#include <qpid/client/FailoverConnection.h> +#include <qpid/client/Session.h> +#include <qpid/client/AsyncSession.h> +#include <qpid/client/Message.h> + + +#include <unistd.h> +#include <cstdlib> +#include <iostream> +#include <fstream> + +#include <sstream> + +using namespace qpid::client; +using namespace qpid::framing; + +using namespace std; + + + + +int +main ( int argc, char ** argv) +{ + struct timeval broker_killed_time = {0,0}, + failover_complete_time = {0,0}, + duration = {0,0}; + + + if ( argc < 3 ) + { + std::cerr << "Usage: ./direct_producer host cluster_port_file_name\n"; + std::cerr << "i.e. for host: 127.0.0.1\n"; + exit(1); + } + + char const * host = argv[1]; + int port = atoi(argv[2]); + char const * broker_to_kill = 0; + + if ( argc > 3 ) + { + broker_to_kill = argv[3]; + std::cerr << "main: Broker marked for death is process ID " + << broker_to_kill + << endl; + } + else + { + std::cerr << "PRODUCER main: there is no broker to kill.\n"; + } + + FailoverConnection connection; + FailoverSession * session; + Message message; + + string program_name = "PRODUCER"; + + + connection.failoverCompleteTime = & failover_complete_time; + connection.name = program_name; + connection.open ( host, port ); + + session = connection.newSession(); + session->name = program_name; + + int send_this_many = 30, + messages_sent = 0; + + while ( messages_sent < send_this_many ) + { + if ( (messages_sent == 13) && broker_to_kill ) + { + char command[1000]; + std::cerr << program_name << " killing broker " << broker_to_kill << ".\n"; + sprintf(command, "kill -9 %s", broker_to_kill); + system ( command ); + gettimeofday ( & broker_killed_time, 0 ); + } + + message.getDeliveryProperties().setRoutingKey("routing_key"); + + std::cerr << "sending message " + << messages_sent + << " of " + << send_this_many + << ".\n"; + + stringstream message_data; + message_data << messages_sent; + message.setData(message_data.str()); + + try + { + /* MICK FIXME + session.messageTransfer ( arg::content=message, + arg::destination="amq.direct" + ); */ + session->messageTransfer ( "amq.direct", + 1, + 0, + message + ); + } + catch ( const std::exception& error) + { + cerr << program_name << " exception: " << error.what() << endl; + } + + sleep ( 1 ); + ++ messages_sent; + } + + message.setData ( "That's all, folks!" ); + + /* MICK FIXME + session.messageTransfer ( arg::content=message, + arg::destination="amq.direct" + ); + */ + session->messageTransfer ( "amq.direct", + 1, + 0, + message + ); + + session->sync(); + connection.close(); + + // This will be incorrect if you killed more than one... + if ( broker_to_kill ) + { + timersub ( & failover_complete_time, + & broker_killed_time, + & duration + ); + fprintf ( stderr, + "Failover time: %ld.%.6ld\n", + duration.tv_sec, + duration.tv_usec + ); + } + + return 0; +} + + + |