diff options
| author | Kim van der Riet <kpvdr@apache.org> | 2007-01-08 20:01:50 +0000 |
|---|---|---|
| committer | Kim van der Riet <kpvdr@apache.org> | 2007-01-08 20:01:50 +0000 |
| commit | c2055dbbf7d8ecd4a53d82b95dc2f666dbbb5fbc (patch) | |
| tree | f0a3253494f7f53e6b7ed5138191280514ce4af1 /java/common | |
| parent | 0fe65867ec72e6e3ff10a1ac817a734bd416b3a6 (diff) | |
| download | qpid-python-c2055dbbf7d8ecd4a53d82b95dc2f666dbbb5fbc.tar.gz | |
Completed first version of new Content class and the two new framing classes AMQRequest and AMQResponse. Removed old Basic* and Content* classes.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@494181 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/common')
13 files changed, 117 insertions, 1251 deletions
diff --git a/java/common/pom.xml b/java/common/pom.xml index 952131a74c..dc0db7f954 100644 --- a/java/common/pom.xml +++ b/java/common/pom.xml @@ -36,12 +36,11 @@ <properties> <topDirectoryLocation>..</topDirectoryLocation> <gentools.home>${topDirectoryLocation}/../gentools</gentools.home> - <generated.path>${project.build.directory}/generated-sources/xsl</generated.path> + <generated.path>${project.build.directory}/generated-sources/gentools</generated.path> <generated.package>org/apache/qpid/framing</generated.package> <generated.dir>${generated.path}/${generated.package}</generated.dir> <generated.timestamp>${generated.dir}/timestamp</generated.timestamp> <specs.dir>${topDirectoryLocation}/../specs</specs.dir> - <cluster.asl>${basedir}/src/main/xsl/cluster.asl</cluster.asl> </properties> <build> @@ -55,14 +54,7 @@ <phase>generate-sources</phase> <configuration> <tasks> - <ant antfile="protocol-version.xml"> - <property name="gentools.home" value="${gentools.home}"/> - <property name="generated.dir" value="${generated.dir}"/> - <property name="generated.timestamp" value="${generated.timestamp}"/> - <property name="xml.spec.dir" value="${specs.dir}"/> - <property name="xml.spec.deps" value="amqp.0-9.xml cluster.0-9.xml amqp-nogen.0-9.xml"/> - <property name="xml.spec.list" value="${specs.dir}/amqp.0-9.xml ${specs.dir}/cluster.0-9.xml ${specs.dir}/amqp-nogen.0-9.xml"/> - </ant> + <ant antfile="protocol-version.xml" /> </tasks> <sourceRoot>${generated.path}</sourceRoot> </configuration> diff --git a/java/common/protocol-version.xml b/java/common/protocol-version.xml index d318f07901..ed56e87703 100644 --- a/java/common/protocol-version.xml +++ b/java/common/protocol-version.xml @@ -19,11 +19,24 @@ - --> <project name="Qpid Common Protocol Versions" default="generate"> + <property name="topDirectoryLocation" location=".." /> + <property name="project.build.directory" location="target" /> + <property name="gentools.home" location="${topDirectoryLocation}/../gentools" /> + <property name="generated.path" location="${project.build.directory}/generated-sources/gentools" /> + <property name="generated.package" value="org/apache/qpid/framing" /> + <property name="generated.dir" location="${generated.path}/${generated.package}" /> + <property name="generated.timestamp" location="${generated.dir}/timestamp" /> + <property name="xml.spec.dir" location="${topDirectoryLocation}/../specs" /> + <property name="xml.spec.deps" value="amqp.0-9.xml cluster.0-9.xml amqp-nogen.0-9.xml" /> + <property name="xml.spec.list" value="${xml.spec.dir}/amqp.0-9.xml ${xml.spec.dir}/cluster.0-9.xml ${xml.spec.dir}/amqp-nogen.0-9.xml" /> <target name="generate" depends="compile_generator,check_generate_deps" unless="generation.notRequired"> <mkdir dir="${generated.dir}"/> - <java classname="org.apache.qpid.gentools.Main" fork="true" dir="${gentools.home}/src"> + <java classname="org.apache.qpid.gentools.Main" fork="true" dir="${gentools.home}/src" failonerror="true"> <arg line="-j -o ${generated.dir} -t ${gentools.home}/templ.java ${xml.spec.list}" /> + <classpath> + <pathelement path="${gentools.home}/src" /> + </classpath> </java> <touch file="${generated.timestamp}" /> </target> @@ -39,5 +52,9 @@ </target> <target name="precompile" depends="generate"/> + + <target name="clean"> + <delete dir="${generated.path}" /> + </target> </project> diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java b/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java index 2a999fe130..c9b683cc36 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java @@ -37,8 +37,6 @@ public class AMQDataBlockDecoder public AMQDataBlockDecoder() { _supportedBodies.put(new Byte(AMQMethodBody.TYPE), AMQMethodBodyFactory.getInstance()); - _supportedBodies.put(new Byte(ContentHeaderBody.TYPE), ContentHeaderBodyFactory.getInstance()); - _supportedBodies.put(new Byte(ContentBody.TYPE), ContentBodyFactory.getInstance()); _supportedBodies.put(new Byte(HeartbeatBody.TYPE), new HeartbeatBodyFactory()); } diff --git a/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java b/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java deleted file mode 100644 index fc80d93f82..0000000000 --- a/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java +++ /dev/null @@ -1,624 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.framing; - -import org.apache.log4j.Logger; -import org.apache.mina.common.ByteBuffer; - -public class BasicContentHeaderProperties implements ContentHeaderProperties -{ - private static final Logger _logger = Logger.getLogger(BasicContentHeaderProperties.class); - - /** - * We store the encoded form when we decode the content header so that if we need to - * write it out without modifying it we can do so without incurring the expense of - * reencoding it - */ - private byte[] _encodedForm; - - /** - * Flag indicating whether the entire content header has been decoded yet - */ - private boolean _decoded = true; - - /** - * We have some optimisations for partial decoding for maximum performance. The headers are used in the broker - * for routing in some cases so we can decode that separately. - */ - private boolean _decodedHeaders = true; - - /** - * We have some optimisations for partial decoding for maximum performance. The content type is used by all - * clients to determine the message type - */ - private boolean _decodedContentType = true; - - private String _contentType; - - private String _encoding; - - private FieldTable _headers; - - private JMSPropertyFieldTable _jmsHeaders; - - private byte _deliveryMode; - - private byte _priority; - - private String _correlationId; - - private String _replyTo; - - private long _expiration; - - private String _messageId; - - private long _timestamp; - - private String _type; - - private String _userId; - - private String _appId; - - private String _clusterId; - - private int _propertyFlags = 0; - - public BasicContentHeaderProperties() - { - } - - public int getPropertyListSize() - { - if (_encodedForm != null) - { - return _encodedForm.length; - } - else - { - int size = 0; - - if ((_propertyFlags & (1 << 15)) > 0) - { - size += EncodingUtils.encodedShortStringLength(_contentType); - } - if ((_propertyFlags & (1 << 14)) > 0) - { - size += EncodingUtils.encodedShortStringLength(_encoding); - } - if ((_propertyFlags & (1 << 13)) > 0) - { - size += EncodingUtils.encodedFieldTableLength(_headers); - } - if ((_propertyFlags & (1 << 12)) > 0) - { - size += 1; - } - if ((_propertyFlags & (1 << 11)) > 0) - { - size += 1; - } - if ((_propertyFlags & (1 << 10)) > 0) - { - size += EncodingUtils.encodedShortStringLength(_correlationId); - } - if ((_propertyFlags & (1 << 9)) > 0) - { - size += EncodingUtils.encodedShortStringLength(_replyTo); - } - if ((_propertyFlags & (1 << 8)) > 0) - { - size += EncodingUtils.encodedShortStringLength(String.valueOf(_expiration)); - } - if ((_propertyFlags & (1 << 7)) > 0) - { - size += EncodingUtils.encodedShortStringLength(_messageId); - } - if ((_propertyFlags & (1 << 6)) > 0) - { - size += 8; - } - if ((_propertyFlags & (1 << 5)) > 0) - { - size += EncodingUtils.encodedShortStringLength(_type); - } - if ((_propertyFlags & (1 << 4)) > 0) - { - size += EncodingUtils.encodedShortStringLength(_userId); - } - if ((_propertyFlags & (1 << 3)) > 0) - { - size += EncodingUtils.encodedShortStringLength(_appId); - } - if ((_propertyFlags & (1 << 2)) > 0) - { - size += EncodingUtils.encodedShortStringLength(_clusterId); - } - return size; - } - } - - private void clearEncodedForm() - { - if (!_decoded && _encodedForm != null) - { - //decode(); - } - _encodedForm = null; - } - - public void setPropertyFlags(int propertyFlags) - { - clearEncodedForm(); - _propertyFlags = propertyFlags; - } - - public int getPropertyFlags() - { - return _propertyFlags; - } - - public void writePropertyListPayload(ByteBuffer buffer) - { - if (_encodedForm != null) - { - buffer.put(_encodedForm); - } - else - { - if ((_propertyFlags & (1 << 15)) > 0) - { - EncodingUtils.writeShortStringBytes(buffer, _contentType); - } - if ((_propertyFlags & (1 << 14)) > 0) - { - EncodingUtils.writeShortStringBytes(buffer, _encoding); - } - if ((_propertyFlags & (1 << 13)) > 0) - { - EncodingUtils.writeFieldTableBytes(buffer, _headers); - } - if ((_propertyFlags & (1 << 12)) > 0) - { - buffer.put(_deliveryMode); - } - if ((_propertyFlags & (1 << 11)) > 0) - { - buffer.put(_priority); - } - if ((_propertyFlags & (1 << 10)) > 0) - { - EncodingUtils.writeShortStringBytes(buffer, _correlationId); - } - if ((_propertyFlags & (1 << 9)) > 0) - { - EncodingUtils.writeShortStringBytes(buffer, _replyTo); - } - if ((_propertyFlags & (1 << 8)) > 0) - { - EncodingUtils.writeShortStringBytes(buffer, String.valueOf(_expiration)); - } - if ((_propertyFlags & (1 << 7)) > 0) - { - EncodingUtils.writeShortStringBytes(buffer, _messageId); - } - if ((_propertyFlags & (1 << 6)) > 0) - { - EncodingUtils.writeTimestamp(buffer, _timestamp); - } - if ((_propertyFlags & (1 << 5)) > 0) - { - EncodingUtils.writeShortStringBytes(buffer, _type); - } - if ((_propertyFlags & (1 << 4)) > 0) - { - EncodingUtils.writeShortStringBytes(buffer, _userId); - } - if ((_propertyFlags & (1 << 3)) > 0) - { - EncodingUtils.writeShortStringBytes(buffer, _appId); - } - if ((_propertyFlags & (1 << 2)) > 0) - { - EncodingUtils.writeShortStringBytes(buffer, _clusterId); - } - } - } - - public void populatePropertiesFromBuffer(ByteBuffer buffer, int propertyFlags, int size) - throws AMQFrameDecodingException, AMQProtocolVersionException - { - _propertyFlags = propertyFlags; - - if (_logger.isDebugEnabled()) - { - _logger.debug("Property flags: " + _propertyFlags); - } - decode(buffer); - /*_encodedForm = new byte[size]; - buffer.get(_encodedForm, 0, size); - _decoded = false; - _decodedHeaders = false; - _decodedContentType = false;*/ - } - - private void decode(ByteBuffer buffer) - { - //ByteBuffer buffer = ByteBuffer.wrap(_encodedForm); - int pos = buffer.position(); - try - { - if ((_propertyFlags & (1 << 15)) > 0) - { - _contentType = EncodingUtils.readShortString(buffer); - } - if ((_propertyFlags & (1 << 14)) > 0) - { - _encoding = EncodingUtils.readShortString(buffer); - } - if ((_propertyFlags & (1 << 13)) > 0) - { - _headers = EncodingUtils.readFieldTable(buffer); - setJMSHeaders(); - } - if ((_propertyFlags & (1 << 12)) > 0) - { - _deliveryMode = buffer.get(); - } - if ((_propertyFlags & (1 << 11)) > 0) - { - _priority = buffer.get(); - } - if ((_propertyFlags & (1 << 10)) > 0) - { - _correlationId = EncodingUtils.readShortString(buffer); - } - if ((_propertyFlags & (1 << 9)) > 0) - { - _replyTo = EncodingUtils.readShortString(buffer); - } - if ((_propertyFlags & (1 << 8)) > 0) - { - _expiration = Long.parseLong(EncodingUtils.readShortString(buffer)); - } - if ((_propertyFlags & (1 << 7)) > 0) - { - _messageId = EncodingUtils.readShortString(buffer); - } - if ((_propertyFlags & (1 << 6)) > 0) - { - _timestamp = EncodingUtils.readTimestamp(buffer); - } - if ((_propertyFlags & (1 << 5)) > 0) - { - _type = EncodingUtils.readShortString(buffer); - } - if ((_propertyFlags & (1 << 4)) > 0) - { - _userId = EncodingUtils.readShortString(buffer); - } - if ((_propertyFlags & (1 << 3)) > 0) - { - _appId = EncodingUtils.readShortString(buffer); - } - if ((_propertyFlags & (1 << 2)) > 0) - { - _clusterId = EncodingUtils.readShortString(buffer); - } - } - catch (AMQFrameDecodingException e) - { - throw new RuntimeException("Error in content header data: " + e); - } - - final int endPos = buffer.position(); - buffer.position(pos); - final int len = endPos - pos; - _encodedForm = new byte[len]; - final int limit = buffer.limit(); - buffer.limit(endPos); - buffer.get(_encodedForm, 0, len); - buffer.limit(limit); - buffer.position(endPos); - _decoded = true; - } - - - private void decodeUpToHeaders() - { - ByteBuffer buffer = ByteBuffer.wrap(_encodedForm); - try - { - if ((_propertyFlags & (1 << 15)) > 0) - { - byte length = buffer.get(); - buffer.skip(length); - } - if ((_propertyFlags & (1 << 14)) > 0) - { - byte length = buffer.get(); - buffer.skip(length); - } - if ((_propertyFlags & (1 << 13)) > 0) - { - _headers = EncodingUtils.readFieldTable(buffer); - setJMSHeaders(); - - } - _decodedHeaders = true; - } - catch (AMQFrameDecodingException e) - { - throw new RuntimeException("Error in content header data: " + e); - } - } - - private void decodeUpToContentType() - { - ByteBuffer buffer = ByteBuffer.wrap(_encodedForm); - - if ((_propertyFlags & (1 << 15)) > 0) - { - _contentType = EncodingUtils.readShortString(buffer); - } - - _decodedContentType = true; - } - - private void decodeIfNecessary() - { - if (!_decoded) - { - //decode(); - } - } - - private void decodeHeadersIfNecessary() - { - if (!_decoded && !_decodedHeaders) - { - decodeUpToHeaders(); - } - } - - private void decodeContentTypeIfNecessary() - { - if (!_decoded && !_decodedContentType) - { - decodeUpToContentType(); - } - } - - public String getContentType() - { - decodeContentTypeIfNecessary(); - return _contentType; - } - - public void setContentType(String contentType) - { - clearEncodedForm(); - _propertyFlags |= (1 << 15); - _contentType = contentType; - } - - public String getEncoding() - { - decodeIfNecessary(); - return _encoding; - } - - public void setEncoding(String encoding) - { - clearEncodedForm(); - _propertyFlags |= (1 << 14); - _encoding = encoding; - } - - public FieldTable getHeaders() - { - decodeHeadersIfNecessary(); - - if (_headers == null) - { - setHeaders(FieldTableFactory.newFieldTable()); - } - - return _headers; - } - - public void setHeaders(FieldTable headers) - { - clearEncodedForm(); - _propertyFlags |= (1 << 13); - _headers = headers; - setJMSHeaders(); - } - - private void setJMSHeaders() - { - if (_jmsHeaders == null) - { - _jmsHeaders = new JMSPropertyFieldTable(_headers); - } - else - { - _jmsHeaders.setFieldTable(_headers); - } - } - - public JMSPropertyFieldTable getJMSHeaders() - { - //This will ensure we have a blank header - getHeaders(); - return _jmsHeaders; - } - - public byte getDeliveryMode() - { - decodeIfNecessary(); - return _deliveryMode; - } - - public void setDeliveryMode(byte deliveryMode) - { - clearEncodedForm(); - _propertyFlags |= (1 << 12); - _deliveryMode = deliveryMode; - } - - public byte getPriority() - { - decodeIfNecessary(); - return _priority; - } - - public void setPriority(byte priority) - { - clearEncodedForm(); - _propertyFlags |= (1 << 11); - _priority = priority; - } - - public String getCorrelationId() - { - decodeIfNecessary(); - return _correlationId; - } - - public void setCorrelationId(String correlationId) - { - clearEncodedForm(); - _propertyFlags |= (1 << 10); - _correlationId = correlationId; - } - - public String getReplyTo() - { - decodeIfNecessary(); - return _replyTo; - } - - public void setReplyTo(String replyTo) - { - clearEncodedForm(); - _propertyFlags |= (1 << 9); - _replyTo = replyTo; - } - - public long getExpiration() - { - decodeIfNecessary(); - return _expiration; - } - - public void setExpiration(long expiration) - { - clearEncodedForm(); - _propertyFlags |= (1 << 8); - _expiration = expiration; - } - - - public String getMessageId() - { - decodeIfNecessary(); - return _messageId; - } - - public void setMessageId(String messageId) - { - clearEncodedForm(); - _propertyFlags |= (1 << 7); - _messageId = messageId; - } - - public long getTimestamp() - { - decodeIfNecessary(); - return _timestamp; - } - - public void setTimestamp(long timestamp) - { - clearEncodedForm(); - _propertyFlags |= (1 << 6); - _timestamp = timestamp; - } - - public String getType() - { - decodeIfNecessary(); - return _type; - } - - public void setType(String type) - { - clearEncodedForm(); - _propertyFlags |= (1 << 5); - _type = type; - } - - public String getUserId() - { - decodeIfNecessary(); - return _userId; - } - - public void setUserId(String userId) - { - clearEncodedForm(); - _propertyFlags |= (1 << 4); - _userId = userId; - } - - public String getAppId() - { - decodeIfNecessary(); - return _appId; - } - - public void setAppId(String appId) - { - clearEncodedForm(); - _propertyFlags |= (1 << 3); - _appId = appId; - } - - public String getClusterId() - { - decodeIfNecessary(); - return _clusterId; - } - - public void setClusterId(String clusterId) - { - clearEncodedForm(); - _propertyFlags |= (1 << 2); - _clusterId = clusterId; - } - - public String toString() - { - return "reply-to = " + _replyTo + " propertyFlags = " + _propertyFlags; - } -} diff --git a/java/common/src/main/java/org/apache/qpid/framing/Content.java b/java/common/src/main/java/org/apache/qpid/framing/Content.java index e5feeec2a4..0ed13094c7 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/Content.java +++ b/java/common/src/main/java/org/apache/qpid/framing/Content.java @@ -20,7 +20,94 @@ */ package org.apache.qpid.framing; -public interface Content +import org.apache.mina.common.ByteBuffer; + +public class Content { - // TODO: New Content class required for AMQP 0-9. + enum ContentTypeEnum + { + CONTENT_TYPE_INLINE((byte)0), CONTENT_TYPE_REFERENCE((byte)1); + private byte type; + ContentTypeEnum(byte type) { this.type = type; } + public byte toByte() { return type; } + public static ContentTypeEnum toContentEnum(byte b) + { + switch (b) + { + case 0: return CONTENT_TYPE_INLINE; + case 1: return CONTENT_TYPE_REFERENCE; + default: throw new IllegalArgumentException("Illegal value " + b + + ", not represented in ContentTypeEnum."); + } + } + } + + public ContentTypeEnum contentType; + public byte[] content; + + // Constructors + + public Content() + { + contentType = ContentTypeEnum.CONTENT_TYPE_INLINE; // default + content = null; + } + + public Content(ContentTypeEnum contentType, byte[] content) + { + if (contentType == ContentTypeEnum.CONTENT_TYPE_REFERENCE) + { + if (content == null) + throw new IllegalArgumentException("Content cannot be null for a ref type."); + if (content.length == 0) + throw new IllegalArgumentException("Content cannot be empty for a ref type."); + } + this.contentType = contentType; + this.content = content; + } + + public Content(ContentTypeEnum contentType, String content) + { + if (contentType == ContentTypeEnum.CONTENT_TYPE_REFERENCE) + { + if (content == null) + throw new IllegalArgumentException("Content cannot be null for a ref type."); + if (content.length() == 0) + throw new IllegalArgumentException("Content cannot be empty for a ref type."); + } + this.contentType = contentType; + this.content = content.getBytes(); + } + + // Get functions + + public ContentTypeEnum getContentType() { return contentType; } + public byte[] getContent() { return content; } + public String getContentAsString() + { + if (content == null) + return null; + return new String(content); + } + + // Wire functions + + public long getEncodedSize() + { + if (content == null) + return 1 + 4; + return 1 + 4 + content.length; + } + + public void writePayload(ByteBuffer buffer) + { + EncodingUtils.writeUnsignedByte(buffer, contentType.toByte()); + EncodingUtils.writeLongStringBytes(buffer, content); + } + + public void populateFromBuffer(ByteBuffer buffer, long size) throws AMQFrameDecodingException + { + contentType = ContentTypeEnum.toContentEnum(buffer.get()); + content = EncodingUtils.readLongstr(buffer); + } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java b/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java deleted file mode 100644 index 3a2e4b3b3c..0000000000 --- a/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java +++ /dev/null @@ -1,85 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.framing; - -import org.apache.mina.common.ByteBuffer; - -public class ContentBody extends AMQBody -{ - public static final byte TYPE = 3; - - public ByteBuffer payload; - - protected byte getFrameType() - { - return TYPE; - } - - public int getSize() - { - return (payload == null ? 0 : payload.limit()); - } - - public void writePayload(ByteBuffer buffer) - { - if (payload != null) - { - ByteBuffer copy = payload.duplicate(); - buffer.put(copy.rewind()); - } - } - - protected void populateFromBuffer(ByteBuffer buffer, long size) throws AMQFrameDecodingException - { - if (size > 0) - { - payload = buffer.slice(); - payload.limit((int) size); - buffer.skip((int) size); - } - - } - - public void reduceBufferToFit() - { - if (payload != null && (payload.remaining() < payload.capacity() / 2)) - { - int size = payload.limit(); - ByteBuffer newPayload = ByteBuffer.allocate(size); - - newPayload.put(payload); - newPayload.flip(); - - //reduce reference count on payload - payload.release(); - - payload = newPayload; - } - } - - public static AMQFrame createAMQFrame(int channelId, ContentBody body) - { - final AMQFrame frame = new AMQFrame(); - frame.channel = channelId; - frame.bodyFrame = body; - return frame; - } -} diff --git a/java/common/src/main/java/org/apache/qpid/framing/ContentBodyFactory.java b/java/common/src/main/java/org/apache/qpid/framing/ContentBodyFactory.java deleted file mode 100644 index 22af331ab7..0000000000 --- a/java/common/src/main/java/org/apache/qpid/framing/ContentBodyFactory.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.framing; - -import org.apache.log4j.Logger; -import org.apache.mina.common.ByteBuffer; - -public class ContentBodyFactory implements BodyFactory -{ - private static final Logger _log = Logger.getLogger(AMQMethodBodyFactory.class); - - private static final ContentBodyFactory _instance = new ContentBodyFactory(); - - public static ContentBodyFactory getInstance() - { - return _instance; - } - - private ContentBodyFactory() - { - _log.debug("Creating content body factory"); - } - - public AMQBody createBody(ByteBuffer in) throws AMQFrameDecodingException - { - return new ContentBody(); - } -} - diff --git a/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java b/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java deleted file mode 100644 index 4ee36ee831..0000000000 --- a/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java +++ /dev/null @@ -1,117 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.framing; - -import org.apache.mina.common.ByteBuffer; - -public class ContentHeaderBody extends AMQBody -{ - public static final byte TYPE = 2; - - public int classId; - - public int weight; - - /** unsigned long but java can't handle that anyway when allocating byte array */ - public long bodySize; - - /** must never be null */ - public ContentHeaderProperties properties; - - public ContentHeaderBody() - { - } - - public ContentHeaderBody(ContentHeaderProperties props, int classId) - { - properties = props; - this.classId = classId; - } - - public ContentHeaderBody(int classId, int weight, ContentHeaderProperties props, long bodySize) - { - this(props, classId); - this.weight = weight; - this.bodySize = bodySize; - } - - protected byte getFrameType() - { - return TYPE; - } - - protected void populateFromBuffer(ByteBuffer buffer, long size) - throws AMQFrameDecodingException, AMQProtocolVersionException - { - classId = buffer.getUnsignedShort(); - weight = buffer.getUnsignedShort(); - bodySize = buffer.getLong(); - int propertyFlags = buffer.getUnsignedShort(); - ContentHeaderPropertiesFactory factory = ContentHeaderPropertiesFactory.getInstance(); - properties = factory.createContentHeaderProperties(classId, propertyFlags, buffer, (int)size - 14); - } - - /** - * Helper method that is used currently by the persistence layer (by BDB at the moment). - * @param buffer - * @param size - * @return - * @throws AMQFrameDecodingException - */ - public static ContentHeaderBody createFromBuffer(ByteBuffer buffer, long size) - throws AMQFrameDecodingException, AMQProtocolVersionException - { - ContentHeaderBody body = new ContentHeaderBody(); - body.populateFromBuffer(buffer, size); - return body; - } - - public int getSize() - { - return 2 + 2 + 8 + 2 + properties.getPropertyListSize(); - } - - public void writePayload(ByteBuffer buffer) - { - EncodingUtils.writeUnsignedShort(buffer, classId); - EncodingUtils.writeUnsignedShort(buffer, weight); - buffer.putLong(bodySize); - EncodingUtils.writeUnsignedShort(buffer, properties.getPropertyFlags()); - properties.writePropertyListPayload(buffer); - } - - public static AMQFrame createAMQFrame(int channelId, int classId, int weight, BasicContentHeaderProperties properties, - long bodySize) - { - final AMQFrame frame = new AMQFrame(); - frame.channel = channelId; - frame.bodyFrame = new ContentHeaderBody(classId, weight, properties, bodySize); - return frame; - } - - public static AMQFrame createAMQFrame(int channelId, ContentHeaderBody body) - { - final AMQFrame frame = new AMQFrame(); - frame.channel = channelId; - frame.bodyFrame = body; - return frame; - } -} diff --git a/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBodyFactory.java b/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBodyFactory.java deleted file mode 100644 index ddf63f8aa3..0000000000 --- a/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBodyFactory.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.framing; - -import org.apache.log4j.Logger; -import org.apache.mina.common.ByteBuffer; - -public class ContentHeaderBodyFactory implements BodyFactory -{ - private static final Logger _log = Logger.getLogger(AMQMethodBodyFactory.class); - - private static final ContentHeaderBodyFactory _instance = new ContentHeaderBodyFactory(); - - public static ContentHeaderBodyFactory getInstance() - { - return _instance; - } - - private ContentHeaderBodyFactory() - { - _log.debug("Creating content header body factory"); - } - - public AMQBody createBody(ByteBuffer in) throws AMQFrameDecodingException - { - // all content headers are the same - it is only the properties that differ. - // the content header body further delegates construction of properties - return new ContentHeaderBody(); - } - - -} diff --git a/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderProperties.java b/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderProperties.java deleted file mode 100644 index 88bdefca88..0000000000 --- a/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderProperties.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.framing; - -import org.apache.mina.common.ByteBuffer; - -/** - * There will be an implementation of this interface for each content type. All content types have associated - * header properties and this provides a way to encode and decode them. - */ -public interface ContentHeaderProperties -{ - /** - * Writes the property list to the buffer, in a suitably encoded form. - * @param buffer The buffer to write to - */ - void writePropertyListPayload(ByteBuffer buffer); - - /** - * Populates the properties from buffer. - * @param buffer The buffer to read from. - * @param propertyFlags he property flags. - * @throws AMQFrameDecodingException when the buffer does not contain valid data - */ - void populatePropertiesFromBuffer(ByteBuffer buffer, int propertyFlags, int size) - throws AMQFrameDecodingException, AMQProtocolVersionException; - - /** - * @return the size of the encoded property list in bytes. - */ - int getPropertyListSize(); - - /** - * Gets the property flags. Property flags indicate which properties are set in the list. The - * position and meaning of each flag is defined in the protocol specification for the particular - * content type with which these properties are associated. - * @return flags - */ - int getPropertyFlags(); -} diff --git a/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java b/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java deleted file mode 100644 index e81adc791d..0000000000 --- a/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java +++ /dev/null @@ -1,57 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.framing; - -import org.apache.mina.common.ByteBuffer; - -public class ContentHeaderPropertiesFactory -{ - private static final ContentHeaderPropertiesFactory _instance = new ContentHeaderPropertiesFactory(); - - public static ContentHeaderPropertiesFactory getInstance() - { - return _instance; - } - - private ContentHeaderPropertiesFactory() - { - } - - public ContentHeaderProperties createContentHeaderProperties(int classId, int propertyFlags, - ByteBuffer buffer, int size) - throws AMQFrameDecodingException, AMQProtocolVersionException - { - ContentHeaderProperties properties; - // AMQP version change: "Hardwired" version to major=0, minor=9 - // TODO: Change so that the actual version is obtained from - // the ProtocolInitiation object for this session. - if (classId == BasicConsumeBody.getClazz((byte)0, (byte)9)) - { - properties = new BasicContentHeaderProperties(); - } - else - { - throw new AMQFrameDecodingException("Unsupport content header class id: " + classId); - } - properties.populatePropertiesFromBuffer(buffer, propertyFlags, size); - return properties; - } -} diff --git a/java/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java b/java/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java index ebda2c5d2b..1c3faf49f8 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java +++ b/java/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java @@ -97,13 +97,11 @@ public class EncodingUtils return (int) table.getEncodedSize() + 4; } } - - public static int encodedContentLength(Content table) + + public static int encodedContentLength(Content content) { - // TODO: New Content class required for AMQP 0-9. - return 0; + return (int)content.getEncodedSize(); } - public static void writeShortStringBytes(ByteBuffer buffer, String s) { @@ -240,7 +238,7 @@ public class EncodingUtils public static void writeContentBytes(ByteBuffer buffer, Content content) { - // TODO: New Content class required for AMQP 0-9. + content.writePayload(buffer); } public static void writeBooleans(ByteBuffer buffer, boolean[] values) @@ -309,8 +307,10 @@ public class EncodingUtils public static Content readContent(ByteBuffer buffer) throws AMQFrameDecodingException { - // TODO: New Content class required for AMQP 0-9. - return null; + long length = buffer.getUnsignedInt(); + Content content = new Content(); + content.populateFromBuffer(buffer, length); + return content; } public static String readShortString(ByteBuffer buffer) diff --git a/java/common/src/test/java/org/apache/qpid/framing/BasicContentHeaderPropertiesTest.java b/java/common/src/test/java/org/apache/qpid/framing/BasicContentHeaderPropertiesTest.java deleted file mode 100644 index ffbdf730a9..0000000000 --- a/java/common/src/test/java/org/apache/qpid/framing/BasicContentHeaderPropertiesTest.java +++ /dev/null @@ -1,190 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.framing; - -import org.apache.mina.common.ByteBuffer; - -import java.util.HashMap; - -import junit.framework.TestCase; - - -public class BasicContentHeaderPropertiesTest extends TestCase -{ - - BasicContentHeaderProperties _testProperties; - FieldTable _testTable; - String _testString = "This is a test string"; - int _testint = 666; - - /** - * Currently only test setting/getting String, int and boolean props - */ - public BasicContentHeaderPropertiesTest() - { - _testProperties = new BasicContentHeaderProperties(); - } - - public void setUp() - { - _testTable = new FieldTable(); - _testTable.setString("TestString", _testString); - _testTable.setInteger("Testint", _testint); - _testProperties = new BasicContentHeaderProperties(); - _testProperties.setHeaders(_testTable); - } - - public void testGetPropertyListSize() - { - //needs a better test but at least we're exercising the code ! - // FT length is encoded in an int - int expectedSize = EncodingUtils.encodedIntegerLength(); - - expectedSize += EncodingUtils.encodedShortStringLength("TestInt"); - // 1 is for the Encoding Letter. here an 'i' - expectedSize += 1 + EncodingUtils.encodedIntegerLength(); - - expectedSize += EncodingUtils.encodedShortStringLength("TestString"); - // 1 is for the Encoding Letter. here an 'S' - expectedSize += 1 + EncodingUtils.encodedLongStringLength(_testString); - - - int size = _testProperties.getPropertyListSize(); - - assertEquals(expectedSize, size); - } - - public void testGetSetPropertyFlags() - { - _testProperties.setPropertyFlags(99); - assertEquals(99, _testProperties.getPropertyFlags()); - } - - public void testWritePropertyListPayload() - { - ByteBuffer buf = ByteBuffer.allocate(300); - _testProperties.writePropertyListPayload(buf); - } - - public void testPopulatePropertiesFromBuffer() throws Exception - { - ByteBuffer buf = ByteBuffer.allocate(300); - _testProperties.populatePropertiesFromBuffer(buf, 99, 99); - } - - public void testSetGetContentType() - { - String contentType = "contentType"; - _testProperties.setContentType(contentType); - assertEquals(contentType, _testProperties.getContentType()); - } - - public void testSetGetEncoding() - { - String encoding = "encoding"; - _testProperties.setEncoding(encoding); - assertEquals(encoding, _testProperties.getEncoding()); - } - - public void testSetGetHeaders() - { - _testProperties.setHeaders(_testTable); - assertEquals(_testTable, _testProperties.getHeaders()); - } - - public void testSetGetDeliveryMode() - { - byte deliveryMode = 1; - _testProperties.setDeliveryMode(deliveryMode); - assertEquals(deliveryMode, _testProperties.getDeliveryMode()); - } - - public void testSetGetPriority() - { - byte priority = 1; - _testProperties.setPriority(priority); - assertEquals(priority, _testProperties.getPriority()); - } - - public void testSetGetCorrelationId() - { - String correlationId = "correlationId"; - _testProperties.setCorrelationId(correlationId); - assertEquals(correlationId, _testProperties.getCorrelationId()); - } - - public void testSetGetReplyTo() - { - String replyTo = "replyTo"; - _testProperties.setReplyTo(replyTo); - assertEquals(replyTo, _testProperties.getReplyTo()); - } - - public void testSetGetExpiration() - { - long expiration = 999999999; - _testProperties.setExpiration(expiration); - assertEquals(expiration, _testProperties.getExpiration()); - } - - public void testSetGetMessageId() - { - String messageId = "messageId"; - _testProperties.setMessageId(messageId); - assertEquals(messageId, _testProperties.getMessageId()); - } - - public void testSetGetTimestamp() - { - long timestamp = 999999999; - _testProperties.setTimestamp(timestamp); - assertEquals(timestamp, _testProperties.getTimestamp()); - } - - public void testSetGetType() - { - String type = "type"; - _testProperties.setType(type); - assertEquals(type, _testProperties.getType()); - } - - public void testSetGetUserId() - { - String userId = "userId"; - _testProperties.setUserId(userId); - assertEquals(userId, _testProperties.getUserId()); - } - - public void testSetGetAppId() - { - String appId = "appId"; - _testProperties.setAppId(appId); - assertEquals(appId, _testProperties.getAppId()); - } - - public void testSetGetClusterId() - { - String clusterId = "clusterId"; - _testProperties.setClusterId(clusterId); - assertEquals(clusterId, _testProperties.getClusterId()); - } - -} |
