From 8a5d46b686eebfc0caf2f1e92eae3dea7b868ebd Mon Sep 17 00:00:00 2001 From: Kim van der Riet Date: Thu, 18 Jan 2007 21:39:29 +0000 Subject: Changed Content to use ByteBuffer, added Message.Transfer and Message.Cancel handlers git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@497585 13f79535-47bb-0310-9956-ffa450edef68 --- .../src/main/java/org/apache/qpid/server/AMQChannel.java | 2 +- .../apache/qpid/server/handler/MessageCancelHandler.java | 13 ++++++++++++- .../apache/qpid/server/handler/MessageTransferHandler.java | 4 ++-- .../main/java/org/apache/qpid/server/queue/AMQMessage.java | 2 +- 4 files changed, 16 insertions(+), 5 deletions(-) (limited to 'java/broker/src') diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index 6b9ceff053..e34104914f 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -197,7 +197,7 @@ public class AMQChannel route(message); break; case CONTENT_TYPE_REFERENCE: - getMessages(body.getContent()).add(message); + getMessages(body.getContentAsByteArray()).add(message); break; } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/MessageCancelHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageCancelHandler.java index ad41910141..fb2b9b7252 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/MessageCancelHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageCancelHandler.java @@ -21,8 +21,11 @@ package org.apache.qpid.server.handler; import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQMethodBody; import org.apache.qpid.framing.MessageCancelBody; +import org.apache.qpid.framing.MessageOkBody; import org.apache.qpid.protocol.AMQMethodEvent; +import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.exchange.ExchangeRegistry; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.queue.QueueRegistry; @@ -48,7 +51,15 @@ public class MessageCancelHandler implements StateAwareMethodListener evt) throws AMQException { - // TODO + final AMQChannel channel = protocolSession.getChannel(evt.getChannelId()); + final MessageCancelBody body = evt.getMethod(); + channel.unsubscribeConsumer(protocolSession, body.destination); + + // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + // Be aware of possible changes to parameter order as versions change. + final AMQMethodBody methodBody = MessageOkBody.createMethodBody((byte)0, (byte)9); + protocolSession.writeResponse(evt, methodBody); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/MessageTransferHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageTransferHandler.java index 353af5bd04..32b34c74a3 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/MessageTransferHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageTransferHandler.java @@ -74,11 +74,11 @@ public class MessageTransferHandler implements StateAwareMethodListener