From f83677056891e436bf5ba99e79240df2a44528cd Mon Sep 17 00:00:00 2001 From: "Stephen D. Huston" Date: Fri, 21 Oct 2011 14:42:12 +0000 Subject: Merged out from trunk git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/QPID-2519@1187375 13f79535-47bb-0310-9956-ffa450edef68 --- cpp/examples/old_api/failover/CMakeLists.txt | 22 ++ cpp/examples/old_api/failover/Makefile.am | 47 +++ cpp/examples/old_api/failover/declare_queues.cpp | 61 ++++ .../failover/failover_declare_queues.vcproj | 394 +++++++++++++++++++++ .../failover/failover_replaying_sender.vcproj | 394 +++++++++++++++++++++ .../failover/failover_resuming_receiver.vcproj | 394 +++++++++++++++++++++ cpp/examples/old_api/failover/replaying_sender.cpp | 97 +++++ .../old_api/failover/resuming_receiver.cpp | 127 +++++++ 8 files changed, 1536 insertions(+) create mode 100644 cpp/examples/old_api/failover/CMakeLists.txt create mode 100644 cpp/examples/old_api/failover/Makefile.am create mode 100644 cpp/examples/old_api/failover/declare_queues.cpp create mode 100644 cpp/examples/old_api/failover/failover_declare_queues.vcproj create mode 100644 cpp/examples/old_api/failover/failover_replaying_sender.vcproj create mode 100644 cpp/examples/old_api/failover/failover_resuming_receiver.vcproj create mode 100644 cpp/examples/old_api/failover/replaying_sender.cpp create mode 100644 cpp/examples/old_api/failover/resuming_receiver.cpp (limited to 'cpp/examples/old_api/failover') diff --git a/cpp/examples/old_api/failover/CMakeLists.txt b/cpp/examples/old_api/failover/CMakeLists.txt new file mode 100644 index 0000000000..05db8fad51 --- /dev/null +++ b/cpp/examples/old_api/failover/CMakeLists.txt @@ -0,0 +1,22 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +add_example(failover declare_queues) +add_example(failover resuming_receiver) +add_example(failover replaying_sender) diff --git a/cpp/examples/old_api/failover/Makefile.am b/cpp/examples/old_api/failover/Makefile.am new file mode 100644 index 0000000000..8b1da80f2c --- /dev/null +++ b/cpp/examples/old_api/failover/Makefile.am @@ -0,0 +1,47 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +examplesdir=$(pkgdatadir)/examples/old_api/failover + +MAKELDFLAGS=$(CLIENTFLAGS) +include $(top_srcdir)/examples/makedist.mk + +noinst_PROGRAMS=declare_queues resuming_receiver replaying_sender + +declare_queues_SOURCES=declare_queues.cpp +declare_queues_LDADD=$(CLIENT_LIB) + +resuming_receiver_SOURCES=resuming_receiver.cpp +resuming_receiver_LDADD=$(CLIENT_LIB) + +replaying_sender_SOURCES=replaying_sender.cpp +replaying_sender_LDADD=$(CLIENT_LIB) + +examples_DATA= \ + declare_queues.cpp \ + resuming_receiver.cpp \ + replaying_sender.cpp \ + $(MAKEDIST) + +# FIXME aconway 2008-10-10: add verify scripts. + +EXTRA_DIST= \ + CMakeLists.txt \ + failover_declare_queues.vcproj \ + failover_replaying_sender.vcproj \ + failover_resuming_receiver.vcproj diff --git a/cpp/examples/old_api/failover/declare_queues.cpp b/cpp/examples/old_api/failover/declare_queues.cpp new file mode 100644 index 0000000000..a677870c53 --- /dev/null +++ b/cpp/examples/old_api/failover/declare_queues.cpp @@ -0,0 +1,61 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include +#include +#include + +#include +#include + +using namespace qpid::client; + +using namespace std; + +int main(int argc, char ** argv) +{ + ConnectionSettings settings; + if (argc > 1) settings.host = argv[1]; + if (argc > 2) settings.port = atoi(argv[2]); + + FailoverManager connection(settings); + try { + bool complete = false; + while (!complete) { + Session session = connection.connect().newSession(); + try { + session.queueDeclare(arg::queue="message_queue"); + complete = true; + } catch (const qpid::TransportFailure&) {} + } + connection.close(); + return 0; + } catch (const std::exception& error) { + std::cout << "Failed:" << error.what() << std::endl; + return 1; + } + +} + + + + + diff --git a/cpp/examples/old_api/failover/failover_declare_queues.vcproj b/cpp/examples/old_api/failover/failover_declare_queues.vcproj new file mode 100644 index 0000000000..c87c72affd --- /dev/null +++ b/cpp/examples/old_api/failover/failover_declare_queues.vcproj @@ -0,0 +1,394 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/cpp/examples/old_api/failover/failover_replaying_sender.vcproj b/cpp/examples/old_api/failover/failover_replaying_sender.vcproj new file mode 100644 index 0000000000..6d22fa6770 --- /dev/null +++ b/cpp/examples/old_api/failover/failover_replaying_sender.vcproj @@ -0,0 +1,394 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/cpp/examples/old_api/failover/failover_resuming_receiver.vcproj b/cpp/examples/old_api/failover/failover_resuming_receiver.vcproj new file mode 100644 index 0000000000..ba5061e248 --- /dev/null +++ b/cpp/examples/old_api/failover/failover_resuming_receiver.vcproj @@ -0,0 +1,394 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/cpp/examples/old_api/failover/replaying_sender.cpp b/cpp/examples/old_api/failover/replaying_sender.cpp new file mode 100644 index 0000000000..22a7e1ebd3 --- /dev/null +++ b/cpp/examples/old_api/failover/replaying_sender.cpp @@ -0,0 +1,97 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include +#include +#include +#include +#include +#include + +#include +#include + +using namespace qpid; +using namespace qpid::client; +using namespace qpid::framing; + +using namespace std; + +class Sender : public FailoverManager::Command +{ + public: + Sender(const std::string& queue, uint count); + void execute(AsyncSession& session, bool isRetry); + uint getSent(); + private: + MessageReplayTracker sender; + const uint count; + uint sent; + Message message; + +}; + +Sender::Sender(const std::string& queue, uint count_) : sender(10), count(count_), sent(0) +{ + message.getDeliveryProperties().setRoutingKey(queue); +} + +void Sender::execute(AsyncSession& session, bool isRetry) +{ + if (isRetry) sender.replay(session); + else sender.init(session); + while (sent < count) { + stringstream message_data; + message_data << ++sent; + message.setData(message_data.str()); + message.getHeaders().setInt("sn", sent); + sender.send(message); + if (count > 1000 && !(sent % 1000)) { + std::cout << "sent " << sent << " of " << count << std::endl; + } + } + message.setData("That's all, folks!"); + sender.send(message); +} + +uint Sender::getSent() +{ + return sent; +} + +int main(int argc, char ** argv) +{ + ConnectionSettings settings; + if (argc > 1) settings.host = argv[1]; + if (argc > 2) settings.port = atoi(argv[2]); + + FailoverManager connection(settings); + Sender sender("message_queue", argc > 3 ? atoi(argv[3]) : 1000); + try { + connection.execute(sender); + std::cout << "Sent " << sender.getSent() << " messages." << std::endl; + connection.close(); + return 0; + } catch(const std::exception& error) { + std::cout << "Failed: " << error.what() << std::endl; + } + return 1; +} diff --git a/cpp/examples/old_api/failover/resuming_receiver.cpp b/cpp/examples/old_api/failover/resuming_receiver.cpp new file mode 100644 index 0000000000..d1886ce861 --- /dev/null +++ b/cpp/examples/old_api/failover/resuming_receiver.cpp @@ -0,0 +1,127 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include +#include +#include +#include + +#include +#include + + +using namespace qpid; +using namespace qpid::client; +using namespace qpid::framing; + +using namespace std; + + +class Listener : public MessageListener, + public FailoverManager::Command, + public FailoverManager::ReconnectionStrategy +{ + public: + Listener(); + void received(Message& message); + void execute(AsyncSession& session, bool isRetry); + void check(); + void editUrlList(std::vector& urls); + private: + Subscription subscription; + uint count; + uint skipped; + uint lastSn; + bool gaps; +}; + +Listener::Listener() : count(0), skipped(0), lastSn(0), gaps(false) {} + +void Listener::received(Message & message) +{ + if (message.getData() == "That's all, folks!") { + std::cout << "Shutting down listener for " << message.getDestination() + << std::endl; + + std::cout << "Listener received " << count << " messages (" << skipped << " skipped)" << std::endl; + subscription.cancel(); + } else { + uint sn = message.getHeaders().getAsInt("sn"); + if (lastSn < sn) { + if (sn - lastSn > 1) { + std::cout << "Error: gap in sequence between " << lastSn << " and " << sn << std::endl; + gaps = true; + } + lastSn = sn; + ++count; + } else { + ++skipped; + } + } +} + +void Listener::check() +{ + if (gaps) throw Exception("Detected gaps in sequence; messages appear to have been lost."); +} + +void Listener::execute(AsyncSession& session, bool isRetry) +{ + if (isRetry) { + std::cout << "Resuming from " << count << std::endl; + } + SubscriptionManager subs(session); + subscription = subs.subscribe(*this, "message_queue"); + subs.run(); +} + +void Listener::editUrlList(std::vector& urls) +{ + /** + * A more realistic algorithm would be to search through the list + * for prefered hosts and ensure they come first in the list. + */ + if (urls.size() > 1) std::rotate(urls.begin(), urls.begin() + 1, urls.end()); +} + +int main(int argc, char ** argv) +{ + ConnectionSettings settings; + if (argc > 1) settings.host = argv[1]; + if (argc > 2) settings.port = atoi(argv[2]); + + Listener listener; + FailoverManager connection(settings, &listener); + + try { + connection.execute(listener); + connection.close(); + listener.check(); + std::cout << "Completed without error." << std::endl; + return 0; + } catch(const std::exception& error) { + std::cout << "Failure: " << error.what() << std::endl; + } + return 1; +} + + + -- cgit v1.2.1