diff options
| author | Aidan Skinner <aidan@apache.org> | 2009-04-21 15:23:17 +0000 |
|---|---|---|
| committer | Aidan Skinner <aidan@apache.org> | 2009-04-21 15:23:17 +0000 |
| commit | 2574691af14be919eec4e7fd54530368cba67d04 (patch) | |
| tree | 4d0f39ae3852c763859d9042ec07a478493ae035 /java/client | |
| parent | 330a880e9d396815a51d70768a5d2cd12aacbca9 (diff) | |
| download | qpid-python-2574691af14be919eec4e7fd54530368cba67d04.tar.gz | |
QPID-1823: Allow recycling of channel IDs
AMQConnection.getNextChannelID: add method to abstract channel id assignment, allow max to be set
AMQConnectionDelegate*: add getMaxChannelID
AMQConnectionDelegate_0_10: use getNextChannelID for this session-id
SessionCreateTest: add test that attempts to create 65555 sessions on one connection
AMQConnectionTest: add unit test for getNextChannelID
SessionCreateTest takes a long, long time to run so is excluded by default
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@767185 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client')
4 files changed, 75 insertions, 5 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index 39acee3a60..e6968100f3 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -90,6 +90,9 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect private final LinkedHashMap<Integer, AMQSession> _slowAccessSessions = new LinkedHashMap<Integer, AMQSession>(); private int _size = 0; private static final int FAST_CHANNEL_ACCESS_MASK = 0xFFFFFFF0; + private AtomicInteger _idFactory = new AtomicInteger(0); + private int _maxChannelID; + private boolean _cycledIds; public AMQSession get(int channelId) { @@ -179,11 +182,57 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect _fastAccessSessions[i] = null; } } + + /* + * Synchronized on whole method so that we don't need to consider the + * increment-then-reset path in too much detail + */ + public synchronized int getNextChannelId() + { + int id = 0; + if (!_cycledIds) + { + id = _idFactory.incrementAndGet(); + if (id == _maxChannelID) + { + _cycledIds = true; + _idFactory.set(0); // Go back to the start + } + } + else + { + boolean done = false; + while (!done) + { + // Needs to work second time through + id = _idFactory.incrementAndGet(); + if (id > _maxChannelID) + { + _idFactory.set(0); + id = _idFactory.incrementAndGet(); + } + if ((id & FAST_CHANNEL_ACCESS_MASK) == 0) + { + done = (_fastAccessSessions[id] == null); + } + else + { + done = (!_slowAccessSessions.keySet().contains(id)); + } + } + } + + return id; + } + + public void setMaxChannelID(int maxChannelID) + { + _maxChannelID = maxChannelID; + } } private static final Logger _logger = LoggerFactory.getLogger(AMQConnection.class); - protected AtomicInteger _idFactory = new AtomicInteger(0); /** * This is the "root" mutex that must be held when doing anything that could be impacted by failover. This must be @@ -415,6 +464,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect { _delegate = new AMQConnectionDelegate_0_10(this); } + _sessions.setMaxChannelID(_delegate.getMaxChannelID()); if (_logger.isInfoEnabled()) { @@ -567,6 +617,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect Class partypes[] = new Class[1]; partypes[0] = AMQConnection.class; _delegate = (AMQConnectionDelegate) c.getConstructor(partypes).newInstance(this); + _sessions.setMaxChannelID(_delegate.getMaxChannelID()); } catch (ClassNotFoundException e) { @@ -1395,7 +1446,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect _sessions.put(channelId, session); } - void deregisterSession(int channelId) + public void deregisterSession(int channelId) { _sessions.remove(channelId); } @@ -1540,4 +1591,9 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect { _delegate.setIdleTimeout(l); } + + public int getNextChannelID() + { + return _sessions.getNextChannelId(); + } } diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java index cec840f5c6..e5980d8b7d 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java @@ -50,4 +50,6 @@ public interface AMQConnectionDelegate <T, E extends Exception> T executeRetrySupport(FailoverProtectedOperation<T,E> operation) throws E; void setIdleTimeout(long l); + + int getMaxChannelID(); } diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java index c2fb05d94e..927929c94a 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java @@ -79,7 +79,7 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec throws JMSException { _conn.checkNotClosed(); - int channelId = _conn._idFactory.incrementAndGet(); + int channelId = _conn.getNextChannelID(); AMQSession session; try { @@ -105,7 +105,7 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec public XASession createXASession(int prefetchHigh, int prefetchLow) throws JMSException { _conn.checkNotClosed(); - int channelId = _conn._idFactory.incrementAndGet(); + int channelId = _conn.getNextChannelID(); XASessionImpl session; try { @@ -284,4 +284,10 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec { _qpidConnection.setIdleTimeout(l); } + + @Override + public int getMaxChannelID() + { + return Integer.MAX_VALUE; + } } diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java index 17090875a7..a0b69b5493 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java @@ -138,7 +138,7 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate { public org.apache.qpid.jms.Session execute() throws JMSException, FailoverException { - int channelId = _conn._idFactory.incrementAndGet(); + int channelId = _conn.getNextChannelID(); if (_logger.isDebugEnabled()) { @@ -289,4 +289,10 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate } public void setIdleTimeout(long l){} + + @Override + public int getMaxChannelID() + { + return (int) (Math.pow(2, 16)-1); + } } |
