diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2014-08-09 16:05:05 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2014-08-09 16:05:05 +0000 |
| commit | c303d65ac74d5324b885da7cf7dbf655af8a93e2 (patch) | |
| tree | 55ef9bb4fa0dd091d84ad0c87cde63ae709e6513 /qpid/java/broker-plugins | |
| parent | b52953d99d197049e34a8e05d033e9f44b44f353 (diff) | |
| download | qpid-python-c303d65ac74d5324b885da7cf7dbf655af8a93e2.tar.gz | |
QPID-4429 : [Java] Implement max frame size negotiation checks in 0-x protocols
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1616977 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker-plugins')
7 files changed, 92 insertions, 43 deletions
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java index 8ce014dacc..dc60a37a7f 100755 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java @@ -33,6 +33,7 @@ import org.apache.qpid.protocol.ServerProtocolEngine; import org.apache.qpid.server.logging.messages.ConnectionMessages; import org.apache.qpid.server.model.Port; import org.apache.qpid.server.model.Transport; +import org.apache.qpid.transport.Constant; import org.apache.qpid.transport.Sender; import org.apache.qpid.transport.network.Assembler; import org.apache.qpid.transport.network.Disassembler; @@ -91,7 +92,9 @@ public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocol _network = network; _connection.setNetworkConnection(network); - _connection.setSender(new Disassembler(wrapSender(sender), MAX_FRAME_SIZE)); + Disassembler disassembler = new Disassembler(wrapSender(sender), Constant.MIN_MAX_FRAME_SIZE); + _connection.setSender(disassembler); + _connection.addFrameSizeObserver(disassembler); // FIXME Two log messages to maintain compatibility with earlier protocol versions _connection.getEventLogger().message(ConnectionMessages.OPEN(null, "0-10", null, null, false, true, false, false)); diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java index 7751ff765d..bab2d802e8 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java @@ -50,21 +50,7 @@ import org.apache.qpid.server.security.SubjectCreator; import org.apache.qpid.server.security.auth.AuthenticationResult.AuthenticationStatus; import org.apache.qpid.server.security.auth.SubjectAuthenticationResult; import org.apache.qpid.server.virtualhost.VirtualHostImpl; -import org.apache.qpid.transport.Binary; -import org.apache.qpid.transport.Connection; -import org.apache.qpid.transport.ConnectionClose; -import org.apache.qpid.transport.ConnectionCloseCode; -import org.apache.qpid.transport.ConnectionOpen; -import org.apache.qpid.transport.ConnectionOpenOk; -import org.apache.qpid.transport.ConnectionStartOk; -import org.apache.qpid.transport.ConnectionTuneOk; -import org.apache.qpid.transport.ServerDelegate; -import org.apache.qpid.transport.Session; -import org.apache.qpid.transport.SessionAttach; -import org.apache.qpid.transport.SessionDelegate; -import org.apache.qpid.transport.SessionDetach; -import org.apache.qpid.transport.SessionDetachCode; -import org.apache.qpid.transport.SessionDetached; +import org.apache.qpid.transport.*; import org.apache.qpid.transport.network.NetworkConnection; public class ServerConnectionDelegate extends ServerDelegate @@ -76,15 +62,16 @@ public class ServerConnectionDelegate extends ServerDelegate private int _maxNoOfChannels; private Map<String,Object> _clientProperties; private final SubjectCreator _subjectCreator; + private int _maximumFrameSize; - public ServerConnectionDelegate(Broker broker, String localFQDN, SubjectCreator subjectCreator) + public ServerConnectionDelegate(Broker<?> broker, String localFQDN, SubjectCreator subjectCreator) { this(createConnectionProperties(broker), Collections.singletonList((Object)"en_US"), broker, localFQDN, subjectCreator); } private ServerConnectionDelegate(Map<String, Object> properties, List<Object> locales, - Broker broker, + Broker<?> broker, String localFQDN, SubjectCreator subjectCreator) { @@ -94,9 +81,10 @@ public class ServerConnectionDelegate extends ServerDelegate _localFQDN = localFQDN; _maxNoOfChannels = broker.getConnection_sessionCountLimit(); _subjectCreator = subjectCreator; + _maximumFrameSize = (int) Math.min(0xffffl, broker.getContextValue(Long.class, Broker.BROKER_FRAME_SIZE)); } - private static List<String> getFeatures(Broker broker) + private static List<String> getFeatures(Broker<?> broker) { String brokerDisabledFeatures = System.getProperty(BrokerProperties.PROPERTY_DISABLED_FEATURES); final List<String> features = new ArrayList<String>(); @@ -108,7 +96,7 @@ public class ServerConnectionDelegate extends ServerDelegate return Collections.unmodifiableList(features); } - private static Map<String, Object> createConnectionProperties(final Broker broker) + private static Map<String, Object> createConnectionProperties(final Broker<?> broker) { final Map<String,Object> map = new HashMap<String,Object>(); // Federation tag is used by the client to identify the broker instance @@ -234,6 +222,7 @@ public class ServerConnectionDelegate extends ServerDelegate { ServerConnection sconn = (ServerConnection) conn; int okChannelMax = ok.getChannelMax(); + int okMaxFrameSize = ok.getMaxFrameSize(); if (okChannelMax > getChannelMax()) { @@ -246,6 +235,31 @@ public class ServerConnectionDelegate extends ServerDelegate return; } + if(okMaxFrameSize > getFrameMax()) + { + LOGGER.error("Connection '" + sconn.getConnectionId() + "' being severed, " + + "client connectionTuneOk returned a frameMax (" + okMaxFrameSize + + ") above the server's offered limit (" + getFrameMax() +")"); + + //Due to the error we must forcefully close the connection without negotiation + sconn.getSender().close(); + return; + } + else if(okMaxFrameSize > 0 && okMaxFrameSize < Constant.MIN_MAX_FRAME_SIZE) + { + LOGGER.error("Connection '" + sconn.getConnectionId() + "' being severed, " + + "client connectionTuneOk returned a frameMax (" + okMaxFrameSize + + ") below the minimum permitted size (" + Constant.MIN_MAX_FRAME_SIZE +")"); + + //Due to the error we must forcefully close the connection without negotiation + sconn.getSender().close(); + return; + } + else if(okMaxFrameSize == 0) + { + okMaxFrameSize = getFrameMax(); + } + final NetworkConnection networkConnection = sconn.getNetworkConnection(); if(ok.hasHeartbeat()) { @@ -266,6 +280,8 @@ public class ServerConnectionDelegate extends ServerDelegate } setConnectionTuneOkChannelMax(sconn, okChannelMax); + + conn.setMaxFrameSize(okMaxFrameSize); } @Override @@ -279,6 +295,12 @@ public class ServerConnectionDelegate extends ServerDelegate _maxNoOfChannels = channelMax; } + @Override + protected int getFrameMax() + { + return _maximumFrameSize; + } + @Override public void sessionDetach(Connection conn, SessionDetach dtc) { // To ensure a clean detach, we stop any remaining subscriptions. Stop ensures diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java index 484ca6f404..1c264e52c6 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java @@ -44,37 +44,38 @@ import javax.security.auth.Subject; import javax.security.sasl.SaslServer; import org.apache.log4j.Logger; + import org.apache.qpid.AMQChannelException; import org.apache.qpid.AMQConnectionException; import org.apache.qpid.AMQException; -import org.apache.qpid.framing.*; -import org.apache.qpid.codec.AMQCodecFactory; +import org.apache.qpid.codec.AMQDecoder; import org.apache.qpid.common.QpidProperties; import org.apache.qpid.common.ServerPropertyNames; +import org.apache.qpid.framing.*; import org.apache.qpid.properties.ConnectionStartProperties; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.protocol.ServerProtocolEngine; +import org.apache.qpid.server.configuration.BrokerProperties; import org.apache.qpid.server.connection.ConnectionPrincipal; import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.logging.EventLogger; -import org.apache.qpid.server.message.InstanceProperties; -import org.apache.qpid.server.message.ServerMessage; -import org.apache.qpid.server.configuration.BrokerProperties; -import org.apache.qpid.server.protocol.AMQSessionModel; -import org.apache.qpid.server.protocol.SessionModelListener; -import org.apache.qpid.server.protocol.v0_8.handler.ServerMethodDispatcherImpl; import org.apache.qpid.server.logging.LogSubject; import org.apache.qpid.server.logging.messages.ConnectionMessages; import org.apache.qpid.server.logging.subjects.ConnectionLogSubject; +import org.apache.qpid.server.message.InstanceProperties; +import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.model.Port; import org.apache.qpid.server.model.Transport; +import org.apache.qpid.server.protocol.AMQSessionModel; +import org.apache.qpid.server.protocol.SessionModelListener; +import org.apache.qpid.server.protocol.v0_8.handler.ServerMethodDispatcherImpl; import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverter; import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverterRegistry; -import org.apache.qpid.server.security.auth.AuthenticatedPrincipal; import org.apache.qpid.server.protocol.v0_8.state.AMQState; import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager; +import org.apache.qpid.server.security.auth.AuthenticatedPrincipal; import org.apache.qpid.server.stats.StatisticsCounter; import org.apache.qpid.server.util.Action; import org.apache.qpid.server.util.ConnectionScopedRuntimeException; @@ -124,7 +125,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi private final AMQStateManager _stateManager; - private AMQCodecFactory _codecFactory; + private AMQDecoder _decoder; private SaslServer _saslServer; @@ -187,7 +188,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi _maxNoOfChannels = broker.getConnection_sessionCountLimit(); _receivedLock = new ReentrantLock(); _stateManager = new AMQStateManager(broker, this); - _codecFactory = new AMQCodecFactory(true, this); + _decoder = new AMQDecoder(true, this); _connectionID = connectionId; _logSubject = new ConnectionLogSubject(this); @@ -250,6 +251,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi public void setMaxFrameSize(long frameMax) { _maxFrameSize = frameMax; + _decoder.setMaxFrameSize(frameMax); } public long getMaxFrameSize() @@ -299,7 +301,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi _receivedLock.lock(); try { - final ArrayList<AMQDataBlock> dataBlocks = _codecFactory.getDecoder().decodeBuffer(msg); + final ArrayList<AMQDataBlock> dataBlocks = _decoder.decodeBuffer(msg); for (AMQDataBlock dataBlock : dataBlocks) { try @@ -493,7 +495,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi private synchronized void protocolInitiationReceived(ProtocolInitiation pi) { // this ensures the codec never checks for a PI message again - (_codecFactory.getDecoder()).setExpectProtocolInitiation(false); + _decoder.setExpectProtocolInitiation(false); try { // Log incoming protocol negotiation request diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionSecureOkMethodHandler.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionSecureOkMethodHandler.java index a2b596e2b1..92552cb011 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionSecureOkMethodHandler.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionSecureOkMethodHandler.java @@ -33,7 +33,6 @@ import org.apache.qpid.framing.ConnectionSecureOkBody; import org.apache.qpid.framing.ConnectionTuneBody; import org.apache.qpid.framing.MethodRegistry; import org.apache.qpid.protocol.AMQConstant; -import org.apache.qpid.server.configuration.BrokerProperties; import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; import org.apache.qpid.server.protocol.v0_8.state.AMQState; @@ -59,7 +58,7 @@ public class ConnectionSecureOkMethodHandler implements StateAwareMethodListener public void methodReceived(AMQStateManager stateManager, ConnectionSecureOkBody body, int channelId) throws AMQException { - Broker broker = stateManager.getBroker(); + Broker<?> broker = stateManager.getBroker(); AMQProtocolSession session = stateManager.getProtocolSession(); SubjectCreator subjectCreator = stateManager.getSubjectCreator(); @@ -99,7 +98,7 @@ public class ConnectionSecureOkMethodHandler implements StateAwareMethodListener ConnectionTuneBody tuneBody = methodRegistry.createConnectionTuneBody(broker.getConnection_sessionCountLimit(), - BrokerProperties.FRAME_SIZE, + broker.getContextValue(Long.class, Broker.BROKER_FRAME_SIZE), broker.getConnection_heartBeatDelay()); session.writeFrame(tuneBody.generateFrame(0)); session.setAuthorizedSubject(authResult.getSubject()); diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionStartOkMethodHandler.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionStartOkMethodHandler.java index dc4f010a66..d6801c0fbc 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionStartOkMethodHandler.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionStartOkMethodHandler.java @@ -32,7 +32,6 @@ import org.apache.qpid.framing.ConnectionStartOkBody; import org.apache.qpid.framing.ConnectionTuneBody; import org.apache.qpid.framing.MethodRegistry; import org.apache.qpid.protocol.AMQConstant; -import org.apache.qpid.server.configuration.BrokerProperties; import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; import org.apache.qpid.server.protocol.v0_8.state.AMQState; @@ -59,7 +58,7 @@ public class ConnectionStartOkMethodHandler implements StateAwareMethodListener< public void methodReceived(AMQStateManager stateManager, ConnectionStartOkBody body, int channelId) throws AMQException { - Broker broker = stateManager.getBroker(); + Broker<?> broker = stateManager.getBroker(); AMQProtocolSession session = stateManager.getProtocolSession(); _logger.info("SASL Mechanism selected: " + body.getMechanism()); @@ -113,7 +112,7 @@ public class ConnectionStartOkMethodHandler implements StateAwareMethodListener< stateManager.changeState(AMQState.CONNECTION_NOT_TUNED); ConnectionTuneBody tuneBody = methodRegistry.createConnectionTuneBody(broker.getConnection_sessionCountLimit(), - BrokerProperties.FRAME_SIZE, + broker.getContextValue(Long.class,Broker.BROKER_FRAME_SIZE), broker.getConnection_heartBeatDelay()); session.writeFrame(tuneBody.generateFrame(0)); break; diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionTuneOkMethodHandler.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionTuneOkMethodHandler.java index 5fddab6576..108c19dbaf 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionTuneOkMethodHandler.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionTuneOkMethodHandler.java @@ -22,8 +22,11 @@ package org.apache.qpid.server.protocol.v0_8.handler; import org.apache.log4j.Logger; +import org.apache.qpid.AMQConnectionException; import org.apache.qpid.AMQException; import org.apache.qpid.framing.ConnectionTuneOkBody; +import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; import org.apache.qpid.server.protocol.v0_8.state.AMQState; import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager; @@ -49,8 +52,29 @@ public class ConnectionTuneOkMethodHandler implements StateAwareMethodListener<C _logger.debug(body); } stateManager.changeState(AMQState.CONNECTION_NOT_OPENED); + session.initHeartbeats(body.getHeartbeat()); - session.setMaxFrameSize(body.getFrameMax()); + + long brokerFrameMax = stateManager.getBroker().getContextValue(Long.class,Broker.BROKER_FRAME_SIZE); + if(brokerFrameMax != 0 && body.getFrameMax() > brokerFrameMax) + { + throw new AMQConnectionException(AMQConstant.SYNTAX_ERROR, + "Attempt to set max frame size to " + body.getFrameMax() + + "greater than the broker will allow: " + + brokerFrameMax, + body.getClazz(), body.getMethod(), + body.getMajor(), body.getMinor(),null); + } + else if(body.getFrameMax() > 0 && body.getFrameMax() < AMQConstant.FRAME_MIN_SIZE.getCode()) + { + throw new AMQConnectionException(AMQConstant.SYNTAX_ERROR, + "Attempt to set max frame size to " + body.getFrameMax() + + "which is smaller than the specification definined minimum: " + + AMQConstant.FRAME_MIN_SIZE.getCode(), + body.getClazz(), body.getMethod(), + body.getMajor(), body.getMinor(),null); + } + session.setMaxFrameSize(body.getFrameMax()== 0l ? (brokerFrameMax == 0l ? 0xFFFFFFFFl : brokerFrameMax) : body.getFrameMax()); long maxChannelNumber = body.getChannelMax(); //0 means no implied limit, except that forced by protocol limitations (0xFFFF) diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/state/AMQStateManager.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/state/AMQStateManager.java index cb9295ac49..3c1f1dedc3 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/state/AMQStateManager.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/state/AMQStateManager.java @@ -51,12 +51,12 @@ public class AMQStateManager implements AMQMethodListener { private static final Logger _logger = Logger.getLogger(AMQStateManager.class); - private final Broker _broker; + private final Broker<?> _broker; private final AMQProtocolSession _protocolSession; /** The current state */ private AMQState _currentState; - public AMQStateManager(Broker broker, AMQProtocolSession protocolSession) + public AMQStateManager(Broker<?> broker, AMQProtocolSession protocolSession) { _broker = broker; _protocolSession = protocolSession; @@ -69,7 +69,7 @@ public class AMQStateManager implements AMQMethodListener * * @return the Broker */ - public Broker getBroker() + public Broker<?> getBroker() { return _broker; } |
