diff options
| author | Aidan Skinner <aidan@apache.org> | 2009-09-16 10:06:55 +0000 |
|---|---|---|
| committer | Aidan Skinner <aidan@apache.org> | 2009-09-16 10:06:55 +0000 |
| commit | 9c4ecc45da929750ff7f0e0a5d7ada4e674b9105 (patch) | |
| tree | 3834f1b7f1fe3fbdd632a9c78f6295e54595abc5 /qpid/java/client | |
| parent | c1ebe66bfab328c5192a35c21ea290b5c45f40f5 (diff) | |
| download | qpid-python-9c4ecc45da929750ff7f0e0a5d7ada4e674b9105.tar.gz | |
QPID-2105: Make NetworkDriver.open use a SSLContextFactory, not an SSLEngine.
Allow an existing SocketConnector to be passed into a MINANetworkDriver, for
use with the ExistingSocket bit of TransportConnection.
Move the ExistingSocket stuff to one place, use MINANetworkDriver in
TransportConnection and make AMQProtocolHandler implement ProtocolEngine. Remove MINA specific gubbins from AMQProtocolHandler and AMQProtocolSession.
Move fireAsynchEvent to Job
Add getLocalAddress to AMQProtocolEngine
Move TestNetworkDriver to common
Use correct class for logger in AMQProtocolEngine
Check the exception is thrown properly in SimpleACLTest, make it a little less
prone to obscure race conditions.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/java-network-refactor@815704 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/client')
9 files changed, 251 insertions, 332 deletions
diff --git a/qpid/java/client/build.xml b/qpid/java/client/build.xml index 321e613d94..3c6132dc5b 100644 --- a/qpid/java/client/build.xml +++ b/qpid/java/client/build.xml @@ -21,6 +21,7 @@ <project name="AMQ Client" default="build"> <property name="module.depends" value="common"/> + <property name="module.test.depends" value="common/test" /> <property name="module.genpom" value="true"/> <property name="module.genpom.args" value="-Sgeronimo-jms_1.1_spec=provided"/> diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java index a0b69b5493..9876393d4c 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java @@ -97,7 +97,7 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate { _conn.getProtocolHandler().createIoTransportSession(brokerDetail); } - + _conn._protocolHandler.getProtocolSession().init(); // this blocks until the connection has been set up or when an error // has prevented the connection being set up diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java index 927f660932..8223cd5394 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java @@ -20,8 +20,6 @@ */ package org.apache.qpid.client.failover; -import org.apache.mina.common.IoSession; - import org.apache.qpid.AMQDisconnectedException; import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.client.state.AMQStateManager; @@ -81,9 +79,6 @@ public class FailoverHandler implements Runnable /** Used for debugging. */ private static final Logger _logger = LoggerFactory.getLogger(FailoverHandler.class); - /** Holds the MINA session for the connection that has failed, not the connection that is being failed onto. */ - private final IoSession _session; - /** Holds the protocol handler for the failed connection, upon which the new connection is to be set up. */ private AMQProtocolHandler _amqProtocolHandler; @@ -99,10 +94,9 @@ public class FailoverHandler implements Runnable * @param amqProtocolHandler The protocol handler that spans the failover. * @param session The MINA session, for the failing connection. */ - public FailoverHandler(AMQProtocolHandler amqProtocolHandler, IoSession session) + public FailoverHandler(AMQProtocolHandler amqProtocolHandler) { _amqProtocolHandler = amqProtocolHandler; - _session = session; } /** @@ -221,7 +215,7 @@ public class FailoverHandler implements Runnable _amqProtocolHandler.setFailoverState(FailoverState.FAILED); /*try {*/ - _amqProtocolHandler.exceptionCaught(_session, e); + _amqProtocolHandler.exception(e); /*} catch (Exception ex) { diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java index ab3ff8ecb0..c7e2493025 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java @@ -20,10 +20,6 @@ */ package org.apache.qpid.client.protocol; -import org.apache.mina.common.IdleStatus; -import org.apache.mina.common.IoFilterChain; -import org.apache.mina.common.IoHandlerAdapter; -import org.apache.mina.common.IoSession; import org.apache.mina.filter.ReadThrottleFilterBuilder; import org.apache.mina.filter.SSLFilter; import org.apache.mina.filter.WriteBufferLimitFilterBuilder; @@ -48,16 +44,25 @@ import org.apache.qpid.client.state.listener.SpecificMethodFrameListener; import org.apache.qpid.codec.AMQCodecFactory; import org.apache.qpid.framing.*; import org.apache.qpid.jms.BrokerDetails; +import org.apache.qpid.pool.Event; +import org.apache.qpid.pool.Job; +import org.apache.qpid.pool.PoolingFilter; import org.apache.qpid.pool.ReadWriteThreadModel; +import org.apache.qpid.pool.ReferenceCountingExecutorService; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.protocol.AMQMethodListener; +import org.apache.qpid.protocol.ProtocolEngine; import org.apache.qpid.ssl.SSLContextFactory; +import org.apache.qpid.transport.NetworkDriver; import org.apache.qpid.transport.network.io.IoTransport; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; +import java.net.SocketAddress; +import java.nio.ByteBuffer; +import java.util.ArrayList; import java.util.Iterator; import java.util.Set; import java.util.concurrent.CopyOnWriteArraySet; @@ -120,7 +125,7 @@ import java.util.concurrent.CountDownLatch; * held per protocol handler, per protocol session, per network connection, per channel, in seperate classes, so * that lifecycles of the fields match lifecycles of their containing objects. */ -public class AMQProtocolHandler extends IoHandlerAdapter +public class AMQProtocolHandler implements ProtocolEngine { /** Used for debugging. */ private static final Logger _logger = LoggerFactory.getLogger(AMQProtocolHandler.class); @@ -137,7 +142,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter private volatile AMQProtocolSession _protocolSession; /** Holds the state of the protocol session. */ - private AMQStateManager _stateManager = new AMQStateManager(); + private AMQStateManager _stateManager; /** Holds the method listeners, */ private final CopyOnWriteArraySet<AMQMethodListener> _frameListeners = new CopyOnWriteArraySet<AMQMethodListener>(); @@ -166,7 +171,15 @@ public class AMQProtocolHandler extends IoHandlerAdapter /** Object to lock on when changing the latch */ private Object _failoverLatchChange = new Object(); - + private AMQCodecFactory _codecFactory; + private Job _readJob; + private Job _writeJob; + private ReferenceCountingExecutorService _poolReference = ReferenceCountingExecutorService.getInstance(); + private NetworkDriver _networkDriver; + + private long _writtenBytes; + private long _readBytes; + /** * Creates a new protocol handler, associated with the specified client connection instance. * @@ -175,86 +188,14 @@ public class AMQProtocolHandler extends IoHandlerAdapter public AMQProtocolHandler(AMQConnection con) { _connection = con; - } - - /** - * Invoked by MINA when a MINA session for a new connection is created. This method sets up the filter chain on the - * session, which filters the events handled by this handler. The filter chain consists of, handing off events - * to an asynchronous thread pool, optionally encoding/decoding ssl, encoding/decoding AMQP. - * - * @param session The MINA session. - * - * @throws Exception Any underlying exceptions are allowed to fall through to MINA. - */ - public void sessionCreated(IoSession session) throws Exception - { - _logger.debug("Protocol session created for session " + System.identityHashCode(session)); - _failoverHandler = new FailoverHandler(this, session); - - final ProtocolCodecFilter pcf = new ProtocolCodecFilter(new AMQCodecFactory(false, _protocolSession)); - - if (Boolean.getBoolean("amqj.shared_read_write_pool")) - { - session.getFilterChain().addBefore("AsynchronousWriteFilter", "protocolFilter", pcf); - } - else - { - session.getFilterChain().addLast("protocolFilter", pcf); - } - // we only add the SSL filter where we have an SSL connection - if (_connection.getSSLConfiguration() != null) - { - SSLConfiguration sslConfig = _connection.getSSLConfiguration(); - SSLContextFactory sslFactory = - new SSLContextFactory(sslConfig.getKeystorePath(), sslConfig.getKeystorePassword(), sslConfig.getCertType()); - SSLFilter sslFilter = new SSLFilter(sslFactory.buildClientContext()); - sslFilter.setUseClientMode(true); - session.getFilterChain().addBefore("protocolFilter", "ssl", sslFilter); - } - - try - { - ReadWriteThreadModel threadModel = ReadWriteThreadModel.getInstance(); - threadModel.getAsynchronousReadFilter().createNewJobForSession(session); - threadModel.getAsynchronousWriteFilter().createNewJobForSession(session); - } - catch (RuntimeException e) - { - _logger.error(e.getMessage(), e); - } - - if (Boolean.getBoolean(ClientProperties.PROTECTIO_PROP_NAME)) - { - try - { - //Add IO Protection Filters - IoFilterChain chain = session.getFilterChain(); - - session.getFilterChain().addLast("tempExecutorFilterForFilterBuilder", new ExecutorFilter()); - - ReadThrottleFilterBuilder readfilter = new ReadThrottleFilterBuilder(); - readfilter.setMaximumConnectionBufferSize(Integer.parseInt(System.getProperty( - ClientProperties.READ_BUFFER_LIMIT_PROP_NAME, ClientProperties.READ_BUFFER_LIMIT_DEFAULT))); - readfilter.attach(chain); - - WriteBufferLimitFilterBuilder writefilter = new WriteBufferLimitFilterBuilder(); - writefilter.setMaximumConnectionBufferSize(Integer.parseInt(System.getProperty( - ClientProperties.WRITE_BUFFER_LIMIT_PROP_NAME, ClientProperties.WRITE_BUFFER_LIMIT_DEFAULT))); - writefilter.attach(chain); - session.getFilterChain().remove("tempExecutorFilterForFilterBuilder"); - - _logger.info("Using IO Read/Write Filter Protection"); - } - catch (Exception e) - { - _logger.error("Unable to attach IO Read/Write Filter Protection :" + e.getMessage()); - } - } - _protocolSession = new AMQProtocolSession(this, session, _connection); - - _stateManager.setProtocolSession(_protocolSession); - - _protocolSession.init(); + _protocolSession = new AMQProtocolSession(this, _connection); + _stateManager = new AMQStateManager(_protocolSession); + _codecFactory = new AMQCodecFactory(false, _protocolSession); + ReadWriteThreadModel threadModel = ReadWriteThreadModel.getInstance(); + _readJob = new Job(threadModel.getAsynchronousReadFilter(), PoolingFilter.MAX_JOB_EVENTS, true); + _writeJob = new Job(threadModel.getAsynchronousWriteFilter(), PoolingFilter.MAX_JOB_EVENTS, false); + _poolReference.acquireExecutorService(); + _failoverHandler = new FailoverHandler(this); } /** @@ -283,12 +224,10 @@ public class AMQProtocolHandler extends IoHandlerAdapter * may be called first followed by this method. This depends on whether the client was trying to send data at the * time of the failure. * - * @param session The MINA session. - * * @todo Clarify: presumably exceptionCaught is called when the client is sending during a connection failure and * not otherwise? The above comment doesn't make that clear. */ - public void sessionClosed(IoSession session) + public void closed() { if (_connection.isClosed()) { @@ -327,7 +266,8 @@ public class AMQProtocolHandler extends IoHandlerAdapter { _logger.debug("sessionClose() not allowed to failover"); _connection.exceptionReceived(new AMQDisconnectedException( - "Server closed connection and reconnection " + "not permitted.", null)); + "Server closed connection and reconnection " + "not permitted.", + _stateManager.getLastException())); } else { @@ -350,43 +290,39 @@ public class AMQProtocolHandler extends IoHandlerAdapter failoverThread.start(); } - public void sessionIdle(IoSession session, IdleStatus status) throws Exception + @Override + public void readerIdle() { - _logger.debug("Protocol Session [" + this + ":" + session + "] idle: " + status); - if (IdleStatus.WRITER_IDLE.equals(status)) - { - // write heartbeat frame: - _logger.debug("Sent heartbeat"); - session.write(HeartbeatBody.FRAME); - HeartbeatDiagnostics.sent(); - } - else if (IdleStatus.READER_IDLE.equals(status)) - { - // failover: - HeartbeatDiagnostics.timeout(); - _logger.warn("Timed out while waiting for heartbeat from peer."); - session.close(); - } + _logger.debug("Protocol Session [" + this + "] idle: reader"); + // failover: + HeartbeatDiagnostics.timeout(); + _logger.warn("Timed out while waiting for heartbeat from peer."); + _networkDriver.close(); + } + + @Override + public void writerIdle() + { + _logger.debug("Protocol Session [" + this + "] idle: reader"); + writeFrame(HeartbeatBody.FRAME); + HeartbeatDiagnostics.sent(); } /** - * Invoked when any exception is thrown by a user IoHandler implementation or by MINA. If the cause is an - * IOException, MINA will close the connection automatically. - * - * @param session The MINA session. - * @param cause The exception that triggered this event. + * Invoked when any exception is thrown by the NetworkDriver */ - public void exceptionCaught(IoSession session, Throwable cause) + public void exception(Throwable cause) { + _logger.info("AS: HELLO"); if (_failoverState == FailoverState.NOT_STARTED) { // if (!(cause instanceof AMQUndeliveredException) && (!(cause instanceof AMQAuthenticationException))) if ((cause instanceof AMQConnectionClosedException) || cause instanceof IOException) { _logger.info("Exception caught therefore going to attempt failover: " + cause, cause); - // this will attemp failover - - sessionClosed(session); + // this will attempt failover + _networkDriver.close(); + closed(); } else { @@ -437,6 +373,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter public void propagateExceptionToAllWaiters(Exception e) { getStateManager().error(e); + propagateExceptionToFrameListeners(e); } @@ -490,48 +427,84 @@ public class AMQProtocolHandler extends IoHandlerAdapter private static int _messageReceivedCount; - public void messageReceived(IoSession session, Object message) throws Exception - { - if (PROTOCOL_DEBUG) - { - _protocolLogger.info(String.format("RECV: [%s] %s", this, message)); - } - if(message instanceof AMQFrame) + @Override + public void received(ByteBuffer msg) + { + try { - final boolean debug = _logger.isDebugEnabled(); - final long msgNumber = ++_messageReceivedCount; + _readBytes += msg.remaining(); + final ArrayList<AMQDataBlock> dataBlocks = _codecFactory.getDecoder().decodeBuffer(msg); - if (debug && ((msgNumber % 1000) == 0)) + Job.fireAsynchEvent(_poolReference.getPool(), _readJob, new Event(new Runnable() { - _logger.debug("Received " + _messageReceivedCount + " protocol messages"); - } - - AMQFrame frame = (AMQFrame) message; - - final AMQBody bodyFrame = frame.getBodyFrame(); - - HeartbeatDiagnostics.received(bodyFrame instanceof HeartbeatBody); + @Override + public void run() + { + // Decode buffer - bodyFrame.handle(frame.getChannel(), _protocolSession); + for (AMQDataBlock message : dataBlocks) + { - _connection.bytesReceived(_protocolSession.getIoSession().getReadBytes()); + try + { + if (PROTOCOL_DEBUG) + { + _protocolLogger.info(String.format("RECV: [%s] %s", this, message)); + } + + if(message instanceof AMQFrame) + { + final boolean debug = _logger.isDebugEnabled(); + final long msgNumber = ++_messageReceivedCount; + + if (debug && ((msgNumber % 1000) == 0)) + { + _logger.debug("Received " + _messageReceivedCount + " protocol messages"); + } + + AMQFrame frame = (AMQFrame) message; + + final AMQBody bodyFrame = frame.getBodyFrame(); + + HeartbeatDiagnostics.received(bodyFrame instanceof HeartbeatBody); + + bodyFrame.handle(frame.getChannel(), _protocolSession); + + _connection.bytesReceived(_readBytes); + } + else if (message instanceof ProtocolInitiation) + { + // We get here if the server sends a response to our initial protocol header + // suggesting an alternate ProtocolVersion; the server will then close the + // connection. + ProtocolInitiation protocolInit = (ProtocolInitiation) message; + ProtocolVersion pv = protocolInit.checkVersion(); + getConnection().setProtocolVersion(pv); + + // get round a bug in old versions of qpid whereby the connection is not closed + _stateManager.changeState(AMQState.CONNECTION_CLOSED); + } + } + catch (Exception e) + { + e.printStackTrace(); + _logger.error("Exception processing frame", e); + propagateExceptionToFrameListeners(e); + exception(e); + } + } + } + })); } - else if (message instanceof ProtocolInitiation) + catch (Exception e) { - // We get here if the server sends a response to our initial protocol header - // suggesting an alternate ProtocolVersion; the server will then close the - // connection. - ProtocolInitiation protocolInit = (ProtocolInitiation) message; - ProtocolVersion pv = protocolInit.checkVersion(); - getConnection().setProtocolVersion(pv); - - // get round a bug in old versions of qpid whereby the connection is not closed - _stateManager.changeState(AMQState.CONNECTION_CLOSED); + propagateExceptionToFrameListeners(e); + exception(e); } } - public void methodBodyReceived(final int channelId, final AMQBody bodyFrame, IoSession session)//, final IoSession session) + public void methodBodyReceived(final int channelId, final AMQBody bodyFrame) throws AMQException { @@ -571,32 +544,13 @@ public class AMQProtocolHandler extends IoHandlerAdapter { propagateExceptionToFrameListeners(e); - exceptionCaught(session, e); + exception(e); } } private static int _messagesOut; - public void messageSent(IoSession session, Object message) throws Exception - { - if (PROTOCOL_DEBUG) - { - _protocolLogger.debug(String.format("SEND: [%s] %s", this, message)); - } - - final long sentMessages = _messagesOut++; - - final boolean debug = _logger.isDebugEnabled(); - - if (debug && ((sentMessages % 1000) == 0)) - { - _logger.debug("Sent " + _messagesOut + " protocol messages"); - } - - _connection.bytesSent(session.getWrittenBytes()); - } - public StateWaiter createWaiter(Set<AMQState> states) throws AMQException { return getStateManager().createWaiter(states); @@ -610,12 +564,34 @@ public class AMQProtocolHandler extends IoHandlerAdapter */ public void writeFrame(AMQDataBlock frame) { - _protocolSession.writeFrame(frame); + writeFrame(frame, false); } public void writeFrame(AMQDataBlock frame, boolean wait) { - _protocolSession.writeFrame(frame, wait); + ByteBuffer buf = frame.toNioByteBuffer(); + _writtenBytes += buf.remaining(); + _networkDriver.send(buf); + if (PROTOCOL_DEBUG) + { + _protocolLogger.debug(String.format("SEND: [%s] %s", this, frame)); + } + + final long sentMessages = _messagesOut++; + + final boolean debug = _logger.isDebugEnabled(); + + if (debug && ((sentMessages % 1000) == 0)) + { + _logger.debug("Sent " + _messagesOut + " protocol messages"); + } + + _connection.bytesSent(_writtenBytes); + + if (wait) + { + _networkDriver.flush(); + } } /** @@ -673,7 +649,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter //FIXME: At this point here we should check or before add we should check _stateManager is in an open // state so as we don't check we are likely just to time out here as I believe is being seen in QPID-1255 } - _protocolSession.writeFrame(frame); + writeFrame(frame); return listener.blockForFrame(timeout); // When control resumes before this line, a reply will have been received @@ -723,20 +699,17 @@ public class AMQProtocolHandler extends IoHandlerAdapter final AMQFrame frame = body.generateFrame(0); //If the connection is already closed then don't do a syncWrite - if (getStateManager().getCurrentState().equals(AMQState.CONNECTION_CLOSED)) - { - _protocolSession.closeProtocolSession(false); - } - else + if (!getStateManager().getCurrentState().equals(AMQState.CONNECTION_CLOSED)) { try { syncWrite(frame, ConnectionCloseOkBody.class, timeout); - _protocolSession.closeProtocolSession(); + _networkDriver.close(); + closed(); } catch (AMQTimeoutException e) { - _protocolSession.closeProtocolSession(false); + closed(); } catch (FailoverException e) { @@ -748,13 +721,13 @@ public class AMQProtocolHandler extends IoHandlerAdapter /** @return the number of bytes read from this protocol session */ public long getReadBytes() { - return _protocolSession.getIoSession().getReadBytes(); + return _readBytes; } /** @return the number of bytes written to this protocol session */ public long getWrittenBytes() { - return _protocolSession.getIoSession().getWrittenBytes(); + return _writtenBytes; } public void failover(String host, int port) @@ -807,6 +780,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter public void setStateManager(AMQStateManager stateManager) { _stateManager = stateManager; + _stateManager.setProtocolSession(_protocolSession); } public AMQProtocolSession getProtocolSession() @@ -843,4 +817,35 @@ public class AMQProtocolHandler extends IoHandlerAdapter { return _protocolSession.getProtocolVersion(); } + + public SocketAddress getRemoteAddress() + { + return _networkDriver.getRemoteAddress(); + } + + public SocketAddress getLocalAddress() + { + return _networkDriver.getLocalAddress(); + } + + public void setNetworkDriver(NetworkDriver driver) + { + _networkDriver = driver; + } + + /** @param delay delay in seconds (not ms) */ + void initHeartbeats(int delay) + { + if (delay > 0) + { + getNetworkDriver().setMaxWriteIdle(delay); + getNetworkDriver().setMaxReadIdle(HeartbeatConfig.CONFIG.getTimeout(delay)); + HeartbeatDiagnostics.init(delay, HeartbeatConfig.CONFIG.getTimeout(delay)); + } + } + + public NetworkDriver getNetworkDriver() + { + return _networkDriver; + } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java index 0e872170aa..cd049c24a1 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java @@ -20,11 +20,6 @@ */ package org.apache.qpid.client.protocol; -import org.apache.commons.lang.StringUtils; -import org.apache.mina.common.CloseFuture; -import org.apache.mina.common.IdleStatus; -import org.apache.mina.common.IoSession; -import org.apache.mina.common.WriteFuture; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -33,6 +28,7 @@ import javax.security.sasl.SaslClient; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import org.apache.commons.lang.StringUtils; import org.apache.qpid.AMQException; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQSession; @@ -65,10 +61,6 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession protected static final String SASL_CLIENT = "SASLClient"; - protected final IoSession _minaProtocolSession; - - protected WriteFuture _lastWriteFuture; - /** * The handler from which this session was created and which is used to handle protocol events. We send failover * events to the handler. @@ -102,28 +94,15 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession protected final AMQConnection _connection; - private static final int FAST_CHANNEL_ACCESS_MASK = 0xFFFFFFF0; + private ConnectionTuneParameters _connectionTuneParameters; - public AMQProtocolSession(AMQProtocolHandler protocolHandler, IoSession protocolSession, AMQConnection connection) - { - _protocolHandler = protocolHandler; - _minaProtocolSession = protocolSession; - _minaProtocolSession.setAttachment(this); - // properties of the connection are made available to the event handlers - _minaProtocolSession.setAttribute(AMQ_CONNECTION, connection); - // fixme - real value needed - _minaProtocolSession.setWriteTimeout(LAST_WRITE_FUTURE_JOIN_TIMEOUT); - _protocolVersion = connection.getProtocolVersion(); - _methodDispatcher = ClientMethodDispatcherImpl.newMethodDispatcher(ProtocolVersion.getLatestSupportedVersion(), - this); - _connection = connection; + private SaslClient _saslClient; - } + private static final int FAST_CHANNEL_ACCESS_MASK = 0xFFFFFFF0; public AMQProtocolSession(AMQProtocolHandler protocolHandler, AMQConnection connection) { - _protocolHandler = protocolHandler; - _minaProtocolSession = null; + _protocolHandler = protocolHandler; _protocolVersion = connection.getProtocolVersion(); _methodDispatcher = ClientMethodDispatcherImpl.newMethodDispatcher(ProtocolVersion.getLatestSupportedVersion(), this); @@ -134,7 +113,7 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession { // start the process of setting up the connection. This is the first place that // data is written to the server. - _minaProtocolSession.write(new ProtocolInitiation(_connection.getProtocolVersion())); + _protocolHandler.writeFrame(new ProtocolInitiation(_connection.getProtocolVersion())); } public String getClientID() @@ -175,14 +154,9 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession return getAMQConnection().getPassword(); } - public IoSession getIoSession() - { - return _minaProtocolSession; - } - public SaslClient getSaslClient() { - return (SaslClient) _minaProtocolSession.getAttribute(SASL_CLIENT); + return _saslClient; } /** @@ -192,28 +166,21 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession */ public void setSaslClient(SaslClient client) { - if (client == null) - { - _minaProtocolSession.removeAttribute(SASL_CLIENT); - } - else - { - _minaProtocolSession.setAttribute(SASL_CLIENT, client); - } + _saslClient = client; } public ConnectionTuneParameters getConnectionTuneParameters() { - return (ConnectionTuneParameters) _minaProtocolSession.getAttribute(CONNECTION_TUNE_PARAMETERS); + return _connectionTuneParameters; } public void setConnectionTuneParameters(ConnectionTuneParameters params) { - _minaProtocolSession.setAttribute(CONNECTION_TUNE_PARAMETERS, params); + _connectionTuneParameters = params; AMQConnection con = getAMQConnection(); con.setMaximumChannelCount(params.getChannelMax()); con.setMaximumFrameSize(params.getFrameMax()); - initHeartbeats((int) params.getHeartbeat()); + _protocolHandler.initHeartbeats((int) params.getHeartbeat()); } /** @@ -335,21 +302,12 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession */ public void writeFrame(AMQDataBlock frame) { - writeFrame(frame, false); + _protocolHandler.writeFrame(frame); } public void writeFrame(AMQDataBlock frame, boolean wait) { - WriteFuture f = _minaProtocolSession.write(frame); - if (wait) - { - // fixme -- time out? - f.join(); - } - else - { - _lastWriteFuture = f; - } + _protocolHandler.writeFrame(frame, wait); } /** @@ -407,33 +365,12 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession public AMQConnection getAMQConnection() { - return (AMQConnection) _minaProtocolSession.getAttribute(AMQ_CONNECTION); + return _connection; } - public void closeProtocolSession() + public void closeProtocolSession() throws AMQException { - closeProtocolSession(true); - } - - public void closeProtocolSession(boolean waitLast) - { - _logger.debug("Waiting for last write to join."); - if (waitLast && (_lastWriteFuture != null)) - { - _lastWriteFuture.join(LAST_WRITE_FUTURE_JOIN_TIMEOUT); - } - - _logger.debug("Closing protocol session"); - - final CloseFuture future = _minaProtocolSession.close(); - - // There is no recovery we can do if the join on the close failes so simply mark the connection CLOSED - // then wait for the connection to close. - // ritchiem: Could this release BlockingWaiters to early? The close has been done as much as possible so any - // error now shouldn't matter. - - _protocolHandler.getStateManager().changeState(AMQState.CONNECTION_CLOSED); - future.join(LAST_WRITE_FUTURE_JOIN_TIMEOUT); + _protocolHandler.closeConnection(0); } public void failover(String host, int port) @@ -449,22 +386,11 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession id = _queueId++; } // get rid of / and : and ; from address for spec conformance - String localAddress = StringUtils.replaceChars(_minaProtocolSession.getLocalAddress().toString(), "/;:", ""); + String localAddress = StringUtils.replaceChars(_protocolHandler.getLocalAddress().toString(), "/;:", ""); return new AMQShortString("tmp_" + localAddress + "_" + id); } - /** @param delay delay in seconds (not ms) */ - void initHeartbeats(int delay) - { - if (delay > 0) - { - _minaProtocolSession.setIdleTime(IdleStatus.WRITER_IDLE, delay); - _minaProtocolSession.setIdleTime(IdleStatus.READER_IDLE, HeartbeatConfig.CONFIG.getTimeout(delay)); - HeartbeatDiagnostics.init(delay, HeartbeatConfig.CONFIG.getTimeout(delay)); - } - } - public void confirmConsumerCancelled(int channelId, AMQShortString consumerTag) { final AMQSession session = getSession(channelId); @@ -530,7 +456,7 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession public void methodFrameReceived(final int channel, final AMQMethodBody amqMethodBody) throws AMQException { - _protocolHandler.methodBodyReceived(channel, amqMethodBody, _minaProtocolSession); + _protocolHandler.methodBodyReceived(channel, amqMethodBody); } public void notifyError(Exception error) diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java index b2f7ae8395..77c9c40e82 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java @@ -24,23 +24,33 @@ import org.apache.mina.common.ByteBuffer; import org.apache.mina.common.ConnectFuture; import org.apache.mina.common.IoConnector; import org.apache.mina.common.SimpleByteBufferAllocator; +import org.apache.mina.filter.SSLFilter; import org.apache.mina.transport.socket.nio.ExistingSocketConnector; import org.apache.mina.transport.socket.nio.SocketConnectorConfig; import org.apache.mina.transport.socket.nio.SocketSessionConfig; +import org.apache.qpid.client.SSLConfiguration; import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.jms.BrokerDetails; import org.apache.qpid.pool.ReadWriteThreadModel; +import org.apache.qpid.ssl.SSLContextFactory; +import org.apache.qpid.transport.network.mina.MINANetworkDriver; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import sun.net.InetAddressCachePolicy; + import java.io.IOException; +import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.Socket; +import java.security.GeneralSecurityException; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import javax.net.ssl.SSLEngine; + public class SocketTransportConnection implements ITransportConnection { private static final Logger _logger = LoggerFactory.getLogger(SocketTransportConnection.class); @@ -71,61 +81,27 @@ public class SocketTransportConnection implements ITransportConnection } final IoConnector ioConnector = _socketConnectorFactory.newSocketConnector(); - SocketConnectorConfig cfg = (SocketConnectorConfig) ioConnector.getDefaultConfig(); - - // if we do not use our own thread model we get the MINA default which is to use - // its own leader-follower model - boolean readWriteThreading = Boolean.getBoolean("amqj.shared_read_write_pool"); - if (readWriteThreading) - { - cfg.setThreadModel(ReadWriteThreadModel.getInstance()); - } - - SocketSessionConfig scfg = (SocketSessionConfig) cfg.getSessionConfig(); - scfg.setTcpNoDelay("true".equalsIgnoreCase(System.getProperty("amqj.tcpNoDelay", "true"))); - scfg.setSendBufferSize(Integer.getInteger("amqj.sendBufferSize", DEFAULT_BUFFER_SIZE)); - _logger.info("send-buffer-size = " + scfg.getSendBufferSize()); - scfg.setReceiveBufferSize(Integer.getInteger("amqj.receiveBufferSize", DEFAULT_BUFFER_SIZE)); - _logger.info("recv-buffer-size = " + scfg.getReceiveBufferSize()); - final InetSocketAddress address; if (brokerDetail.getTransport().equals(BrokerDetails.SOCKET)) { address = null; - - Socket socket = TransportConnection.removeOpenSocket(brokerDetail.getHost()); - - if (socket != null) - { - _logger.info("Using existing Socket:" + socket); - - ((ExistingSocketConnector) ioConnector).setOpenSocket(socket); - } - else - { - throw new IllegalArgumentException("Active Socket must be provided for broker " + - "with 'socket://<SocketID>' transport:" + brokerDetail); - } } else { address = new InetSocketAddress(brokerDetail.getHost(), brokerDetail.getPort()); _logger.info("Attempting connection to " + address); } - - - ConnectFuture future = ioConnector.connect(address, protocolHandler); - - // wait for connection to complete - if (future.join(brokerDetail.getTimeout())) - { - // we call getSession which throws an IOException if there has been an error connecting - future.getSession(); - } - else + + SSLConfiguration sslConfig = protocolHandler.getConnection().getSSLConfiguration(); + SSLContextFactory sslFactory = null; + if (sslConfig != null) { - throw new IOException("Timeout waiting for connection."); + sslFactory = new SSLContextFactory(sslConfig.getKeystorePath(), sslConfig.getKeystorePassword(), sslConfig.getCertType()); } + + MINANetworkDriver driver = new MINANetworkDriver(ioConnector); + driver.open(brokerDetail.getPort(), address.getAddress(), protocolHandler, null, sslFactory); + protocolHandler.setNetworkDriver(driver); } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java index 4ff24c3607..45194750dc 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java @@ -79,7 +79,7 @@ public class TransportConnection return _openSocketRegister.remove(socketID); } - public static synchronized ITransportConnection getInstance(BrokerDetails details) throws AMQTransportConnectionException + public static synchronized ITransportConnection getInstance(final BrokerDetails details) throws AMQTransportConnectionException { int transport = getTransport(details.getTransport()); @@ -95,7 +95,22 @@ public class TransportConnection { public IoConnector newSocketConnector() { - return new ExistingSocketConnector(1,new QpidThreadExecutor()); + ExistingSocketConnector connector = new ExistingSocketConnector(1,new QpidThreadExecutor()); + + Socket socket = TransportConnection.removeOpenSocket(details.getHost()); + + if (socket != null) + { + _logger.info("Using existing Socket:" + socket); + + ((ExistingSocketConnector) connector).setOpenSocket(socket); + } + else + { + throw new IllegalArgumentException("Active Socket must be provided for broker " + + "with 'socket://<SocketID>' transport:" + details); + } + return connector; } }); case TCP: diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java index dca6efba67..3de6f9b9ea 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java @@ -28,6 +28,7 @@ import org.apache.mina.transport.vmpipe.VmPipeConnector; import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.jms.BrokerDetails; import org.apache.qpid.pool.ReadWriteThreadModel; +import org.apache.qpid.transport.network.mina.MINANetworkDriver; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,6 +40,8 @@ public class VmPipeTransportConnection implements ITransportConnection private static int _port; + private MINANetworkDriver _networkDriver; + public VmPipeTransportConnection(int port) { _port = port; @@ -47,16 +50,16 @@ public class VmPipeTransportConnection implements ITransportConnection public void connect(AMQProtocolHandler protocolHandler, BrokerDetails brokerDetail) throws IOException { final VmPipeConnector ioConnector = new QpidVmPipeConnector(); - final IoServiceConfig cfg = ioConnector.getDefaultConfig(); - - cfg.setThreadModel(ReadWriteThreadModel.getInstance()); final VmPipeAddress address = new VmPipeAddress(_port); _logger.info("Attempting connection to " + address); - ConnectFuture future = ioConnector.connect(address, protocolHandler); + _networkDriver = new MINANetworkDriver(ioConnector, protocolHandler); + protocolHandler.setNetworkDriver(_networkDriver); + ConnectFuture future = ioConnector.connect(address, _networkDriver); // wait for connection to complete future.join(); // we call getSession which throws an IOException if there has been an error connecting future.getSession(); + _networkDriver.setProtocolEngine(protocolHandler); } } diff --git a/qpid/java/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java b/qpid/java/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java index fc7f8148f0..f520a21ba0 100644 --- a/qpid/java/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java +++ b/qpid/java/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java @@ -27,6 +27,7 @@ import org.apache.qpid.framing.AMQMethodBody; import org.apache.qpid.framing.amqp_8_0.BasicRecoverOkBodyImpl; import org.apache.qpid.AMQException; import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.transport.TestNetworkDriver; import org.apache.qpid.client.MockAMQConnection; import org.apache.qpid.client.AMQAuthenticationException; import org.apache.qpid.client.state.AMQState; @@ -72,9 +73,7 @@ public class AMQProtocolHandlerTest extends TestCase { //Create a new ProtocolHandler with a fake connection. _handler = new AMQProtocolHandler(new MockAMQConnection("amqp://guest:guest@client/test?brokerlist='vm://:1'")); - - _handler.sessionCreated(new MockIoSession()); - + _handler.setNetworkDriver(new TestNetworkDriver()); AMQBody body = BasicRecoverOkBodyImpl.getFactory().newInstance(null, 1); _blockFrame = new AMQFrame(0, body); |
