summaryrefslogtreecommitdiff
path: root/qpid/java/common
diff options
context:
space:
mode:
authorRafael H. Schloming <rhs@apache.org>2007-10-17 11:40:37 +0000
committerRafael H. Schloming <rhs@apache.org>2007-10-17 11:40:37 +0000
commitd1316c3af98ae58672f6ba374254cc89cdd57e07 (patch)
treeddb2d26cad076030e617dc73f2dc45c07dd1770c /qpid/java/common
parent51e314d96107657316948f2957f88fd4288eb539 (diff)
downloadqpid-python-d1316c3af98ae58672f6ba374254cc89cdd57e07.tar.gz
added a bit of API around MinaHandler
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@585453 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/common')
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpidity/transport/Binding.java36
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpidity/transport/network/mina/MinaHandler.java124
2 files changed, 126 insertions, 34 deletions
diff --git a/qpid/java/common/src/main/java/org/apache/qpidity/transport/Binding.java b/qpid/java/common/src/main/java/org/apache/qpidity/transport/Binding.java
new file mode 100644
index 0000000000..18ed97098d
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpidity/transport/Binding.java
@@ -0,0 +1,36 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpidity.transport;
+
+
+/**
+ * Binding
+ *
+ */
+
+public interface Binding<E,T>
+{
+
+ E endpoint(Sender<T> sender);
+
+ Receiver<T> receiver(E endpoint);
+
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/mina/MinaHandler.java b/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/mina/MinaHandler.java
index 30d931663e..6362a6453b 100644
--- a/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/mina/MinaHandler.java
+++ b/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/mina/MinaHandler.java
@@ -35,6 +35,7 @@ import org.apache.mina.common.SimpleByteBufferAllocator;
import org.apache.mina.transport.socket.nio.SocketAcceptor;
import org.apache.mina.transport.socket.nio.SocketConnector;
+import org.apache.qpidity.transport.Binding;
import org.apache.qpidity.transport.Connection;
import org.apache.qpidity.transport.ConnectionDelegate;
import org.apache.qpidity.transport.Receiver;
@@ -54,7 +55,7 @@ import org.apache.qpidity.transport.network.OutputHandler;
* @author Rafael H. Schloming
*/
//RA making this public until we sort out the package issues
-public class MinaHandler implements IoHandler
+public class MinaHandler<E> implements IoHandler
{
private static final Logger log = Logger.get(MinaHandler.class);
@@ -64,18 +65,17 @@ public class MinaHandler implements IoHandler
ByteBuffer.setAllocator(new SimpleByteBufferAllocator());
}
- private final ConnectionDelegate delegate;
- private final InputHandler.State state;
+ private final Binding<E,java.nio.ByteBuffer> binding;
- public MinaHandler(ConnectionDelegate delegate, InputHandler.State state)
+ private MinaHandler(Binding<E,java.nio.ByteBuffer> binding)
{
- this.delegate = delegate;
- this.state = state;
+ this.binding = binding;
}
+
public void messageReceived(IoSession ssn, Object obj)
{
- Attachment attachment = (Attachment) ssn.getAttachment();
+ Attachment<E> attachment = (Attachment<E>) ssn.getAttachment();
ByteBuffer buf = (ByteBuffer) obj;
attachment.receiver.received(buf.buf());
}
@@ -98,17 +98,16 @@ public class MinaHandler implements IoHandler
public void sessionOpened(final IoSession ssn)
{
log.debug("opened: %s", this);
- // XXX: hardcoded max-frame
- Connection conn = new Connection
- (new Disassembler(new OutputHandler(new MinaSender(ssn)),
- 64*1024 - 1),
- delegate);
- Receiver<java.nio.ByteBuffer> receiver =
- new InputHandler(new Assembler(conn), state);
- ssn.setAttachment(new Attachment(conn, receiver));
- // XXX
+ E endpoint = binding.endpoint(new MinaSender(ssn));
+ Attachment<E> attachment =
+ new Attachment<E>(endpoint, binding.receiver(endpoint));
+
+ // We need to synchronize and notify here because the MINA
+ // connect future returns the session prior to the attachment
+ // being set. This is arguably a bug in MINA.
synchronized (ssn)
{
+ ssn.setAttachment(attachment);
ssn.notifyAll();
}
}
@@ -116,7 +115,7 @@ public class MinaHandler implements IoHandler
public void sessionClosed(IoSession ssn)
{
log.debug("closed: %s", ssn);
- Attachment attachment = (Attachment) ssn.getAttachment();
+ Attachment<E> attachment = (Attachment<E>) ssn.getAttachment();
attachment.receiver.closed();
ssn.setAttachment(null);
}
@@ -126,41 +125,53 @@ public class MinaHandler implements IoHandler
// do nothing
}
- private class Attachment
+ private static class Attachment<E>
{
- Connection connection;
+ E endpoint;
Receiver<java.nio.ByteBuffer> receiver;
- Attachment(Connection connection,
- Receiver<java.nio.ByteBuffer> receiver)
+ Attachment(E endpoint, Receiver<java.nio.ByteBuffer> receiver)
{
- this.connection = connection;
+ this.endpoint = endpoint;
this.receiver = receiver;
}
}
public static final void accept(String host, int port,
- ConnectionDelegate delegate)
+ Binding<?,java.nio.ByteBuffer> binding)
+ throws IOException
+ {
+ accept(new InetSocketAddress(host, port), binding);
+ }
+
+ public static final <E> void accept(SocketAddress address,
+ Binding<E,java.nio.ByteBuffer> binding)
throws IOException
{
IoAcceptor acceptor = new SocketAcceptor();
- acceptor.bind(new InetSocketAddress(host, port),
- new MinaHandler(delegate, InputHandler.State.PROTO_HDR));
+ acceptor.bind(address, new MinaHandler<E>(binding));
}
- public static final Connection connect(String host, int port,
- ConnectionDelegate delegate)
+ public static final <E> E connect(String host, int port,
+ Binding<E,java.nio.ByteBuffer> binding)
+ {
+ return connect(new InetSocketAddress(host, port), binding);
+ }
+
+ public static final <E> E connect(SocketAddress address,
+ Binding<E,java.nio.ByteBuffer> binding)
{
- MinaHandler handler = new MinaHandler(delegate,
- InputHandler.State.FRAME_HDR);
- SocketAddress addr = new InetSocketAddress(host, port);
+ MinaHandler<E> handler = new MinaHandler<E>(binding);
SocketConnector connector = new SocketConnector();
connector.setWorkerTimeout(0);
- ConnectFuture cf = connector.connect(addr, handler);
+ ConnectFuture cf = connector.connect(address, handler);
cf.join();
IoSession ssn = cf.getSession();
- // XXX
+
+ // We need to synchronize and wait here because the MINA
+ // connect future returns the session prior to the attachment
+ // being set. This is arguably a bug in MINA.
synchronized (ssn)
{
while (ssn.getAttachment() == null)
@@ -175,8 +186,53 @@ public class MinaHandler implements IoHandler
}
}
}
- Attachment attachment = (Attachment) ssn.getAttachment();
- return attachment.connection;
+
+ Attachment<E> attachment = (Attachment<E>) ssn.getAttachment();
+ return attachment.endpoint;
+ }
+
+ public static final void accept(String host, int port,
+ ConnectionDelegate delegate)
+ throws IOException
+ {
+ accept(host, port, new ConnectionBinding
+ (delegate, InputHandler.State.PROTO_HDR));
+ }
+
+ public static final Connection connect(String host, int port,
+ ConnectionDelegate delegate)
+ {
+ return connect(host, port, new ConnectionBinding
+ (delegate, InputHandler.State.FRAME_HDR));
+ }
+
+ private static class ConnectionBinding
+ implements Binding<Connection,java.nio.ByteBuffer>
+ {
+
+ private final ConnectionDelegate delegate;
+ private final InputHandler.State state;
+
+ ConnectionBinding(ConnectionDelegate delegate,
+ InputHandler.State state)
+ {
+ this.delegate = delegate;
+ this.state = state;
+ }
+
+ public Connection endpoint(Sender<java.nio.ByteBuffer> sender)
+ {
+ // XXX: hardcoded max-frame
+ return new Connection
+ (new Disassembler(new OutputHandler(sender), 64*1024 - 1),
+ delegate);
+ }
+
+ public Receiver<java.nio.ByteBuffer> receiver(Connection conn)
+ {
+ return new InputHandler(new Assembler(conn), state);
+ }
+
}
}