summaryrefslogtreecommitdiff
path: root/qpid/java/client
diff options
context:
space:
mode:
authorKeith Wall <kwall@apache.org>2015-01-27 13:27:05 +0000
committerKeith Wall <kwall@apache.org>2015-01-27 13:27:05 +0000
commitd6ad905aa5da8b0c4be507c87cdae735d778154f (patch)
treee300b26b40a55fac54be611c9ef293c9a7992d41 /qpid/java/client
parente08c11b991309252a19a572024961550e5adaf11 (diff)
downloadqpid-python-d6ad905aa5da8b0c4be507c87cdae735d778154f.tar.gz
QPID-6336: [Java Broker] Change 0-8..0-91 path to await the receiver being closed before continuing with the next connection attempt
Based on work by Oleksandr Rudyy <orudyy@gmail.com> git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1655034 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/client')
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java80
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java2
2 files changed, 80 insertions, 2 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
index 66cade18a4..35582d92b7 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
@@ -21,16 +21,20 @@
package org.apache.qpid.client;
import java.net.ConnectException;
+import java.nio.ByteBuffer;
import java.nio.channels.UnresolvedAddressException;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.Iterator;
import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import javax.jms.JMSException;
import javax.jms.XASession;
+import org.apache.qpid.transport.Receiver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -67,6 +71,9 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate
{
private static final Logger _logger = LoggerFactory.getLogger(AMQConnectionDelegate_8_0.class);
private final AMQConnection _conn;
+ private final long _timeout = Long.getLong(ClientProperties.QPID_SYNC_OP_TIMEOUT,
+ Long.getLong(ClientProperties.AMQJ_DEFAULT_SYNCWRITE_TIMEOUT,
+ ClientProperties.DEFAULT_SYNC_OPERATION_TIMEOUT));
private boolean _messageCompressionSupported;
private boolean _addrSyntaxSupported;
private boolean _confirmedPublishSupported;
@@ -136,7 +143,9 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate
OutgoingNetworkTransport transport = Transport.getOutgoingTransportInstance(getProtocolVersion());
- NetworkConnection network = transport.connect(settings, securityLayer.receiver(_conn.getProtocolHandler()),
+ ReceiverClosedWaiter monitoringReceiver = new ReceiverClosedWaiter(securityLayer.receiver(_conn.getProtocolHandler()));
+
+ NetworkConnection network = transport.connect(settings, monitoringReceiver,
_conn.getProtocolHandler());
try
@@ -171,6 +180,19 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate
network.close();
throw e;
}
+ finally
+ {
+ // await the receiver to finish its execution (and so the IO threads too)
+ if (!_conn.isConnected())
+ {
+ boolean closedWithinTimeout = monitoringReceiver.awaitClose(_timeout);
+ if (!closedWithinTimeout)
+ {
+ _logger.warn("Timed-out waiting for receiver for connection to "
+ + brokerDetail + " to be closed.");
+ }
+ }
+ }
}
@@ -503,4 +525,60 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate
{
return _confirmedPublishNonTransactionalSupported;
}
+
+
+ private static class ReceiverClosedWaiter implements Receiver<ByteBuffer>
+ {
+ private final CountDownLatch _closedWatcher;
+ private final Receiver<ByteBuffer> _receiver;
+
+ public ReceiverClosedWaiter(Receiver<ByteBuffer> receiver)
+ {
+ _receiver = receiver;
+ _closedWatcher = new CountDownLatch(1);
+ }
+
+ @Override
+ public void received(ByteBuffer msg)
+ {
+ _receiver.received(msg);
+ }
+
+ @Override
+ public void exception(Throwable t)
+ {
+ _receiver.exception(t);
+ }
+
+ @Override
+ public void closed()
+ {
+ try
+ {
+ _receiver.closed();
+ }
+ finally
+ {
+ _closedWatcher.countDown();
+ }
+ }
+
+ public boolean awaitClose(long timeout)
+ {
+ try
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Waiting " + timeout + "ms for receiver to be closed");
+ }
+
+ return _closedWatcher.await(timeout, TimeUnit.MILLISECONDS);
+ }
+ catch (InterruptedException e)
+ {
+ Thread.currentThread().interrupt();
+ return _closedWatcher.getCount() == 0;
+ }
+ }
+ };
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
index c61469559a..c2582accdf 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
@@ -240,7 +240,7 @@ public class AMQProtocolHandler implements ProtocolEngine
}
catch (Exception e)
{
- _logger.warn("Exception occured on closing the sender", e);
+ _logger.warn("Exception occurred on closing the sender", e);
}
if (_connection.failoverAllowed())
{