diff options
Diffstat (limited to 'qpid/java/client')
| -rw-r--r-- | qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java | 2 | ||||
| -rw-r--r-- | qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java | 12 |
2 files changed, 9 insertions, 5 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java index 2389c9e2da..e3a1a82dc4 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java @@ -191,7 +191,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter _logger.debug("Protocol session created for session " + System.identityHashCode(session)); _failoverHandler = new FailoverHandler(this, session); - final ProtocolCodecFilter pcf = new ProtocolCodecFilter(new AMQCodecFactory(false)); + final ProtocolCodecFilter pcf = new ProtocolCodecFilter(new AMQCodecFactory(false, _protocolSession)); if (Boolean.getBoolean("amqj.shared_read_write_pool")) { diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java index 32cc8c4cb5..4ff24c3607 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java @@ -31,7 +31,10 @@ import org.apache.mina.transport.vmpipe.VmPipeAddress; import org.apache.qpid.client.vmbroker.AMQVMBrokerCreationException; import org.apache.qpid.jms.BrokerDetails; import org.apache.qpid.pool.ReadWriteThreadModel; +import org.apache.qpid.protocol.ProtocolEngine; +import org.apache.qpid.protocol.ProtocolEngineFactory; import org.apache.qpid.thread.QpidThreadExecutor; +import org.apache.qpid.transport.network.mina.MINANetworkDriver; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -62,7 +65,7 @@ public class TransportConnection private static Logger _logger = LoggerFactory.getLogger(TransportConnection.class); - private static final String DEFAULT_QPID_SERVER = "org.apache.qpid.server.protocol.AMQPFastProtocolHandler"; + private static final String DEFAULT_QPID_SERVER = "org.apache.qpid.server.protocol.AMQProtocolEngineFactory"; private static Map<String, Socket> _openSocketRegister = new ConcurrentHashMap<String, Socket>(); @@ -190,8 +193,6 @@ public class TransportConnection _acceptor = new VmPipeAcceptor(); IoServiceConfig config = _acceptor.getDefaultConfig(); - - config.setThreadModel(ReadWriteThreadModel.getInstance()); } synchronized (_inVmPipeAddress) { @@ -276,7 +277,10 @@ public class TransportConnection { Class[] cnstr = {Integer.class}; Object[] params = {port}; - provider = (IoHandlerAdapter) Class.forName(protocolProviderClass).getConstructor(cnstr).newInstance(params); + + provider = new MINANetworkDriver(); + ProtocolEngineFactory engineFactory = (ProtocolEngineFactory) Class.forName(protocolProviderClass).getConstructor(cnstr).newInstance(params); + ((MINANetworkDriver) provider).setProtocolEngineFactory(engineFactory, true); // Give the broker a second to create _logger.info("Created VMBroker Instance:" + port); } |
