From b401fdc69fefb860e6f45d65f6fc2e0ccf1ce82f Mon Sep 17 00:00:00 2001 From: Robert Gemmell Date: Fri, 4 Feb 2011 16:15:27 +0000 Subject: QPID-3029: actually set and negotiate the supported max num channels per connection during connection handshake. Enable/make the 0-10 client use channel numbers 0 to N-1 in line with the spec, rather than 1-N. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1067210 13f79535-47bb-0310-9956-ffa450edef68 --- .../server/configuration/ServerConfiguration.java | 6 + .../handler/ConnectionSecureOkMethodHandler.java | 2 +- .../handler/ConnectionStartOkMethodHandler.java | 2 +- .../handler/ConnectionTuneOkMethodHandler.java | 5 +- .../qpid/server/protocol/AMQProtocolEngine.java | 2 +- .../server/transport/ServerConnectionDelegate.java | 7 + .../java/org/apache/qpid/client/AMQConnection.java | 170 ++------------------- .../apache/qpid/client/AMQConnectionDelegate.java | 4 +- .../qpid/client/AMQConnectionDelegate_0_10.java | 22 ++- .../qpid/client/AMQConnectionDelegate_8_0.java | 12 +- .../apache/qpid/client/ChannelToSessionMap.java | 147 ++++++++++++++++++ .../handler/ConnectionTuneMethodHandler.java | 5 +- .../qpid/client/protocol/AMQProtocolSession.java | 5 + .../qpid/jms/ChannelLimitReachedException.java | 6 +- .../org/apache/qpid/transport/ClientDelegate.java | 25 ++- .../java/org/apache/qpid/transport/Connection.java | 6 +- .../org/apache/qpid/transport/ServerDelegate.java | 49 ++++-- .../qpid/server/logging/ChannelLoggingTest.java | 6 +- .../qpid/test/unit/client/AMQConnectionTest.java | 13 +- 19 files changed, 298 insertions(+), 196 deletions(-) create mode 100644 java/client/src/main/java/org/apache/qpid/client/ChannelToSessionMap.java (limited to 'java') diff --git a/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java b/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java index d7dcfa7dc8..7197ec8cdc 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java +++ b/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java @@ -108,6 +108,7 @@ public class ServerConfiguration extends ConfigurationPlugin implements SignalHa envVarMap.put("QPID_MAXIMUMMESSAGECOUNT", "maximumMessageCount"); envVarMap.put("QPID_MAXIMUMQUEUEDEPTH", "maximumQueueDepth"); envVarMap.put("QPID_MAXIMUMMESSAGESIZE", "maximumMessageSize"); + envVarMap.put("QPID_MAXIMUMCHANNELCOUNT", "maximumChannelCount"); envVarMap.put("QPID_MINIMUMALERTREPEATGAP", "minimumAlertRepeatGap"); envVarMap.put("QPID_QUEUECAPACITY", "capacity"); envVarMap.put("QPID_FLOWRESUMECAPACITY", "flowResumeCapacity"); @@ -818,4 +819,9 @@ public class ServerConfiguration extends ConfigurationPlugin implements SignalHa } }; } + + public int getMaxChannelCount() + { + return getIntValue("maximumChannelCount", 256); + } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java index cda8cff25a..d4b79134a2 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java @@ -92,7 +92,7 @@ public class ConnectionSecureOkMethodHandler implements StateAwareMethodListener stateManager.changeState(AMQState.CONNECTION_NOT_TUNED); ConnectionTuneBody tuneBody = - methodRegistry.createConnectionTuneBody(0xFFFF, + methodRegistry.createConnectionTuneBody(ApplicationRegistry.getInstance().getConfiguration().getMaxChannelCount(), ConnectionStartOkMethodHandler.getConfiguredFrameSize(), ApplicationRegistry.getInstance().getConfiguration().getHeartBeatDelay()); session.writeFrame(tuneBody.generateFrame(0)); diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java index 6512ff1a14..4442f969c4 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java @@ -113,7 +113,7 @@ public class ConnectionStartOkMethodHandler implements StateAwareMethodListener< stateManager.changeState(AMQState.CONNECTION_NOT_TUNED); - ConnectionTuneBody tuneBody = methodRegistry.createConnectionTuneBody(0xFFFF, + ConnectionTuneBody tuneBody = methodRegistry.createConnectionTuneBody(ApplicationRegistry.getInstance().getConfiguration().getMaxChannelCount(), getConfiguredFrameSize(), ApplicationRegistry.getInstance().getConfiguration().getHeartBeatDelay()); session.writeFrame(tuneBody.generateFrame(0)); diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionTuneOkMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionTuneOkMethodHandler.java index 9f392ffc44..1da2760639 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionTuneOkMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionTuneOkMethodHandler.java @@ -23,7 +23,6 @@ package org.apache.qpid.server.handler; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; import org.apache.qpid.framing.ConnectionTuneOkBody; -import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.state.AMQState; import org.apache.qpid.server.state.AMQStateManager; @@ -51,5 +50,9 @@ public class ConnectionTuneOkMethodHandler implements StateAwareMethodListener _slowAccessSessions = new LinkedHashMap(); - private int _size = 0; - private static final int FAST_CHANNEL_ACCESS_MASK = 0xFFFFFFF0; - private AtomicInteger _idFactory = new AtomicInteger(0); - private int _maxChannelID; - private boolean _cycledIds; - - public AMQSession get(int channelId) - { - if ((channelId & FAST_CHANNEL_ACCESS_MASK) == 0) - { - return _fastAccessSessions[channelId]; - } - else - { - return _slowAccessSessions.get(channelId); - } - } - - public AMQSession put(int channelId, AMQSession session) - { - AMQSession oldVal; - if ((channelId & FAST_CHANNEL_ACCESS_MASK) == 0) - { - oldVal = _fastAccessSessions[channelId]; - _fastAccessSessions[channelId] = session; - } - else - { - oldVal = _slowAccessSessions.put(channelId, session); - } - if ((oldVal != null) && (session == null)) - { - _size--; - } - else if ((oldVal == null) && (session != null)) - { - _size++; - } - - return session; - - } - - public AMQSession remove(int channelId) - { - AMQSession session; - if ((channelId & FAST_CHANNEL_ACCESS_MASK) == 0) - { - session = _fastAccessSessions[channelId]; - _fastAccessSessions[channelId] = null; - } - else - { - session = _slowAccessSessions.remove(channelId); - } - - if (session != null) - { - _size--; - } - return session; - - } - - public Collection values() - { - ArrayList values = new ArrayList(size()); - - for (int i = 0; i < 16; i++) - { - if (_fastAccessSessions[i] != null) - { - values.add(_fastAccessSessions[i]); - } - } - values.addAll(_slowAccessSessions.values()); - - return values; - } - - public int size() - { - return _size; - } - - public void clear() - { - _size = 0; - _slowAccessSessions.clear(); - for (int i = 0; i < 16; i++) - { - _fastAccessSessions[i] = null; - } - } - - /* - * Synchronized on whole method so that we don't need to consider the - * increment-then-reset path in too much detail - */ - public synchronized int getNextChannelId() - { - int id = 0; - if (!_cycledIds) - { - id = _idFactory.incrementAndGet(); - if (id == _maxChannelID) - { - _cycledIds = true; - _idFactory.set(0); // Go back to the start - } - } - else - { - boolean done = false; - while (!done) - { - // Needs to work second time through - id = _idFactory.incrementAndGet(); - if (id > _maxChannelID) - { - _idFactory.set(0); - id = _idFactory.incrementAndGet(); - } - if ((id & FAST_CHANNEL_ACCESS_MASK) == 0) - { - done = (_fastAccessSessions[id] == null); - } - else - { - done = (!_slowAccessSessions.keySet().contains(id)); - } - } - } - - return id; - } - - public void setMaxChannelID(int maxChannelID) - { - _maxChannelID = maxChannelID; - } - } - private static final Logger _logger = LoggerFactory.getLogger(AMQConnection.class); @@ -244,9 +94,9 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect /** * A channel is roughly analogous to a session. The server can negotiate the maximum number of channels per session - * and we must prevent the client from opening too many. Zero means unlimited. + * and we must prevent the client from opening too many. */ - protected long _maximumChannelCount; + private long _maximumChannelCount; /** The maximum size of frame supported by the server */ private long _maximumFrameSize; @@ -489,7 +339,6 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect { _delegate = new AMQConnectionDelegate_0_10(this); } - _sessions.setMaxChannelID(_delegate.getMaxChannelID()); if (_logger.isInfoEnabled()) { @@ -570,8 +419,6 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect } } - _logger.info("Connected with ProtocolHandler Version:"+_protocolHandler.getProtocolVersion()); - if (_logger.isDebugEnabled()) { _logger.debug("Are we connected:" + _connected); @@ -579,6 +426,11 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect if (!_connected) { + if (_logger.isDebugEnabled()) + { + _logger.debug("Last attempted ProtocolHandler Version:"+_protocolHandler.getProtocolVersion()); + } + String message = null; if (connectionException != null) @@ -620,6 +472,11 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect throw new AMQConnectionFailureException(message, connectionException); } + _logger.info("Connected with ProtocolHandler Version:"+_protocolHandler.getProtocolVersion()); + + _sessions.setMaxChannelID(_delegate.getMaxChannelID()); + _sessions.setMinChannelID(_delegate.getMinChannelID()); + _connectionMetaData = new QpidConnectionMetaData(this); } @@ -647,7 +504,6 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect Class partypes[] = new Class[1]; partypes[0] = AMQConnection.class; _delegate = (AMQConnectionDelegate) c.getConstructor(partypes).newInstance(this); - _sessions.setMaxChannelID(_delegate.getMaxChannelID()); //Update our session to use this new protocol version _protocolHandler.getProtocolSession().setProtocolVersion(_delegate.getProtocolVersion()); @@ -898,7 +754,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect public boolean channelLimitReached() { - return (_maximumChannelCount != 0) && (_sessions.size() == _maximumChannelCount); + return _sessions.size() >= _maximumChannelCount; } public String getClientID() throws JMSException diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java index 5f93ec6c47..9560bd5c7c 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java @@ -59,6 +59,8 @@ public interface AMQConnectionDelegate T executeRetrySupport(FailoverProtectedOperation operation) throws E; int getMaxChannelID(); - + + int getMinChannelID(); + ProtocolVersion getProtocolVersion(); } diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java index adfd178ec3..4b4417b6ef 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java @@ -37,6 +37,7 @@ import org.apache.qpid.client.failover.FailoverProtectedOperation; import org.apache.qpid.configuration.ClientProperties; import org.apache.qpid.framing.ProtocolVersion; import org.apache.qpid.jms.BrokerDetails; +import org.apache.qpid.jms.ChannelLimitReachedException; import org.apache.qpid.jms.Session; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.transport.Connection; @@ -82,6 +83,12 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec throws JMSException { _conn.checkNotClosed(); + + if (_conn.channelLimitReached()) + { + throw new ChannelLimitReachedException(_conn.getMaximumChannelCount()); + } + int channelId = _conn.getNextChannelID(); AMQSession session; try @@ -120,6 +127,12 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec public XASession createXASession(int prefetchHigh, int prefetchLow) throws JMSException { _conn.checkNotClosed(); + + if (_conn.channelLimitReached()) + { + throw new ChannelLimitReachedException(_conn.getMaximumChannelCount()); + } + int channelId = _conn.getNextChannelID(); XASessionImpl session; try @@ -165,6 +178,7 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec _conn._connected = true; _conn.setUsername(_qpidConnection.getUserID()); + _conn.setMaximumChannelCount(_qpidConnection.getChannelMax()); _conn._failoverPolicy.attainedConnection(); } catch (ProtocolVersionException pe) @@ -293,7 +307,13 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec public int getMaxChannelID() { - return Integer.MAX_VALUE; + //For a negotiated channelMax N, there are channels 0 to N-1 available. + return _qpidConnection.getChannelMax() - 1; + } + + public int getMinChannelID() + { + return Connection.MIN_USABLE_CHANNEL_NUM; } public ProtocolVersion getProtocolVersion() diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java index 9cee4dab53..40b332d216 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java @@ -36,6 +36,7 @@ import org.apache.qpid.AMQException; import org.apache.qpid.client.failover.FailoverException; import org.apache.qpid.client.failover.FailoverProtectedOperation; import org.apache.qpid.client.failover.FailoverRetrySupport; +import org.apache.qpid.client.protocol.AMQProtocolSession; import org.apache.qpid.client.state.AMQState; import org.apache.qpid.client.state.StateWaiter; import org.apache.qpid.client.transport.TransportConnection; @@ -134,7 +135,7 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate if (_conn.channelLimitReached()) { - throw new ChannelLimitReachedException(_conn._maximumChannelCount); + throw new ChannelLimitReachedException(_conn.getMaximumChannelCount()); } return new FailoverRetrySupport( @@ -307,7 +308,14 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate public int getMaxChannelID() { - return (int) (Math.pow(2, 16)-1); + ConnectionTuneParameters params = _conn.getProtocolHandler().getProtocolSession().getConnectionTuneParameters(); + + return params == null ? AMQProtocolSession.MAX_CHANNEL_MAX : params.getChannelMax(); + } + + public int getMinChannelID() + { + return AMQProtocolSession.MIN_USABLE_CHANNEL_NUM; } public ProtocolVersion getProtocolVersion() diff --git a/java/client/src/main/java/org/apache/qpid/client/ChannelToSessionMap.java b/java/client/src/main/java/org/apache/qpid/client/ChannelToSessionMap.java new file mode 100644 index 0000000000..2b7e3d44da --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/client/ChannelToSessionMap.java @@ -0,0 +1,147 @@ +package org.apache.qpid.client; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.LinkedHashMap; +import java.util.concurrent.atomic.AtomicInteger; + +public final class ChannelToSessionMap +{ + private final AMQSession[] _fastAccessSessions = new AMQSession[16]; + private final LinkedHashMap _slowAccessSessions = new LinkedHashMap(); + private int _size = 0; + private static final int FAST_CHANNEL_ACCESS_MASK = 0xFFFFFFF0; + private AtomicInteger _idFactory = new AtomicInteger(0); + private int _maxChannelID; + private int _minChannelID; + + public AMQSession get(int channelId) + { + if ((channelId & FAST_CHANNEL_ACCESS_MASK) == 0) + { + return _fastAccessSessions[channelId]; + } + else + { + return _slowAccessSessions.get(channelId); + } + } + + public AMQSession put(int channelId, AMQSession session) + { + AMQSession oldVal; + if ((channelId & FAST_CHANNEL_ACCESS_MASK) == 0) + { + oldVal = _fastAccessSessions[channelId]; + _fastAccessSessions[channelId] = session; + } + else + { + oldVal = _slowAccessSessions.put(channelId, session); + } + if ((oldVal != null) && (session == null)) + { + _size--; + } + else if ((oldVal == null) && (session != null)) + { + _size++; + } + + return session; + + } + + public AMQSession remove(int channelId) + { + AMQSession session; + if ((channelId & FAST_CHANNEL_ACCESS_MASK) == 0) + { + session = _fastAccessSessions[channelId]; + _fastAccessSessions[channelId] = null; + } + else + { + session = _slowAccessSessions.remove(channelId); + } + + if (session != null) + { + _size--; + } + return session; + + } + + public Collection values() + { + ArrayList values = new ArrayList(size()); + + for (int i = 0; i < 16; i++) + { + if (_fastAccessSessions[i] != null) + { + values.add(_fastAccessSessions[i]); + } + } + values.addAll(_slowAccessSessions.values()); + + return values; + } + + public int size() + { + return _size; + } + + public void clear() + { + _size = 0; + _slowAccessSessions.clear(); + for (int i = 0; i < 16; i++) + { + _fastAccessSessions[i] = null; + } + } + + /* + * Synchronized on whole method so that we don't need to consider the + * increment-then-reset path in too much detail + */ + public synchronized int getNextChannelId() + { + int id = _minChannelID; + + boolean done = false; + while (!done) + { + id = _idFactory.getAndIncrement(); + if (id == _maxChannelID) + { + //go back to the start + _idFactory.set(_minChannelID); + } + if ((id & FAST_CHANNEL_ACCESS_MASK) == 0) + { + done = (_fastAccessSessions[id] == null); + } + else + { + done = (!_slowAccessSessions.keySet().contains(id)); + } + } + + return id; + } + + public void setMaxChannelID(int maxChannelID) + { + _maxChannelID = maxChannelID; + } + + public void setMinChannelID(int minChannelID) + { + _minChannelID = minChannelID; + _idFactory.set(_minChannelID); + } +} \ No newline at end of file diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java index 287b5957a1..d1b2caf987 100644 --- a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java @@ -55,9 +55,12 @@ public class ConnectionTuneMethodHandler implements StateAwareMethodListener + public static final int MAX_CHANNEL_MAX = 0xFFFF; + public static final int MIN_USABLE_CHANNEL_NUM = 1; + protected static final String CONNECTION_TUNE_PARAMETERS = "ConnectionTuneParameters"; protected static final String AMQ_CONNECTION = "AMQConnection"; @@ -178,6 +182,7 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession { _connectionTuneParameters = params; AMQConnection con = getAMQConnection(); + con.setMaximumChannelCount(params.getChannelMax()); con.setMaximumFrameSize(params.getFrameMax()); _protocolHandler.initHeartbeats((int) params.getHeartbeat()); diff --git a/java/client/src/main/java/org/apache/qpid/jms/ChannelLimitReachedException.java b/java/client/src/main/java/org/apache/qpid/jms/ChannelLimitReachedException.java index 3d4a4573ed..e8c2b9d682 100644 --- a/java/client/src/main/java/org/apache/qpid/jms/ChannelLimitReachedException.java +++ b/java/client/src/main/java/org/apache/qpid/jms/ChannelLimitReachedException.java @@ -33,9 +33,9 @@ public class ChannelLimitReachedException extends ResourceAllocationException public ChannelLimitReachedException(long limit) { - super("Unable to create session since maximum number of sessions per connection is " + - limit + ". Either close one or more sessions or increase the " + - "maximum number of sessions per connection (or contact your AMQP administrator.", ERROR_CODE); + super("Unable to create session, the maximum number of sessions per connection is " + + limit + ". You must either close one or more sessions or increase the " + + "maximum number of sessions available per connection.", ERROR_CODE); _limit = limit; } diff --git a/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java b/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java index 82fb57eb7d..bce64075e5 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java +++ b/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java @@ -86,7 +86,8 @@ public class ClientDelegate extends ConnectionDelegate } } - @Override public void connectionStart(Connection conn, ConnectionStart start) + @Override + public void connectionStart(Connection conn, ConnectionStart start) { Map clientProperties = new HashMap(); @@ -156,7 +157,8 @@ public class ClientDelegate extends ConnectionDelegate } } - @Override public void connectionSecure(Connection conn, ConnectionSecure secure) + @Override + public void connectionSecure(Connection conn, ConnectionSecure secure) { SaslClient sc = conn.getSaslClient(); try @@ -170,9 +172,9 @@ public class ClientDelegate extends ConnectionDelegate } } - @Override public void connectionTune(Connection conn, ConnectionTune tune) + @Override + public void connectionTune(Connection conn, ConnectionTune tune) { - conn.setChannelMax(tune.getChannelMax()); int hb_interval = calculateHeartbeatInterval(conSettings.getHeartbeatInterval(), tune.getHeartbeatMin(), tune.getHeartbeatMax() @@ -182,10 +184,17 @@ public class ClientDelegate extends ConnectionDelegate hb_interval); // The idle timeout is twice the heartbeat amount (in milisecs) conn.setIdleTimeout(hb_interval*1000*2); + + int channelMax = tune.getChannelMax(); + //0 means no implied limit, except available server resources + //(or that forced by protocol limitations [0xFFFF]) + conn.setChannelMax(channelMax == 0 ? Connection.MAX_CHANNEL_MAX : channelMax); + conn.connectionOpen(conSettings.getVhost(), null, Option.INSIST); } - @Override public void connectionOpenOk(Connection conn, ConnectionOpenOk ok) + @Override + public void connectionOpenOk(Connection conn, ConnectionOpenOk ok) { SaslClient sc = conn.getSaslClient(); if (sc != null) @@ -210,12 +219,14 @@ public class ClientDelegate extends ConnectionDelegate conn.setState(OPEN); } - @Override public void connectionRedirect(Connection conn, ConnectionRedirect redir) + @Override + public void connectionRedirect(Connection conn, ConnectionRedirect redir) { throw new UnsupportedOperationException(); } - @Override public void connectionHeartbeat(Connection conn, ConnectionHeartbeat hearbeat) + @Override + public void connectionHeartbeat(Connection conn, ConnectionHeartbeat hearbeat) { conn.connectionHeartbeat(); } diff --git a/java/common/src/main/java/org/apache/qpid/transport/Connection.java b/java/common/src/main/java/org/apache/qpid/transport/Connection.java index 8abae7a23e..fd19fa0512 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/Connection.java +++ b/java/common/src/main/java/org/apache/qpid/transport/Connection.java @@ -59,6 +59,9 @@ public class Connection extends ConnectionInvoker protected static final Logger log = Logger.get(Connection.class); + //Usable channels are numbered 0 to - 1 + public static final int MAX_CHANNEL_MAX = 0xFFFF; + public static final int MIN_USABLE_CHANNEL_NUM = 0; public enum State { NEW, CLOSED, OPENING, OPEN, CLOSING, CLOSE_RCVD } @@ -404,7 +407,8 @@ public class Connection extends ConnectionInvoker { synchronized (lock) { - for (int i = 1; i <= getChannelMax(); i++) + //For a negotiated channelMax N, there are channels 0 to N-1 available. + for (int i = 0; i < getChannelMax(); i++) { if (!channels.containsKey(i)) { diff --git a/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java b/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java index 644a2daa58..b8e7616a37 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java +++ b/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java @@ -30,6 +30,8 @@ import javax.security.sasl.Sasl; import javax.security.sasl.SaslException; import javax.security.sasl.SaslServer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * ServerDelegate @@ -38,8 +40,8 @@ import javax.security.sasl.SaslServer; public class ServerDelegate extends ConnectionDelegate { + protected static final Logger _logger = LoggerFactory.getLogger(ServerDelegate.class); - private SaslServer saslServer; private List _locales; private List _mechanisms; private Map _clientProperties; @@ -47,7 +49,7 @@ public class ServerDelegate extends ConnectionDelegate public ServerDelegate() { - this(null, Collections.EMPTY_LIST, Collections.singletonList((Object)"utf8")); + this(null, Collections.emptyList(), Collections.singletonList((Object)"utf8")); } protected ServerDelegate(Map clientProperties, List mechanisms, List locales) @@ -64,7 +66,8 @@ public class ServerDelegate extends ConnectionDelegate conn.connectionStart(_clientProperties, _mechanisms, _locales); } - @Override public void connectionStartOk(Connection conn, ConnectionStartOk ok) + @Override + public void connectionStartOk(Connection conn, ConnectionStartOk ok) { conn.setLocale(ok.getLocale()); String mechanism = ok.getMechanism(); @@ -75,9 +78,9 @@ public class ServerDelegate extends ConnectionDelegate if (mechanism == null || mechanism.length() == 0) { conn.connectionTune - (Integer.MAX_VALUE, + (getChannelMax(), org.apache.qpid.transport.network.ConnectionBinding.MAX_FRAME_SIZE, - 0, Integer.MAX_VALUE); + 0, getHeartbeatMax()); return; } @@ -118,7 +121,7 @@ public class ServerDelegate extends ConnectionDelegate { ss.dispose(); conn.connectionTune - (Integer.MAX_VALUE, + (getChannelMax(), org.apache.qpid.transport.network.ConnectionBinding.MAX_FRAME_SIZE, 0, getHeartbeatMax()); conn.setAuthorizationID(ss.getAuthorizationID()); @@ -140,19 +143,42 @@ public class ServerDelegate extends ConnectionDelegate return Integer.MAX_VALUE; } - @Override public void connectionSecureOk(Connection conn, ConnectionSecureOk ok) + protected int getChannelMax() + { + return Integer.MAX_VALUE; + } + + @Override + public void connectionSecureOk(Connection conn, ConnectionSecureOk ok) { secure(conn, ok.getResponse()); } - @Override public void connectionTuneOk(Connection conn, ConnectionTuneOk ok) + @Override + public void connectionTuneOk(Connection conn, ConnectionTuneOk ok) { + int okChannelMax = ok.getChannelMax(); + if (okChannelMax > getChannelMax()) + { + _logger.error("Connection '" + conn.getConnectionId() + "' being severed, " + + "client connectionTuneOk returned a channelMax (" + okChannelMax + + ") above the servers offered limit (" + getChannelMax() +")"); + + //Due to the error we must forcefully close the connection without negotiation + conn.getSender().close(); + return; + } + + //0 means no implied limit, except available server resources + //(or that forced by protocol limitations [0xFFFF]) + conn.setChannelMax(okChannelMax == 0 ? Connection.MAX_CHANNEL_MAX : okChannelMax); } - @Override public void connectionOpen(Connection conn, ConnectionOpen open) + @Override + public void connectionOpen(Connection conn, ConnectionOpen open) { - conn.connectionOpenOk(Collections.EMPTY_LIST); + conn.connectionOpenOk(Collections.emptyList()); conn.setState(OPEN); } @@ -168,7 +194,8 @@ public class ServerDelegate extends ConnectionDelegate return new Session(conn, new Binary(atc.getName()), 0); } - @Override public void sessionAttach(Connection conn, SessionAttach atc) + @Override + public void sessionAttach(Connection conn, SessionAttach atc) { Session ssn = getSession(conn, atc); conn.map(ssn, atc.getChannel()); diff --git a/java/systests/src/main/java/org/apache/qpid/server/logging/ChannelLoggingTest.java b/java/systests/src/main/java/org/apache/qpid/server/logging/ChannelLoggingTest.java index bd9b18d848..02d0d6f334 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/logging/ChannelLoggingTest.java +++ b/java/systests/src/main/java/org/apache/qpid/server/logging/ChannelLoggingTest.java @@ -75,7 +75,7 @@ public class ChannelLoggingTest extends AbstractTestLogging String log = getLogMessage(results, 0); // MESSAGE [con:0(guest@anonymous(3273383)/test)/ch:1] CHN-1001 : Create validateMessageID("CHN-1001", log); - assertEquals("Incorrect Channel in actor:"+fromActor(log), 1, getChannelID(fromActor(log))); + assertEquals("Incorrect Channel in actor:"+fromActor(log), isBroker010()? 0 : 1, getChannelID(fromActor(log))); if (isBroker08()) { @@ -89,7 +89,7 @@ public class ChannelLoggingTest extends AbstractTestLogging log = getLogMessage(results, 0); // MESSAGE [con:0(guest@anonymous(3273383)/test)/ch:1] CHN-1004 : Prefetch Size (bytes) {0,number} : Count {1,number} validateMessageID("CHN-1004", log); - assertEquals("Incorrect Channel in actor:"+fromActor(log), 1, getChannelID(fromActor(log))); + assertEquals("Incorrect Channel in actor:"+fromActor(log), isBroker010()? 0 : 1, getChannelID(fromActor(log))); assertTrue("Prefetch Count not correct",getMessageString(fromMessage(log)).endsWith("Count "+PREFETCH)); } @@ -306,7 +306,7 @@ public class ChannelLoggingTest extends AbstractTestLogging validateMessageID("CHN-1001", open); validateMessageID("CHN-1003", close); assertEquals("Message should be Close", "Close", getMessageString(fromMessage(close))); - assertEquals("Incorrect Channel ID closed", 1, getChannelID(fromSubject(close))); + assertEquals("Incorrect Channel ID closed", isBroker010()? 0 : 1, getChannelID(fromSubject(close))); assertEquals("Channel IDs should be the same", getChannelID(fromActor(open)), getChannelID(fromSubject(close))); assertEquals("Connection IDs should be the same", getConnectionID(fromActor(open)), getConnectionID(fromSubject(close))); } diff --git a/java/systests/src/main/java/org/apache/qpid/test/unit/client/AMQConnectionTest.java b/java/systests/src/main/java/org/apache/qpid/test/unit/client/AMQConnectionTest.java index 5e83b0569d..292bcd6039 100644 --- a/java/systests/src/main/java/org/apache/qpid/test/unit/client/AMQConnectionTest.java +++ b/java/systests/src/main/java/org/apache/qpid/test/unit/client/AMQConnectionTest.java @@ -266,19 +266,22 @@ public class AMQConnectionTest extends QpidBrokerTestCase } } - public void testGetChannelID() + public void testGetChannelID() throws Exception { - int maxChannelID = 65536; + long maxChannelID = _connection.getMaximumChannelCount(); if (isBroker010()) { - maxChannelID = Integer.MAX_VALUE+1; + //Usable numbers are 0 to N-1 when using 0-10 + //and 1 to N for 0-8/0-9 + maxChannelID = maxChannelID-1; } for (int j = 0; j < 3; j++) { - for (int i = 1; i < maxChannelID; i++) + int i = isBroker010() ? 0 : 1; + for ( ; i <= maxChannelID; i++) { int id = _connection.getNextChannelID(); - assertEquals("On iterartion "+j, i, id); + assertEquals("Unexpected number on iteration "+j, i, id); _connection.deregisterSession(id); } } -- cgit v1.2.1