diff options
| author | Robert Gemmell <robbie@apache.org> | 2012-09-10 12:39:44 +0000 |
|---|---|---|
| committer | Robert Gemmell <robbie@apache.org> | 2012-09-10 12:39:44 +0000 |
| commit | 35e75488ec3f1b0048f2948701bcfdc8106d760e (patch) | |
| tree | e376eeef90e639be8dc16911d450693c928ae994 /qpid/java/client | |
| parent | e72bbb1b75c526dd5b07f606357cd03c6541da7a (diff) | |
| download | qpid-python-35e75488ec3f1b0048f2948701bcfdc8106d760e.tar.gz | |
QPID-4289: Fix 0-8/0-9/0-9-1 failover issues
Applied patch from Philip Harvey <phil@philharveyonline.com> and Oleksandr Rudyy <orudyy@gmail.com>
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1382799 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/client')
5 files changed, 91 insertions, 49 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index d80858a7a1..d1c1554705 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -1080,7 +1080,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect return _started; } - protected final boolean isConnected() + public final boolean isConnected() { return _connected; } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java index e1bf007e83..fb18702de6 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java @@ -90,12 +90,13 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate public ProtocolVersion makeBrokerConnection(BrokerDetails brokerDetail) throws AMQException, IOException { + if (_logger.isDebugEnabled()) + { + _logger.debug("Connecting to broker:" + brokerDetail); + } final Set<AMQState> openOrClosedStates = EnumSet.of(AMQState.CONNECTION_OPEN, AMQState.CONNECTION_CLOSED); - - StateWaiter waiter = _conn.getProtocolHandler().createWaiter(openOrClosedStates); - ConnectionSettings settings = brokerDetail.buildConnectionSettings(); settings.setProtocol(brokerDetail.getTransport()); @@ -126,6 +127,8 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate OutgoingNetworkTransport transport = Transport.getOutgoingTransportInstance(getProtocolVersion()); NetworkConnection network = transport.connect(settings, securityLayer.receiver(_conn.getProtocolHandler()), sslContext); _conn.getProtocolHandler().setNetworkConnection(network, securityLayer.sender(network.getSender())); + + StateWaiter waiter = _conn.getProtocolHandler().createWaiter(openOrClosedStates); _conn.getProtocolHandler().getProtocolSession().init(); // this blocks until the connection has been set up or when an error // has prevented the connection being set up diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java index 2cf7b089eb..2e7410f906 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java @@ -20,6 +20,8 @@ */ package org.apache.qpid.client.handler; +import java.nio.ByteBuffer; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -33,6 +35,7 @@ import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.ConnectionCloseBody; import org.apache.qpid.framing.ConnectionCloseOkBody; import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.transport.Sender; public class ConnectionCloseMethodHandler implements StateAwareMethodListener<ConnectionCloseBody> { @@ -91,18 +94,15 @@ public class ConnectionCloseMethodHandler implements StateAwareMethodListener<Co } finally { + Sender<ByteBuffer> sender = session.getSender(); if (error != null) { session.notifyError(error); - } - - // Close the protocol Session, including any open TCP connections - session.closeProtocolSession(); + } - // Closing the session should not introduce a race condition as this thread will continue to propgate any - // exception in to the exceptionCaught method of the SessionHandler. - // Any sessionClosed event should occur after this. + // Close the open TCP connection + sender.close(); } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java index b314453e31..be3d5fc540 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java @@ -67,6 +67,7 @@ import java.util.Set; import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; /** * AMQProtocolHandler is the client side protocol handler for AMQP, it handles all protocol events received from the @@ -210,48 +211,67 @@ public class AMQProtocolHandler implements ProtocolEngine } else { - _logger.debug("Session closed called with failover state currently " + _failoverState); - - // reconnetablility was introduced here so as not to disturb the client as they have made their intentions - // known through the policy settings. - - if ((_failoverState != FailoverState.IN_PROGRESS) && _connection.failoverAllowed()) - { - _logger.debug("FAILOVER STARTING"); - if (_failoverState == FailoverState.NOT_STARTED) - { - _failoverState = FailoverState.IN_PROGRESS; - startFailoverThread(); - } - else - { - _logger.debug("Not starting failover as state currently " + _failoverState); - } - } - else + // Use local variable to keep flag whether fail-over allowed or not, + // in order to execute AMQConnection#exceptionRecievedout out of synchronization block, + // otherwise it might deadlock with failover mutex + boolean failoverNotAllowed = false; + synchronized (this) { - _logger.debug("Failover not allowed by policy."); // or already in progress? - if (_logger.isDebugEnabled()) { - _logger.debug(_connection.getFailoverPolicy().toString()); + _logger.debug("Session closed called with failover state " + _failoverState); } - if (_failoverState != FailoverState.IN_PROGRESS) + // reconnetablility was introduced here so as not to disturb the client as they have made their intentions + // known through the policy settings. + if (_failoverState == FailoverState.NOT_STARTED) { - _logger.debug("sessionClose() not allowed to failover"); - _connection.exceptionReceived(new AMQDisconnectedException( - "Server closed connection and reconnection " + "not permitted.", - _stateManager.getLastException())); + // close the sender + try + { + _sender.close(); + } + catch (Exception e) + { + _logger.warn("Exception occured on closing the sender", e); + } + if (_connection.failoverAllowed()) + { + _failoverState = FailoverState.IN_PROGRESS; + + _logger.debug("FAILOVER STARTING"); + startFailoverThread(); + } + else if (_connection.isConnected()) + { + failoverNotAllowed = true; + if (_logger.isDebugEnabled()) + { + _logger.debug("Failover not allowed by policy:" + _connection.getFailoverPolicy()); + } + } + else + { + _logger.debug("We are in process of establishing the initial connection"); + } } else { - _logger.debug("sessionClose() failover in progress"); + _logger.debug("Not starting the failover thread as state currently " + _failoverState); } } + + if (failoverNotAllowed) + { + _connection.exceptionReceived(new AMQDisconnectedException( + "Server closed connection and reconnection not permitted.", _stateManager.getLastException())); + } } - _logger.debug("Protocol Session [" + this + "] closed"); + if (_logger.isDebugEnabled()) + { + _logger.debug("Protocol Session [" + this + "] closed"); + } } /** See {@link FailoverHandler} to see rationale for separate thread. */ @@ -297,14 +317,17 @@ public class AMQProtocolHandler implements ProtocolEngine */ public void exception(Throwable cause) { - if (_failoverState == FailoverState.NOT_STARTED) + boolean connectionClosed = (cause instanceof AMQConnectionClosedException || cause instanceof IOException); + if (connectionClosed) { - if ((cause instanceof AMQConnectionClosedException) || cause instanceof IOException) + _network.close(); + } + FailoverState state = getFailoverState(); + if (state == FailoverState.NOT_STARTED) + { + if (connectionClosed) { _logger.info("Exception caught therefore going to attempt failover: " + cause, cause); - // this will attempt failover - _network.close(); - closed(); } else { @@ -319,7 +342,7 @@ public class AMQProtocolHandler implements ProtocolEngine } // we reach this point if failover was attempted and failed therefore we need to let the calling app // know since we cannot recover the situation - else if (_failoverState == FailoverState.FAILED) + else if (state == FailoverState.FAILED) { _logger.error("Exception caught by protocol handler: " + cause, cause); @@ -329,6 +352,10 @@ public class AMQProtocolHandler implements ProtocolEngine propagateExceptionToAllWaiters(amqe); _connection.exceptionReceived(cause); } + else + { + _logger.warn("Exception caught by protocol handler: " + cause, cause); + } } /** @@ -792,14 +819,14 @@ public class AMQProtocolHandler implements ProtocolEngine return _protocolSession; } - FailoverState getFailoverState() + synchronized FailoverState getFailoverState() { return _failoverState; } - public void setFailoverState(FailoverState failoverState) + public synchronized void setFailoverState(FailoverState failoverState) { - _failoverState = failoverState; + _failoverState= failoverState; } public byte getProtocolMajorVersion() @@ -843,6 +870,11 @@ public class AMQProtocolHandler implements ProtocolEngine _sender = sender; } + protected Sender<ByteBuffer> getSender() + { + return _sender; + } + /** @param delay delay in seconds (not ms) */ void initHeartbeats(int delay) { diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java index af57fd98fc..cf521c8892 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java @@ -48,6 +48,8 @@ import org.apache.qpid.transport.TransportException; import javax.jms.JMSException; import javax.security.sasl.SaslClient; + +import java.nio.ByteBuffer; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -372,6 +374,11 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession } } + public Sender<ByteBuffer> getSender() + { + return _protocolHandler.getSender(); + } + public void failover(String host, int port) { _protocolHandler.failover(host, port); |
