From a22f3f594d6eee7d610fb4f140e18cddd7c880f6 Mon Sep 17 00:00:00 2001 From: Kim van der Riet Date: Wed, 14 Feb 2007 20:02:03 +0000 Subject: First backmerge from trunk to 0-9 branch for Java. Not all java tests passing yet git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@507672 13f79535-47bb-0310-9956-ffa450edef68 --- .../java/org/apache/qpid/AMQChannelException.java | 3 +- .../org/apache/qpid/AMQConnectionException.java | 5 +- .../apache/qpid/AMQConnectionFailureException.java | 20 + .../java/org/apache/qpid/AMQTimeoutException.java | 29 + .../org/apache/qpid/AMQUnknownExchangeType.java | 20 + .../org/apache/qpid/common/AMQPFilterTypes.java | 8 +- .../org/apache/qpid/exchange/ExchangeDefaults.java | 23 +- .../main/java/org/apache/qpid/framing/AMQBody.java | 2 +- .../apache/qpid/framing/AMQDataBlockDecoder.java | 89 +- .../apache/qpid/framing/AMQDataBlockEncoder.java | 9 +- .../java/org/apache/qpid/framing/AMQFrame.java | 51 +- .../org/apache/qpid/framing/AMQMethodBody.java | 5 +- .../apache/qpid/framing/AMQMethodBodyFactory.java | 23 +- .../qpid/framing/AMQMethodBodyInstanceFactory.java | 30 + .../org/apache/qpid/framing/AMQRequestBody.java | 30 +- .../apache/qpid/framing/AMQRequestBodyFactory.java | 41 + .../org/apache/qpid/framing/AMQResponseBody.java | 27 +- .../qpid/framing/AMQResponseBodyFactory.java | 41 + .../org/apache/qpid/framing/AMQShortString.java | 374 +++++++ .../main/java/org/apache/qpid/framing/AMQType.java | 6 +- .../org/apache/qpid/framing/AMQTypedValue.java | 25 + .../java/org/apache/qpid/framing/BodyFactory.java | 3 +- .../org/apache/qpid/framing/EncodingUtils.java | 461 ++++++++- .../java/org/apache/qpid/framing/FieldTable.java | 527 ++++++++-- .../org/apache/qpid/framing/HeartbeatBody.java | 16 +- .../apache/qpid/framing/HeartbeatBodyFactory.java | 2 +- .../apache/qpid/framing/JMSPropertyFieldTable.java | 453 --------- .../apache/qpid/framing/ProtocolInitiation.java | 10 + .../org/apache/qpid/framing/RequestManager.java | 4 +- .../org/apache/qpid/framing/ResponseManager.java | 11 +- .../qpid/framing/SmallCompositeAMQDataBlock.java | 97 ++ .../qpid/framing/VersionSpecificRegistry.java | 141 +++ .../src/main/java/org/apache/qpid/pool/Event.java | 98 +- .../src/main/java/org/apache/qpid/pool/Job.java | 34 +- .../java/org/apache/qpid/pool/PoolingFilter.java | 157 +-- .../org/apache/qpid/pool/ReadWriteThreadModel.java | 39 +- .../java/org/apache/qpid/protocol/AMQConstant.java | 8 +- .../protocol/AMQVersionAwareProtocolSession.java | 29 + .../apache/qpid/protocol/ProtocolVersionAware.java | 30 + .../java/org/apache/qpid/url/AMQBindingURL.java | 59 +- .../main/java/org/apache/qpid/url/BindingURL.java | 22 +- .../apache/qpid/util/concurrent/BooleanLatch.java | 42 + .../qpid/framing/JMSPropertyFieldTableTest.java | 1016 -------------------- .../qpid/framing/PropertyFieldTableTest.java | 61 +- .../org/apache/qpid/pool/PoolingFilterTest.java | 13 +- .../java/org/apache/qpid/session/TestSession.java | 20 +- 46 files changed, 2259 insertions(+), 1955 deletions(-) create mode 100644 java/common/src/main/java/org/apache/qpid/AMQTimeoutException.java create mode 100644 java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyInstanceFactory.java create mode 100644 java/common/src/main/java/org/apache/qpid/framing/AMQRequestBodyFactory.java create mode 100644 java/common/src/main/java/org/apache/qpid/framing/AMQResponseBodyFactory.java create mode 100644 java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java delete mode 100644 java/common/src/main/java/org/apache/qpid/framing/JMSPropertyFieldTable.java create mode 100644 java/common/src/main/java/org/apache/qpid/framing/SmallCompositeAMQDataBlock.java create mode 100644 java/common/src/main/java/org/apache/qpid/framing/VersionSpecificRegistry.java create mode 100644 java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java create mode 100644 java/common/src/main/java/org/apache/qpid/protocol/ProtocolVersionAware.java create mode 100644 java/common/src/main/java/org/apache/qpid/util/concurrent/BooleanLatch.java delete mode 100644 java/common/src/test/java/org/apache/qpid/framing/JMSPropertyFieldTableTest.java (limited to 'java/common/src') diff --git a/java/common/src/main/java/org/apache/qpid/AMQChannelException.java b/java/common/src/main/java/org/apache/qpid/AMQChannelException.java index 996ac23b09..503f92b15d 100644 --- a/java/common/src/main/java/org/apache/qpid/AMQChannelException.java +++ b/java/common/src/main/java/org/apache/qpid/AMQChannelException.java @@ -22,6 +22,7 @@ package org.apache.qpid; import org.apache.qpid.framing.ChannelCloseBody; import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.framing.AMQShortString; public class AMQChannelException extends AMQException { @@ -51,6 +52,6 @@ public class AMQChannelException extends AMQException public AMQMethodBody getCloseMethodBody() { - return ChannelCloseBody.createMethodBody(major, minor, _classId, _methodId, getErrorCode(), getMessage()); + return ChannelCloseBody.createMethodBody(major, minor, _classId, _methodId, getErrorCode(), new AMQShortString(getMessage())); } } diff --git a/java/common/src/main/java/org/apache/qpid/AMQConnectionException.java b/java/common/src/main/java/org/apache/qpid/AMQConnectionException.java index 93754dbee9..c4772624e9 100644 --- a/java/common/src/main/java/org/apache/qpid/AMQConnectionException.java +++ b/java/common/src/main/java/org/apache/qpid/AMQConnectionException.java @@ -21,6 +21,7 @@ package org.apache.qpid; +import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.ConnectionCloseBody; public class AMQConnectionException extends AMQException @@ -30,6 +31,7 @@ public class AMQConnectionException extends AMQException /* AMQP version for which exception ocurred */ private final byte major; private final byte minor; + boolean _closeConnetion; public AMQConnectionException(int errorCode, String msg, int classId, int methodId, byte major, byte minor, Throwable t) { @@ -51,7 +53,7 @@ public class AMQConnectionException extends AMQException public ConnectionCloseBody getCloseMethodBody() { - return ConnectionCloseBody.createMethodBody(major, minor, _classId, _methodId, getErrorCode(), getMessage()); + return ConnectionCloseBody.createMethodBody(major, minor, _classId, _methodId, getErrorCode(), new AMQShortString(getMessage())); } public int getClassId() @@ -62,5 +64,4 @@ public class AMQConnectionException extends AMQException public int getMethodId(){ return _methodId; } - } diff --git a/java/common/src/main/java/org/apache/qpid/AMQConnectionFailureException.java b/java/common/src/main/java/org/apache/qpid/AMQConnectionFailureException.java index 0979598cdb..2406312846 100644 --- a/java/common/src/main/java/org/apache/qpid/AMQConnectionFailureException.java +++ b/java/common/src/main/java/org/apache/qpid/AMQConnectionFailureException.java @@ -1,3 +1,23 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ package org.apache.qpid; public class AMQConnectionFailureException extends AMQException diff --git a/java/common/src/main/java/org/apache/qpid/AMQTimeoutException.java b/java/common/src/main/java/org/apache/qpid/AMQTimeoutException.java new file mode 100644 index 0000000000..6af681f479 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/AMQTimeoutException.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid; + +public class AMQTimeoutException extends AMQException +{ + public AMQTimeoutException(String message) + { + super(message); + } +} diff --git a/java/common/src/main/java/org/apache/qpid/AMQUnknownExchangeType.java b/java/common/src/main/java/org/apache/qpid/AMQUnknownExchangeType.java index 70f333a580..81deb91cdc 100644 --- a/java/common/src/main/java/org/apache/qpid/AMQUnknownExchangeType.java +++ b/java/common/src/main/java/org/apache/qpid/AMQUnknownExchangeType.java @@ -1,3 +1,23 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ package org.apache.qpid; public class AMQUnknownExchangeType extends AMQException diff --git a/java/common/src/main/java/org/apache/qpid/common/AMQPFilterTypes.java b/java/common/src/main/java/org/apache/qpid/common/AMQPFilterTypes.java index 56219755a3..d8aa9bf5ca 100644 --- a/java/common/src/main/java/org/apache/qpid/common/AMQPFilterTypes.java +++ b/java/common/src/main/java/org/apache/qpid/common/AMQPFilterTypes.java @@ -20,20 +20,22 @@ */ package org.apache.qpid.common; +import org.apache.qpid.framing.AMQShortString; + public enum AMQPFilterTypes { JMS_SELECTOR("x-filter-jms-selector"), NO_CONSUME("x-filter-no-consume"), AUTO_CLOSE("x-filter-auto-close"); - private final String _value; + private final AMQShortString _value; AMQPFilterTypes(String value) { - _value = value; + _value = new AMQShortString(value); } - public String getValue() + public AMQShortString getValue() { return _value; } diff --git a/java/common/src/main/java/org/apache/qpid/exchange/ExchangeDefaults.java b/java/common/src/main/java/org/apache/qpid/exchange/ExchangeDefaults.java index dca8075f7f..899261ef25 100644 --- a/java/common/src/main/java/org/apache/qpid/exchange/ExchangeDefaults.java +++ b/java/common/src/main/java/org/apache/qpid/exchange/ExchangeDefaults.java @@ -20,21 +20,28 @@ */ package org.apache.qpid.exchange; +import org.apache.qpid.framing.AMQShortString; + public class ExchangeDefaults { - public final static String TOPIC_EXCHANGE_NAME = "amq.topic"; + public final static AMQShortString TOPIC_EXCHANGE_NAME = new AMQShortString("amq.topic"); + + public final static AMQShortString TOPIC_EXCHANGE_CLASS = new AMQShortString("topic"); + + public final static AMQShortString DIRECT_EXCHANGE_NAME = new AMQShortString("amq.direct"); + + public final static AMQShortString DIRECT_EXCHANGE_CLASS = new AMQShortString("direct"); - public final static String TOPIC_EXCHANGE_CLASS = "topic"; + public final static AMQShortString HEADERS_EXCHANGE_NAME = new AMQShortString("amq.match"); - public final static String DIRECT_EXCHANGE_NAME = "amq.direct"; + public final static AMQShortString HEADERS_EXCHANGE_CLASS = new AMQShortString("headers"); - public final static String DIRECT_EXCHANGE_CLASS = "direct"; + public final static AMQShortString FANOUT_EXCHANGE_NAME = new AMQShortString("amq.fanout"); - public final static String HEADERS_EXCHANGE_NAME = "amq.match"; + public final static AMQShortString FANOUT_EXCHANGE_CLASS = new AMQShortString("fanout"); - public final static String HEADERS_EXCHANGE_CLASS = "headers"; - public final static String FANOUT_EXCHANGE_NAME = "amq.fanout"; + public final static AMQShortString SYSTEM_MANAGEMENT_EXCHANGE_NAME = new AMQShortString("qpid.sysmgmt"); - public final static String FANOUT_EXCHANGE_CLASS = "fanout"; + public final static AMQShortString SYSTEM_MANAGEMENT_CLASS = new AMQShortString("sysmmgmt"); } diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java b/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java index 36287d2923..ebeea8d2b4 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java @@ -24,7 +24,7 @@ import org.apache.mina.common.ByteBuffer; public abstract class AMQBody { - protected abstract byte getFrameType(); + public abstract byte getFrameType(); /** * Get the size of the body diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java b/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java index 62699331e9..5e566a5fe8 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java @@ -24,93 +24,84 @@ import org.apache.log4j.Logger; import org.apache.mina.common.ByteBuffer; import org.apache.mina.common.IoSession; import org.apache.mina.filter.codec.ProtocolDecoderOutput; +import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; import java.util.HashMap; import java.util.Map; public class AMQDataBlockDecoder { - Logger _logger = Logger.getLogger(AMQDataBlockDecoder.class); + private static final String SESSION_METHOD_BODY_FACTORY = "QPID_SESSION_METHOD_BODY_FACTORY"; - private final Map _supportedBodies = new HashMap(); + private static final BodyFactory[] _bodiesSupported = new BodyFactory[Byte.MAX_VALUE]; + + static + { + _bodiesSupported[HeartbeatBody.TYPE] = new HeartbeatBodyFactory(); + } + + Logger _logger = Logger.getLogger(AMQDataBlockDecoder.class); public AMQDataBlockDecoder() { - _supportedBodies.put(new Byte(AMQRequestBody.TYPE), new BodyFactory() { - public AMQBody createBody(ByteBuffer in) { - return new AMQRequestBody(); - } - }); - _supportedBodies.put(new Byte(AMQResponseBody.TYPE), new BodyFactory() { - public AMQBody createBody(ByteBuffer in) { - return new AMQResponseBody(); - } - }); - _supportedBodies.put(new Byte(HeartbeatBody.TYPE), new HeartbeatBodyFactory()); } public boolean decodable(IoSession session, ByteBuffer in) throws AMQFrameDecodingException { - // type, channel, body size and end byte - if (in.remaining() < (1 + 2 + 4 + 1)) + final int remainingAfterAttributes = in.remaining() - (1 + 2 + 4 + 1); + // type, channel, body length and end byte + if (remainingAfterAttributes < 0) { return false; } - - final byte type = in.get(); - final int channel = in.getUnsignedShort(); + in.skip(1 + 2); final long bodySize = in.getUnsignedInt(); - // bodySize can be zero - if (type <= 0 || channel < 0 || bodySize < 0) + return (remainingAfterAttributes >= bodySize); + } + + + protected Object createAndPopulateFrame(IoSession session, ByteBuffer in) + throws AMQFrameDecodingException, AMQProtocolVersionException + { + final byte type = in.get(); + BodyFactory bodyFactory; + if (type == AMQRequestBody.TYPE) { - throw new AMQFrameDecodingException("Undecodable frame: type = " + type + " channel = " + channel + - " bodySize = " + bodySize); + AMQVersionAwareProtocolSession protocolSession = (AMQVersionAwareProtocolSession) session.getAttachment(); + bodyFactory = new AMQRequestBodyFactory(protocolSession); } - - if (in.remaining() < (bodySize + 1)) + else if (type == AMQResponseBody.TYPE) { - return false; + AMQVersionAwareProtocolSession protocolSession = (AMQVersionAwareProtocolSession) session.getAttachment(); + bodyFactory = new AMQResponseBodyFactory(protocolSession); } - return true; - } - - private boolean isSupportedFrameType(byte frameType) - { - final boolean result = _supportedBodies.containsKey(new Byte(frameType)); - - if (!result) + else { - _logger.warn("AMQDataBlockDecoder does not handle frame type " + frameType); + bodyFactory = _bodiesSupported[type]; } - return result; - } - - protected Object createAndPopulateFrame(ByteBuffer in) - throws AMQFrameDecodingException, AMQProtocolVersionException - { - final byte type = in.get(); - if (!isSupportedFrameType(type)) + if(bodyFactory == null) { throw new AMQFrameDecodingException("Unsupported frame type: " + type); } + final int channel = in.getUnsignedShort(); final long bodySize = in.getUnsignedInt(); - BodyFactory bodyFactory = (BodyFactory) _supportedBodies.get(new Byte(type)); - if (bodyFactory == null) + // bodySize can be zero + if (channel < 0 || bodySize < 0) { - throw new AMQFrameDecodingException("Unsupported body type: " + type); + throw new AMQFrameDecodingException("Undecodable frame: type = " + type + " channel = " + channel + + " bodySize = " + bodySize); } - AMQFrame frame = new AMQFrame(); - frame.populateFromBuffer(in, channel, bodySize, bodyFactory); + AMQFrame frame = new AMQFrame(in, channel, bodySize, bodyFactory); byte marker = in.get(); if ((marker & 0xFF) != 0xCE) { - throw new AMQFrameDecodingException("End of frame marker not found. Read " + marker + " size=" + bodySize + " type=" + type); + throw new AMQFrameDecodingException("End of frame marker not found. Read " + marker + " length=" + bodySize + " type=" + type); } return frame; } @@ -118,6 +109,6 @@ public class AMQDataBlockDecoder public void decode(IoSession session, ByteBuffer in, ProtocolDecoderOutput out) throws Exception { - out.write(createAndPopulateFrame(in)); + out.write(createAndPopulateFrame(session, in)); } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockEncoder.java b/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockEncoder.java index 3446563d35..478cdeb406 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockEncoder.java +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockEncoder.java @@ -28,17 +28,16 @@ import org.apache.mina.filter.codec.demux.MessageEncoder; import java.util.HashSet; import java.util.Set; +import java.util.Collections; -public class AMQDataBlockEncoder implements MessageEncoder +public final class AMQDataBlockEncoder implements MessageEncoder { - Logger _logger = Logger.getLogger(AMQDataBlockEncoder.class); + private static final Logger _logger = Logger.getLogger(AMQDataBlockEncoder.class); - private Set _messageTypes; + private final Set _messageTypes = Collections.singleton(EncodableAMQDataBlock.class); public AMQDataBlockEncoder() { - _messageTypes = new HashSet(); - _messageTypes.add(EncodableAMQDataBlock.class); } public void encode(IoSession session, Object message, ProtocolEncoderOutput out) throws Exception diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java b/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java index 6af691fbe8..11f505fd4b 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java @@ -24,53 +24,52 @@ import org.apache.mina.common.ByteBuffer; public class AMQFrame extends AMQDataBlock implements EncodableAMQDataBlock { - public int channel; + private final int _channel; - public AMQBody bodyFrame; + private final AMQBody _bodyFrame; - public AMQFrame() + + + public AMQFrame(final int channel, final AMQBody bodyFrame) { + _channel = channel; + _bodyFrame = bodyFrame; } - public AMQFrame(int channel, AMQBody bodyFrame) + public AMQFrame(final ByteBuffer in, final int channel, final long bodySize, final BodyFactory bodyFactory) throws AMQFrameDecodingException { - this.channel = channel; - this.bodyFrame = bodyFrame; + this._channel = channel; + this._bodyFrame = bodyFactory.createBody(in,bodySize); } public long getSize() { - return 1 + 2 + 4 + bodyFrame.getSize() + 1; + return 1 + 2 + 4 + _bodyFrame.getSize() + 1; } public void writePayload(ByteBuffer buffer) { - buffer.put(bodyFrame.getFrameType()); - // TODO: how does channel get populated - EncodingUtils.writeUnsignedShort(buffer, channel); - EncodingUtils.writeUnsignedInteger(buffer, bodyFrame.getSize()); - bodyFrame.writePayload(buffer); + buffer.put(_bodyFrame.getFrameType()); + EncodingUtils.writeUnsignedShort(buffer, _channel); + EncodingUtils.writeUnsignedInteger(buffer, _bodyFrame.getSize()); + _bodyFrame.writePayload(buffer); buffer.put((byte) 0xCE); } - /** - * - * @param buffer - * @param channel unsigned short - * @param bodySize unsigned integer - * @param bodyFactory - * @throws AMQFrameDecodingException - */ - public void populateFromBuffer(ByteBuffer buffer, int channel, long bodySize, BodyFactory bodyFactory) - throws AMQFrameDecodingException, AMQProtocolVersionException + public final int getChannel() { - this.channel = channel; - bodyFrame = bodyFactory.createBody(buffer); - bodyFrame.populateFromBuffer(buffer, bodySize); + return _channel; } + public final AMQBody getBodyFrame() + { + return _bodyFrame; + } + + + public String toString() { - return "Frame channelId: " + channel + ", bodyFrame: " + String.valueOf(bodyFrame); + return "Frame channelId: " + _channel + ", bodyFrame: " + String.valueOf(_bodyFrame); } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java b/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java index e140a9b334..ccd1ef32b8 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java @@ -57,7 +57,7 @@ public abstract class AMQMethodBody extends AMQBody protected abstract void writeMethodPayload(ByteBuffer buffer); - protected byte getFrameType() + public byte getFrameType() { return TYPE; } @@ -110,10 +110,9 @@ public abstract class AMQMethodBody extends AMQBody return new AMQConnectionException(code, message, getClazz(), getMethod(), major, minor); } - - public AMQConnectionException getConnectionException(int code, String message, Throwable cause) { return new AMQConnectionException(code, message, getClazz(), getMethod(), major, minor, cause); } + } diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java b/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java index 3dcbe926fe..6067e2fce5 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java @@ -22,30 +22,21 @@ package org.apache.qpid.framing; import org.apache.log4j.Logger; import org.apache.mina.common.ByteBuffer; +import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; public class AMQMethodBodyFactory implements BodyFactory { private static final Logger _log = Logger.getLogger(AMQMethodBodyFactory.class); + + private final AMQVersionAwareProtocolSession _protocolSession; - private static final AMQMethodBodyFactory _instance = new AMQMethodBodyFactory(); - - public static AMQMethodBodyFactory getInstance() - { - return _instance; - } - - private AMQMethodBodyFactory() + public AMQMethodBodyFactory(AMQVersionAwareProtocolSession protocolSession) { - _log.debug("Creating method body factory"); + _protocolSession = protocolSession; } - public AMQMethodBody createBody(ByteBuffer in) throws AMQFrameDecodingException + public AMQMethodBody createBody(ByteBuffer in, long bodySize) throws AMQFrameDecodingException { - // AMQP version change: MethodBodyDecoderRegistry is obsolete, since all the XML - // segments generated together are now handled by MainRegistry. The Cluster class, - // if generated together with amqp.xml is a part of MainRegistry. - // TODO: Connect with version acquired from ProtocolInitiation class. - return MainRegistry.get((short)in.getUnsignedShort(), (short)in.getUnsignedShort(), - (byte)0, (byte)9); + return _protocolSession.getRegistry().get((short)in.getUnsignedShort(), (short)in.getUnsignedShort(), in, bodySize); } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyInstanceFactory.java b/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyInstanceFactory.java new file mode 100644 index 0000000000..5309a80bdb --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyInstanceFactory.java @@ -0,0 +1,30 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.framing; + +import org.apache.mina.common.ByteBuffer; + + +public abstract interface AMQMethodBodyInstanceFactory +{ + public AMQMethodBody newInstance(byte major, byte minor, ByteBuffer buffer, long size) throws AMQFrameDecodingException; + public AMQMethodBody newInstance(byte major, byte minor, int clazzID, int methodID, ByteBuffer buffer, long size) throws AMQFrameDecodingException; +} diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQRequestBody.java b/java/common/src/main/java/org/apache/qpid/framing/AMQRequestBody.java index 6df7dff27b..3bc16601b6 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/AMQRequestBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQRequestBody.java @@ -21,6 +21,7 @@ package org.apache.qpid.framing; import org.apache.mina.common.ByteBuffer; +import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; public class AMQRequestBody extends AMQBody { @@ -30,16 +31,21 @@ public class AMQRequestBody extends AMQBody protected long requestId; protected long responseMark; protected AMQMethodBody methodPayload; - + protected AMQVersionAwareProtocolSession protocolSession; // Constructor - public AMQRequestBody() {} + public AMQRequestBody(AMQVersionAwareProtocolSession protocolSession) + { + this.protocolSession = protocolSession; + } + public AMQRequestBody(long requestId, long responseMark, AMQMethodBody methodPayload) { this.requestId = requestId; this.responseMark = responseMark; this.methodPayload = methodPayload; + protocolSession = null; } @@ -49,7 +55,7 @@ public class AMQRequestBody extends AMQBody public AMQMethodBody getMethodPayload() { return methodPayload; } - protected byte getFrameType() + public byte getFrameType() { return TYPE; } @@ -68,14 +74,19 @@ public class AMQRequestBody extends AMQBody } protected void populateFromBuffer(ByteBuffer buffer, long size) - throws AMQFrameDecodingException, AMQProtocolVersionException + throws AMQFrameDecodingException { + if (protocolSession == null) + { + throw new AMQFrameDecodingException("Cannot call populateFromBuffer() without using correct constructor."); + } + requestId = EncodingUtils.readLong(buffer); responseMark = EncodingUtils.readLong(buffer); int reserved = EncodingUtils.readInteger(buffer); // reserved, throw away - AMQMethodBodyFactory factory = AMQMethodBodyFactory.getInstance(); - methodPayload = factory.createBody(buffer); - methodPayload.populateFromBuffer(buffer, size - 8 - 8 - 4); + + AMQMethodBodyFactory methodBodyFactory = new AMQMethodBodyFactory(protocolSession); + methodPayload = methodBodyFactory.createBody(buffer, size); } public String toString() @@ -89,9 +100,6 @@ public class AMQRequestBody extends AMQBody { AMQRequestBody requestFrame = new AMQRequestBody(requestId, responseMark, methodPayload); - AMQFrame frame = new AMQFrame(); - frame.channel = channelId; - frame.bodyFrame = requestFrame; - return frame; + return new AMQFrame(channelId, requestFrame); } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQRequestBodyFactory.java b/java/common/src/main/java/org/apache/qpid/framing/AMQRequestBodyFactory.java new file mode 100644 index 0000000000..9d47ccd68e --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQRequestBodyFactory.java @@ -0,0 +1,41 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.framing; + +import org.apache.mina.common.ByteBuffer; +import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; + +public class AMQRequestBodyFactory implements BodyFactory +{ + private final AMQVersionAwareProtocolSession protocolSession; + + public AMQRequestBodyFactory(AMQVersionAwareProtocolSession protocolSession) + { + this.protocolSession = protocolSession; + } + + public AMQBody createBody(ByteBuffer in, long bodySize) throws AMQFrameDecodingException + { + AMQRequestBody rb = new AMQRequestBody(protocolSession); + rb.populateFromBuffer(in, bodySize); + return rb; + } +} diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQResponseBody.java b/java/common/src/main/java/org/apache/qpid/framing/AMQResponseBody.java index c7aee6ac92..7b35aaeb86 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/AMQResponseBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQResponseBody.java @@ -21,6 +21,7 @@ package org.apache.qpid.framing; import org.apache.mina.common.ByteBuffer; +import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; public class AMQResponseBody extends AMQBody { @@ -31,9 +32,14 @@ public class AMQResponseBody extends AMQBody protected long requestId; protected int batchOffset; protected AMQMethodBody methodPayload; + protected AMQVersionAwareProtocolSession protocolSession; // Constructor - public AMQResponseBody() {} + public AMQResponseBody(AMQVersionAwareProtocolSession protocolSession) + { + this.protocolSession = protocolSession; + } + public AMQResponseBody(long responseId, long requestId, int batchOffset, AMQMethodBody methodPayload) { @@ -41,6 +47,7 @@ public class AMQResponseBody extends AMQBody this.requestId = requestId; this.batchOffset = batchOffset; this.methodPayload = methodPayload; + protocolSession = null; } // Field methods @@ -49,7 +56,7 @@ public class AMQResponseBody extends AMQBody public int getBatchOffset() { return batchOffset; } public AMQMethodBody getMethodPayload() { return methodPayload; } - protected byte getFrameType() + public byte getFrameType() { return TYPE; } @@ -69,15 +76,18 @@ public class AMQResponseBody extends AMQBody } protected void populateFromBuffer(ByteBuffer buffer, long size) - throws AMQFrameDecodingException, AMQProtocolVersionException + throws AMQFrameDecodingException { + if (protocolSession == null) + throw new AMQFrameDecodingException("Cannot call populateFromBuffer() without using correct constructor."); + responseId = EncodingUtils.readLong(buffer); requestId = EncodingUtils.readLong(buffer); // XXX batchOffset = EncodingUtils.readInteger(buffer); - AMQMethodBodyFactory factory = AMQMethodBodyFactory.getInstance(); - methodPayload = factory.createBody(buffer); - methodPayload.populateFromBuffer(buffer, size - 8 - 8 - 4); + + AMQMethodBodyFactory methodBodyFactory = new AMQMethodBodyFactory(protocolSession); + methodPayload = methodBodyFactory.createBody(buffer, size); } public String toString() @@ -91,9 +101,6 @@ public class AMQResponseBody extends AMQBody { AMQResponseBody responseFrame = new AMQResponseBody(responseId, requestId, batchOffset, methodPayload); - AMQFrame frame = new AMQFrame(); - frame.channel = channelId; - frame.bodyFrame = responseFrame; - return frame; + return new AMQFrame(channelId, responseFrame); } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQResponseBodyFactory.java b/java/common/src/main/java/org/apache/qpid/framing/AMQResponseBodyFactory.java new file mode 100644 index 0000000000..4209aad11f --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQResponseBodyFactory.java @@ -0,0 +1,41 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.framing; + +import org.apache.mina.common.ByteBuffer; +import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; + +public class AMQResponseBodyFactory implements BodyFactory +{ + private final AMQVersionAwareProtocolSession protocolSession; + + public AMQResponseBodyFactory(AMQVersionAwareProtocolSession protocolSession) + { + this.protocolSession = protocolSession; + } + + public AMQBody createBody(ByteBuffer in, long bodySize) throws AMQFrameDecodingException + { + AMQResponseBody rb = new AMQResponseBody(protocolSession); + rb.populateFromBuffer(in, bodySize); + return rb; + } +} diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java b/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java new file mode 100644 index 0000000000..f98a62751c --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java @@ -0,0 +1,374 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.framing; + +import org.apache.mina.common.ByteBuffer; +import org.apache.log4j.Logger; + +import java.util.Arrays; + +/** + * A short string is a representation of an AMQ Short String + * Short strings differ from the Java String class by being limited to on ASCII characters (0-127) + * and thus can be held more effectively in a byte buffer. + * + */ +public final class AMQShortString implements CharSequence +{ + private static final Logger _logger = Logger.getLogger(AMQShortString.class); + + private final ByteBuffer _data; + private int _hashCode; + private static final char[] EMPTY_CHAR_ARRAY = new char[0]; + + public AMQShortString(byte[] data) + { + + _data = ByteBuffer.wrap(data); + } + + + public AMQShortString(String data) + { + this(data == null ? EMPTY_CHAR_ARRAY : data.toCharArray()); + if(data != null) _hashCode = data.hashCode(); + } + + public AMQShortString(char[] data) + { + if(data == null) + { + throw new NullPointerException("Cannot create AMQShortString with null char[]"); + } + final int length = data.length; + final byte[] stringBytes = new byte[length]; + for(int i = 0; i < length; i++) + { + stringBytes[i] = (byte) (0xFF & data[i]); + } + + _data = ByteBuffer.wrap(stringBytes); + _data.rewind(); + + } + + public AMQShortString(CharSequence charSequence) + { + final int length = charSequence.length(); + final byte[] stringBytes = new byte[length]; + int hash = 0; + for(int i = 0 ; i < length; i++) + { + stringBytes[i] = ((byte) (0xFF & charSequence.charAt(i))); + hash = (31 * hash) + stringBytes[i]; + + } + _data = ByteBuffer.wrap(stringBytes); + _data.rewind(); + _hashCode = hash; + + } + + private AMQShortString(ByteBuffer data) + { + _data = data; + + } + + + /** + * Get the length of the short string + * @return length of the underlying byte array + */ + public int length() + { + return _data.limit(); + } + + public char charAt(int index) + { + + return (char) _data.get(index); + + } + + public CharSequence subSequence(int start, int end) + { + return new CharSubSequence(start,end); + } + + public int writeToByteArray(byte[] encoding, int pos) + { + final int size = length(); + encoding[pos++] = (byte) length(); + for(int i = 0; i < size; i++) + { + encoding[pos++] = _data.get(i); + } + return pos; + } + + public static AMQShortString readFromByteArray(byte[] byteEncodedDestination, int pos) + { + + final byte len = byteEncodedDestination[pos]; + if(len == 0) + { + return null; + } + ByteBuffer data = ByteBuffer.wrap(byteEncodedDestination,pos+1,len).slice(); + + + return new AMQShortString(data); + } + + public static AMQShortString readFromBuffer(ByteBuffer buffer) + { + final short length = buffer.getUnsigned(); + if (length == 0) + { + return null; + } + else + { + ByteBuffer data = buffer.slice(); + data.limit(length); + data.rewind(); + buffer.skip(length); + + return new AMQShortString(data); + } + } + + + public byte[] getBytes() + { + + if(_data.buf().hasArray() && _data.arrayOffset() == 0) + { + return _data.array(); + } + else + { + final int size = length(); + byte[] b = new byte[size]; + ByteBuffer buf = _data.duplicate(); + buf.rewind(); + buf.get(b); + + + return b; + } + + + } + + public void writeToBuffer(ByteBuffer buffer) + { + + + final int size = length(); + if (size != 0) + { + + buffer.put((byte)size); + if(_data.buf().hasArray()) + { + buffer.put(_data.array(),_data.arrayOffset(),length()); + } + else + { + + for(int i = 0; i < size; i++) + { + + buffer.put(_data.get(i)); + } + } + } + else + { + // really writing out unsigned byte + buffer.put((byte) 0); + } + + } + + private final class CharSubSequence implements CharSequence + { + private final int _offset; + private final int _end; + + + public CharSubSequence(final int offset, final int end) + { + _offset = offset; + _end = end; + } + + + public int length() + { + return _end - _offset; + } + + public char charAt(int index) + { + return AMQShortString.this.charAt(index + _offset); + } + + public CharSequence subSequence(int start, int end) + { + return new CharSubSequence(start+_offset,end+_offset); + } + } + + + + public char[] asChars() + { + final int size = length(); + final char[] chars = new char[size]; + + + + + for(int i = 0 ; i < size; i++) + { + chars[i] = (char) _data.get(i); + } + return chars; + } + + + + public String asString() + { + return new String(asChars()); + } + + public boolean equals(Object o) + { + if(o == null) + { + return false; + } + if(o == this) + { + return true; + } + if(o instanceof AMQShortString) + { + + final AMQShortString otherString = (AMQShortString) o; + + if((_hashCode != 0) && (otherString._hashCode != 0) && (_hashCode != otherString._hashCode)) + { + return false; + } + + return _data.equals(otherString._data); + + + + + } + return (o instanceof CharSequence) && equals((CharSequence)o); + + } + + public boolean equals(CharSequence s) + { + if(s == null) + { + return false; + } + if(s.length() != length()) + { + return false; + } + for(int i = 0; i < length(); i++) + { + if(charAt(i)!= s.charAt(i)) + { + return false; + } + } + return true; + } + + public int hashCode() + { + int hash = _hashCode; + if(hash == 0) + { + final int size = length(); + + + for(int i = 0; i < size; i++) + { + hash = (31 * hash) + _data.get(i); + } + _hashCode = hash; + } + + return hash; + } + + public void setDirty() + { + _hashCode = 0; + } + + public String toString() + { + return asString(); + } + + + public int compareTo(AMQShortString name) + { + if(name == null) + { + return 1; + } + else + { + + if(name.length() < length()) + { + return - name.compareTo(this); + } + + + + for(int i = 0; i < length() ; i++) + { + final byte d = _data.get(i); + final byte n = name._data.get(i); + if(d < n) return -1; + if(d > n) return 1; + } + + return length() == name.length() ? 0 : -1; + } + } +} diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQType.java b/java/common/src/main/java/org/apache/qpid/framing/AMQType.java index 23c1929205..5175eace1e 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/AMQType.java +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQType.java @@ -230,7 +230,7 @@ public enum AMQType { public int getEncodingSize(Object value) { - return 1 + (value == null ? 0 : ((byte[]) value).length); + return EncodingUtils.encodedLongstrLength((byte[]) value); } @@ -250,12 +250,12 @@ public enum AMQType public void writeValueImpl(Object value, ByteBuffer buffer) { - EncodingUtils.writeBytes(buffer, (byte[]) value); + EncodingUtils.writeLongstr(buffer, (byte[]) value); } public Object readValueFromBuffer(ByteBuffer buffer) { - return EncodingUtils.readBytes(buffer); + return EncodingUtils.readLongstr(buffer); } }, diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java b/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java index b29c23c2a2..efdd903f1f 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java @@ -1,3 +1,23 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ package org.apache.qpid.framing; import org.apache.mina.common.ByteBuffer; @@ -51,4 +71,9 @@ public class AMQTypedValue AMQType type = AMQTypeMap.getType(buffer.get()); return new AMQTypedValue(type, buffer); } + + public String toString() + { + return "["+getType()+": "+getValue()+"]"; + } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/BodyFactory.java b/java/common/src/main/java/org/apache/qpid/framing/BodyFactory.java index cf5708d993..246e91f1fb 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/BodyFactory.java +++ b/java/common/src/main/java/org/apache/qpid/framing/BodyFactory.java @@ -21,11 +21,12 @@ package org.apache.qpid.framing; import org.apache.mina.common.ByteBuffer; +import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; /** * Any class that is capable of turning a stream of bytes into an AMQ structure must implement this interface. */ public interface BodyFactory { - AMQBody createBody(ByteBuffer in) throws AMQFrameDecodingException; + AMQBody createBody(ByteBuffer in, long bodySize) throws AMQFrameDecodingException; } diff --git a/java/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java b/java/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java index c88c5d3bd3..f31fa8097e 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java +++ b/java/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java @@ -35,6 +35,7 @@ public class EncodingUtils public static final int SIZEOF_UNSIGNED_SHORT = 2; public static final int SIZEOF_UNSIGNED_INT = 4; + private static final boolean[] ALL_FALSE_ARRAY = new boolean[8]; public static int encodedShortStringLength(String s) { @@ -48,6 +49,120 @@ public class EncodingUtils } } + + public static int encodedShortStringLength(short s) + { + if( s == 0 ) + { + return 1 + 1; + } + + int len = 0; + if(s < 0) + { + len=1; + // sloppy - doesn't work of Integer.MIN_VALUE + s=(short)-s; + } + + if(s>9999) + { + return 1+5; + } + else if(s>999) + { + return 1+4; + } + else if(s>99) + { + return 1+3; + } + else if(s>9) + { + return 1+2; + } + else + { + return 1+1; + } + + } + + + public static int encodedShortStringLength(int i) + { + if( i == 0 ) + { + return 1 + 1; + } + + int len = 0; + if(i < 0) + { + len=1; + // sloppy - doesn't work of Integer.MIN_VALUE + i=-i; + } + + // range is now 1 - 2147483647 + if(i < Short.MAX_VALUE) + { + return len + encodedShortStringLength((short)i); + } + else if (i > 999999) + { + return len + 6 + encodedShortStringLength((short)(i/1000000)); + } + else // if (i > 99999) + { + return len + 5 + encodedShortStringLength((short)(i/100000)); + } + + } + + public static int encodedShortStringLength(long l) + { + if(l == 0) + { + return 1 + 1; + } + + int len = 0; + if(l < 0) + { + len=1; + // sloppy - doesn't work of Long.MIN_VALUE + l=-l; + } + if(l < Integer.MAX_VALUE) + { + return len + encodedShortStringLength((int) l); + } + else if(l > 9999999999L) + { + return len + 10 + encodedShortStringLength((int) (l / 10000000000L)); + } + else + { + return len + 1 + encodedShortStringLength((int) (l / 10L)); + } + + } + + + public static int encodedShortStringLength(AMQShortString s) + { + if (s == null) + { + return 1; + } + else + { + return (short) (1 + s.length()); + } + } + + public static int encodedLongStringLength(String s) { if (s == null) @@ -122,6 +237,21 @@ public class EncodingUtils } } + + public static void writeShortStringBytes(ByteBuffer buffer, AMQShortString s) + { + if (s != null) + { + + s.writeToBuffer(buffer); + } + else + { + // really writing out unsigned byte + buffer.put((byte) 0); + } + } + public static void writeLongStringBytes(ByteBuffer buffer, String s) { assert s == null || s.length() <= 0xFFFE; @@ -224,6 +354,7 @@ public class EncodingUtils } } + public static void writeFieldTableBytes(ByteBuffer buffer, FieldTable table) { if (table != null) @@ -255,6 +386,238 @@ public class EncodingUtils buffer.put(packedValue); } + public static void writeBooleans(ByteBuffer buffer, boolean value) + { + + buffer.put(value ? (byte) 1 : (byte) 0); + } + + public static void writeBooleans(ByteBuffer buffer, boolean value0, boolean value1) + { + byte packedValue = value0 ? (byte) 1 : (byte) 0; + + if (value1) + { + packedValue = (byte) (packedValue | (byte)(1 << 1)); + } + + + buffer.put(packedValue); + } + + public static void writeBooleans(ByteBuffer buffer, boolean value0, boolean value1, boolean value2) + { + byte packedValue = value0 ? (byte) 1 : (byte) 0; + + if (value1) + { + packedValue = (byte) (packedValue | (byte)(1 << 1)); + } + + if (value2) + { + packedValue = (byte) (packedValue | (byte)(1 << 2)); + } + + + buffer.put(packedValue); + } + + + + public static void writeBooleans(ByteBuffer buffer, + boolean value0, + boolean value1, + boolean value2, + boolean value3) + { + byte packedValue = value0 ? (byte) 1 : (byte) 0; + + if (value1) + { + packedValue = (byte) (packedValue | (byte)(1 << 1)); + } + + if (value2) + { + packedValue = (byte) (packedValue | (byte)(1 << 2)); + } + + if (value3) + { + packedValue = (byte) (packedValue | (byte)(1 << 3)); + } + + buffer.put(packedValue); + } + + public static void writeBooleans(ByteBuffer buffer, + boolean value0, + boolean value1, + boolean value2, + boolean value3, + boolean value4) + { + byte packedValue = value0 ? (byte) 1 : (byte) 0; + + if (value1) + { + packedValue = (byte) (packedValue | (byte)(1 << 1)); + } + + if (value2) + { + packedValue = (byte) (packedValue | (byte)(1 << 2)); + } + + if (value3) + { + packedValue = (byte) (packedValue | (byte)(1 << 3)); + } + + if (value4) + { + packedValue = (byte) (packedValue | (byte)(1 << 4)); + } + + buffer.put(packedValue); + } + + public static void writeBooleans(ByteBuffer buffer, + boolean value0, + boolean value1, + boolean value2, + boolean value3, + boolean value4, + boolean value5) + { + byte packedValue = value0 ? (byte) 1 : (byte) 0; + + if (value1) + { + packedValue = (byte) (packedValue | (byte)(1 << 1)); + } + + if (value2) + { + packedValue = (byte) (packedValue | (byte)(1 << 2)); + } + + if (value3) + { + packedValue = (byte) (packedValue | (byte)(1 << 3)); + } + + if (value4) + { + packedValue = (byte) (packedValue | (byte)(1 << 4)); + } + + if (value5) + { + packedValue = (byte) (packedValue | (byte)(1 << 5)); + } + + buffer.put(packedValue); + } + + public static void writeBooleans(ByteBuffer buffer, + boolean value0, + boolean value1, + boolean value2, + boolean value3, + boolean value4, + boolean value5, + boolean value6) + { + byte packedValue = value0 ? (byte) 1 : (byte) 0; + + if (value1) + { + packedValue = (byte) (packedValue | (byte)(1 << 1)); + } + + if (value2) + { + packedValue = (byte) (packedValue | (byte)(1 << 2)); + } + + if (value3) + { + packedValue = (byte) (packedValue | (byte)(1 << 3)); + } + + if (value4) + { + packedValue = (byte) (packedValue | (byte)(1 << 4)); + } + + if (value5) + { + packedValue = (byte) (packedValue | (byte)(1 << 5)); + } + + if (value6) + { + packedValue = (byte) (packedValue | (byte)(1 << 6)); + } + + buffer.put(packedValue); + } + + public static void writeBooleans(ByteBuffer buffer, + boolean value0, + boolean value1, + boolean value2, + boolean value3, + boolean value4, + boolean value5, + boolean value6, + boolean value7) + { + byte packedValue = value0 ? (byte) 1 : (byte) 0; + + if (value1) + { + packedValue = (byte) (packedValue | (byte)(1 << 1)); + } + + if (value2) + { + packedValue = (byte) (packedValue | (byte)(1 << 2)); + } + + if (value3) + { + packedValue = (byte) (packedValue | (byte)(1 << 3)); + } + + if (value4) + { + packedValue = (byte) (packedValue | (byte)(1 << 4)); + } + + if (value5) + { + packedValue = (byte) (packedValue | (byte)(1 << 5)); + } + + if (value6) + { + packedValue = (byte) (packedValue | (byte)(1 << 6)); + } + + if (value7) + { + packedValue = (byte) (packedValue | (byte)(1 << 7)); + } + + buffer.put(packedValue); + } + + + + /** * This is used for writing longstrs. * @@ -282,13 +645,27 @@ public class EncodingUtils public static boolean[] readBooleans(ByteBuffer buffer) { - byte packedValue = buffer.get(); - boolean[] result = new boolean[8]; + final byte packedValue = buffer.get(); + if(packedValue == 0) + { + return ALL_FALSE_ARRAY; + } + final boolean[] result = new boolean[8]; - for (int i = 0; i < 8; i++) + result[0] = ((packedValue & 1) != 0); + result[1] = ((packedValue & (1 << 1)) != 0); + result[2] = ((packedValue & (1 << 2)) != 0); + result[3] = ((packedValue & (1 << 3)) != 0); + if((packedValue & 0xF0) == 0) { - result[i] = ((packedValue & (1 << i)) != 0); + result[0] = ((packedValue & 1) != 0); } + result[4] = ((packedValue & (1 << 4)) != 0); + result[5] = ((packedValue & (1 << 5)) != 0); + result[6] = ((packedValue & (1 << 6)) != 0); + result[7] = ((packedValue & (1 << 7)) != 0); + + return result; } @@ -312,6 +689,12 @@ public class EncodingUtils return content; } + public static AMQShortString readAMQShortString(ByteBuffer buffer) + { + return AMQShortString.readFromBuffer(buffer); + + } + public static String readShortString(ByteBuffer buffer) { short length = buffer.getUnsigned(); @@ -363,7 +746,7 @@ public class EncodingUtils } } - public static byte[] readLongstr(ByteBuffer buffer) throws AMQFrameDecodingException + public static byte[] readLongstr(ByteBuffer buffer) { long length = buffer.getUnsignedInt(); if (length == 0) @@ -468,7 +851,7 @@ public class EncodingUtils buffer.put((byte) (aBoolean ? 1 : 0)); } - public static Boolean readBoolean(ByteBuffer buffer) + public static boolean readBoolean(ByteBuffer buffer) { byte packedValue = buffer.get(); return (packedValue == 1); @@ -485,7 +868,7 @@ public class EncodingUtils buffer.put(aByte); } - public static Byte readByte(ByteBuffer buffer) + public static byte readByte(ByteBuffer buffer) { return buffer.get(); } @@ -502,7 +885,7 @@ public class EncodingUtils buffer.putShort(aShort); } - public static Short readShort(ByteBuffer buffer) + public static short readShort(ByteBuffer buffer) { return buffer.getShort(); } @@ -518,7 +901,7 @@ public class EncodingUtils buffer.putInt(aInteger); } - public static Integer readInteger(ByteBuffer buffer) + public static int readInteger(ByteBuffer buffer) { return buffer.getInt(); } @@ -534,7 +917,7 @@ public class EncodingUtils buffer.putLong(aLong); } - public static Long readLong(ByteBuffer buffer) + public static long readLong(ByteBuffer buffer) { return buffer.getLong(); } @@ -550,7 +933,7 @@ public class EncodingUtils buffer.putFloat(aFloat); } - public static Float readFloat(ByteBuffer buffer) + public static float readFloat(ByteBuffer buffer) { return buffer.getFloat(); } @@ -567,7 +950,7 @@ public class EncodingUtils buffer.putDouble(aDouble); } - public static Double readDouble(ByteBuffer buffer) + public static double readDouble(ByteBuffer buffer) { return buffer.getDouble(); } @@ -627,6 +1010,41 @@ public class EncodingUtils writeByte(buffer, (byte) character); } + public static long readLongAsShortString(ByteBuffer buffer) + { + short length = buffer.getUnsigned(); + short pos = 0; + if(length == 0) + { + return 0L; + } + byte digit = buffer.get(); + boolean isNegative; + long result = 0; + if(digit == (byte)'-') + { + isNegative = true; + pos++; + digit = buffer.get(); + } + else + { + isNegative = false; + } + result = digit - (byte)'0'; + pos++; + + while(pos < length) + { + pos++; + digit = buffer.get(); + result = (result << 3) + (result << 1); + result += digit - (byte)'0'; + } + + return result; + } + public static long readUnsignedInteger(ByteBuffer buffer) { long l = 0xFF & buffer.get(); @@ -639,4 +1057,23 @@ public class EncodingUtils return l; } + + + public static void main(String[] args) + { + ByteBuffer buf = ByteBuffer.allocate(8); + buf.setAutoExpand(true); + + long l = (long) Integer.MAX_VALUE; + l += 1024L; + + writeUnsignedInteger(buf, l); + + buf.flip(); + + long l2 = readUnsignedInteger(buf); + + System.out.println("before: " + l); + System.out.println("after: " + l2); + } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java b/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java index 3c18683609..61bc9090a1 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java +++ b/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java @@ -31,17 +31,16 @@ public class FieldTable { private static final Logger _logger = Logger.getLogger(FieldTable.class); - private LinkedHashMap _properties; + private ByteBuffer _encodedForm; + private LinkedHashMap _properties; + private long _encodedSize; + private static final int INITIAL_HASHMAP_CAPACITY = 16; public FieldTable() { super(); - _properties = new LinkedHashMap(); - } - - /** * Construct a new field table. * @@ -52,14 +51,105 @@ public class FieldTable public FieldTable(ByteBuffer buffer, long length) throws AMQFrameDecodingException { this(); - setFromBuffer(buffer, length); + _encodedForm = buffer.slice(); + _encodedForm.limit((int)length); + _encodedSize = length; + buffer.skip((int)length); + } + + + + private AMQTypedValue getProperty(AMQShortString string) + { + synchronized(this) + { + if(_properties == null) + { + if(_encodedForm == null) + { + return null; + } + else + { + populateFromBuffer(); + } + } + } + + if(_properties == null) + { + return null; + } + else + { + return _properties.get(string); + } + } + + private void populateFromBuffer() + { + try + { + setFromBuffer(_encodedForm, _encodedSize); + } + catch (AMQFrameDecodingException e) + { + _logger.error("Error decoding FieldTable in deferred decoding mode ", e); + throw new IllegalArgumentException(e); + } } + private AMQTypedValue setProperty(AMQShortString key, AMQTypedValue val) + { + initMapIfNecessary(); + _encodedForm = null; + if(val == null) + { + return removeKey(key); + } + AMQTypedValue oldVal = _properties.put(key,val); + if(oldVal != null) + { + _encodedSize -= oldVal.getEncodingSize(); + } + else + { + _encodedSize += EncodingUtils.encodedShortStringLength(key) + 1; + } + _encodedSize += val.getEncodingSize(); + + return oldVal; + } + + private void initMapIfNecessary() + { + synchronized(this) + { + if(_properties == null) + { + if(_encodedForm == null) + { + _properties = new LinkedHashMap(); + } + else + { + populateFromBuffer(); + } + } + + } + } + public Boolean getBoolean(String string) { - AMQTypedValue value = _properties.get(string); + return getBoolean(new AMQShortString(string)); + } + + public Boolean getBoolean(AMQShortString string) + { + AMQTypedValue value = getProperty(string); if (value != null && (value.getType() == AMQType.BOOLEAN)) { return (Boolean) value.getValue(); @@ -70,9 +160,15 @@ public class FieldTable } } + public Byte getByte(String string) { - AMQTypedValue value = _properties.get(string); + return getByte(new AMQShortString(string)); + } + + public Byte getByte(AMQShortString string) + { + AMQTypedValue value = getProperty(string); if (value != null && (value.getType() == AMQType.BYTE)) { return (Byte) value.getValue(); @@ -85,7 +181,12 @@ public class FieldTable public Short getShort(String string) { - AMQTypedValue value = _properties.get(string); + return getShort(new AMQShortString(string)); + } + + public Short getShort(AMQShortString string) + { + AMQTypedValue value = getProperty(string); if (value != null && (value.getType() == AMQType.SHORT)) { return (Short) value.getValue(); @@ -98,7 +199,12 @@ public class FieldTable public Integer getInteger(String string) { - AMQTypedValue value = _properties.get(string); + return getInteger(new AMQShortString(string)); + } + + public Integer getInteger(AMQShortString string) + { + AMQTypedValue value = getProperty(string); if (value != null && (value.getType() == AMQType.INT)) { return (Integer) value.getValue(); @@ -111,7 +217,12 @@ public class FieldTable public Long getLong(String string) { - AMQTypedValue value = _properties.get(string); + return getLong(new AMQShortString(string)); + } + + public Long getLong(AMQShortString string) + { + AMQTypedValue value = getProperty(string); if (value != null && (value.getType() == AMQType.LONG)) { return (Long) value.getValue(); @@ -124,7 +235,12 @@ public class FieldTable public Float getFloat(String string) { - AMQTypedValue value = _properties.get(string); + return getFloat(new AMQShortString(string)); + } + + public Float getFloat(AMQShortString string) + { + AMQTypedValue value = getProperty(string); if (value != null && (value.getType() == AMQType.FLOAT)) { return (Float) value.getValue(); @@ -137,7 +253,12 @@ public class FieldTable public Double getDouble(String string) { - AMQTypedValue value = _properties.get(string); + return getDouble(new AMQShortString(string)); + } + + public Double getDouble(AMQShortString string) + { + AMQTypedValue value = getProperty(string); if (value != null && (value.getType() == AMQType.DOUBLE)) { return (Double) value.getValue(); @@ -150,7 +271,12 @@ public class FieldTable public String getString(String string) { - AMQTypedValue value = _properties.get(string); + return getString(new AMQShortString(string)); + } + + public String getString(AMQShortString string) + { + AMQTypedValue value = getProperty(string); if ((value != null) && ((value.getType() == AMQType.WIDE_STRING) || (value.getType() == AMQType.ASCII_STRING))) { @@ -170,7 +296,12 @@ public class FieldTable public Character getCharacter(String string) { - AMQTypedValue value = _properties.get(string); + return getCharacter(new AMQShortString(string)); + } + + public Character getCharacter(AMQShortString string) + { + AMQTypedValue value = getProperty(string); if (value != null && (value.getType() == AMQType.ASCII_CHARACTER)) { return (Character) value.getValue(); @@ -183,7 +314,12 @@ public class FieldTable public byte[] getBytes(String string) { - AMQTypedValue value = _properties.get(string); + return getBytes(new AMQShortString(string)); + } + + public byte[] getBytes(AMQShortString string) + { + AMQTypedValue value = getProperty(string); if (value != null && (value.getType() == AMQType.BINARY)) { return (byte[]) value.getValue(); @@ -196,7 +332,12 @@ public class FieldTable public Object getObject(String string) { - AMQTypedValue value = _properties.get(string); + return getObject(new AMQShortString(string)); + } + + public Object getObject(AMQShortString string) + { + AMQTypedValue value = getProperty(string); if(value != null) { return value.getValue(); @@ -209,83 +350,150 @@ public class FieldTable } // ************ Setters - public Object setBoolean(String string, boolean b) + { + return setBoolean(new AMQShortString(string), b); + } + + public Object setBoolean(AMQShortString string, boolean b) { checkPropertyName(string); - return _properties.put(string, AMQType.BOOLEAN.asTypedValue(b)); + return setProperty(string, AMQType.BOOLEAN.asTypedValue(b)); } public Object setByte(String string, byte b) + { + return setByte(new AMQShortString(string), b); + } + + public Object setByte(AMQShortString string, byte b) { checkPropertyName(string); - return _properties.put(string, AMQType.BYTE.asTypedValue(b)); + return setProperty(string, AMQType.BYTE.asTypedValue(b)); } public Object setShort(String string, short i) + { + return setShort(new AMQShortString(string), i); + } + + public Object setShort(AMQShortString string, short i) { checkPropertyName(string); - return _properties.put(string, AMQType.SHORT.asTypedValue(i)); + return setProperty(string, AMQType.SHORT.asTypedValue(i)); } + public Object setInteger(String string, int i) + { + return setInteger(new AMQShortString(string), i); + } + + public Object setInteger(AMQShortString string, int i) { checkPropertyName(string); - return _properties.put(string, AMQType.INT.asTypedValue(i)); + return setProperty(string, AMQType.INT.asTypedValue(i)); } + public Object setLong(String string, long l) + { + return setLong(new AMQShortString(string), l); + } + + public Object setLong(AMQShortString string, long l) { checkPropertyName(string); - return _properties.put(string, AMQType.LONG.asTypedValue(l)); + return setProperty(string, AMQType.LONG.asTypedValue(l)); } - public Object setFloat(String string, float v) + + public Object setFloat(String string, float f) + { + return setFloat(new AMQShortString(string), f); + } + + public Object setFloat(AMQShortString string, float v) { checkPropertyName(string); - return _properties.put(string, AMQType.FLOAT.asTypedValue(v)); + return setProperty(string, AMQType.FLOAT.asTypedValue(v)); } - public Object setDouble(String string, double v) + public Object setDouble(String string, double d) + { + return setDouble(new AMQShortString(string), d); + } + + + public Object setDouble(AMQShortString string, double v) { checkPropertyName(string); - return _properties.put(string, AMQType.DOUBLE.asTypedValue(v)); + return setProperty(string, AMQType.DOUBLE.asTypedValue(v)); + } + + + public Object setString(String string, String s) + { + return setString(new AMQShortString(string), s); } - public Object setString(String string, String value) + public Object setAsciiString(AMQShortString string, String value) { checkPropertyName(string); if (value == null) { - return _properties.put(string, AMQType.VOID.asTypedValue(null)); + return setProperty(string, AMQType.VOID.asTypedValue(null)); } else { - //FIXME: determine string encoding and set either WIDE or ASCII string -// if () - { - return _properties.put(string, AMQType.WIDE_STRING.asTypedValue(value)); - } -// else -// { -// return _properties.put(string, AMQType.ASCII_STRING.asTypedValue(value)); -// } + return setProperty(string, AMQType.ASCII_STRING.asTypedValue(value)); } } + public Object setString(AMQShortString string, String value) + { + checkPropertyName(string); + if (value == null) + { + return setProperty(string, AMQType.VOID.asTypedValue(null)); + } + else + { + return setProperty(string, AMQType.LONG_STRING.asTypedValue(value)); + } + } + + public Object setChar(String string, char c) + { + return setChar(new AMQShortString(string), c); + } + + + public Object setChar(AMQShortString string, char c) { checkPropertyName(string); - return _properties.put(string, AMQType.ASCII_CHARACTER.asTypedValue(c)); + return setProperty(string, AMQType.ASCII_CHARACTER.asTypedValue(c)); } - public Object setBytes(String string, byte[] bytes) + + public Object setBytes(String string, byte[] b) + { + return setBytes(new AMQShortString(string), b); + } + + public Object setBytes(AMQShortString string, byte[] bytes) { checkPropertyName(string); - return _properties.put(string, AMQType.BINARY.asTypedValue(bytes)); + return setProperty(string, AMQType.BINARY.asTypedValue(bytes)); } public Object setBytes(String string, byte[] bytes, int start, int length) + { + return setBytes(new AMQShortString(string), bytes,start,length); + } + + public Object setBytes(AMQShortString string, byte[] bytes, int start, int length) { checkPropertyName(string); byte[] newBytes = new byte[length]; @@ -293,8 +501,12 @@ public class FieldTable return setBytes(string, bytes); } + public Object setObject(String string, Object o) + { + return setObject(new AMQShortString(string), o); + } - public Object setObject(String string, Object object) + public Object setObject(AMQShortString string, Object object) { if (object instanceof Boolean) { @@ -343,7 +555,7 @@ public class FieldTable public boolean isNullStringValue(String name) { - AMQTypedValue value = _properties.get(name); + AMQTypedValue value = getProperty(new AMQShortString(name)); return (value != null) && (value.getType() == AMQType.VOID); } @@ -351,7 +563,12 @@ public class FieldTable public Enumeration getPropertyNames() { - return Collections.enumeration(_properties.keySet()); + return Collections.enumeration(keys()); + } + + public boolean propertyExists(AMQShortString propertyName) + { + return itemExists(propertyName); } public boolean propertyExists(String propertyName) @@ -359,25 +576,34 @@ public class FieldTable return itemExists(propertyName); } - public boolean itemExists(String string) + public boolean itemExists(AMQShortString string) { + initMapIfNecessary(); return _properties.containsKey(string); } + public boolean itemExists(String string) + { + return itemExists(new AMQShortString(string)); + } + public String toString() { + initMapIfNecessary(); +// if (_encodedForm != null) +// _encodedForm.rewind(); return _properties.toString(); } - private void checkPropertyName(String propertyName) + private void checkPropertyName(AMQShortString propertyName) { if (propertyName == null) { throw new IllegalArgumentException("Property name must not be null"); } - else if ("".equals(propertyName)) + else if (propertyName.length()==0) { throw new IllegalArgumentException("Property name must not be the empty string"); } @@ -386,7 +612,7 @@ public class FieldTable } - protected static void checkIdentiferFormat(String propertyName) + protected static void checkIdentiferFormat(AMQShortString propertyName) { // AMQP Spec: 4.2.5.5 Field Tables // Guidelines for implementers: @@ -448,20 +674,31 @@ public class FieldTable public long getEncodedSize() { + return _encodedSize; + } + + private void recalculateEncodedSize() + { + int encodedSize = 0; - for(Map.Entry e : _properties.entrySet()) + if(_properties != null) { - encodedSize += EncodingUtils.encodedShortStringLength(e.getKey()); - encodedSize++; // the byte for the encoding Type - encodedSize += e.getValue().getEncodingSize(); + for(Map.Entry e : _properties.entrySet()) + { + encodedSize += EncodingUtils.encodedShortStringLength(e.getKey()); + encodedSize++; // the byte for the encoding Type + encodedSize += e.getValue().getEncodingSize(); + } } - return encodedSize; + _encodedSize = encodedSize; } public void addAll(FieldTable fieldTable) { + initMapIfNecessary(); _properties.putAll(fieldTable._properties); + recalculateEncodedSize(); } @@ -473,135 +710,213 @@ public class FieldTable public Object processOverElements(FieldTableElementProcessor processor) { - for(Map.Entry e : _properties.entrySet()) + initMapIfNecessary(); + if(_properties != null) { - boolean result = processor.processElement(e.getKey(), e.getValue()); - if(!result) + for(Map.Entry e : _properties.entrySet()) { - break; + boolean result = processor.processElement(e.getKey().toString(), e.getValue()); + if(!result) + { + break; + } } } return processor.getResult(); + + } public int size() { + initMapIfNecessary(); return _properties.size(); + } public boolean isEmpty() { - return _properties.isEmpty(); + return size() ==0; } - public boolean containsKey(String key) + public boolean containsKey(AMQShortString key) { + initMapIfNecessary(); return _properties.containsKey(key); } + public boolean containsKey(String key) + { + return containsKey(new AMQShortString(key)); + } + public Set keys() { - return _properties.keySet(); + initMapIfNecessary(); + Set keys = new LinkedHashSet(); + for(AMQShortString key : _properties.keySet()) + { + keys.add(key.toString()); + } + return keys; } - public Object get(Object key) + public Object get(AMQShortString key) { - return getObject((String)key); + return getObject(key); } - public Object put(Object key, Object value) + + public Object put(AMQShortString key, Object value) { - return setObject(key.toString(), value); + return setObject(key, value); } - + public Object remove(String key) { + + return remove(new AMQShortString(key)); + + } + + public Object remove(AMQShortString key) + { + AMQTypedValue val = removeKey(key); + return val == null ? null : val.getValue(); + + } + + + public AMQTypedValue removeKey(AMQShortString key) + { + initMapIfNecessary(); + _encodedForm = null; AMQTypedValue value = _properties.remove(key); - return value == null ? null : value.getValue(); + if(value == null) + { + return null; + } + else + { + _encodedSize -= EncodingUtils.encodedShortStringLength(key); + _encodedSize--; + _encodedSize -= value.getEncodingSize(); + return value; + } + } public void clear() { + initMapIfNecessary(); + _encodedForm = null; _properties.clear(); + _encodedSize = 0; } - public Set keySet() + public Set keySet() { + initMapIfNecessary(); return _properties.keySet(); } private void putDataInBuffer(ByteBuffer buffer) { - final Iterator> it = _properties.entrySet().iterator(); + if(_encodedForm != null) + { + if (_encodedForm.remaining() == 0) + { + _encodedForm.rewind(); + } + buffer.put(_encodedForm); + } + else if(_properties != null) + { + final Iterator> it = _properties.entrySet().iterator(); - //If there are values then write out the encoded Size... could check _encodedSize != 0 - // write out the total length, which we have kept up to date as data is added + //If there are values then write out the encoded Size... could check _encodedSize != 0 + // write out the total length, which we have kept up to date as data is added - while (it.hasNext()) - { - final Map.Entry me = it.next(); - try + while (it.hasNext()) { - if (_logger.isTraceEnabled()) + final Map.Entry me = it.next(); + try { - _logger.trace("Writing Property:" + me.getKey() + - " Type:" + me.getValue().getType() + - " Value:" + me.getValue().getValue()); - _logger.trace("Buffer Position:" + buffer.position() + - " Remaining:" + buffer.remaining()); - } + if (_logger.isTraceEnabled()) + { + _logger.trace("Writing Property:" + me.getKey() + + " Type:" + me.getValue().getType() + + " Value:" + me.getValue().getValue()); + _logger.trace("Buffer Position:" + buffer.position() + + " Remaining:" + buffer.remaining()); + } - //Write the actual parameter name - EncodingUtils.writeShortStringBytes(buffer, me.getKey()); - me.getValue().writeToBuffer(buffer); - } - catch (Exception e) - { - if (_logger.isTraceEnabled()) + //Write the actual parameter name + EncodingUtils.writeShortStringBytes(buffer, me.getKey()); + me.getValue().writeToBuffer(buffer); + } + catch (Exception e) { - _logger.trace("Exception thrown:" + e); - _logger.trace("Writing Property:" + me.getKey() + - " Type:" + me.getValue().getType() + - " Value:" + me.getValue().getValue()); - _logger.trace("Buffer Position:" + buffer.position() + - " Remaining:" + buffer.remaining()); + if (_logger.isTraceEnabled()) + { + _logger.trace("Exception thrown:" + e); + _logger.trace("Writing Property:" + me.getKey() + + " Type:" + me.getValue().getType() + + " Value:" + me.getValue().getValue()); + _logger.trace("Buffer Position:" + buffer.position() + + " Remaining:" + buffer.remaining()); + } + throw new RuntimeException(e); } - throw new RuntimeException(e); } } } - public void setFromBuffer(ByteBuffer buffer, long length) throws AMQFrameDecodingException + private void setFromBuffer(ByteBuffer buffer, long length) throws AMQFrameDecodingException { - final boolean trace = _logger.isTraceEnabled(); - int sizeRead = 0; - while (sizeRead < length) + final boolean trace = _logger.isTraceEnabled(); + if(length > 0) { - int sizeRemaining = buffer.remaining(); - final String key = EncodingUtils.readShortString(buffer); - AMQTypedValue value = AMQTypedValue.readFromBuffer(buffer); - sizeRead += (sizeRemaining - buffer.remaining()); - if (trace) + final int expectedRemaining = buffer.remaining()-(int)length; + + _properties = new LinkedHashMap(INITIAL_HASHMAP_CAPACITY); + + do { - _logger.trace("FieldTable::PropFieldTable(buffer," + length + "): Read type '" + value.getType() + "', key '" + key + "', value '" + value.getValue() + "' (now read " + sizeRead + " of " + length + " encoded bytes)..."); + + final AMQShortString key = EncodingUtils.readAMQShortString(buffer); + AMQTypedValue value = AMQTypedValue.readFromBuffer(buffer); + + if (trace) + { + _logger.trace("FieldTable::PropFieldTable(buffer," + length + "): Read type '" + value.getType() + "', key '" + key + "', value '" + value.getValue() + "'"); + } + + + + _properties.put(key,value); + + + } + while (buffer.remaining() > expectedRemaining); - _properties.put(key,value); } + _encodedSize = length; if (trace) { diff --git a/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java b/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java index 7a160ef471..7246c4a1cf 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java @@ -27,7 +27,21 @@ public class HeartbeatBody extends AMQBody public static final byte TYPE = 8; public static AMQFrame FRAME = new HeartbeatBody().toFrame(); - protected byte getFrameType() + public HeartbeatBody() + { + + } + + public HeartbeatBody(ByteBuffer buffer, long size) + { + if(size > 0) + { + //allow other implementations to have a payload, but ignore it: + buffer.skip((int) size); + } + } + + public byte getFrameType() { return TYPE; } diff --git a/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBodyFactory.java b/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBodyFactory.java index 97bd3d9253..c7ada708dc 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBodyFactory.java +++ b/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBodyFactory.java @@ -24,7 +24,7 @@ import org.apache.mina.common.ByteBuffer; public class HeartbeatBodyFactory implements BodyFactory { - public AMQBody createBody(ByteBuffer in) throws AMQFrameDecodingException + public AMQBody createBody(ByteBuffer in, long bodySize) throws AMQFrameDecodingException { return new HeartbeatBody(); } diff --git a/java/common/src/main/java/org/apache/qpid/framing/JMSPropertyFieldTable.java b/java/common/src/main/java/org/apache/qpid/framing/JMSPropertyFieldTable.java deleted file mode 100644 index d78034cf2f..0000000000 --- a/java/common/src/main/java/org/apache/qpid/framing/JMSPropertyFieldTable.java +++ /dev/null @@ -1,453 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 -* - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - * - */ -package org.apache.qpid.framing; - -import org.apache.mina.common.ByteBuffer; -import org.apache.qpid.AMQPInvalidClassException; - -import javax.jms.MessageFormatException; -import javax.jms.JMSException; -import java.util.Enumeration; - - -public class JMSPropertyFieldTable -{ - private FieldTable _fieldtable; - - public JMSPropertyFieldTable(FieldTable table) - { - _fieldtable = table; - } - - - private void checkPropertyName(String propertyName) - { - if (propertyName == null) - { - throw new IllegalArgumentException("Property name must not be null"); - } - else if ("".equals(propertyName)) - { - throw new IllegalArgumentException("Property name must not be the empty string"); - } - - checkIdentiferFormat(propertyName); - } - - protected static void checkIdentiferFormat(String propertyName) - { -// JMS requirements 3.5.1 Property Names -// Identifiers: -// - An identifier is an unlimited-length character sequence that must begin -// with a Java identifier start character; all following characters must be Java -// identifier part characters. An identifier start character is any character for -// which the method Character.isJavaIdentifierStart returns true. This includes -// '_' and '$'. An identifier part character is any character for which the -// method Character.isJavaIdentifierPart returns true. -// - Identifiers cannot be the names NULL, TRUE, or FALSE. -// – Identifiers cannot be NOT, AND, OR, BETWEEN, LIKE, IN, IS, or -// ESCAPE. -// – Identifiers are either header field references or property references. The -// type of a property value in a message selector corresponds to the type -// used to set the property. If a property that does not exist in a message is -// referenced, its value is NULL. The semantics of evaluating NULL values -// in a selector are described in Section 3.8.1.2, “Null Values.” -// – The conversions that apply to the get methods for properties do not -// apply when a property is used in a message selector expression. For -// example, suppose you set a property as a string value, as in the -// following: -// myMessage.setStringProperty("NumberOfOrders", "2"); -// The following expression in a message selector would evaluate to false, -// because a string cannot be used in an arithmetic expression: -// "NumberOfOrders > 1" -// – Identifiers are case sensitive. -// – Message header field references are restricted to JMSDeliveryMode, -// JMSPriority, JMSMessageID, JMSTimestamp, JMSCorrelationID, and -// JMSType. JMSMessageID, JMSCorrelationID, and JMSType values may be -// null and if so are treated as a NULL value. - - if (Boolean.getBoolean("strict-jms")) - { - // JMS start character - if (!(Character.isJavaIdentifierStart(propertyName.charAt(0)))) - { - throw new IllegalArgumentException("Identifier '" + propertyName + "' does not start with a valid JMS identifier start character"); - } - - // JMS part character - int length = propertyName.length(); - for (int c = 1; c < length; c++) - { - if (!(Character.isJavaIdentifierPart(propertyName.charAt(c)))) - { - throw new IllegalArgumentException("Identifier '" + propertyName + "' contains an invalid JMS identifier character"); - } - } - - // JMS invalid names - if ((propertyName.equals("NULL") - || propertyName.equals("TRUE") - || propertyName.equals("FALSE") - || propertyName.equals("NOT") - || propertyName.equals("AND") - || propertyName.equals("OR") - || propertyName.equals("BETWEEN") - || propertyName.equals("LIKE") - || propertyName.equals("IN") - || propertyName.equals("IS") - || propertyName.equals("ESCAPE"))) - { - throw new IllegalArgumentException("Identifier '" + propertyName + "' is not allowed in JMS"); - } - } - - } - - // MapMessage Interface - public boolean getBoolean(String string) throws JMSException - { - Boolean b = _fieldtable.getBoolean(string); - - if (b == null) - { - if (_fieldtable.containsKey(string)) - { - Object str = _fieldtable.getObject(string); - - if (str == null || !(str instanceof String)) - { - throw new MessageFormatException("getBoolean can't use " + string + " item."); - } - else - { - return Boolean.valueOf((String) str); - } - } - else - { - b = Boolean.valueOf(null); - } - } - - return b; - } - - public char getCharacter(String string) throws JMSException - { - Character c = _fieldtable.getCharacter(string); - - if (c == null) - { - if (_fieldtable.isNullStringValue(string)) - { - throw new NullPointerException("Cannot convert null char"); - } - else - { - throw new MessageFormatException("getChar can't use " + string + " item."); - } - } - else - { - return (char) c; - } - } - - public byte[] getBytes(String string) throws JMSException - { - byte[] bs = _fieldtable.getBytes(string); - - if (bs == null) - { - throw new MessageFormatException("getBytes can't use " + string + " item."); - } - else - { - return bs; - } - } - - public byte getByte(String string) throws JMSException - { - Byte b = _fieldtable.getByte(string); - if (b == null) - { - if (_fieldtable.containsKey(string)) - { - Object str = _fieldtable.getObject(string); - - if (str == null || !(str instanceof String)) - { - throw new MessageFormatException("getByte can't use " + string + " item."); - } - else - { - return Byte.valueOf((String) str); - } - } - else - { - b = Byte.valueOf(null); - } - } - - return b; - } - - public short getShort(String string) throws JMSException - { - Short s = _fieldtable.getShort(string); - - if (s == null) - { - s = Short.valueOf(getByte(string)); - } - - return s; - } - - public int getInteger(String string) throws JMSException - { - Integer i = _fieldtable.getInteger(string); - - if (i == null) - { - i = Integer.valueOf(getShort(string)); - } - - return i; - } - - public long getLong(String string) throws JMSException - { - Long l = _fieldtable.getLong(string); - - if (l == null) - { - l = Long.valueOf(getInteger(string)); - } - - return l; - } - - public float getFloat(String string) throws JMSException - { - Float f = _fieldtable.getFloat(string); - - if (f == null) - { - if (_fieldtable.containsKey(string)) - { - Object str = _fieldtable.getObject(string); - - if (str == null || !(str instanceof String)) - { - throw new MessageFormatException("getFloat can't use " + string + " item."); - } - else - { - return Float.valueOf((String) str); - } - } - else - { - f = Float.valueOf(null); - } - - } - - return f; - } - - public double getDouble(String string) throws JMSException - { - Double d = _fieldtable.getDouble(string); - - if (d == null) - { - d = Double.valueOf(getFloat(string)); - } - - return d; - } - - public String getString(String string) throws JMSException - { - String s = _fieldtable.getString(string); - - if (s == null) - { - if (_fieldtable.containsKey(string)) - { - Object o = _fieldtable.getObject(string); - if (o instanceof byte[]) - { - throw new MessageFormatException("getObject couldn't find " + string + " item."); - } - else - { - if (o == null) - { - return null; - } - else - { - s = String.valueOf(o); - } - } - } - } - - return s; - } - - public Object getObject(String string) throws JMSException - { - return _fieldtable.getObject(string); - } - - public void setBoolean(String string, boolean b) throws JMSException - { - checkPropertyName(string); - _fieldtable.setBoolean(string, b); - } - - public void setChar(String string, char c) throws JMSException - { - checkPropertyName(string); - _fieldtable.setChar(string, c); - } - - public Object setBytes(String string, byte[] bytes) - { - return _fieldtable.setBytes(string, bytes, 0, bytes.length); - } - - public Object setBytes(String string, byte[] bytes, int start, int length) - { - return _fieldtable.setBytes(string, bytes, start, length); - } - - public void setByte(String string, byte b) throws JMSException - { - checkPropertyName(string); - _fieldtable.setByte(string, b); - } - - public void setShort(String string, short i) throws JMSException - { - checkPropertyName(string); - _fieldtable.setShort(string, i); - } - - public void setInteger(String string, int i) throws JMSException - { - checkPropertyName(string); - _fieldtable.setInteger(string, i); - } - - public void setLong(String string, long l) throws JMSException - { - checkPropertyName(string); - _fieldtable.setLong(string, l); - } - - public void setFloat(String string, float v) throws JMSException - { - checkPropertyName(string); - _fieldtable.setFloat(string, v); - } - - public void setDouble(String string, double v) throws JMSException - { - checkPropertyName(string); - _fieldtable.setDouble(string, v); - } - - public void setString(String string, String string1) throws JMSException - { - checkPropertyName(string); - _fieldtable.setString(string, string1); - } - - public void setObject(String string, Object object) throws JMSException - { - checkPropertyName(string); - try - { - _fieldtable.setObject(string, object); - } - catch (AMQPInvalidClassException aice) - { - throw new MessageFormatException("Only primatives are allowed object is:" + object.getClass()); - } - } - - public boolean itemExists(String string) throws JMSException - { - return _fieldtable.containsKey(string); - } - - public void setFieldTable(FieldTable headers) - { - _fieldtable = headers; - } - - public Enumeration getPropertyNames() - { - return _fieldtable.getPropertyNames(); - } - - public void clear() - { - _fieldtable.clear(); - } - - public boolean propertyExists(String propertyName) - { - return _fieldtable.propertyExists(propertyName); - } - - public Object put(Object key, Object value) - { - return _fieldtable.put(key, value); - } - - public Object remove(String propertyName) - { - return _fieldtable.remove(propertyName); - } - - public boolean isEmpty() - { - return _fieldtable.isEmpty(); - } - - public void writeToBuffer(ByteBuffer data) - { - _fieldtable.writeToBuffer(data); - } - - public Enumeration getMapNames() - { - return getPropertyNames(); - } -} diff --git a/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java b/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java index f2d1a70cdc..697a0f4249 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java +++ b/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java @@ -164,4 +164,14 @@ public class ProtocolInitiation extends AMQDataBlock implements EncodableAMQData protocolMajor + "." + protocolMinor + " not found in protocol version list."); } } + + public String toString() + { + StringBuffer buffer = new StringBuffer(new String(header)); + buffer.append(Integer.toHexString(protocolClass)); + buffer.append(Integer.toHexString(protocolInstance)); + buffer.append(Integer.toHexString(protocolMajor)); + buffer.append(Integer.toHexString(protocolMinor)); + return buffer.toString(); + } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/RequestManager.java b/java/common/src/main/java/org/apache/qpid/framing/RequestManager.java index 836c4ad985..eab7ad0132 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/RequestManager.java +++ b/java/common/src/main/java/org/apache/qpid/framing/RequestManager.java @@ -40,7 +40,7 @@ public class RequestManager * to be known. */ private boolean serverFlag; - private int connectionId; + private long connectionId; /** * Request and response frames must have a requestID and responseID which @@ -56,7 +56,7 @@ public class RequestManager private ConcurrentHashMap requestSentMap; - public RequestManager(int connectionId, int channel, AMQProtocolWriter protocolWriter, boolean serverFlag) + public RequestManager(long connectionId, int channel, AMQProtocolWriter protocolWriter, boolean serverFlag) { this.channel = channel; this.protocolWriter = protocolWriter; diff --git a/java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java b/java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java index 3148603d65..9896bde5f8 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java +++ b/java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java @@ -43,7 +43,7 @@ public class ResponseManager * to be known. */ private boolean serverFlag; - private int connectionId; + private long connectionId; private int maxAccumulatedResponses = 20; // Default // private Class currentResponseMethodBodyClass; @@ -80,11 +80,18 @@ public class ResponseManager { return (int)(requestId - o.requestId); } + + public String toString() + { + return requestId + ":" + (responseMethodBody == null ? + "null" : + "C" + responseMethodBody.getClazz() + " M" + responseMethodBody.getMethod()); + } } private ConcurrentHashMap responseMap; - public ResponseManager(int connectionId, int channel, AMQMethodListener methodListener, + public ResponseManager(long connectionId, int channel, AMQMethodListener methodListener, AMQProtocolWriter protocolWriter, boolean serverFlag) { this.channel = channel; diff --git a/java/common/src/main/java/org/apache/qpid/framing/SmallCompositeAMQDataBlock.java b/java/common/src/main/java/org/apache/qpid/framing/SmallCompositeAMQDataBlock.java new file mode 100644 index 0000000000..0dd1bdc102 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/framing/SmallCompositeAMQDataBlock.java @@ -0,0 +1,97 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.framing; + +import org.apache.mina.common.ByteBuffer; + +public class SmallCompositeAMQDataBlock extends AMQDataBlock implements EncodableAMQDataBlock +{ + private ByteBuffer _encodedBlock; + + private AMQDataBlock _block; + + public SmallCompositeAMQDataBlock(AMQDataBlock block) + { + _block = block; + } + + /** + * The encoded block will be logically first before the AMQDataBlocks which are encoded + * into the buffer afterwards. + * @param encodedBlock already-encoded data + * @param block a block to be encoded. + */ + public SmallCompositeAMQDataBlock(ByteBuffer encodedBlock, AMQDataBlock block) + { + this(block); + _encodedBlock = encodedBlock; + } + + public AMQDataBlock getBlock() + { + return _block; + } + + public ByteBuffer getEncodedBlock() + { + return _encodedBlock; + } + + public long getSize() + { + long frameSize = _block.getSize(); + + if (_encodedBlock != null) + { + _encodedBlock.rewind(); + frameSize += _encodedBlock.remaining(); + } + return frameSize; + } + + public void writePayload(ByteBuffer buffer) + { + if (_encodedBlock != null) + { + buffer.put(_encodedBlock); + } + _block.writePayload(buffer); + + } + + public String toString() + { + if (_block == null) + { + return "No blocks contained in composite frame"; + } + else + { + StringBuilder buf = new StringBuilder(this.getClass().getName()); + buf.append("{encodedBlock=").append(_encodedBlock); + + buf.append(" _block=[").append(_block.toString()).append("]"); + + buf.append("}"); + return buf.toString(); + } + } +} diff --git a/java/common/src/main/java/org/apache/qpid/framing/VersionSpecificRegistry.java b/java/common/src/main/java/org/apache/qpid/framing/VersionSpecificRegistry.java new file mode 100644 index 0000000000..9bc8232d61 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/framing/VersionSpecificRegistry.java @@ -0,0 +1,141 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.framing; + +import org.apache.mina.common.ByteBuffer; +import org.apache.log4j.Logger; + +public class VersionSpecificRegistry +{ + private static final Logger _log = Logger.getLogger(VersionSpecificRegistry.class); + + + private final byte _protocolMajorVersion; + private final byte _protocolMinorVersion; + + private static final int DEFAULT_MAX_CLASS_ID = 200; + private static final int DEFAULT_MAX_METHOD_ID = 50; + + private AMQMethodBodyInstanceFactory[][] _registry = new AMQMethodBodyInstanceFactory[DEFAULT_MAX_CLASS_ID][]; + + public VersionSpecificRegistry(byte major, byte minor) + { + _protocolMajorVersion = major; + _protocolMinorVersion = minor; + } + + public byte getProtocolMajorVersion() + { + return _protocolMajorVersion; + } + + public byte getProtocolMinorVersion() + { + return _protocolMinorVersion; + } + + public AMQMethodBodyInstanceFactory getMethodBody(final short classID, final short methodID) + { + try + { + return _registry[classID][methodID]; + } + catch (IndexOutOfBoundsException e) + { + return null; + } + catch (NullPointerException e) + { + return null; + } + } + + public void registerMethod(final short classID, final short methodID, final AMQMethodBodyInstanceFactory instanceFactory) + { + if(_registry.length <= classID) + { + AMQMethodBodyInstanceFactory[][] oldRegistry = _registry; + _registry = new AMQMethodBodyInstanceFactory[classID+1][]; + System.arraycopy(oldRegistry, 0, _registry, 0, oldRegistry.length); + } + + if(_registry[classID] == null) + { + _registry[classID] = new AMQMethodBodyInstanceFactory[methodID > DEFAULT_MAX_METHOD_ID ? methodID + 1 : DEFAULT_MAX_METHOD_ID + 1]; + } + else if(_registry[classID].length <= methodID) + { + AMQMethodBodyInstanceFactory[] oldMethods = _registry[classID]; + _registry[classID] = new AMQMethodBodyInstanceFactory[methodID+1]; + System.arraycopy(oldMethods,0,_registry[classID],0,oldMethods.length); + } + + _registry[classID][methodID] = instanceFactory; + + } + + + public AMQMethodBody get(short classID, short methodID, ByteBuffer in, long size) + throws AMQFrameDecodingException + { + AMQMethodBodyInstanceFactory bodyFactory; + try + { + bodyFactory = _registry[classID][methodID]; + } + catch(NullPointerException e) + { + throw new AMQFrameDecodingException(_log, + "Class " + classID + " unknown in AMQP version " + _protocolMajorVersion + "-" + _protocolMinorVersion + + " (while trying to decode class " + classID + " method " + methodID + "."); + } + catch(IndexOutOfBoundsException e) + { + if(classID >= _registry.length) + { + throw new AMQFrameDecodingException(_log, + "Class " + classID + " unknown in AMQP version " + _protocolMajorVersion + "-" + _protocolMinorVersion + + " (while trying to decode class " + classID + " method " + methodID + "."); + + } + else + { + throw new AMQFrameDecodingException(_log, + "Method " + methodID + " unknown in AMQP version " + _protocolMajorVersion + "-" + _protocolMinorVersion + + " (while trying to decode class " + classID + " method " + methodID + "."); + + } + } + + + if (bodyFactory == null) + { + throw new AMQFrameDecodingException(_log, + "Method " + methodID + " unknown in AMQP version " + _protocolMajorVersion + "-" + _protocolMinorVersion + + " (while trying to decode class " + classID + " method " + methodID + "."); + } + + + return bodyFactory.newInstance(_protocolMajorVersion, _protocolMinorVersion, classID, methodID, in, size); + + + } +} diff --git a/java/common/src/main/java/org/apache/qpid/pool/Event.java b/java/common/src/main/java/org/apache/qpid/pool/Event.java index 7364b9293a..43ff8f6a19 100644 --- a/java/common/src/main/java/org/apache/qpid/pool/Event.java +++ b/java/common/src/main/java/org/apache/qpid/pool/Event.java @@ -25,90 +25,66 @@ import org.apache.mina.common.IoFilter; import org.apache.mina.common.IoSession; import org.apache.mina.common.IdleStatus; -/** - * Represents an operation on IoFilter. - */ -enum EventType -{ - OPENED, CLOSED, READ, WRITE, WRITTEN, RECEIVED, SENT, IDLE, EXCEPTION -} -class Event +abstract public class Event { - private static final Logger _log = Logger.getLogger(Event.class); - - private final EventType type; - private final IoFilter.NextFilter nextFilter; - private final Object data; - - public Event(IoFilter.NextFilter nextFilter, EventType type, Object data) - { - this.type = type; - this.nextFilter = nextFilter; - this.data = data; - if (type == EventType.EXCEPTION) - { - _log.error("Exception event constructed: " + data, (Throwable) data); - } - } - public Object getData() + public Event() { - return data; } - public IoFilter.NextFilter getNextFilter() - { - return nextFilter; - } + abstract public void process(IoSession session); - public EventType getType() + public static final class ReceivedEvent extends Event { - return type; - } + private final Object _data; - void process(IoSession session) - { - if (_log.isDebugEnabled()) - { - _log.debug("Processing " + this); - } - if (type == EventType.RECEIVED) - { - nextFilter.messageReceived(session, data); - //ByteBufferUtil.releaseIfPossible( data ); - } - else if (type == EventType.SENT) + private final IoFilter.NextFilter _nextFilter; + + public ReceivedEvent(final IoFilter.NextFilter nextFilter, final Object data) { - nextFilter.messageSent(session, data); - //ByteBufferUtil.releaseIfPossible( data ); + super(); + _nextFilter = nextFilter; + _data = data; } - else if (type == EventType.EXCEPTION) + + public void process(IoSession session) { - nextFilter.exceptionCaught(session, (Throwable) data); + _nextFilter.messageReceived(session, _data); } - else if (type == EventType.IDLE) + + public IoFilter.NextFilter getNextFilter() { - nextFilter.sessionIdle(session, (IdleStatus) data); + return _nextFilter; } - else if (type == EventType.OPENED) + } + + + public static final class WriteEvent extends Event + { + private final IoFilter.WriteRequest _data; + private final IoFilter.NextFilter _nextFilter; + + public WriteEvent(final IoFilter.NextFilter nextFilter, final IoFilter.WriteRequest data) { - nextFilter.sessionOpened(session); + super(); + _nextFilter = nextFilter; + _data = data; } - else if (type == EventType.WRITE) + + + public void process(IoSession session) { - nextFilter.filterWrite(session, (IoFilter.WriteRequest) data); + _nextFilter.filterWrite(session, _data); } - else if (type == EventType.CLOSED) + + public IoFilter.NextFilter getNextFilter() { - nextFilter.sessionClosed(session); + return _nextFilter; } } - public String toString() - { - return "Event: type " + type + ", data: " + data; - } + } diff --git a/java/common/src/main/java/org/apache/qpid/pool/Job.java b/java/common/src/main/java/org/apache/qpid/pool/Job.java index b9673cd48f..9b3bcfa008 100644 --- a/java/common/src/main/java/org/apache/qpid/pool/Job.java +++ b/java/common/src/main/java/org/apache/qpid/pool/Job.java @@ -30,13 +30,13 @@ import java.util.concurrent.atomic.AtomicInteger; * Holds events for a session that will be processed asynchronously by * the thread pool in PoolingFilter. */ -class Job implements Runnable +public class Job implements Runnable { private final int _maxEvents; private final IoSession _session; private final java.util.Queue _eventQueue = new ConcurrentLinkedQueue(); private final AtomicBoolean _active = new AtomicBoolean(); - private final AtomicInteger _refCount = new AtomicInteger(); + //private final AtomicInteger _refCount = new AtomicInteger(); private final JobCompletionHandler _completionHandler; Job(IoSession session, JobCompletionHandler completionHandler, int maxEvents) @@ -45,21 +45,21 @@ class Job implements Runnable _completionHandler = completionHandler; _maxEvents = maxEvents; } - - void acquire() - { - _refCount.incrementAndGet(); - } - - void release() - { - _refCount.decrementAndGet(); - } - - boolean isReferenced() - { - return _refCount.get() > 0; - } +// +// void acquire() +// { +// _refCount.incrementAndGet(); +// } +// +// void release() +// { +// _refCount.decrementAndGet(); +// } +// +// boolean isReferenced() +// { +// return _refCount.get() > 0; +// } void add(Event evt) { diff --git a/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java b/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java index 38cfa68c78..9d04827246 100644 --- a/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java +++ b/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java @@ -25,57 +25,60 @@ import org.apache.mina.common.IdleStatus; import org.apache.mina.common.IoFilterAdapter; import org.apache.mina.common.IoSession; -import java.util.Arrays; -import java.util.HashSet; -import java.util.Set; +import java.util.EnumSet; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; public class PoolingFilter extends IoFilterAdapter implements Job.JobCompletionHandler { private static final Logger _logger = Logger.getLogger(PoolingFilter.class); - public static final Set READ_EVENTS = new HashSet(Arrays.asList(EventType.RECEIVED)); - public static final Set WRITE_EVENTS = new HashSet(Arrays.asList(EventType.WRITE)); private final ConcurrentMap _jobs = new ConcurrentHashMap(); private final ReferenceCountingExecutorService _poolReference; - private final Set _asyncTypes; private final String _name; private final int _maxEvents = Integer.getInteger("amqj.server.read_write_pool.max_events", 10); - public PoolingFilter(ReferenceCountingExecutorService refCountingPool, Set asyncTypes, String name) + public PoolingFilter(ReferenceCountingExecutorService refCountingPool, String name) { _poolReference = refCountingPool; - _asyncTypes = asyncTypes; _name = name; } - private void fireEvent(IoSession session, Event event) + void fireAsynchEvent(IoSession session, Event event) { - if (_asyncTypes.contains(event.getType())) - { - Job job = getJobForSession(session); - job.acquire(); //prevents this job being removed from _jobs - job.add(event); + Job job = getJobForSession(session); + // job.acquire(); //prevents this job being removed from _jobs + job.add(event); - //Additional checks on pool to check that it hasn't shutdown. - // The alternative is to catch the RejectedExecutionException that will result from executing on a shutdown pool - if (job.activate() && _poolReference.getPool() != null && !_poolReference.getPool().isShutdown()) - { - _poolReference.getPool().execute(job); - } - } - else + //Additional checks on pool to check that it hasn't shutdown. + // The alternative is to catch the RejectedExecutionException that will result from executing on a shutdown pool + if (job.activate() && _poolReference.getPool() != null && !_poolReference.getPool().isShutdown()) { - event.process(session); + _poolReference.getPool().execute(job); } + + } + + public void createNewJobForSession(IoSession session) + { + Job job = new Job(session, this, _maxEvents); + session.setAttribute(_name, job); } private Job getJobForSession(IoSession session) { - Job job = _jobs.get(session); - return job == null ? createJobForSession(session) : job; + return (Job) session.getAttribute(_name); + +/* if(job == null) + { + System.err.println("Error in " + _name); + Thread.dumpStack(); + } + + + job = _jobs.get(session); + return job == null ? createJobForSession(session) : job;*/ } private Job createJobForSession(IoSession session) @@ -93,15 +96,16 @@ public class PoolingFilter extends IoFilterAdapter implements Job.JobCompletionH //Job.JobCompletionHandler public void completed(IoSession session, Job job) { - if (job.isComplete()) - { - job.release(); - if (!job.isReferenced()) - { - _jobs.remove(session); - } - } - else +// if (job.isComplete()) +// { +// job.release(); +// if (!job.isReferenced()) +// { +// _jobs.remove(session); +// } +// } +// else + if(!job.isComplete()) { // ritchiem : 2006-12-13 Do we need to perform the additional checks here? // Can the pool be shutdown at this point? @@ -114,45 +118,44 @@ public class PoolingFilter extends IoFilterAdapter implements Job.JobCompletionH //IoFilter methods that are processed by threads on the pool - public void sessionOpened(NextFilter nextFilter, IoSession session) throws Exception + public void sessionOpened(final NextFilter nextFilter, final IoSession session) throws Exception { - fireEvent(session, new Event(nextFilter, EventType.OPENED, null)); + nextFilter.sessionOpened(session); } - public void sessionClosed(NextFilter nextFilter, IoSession session) throws Exception + public void sessionClosed(final NextFilter nextFilter, final IoSession session) throws Exception { - fireEvent(session, new Event(nextFilter, EventType.CLOSED, null)); + nextFilter.sessionClosed(session); } - public void sessionIdle(NextFilter nextFilter, IoSession session, - IdleStatus status) throws Exception + public void sessionIdle(final NextFilter nextFilter, final IoSession session, + final IdleStatus status) throws Exception { - fireEvent(session, new Event(nextFilter, EventType.IDLE, status)); + nextFilter.sessionIdle(session, status); } - public void exceptionCaught(NextFilter nextFilter, IoSession session, - Throwable cause) throws Exception + public void exceptionCaught(final NextFilter nextFilter, final IoSession session, + final Throwable cause) throws Exception { - fireEvent(session, new Event(nextFilter, EventType.EXCEPTION, cause)); + nextFilter.exceptionCaught(session,cause); } - public void messageReceived(NextFilter nextFilter, IoSession session, - Object message) throws Exception + public void messageReceived(final NextFilter nextFilter, final IoSession session, + final Object message) throws Exception { - //ByteBufferUtil.acquireIfPossible( message ); - fireEvent(session, new Event(nextFilter, EventType.RECEIVED, message)); + nextFilter.messageReceived(session,message); } - public void messageSent(NextFilter nextFilter, IoSession session, - Object message) throws Exception + public void messageSent(final NextFilter nextFilter, final IoSession session, + final Object message) throws Exception { - //ByteBufferUtil.acquireIfPossible( message ); - fireEvent(session, new Event(nextFilter, EventType.SENT, message)); + nextFilter.messageSent(session, message); } - public void filterWrite(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) throws Exception + public void filterWrite(final NextFilter nextFilter, final IoSession session, + final WriteRequest writeRequest) throws Exception { - fireEvent(session, new Event(nextFilter, EventType.WRITE, writeRequest)); + nextFilter.filterWrite(session, writeRequest); } //IoFilter methods that are processed on current thread (NOT on pooled thread) @@ -188,5 +191,51 @@ public class PoolingFilter extends IoFilterAdapter implements Job.JobCompletionH // when the reference count gets to zero we release the executor service _poolReference.releaseExecutorService(); } + + public static class AsynchReadPoolingFilter extends PoolingFilter + { + + public AsynchReadPoolingFilter(ReferenceCountingExecutorService refCountingPool, String name) + { + super(refCountingPool, name); + } + + public void messageReceived(final NextFilter nextFilter, final IoSession session, + final Object message) throws Exception + { + fireAsynchEvent(session, new Event.ReceivedEvent(nextFilter, message)); + } + + + } + + public static class AsynchWritePoolingFilter extends PoolingFilter + { + + public AsynchWritePoolingFilter(ReferenceCountingExecutorService refCountingPool, String name) + { + super(refCountingPool, name); + } + + + public void filterWrite(final NextFilter nextFilter, final IoSession session, + final WriteRequest writeRequest) throws Exception + { + fireAsynchEvent(session, new Event.WriteEvent(nextFilter, writeRequest)); + } + + } + + public static PoolingFilter createAynschReadPoolingFilter(ReferenceCountingExecutorService refCountingPool,String name) + { + return new AsynchReadPoolingFilter(refCountingPool,name); + } + + + public static PoolingFilter createAynschWritePoolingFilter(ReferenceCountingExecutorService refCountingPool,String name) + { + return new AsynchWritePoolingFilter(refCountingPool,name); + } + } diff --git a/java/common/src/main/java/org/apache/qpid/pool/ReadWriteThreadModel.java b/java/common/src/main/java/org/apache/qpid/pool/ReadWriteThreadModel.java index d4dbf1309a..c2f7f7ac48 100644 --- a/java/common/src/main/java/org/apache/qpid/pool/ReadWriteThreadModel.java +++ b/java/common/src/main/java/org/apache/qpid/pool/ReadWriteThreadModel.java @@ -26,15 +26,38 @@ import org.apache.mina.common.ThreadModel; public class ReadWriteThreadModel implements ThreadModel { + + private static final ReadWriteThreadModel _instance = new ReadWriteThreadModel(); + + private final PoolingFilter _asynchronousReadFilter; + private final PoolingFilter _asynchronousWriteFilter; + + private ReadWriteThreadModel() + { + final ReferenceCountingExecutorService executor = ReferenceCountingExecutorService.getInstance(); + _asynchronousReadFilter = PoolingFilter.createAynschReadPoolingFilter(executor, "AsynchronousReadFilter"); + _asynchronousWriteFilter = PoolingFilter.createAynschWritePoolingFilter(executor, "AsynchronousWriteFilter"); + } + + public PoolingFilter getAsynchronousReadFilter() + { + return _asynchronousReadFilter; + } + + public PoolingFilter getAsynchronousWriteFilter() + { + return _asynchronousWriteFilter; + } + public void buildFilterChain(IoFilterChain chain) throws Exception { - ReferenceCountingExecutorService executor = ReferenceCountingExecutorService.getInstance(); - PoolingFilter asyncRead = new PoolingFilter(executor, PoolingFilter.READ_EVENTS, - "AsynchronousReadFilter"); - PoolingFilter asyncWrite = new PoolingFilter(executor, PoolingFilter.WRITE_EVENTS, - "AsynchronousWriteFilter"); - - chain.addFirst("AsynchronousReadFilter", new ReferenceCountingIoFilter(asyncRead)); - chain.addLast("AsynchronousWriteFilter", new ReferenceCountingIoFilter(asyncWrite)); + + chain.addFirst("AsynchronousReadFilter", new ReferenceCountingIoFilter(_asynchronousReadFilter)); + chain.addLast("AsynchronousWriteFilter", new ReferenceCountingIoFilter(_asynchronousWriteFilter)); + } + + public static ReadWriteThreadModel getInstance() + { + return _instance; } } diff --git a/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java b/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java index 8ea0d6ef1a..523a24f278 100644 --- a/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java +++ b/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java @@ -20,6 +20,8 @@ */ package org.apache.qpid.protocol; +import org.apache.qpid.framing.AMQShortString; + import java.util.Map; import java.util.HashMap; @@ -27,14 +29,14 @@ public final class AMQConstant { private int _code; - private String _name; + private AMQShortString _name; private static Map _codeMap = new HashMap(); private AMQConstant(int code, String name, boolean map) { _code = code; - _name = name; + _name = new AMQShortString(name); if (map) { _codeMap.put(new Integer(code), this); @@ -51,7 +53,7 @@ public final class AMQConstant return _code; } - public String getName() + public AMQShortString getName() { return _name; } diff --git a/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java b/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java new file mode 100644 index 0000000000..a2d3de2f9e --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java @@ -0,0 +1,29 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.protocol; + +import org.apache.qpid.framing.AMQDataBlock; +import org.apache.qpid.framing.VersionSpecificRegistry; + +public interface AMQVersionAwareProtocolSession extends AMQProtocolWriter, ProtocolVersionAware +{ + public VersionSpecificRegistry getRegistry(); +} diff --git a/java/common/src/main/java/org/apache/qpid/protocol/ProtocolVersionAware.java b/java/common/src/main/java/org/apache/qpid/protocol/ProtocolVersionAware.java new file mode 100644 index 0000000000..cbd8b900f3 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/protocol/ProtocolVersionAware.java @@ -0,0 +1,30 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.protocol; + +public interface ProtocolVersionAware +{ + public byte getProtocolMinorVersion(); + + public byte getProtocolMajorVersion(); + + public boolean isProtocolVersionEqual(byte majorVersion, byte minorVersion); +} diff --git a/java/common/src/main/java/org/apache/qpid/url/AMQBindingURL.java b/java/common/src/main/java/org/apache/qpid/url/AMQBindingURL.java index b6a0bd500a..11e6652bd7 100644 --- a/java/common/src/main/java/org/apache/qpid/url/AMQBindingURL.java +++ b/java/common/src/main/java/org/apache/qpid/url/AMQBindingURL.java @@ -23,6 +23,7 @@ package org.apache.qpid.url; import org.apache.qpid.url.BindingURL; import org.apache.qpid.url.URLHelper; import org.apache.qpid.exchange.ExchangeDefaults; +import org.apache.qpid.framing.AMQShortString; import java.util.HashMap; import java.net.URI; @@ -31,10 +32,10 @@ import java.net.URISyntaxException; public class AMQBindingURL implements BindingURL { String _url; - String _exchangeClass; - String _exchangeName; - String _destinationName; - String _queueName; + AMQShortString _exchangeClass; + AMQShortString _exchangeName; + AMQShortString _destinationName; + AMQShortString _queueName; private HashMap _options; @@ -84,7 +85,7 @@ public class AMQBindingURL implements BindingURL if (connection.getPath() == null || connection.getPath().equals("")) { - URLHelper.parseError(_url.indexOf(_exchangeName) + _exchangeName.length(), + URLHelper.parseError(_url.indexOf(_exchangeName.toString()) + _exchangeName.length(), "Destination or Queue requried", _url); } else @@ -92,7 +93,7 @@ public class AMQBindingURL implements BindingURL int slash = connection.getPath().indexOf("/", 1); if (slash == -1) { - URLHelper.parseError(_url.indexOf(_exchangeName) + _exchangeName.length(), + URLHelper.parseError(_url.indexOf(_exchangeName.toString()) + _exchangeName.length(), "Destination requried", _url); } else @@ -121,6 +122,26 @@ public class AMQBindingURL implements BindingURL } } + private void setExchangeClass(String exchangeClass) + { + setExchangeClass(new AMQShortString(exchangeClass)); + } + + private void setQueueName(String name) + { + setQueueName(new AMQShortString(name)); + } + + private void setDestinationName(String name) + { + setDestinationName(new AMQShortString(name)); + } + + private void setExchangeName(String exchangeName) + { + setExchangeName(new AMQShortString(exchangeName)); + } + private void processOptions() { //this is where we would parse any options that needed more than just storage. @@ -131,22 +152,22 @@ public class AMQBindingURL implements BindingURL return _url; } - public String getExchangeClass() + public AMQShortString getExchangeClass() { return _exchangeClass; } - public void setExchangeClass(String exchangeClass) + public void setExchangeClass(AMQShortString exchangeClass) { _exchangeClass = exchangeClass; } - public String getExchangeName() + public AMQShortString getExchangeName() { return _exchangeName; } - public void setExchangeName(String name) + public void setExchangeName(AMQShortString name) { _exchangeName = name; @@ -156,17 +177,17 @@ public class AMQBindingURL implements BindingURL } } - public String getDestinationName() + public AMQShortString getDestinationName() { return _destinationName; } - public void setDestinationName(String name) + public void setDestinationName(AMQShortString name) { _destinationName = name; } - public String getQueueName() + public AMQShortString getQueueName() { if (_exchangeClass.equals(ExchangeDefaults.TOPIC_EXCHANGE_CLASS)) { @@ -174,7 +195,7 @@ public class AMQBindingURL implements BindingURL { if (containsOption(BindingURL.OPTION_CLIENTID) && containsOption(BindingURL.OPTION_SUBSCRIPTION)) { - return getOption(BindingURL.OPTION_CLIENTID + ":" + BindingURL.OPTION_SUBSCRIPTION); + return new AMQShortString(getOption(BindingURL.OPTION_CLIENTID + ":" + BindingURL.OPTION_SUBSCRIPTION)); } else { @@ -192,7 +213,7 @@ public class AMQBindingURL implements BindingURL } } - public void setQueueName(String name) + public void setQueueName(AMQShortString name) { _queueName = name; } @@ -212,7 +233,7 @@ public class AMQBindingURL implements BindingURL return _options.containsKey(key); } - public String getRoutingKey() + public AMQShortString getRoutingKey() { if (_exchangeClass.equals(ExchangeDefaults.DIRECT_EXCHANGE_CLASS)) { @@ -221,15 +242,15 @@ public class AMQBindingURL implements BindingURL if (containsOption(BindingURL.OPTION_ROUTING_KEY)) { - return getOption(OPTION_ROUTING_KEY); + return new AMQShortString(getOption(OPTION_ROUTING_KEY)); } return getDestinationName(); } - public void setRoutingKey(String key) + public void setRoutingKey(AMQShortString key) { - setOption(OPTION_ROUTING_KEY, key); + setOption(OPTION_ROUTING_KEY, key.toString()); } diff --git a/java/common/src/main/java/org/apache/qpid/url/BindingURL.java b/java/common/src/main/java/org/apache/qpid/url/BindingURL.java index 76690b3230..86a8420d30 100644 --- a/java/common/src/main/java/org/apache/qpid/url/BindingURL.java +++ b/java/common/src/main/java/org/apache/qpid/url/BindingURL.java @@ -20,6 +20,8 @@ */ package org.apache.qpid.url; +import org.apache.qpid.framing.AMQShortString; + /* Binding URL format: :///[]/[]?