diff options
| author | Keith Wall <kwall@apache.org> | 2015-01-27 13:27:05 +0000 |
|---|---|---|
| committer | Keith Wall <kwall@apache.org> | 2015-01-27 13:27:05 +0000 |
| commit | d6ad905aa5da8b0c4be507c87cdae735d778154f (patch) | |
| tree | e300b26b40a55fac54be611c9ef293c9a7992d41 /qpid/java | |
| parent | e08c11b991309252a19a572024961550e5adaf11 (diff) | |
| download | qpid-python-d6ad905aa5da8b0c4be507c87cdae735d778154f.tar.gz | |
QPID-6336: [Java Broker] Change 0-8..0-91 path to await the receiver being closed before continuing with the next connection attempt
Based on work by Oleksandr Rudyy <orudyy@gmail.com>
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1655034 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
6 files changed, 116 insertions, 23 deletions
diff --git a/qpid/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/GroupCreator.java b/qpid/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/GroupCreator.java index 9455d788e4..97d380b55f 100644 --- a/qpid/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/GroupCreator.java +++ b/qpid/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/GroupCreator.java @@ -73,9 +73,9 @@ public class GroupCreator private static final String MANY_BROKER_URL_FORMAT = "amqp://guest:guest@/%s?brokerlist='%s'&failover='roundrobin?cyclecount='%d''"; private static final String BROKER_PORTION_FORMAT = "tcp://localhost:%d?connectdelay='%d',retries='%d'"; - private static final int FAILOVER_CYCLECOUNT = 20; + private static final int FAILOVER_CYCLECOUNT = 40; private static final int FAILOVER_RETRIES = 0; - private static final int FAILOVER_CONNECTDELAY = 500; + private static final int FAILOVER_CONNECTDELAY = 250; private static final String SINGLE_BROKER_URL_WITH_RETRY_FORMAT = "amqp://guest:guest@/%s?brokerlist='tcp://localhost:%d?connectdelay='%d',retries='%d''"; private static final String SINGLE_BROKER_URL_WITHOUT_RETRY_FORMAT = "amqp://guest:guest@/%s?brokerlist='tcp://localhost:%d'"; diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java index 66cade18a4..35582d92b7 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java @@ -21,16 +21,20 @@ package org.apache.qpid.client; import java.net.ConnectException; +import java.nio.ByteBuffer; import java.nio.channels.UnresolvedAddressException; import java.text.MessageFormat; import java.util.ArrayList; import java.util.EnumSet; import java.util.Iterator; import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import javax.jms.JMSException; import javax.jms.XASession; +import org.apache.qpid.transport.Receiver; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -67,6 +71,9 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate { private static final Logger _logger = LoggerFactory.getLogger(AMQConnectionDelegate_8_0.class); private final AMQConnection _conn; + private final long _timeout = Long.getLong(ClientProperties.QPID_SYNC_OP_TIMEOUT, + Long.getLong(ClientProperties.AMQJ_DEFAULT_SYNCWRITE_TIMEOUT, + ClientProperties.DEFAULT_SYNC_OPERATION_TIMEOUT)); private boolean _messageCompressionSupported; private boolean _addrSyntaxSupported; private boolean _confirmedPublishSupported; @@ -136,7 +143,9 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate OutgoingNetworkTransport transport = Transport.getOutgoingTransportInstance(getProtocolVersion()); - NetworkConnection network = transport.connect(settings, securityLayer.receiver(_conn.getProtocolHandler()), + ReceiverClosedWaiter monitoringReceiver = new ReceiverClosedWaiter(securityLayer.receiver(_conn.getProtocolHandler())); + + NetworkConnection network = transport.connect(settings, monitoringReceiver, _conn.getProtocolHandler()); try @@ -171,6 +180,19 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate network.close(); throw e; } + finally + { + // await the receiver to finish its execution (and so the IO threads too) + if (!_conn.isConnected()) + { + boolean closedWithinTimeout = monitoringReceiver.awaitClose(_timeout); + if (!closedWithinTimeout) + { + _logger.warn("Timed-out waiting for receiver for connection to " + + brokerDetail + " to be closed."); + } + } + } } @@ -503,4 +525,60 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate { return _confirmedPublishNonTransactionalSupported; } + + + private static class ReceiverClosedWaiter implements Receiver<ByteBuffer> + { + private final CountDownLatch _closedWatcher; + private final Receiver<ByteBuffer> _receiver; + + public ReceiverClosedWaiter(Receiver<ByteBuffer> receiver) + { + _receiver = receiver; + _closedWatcher = new CountDownLatch(1); + } + + @Override + public void received(ByteBuffer msg) + { + _receiver.received(msg); + } + + @Override + public void exception(Throwable t) + { + _receiver.exception(t); + } + + @Override + public void closed() + { + try + { + _receiver.closed(); + } + finally + { + _closedWatcher.countDown(); + } + } + + public boolean awaitClose(long timeout) + { + try + { + if (_logger.isDebugEnabled()) + { + _logger.debug("Waiting " + timeout + "ms for receiver to be closed"); + } + + return _closedWatcher.await(timeout, TimeUnit.MILLISECONDS); + } + catch (InterruptedException e) + { + Thread.currentThread().interrupt(); + return _closedWatcher.getCount() == 0; + } + } + }; } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java index c61469559a..c2582accdf 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java @@ -240,7 +240,7 @@ public class AMQProtocolHandler implements ProtocolEngine } catch (Exception e) { - _logger.warn("Exception occured on closing the sender", e); + _logger.warn("Exception occurred on closing the sender", e); } if (_connection.failoverAllowed()) { diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/client/failover/MultipleBrokersFailoverTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/client/failover/MultipleBrokersFailoverTest.java index 15ec0f9a4d..c59b0ecf34 100644 --- a/qpid/java/systests/src/test/java/org/apache/qpid/client/failover/MultipleBrokersFailoverTest.java +++ b/qpid/java/systests/src/test/java/org/apache/qpid/client/failover/MultipleBrokersFailoverTest.java @@ -38,7 +38,6 @@ import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQConnectionURL; import org.apache.qpid.jms.ConnectionListener; import org.apache.qpid.test.utils.QpidBrokerTestCase; -import org.apache.qpid.test.utils.TestUtils; import org.apache.qpid.util.FileUtils; public class MultipleBrokersFailoverTest extends QpidBrokerTestCase implements ConnectionListener @@ -48,9 +47,10 @@ public class MultipleBrokersFailoverTest extends QpidBrokerTestCase implements C private static final String FAILOVER_VIRTUAL_HOST = "failover"; private static final String NON_FAILOVER_VIRTUAL_HOST = "nonfailover"; private static final String BROKER_PORTION_FORMAT = "tcp://localhost:%d?connectdelay='%d',retries='%d'"; - private static final int FAILOVER_RETRIES = 1; - private static final int FAILOVER_CONNECTDELAY = 1000; - private static final int FAILOVER_FACTOR = 4; + private static final int FAILOVER_RETRIES = 0; + private static final int FAILOVER_CONNECTDELAY = 0; + private static final int FAILOVER_AWAIT_TIME = 10000; + private int[] _brokerPorts; private AMQConnectionURL _connectionURL; @@ -169,7 +169,7 @@ public class MultipleBrokersFailoverTest extends QpidBrokerTestCase implements C killBroker(_brokerPorts[1]); - awaitForFailoverCompletion(FAILOVER_CONNECTDELAY * _brokerPorts.length * FAILOVER_FACTOR); + awaitForFailoverCompletion(FAILOVER_AWAIT_TIME); assertEquals("Failover is not started as expected", 0, _failoverStarted.getCount()); assertSendReceive(2); @@ -185,7 +185,7 @@ public class MultipleBrokersFailoverTest extends QpidBrokerTestCase implements C stopBroker(_brokerPorts[1]); - awaitForFailoverCompletion(FAILOVER_CONNECTDELAY * _brokerPorts.length * FAILOVER_FACTOR); + awaitForFailoverCompletion(FAILOVER_AWAIT_TIME); assertEquals("Failover is not started as expected", 0, _failoverStarted.getCount()); assertSendReceive(1); @@ -214,20 +214,12 @@ public class MultipleBrokersFailoverTest extends QpidBrokerTestCase implements C } } - private void awaitForFailoverCompletion(long delay) + private void awaitForFailoverCompletion(long delay) throws Exception { _logger.info("Awaiting Failover completion.."); - try - { - if (!_failoverComplete.await(delay, TimeUnit.MILLISECONDS)) - { - _logger.warn("Test thread stack:\n\n" + TestUtils.dumpThreads()); - fail("Failover did not complete"); - } - } - catch (InterruptedException e) + if (!_failoverComplete.await(delay, TimeUnit.MILLISECONDS)) { - fail("Test was interrupted:" + e.getMessage()); + fail("Failover did not complete within " + delay + "ms."); } } @@ -239,7 +231,7 @@ public class MultipleBrokersFailoverTest extends QpidBrokerTestCase implements C receivedMessage instanceof TextMessage); } - private void init(int acknowledgeMode, boolean startConnection) throws JMSException + private void init(int acknowledgeMode, boolean startConnection) throws Exception { boolean isTransacted = acknowledgeMode == Session.SESSION_TRANSACTED ? true : false; diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/client/ssl/SSLTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/client/ssl/SSLTest.java index 1dba5ced9d..7c82ea8e55 100644 --- a/qpid/java/systests/src/test/java/org/apache/qpid/client/ssl/SSLTest.java +++ b/qpid/java/systests/src/test/java/org/apache/qpid/client/ssl/SSLTest.java @@ -94,6 +94,29 @@ public class SSLTest extends QpidBrokerTestCase } } + public void testSSLConnectionToPlainPortRejected() throws Exception + { + if (shouldPerformTest()) + { + super.setUp(); + + String url = "amqp://guest:guest@test/?brokerlist='tcp://localhost:%s" + + "?ssl='true''"; + + url = String.format(url,QpidBrokerTestCase.DEFAULT_PORT); + + try + { + getConnection(new AMQConnectionURL(url)); + fail("Exception not thrown"); + } + catch (JMSException e) + { + assertTrue("Unexpected exception message", e.getMessage().contains("Unrecognized SSL message, plaintext connection?")); + } + } + } + public void testHostVerificationIsOnByDefault() throws Exception { if (shouldPerformTest()) @@ -116,6 +139,7 @@ public class SSLTest extends QpidBrokerTestCase try { getConnection(new AMQConnectionURL(url)); + fail("Exception not thrown"); } catch(JMSException e) { diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java index ed03e83292..191f9d72cf 100644 --- a/qpid/java/systests/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java +++ b/qpid/java/systests/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java @@ -81,7 +81,6 @@ public class ConnectionTest extends QpidBrokerTestCase + "&temporaryQueueExchange='tmp.direct'" + "&temporaryTopicExchange='tmp.topic'"); - System.err.println(url.toString()); conn = new AMQConnection(url); |
