summaryrefslogtreecommitdiff
path: root/qpid/java/client/src
diff options
context:
space:
mode:
authorAidan Skinner <aidan@apache.org>2008-08-08 17:08:37 +0000
committerAidan Skinner <aidan@apache.org>2008-08-08 17:08:37 +0000
commit8ecc4ecc20d23685c8e2a835b855bfb60fdf5bf7 (patch)
treea30d0180600d2570d3bb02e3d16393de28f9f5ca /qpid/java/client/src
parentbd6c228a52e5085ac4e01e56ddcc0eb9d5fd5218 (diff)
downloadqpid-python-8ecc4ecc20d23685c8e2a835b855bfb60fdf5bf7.tar.gz
QPID-1218 Optionally use IoTransport, it's hot, but doesn't pass all the tests yet.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@684016 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/client/src')
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java12
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQIoTransportProtocolSession.java125
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java15
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java24
4 files changed, 171 insertions, 5 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
index 1e65c50304..2ec8737d16 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
@@ -48,6 +48,7 @@ import org.apache.qpid.framing.TxSelectBody;
import org.apache.qpid.framing.TxSelectOkBody;
import org.apache.qpid.jms.BrokerDetails;
import org.apache.qpid.jms.ChannelLimitReachedException;
+import org.apache.qpid.transport.network.io.IoTransport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -88,7 +89,16 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate
StateWaiter waiter = _conn._protocolHandler.createWaiter(openOrClosedStates);
- TransportConnection.getInstance(brokerDetail).connect(_conn._protocolHandler, brokerDetail);
+ // TODO: use system property thingy for this
+ if (System.getProperty("UseTransportIo", "false").equals("false"))
+ {
+ TransportConnection.getInstance(brokerDetail).connect(_conn._protocolHandler, brokerDetail);
+ }
+ else
+ {
+ _conn.getProtocolHandler().createIoTransportSession(brokerDetail);
+ }
+
// this blocks until the connection has been set up or when an error
// has prevented the connection being set up
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQIoTransportProtocolSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQIoTransportProtocolSession.java
new file mode 100644
index 0000000000..43ec7789c2
--- /dev/null
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQIoTransportProtocolSession.java
@@ -0,0 +1,125 @@
+package org.apache.qpid.client.protocol;
+
+import java.util.UUID;
+
+import javax.security.sasl.SaslClient;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.mina.common.IdleStatus;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.ConnectionTuneParameters;
+import org.apache.qpid.client.handler.ClientMethodDispatcherImpl;
+import org.apache.qpid.client.state.AMQState;
+import org.apache.qpid.framing.AMQDataBlock;
+import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.ProtocolInitiation;
+import org.apache.qpid.framing.ProtocolVersion;
+import org.apache.qpid.transport.network.io.IoSender;
+
+public class AMQIoTransportProtocolSession extends AMQProtocolSession
+{
+
+ protected IoSender _ioSender;
+ private SaslClient _saslClient;
+ private ConnectionTuneParameters _connectionTuneParameters;
+
+ public AMQIoTransportProtocolSession(AMQProtocolHandler protocolHandler, AMQConnection connection)
+ {
+ super(protocolHandler, connection);
+ }
+
+ @Override
+ public void closeProtocolSession(boolean waitLast) throws AMQException
+ {
+ _ioSender.close();
+ _protocolHandler.getStateManager().changeState(AMQState.CONNECTION_CLOSED);
+ }
+
+ @Override
+ public void init()
+ {
+ _ioSender.send(new ProtocolInitiation(_connection.getProtocolVersion()).toNioByteBuffer());
+ _ioSender.flush();
+ }
+
+ @Override
+ protected AMQShortString generateQueueName()
+ {
+ int id;
+ synchronized (_queueIdLock)
+ {
+ id = _queueId++;
+ }
+ return new AMQShortString("tmp_" + UUID.randomUUID() + "_" + id);
+ }
+
+ @Override
+ public AMQConnection getAMQConnection()
+ {
+ return _connection;
+ }
+
+ @Override
+ public SaslClient getSaslClient()
+ {
+ return _saslClient;
+ }
+
+ @Override
+ public void setSaslClient(SaslClient client)
+ {
+ _saslClient = client;
+ }
+
+ /** @param delay delay in seconds (not ms) */
+ @Override
+ void initHeartbeats(int delay)
+ {
+ if (delay > 0)
+ {
+ // FIXME: actually do something here
+ HeartbeatDiagnostics.init(delay, HeartbeatConfig.CONFIG.getTimeout(delay));
+ }
+ }
+
+ @Override
+ public void methodFrameReceived(final int channel, final AMQMethodBody amqMethodBody) throws AMQException
+ {
+ // FIXME?
+ _protocolHandler.methodBodyReceived(channel, amqMethodBody, null);
+ }
+
+ @Override
+ public void writeFrame(AMQDataBlock frame, boolean wait)
+ {
+ _ioSender.send(frame.toNioByteBuffer());
+ if (wait)
+ {
+ _ioSender.flush();
+ }
+ }
+
+ @Override
+ public void setSender(IoSender sender)
+ {
+ _ioSender = sender;
+ }
+
+ @Override
+ public ConnectionTuneParameters getConnectionTuneParameters()
+ {
+ return _connectionTuneParameters;
+ }
+
+ @Override
+ public void setConnectionTuneParameters(ConnectionTuneParameters params)
+ {
+ _connectionTuneParameters = params;
+ AMQConnection con = getAMQConnection();
+ con.setMaximumChannelCount(params.getChannelMax());
+ con.setMaximumFrameSize(params.getFrameMax());
+ initHeartbeats((int) params.getHeartbeat());
+ }
+}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
index 8328d87b87..e92817f713 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
@@ -47,11 +47,13 @@ import org.apache.qpid.client.state.StateWaiter;
import org.apache.qpid.client.state.listener.SpecificMethodFrameListener;
import org.apache.qpid.codec.AMQCodecFactory;
import org.apache.qpid.framing.*;
+import org.apache.qpid.jms.BrokerDetails;
import org.apache.qpid.pool.ReadWriteThreadModel;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.protocol.AMQMethodListener;
import org.apache.qpid.ssl.SSLContextFactory;
+import org.apache.qpid.transport.network.io.IoTransport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -253,6 +255,19 @@ public class AMQProtocolHandler extends IoHandlerAdapter
}
/**
+ * Called when we want to create a new IoTransport session
+ * @param brokerDetail
+ */
+ public void createIoTransportSession(BrokerDetails brokerDetail)
+ {
+ _protocolSession = new AMQProtocolSession(this, _connection);
+ _stateManager.setProtocolSession(_protocolSession);
+ IoTransport.connect_0_9(getProtocolSession(),
+ brokerDetail.getHost(), brokerDetail.getPort());
+ _protocolSession.init();
+ }
+
+ /**
* Called when the network connection is closed. This can happen, either because the client explicitly requested
* that the connection be closed, in which case nothing is done, or because the connection died. In the case
* where the connection died, an attempt to failover automatically to a new connection may be started. The failover
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
index 6beec3c9ba..6c3ae06ce9 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
@@ -44,6 +44,7 @@ import org.apache.qpid.client.state.AMQState;
import org.apache.qpid.framing.*;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
+import org.apache.qpid.transport.network.io.IoSender;
import org.apache.qpid.client.handler.ClientMethodDispatcherImpl;
/**
@@ -99,7 +100,8 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
private MethodDispatcher _methodDispatcher;
- private final AMQConnection _connection;
+ protected final AMQConnection _connection;
+
private static final int FAST_CHANNEL_ACCESS_MASK = 0xFFFFFFF0;
public AMQProtocolSession(AMQProtocolHandler protocolHandler, IoSession protocolSession, AMQConnection connection)
@@ -118,11 +120,20 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
}
+ public AMQProtocolSession(AMQProtocolHandler protocolHandler, AMQConnection connection)
+ {
+ _protocolHandler = protocolHandler;
+ _minaProtocolSession = null;
+ _protocolVersion = connection.getProtocolVersion();
+ _methodDispatcher = ClientMethodDispatcherImpl.newMethodDispatcher(ProtocolVersion.getLatestSupportedVersion(),
+ this);
+ _connection = connection;
+ }
+
public void init()
{
// start the process of setting up the connection. This is the first place that
// data is written to the server.
-
_minaProtocolSession.write(new ProtocolInitiation(_connection.getProtocolVersion()));
}
@@ -171,7 +182,7 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
public SaslClient getSaslClient()
{
- return (SaslClient) _minaProtocolSession.getAttribute(SASL_CLIENT);
+ return (SaslClient) _minaProtocolSession.getAttribute(SASL_CLIENT);
}
/**
@@ -422,6 +433,7 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
}
_logger.debug("Closing protocol session");
+
final CloseFuture future = _minaProtocolSession.close();
// There is no recovery we can do if the join on the close failes so simply mark the connection CLOSED
@@ -430,7 +442,6 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
// error now shouldn't matter.
_protocolHandler.getStateManager().changeState(AMQState.CONNECTION_CLOSED);
-
future.join(LAST_WRITE_FUTURE_JOIN_TIMEOUT);
}
@@ -535,4 +546,9 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
{
_protocolHandler.propagateExceptionToAllWaiters(error);
}
+
+ public void setSender(IoSender sender)
+ {
+ // No-op, interface munging
+ }
}