diff options
-rw-r--r-- | cpp/examples/failover/Makefile.am | 10 | ||||
-rw-r--r-- | cpp/examples/failover/direct_producer.cpp | 116 | ||||
-rw-r--r-- | cpp/examples/failover/listener.cpp | 124 | ||||
-rw-r--r-- | cpp/src/Makefile.am | 7 | ||||
-rw-r--r-- | cpp/src/qpid/client/FailoverConnection.cpp | 197 | ||||
-rw-r--r-- | cpp/src/qpid/client/FailoverConnection.h | 99 | ||||
-rw-r--r-- | cpp/src/qpid/client/FailoverSession.cpp | 1466 | ||||
-rw-r--r-- | cpp/src/qpid/client/FailoverSession.h | 319 | ||||
-rw-r--r-- | cpp/src/qpid/client/FailoverSubscriptionManager.cpp | 248 | ||||
-rw-r--r-- | cpp/src/qpid/client/FailoverSubscriptionManager.h | 116 |
10 files changed, 1 insertions, 2701 deletions
diff --git a/cpp/examples/failover/Makefile.am b/cpp/examples/failover/Makefile.am index 36969dbd36..a5edd6565a 100644 --- a/cpp/examples/failover/Makefile.am +++ b/cpp/examples/failover/Makefile.am @@ -2,13 +2,7 @@ examplesdir=$(pkgdatadir)/examples/failover include $(top_srcdir)/examples/makedist.mk -noinst_PROGRAMS=direct_producer listener declare_queues resuming_receiver replaying_sender - -direct_producer_SOURCES=direct_producer.cpp -direct_producer_LDADD=$(CLIENT_LIB) - -listener_SOURCES=listener.cpp -listener_LDADD=$(CLIENT_LIB) +noinst_PROGRAMS=declare_queues resuming_receiver replaying_sender declare_queues_SOURCES=declare_queues.cpp declare_queues_LDADD=$(CLIENT_LIB) @@ -20,8 +14,6 @@ replaying_sender_SOURCES=replaying_sender.cpp replaying_sender_LDADD=$(CLIENT_LIB) examples_DATA= \ - direct_producer.cpp \ - listener.cpp \ declare_queues.cpp \ resuming_receiver.cpp \ replaying_sender.cpp \ diff --git a/cpp/examples/failover/direct_producer.cpp b/cpp/examples/failover/direct_producer.cpp deleted file mode 100644 index 2a0104a994..0000000000 --- a/cpp/examples/failover/direct_producer.cpp +++ /dev/null @@ -1,116 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include <qpid/client/FailoverConnection.h> -#include <qpid/client/Session.h> -#include <qpid/client/AsyncSession.h> -#include <qpid/client/Message.h> - - -#include <iostream> -#include <sstream> - -using namespace qpid::client; -using namespace qpid::framing; - -using namespace std; - -int -main ( int argc, char ** argv) -{ - - const char* host = argc>1 ? argv[1] : "127.0.0.1"; - int port = argc>2 ? atoi(argv[2]) : 5672; - int count = argc>3 ? atoi(argv[3]) : 30; - string program_name = "PRODUCER"; - - - try { - FailoverConnection connection; - FailoverSession * session; - Message message; - - connection.open ( host, port ); - session = connection.newSession(); - bool report = true; - int sent = 0; - while ( sent < count ) { - - message.getDeliveryProperties().setRoutingKey("routing_key"); - - - if ( count > 1000 ) - report = !(sent % 1000); - - report = false; - - if ( report ) - { - std::cout << "sending message " - << sent - << ".\n"; - } - - stringstream message_data; - message_data << sent; - message.setData(message_data.str()); - - /* FIXME mgoulish 21 oct 08 - session.messageTransfer ( arg::content=message, - arg::destination="amq.direct" - ); */ - session->messageTransfer ( "amq.direct", - 1, - 0, - message - ); - - ++ sent; - } - message.setData ( "That's all, folks!" ); - - /* FIXME mgoulish 21 oct 08 - session.messageTransfer ( arg::content=message, - arg::destination="amq.direct" - ); - */ - session->messageTransfer ( "amq.direct", - 1, - 0, - message - ); - - session->sync(); - connection.close(); - std::cout << program_name - << " sent " - << sent - << " messages.\n"; - - std::cout << program_name << ": " << " completed without error." << std::endl; - return 0; - } catch(const std::exception& error) { - std::cout << program_name << ": " << error.what() << std::endl; - std::cout << program_name << "Exiting.\n"; - return 1; - } - return 1; -} diff --git a/cpp/examples/failover/listener.cpp b/cpp/examples/failover/listener.cpp deleted file mode 100644 index 82913a521a..0000000000 --- a/cpp/examples/failover/listener.cpp +++ /dev/null @@ -1,124 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include <qpid/client/FailoverConnection.h> -#include <qpid/client/Session.h> -#include <qpid/client/Message.h> -#include <qpid/client/SubscriptionManager.h> - -#include <iostream> -#include <fstream> - - -using namespace qpid::client; -using namespace qpid::framing; - -using namespace std; - - -struct Listener : public MessageListener -{ - FailoverSubscriptionManager & subscriptionManager; - - Listener ( FailoverSubscriptionManager& subs ); - - void shutdown() { subscriptionManager.stop(); } - - virtual void received ( Message & message ); - - int count; -}; - - - - - -Listener::Listener ( FailoverSubscriptionManager & s ) : - subscriptionManager(s), - count(0) -{ -} - - - - - -void -Listener::received ( Message & message ) -{ - /* - if(! (count%1000)) - std::cerr << "\t\tListener received: " << message.getData() << std::endl; - * */ - - ++ count; - - if (message.getData() == "That's all, folks!") - { - std::cout << "Shutting down listener for " << message.getDestination() - << std::endl; - - std::cout << "Listener received " << count << " messages.\n"; - subscriptionManager.cancel(message.getDestination()); - shutdown ( ); - } -} - - - - - - - -int -main ( int argc, char ** argv ) -{ - const char* host = argc>1 ? argv[1] : "127.0.0.1"; - int port = argc>2 ? atoi(argv[2]) : 5672; - string program_name = "LISTENER"; - - try { - - FailoverConnection connection; - FailoverSession * session; - - connection.open ( host, port ); - session = connection.newSession(); - - FailoverSubscriptionManager subscriptions ( session ); - Listener listener ( subscriptions ); - subscriptions.subscribe ( listener, "message_queue" ); - subscriptions.run ( ); - - connection.close(); - std::cout << program_name << ": " << " completed without error." << std::endl; - return 0; - - } catch(const std::exception& error) { - std::cout << program_name << ": " << error.what() << std::endl; - } - - return 0; -} - - - - diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am index 774fad3120..020c44cec4 100644 --- a/cpp/src/Makefile.am +++ b/cpp/src/Makefile.am @@ -379,10 +379,7 @@ libqpidclient_la_SOURCES = \ qpid/client/Connector.cpp \ qpid/client/Demux.cpp \ qpid/client/Dispatcher.cpp \ - qpid/client/FailoverConnection.cpp \ qpid/client/FailoverManager.cpp \ - qpid/client/FailoverSession.cpp \ - qpid/client/FailoverSubscriptionManager.cpp \ qpid/client/FailoverListener.h \ qpid/client/FailoverListener.cpp \ qpid/client/Future.cpp \ @@ -519,13 +516,9 @@ nobase_include_HEADERS = \ qpid/client/Demux.h \ qpid/client/Dispatcher.h \ qpid/client/Execution.h \ - qpid/client/FailoverConnection.h \ - qpid/client/FailoverManager.h \ - qpid/client/FailoverSession.h \ qpid/client/Subscription.h \ qpid/client/SubscriptionImpl.h \ qpid/client/SubscriptionSettings.h \ - qpid/client/FailoverSubscriptionManager.h \ qpid/client/FlowControl.h \ qpid/client/Future.h \ qpid/client/FutureCompletion.h \ diff --git a/cpp/src/qpid/client/FailoverConnection.cpp b/cpp/src/qpid/client/FailoverConnection.cpp deleted file mode 100644 index 8b37ba5cfa..0000000000 --- a/cpp/src/qpid/client/FailoverConnection.cpp +++ /dev/null @@ -1,197 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - - - -#include "qpid/log/Statement.h" -#include "qpid/client/FailoverConnection.h" -#include "qpid/client/ConnectionSettings.h" - -#include <iostream> -#include <fstream> - -using namespace std; - - -namespace qpid { -namespace client { - - -FailoverConnection::FailoverConnection ( ) : - failoverCompleteTime(0) -{ - connection.registerFailureCallback - ( boost::bind(&FailoverConnection::failover, this)); -} - -FailoverConnection::~FailoverConnection () {} - -void -FailoverConnection::open ( const std::string& host, - int port, - const std::string& uid, - const std::string& pwd, - const std::string& virtualhost, - uint16_t maxFrameSize -) -{ - ConnectionSettings settings; - - settings.host = host; - settings.port = port; - settings.username = uid; - settings.password = pwd; - settings.virtualhost = virtualhost; - settings.maxFrameSize = maxFrameSize; - settings.host = host; - - open ( settings ); -} - - -void -FailoverConnection::open ( ConnectionSettings & settings ) -{ - connection.open ( settings ); -} - - - -void -FailoverConnection::close ( ) -{ - connection.close(); -} - - - -FailoverSession * -FailoverConnection::newSession ( const std::string& /* name */ ) -{ - FailoverSession * fs = new FailoverSession; - sessions.push_back ( fs ); - fs->session = connection.newSession(); - return fs; -} - - - -void -FailoverConnection::resume ( FailoverSession & failoverSession ) -{ - connection.resume ( failoverSession.session ); -} - - -bool -FailoverConnection::isOpen() const -{ - return connection.isOpen(); -} - - -void -FailoverConnection::getKnownBrokers ( std::vector<std::string> & /*v*/ ) -{ -} - - -void -FailoverConnection::registerFailureCallback ( boost::function<void ()> /*fn*/ ) -{ -} - -void -FailoverConnection::failover ( ) -{ - std::vector<FailoverSession *>::iterator sessions_iterator; - - for ( sessions_iterator = sessions.begin(); - sessions_iterator != sessions.end(); - ++ sessions_iterator ) - { - FailoverSession * fs = * sessions_iterator; - fs->failoverStarting(); - } - - std::vector<Url> knownBrokers = connection.getKnownBrokers(); - if (knownBrokers.empty()) - throw Exception(QPID_MSG("FailoverConnection::failover no known brokers.")); - - Connection newConnection; - for (std::vector<Url>::iterator i = knownBrokers.begin(); i != knownBrokers.end(); ++i) { - try { - newConnection.open(*i); - break; - } - catch (const std::exception& e) { - QPID_LOG(info, "Could not fail-over to " << *i << ": " << e.what()); - if ((i + 1) == knownBrokers.end()) - throw; - } - } - - /* - * We have a valid new connection. Tell all the sessions - * (and, through them, their SessionManagers and whatever else) - * that we are about to failover to this new Connection. - */ - - // FIXME aconway 2008-10-10: thread unsafe, possible race with concurrent newSession - for ( sessions_iterator = sessions.begin(); - sessions_iterator < sessions.end(); - ++ sessions_iterator - ) - { - FailoverSession * fs = * sessions_iterator; - fs->prepareForFailover ( newConnection ); - } - - connection = newConnection; - connection.registerFailureCallback - ( boost::bind(&FailoverConnection::failover, this)); - - /* - * Tell all sessions to actually failover to the new connection. - */ - for ( sessions_iterator = sessions.begin(); - sessions_iterator < sessions.end(); - ++ sessions_iterator - ) - { - FailoverSession * fs = * sessions_iterator; - fs->failover ( ); - } - - for ( sessions_iterator = sessions.begin(); - sessions_iterator < sessions.end(); - ++ sessions_iterator - ) - { - FailoverSession * fs = * sessions_iterator; - fs->failoverComplete(); - } -} - - - - -}} // namespace qpid::client diff --git a/cpp/src/qpid/client/FailoverConnection.h b/cpp/src/qpid/client/FailoverConnection.h deleted file mode 100644 index 4a8780afa2..0000000000 --- a/cpp/src/qpid/client/FailoverConnection.h +++ /dev/null @@ -1,99 +0,0 @@ -#ifndef QPID_CLIENT_FAILOVERCONNECTION_H -#define QPID_CLIENT_FAILOVERCONNECTION_H - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - - -#include <string> - -#include "qpid/client/Connection.h" -#include "qpid/client/FailoverConnection.h" -#include "qpid/client/FailoverSession.h" -#include "qpid/client/FailoverSubscriptionManager.h" -#include "qpid/sys/Mutex.h" - - -namespace qpid { -namespace client { - -struct ConnectionSettings; - - -class FailoverConnection -{ - public: - - FailoverConnection ( ); - - ~FailoverConnection ( ); - - void open ( const std::string& host, - int port, - const std::string& uid = "guest", - const std::string& pwd = "guest", - const std::string& virtualhost = "/", - uint16_t maxFrameSize=65535 - ); - - void open ( ConnectionSettings & settings ); - - void close ( ); - - FailoverSession * newSession ( const std::string& name = std::string() ); - - void resume ( FailoverSession & session ); - - bool isOpen() const; - - void getKnownBrokers ( std::vector<std::string> & v ); - - - // public interface specific to Failover: - - void registerFailureCallback ( boost::function<void ()> fn ); - - void failover ( ); - - struct timeval * failoverCompleteTime; - - - private: - - typedef sys::Mutex::ScopedLock Lock; - - sys::Mutex lock; - - Connection connection; - - boost::function<void ()> clientFailoverCallback; - - std::vector<FailoverSession *> sessions; - - - friend class FailoverSession; - friend class FailoverSessionManager; -}; - -}} // namespace qpid::client - - -#endif /*!QPID_CLIENT_FAILOVERCONNECTION_H*/ diff --git a/cpp/src/qpid/client/FailoverSession.cpp b/cpp/src/qpid/client/FailoverSession.cpp deleted file mode 100644 index 5cb887a9f6..0000000000 --- a/cpp/src/qpid/client/FailoverSession.cpp +++ /dev/null @@ -1,1466 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include <iostream> -#include <fstream> - - -#include "qpid/log/Logger.h" -#include "qpid/log/Options.h" -#include "qpid/log/Statement.h" -#include "qpid/sys/Time.h" - -#include "qpid/client/FailoverConnection.h" -#include "qpid/client/FailoverSession.h" - - -using namespace std; - - -namespace qpid { -namespace client { - -FailoverSession::FailoverSession ( ) : - failover_in_progress(false), - failover_count(0) -{ - // The session is created by FailoverConnection::newSession - failoverSubscriptionManager = 0; -} - - -FailoverSession::~FailoverSession ( ) -{ -} - - - -framing::FrameSet::shared_ptr -FailoverSession::get() -{ - while(1) - { - try - { - return session.get(); - } - catch ( const std::exception& error ) - { - if ( ! failover_in_progress ) - throw ( error ); - else - { - sys::Monitor::ScopedLock l(lock); - int current_failover_count = failover_count; - while ( current_failover_count == failover_count ) - lock.wait(); - } - } - } - -} - - -SessionId -FailoverSession::getId() -{ - while(1) - { - try - { - return session.getId(); - } - catch ( const std::exception& error ) - { - if ( ! failover_in_progress ) - throw ( error ); - else - { - sys::Monitor::ScopedLock l(lock); - int current_failover_count = failover_count; - while ( current_failover_count == failover_count ) - lock.wait(); - } - } - } -} - - -void -FailoverSession::close() -{ - while(1) - { - try - { - session.close(); - return; - } - catch ( const std::exception& error ) - { - if ( ! failover_in_progress ) - throw ( error ); - else - { - sys::Monitor::ScopedLock l(lock); - int current_failover_count = failover_count; - while ( current_failover_count == failover_count ) - lock.wait(); - } - } - } -} - - -void -FailoverSession::sync() -{ - while(1) - { - try - { - session.sync(); - return; - } - catch ( const std::exception& error ) - { - if ( ! failover_in_progress ) - throw ( error ); - else - { - sys::Monitor::ScopedLock l(lock); - int current_failover_count = failover_count; - while ( current_failover_count == failover_count ) - lock.wait(); - } - } - } - -} - - -uint32_t -FailoverSession::timeout(uint32_t /*seconds*/ ) -{ - // FIXME mgoulish return session.timeout ( seconds ); - return 0; -} - - -Execution& -FailoverSession::getExecution() -{ - while(1) - { - try - { - return session.getExecution(); - } - catch ( const std::exception& error ) - { - if ( ! failover_in_progress ) - throw ( error ); - else - { - sys::Monitor::ScopedLock l(lock); - int current_failover_count = failover_count; - while ( current_failover_count == failover_count ) - lock.wait(); - } - } - } - -} - - -void -FailoverSession::flush() -{ - while(1) - { - try - { - session.flush(); - return; - } - catch ( const std::exception& error ) - { - if ( ! failover_in_progress ) - throw ( error ); - else - { - sys::Monitor::ScopedLock l(lock); - int current_failover_count = failover_count; - while ( current_failover_count == failover_count ) - lock.wait(); - } - } - } - -} - - -void -FailoverSession::markCompleted(const framing::SequenceNumber& id, - bool cumulative, - bool notifyPeer -) -{ - while(1) - { - try - { - session.markCompleted ( id, cumulative, notifyPeer ); - return; - } - catch ( const std::exception& error ) - { - if ( ! failover_in_progress ) - throw ( error ); - else - { - sys::Monitor::ScopedLock l(lock); - int current_failover_count = failover_count; - while ( current_failover_count == failover_count ) - lock.wait(); - } - } - } - -} - - - -// Wrapped functions from Session ---------------------------- - -void -FailoverSession::executionSync() -{ - while(1) - { - try - { - session.executionSync(); - return; - } - catch ( const std::exception& error ) - { - if ( ! failover_in_progress ) - throw ( error ); - else - { - sys::Monitor::ScopedLock l(lock); - int current_failover_count = failover_count; - while ( current_failover_count == failover_count ) - lock.wait(); - } - } - } - -} - - - -void -FailoverSession::executionResult ( const SequenceNumber& commandId, - const string& value -) -{ - while(1) - { - try - { - session.executionResult ( commandId, - value - ); - return; - } - catch ( const std::exception& error ) - { - if ( ! failover_in_progress ) - throw ( error ); - else - { - sys::Monitor::ScopedLock l(lock); - int current_failover_count = failover_count; - while ( current_failover_count == failover_count ) - lock.wait(); - } - } - } - -} - - - -void -FailoverSession::executionException ( uint16_t errorCode, - const SequenceNumber& commandId, - uint8_t classCode, - uint8_t commandCode, - uint8_t fieldIndex, - const string& description, - const FieldTable& errorInfo -) -{ - while(1) - { - try - { - session.executionException ( errorCode, - commandId, - classCode, - commandCode, - fieldIndex, - description, - errorInfo - ); - return; - } - catch ( const std::exception& error ) - { - if ( ! failover_in_progress ) - throw ( error ); - else - { - sys::Monitor::ScopedLock l(lock); - int current_failover_count = failover_count; - while ( current_failover_count == failover_count ) - lock.wait(); - } - } - } - -} - - - -void -FailoverSession::messageTransfer ( const string& destination, - uint8_t acceptMode, - uint8_t acquireMode, - const MethodContent& content -) -{ - - while ( 1 ) - { - try - { - session.messageTransfer ( destination, - acceptMode, - acquireMode, - content - ); - return; - } - catch ( ... ) - { - // Take special action only if there is a failover in progress. - if ( ! failover_in_progress ) - break; - - qpid::sys::usleep ( 1000 ); - } - } -} - - - -void -FailoverSession::messageAccept ( const SequenceSet& transfers ) -{ - while(1) - { - try - { - session.messageAccept ( transfers ); - return; - } - catch ( const std::exception& error ) - { - if ( ! failover_in_progress ) - throw ( error ); - else - { - sys::Monitor::ScopedLock l(lock); - int current_failover_count = failover_count; - while ( current_failover_count == failover_count ) - lock.wait(); - } - } - } - -} - - - -void -FailoverSession::messageReject ( const SequenceSet& transfers, - uint16_t code, - const string& text -) -{ - while(1) - { - try - { - session.messageReject ( transfers, - code, - text - ); - return; - } - catch ( const std::exception& error ) - { - if ( ! failover_in_progress ) - throw ( error ); - else - { - sys::Monitor::ScopedLock l(lock); - int current_failover_count = failover_count; - while ( current_failover_count == failover_count ) - lock.wait(); - } - } - } - -} - - - -void -FailoverSession::messageRelease ( const SequenceSet& transfers, - bool setRedelivered -) -{ - while(1) - { - try - { - session.messageRelease ( transfers, - setRedelivered - ); - return; - } - catch ( const std::exception& error ) - { - if ( ! failover_in_progress ) - throw ( error ); - else - { - sys::Monitor::ScopedLock l(lock); - int current_failover_count = failover_count; - while ( current_failover_count == failover_count ) - lock.wait(); - } - } - } - -} - - - -qpid::framing::MessageAcquireResult -FailoverSession::messageAcquire ( const SequenceSet& transfers ) -{ - while(1) - { - try - { - return session.messageAcquire ( transfers ); - } - catch ( const std::exception& error ) - { - if ( ! failover_in_progress ) - throw ( error ); - else - { - sys::Monitor::ScopedLock l(lock); - int current_failover_count = failover_count; - while ( current_failover_count == failover_count ) - lock.wait(); - } - } - } - -} - - - -qpid::framing::MessageResumeResult -FailoverSession::messageResume ( const string& destination, - const string& resumeId -) -{ - while(1) - { - try - { - return session.messageResume ( destination, - resumeId - ); - } - catch ( const std::exception& error ) - { - if ( ! failover_in_progress ) - throw ( error ); - else - { - sys::Monitor::ScopedLock l(lock); - int current_failover_count = failover_count; - while ( current_failover_count == failover_count ) - lock.wait(); - } - } - } - -} - - - -void -FailoverSession::messageSubscribe ( const string& queue, - const string& destination, - uint8_t acceptMode, - uint8_t acquireMode, - bool exclusive, - const string& resumeId, - uint64_t resumeTtl, - const FieldTable& arguments -) -{ - while(1) - { - try - { - session.messageSubscribe ( queue, - destination, - acceptMode, - acquireMode, - exclusive, - resumeId, - resumeTtl, - arguments - ); - return; - } - catch ( const std::exception& error ) - { - if ( ! failover_in_progress ) - throw ( error ); - else - { - sys::Monitor::ScopedLock l(lock); - int current_failover_count = failover_count; - while ( current_failover_count == failover_count ) - lock.wait(); - } - } - } - -} - - - -void -FailoverSession::messageCancel ( const string& destinations ) -{ - while(1) - { - try - { - session.messageCancel ( destinations ); - return; - } - catch ( const std::exception& error ) - { - if ( ! failover_in_progress ) - throw ( error ); - else - { - sys::Monitor::ScopedLock l(lock); - int current_failover_count = failover_count; - while ( current_failover_count == failover_count ) - lock.wait(); - } - } - } - -} - - - -void -FailoverSession::messageSetFlowMode ( const string& destination, - uint8_t flowMode -) -{ - while(1) - { - try - { - session.messageSetFlowMode ( destination, - flowMode - ); - return; - } - catch ( const std::exception& error ) - { - if ( ! failover_in_progress ) - throw ( error ); - else - { - sys::Monitor::ScopedLock l(lock); - int current_failover_count = failover_count; - while ( current_failover_count == failover_count ) - lock.wait(); - } - } - } -} - - - -void -FailoverSession::messageFlow(const string& destination, - uint8_t unit, - uint32_t value) -{ - while(1) - { - try - { - session.messageFlow ( destination, - unit, - value - ); - return; - } - catch ( const std::exception& error ) - { - if ( ! failover_in_progress ) - throw ( error ); - else - { - sys::Monitor::ScopedLock l(lock); - int current_failover_count = failover_count; - while ( current_failover_count == failover_count ) - lock.wait(); - } - } - } -} - - - -void -FailoverSession::messageFlush(const string& destination) -{ - while(1) - { - try - { - session.messageFlush ( destination ); - return; - } - catch ( const std::exception& error ) - { - if ( ! failover_in_progress ) - throw ( error ); - else - { - sys::Monitor::ScopedLock l(lock); - int current_failover_count = failover_count; - while ( current_failover_count == failover_count ) - lock.wait(); - } - } - } -} - - - -void -FailoverSession::messageStop(const string& destination) -{ - while(1) - { - try - { - session.messageStop ( destination ); - return; - } - catch ( const std::exception& error ) - { - if ( ! failover_in_progress ) - throw ( error ); - else - { - sys::Monitor::ScopedLock l(lock); - int current_failover_count = failover_count; - while ( current_failover_count == failover_count ) - lock.wait(); - } - } - } -} - - - -void -FailoverSession::txSelect() -{ - while(1) - { - try - { - session.txSelect ( ); - return; - } - catch ( const std::exception& error ) - { - if ( ! failover_in_progress ) - throw ( error ); - else - { - sys::Monitor::ScopedLock l(lock); - int current_failover_count = failover_count; - while ( current_failover_count == failover_count ) - lock.wait(); - } - } - } -} - - - -void -FailoverSession::txCommit() -{ - while(1) - { - try - { - session.txCommit ( ); - return; - } - catch ( const std::exception& error ) - { - if ( ! failover_in_progress ) - throw ( error ); - else - { - sys::Monitor::ScopedLock l(lock); - int current_failover_count = failover_count; - while ( current_failover_count == failover_count ) - lock.wait(); - } - } - } -} - - - -void -FailoverSession::txRollback() -{ - while(1) - { - try - { - session.txRollback ( ); - return; - } - catch ( const std::exception& error ) - { - if ( ! failover_in_progress ) - throw ( error ); - else - { - sys::Monitor::ScopedLock l(lock); - int current_failover_count = failover_count; - while ( current_failover_count == failover_count ) - lock.wait(); - } - } - } -} - - - -void -FailoverSession::dtxSelect() -{ - while(1) - { - try - { - session.dtxSelect ( ); - return; - } - catch ( const std::exception& error ) - { - if ( ! failover_in_progress ) - throw ( error ); - else - { - sys::Monitor::ScopedLock l(lock); - int current_failover_count = failover_count; - while ( current_failover_count == failover_count ) - lock.wait(); - } - } - } -} - - - -qpid::framing::XaResult -FailoverSession::dtxStart(const Xid& xid, - bool join, - bool resume) -{ - while(1) - { - try - { - return session.dtxStart ( xid, - join, - resume - ); - } - catch ( const std::exception& error ) - { - if ( ! failover_in_progress ) - throw ( error ); - else - { - sys::Monitor::ScopedLock l(lock); - int current_failover_count = failover_count; - while ( current_failover_count == failover_count ) - lock.wait(); - } - } - } -} - - - -qpid::framing::XaResult -FailoverSession::dtxEnd(const Xid& xid, - bool fail, - bool suspend) -{ - while(1) - { - try - { - return session.dtxEnd ( xid, - fail, - suspend - ); - } - catch ( const std::exception& error ) - { - if ( ! failover_in_progress ) - throw ( error ); - else - { - sys::Monitor::ScopedLock l(lock); - int current_failover_count = failover_count; - while ( current_failover_count == failover_count ) - lock.wait(); - } - } - } -} - - - -qpid::framing::XaResult -FailoverSession::dtxCommit(const Xid& xid, - bool onePhase) -{ - while(1) - { - try - { - return session.dtxCommit ( xid, - onePhase - ); - } - catch ( const std::exception& error ) - { - if ( ! failover_in_progress ) - throw ( error ); - else - { - sys::Monitor::ScopedLock l(lock); - int current_failover_count = failover_count; - while ( current_failover_count == failover_count ) - lock.wait(); - } - } - } -} - - - -void -FailoverSession::dtxForget(const Xid& xid) -{ - while(1) - { - try - { - session.dtxForget ( xid ); - return; - } - catch ( const std::exception& error ) - { - if ( ! failover_in_progress ) - throw ( error ); - else - { - sys::Monitor::ScopedLock l(lock); - int current_failover_count = failover_count; - while ( current_failover_count == failover_count ) - lock.wait(); - } - } - } -} - - - -qpid::framing::DtxGetTimeoutResult -FailoverSession::dtxGetTimeout(const Xid& xid) -{ - while(1) - { - try - { - return session.dtxGetTimeout ( xid ); - } - catch ( const std::exception& error ) - { - if ( ! failover_in_progress ) - throw ( error ); - else - { - sys::Monitor::ScopedLock l(lock); - int current_failover_count = failover_count; - while ( current_failover_count == failover_count ) - lock.wait(); - } - } - } -} - - - -qpid::framing::XaResult -FailoverSession::dtxPrepare(const Xid& xid) -{ - while(1) - { - try - { - return session.dtxPrepare ( xid ); - } - catch ( const std::exception& error ) - { - if ( ! failover_in_progress ) - throw ( error ); - else - { - sys::Monitor::ScopedLock l(lock); - int current_failover_count = failover_count; - while ( current_failover_count == failover_count ) - lock.wait(); - } - } - } -} - - - -qpid::framing::DtxRecoverResult -FailoverSession::dtxRecover() -{ - while(1) - { - try - { - return session.dtxRecover ( ); - } - catch ( const std::exception& error ) - { - if ( ! failover_in_progress ) - throw ( error ); - else - { - sys::Monitor::ScopedLock l(lock); - int current_failover_count = failover_count; - while ( current_failover_count == failover_count ) - lock.wait(); - } - } - } -} - - - -qpid::framing::XaResult -FailoverSession::dtxRollback(const Xid& xid) -{ - while(1) - { - try - { - return session.dtxRollback ( xid ); - } - catch ( const std::exception& error ) - { - if ( ! failover_in_progress ) - throw ( error ); - else - { - sys::Monitor::ScopedLock l(lock); - int current_failover_count = failover_count; - while ( current_failover_count == failover_count ) - lock.wait(); - } - } - } -} - - - -void -FailoverSession::dtxSetTimeout(const Xid& xid, - uint32_t timeout) -{ - while(1) - { - try - { - session.dtxSetTimeout ( xid, - timeout - ); - return; - } - catch ( const std::exception& error ) - { - if ( ! failover_in_progress ) - throw ( error ); - else - { - sys::Monitor::ScopedLock l(lock); - int current_failover_count = failover_count; - while ( current_failover_count == failover_count ) - lock.wait(); - } - } - } -} - - - -void -FailoverSession::exchangeDeclare(const string& exchange, - const string& type, - const string& alternateExchange, - bool passive, - bool durable, - bool autoDelete, - const FieldTable& arguments) -{ - while(1) - { - try - { - session.exchangeDeclare ( exchange, - type, - alternateExchange, - passive, - durable, - autoDelete, - arguments - ); - return; - } - catch ( const std::exception& error ) - { - if ( ! failover_in_progress ) - throw ( error ); - else - { - sys::Monitor::ScopedLock l(lock); - int current_failover_count = failover_count; - while ( current_failover_count == failover_count ) - lock.wait(); - } - } - } -} - - - -void -FailoverSession::exchangeDelete(const string& exchange, - bool ifUnused) -{ - while(1) - { - try - { - session.exchangeDelete ( exchange, - ifUnused - ); - return; - } - catch ( const std::exception& error ) - { - if ( ! failover_in_progress ) - throw ( error ); - else - { - sys::Monitor::ScopedLock l(lock); - int current_failover_count = failover_count; - while ( current_failover_count == failover_count ) - lock.wait(); - } - } - } -} - - - -qpid::framing::ExchangeQueryResult -FailoverSession::exchangeQuery(const string& name) -{ - while(1) - { - try - { - return session.exchangeQuery ( name ); - } - catch ( const std::exception& error ) - { - if ( ! failover_in_progress ) - throw ( error ); - else - { - sys::Monitor::ScopedLock l(lock); - int current_failover_count = failover_count; - while ( current_failover_count == failover_count ) - lock.wait(); - } - } - } -} - - - -void -FailoverSession::exchangeBind(const string& queue, - const string& exchange, - const string& bindingKey, - const FieldTable& arguments) -{ - while(1) - { - try - { - session.exchangeBind ( queue, - exchange, - bindingKey, - arguments - ); - return; - } - catch ( const std::exception& error ) - { - if ( ! failover_in_progress ) - throw ( error ); - else - { - sys::Monitor::ScopedLock l(lock); - int current_failover_count = failover_count; - while ( current_failover_count == failover_count ) - lock.wait(); - } - } - } -} - - - -void -FailoverSession::exchangeUnbind(const string& queue, - const string& exchange, - const string& bindingKey) -{ - while(1) - { - try - { - session.exchangeUnbind ( queue, - exchange, - bindingKey - ); - return; - } - catch ( const std::exception& error ) - { - if ( ! failover_in_progress ) - throw ( error ); - else - { - sys::Monitor::ScopedLock l(lock); - int current_failover_count = failover_count; - while ( current_failover_count == failover_count ) - lock.wait(); - } - } - } -} - - - -qpid::framing::ExchangeBoundResult -FailoverSession::exchangeBound(const string& exchange, - const string& queue, - const string& bindingKey, - const FieldTable& arguments) -{ - while(1) - { - try - { - return session.exchangeBound ( exchange, - queue, - bindingKey, - arguments - ); - } - catch ( const std::exception& error ) - { - if ( ! failover_in_progress ) - throw ( error ); - else - { - sys::Monitor::ScopedLock l(lock); - int current_failover_count = failover_count; - while ( current_failover_count == failover_count ) - lock.wait(); - } - } - } -} - - - -void -FailoverSession::queueDeclare(const string& queue, - const string& alternateExchange, - bool passive, - bool durable, - bool exclusive, - bool autoDelete, - const FieldTable& arguments) -{ - while(1) - { - try - { - session.queueDeclare ( queue, - alternateExchange, - passive, - durable, - exclusive, - autoDelete, - arguments - ); - return; - } - catch ( const std::exception& error ) - { - if ( ! failover_in_progress ) - throw ( error ); - else - { - sys::Monitor::ScopedLock l(lock); - int current_failover_count = failover_count; - while ( current_failover_count == failover_count ) - lock.wait(); - } - } - } -} - - - -void -FailoverSession::queueDelete(const string& queue, - bool ifUnused, - bool ifEmpty) -{ - while(1) - { - try - { - session.queueDelete ( queue, - ifUnused, - ifEmpty - ); - return; - } - catch ( const std::exception& error ) - { - if ( ! failover_in_progress ) - throw ( error ); - else - { - sys::Monitor::ScopedLock l(lock); - int current_failover_count = failover_count; - while ( current_failover_count == failover_count ) - lock.wait(); - } - } - } -} - - - -void -FailoverSession::queuePurge(const string& queue) -{ - while(1) - { - try - { - session.queuePurge ( queue) ; - return; - } - catch ( const std::exception& error ) - { - if ( ! failover_in_progress ) - throw ( error ); - else - { - sys::Monitor::ScopedLock l(lock); - int current_failover_count = failover_count; - while ( current_failover_count == failover_count ) - lock.wait(); - } - } - } -} - - - -qpid::framing::QueueQueryResult -FailoverSession::queueQuery(const string& queue) -{ - while(1) - { - try - { - return session.queueQuery ( queue ); - } - catch ( const std::exception& error ) - { - if ( ! failover_in_progress ) - throw ( error ); - else - { - sys::Monitor::ScopedLock l(lock); - int current_failover_count = failover_count; - while ( current_failover_count == failover_count ) - lock.wait(); - } - } - } -} - - -// end Wrapped functions from Session --------------------------- - - -// Get ready for a failover. -void -FailoverSession::prepareForFailover ( Connection newConnection ) -{ - failover_in_progress = true; - try - { - newSession = newConnection.newSession(); - } - catch ( const std::exception& /*error*/ ) - { - throw Exception(QPID_MSG("Can't create failover session.")); - } - - if ( failoverSubscriptionManager ) - { - failoverSubscriptionManager->prepareForFailover ( newSession ); - } -} - - -void -FailoverSession::failoverStarting ( ) -{ - sys::Monitor::ScopedLock l(lock); - failover_in_progress = true; -} - - -void -FailoverSession::failoverComplete ( ) -{ - sys::Monitor::ScopedLock l(lock); - failover_in_progress = false; - ++ failover_count; - lock.notifyAll(); -} - - - -void -FailoverSession::failover ( ) -{ - if ( failoverSubscriptionManager ) - { - failoverSubscriptionManager->failover ( ); - } - session = newSession; -} - - -void FailoverSession::setFailoverSubscriptionManager(FailoverSubscriptionManager* fsm) { - failoverSubscriptionManager = fsm; -} - -}} // namespace qpid::client diff --git a/cpp/src/qpid/client/FailoverSession.h b/cpp/src/qpid/client/FailoverSession.h deleted file mode 100644 index 7a743da452..0000000000 --- a/cpp/src/qpid/client/FailoverSession.h +++ /dev/null @@ -1,319 +0,0 @@ -#ifndef QPID_CLIENT_FAILOVERSESSION_H -#define QPID_CLIENT_FAILOVERSESSION_H - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "qpid/client/Session.h" -#include "qpid/SessionId.h" -#include "qpid/framing/amqp_structs.h" -#include "qpid/framing/ProtocolVersion.h" -#include "qpid/framing/MethodContent.h" -#include "qpid/framing/TransferContent.h" -#include "qpid/client/Completion.h" -#include "qpid/client/Connection.h" -#include "qpid/client/ConnectionImpl.h" -#include "qpid/client/Execution.h" -#include "qpid/client/SessionImpl.h" -#include "qpid/client/TypedResult.h" -#include "qpid/shared_ptr.h" -#include "qpid/sys/Monitor.h" - -#include <string> - - - - -namespace qpid { -namespace client { - - -class FailoverConnection; -class FailoverSubscriptionManager; - - -class FailoverSession -{ - public: - - typedef framing::TransferContent DefaultContent; - - FailoverSession ( ); - ~FailoverSession ( ); - - framing::FrameSet::shared_ptr get(); - - SessionId getId(); - - void close(); - - void sync(); - - uint32_t timeout ( uint32_t seconds); - - Execution& getExecution(); - - void flush(); - - void markCompleted(const framing::SequenceNumber& id, - bool cumulative, - bool notifyPeer - ); - - void sendCompletion ( ); - - - - // Wrapped functions from Session ---------------------------- - - void - executionSync(); - - - void - executionResult(const SequenceNumber& commandId=SequenceNumber(), - const string& value=string()); - - - void - executionException(uint16_t errorCode=0, - const SequenceNumber& commandId=SequenceNumber(), - uint8_t classCode=0, - uint8_t commandCode=0, - uint8_t fieldIndex=0, - const string& description=string(), - const FieldTable& errorInfo=FieldTable()); - - - void - messageTransfer(const string& destination=string(), - uint8_t acceptMode=1, - uint8_t acquireMode=0, - const MethodContent& content=DefaultContent(std::string())); - - - void - messageAccept(const SequenceSet& transfers=SequenceSet()); - - - void - messageReject(const SequenceSet& transfers=SequenceSet(), - uint16_t code=0, - const string& text=string()); - - - void - messageRelease(const SequenceSet& transfers=SequenceSet(), - bool setRedelivered=false); - - - qpid::framing::MessageAcquireResult - messageAcquire(const SequenceSet& transfers=SequenceSet()); - - - qpid::framing::MessageResumeResult - messageResume(const string& destination=string(), - const string& resumeId=string()); - - - void - messageSubscribe(const string& queue=string(), - const string& destination=string(), - uint8_t acceptMode=0, - uint8_t acquireMode=0, - bool exclusive=false, - const string& resumeId=string(), - uint64_t resumeTtl=0, - const FieldTable& arguments=FieldTable()); - - - void - messageCancel(const string& destination=string()); - - - void - messageSetFlowMode(const string& destination=string(), - uint8_t flowMode=0); - - - void - messageFlow(const string& destination=string(), - uint8_t unit=0, - uint32_t value=0); - - - void - messageFlush(const string& destination=string()); - - - void - messageStop(const string& destination=string()); - - - void - txSelect(); - - - void - txCommit(); - - - void - txRollback(); - - - void - dtxSelect(); - - - qpid::framing::XaResult - dtxStart(const Xid& xid=Xid(), - bool join=false, - bool resume=false); - - - qpid::framing::XaResult - dtxEnd(const Xid& xid=Xid(), - bool fail=false, - bool suspend=false); - - - qpid::framing::XaResult - dtxCommit(const Xid& xid=Xid(), - bool onePhase=false); - - - void - dtxForget(const Xid& xid=Xid()); - - - qpid::framing::DtxGetTimeoutResult - dtxGetTimeout(const Xid& xid=Xid()); - - - qpid::framing::XaResult - dtxPrepare(const Xid& xid=Xid()); - - - qpid::framing::DtxRecoverResult - dtxRecover(); - - - qpid::framing::XaResult - dtxRollback(const Xid& xid=Xid()); - - - void - dtxSetTimeout(const Xid& xid=Xid(), - uint32_t timeout=0); - - - void - exchangeDeclare(const string& exchange=string(), - const string& type=string(), - const string& alternateExchange=string(), - bool passive=false, - bool durable=false, - bool autoDelete=false, - const FieldTable& arguments=FieldTable()); - - - void - exchangeDelete(const string& exchange=string(), - bool ifUnused=false); - - - qpid::framing::ExchangeQueryResult - exchangeQuery(const string& name=string()); - - - void - exchangeBind(const string& queue=string(), - const string& exchange=string(), - const string& bindingKey=string(), - const FieldTable& arguments=FieldTable()); - - - void - exchangeUnbind(const string& queue=string(), - const string& exchange=string(), - const string& bindingKey=string()); - - - qpid::framing::ExchangeBoundResult - exchangeBound(const string& exchange=string(), - const string& queue=string(), - const string& bindingKey=string(), - const FieldTable& arguments=FieldTable()); - - - void - queueDeclare(const string& queue=string(), - const string& alternateExchange=string(), - bool passive=false, - bool durable=false, - bool exclusive=false, - bool autoDelete=false, - const FieldTable& arguments=FieldTable()); - - - void - queueDelete(const string& queue=string(), - bool ifUnused=false, - bool ifEmpty=false); - - - void - queuePurge(const string& queue=string()); - - - qpid::framing::QueueQueryResult - queueQuery(const string& queue=string()); - - // end Wrapped functions from Session --------------------------- - - // Tells the FailoverSession to get ready for a failover. - void failoverStarting(); - void prepareForFailover ( Connection newConnection ); - void failover ( ); - void failoverComplete(); - - void setFailoverSubscriptionManager(FailoverSubscriptionManager*); - - private: - sys::Monitor lock; - bool failover_in_progress; - int failover_count; - - - FailoverSubscriptionManager * failoverSubscriptionManager; - - Session session; - Session newSession; - - friend class FailoverConnection; - friend class FailoverSubscriptionManager; -}; - -}} // namespace qpid::client - - -#endif /*!QPID_CLIENT_FAILOVERSESSION_H*/ diff --git a/cpp/src/qpid/client/FailoverSubscriptionManager.cpp b/cpp/src/qpid/client/FailoverSubscriptionManager.cpp deleted file mode 100644 index 5fa4cb2800..0000000000 --- a/cpp/src/qpid/client/FailoverSubscriptionManager.cpp +++ /dev/null @@ -1,248 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "qpid/client/FailoverSession.h" -#include "qpid/client/FailoverSubscriptionManager.h" - - - -using namespace std; - - -namespace qpid { -namespace client { - - - -FailoverSubscriptionManager::FailoverSubscriptionManager ( FailoverSession * fs) : - newSessionIsValid(false), - no_failover(false) -{ - subscriptionManager = new SubscriptionManager(fs->session); - fs->setFailoverSubscriptionManager(this); -} - - - -void -FailoverSubscriptionManager::prepareForFailover ( Session _newSession ) -{ - sys::Monitor::ScopedLock l(lock); - newSession = _newSession; - newSessionIsValid = true; -} - - - -void -FailoverSubscriptionManager::failover ( ) -{ - sys::Monitor::ScopedLock l(lock); - // Stop the subscription manager thread so it can notice - // the failover in progress. - subscriptionManager->stop(); - lock.notifyAll(); -} - - - - -void -FailoverSubscriptionManager::subscribe ( MessageListener & listener, - const std::string & queue, - const SubscriptionSettings & settings, - const std::string & tag, - bool record_this -) -{ - sys::Monitor::ScopedLock l(lock); - - subscriptionManager->subscribe ( listener, - queue, - settings, - tag - ); - if ( record_this ) - subscribeFns.push_back ( boost::bind ( (void (FailoverSubscriptionManager::*)(MessageListener&, const std::string&, const SubscriptionSettings&, const std::string&, bool) ) &FailoverSubscriptionManager::subscribe, this, boost::ref(listener), queue, settings, tag, false ) ); -} - - - -void -FailoverSubscriptionManager::subscribe ( LocalQueue & localQueue, - const std::string & queue, - const SubscriptionSettings & settings, - const std::string & tag, - bool record_this -) -{ - sys::Monitor::ScopedLock l(lock); - - subscriptionManager->subscribe ( localQueue, - queue, - settings, - tag - ); - - if ( record_this ) - subscribeFns.push_back ( boost::bind ( (void (FailoverSubscriptionManager::*)(LocalQueue&, const std::string&, const SubscriptionSettings&, const std::string&, bool) ) &FailoverSubscriptionManager::subscribe, this, localQueue, queue, settings, tag, false ) ); -} - - - -void -FailoverSubscriptionManager::subscribe ( MessageListener & listener, - const std::string & queue, - const std::string & tag, - bool record_this -) -{ - sys::Monitor::ScopedLock l(lock); - - subscriptionManager->subscribe ( listener, - queue, - tag - ); - - if ( record_this ) - subscribeFns.push_back ( boost::bind ( (void (FailoverSubscriptionManager::*)(MessageListener&, const std::string&, const std::string&, bool) ) &FailoverSubscriptionManager::subscribe, this, boost::ref(listener), queue, tag, false ) ); -} - - - - -void -FailoverSubscriptionManager::subscribe ( LocalQueue & localQueue, - const std::string & queue, - const std::string & tag, - bool record_this -) -{ - sys::Monitor::ScopedLock l(lock); - - subscriptionManager->subscribe ( localQueue, - queue, - tag - ); - - if ( record_this ) - subscribeFns.push_back ( boost::bind ( (void (FailoverSubscriptionManager::*)(LocalQueue&, const std::string&, const std::string&, bool) ) &FailoverSubscriptionManager::subscribe, this, localQueue, queue, tag, false ) ); -} - - - -bool -FailoverSubscriptionManager::get ( Message & result, - const std::string & queue, - sys::Duration timeout -) -{ - - return subscriptionManager->get ( result, queue, timeout ); -} - - - -void -FailoverSubscriptionManager::cancel ( const std::string tag ) -{ - - subscriptionManager->cancel ( tag ); -} - - - -void -FailoverSubscriptionManager::run ( ) // User Thread -{ - std::vector<subscribeFn> mySubscribeFns; - - while ( 1 ) - { - subscriptionManager->run ( ); - - // When we drop out of run, if there is a new Session - // waiting for us, this is a failover. Otherwise, just - // return control to usercode. - - { - sys::Monitor::ScopedLock l(lock); - - - while ( !newSessionIsValid && !no_failover ) - lock.wait(); - - - if ( newSessionIsValid ) - { - newSessionIsValid = false; - delete subscriptionManager; - subscriptionManager = new SubscriptionManager(newSession); - mySubscribeFns.swap ( subscribeFns ); - } - else - { - // Not a failover, return to user code. - break; - } - } - - for ( std::vector<subscribeFn>::iterator i = mySubscribeFns.begin(); - i != mySubscribeFns.end(); - ++ i - ) - { - (*i) (); - } - - } -} - - -void -FailoverSubscriptionManager::start ( ) -{ - - subscriptionManager->start ( ); -} - - - -void -FailoverSubscriptionManager::setAutoStop ( bool set ) -{ - - subscriptionManager->setAutoStop ( set ); -} - - - -void -FailoverSubscriptionManager::stop ( ) -{ - sys::Monitor::ScopedLock l(lock); - - no_failover = true; - subscriptionManager->stop ( ); - lock.notifyAll(); -} - -}} // namespace qpid::client diff --git a/cpp/src/qpid/client/FailoverSubscriptionManager.h b/cpp/src/qpid/client/FailoverSubscriptionManager.h deleted file mode 100644 index 0556ba15ec..0000000000 --- a/cpp/src/qpid/client/FailoverSubscriptionManager.h +++ /dev/null @@ -1,116 +0,0 @@ -#ifndef QPID_CLIENT_FAILOVERSUBSCRIPTIONMANAGER_H -#define QPID_CLIENT_FAILOVERSUBSCRIPTIONMANAGER_H - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - - -#include "qpid/sys/Mutex.h" -#include <qpid/client/Dispatcher.h> -#include <qpid/client/Completion.h> -#include <qpid/client/Session.h> -#include <qpid/client/FailoverSession.h> -#include <qpid/client/MessageListener.h> -#include <qpid/client/SubscriptionManager.h> -#include <qpid/client/LocalQueue.h> -#include <qpid/client/SubscriptionSettings.h> -#include <qpid/sys/Runnable.h> -#include <qpid/sys/Monitor.h> - - - - -namespace qpid { -namespace client { - - -class FailoverSubscriptionManager -{ - public: - - FailoverSubscriptionManager ( FailoverSession * fs ); - - void subscribe ( MessageListener & listener, - const std::string & queue, - const SubscriptionSettings & , - const std::string & tag = std::string(), - bool record_this = true ); - - void subscribe ( LocalQueue & localQueue, - const std::string & queue, - const SubscriptionSettings & , - const std::string & tag=std::string(), - bool record_this = true ); - - void subscribe ( MessageListener & listener, - const std::string & queue, - const std::string & tag = std::string(), - bool record_this = true ); - - void subscribe ( LocalQueue & localQueue, - const std::string & queue, - const std::string & tag=std::string(), - bool record_this = true ); - - bool get ( Message & result, - const std::string & queue, - sys::Duration timeout=0); - - void cancel ( const std::string tag ); - - void run ( ); - - void start ( ); - - void setAutoStop ( bool set = true ); - - void stop ( ); - - // Get ready for a failover. - void prepareForFailover ( Session newSession ); - void failover ( ); - - - private: - sys::Monitor lock; - - SubscriptionManager * subscriptionManager; - - MessageListener * savedListener; - std::string savedQueue, - savedTag; - - friend class FailoverConnection; - friend class FailoverSession; - - Session newSession; - bool newSessionIsValid; - bool no_failover; - - - typedef boost::function<void ()> subscribeFn; - std::vector < subscribeFn > subscribeFns; -}; - -}} // namespace qpid::client - - -#endif /*!QPID_CLIENT_FAILOVERSUBSCRIPTIONMANAGER_H*/ |