summaryrefslogtreecommitdiff
path: root/java/client/src
diff options
context:
space:
mode:
authorAidan Skinner <aidan@apache.org>2009-10-15 01:06:23 +0000
committerAidan Skinner <aidan@apache.org>2009-10-15 01:06:23 +0000
commit85ad542c1d5724cb5a1294f12e826a83972ef433 (patch)
treeaabcb3336d6b4b7d3ae9bb0a234bff0c9b4015aa /java/client/src
parentf4809a431d4b374e866f5988a4099ac927f42dc4 (diff)
downloadqpid-python-85ad542c1d5724cb5a1294f12e826a83972ef433.tar.gz
Merge java-network-refactor branch
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@825362 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client/src')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java2
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java32
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/protocol/AMQIoTransportProtocolSession.java146
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java402
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java110
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/transport/QpidThreadExecutor.java21
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java71
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java44
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java17
-rw-r--r--java/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java5
10 files changed, 284 insertions, 566 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
index 8a14e911f1..97d0d0516e 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
@@ -97,7 +97,7 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate
{
_conn.getProtocolHandler().createIoTransportSession(brokerDetail);
}
-
+ _conn._protocolHandler.getProtocolSession().init();
// this blocks until the connection has been set up or when an error
// has prevented the connection being set up
diff --git a/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java b/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java
index cb6c196a45..f312f3c0ec 100644
--- a/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java
@@ -20,9 +20,8 @@
*/
package org.apache.qpid.client.failover;
-import org.apache.mina.common.IoSession;
-
import org.apache.qpid.AMQDisconnectedException;
+import org.apache.qpid.AMQException;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.client.state.AMQStateManager;
@@ -81,9 +80,6 @@ public class FailoverHandler implements Runnable
/** Used for debugging. */
private static final Logger _logger = LoggerFactory.getLogger(FailoverHandler.class);
- /** Holds the MINA session for the connection that has failed, not the connection that is being failed onto. */
- private final IoSession _session;
-
/** Holds the protocol handler for the failed connection, upon which the new connection is to be set up. */
private AMQProtocolHandler _amqProtocolHandler;
@@ -99,10 +95,9 @@ public class FailoverHandler implements Runnable
* @param amqProtocolHandler The protocol handler that spans the failover.
* @param session The MINA session, for the failing connection.
*/
- public FailoverHandler(AMQProtocolHandler amqProtocolHandler, IoSession session)
+ public FailoverHandler(AMQProtocolHandler amqProtocolHandler)
{
_amqProtocolHandler = amqProtocolHandler;
- _session = session;
}
/**
@@ -139,29 +134,10 @@ public class FailoverHandler implements Runnable
// have a state waiter waiting until the connection is closed for some reason. Or in future we may have
// a slightly more complex state model therefore I felt it was worthwhile doing this.
AMQStateManager existingStateManager = _amqProtocolHandler.getStateManager();
-
-
- // We are failing over so lets ensure any existing ProtocolSessions
- // are closed. Closing them will update the stateManager which we
- // probably don't want to record the change to the closed state.
- // So lets make a new one.
- _amqProtocolHandler.setStateManager(new AMQStateManager());
-
- // Close the session, we need to wait for it to close as there may have
- // been data in transit such as an ack that is still valid to send.
- //
- // While we are allowing data to continue to be written to the
- // socket assuming the connection is still valid, we do not consider
- // the possibility that the problem that triggered failover was
- // entirely client side. In that situation the socket will still be
- // open and the we should really send a ConnectionClose to be AMQP
- // compliant.
- _amqProtocolHandler.getProtocolSession().closeProtocolSession();
-
+
// Use a fresh new StateManager for the reconnection attempts
_amqProtocolHandler.setStateManager(new AMQStateManager());
-
if (!_amqProtocolHandler.getConnection().firePreFailover(_host != null))
{
_logger.info("Failover process veto-ed by client");
@@ -240,7 +216,7 @@ public class FailoverHandler implements Runnable
_amqProtocolHandler.setFailoverState(FailoverState.FAILED);
/*try
{*/
- _amqProtocolHandler.exceptionCaught(_session, e);
+ _amqProtocolHandler.exception(e);
/*}
catch (Exception ex)
{
diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQIoTransportProtocolSession.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQIoTransportProtocolSession.java
deleted file mode 100644
index 8782e00a12..0000000000
--- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQIoTransportProtocolSession.java
+++ /dev/null
@@ -1,146 +0,0 @@
-package org.apache.qpid.client.protocol;
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-
-import java.util.UUID;
-
-import javax.security.sasl.SaslClient;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.mina.common.IdleStatus;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.client.ConnectionTuneParameters;
-import org.apache.qpid.client.handler.ClientMethodDispatcherImpl;
-import org.apache.qpid.client.state.AMQState;
-import org.apache.qpid.framing.AMQDataBlock;
-import org.apache.qpid.framing.AMQMethodBody;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.ProtocolInitiation;
-import org.apache.qpid.framing.ProtocolVersion;
-import org.apache.qpid.transport.Sender;
-
-public class AMQIoTransportProtocolSession extends AMQProtocolSession
-{
-
- protected Sender<java.nio.ByteBuffer> _ioSender;
- private SaslClient _saslClient;
- private ConnectionTuneParameters _connectionTuneParameters;
-
- public AMQIoTransportProtocolSession(AMQProtocolHandler protocolHandler, AMQConnection connection)
- {
- super(protocolHandler, connection);
- }
-
- @Override
- public void closeProtocolSession(boolean waitLast)
- {
- _ioSender.close();
- _protocolHandler.getStateManager().changeState(AMQState.CONNECTION_CLOSED);
- }
-
- @Override
- public void init()
- {
- _ioSender.send(new ProtocolInitiation(_connection.getProtocolVersion()).toNioByteBuffer());
- _ioSender.flush();
- }
-
- @Override
- protected AMQShortString generateQueueName()
- {
- int id;
- synchronized (_queueIdLock)
- {
- id = _queueId++;
- }
- return new AMQShortString("tmp_" + UUID.randomUUID() + "_" + id);
- }
-
- @Override
- public AMQConnection getAMQConnection()
- {
- return _connection;
- }
-
- @Override
- public SaslClient getSaslClient()
- {
- return _saslClient;
- }
-
- @Override
- public void setSaslClient(SaslClient client)
- {
- _saslClient = client;
- }
-
- /** @param delay delay in seconds (not ms) */
- @Override
- void initHeartbeats(int delay)
- {
- if (delay > 0)
- {
- // FIXME: actually do something here
- HeartbeatDiagnostics.init(delay, HeartbeatConfig.CONFIG.getTimeout(delay));
- }
- }
-
- @Override
- public void methodFrameReceived(final int channel, final AMQMethodBody amqMethodBody) throws AMQException
- {
- // FIXME?
- _protocolHandler.methodBodyReceived(channel, amqMethodBody, null);
- }
-
- @Override
- public void writeFrame(AMQDataBlock frame, boolean wait)
- {
- _ioSender.send(frame.toNioByteBuffer());
- if (wait)
- {
- _ioSender.flush();
- }
- }
-
- @Override
- public void setSender(Sender<java.nio.ByteBuffer> sender)
- {
- _ioSender = sender;
- }
-
- @Override
- public ConnectionTuneParameters getConnectionTuneParameters()
- {
- return _connectionTuneParameters;
- }
-
- @Override
- public void setConnectionTuneParameters(ConnectionTuneParameters params)
- {
- _connectionTuneParameters = params;
- AMQConnection con = getAMQConnection();
- con.setMaximumChannelCount(params.getChannelMax());
- con.setMaximumFrameSize(params.getFrameMax());
- initHeartbeats((int) params.getHeartbeat());
- }
-}
diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
index 6f62070fd0..d625cdb6f2 100644
--- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
@@ -20,24 +20,22 @@
*/
package org.apache.qpid.client.protocol;
-import org.apache.mina.common.IdleStatus;
-import org.apache.mina.common.IoFilterChain;
-import org.apache.mina.common.IoHandlerAdapter;
-import org.apache.mina.common.IoSession;
-import org.apache.mina.filter.ReadThrottleFilterBuilder;
-import org.apache.mina.filter.SSLFilter;
-import org.apache.mina.filter.WriteBufferLimitFilterBuilder;
+import java.io.IOException;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.CountDownLatch;
+
import org.apache.mina.filter.codec.ProtocolCodecException;
-import org.apache.mina.filter.codec.ProtocolCodecFilter;
-import org.apache.mina.filter.executor.ExecutorFilter;
import org.apache.qpid.AMQConnectionClosedException;
import org.apache.qpid.AMQDisconnectedException;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQTimeoutException;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQSession;
-import org.apache.qpid.client.SSLConfiguration;
-import org.apache.qpid.client.configuration.ClientProperties;
import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.client.failover.FailoverHandler;
import org.apache.qpid.client.failover.FailoverState;
@@ -46,23 +44,29 @@ import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.state.StateWaiter;
import org.apache.qpid.client.state.listener.SpecificMethodFrameListener;
import org.apache.qpid.codec.AMQCodecFactory;
-import org.apache.qpid.framing.*;
+import org.apache.qpid.framing.AMQBody;
+import org.apache.qpid.framing.AMQDataBlock;
+import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.ConnectionCloseBody;
+import org.apache.qpid.framing.ConnectionCloseOkBody;
+import org.apache.qpid.framing.HeartbeatBody;
+import org.apache.qpid.framing.MethodRegistry;
+import org.apache.qpid.framing.ProtocolInitiation;
+import org.apache.qpid.framing.ProtocolVersion;
import org.apache.qpid.jms.BrokerDetails;
-import org.apache.qpid.pool.ReadWriteThreadModel;
+import org.apache.qpid.pool.Job;
+import org.apache.qpid.pool.ReferenceCountingExecutorService;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.protocol.AMQMethodListener;
-import org.apache.qpid.ssl.SSLContextFactory;
+import org.apache.qpid.protocol.ProtocolEngine;
+import org.apache.qpid.transport.NetworkDriver;
import org.apache.qpid.transport.network.io.IoTransport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.Set;
-import java.util.concurrent.CopyOnWriteArraySet;
-import java.util.concurrent.CountDownLatch;
-
/**
* AMQProtocolHandler is the client side protocol handler for AMQP, it handles all protocol events received from the
* network by MINA. The primary purpose of AMQProtocolHandler is to translate the generic event model of MINA into the
@@ -102,9 +106,6 @@ import java.util.concurrent.CountDownLatch;
*
* <p/><table id="crc"><caption>CRC Card</caption>
* <tr><th> Responsibilities <th> Collaborations
- * <tr><td> Create the filter chain to filter this handlers events.
- * <td> {@link ProtocolCodecFilter}, {@link SSLContextFactory}, {@link SSLFilter}, {@link ReadWriteThreadModel}.
- *
* <tr><td> Maintain fail-over state.
* <tr><td>
* </table>
@@ -120,7 +121,7 @@ import java.util.concurrent.CountDownLatch;
* held per protocol handler, per protocol session, per network connection, per channel, in seperate classes, so
* that lifecycles of the fields match lifecycles of their containing objects.
*/
-public class AMQProtocolHandler extends IoHandlerAdapter
+public class AMQProtocolHandler implements ProtocolEngine
{
/** Used for debugging. */
private static final Logger _logger = LoggerFactory.getLogger(AMQProtocolHandler.class);
@@ -137,7 +138,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter
private volatile AMQProtocolSession _protocolSession;
/** Holds the state of the protocol session. */
- private AMQStateManager _stateManager = new AMQStateManager();
+ private AMQStateManager _stateManager;
/** Holds the method listeners, */
private final CopyOnWriteArraySet<AMQMethodListener> _frameListeners = new CopyOnWriteArraySet<AMQMethodListener>();
@@ -166,7 +167,15 @@ public class AMQProtocolHandler extends IoHandlerAdapter
/** Object to lock on when changing the latch */
private Object _failoverLatchChange = new Object();
-
+ private AMQCodecFactory _codecFactory;
+ private Job _readJob;
+ private Job _writeJob;
+ private ReferenceCountingExecutorService _poolReference = ReferenceCountingExecutorService.getInstance();
+ private NetworkDriver _networkDriver;
+
+ private long _writtenBytes;
+ private long _readBytes;
+
/**
* Creates a new protocol handler, associated with the specified client connection instance.
*
@@ -175,86 +184,13 @@ public class AMQProtocolHandler extends IoHandlerAdapter
public AMQProtocolHandler(AMQConnection con)
{
_connection = con;
- }
-
- /**
- * Invoked by MINA when a MINA session for a new connection is created. This method sets up the filter chain on the
- * session, which filters the events handled by this handler. The filter chain consists of, handing off events
- * to an asynchronous thread pool, optionally encoding/decoding ssl, encoding/decoding AMQP.
- *
- * @param session The MINA session.
- *
- * @throws Exception Any underlying exceptions are allowed to fall through to MINA.
- */
- public void sessionCreated(IoSession session) throws Exception
- {
- _logger.debug("Protocol session created for session " + System.identityHashCode(session));
- _failoverHandler = new FailoverHandler(this, session);
-
- final ProtocolCodecFilter pcf = new ProtocolCodecFilter(new AMQCodecFactory(false));
-
- if (Boolean.getBoolean("amqj.shared_read_write_pool"))
- {
- session.getFilterChain().addBefore("AsynchronousWriteFilter", "protocolFilter", pcf);
- }
- else
- {
- session.getFilterChain().addLast("protocolFilter", pcf);
- }
- // we only add the SSL filter where we have an SSL connection
- if (_connection.getSSLConfiguration() != null)
- {
- SSLConfiguration sslConfig = _connection.getSSLConfiguration();
- SSLContextFactory sslFactory =
- new SSLContextFactory(sslConfig.getKeystorePath(), sslConfig.getKeystorePassword(), sslConfig.getCertType());
- SSLFilter sslFilter = new SSLFilter(sslFactory.buildClientContext());
- sslFilter.setUseClientMode(true);
- session.getFilterChain().addBefore("protocolFilter", "ssl", sslFilter);
- }
-
- try
- {
- ReadWriteThreadModel threadModel = ReadWriteThreadModel.getInstance();
- threadModel.getAsynchronousReadFilter().createNewJobForSession(session);
- threadModel.getAsynchronousWriteFilter().createNewJobForSession(session);
- }
- catch (RuntimeException e)
- {
- _logger.error(e.getMessage(), e);
- }
-
- if (Boolean.getBoolean(ClientProperties.PROTECTIO_PROP_NAME))
- {
- try
- {
- //Add IO Protection Filters
- IoFilterChain chain = session.getFilterChain();
-
- session.getFilterChain().addLast("tempExecutorFilterForFilterBuilder", new ExecutorFilter());
-
- ReadThrottleFilterBuilder readfilter = new ReadThrottleFilterBuilder();
- readfilter.setMaximumConnectionBufferSize(Integer.parseInt(System.getProperty(
- ClientProperties.READ_BUFFER_LIMIT_PROP_NAME, ClientProperties.READ_BUFFER_LIMIT_DEFAULT)));
- readfilter.attach(chain);
-
- WriteBufferLimitFilterBuilder writefilter = new WriteBufferLimitFilterBuilder();
- writefilter.setMaximumConnectionBufferSize(Integer.parseInt(System.getProperty(
- ClientProperties.WRITE_BUFFER_LIMIT_PROP_NAME, ClientProperties.WRITE_BUFFER_LIMIT_DEFAULT)));
- writefilter.attach(chain);
- session.getFilterChain().remove("tempExecutorFilterForFilterBuilder");
-
- _logger.info("Using IO Read/Write Filter Protection");
- }
- catch (Exception e)
- {
- _logger.error("Unable to attach IO Read/Write Filter Protection :" + e.getMessage());
- }
- }
- _protocolSession = new AMQProtocolSession(this, session, _connection);
-
- _stateManager.setProtocolSession(_protocolSession);
-
- _protocolSession.init();
+ _protocolSession = new AMQProtocolSession(this, _connection);
+ _stateManager = new AMQStateManager(_protocolSession);
+ _codecFactory = new AMQCodecFactory(false, _protocolSession);
+ _readJob = new Job(_poolReference, Job.MAX_JOB_EVENTS, true);
+ _writeJob = new Job(_poolReference, Job.MAX_JOB_EVENTS, false);
+ _poolReference.acquireExecutorService();
+ _failoverHandler = new FailoverHandler(this);
}
/**
@@ -283,12 +219,10 @@ public class AMQProtocolHandler extends IoHandlerAdapter
* may be called first followed by this method. This depends on whether the client was trying to send data at the
* time of the failure.
*
- * @param session The MINA session.
- *
* @todo Clarify: presumably exceptionCaught is called when the client is sending during a connection failure and
* not otherwise? The above comment doesn't make that clear.
*/
- public void sessionClosed(IoSession session)
+ public void closed()
{
if (_connection.isClosed())
{
@@ -327,7 +261,8 @@ public class AMQProtocolHandler extends IoHandlerAdapter
{
_logger.debug("sessionClose() not allowed to failover");
_connection.exceptionReceived(new AMQDisconnectedException(
- "Server closed connection and reconnection " + "not permitted.", null));
+ "Server closed connection and reconnection " + "not permitted.",
+ _stateManager.getLastException()));
}
else
{
@@ -350,33 +285,28 @@ public class AMQProtocolHandler extends IoHandlerAdapter
failoverThread.start();
}
- public void sessionIdle(IoSession session, IdleStatus status) throws Exception
+ @Override
+ public void readerIdle()
{
- _logger.debug("Protocol Session [" + this + ":" + session + "] idle: " + status);
- if (IdleStatus.WRITER_IDLE.equals(status))
- {
- // write heartbeat frame:
- _logger.debug("Sent heartbeat");
- session.write(HeartbeatBody.FRAME);
- HeartbeatDiagnostics.sent();
- }
- else if (IdleStatus.READER_IDLE.equals(status))
- {
- // failover:
- HeartbeatDiagnostics.timeout();
- _logger.warn("Timed out while waiting for heartbeat from peer.");
- session.close();
- }
+ _logger.debug("Protocol Session [" + this + "] idle: reader");
+ // failover:
+ HeartbeatDiagnostics.timeout();
+ _logger.warn("Timed out while waiting for heartbeat from peer.");
+ _networkDriver.close();
+ }
+
+ @Override
+ public void writerIdle()
+ {
+ _logger.debug("Protocol Session [" + this + "] idle: reader");
+ writeFrame(HeartbeatBody.FRAME);
+ HeartbeatDiagnostics.sent();
}
/**
- * Invoked when any exception is thrown by a user IoHandler implementation or by MINA. If the cause is an
- * IOException, MINA will close the connection automatically.
- *
- * @param session The MINA session.
- * @param cause The exception that triggered this event.
+ * Invoked when any exception is thrown by the NetworkDriver
*/
- public void exceptionCaught(IoSession session, Throwable cause)
+ public void exception(Throwable cause)
{
if (_failoverState == FailoverState.NOT_STARTED)
{
@@ -384,9 +314,9 @@ public class AMQProtocolHandler extends IoHandlerAdapter
if ((cause instanceof AMQConnectionClosedException) || cause instanceof IOException)
{
_logger.info("Exception caught therefore going to attempt failover: " + cause, cause);
- // this will attemp failover
-
- sessionClosed(session);
+ // this will attempt failover
+ _networkDriver.close();
+ closed();
}
else
{
@@ -490,48 +420,84 @@ public class AMQProtocolHandler extends IoHandlerAdapter
private static int _messageReceivedCount;
- public void messageReceived(IoSession session, Object message) throws Exception
- {
- if (PROTOCOL_DEBUG)
- {
- _protocolLogger.info(String.format("RECV: [%s] %s", this, message));
- }
- if(message instanceof AMQFrame)
+ @Override
+ public void received(ByteBuffer msg)
+ {
+ try
{
- final boolean debug = _logger.isDebugEnabled();
- final long msgNumber = ++_messageReceivedCount;
+ _readBytes += msg.remaining();
+ final ArrayList<AMQDataBlock> dataBlocks = _codecFactory.getDecoder().decodeBuffer(msg);
- if (debug && ((msgNumber % 1000) == 0))
+ Job.fireAsynchEvent(_poolReference.getPool(), _readJob, new Runnable()
{
- _logger.debug("Received " + _messageReceivedCount + " protocol messages");
- }
-
- AMQFrame frame = (AMQFrame) message;
-
- final AMQBody bodyFrame = frame.getBodyFrame();
-
- HeartbeatDiagnostics.received(bodyFrame instanceof HeartbeatBody);
+ @Override
+ public void run()
+ {
+ // Decode buffer
- bodyFrame.handle(frame.getChannel(), _protocolSession);
+ for (AMQDataBlock message : dataBlocks)
+ {
- _connection.bytesReceived(_protocolSession.getIoSession().getReadBytes());
+ try
+ {
+ if (PROTOCOL_DEBUG)
+ {
+ _protocolLogger.info(String.format("RECV: [%s] %s", this, message));
+ }
+
+ if(message instanceof AMQFrame)
+ {
+ final boolean debug = _logger.isDebugEnabled();
+ final long msgNumber = ++_messageReceivedCount;
+
+ if (debug && ((msgNumber % 1000) == 0))
+ {
+ _logger.debug("Received " + _messageReceivedCount + " protocol messages");
+ }
+
+ AMQFrame frame = (AMQFrame) message;
+
+ final AMQBody bodyFrame = frame.getBodyFrame();
+
+ HeartbeatDiagnostics.received(bodyFrame instanceof HeartbeatBody);
+
+ bodyFrame.handle(frame.getChannel(), _protocolSession);
+
+ _connection.bytesReceived(_readBytes);
+ }
+ else if (message instanceof ProtocolInitiation)
+ {
+ // We get here if the server sends a response to our initial protocol header
+ // suggesting an alternate ProtocolVersion; the server will then close the
+ // connection.
+ ProtocolInitiation protocolInit = (ProtocolInitiation) message;
+ ProtocolVersion pv = protocolInit.checkVersion();
+ getConnection().setProtocolVersion(pv);
+
+ // get round a bug in old versions of qpid whereby the connection is not closed
+ _stateManager.changeState(AMQState.CONNECTION_CLOSED);
+ }
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ _logger.error("Exception processing frame", e);
+ propagateExceptionToFrameListeners(e);
+ exception(e);
+ }
+ }
+ }
+ });
}
- else if (message instanceof ProtocolInitiation)
+ catch (Exception e)
{
- // We get here if the server sends a response to our initial protocol header
- // suggesting an alternate ProtocolVersion; the server will then close the
- // connection.
- ProtocolInitiation protocolInit = (ProtocolInitiation) message;
- ProtocolVersion pv = protocolInit.checkVersion();
- getConnection().setProtocolVersion(pv);
-
- // get round a bug in old versions of qpid whereby the connection is not closed
- _stateManager.changeState(AMQState.CONNECTION_CLOSED);
+ propagateExceptionToFrameListeners(e);
+ exception(e);
}
}
- public void methodBodyReceived(final int channelId, final AMQBody bodyFrame, IoSession session)//, final IoSession session)
+ public void methodBodyReceived(final int channelId, final AMQBody bodyFrame)
throws AMQException
{
@@ -571,32 +537,13 @@ public class AMQProtocolHandler extends IoHandlerAdapter
{
propagateExceptionToFrameListeners(e);
- exceptionCaught(session, e);
+ exception(e);
}
}
private static int _messagesOut;
- public void messageSent(IoSession session, Object message) throws Exception
- {
- if (PROTOCOL_DEBUG)
- {
- _protocolLogger.debug(String.format("SEND: [%s] %s", this, message));
- }
-
- final long sentMessages = _messagesOut++;
-
- final boolean debug = _logger.isDebugEnabled();
-
- if (debug && ((sentMessages % 1000) == 0))
- {
- _logger.debug("Sent " + _messagesOut + " protocol messages");
- }
-
- _connection.bytesSent(session.getWrittenBytes());
- }
-
public StateWaiter createWaiter(Set<AMQState> states) throws AMQException
{
return getStateManager().createWaiter(states);
@@ -610,12 +557,41 @@ public class AMQProtocolHandler extends IoHandlerAdapter
*/
public void writeFrame(AMQDataBlock frame)
{
- _protocolSession.writeFrame(frame);
+ writeFrame(frame, false);
}
public void writeFrame(AMQDataBlock frame, boolean wait)
{
- _protocolSession.writeFrame(frame, wait);
+ final ByteBuffer buf = frame.toNioByteBuffer();
+ _writtenBytes += buf.remaining();
+ Job.fireAsynchEvent(_poolReference.getPool(), _writeJob, new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ _networkDriver.send(buf);
+ }
+ });
+ if (PROTOCOL_DEBUG)
+ {
+ _protocolLogger.debug(String.format("SEND: [%s] %s", this, frame));
+ }
+
+ final long sentMessages = _messagesOut++;
+
+ final boolean debug = _logger.isDebugEnabled();
+
+ if (debug && ((sentMessages % 1000) == 0))
+ {
+ _logger.debug("Sent " + _messagesOut + " protocol messages");
+ }
+
+ _connection.bytesSent(_writtenBytes);
+
+ if (wait)
+ {
+ _networkDriver.flush();
+ }
}
/**
@@ -673,7 +649,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter
//FIXME: At this point here we should check or before add we should check _stateManager is in an open
// state so as we don't check we are likely just to time out here as I believe is being seen in QPID-1255
}
- _protocolSession.writeFrame(frame);
+ writeFrame(frame);
return listener.blockForFrame(timeout);
// When control resumes before this line, a reply will have been received
@@ -723,38 +699,36 @@ public class AMQProtocolHandler extends IoHandlerAdapter
final AMQFrame frame = body.generateFrame(0);
//If the connection is already closed then don't do a syncWrite
- if (getStateManager().getCurrentState().equals(AMQState.CONNECTION_CLOSED))
- {
- _protocolSession.closeProtocolSession(false);
- }
- else
+ if (!getStateManager().getCurrentState().equals(AMQState.CONNECTION_CLOSED))
{
try
{
syncWrite(frame, ConnectionCloseOkBody.class, timeout);
- _protocolSession.closeProtocolSession();
+ _networkDriver.close();
+ closed();
}
catch (AMQTimeoutException e)
{
- _protocolSession.closeProtocolSession(false);
+ closed();
}
catch (FailoverException e)
{
_logger.debug("FailoverException interrupted connection close, ignoring as connection close anyway.");
}
}
+ _poolReference.releaseExecutorService();
}
/** @return the number of bytes read from this protocol session */
public long getReadBytes()
{
- return _protocolSession.getIoSession().getReadBytes();
+ return _readBytes;
}
/** @return the number of bytes written to this protocol session */
public long getWrittenBytes()
{
- return _protocolSession.getIoSession().getWrittenBytes();
+ return _writtenBytes;
}
public void failover(String host, int port)
@@ -807,6 +781,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter
public void setStateManager(AMQStateManager stateManager)
{
_stateManager = stateManager;
+ _stateManager.setProtocolSession(_protocolSession);
}
public AMQProtocolSession getProtocolSession()
@@ -843,4 +818,35 @@ public class AMQProtocolHandler extends IoHandlerAdapter
{
return _protocolSession.getProtocolVersion();
}
+
+ public SocketAddress getRemoteAddress()
+ {
+ return _networkDriver.getRemoteAddress();
+ }
+
+ public SocketAddress getLocalAddress()
+ {
+ return _networkDriver.getLocalAddress();
+ }
+
+ public void setNetworkDriver(NetworkDriver driver)
+ {
+ _networkDriver = driver;
+ }
+
+ /** @param delay delay in seconds (not ms) */
+ void initHeartbeats(int delay)
+ {
+ if (delay > 0)
+ {
+ getNetworkDriver().setMaxWriteIdle(delay);
+ getNetworkDriver().setMaxReadIdle(HeartbeatConfig.CONFIG.getTimeout(delay));
+ HeartbeatDiagnostics.init(delay, HeartbeatConfig.CONFIG.getTimeout(delay));
+ }
+ }
+
+ public NetworkDriver getNetworkDriver()
+ {
+ return _networkDriver;
+ }
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
index 0e872170aa..cd049c24a1 100644
--- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
@@ -20,11 +20,6 @@
*/
package org.apache.qpid.client.protocol;
-import org.apache.commons.lang.StringUtils;
-import org.apache.mina.common.CloseFuture;
-import org.apache.mina.common.IdleStatus;
-import org.apache.mina.common.IoSession;
-import org.apache.mina.common.WriteFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -33,6 +28,7 @@ import javax.security.sasl.SaslClient;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import org.apache.commons.lang.StringUtils;
import org.apache.qpid.AMQException;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQSession;
@@ -65,10 +61,6 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
protected static final String SASL_CLIENT = "SASLClient";
- protected final IoSession _minaProtocolSession;
-
- protected WriteFuture _lastWriteFuture;
-
/**
* The handler from which this session was created and which is used to handle protocol events. We send failover
* events to the handler.
@@ -102,28 +94,15 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
protected final AMQConnection _connection;
- private static final int FAST_CHANNEL_ACCESS_MASK = 0xFFFFFFF0;
+ private ConnectionTuneParameters _connectionTuneParameters;
- public AMQProtocolSession(AMQProtocolHandler protocolHandler, IoSession protocolSession, AMQConnection connection)
- {
- _protocolHandler = protocolHandler;
- _minaProtocolSession = protocolSession;
- _minaProtocolSession.setAttachment(this);
- // properties of the connection are made available to the event handlers
- _minaProtocolSession.setAttribute(AMQ_CONNECTION, connection);
- // fixme - real value needed
- _minaProtocolSession.setWriteTimeout(LAST_WRITE_FUTURE_JOIN_TIMEOUT);
- _protocolVersion = connection.getProtocolVersion();
- _methodDispatcher = ClientMethodDispatcherImpl.newMethodDispatcher(ProtocolVersion.getLatestSupportedVersion(),
- this);
- _connection = connection;
+ private SaslClient _saslClient;
- }
+ private static final int FAST_CHANNEL_ACCESS_MASK = 0xFFFFFFF0;
public AMQProtocolSession(AMQProtocolHandler protocolHandler, AMQConnection connection)
{
- _protocolHandler = protocolHandler;
- _minaProtocolSession = null;
+ _protocolHandler = protocolHandler;
_protocolVersion = connection.getProtocolVersion();
_methodDispatcher = ClientMethodDispatcherImpl.newMethodDispatcher(ProtocolVersion.getLatestSupportedVersion(),
this);
@@ -134,7 +113,7 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
{
// start the process of setting up the connection. This is the first place that
// data is written to the server.
- _minaProtocolSession.write(new ProtocolInitiation(_connection.getProtocolVersion()));
+ _protocolHandler.writeFrame(new ProtocolInitiation(_connection.getProtocolVersion()));
}
public String getClientID()
@@ -175,14 +154,9 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
return getAMQConnection().getPassword();
}
- public IoSession getIoSession()
- {
- return _minaProtocolSession;
- }
-
public SaslClient getSaslClient()
{
- return (SaslClient) _minaProtocolSession.getAttribute(SASL_CLIENT);
+ return _saslClient;
}
/**
@@ -192,28 +166,21 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
*/
public void setSaslClient(SaslClient client)
{
- if (client == null)
- {
- _minaProtocolSession.removeAttribute(SASL_CLIENT);
- }
- else
- {
- _minaProtocolSession.setAttribute(SASL_CLIENT, client);
- }
+ _saslClient = client;
}
public ConnectionTuneParameters getConnectionTuneParameters()
{
- return (ConnectionTuneParameters) _minaProtocolSession.getAttribute(CONNECTION_TUNE_PARAMETERS);
+ return _connectionTuneParameters;
}
public void setConnectionTuneParameters(ConnectionTuneParameters params)
{
- _minaProtocolSession.setAttribute(CONNECTION_TUNE_PARAMETERS, params);
+ _connectionTuneParameters = params;
AMQConnection con = getAMQConnection();
con.setMaximumChannelCount(params.getChannelMax());
con.setMaximumFrameSize(params.getFrameMax());
- initHeartbeats((int) params.getHeartbeat());
+ _protocolHandler.initHeartbeats((int) params.getHeartbeat());
}
/**
@@ -335,21 +302,12 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
*/
public void writeFrame(AMQDataBlock frame)
{
- writeFrame(frame, false);
+ _protocolHandler.writeFrame(frame);
}
public void writeFrame(AMQDataBlock frame, boolean wait)
{
- WriteFuture f = _minaProtocolSession.write(frame);
- if (wait)
- {
- // fixme -- time out?
- f.join();
- }
- else
- {
- _lastWriteFuture = f;
- }
+ _protocolHandler.writeFrame(frame, wait);
}
/**
@@ -407,33 +365,12 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
public AMQConnection getAMQConnection()
{
- return (AMQConnection) _minaProtocolSession.getAttribute(AMQ_CONNECTION);
+ return _connection;
}
- public void closeProtocolSession()
+ public void closeProtocolSession() throws AMQException
{
- closeProtocolSession(true);
- }
-
- public void closeProtocolSession(boolean waitLast)
- {
- _logger.debug("Waiting for last write to join.");
- if (waitLast && (_lastWriteFuture != null))
- {
- _lastWriteFuture.join(LAST_WRITE_FUTURE_JOIN_TIMEOUT);
- }
-
- _logger.debug("Closing protocol session");
-
- final CloseFuture future = _minaProtocolSession.close();
-
- // There is no recovery we can do if the join on the close failes so simply mark the connection CLOSED
- // then wait for the connection to close.
- // ritchiem: Could this release BlockingWaiters to early? The close has been done as much as possible so any
- // error now shouldn't matter.
-
- _protocolHandler.getStateManager().changeState(AMQState.CONNECTION_CLOSED);
- future.join(LAST_WRITE_FUTURE_JOIN_TIMEOUT);
+ _protocolHandler.closeConnection(0);
}
public void failover(String host, int port)
@@ -449,22 +386,11 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
id = _queueId++;
}
// get rid of / and : and ; from address for spec conformance
- String localAddress = StringUtils.replaceChars(_minaProtocolSession.getLocalAddress().toString(), "/;:", "");
+ String localAddress = StringUtils.replaceChars(_protocolHandler.getLocalAddress().toString(), "/;:", "");
return new AMQShortString("tmp_" + localAddress + "_" + id);
}
- /** @param delay delay in seconds (not ms) */
- void initHeartbeats(int delay)
- {
- if (delay > 0)
- {
- _minaProtocolSession.setIdleTime(IdleStatus.WRITER_IDLE, delay);
- _minaProtocolSession.setIdleTime(IdleStatus.READER_IDLE, HeartbeatConfig.CONFIG.getTimeout(delay));
- HeartbeatDiagnostics.init(delay, HeartbeatConfig.CONFIG.getTimeout(delay));
- }
- }
-
public void confirmConsumerCancelled(int channelId, AMQShortString consumerTag)
{
final AMQSession session = getSession(channelId);
@@ -530,7 +456,7 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
public void methodFrameReceived(final int channel, final AMQMethodBody amqMethodBody) throws AMQException
{
- _protocolHandler.methodBodyReceived(channel, amqMethodBody, _minaProtocolSession);
+ _protocolHandler.methodBodyReceived(channel, amqMethodBody);
}
public void notifyError(Exception error)
diff --git a/java/client/src/main/java/org/apache/qpid/client/transport/QpidThreadExecutor.java b/java/client/src/main/java/org/apache/qpid/client/transport/QpidThreadExecutor.java
deleted file mode 100644
index 6f4a3a3b10..0000000000
--- a/java/client/src/main/java/org/apache/qpid/client/transport/QpidThreadExecutor.java
+++ /dev/null
@@ -1,21 +0,0 @@
-package org.apache.qpid.client.transport;
-
-import org.apache.qpid.thread.Threading;
-
-import edu.emory.mathcs.backport.java.util.concurrent.Executor;
-
-public class QpidThreadExecutor implements Executor
-{
- public void execute(Runnable command)
- {
- try
- {
- Threading.getThreadFactory().createThread(command).start();
- }
- catch(Exception e)
- {
- throw new RuntimeException("Error creating a thread using Qpid thread factory",e);
- }
- }
-
-}
diff --git a/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java b/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java
index b2f7ae8395..1ac8f62e32 100644
--- a/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java
+++ b/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java
@@ -20,27 +20,20 @@
*/
package org.apache.qpid.client.transport;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
import org.apache.mina.common.ByteBuffer;
-import org.apache.mina.common.ConnectFuture;
import org.apache.mina.common.IoConnector;
import org.apache.mina.common.SimpleByteBufferAllocator;
-import org.apache.mina.transport.socket.nio.ExistingSocketConnector;
-import org.apache.mina.transport.socket.nio.SocketConnectorConfig;
-import org.apache.mina.transport.socket.nio.SocketSessionConfig;
-
+import org.apache.qpid.client.SSLConfiguration;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.jms.BrokerDetails;
-import org.apache.qpid.pool.ReadWriteThreadModel;
-
+import org.apache.qpid.ssl.SSLContextFactory;
+import org.apache.qpid.transport.network.mina.MINANetworkDriver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.Socket;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
public class SocketTransportConnection implements ITransportConnection
{
private static final Logger _logger = LoggerFactory.getLogger(SocketTransportConnection.class);
@@ -71,61 +64,27 @@ public class SocketTransportConnection implements ITransportConnection
}
final IoConnector ioConnector = _socketConnectorFactory.newSocketConnector();
- SocketConnectorConfig cfg = (SocketConnectorConfig) ioConnector.getDefaultConfig();
-
- // if we do not use our own thread model we get the MINA default which is to use
- // its own leader-follower model
- boolean readWriteThreading = Boolean.getBoolean("amqj.shared_read_write_pool");
- if (readWriteThreading)
- {
- cfg.setThreadModel(ReadWriteThreadModel.getInstance());
- }
-
- SocketSessionConfig scfg = (SocketSessionConfig) cfg.getSessionConfig();
- scfg.setTcpNoDelay("true".equalsIgnoreCase(System.getProperty("amqj.tcpNoDelay", "true")));
- scfg.setSendBufferSize(Integer.getInteger("amqj.sendBufferSize", DEFAULT_BUFFER_SIZE));
- _logger.info("send-buffer-size = " + scfg.getSendBufferSize());
- scfg.setReceiveBufferSize(Integer.getInteger("amqj.receiveBufferSize", DEFAULT_BUFFER_SIZE));
- _logger.info("recv-buffer-size = " + scfg.getReceiveBufferSize());
-
final InetSocketAddress address;
if (brokerDetail.getTransport().equals(BrokerDetails.SOCKET))
{
address = null;
-
- Socket socket = TransportConnection.removeOpenSocket(brokerDetail.getHost());
-
- if (socket != null)
- {
- _logger.info("Using existing Socket:" + socket);
-
- ((ExistingSocketConnector) ioConnector).setOpenSocket(socket);
- }
- else
- {
- throw new IllegalArgumentException("Active Socket must be provided for broker " +
- "with 'socket://<SocketID>' transport:" + brokerDetail);
- }
}
else
{
address = new InetSocketAddress(brokerDetail.getHost(), brokerDetail.getPort());
_logger.info("Attempting connection to " + address);
}
-
-
- ConnectFuture future = ioConnector.connect(address, protocolHandler);
-
- // wait for connection to complete
- if (future.join(brokerDetail.getTimeout()))
- {
- // we call getSession which throws an IOException if there has been an error connecting
- future.getSession();
- }
- else
+
+ SSLConfiguration sslConfig = protocolHandler.getConnection().getSSLConfiguration();
+ SSLContextFactory sslFactory = null;
+ if (sslConfig != null)
{
- throw new IOException("Timeout waiting for connection.");
+ sslFactory = new SSLContextFactory(sslConfig.getKeystorePath(), sslConfig.getKeystorePassword(), sslConfig.getCertType());
}
+
+ MINANetworkDriver driver = new MINANetworkDriver(ioConnector);
+ driver.open(brokerDetail.getPort(), address.getAddress(), protocolHandler, null, sslFactory);
+ protocolHandler.setNetworkDriver(driver);
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java b/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
index 0bacda04ff..a4f8bb0166 100644
--- a/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
+++ b/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
@@ -20,6 +20,12 @@
*/
package org.apache.qpid.client.transport;
+import java.io.IOException;
+import java.net.Socket;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
import org.apache.mina.common.IoConnector;
import org.apache.mina.common.IoHandlerAdapter;
import org.apache.mina.common.IoServiceConfig;
@@ -30,16 +36,12 @@ import org.apache.mina.transport.vmpipe.VmPipeAcceptor;
import org.apache.mina.transport.vmpipe.VmPipeAddress;
import org.apache.qpid.client.vmbroker.AMQVMBrokerCreationException;
import org.apache.qpid.jms.BrokerDetails;
-import org.apache.qpid.pool.ReadWriteThreadModel;
+import org.apache.qpid.protocol.ProtocolEngineFactory;
+import org.apache.qpid.thread.QpidThreadExecutor;
+import org.apache.qpid.transport.network.mina.MINANetworkDriver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.net.Socket;
-
/**
* The TransportConnection is a helper class responsible for connecting to an AMQ server. It sets up the underlying
* connector, which currently always uses TCP/IP sockets. It creates the "protocol handler" which deals with MINA
@@ -61,7 +63,7 @@ public class TransportConnection
private static Logger _logger = LoggerFactory.getLogger(TransportConnection.class);
- private static final String DEFAULT_QPID_SERVER = "org.apache.qpid.server.protocol.AMQPFastProtocolHandler";
+ private static final String DEFAULT_QPID_SERVER = "org.apache.qpid.server.protocol.AMQProtocolEngineFactory";
private static Map<String, Socket> _openSocketRegister = new ConcurrentHashMap<String, Socket>();
@@ -75,7 +77,7 @@ public class TransportConnection
return _openSocketRegister.remove(socketID);
}
- public static synchronized ITransportConnection getInstance(BrokerDetails details) throws AMQTransportConnectionException
+ public static synchronized ITransportConnection getInstance(final BrokerDetails details) throws AMQTransportConnectionException
{
int transport = getTransport(details.getTransport());
@@ -91,7 +93,22 @@ public class TransportConnection
{
public IoConnector newSocketConnector()
{
- return new ExistingSocketConnector(1,new QpidThreadExecutor());
+ ExistingSocketConnector connector = new ExistingSocketConnector(1,new QpidThreadExecutor());
+
+ Socket socket = TransportConnection.removeOpenSocket(details.getHost());
+
+ if (socket != null)
+ {
+ _logger.info("Using existing Socket:" + socket);
+
+ ((ExistingSocketConnector) connector).setOpenSocket(socket);
+ }
+ else
+ {
+ throw new IllegalArgumentException("Active Socket must be provided for broker " +
+ "with 'socket://<SocketID>' transport:" + details);
+ }
+ return connector;
}
});
case TCP:
@@ -189,8 +206,6 @@ public class TransportConnection
_acceptor = new VmPipeAcceptor();
IoServiceConfig config = _acceptor.getDefaultConfig();
-
- config.setThreadModel(ReadWriteThreadModel.getInstance());
}
synchronized (_inVmPipeAddress)
{
@@ -275,7 +290,10 @@ public class TransportConnection
{
Class[] cnstr = {Integer.class};
Object[] params = {port};
- provider = (IoHandlerAdapter) Class.forName(protocolProviderClass).getConstructor(cnstr).newInstance(params);
+
+ provider = new MINANetworkDriver();
+ ProtocolEngineFactory engineFactory = (ProtocolEngineFactory) Class.forName(protocolProviderClass).getConstructor(cnstr).newInstance(params);
+ ((MINANetworkDriver) provider).setProtocolEngineFactory(engineFactory, true);
// Give the broker a second to create
_logger.info("Created VMBroker Instance:" + port);
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java b/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java
index dca6efba67..504d475740 100644
--- a/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java
+++ b/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java
@@ -20,25 +20,26 @@
*/
package org.apache.qpid.client.transport;
+import java.io.IOException;
+
import org.apache.mina.common.ConnectFuture;
-import org.apache.mina.common.IoServiceConfig;
import org.apache.mina.transport.vmpipe.QpidVmPipeConnector;
import org.apache.mina.transport.vmpipe.VmPipeAddress;
import org.apache.mina.transport.vmpipe.VmPipeConnector;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.jms.BrokerDetails;
-import org.apache.qpid.pool.ReadWriteThreadModel;
+import org.apache.qpid.transport.network.mina.MINANetworkDriver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
-
public class VmPipeTransportConnection implements ITransportConnection
{
private static final Logger _logger = LoggerFactory.getLogger(VmPipeTransportConnection.class);
private static int _port;
+ private MINANetworkDriver _networkDriver;
+
public VmPipeTransportConnection(int port)
{
_port = port;
@@ -47,16 +48,16 @@ public class VmPipeTransportConnection implements ITransportConnection
public void connect(AMQProtocolHandler protocolHandler, BrokerDetails brokerDetail) throws IOException
{
final VmPipeConnector ioConnector = new QpidVmPipeConnector();
- final IoServiceConfig cfg = ioConnector.getDefaultConfig();
-
- cfg.setThreadModel(ReadWriteThreadModel.getInstance());
final VmPipeAddress address = new VmPipeAddress(_port);
_logger.info("Attempting connection to " + address);
- ConnectFuture future = ioConnector.connect(address, protocolHandler);
+ _networkDriver = new MINANetworkDriver(ioConnector, protocolHandler);
+ protocolHandler.setNetworkDriver(_networkDriver);
+ ConnectFuture future = ioConnector.connect(address, _networkDriver);
// wait for connection to complete
future.join();
// we call getSession which throws an IOException if there has been an error connecting
future.getSession();
+ _networkDriver.setProtocolEngine(protocolHandler);
}
}
diff --git a/java/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java b/java/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java
index fc7f8148f0..f520a21ba0 100644
--- a/java/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java
+++ b/java/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java
@@ -27,6 +27,7 @@ import org.apache.qpid.framing.AMQMethodBody;
import org.apache.qpid.framing.amqp_8_0.BasicRecoverOkBodyImpl;
import org.apache.qpid.AMQException;
import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.transport.TestNetworkDriver;
import org.apache.qpid.client.MockAMQConnection;
import org.apache.qpid.client.AMQAuthenticationException;
import org.apache.qpid.client.state.AMQState;
@@ -72,9 +73,7 @@ public class AMQProtocolHandlerTest extends TestCase
{
//Create a new ProtocolHandler with a fake connection.
_handler = new AMQProtocolHandler(new MockAMQConnection("amqp://guest:guest@client/test?brokerlist='vm://:1'"));
-
- _handler.sessionCreated(new MockIoSession());
-
+ _handler.setNetworkDriver(new TestNetworkDriver());
AMQBody body = BasicRecoverOkBodyImpl.getFactory().newInstance(null, 1);
_blockFrame = new AMQFrame(0, body);