summaryrefslogtreecommitdiff
path: root/java/common/src
diff options
context:
space:
mode:
authorRafael H. Schloming <rhs@apache.org>2008-10-09 17:07:59 +0000
committerRafael H. Schloming <rhs@apache.org>2008-10-09 17:07:59 +0000
commit394823bba7976c170ac58e53b5d80ad12e0f1690 (patch)
tree9b952b30b1b1bcd54c6f1cc453a221328b57c53f /java/common/src
parente78747f63bc73daa6e2035453358e6eaf3237b84 (diff)
downloadqpid-python-394823bba7976c170ac58e53b5d80ad12e0f1690.tar.gz
QPID-1339: refactor of low level client API to permit connections to exist in a disconnected state as well as to provide a central point from which to track session state
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@703208 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/common/src')
-rw-r--r--java/common/src/main/java/org/apache/qpid/ToyBroker.java11
-rw-r--r--java/common/src/main/java/org/apache/qpid/ToyClient.java2
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/Channel.java2
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java106
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/Connection.java287
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java242
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/ConnectionException.java26
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/ConnectionListener.java38
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/Echo.java13
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/ProtocolVersionException.java2
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java145
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/Session.java67
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/SessionClosedException.java2
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java9
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/SessionException.java14
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/SessionListener.java40
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/Sink.java15
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/network/ConnectionBinding.java39
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java6
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java7
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java2
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaHandler.java4
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/network/nio/NioHandler.java5
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/util/Waiter.java63
-rw-r--r--java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java33
25 files changed, 826 insertions, 354 deletions
diff --git a/java/common/src/main/java/org/apache/qpid/ToyBroker.java b/java/common/src/main/java/org/apache/qpid/ToyBroker.java
index 83d434b20a..db84b83adb 100644
--- a/java/common/src/main/java/org/apache/qpid/ToyBroker.java
+++ b/java/common/src/main/java/org/apache/qpid/ToyBroker.java
@@ -174,23 +174,14 @@ class ToyBroker extends SessionDelegate
public static final void main(String[] args) throws IOException
{
final ToyExchange exchange = new ToyExchange();
- ConnectionDelegate delegate = new ConnectionDelegate()
+ ConnectionDelegate delegate = new ServerDelegate()
{
public SessionDelegate getSessionDelegate()
{
return new ToyBroker(exchange);
}
- public void exception(Throwable t)
- {
- t.printStackTrace();
- }
- public void closed() {}
};
- //hack
- delegate.setUsername("guest");
- delegate.setPassword("guest");
-
MinaHandler.accept("0.0.0.0", 5672, delegate);
}
diff --git a/java/common/src/main/java/org/apache/qpid/ToyClient.java b/java/common/src/main/java/org/apache/qpid/ToyClient.java
index cb10859c9f..79bb286d76 100644
--- a/java/common/src/main/java/org/apache/qpid/ToyClient.java
+++ b/java/common/src/main/java/org/apache/qpid/ToyClient.java
@@ -56,7 +56,7 @@ class ToyClient extends SessionDelegate
public static final void main(String[] args)
{
Connection conn = MinaHandler.connect("0.0.0.0", 5672,
- new ClientDelegate()
+ new ClientDelegate(null, "guest", "guest")
{
public SessionDelegate getSessionDelegate()
{
diff --git a/java/common/src/main/java/org/apache/qpid/transport/Channel.java b/java/common/src/main/java/org/apache/qpid/transport/Channel.java
index d6b015930b..d973739ed6 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/Channel.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/Channel.java
@@ -112,7 +112,7 @@ public class Channel extends Invoker
public void closed()
{
- log.debug("channel closed: ", this);
+ log.debug("channel closed: %s", this);
if (session != null)
{
session.closed();
diff --git a/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java b/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java
index bbdadfadb9..316c26429e 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java
@@ -20,21 +20,123 @@
*/
package org.apache.qpid.transport;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+
+import java.io.UnsupportedEncodingException;
+
+import org.apache.qpid.QpidException;
+
+import org.apache.qpid.security.UsernamePasswordCallbackHandler;
+
+import javax.security.sasl.Sasl;
+import javax.security.sasl.SaslClient;
+import javax.security.sasl.SaslException;
+import javax.security.sasl.SaslServer;
+
+
+import static org.apache.qpid.transport.Connection.State.*;
+
/**
* ClientDelegate
*
*/
-public abstract class ClientDelegate extends ConnectionDelegate
+public class ClientDelegate extends ConnectionDelegate
{
+ private String vhost;
+ private String username;
+ private String password;
+
+ public ClientDelegate(String vhost, String username, String password)
+ {
+ this.vhost = vhost;
+ this.username = username;
+ this.password = password;
+ }
+
public void init(Channel ch, ProtocolHeader hdr)
{
if (hdr.getMajor() != 0 && hdr.getMinor() != 10)
{
- throw new ProtocolVersionException(hdr.getMajor(), hdr.getMinor());
+ Connection conn = ch.getConnection();
+ conn.exception(new ProtocolVersionException(hdr.getMajor(), hdr.getMinor()));
+ }
+ }
+
+ @Override public void connectionStart(Channel ch, ConnectionStart start)
+ {
+ Connection conn = ch.getConnection();
+ List<Object> mechanisms = start.getMechanisms();
+ if (mechanisms == null || mechanisms.isEmpty())
+ {
+ ch.connectionStartOk
+ (Collections.EMPTY_MAP, null, null, conn.getLocale());
+ return;
}
+
+ String[] mechs = new String[mechanisms.size()];
+ mechanisms.toArray(mechs);
+
+ try
+ {
+ UsernamePasswordCallbackHandler handler =
+ new UsernamePasswordCallbackHandler();
+ handler.initialise(username, password);
+ SaslClient sc = Sasl.createSaslClient
+ (new String[] {"PLAIN"}, null, "AMQP", "localhost", null, handler);
+ conn.setSaslClient(sc);
+
+ byte[] response = sc.hasInitialResponse() ?
+ sc.evaluateChallenge(new byte[0]) : null;
+ ch.connectionStartOk
+ (Collections.EMPTY_MAP, sc.getMechanismName(), response,
+ conn.getLocale());
+ }
+ catch (SaslException e)
+ {
+ conn.exception(e);
+ }
+ }
+
+ @Override public void connectionSecure(Channel ch, ConnectionSecure secure)
+ {
+ Connection conn = ch.getConnection();
+ SaslClient sc = conn.getSaslClient();
+ try
+ {
+ byte[] response = sc.evaluateChallenge(secure.getChallenge());
+ ch.connectionSecureOk(response);
+ }
+ catch (SaslException e)
+ {
+ conn.exception(e);
+ }
+ }
+
+ @Override public void connectionTune(Channel ch, ConnectionTune tune)
+ {
+ Connection conn = ch.getConnection();
+ conn.setChannelMax(tune.getChannelMax());
+ ch.connectionTuneOk(tune.getChannelMax(), tune.getMaxFrameSize(), tune.getHeartbeatMax());
+ ch.connectionOpen(vhost, null, Option.INSIST);
+ }
+
+ @Override public void connectionOpenOk(Channel ch, ConnectionOpenOk ok)
+ {
+ ch.getConnection().setState(OPEN);
+ }
+
+ @Override public void connectionRedirect(Channel ch, ConnectionRedirect redir)
+ {
+ throw new UnsupportedOperationException();
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/transport/Connection.java b/java/common/src/main/java/org/apache/qpid/transport/Connection.java
index 68b9b209bb..ae9420eb1a 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/Connection.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/Connection.java
@@ -20,14 +20,22 @@
*/
package org.apache.qpid.transport;
+import org.apache.qpid.transport.network.ConnectionBinding;
+import org.apache.qpid.transport.network.io.IoTransport;
import org.apache.qpid.transport.util.Logger;
+import org.apache.qpid.transport.util.Waiter;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.nio.ByteBuffer;
+import java.util.UUID;
+
+import javax.security.sasl.SaslClient;
+import javax.security.sasl.SaslServer;
+
+import static org.apache.qpid.transport.Connection.State.*;
/**
@@ -44,23 +52,164 @@ public class Connection
implements Receiver<ProtocolEvent>, Sender<ProtocolEvent>
{
+ enum State { NEW, CLOSED, OPENING, OPEN, CLOSING, CLOSE_RCVD }
+
private static final Logger log = Logger.get(Connection.class);
- final private Sender<ProtocolEvent> sender;
- final private ConnectionDelegate delegate;
+ class DefaultConnectionListener implements ConnectionListener
+ {
+ public void opened(Connection conn) {}
+ public void exception(Connection conn, ConnectionException exception)
+ {
+ throw exception;
+ }
+ public void closed(Connection conn) {}
+ }
+
+ private ConnectionDelegate delegate;
+ private Sender<ProtocolEvent> sender;
+
+ final private Map<Integer,Channel> channels = new HashMap<Integer,Channel>();
+
+ private State state = NEW;
+ private Object lock = new Object();
+ private long timeout = 60000;
+ private ConnectionListener listener = new DefaultConnectionListener();
+ private Throwable error = null;
+
private int channelMax = 1;
+ private String locale;
+ private SaslServer saslServer;
+ private SaslClient saslClient;
+
// want to make this final
private int _connectionId;
- final private Map<Integer,Channel> channels = new HashMap<Integer,Channel>();
+ public Connection() {}
- public Connection(Sender<ProtocolEvent> sender,
- ConnectionDelegate delegate)
+ public void setConnectionDelegate(ConnectionDelegate delegate)
{
- this.sender = sender;
this.delegate = delegate;
}
+ public void setConnectionListener(ConnectionListener listener)
+ {
+ if (listener == null)
+ {
+ this.listener = new DefaultConnectionListener();
+ }
+ else
+ {
+ this.listener = listener;
+ }
+ }
+
+ public Sender<ProtocolEvent> getSender()
+ {
+ return sender;
+ }
+
+ public void setSender(Sender<ProtocolEvent> sender)
+ {
+ this.sender = sender;
+ }
+
+ void setState(State state)
+ {
+ synchronized (lock)
+ {
+ this.state = state;
+ lock.notifyAll();
+ }
+ }
+
+ void setLocale(String locale)
+ {
+ this.locale = locale;
+ }
+
+ String getLocale()
+ {
+ return locale;
+ }
+
+ void setSaslServer(SaslServer saslServer)
+ {
+ this.saslServer = saslServer;
+ }
+
+ SaslServer getSaslServer()
+ {
+ return saslServer;
+ }
+
+ void setSaslClient(SaslClient saslClient)
+ {
+ this.saslClient = saslClient;
+ }
+
+ SaslClient getSaslClient()
+ {
+ return saslClient;
+ }
+
+ public void connect(String host, int port, String vhost, String username, String password)
+ {
+ synchronized (lock)
+ {
+ state = OPENING;
+
+ delegate = new ClientDelegate(vhost, username, password);
+
+ IoTransport.connect(host, port, ConnectionBinding.get(this));
+ send(new ProtocolHeader(1, 0, 10));
+
+ Waiter w = new Waiter(lock, timeout);
+ while (w.hasTime() && state == OPENING && error == null)
+ {
+ w.await();
+ }
+
+ if (error != null)
+ {
+ Throwable t = error;
+ error = null;
+ close();
+ throw new ConnectionException(t);
+ }
+
+ switch (state)
+ {
+ case OPENING:
+ close();
+ throw new ConnectionException("connect() timed out");
+ case OPEN:
+ break;
+ case CLOSED:
+ throw new ConnectionException("connect() aborted");
+ default:
+ throw new IllegalStateException(String.valueOf(state));
+ }
+ }
+
+ listener.opened(this);
+ }
+
+ public Session createSession()
+ {
+ return createSession(0);
+ }
+
+ public Session createSession(long expiryInSeconds)
+ {
+ Channel ch = getChannel();
+ Session ssn = new Session(UUID.randomUUID().toString().getBytes());
+ ssn.attach(ch);
+ ssn.sessionAttach(ssn.getName());
+ ssn.sessionRequestTimeout(expiryInSeconds);
+ return ssn;
+ }
+
public void setConnectionId(int id)
{
_connectionId = id;
@@ -86,7 +235,12 @@ public class Connection
public void send(ProtocolEvent event)
{
log.debug("SEND: [%s] %s", this, event);
- sender.send(event);
+ Sender s = sender;
+ if (s == null)
+ {
+ throw new ConnectionException("connection closed");
+ }
+ s.send(event);
}
public void flush()
@@ -107,7 +261,7 @@ public class Connection
public Channel getChannel()
{
- synchronized (channels)
+ synchronized (lock)
{
for (int i = 0; i < getChannelMax(); i++)
{
@@ -123,7 +277,7 @@ public class Connection
public Channel getChannel(int number)
{
- synchronized (channels)
+ synchronized (lock)
{
Channel channel = channels.get(number);
if (channel == null)
@@ -137,45 +291,146 @@ public class Connection
void removeChannel(int number)
{
- synchronized (channels)
+ synchronized (lock)
{
channels.remove(number);
}
}
+ public void exception(ConnectionException e)
+ {
+ synchronized (lock)
+ {
+ switch (state)
+ {
+ case OPENING:
+ case CLOSING:
+ error = e;
+ lock.notifyAll();
+ break;
+ default:
+ listener.exception(this, e);
+ break;
+ }
+ }
+ }
+
public void exception(Throwable t)
{
- delegate.exception(t);
+ synchronized (lock)
+ {
+ switch (state)
+ {
+ case OPENING:
+ case CLOSING:
+ error = t;
+ lock.notifyAll();
+ break;
+ default:
+ listener.exception(this, new ConnectionException(t));
+ break;
+ }
+ }
}
void closeCode(ConnectionClose close)
{
- synchronized (channels)
+ synchronized (lock)
{
for (Channel ch : channels.values())
{
ch.closeCode(close);
}
+ ConnectionCloseCode code = close.getReplyCode();
+ if (code != ConnectionCloseCode.NORMAL)
+ {
+ exception(new ConnectionException(close));
+ }
}
}
public void closed()
{
log.debug("connection closed: %s", this);
- synchronized (channels)
+
+ if (state == OPEN)
+ {
+ exception(new ConnectionException("connection aborted"));
+ }
+
+ synchronized (lock)
{
List<Channel> values = new ArrayList<Channel>(channels.values());
for (Channel ch : values)
{
ch.closed();
}
+
+ sender = null;
+ setState(CLOSED);
}
- delegate.closed();
+
+ listener.closed(this);
}
public void close()
{
- sender.close();
+ synchronized (lock)
+ {
+ switch (state)
+ {
+ case OPEN:
+ Channel ch = getChannel(0);
+ state = CLOSING;
+ ch.connectionClose(ConnectionCloseCode.NORMAL, null);
+ Waiter w = new Waiter(lock, timeout);
+ while (w.hasTime() && state == CLOSING && error == null)
+ {
+ w.await();
+ }
+
+ if (error != null)
+ {
+ close();
+ throw new ConnectionException(error);
+ }
+
+ switch (state)
+ {
+ case CLOSING:
+ close();
+ throw new ConnectionException("close() timed out");
+ case CLOSED:
+ break;
+ default:
+ throw new IllegalStateException(String.valueOf(state));
+ }
+ break;
+ case CLOSED:
+ break;
+ default:
+ if (sender != null)
+ {
+ sender.close();
+ w = new Waiter(lock, timeout);
+ while (w.hasTime() && sender != null && error == null)
+ {
+ w.await();
+ }
+
+ if (error != null)
+ {
+ throw new ConnectionException(error);
+ }
+
+ if (sender != null)
+ {
+ throw new ConnectionException("close() timed out");
+ }
+ }
+ break;
+ }
+ }
}
public String toString()
diff --git a/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java b/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java
index 2aa1db7b28..9056a3120b 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java
@@ -22,22 +22,7 @@ package org.apache.qpid.transport;
import org.apache.qpid.transport.util.Logger;
-import org.apache.qpid.SecurityHelper;
-import org.apache.qpid.QpidException;
-
-import java.io.UnsupportedEncodingException;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.Lock;
-
-import javax.security.sasl.Sasl;
-import javax.security.sasl.SaslClient;
-import javax.security.sasl.SaslException;
-import javax.security.sasl.SaslServer;
+import static org.apache.qpid.transport.Connection.State.*;
/**
@@ -57,231 +42,26 @@ public abstract class ConnectionDelegate extends MethodDelegate<Channel>
private static final Logger log = Logger.get(ConnectionDelegate.class);
- private String _username = "guest";
- private String _password = "guest";;
- private String _mechanism;
- private String _virtualHost;
- private SaslClient saslClient;
- private SaslServer saslServer;
- private String _locale = "utf8";
- private int maxFrame = 64*1024;
- private Condition _negotiationComplete;
- private Lock _negotiationCompleteLock;
-
- public abstract SessionDelegate getSessionDelegate();
-
- public abstract void exception(Throwable t);
-
- public abstract void closed();
-
- public void setCondition(Lock negotiationCompleteLock,Condition negotiationComplete)
- {
- _negotiationComplete = negotiationComplete;
- _negotiationCompleteLock = negotiationCompleteLock;
- }
-
- public void init(Channel ch, ProtocolHeader hdr)
- {
- ch.getConnection().send(new ProtocolHeader (1, hdr.getMajor(), hdr.getMinor()));
- List<Object> plain = new ArrayList<Object>();
- plain.add("PLAIN");
- List<Object> utf8 = new ArrayList<Object>();
- utf8.add("utf8");
- ch.connectionStart(null, plain, utf8);
- }
-
- // ----------------------------------------------
- // Client side
- //-----------------------------------------------
- @Override public void connectionStart(Channel context, ConnectionStart struct)
- {
- String mechanism = null;
- byte[] response = null;
- try
- {
- mechanism = SecurityHelper.chooseMechanism(struct.getMechanisms());
- saslClient = Sasl.createSaslClient(new String[]{ mechanism },null, "AMQP", "localhost", null,
- SecurityHelper.createCallbackHandler(mechanism,_username,_password ));
- response = saslClient.evaluateChallenge(new byte[0]);
- }
- catch (UnsupportedEncodingException e)
- {
- // need error handling
- }
- catch (SaslException e)
- {
- // need error handling
- }
- catch (QpidException e)
- {
- // need error handling
- }
-
- Map<String,Object> props = new HashMap<String,Object>();
- context.connectionStartOk(props, mechanism, response, _locale);
- }
-
- @Override public void connectionSecure(Channel context, ConnectionSecure struct)
- {
- try
- {
- byte[] response = saslClient.evaluateChallenge(struct.getChallenge());
- context.connectionSecureOk(response);
- }
- catch (SaslException e)
- {
- // need error handling
- }
- }
-
- @Override public void connectionTune(Channel context, ConnectionTune struct)
- {
- context.getConnection().setChannelMax(struct.getChannelMax());
- context.connectionTuneOk(struct.getChannelMax(), struct.getMaxFrameSize(), struct.getHeartbeatMax());
- context.connectionOpen(_virtualHost, null, Option.INSIST);
- }
-
-
- @Override public void connectionOpenOk(Channel context, ConnectionOpenOk struct)
- {
- List<Object> knownHosts = struct.getKnownHosts();
- if(_negotiationCompleteLock != null)
- {
- _negotiationCompleteLock.lock();
- try
- {
- _negotiationComplete.signalAll();
- }
- finally
- {
- _negotiationCompleteLock.unlock();
- }
- }
- }
-
- public void connectionRedirect(Channel context, ConnectionRedirect struct)
- {
- // not going to bother at the moment
- }
-
- // ----------------------------------------------
- // Server side
- //-----------------------------------------------
- @Override public void connectionStartOk(Channel context, ConnectionStartOk struct)
+ public SessionDelegate getSessionDelegate()
{
- //set the client side locale on the server side
- _locale = struct.getLocale();
- _mechanism = struct.getMechanism();
-
- //try
- //{
- //saslServer = Sasl.createSaslServer(_mechanism, "AMQP", "ABC",null,SecurityHelper.createCallbackHandler(_mechanism,_username,_password));
- //byte[] challenge = saslServer.evaluateResponse(struct.getResponse().getBytes());
- byte[] challenge = null;
- if ( challenge == null)
- {
- context.connectionTune(Integer.MAX_VALUE, maxFrame, 0, Integer.MAX_VALUE);
- }
- else
- {
- try
- {
- context.connectionSecure(challenge);
- }
- catch(Exception e)
- {
-
- }
- }
-
-
- /*}
- catch (SaslException e)
- {
- // need error handling
- }
- catch (QpidException e)
- {
- // need error handling
- }*/
+ return new SessionDelegate();
}
- @Override public void connectionSecureOk(Channel context, ConnectionSecureOk struct)
- {
- try
- {
- saslServer = Sasl.createSaslServer(_mechanism, "AMQP", "ABC",new HashMap(),SecurityHelper.createCallbackHandler(_mechanism,_username,_password));
- byte[] challenge = saslServer.evaluateResponse(struct.getResponse());
- if ( challenge == null)
- {
- context.connectionTune(Integer.MAX_VALUE, maxFrame, 0, Integer.MAX_VALUE);
- }
- else
- {
- try
- {
- context.connectionSecure(challenge);
- }
- catch(Exception e)
- {
-
- }
- }
-
-
- }
- catch (SaslException e)
- {
- // need error handling
- }
- catch (QpidException e)
- {
- // need error handling
- }
- }
-
-
- @Override public void connectionOpen(Channel context, ConnectionOpen struct)
- {
- List<Object> hosts = new ArrayList<Object>();
- hosts.add("amqp:1223243232325");
- context.connectionOpenOk(hosts);
- }
+ public abstract void init(Channel ch, ProtocolHeader hdr);
@Override public void connectionClose(Channel ch, ConnectionClose close)
{
- ch.getConnection().closeCode(close);
+ Connection conn = ch.getConnection();
ch.connectionCloseOk();
+ conn.getSender().close();
+ conn.closeCode(close);
+ conn.setState(CLOSE_RCVD);
}
- public String getPassword()
- {
- return _password;
- }
-
- public void setPassword(String password)
- {
- _password = password;
- }
-
- public String getUsername()
- {
- return _username;
- }
-
- public void setUsername(String username)
- {
- _username = username;
- }
-
- public String getVirtualHost()
- {
- return _virtualHost;
- }
-
- public void setVirtualHost(String host)
+ @Override public void connectionCloseOk(Channel ch, ConnectionCloseOk ok)
{
- _virtualHost = host;
+ Connection conn = ch.getConnection();
+ conn.getSender().close();
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/transport/ConnectionException.java b/java/common/src/main/java/org/apache/qpid/transport/ConnectionException.java
index c3239ef684..1bd7d516cf 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/ConnectionException.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/ConnectionException.java
@@ -26,17 +26,37 @@ package org.apache.qpid.transport;
*
*/
-public class ConnectionException extends RuntimeException
+public class ConnectionException extends TransportException
{
private ConnectionClose close;
- public ConnectionException(ConnectionClose close)
+ public ConnectionException(String message, ConnectionClose close, Throwable cause)
{
- super(close.getReplyText());
+ super(message, cause);
this.close = close;
}
+ public ConnectionException(String message)
+ {
+ this(message, null, null);
+ }
+
+ public ConnectionException(String message, Throwable cause)
+ {
+ this(message, null, cause);
+ }
+
+ public ConnectionException(Throwable cause)
+ {
+ this(cause.getMessage(), null, cause);
+ }
+
+ public ConnectionException(ConnectionClose close)
+ {
+ this(close.getReplyText(), close, null);
+ }
+
public ConnectionClose getClose()
{
return close;
diff --git a/java/common/src/main/java/org/apache/qpid/transport/ConnectionListener.java b/java/common/src/main/java/org/apache/qpid/transport/ConnectionListener.java
new file mode 100644
index 0000000000..616e76825a
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpid/transport/ConnectionListener.java
@@ -0,0 +1,38 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.transport;
+
+
+/**
+ * ConnectionListener
+ *
+ */
+
+public interface ConnectionListener
+{
+
+ void opened(Connection connection);
+
+ void exception(Connection connection, ConnectionException exception);
+
+ void closed(Connection connection);
+
+}
diff --git a/java/common/src/main/java/org/apache/qpid/transport/Echo.java b/java/common/src/main/java/org/apache/qpid/transport/Echo.java
index b2be32331a..89b59c2512 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/Echo.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/Echo.java
@@ -43,25 +43,16 @@ public class Echo extends SessionDelegate
public static final void main(String[] args) throws IOException
{
- ConnectionDelegate delegate = new ConnectionDelegate()
+ ConnectionDelegate delegate = new ServerDelegate()
{
public SessionDelegate getSessionDelegate()
{
return new Echo();
}
- public void exception(Throwable t)
- {
- t.printStackTrace();
- }
- public void closed() {}
};
- //hack
- delegate.setUsername("guest");
- delegate.setPassword("guest");
-
IoAcceptor ioa = new IoAcceptor
- ("0.0.0.0", 5672, new ConnectionBinding(delegate));
+ ("0.0.0.0", 5672, ConnectionBinding.get(delegate));
ioa.start();
}
diff --git a/java/common/src/main/java/org/apache/qpid/transport/ProtocolVersionException.java b/java/common/src/main/java/org/apache/qpid/transport/ProtocolVersionException.java
index 2de0c169a5..0cca0227a1 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/ProtocolVersionException.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/ProtocolVersionException.java
@@ -26,7 +26,7 @@ package org.apache.qpid.transport;
*
*/
-public final class ProtocolVersionException extends TransportException
+public final class ProtocolVersionException extends ConnectionException
{
private final byte major;
diff --git a/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java b/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java
new file mode 100644
index 0000000000..c27419cadc
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java
@@ -0,0 +1,145 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.transport;
+
+import java.util.Collections;
+
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+
+import java.io.UnsupportedEncodingException;
+
+import org.apache.qpid.QpidException;
+
+import org.apache.qpid.SecurityHelper;
+
+import javax.security.sasl.Sasl;
+import javax.security.sasl.SaslClient;
+import javax.security.sasl.SaslException;
+import javax.security.sasl.SaslServer;
+
+
+import static org.apache.qpid.transport.Connection.State.*;
+
+
+/**
+ * ServerDelegate
+ *
+ */
+
+public class ServerDelegate extends ConnectionDelegate
+{
+
+ private SaslServer saslServer;
+
+ public void init(Channel ch, ProtocolHeader hdr)
+ {
+ Connection conn = ch.getConnection();
+ conn.send(new ProtocolHeader(1, 0, 10));
+ List<Object> utf8 = new ArrayList<Object>();
+ utf8.add("utf8");
+ ch.connectionStart(null, Collections.EMPTY_LIST, utf8);
+ }
+
+ @Override public void connectionStartOk(Channel ch, ConnectionStartOk ok)
+ {
+ Connection conn = ch.getConnection();
+ conn.setLocale(ok.getLocale());
+ String mechanism = ok.getMechanism();
+
+ if (mechanism == null || mechanism.length() == 0)
+ {
+ ch.connectionTune
+ (Integer.MAX_VALUE,
+ org.apache.qpid.transport.network.ConnectionBinding.MAX_FRAME_SIZE,
+ 0, Integer.MAX_VALUE);
+ return;
+ }
+
+ try
+ {
+ SaslServer ss = Sasl.createSaslServer
+ (mechanism, "AMQP", "localhost", null, null);
+ if (ss == null)
+ {
+ ch.connectionClose
+ (ConnectionCloseCode.CONNECTION_FORCED,
+ "null SASL mechanism: " + mechanism);
+ return;
+ }
+ conn.setSaslServer(ss);
+ secure(ch, ok.getResponse());
+ }
+ catch (SaslException e)
+ {
+ conn.exception(e);
+ }
+ }
+
+ private void secure(Channel ch, byte[] response)
+ {
+ Connection conn = ch.getConnection();
+ SaslServer ss = conn.getSaslServer();
+ try
+ {
+ byte[] challenge = ss.evaluateResponse(response);
+ if (ss.isComplete())
+ {
+ ss.dispose();
+ ch.connectionTune
+ (Integer.MAX_VALUE,
+ org.apache.qpid.transport.network.ConnectionBinding.MAX_FRAME_SIZE,
+ 0, Integer.MAX_VALUE);
+ }
+ else
+ {
+ ch.connectionSecure(challenge);
+ }
+ }
+ catch (SaslException e)
+ {
+ conn.exception(e);
+ }
+ }
+
+ @Override public void connectionSecureOk(Channel ch, ConnectionSecureOk ok)
+ {
+ secure(ch, ok.getResponse());
+ }
+
+ @Override public void connectionTuneOk(Channel ch, ConnectionTuneOk ok)
+ {
+
+ }
+
+ @Override public void connectionOpen(Channel ch, ConnectionOpen open)
+ {
+ Connection conn = ch.getConnection();
+ ch.connectionOpenOk(Collections.EMPTY_LIST);
+ conn.setState(OPEN);
+ }
+
+}
diff --git a/java/common/src/main/java/org/apache/qpid/transport/Session.java b/java/common/src/main/java/org/apache/qpid/transport/Session.java
index 10ca6cfb0a..df4313e15b 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/Session.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/Session.java
@@ -49,6 +49,26 @@ public class Session extends Invoker
private static final Logger log = Logger.get(Session.class);
+ class DefaultSessionListener implements SessionListener
+ {
+
+ public void opened(Session ssn) {}
+
+ public void message(Session ssn, MessageTransfer xfr)
+ {
+ log.info("message: %s", xfr);
+ }
+
+ public void exception(Session ssn, SessionException exc)
+ {
+ throw exc;
+ }
+
+ public void closed(Session ssn) {}
+ }
+
+ public static final int UNLIMITED_CREDIT = 0xFFFFFFFF;
+
private static boolean ENABLE_REPLAY = false;
static
@@ -65,6 +85,7 @@ public class Session extends Invoker
}
private byte[] name;
+ private SessionListener listener = new DefaultSessionListener();
private long timeout = 60000;
private boolean autoSync = false;
@@ -97,6 +118,23 @@ public class Session extends Invoker
return name;
}
+ public void setSessionListener(SessionListener listener)
+ {
+ if (listener == null)
+ {
+ this.listener = new DefaultSessionListener();
+ }
+ else
+ {
+ this.listener = listener;
+ }
+ }
+
+ public SessionListener getSessionListener()
+ {
+ return listener;
+ }
+
public void setAutoSync(boolean value)
{
synchronized (commands)
@@ -270,8 +308,8 @@ public class Session extends Invoker
{
if (closed.get())
{
- List<ExecutionException> exc = getExceptions();
- if (!exc.isEmpty())
+ ExecutionException exc = getException();
+ if (exc != null)
{
throw new SessionException(exc);
}
@@ -361,7 +399,7 @@ public class Session extends Invoker
{
if (closed.get())
{
- throw new SessionException(getExceptions());
+ throw new SessionException(getException());
}
else
{
@@ -375,8 +413,7 @@ public class Session extends Invoker
private Map<Integer,ResultFuture<?>> results =
new HashMap<Integer,ResultFuture<?>>();
- private List<ExecutionException> exceptions =
- new ArrayList<ExecutionException>();
+ private ExecutionException exception = null;
void result(int command, Struct result)
{
@@ -388,11 +425,17 @@ public class Session extends Invoker
future.set(result);
}
- void addException(ExecutionException exc)
+ void setException(ExecutionException exc)
{
- synchronized (exceptions)
+ synchronized (results)
{
- exceptions.add(exc);
+ if (exception != null)
+ {
+ throw new IllegalStateException
+ (String.format
+ ("too many exceptions: %s, %s", exception, exc));
+ }
+ exception = exc;
}
}
@@ -403,11 +446,11 @@ public class Session extends Invoker
this.close = close;
}
- List<ExecutionException> getExceptions()
+ ExecutionException getException()
{
- synchronized (exceptions)
+ synchronized (results)
{
- return new ArrayList<ExecutionException>(exceptions);
+ return exception;
}
}
@@ -473,7 +516,7 @@ public class Session extends Invoker
}
else if (closed.get())
{
- throw new SessionException(getExceptions());
+ throw new SessionException(getException());
}
else
{
diff --git a/java/common/src/main/java/org/apache/qpid/transport/SessionClosedException.java b/java/common/src/main/java/org/apache/qpid/transport/SessionClosedException.java
index d2c54cf339..354e5c1d15 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/SessionClosedException.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/SessionClosedException.java
@@ -33,7 +33,7 @@ public class SessionClosedException extends SessionException
public SessionClosedException()
{
- super(Collections.EMPTY_LIST);
+ super(null);
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java b/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java
index b91763509c..3e6fa9d5d9 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java
@@ -29,7 +29,7 @@ import org.apache.qpid.transport.network.Frame;
* @author Rafael H. Schloming
*/
-public abstract class SessionDelegate
+public class SessionDelegate
extends MethodDelegate<Session>
implements ProtocolDelegate<Session>
{
@@ -57,7 +57,7 @@ public abstract class SessionDelegate
@Override public void executionException(Session ssn, ExecutionException exc)
{
- ssn.addException(exc);
+ ssn.setException(exc);
}
@Override public void sessionCompleted(Session ssn, SessionCompleted cmp)
@@ -122,4 +122,9 @@ public abstract class SessionDelegate
ssn.syncPoint();
}
+ @Override public void messageTransfer(Session ssn, MessageTransfer xfr)
+ {
+ ssn.getSessionListener().message(ssn, xfr);
+ }
+
}
diff --git a/java/common/src/main/java/org/apache/qpid/transport/SessionException.java b/java/common/src/main/java/org/apache/qpid/transport/SessionException.java
index dc294b2206..ae9b4b9cdb 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/SessionException.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/SessionException.java
@@ -27,20 +27,20 @@ import java.util.List;
*
*/
-public class SessionException extends RuntimeException
+public class SessionException extends TransportException
{
- private List<ExecutionException> exceptions;
+ private ExecutionException exception;
- public SessionException(List<ExecutionException> exceptions)
+ public SessionException(ExecutionException exception)
{
- super(exceptions.isEmpty() ? "" : exceptions.toString());
- this.exceptions = exceptions;
+ super(String.valueOf(exception));
+ this.exception = exception;
}
- public List<ExecutionException> getExceptions()
+ public ExecutionException getException()
{
- return exceptions;
+ return exception;
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/transport/SessionListener.java b/java/common/src/main/java/org/apache/qpid/transport/SessionListener.java
new file mode 100644
index 0000000000..63690177f9
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpid/transport/SessionListener.java
@@ -0,0 +1,40 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.transport;
+
+
+/**
+ * SessionListener
+ *
+ */
+
+public interface SessionListener
+{
+
+ void opened(Session session);
+
+ void message(Session ssn, MessageTransfer xfr);
+
+ void exception(Session session, SessionException exception);
+
+ void closed(Session session);
+
+}
diff --git a/java/common/src/main/java/org/apache/qpid/transport/Sink.java b/java/common/src/main/java/org/apache/qpid/transport/Sink.java
index 8653acedbe..617867cae6 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/Sink.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/Sink.java
@@ -103,28 +103,17 @@ public class Sink extends SessionDelegate
public static final void main(String[] args) throws IOException
{
- ConnectionDelegate delegate = new ConnectionDelegate()
+ ConnectionDelegate delegate = new ServerDelegate()
{
public SessionDelegate getSessionDelegate()
{
return new Sink();
}
-
- public void exception(Throwable t)
- {
- t.printStackTrace();
- }
-
- public void closed() {}
};
- //hack
- delegate.setUsername("guest");
- delegate.setPassword("guest");
-
IoAcceptor ioa = new IoAcceptor
- ("0.0.0.0", 5672, new ConnectionBinding(delegate));
+ ("0.0.0.0", 5672, ConnectionBinding.get(delegate));
System.out.println
(String.format
(FORMAT_HDR, "Session", "Count/MBytes", "Cumulative Rate", "Interval Rate"));
diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/ConnectionBinding.java b/java/common/src/main/java/org/apache/qpid/transport/network/ConnectionBinding.java
index 6886cb3a5a..8a2aba2e6d 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/network/ConnectionBinding.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/network/ConnectionBinding.java
@@ -33,23 +33,46 @@ import org.apache.qpid.transport.Sender;
*
*/
-public class ConnectionBinding implements Binding<Connection,ByteBuffer>
+public abstract class ConnectionBinding
+ implements Binding<Connection,ByteBuffer>
{
- private static final int MAX_FRAME_SIZE = 64 * 1024 - 1;
-
- private final ConnectionDelegate delegate;
+ public static Binding<Connection,ByteBuffer> get(final Connection connection)
+ {
+ return new ConnectionBinding()
+ {
+ public Connection connection()
+ {
+ return connection;
+ }
+ };
+ }
- public ConnectionBinding(ConnectionDelegate delegate)
+ public static Binding<Connection,ByteBuffer> get(final ConnectionDelegate delegate)
{
- this.delegate = delegate;
+ return new ConnectionBinding()
+ {
+ public Connection connection()
+ {
+ Connection conn = new Connection();
+ conn.setConnectionDelegate(delegate);
+ return conn;
+ }
+ };
}
+ public static final int MAX_FRAME_SIZE = 64 * 1024 - 1;
+
+ public abstract Connection connection();
+
public Connection endpoint(Sender<ByteBuffer> sender)
{
+ Connection conn = connection();
+
// XXX: hardcoded max-frame
- return new Connection
- (new Disassembler(sender, MAX_FRAME_SIZE), delegate);
+ Disassembler dis = new Disassembler(sender, MAX_FRAME_SIZE);
+ conn.setSender(dis);
+ return conn;
}
public Receiver<ByteBuffer> receiver(Connection conn)
diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java
index a1fb0371fd..5efd51d5db 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java
@@ -61,7 +61,7 @@ final class IoReceiver extends Thread
start();
}
- void close()
+ void close(boolean block)
{
if (!closed.getAndSet(true))
{
@@ -75,7 +75,7 @@ final class IoReceiver extends Thread
{
socket.shutdownInput();
}
- if (Thread.currentThread() != this)
+ if (block && Thread.currentThread() != this)
{
join(timeout);
if (isAlive())
@@ -121,6 +121,7 @@ final class IoReceiver extends Thread
}
}
}
+ socket.close();
}
catch (Throwable t)
{
@@ -129,7 +130,6 @@ final class IoReceiver extends Thread
finally
{
receiver.closed();
- transport.getSender().close();
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java
index ef892744ab..f70a13ec3c 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java
@@ -196,17 +196,12 @@ public final class IoSender extends Thread implements Sender<ByteBuffer>
throw new TransportException("join timed out");
}
}
- transport.getReceiver().close();
- socket.close();
+ transport.getReceiver().close(false);
}
catch (InterruptedException e)
{
throw new TransportException(e);
}
- catch (IOException e)
- {
- throw new TransportException(e);
- }
if (reportException && exception != null)
{
diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java
index 70fd8a3c06..be17766740 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java
@@ -108,7 +108,7 @@ public final class IoTransport<E>
public static final Connection connect(String host, int port,
ConnectionDelegate delegate)
{
- return connect(host, port, new ConnectionBinding(delegate));
+ return connect(host, port, ConnectionBinding.get(delegate));
}
public static void connect_0_9(AMQVersionAwareProtocolSession session, String host, int port)
diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaHandler.java b/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaHandler.java
index f8dbec3c3d..b89eed48b0 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaHandler.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaHandler.java
@@ -262,13 +262,13 @@ public class MinaHandler<E> implements IoHandler
ConnectionDelegate delegate)
throws IOException
{
- accept(host, port, new ConnectionBinding(delegate));
+ accept(host, port, ConnectionBinding.get(delegate));
}
public static final Connection connect(String host, int port,
ConnectionDelegate delegate)
{
- return connect(host, port, new ConnectionBinding(delegate));
+ return connect(host, port, ConnectionBinding.get(delegate));
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/nio/NioHandler.java b/java/common/src/main/java/org/apache/qpid/transport/network/nio/NioHandler.java
index 318fe0d03b..3bc6730623 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/network/nio/NioHandler.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/network/nio/NioHandler.java
@@ -87,8 +87,9 @@ public class NioHandler implements Runnable
}
NioSender sender = new NioSender(_ch);
- Connection con = new Connection
- (new Disassembler(sender, 64*1024 - 1), delegate);
+ Connection con = new Connection();
+ con.setSender(new Disassembler(sender, 64*1024 - 1));
+ con.setConnectionDelegate(delegate);
con.setConnectionId(_count.incrementAndGet());
_handlers.put(con.getConnectionId(),sender);
diff --git a/java/common/src/main/java/org/apache/qpid/transport/util/Waiter.java b/java/common/src/main/java/org/apache/qpid/transport/util/Waiter.java
new file mode 100644
index 0000000000..e034d779ca
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpid/transport/util/Waiter.java
@@ -0,0 +1,63 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.transport.util;
+
+
+/**
+ * Waiter
+ *
+ */
+
+public final class Waiter
+{
+
+ private final Object lock;
+ private final long timeout;
+ private final long start;
+ private long elapsed;
+
+ public Waiter(Object lock, long timeout)
+ {
+ this.lock = lock;
+ this.timeout = timeout;
+ this.start = System.currentTimeMillis();
+ this.elapsed = 0;
+ }
+
+ public boolean hasTime()
+ {
+ return elapsed < timeout;
+ }
+
+ public void await()
+ {
+ try
+ {
+ lock.wait(timeout - elapsed);
+ }
+ catch (InterruptedException e)
+ {
+ // pass
+ }
+ elapsed = System.currentTimeMillis() - start;
+ }
+
+}
diff --git a/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java b/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java
index b9ca210483..e61ffb501b 100644
--- a/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java
+++ b/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java
@@ -50,38 +50,30 @@ public class ConnectionTest extends TestCase
port = AvailablePortFinder.getNextAvailable(12000);
- ConnectionDelegate server = new ConnectionDelegate() {
- public void init(Channel ch, ProtocolHeader hdr) {
+ ConnectionDelegate server = new ServerDelegate() {
+ @Override public void connectionOpen(Channel ch, ConnectionOpen open)
+ {
+ super.connectionOpen(ch, open);
ch.getConnection().close();
}
-
- public SessionDelegate getSessionDelegate() {
- return new SessionDelegate() {};
- }
- public void exception(Throwable t) {
- log.error(t, "exception caught");
- }
- public void closed() {}
};
IoAcceptor ioa = new IoAcceptor
- ("localhost", port, new ConnectionBinding(server));
+ ("localhost", port, ConnectionBinding.get(server));
ioa.start();
}
private Connection connect(final Condition closed)
{
- Connection conn = IoTransport.connect("localhost", port, new ConnectionDelegate()
+ Connection conn = new Connection();
+ conn.setConnectionListener(new ConnectionListener()
{
- public SessionDelegate getSessionDelegate()
+ public void opened(Connection conn) {}
+ public void exception(Connection conn, ConnectionException exc)
{
- return new SessionDelegate() {};
+ exc.printStackTrace();
}
- public void exception(Throwable t)
- {
- t.printStackTrace();
- }
- public void closed()
+ public void closed(Connection conn)
{
if (closed != null)
{
@@ -89,8 +81,7 @@ public class ConnectionTest extends TestCase
}
}
});
-
- conn.send(new ProtocolHeader(1, 0, 10));
+ conn.connect("localhost", port, null, "guest", "guest");
return conn;
}