From 8a5d46b686eebfc0caf2f1e92eae3dea7b868ebd Mon Sep 17 00:00:00 2001 From: Kim van der Riet Date: Thu, 18 Jan 2007 21:39:29 +0000 Subject: Changed Content to use ByteBuffer, added Message.Transfer and Message.Cancel handlers git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@497585 13f79535-47bb-0310-9956-ffa450edef68 --- .../java/org/apache/qpid/server/AMQChannel.java | 2 +- .../qpid/server/handler/MessageCancelHandler.java | 13 ++++++- .../server/handler/MessageTransferHandler.java | 4 +-- .../org/apache/qpid/server/queue/AMQMessage.java | 2 +- .../apache/qpid/client/BasicMessageProducer.java | 6 ++-- .../client/message/AbstractJMSMessageFactory.java | 2 +- .../main/java/org/apache/qpid/framing/Content.java | 41 ++++++++++++---------- 7 files changed, 42 insertions(+), 28 deletions(-) (limited to 'java') diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index 6b9ceff053..e34104914f 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -197,7 +197,7 @@ public class AMQChannel route(message); break; case CONTENT_TYPE_REFERENCE: - getMessages(body.getContent()).add(message); + getMessages(body.getContentAsByteArray()).add(message); break; } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/MessageCancelHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageCancelHandler.java index ad41910141..fb2b9b7252 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/MessageCancelHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageCancelHandler.java @@ -21,8 +21,11 @@ package org.apache.qpid.server.handler; import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQMethodBody; import org.apache.qpid.framing.MessageCancelBody; +import org.apache.qpid.framing.MessageOkBody; import org.apache.qpid.protocol.AMQMethodEvent; +import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.exchange.ExchangeRegistry; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.queue.QueueRegistry; @@ -48,7 +51,15 @@ public class MessageCancelHandler implements StateAwareMethodListener evt) throws AMQException { - // TODO + final AMQChannel channel = protocolSession.getChannel(evt.getChannelId()); + final MessageCancelBody body = evt.getMethod(); + channel.unsubscribeConsumer(protocolSession, body.destination); + + // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + // Be aware of possible changes to parameter order as versions change. + final AMQMethodBody methodBody = MessageOkBody.createMethodBody((byte)0, (byte)9); + protocolSession.writeResponse(evt, methodBody); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/MessageTransferHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageTransferHandler.java index 353af5bd04..32b34c74a3 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/MessageTransferHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageTransferHandler.java @@ -74,11 +74,11 @@ public class MessageTransferHandler implements StateAwareMethodListener= framePayloadMax) ? (int) framePayloadMax : (int) remaining; payload.limit(payload.position() + length); - bodies[i] = new Content(Content.ContentTypeEnum.CONTENT_TYPE_INLINE, payload.slice().array()); + bodies[i] = new Content(Content.ContentTypeEnum.CONTENT_TYPE_INLINE, payload.slice()); remaining -= length; } } diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java index 5d77b022d5..f1ce5796d5 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java @@ -41,7 +41,7 @@ public abstract class AbstractJMSMessageFactory implements MessageFactory MessageHeaders contentHeader, Content body) throws AMQException { ByteBuffer data; - data = ByteBuffer.allocate(body.content.length); + data = ByteBuffer.allocate(body.content.remaining()); data.put(body.content); data.flip(); diff --git a/java/common/src/main/java/org/apache/qpid/framing/Content.java b/java/common/src/main/java/org/apache/qpid/framing/Content.java index d1d8c66995..4008ead21f 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/Content.java +++ b/java/common/src/main/java/org/apache/qpid/framing/Content.java @@ -43,7 +43,7 @@ public class Content } public ContentTypeEnum contentType; - public byte[] content; + public ByteBuffer content; // Constructors @@ -63,20 +63,13 @@ public class Content throw new IllegalArgumentException("Content cannot be empty for a ref type."); } this.contentType = contentType; - this.content = content; + this.content = ByteBuffer.allocate(content.length); + this.content.put(content); } - public Content(ContentTypeEnum contentType, String content) + public Content(ContentTypeEnum contentType, String contentStr) { - if (contentType == ContentTypeEnum.CONTENT_TYPE_REFERENCE) - { - if (content == null) - throw new IllegalArgumentException("Content cannot be null for a ref type."); - if (content.length() == 0) - throw new IllegalArgumentException("Content cannot be empty for a ref type."); - } - this.contentType = contentType; - this.content = content.getBytes(); + this(contentType, contentStr.getBytes()); } public Content(ContentTypeEnum contentType, ByteBuffer content) @@ -89,18 +82,26 @@ public class Content throw new IllegalArgumentException("Content cannot be empty for a ref type."); } this.contentType = contentType; - this.content = content.array(); + this.content = content; } // Get functions public ContentTypeEnum getContentType() { return contentType; } - public byte[] getContent() { return content; } + public ByteBuffer getContent() { return content; } + + public byte[] getContentAsByteArray() + { + byte[] ba = new byte[content.remaining()]; + content.get(ba); + return ba; + } + public String getContentAsString() { if (content == null) return null; - return new String(content); + return new String(getContentAsByteArray()); } // Wire functions @@ -109,18 +110,22 @@ public class Content { if (content == null) return 1 + 4; - return 1 + 4 + content.length; + return 1 + 4 + content.remaining(); } public void writePayload(ByteBuffer buffer) { EncodingUtils.writeUnsignedByte(buffer, contentType.toByte()); - EncodingUtils.writeLongStringBytes(buffer, content); + EncodingUtils.writeUnsignedInteger(buffer, content.remaining()); + buffer.put(content); } public void populateFromBuffer(ByteBuffer buffer) throws AMQFrameDecodingException { contentType = ContentTypeEnum.toContentEnum(buffer.get()); - content = EncodingUtils.readLongstr(buffer); + int length = buffer.getInt(); + content = buffer.slice(); + buffer.skip(length); + content.limit(length); } } -- cgit v1.2.1