diff options
| author | Alan Conway <aconway@apache.org> | 2008-10-17 16:45:24 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2008-10-17 16:45:24 +0000 |
| commit | 7db0c0970eac260626263314c30f0e20d4ef6c21 (patch) | |
| tree | 231024436b5b7185f63972d90318acce97816c22 /cpp/src/qpid | |
| parent | a039e57108ed06586e73a255dc824ed27fc6de2a (diff) | |
| download | qpid-python-7db0c0970eac260626263314c30f0e20d4ef6c21.tar.gz | |
QPID-1367 Mick Goulish: improvements to client-side failover.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@705668 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid')
| -rw-r--r-- | cpp/src/qpid/client/ConnectionImpl.cpp | 8 | ||||
| -rw-r--r-- | cpp/src/qpid/client/Dispatcher.cpp | 1 | ||||
| -rw-r--r-- | cpp/src/qpid/client/FailoverConnection.cpp | 24 | ||||
| -rw-r--r-- | cpp/src/qpid/client/FailoverConnection.h | 4 | ||||
| -rw-r--r-- | cpp/src/qpid/client/FailoverSession.cpp | 28 | ||||
| -rw-r--r-- | cpp/src/qpid/client/FailoverSession.h | 4 | ||||
| -rw-r--r-- | cpp/src/qpid/client/FailoverSubscriptionManager.cpp | 1 | ||||
| -rw-r--r-- | cpp/src/qpid/client/FailoverSubscriptionManager.h | 3 |
8 files changed, 48 insertions, 25 deletions
diff --git a/cpp/src/qpid/client/ConnectionImpl.cpp b/cpp/src/qpid/client/ConnectionImpl.cpp index 652d59f448..92cf756580 100644 --- a/cpp/src/qpid/client/ConnectionImpl.cpp +++ b/cpp/src/qpid/client/ConnectionImpl.cpp @@ -151,8 +151,11 @@ void ConnectionImpl::closed(uint16_t code, const std::string& text) { static const std::string CONN_CLOSED("Connection closed by broker"); void ConnectionImpl::shutdown() { - Mutex::ScopedLock l(lock); + + if ( failureCallback ) + failureCallback(); + if (handler.isClosed()) return; // FIXME aconway 2008-06-06: exception use, amqp0-10 does not seem to have @@ -161,9 +164,6 @@ void ConnectionImpl::shutdown() { closeInternal(boost::bind(&SessionImpl::connectionBroke, _1, CLOSE_CODE_CONNECTION_FORCED, CONN_CLOSED)); setException(new ConnectionException(CLOSE_CODE_CONNECTION_FORCED, CONN_CLOSED)); handler.fail(CONN_CLOSED); - - if ( failureCallback ) - failureCallback(); } void ConnectionImpl::erase(uint16_t ch) { diff --git a/cpp/src/qpid/client/Dispatcher.cpp b/cpp/src/qpid/client/Dispatcher.cpp index af805a3808..c26dba188d 100644 --- a/cpp/src/qpid/client/Dispatcher.cpp +++ b/cpp/src/qpid/client/Dispatcher.cpp @@ -42,6 +42,7 @@ Subscriber::Subscriber(const Session& s, MessageListener* l, AckPolicy a) void Subscriber::received(Message& msg) { + if (listener) { listener->received(msg); autoAck.ack(msg, session); diff --git a/cpp/src/qpid/client/FailoverConnection.cpp b/cpp/src/qpid/client/FailoverConnection.cpp index e98de868de..33b06a6a1a 100644 --- a/cpp/src/qpid/client/FailoverConnection.cpp +++ b/cpp/src/qpid/client/FailoverConnection.cpp @@ -36,7 +36,6 @@ namespace client { FailoverConnection::FailoverConnection ( ) : - name(), failoverCompleteTime(0) { connection.registerFailureCallback @@ -59,7 +58,6 @@ FailoverConnection::open ( const std::string& host, settings.host = host; settings.port = port; settings.username = uid; - settings.username = uid; settings.password = pwd; settings.virtualhost = virtualhost; settings.maxFrameSize = maxFrameSize; @@ -124,9 +122,19 @@ FailoverConnection::registerFailureCallback ( boost::function<void ()> /*fn*/ ) void FailoverConnection::failover ( ) { + std::vector<FailoverSession *>::iterator sessions_iterator; + + for ( sessions_iterator = sessions.begin(); + sessions_iterator != sessions.end(); + ++ sessions_iterator ) + { + FailoverSession * fs = * sessions_iterator; + fs->failover_in_progress = true; + } + std::vector<Url> knownBrokers = connection.getKnownBrokers(); if (knownBrokers.empty()) - throw Exception(QPID_MSG("FailoverConnection::failover " << name << " no known brokers.")); + throw Exception(QPID_MSG("FailoverConnection::failover no known brokers.")); Connection newConnection; for (std::vector<Url>::iterator i = knownBrokers.begin(); i != knownBrokers.end(); ++i) { @@ -148,7 +156,6 @@ FailoverConnection::failover ( ) */ // FIXME aconway 2008-10-10: thread unsafe, possible race with concurrent newSession - std::vector<FailoverSession *>::iterator sessions_iterator; for ( sessions_iterator = sessions.begin(); sessions_iterator < sessions.end(); ++ sessions_iterator @@ -173,6 +180,15 @@ FailoverConnection::failover ( ) FailoverSession * fs = * sessions_iterator; fs->failover ( ); } + + for ( sessions_iterator = sessions.begin(); + sessions_iterator < sessions.end(); + ++ sessions_iterator + ) + { + FailoverSession * fs = * sessions_iterator; + fs->failover_in_progress = false; + } } diff --git a/cpp/src/qpid/client/FailoverConnection.h b/cpp/src/qpid/client/FailoverConnection.h index 09e9c8bfa4..a84f0c2189 100644 --- a/cpp/src/qpid/client/FailoverConnection.h +++ b/cpp/src/qpid/client/FailoverConnection.h @@ -71,10 +71,6 @@ class FailoverConnection void registerFailureCallback ( boost::function<void ()> fn ); - // If you have more than 1 connection and you want to give them - // separate names for debugging... - std::string name; - void failover ( ); struct timeval * failoverCompleteTime; diff --git a/cpp/src/qpid/client/FailoverSession.cpp b/cpp/src/qpid/client/FailoverSession.cpp index a088a8c91b..25867c2a24 100644 --- a/cpp/src/qpid/client/FailoverSession.cpp +++ b/cpp/src/qpid/client/FailoverSession.cpp @@ -38,7 +38,7 @@ namespace qpid { namespace client { FailoverSession::FailoverSession ( ) : - name("no_name") + failover_in_progress(false) { // The session is created by FailoverConnection::newSession failoverSubscriptionManager = 0; @@ -170,11 +170,26 @@ FailoverSession::messageTransfer ( const string& destination, ) { - session.messageTransfer ( destination, - acceptMode, - acquireMode, - content - ); + while ( 1 ) + { + try + { + session.messageTransfer ( destination, + acceptMode, + acquireMode, + content + ); + break; + } + catch ( ... ) + { + // Take special action only if there is a failover in progress. + if ( ! failover_in_progress ) + break; + + usleep ( 1000 ); + } + } } @@ -583,7 +598,6 @@ FailoverSession::prepareForFailover ( Connection newConnection ) if ( failoverSubscriptionManager ) { - // failoverSubscriptionManager->prepareForFailover ( newSession ); } } diff --git a/cpp/src/qpid/client/FailoverSession.h b/cpp/src/qpid/client/FailoverSession.h index 7ff26f8079..b301353968 100644 --- a/cpp/src/qpid/client/FailoverSession.h +++ b/cpp/src/qpid/client/FailoverSession.h @@ -59,8 +59,6 @@ class FailoverSession FailoverSession ( ); ~FailoverSession ( ); - std::string name; - framing::FrameSet::shared_ptr get(); SessionId getId() const; @@ -82,6 +80,8 @@ class FailoverSession void sendCompletion ( ); + bool failover_in_progress; + // Wrapped functions from Session ---------------------------- diff --git a/cpp/src/qpid/client/FailoverSubscriptionManager.cpp b/cpp/src/qpid/client/FailoverSubscriptionManager.cpp index c1ef7d00c4..d12d976ef5 100644 --- a/cpp/src/qpid/client/FailoverSubscriptionManager.cpp +++ b/cpp/src/qpid/client/FailoverSubscriptionManager.cpp @@ -33,7 +33,6 @@ namespace client { FailoverSubscriptionManager::FailoverSubscriptionManager ( FailoverSession * fs) : - name("no_name"), newSessionIsValid(false), no_failover(false) { diff --git a/cpp/src/qpid/client/FailoverSubscriptionManager.h b/cpp/src/qpid/client/FailoverSubscriptionManager.h index 651e2549c2..b7631d3a98 100644 --- a/cpp/src/qpid/client/FailoverSubscriptionManager.h +++ b/cpp/src/qpid/client/FailoverSubscriptionManager.h @@ -114,9 +114,6 @@ class FailoverSubscriptionManager void prepareForFailover ( Session newSession ); void failover ( ); - std::string name; - - private: sys::Monitor lock; |
