summaryrefslogtreecommitdiff
path: root/java/client
diff options
context:
space:
mode:
Diffstat (limited to 'java/client')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQConnection.java10
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java47
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQDestination.java18
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java6
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java2
5 files changed, 58 insertions, 25 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
index f15af72407..941534c7ff 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
@@ -175,6 +175,10 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
// new amqp-0-10 encoded format.
private boolean _useLegacyMapMessageFormat;
+ //used to track the last failover time for
+ //Address resolution purposes
+ private volatile long _lastFailoverTime = 0;
+
/**
* @param broker brokerdetails
* @param username username
@@ -1076,6 +1080,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
*/
public boolean firePreFailover(boolean redirect)
{
+ _lastFailoverTime = System.currentTimeMillis();
boolean proceed = true;
if (_connectionListener != null)
{
@@ -1462,4 +1467,9 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
}
}
}
+
+ public long getLastFailoverTime()
+ {
+ return _lastFailoverTime;
+ }
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
index 0ed3db6ecb..0d48dd5822 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
@@ -281,24 +281,29 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec
{
_conn.getProtocolHandler().setFailoverLatch(new CountDownLatch(1));
- try
+ _qpidConnection.notifyFailoverRequired();
+
+ synchronized (_conn.getFailoverMutex())
{
- if (_conn.firePreFailover(false) && _conn.attemptReconnection())
+ try
{
- _conn.failoverPrep();
- _conn.resubscribeSessions();
- _conn.fireFailoverComplete();
- return;
+ if (_conn.firePreFailover(false) && _conn.attemptReconnection())
+ {
+ _conn.failoverPrep();
+ _conn.resubscribeSessions();
+ _conn.fireFailoverComplete();
+ return;
+ }
+ }
+ catch (Exception e)
+ {
+ _logger.error("error during failover", e);
+ }
+ finally
+ {
+ _conn.getProtocolHandler().getFailoverLatch().countDown();
+ _conn.getProtocolHandler().setFailoverLatch(null);
}
- }
- catch (Exception e)
- {
- _logger.error("error during failover", e);
- }
- finally
- {
- _conn.getProtocolHandler().getFailoverLatch().countDown();
- _conn.getProtocolHandler().setFailoverLatch(null);
}
}
@@ -324,6 +329,18 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec
public <T, E extends Exception> T executeRetrySupport(FailoverProtectedOperation<T,E> operation) throws E
{
+ if (_conn.isFailingOver())
+ {
+ try
+ {
+ _conn.blockUntilNotFailingOver();
+ }
+ catch (InterruptedException e)
+ {
+ //ignore
+ }
+ }
+
try
{
return operation.execute();
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java b/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
index acd46da11a..f9a38138ba 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
@@ -22,6 +22,7 @@ package org.apache.qpid.client;
import java.net.URISyntaxException;
import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
import javax.jms.Destination;
import javax.naming.NamingException;
@@ -59,7 +60,7 @@ public abstract class AMQDestination implements Destination, Referenceable
private boolean _browseOnly;
- private boolean _isAddressResolved;
+ private AtomicLong _addressResolved = new AtomicLong(0);
private AMQShortString _queueName;
@@ -77,7 +78,7 @@ public abstract class AMQDestination implements Destination, Referenceable
public static final int QUEUE_TYPE = 1;
public static final int TOPIC_TYPE = 2;
public static final int UNKNOWN_TYPE = 3;
-
+
// ----- Fields required to support new address syntax -------
public enum DestSyntax {
@@ -740,12 +741,12 @@ public abstract class AMQDestination implements Destination, Referenceable
public boolean isAddressResolved()
{
- return _isAddressResolved;
+ return _addressResolved.get() > 0;
}
- public void setAddressResolved(boolean addressResolved)
+ public void setAddressResolved(long addressResolved)
{
- _isAddressResolved = addressResolved;
+ _addressResolved.set(addressResolved);
}
private static Address createAddressFromString(String str)
@@ -823,7 +824,7 @@ public abstract class AMQDestination implements Destination, Referenceable
dest.setTargetNode(_targetNode);
dest.setSourceNode(_sourceNode);
dest.setLink(_link);
- dest.setAddressResolved(_isAddressResolved);
+ dest.setAddressResolved(_addressResolved.get());
return dest;
}
@@ -836,4 +837,9 @@ public abstract class AMQDestination implements Destination, Referenceable
{
_isDurable = b;
}
+
+ public boolean isResolvedAfter(long time)
+ {
+ return _addressResolved.get() > time;
+ }
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
index c6a64ec894..2869e96a87 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
@@ -1179,8 +1179,8 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
boolean isConsumer,
boolean noWait) throws AMQException
{
- if (dest.isAddressResolved())
- {
+ if (dest.isAddressResolved() && dest.isResolvedAfter(_connection.getLastFailoverTime()))
+ {
if (isConsumer && AMQDestination.TOPIC_TYPE == dest.getAddressType())
{
createSubscriptionQueue(dest);
@@ -1269,7 +1269,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
"The name '" + dest.getAddressName() +
"' supplied in the address doesn't resolve to an exchange or a queue");
}
- dest.setAddressResolved(true);
+ dest.setAddressResolved(System.currentTimeMillis());
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
index 57f64c2f92..16afa51c74 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
@@ -238,7 +238,7 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer
}
catch (Exception e)
{
- JMSException jmse = new JMSException("Exception when sending message");
+ JMSException jmse = new JMSException("Exception when sending message:" + e.getMessage());
jmse.setLinkedException(e);
jmse.initCause(e);
throw jmse;