summaryrefslogtreecommitdiff
path: root/java/common/src
diff options
context:
space:
mode:
authorKim van der Riet <kpvdr@apache.org>2007-02-14 20:02:03 +0000
committerKim van der Riet <kpvdr@apache.org>2007-02-14 20:02:03 +0000
commita22f3f594d6eee7d610fb4f140e18cddd7c880f6 (patch)
tree5adb376ed217d2debaff1c0bdd59af1a1c93e829 /java/common/src
parent9cb1922884c5b258c961046e6fd48e5152aa79d5 (diff)
downloadqpid-python-a22f3f594d6eee7d610fb4f140e18cddd7c880f6.tar.gz
First backmerge from trunk to 0-9 branch for Java. Not all java tests passing yet
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@507672 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/common/src')
-rw-r--r--java/common/src/main/java/org/apache/qpid/AMQChannelException.java3
-rw-r--r--java/common/src/main/java/org/apache/qpid/AMQConnectionException.java5
-rw-r--r--java/common/src/main/java/org/apache/qpid/AMQConnectionFailureException.java20
-rw-r--r--java/common/src/main/java/org/apache/qpid/AMQTimeoutException.java29
-rw-r--r--java/common/src/main/java/org/apache/qpid/AMQUnknownExchangeType.java20
-rw-r--r--java/common/src/main/java/org/apache/qpid/common/AMQPFilterTypes.java8
-rw-r--r--java/common/src/main/java/org/apache/qpid/exchange/ExchangeDefaults.java23
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/AMQBody.java2
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java89
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockEncoder.java9
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java51
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java5
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java23
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyInstanceFactory.java30
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/AMQRequestBody.java30
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/AMQRequestBodyFactory.java41
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/AMQResponseBody.java27
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/AMQResponseBodyFactory.java41
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java374
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/AMQType.java6
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java25
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/BodyFactory.java3
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java461
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/FieldTable.java527
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java16
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/HeartbeatBodyFactory.java2
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/JMSPropertyFieldTable.java453
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java10
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/RequestManager.java4
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java11
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/SmallCompositeAMQDataBlock.java97
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/VersionSpecificRegistry.java141
-rw-r--r--java/common/src/main/java/org/apache/qpid/pool/Event.java98
-rw-r--r--java/common/src/main/java/org/apache/qpid/pool/Job.java34
-rw-r--r--java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java157
-rw-r--r--java/common/src/main/java/org/apache/qpid/pool/ReadWriteThreadModel.java39
-rw-r--r--java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java8
-rw-r--r--java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java29
-rw-r--r--java/common/src/main/java/org/apache/qpid/protocol/ProtocolVersionAware.java30
-rw-r--r--java/common/src/main/java/org/apache/qpid/url/AMQBindingURL.java59
-rw-r--r--java/common/src/main/java/org/apache/qpid/url/BindingURL.java22
-rw-r--r--java/common/src/main/java/org/apache/qpid/util/concurrent/BooleanLatch.java42
-rw-r--r--java/common/src/test/java/org/apache/qpid/framing/JMSPropertyFieldTableTest.java1016
-rw-r--r--java/common/src/test/java/org/apache/qpid/framing/PropertyFieldTableTest.java61
-rw-r--r--java/common/src/test/java/org/apache/qpid/pool/PoolingFilterTest.java13
-rw-r--r--java/common/src/test/java/org/apache/qpid/session/TestSession.java20
46 files changed, 2259 insertions, 1955 deletions
diff --git a/java/common/src/main/java/org/apache/qpid/AMQChannelException.java b/java/common/src/main/java/org/apache/qpid/AMQChannelException.java
index 996ac23b09..503f92b15d 100644
--- a/java/common/src/main/java/org/apache/qpid/AMQChannelException.java
+++ b/java/common/src/main/java/org/apache/qpid/AMQChannelException.java
@@ -22,6 +22,7 @@ package org.apache.qpid;
import org.apache.qpid.framing.ChannelCloseBody;
import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.framing.AMQShortString;
public class AMQChannelException extends AMQException
{
@@ -51,6 +52,6 @@ public class AMQChannelException extends AMQException
public AMQMethodBody getCloseMethodBody()
{
- return ChannelCloseBody.createMethodBody(major, minor, _classId, _methodId, getErrorCode(), getMessage());
+ return ChannelCloseBody.createMethodBody(major, minor, _classId, _methodId, getErrorCode(), new AMQShortString(getMessage()));
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/AMQConnectionException.java b/java/common/src/main/java/org/apache/qpid/AMQConnectionException.java
index 93754dbee9..c4772624e9 100644
--- a/java/common/src/main/java/org/apache/qpid/AMQConnectionException.java
+++ b/java/common/src/main/java/org/apache/qpid/AMQConnectionException.java
@@ -21,6 +21,7 @@
package org.apache.qpid;
+import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.ConnectionCloseBody;
public class AMQConnectionException extends AMQException
@@ -30,6 +31,7 @@ public class AMQConnectionException extends AMQException
/* AMQP version for which exception ocurred */
private final byte major;
private final byte minor;
+ boolean _closeConnetion;
public AMQConnectionException(int errorCode, String msg, int classId, int methodId, byte major, byte minor, Throwable t)
{
@@ -51,7 +53,7 @@ public class AMQConnectionException extends AMQException
public ConnectionCloseBody getCloseMethodBody()
{
- return ConnectionCloseBody.createMethodBody(major, minor, _classId, _methodId, getErrorCode(), getMessage());
+ return ConnectionCloseBody.createMethodBody(major, minor, _classId, _methodId, getErrorCode(), new AMQShortString(getMessage()));
}
public int getClassId()
@@ -62,5 +64,4 @@ public class AMQConnectionException extends AMQException
public int getMethodId(){
return _methodId;
}
-
}
diff --git a/java/common/src/main/java/org/apache/qpid/AMQConnectionFailureException.java b/java/common/src/main/java/org/apache/qpid/AMQConnectionFailureException.java
index 0979598cdb..2406312846 100644
--- a/java/common/src/main/java/org/apache/qpid/AMQConnectionFailureException.java
+++ b/java/common/src/main/java/org/apache/qpid/AMQConnectionFailureException.java
@@ -1,3 +1,23 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
package org.apache.qpid;
public class AMQConnectionFailureException extends AMQException
diff --git a/java/common/src/main/java/org/apache/qpid/AMQTimeoutException.java b/java/common/src/main/java/org/apache/qpid/AMQTimeoutException.java
new file mode 100644
index 0000000000..6af681f479
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpid/AMQTimeoutException.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid;
+
+public class AMQTimeoutException extends AMQException
+{
+ public AMQTimeoutException(String message)
+ {
+ super(message);
+ }
+}
diff --git a/java/common/src/main/java/org/apache/qpid/AMQUnknownExchangeType.java b/java/common/src/main/java/org/apache/qpid/AMQUnknownExchangeType.java
index 70f333a580..81deb91cdc 100644
--- a/java/common/src/main/java/org/apache/qpid/AMQUnknownExchangeType.java
+++ b/java/common/src/main/java/org/apache/qpid/AMQUnknownExchangeType.java
@@ -1,3 +1,23 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
package org.apache.qpid;
public class AMQUnknownExchangeType extends AMQException
diff --git a/java/common/src/main/java/org/apache/qpid/common/AMQPFilterTypes.java b/java/common/src/main/java/org/apache/qpid/common/AMQPFilterTypes.java
index 56219755a3..d8aa9bf5ca 100644
--- a/java/common/src/main/java/org/apache/qpid/common/AMQPFilterTypes.java
+++ b/java/common/src/main/java/org/apache/qpid/common/AMQPFilterTypes.java
@@ -20,20 +20,22 @@
*/
package org.apache.qpid.common;
+import org.apache.qpid.framing.AMQShortString;
+
public enum AMQPFilterTypes
{
JMS_SELECTOR("x-filter-jms-selector"),
NO_CONSUME("x-filter-no-consume"),
AUTO_CLOSE("x-filter-auto-close");
- private final String _value;
+ private final AMQShortString _value;
AMQPFilterTypes(String value)
{
- _value = value;
+ _value = new AMQShortString(value);
}
- public String getValue()
+ public AMQShortString getValue()
{
return _value;
}
diff --git a/java/common/src/main/java/org/apache/qpid/exchange/ExchangeDefaults.java b/java/common/src/main/java/org/apache/qpid/exchange/ExchangeDefaults.java
index dca8075f7f..899261ef25 100644
--- a/java/common/src/main/java/org/apache/qpid/exchange/ExchangeDefaults.java
+++ b/java/common/src/main/java/org/apache/qpid/exchange/ExchangeDefaults.java
@@ -20,21 +20,28 @@
*/
package org.apache.qpid.exchange;
+import org.apache.qpid.framing.AMQShortString;
+
public class ExchangeDefaults
{
- public final static String TOPIC_EXCHANGE_NAME = "amq.topic";
+ public final static AMQShortString TOPIC_EXCHANGE_NAME = new AMQShortString("amq.topic");
+
+ public final static AMQShortString TOPIC_EXCHANGE_CLASS = new AMQShortString("topic");
+
+ public final static AMQShortString DIRECT_EXCHANGE_NAME = new AMQShortString("amq.direct");
+
+ public final static AMQShortString DIRECT_EXCHANGE_CLASS = new AMQShortString("direct");
- public final static String TOPIC_EXCHANGE_CLASS = "topic";
+ public final static AMQShortString HEADERS_EXCHANGE_NAME = new AMQShortString("amq.match");
- public final static String DIRECT_EXCHANGE_NAME = "amq.direct";
+ public final static AMQShortString HEADERS_EXCHANGE_CLASS = new AMQShortString("headers");
- public final static String DIRECT_EXCHANGE_CLASS = "direct";
+ public final static AMQShortString FANOUT_EXCHANGE_NAME = new AMQShortString("amq.fanout");
- public final static String HEADERS_EXCHANGE_NAME = "amq.match";
+ public final static AMQShortString FANOUT_EXCHANGE_CLASS = new AMQShortString("fanout");
- public final static String HEADERS_EXCHANGE_CLASS = "headers";
- public final static String FANOUT_EXCHANGE_NAME = "amq.fanout";
+ public final static AMQShortString SYSTEM_MANAGEMENT_EXCHANGE_NAME = new AMQShortString("qpid.sysmgmt");
- public final static String FANOUT_EXCHANGE_CLASS = "fanout";
+ public final static AMQShortString SYSTEM_MANAGEMENT_CLASS = new AMQShortString("sysmmgmt");
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java b/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java
index 36287d2923..ebeea8d2b4 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java
@@ -24,7 +24,7 @@ import org.apache.mina.common.ByteBuffer;
public abstract class AMQBody
{
- protected abstract byte getFrameType();
+ public abstract byte getFrameType();
/**
* Get the size of the body
diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java b/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java
index 62699331e9..5e566a5fe8 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java
@@ -24,93 +24,84 @@ import org.apache.log4j.Logger;
import org.apache.mina.common.ByteBuffer;
import org.apache.mina.common.IoSession;
import org.apache.mina.filter.codec.ProtocolDecoderOutput;
+import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
import java.util.HashMap;
import java.util.Map;
public class AMQDataBlockDecoder
{
- Logger _logger = Logger.getLogger(AMQDataBlockDecoder.class);
+ private static final String SESSION_METHOD_BODY_FACTORY = "QPID_SESSION_METHOD_BODY_FACTORY";
- private final Map _supportedBodies = new HashMap();
+ private static final BodyFactory[] _bodiesSupported = new BodyFactory[Byte.MAX_VALUE];
+
+ static
+ {
+ _bodiesSupported[HeartbeatBody.TYPE] = new HeartbeatBodyFactory();
+ }
+
+ Logger _logger = Logger.getLogger(AMQDataBlockDecoder.class);
public AMQDataBlockDecoder()
{
- _supportedBodies.put(new Byte(AMQRequestBody.TYPE), new BodyFactory() {
- public AMQBody createBody(ByteBuffer in) {
- return new AMQRequestBody();
- }
- });
- _supportedBodies.put(new Byte(AMQResponseBody.TYPE), new BodyFactory() {
- public AMQBody createBody(ByteBuffer in) {
- return new AMQResponseBody();
- }
- });
- _supportedBodies.put(new Byte(HeartbeatBody.TYPE), new HeartbeatBodyFactory());
}
public boolean decodable(IoSession session, ByteBuffer in) throws AMQFrameDecodingException
{
- // type, channel, body size and end byte
- if (in.remaining() < (1 + 2 + 4 + 1))
+ final int remainingAfterAttributes = in.remaining() - (1 + 2 + 4 + 1);
+ // type, channel, body length and end byte
+ if (remainingAfterAttributes < 0)
{
return false;
}
-
- final byte type = in.get();
- final int channel = in.getUnsignedShort();
+ in.skip(1 + 2);
final long bodySize = in.getUnsignedInt();
- // bodySize can be zero
- if (type <= 0 || channel < 0 || bodySize < 0)
+ return (remainingAfterAttributes >= bodySize);
+ }
+
+
+ protected Object createAndPopulateFrame(IoSession session, ByteBuffer in)
+ throws AMQFrameDecodingException, AMQProtocolVersionException
+ {
+ final byte type = in.get();
+ BodyFactory bodyFactory;
+ if (type == AMQRequestBody.TYPE)
{
- throw new AMQFrameDecodingException("Undecodable frame: type = " + type + " channel = " + channel +
- " bodySize = " + bodySize);
+ AMQVersionAwareProtocolSession protocolSession = (AMQVersionAwareProtocolSession) session.getAttachment();
+ bodyFactory = new AMQRequestBodyFactory(protocolSession);
}
-
- if (in.remaining() < (bodySize + 1))
+ else if (type == AMQResponseBody.TYPE)
{
- return false;
+ AMQVersionAwareProtocolSession protocolSession = (AMQVersionAwareProtocolSession) session.getAttachment();
+ bodyFactory = new AMQResponseBodyFactory(protocolSession);
}
- return true;
- }
-
- private boolean isSupportedFrameType(byte frameType)
- {
- final boolean result = _supportedBodies.containsKey(new Byte(frameType));
-
- if (!result)
+ else
{
- _logger.warn("AMQDataBlockDecoder does not handle frame type " + frameType);
+ bodyFactory = _bodiesSupported[type];
}
- return result;
- }
-
- protected Object createAndPopulateFrame(ByteBuffer in)
- throws AMQFrameDecodingException, AMQProtocolVersionException
- {
- final byte type = in.get();
- if (!isSupportedFrameType(type))
+ if(bodyFactory == null)
{
throw new AMQFrameDecodingException("Unsupported frame type: " + type);
}
+
final int channel = in.getUnsignedShort();
final long bodySize = in.getUnsignedInt();
- BodyFactory bodyFactory = (BodyFactory) _supportedBodies.get(new Byte(type));
- if (bodyFactory == null)
+ // bodySize can be zero
+ if (channel < 0 || bodySize < 0)
{
- throw new AMQFrameDecodingException("Unsupported body type: " + type);
+ throw new AMQFrameDecodingException("Undecodable frame: type = " + type + " channel = " + channel +
+ " bodySize = " + bodySize);
}
- AMQFrame frame = new AMQFrame();
- frame.populateFromBuffer(in, channel, bodySize, bodyFactory);
+ AMQFrame frame = new AMQFrame(in, channel, bodySize, bodyFactory);
byte marker = in.get();
if ((marker & 0xFF) != 0xCE)
{
- throw new AMQFrameDecodingException("End of frame marker not found. Read " + marker + " size=" + bodySize + " type=" + type);
+ throw new AMQFrameDecodingException("End of frame marker not found. Read " + marker + " length=" + bodySize + " type=" + type);
}
return frame;
}
@@ -118,6 +109,6 @@ public class AMQDataBlockDecoder
public void decode(IoSession session, ByteBuffer in, ProtocolDecoderOutput out)
throws Exception
{
- out.write(createAndPopulateFrame(in));
+ out.write(createAndPopulateFrame(session, in));
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockEncoder.java b/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockEncoder.java
index 3446563d35..478cdeb406 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockEncoder.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockEncoder.java
@@ -28,17 +28,16 @@ import org.apache.mina.filter.codec.demux.MessageEncoder;
import java.util.HashSet;
import java.util.Set;
+import java.util.Collections;
-public class AMQDataBlockEncoder implements MessageEncoder
+public final class AMQDataBlockEncoder implements MessageEncoder
{
- Logger _logger = Logger.getLogger(AMQDataBlockEncoder.class);
+ private static final Logger _logger = Logger.getLogger(AMQDataBlockEncoder.class);
- private Set _messageTypes;
+ private final Set _messageTypes = Collections.singleton(EncodableAMQDataBlock.class);
public AMQDataBlockEncoder()
{
- _messageTypes = new HashSet();
- _messageTypes.add(EncodableAMQDataBlock.class);
}
public void encode(IoSession session, Object message, ProtocolEncoderOutput out) throws Exception
diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java b/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java
index 6af691fbe8..11f505fd4b 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java
@@ -24,53 +24,52 @@ import org.apache.mina.common.ByteBuffer;
public class AMQFrame extends AMQDataBlock implements EncodableAMQDataBlock
{
- public int channel;
+ private final int _channel;
- public AMQBody bodyFrame;
+ private final AMQBody _bodyFrame;
- public AMQFrame()
+
+
+ public AMQFrame(final int channel, final AMQBody bodyFrame)
{
+ _channel = channel;
+ _bodyFrame = bodyFrame;
}
- public AMQFrame(int channel, AMQBody bodyFrame)
+ public AMQFrame(final ByteBuffer in, final int channel, final long bodySize, final BodyFactory bodyFactory) throws AMQFrameDecodingException
{
- this.channel = channel;
- this.bodyFrame = bodyFrame;
+ this._channel = channel;
+ this._bodyFrame = bodyFactory.createBody(in,bodySize);
}
public long getSize()
{
- return 1 + 2 + 4 + bodyFrame.getSize() + 1;
+ return 1 + 2 + 4 + _bodyFrame.getSize() + 1;
}
public void writePayload(ByteBuffer buffer)
{
- buffer.put(bodyFrame.getFrameType());
- // TODO: how does channel get populated
- EncodingUtils.writeUnsignedShort(buffer, channel);
- EncodingUtils.writeUnsignedInteger(buffer, bodyFrame.getSize());
- bodyFrame.writePayload(buffer);
+ buffer.put(_bodyFrame.getFrameType());
+ EncodingUtils.writeUnsignedShort(buffer, _channel);
+ EncodingUtils.writeUnsignedInteger(buffer, _bodyFrame.getSize());
+ _bodyFrame.writePayload(buffer);
buffer.put((byte) 0xCE);
}
- /**
- *
- * @param buffer
- * @param channel unsigned short
- * @param bodySize unsigned integer
- * @param bodyFactory
- * @throws AMQFrameDecodingException
- */
- public void populateFromBuffer(ByteBuffer buffer, int channel, long bodySize, BodyFactory bodyFactory)
- throws AMQFrameDecodingException, AMQProtocolVersionException
+ public final int getChannel()
{
- this.channel = channel;
- bodyFrame = bodyFactory.createBody(buffer);
- bodyFrame.populateFromBuffer(buffer, bodySize);
+ return _channel;
}
+ public final AMQBody getBodyFrame()
+ {
+ return _bodyFrame;
+ }
+
+
+
public String toString()
{
- return "Frame channelId: " + channel + ", bodyFrame: " + String.valueOf(bodyFrame);
+ return "Frame channelId: " + _channel + ", bodyFrame: " + String.valueOf(_bodyFrame);
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java b/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java
index e140a9b334..ccd1ef32b8 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java
@@ -57,7 +57,7 @@ public abstract class AMQMethodBody extends AMQBody
protected abstract void writeMethodPayload(ByteBuffer buffer);
- protected byte getFrameType()
+ public byte getFrameType()
{
return TYPE;
}
@@ -110,10 +110,9 @@ public abstract class AMQMethodBody extends AMQBody
return new AMQConnectionException(code, message, getClazz(), getMethod(), major, minor);
}
-
-
public AMQConnectionException getConnectionException(int code, String message, Throwable cause)
{
return new AMQConnectionException(code, message, getClazz(), getMethod(), major, minor, cause);
}
+
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java b/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java
index 3dcbe926fe..6067e2fce5 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java
@@ -22,30 +22,21 @@ package org.apache.qpid.framing;
import org.apache.log4j.Logger;
import org.apache.mina.common.ByteBuffer;
+import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
public class AMQMethodBodyFactory implements BodyFactory
{
private static final Logger _log = Logger.getLogger(AMQMethodBodyFactory.class);
+
+ private final AMQVersionAwareProtocolSession _protocolSession;
- private static final AMQMethodBodyFactory _instance = new AMQMethodBodyFactory();
-
- public static AMQMethodBodyFactory getInstance()
- {
- return _instance;
- }
-
- private AMQMethodBodyFactory()
+ public AMQMethodBodyFactory(AMQVersionAwareProtocolSession protocolSession)
{
- _log.debug("Creating method body factory");
+ _protocolSession = protocolSession;
}
- public AMQMethodBody createBody(ByteBuffer in) throws AMQFrameDecodingException
+ public AMQMethodBody createBody(ByteBuffer in, long bodySize) throws AMQFrameDecodingException
{
- // AMQP version change: MethodBodyDecoderRegistry is obsolete, since all the XML
- // segments generated together are now handled by MainRegistry. The Cluster class,
- // if generated together with amqp.xml is a part of MainRegistry.
- // TODO: Connect with version acquired from ProtocolInitiation class.
- return MainRegistry.get((short)in.getUnsignedShort(), (short)in.getUnsignedShort(),
- (byte)0, (byte)9);
+ return _protocolSession.getRegistry().get((short)in.getUnsignedShort(), (short)in.getUnsignedShort(), in, bodySize);
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyInstanceFactory.java b/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyInstanceFactory.java
new file mode 100644
index 0000000000..5309a80bdb
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyInstanceFactory.java
@@ -0,0 +1,30 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.framing;
+
+import org.apache.mina.common.ByteBuffer;
+
+
+public abstract interface AMQMethodBodyInstanceFactory
+{
+ public AMQMethodBody newInstance(byte major, byte minor, ByteBuffer buffer, long size) throws AMQFrameDecodingException;
+ public AMQMethodBody newInstance(byte major, byte minor, int clazzID, int methodID, ByteBuffer buffer, long size) throws AMQFrameDecodingException;
+}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQRequestBody.java b/java/common/src/main/java/org/apache/qpid/framing/AMQRequestBody.java
index 6df7dff27b..3bc16601b6 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/AMQRequestBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/AMQRequestBody.java
@@ -21,6 +21,7 @@
package org.apache.qpid.framing;
import org.apache.mina.common.ByteBuffer;
+import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
public class AMQRequestBody extends AMQBody
{
@@ -30,16 +31,21 @@ public class AMQRequestBody extends AMQBody
protected long requestId;
protected long responseMark;
protected AMQMethodBody methodPayload;
-
+ protected AMQVersionAwareProtocolSession protocolSession;
// Constructor
- public AMQRequestBody() {}
+ public AMQRequestBody(AMQVersionAwareProtocolSession protocolSession)
+ {
+ this.protocolSession = protocolSession;
+ }
+
public AMQRequestBody(long requestId, long responseMark,
AMQMethodBody methodPayload)
{
this.requestId = requestId;
this.responseMark = responseMark;
this.methodPayload = methodPayload;
+ protocolSession = null;
}
@@ -49,7 +55,7 @@ public class AMQRequestBody extends AMQBody
public AMQMethodBody getMethodPayload() { return methodPayload; }
- protected byte getFrameType()
+ public byte getFrameType()
{
return TYPE;
}
@@ -68,14 +74,19 @@ public class AMQRequestBody extends AMQBody
}
protected void populateFromBuffer(ByteBuffer buffer, long size)
- throws AMQFrameDecodingException, AMQProtocolVersionException
+ throws AMQFrameDecodingException
{
+ if (protocolSession == null)
+ {
+ throw new AMQFrameDecodingException("Cannot call populateFromBuffer() without using correct constructor.");
+ }
+
requestId = EncodingUtils.readLong(buffer);
responseMark = EncodingUtils.readLong(buffer);
int reserved = EncodingUtils.readInteger(buffer); // reserved, throw away
- AMQMethodBodyFactory factory = AMQMethodBodyFactory.getInstance();
- methodPayload = factory.createBody(buffer);
- methodPayload.populateFromBuffer(buffer, size - 8 - 8 - 4);
+
+ AMQMethodBodyFactory methodBodyFactory = new AMQMethodBodyFactory(protocolSession);
+ methodPayload = methodBodyFactory.createBody(buffer, size);
}
public String toString()
@@ -89,9 +100,6 @@ public class AMQRequestBody extends AMQBody
{
AMQRequestBody requestFrame = new AMQRequestBody(requestId, responseMark,
methodPayload);
- AMQFrame frame = new AMQFrame();
- frame.channel = channelId;
- frame.bodyFrame = requestFrame;
- return frame;
+ return new AMQFrame(channelId, requestFrame);
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQRequestBodyFactory.java b/java/common/src/main/java/org/apache/qpid/framing/AMQRequestBodyFactory.java
new file mode 100644
index 0000000000..9d47ccd68e
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpid/framing/AMQRequestBodyFactory.java
@@ -0,0 +1,41 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.framing;
+
+import org.apache.mina.common.ByteBuffer;
+import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
+
+public class AMQRequestBodyFactory implements BodyFactory
+{
+ private final AMQVersionAwareProtocolSession protocolSession;
+
+ public AMQRequestBodyFactory(AMQVersionAwareProtocolSession protocolSession)
+ {
+ this.protocolSession = protocolSession;
+ }
+
+ public AMQBody createBody(ByteBuffer in, long bodySize) throws AMQFrameDecodingException
+ {
+ AMQRequestBody rb = new AMQRequestBody(protocolSession);
+ rb.populateFromBuffer(in, bodySize);
+ return rb;
+ }
+}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQResponseBody.java b/java/common/src/main/java/org/apache/qpid/framing/AMQResponseBody.java
index c7aee6ac92..7b35aaeb86 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/AMQResponseBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/AMQResponseBody.java
@@ -21,6 +21,7 @@
package org.apache.qpid.framing;
import org.apache.mina.common.ByteBuffer;
+import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
public class AMQResponseBody extends AMQBody
{
@@ -31,9 +32,14 @@ public class AMQResponseBody extends AMQBody
protected long requestId;
protected int batchOffset;
protected AMQMethodBody methodPayload;
+ protected AMQVersionAwareProtocolSession protocolSession;
// Constructor
- public AMQResponseBody() {}
+ public AMQResponseBody(AMQVersionAwareProtocolSession protocolSession)
+ {
+ this.protocolSession = protocolSession;
+ }
+
public AMQResponseBody(long responseId, long requestId,
int batchOffset, AMQMethodBody methodPayload)
{
@@ -41,6 +47,7 @@ public class AMQResponseBody extends AMQBody
this.requestId = requestId;
this.batchOffset = batchOffset;
this.methodPayload = methodPayload;
+ protocolSession = null;
}
// Field methods
@@ -49,7 +56,7 @@ public class AMQResponseBody extends AMQBody
public int getBatchOffset() { return batchOffset; }
public AMQMethodBody getMethodPayload() { return methodPayload; }
- protected byte getFrameType()
+ public byte getFrameType()
{
return TYPE;
}
@@ -69,15 +76,18 @@ public class AMQResponseBody extends AMQBody
}
protected void populateFromBuffer(ByteBuffer buffer, long size)
- throws AMQFrameDecodingException, AMQProtocolVersionException
+ throws AMQFrameDecodingException
{
+ if (protocolSession == null)
+ throw new AMQFrameDecodingException("Cannot call populateFromBuffer() without using correct constructor.");
+
responseId = EncodingUtils.readLong(buffer);
requestId = EncodingUtils.readLong(buffer);
// XXX
batchOffset = EncodingUtils.readInteger(buffer);
- AMQMethodBodyFactory factory = AMQMethodBodyFactory.getInstance();
- methodPayload = factory.createBody(buffer);
- methodPayload.populateFromBuffer(buffer, size - 8 - 8 - 4);
+
+ AMQMethodBodyFactory methodBodyFactory = new AMQMethodBodyFactory(protocolSession);
+ methodPayload = methodBodyFactory.createBody(buffer, size);
}
public String toString()
@@ -91,9 +101,6 @@ public class AMQResponseBody extends AMQBody
{
AMQResponseBody responseFrame = new AMQResponseBody(responseId,
requestId, batchOffset, methodPayload);
- AMQFrame frame = new AMQFrame();
- frame.channel = channelId;
- frame.bodyFrame = responseFrame;
- return frame;
+ return new AMQFrame(channelId, responseFrame);
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQResponseBodyFactory.java b/java/common/src/main/java/org/apache/qpid/framing/AMQResponseBodyFactory.java
new file mode 100644
index 0000000000..4209aad11f
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpid/framing/AMQResponseBodyFactory.java
@@ -0,0 +1,41 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.framing;
+
+import org.apache.mina.common.ByteBuffer;
+import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
+
+public class AMQResponseBodyFactory implements BodyFactory
+{
+ private final AMQVersionAwareProtocolSession protocolSession;
+
+ public AMQResponseBodyFactory(AMQVersionAwareProtocolSession protocolSession)
+ {
+ this.protocolSession = protocolSession;
+ }
+
+ public AMQBody createBody(ByteBuffer in, long bodySize) throws AMQFrameDecodingException
+ {
+ AMQResponseBody rb = new AMQResponseBody(protocolSession);
+ rb.populateFromBuffer(in, bodySize);
+ return rb;
+ }
+}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java b/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java
new file mode 100644
index 0000000000..f98a62751c
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java
@@ -0,0 +1,374 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.framing;
+
+import org.apache.mina.common.ByteBuffer;
+import org.apache.log4j.Logger;
+
+import java.util.Arrays;
+
+/**
+ * A short string is a representation of an AMQ Short String
+ * Short strings differ from the Java String class by being limited to on ASCII characters (0-127)
+ * and thus can be held more effectively in a byte buffer.
+ *
+ */
+public final class AMQShortString implements CharSequence
+{
+ private static final Logger _logger = Logger.getLogger(AMQShortString.class);
+
+ private final ByteBuffer _data;
+ private int _hashCode;
+ private static final char[] EMPTY_CHAR_ARRAY = new char[0];
+
+ public AMQShortString(byte[] data)
+ {
+
+ _data = ByteBuffer.wrap(data);
+ }
+
+
+ public AMQShortString(String data)
+ {
+ this(data == null ? EMPTY_CHAR_ARRAY : data.toCharArray());
+ if(data != null) _hashCode = data.hashCode();
+ }
+
+ public AMQShortString(char[] data)
+ {
+ if(data == null)
+ {
+ throw new NullPointerException("Cannot create AMQShortString with null char[]");
+ }
+ final int length = data.length;
+ final byte[] stringBytes = new byte[length];
+ for(int i = 0; i < length; i++)
+ {
+ stringBytes[i] = (byte) (0xFF & data[i]);
+ }
+
+ _data = ByteBuffer.wrap(stringBytes);
+ _data.rewind();
+
+ }
+
+ public AMQShortString(CharSequence charSequence)
+ {
+ final int length = charSequence.length();
+ final byte[] stringBytes = new byte[length];
+ int hash = 0;
+ for(int i = 0 ; i < length; i++)
+ {
+ stringBytes[i] = ((byte) (0xFF & charSequence.charAt(i)));
+ hash = (31 * hash) + stringBytes[i];
+
+ }
+ _data = ByteBuffer.wrap(stringBytes);
+ _data.rewind();
+ _hashCode = hash;
+
+ }
+
+ private AMQShortString(ByteBuffer data)
+ {
+ _data = data;
+
+ }
+
+
+ /**
+ * Get the length of the short string
+ * @return length of the underlying byte array
+ */
+ public int length()
+ {
+ return _data.limit();
+ }
+
+ public char charAt(int index)
+ {
+
+ return (char) _data.get(index);
+
+ }
+
+ public CharSequence subSequence(int start, int end)
+ {
+ return new CharSubSequence(start,end);
+ }
+
+ public int writeToByteArray(byte[] encoding, int pos)
+ {
+ final int size = length();
+ encoding[pos++] = (byte) length();
+ for(int i = 0; i < size; i++)
+ {
+ encoding[pos++] = _data.get(i);
+ }
+ return pos;
+ }
+
+ public static AMQShortString readFromByteArray(byte[] byteEncodedDestination, int pos)
+ {
+
+ final byte len = byteEncodedDestination[pos];
+ if(len == 0)
+ {
+ return null;
+ }
+ ByteBuffer data = ByteBuffer.wrap(byteEncodedDestination,pos+1,len).slice();
+
+
+ return new AMQShortString(data);
+ }
+
+ public static AMQShortString readFromBuffer(ByteBuffer buffer)
+ {
+ final short length = buffer.getUnsigned();
+ if (length == 0)
+ {
+ return null;
+ }
+ else
+ {
+ ByteBuffer data = buffer.slice();
+ data.limit(length);
+ data.rewind();
+ buffer.skip(length);
+
+ return new AMQShortString(data);
+ }
+ }
+
+
+ public byte[] getBytes()
+ {
+
+ if(_data.buf().hasArray() && _data.arrayOffset() == 0)
+ {
+ return _data.array();
+ }
+ else
+ {
+ final int size = length();
+ byte[] b = new byte[size];
+ ByteBuffer buf = _data.duplicate();
+ buf.rewind();
+ buf.get(b);
+
+
+ return b;
+ }
+
+
+ }
+
+ public void writeToBuffer(ByteBuffer buffer)
+ {
+
+
+ final int size = length();
+ if (size != 0)
+ {
+
+ buffer.put((byte)size);
+ if(_data.buf().hasArray())
+ {
+ buffer.put(_data.array(),_data.arrayOffset(),length());
+ }
+ else
+ {
+
+ for(int i = 0; i < size; i++)
+ {
+
+ buffer.put(_data.get(i));
+ }
+ }
+ }
+ else
+ {
+ // really writing out unsigned byte
+ buffer.put((byte) 0);
+ }
+
+ }
+
+ private final class CharSubSequence implements CharSequence
+ {
+ private final int _offset;
+ private final int _end;
+
+
+ public CharSubSequence(final int offset, final int end)
+ {
+ _offset = offset;
+ _end = end;
+ }
+
+
+ public int length()
+ {
+ return _end - _offset;
+ }
+
+ public char charAt(int index)
+ {
+ return AMQShortString.this.charAt(index + _offset);
+ }
+
+ public CharSequence subSequence(int start, int end)
+ {
+ return new CharSubSequence(start+_offset,end+_offset);
+ }
+ }
+
+
+
+ public char[] asChars()
+ {
+ final int size = length();
+ final char[] chars = new char[size];
+
+
+
+
+ for(int i = 0 ; i < size; i++)
+ {
+ chars[i] = (char) _data.get(i);
+ }
+ return chars;
+ }
+
+
+
+ public String asString()
+ {
+ return new String(asChars());
+ }
+
+ public boolean equals(Object o)
+ {
+ if(o == null)
+ {
+ return false;
+ }
+ if(o == this)
+ {
+ return true;
+ }
+ if(o instanceof AMQShortString)
+ {
+
+ final AMQShortString otherString = (AMQShortString) o;
+
+ if((_hashCode != 0) && (otherString._hashCode != 0) && (_hashCode != otherString._hashCode))
+ {
+ return false;
+ }
+
+ return _data.equals(otherString._data);
+
+
+
+
+ }
+ return (o instanceof CharSequence) && equals((CharSequence)o);
+
+ }
+
+ public boolean equals(CharSequence s)
+ {
+ if(s == null)
+ {
+ return false;
+ }
+ if(s.length() != length())
+ {
+ return false;
+ }
+ for(int i = 0; i < length(); i++)
+ {
+ if(charAt(i)!= s.charAt(i))
+ {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ public int hashCode()
+ {
+ int hash = _hashCode;
+ if(hash == 0)
+ {
+ final int size = length();
+
+
+ for(int i = 0; i < size; i++)
+ {
+ hash = (31 * hash) + _data.get(i);
+ }
+ _hashCode = hash;
+ }
+
+ return hash;
+ }
+
+ public void setDirty()
+ {
+ _hashCode = 0;
+ }
+
+ public String toString()
+ {
+ return asString();
+ }
+
+
+ public int compareTo(AMQShortString name)
+ {
+ if(name == null)
+ {
+ return 1;
+ }
+ else
+ {
+
+ if(name.length() < length())
+ {
+ return - name.compareTo(this);
+ }
+
+
+
+ for(int i = 0; i < length() ; i++)
+ {
+ final byte d = _data.get(i);
+ final byte n = name._data.get(i);
+ if(d < n) return -1;
+ if(d > n) return 1;
+ }
+
+ return length() == name.length() ? 0 : -1;
+ }
+ }
+}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQType.java b/java/common/src/main/java/org/apache/qpid/framing/AMQType.java
index 23c1929205..5175eace1e 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/AMQType.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/AMQType.java
@@ -230,7 +230,7 @@ public enum AMQType
{
public int getEncodingSize(Object value)
{
- return 1 + (value == null ? 0 : ((byte[]) value).length);
+ return EncodingUtils.encodedLongstrLength((byte[]) value);
}
@@ -250,12 +250,12 @@ public enum AMQType
public void writeValueImpl(Object value, ByteBuffer buffer)
{
- EncodingUtils.writeBytes(buffer, (byte[]) value);
+ EncodingUtils.writeLongstr(buffer, (byte[]) value);
}
public Object readValueFromBuffer(ByteBuffer buffer)
{
- return EncodingUtils.readBytes(buffer);
+ return EncodingUtils.readLongstr(buffer);
}
},
diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java b/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java
index b29c23c2a2..efdd903f1f 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java
@@ -1,3 +1,23 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
package org.apache.qpid.framing;
import org.apache.mina.common.ByteBuffer;
@@ -51,4 +71,9 @@ public class AMQTypedValue
AMQType type = AMQTypeMap.getType(buffer.get());
return new AMQTypedValue(type, buffer);
}
+
+ public String toString()
+ {
+ return "["+getType()+": "+getValue()+"]";
+ }
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/BodyFactory.java b/java/common/src/main/java/org/apache/qpid/framing/BodyFactory.java
index cf5708d993..246e91f1fb 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/BodyFactory.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/BodyFactory.java
@@ -21,11 +21,12 @@
package org.apache.qpid.framing;
import org.apache.mina.common.ByteBuffer;
+import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
/**
* Any class that is capable of turning a stream of bytes into an AMQ structure must implement this interface.
*/
public interface BodyFactory
{
- AMQBody createBody(ByteBuffer in) throws AMQFrameDecodingException;
+ AMQBody createBody(ByteBuffer in, long bodySize) throws AMQFrameDecodingException;
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java b/java/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java
index c88c5d3bd3..f31fa8097e 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java
@@ -35,6 +35,7 @@ public class EncodingUtils
public static final int SIZEOF_UNSIGNED_SHORT = 2;
public static final int SIZEOF_UNSIGNED_INT = 4;
+ private static final boolean[] ALL_FALSE_ARRAY = new boolean[8];
public static int encodedShortStringLength(String s)
{
@@ -48,6 +49,120 @@ public class EncodingUtils
}
}
+
+ public static int encodedShortStringLength(short s)
+ {
+ if( s == 0 )
+ {
+ return 1 + 1;
+ }
+
+ int len = 0;
+ if(s < 0)
+ {
+ len=1;
+ // sloppy - doesn't work of Integer.MIN_VALUE
+ s=(short)-s;
+ }
+
+ if(s>9999)
+ {
+ return 1+5;
+ }
+ else if(s>999)
+ {
+ return 1+4;
+ }
+ else if(s>99)
+ {
+ return 1+3;
+ }
+ else if(s>9)
+ {
+ return 1+2;
+ }
+ else
+ {
+ return 1+1;
+ }
+
+ }
+
+
+ public static int encodedShortStringLength(int i)
+ {
+ if( i == 0 )
+ {
+ return 1 + 1;
+ }
+
+ int len = 0;
+ if(i < 0)
+ {
+ len=1;
+ // sloppy - doesn't work of Integer.MIN_VALUE
+ i=-i;
+ }
+
+ // range is now 1 - 2147483647
+ if(i < Short.MAX_VALUE)
+ {
+ return len + encodedShortStringLength((short)i);
+ }
+ else if (i > 999999)
+ {
+ return len + 6 + encodedShortStringLength((short)(i/1000000));
+ }
+ else // if (i > 99999)
+ {
+ return len + 5 + encodedShortStringLength((short)(i/100000));
+ }
+
+ }
+
+ public static int encodedShortStringLength(long l)
+ {
+ if(l == 0)
+ {
+ return 1 + 1;
+ }
+
+ int len = 0;
+ if(l < 0)
+ {
+ len=1;
+ // sloppy - doesn't work of Long.MIN_VALUE
+ l=-l;
+ }
+ if(l < Integer.MAX_VALUE)
+ {
+ return len + encodedShortStringLength((int) l);
+ }
+ else if(l > 9999999999L)
+ {
+ return len + 10 + encodedShortStringLength((int) (l / 10000000000L));
+ }
+ else
+ {
+ return len + 1 + encodedShortStringLength((int) (l / 10L));
+ }
+
+ }
+
+
+ public static int encodedShortStringLength(AMQShortString s)
+ {
+ if (s == null)
+ {
+ return 1;
+ }
+ else
+ {
+ return (short) (1 + s.length());
+ }
+ }
+
+
public static int encodedLongStringLength(String s)
{
if (s == null)
@@ -122,6 +237,21 @@ public class EncodingUtils
}
}
+
+ public static void writeShortStringBytes(ByteBuffer buffer, AMQShortString s)
+ {
+ if (s != null)
+ {
+
+ s.writeToBuffer(buffer);
+ }
+ else
+ {
+ // really writing out unsigned byte
+ buffer.put((byte) 0);
+ }
+ }
+
public static void writeLongStringBytes(ByteBuffer buffer, String s)
{
assert s == null || s.length() <= 0xFFFE;
@@ -224,6 +354,7 @@ public class EncodingUtils
}
}
+
public static void writeFieldTableBytes(ByteBuffer buffer, FieldTable table)
{
if (table != null)
@@ -255,6 +386,238 @@ public class EncodingUtils
buffer.put(packedValue);
}
+ public static void writeBooleans(ByteBuffer buffer, boolean value)
+ {
+
+ buffer.put(value ? (byte) 1 : (byte) 0);
+ }
+
+ public static void writeBooleans(ByteBuffer buffer, boolean value0, boolean value1)
+ {
+ byte packedValue = value0 ? (byte) 1 : (byte) 0;
+
+ if (value1)
+ {
+ packedValue = (byte) (packedValue | (byte)(1 << 1));
+ }
+
+
+ buffer.put(packedValue);
+ }
+
+ public static void writeBooleans(ByteBuffer buffer, boolean value0, boolean value1, boolean value2)
+ {
+ byte packedValue = value0 ? (byte) 1 : (byte) 0;
+
+ if (value1)
+ {
+ packedValue = (byte) (packedValue | (byte)(1 << 1));
+ }
+
+ if (value2)
+ {
+ packedValue = (byte) (packedValue | (byte)(1 << 2));
+ }
+
+
+ buffer.put(packedValue);
+ }
+
+
+
+ public static void writeBooleans(ByteBuffer buffer,
+ boolean value0,
+ boolean value1,
+ boolean value2,
+ boolean value3)
+ {
+ byte packedValue = value0 ? (byte) 1 : (byte) 0;
+
+ if (value1)
+ {
+ packedValue = (byte) (packedValue | (byte)(1 << 1));
+ }
+
+ if (value2)
+ {
+ packedValue = (byte) (packedValue | (byte)(1 << 2));
+ }
+
+ if (value3)
+ {
+ packedValue = (byte) (packedValue | (byte)(1 << 3));
+ }
+
+ buffer.put(packedValue);
+ }
+
+ public static void writeBooleans(ByteBuffer buffer,
+ boolean value0,
+ boolean value1,
+ boolean value2,
+ boolean value3,
+ boolean value4)
+ {
+ byte packedValue = value0 ? (byte) 1 : (byte) 0;
+
+ if (value1)
+ {
+ packedValue = (byte) (packedValue | (byte)(1 << 1));
+ }
+
+ if (value2)
+ {
+ packedValue = (byte) (packedValue | (byte)(1 << 2));
+ }
+
+ if (value3)
+ {
+ packedValue = (byte) (packedValue | (byte)(1 << 3));
+ }
+
+ if (value4)
+ {
+ packedValue = (byte) (packedValue | (byte)(1 << 4));
+ }
+
+ buffer.put(packedValue);
+ }
+
+ public static void writeBooleans(ByteBuffer buffer,
+ boolean value0,
+ boolean value1,
+ boolean value2,
+ boolean value3,
+ boolean value4,
+ boolean value5)
+ {
+ byte packedValue = value0 ? (byte) 1 : (byte) 0;
+
+ if (value1)
+ {
+ packedValue = (byte) (packedValue | (byte)(1 << 1));
+ }
+
+ if (value2)
+ {
+ packedValue = (byte) (packedValue | (byte)(1 << 2));
+ }
+
+ if (value3)
+ {
+ packedValue = (byte) (packedValue | (byte)(1 << 3));
+ }
+
+ if (value4)
+ {
+ packedValue = (byte) (packedValue | (byte)(1 << 4));
+ }
+
+ if (value5)
+ {
+ packedValue = (byte) (packedValue | (byte)(1 << 5));
+ }
+
+ buffer.put(packedValue);
+ }
+
+ public static void writeBooleans(ByteBuffer buffer,
+ boolean value0,
+ boolean value1,
+ boolean value2,
+ boolean value3,
+ boolean value4,
+ boolean value5,
+ boolean value6)
+ {
+ byte packedValue = value0 ? (byte) 1 : (byte) 0;
+
+ if (value1)
+ {
+ packedValue = (byte) (packedValue | (byte)(1 << 1));
+ }
+
+ if (value2)
+ {
+ packedValue = (byte) (packedValue | (byte)(1 << 2));
+ }
+
+ if (value3)
+ {
+ packedValue = (byte) (packedValue | (byte)(1 << 3));
+ }
+
+ if (value4)
+ {
+ packedValue = (byte) (packedValue | (byte)(1 << 4));
+ }
+
+ if (value5)
+ {
+ packedValue = (byte) (packedValue | (byte)(1 << 5));
+ }
+
+ if (value6)
+ {
+ packedValue = (byte) (packedValue | (byte)(1 << 6));
+ }
+
+ buffer.put(packedValue);
+ }
+
+ public static void writeBooleans(ByteBuffer buffer,
+ boolean value0,
+ boolean value1,
+ boolean value2,
+ boolean value3,
+ boolean value4,
+ boolean value5,
+ boolean value6,
+ boolean value7)
+ {
+ byte packedValue = value0 ? (byte) 1 : (byte) 0;
+
+ if (value1)
+ {
+ packedValue = (byte) (packedValue | (byte)(1 << 1));
+ }
+
+ if (value2)
+ {
+ packedValue = (byte) (packedValue | (byte)(1 << 2));
+ }
+
+ if (value3)
+ {
+ packedValue = (byte) (packedValue | (byte)(1 << 3));
+ }
+
+ if (value4)
+ {
+ packedValue = (byte) (packedValue | (byte)(1 << 4));
+ }
+
+ if (value5)
+ {
+ packedValue = (byte) (packedValue | (byte)(1 << 5));
+ }
+
+ if (value6)
+ {
+ packedValue = (byte) (packedValue | (byte)(1 << 6));
+ }
+
+ if (value7)
+ {
+ packedValue = (byte) (packedValue | (byte)(1 << 7));
+ }
+
+ buffer.put(packedValue);
+ }
+
+
+
+
/**
* This is used for writing longstrs.
*
@@ -282,13 +645,27 @@ public class EncodingUtils
public static boolean[] readBooleans(ByteBuffer buffer)
{
- byte packedValue = buffer.get();
- boolean[] result = new boolean[8];
+ final byte packedValue = buffer.get();
+ if(packedValue == 0)
+ {
+ return ALL_FALSE_ARRAY;
+ }
+ final boolean[] result = new boolean[8];
- for (int i = 0; i < 8; i++)
+ result[0] = ((packedValue & 1) != 0);
+ result[1] = ((packedValue & (1 << 1)) != 0);
+ result[2] = ((packedValue & (1 << 2)) != 0);
+ result[3] = ((packedValue & (1 << 3)) != 0);
+ if((packedValue & 0xF0) == 0)
{
- result[i] = ((packedValue & (1 << i)) != 0);
+ result[0] = ((packedValue & 1) != 0);
}
+ result[4] = ((packedValue & (1 << 4)) != 0);
+ result[5] = ((packedValue & (1 << 5)) != 0);
+ result[6] = ((packedValue & (1 << 6)) != 0);
+ result[7] = ((packedValue & (1 << 7)) != 0);
+
+
return result;
}
@@ -312,6 +689,12 @@ public class EncodingUtils
return content;
}
+ public static AMQShortString readAMQShortString(ByteBuffer buffer)
+ {
+ return AMQShortString.readFromBuffer(buffer);
+
+ }
+
public static String readShortString(ByteBuffer buffer)
{
short length = buffer.getUnsigned();
@@ -363,7 +746,7 @@ public class EncodingUtils
}
}
- public static byte[] readLongstr(ByteBuffer buffer) throws AMQFrameDecodingException
+ public static byte[] readLongstr(ByteBuffer buffer)
{
long length = buffer.getUnsignedInt();
if (length == 0)
@@ -468,7 +851,7 @@ public class EncodingUtils
buffer.put((byte) (aBoolean ? 1 : 0));
}
- public static Boolean readBoolean(ByteBuffer buffer)
+ public static boolean readBoolean(ByteBuffer buffer)
{
byte packedValue = buffer.get();
return (packedValue == 1);
@@ -485,7 +868,7 @@ public class EncodingUtils
buffer.put(aByte);
}
- public static Byte readByte(ByteBuffer buffer)
+ public static byte readByte(ByteBuffer buffer)
{
return buffer.get();
}
@@ -502,7 +885,7 @@ public class EncodingUtils
buffer.putShort(aShort);
}
- public static Short readShort(ByteBuffer buffer)
+ public static short readShort(ByteBuffer buffer)
{
return buffer.getShort();
}
@@ -518,7 +901,7 @@ public class EncodingUtils
buffer.putInt(aInteger);
}
- public static Integer readInteger(ByteBuffer buffer)
+ public static int readInteger(ByteBuffer buffer)
{
return buffer.getInt();
}
@@ -534,7 +917,7 @@ public class EncodingUtils
buffer.putLong(aLong);
}
- public static Long readLong(ByteBuffer buffer)
+ public static long readLong(ByteBuffer buffer)
{
return buffer.getLong();
}
@@ -550,7 +933,7 @@ public class EncodingUtils
buffer.putFloat(aFloat);
}
- public static Float readFloat(ByteBuffer buffer)
+ public static float readFloat(ByteBuffer buffer)
{
return buffer.getFloat();
}
@@ -567,7 +950,7 @@ public class EncodingUtils
buffer.putDouble(aDouble);
}
- public static Double readDouble(ByteBuffer buffer)
+ public static double readDouble(ByteBuffer buffer)
{
return buffer.getDouble();
}
@@ -627,6 +1010,41 @@ public class EncodingUtils
writeByte(buffer, (byte) character);
}
+ public static long readLongAsShortString(ByteBuffer buffer)
+ {
+ short length = buffer.getUnsigned();
+ short pos = 0;
+ if(length == 0)
+ {
+ return 0L;
+ }
+ byte digit = buffer.get();
+ boolean isNegative;
+ long result = 0;
+ if(digit == (byte)'-')
+ {
+ isNegative = true;
+ pos++;
+ digit = buffer.get();
+ }
+ else
+ {
+ isNegative = false;
+ }
+ result = digit - (byte)'0';
+ pos++;
+
+ while(pos < length)
+ {
+ pos++;
+ digit = buffer.get();
+ result = (result << 3) + (result << 1);
+ result += digit - (byte)'0';
+ }
+
+ return result;
+ }
+
public static long readUnsignedInteger(ByteBuffer buffer)
{
long l = 0xFF & buffer.get();
@@ -639,4 +1057,23 @@ public class EncodingUtils
return l;
}
+
+
+ public static void main(String[] args)
+ {
+ ByteBuffer buf = ByteBuffer.allocate(8);
+ buf.setAutoExpand(true);
+
+ long l = (long) Integer.MAX_VALUE;
+ l += 1024L;
+
+ writeUnsignedInteger(buf, l);
+
+ buf.flip();
+
+ long l2 = readUnsignedInteger(buf);
+
+ System.out.println("before: " + l);
+ System.out.println("after: " + l2);
+ }
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java b/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java
index 3c18683609..61bc9090a1 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java
@@ -31,17 +31,16 @@ public class FieldTable
{
private static final Logger _logger = Logger.getLogger(FieldTable.class);
- private LinkedHashMap<String, AMQTypedValue> _properties;
+ private ByteBuffer _encodedForm;
+ private LinkedHashMap<AMQShortString, AMQTypedValue> _properties;
+ private long _encodedSize;
+ private static final int INITIAL_HASHMAP_CAPACITY = 16;
public FieldTable()
{
super();
- _properties = new LinkedHashMap<String, AMQTypedValue>();
-
}
-
-
/**
* Construct a new field table.
*
@@ -52,14 +51,105 @@ public class FieldTable
public FieldTable(ByteBuffer buffer, long length) throws AMQFrameDecodingException
{
this();
- setFromBuffer(buffer, length);
+ _encodedForm = buffer.slice();
+ _encodedForm.limit((int)length);
+ _encodedSize = length;
+ buffer.skip((int)length);
+ }
+
+
+
+ private AMQTypedValue getProperty(AMQShortString string)
+ {
+ synchronized(this)
+ {
+ if(_properties == null)
+ {
+ if(_encodedForm == null)
+ {
+ return null;
+ }
+ else
+ {
+ populateFromBuffer();
+ }
+ }
+ }
+
+ if(_properties == null)
+ {
+ return null;
+ }
+ else
+ {
+ return _properties.get(string);
+ }
+ }
+
+ private void populateFromBuffer()
+ {
+ try
+ {
+ setFromBuffer(_encodedForm, _encodedSize);
+ }
+ catch (AMQFrameDecodingException e)
+ {
+ _logger.error("Error decoding FieldTable in deferred decoding mode ", e);
+ throw new IllegalArgumentException(e);
+ }
}
+ private AMQTypedValue setProperty(AMQShortString key, AMQTypedValue val)
+ {
+ initMapIfNecessary();
+ _encodedForm = null;
+ if(val == null)
+ {
+ return removeKey(key);
+ }
+ AMQTypedValue oldVal = _properties.put(key,val);
+ if(oldVal != null)
+ {
+ _encodedSize -= oldVal.getEncodingSize();
+ }
+ else
+ {
+ _encodedSize += EncodingUtils.encodedShortStringLength(key) + 1;
+ }
+ _encodedSize += val.getEncodingSize();
+
+ return oldVal;
+ }
+
+ private void initMapIfNecessary()
+ {
+ synchronized(this)
+ {
+ if(_properties == null)
+ {
+ if(_encodedForm == null)
+ {
+ _properties = new LinkedHashMap<AMQShortString,AMQTypedValue>();
+ }
+ else
+ {
+ populateFromBuffer();
+ }
+ }
+
+ }
+ }
+
public Boolean getBoolean(String string)
{
- AMQTypedValue value = _properties.get(string);
+ return getBoolean(new AMQShortString(string));
+ }
+
+ public Boolean getBoolean(AMQShortString string)
+ {
+ AMQTypedValue value = getProperty(string);
if (value != null && (value.getType() == AMQType.BOOLEAN))
{
return (Boolean) value.getValue();
@@ -70,9 +160,15 @@ public class FieldTable
}
}
+
public Byte getByte(String string)
{
- AMQTypedValue value = _properties.get(string);
+ return getByte(new AMQShortString(string));
+ }
+
+ public Byte getByte(AMQShortString string)
+ {
+ AMQTypedValue value = getProperty(string);
if (value != null && (value.getType() == AMQType.BYTE))
{
return (Byte) value.getValue();
@@ -85,7 +181,12 @@ public class FieldTable
public Short getShort(String string)
{
- AMQTypedValue value = _properties.get(string);
+ return getShort(new AMQShortString(string));
+ }
+
+ public Short getShort(AMQShortString string)
+ {
+ AMQTypedValue value = getProperty(string);
if (value != null && (value.getType() == AMQType.SHORT))
{
return (Short) value.getValue();
@@ -98,7 +199,12 @@ public class FieldTable
public Integer getInteger(String string)
{
- AMQTypedValue value = _properties.get(string);
+ return getInteger(new AMQShortString(string));
+ }
+
+ public Integer getInteger(AMQShortString string)
+ {
+ AMQTypedValue value = getProperty(string);
if (value != null && (value.getType() == AMQType.INT))
{
return (Integer) value.getValue();
@@ -111,7 +217,12 @@ public class FieldTable
public Long getLong(String string)
{
- AMQTypedValue value = _properties.get(string);
+ return getLong(new AMQShortString(string));
+ }
+
+ public Long getLong(AMQShortString string)
+ {
+ AMQTypedValue value = getProperty(string);
if (value != null && (value.getType() == AMQType.LONG))
{
return (Long) value.getValue();
@@ -124,7 +235,12 @@ public class FieldTable
public Float getFloat(String string)
{
- AMQTypedValue value = _properties.get(string);
+ return getFloat(new AMQShortString(string));
+ }
+
+ public Float getFloat(AMQShortString string)
+ {
+ AMQTypedValue value = getProperty(string);
if (value != null && (value.getType() == AMQType.FLOAT))
{
return (Float) value.getValue();
@@ -137,7 +253,12 @@ public class FieldTable
public Double getDouble(String string)
{
- AMQTypedValue value = _properties.get(string);
+ return getDouble(new AMQShortString(string));
+ }
+
+ public Double getDouble(AMQShortString string)
+ {
+ AMQTypedValue value = getProperty(string);
if (value != null && (value.getType() == AMQType.DOUBLE))
{
return (Double) value.getValue();
@@ -150,7 +271,12 @@ public class FieldTable
public String getString(String string)
{
- AMQTypedValue value = _properties.get(string);
+ return getString(new AMQShortString(string));
+ }
+
+ public String getString(AMQShortString string)
+ {
+ AMQTypedValue value = getProperty(string);
if ((value != null) && ((value.getType() == AMQType.WIDE_STRING) ||
(value.getType() == AMQType.ASCII_STRING)))
{
@@ -170,7 +296,12 @@ public class FieldTable
public Character getCharacter(String string)
{
- AMQTypedValue value = _properties.get(string);
+ return getCharacter(new AMQShortString(string));
+ }
+
+ public Character getCharacter(AMQShortString string)
+ {
+ AMQTypedValue value = getProperty(string);
if (value != null && (value.getType() == AMQType.ASCII_CHARACTER))
{
return (Character) value.getValue();
@@ -183,7 +314,12 @@ public class FieldTable
public byte[] getBytes(String string)
{
- AMQTypedValue value = _properties.get(string);
+ return getBytes(new AMQShortString(string));
+ }
+
+ public byte[] getBytes(AMQShortString string)
+ {
+ AMQTypedValue value = getProperty(string);
if (value != null && (value.getType() == AMQType.BINARY))
{
return (byte[]) value.getValue();
@@ -196,7 +332,12 @@ public class FieldTable
public Object getObject(String string)
{
- AMQTypedValue value = _properties.get(string);
+ return getObject(new AMQShortString(string));
+ }
+
+ public Object getObject(AMQShortString string)
+ {
+ AMQTypedValue value = getProperty(string);
if(value != null)
{
return value.getValue();
@@ -209,92 +350,163 @@ public class FieldTable
}
// ************ Setters
-
public Object setBoolean(String string, boolean b)
{
+ return setBoolean(new AMQShortString(string), b);
+ }
+
+ public Object setBoolean(AMQShortString string, boolean b)
+ {
checkPropertyName(string);
- return _properties.put(string, AMQType.BOOLEAN.asTypedValue(b));
+ return setProperty(string, AMQType.BOOLEAN.asTypedValue(b));
}
public Object setByte(String string, byte b)
{
+ return setByte(new AMQShortString(string), b);
+ }
+
+ public Object setByte(AMQShortString string, byte b)
+ {
checkPropertyName(string);
- return _properties.put(string, AMQType.BYTE.asTypedValue(b));
+ return setProperty(string, AMQType.BYTE.asTypedValue(b));
}
public Object setShort(String string, short i)
{
+ return setShort(new AMQShortString(string), i);
+ }
+
+ public Object setShort(AMQShortString string, short i)
+ {
checkPropertyName(string);
- return _properties.put(string, AMQType.SHORT.asTypedValue(i));
+ return setProperty(string, AMQType.SHORT.asTypedValue(i));
}
+
public Object setInteger(String string, int i)
{
+ return setInteger(new AMQShortString(string), i);
+ }
+
+ public Object setInteger(AMQShortString string, int i)
+ {
checkPropertyName(string);
- return _properties.put(string, AMQType.INT.asTypedValue(i));
+ return setProperty(string, AMQType.INT.asTypedValue(i));
}
+
public Object setLong(String string, long l)
{
+ return setLong(new AMQShortString(string), l);
+ }
+
+ public Object setLong(AMQShortString string, long l)
+ {
checkPropertyName(string);
- return _properties.put(string, AMQType.LONG.asTypedValue(l));
+ return setProperty(string, AMQType.LONG.asTypedValue(l));
}
- public Object setFloat(String string, float v)
+
+ public Object setFloat(String string, float f)
+ {
+ return setFloat(new AMQShortString(string), f);
+ }
+
+ public Object setFloat(AMQShortString string, float v)
{
checkPropertyName(string);
- return _properties.put(string, AMQType.FLOAT.asTypedValue(v));
+ return setProperty(string, AMQType.FLOAT.asTypedValue(v));
}
- public Object setDouble(String string, double v)
+ public Object setDouble(String string, double d)
+ {
+ return setDouble(new AMQShortString(string), d);
+ }
+
+
+ public Object setDouble(AMQShortString string, double v)
{
checkPropertyName(string);
- return _properties.put(string, AMQType.DOUBLE.asTypedValue(v));
+ return setProperty(string, AMQType.DOUBLE.asTypedValue(v));
+ }
+
+
+ public Object setString(String string, String s)
+ {
+ return setString(new AMQShortString(string), s);
}
- public Object setString(String string, String value)
+ public Object setAsciiString(AMQShortString string, String value)
{
checkPropertyName(string);
if (value == null)
{
- return _properties.put(string, AMQType.VOID.asTypedValue(null));
+ return setProperty(string, AMQType.VOID.asTypedValue(null));
}
else
{
- //FIXME: determine string encoding and set either WIDE or ASCII string
-// if ()
- {
- return _properties.put(string, AMQType.WIDE_STRING.asTypedValue(value));
- }
-// else
-// {
-// return _properties.put(string, AMQType.ASCII_STRING.asTypedValue(value));
-// }
+ return setProperty(string, AMQType.ASCII_STRING.asTypedValue(value));
}
}
+ public Object setString(AMQShortString string, String value)
+ {
+ checkPropertyName(string);
+ if (value == null)
+ {
+ return setProperty(string, AMQType.VOID.asTypedValue(null));
+ }
+ else
+ {
+ return setProperty(string, AMQType.LONG_STRING.asTypedValue(value));
+ }
+ }
+
+
public Object setChar(String string, char c)
{
+ return setChar(new AMQShortString(string), c);
+ }
+
+
+ public Object setChar(AMQShortString string, char c)
+ {
checkPropertyName(string);
- return _properties.put(string, AMQType.ASCII_CHARACTER.asTypedValue(c));
+ return setProperty(string, AMQType.ASCII_CHARACTER.asTypedValue(c));
}
- public Object setBytes(String string, byte[] bytes)
+
+ public Object setBytes(String string, byte[] b)
+ {
+ return setBytes(new AMQShortString(string), b);
+ }
+
+ public Object setBytes(AMQShortString string, byte[] bytes)
{
checkPropertyName(string);
- return _properties.put(string, AMQType.BINARY.asTypedValue(bytes));
+ return setProperty(string, AMQType.BINARY.asTypedValue(bytes));
}
public Object setBytes(String string, byte[] bytes, int start, int length)
{
+ return setBytes(new AMQShortString(string), bytes,start,length);
+ }
+
+ public Object setBytes(AMQShortString string, byte[] bytes, int start, int length)
+ {
checkPropertyName(string);
byte[] newBytes = new byte[length];
System.arraycopy(bytes,start,newBytes,0,length);
return setBytes(string, bytes);
}
+ public Object setObject(String string, Object o)
+ {
+ return setObject(new AMQShortString(string), o);
+ }
- public Object setObject(String string, Object object)
+ public Object setObject(AMQShortString string, Object object)
{
if (object instanceof Boolean)
{
@@ -343,7 +555,7 @@ public class FieldTable
public boolean isNullStringValue(String name)
{
- AMQTypedValue value = _properties.get(name);
+ AMQTypedValue value = getProperty(new AMQShortString(name));
return (value != null) && (value.getType() == AMQType.VOID);
}
@@ -351,7 +563,12 @@ public class FieldTable
public Enumeration getPropertyNames()
{
- return Collections.enumeration(_properties.keySet());
+ return Collections.enumeration(keys());
+ }
+
+ public boolean propertyExists(AMQShortString propertyName)
+ {
+ return itemExists(propertyName);
}
public boolean propertyExists(String propertyName)
@@ -359,25 +576,34 @@ public class FieldTable
return itemExists(propertyName);
}
- public boolean itemExists(String string)
+ public boolean itemExists(AMQShortString string)
{
+ initMapIfNecessary();
return _properties.containsKey(string);
}
+ public boolean itemExists(String string)
+ {
+ return itemExists(new AMQShortString(string));
+ }
+
public String toString()
{
+ initMapIfNecessary();
+// if (_encodedForm != null)
+// _encodedForm.rewind();
return _properties.toString();
}
- private void checkPropertyName(String propertyName)
+ private void checkPropertyName(AMQShortString propertyName)
{
if (propertyName == null)
{
throw new IllegalArgumentException("Property name must not be null");
}
- else if ("".equals(propertyName))
+ else if (propertyName.length()==0)
{
throw new IllegalArgumentException("Property name must not be the empty string");
}
@@ -386,7 +612,7 @@ public class FieldTable
}
- protected static void checkIdentiferFormat(String propertyName)
+ protected static void checkIdentiferFormat(AMQShortString propertyName)
{
// AMQP Spec: 4.2.5.5 Field Tables
// Guidelines for implementers:
@@ -448,20 +674,31 @@ public class FieldTable
public long getEncodedSize()
{
+ return _encodedSize;
+ }
+
+ private void recalculateEncodedSize()
+ {
+
int encodedSize = 0;
- for(Map.Entry<String,AMQTypedValue> e : _properties.entrySet())
+ if(_properties != null)
{
- encodedSize += EncodingUtils.encodedShortStringLength(e.getKey());
- encodedSize++; // the byte for the encoding Type
- encodedSize += e.getValue().getEncodingSize();
+ for(Map.Entry<AMQShortString,AMQTypedValue> e : _properties.entrySet())
+ {
+ encodedSize += EncodingUtils.encodedShortStringLength(e.getKey());
+ encodedSize++; // the byte for the encoding Type
+ encodedSize += e.getValue().getEncodingSize();
+ }
}
- return encodedSize;
+ _encodedSize = encodedSize;
}
public void addAll(FieldTable fieldTable)
{
+ initMapIfNecessary();
_properties.putAll(fieldTable._properties);
+ recalculateEncodedSize();
}
@@ -473,135 +710,213 @@ public class FieldTable
public Object processOverElements(FieldTableElementProcessor processor)
{
- for(Map.Entry<String,AMQTypedValue> e : _properties.entrySet())
+ initMapIfNecessary();
+ if(_properties != null)
{
- boolean result = processor.processElement(e.getKey(), e.getValue());
- if(!result)
+ for(Map.Entry<AMQShortString,AMQTypedValue> e : _properties.entrySet())
{
- break;
+ boolean result = processor.processElement(e.getKey().toString(), e.getValue());
+ if(!result)
+ {
+ break;
+ }
}
}
return processor.getResult();
+
+
}
public int size()
{
+ initMapIfNecessary();
return _properties.size();
+
}
public boolean isEmpty()
{
- return _properties.isEmpty();
+ return size() ==0;
}
- public boolean containsKey(String key)
+ public boolean containsKey(AMQShortString key)
{
+ initMapIfNecessary();
return _properties.containsKey(key);
}
+ public boolean containsKey(String key)
+ {
+ return containsKey(new AMQShortString(key));
+ }
+
public Set<String> keys()
{
- return _properties.keySet();
+ initMapIfNecessary();
+ Set<String> keys = new LinkedHashSet<String>();
+ for(AMQShortString key : _properties.keySet())
+ {
+ keys.add(key.toString());
+ }
+ return keys;
}
- public Object get(Object key)
+ public Object get(AMQShortString key)
{
- return getObject((String)key);
+ return getObject(key);
}
- public Object put(Object key, Object value)
+
+ public Object put(AMQShortString key, Object value)
{
- return setObject(key.toString(), value);
+ return setObject(key, value);
}
-
+
public Object remove(String key)
{
+
+ return remove(new AMQShortString(key));
+
+ }
+
+ public Object remove(AMQShortString key)
+ {
+ AMQTypedValue val = removeKey(key);
+ return val == null ? null : val.getValue();
+
+ }
+
+
+ public AMQTypedValue removeKey(AMQShortString key)
+ {
+ initMapIfNecessary();
+ _encodedForm = null;
AMQTypedValue value = _properties.remove(key);
- return value == null ? null : value.getValue();
+ if(value == null)
+ {
+ return null;
+ }
+ else
+ {
+ _encodedSize -= EncodingUtils.encodedShortStringLength(key);
+ _encodedSize--;
+ _encodedSize -= value.getEncodingSize();
+ return value;
+ }
+
}
public void clear()
{
+ initMapIfNecessary();
+ _encodedForm = null;
_properties.clear();
+ _encodedSize = 0;
}
- public Set keySet()
+ public Set<AMQShortString> keySet()
{
+ initMapIfNecessary();
return _properties.keySet();
}
private void putDataInBuffer(ByteBuffer buffer)
{
- final Iterator<Map.Entry<String,AMQTypedValue>> it = _properties.entrySet().iterator();
+ if(_encodedForm != null)
+ {
+ if (_encodedForm.remaining() == 0)
+ {
+ _encodedForm.rewind();
+ }
+ buffer.put(_encodedForm);
+ }
+ else if(_properties != null)
+ {
+ final Iterator<Map.Entry<AMQShortString,AMQTypedValue>> it = _properties.entrySet().iterator();
- //If there are values then write out the encoded Size... could check _encodedSize != 0
- // write out the total length, which we have kept up to date as data is added
+ //If there are values then write out the encoded Size... could check _encodedSize != 0
+ // write out the total length, which we have kept up to date as data is added
- while (it.hasNext())
- {
- final Map.Entry<String,AMQTypedValue> me = it.next();
- try
+ while (it.hasNext())
{
- if (_logger.isTraceEnabled())
+ final Map.Entry<AMQShortString,AMQTypedValue> me = it.next();
+ try
{
- _logger.trace("Writing Property:" + me.getKey() +
- " Type:" + me.getValue().getType() +
- " Value:" + me.getValue().getValue());
- _logger.trace("Buffer Position:" + buffer.position() +
- " Remaining:" + buffer.remaining());
- }
+ if (_logger.isTraceEnabled())
+ {
+ _logger.trace("Writing Property:" + me.getKey() +
+ " Type:" + me.getValue().getType() +
+ " Value:" + me.getValue().getValue());
+ _logger.trace("Buffer Position:" + buffer.position() +
+ " Remaining:" + buffer.remaining());
+ }
- //Write the actual parameter name
- EncodingUtils.writeShortStringBytes(buffer, me.getKey());
- me.getValue().writeToBuffer(buffer);
- }
- catch (Exception e)
- {
- if (_logger.isTraceEnabled())
+ //Write the actual parameter name
+ EncodingUtils.writeShortStringBytes(buffer, me.getKey());
+ me.getValue().writeToBuffer(buffer);
+ }
+ catch (Exception e)
{
- _logger.trace("Exception thrown:" + e);
- _logger.trace("Writing Property:" + me.getKey() +
- " Type:" + me.getValue().getType() +
- " Value:" + me.getValue().getValue());
- _logger.trace("Buffer Position:" + buffer.position() +
- " Remaining:" + buffer.remaining());
+ if (_logger.isTraceEnabled())
+ {
+ _logger.trace("Exception thrown:" + e);
+ _logger.trace("Writing Property:" + me.getKey() +
+ " Type:" + me.getValue().getType() +
+ " Value:" + me.getValue().getValue());
+ _logger.trace("Buffer Position:" + buffer.position() +
+ " Remaining:" + buffer.remaining());
+ }
+ throw new RuntimeException(e);
}
- throw new RuntimeException(e);
}
}
}
- public void setFromBuffer(ByteBuffer buffer, long length) throws AMQFrameDecodingException
+ private void setFromBuffer(ByteBuffer buffer, long length) throws AMQFrameDecodingException
{
- final boolean trace = _logger.isTraceEnabled();
- int sizeRead = 0;
- while (sizeRead < length)
+ final boolean trace = _logger.isTraceEnabled();
+ if(length > 0)
{
- int sizeRemaining = buffer.remaining();
- final String key = EncodingUtils.readShortString(buffer);
- AMQTypedValue value = AMQTypedValue.readFromBuffer(buffer);
- sizeRead += (sizeRemaining - buffer.remaining());
- if (trace)
+ final int expectedRemaining = buffer.remaining()-(int)length;
+
+ _properties = new LinkedHashMap<AMQShortString,AMQTypedValue>(INITIAL_HASHMAP_CAPACITY);
+
+ do
{
- _logger.trace("FieldTable::PropFieldTable(buffer," + length + "): Read type '" + value.getType() + "', key '" + key + "', value '" + value.getValue() + "' (now read " + sizeRead + " of " + length + " encoded bytes)...");
+
+ final AMQShortString key = EncodingUtils.readAMQShortString(buffer);
+ AMQTypedValue value = AMQTypedValue.readFromBuffer(buffer);
+
+ if (trace)
+ {
+ _logger.trace("FieldTable::PropFieldTable(buffer," + length + "): Read type '" + value.getType() + "', key '" + key + "', value '" + value.getValue() + "'");
+ }
+
+
+
+ _properties.put(key,value);
+
+
+
}
+ while (buffer.remaining() > expectedRemaining);
- _properties.put(key,value);
}
+ _encodedSize = length;
if (trace)
{
diff --git a/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java b/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java
index 7a160ef471..7246c4a1cf 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java
@@ -27,7 +27,21 @@ public class HeartbeatBody extends AMQBody
public static final byte TYPE = 8;
public static AMQFrame FRAME = new HeartbeatBody().toFrame();
- protected byte getFrameType()
+ public HeartbeatBody()
+ {
+
+ }
+
+ public HeartbeatBody(ByteBuffer buffer, long size)
+ {
+ if(size > 0)
+ {
+ //allow other implementations to have a payload, but ignore it:
+ buffer.skip((int) size);
+ }
+ }
+
+ public byte getFrameType()
{
return TYPE;
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBodyFactory.java b/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBodyFactory.java
index 97bd3d9253..c7ada708dc 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBodyFactory.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBodyFactory.java
@@ -24,7 +24,7 @@ import org.apache.mina.common.ByteBuffer;
public class HeartbeatBodyFactory implements BodyFactory
{
- public AMQBody createBody(ByteBuffer in) throws AMQFrameDecodingException
+ public AMQBody createBody(ByteBuffer in, long bodySize) throws AMQFrameDecodingException
{
return new HeartbeatBody();
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/JMSPropertyFieldTable.java b/java/common/src/main/java/org/apache/qpid/framing/JMSPropertyFieldTable.java
deleted file mode 100644
index d78034cf2f..0000000000
--- a/java/common/src/main/java/org/apache/qpid/framing/JMSPropertyFieldTable.java
+++ /dev/null
@@ -1,453 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
-*
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- *
- */
-package org.apache.qpid.framing;
-
-import org.apache.mina.common.ByteBuffer;
-import org.apache.qpid.AMQPInvalidClassException;
-
-import javax.jms.MessageFormatException;
-import javax.jms.JMSException;
-import java.util.Enumeration;
-
-
-public class JMSPropertyFieldTable
-{
- private FieldTable _fieldtable;
-
- public JMSPropertyFieldTable(FieldTable table)
- {
- _fieldtable = table;
- }
-
-
- private void checkPropertyName(String propertyName)
- {
- if (propertyName == null)
- {
- throw new IllegalArgumentException("Property name must not be null");
- }
- else if ("".equals(propertyName))
- {
- throw new IllegalArgumentException("Property name must not be the empty string");
- }
-
- checkIdentiferFormat(propertyName);
- }
-
- protected static void checkIdentiferFormat(String propertyName)
- {
-// JMS requirements 3.5.1 Property Names
-// Identifiers:
-// - An identifier is an unlimited-length character sequence that must begin
-// with a Java identifier start character; all following characters must be Java
-// identifier part characters. An identifier start character is any character for
-// which the method Character.isJavaIdentifierStart returns true. This includes
-// '_' and '$'. An identifier part character is any character for which the
-// method Character.isJavaIdentifierPart returns true.
-// - Identifiers cannot be the names NULL, TRUE, or FALSE.
-// – Identifiers cannot be NOT, AND, OR, BETWEEN, LIKE, IN, IS, or
-// ESCAPE.
-// – Identifiers are either header field references or property references. The
-// type of a property value in a message selector corresponds to the type
-// used to set the property. If a property that does not exist in a message is
-// referenced, its value is NULL. The semantics of evaluating NULL values
-// in a selector are described in Section 3.8.1.2, “Null Values.”
-// – The conversions that apply to the get methods for properties do not
-// apply when a property is used in a message selector expression. For
-// example, suppose you set a property as a string value, as in the
-// following:
-// myMessage.setStringProperty("NumberOfOrders", "2");
-// The following expression in a message selector would evaluate to false,
-// because a string cannot be used in an arithmetic expression:
-// "NumberOfOrders > 1"
-// – Identifiers are case sensitive.
-// – Message header field references are restricted to JMSDeliveryMode,
-// JMSPriority, JMSMessageID, JMSTimestamp, JMSCorrelationID, and
-// JMSType. JMSMessageID, JMSCorrelationID, and JMSType values may be
-// null and if so are treated as a NULL value.
-
- if (Boolean.getBoolean("strict-jms"))
- {
- // JMS start character
- if (!(Character.isJavaIdentifierStart(propertyName.charAt(0))))
- {
- throw new IllegalArgumentException("Identifier '" + propertyName + "' does not start with a valid JMS identifier start character");
- }
-
- // JMS part character
- int length = propertyName.length();
- for (int c = 1; c < length; c++)
- {
- if (!(Character.isJavaIdentifierPart(propertyName.charAt(c))))
- {
- throw new IllegalArgumentException("Identifier '" + propertyName + "' contains an invalid JMS identifier character");
- }
- }
-
- // JMS invalid names
- if ((propertyName.equals("NULL")
- || propertyName.equals("TRUE")
- || propertyName.equals("FALSE")
- || propertyName.equals("NOT")
- || propertyName.equals("AND")
- || propertyName.equals("OR")
- || propertyName.equals("BETWEEN")
- || propertyName.equals("LIKE")
- || propertyName.equals("IN")
- || propertyName.equals("IS")
- || propertyName.equals("ESCAPE")))
- {
- throw new IllegalArgumentException("Identifier '" + propertyName + "' is not allowed in JMS");
- }
- }
-
- }
-
- // MapMessage Interface
- public boolean getBoolean(String string) throws JMSException
- {
- Boolean b = _fieldtable.getBoolean(string);
-
- if (b == null)
- {
- if (_fieldtable.containsKey(string))
- {
- Object str = _fieldtable.getObject(string);
-
- if (str == null || !(str instanceof String))
- {
- throw new MessageFormatException("getBoolean can't use " + string + " item.");
- }
- else
- {
- return Boolean.valueOf((String) str);
- }
- }
- else
- {
- b = Boolean.valueOf(null);
- }
- }
-
- return b;
- }
-
- public char getCharacter(String string) throws JMSException
- {
- Character c = _fieldtable.getCharacter(string);
-
- if (c == null)
- {
- if (_fieldtable.isNullStringValue(string))
- {
- throw new NullPointerException("Cannot convert null char");
- }
- else
- {
- throw new MessageFormatException("getChar can't use " + string + " item.");
- }
- }
- else
- {
- return (char) c;
- }
- }
-
- public byte[] getBytes(String string) throws JMSException
- {
- byte[] bs = _fieldtable.getBytes(string);
-
- if (bs == null)
- {
- throw new MessageFormatException("getBytes can't use " + string + " item.");
- }
- else
- {
- return bs;
- }
- }
-
- public byte getByte(String string) throws JMSException
- {
- Byte b = _fieldtable.getByte(string);
- if (b == null)
- {
- if (_fieldtable.containsKey(string))
- {
- Object str = _fieldtable.getObject(string);
-
- if (str == null || !(str instanceof String))
- {
- throw new MessageFormatException("getByte can't use " + string + " item.");
- }
- else
- {
- return Byte.valueOf((String) str);
- }
- }
- else
- {
- b = Byte.valueOf(null);
- }
- }
-
- return b;
- }
-
- public short getShort(String string) throws JMSException
- {
- Short s = _fieldtable.getShort(string);
-
- if (s == null)
- {
- s = Short.valueOf(getByte(string));
- }
-
- return s;
- }
-
- public int getInteger(String string) throws JMSException
- {
- Integer i = _fieldtable.getInteger(string);
-
- if (i == null)
- {
- i = Integer.valueOf(getShort(string));
- }
-
- return i;
- }
-
- public long getLong(String string) throws JMSException
- {
- Long l = _fieldtable.getLong(string);
-
- if (l == null)
- {
- l = Long.valueOf(getInteger(string));
- }
-
- return l;
- }
-
- public float getFloat(String string) throws JMSException
- {
- Float f = _fieldtable.getFloat(string);
-
- if (f == null)
- {
- if (_fieldtable.containsKey(string))
- {
- Object str = _fieldtable.getObject(string);
-
- if (str == null || !(str instanceof String))
- {
- throw new MessageFormatException("getFloat can't use " + string + " item.");
- }
- else
- {
- return Float.valueOf((String) str);
- }
- }
- else
- {
- f = Float.valueOf(null);
- }
-
- }
-
- return f;
- }
-
- public double getDouble(String string) throws JMSException
- {
- Double d = _fieldtable.getDouble(string);
-
- if (d == null)
- {
- d = Double.valueOf(getFloat(string));
- }
-
- return d;
- }
-
- public String getString(String string) throws JMSException
- {
- String s = _fieldtable.getString(string);
-
- if (s == null)
- {
- if (_fieldtable.containsKey(string))
- {
- Object o = _fieldtable.getObject(string);
- if (o instanceof byte[])
- {
- throw new MessageFormatException("getObject couldn't find " + string + " item.");
- }
- else
- {
- if (o == null)
- {
- return null;
- }
- else
- {
- s = String.valueOf(o);
- }
- }
- }
- }
-
- return s;
- }
-
- public Object getObject(String string) throws JMSException
- {
- return _fieldtable.getObject(string);
- }
-
- public void setBoolean(String string, boolean b) throws JMSException
- {
- checkPropertyName(string);
- _fieldtable.setBoolean(string, b);
- }
-
- public void setChar(String string, char c) throws JMSException
- {
- checkPropertyName(string);
- _fieldtable.setChar(string, c);
- }
-
- public Object setBytes(String string, byte[] bytes)
- {
- return _fieldtable.setBytes(string, bytes, 0, bytes.length);
- }
-
- public Object setBytes(String string, byte[] bytes, int start, int length)
- {
- return _fieldtable.setBytes(string, bytes, start, length);
- }
-
- public void setByte(String string, byte b) throws JMSException
- {
- checkPropertyName(string);
- _fieldtable.setByte(string, b);
- }
-
- public void setShort(String string, short i) throws JMSException
- {
- checkPropertyName(string);
- _fieldtable.setShort(string, i);
- }
-
- public void setInteger(String string, int i) throws JMSException
- {
- checkPropertyName(string);
- _fieldtable.setInteger(string, i);
- }
-
- public void setLong(String string, long l) throws JMSException
- {
- checkPropertyName(string);
- _fieldtable.setLong(string, l);
- }
-
- public void setFloat(String string, float v) throws JMSException
- {
- checkPropertyName(string);
- _fieldtable.setFloat(string, v);
- }
-
- public void setDouble(String string, double v) throws JMSException
- {
- checkPropertyName(string);
- _fieldtable.setDouble(string, v);
- }
-
- public void setString(String string, String string1) throws JMSException
- {
- checkPropertyName(string);
- _fieldtable.setString(string, string1);
- }
-
- public void setObject(String string, Object object) throws JMSException
- {
- checkPropertyName(string);
- try
- {
- _fieldtable.setObject(string, object);
- }
- catch (AMQPInvalidClassException aice)
- {
- throw new MessageFormatException("Only primatives are allowed object is:" + object.getClass());
- }
- }
-
- public boolean itemExists(String string) throws JMSException
- {
- return _fieldtable.containsKey(string);
- }
-
- public void setFieldTable(FieldTable headers)
- {
- _fieldtable = headers;
- }
-
- public Enumeration getPropertyNames()
- {
- return _fieldtable.getPropertyNames();
- }
-
- public void clear()
- {
- _fieldtable.clear();
- }
-
- public boolean propertyExists(String propertyName)
- {
- return _fieldtable.propertyExists(propertyName);
- }
-
- public Object put(Object key, Object value)
- {
- return _fieldtable.put(key, value);
- }
-
- public Object remove(String propertyName)
- {
- return _fieldtable.remove(propertyName);
- }
-
- public boolean isEmpty()
- {
- return _fieldtable.isEmpty();
- }
-
- public void writeToBuffer(ByteBuffer data)
- {
- _fieldtable.writeToBuffer(data);
- }
-
- public Enumeration getMapNames()
- {
- return getPropertyNames();
- }
-}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java b/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java
index f2d1a70cdc..697a0f4249 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java
@@ -164,4 +164,14 @@ public class ProtocolInitiation extends AMQDataBlock implements EncodableAMQData
protocolMajor + "." + protocolMinor + " not found in protocol version list.");
}
}
+
+ public String toString()
+ {
+ StringBuffer buffer = new StringBuffer(new String(header));
+ buffer.append(Integer.toHexString(protocolClass));
+ buffer.append(Integer.toHexString(protocolInstance));
+ buffer.append(Integer.toHexString(protocolMajor));
+ buffer.append(Integer.toHexString(protocolMinor));
+ return buffer.toString();
+ }
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/RequestManager.java b/java/common/src/main/java/org/apache/qpid/framing/RequestManager.java
index 836c4ad985..eab7ad0132 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/RequestManager.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/RequestManager.java
@@ -40,7 +40,7 @@ public class RequestManager
* to be known.
*/
private boolean serverFlag;
- private int connectionId;
+ private long connectionId;
/**
* Request and response frames must have a requestID and responseID which
@@ -56,7 +56,7 @@ public class RequestManager
private ConcurrentHashMap<Long, AMQMethodListener> requestSentMap;
- public RequestManager(int connectionId, int channel, AMQProtocolWriter protocolWriter, boolean serverFlag)
+ public RequestManager(long connectionId, int channel, AMQProtocolWriter protocolWriter, boolean serverFlag)
{
this.channel = channel;
this.protocolWriter = protocolWriter;
diff --git a/java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java b/java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java
index 3148603d65..9896bde5f8 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java
@@ -43,7 +43,7 @@ public class ResponseManager
* to be known.
*/
private boolean serverFlag;
- private int connectionId;
+ private long connectionId;
private int maxAccumulatedResponses = 20; // Default
// private Class currentResponseMethodBodyClass;
@@ -80,11 +80,18 @@ public class ResponseManager
{
return (int)(requestId - o.requestId);
}
+
+ public String toString()
+ {
+ return requestId + ":" + (responseMethodBody == null ?
+ "null" :
+ "C" + responseMethodBody.getClazz() + " M" + responseMethodBody.getMethod());
+ }
}
private ConcurrentHashMap<Long, ResponseStatus> responseMap;
- public ResponseManager(int connectionId, int channel, AMQMethodListener methodListener,
+ public ResponseManager(long connectionId, int channel, AMQMethodListener methodListener,
AMQProtocolWriter protocolWriter, boolean serverFlag)
{
this.channel = channel;
diff --git a/java/common/src/main/java/org/apache/qpid/framing/SmallCompositeAMQDataBlock.java b/java/common/src/main/java/org/apache/qpid/framing/SmallCompositeAMQDataBlock.java
new file mode 100644
index 0000000000..0dd1bdc102
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpid/framing/SmallCompositeAMQDataBlock.java
@@ -0,0 +1,97 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.framing;
+
+import org.apache.mina.common.ByteBuffer;
+
+public class SmallCompositeAMQDataBlock extends AMQDataBlock implements EncodableAMQDataBlock
+{
+ private ByteBuffer _encodedBlock;
+
+ private AMQDataBlock _block;
+
+ public SmallCompositeAMQDataBlock(AMQDataBlock block)
+ {
+ _block = block;
+ }
+
+ /**
+ * The encoded block will be logically first before the AMQDataBlocks which are encoded
+ * into the buffer afterwards.
+ * @param encodedBlock already-encoded data
+ * @param block a block to be encoded.
+ */
+ public SmallCompositeAMQDataBlock(ByteBuffer encodedBlock, AMQDataBlock block)
+ {
+ this(block);
+ _encodedBlock = encodedBlock;
+ }
+
+ public AMQDataBlock getBlock()
+ {
+ return _block;
+ }
+
+ public ByteBuffer getEncodedBlock()
+ {
+ return _encodedBlock;
+ }
+
+ public long getSize()
+ {
+ long frameSize = _block.getSize();
+
+ if (_encodedBlock != null)
+ {
+ _encodedBlock.rewind();
+ frameSize += _encodedBlock.remaining();
+ }
+ return frameSize;
+ }
+
+ public void writePayload(ByteBuffer buffer)
+ {
+ if (_encodedBlock != null)
+ {
+ buffer.put(_encodedBlock);
+ }
+ _block.writePayload(buffer);
+
+ }
+
+ public String toString()
+ {
+ if (_block == null)
+ {
+ return "No blocks contained in composite frame";
+ }
+ else
+ {
+ StringBuilder buf = new StringBuilder(this.getClass().getName());
+ buf.append("{encodedBlock=").append(_encodedBlock);
+
+ buf.append(" _block=[").append(_block.toString()).append("]");
+
+ buf.append("}");
+ return buf.toString();
+ }
+ }
+}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/VersionSpecificRegistry.java b/java/common/src/main/java/org/apache/qpid/framing/VersionSpecificRegistry.java
new file mode 100644
index 0000000000..9bc8232d61
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpid/framing/VersionSpecificRegistry.java
@@ -0,0 +1,141 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.framing;
+
+import org.apache.mina.common.ByteBuffer;
+import org.apache.log4j.Logger;
+
+public class VersionSpecificRegistry
+{
+ private static final Logger _log = Logger.getLogger(VersionSpecificRegistry.class);
+
+
+ private final byte _protocolMajorVersion;
+ private final byte _protocolMinorVersion;
+
+ private static final int DEFAULT_MAX_CLASS_ID = 200;
+ private static final int DEFAULT_MAX_METHOD_ID = 50;
+
+ private AMQMethodBodyInstanceFactory[][] _registry = new AMQMethodBodyInstanceFactory[DEFAULT_MAX_CLASS_ID][];
+
+ public VersionSpecificRegistry(byte major, byte minor)
+ {
+ _protocolMajorVersion = major;
+ _protocolMinorVersion = minor;
+ }
+
+ public byte getProtocolMajorVersion()
+ {
+ return _protocolMajorVersion;
+ }
+
+ public byte getProtocolMinorVersion()
+ {
+ return _protocolMinorVersion;
+ }
+
+ public AMQMethodBodyInstanceFactory getMethodBody(final short classID, final short methodID)
+ {
+ try
+ {
+ return _registry[classID][methodID];
+ }
+ catch (IndexOutOfBoundsException e)
+ {
+ return null;
+ }
+ catch (NullPointerException e)
+ {
+ return null;
+ }
+ }
+
+ public void registerMethod(final short classID, final short methodID, final AMQMethodBodyInstanceFactory instanceFactory)
+ {
+ if(_registry.length <= classID)
+ {
+ AMQMethodBodyInstanceFactory[][] oldRegistry = _registry;
+ _registry = new AMQMethodBodyInstanceFactory[classID+1][];
+ System.arraycopy(oldRegistry, 0, _registry, 0, oldRegistry.length);
+ }
+
+ if(_registry[classID] == null)
+ {
+ _registry[classID] = new AMQMethodBodyInstanceFactory[methodID > DEFAULT_MAX_METHOD_ID ? methodID + 1 : DEFAULT_MAX_METHOD_ID + 1];
+ }
+ else if(_registry[classID].length <= methodID)
+ {
+ AMQMethodBodyInstanceFactory[] oldMethods = _registry[classID];
+ _registry[classID] = new AMQMethodBodyInstanceFactory[methodID+1];
+ System.arraycopy(oldMethods,0,_registry[classID],0,oldMethods.length);
+ }
+
+ _registry[classID][methodID] = instanceFactory;
+
+ }
+
+
+ public AMQMethodBody get(short classID, short methodID, ByteBuffer in, long size)
+ throws AMQFrameDecodingException
+ {
+ AMQMethodBodyInstanceFactory bodyFactory;
+ try
+ {
+ bodyFactory = _registry[classID][methodID];
+ }
+ catch(NullPointerException e)
+ {
+ throw new AMQFrameDecodingException(_log,
+ "Class " + classID + " unknown in AMQP version " + _protocolMajorVersion + "-" + _protocolMinorVersion
+ + " (while trying to decode class " + classID + " method " + methodID + ".");
+ }
+ catch(IndexOutOfBoundsException e)
+ {
+ if(classID >= _registry.length)
+ {
+ throw new AMQFrameDecodingException(_log,
+ "Class " + classID + " unknown in AMQP version " + _protocolMajorVersion + "-" + _protocolMinorVersion
+ + " (while trying to decode class " + classID + " method " + methodID + ".");
+
+ }
+ else
+ {
+ throw new AMQFrameDecodingException(_log,
+ "Method " + methodID + " unknown in AMQP version " + _protocolMajorVersion + "-" + _protocolMinorVersion
+ + " (while trying to decode class " + classID + " method " + methodID + ".");
+
+ }
+ }
+
+
+ if (bodyFactory == null)
+ {
+ throw new AMQFrameDecodingException(_log,
+ "Method " + methodID + " unknown in AMQP version " + _protocolMajorVersion + "-" + _protocolMinorVersion
+ + " (while trying to decode class " + classID + " method " + methodID + ".");
+ }
+
+
+ return bodyFactory.newInstance(_protocolMajorVersion, _protocolMinorVersion, classID, methodID, in, size);
+
+
+ }
+}
diff --git a/java/common/src/main/java/org/apache/qpid/pool/Event.java b/java/common/src/main/java/org/apache/qpid/pool/Event.java
index 7364b9293a..43ff8f6a19 100644
--- a/java/common/src/main/java/org/apache/qpid/pool/Event.java
+++ b/java/common/src/main/java/org/apache/qpid/pool/Event.java
@@ -25,90 +25,66 @@ import org.apache.mina.common.IoFilter;
import org.apache.mina.common.IoSession;
import org.apache.mina.common.IdleStatus;
-/**
- * Represents an operation on IoFilter.
- */
-enum EventType
-{
- OPENED, CLOSED, READ, WRITE, WRITTEN, RECEIVED, SENT, IDLE, EXCEPTION
-}
-class Event
+abstract public class Event
{
- private static final Logger _log = Logger.getLogger(Event.class);
-
- private final EventType type;
- private final IoFilter.NextFilter nextFilter;
- private final Object data;
-
- public Event(IoFilter.NextFilter nextFilter, EventType type, Object data)
- {
- this.type = type;
- this.nextFilter = nextFilter;
- this.data = data;
- if (type == EventType.EXCEPTION)
- {
- _log.error("Exception event constructed: " + data, (Throwable) data);
- }
- }
- public Object getData()
+ public Event()
{
- return data;
}
- public IoFilter.NextFilter getNextFilter()
- {
- return nextFilter;
- }
+ abstract public void process(IoSession session);
- public EventType getType()
+ public static final class ReceivedEvent extends Event
{
- return type;
- }
+ private final Object _data;
- void process(IoSession session)
- {
- if (_log.isDebugEnabled())
- {
- _log.debug("Processing " + this);
- }
- if (type == EventType.RECEIVED)
- {
- nextFilter.messageReceived(session, data);
- //ByteBufferUtil.releaseIfPossible( data );
- }
- else if (type == EventType.SENT)
+ private final IoFilter.NextFilter _nextFilter;
+
+ public ReceivedEvent(final IoFilter.NextFilter nextFilter, final Object data)
{
- nextFilter.messageSent(session, data);
- //ByteBufferUtil.releaseIfPossible( data );
+ super();
+ _nextFilter = nextFilter;
+ _data = data;
}
- else if (type == EventType.EXCEPTION)
+
+ public void process(IoSession session)
{
- nextFilter.exceptionCaught(session, (Throwable) data);
+ _nextFilter.messageReceived(session, _data);
}
- else if (type == EventType.IDLE)
+
+ public IoFilter.NextFilter getNextFilter()
{
- nextFilter.sessionIdle(session, (IdleStatus) data);
+ return _nextFilter;
}
- else if (type == EventType.OPENED)
+ }
+
+
+ public static final class WriteEvent extends Event
+ {
+ private final IoFilter.WriteRequest _data;
+ private final IoFilter.NextFilter _nextFilter;
+
+ public WriteEvent(final IoFilter.NextFilter nextFilter, final IoFilter.WriteRequest data)
{
- nextFilter.sessionOpened(session);
+ super();
+ _nextFilter = nextFilter;
+ _data = data;
}
- else if (type == EventType.WRITE)
+
+
+ public void process(IoSession session)
{
- nextFilter.filterWrite(session, (IoFilter.WriteRequest) data);
+ _nextFilter.filterWrite(session, _data);
}
- else if (type == EventType.CLOSED)
+
+ public IoFilter.NextFilter getNextFilter()
{
- nextFilter.sessionClosed(session);
+ return _nextFilter;
}
}
- public String toString()
- {
- return "Event: type " + type + ", data: " + data;
- }
+
}
diff --git a/java/common/src/main/java/org/apache/qpid/pool/Job.java b/java/common/src/main/java/org/apache/qpid/pool/Job.java
index b9673cd48f..9b3bcfa008 100644
--- a/java/common/src/main/java/org/apache/qpid/pool/Job.java
+++ b/java/common/src/main/java/org/apache/qpid/pool/Job.java
@@ -30,13 +30,13 @@ import java.util.concurrent.atomic.AtomicInteger;
* Holds events for a session that will be processed asynchronously by
* the thread pool in PoolingFilter.
*/
-class Job implements Runnable
+public class Job implements Runnable
{
private final int _maxEvents;
private final IoSession _session;
private final java.util.Queue<Event> _eventQueue = new ConcurrentLinkedQueue<Event>();
private final AtomicBoolean _active = new AtomicBoolean();
- private final AtomicInteger _refCount = new AtomicInteger();
+ //private final AtomicInteger _refCount = new AtomicInteger();
private final JobCompletionHandler _completionHandler;
Job(IoSession session, JobCompletionHandler completionHandler, int maxEvents)
@@ -45,21 +45,21 @@ class Job implements Runnable
_completionHandler = completionHandler;
_maxEvents = maxEvents;
}
-
- void acquire()
- {
- _refCount.incrementAndGet();
- }
-
- void release()
- {
- _refCount.decrementAndGet();
- }
-
- boolean isReferenced()
- {
- return _refCount.get() > 0;
- }
+//
+// void acquire()
+// {
+// _refCount.incrementAndGet();
+// }
+//
+// void release()
+// {
+// _refCount.decrementAndGet();
+// }
+//
+// boolean isReferenced()
+// {
+// return _refCount.get() > 0;
+// }
void add(Event evt)
{
diff --git a/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java b/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java
index 38cfa68c78..9d04827246 100644
--- a/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java
+++ b/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java
@@ -25,57 +25,60 @@ import org.apache.mina.common.IdleStatus;
import org.apache.mina.common.IoFilterAdapter;
import org.apache.mina.common.IoSession;
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.Set;
+import java.util.EnumSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
public class PoolingFilter extends IoFilterAdapter implements Job.JobCompletionHandler
{
private static final Logger _logger = Logger.getLogger(PoolingFilter.class);
- public static final Set<EventType> READ_EVENTS = new HashSet<EventType>(Arrays.asList(EventType.RECEIVED));
- public static final Set<EventType> WRITE_EVENTS = new HashSet<EventType>(Arrays.asList(EventType.WRITE));
private final ConcurrentMap<IoSession, Job> _jobs = new ConcurrentHashMap<IoSession, Job>();
private final ReferenceCountingExecutorService _poolReference;
- private final Set<EventType> _asyncTypes;
private final String _name;
private final int _maxEvents = Integer.getInteger("amqj.server.read_write_pool.max_events", 10);
- public PoolingFilter(ReferenceCountingExecutorService refCountingPool, Set<EventType> asyncTypes, String name)
+ public PoolingFilter(ReferenceCountingExecutorService refCountingPool, String name)
{
_poolReference = refCountingPool;
- _asyncTypes = asyncTypes;
_name = name;
}
- private void fireEvent(IoSession session, Event event)
+ void fireAsynchEvent(IoSession session, Event event)
{
- if (_asyncTypes.contains(event.getType()))
- {
- Job job = getJobForSession(session);
- job.acquire(); //prevents this job being removed from _jobs
- job.add(event);
+ Job job = getJobForSession(session);
+ // job.acquire(); //prevents this job being removed from _jobs
+ job.add(event);
- //Additional checks on pool to check that it hasn't shutdown.
- // The alternative is to catch the RejectedExecutionException that will result from executing on a shutdown pool
- if (job.activate() && _poolReference.getPool() != null && !_poolReference.getPool().isShutdown())
- {
- _poolReference.getPool().execute(job);
- }
- }
- else
+ //Additional checks on pool to check that it hasn't shutdown.
+ // The alternative is to catch the RejectedExecutionException that will result from executing on a shutdown pool
+ if (job.activate() && _poolReference.getPool() != null && !_poolReference.getPool().isShutdown())
{
- event.process(session);
+ _poolReference.getPool().execute(job);
}
+
+ }
+
+ public void createNewJobForSession(IoSession session)
+ {
+ Job job = new Job(session, this, _maxEvents);
+ session.setAttribute(_name, job);
}
private Job getJobForSession(IoSession session)
{
- Job job = _jobs.get(session);
- return job == null ? createJobForSession(session) : job;
+ return (Job) session.getAttribute(_name);
+
+/* if(job == null)
+ {
+ System.err.println("Error in " + _name);
+ Thread.dumpStack();
+ }
+
+
+ job = _jobs.get(session);
+ return job == null ? createJobForSession(session) : job;*/
}
private Job createJobForSession(IoSession session)
@@ -93,15 +96,16 @@ public class PoolingFilter extends IoFilterAdapter implements Job.JobCompletionH
//Job.JobCompletionHandler
public void completed(IoSession session, Job job)
{
- if (job.isComplete())
- {
- job.release();
- if (!job.isReferenced())
- {
- _jobs.remove(session);
- }
- }
- else
+// if (job.isComplete())
+// {
+// job.release();
+// if (!job.isReferenced())
+// {
+// _jobs.remove(session);
+// }
+// }
+// else
+ if(!job.isComplete())
{
// ritchiem : 2006-12-13 Do we need to perform the additional checks here?
// Can the pool be shutdown at this point?
@@ -114,45 +118,44 @@ public class PoolingFilter extends IoFilterAdapter implements Job.JobCompletionH
//IoFilter methods that are processed by threads on the pool
- public void sessionOpened(NextFilter nextFilter, IoSession session) throws Exception
+ public void sessionOpened(final NextFilter nextFilter, final IoSession session) throws Exception
{
- fireEvent(session, new Event(nextFilter, EventType.OPENED, null));
+ nextFilter.sessionOpened(session);
}
- public void sessionClosed(NextFilter nextFilter, IoSession session) throws Exception
+ public void sessionClosed(final NextFilter nextFilter, final IoSession session) throws Exception
{
- fireEvent(session, new Event(nextFilter, EventType.CLOSED, null));
+ nextFilter.sessionClosed(session);
}
- public void sessionIdle(NextFilter nextFilter, IoSession session,
- IdleStatus status) throws Exception
+ public void sessionIdle(final NextFilter nextFilter, final IoSession session,
+ final IdleStatus status) throws Exception
{
- fireEvent(session, new Event(nextFilter, EventType.IDLE, status));
+ nextFilter.sessionIdle(session, status);
}
- public void exceptionCaught(NextFilter nextFilter, IoSession session,
- Throwable cause) throws Exception
+ public void exceptionCaught(final NextFilter nextFilter, final IoSession session,
+ final Throwable cause) throws Exception
{
- fireEvent(session, new Event(nextFilter, EventType.EXCEPTION, cause));
+ nextFilter.exceptionCaught(session,cause);
}
- public void messageReceived(NextFilter nextFilter, IoSession session,
- Object message) throws Exception
+ public void messageReceived(final NextFilter nextFilter, final IoSession session,
+ final Object message) throws Exception
{
- //ByteBufferUtil.acquireIfPossible( message );
- fireEvent(session, new Event(nextFilter, EventType.RECEIVED, message));
+ nextFilter.messageReceived(session,message);
}
- public void messageSent(NextFilter nextFilter, IoSession session,
- Object message) throws Exception
+ public void messageSent(final NextFilter nextFilter, final IoSession session,
+ final Object message) throws Exception
{
- //ByteBufferUtil.acquireIfPossible( message );
- fireEvent(session, new Event(nextFilter, EventType.SENT, message));
+ nextFilter.messageSent(session, message);
}
- public void filterWrite(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) throws Exception
+ public void filterWrite(final NextFilter nextFilter, final IoSession session,
+ final WriteRequest writeRequest) throws Exception
{
- fireEvent(session, new Event(nextFilter, EventType.WRITE, writeRequest));
+ nextFilter.filterWrite(session, writeRequest);
}
//IoFilter methods that are processed on current thread (NOT on pooled thread)
@@ -188,5 +191,51 @@ public class PoolingFilter extends IoFilterAdapter implements Job.JobCompletionH
// when the reference count gets to zero we release the executor service
_poolReference.releaseExecutorService();
}
+
+ public static class AsynchReadPoolingFilter extends PoolingFilter
+ {
+
+ public AsynchReadPoolingFilter(ReferenceCountingExecutorService refCountingPool, String name)
+ {
+ super(refCountingPool, name);
+ }
+
+ public void messageReceived(final NextFilter nextFilter, final IoSession session,
+ final Object message) throws Exception
+ {
+ fireAsynchEvent(session, new Event.ReceivedEvent(nextFilter, message));
+ }
+
+
+ }
+
+ public static class AsynchWritePoolingFilter extends PoolingFilter
+ {
+
+ public AsynchWritePoolingFilter(ReferenceCountingExecutorService refCountingPool, String name)
+ {
+ super(refCountingPool, name);
+ }
+
+
+ public void filterWrite(final NextFilter nextFilter, final IoSession session,
+ final WriteRequest writeRequest) throws Exception
+ {
+ fireAsynchEvent(session, new Event.WriteEvent(nextFilter, writeRequest));
+ }
+
+ }
+
+ public static PoolingFilter createAynschReadPoolingFilter(ReferenceCountingExecutorService refCountingPool,String name)
+ {
+ return new AsynchReadPoolingFilter(refCountingPool,name);
+ }
+
+
+ public static PoolingFilter createAynschWritePoolingFilter(ReferenceCountingExecutorService refCountingPool,String name)
+ {
+ return new AsynchWritePoolingFilter(refCountingPool,name);
+ }
+
}
diff --git a/java/common/src/main/java/org/apache/qpid/pool/ReadWriteThreadModel.java b/java/common/src/main/java/org/apache/qpid/pool/ReadWriteThreadModel.java
index d4dbf1309a..c2f7f7ac48 100644
--- a/java/common/src/main/java/org/apache/qpid/pool/ReadWriteThreadModel.java
+++ b/java/common/src/main/java/org/apache/qpid/pool/ReadWriteThreadModel.java
@@ -26,15 +26,38 @@ import org.apache.mina.common.ThreadModel;
public class ReadWriteThreadModel implements ThreadModel
{
+
+ private static final ReadWriteThreadModel _instance = new ReadWriteThreadModel();
+
+ private final PoolingFilter _asynchronousReadFilter;
+ private final PoolingFilter _asynchronousWriteFilter;
+
+ private ReadWriteThreadModel()
+ {
+ final ReferenceCountingExecutorService executor = ReferenceCountingExecutorService.getInstance();
+ _asynchronousReadFilter = PoolingFilter.createAynschReadPoolingFilter(executor, "AsynchronousReadFilter");
+ _asynchronousWriteFilter = PoolingFilter.createAynschWritePoolingFilter(executor, "AsynchronousWriteFilter");
+ }
+
+ public PoolingFilter getAsynchronousReadFilter()
+ {
+ return _asynchronousReadFilter;
+ }
+
+ public PoolingFilter getAsynchronousWriteFilter()
+ {
+ return _asynchronousWriteFilter;
+ }
+
public void buildFilterChain(IoFilterChain chain) throws Exception
{
- ReferenceCountingExecutorService executor = ReferenceCountingExecutorService.getInstance();
- PoolingFilter asyncRead = new PoolingFilter(executor, PoolingFilter.READ_EVENTS,
- "AsynchronousReadFilter");
- PoolingFilter asyncWrite = new PoolingFilter(executor, PoolingFilter.WRITE_EVENTS,
- "AsynchronousWriteFilter");
-
- chain.addFirst("AsynchronousReadFilter", new ReferenceCountingIoFilter(asyncRead));
- chain.addLast("AsynchronousWriteFilter", new ReferenceCountingIoFilter(asyncWrite));
+
+ chain.addFirst("AsynchronousReadFilter", new ReferenceCountingIoFilter(_asynchronousReadFilter));
+ chain.addLast("AsynchronousWriteFilter", new ReferenceCountingIoFilter(_asynchronousWriteFilter));
+ }
+
+ public static ReadWriteThreadModel getInstance()
+ {
+ return _instance;
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java b/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java
index 8ea0d6ef1a..523a24f278 100644
--- a/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java
+++ b/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java
@@ -20,6 +20,8 @@
*/
package org.apache.qpid.protocol;
+import org.apache.qpid.framing.AMQShortString;
+
import java.util.Map;
import java.util.HashMap;
@@ -27,14 +29,14 @@ public final class AMQConstant
{
private int _code;
- private String _name;
+ private AMQShortString _name;
private static Map _codeMap = new HashMap();
private AMQConstant(int code, String name, boolean map)
{
_code = code;
- _name = name;
+ _name = new AMQShortString(name);
if (map)
{
_codeMap.put(new Integer(code), this);
@@ -51,7 +53,7 @@ public final class AMQConstant
return _code;
}
- public String getName()
+ public AMQShortString getName()
{
return _name;
}
diff --git a/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java b/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java
new file mode 100644
index 0000000000..a2d3de2f9e
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java
@@ -0,0 +1,29 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.protocol;
+
+import org.apache.qpid.framing.AMQDataBlock;
+import org.apache.qpid.framing.VersionSpecificRegistry;
+
+public interface AMQVersionAwareProtocolSession extends AMQProtocolWriter, ProtocolVersionAware
+{
+ public VersionSpecificRegistry getRegistry();
+}
diff --git a/java/common/src/main/java/org/apache/qpid/protocol/ProtocolVersionAware.java b/java/common/src/main/java/org/apache/qpid/protocol/ProtocolVersionAware.java
new file mode 100644
index 0000000000..cbd8b900f3
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpid/protocol/ProtocolVersionAware.java
@@ -0,0 +1,30 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.protocol;
+
+public interface ProtocolVersionAware
+{
+ public byte getProtocolMinorVersion();
+
+ public byte getProtocolMajorVersion();
+
+ public boolean isProtocolVersionEqual(byte majorVersion, byte minorVersion);
+}
diff --git a/java/common/src/main/java/org/apache/qpid/url/AMQBindingURL.java b/java/common/src/main/java/org/apache/qpid/url/AMQBindingURL.java
index b6a0bd500a..11e6652bd7 100644
--- a/java/common/src/main/java/org/apache/qpid/url/AMQBindingURL.java
+++ b/java/common/src/main/java/org/apache/qpid/url/AMQBindingURL.java
@@ -23,6 +23,7 @@ package org.apache.qpid.url;
import org.apache.qpid.url.BindingURL;
import org.apache.qpid.url.URLHelper;
import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.qpid.framing.AMQShortString;
import java.util.HashMap;
import java.net.URI;
@@ -31,10 +32,10 @@ import java.net.URISyntaxException;
public class AMQBindingURL implements BindingURL
{
String _url;
- String _exchangeClass;
- String _exchangeName;
- String _destinationName;
- String _queueName;
+ AMQShortString _exchangeClass;
+ AMQShortString _exchangeName;
+ AMQShortString _destinationName;
+ AMQShortString _queueName;
private HashMap<String, String> _options;
@@ -84,7 +85,7 @@ public class AMQBindingURL implements BindingURL
if (connection.getPath() == null ||
connection.getPath().equals(""))
{
- URLHelper.parseError(_url.indexOf(_exchangeName) + _exchangeName.length(),
+ URLHelper.parseError(_url.indexOf(_exchangeName.toString()) + _exchangeName.length(),
"Destination or Queue requried", _url);
}
else
@@ -92,7 +93,7 @@ public class AMQBindingURL implements BindingURL
int slash = connection.getPath().indexOf("/", 1);
if (slash == -1)
{
- URLHelper.parseError(_url.indexOf(_exchangeName) + _exchangeName.length(),
+ URLHelper.parseError(_url.indexOf(_exchangeName.toString()) + _exchangeName.length(),
"Destination requried", _url);
}
else
@@ -121,6 +122,26 @@ public class AMQBindingURL implements BindingURL
}
}
+ private void setExchangeClass(String exchangeClass)
+ {
+ setExchangeClass(new AMQShortString(exchangeClass));
+ }
+
+ private void setQueueName(String name)
+ {
+ setQueueName(new AMQShortString(name));
+ }
+
+ private void setDestinationName(String name)
+ {
+ setDestinationName(new AMQShortString(name));
+ }
+
+ private void setExchangeName(String exchangeName)
+ {
+ setExchangeName(new AMQShortString(exchangeName));
+ }
+
private void processOptions()
{
//this is where we would parse any options that needed more than just storage.
@@ -131,22 +152,22 @@ public class AMQBindingURL implements BindingURL
return _url;
}
- public String getExchangeClass()
+ public AMQShortString getExchangeClass()
{
return _exchangeClass;
}
- public void setExchangeClass(String exchangeClass)
+ public void setExchangeClass(AMQShortString exchangeClass)
{
_exchangeClass = exchangeClass;
}
- public String getExchangeName()
+ public AMQShortString getExchangeName()
{
return _exchangeName;
}
- public void setExchangeName(String name)
+ public void setExchangeName(AMQShortString name)
{
_exchangeName = name;
@@ -156,17 +177,17 @@ public class AMQBindingURL implements BindingURL
}
}
- public String getDestinationName()
+ public AMQShortString getDestinationName()
{
return _destinationName;
}
- public void setDestinationName(String name)
+ public void setDestinationName(AMQShortString name)
{
_destinationName = name;
}
- public String getQueueName()
+ public AMQShortString getQueueName()
{
if (_exchangeClass.equals(ExchangeDefaults.TOPIC_EXCHANGE_CLASS))
{
@@ -174,7 +195,7 @@ public class AMQBindingURL implements BindingURL
{
if (containsOption(BindingURL.OPTION_CLIENTID) && containsOption(BindingURL.OPTION_SUBSCRIPTION))
{
- return getOption(BindingURL.OPTION_CLIENTID + ":" + BindingURL.OPTION_SUBSCRIPTION);
+ return new AMQShortString(getOption(BindingURL.OPTION_CLIENTID + ":" + BindingURL.OPTION_SUBSCRIPTION));
}
else
{
@@ -192,7 +213,7 @@ public class AMQBindingURL implements BindingURL
}
}
- public void setQueueName(String name)
+ public void setQueueName(AMQShortString name)
{
_queueName = name;
}
@@ -212,7 +233,7 @@ public class AMQBindingURL implements BindingURL
return _options.containsKey(key);
}
- public String getRoutingKey()
+ public AMQShortString getRoutingKey()
{
if (_exchangeClass.equals(ExchangeDefaults.DIRECT_EXCHANGE_CLASS))
{
@@ -221,15 +242,15 @@ public class AMQBindingURL implements BindingURL
if (containsOption(BindingURL.OPTION_ROUTING_KEY))
{
- return getOption(OPTION_ROUTING_KEY);
+ return new AMQShortString(getOption(OPTION_ROUTING_KEY));
}
return getDestinationName();
}
- public void setRoutingKey(String key)
+ public void setRoutingKey(AMQShortString key)
{
- setOption(OPTION_ROUTING_KEY, key);
+ setOption(OPTION_ROUTING_KEY, key.toString());
}
diff --git a/java/common/src/main/java/org/apache/qpid/url/BindingURL.java b/java/common/src/main/java/org/apache/qpid/url/BindingURL.java
index 76690b3230..86a8420d30 100644
--- a/java/common/src/main/java/org/apache/qpid/url/BindingURL.java
+++ b/java/common/src/main/java/org/apache/qpid/url/BindingURL.java
@@ -20,6 +20,8 @@
*/
package org.apache.qpid.url;
+import org.apache.qpid.framing.AMQShortString;
+
/*
Binding URL format:
<exch_class>://<exch_name>/[<destination>]/[<queue>]?<option>='<value>'[,<option>='<value>']*
@@ -36,21 +38,21 @@ public interface BindingURL
String getURL();
- String getExchangeClass();
+ AMQShortString getExchangeClass();
- void setExchangeClass(String exchangeClass);
+ void setExchangeClass(AMQShortString name);
- String getExchangeName();
+ AMQShortString getExchangeName();
- void setExchangeName(String name);
+ void setExchangeName(AMQShortString name);
- String getDestinationName();
+ AMQShortString getDestinationName();
- void setDestinationName(String name);
+ void setDestinationName(AMQShortString name);
- String getQueueName();
+ AMQShortString getQueueName();
- void setQueueName(String name);
+ void setQueueName(AMQShortString name);
String getOption(String key);
@@ -58,9 +60,9 @@ public interface BindingURL
boolean containsOption(String key);
- String getRoutingKey();
+ AMQShortString getRoutingKey();
- void setRoutingKey(String key);
+ void setRoutingKey(AMQShortString key);
String toString();
}
diff --git a/java/common/src/main/java/org/apache/qpid/util/concurrent/BooleanLatch.java b/java/common/src/main/java/org/apache/qpid/util/concurrent/BooleanLatch.java
new file mode 100644
index 0000000000..70a5f2dc5e
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpid/util/concurrent/BooleanLatch.java
@@ -0,0 +1,42 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.util.concurrent;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * @author Apache Software Foundation
+ */
+public class BooleanLatch extends CountDownLatch
+{
+ public BooleanLatch()
+ {
+ super(1);
+ }
+
+ public void signal()
+ {
+ countDown();
+ }
+
+ public void await(long nanos) throws InterruptedException
+ {
+ await(nanos, TimeUnit.NANOSECONDS);
+ }
+}
diff --git a/java/common/src/test/java/org/apache/qpid/framing/JMSPropertyFieldTableTest.java b/java/common/src/test/java/org/apache/qpid/framing/JMSPropertyFieldTableTest.java
deleted file mode 100644
index 94c97ef808..0000000000
--- a/java/common/src/test/java/org/apache/qpid/framing/JMSPropertyFieldTableTest.java
+++ /dev/null
@@ -1,1016 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- *
- */
-package org.apache.qpid.framing;
-
-import junit.framework.Assert;
-import junit.framework.TestCase;
-
-import java.util.Enumeration;
-
-import org.apache.log4j.Logger;
-
-import javax.jms.JMSException;
-import javax.jms.MessageFormatException;
-
-public class JMSPropertyFieldTableTest extends TestCase
-{
-
- private static final Logger _logger = Logger.getLogger(JMSPropertyFieldTableTest.class);
-
-
- public void setUp()
- {
- System.getProperties().setProperty("strict-jms", "true");
- }
-
- public void tearDown()
- {
- System.getProperties().remove("strict-jms");
- }
-
- /**
- * Test that setting a similar named value replaces any previous value set on that name
- */
- public void testReplacement() throws JMSException
- {
- JMSPropertyFieldTable table1 = new JMSPropertyFieldTable(new FieldTable());
- //Set a boolean value
- table1.setBoolean("value", true);
-
- // reset value to an integer
- table1.setInteger("value", Integer.MAX_VALUE);
-
- //Check boolean value is null
- try
- {
- table1.getBoolean("value");
- }
- catch (MessageFormatException mfe)
- {
- //normal execution
- }
- // ... and integer value is good
- Assert.assertEquals(Integer.MAX_VALUE, table1.getInteger("value"));
- }
-
- public void testRemoval() throws JMSException
- {
- JMSPropertyFieldTable table1 = new JMSPropertyFieldTable(new FieldTable());
- //Set a boolean value
- table1.setBoolean("value", true);
-
- Assert.assertTrue(table1.getBoolean("value"));
-
- table1.remove("value");
-
- //Check boolean value is null
- try
- {
- table1.getBoolean("value");
- }
- catch (MessageFormatException mfe)
- {
- //normal execution
- }
- }
-
-
- /**
- * Set a boolean and check that we can only get it back as a boolean and a string
- * Check that attempting to lookup a non existent value returns null
- */
- public void testBoolean() throws JMSException
- {
- JMSPropertyFieldTable table1 = new JMSPropertyFieldTable(new FieldTable());
- table1.setBoolean("value", true);
- Assert.assertTrue(table1.propertyExists("value"));
-
- //Test Getting right value back
- Assert.assertEquals(true, table1.getBoolean("value"));
-
- //Check we don't get anything back for other gets
- try
- {
- table1.getByte("value");
- fail("Should throw MessageFormatException");
- }
- catch (MessageFormatException mfs)
- {
- //normal Execution
- }
-
- try
- {
- table1.getByte("value");
- fail("Should throw MessageFormatException");
- }
- catch (MessageFormatException mfs)
- {
- //normal Execution
- }
-
- try
- {
- table1.getShort("value");
- fail("Should throw MessageFormatException");
- }
- catch (MessageFormatException mfs)
- {
- //normal Execution
- }
-
- try
- {
- table1.getDouble("value");
- fail("Should throw MessageFormatException");
- }
- catch (MessageFormatException mfs)
- {
- //normal Execution
- }
-
- try
- {
- table1.getFloat("value");
- fail("Should throw MessageFormatException");
- }
- catch (MessageFormatException mfs)
- {
- //normal Execution
- }
-
- try
- {
- table1.getInteger("value");
- fail("Should throw MessageFormatException");
- }
- catch (MessageFormatException mfs)
- {
- //normal Execution
- }
-
- try
- {
- table1.getLong("value");
- fail("Should throw MessageFormatException");
- }
- catch (MessageFormatException mfs)
- {
- //normal Execution
- }
-
- //except value as a string
- Assert.assertEquals("true", table1.getString("value"));
-
- table1.remove("value");
- //but after a remove it doesn't
- Assert.assertFalse(table1.propertyExists("value"));
-
- // Table should now have zero length for encoding
- checkEmpty(table1);
-
- //Looking up an invalid value will return false
- Assert.assertFalse(table1.getBoolean("Rubbish"));
- }
-
- /**
- * Set a byte and check that we can only get it back as a byte and a string
- * Check that attempting to lookup a non existent value returns null
- */
- public void testByte() throws JMSException
- {
- JMSPropertyFieldTable table1 = new JMSPropertyFieldTable(new FieldTable());
- table1.setByte("value", Byte.MAX_VALUE);
- Assert.assertTrue(table1.propertyExists("value"));
-
- //Tets lookups we shouldn't get anything back for other gets
- //we should get right value back for this type ....
- try
- {
- table1.getBoolean("value");
- fail("Should throw MessageFormatException");
- }
- catch (MessageFormatException mfs)
- {
- //normal Execution
- }
-
- try
- {
- table1.getDouble("value");
- fail("Should throw MessageFormatException");
- }
- catch (MessageFormatException mfs)
- {
- //normal Execution
- }
- try
- {
- table1.getFloat("value");
- fail("Should throw MessageFormatException");
- }
- catch (MessageFormatException mfs)
- {
- //normal Execution
- }
-
- Assert.assertEquals(Byte.MAX_VALUE, (byte) table1.getShort("value"));
- Assert.assertEquals(Byte.MAX_VALUE, (byte) table1.getInteger("value"));
- Assert.assertEquals(Byte.MAX_VALUE, (byte) table1.getLong("value"));
- Assert.assertEquals(Byte.MAX_VALUE, table1.getByte("value"));
- //... and a the string value of it.
- Assert.assertEquals("" + Byte.MAX_VALUE, table1.getString("value"));
-
- table1.remove("value");
- //but after a remove it doesn't
- Assert.assertFalse(table1.propertyExists("value"));
-
- // Table should now have zero length for encoding
- checkEmpty(table1);
-
- //Looking up an invalid value returns null
- try
- {
- table1.getByte("Rubbish");
- fail("Should throw NumberFormatException");
- }
- catch (NumberFormatException mfs)
- {
- //normal Execution
- }
-
- }
-
-
- /**
- * Set a short and check that we can only get it back as a short and a string
- * Check that attempting to lookup a non existent value returns null
- */
- public void testShort() throws JMSException
- {
- JMSPropertyFieldTable table1 = new JMSPropertyFieldTable(new FieldTable());
- table1.setShort("value", Short.MAX_VALUE);
- Assert.assertTrue(table1.propertyExists("value"));
-
- //Tets lookups we shouldn't get anything back for other gets
- //we should get right value back for this type ....
-
- try
- {
- table1.getBoolean("value");
- fail("Should throw MessageFormatException");
- }
- catch (MessageFormatException mfs)
- {
- //normal Execution
- }
- try
- {
- table1.getByte("value");
- fail("Should throw MessageFormatException");
- }
- catch (MessageFormatException mfs)
- {
- //normal Execution
- }
-
- try
- {
- table1.getDouble("value");
- fail("Should throw MessageFormatException");
- }
- catch (MessageFormatException mfs)
- {
- //normal Execution
- }
- try
- {
- table1.getFloat("value");
- fail("Should throw MessageFormatException");
- }
- catch (MessageFormatException mfs)
- {
- //normal Execution
- }
-
-
- Assert.assertEquals(Short.MAX_VALUE, (short) table1.getLong("value"));
- Assert.assertEquals(Short.MAX_VALUE, (short) table1.getInteger("value"));
- Assert.assertEquals(Short.MAX_VALUE, table1.getShort("value"));
-
- //... and a the string value of it.
- Assert.assertEquals("" + Short.MAX_VALUE, table1.getString("value"));
-
- table1.remove("value");
- //but after a remove it doesn't
- Assert.assertFalse(table1.propertyExists("value"));
-
- // Table should now have zero length for encoding
- checkEmpty(table1);
-
- //Looking up an invalid value returns null
- try
- {
- table1.getShort("Rubbish");
- fail("Should throw NumberFormatException");
- }
- catch (NumberFormatException mfe)
- {
- //normal path
- }
- }
-
-
- /**
- * Set a double and check that we can only get it back as a double
- * Check that attempting to lookup a non existent value returns null
- */
- public void testDouble() throws JMSException
- {
- JMSPropertyFieldTable table1 = new JMSPropertyFieldTable(new FieldTable());
- table1.setDouble("value", Double.MAX_VALUE);
- Assert.assertTrue(table1.propertyExists("value"));
-
- //Tets lookups we shouldn't get anything back for other gets
- //we should get right value back for this type ....
- try
- {
- table1.getBoolean("value");
- fail("Should throw MessageFormatException");
- }
- catch (MessageFormatException mfs)
- {
- //normal Execution
- }
- try
- {
- table1.getByte("value");
- fail("Should throw MessageFormatException");
- }
- catch (MessageFormatException mfs)
- {
- //normal Execution
- }
- try
- {
- table1.getShort("value");
- fail("Should throw MessageFormatException");
- }
- catch (MessageFormatException mfs)
- {
- //normal Execution
- }
- try
- {
- table1.getFloat("value");
- fail("Should throw MessageFormatException");
- }
- catch (MessageFormatException mfs)
- {
- //normal Execution
- }
- try
- {
- table1.getInteger("value");
- fail("Should throw MessageFormatException");
- }
- catch (MessageFormatException mfs)
- {
- //normal Execution
- }
- try
- {
- table1.getLong("value");
- fail("Should throw MessageFormatException");
- }
- catch (MessageFormatException mfs)
- {
- //normal Execution
- }
-
- Assert.assertEquals(Double.MAX_VALUE, table1.getDouble("value"));
- //... and a the string value of it.
- Assert.assertEquals("" + Double.MAX_VALUE, table1.getString("value"));
-
- table1.remove("value");
- //but after a remove it doesn't
- Assert.assertFalse(table1.propertyExists("value"));
-
- // Table should now have zero length for encoding
- checkEmpty(table1);
-
- //Looking up an invalid value returns null
- try
- {
- table1.getDouble("Rubbish");
- fail("Should throw NullPointerException as float.valueOf will try sunreadJavaFormatString");
- }
- catch (NullPointerException mfe)
- {
- //normal path
- }
-
- }
-
-
- /**
- * Set a float and check that we can only get it back as a float
- * Check that attempting to lookup a non existent value returns null
- */
- public void testFloat() throws JMSException
- {
- JMSPropertyFieldTable table1 = new JMSPropertyFieldTable(new FieldTable());
- table1.setFloat("value", Float.MAX_VALUE);
- Assert.assertTrue(table1.propertyExists("value"));
-
- //Tets lookups we shouldn't get anything back for other gets
- //we should get right value back for this type ....
- try
- {
- table1.getBoolean("value");
- fail("Should throw MessageFormatException");
- }
- catch (MessageFormatException mfs)
- {
- //normal Execution
- }
- try
- {
- table1.getByte("value");
- fail("Should throw MessageFormatException");
- }
- catch (MessageFormatException mfs)
- {
- //normal Execution
- }
- try
- {
- table1.getShort("value");
- fail("Should throw MessageFormatException");
- }
- catch (MessageFormatException mfs)
- {
- //normal Execution
- }
- try
- {
- table1.getInteger("value");
- fail("Should throw MessageFormatException");
- }
- catch (MessageFormatException mfs)
- {
- //normal Execution
- }
- try
- {
- table1.getLong("value");
- fail("Should throw MessageFormatException");
- }
- catch (MessageFormatException mfs)
- {
- //normal Execution
- }
-
-
- Assert.assertEquals(Float.MAX_VALUE, table1.getFloat("value"));
- Assert.assertEquals(Float.MAX_VALUE, (float) table1.getDouble("value"));
-
- //... and a the string value of it.
- Assert.assertEquals("" + Float.MAX_VALUE, table1.getString("value"));
-
- table1.remove("value");
- //but after a remove it doesn't
- Assert.assertFalse(table1.propertyExists("value"));
-
- // Table should now have zero length for encoding
- checkEmpty(table1);
-
- //Looking up an invalid value returns null
- try
- {
- table1.getFloat("Rubbish");
- fail("Should throw NullPointerException as float.valueOf will try sunreadJavaFormatString");
- }
- catch (NullPointerException mfe)
- {
- //normal path
- }
- }
-
-
- /**
- * Set an int and check that we can only get it back as an int
- * Check that attempting to lookup a non existent value returns null
- */
- public void testInt() throws JMSException
- {
- JMSPropertyFieldTable table1 = new JMSPropertyFieldTable(new FieldTable());
- table1.setInteger("value", Integer.MAX_VALUE);
- Assert.assertTrue(table1.propertyExists("value"));
-
- //Tets lookups we shouldn't get anything back for other gets
- //we should get right value back for this type ....
- try
- {
- table1.getBoolean("value");
- fail("Should throw MessageFormatException");
- }
- catch (MessageFormatException mfs)
- {
- //normal Execution
- }
- try
- {
- table1.getByte("value");
- fail("Should throw MessageFormatException");
- }
- catch (MessageFormatException mfs)
- {
- //normal Execution
- }
- try
- {
- table1.getShort("value");
- fail("Should throw MessageFormatException");
- }
- catch (MessageFormatException mfs)
- {
- //normal Execution
- }
- try
- {
- table1.getDouble("value");
- fail("Should throw MessageFormatException");
- }
- catch (MessageFormatException mfs)
- {
- //normal Execution
- }
- try
- {
- table1.getFloat("value");
- fail("Should throw MessageFormatException");
- }
- catch (MessageFormatException mfs)
- {
- //normal Execution
- }
-
-
- Assert.assertEquals(Integer.MAX_VALUE, table1.getLong("value"));
-
- Assert.assertEquals(Integer.MAX_VALUE, table1.getInteger("value"));
-
- //... and a the string value of it.
- Assert.assertEquals("" + Integer.MAX_VALUE, table1.getString("value"));
-
- table1.remove("value");
- //but after a remove it doesn't
- Assert.assertFalse(table1.propertyExists("value"));
-
- // Table should now have zero length for encoding
- checkEmpty(table1);
-
- //Looking up an invalid value returns null
- try
- {
- table1.getInteger("Rubbish");
- fail("Should throw NumberFormatException");
- }
- catch (NumberFormatException mfe)
- {
- //normal path
- }
- }
-
-
- /**
- * Set a long and check that we can only get it back as a long
- * Check that attempting to lookup a non existent value returns null
- */
- public void testLong() throws JMSException
- {
- JMSPropertyFieldTable table1 = new JMSPropertyFieldTable(new FieldTable());
- table1.setLong("value", Long.MAX_VALUE);
- Assert.assertTrue(table1.propertyExists("value"));
-
- //Tets lookups we shouldn't get anything back for other gets
- //we should get right value back for this type ....
- try
- {
- table1.getBoolean("value");
- fail("Should throw MessageFormatException");
- }
- catch (MessageFormatException mfs)
- {
- //normal Execution
- }
- try
- {
- table1.getByte("value");
- fail("Should throw MessageFormatException");
- }
- catch (MessageFormatException mfs)
- {
- //normal Execution
- }
- try
- {
- table1.getShort("value");
- fail("Should throw MessageFormatException");
- }
- catch (MessageFormatException mfs)
- {
- //normal Execution
- }
- try
- {
- table1.getDouble("value");
- fail("Should throw MessageFormatException");
- }
- catch (MessageFormatException mfs)
- {
- //normal Execution
- }
- try
- {
- table1.getFloat("value");
- fail("Should throw MessageFormatException");
- }
- catch (MessageFormatException mfs)
- {
- //normal Execution
- }
- try
- {
- table1.getInteger("value");
- fail("Should throw MessageFormatException");
- }
- catch (MessageFormatException mfs)
- {
- //normal Execution
- }
-
-
- Assert.assertEquals(Long.MAX_VALUE, table1.getLong("value"));
-
- //... and a the string value of it.
- Assert.assertEquals("" + Long.MAX_VALUE, table1.getString("value"));
-
- table1.remove("value");
- //but after a remove it doesn't
- Assert.assertFalse(table1.propertyExists("value"));
-
- // Table should now have zero length for encoding
- checkEmpty(table1);
-
- //Looking up an invalid value
- try
- {
- table1.getLong("Rubbish");
- fail("Should throw NumberFormatException");
- }
- catch (NumberFormatException mfs)
- {
- //normal Execution
- }
-
- }
-
-
- /**
- * Calls all methods that can be used to check the table is empty
- * - getEncodedSize
- * - isEmpty
- * - length
- *
- * @param table to check is empty
- */
- private void checkEmpty(JMSPropertyFieldTable table)
- {
- Assert.assertFalse(table.getPropertyNames().hasMoreElements());
- }
-
-
- /**
- * Set a String and check that we can only get it back as a String
- * Check that attempting to lookup a non existent value returns null
- */
- public void testString() throws JMSException
- {
- JMSPropertyFieldTable table1 = new JMSPropertyFieldTable(new FieldTable());
- table1.setString("value", "Hello");
- Assert.assertTrue(table1.propertyExists("value"));
-
- //Tets lookups we shouldn't get anything back for other gets
- //we should get right value back for this type ....
- Assert.assertEquals(false, table1.getBoolean("value"));
-
- try
- {
- table1.getByte("value");
- fail("Should throw NumberFormatException");
- }
- catch (NumberFormatException nfs)
- {
- //normal Execution
- }
- try
- {
- table1.getShort("value");
- fail("Should throw NumberFormatException");
- }
- catch (NumberFormatException nfs)
- {
- //normal Execution
- }
- try
- {
- table1.getDouble("value");
- fail("Should throw NumberFormatException");
- }
- catch (NumberFormatException nfs)
- {
- //normal Execution
- }
- try
- {
- table1.getFloat("value");
- fail("Should throw NumberFormatException");
- }
- catch (NumberFormatException nfs)
- {
- //normal Execution
- }
- try
- {
- table1.getInteger("value");
- fail("Should throw NumberFormatException");
- }
- catch (NumberFormatException nfs)
- {
- //normal Execution
- }
- try
- {
- table1.getLong("value");
- fail("Should throw NumberFormatException");
- }
- catch (NumberFormatException nfs)
- {
- //normal Execution
- }
-
- Assert.assertEquals("Hello", table1.getString("value"));
-
- table1.remove("value");
- //but after a remove it doesn't
- Assert.assertFalse(table1.propertyExists("value"));
-
- checkEmpty(table1);
-
- //Looking up an invalid value returns null
- Assert.assertEquals(null, table1.getString("Rubbish"));
-
- //Additional Test that haven't been covered for string
- table1.setObject("value", "Hello");
- //Check that it was set correctly
- Assert.assertEquals("Hello", table1.getString("value"));
- }
-
-
- public void testValues() throws JMSException
- {
- JMSPropertyFieldTable table = new JMSPropertyFieldTable(new FieldTable());
- table.setBoolean("bool", true);
- table.setDouble("double", Double.MAX_VALUE);
- table.setFloat("float", Float.MAX_VALUE);
- table.setInteger("int", Integer.MAX_VALUE);
- table.setLong("long", Long.MAX_VALUE);
- table.setShort("short", Short.MAX_VALUE);
- table.setString("string", "Hello");
- table.setString("nullstring", null);
-
- table.setObject("objectbool", true);
- table.setObject("objectdouble", Double.MAX_VALUE);
- table.setObject("objectfloat", Float.MAX_VALUE);
- table.setObject("objectint", Integer.MAX_VALUE);
- table.setObject("objectlong", Long.MAX_VALUE);
- table.setObject("objectshort", Short.MAX_VALUE);
- table.setObject("objectstring", "Hello");
-
-
- Assert.assertEquals(true, table.getBoolean("bool"));
-
- Assert.assertEquals(Double.MAX_VALUE, table.getDouble("double"));
- Assert.assertEquals(Float.MAX_VALUE, table.getFloat("float"));
- Assert.assertEquals(Integer.MAX_VALUE, table.getInteger("int"));
- Assert.assertEquals(Long.MAX_VALUE, table.getLong("long"));
- Assert.assertEquals(Short.MAX_VALUE, table.getShort("short"));
- Assert.assertEquals("Hello", table.getString("string"));
- Assert.assertEquals(null, table.getString("null-string"));
-
- Assert.assertEquals(true, table.getObject("objectbool"));
- Assert.assertEquals(Double.MAX_VALUE, table.getObject("objectdouble"));
- Assert.assertEquals(Float.MAX_VALUE, table.getObject("objectfloat"));
- Assert.assertEquals(Integer.MAX_VALUE, table.getObject("objectint"));
- Assert.assertEquals(Long.MAX_VALUE, table.getObject("objectlong"));
- Assert.assertEquals(Short.MAX_VALUE, table.getObject("objectshort"));
- Assert.assertEquals("Hello", table.getObject("objectstring"));
- }
-
- /**
- * Additional test checkPropertyName doesn't accept Null
- */
- public void testCheckPropertyNameasNull() throws JMSException
- {
- JMSPropertyFieldTable table = new JMSPropertyFieldTable(new FieldTable());
-
- try
- {
- table.setObject(null, "String");
- fail("Null property name is not allowed");
- }
- catch (IllegalArgumentException iae)
- {
- //normal path
- }
- checkEmpty(table);
- }
-
-
- /**
- * Additional test checkPropertyName doesn't accept an empty String
- */
- public void testCheckPropertyNameasEmptyString() throws JMSException
- {
- JMSPropertyFieldTable table = new JMSPropertyFieldTable(new FieldTable());
-
- try
- {
- table.setObject("", "String");
- fail("empty property name is not allowed");
- }
- catch (IllegalArgumentException iae)
- {
- //normal path
- }
- checkEmpty(table);
- }
-
-
- /**
- * Additional test checkPropertyName doesn't accept an empty String
- */
- public void testCheckPropertyNamehasMaxLength() throws JMSException
- {
- JMSPropertyFieldTable table = new JMSPropertyFieldTable(new FieldTable());
-
- StringBuffer longPropertyName = new StringBuffer(129);
-
- for (int i = 0; i < 129; i++)
- {
- longPropertyName.append("x");
- }
-
- try
- {
- table.setObject(longPropertyName.toString(), "String");
- fail("property name must be < 128 characters");
- }
- catch (IllegalArgumentException iae)
- {
- _logger.warn("JMS requires infinite property names AMQP limits us to 128 characters");
- }
-
- checkEmpty(table);
- }
-
-
- /**
- * Additional test checkPropertyName starts with a letter
- */
- public void testCheckPropertyNameStartCharacterIsLetter() throws JMSException
- {
- JMSPropertyFieldTable table = new JMSPropertyFieldTable(new FieldTable());
-
- //Try a name that starts with a number
- try
- {
- table.setObject("1", "String");
- fail("property name must start with a letter");
- }
- catch (IllegalArgumentException iae)
- {
- //normal path
- }
-
- checkEmpty(table);
- }
-
- /**
- * Additional test checkPropertyName starts with a letter
- */
- public void testCheckPropertyNameContainsInvalidCharacter() throws JMSException
- {
- JMSPropertyFieldTable table = new JMSPropertyFieldTable(new FieldTable());
-
- //Try a name that starts with a number
- try
- {
- table.setObject("hello there", "String");
- fail("property name cannot contain spaces");
- }
- catch (IllegalArgumentException iae)
- {
- //normal path
- }
-
- checkEmpty(table);
- }
-
-
- /**
- * Additional test checkPropertyName starts with a letter
- */
- public void testCheckPropertyNameIsInvalid() throws JMSException
- {
- JMSPropertyFieldTable table = new JMSPropertyFieldTable(new FieldTable());
-
- //Try a name that starts with a number
- try
- {
- table.setObject("ESCAPE", "String");
- fail("property name must not contains spaces");
- }
- catch (IllegalArgumentException iae)
- {
- //normal path
- }
-
- checkEmpty(table);
- }
-
- /**
- * Additional test checkPropertyName starts with a hash or a dollar
- */
- public void testCheckPropertyNameStartCharacterIsHashorDollar() throws JMSException
- {
- _logger.warn("Test:testCheckPropertyNameStartCharacterIsHashorDollar will fail JMS compilance as # and $ are not valid in a jms identifier");
-// JMSPropertyFieldTable table = new JMSPropertyFieldTable();
-//
-// //Try a name that starts with a number
-// try
-// {
-// table.setObject("#", "String");
-// table.setObject("$", "String");
-// }
-// catch (IllegalArgumentException iae)
-// {
-// fail("property name are allowed to start with # and $s in AMQP");
-// }
- }
-
- /**
- * Test the contents of the sets
- */
- public void testSets()
- {
-
- JMSPropertyFieldTable table = new JMSPropertyFieldTable(new FieldTable());
-
- table.put("n1", "1");
- table.put("n2", "2");
- table.put("n3", "3");
-
- Enumeration enumerator = table.getPropertyNames();
- Assert.assertEquals("n1", enumerator.nextElement());
- Assert.assertEquals("n2", enumerator.nextElement());
- Assert.assertEquals("n3", enumerator.nextElement());
- Assert.assertFalse(enumerator.hasMoreElements());
- }
-
- public static junit.framework.Test suite()
- {
- return new junit.framework.TestSuite(JMSPropertyFieldTableTest.class);
- }
-
-}
diff --git a/java/common/src/test/java/org/apache/qpid/framing/PropertyFieldTableTest.java b/java/common/src/test/java/org/apache/qpid/framing/PropertyFieldTableTest.java
index c259d3ee8a..6160dc1843 100644
--- a/java/common/src/test/java/org/apache/qpid/framing/PropertyFieldTableTest.java
+++ b/java/common/src/test/java/org/apache/qpid/framing/PropertyFieldTableTest.java
@@ -25,8 +25,6 @@ import junit.framework.TestCase;
import java.util.Enumeration;
import java.util.Iterator;
-import java.util.Map;
-import java.util.HashMap;
import org.apache.mina.common.ByteBuffer;
import org.apache.log4j.Logger;
@@ -227,7 +225,7 @@ public class PropertyFieldTableTest extends TestCase
//... and a the string value of it.
Assert.assertEquals("" + Double.MAX_VALUE, table1.getString("value"));
table1.remove("value");
- //but after a remove it doesn't
+ //but after a removeKey it doesn't
Assert.assertFalse(table1.containsKey("value"));
// Table should now have zero length for encoding
@@ -265,7 +263,7 @@ public class PropertyFieldTableTest extends TestCase
table1.remove("value");
- //but after a remove it doesn't
+ //but after a removeKey it doesn't
Assert.assertFalse(table1.containsKey("value"));
// Table should now have zero length for encoding
@@ -303,7 +301,7 @@ public class PropertyFieldTableTest extends TestCase
table1.remove("value");
- //but after a remove it doesn't
+ //but after a removeKey it doesn't
Assert.assertFalse(table1.containsKey("value"));
// Table should now have zero length for encoding
@@ -341,7 +339,7 @@ public class PropertyFieldTableTest extends TestCase
table1.remove("value");
- //but after a remove it doesn't
+ //but after a removeKey it doesn't
Assert.assertFalse(table1.containsKey("value"));
// Table should now have zero length for encoding
@@ -380,7 +378,7 @@ public class PropertyFieldTableTest extends TestCase
Assert.assertEquals(null, table1.getString("value"));
table1.remove("value");
- //but after a remove it doesn't
+ //but after a removeKey it doesn't
Assert.assertFalse(table1.containsKey("value"));
// Table should now have zero length for encoding
@@ -440,7 +438,7 @@ public class PropertyFieldTableTest extends TestCase
Assert.assertTrue(table1.containsKey("value"));
table1.remove("value");
- //but after a remove it doesn't
+ //but after a removeKey it doesn't
Assert.assertFalse(table1.containsKey("value"));
checkEmpty(table1);
@@ -457,23 +455,7 @@ public class PropertyFieldTableTest extends TestCase
- public void testKeyEnumeration()
- {
- FieldTable table = new FieldTable();
- table.setLong("one", 1L);
- table.setLong("two", 2L);
- table.setLong("three", 3L);
- table.setLong("four", 4L);
- table.setLong("five", 5L);
-
- Enumeration e = table.getPropertyNames();
-
- Assert.assertTrue("one".equals(e.nextElement()));
- Assert.assertTrue("two".equals(e.nextElement()));
- Assert.assertTrue("three".equals(e.nextElement()));
- Assert.assertTrue("four".equals(e.nextElement()));
- Assert.assertTrue("five".equals(e.nextElement()));
- }
+
public void testValues()
{
@@ -546,8 +528,7 @@ public class PropertyFieldTableTest extends TestCase
table.setString("string", "hello");
table.setString("null-string", null);
-
- final ByteBuffer buffer = ByteBuffer.allocate((int) table.getEncodedSize()); // FIXME XXX: Is cast a problem?
+ final ByteBuffer buffer = ByteBuffer.allocate((int) table.getEncodedSize() + 4); // FIXME XXX: Is cast a problem?
table.writeToBuffer(buffer);
@@ -597,7 +578,7 @@ public class PropertyFieldTableTest extends TestCase
byte[] _bytes = {99, 98, 97, 96, 95};
result.setBytes("bytes", _bytes);
- size += 1 + EncodingUtils.encodedShortStringLength("bytes") + 1 + EncodingUtils.encodedByteLength() * _bytes.length;
+ size += 1 + EncodingUtils.encodedShortStringLength("bytes") + 4 + _bytes.length;
Assert.assertEquals(size, result.getEncodedSize());
result.setChar("char", (char) 'c');
@@ -639,7 +620,7 @@ public class PropertyFieldTableTest extends TestCase
Assert.assertEquals(size, result.getEncodedSize());
result.setObject("object-bytes", _bytes);
- size += 1 + EncodingUtils.encodedShortStringLength("object-bytes") + 1 + EncodingUtils.encodedByteLength() * _bytes.length;
+ size += 1 + EncodingUtils.encodedShortStringLength("object-bytes") + 4 + _bytes.length;
Assert.assertEquals(size, result.getEncodedSize());
result.setObject("object-char", 'c');
@@ -758,7 +739,7 @@ public class PropertyFieldTableTest extends TestCase
try
{
- table.setObject(null, "String");
+ table.setObject((String)null, "String");
fail("Null property name is not allowed");
}
catch (IllegalArgumentException iae)
@@ -868,9 +849,9 @@ public class PropertyFieldTableTest extends TestCase
{
FieldTable table = new FieldTable();
- table.put("StringProperty", "String");
+ table.setObject("StringProperty", "String");
- Assert.assertEquals("String", table.get("StringProperty"));
+ Assert.assertEquals("String", table.getString("StringProperty"));
//Test Clear
@@ -887,15 +868,15 @@ public class PropertyFieldTableTest extends TestCase
FieldTable table = new FieldTable();
- table.put("n1", "1");
- table.put("n2", "2");
- table.put("n3", "3");
+ table.setObject("n1", "1");
+ table.setObject("n2", "2");
+ table.setObject("n3", "3");
+
+
+ Assert.assertEquals("1", table.getObject("n1"));
+ Assert.assertEquals("2", table.getObject("n2"));
+ Assert.assertEquals("3", table.getObject("n3"));
- Iterator iterator = table.keySet().iterator();
- Assert.assertEquals("n1", iterator.next());
- Assert.assertEquals("n2", iterator.next());
- Assert.assertEquals("n3", iterator.next());
- Assert.assertFalse(iterator.hasNext());
diff --git a/java/common/src/test/java/org/apache/qpid/pool/PoolingFilterTest.java b/java/common/src/test/java/org/apache/qpid/pool/PoolingFilterTest.java
index 972a935257..6383d52298 100644
--- a/java/common/src/test/java/org/apache/qpid/pool/PoolingFilterTest.java
+++ b/java/common/src/test/java/org/apache/qpid/pool/PoolingFilterTest.java
@@ -36,25 +36,32 @@ public class PoolingFilterTest extends TestCase
public void setUp()
{
+
//Create Pool
_executorService = ReferenceCountingExecutorService.getInstance();
_executorService.acquireExecutorService();
- _pool = new PoolingFilter(_executorService, PoolingFilter.WRITE_EVENTS,
+ _pool = PoolingFilter.createAynschWritePoolingFilter(_executorService,
"AsynchronousWriteFilter");
}
public void testRejectedExecution() throws Exception
{
- _pool.filterWrite(new NoOpFilter(), new TestSession(), new IoFilter.WriteRequest("Message"));
+
+ TestSession testSession = new TestSession();
+ _pool.createNewJobForSession(testSession);
+ _pool.filterWrite(new NoOpFilter(), testSession, new IoFilter.WriteRequest("Message"));
//Shutdown the pool
_executorService.getPool().shutdownNow();
try
{
+
+ testSession = new TestSession();
+ _pool.createNewJobForSession(testSession);
//prior to fix for QPID-172 this would throw RejectedExecutionException
- _pool.filterWrite(null, new TestSession(), null);
+ _pool.filterWrite(null, testSession, null);
}
catch (RejectedExecutionException rje)
{
diff --git a/java/common/src/test/java/org/apache/qpid/session/TestSession.java b/java/common/src/test/java/org/apache/qpid/session/TestSession.java
index f10d55e9d0..aafc91b03b 100644
--- a/java/common/src/test/java/org/apache/qpid/session/TestSession.java
+++ b/java/common/src/test/java/org/apache/qpid/session/TestSession.java
@@ -24,9 +24,13 @@ import org.apache.mina.common.*;
import java.net.SocketAddress;
import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentHashMap;
public class TestSession implements IoSession
{
+ private final ConcurrentMap attributes = new ConcurrentHashMap();
+
public TestSession()
{
}
@@ -68,42 +72,42 @@ public class TestSession implements IoSession
public Object getAttachment()
{
- return null; //TODO
+ return getAttribute("");
}
public Object setAttachment(Object attachment)
{
- return null; //TODO
+ return setAttribute("",attachment);
}
public Object getAttribute(String key)
{
- return null; //TODO
+ return attributes.get(key);
}
public Object setAttribute(String key, Object value)
{
- return null; //TODO
+ return attributes.put(key,value);
}
public Object setAttribute(String key)
{
- return null; //TODO
+ return attributes.put(key, Boolean.TRUE);
}
public Object removeAttribute(String key)
{
- return null; //TODO
+ return attributes.remove(key);
}
public boolean containsAttribute(String key)
{
- return false; //TODO
+ return attributes.containsKey(key);
}
public Set getAttributeKeys()
{
- return null; //TODO
+ return attributes.keySet();
}
public TransportType getTransportType()