summaryrefslogtreecommitdiff
path: root/java/broker/src
diff options
context:
space:
mode:
authorKim van der Riet <kpvdr@apache.org>2007-01-18 21:39:29 +0000
committerKim van der Riet <kpvdr@apache.org>2007-01-18 21:39:29 +0000
commit8a5d46b686eebfc0caf2f1e92eae3dea7b868ebd (patch)
tree281f62cc2fe708a5d76d1986fc65e6a491ef2e56 /java/broker/src
parent93bddfd4c9260f958eab861a8a43db55bb836690 (diff)
downloadqpid-python-8a5d46b686eebfc0caf2f1e92eae3dea7b868ebd.tar.gz
Changed Content to use ByteBuffer, added Message.Transfer and Message.Cancel handlers
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@497585 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/broker/src')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/MessageCancelHandler.java13
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/MessageTransferHandler.java4
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java2
4 files changed, 16 insertions, 5 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
index 6b9ceff053..e34104914f 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
@@ -197,7 +197,7 @@ public class AMQChannel
route(message);
break;
case CONTENT_TYPE_REFERENCE:
- getMessages(body.getContent()).add(message);
+ getMessages(body.getContentAsByteArray()).add(message);
break;
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/MessageCancelHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageCancelHandler.java
index ad41910141..fb2b9b7252 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/MessageCancelHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageCancelHandler.java
@@ -21,8 +21,11 @@
package org.apache.qpid.server.handler;
import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQMethodBody;
import org.apache.qpid.framing.MessageCancelBody;
+import org.apache.qpid.framing.MessageOkBody;
import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.queue.QueueRegistry;
@@ -48,7 +51,15 @@ public class MessageCancelHandler implements StateAwareMethodListener<MessageCan
AMQMethodEvent<MessageCancelBody> evt)
throws AMQException
{
- // TODO
+ final AMQChannel channel = protocolSession.getChannel(evt.getChannelId());
+ final MessageCancelBody body = evt.getMethod();
+ channel.unsubscribeConsumer(protocolSession, body.destination);
+
+ // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ final AMQMethodBody methodBody = MessageOkBody.createMethodBody((byte)0, (byte)9);
+ protocolSession.writeResponse(evt, methodBody);
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/MessageTransferHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageTransferHandler.java
index 353af5bd04..32b34c74a3 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/MessageTransferHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageTransferHandler.java
@@ -74,11 +74,11 @@ public class MessageTransferHandler implements StateAwareMethodListener<MessageT
protocolSession.closeChannel(evt.getChannelId());
// TODO: modify code gen to make getClazz and getMethod public methods rather than protected
// then we can remove the hardcoded 0,0
- // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
AMQMethodBody cf = ChannelCloseBody.createMethodBody
- ((byte)8, (byte)0, // AMQP version (major, minor)
+ ((byte)0, (byte)9, // AMQP version (major, minor)
MessageTransferBody.getClazz((byte)0, (byte)9), // classId
MessageTransferBody.getMethod((byte)0, (byte)9), // methodId
500, // replyCode
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
index e0457748f4..44994fe161 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
@@ -150,7 +150,7 @@ public class AMQMessage
Content body = _transferBody.getBody();
switch (body.getContentType()) {
case CONTENT_TYPE_INLINE:
- return _transferBody.getBody().getContent().length;
+ return _transferBody.getBody().getContent().limit();
case CONTENT_TYPE_REFERENCE:
return getReferenceSize();
default: