diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2014-08-09 16:05:05 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2014-08-09 16:05:05 +0000 |
| commit | c303d65ac74d5324b885da7cf7dbf655af8a93e2 (patch) | |
| tree | 55ef9bb4fa0dd091d84ad0c87cde63ae709e6513 /qpid/java/client/src | |
| parent | b52953d99d197049e34a8e05d033e9f44b44f353 (diff) | |
| download | qpid-python-c303d65ac74d5324b885da7cf7dbf655af8a93e2.tar.gz | |
QPID-4429 : [Java] Implement max frame size negotiation checks in 0-x protocols
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1616977 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/client/src')
3 files changed, 24 insertions, 11 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java index 617380e149..1f2df2026b 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java @@ -72,6 +72,8 @@ public class ConnectionTuneMethodHandler implements StateAwareMethodListener<Con ConnectionTuneOkBody tuneOkBody = methodRegistry.createConnectionTuneOkBody(params.getChannelMax(), params.getFrameMax(), params.getHeartbeat()); + + session.setMaxFrameSize(params.getFrameMax()); // Be aware of possible changes to parameter order as versions change. session.writeFrame(tuneOkBody.generateFrame(channelId)); diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java index 563518bdfe..681082526c 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java @@ -47,7 +47,7 @@ import org.apache.qpid.client.state.AMQState; import org.apache.qpid.client.state.AMQStateManager; import org.apache.qpid.client.state.StateWaiter; import org.apache.qpid.client.state.listener.SpecificMethodFrameListener; -import org.apache.qpid.codec.AMQCodecFactory; +import org.apache.qpid.codec.AMQDecoder; import org.apache.qpid.configuration.ClientProperties; import org.apache.qpid.framing.AMQBody; import org.apache.qpid.framing.AMQDataBlock; @@ -165,7 +165,7 @@ public class AMQProtocolHandler implements ProtocolEngine /** Object to lock on when changing the latch */ private Object _failoverLatchChange = new Object(); - private AMQCodecFactory _codecFactory; + private AMQDecoder _decoder; private ProtocolVersion _suggestedProtocolVersion; @@ -189,7 +189,7 @@ public class AMQProtocolHandler implements ProtocolEngine _connection = con; _protocolSession = new AMQProtocolSession(this, _connection); _stateManager = new AMQStateManager(_protocolSession); - _codecFactory = new AMQCodecFactory(false, _protocolSession); + _decoder = new AMQDecoder(false, _protocolSession); _failoverHandler = new FailoverHandler(this); } @@ -460,7 +460,7 @@ public class AMQProtocolHandler implements ProtocolEngine _lastReadTime = System.currentTimeMillis(); try { - final ArrayList<AMQDataBlock> dataBlocks = _codecFactory.getDecoder().decodeBuffer(msg); + final ArrayList<AMQDataBlock> dataBlocks = _decoder.decodeBuffer(msg); // Decode buffer int size = dataBlocks.size(); @@ -944,4 +944,9 @@ public class AMQProtocolHandler implements ProtocolEngine { _heartbeatListener.heartbeatReceived(); } + + public void setMaxFrameSize(final long frameMax) + { + _decoder.setMaxFrameSize(frameMax == 0l ? 0xffffffffl : frameMax); + } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java index 121715d439..2c69aa1b51 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java @@ -20,8 +20,16 @@ */ package org.apache.qpid.client.protocol; +import java.nio.ByteBuffer; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +import javax.jms.JMSException; +import javax.security.sasl.SaslClient; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; + import org.apache.qpid.AMQException; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQSession; @@ -47,13 +55,6 @@ import org.apache.qpid.transport.ConnectionSettings; import org.apache.qpid.transport.Sender; import org.apache.qpid.transport.TransportException; -import javax.jms.JMSException; -import javax.security.sasl.SaslClient; - -import java.nio.ByteBuffer; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; - /** * Wrapper for protocol session that provides type-safe access to session attributes. * <p> @@ -543,4 +544,9 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession { return _connectionStartServerProperties; } + + public void setMaxFrameSize(final long frameMax) + { + _protocolHandler.setMaxFrameSize(frameMax); + } } |
