From 2bdb82202071953aaaeadced1cbc94c2dec9a0a5 Mon Sep 17 00:00:00 2001 From: Robert Godfrey Date: Mon, 26 Oct 2009 20:37:36 +0000 Subject: Added AMQP 0-9-1 support git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@829944 13f79535-47bb-0310-9956-ffa450edef68 --- .../qpid/server/handler/AccessRequestHandler.java | 60 ++- .../handler/BasicRecoverSyncMethodHandler.java | 50 ++- .../qpid/server/handler/ChannelOpenHandler.java | 25 ++ .../handler/ConnectionOpenMethodHandler.java | 7 +- .../server/handler/ServerMethodDispatcherImpl.java | 8 + .../handler/ServerMethodDispatcherImpl_0_91.java | 168 ++++++++ .../output/ProtocolOutputConverterRegistry.java | 1 + .../amqp0_9_1/ProtocolOutputConverterImpl.java | 383 +++++++++++++++++ .../java/org/apache/qpid/client/AMQConnection.java | 2 +- .../qpid/client/AMQConnectionDelegate_9_1.java | 32 ++ .../org/apache/qpid/client/AMQSession_0_8.java | 6 + .../client/handler/ClientMethodDispatcherImpl.java | 10 + .../handler/ClientMethodDispatcherImpl_0_91.java | 158 +++++++ .../qpid/client/protocol/AMQProtocolHandler.java | 8 +- java/common/build.xml | 4 +- java/common/protocol-version.xml | 4 +- .../org/apache/qpid/framing/AMQMethodBodyImpl.java | 162 +++++++ .../apache/qpid/framing/ProtocolInitiation.java | 23 +- .../qpid/framing/amqp_0_9/AMQMethodBody_0_9.java | 174 +------- .../qpid/framing/amqp_0_91/AMQMethodBody_0_91.java | 37 ++ .../framing/amqp_0_91/MethodConverter_0_91.java | 132 ++++++ .../qpid/framing/amqp_8_0/AMQMethodBody_8_0.java | 169 -------- specs/amqp0-9-1.stripped.xml | 477 +++++++++++++++++++++ 23 files changed, 1702 insertions(+), 398 deletions(-) create mode 100644 java/broker/src/main/java/org/apache/qpid/server/handler/ServerMethodDispatcherImpl_0_91.java create mode 100644 java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9_1/ProtocolOutputConverterImpl.java create mode 100755 java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_9_1.java create mode 100644 java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_0_91.java create mode 100644 java/common/src/main/java/org/apache/qpid/framing/amqp_0_91/AMQMethodBody_0_91.java create mode 100644 java/common/src/main/java/org/apache/qpid/framing/amqp_0_91/MethodConverter_0_91.java create mode 100644 specs/amqp0-9-1.stripped.xml diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/AccessRequestHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/AccessRequestHandler.java index e64eaeae76..d587ef0c16 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/AccessRequestHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/AccessRequestHandler.java @@ -1,31 +1,34 @@ package org.apache.qpid.server.handler; -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + import org.apache.qpid.framing.*; +import org.apache.qpid.framing.amqp_8_0.MethodRegistry_8_0; +import org.apache.qpid.framing.amqp_0_9.MethodRegistry_0_9; import org.apache.qpid.server.state.StateAwareMethodListener; import org.apache.qpid.server.state.AMQStateManager; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.AMQException; +import org.apache.qpid.protocol.AMQConstant; /** * @author Apache Software Foundation @@ -54,7 +57,20 @@ public class AccessRequestHandler implements StateAwareMethodListener maxBodySize ? maxBodySize : bodySize; + java.nio.ByteBuffer buf = java.nio.ByteBuffer.allocate(capacity); + + int writtenSize = 0; + + + writtenSize += message.getContent(buf, writtenSize); + buf.flip(); + AMQBody firstContentBody = PROTOCOL_CONVERTER.convertToBody(buf); + + CompositeAMQBodyBlock + compositeBlock = new CompositeAMQBodyBlock(channelId, deliverBody, contentHeaderBody, firstContentBody); + writeFrame(compositeBlock); + + while(writtenSize < bodySize) + { + buf = java.nio.ByteBuffer.allocate(capacity); + + writtenSize += message.getContent(buf, writtenSize); + buf.flip(); + writeFrame(new AMQFrame(channelId, PROTOCOL_CONVERTER.convertToBody(buf))); + } + } + } + + private AMQDataBlock createContentHeaderBlock(final int channelId, final ContentHeaderBody contentHeaderBody) + { + + AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId, + contentHeaderBody); + return contentHeader; + } + + + public void writeGetOk(QueueEntry entry, int channelId, long deliveryTag, int queueSize) throws AMQException + { + AMQBody deliver = createEncodedGetOkBody(entry, deliveryTag, queueSize); + writeMessageDelivery(entry, channelId, deliver); + } + + + private AMQBody createEncodedDeliverBody(QueueEntry entry, + final long deliveryTag, + final AMQShortString consumerTag) + throws AMQException + { + + final AMQShortString exchangeName; + final AMQShortString routingKey; + + if(entry.getMessage() instanceof AMQMessage) + { + final AMQMessage message = (AMQMessage) entry.getMessage(); + final MessagePublishInfo pb = message.getMessagePublishInfo(); + exchangeName = pb.getExchange(); + routingKey = pb.getRoutingKey(); + } + else + { + MessageTransferMessage message = (MessageTransferMessage) entry.getMessage(); + DeliveryProperties delvProps = message.getHeader().get(DeliveryProperties.class); + exchangeName = (delvProps == null || delvProps.getExchange() == null) ? null : new AMQShortString(delvProps.getExchange()); + routingKey = (delvProps == null || delvProps.getRoutingKey() == null) ? null : new AMQShortString(delvProps.getRoutingKey()); + } + + final boolean isRedelivered = entry.isRedelivered(); + + final AMQBody returnBlock = new AMQBody() + { + + public AMQBody _underlyingBody; + + public AMQBody createAMQBody() + { + return METHOD_REGISTRY.createBasicDeliverBody(consumerTag, + deliveryTag, + isRedelivered, + exchangeName, + routingKey); + + + + + + } + + public byte getFrameType() + { + return AMQMethodBody.TYPE; + } + + public int getSize() + { + if(_underlyingBody == null) + { + _underlyingBody = createAMQBody(); + } + return _underlyingBody.getSize(); + } + + public void writePayload(ByteBuffer buffer) + { + if(_underlyingBody == null) + { + _underlyingBody = createAMQBody(); + } + _underlyingBody.writePayload(buffer); + } + + public void handle(final int channelId, final AMQVersionAwareProtocolSession amqMinaProtocolSession) + throws AMQException + { + throw new AMQException("This block should never be dispatched!"); + } + }; + return returnBlock; + } + + private AMQBody createEncodedGetOkBody(QueueEntry entry, long deliveryTag, int queueSize) + throws AMQException + { + final AMQShortString exchangeName; + final AMQShortString routingKey; + + if(entry.getMessage() instanceof AMQMessage) + { + final AMQMessage message = (AMQMessage) entry.getMessage(); + final MessagePublishInfo pb = message.getMessagePublishInfo(); + exchangeName = pb.getExchange(); + routingKey = pb.getRoutingKey(); + } + else + { + MessageTransferMessage message = (MessageTransferMessage) entry.getMessage(); + DeliveryProperties delvProps = message.getHeader().get(DeliveryProperties.class); + exchangeName = (delvProps == null || delvProps.getExchange() == null) ? null : new AMQShortString(delvProps.getExchange()); + routingKey = (delvProps == null || delvProps.getRoutingKey() == null) ? null : new AMQShortString(delvProps.getRoutingKey()); + } + + final boolean isRedelivered = entry.isRedelivered(); + + BasicGetOkBody getOkBody = + METHOD_REGISTRY.createBasicGetOkBody(deliveryTag, + isRedelivered, + exchangeName, + routingKey, + queueSize); + + return getOkBody; + } + + public byte getProtocolMinorVersion() + { + return getProtocolSession().getProtocolMinorVersion(); + } + + public byte getProtocolMajorVersion() + { + return getProtocolSession().getProtocolMajorVersion(); + } + + private AMQBody createEncodedReturnFrame(MessagePublishInfo messagePublishInfo, + int replyCode, + AMQShortString replyText) throws AMQException + { + + BasicReturnBody basicReturnBody = + METHOD_REGISTRY.createBasicReturnBody(replyCode, + replyText, + messagePublishInfo.getExchange(), + messagePublishInfo.getRoutingKey()); + + + return basicReturnBody; + } + + public void writeReturn(MessagePublishInfo messagePublishInfo, ContentHeaderBody header, MessageContentSource message, int channelId, int replyCode, AMQShortString replyText) + throws AMQException + { + + AMQBody returnFrame = createEncodedReturnFrame(messagePublishInfo, replyCode, replyText); + + writeMessageDelivery(message, header, channelId, returnFrame); + } + + + public void writeFrame(AMQDataBlock block) + { + getProtocolSession().writeFrame(block); + } + + + public void confirmConsumerAutoClose(int channelId, AMQShortString consumerTag) + { + + BasicCancelOkBody basicCancelOkBody = METHOD_REGISTRY.createBasicCancelOkBody(consumerTag); + writeFrame(basicCancelOkBody.generateFrame(channelId)); + + } + + + public static final class CompositeAMQBodyBlock extends AMQDataBlock + { + public static final int OVERHEAD = 3 * AMQFrame.getFrameOverhead(); + + private final AMQBody _methodBody; + private final AMQBody _headerBody; + private final AMQBody _contentBody; + private final int _channel; + + + public CompositeAMQBodyBlock(int channel, AMQBody methodBody, AMQBody headerBody, AMQBody contentBody) + { + _channel = channel; + _methodBody = methodBody; + _headerBody = headerBody; + _contentBody = contentBody; + + } + + public long getSize() + { + return OVERHEAD + _methodBody.getSize() + _headerBody.getSize() + _contentBody.getSize(); + } + + public void writePayload(ByteBuffer buffer) + { + AMQFrame.writeFrames(buffer, _channel, _methodBody, _headerBody, _contentBody); + } + } + + public static final class SmallCompositeAMQBodyBlock extends AMQDataBlock + { + public static final int OVERHEAD = 2 * AMQFrame.getFrameOverhead(); + + private final AMQBody _methodBody; + private final AMQBody _headerBody; + private final int _channel; + + + public SmallCompositeAMQBodyBlock(int channel, AMQBody methodBody, AMQBody headerBody) + { + _channel = channel; + _methodBody = methodBody; + _headerBody = headerBody; + + } + + public long getSize() + { + return OVERHEAD + _methodBody.getSize() + _headerBody.getSize() ; + } + + public void writePayload(ByteBuffer buffer) + { + AMQFrame.writeFrames(buffer, _channel, _methodBody, _headerBody); + } + } + +} \ No newline at end of file diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index b57c834598..461f4d01c4 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -308,7 +308,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect /** Thread Pool for executing connection level processes. Such as returning bounced messages. */ private final ExecutorService _taskPool = Executors.newCachedThreadPool(); private static final long DEFAULT_TIMEOUT = 1000 * 30; - private ProtocolVersion _protocolVersion = ProtocolVersion.v0_9; // FIXME TGM, shouldn't need this + private ProtocolVersion _protocolVersion = ProtocolVersion.v0_91; // FIXME TGM, shouldn't need this protected AMQConnectionDelegate _delegate; diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_9_1.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_9_1.java new file mode 100755 index 0000000000..1bb93f66a3 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_9_1.java @@ -0,0 +1,32 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.client; + + +public class AMQConnectionDelegate_9_1 extends AMQConnectionDelegate_8_0 +{ + + public AMQConnectionDelegate_9_1(AMQConnection conn) + { + super(conn); + } + +} \ No newline at end of file diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java index bc1453beaf..862e23385a 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java @@ -36,6 +36,7 @@ import org.apache.qpid.client.state.listener.SpecificMethodFrameListener; import org.apache.qpid.client.state.AMQState; import org.apache.qpid.common.AMQPFilterTypes; import org.apache.qpid.framing.*; +import org.apache.qpid.framing.amqp_0_91.MethodRegistry_0_91; import org.apache.qpid.framing.amqp_0_9.MethodRegistry_0_9; import org.apache.qpid.jms.Session; import org.apache.qpid.protocol.AMQConstant; @@ -186,6 +187,11 @@ public final class AMQSession_0_8 extends AMQSession - - + + diff --git a/java/common/protocol-version.xml b/java/common/protocol-version.xml index ee9db6049b..5435a0a582 100644 --- a/java/common/protocol-version.xml +++ b/java/common/protocol-version.xml @@ -27,8 +27,8 @@ - - + + diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java b/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java index ad7f36f790..cd3d721065 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java @@ -93,4 +93,166 @@ public abstract class AMQMethodBodyImpl implements AMQMethodBody session.methodFrameReceived(channelId, this); } + public int getSize() + { + return 2 + 2 + getBodySize(); + } + + public void writePayload(ByteBuffer buffer) + { + EncodingUtils.writeUnsignedShort(buffer, getClazz()); + EncodingUtils.writeUnsignedShort(buffer, getMethod()); + writeMethodPayload(buffer); + } + + + protected byte readByte(ByteBuffer buffer) + { + return buffer.get(); + } + + protected AMQShortString readAMQShortString(ByteBuffer buffer) + { + return EncodingUtils.readAMQShortString(buffer); + } + + protected int getSizeOf(AMQShortString string) + { + return EncodingUtils.encodedShortStringLength(string); + } + + protected void writeByte(ByteBuffer buffer, byte b) + { + buffer.put(b); + } + + protected void writeAMQShortString(ByteBuffer buffer, AMQShortString string) + { + EncodingUtils.writeShortStringBytes(buffer, string); + } + + protected int readInt(ByteBuffer buffer) + { + return buffer.getInt(); + } + + protected void writeInt(ByteBuffer buffer, int i) + { + buffer.putInt(i); + } + + protected FieldTable readFieldTable(ByteBuffer buffer) throws AMQFrameDecodingException + { + return EncodingUtils.readFieldTable(buffer); + } + + protected int getSizeOf(FieldTable table) + { + return EncodingUtils.encodedFieldTableLength(table); //To change body of created methods use File | Settings | File Templates. + } + + protected void writeFieldTable(ByteBuffer buffer, FieldTable table) + { + EncodingUtils.writeFieldTableBytes(buffer, table); + } + + protected long readLong(ByteBuffer buffer) + { + return buffer.getLong(); + } + + protected void writeLong(ByteBuffer buffer, long l) + { + buffer.putLong(l); + } + + protected int getSizeOf(byte[] response) + { + return (response == null) ? 4 : response.length + 4; + } + + protected void writeBytes(ByteBuffer buffer, byte[] data) + { + EncodingUtils.writeBytes(buffer,data); + } + + protected byte[] readBytes(ByteBuffer buffer) + { + return EncodingUtils.readBytes(buffer); + } + + protected short readShort(ByteBuffer buffer) + { + return EncodingUtils.readShort(buffer); + } + + protected void writeShort(ByteBuffer buffer, short s) + { + EncodingUtils.writeShort(buffer, s); + } + + protected Content readContent(ByteBuffer buffer) + { + return null; //To change body of created methods use File | Settings | File Templates. + } + + protected int getSizeOf(Content body) + { + return 0; //To change body of created methods use File | Settings | File Templates. + } + + protected void writeContent(ByteBuffer buffer, Content body) + { + //To change body of created methods use File | Settings | File Templates. + } + + protected byte readBitfield(ByteBuffer buffer) + { + return readByte(buffer); //To change body of created methods use File | Settings | File Templates. + } + + protected int readUnsignedShort(ByteBuffer buffer) + { + return buffer.getUnsignedShort(); //To change body of created methods use File | Settings | File Templates. + } + + protected void writeBitfield(ByteBuffer buffer, byte bitfield0) + { + buffer.put(bitfield0); + } + + protected void writeUnsignedShort(ByteBuffer buffer, int s) + { + EncodingUtils.writeUnsignedShort(buffer, s); + } + + protected long readUnsignedInteger(ByteBuffer buffer) + { + return buffer.getUnsignedInt(); + } + protected void writeUnsignedInteger(ByteBuffer buffer, long i) + { + EncodingUtils.writeUnsignedInteger(buffer, i); + } + + + protected short readUnsignedByte(ByteBuffer buffer) + { + return buffer.getUnsigned(); + } + + protected void writeUnsignedByte(ByteBuffer buffer, short unsignedByte) + { + EncodingUtils.writeUnsignedByte(buffer, unsignedByte); + } + + protected long readTimestamp(ByteBuffer buffer) + { + return EncodingUtils.readTimestamp(buffer); + } + + protected void writeTimestamp(ByteBuffer buffer, long t) + { + EncodingUtils.writeTimestamp(buffer, t); + } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java b/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java index cf8a866e47..d503ed7ea3 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java +++ b/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java @@ -54,7 +54,10 @@ public class ProtocolInitiation extends AMQDataBlock implements EncodableAMQData public ProtocolInitiation(ProtocolVersion pv) { - this(AMQP_HEADER, CURRENT_PROTOCOL_CLASS, TCP_PROTOCOL_INSTANCE, pv.getMajorVersion(), pv.getMinorVersion()); + this(AMQP_HEADER, CURRENT_PROTOCOL_CLASS, + pv.equals(ProtocolVersion.v0_91) ? 0 : TCP_PROTOCOL_INSTANCE, + pv.equals(ProtocolVersion.v0_91) ? 9 : pv.getMajorVersion(), + pv.equals(ProtocolVersion.v0_91) ? 1 : pv.getMinorVersion()); } public ProtocolInitiation(ByteBuffer in) @@ -124,7 +127,6 @@ public class ProtocolInitiation extends AMQDataBlock implements EncodableAMQData { /** * - * @param session the session * @param in input buffer * @return true if we have enough data to decode the PI frame fully, false if more * data is required @@ -162,13 +164,24 @@ public class ProtocolInitiation extends AMQDataBlock implements EncodableAMQData throw new AMQProtocolClassException("Protocol class " + CURRENT_PROTOCOL_CLASS + " was expected; received " + _protocolClass, null); } - if (_protocolInstance != TCP_PROTOCOL_INSTANCE) + + ProtocolVersion pv; + + // Hack for 0-9-1 which changed how the header was defined + if(_protocolInstance == 0 && _protocolMajor == 9 && _protocolMinor == 1) + { + pv = ProtocolVersion.v0_91; + + } + else if (_protocolInstance != TCP_PROTOCOL_INSTANCE) { throw new AMQProtocolInstanceException("Protocol instance " + TCP_PROTOCOL_INSTANCE + " was expected; received " + _protocolInstance, null); } - - ProtocolVersion pv = new ProtocolVersion(_protocolMajor, _protocolMinor); + else + { + pv = new ProtocolVersion(_protocolMajor, _protocolMinor); + } if (!pv.isSupported()) diff --git a/java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/AMQMethodBody_0_9.java b/java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/AMQMethodBody_0_9.java index 948f5baaf6..8d51343507 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/AMQMethodBody_0_9.java +++ b/java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/AMQMethodBody_0_9.java @@ -21,14 +21,6 @@ package org.apache.qpid.framing.amqp_0_9; -import org.apache.qpid.framing.EncodingUtils; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.framing.AMQFrameDecodingException; -import org.apache.qpid.framing.Content; - -import org.apache.mina.common.ByteBuffer; - public abstract class AMQMethodBody_0_9 extends org.apache.qpid.framing.AMQMethodBodyImpl { @@ -40,170 +32,6 @@ public abstract class AMQMethodBody_0_9 extends org.apache.qpid.framing.AMQMetho public byte getMinor() { return 9; - } - - public int getSize() - { - return 2 + 2 + getBodySize(); - } - - public void writePayload(ByteBuffer buffer) - { - EncodingUtils.writeUnsignedShort(buffer, getClazz()); - EncodingUtils.writeUnsignedShort(buffer, getMethod()); - writeMethodPayload(buffer); - } - - - protected byte readByte(ByteBuffer buffer) - { - return buffer.get(); - } - - protected AMQShortString readAMQShortString(ByteBuffer buffer) - { - return EncodingUtils.readAMQShortString(buffer); - } - - protected int getSizeOf(AMQShortString string) - { - return EncodingUtils.encodedShortStringLength(string); - } - - protected void writeByte(ByteBuffer buffer, byte b) - { - buffer.put(b); - } - - protected void writeAMQShortString(ByteBuffer buffer, AMQShortString string) - { - EncodingUtils.writeShortStringBytes(buffer, string); - } - - protected int readInt(ByteBuffer buffer) - { - return buffer.getInt(); - } - - protected void writeInt(ByteBuffer buffer, int i) - { - buffer.putInt(i); - } - - protected FieldTable readFieldTable(ByteBuffer buffer) throws AMQFrameDecodingException - { - return EncodingUtils.readFieldTable(buffer); - } - - protected int getSizeOf(FieldTable table) - { - return EncodingUtils.encodedFieldTableLength(table); //To change body of created methods use File | Settings | File Templates. - } - - protected void writeFieldTable(ByteBuffer buffer, FieldTable table) - { - EncodingUtils.writeFieldTableBytes(buffer, table); - } - - protected long readLong(ByteBuffer buffer) - { - return buffer.getLong(); - } - - protected void writeLong(ByteBuffer buffer, long l) - { - buffer.putLong(l); - } - - protected int getSizeOf(byte[] response) - { - return (response == null) ? 4 :response.length + 4; - } - - protected void writeBytes(ByteBuffer buffer, byte[] data) - { - EncodingUtils.writeBytes(buffer,data); - } - - protected byte[] readBytes(ByteBuffer buffer) - { - return EncodingUtils.readBytes(buffer); - } - - protected short readShort(ByteBuffer buffer) - { - return EncodingUtils.readShort(buffer); - } - - protected void writeShort(ByteBuffer buffer, short s) - { - EncodingUtils.writeShort(buffer, s); - } - - protected Content readContent(ByteBuffer buffer) - { - return null; //To change body of created methods use File | Settings | File Templates. - } - - protected int getSizeOf(Content body) - { - return 0; //To change body of created methods use File | Settings | File Templates. - } - - protected void writeContent(ByteBuffer buffer, Content body) - { - //To change body of created methods use File | Settings | File Templates. - } - - protected byte readBitfield(ByteBuffer buffer) - { - return readByte(buffer); //To change body of created methods use File | Settings | File Templates. - } - - protected int readUnsignedShort(ByteBuffer buffer) - { - return buffer.getUnsignedShort(); //To change body of created methods use File | Settings | File Templates. - } - - protected void writeBitfield(ByteBuffer buffer, byte bitfield0) - { - buffer.put(bitfield0); - } - - protected void writeUnsignedShort(ByteBuffer buffer, int s) - { - EncodingUtils.writeUnsignedShort(buffer, s); - } - - protected long readUnsignedInteger(ByteBuffer buffer) - { - return buffer.getUnsignedInt(); - } - protected void writeUnsignedInteger(ByteBuffer buffer, long i) - { - EncodingUtils.writeUnsignedInteger(buffer, i); - } - - - protected short readUnsignedByte(ByteBuffer buffer) - { - return buffer.getUnsigned(); - } - - protected void writeUnsignedByte(ByteBuffer buffer, short unsignedByte) - { - EncodingUtils.writeUnsignedByte(buffer, unsignedByte); - } - - protected long readTimestamp(ByteBuffer buffer) - { - return EncodingUtils.readTimestamp(buffer); - } - - protected void writeTimestamp(ByteBuffer buffer, long t) - { - EncodingUtils.writeTimestamp(buffer, t); - } - + } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/amqp_0_91/AMQMethodBody_0_91.java b/java/common/src/main/java/org/apache/qpid/framing/amqp_0_91/AMQMethodBody_0_91.java new file mode 100644 index 0000000000..60b8a7e1a6 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/framing/amqp_0_91/AMQMethodBody_0_91.java @@ -0,0 +1,37 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.framing.amqp_0_91; + +public abstract class AMQMethodBody_0_91 extends org.apache.qpid.framing.AMQMethodBodyImpl +{ + + public byte getMajor() + { + return 0; + } + + public byte getMinor() + { + return 91; + } + +} \ No newline at end of file diff --git a/java/common/src/main/java/org/apache/qpid/framing/amqp_0_91/MethodConverter_0_91.java b/java/common/src/main/java/org/apache/qpid/framing/amqp_0_91/MethodConverter_0_91.java new file mode 100644 index 0000000000..6e330574bc --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/framing/amqp_0_91/MethodConverter_0_91.java @@ -0,0 +1,132 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.framing.amqp_0_91; + +import org.apache.mina.common.ByteBuffer; + +import org.apache.qpid.framing.abstraction.AbstractMethodConverter; +import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter; +import org.apache.qpid.framing.abstraction.ContentChunk; +import org.apache.qpid.framing.abstraction.MessagePublishInfo; +import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl; +import org.apache.qpid.framing.*; + +public class MethodConverter_0_91 extends AbstractMethodConverter implements ProtocolVersionMethodConverter +{ + private int _basicPublishClassId; + private int _basicPublishMethodId; + + public MethodConverter_0_91() + { + super((byte)0,(byte)9); + + + } + + public AMQBody convertToBody(ContentChunk contentChunk) + { + if(contentChunk instanceof ContentChunk_0_9) + { + return ((ContentChunk_0_9)contentChunk).toBody(); + } + else + { + return new ContentBody(contentChunk.getData()); + } + } + + public ContentChunk convertToContentChunk(AMQBody body) + { + final ContentBody contentBodyChunk = (ContentBody) body; + + return new ContentChunk_0_9(contentBodyChunk); + + } + + public void configure() + { + + _basicPublishClassId = BasicPublishBodyImpl.CLASS_ID; + _basicPublishMethodId = BasicPublishBodyImpl.METHOD_ID; + + } + + public AMQBody convertToBody(java.nio.ByteBuffer buf) + { + return new ContentBody(ByteBuffer.wrap(buf)); + } + + public MessagePublishInfo convertToInfo(AMQMethodBody methodBody) + { + final BasicPublishBody publishBody = ((BasicPublishBody) methodBody); + + final AMQShortString exchange = publishBody.getExchange(); + final AMQShortString routingKey = publishBody.getRoutingKey(); + + return new MessagePublishInfoImpl(exchange, + publishBody.getImmediate(), + publishBody.getMandatory(), + routingKey); + + } + + public AMQMethodBody convertToBody(MessagePublishInfo info) + { + + return new BasicPublishBodyImpl(0, + info.getExchange(), + info.getRoutingKey(), + info.isMandatory(), + info.isImmediate()) ; + + } + + private static class ContentChunk_0_9 implements ContentChunk + { + private final ContentBody _contentBodyChunk; + + public ContentChunk_0_9(final ContentBody contentBodyChunk) + { + _contentBodyChunk = contentBodyChunk; + } + + public int getSize() + { + return _contentBodyChunk.getSize(); + } + + public ByteBuffer getData() + { + return _contentBodyChunk.payload; + } + + public void reduceToFit() + { + _contentBodyChunk.reduceBufferToFit(); + } + + public AMQBody toBody() + { + return _contentBodyChunk; + } + } +} \ No newline at end of file diff --git a/java/common/src/main/java/org/apache/qpid/framing/amqp_8_0/AMQMethodBody_8_0.java b/java/common/src/main/java/org/apache/qpid/framing/amqp_8_0/AMQMethodBody_8_0.java index e9b4447140..35645854c0 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/amqp_8_0/AMQMethodBody_8_0.java +++ b/java/common/src/main/java/org/apache/qpid/framing/amqp_8_0/AMQMethodBody_8_0.java @@ -21,14 +21,6 @@ package org.apache.qpid.framing.amqp_8_0; -import org.apache.qpid.framing.EncodingUtils; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.framing.AMQFrameDecodingException; -import org.apache.qpid.framing.Content; - -import org.apache.mina.common.ByteBuffer; - public abstract class AMQMethodBody_8_0 extends org.apache.qpid.framing.AMQMethodBodyImpl { @@ -42,168 +34,7 @@ public abstract class AMQMethodBody_8_0 extends org.apache.qpid.framing.AMQMetho return 0; } - public int getSize() - { - return 2 + 2 + getBodySize(); - } - - public void writePayload(ByteBuffer buffer) - { - EncodingUtils.writeUnsignedShort(buffer, getClazz()); - EncodingUtils.writeUnsignedShort(buffer, getMethod()); - writeMethodPayload(buffer); - } - - protected byte readByte(ByteBuffer buffer) - { - return buffer.get(); - } - - protected AMQShortString readAMQShortString(ByteBuffer buffer) - { - return EncodingUtils.readAMQShortString(buffer); - } - - protected int getSizeOf(AMQShortString string) - { - return EncodingUtils.encodedShortStringLength(string); - } - - protected void writeByte(ByteBuffer buffer, byte b) - { - buffer.put(b); - } - - protected void writeAMQShortString(ByteBuffer buffer, AMQShortString string) - { - EncodingUtils.writeShortStringBytes(buffer, string); - } - - protected int readInt(ByteBuffer buffer) - { - return buffer.getInt(); - } - - protected void writeInt(ByteBuffer buffer, int i) - { - buffer.putInt(i); - } - - protected FieldTable readFieldTable(ByteBuffer buffer) throws AMQFrameDecodingException - { - return EncodingUtils.readFieldTable(buffer); - } - - protected int getSizeOf(FieldTable table) - { - return EncodingUtils.encodedFieldTableLength(table); //To change body of created methods use File | Settings | File Templates. - } - - protected void writeFieldTable(ByteBuffer buffer, FieldTable table) - { - EncodingUtils.writeFieldTableBytes(buffer, table); - } - - protected long readLong(ByteBuffer buffer) - { - return buffer.getLong(); - } - - protected void writeLong(ByteBuffer buffer, long l) - { - buffer.putLong(l); - } - - protected int getSizeOf(byte[] response) - { - return (response == null) ? 4 : response.length + 4; - } - - protected void writeBytes(ByteBuffer buffer, byte[] data) - { - EncodingUtils.writeBytes(buffer,data); - } - - protected byte[] readBytes(ByteBuffer buffer) - { - return EncodingUtils.readBytes(buffer); - } - - protected short readShort(ByteBuffer buffer) - { - return EncodingUtils.readShort(buffer); - } - - protected void writeShort(ByteBuffer buffer, short s) - { - EncodingUtils.writeShort(buffer, s); - } - - protected Content readContent(ByteBuffer buffer) - { - return null; //To change body of created methods use File | Settings | File Templates. - } - - protected int getSizeOf(Content body) - { - return 0; //To change body of created methods use File | Settings | File Templates. - } - - protected void writeContent(ByteBuffer buffer, Content body) - { - //To change body of created methods use File | Settings | File Templates. - } - - protected byte readBitfield(ByteBuffer buffer) - { - return readByte(buffer); //To change body of created methods use File | Settings | File Templates. - } - - protected int readUnsignedShort(ByteBuffer buffer) - { - return buffer.getUnsignedShort(); //To change body of created methods use File | Settings | File Templates. - } - - protected void writeBitfield(ByteBuffer buffer, byte bitfield0) - { - buffer.put(bitfield0); - } - - protected void writeUnsignedShort(ByteBuffer buffer, int s) - { - EncodingUtils.writeUnsignedShort(buffer, s); - } - - protected long readUnsignedInteger(ByteBuffer buffer) - { - return buffer.getUnsignedInt(); - } - protected void writeUnsignedInteger(ByteBuffer buffer, long i) - { - EncodingUtils.writeUnsignedInteger(buffer, i); - } - - - protected short readUnsignedByte(ByteBuffer buffer) - { - return buffer.getUnsigned(); - } - - protected void writeUnsignedByte(ByteBuffer buffer, short unsignedByte) - { - EncodingUtils.writeUnsignedByte(buffer, unsignedByte); - } - - protected long readTimestamp(ByteBuffer buffer) - { - return EncodingUtils.readTimestamp(buffer); - } - - protected void writeTimestamp(ByteBuffer buffer, long t) - { - EncodingUtils.writeTimestamp(buffer, t); - } } diff --git a/specs/amqp0-9-1.stripped.xml b/specs/amqp0-9-1.stripped.xml new file mode 100644 index 0000000000..ec55c8dd7a --- /dev/null +++ b/specs/amqp0-9-1.stripped.xml @@ -0,0 +1,477 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + -- cgit v1.2.1