summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--cpp/examples/failover/Makefile.am10
-rw-r--r--cpp/examples/failover/direct_producer.cpp116
-rw-r--r--cpp/examples/failover/listener.cpp124
-rw-r--r--cpp/src/Makefile.am7
-rw-r--r--cpp/src/qpid/client/FailoverConnection.cpp197
-rw-r--r--cpp/src/qpid/client/FailoverConnection.h99
-rw-r--r--cpp/src/qpid/client/FailoverSession.cpp1466
-rw-r--r--cpp/src/qpid/client/FailoverSession.h319
-rw-r--r--cpp/src/qpid/client/FailoverSubscriptionManager.cpp248
-rw-r--r--cpp/src/qpid/client/FailoverSubscriptionManager.h116
10 files changed, 1 insertions, 2701 deletions
diff --git a/cpp/examples/failover/Makefile.am b/cpp/examples/failover/Makefile.am
index 36969dbd36..a5edd6565a 100644
--- a/cpp/examples/failover/Makefile.am
+++ b/cpp/examples/failover/Makefile.am
@@ -2,13 +2,7 @@ examplesdir=$(pkgdatadir)/examples/failover
include $(top_srcdir)/examples/makedist.mk
-noinst_PROGRAMS=direct_producer listener declare_queues resuming_receiver replaying_sender
-
-direct_producer_SOURCES=direct_producer.cpp
-direct_producer_LDADD=$(CLIENT_LIB)
-
-listener_SOURCES=listener.cpp
-listener_LDADD=$(CLIENT_LIB)
+noinst_PROGRAMS=declare_queues resuming_receiver replaying_sender
declare_queues_SOURCES=declare_queues.cpp
declare_queues_LDADD=$(CLIENT_LIB)
@@ -20,8 +14,6 @@ replaying_sender_SOURCES=replaying_sender.cpp
replaying_sender_LDADD=$(CLIENT_LIB)
examples_DATA= \
- direct_producer.cpp \
- listener.cpp \
declare_queues.cpp \
resuming_receiver.cpp \
replaying_sender.cpp \
diff --git a/cpp/examples/failover/direct_producer.cpp b/cpp/examples/failover/direct_producer.cpp
deleted file mode 100644
index 2a0104a994..0000000000
--- a/cpp/examples/failover/direct_producer.cpp
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include <qpid/client/FailoverConnection.h>
-#include <qpid/client/Session.h>
-#include <qpid/client/AsyncSession.h>
-#include <qpid/client/Message.h>
-
-
-#include <iostream>
-#include <sstream>
-
-using namespace qpid::client;
-using namespace qpid::framing;
-
-using namespace std;
-
-int
-main ( int argc, char ** argv)
-{
-
- const char* host = argc>1 ? argv[1] : "127.0.0.1";
- int port = argc>2 ? atoi(argv[2]) : 5672;
- int count = argc>3 ? atoi(argv[3]) : 30;
- string program_name = "PRODUCER";
-
-
- try {
- FailoverConnection connection;
- FailoverSession * session;
- Message message;
-
- connection.open ( host, port );
- session = connection.newSession();
- bool report = true;
- int sent = 0;
- while ( sent < count ) {
-
- message.getDeliveryProperties().setRoutingKey("routing_key");
-
-
- if ( count > 1000 )
- report = !(sent % 1000);
-
- report = false;
-
- if ( report )
- {
- std::cout << "sending message "
- << sent
- << ".\n";
- }
-
- stringstream message_data;
- message_data << sent;
- message.setData(message_data.str());
-
- /* FIXME mgoulish 21 oct 08
- session.messageTransfer ( arg::content=message,
- arg::destination="amq.direct"
- ); */
- session->messageTransfer ( "amq.direct",
- 1,
- 0,
- message
- );
-
- ++ sent;
- }
- message.setData ( "That's all, folks!" );
-
- /* FIXME mgoulish 21 oct 08
- session.messageTransfer ( arg::content=message,
- arg::destination="amq.direct"
- );
- */
- session->messageTransfer ( "amq.direct",
- 1,
- 0,
- message
- );
-
- session->sync();
- connection.close();
- std::cout << program_name
- << " sent "
- << sent
- << " messages.\n";
-
- std::cout << program_name << ": " << " completed without error." << std::endl;
- return 0;
- } catch(const std::exception& error) {
- std::cout << program_name << ": " << error.what() << std::endl;
- std::cout << program_name << "Exiting.\n";
- return 1;
- }
- return 1;
-}
diff --git a/cpp/examples/failover/listener.cpp b/cpp/examples/failover/listener.cpp
deleted file mode 100644
index 82913a521a..0000000000
--- a/cpp/examples/failover/listener.cpp
+++ /dev/null
@@ -1,124 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include <qpid/client/FailoverConnection.h>
-#include <qpid/client/Session.h>
-#include <qpid/client/Message.h>
-#include <qpid/client/SubscriptionManager.h>
-
-#include <iostream>
-#include <fstream>
-
-
-using namespace qpid::client;
-using namespace qpid::framing;
-
-using namespace std;
-
-
-struct Listener : public MessageListener
-{
- FailoverSubscriptionManager & subscriptionManager;
-
- Listener ( FailoverSubscriptionManager& subs );
-
- void shutdown() { subscriptionManager.stop(); }
-
- virtual void received ( Message & message );
-
- int count;
-};
-
-
-
-
-
-Listener::Listener ( FailoverSubscriptionManager & s ) :
- subscriptionManager(s),
- count(0)
-{
-}
-
-
-
-
-
-void
-Listener::received ( Message & message )
-{
- /*
- if(! (count%1000))
- std::cerr << "\t\tListener received: " << message.getData() << std::endl;
- * */
-
- ++ count;
-
- if (message.getData() == "That's all, folks!")
- {
- std::cout << "Shutting down listener for " << message.getDestination()
- << std::endl;
-
- std::cout << "Listener received " << count << " messages.\n";
- subscriptionManager.cancel(message.getDestination());
- shutdown ( );
- }
-}
-
-
-
-
-
-
-
-int
-main ( int argc, char ** argv )
-{
- const char* host = argc>1 ? argv[1] : "127.0.0.1";
- int port = argc>2 ? atoi(argv[2]) : 5672;
- string program_name = "LISTENER";
-
- try {
-
- FailoverConnection connection;
- FailoverSession * session;
-
- connection.open ( host, port );
- session = connection.newSession();
-
- FailoverSubscriptionManager subscriptions ( session );
- Listener listener ( subscriptions );
- subscriptions.subscribe ( listener, "message_queue" );
- subscriptions.run ( );
-
- connection.close();
- std::cout << program_name << ": " << " completed without error." << std::endl;
- return 0;
-
- } catch(const std::exception& error) {
- std::cout << program_name << ": " << error.what() << std::endl;
- }
-
- return 0;
-}
-
-
-
-
diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am
index 774fad3120..020c44cec4 100644
--- a/cpp/src/Makefile.am
+++ b/cpp/src/Makefile.am
@@ -379,10 +379,7 @@ libqpidclient_la_SOURCES = \
qpid/client/Connector.cpp \
qpid/client/Demux.cpp \
qpid/client/Dispatcher.cpp \
- qpid/client/FailoverConnection.cpp \
qpid/client/FailoverManager.cpp \
- qpid/client/FailoverSession.cpp \
- qpid/client/FailoverSubscriptionManager.cpp \
qpid/client/FailoverListener.h \
qpid/client/FailoverListener.cpp \
qpid/client/Future.cpp \
@@ -519,13 +516,9 @@ nobase_include_HEADERS = \
qpid/client/Demux.h \
qpid/client/Dispatcher.h \
qpid/client/Execution.h \
- qpid/client/FailoverConnection.h \
- qpid/client/FailoverManager.h \
- qpid/client/FailoverSession.h \
qpid/client/Subscription.h \
qpid/client/SubscriptionImpl.h \
qpid/client/SubscriptionSettings.h \
- qpid/client/FailoverSubscriptionManager.h \
qpid/client/FlowControl.h \
qpid/client/Future.h \
qpid/client/FutureCompletion.h \
diff --git a/cpp/src/qpid/client/FailoverConnection.cpp b/cpp/src/qpid/client/FailoverConnection.cpp
deleted file mode 100644
index 8b37ba5cfa..0000000000
--- a/cpp/src/qpid/client/FailoverConnection.cpp
+++ /dev/null
@@ -1,197 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-
-
-#include "qpid/log/Statement.h"
-#include "qpid/client/FailoverConnection.h"
-#include "qpid/client/ConnectionSettings.h"
-
-#include <iostream>
-#include <fstream>
-
-using namespace std;
-
-
-namespace qpid {
-namespace client {
-
-
-FailoverConnection::FailoverConnection ( ) :
- failoverCompleteTime(0)
-{
- connection.registerFailureCallback
- ( boost::bind(&FailoverConnection::failover, this));
-}
-
-FailoverConnection::~FailoverConnection () {}
-
-void
-FailoverConnection::open ( const std::string& host,
- int port,
- const std::string& uid,
- const std::string& pwd,
- const std::string& virtualhost,
- uint16_t maxFrameSize
-)
-{
- ConnectionSettings settings;
-
- settings.host = host;
- settings.port = port;
- settings.username = uid;
- settings.password = pwd;
- settings.virtualhost = virtualhost;
- settings.maxFrameSize = maxFrameSize;
- settings.host = host;
-
- open ( settings );
-}
-
-
-void
-FailoverConnection::open ( ConnectionSettings & settings )
-{
- connection.open ( settings );
-}
-
-
-
-void
-FailoverConnection::close ( )
-{
- connection.close();
-}
-
-
-
-FailoverSession *
-FailoverConnection::newSession ( const std::string& /* name */ )
-{
- FailoverSession * fs = new FailoverSession;
- sessions.push_back ( fs );
- fs->session = connection.newSession();
- return fs;
-}
-
-
-
-void
-FailoverConnection::resume ( FailoverSession & failoverSession )
-{
- connection.resume ( failoverSession.session );
-}
-
-
-bool
-FailoverConnection::isOpen() const
-{
- return connection.isOpen();
-}
-
-
-void
-FailoverConnection::getKnownBrokers ( std::vector<std::string> & /*v*/ )
-{
-}
-
-
-void
-FailoverConnection::registerFailureCallback ( boost::function<void ()> /*fn*/ )
-{
-}
-
-void
-FailoverConnection::failover ( )
-{
- std::vector<FailoverSession *>::iterator sessions_iterator;
-
- for ( sessions_iterator = sessions.begin();
- sessions_iterator != sessions.end();
- ++ sessions_iterator )
- {
- FailoverSession * fs = * sessions_iterator;
- fs->failoverStarting();
- }
-
- std::vector<Url> knownBrokers = connection.getKnownBrokers();
- if (knownBrokers.empty())
- throw Exception(QPID_MSG("FailoverConnection::failover no known brokers."));
-
- Connection newConnection;
- for (std::vector<Url>::iterator i = knownBrokers.begin(); i != knownBrokers.end(); ++i) {
- try {
- newConnection.open(*i);
- break;
- }
- catch (const std::exception& e) {
- QPID_LOG(info, "Could not fail-over to " << *i << ": " << e.what());
- if ((i + 1) == knownBrokers.end())
- throw;
- }
- }
-
- /*
- * We have a valid new connection. Tell all the sessions
- * (and, through them, their SessionManagers and whatever else)
- * that we are about to failover to this new Connection.
- */
-
- // FIXME aconway 2008-10-10: thread unsafe, possible race with concurrent newSession
- for ( sessions_iterator = sessions.begin();
- sessions_iterator < sessions.end();
- ++ sessions_iterator
- )
- {
- FailoverSession * fs = * sessions_iterator;
- fs->prepareForFailover ( newConnection );
- }
-
- connection = newConnection;
- connection.registerFailureCallback
- ( boost::bind(&FailoverConnection::failover, this));
-
- /*
- * Tell all sessions to actually failover to the new connection.
- */
- for ( sessions_iterator = sessions.begin();
- sessions_iterator < sessions.end();
- ++ sessions_iterator
- )
- {
- FailoverSession * fs = * sessions_iterator;
- fs->failover ( );
- }
-
- for ( sessions_iterator = sessions.begin();
- sessions_iterator < sessions.end();
- ++ sessions_iterator
- )
- {
- FailoverSession * fs = * sessions_iterator;
- fs->failoverComplete();
- }
-}
-
-
-
-
-}} // namespace qpid::client
diff --git a/cpp/src/qpid/client/FailoverConnection.h b/cpp/src/qpid/client/FailoverConnection.h
deleted file mode 100644
index 4a8780afa2..0000000000
--- a/cpp/src/qpid/client/FailoverConnection.h
+++ /dev/null
@@ -1,99 +0,0 @@
-#ifndef QPID_CLIENT_FAILOVERCONNECTION_H
-#define QPID_CLIENT_FAILOVERCONNECTION_H
-
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-
-#include <string>
-
-#include "qpid/client/Connection.h"
-#include "qpid/client/FailoverConnection.h"
-#include "qpid/client/FailoverSession.h"
-#include "qpid/client/FailoverSubscriptionManager.h"
-#include "qpid/sys/Mutex.h"
-
-
-namespace qpid {
-namespace client {
-
-struct ConnectionSettings;
-
-
-class FailoverConnection
-{
- public:
-
- FailoverConnection ( );
-
- ~FailoverConnection ( );
-
- void open ( const std::string& host,
- int port,
- const std::string& uid = "guest",
- const std::string& pwd = "guest",
- const std::string& virtualhost = "/",
- uint16_t maxFrameSize=65535
- );
-
- void open ( ConnectionSettings & settings );
-
- void close ( );
-
- FailoverSession * newSession ( const std::string& name = std::string() );
-
- void resume ( FailoverSession & session );
-
- bool isOpen() const;
-
- void getKnownBrokers ( std::vector<std::string> & v );
-
-
- // public interface specific to Failover:
-
- void registerFailureCallback ( boost::function<void ()> fn );
-
- void failover ( );
-
- struct timeval * failoverCompleteTime;
-
-
- private:
-
- typedef sys::Mutex::ScopedLock Lock;
-
- sys::Mutex lock;
-
- Connection connection;
-
- boost::function<void ()> clientFailoverCallback;
-
- std::vector<FailoverSession *> sessions;
-
-
- friend class FailoverSession;
- friend class FailoverSessionManager;
-};
-
-}} // namespace qpid::client
-
-
-#endif /*!QPID_CLIENT_FAILOVERCONNECTION_H*/
diff --git a/cpp/src/qpid/client/FailoverSession.cpp b/cpp/src/qpid/client/FailoverSession.cpp
deleted file mode 100644
index 5cb887a9f6..0000000000
--- a/cpp/src/qpid/client/FailoverSession.cpp
+++ /dev/null
@@ -1,1466 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include <iostream>
-#include <fstream>
-
-
-#include "qpid/log/Logger.h"
-#include "qpid/log/Options.h"
-#include "qpid/log/Statement.h"
-#include "qpid/sys/Time.h"
-
-#include "qpid/client/FailoverConnection.h"
-#include "qpid/client/FailoverSession.h"
-
-
-using namespace std;
-
-
-namespace qpid {
-namespace client {
-
-FailoverSession::FailoverSession ( ) :
- failover_in_progress(false),
- failover_count(0)
-{
- // The session is created by FailoverConnection::newSession
- failoverSubscriptionManager = 0;
-}
-
-
-FailoverSession::~FailoverSession ( )
-{
-}
-
-
-
-framing::FrameSet::shared_ptr
-FailoverSession::get()
-{
- while(1)
- {
- try
- {
- return session.get();
- }
- catch ( const std::exception& error )
- {
- if ( ! failover_in_progress )
- throw ( error );
- else
- {
- sys::Monitor::ScopedLock l(lock);
- int current_failover_count = failover_count;
- while ( current_failover_count == failover_count )
- lock.wait();
- }
- }
- }
-
-}
-
-
-SessionId
-FailoverSession::getId()
-{
- while(1)
- {
- try
- {
- return session.getId();
- }
- catch ( const std::exception& error )
- {
- if ( ! failover_in_progress )
- throw ( error );
- else
- {
- sys::Monitor::ScopedLock l(lock);
- int current_failover_count = failover_count;
- while ( current_failover_count == failover_count )
- lock.wait();
- }
- }
- }
-}
-
-
-void
-FailoverSession::close()
-{
- while(1)
- {
- try
- {
- session.close();
- return;
- }
- catch ( const std::exception& error )
- {
- if ( ! failover_in_progress )
- throw ( error );
- else
- {
- sys::Monitor::ScopedLock l(lock);
- int current_failover_count = failover_count;
- while ( current_failover_count == failover_count )
- lock.wait();
- }
- }
- }
-}
-
-
-void
-FailoverSession::sync()
-{
- while(1)
- {
- try
- {
- session.sync();
- return;
- }
- catch ( const std::exception& error )
- {
- if ( ! failover_in_progress )
- throw ( error );
- else
- {
- sys::Monitor::ScopedLock l(lock);
- int current_failover_count = failover_count;
- while ( current_failover_count == failover_count )
- lock.wait();
- }
- }
- }
-
-}
-
-
-uint32_t
-FailoverSession::timeout(uint32_t /*seconds*/ )
-{
- // FIXME mgoulish return session.timeout ( seconds );
- return 0;
-}
-
-
-Execution&
-FailoverSession::getExecution()
-{
- while(1)
- {
- try
- {
- return session.getExecution();
- }
- catch ( const std::exception& error )
- {
- if ( ! failover_in_progress )
- throw ( error );
- else
- {
- sys::Monitor::ScopedLock l(lock);
- int current_failover_count = failover_count;
- while ( current_failover_count == failover_count )
- lock.wait();
- }
- }
- }
-
-}
-
-
-void
-FailoverSession::flush()
-{
- while(1)
- {
- try
- {
- session.flush();
- return;
- }
- catch ( const std::exception& error )
- {
- if ( ! failover_in_progress )
- throw ( error );
- else
- {
- sys::Monitor::ScopedLock l(lock);
- int current_failover_count = failover_count;
- while ( current_failover_count == failover_count )
- lock.wait();
- }
- }
- }
-
-}
-
-
-void
-FailoverSession::markCompleted(const framing::SequenceNumber& id,
- bool cumulative,
- bool notifyPeer
-)
-{
- while(1)
- {
- try
- {
- session.markCompleted ( id, cumulative, notifyPeer );
- return;
- }
- catch ( const std::exception& error )
- {
- if ( ! failover_in_progress )
- throw ( error );
- else
- {
- sys::Monitor::ScopedLock l(lock);
- int current_failover_count = failover_count;
- while ( current_failover_count == failover_count )
- lock.wait();
- }
- }
- }
-
-}
-
-
-
-// Wrapped functions from Session ----------------------------
-
-void
-FailoverSession::executionSync()
-{
- while(1)
- {
- try
- {
- session.executionSync();
- return;
- }
- catch ( const std::exception& error )
- {
- if ( ! failover_in_progress )
- throw ( error );
- else
- {
- sys::Monitor::ScopedLock l(lock);
- int current_failover_count = failover_count;
- while ( current_failover_count == failover_count )
- lock.wait();
- }
- }
- }
-
-}
-
-
-
-void
-FailoverSession::executionResult ( const SequenceNumber& commandId,
- const string& value
-)
-{
- while(1)
- {
- try
- {
- session.executionResult ( commandId,
- value
- );
- return;
- }
- catch ( const std::exception& error )
- {
- if ( ! failover_in_progress )
- throw ( error );
- else
- {
- sys::Monitor::ScopedLock l(lock);
- int current_failover_count = failover_count;
- while ( current_failover_count == failover_count )
- lock.wait();
- }
- }
- }
-
-}
-
-
-
-void
-FailoverSession::executionException ( uint16_t errorCode,
- const SequenceNumber& commandId,
- uint8_t classCode,
- uint8_t commandCode,
- uint8_t fieldIndex,
- const string& description,
- const FieldTable& errorInfo
-)
-{
- while(1)
- {
- try
- {
- session.executionException ( errorCode,
- commandId,
- classCode,
- commandCode,
- fieldIndex,
- description,
- errorInfo
- );
- return;
- }
- catch ( const std::exception& error )
- {
- if ( ! failover_in_progress )
- throw ( error );
- else
- {
- sys::Monitor::ScopedLock l(lock);
- int current_failover_count = failover_count;
- while ( current_failover_count == failover_count )
- lock.wait();
- }
- }
- }
-
-}
-
-
-
-void
-FailoverSession::messageTransfer ( const string& destination,
- uint8_t acceptMode,
- uint8_t acquireMode,
- const MethodContent& content
-)
-{
-
- while ( 1 )
- {
- try
- {
- session.messageTransfer ( destination,
- acceptMode,
- acquireMode,
- content
- );
- return;
- }
- catch ( ... )
- {
- // Take special action only if there is a failover in progress.
- if ( ! failover_in_progress )
- break;
-
- qpid::sys::usleep ( 1000 );
- }
- }
-}
-
-
-
-void
-FailoverSession::messageAccept ( const SequenceSet& transfers )
-{
- while(1)
- {
- try
- {
- session.messageAccept ( transfers );
- return;
- }
- catch ( const std::exception& error )
- {
- if ( ! failover_in_progress )
- throw ( error );
- else
- {
- sys::Monitor::ScopedLock l(lock);
- int current_failover_count = failover_count;
- while ( current_failover_count == failover_count )
- lock.wait();
- }
- }
- }
-
-}
-
-
-
-void
-FailoverSession::messageReject ( const SequenceSet& transfers,
- uint16_t code,
- const string& text
-)
-{
- while(1)
- {
- try
- {
- session.messageReject ( transfers,
- code,
- text
- );
- return;
- }
- catch ( const std::exception& error )
- {
- if ( ! failover_in_progress )
- throw ( error );
- else
- {
- sys::Monitor::ScopedLock l(lock);
- int current_failover_count = failover_count;
- while ( current_failover_count == failover_count )
- lock.wait();
- }
- }
- }
-
-}
-
-
-
-void
-FailoverSession::messageRelease ( const SequenceSet& transfers,
- bool setRedelivered
-)
-{
- while(1)
- {
- try
- {
- session.messageRelease ( transfers,
- setRedelivered
- );
- return;
- }
- catch ( const std::exception& error )
- {
- if ( ! failover_in_progress )
- throw ( error );
- else
- {
- sys::Monitor::ScopedLock l(lock);
- int current_failover_count = failover_count;
- while ( current_failover_count == failover_count )
- lock.wait();
- }
- }
- }
-
-}
-
-
-
-qpid::framing::MessageAcquireResult
-FailoverSession::messageAcquire ( const SequenceSet& transfers )
-{
- while(1)
- {
- try
- {
- return session.messageAcquire ( transfers );
- }
- catch ( const std::exception& error )
- {
- if ( ! failover_in_progress )
- throw ( error );
- else
- {
- sys::Monitor::ScopedLock l(lock);
- int current_failover_count = failover_count;
- while ( current_failover_count == failover_count )
- lock.wait();
- }
- }
- }
-
-}
-
-
-
-qpid::framing::MessageResumeResult
-FailoverSession::messageResume ( const string& destination,
- const string& resumeId
-)
-{
- while(1)
- {
- try
- {
- return session.messageResume ( destination,
- resumeId
- );
- }
- catch ( const std::exception& error )
- {
- if ( ! failover_in_progress )
- throw ( error );
- else
- {
- sys::Monitor::ScopedLock l(lock);
- int current_failover_count = failover_count;
- while ( current_failover_count == failover_count )
- lock.wait();
- }
- }
- }
-
-}
-
-
-
-void
-FailoverSession::messageSubscribe ( const string& queue,
- const string& destination,
- uint8_t acceptMode,
- uint8_t acquireMode,
- bool exclusive,
- const string& resumeId,
- uint64_t resumeTtl,
- const FieldTable& arguments
-)
-{
- while(1)
- {
- try
- {
- session.messageSubscribe ( queue,
- destination,
- acceptMode,
- acquireMode,
- exclusive,
- resumeId,
- resumeTtl,
- arguments
- );
- return;
- }
- catch ( const std::exception& error )
- {
- if ( ! failover_in_progress )
- throw ( error );
- else
- {
- sys::Monitor::ScopedLock l(lock);
- int current_failover_count = failover_count;
- while ( current_failover_count == failover_count )
- lock.wait();
- }
- }
- }
-
-}
-
-
-
-void
-FailoverSession::messageCancel ( const string& destinations )
-{
- while(1)
- {
- try
- {
- session.messageCancel ( destinations );
- return;
- }
- catch ( const std::exception& error )
- {
- if ( ! failover_in_progress )
- throw ( error );
- else
- {
- sys::Monitor::ScopedLock l(lock);
- int current_failover_count = failover_count;
- while ( current_failover_count == failover_count )
- lock.wait();
- }
- }
- }
-
-}
-
-
-
-void
-FailoverSession::messageSetFlowMode ( const string& destination,
- uint8_t flowMode
-)
-{
- while(1)
- {
- try
- {
- session.messageSetFlowMode ( destination,
- flowMode
- );
- return;
- }
- catch ( const std::exception& error )
- {
- if ( ! failover_in_progress )
- throw ( error );
- else
- {
- sys::Monitor::ScopedLock l(lock);
- int current_failover_count = failover_count;
- while ( current_failover_count == failover_count )
- lock.wait();
- }
- }
- }
-}
-
-
-
-void
-FailoverSession::messageFlow(const string& destination,
- uint8_t unit,
- uint32_t value)
-{
- while(1)
- {
- try
- {
- session.messageFlow ( destination,
- unit,
- value
- );
- return;
- }
- catch ( const std::exception& error )
- {
- if ( ! failover_in_progress )
- throw ( error );
- else
- {
- sys::Monitor::ScopedLock l(lock);
- int current_failover_count = failover_count;
- while ( current_failover_count == failover_count )
- lock.wait();
- }
- }
- }
-}
-
-
-
-void
-FailoverSession::messageFlush(const string& destination)
-{
- while(1)
- {
- try
- {
- session.messageFlush ( destination );
- return;
- }
- catch ( const std::exception& error )
- {
- if ( ! failover_in_progress )
- throw ( error );
- else
- {
- sys::Monitor::ScopedLock l(lock);
- int current_failover_count = failover_count;
- while ( current_failover_count == failover_count )
- lock.wait();
- }
- }
- }
-}
-
-
-
-void
-FailoverSession::messageStop(const string& destination)
-{
- while(1)
- {
- try
- {
- session.messageStop ( destination );
- return;
- }
- catch ( const std::exception& error )
- {
- if ( ! failover_in_progress )
- throw ( error );
- else
- {
- sys::Monitor::ScopedLock l(lock);
- int current_failover_count = failover_count;
- while ( current_failover_count == failover_count )
- lock.wait();
- }
- }
- }
-}
-
-
-
-void
-FailoverSession::txSelect()
-{
- while(1)
- {
- try
- {
- session.txSelect ( );
- return;
- }
- catch ( const std::exception& error )
- {
- if ( ! failover_in_progress )
- throw ( error );
- else
- {
- sys::Monitor::ScopedLock l(lock);
- int current_failover_count = failover_count;
- while ( current_failover_count == failover_count )
- lock.wait();
- }
- }
- }
-}
-
-
-
-void
-FailoverSession::txCommit()
-{
- while(1)
- {
- try
- {
- session.txCommit ( );
- return;
- }
- catch ( const std::exception& error )
- {
- if ( ! failover_in_progress )
- throw ( error );
- else
- {
- sys::Monitor::ScopedLock l(lock);
- int current_failover_count = failover_count;
- while ( current_failover_count == failover_count )
- lock.wait();
- }
- }
- }
-}
-
-
-
-void
-FailoverSession::txRollback()
-{
- while(1)
- {
- try
- {
- session.txRollback ( );
- return;
- }
- catch ( const std::exception& error )
- {
- if ( ! failover_in_progress )
- throw ( error );
- else
- {
- sys::Monitor::ScopedLock l(lock);
- int current_failover_count = failover_count;
- while ( current_failover_count == failover_count )
- lock.wait();
- }
- }
- }
-}
-
-
-
-void
-FailoverSession::dtxSelect()
-{
- while(1)
- {
- try
- {
- session.dtxSelect ( );
- return;
- }
- catch ( const std::exception& error )
- {
- if ( ! failover_in_progress )
- throw ( error );
- else
- {
- sys::Monitor::ScopedLock l(lock);
- int current_failover_count = failover_count;
- while ( current_failover_count == failover_count )
- lock.wait();
- }
- }
- }
-}
-
-
-
-qpid::framing::XaResult
-FailoverSession::dtxStart(const Xid& xid,
- bool join,
- bool resume)
-{
- while(1)
- {
- try
- {
- return session.dtxStart ( xid,
- join,
- resume
- );
- }
- catch ( const std::exception& error )
- {
- if ( ! failover_in_progress )
- throw ( error );
- else
- {
- sys::Monitor::ScopedLock l(lock);
- int current_failover_count = failover_count;
- while ( current_failover_count == failover_count )
- lock.wait();
- }
- }
- }
-}
-
-
-
-qpid::framing::XaResult
-FailoverSession::dtxEnd(const Xid& xid,
- bool fail,
- bool suspend)
-{
- while(1)
- {
- try
- {
- return session.dtxEnd ( xid,
- fail,
- suspend
- );
- }
- catch ( const std::exception& error )
- {
- if ( ! failover_in_progress )
- throw ( error );
- else
- {
- sys::Monitor::ScopedLock l(lock);
- int current_failover_count = failover_count;
- while ( current_failover_count == failover_count )
- lock.wait();
- }
- }
- }
-}
-
-
-
-qpid::framing::XaResult
-FailoverSession::dtxCommit(const Xid& xid,
- bool onePhase)
-{
- while(1)
- {
- try
- {
- return session.dtxCommit ( xid,
- onePhase
- );
- }
- catch ( const std::exception& error )
- {
- if ( ! failover_in_progress )
- throw ( error );
- else
- {
- sys::Monitor::ScopedLock l(lock);
- int current_failover_count = failover_count;
- while ( current_failover_count == failover_count )
- lock.wait();
- }
- }
- }
-}
-
-
-
-void
-FailoverSession::dtxForget(const Xid& xid)
-{
- while(1)
- {
- try
- {
- session.dtxForget ( xid );
- return;
- }
- catch ( const std::exception& error )
- {
- if ( ! failover_in_progress )
- throw ( error );
- else
- {
- sys::Monitor::ScopedLock l(lock);
- int current_failover_count = failover_count;
- while ( current_failover_count == failover_count )
- lock.wait();
- }
- }
- }
-}
-
-
-
-qpid::framing::DtxGetTimeoutResult
-FailoverSession::dtxGetTimeout(const Xid& xid)
-{
- while(1)
- {
- try
- {
- return session.dtxGetTimeout ( xid );
- }
- catch ( const std::exception& error )
- {
- if ( ! failover_in_progress )
- throw ( error );
- else
- {
- sys::Monitor::ScopedLock l(lock);
- int current_failover_count = failover_count;
- while ( current_failover_count == failover_count )
- lock.wait();
- }
- }
- }
-}
-
-
-
-qpid::framing::XaResult
-FailoverSession::dtxPrepare(const Xid& xid)
-{
- while(1)
- {
- try
- {
- return session.dtxPrepare ( xid );
- }
- catch ( const std::exception& error )
- {
- if ( ! failover_in_progress )
- throw ( error );
- else
- {
- sys::Monitor::ScopedLock l(lock);
- int current_failover_count = failover_count;
- while ( current_failover_count == failover_count )
- lock.wait();
- }
- }
- }
-}
-
-
-
-qpid::framing::DtxRecoverResult
-FailoverSession::dtxRecover()
-{
- while(1)
- {
- try
- {
- return session.dtxRecover ( );
- }
- catch ( const std::exception& error )
- {
- if ( ! failover_in_progress )
- throw ( error );
- else
- {
- sys::Monitor::ScopedLock l(lock);
- int current_failover_count = failover_count;
- while ( current_failover_count == failover_count )
- lock.wait();
- }
- }
- }
-}
-
-
-
-qpid::framing::XaResult
-FailoverSession::dtxRollback(const Xid& xid)
-{
- while(1)
- {
- try
- {
- return session.dtxRollback ( xid );
- }
- catch ( const std::exception& error )
- {
- if ( ! failover_in_progress )
- throw ( error );
- else
- {
- sys::Monitor::ScopedLock l(lock);
- int current_failover_count = failover_count;
- while ( current_failover_count == failover_count )
- lock.wait();
- }
- }
- }
-}
-
-
-
-void
-FailoverSession::dtxSetTimeout(const Xid& xid,
- uint32_t timeout)
-{
- while(1)
- {
- try
- {
- session.dtxSetTimeout ( xid,
- timeout
- );
- return;
- }
- catch ( const std::exception& error )
- {
- if ( ! failover_in_progress )
- throw ( error );
- else
- {
- sys::Monitor::ScopedLock l(lock);
- int current_failover_count = failover_count;
- while ( current_failover_count == failover_count )
- lock.wait();
- }
- }
- }
-}
-
-
-
-void
-FailoverSession::exchangeDeclare(const string& exchange,
- const string& type,
- const string& alternateExchange,
- bool passive,
- bool durable,
- bool autoDelete,
- const FieldTable& arguments)
-{
- while(1)
- {
- try
- {
- session.exchangeDeclare ( exchange,
- type,
- alternateExchange,
- passive,
- durable,
- autoDelete,
- arguments
- );
- return;
- }
- catch ( const std::exception& error )
- {
- if ( ! failover_in_progress )
- throw ( error );
- else
- {
- sys::Monitor::ScopedLock l(lock);
- int current_failover_count = failover_count;
- while ( current_failover_count == failover_count )
- lock.wait();
- }
- }
- }
-}
-
-
-
-void
-FailoverSession::exchangeDelete(const string& exchange,
- bool ifUnused)
-{
- while(1)
- {
- try
- {
- session.exchangeDelete ( exchange,
- ifUnused
- );
- return;
- }
- catch ( const std::exception& error )
- {
- if ( ! failover_in_progress )
- throw ( error );
- else
- {
- sys::Monitor::ScopedLock l(lock);
- int current_failover_count = failover_count;
- while ( current_failover_count == failover_count )
- lock.wait();
- }
- }
- }
-}
-
-
-
-qpid::framing::ExchangeQueryResult
-FailoverSession::exchangeQuery(const string& name)
-{
- while(1)
- {
- try
- {
- return session.exchangeQuery ( name );
- }
- catch ( const std::exception& error )
- {
- if ( ! failover_in_progress )
- throw ( error );
- else
- {
- sys::Monitor::ScopedLock l(lock);
- int current_failover_count = failover_count;
- while ( current_failover_count == failover_count )
- lock.wait();
- }
- }
- }
-}
-
-
-
-void
-FailoverSession::exchangeBind(const string& queue,
- const string& exchange,
- const string& bindingKey,
- const FieldTable& arguments)
-{
- while(1)
- {
- try
- {
- session.exchangeBind ( queue,
- exchange,
- bindingKey,
- arguments
- );
- return;
- }
- catch ( const std::exception& error )
- {
- if ( ! failover_in_progress )
- throw ( error );
- else
- {
- sys::Monitor::ScopedLock l(lock);
- int current_failover_count = failover_count;
- while ( current_failover_count == failover_count )
- lock.wait();
- }
- }
- }
-}
-
-
-
-void
-FailoverSession::exchangeUnbind(const string& queue,
- const string& exchange,
- const string& bindingKey)
-{
- while(1)
- {
- try
- {
- session.exchangeUnbind ( queue,
- exchange,
- bindingKey
- );
- return;
- }
- catch ( const std::exception& error )
- {
- if ( ! failover_in_progress )
- throw ( error );
- else
- {
- sys::Monitor::ScopedLock l(lock);
- int current_failover_count = failover_count;
- while ( current_failover_count == failover_count )
- lock.wait();
- }
- }
- }
-}
-
-
-
-qpid::framing::ExchangeBoundResult
-FailoverSession::exchangeBound(const string& exchange,
- const string& queue,
- const string& bindingKey,
- const FieldTable& arguments)
-{
- while(1)
- {
- try
- {
- return session.exchangeBound ( exchange,
- queue,
- bindingKey,
- arguments
- );
- }
- catch ( const std::exception& error )
- {
- if ( ! failover_in_progress )
- throw ( error );
- else
- {
- sys::Monitor::ScopedLock l(lock);
- int current_failover_count = failover_count;
- while ( current_failover_count == failover_count )
- lock.wait();
- }
- }
- }
-}
-
-
-
-void
-FailoverSession::queueDeclare(const string& queue,
- const string& alternateExchange,
- bool passive,
- bool durable,
- bool exclusive,
- bool autoDelete,
- const FieldTable& arguments)
-{
- while(1)
- {
- try
- {
- session.queueDeclare ( queue,
- alternateExchange,
- passive,
- durable,
- exclusive,
- autoDelete,
- arguments
- );
- return;
- }
- catch ( const std::exception& error )
- {
- if ( ! failover_in_progress )
- throw ( error );
- else
- {
- sys::Monitor::ScopedLock l(lock);
- int current_failover_count = failover_count;
- while ( current_failover_count == failover_count )
- lock.wait();
- }
- }
- }
-}
-
-
-
-void
-FailoverSession::queueDelete(const string& queue,
- bool ifUnused,
- bool ifEmpty)
-{
- while(1)
- {
- try
- {
- session.queueDelete ( queue,
- ifUnused,
- ifEmpty
- );
- return;
- }
- catch ( const std::exception& error )
- {
- if ( ! failover_in_progress )
- throw ( error );
- else
- {
- sys::Monitor::ScopedLock l(lock);
- int current_failover_count = failover_count;
- while ( current_failover_count == failover_count )
- lock.wait();
- }
- }
- }
-}
-
-
-
-void
-FailoverSession::queuePurge(const string& queue)
-{
- while(1)
- {
- try
- {
- session.queuePurge ( queue) ;
- return;
- }
- catch ( const std::exception& error )
- {
- if ( ! failover_in_progress )
- throw ( error );
- else
- {
- sys::Monitor::ScopedLock l(lock);
- int current_failover_count = failover_count;
- while ( current_failover_count == failover_count )
- lock.wait();
- }
- }
- }
-}
-
-
-
-qpid::framing::QueueQueryResult
-FailoverSession::queueQuery(const string& queue)
-{
- while(1)
- {
- try
- {
- return session.queueQuery ( queue );
- }
- catch ( const std::exception& error )
- {
- if ( ! failover_in_progress )
- throw ( error );
- else
- {
- sys::Monitor::ScopedLock l(lock);
- int current_failover_count = failover_count;
- while ( current_failover_count == failover_count )
- lock.wait();
- }
- }
- }
-}
-
-
-// end Wrapped functions from Session ---------------------------
-
-
-// Get ready for a failover.
-void
-FailoverSession::prepareForFailover ( Connection newConnection )
-{
- failover_in_progress = true;
- try
- {
- newSession = newConnection.newSession();
- }
- catch ( const std::exception& /*error*/ )
- {
- throw Exception(QPID_MSG("Can't create failover session."));
- }
-
- if ( failoverSubscriptionManager )
- {
- failoverSubscriptionManager->prepareForFailover ( newSession );
- }
-}
-
-
-void
-FailoverSession::failoverStarting ( )
-{
- sys::Monitor::ScopedLock l(lock);
- failover_in_progress = true;
-}
-
-
-void
-FailoverSession::failoverComplete ( )
-{
- sys::Monitor::ScopedLock l(lock);
- failover_in_progress = false;
- ++ failover_count;
- lock.notifyAll();
-}
-
-
-
-void
-FailoverSession::failover ( )
-{
- if ( failoverSubscriptionManager )
- {
- failoverSubscriptionManager->failover ( );
- }
- session = newSession;
-}
-
-
-void FailoverSession::setFailoverSubscriptionManager(FailoverSubscriptionManager* fsm) {
- failoverSubscriptionManager = fsm;
-}
-
-}} // namespace qpid::client
diff --git a/cpp/src/qpid/client/FailoverSession.h b/cpp/src/qpid/client/FailoverSession.h
deleted file mode 100644
index 7a743da452..0000000000
--- a/cpp/src/qpid/client/FailoverSession.h
+++ /dev/null
@@ -1,319 +0,0 @@
-#ifndef QPID_CLIENT_FAILOVERSESSION_H
-#define QPID_CLIENT_FAILOVERSESSION_H
-
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "qpid/client/Session.h"
-#include "qpid/SessionId.h"
-#include "qpid/framing/amqp_structs.h"
-#include "qpid/framing/ProtocolVersion.h"
-#include "qpid/framing/MethodContent.h"
-#include "qpid/framing/TransferContent.h"
-#include "qpid/client/Completion.h"
-#include "qpid/client/Connection.h"
-#include "qpid/client/ConnectionImpl.h"
-#include "qpid/client/Execution.h"
-#include "qpid/client/SessionImpl.h"
-#include "qpid/client/TypedResult.h"
-#include "qpid/shared_ptr.h"
-#include "qpid/sys/Monitor.h"
-
-#include <string>
-
-
-
-
-namespace qpid {
-namespace client {
-
-
-class FailoverConnection;
-class FailoverSubscriptionManager;
-
-
-class FailoverSession
-{
- public:
-
- typedef framing::TransferContent DefaultContent;
-
- FailoverSession ( );
- ~FailoverSession ( );
-
- framing::FrameSet::shared_ptr get();
-
- SessionId getId();
-
- void close();
-
- void sync();
-
- uint32_t timeout ( uint32_t seconds);
-
- Execution& getExecution();
-
- void flush();
-
- void markCompleted(const framing::SequenceNumber& id,
- bool cumulative,
- bool notifyPeer
- );
-
- void sendCompletion ( );
-
-
-
- // Wrapped functions from Session ----------------------------
-
- void
- executionSync();
-
-
- void
- executionResult(const SequenceNumber& commandId=SequenceNumber(),
- const string& value=string());
-
-
- void
- executionException(uint16_t errorCode=0,
- const SequenceNumber& commandId=SequenceNumber(),
- uint8_t classCode=0,
- uint8_t commandCode=0,
- uint8_t fieldIndex=0,
- const string& description=string(),
- const FieldTable& errorInfo=FieldTable());
-
-
- void
- messageTransfer(const string& destination=string(),
- uint8_t acceptMode=1,
- uint8_t acquireMode=0,
- const MethodContent& content=DefaultContent(std::string()));
-
-
- void
- messageAccept(const SequenceSet& transfers=SequenceSet());
-
-
- void
- messageReject(const SequenceSet& transfers=SequenceSet(),
- uint16_t code=0,
- const string& text=string());
-
-
- void
- messageRelease(const SequenceSet& transfers=SequenceSet(),
- bool setRedelivered=false);
-
-
- qpid::framing::MessageAcquireResult
- messageAcquire(const SequenceSet& transfers=SequenceSet());
-
-
- qpid::framing::MessageResumeResult
- messageResume(const string& destination=string(),
- const string& resumeId=string());
-
-
- void
- messageSubscribe(const string& queue=string(),
- const string& destination=string(),
- uint8_t acceptMode=0,
- uint8_t acquireMode=0,
- bool exclusive=false,
- const string& resumeId=string(),
- uint64_t resumeTtl=0,
- const FieldTable& arguments=FieldTable());
-
-
- void
- messageCancel(const string& destination=string());
-
-
- void
- messageSetFlowMode(const string& destination=string(),
- uint8_t flowMode=0);
-
-
- void
- messageFlow(const string& destination=string(),
- uint8_t unit=0,
- uint32_t value=0);
-
-
- void
- messageFlush(const string& destination=string());
-
-
- void
- messageStop(const string& destination=string());
-
-
- void
- txSelect();
-
-
- void
- txCommit();
-
-
- void
- txRollback();
-
-
- void
- dtxSelect();
-
-
- qpid::framing::XaResult
- dtxStart(const Xid& xid=Xid(),
- bool join=false,
- bool resume=false);
-
-
- qpid::framing::XaResult
- dtxEnd(const Xid& xid=Xid(),
- bool fail=false,
- bool suspend=false);
-
-
- qpid::framing::XaResult
- dtxCommit(const Xid& xid=Xid(),
- bool onePhase=false);
-
-
- void
- dtxForget(const Xid& xid=Xid());
-
-
- qpid::framing::DtxGetTimeoutResult
- dtxGetTimeout(const Xid& xid=Xid());
-
-
- qpid::framing::XaResult
- dtxPrepare(const Xid& xid=Xid());
-
-
- qpid::framing::DtxRecoverResult
- dtxRecover();
-
-
- qpid::framing::XaResult
- dtxRollback(const Xid& xid=Xid());
-
-
- void
- dtxSetTimeout(const Xid& xid=Xid(),
- uint32_t timeout=0);
-
-
- void
- exchangeDeclare(const string& exchange=string(),
- const string& type=string(),
- const string& alternateExchange=string(),
- bool passive=false,
- bool durable=false,
- bool autoDelete=false,
- const FieldTable& arguments=FieldTable());
-
-
- void
- exchangeDelete(const string& exchange=string(),
- bool ifUnused=false);
-
-
- qpid::framing::ExchangeQueryResult
- exchangeQuery(const string& name=string());
-
-
- void
- exchangeBind(const string& queue=string(),
- const string& exchange=string(),
- const string& bindingKey=string(),
- const FieldTable& arguments=FieldTable());
-
-
- void
- exchangeUnbind(const string& queue=string(),
- const string& exchange=string(),
- const string& bindingKey=string());
-
-
- qpid::framing::ExchangeBoundResult
- exchangeBound(const string& exchange=string(),
- const string& queue=string(),
- const string& bindingKey=string(),
- const FieldTable& arguments=FieldTable());
-
-
- void
- queueDeclare(const string& queue=string(),
- const string& alternateExchange=string(),
- bool passive=false,
- bool durable=false,
- bool exclusive=false,
- bool autoDelete=false,
- const FieldTable& arguments=FieldTable());
-
-
- void
- queueDelete(const string& queue=string(),
- bool ifUnused=false,
- bool ifEmpty=false);
-
-
- void
- queuePurge(const string& queue=string());
-
-
- qpid::framing::QueueQueryResult
- queueQuery(const string& queue=string());
-
- // end Wrapped functions from Session ---------------------------
-
- // Tells the FailoverSession to get ready for a failover.
- void failoverStarting();
- void prepareForFailover ( Connection newConnection );
- void failover ( );
- void failoverComplete();
-
- void setFailoverSubscriptionManager(FailoverSubscriptionManager*);
-
- private:
- sys::Monitor lock;
- bool failover_in_progress;
- int failover_count;
-
-
- FailoverSubscriptionManager * failoverSubscriptionManager;
-
- Session session;
- Session newSession;
-
- friend class FailoverConnection;
- friend class FailoverSubscriptionManager;
-};
-
-}} // namespace qpid::client
-
-
-#endif /*!QPID_CLIENT_FAILOVERSESSION_H*/
diff --git a/cpp/src/qpid/client/FailoverSubscriptionManager.cpp b/cpp/src/qpid/client/FailoverSubscriptionManager.cpp
deleted file mode 100644
index 5fa4cb2800..0000000000
--- a/cpp/src/qpid/client/FailoverSubscriptionManager.cpp
+++ /dev/null
@@ -1,248 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "qpid/client/FailoverSession.h"
-#include "qpid/client/FailoverSubscriptionManager.h"
-
-
-
-using namespace std;
-
-
-namespace qpid {
-namespace client {
-
-
-
-FailoverSubscriptionManager::FailoverSubscriptionManager ( FailoverSession * fs) :
- newSessionIsValid(false),
- no_failover(false)
-{
- subscriptionManager = new SubscriptionManager(fs->session);
- fs->setFailoverSubscriptionManager(this);
-}
-
-
-
-void
-FailoverSubscriptionManager::prepareForFailover ( Session _newSession )
-{
- sys::Monitor::ScopedLock l(lock);
- newSession = _newSession;
- newSessionIsValid = true;
-}
-
-
-
-void
-FailoverSubscriptionManager::failover ( )
-{
- sys::Monitor::ScopedLock l(lock);
- // Stop the subscription manager thread so it can notice
- // the failover in progress.
- subscriptionManager->stop();
- lock.notifyAll();
-}
-
-
-
-
-void
-FailoverSubscriptionManager::subscribe ( MessageListener & listener,
- const std::string & queue,
- const SubscriptionSettings & settings,
- const std::string & tag,
- bool record_this
-)
-{
- sys::Monitor::ScopedLock l(lock);
-
- subscriptionManager->subscribe ( listener,
- queue,
- settings,
- tag
- );
- if ( record_this )
- subscribeFns.push_back ( boost::bind ( (void (FailoverSubscriptionManager::*)(MessageListener&, const std::string&, const SubscriptionSettings&, const std::string&, bool) ) &FailoverSubscriptionManager::subscribe, this, boost::ref(listener), queue, settings, tag, false ) );
-}
-
-
-
-void
-FailoverSubscriptionManager::subscribe ( LocalQueue & localQueue,
- const std::string & queue,
- const SubscriptionSettings & settings,
- const std::string & tag,
- bool record_this
-)
-{
- sys::Monitor::ScopedLock l(lock);
-
- subscriptionManager->subscribe ( localQueue,
- queue,
- settings,
- tag
- );
-
- if ( record_this )
- subscribeFns.push_back ( boost::bind ( (void (FailoverSubscriptionManager::*)(LocalQueue&, const std::string&, const SubscriptionSettings&, const std::string&, bool) ) &FailoverSubscriptionManager::subscribe, this, localQueue, queue, settings, tag, false ) );
-}
-
-
-
-void
-FailoverSubscriptionManager::subscribe ( MessageListener & listener,
- const std::string & queue,
- const std::string & tag,
- bool record_this
-)
-{
- sys::Monitor::ScopedLock l(lock);
-
- subscriptionManager->subscribe ( listener,
- queue,
- tag
- );
-
- if ( record_this )
- subscribeFns.push_back ( boost::bind ( (void (FailoverSubscriptionManager::*)(MessageListener&, const std::string&, const std::string&, bool) ) &FailoverSubscriptionManager::subscribe, this, boost::ref(listener), queue, tag, false ) );
-}
-
-
-
-
-void
-FailoverSubscriptionManager::subscribe ( LocalQueue & localQueue,
- const std::string & queue,
- const std::string & tag,
- bool record_this
-)
-{
- sys::Monitor::ScopedLock l(lock);
-
- subscriptionManager->subscribe ( localQueue,
- queue,
- tag
- );
-
- if ( record_this )
- subscribeFns.push_back ( boost::bind ( (void (FailoverSubscriptionManager::*)(LocalQueue&, const std::string&, const std::string&, bool) ) &FailoverSubscriptionManager::subscribe, this, localQueue, queue, tag, false ) );
-}
-
-
-
-bool
-FailoverSubscriptionManager::get ( Message & result,
- const std::string & queue,
- sys::Duration timeout
-)
-{
-
- return subscriptionManager->get ( result, queue, timeout );
-}
-
-
-
-void
-FailoverSubscriptionManager::cancel ( const std::string tag )
-{
-
- subscriptionManager->cancel ( tag );
-}
-
-
-
-void
-FailoverSubscriptionManager::run ( ) // User Thread
-{
- std::vector<subscribeFn> mySubscribeFns;
-
- while ( 1 )
- {
- subscriptionManager->run ( );
-
- // When we drop out of run, if there is a new Session
- // waiting for us, this is a failover. Otherwise, just
- // return control to usercode.
-
- {
- sys::Monitor::ScopedLock l(lock);
-
-
- while ( !newSessionIsValid && !no_failover )
- lock.wait();
-
-
- if ( newSessionIsValid )
- {
- newSessionIsValid = false;
- delete subscriptionManager;
- subscriptionManager = new SubscriptionManager(newSession);
- mySubscribeFns.swap ( subscribeFns );
- }
- else
- {
- // Not a failover, return to user code.
- break;
- }
- }
-
- for ( std::vector<subscribeFn>::iterator i = mySubscribeFns.begin();
- i != mySubscribeFns.end();
- ++ i
- )
- {
- (*i) ();
- }
-
- }
-}
-
-
-void
-FailoverSubscriptionManager::start ( )
-{
-
- subscriptionManager->start ( );
-}
-
-
-
-void
-FailoverSubscriptionManager::setAutoStop ( bool set )
-{
-
- subscriptionManager->setAutoStop ( set );
-}
-
-
-
-void
-FailoverSubscriptionManager::stop ( )
-{
- sys::Monitor::ScopedLock l(lock);
-
- no_failover = true;
- subscriptionManager->stop ( );
- lock.notifyAll();
-}
-
-}} // namespace qpid::client
diff --git a/cpp/src/qpid/client/FailoverSubscriptionManager.h b/cpp/src/qpid/client/FailoverSubscriptionManager.h
deleted file mode 100644
index 0556ba15ec..0000000000
--- a/cpp/src/qpid/client/FailoverSubscriptionManager.h
+++ /dev/null
@@ -1,116 +0,0 @@
-#ifndef QPID_CLIENT_FAILOVERSUBSCRIPTIONMANAGER_H
-#define QPID_CLIENT_FAILOVERSUBSCRIPTIONMANAGER_H
-
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-
-#include "qpid/sys/Mutex.h"
-#include <qpid/client/Dispatcher.h>
-#include <qpid/client/Completion.h>
-#include <qpid/client/Session.h>
-#include <qpid/client/FailoverSession.h>
-#include <qpid/client/MessageListener.h>
-#include <qpid/client/SubscriptionManager.h>
-#include <qpid/client/LocalQueue.h>
-#include <qpid/client/SubscriptionSettings.h>
-#include <qpid/sys/Runnable.h>
-#include <qpid/sys/Monitor.h>
-
-
-
-
-namespace qpid {
-namespace client {
-
-
-class FailoverSubscriptionManager
-{
- public:
-
- FailoverSubscriptionManager ( FailoverSession * fs );
-
- void subscribe ( MessageListener & listener,
- const std::string & queue,
- const SubscriptionSettings & ,
- const std::string & tag = std::string(),
- bool record_this = true );
-
- void subscribe ( LocalQueue & localQueue,
- const std::string & queue,
- const SubscriptionSettings & ,
- const std::string & tag=std::string(),
- bool record_this = true );
-
- void subscribe ( MessageListener & listener,
- const std::string & queue,
- const std::string & tag = std::string(),
- bool record_this = true );
-
- void subscribe ( LocalQueue & localQueue,
- const std::string & queue,
- const std::string & tag=std::string(),
- bool record_this = true );
-
- bool get ( Message & result,
- const std::string & queue,
- sys::Duration timeout=0);
-
- void cancel ( const std::string tag );
-
- void run ( );
-
- void start ( );
-
- void setAutoStop ( bool set = true );
-
- void stop ( );
-
- // Get ready for a failover.
- void prepareForFailover ( Session newSession );
- void failover ( );
-
-
- private:
- sys::Monitor lock;
-
- SubscriptionManager * subscriptionManager;
-
- MessageListener * savedListener;
- std::string savedQueue,
- savedTag;
-
- friend class FailoverConnection;
- friend class FailoverSession;
-
- Session newSession;
- bool newSessionIsValid;
- bool no_failover;
-
-
- typedef boost::function<void ()> subscribeFn;
- std::vector < subscribeFn > subscribeFns;
-};
-
-}} // namespace qpid::client
-
-
-#endif /*!QPID_CLIENT_FAILOVERSUBSCRIPTIONMANAGER_H*/