summaryrefslogtreecommitdiff
path: root/java/client
diff options
context:
space:
mode:
authorRobert Gemmell <robbie@apache.org>2011-10-21 10:14:04 +0000
committerRobert Gemmell <robbie@apache.org>2011-10-21 10:14:04 +0000
commitf505cdd862dbd80c2f44331c30f60fbb8432a226 (patch)
tree0ccca692f9d94867e162839168f1ab7af0e30167 /java/client
parent322aa81b9fa8a358529421fc30b5a7846c11eceb (diff)
downloadqpid-python-f505cdd862dbd80c2f44331c30f60fbb8432a226.tar.gz
QPID-3532: make the 0-10 client hold the failover mutex during the failover. Alter the Address resolution code to allow resolving addresses after failover. Add some more failover tests (inc ADDR based ones). Make the failover process notify any waiters in the session to abort and let failover proceed.
Applied patch from Oleksandr Rudyy<orudyy@gmail.com> and myself. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1187279 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQConnection.java10
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java47
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQDestination.java18
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java6
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java2
5 files changed, 58 insertions, 25 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
index f15af72407..941534c7ff 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
@@ -175,6 +175,10 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
// new amqp-0-10 encoded format.
private boolean _useLegacyMapMessageFormat;
+ //used to track the last failover time for
+ //Address resolution purposes
+ private volatile long _lastFailoverTime = 0;
+
/**
* @param broker brokerdetails
* @param username username
@@ -1076,6 +1080,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
*/
public boolean firePreFailover(boolean redirect)
{
+ _lastFailoverTime = System.currentTimeMillis();
boolean proceed = true;
if (_connectionListener != null)
{
@@ -1462,4 +1467,9 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
}
}
}
+
+ public long getLastFailoverTime()
+ {
+ return _lastFailoverTime;
+ }
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
index 0ed3db6ecb..0d48dd5822 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
@@ -281,24 +281,29 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec
{
_conn.getProtocolHandler().setFailoverLatch(new CountDownLatch(1));
- try
+ _qpidConnection.notifyFailoverRequired();
+
+ synchronized (_conn.getFailoverMutex())
{
- if (_conn.firePreFailover(false) && _conn.attemptReconnection())
+ try
{
- _conn.failoverPrep();
- _conn.resubscribeSessions();
- _conn.fireFailoverComplete();
- return;
+ if (_conn.firePreFailover(false) && _conn.attemptReconnection())
+ {
+ _conn.failoverPrep();
+ _conn.resubscribeSessions();
+ _conn.fireFailoverComplete();
+ return;
+ }
+ }
+ catch (Exception e)
+ {
+ _logger.error("error during failover", e);
+ }
+ finally
+ {
+ _conn.getProtocolHandler().getFailoverLatch().countDown();
+ _conn.getProtocolHandler().setFailoverLatch(null);
}
- }
- catch (Exception e)
- {
- _logger.error("error during failover", e);
- }
- finally
- {
- _conn.getProtocolHandler().getFailoverLatch().countDown();
- _conn.getProtocolHandler().setFailoverLatch(null);
}
}
@@ -324,6 +329,18 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec
public <T, E extends Exception> T executeRetrySupport(FailoverProtectedOperation<T,E> operation) throws E
{
+ if (_conn.isFailingOver())
+ {
+ try
+ {
+ _conn.blockUntilNotFailingOver();
+ }
+ catch (InterruptedException e)
+ {
+ //ignore
+ }
+ }
+
try
{
return operation.execute();
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java b/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
index acd46da11a..f9a38138ba 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
@@ -22,6 +22,7 @@ package org.apache.qpid.client;
import java.net.URISyntaxException;
import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
import javax.jms.Destination;
import javax.naming.NamingException;
@@ -59,7 +60,7 @@ public abstract class AMQDestination implements Destination, Referenceable
private boolean _browseOnly;
- private boolean _isAddressResolved;
+ private AtomicLong _addressResolved = new AtomicLong(0);
private AMQShortString _queueName;
@@ -77,7 +78,7 @@ public abstract class AMQDestination implements Destination, Referenceable
public static final int QUEUE_TYPE = 1;
public static final int TOPIC_TYPE = 2;
public static final int UNKNOWN_TYPE = 3;
-
+
// ----- Fields required to support new address syntax -------
public enum DestSyntax {
@@ -740,12 +741,12 @@ public abstract class AMQDestination implements Destination, Referenceable
public boolean isAddressResolved()
{
- return _isAddressResolved;
+ return _addressResolved.get() > 0;
}
- public void setAddressResolved(boolean addressResolved)
+ public void setAddressResolved(long addressResolved)
{
- _isAddressResolved = addressResolved;
+ _addressResolved.set(addressResolved);
}
private static Address createAddressFromString(String str)
@@ -823,7 +824,7 @@ public abstract class AMQDestination implements Destination, Referenceable
dest.setTargetNode(_targetNode);
dest.setSourceNode(_sourceNode);
dest.setLink(_link);
- dest.setAddressResolved(_isAddressResolved);
+ dest.setAddressResolved(_addressResolved.get());
return dest;
}
@@ -836,4 +837,9 @@ public abstract class AMQDestination implements Destination, Referenceable
{
_isDurable = b;
}
+
+ public boolean isResolvedAfter(long time)
+ {
+ return _addressResolved.get() > time;
+ }
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
index c6a64ec894..2869e96a87 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
@@ -1179,8 +1179,8 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
boolean isConsumer,
boolean noWait) throws AMQException
{
- if (dest.isAddressResolved())
- {
+ if (dest.isAddressResolved() && dest.isResolvedAfter(_connection.getLastFailoverTime()))
+ {
if (isConsumer && AMQDestination.TOPIC_TYPE == dest.getAddressType())
{
createSubscriptionQueue(dest);
@@ -1269,7 +1269,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
"The name '" + dest.getAddressName() +
"' supplied in the address doesn't resolve to an exchange or a queue");
}
- dest.setAddressResolved(true);
+ dest.setAddressResolved(System.currentTimeMillis());
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
index 57f64c2f92..16afa51c74 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
@@ -238,7 +238,7 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer
}
catch (Exception e)
{
- JMSException jmse = new JMSException("Exception when sending message");
+ JMSException jmse = new JMSException("Exception when sending message:" + e.getMessage());
jmse.setLinkedException(e);
jmse.initCause(e);
throw jmse;