summaryrefslogtreecommitdiff
path: root/java/common
diff options
context:
space:
mode:
authorRobert Gemmell <robbie@apache.org>2011-02-04 16:15:27 +0000
committerRobert Gemmell <robbie@apache.org>2011-02-04 16:15:27 +0000
commitb401fdc69fefb860e6f45d65f6fc2e0ccf1ce82f (patch)
treede1a66f3edf29ca2f2b3c1efe2961d2930f1fc96 /java/common
parent6741e34eb4f223a5f6c4cc33c7c603a2ba0b015b (diff)
downloadqpid-python-b401fdc69fefb860e6f45d65f6fc2e0ccf1ce82f.tar.gz
QPID-3029: actually set and negotiate the supported max num channels per connection during connection handshake. Enable/make the 0-10 client use channel numbers 0 to N-1 in line with the spec, rather than 1-N.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1067210 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/common')
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java25
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/Connection.java6
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java49
3 files changed, 61 insertions, 19 deletions
diff --git a/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java b/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java
index 82fb57eb7d..bce64075e5 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java
@@ -86,7 +86,8 @@ public class ClientDelegate extends ConnectionDelegate
}
}
- @Override public void connectionStart(Connection conn, ConnectionStart start)
+ @Override
+ public void connectionStart(Connection conn, ConnectionStart start)
{
Map<String,Object> clientProperties = new HashMap<String,Object>();
@@ -156,7 +157,8 @@ public class ClientDelegate extends ConnectionDelegate
}
}
- @Override public void connectionSecure(Connection conn, ConnectionSecure secure)
+ @Override
+ public void connectionSecure(Connection conn, ConnectionSecure secure)
{
SaslClient sc = conn.getSaslClient();
try
@@ -170,9 +172,9 @@ public class ClientDelegate extends ConnectionDelegate
}
}
- @Override public void connectionTune(Connection conn, ConnectionTune tune)
+ @Override
+ public void connectionTune(Connection conn, ConnectionTune tune)
{
- conn.setChannelMax(tune.getChannelMax());
int hb_interval = calculateHeartbeatInterval(conSettings.getHeartbeatInterval(),
tune.getHeartbeatMin(),
tune.getHeartbeatMax()
@@ -182,10 +184,17 @@ public class ClientDelegate extends ConnectionDelegate
hb_interval);
// The idle timeout is twice the heartbeat amount (in milisecs)
conn.setIdleTimeout(hb_interval*1000*2);
+
+ int channelMax = tune.getChannelMax();
+ //0 means no implied limit, except available server resources
+ //(or that forced by protocol limitations [0xFFFF])
+ conn.setChannelMax(channelMax == 0 ? Connection.MAX_CHANNEL_MAX : channelMax);
+
conn.connectionOpen(conSettings.getVhost(), null, Option.INSIST);
}
- @Override public void connectionOpenOk(Connection conn, ConnectionOpenOk ok)
+ @Override
+ public void connectionOpenOk(Connection conn, ConnectionOpenOk ok)
{
SaslClient sc = conn.getSaslClient();
if (sc != null)
@@ -210,12 +219,14 @@ public class ClientDelegate extends ConnectionDelegate
conn.setState(OPEN);
}
- @Override public void connectionRedirect(Connection conn, ConnectionRedirect redir)
+ @Override
+ public void connectionRedirect(Connection conn, ConnectionRedirect redir)
{
throw new UnsupportedOperationException();
}
- @Override public void connectionHeartbeat(Connection conn, ConnectionHeartbeat hearbeat)
+ @Override
+ public void connectionHeartbeat(Connection conn, ConnectionHeartbeat hearbeat)
{
conn.connectionHeartbeat();
}
diff --git a/java/common/src/main/java/org/apache/qpid/transport/Connection.java b/java/common/src/main/java/org/apache/qpid/transport/Connection.java
index 8abae7a23e..fd19fa0512 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/Connection.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/Connection.java
@@ -59,6 +59,9 @@ public class Connection extends ConnectionInvoker
protected static final Logger log = Logger.get(Connection.class);
+ //Usable channels are numbered 0 to <ChannelMax> - 1
+ public static final int MAX_CHANNEL_MAX = 0xFFFF;
+ public static final int MIN_USABLE_CHANNEL_NUM = 0;
public enum State { NEW, CLOSED, OPENING, OPEN, CLOSING, CLOSE_RCVD }
@@ -404,7 +407,8 @@ public class Connection extends ConnectionInvoker
{
synchronized (lock)
{
- for (int i = 1; i <= getChannelMax(); i++)
+ //For a negotiated channelMax N, there are channels 0 to N-1 available.
+ for (int i = 0; i < getChannelMax(); i++)
{
if (!channels.containsKey(i))
{
diff --git a/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java b/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java
index 644a2daa58..b8e7616a37 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java
@@ -30,6 +30,8 @@ import javax.security.sasl.Sasl;
import javax.security.sasl.SaslException;
import javax.security.sasl.SaslServer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* ServerDelegate
@@ -38,8 +40,8 @@ import javax.security.sasl.SaslServer;
public class ServerDelegate extends ConnectionDelegate
{
+ protected static final Logger _logger = LoggerFactory.getLogger(ServerDelegate.class);
- private SaslServer saslServer;
private List<Object> _locales;
private List<Object> _mechanisms;
private Map<String, Object> _clientProperties;
@@ -47,7 +49,7 @@ public class ServerDelegate extends ConnectionDelegate
public ServerDelegate()
{
- this(null, Collections.EMPTY_LIST, Collections.singletonList((Object)"utf8"));
+ this(null, Collections.emptyList(), Collections.singletonList((Object)"utf8"));
}
protected ServerDelegate(Map<String, Object> clientProperties, List<Object> mechanisms, List<Object> locales)
@@ -64,7 +66,8 @@ public class ServerDelegate extends ConnectionDelegate
conn.connectionStart(_clientProperties, _mechanisms, _locales);
}
- @Override public void connectionStartOk(Connection conn, ConnectionStartOk ok)
+ @Override
+ public void connectionStartOk(Connection conn, ConnectionStartOk ok)
{
conn.setLocale(ok.getLocale());
String mechanism = ok.getMechanism();
@@ -75,9 +78,9 @@ public class ServerDelegate extends ConnectionDelegate
if (mechanism == null || mechanism.length() == 0)
{
conn.connectionTune
- (Integer.MAX_VALUE,
+ (getChannelMax(),
org.apache.qpid.transport.network.ConnectionBinding.MAX_FRAME_SIZE,
- 0, Integer.MAX_VALUE);
+ 0, getHeartbeatMax());
return;
}
@@ -118,7 +121,7 @@ public class ServerDelegate extends ConnectionDelegate
{
ss.dispose();
conn.connectionTune
- (Integer.MAX_VALUE,
+ (getChannelMax(),
org.apache.qpid.transport.network.ConnectionBinding.MAX_FRAME_SIZE,
0, getHeartbeatMax());
conn.setAuthorizationID(ss.getAuthorizationID());
@@ -140,19 +143,42 @@ public class ServerDelegate extends ConnectionDelegate
return Integer.MAX_VALUE;
}
- @Override public void connectionSecureOk(Connection conn, ConnectionSecureOk ok)
+ protected int getChannelMax()
+ {
+ return Integer.MAX_VALUE;
+ }
+
+ @Override
+ public void connectionSecureOk(Connection conn, ConnectionSecureOk ok)
{
secure(conn, ok.getResponse());
}
- @Override public void connectionTuneOk(Connection conn, ConnectionTuneOk ok)
+ @Override
+ public void connectionTuneOk(Connection conn, ConnectionTuneOk ok)
{
+ int okChannelMax = ok.getChannelMax();
+ if (okChannelMax > getChannelMax())
+ {
+ _logger.error("Connection '" + conn.getConnectionId() + "' being severed, " +
+ "client connectionTuneOk returned a channelMax (" + okChannelMax +
+ ") above the servers offered limit (" + getChannelMax() +")");
+
+ //Due to the error we must forcefully close the connection without negotiation
+ conn.getSender().close();
+ return;
+ }
+
+ //0 means no implied limit, except available server resources
+ //(or that forced by protocol limitations [0xFFFF])
+ conn.setChannelMax(okChannelMax == 0 ? Connection.MAX_CHANNEL_MAX : okChannelMax);
}
- @Override public void connectionOpen(Connection conn, ConnectionOpen open)
+ @Override
+ public void connectionOpen(Connection conn, ConnectionOpen open)
{
- conn.connectionOpenOk(Collections.EMPTY_LIST);
+ conn.connectionOpenOk(Collections.emptyList());
conn.setState(OPEN);
}
@@ -168,7 +194,8 @@ public class ServerDelegate extends ConnectionDelegate
return new Session(conn, new Binary(atc.getName()), 0);
}
- @Override public void sessionAttach(Connection conn, SessionAttach atc)
+ @Override
+ public void sessionAttach(Connection conn, SessionAttach atc)
{
Session ssn = getSession(conn, atc);
conn.map(ssn, atc.getChannel());