summaryrefslogtreecommitdiff
path: root/java/client
diff options
context:
space:
mode:
authorAidan Skinner <aidan@apache.org>2009-04-21 15:23:17 +0000
committerAidan Skinner <aidan@apache.org>2009-04-21 15:23:17 +0000
commit2574691af14be919eec4e7fd54530368cba67d04 (patch)
tree4d0f39ae3852c763859d9042ec07a478493ae035 /java/client
parent330a880e9d396815a51d70768a5d2cd12aacbca9 (diff)
downloadqpid-python-2574691af14be919eec4e7fd54530368cba67d04.tar.gz
QPID-1823: Allow recycling of channel IDs
AMQConnection.getNextChannelID: add method to abstract channel id assignment, allow max to be set AMQConnectionDelegate*: add getMaxChannelID AMQConnectionDelegate_0_10: use getNextChannelID for this session-id SessionCreateTest: add test that attempts to create 65555 sessions on one connection AMQConnectionTest: add unit test for getNextChannelID SessionCreateTest takes a long, long time to run so is excluded by default git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@767185 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQConnection.java60
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java2
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java10
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java8
4 files changed, 75 insertions, 5 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
index 39acee3a60..e6968100f3 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
@@ -90,6 +90,9 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
private final LinkedHashMap<Integer, AMQSession> _slowAccessSessions = new LinkedHashMap<Integer, AMQSession>();
private int _size = 0;
private static final int FAST_CHANNEL_ACCESS_MASK = 0xFFFFFFF0;
+ private AtomicInteger _idFactory = new AtomicInteger(0);
+ private int _maxChannelID;
+ private boolean _cycledIds;
public AMQSession get(int channelId)
{
@@ -179,11 +182,57 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
_fastAccessSessions[i] = null;
}
}
+
+ /*
+ * Synchronized on whole method so that we don't need to consider the
+ * increment-then-reset path in too much detail
+ */
+ public synchronized int getNextChannelId()
+ {
+ int id = 0;
+ if (!_cycledIds)
+ {
+ id = _idFactory.incrementAndGet();
+ if (id == _maxChannelID)
+ {
+ _cycledIds = true;
+ _idFactory.set(0); // Go back to the start
+ }
+ }
+ else
+ {
+ boolean done = false;
+ while (!done)
+ {
+ // Needs to work second time through
+ id = _idFactory.incrementAndGet();
+ if (id > _maxChannelID)
+ {
+ _idFactory.set(0);
+ id = _idFactory.incrementAndGet();
+ }
+ if ((id & FAST_CHANNEL_ACCESS_MASK) == 0)
+ {
+ done = (_fastAccessSessions[id] == null);
+ }
+ else
+ {
+ done = (!_slowAccessSessions.keySet().contains(id));
+ }
+ }
+ }
+
+ return id;
+ }
+
+ public void setMaxChannelID(int maxChannelID)
+ {
+ _maxChannelID = maxChannelID;
+ }
}
private static final Logger _logger = LoggerFactory.getLogger(AMQConnection.class);
- protected AtomicInteger _idFactory = new AtomicInteger(0);
/**
* This is the "root" mutex that must be held when doing anything that could be impacted by failover. This must be
@@ -415,6 +464,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
{
_delegate = new AMQConnectionDelegate_0_10(this);
}
+ _sessions.setMaxChannelID(_delegate.getMaxChannelID());
if (_logger.isInfoEnabled())
{
@@ -567,6 +617,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
Class partypes[] = new Class[1];
partypes[0] = AMQConnection.class;
_delegate = (AMQConnectionDelegate) c.getConstructor(partypes).newInstance(this);
+ _sessions.setMaxChannelID(_delegate.getMaxChannelID());
}
catch (ClassNotFoundException e)
{
@@ -1395,7 +1446,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
_sessions.put(channelId, session);
}
- void deregisterSession(int channelId)
+ public void deregisterSession(int channelId)
{
_sessions.remove(channelId);
}
@@ -1540,4 +1591,9 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
{
_delegate.setIdleTimeout(l);
}
+
+ public int getNextChannelID()
+ {
+ return _sessions.getNextChannelId();
+ }
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java
index cec840f5c6..e5980d8b7d 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java
@@ -50,4 +50,6 @@ public interface AMQConnectionDelegate
<T, E extends Exception> T executeRetrySupport(FailoverProtectedOperation<T,E> operation) throws E;
void setIdleTimeout(long l);
+
+ int getMaxChannelID();
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
index c2fb05d94e..927929c94a 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
@@ -79,7 +79,7 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec
throws JMSException
{
_conn.checkNotClosed();
- int channelId = _conn._idFactory.incrementAndGet();
+ int channelId = _conn.getNextChannelID();
AMQSession session;
try
{
@@ -105,7 +105,7 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec
public XASession createXASession(int prefetchHigh, int prefetchLow) throws JMSException
{
_conn.checkNotClosed();
- int channelId = _conn._idFactory.incrementAndGet();
+ int channelId = _conn.getNextChannelID();
XASessionImpl session;
try
{
@@ -284,4 +284,10 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec
{
_qpidConnection.setIdleTimeout(l);
}
+
+ @Override
+ public int getMaxChannelID()
+ {
+ return Integer.MAX_VALUE;
+ }
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
index 17090875a7..a0b69b5493 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
@@ -138,7 +138,7 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate
{
public org.apache.qpid.jms.Session execute() throws JMSException, FailoverException
{
- int channelId = _conn._idFactory.incrementAndGet();
+ int channelId = _conn.getNextChannelID();
if (_logger.isDebugEnabled())
{
@@ -289,4 +289,10 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate
}
public void setIdleTimeout(long l){}
+
+ @Override
+ public int getMaxChannelID()
+ {
+ return (int) (Math.pow(2, 16)-1);
+ }
}