diff options
Diffstat (limited to 'qpid/java')
19 files changed, 298 insertions, 196 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java index d7dcfa7dc8..7197ec8cdc 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java @@ -108,6 +108,7 @@ public class ServerConfiguration extends ConfigurationPlugin implements SignalHa envVarMap.put("QPID_MAXIMUMMESSAGECOUNT", "maximumMessageCount"); envVarMap.put("QPID_MAXIMUMQUEUEDEPTH", "maximumQueueDepth"); envVarMap.put("QPID_MAXIMUMMESSAGESIZE", "maximumMessageSize"); + envVarMap.put("QPID_MAXIMUMCHANNELCOUNT", "maximumChannelCount"); envVarMap.put("QPID_MINIMUMALERTREPEATGAP", "minimumAlertRepeatGap"); envVarMap.put("QPID_QUEUECAPACITY", "capacity"); envVarMap.put("QPID_FLOWRESUMECAPACITY", "flowResumeCapacity"); @@ -818,4 +819,9 @@ public class ServerConfiguration extends ConfigurationPlugin implements SignalHa } }; } + + public int getMaxChannelCount() + { + return getIntValue("maximumChannelCount", 256); + } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java index cda8cff25a..d4b79134a2 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java @@ -92,7 +92,7 @@ public class ConnectionSecureOkMethodHandler implements StateAwareMethodListener stateManager.changeState(AMQState.CONNECTION_NOT_TUNED); ConnectionTuneBody tuneBody = - methodRegistry.createConnectionTuneBody(0xFFFF, + methodRegistry.createConnectionTuneBody(ApplicationRegistry.getInstance().getConfiguration().getMaxChannelCount(), ConnectionStartOkMethodHandler.getConfiguredFrameSize(), ApplicationRegistry.getInstance().getConfiguration().getHeartBeatDelay()); session.writeFrame(tuneBody.generateFrame(0)); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java index 6512ff1a14..4442f969c4 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java @@ -113,7 +113,7 @@ public class ConnectionStartOkMethodHandler implements StateAwareMethodListener< stateManager.changeState(AMQState.CONNECTION_NOT_TUNED); - ConnectionTuneBody tuneBody = methodRegistry.createConnectionTuneBody(0xFFFF, + ConnectionTuneBody tuneBody = methodRegistry.createConnectionTuneBody(ApplicationRegistry.getInstance().getConfiguration().getMaxChannelCount(), getConfiguredFrameSize(), ApplicationRegistry.getInstance().getConfiguration().getHeartBeatDelay()); session.writeFrame(tuneBody.generateFrame(0)); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionTuneOkMethodHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionTuneOkMethodHandler.java index 9f392ffc44..1da2760639 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionTuneOkMethodHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionTuneOkMethodHandler.java @@ -23,7 +23,6 @@ package org.apache.qpid.server.handler; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; import org.apache.qpid.framing.ConnectionTuneOkBody; -import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.state.AMQState; import org.apache.qpid.server.state.AMQStateManager; @@ -51,5 +50,9 @@ public class ConnectionTuneOkMethodHandler implements StateAwareMethodListener<C stateManager.changeState(AMQState.CONNECTION_NOT_OPENED); session.initHeartbeats(body.getHeartbeat()); session.setMaxFrameSize(body.getFrameMax()); + + long maxChannelNumber = body.getChannelMax(); + //0 means no implied limit, except that forced by protocol limitations (0xFFFF) + session.setMaximumNumberOfChannels( maxChannelNumber == 0 ? 0xFFFFL : maxChannelNumber); } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java index 5368dfe532..a1ffe272fd 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java @@ -136,7 +136,7 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol protected volatile boolean _closed; // maximum number of channels this session should have - private long _maxNoOfChannels = 1000; + private long _maxNoOfChannels = ApplicationRegistry.getInstance().getConfiguration().getMaxChannelCount(); /* AMQP Version for this session */ private ProtocolVersion _protocolVersion = ProtocolVersion.getLatestSupportedVersion(); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java index 7ba85ffe14..2b9e92f685 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java @@ -26,6 +26,7 @@ import org.apache.qpid.server.logging.actors.GenericActor; import org.apache.qpid.common.ClientProperties; import org.apache.qpid.protocol.ProtocolEngine; import org.apache.qpid.server.security.SecurityManager; +import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.registry.IApplicationRegistry; import org.apache.qpid.server.virtualhost.VirtualHost; @@ -145,4 +146,10 @@ public class ServerConnectionDelegate extends ServerDelegate //TODO: implement broker support for actually sending heartbeats return 0; } + + @Override + protected int getChannelMax() + { + return ApplicationRegistry.getInstance().getConfiguration().getMaxChannelCount(); + } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index b483406949..af0d8a3a1d 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -26,15 +26,12 @@ import java.net.ConnectException; import java.net.UnknownHostException; import java.nio.channels.UnresolvedAddressException; import java.util.ArrayList; -import java.util.Collection; import java.util.Iterator; -import java.util.LinkedHashMap; import java.util.LinkedList; import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; import javax.jms.ConnectionConsumer; import javax.jms.ConnectionMetaData; @@ -84,153 +81,6 @@ import org.slf4j.LoggerFactory; public class AMQConnection extends Closeable implements Connection, QueueConnection, TopicConnection, Referenceable { - public static final class ChannelToSessionMap - { - private final AMQSession[] _fastAccessSessions = new AMQSession[16]; - private final LinkedHashMap<Integer, AMQSession> _slowAccessSessions = new LinkedHashMap<Integer, AMQSession>(); - private int _size = 0; - private static final int FAST_CHANNEL_ACCESS_MASK = 0xFFFFFFF0; - private AtomicInteger _idFactory = new AtomicInteger(0); - private int _maxChannelID; - private boolean _cycledIds; - - public AMQSession get(int channelId) - { - if ((channelId & FAST_CHANNEL_ACCESS_MASK) == 0) - { - return _fastAccessSessions[channelId]; - } - else - { - return _slowAccessSessions.get(channelId); - } - } - - public AMQSession put(int channelId, AMQSession session) - { - AMQSession oldVal; - if ((channelId & FAST_CHANNEL_ACCESS_MASK) == 0) - { - oldVal = _fastAccessSessions[channelId]; - _fastAccessSessions[channelId] = session; - } - else - { - oldVal = _slowAccessSessions.put(channelId, session); - } - if ((oldVal != null) && (session == null)) - { - _size--; - } - else if ((oldVal == null) && (session != null)) - { - _size++; - } - - return session; - - } - - public AMQSession remove(int channelId) - { - AMQSession session; - if ((channelId & FAST_CHANNEL_ACCESS_MASK) == 0) - { - session = _fastAccessSessions[channelId]; - _fastAccessSessions[channelId] = null; - } - else - { - session = _slowAccessSessions.remove(channelId); - } - - if (session != null) - { - _size--; - } - return session; - - } - - public Collection<AMQSession> values() - { - ArrayList<AMQSession> values = new ArrayList<AMQSession>(size()); - - for (int i = 0; i < 16; i++) - { - if (_fastAccessSessions[i] != null) - { - values.add(_fastAccessSessions[i]); - } - } - values.addAll(_slowAccessSessions.values()); - - return values; - } - - public int size() - { - return _size; - } - - public void clear() - { - _size = 0; - _slowAccessSessions.clear(); - for (int i = 0; i < 16; i++) - { - _fastAccessSessions[i] = null; - } - } - - /* - * Synchronized on whole method so that we don't need to consider the - * increment-then-reset path in too much detail - */ - public synchronized int getNextChannelId() - { - int id = 0; - if (!_cycledIds) - { - id = _idFactory.incrementAndGet(); - if (id == _maxChannelID) - { - _cycledIds = true; - _idFactory.set(0); // Go back to the start - } - } - else - { - boolean done = false; - while (!done) - { - // Needs to work second time through - id = _idFactory.incrementAndGet(); - if (id > _maxChannelID) - { - _idFactory.set(0); - id = _idFactory.incrementAndGet(); - } - if ((id & FAST_CHANNEL_ACCESS_MASK) == 0) - { - done = (_fastAccessSessions[id] == null); - } - else - { - done = (!_slowAccessSessions.keySet().contains(id)); - } - } - } - - return id; - } - - public void setMaxChannelID(int maxChannelID) - { - _maxChannelID = maxChannelID; - } - } - private static final Logger _logger = LoggerFactory.getLogger(AMQConnection.class); @@ -244,9 +94,9 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect /** * A channel is roughly analogous to a session. The server can negotiate the maximum number of channels per session - * and we must prevent the client from opening too many. Zero means unlimited. + * and we must prevent the client from opening too many. */ - protected long _maximumChannelCount; + private long _maximumChannelCount; /** The maximum size of frame supported by the server */ private long _maximumFrameSize; @@ -489,7 +339,6 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect { _delegate = new AMQConnectionDelegate_0_10(this); } - _sessions.setMaxChannelID(_delegate.getMaxChannelID()); if (_logger.isInfoEnabled()) { @@ -570,8 +419,6 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect } } - _logger.info("Connected with ProtocolHandler Version:"+_protocolHandler.getProtocolVersion()); - if (_logger.isDebugEnabled()) { _logger.debug("Are we connected:" + _connected); @@ -579,6 +426,11 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect if (!_connected) { + if (_logger.isDebugEnabled()) + { + _logger.debug("Last attempted ProtocolHandler Version:"+_protocolHandler.getProtocolVersion()); + } + String message = null; if (connectionException != null) @@ -620,6 +472,11 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect throw new AMQConnectionFailureException(message, connectionException); } + _logger.info("Connected with ProtocolHandler Version:"+_protocolHandler.getProtocolVersion()); + + _sessions.setMaxChannelID(_delegate.getMaxChannelID()); + _sessions.setMinChannelID(_delegate.getMinChannelID()); + _connectionMetaData = new QpidConnectionMetaData(this); } @@ -647,7 +504,6 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect Class partypes[] = new Class[1]; partypes[0] = AMQConnection.class; _delegate = (AMQConnectionDelegate) c.getConstructor(partypes).newInstance(this); - _sessions.setMaxChannelID(_delegate.getMaxChannelID()); //Update our session to use this new protocol version _protocolHandler.getProtocolSession().setProtocolVersion(_delegate.getProtocolVersion()); @@ -898,7 +754,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect public boolean channelLimitReached() { - return (_maximumChannelCount != 0) && (_sessions.size() == _maximumChannelCount); + return _sessions.size() >= _maximumChannelCount; } public String getClientID() throws JMSException diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java index 5f93ec6c47..9560bd5c7c 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java @@ -59,6 +59,8 @@ public interface AMQConnectionDelegate <T, E extends Exception> T executeRetrySupport(FailoverProtectedOperation<T,E> operation) throws E; int getMaxChannelID(); - + + int getMinChannelID(); + ProtocolVersion getProtocolVersion(); } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java index adfd178ec3..4b4417b6ef 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java @@ -37,6 +37,7 @@ import org.apache.qpid.client.failover.FailoverProtectedOperation; import org.apache.qpid.configuration.ClientProperties; import org.apache.qpid.framing.ProtocolVersion; import org.apache.qpid.jms.BrokerDetails; +import org.apache.qpid.jms.ChannelLimitReachedException; import org.apache.qpid.jms.Session; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.transport.Connection; @@ -82,6 +83,12 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec throws JMSException { _conn.checkNotClosed(); + + if (_conn.channelLimitReached()) + { + throw new ChannelLimitReachedException(_conn.getMaximumChannelCount()); + } + int channelId = _conn.getNextChannelID(); AMQSession session; try @@ -120,6 +127,12 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec public XASession createXASession(int prefetchHigh, int prefetchLow) throws JMSException { _conn.checkNotClosed(); + + if (_conn.channelLimitReached()) + { + throw new ChannelLimitReachedException(_conn.getMaximumChannelCount()); + } + int channelId = _conn.getNextChannelID(); XASessionImpl session; try @@ -165,6 +178,7 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec _conn._connected = true; _conn.setUsername(_qpidConnection.getUserID()); + _conn.setMaximumChannelCount(_qpidConnection.getChannelMax()); _conn._failoverPolicy.attainedConnection(); } catch (ProtocolVersionException pe) @@ -293,7 +307,13 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec public int getMaxChannelID() { - return Integer.MAX_VALUE; + //For a negotiated channelMax N, there are channels 0 to N-1 available. + return _qpidConnection.getChannelMax() - 1; + } + + public int getMinChannelID() + { + return Connection.MIN_USABLE_CHANNEL_NUM; } public ProtocolVersion getProtocolVersion() diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java index 9cee4dab53..40b332d216 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java @@ -36,6 +36,7 @@ import org.apache.qpid.AMQException; import org.apache.qpid.client.failover.FailoverException; import org.apache.qpid.client.failover.FailoverProtectedOperation; import org.apache.qpid.client.failover.FailoverRetrySupport; +import org.apache.qpid.client.protocol.AMQProtocolSession; import org.apache.qpid.client.state.AMQState; import org.apache.qpid.client.state.StateWaiter; import org.apache.qpid.client.transport.TransportConnection; @@ -134,7 +135,7 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate if (_conn.channelLimitReached()) { - throw new ChannelLimitReachedException(_conn._maximumChannelCount); + throw new ChannelLimitReachedException(_conn.getMaximumChannelCount()); } return new FailoverRetrySupport<org.apache.qpid.jms.Session, JMSException>( @@ -307,7 +308,14 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate public int getMaxChannelID() { - return (int) (Math.pow(2, 16)-1); + ConnectionTuneParameters params = _conn.getProtocolHandler().getProtocolSession().getConnectionTuneParameters(); + + return params == null ? AMQProtocolSession.MAX_CHANNEL_MAX : params.getChannelMax(); + } + + public int getMinChannelID() + { + return AMQProtocolSession.MIN_USABLE_CHANNEL_NUM; } public ProtocolVersion getProtocolVersion() diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/ChannelToSessionMap.java b/qpid/java/client/src/main/java/org/apache/qpid/client/ChannelToSessionMap.java new file mode 100644 index 0000000000..2b7e3d44da --- /dev/null +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/ChannelToSessionMap.java @@ -0,0 +1,147 @@ +package org.apache.qpid.client; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.LinkedHashMap; +import java.util.concurrent.atomic.AtomicInteger; + +public final class ChannelToSessionMap +{ + private final AMQSession[] _fastAccessSessions = new AMQSession[16]; + private final LinkedHashMap<Integer, AMQSession> _slowAccessSessions = new LinkedHashMap<Integer, AMQSession>(); + private int _size = 0; + private static final int FAST_CHANNEL_ACCESS_MASK = 0xFFFFFFF0; + private AtomicInteger _idFactory = new AtomicInteger(0); + private int _maxChannelID; + private int _minChannelID; + + public AMQSession get(int channelId) + { + if ((channelId & FAST_CHANNEL_ACCESS_MASK) == 0) + { + return _fastAccessSessions[channelId]; + } + else + { + return _slowAccessSessions.get(channelId); + } + } + + public AMQSession put(int channelId, AMQSession session) + { + AMQSession oldVal; + if ((channelId & FAST_CHANNEL_ACCESS_MASK) == 0) + { + oldVal = _fastAccessSessions[channelId]; + _fastAccessSessions[channelId] = session; + } + else + { + oldVal = _slowAccessSessions.put(channelId, session); + } + if ((oldVal != null) && (session == null)) + { + _size--; + } + else if ((oldVal == null) && (session != null)) + { + _size++; + } + + return session; + + } + + public AMQSession remove(int channelId) + { + AMQSession session; + if ((channelId & FAST_CHANNEL_ACCESS_MASK) == 0) + { + session = _fastAccessSessions[channelId]; + _fastAccessSessions[channelId] = null; + } + else + { + session = _slowAccessSessions.remove(channelId); + } + + if (session != null) + { + _size--; + } + return session; + + } + + public Collection<AMQSession> values() + { + ArrayList<AMQSession> values = new ArrayList<AMQSession>(size()); + + for (int i = 0; i < 16; i++) + { + if (_fastAccessSessions[i] != null) + { + values.add(_fastAccessSessions[i]); + } + } + values.addAll(_slowAccessSessions.values()); + + return values; + } + + public int size() + { + return _size; + } + + public void clear() + { + _size = 0; + _slowAccessSessions.clear(); + for (int i = 0; i < 16; i++) + { + _fastAccessSessions[i] = null; + } + } + + /* + * Synchronized on whole method so that we don't need to consider the + * increment-then-reset path in too much detail + */ + public synchronized int getNextChannelId() + { + int id = _minChannelID; + + boolean done = false; + while (!done) + { + id = _idFactory.getAndIncrement(); + if (id == _maxChannelID) + { + //go back to the start + _idFactory.set(_minChannelID); + } + if ((id & FAST_CHANNEL_ACCESS_MASK) == 0) + { + done = (_fastAccessSessions[id] == null); + } + else + { + done = (!_slowAccessSessions.keySet().contains(id)); + } + } + + return id; + } + + public void setMaxChannelID(int maxChannelID) + { + _maxChannelID = maxChannelID; + } + + public void setMinChannelID(int minChannelID) + { + _minChannelID = minChannelID; + _idFactory.set(_minChannelID); + } +}
\ No newline at end of file diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java index 287b5957a1..d1b2caf987 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java @@ -55,9 +55,12 @@ public class ConnectionTuneMethodHandler implements StateAwareMethodListener<Con { params = new ConnectionTuneParameters(); } + + int maxChannelNumber = frame.getChannelMax(); + //0 implies no limit, except that forced by protocol limitations (0xFFFF) + params.setChannelMax(maxChannelNumber == 0 ? AMQProtocolSession.MAX_CHANNEL_MAX : maxChannelNumber); params.setFrameMax(frame.getFrameMax()); - params.setChannelMax(frame.getChannelMax()); params.setHeartbeat(Integer.getInteger("amqj.heartbeat.delay", frame.getHeartbeat())); session.setConnectionTuneParameters(params); diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java index f0edd0d7bf..7976760696 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java @@ -54,6 +54,10 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession public static final String PROTOCOL_INITIATION_RECEIVED = "ProtocolInitiatiionReceived"; + //Usable channels are numbered 1 to <ChannelMax> + public static final int MAX_CHANNEL_MAX = 0xFFFF; + public static final int MIN_USABLE_CHANNEL_NUM = 1; + protected static final String CONNECTION_TUNE_PARAMETERS = "ConnectionTuneParameters"; protected static final String AMQ_CONNECTION = "AMQConnection"; @@ -178,6 +182,7 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession { _connectionTuneParameters = params; AMQConnection con = getAMQConnection(); + con.setMaximumChannelCount(params.getChannelMax()); con.setMaximumFrameSize(params.getFrameMax()); _protocolHandler.initHeartbeats((int) params.getHeartbeat()); diff --git a/qpid/java/client/src/main/java/org/apache/qpid/jms/ChannelLimitReachedException.java b/qpid/java/client/src/main/java/org/apache/qpid/jms/ChannelLimitReachedException.java index 3d4a4573ed..e8c2b9d682 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/jms/ChannelLimitReachedException.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/jms/ChannelLimitReachedException.java @@ -33,9 +33,9 @@ public class ChannelLimitReachedException extends ResourceAllocationException public ChannelLimitReachedException(long limit) { - super("Unable to create session since maximum number of sessions per connection is " + - limit + ". Either close one or more sessions or increase the " + - "maximum number of sessions per connection (or contact your AMQP administrator.", ERROR_CODE); + super("Unable to create session, the maximum number of sessions per connection is " + + limit + ". You must either close one or more sessions or increase the " + + "maximum number of sessions available per connection.", ERROR_CODE); _limit = limit; } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java index 82fb57eb7d..bce64075e5 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java @@ -86,7 +86,8 @@ public class ClientDelegate extends ConnectionDelegate } } - @Override public void connectionStart(Connection conn, ConnectionStart start) + @Override + public void connectionStart(Connection conn, ConnectionStart start) { Map<String,Object> clientProperties = new HashMap<String,Object>(); @@ -156,7 +157,8 @@ public class ClientDelegate extends ConnectionDelegate } } - @Override public void connectionSecure(Connection conn, ConnectionSecure secure) + @Override + public void connectionSecure(Connection conn, ConnectionSecure secure) { SaslClient sc = conn.getSaslClient(); try @@ -170,9 +172,9 @@ public class ClientDelegate extends ConnectionDelegate } } - @Override public void connectionTune(Connection conn, ConnectionTune tune) + @Override + public void connectionTune(Connection conn, ConnectionTune tune) { - conn.setChannelMax(tune.getChannelMax()); int hb_interval = calculateHeartbeatInterval(conSettings.getHeartbeatInterval(), tune.getHeartbeatMin(), tune.getHeartbeatMax() @@ -182,10 +184,17 @@ public class ClientDelegate extends ConnectionDelegate hb_interval); // The idle timeout is twice the heartbeat amount (in milisecs) conn.setIdleTimeout(hb_interval*1000*2); + + int channelMax = tune.getChannelMax(); + //0 means no implied limit, except available server resources + //(or that forced by protocol limitations [0xFFFF]) + conn.setChannelMax(channelMax == 0 ? Connection.MAX_CHANNEL_MAX : channelMax); + conn.connectionOpen(conSettings.getVhost(), null, Option.INSIST); } - @Override public void connectionOpenOk(Connection conn, ConnectionOpenOk ok) + @Override + public void connectionOpenOk(Connection conn, ConnectionOpenOk ok) { SaslClient sc = conn.getSaslClient(); if (sc != null) @@ -210,12 +219,14 @@ public class ClientDelegate extends ConnectionDelegate conn.setState(OPEN); } - @Override public void connectionRedirect(Connection conn, ConnectionRedirect redir) + @Override + public void connectionRedirect(Connection conn, ConnectionRedirect redir) { throw new UnsupportedOperationException(); } - @Override public void connectionHeartbeat(Connection conn, ConnectionHeartbeat hearbeat) + @Override + public void connectionHeartbeat(Connection conn, ConnectionHeartbeat hearbeat) { conn.connectionHeartbeat(); } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java index 8abae7a23e..fd19fa0512 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java @@ -59,6 +59,9 @@ public class Connection extends ConnectionInvoker protected static final Logger log = Logger.get(Connection.class); + //Usable channels are numbered 0 to <ChannelMax> - 1 + public static final int MAX_CHANNEL_MAX = 0xFFFF; + public static final int MIN_USABLE_CHANNEL_NUM = 0; public enum State { NEW, CLOSED, OPENING, OPEN, CLOSING, CLOSE_RCVD } @@ -404,7 +407,8 @@ public class Connection extends ConnectionInvoker { synchronized (lock) { - for (int i = 1; i <= getChannelMax(); i++) + //For a negotiated channelMax N, there are channels 0 to N-1 available. + for (int i = 0; i < getChannelMax(); i++) { if (!channels.containsKey(i)) { diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java index 644a2daa58..b8e7616a37 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java @@ -30,6 +30,8 @@ import javax.security.sasl.Sasl; import javax.security.sasl.SaslException; import javax.security.sasl.SaslServer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * ServerDelegate @@ -38,8 +40,8 @@ import javax.security.sasl.SaslServer; public class ServerDelegate extends ConnectionDelegate { + protected static final Logger _logger = LoggerFactory.getLogger(ServerDelegate.class); - private SaslServer saslServer; private List<Object> _locales; private List<Object> _mechanisms; private Map<String, Object> _clientProperties; @@ -47,7 +49,7 @@ public class ServerDelegate extends ConnectionDelegate public ServerDelegate() { - this(null, Collections.EMPTY_LIST, Collections.singletonList((Object)"utf8")); + this(null, Collections.emptyList(), Collections.singletonList((Object)"utf8")); } protected ServerDelegate(Map<String, Object> clientProperties, List<Object> mechanisms, List<Object> locales) @@ -64,7 +66,8 @@ public class ServerDelegate extends ConnectionDelegate conn.connectionStart(_clientProperties, _mechanisms, _locales); } - @Override public void connectionStartOk(Connection conn, ConnectionStartOk ok) + @Override + public void connectionStartOk(Connection conn, ConnectionStartOk ok) { conn.setLocale(ok.getLocale()); String mechanism = ok.getMechanism(); @@ -75,9 +78,9 @@ public class ServerDelegate extends ConnectionDelegate if (mechanism == null || mechanism.length() == 0) { conn.connectionTune - (Integer.MAX_VALUE, + (getChannelMax(), org.apache.qpid.transport.network.ConnectionBinding.MAX_FRAME_SIZE, - 0, Integer.MAX_VALUE); + 0, getHeartbeatMax()); return; } @@ -118,7 +121,7 @@ public class ServerDelegate extends ConnectionDelegate { ss.dispose(); conn.connectionTune - (Integer.MAX_VALUE, + (getChannelMax(), org.apache.qpid.transport.network.ConnectionBinding.MAX_FRAME_SIZE, 0, getHeartbeatMax()); conn.setAuthorizationID(ss.getAuthorizationID()); @@ -140,19 +143,42 @@ public class ServerDelegate extends ConnectionDelegate return Integer.MAX_VALUE; } - @Override public void connectionSecureOk(Connection conn, ConnectionSecureOk ok) + protected int getChannelMax() + { + return Integer.MAX_VALUE; + } + + @Override + public void connectionSecureOk(Connection conn, ConnectionSecureOk ok) { secure(conn, ok.getResponse()); } - @Override public void connectionTuneOk(Connection conn, ConnectionTuneOk ok) + @Override + public void connectionTuneOk(Connection conn, ConnectionTuneOk ok) { + int okChannelMax = ok.getChannelMax(); + if (okChannelMax > getChannelMax()) + { + _logger.error("Connection '" + conn.getConnectionId() + "' being severed, " + + "client connectionTuneOk returned a channelMax (" + okChannelMax + + ") above the servers offered limit (" + getChannelMax() +")"); + + //Due to the error we must forcefully close the connection without negotiation + conn.getSender().close(); + return; + } + + //0 means no implied limit, except available server resources + //(or that forced by protocol limitations [0xFFFF]) + conn.setChannelMax(okChannelMax == 0 ? Connection.MAX_CHANNEL_MAX : okChannelMax); } - @Override public void connectionOpen(Connection conn, ConnectionOpen open) + @Override + public void connectionOpen(Connection conn, ConnectionOpen open) { - conn.connectionOpenOk(Collections.EMPTY_LIST); + conn.connectionOpenOk(Collections.emptyList()); conn.setState(OPEN); } @@ -168,7 +194,8 @@ public class ServerDelegate extends ConnectionDelegate return new Session(conn, new Binary(atc.getName()), 0); } - @Override public void sessionAttach(Connection conn, SessionAttach atc) + @Override + public void sessionAttach(Connection conn, SessionAttach atc) { Session ssn = getSession(conn, atc); conn.map(ssn, atc.getChannel()); diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/ChannelLoggingTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/ChannelLoggingTest.java index bd9b18d848..02d0d6f334 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/ChannelLoggingTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/ChannelLoggingTest.java @@ -75,7 +75,7 @@ public class ChannelLoggingTest extends AbstractTestLogging String log = getLogMessage(results, 0); // MESSAGE [con:0(guest@anonymous(3273383)/test)/ch:1] CHN-1001 : Create validateMessageID("CHN-1001", log); - assertEquals("Incorrect Channel in actor:"+fromActor(log), 1, getChannelID(fromActor(log))); + assertEquals("Incorrect Channel in actor:"+fromActor(log), isBroker010()? 0 : 1, getChannelID(fromActor(log))); if (isBroker08()) { @@ -89,7 +89,7 @@ public class ChannelLoggingTest extends AbstractTestLogging log = getLogMessage(results, 0); // MESSAGE [con:0(guest@anonymous(3273383)/test)/ch:1] CHN-1004 : Prefetch Size (bytes) {0,number} : Count {1,number} validateMessageID("CHN-1004", log); - assertEquals("Incorrect Channel in actor:"+fromActor(log), 1, getChannelID(fromActor(log))); + assertEquals("Incorrect Channel in actor:"+fromActor(log), isBroker010()? 0 : 1, getChannelID(fromActor(log))); assertTrue("Prefetch Count not correct",getMessageString(fromMessage(log)).endsWith("Count "+PREFETCH)); } @@ -306,7 +306,7 @@ public class ChannelLoggingTest extends AbstractTestLogging validateMessageID("CHN-1001", open); validateMessageID("CHN-1003", close); assertEquals("Message should be Close", "Close", getMessageString(fromMessage(close))); - assertEquals("Incorrect Channel ID closed", 1, getChannelID(fromSubject(close))); + assertEquals("Incorrect Channel ID closed", isBroker010()? 0 : 1, getChannelID(fromSubject(close))); assertEquals("Channel IDs should be the same", getChannelID(fromActor(open)), getChannelID(fromSubject(close))); assertEquals("Connection IDs should be the same", getConnectionID(fromActor(open)), getConnectionID(fromSubject(close))); } diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/AMQConnectionTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/AMQConnectionTest.java index 5e83b0569d..292bcd6039 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/AMQConnectionTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/AMQConnectionTest.java @@ -266,19 +266,22 @@ public class AMQConnectionTest extends QpidBrokerTestCase } } - public void testGetChannelID() + public void testGetChannelID() throws Exception { - int maxChannelID = 65536; + long maxChannelID = _connection.getMaximumChannelCount(); if (isBroker010()) { - maxChannelID = Integer.MAX_VALUE+1; + //Usable numbers are 0 to N-1 when using 0-10 + //and 1 to N for 0-8/0-9 + maxChannelID = maxChannelID-1; } for (int j = 0; j < 3; j++) { - for (int i = 1; i < maxChannelID; i++) + int i = isBroker010() ? 0 : 1; + for ( ; i <= maxChannelID; i++) { int id = _connection.getNextChannelID(); - assertEquals("On iterartion "+j, i, id); + assertEquals("Unexpected number on iteration "+j, i, id); _connection.deregisterSession(id); } } |
