summaryrefslogtreecommitdiff
path: root/qpid/java/common
diff options
context:
space:
mode:
authorKeith Wall <kwall@apache.org>2015-03-12 15:41:46 +0000
committerKeith Wall <kwall@apache.org>2015-03-12 15:41:46 +0000
commit6e98063ae07795f988ad26fdcf49d204d88b39c3 (patch)
tree7fa009399d501d9ad3e9f77f735d85a2b75807cf /qpid/java/common
parentb66b4f357a756449c7e7184be4d963fb36f5b2d4 (diff)
parent49c02f9fcf8c2dd1b063c887f8948f840ec785c2 (diff)
downloadqpid-python-6e98063ae07795f988ad26fdcf49d204d88b39c3.tar.gz
QPID-6429, QPID-6262, QPID-5818: [Java Broker] Utilise NIO, service connections using a thread pool, AMQP model mutating actions should use task executors
Work of Rob Godfrey and myself. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1666224 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/common')
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java9
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlock.java4
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java23
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java13
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java24
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/framing/CompositeAMQDataBlock.java13
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java32
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java16
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java7
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java13
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java6
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java16
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/Binding.java6
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/ByteBufferReceiver.java (renamed from qpid/java/common/src/main/java/org/apache/qpid/protocol/ServerProtocolEngine.java)15
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/ByteBufferSender.java (renamed from qpid/java/common/src/main/java/org/apache/qpid/transport/Sender.java)13
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java2
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java38
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkEventReceiver.java32
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/ProtocolEventReceiver.java (renamed from qpid/java/common/src/main/java/org/apache/qpid/transport/Receiver.java)12
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/ProtocolEventSender.java30
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java7
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/codec/AbstractEncoder.java14
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/codec/BBEncoder.java6
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/codec/Encoder.java20
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java21
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/ConnectionBinding.java16
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java19
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/IncomingNetworkTransport.java5
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/InputHandler.java11
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/NetworkConnection.java6
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/OutgoingNetworkTransport.java8
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/TransportEncryption.java26
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/AbstractNetworkTransport.java342
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IdleTimeoutTicker.java2
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java9
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java311
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java8
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java18
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayer.java10
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayerFactory.java18
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLReceiver.java15
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLSender.java23
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLBufferingSender.java274
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLReceiver.java8
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLSender.java23
-rw-r--r--qpid/java/common/src/test/java/org/apache/qpid/transport/network/TransportTest.java10
-rw-r--r--qpid/java/common/src/test/java/org/apache/qpid/transport/network/io/IdleTimeoutTickerTest.java10
-rw-r--r--qpid/java/common/src/test/java/org/apache/qpid/transport/network/io/IoAcceptor.java15
-rw-r--r--qpid/java/common/src/test/java/org/apache/qpid/transport/network/io/IoTransport.java11
49 files changed, 775 insertions, 815 deletions
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java
index cb0c78ef37..6860b46546 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java
@@ -20,12 +20,13 @@
*/
package org.apache.qpid.framing;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
-
import java.io.DataOutput;
import java.io.IOException;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
+import org.apache.qpid.transport.ByteBufferSender;
+
public interface AMQBody
{
public byte getFrameType();
@@ -39,4 +40,6 @@ public interface AMQBody
public void writePayload(DataOutput buffer) throws IOException;
void handle(final int channelId, final AMQVersionAwareProtocolSession amqProtocolSession) throws AMQException;
+
+ long writePayload(ByteBufferSender sender) throws IOException;
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlock.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlock.java
index c234a5e829..8f804bf2d6 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlock.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlock.java
@@ -23,6 +23,8 @@ package org.apache.qpid.framing;
import java.io.DataOutput;
import java.io.IOException;
+import org.apache.qpid.transport.ByteBufferSender;
+
/**
* A data block represents something that has a size in bytes and the ability to write itself to a byte
@@ -44,4 +46,6 @@ public abstract class AMQDataBlock implements EncodableAMQDataBlock
*/
public abstract void writePayload(DataOutput buffer) throws IOException;
+ public abstract long writePayload(ByteBufferSender sender) throws IOException;
+
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java
index 83397c37d8..5fcdfb901a 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java
@@ -22,6 +22,10 @@ package org.apache.qpid.framing;
import java.io.DataOutput;
import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.qpid.transport.ByteBufferSender;
+import org.apache.qpid.util.BytesDataOutput;
public class AMQFrame extends AMQDataBlock implements EncodableAMQDataBlock
{
@@ -57,6 +61,25 @@ public class AMQFrame extends AMQDataBlock implements EncodableAMQDataBlock
buffer.writeByte(FRAME_END_BYTE);
}
+ private static final byte[] FRAME_END_BYTE_ARRAY = new byte[] { FRAME_END_BYTE };
+
+ @Override
+ public long writePayload(final ByteBufferSender sender) throws IOException
+ {
+ byte[] frameHeader = new byte[7];
+ BytesDataOutput buffer = new BytesDataOutput(frameHeader);
+
+ buffer.writeByte(_bodyFrame.getFrameType());
+ EncodingUtils.writeUnsignedShort(buffer, _channel);
+ EncodingUtils.writeUnsignedInteger(buffer, _bodyFrame.getSize());
+ sender.send(ByteBuffer.wrap(frameHeader));
+
+ long size = 8 + _bodyFrame.writePayload(sender);
+
+ sender.send(ByteBuffer.wrap(FRAME_END_BYTE_ARRAY));
+ return size;
+ }
+
public final int getChannel()
{
return _channel;
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java
index e40452edea..01deed67ed 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java
@@ -24,12 +24,15 @@ package org.apache.qpid.framing;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
+import java.nio.ByteBuffer;
import org.apache.qpid.AMQChannelException;
import org.apache.qpid.AMQConnectionException;
import org.apache.qpid.AMQException;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
+import org.apache.qpid.transport.ByteBufferSender;
+import org.apache.qpid.util.BytesDataOutput;
public abstract class AMQMethodBodyImpl implements AMQMethodBody
{
@@ -105,6 +108,16 @@ public abstract class AMQMethodBodyImpl implements AMQMethodBody
writeMethodPayload(buffer);
}
+ @Override
+ public long writePayload(final ByteBufferSender sender) throws IOException
+ {
+ final int size = getSize();
+ byte[] bytes = new byte[size];
+ BytesDataOutput buffer = new BytesDataOutput(bytes);
+ writePayload(buffer);
+ sender.send(ByteBuffer.wrap(bytes));
+ return size;
+ }
protected int getSizeOf(AMQShortString string)
{
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java
index ef0da9b918..6481c6ebdb 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java
@@ -23,10 +23,14 @@ package org.apache.qpid.framing;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
+import java.nio.ByteBuffer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.qpid.transport.ByteBufferSender;
+import org.apache.qpid.util.BytesDataOutput;
+
public class BasicContentHeaderProperties
{
//persistent & non-persistent constants, values as per JMS DeliveryMode
@@ -314,6 +318,26 @@ public class BasicContentHeaderProperties
}
}
+
+ public long writePropertyListPayload(final ByteBufferSender sender) throws IOException
+ {
+ if(useEncodedForm())
+ {
+ sender.send(ByteBuffer.wrap(_encodedForm));
+ return _encodedForm.length;
+ }
+ else
+ {
+ int propertyListSize = getPropertyListSize();
+ byte[] data = new byte[propertyListSize];
+ BytesDataOutput out = new BytesDataOutput(data);
+ writePropertyListPayload(out);
+ sender.send(ByteBuffer.wrap(data));
+ return propertyListSize;
+ }
+
+ }
+
public void populatePropertiesFromBuffer(DataInput buffer, int propertyFlags, int size) throws AMQFrameDecodingException, IOException
{
_propertyFlags = propertyFlags;
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/CompositeAMQDataBlock.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/CompositeAMQDataBlock.java
index 098e3652ad..819446021e 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/framing/CompositeAMQDataBlock.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/CompositeAMQDataBlock.java
@@ -23,6 +23,8 @@ package org.apache.qpid.framing;
import java.io.DataOutput;
import java.io.IOException;
+import org.apache.qpid.transport.ByteBufferSender;
+
public class CompositeAMQDataBlock extends AMQDataBlock implements EncodableAMQDataBlock
{
@@ -58,6 +60,17 @@ public class CompositeAMQDataBlock extends AMQDataBlock implements EncodableAMQD
}
}
+ @Override
+ public long writePayload(final ByteBufferSender sender) throws IOException
+ {
+ long size = 0l;
+ for (int i = 0; i < _blocks.length; i++)
+ {
+ size += _blocks[i].writePayload(sender);
+ }
+ return size;
+ }
+
public String toString()
{
if (_blocks == null)
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java
index 5c322f3845..0f4ba5209b 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java
@@ -28,6 +28,7 @@ import java.nio.ByteBuffer;
import org.apache.qpid.AMQException;
import org.apache.qpid.codec.MarkableDataInput;
import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
+import org.apache.qpid.transport.ByteBufferSender;
public class ContentBody implements AMQBody
{
@@ -72,6 +73,20 @@ public class ContentBody implements AMQBody
session.contentBodyReceived(channelId, this);
}
+ @Override
+ public long writePayload(final ByteBufferSender sender) throws IOException
+ {
+ if(_payload != null)
+ {
+ sender.send(ByteBuffer.wrap(_payload));
+ return _payload.length;
+ }
+ else
+ {
+ return 0l;
+ }
+ }
+
public byte[] getPayload()
{
return _payload;
@@ -133,6 +148,23 @@ public class ContentBody implements AMQBody
}
}
+ @Override
+ public long writePayload(final ByteBufferSender sender) throws IOException
+ {
+ if(_buf.hasArray())
+ {
+ sender.send(ByteBuffer.wrap(_buf.array(), _buf.arrayOffset() + _offset, _length));
+ }
+ else
+ {
+ ByteBuffer buf = _buf.duplicate();
+
+ buf.position(_offset);
+ buf.limit(_offset+_length);
+ sender.send(buf);
+ }
+ return _length;
+ }
public void handle(int channelId, AMQVersionAwareProtocolSession amqProtocolSession) throws AMQException
{
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java
index 377d2e115c..21b8e6c8b6 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java
@@ -24,10 +24,13 @@ import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.IOException;
+import java.nio.ByteBuffer;
import org.apache.qpid.AMQException;
import org.apache.qpid.codec.MarkableDataInput;
import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
+import org.apache.qpid.transport.ByteBufferSender;
+import org.apache.qpid.util.BytesDataOutput;
public class ContentHeaderBody implements AMQBody
{
@@ -98,6 +101,19 @@ public class ContentHeaderBody implements AMQBody
_properties.writePropertyListPayload(buffer);
}
+ @Override
+ public long writePayload(final ByteBufferSender sender) throws IOException
+ {
+ byte[] data = new byte[14];
+ BytesDataOutput buffer = new BytesDataOutput(data);
+ EncodingUtils.writeUnsignedShort(buffer, CLASS_ID);
+ EncodingUtils.writeUnsignedShort(buffer, 0);
+ buffer.writeLong(_bodySize);
+ EncodingUtils.writeUnsignedShort(buffer, _properties.getPropertyFlags());
+ sender.send(ByteBuffer.wrap(data));
+ return 14 + _properties.writePropertyListPayload(sender);
+ }
+
public void handle(final int channelId, final AMQVersionAwareProtocolSession session)
throws AMQException
{
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java
index b5f854eb0e..3afc082c89 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java
@@ -27,6 +27,7 @@ import java.io.IOException;
import org.apache.qpid.AMQException;
import org.apache.qpid.codec.MarkableDataInput;
import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
+import org.apache.qpid.transport.ByteBufferSender;
public class HeartbeatBody implements AMQBody
{
@@ -61,6 +62,12 @@ public class HeartbeatBody implements AMQBody
{
}
+ @Override
+ public long writePayload(final ByteBufferSender sender) throws IOException
+ {
+ return 0l;
+ }
+
public void handle(final int channelId, final AMQVersionAwareProtocolSession session)
throws AMQException
{
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java
index ed1935ca04..9c8d2a8578 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java
@@ -23,11 +23,14 @@ package org.apache.qpid.framing;
import java.io.DataOutput;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
+import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import org.apache.qpid.AMQException;
import org.apache.qpid.codec.MarkableDataInput;
+import org.apache.qpid.transport.ByteBufferSender;
+import org.apache.qpid.util.BytesDataOutput;
public class ProtocolInitiation extends AMQDataBlock implements EncodableAMQDataBlock
{
@@ -88,6 +91,16 @@ public class ProtocolInitiation extends AMQDataBlock implements EncodableAMQData
buffer.write(_protocolMinor);
}
+ @Override
+ public long writePayload(final ByteBufferSender sender) throws IOException
+ {
+ byte[] data = new byte[8];
+ BytesDataOutput out = new BytesDataOutput(data);
+ writePayload(out);
+ sender.send(ByteBuffer.wrap(data));
+ return 8l;
+ }
+
public boolean equals(Object o)
{
if (!(o instanceof ProtocolInitiation))
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java b/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java
index 0c643f6322..73c8653677 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java
@@ -26,9 +26,7 @@ import org.apache.qpid.framing.ContentBody;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.HeartbeatBody;
import org.apache.qpid.framing.MethodRegistry;
-import org.apache.qpid.transport.Sender;
-
-import java.nio.ByteBuffer;
+import org.apache.qpid.transport.ByteBufferSender;
/**
@@ -56,6 +54,6 @@ public interface AMQVersionAwareProtocolSession extends AMQProtocolWriter, Proto
public void heartbeatBodyReceived(int channelId, HeartbeatBody body) throws AMQException;
- public void setSender(Sender<ByteBuffer> sender);
+ public void setSender(ByteBufferSender sender);
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java b/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java
index 6774d0a45a..f73f6d931a 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java
@@ -20,19 +20,18 @@
*/
package org.apache.qpid.protocol;
-import org.apache.qpid.transport.Receiver;
-import org.apache.qpid.transport.Sender;
+import java.net.SocketAddress;
+
+import org.apache.qpid.transport.ByteBufferReceiver;
+import org.apache.qpid.transport.ByteBufferSender;
import org.apache.qpid.transport.network.NetworkConnection;
import org.apache.qpid.transport.network.TransportActivity;
-import java.net.SocketAddress;
-import java.nio.ByteBuffer;
-
/**
* A ProtocolEngine is a Receiver for java.nio.ByteBuffers. It takes the data passed to it in the received
* decodes it and then process the result.
*/
-public interface ProtocolEngine extends Receiver<java.nio.ByteBuffer>, TransportActivity
+public interface ProtocolEngine extends ByteBufferReceiver, TransportActivity
{
// Returns the remote address of the NetworkDriver
SocketAddress getRemoteAddress();
@@ -56,7 +55,8 @@ public interface ProtocolEngine extends Receiver<java.nio.ByteBuffer>, Transport
// Called when the NetworkEngine has not read data for the specified period of time (will close the connection)
void readerIdle();
+ void encryptedTransport();
- public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender);
+ public void setNetworkConnection(NetworkConnection network, ByteBufferSender sender);
-} \ No newline at end of file
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Binding.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Binding.java
index 8418c42189..f703c01567 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Binding.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Binding.java
@@ -26,11 +26,11 @@ package org.apache.qpid.transport;
*
*/
-public interface Binding<E,T>
+public interface Binding<E>
{
- E endpoint(Sender<T> sender);
+ E endpoint(ByteBufferSender sender);
- Receiver<T> receiver(E endpoint);
+ ByteBufferReceiver receiver(E endpoint);
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/protocol/ServerProtocolEngine.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/ByteBufferReceiver.java
index 5c6918e87d..1015f061c8 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/protocol/ServerProtocolEngine.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/ByteBufferReceiver.java
@@ -18,16 +18,15 @@
* under the License.
*
*/
-package org.apache.qpid.protocol;
+package org.apache.qpid.transport;
-import javax.security.auth.Subject;
+import java.nio.ByteBuffer;
-public interface ServerProtocolEngine extends ProtocolEngine
+public interface ByteBufferReceiver
{
- /**
- * Gets the connection ID associated with this ProtocolEngine
- */
- long getConnectionId();
+ void received(ByteBuffer msg);
- Subject getSubject();
+ void exception(Throwable t);
+
+ void closed();
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Sender.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/ByteBufferSender.java
index 6519702c76..7dcaf61a26 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Sender.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/ByteBufferSender.java
@@ -20,20 +20,13 @@
*/
package org.apache.qpid.transport;
+import java.nio.ByteBuffer;
-/**
- * Sender
- *
- */
-
-public interface Sender<T>
+public interface ByteBufferSender
{
- void setIdleTimeout(int i);
-
- void send(T msg);
+ void send(ByteBuffer msg);
void flush();
void close();
-
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java
index f8eabef161..7c4e264ade 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java
@@ -153,11 +153,9 @@ public class ClientDelegate extends ConnectionDelegate
maxFrameSize,
actualHeartbeatInterval);
- int idleTimeout = (int)(actualHeartbeatInterval * 1000 * heartbeatTimeoutFactor);
conn.getNetworkConnection().setMaxReadIdle((int)(actualHeartbeatInterval*heartbeatTimeoutFactor));
conn.getNetworkConnection().setMaxWriteIdle(actualHeartbeatInterval);
conn.setMaxFrameSize(maxFrameSize == 0 ? 0xffff : maxFrameSize);
- conn.setIdleTimeout(idleTimeout);
int channelMax = tune.getChannelMax();
//0 means no implied limit, except available server resources
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java
index 331f96d6da..4ae7e8d47a 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java
@@ -27,7 +27,6 @@ import static org.apache.qpid.transport.Connection.State.OPEN;
import static org.apache.qpid.transport.Connection.State.OPENING;
import java.net.SocketAddress;
-import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -68,7 +67,7 @@ import org.apache.qpid.util.Strings;
*/
public class Connection extends ConnectionInvoker
- implements Receiver<ProtocolEvent>, Sender<ProtocolEvent>
+ implements ProtocolEventReceiver, ProtocolEventSender
{
protected static final Logger log = Logger.get(Connection.class);
@@ -120,7 +119,7 @@ public class Connection extends ConnectionInvoker
private SessionFactory _sessionFactory = DEFAULT_SESSION_FACTORY;
private ConnectionDelegate delegate;
- private Sender<ProtocolEvent> sender;
+ private ProtocolEventSender sender;
final private Map<Binary,Session> sessions = new HashMap<Binary,Session>();
final private Map<Integer,Session> channels = new ConcurrentHashMap<Integer,Session>();
@@ -163,15 +162,14 @@ public class Connection extends ConnectionInvoker
return Collections.unmodifiableList(listeners);
}
- public Sender<ProtocolEvent> getSender()
+ public ProtocolEventSender getSender()
{
return sender;
}
- public void setSender(Sender<ProtocolEvent> sender)
+ public void setSender(ProtocolEventSender sender)
{
this.sender = sender;
- sender.setIdleTimeout(idleTimeout);
}
protected void setState(State state)
@@ -248,7 +246,7 @@ public class Connection extends ConnectionInvoker
OutgoingNetworkTransport transport = Transport.getOutgoingTransportInstance(ProtocolVersion.v0_10);
final InputHandler inputHandler = new InputHandler(new Assembler(this));
addFrameSizeObserver(inputHandler);
- Receiver<ByteBuffer> secureReceiver = securityLayer.receiver(inputHandler);
+ ByteBufferReceiver secureReceiver = securityLayer.receiver(inputHandler);
if(secureReceiver instanceof ConnectionListener)
{
addConnectionListener((ConnectionListener)secureReceiver);
@@ -260,7 +258,7 @@ public class Connection extends ConnectionInvoker
setRemoteAddress(_networkConnection.getRemoteAddress());
setLocalAddress(_networkConnection.getLocalAddress());
- final Sender<ByteBuffer> secureSender = securityLayer.sender(_networkConnection.getSender());
+ final ByteBufferSender secureSender = securityLayer.sender(_networkConnection.getSender());
if(secureSender instanceof ConnectionListener)
{
addConnectionListener((ConnectionListener)secureSender);
@@ -425,7 +423,7 @@ public class Connection extends ConnectionInvoker
{
log.debug("SEND: [%s] %s", this, event);
}
- Sender<ProtocolEvent> s = sender;
+ ProtocolEventSender s = sender;
if (s == null)
{
throw new ConnectionException("connection closed");
@@ -439,7 +437,7 @@ public class Connection extends ConnectionInvoker
{
log.debug("FLUSH: [%s]", this);
}
- final Sender<ProtocolEvent> theSender = sender;
+ final ProtocolEventSender theSender = sender;
if(theSender != null)
{
theSender.flush();
@@ -631,6 +629,12 @@ public class Connection extends ConnectionInvoker
close(ConnectionCloseCode.CONNECTION_FORCED, "The connection was closed using the broker's management interface.");
}
+
+ protected void sendConnectionClose(ConnectionCloseCode replyCode, String replyText, Option ... _options)
+ {
+ connectionClose(replyCode, replyText, _options);
+ }
+
public void close(ConnectionCloseCode replyCode, String replyText, Option ... _options)
{
synchronized (lock)
@@ -690,20 +694,6 @@ public class Connection extends ConnectionInvoker
}
}
- public void setIdleTimeout(int i)
- {
- idleTimeout = i;
- if (sender != null)
- {
- sender.setIdleTimeout(i);
- }
- }
-
- public int getIdleTimeout()
- {
- return idleTimeout;
- }
-
public String getUserID()
{
return userID;
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkEventReceiver.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkEventReceiver.java
new file mode 100644
index 0000000000..8b9c3f4f83
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkEventReceiver.java
@@ -0,0 +1,32 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.transport;
+
+import org.apache.qpid.transport.network.NetworkEvent;
+
+public interface NetworkEventReceiver
+{
+ void received(NetworkEvent msg);
+
+ void exception(Throwable t);
+
+ void closed();
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Receiver.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/ProtocolEventReceiver.java
index 2a994580dc..e4ab540ce9 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Receiver.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/ProtocolEventReceiver.java
@@ -20,19 +20,11 @@
*/
package org.apache.qpid.transport;
-
-/**
- * Receiver
- *
- */
-
-public interface Receiver<T>
+public interface ProtocolEventReceiver
{
-
- void received(T msg);
+ void received(ProtocolEvent msg);
void exception(Throwable t);
void closed();
-
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/ProtocolEventSender.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/ProtocolEventSender.java
new file mode 100644
index 0000000000..418f31b42a
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/ProtocolEventSender.java
@@ -0,0 +1,30 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.transport;
+
+public interface ProtocolEventSender
+{
+ void send(ProtocolEvent msg);
+
+ void flush();
+
+ void close();
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java
index 82a677b8f7..f8fd286f17 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java
@@ -126,8 +126,11 @@ public class ServerDelegate extends ConnectionDelegate
protected void connectionAuthFailed(final Connection conn, Exception e)
{
- conn.exception(e);
- conn.connectionClose(ConnectionCloseCode.CONNECTION_FORCED, e.getMessage());
+ if (e != null)
+ {
+ conn.exception(e);
+ }
+ conn.connectionClose(ConnectionCloseCode.CONNECTION_FORCED, e == null ? "Authentication failed" : e.getMessage());
}
protected void connectionAuthContinue(final Connection conn, byte[] challenge)
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/AbstractEncoder.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/AbstractEncoder.java
index 2b93697bfc..070621db9b 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/AbstractEncoder.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/AbstractEncoder.java
@@ -20,12 +20,6 @@
*/
package org.apache.qpid.transport.codec;
-import org.apache.qpid.transport.Range;
-import org.apache.qpid.transport.RangeSet;
-import org.apache.qpid.transport.Struct;
-import org.apache.qpid.transport.Type;
-
-import org.apache.qpid.transport.Xid;
import static org.apache.qpid.transport.util.Functions.lsb;
import java.io.UnsupportedEncodingException;
@@ -36,6 +30,12 @@ import java.util.List;
import java.util.Map;
import java.util.UUID;
+import org.apache.qpid.transport.Range;
+import org.apache.qpid.transport.RangeSet;
+import org.apache.qpid.transport.Struct;
+import org.apache.qpid.transport.Type;
+import org.apache.qpid.transport.Xid;
+
/**
* AbstractEncoder
@@ -43,7 +43,7 @@ import java.util.UUID;
* @author Rafael H. Schloming
*/
-abstract class AbstractEncoder implements Encoder
+public abstract class AbstractEncoder implements Encoder
{
private static Map<Class<?>,Type> ENCODINGS = new HashMap<Class<?>,Type>();
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/BBEncoder.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/BBEncoder.java
index d9150bed65..407df71824 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/BBEncoder.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/BBEncoder.java
@@ -360,8 +360,4 @@ public final class BBEncoder extends AbstractEncoder
}
}
- public void writeMagicNumber()
- {
- out.put("AM2".getBytes());
- }
-} \ No newline at end of file
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/Encoder.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/Encoder.java
index a9eea13104..b5ab29cdcf 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/Encoder.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/Encoder.java
@@ -20,13 +20,14 @@
*/
package org.apache.qpid.transport.codec;
-import org.apache.qpid.transport.RangeSet;
-import org.apache.qpid.transport.Struct;
-
+import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import java.util.UUID;
+import org.apache.qpid.transport.RangeSet;
+import org.apache.qpid.transport.Struct;
+
/**
* Encoder interface.
@@ -274,9 +275,10 @@ public interface Encoder
* @param bytes the bytes array to be encoded.
*/
void writeBin128(byte [] bytes);
-
- /**
- * Encodes the AMQP magic number.
- */
- void writeMagicNumber();
-} \ No newline at end of file
+
+ int position();
+
+ ByteBuffer underlyingBuffer();
+
+ void init();
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java
index a80b988cea..a7e96167c2 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java
@@ -20,28 +20,29 @@
*/
package org.apache.qpid.transport.network;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
import org.apache.qpid.transport.DeliveryProperties;
import org.apache.qpid.transport.Header;
import org.apache.qpid.transport.MessageProperties;
import org.apache.qpid.transport.Method;
+import org.apache.qpid.transport.NetworkEventReceiver;
import org.apache.qpid.transport.ProtocolError;
import org.apache.qpid.transport.ProtocolEvent;
+import org.apache.qpid.transport.ProtocolEventReceiver;
import org.apache.qpid.transport.ProtocolHeader;
-import org.apache.qpid.transport.Receiver;
import org.apache.qpid.transport.Struct;
import org.apache.qpid.transport.codec.BBDecoder;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
/**
* Assembler
*
*/
-public class Assembler implements Receiver<NetworkEvent>, NetworkDelegate
+public class Assembler implements NetworkEventReceiver, NetworkDelegate
{
// Use a small array to store incomplete Methods for low-value channels, instead of allocating a huge
// array or always boxing the channelId and looking it up in the map. This value must be of the form 2^X - 1.
@@ -49,7 +50,7 @@ public class Assembler implements Receiver<NetworkEvent>, NetworkDelegate
private final Method[] _incompleteMethodArray = new Method[ARRAY_SIZE + 1];
private final Map<Integer, Method> _incompleteMethodMap = new HashMap<Integer, Method>();
- private final Receiver<ProtocolEvent> receiver;
+ private final ProtocolEventReceiver receiver;
private final Map<Integer,List<Frame>> segments;
private static final ThreadLocal<BBDecoder> _decoder = new ThreadLocal<BBDecoder>()
{
@@ -59,7 +60,7 @@ public class Assembler implements Receiver<NetworkEvent>, NetworkDelegate
}
};
- public Assembler(Receiver<ProtocolEvent> receiver)
+ public Assembler(ProtocolEventReceiver receiver)
{
this.receiver = receiver;
segments = new HashMap<Integer,List<Frame>>();
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/ConnectionBinding.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/ConnectionBinding.java
index 26e8f1850b..5463cd2587 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/ConnectionBinding.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/ConnectionBinding.java
@@ -20,15 +20,13 @@
*/
package org.apache.qpid.transport.network;
-import java.nio.ByteBuffer;
-
import org.apache.qpid.transport.Binding;
+import org.apache.qpid.transport.ByteBufferReceiver;
+import org.apache.qpid.transport.ByteBufferSender;
import org.apache.qpid.transport.Connection;
import org.apache.qpid.transport.ConnectionDelegate;
import org.apache.qpid.transport.ConnectionListener;
import org.apache.qpid.transport.Constant;
-import org.apache.qpid.transport.Receiver;
-import org.apache.qpid.transport.Sender;
import org.apache.qpid.transport.network.security.sasl.SASLReceiver;
import org.apache.qpid.transport.network.security.sasl.SASLSender;
@@ -38,10 +36,10 @@ import org.apache.qpid.transport.network.security.sasl.SASLSender;
*/
public abstract class ConnectionBinding
- implements Binding<Connection,ByteBuffer>
+ implements Binding<Connection>
{
- public static Binding<Connection,ByteBuffer> get(final Connection connection)
+ public static Binding<Connection> get(final Connection connection)
{
return new ConnectionBinding()
{
@@ -52,7 +50,7 @@ public abstract class ConnectionBinding
};
}
- public static Binding<Connection,ByteBuffer> get(final ConnectionDelegate delegate)
+ public static Binding<Connection> get(final ConnectionDelegate delegate)
{
return new ConnectionBinding()
{
@@ -69,7 +67,7 @@ public abstract class ConnectionBinding
public abstract Connection connection();
- public Connection endpoint(Sender<ByteBuffer> sender)
+ public Connection endpoint(ByteBufferSender sender)
{
Connection conn = connection();
@@ -87,7 +85,7 @@ public abstract class ConnectionBinding
return conn;
}
- public Receiver<ByteBuffer> receiver(Connection conn)
+ public ByteBufferReceiver receiver(Connection conn)
{
final InputHandler inputHandler = new InputHandler(new Assembler(conn));
conn.addFrameSizeObserver(inputHandler);
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java
index a804cb2f9d..c45b2049a1 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java
@@ -30,27 +30,29 @@ import static org.apache.qpid.transport.network.Frame.LAST_SEG;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
+import org.apache.qpid.transport.ByteBufferSender;
import org.apache.qpid.transport.FrameSizeObserver;
import org.apache.qpid.transport.Header;
import org.apache.qpid.transport.Method;
import org.apache.qpid.transport.ProtocolDelegate;
import org.apache.qpid.transport.ProtocolError;
import org.apache.qpid.transport.ProtocolEvent;
+import org.apache.qpid.transport.ProtocolEventSender;
import org.apache.qpid.transport.ProtocolHeader;
import org.apache.qpid.transport.SegmentType;
-import org.apache.qpid.transport.Sender;
import org.apache.qpid.transport.Struct;
import org.apache.qpid.transport.codec.BBEncoder;
+import org.apache.qpid.transport.codec.Encoder;
/**
* Disassembler
*/
-public final class Disassembler implements Sender<ProtocolEvent>, ProtocolDelegate<Void>, FrameSizeObserver
+public final class Disassembler implements ProtocolEventSender, ProtocolDelegate<Void>, FrameSizeObserver
{
- private final Sender<ByteBuffer> sender;
+ private final ByteBufferSender sender;
private int maxPayload;
private final Object sendlock = new Object();
- private final static ThreadLocal<BBEncoder> _encoder = new ThreadLocal<BBEncoder>()
+ private final static ThreadLocal<Encoder> _encoder = new ThreadLocal<Encoder>()
{
public BBEncoder initialValue()
{
@@ -58,7 +60,7 @@ public final class Disassembler implements Sender<ProtocolEvent>, ProtocolDelega
}
};
- public Disassembler(Sender<ByteBuffer> sender, int maxFrame)
+ public Disassembler(ByteBufferSender sender, int maxFrame)
{
this.sender = sender;
if (maxFrame <= HEADER_SIZE || maxFrame >= 64*1024)
@@ -174,7 +176,7 @@ public final class Disassembler implements Sender<ProtocolEvent>, ProtocolDelega
private void method(Method method, SegmentType type)
{
- BBEncoder enc = _encoder.get();
+ Encoder enc = _encoder.get();
enc.init();
enc.writeUint16(method.getEncodedType());
if (type == SegmentType.COMMAND)
@@ -251,11 +253,6 @@ public final class Disassembler implements Sender<ProtocolEvent>, ProtocolDelega
throw new IllegalArgumentException(String.valueOf(error));
}
- public void setIdleTimeout(int i)
- {
- sender.setIdleTimeout(i);
- }
-
@Override
public void setMaxFrameSize(final int maxFrame)
{
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/IncomingNetworkTransport.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/IncomingNetworkTransport.java
index e0cd9cac1a..f378c54026 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/IncomingNetworkTransport.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/IncomingNetworkTransport.java
@@ -20,6 +20,8 @@
*/
package org.apache.qpid.transport.network;
+import java.util.Set;
+
import javax.net.ssl.SSLContext;
import org.apache.qpid.protocol.ProtocolEngineFactory;
@@ -29,7 +31,8 @@ public interface IncomingNetworkTransport extends NetworkTransport
{
public void accept(NetworkTransportConfiguration config,
ProtocolEngineFactory factory,
- SSLContext sslContext);
+ SSLContext sslContext,
+ final Set<TransportEncryption> encryptionSet);
public int getAcceptingPort();
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/InputHandler.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/InputHandler.java
index 758c2e1eda..a58bed5877 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/InputHandler.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/InputHandler.java
@@ -29,11 +29,12 @@ import static org.apache.qpid.transport.util.Functions.str;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
+import org.apache.qpid.transport.ByteBufferReceiver;
import org.apache.qpid.transport.Constant;
import org.apache.qpid.transport.FrameSizeObserver;
+import org.apache.qpid.transport.NetworkEventReceiver;
import org.apache.qpid.transport.ProtocolError;
import org.apache.qpid.transport.ProtocolHeader;
-import org.apache.qpid.transport.Receiver;
import org.apache.qpid.transport.SegmentType;
@@ -43,7 +44,7 @@ import org.apache.qpid.transport.SegmentType;
* @author Rafael H. Schloming
*/
-public class InputHandler implements Receiver<ByteBuffer>, FrameSizeObserver
+public class InputHandler implements ByteBufferReceiver, FrameSizeObserver
{
private int _maxFrameSize = Constant.MIN_MAX_FRAME_SIZE;
@@ -56,7 +57,7 @@ public class InputHandler implements Receiver<ByteBuffer>, FrameSizeObserver
ERROR
}
- private final Receiver<NetworkEvent> receiver;
+ private final NetworkEventReceiver receiver;
private State state;
private ByteBuffer input = null;
private int needed;
@@ -66,7 +67,7 @@ public class InputHandler implements Receiver<ByteBuffer>, FrameSizeObserver
private byte track;
private int channel;
- public InputHandler(Receiver<NetworkEvent> receiver, State state)
+ public InputHandler(NetworkEventReceiver receiver, State state)
{
this.receiver = receiver;
this.state = state;
@@ -82,7 +83,7 @@ public class InputHandler implements Receiver<ByteBuffer>, FrameSizeObserver
}
}
- public InputHandler(Receiver<NetworkEvent> receiver)
+ public InputHandler(NetworkEventReceiver receiver)
{
this(receiver, PROTO_HDR);
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/NetworkConnection.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/NetworkConnection.java
index 2810e7a9e1..bef266f214 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/NetworkConnection.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/NetworkConnection.java
@@ -21,13 +21,13 @@
package org.apache.qpid.transport.network;
import java.net.SocketAddress;
-import java.nio.ByteBuffer;
import java.security.Principal;
-import org.apache.qpid.transport.Sender;
+
+import org.apache.qpid.transport.ByteBufferSender;
public interface NetworkConnection
{
- Sender<ByteBuffer> getSender();
+ ByteBufferSender getSender();
void start();
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/OutgoingNetworkTransport.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/OutgoingNetworkTransport.java
index 45231aa05d..f2735f1800 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/OutgoingNetworkTransport.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/OutgoingNetworkTransport.java
@@ -20,16 +20,14 @@
*/
package org.apache.qpid.transport.network;
+import org.apache.qpid.transport.ByteBufferReceiver;
import org.apache.qpid.transport.ConnectionSettings;
-import org.apache.qpid.transport.Receiver;
-
-import java.nio.ByteBuffer;
public interface OutgoingNetworkTransport extends NetworkTransport
{
public NetworkConnection getConnection();
public NetworkConnection connect(ConnectionSettings settings,
- Receiver<ByteBuffer> delegate,
+ ByteBufferReceiver delegate,
TransportActivity transportActivity);
-} \ No newline at end of file
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/TransportEncryption.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/TransportEncryption.java
new file mode 100644
index 0000000000..b3f1f1c7dd
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/TransportEncryption.java
@@ -0,0 +1,26 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.transport.network;
+
+public enum TransportEncryption
+{
+ NONE, TLS
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/AbstractNetworkTransport.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/AbstractNetworkTransport.java
new file mode 100644
index 0000000000..8d19c5a2ce
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/AbstractNetworkTransport.java
@@ -0,0 +1,342 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.transport.network.io;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.net.SocketException;
+import java.util.Set;
+
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLServerSocket;
+import javax.net.ssl.SSLServerSocketFactory;
+
+import org.slf4j.LoggerFactory;
+
+import org.apache.qpid.configuration.CommonProperties;
+import org.apache.qpid.protocol.ProtocolEngine;
+import org.apache.qpid.protocol.ProtocolEngineFactory;
+import org.apache.qpid.transport.ByteBufferReceiver;
+import org.apache.qpid.transport.ConnectionSettings;
+import org.apache.qpid.transport.NetworkTransportConfiguration;
+import org.apache.qpid.transport.TransportException;
+import org.apache.qpid.transport.network.IncomingNetworkTransport;
+import org.apache.qpid.transport.network.NetworkConnection;
+import org.apache.qpid.transport.network.OutgoingNetworkTransport;
+import org.apache.qpid.transport.network.TransportActivity;
+import org.apache.qpid.transport.network.TransportEncryption;
+import org.apache.qpid.transport.network.security.ssl.SSLUtil;
+
+// TODO we are no longer using the IncomingNetworkTransport
+public abstract class AbstractNetworkTransport implements OutgoingNetworkTransport, IncomingNetworkTransport
+{
+ private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(AbstractNetworkTransport.class);
+ private static final int TIMEOUT = Integer.getInteger(CommonProperties.IO_NETWORK_TRANSPORT_TIMEOUT_PROP_NAME,
+ CommonProperties.IO_NETWORK_TRANSPORT_TIMEOUT_DEFAULT);
+ private static final int HANSHAKE_TIMEOUT = Integer.getInteger(CommonProperties.HANDSHAKE_TIMEOUT_PROP_NAME ,
+ CommonProperties.HANDSHAKE_TIMEOUT_DEFAULT);
+ private Socket _socket;
+ private NetworkConnection _connection;
+ private AcceptingThread _acceptor;
+
+ public NetworkConnection connect(ConnectionSettings settings,
+ ByteBufferReceiver delegate,
+ TransportActivity transportActivity)
+ {
+ int sendBufferSize = settings.getWriteBufferSize();
+ int receiveBufferSize = settings.getReadBufferSize();
+
+ try
+ {
+ _socket = new Socket();
+ _socket.setReuseAddress(true);
+ _socket.setTcpNoDelay(settings.isTcpNodelay());
+ _socket.setSendBufferSize(sendBufferSize);
+ _socket.setReceiveBufferSize(receiveBufferSize);
+
+ if(LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("SO_RCVBUF : " + _socket.getReceiveBufferSize());
+ LOGGER.debug("SO_SNDBUF : " + _socket.getSendBufferSize());
+ LOGGER.debug("TCP_NODELAY : " + _socket.getTcpNoDelay());
+ }
+
+ InetAddress address = InetAddress.getByName(settings.getHost());
+
+ _socket.connect(new InetSocketAddress(address, settings.getPort()), settings.getConnectTimeout());
+ }
+ catch (SocketException e)
+ {
+ throw new TransportException("Error connecting to broker", e);
+ }
+ catch (IOException e)
+ {
+ throw new TransportException("Error connecting to broker", e);
+ }
+
+ try
+ {
+ IdleTimeoutTicker ticker = new IdleTimeoutTicker(transportActivity, TIMEOUT);
+ _connection = createNetworkConnection(_socket, delegate, sendBufferSize, receiveBufferSize, TIMEOUT, ticker);
+ ticker.setConnection(_connection);
+ _connection.start();
+ }
+ catch(Exception e)
+ {
+ try
+ {
+ _socket.close();
+ }
+ catch(IOException ioe)
+ {
+ //ignored, throw based on original exception
+ }
+
+ throw new TransportException("Error creating network connection", e);
+ }
+
+ return _connection;
+ }
+
+ public void close()
+ {
+ if(_connection != null)
+ {
+ _connection.close();
+ }
+ if(_acceptor != null)
+ {
+ _acceptor.close();
+ }
+ }
+
+ public NetworkConnection getConnection()
+ {
+ return _connection;
+ }
+
+ public void accept(NetworkTransportConfiguration config,
+ ProtocolEngineFactory factory,
+ SSLContext sslContext, final Set<TransportEncryption> encryptionSet)
+ {
+ try
+ {
+ _acceptor = new AcceptingThread(config, factory, sslContext);
+ _acceptor.setDaemon(false);
+ _acceptor.start();
+ }
+ catch (IOException e)
+ {
+ throw new TransportException("Failed to start AMQP on port : " + config, e);
+ }
+ }
+
+ public int getAcceptingPort()
+ {
+ return _acceptor == null ? -1 : _acceptor.getPort();
+ }
+
+ protected abstract NetworkConnection createNetworkConnection(Socket socket,
+ ByteBufferReceiver engine,
+ Integer sendBufferSize,
+ Integer receiveBufferSize,
+ int timeout,
+ IdleTimeoutTicker ticker);
+
+ private class AcceptingThread extends Thread
+ {
+ private volatile boolean _closed = false;
+ private NetworkTransportConfiguration _config;
+ private ProtocolEngineFactory _factory;
+ private SSLContext _sslContext;
+ private ServerSocket _serverSocket;
+ private int _timeout;
+
+ private AcceptingThread(NetworkTransportConfiguration config,
+ ProtocolEngineFactory factory,
+ SSLContext sslContext) throws IOException
+ {
+ _config = config;
+ _factory = factory;
+ _sslContext = sslContext;
+ _timeout = TIMEOUT;
+
+ InetSocketAddress address = config.getAddress();
+
+ if(sslContext == null)
+ {
+ _serverSocket = new ServerSocket();
+ }
+ else
+ {
+ SSLServerSocketFactory socketFactory = _sslContext.getServerSocketFactory();
+ _serverSocket = socketFactory.createServerSocket();
+
+ SSLServerSocket sslServerSocket = (SSLServerSocket) _serverSocket;
+
+ SSLUtil.removeSSLv3Support(sslServerSocket);
+
+ if(config.needClientAuth())
+ {
+ sslServerSocket.setNeedClientAuth(true);
+ }
+ else if(config.wantClientAuth())
+ {
+ sslServerSocket.setWantClientAuth(true);
+ }
+
+ }
+
+ _serverSocket.setReuseAddress(true);
+ _serverSocket.bind(address);
+ }
+
+
+ /**
+ Close the underlying ServerSocket if it has not already been closed.
+ */
+ public void close()
+ {
+ LOGGER.debug("Shutting down the Acceptor");
+ _closed = true;
+
+ if (!_serverSocket.isClosed())
+ {
+ try
+ {
+ _serverSocket.close();
+ }
+ catch (IOException e)
+ {
+ throw new TransportException(e);
+ }
+ }
+ }
+
+ private int getPort()
+ {
+ return _serverSocket.getLocalPort();
+ }
+
+ @Override
+ public void run()
+ {
+ try
+ {
+ while (!_closed)
+ {
+ Socket socket = null;
+ try
+ {
+ socket = _serverSocket.accept();
+
+ ProtocolEngine engine = _factory.newProtocolEngine(socket.getRemoteSocketAddress());
+
+ if(engine != null)
+ {
+ socket.setTcpNoDelay(_config.getTcpNoDelay());
+ socket.setSoTimeout(1000 * HANSHAKE_TIMEOUT);
+
+ final Integer sendBufferSize = _config.getSendBufferSize();
+ final Integer receiveBufferSize = _config.getReceiveBufferSize();
+
+ socket.setSendBufferSize(sendBufferSize);
+ socket.setReceiveBufferSize(receiveBufferSize);
+
+
+ final IdleTimeoutTicker ticker = new IdleTimeoutTicker(engine, TIMEOUT);
+
+ NetworkConnection connection =
+ createNetworkConnection(socket,
+ engine,
+ sendBufferSize,
+ receiveBufferSize,
+ _timeout,
+ ticker);
+
+ connection.setMaxReadIdle(HANSHAKE_TIMEOUT);
+
+ ticker.setConnection(connection);
+
+ engine.setNetworkConnection(connection, connection.getSender());
+
+ connection.start();
+ }
+ else
+ {
+ socket.close();
+ }
+ }
+ catch(RuntimeException e)
+ {
+ LOGGER.error("Error in Acceptor thread on address " + _config.getAddress(), e);
+ closeSocketIfNecessary(socket);
+ }
+ catch(IOException e)
+ {
+ if(!_closed)
+ {
+ LOGGER.error("Error in Acceptor thread on address " + _config.getAddress(), e);
+ closeSocketIfNecessary(socket);
+ try
+ {
+ //Delay to avoid tight spinning the loop during issues such as too many open files
+ Thread.sleep(1000);
+ }
+ catch (InterruptedException ie)
+ {
+ LOGGER.debug("Stopping acceptor due to interrupt request");
+ _closed = true;
+ }
+ }
+ }
+ }
+ }
+ finally
+ {
+ if(LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("Acceptor exiting, no new connections will be accepted on address "
+ + _config.getAddress());
+ }
+ }
+ }
+
+ private void closeSocketIfNecessary(final Socket socket)
+ {
+ if(socket != null)
+ {
+ try
+ {
+ socket.close();
+ }
+ catch (IOException e)
+ {
+ LOGGER.debug("Exception while closing socket", e);
+ }
+ }
+ }
+
+ }
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IdleTimeoutTicker.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IdleTimeoutTicker.java
index 54a2a360bb..71704fca3a 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IdleTimeoutTicker.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IdleTimeoutTicker.java
@@ -25,7 +25,7 @@ import org.apache.qpid.transport.network.NetworkConnection;
import org.apache.qpid.transport.network.Ticker;
import org.apache.qpid.transport.network.TransportActivity;
-class IdleTimeoutTicker implements Ticker
+public class IdleTimeoutTicker implements Ticker
{
private final TransportActivity _transport;
private final int _defaultTimeout;
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java
index 5c3124c2ec..5008849ef3 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java
@@ -22,7 +22,6 @@ package org.apache.qpid.transport.network.io;
import java.net.Socket;
import java.net.SocketAddress;
-import java.nio.ByteBuffer;
import java.security.Principal;
import javax.net.ssl.SSLPeerUnverifiedException;
@@ -31,8 +30,8 @@ import javax.net.ssl.SSLSocket;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.qpid.transport.Receiver;
-import org.apache.qpid.transport.Sender;
+import org.apache.qpid.transport.ByteBufferReceiver;
+import org.apache.qpid.transport.ByteBufferSender;
import org.apache.qpid.transport.network.NetworkConnection;
import org.apache.qpid.transport.network.Ticker;
@@ -49,7 +48,7 @@ public class IoNetworkConnection implements NetworkConnection
private boolean _principalChecked;
private final Object _lock = new Object();
- public IoNetworkConnection(Socket socket, Receiver<ByteBuffer> delegate,
+ public IoNetworkConnection(Socket socket, ByteBufferReceiver delegate,
int sendBufferSize, int receiveBufferSize, long timeout, Ticker ticker)
{
_socket = socket;
@@ -70,7 +69,7 @@ public class IoNetworkConnection implements NetworkConnection
_ioReceiver.initiate();
}
- public Sender<ByteBuffer> getSender()
+ public ByteBufferSender getSender()
{
return _ioSender;
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java
index b7998ab8d9..ccab1d93cf 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java
@@ -20,313 +20,24 @@
*/
package org.apache.qpid.transport.network.io;
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.ServerSocket;
import java.net.Socket;
-import java.net.SocketException;
-import java.nio.ByteBuffer;
-import javax.net.ssl.SSLContext;
-import javax.net.ssl.SSLServerSocket;
-import javax.net.ssl.SSLServerSocketFactory;
+import org.apache.qpid.transport.ByteBufferReceiver;
-import org.slf4j.LoggerFactory;
-
-import org.apache.qpid.configuration.CommonProperties;
-import org.apache.qpid.protocol.ProtocolEngine;
-import org.apache.qpid.protocol.ProtocolEngineFactory;
-import org.apache.qpid.transport.ConnectionSettings;
-import org.apache.qpid.transport.NetworkTransportConfiguration;
-import org.apache.qpid.transport.Receiver;
-import org.apache.qpid.transport.TransportException;
-import org.apache.qpid.transport.network.IncomingNetworkTransport;
-import org.apache.qpid.transport.network.NetworkConnection;
-import org.apache.qpid.transport.network.OutgoingNetworkTransport;
-import org.apache.qpid.transport.network.TransportActivity;
-import org.apache.qpid.transport.network.security.ssl.SSLUtil;
-
-public class IoNetworkTransport implements OutgoingNetworkTransport, IncomingNetworkTransport
+public class IoNetworkTransport extends AbstractNetworkTransport
{
- private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(IoNetworkTransport.class);
- private static final int TIMEOUT = Integer.getInteger(CommonProperties.IO_NETWORK_TRANSPORT_TIMEOUT_PROP_NAME,
- CommonProperties.IO_NETWORK_TRANSPORT_TIMEOUT_DEFAULT);
- private static final int HANSHAKE_TIMEOUT = Integer.getInteger(CommonProperties.HANDSHAKE_TIMEOUT_PROP_NAME ,
- CommonProperties.HANDSHAKE_TIMEOUT_DEFAULT);
-
-
- private Socket _socket;
- private IoNetworkConnection _connection;
- private AcceptingThread _acceptor;
-
- public NetworkConnection connect(ConnectionSettings settings,
- Receiver<ByteBuffer> delegate,
- TransportActivity transportActivity)
- {
- int sendBufferSize = settings.getWriteBufferSize();
- int receiveBufferSize = settings.getReadBufferSize();
-
- try
- {
- _socket = new Socket();
- _socket.setReuseAddress(true);
- _socket.setTcpNoDelay(settings.isTcpNodelay());
- _socket.setSendBufferSize(sendBufferSize);
- _socket.setReceiveBufferSize(receiveBufferSize);
- if(LOGGER.isDebugEnabled())
- {
- LOGGER.debug("SO_RCVBUF : " + _socket.getReceiveBufferSize());
- LOGGER.debug("SO_SNDBUF : " + _socket.getSendBufferSize());
- LOGGER.debug("TCP_NODELAY : " + _socket.getTcpNoDelay());
- }
-
- InetAddress address = InetAddress.getByName(settings.getHost());
-
- _socket.connect(new InetSocketAddress(address, settings.getPort()), settings.getConnectTimeout());
- }
- catch (SocketException e)
- {
- throw new TransportException("Error connecting to broker", e);
- }
- catch (IOException e)
- {
- throw new TransportException("Error connecting to broker", e);
- }
-
- try
- {
- IdleTimeoutTicker ticker = new IdleTimeoutTicker(transportActivity, TIMEOUT);
- _connection = new IoNetworkConnection(_socket, delegate, sendBufferSize, receiveBufferSize, TIMEOUT, ticker);
- ticker.setConnection(_connection);
- _connection.start();
- }
- catch(Exception e)
- {
- try
- {
- _socket.close();
- }
- catch(IOException ioe)
- {
- //ignored, throw based on original exception
- }
-
- throw new TransportException("Error creating network connection", e);
- }
-
- return _connection;
- }
-
- public void close()
- {
- if(_connection != null)
- {
- _connection.close();
- }
- if(_acceptor != null)
- {
- _acceptor.close();
- }
- }
-
- public NetworkConnection getConnection()
- {
- return _connection;
- }
-
- public void accept(NetworkTransportConfiguration config,
- ProtocolEngineFactory factory,
- SSLContext sslContext)
- {
- try
- {
- _acceptor = new AcceptingThread(config, factory, sslContext);
- _acceptor.setName(String.format("IoNetworkAcceptor - %s", config.getAddress()));
- _acceptor.setDaemon(false);
- _acceptor.start();
- }
- catch (IOException e)
- {
- throw new TransportException("Failed to start AMQP on port : " + config, e);
- }
- }
-
- public int getAcceptingPort()
- {
- return _acceptor == null ? -1 : _acceptor.getPort();
- }
- private class AcceptingThread extends Thread
+ @Override
+ protected IoNetworkConnection createNetworkConnection(final Socket socket,
+ final ByteBufferReceiver engine,
+ final Integer sendBufferSize,
+ final Integer receiveBufferSize,
+ final int timeout,
+ final IdleTimeoutTicker ticker)
{
- private volatile boolean _closed = false;
- private NetworkTransportConfiguration _config;
- private ProtocolEngineFactory _factory;
- private SSLContext _sslContext;
- private ServerSocket _serverSocket;
- private int _timeout;
-
- private AcceptingThread(NetworkTransportConfiguration config,
- ProtocolEngineFactory factory,
- SSLContext sslContext) throws IOException
- {
- _config = config;
- _factory = factory;
- _sslContext = sslContext;
- _timeout = TIMEOUT;
-
- InetSocketAddress address = config.getAddress();
-
- if(sslContext == null)
- {
- _serverSocket = new ServerSocket();
- }
- else
- {
- SSLServerSocketFactory socketFactory = _sslContext.getServerSocketFactory();
- _serverSocket = socketFactory.createServerSocket();
-
- SSLServerSocket sslServerSocket = (SSLServerSocket) _serverSocket;
-
- SSLUtil.removeSSLv3Support(sslServerSocket);
- SSLUtil.updateEnabledCipherSuites(sslServerSocket, config.getEnabledCipherSuites(), config.getDisabledCipherSuites());
-
- if(config.needClientAuth())
- {
- sslServerSocket.setNeedClientAuth(true);
- }
- else if(config.wantClientAuth())
- {
- sslServerSocket.setWantClientAuth(true);
- }
-
- }
-
- _serverSocket.setReuseAddress(true);
- _serverSocket.bind(address);
- }
-
-
- /**
- Close the underlying ServerSocket if it has not already been closed.
- */
- public void close()
- {
- LOGGER.debug("Shutting down the Acceptor");
- _closed = true;
-
- if (!_serverSocket.isClosed())
- {
- try
- {
- _serverSocket.close();
- }
- catch (IOException e)
- {
- throw new TransportException(e);
- }
- }
- }
-
- private int getPort()
- {
- return _serverSocket.getLocalPort();
- }
-
- @Override
- public void run()
- {
- try
- {
- while (!_closed)
- {
- Socket socket = null;
- try
- {
- socket = _serverSocket.accept();
-
- ProtocolEngine engine = _factory.newProtocolEngine(socket.getRemoteSocketAddress());
-
- if(engine != null)
- {
- socket.setTcpNoDelay(_config.getTcpNoDelay());
- socket.setSoTimeout(1000 * HANSHAKE_TIMEOUT);
-
- final Integer sendBufferSize = _config.getSendBufferSize();
- final Integer receiveBufferSize = _config.getReceiveBufferSize();
-
- socket.setSendBufferSize(sendBufferSize);
- socket.setReceiveBufferSize(receiveBufferSize);
-
-
- final IdleTimeoutTicker ticker = new IdleTimeoutTicker(engine, TIMEOUT);
- NetworkConnection connection =
- new IoNetworkConnection(socket, engine, sendBufferSize, receiveBufferSize, _timeout,
- ticker);
-
- connection.setMaxReadIdle(HANSHAKE_TIMEOUT);
-
- ticker.setConnection(connection);
-
- engine.setNetworkConnection(connection, connection.getSender());
-
- connection.start();
- }
- else
- {
- socket.close();
- }
- }
- catch(RuntimeException e)
- {
- LOGGER.error("Error in Acceptor thread on address " + _config.getAddress(), e);
- closeSocketIfNecessary(socket);
- }
- catch(IOException e)
- {
- if(!_closed)
- {
- LOGGER.error("Error in Acceptor thread on address " + _config.getAddress(), e);
- closeSocketIfNecessary(socket);
- try
- {
- //Delay to avoid tight spinning the loop during issues such as too many open files
- Thread.sleep(1000);
- }
- catch (InterruptedException ie)
- {
- LOGGER.debug("Stopping acceptor due to interrupt request");
- _closed = true;
- }
- }
- }
- }
- }
- finally
- {
- if(LOGGER.isDebugEnabled())
- {
- LOGGER.debug("Acceptor exiting, no new connections will be accepted on address " + _config.getAddress());
- }
- }
- }
-
- private void closeSocketIfNecessary(final Socket socket)
- {
- if(socket != null)
- {
- try
- {
- socket.close();
- }
- catch (IOException e)
- {
- LOGGER.debug("Exception while closing socket", e);
- }
- }
- }
-
+ return new IoNetworkConnection(socket, engine, sendBufferSize, receiveBufferSize, timeout,
+ ticker);
}
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java
index b52b59aa15..790583e92b 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java
@@ -31,7 +31,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import javax.net.ssl.SSLSocket;
import org.apache.qpid.thread.Threading;
-import org.apache.qpid.transport.Receiver;
+import org.apache.qpid.transport.ByteBufferReceiver;
import org.apache.qpid.transport.TransportException;
import org.apache.qpid.transport.network.Ticker;
import org.apache.qpid.transport.util.Logger;
@@ -47,7 +47,7 @@ final class IoReceiver implements Runnable
private static final Logger log = Logger.get(IoReceiver.class);
- private final Receiver<ByteBuffer> receiver;
+ private final ByteBufferReceiver receiver;
private final int bufferSize;
private final Socket socket;
private final long timeout;
@@ -61,7 +61,7 @@ final class IoReceiver implements Runnable
shutdownBroken = SystemUtils.isWindows();
}
- public IoReceiver(Socket socket, Receiver<ByteBuffer> receiver, int bufferSize, long timeout)
+ public IoReceiver(Socket socket, ByteBufferReceiver receiver, int bufferSize, long timeout)
{
this.receiver = receiver;
this.bufferSize = bufferSize;
@@ -78,7 +78,7 @@ final class IoReceiver implements Runnable
throw new RuntimeException("Error creating IOReceiver thread",e);
}
receiverThread.setDaemon(true);
- receiverThread.setName(String.format("IoReceiver - %s", socket.getRemoteSocketAddress()));
+ receiverThread.setName(String.format("IoReceiver-%s", socket.getRemoteSocketAddress()));
}
public void initiate()
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java
index 25222e5285..cd01cddb05 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java
@@ -29,7 +29,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import javax.net.ssl.SSLSocket;
import org.apache.qpid.thread.Threading;
-import org.apache.qpid.transport.Sender;
+import org.apache.qpid.transport.ByteBufferSender;
import org.apache.qpid.transport.SenderClosedException;
import org.apache.qpid.transport.SenderException;
import org.apache.qpid.transport.TransportException;
@@ -37,7 +37,7 @@ import org.apache.qpid.transport.util.Logger;
import org.apache.qpid.util.SystemUtils;
-public final class IoSender implements Runnable, Sender<ByteBuffer>
+public final class IoSender implements Runnable, ByteBufferSender
{
private static final Logger log = Logger.get(IoSender.class);
@@ -97,7 +97,7 @@ public final class IoSender implements Runnable, Sender<ByteBuffer>
}
senderThread.setDaemon(true);
- senderThread.setName(String.format("IoSender - %s", _remoteSocketAddress));
+ senderThread.setName(String.format("IoSender-%s", _remoteSocketAddress));
}
public void initiate()
@@ -337,18 +337,6 @@ public final class IoSender implements Runnable, Sender<ByteBuffer>
}
}
- public void setIdleTimeout(int i)
- {
- try
- {
- socket.setSoTimeout(i);
- }
- catch (Exception e)
- {
- throw new SenderException(e);
- }
- }
-
public void setReceiver(IoReceiver receiver)
{
_receiver = receiver;
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayer.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayer.java
index 51ef266ee9..271135f411 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayer.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayer.java
@@ -20,16 +20,14 @@
*/
package org.apache.qpid.transport.network.security;
-import org.apache.qpid.transport.Receiver;
-import org.apache.qpid.transport.Sender;
-
-import java.nio.ByteBuffer;
+import org.apache.qpid.transport.ByteBufferReceiver;
+import org.apache.qpid.transport.ByteBufferSender;
public interface SecurityLayer
{
- public Sender<ByteBuffer> sender(Sender<ByteBuffer> delegate);
- public Receiver<ByteBuffer> receiver(Receiver<ByteBuffer> delegate);
+ public ByteBufferSender sender(ByteBufferSender delegate);
+ public ByteBufferReceiver receiver(ByteBufferReceiver delegate);
public String getUserID();
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayerFactory.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayerFactory.java
index 2a2f3d8362..d25e97ffe4 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayerFactory.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayerFactory.java
@@ -20,15 +20,13 @@
*/
package org.apache.qpid.transport.network.security;
-import java.nio.ByteBuffer;
-
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
import org.apache.qpid.ssl.SSLContextFactory;
+import org.apache.qpid.transport.ByteBufferReceiver;
+import org.apache.qpid.transport.ByteBufferSender;
import org.apache.qpid.transport.ConnectionSettings;
-import org.apache.qpid.transport.Receiver;
-import org.apache.qpid.transport.Sender;
import org.apache.qpid.transport.TransportException;
import org.apache.qpid.transport.network.security.sasl.SASLReceiver;
import org.apache.qpid.transport.network.security.sasl.SASLSender;
@@ -110,14 +108,14 @@ public class SecurityLayerFactory
}
- public Sender<ByteBuffer> sender(Sender<ByteBuffer> delegate)
+ public ByteBufferSender sender(ByteBufferSender delegate)
{
SSLSender sender = new SSLSender(_engine, _layer.sender(delegate), _sslStatus);
sender.setHostname(_hostname);
return sender;
}
- public Receiver<ByteBuffer> receiver(Receiver<ByteBuffer> delegate)
+ public ByteBufferReceiver receiver(ByteBufferReceiver delegate)
{
SSLReceiver receiver = new SSLReceiver(_engine, _layer.receiver(delegate), _sslStatus);
receiver.setHostname(_hostname);
@@ -141,13 +139,13 @@ public class SecurityLayerFactory
_layer = layer;
}
- public SASLSender sender(Sender<ByteBuffer> delegate)
+ public SASLSender sender(ByteBufferSender delegate)
{
SASLSender sender = new SASLSender(_layer.sender(delegate));
return sender;
}
- public SASLReceiver receiver(Receiver<ByteBuffer> delegate)
+ public SASLReceiver receiver(ByteBufferReceiver delegate)
{
SASLReceiver receiver = new SASLReceiver(_layer.receiver(delegate));
return receiver;
@@ -169,12 +167,12 @@ public class SecurityLayerFactory
{
}
- public Sender<ByteBuffer> sender(Sender<ByteBuffer> delegate)
+ public ByteBufferSender sender(ByteBufferSender delegate)
{
return delegate;
}
- public Receiver<ByteBuffer> receiver(Receiver<ByteBuffer> delegate)
+ public ByteBufferReceiver receiver(ByteBufferReceiver delegate)
{
return delegate;
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLReceiver.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLReceiver.java
index 59e9453454..983e3bdf90 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLReceiver.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLReceiver.java
@@ -21,20 +21,21 @@
package org.apache.qpid.transport.network.security.sasl;
-import org.apache.qpid.transport.Receiver;
-import org.apache.qpid.transport.SenderException;
-import org.apache.qpid.transport.util.Logger;
+import java.nio.ByteBuffer;
import javax.security.sasl.SaslException;
-import java.nio.ByteBuffer;
-public class SASLReceiver extends SASLEncryptor implements Receiver<ByteBuffer> {
+import org.apache.qpid.transport.ByteBufferReceiver;
+import org.apache.qpid.transport.SenderException;
+import org.apache.qpid.transport.util.Logger;
+
+public class SASLReceiver extends SASLEncryptor implements ByteBufferReceiver {
- private Receiver<ByteBuffer> delegate;
+ private ByteBufferReceiver delegate;
private byte[] netData;
private static final Logger log = Logger.get(SASLReceiver.class);
- public SASLReceiver(Receiver<ByteBuffer> delegate)
+ public SASLReceiver(ByteBufferReceiver delegate)
{
this.delegate = delegate;
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLSender.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLSender.java
index 098f2fb20c..335f8992ca 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLSender.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLSender.java
@@ -21,22 +21,24 @@
package org.apache.qpid.transport.network.security.sasl;
-import org.apache.qpid.transport.Sender;
-import org.apache.qpid.transport.SenderException;
-import org.apache.qpid.transport.util.Logger;
-
-import javax.security.sasl.SaslException;
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicBoolean;
-public class SASLSender extends SASLEncryptor implements Sender<ByteBuffer> {
+import javax.security.sasl.SaslException;
+
+import org.apache.qpid.transport.ByteBufferSender;
+import org.apache.qpid.transport.SenderException;
+import org.apache.qpid.transport.util.Logger;
+
+public class SASLSender extends SASLEncryptor implements ByteBufferSender
+{
- private Sender<ByteBuffer> delegate;
+ private ByteBufferSender delegate;
private byte[] appData;
private final AtomicBoolean closed = new AtomicBoolean(false);
private static final Logger log = Logger.get(SASLSender.class);
- public SASLSender(Sender<ByteBuffer> delegate)
+ public SASLSender(ByteBufferSender delegate)
{
this.delegate = delegate;
log.debug("SASL Sender enabled");
@@ -103,11 +105,6 @@ public class SASLSender extends SASLEncryptor implements Sender<ByteBuffer> {
}
}
- public void setIdleTimeout(int i)
- {
- delegate.setIdleTimeout(i);
- }
-
public void securityLayerEstablished()
{
appData = new byte[getSendBuffSize()];
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLBufferingSender.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLBufferingSender.java
index 24f95d7798..e69de29bb2 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLBufferingSender.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLBufferingSender.java
@@ -1,274 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.transport.network.security.ssl;
-
-import java.nio.ByteBuffer;
-import java.util.concurrent.atomic.AtomicBoolean;
-import javax.net.ssl.SSLEngine;
-import javax.net.ssl.SSLEngineResult;
-import javax.net.ssl.SSLEngineResult.HandshakeStatus;
-import javax.net.ssl.SSLEngineResult.Status;
-import javax.net.ssl.SSLException;
-import org.apache.qpid.transport.Sender;
-import org.apache.qpid.transport.SenderException;
-import org.apache.qpid.transport.network.security.SSLStatus;
-import org.apache.qpid.transport.util.Logger;
-
-public class SSLBufferingSender implements Sender<ByteBuffer>
-{
- private static final Logger log = Logger.get(SSLBufferingSender.class);
- private static final ByteBuffer EMPTY_BYTE_BUFFER = ByteBuffer.allocate(0);
-
- private final Sender<ByteBuffer> delegate;
- private final SSLEngine engine;
- private final int sslBufSize;
- private final ByteBuffer netData;
- private final SSLStatus _sslStatus;
-
- private String _hostname;
-
- private final AtomicBoolean closed = new AtomicBoolean(false);
- private ByteBuffer _appData = EMPTY_BYTE_BUFFER;
-
-
- public SSLBufferingSender(SSLEngine engine, Sender<ByteBuffer> delegate, SSLStatus sslStatus)
- {
- this.engine = engine;
- this.delegate = delegate;
- sslBufSize = engine.getSession().getPacketBufferSize();
- netData = ByteBuffer.allocate(sslBufSize);
- _sslStatus = sslStatus;
- }
-
- public void setHostname(String hostname)
- {
- _hostname = hostname;
- }
-
- public void close()
- {
- if (!closed.getAndSet(true))
- {
- if (engine.isOutboundDone())
- {
- return;
- }
- log.debug("Closing SSL connection");
- doSend();
- engine.closeOutbound();
- try
- {
- tearDownSSLConnection();
- }
- catch(Exception e)
- {
- throw new SenderException("Error closing SSL connection",e);
- }
-
-
- synchronized(_sslStatus.getSslLock())
- {
- while (!engine.isOutboundDone())
- {
- try
- {
- _sslStatus.getSslLock().wait();
- }
- catch(InterruptedException e)
- {
- // pass
- }
-
- }
- }
- delegate.close();
- }
- }
-
- private void tearDownSSLConnection() throws Exception
- {
- SSLEngineResult result = engine.wrap(ByteBuffer.allocate(0), netData);
- Status status = result.getStatus();
- int read = result.bytesProduced();
- while (status != Status.CLOSED)
- {
- if (status == Status.BUFFER_OVERFLOW)
- {
- netData.clear();
- }
- if(read > 0)
- {
- int limit = netData.limit();
- netData.limit(netData.position());
- netData.position(netData.position() - read);
-
- ByteBuffer data = netData.slice();
-
- netData.limit(limit);
- netData.position(netData.position() + read);
-
- delegate.send(data);
- flush();
- }
- result = engine.wrap(ByteBuffer.allocate(0), netData);
- status = result.getStatus();
- read = result.bytesProduced();
- }
- }
-
- public void flush()
- {
- delegate.flush();
- }
-
- public void send()
- {
- if(!closed.get())
- {
- doSend();
- }
- }
-
- public synchronized void send(ByteBuffer appData)
- {
- boolean buffered;
- if(buffered = _appData.hasRemaining())
- {
- ByteBuffer newBuf = ByteBuffer.allocate(_appData.remaining()+appData.remaining());
- newBuf.put(_appData);
- newBuf.put(appData);
- newBuf.flip();
- _appData = newBuf;
- }
- if (closed.get())
- {
- throw new SenderException("SSL Sender is closed");
- }
- doSend();
- if(!appData.hasRemaining())
- {
- _appData = EMPTY_BYTE_BUFFER;
- }
- else if(!buffered)
- {
- _appData = ByteBuffer.allocate(appData.remaining());
- _appData.put(appData);
- _appData.flip();
- }
- }
-
- private synchronized void doSend()
- {
-
- HandshakeStatus handshakeStatus;
- Status status;
-
- while((_appData.hasRemaining() || engine.getHandshakeStatus() == HandshakeStatus.NEED_WRAP)
- && !_sslStatus.getSslErrorFlag())
- {
- int read = 0;
- try
- {
- SSLEngineResult result = engine.wrap(_appData, netData);
- read = result.bytesProduced();
- status = result.getStatus();
- handshakeStatus = result.getHandshakeStatus();
- }
- catch(SSLException e)
- {
- // Should this set _sslError??
- throw new SenderException("SSL, Error occurred while encrypting data",e);
- }
-
- if(read > 0)
- {
- int limit = netData.limit();
- netData.limit(netData.position());
- netData.position(netData.position() - read);
-
- ByteBuffer data = netData.slice();
-
- netData.limit(limit);
- netData.position(netData.position() + read);
-
- delegate.send(data);
- }
-
- switch(status)
- {
- case CLOSED:
- throw new SenderException("SSLEngine is closed");
-
- case BUFFER_OVERFLOW:
- netData.clear();
- continue;
-
- case OK:
- break; // do nothing
-
- default:
- throw new IllegalStateException("SSLReceiver: Invalid State " + status);
- }
-
- switch (handshakeStatus)
- {
- case NEED_WRAP:
- if (netData.hasRemaining())
- {
- continue;
- }
-
- case NEED_TASK:
- doTasks();
- break;
-
- case NEED_UNWRAP:
- flush();
- return;
-
- case FINISHED:
- if (_hostname != null)
- {
- SSLUtil.verifyHostname(engine, _hostname);
- }
-
- case NOT_HANDSHAKING:
- break; //do nothing
-
- default:
- throw new IllegalStateException("SSLSender: Invalid State " + status);
- }
-
- }
- }
-
- private void doTasks()
- {
- Runnable runnable;
- while ((runnable = engine.getDelegatedTask()) != null) {
- runnable.run();
- }
- }
-
- public void setIdleTimeout(int i)
- {
- delegate.setIdleTimeout(i);
- }
-}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLReceiver.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLReceiver.java
index 8e1395aa83..49e4ad631a 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLReceiver.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLReceiver.java
@@ -28,16 +28,16 @@ import javax.net.ssl.SSLEngineResult.HandshakeStatus;
import javax.net.ssl.SSLEngineResult.Status;
import javax.net.ssl.SSLException;
-import org.apache.qpid.transport.Receiver;
+import org.apache.qpid.transport.ByteBufferReceiver;
import org.apache.qpid.transport.TransportException;
import org.apache.qpid.transport.network.security.SSLStatus;
import org.apache.qpid.transport.util.Logger;
-public class SSLReceiver implements Receiver<ByteBuffer>
+public class SSLReceiver implements ByteBufferReceiver
{
private static final Logger log = Logger.get(SSLReceiver.class);
- private final Receiver<ByteBuffer> delegate;
+ private final ByteBufferReceiver delegate;
private final SSLEngine engine;
private final int sslBufSize;
private final ByteBuffer localBuffer;
@@ -47,7 +47,7 @@ public class SSLReceiver implements Receiver<ByteBuffer>
private String _hostname;
- public SSLReceiver(final SSLEngine engine, final Receiver<ByteBuffer> delegate, final SSLStatus sslStatus)
+ public SSLReceiver(final SSLEngine engine, final ByteBufferReceiver delegate, final SSLStatus sslStatus)
{
this.engine = engine;
this.delegate = delegate;
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLSender.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLSender.java
index 53bd7e49b7..3d133cb9b7 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLSender.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLSender.java
@@ -19,24 +19,25 @@
*/
package org.apache.qpid.transport.network.security.ssl;
-import org.apache.qpid.transport.Sender;
-import org.apache.qpid.transport.SenderException;
-import org.apache.qpid.transport.network.security.SSLStatus;
-import org.apache.qpid.transport.util.Logger;
+import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicBoolean;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLEngineResult;
import javax.net.ssl.SSLEngineResult.HandshakeStatus;
import javax.net.ssl.SSLEngineResult.Status;
import javax.net.ssl.SSLException;
-import java.nio.ByteBuffer;
-import java.util.concurrent.atomic.AtomicBoolean;
-public class SSLSender implements Sender<ByteBuffer>
+import org.apache.qpid.transport.ByteBufferSender;
+import org.apache.qpid.transport.SenderException;
+import org.apache.qpid.transport.network.security.SSLStatus;
+import org.apache.qpid.transport.util.Logger;
+
+public class SSLSender implements ByteBufferSender
{
private static final Logger log = Logger.get(SSLSender.class);
- private final Sender<ByteBuffer> delegate;
+ private final ByteBufferSender delegate;
private final SSLEngine engine;
private final int sslBufSize;
private final ByteBuffer netData;
@@ -48,7 +49,7 @@ public class SSLSender implements Sender<ByteBuffer>
private final AtomicBoolean closed = new AtomicBoolean(false);
- public SSLSender(SSLEngine engine, Sender<ByteBuffer> delegate, SSLStatus sslStatus)
+ public SSLSender(SSLEngine engine, ByteBufferSender delegate, SSLStatus sslStatus)
{
this.engine = engine;
this.delegate = delegate;
@@ -264,8 +265,4 @@ public class SSLSender implements Sender<ByteBuffer>
}
}
- public void setIdleTimeout(int i)
- {
- delegate.setIdleTimeout(i);
- }
}
diff --git a/qpid/java/common/src/test/java/org/apache/qpid/transport/network/TransportTest.java b/qpid/java/common/src/test/java/org/apache/qpid/transport/network/TransportTest.java
index 3071594be7..3da2a03f42 100644
--- a/qpid/java/common/src/test/java/org/apache/qpid/transport/network/TransportTest.java
+++ b/qpid/java/common/src/test/java/org/apache/qpid/transport/network/TransportTest.java
@@ -21,16 +21,16 @@
package org.apache.qpid.transport.network;
-import java.nio.ByteBuffer;
+import java.util.Set;
import javax.net.ssl.SSLContext;
import org.apache.qpid.framing.ProtocolVersion;
import org.apache.qpid.protocol.ProtocolEngineFactory;
import org.apache.qpid.test.utils.QpidTestCase;
+import org.apache.qpid.transport.ByteBufferReceiver;
import org.apache.qpid.transport.ConnectionSettings;
import org.apache.qpid.transport.NetworkTransportConfiguration;
-import org.apache.qpid.transport.Receiver;
import org.apache.qpid.transport.TransportException;
import org.apache.qpid.transport.network.io.IoNetworkTransport;
@@ -129,7 +129,7 @@ public class TransportTest extends QpidTestCase
}
public NetworkConnection connect(ConnectionSettings settings,
- Receiver<ByteBuffer> delegate,
+ ByteBufferReceiver delegate,
TransportActivity transportActivity)
{
throw new UnsupportedOperationException();
@@ -150,7 +150,9 @@ public class TransportTest extends QpidTestCase
}
public void accept(NetworkTransportConfiguration config,
- ProtocolEngineFactory factory, SSLContext sslContext)
+ ProtocolEngineFactory factory,
+ SSLContext sslContext,
+ final Set<TransportEncryption> encryptionSet)
{
throw new UnsupportedOperationException();
}
diff --git a/qpid/java/common/src/test/java/org/apache/qpid/transport/network/io/IdleTimeoutTickerTest.java b/qpid/java/common/src/test/java/org/apache/qpid/transport/network/io/IdleTimeoutTickerTest.java
index a445cff0a7..69724438ec 100644
--- a/qpid/java/common/src/test/java/org/apache/qpid/transport/network/io/IdleTimeoutTickerTest.java
+++ b/qpid/java/common/src/test/java/org/apache/qpid/transport/network/io/IdleTimeoutTickerTest.java
@@ -21,14 +21,12 @@
package org.apache.qpid.transport.network.io;
-import junit.framework.TestCase;
-
import java.net.SocketAddress;
-import java.nio.ByteBuffer;
import java.security.Principal;
-import org.apache.qpid.test.utils.QpidTestCase;
-import org.apache.qpid.transport.Sender;
+import junit.framework.TestCase;
+
+import org.apache.qpid.transport.ByteBufferSender;
import org.apache.qpid.transport.network.NetworkConnection;
import org.apache.qpid.transport.network.TransportActivity;
@@ -193,7 +191,7 @@ public class IdleTimeoutTickerTest extends TestCase implements TransportActivity
//-------------------------------------------------------------------------
@Override
- public Sender<ByteBuffer> getSender()
+ public ByteBufferSender getSender()
{
return null;
}
diff --git a/qpid/java/common/src/test/java/org/apache/qpid/transport/network/io/IoAcceptor.java b/qpid/java/common/src/test/java/org/apache/qpid/transport/network/io/IoAcceptor.java
index bb864cd434..67d360fa9e 100644
--- a/qpid/java/common/src/test/java/org/apache/qpid/transport/network/io/IoAcceptor.java
+++ b/qpid/java/common/src/test/java/org/apache/qpid/transport/network/io/IoAcceptor.java
@@ -20,16 +20,15 @@
*/
package org.apache.qpid.transport.network.io;
-import org.apache.log4j.Logger;
-import org.apache.qpid.transport.Binding;
-import org.apache.qpid.transport.TransportException;
-
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketAddress;
-import java.nio.ByteBuffer;
+
+import org.apache.log4j.Logger;
+
+import org.apache.qpid.transport.Binding;
/**
@@ -44,9 +43,9 @@ public class IoAcceptor<E> extends Thread
private volatile boolean _closed = false;
private ServerSocket socket;
- private Binding<E,ByteBuffer> binding;
+ private Binding<E> binding;
- public IoAcceptor(SocketAddress address, Binding<E,ByteBuffer> binding)
+ public IoAcceptor(SocketAddress address, Binding<E> binding)
throws IOException
{
socket = new ServerSocket();
@@ -70,7 +69,7 @@ public class IoAcceptor<E> extends Thread
}
}
- public IoAcceptor(String host, int port, Binding<E,ByteBuffer> binding)
+ public IoAcceptor(String host, int port, Binding<E> binding)
throws IOException
{
this(new InetSocketAddress(host, port), binding);
diff --git a/qpid/java/common/src/test/java/org/apache/qpid/transport/network/io/IoTransport.java b/qpid/java/common/src/test/java/org/apache/qpid/transport/network/io/IoTransport.java
index f74051aa32..4b5b4448ee 100644
--- a/qpid/java/common/src/test/java/org/apache/qpid/transport/network/io/IoTransport.java
+++ b/qpid/java/common/src/test/java/org/apache/qpid/transport/network/io/IoTransport.java
@@ -20,10 +20,9 @@
package org.apache.qpid.transport.network.io;
import java.net.Socket;
-import java.nio.ByteBuffer;
import org.apache.qpid.transport.Binding;
-import org.apache.qpid.transport.Sender;
+import org.apache.qpid.transport.ByteBufferSender;
import org.apache.qpid.transport.util.Logger;
/**
@@ -48,18 +47,18 @@ public final class IoTransport<E>
("amqj.sendBufferSize", DEFAULT_READ_WRITE_BUFFER_SIZE);
private Socket socket;
- private Sender<ByteBuffer> sender;
+ private ByteBufferSender sender;
private E endpoint;
private IoReceiver receiver;
private long timeout = 60000;
- IoTransport(Socket socket, Binding<E,ByteBuffer> binding)
+ IoTransport(Socket socket, Binding<E> binding)
{
this.socket = socket;
setupTransport(socket, binding);
}
- private void setupTransport(Socket socket, Binding<E, ByteBuffer> binding)
+ private void setupTransport(Socket socket, Binding<E> binding)
{
IoSender ios = new IoSender(socket, 2*writeBufferSize, timeout);
ios.initiate();
@@ -73,7 +72,7 @@ public final class IoTransport<E>
ios.setReceiver(this.receiver);
}
- public Sender<ByteBuffer> getSender()
+ public ByteBufferSender getSender()
{
return sender;
}