diff options
author | Alan Conway <aconway@apache.org> | 2008-10-10 04:49:48 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2008-10-10 04:49:48 +0000 |
commit | 5d07d177cfc5eca21c44981bbe342f0cdcced4e5 (patch) | |
tree | 0f5f83642ed5effed52a5e2547565362ce2aea8c /cpp/examples | |
parent | e7ceead683231ef2cb35a6ee70488e859f023d12 (diff) | |
download | qpid-python-5d07d177cfc5eca21c44981bbe342f0cdcced4e5.tar.gz |
QPID-1340 froM Mick Goulish: preliminary client-side failover support.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@703319 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/examples')
-rw-r--r-- | cpp/examples/Makefile.am | 2 | ||||
-rw-r--r-- | cpp/examples/failover/Makefile.am | 21 | ||||
-rw-r--r-- | cpp/examples/failover/declare_queues.cpp | 88 | ||||
-rw-r--r-- | cpp/examples/failover/direct_producer.cpp | 148 | ||||
-rw-r--r-- | cpp/examples/failover/listener.cpp | 249 |
5 files changed, 507 insertions, 1 deletions
diff --git a/cpp/examples/Makefile.am b/cpp/examples/Makefile.am index 5415cae1a9..57ad22ec78 100644 --- a/cpp/examples/Makefile.am +++ b/cpp/examples/Makefile.am @@ -1,4 +1,4 @@ -SUBDIRS = direct fanout pub-sub request-response +SUBDIRS = direct fanout pub-sub request-response failover if HAVE_XML SUBDIRS += xml-exchange endif diff --git a/cpp/examples/failover/Makefile.am b/cpp/examples/failover/Makefile.am new file mode 100644 index 0000000000..8fe6b8cba7 --- /dev/null +++ b/cpp/examples/failover/Makefile.am @@ -0,0 +1,21 @@ +examplesdir=$(pkgdatadir)/examples/direct + +include $(top_srcdir)/examples/makedist.mk + +noinst_PROGRAMS=direct_producer listener declare_queues +direct_producer_SOURCES=direct_producer.cpp +direct_producer_LDADD=$(CLIENT_LIB) + +listener_SOURCES=listener.cpp +listener_LDADD=$(CLIENT_LIB) + +declare_queues_SOURCES=declare_queues.cpp +declare_queues_LDADD=$(CLIENT_LIB) + +examples_DATA= \ + direct_producer.cpp \ + listener.cpp \ + declare_queues.cpp \ + $(MAKEDIST) + +# FIXME aconway 2008-10-10: add verify scripts. diff --git a/cpp/examples/failover/declare_queues.cpp b/cpp/examples/failover/declare_queues.cpp new file mode 100644 index 0000000000..14e4a1e3cb --- /dev/null +++ b/cpp/examples/failover/declare_queues.cpp @@ -0,0 +1,88 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <qpid/client/FailoverConnection.h> +#include <qpid/client/Session.h> + +#include <unistd.h> +#include <cstdlib> +#include <iostream> +#include <fstream> + +using namespace qpid::client; +using namespace qpid::framing; + + +using namespace std; + + + + +int +main ( int argc, char ** argv) +{ + if ( argc < 3 ) + { + std::cerr << "Usage: ./declare_queues host cluster_port_file_name\n"; + std::cerr << "i.e. for host: 127.0.0.1\n"; + exit(1); + } + + const char * host = argv[1]; + int port = atoi(argv[2]); + + + try + { + FailoverConnection connection; + FailoverSession * session; + + connection.open ( host, port ); + session = connection.newSession(); + + session->queueDeclare ( "message_queue"); + + /* + session->exchangeBind + ( arg::exchange="amq.direct", + arg::queue="message_queue", + arg::bindingKey="routing_key" + ); + * */ + session->exchangeBind ( "message_queue", + "amq.direct", + "routing_key" + ); + connection.close(); + return 0; + } + catch ( const std::exception& error ) + { + std::cout << error.what() << std::endl; + } + + return 1; +} + + + + + diff --git a/cpp/examples/failover/direct_producer.cpp b/cpp/examples/failover/direct_producer.cpp new file mode 100644 index 0000000000..9a510b6fb3 --- /dev/null +++ b/cpp/examples/failover/direct_producer.cpp @@ -0,0 +1,148 @@ +#include <qpid/client/FailoverConnection.h> +#include <qpid/client/Session.h> +#include <qpid/client/AsyncSession.h> +#include <qpid/client/Message.h> + + +#include <unistd.h> +#include <cstdlib> +#include <iostream> +#include <fstream> + +#include <sstream> + +using namespace qpid::client; +using namespace qpid::framing; + +using namespace std; + + + + +int +main ( int argc, char ** argv) +{ + struct timeval broker_killed_time = {0,0}, + failover_complete_time = {0,0}, + duration = {0,0}; + + + if ( argc < 3 ) + { + std::cerr << "Usage: ./direct_producer host cluster_port_file_name\n"; + std::cerr << "i.e. for host: 127.0.0.1\n"; + exit(1); + } + + char const * host = argv[1]; + int port = atoi(argv[2]); + char const * broker_to_kill = 0; + + if ( argc > 3 ) + { + broker_to_kill = argv[3]; + std::cerr << "main: Broker marked for death is process ID " + << broker_to_kill + << endl; + } + else + { + std::cerr << "PRODUCER main: there is no broker to kill.\n"; + } + + FailoverConnection connection; + FailoverSession * session; + Message message; + + string program_name = "PRODUCER"; + + + connection.failoverCompleteTime = & failover_complete_time; + connection.name = program_name; + connection.open ( host, port ); + + session = connection.newSession(); + session->name = program_name; + + int send_this_many = 30, + messages_sent = 0; + + while ( messages_sent < send_this_many ) + { + if ( (messages_sent == 13) && broker_to_kill ) + { + char command[1000]; + std::cerr << program_name << " killing broker " << broker_to_kill << ".\n"; + sprintf(command, "kill -9 %s", broker_to_kill); + system ( command ); + gettimeofday ( & broker_killed_time, 0 ); + } + + message.getDeliveryProperties().setRoutingKey("routing_key"); + + std::cerr << "sending message " + << messages_sent + << " of " + << send_this_many + << ".\n"; + + stringstream message_data; + message_data << messages_sent; + message.setData(message_data.str()); + + try + { + /* MICK FIXME + session.messageTransfer ( arg::content=message, + arg::destination="amq.direct" + ); */ + session->messageTransfer ( "amq.direct", + 1, + 0, + message + ); + } + catch ( const std::exception& error) + { + cerr << program_name << " exception: " << error.what() << endl; + } + + sleep ( 1 ); + ++ messages_sent; + } + + message.setData ( "That's all, folks!" ); + + /* MICK FIXME + session.messageTransfer ( arg::content=message, + arg::destination="amq.direct" + ); + */ + session->messageTransfer ( "amq.direct", + 1, + 0, + message + ); + + session->sync(); + connection.close(); + + // This will be incorrect if you killed more than one... + if ( broker_to_kill ) + { + timersub ( & failover_complete_time, + & broker_killed_time, + & duration + ); + fprintf ( stderr, + "Failover time: %ld.%.6ld\n", + duration.tv_sec, + duration.tv_usec + ); + } + + return 0; +} + + + diff --git a/cpp/examples/failover/listener.cpp b/cpp/examples/failover/listener.cpp new file mode 100644 index 0000000000..c58a3b5e71 --- /dev/null +++ b/cpp/examples/failover/listener.cpp @@ -0,0 +1,249 @@ + +#include <qpid/client/FailoverConnection.h> +#include <qpid/client/Session.h> +#include <qpid/client/Message.h> +#include <qpid/client/SubscriptionManager.h> + +#include <unistd.h> +#include <cstdlib> +#include <iostream> +#include <fstream> + + +using namespace qpid::client; +using namespace qpid::framing; + +using namespace std; + + + + +struct Recorder +{ + unsigned int max_messages; + unsigned int * messages_received; + + Recorder ( ) + { + max_messages = 1000; + messages_received = new unsigned int [ max_messages ]; + memset ( messages_received, 0, max_messages * sizeof(int) ); + } + + + void + received ( int i ) + { + messages_received[i] ++; + } + + + + void + report ( ) + { + int i; + + int last_received_message = 0; + + vector<unsigned int> missed_messages, + multiple_messages; + + /*---------------------------------------------------- + Collect indices of missed and multiple messages. + ----------------------------------------------------*/ + bool seen_first_message = false; + for ( i = max_messages - 1; i >= 0; -- i ) + { + if ( ! seen_first_message ) + { + if ( messages_received [i] > 0 ) + { + seen_first_message = true; + last_received_message = i; + } + } + else + { + if ( messages_received [i] == 0 ) + missed_messages.push_back ( i ); + else + if ( messages_received [i] > 1 ) + { + multiple_messages.push_back ( i ); + } + } + } + + /*-------------------------------------------- + Report missed messages. + --------------------------------------------*/ + char const * verb = ( missed_messages.size() == 1 ) + ? " was " + : " were "; + + char const * plural = ( missed_messages.size() == 1 ) + ? "." + : "s."; + + std::cerr << "Listener::shutdown: There" + << verb + << missed_messages.size() + << " missed message" + << plural + << endl; + + for ( i = 0; i < int(missed_messages.size()); ++ i ) + { + std::cerr << " " << i << " was missed.\n"; + } + + + /*-------------------------------------------- + Report multiple messages. + --------------------------------------------*/ + verb = ( multiple_messages.size() == 1 ) + ? " was " + : " were "; + + plural = ( multiple_messages.size() == 1 ) + ? "." + : "s."; + + std::cerr << "Listener::shutdown: There" + << verb + << multiple_messages.size() + << " multiple message" + << plural + << endl; + + for ( i = 0; i < int(multiple_messages.size()); ++ i ) + { + std::cerr << " " + << multiple_messages[i] + << " was received " + << messages_received [ multiple_messages[i] ] + << " times.\n"; + } + + /* + for ( i = 0; i < last_received_message; ++ i ) + { + std::cerr << "Message " << i << ": " << messages_received[i] << std::endl; + } + */ + } + +}; + + + + +struct Listener : public MessageListener +{ + FailoverSubscriptionManager & subscriptionManager; + Recorder & recorder; + + + Listener ( FailoverSubscriptionManager& subs, + Recorder & recorder + ); + + void shutdown() { recorder.report(); } + void parse_message ( std::string const & msg ); + + virtual void received ( Message & message ); +}; + + + + + +Listener::Listener ( FailoverSubscriptionManager & s, Recorder & r ) : + subscriptionManager(s), + recorder(r) +{ +} + + + + + +void +Listener::received ( Message & message ) +{ + std::cerr << "Listener received: " << message.getData() << std::endl; + if (message.getData() == "That's all, folks!") + { + std::cout << "Shutting down listener for " << message.getDestination() + << std::endl; + subscriptionManager.cancel(message.getDestination()); + + shutdown(); + } + else + { + parse_message ( message.getData() ); + } +} + + + + + +void +Listener::parse_message ( const std::string & msg ) +{ + int msg_number; + if(1 != sscanf ( msg.c_str(), "%d", & msg_number ) ) + { + std::cerr << "Listener::parse_message error: Can't read message number from this message: |" << msg_number << "|\n"; + return; + } + recorder.received ( msg_number ); +} + + + + + + +int +main ( int argc, char ** argv ) +{ + string program_name = "LISTENER"; + + if ( argc < 3 ) + { + std::cerr << "Usage: ./listener host cluster_port_file_name\n"; + std::cerr << "i.e. for host: 127.0.0.1\n"; + exit(1); + } + + char const * host = argv[1]; + int port = atoi(argv[2]); + + FailoverConnection connection; + FailoverSession * session; + Recorder recorder; + + connection.name = program_name; + + connection.open ( host, port ); + session = connection.newSession(); + session->name = program_name; + + FailoverSubscriptionManager subscriptions ( session ); + subscriptions.name = program_name; + Listener listener ( subscriptions, recorder ); + subscriptions.subscribe ( listener, "message_queue" ); + subscriptions.run ( ); + + connection.close(); + + return 1; +} + + + + |