diff options
Diffstat (limited to 'java/client/src')
4 files changed, 85 insertions, 26 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index 9abc94b3df..cbb98a2dd5 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -1286,4 +1286,9 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect { return _sessions.get(channelId); } + + public boolean isFailingOver() + { + return (_protocolHandler.getFailoverLatch() != null); + } } diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index fd795392ee..a052b48426 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -420,7 +420,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi * * @throws IllegalStateException If the session is closed. */ - public void acknowledge() throws IllegalStateException + public void acknowledge() throws JMSException { if (isClosed()) { @@ -2510,6 +2510,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi for (Iterator it = consumers.iterator(); it.hasNext();) { BasicMessageConsumer consumer = (BasicMessageConsumer) it.next(); + consumer.failedOver(); registerConsumer(consumer, true); } } diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index 773401d03a..44c10afcf5 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -33,14 +33,12 @@ import org.apache.qpid.framing.BasicCancelOkBody; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.jms.MessageConsumer; import org.apache.qpid.jms.Session; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; - import java.util.Arrays; import java.util.Iterator; import java.util.List; @@ -277,8 +275,28 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer _session.setInRecovery(false); } - private void acquireReceiving() throws JMSException + /** + * @param immediate if true then return immediately if the connection is failing over + * + * @return boolean if the acquisition was successful + * + * @throws JMSException + * @throws InterruptedException + */ + private boolean acquireReceiving(boolean immediate) throws JMSException, InterruptedException { + if (_connection.isFailingOver()) + { + if (immediate) + { + return false; + } + else + { + _connection.blockUntilNotFailingOver(); + } + } + if (!_receiving.compareAndSet(false, true)) { throw new javax.jms.IllegalStateException("Another thread is already receiving."); @@ -290,6 +308,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer } _receivingThread = Thread.currentThread(); + return true; } private void releaseReceiving() @@ -343,7 +362,18 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer checkPreConditions(); - acquireReceiving(); + try + { + acquireReceiving(false); + } + catch (InterruptedException e) + { + _logger.warn("Interrupted: " + e); + if (isClosed()) + { + return null; + } + } _session.startDistpatcherIfNecessary(); @@ -424,7 +454,25 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer { checkPreConditions(); - acquireReceiving(); + try + { + if (!acquireReceiving(true)) + { + //If we couldn't acquire the receiving thread then return null. + // This will occur if failing over. + return null; + } + } + catch (InterruptedException e) + { + /* + * This seems slightly shoddy but should never actually be executed + * since we told acquireReceiving to return immediately and it shouldn't + * block on anything. + */ + + return null; + } _session.startDistpatcherIfNecessary(); @@ -868,11 +916,18 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer } } - public void acknowledge() // throws JMSException + public void acknowledge() throws JMSException { - if (!isClosed()) + if (isClosed()) + { + throw new IllegalStateException("Consumer is closed"); + } + else if (_session.hasFailedOver()) + { + throw new JMSException("has failed over"); + } + else { - Iterator<Long> tags = _unacknowledgedDeliveryTags.iterator(); while (tags.hasNext()) { @@ -880,10 +935,6 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer tags.remove(); } } - else - { - throw new IllegalStateException("Consumer is closed"); - } } /** Called on recovery to reset the list of delivery tags */ @@ -1022,4 +1073,11 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer { _synchronousQueue.clear(); } + + /** to be called when a failover has occured */ + public void failedOver() + { + clearReceiveQueue(); + clearUnackedMessages(); + } } diff --git a/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java b/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java index 040b5f7b68..49d528a4f4 100644 --- a/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java @@ -35,7 +35,6 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.HashMap; -import java.util.Iterator; import java.util.Map; /** @@ -99,8 +98,8 @@ public class TransportConnection if (!System.getProperties().containsKey("qpidnio") || Boolean.getBoolean("qpidnio")) { _logger.warn("Using Qpid MultiThreaded NIO - " + (System.getProperties().containsKey("qpidnio") - ? "Qpid NIO is new default" - : "Sysproperty 'qpidnio' is set")); + ? "Qpid NIO is new default" + : "Sysproperty 'qpidnio' is set")); result = new MultiThreadSocketConnector(); } else @@ -277,8 +276,7 @@ public class TransportConnection } AMQVMBrokerCreationException amqbce = - new AMQVMBrokerCreationException(null, port, because + " Stopped InVM Qpid.AMQP creation", null); - amqbce.initCause(e); + new AMQVMBrokerCreationException(null, port, because + " Stopped InVM Qpid.AMQP creation", e); throw amqbce; } @@ -291,14 +289,11 @@ public class TransportConnection _acceptor.unbindAll(); synchronized (_inVmPipeAddress) { - Iterator keys = _inVmPipeAddress.keySet().iterator(); - - while (keys.hasNext()) - { - int id = (Integer) keys.next(); - _inVmPipeAddress.remove(id); - } - } + _inVmPipeAddress.clear(); + } + _acceptor = null; + _currentInstance = -1; + _currentVMPort = -1; } public static void killVMBroker(int port) |
