diff options
| author | Robert Gemmell <robbie@apache.org> | 2011-02-04 16:15:27 +0000 |
|---|---|---|
| committer | Robert Gemmell <robbie@apache.org> | 2011-02-04 16:15:27 +0000 |
| commit | b401fdc69fefb860e6f45d65f6fc2e0ccf1ce82f (patch) | |
| tree | de1a66f3edf29ca2f2b3c1efe2961d2930f1fc96 /java/common | |
| parent | 6741e34eb4f223a5f6c4cc33c7c603a2ba0b015b (diff) | |
| download | qpid-python-b401fdc69fefb860e6f45d65f6fc2e0ccf1ce82f.tar.gz | |
QPID-3029: actually set and negotiate the supported max num channels per connection during connection handshake. Enable/make the 0-10 client use channel numbers 0 to N-1 in line with the spec, rather than 1-N.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1067210 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/common')
3 files changed, 61 insertions, 19 deletions
diff --git a/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java b/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java index 82fb57eb7d..bce64075e5 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java +++ b/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java @@ -86,7 +86,8 @@ public class ClientDelegate extends ConnectionDelegate } } - @Override public void connectionStart(Connection conn, ConnectionStart start) + @Override + public void connectionStart(Connection conn, ConnectionStart start) { Map<String,Object> clientProperties = new HashMap<String,Object>(); @@ -156,7 +157,8 @@ public class ClientDelegate extends ConnectionDelegate } } - @Override public void connectionSecure(Connection conn, ConnectionSecure secure) + @Override + public void connectionSecure(Connection conn, ConnectionSecure secure) { SaslClient sc = conn.getSaslClient(); try @@ -170,9 +172,9 @@ public class ClientDelegate extends ConnectionDelegate } } - @Override public void connectionTune(Connection conn, ConnectionTune tune) + @Override + public void connectionTune(Connection conn, ConnectionTune tune) { - conn.setChannelMax(tune.getChannelMax()); int hb_interval = calculateHeartbeatInterval(conSettings.getHeartbeatInterval(), tune.getHeartbeatMin(), tune.getHeartbeatMax() @@ -182,10 +184,17 @@ public class ClientDelegate extends ConnectionDelegate hb_interval); // The idle timeout is twice the heartbeat amount (in milisecs) conn.setIdleTimeout(hb_interval*1000*2); + + int channelMax = tune.getChannelMax(); + //0 means no implied limit, except available server resources + //(or that forced by protocol limitations [0xFFFF]) + conn.setChannelMax(channelMax == 0 ? Connection.MAX_CHANNEL_MAX : channelMax); + conn.connectionOpen(conSettings.getVhost(), null, Option.INSIST); } - @Override public void connectionOpenOk(Connection conn, ConnectionOpenOk ok) + @Override + public void connectionOpenOk(Connection conn, ConnectionOpenOk ok) { SaslClient sc = conn.getSaslClient(); if (sc != null) @@ -210,12 +219,14 @@ public class ClientDelegate extends ConnectionDelegate conn.setState(OPEN); } - @Override public void connectionRedirect(Connection conn, ConnectionRedirect redir) + @Override + public void connectionRedirect(Connection conn, ConnectionRedirect redir) { throw new UnsupportedOperationException(); } - @Override public void connectionHeartbeat(Connection conn, ConnectionHeartbeat hearbeat) + @Override + public void connectionHeartbeat(Connection conn, ConnectionHeartbeat hearbeat) { conn.connectionHeartbeat(); } diff --git a/java/common/src/main/java/org/apache/qpid/transport/Connection.java b/java/common/src/main/java/org/apache/qpid/transport/Connection.java index 8abae7a23e..fd19fa0512 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/Connection.java +++ b/java/common/src/main/java/org/apache/qpid/transport/Connection.java @@ -59,6 +59,9 @@ public class Connection extends ConnectionInvoker protected static final Logger log = Logger.get(Connection.class); + //Usable channels are numbered 0 to <ChannelMax> - 1 + public static final int MAX_CHANNEL_MAX = 0xFFFF; + public static final int MIN_USABLE_CHANNEL_NUM = 0; public enum State { NEW, CLOSED, OPENING, OPEN, CLOSING, CLOSE_RCVD } @@ -404,7 +407,8 @@ public class Connection extends ConnectionInvoker { synchronized (lock) { - for (int i = 1; i <= getChannelMax(); i++) + //For a negotiated channelMax N, there are channels 0 to N-1 available. + for (int i = 0; i < getChannelMax(); i++) { if (!channels.containsKey(i)) { diff --git a/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java b/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java index 644a2daa58..b8e7616a37 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java +++ b/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java @@ -30,6 +30,8 @@ import javax.security.sasl.Sasl; import javax.security.sasl.SaslException; import javax.security.sasl.SaslServer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * ServerDelegate @@ -38,8 +40,8 @@ import javax.security.sasl.SaslServer; public class ServerDelegate extends ConnectionDelegate { + protected static final Logger _logger = LoggerFactory.getLogger(ServerDelegate.class); - private SaslServer saslServer; private List<Object> _locales; private List<Object> _mechanisms; private Map<String, Object> _clientProperties; @@ -47,7 +49,7 @@ public class ServerDelegate extends ConnectionDelegate public ServerDelegate() { - this(null, Collections.EMPTY_LIST, Collections.singletonList((Object)"utf8")); + this(null, Collections.emptyList(), Collections.singletonList((Object)"utf8")); } protected ServerDelegate(Map<String, Object> clientProperties, List<Object> mechanisms, List<Object> locales) @@ -64,7 +66,8 @@ public class ServerDelegate extends ConnectionDelegate conn.connectionStart(_clientProperties, _mechanisms, _locales); } - @Override public void connectionStartOk(Connection conn, ConnectionStartOk ok) + @Override + public void connectionStartOk(Connection conn, ConnectionStartOk ok) { conn.setLocale(ok.getLocale()); String mechanism = ok.getMechanism(); @@ -75,9 +78,9 @@ public class ServerDelegate extends ConnectionDelegate if (mechanism == null || mechanism.length() == 0) { conn.connectionTune - (Integer.MAX_VALUE, + (getChannelMax(), org.apache.qpid.transport.network.ConnectionBinding.MAX_FRAME_SIZE, - 0, Integer.MAX_VALUE); + 0, getHeartbeatMax()); return; } @@ -118,7 +121,7 @@ public class ServerDelegate extends ConnectionDelegate { ss.dispose(); conn.connectionTune - (Integer.MAX_VALUE, + (getChannelMax(), org.apache.qpid.transport.network.ConnectionBinding.MAX_FRAME_SIZE, 0, getHeartbeatMax()); conn.setAuthorizationID(ss.getAuthorizationID()); @@ -140,19 +143,42 @@ public class ServerDelegate extends ConnectionDelegate return Integer.MAX_VALUE; } - @Override public void connectionSecureOk(Connection conn, ConnectionSecureOk ok) + protected int getChannelMax() + { + return Integer.MAX_VALUE; + } + + @Override + public void connectionSecureOk(Connection conn, ConnectionSecureOk ok) { secure(conn, ok.getResponse()); } - @Override public void connectionTuneOk(Connection conn, ConnectionTuneOk ok) + @Override + public void connectionTuneOk(Connection conn, ConnectionTuneOk ok) { + int okChannelMax = ok.getChannelMax(); + if (okChannelMax > getChannelMax()) + { + _logger.error("Connection '" + conn.getConnectionId() + "' being severed, " + + "client connectionTuneOk returned a channelMax (" + okChannelMax + + ") above the servers offered limit (" + getChannelMax() +")"); + + //Due to the error we must forcefully close the connection without negotiation + conn.getSender().close(); + return; + } + + //0 means no implied limit, except available server resources + //(or that forced by protocol limitations [0xFFFF]) + conn.setChannelMax(okChannelMax == 0 ? Connection.MAX_CHANNEL_MAX : okChannelMax); } - @Override public void connectionOpen(Connection conn, ConnectionOpen open) + @Override + public void connectionOpen(Connection conn, ConnectionOpen open) { - conn.connectionOpenOk(Collections.EMPTY_LIST); + conn.connectionOpenOk(Collections.emptyList()); conn.setState(OPEN); } @@ -168,7 +194,8 @@ public class ServerDelegate extends ConnectionDelegate return new Session(conn, new Binary(atc.getName()), 0); } - @Override public void sessionAttach(Connection conn, SessionAttach atc) + @Override + public void sessionAttach(Connection conn, SessionAttach atc) { Session ssn = getSession(conn, atc); conn.map(ssn, atc.getChannel()); |
