summaryrefslogtreecommitdiff
path: root/qpid/java/common/src/main
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2015-01-31 20:07:36 +0000
committerRobert Godfrey <rgodfrey@apache.org>2015-01-31 20:07:36 +0000
commit26eab7ed4556717fca50ad93025fdc8d112f9715 (patch)
treec4019683f17a8ec570786ff067a0d261c5c603e0 /qpid/java/common/src/main
parentaef6c73485912be3be3d9bc60bb9671c951368c6 (diff)
downloadqpid-python-26eab7ed4556717fca50ad93025fdc8d112f9715.tar.gz
Separate Byte and ProtocolEvent sender/receivers, add server specific 0-10 encoder
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/QPID-6262-JavaBrokerNIO@1656248 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/common/src/main')
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java6
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java9
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/Binding.java6
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/ByteBufferReceiver.java32
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/ByteBufferSender.java32
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java17
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkEventReceiver.java32
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/ProtocolEventReceiver.java (renamed from qpid/java/common/src/main/java/org/apache/qpid/transport/Receiver.java)12
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/ProtocolEventSender.java (renamed from qpid/java/common/src/main/java/org/apache/qpid/transport/Sender.java)12
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/codec/AbstractEncoder.java14
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/codec/BBEncoder.java6
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/codec/Encoder.java20
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java21
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/ConnectionBinding.java16
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java14
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/InputHandler.java11
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/NetworkConnection.java6
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/OutgoingNetworkTransport.java8
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/AbstractNetworkTransport.java7
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java9
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java5
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java6
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java4
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingConnection.java5
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java4
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayer.java10
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayerFactory.java18
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLReceiver.java15
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLSender.java18
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLReceiver.java8
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLSender.java19
31 files changed, 236 insertions, 166 deletions
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java b/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java
index 0c643f6322..73c8653677 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java
@@ -26,9 +26,7 @@ import org.apache.qpid.framing.ContentBody;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.HeartbeatBody;
import org.apache.qpid.framing.MethodRegistry;
-import org.apache.qpid.transport.Sender;
-
-import java.nio.ByteBuffer;
+import org.apache.qpid.transport.ByteBufferSender;
/**
@@ -56,6 +54,6 @@ public interface AMQVersionAwareProtocolSession extends AMQProtocolWriter, Proto
public void heartbeatBodyReceived(int channelId, HeartbeatBody body) throws AMQException;
- public void setSender(Sender<ByteBuffer> sender);
+ public void setSender(ByteBufferSender sender);
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java b/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java
index cad5461d83..f73f6d931a 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java
@@ -21,10 +21,9 @@
package org.apache.qpid.protocol;
import java.net.SocketAddress;
-import java.nio.ByteBuffer;
-import org.apache.qpid.transport.Receiver;
-import org.apache.qpid.transport.Sender;
+import org.apache.qpid.transport.ByteBufferReceiver;
+import org.apache.qpid.transport.ByteBufferSender;
import org.apache.qpid.transport.network.NetworkConnection;
import org.apache.qpid.transport.network.TransportActivity;
@@ -32,7 +31,7 @@ import org.apache.qpid.transport.network.TransportActivity;
* A ProtocolEngine is a Receiver for java.nio.ByteBuffers. It takes the data passed to it in the received
* decodes it and then process the result.
*/
-public interface ProtocolEngine extends Receiver<java.nio.ByteBuffer>, TransportActivity
+public interface ProtocolEngine extends ByteBufferReceiver, TransportActivity
{
// Returns the remote address of the NetworkDriver
SocketAddress getRemoteAddress();
@@ -58,6 +57,6 @@ public interface ProtocolEngine extends Receiver<java.nio.ByteBuffer>, Transport
void encryptedTransport();
- public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender);
+ public void setNetworkConnection(NetworkConnection network, ByteBufferSender sender);
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Binding.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Binding.java
index 8418c42189..f703c01567 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Binding.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Binding.java
@@ -26,11 +26,11 @@ package org.apache.qpid.transport;
*
*/
-public interface Binding<E,T>
+public interface Binding<E>
{
- E endpoint(Sender<T> sender);
+ E endpoint(ByteBufferSender sender);
- Receiver<T> receiver(E endpoint);
+ ByteBufferReceiver receiver(E endpoint);
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/ByteBufferReceiver.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/ByteBufferReceiver.java
new file mode 100644
index 0000000000..1015f061c8
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/ByteBufferReceiver.java
@@ -0,0 +1,32 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.transport;
+
+import java.nio.ByteBuffer;
+
+public interface ByteBufferReceiver
+{
+ void received(ByteBuffer msg);
+
+ void exception(Throwable t);
+
+ void closed();
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/ByteBufferSender.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/ByteBufferSender.java
new file mode 100644
index 0000000000..7dcaf61a26
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/ByteBufferSender.java
@@ -0,0 +1,32 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.transport;
+
+import java.nio.ByteBuffer;
+
+public interface ByteBufferSender
+{
+ void send(ByteBuffer msg);
+
+ void flush();
+
+ void close();
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java
index 92ccdb84af..39f27b0fe0 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java
@@ -27,7 +27,6 @@ import static org.apache.qpid.transport.Connection.State.OPEN;
import static org.apache.qpid.transport.Connection.State.OPENING;
import java.net.SocketAddress;
-import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -68,7 +67,7 @@ import org.apache.qpid.util.Strings;
*/
public class Connection extends ConnectionInvoker
- implements Receiver<ProtocolEvent>, Sender<ProtocolEvent>
+ implements ProtocolEventReceiver, ProtocolEventSender
{
protected static final Logger log = Logger.get(Connection.class);
@@ -113,7 +112,7 @@ public class Connection extends ConnectionInvoker
private SessionFactory _sessionFactory = DEFAULT_SESSION_FACTORY;
private ConnectionDelegate delegate;
- private Sender<ProtocolEvent> sender;
+ private ProtocolEventSender sender;
final private Map<Binary,Session> sessions = new HashMap<Binary,Session>();
final private Map<Integer,Session> channels = new ConcurrentHashMap<Integer,Session>();
@@ -151,12 +150,12 @@ public class Connection extends ConnectionInvoker
listeners.add(listener);
}
- public Sender<ProtocolEvent> getSender()
+ public ProtocolEventSender getSender()
{
return sender;
}
- public void setSender(Sender<ProtocolEvent> sender)
+ public void setSender(ProtocolEventSender sender)
{
this.sender = sender;
}
@@ -234,7 +233,7 @@ public class Connection extends ConnectionInvoker
OutgoingNetworkTransport transport = Transport.getOutgoingTransportInstance(ProtocolVersion.v0_10);
final InputHandler inputHandler = new InputHandler(new Assembler(this));
addFrameSizeObserver(inputHandler);
- Receiver<ByteBuffer> secureReceiver = securityLayer.receiver(inputHandler);
+ ByteBufferReceiver secureReceiver = securityLayer.receiver(inputHandler);
if(secureReceiver instanceof ConnectionListener)
{
addConnectionListener((ConnectionListener)secureReceiver);
@@ -246,7 +245,7 @@ public class Connection extends ConnectionInvoker
setRemoteAddress(_networkConnection.getRemoteAddress());
setLocalAddress(_networkConnection.getLocalAddress());
- final Sender<ByteBuffer> secureSender = securityLayer.sender(_networkConnection.getSender());
+ final ByteBufferSender secureSender = securityLayer.sender(_networkConnection.getSender());
if(secureSender instanceof ConnectionListener)
{
addConnectionListener((ConnectionListener)secureSender);
@@ -411,7 +410,7 @@ public class Connection extends ConnectionInvoker
{
log.debug("SEND: [%s] %s", this, event);
}
- Sender<ProtocolEvent> s = sender;
+ ProtocolEventSender s = sender;
if (s == null)
{
throw new ConnectionException("connection closed");
@@ -425,7 +424,7 @@ public class Connection extends ConnectionInvoker
{
log.debug("FLUSH: [%s]", this);
}
- final Sender<ProtocolEvent> theSender = sender;
+ final ProtocolEventSender theSender = sender;
if(theSender != null)
{
theSender.flush();
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkEventReceiver.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkEventReceiver.java
new file mode 100644
index 0000000000..8b9c3f4f83
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkEventReceiver.java
@@ -0,0 +1,32 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.transport;
+
+import org.apache.qpid.transport.network.NetworkEvent;
+
+public interface NetworkEventReceiver
+{
+ void received(NetworkEvent msg);
+
+ void exception(Throwable t);
+
+ void closed();
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Receiver.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/ProtocolEventReceiver.java
index 2a994580dc..e4ab540ce9 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Receiver.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/ProtocolEventReceiver.java
@@ -20,19 +20,11 @@
*/
package org.apache.qpid.transport;
-
-/**
- * Receiver
- *
- */
-
-public interface Receiver<T>
+public interface ProtocolEventReceiver
{
-
- void received(T msg);
+ void received(ProtocolEvent msg);
void exception(Throwable t);
void closed();
-
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Sender.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/ProtocolEventSender.java
index 9a6f675d7f..418f31b42a 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Sender.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/ProtocolEventSender.java
@@ -20,19 +20,11 @@
*/
package org.apache.qpid.transport;
-
-/**
- * Sender
- *
- */
-
-public interface Sender<T>
+public interface ProtocolEventSender
{
-
- void send(T msg);
+ void send(ProtocolEvent msg);
void flush();
void close();
-
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/AbstractEncoder.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/AbstractEncoder.java
index 2b93697bfc..070621db9b 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/AbstractEncoder.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/AbstractEncoder.java
@@ -20,12 +20,6 @@
*/
package org.apache.qpid.transport.codec;
-import org.apache.qpid.transport.Range;
-import org.apache.qpid.transport.RangeSet;
-import org.apache.qpid.transport.Struct;
-import org.apache.qpid.transport.Type;
-
-import org.apache.qpid.transport.Xid;
import static org.apache.qpid.transport.util.Functions.lsb;
import java.io.UnsupportedEncodingException;
@@ -36,6 +30,12 @@ import java.util.List;
import java.util.Map;
import java.util.UUID;
+import org.apache.qpid.transport.Range;
+import org.apache.qpid.transport.RangeSet;
+import org.apache.qpid.transport.Struct;
+import org.apache.qpid.transport.Type;
+import org.apache.qpid.transport.Xid;
+
/**
* AbstractEncoder
@@ -43,7 +43,7 @@ import java.util.UUID;
* @author Rafael H. Schloming
*/
-abstract class AbstractEncoder implements Encoder
+public abstract class AbstractEncoder implements Encoder
{
private static Map<Class<?>,Type> ENCODINGS = new HashMap<Class<?>,Type>();
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/BBEncoder.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/BBEncoder.java
index d9150bed65..407df71824 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/BBEncoder.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/BBEncoder.java
@@ -360,8 +360,4 @@ public final class BBEncoder extends AbstractEncoder
}
}
- public void writeMagicNumber()
- {
- out.put("AM2".getBytes());
- }
-} \ No newline at end of file
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/Encoder.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/Encoder.java
index a9eea13104..b5ab29cdcf 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/Encoder.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/Encoder.java
@@ -20,13 +20,14 @@
*/
package org.apache.qpid.transport.codec;
-import org.apache.qpid.transport.RangeSet;
-import org.apache.qpid.transport.Struct;
-
+import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import java.util.UUID;
+import org.apache.qpid.transport.RangeSet;
+import org.apache.qpid.transport.Struct;
+
/**
* Encoder interface.
@@ -274,9 +275,10 @@ public interface Encoder
* @param bytes the bytes array to be encoded.
*/
void writeBin128(byte [] bytes);
-
- /**
- * Encodes the AMQP magic number.
- */
- void writeMagicNumber();
-} \ No newline at end of file
+
+ int position();
+
+ ByteBuffer underlyingBuffer();
+
+ void init();
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java
index a80b988cea..a7e96167c2 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java
@@ -20,28 +20,29 @@
*/
package org.apache.qpid.transport.network;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
import org.apache.qpid.transport.DeliveryProperties;
import org.apache.qpid.transport.Header;
import org.apache.qpid.transport.MessageProperties;
import org.apache.qpid.transport.Method;
+import org.apache.qpid.transport.NetworkEventReceiver;
import org.apache.qpid.transport.ProtocolError;
import org.apache.qpid.transport.ProtocolEvent;
+import org.apache.qpid.transport.ProtocolEventReceiver;
import org.apache.qpid.transport.ProtocolHeader;
-import org.apache.qpid.transport.Receiver;
import org.apache.qpid.transport.Struct;
import org.apache.qpid.transport.codec.BBDecoder;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
/**
* Assembler
*
*/
-public class Assembler implements Receiver<NetworkEvent>, NetworkDelegate
+public class Assembler implements NetworkEventReceiver, NetworkDelegate
{
// Use a small array to store incomplete Methods for low-value channels, instead of allocating a huge
// array or always boxing the channelId and looking it up in the map. This value must be of the form 2^X - 1.
@@ -49,7 +50,7 @@ public class Assembler implements Receiver<NetworkEvent>, NetworkDelegate
private final Method[] _incompleteMethodArray = new Method[ARRAY_SIZE + 1];
private final Map<Integer, Method> _incompleteMethodMap = new HashMap<Integer, Method>();
- private final Receiver<ProtocolEvent> receiver;
+ private final ProtocolEventReceiver receiver;
private final Map<Integer,List<Frame>> segments;
private static final ThreadLocal<BBDecoder> _decoder = new ThreadLocal<BBDecoder>()
{
@@ -59,7 +60,7 @@ public class Assembler implements Receiver<NetworkEvent>, NetworkDelegate
}
};
- public Assembler(Receiver<ProtocolEvent> receiver)
+ public Assembler(ProtocolEventReceiver receiver)
{
this.receiver = receiver;
segments = new HashMap<Integer,List<Frame>>();
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/ConnectionBinding.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/ConnectionBinding.java
index 26e8f1850b..5463cd2587 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/ConnectionBinding.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/ConnectionBinding.java
@@ -20,15 +20,13 @@
*/
package org.apache.qpid.transport.network;
-import java.nio.ByteBuffer;
-
import org.apache.qpid.transport.Binding;
+import org.apache.qpid.transport.ByteBufferReceiver;
+import org.apache.qpid.transport.ByteBufferSender;
import org.apache.qpid.transport.Connection;
import org.apache.qpid.transport.ConnectionDelegate;
import org.apache.qpid.transport.ConnectionListener;
import org.apache.qpid.transport.Constant;
-import org.apache.qpid.transport.Receiver;
-import org.apache.qpid.transport.Sender;
import org.apache.qpid.transport.network.security.sasl.SASLReceiver;
import org.apache.qpid.transport.network.security.sasl.SASLSender;
@@ -38,10 +36,10 @@ import org.apache.qpid.transport.network.security.sasl.SASLSender;
*/
public abstract class ConnectionBinding
- implements Binding<Connection,ByteBuffer>
+ implements Binding<Connection>
{
- public static Binding<Connection,ByteBuffer> get(final Connection connection)
+ public static Binding<Connection> get(final Connection connection)
{
return new ConnectionBinding()
{
@@ -52,7 +50,7 @@ public abstract class ConnectionBinding
};
}
- public static Binding<Connection,ByteBuffer> get(final ConnectionDelegate delegate)
+ public static Binding<Connection> get(final ConnectionDelegate delegate)
{
return new ConnectionBinding()
{
@@ -69,7 +67,7 @@ public abstract class ConnectionBinding
public abstract Connection connection();
- public Connection endpoint(Sender<ByteBuffer> sender)
+ public Connection endpoint(ByteBufferSender sender)
{
Connection conn = connection();
@@ -87,7 +85,7 @@ public abstract class ConnectionBinding
return conn;
}
- public Receiver<ByteBuffer> receiver(Connection conn)
+ public ByteBufferReceiver receiver(Connection conn)
{
final InputHandler inputHandler = new InputHandler(new Assembler(conn));
conn.addFrameSizeObserver(inputHandler);
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java
index 81a4c781a4..c45b2049a1 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java
@@ -30,27 +30,29 @@ import static org.apache.qpid.transport.network.Frame.LAST_SEG;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
+import org.apache.qpid.transport.ByteBufferSender;
import org.apache.qpid.transport.FrameSizeObserver;
import org.apache.qpid.transport.Header;
import org.apache.qpid.transport.Method;
import org.apache.qpid.transport.ProtocolDelegate;
import org.apache.qpid.transport.ProtocolError;
import org.apache.qpid.transport.ProtocolEvent;
+import org.apache.qpid.transport.ProtocolEventSender;
import org.apache.qpid.transport.ProtocolHeader;
import org.apache.qpid.transport.SegmentType;
-import org.apache.qpid.transport.Sender;
import org.apache.qpid.transport.Struct;
import org.apache.qpid.transport.codec.BBEncoder;
+import org.apache.qpid.transport.codec.Encoder;
/**
* Disassembler
*/
-public final class Disassembler implements Sender<ProtocolEvent>, ProtocolDelegate<Void>, FrameSizeObserver
+public final class Disassembler implements ProtocolEventSender, ProtocolDelegate<Void>, FrameSizeObserver
{
- private final Sender<ByteBuffer> sender;
+ private final ByteBufferSender sender;
private int maxPayload;
private final Object sendlock = new Object();
- private final static ThreadLocal<BBEncoder> _encoder = new ThreadLocal<BBEncoder>()
+ private final static ThreadLocal<Encoder> _encoder = new ThreadLocal<Encoder>()
{
public BBEncoder initialValue()
{
@@ -58,7 +60,7 @@ public final class Disassembler implements Sender<ProtocolEvent>, ProtocolDelega
}
};
- public Disassembler(Sender<ByteBuffer> sender, int maxFrame)
+ public Disassembler(ByteBufferSender sender, int maxFrame)
{
this.sender = sender;
if (maxFrame <= HEADER_SIZE || maxFrame >= 64*1024)
@@ -174,7 +176,7 @@ public final class Disassembler implements Sender<ProtocolEvent>, ProtocolDelega
private void method(Method method, SegmentType type)
{
- BBEncoder enc = _encoder.get();
+ Encoder enc = _encoder.get();
enc.init();
enc.writeUint16(method.getEncodedType());
if (type == SegmentType.COMMAND)
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/InputHandler.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/InputHandler.java
index 758c2e1eda..a58bed5877 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/InputHandler.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/InputHandler.java
@@ -29,11 +29,12 @@ import static org.apache.qpid.transport.util.Functions.str;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
+import org.apache.qpid.transport.ByteBufferReceiver;
import org.apache.qpid.transport.Constant;
import org.apache.qpid.transport.FrameSizeObserver;
+import org.apache.qpid.transport.NetworkEventReceiver;
import org.apache.qpid.transport.ProtocolError;
import org.apache.qpid.transport.ProtocolHeader;
-import org.apache.qpid.transport.Receiver;
import org.apache.qpid.transport.SegmentType;
@@ -43,7 +44,7 @@ import org.apache.qpid.transport.SegmentType;
* @author Rafael H. Schloming
*/
-public class InputHandler implements Receiver<ByteBuffer>, FrameSizeObserver
+public class InputHandler implements ByteBufferReceiver, FrameSizeObserver
{
private int _maxFrameSize = Constant.MIN_MAX_FRAME_SIZE;
@@ -56,7 +57,7 @@ public class InputHandler implements Receiver<ByteBuffer>, FrameSizeObserver
ERROR
}
- private final Receiver<NetworkEvent> receiver;
+ private final NetworkEventReceiver receiver;
private State state;
private ByteBuffer input = null;
private int needed;
@@ -66,7 +67,7 @@ public class InputHandler implements Receiver<ByteBuffer>, FrameSizeObserver
private byte track;
private int channel;
- public InputHandler(Receiver<NetworkEvent> receiver, State state)
+ public InputHandler(NetworkEventReceiver receiver, State state)
{
this.receiver = receiver;
this.state = state;
@@ -82,7 +83,7 @@ public class InputHandler implements Receiver<ByteBuffer>, FrameSizeObserver
}
}
- public InputHandler(Receiver<NetworkEvent> receiver)
+ public InputHandler(NetworkEventReceiver receiver)
{
this(receiver, PROTO_HDR);
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/NetworkConnection.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/NetworkConnection.java
index 2810e7a9e1..bef266f214 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/NetworkConnection.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/NetworkConnection.java
@@ -21,13 +21,13 @@
package org.apache.qpid.transport.network;
import java.net.SocketAddress;
-import java.nio.ByteBuffer;
import java.security.Principal;
-import org.apache.qpid.transport.Sender;
+
+import org.apache.qpid.transport.ByteBufferSender;
public interface NetworkConnection
{
- Sender<ByteBuffer> getSender();
+ ByteBufferSender getSender();
void start();
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/OutgoingNetworkTransport.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/OutgoingNetworkTransport.java
index 45231aa05d..f2735f1800 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/OutgoingNetworkTransport.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/OutgoingNetworkTransport.java
@@ -20,16 +20,14 @@
*/
package org.apache.qpid.transport.network;
+import org.apache.qpid.transport.ByteBufferReceiver;
import org.apache.qpid.transport.ConnectionSettings;
-import org.apache.qpid.transport.Receiver;
-
-import java.nio.ByteBuffer;
public interface OutgoingNetworkTransport extends NetworkTransport
{
public NetworkConnection getConnection();
public NetworkConnection connect(ConnectionSettings settings,
- Receiver<ByteBuffer> delegate,
+ ByteBufferReceiver delegate,
TransportActivity transportActivity);
-} \ No newline at end of file
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/AbstractNetworkTransport.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/AbstractNetworkTransport.java
index 9d0fe5ddf6..8d19c5a2ce 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/AbstractNetworkTransport.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/AbstractNetworkTransport.java
@@ -26,7 +26,6 @@ import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketException;
-import java.nio.ByteBuffer;
import java.util.Set;
import javax.net.ssl.SSLContext;
@@ -38,9 +37,9 @@ import org.slf4j.LoggerFactory;
import org.apache.qpid.configuration.CommonProperties;
import org.apache.qpid.protocol.ProtocolEngine;
import org.apache.qpid.protocol.ProtocolEngineFactory;
+import org.apache.qpid.transport.ByteBufferReceiver;
import org.apache.qpid.transport.ConnectionSettings;
import org.apache.qpid.transport.NetworkTransportConfiguration;
-import org.apache.qpid.transport.Receiver;
import org.apache.qpid.transport.TransportException;
import org.apache.qpid.transport.network.IncomingNetworkTransport;
import org.apache.qpid.transport.network.NetworkConnection;
@@ -62,7 +61,7 @@ public abstract class AbstractNetworkTransport implements OutgoingNetworkTranspo
private AcceptingThread _acceptor;
public NetworkConnection connect(ConnectionSettings settings,
- Receiver<ByteBuffer> delegate,
+ ByteBufferReceiver delegate,
TransportActivity transportActivity)
{
int sendBufferSize = settings.getWriteBufferSize();
@@ -159,7 +158,7 @@ public abstract class AbstractNetworkTransport implements OutgoingNetworkTranspo
}
protected abstract NetworkConnection createNetworkConnection(Socket socket,
- Receiver<ByteBuffer> engine,
+ ByteBufferReceiver engine,
Integer sendBufferSize,
Integer receiveBufferSize,
int timeout,
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java
index 5c3124c2ec..5008849ef3 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java
@@ -22,7 +22,6 @@ package org.apache.qpid.transport.network.io;
import java.net.Socket;
import java.net.SocketAddress;
-import java.nio.ByteBuffer;
import java.security.Principal;
import javax.net.ssl.SSLPeerUnverifiedException;
@@ -31,8 +30,8 @@ import javax.net.ssl.SSLSocket;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.qpid.transport.Receiver;
-import org.apache.qpid.transport.Sender;
+import org.apache.qpid.transport.ByteBufferReceiver;
+import org.apache.qpid.transport.ByteBufferSender;
import org.apache.qpid.transport.network.NetworkConnection;
import org.apache.qpid.transport.network.Ticker;
@@ -49,7 +48,7 @@ public class IoNetworkConnection implements NetworkConnection
private boolean _principalChecked;
private final Object _lock = new Object();
- public IoNetworkConnection(Socket socket, Receiver<ByteBuffer> delegate,
+ public IoNetworkConnection(Socket socket, ByteBufferReceiver delegate,
int sendBufferSize, int receiveBufferSize, long timeout, Ticker ticker)
{
_socket = socket;
@@ -70,7 +69,7 @@ public class IoNetworkConnection implements NetworkConnection
_ioReceiver.initiate();
}
- public Sender<ByteBuffer> getSender()
+ public ByteBufferSender getSender()
{
return _ioSender;
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java
index f33f626601..ccab1d93cf 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java
@@ -21,9 +21,8 @@
package org.apache.qpid.transport.network.io;
import java.net.Socket;
-import java.nio.ByteBuffer;
-import org.apache.qpid.transport.Receiver;
+import org.apache.qpid.transport.ByteBufferReceiver;
public class IoNetworkTransport extends AbstractNetworkTransport
{
@@ -31,7 +30,7 @@ public class IoNetworkTransport extends AbstractNetworkTransport
@Override
protected IoNetworkConnection createNetworkConnection(final Socket socket,
- final Receiver<ByteBuffer> engine,
+ final ByteBufferReceiver engine,
final Integer sendBufferSize,
final Integer receiveBufferSize,
final int timeout,
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java
index 467115c76f..790583e92b 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java
@@ -31,7 +31,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import javax.net.ssl.SSLSocket;
import org.apache.qpid.thread.Threading;
-import org.apache.qpid.transport.Receiver;
+import org.apache.qpid.transport.ByteBufferReceiver;
import org.apache.qpid.transport.TransportException;
import org.apache.qpid.transport.network.Ticker;
import org.apache.qpid.transport.util.Logger;
@@ -47,7 +47,7 @@ final class IoReceiver implements Runnable
private static final Logger log = Logger.get(IoReceiver.class);
- private final Receiver<ByteBuffer> receiver;
+ private final ByteBufferReceiver receiver;
private final int bufferSize;
private final Socket socket;
private final long timeout;
@@ -61,7 +61,7 @@ final class IoReceiver implements Runnable
shutdownBroken = SystemUtils.isWindows();
}
- public IoReceiver(Socket socket, Receiver<ByteBuffer> receiver, int bufferSize, long timeout)
+ public IoReceiver(Socket socket, ByteBufferReceiver receiver, int bufferSize, long timeout)
{
this.receiver = receiver;
this.bufferSize = bufferSize;
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java
index 79e99287c4..61beae4c25 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java
@@ -27,14 +27,14 @@ import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.qpid.thread.Threading;
-import org.apache.qpid.transport.Sender;
+import org.apache.qpid.transport.ByteBufferSender;
import org.apache.qpid.transport.SenderClosedException;
import org.apache.qpid.transport.SenderException;
import org.apache.qpid.transport.TransportException;
import org.apache.qpid.transport.util.Logger;
-public final class IoSender implements Runnable, Sender<ByteBuffer>
+public final class IoSender implements Runnable, ByteBufferSender
{
private static final Logger log = Logger.get(IoSender.class);
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingConnection.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingConnection.java
index 68670d1a9d..fe6e707f7e 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingConnection.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingConnection.java
@@ -21,7 +21,6 @@
package org.apache.qpid.transport.network.io;
import java.net.SocketAddress;
-import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.security.Principal;
import java.util.Set;
@@ -32,7 +31,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.qpid.protocol.ServerProtocolEngine;
-import org.apache.qpid.transport.Sender;
+import org.apache.qpid.transport.ByteBufferSender;
import org.apache.qpid.transport.network.NetworkConnection;
import org.apache.qpid.transport.network.Ticker;
import org.apache.qpid.transport.network.TransportEncryption;
@@ -88,7 +87,7 @@ public class NonBlockingConnection implements NetworkConnection
{
}
- public Sender<ByteBuffer> getSender()
+ public ByteBufferSender getSender()
{
return _nonBlockingSenderReceiver;
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java
index 347a41ee07..02099dee15 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java
@@ -40,7 +40,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.qpid.protocol.ServerProtocolEngine;
-import org.apache.qpid.transport.Sender;
+import org.apache.qpid.transport.ByteBufferSender;
import org.apache.qpid.transport.SenderClosedException;
import org.apache.qpid.transport.SenderException;
import org.apache.qpid.transport.network.Ticker;
@@ -48,7 +48,7 @@ import org.apache.qpid.transport.network.TransportEncryption;
import org.apache.qpid.transport.network.security.ssl.SSLUtil;
import org.apache.qpid.util.SystemUtils;
-public class NonBlockingSenderReceiver implements Sender<ByteBuffer>
+public class NonBlockingSenderReceiver implements ByteBufferSender
{
private static final Logger LOGGER = LoggerFactory.getLogger(NonBlockingSenderReceiver.class);
public static final int NUMBER_OF_BYTES_FOR_TLS_CHECK = 6;
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayer.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayer.java
index 51ef266ee9..271135f411 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayer.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayer.java
@@ -20,16 +20,14 @@
*/
package org.apache.qpid.transport.network.security;
-import org.apache.qpid.transport.Receiver;
-import org.apache.qpid.transport.Sender;
-
-import java.nio.ByteBuffer;
+import org.apache.qpid.transport.ByteBufferReceiver;
+import org.apache.qpid.transport.ByteBufferSender;
public interface SecurityLayer
{
- public Sender<ByteBuffer> sender(Sender<ByteBuffer> delegate);
- public Receiver<ByteBuffer> receiver(Receiver<ByteBuffer> delegate);
+ public ByteBufferSender sender(ByteBufferSender delegate);
+ public ByteBufferReceiver receiver(ByteBufferReceiver delegate);
public String getUserID();
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayerFactory.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayerFactory.java
index 2a2f3d8362..d25e97ffe4 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayerFactory.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayerFactory.java
@@ -20,15 +20,13 @@
*/
package org.apache.qpid.transport.network.security;
-import java.nio.ByteBuffer;
-
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
import org.apache.qpid.ssl.SSLContextFactory;
+import org.apache.qpid.transport.ByteBufferReceiver;
+import org.apache.qpid.transport.ByteBufferSender;
import org.apache.qpid.transport.ConnectionSettings;
-import org.apache.qpid.transport.Receiver;
-import org.apache.qpid.transport.Sender;
import org.apache.qpid.transport.TransportException;
import org.apache.qpid.transport.network.security.sasl.SASLReceiver;
import org.apache.qpid.transport.network.security.sasl.SASLSender;
@@ -110,14 +108,14 @@ public class SecurityLayerFactory
}
- public Sender<ByteBuffer> sender(Sender<ByteBuffer> delegate)
+ public ByteBufferSender sender(ByteBufferSender delegate)
{
SSLSender sender = new SSLSender(_engine, _layer.sender(delegate), _sslStatus);
sender.setHostname(_hostname);
return sender;
}
- public Receiver<ByteBuffer> receiver(Receiver<ByteBuffer> delegate)
+ public ByteBufferReceiver receiver(ByteBufferReceiver delegate)
{
SSLReceiver receiver = new SSLReceiver(_engine, _layer.receiver(delegate), _sslStatus);
receiver.setHostname(_hostname);
@@ -141,13 +139,13 @@ public class SecurityLayerFactory
_layer = layer;
}
- public SASLSender sender(Sender<ByteBuffer> delegate)
+ public SASLSender sender(ByteBufferSender delegate)
{
SASLSender sender = new SASLSender(_layer.sender(delegate));
return sender;
}
- public SASLReceiver receiver(Receiver<ByteBuffer> delegate)
+ public SASLReceiver receiver(ByteBufferReceiver delegate)
{
SASLReceiver receiver = new SASLReceiver(_layer.receiver(delegate));
return receiver;
@@ -169,12 +167,12 @@ public class SecurityLayerFactory
{
}
- public Sender<ByteBuffer> sender(Sender<ByteBuffer> delegate)
+ public ByteBufferSender sender(ByteBufferSender delegate)
{
return delegate;
}
- public Receiver<ByteBuffer> receiver(Receiver<ByteBuffer> delegate)
+ public ByteBufferReceiver receiver(ByteBufferReceiver delegate)
{
return delegate;
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLReceiver.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLReceiver.java
index 59e9453454..983e3bdf90 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLReceiver.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLReceiver.java
@@ -21,20 +21,21 @@
package org.apache.qpid.transport.network.security.sasl;
-import org.apache.qpid.transport.Receiver;
-import org.apache.qpid.transport.SenderException;
-import org.apache.qpid.transport.util.Logger;
+import java.nio.ByteBuffer;
import javax.security.sasl.SaslException;
-import java.nio.ByteBuffer;
-public class SASLReceiver extends SASLEncryptor implements Receiver<ByteBuffer> {
+import org.apache.qpid.transport.ByteBufferReceiver;
+import org.apache.qpid.transport.SenderException;
+import org.apache.qpid.transport.util.Logger;
+
+public class SASLReceiver extends SASLEncryptor implements ByteBufferReceiver {
- private Receiver<ByteBuffer> delegate;
+ private ByteBufferReceiver delegate;
private byte[] netData;
private static final Logger log = Logger.get(SASLReceiver.class);
- public SASLReceiver(Receiver<ByteBuffer> delegate)
+ public SASLReceiver(ByteBufferReceiver delegate)
{
this.delegate = delegate;
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLSender.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLSender.java
index fa1801cb65..335f8992ca 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLSender.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLSender.java
@@ -21,22 +21,24 @@
package org.apache.qpid.transport.network.security.sasl;
-import org.apache.qpid.transport.Sender;
-import org.apache.qpid.transport.SenderException;
-import org.apache.qpid.transport.util.Logger;
-
-import javax.security.sasl.SaslException;
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicBoolean;
-public class SASLSender extends SASLEncryptor implements Sender<ByteBuffer> {
+import javax.security.sasl.SaslException;
+
+import org.apache.qpid.transport.ByteBufferSender;
+import org.apache.qpid.transport.SenderException;
+import org.apache.qpid.transport.util.Logger;
+
+public class SASLSender extends SASLEncryptor implements ByteBufferSender
+{
- private Sender<ByteBuffer> delegate;
+ private ByteBufferSender delegate;
private byte[] appData;
private final AtomicBoolean closed = new AtomicBoolean(false);
private static final Logger log = Logger.get(SASLSender.class);
- public SASLSender(Sender<ByteBuffer> delegate)
+ public SASLSender(ByteBufferSender delegate)
{
this.delegate = delegate;
log.debug("SASL Sender enabled");
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLReceiver.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLReceiver.java
index 1bbf166d82..ce3bace9e8 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLReceiver.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLReceiver.java
@@ -28,16 +28,16 @@ import javax.net.ssl.SSLEngineResult.HandshakeStatus;
import javax.net.ssl.SSLEngineResult.Status;
import javax.net.ssl.SSLException;
-import org.apache.qpid.transport.Receiver;
+import org.apache.qpid.transport.ByteBufferReceiver;
import org.apache.qpid.transport.TransportException;
import org.apache.qpid.transport.network.security.SSLStatus;
import org.apache.qpid.transport.util.Logger;
-public class SSLReceiver implements Receiver<ByteBuffer>
+public class SSLReceiver implements ByteBufferReceiver
{
private static final Logger log = Logger.get(SSLReceiver.class);
- private final Receiver<ByteBuffer> delegate;
+ private final ByteBufferReceiver delegate;
private final SSLEngine engine;
private final int sslBufSize;
private final ByteBuffer localBuffer;
@@ -47,7 +47,7 @@ public class SSLReceiver implements Receiver<ByteBuffer>
private String _hostname;
- public SSLReceiver(final SSLEngine engine, final Receiver<ByteBuffer> delegate, final SSLStatus sslStatus)
+ public SSLReceiver(final SSLEngine engine, final ByteBufferReceiver delegate, final SSLStatus sslStatus)
{
this.engine = engine;
this.delegate = delegate;
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLSender.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLSender.java
index 7d64012fea..755f7430ba 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLSender.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLSender.java
@@ -19,24 +19,25 @@
*/
package org.apache.qpid.transport.network.security.ssl;
-import org.apache.qpid.transport.Sender;
-import org.apache.qpid.transport.SenderException;
-import org.apache.qpid.transport.network.security.SSLStatus;
-import org.apache.qpid.transport.util.Logger;
+import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicBoolean;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLEngineResult;
import javax.net.ssl.SSLEngineResult.HandshakeStatus;
import javax.net.ssl.SSLEngineResult.Status;
import javax.net.ssl.SSLException;
-import java.nio.ByteBuffer;
-import java.util.concurrent.atomic.AtomicBoolean;
-public class SSLSender implements Sender<ByteBuffer>
+import org.apache.qpid.transport.ByteBufferSender;
+import org.apache.qpid.transport.SenderException;
+import org.apache.qpid.transport.network.security.SSLStatus;
+import org.apache.qpid.transport.util.Logger;
+
+public class SSLSender implements ByteBufferSender
{
private static final Logger log = Logger.get(SSLSender.class);
- private final Sender<ByteBuffer> delegate;
+ private final ByteBufferSender delegate;
private final SSLEngine engine;
private final int sslBufSize;
private final ByteBuffer netData;
@@ -48,7 +49,7 @@ public class SSLSender implements Sender<ByteBuffer>
private final AtomicBoolean closed = new AtomicBoolean(false);
- public SSLSender(SSLEngine engine, Sender<ByteBuffer> delegate, SSLStatus sslStatus)
+ public SSLSender(SSLEngine engine, ByteBufferSender delegate, SSLStatus sslStatus)
{
this.engine = engine;
this.delegate = delegate;