From 0314cbe225dce796e09ae9abbd450323808fe493 Mon Sep 17 00:00:00 2001 From: Aidan Skinner Date: Tue, 18 Aug 2009 16:16:10 +0000 Subject: QPID-2024: Add ProtocolEngine and NetworkDriver interfaces and a NetworkDriver implementation that uses MINA. git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/java-network-refactor@805477 13f79535-47bb-0310-9956-ffa450edef68 --- .../qpid/client/transport/QpidThreadExecutor.java | 22 ---------------------- .../qpid/client/transport/TransportConnection.java | 1 + 2 files changed, 1 insertion(+), 22 deletions(-) delete mode 100644 qpid/java/client/src/main/java/org/apache/qpid/client/transport/QpidThreadExecutor.java (limited to 'qpid/java/client') diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/transport/QpidThreadExecutor.java b/qpid/java/client/src/main/java/org/apache/qpid/client/transport/QpidThreadExecutor.java deleted file mode 100644 index 3de410fb69..0000000000 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/transport/QpidThreadExecutor.java +++ /dev/null @@ -1,22 +0,0 @@ -package org.apache.qpid.client.transport; - -import org.apache.qpid.thread.Threading; - -import edu.emory.mathcs.backport.java.util.concurrent.Executor; - -public class QpidThreadExecutor implements Executor -{ - @Override - public void execute(Runnable command) - { - try - { - Threading.getThreadFactory().createThread(command).start(); - } - catch(Exception e) - { - throw new RuntimeException("Error creating a thread using Qpid thread factory",e); - } - } - -} diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java index 0bacda04ff..32cc8c4cb5 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java @@ -31,6 +31,7 @@ import org.apache.mina.transport.vmpipe.VmPipeAddress; import org.apache.qpid.client.vmbroker.AMQVMBrokerCreationException; import org.apache.qpid.jms.BrokerDetails; import org.apache.qpid.pool.ReadWriteThreadModel; +import org.apache.qpid.thread.QpidThreadExecutor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -- cgit v1.2.1 From a7be8fc7337b5cc093f593cc1becb9fe7b4dc0fb Mon Sep 17 00:00:00 2001 From: Aidan Skinner Date: Tue, 1 Sep 2009 16:27:52 +0000 Subject: QPID-2025: Add a AMQProtocolEngine from the de-MINAfied AMQMinaProtocolSession. Remove various now-unused classes and update references. Add tests for AMQDecoder. Net -1500 lines, +25% performance on transient messaging. Nice. git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/java-network-refactor@810110 13f79535-47bb-0310-9956-ffa450edef68 --- .../org/apache/qpid/client/protocol/AMQProtocolHandler.java | 2 +- .../apache/qpid/client/transport/TransportConnection.java | 12 ++++++++---- 2 files changed, 9 insertions(+), 5 deletions(-) (limited to 'qpid/java/client') diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java index 2389c9e2da..e3a1a82dc4 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java @@ -191,7 +191,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter _logger.debug("Protocol session created for session " + System.identityHashCode(session)); _failoverHandler = new FailoverHandler(this, session); - final ProtocolCodecFilter pcf = new ProtocolCodecFilter(new AMQCodecFactory(false)); + final ProtocolCodecFilter pcf = new ProtocolCodecFilter(new AMQCodecFactory(false, _protocolSession)); if (Boolean.getBoolean("amqj.shared_read_write_pool")) { diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java index 32cc8c4cb5..4ff24c3607 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java @@ -31,7 +31,10 @@ import org.apache.mina.transport.vmpipe.VmPipeAddress; import org.apache.qpid.client.vmbroker.AMQVMBrokerCreationException; import org.apache.qpid.jms.BrokerDetails; import org.apache.qpid.pool.ReadWriteThreadModel; +import org.apache.qpid.protocol.ProtocolEngine; +import org.apache.qpid.protocol.ProtocolEngineFactory; import org.apache.qpid.thread.QpidThreadExecutor; +import org.apache.qpid.transport.network.mina.MINANetworkDriver; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -62,7 +65,7 @@ public class TransportConnection private static Logger _logger = LoggerFactory.getLogger(TransportConnection.class); - private static final String DEFAULT_QPID_SERVER = "org.apache.qpid.server.protocol.AMQPFastProtocolHandler"; + private static final String DEFAULT_QPID_SERVER = "org.apache.qpid.server.protocol.AMQProtocolEngineFactory"; private static Map _openSocketRegister = new ConcurrentHashMap(); @@ -190,8 +193,6 @@ public class TransportConnection _acceptor = new VmPipeAcceptor(); IoServiceConfig config = _acceptor.getDefaultConfig(); - - config.setThreadModel(ReadWriteThreadModel.getInstance()); } synchronized (_inVmPipeAddress) { @@ -276,7 +277,10 @@ public class TransportConnection { Class[] cnstr = {Integer.class}; Object[] params = {port}; - provider = (IoHandlerAdapter) Class.forName(protocolProviderClass).getConstructor(cnstr).newInstance(params); + + provider = new MINANetworkDriver(); + ProtocolEngineFactory engineFactory = (ProtocolEngineFactory) Class.forName(protocolProviderClass).getConstructor(cnstr).newInstance(params); + ((MINANetworkDriver) provider).setProtocolEngineFactory(engineFactory, true); // Give the broker a second to create _logger.info("Created VMBroker Instance:" + port); } -- cgit v1.2.1 From c1ebe66bfab328c5192a35c21ea290b5c45f40f5 Mon Sep 17 00:00:00 2001 From: Aidan Skinner Date: Wed, 9 Sep 2009 13:05:43 +0000 Subject: Merge from trunk git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/java-network-refactor@812936 13f79535-47bb-0310-9956-ffa450edef68 --- .../qpid/client/AMQAuthenticationException.java | 5 -- .../java/org/apache/qpid/client/AMQSession.java | 51 +++++++----- .../org/apache/qpid/client/AMQSession_0_8.java | 24 ++++-- .../client/handler/ChannelCloseMethodHandler.java | 91 ++++++++++++---------- .../handler/ConnectionOpenOkMethodHandler.java | 1 - .../handler/ConnectionTuneMethodHandler.java | 1 - .../protocol/AMQIoTransportProtocolSession.java | 2 +- .../qpid/client/protocol/AMQProtocolHandler.java | 54 ++++++------- .../qpid/client/protocol/AMQProtocolSession.java | 4 +- .../apache/qpid/client/state/AMQStateManager.java | 19 ++++- .../client/util/FlowControllingBlockingQueue.java | 18 ++++- .../org/apache/qpid/client/MockAMQConnection.java | 2 +- .../client/protocol/AMQProtocolHandlerTest.java | 11 +-- 13 files changed, 159 insertions(+), 124 deletions(-) (limited to 'qpid/java/client') diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQAuthenticationException.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQAuthenticationException.java index 05ac3dca9e..6bae0166d1 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQAuthenticationException.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQAuthenticationException.java @@ -39,9 +39,4 @@ public class AMQAuthenticationException extends AMQException { super(error, msg, cause); } - public boolean isHardError() - { - return true; - } - } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 118be75705..2e3e417c95 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -65,6 +65,7 @@ import org.apache.qpid.AMQDisconnectedException; import org.apache.qpid.AMQException; import org.apache.qpid.AMQInvalidArgumentException; import org.apache.qpid.AMQInvalidRoutingKeyException; +import org.apache.qpid.AMQChannelClosedException; import org.apache.qpid.client.failover.FailoverException; import org.apache.qpid.client.failover.FailoverNoopSupport; import org.apache.qpid.client.failover.FailoverProtectedOperation; @@ -205,9 +206,9 @@ public abstract class AMQSession @@ -86,7 +87,7 @@ public class AMQStateManager implements AMQMethodListener return _currentState; } - public void changeState(AMQState newState) throws AMQException + public void changeState(AMQState newState) { _logger.debug("State changing to " + newState + " from old state " + _currentState); @@ -136,6 +137,22 @@ public class AMQStateManager implements AMQMethodListener */ public void error(Exception error) { + if (error instanceof AMQException) + { + // AMQException should be being notified before closing the + // ProtocolSession. Which will change the State to CLOSED. + // if we have a hard error. + if (((AMQException)error).isHardError()) + { + changeState(AMQState.CONNECTION_CLOSING); + } + } + else + { + // Be on the safe side here and mark the connection closed + changeState(AMQState.CONNECTION_CLOSED); + } + if (_waiters.size() == 0) { _logger.error("No Waiters for error saving as last error:" + error.getMessage()); diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java b/qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java index bddbc329ab..ee7fc533a3 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java @@ -22,10 +22,11 @@ package org.apache.qpid.client.util; import java.util.Iterator; import java.util.Queue; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ConcurrentLinkedQueue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + /** * A blocking queue that emits events above a user specified threshold allowing the caller to take action (e.g. flow * control) to try to prevent the queue growing (much) further. The underlying queue itself is not bounded therefore the @@ -36,6 +37,8 @@ import java.util.concurrent.ConcurrentLinkedQueue; */ public class FlowControllingBlockingQueue { + private static final Logger _logger = LoggerFactory.getLogger(FlowControllingBlockingQueue.class); + /** This queue is bounded and is used to store messages before being dispatched to the consumer */ private final Queue _queue = new ConcurrentLinkedQueue(); @@ -46,6 +49,8 @@ public class FlowControllingBlockingQueue /** We require a separate count so we can track whether we have reached the threshold */ private int _count; + + private boolean disableFlowControl; public boolean isEmpty() { @@ -69,6 +74,10 @@ public class FlowControllingBlockingQueue _flowControlHighThreshold = highThreshold; _flowControlLowThreshold = lowThreshold; _listener = listener; + if (highThreshold == 0) + { + disableFlowControl = true; + } } public Object take() throws InterruptedException @@ -84,7 +93,7 @@ public class FlowControllingBlockingQueue } } } - if (_listener != null) + if (!disableFlowControl && _listener != null) { synchronized (_listener) { @@ -93,6 +102,7 @@ public class FlowControllingBlockingQueue _listener.underThreshold(_count); } } + } return o; @@ -106,7 +116,7 @@ public class FlowControllingBlockingQueue notifyAll(); } - if (_listener != null) + if (!disableFlowControl && _listener != null) { synchronized (_listener) { diff --git a/qpid/java/client/src/test/java/org/apache/qpid/client/MockAMQConnection.java b/qpid/java/client/src/test/java/org/apache/qpid/client/MockAMQConnection.java index ce79080e97..da44822ec3 100644 --- a/qpid/java/client/src/test/java/org/apache/qpid/client/MockAMQConnection.java +++ b/qpid/java/client/src/test/java/org/apache/qpid/client/MockAMQConnection.java @@ -85,7 +85,7 @@ public class MockAMQConnection extends AMQConnection } @Override - public ProtocolVersion makeBrokerConnection(BrokerDetails brokerDetail) throws IOException, AMQException + public ProtocolVersion makeBrokerConnection(BrokerDetails brokerDetail) throws IOException { _connected = true; _protocolHandler.getStateManager().changeState(AMQState.CONNECTION_OPEN); diff --git a/qpid/java/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java b/qpid/java/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java index 10ec220d9e..fc7f8148f0 100644 --- a/qpid/java/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java +++ b/qpid/java/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java @@ -200,15 +200,8 @@ public class AMQProtocolHandlerTest extends TestCase _handler.getStateManager().error(trigger); _logger.info("Setting state to be CONNECTION_CLOSED."); - try - { - _handler.getStateManager().changeState(AMQState.CONNECTION_CLOSED); - } - catch (AMQException e) - { - _logger.error("Unable to change the state to closed.", e); - fail("Unable to change the state to closed due to :"+e.getMessage()); - } + + _handler.getStateManager().changeState(AMQState.CONNECTION_CLOSED); _logger.info("Firing exception"); _handler.propagateExceptionToFrameListeners(trigger); -- cgit v1.2.1 From 9c4ecc45da929750ff7f0e0a5d7ada4e674b9105 Mon Sep 17 00:00:00 2001 From: Aidan Skinner Date: Wed, 16 Sep 2009 10:06:55 +0000 Subject: QPID-2105: Make NetworkDriver.open use a SSLContextFactory, not an SSLEngine. Allow an existing SocketConnector to be passed into a MINANetworkDriver, for use with the ExistingSocket bit of TransportConnection. Move the ExistingSocket stuff to one place, use MINANetworkDriver in TransportConnection and make AMQProtocolHandler implement ProtocolEngine. Remove MINA specific gubbins from AMQProtocolHandler and AMQProtocolSession. Move fireAsynchEvent to Job Add getLocalAddress to AMQProtocolEngine Move TestNetworkDriver to common Use correct class for logger in AMQProtocolEngine Check the exception is thrown properly in SimpleACLTest, make it a little less prone to obscure race conditions. git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/java-network-refactor@815704 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/java/client/build.xml | 1 + .../qpid/client/AMQConnectionDelegate_8_0.java | 2 +- .../qpid/client/failover/FailoverHandler.java | 10 +- .../qpid/client/protocol/AMQProtocolHandler.java | 363 +++++++++++---------- .../qpid/client/protocol/AMQProtocolSession.java | 110 +------ .../transport/SocketTransportConnection.java | 62 ++-- .../qpid/client/transport/TransportConnection.java | 19 +- .../transport/VmPipeTransportConnection.java | 11 +- .../client/protocol/AMQProtocolHandlerTest.java | 5 +- 9 files changed, 251 insertions(+), 332 deletions(-) (limited to 'qpid/java/client') diff --git a/qpid/java/client/build.xml b/qpid/java/client/build.xml index 321e613d94..3c6132dc5b 100644 --- a/qpid/java/client/build.xml +++ b/qpid/java/client/build.xml @@ -21,6 +21,7 @@ + diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java index a0b69b5493..9876393d4c 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java @@ -97,7 +97,7 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate { _conn.getProtocolHandler().createIoTransportSession(brokerDetail); } - + _conn._protocolHandler.getProtocolSession().init(); // this blocks until the connection has been set up or when an error // has prevented the connection being set up diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java index 927f660932..8223cd5394 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java @@ -20,8 +20,6 @@ */ package org.apache.qpid.client.failover; -import org.apache.mina.common.IoSession; - import org.apache.qpid.AMQDisconnectedException; import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.client.state.AMQStateManager; @@ -81,9 +79,6 @@ public class FailoverHandler implements Runnable /** Used for debugging. */ private static final Logger _logger = LoggerFactory.getLogger(FailoverHandler.class); - /** Holds the MINA session for the connection that has failed, not the connection that is being failed onto. */ - private final IoSession _session; - /** Holds the protocol handler for the failed connection, upon which the new connection is to be set up. */ private AMQProtocolHandler _amqProtocolHandler; @@ -99,10 +94,9 @@ public class FailoverHandler implements Runnable * @param amqProtocolHandler The protocol handler that spans the failover. * @param session The MINA session, for the failing connection. */ - public FailoverHandler(AMQProtocolHandler amqProtocolHandler, IoSession session) + public FailoverHandler(AMQProtocolHandler amqProtocolHandler) { _amqProtocolHandler = amqProtocolHandler; - _session = session; } /** @@ -221,7 +215,7 @@ public class FailoverHandler implements Runnable _amqProtocolHandler.setFailoverState(FailoverState.FAILED); /*try {*/ - _amqProtocolHandler.exceptionCaught(_session, e); + _amqProtocolHandler.exception(e); /*} catch (Exception ex) { diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java index ab3ff8ecb0..c7e2493025 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java @@ -20,10 +20,6 @@ */ package org.apache.qpid.client.protocol; -import org.apache.mina.common.IdleStatus; -import org.apache.mina.common.IoFilterChain; -import org.apache.mina.common.IoHandlerAdapter; -import org.apache.mina.common.IoSession; import org.apache.mina.filter.ReadThrottleFilterBuilder; import org.apache.mina.filter.SSLFilter; import org.apache.mina.filter.WriteBufferLimitFilterBuilder; @@ -48,16 +44,25 @@ import org.apache.qpid.client.state.listener.SpecificMethodFrameListener; import org.apache.qpid.codec.AMQCodecFactory; import org.apache.qpid.framing.*; import org.apache.qpid.jms.BrokerDetails; +import org.apache.qpid.pool.Event; +import org.apache.qpid.pool.Job; +import org.apache.qpid.pool.PoolingFilter; import org.apache.qpid.pool.ReadWriteThreadModel; +import org.apache.qpid.pool.ReferenceCountingExecutorService; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.protocol.AMQMethodListener; +import org.apache.qpid.protocol.ProtocolEngine; import org.apache.qpid.ssl.SSLContextFactory; +import org.apache.qpid.transport.NetworkDriver; import org.apache.qpid.transport.network.io.IoTransport; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; +import java.net.SocketAddress; +import java.nio.ByteBuffer; +import java.util.ArrayList; import java.util.Iterator; import java.util.Set; import java.util.concurrent.CopyOnWriteArraySet; @@ -120,7 +125,7 @@ import java.util.concurrent.CountDownLatch; * held per protocol handler, per protocol session, per network connection, per channel, in seperate classes, so * that lifecycles of the fields match lifecycles of their containing objects. */ -public class AMQProtocolHandler extends IoHandlerAdapter +public class AMQProtocolHandler implements ProtocolEngine { /** Used for debugging. */ private static final Logger _logger = LoggerFactory.getLogger(AMQProtocolHandler.class); @@ -137,7 +142,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter private volatile AMQProtocolSession _protocolSession; /** Holds the state of the protocol session. */ - private AMQStateManager _stateManager = new AMQStateManager(); + private AMQStateManager _stateManager; /** Holds the method listeners, */ private final CopyOnWriteArraySet _frameListeners = new CopyOnWriteArraySet(); @@ -166,7 +171,15 @@ public class AMQProtocolHandler extends IoHandlerAdapter /** Object to lock on when changing the latch */ private Object _failoverLatchChange = new Object(); - + private AMQCodecFactory _codecFactory; + private Job _readJob; + private Job _writeJob; + private ReferenceCountingExecutorService _poolReference = ReferenceCountingExecutorService.getInstance(); + private NetworkDriver _networkDriver; + + private long _writtenBytes; + private long _readBytes; + /** * Creates a new protocol handler, associated with the specified client connection instance. * @@ -175,86 +188,14 @@ public class AMQProtocolHandler extends IoHandlerAdapter public AMQProtocolHandler(AMQConnection con) { _connection = con; - } - - /** - * Invoked by MINA when a MINA session for a new connection is created. This method sets up the filter chain on the - * session, which filters the events handled by this handler. The filter chain consists of, handing off events - * to an asynchronous thread pool, optionally encoding/decoding ssl, encoding/decoding AMQP. - * - * @param session The MINA session. - * - * @throws Exception Any underlying exceptions are allowed to fall through to MINA. - */ - public void sessionCreated(IoSession session) throws Exception - { - _logger.debug("Protocol session created for session " + System.identityHashCode(session)); - _failoverHandler = new FailoverHandler(this, session); - - final ProtocolCodecFilter pcf = new ProtocolCodecFilter(new AMQCodecFactory(false, _protocolSession)); - - if (Boolean.getBoolean("amqj.shared_read_write_pool")) - { - session.getFilterChain().addBefore("AsynchronousWriteFilter", "protocolFilter", pcf); - } - else - { - session.getFilterChain().addLast("protocolFilter", pcf); - } - // we only add the SSL filter where we have an SSL connection - if (_connection.getSSLConfiguration() != null) - { - SSLConfiguration sslConfig = _connection.getSSLConfiguration(); - SSLContextFactory sslFactory = - new SSLContextFactory(sslConfig.getKeystorePath(), sslConfig.getKeystorePassword(), sslConfig.getCertType()); - SSLFilter sslFilter = new SSLFilter(sslFactory.buildClientContext()); - sslFilter.setUseClientMode(true); - session.getFilterChain().addBefore("protocolFilter", "ssl", sslFilter); - } - - try - { - ReadWriteThreadModel threadModel = ReadWriteThreadModel.getInstance(); - threadModel.getAsynchronousReadFilter().createNewJobForSession(session); - threadModel.getAsynchronousWriteFilter().createNewJobForSession(session); - } - catch (RuntimeException e) - { - _logger.error(e.getMessage(), e); - } - - if (Boolean.getBoolean(ClientProperties.PROTECTIO_PROP_NAME)) - { - try - { - //Add IO Protection Filters - IoFilterChain chain = session.getFilterChain(); - - session.getFilterChain().addLast("tempExecutorFilterForFilterBuilder", new ExecutorFilter()); - - ReadThrottleFilterBuilder readfilter = new ReadThrottleFilterBuilder(); - readfilter.setMaximumConnectionBufferSize(Integer.parseInt(System.getProperty( - ClientProperties.READ_BUFFER_LIMIT_PROP_NAME, ClientProperties.READ_BUFFER_LIMIT_DEFAULT))); - readfilter.attach(chain); - - WriteBufferLimitFilterBuilder writefilter = new WriteBufferLimitFilterBuilder(); - writefilter.setMaximumConnectionBufferSize(Integer.parseInt(System.getProperty( - ClientProperties.WRITE_BUFFER_LIMIT_PROP_NAME, ClientProperties.WRITE_BUFFER_LIMIT_DEFAULT))); - writefilter.attach(chain); - session.getFilterChain().remove("tempExecutorFilterForFilterBuilder"); - - _logger.info("Using IO Read/Write Filter Protection"); - } - catch (Exception e) - { - _logger.error("Unable to attach IO Read/Write Filter Protection :" + e.getMessage()); - } - } - _protocolSession = new AMQProtocolSession(this, session, _connection); - - _stateManager.setProtocolSession(_protocolSession); - - _protocolSession.init(); + _protocolSession = new AMQProtocolSession(this, _connection); + _stateManager = new AMQStateManager(_protocolSession); + _codecFactory = new AMQCodecFactory(false, _protocolSession); + ReadWriteThreadModel threadModel = ReadWriteThreadModel.getInstance(); + _readJob = new Job(threadModel.getAsynchronousReadFilter(), PoolingFilter.MAX_JOB_EVENTS, true); + _writeJob = new Job(threadModel.getAsynchronousWriteFilter(), PoolingFilter.MAX_JOB_EVENTS, false); + _poolReference.acquireExecutorService(); + _failoverHandler = new FailoverHandler(this); } /** @@ -283,12 +224,10 @@ public class AMQProtocolHandler extends IoHandlerAdapter * may be called first followed by this method. This depends on whether the client was trying to send data at the * time of the failure. * - * @param session The MINA session. - * * @todo Clarify: presumably exceptionCaught is called when the client is sending during a connection failure and * not otherwise? The above comment doesn't make that clear. */ - public void sessionClosed(IoSession session) + public void closed() { if (_connection.isClosed()) { @@ -327,7 +266,8 @@ public class AMQProtocolHandler extends IoHandlerAdapter { _logger.debug("sessionClose() not allowed to failover"); _connection.exceptionReceived(new AMQDisconnectedException( - "Server closed connection and reconnection " + "not permitted.", null)); + "Server closed connection and reconnection " + "not permitted.", + _stateManager.getLastException())); } else { @@ -350,43 +290,39 @@ public class AMQProtocolHandler extends IoHandlerAdapter failoverThread.start(); } - public void sessionIdle(IoSession session, IdleStatus status) throws Exception + @Override + public void readerIdle() { - _logger.debug("Protocol Session [" + this + ":" + session + "] idle: " + status); - if (IdleStatus.WRITER_IDLE.equals(status)) - { - // write heartbeat frame: - _logger.debug("Sent heartbeat"); - session.write(HeartbeatBody.FRAME); - HeartbeatDiagnostics.sent(); - } - else if (IdleStatus.READER_IDLE.equals(status)) - { - // failover: - HeartbeatDiagnostics.timeout(); - _logger.warn("Timed out while waiting for heartbeat from peer."); - session.close(); - } + _logger.debug("Protocol Session [" + this + "] idle: reader"); + // failover: + HeartbeatDiagnostics.timeout(); + _logger.warn("Timed out while waiting for heartbeat from peer."); + _networkDriver.close(); + } + + @Override + public void writerIdle() + { + _logger.debug("Protocol Session [" + this + "] idle: reader"); + writeFrame(HeartbeatBody.FRAME); + HeartbeatDiagnostics.sent(); } /** - * Invoked when any exception is thrown by a user IoHandler implementation or by MINA. If the cause is an - * IOException, MINA will close the connection automatically. - * - * @param session The MINA session. - * @param cause The exception that triggered this event. + * Invoked when any exception is thrown by the NetworkDriver */ - public void exceptionCaught(IoSession session, Throwable cause) + public void exception(Throwable cause) { + _logger.info("AS: HELLO"); if (_failoverState == FailoverState.NOT_STARTED) { // if (!(cause instanceof AMQUndeliveredException) && (!(cause instanceof AMQAuthenticationException))) if ((cause instanceof AMQConnectionClosedException) || cause instanceof IOException) { _logger.info("Exception caught therefore going to attempt failover: " + cause, cause); - // this will attemp failover - - sessionClosed(session); + // this will attempt failover + _networkDriver.close(); + closed(); } else { @@ -437,6 +373,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter public void propagateExceptionToAllWaiters(Exception e) { getStateManager().error(e); + propagateExceptionToFrameListeners(e); } @@ -490,48 +427,84 @@ public class AMQProtocolHandler extends IoHandlerAdapter private static int _messageReceivedCount; - public void messageReceived(IoSession session, Object message) throws Exception - { - if (PROTOCOL_DEBUG) - { - _protocolLogger.info(String.format("RECV: [%s] %s", this, message)); - } - if(message instanceof AMQFrame) + @Override + public void received(ByteBuffer msg) + { + try { - final boolean debug = _logger.isDebugEnabled(); - final long msgNumber = ++_messageReceivedCount; + _readBytes += msg.remaining(); + final ArrayList dataBlocks = _codecFactory.getDecoder().decodeBuffer(msg); - if (debug && ((msgNumber % 1000) == 0)) + Job.fireAsynchEvent(_poolReference.getPool(), _readJob, new Event(new Runnable() { - _logger.debug("Received " + _messageReceivedCount + " protocol messages"); - } - - AMQFrame frame = (AMQFrame) message; - - final AMQBody bodyFrame = frame.getBodyFrame(); - - HeartbeatDiagnostics.received(bodyFrame instanceof HeartbeatBody); + @Override + public void run() + { + // Decode buffer - bodyFrame.handle(frame.getChannel(), _protocolSession); + for (AMQDataBlock message : dataBlocks) + { - _connection.bytesReceived(_protocolSession.getIoSession().getReadBytes()); + try + { + if (PROTOCOL_DEBUG) + { + _protocolLogger.info(String.format("RECV: [%s] %s", this, message)); + } + + if(message instanceof AMQFrame) + { + final boolean debug = _logger.isDebugEnabled(); + final long msgNumber = ++_messageReceivedCount; + + if (debug && ((msgNumber % 1000) == 0)) + { + _logger.debug("Received " + _messageReceivedCount + " protocol messages"); + } + + AMQFrame frame = (AMQFrame) message; + + final AMQBody bodyFrame = frame.getBodyFrame(); + + HeartbeatDiagnostics.received(bodyFrame instanceof HeartbeatBody); + + bodyFrame.handle(frame.getChannel(), _protocolSession); + + _connection.bytesReceived(_readBytes); + } + else if (message instanceof ProtocolInitiation) + { + // We get here if the server sends a response to our initial protocol header + // suggesting an alternate ProtocolVersion; the server will then close the + // connection. + ProtocolInitiation protocolInit = (ProtocolInitiation) message; + ProtocolVersion pv = protocolInit.checkVersion(); + getConnection().setProtocolVersion(pv); + + // get round a bug in old versions of qpid whereby the connection is not closed + _stateManager.changeState(AMQState.CONNECTION_CLOSED); + } + } + catch (Exception e) + { + e.printStackTrace(); + _logger.error("Exception processing frame", e); + propagateExceptionToFrameListeners(e); + exception(e); + } + } + } + })); } - else if (message instanceof ProtocolInitiation) + catch (Exception e) { - // We get here if the server sends a response to our initial protocol header - // suggesting an alternate ProtocolVersion; the server will then close the - // connection. - ProtocolInitiation protocolInit = (ProtocolInitiation) message; - ProtocolVersion pv = protocolInit.checkVersion(); - getConnection().setProtocolVersion(pv); - - // get round a bug in old versions of qpid whereby the connection is not closed - _stateManager.changeState(AMQState.CONNECTION_CLOSED); + propagateExceptionToFrameListeners(e); + exception(e); } } - public void methodBodyReceived(final int channelId, final AMQBody bodyFrame, IoSession session)//, final IoSession session) + public void methodBodyReceived(final int channelId, final AMQBody bodyFrame) throws AMQException { @@ -571,32 +544,13 @@ public class AMQProtocolHandler extends IoHandlerAdapter { propagateExceptionToFrameListeners(e); - exceptionCaught(session, e); + exception(e); } } private static int _messagesOut; - public void messageSent(IoSession session, Object message) throws Exception - { - if (PROTOCOL_DEBUG) - { - _protocolLogger.debug(String.format("SEND: [%s] %s", this, message)); - } - - final long sentMessages = _messagesOut++; - - final boolean debug = _logger.isDebugEnabled(); - - if (debug && ((sentMessages % 1000) == 0)) - { - _logger.debug("Sent " + _messagesOut + " protocol messages"); - } - - _connection.bytesSent(session.getWrittenBytes()); - } - public StateWaiter createWaiter(Set states) throws AMQException { return getStateManager().createWaiter(states); @@ -610,12 +564,34 @@ public class AMQProtocolHandler extends IoHandlerAdapter */ public void writeFrame(AMQDataBlock frame) { - _protocolSession.writeFrame(frame); + writeFrame(frame, false); } public void writeFrame(AMQDataBlock frame, boolean wait) { - _protocolSession.writeFrame(frame, wait); + ByteBuffer buf = frame.toNioByteBuffer(); + _writtenBytes += buf.remaining(); + _networkDriver.send(buf); + if (PROTOCOL_DEBUG) + { + _protocolLogger.debug(String.format("SEND: [%s] %s", this, frame)); + } + + final long sentMessages = _messagesOut++; + + final boolean debug = _logger.isDebugEnabled(); + + if (debug && ((sentMessages % 1000) == 0)) + { + _logger.debug("Sent " + _messagesOut + " protocol messages"); + } + + _connection.bytesSent(_writtenBytes); + + if (wait) + { + _networkDriver.flush(); + } } /** @@ -673,7 +649,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter //FIXME: At this point here we should check or before add we should check _stateManager is in an open // state so as we don't check we are likely just to time out here as I believe is being seen in QPID-1255 } - _protocolSession.writeFrame(frame); + writeFrame(frame); return listener.blockForFrame(timeout); // When control resumes before this line, a reply will have been received @@ -723,20 +699,17 @@ public class AMQProtocolHandler extends IoHandlerAdapter final AMQFrame frame = body.generateFrame(0); //If the connection is already closed then don't do a syncWrite - if (getStateManager().getCurrentState().equals(AMQState.CONNECTION_CLOSED)) - { - _protocolSession.closeProtocolSession(false); - } - else + if (!getStateManager().getCurrentState().equals(AMQState.CONNECTION_CLOSED)) { try { syncWrite(frame, ConnectionCloseOkBody.class, timeout); - _protocolSession.closeProtocolSession(); + _networkDriver.close(); + closed(); } catch (AMQTimeoutException e) { - _protocolSession.closeProtocolSession(false); + closed(); } catch (FailoverException e) { @@ -748,13 +721,13 @@ public class AMQProtocolHandler extends IoHandlerAdapter /** @return the number of bytes read from this protocol session */ public long getReadBytes() { - return _protocolSession.getIoSession().getReadBytes(); + return _readBytes; } /** @return the number of bytes written to this protocol session */ public long getWrittenBytes() { - return _protocolSession.getIoSession().getWrittenBytes(); + return _writtenBytes; } public void failover(String host, int port) @@ -807,6 +780,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter public void setStateManager(AMQStateManager stateManager) { _stateManager = stateManager; + _stateManager.setProtocolSession(_protocolSession); } public AMQProtocolSession getProtocolSession() @@ -843,4 +817,35 @@ public class AMQProtocolHandler extends IoHandlerAdapter { return _protocolSession.getProtocolVersion(); } + + public SocketAddress getRemoteAddress() + { + return _networkDriver.getRemoteAddress(); + } + + public SocketAddress getLocalAddress() + { + return _networkDriver.getLocalAddress(); + } + + public void setNetworkDriver(NetworkDriver driver) + { + _networkDriver = driver; + } + + /** @param delay delay in seconds (not ms) */ + void initHeartbeats(int delay) + { + if (delay > 0) + { + getNetworkDriver().setMaxWriteIdle(delay); + getNetworkDriver().setMaxReadIdle(HeartbeatConfig.CONFIG.getTimeout(delay)); + HeartbeatDiagnostics.init(delay, HeartbeatConfig.CONFIG.getTimeout(delay)); + } + } + + public NetworkDriver getNetworkDriver() + { + return _networkDriver; + } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java index 0e872170aa..cd049c24a1 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java @@ -20,11 +20,6 @@ */ package org.apache.qpid.client.protocol; -import org.apache.commons.lang.StringUtils; -import org.apache.mina.common.CloseFuture; -import org.apache.mina.common.IdleStatus; -import org.apache.mina.common.IoSession; -import org.apache.mina.common.WriteFuture; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -33,6 +28,7 @@ import javax.security.sasl.SaslClient; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import org.apache.commons.lang.StringUtils; import org.apache.qpid.AMQException; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQSession; @@ -65,10 +61,6 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession protected static final String SASL_CLIENT = "SASLClient"; - protected final IoSession _minaProtocolSession; - - protected WriteFuture _lastWriteFuture; - /** * The handler from which this session was created and which is used to handle protocol events. We send failover * events to the handler. @@ -102,28 +94,15 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession protected final AMQConnection _connection; - private static final int FAST_CHANNEL_ACCESS_MASK = 0xFFFFFFF0; + private ConnectionTuneParameters _connectionTuneParameters; - public AMQProtocolSession(AMQProtocolHandler protocolHandler, IoSession protocolSession, AMQConnection connection) - { - _protocolHandler = protocolHandler; - _minaProtocolSession = protocolSession; - _minaProtocolSession.setAttachment(this); - // properties of the connection are made available to the event handlers - _minaProtocolSession.setAttribute(AMQ_CONNECTION, connection); - // fixme - real value needed - _minaProtocolSession.setWriteTimeout(LAST_WRITE_FUTURE_JOIN_TIMEOUT); - _protocolVersion = connection.getProtocolVersion(); - _methodDispatcher = ClientMethodDispatcherImpl.newMethodDispatcher(ProtocolVersion.getLatestSupportedVersion(), - this); - _connection = connection; + private SaslClient _saslClient; - } + private static final int FAST_CHANNEL_ACCESS_MASK = 0xFFFFFFF0; public AMQProtocolSession(AMQProtocolHandler protocolHandler, AMQConnection connection) { - _protocolHandler = protocolHandler; - _minaProtocolSession = null; + _protocolHandler = protocolHandler; _protocolVersion = connection.getProtocolVersion(); _methodDispatcher = ClientMethodDispatcherImpl.newMethodDispatcher(ProtocolVersion.getLatestSupportedVersion(), this); @@ -134,7 +113,7 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession { // start the process of setting up the connection. This is the first place that // data is written to the server. - _minaProtocolSession.write(new ProtocolInitiation(_connection.getProtocolVersion())); + _protocolHandler.writeFrame(new ProtocolInitiation(_connection.getProtocolVersion())); } public String getClientID() @@ -175,14 +154,9 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession return getAMQConnection().getPassword(); } - public IoSession getIoSession() - { - return _minaProtocolSession; - } - public SaslClient getSaslClient() { - return (SaslClient) _minaProtocolSession.getAttribute(SASL_CLIENT); + return _saslClient; } /** @@ -192,28 +166,21 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession */ public void setSaslClient(SaslClient client) { - if (client == null) - { - _minaProtocolSession.removeAttribute(SASL_CLIENT); - } - else - { - _minaProtocolSession.setAttribute(SASL_CLIENT, client); - } + _saslClient = client; } public ConnectionTuneParameters getConnectionTuneParameters() { - return (ConnectionTuneParameters) _minaProtocolSession.getAttribute(CONNECTION_TUNE_PARAMETERS); + return _connectionTuneParameters; } public void setConnectionTuneParameters(ConnectionTuneParameters params) { - _minaProtocolSession.setAttribute(CONNECTION_TUNE_PARAMETERS, params); + _connectionTuneParameters = params; AMQConnection con = getAMQConnection(); con.setMaximumChannelCount(params.getChannelMax()); con.setMaximumFrameSize(params.getFrameMax()); - initHeartbeats((int) params.getHeartbeat()); + _protocolHandler.initHeartbeats((int) params.getHeartbeat()); } /** @@ -335,21 +302,12 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession */ public void writeFrame(AMQDataBlock frame) { - writeFrame(frame, false); + _protocolHandler.writeFrame(frame); } public void writeFrame(AMQDataBlock frame, boolean wait) { - WriteFuture f = _minaProtocolSession.write(frame); - if (wait) - { - // fixme -- time out? - f.join(); - } - else - { - _lastWriteFuture = f; - } + _protocolHandler.writeFrame(frame, wait); } /** @@ -407,33 +365,12 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession public AMQConnection getAMQConnection() { - return (AMQConnection) _minaProtocolSession.getAttribute(AMQ_CONNECTION); + return _connection; } - public void closeProtocolSession() + public void closeProtocolSession() throws AMQException { - closeProtocolSession(true); - } - - public void closeProtocolSession(boolean waitLast) - { - _logger.debug("Waiting for last write to join."); - if (waitLast && (_lastWriteFuture != null)) - { - _lastWriteFuture.join(LAST_WRITE_FUTURE_JOIN_TIMEOUT); - } - - _logger.debug("Closing protocol session"); - - final CloseFuture future = _minaProtocolSession.close(); - - // There is no recovery we can do if the join on the close failes so simply mark the connection CLOSED - // then wait for the connection to close. - // ritchiem: Could this release BlockingWaiters to early? The close has been done as much as possible so any - // error now shouldn't matter. - - _protocolHandler.getStateManager().changeState(AMQState.CONNECTION_CLOSED); - future.join(LAST_WRITE_FUTURE_JOIN_TIMEOUT); + _protocolHandler.closeConnection(0); } public void failover(String host, int port) @@ -449,22 +386,11 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession id = _queueId++; } // get rid of / and : and ; from address for spec conformance - String localAddress = StringUtils.replaceChars(_minaProtocolSession.getLocalAddress().toString(), "/;:", ""); + String localAddress = StringUtils.replaceChars(_protocolHandler.getLocalAddress().toString(), "/;:", ""); return new AMQShortString("tmp_" + localAddress + "_" + id); } - /** @param delay delay in seconds (not ms) */ - void initHeartbeats(int delay) - { - if (delay > 0) - { - _minaProtocolSession.setIdleTime(IdleStatus.WRITER_IDLE, delay); - _minaProtocolSession.setIdleTime(IdleStatus.READER_IDLE, HeartbeatConfig.CONFIG.getTimeout(delay)); - HeartbeatDiagnostics.init(delay, HeartbeatConfig.CONFIG.getTimeout(delay)); - } - } - public void confirmConsumerCancelled(int channelId, AMQShortString consumerTag) { final AMQSession session = getSession(channelId); @@ -530,7 +456,7 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession public void methodFrameReceived(final int channel, final AMQMethodBody amqMethodBody) throws AMQException { - _protocolHandler.methodBodyReceived(channel, amqMethodBody, _minaProtocolSession); + _protocolHandler.methodBodyReceived(channel, amqMethodBody); } public void notifyError(Exception error) diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java index b2f7ae8395..77c9c40e82 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java @@ -24,23 +24,33 @@ import org.apache.mina.common.ByteBuffer; import org.apache.mina.common.ConnectFuture; import org.apache.mina.common.IoConnector; import org.apache.mina.common.SimpleByteBufferAllocator; +import org.apache.mina.filter.SSLFilter; import org.apache.mina.transport.socket.nio.ExistingSocketConnector; import org.apache.mina.transport.socket.nio.SocketConnectorConfig; import org.apache.mina.transport.socket.nio.SocketSessionConfig; +import org.apache.qpid.client.SSLConfiguration; import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.jms.BrokerDetails; import org.apache.qpid.pool.ReadWriteThreadModel; +import org.apache.qpid.ssl.SSLContextFactory; +import org.apache.qpid.transport.network.mina.MINANetworkDriver; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import sun.net.InetAddressCachePolicy; + import java.io.IOException; +import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.Socket; +import java.security.GeneralSecurityException; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import javax.net.ssl.SSLEngine; + public class SocketTransportConnection implements ITransportConnection { private static final Logger _logger = LoggerFactory.getLogger(SocketTransportConnection.class); @@ -71,61 +81,27 @@ public class SocketTransportConnection implements ITransportConnection } final IoConnector ioConnector = _socketConnectorFactory.newSocketConnector(); - SocketConnectorConfig cfg = (SocketConnectorConfig) ioConnector.getDefaultConfig(); - - // if we do not use our own thread model we get the MINA default which is to use - // its own leader-follower model - boolean readWriteThreading = Boolean.getBoolean("amqj.shared_read_write_pool"); - if (readWriteThreading) - { - cfg.setThreadModel(ReadWriteThreadModel.getInstance()); - } - - SocketSessionConfig scfg = (SocketSessionConfig) cfg.getSessionConfig(); - scfg.setTcpNoDelay("true".equalsIgnoreCase(System.getProperty("amqj.tcpNoDelay", "true"))); - scfg.setSendBufferSize(Integer.getInteger("amqj.sendBufferSize", DEFAULT_BUFFER_SIZE)); - _logger.info("send-buffer-size = " + scfg.getSendBufferSize()); - scfg.setReceiveBufferSize(Integer.getInteger("amqj.receiveBufferSize", DEFAULT_BUFFER_SIZE)); - _logger.info("recv-buffer-size = " + scfg.getReceiveBufferSize()); - final InetSocketAddress address; if (brokerDetail.getTransport().equals(BrokerDetails.SOCKET)) { address = null; - - Socket socket = TransportConnection.removeOpenSocket(brokerDetail.getHost()); - - if (socket != null) - { - _logger.info("Using existing Socket:" + socket); - - ((ExistingSocketConnector) ioConnector).setOpenSocket(socket); - } - else - { - throw new IllegalArgumentException("Active Socket must be provided for broker " + - "with 'socket://' transport:" + brokerDetail); - } } else { address = new InetSocketAddress(brokerDetail.getHost(), brokerDetail.getPort()); _logger.info("Attempting connection to " + address); } - - - ConnectFuture future = ioConnector.connect(address, protocolHandler); - - // wait for connection to complete - if (future.join(brokerDetail.getTimeout())) - { - // we call getSession which throws an IOException if there has been an error connecting - future.getSession(); - } - else + + SSLConfiguration sslConfig = protocolHandler.getConnection().getSSLConfiguration(); + SSLContextFactory sslFactory = null; + if (sslConfig != null) { - throw new IOException("Timeout waiting for connection."); + sslFactory = new SSLContextFactory(sslConfig.getKeystorePath(), sslConfig.getKeystorePassword(), sslConfig.getCertType()); } + + MINANetworkDriver driver = new MINANetworkDriver(ioConnector); + driver.open(brokerDetail.getPort(), address.getAddress(), protocolHandler, null, sslFactory); + protocolHandler.setNetworkDriver(driver); } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java index 4ff24c3607..45194750dc 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java @@ -79,7 +79,7 @@ public class TransportConnection return _openSocketRegister.remove(socketID); } - public static synchronized ITransportConnection getInstance(BrokerDetails details) throws AMQTransportConnectionException + public static synchronized ITransportConnection getInstance(final BrokerDetails details) throws AMQTransportConnectionException { int transport = getTransport(details.getTransport()); @@ -95,7 +95,22 @@ public class TransportConnection { public IoConnector newSocketConnector() { - return new ExistingSocketConnector(1,new QpidThreadExecutor()); + ExistingSocketConnector connector = new ExistingSocketConnector(1,new QpidThreadExecutor()); + + Socket socket = TransportConnection.removeOpenSocket(details.getHost()); + + if (socket != null) + { + _logger.info("Using existing Socket:" + socket); + + ((ExistingSocketConnector) connector).setOpenSocket(socket); + } + else + { + throw new IllegalArgumentException("Active Socket must be provided for broker " + + "with 'socket://' transport:" + details); + } + return connector; } }); case TCP: diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java index dca6efba67..3de6f9b9ea 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java @@ -28,6 +28,7 @@ import org.apache.mina.transport.vmpipe.VmPipeConnector; import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.jms.BrokerDetails; import org.apache.qpid.pool.ReadWriteThreadModel; +import org.apache.qpid.transport.network.mina.MINANetworkDriver; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,6 +40,8 @@ public class VmPipeTransportConnection implements ITransportConnection private static int _port; + private MINANetworkDriver _networkDriver; + public VmPipeTransportConnection(int port) { _port = port; @@ -47,16 +50,16 @@ public class VmPipeTransportConnection implements ITransportConnection public void connect(AMQProtocolHandler protocolHandler, BrokerDetails brokerDetail) throws IOException { final VmPipeConnector ioConnector = new QpidVmPipeConnector(); - final IoServiceConfig cfg = ioConnector.getDefaultConfig(); - - cfg.setThreadModel(ReadWriteThreadModel.getInstance()); final VmPipeAddress address = new VmPipeAddress(_port); _logger.info("Attempting connection to " + address); - ConnectFuture future = ioConnector.connect(address, protocolHandler); + _networkDriver = new MINANetworkDriver(ioConnector, protocolHandler); + protocolHandler.setNetworkDriver(_networkDriver); + ConnectFuture future = ioConnector.connect(address, _networkDriver); // wait for connection to complete future.join(); // we call getSession which throws an IOException if there has been an error connecting future.getSession(); + _networkDriver.setProtocolEngine(protocolHandler); } } diff --git a/qpid/java/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java b/qpid/java/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java index fc7f8148f0..f520a21ba0 100644 --- a/qpid/java/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java +++ b/qpid/java/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java @@ -27,6 +27,7 @@ import org.apache.qpid.framing.AMQMethodBody; import org.apache.qpid.framing.amqp_8_0.BasicRecoverOkBodyImpl; import org.apache.qpid.AMQException; import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.transport.TestNetworkDriver; import org.apache.qpid.client.MockAMQConnection; import org.apache.qpid.client.AMQAuthenticationException; import org.apache.qpid.client.state.AMQState; @@ -72,9 +73,7 @@ public class AMQProtocolHandlerTest extends TestCase { //Create a new ProtocolHandler with a fake connection. _handler = new AMQProtocolHandler(new MockAMQConnection("amqp://guest:guest@client/test?brokerlist='vm://:1'")); - - _handler.sessionCreated(new MockIoSession()); - + _handler.setNetworkDriver(new TestNetworkDriver()); AMQBody body = BasicRecoverOkBodyImpl.getFactory().newInstance(null, 1); _blockFrame = new AMQFrame(0, body); -- cgit v1.2.1 From 93fa7d17feecb3d27cead67e11b250af1fcc595e Mon Sep 17 00:00:00 2001 From: Aidan Skinner Date: Wed, 16 Sep 2009 11:32:28 +0000 Subject: QPID-2015: Remove AMQIoTransportProtocolSession. Release the executor service in the same class as it's acquired git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/java-network-refactor@815729 13f79535-47bb-0310-9956-ffa450edef68 --- .../protocol/AMQIoTransportProtocolSession.java | 146 --------------------- .../qpid/client/protocol/AMQProtocolHandler.java | 1 + 2 files changed, 1 insertion(+), 146 deletions(-) delete mode 100644 qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQIoTransportProtocolSession.java (limited to 'qpid/java/client') diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQIoTransportProtocolSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQIoTransportProtocolSession.java deleted file mode 100644 index 8782e00a12..0000000000 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQIoTransportProtocolSession.java +++ /dev/null @@ -1,146 +0,0 @@ -package org.apache.qpid.client.protocol; -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - - -import java.util.UUID; - -import javax.security.sasl.SaslClient; - -import org.apache.commons.lang.StringUtils; -import org.apache.mina.common.IdleStatus; -import org.apache.qpid.AMQException; -import org.apache.qpid.client.AMQConnection; -import org.apache.qpid.client.ConnectionTuneParameters; -import org.apache.qpid.client.handler.ClientMethodDispatcherImpl; -import org.apache.qpid.client.state.AMQState; -import org.apache.qpid.framing.AMQDataBlock; -import org.apache.qpid.framing.AMQMethodBody; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.ProtocolInitiation; -import org.apache.qpid.framing.ProtocolVersion; -import org.apache.qpid.transport.Sender; - -public class AMQIoTransportProtocolSession extends AMQProtocolSession -{ - - protected Sender _ioSender; - private SaslClient _saslClient; - private ConnectionTuneParameters _connectionTuneParameters; - - public AMQIoTransportProtocolSession(AMQProtocolHandler protocolHandler, AMQConnection connection) - { - super(protocolHandler, connection); - } - - @Override - public void closeProtocolSession(boolean waitLast) - { - _ioSender.close(); - _protocolHandler.getStateManager().changeState(AMQState.CONNECTION_CLOSED); - } - - @Override - public void init() - { - _ioSender.send(new ProtocolInitiation(_connection.getProtocolVersion()).toNioByteBuffer()); - _ioSender.flush(); - } - - @Override - protected AMQShortString generateQueueName() - { - int id; - synchronized (_queueIdLock) - { - id = _queueId++; - } - return new AMQShortString("tmp_" + UUID.randomUUID() + "_" + id); - } - - @Override - public AMQConnection getAMQConnection() - { - return _connection; - } - - @Override - public SaslClient getSaslClient() - { - return _saslClient; - } - - @Override - public void setSaslClient(SaslClient client) - { - _saslClient = client; - } - - /** @param delay delay in seconds (not ms) */ - @Override - void initHeartbeats(int delay) - { - if (delay > 0) - { - // FIXME: actually do something here - HeartbeatDiagnostics.init(delay, HeartbeatConfig.CONFIG.getTimeout(delay)); - } - } - - @Override - public void methodFrameReceived(final int channel, final AMQMethodBody amqMethodBody) throws AMQException - { - // FIXME? - _protocolHandler.methodBodyReceived(channel, amqMethodBody, null); - } - - @Override - public void writeFrame(AMQDataBlock frame, boolean wait) - { - _ioSender.send(frame.toNioByteBuffer()); - if (wait) - { - _ioSender.flush(); - } - } - - @Override - public void setSender(Sender sender) - { - _ioSender = sender; - } - - @Override - public ConnectionTuneParameters getConnectionTuneParameters() - { - return _connectionTuneParameters; - } - - @Override - public void setConnectionTuneParameters(ConnectionTuneParameters params) - { - _connectionTuneParameters = params; - AMQConnection con = getAMQConnection(); - con.setMaximumChannelCount(params.getChannelMax()); - con.setMaximumFrameSize(params.getFrameMax()); - initHeartbeats((int) params.getHeartbeat()); - } -} diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java index c7e2493025..99366101d1 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java @@ -716,6 +716,7 @@ public class AMQProtocolHandler implements ProtocolEngine _logger.debug("FailoverException interrupted connection close, ignoring as connection close anyway."); } } + _poolReference.releaseExecutorService(); } /** @return the number of bytes read from this protocol session */ -- cgit v1.2.1 From 31bbc100ac6b3a31eb25d29f407d60ff23334d1f Mon Sep 17 00:00:00 2001 From: Aidan Skinner Date: Thu, 17 Sep 2009 15:19:54 +0000 Subject: QPID-2024 QPID-2105: Remove now unnecessary classes like Event, PoolingFilter, ReadWriteThreadModel. Move the couple of necessary methods to Job. Fix imports. git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/java-network-refactor@816232 13f79535-47bb-0310-9956-ffa450edef68 --- .../qpid/client/protocol/AMQProtocolHandler.java | 53 ++++++++++------------ .../transport/SocketTransportConnection.java | 23 ++-------- .../qpid/client/transport/TransportConnection.java | 14 +++--- .../transport/VmPipeTransportConnection.java | 6 +-- 4 files changed, 35 insertions(+), 61 deletions(-) (limited to 'qpid/java/client') diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java index 99366101d1..be75fc150f 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java @@ -20,20 +20,22 @@ */ package org.apache.qpid.client.protocol; -import org.apache.mina.filter.ReadThrottleFilterBuilder; -import org.apache.mina.filter.SSLFilter; -import org.apache.mina.filter.WriteBufferLimitFilterBuilder; +import java.io.IOException; +import java.net.SocketAddress; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.Set; +import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.CountDownLatch; + import org.apache.mina.filter.codec.ProtocolCodecException; -import org.apache.mina.filter.codec.ProtocolCodecFilter; -import org.apache.mina.filter.executor.ExecutorFilter; import org.apache.qpid.AMQConnectionClosedException; import org.apache.qpid.AMQDisconnectedException; import org.apache.qpid.AMQException; import org.apache.qpid.AMQTimeoutException; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQSession; -import org.apache.qpid.client.SSLConfiguration; -import org.apache.qpid.client.configuration.ClientProperties; import org.apache.qpid.client.failover.FailoverException; import org.apache.qpid.client.failover.FailoverHandler; import org.apache.qpid.client.failover.FailoverState; @@ -42,32 +44,29 @@ import org.apache.qpid.client.state.AMQStateManager; import org.apache.qpid.client.state.StateWaiter; import org.apache.qpid.client.state.listener.SpecificMethodFrameListener; import org.apache.qpid.codec.AMQCodecFactory; -import org.apache.qpid.framing.*; +import org.apache.qpid.framing.AMQBody; +import org.apache.qpid.framing.AMQDataBlock; +import org.apache.qpid.framing.AMQFrame; +import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.ConnectionCloseBody; +import org.apache.qpid.framing.ConnectionCloseOkBody; +import org.apache.qpid.framing.HeartbeatBody; +import org.apache.qpid.framing.MethodRegistry; +import org.apache.qpid.framing.ProtocolInitiation; +import org.apache.qpid.framing.ProtocolVersion; import org.apache.qpid.jms.BrokerDetails; -import org.apache.qpid.pool.Event; import org.apache.qpid.pool.Job; -import org.apache.qpid.pool.PoolingFilter; -import org.apache.qpid.pool.ReadWriteThreadModel; import org.apache.qpid.pool.ReferenceCountingExecutorService; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.protocol.AMQMethodListener; import org.apache.qpid.protocol.ProtocolEngine; -import org.apache.qpid.ssl.SSLContextFactory; import org.apache.qpid.transport.NetworkDriver; import org.apache.qpid.transport.network.io.IoTransport; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.net.SocketAddress; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.Set; -import java.util.concurrent.CopyOnWriteArraySet; -import java.util.concurrent.CountDownLatch; - /** * AMQProtocolHandler is the client side protocol handler for AMQP, it handles all protocol events received from the * network by MINA. The primary purpose of AMQProtocolHandler is to translate the generic event model of MINA into the @@ -107,9 +106,6 @@ import java.util.concurrent.CountDownLatch; * *

*
CRC Card
Responsibilities Collaborations - *
Create the filter chain to filter this handlers events. - * {@link ProtocolCodecFilter}, {@link SSLContextFactory}, {@link SSLFilter}, {@link ReadWriteThreadModel}. - * *
Maintain fail-over state. *
*
@@ -191,9 +187,8 @@ public class AMQProtocolHandler implements ProtocolEngine _protocolSession = new AMQProtocolSession(this, _connection); _stateManager = new AMQStateManager(_protocolSession); _codecFactory = new AMQCodecFactory(false, _protocolSession); - ReadWriteThreadModel threadModel = ReadWriteThreadModel.getInstance(); - _readJob = new Job(threadModel.getAsynchronousReadFilter(), PoolingFilter.MAX_JOB_EVENTS, true); - _writeJob = new Job(threadModel.getAsynchronousWriteFilter(), PoolingFilter.MAX_JOB_EVENTS, false); + _readJob = new Job(_poolReference, Job.MAX_JOB_EVENTS, true); + _writeJob = new Job(_poolReference, Job.MAX_JOB_EVENTS, false); _poolReference.acquireExecutorService(); _failoverHandler = new FailoverHandler(this); } @@ -436,7 +431,7 @@ public class AMQProtocolHandler implements ProtocolEngine _readBytes += msg.remaining(); final ArrayList dataBlocks = _codecFactory.getDecoder().decodeBuffer(msg); - Job.fireAsynchEvent(_poolReference.getPool(), _readJob, new Event(new Runnable() + Job.fireAsynchEvent(_poolReference.getPool(), _readJob, new Runnable() { @Override public void run() @@ -495,7 +490,7 @@ public class AMQProtocolHandler implements ProtocolEngine } } } - })); + }); } catch (Exception e) { diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java index 77c9c40e82..1ac8f62e32 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java @@ -20,37 +20,20 @@ */ package org.apache.qpid.client.transport; +import java.io.IOException; +import java.net.InetSocketAddress; + import org.apache.mina.common.ByteBuffer; -import org.apache.mina.common.ConnectFuture; import org.apache.mina.common.IoConnector; import org.apache.mina.common.SimpleByteBufferAllocator; -import org.apache.mina.filter.SSLFilter; -import org.apache.mina.transport.socket.nio.ExistingSocketConnector; -import org.apache.mina.transport.socket.nio.SocketConnectorConfig; -import org.apache.mina.transport.socket.nio.SocketSessionConfig; - import org.apache.qpid.client.SSLConfiguration; import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.jms.BrokerDetails; -import org.apache.qpid.pool.ReadWriteThreadModel; import org.apache.qpid.ssl.SSLContextFactory; import org.apache.qpid.transport.network.mina.MINANetworkDriver; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import sun.net.InetAddressCachePolicy; - -import java.io.IOException; -import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.net.Socket; -import java.security.GeneralSecurityException; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; - -import javax.net.ssl.SSLEngine; - public class SocketTransportConnection implements ITransportConnection { private static final Logger _logger = LoggerFactory.getLogger(SocketTransportConnection.class); diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java index 45194750dc..a4f8bb0166 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java @@ -20,6 +20,12 @@ */ package org.apache.qpid.client.transport; +import java.io.IOException; +import java.net.Socket; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + import org.apache.mina.common.IoConnector; import org.apache.mina.common.IoHandlerAdapter; import org.apache.mina.common.IoServiceConfig; @@ -30,20 +36,12 @@ import org.apache.mina.transport.vmpipe.VmPipeAcceptor; import org.apache.mina.transport.vmpipe.VmPipeAddress; import org.apache.qpid.client.vmbroker.AMQVMBrokerCreationException; import org.apache.qpid.jms.BrokerDetails; -import org.apache.qpid.pool.ReadWriteThreadModel; -import org.apache.qpid.protocol.ProtocolEngine; import org.apache.qpid.protocol.ProtocolEngineFactory; import org.apache.qpid.thread.QpidThreadExecutor; import org.apache.qpid.transport.network.mina.MINANetworkDriver; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.net.Socket; - /** * The TransportConnection is a helper class responsible for connecting to an AMQ server. It sets up the underlying * connector, which currently always uses TCP/IP sockets. It creates the "protocol handler" which deals with MINA diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java index 3de6f9b9ea..504d475740 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java @@ -20,20 +20,18 @@ */ package org.apache.qpid.client.transport; +import java.io.IOException; + import org.apache.mina.common.ConnectFuture; -import org.apache.mina.common.IoServiceConfig; import org.apache.mina.transport.vmpipe.QpidVmPipeConnector; import org.apache.mina.transport.vmpipe.VmPipeAddress; import org.apache.mina.transport.vmpipe.VmPipeConnector; import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.jms.BrokerDetails; -import org.apache.qpid.pool.ReadWriteThreadModel; import org.apache.qpid.transport.network.mina.MINANetworkDriver; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; - public class VmPipeTransportConnection implements ITransportConnection { private static final Logger _logger = LoggerFactory.getLogger(VmPipeTransportConnection.class); -- cgit v1.2.1 From 2296769193754d1bc09e1dc3b998709a5808ecbb Mon Sep 17 00:00:00 2001 From: Aidan Skinner Date: Fri, 18 Sep 2009 12:54:23 +0000 Subject: QPID-2104 AMQProtocolHandler: hand the actual write off to a seperate thread git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/java-network-refactor@816612 13f79535-47bb-0310-9956-ffa450edef68 --- .../org/apache/qpid/client/protocol/AMQProtocolHandler.java | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) (limited to 'qpid/java/client') diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java index be75fc150f..06a1fe2696 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java @@ -564,9 +564,16 @@ public class AMQProtocolHandler implements ProtocolEngine public void writeFrame(AMQDataBlock frame, boolean wait) { - ByteBuffer buf = frame.toNioByteBuffer(); + final ByteBuffer buf = frame.toNioByteBuffer(); _writtenBytes += buf.remaining(); - _networkDriver.send(buf); + Job.fireAsynchEvent(_poolReference.getPool(), _writeJob, new Runnable() + { + @Override + public void run() + { + _networkDriver.send(buf); + } + }); if (PROTOCOL_DEBUG) { _protocolLogger.debug(String.format("SEND: [%s] %s", this, frame)); -- cgit v1.2.1 From 98cc985dbd81a84cd0b0a969c57cb941680ec81f Mon Sep 17 00:00:00 2001 From: Aidan Skinner Date: Sun, 11 Oct 2009 23:22:08 +0000 Subject: Merge from trunk git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/java-network-refactor@824198 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/java/client/src/main/java/log4j.xml | 36 +++++ .../java/org/apache/qpid/client/AMQConnection.java | 12 +- .../apache/qpid/client/AMQConnectionDelegate.java | 9 ++ .../qpid/client/AMQConnectionDelegate_0_10.java | 13 +- .../qpid/client/AMQConnectionDelegate_8_0.java | 13 +- .../java/org/apache/qpid/client/AMQSession.java | 152 +++++++++++++++------ .../org/apache/qpid/client/AMQSession_0_10.java | 3 - .../org/apache/qpid/client/AMQSession_0_8.java | 11 +- .../apache/qpid/client/BasicMessageConsumer.java | 1 + .../apache/qpid/client/BasicMessageProducer.java | 2 +- .../org/apache/qpid/client/XAConnectionImpl.java | 2 +- .../client/configuration/ClientProperties.java | 2 +- .../qpid/client/failover/FailoverHandler.java | 2 + .../client/handler/ClientMethodDispatcherImpl.java | 4 +- .../qpid/client/protocol/AMQProtocolHandler.java | 1 - .../apache/qpid/client/util/BlockingWaiter.java | 2 +- .../qpid/jms/failover/FailoverExchangeMethod.java | 13 +- 17 files changed, 216 insertions(+), 62 deletions(-) create mode 100644 qpid/java/client/src/main/java/log4j.xml (limited to 'qpid/java/client') diff --git a/qpid/java/client/src/main/java/log4j.xml b/qpid/java/client/src/main/java/log4j.xml new file mode 100644 index 0000000000..c27acba818 --- /dev/null +++ b/qpid/java/client/src/main/java/log4j.xml @@ -0,0 +1,36 @@ + + + + + + + + + + + + + + + + diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index 0d2adcec8a..ed122a772e 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -313,7 +313,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect protected AMQConnectionDelegate _delegate; // this connection maximum number of prefetched messages - protected int _maxPrefetch; + private int _maxPrefetch; //Indicates whether persistent messages are synchronized private boolean _syncPersistence; @@ -450,7 +450,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect } else { - // use the defaul value set for all connections + // use the default value set for all connections _syncPublish = System.getProperty((ClientProperties.SYNC_ACK_PROP_NAME),_syncPublish); } @@ -512,7 +512,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect boolean retryAllowed = true; Exception connectionException = null; - while (!_connected && retryAllowed) + while (!_connected && retryAllowed && brokerDetails != null) { ProtocolVersion pe = null; try @@ -691,12 +691,12 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect public boolean attemptReconnection() { - while (_failoverPolicy.failoverAllowed()) + BrokerDetails broker = null; + while (_failoverPolicy.failoverAllowed() && (broker = _failoverPolicy.getNextBrokerDetails()) != null) { try { - makeBrokerConnection(_failoverPolicy.getNextBrokerDetails()); - + makeBrokerConnection(broker); return true; } catch (Exception e) diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java index e5980d8b7d..e6c3473cb1 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java @@ -39,6 +39,15 @@ public interface AMQConnectionDelegate Session createSession(final boolean transacted, final int acknowledgeMode, final int prefetchHigh, final int prefetchLow) throws JMSException; + /** + * Create an XASession with default prefetch values of: + * High = MaxPrefetch + * Low = MaxPrefetch / 2 + * @return XASession + * @throws JMSException thrown if there is a problem creating the session. + */ + XASession createXASession() throws JMSException; + XASession createXASession(int prefetchHigh, int prefetchLow) throws JMSException; void failoverPrep(); diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java index 927929c94a..4d10180667 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java @@ -99,6 +99,18 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec return session; } + /** + * Create an XASession with default prefetch values of: + * High = MaxPrefetch + * Low = MaxPrefetch / 2 + * @return XASession + * @throws JMSException + */ + public XASession createXASession() throws JMSException + { + return createXASession((int) _conn.getMaxPrefetch(), (int) _conn.getMaxPrefetch() / 2); + } + /** * create an XA Session and start it if required. */ @@ -285,7 +297,6 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec _qpidConnection.setIdleTimeout(l); } - @Override public int getMaxChannelID() { return Integer.MAX_VALUE; diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java index 9876393d4c..97d0d0516e 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java @@ -191,6 +191,18 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate }, _conn).execute(); } + /** + * Create an XASession with default prefetch values of: + * High = MaxPrefetch + * Low = MaxPrefetch / 2 + * @return XASession + * @throws JMSException thrown if there is a problem creating the session. + */ + public XASession createXASession() throws JMSException + { + return createXASession((int) _conn.getMaxPrefetch(), (int) _conn.getMaxPrefetch() / 2); + } + private void createChannelOverWire(int channelId, int prefetchHigh, int prefetchLow, boolean transacted) throws AMQException, FailoverException { @@ -290,7 +302,6 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate public void setIdleTimeout(long l){} - @Override public int getMaxChannelID() { return (int) (Math.pow(2, 16)-1); diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 2e3e417c95..dd9a00ce10 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -21,7 +21,6 @@ package org.apache.qpid.client; import java.io.Serializable; -import java.io.IOException; import java.net.URISyntaxException; import java.text.MessageFormat; import java.util.ArrayList; @@ -60,6 +59,7 @@ import javax.jms.Topic; import javax.jms.TopicPublisher; import javax.jms.TopicSession; import javax.jms.TopicSubscriber; +import javax.jms.TransactionRolledBackException; import org.apache.qpid.AMQDisconnectedException; import org.apache.qpid.AMQException; @@ -113,7 +113,6 @@ import org.slf4j.LoggerFactory; public abstract class AMQSession extends Closeable implements Session, QueueSession, TopicSession { - public static final class IdToConsumerMap { private final BasicMessageConsumer[] _fastAccessConsumers = new BasicMessageConsumer[16]; @@ -198,16 +197,32 @@ public abstract class AMQSession= System.currentTimeMillis() ) + { + + _flowControl.wait(FLOW_CONTROL_WAIT_PERIOD); + _logger.warn("Message send delayed by " + (System.currentTimeMillis() + FLOW_CONTROL_WAIT_FAILURE - expiryTime)/1000 + "s due to broker enforced flow control"); + } + if(!_flowControl.getFlowControl()) { - _flowControl.wait(); + _logger.error("Message send failed due to timeout waiting on broker enforced flow control"); + throw new JMSException("Unable to send message for " + FLOW_CONTROL_WAIT_FAILURE/1000 + " seconds due to broker enforced flow control"); } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index 0644bd88a8..1587d6a6bf 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -414,9 +414,6 @@ public class AMQSession_0_10 extends AMQSession extends Closeable implements Messa else { _session.addDeliveredMessage(msg.getDeliveryTag()); + _session.markDirty(); } break; diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java index 5ff6066ddc..44ce59975a 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java @@ -60,7 +60,7 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac /** * Priority of messages created by this producer. */ - private int _messagePriority; + private int _messagePriority = Message.DEFAULT_PRIORITY; /** * Time to live of messages. Specified in milliseconds but AMQ has 1 second resolution. diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/XAConnectionImpl.java b/qpid/java/client/src/main/java/org/apache/qpid/client/XAConnectionImpl.java index 20fa68605a..43025bd724 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/XAConnectionImpl.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/XAConnectionImpl.java @@ -47,7 +47,7 @@ public class XAConnectionImpl extends AMQConnection implements XAConnection, XAQ public synchronized XASession createXASession() throws JMSException { checkNotClosed(); - return _delegate.createXASession(_maxPrefetch, _maxPrefetch / 2); + return _delegate.createXASession(); } //-- Interface XAQueueConnection diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/configuration/ClientProperties.java b/qpid/java/client/src/main/java/org/apache/qpid/client/configuration/ClientProperties.java index 3627618e68..be0d283470 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/configuration/ClientProperties.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/configuration/ClientProperties.java @@ -39,7 +39,7 @@ public class ClientProperties * type: long */ public static final String MAX_PREFETCH_PROP_NAME = "max_prefetch"; - public static final String MAX_PREFETCH_DEFAULT = "5000"; + public static final String MAX_PREFETCH_DEFAULT = "500"; /** * When true a sync command is sent after every persistent messages. diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java index 8223cd5394..7761215450 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java @@ -21,6 +21,7 @@ package org.apache.qpid.client.failover; import org.apache.qpid.AMQDisconnectedException; +import org.apache.qpid.AMQException; import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.client.state.AMQStateManager; @@ -134,6 +135,7 @@ public class FailoverHandler implements Runnable // a slightly more complex state model therefore I felt it was worthwhile doing this. AMQStateManager existingStateManager = _amqProtocolHandler.getStateManager(); + // Use a fresh new StateManager for the reconnection attempts _amqProtocolHandler.setStateManager(new AMQStateManager()); diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java index 9c791730ca..0e3333940a 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java @@ -39,6 +39,7 @@ public class ClientMethodDispatcherImpl implements MethodDispatcher private static final BasicReturnMethodHandler _basicReturnMethodHandler = BasicReturnMethodHandler.getInstance(); private static final ChannelCloseMethodHandler _channelCloseMethodHandler = ChannelCloseMethodHandler.getInstance(); private static final ChannelFlowOkMethodHandler _channelFlowOkMethodHandler = ChannelFlowOkMethodHandler.getInstance(); + private static final ChannelFlowMethodHandler _channelFlowMethodHandler = ChannelFlowMethodHandler.getInstance(); private static final ConnectionCloseMethodHandler _connectionCloseMethodHandler = ConnectionCloseMethodHandler.getInstance(); private static final ConnectionOpenOkMethodHandler _connectionOpenOkMethodHandler = ConnectionOpenOkMethodHandler.getInstance(); private static final ConnectionRedirectMethodHandler _connectionRedirectMethodHandler = ConnectionRedirectMethodHandler.getInstance(); @@ -159,7 +160,8 @@ public class ClientMethodDispatcherImpl implements MethodDispatcher public boolean dispatchChannelFlow(ChannelFlowBody body, int channelId) throws AMQException { - return false; + _channelFlowMethodHandler.methodReceived(_session, body, channelId); + return true; } public boolean dispatchChannelFlowOk(ChannelFlowOkBody body, int channelId) throws AMQException diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java index 06a1fe2696..e645f4f214 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java @@ -308,7 +308,6 @@ public class AMQProtocolHandler implements ProtocolEngine */ public void exception(Throwable cause) { - _logger.info("AS: HELLO"); if (_failoverState == FailoverState.NOT_STARTED) { // if (!(cause instanceof AMQUndeliveredException) && (!(cause instanceof AMQAuthenticationException))) diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/util/BlockingWaiter.java b/qpid/java/client/src/main/java/org/apache/qpid/client/util/BlockingWaiter.java index 67cda957fb..a3d015eadc 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/util/BlockingWaiter.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/util/BlockingWaiter.java @@ -253,7 +253,7 @@ public abstract class BlockingWaiter } else { - System.err.println("WARNING: new error arrived while old one not yet processed"); + System.err.println("WARNING: new error '" + e == null ? "null" : e.getMessage() + "' arrived while old one not yet processed:" + _error.getMessage()); } try diff --git a/qpid/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverExchangeMethod.java b/qpid/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverExchangeMethod.java index e05a7ab6e2..960661daea 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverExchangeMethod.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverExchangeMethod.java @@ -189,7 +189,8 @@ public class FailoverExchangeMethod implements FailoverMethod, MessageListener { synchronized (_brokerListLock) { - return _connectionDetails.getBrokerDetails(_currentBrokerIndex); + _currentBrokerDetail = _connectionDetails.getBrokerDetails(_currentBrokerIndex); + return _currentBrokerDetail; } } @@ -214,7 +215,15 @@ public class FailoverExchangeMethod implements FailoverMethod, MessageListener broker.getHost().equals(_currentBrokerDetail.getHost()) && broker.getPort() == _currentBrokerDetail.getPort()) { - return getNextBrokerDetails(); + if (_connectionDetails.getBrokerCount() > 1) + { + return getNextBrokerDetails(); + } + else + { + _failedAttemps ++; + return null; + } } String delayStr = broker.getProperty(BrokerDetails.OPTIONS_CONNECT_DELAY); -- cgit v1.2.1 From f62711351f225b7ac77cebe5066fa0a69cdbdf00 Mon Sep 17 00:00:00 2001 From: Robert Godfrey Date: Tue, 16 Dec 2014 16:56:59 +0000 Subject: QPID-5099 : [Java Client] release pre-acquired messages after explicit consumer close git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1646009 13f79535-47bb-0310-9956-ffa450edef68 --- .../src/main/java/org/apache/qpid/client/BasicMessageConsumer.java | 6 ++++++ 1 file changed, 6 insertions(+) (limited to 'qpid/java/client') diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index 3d0e972ca2..b1e606b8e9 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -645,6 +645,12 @@ public abstract class BasicMessageConsumer extends Closeable implements Messa _receivingThread.interrupt(); } + + + if(!(isBrowseOnly() || getSession().isClosing())) + { + rollback(); + } } } -- cgit v1.2.1 From 6631e8e980107ad609105d4ef1bb2ee5c4275e8b Mon Sep 17 00:00:00 2001 From: Robert Godfrey Date: Fri, 2 Jan 2015 09:47:21 +0000 Subject: QPID-6294 : [Java Client] Allow use of 0 prefetch in AMQP 0-8/9/9-1 git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1648987 13f79535-47bb-0310-9956-ffa450edef68 --- .../org/apache/qpid/client/AMQSession_0_8.java | 61 +++++++++++++++++----- .../qpid/client/BasicMessageConsumer_0_8.java | 14 +++-- 2 files changed, 57 insertions(+), 18 deletions(-) (limited to 'qpid/java/client') diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java index d86a2739f2..bb0f0d9b13 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java @@ -824,12 +824,13 @@ public class AMQSession_0_8 extends AMQSession 0 || sizePrefetch > 0) + { + BasicQosBody basicQosBody = + getProtocolHandler().getMethodRegistry().createBasicQosBody(sizePrefetch, messagePrefetch, false); + getProtocolHandler().syncWrite(basicQosBody.generateFrame(getChannelId()), BasicQosOkBody.class); + } } @@ -842,13 +843,17 @@ public class AMQSession_0_8 extends AMQSession= getPrefetch()) + if (currentPrefetch >= getPrefetch() && getPrefetch() >= 0) { BasicQosBody basicQosBody = getProtocolHandler().getMethodRegistry() .createBasicQosBody(0, currentPrefetch + 1, false); getProtocolHandler().syncWrite(basicQosBody.generateFrame(getChannelId()), BasicQosOkBody.class); + if(currentPrefetch == 0 && !isSuspended()) + { + sendSuspendChannel(false); + } _creditChanged.set(true); return true; } @@ -863,8 +868,7 @@ public class AMQSession_0_8 extends AMQSession Date: Tue, 20 Jan 2015 16:47:50 +0000 Subject: QPID-6294 : Fix issue whereby AUTO ACK receive consumer would only ever receive one message since _currentPrefetch would never be zeroed git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1653293 13f79535-47bb-0310-9956-ffa450edef68 --- .../main/java/org/apache/qpid/example/Hello.java | 37 ++++++++++++++++++++-- .../qpid/client/BasicMessageConsumer_0_8.java | 7 ++-- 2 files changed, 39 insertions(+), 5 deletions(-) (limited to 'qpid/java/client') diff --git a/qpid/java/client/example/src/main/java/org/apache/qpid/example/Hello.java b/qpid/java/client/example/src/main/java/org/apache/qpid/example/Hello.java index 7e956698d1..aab08ebf0f 100644 --- a/qpid/java/client/example/src/main/java/org/apache/qpid/example/Hello.java +++ b/qpid/java/client/example/src/main/java/org/apache/qpid/example/Hello.java @@ -34,6 +34,9 @@ import javax.jms.TextMessage; import javax.naming.Context; import javax.naming.InitialContext; +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.configuration.ClientProperties; + public class Hello { @@ -42,10 +45,38 @@ public class Hello { } - public static void main(String[] args) + public static void main(String[] args) throws Exception { - Hello hello = new Hello(); - hello.runTest(); + System.setProperty(ClientProperties.AMQP_VERSION, "0-91"); + System.setProperty(ClientProperties.MAX_PREFETCH_PROP_NAME, "0"); + System.setProperty(ClientProperties.DEST_SYNTAX, "BURL"); + + Connection conn = new AMQConnection("127.0.0.1", 5672, "admin","admin", "client", "/"); + + conn.start(); + + Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + Destination destination = session.createQueue("queue"); + + MessageConsumer consumer = session.createConsumer(destination); + MessageProducer producer = session.createProducer(destination); + + for(int i = 0 ; i < 2 ; i ++) + { + TextMessage message = (TextMessage) consumer.receive(1000l); + System.out.println(message == null ? "null" : message.getText()); + } + for(int i = 0 ; i < 2 ; i ++) + { + TextMessage message = session.createTextMessage("Hello " + i); + producer.send(message); + } + + for(int i = 0 ; i < 2 ; i ++) + { + TextMessage message = (TextMessage) consumer.receive(1000l); + System.out.println(message == null ? "null" : message.getText()); + } } private void runTest() diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java index 3cc50512ed..1d7bb6087a 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java @@ -22,6 +22,7 @@ package org.apache.qpid.client; import javax.jms.JMSException; import javax.jms.Message; +import javax.jms.Session; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -182,7 +183,8 @@ public class BasicMessageConsumer_0_8 extends BasicMessageConsumer Date: Tue, 20 Jan 2015 16:50:09 +0000 Subject: QPID-6294 : revert unintended hanges to example file git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1653295 13f79535-47bb-0310-9956-ffa450edef68 --- .../main/java/org/apache/qpid/example/Hello.java | 37 ++-------------------- 1 file changed, 3 insertions(+), 34 deletions(-) (limited to 'qpid/java/client') diff --git a/qpid/java/client/example/src/main/java/org/apache/qpid/example/Hello.java b/qpid/java/client/example/src/main/java/org/apache/qpid/example/Hello.java index aab08ebf0f..109a72bcbf 100644 --- a/qpid/java/client/example/src/main/java/org/apache/qpid/example/Hello.java +++ b/qpid/java/client/example/src/main/java/org/apache/qpid/example/Hello.java @@ -34,9 +34,6 @@ import javax.jms.TextMessage; import javax.naming.Context; import javax.naming.InitialContext; -import org.apache.qpid.client.AMQConnection; -import org.apache.qpid.configuration.ClientProperties; - public class Hello { @@ -45,38 +42,10 @@ public class Hello { } - public static void main(String[] args) throws Exception + public static void main(String[] args) { - System.setProperty(ClientProperties.AMQP_VERSION, "0-91"); - System.setProperty(ClientProperties.MAX_PREFETCH_PROP_NAME, "0"); - System.setProperty(ClientProperties.DEST_SYNTAX, "BURL"); - - Connection conn = new AMQConnection("127.0.0.1", 5672, "admin","admin", "client", "/"); - - conn.start(); - - Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); - Destination destination = session.createQueue("queue"); - - MessageConsumer consumer = session.createConsumer(destination); - MessageProducer producer = session.createProducer(destination); - - for(int i = 0 ; i < 2 ; i ++) - { - TextMessage message = (TextMessage) consumer.receive(1000l); - System.out.println(message == null ? "null" : message.getText()); - } - for(int i = 0 ; i < 2 ; i ++) - { - TextMessage message = session.createTextMessage("Hello " + i); - producer.send(message); - } - - for(int i = 0 ; i < 2 ; i ++) - { - TextMessage message = (TextMessage) consumer.receive(1000l); - System.out.println(message == null ? "null" : message.getText()); - } + Hello hello = new Hello(); + hello.runTest(); } private void runTest() -- cgit v1.2.1 From d6ad905aa5da8b0c4be507c87cdae735d778154f Mon Sep 17 00:00:00 2001 From: Keith Wall Date: Tue, 27 Jan 2015 13:27:05 +0000 Subject: QPID-6336: [Java Broker] Change 0-8..0-91 path to await the receiver being closed before continuing with the next connection attempt Based on work by Oleksandr Rudyy git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1655034 13f79535-47bb-0310-9956-ffa450edef68 --- .../qpid/client/AMQConnectionDelegate_8_0.java | 80 +++++++++++++++++++++- .../qpid/client/protocol/AMQProtocolHandler.java | 2 +- 2 files changed, 80 insertions(+), 2 deletions(-) (limited to 'qpid/java/client') diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java index 66cade18a4..35582d92b7 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java @@ -21,16 +21,20 @@ package org.apache.qpid.client; import java.net.ConnectException; +import java.nio.ByteBuffer; import java.nio.channels.UnresolvedAddressException; import java.text.MessageFormat; import java.util.ArrayList; import java.util.EnumSet; import java.util.Iterator; import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import javax.jms.JMSException; import javax.jms.XASession; +import org.apache.qpid.transport.Receiver; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -67,6 +71,9 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate { private static final Logger _logger = LoggerFactory.getLogger(AMQConnectionDelegate_8_0.class); private final AMQConnection _conn; + private final long _timeout = Long.getLong(ClientProperties.QPID_SYNC_OP_TIMEOUT, + Long.getLong(ClientProperties.AMQJ_DEFAULT_SYNCWRITE_TIMEOUT, + ClientProperties.DEFAULT_SYNC_OPERATION_TIMEOUT)); private boolean _messageCompressionSupported; private boolean _addrSyntaxSupported; private boolean _confirmedPublishSupported; @@ -136,7 +143,9 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate OutgoingNetworkTransport transport = Transport.getOutgoingTransportInstance(getProtocolVersion()); - NetworkConnection network = transport.connect(settings, securityLayer.receiver(_conn.getProtocolHandler()), + ReceiverClosedWaiter monitoringReceiver = new ReceiverClosedWaiter(securityLayer.receiver(_conn.getProtocolHandler())); + + NetworkConnection network = transport.connect(settings, monitoringReceiver, _conn.getProtocolHandler()); try @@ -171,6 +180,19 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate network.close(); throw e; } + finally + { + // await the receiver to finish its execution (and so the IO threads too) + if (!_conn.isConnected()) + { + boolean closedWithinTimeout = monitoringReceiver.awaitClose(_timeout); + if (!closedWithinTimeout) + { + _logger.warn("Timed-out waiting for receiver for connection to " + + brokerDetail + " to be closed."); + } + } + } } @@ -503,4 +525,60 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate { return _confirmedPublishNonTransactionalSupported; } + + + private static class ReceiverClosedWaiter implements Receiver + { + private final CountDownLatch _closedWatcher; + private final Receiver _receiver; + + public ReceiverClosedWaiter(Receiver receiver) + { + _receiver = receiver; + _closedWatcher = new CountDownLatch(1); + } + + @Override + public void received(ByteBuffer msg) + { + _receiver.received(msg); + } + + @Override + public void exception(Throwable t) + { + _receiver.exception(t); + } + + @Override + public void closed() + { + try + { + _receiver.closed(); + } + finally + { + _closedWatcher.countDown(); + } + } + + public boolean awaitClose(long timeout) + { + try + { + if (_logger.isDebugEnabled()) + { + _logger.debug("Waiting " + timeout + "ms for receiver to be closed"); + } + + return _closedWatcher.await(timeout, TimeUnit.MILLISECONDS); + } + catch (InterruptedException e) + { + Thread.currentThread().interrupt(); + return _closedWatcher.getCount() == 0; + } + } + }; } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java index c61469559a..c2582accdf 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java @@ -240,7 +240,7 @@ public class AMQProtocolHandler implements ProtocolEngine } catch (Exception e) { - _logger.warn("Exception occured on closing the sender", e); + _logger.warn("Exception occurred on closing the sender", e); } if (_connection.failoverAllowed()) { -- cgit v1.2.1