diff options
| author | Keith Wall <kwall@apache.org> | 2015-03-12 15:41:46 +0000 |
|---|---|---|
| committer | Keith Wall <kwall@apache.org> | 2015-03-12 15:41:46 +0000 |
| commit | 6e98063ae07795f988ad26fdcf49d204d88b39c3 (patch) | |
| tree | 7fa009399d501d9ad3e9f77f735d85a2b75807cf /qpid/java/common | |
| parent | b66b4f357a756449c7e7184be4d963fb36f5b2d4 (diff) | |
| parent | 49c02f9fcf8c2dd1b063c887f8948f840ec785c2 (diff) | |
| download | qpid-python-6e98063ae07795f988ad26fdcf49d204d88b39c3.tar.gz | |
QPID-6429, QPID-6262, QPID-5818: [Java Broker] Utilise NIO, service connections using a thread pool, AMQP model mutating actions should use task executors
Work of Rob Godfrey and myself.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1666224 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/common')
49 files changed, 775 insertions, 815 deletions
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java index cb0c78ef37..6860b46546 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java @@ -20,12 +20,13 @@ */ package org.apache.qpid.framing; -import org.apache.qpid.AMQException; -import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; - import java.io.DataOutput; import java.io.IOException; +import org.apache.qpid.AMQException; +import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; +import org.apache.qpid.transport.ByteBufferSender; + public interface AMQBody { public byte getFrameType(); @@ -39,4 +40,6 @@ public interface AMQBody public void writePayload(DataOutput buffer) throws IOException; void handle(final int channelId, final AMQVersionAwareProtocolSession amqProtocolSession) throws AMQException; + + long writePayload(ByteBufferSender sender) throws IOException; } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlock.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlock.java index c234a5e829..8f804bf2d6 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlock.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlock.java @@ -23,6 +23,8 @@ package org.apache.qpid.framing; import java.io.DataOutput; import java.io.IOException; +import org.apache.qpid.transport.ByteBufferSender; + /** * A data block represents something that has a size in bytes and the ability to write itself to a byte @@ -44,4 +46,6 @@ public abstract class AMQDataBlock implements EncodableAMQDataBlock */ public abstract void writePayload(DataOutput buffer) throws IOException; + public abstract long writePayload(ByteBufferSender sender) throws IOException; + } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java index 83397c37d8..5fcdfb901a 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java @@ -22,6 +22,10 @@ package org.apache.qpid.framing; import java.io.DataOutput; import java.io.IOException; +import java.nio.ByteBuffer; + +import org.apache.qpid.transport.ByteBufferSender; +import org.apache.qpid.util.BytesDataOutput; public class AMQFrame extends AMQDataBlock implements EncodableAMQDataBlock { @@ -57,6 +61,25 @@ public class AMQFrame extends AMQDataBlock implements EncodableAMQDataBlock buffer.writeByte(FRAME_END_BYTE); } + private static final byte[] FRAME_END_BYTE_ARRAY = new byte[] { FRAME_END_BYTE }; + + @Override + public long writePayload(final ByteBufferSender sender) throws IOException + { + byte[] frameHeader = new byte[7]; + BytesDataOutput buffer = new BytesDataOutput(frameHeader); + + buffer.writeByte(_bodyFrame.getFrameType()); + EncodingUtils.writeUnsignedShort(buffer, _channel); + EncodingUtils.writeUnsignedInteger(buffer, _bodyFrame.getSize()); + sender.send(ByteBuffer.wrap(frameHeader)); + + long size = 8 + _bodyFrame.writePayload(sender); + + sender.send(ByteBuffer.wrap(FRAME_END_BYTE_ARRAY)); + return size; + } + public final int getChannel() { return _channel; diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java index e40452edea..01deed67ed 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java @@ -24,12 +24,15 @@ package org.apache.qpid.framing; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; +import java.nio.ByteBuffer; import org.apache.qpid.AMQChannelException; import org.apache.qpid.AMQConnectionException; import org.apache.qpid.AMQException; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; +import org.apache.qpid.transport.ByteBufferSender; +import org.apache.qpid.util.BytesDataOutput; public abstract class AMQMethodBodyImpl implements AMQMethodBody { @@ -105,6 +108,16 @@ public abstract class AMQMethodBodyImpl implements AMQMethodBody writeMethodPayload(buffer); } + @Override + public long writePayload(final ByteBufferSender sender) throws IOException + { + final int size = getSize(); + byte[] bytes = new byte[size]; + BytesDataOutput buffer = new BytesDataOutput(bytes); + writePayload(buffer); + sender.send(ByteBuffer.wrap(bytes)); + return size; + } protected int getSizeOf(AMQShortString string) { diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java index ef0da9b918..6481c6ebdb 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java @@ -23,10 +23,14 @@ package org.apache.qpid.framing; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; +import java.nio.ByteBuffer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.qpid.transport.ByteBufferSender; +import org.apache.qpid.util.BytesDataOutput; + public class BasicContentHeaderProperties { //persistent & non-persistent constants, values as per JMS DeliveryMode @@ -314,6 +318,26 @@ public class BasicContentHeaderProperties } } + + public long writePropertyListPayload(final ByteBufferSender sender) throws IOException + { + if(useEncodedForm()) + { + sender.send(ByteBuffer.wrap(_encodedForm)); + return _encodedForm.length; + } + else + { + int propertyListSize = getPropertyListSize(); + byte[] data = new byte[propertyListSize]; + BytesDataOutput out = new BytesDataOutput(data); + writePropertyListPayload(out); + sender.send(ByteBuffer.wrap(data)); + return propertyListSize; + } + + } + public void populatePropertiesFromBuffer(DataInput buffer, int propertyFlags, int size) throws AMQFrameDecodingException, IOException { _propertyFlags = propertyFlags; diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/CompositeAMQDataBlock.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/CompositeAMQDataBlock.java index 098e3652ad..819446021e 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/CompositeAMQDataBlock.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/CompositeAMQDataBlock.java @@ -23,6 +23,8 @@ package org.apache.qpid.framing; import java.io.DataOutput; import java.io.IOException; +import org.apache.qpid.transport.ByteBufferSender; + public class CompositeAMQDataBlock extends AMQDataBlock implements EncodableAMQDataBlock { @@ -58,6 +60,17 @@ public class CompositeAMQDataBlock extends AMQDataBlock implements EncodableAMQD } } + @Override + public long writePayload(final ByteBufferSender sender) throws IOException + { + long size = 0l; + for (int i = 0; i < _blocks.length; i++) + { + size += _blocks[i].writePayload(sender); + } + return size; + } + public String toString() { if (_blocks == null) diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java index 5c322f3845..0f4ba5209b 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java @@ -28,6 +28,7 @@ import java.nio.ByteBuffer; import org.apache.qpid.AMQException; import org.apache.qpid.codec.MarkableDataInput; import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; +import org.apache.qpid.transport.ByteBufferSender; public class ContentBody implements AMQBody { @@ -72,6 +73,20 @@ public class ContentBody implements AMQBody session.contentBodyReceived(channelId, this); } + @Override + public long writePayload(final ByteBufferSender sender) throws IOException + { + if(_payload != null) + { + sender.send(ByteBuffer.wrap(_payload)); + return _payload.length; + } + else + { + return 0l; + } + } + public byte[] getPayload() { return _payload; @@ -133,6 +148,23 @@ public class ContentBody implements AMQBody } } + @Override + public long writePayload(final ByteBufferSender sender) throws IOException + { + if(_buf.hasArray()) + { + sender.send(ByteBuffer.wrap(_buf.array(), _buf.arrayOffset() + _offset, _length)); + } + else + { + ByteBuffer buf = _buf.duplicate(); + + buf.position(_offset); + buf.limit(_offset+_length); + sender.send(buf); + } + return _length; + } public void handle(int channelId, AMQVersionAwareProtocolSession amqProtocolSession) throws AMQException { diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java index 377d2e115c..21b8e6c8b6 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java @@ -24,10 +24,13 @@ import java.io.DataInput; import java.io.DataInputStream; import java.io.DataOutput; import java.io.IOException; +import java.nio.ByteBuffer; import org.apache.qpid.AMQException; import org.apache.qpid.codec.MarkableDataInput; import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; +import org.apache.qpid.transport.ByteBufferSender; +import org.apache.qpid.util.BytesDataOutput; public class ContentHeaderBody implements AMQBody { @@ -98,6 +101,19 @@ public class ContentHeaderBody implements AMQBody _properties.writePropertyListPayload(buffer); } + @Override + public long writePayload(final ByteBufferSender sender) throws IOException + { + byte[] data = new byte[14]; + BytesDataOutput buffer = new BytesDataOutput(data); + EncodingUtils.writeUnsignedShort(buffer, CLASS_ID); + EncodingUtils.writeUnsignedShort(buffer, 0); + buffer.writeLong(_bodySize); + EncodingUtils.writeUnsignedShort(buffer, _properties.getPropertyFlags()); + sender.send(ByteBuffer.wrap(data)); + return 14 + _properties.writePropertyListPayload(sender); + } + public void handle(final int channelId, final AMQVersionAwareProtocolSession session) throws AMQException { diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java index b5f854eb0e..3afc082c89 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java @@ -27,6 +27,7 @@ import java.io.IOException; import org.apache.qpid.AMQException; import org.apache.qpid.codec.MarkableDataInput; import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; +import org.apache.qpid.transport.ByteBufferSender; public class HeartbeatBody implements AMQBody { @@ -61,6 +62,12 @@ public class HeartbeatBody implements AMQBody { } + @Override + public long writePayload(final ByteBufferSender sender) throws IOException + { + return 0l; + } + public void handle(final int channelId, final AMQVersionAwareProtocolSession session) throws AMQException { diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java index ed1935ca04..9c8d2a8578 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java @@ -23,11 +23,14 @@ package org.apache.qpid.framing; import java.io.DataOutput; import java.io.IOException; import java.io.UnsupportedEncodingException; +import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.util.Arrays; import org.apache.qpid.AMQException; import org.apache.qpid.codec.MarkableDataInput; +import org.apache.qpid.transport.ByteBufferSender; +import org.apache.qpid.util.BytesDataOutput; public class ProtocolInitiation extends AMQDataBlock implements EncodableAMQDataBlock { @@ -88,6 +91,16 @@ public class ProtocolInitiation extends AMQDataBlock implements EncodableAMQData buffer.write(_protocolMinor); } + @Override + public long writePayload(final ByteBufferSender sender) throws IOException + { + byte[] data = new byte[8]; + BytesDataOutput out = new BytesDataOutput(data); + writePayload(out); + sender.send(ByteBuffer.wrap(data)); + return 8l; + } + public boolean equals(Object o) { if (!(o instanceof ProtocolInitiation)) diff --git a/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java b/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java index 0c643f6322..73c8653677 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java @@ -26,9 +26,7 @@ import org.apache.qpid.framing.ContentBody; import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.HeartbeatBody; import org.apache.qpid.framing.MethodRegistry; -import org.apache.qpid.transport.Sender; - -import java.nio.ByteBuffer; +import org.apache.qpid.transport.ByteBufferSender; /** @@ -56,6 +54,6 @@ public interface AMQVersionAwareProtocolSession extends AMQProtocolWriter, Proto public void heartbeatBodyReceived(int channelId, HeartbeatBody body) throws AMQException; - public void setSender(Sender<ByteBuffer> sender); + public void setSender(ByteBufferSender sender); } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java b/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java index 6774d0a45a..f73f6d931a 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java @@ -20,19 +20,18 @@ */ package org.apache.qpid.protocol; -import org.apache.qpid.transport.Receiver; -import org.apache.qpid.transport.Sender; +import java.net.SocketAddress; + +import org.apache.qpid.transport.ByteBufferReceiver; +import org.apache.qpid.transport.ByteBufferSender; import org.apache.qpid.transport.network.NetworkConnection; import org.apache.qpid.transport.network.TransportActivity; -import java.net.SocketAddress; -import java.nio.ByteBuffer; - /** * A ProtocolEngine is a Receiver for java.nio.ByteBuffers. It takes the data passed to it in the received * decodes it and then process the result. */ -public interface ProtocolEngine extends Receiver<java.nio.ByteBuffer>, TransportActivity +public interface ProtocolEngine extends ByteBufferReceiver, TransportActivity { // Returns the remote address of the NetworkDriver SocketAddress getRemoteAddress(); @@ -56,7 +55,8 @@ public interface ProtocolEngine extends Receiver<java.nio.ByteBuffer>, Transport // Called when the NetworkEngine has not read data for the specified period of time (will close the connection) void readerIdle(); + void encryptedTransport(); - public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender); + public void setNetworkConnection(NetworkConnection network, ByteBufferSender sender); -}
\ No newline at end of file +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Binding.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Binding.java index 8418c42189..f703c01567 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Binding.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Binding.java @@ -26,11 +26,11 @@ package org.apache.qpid.transport; * */ -public interface Binding<E,T> +public interface Binding<E> { - E endpoint(Sender<T> sender); + E endpoint(ByteBufferSender sender); - Receiver<T> receiver(E endpoint); + ByteBufferReceiver receiver(E endpoint); } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/protocol/ServerProtocolEngine.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/ByteBufferReceiver.java index 5c6918e87d..1015f061c8 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/protocol/ServerProtocolEngine.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/ByteBufferReceiver.java @@ -18,16 +18,15 @@ * under the License. * */ -package org.apache.qpid.protocol; +package org.apache.qpid.transport; -import javax.security.auth.Subject; +import java.nio.ByteBuffer; -public interface ServerProtocolEngine extends ProtocolEngine +public interface ByteBufferReceiver { - /** - * Gets the connection ID associated with this ProtocolEngine - */ - long getConnectionId(); + void received(ByteBuffer msg); - Subject getSubject(); + void exception(Throwable t); + + void closed(); } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Sender.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/ByteBufferSender.java index 6519702c76..7dcaf61a26 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Sender.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/ByteBufferSender.java @@ -20,20 +20,13 @@ */ package org.apache.qpid.transport; +import java.nio.ByteBuffer; -/** - * Sender - * - */ - -public interface Sender<T> +public interface ByteBufferSender { - void setIdleTimeout(int i); - - void send(T msg); + void send(ByteBuffer msg); void flush(); void close(); - } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java index f8eabef161..7c4e264ade 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java @@ -153,11 +153,9 @@ public class ClientDelegate extends ConnectionDelegate maxFrameSize, actualHeartbeatInterval); - int idleTimeout = (int)(actualHeartbeatInterval * 1000 * heartbeatTimeoutFactor); conn.getNetworkConnection().setMaxReadIdle((int)(actualHeartbeatInterval*heartbeatTimeoutFactor)); conn.getNetworkConnection().setMaxWriteIdle(actualHeartbeatInterval); conn.setMaxFrameSize(maxFrameSize == 0 ? 0xffff : maxFrameSize); - conn.setIdleTimeout(idleTimeout); int channelMax = tune.getChannelMax(); //0 means no implied limit, except available server resources diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java index 331f96d6da..4ae7e8d47a 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java @@ -27,7 +27,6 @@ import static org.apache.qpid.transport.Connection.State.OPEN; import static org.apache.qpid.transport.Connection.State.OPENING; import java.net.SocketAddress; -import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -68,7 +67,7 @@ import org.apache.qpid.util.Strings; */ public class Connection extends ConnectionInvoker - implements Receiver<ProtocolEvent>, Sender<ProtocolEvent> + implements ProtocolEventReceiver, ProtocolEventSender { protected static final Logger log = Logger.get(Connection.class); @@ -120,7 +119,7 @@ public class Connection extends ConnectionInvoker private SessionFactory _sessionFactory = DEFAULT_SESSION_FACTORY; private ConnectionDelegate delegate; - private Sender<ProtocolEvent> sender; + private ProtocolEventSender sender; final private Map<Binary,Session> sessions = new HashMap<Binary,Session>(); final private Map<Integer,Session> channels = new ConcurrentHashMap<Integer,Session>(); @@ -163,15 +162,14 @@ public class Connection extends ConnectionInvoker return Collections.unmodifiableList(listeners); } - public Sender<ProtocolEvent> getSender() + public ProtocolEventSender getSender() { return sender; } - public void setSender(Sender<ProtocolEvent> sender) + public void setSender(ProtocolEventSender sender) { this.sender = sender; - sender.setIdleTimeout(idleTimeout); } protected void setState(State state) @@ -248,7 +246,7 @@ public class Connection extends ConnectionInvoker OutgoingNetworkTransport transport = Transport.getOutgoingTransportInstance(ProtocolVersion.v0_10); final InputHandler inputHandler = new InputHandler(new Assembler(this)); addFrameSizeObserver(inputHandler); - Receiver<ByteBuffer> secureReceiver = securityLayer.receiver(inputHandler); + ByteBufferReceiver secureReceiver = securityLayer.receiver(inputHandler); if(secureReceiver instanceof ConnectionListener) { addConnectionListener((ConnectionListener)secureReceiver); @@ -260,7 +258,7 @@ public class Connection extends ConnectionInvoker setRemoteAddress(_networkConnection.getRemoteAddress()); setLocalAddress(_networkConnection.getLocalAddress()); - final Sender<ByteBuffer> secureSender = securityLayer.sender(_networkConnection.getSender()); + final ByteBufferSender secureSender = securityLayer.sender(_networkConnection.getSender()); if(secureSender instanceof ConnectionListener) { addConnectionListener((ConnectionListener)secureSender); @@ -425,7 +423,7 @@ public class Connection extends ConnectionInvoker { log.debug("SEND: [%s] %s", this, event); } - Sender<ProtocolEvent> s = sender; + ProtocolEventSender s = sender; if (s == null) { throw new ConnectionException("connection closed"); @@ -439,7 +437,7 @@ public class Connection extends ConnectionInvoker { log.debug("FLUSH: [%s]", this); } - final Sender<ProtocolEvent> theSender = sender; + final ProtocolEventSender theSender = sender; if(theSender != null) { theSender.flush(); @@ -631,6 +629,12 @@ public class Connection extends ConnectionInvoker close(ConnectionCloseCode.CONNECTION_FORCED, "The connection was closed using the broker's management interface."); } + + protected void sendConnectionClose(ConnectionCloseCode replyCode, String replyText, Option ... _options) + { + connectionClose(replyCode, replyText, _options); + } + public void close(ConnectionCloseCode replyCode, String replyText, Option ... _options) { synchronized (lock) @@ -690,20 +694,6 @@ public class Connection extends ConnectionInvoker } } - public void setIdleTimeout(int i) - { - idleTimeout = i; - if (sender != null) - { - sender.setIdleTimeout(i); - } - } - - public int getIdleTimeout() - { - return idleTimeout; - } - public String getUserID() { return userID; diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkEventReceiver.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkEventReceiver.java new file mode 100644 index 0000000000..8b9c3f4f83 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkEventReceiver.java @@ -0,0 +1,32 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.transport; + +import org.apache.qpid.transport.network.NetworkEvent; + +public interface NetworkEventReceiver +{ + void received(NetworkEvent msg); + + void exception(Throwable t); + + void closed(); +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Receiver.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/ProtocolEventReceiver.java index 2a994580dc..e4ab540ce9 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Receiver.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/ProtocolEventReceiver.java @@ -20,19 +20,11 @@ */ package org.apache.qpid.transport; - -/** - * Receiver - * - */ - -public interface Receiver<T> +public interface ProtocolEventReceiver { - - void received(T msg); + void received(ProtocolEvent msg); void exception(Throwable t); void closed(); - } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/ProtocolEventSender.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/ProtocolEventSender.java new file mode 100644 index 0000000000..418f31b42a --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/ProtocolEventSender.java @@ -0,0 +1,30 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.transport; + +public interface ProtocolEventSender +{ + void send(ProtocolEvent msg); + + void flush(); + + void close(); +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java index 82a677b8f7..f8fd286f17 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java @@ -126,8 +126,11 @@ public class ServerDelegate extends ConnectionDelegate protected void connectionAuthFailed(final Connection conn, Exception e) { - conn.exception(e); - conn.connectionClose(ConnectionCloseCode.CONNECTION_FORCED, e.getMessage()); + if (e != null) + { + conn.exception(e); + } + conn.connectionClose(ConnectionCloseCode.CONNECTION_FORCED, e == null ? "Authentication failed" : e.getMessage()); } protected void connectionAuthContinue(final Connection conn, byte[] challenge) diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/AbstractEncoder.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/AbstractEncoder.java index 2b93697bfc..070621db9b 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/AbstractEncoder.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/AbstractEncoder.java @@ -20,12 +20,6 @@ */ package org.apache.qpid.transport.codec; -import org.apache.qpid.transport.Range; -import org.apache.qpid.transport.RangeSet; -import org.apache.qpid.transport.Struct; -import org.apache.qpid.transport.Type; - -import org.apache.qpid.transport.Xid; import static org.apache.qpid.transport.util.Functions.lsb; import java.io.UnsupportedEncodingException; @@ -36,6 +30,12 @@ import java.util.List; import java.util.Map; import java.util.UUID; +import org.apache.qpid.transport.Range; +import org.apache.qpid.transport.RangeSet; +import org.apache.qpid.transport.Struct; +import org.apache.qpid.transport.Type; +import org.apache.qpid.transport.Xid; + /** * AbstractEncoder @@ -43,7 +43,7 @@ import java.util.UUID; * @author Rafael H. Schloming */ -abstract class AbstractEncoder implements Encoder +public abstract class AbstractEncoder implements Encoder { private static Map<Class<?>,Type> ENCODINGS = new HashMap<Class<?>,Type>(); diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/BBEncoder.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/BBEncoder.java index d9150bed65..407df71824 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/BBEncoder.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/BBEncoder.java @@ -360,8 +360,4 @@ public final class BBEncoder extends AbstractEncoder } } - public void writeMagicNumber() - { - out.put("AM2".getBytes()); - } -}
\ No newline at end of file +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/Encoder.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/Encoder.java index a9eea13104..b5ab29cdcf 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/Encoder.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/Encoder.java @@ -20,13 +20,14 @@ */ package org.apache.qpid.transport.codec; -import org.apache.qpid.transport.RangeSet; -import org.apache.qpid.transport.Struct; - +import java.nio.ByteBuffer; import java.util.List; import java.util.Map; import java.util.UUID; +import org.apache.qpid.transport.RangeSet; +import org.apache.qpid.transport.Struct; + /** * Encoder interface. @@ -274,9 +275,10 @@ public interface Encoder * @param bytes the bytes array to be encoded. */ void writeBin128(byte [] bytes); - - /** - * Encodes the AMQP magic number. - */ - void writeMagicNumber(); -}
\ No newline at end of file + + int position(); + + ByteBuffer underlyingBuffer(); + + void init(); +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java index a80b988cea..a7e96167c2 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java @@ -20,28 +20,29 @@ */ package org.apache.qpid.transport.network; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + import org.apache.qpid.transport.DeliveryProperties; import org.apache.qpid.transport.Header; import org.apache.qpid.transport.MessageProperties; import org.apache.qpid.transport.Method; +import org.apache.qpid.transport.NetworkEventReceiver; import org.apache.qpid.transport.ProtocolError; import org.apache.qpid.transport.ProtocolEvent; +import org.apache.qpid.transport.ProtocolEventReceiver; import org.apache.qpid.transport.ProtocolHeader; -import org.apache.qpid.transport.Receiver; import org.apache.qpid.transport.Struct; import org.apache.qpid.transport.codec.BBDecoder; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - /** * Assembler * */ -public class Assembler implements Receiver<NetworkEvent>, NetworkDelegate +public class Assembler implements NetworkEventReceiver, NetworkDelegate { // Use a small array to store incomplete Methods for low-value channels, instead of allocating a huge // array or always boxing the channelId and looking it up in the map. This value must be of the form 2^X - 1. @@ -49,7 +50,7 @@ public class Assembler implements Receiver<NetworkEvent>, NetworkDelegate private final Method[] _incompleteMethodArray = new Method[ARRAY_SIZE + 1]; private final Map<Integer, Method> _incompleteMethodMap = new HashMap<Integer, Method>(); - private final Receiver<ProtocolEvent> receiver; + private final ProtocolEventReceiver receiver; private final Map<Integer,List<Frame>> segments; private static final ThreadLocal<BBDecoder> _decoder = new ThreadLocal<BBDecoder>() { @@ -59,7 +60,7 @@ public class Assembler implements Receiver<NetworkEvent>, NetworkDelegate } }; - public Assembler(Receiver<ProtocolEvent> receiver) + public Assembler(ProtocolEventReceiver receiver) { this.receiver = receiver; segments = new HashMap<Integer,List<Frame>>(); diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/ConnectionBinding.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/ConnectionBinding.java index 26e8f1850b..5463cd2587 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/ConnectionBinding.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/ConnectionBinding.java @@ -20,15 +20,13 @@ */ package org.apache.qpid.transport.network; -import java.nio.ByteBuffer; - import org.apache.qpid.transport.Binding; +import org.apache.qpid.transport.ByteBufferReceiver; +import org.apache.qpid.transport.ByteBufferSender; import org.apache.qpid.transport.Connection; import org.apache.qpid.transport.ConnectionDelegate; import org.apache.qpid.transport.ConnectionListener; import org.apache.qpid.transport.Constant; -import org.apache.qpid.transport.Receiver; -import org.apache.qpid.transport.Sender; import org.apache.qpid.transport.network.security.sasl.SASLReceiver; import org.apache.qpid.transport.network.security.sasl.SASLSender; @@ -38,10 +36,10 @@ import org.apache.qpid.transport.network.security.sasl.SASLSender; */ public abstract class ConnectionBinding - implements Binding<Connection,ByteBuffer> + implements Binding<Connection> { - public static Binding<Connection,ByteBuffer> get(final Connection connection) + public static Binding<Connection> get(final Connection connection) { return new ConnectionBinding() { @@ -52,7 +50,7 @@ public abstract class ConnectionBinding }; } - public static Binding<Connection,ByteBuffer> get(final ConnectionDelegate delegate) + public static Binding<Connection> get(final ConnectionDelegate delegate) { return new ConnectionBinding() { @@ -69,7 +67,7 @@ public abstract class ConnectionBinding public abstract Connection connection(); - public Connection endpoint(Sender<ByteBuffer> sender) + public Connection endpoint(ByteBufferSender sender) { Connection conn = connection(); @@ -87,7 +85,7 @@ public abstract class ConnectionBinding return conn; } - public Receiver<ByteBuffer> receiver(Connection conn) + public ByteBufferReceiver receiver(Connection conn) { final InputHandler inputHandler = new InputHandler(new Assembler(conn)); conn.addFrameSizeObserver(inputHandler); diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java index a804cb2f9d..c45b2049a1 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java @@ -30,27 +30,29 @@ import static org.apache.qpid.transport.network.Frame.LAST_SEG; import java.nio.ByteBuffer; import java.nio.ByteOrder; +import org.apache.qpid.transport.ByteBufferSender; import org.apache.qpid.transport.FrameSizeObserver; import org.apache.qpid.transport.Header; import org.apache.qpid.transport.Method; import org.apache.qpid.transport.ProtocolDelegate; import org.apache.qpid.transport.ProtocolError; import org.apache.qpid.transport.ProtocolEvent; +import org.apache.qpid.transport.ProtocolEventSender; import org.apache.qpid.transport.ProtocolHeader; import org.apache.qpid.transport.SegmentType; -import org.apache.qpid.transport.Sender; import org.apache.qpid.transport.Struct; import org.apache.qpid.transport.codec.BBEncoder; +import org.apache.qpid.transport.codec.Encoder; /** * Disassembler */ -public final class Disassembler implements Sender<ProtocolEvent>, ProtocolDelegate<Void>, FrameSizeObserver +public final class Disassembler implements ProtocolEventSender, ProtocolDelegate<Void>, FrameSizeObserver { - private final Sender<ByteBuffer> sender; + private final ByteBufferSender sender; private int maxPayload; private final Object sendlock = new Object(); - private final static ThreadLocal<BBEncoder> _encoder = new ThreadLocal<BBEncoder>() + private final static ThreadLocal<Encoder> _encoder = new ThreadLocal<Encoder>() { public BBEncoder initialValue() { @@ -58,7 +60,7 @@ public final class Disassembler implements Sender<ProtocolEvent>, ProtocolDelega } }; - public Disassembler(Sender<ByteBuffer> sender, int maxFrame) + public Disassembler(ByteBufferSender sender, int maxFrame) { this.sender = sender; if (maxFrame <= HEADER_SIZE || maxFrame >= 64*1024) @@ -174,7 +176,7 @@ public final class Disassembler implements Sender<ProtocolEvent>, ProtocolDelega private void method(Method method, SegmentType type) { - BBEncoder enc = _encoder.get(); + Encoder enc = _encoder.get(); enc.init(); enc.writeUint16(method.getEncodedType()); if (type == SegmentType.COMMAND) @@ -251,11 +253,6 @@ public final class Disassembler implements Sender<ProtocolEvent>, ProtocolDelega throw new IllegalArgumentException(String.valueOf(error)); } - public void setIdleTimeout(int i) - { - sender.setIdleTimeout(i); - } - @Override public void setMaxFrameSize(final int maxFrame) { diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/IncomingNetworkTransport.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/IncomingNetworkTransport.java index e0cd9cac1a..f378c54026 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/IncomingNetworkTransport.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/IncomingNetworkTransport.java @@ -20,6 +20,8 @@ */ package org.apache.qpid.transport.network; +import java.util.Set; + import javax.net.ssl.SSLContext; import org.apache.qpid.protocol.ProtocolEngineFactory; @@ -29,7 +31,8 @@ public interface IncomingNetworkTransport extends NetworkTransport { public void accept(NetworkTransportConfiguration config, ProtocolEngineFactory factory, - SSLContext sslContext); + SSLContext sslContext, + final Set<TransportEncryption> encryptionSet); public int getAcceptingPort(); } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/InputHandler.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/InputHandler.java index 758c2e1eda..a58bed5877 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/InputHandler.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/InputHandler.java @@ -29,11 +29,12 @@ import static org.apache.qpid.transport.util.Functions.str; import java.nio.ByteBuffer; import java.nio.ByteOrder; +import org.apache.qpid.transport.ByteBufferReceiver; import org.apache.qpid.transport.Constant; import org.apache.qpid.transport.FrameSizeObserver; +import org.apache.qpid.transport.NetworkEventReceiver; import org.apache.qpid.transport.ProtocolError; import org.apache.qpid.transport.ProtocolHeader; -import org.apache.qpid.transport.Receiver; import org.apache.qpid.transport.SegmentType; @@ -43,7 +44,7 @@ import org.apache.qpid.transport.SegmentType; * @author Rafael H. Schloming */ -public class InputHandler implements Receiver<ByteBuffer>, FrameSizeObserver +public class InputHandler implements ByteBufferReceiver, FrameSizeObserver { private int _maxFrameSize = Constant.MIN_MAX_FRAME_SIZE; @@ -56,7 +57,7 @@ public class InputHandler implements Receiver<ByteBuffer>, FrameSizeObserver ERROR } - private final Receiver<NetworkEvent> receiver; + private final NetworkEventReceiver receiver; private State state; private ByteBuffer input = null; private int needed; @@ -66,7 +67,7 @@ public class InputHandler implements Receiver<ByteBuffer>, FrameSizeObserver private byte track; private int channel; - public InputHandler(Receiver<NetworkEvent> receiver, State state) + public InputHandler(NetworkEventReceiver receiver, State state) { this.receiver = receiver; this.state = state; @@ -82,7 +83,7 @@ public class InputHandler implements Receiver<ByteBuffer>, FrameSizeObserver } } - public InputHandler(Receiver<NetworkEvent> receiver) + public InputHandler(NetworkEventReceiver receiver) { this(receiver, PROTO_HDR); } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/NetworkConnection.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/NetworkConnection.java index 2810e7a9e1..bef266f214 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/NetworkConnection.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/NetworkConnection.java @@ -21,13 +21,13 @@ package org.apache.qpid.transport.network; import java.net.SocketAddress; -import java.nio.ByteBuffer; import java.security.Principal; -import org.apache.qpid.transport.Sender; + +import org.apache.qpid.transport.ByteBufferSender; public interface NetworkConnection { - Sender<ByteBuffer> getSender(); + ByteBufferSender getSender(); void start(); diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/OutgoingNetworkTransport.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/OutgoingNetworkTransport.java index 45231aa05d..f2735f1800 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/OutgoingNetworkTransport.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/OutgoingNetworkTransport.java @@ -20,16 +20,14 @@ */ package org.apache.qpid.transport.network; +import org.apache.qpid.transport.ByteBufferReceiver; import org.apache.qpid.transport.ConnectionSettings; -import org.apache.qpid.transport.Receiver; - -import java.nio.ByteBuffer; public interface OutgoingNetworkTransport extends NetworkTransport { public NetworkConnection getConnection(); public NetworkConnection connect(ConnectionSettings settings, - Receiver<ByteBuffer> delegate, + ByteBufferReceiver delegate, TransportActivity transportActivity); -}
\ No newline at end of file +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/TransportEncryption.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/TransportEncryption.java new file mode 100644 index 0000000000..b3f1f1c7dd --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/TransportEncryption.java @@ -0,0 +1,26 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.transport.network; + +public enum TransportEncryption +{ + NONE, TLS +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/AbstractNetworkTransport.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/AbstractNetworkTransport.java new file mode 100644 index 0000000000..8d19c5a2ce --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/AbstractNetworkTransport.java @@ -0,0 +1,342 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.transport.network.io; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.ServerSocket; +import java.net.Socket; +import java.net.SocketException; +import java.util.Set; + +import javax.net.ssl.SSLContext; +import javax.net.ssl.SSLServerSocket; +import javax.net.ssl.SSLServerSocketFactory; + +import org.slf4j.LoggerFactory; + +import org.apache.qpid.configuration.CommonProperties; +import org.apache.qpid.protocol.ProtocolEngine; +import org.apache.qpid.protocol.ProtocolEngineFactory; +import org.apache.qpid.transport.ByteBufferReceiver; +import org.apache.qpid.transport.ConnectionSettings; +import org.apache.qpid.transport.NetworkTransportConfiguration; +import org.apache.qpid.transport.TransportException; +import org.apache.qpid.transport.network.IncomingNetworkTransport; +import org.apache.qpid.transport.network.NetworkConnection; +import org.apache.qpid.transport.network.OutgoingNetworkTransport; +import org.apache.qpid.transport.network.TransportActivity; +import org.apache.qpid.transport.network.TransportEncryption; +import org.apache.qpid.transport.network.security.ssl.SSLUtil; + +// TODO we are no longer using the IncomingNetworkTransport +public abstract class AbstractNetworkTransport implements OutgoingNetworkTransport, IncomingNetworkTransport +{ + private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(AbstractNetworkTransport.class); + private static final int TIMEOUT = Integer.getInteger(CommonProperties.IO_NETWORK_TRANSPORT_TIMEOUT_PROP_NAME, + CommonProperties.IO_NETWORK_TRANSPORT_TIMEOUT_DEFAULT); + private static final int HANSHAKE_TIMEOUT = Integer.getInteger(CommonProperties.HANDSHAKE_TIMEOUT_PROP_NAME , + CommonProperties.HANDSHAKE_TIMEOUT_DEFAULT); + private Socket _socket; + private NetworkConnection _connection; + private AcceptingThread _acceptor; + + public NetworkConnection connect(ConnectionSettings settings, + ByteBufferReceiver delegate, + TransportActivity transportActivity) + { + int sendBufferSize = settings.getWriteBufferSize(); + int receiveBufferSize = settings.getReadBufferSize(); + + try + { + _socket = new Socket(); + _socket.setReuseAddress(true); + _socket.setTcpNoDelay(settings.isTcpNodelay()); + _socket.setSendBufferSize(sendBufferSize); + _socket.setReceiveBufferSize(receiveBufferSize); + + if(LOGGER.isDebugEnabled()) + { + LOGGER.debug("SO_RCVBUF : " + _socket.getReceiveBufferSize()); + LOGGER.debug("SO_SNDBUF : " + _socket.getSendBufferSize()); + LOGGER.debug("TCP_NODELAY : " + _socket.getTcpNoDelay()); + } + + InetAddress address = InetAddress.getByName(settings.getHost()); + + _socket.connect(new InetSocketAddress(address, settings.getPort()), settings.getConnectTimeout()); + } + catch (SocketException e) + { + throw new TransportException("Error connecting to broker", e); + } + catch (IOException e) + { + throw new TransportException("Error connecting to broker", e); + } + + try + { + IdleTimeoutTicker ticker = new IdleTimeoutTicker(transportActivity, TIMEOUT); + _connection = createNetworkConnection(_socket, delegate, sendBufferSize, receiveBufferSize, TIMEOUT, ticker); + ticker.setConnection(_connection); + _connection.start(); + } + catch(Exception e) + { + try + { + _socket.close(); + } + catch(IOException ioe) + { + //ignored, throw based on original exception + } + + throw new TransportException("Error creating network connection", e); + } + + return _connection; + } + + public void close() + { + if(_connection != null) + { + _connection.close(); + } + if(_acceptor != null) + { + _acceptor.close(); + } + } + + public NetworkConnection getConnection() + { + return _connection; + } + + public void accept(NetworkTransportConfiguration config, + ProtocolEngineFactory factory, + SSLContext sslContext, final Set<TransportEncryption> encryptionSet) + { + try + { + _acceptor = new AcceptingThread(config, factory, sslContext); + _acceptor.setDaemon(false); + _acceptor.start(); + } + catch (IOException e) + { + throw new TransportException("Failed to start AMQP on port : " + config, e); + } + } + + public int getAcceptingPort() + { + return _acceptor == null ? -1 : _acceptor.getPort(); + } + + protected abstract NetworkConnection createNetworkConnection(Socket socket, + ByteBufferReceiver engine, + Integer sendBufferSize, + Integer receiveBufferSize, + int timeout, + IdleTimeoutTicker ticker); + + private class AcceptingThread extends Thread + { + private volatile boolean _closed = false; + private NetworkTransportConfiguration _config; + private ProtocolEngineFactory _factory; + private SSLContext _sslContext; + private ServerSocket _serverSocket; + private int _timeout; + + private AcceptingThread(NetworkTransportConfiguration config, + ProtocolEngineFactory factory, + SSLContext sslContext) throws IOException + { + _config = config; + _factory = factory; + _sslContext = sslContext; + _timeout = TIMEOUT; + + InetSocketAddress address = config.getAddress(); + + if(sslContext == null) + { + _serverSocket = new ServerSocket(); + } + else + { + SSLServerSocketFactory socketFactory = _sslContext.getServerSocketFactory(); + _serverSocket = socketFactory.createServerSocket(); + + SSLServerSocket sslServerSocket = (SSLServerSocket) _serverSocket; + + SSLUtil.removeSSLv3Support(sslServerSocket); + + if(config.needClientAuth()) + { + sslServerSocket.setNeedClientAuth(true); + } + else if(config.wantClientAuth()) + { + sslServerSocket.setWantClientAuth(true); + } + + } + + _serverSocket.setReuseAddress(true); + _serverSocket.bind(address); + } + + + /** + Close the underlying ServerSocket if it has not already been closed. + */ + public void close() + { + LOGGER.debug("Shutting down the Acceptor"); + _closed = true; + + if (!_serverSocket.isClosed()) + { + try + { + _serverSocket.close(); + } + catch (IOException e) + { + throw new TransportException(e); + } + } + } + + private int getPort() + { + return _serverSocket.getLocalPort(); + } + + @Override + public void run() + { + try + { + while (!_closed) + { + Socket socket = null; + try + { + socket = _serverSocket.accept(); + + ProtocolEngine engine = _factory.newProtocolEngine(socket.getRemoteSocketAddress()); + + if(engine != null) + { + socket.setTcpNoDelay(_config.getTcpNoDelay()); + socket.setSoTimeout(1000 * HANSHAKE_TIMEOUT); + + final Integer sendBufferSize = _config.getSendBufferSize(); + final Integer receiveBufferSize = _config.getReceiveBufferSize(); + + socket.setSendBufferSize(sendBufferSize); + socket.setReceiveBufferSize(receiveBufferSize); + + + final IdleTimeoutTicker ticker = new IdleTimeoutTicker(engine, TIMEOUT); + + NetworkConnection connection = + createNetworkConnection(socket, + engine, + sendBufferSize, + receiveBufferSize, + _timeout, + ticker); + + connection.setMaxReadIdle(HANSHAKE_TIMEOUT); + + ticker.setConnection(connection); + + engine.setNetworkConnection(connection, connection.getSender()); + + connection.start(); + } + else + { + socket.close(); + } + } + catch(RuntimeException e) + { + LOGGER.error("Error in Acceptor thread on address " + _config.getAddress(), e); + closeSocketIfNecessary(socket); + } + catch(IOException e) + { + if(!_closed) + { + LOGGER.error("Error in Acceptor thread on address " + _config.getAddress(), e); + closeSocketIfNecessary(socket); + try + { + //Delay to avoid tight spinning the loop during issues such as too many open files + Thread.sleep(1000); + } + catch (InterruptedException ie) + { + LOGGER.debug("Stopping acceptor due to interrupt request"); + _closed = true; + } + } + } + } + } + finally + { + if(LOGGER.isDebugEnabled()) + { + LOGGER.debug("Acceptor exiting, no new connections will be accepted on address " + + _config.getAddress()); + } + } + } + + private void closeSocketIfNecessary(final Socket socket) + { + if(socket != null) + { + try + { + socket.close(); + } + catch (IOException e) + { + LOGGER.debug("Exception while closing socket", e); + } + } + } + + } +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IdleTimeoutTicker.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IdleTimeoutTicker.java index 54a2a360bb..71704fca3a 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IdleTimeoutTicker.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IdleTimeoutTicker.java @@ -25,7 +25,7 @@ import org.apache.qpid.transport.network.NetworkConnection; import org.apache.qpid.transport.network.Ticker; import org.apache.qpid.transport.network.TransportActivity; -class IdleTimeoutTicker implements Ticker +public class IdleTimeoutTicker implements Ticker { private final TransportActivity _transport; private final int _defaultTimeout; diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java index 5c3124c2ec..5008849ef3 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java @@ -22,7 +22,6 @@ package org.apache.qpid.transport.network.io; import java.net.Socket; import java.net.SocketAddress; -import java.nio.ByteBuffer; import java.security.Principal; import javax.net.ssl.SSLPeerUnverifiedException; @@ -31,8 +30,8 @@ import javax.net.ssl.SSLSocket; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.qpid.transport.Receiver; -import org.apache.qpid.transport.Sender; +import org.apache.qpid.transport.ByteBufferReceiver; +import org.apache.qpid.transport.ByteBufferSender; import org.apache.qpid.transport.network.NetworkConnection; import org.apache.qpid.transport.network.Ticker; @@ -49,7 +48,7 @@ public class IoNetworkConnection implements NetworkConnection private boolean _principalChecked; private final Object _lock = new Object(); - public IoNetworkConnection(Socket socket, Receiver<ByteBuffer> delegate, + public IoNetworkConnection(Socket socket, ByteBufferReceiver delegate, int sendBufferSize, int receiveBufferSize, long timeout, Ticker ticker) { _socket = socket; @@ -70,7 +69,7 @@ public class IoNetworkConnection implements NetworkConnection _ioReceiver.initiate(); } - public Sender<ByteBuffer> getSender() + public ByteBufferSender getSender() { return _ioSender; } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java index b7998ab8d9..ccab1d93cf 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java @@ -20,313 +20,24 @@ */ package org.apache.qpid.transport.network.io; -import java.io.IOException; -import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.net.ServerSocket; import java.net.Socket; -import java.net.SocketException; -import java.nio.ByteBuffer; -import javax.net.ssl.SSLContext; -import javax.net.ssl.SSLServerSocket; -import javax.net.ssl.SSLServerSocketFactory; +import org.apache.qpid.transport.ByteBufferReceiver; -import org.slf4j.LoggerFactory; - -import org.apache.qpid.configuration.CommonProperties; -import org.apache.qpid.protocol.ProtocolEngine; -import org.apache.qpid.protocol.ProtocolEngineFactory; -import org.apache.qpid.transport.ConnectionSettings; -import org.apache.qpid.transport.NetworkTransportConfiguration; -import org.apache.qpid.transport.Receiver; -import org.apache.qpid.transport.TransportException; -import org.apache.qpid.transport.network.IncomingNetworkTransport; -import org.apache.qpid.transport.network.NetworkConnection; -import org.apache.qpid.transport.network.OutgoingNetworkTransport; -import org.apache.qpid.transport.network.TransportActivity; -import org.apache.qpid.transport.network.security.ssl.SSLUtil; - -public class IoNetworkTransport implements OutgoingNetworkTransport, IncomingNetworkTransport +public class IoNetworkTransport extends AbstractNetworkTransport { - private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(IoNetworkTransport.class); - private static final int TIMEOUT = Integer.getInteger(CommonProperties.IO_NETWORK_TRANSPORT_TIMEOUT_PROP_NAME, - CommonProperties.IO_NETWORK_TRANSPORT_TIMEOUT_DEFAULT); - private static final int HANSHAKE_TIMEOUT = Integer.getInteger(CommonProperties.HANDSHAKE_TIMEOUT_PROP_NAME , - CommonProperties.HANDSHAKE_TIMEOUT_DEFAULT); - - - private Socket _socket; - private IoNetworkConnection _connection; - private AcceptingThread _acceptor; - - public NetworkConnection connect(ConnectionSettings settings, - Receiver<ByteBuffer> delegate, - TransportActivity transportActivity) - { - int sendBufferSize = settings.getWriteBufferSize(); - int receiveBufferSize = settings.getReadBufferSize(); - - try - { - _socket = new Socket(); - _socket.setReuseAddress(true); - _socket.setTcpNoDelay(settings.isTcpNodelay()); - _socket.setSendBufferSize(sendBufferSize); - _socket.setReceiveBufferSize(receiveBufferSize); - if(LOGGER.isDebugEnabled()) - { - LOGGER.debug("SO_RCVBUF : " + _socket.getReceiveBufferSize()); - LOGGER.debug("SO_SNDBUF : " + _socket.getSendBufferSize()); - LOGGER.debug("TCP_NODELAY : " + _socket.getTcpNoDelay()); - } - - InetAddress address = InetAddress.getByName(settings.getHost()); - - _socket.connect(new InetSocketAddress(address, settings.getPort()), settings.getConnectTimeout()); - } - catch (SocketException e) - { - throw new TransportException("Error connecting to broker", e); - } - catch (IOException e) - { - throw new TransportException("Error connecting to broker", e); - } - - try - { - IdleTimeoutTicker ticker = new IdleTimeoutTicker(transportActivity, TIMEOUT); - _connection = new IoNetworkConnection(_socket, delegate, sendBufferSize, receiveBufferSize, TIMEOUT, ticker); - ticker.setConnection(_connection); - _connection.start(); - } - catch(Exception e) - { - try - { - _socket.close(); - } - catch(IOException ioe) - { - //ignored, throw based on original exception - } - - throw new TransportException("Error creating network connection", e); - } - - return _connection; - } - - public void close() - { - if(_connection != null) - { - _connection.close(); - } - if(_acceptor != null) - { - _acceptor.close(); - } - } - - public NetworkConnection getConnection() - { - return _connection; - } - - public void accept(NetworkTransportConfiguration config, - ProtocolEngineFactory factory, - SSLContext sslContext) - { - try - { - _acceptor = new AcceptingThread(config, factory, sslContext); - _acceptor.setName(String.format("IoNetworkAcceptor - %s", config.getAddress())); - _acceptor.setDaemon(false); - _acceptor.start(); - } - catch (IOException e) - { - throw new TransportException("Failed to start AMQP on port : " + config, e); - } - } - - public int getAcceptingPort() - { - return _acceptor == null ? -1 : _acceptor.getPort(); - } - private class AcceptingThread extends Thread + @Override + protected IoNetworkConnection createNetworkConnection(final Socket socket, + final ByteBufferReceiver engine, + final Integer sendBufferSize, + final Integer receiveBufferSize, + final int timeout, + final IdleTimeoutTicker ticker) { - private volatile boolean _closed = false; - private NetworkTransportConfiguration _config; - private ProtocolEngineFactory _factory; - private SSLContext _sslContext; - private ServerSocket _serverSocket; - private int _timeout; - - private AcceptingThread(NetworkTransportConfiguration config, - ProtocolEngineFactory factory, - SSLContext sslContext) throws IOException - { - _config = config; - _factory = factory; - _sslContext = sslContext; - _timeout = TIMEOUT; - - InetSocketAddress address = config.getAddress(); - - if(sslContext == null) - { - _serverSocket = new ServerSocket(); - } - else - { - SSLServerSocketFactory socketFactory = _sslContext.getServerSocketFactory(); - _serverSocket = socketFactory.createServerSocket(); - - SSLServerSocket sslServerSocket = (SSLServerSocket) _serverSocket; - - SSLUtil.removeSSLv3Support(sslServerSocket); - SSLUtil.updateEnabledCipherSuites(sslServerSocket, config.getEnabledCipherSuites(), config.getDisabledCipherSuites()); - - if(config.needClientAuth()) - { - sslServerSocket.setNeedClientAuth(true); - } - else if(config.wantClientAuth()) - { - sslServerSocket.setWantClientAuth(true); - } - - } - - _serverSocket.setReuseAddress(true); - _serverSocket.bind(address); - } - - - /** - Close the underlying ServerSocket if it has not already been closed. - */ - public void close() - { - LOGGER.debug("Shutting down the Acceptor"); - _closed = true; - - if (!_serverSocket.isClosed()) - { - try - { - _serverSocket.close(); - } - catch (IOException e) - { - throw new TransportException(e); - } - } - } - - private int getPort() - { - return _serverSocket.getLocalPort(); - } - - @Override - public void run() - { - try - { - while (!_closed) - { - Socket socket = null; - try - { - socket = _serverSocket.accept(); - - ProtocolEngine engine = _factory.newProtocolEngine(socket.getRemoteSocketAddress()); - - if(engine != null) - { - socket.setTcpNoDelay(_config.getTcpNoDelay()); - socket.setSoTimeout(1000 * HANSHAKE_TIMEOUT); - - final Integer sendBufferSize = _config.getSendBufferSize(); - final Integer receiveBufferSize = _config.getReceiveBufferSize(); - - socket.setSendBufferSize(sendBufferSize); - socket.setReceiveBufferSize(receiveBufferSize); - - - final IdleTimeoutTicker ticker = new IdleTimeoutTicker(engine, TIMEOUT); - NetworkConnection connection = - new IoNetworkConnection(socket, engine, sendBufferSize, receiveBufferSize, _timeout, - ticker); - - connection.setMaxReadIdle(HANSHAKE_TIMEOUT); - - ticker.setConnection(connection); - - engine.setNetworkConnection(connection, connection.getSender()); - - connection.start(); - } - else - { - socket.close(); - } - } - catch(RuntimeException e) - { - LOGGER.error("Error in Acceptor thread on address " + _config.getAddress(), e); - closeSocketIfNecessary(socket); - } - catch(IOException e) - { - if(!_closed) - { - LOGGER.error("Error in Acceptor thread on address " + _config.getAddress(), e); - closeSocketIfNecessary(socket); - try - { - //Delay to avoid tight spinning the loop during issues such as too many open files - Thread.sleep(1000); - } - catch (InterruptedException ie) - { - LOGGER.debug("Stopping acceptor due to interrupt request"); - _closed = true; - } - } - } - } - } - finally - { - if(LOGGER.isDebugEnabled()) - { - LOGGER.debug("Acceptor exiting, no new connections will be accepted on address " + _config.getAddress()); - } - } - } - - private void closeSocketIfNecessary(final Socket socket) - { - if(socket != null) - { - try - { - socket.close(); - } - catch (IOException e) - { - LOGGER.debug("Exception while closing socket", e); - } - } - } - + return new IoNetworkConnection(socket, engine, sendBufferSize, receiveBufferSize, timeout, + ticker); } } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java index b52b59aa15..790583e92b 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java @@ -31,7 +31,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import javax.net.ssl.SSLSocket; import org.apache.qpid.thread.Threading; -import org.apache.qpid.transport.Receiver; +import org.apache.qpid.transport.ByteBufferReceiver; import org.apache.qpid.transport.TransportException; import org.apache.qpid.transport.network.Ticker; import org.apache.qpid.transport.util.Logger; @@ -47,7 +47,7 @@ final class IoReceiver implements Runnable private static final Logger log = Logger.get(IoReceiver.class); - private final Receiver<ByteBuffer> receiver; + private final ByteBufferReceiver receiver; private final int bufferSize; private final Socket socket; private final long timeout; @@ -61,7 +61,7 @@ final class IoReceiver implements Runnable shutdownBroken = SystemUtils.isWindows(); } - public IoReceiver(Socket socket, Receiver<ByteBuffer> receiver, int bufferSize, long timeout) + public IoReceiver(Socket socket, ByteBufferReceiver receiver, int bufferSize, long timeout) { this.receiver = receiver; this.bufferSize = bufferSize; @@ -78,7 +78,7 @@ final class IoReceiver implements Runnable throw new RuntimeException("Error creating IOReceiver thread",e); } receiverThread.setDaemon(true); - receiverThread.setName(String.format("IoReceiver - %s", socket.getRemoteSocketAddress())); + receiverThread.setName(String.format("IoReceiver-%s", socket.getRemoteSocketAddress())); } public void initiate() diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java index 25222e5285..cd01cddb05 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java @@ -29,7 +29,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import javax.net.ssl.SSLSocket; import org.apache.qpid.thread.Threading; -import org.apache.qpid.transport.Sender; +import org.apache.qpid.transport.ByteBufferSender; import org.apache.qpid.transport.SenderClosedException; import org.apache.qpid.transport.SenderException; import org.apache.qpid.transport.TransportException; @@ -37,7 +37,7 @@ import org.apache.qpid.transport.util.Logger; import org.apache.qpid.util.SystemUtils; -public final class IoSender implements Runnable, Sender<ByteBuffer> +public final class IoSender implements Runnable, ByteBufferSender { private static final Logger log = Logger.get(IoSender.class); @@ -97,7 +97,7 @@ public final class IoSender implements Runnable, Sender<ByteBuffer> } senderThread.setDaemon(true); - senderThread.setName(String.format("IoSender - %s", _remoteSocketAddress)); + senderThread.setName(String.format("IoSender-%s", _remoteSocketAddress)); } public void initiate() @@ -337,18 +337,6 @@ public final class IoSender implements Runnable, Sender<ByteBuffer> } } - public void setIdleTimeout(int i) - { - try - { - socket.setSoTimeout(i); - } - catch (Exception e) - { - throw new SenderException(e); - } - } - public void setReceiver(IoReceiver receiver) { _receiver = receiver; diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayer.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayer.java index 51ef266ee9..271135f411 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayer.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayer.java @@ -20,16 +20,14 @@ */ package org.apache.qpid.transport.network.security; -import org.apache.qpid.transport.Receiver; -import org.apache.qpid.transport.Sender; - -import java.nio.ByteBuffer; +import org.apache.qpid.transport.ByteBufferReceiver; +import org.apache.qpid.transport.ByteBufferSender; public interface SecurityLayer { - public Sender<ByteBuffer> sender(Sender<ByteBuffer> delegate); - public Receiver<ByteBuffer> receiver(Receiver<ByteBuffer> delegate); + public ByteBufferSender sender(ByteBufferSender delegate); + public ByteBufferReceiver receiver(ByteBufferReceiver delegate); public String getUserID(); } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayerFactory.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayerFactory.java index 2a2f3d8362..d25e97ffe4 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayerFactory.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayerFactory.java @@ -20,15 +20,13 @@ */ package org.apache.qpid.transport.network.security; -import java.nio.ByteBuffer; - import javax.net.ssl.SSLContext; import javax.net.ssl.SSLEngine; import org.apache.qpid.ssl.SSLContextFactory; +import org.apache.qpid.transport.ByteBufferReceiver; +import org.apache.qpid.transport.ByteBufferSender; import org.apache.qpid.transport.ConnectionSettings; -import org.apache.qpid.transport.Receiver; -import org.apache.qpid.transport.Sender; import org.apache.qpid.transport.TransportException; import org.apache.qpid.transport.network.security.sasl.SASLReceiver; import org.apache.qpid.transport.network.security.sasl.SASLSender; @@ -110,14 +108,14 @@ public class SecurityLayerFactory } - public Sender<ByteBuffer> sender(Sender<ByteBuffer> delegate) + public ByteBufferSender sender(ByteBufferSender delegate) { SSLSender sender = new SSLSender(_engine, _layer.sender(delegate), _sslStatus); sender.setHostname(_hostname); return sender; } - public Receiver<ByteBuffer> receiver(Receiver<ByteBuffer> delegate) + public ByteBufferReceiver receiver(ByteBufferReceiver delegate) { SSLReceiver receiver = new SSLReceiver(_engine, _layer.receiver(delegate), _sslStatus); receiver.setHostname(_hostname); @@ -141,13 +139,13 @@ public class SecurityLayerFactory _layer = layer; } - public SASLSender sender(Sender<ByteBuffer> delegate) + public SASLSender sender(ByteBufferSender delegate) { SASLSender sender = new SASLSender(_layer.sender(delegate)); return sender; } - public SASLReceiver receiver(Receiver<ByteBuffer> delegate) + public SASLReceiver receiver(ByteBufferReceiver delegate) { SASLReceiver receiver = new SASLReceiver(_layer.receiver(delegate)); return receiver; @@ -169,12 +167,12 @@ public class SecurityLayerFactory { } - public Sender<ByteBuffer> sender(Sender<ByteBuffer> delegate) + public ByteBufferSender sender(ByteBufferSender delegate) { return delegate; } - public Receiver<ByteBuffer> receiver(Receiver<ByteBuffer> delegate) + public ByteBufferReceiver receiver(ByteBufferReceiver delegate) { return delegate; } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLReceiver.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLReceiver.java index 59e9453454..983e3bdf90 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLReceiver.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLReceiver.java @@ -21,20 +21,21 @@ package org.apache.qpid.transport.network.security.sasl; -import org.apache.qpid.transport.Receiver; -import org.apache.qpid.transport.SenderException; -import org.apache.qpid.transport.util.Logger; +import java.nio.ByteBuffer; import javax.security.sasl.SaslException; -import java.nio.ByteBuffer; -public class SASLReceiver extends SASLEncryptor implements Receiver<ByteBuffer> { +import org.apache.qpid.transport.ByteBufferReceiver; +import org.apache.qpid.transport.SenderException; +import org.apache.qpid.transport.util.Logger; + +public class SASLReceiver extends SASLEncryptor implements ByteBufferReceiver { - private Receiver<ByteBuffer> delegate; + private ByteBufferReceiver delegate; private byte[] netData; private static final Logger log = Logger.get(SASLReceiver.class); - public SASLReceiver(Receiver<ByteBuffer> delegate) + public SASLReceiver(ByteBufferReceiver delegate) { this.delegate = delegate; } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLSender.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLSender.java index 098f2fb20c..335f8992ca 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLSender.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLSender.java @@ -21,22 +21,24 @@ package org.apache.qpid.transport.network.security.sasl; -import org.apache.qpid.transport.Sender; -import org.apache.qpid.transport.SenderException; -import org.apache.qpid.transport.util.Logger; - -import javax.security.sasl.SaslException; import java.nio.ByteBuffer; import java.util.concurrent.atomic.AtomicBoolean; -public class SASLSender extends SASLEncryptor implements Sender<ByteBuffer> { +import javax.security.sasl.SaslException; + +import org.apache.qpid.transport.ByteBufferSender; +import org.apache.qpid.transport.SenderException; +import org.apache.qpid.transport.util.Logger; + +public class SASLSender extends SASLEncryptor implements ByteBufferSender +{ - private Sender<ByteBuffer> delegate; + private ByteBufferSender delegate; private byte[] appData; private final AtomicBoolean closed = new AtomicBoolean(false); private static final Logger log = Logger.get(SASLSender.class); - public SASLSender(Sender<ByteBuffer> delegate) + public SASLSender(ByteBufferSender delegate) { this.delegate = delegate; log.debug("SASL Sender enabled"); @@ -103,11 +105,6 @@ public class SASLSender extends SASLEncryptor implements Sender<ByteBuffer> { } } - public void setIdleTimeout(int i) - { - delegate.setIdleTimeout(i); - } - public void securityLayerEstablished() { appData = new byte[getSendBuffSize()]; diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLBufferingSender.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLBufferingSender.java index 24f95d7798..e69de29bb2 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLBufferingSender.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLBufferingSender.java @@ -1,274 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.transport.network.security.ssl; - -import java.nio.ByteBuffer; -import java.util.concurrent.atomic.AtomicBoolean; -import javax.net.ssl.SSLEngine; -import javax.net.ssl.SSLEngineResult; -import javax.net.ssl.SSLEngineResult.HandshakeStatus; -import javax.net.ssl.SSLEngineResult.Status; -import javax.net.ssl.SSLException; -import org.apache.qpid.transport.Sender; -import org.apache.qpid.transport.SenderException; -import org.apache.qpid.transport.network.security.SSLStatus; -import org.apache.qpid.transport.util.Logger; - -public class SSLBufferingSender implements Sender<ByteBuffer> -{ - private static final Logger log = Logger.get(SSLBufferingSender.class); - private static final ByteBuffer EMPTY_BYTE_BUFFER = ByteBuffer.allocate(0); - - private final Sender<ByteBuffer> delegate; - private final SSLEngine engine; - private final int sslBufSize; - private final ByteBuffer netData; - private final SSLStatus _sslStatus; - - private String _hostname; - - private final AtomicBoolean closed = new AtomicBoolean(false); - private ByteBuffer _appData = EMPTY_BYTE_BUFFER; - - - public SSLBufferingSender(SSLEngine engine, Sender<ByteBuffer> delegate, SSLStatus sslStatus) - { - this.engine = engine; - this.delegate = delegate; - sslBufSize = engine.getSession().getPacketBufferSize(); - netData = ByteBuffer.allocate(sslBufSize); - _sslStatus = sslStatus; - } - - public void setHostname(String hostname) - { - _hostname = hostname; - } - - public void close() - { - if (!closed.getAndSet(true)) - { - if (engine.isOutboundDone()) - { - return; - } - log.debug("Closing SSL connection"); - doSend(); - engine.closeOutbound(); - try - { - tearDownSSLConnection(); - } - catch(Exception e) - { - throw new SenderException("Error closing SSL connection",e); - } - - - synchronized(_sslStatus.getSslLock()) - { - while (!engine.isOutboundDone()) - { - try - { - _sslStatus.getSslLock().wait(); - } - catch(InterruptedException e) - { - // pass - } - - } - } - delegate.close(); - } - } - - private void tearDownSSLConnection() throws Exception - { - SSLEngineResult result = engine.wrap(ByteBuffer.allocate(0), netData); - Status status = result.getStatus(); - int read = result.bytesProduced(); - while (status != Status.CLOSED) - { - if (status == Status.BUFFER_OVERFLOW) - { - netData.clear(); - } - if(read > 0) - { - int limit = netData.limit(); - netData.limit(netData.position()); - netData.position(netData.position() - read); - - ByteBuffer data = netData.slice(); - - netData.limit(limit); - netData.position(netData.position() + read); - - delegate.send(data); - flush(); - } - result = engine.wrap(ByteBuffer.allocate(0), netData); - status = result.getStatus(); - read = result.bytesProduced(); - } - } - - public void flush() - { - delegate.flush(); - } - - public void send() - { - if(!closed.get()) - { - doSend(); - } - } - - public synchronized void send(ByteBuffer appData) - { - boolean buffered; - if(buffered = _appData.hasRemaining()) - { - ByteBuffer newBuf = ByteBuffer.allocate(_appData.remaining()+appData.remaining()); - newBuf.put(_appData); - newBuf.put(appData); - newBuf.flip(); - _appData = newBuf; - } - if (closed.get()) - { - throw new SenderException("SSL Sender is closed"); - } - doSend(); - if(!appData.hasRemaining()) - { - _appData = EMPTY_BYTE_BUFFER; - } - else if(!buffered) - { - _appData = ByteBuffer.allocate(appData.remaining()); - _appData.put(appData); - _appData.flip(); - } - } - - private synchronized void doSend() - { - - HandshakeStatus handshakeStatus; - Status status; - - while((_appData.hasRemaining() || engine.getHandshakeStatus() == HandshakeStatus.NEED_WRAP) - && !_sslStatus.getSslErrorFlag()) - { - int read = 0; - try - { - SSLEngineResult result = engine.wrap(_appData, netData); - read = result.bytesProduced(); - status = result.getStatus(); - handshakeStatus = result.getHandshakeStatus(); - } - catch(SSLException e) - { - // Should this set _sslError?? - throw new SenderException("SSL, Error occurred while encrypting data",e); - } - - if(read > 0) - { - int limit = netData.limit(); - netData.limit(netData.position()); - netData.position(netData.position() - read); - - ByteBuffer data = netData.slice(); - - netData.limit(limit); - netData.position(netData.position() + read); - - delegate.send(data); - } - - switch(status) - { - case CLOSED: - throw new SenderException("SSLEngine is closed"); - - case BUFFER_OVERFLOW: - netData.clear(); - continue; - - case OK: - break; // do nothing - - default: - throw new IllegalStateException("SSLReceiver: Invalid State " + status); - } - - switch (handshakeStatus) - { - case NEED_WRAP: - if (netData.hasRemaining()) - { - continue; - } - - case NEED_TASK: - doTasks(); - break; - - case NEED_UNWRAP: - flush(); - return; - - case FINISHED: - if (_hostname != null) - { - SSLUtil.verifyHostname(engine, _hostname); - } - - case NOT_HANDSHAKING: - break; //do nothing - - default: - throw new IllegalStateException("SSLSender: Invalid State " + status); - } - - } - } - - private void doTasks() - { - Runnable runnable; - while ((runnable = engine.getDelegatedTask()) != null) { - runnable.run(); - } - } - - public void setIdleTimeout(int i) - { - delegate.setIdleTimeout(i); - } -} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLReceiver.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLReceiver.java index 8e1395aa83..49e4ad631a 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLReceiver.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLReceiver.java @@ -28,16 +28,16 @@ import javax.net.ssl.SSLEngineResult.HandshakeStatus; import javax.net.ssl.SSLEngineResult.Status; import javax.net.ssl.SSLException; -import org.apache.qpid.transport.Receiver; +import org.apache.qpid.transport.ByteBufferReceiver; import org.apache.qpid.transport.TransportException; import org.apache.qpid.transport.network.security.SSLStatus; import org.apache.qpid.transport.util.Logger; -public class SSLReceiver implements Receiver<ByteBuffer> +public class SSLReceiver implements ByteBufferReceiver { private static final Logger log = Logger.get(SSLReceiver.class); - private final Receiver<ByteBuffer> delegate; + private final ByteBufferReceiver delegate; private final SSLEngine engine; private final int sslBufSize; private final ByteBuffer localBuffer; @@ -47,7 +47,7 @@ public class SSLReceiver implements Receiver<ByteBuffer> private String _hostname; - public SSLReceiver(final SSLEngine engine, final Receiver<ByteBuffer> delegate, final SSLStatus sslStatus) + public SSLReceiver(final SSLEngine engine, final ByteBufferReceiver delegate, final SSLStatus sslStatus) { this.engine = engine; this.delegate = delegate; diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLSender.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLSender.java index 53bd7e49b7..3d133cb9b7 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLSender.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLSender.java @@ -19,24 +19,25 @@ */ package org.apache.qpid.transport.network.security.ssl; -import org.apache.qpid.transport.Sender; -import org.apache.qpid.transport.SenderException; -import org.apache.qpid.transport.network.security.SSLStatus; -import org.apache.qpid.transport.util.Logger; +import java.nio.ByteBuffer; +import java.util.concurrent.atomic.AtomicBoolean; import javax.net.ssl.SSLEngine; import javax.net.ssl.SSLEngineResult; import javax.net.ssl.SSLEngineResult.HandshakeStatus; import javax.net.ssl.SSLEngineResult.Status; import javax.net.ssl.SSLException; -import java.nio.ByteBuffer; -import java.util.concurrent.atomic.AtomicBoolean; -public class SSLSender implements Sender<ByteBuffer> +import org.apache.qpid.transport.ByteBufferSender; +import org.apache.qpid.transport.SenderException; +import org.apache.qpid.transport.network.security.SSLStatus; +import org.apache.qpid.transport.util.Logger; + +public class SSLSender implements ByteBufferSender { private static final Logger log = Logger.get(SSLSender.class); - private final Sender<ByteBuffer> delegate; + private final ByteBufferSender delegate; private final SSLEngine engine; private final int sslBufSize; private final ByteBuffer netData; @@ -48,7 +49,7 @@ public class SSLSender implements Sender<ByteBuffer> private final AtomicBoolean closed = new AtomicBoolean(false); - public SSLSender(SSLEngine engine, Sender<ByteBuffer> delegate, SSLStatus sslStatus) + public SSLSender(SSLEngine engine, ByteBufferSender delegate, SSLStatus sslStatus) { this.engine = engine; this.delegate = delegate; @@ -264,8 +265,4 @@ public class SSLSender implements Sender<ByteBuffer> } } - public void setIdleTimeout(int i) - { - delegate.setIdleTimeout(i); - } } diff --git a/qpid/java/common/src/test/java/org/apache/qpid/transport/network/TransportTest.java b/qpid/java/common/src/test/java/org/apache/qpid/transport/network/TransportTest.java index 3071594be7..3da2a03f42 100644 --- a/qpid/java/common/src/test/java/org/apache/qpid/transport/network/TransportTest.java +++ b/qpid/java/common/src/test/java/org/apache/qpid/transport/network/TransportTest.java @@ -21,16 +21,16 @@ package org.apache.qpid.transport.network; -import java.nio.ByteBuffer; +import java.util.Set; import javax.net.ssl.SSLContext; import org.apache.qpid.framing.ProtocolVersion; import org.apache.qpid.protocol.ProtocolEngineFactory; import org.apache.qpid.test.utils.QpidTestCase; +import org.apache.qpid.transport.ByteBufferReceiver; import org.apache.qpid.transport.ConnectionSettings; import org.apache.qpid.transport.NetworkTransportConfiguration; -import org.apache.qpid.transport.Receiver; import org.apache.qpid.transport.TransportException; import org.apache.qpid.transport.network.io.IoNetworkTransport; @@ -129,7 +129,7 @@ public class TransportTest extends QpidTestCase } public NetworkConnection connect(ConnectionSettings settings, - Receiver<ByteBuffer> delegate, + ByteBufferReceiver delegate, TransportActivity transportActivity) { throw new UnsupportedOperationException(); @@ -150,7 +150,9 @@ public class TransportTest extends QpidTestCase } public void accept(NetworkTransportConfiguration config, - ProtocolEngineFactory factory, SSLContext sslContext) + ProtocolEngineFactory factory, + SSLContext sslContext, + final Set<TransportEncryption> encryptionSet) { throw new UnsupportedOperationException(); } diff --git a/qpid/java/common/src/test/java/org/apache/qpid/transport/network/io/IdleTimeoutTickerTest.java b/qpid/java/common/src/test/java/org/apache/qpid/transport/network/io/IdleTimeoutTickerTest.java index a445cff0a7..69724438ec 100644 --- a/qpid/java/common/src/test/java/org/apache/qpid/transport/network/io/IdleTimeoutTickerTest.java +++ b/qpid/java/common/src/test/java/org/apache/qpid/transport/network/io/IdleTimeoutTickerTest.java @@ -21,14 +21,12 @@ package org.apache.qpid.transport.network.io; -import junit.framework.TestCase; - import java.net.SocketAddress; -import java.nio.ByteBuffer; import java.security.Principal; -import org.apache.qpid.test.utils.QpidTestCase; -import org.apache.qpid.transport.Sender; +import junit.framework.TestCase; + +import org.apache.qpid.transport.ByteBufferSender; import org.apache.qpid.transport.network.NetworkConnection; import org.apache.qpid.transport.network.TransportActivity; @@ -193,7 +191,7 @@ public class IdleTimeoutTickerTest extends TestCase implements TransportActivity //------------------------------------------------------------------------- @Override - public Sender<ByteBuffer> getSender() + public ByteBufferSender getSender() { return null; } diff --git a/qpid/java/common/src/test/java/org/apache/qpid/transport/network/io/IoAcceptor.java b/qpid/java/common/src/test/java/org/apache/qpid/transport/network/io/IoAcceptor.java index bb864cd434..67d360fa9e 100644 --- a/qpid/java/common/src/test/java/org/apache/qpid/transport/network/io/IoAcceptor.java +++ b/qpid/java/common/src/test/java/org/apache/qpid/transport/network/io/IoAcceptor.java @@ -20,16 +20,15 @@ */ package org.apache.qpid.transport.network.io; -import org.apache.log4j.Logger; -import org.apache.qpid.transport.Binding; -import org.apache.qpid.transport.TransportException; - import java.io.IOException; import java.net.InetSocketAddress; import java.net.ServerSocket; import java.net.Socket; import java.net.SocketAddress; -import java.nio.ByteBuffer; + +import org.apache.log4j.Logger; + +import org.apache.qpid.transport.Binding; /** @@ -44,9 +43,9 @@ public class IoAcceptor<E> extends Thread private volatile boolean _closed = false; private ServerSocket socket; - private Binding<E,ByteBuffer> binding; + private Binding<E> binding; - public IoAcceptor(SocketAddress address, Binding<E,ByteBuffer> binding) + public IoAcceptor(SocketAddress address, Binding<E> binding) throws IOException { socket = new ServerSocket(); @@ -70,7 +69,7 @@ public class IoAcceptor<E> extends Thread } } - public IoAcceptor(String host, int port, Binding<E,ByteBuffer> binding) + public IoAcceptor(String host, int port, Binding<E> binding) throws IOException { this(new InetSocketAddress(host, port), binding); diff --git a/qpid/java/common/src/test/java/org/apache/qpid/transport/network/io/IoTransport.java b/qpid/java/common/src/test/java/org/apache/qpid/transport/network/io/IoTransport.java index f74051aa32..4b5b4448ee 100644 --- a/qpid/java/common/src/test/java/org/apache/qpid/transport/network/io/IoTransport.java +++ b/qpid/java/common/src/test/java/org/apache/qpid/transport/network/io/IoTransport.java @@ -20,10 +20,9 @@ package org.apache.qpid.transport.network.io; import java.net.Socket; -import java.nio.ByteBuffer; import org.apache.qpid.transport.Binding; -import org.apache.qpid.transport.Sender; +import org.apache.qpid.transport.ByteBufferSender; import org.apache.qpid.transport.util.Logger; /** @@ -48,18 +47,18 @@ public final class IoTransport<E> ("amqj.sendBufferSize", DEFAULT_READ_WRITE_BUFFER_SIZE); private Socket socket; - private Sender<ByteBuffer> sender; + private ByteBufferSender sender; private E endpoint; private IoReceiver receiver; private long timeout = 60000; - IoTransport(Socket socket, Binding<E,ByteBuffer> binding) + IoTransport(Socket socket, Binding<E> binding) { this.socket = socket; setupTransport(socket, binding); } - private void setupTransport(Socket socket, Binding<E, ByteBuffer> binding) + private void setupTransport(Socket socket, Binding<E> binding) { IoSender ios = new IoSender(socket, 2*writeBufferSize, timeout); ios.initiate(); @@ -73,7 +72,7 @@ public final class IoTransport<E> ios.setReceiver(this.receiver); } - public Sender<ByteBuffer> getSender() + public ByteBufferSender getSender() { return sender; } |
