diff options
Diffstat (limited to 'java/broker')
4 files changed, 23 insertions, 13 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java index 51b585ecc5..b8db7371b0 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java @@ -72,7 +72,7 @@ public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetB // TODO - set clusterId
- session.writeFrame(BasicGetEmptyBody.createAMQFrame(channelId, (byte) 8, (byte) 0, null));
+ session.writeFrame(BasicGetEmptyBody.createAMQFrame(channelId, body.getMajor(), body.getMinor(), null));
}
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java index 8cc747200f..d87821aa46 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java @@ -539,17 +539,17 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, * NOTE: Both major and minor will be set to 0 prior to protocol initiation. */ - public byte getAmqpMajor() + public byte getProtocolMajorVersion() { return _major; } - public byte getAmqpMinor() + public byte getProtocolMinorVersion() { return _minor; } - public boolean amqpVersionEquals(byte major, byte minor) + public boolean isProtocolVersion(byte major, byte minor) { return _major == major && _minor == minor; } diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java index 48c05058b0..ed998b33c6 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java @@ -35,6 +35,7 @@ public interface AMQProtocolSession extends AMQProtocolWriter { + public static interface Task { public void doTask(AMQProtocolSession session) throws AMQException; @@ -143,4 +144,8 @@ public interface AMQProtocolSession extends AMQProtocolWriter void removeSessionCloseTask(Task task); + byte getProtocolMajorVersion(); + + byte getProtocolMinorVersion(); + } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java index c227cd5094..23a5da0a30 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java @@ -541,7 +541,7 @@ public class AMQMessage public void writeDeliver(AMQProtocolSession protocolSession, int channelId, long deliveryTag, AMQShortString consumerTag) throws AMQException { - ByteBuffer deliver = createEncodedDeliverFrame(channelId, deliveryTag, consumerTag); + ByteBuffer deliver = createEncodedDeliverFrame(protocolSession, channelId, deliveryTag, consumerTag); AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId, getContentHeaderBody()); @@ -585,7 +585,7 @@ public class AMQMessage public void writeGetOk(AMQProtocolSession protocolSession, int channelId, long deliveryTag, int queueSize) throws AMQException { - ByteBuffer deliver = createEncodedGetOkFrame(channelId, deliveryTag, queueSize); + ByteBuffer deliver = createEncodedGetOkFrame(protocolSession, channelId, deliveryTag, queueSize); AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId, getContentHeaderBody()); @@ -627,11 +627,11 @@ public class AMQMessage } - private ByteBuffer createEncodedDeliverFrame(int channelId, long deliveryTag, AMQShortString consumerTag) + private ByteBuffer createEncodedDeliverFrame(AMQProtocolSession protocolSession, int channelId, long deliveryTag, AMQShortString consumerTag) throws AMQException { BasicPublishBody pb = getPublishBody(); - AMQFrame deliverFrame = BasicDeliverBody.createAMQFrame(channelId, (byte) 8, (byte) 0, consumerTag, + AMQFrame deliverFrame = BasicDeliverBody.createAMQFrame(channelId, protocolSession.getProtocolMajorVersion(), (byte) 0, consumerTag, deliveryTag, pb.exchange, _messageHandle.isRedelivered(), pb.routingKey); ByteBuffer buf = ByteBuffer.allocate((int) deliverFrame.getSize()); // XXX: Could cast be a problem? @@ -640,11 +640,13 @@ public class AMQMessage return buf; } - private ByteBuffer createEncodedGetOkFrame(int channelId, long deliveryTag, int queueSize) + private ByteBuffer createEncodedGetOkFrame(AMQProtocolSession protocolSession, int channelId, long deliveryTag, int queueSize) throws AMQException { BasicPublishBody pb = getPublishBody(); - AMQFrame getOkFrame = BasicGetOkBody.createAMQFrame(channelId, (byte) 8, (byte) 0, + AMQFrame getOkFrame = BasicGetOkBody.createAMQFrame(channelId, + protocolSession.getProtocolMajorVersion(), + protocolSession.getProtocolMinorVersion(), deliveryTag, pb.exchange, queueSize, _messageHandle.isRedelivered(), @@ -655,9 +657,12 @@ public class AMQMessage return buf; } - private ByteBuffer createEncodedReturnFrame(int channelId, int replyCode, AMQShortString replyText) throws AMQException + private ByteBuffer createEncodedReturnFrame(AMQProtocolSession protocolSession, int channelId, int replyCode, AMQShortString replyText) throws AMQException { - AMQFrame returnFrame = BasicReturnBody.createAMQFrame(channelId, (byte) 8, (byte) 0, getPublishBody().exchange, + AMQFrame returnFrame = BasicReturnBody.createAMQFrame(channelId, + protocolSession.getProtocolMajorVersion(), + protocolSession.getProtocolMinorVersion(), + getPublishBody().exchange, replyCode, replyText, getPublishBody().routingKey); ByteBuffer buf = ByteBuffer.allocate((int) returnFrame.getSize()); // XXX: Could cast be a problem? @@ -669,7 +674,7 @@ public class AMQMessage public void writeReturn(AMQProtocolSession protocolSession, int channelId, int replyCode, AMQShortString replyText) throws AMQException { - ByteBuffer returnFrame = createEncodedReturnFrame(channelId, replyCode, replyText); + ByteBuffer returnFrame = createEncodedReturnFrame(protocolSession, channelId, replyCode, replyText); AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId, getContentHeaderBody()); |
