diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2015-01-31 20:07:36 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2015-01-31 20:07:36 +0000 |
| commit | 26eab7ed4556717fca50ad93025fdc8d112f9715 (patch) | |
| tree | c4019683f17a8ec570786ff067a0d261c5c603e0 /qpid/java/common/src/main | |
| parent | aef6c73485912be3be3d9bc60bb9671c951368c6 (diff) | |
| download | qpid-python-26eab7ed4556717fca50ad93025fdc8d112f9715.tar.gz | |
Separate Byte and ProtocolEvent sender/receivers, add server specific 0-10 encoder
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/QPID-6262-JavaBrokerNIO@1656248 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/common/src/main')
31 files changed, 236 insertions, 166 deletions
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java b/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java index 0c643f6322..73c8653677 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java @@ -26,9 +26,7 @@ import org.apache.qpid.framing.ContentBody; import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.HeartbeatBody; import org.apache.qpid.framing.MethodRegistry; -import org.apache.qpid.transport.Sender; - -import java.nio.ByteBuffer; +import org.apache.qpid.transport.ByteBufferSender; /** @@ -56,6 +54,6 @@ public interface AMQVersionAwareProtocolSession extends AMQProtocolWriter, Proto public void heartbeatBodyReceived(int channelId, HeartbeatBody body) throws AMQException; - public void setSender(Sender<ByteBuffer> sender); + public void setSender(ByteBufferSender sender); } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java b/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java index cad5461d83..f73f6d931a 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java @@ -21,10 +21,9 @@ package org.apache.qpid.protocol; import java.net.SocketAddress; -import java.nio.ByteBuffer; -import org.apache.qpid.transport.Receiver; -import org.apache.qpid.transport.Sender; +import org.apache.qpid.transport.ByteBufferReceiver; +import org.apache.qpid.transport.ByteBufferSender; import org.apache.qpid.transport.network.NetworkConnection; import org.apache.qpid.transport.network.TransportActivity; @@ -32,7 +31,7 @@ import org.apache.qpid.transport.network.TransportActivity; * A ProtocolEngine is a Receiver for java.nio.ByteBuffers. It takes the data passed to it in the received * decodes it and then process the result. */ -public interface ProtocolEngine extends Receiver<java.nio.ByteBuffer>, TransportActivity +public interface ProtocolEngine extends ByteBufferReceiver, TransportActivity { // Returns the remote address of the NetworkDriver SocketAddress getRemoteAddress(); @@ -58,6 +57,6 @@ public interface ProtocolEngine extends Receiver<java.nio.ByteBuffer>, Transport void encryptedTransport(); - public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender); + public void setNetworkConnection(NetworkConnection network, ByteBufferSender sender); } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Binding.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Binding.java index 8418c42189..f703c01567 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Binding.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Binding.java @@ -26,11 +26,11 @@ package org.apache.qpid.transport; * */ -public interface Binding<E,T> +public interface Binding<E> { - E endpoint(Sender<T> sender); + E endpoint(ByteBufferSender sender); - Receiver<T> receiver(E endpoint); + ByteBufferReceiver receiver(E endpoint); } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/ByteBufferReceiver.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/ByteBufferReceiver.java new file mode 100644 index 0000000000..1015f061c8 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/ByteBufferReceiver.java @@ -0,0 +1,32 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.transport; + +import java.nio.ByteBuffer; + +public interface ByteBufferReceiver +{ + void received(ByteBuffer msg); + + void exception(Throwable t); + + void closed(); +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/ByteBufferSender.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/ByteBufferSender.java new file mode 100644 index 0000000000..7dcaf61a26 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/ByteBufferSender.java @@ -0,0 +1,32 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.transport; + +import java.nio.ByteBuffer; + +public interface ByteBufferSender +{ + void send(ByteBuffer msg); + + void flush(); + + void close(); +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java index 92ccdb84af..39f27b0fe0 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java @@ -27,7 +27,6 @@ import static org.apache.qpid.transport.Connection.State.OPEN; import static org.apache.qpid.transport.Connection.State.OPENING; import java.net.SocketAddress; -import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -68,7 +67,7 @@ import org.apache.qpid.util.Strings; */ public class Connection extends ConnectionInvoker - implements Receiver<ProtocolEvent>, Sender<ProtocolEvent> + implements ProtocolEventReceiver, ProtocolEventSender { protected static final Logger log = Logger.get(Connection.class); @@ -113,7 +112,7 @@ public class Connection extends ConnectionInvoker private SessionFactory _sessionFactory = DEFAULT_SESSION_FACTORY; private ConnectionDelegate delegate; - private Sender<ProtocolEvent> sender; + private ProtocolEventSender sender; final private Map<Binary,Session> sessions = new HashMap<Binary,Session>(); final private Map<Integer,Session> channels = new ConcurrentHashMap<Integer,Session>(); @@ -151,12 +150,12 @@ public class Connection extends ConnectionInvoker listeners.add(listener); } - public Sender<ProtocolEvent> getSender() + public ProtocolEventSender getSender() { return sender; } - public void setSender(Sender<ProtocolEvent> sender) + public void setSender(ProtocolEventSender sender) { this.sender = sender; } @@ -234,7 +233,7 @@ public class Connection extends ConnectionInvoker OutgoingNetworkTransport transport = Transport.getOutgoingTransportInstance(ProtocolVersion.v0_10); final InputHandler inputHandler = new InputHandler(new Assembler(this)); addFrameSizeObserver(inputHandler); - Receiver<ByteBuffer> secureReceiver = securityLayer.receiver(inputHandler); + ByteBufferReceiver secureReceiver = securityLayer.receiver(inputHandler); if(secureReceiver instanceof ConnectionListener) { addConnectionListener((ConnectionListener)secureReceiver); @@ -246,7 +245,7 @@ public class Connection extends ConnectionInvoker setRemoteAddress(_networkConnection.getRemoteAddress()); setLocalAddress(_networkConnection.getLocalAddress()); - final Sender<ByteBuffer> secureSender = securityLayer.sender(_networkConnection.getSender()); + final ByteBufferSender secureSender = securityLayer.sender(_networkConnection.getSender()); if(secureSender instanceof ConnectionListener) { addConnectionListener((ConnectionListener)secureSender); @@ -411,7 +410,7 @@ public class Connection extends ConnectionInvoker { log.debug("SEND: [%s] %s", this, event); } - Sender<ProtocolEvent> s = sender; + ProtocolEventSender s = sender; if (s == null) { throw new ConnectionException("connection closed"); @@ -425,7 +424,7 @@ public class Connection extends ConnectionInvoker { log.debug("FLUSH: [%s]", this); } - final Sender<ProtocolEvent> theSender = sender; + final ProtocolEventSender theSender = sender; if(theSender != null) { theSender.flush(); diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkEventReceiver.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkEventReceiver.java new file mode 100644 index 0000000000..8b9c3f4f83 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkEventReceiver.java @@ -0,0 +1,32 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.transport; + +import org.apache.qpid.transport.network.NetworkEvent; + +public interface NetworkEventReceiver +{ + void received(NetworkEvent msg); + + void exception(Throwable t); + + void closed(); +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Receiver.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/ProtocolEventReceiver.java index 2a994580dc..e4ab540ce9 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Receiver.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/ProtocolEventReceiver.java @@ -20,19 +20,11 @@ */ package org.apache.qpid.transport; - -/** - * Receiver - * - */ - -public interface Receiver<T> +public interface ProtocolEventReceiver { - - void received(T msg); + void received(ProtocolEvent msg); void exception(Throwable t); void closed(); - } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Sender.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/ProtocolEventSender.java index 9a6f675d7f..418f31b42a 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Sender.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/ProtocolEventSender.java @@ -20,19 +20,11 @@ */ package org.apache.qpid.transport; - -/** - * Sender - * - */ - -public interface Sender<T> +public interface ProtocolEventSender { - - void send(T msg); + void send(ProtocolEvent msg); void flush(); void close(); - } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/AbstractEncoder.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/AbstractEncoder.java index 2b93697bfc..070621db9b 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/AbstractEncoder.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/AbstractEncoder.java @@ -20,12 +20,6 @@ */ package org.apache.qpid.transport.codec; -import org.apache.qpid.transport.Range; -import org.apache.qpid.transport.RangeSet; -import org.apache.qpid.transport.Struct; -import org.apache.qpid.transport.Type; - -import org.apache.qpid.transport.Xid; import static org.apache.qpid.transport.util.Functions.lsb; import java.io.UnsupportedEncodingException; @@ -36,6 +30,12 @@ import java.util.List; import java.util.Map; import java.util.UUID; +import org.apache.qpid.transport.Range; +import org.apache.qpid.transport.RangeSet; +import org.apache.qpid.transport.Struct; +import org.apache.qpid.transport.Type; +import org.apache.qpid.transport.Xid; + /** * AbstractEncoder @@ -43,7 +43,7 @@ import java.util.UUID; * @author Rafael H. Schloming */ -abstract class AbstractEncoder implements Encoder +public abstract class AbstractEncoder implements Encoder { private static Map<Class<?>,Type> ENCODINGS = new HashMap<Class<?>,Type>(); diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/BBEncoder.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/BBEncoder.java index d9150bed65..407df71824 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/BBEncoder.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/BBEncoder.java @@ -360,8 +360,4 @@ public final class BBEncoder extends AbstractEncoder } } - public void writeMagicNumber() - { - out.put("AM2".getBytes()); - } -}
\ No newline at end of file +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/Encoder.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/Encoder.java index a9eea13104..b5ab29cdcf 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/Encoder.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/Encoder.java @@ -20,13 +20,14 @@ */ package org.apache.qpid.transport.codec; -import org.apache.qpid.transport.RangeSet; -import org.apache.qpid.transport.Struct; - +import java.nio.ByteBuffer; import java.util.List; import java.util.Map; import java.util.UUID; +import org.apache.qpid.transport.RangeSet; +import org.apache.qpid.transport.Struct; + /** * Encoder interface. @@ -274,9 +275,10 @@ public interface Encoder * @param bytes the bytes array to be encoded. */ void writeBin128(byte [] bytes); - - /** - * Encodes the AMQP magic number. - */ - void writeMagicNumber(); -}
\ No newline at end of file + + int position(); + + ByteBuffer underlyingBuffer(); + + void init(); +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java index a80b988cea..a7e96167c2 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java @@ -20,28 +20,29 @@ */ package org.apache.qpid.transport.network; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + import org.apache.qpid.transport.DeliveryProperties; import org.apache.qpid.transport.Header; import org.apache.qpid.transport.MessageProperties; import org.apache.qpid.transport.Method; +import org.apache.qpid.transport.NetworkEventReceiver; import org.apache.qpid.transport.ProtocolError; import org.apache.qpid.transport.ProtocolEvent; +import org.apache.qpid.transport.ProtocolEventReceiver; import org.apache.qpid.transport.ProtocolHeader; -import org.apache.qpid.transport.Receiver; import org.apache.qpid.transport.Struct; import org.apache.qpid.transport.codec.BBDecoder; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - /** * Assembler * */ -public class Assembler implements Receiver<NetworkEvent>, NetworkDelegate +public class Assembler implements NetworkEventReceiver, NetworkDelegate { // Use a small array to store incomplete Methods for low-value channels, instead of allocating a huge // array or always boxing the channelId and looking it up in the map. This value must be of the form 2^X - 1. @@ -49,7 +50,7 @@ public class Assembler implements Receiver<NetworkEvent>, NetworkDelegate private final Method[] _incompleteMethodArray = new Method[ARRAY_SIZE + 1]; private final Map<Integer, Method> _incompleteMethodMap = new HashMap<Integer, Method>(); - private final Receiver<ProtocolEvent> receiver; + private final ProtocolEventReceiver receiver; private final Map<Integer,List<Frame>> segments; private static final ThreadLocal<BBDecoder> _decoder = new ThreadLocal<BBDecoder>() { @@ -59,7 +60,7 @@ public class Assembler implements Receiver<NetworkEvent>, NetworkDelegate } }; - public Assembler(Receiver<ProtocolEvent> receiver) + public Assembler(ProtocolEventReceiver receiver) { this.receiver = receiver; segments = new HashMap<Integer,List<Frame>>(); diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/ConnectionBinding.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/ConnectionBinding.java index 26e8f1850b..5463cd2587 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/ConnectionBinding.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/ConnectionBinding.java @@ -20,15 +20,13 @@ */ package org.apache.qpid.transport.network; -import java.nio.ByteBuffer; - import org.apache.qpid.transport.Binding; +import org.apache.qpid.transport.ByteBufferReceiver; +import org.apache.qpid.transport.ByteBufferSender; import org.apache.qpid.transport.Connection; import org.apache.qpid.transport.ConnectionDelegate; import org.apache.qpid.transport.ConnectionListener; import org.apache.qpid.transport.Constant; -import org.apache.qpid.transport.Receiver; -import org.apache.qpid.transport.Sender; import org.apache.qpid.transport.network.security.sasl.SASLReceiver; import org.apache.qpid.transport.network.security.sasl.SASLSender; @@ -38,10 +36,10 @@ import org.apache.qpid.transport.network.security.sasl.SASLSender; */ public abstract class ConnectionBinding - implements Binding<Connection,ByteBuffer> + implements Binding<Connection> { - public static Binding<Connection,ByteBuffer> get(final Connection connection) + public static Binding<Connection> get(final Connection connection) { return new ConnectionBinding() { @@ -52,7 +50,7 @@ public abstract class ConnectionBinding }; } - public static Binding<Connection,ByteBuffer> get(final ConnectionDelegate delegate) + public static Binding<Connection> get(final ConnectionDelegate delegate) { return new ConnectionBinding() { @@ -69,7 +67,7 @@ public abstract class ConnectionBinding public abstract Connection connection(); - public Connection endpoint(Sender<ByteBuffer> sender) + public Connection endpoint(ByteBufferSender sender) { Connection conn = connection(); @@ -87,7 +85,7 @@ public abstract class ConnectionBinding return conn; } - public Receiver<ByteBuffer> receiver(Connection conn) + public ByteBufferReceiver receiver(Connection conn) { final InputHandler inputHandler = new InputHandler(new Assembler(conn)); conn.addFrameSizeObserver(inputHandler); diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java index 81a4c781a4..c45b2049a1 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java @@ -30,27 +30,29 @@ import static org.apache.qpid.transport.network.Frame.LAST_SEG; import java.nio.ByteBuffer; import java.nio.ByteOrder; +import org.apache.qpid.transport.ByteBufferSender; import org.apache.qpid.transport.FrameSizeObserver; import org.apache.qpid.transport.Header; import org.apache.qpid.transport.Method; import org.apache.qpid.transport.ProtocolDelegate; import org.apache.qpid.transport.ProtocolError; import org.apache.qpid.transport.ProtocolEvent; +import org.apache.qpid.transport.ProtocolEventSender; import org.apache.qpid.transport.ProtocolHeader; import org.apache.qpid.transport.SegmentType; -import org.apache.qpid.transport.Sender; import org.apache.qpid.transport.Struct; import org.apache.qpid.transport.codec.BBEncoder; +import org.apache.qpid.transport.codec.Encoder; /** * Disassembler */ -public final class Disassembler implements Sender<ProtocolEvent>, ProtocolDelegate<Void>, FrameSizeObserver +public final class Disassembler implements ProtocolEventSender, ProtocolDelegate<Void>, FrameSizeObserver { - private final Sender<ByteBuffer> sender; + private final ByteBufferSender sender; private int maxPayload; private final Object sendlock = new Object(); - private final static ThreadLocal<BBEncoder> _encoder = new ThreadLocal<BBEncoder>() + private final static ThreadLocal<Encoder> _encoder = new ThreadLocal<Encoder>() { public BBEncoder initialValue() { @@ -58,7 +60,7 @@ public final class Disassembler implements Sender<ProtocolEvent>, ProtocolDelega } }; - public Disassembler(Sender<ByteBuffer> sender, int maxFrame) + public Disassembler(ByteBufferSender sender, int maxFrame) { this.sender = sender; if (maxFrame <= HEADER_SIZE || maxFrame >= 64*1024) @@ -174,7 +176,7 @@ public final class Disassembler implements Sender<ProtocolEvent>, ProtocolDelega private void method(Method method, SegmentType type) { - BBEncoder enc = _encoder.get(); + Encoder enc = _encoder.get(); enc.init(); enc.writeUint16(method.getEncodedType()); if (type == SegmentType.COMMAND) diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/InputHandler.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/InputHandler.java index 758c2e1eda..a58bed5877 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/InputHandler.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/InputHandler.java @@ -29,11 +29,12 @@ import static org.apache.qpid.transport.util.Functions.str; import java.nio.ByteBuffer; import java.nio.ByteOrder; +import org.apache.qpid.transport.ByteBufferReceiver; import org.apache.qpid.transport.Constant; import org.apache.qpid.transport.FrameSizeObserver; +import org.apache.qpid.transport.NetworkEventReceiver; import org.apache.qpid.transport.ProtocolError; import org.apache.qpid.transport.ProtocolHeader; -import org.apache.qpid.transport.Receiver; import org.apache.qpid.transport.SegmentType; @@ -43,7 +44,7 @@ import org.apache.qpid.transport.SegmentType; * @author Rafael H. Schloming */ -public class InputHandler implements Receiver<ByteBuffer>, FrameSizeObserver +public class InputHandler implements ByteBufferReceiver, FrameSizeObserver { private int _maxFrameSize = Constant.MIN_MAX_FRAME_SIZE; @@ -56,7 +57,7 @@ public class InputHandler implements Receiver<ByteBuffer>, FrameSizeObserver ERROR } - private final Receiver<NetworkEvent> receiver; + private final NetworkEventReceiver receiver; private State state; private ByteBuffer input = null; private int needed; @@ -66,7 +67,7 @@ public class InputHandler implements Receiver<ByteBuffer>, FrameSizeObserver private byte track; private int channel; - public InputHandler(Receiver<NetworkEvent> receiver, State state) + public InputHandler(NetworkEventReceiver receiver, State state) { this.receiver = receiver; this.state = state; @@ -82,7 +83,7 @@ public class InputHandler implements Receiver<ByteBuffer>, FrameSizeObserver } } - public InputHandler(Receiver<NetworkEvent> receiver) + public InputHandler(NetworkEventReceiver receiver) { this(receiver, PROTO_HDR); } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/NetworkConnection.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/NetworkConnection.java index 2810e7a9e1..bef266f214 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/NetworkConnection.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/NetworkConnection.java @@ -21,13 +21,13 @@ package org.apache.qpid.transport.network; import java.net.SocketAddress; -import java.nio.ByteBuffer; import java.security.Principal; -import org.apache.qpid.transport.Sender; + +import org.apache.qpid.transport.ByteBufferSender; public interface NetworkConnection { - Sender<ByteBuffer> getSender(); + ByteBufferSender getSender(); void start(); diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/OutgoingNetworkTransport.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/OutgoingNetworkTransport.java index 45231aa05d..f2735f1800 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/OutgoingNetworkTransport.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/OutgoingNetworkTransport.java @@ -20,16 +20,14 @@ */ package org.apache.qpid.transport.network; +import org.apache.qpid.transport.ByteBufferReceiver; import org.apache.qpid.transport.ConnectionSettings; -import org.apache.qpid.transport.Receiver; - -import java.nio.ByteBuffer; public interface OutgoingNetworkTransport extends NetworkTransport { public NetworkConnection getConnection(); public NetworkConnection connect(ConnectionSettings settings, - Receiver<ByteBuffer> delegate, + ByteBufferReceiver delegate, TransportActivity transportActivity); -}
\ No newline at end of file +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/AbstractNetworkTransport.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/AbstractNetworkTransport.java index 9d0fe5ddf6..8d19c5a2ce 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/AbstractNetworkTransport.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/AbstractNetworkTransport.java @@ -26,7 +26,6 @@ import java.net.InetSocketAddress; import java.net.ServerSocket; import java.net.Socket; import java.net.SocketException; -import java.nio.ByteBuffer; import java.util.Set; import javax.net.ssl.SSLContext; @@ -38,9 +37,9 @@ import org.slf4j.LoggerFactory; import org.apache.qpid.configuration.CommonProperties; import org.apache.qpid.protocol.ProtocolEngine; import org.apache.qpid.protocol.ProtocolEngineFactory; +import org.apache.qpid.transport.ByteBufferReceiver; import org.apache.qpid.transport.ConnectionSettings; import org.apache.qpid.transport.NetworkTransportConfiguration; -import org.apache.qpid.transport.Receiver; import org.apache.qpid.transport.TransportException; import org.apache.qpid.transport.network.IncomingNetworkTransport; import org.apache.qpid.transport.network.NetworkConnection; @@ -62,7 +61,7 @@ public abstract class AbstractNetworkTransport implements OutgoingNetworkTranspo private AcceptingThread _acceptor; public NetworkConnection connect(ConnectionSettings settings, - Receiver<ByteBuffer> delegate, + ByteBufferReceiver delegate, TransportActivity transportActivity) { int sendBufferSize = settings.getWriteBufferSize(); @@ -159,7 +158,7 @@ public abstract class AbstractNetworkTransport implements OutgoingNetworkTranspo } protected abstract NetworkConnection createNetworkConnection(Socket socket, - Receiver<ByteBuffer> engine, + ByteBufferReceiver engine, Integer sendBufferSize, Integer receiveBufferSize, int timeout, diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java index 5c3124c2ec..5008849ef3 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java @@ -22,7 +22,6 @@ package org.apache.qpid.transport.network.io; import java.net.Socket; import java.net.SocketAddress; -import java.nio.ByteBuffer; import java.security.Principal; import javax.net.ssl.SSLPeerUnverifiedException; @@ -31,8 +30,8 @@ import javax.net.ssl.SSLSocket; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.qpid.transport.Receiver; -import org.apache.qpid.transport.Sender; +import org.apache.qpid.transport.ByteBufferReceiver; +import org.apache.qpid.transport.ByteBufferSender; import org.apache.qpid.transport.network.NetworkConnection; import org.apache.qpid.transport.network.Ticker; @@ -49,7 +48,7 @@ public class IoNetworkConnection implements NetworkConnection private boolean _principalChecked; private final Object _lock = new Object(); - public IoNetworkConnection(Socket socket, Receiver<ByteBuffer> delegate, + public IoNetworkConnection(Socket socket, ByteBufferReceiver delegate, int sendBufferSize, int receiveBufferSize, long timeout, Ticker ticker) { _socket = socket; @@ -70,7 +69,7 @@ public class IoNetworkConnection implements NetworkConnection _ioReceiver.initiate(); } - public Sender<ByteBuffer> getSender() + public ByteBufferSender getSender() { return _ioSender; } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java index f33f626601..ccab1d93cf 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java @@ -21,9 +21,8 @@ package org.apache.qpid.transport.network.io; import java.net.Socket; -import java.nio.ByteBuffer; -import org.apache.qpid.transport.Receiver; +import org.apache.qpid.transport.ByteBufferReceiver; public class IoNetworkTransport extends AbstractNetworkTransport { @@ -31,7 +30,7 @@ public class IoNetworkTransport extends AbstractNetworkTransport @Override protected IoNetworkConnection createNetworkConnection(final Socket socket, - final Receiver<ByteBuffer> engine, + final ByteBufferReceiver engine, final Integer sendBufferSize, final Integer receiveBufferSize, final int timeout, diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java index 467115c76f..790583e92b 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java @@ -31,7 +31,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import javax.net.ssl.SSLSocket; import org.apache.qpid.thread.Threading; -import org.apache.qpid.transport.Receiver; +import org.apache.qpid.transport.ByteBufferReceiver; import org.apache.qpid.transport.TransportException; import org.apache.qpid.transport.network.Ticker; import org.apache.qpid.transport.util.Logger; @@ -47,7 +47,7 @@ final class IoReceiver implements Runnable private static final Logger log = Logger.get(IoReceiver.class); - private final Receiver<ByteBuffer> receiver; + private final ByteBufferReceiver receiver; private final int bufferSize; private final Socket socket; private final long timeout; @@ -61,7 +61,7 @@ final class IoReceiver implements Runnable shutdownBroken = SystemUtils.isWindows(); } - public IoReceiver(Socket socket, Receiver<ByteBuffer> receiver, int bufferSize, long timeout) + public IoReceiver(Socket socket, ByteBufferReceiver receiver, int bufferSize, long timeout) { this.receiver = receiver; this.bufferSize = bufferSize; diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java index 79e99287c4..61beae4c25 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java @@ -27,14 +27,14 @@ import java.nio.ByteBuffer; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.qpid.thread.Threading; -import org.apache.qpid.transport.Sender; +import org.apache.qpid.transport.ByteBufferSender; import org.apache.qpid.transport.SenderClosedException; import org.apache.qpid.transport.SenderException; import org.apache.qpid.transport.TransportException; import org.apache.qpid.transport.util.Logger; -public final class IoSender implements Runnable, Sender<ByteBuffer> +public final class IoSender implements Runnable, ByteBufferSender { private static final Logger log = Logger.get(IoSender.class); diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingConnection.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingConnection.java index 68670d1a9d..fe6e707f7e 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingConnection.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingConnection.java @@ -21,7 +21,6 @@ package org.apache.qpid.transport.network.io; import java.net.SocketAddress; -import java.nio.ByteBuffer; import java.nio.channels.SocketChannel; import java.security.Principal; import java.util.Set; @@ -32,7 +31,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.qpid.protocol.ServerProtocolEngine; -import org.apache.qpid.transport.Sender; +import org.apache.qpid.transport.ByteBufferSender; import org.apache.qpid.transport.network.NetworkConnection; import org.apache.qpid.transport.network.Ticker; import org.apache.qpid.transport.network.TransportEncryption; @@ -88,7 +87,7 @@ public class NonBlockingConnection implements NetworkConnection { } - public Sender<ByteBuffer> getSender() + public ByteBufferSender getSender() { return _nonBlockingSenderReceiver; } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java index 347a41ee07..02099dee15 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java @@ -40,7 +40,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.qpid.protocol.ServerProtocolEngine; -import org.apache.qpid.transport.Sender; +import org.apache.qpid.transport.ByteBufferSender; import org.apache.qpid.transport.SenderClosedException; import org.apache.qpid.transport.SenderException; import org.apache.qpid.transport.network.Ticker; @@ -48,7 +48,7 @@ import org.apache.qpid.transport.network.TransportEncryption; import org.apache.qpid.transport.network.security.ssl.SSLUtil; import org.apache.qpid.util.SystemUtils; -public class NonBlockingSenderReceiver implements Sender<ByteBuffer> +public class NonBlockingSenderReceiver implements ByteBufferSender { private static final Logger LOGGER = LoggerFactory.getLogger(NonBlockingSenderReceiver.class); public static final int NUMBER_OF_BYTES_FOR_TLS_CHECK = 6; diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayer.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayer.java index 51ef266ee9..271135f411 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayer.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayer.java @@ -20,16 +20,14 @@ */ package org.apache.qpid.transport.network.security; -import org.apache.qpid.transport.Receiver; -import org.apache.qpid.transport.Sender; - -import java.nio.ByteBuffer; +import org.apache.qpid.transport.ByteBufferReceiver; +import org.apache.qpid.transport.ByteBufferSender; public interface SecurityLayer { - public Sender<ByteBuffer> sender(Sender<ByteBuffer> delegate); - public Receiver<ByteBuffer> receiver(Receiver<ByteBuffer> delegate); + public ByteBufferSender sender(ByteBufferSender delegate); + public ByteBufferReceiver receiver(ByteBufferReceiver delegate); public String getUserID(); } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayerFactory.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayerFactory.java index 2a2f3d8362..d25e97ffe4 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayerFactory.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayerFactory.java @@ -20,15 +20,13 @@ */ package org.apache.qpid.transport.network.security; -import java.nio.ByteBuffer; - import javax.net.ssl.SSLContext; import javax.net.ssl.SSLEngine; import org.apache.qpid.ssl.SSLContextFactory; +import org.apache.qpid.transport.ByteBufferReceiver; +import org.apache.qpid.transport.ByteBufferSender; import org.apache.qpid.transport.ConnectionSettings; -import org.apache.qpid.transport.Receiver; -import org.apache.qpid.transport.Sender; import org.apache.qpid.transport.TransportException; import org.apache.qpid.transport.network.security.sasl.SASLReceiver; import org.apache.qpid.transport.network.security.sasl.SASLSender; @@ -110,14 +108,14 @@ public class SecurityLayerFactory } - public Sender<ByteBuffer> sender(Sender<ByteBuffer> delegate) + public ByteBufferSender sender(ByteBufferSender delegate) { SSLSender sender = new SSLSender(_engine, _layer.sender(delegate), _sslStatus); sender.setHostname(_hostname); return sender; } - public Receiver<ByteBuffer> receiver(Receiver<ByteBuffer> delegate) + public ByteBufferReceiver receiver(ByteBufferReceiver delegate) { SSLReceiver receiver = new SSLReceiver(_engine, _layer.receiver(delegate), _sslStatus); receiver.setHostname(_hostname); @@ -141,13 +139,13 @@ public class SecurityLayerFactory _layer = layer; } - public SASLSender sender(Sender<ByteBuffer> delegate) + public SASLSender sender(ByteBufferSender delegate) { SASLSender sender = new SASLSender(_layer.sender(delegate)); return sender; } - public SASLReceiver receiver(Receiver<ByteBuffer> delegate) + public SASLReceiver receiver(ByteBufferReceiver delegate) { SASLReceiver receiver = new SASLReceiver(_layer.receiver(delegate)); return receiver; @@ -169,12 +167,12 @@ public class SecurityLayerFactory { } - public Sender<ByteBuffer> sender(Sender<ByteBuffer> delegate) + public ByteBufferSender sender(ByteBufferSender delegate) { return delegate; } - public Receiver<ByteBuffer> receiver(Receiver<ByteBuffer> delegate) + public ByteBufferReceiver receiver(ByteBufferReceiver delegate) { return delegate; } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLReceiver.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLReceiver.java index 59e9453454..983e3bdf90 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLReceiver.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLReceiver.java @@ -21,20 +21,21 @@ package org.apache.qpid.transport.network.security.sasl; -import org.apache.qpid.transport.Receiver; -import org.apache.qpid.transport.SenderException; -import org.apache.qpid.transport.util.Logger; +import java.nio.ByteBuffer; import javax.security.sasl.SaslException; -import java.nio.ByteBuffer; -public class SASLReceiver extends SASLEncryptor implements Receiver<ByteBuffer> { +import org.apache.qpid.transport.ByteBufferReceiver; +import org.apache.qpid.transport.SenderException; +import org.apache.qpid.transport.util.Logger; + +public class SASLReceiver extends SASLEncryptor implements ByteBufferReceiver { - private Receiver<ByteBuffer> delegate; + private ByteBufferReceiver delegate; private byte[] netData; private static final Logger log = Logger.get(SASLReceiver.class); - public SASLReceiver(Receiver<ByteBuffer> delegate) + public SASLReceiver(ByteBufferReceiver delegate) { this.delegate = delegate; } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLSender.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLSender.java index fa1801cb65..335f8992ca 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLSender.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLSender.java @@ -21,22 +21,24 @@ package org.apache.qpid.transport.network.security.sasl; -import org.apache.qpid.transport.Sender; -import org.apache.qpid.transport.SenderException; -import org.apache.qpid.transport.util.Logger; - -import javax.security.sasl.SaslException; import java.nio.ByteBuffer; import java.util.concurrent.atomic.AtomicBoolean; -public class SASLSender extends SASLEncryptor implements Sender<ByteBuffer> { +import javax.security.sasl.SaslException; + +import org.apache.qpid.transport.ByteBufferSender; +import org.apache.qpid.transport.SenderException; +import org.apache.qpid.transport.util.Logger; + +public class SASLSender extends SASLEncryptor implements ByteBufferSender +{ - private Sender<ByteBuffer> delegate; + private ByteBufferSender delegate; private byte[] appData; private final AtomicBoolean closed = new AtomicBoolean(false); private static final Logger log = Logger.get(SASLSender.class); - public SASLSender(Sender<ByteBuffer> delegate) + public SASLSender(ByteBufferSender delegate) { this.delegate = delegate; log.debug("SASL Sender enabled"); diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLReceiver.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLReceiver.java index 1bbf166d82..ce3bace9e8 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLReceiver.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLReceiver.java @@ -28,16 +28,16 @@ import javax.net.ssl.SSLEngineResult.HandshakeStatus; import javax.net.ssl.SSLEngineResult.Status; import javax.net.ssl.SSLException; -import org.apache.qpid.transport.Receiver; +import org.apache.qpid.transport.ByteBufferReceiver; import org.apache.qpid.transport.TransportException; import org.apache.qpid.transport.network.security.SSLStatus; import org.apache.qpid.transport.util.Logger; -public class SSLReceiver implements Receiver<ByteBuffer> +public class SSLReceiver implements ByteBufferReceiver { private static final Logger log = Logger.get(SSLReceiver.class); - private final Receiver<ByteBuffer> delegate; + private final ByteBufferReceiver delegate; private final SSLEngine engine; private final int sslBufSize; private final ByteBuffer localBuffer; @@ -47,7 +47,7 @@ public class SSLReceiver implements Receiver<ByteBuffer> private String _hostname; - public SSLReceiver(final SSLEngine engine, final Receiver<ByteBuffer> delegate, final SSLStatus sslStatus) + public SSLReceiver(final SSLEngine engine, final ByteBufferReceiver delegate, final SSLStatus sslStatus) { this.engine = engine; this.delegate = delegate; diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLSender.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLSender.java index 7d64012fea..755f7430ba 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLSender.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLSender.java @@ -19,24 +19,25 @@ */ package org.apache.qpid.transport.network.security.ssl; -import org.apache.qpid.transport.Sender; -import org.apache.qpid.transport.SenderException; -import org.apache.qpid.transport.network.security.SSLStatus; -import org.apache.qpid.transport.util.Logger; +import java.nio.ByteBuffer; +import java.util.concurrent.atomic.AtomicBoolean; import javax.net.ssl.SSLEngine; import javax.net.ssl.SSLEngineResult; import javax.net.ssl.SSLEngineResult.HandshakeStatus; import javax.net.ssl.SSLEngineResult.Status; import javax.net.ssl.SSLException; -import java.nio.ByteBuffer; -import java.util.concurrent.atomic.AtomicBoolean; -public class SSLSender implements Sender<ByteBuffer> +import org.apache.qpid.transport.ByteBufferSender; +import org.apache.qpid.transport.SenderException; +import org.apache.qpid.transport.network.security.SSLStatus; +import org.apache.qpid.transport.util.Logger; + +public class SSLSender implements ByteBufferSender { private static final Logger log = Logger.get(SSLSender.class); - private final Sender<ByteBuffer> delegate; + private final ByteBufferSender delegate; private final SSLEngine engine; private final int sslBufSize; private final ByteBuffer netData; @@ -48,7 +49,7 @@ public class SSLSender implements Sender<ByteBuffer> private final AtomicBoolean closed = new AtomicBoolean(false); - public SSLSender(SSLEngine engine, Sender<ByteBuffer> delegate, SSLStatus sslStatus) + public SSLSender(SSLEngine engine, ByteBufferSender delegate, SSLStatus sslStatus) { this.engine = engine; this.delegate = delegate; |
