summaryrefslogtreecommitdiff
path: root/java/broker
diff options
context:
space:
mode:
Diffstat (limited to 'java/broker')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java6
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java5
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java23
4 files changed, 23 insertions, 13 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java
index 51b585ecc5..b8db7371b0 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java
@@ -72,7 +72,7 @@ public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetB
// TODO - set clusterId
- session.writeFrame(BasicGetEmptyBody.createAMQFrame(channelId, (byte) 8, (byte) 0, null));
+ session.writeFrame(BasicGetEmptyBody.createAMQFrame(channelId, body.getMajor(), body.getMinor(), null));
}
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
index 8cc747200f..d87821aa46 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
@@ -539,17 +539,17 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
* NOTE: Both major and minor will be set to 0 prior to protocol initiation.
*/
- public byte getAmqpMajor()
+ public byte getProtocolMajorVersion()
{
return _major;
}
- public byte getAmqpMinor()
+ public byte getProtocolMinorVersion()
{
return _minor;
}
- public boolean amqpVersionEquals(byte major, byte minor)
+ public boolean isProtocolVersion(byte major, byte minor)
{
return _major == major && _minor == minor;
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
index 48c05058b0..ed998b33c6 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
@@ -35,6 +35,7 @@ public interface AMQProtocolSession extends AMQProtocolWriter
{
+
public static interface Task
{
public void doTask(AMQProtocolSession session) throws AMQException;
@@ -143,4 +144,8 @@ public interface AMQProtocolSession extends AMQProtocolWriter
void removeSessionCloseTask(Task task);
+ byte getProtocolMajorVersion();
+
+ byte getProtocolMinorVersion();
+
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
index c227cd5094..23a5da0a30 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
@@ -541,7 +541,7 @@ public class AMQMessage
public void writeDeliver(AMQProtocolSession protocolSession, int channelId, long deliveryTag, AMQShortString consumerTag)
throws AMQException
{
- ByteBuffer deliver = createEncodedDeliverFrame(channelId, deliveryTag, consumerTag);
+ ByteBuffer deliver = createEncodedDeliverFrame(protocolSession, channelId, deliveryTag, consumerTag);
AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
getContentHeaderBody());
@@ -585,7 +585,7 @@ public class AMQMessage
public void writeGetOk(AMQProtocolSession protocolSession, int channelId, long deliveryTag, int queueSize) throws AMQException
{
- ByteBuffer deliver = createEncodedGetOkFrame(channelId, deliveryTag, queueSize);
+ ByteBuffer deliver = createEncodedGetOkFrame(protocolSession, channelId, deliveryTag, queueSize);
AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
getContentHeaderBody());
@@ -627,11 +627,11 @@ public class AMQMessage
}
- private ByteBuffer createEncodedDeliverFrame(int channelId, long deliveryTag, AMQShortString consumerTag)
+ private ByteBuffer createEncodedDeliverFrame(AMQProtocolSession protocolSession, int channelId, long deliveryTag, AMQShortString consumerTag)
throws AMQException
{
BasicPublishBody pb = getPublishBody();
- AMQFrame deliverFrame = BasicDeliverBody.createAMQFrame(channelId, (byte) 8, (byte) 0, consumerTag,
+ AMQFrame deliverFrame = BasicDeliverBody.createAMQFrame(channelId, protocolSession.getProtocolMajorVersion(), (byte) 0, consumerTag,
deliveryTag, pb.exchange, _messageHandle.isRedelivered(),
pb.routingKey);
ByteBuffer buf = ByteBuffer.allocate((int) deliverFrame.getSize()); // XXX: Could cast be a problem?
@@ -640,11 +640,13 @@ public class AMQMessage
return buf;
}
- private ByteBuffer createEncodedGetOkFrame(int channelId, long deliveryTag, int queueSize)
+ private ByteBuffer createEncodedGetOkFrame(AMQProtocolSession protocolSession, int channelId, long deliveryTag, int queueSize)
throws AMQException
{
BasicPublishBody pb = getPublishBody();
- AMQFrame getOkFrame = BasicGetOkBody.createAMQFrame(channelId, (byte) 8, (byte) 0,
+ AMQFrame getOkFrame = BasicGetOkBody.createAMQFrame(channelId,
+ protocolSession.getProtocolMajorVersion(),
+ protocolSession.getProtocolMinorVersion(),
deliveryTag, pb.exchange,
queueSize,
_messageHandle.isRedelivered(),
@@ -655,9 +657,12 @@ public class AMQMessage
return buf;
}
- private ByteBuffer createEncodedReturnFrame(int channelId, int replyCode, AMQShortString replyText) throws AMQException
+ private ByteBuffer createEncodedReturnFrame(AMQProtocolSession protocolSession, int channelId, int replyCode, AMQShortString replyText) throws AMQException
{
- AMQFrame returnFrame = BasicReturnBody.createAMQFrame(channelId, (byte) 8, (byte) 0, getPublishBody().exchange,
+ AMQFrame returnFrame = BasicReturnBody.createAMQFrame(channelId,
+ protocolSession.getProtocolMajorVersion(),
+ protocolSession.getProtocolMinorVersion(),
+ getPublishBody().exchange,
replyCode, replyText,
getPublishBody().routingKey);
ByteBuffer buf = ByteBuffer.allocate((int) returnFrame.getSize()); // XXX: Could cast be a problem?
@@ -669,7 +674,7 @@ public class AMQMessage
public void writeReturn(AMQProtocolSession protocolSession, int channelId, int replyCode, AMQShortString replyText)
throws AMQException
{
- ByteBuffer returnFrame = createEncodedReturnFrame(channelId, replyCode, replyText);
+ ByteBuffer returnFrame = createEncodedReturnFrame(protocolSession, channelId, replyCode, replyText);
AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
getContentHeaderBody());