diff options
| author | Rafael H. Schloming <rhs@apache.org> | 2008-10-09 17:07:59 +0000 |
|---|---|---|
| committer | Rafael H. Schloming <rhs@apache.org> | 2008-10-09 17:07:59 +0000 |
| commit | 394823bba7976c170ac58e53b5d80ad12e0f1690 (patch) | |
| tree | 9b952b30b1b1bcd54c6f1cc453a221328b57c53f /java/common/src | |
| parent | e78747f63bc73daa6e2035453358e6eaf3237b84 (diff) | |
| download | qpid-python-394823bba7976c170ac58e53b5d80ad12e0f1690.tar.gz | |
QPID-1339: refactor of low level client API to permit connections to exist in a disconnected state as well as to provide a central point from which to track session state
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@703208 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/common/src')
25 files changed, 826 insertions, 354 deletions
diff --git a/java/common/src/main/java/org/apache/qpid/ToyBroker.java b/java/common/src/main/java/org/apache/qpid/ToyBroker.java index 83d434b20a..db84b83adb 100644 --- a/java/common/src/main/java/org/apache/qpid/ToyBroker.java +++ b/java/common/src/main/java/org/apache/qpid/ToyBroker.java @@ -174,23 +174,14 @@ class ToyBroker extends SessionDelegate public static final void main(String[] args) throws IOException { final ToyExchange exchange = new ToyExchange(); - ConnectionDelegate delegate = new ConnectionDelegate() + ConnectionDelegate delegate = new ServerDelegate() { public SessionDelegate getSessionDelegate() { return new ToyBroker(exchange); } - public void exception(Throwable t) - { - t.printStackTrace(); - } - public void closed() {} }; - //hack - delegate.setUsername("guest"); - delegate.setPassword("guest"); - MinaHandler.accept("0.0.0.0", 5672, delegate); } diff --git a/java/common/src/main/java/org/apache/qpid/ToyClient.java b/java/common/src/main/java/org/apache/qpid/ToyClient.java index cb10859c9f..79bb286d76 100644 --- a/java/common/src/main/java/org/apache/qpid/ToyClient.java +++ b/java/common/src/main/java/org/apache/qpid/ToyClient.java @@ -56,7 +56,7 @@ class ToyClient extends SessionDelegate public static final void main(String[] args) { Connection conn = MinaHandler.connect("0.0.0.0", 5672, - new ClientDelegate() + new ClientDelegate(null, "guest", "guest") { public SessionDelegate getSessionDelegate() { diff --git a/java/common/src/main/java/org/apache/qpid/transport/Channel.java b/java/common/src/main/java/org/apache/qpid/transport/Channel.java index d6b015930b..d973739ed6 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/Channel.java +++ b/java/common/src/main/java/org/apache/qpid/transport/Channel.java @@ -112,7 +112,7 @@ public class Channel extends Invoker public void closed() { - log.debug("channel closed: ", this); + log.debug("channel closed: %s", this); if (session != null) { session.closed(); diff --git a/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java b/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java index bbdadfadb9..316c26429e 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java +++ b/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java @@ -20,21 +20,123 @@ */ package org.apache.qpid.transport; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; + +import java.io.UnsupportedEncodingException; + +import org.apache.qpid.QpidException; + +import org.apache.qpid.security.UsernamePasswordCallbackHandler; + +import javax.security.sasl.Sasl; +import javax.security.sasl.SaslClient; +import javax.security.sasl.SaslException; +import javax.security.sasl.SaslServer; + + +import static org.apache.qpid.transport.Connection.State.*; + /** * ClientDelegate * */ -public abstract class ClientDelegate extends ConnectionDelegate +public class ClientDelegate extends ConnectionDelegate { + private String vhost; + private String username; + private String password; + + public ClientDelegate(String vhost, String username, String password) + { + this.vhost = vhost; + this.username = username; + this.password = password; + } + public void init(Channel ch, ProtocolHeader hdr) { if (hdr.getMajor() != 0 && hdr.getMinor() != 10) { - throw new ProtocolVersionException(hdr.getMajor(), hdr.getMinor()); + Connection conn = ch.getConnection(); + conn.exception(new ProtocolVersionException(hdr.getMajor(), hdr.getMinor())); + } + } + + @Override public void connectionStart(Channel ch, ConnectionStart start) + { + Connection conn = ch.getConnection(); + List<Object> mechanisms = start.getMechanisms(); + if (mechanisms == null || mechanisms.isEmpty()) + { + ch.connectionStartOk + (Collections.EMPTY_MAP, null, null, conn.getLocale()); + return; } + + String[] mechs = new String[mechanisms.size()]; + mechanisms.toArray(mechs); + + try + { + UsernamePasswordCallbackHandler handler = + new UsernamePasswordCallbackHandler(); + handler.initialise(username, password); + SaslClient sc = Sasl.createSaslClient + (new String[] {"PLAIN"}, null, "AMQP", "localhost", null, handler); + conn.setSaslClient(sc); + + byte[] response = sc.hasInitialResponse() ? + sc.evaluateChallenge(new byte[0]) : null; + ch.connectionStartOk + (Collections.EMPTY_MAP, sc.getMechanismName(), response, + conn.getLocale()); + } + catch (SaslException e) + { + conn.exception(e); + } + } + + @Override public void connectionSecure(Channel ch, ConnectionSecure secure) + { + Connection conn = ch.getConnection(); + SaslClient sc = conn.getSaslClient(); + try + { + byte[] response = sc.evaluateChallenge(secure.getChallenge()); + ch.connectionSecureOk(response); + } + catch (SaslException e) + { + conn.exception(e); + } + } + + @Override public void connectionTune(Channel ch, ConnectionTune tune) + { + Connection conn = ch.getConnection(); + conn.setChannelMax(tune.getChannelMax()); + ch.connectionTuneOk(tune.getChannelMax(), tune.getMaxFrameSize(), tune.getHeartbeatMax()); + ch.connectionOpen(vhost, null, Option.INSIST); + } + + @Override public void connectionOpenOk(Channel ch, ConnectionOpenOk ok) + { + ch.getConnection().setState(OPEN); + } + + @Override public void connectionRedirect(Channel ch, ConnectionRedirect redir) + { + throw new UnsupportedOperationException(); } } diff --git a/java/common/src/main/java/org/apache/qpid/transport/Connection.java b/java/common/src/main/java/org/apache/qpid/transport/Connection.java index 68b9b209bb..ae9420eb1a 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/Connection.java +++ b/java/common/src/main/java/org/apache/qpid/transport/Connection.java @@ -20,14 +20,22 @@ */ package org.apache.qpid.transport; +import org.apache.qpid.transport.network.ConnectionBinding; +import org.apache.qpid.transport.network.io.IoTransport; import org.apache.qpid.transport.util.Logger; +import org.apache.qpid.transport.util.Waiter; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.nio.ByteBuffer; +import java.util.UUID; + +import javax.security.sasl.SaslClient; +import javax.security.sasl.SaslServer; + +import static org.apache.qpid.transport.Connection.State.*; /** @@ -44,23 +52,164 @@ public class Connection implements Receiver<ProtocolEvent>, Sender<ProtocolEvent> { + enum State { NEW, CLOSED, OPENING, OPEN, CLOSING, CLOSE_RCVD } + private static final Logger log = Logger.get(Connection.class); - final private Sender<ProtocolEvent> sender; - final private ConnectionDelegate delegate; + class DefaultConnectionListener implements ConnectionListener + { + public void opened(Connection conn) {} + public void exception(Connection conn, ConnectionException exception) + { + throw exception; + } + public void closed(Connection conn) {} + } + + private ConnectionDelegate delegate; + private Sender<ProtocolEvent> sender; + + final private Map<Integer,Channel> channels = new HashMap<Integer,Channel>(); + + private State state = NEW; + private Object lock = new Object(); + private long timeout = 60000; + private ConnectionListener listener = new DefaultConnectionListener(); + private Throwable error = null; + private int channelMax = 1; + private String locale; + private SaslServer saslServer; + private SaslClient saslClient; + // want to make this final private int _connectionId; - final private Map<Integer,Channel> channels = new HashMap<Integer,Channel>(); + public Connection() {} - public Connection(Sender<ProtocolEvent> sender, - ConnectionDelegate delegate) + public void setConnectionDelegate(ConnectionDelegate delegate) { - this.sender = sender; this.delegate = delegate; } + public void setConnectionListener(ConnectionListener listener) + { + if (listener == null) + { + this.listener = new DefaultConnectionListener(); + } + else + { + this.listener = listener; + } + } + + public Sender<ProtocolEvent> getSender() + { + return sender; + } + + public void setSender(Sender<ProtocolEvent> sender) + { + this.sender = sender; + } + + void setState(State state) + { + synchronized (lock) + { + this.state = state; + lock.notifyAll(); + } + } + + void setLocale(String locale) + { + this.locale = locale; + } + + String getLocale() + { + return locale; + } + + void setSaslServer(SaslServer saslServer) + { + this.saslServer = saslServer; + } + + SaslServer getSaslServer() + { + return saslServer; + } + + void setSaslClient(SaslClient saslClient) + { + this.saslClient = saslClient; + } + + SaslClient getSaslClient() + { + return saslClient; + } + + public void connect(String host, int port, String vhost, String username, String password) + { + synchronized (lock) + { + state = OPENING; + + delegate = new ClientDelegate(vhost, username, password); + + IoTransport.connect(host, port, ConnectionBinding.get(this)); + send(new ProtocolHeader(1, 0, 10)); + + Waiter w = new Waiter(lock, timeout); + while (w.hasTime() && state == OPENING && error == null) + { + w.await(); + } + + if (error != null) + { + Throwable t = error; + error = null; + close(); + throw new ConnectionException(t); + } + + switch (state) + { + case OPENING: + close(); + throw new ConnectionException("connect() timed out"); + case OPEN: + break; + case CLOSED: + throw new ConnectionException("connect() aborted"); + default: + throw new IllegalStateException(String.valueOf(state)); + } + } + + listener.opened(this); + } + + public Session createSession() + { + return createSession(0); + } + + public Session createSession(long expiryInSeconds) + { + Channel ch = getChannel(); + Session ssn = new Session(UUID.randomUUID().toString().getBytes()); + ssn.attach(ch); + ssn.sessionAttach(ssn.getName()); + ssn.sessionRequestTimeout(expiryInSeconds); + return ssn; + } + public void setConnectionId(int id) { _connectionId = id; @@ -86,7 +235,12 @@ public class Connection public void send(ProtocolEvent event) { log.debug("SEND: [%s] %s", this, event); - sender.send(event); + Sender s = sender; + if (s == null) + { + throw new ConnectionException("connection closed"); + } + s.send(event); } public void flush() @@ -107,7 +261,7 @@ public class Connection public Channel getChannel() { - synchronized (channels) + synchronized (lock) { for (int i = 0; i < getChannelMax(); i++) { @@ -123,7 +277,7 @@ public class Connection public Channel getChannel(int number) { - synchronized (channels) + synchronized (lock) { Channel channel = channels.get(number); if (channel == null) @@ -137,45 +291,146 @@ public class Connection void removeChannel(int number) { - synchronized (channels) + synchronized (lock) { channels.remove(number); } } + public void exception(ConnectionException e) + { + synchronized (lock) + { + switch (state) + { + case OPENING: + case CLOSING: + error = e; + lock.notifyAll(); + break; + default: + listener.exception(this, e); + break; + } + } + } + public void exception(Throwable t) { - delegate.exception(t); + synchronized (lock) + { + switch (state) + { + case OPENING: + case CLOSING: + error = t; + lock.notifyAll(); + break; + default: + listener.exception(this, new ConnectionException(t)); + break; + } + } } void closeCode(ConnectionClose close) { - synchronized (channels) + synchronized (lock) { for (Channel ch : channels.values()) { ch.closeCode(close); } + ConnectionCloseCode code = close.getReplyCode(); + if (code != ConnectionCloseCode.NORMAL) + { + exception(new ConnectionException(close)); + } } } public void closed() { log.debug("connection closed: %s", this); - synchronized (channels) + + if (state == OPEN) + { + exception(new ConnectionException("connection aborted")); + } + + synchronized (lock) { List<Channel> values = new ArrayList<Channel>(channels.values()); for (Channel ch : values) { ch.closed(); } + + sender = null; + setState(CLOSED); } - delegate.closed(); + + listener.closed(this); } public void close() { - sender.close(); + synchronized (lock) + { + switch (state) + { + case OPEN: + Channel ch = getChannel(0); + state = CLOSING; + ch.connectionClose(ConnectionCloseCode.NORMAL, null); + Waiter w = new Waiter(lock, timeout); + while (w.hasTime() && state == CLOSING && error == null) + { + w.await(); + } + + if (error != null) + { + close(); + throw new ConnectionException(error); + } + + switch (state) + { + case CLOSING: + close(); + throw new ConnectionException("close() timed out"); + case CLOSED: + break; + default: + throw new IllegalStateException(String.valueOf(state)); + } + break; + case CLOSED: + break; + default: + if (sender != null) + { + sender.close(); + w = new Waiter(lock, timeout); + while (w.hasTime() && sender != null && error == null) + { + w.await(); + } + + if (error != null) + { + throw new ConnectionException(error); + } + + if (sender != null) + { + throw new ConnectionException("close() timed out"); + } + } + break; + } + } } public String toString() diff --git a/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java b/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java index 2aa1db7b28..9056a3120b 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java +++ b/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java @@ -22,22 +22,7 @@ package org.apache.qpid.transport; import org.apache.qpid.transport.util.Logger; -import org.apache.qpid.SecurityHelper; -import org.apache.qpid.QpidException; - -import java.io.UnsupportedEncodingException; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.Lock; - -import javax.security.sasl.Sasl; -import javax.security.sasl.SaslClient; -import javax.security.sasl.SaslException; -import javax.security.sasl.SaslServer; +import static org.apache.qpid.transport.Connection.State.*; /** @@ -57,231 +42,26 @@ public abstract class ConnectionDelegate extends MethodDelegate<Channel> private static final Logger log = Logger.get(ConnectionDelegate.class); - private String _username = "guest"; - private String _password = "guest";; - private String _mechanism; - private String _virtualHost; - private SaslClient saslClient; - private SaslServer saslServer; - private String _locale = "utf8"; - private int maxFrame = 64*1024; - private Condition _negotiationComplete; - private Lock _negotiationCompleteLock; - - public abstract SessionDelegate getSessionDelegate(); - - public abstract void exception(Throwable t); - - public abstract void closed(); - - public void setCondition(Lock negotiationCompleteLock,Condition negotiationComplete) - { - _negotiationComplete = negotiationComplete; - _negotiationCompleteLock = negotiationCompleteLock; - } - - public void init(Channel ch, ProtocolHeader hdr) - { - ch.getConnection().send(new ProtocolHeader (1, hdr.getMajor(), hdr.getMinor())); - List<Object> plain = new ArrayList<Object>(); - plain.add("PLAIN"); - List<Object> utf8 = new ArrayList<Object>(); - utf8.add("utf8"); - ch.connectionStart(null, plain, utf8); - } - - // ---------------------------------------------- - // Client side - //----------------------------------------------- - @Override public void connectionStart(Channel context, ConnectionStart struct) - { - String mechanism = null; - byte[] response = null; - try - { - mechanism = SecurityHelper.chooseMechanism(struct.getMechanisms()); - saslClient = Sasl.createSaslClient(new String[]{ mechanism },null, "AMQP", "localhost", null, - SecurityHelper.createCallbackHandler(mechanism,_username,_password )); - response = saslClient.evaluateChallenge(new byte[0]); - } - catch (UnsupportedEncodingException e) - { - // need error handling - } - catch (SaslException e) - { - // need error handling - } - catch (QpidException e) - { - // need error handling - } - - Map<String,Object> props = new HashMap<String,Object>(); - context.connectionStartOk(props, mechanism, response, _locale); - } - - @Override public void connectionSecure(Channel context, ConnectionSecure struct) - { - try - { - byte[] response = saslClient.evaluateChallenge(struct.getChallenge()); - context.connectionSecureOk(response); - } - catch (SaslException e) - { - // need error handling - } - } - - @Override public void connectionTune(Channel context, ConnectionTune struct) - { - context.getConnection().setChannelMax(struct.getChannelMax()); - context.connectionTuneOk(struct.getChannelMax(), struct.getMaxFrameSize(), struct.getHeartbeatMax()); - context.connectionOpen(_virtualHost, null, Option.INSIST); - } - - - @Override public void connectionOpenOk(Channel context, ConnectionOpenOk struct) - { - List<Object> knownHosts = struct.getKnownHosts(); - if(_negotiationCompleteLock != null) - { - _negotiationCompleteLock.lock(); - try - { - _negotiationComplete.signalAll(); - } - finally - { - _negotiationCompleteLock.unlock(); - } - } - } - - public void connectionRedirect(Channel context, ConnectionRedirect struct) - { - // not going to bother at the moment - } - - // ---------------------------------------------- - // Server side - //----------------------------------------------- - @Override public void connectionStartOk(Channel context, ConnectionStartOk struct) + public SessionDelegate getSessionDelegate() { - //set the client side locale on the server side - _locale = struct.getLocale(); - _mechanism = struct.getMechanism(); - - //try - //{ - //saslServer = Sasl.createSaslServer(_mechanism, "AMQP", "ABC",null,SecurityHelper.createCallbackHandler(_mechanism,_username,_password)); - //byte[] challenge = saslServer.evaluateResponse(struct.getResponse().getBytes()); - byte[] challenge = null; - if ( challenge == null) - { - context.connectionTune(Integer.MAX_VALUE, maxFrame, 0, Integer.MAX_VALUE); - } - else - { - try - { - context.connectionSecure(challenge); - } - catch(Exception e) - { - - } - } - - - /*} - catch (SaslException e) - { - // need error handling - } - catch (QpidException e) - { - // need error handling - }*/ + return new SessionDelegate(); } - @Override public void connectionSecureOk(Channel context, ConnectionSecureOk struct) - { - try - { - saslServer = Sasl.createSaslServer(_mechanism, "AMQP", "ABC",new HashMap(),SecurityHelper.createCallbackHandler(_mechanism,_username,_password)); - byte[] challenge = saslServer.evaluateResponse(struct.getResponse()); - if ( challenge == null) - { - context.connectionTune(Integer.MAX_VALUE, maxFrame, 0, Integer.MAX_VALUE); - } - else - { - try - { - context.connectionSecure(challenge); - } - catch(Exception e) - { - - } - } - - - } - catch (SaslException e) - { - // need error handling - } - catch (QpidException e) - { - // need error handling - } - } - - - @Override public void connectionOpen(Channel context, ConnectionOpen struct) - { - List<Object> hosts = new ArrayList<Object>(); - hosts.add("amqp:1223243232325"); - context.connectionOpenOk(hosts); - } + public abstract void init(Channel ch, ProtocolHeader hdr); @Override public void connectionClose(Channel ch, ConnectionClose close) { - ch.getConnection().closeCode(close); + Connection conn = ch.getConnection(); ch.connectionCloseOk(); + conn.getSender().close(); + conn.closeCode(close); + conn.setState(CLOSE_RCVD); } - public String getPassword() - { - return _password; - } - - public void setPassword(String password) - { - _password = password; - } - - public String getUsername() - { - return _username; - } - - public void setUsername(String username) - { - _username = username; - } - - public String getVirtualHost() - { - return _virtualHost; - } - - public void setVirtualHost(String host) + @Override public void connectionCloseOk(Channel ch, ConnectionCloseOk ok) { - _virtualHost = host; + Connection conn = ch.getConnection(); + conn.getSender().close(); } } diff --git a/java/common/src/main/java/org/apache/qpid/transport/ConnectionException.java b/java/common/src/main/java/org/apache/qpid/transport/ConnectionException.java index c3239ef684..1bd7d516cf 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/ConnectionException.java +++ b/java/common/src/main/java/org/apache/qpid/transport/ConnectionException.java @@ -26,17 +26,37 @@ package org.apache.qpid.transport; * */ -public class ConnectionException extends RuntimeException +public class ConnectionException extends TransportException { private ConnectionClose close; - public ConnectionException(ConnectionClose close) + public ConnectionException(String message, ConnectionClose close, Throwable cause) { - super(close.getReplyText()); + super(message, cause); this.close = close; } + public ConnectionException(String message) + { + this(message, null, null); + } + + public ConnectionException(String message, Throwable cause) + { + this(message, null, cause); + } + + public ConnectionException(Throwable cause) + { + this(cause.getMessage(), null, cause); + } + + public ConnectionException(ConnectionClose close) + { + this(close.getReplyText(), close, null); + } + public ConnectionClose getClose() { return close; diff --git a/java/common/src/main/java/org/apache/qpid/transport/ConnectionListener.java b/java/common/src/main/java/org/apache/qpid/transport/ConnectionListener.java new file mode 100644 index 0000000000..616e76825a --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/transport/ConnectionListener.java @@ -0,0 +1,38 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.transport; + + +/** + * ConnectionListener + * + */ + +public interface ConnectionListener +{ + + void opened(Connection connection); + + void exception(Connection connection, ConnectionException exception); + + void closed(Connection connection); + +} diff --git a/java/common/src/main/java/org/apache/qpid/transport/Echo.java b/java/common/src/main/java/org/apache/qpid/transport/Echo.java index b2be32331a..89b59c2512 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/Echo.java +++ b/java/common/src/main/java/org/apache/qpid/transport/Echo.java @@ -43,25 +43,16 @@ public class Echo extends SessionDelegate public static final void main(String[] args) throws IOException { - ConnectionDelegate delegate = new ConnectionDelegate() + ConnectionDelegate delegate = new ServerDelegate() { public SessionDelegate getSessionDelegate() { return new Echo(); } - public void exception(Throwable t) - { - t.printStackTrace(); - } - public void closed() {} }; - //hack - delegate.setUsername("guest"); - delegate.setPassword("guest"); - IoAcceptor ioa = new IoAcceptor - ("0.0.0.0", 5672, new ConnectionBinding(delegate)); + ("0.0.0.0", 5672, ConnectionBinding.get(delegate)); ioa.start(); } diff --git a/java/common/src/main/java/org/apache/qpid/transport/ProtocolVersionException.java b/java/common/src/main/java/org/apache/qpid/transport/ProtocolVersionException.java index 2de0c169a5..0cca0227a1 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/ProtocolVersionException.java +++ b/java/common/src/main/java/org/apache/qpid/transport/ProtocolVersionException.java @@ -26,7 +26,7 @@ package org.apache.qpid.transport; * */ -public final class ProtocolVersionException extends TransportException +public final class ProtocolVersionException extends ConnectionException { private final byte major; diff --git a/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java b/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java new file mode 100644 index 0000000000..c27419cadc --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java @@ -0,0 +1,145 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.transport; + +import java.util.Collections; + + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; + +import java.io.UnsupportedEncodingException; + +import org.apache.qpid.QpidException; + +import org.apache.qpid.SecurityHelper; + +import javax.security.sasl.Sasl; +import javax.security.sasl.SaslClient; +import javax.security.sasl.SaslException; +import javax.security.sasl.SaslServer; + + +import static org.apache.qpid.transport.Connection.State.*; + + +/** + * ServerDelegate + * + */ + +public class ServerDelegate extends ConnectionDelegate +{ + + private SaslServer saslServer; + + public void init(Channel ch, ProtocolHeader hdr) + { + Connection conn = ch.getConnection(); + conn.send(new ProtocolHeader(1, 0, 10)); + List<Object> utf8 = new ArrayList<Object>(); + utf8.add("utf8"); + ch.connectionStart(null, Collections.EMPTY_LIST, utf8); + } + + @Override public void connectionStartOk(Channel ch, ConnectionStartOk ok) + { + Connection conn = ch.getConnection(); + conn.setLocale(ok.getLocale()); + String mechanism = ok.getMechanism(); + + if (mechanism == null || mechanism.length() == 0) + { + ch.connectionTune + (Integer.MAX_VALUE, + org.apache.qpid.transport.network.ConnectionBinding.MAX_FRAME_SIZE, + 0, Integer.MAX_VALUE); + return; + } + + try + { + SaslServer ss = Sasl.createSaslServer + (mechanism, "AMQP", "localhost", null, null); + if (ss == null) + { + ch.connectionClose + (ConnectionCloseCode.CONNECTION_FORCED, + "null SASL mechanism: " + mechanism); + return; + } + conn.setSaslServer(ss); + secure(ch, ok.getResponse()); + } + catch (SaslException e) + { + conn.exception(e); + } + } + + private void secure(Channel ch, byte[] response) + { + Connection conn = ch.getConnection(); + SaslServer ss = conn.getSaslServer(); + try + { + byte[] challenge = ss.evaluateResponse(response); + if (ss.isComplete()) + { + ss.dispose(); + ch.connectionTune + (Integer.MAX_VALUE, + org.apache.qpid.transport.network.ConnectionBinding.MAX_FRAME_SIZE, + 0, Integer.MAX_VALUE); + } + else + { + ch.connectionSecure(challenge); + } + } + catch (SaslException e) + { + conn.exception(e); + } + } + + @Override public void connectionSecureOk(Channel ch, ConnectionSecureOk ok) + { + secure(ch, ok.getResponse()); + } + + @Override public void connectionTuneOk(Channel ch, ConnectionTuneOk ok) + { + + } + + @Override public void connectionOpen(Channel ch, ConnectionOpen open) + { + Connection conn = ch.getConnection(); + ch.connectionOpenOk(Collections.EMPTY_LIST); + conn.setState(OPEN); + } + +} diff --git a/java/common/src/main/java/org/apache/qpid/transport/Session.java b/java/common/src/main/java/org/apache/qpid/transport/Session.java index 10ca6cfb0a..df4313e15b 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/Session.java +++ b/java/common/src/main/java/org/apache/qpid/transport/Session.java @@ -49,6 +49,26 @@ public class Session extends Invoker private static final Logger log = Logger.get(Session.class); + class DefaultSessionListener implements SessionListener + { + + public void opened(Session ssn) {} + + public void message(Session ssn, MessageTransfer xfr) + { + log.info("message: %s", xfr); + } + + public void exception(Session ssn, SessionException exc) + { + throw exc; + } + + public void closed(Session ssn) {} + } + + public static final int UNLIMITED_CREDIT = 0xFFFFFFFF; + private static boolean ENABLE_REPLAY = false; static @@ -65,6 +85,7 @@ public class Session extends Invoker } private byte[] name; + private SessionListener listener = new DefaultSessionListener(); private long timeout = 60000; private boolean autoSync = false; @@ -97,6 +118,23 @@ public class Session extends Invoker return name; } + public void setSessionListener(SessionListener listener) + { + if (listener == null) + { + this.listener = new DefaultSessionListener(); + } + else + { + this.listener = listener; + } + } + + public SessionListener getSessionListener() + { + return listener; + } + public void setAutoSync(boolean value) { synchronized (commands) @@ -270,8 +308,8 @@ public class Session extends Invoker { if (closed.get()) { - List<ExecutionException> exc = getExceptions(); - if (!exc.isEmpty()) + ExecutionException exc = getException(); + if (exc != null) { throw new SessionException(exc); } @@ -361,7 +399,7 @@ public class Session extends Invoker { if (closed.get()) { - throw new SessionException(getExceptions()); + throw new SessionException(getException()); } else { @@ -375,8 +413,7 @@ public class Session extends Invoker private Map<Integer,ResultFuture<?>> results = new HashMap<Integer,ResultFuture<?>>(); - private List<ExecutionException> exceptions = - new ArrayList<ExecutionException>(); + private ExecutionException exception = null; void result(int command, Struct result) { @@ -388,11 +425,17 @@ public class Session extends Invoker future.set(result); } - void addException(ExecutionException exc) + void setException(ExecutionException exc) { - synchronized (exceptions) + synchronized (results) { - exceptions.add(exc); + if (exception != null) + { + throw new IllegalStateException + (String.format + ("too many exceptions: %s, %s", exception, exc)); + } + exception = exc; } } @@ -403,11 +446,11 @@ public class Session extends Invoker this.close = close; } - List<ExecutionException> getExceptions() + ExecutionException getException() { - synchronized (exceptions) + synchronized (results) { - return new ArrayList<ExecutionException>(exceptions); + return exception; } } @@ -473,7 +516,7 @@ public class Session extends Invoker } else if (closed.get()) { - throw new SessionException(getExceptions()); + throw new SessionException(getException()); } else { diff --git a/java/common/src/main/java/org/apache/qpid/transport/SessionClosedException.java b/java/common/src/main/java/org/apache/qpid/transport/SessionClosedException.java index d2c54cf339..354e5c1d15 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/SessionClosedException.java +++ b/java/common/src/main/java/org/apache/qpid/transport/SessionClosedException.java @@ -33,7 +33,7 @@ public class SessionClosedException extends SessionException public SessionClosedException() { - super(Collections.EMPTY_LIST); + super(null); } } diff --git a/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java b/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java index b91763509c..3e6fa9d5d9 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java +++ b/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java @@ -29,7 +29,7 @@ import org.apache.qpid.transport.network.Frame; * @author Rafael H. Schloming */ -public abstract class SessionDelegate +public class SessionDelegate extends MethodDelegate<Session> implements ProtocolDelegate<Session> { @@ -57,7 +57,7 @@ public abstract class SessionDelegate @Override public void executionException(Session ssn, ExecutionException exc) { - ssn.addException(exc); + ssn.setException(exc); } @Override public void sessionCompleted(Session ssn, SessionCompleted cmp) @@ -122,4 +122,9 @@ public abstract class SessionDelegate ssn.syncPoint(); } + @Override public void messageTransfer(Session ssn, MessageTransfer xfr) + { + ssn.getSessionListener().message(ssn, xfr); + } + } diff --git a/java/common/src/main/java/org/apache/qpid/transport/SessionException.java b/java/common/src/main/java/org/apache/qpid/transport/SessionException.java index dc294b2206..ae9b4b9cdb 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/SessionException.java +++ b/java/common/src/main/java/org/apache/qpid/transport/SessionException.java @@ -27,20 +27,20 @@ import java.util.List; * */ -public class SessionException extends RuntimeException +public class SessionException extends TransportException { - private List<ExecutionException> exceptions; + private ExecutionException exception; - public SessionException(List<ExecutionException> exceptions) + public SessionException(ExecutionException exception) { - super(exceptions.isEmpty() ? "" : exceptions.toString()); - this.exceptions = exceptions; + super(String.valueOf(exception)); + this.exception = exception; } - public List<ExecutionException> getExceptions() + public ExecutionException getException() { - return exceptions; + return exception; } } diff --git a/java/common/src/main/java/org/apache/qpid/transport/SessionListener.java b/java/common/src/main/java/org/apache/qpid/transport/SessionListener.java new file mode 100644 index 0000000000..63690177f9 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/transport/SessionListener.java @@ -0,0 +1,40 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.transport; + + +/** + * SessionListener + * + */ + +public interface SessionListener +{ + + void opened(Session session); + + void message(Session ssn, MessageTransfer xfr); + + void exception(Session session, SessionException exception); + + void closed(Session session); + +} diff --git a/java/common/src/main/java/org/apache/qpid/transport/Sink.java b/java/common/src/main/java/org/apache/qpid/transport/Sink.java index 8653acedbe..617867cae6 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/Sink.java +++ b/java/common/src/main/java/org/apache/qpid/transport/Sink.java @@ -103,28 +103,17 @@ public class Sink extends SessionDelegate public static final void main(String[] args) throws IOException { - ConnectionDelegate delegate = new ConnectionDelegate() + ConnectionDelegate delegate = new ServerDelegate() { public SessionDelegate getSessionDelegate() { return new Sink(); } - - public void exception(Throwable t) - { - t.printStackTrace(); - } - - public void closed() {} }; - //hack - delegate.setUsername("guest"); - delegate.setPassword("guest"); - IoAcceptor ioa = new IoAcceptor - ("0.0.0.0", 5672, new ConnectionBinding(delegate)); + ("0.0.0.0", 5672, ConnectionBinding.get(delegate)); System.out.println (String.format (FORMAT_HDR, "Session", "Count/MBytes", "Cumulative Rate", "Interval Rate")); diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/ConnectionBinding.java b/java/common/src/main/java/org/apache/qpid/transport/network/ConnectionBinding.java index 6886cb3a5a..8a2aba2e6d 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/network/ConnectionBinding.java +++ b/java/common/src/main/java/org/apache/qpid/transport/network/ConnectionBinding.java @@ -33,23 +33,46 @@ import org.apache.qpid.transport.Sender; * */ -public class ConnectionBinding implements Binding<Connection,ByteBuffer> +public abstract class ConnectionBinding + implements Binding<Connection,ByteBuffer> { - private static final int MAX_FRAME_SIZE = 64 * 1024 - 1; - - private final ConnectionDelegate delegate; + public static Binding<Connection,ByteBuffer> get(final Connection connection) + { + return new ConnectionBinding() + { + public Connection connection() + { + return connection; + } + }; + } - public ConnectionBinding(ConnectionDelegate delegate) + public static Binding<Connection,ByteBuffer> get(final ConnectionDelegate delegate) { - this.delegate = delegate; + return new ConnectionBinding() + { + public Connection connection() + { + Connection conn = new Connection(); + conn.setConnectionDelegate(delegate); + return conn; + } + }; } + public static final int MAX_FRAME_SIZE = 64 * 1024 - 1; + + public abstract Connection connection(); + public Connection endpoint(Sender<ByteBuffer> sender) { + Connection conn = connection(); + // XXX: hardcoded max-frame - return new Connection - (new Disassembler(sender, MAX_FRAME_SIZE), delegate); + Disassembler dis = new Disassembler(sender, MAX_FRAME_SIZE); + conn.setSender(dis); + return conn; } public Receiver<ByteBuffer> receiver(Connection conn) diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java index a1fb0371fd..5efd51d5db 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java +++ b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java @@ -61,7 +61,7 @@ final class IoReceiver extends Thread start(); } - void close() + void close(boolean block) { if (!closed.getAndSet(true)) { @@ -75,7 +75,7 @@ final class IoReceiver extends Thread { socket.shutdownInput(); } - if (Thread.currentThread() != this) + if (block && Thread.currentThread() != this) { join(timeout); if (isAlive()) @@ -121,6 +121,7 @@ final class IoReceiver extends Thread } } } + socket.close(); } catch (Throwable t) { @@ -129,7 +130,6 @@ final class IoReceiver extends Thread finally { receiver.closed(); - transport.getSender().close(); } } diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java index ef892744ab..f70a13ec3c 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java +++ b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java @@ -196,17 +196,12 @@ public final class IoSender extends Thread implements Sender<ByteBuffer> throw new TransportException("join timed out"); } } - transport.getReceiver().close(); - socket.close(); + transport.getReceiver().close(false); } catch (InterruptedException e) { throw new TransportException(e); } - catch (IOException e) - { - throw new TransportException(e); - } if (reportException && exception != null) { diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java index 70fd8a3c06..be17766740 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java +++ b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java @@ -108,7 +108,7 @@ public final class IoTransport<E> public static final Connection connect(String host, int port, ConnectionDelegate delegate) { - return connect(host, port, new ConnectionBinding(delegate)); + return connect(host, port, ConnectionBinding.get(delegate)); } public static void connect_0_9(AMQVersionAwareProtocolSession session, String host, int port) diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaHandler.java b/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaHandler.java index f8dbec3c3d..b89eed48b0 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaHandler.java +++ b/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaHandler.java @@ -262,13 +262,13 @@ public class MinaHandler<E> implements IoHandler ConnectionDelegate delegate) throws IOException { - accept(host, port, new ConnectionBinding(delegate)); + accept(host, port, ConnectionBinding.get(delegate)); } public static final Connection connect(String host, int port, ConnectionDelegate delegate) { - return connect(host, port, new ConnectionBinding(delegate)); + return connect(host, port, ConnectionBinding.get(delegate)); } } diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/nio/NioHandler.java b/java/common/src/main/java/org/apache/qpid/transport/network/nio/NioHandler.java index 318fe0d03b..3bc6730623 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/network/nio/NioHandler.java +++ b/java/common/src/main/java/org/apache/qpid/transport/network/nio/NioHandler.java @@ -87,8 +87,9 @@ public class NioHandler implements Runnable } NioSender sender = new NioSender(_ch); - Connection con = new Connection - (new Disassembler(sender, 64*1024 - 1), delegate); + Connection con = new Connection(); + con.setSender(new Disassembler(sender, 64*1024 - 1)); + con.setConnectionDelegate(delegate); con.setConnectionId(_count.incrementAndGet()); _handlers.put(con.getConnectionId(),sender); diff --git a/java/common/src/main/java/org/apache/qpid/transport/util/Waiter.java b/java/common/src/main/java/org/apache/qpid/transport/util/Waiter.java new file mode 100644 index 0000000000..e034d779ca --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/transport/util/Waiter.java @@ -0,0 +1,63 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.transport.util; + + +/** + * Waiter + * + */ + +public final class Waiter +{ + + private final Object lock; + private final long timeout; + private final long start; + private long elapsed; + + public Waiter(Object lock, long timeout) + { + this.lock = lock; + this.timeout = timeout; + this.start = System.currentTimeMillis(); + this.elapsed = 0; + } + + public boolean hasTime() + { + return elapsed < timeout; + } + + public void await() + { + try + { + lock.wait(timeout - elapsed); + } + catch (InterruptedException e) + { + // pass + } + elapsed = System.currentTimeMillis() - start; + } + +} diff --git a/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java b/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java index b9ca210483..e61ffb501b 100644 --- a/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java +++ b/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java @@ -50,38 +50,30 @@ public class ConnectionTest extends TestCase port = AvailablePortFinder.getNextAvailable(12000); - ConnectionDelegate server = new ConnectionDelegate() { - public void init(Channel ch, ProtocolHeader hdr) { + ConnectionDelegate server = new ServerDelegate() { + @Override public void connectionOpen(Channel ch, ConnectionOpen open) + { + super.connectionOpen(ch, open); ch.getConnection().close(); } - - public SessionDelegate getSessionDelegate() { - return new SessionDelegate() {}; - } - public void exception(Throwable t) { - log.error(t, "exception caught"); - } - public void closed() {} }; IoAcceptor ioa = new IoAcceptor - ("localhost", port, new ConnectionBinding(server)); + ("localhost", port, ConnectionBinding.get(server)); ioa.start(); } private Connection connect(final Condition closed) { - Connection conn = IoTransport.connect("localhost", port, new ConnectionDelegate() + Connection conn = new Connection(); + conn.setConnectionListener(new ConnectionListener() { - public SessionDelegate getSessionDelegate() + public void opened(Connection conn) {} + public void exception(Connection conn, ConnectionException exc) { - return new SessionDelegate() {}; + exc.printStackTrace(); } - public void exception(Throwable t) - { - t.printStackTrace(); - } - public void closed() + public void closed(Connection conn) { if (closed != null) { @@ -89,8 +81,7 @@ public class ConnectionTest extends TestCase } } }); - - conn.send(new ProtocolHeader(1, 0, 10)); + conn.connect("localhost", port, null, "guest", "guest"); return conn; } |
