diff options
| author | Aidan Skinner <aidan@apache.org> | 2008-08-08 17:08:37 +0000 |
|---|---|---|
| committer | Aidan Skinner <aidan@apache.org> | 2008-08-08 17:08:37 +0000 |
| commit | 8ecc4ecc20d23685c8e2a835b855bfb60fdf5bf7 (patch) | |
| tree | a30d0180600d2570d3bb02e3d16393de28f9f5ca /qpid/java/client/src | |
| parent | bd6c228a52e5085ac4e01e56ddcc0eb9d5fd5218 (diff) | |
| download | qpid-python-8ecc4ecc20d23685c8e2a835b855bfb60fdf5bf7.tar.gz | |
QPID-1218 Optionally use IoTransport, it's hot, but doesn't pass all the tests yet.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@684016 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/client/src')
4 files changed, 171 insertions, 5 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java index 1e65c50304..2ec8737d16 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java @@ -48,6 +48,7 @@ import org.apache.qpid.framing.TxSelectBody; import org.apache.qpid.framing.TxSelectOkBody; import org.apache.qpid.jms.BrokerDetails; import org.apache.qpid.jms.ChannelLimitReachedException; +import org.apache.qpid.transport.network.io.IoTransport; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -88,7 +89,16 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate StateWaiter waiter = _conn._protocolHandler.createWaiter(openOrClosedStates); - TransportConnection.getInstance(brokerDetail).connect(_conn._protocolHandler, brokerDetail); + // TODO: use system property thingy for this + if (System.getProperty("UseTransportIo", "false").equals("false")) + { + TransportConnection.getInstance(brokerDetail).connect(_conn._protocolHandler, brokerDetail); + } + else + { + _conn.getProtocolHandler().createIoTransportSession(brokerDetail); + } + // this blocks until the connection has been set up or when an error // has prevented the connection being set up diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQIoTransportProtocolSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQIoTransportProtocolSession.java new file mode 100644 index 0000000000..43ec7789c2 --- /dev/null +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQIoTransportProtocolSession.java @@ -0,0 +1,125 @@ +package org.apache.qpid.client.protocol; + +import java.util.UUID; + +import javax.security.sasl.SaslClient; + +import org.apache.commons.lang.StringUtils; +import org.apache.mina.common.IdleStatus; +import org.apache.qpid.AMQException; +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.ConnectionTuneParameters; +import org.apache.qpid.client.handler.ClientMethodDispatcherImpl; +import org.apache.qpid.client.state.AMQState; +import org.apache.qpid.framing.AMQDataBlock; +import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.ProtocolInitiation; +import org.apache.qpid.framing.ProtocolVersion; +import org.apache.qpid.transport.network.io.IoSender; + +public class AMQIoTransportProtocolSession extends AMQProtocolSession +{ + + protected IoSender _ioSender; + private SaslClient _saslClient; + private ConnectionTuneParameters _connectionTuneParameters; + + public AMQIoTransportProtocolSession(AMQProtocolHandler protocolHandler, AMQConnection connection) + { + super(protocolHandler, connection); + } + + @Override + public void closeProtocolSession(boolean waitLast) throws AMQException + { + _ioSender.close(); + _protocolHandler.getStateManager().changeState(AMQState.CONNECTION_CLOSED); + } + + @Override + public void init() + { + _ioSender.send(new ProtocolInitiation(_connection.getProtocolVersion()).toNioByteBuffer()); + _ioSender.flush(); + } + + @Override + protected AMQShortString generateQueueName() + { + int id; + synchronized (_queueIdLock) + { + id = _queueId++; + } + return new AMQShortString("tmp_" + UUID.randomUUID() + "_" + id); + } + + @Override + public AMQConnection getAMQConnection() + { + return _connection; + } + + @Override + public SaslClient getSaslClient() + { + return _saslClient; + } + + @Override + public void setSaslClient(SaslClient client) + { + _saslClient = client; + } + + /** @param delay delay in seconds (not ms) */ + @Override + void initHeartbeats(int delay) + { + if (delay > 0) + { + // FIXME: actually do something here + HeartbeatDiagnostics.init(delay, HeartbeatConfig.CONFIG.getTimeout(delay)); + } + } + + @Override + public void methodFrameReceived(final int channel, final AMQMethodBody amqMethodBody) throws AMQException + { + // FIXME? + _protocolHandler.methodBodyReceived(channel, amqMethodBody, null); + } + + @Override + public void writeFrame(AMQDataBlock frame, boolean wait) + { + _ioSender.send(frame.toNioByteBuffer()); + if (wait) + { + _ioSender.flush(); + } + } + + @Override + public void setSender(IoSender sender) + { + _ioSender = sender; + } + + @Override + public ConnectionTuneParameters getConnectionTuneParameters() + { + return _connectionTuneParameters; + } + + @Override + public void setConnectionTuneParameters(ConnectionTuneParameters params) + { + _connectionTuneParameters = params; + AMQConnection con = getAMQConnection(); + con.setMaximumChannelCount(params.getChannelMax()); + con.setMaximumFrameSize(params.getFrameMax()); + initHeartbeats((int) params.getHeartbeat()); + } +} diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java index 8328d87b87..e92817f713 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java @@ -47,11 +47,13 @@ import org.apache.qpid.client.state.StateWaiter; import org.apache.qpid.client.state.listener.SpecificMethodFrameListener; import org.apache.qpid.codec.AMQCodecFactory; import org.apache.qpid.framing.*; +import org.apache.qpid.jms.BrokerDetails; import org.apache.qpid.pool.ReadWriteThreadModel; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.protocol.AMQMethodListener; import org.apache.qpid.ssl.SSLContextFactory; +import org.apache.qpid.transport.network.io.IoTransport; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -253,6 +255,19 @@ public class AMQProtocolHandler extends IoHandlerAdapter } /** + * Called when we want to create a new IoTransport session + * @param brokerDetail + */ + public void createIoTransportSession(BrokerDetails brokerDetail) + { + _protocolSession = new AMQProtocolSession(this, _connection); + _stateManager.setProtocolSession(_protocolSession); + IoTransport.connect_0_9(getProtocolSession(), + brokerDetail.getHost(), brokerDetail.getPort()); + _protocolSession.init(); + } + + /** * Called when the network connection is closed. This can happen, either because the client explicitly requested * that the connection be closed, in which case nothing is done, or because the connection died. In the case * where the connection died, an attempt to failover automatically to a new connection may be started. The failover diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java index 6beec3c9ba..6c3ae06ce9 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java @@ -44,6 +44,7 @@ import org.apache.qpid.client.state.AMQState; import org.apache.qpid.framing.*; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; +import org.apache.qpid.transport.network.io.IoSender; import org.apache.qpid.client.handler.ClientMethodDispatcherImpl; /** @@ -99,7 +100,8 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession private MethodDispatcher _methodDispatcher; - private final AMQConnection _connection; + protected final AMQConnection _connection; + private static final int FAST_CHANNEL_ACCESS_MASK = 0xFFFFFFF0; public AMQProtocolSession(AMQProtocolHandler protocolHandler, IoSession protocolSession, AMQConnection connection) @@ -118,11 +120,20 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession } + public AMQProtocolSession(AMQProtocolHandler protocolHandler, AMQConnection connection) + { + _protocolHandler = protocolHandler; + _minaProtocolSession = null; + _protocolVersion = connection.getProtocolVersion(); + _methodDispatcher = ClientMethodDispatcherImpl.newMethodDispatcher(ProtocolVersion.getLatestSupportedVersion(), + this); + _connection = connection; + } + public void init() { // start the process of setting up the connection. This is the first place that // data is written to the server. - _minaProtocolSession.write(new ProtocolInitiation(_connection.getProtocolVersion())); } @@ -171,7 +182,7 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession public SaslClient getSaslClient() { - return (SaslClient) _minaProtocolSession.getAttribute(SASL_CLIENT); + return (SaslClient) _minaProtocolSession.getAttribute(SASL_CLIENT); } /** @@ -422,6 +433,7 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession } _logger.debug("Closing protocol session"); + final CloseFuture future = _minaProtocolSession.close(); // There is no recovery we can do if the join on the close failes so simply mark the connection CLOSED @@ -430,7 +442,6 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession // error now shouldn't matter. _protocolHandler.getStateManager().changeState(AMQState.CONNECTION_CLOSED); - future.join(LAST_WRITE_FUTURE_JOIN_TIMEOUT); } @@ -535,4 +546,9 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession { _protocolHandler.propagateExceptionToAllWaiters(error); } + + public void setSender(IoSender sender) + { + // No-op, interface munging + } } |
