diff options
| author | Kim van der Riet <kpvdr@apache.org> | 2007-01-18 21:39:29 +0000 |
|---|---|---|
| committer | Kim van der Riet <kpvdr@apache.org> | 2007-01-18 21:39:29 +0000 |
| commit | 8a5d46b686eebfc0caf2f1e92eae3dea7b868ebd (patch) | |
| tree | 281f62cc2fe708a5d76d1986fc65e6a491ef2e56 /java | |
| parent | 93bddfd4c9260f958eab861a8a43db55bb836690 (diff) | |
| download | qpid-python-8a5d46b686eebfc0caf2f1e92eae3dea7b868ebd.tar.gz | |
Changed Content to use ByteBuffer, added Message.Transfer and Message.Cancel handlers
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@497585 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
7 files changed, 42 insertions, 28 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index 6b9ceff053..e34104914f 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -197,7 +197,7 @@ public class AMQChannel route(message); break; case CONTENT_TYPE_REFERENCE: - getMessages(body.getContent()).add(message); + getMessages(body.getContentAsByteArray()).add(message); break; } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/MessageCancelHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageCancelHandler.java index ad41910141..fb2b9b7252 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/MessageCancelHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageCancelHandler.java @@ -21,8 +21,11 @@ package org.apache.qpid.server.handler; import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQMethodBody; import org.apache.qpid.framing.MessageCancelBody; +import org.apache.qpid.framing.MessageOkBody; import org.apache.qpid.protocol.AMQMethodEvent; +import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.exchange.ExchangeRegistry; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.queue.QueueRegistry; @@ -48,7 +51,15 @@ public class MessageCancelHandler implements StateAwareMethodListener<MessageCan AMQMethodEvent<MessageCancelBody> evt) throws AMQException { - // TODO + final AMQChannel channel = protocolSession.getChannel(evt.getChannelId()); + final MessageCancelBody body = evt.getMethod(); + channel.unsubscribeConsumer(protocolSession, body.destination); + + // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + // Be aware of possible changes to parameter order as versions change. + final AMQMethodBody methodBody = MessageOkBody.createMethodBody((byte)0, (byte)9); + protocolSession.writeResponse(evt, methodBody); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/MessageTransferHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageTransferHandler.java index 353af5bd04..32b34c74a3 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/MessageTransferHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageTransferHandler.java @@ -74,11 +74,11 @@ public class MessageTransferHandler implements StateAwareMethodListener<MessageT protocolSession.closeChannel(evt.getChannelId()); // TODO: modify code gen to make getClazz and getMethod public methods rather than protected // then we can remove the hardcoded 0,0 - // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. AMQMethodBody cf = ChannelCloseBody.createMethodBody - ((byte)8, (byte)0, // AMQP version (major, minor) + ((byte)0, (byte)9, // AMQP version (major, minor) MessageTransferBody.getClazz((byte)0, (byte)9), // classId MessageTransferBody.getMethod((byte)0, (byte)9), // methodId 500, // replyCode diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java index e0457748f4..44994fe161 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java @@ -150,7 +150,7 @@ public class AMQMessage Content body = _transferBody.getBody(); switch (body.getContentType()) { case CONTENT_TYPE_INLINE: - return _transferBody.getBody().getContent().length; + return _transferBody.getBody().getContent().limit(); case CONTENT_TYPE_REFERENCE: return getReferenceSize(); default: diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java index 9e2beaa964..c585a66c58 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java @@ -557,8 +557,6 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j messageHeaders.setExpiration(0); } } -// messageHeaders.setDeliveryMode((byte) deliveryMode); -// messageHeaders.setPriority((byte) priority); int size = (payload != null) ? payload.limit() : 0; Content[] content = createContent(payload); @@ -656,7 +654,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j if (frameCount == 1) { - bodies[0] = new Content(Content.ContentTypeEnum.CONTENT_TYPE_INLINE, payload.array()); + bodies[0] = new Content(Content.ContentTypeEnum.CONTENT_TYPE_INLINE, payload); } else { @@ -666,7 +664,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j payload.position((int) framePayloadMax * i); int length = (remaining >= framePayloadMax) ? (int) framePayloadMax : (int) remaining; payload.limit(payload.position() + length); - bodies[i] = new Content(Content.ContentTypeEnum.CONTENT_TYPE_INLINE, payload.slice().array()); + bodies[i] = new Content(Content.ContentTypeEnum.CONTENT_TYPE_INLINE, payload.slice()); remaining -= length; } } diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java index 5d77b022d5..f1ce5796d5 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java @@ -41,7 +41,7 @@ public abstract class AbstractJMSMessageFactory implements MessageFactory MessageHeaders contentHeader, Content body) throws AMQException { ByteBuffer data; - data = ByteBuffer.allocate(body.content.length); + data = ByteBuffer.allocate(body.content.remaining()); data.put(body.content); data.flip(); diff --git a/java/common/src/main/java/org/apache/qpid/framing/Content.java b/java/common/src/main/java/org/apache/qpid/framing/Content.java index d1d8c66995..4008ead21f 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/Content.java +++ b/java/common/src/main/java/org/apache/qpid/framing/Content.java @@ -43,7 +43,7 @@ public class Content } public ContentTypeEnum contentType; - public byte[] content; + public ByteBuffer content; // Constructors @@ -63,20 +63,13 @@ public class Content throw new IllegalArgumentException("Content cannot be empty for a ref type."); } this.contentType = contentType; - this.content = content; + this.content = ByteBuffer.allocate(content.length); + this.content.put(content); } - public Content(ContentTypeEnum contentType, String content) + public Content(ContentTypeEnum contentType, String contentStr) { - if (contentType == ContentTypeEnum.CONTENT_TYPE_REFERENCE) - { - if (content == null) - throw new IllegalArgumentException("Content cannot be null for a ref type."); - if (content.length() == 0) - throw new IllegalArgumentException("Content cannot be empty for a ref type."); - } - this.contentType = contentType; - this.content = content.getBytes(); + this(contentType, contentStr.getBytes()); } public Content(ContentTypeEnum contentType, ByteBuffer content) @@ -89,18 +82,26 @@ public class Content throw new IllegalArgumentException("Content cannot be empty for a ref type."); } this.contentType = contentType; - this.content = content.array(); + this.content = content; } // Get functions public ContentTypeEnum getContentType() { return contentType; } - public byte[] getContent() { return content; } + public ByteBuffer getContent() { return content; } + + public byte[] getContentAsByteArray() + { + byte[] ba = new byte[content.remaining()]; + content.get(ba); + return ba; + } + public String getContentAsString() { if (content == null) return null; - return new String(content); + return new String(getContentAsByteArray()); } // Wire functions @@ -109,18 +110,22 @@ public class Content { if (content == null) return 1 + 4; - return 1 + 4 + content.length; + return 1 + 4 + content.remaining(); } public void writePayload(ByteBuffer buffer) { EncodingUtils.writeUnsignedByte(buffer, contentType.toByte()); - EncodingUtils.writeLongStringBytes(buffer, content); + EncodingUtils.writeUnsignedInteger(buffer, content.remaining()); + buffer.put(content); } public void populateFromBuffer(ByteBuffer buffer) throws AMQFrameDecodingException { contentType = ContentTypeEnum.toContentEnum(buffer.get()); - content = EncodingUtils.readLongstr(buffer); + int length = buffer.getInt(); + content = buffer.slice(); + buffer.skip(length); + content.limit(length); } } |
