diff options
| author | Kim van der Riet <kpvdr@apache.org> | 2007-01-18 21:39:29 +0000 |
|---|---|---|
| committer | Kim van der Riet <kpvdr@apache.org> | 2007-01-18 21:39:29 +0000 |
| commit | 8a5d46b686eebfc0caf2f1e92eae3dea7b868ebd (patch) | |
| tree | 281f62cc2fe708a5d76d1986fc65e6a491ef2e56 /java/broker/src | |
| parent | 93bddfd4c9260f958eab861a8a43db55bb836690 (diff) | |
| download | qpid-python-8a5d46b686eebfc0caf2f1e92eae3dea7b868ebd.tar.gz | |
Changed Content to use ByteBuffer, added Message.Transfer and Message.Cancel handlers
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@497585 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/broker/src')
4 files changed, 16 insertions, 5 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index 6b9ceff053..e34104914f 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -197,7 +197,7 @@ public class AMQChannel route(message); break; case CONTENT_TYPE_REFERENCE: - getMessages(body.getContent()).add(message); + getMessages(body.getContentAsByteArray()).add(message); break; } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/MessageCancelHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageCancelHandler.java index ad41910141..fb2b9b7252 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/MessageCancelHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageCancelHandler.java @@ -21,8 +21,11 @@ package org.apache.qpid.server.handler; import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQMethodBody; import org.apache.qpid.framing.MessageCancelBody; +import org.apache.qpid.framing.MessageOkBody; import org.apache.qpid.protocol.AMQMethodEvent; +import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.exchange.ExchangeRegistry; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.queue.QueueRegistry; @@ -48,7 +51,15 @@ public class MessageCancelHandler implements StateAwareMethodListener<MessageCan AMQMethodEvent<MessageCancelBody> evt) throws AMQException { - // TODO + final AMQChannel channel = protocolSession.getChannel(evt.getChannelId()); + final MessageCancelBody body = evt.getMethod(); + channel.unsubscribeConsumer(protocolSession, body.destination); + + // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + // Be aware of possible changes to parameter order as versions change. + final AMQMethodBody methodBody = MessageOkBody.createMethodBody((byte)0, (byte)9); + protocolSession.writeResponse(evt, methodBody); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/MessageTransferHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageTransferHandler.java index 353af5bd04..32b34c74a3 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/MessageTransferHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageTransferHandler.java @@ -74,11 +74,11 @@ public class MessageTransferHandler implements StateAwareMethodListener<MessageT protocolSession.closeChannel(evt.getChannelId()); // TODO: modify code gen to make getClazz and getMethod public methods rather than protected // then we can remove the hardcoded 0,0 - // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. AMQMethodBody cf = ChannelCloseBody.createMethodBody - ((byte)8, (byte)0, // AMQP version (major, minor) + ((byte)0, (byte)9, // AMQP version (major, minor) MessageTransferBody.getClazz((byte)0, (byte)9), // classId MessageTransferBody.getMethod((byte)0, (byte)9), // methodId 500, // replyCode diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java index e0457748f4..44994fe161 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java @@ -150,7 +150,7 @@ public class AMQMessage Content body = _transferBody.getBody(); switch (body.getContentType()) { case CONTENT_TYPE_INLINE: - return _transferBody.getBody().getContent().length; + return _transferBody.getBody().getContent().limit(); case CONTENT_TYPE_REFERENCE: return getReferenceSize(); default: |
