diff options
| author | Kim van der Riet <kpvdr@apache.org> | 2007-02-14 20:02:03 +0000 |
|---|---|---|
| committer | Kim van der Riet <kpvdr@apache.org> | 2007-02-14 20:02:03 +0000 |
| commit | a22f3f594d6eee7d610fb4f140e18cddd7c880f6 (patch) | |
| tree | 5adb376ed217d2debaff1c0bdd59af1a1c93e829 /java/common/src | |
| parent | 9cb1922884c5b258c961046e6fd48e5152aa79d5 (diff) | |
| download | qpid-python-a22f3f594d6eee7d610fb4f140e18cddd7c880f6.tar.gz | |
First backmerge from trunk to 0-9 branch for Java. Not all java tests passing yet
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@507672 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/common/src')
46 files changed, 2259 insertions, 1955 deletions
diff --git a/java/common/src/main/java/org/apache/qpid/AMQChannelException.java b/java/common/src/main/java/org/apache/qpid/AMQChannelException.java index 996ac23b09..503f92b15d 100644 --- a/java/common/src/main/java/org/apache/qpid/AMQChannelException.java +++ b/java/common/src/main/java/org/apache/qpid/AMQChannelException.java @@ -22,6 +22,7 @@ package org.apache.qpid; import org.apache.qpid.framing.ChannelCloseBody; import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.framing.AMQShortString; public class AMQChannelException extends AMQException { @@ -51,6 +52,6 @@ public class AMQChannelException extends AMQException public AMQMethodBody getCloseMethodBody() { - return ChannelCloseBody.createMethodBody(major, minor, _classId, _methodId, getErrorCode(), getMessage()); + return ChannelCloseBody.createMethodBody(major, minor, _classId, _methodId, getErrorCode(), new AMQShortString(getMessage())); } } diff --git a/java/common/src/main/java/org/apache/qpid/AMQConnectionException.java b/java/common/src/main/java/org/apache/qpid/AMQConnectionException.java index 93754dbee9..c4772624e9 100644 --- a/java/common/src/main/java/org/apache/qpid/AMQConnectionException.java +++ b/java/common/src/main/java/org/apache/qpid/AMQConnectionException.java @@ -21,6 +21,7 @@ package org.apache.qpid; +import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.ConnectionCloseBody; public class AMQConnectionException extends AMQException @@ -30,6 +31,7 @@ public class AMQConnectionException extends AMQException /* AMQP version for which exception ocurred */ private final byte major; private final byte minor; + boolean _closeConnetion; public AMQConnectionException(int errorCode, String msg, int classId, int methodId, byte major, byte minor, Throwable t) { @@ -51,7 +53,7 @@ public class AMQConnectionException extends AMQException public ConnectionCloseBody getCloseMethodBody() { - return ConnectionCloseBody.createMethodBody(major, minor, _classId, _methodId, getErrorCode(), getMessage()); + return ConnectionCloseBody.createMethodBody(major, minor, _classId, _methodId, getErrorCode(), new AMQShortString(getMessage())); } public int getClassId() @@ -62,5 +64,4 @@ public class AMQConnectionException extends AMQException public int getMethodId(){ return _methodId; } - } diff --git a/java/common/src/main/java/org/apache/qpid/AMQConnectionFailureException.java b/java/common/src/main/java/org/apache/qpid/AMQConnectionFailureException.java index 0979598cdb..2406312846 100644 --- a/java/common/src/main/java/org/apache/qpid/AMQConnectionFailureException.java +++ b/java/common/src/main/java/org/apache/qpid/AMQConnectionFailureException.java @@ -1,3 +1,23 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ package org.apache.qpid; public class AMQConnectionFailureException extends AMQException diff --git a/java/common/src/main/java/org/apache/qpid/AMQTimeoutException.java b/java/common/src/main/java/org/apache/qpid/AMQTimeoutException.java new file mode 100644 index 0000000000..6af681f479 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/AMQTimeoutException.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid; + +public class AMQTimeoutException extends AMQException +{ + public AMQTimeoutException(String message) + { + super(message); + } +} diff --git a/java/common/src/main/java/org/apache/qpid/AMQUnknownExchangeType.java b/java/common/src/main/java/org/apache/qpid/AMQUnknownExchangeType.java index 70f333a580..81deb91cdc 100644 --- a/java/common/src/main/java/org/apache/qpid/AMQUnknownExchangeType.java +++ b/java/common/src/main/java/org/apache/qpid/AMQUnknownExchangeType.java @@ -1,3 +1,23 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ package org.apache.qpid; public class AMQUnknownExchangeType extends AMQException diff --git a/java/common/src/main/java/org/apache/qpid/common/AMQPFilterTypes.java b/java/common/src/main/java/org/apache/qpid/common/AMQPFilterTypes.java index 56219755a3..d8aa9bf5ca 100644 --- a/java/common/src/main/java/org/apache/qpid/common/AMQPFilterTypes.java +++ b/java/common/src/main/java/org/apache/qpid/common/AMQPFilterTypes.java @@ -20,20 +20,22 @@ */ package org.apache.qpid.common; +import org.apache.qpid.framing.AMQShortString; + public enum AMQPFilterTypes { JMS_SELECTOR("x-filter-jms-selector"), NO_CONSUME("x-filter-no-consume"), AUTO_CLOSE("x-filter-auto-close"); - private final String _value; + private final AMQShortString _value; AMQPFilterTypes(String value) { - _value = value; + _value = new AMQShortString(value); } - public String getValue() + public AMQShortString getValue() { return _value; } diff --git a/java/common/src/main/java/org/apache/qpid/exchange/ExchangeDefaults.java b/java/common/src/main/java/org/apache/qpid/exchange/ExchangeDefaults.java index dca8075f7f..899261ef25 100644 --- a/java/common/src/main/java/org/apache/qpid/exchange/ExchangeDefaults.java +++ b/java/common/src/main/java/org/apache/qpid/exchange/ExchangeDefaults.java @@ -20,21 +20,28 @@ */ package org.apache.qpid.exchange; +import org.apache.qpid.framing.AMQShortString; + public class ExchangeDefaults { - public final static String TOPIC_EXCHANGE_NAME = "amq.topic"; + public final static AMQShortString TOPIC_EXCHANGE_NAME = new AMQShortString("amq.topic"); + + public final static AMQShortString TOPIC_EXCHANGE_CLASS = new AMQShortString("topic"); + + public final static AMQShortString DIRECT_EXCHANGE_NAME = new AMQShortString("amq.direct"); + + public final static AMQShortString DIRECT_EXCHANGE_CLASS = new AMQShortString("direct"); - public final static String TOPIC_EXCHANGE_CLASS = "topic"; + public final static AMQShortString HEADERS_EXCHANGE_NAME = new AMQShortString("amq.match"); - public final static String DIRECT_EXCHANGE_NAME = "amq.direct"; + public final static AMQShortString HEADERS_EXCHANGE_CLASS = new AMQShortString("headers"); - public final static String DIRECT_EXCHANGE_CLASS = "direct"; + public final static AMQShortString FANOUT_EXCHANGE_NAME = new AMQShortString("amq.fanout"); - public final static String HEADERS_EXCHANGE_NAME = "amq.match"; + public final static AMQShortString FANOUT_EXCHANGE_CLASS = new AMQShortString("fanout"); - public final static String HEADERS_EXCHANGE_CLASS = "headers"; - public final static String FANOUT_EXCHANGE_NAME = "amq.fanout"; + public final static AMQShortString SYSTEM_MANAGEMENT_EXCHANGE_NAME = new AMQShortString("qpid.sysmgmt"); - public final static String FANOUT_EXCHANGE_CLASS = "fanout"; + public final static AMQShortString SYSTEM_MANAGEMENT_CLASS = new AMQShortString("sysmmgmt"); } diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java b/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java index 36287d2923..ebeea8d2b4 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java @@ -24,7 +24,7 @@ import org.apache.mina.common.ByteBuffer; public abstract class AMQBody { - protected abstract byte getFrameType(); + public abstract byte getFrameType(); /** * Get the size of the body diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java b/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java index 62699331e9..5e566a5fe8 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java @@ -24,93 +24,84 @@ import org.apache.log4j.Logger; import org.apache.mina.common.ByteBuffer; import org.apache.mina.common.IoSession; import org.apache.mina.filter.codec.ProtocolDecoderOutput; +import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; import java.util.HashMap; import java.util.Map; public class AMQDataBlockDecoder { - Logger _logger = Logger.getLogger(AMQDataBlockDecoder.class); + private static final String SESSION_METHOD_BODY_FACTORY = "QPID_SESSION_METHOD_BODY_FACTORY"; - private final Map _supportedBodies = new HashMap(); + private static final BodyFactory[] _bodiesSupported = new BodyFactory[Byte.MAX_VALUE]; + + static + { + _bodiesSupported[HeartbeatBody.TYPE] = new HeartbeatBodyFactory(); + } + + Logger _logger = Logger.getLogger(AMQDataBlockDecoder.class); public AMQDataBlockDecoder() { - _supportedBodies.put(new Byte(AMQRequestBody.TYPE), new BodyFactory() { - public AMQBody createBody(ByteBuffer in) { - return new AMQRequestBody(); - } - }); - _supportedBodies.put(new Byte(AMQResponseBody.TYPE), new BodyFactory() { - public AMQBody createBody(ByteBuffer in) { - return new AMQResponseBody(); - } - }); - _supportedBodies.put(new Byte(HeartbeatBody.TYPE), new HeartbeatBodyFactory()); } public boolean decodable(IoSession session, ByteBuffer in) throws AMQFrameDecodingException { - // type, channel, body size and end byte - if (in.remaining() < (1 + 2 + 4 + 1)) + final int remainingAfterAttributes = in.remaining() - (1 + 2 + 4 + 1); + // type, channel, body length and end byte + if (remainingAfterAttributes < 0) { return false; } - - final byte type = in.get(); - final int channel = in.getUnsignedShort(); + in.skip(1 + 2); final long bodySize = in.getUnsignedInt(); - // bodySize can be zero - if (type <= 0 || channel < 0 || bodySize < 0) + return (remainingAfterAttributes >= bodySize); + } + + + protected Object createAndPopulateFrame(IoSession session, ByteBuffer in) + throws AMQFrameDecodingException, AMQProtocolVersionException + { + final byte type = in.get(); + BodyFactory bodyFactory; + if (type == AMQRequestBody.TYPE) { - throw new AMQFrameDecodingException("Undecodable frame: type = " + type + " channel = " + channel + - " bodySize = " + bodySize); + AMQVersionAwareProtocolSession protocolSession = (AMQVersionAwareProtocolSession) session.getAttachment(); + bodyFactory = new AMQRequestBodyFactory(protocolSession); } - - if (in.remaining() < (bodySize + 1)) + else if (type == AMQResponseBody.TYPE) { - return false; + AMQVersionAwareProtocolSession protocolSession = (AMQVersionAwareProtocolSession) session.getAttachment(); + bodyFactory = new AMQResponseBodyFactory(protocolSession); } - return true; - } - - private boolean isSupportedFrameType(byte frameType) - { - final boolean result = _supportedBodies.containsKey(new Byte(frameType)); - - if (!result) + else { - _logger.warn("AMQDataBlockDecoder does not handle frame type " + frameType); + bodyFactory = _bodiesSupported[type]; } - return result; - } - - protected Object createAndPopulateFrame(ByteBuffer in) - throws AMQFrameDecodingException, AMQProtocolVersionException - { - final byte type = in.get(); - if (!isSupportedFrameType(type)) + if(bodyFactory == null) { throw new AMQFrameDecodingException("Unsupported frame type: " + type); } + final int channel = in.getUnsignedShort(); final long bodySize = in.getUnsignedInt(); - BodyFactory bodyFactory = (BodyFactory) _supportedBodies.get(new Byte(type)); - if (bodyFactory == null) + // bodySize can be zero + if (channel < 0 || bodySize < 0) { - throw new AMQFrameDecodingException("Unsupported body type: " + type); + throw new AMQFrameDecodingException("Undecodable frame: type = " + type + " channel = " + channel + + " bodySize = " + bodySize); } - AMQFrame frame = new AMQFrame(); - frame.populateFromBuffer(in, channel, bodySize, bodyFactory); + AMQFrame frame = new AMQFrame(in, channel, bodySize, bodyFactory); byte marker = in.get(); if ((marker & 0xFF) != 0xCE) { - throw new AMQFrameDecodingException("End of frame marker not found. Read " + marker + " size=" + bodySize + " type=" + type); + throw new AMQFrameDecodingException("End of frame marker not found. Read " + marker + " length=" + bodySize + " type=" + type); } return frame; } @@ -118,6 +109,6 @@ public class AMQDataBlockDecoder public void decode(IoSession session, ByteBuffer in, ProtocolDecoderOutput out) throws Exception { - out.write(createAndPopulateFrame(in)); + out.write(createAndPopulateFrame(session, in)); } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockEncoder.java b/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockEncoder.java index 3446563d35..478cdeb406 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockEncoder.java +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockEncoder.java @@ -28,17 +28,16 @@ import org.apache.mina.filter.codec.demux.MessageEncoder; import java.util.HashSet; import java.util.Set; +import java.util.Collections; -public class AMQDataBlockEncoder implements MessageEncoder +public final class AMQDataBlockEncoder implements MessageEncoder { - Logger _logger = Logger.getLogger(AMQDataBlockEncoder.class); + private static final Logger _logger = Logger.getLogger(AMQDataBlockEncoder.class); - private Set _messageTypes; + private final Set _messageTypes = Collections.singleton(EncodableAMQDataBlock.class); public AMQDataBlockEncoder() { - _messageTypes = new HashSet(); - _messageTypes.add(EncodableAMQDataBlock.class); } public void encode(IoSession session, Object message, ProtocolEncoderOutput out) throws Exception diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java b/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java index 6af691fbe8..11f505fd4b 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java @@ -24,53 +24,52 @@ import org.apache.mina.common.ByteBuffer; public class AMQFrame extends AMQDataBlock implements EncodableAMQDataBlock { - public int channel; + private final int _channel; - public AMQBody bodyFrame; + private final AMQBody _bodyFrame; - public AMQFrame() + + + public AMQFrame(final int channel, final AMQBody bodyFrame) { + _channel = channel; + _bodyFrame = bodyFrame; } - public AMQFrame(int channel, AMQBody bodyFrame) + public AMQFrame(final ByteBuffer in, final int channel, final long bodySize, final BodyFactory bodyFactory) throws AMQFrameDecodingException { - this.channel = channel; - this.bodyFrame = bodyFrame; + this._channel = channel; + this._bodyFrame = bodyFactory.createBody(in,bodySize); } public long getSize() { - return 1 + 2 + 4 + bodyFrame.getSize() + 1; + return 1 + 2 + 4 + _bodyFrame.getSize() + 1; } public void writePayload(ByteBuffer buffer) { - buffer.put(bodyFrame.getFrameType()); - // TODO: how does channel get populated - EncodingUtils.writeUnsignedShort(buffer, channel); - EncodingUtils.writeUnsignedInteger(buffer, bodyFrame.getSize()); - bodyFrame.writePayload(buffer); + buffer.put(_bodyFrame.getFrameType()); + EncodingUtils.writeUnsignedShort(buffer, _channel); + EncodingUtils.writeUnsignedInteger(buffer, _bodyFrame.getSize()); + _bodyFrame.writePayload(buffer); buffer.put((byte) 0xCE); } - /** - * - * @param buffer - * @param channel unsigned short - * @param bodySize unsigned integer - * @param bodyFactory - * @throws AMQFrameDecodingException - */ - public void populateFromBuffer(ByteBuffer buffer, int channel, long bodySize, BodyFactory bodyFactory) - throws AMQFrameDecodingException, AMQProtocolVersionException + public final int getChannel() { - this.channel = channel; - bodyFrame = bodyFactory.createBody(buffer); - bodyFrame.populateFromBuffer(buffer, bodySize); + return _channel; } + public final AMQBody getBodyFrame() + { + return _bodyFrame; + } + + + public String toString() { - return "Frame channelId: " + channel + ", bodyFrame: " + String.valueOf(bodyFrame); + return "Frame channelId: " + _channel + ", bodyFrame: " + String.valueOf(_bodyFrame); } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java b/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java index e140a9b334..ccd1ef32b8 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java @@ -57,7 +57,7 @@ public abstract class AMQMethodBody extends AMQBody protected abstract void writeMethodPayload(ByteBuffer buffer); - protected byte getFrameType() + public byte getFrameType() { return TYPE; } @@ -110,10 +110,9 @@ public abstract class AMQMethodBody extends AMQBody return new AMQConnectionException(code, message, getClazz(), getMethod(), major, minor); } - - public AMQConnectionException getConnectionException(int code, String message, Throwable cause) { return new AMQConnectionException(code, message, getClazz(), getMethod(), major, minor, cause); } + } diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java b/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java index 3dcbe926fe..6067e2fce5 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java @@ -22,30 +22,21 @@ package org.apache.qpid.framing; import org.apache.log4j.Logger; import org.apache.mina.common.ByteBuffer; +import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; public class AMQMethodBodyFactory implements BodyFactory { private static final Logger _log = Logger.getLogger(AMQMethodBodyFactory.class); + + private final AMQVersionAwareProtocolSession _protocolSession; - private static final AMQMethodBodyFactory _instance = new AMQMethodBodyFactory(); - - public static AMQMethodBodyFactory getInstance() - { - return _instance; - } - - private AMQMethodBodyFactory() + public AMQMethodBodyFactory(AMQVersionAwareProtocolSession protocolSession) { - _log.debug("Creating method body factory"); + _protocolSession = protocolSession; } - public AMQMethodBody createBody(ByteBuffer in) throws AMQFrameDecodingException + public AMQMethodBody createBody(ByteBuffer in, long bodySize) throws AMQFrameDecodingException { - // AMQP version change: MethodBodyDecoderRegistry is obsolete, since all the XML - // segments generated together are now handled by MainRegistry. The Cluster class, - // if generated together with amqp.xml is a part of MainRegistry. - // TODO: Connect with version acquired from ProtocolInitiation class. - return MainRegistry.get((short)in.getUnsignedShort(), (short)in.getUnsignedShort(), - (byte)0, (byte)9); + return _protocolSession.getRegistry().get((short)in.getUnsignedShort(), (short)in.getUnsignedShort(), in, bodySize); } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyInstanceFactory.java b/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyInstanceFactory.java new file mode 100644 index 0000000000..5309a80bdb --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyInstanceFactory.java @@ -0,0 +1,30 @@ +/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.framing;
+
+import org.apache.mina.common.ByteBuffer;
+
+
+public abstract interface AMQMethodBodyInstanceFactory
+{
+ public AMQMethodBody newInstance(byte major, byte minor, ByteBuffer buffer, long size) throws AMQFrameDecodingException;
+ public AMQMethodBody newInstance(byte major, byte minor, int clazzID, int methodID, ByteBuffer buffer, long size) throws AMQFrameDecodingException;
+}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQRequestBody.java b/java/common/src/main/java/org/apache/qpid/framing/AMQRequestBody.java index 6df7dff27b..3bc16601b6 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/AMQRequestBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQRequestBody.java @@ -21,6 +21,7 @@ package org.apache.qpid.framing; import org.apache.mina.common.ByteBuffer; +import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; public class AMQRequestBody extends AMQBody { @@ -30,16 +31,21 @@ public class AMQRequestBody extends AMQBody protected long requestId; protected long responseMark; protected AMQMethodBody methodPayload; - + protected AMQVersionAwareProtocolSession protocolSession; // Constructor - public AMQRequestBody() {} + public AMQRequestBody(AMQVersionAwareProtocolSession protocolSession) + { + this.protocolSession = protocolSession; + } + public AMQRequestBody(long requestId, long responseMark, AMQMethodBody methodPayload) { this.requestId = requestId; this.responseMark = responseMark; this.methodPayload = methodPayload; + protocolSession = null; } @@ -49,7 +55,7 @@ public class AMQRequestBody extends AMQBody public AMQMethodBody getMethodPayload() { return methodPayload; } - protected byte getFrameType() + public byte getFrameType() { return TYPE; } @@ -68,14 +74,19 @@ public class AMQRequestBody extends AMQBody } protected void populateFromBuffer(ByteBuffer buffer, long size) - throws AMQFrameDecodingException, AMQProtocolVersionException + throws AMQFrameDecodingException { + if (protocolSession == null) + { + throw new AMQFrameDecodingException("Cannot call populateFromBuffer() without using correct constructor."); + } + requestId = EncodingUtils.readLong(buffer); responseMark = EncodingUtils.readLong(buffer); int reserved = EncodingUtils.readInteger(buffer); // reserved, throw away - AMQMethodBodyFactory factory = AMQMethodBodyFactory.getInstance(); - methodPayload = factory.createBody(buffer); - methodPayload.populateFromBuffer(buffer, size - 8 - 8 - 4); + + AMQMethodBodyFactory methodBodyFactory = new AMQMethodBodyFactory(protocolSession); + methodPayload = methodBodyFactory.createBody(buffer, size); } public String toString() @@ -89,9 +100,6 @@ public class AMQRequestBody extends AMQBody { AMQRequestBody requestFrame = new AMQRequestBody(requestId, responseMark, methodPayload); - AMQFrame frame = new AMQFrame(); - frame.channel = channelId; - frame.bodyFrame = requestFrame; - return frame; + return new AMQFrame(channelId, requestFrame); } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQRequestBodyFactory.java b/java/common/src/main/java/org/apache/qpid/framing/AMQRequestBodyFactory.java new file mode 100644 index 0000000000..9d47ccd68e --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQRequestBodyFactory.java @@ -0,0 +1,41 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.framing; + +import org.apache.mina.common.ByteBuffer; +import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; + +public class AMQRequestBodyFactory implements BodyFactory +{ + private final AMQVersionAwareProtocolSession protocolSession; + + public AMQRequestBodyFactory(AMQVersionAwareProtocolSession protocolSession) + { + this.protocolSession = protocolSession; + } + + public AMQBody createBody(ByteBuffer in, long bodySize) throws AMQFrameDecodingException + { + AMQRequestBody rb = new AMQRequestBody(protocolSession); + rb.populateFromBuffer(in, bodySize); + return rb; + } +} diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQResponseBody.java b/java/common/src/main/java/org/apache/qpid/framing/AMQResponseBody.java index c7aee6ac92..7b35aaeb86 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/AMQResponseBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQResponseBody.java @@ -21,6 +21,7 @@ package org.apache.qpid.framing; import org.apache.mina.common.ByteBuffer; +import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; public class AMQResponseBody extends AMQBody { @@ -31,9 +32,14 @@ public class AMQResponseBody extends AMQBody protected long requestId; protected int batchOffset; protected AMQMethodBody methodPayload; + protected AMQVersionAwareProtocolSession protocolSession; // Constructor - public AMQResponseBody() {} + public AMQResponseBody(AMQVersionAwareProtocolSession protocolSession) + { + this.protocolSession = protocolSession; + } + public AMQResponseBody(long responseId, long requestId, int batchOffset, AMQMethodBody methodPayload) { @@ -41,6 +47,7 @@ public class AMQResponseBody extends AMQBody this.requestId = requestId; this.batchOffset = batchOffset; this.methodPayload = methodPayload; + protocolSession = null; } // Field methods @@ -49,7 +56,7 @@ public class AMQResponseBody extends AMQBody public int getBatchOffset() { return batchOffset; } public AMQMethodBody getMethodPayload() { return methodPayload; } - protected byte getFrameType() + public byte getFrameType() { return TYPE; } @@ -69,15 +76,18 @@ public class AMQResponseBody extends AMQBody } protected void populateFromBuffer(ByteBuffer buffer, long size) - throws AMQFrameDecodingException, AMQProtocolVersionException + throws AMQFrameDecodingException { + if (protocolSession == null) + throw new AMQFrameDecodingException("Cannot call populateFromBuffer() without using correct constructor."); + responseId = EncodingUtils.readLong(buffer); requestId = EncodingUtils.readLong(buffer); // XXX batchOffset = EncodingUtils.readInteger(buffer); - AMQMethodBodyFactory factory = AMQMethodBodyFactory.getInstance(); - methodPayload = factory.createBody(buffer); - methodPayload.populateFromBuffer(buffer, size - 8 - 8 - 4); + + AMQMethodBodyFactory methodBodyFactory = new AMQMethodBodyFactory(protocolSession); + methodPayload = methodBodyFactory.createBody(buffer, size); } public String toString() @@ -91,9 +101,6 @@ public class AMQResponseBody extends AMQBody { AMQResponseBody responseFrame = new AMQResponseBody(responseId, requestId, batchOffset, methodPayload); - AMQFrame frame = new AMQFrame(); - frame.channel = channelId; - frame.bodyFrame = responseFrame; - return frame; + return new AMQFrame(channelId, responseFrame); } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQResponseBodyFactory.java b/java/common/src/main/java/org/apache/qpid/framing/AMQResponseBodyFactory.java new file mode 100644 index 0000000000..4209aad11f --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQResponseBodyFactory.java @@ -0,0 +1,41 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.framing; + +import org.apache.mina.common.ByteBuffer; +import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; + +public class AMQResponseBodyFactory implements BodyFactory +{ + private final AMQVersionAwareProtocolSession protocolSession; + + public AMQResponseBodyFactory(AMQVersionAwareProtocolSession protocolSession) + { + this.protocolSession = protocolSession; + } + + public AMQBody createBody(ByteBuffer in, long bodySize) throws AMQFrameDecodingException + { + AMQResponseBody rb = new AMQResponseBody(protocolSession); + rb.populateFromBuffer(in, bodySize); + return rb; + } +} diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java b/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java new file mode 100644 index 0000000000..f98a62751c --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java @@ -0,0 +1,374 @@ +/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.framing;
+
+import org.apache.mina.common.ByteBuffer;
+import org.apache.log4j.Logger;
+
+import java.util.Arrays;
+
+/**
+ * A short string is a representation of an AMQ Short String
+ * Short strings differ from the Java String class by being limited to on ASCII characters (0-127)
+ * and thus can be held more effectively in a byte buffer.
+ *
+ */
+public final class AMQShortString implements CharSequence
+{
+ private static final Logger _logger = Logger.getLogger(AMQShortString.class);
+
+ private final ByteBuffer _data;
+ private int _hashCode;
+ private static final char[] EMPTY_CHAR_ARRAY = new char[0];
+
+ public AMQShortString(byte[] data)
+ {
+
+ _data = ByteBuffer.wrap(data);
+ }
+
+
+ public AMQShortString(String data)
+ {
+ this(data == null ? EMPTY_CHAR_ARRAY : data.toCharArray());
+ if(data != null) _hashCode = data.hashCode();
+ }
+
+ public AMQShortString(char[] data)
+ {
+ if(data == null)
+ {
+ throw new NullPointerException("Cannot create AMQShortString with null char[]");
+ }
+ final int length = data.length;
+ final byte[] stringBytes = new byte[length];
+ for(int i = 0; i < length; i++)
+ {
+ stringBytes[i] = (byte) (0xFF & data[i]);
+ }
+
+ _data = ByteBuffer.wrap(stringBytes);
+ _data.rewind();
+
+ }
+
+ public AMQShortString(CharSequence charSequence)
+ {
+ final int length = charSequence.length();
+ final byte[] stringBytes = new byte[length];
+ int hash = 0;
+ for(int i = 0 ; i < length; i++)
+ {
+ stringBytes[i] = ((byte) (0xFF & charSequence.charAt(i)));
+ hash = (31 * hash) + stringBytes[i];
+
+ }
+ _data = ByteBuffer.wrap(stringBytes);
+ _data.rewind();
+ _hashCode = hash;
+
+ }
+
+ private AMQShortString(ByteBuffer data)
+ {
+ _data = data;
+
+ }
+
+
+ /**
+ * Get the length of the short string
+ * @return length of the underlying byte array
+ */
+ public int length()
+ {
+ return _data.limit();
+ }
+
+ public char charAt(int index)
+ {
+
+ return (char) _data.get(index);
+
+ }
+
+ public CharSequence subSequence(int start, int end)
+ {
+ return new CharSubSequence(start,end);
+ }
+
+ public int writeToByteArray(byte[] encoding, int pos)
+ {
+ final int size = length();
+ encoding[pos++] = (byte) length();
+ for(int i = 0; i < size; i++)
+ {
+ encoding[pos++] = _data.get(i);
+ }
+ return pos;
+ }
+
+ public static AMQShortString readFromByteArray(byte[] byteEncodedDestination, int pos)
+ {
+
+ final byte len = byteEncodedDestination[pos];
+ if(len == 0)
+ {
+ return null;
+ }
+ ByteBuffer data = ByteBuffer.wrap(byteEncodedDestination,pos+1,len).slice();
+
+
+ return new AMQShortString(data);
+ }
+
+ public static AMQShortString readFromBuffer(ByteBuffer buffer)
+ {
+ final short length = buffer.getUnsigned();
+ if (length == 0)
+ {
+ return null;
+ }
+ else
+ {
+ ByteBuffer data = buffer.slice();
+ data.limit(length);
+ data.rewind();
+ buffer.skip(length);
+
+ return new AMQShortString(data);
+ }
+ }
+
+
+ public byte[] getBytes()
+ {
+
+ if(_data.buf().hasArray() && _data.arrayOffset() == 0)
+ {
+ return _data.array();
+ }
+ else
+ {
+ final int size = length();
+ byte[] b = new byte[size];
+ ByteBuffer buf = _data.duplicate();
+ buf.rewind();
+ buf.get(b);
+
+
+ return b;
+ }
+
+
+ }
+
+ public void writeToBuffer(ByteBuffer buffer)
+ {
+
+
+ final int size = length();
+ if (size != 0)
+ {
+
+ buffer.put((byte)size);
+ if(_data.buf().hasArray())
+ {
+ buffer.put(_data.array(),_data.arrayOffset(),length());
+ }
+ else
+ {
+
+ for(int i = 0; i < size; i++)
+ {
+
+ buffer.put(_data.get(i));
+ }
+ }
+ }
+ else
+ {
+ // really writing out unsigned byte
+ buffer.put((byte) 0);
+ }
+
+ }
+
+ private final class CharSubSequence implements CharSequence
+ {
+ private final int _offset;
+ private final int _end;
+
+
+ public CharSubSequence(final int offset, final int end)
+ {
+ _offset = offset;
+ _end = end;
+ }
+
+
+ public int length()
+ {
+ return _end - _offset;
+ }
+
+ public char charAt(int index)
+ {
+ return AMQShortString.this.charAt(index + _offset);
+ }
+
+ public CharSequence subSequence(int start, int end)
+ {
+ return new CharSubSequence(start+_offset,end+_offset);
+ }
+ }
+
+
+
+ public char[] asChars()
+ {
+ final int size = length();
+ final char[] chars = new char[size];
+
+
+
+
+ for(int i = 0 ; i < size; i++)
+ {
+ chars[i] = (char) _data.get(i);
+ }
+ return chars;
+ }
+
+
+
+ public String asString()
+ {
+ return new String(asChars());
+ }
+
+ public boolean equals(Object o)
+ {
+ if(o == null)
+ {
+ return false;
+ }
+ if(o == this)
+ {
+ return true;
+ }
+ if(o instanceof AMQShortString)
+ {
+
+ final AMQShortString otherString = (AMQShortString) o;
+
+ if((_hashCode != 0) && (otherString._hashCode != 0) && (_hashCode != otherString._hashCode))
+ {
+ return false;
+ }
+
+ return _data.equals(otherString._data);
+
+
+
+
+ }
+ return (o instanceof CharSequence) && equals((CharSequence)o);
+
+ }
+
+ public boolean equals(CharSequence s)
+ {
+ if(s == null)
+ {
+ return false;
+ }
+ if(s.length() != length())
+ {
+ return false;
+ }
+ for(int i = 0; i < length(); i++)
+ {
+ if(charAt(i)!= s.charAt(i))
+ {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ public int hashCode()
+ {
+ int hash = _hashCode;
+ if(hash == 0)
+ {
+ final int size = length();
+
+
+ for(int i = 0; i < size; i++)
+ {
+ hash = (31 * hash) + _data.get(i);
+ }
+ _hashCode = hash;
+ }
+
+ return hash;
+ }
+
+ public void setDirty()
+ {
+ _hashCode = 0;
+ }
+
+ public String toString()
+ {
+ return asString();
+ }
+
+
+ public int compareTo(AMQShortString name)
+ {
+ if(name == null)
+ {
+ return 1;
+ }
+ else
+ {
+
+ if(name.length() < length())
+ {
+ return - name.compareTo(this);
+ }
+
+
+
+ for(int i = 0; i < length() ; i++)
+ {
+ final byte d = _data.get(i);
+ final byte n = name._data.get(i);
+ if(d < n) return -1;
+ if(d > n) return 1;
+ }
+
+ return length() == name.length() ? 0 : -1;
+ }
+ }
+}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQType.java b/java/common/src/main/java/org/apache/qpid/framing/AMQType.java index 23c1929205..5175eace1e 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/AMQType.java +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQType.java @@ -230,7 +230,7 @@ public enum AMQType {
public int getEncodingSize(Object value)
{
- return 1 + (value == null ? 0 : ((byte[]) value).length);
+ return EncodingUtils.encodedLongstrLength((byte[]) value);
}
@@ -250,12 +250,12 @@ public enum AMQType public void writeValueImpl(Object value, ByteBuffer buffer)
{
- EncodingUtils.writeBytes(buffer, (byte[]) value);
+ EncodingUtils.writeLongstr(buffer, (byte[]) value);
}
public Object readValueFromBuffer(ByteBuffer buffer)
{
- return EncodingUtils.readBytes(buffer);
+ return EncodingUtils.readLongstr(buffer);
}
},
diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java b/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java index b29c23c2a2..efdd903f1f 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java @@ -1,3 +1,23 @@ +/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
package org.apache.qpid.framing;
import org.apache.mina.common.ByteBuffer;
@@ -51,4 +71,9 @@ public class AMQTypedValue AMQType type = AMQTypeMap.getType(buffer.get());
return new AMQTypedValue(type, buffer);
}
+
+ public String toString()
+ {
+ return "["+getType()+": "+getValue()+"]";
+ }
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/BodyFactory.java b/java/common/src/main/java/org/apache/qpid/framing/BodyFactory.java index cf5708d993..246e91f1fb 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/BodyFactory.java +++ b/java/common/src/main/java/org/apache/qpid/framing/BodyFactory.java @@ -21,11 +21,12 @@ package org.apache.qpid.framing; import org.apache.mina.common.ByteBuffer; +import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; /** * Any class that is capable of turning a stream of bytes into an AMQ structure must implement this interface. */ public interface BodyFactory { - AMQBody createBody(ByteBuffer in) throws AMQFrameDecodingException; + AMQBody createBody(ByteBuffer in, long bodySize) throws AMQFrameDecodingException; } diff --git a/java/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java b/java/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java index c88c5d3bd3..f31fa8097e 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java +++ b/java/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java @@ -35,6 +35,7 @@ public class EncodingUtils public static final int SIZEOF_UNSIGNED_SHORT = 2; public static final int SIZEOF_UNSIGNED_INT = 4; + private static final boolean[] ALL_FALSE_ARRAY = new boolean[8]; public static int encodedShortStringLength(String s) { @@ -48,6 +49,120 @@ public class EncodingUtils } } + + public static int encodedShortStringLength(short s) + { + if( s == 0 ) + { + return 1 + 1; + } + + int len = 0; + if(s < 0) + { + len=1; + // sloppy - doesn't work of Integer.MIN_VALUE + s=(short)-s; + } + + if(s>9999) + { + return 1+5; + } + else if(s>999) + { + return 1+4; + } + else if(s>99) + { + return 1+3; + } + else if(s>9) + { + return 1+2; + } + else + { + return 1+1; + } + + } + + + public static int encodedShortStringLength(int i) + { + if( i == 0 ) + { + return 1 + 1; + } + + int len = 0; + if(i < 0) + { + len=1; + // sloppy - doesn't work of Integer.MIN_VALUE + i=-i; + } + + // range is now 1 - 2147483647 + if(i < Short.MAX_VALUE) + { + return len + encodedShortStringLength((short)i); + } + else if (i > 999999) + { + return len + 6 + encodedShortStringLength((short)(i/1000000)); + } + else // if (i > 99999) + { + return len + 5 + encodedShortStringLength((short)(i/100000)); + } + + } + + public static int encodedShortStringLength(long l) + { + if(l == 0) + { + return 1 + 1; + } + + int len = 0; + if(l < 0) + { + len=1; + // sloppy - doesn't work of Long.MIN_VALUE + l=-l; + } + if(l < Integer.MAX_VALUE) + { + return len + encodedShortStringLength((int) l); + } + else if(l > 9999999999L) + { + return len + 10 + encodedShortStringLength((int) (l / 10000000000L)); + } + else + { + return len + 1 + encodedShortStringLength((int) (l / 10L)); + } + + } + + + public static int encodedShortStringLength(AMQShortString s) + { + if (s == null) + { + return 1; + } + else + { + return (short) (1 + s.length()); + } + } + + public static int encodedLongStringLength(String s) { if (s == null) @@ -122,6 +237,21 @@ public class EncodingUtils } } + + public static void writeShortStringBytes(ByteBuffer buffer, AMQShortString s) + { + if (s != null) + { + + s.writeToBuffer(buffer); + } + else + { + // really writing out unsigned byte + buffer.put((byte) 0); + } + } + public static void writeLongStringBytes(ByteBuffer buffer, String s) { assert s == null || s.length() <= 0xFFFE; @@ -224,6 +354,7 @@ public class EncodingUtils } } + public static void writeFieldTableBytes(ByteBuffer buffer, FieldTable table) { if (table != null) @@ -255,6 +386,238 @@ public class EncodingUtils buffer.put(packedValue); } + public static void writeBooleans(ByteBuffer buffer, boolean value) + { + + buffer.put(value ? (byte) 1 : (byte) 0); + } + + public static void writeBooleans(ByteBuffer buffer, boolean value0, boolean value1) + { + byte packedValue = value0 ? (byte) 1 : (byte) 0; + + if (value1) + { + packedValue = (byte) (packedValue | (byte)(1 << 1)); + } + + + buffer.put(packedValue); + } + + public static void writeBooleans(ByteBuffer buffer, boolean value0, boolean value1, boolean value2) + { + byte packedValue = value0 ? (byte) 1 : (byte) 0; + + if (value1) + { + packedValue = (byte) (packedValue | (byte)(1 << 1)); + } + + if (value2) + { + packedValue = (byte) (packedValue | (byte)(1 << 2)); + } + + + buffer.put(packedValue); + } + + + + public static void writeBooleans(ByteBuffer buffer, + boolean value0, + boolean value1, + boolean value2, + boolean value3) + { + byte packedValue = value0 ? (byte) 1 : (byte) 0; + + if (value1) + { + packedValue = (byte) (packedValue | (byte)(1 << 1)); + } + + if (value2) + { + packedValue = (byte) (packedValue | (byte)(1 << 2)); + } + + if (value3) + { + packedValue = (byte) (packedValue | (byte)(1 << 3)); + } + + buffer.put(packedValue); + } + + public static void writeBooleans(ByteBuffer buffer, + boolean value0, + boolean value1, + boolean value2, + boolean value3, + boolean value4) + { + byte packedValue = value0 ? (byte) 1 : (byte) 0; + + if (value1) + { + packedValue = (byte) (packedValue | (byte)(1 << 1)); + } + + if (value2) + { + packedValue = (byte) (packedValue | (byte)(1 << 2)); + } + + if (value3) + { + packedValue = (byte) (packedValue | (byte)(1 << 3)); + } + + if (value4) + { + packedValue = (byte) (packedValue | (byte)(1 << 4)); + } + + buffer.put(packedValue); + } + + public static void writeBooleans(ByteBuffer buffer, + boolean value0, + boolean value1, + boolean value2, + boolean value3, + boolean value4, + boolean value5) + { + byte packedValue = value0 ? (byte) 1 : (byte) 0; + + if (value1) + { + packedValue = (byte) (packedValue | (byte)(1 << 1)); + } + + if (value2) + { + packedValue = (byte) (packedValue | (byte)(1 << 2)); + } + + if (value3) + { + packedValue = (byte) (packedValue | (byte)(1 << 3)); + } + + if (value4) + { + packedValue = (byte) (packedValue | (byte)(1 << 4)); + } + + if (value5) + { + packedValue = (byte) (packedValue | (byte)(1 << 5)); + } + + buffer.put(packedValue); + } + + public static void writeBooleans(ByteBuffer buffer, + boolean value0, + boolean value1, + boolean value2, + boolean value3, + boolean value4, + boolean value5, + boolean value6) + { + byte packedValue = value0 ? (byte) 1 : (byte) 0; + + if (value1) + { + packedValue = (byte) (packedValue | (byte)(1 << 1)); + } + + if (value2) + { + packedValue = (byte) (packedValue | (byte)(1 << 2)); + } + + if (value3) + { + packedValue = (byte) (packedValue | (byte)(1 << 3)); + } + + if (value4) + { + packedValue = (byte) (packedValue | (byte)(1 << 4)); + } + + if (value5) + { + packedValue = (byte) (packedValue | (byte)(1 << 5)); + } + + if (value6) + { + packedValue = (byte) (packedValue | (byte)(1 << 6)); + } + + buffer.put(packedValue); + } + + public static void writeBooleans(ByteBuffer buffer, + boolean value0, + boolean value1, + boolean value2, + boolean value3, + boolean value4, + boolean value5, + boolean value6, + boolean value7) + { + byte packedValue = value0 ? (byte) 1 : (byte) 0; + + if (value1) + { + packedValue = (byte) (packedValue | (byte)(1 << 1)); + } + + if (value2) + { + packedValue = (byte) (packedValue | (byte)(1 << 2)); + } + + if (value3) + { + packedValue = (byte) (packedValue | (byte)(1 << 3)); + } + + if (value4) + { + packedValue = (byte) (packedValue | (byte)(1 << 4)); + } + + if (value5) + { + packedValue = (byte) (packedValue | (byte)(1 << 5)); + } + + if (value6) + { + packedValue = (byte) (packedValue | (byte)(1 << 6)); + } + + if (value7) + { + packedValue = (byte) (packedValue | (byte)(1 << 7)); + } + + buffer.put(packedValue); + } + + + + /** * This is used for writing longstrs. * @@ -282,13 +645,27 @@ public class EncodingUtils public static boolean[] readBooleans(ByteBuffer buffer) { - byte packedValue = buffer.get(); - boolean[] result = new boolean[8]; + final byte packedValue = buffer.get(); + if(packedValue == 0) + { + return ALL_FALSE_ARRAY; + } + final boolean[] result = new boolean[8]; - for (int i = 0; i < 8; i++) + result[0] = ((packedValue & 1) != 0); + result[1] = ((packedValue & (1 << 1)) != 0); + result[2] = ((packedValue & (1 << 2)) != 0); + result[3] = ((packedValue & (1 << 3)) != 0); + if((packedValue & 0xF0) == 0) { - result[i] = ((packedValue & (1 << i)) != 0); + result[0] = ((packedValue & 1) != 0); } + result[4] = ((packedValue & (1 << 4)) != 0); + result[5] = ((packedValue & (1 << 5)) != 0); + result[6] = ((packedValue & (1 << 6)) != 0); + result[7] = ((packedValue & (1 << 7)) != 0); + + return result; } @@ -312,6 +689,12 @@ public class EncodingUtils return content; } + public static AMQShortString readAMQShortString(ByteBuffer buffer) + { + return AMQShortString.readFromBuffer(buffer); + + } + public static String readShortString(ByteBuffer buffer) { short length = buffer.getUnsigned(); @@ -363,7 +746,7 @@ public class EncodingUtils } } - public static byte[] readLongstr(ByteBuffer buffer) throws AMQFrameDecodingException + public static byte[] readLongstr(ByteBuffer buffer) { long length = buffer.getUnsignedInt(); if (length == 0) @@ -468,7 +851,7 @@ public class EncodingUtils buffer.put((byte) (aBoolean ? 1 : 0)); } - public static Boolean readBoolean(ByteBuffer buffer) + public static boolean readBoolean(ByteBuffer buffer) { byte packedValue = buffer.get(); return (packedValue == 1); @@ -485,7 +868,7 @@ public class EncodingUtils buffer.put(aByte); } - public static Byte readByte(ByteBuffer buffer) + public static byte readByte(ByteBuffer buffer) { return buffer.get(); } @@ -502,7 +885,7 @@ public class EncodingUtils buffer.putShort(aShort); } - public static Short readShort(ByteBuffer buffer) + public static short readShort(ByteBuffer buffer) { return buffer.getShort(); } @@ -518,7 +901,7 @@ public class EncodingUtils buffer.putInt(aInteger); } - public static Integer readInteger(ByteBuffer buffer) + public static int readInteger(ByteBuffer buffer) { return buffer.getInt(); } @@ -534,7 +917,7 @@ public class EncodingUtils buffer.putLong(aLong); } - public static Long readLong(ByteBuffer buffer) + public static long readLong(ByteBuffer buffer) { return buffer.getLong(); } @@ -550,7 +933,7 @@ public class EncodingUtils buffer.putFloat(aFloat); } - public static Float readFloat(ByteBuffer buffer) + public static float readFloat(ByteBuffer buffer) { return buffer.getFloat(); } @@ -567,7 +950,7 @@ public class EncodingUtils buffer.putDouble(aDouble); } - public static Double readDouble(ByteBuffer buffer) + public static double readDouble(ByteBuffer buffer) { return buffer.getDouble(); } @@ -627,6 +1010,41 @@ public class EncodingUtils writeByte(buffer, (byte) character); } + public static long readLongAsShortString(ByteBuffer buffer) + { + short length = buffer.getUnsigned(); + short pos = 0; + if(length == 0) + { + return 0L; + } + byte digit = buffer.get(); + boolean isNegative; + long result = 0; + if(digit == (byte)'-') + { + isNegative = true; + pos++; + digit = buffer.get(); + } + else + { + isNegative = false; + } + result = digit - (byte)'0'; + pos++; + + while(pos < length) + { + pos++; + digit = buffer.get(); + result = (result << 3) + (result << 1); + result += digit - (byte)'0'; + } + + return result; + } + public static long readUnsignedInteger(ByteBuffer buffer) { long l = 0xFF & buffer.get(); @@ -639,4 +1057,23 @@ public class EncodingUtils return l; } + + + public static void main(String[] args) + { + ByteBuffer buf = ByteBuffer.allocate(8); + buf.setAutoExpand(true); + + long l = (long) Integer.MAX_VALUE; + l += 1024L; + + writeUnsignedInteger(buf, l); + + buf.flip(); + + long l2 = readUnsignedInteger(buf); + + System.out.println("before: " + l); + System.out.println("after: " + l2); + } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java b/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java index 3c18683609..61bc9090a1 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java +++ b/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java @@ -31,17 +31,16 @@ public class FieldTable { private static final Logger _logger = Logger.getLogger(FieldTable.class); - private LinkedHashMap<String, AMQTypedValue> _properties; + private ByteBuffer _encodedForm; + private LinkedHashMap<AMQShortString, AMQTypedValue> _properties; + private long _encodedSize; + private static final int INITIAL_HASHMAP_CAPACITY = 16; public FieldTable() { super(); - _properties = new LinkedHashMap<String, AMQTypedValue>(); - } - - /** * Construct a new field table. * @@ -52,14 +51,105 @@ public class FieldTable public FieldTable(ByteBuffer buffer, long length) throws AMQFrameDecodingException { this(); - setFromBuffer(buffer, length); + _encodedForm = buffer.slice(); + _encodedForm.limit((int)length); + _encodedSize = length; + buffer.skip((int)length); + } + + + + private AMQTypedValue getProperty(AMQShortString string) + { + synchronized(this) + { + if(_properties == null) + { + if(_encodedForm == null) + { + return null; + } + else + { + populateFromBuffer(); + } + } + } + + if(_properties == null) + { + return null; + } + else + { + return _properties.get(string); + } + } + + private void populateFromBuffer() + { + try + { + setFromBuffer(_encodedForm, _encodedSize); + } + catch (AMQFrameDecodingException e) + { + _logger.error("Error decoding FieldTable in deferred decoding mode ", e); + throw new IllegalArgumentException(e); + } } + private AMQTypedValue setProperty(AMQShortString key, AMQTypedValue val) + { + initMapIfNecessary(); + _encodedForm = null; + if(val == null) + { + return removeKey(key); + } + AMQTypedValue oldVal = _properties.put(key,val); + if(oldVal != null) + { + _encodedSize -= oldVal.getEncodingSize(); + } + else + { + _encodedSize += EncodingUtils.encodedShortStringLength(key) + 1; + } + _encodedSize += val.getEncodingSize(); + + return oldVal; + } + + private void initMapIfNecessary() + { + synchronized(this) + { + if(_properties == null) + { + if(_encodedForm == null) + { + _properties = new LinkedHashMap<AMQShortString,AMQTypedValue>(); + } + else + { + populateFromBuffer(); + } + } + + } + } + public Boolean getBoolean(String string) { - AMQTypedValue value = _properties.get(string); + return getBoolean(new AMQShortString(string)); + } + + public Boolean getBoolean(AMQShortString string) + { + AMQTypedValue value = getProperty(string); if (value != null && (value.getType() == AMQType.BOOLEAN)) { return (Boolean) value.getValue(); @@ -70,9 +160,15 @@ public class FieldTable } } + public Byte getByte(String string) { - AMQTypedValue value = _properties.get(string); + return getByte(new AMQShortString(string)); + } + + public Byte getByte(AMQShortString string) + { + AMQTypedValue value = getProperty(string); if (value != null && (value.getType() == AMQType.BYTE)) { return (Byte) value.getValue(); @@ -85,7 +181,12 @@ public class FieldTable public Short getShort(String string) { - AMQTypedValue value = _properties.get(string); + return getShort(new AMQShortString(string)); + } + + public Short getShort(AMQShortString string) + { + AMQTypedValue value = getProperty(string); if (value != null && (value.getType() == AMQType.SHORT)) { return (Short) value.getValue(); @@ -98,7 +199,12 @@ public class FieldTable public Integer getInteger(String string) { - AMQTypedValue value = _properties.get(string); + return getInteger(new AMQShortString(string)); + } + + public Integer getInteger(AMQShortString string) + { + AMQTypedValue value = getProperty(string); if (value != null && (value.getType() == AMQType.INT)) { return (Integer) value.getValue(); @@ -111,7 +217,12 @@ public class FieldTable public Long getLong(String string) { - AMQTypedValue value = _properties.get(string); + return getLong(new AMQShortString(string)); + } + + public Long getLong(AMQShortString string) + { + AMQTypedValue value = getProperty(string); if (value != null && (value.getType() == AMQType.LONG)) { return (Long) value.getValue(); @@ -124,7 +235,12 @@ public class FieldTable public Float getFloat(String string) { - AMQTypedValue value = _properties.get(string); + return getFloat(new AMQShortString(string)); + } + + public Float getFloat(AMQShortString string) + { + AMQTypedValue value = getProperty(string); if (value != null && (value.getType() == AMQType.FLOAT)) { return (Float) value.getValue(); @@ -137,7 +253,12 @@ public class FieldTable public Double getDouble(String string) { - AMQTypedValue value = _properties.get(string); + return getDouble(new AMQShortString(string)); + } + + public Double getDouble(AMQShortString string) + { + AMQTypedValue value = getProperty(string); if (value != null && (value.getType() == AMQType.DOUBLE)) { return (Double) value.getValue(); @@ -150,7 +271,12 @@ public class FieldTable public String getString(String string) { - AMQTypedValue value = _properties.get(string); + return getString(new AMQShortString(string)); + } + + public String getString(AMQShortString string) + { + AMQTypedValue value = getProperty(string); if ((value != null) && ((value.getType() == AMQType.WIDE_STRING) || (value.getType() == AMQType.ASCII_STRING))) { @@ -170,7 +296,12 @@ public class FieldTable public Character getCharacter(String string) { - AMQTypedValue value = _properties.get(string); + return getCharacter(new AMQShortString(string)); + } + + public Character getCharacter(AMQShortString string) + { + AMQTypedValue value = getProperty(string); if (value != null && (value.getType() == AMQType.ASCII_CHARACTER)) { return (Character) value.getValue(); @@ -183,7 +314,12 @@ public class FieldTable public byte[] getBytes(String string) { - AMQTypedValue value = _properties.get(string); + return getBytes(new AMQShortString(string)); + } + + public byte[] getBytes(AMQShortString string) + { + AMQTypedValue value = getProperty(string); if (value != null && (value.getType() == AMQType.BINARY)) { return (byte[]) value.getValue(); @@ -196,7 +332,12 @@ public class FieldTable public Object getObject(String string) { - AMQTypedValue value = _properties.get(string); + return getObject(new AMQShortString(string)); + } + + public Object getObject(AMQShortString string) + { + AMQTypedValue value = getProperty(string); if(value != null) { return value.getValue(); @@ -209,92 +350,163 @@ public class FieldTable } // ************ Setters - public Object setBoolean(String string, boolean b) { + return setBoolean(new AMQShortString(string), b); + } + + public Object setBoolean(AMQShortString string, boolean b) + { checkPropertyName(string); - return _properties.put(string, AMQType.BOOLEAN.asTypedValue(b)); + return setProperty(string, AMQType.BOOLEAN.asTypedValue(b)); } public Object setByte(String string, byte b) { + return setByte(new AMQShortString(string), b); + } + + public Object setByte(AMQShortString string, byte b) + { checkPropertyName(string); - return _properties.put(string, AMQType.BYTE.asTypedValue(b)); + return setProperty(string, AMQType.BYTE.asTypedValue(b)); } public Object setShort(String string, short i) { + return setShort(new AMQShortString(string), i); + } + + public Object setShort(AMQShortString string, short i) + { checkPropertyName(string); - return _properties.put(string, AMQType.SHORT.asTypedValue(i)); + return setProperty(string, AMQType.SHORT.asTypedValue(i)); } + public Object setInteger(String string, int i) { + return setInteger(new AMQShortString(string), i); + } + + public Object setInteger(AMQShortString string, int i) + { checkPropertyName(string); - return _properties.put(string, AMQType.INT.asTypedValue(i)); + return setProperty(string, AMQType.INT.asTypedValue(i)); } + public Object setLong(String string, long l) { + return setLong(new AMQShortString(string), l); + } + + public Object setLong(AMQShortString string, long l) + { checkPropertyName(string); - return _properties.put(string, AMQType.LONG.asTypedValue(l)); + return setProperty(string, AMQType.LONG.asTypedValue(l)); } - public Object setFloat(String string, float v) + + public Object setFloat(String string, float f) + { + return setFloat(new AMQShortString(string), f); + } + + public Object setFloat(AMQShortString string, float v) { checkPropertyName(string); - return _properties.put(string, AMQType.FLOAT.asTypedValue(v)); + return setProperty(string, AMQType.FLOAT.asTypedValue(v)); } - public Object setDouble(String string, double v) + public Object setDouble(String string, double d) + { + return setDouble(new AMQShortString(string), d); + } + + + public Object setDouble(AMQShortString string, double v) { checkPropertyName(string); - return _properties.put(string, AMQType.DOUBLE.asTypedValue(v)); + return setProperty(string, AMQType.DOUBLE.asTypedValue(v)); + } + + + public Object setString(String string, String s) + { + return setString(new AMQShortString(string), s); } - public Object setString(String string, String value) + public Object setAsciiString(AMQShortString string, String value) { checkPropertyName(string); if (value == null) { - return _properties.put(string, AMQType.VOID.asTypedValue(null)); + return setProperty(string, AMQType.VOID.asTypedValue(null)); } else { - //FIXME: determine string encoding and set either WIDE or ASCII string -// if () - { - return _properties.put(string, AMQType.WIDE_STRING.asTypedValue(value)); - } -// else -// { -// return _properties.put(string, AMQType.ASCII_STRING.asTypedValue(value)); -// } + return setProperty(string, AMQType.ASCII_STRING.asTypedValue(value)); } } + public Object setString(AMQShortString string, String value) + { + checkPropertyName(string); + if (value == null) + { + return setProperty(string, AMQType.VOID.asTypedValue(null)); + } + else + { + return setProperty(string, AMQType.LONG_STRING.asTypedValue(value)); + } + } + + public Object setChar(String string, char c) { + return setChar(new AMQShortString(string), c); + } + + + public Object setChar(AMQShortString string, char c) + { checkPropertyName(string); - return _properties.put(string, AMQType.ASCII_CHARACTER.asTypedValue(c)); + return setProperty(string, AMQType.ASCII_CHARACTER.asTypedValue(c)); } - public Object setBytes(String string, byte[] bytes) + + public Object setBytes(String string, byte[] b) + { + return setBytes(new AMQShortString(string), b); + } + + public Object setBytes(AMQShortString string, byte[] bytes) { checkPropertyName(string); - return _properties.put(string, AMQType.BINARY.asTypedValue(bytes)); + return setProperty(string, AMQType.BINARY.asTypedValue(bytes)); } public Object setBytes(String string, byte[] bytes, int start, int length) { + return setBytes(new AMQShortString(string), bytes,start,length); + } + + public Object setBytes(AMQShortString string, byte[] bytes, int start, int length) + { checkPropertyName(string); byte[] newBytes = new byte[length]; System.arraycopy(bytes,start,newBytes,0,length); return setBytes(string, bytes); } + public Object setObject(String string, Object o) + { + return setObject(new AMQShortString(string), o); + } - public Object setObject(String string, Object object) + public Object setObject(AMQShortString string, Object object) { if (object instanceof Boolean) { @@ -343,7 +555,7 @@ public class FieldTable public boolean isNullStringValue(String name) { - AMQTypedValue value = _properties.get(name); + AMQTypedValue value = getProperty(new AMQShortString(name)); return (value != null) && (value.getType() == AMQType.VOID); } @@ -351,7 +563,12 @@ public class FieldTable public Enumeration getPropertyNames() { - return Collections.enumeration(_properties.keySet()); + return Collections.enumeration(keys()); + } + + public boolean propertyExists(AMQShortString propertyName) + { + return itemExists(propertyName); } public boolean propertyExists(String propertyName) @@ -359,25 +576,34 @@ public class FieldTable return itemExists(propertyName); } - public boolean itemExists(String string) + public boolean itemExists(AMQShortString string) { + initMapIfNecessary(); return _properties.containsKey(string); } + public boolean itemExists(String string) + { + return itemExists(new AMQShortString(string)); + } + public String toString() { + initMapIfNecessary(); +// if (_encodedForm != null) +// _encodedForm.rewind(); return _properties.toString(); } - private void checkPropertyName(String propertyName) + private void checkPropertyName(AMQShortString propertyName) { if (propertyName == null) { throw new IllegalArgumentException("Property name must not be null"); } - else if ("".equals(propertyName)) + else if (propertyName.length()==0) { throw new IllegalArgumentException("Property name must not be the empty string"); } @@ -386,7 +612,7 @@ public class FieldTable } - protected static void checkIdentiferFormat(String propertyName) + protected static void checkIdentiferFormat(AMQShortString propertyName) { // AMQP Spec: 4.2.5.5 Field Tables // Guidelines for implementers: @@ -448,20 +674,31 @@ public class FieldTable public long getEncodedSize() { + return _encodedSize; + } + + private void recalculateEncodedSize() + { + int encodedSize = 0; - for(Map.Entry<String,AMQTypedValue> e : _properties.entrySet()) + if(_properties != null) { - encodedSize += EncodingUtils.encodedShortStringLength(e.getKey()); - encodedSize++; // the byte for the encoding Type - encodedSize += e.getValue().getEncodingSize(); + for(Map.Entry<AMQShortString,AMQTypedValue> e : _properties.entrySet()) + { + encodedSize += EncodingUtils.encodedShortStringLength(e.getKey()); + encodedSize++; // the byte for the encoding Type + encodedSize += e.getValue().getEncodingSize(); + } } - return encodedSize; + _encodedSize = encodedSize; } public void addAll(FieldTable fieldTable) { + initMapIfNecessary(); _properties.putAll(fieldTable._properties); + recalculateEncodedSize(); } @@ -473,135 +710,213 @@ public class FieldTable public Object processOverElements(FieldTableElementProcessor processor) { - for(Map.Entry<String,AMQTypedValue> e : _properties.entrySet()) + initMapIfNecessary(); + if(_properties != null) { - boolean result = processor.processElement(e.getKey(), e.getValue()); - if(!result) + for(Map.Entry<AMQShortString,AMQTypedValue> e : _properties.entrySet()) { - break; + boolean result = processor.processElement(e.getKey().toString(), e.getValue()); + if(!result) + { + break; + } } } return processor.getResult(); + + } public int size() { + initMapIfNecessary(); return _properties.size(); + } public boolean isEmpty() { - return _properties.isEmpty(); + return size() ==0; } - public boolean containsKey(String key) + public boolean containsKey(AMQShortString key) { + initMapIfNecessary(); return _properties.containsKey(key); } + public boolean containsKey(String key) + { + return containsKey(new AMQShortString(key)); + } + public Set<String> keys() { - return _properties.keySet(); + initMapIfNecessary(); + Set<String> keys = new LinkedHashSet<String>(); + for(AMQShortString key : _properties.keySet()) + { + keys.add(key.toString()); + } + return keys; } - public Object get(Object key) + public Object get(AMQShortString key) { - return getObject((String)key); + return getObject(key); } - public Object put(Object key, Object value) + + public Object put(AMQShortString key, Object value) { - return setObject(key.toString(), value); + return setObject(key, value); } - + public Object remove(String key) { + + return remove(new AMQShortString(key)); + + } + + public Object remove(AMQShortString key) + { + AMQTypedValue val = removeKey(key); + return val == null ? null : val.getValue(); + + } + + + public AMQTypedValue removeKey(AMQShortString key) + { + initMapIfNecessary(); + _encodedForm = null; AMQTypedValue value = _properties.remove(key); - return value == null ? null : value.getValue(); + if(value == null) + { + return null; + } + else + { + _encodedSize -= EncodingUtils.encodedShortStringLength(key); + _encodedSize--; + _encodedSize -= value.getEncodingSize(); + return value; + } + } public void clear() { + initMapIfNecessary(); + _encodedForm = null; _properties.clear(); + _encodedSize = 0; } - public Set keySet() + public Set<AMQShortString> keySet() { + initMapIfNecessary(); return _properties.keySet(); } private void putDataInBuffer(ByteBuffer buffer) { - final Iterator<Map.Entry<String,AMQTypedValue>> it = _properties.entrySet().iterator(); + if(_encodedForm != null) + { + if (_encodedForm.remaining() == 0) + { + _encodedForm.rewind(); + } + buffer.put(_encodedForm); + } + else if(_properties != null) + { + final Iterator<Map.Entry<AMQShortString,AMQTypedValue>> it = _properties.entrySet().iterator(); - //If there are values then write out the encoded Size... could check _encodedSize != 0 - // write out the total length, which we have kept up to date as data is added + //If there are values then write out the encoded Size... could check _encodedSize != 0 + // write out the total length, which we have kept up to date as data is added - while (it.hasNext()) - { - final Map.Entry<String,AMQTypedValue> me = it.next(); - try + while (it.hasNext()) { - if (_logger.isTraceEnabled()) + final Map.Entry<AMQShortString,AMQTypedValue> me = it.next(); + try { - _logger.trace("Writing Property:" + me.getKey() + - " Type:" + me.getValue().getType() + - " Value:" + me.getValue().getValue()); - _logger.trace("Buffer Position:" + buffer.position() + - " Remaining:" + buffer.remaining()); - } + if (_logger.isTraceEnabled()) + { + _logger.trace("Writing Property:" + me.getKey() + + " Type:" + me.getValue().getType() + + " Value:" + me.getValue().getValue()); + _logger.trace("Buffer Position:" + buffer.position() + + " Remaining:" + buffer.remaining()); + } - //Write the actual parameter name - EncodingUtils.writeShortStringBytes(buffer, me.getKey()); - me.getValue().writeToBuffer(buffer); - } - catch (Exception e) - { - if (_logger.isTraceEnabled()) + //Write the actual parameter name + EncodingUtils.writeShortStringBytes(buffer, me.getKey()); + me.getValue().writeToBuffer(buffer); + } + catch (Exception e) { - _logger.trace("Exception thrown:" + e); - _logger.trace("Writing Property:" + me.getKey() + - " Type:" + me.getValue().getType() + - " Value:" + me.getValue().getValue()); - _logger.trace("Buffer Position:" + buffer.position() + - " Remaining:" + buffer.remaining()); + if (_logger.isTraceEnabled()) + { + _logger.trace("Exception thrown:" + e); + _logger.trace("Writing Property:" + me.getKey() + + " Type:" + me.getValue().getType() + + " Value:" + me.getValue().getValue()); + _logger.trace("Buffer Position:" + buffer.position() + + " Remaining:" + buffer.remaining()); + } + throw new RuntimeException(e); } - throw new RuntimeException(e); } } } - public void setFromBuffer(ByteBuffer buffer, long length) throws AMQFrameDecodingException + private void setFromBuffer(ByteBuffer buffer, long length) throws AMQFrameDecodingException { - final boolean trace = _logger.isTraceEnabled(); - int sizeRead = 0; - while (sizeRead < length) + final boolean trace = _logger.isTraceEnabled(); + if(length > 0) { - int sizeRemaining = buffer.remaining(); - final String key = EncodingUtils.readShortString(buffer); - AMQTypedValue value = AMQTypedValue.readFromBuffer(buffer); - sizeRead += (sizeRemaining - buffer.remaining()); - if (trace) + final int expectedRemaining = buffer.remaining()-(int)length; + + _properties = new LinkedHashMap<AMQShortString,AMQTypedValue>(INITIAL_HASHMAP_CAPACITY); + + do { - _logger.trace("FieldTable::PropFieldTable(buffer," + length + "): Read type '" + value.getType() + "', key '" + key + "', value '" + value.getValue() + "' (now read " + sizeRead + " of " + length + " encoded bytes)..."); + + final AMQShortString key = EncodingUtils.readAMQShortString(buffer); + AMQTypedValue value = AMQTypedValue.readFromBuffer(buffer); + + if (trace) + { + _logger.trace("FieldTable::PropFieldTable(buffer," + length + "): Read type '" + value.getType() + "', key '" + key + "', value '" + value.getValue() + "'"); + } + + + + _properties.put(key,value); + + + } + while (buffer.remaining() > expectedRemaining); - _properties.put(key,value); } + _encodedSize = length; if (trace) { diff --git a/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java b/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java index 7a160ef471..7246c4a1cf 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java @@ -27,7 +27,21 @@ public class HeartbeatBody extends AMQBody public static final byte TYPE = 8; public static AMQFrame FRAME = new HeartbeatBody().toFrame(); - protected byte getFrameType() + public HeartbeatBody() + { + + } + + public HeartbeatBody(ByteBuffer buffer, long size) + { + if(size > 0) + { + //allow other implementations to have a payload, but ignore it: + buffer.skip((int) size); + } + } + + public byte getFrameType() { return TYPE; } diff --git a/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBodyFactory.java b/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBodyFactory.java index 97bd3d9253..c7ada708dc 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBodyFactory.java +++ b/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBodyFactory.java @@ -24,7 +24,7 @@ import org.apache.mina.common.ByteBuffer; public class HeartbeatBodyFactory implements BodyFactory { - public AMQBody createBody(ByteBuffer in) throws AMQFrameDecodingException + public AMQBody createBody(ByteBuffer in, long bodySize) throws AMQFrameDecodingException { return new HeartbeatBody(); } diff --git a/java/common/src/main/java/org/apache/qpid/framing/JMSPropertyFieldTable.java b/java/common/src/main/java/org/apache/qpid/framing/JMSPropertyFieldTable.java deleted file mode 100644 index d78034cf2f..0000000000 --- a/java/common/src/main/java/org/apache/qpid/framing/JMSPropertyFieldTable.java +++ /dev/null @@ -1,453 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 -* - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - * - */ -package org.apache.qpid.framing; - -import org.apache.mina.common.ByteBuffer; -import org.apache.qpid.AMQPInvalidClassException; - -import javax.jms.MessageFormatException; -import javax.jms.JMSException; -import java.util.Enumeration; - - -public class JMSPropertyFieldTable -{ - private FieldTable _fieldtable; - - public JMSPropertyFieldTable(FieldTable table) - { - _fieldtable = table; - } - - - private void checkPropertyName(String propertyName) - { - if (propertyName == null) - { - throw new IllegalArgumentException("Property name must not be null"); - } - else if ("".equals(propertyName)) - { - throw new IllegalArgumentException("Property name must not be the empty string"); - } - - checkIdentiferFormat(propertyName); - } - - protected static void checkIdentiferFormat(String propertyName) - { -// JMS requirements 3.5.1 Property Names -// Identifiers: -// - An identifier is an unlimited-length character sequence that must begin -// with a Java identifier start character; all following characters must be Java -// identifier part characters. An identifier start character is any character for -// which the method Character.isJavaIdentifierStart returns true. This includes -// '_' and '$'. An identifier part character is any character for which the -// method Character.isJavaIdentifierPart returns true. -// - Identifiers cannot be the names NULL, TRUE, or FALSE. -// – Identifiers cannot be NOT, AND, OR, BETWEEN, LIKE, IN, IS, or -// ESCAPE. -// – Identifiers are either header field references or property references. The -// type of a property value in a message selector corresponds to the type -// used to set the property. If a property that does not exist in a message is -// referenced, its value is NULL. The semantics of evaluating NULL values -// in a selector are described in Section 3.8.1.2, “Null Values.” -// – The conversions that apply to the get methods for properties do not -// apply when a property is used in a message selector expression. For -// example, suppose you set a property as a string value, as in the -// following: -// myMessage.setStringProperty("NumberOfOrders", "2"); -// The following expression in a message selector would evaluate to false, -// because a string cannot be used in an arithmetic expression: -// "NumberOfOrders > 1" -// – Identifiers are case sensitive. -// – Message header field references are restricted to JMSDeliveryMode, -// JMSPriority, JMSMessageID, JMSTimestamp, JMSCorrelationID, and -// JMSType. JMSMessageID, JMSCorrelationID, and JMSType values may be -// null and if so are treated as a NULL value. - - if (Boolean.getBoolean("strict-jms")) - { - // JMS start character - if (!(Character.isJavaIdentifierStart(propertyName.charAt(0)))) - { - throw new IllegalArgumentException("Identifier '" + propertyName + "' does not start with a valid JMS identifier start character"); - } - - // JMS part character - int length = propertyName.length(); - for (int c = 1; c < length; c++) - { - if (!(Character.isJavaIdentifierPart(propertyName.charAt(c)))) - { - throw new IllegalArgumentException("Identifier '" + propertyName + "' contains an invalid JMS identifier character"); - } - } - - // JMS invalid names - if ((propertyName.equals("NULL") - || propertyName.equals("TRUE") - || propertyName.equals("FALSE") - || propertyName.equals("NOT") - || propertyName.equals("AND") - || propertyName.equals("OR") - || propertyName.equals("BETWEEN") - || propertyName.equals("LIKE") - || propertyName.equals("IN") - || propertyName.equals("IS") - || propertyName.equals("ESCAPE"))) - { - throw new IllegalArgumentException("Identifier '" + propertyName + "' is not allowed in JMS"); - } - } - - } - - // MapMessage Interface - public boolean getBoolean(String string) throws JMSException - { - Boolean b = _fieldtable.getBoolean(string); - - if (b == null) - { - if (_fieldtable.containsKey(string)) - { - Object str = _fieldtable.getObject(string); - - if (str == null || !(str instanceof String)) - { - throw new MessageFormatException("getBoolean can't use " + string + " item."); - } - else - { - return Boolean.valueOf((String) str); - } - } - else - { - b = Boolean.valueOf(null); - } - } - - return b; - } - - public char getCharacter(String string) throws JMSException - { - Character c = _fieldtable.getCharacter(string); - - if (c == null) - { - if (_fieldtable.isNullStringValue(string)) - { - throw new NullPointerException("Cannot convert null char"); - } - else - { - throw new MessageFormatException("getChar can't use " + string + " item."); - } - } - else - { - return (char) c; - } - } - - public byte[] getBytes(String string) throws JMSException - { - byte[] bs = _fieldtable.getBytes(string); - - if (bs == null) - { - throw new MessageFormatException("getBytes can't use " + string + " item."); - } - else - { - return bs; - } - } - - public byte getByte(String string) throws JMSException - { - Byte b = _fieldtable.getByte(string); - if (b == null) - { - if (_fieldtable.containsKey(string)) - { - Object str = _fieldtable.getObject(string); - - if (str == null || !(str instanceof String)) - { - throw new MessageFormatException("getByte can't use " + string + " item."); - } - else - { - return Byte.valueOf((String) str); - } - } - else - { - b = Byte.valueOf(null); - } - } - - return b; - } - - public short getShort(String string) throws JMSException - { - Short s = _fieldtable.getShort(string); - - if (s == null) - { - s = Short.valueOf(getByte(string)); - } - - return s; - } - - public int getInteger(String string) throws JMSException - { - Integer i = _fieldtable.getInteger(string); - - if (i == null) - { - i = Integer.valueOf(getShort(string)); - } - - return i; - } - - public long getLong(String string) throws JMSException - { - Long l = _fieldtable.getLong(string); - - if (l == null) - { - l = Long.valueOf(getInteger(string)); - } - - return l; - } - - public float getFloat(String string) throws JMSException - { - Float f = _fieldtable.getFloat(string); - - if (f == null) - { - if (_fieldtable.containsKey(string)) - { - Object str = _fieldtable.getObject(string); - - if (str == null || !(str instanceof String)) - { - throw new MessageFormatException("getFloat can't use " + string + " item."); - } - else - { - return Float.valueOf((String) str); - } - } - else - { - f = Float.valueOf(null); - } - - } - - return f; - } - - public double getDouble(String string) throws JMSException - { - Double d = _fieldtable.getDouble(string); - - if (d == null) - { - d = Double.valueOf(getFloat(string)); - } - - return d; - } - - public String getString(String string) throws JMSException - { - String s = _fieldtable.getString(string); - - if (s == null) - { - if (_fieldtable.containsKey(string)) - { - Object o = _fieldtable.getObject(string); - if (o instanceof byte[]) - { - throw new MessageFormatException("getObject couldn't find " + string + " item."); - } - else - { - if (o == null) - { - return null; - } - else - { - s = String.valueOf(o); - } - } - } - } - - return s; - } - - public Object getObject(String string) throws JMSException - { - return _fieldtable.getObject(string); - } - - public void setBoolean(String string, boolean b) throws JMSException - { - checkPropertyName(string); - _fieldtable.setBoolean(string, b); - } - - public void setChar(String string, char c) throws JMSException - { - checkPropertyName(string); - _fieldtable.setChar(string, c); - } - - public Object setBytes(String string, byte[] bytes) - { - return _fieldtable.setBytes(string, bytes, 0, bytes.length); - } - - public Object setBytes(String string, byte[] bytes, int start, int length) - { - return _fieldtable.setBytes(string, bytes, start, length); - } - - public void setByte(String string, byte b) throws JMSException - { - checkPropertyName(string); - _fieldtable.setByte(string, b); - } - - public void setShort(String string, short i) throws JMSException - { - checkPropertyName(string); - _fieldtable.setShort(string, i); - } - - public void setInteger(String string, int i) throws JMSException - { - checkPropertyName(string); - _fieldtable.setInteger(string, i); - } - - public void setLong(String string, long l) throws JMSException - { - checkPropertyName(string); - _fieldtable.setLong(string, l); - } - - public void setFloat(String string, float v) throws JMSException - { - checkPropertyName(string); - _fieldtable.setFloat(string, v); - } - - public void setDouble(String string, double v) throws JMSException - { - checkPropertyName(string); - _fieldtable.setDouble(string, v); - } - - public void setString(String string, String string1) throws JMSException - { - checkPropertyName(string); - _fieldtable.setString(string, string1); - } - - public void setObject(String string, Object object) throws JMSException - { - checkPropertyName(string); - try - { - _fieldtable.setObject(string, object); - } - catch (AMQPInvalidClassException aice) - { - throw new MessageFormatException("Only primatives are allowed object is:" + object.getClass()); - } - } - - public boolean itemExists(String string) throws JMSException - { - return _fieldtable.containsKey(string); - } - - public void setFieldTable(FieldTable headers) - { - _fieldtable = headers; - } - - public Enumeration getPropertyNames() - { - return _fieldtable.getPropertyNames(); - } - - public void clear() - { - _fieldtable.clear(); - } - - public boolean propertyExists(String propertyName) - { - return _fieldtable.propertyExists(propertyName); - } - - public Object put(Object key, Object value) - { - return _fieldtable.put(key, value); - } - - public Object remove(String propertyName) - { - return _fieldtable.remove(propertyName); - } - - public boolean isEmpty() - { - return _fieldtable.isEmpty(); - } - - public void writeToBuffer(ByteBuffer data) - { - _fieldtable.writeToBuffer(data); - } - - public Enumeration getMapNames() - { - return getPropertyNames(); - } -} diff --git a/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java b/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java index f2d1a70cdc..697a0f4249 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java +++ b/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java @@ -164,4 +164,14 @@ public class ProtocolInitiation extends AMQDataBlock implements EncodableAMQData protocolMajor + "." + protocolMinor + " not found in protocol version list."); } } + + public String toString() + { + StringBuffer buffer = new StringBuffer(new String(header)); + buffer.append(Integer.toHexString(protocolClass)); + buffer.append(Integer.toHexString(protocolInstance)); + buffer.append(Integer.toHexString(protocolMajor)); + buffer.append(Integer.toHexString(protocolMinor)); + return buffer.toString(); + } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/RequestManager.java b/java/common/src/main/java/org/apache/qpid/framing/RequestManager.java index 836c4ad985..eab7ad0132 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/RequestManager.java +++ b/java/common/src/main/java/org/apache/qpid/framing/RequestManager.java @@ -40,7 +40,7 @@ public class RequestManager * to be known. */ private boolean serverFlag; - private int connectionId; + private long connectionId; /** * Request and response frames must have a requestID and responseID which @@ -56,7 +56,7 @@ public class RequestManager private ConcurrentHashMap<Long, AMQMethodListener> requestSentMap; - public RequestManager(int connectionId, int channel, AMQProtocolWriter protocolWriter, boolean serverFlag) + public RequestManager(long connectionId, int channel, AMQProtocolWriter protocolWriter, boolean serverFlag) { this.channel = channel; this.protocolWriter = protocolWriter; diff --git a/java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java b/java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java index 3148603d65..9896bde5f8 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java +++ b/java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java @@ -43,7 +43,7 @@ public class ResponseManager * to be known. */ private boolean serverFlag; - private int connectionId; + private long connectionId; private int maxAccumulatedResponses = 20; // Default // private Class currentResponseMethodBodyClass; @@ -80,11 +80,18 @@ public class ResponseManager { return (int)(requestId - o.requestId); } + + public String toString() + { + return requestId + ":" + (responseMethodBody == null ? + "null" : + "C" + responseMethodBody.getClazz() + " M" + responseMethodBody.getMethod()); + } } private ConcurrentHashMap<Long, ResponseStatus> responseMap; - public ResponseManager(int connectionId, int channel, AMQMethodListener methodListener, + public ResponseManager(long connectionId, int channel, AMQMethodListener methodListener, AMQProtocolWriter protocolWriter, boolean serverFlag) { this.channel = channel; diff --git a/java/common/src/main/java/org/apache/qpid/framing/SmallCompositeAMQDataBlock.java b/java/common/src/main/java/org/apache/qpid/framing/SmallCompositeAMQDataBlock.java new file mode 100644 index 0000000000..0dd1bdc102 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/framing/SmallCompositeAMQDataBlock.java @@ -0,0 +1,97 @@ +/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.framing;
+
+import org.apache.mina.common.ByteBuffer;
+
+public class SmallCompositeAMQDataBlock extends AMQDataBlock implements EncodableAMQDataBlock
+{
+ private ByteBuffer _encodedBlock;
+
+ private AMQDataBlock _block;
+
+ public SmallCompositeAMQDataBlock(AMQDataBlock block)
+ {
+ _block = block;
+ }
+
+ /**
+ * The encoded block will be logically first before the AMQDataBlocks which are encoded
+ * into the buffer afterwards.
+ * @param encodedBlock already-encoded data
+ * @param block a block to be encoded.
+ */
+ public SmallCompositeAMQDataBlock(ByteBuffer encodedBlock, AMQDataBlock block)
+ {
+ this(block);
+ _encodedBlock = encodedBlock;
+ }
+
+ public AMQDataBlock getBlock()
+ {
+ return _block;
+ }
+
+ public ByteBuffer getEncodedBlock()
+ {
+ return _encodedBlock;
+ }
+
+ public long getSize()
+ {
+ long frameSize = _block.getSize();
+
+ if (_encodedBlock != null)
+ {
+ _encodedBlock.rewind();
+ frameSize += _encodedBlock.remaining();
+ }
+ return frameSize;
+ }
+
+ public void writePayload(ByteBuffer buffer)
+ {
+ if (_encodedBlock != null)
+ {
+ buffer.put(_encodedBlock);
+ }
+ _block.writePayload(buffer);
+
+ }
+
+ public String toString()
+ {
+ if (_block == null)
+ {
+ return "No blocks contained in composite frame";
+ }
+ else
+ {
+ StringBuilder buf = new StringBuilder(this.getClass().getName());
+ buf.append("{encodedBlock=").append(_encodedBlock);
+
+ buf.append(" _block=[").append(_block.toString()).append("]");
+
+ buf.append("}");
+ return buf.toString();
+ }
+ }
+}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/VersionSpecificRegistry.java b/java/common/src/main/java/org/apache/qpid/framing/VersionSpecificRegistry.java new file mode 100644 index 0000000000..9bc8232d61 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/framing/VersionSpecificRegistry.java @@ -0,0 +1,141 @@ +/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.framing;
+
+import org.apache.mina.common.ByteBuffer;
+import org.apache.log4j.Logger;
+
+public class VersionSpecificRegistry
+{
+ private static final Logger _log = Logger.getLogger(VersionSpecificRegistry.class);
+
+
+ private final byte _protocolMajorVersion;
+ private final byte _protocolMinorVersion;
+
+ private static final int DEFAULT_MAX_CLASS_ID = 200;
+ private static final int DEFAULT_MAX_METHOD_ID = 50;
+
+ private AMQMethodBodyInstanceFactory[][] _registry = new AMQMethodBodyInstanceFactory[DEFAULT_MAX_CLASS_ID][];
+
+ public VersionSpecificRegistry(byte major, byte minor)
+ {
+ _protocolMajorVersion = major;
+ _protocolMinorVersion = minor;
+ }
+
+ public byte getProtocolMajorVersion()
+ {
+ return _protocolMajorVersion;
+ }
+
+ public byte getProtocolMinorVersion()
+ {
+ return _protocolMinorVersion;
+ }
+
+ public AMQMethodBodyInstanceFactory getMethodBody(final short classID, final short methodID)
+ {
+ try
+ {
+ return _registry[classID][methodID];
+ }
+ catch (IndexOutOfBoundsException e)
+ {
+ return null;
+ }
+ catch (NullPointerException e)
+ {
+ return null;
+ }
+ }
+
+ public void registerMethod(final short classID, final short methodID, final AMQMethodBodyInstanceFactory instanceFactory)
+ {
+ if(_registry.length <= classID)
+ {
+ AMQMethodBodyInstanceFactory[][] oldRegistry = _registry;
+ _registry = new AMQMethodBodyInstanceFactory[classID+1][];
+ System.arraycopy(oldRegistry, 0, _registry, 0, oldRegistry.length);
+ }
+
+ if(_registry[classID] == null)
+ {
+ _registry[classID] = new AMQMethodBodyInstanceFactory[methodID > DEFAULT_MAX_METHOD_ID ? methodID + 1 : DEFAULT_MAX_METHOD_ID + 1];
+ }
+ else if(_registry[classID].length <= methodID)
+ {
+ AMQMethodBodyInstanceFactory[] oldMethods = _registry[classID];
+ _registry[classID] = new AMQMethodBodyInstanceFactory[methodID+1];
+ System.arraycopy(oldMethods,0,_registry[classID],0,oldMethods.length);
+ }
+
+ _registry[classID][methodID] = instanceFactory;
+
+ }
+
+
+ public AMQMethodBody get(short classID, short methodID, ByteBuffer in, long size)
+ throws AMQFrameDecodingException
+ {
+ AMQMethodBodyInstanceFactory bodyFactory;
+ try
+ {
+ bodyFactory = _registry[classID][methodID];
+ }
+ catch(NullPointerException e)
+ {
+ throw new AMQFrameDecodingException(_log,
+ "Class " + classID + " unknown in AMQP version " + _protocolMajorVersion + "-" + _protocolMinorVersion
+ + " (while trying to decode class " + classID + " method " + methodID + ".");
+ }
+ catch(IndexOutOfBoundsException e)
+ {
+ if(classID >= _registry.length)
+ {
+ throw new AMQFrameDecodingException(_log,
+ "Class " + classID + " unknown in AMQP version " + _protocolMajorVersion + "-" + _protocolMinorVersion
+ + " (while trying to decode class " + classID + " method " + methodID + ".");
+
+ }
+ else
+ {
+ throw new AMQFrameDecodingException(_log,
+ "Method " + methodID + " unknown in AMQP version " + _protocolMajorVersion + "-" + _protocolMinorVersion
+ + " (while trying to decode class " + classID + " method " + methodID + ".");
+
+ }
+ }
+
+
+ if (bodyFactory == null)
+ {
+ throw new AMQFrameDecodingException(_log,
+ "Method " + methodID + " unknown in AMQP version " + _protocolMajorVersion + "-" + _protocolMinorVersion
+ + " (while trying to decode class " + classID + " method " + methodID + ".");
+ }
+
+
+ return bodyFactory.newInstance(_protocolMajorVersion, _protocolMinorVersion, classID, methodID, in, size);
+
+
+ }
+}
diff --git a/java/common/src/main/java/org/apache/qpid/pool/Event.java b/java/common/src/main/java/org/apache/qpid/pool/Event.java index 7364b9293a..43ff8f6a19 100644 --- a/java/common/src/main/java/org/apache/qpid/pool/Event.java +++ b/java/common/src/main/java/org/apache/qpid/pool/Event.java @@ -25,90 +25,66 @@ import org.apache.mina.common.IoFilter; import org.apache.mina.common.IoSession; import org.apache.mina.common.IdleStatus; -/** - * Represents an operation on IoFilter. - */ -enum EventType -{ - OPENED, CLOSED, READ, WRITE, WRITTEN, RECEIVED, SENT, IDLE, EXCEPTION -} -class Event +abstract public class Event { - private static final Logger _log = Logger.getLogger(Event.class); - - private final EventType type; - private final IoFilter.NextFilter nextFilter; - private final Object data; - - public Event(IoFilter.NextFilter nextFilter, EventType type, Object data) - { - this.type = type; - this.nextFilter = nextFilter; - this.data = data; - if (type == EventType.EXCEPTION) - { - _log.error("Exception event constructed: " + data, (Throwable) data); - } - } - public Object getData() + public Event() { - return data; } - public IoFilter.NextFilter getNextFilter() - { - return nextFilter; - } + abstract public void process(IoSession session); - public EventType getType() + public static final class ReceivedEvent extends Event { - return type; - } + private final Object _data; - void process(IoSession session) - { - if (_log.isDebugEnabled()) - { - _log.debug("Processing " + this); - } - if (type == EventType.RECEIVED) - { - nextFilter.messageReceived(session, data); - //ByteBufferUtil.releaseIfPossible( data ); - } - else if (type == EventType.SENT) + private final IoFilter.NextFilter _nextFilter; + + public ReceivedEvent(final IoFilter.NextFilter nextFilter, final Object data) { - nextFilter.messageSent(session, data); - //ByteBufferUtil.releaseIfPossible( data ); + super(); + _nextFilter = nextFilter; + _data = data; } - else if (type == EventType.EXCEPTION) + + public void process(IoSession session) { - nextFilter.exceptionCaught(session, (Throwable) data); + _nextFilter.messageReceived(session, _data); } - else if (type == EventType.IDLE) + + public IoFilter.NextFilter getNextFilter() { - nextFilter.sessionIdle(session, (IdleStatus) data); + return _nextFilter; } - else if (type == EventType.OPENED) + } + + + public static final class WriteEvent extends Event + { + private final IoFilter.WriteRequest _data; + private final IoFilter.NextFilter _nextFilter; + + public WriteEvent(final IoFilter.NextFilter nextFilter, final IoFilter.WriteRequest data) { - nextFilter.sessionOpened(session); + super(); + _nextFilter = nextFilter; + _data = data; } - else if (type == EventType.WRITE) + + + public void process(IoSession session) { - nextFilter.filterWrite(session, (IoFilter.WriteRequest) data); + _nextFilter.filterWrite(session, _data); } - else if (type == EventType.CLOSED) + + public IoFilter.NextFilter getNextFilter() { - nextFilter.sessionClosed(session); + return _nextFilter; } } - public String toString() - { - return "Event: type " + type + ", data: " + data; - } + } diff --git a/java/common/src/main/java/org/apache/qpid/pool/Job.java b/java/common/src/main/java/org/apache/qpid/pool/Job.java index b9673cd48f..9b3bcfa008 100644 --- a/java/common/src/main/java/org/apache/qpid/pool/Job.java +++ b/java/common/src/main/java/org/apache/qpid/pool/Job.java @@ -30,13 +30,13 @@ import java.util.concurrent.atomic.AtomicInteger; * Holds events for a session that will be processed asynchronously by * the thread pool in PoolingFilter. */ -class Job implements Runnable +public class Job implements Runnable { private final int _maxEvents; private final IoSession _session; private final java.util.Queue<Event> _eventQueue = new ConcurrentLinkedQueue<Event>(); private final AtomicBoolean _active = new AtomicBoolean(); - private final AtomicInteger _refCount = new AtomicInteger(); + //private final AtomicInteger _refCount = new AtomicInteger(); private final JobCompletionHandler _completionHandler; Job(IoSession session, JobCompletionHandler completionHandler, int maxEvents) @@ -45,21 +45,21 @@ class Job implements Runnable _completionHandler = completionHandler; _maxEvents = maxEvents; } - - void acquire() - { - _refCount.incrementAndGet(); - } - - void release() - { - _refCount.decrementAndGet(); - } - - boolean isReferenced() - { - return _refCount.get() > 0; - } +// +// void acquire() +// { +// _refCount.incrementAndGet(); +// } +// +// void release() +// { +// _refCount.decrementAndGet(); +// } +// +// boolean isReferenced() +// { +// return _refCount.get() > 0; +// } void add(Event evt) { diff --git a/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java b/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java index 38cfa68c78..9d04827246 100644 --- a/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java +++ b/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java @@ -25,57 +25,60 @@ import org.apache.mina.common.IdleStatus; import org.apache.mina.common.IoFilterAdapter; import org.apache.mina.common.IoSession; -import java.util.Arrays; -import java.util.HashSet; -import java.util.Set; +import java.util.EnumSet; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; public class PoolingFilter extends IoFilterAdapter implements Job.JobCompletionHandler { private static final Logger _logger = Logger.getLogger(PoolingFilter.class); - public static final Set<EventType> READ_EVENTS = new HashSet<EventType>(Arrays.asList(EventType.RECEIVED)); - public static final Set<EventType> WRITE_EVENTS = new HashSet<EventType>(Arrays.asList(EventType.WRITE)); private final ConcurrentMap<IoSession, Job> _jobs = new ConcurrentHashMap<IoSession, Job>(); private final ReferenceCountingExecutorService _poolReference; - private final Set<EventType> _asyncTypes; private final String _name; private final int _maxEvents = Integer.getInteger("amqj.server.read_write_pool.max_events", 10); - public PoolingFilter(ReferenceCountingExecutorService refCountingPool, Set<EventType> asyncTypes, String name) + public PoolingFilter(ReferenceCountingExecutorService refCountingPool, String name) { _poolReference = refCountingPool; - _asyncTypes = asyncTypes; _name = name; } - private void fireEvent(IoSession session, Event event) + void fireAsynchEvent(IoSession session, Event event) { - if (_asyncTypes.contains(event.getType())) - { - Job job = getJobForSession(session); - job.acquire(); //prevents this job being removed from _jobs - job.add(event); + Job job = getJobForSession(session); + // job.acquire(); //prevents this job being removed from _jobs + job.add(event); - //Additional checks on pool to check that it hasn't shutdown. - // The alternative is to catch the RejectedExecutionException that will result from executing on a shutdown pool - if (job.activate() && _poolReference.getPool() != null && !_poolReference.getPool().isShutdown()) - { - _poolReference.getPool().execute(job); - } - } - else + //Additional checks on pool to check that it hasn't shutdown. + // The alternative is to catch the RejectedExecutionException that will result from executing on a shutdown pool + if (job.activate() && _poolReference.getPool() != null && !_poolReference.getPool().isShutdown()) { - event.process(session); + _poolReference.getPool().execute(job); } + + } + + public void createNewJobForSession(IoSession session) + { + Job job = new Job(session, this, _maxEvents); + session.setAttribute(_name, job); } private Job getJobForSession(IoSession session) { - Job job = _jobs.get(session); - return job == null ? createJobForSession(session) : job; + return (Job) session.getAttribute(_name); + +/* if(job == null) + { + System.err.println("Error in " + _name); + Thread.dumpStack(); + } + + + job = _jobs.get(session); + return job == null ? createJobForSession(session) : job;*/ } private Job createJobForSession(IoSession session) @@ -93,15 +96,16 @@ public class PoolingFilter extends IoFilterAdapter implements Job.JobCompletionH //Job.JobCompletionHandler public void completed(IoSession session, Job job) { - if (job.isComplete()) - { - job.release(); - if (!job.isReferenced()) - { - _jobs.remove(session); - } - } - else +// if (job.isComplete()) +// { +// job.release(); +// if (!job.isReferenced()) +// { +// _jobs.remove(session); +// } +// } +// else + if(!job.isComplete()) { // ritchiem : 2006-12-13 Do we need to perform the additional checks here? // Can the pool be shutdown at this point? @@ -114,45 +118,44 @@ public class PoolingFilter extends IoFilterAdapter implements Job.JobCompletionH //IoFilter methods that are processed by threads on the pool - public void sessionOpened(NextFilter nextFilter, IoSession session) throws Exception + public void sessionOpened(final NextFilter nextFilter, final IoSession session) throws Exception { - fireEvent(session, new Event(nextFilter, EventType.OPENED, null)); + nextFilter.sessionOpened(session); } - public void sessionClosed(NextFilter nextFilter, IoSession session) throws Exception + public void sessionClosed(final NextFilter nextFilter, final IoSession session) throws Exception { - fireEvent(session, new Event(nextFilter, EventType.CLOSED, null)); + nextFilter.sessionClosed(session); } - public void sessionIdle(NextFilter nextFilter, IoSession session, - IdleStatus status) throws Exception + public void sessionIdle(final NextFilter nextFilter, final IoSession session, + final IdleStatus status) throws Exception { - fireEvent(session, new Event(nextFilter, EventType.IDLE, status)); + nextFilter.sessionIdle(session, status); } - public void exceptionCaught(NextFilter nextFilter, IoSession session, - Throwable cause) throws Exception + public void exceptionCaught(final NextFilter nextFilter, final IoSession session, + final Throwable cause) throws Exception { - fireEvent(session, new Event(nextFilter, EventType.EXCEPTION, cause)); + nextFilter.exceptionCaught(session,cause); } - public void messageReceived(NextFilter nextFilter, IoSession session, - Object message) throws Exception + public void messageReceived(final NextFilter nextFilter, final IoSession session, + final Object message) throws Exception { - //ByteBufferUtil.acquireIfPossible( message ); - fireEvent(session, new Event(nextFilter, EventType.RECEIVED, message)); + nextFilter.messageReceived(session,message); } - public void messageSent(NextFilter nextFilter, IoSession session, - Object message) throws Exception + public void messageSent(final NextFilter nextFilter, final IoSession session, + final Object message) throws Exception { - //ByteBufferUtil.acquireIfPossible( message ); - fireEvent(session, new Event(nextFilter, EventType.SENT, message)); + nextFilter.messageSent(session, message); } - public void filterWrite(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) throws Exception + public void filterWrite(final NextFilter nextFilter, final IoSession session, + final WriteRequest writeRequest) throws Exception { - fireEvent(session, new Event(nextFilter, EventType.WRITE, writeRequest)); + nextFilter.filterWrite(session, writeRequest); } //IoFilter methods that are processed on current thread (NOT on pooled thread) @@ -188,5 +191,51 @@ public class PoolingFilter extends IoFilterAdapter implements Job.JobCompletionH // when the reference count gets to zero we release the executor service _poolReference.releaseExecutorService(); } + + public static class AsynchReadPoolingFilter extends PoolingFilter + { + + public AsynchReadPoolingFilter(ReferenceCountingExecutorService refCountingPool, String name) + { + super(refCountingPool, name); + } + + public void messageReceived(final NextFilter nextFilter, final IoSession session, + final Object message) throws Exception + { + fireAsynchEvent(session, new Event.ReceivedEvent(nextFilter, message)); + } + + + } + + public static class AsynchWritePoolingFilter extends PoolingFilter + { + + public AsynchWritePoolingFilter(ReferenceCountingExecutorService refCountingPool, String name) + { + super(refCountingPool, name); + } + + + public void filterWrite(final NextFilter nextFilter, final IoSession session, + final WriteRequest writeRequest) throws Exception + { + fireAsynchEvent(session, new Event.WriteEvent(nextFilter, writeRequest)); + } + + } + + public static PoolingFilter createAynschReadPoolingFilter(ReferenceCountingExecutorService refCountingPool,String name) + { + return new AsynchReadPoolingFilter(refCountingPool,name); + } + + + public static PoolingFilter createAynschWritePoolingFilter(ReferenceCountingExecutorService refCountingPool,String name) + { + return new AsynchWritePoolingFilter(refCountingPool,name); + } + } diff --git a/java/common/src/main/java/org/apache/qpid/pool/ReadWriteThreadModel.java b/java/common/src/main/java/org/apache/qpid/pool/ReadWriteThreadModel.java index d4dbf1309a..c2f7f7ac48 100644 --- a/java/common/src/main/java/org/apache/qpid/pool/ReadWriteThreadModel.java +++ b/java/common/src/main/java/org/apache/qpid/pool/ReadWriteThreadModel.java @@ -26,15 +26,38 @@ import org.apache.mina.common.ThreadModel; public class ReadWriteThreadModel implements ThreadModel { + + private static final ReadWriteThreadModel _instance = new ReadWriteThreadModel(); + + private final PoolingFilter _asynchronousReadFilter; + private final PoolingFilter _asynchronousWriteFilter; + + private ReadWriteThreadModel() + { + final ReferenceCountingExecutorService executor = ReferenceCountingExecutorService.getInstance(); + _asynchronousReadFilter = PoolingFilter.createAynschReadPoolingFilter(executor, "AsynchronousReadFilter"); + _asynchronousWriteFilter = PoolingFilter.createAynschWritePoolingFilter(executor, "AsynchronousWriteFilter"); + } + + public PoolingFilter getAsynchronousReadFilter() + { + return _asynchronousReadFilter; + } + + public PoolingFilter getAsynchronousWriteFilter() + { + return _asynchronousWriteFilter; + } + public void buildFilterChain(IoFilterChain chain) throws Exception { - ReferenceCountingExecutorService executor = ReferenceCountingExecutorService.getInstance(); - PoolingFilter asyncRead = new PoolingFilter(executor, PoolingFilter.READ_EVENTS, - "AsynchronousReadFilter"); - PoolingFilter asyncWrite = new PoolingFilter(executor, PoolingFilter.WRITE_EVENTS, - "AsynchronousWriteFilter"); - - chain.addFirst("AsynchronousReadFilter", new ReferenceCountingIoFilter(asyncRead)); - chain.addLast("AsynchronousWriteFilter", new ReferenceCountingIoFilter(asyncWrite)); + + chain.addFirst("AsynchronousReadFilter", new ReferenceCountingIoFilter(_asynchronousReadFilter)); + chain.addLast("AsynchronousWriteFilter", new ReferenceCountingIoFilter(_asynchronousWriteFilter)); + } + + public static ReadWriteThreadModel getInstance() + { + return _instance; } } diff --git a/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java b/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java index 8ea0d6ef1a..523a24f278 100644 --- a/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java +++ b/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java @@ -20,6 +20,8 @@ */ package org.apache.qpid.protocol; +import org.apache.qpid.framing.AMQShortString; + import java.util.Map; import java.util.HashMap; @@ -27,14 +29,14 @@ public final class AMQConstant { private int _code; - private String _name; + private AMQShortString _name; private static Map _codeMap = new HashMap(); private AMQConstant(int code, String name, boolean map) { _code = code; - _name = name; + _name = new AMQShortString(name); if (map) { _codeMap.put(new Integer(code), this); @@ -51,7 +53,7 @@ public final class AMQConstant return _code; } - public String getName() + public AMQShortString getName() { return _name; } diff --git a/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java b/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java new file mode 100644 index 0000000000..a2d3de2f9e --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java @@ -0,0 +1,29 @@ +/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.protocol;
+
+import org.apache.qpid.framing.AMQDataBlock;
+import org.apache.qpid.framing.VersionSpecificRegistry;
+
+public interface AMQVersionAwareProtocolSession extends AMQProtocolWriter, ProtocolVersionAware
+{
+ public VersionSpecificRegistry getRegistry();
+}
diff --git a/java/common/src/main/java/org/apache/qpid/protocol/ProtocolVersionAware.java b/java/common/src/main/java/org/apache/qpid/protocol/ProtocolVersionAware.java new file mode 100644 index 0000000000..cbd8b900f3 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/protocol/ProtocolVersionAware.java @@ -0,0 +1,30 @@ +/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.protocol;
+
+public interface ProtocolVersionAware
+{
+ public byte getProtocolMinorVersion();
+
+ public byte getProtocolMajorVersion();
+
+ public boolean isProtocolVersionEqual(byte majorVersion, byte minorVersion);
+}
diff --git a/java/common/src/main/java/org/apache/qpid/url/AMQBindingURL.java b/java/common/src/main/java/org/apache/qpid/url/AMQBindingURL.java index b6a0bd500a..11e6652bd7 100644 --- a/java/common/src/main/java/org/apache/qpid/url/AMQBindingURL.java +++ b/java/common/src/main/java/org/apache/qpid/url/AMQBindingURL.java @@ -23,6 +23,7 @@ package org.apache.qpid.url; import org.apache.qpid.url.BindingURL; import org.apache.qpid.url.URLHelper; import org.apache.qpid.exchange.ExchangeDefaults; +import org.apache.qpid.framing.AMQShortString; import java.util.HashMap; import java.net.URI; @@ -31,10 +32,10 @@ import java.net.URISyntaxException; public class AMQBindingURL implements BindingURL { String _url; - String _exchangeClass; - String _exchangeName; - String _destinationName; - String _queueName; + AMQShortString _exchangeClass; + AMQShortString _exchangeName; + AMQShortString _destinationName; + AMQShortString _queueName; private HashMap<String, String> _options; @@ -84,7 +85,7 @@ public class AMQBindingURL implements BindingURL if (connection.getPath() == null || connection.getPath().equals("")) { - URLHelper.parseError(_url.indexOf(_exchangeName) + _exchangeName.length(), + URLHelper.parseError(_url.indexOf(_exchangeName.toString()) + _exchangeName.length(), "Destination or Queue requried", _url); } else @@ -92,7 +93,7 @@ public class AMQBindingURL implements BindingURL int slash = connection.getPath().indexOf("/", 1); if (slash == -1) { - URLHelper.parseError(_url.indexOf(_exchangeName) + _exchangeName.length(), + URLHelper.parseError(_url.indexOf(_exchangeName.toString()) + _exchangeName.length(), "Destination requried", _url); } else @@ -121,6 +122,26 @@ public class AMQBindingURL implements BindingURL } } + private void setExchangeClass(String exchangeClass) + { + setExchangeClass(new AMQShortString(exchangeClass)); + } + + private void setQueueName(String name) + { + setQueueName(new AMQShortString(name)); + } + + private void setDestinationName(String name) + { + setDestinationName(new AMQShortString(name)); + } + + private void setExchangeName(String exchangeName) + { + setExchangeName(new AMQShortString(exchangeName)); + } + private void processOptions() { //this is where we would parse any options that needed more than just storage. @@ -131,22 +152,22 @@ public class AMQBindingURL implements BindingURL return _url; } - public String getExchangeClass() + public AMQShortString getExchangeClass() { return _exchangeClass; } - public void setExchangeClass(String exchangeClass) + public void setExchangeClass(AMQShortString exchangeClass) { _exchangeClass = exchangeClass; } - public String getExchangeName() + public AMQShortString getExchangeName() { return _exchangeName; } - public void setExchangeName(String name) + public void setExchangeName(AMQShortString name) { _exchangeName = name; @@ -156,17 +177,17 @@ public class AMQBindingURL implements BindingURL } } - public String getDestinationName() + public AMQShortString getDestinationName() { return _destinationName; } - public void setDestinationName(String name) + public void setDestinationName(AMQShortString name) { _destinationName = name; } - public String getQueueName() + public AMQShortString getQueueName() { if (_exchangeClass.equals(ExchangeDefaults.TOPIC_EXCHANGE_CLASS)) { @@ -174,7 +195,7 @@ public class AMQBindingURL implements BindingURL { if (containsOption(BindingURL.OPTION_CLIENTID) && containsOption(BindingURL.OPTION_SUBSCRIPTION)) { - return getOption(BindingURL.OPTION_CLIENTID + ":" + BindingURL.OPTION_SUBSCRIPTION); + return new AMQShortString(getOption(BindingURL.OPTION_CLIENTID + ":" + BindingURL.OPTION_SUBSCRIPTION)); } else { @@ -192,7 +213,7 @@ public class AMQBindingURL implements BindingURL } } - public void setQueueName(String name) + public void setQueueName(AMQShortString name) { _queueName = name; } @@ -212,7 +233,7 @@ public class AMQBindingURL implements BindingURL return _options.containsKey(key); } - public String getRoutingKey() + public AMQShortString getRoutingKey() { if (_exchangeClass.equals(ExchangeDefaults.DIRECT_EXCHANGE_CLASS)) { @@ -221,15 +242,15 @@ public class AMQBindingURL implements BindingURL if (containsOption(BindingURL.OPTION_ROUTING_KEY)) { - return getOption(OPTION_ROUTING_KEY); + return new AMQShortString(getOption(OPTION_ROUTING_KEY)); } return getDestinationName(); } - public void setRoutingKey(String key) + public void setRoutingKey(AMQShortString key) { - setOption(OPTION_ROUTING_KEY, key); + setOption(OPTION_ROUTING_KEY, key.toString()); } diff --git a/java/common/src/main/java/org/apache/qpid/url/BindingURL.java b/java/common/src/main/java/org/apache/qpid/url/BindingURL.java index 76690b3230..86a8420d30 100644 --- a/java/common/src/main/java/org/apache/qpid/url/BindingURL.java +++ b/java/common/src/main/java/org/apache/qpid/url/BindingURL.java @@ -20,6 +20,8 @@ */ package org.apache.qpid.url; +import org.apache.qpid.framing.AMQShortString; + /* Binding URL format: <exch_class>://<exch_name>/[<destination>]/[<queue>]?<option>='<value>'[,<option>='<value>']* @@ -36,21 +38,21 @@ public interface BindingURL String getURL(); - String getExchangeClass(); + AMQShortString getExchangeClass(); - void setExchangeClass(String exchangeClass); + void setExchangeClass(AMQShortString name); - String getExchangeName(); + AMQShortString getExchangeName(); - void setExchangeName(String name); + void setExchangeName(AMQShortString name); - String getDestinationName(); + AMQShortString getDestinationName(); - void setDestinationName(String name); + void setDestinationName(AMQShortString name); - String getQueueName(); + AMQShortString getQueueName(); - void setQueueName(String name); + void setQueueName(AMQShortString name); String getOption(String key); @@ -58,9 +60,9 @@ public interface BindingURL boolean containsOption(String key); - String getRoutingKey(); + AMQShortString getRoutingKey(); - void setRoutingKey(String key); + void setRoutingKey(AMQShortString key); String toString(); } diff --git a/java/common/src/main/java/org/apache/qpid/util/concurrent/BooleanLatch.java b/java/common/src/main/java/org/apache/qpid/util/concurrent/BooleanLatch.java new file mode 100644 index 0000000000..70a5f2dc5e --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/util/concurrent/BooleanLatch.java @@ -0,0 +1,42 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.util.concurrent; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +/** + * @author Apache Software Foundation + */ +public class BooleanLatch extends CountDownLatch +{ + public BooleanLatch() + { + super(1); + } + + public void signal() + { + countDown(); + } + + public void await(long nanos) throws InterruptedException + { + await(nanos, TimeUnit.NANOSECONDS); + } +} diff --git a/java/common/src/test/java/org/apache/qpid/framing/JMSPropertyFieldTableTest.java b/java/common/src/test/java/org/apache/qpid/framing/JMSPropertyFieldTableTest.java deleted file mode 100644 index 94c97ef808..0000000000 --- a/java/common/src/test/java/org/apache/qpid/framing/JMSPropertyFieldTableTest.java +++ /dev/null @@ -1,1016 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - * - */ -package org.apache.qpid.framing; - -import junit.framework.Assert; -import junit.framework.TestCase; - -import java.util.Enumeration; - -import org.apache.log4j.Logger; - -import javax.jms.JMSException; -import javax.jms.MessageFormatException; - -public class JMSPropertyFieldTableTest extends TestCase -{ - - private static final Logger _logger = Logger.getLogger(JMSPropertyFieldTableTest.class); - - - public void setUp() - { - System.getProperties().setProperty("strict-jms", "true"); - } - - public void tearDown() - { - System.getProperties().remove("strict-jms"); - } - - /** - * Test that setting a similar named value replaces any previous value set on that name - */ - public void testReplacement() throws JMSException - { - JMSPropertyFieldTable table1 = new JMSPropertyFieldTable(new FieldTable()); - //Set a boolean value - table1.setBoolean("value", true); - - // reset value to an integer - table1.setInteger("value", Integer.MAX_VALUE); - - //Check boolean value is null - try - { - table1.getBoolean("value"); - } - catch (MessageFormatException mfe) - { - //normal execution - } - // ... and integer value is good - Assert.assertEquals(Integer.MAX_VALUE, table1.getInteger("value")); - } - - public void testRemoval() throws JMSException - { - JMSPropertyFieldTable table1 = new JMSPropertyFieldTable(new FieldTable()); - //Set a boolean value - table1.setBoolean("value", true); - - Assert.assertTrue(table1.getBoolean("value")); - - table1.remove("value"); - - //Check boolean value is null - try - { - table1.getBoolean("value"); - } - catch (MessageFormatException mfe) - { - //normal execution - } - } - - - /** - * Set a boolean and check that we can only get it back as a boolean and a string - * Check that attempting to lookup a non existent value returns null - */ - public void testBoolean() throws JMSException - { - JMSPropertyFieldTable table1 = new JMSPropertyFieldTable(new FieldTable()); - table1.setBoolean("value", true); - Assert.assertTrue(table1.propertyExists("value")); - - //Test Getting right value back - Assert.assertEquals(true, table1.getBoolean("value")); - - //Check we don't get anything back for other gets - try - { - table1.getByte("value"); - fail("Should throw MessageFormatException"); - } - catch (MessageFormatException mfs) - { - //normal Execution - } - - try - { - table1.getByte("value"); - fail("Should throw MessageFormatException"); - } - catch (MessageFormatException mfs) - { - //normal Execution - } - - try - { - table1.getShort("value"); - fail("Should throw MessageFormatException"); - } - catch (MessageFormatException mfs) - { - //normal Execution - } - - try - { - table1.getDouble("value"); - fail("Should throw MessageFormatException"); - } - catch (MessageFormatException mfs) - { - //normal Execution - } - - try - { - table1.getFloat("value"); - fail("Should throw MessageFormatException"); - } - catch (MessageFormatException mfs) - { - //normal Execution - } - - try - { - table1.getInteger("value"); - fail("Should throw MessageFormatException"); - } - catch (MessageFormatException mfs) - { - //normal Execution - } - - try - { - table1.getLong("value"); - fail("Should throw MessageFormatException"); - } - catch (MessageFormatException mfs) - { - //normal Execution - } - - //except value as a string - Assert.assertEquals("true", table1.getString("value")); - - table1.remove("value"); - //but after a remove it doesn't - Assert.assertFalse(table1.propertyExists("value")); - - // Table should now have zero length for encoding - checkEmpty(table1); - - //Looking up an invalid value will return false - Assert.assertFalse(table1.getBoolean("Rubbish")); - } - - /** - * Set a byte and check that we can only get it back as a byte and a string - * Check that attempting to lookup a non existent value returns null - */ - public void testByte() throws JMSException - { - JMSPropertyFieldTable table1 = new JMSPropertyFieldTable(new FieldTable()); - table1.setByte("value", Byte.MAX_VALUE); - Assert.assertTrue(table1.propertyExists("value")); - - //Tets lookups we shouldn't get anything back for other gets - //we should get right value back for this type .... - try - { - table1.getBoolean("value"); - fail("Should throw MessageFormatException"); - } - catch (MessageFormatException mfs) - { - //normal Execution - } - - try - { - table1.getDouble("value"); - fail("Should throw MessageFormatException"); - } - catch (MessageFormatException mfs) - { - //normal Execution - } - try - { - table1.getFloat("value"); - fail("Should throw MessageFormatException"); - } - catch (MessageFormatException mfs) - { - //normal Execution - } - - Assert.assertEquals(Byte.MAX_VALUE, (byte) table1.getShort("value")); - Assert.assertEquals(Byte.MAX_VALUE, (byte) table1.getInteger("value")); - Assert.assertEquals(Byte.MAX_VALUE, (byte) table1.getLong("value")); - Assert.assertEquals(Byte.MAX_VALUE, table1.getByte("value")); - //... and a the string value of it. - Assert.assertEquals("" + Byte.MAX_VALUE, table1.getString("value")); - - table1.remove("value"); - //but after a remove it doesn't - Assert.assertFalse(table1.propertyExists("value")); - - // Table should now have zero length for encoding - checkEmpty(table1); - - //Looking up an invalid value returns null - try - { - table1.getByte("Rubbish"); - fail("Should throw NumberFormatException"); - } - catch (NumberFormatException mfs) - { - //normal Execution - } - - } - - - /** - * Set a short and check that we can only get it back as a short and a string - * Check that attempting to lookup a non existent value returns null - */ - public void testShort() throws JMSException - { - JMSPropertyFieldTable table1 = new JMSPropertyFieldTable(new FieldTable()); - table1.setShort("value", Short.MAX_VALUE); - Assert.assertTrue(table1.propertyExists("value")); - - //Tets lookups we shouldn't get anything back for other gets - //we should get right value back for this type .... - - try - { - table1.getBoolean("value"); - fail("Should throw MessageFormatException"); - } - catch (MessageFormatException mfs) - { - //normal Execution - } - try - { - table1.getByte("value"); - fail("Should throw MessageFormatException"); - } - catch (MessageFormatException mfs) - { - //normal Execution - } - - try - { - table1.getDouble("value"); - fail("Should throw MessageFormatException"); - } - catch (MessageFormatException mfs) - { - //normal Execution - } - try - { - table1.getFloat("value"); - fail("Should throw MessageFormatException"); - } - catch (MessageFormatException mfs) - { - //normal Execution - } - - - Assert.assertEquals(Short.MAX_VALUE, (short) table1.getLong("value")); - Assert.assertEquals(Short.MAX_VALUE, (short) table1.getInteger("value")); - Assert.assertEquals(Short.MAX_VALUE, table1.getShort("value")); - - //... and a the string value of it. - Assert.assertEquals("" + Short.MAX_VALUE, table1.getString("value")); - - table1.remove("value"); - //but after a remove it doesn't - Assert.assertFalse(table1.propertyExists("value")); - - // Table should now have zero length for encoding - checkEmpty(table1); - - //Looking up an invalid value returns null - try - { - table1.getShort("Rubbish"); - fail("Should throw NumberFormatException"); - } - catch (NumberFormatException mfe) - { - //normal path - } - } - - - /** - * Set a double and check that we can only get it back as a double - * Check that attempting to lookup a non existent value returns null - */ - public void testDouble() throws JMSException - { - JMSPropertyFieldTable table1 = new JMSPropertyFieldTable(new FieldTable()); - table1.setDouble("value", Double.MAX_VALUE); - Assert.assertTrue(table1.propertyExists("value")); - - //Tets lookups we shouldn't get anything back for other gets - //we should get right value back for this type .... - try - { - table1.getBoolean("value"); - fail("Should throw MessageFormatException"); - } - catch (MessageFormatException mfs) - { - //normal Execution - } - try - { - table1.getByte("value"); - fail("Should throw MessageFormatException"); - } - catch (MessageFormatException mfs) - { - //normal Execution - } - try - { - table1.getShort("value"); - fail("Should throw MessageFormatException"); - } - catch (MessageFormatException mfs) - { - //normal Execution - } - try - { - table1.getFloat("value"); - fail("Should throw MessageFormatException"); - } - catch (MessageFormatException mfs) - { - //normal Execution - } - try - { - table1.getInteger("value"); - fail("Should throw MessageFormatException"); - } - catch (MessageFormatException mfs) - { - //normal Execution - } - try - { - table1.getLong("value"); - fail("Should throw MessageFormatException"); - } - catch (MessageFormatException mfs) - { - //normal Execution - } - - Assert.assertEquals(Double.MAX_VALUE, table1.getDouble("value")); - //... and a the string value of it. - Assert.assertEquals("" + Double.MAX_VALUE, table1.getString("value")); - - table1.remove("value"); - //but after a remove it doesn't - Assert.assertFalse(table1.propertyExists("value")); - - // Table should now have zero length for encoding - checkEmpty(table1); - - //Looking up an invalid value returns null - try - { - table1.getDouble("Rubbish"); - fail("Should throw NullPointerException as float.valueOf will try sunreadJavaFormatString"); - } - catch (NullPointerException mfe) - { - //normal path - } - - } - - - /** - * Set a float and check that we can only get it back as a float - * Check that attempting to lookup a non existent value returns null - */ - public void testFloat() throws JMSException - { - JMSPropertyFieldTable table1 = new JMSPropertyFieldTable(new FieldTable()); - table1.setFloat("value", Float.MAX_VALUE); - Assert.assertTrue(table1.propertyExists("value")); - - //Tets lookups we shouldn't get anything back for other gets - //we should get right value back for this type .... - try - { - table1.getBoolean("value"); - fail("Should throw MessageFormatException"); - } - catch (MessageFormatException mfs) - { - //normal Execution - } - try - { - table1.getByte("value"); - fail("Should throw MessageFormatException"); - } - catch (MessageFormatException mfs) - { - //normal Execution - } - try - { - table1.getShort("value"); - fail("Should throw MessageFormatException"); - } - catch (MessageFormatException mfs) - { - //normal Execution - } - try - { - table1.getInteger("value"); - fail("Should throw MessageFormatException"); - } - catch (MessageFormatException mfs) - { - //normal Execution - } - try - { - table1.getLong("value"); - fail("Should throw MessageFormatException"); - } - catch (MessageFormatException mfs) - { - //normal Execution - } - - - Assert.assertEquals(Float.MAX_VALUE, table1.getFloat("value")); - Assert.assertEquals(Float.MAX_VALUE, (float) table1.getDouble("value")); - - //... and a the string value of it. - Assert.assertEquals("" + Float.MAX_VALUE, table1.getString("value")); - - table1.remove("value"); - //but after a remove it doesn't - Assert.assertFalse(table1.propertyExists("value")); - - // Table should now have zero length for encoding - checkEmpty(table1); - - //Looking up an invalid value returns null - try - { - table1.getFloat("Rubbish"); - fail("Should throw NullPointerException as float.valueOf will try sunreadJavaFormatString"); - } - catch (NullPointerException mfe) - { - //normal path - } - } - - - /** - * Set an int and check that we can only get it back as an int - * Check that attempting to lookup a non existent value returns null - */ - public void testInt() throws JMSException - { - JMSPropertyFieldTable table1 = new JMSPropertyFieldTable(new FieldTable()); - table1.setInteger("value", Integer.MAX_VALUE); - Assert.assertTrue(table1.propertyExists("value")); - - //Tets lookups we shouldn't get anything back for other gets - //we should get right value back for this type .... - try - { - table1.getBoolean("value"); - fail("Should throw MessageFormatException"); - } - catch (MessageFormatException mfs) - { - //normal Execution - } - try - { - table1.getByte("value"); - fail("Should throw MessageFormatException"); - } - catch (MessageFormatException mfs) - { - //normal Execution - } - try - { - table1.getShort("value"); - fail("Should throw MessageFormatException"); - } - catch (MessageFormatException mfs) - { - //normal Execution - } - try - { - table1.getDouble("value"); - fail("Should throw MessageFormatException"); - } - catch (MessageFormatException mfs) - { - //normal Execution - } - try - { - table1.getFloat("value"); - fail("Should throw MessageFormatException"); - } - catch (MessageFormatException mfs) - { - //normal Execution - } - - - Assert.assertEquals(Integer.MAX_VALUE, table1.getLong("value")); - - Assert.assertEquals(Integer.MAX_VALUE, table1.getInteger("value")); - - //... and a the string value of it. - Assert.assertEquals("" + Integer.MAX_VALUE, table1.getString("value")); - - table1.remove("value"); - //but after a remove it doesn't - Assert.assertFalse(table1.propertyExists("value")); - - // Table should now have zero length for encoding - checkEmpty(table1); - - //Looking up an invalid value returns null - try - { - table1.getInteger("Rubbish"); - fail("Should throw NumberFormatException"); - } - catch (NumberFormatException mfe) - { - //normal path - } - } - - - /** - * Set a long and check that we can only get it back as a long - * Check that attempting to lookup a non existent value returns null - */ - public void testLong() throws JMSException - { - JMSPropertyFieldTable table1 = new JMSPropertyFieldTable(new FieldTable()); - table1.setLong("value", Long.MAX_VALUE); - Assert.assertTrue(table1.propertyExists("value")); - - //Tets lookups we shouldn't get anything back for other gets - //we should get right value back for this type .... - try - { - table1.getBoolean("value"); - fail("Should throw MessageFormatException"); - } - catch (MessageFormatException mfs) - { - //normal Execution - } - try - { - table1.getByte("value"); - fail("Should throw MessageFormatException"); - } - catch (MessageFormatException mfs) - { - //normal Execution - } - try - { - table1.getShort("value"); - fail("Should throw MessageFormatException"); - } - catch (MessageFormatException mfs) - { - //normal Execution - } - try - { - table1.getDouble("value"); - fail("Should throw MessageFormatException"); - } - catch (MessageFormatException mfs) - { - //normal Execution - } - try - { - table1.getFloat("value"); - fail("Should throw MessageFormatException"); - } - catch (MessageFormatException mfs) - { - //normal Execution - } - try - { - table1.getInteger("value"); - fail("Should throw MessageFormatException"); - } - catch (MessageFormatException mfs) - { - //normal Execution - } - - - Assert.assertEquals(Long.MAX_VALUE, table1.getLong("value")); - - //... and a the string value of it. - Assert.assertEquals("" + Long.MAX_VALUE, table1.getString("value")); - - table1.remove("value"); - //but after a remove it doesn't - Assert.assertFalse(table1.propertyExists("value")); - - // Table should now have zero length for encoding - checkEmpty(table1); - - //Looking up an invalid value - try - { - table1.getLong("Rubbish"); - fail("Should throw NumberFormatException"); - } - catch (NumberFormatException mfs) - { - //normal Execution - } - - } - - - /** - * Calls all methods that can be used to check the table is empty - * - getEncodedSize - * - isEmpty - * - length - * - * @param table to check is empty - */ - private void checkEmpty(JMSPropertyFieldTable table) - { - Assert.assertFalse(table.getPropertyNames().hasMoreElements()); - } - - - /** - * Set a String and check that we can only get it back as a String - * Check that attempting to lookup a non existent value returns null - */ - public void testString() throws JMSException - { - JMSPropertyFieldTable table1 = new JMSPropertyFieldTable(new FieldTable()); - table1.setString("value", "Hello"); - Assert.assertTrue(table1.propertyExists("value")); - - //Tets lookups we shouldn't get anything back for other gets - //we should get right value back for this type .... - Assert.assertEquals(false, table1.getBoolean("value")); - - try - { - table1.getByte("value"); - fail("Should throw NumberFormatException"); - } - catch (NumberFormatException nfs) - { - //normal Execution - } - try - { - table1.getShort("value"); - fail("Should throw NumberFormatException"); - } - catch (NumberFormatException nfs) - { - //normal Execution - } - try - { - table1.getDouble("value"); - fail("Should throw NumberFormatException"); - } - catch (NumberFormatException nfs) - { - //normal Execution - } - try - { - table1.getFloat("value"); - fail("Should throw NumberFormatException"); - } - catch (NumberFormatException nfs) - { - //normal Execution - } - try - { - table1.getInteger("value"); - fail("Should throw NumberFormatException"); - } - catch (NumberFormatException nfs) - { - //normal Execution - } - try - { - table1.getLong("value"); - fail("Should throw NumberFormatException"); - } - catch (NumberFormatException nfs) - { - //normal Execution - } - - Assert.assertEquals("Hello", table1.getString("value")); - - table1.remove("value"); - //but after a remove it doesn't - Assert.assertFalse(table1.propertyExists("value")); - - checkEmpty(table1); - - //Looking up an invalid value returns null - Assert.assertEquals(null, table1.getString("Rubbish")); - - //Additional Test that haven't been covered for string - table1.setObject("value", "Hello"); - //Check that it was set correctly - Assert.assertEquals("Hello", table1.getString("value")); - } - - - public void testValues() throws JMSException - { - JMSPropertyFieldTable table = new JMSPropertyFieldTable(new FieldTable()); - table.setBoolean("bool", true); - table.setDouble("double", Double.MAX_VALUE); - table.setFloat("float", Float.MAX_VALUE); - table.setInteger("int", Integer.MAX_VALUE); - table.setLong("long", Long.MAX_VALUE); - table.setShort("short", Short.MAX_VALUE); - table.setString("string", "Hello"); - table.setString("nullstring", null); - - table.setObject("objectbool", true); - table.setObject("objectdouble", Double.MAX_VALUE); - table.setObject("objectfloat", Float.MAX_VALUE); - table.setObject("objectint", Integer.MAX_VALUE); - table.setObject("objectlong", Long.MAX_VALUE); - table.setObject("objectshort", Short.MAX_VALUE); - table.setObject("objectstring", "Hello"); - - - Assert.assertEquals(true, table.getBoolean("bool")); - - Assert.assertEquals(Double.MAX_VALUE, table.getDouble("double")); - Assert.assertEquals(Float.MAX_VALUE, table.getFloat("float")); - Assert.assertEquals(Integer.MAX_VALUE, table.getInteger("int")); - Assert.assertEquals(Long.MAX_VALUE, table.getLong("long")); - Assert.assertEquals(Short.MAX_VALUE, table.getShort("short")); - Assert.assertEquals("Hello", table.getString("string")); - Assert.assertEquals(null, table.getString("null-string")); - - Assert.assertEquals(true, table.getObject("objectbool")); - Assert.assertEquals(Double.MAX_VALUE, table.getObject("objectdouble")); - Assert.assertEquals(Float.MAX_VALUE, table.getObject("objectfloat")); - Assert.assertEquals(Integer.MAX_VALUE, table.getObject("objectint")); - Assert.assertEquals(Long.MAX_VALUE, table.getObject("objectlong")); - Assert.assertEquals(Short.MAX_VALUE, table.getObject("objectshort")); - Assert.assertEquals("Hello", table.getObject("objectstring")); - } - - /** - * Additional test checkPropertyName doesn't accept Null - */ - public void testCheckPropertyNameasNull() throws JMSException - { - JMSPropertyFieldTable table = new JMSPropertyFieldTable(new FieldTable()); - - try - { - table.setObject(null, "String"); - fail("Null property name is not allowed"); - } - catch (IllegalArgumentException iae) - { - //normal path - } - checkEmpty(table); - } - - - /** - * Additional test checkPropertyName doesn't accept an empty String - */ - public void testCheckPropertyNameasEmptyString() throws JMSException - { - JMSPropertyFieldTable table = new JMSPropertyFieldTable(new FieldTable()); - - try - { - table.setObject("", "String"); - fail("empty property name is not allowed"); - } - catch (IllegalArgumentException iae) - { - //normal path - } - checkEmpty(table); - } - - - /** - * Additional test checkPropertyName doesn't accept an empty String - */ - public void testCheckPropertyNamehasMaxLength() throws JMSException - { - JMSPropertyFieldTable table = new JMSPropertyFieldTable(new FieldTable()); - - StringBuffer longPropertyName = new StringBuffer(129); - - for (int i = 0; i < 129; i++) - { - longPropertyName.append("x"); - } - - try - { - table.setObject(longPropertyName.toString(), "String"); - fail("property name must be < 128 characters"); - } - catch (IllegalArgumentException iae) - { - _logger.warn("JMS requires infinite property names AMQP limits us to 128 characters"); - } - - checkEmpty(table); - } - - - /** - * Additional test checkPropertyName starts with a letter - */ - public void testCheckPropertyNameStartCharacterIsLetter() throws JMSException - { - JMSPropertyFieldTable table = new JMSPropertyFieldTable(new FieldTable()); - - //Try a name that starts with a number - try - { - table.setObject("1", "String"); - fail("property name must start with a letter"); - } - catch (IllegalArgumentException iae) - { - //normal path - } - - checkEmpty(table); - } - - /** - * Additional test checkPropertyName starts with a letter - */ - public void testCheckPropertyNameContainsInvalidCharacter() throws JMSException - { - JMSPropertyFieldTable table = new JMSPropertyFieldTable(new FieldTable()); - - //Try a name that starts with a number - try - { - table.setObject("hello there", "String"); - fail("property name cannot contain spaces"); - } - catch (IllegalArgumentException iae) - { - //normal path - } - - checkEmpty(table); - } - - - /** - * Additional test checkPropertyName starts with a letter - */ - public void testCheckPropertyNameIsInvalid() throws JMSException - { - JMSPropertyFieldTable table = new JMSPropertyFieldTable(new FieldTable()); - - //Try a name that starts with a number - try - { - table.setObject("ESCAPE", "String"); - fail("property name must not contains spaces"); - } - catch (IllegalArgumentException iae) - { - //normal path - } - - checkEmpty(table); - } - - /** - * Additional test checkPropertyName starts with a hash or a dollar - */ - public void testCheckPropertyNameStartCharacterIsHashorDollar() throws JMSException - { - _logger.warn("Test:testCheckPropertyNameStartCharacterIsHashorDollar will fail JMS compilance as # and $ are not valid in a jms identifier"); -// JMSPropertyFieldTable table = new JMSPropertyFieldTable(); -// -// //Try a name that starts with a number -// try -// { -// table.setObject("#", "String"); -// table.setObject("$", "String"); -// } -// catch (IllegalArgumentException iae) -// { -// fail("property name are allowed to start with # and $s in AMQP"); -// } - } - - /** - * Test the contents of the sets - */ - public void testSets() - { - - JMSPropertyFieldTable table = new JMSPropertyFieldTable(new FieldTable()); - - table.put("n1", "1"); - table.put("n2", "2"); - table.put("n3", "3"); - - Enumeration enumerator = table.getPropertyNames(); - Assert.assertEquals("n1", enumerator.nextElement()); - Assert.assertEquals("n2", enumerator.nextElement()); - Assert.assertEquals("n3", enumerator.nextElement()); - Assert.assertFalse(enumerator.hasMoreElements()); - } - - public static junit.framework.Test suite() - { - return new junit.framework.TestSuite(JMSPropertyFieldTableTest.class); - } - -} diff --git a/java/common/src/test/java/org/apache/qpid/framing/PropertyFieldTableTest.java b/java/common/src/test/java/org/apache/qpid/framing/PropertyFieldTableTest.java index c259d3ee8a..6160dc1843 100644 --- a/java/common/src/test/java/org/apache/qpid/framing/PropertyFieldTableTest.java +++ b/java/common/src/test/java/org/apache/qpid/framing/PropertyFieldTableTest.java @@ -25,8 +25,6 @@ import junit.framework.TestCase; import java.util.Enumeration; import java.util.Iterator; -import java.util.Map; -import java.util.HashMap; import org.apache.mina.common.ByteBuffer; import org.apache.log4j.Logger; @@ -227,7 +225,7 @@ public class PropertyFieldTableTest extends TestCase //... and a the string value of it. Assert.assertEquals("" + Double.MAX_VALUE, table1.getString("value")); table1.remove("value"); - //but after a remove it doesn't + //but after a removeKey it doesn't Assert.assertFalse(table1.containsKey("value")); // Table should now have zero length for encoding @@ -265,7 +263,7 @@ public class PropertyFieldTableTest extends TestCase table1.remove("value"); - //but after a remove it doesn't + //but after a removeKey it doesn't Assert.assertFalse(table1.containsKey("value")); // Table should now have zero length for encoding @@ -303,7 +301,7 @@ public class PropertyFieldTableTest extends TestCase table1.remove("value"); - //but after a remove it doesn't + //but after a removeKey it doesn't Assert.assertFalse(table1.containsKey("value")); // Table should now have zero length for encoding @@ -341,7 +339,7 @@ public class PropertyFieldTableTest extends TestCase table1.remove("value"); - //but after a remove it doesn't + //but after a removeKey it doesn't Assert.assertFalse(table1.containsKey("value")); // Table should now have zero length for encoding @@ -380,7 +378,7 @@ public class PropertyFieldTableTest extends TestCase Assert.assertEquals(null, table1.getString("value")); table1.remove("value"); - //but after a remove it doesn't + //but after a removeKey it doesn't Assert.assertFalse(table1.containsKey("value")); // Table should now have zero length for encoding @@ -440,7 +438,7 @@ public class PropertyFieldTableTest extends TestCase Assert.assertTrue(table1.containsKey("value")); table1.remove("value"); - //but after a remove it doesn't + //but after a removeKey it doesn't Assert.assertFalse(table1.containsKey("value")); checkEmpty(table1); @@ -457,23 +455,7 @@ public class PropertyFieldTableTest extends TestCase - public void testKeyEnumeration() - { - FieldTable table = new FieldTable(); - table.setLong("one", 1L); - table.setLong("two", 2L); - table.setLong("three", 3L); - table.setLong("four", 4L); - table.setLong("five", 5L); - - Enumeration e = table.getPropertyNames(); - - Assert.assertTrue("one".equals(e.nextElement())); - Assert.assertTrue("two".equals(e.nextElement())); - Assert.assertTrue("three".equals(e.nextElement())); - Assert.assertTrue("four".equals(e.nextElement())); - Assert.assertTrue("five".equals(e.nextElement())); - } + public void testValues() { @@ -546,8 +528,7 @@ public class PropertyFieldTableTest extends TestCase table.setString("string", "hello"); table.setString("null-string", null); - - final ByteBuffer buffer = ByteBuffer.allocate((int) table.getEncodedSize()); // FIXME XXX: Is cast a problem? + final ByteBuffer buffer = ByteBuffer.allocate((int) table.getEncodedSize() + 4); // FIXME XXX: Is cast a problem? table.writeToBuffer(buffer); @@ -597,7 +578,7 @@ public class PropertyFieldTableTest extends TestCase byte[] _bytes = {99, 98, 97, 96, 95}; result.setBytes("bytes", _bytes); - size += 1 + EncodingUtils.encodedShortStringLength("bytes") + 1 + EncodingUtils.encodedByteLength() * _bytes.length; + size += 1 + EncodingUtils.encodedShortStringLength("bytes") + 4 + _bytes.length; Assert.assertEquals(size, result.getEncodedSize()); result.setChar("char", (char) 'c'); @@ -639,7 +620,7 @@ public class PropertyFieldTableTest extends TestCase Assert.assertEquals(size, result.getEncodedSize()); result.setObject("object-bytes", _bytes); - size += 1 + EncodingUtils.encodedShortStringLength("object-bytes") + 1 + EncodingUtils.encodedByteLength() * _bytes.length; + size += 1 + EncodingUtils.encodedShortStringLength("object-bytes") + 4 + _bytes.length; Assert.assertEquals(size, result.getEncodedSize()); result.setObject("object-char", 'c'); @@ -758,7 +739,7 @@ public class PropertyFieldTableTest extends TestCase try { - table.setObject(null, "String"); + table.setObject((String)null, "String"); fail("Null property name is not allowed"); } catch (IllegalArgumentException iae) @@ -868,9 +849,9 @@ public class PropertyFieldTableTest extends TestCase { FieldTable table = new FieldTable(); - table.put("StringProperty", "String"); + table.setObject("StringProperty", "String"); - Assert.assertEquals("String", table.get("StringProperty")); + Assert.assertEquals("String", table.getString("StringProperty")); //Test Clear @@ -887,15 +868,15 @@ public class PropertyFieldTableTest extends TestCase FieldTable table = new FieldTable(); - table.put("n1", "1"); - table.put("n2", "2"); - table.put("n3", "3"); + table.setObject("n1", "1"); + table.setObject("n2", "2"); + table.setObject("n3", "3"); + + + Assert.assertEquals("1", table.getObject("n1")); + Assert.assertEquals("2", table.getObject("n2")); + Assert.assertEquals("3", table.getObject("n3")); - Iterator iterator = table.keySet().iterator(); - Assert.assertEquals("n1", iterator.next()); - Assert.assertEquals("n2", iterator.next()); - Assert.assertEquals("n3", iterator.next()); - Assert.assertFalse(iterator.hasNext()); diff --git a/java/common/src/test/java/org/apache/qpid/pool/PoolingFilterTest.java b/java/common/src/test/java/org/apache/qpid/pool/PoolingFilterTest.java index 972a935257..6383d52298 100644 --- a/java/common/src/test/java/org/apache/qpid/pool/PoolingFilterTest.java +++ b/java/common/src/test/java/org/apache/qpid/pool/PoolingFilterTest.java @@ -36,25 +36,32 @@ public class PoolingFilterTest extends TestCase public void setUp() { + //Create Pool _executorService = ReferenceCountingExecutorService.getInstance(); _executorService.acquireExecutorService(); - _pool = new PoolingFilter(_executorService, PoolingFilter.WRITE_EVENTS, + _pool = PoolingFilter.createAynschWritePoolingFilter(_executorService, "AsynchronousWriteFilter"); } public void testRejectedExecution() throws Exception { - _pool.filterWrite(new NoOpFilter(), new TestSession(), new IoFilter.WriteRequest("Message")); + + TestSession testSession = new TestSession(); + _pool.createNewJobForSession(testSession); + _pool.filterWrite(new NoOpFilter(), testSession, new IoFilter.WriteRequest("Message")); //Shutdown the pool _executorService.getPool().shutdownNow(); try { + + testSession = new TestSession(); + _pool.createNewJobForSession(testSession); //prior to fix for QPID-172 this would throw RejectedExecutionException - _pool.filterWrite(null, new TestSession(), null); + _pool.filterWrite(null, testSession, null); } catch (RejectedExecutionException rje) { diff --git a/java/common/src/test/java/org/apache/qpid/session/TestSession.java b/java/common/src/test/java/org/apache/qpid/session/TestSession.java index f10d55e9d0..aafc91b03b 100644 --- a/java/common/src/test/java/org/apache/qpid/session/TestSession.java +++ b/java/common/src/test/java/org/apache/qpid/session/TestSession.java @@ -24,9 +24,13 @@ import org.apache.mina.common.*; import java.net.SocketAddress; import java.util.Set; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ConcurrentHashMap; public class TestSession implements IoSession { + private final ConcurrentMap attributes = new ConcurrentHashMap(); + public TestSession() { } @@ -68,42 +72,42 @@ public class TestSession implements IoSession public Object getAttachment() { - return null; //TODO + return getAttribute(""); } public Object setAttachment(Object attachment) { - return null; //TODO + return setAttribute("",attachment); } public Object getAttribute(String key) { - return null; //TODO + return attributes.get(key); } public Object setAttribute(String key, Object value) { - return null; //TODO + return attributes.put(key,value); } public Object setAttribute(String key) { - return null; //TODO + return attributes.put(key, Boolean.TRUE); } public Object removeAttribute(String key) { - return null; //TODO + return attributes.remove(key); } public boolean containsAttribute(String key) { - return false; //TODO + return attributes.containsKey(key); } public Set getAttributeKeys() { - return null; //TODO + return attributes.keySet(); } public TransportType getTransportType() |
