diff options
| author | Robert Gemmell <robbie@apache.org> | 2011-02-04 16:15:27 +0000 |
|---|---|---|
| committer | Robert Gemmell <robbie@apache.org> | 2011-02-04 16:15:27 +0000 |
| commit | b401fdc69fefb860e6f45d65f6fc2e0ccf1ce82f (patch) | |
| tree | de1a66f3edf29ca2f2b3c1efe2961d2930f1fc96 /java/client/src | |
| parent | 6741e34eb4f223a5f6c4cc33c7c603a2ba0b015b (diff) | |
| download | qpid-python-b401fdc69fefb860e6f45d65f6fc2e0ccf1ce82f.tar.gz | |
QPID-3029: actually set and negotiate the supported max num channels per connection during connection handshake. Enable/make the 0-10 client use channel numbers 0 to N-1 in line with the spec, rather than 1-N.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1067210 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client/src')
8 files changed, 206 insertions, 165 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index b483406949..af0d8a3a1d 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -26,15 +26,12 @@ import java.net.ConnectException; import java.net.UnknownHostException; import java.nio.channels.UnresolvedAddressException; import java.util.ArrayList; -import java.util.Collection; import java.util.Iterator; -import java.util.LinkedHashMap; import java.util.LinkedList; import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; import javax.jms.ConnectionConsumer; import javax.jms.ConnectionMetaData; @@ -84,153 +81,6 @@ import org.slf4j.LoggerFactory; public class AMQConnection extends Closeable implements Connection, QueueConnection, TopicConnection, Referenceable { - public static final class ChannelToSessionMap - { - private final AMQSession[] _fastAccessSessions = new AMQSession[16]; - private final LinkedHashMap<Integer, AMQSession> _slowAccessSessions = new LinkedHashMap<Integer, AMQSession>(); - private int _size = 0; - private static final int FAST_CHANNEL_ACCESS_MASK = 0xFFFFFFF0; - private AtomicInteger _idFactory = new AtomicInteger(0); - private int _maxChannelID; - private boolean _cycledIds; - - public AMQSession get(int channelId) - { - if ((channelId & FAST_CHANNEL_ACCESS_MASK) == 0) - { - return _fastAccessSessions[channelId]; - } - else - { - return _slowAccessSessions.get(channelId); - } - } - - public AMQSession put(int channelId, AMQSession session) - { - AMQSession oldVal; - if ((channelId & FAST_CHANNEL_ACCESS_MASK) == 0) - { - oldVal = _fastAccessSessions[channelId]; - _fastAccessSessions[channelId] = session; - } - else - { - oldVal = _slowAccessSessions.put(channelId, session); - } - if ((oldVal != null) && (session == null)) - { - _size--; - } - else if ((oldVal == null) && (session != null)) - { - _size++; - } - - return session; - - } - - public AMQSession remove(int channelId) - { - AMQSession session; - if ((channelId & FAST_CHANNEL_ACCESS_MASK) == 0) - { - session = _fastAccessSessions[channelId]; - _fastAccessSessions[channelId] = null; - } - else - { - session = _slowAccessSessions.remove(channelId); - } - - if (session != null) - { - _size--; - } - return session; - - } - - public Collection<AMQSession> values() - { - ArrayList<AMQSession> values = new ArrayList<AMQSession>(size()); - - for (int i = 0; i < 16; i++) - { - if (_fastAccessSessions[i] != null) - { - values.add(_fastAccessSessions[i]); - } - } - values.addAll(_slowAccessSessions.values()); - - return values; - } - - public int size() - { - return _size; - } - - public void clear() - { - _size = 0; - _slowAccessSessions.clear(); - for (int i = 0; i < 16; i++) - { - _fastAccessSessions[i] = null; - } - } - - /* - * Synchronized on whole method so that we don't need to consider the - * increment-then-reset path in too much detail - */ - public synchronized int getNextChannelId() - { - int id = 0; - if (!_cycledIds) - { - id = _idFactory.incrementAndGet(); - if (id == _maxChannelID) - { - _cycledIds = true; - _idFactory.set(0); // Go back to the start - } - } - else - { - boolean done = false; - while (!done) - { - // Needs to work second time through - id = _idFactory.incrementAndGet(); - if (id > _maxChannelID) - { - _idFactory.set(0); - id = _idFactory.incrementAndGet(); - } - if ((id & FAST_CHANNEL_ACCESS_MASK) == 0) - { - done = (_fastAccessSessions[id] == null); - } - else - { - done = (!_slowAccessSessions.keySet().contains(id)); - } - } - } - - return id; - } - - public void setMaxChannelID(int maxChannelID) - { - _maxChannelID = maxChannelID; - } - } - private static final Logger _logger = LoggerFactory.getLogger(AMQConnection.class); @@ -244,9 +94,9 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect /** * A channel is roughly analogous to a session. The server can negotiate the maximum number of channels per session - * and we must prevent the client from opening too many. Zero means unlimited. + * and we must prevent the client from opening too many. */ - protected long _maximumChannelCount; + private long _maximumChannelCount; /** The maximum size of frame supported by the server */ private long _maximumFrameSize; @@ -489,7 +339,6 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect { _delegate = new AMQConnectionDelegate_0_10(this); } - _sessions.setMaxChannelID(_delegate.getMaxChannelID()); if (_logger.isInfoEnabled()) { @@ -570,8 +419,6 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect } } - _logger.info("Connected with ProtocolHandler Version:"+_protocolHandler.getProtocolVersion()); - if (_logger.isDebugEnabled()) { _logger.debug("Are we connected:" + _connected); @@ -579,6 +426,11 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect if (!_connected) { + if (_logger.isDebugEnabled()) + { + _logger.debug("Last attempted ProtocolHandler Version:"+_protocolHandler.getProtocolVersion()); + } + String message = null; if (connectionException != null) @@ -620,6 +472,11 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect throw new AMQConnectionFailureException(message, connectionException); } + _logger.info("Connected with ProtocolHandler Version:"+_protocolHandler.getProtocolVersion()); + + _sessions.setMaxChannelID(_delegate.getMaxChannelID()); + _sessions.setMinChannelID(_delegate.getMinChannelID()); + _connectionMetaData = new QpidConnectionMetaData(this); } @@ -647,7 +504,6 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect Class partypes[] = new Class[1]; partypes[0] = AMQConnection.class; _delegate = (AMQConnectionDelegate) c.getConstructor(partypes).newInstance(this); - _sessions.setMaxChannelID(_delegate.getMaxChannelID()); //Update our session to use this new protocol version _protocolHandler.getProtocolSession().setProtocolVersion(_delegate.getProtocolVersion()); @@ -898,7 +754,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect public boolean channelLimitReached() { - return (_maximumChannelCount != 0) && (_sessions.size() == _maximumChannelCount); + return _sessions.size() >= _maximumChannelCount; } public String getClientID() throws JMSException diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java index 5f93ec6c47..9560bd5c7c 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java @@ -59,6 +59,8 @@ public interface AMQConnectionDelegate <T, E extends Exception> T executeRetrySupport(FailoverProtectedOperation<T,E> operation) throws E; int getMaxChannelID(); - + + int getMinChannelID(); + ProtocolVersion getProtocolVersion(); } diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java index adfd178ec3..4b4417b6ef 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java @@ -37,6 +37,7 @@ import org.apache.qpid.client.failover.FailoverProtectedOperation; import org.apache.qpid.configuration.ClientProperties; import org.apache.qpid.framing.ProtocolVersion; import org.apache.qpid.jms.BrokerDetails; +import org.apache.qpid.jms.ChannelLimitReachedException; import org.apache.qpid.jms.Session; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.transport.Connection; @@ -82,6 +83,12 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec throws JMSException { _conn.checkNotClosed(); + + if (_conn.channelLimitReached()) + { + throw new ChannelLimitReachedException(_conn.getMaximumChannelCount()); + } + int channelId = _conn.getNextChannelID(); AMQSession session; try @@ -120,6 +127,12 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec public XASession createXASession(int prefetchHigh, int prefetchLow) throws JMSException { _conn.checkNotClosed(); + + if (_conn.channelLimitReached()) + { + throw new ChannelLimitReachedException(_conn.getMaximumChannelCount()); + } + int channelId = _conn.getNextChannelID(); XASessionImpl session; try @@ -165,6 +178,7 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec _conn._connected = true; _conn.setUsername(_qpidConnection.getUserID()); + _conn.setMaximumChannelCount(_qpidConnection.getChannelMax()); _conn._failoverPolicy.attainedConnection(); } catch (ProtocolVersionException pe) @@ -293,7 +307,13 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec public int getMaxChannelID() { - return Integer.MAX_VALUE; + //For a negotiated channelMax N, there are channels 0 to N-1 available. + return _qpidConnection.getChannelMax() - 1; + } + + public int getMinChannelID() + { + return Connection.MIN_USABLE_CHANNEL_NUM; } public ProtocolVersion getProtocolVersion() diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java index 9cee4dab53..40b332d216 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java @@ -36,6 +36,7 @@ import org.apache.qpid.AMQException; import org.apache.qpid.client.failover.FailoverException; import org.apache.qpid.client.failover.FailoverProtectedOperation; import org.apache.qpid.client.failover.FailoverRetrySupport; +import org.apache.qpid.client.protocol.AMQProtocolSession; import org.apache.qpid.client.state.AMQState; import org.apache.qpid.client.state.StateWaiter; import org.apache.qpid.client.transport.TransportConnection; @@ -134,7 +135,7 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate if (_conn.channelLimitReached()) { - throw new ChannelLimitReachedException(_conn._maximumChannelCount); + throw new ChannelLimitReachedException(_conn.getMaximumChannelCount()); } return new FailoverRetrySupport<org.apache.qpid.jms.Session, JMSException>( @@ -307,7 +308,14 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate public int getMaxChannelID() { - return (int) (Math.pow(2, 16)-1); + ConnectionTuneParameters params = _conn.getProtocolHandler().getProtocolSession().getConnectionTuneParameters(); + + return params == null ? AMQProtocolSession.MAX_CHANNEL_MAX : params.getChannelMax(); + } + + public int getMinChannelID() + { + return AMQProtocolSession.MIN_USABLE_CHANNEL_NUM; } public ProtocolVersion getProtocolVersion() diff --git a/java/client/src/main/java/org/apache/qpid/client/ChannelToSessionMap.java b/java/client/src/main/java/org/apache/qpid/client/ChannelToSessionMap.java new file mode 100644 index 0000000000..2b7e3d44da --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/client/ChannelToSessionMap.java @@ -0,0 +1,147 @@ +package org.apache.qpid.client; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.LinkedHashMap; +import java.util.concurrent.atomic.AtomicInteger; + +public final class ChannelToSessionMap +{ + private final AMQSession[] _fastAccessSessions = new AMQSession[16]; + private final LinkedHashMap<Integer, AMQSession> _slowAccessSessions = new LinkedHashMap<Integer, AMQSession>(); + private int _size = 0; + private static final int FAST_CHANNEL_ACCESS_MASK = 0xFFFFFFF0; + private AtomicInteger _idFactory = new AtomicInteger(0); + private int _maxChannelID; + private int _minChannelID; + + public AMQSession get(int channelId) + { + if ((channelId & FAST_CHANNEL_ACCESS_MASK) == 0) + { + return _fastAccessSessions[channelId]; + } + else + { + return _slowAccessSessions.get(channelId); + } + } + + public AMQSession put(int channelId, AMQSession session) + { + AMQSession oldVal; + if ((channelId & FAST_CHANNEL_ACCESS_MASK) == 0) + { + oldVal = _fastAccessSessions[channelId]; + _fastAccessSessions[channelId] = session; + } + else + { + oldVal = _slowAccessSessions.put(channelId, session); + } + if ((oldVal != null) && (session == null)) + { + _size--; + } + else if ((oldVal == null) && (session != null)) + { + _size++; + } + + return session; + + } + + public AMQSession remove(int channelId) + { + AMQSession session; + if ((channelId & FAST_CHANNEL_ACCESS_MASK) == 0) + { + session = _fastAccessSessions[channelId]; + _fastAccessSessions[channelId] = null; + } + else + { + session = _slowAccessSessions.remove(channelId); + } + + if (session != null) + { + _size--; + } + return session; + + } + + public Collection<AMQSession> values() + { + ArrayList<AMQSession> values = new ArrayList<AMQSession>(size()); + + for (int i = 0; i < 16; i++) + { + if (_fastAccessSessions[i] != null) + { + values.add(_fastAccessSessions[i]); + } + } + values.addAll(_slowAccessSessions.values()); + + return values; + } + + public int size() + { + return _size; + } + + public void clear() + { + _size = 0; + _slowAccessSessions.clear(); + for (int i = 0; i < 16; i++) + { + _fastAccessSessions[i] = null; + } + } + + /* + * Synchronized on whole method so that we don't need to consider the + * increment-then-reset path in too much detail + */ + public synchronized int getNextChannelId() + { + int id = _minChannelID; + + boolean done = false; + while (!done) + { + id = _idFactory.getAndIncrement(); + if (id == _maxChannelID) + { + //go back to the start + _idFactory.set(_minChannelID); + } + if ((id & FAST_CHANNEL_ACCESS_MASK) == 0) + { + done = (_fastAccessSessions[id] == null); + } + else + { + done = (!_slowAccessSessions.keySet().contains(id)); + } + } + + return id; + } + + public void setMaxChannelID(int maxChannelID) + { + _maxChannelID = maxChannelID; + } + + public void setMinChannelID(int minChannelID) + { + _minChannelID = minChannelID; + _idFactory.set(_minChannelID); + } +}
\ No newline at end of file diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java index 287b5957a1..d1b2caf987 100644 --- a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java @@ -55,9 +55,12 @@ public class ConnectionTuneMethodHandler implements StateAwareMethodListener<Con { params = new ConnectionTuneParameters(); } + + int maxChannelNumber = frame.getChannelMax(); + //0 implies no limit, except that forced by protocol limitations (0xFFFF) + params.setChannelMax(maxChannelNumber == 0 ? AMQProtocolSession.MAX_CHANNEL_MAX : maxChannelNumber); params.setFrameMax(frame.getFrameMax()); - params.setChannelMax(frame.getChannelMax()); params.setHeartbeat(Integer.getInteger("amqj.heartbeat.delay", frame.getHeartbeat())); session.setConnectionTuneParameters(params); diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java index f0edd0d7bf..7976760696 100644 --- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java @@ -54,6 +54,10 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession public static final String PROTOCOL_INITIATION_RECEIVED = "ProtocolInitiatiionReceived"; + //Usable channels are numbered 1 to <ChannelMax> + public static final int MAX_CHANNEL_MAX = 0xFFFF; + public static final int MIN_USABLE_CHANNEL_NUM = 1; + protected static final String CONNECTION_TUNE_PARAMETERS = "ConnectionTuneParameters"; protected static final String AMQ_CONNECTION = "AMQConnection"; @@ -178,6 +182,7 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession { _connectionTuneParameters = params; AMQConnection con = getAMQConnection(); + con.setMaximumChannelCount(params.getChannelMax()); con.setMaximumFrameSize(params.getFrameMax()); _protocolHandler.initHeartbeats((int) params.getHeartbeat()); diff --git a/java/client/src/main/java/org/apache/qpid/jms/ChannelLimitReachedException.java b/java/client/src/main/java/org/apache/qpid/jms/ChannelLimitReachedException.java index 3d4a4573ed..e8c2b9d682 100644 --- a/java/client/src/main/java/org/apache/qpid/jms/ChannelLimitReachedException.java +++ b/java/client/src/main/java/org/apache/qpid/jms/ChannelLimitReachedException.java @@ -33,9 +33,9 @@ public class ChannelLimitReachedException extends ResourceAllocationException public ChannelLimitReachedException(long limit) { - super("Unable to create session since maximum number of sessions per connection is " + - limit + ". Either close one or more sessions or increase the " + - "maximum number of sessions per connection (or contact your AMQP administrator.", ERROR_CODE); + super("Unable to create session, the maximum number of sessions per connection is " + + limit + ". You must either close one or more sessions or increase the " + + "maximum number of sessions available per connection.", ERROR_CODE); _limit = limit; } |
