summaryrefslogtreecommitdiff
path: root/qpid/java/client
diff options
context:
space:
mode:
authorRobert Gemmell <robbie@apache.org>2012-09-10 12:39:44 +0000
committerRobert Gemmell <robbie@apache.org>2012-09-10 12:39:44 +0000
commit35e75488ec3f1b0048f2948701bcfdc8106d760e (patch)
treee376eeef90e639be8dc16911d450693c928ae994 /qpid/java/client
parente72bbb1b75c526dd5b07f606357cd03c6541da7a (diff)
downloadqpid-python-35e75488ec3f1b0048f2948701bcfdc8106d760e.tar.gz
QPID-4289: Fix 0-8/0-9/0-9-1 failover issues
Applied patch from Philip Harvey <phil@philharveyonline.com> and Oleksandr Rudyy <orudyy@gmail.com> git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1382799 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/client')
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java2
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java9
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java14
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java108
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java7
5 files changed, 91 insertions, 49 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
index d80858a7a1..d1c1554705 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
@@ -1080,7 +1080,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
return _started;
}
- protected final boolean isConnected()
+ public final boolean isConnected()
{
return _connected;
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
index e1bf007e83..fb18702de6 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
@@ -90,12 +90,13 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate
public ProtocolVersion makeBrokerConnection(BrokerDetails brokerDetail) throws AMQException, IOException
{
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Connecting to broker:" + brokerDetail);
+ }
final Set<AMQState> openOrClosedStates =
EnumSet.of(AMQState.CONNECTION_OPEN, AMQState.CONNECTION_CLOSED);
-
- StateWaiter waiter = _conn.getProtocolHandler().createWaiter(openOrClosedStates);
-
ConnectionSettings settings = brokerDetail.buildConnectionSettings();
settings.setProtocol(brokerDetail.getTransport());
@@ -126,6 +127,8 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate
OutgoingNetworkTransport transport = Transport.getOutgoingTransportInstance(getProtocolVersion());
NetworkConnection network = transport.connect(settings, securityLayer.receiver(_conn.getProtocolHandler()), sslContext);
_conn.getProtocolHandler().setNetworkConnection(network, securityLayer.sender(network.getSender()));
+
+ StateWaiter waiter = _conn.getProtocolHandler().createWaiter(openOrClosedStates);
_conn.getProtocolHandler().getProtocolSession().init();
// this blocks until the connection has been set up or when an error
// has prevented the connection being set up
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java
index 2cf7b089eb..2e7410f906 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java
@@ -20,6 +20,8 @@
*/
package org.apache.qpid.client.handler;
+import java.nio.ByteBuffer;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -33,6 +35,7 @@ import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.ConnectionCloseBody;
import org.apache.qpid.framing.ConnectionCloseOkBody;
import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.transport.Sender;
public class ConnectionCloseMethodHandler implements StateAwareMethodListener<ConnectionCloseBody>
{
@@ -91,18 +94,15 @@ public class ConnectionCloseMethodHandler implements StateAwareMethodListener<Co
}
finally
{
+ Sender<ByteBuffer> sender = session.getSender();
if (error != null)
{
session.notifyError(error);
- }
-
- // Close the protocol Session, including any open TCP connections
- session.closeProtocolSession();
+ }
- // Closing the session should not introduce a race condition as this thread will continue to propgate any
- // exception in to the exceptionCaught method of the SessionHandler.
- // Any sessionClosed event should occur after this.
+ // Close the open TCP connection
+ sender.close();
}
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
index b314453e31..be3d5fc540 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
@@ -67,6 +67,7 @@ import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
/**
* AMQProtocolHandler is the client side protocol handler for AMQP, it handles all protocol events received from the
@@ -210,48 +211,67 @@ public class AMQProtocolHandler implements ProtocolEngine
}
else
{
- _logger.debug("Session closed called with failover state currently " + _failoverState);
-
- // reconnetablility was introduced here so as not to disturb the client as they have made their intentions
- // known through the policy settings.
-
- if ((_failoverState != FailoverState.IN_PROGRESS) && _connection.failoverAllowed())
- {
- _logger.debug("FAILOVER STARTING");
- if (_failoverState == FailoverState.NOT_STARTED)
- {
- _failoverState = FailoverState.IN_PROGRESS;
- startFailoverThread();
- }
- else
- {
- _logger.debug("Not starting failover as state currently " + _failoverState);
- }
- }
- else
+ // Use local variable to keep flag whether fail-over allowed or not,
+ // in order to execute AMQConnection#exceptionRecievedout out of synchronization block,
+ // otherwise it might deadlock with failover mutex
+ boolean failoverNotAllowed = false;
+ synchronized (this)
{
- _logger.debug("Failover not allowed by policy."); // or already in progress?
-
if (_logger.isDebugEnabled())
{
- _logger.debug(_connection.getFailoverPolicy().toString());
+ _logger.debug("Session closed called with failover state " + _failoverState);
}
- if (_failoverState != FailoverState.IN_PROGRESS)
+ // reconnetablility was introduced here so as not to disturb the client as they have made their intentions
+ // known through the policy settings.
+ if (_failoverState == FailoverState.NOT_STARTED)
{
- _logger.debug("sessionClose() not allowed to failover");
- _connection.exceptionReceived(new AMQDisconnectedException(
- "Server closed connection and reconnection " + "not permitted.",
- _stateManager.getLastException()));
+ // close the sender
+ try
+ {
+ _sender.close();
+ }
+ catch (Exception e)
+ {
+ _logger.warn("Exception occured on closing the sender", e);
+ }
+ if (_connection.failoverAllowed())
+ {
+ _failoverState = FailoverState.IN_PROGRESS;
+
+ _logger.debug("FAILOVER STARTING");
+ startFailoverThread();
+ }
+ else if (_connection.isConnected())
+ {
+ failoverNotAllowed = true;
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Failover not allowed by policy:" + _connection.getFailoverPolicy());
+ }
+ }
+ else
+ {
+ _logger.debug("We are in process of establishing the initial connection");
+ }
}
else
{
- _logger.debug("sessionClose() failover in progress");
+ _logger.debug("Not starting the failover thread as state currently " + _failoverState);
}
}
+
+ if (failoverNotAllowed)
+ {
+ _connection.exceptionReceived(new AMQDisconnectedException(
+ "Server closed connection and reconnection not permitted.", _stateManager.getLastException()));
+ }
}
- _logger.debug("Protocol Session [" + this + "] closed");
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Protocol Session [" + this + "] closed");
+ }
}
/** See {@link FailoverHandler} to see rationale for separate thread. */
@@ -297,14 +317,17 @@ public class AMQProtocolHandler implements ProtocolEngine
*/
public void exception(Throwable cause)
{
- if (_failoverState == FailoverState.NOT_STARTED)
+ boolean connectionClosed = (cause instanceof AMQConnectionClosedException || cause instanceof IOException);
+ if (connectionClosed)
{
- if ((cause instanceof AMQConnectionClosedException) || cause instanceof IOException)
+ _network.close();
+ }
+ FailoverState state = getFailoverState();
+ if (state == FailoverState.NOT_STARTED)
+ {
+ if (connectionClosed)
{
_logger.info("Exception caught therefore going to attempt failover: " + cause, cause);
- // this will attempt failover
- _network.close();
- closed();
}
else
{
@@ -319,7 +342,7 @@ public class AMQProtocolHandler implements ProtocolEngine
}
// we reach this point if failover was attempted and failed therefore we need to let the calling app
// know since we cannot recover the situation
- else if (_failoverState == FailoverState.FAILED)
+ else if (state == FailoverState.FAILED)
{
_logger.error("Exception caught by protocol handler: " + cause, cause);
@@ -329,6 +352,10 @@ public class AMQProtocolHandler implements ProtocolEngine
propagateExceptionToAllWaiters(amqe);
_connection.exceptionReceived(cause);
}
+ else
+ {
+ _logger.warn("Exception caught by protocol handler: " + cause, cause);
+ }
}
/**
@@ -792,14 +819,14 @@ public class AMQProtocolHandler implements ProtocolEngine
return _protocolSession;
}
- FailoverState getFailoverState()
+ synchronized FailoverState getFailoverState()
{
return _failoverState;
}
- public void setFailoverState(FailoverState failoverState)
+ public synchronized void setFailoverState(FailoverState failoverState)
{
- _failoverState = failoverState;
+ _failoverState= failoverState;
}
public byte getProtocolMajorVersion()
@@ -843,6 +870,11 @@ public class AMQProtocolHandler implements ProtocolEngine
_sender = sender;
}
+ protected Sender<ByteBuffer> getSender()
+ {
+ return _sender;
+ }
+
/** @param delay delay in seconds (not ms) */
void initHeartbeats(int delay)
{
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
index af57fd98fc..cf521c8892 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
@@ -48,6 +48,8 @@ import org.apache.qpid.transport.TransportException;
import javax.jms.JMSException;
import javax.security.sasl.SaslClient;
+
+import java.nio.ByteBuffer;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -372,6 +374,11 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
}
}
+ public Sender<ByteBuffer> getSender()
+ {
+ return _protocolHandler.getSender();
+ }
+
public void failover(String host, int port)
{
_protocolHandler.failover(host, port);