summaryrefslogtreecommitdiff
path: root/cpp/examples
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2008-10-10 04:49:48 +0000
committerAlan Conway <aconway@apache.org>2008-10-10 04:49:48 +0000
commit5d07d177cfc5eca21c44981bbe342f0cdcced4e5 (patch)
tree0f5f83642ed5effed52a5e2547565362ce2aea8c /cpp/examples
parente7ceead683231ef2cb35a6ee70488e859f023d12 (diff)
downloadqpid-python-5d07d177cfc5eca21c44981bbe342f0cdcced4e5.tar.gz
QPID-1340 froM Mick Goulish: preliminary client-side failover support.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@703319 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/examples')
-rw-r--r--cpp/examples/Makefile.am2
-rw-r--r--cpp/examples/failover/Makefile.am21
-rw-r--r--cpp/examples/failover/declare_queues.cpp88
-rw-r--r--cpp/examples/failover/direct_producer.cpp148
-rw-r--r--cpp/examples/failover/listener.cpp249
5 files changed, 507 insertions, 1 deletions
diff --git a/cpp/examples/Makefile.am b/cpp/examples/Makefile.am
index 5415cae1a9..57ad22ec78 100644
--- a/cpp/examples/Makefile.am
+++ b/cpp/examples/Makefile.am
@@ -1,4 +1,4 @@
-SUBDIRS = direct fanout pub-sub request-response
+SUBDIRS = direct fanout pub-sub request-response failover
if HAVE_XML
SUBDIRS += xml-exchange
endif
diff --git a/cpp/examples/failover/Makefile.am b/cpp/examples/failover/Makefile.am
new file mode 100644
index 0000000000..8fe6b8cba7
--- /dev/null
+++ b/cpp/examples/failover/Makefile.am
@@ -0,0 +1,21 @@
+examplesdir=$(pkgdatadir)/examples/direct
+
+include $(top_srcdir)/examples/makedist.mk
+
+noinst_PROGRAMS=direct_producer listener declare_queues
+direct_producer_SOURCES=direct_producer.cpp
+direct_producer_LDADD=$(CLIENT_LIB)
+
+listener_SOURCES=listener.cpp
+listener_LDADD=$(CLIENT_LIB)
+
+declare_queues_SOURCES=declare_queues.cpp
+declare_queues_LDADD=$(CLIENT_LIB)
+
+examples_DATA= \
+ direct_producer.cpp \
+ listener.cpp \
+ declare_queues.cpp \
+ $(MAKEDIST)
+
+# FIXME aconway 2008-10-10: add verify scripts.
diff --git a/cpp/examples/failover/declare_queues.cpp b/cpp/examples/failover/declare_queues.cpp
new file mode 100644
index 0000000000..14e4a1e3cb
--- /dev/null
+++ b/cpp/examples/failover/declare_queues.cpp
@@ -0,0 +1,88 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <qpid/client/FailoverConnection.h>
+#include <qpid/client/Session.h>
+
+#include <unistd.h>
+#include <cstdlib>
+#include <iostream>
+#include <fstream>
+
+using namespace qpid::client;
+using namespace qpid::framing;
+
+
+using namespace std;
+
+
+
+
+int
+main ( int argc, char ** argv)
+{
+ if ( argc < 3 )
+ {
+ std::cerr << "Usage: ./declare_queues host cluster_port_file_name\n";
+ std::cerr << "i.e. for host: 127.0.0.1\n";
+ exit(1);
+ }
+
+ const char * host = argv[1];
+ int port = atoi(argv[2]);
+
+
+ try
+ {
+ FailoverConnection connection;
+ FailoverSession * session;
+
+ connection.open ( host, port );
+ session = connection.newSession();
+
+ session->queueDeclare ( "message_queue");
+
+ /*
+ session->exchangeBind
+ ( arg::exchange="amq.direct",
+ arg::queue="message_queue",
+ arg::bindingKey="routing_key"
+ );
+ * */
+ session->exchangeBind ( "message_queue",
+ "amq.direct",
+ "routing_key"
+ );
+ connection.close();
+ return 0;
+ }
+ catch ( const std::exception& error )
+ {
+ std::cout << error.what() << std::endl;
+ }
+
+ return 1;
+}
+
+
+
+
+
diff --git a/cpp/examples/failover/direct_producer.cpp b/cpp/examples/failover/direct_producer.cpp
new file mode 100644
index 0000000000..9a510b6fb3
--- /dev/null
+++ b/cpp/examples/failover/direct_producer.cpp
@@ -0,0 +1,148 @@
+#include <qpid/client/FailoverConnection.h>
+#include <qpid/client/Session.h>
+#include <qpid/client/AsyncSession.h>
+#include <qpid/client/Message.h>
+
+
+#include <unistd.h>
+#include <cstdlib>
+#include <iostream>
+#include <fstream>
+
+#include <sstream>
+
+using namespace qpid::client;
+using namespace qpid::framing;
+
+using namespace std;
+
+
+
+
+int
+main ( int argc, char ** argv)
+{
+ struct timeval broker_killed_time = {0,0},
+ failover_complete_time = {0,0},
+ duration = {0,0};
+
+
+ if ( argc < 3 )
+ {
+ std::cerr << "Usage: ./direct_producer host cluster_port_file_name\n";
+ std::cerr << "i.e. for host: 127.0.0.1\n";
+ exit(1);
+ }
+
+ char const * host = argv[1];
+ int port = atoi(argv[2]);
+ char const * broker_to_kill = 0;
+
+ if ( argc > 3 )
+ {
+ broker_to_kill = argv[3];
+ std::cerr << "main: Broker marked for death is process ID "
+ << broker_to_kill
+ << endl;
+ }
+ else
+ {
+ std::cerr << "PRODUCER main: there is no broker to kill.\n";
+ }
+
+ FailoverConnection connection;
+ FailoverSession * session;
+ Message message;
+
+ string program_name = "PRODUCER";
+
+
+ connection.failoverCompleteTime = & failover_complete_time;
+ connection.name = program_name;
+ connection.open ( host, port );
+
+ session = connection.newSession();
+ session->name = program_name;
+
+ int send_this_many = 30,
+ messages_sent = 0;
+
+ while ( messages_sent < send_this_many )
+ {
+ if ( (messages_sent == 13) && broker_to_kill )
+ {
+ char command[1000];
+ std::cerr << program_name << " killing broker " << broker_to_kill << ".\n";
+ sprintf(command, "kill -9 %s", broker_to_kill);
+ system ( command );
+ gettimeofday ( & broker_killed_time, 0 );
+ }
+
+ message.getDeliveryProperties().setRoutingKey("routing_key");
+
+ std::cerr << "sending message "
+ << messages_sent
+ << " of "
+ << send_this_many
+ << ".\n";
+
+ stringstream message_data;
+ message_data << messages_sent;
+ message.setData(message_data.str());
+
+ try
+ {
+ /* MICK FIXME
+ session.messageTransfer ( arg::content=message,
+ arg::destination="amq.direct"
+ ); */
+ session->messageTransfer ( "amq.direct",
+ 1,
+ 0,
+ message
+ );
+ }
+ catch ( const std::exception& error)
+ {
+ cerr << program_name << " exception: " << error.what() << endl;
+ }
+
+ sleep ( 1 );
+ ++ messages_sent;
+ }
+
+ message.setData ( "That's all, folks!" );
+
+ /* MICK FIXME
+ session.messageTransfer ( arg::content=message,
+ arg::destination="amq.direct"
+ );
+ */
+ session->messageTransfer ( "amq.direct",
+ 1,
+ 0,
+ message
+ );
+
+ session->sync();
+ connection.close();
+
+ // This will be incorrect if you killed more than one...
+ if ( broker_to_kill )
+ {
+ timersub ( & failover_complete_time,
+ & broker_killed_time,
+ & duration
+ );
+ fprintf ( stderr,
+ "Failover time: %ld.%.6ld\n",
+ duration.tv_sec,
+ duration.tv_usec
+ );
+ }
+
+ return 0;
+}
+
+
+
diff --git a/cpp/examples/failover/listener.cpp b/cpp/examples/failover/listener.cpp
new file mode 100644
index 0000000000..c58a3b5e71
--- /dev/null
+++ b/cpp/examples/failover/listener.cpp
@@ -0,0 +1,249 @@
+
+#include <qpid/client/FailoverConnection.h>
+#include <qpid/client/Session.h>
+#include <qpid/client/Message.h>
+#include <qpid/client/SubscriptionManager.h>
+
+#include <unistd.h>
+#include <cstdlib>
+#include <iostream>
+#include <fstream>
+
+
+using namespace qpid::client;
+using namespace qpid::framing;
+
+using namespace std;
+
+
+
+
+struct Recorder
+{
+ unsigned int max_messages;
+ unsigned int * messages_received;
+
+ Recorder ( )
+ {
+ max_messages = 1000;
+ messages_received = new unsigned int [ max_messages ];
+ memset ( messages_received, 0, max_messages * sizeof(int) );
+ }
+
+
+ void
+ received ( int i )
+ {
+ messages_received[i] ++;
+ }
+
+
+
+ void
+ report ( )
+ {
+ int i;
+
+ int last_received_message = 0;
+
+ vector<unsigned int> missed_messages,
+ multiple_messages;
+
+ /*----------------------------------------------------
+ Collect indices of missed and multiple messages.
+ ----------------------------------------------------*/
+ bool seen_first_message = false;
+ for ( i = max_messages - 1; i >= 0; -- i )
+ {
+ if ( ! seen_first_message )
+ {
+ if ( messages_received [i] > 0 )
+ {
+ seen_first_message = true;
+ last_received_message = i;
+ }
+ }
+ else
+ {
+ if ( messages_received [i] == 0 )
+ missed_messages.push_back ( i );
+ else
+ if ( messages_received [i] > 1 )
+ {
+ multiple_messages.push_back ( i );
+ }
+ }
+ }
+
+ /*--------------------------------------------
+ Report missed messages.
+ --------------------------------------------*/
+ char const * verb = ( missed_messages.size() == 1 )
+ ? " was "
+ : " were ";
+
+ char const * plural = ( missed_messages.size() == 1 )
+ ? "."
+ : "s.";
+
+ std::cerr << "Listener::shutdown: There"
+ << verb
+ << missed_messages.size()
+ << " missed message"
+ << plural
+ << endl;
+
+ for ( i = 0; i < int(missed_messages.size()); ++ i )
+ {
+ std::cerr << " " << i << " was missed.\n";
+ }
+
+
+ /*--------------------------------------------
+ Report multiple messages.
+ --------------------------------------------*/
+ verb = ( multiple_messages.size() == 1 )
+ ? " was "
+ : " were ";
+
+ plural = ( multiple_messages.size() == 1 )
+ ? "."
+ : "s.";
+
+ std::cerr << "Listener::shutdown: There"
+ << verb
+ << multiple_messages.size()
+ << " multiple message"
+ << plural
+ << endl;
+
+ for ( i = 0; i < int(multiple_messages.size()); ++ i )
+ {
+ std::cerr << " "
+ << multiple_messages[i]
+ << " was received "
+ << messages_received [ multiple_messages[i] ]
+ << " times.\n";
+ }
+
+ /*
+ for ( i = 0; i < last_received_message; ++ i )
+ {
+ std::cerr << "Message " << i << ": " << messages_received[i] << std::endl;
+ }
+ */
+ }
+
+};
+
+
+
+
+struct Listener : public MessageListener
+{
+ FailoverSubscriptionManager & subscriptionManager;
+ Recorder & recorder;
+
+
+ Listener ( FailoverSubscriptionManager& subs,
+ Recorder & recorder
+ );
+
+ void shutdown() { recorder.report(); }
+ void parse_message ( std::string const & msg );
+
+ virtual void received ( Message & message );
+};
+
+
+
+
+
+Listener::Listener ( FailoverSubscriptionManager & s, Recorder & r ) :
+ subscriptionManager(s),
+ recorder(r)
+{
+}
+
+
+
+
+
+void
+Listener::received ( Message & message )
+{
+ std::cerr << "Listener received: " << message.getData() << std::endl;
+ if (message.getData() == "That's all, folks!")
+ {
+ std::cout << "Shutting down listener for " << message.getDestination()
+ << std::endl;
+ subscriptionManager.cancel(message.getDestination());
+
+ shutdown();
+ }
+ else
+ {
+ parse_message ( message.getData() );
+ }
+}
+
+
+
+
+
+void
+Listener::parse_message ( const std::string & msg )
+{
+ int msg_number;
+ if(1 != sscanf ( msg.c_str(), "%d", & msg_number ) )
+ {
+ std::cerr << "Listener::parse_message error: Can't read message number from this message: |" << msg_number << "|\n";
+ return;
+ }
+ recorder.received ( msg_number );
+}
+
+
+
+
+
+
+int
+main ( int argc, char ** argv )
+{
+ string program_name = "LISTENER";
+
+ if ( argc < 3 )
+ {
+ std::cerr << "Usage: ./listener host cluster_port_file_name\n";
+ std::cerr << "i.e. for host: 127.0.0.1\n";
+ exit(1);
+ }
+
+ char const * host = argv[1];
+ int port = atoi(argv[2]);
+
+ FailoverConnection connection;
+ FailoverSession * session;
+ Recorder recorder;
+
+ connection.name = program_name;
+
+ connection.open ( host, port );
+ session = connection.newSession();
+ session->name = program_name;
+
+ FailoverSubscriptionManager subscriptions ( session );
+ subscriptions.name = program_name;
+ Listener listener ( subscriptions, recorder );
+ subscriptions.subscribe ( listener, "message_queue" );
+ subscriptions.run ( );
+
+ connection.close();
+
+ return 1;
+}
+
+
+
+