From 582a91714ddfb865614b6a335a3da2841299e07e Mon Sep 17 00:00:00 2001 From: Martin Ritchie Date: Mon, 7 May 2007 09:40:58 +0000 Subject: Merged revisions 534897-534902,534904-535253,535255-535809 via svnmerge from https://svn.apache.org/repos/asf/incubator/qpid/branches/M2 ........ r534897 | bhupendrab | 2007-05-03 15:53:20 +0100 (Thu, 03 May 2007) | 1 line Attribute details background made same as other displays. ........ r535309 | ritchiem | 2007-05-04 17:12:59 +0100 (Fri, 04 May 2007) | 2 lines QPID-466 Changes to FieldTable along with corresponding PropertyValueTest to limit the Java client to only AMQP 0-8 compliant values. ........ r535809 | ritchiem | 2007-05-07 10:28:15 +0100 (Mon, 07 May 2007) | 5 lines QPID-466 Updated FieldTable to ensure no Decimal value is set that is larger than can be transmitted over AMQP. That is a max scale value of Byte.MAX_VALUE and value of up to Integer.MAX_VALUE. Additional tests to ensure this is the case. ........ git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@535819 13f79535-47bb-0310-9956-ffa450edef68 --- .../java/org/apache/qpid/client/AMQSession.java | 7 +- .../org/apache/qpid/client/message/AMQMessage.java | 72 +++++++- .../qpid/client/message/AbstractJMSMessage.java | 151 +++++++++++------ .../qpid/client/message/JMSHeaderAdapter.java | 22 +++ .../qpid/test/unit/basic/PropertyValueTest.java | 64 +++++++- .../unit/client/connection/ConnectionTest.java | 8 +- .../main/java/org/apache/qpid/framing/AMQType.java | 64 +++++--- .../java/org/apache/qpid/framing/FieldTable.java | 177 +++++++++++--------- .../management/ui/views/AttributesTabControl.java | 28 ++-- .../apache/qpid/requestreply/PingPongProducer.java | 182 ++++++++++++--------- .../org/apache/qpid/ping/PingLatencyTestPerf.java | 72 ++++---- 11 files changed, 560 insertions(+), 287 deletions(-) (limited to 'java') diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 10101976eb..b7615c5b7b 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -443,7 +443,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi _strictAMQP = Boolean.parseBoolean(System.getProperties().getProperty(STRICT_AMQP, STRICT_AMQP_DEFAULT)); _strictAMQPFATAL = Boolean.parseBoolean(System.getProperties().getProperty(STRICT_AMQP_FATAL, STRICT_AMQP_FATAL_DEFAULT)); - _immediatePrefetch = Boolean.parseBoolean(System.getProperties().getProperty(IMMEDIATE_PREFETCH, IMMEDIATE_PREFETCH_DEFAULT)); + _immediatePrefetch = _strictAMQP || Boolean.parseBoolean(System.getProperties().getProperty(IMMEDIATE_PREFETCH, IMMEDIATE_PREFETCH_DEFAULT)); _connection = con; _transacted = transacted; @@ -491,7 +491,6 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } - AMQSession(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode, int defaultPrefetchHigh, int defaultPrefetchLow) { this(con, channelId, transacted, acknowledgeMode, MessageFactoryRegistry.newDefaultRegistry(), defaultPrefetchHigh, defaultPrefetchLow); @@ -785,7 +784,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi amqe = new AMQException("Closing session forcibly", e); } _connection.deregisterSession(_channelId); - closeProducersAndConsumers(amqe); + closeProducersAndConsumers(amqe); } } @@ -2021,7 +2020,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi synchronized void startDistpatcherIfNecessary() { - // If IMMEDIATE_PREFETCH is not set then we need to start fetching + // If IMMEDIATE_PREFETCH is not set then we need to start fetching if (!_immediatePrefetch) { // We do this now if this is the first call on a started connection diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AMQMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/AMQMessage.java index feb58678b9..8741a1cbb6 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/AMQMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/AMQMessage.java @@ -24,14 +24,17 @@ import javax.jms.JMSException; import org.apache.qpid.client.AMQSession; import org.apache.qpid.framing.ContentHeaderProperties; +import org.apache.qpid.framing.BasicContentHeaderProperties; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.FieldTable; + +import java.math.BigDecimal; public class AMQMessage { protected ContentHeaderProperties _contentHeaderProperties; - /** - * If the acknowledge mode is CLIENT_ACKNOWLEDGE the session is required - */ + /** If the acknowledge mode is CLIENT_ACKNOWLEDGE the session is required */ protected AMQSession _session; protected final long _deliveryTag; @@ -48,8 +51,9 @@ public class AMQMessage } /** - * The session is set when CLIENT_ACKNOWLEDGE mode is used so that the CHANNEL ACK can be sent when the user - * calls acknowledge() + * The session is set when CLIENT_ACKNOWLEDGE mode is used so that the CHANNEL ACK can be sent when the user calls + * acknowledge() + * * @param s the AMQ session that delivered this message */ public void setAMQSession(AMQSession s) @@ -64,6 +68,7 @@ public class AMQMessage /** * Get the AMQ message number assigned to this message + * * @return the message number */ public long getDeliveryTag() @@ -71,11 +76,60 @@ public class AMQMessage return _deliveryTag; } - /** - * Invoked prior to sending the message. Allows the message to be modified if necessary before - * sending. - */ + /** Invoked prior to sending the message. Allows the message to be modified if necessary before sending. */ public void prepareForSending() throws JMSException { } + + public FieldTable getPropertyHeaders() + { + return ((BasicContentHeaderProperties) _contentHeaderProperties).getHeaders(); + } + + public void setDecimalProperty(AMQShortString propertyName, BigDecimal bd) throws JMSException + { + getPropertyHeaders().setDecimal(propertyName, bd); + } + + public void setIntProperty(AMQShortString propertyName, int i) throws JMSException + { + getPropertyHeaders().setInteger(propertyName, new Integer(i)); + } + + public void setLongStringProperty(AMQShortString propertyName, String value) + { + getPropertyHeaders().setString(propertyName, value); + } + + public void setTimestampProperty(AMQShortString propertyName, long value) + { + getPropertyHeaders().setTimestamp(propertyName, value); + } + + public void setVoidProperty(AMQShortString propertyName) + { + getPropertyHeaders().setVoid(propertyName); + } + + //** Getters + + public BigDecimal getDecimalProperty(AMQShortString propertyName) throws JMSException + { + return getPropertyHeaders().getDecimal(propertyName); + } + + public int getIntegerProperty(AMQShortString propertyName) throws JMSException + { + return getPropertyHeaders().getInteger(propertyName); + } + + public String getLongStringProperty(AMQShortString propertyName) + { + return getPropertyHeaders().getString(propertyName); + } + + public Long getTimestampProperty(AMQShortString propertyName) + { + return getPropertyHeaders().getTimestamp(propertyName); + } } diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java index f87b4027f6..11102e0925 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java @@ -54,6 +54,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach private Destination _destination; private JMSHeaderAdapter _headerAdapter; private BasicMessageConsumer _consumer; + private boolean _strictAMQP; protected AbstractJMSMessage(ByteBuffer data) { @@ -68,6 +69,8 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach _readableMessage = (data != null); _changedData = (data == null); _headerAdapter = new JMSHeaderAdapter(((BasicContentHeaderProperties) _contentHeaderProperties).getHeaders()); + + _strictAMQP = Boolean.parseBoolean(System.getProperties().getProperty(AMQSession.STRICT_AMQP, AMQSession.STRICT_AMQP_DEFAULT)); } protected AbstractJMSMessage(long deliveryTag, BasicContentHeaderProperties contentHeader, AMQShortString exchange, @@ -289,85 +292,116 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach public boolean propertyExists(AMQShortString propertyName) throws JMSException { - checkPropertyName(propertyName); - return getJmsHeaders().propertyExists(propertyName); } public boolean propertyExists(String propertyName) throws JMSException { - checkPropertyName(propertyName); - return getJmsHeaders().propertyExists(propertyName); } public boolean getBooleanProperty(AMQShortString propertyName) throws JMSException { - checkPropertyName(propertyName); + if (_strictAMQP) + { + throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); + } return getJmsHeaders().getBoolean(propertyName); } public boolean getBooleanProperty(String propertyName) throws JMSException { - checkPropertyName(propertyName); + if (_strictAMQP) + { + throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); + } return getJmsHeaders().getBoolean(propertyName); } public byte getByteProperty(String propertyName) throws JMSException { - checkPropertyName(propertyName); + if (_strictAMQP) + { + throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); + } return getJmsHeaders().getByte(propertyName); } + public byte[] getBytesProperty(AMQShortString propertyName) throws JMSException + { + if (_strictAMQP) + { + throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); + } + + return getJmsHeaders().getBytes(propertyName); + } + public short getShortProperty(String propertyName) throws JMSException { - checkPropertyName(propertyName); + if (_strictAMQP) + { + throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); + } return getJmsHeaders().getShort(propertyName); } public int getIntProperty(String propertyName) throws JMSException { - checkPropertyName(propertyName); + if (_strictAMQP) + { + throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); + } return getJmsHeaders().getInteger(propertyName); } public long getLongProperty(String propertyName) throws JMSException { - checkPropertyName(propertyName); + if (_strictAMQP) + { + throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); + } return getJmsHeaders().getLong(propertyName); } public float getFloatProperty(String propertyName) throws JMSException { - checkPropertyName(propertyName); + if (_strictAMQP) + { + throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); + } return getJmsHeaders().getFloat(propertyName); } public double getDoubleProperty(String propertyName) throws JMSException { - checkPropertyName(propertyName); + if (_strictAMQP) + { + throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); + } return getJmsHeaders().getDouble(propertyName); } public String getStringProperty(String propertyName) throws JMSException { - checkPropertyName(propertyName); + if (_strictAMQP) + { + throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); + } return getJmsHeaders().getString(propertyName); } public Object getObjectProperty(String propertyName) throws JMSException { - checkPropertyName(propertyName); - return getJmsHeaders().getObject(propertyName); } @@ -378,83 +412,124 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach public void setBooleanProperty(AMQShortString propertyName, boolean b) throws JMSException { + if (_strictAMQP) + { + throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); + } + checkWritableProperties(); - checkPropertyName(propertyName); getJmsHeaders().setBoolean(propertyName, b); } public void setBooleanProperty(String propertyName, boolean b) throws JMSException { + if (_strictAMQP) + { + throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); + } + checkWritableProperties(); - checkPropertyName(propertyName); getJmsHeaders().setBoolean(propertyName, b); } public void setByteProperty(String propertyName, byte b) throws JMSException { + if (_strictAMQP) + { + throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); + } + checkWritableProperties(); - checkPropertyName(propertyName); getJmsHeaders().setByte(propertyName, new Byte(b)); } + public void setBytesProperty(AMQShortString propertyName, byte[] bytes) throws JMSException + { + if (_strictAMQP) + { + throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); + } + + checkWritableProperties(); + getJmsHeaders().setBytes(propertyName, bytes); + } + public void setShortProperty(String propertyName, short i) throws JMSException { + if (_strictAMQP) + { + throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); + } + checkWritableProperties(); - checkPropertyName(propertyName); getJmsHeaders().setShort(propertyName, new Short(i)); } public void setIntProperty(String propertyName, int i) throws JMSException { + if (_strictAMQP) + { + throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); + } + checkWritableProperties(); - checkPropertyName(propertyName); - getJmsHeaders().setInteger(propertyName, new Integer(i)); + JMSHeaderAdapter.checkPropertyName(propertyName); + super.setIntProperty(new AMQShortString(propertyName), new Integer(i)); } public void setLongProperty(String propertyName, long l) throws JMSException { + if (_strictAMQP) + { + throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); + } + checkWritableProperties(); - checkPropertyName(propertyName); getJmsHeaders().setLong(propertyName, new Long(l)); } public void setFloatProperty(String propertyName, float f) throws JMSException { + if (_strictAMQP) + { + throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); + } + checkWritableProperties(); - checkPropertyName(propertyName); getJmsHeaders().setFloat(propertyName, new Float(f)); } public void setDoubleProperty(String propertyName, double v) throws JMSException { + if (_strictAMQP) + { + throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); + } + checkWritableProperties(); - checkPropertyName(propertyName); getJmsHeaders().setDouble(propertyName, new Double(v)); } public void setStringProperty(String propertyName, String value) throws JMSException { checkWritableProperties(); - checkPropertyName(propertyName); - getJmsHeaders().setString(propertyName, value); + JMSHeaderAdapter.checkPropertyName(propertyName); + super.setLongStringProperty(new AMQShortString(propertyName), value); } public void setObjectProperty(String propertyName, Object object) throws JMSException { checkWritableProperties(); - checkPropertyName(propertyName); getJmsHeaders().setObject(propertyName, object); } protected void removeProperty(AMQShortString propertyName) throws JMSException { - checkPropertyName(propertyName); getJmsHeaders().remove(propertyName); } protected void removeProperty(String propertyName) throws JMSException { - checkPropertyName(propertyName); getJmsHeaders().remove(propertyName); } @@ -544,17 +619,6 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach getContentHeaderProperties().setHeaders(messageProperties); } - private void checkPropertyName(CharSequence propertyName) - { - if (propertyName == null) - { - throw new IllegalArgumentException("Property name must not be null"); - } - else if (propertyName.length() == 0) - { - throw new IllegalArgumentException("Property name must not be the empty string"); - } - } public JMSHeaderAdapter getJmsHeaders() { @@ -625,11 +689,4 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach _consumer = basicMessageConsumer; } - public byte[] getBytesProperty(AMQShortString propertyName) throws JMSException - { - checkPropertyName(propertyName); - - return getJmsHeaders().getBytes(propertyName); - - } } diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSHeaderAdapter.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSHeaderAdapter.java index 348a0bd152..39b4e1e27b 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/JMSHeaderAdapter.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSHeaderAdapter.java @@ -48,6 +48,7 @@ public final class JMSHeaderAdapter public boolean getBoolean(String string) throws JMSException { + checkPropertyName(string); Boolean b = getHeaders().getBoolean(string); if (b == null) @@ -76,6 +77,7 @@ public final class JMSHeaderAdapter public boolean getBoolean(AMQShortString string) throws JMSException { + checkPropertyName(string); Boolean b = getHeaders().getBoolean(string); if (b == null) @@ -104,6 +106,7 @@ public final class JMSHeaderAdapter public char getCharacter(String string) throws JMSException { + checkPropertyName(string); Character c = getHeaders().getCharacter(string); if (c == null) @@ -130,6 +133,8 @@ public final class JMSHeaderAdapter public byte[] getBytes(AMQShortString string) throws JMSException { + checkPropertyName(string); + byte[] bs = getHeaders().getBytes(string); if (bs == null) @@ -144,6 +149,7 @@ public final class JMSHeaderAdapter public byte getByte(String string) throws JMSException { + checkPropertyName(string); Byte b = getHeaders().getByte(string); if (b == null) { @@ -171,6 +177,7 @@ public final class JMSHeaderAdapter public short getShort(String string) throws JMSException { + checkPropertyName(string); Short s = getHeaders().getShort(string); if (s == null) @@ -183,6 +190,7 @@ public final class JMSHeaderAdapter public int getInteger(String string) throws JMSException { + checkPropertyName(string); Integer i = getHeaders().getInteger(string); if (i == null) @@ -195,6 +203,7 @@ public final class JMSHeaderAdapter public long getLong(String string) throws JMSException { + checkPropertyName(string); Long l = getHeaders().getLong(string); if (l == null) @@ -207,6 +216,7 @@ public final class JMSHeaderAdapter public float getFloat(String string) throws JMSException { + checkPropertyName(string); Float f = getHeaders().getFloat(string); if (f == null) @@ -236,6 +246,7 @@ public final class JMSHeaderAdapter public double getDouble(String string) throws JMSException { + checkPropertyName(string); Double d = getHeaders().getDouble(string); if (d == null) @@ -248,6 +259,7 @@ public final class JMSHeaderAdapter public String getString(String string) throws JMSException { + checkPropertyName(string); String s = getHeaders().getString(string); if (s == null) @@ -278,6 +290,7 @@ public final class JMSHeaderAdapter public Object getObject(String string) throws JMSException { + checkPropertyName(string); return getHeaders().getObject(string); } @@ -301,16 +314,19 @@ public final class JMSHeaderAdapter public Object setBytes(AMQShortString string, byte[] bytes) { + checkPropertyName(string); return getHeaders().setBytes(string, bytes); } public Object setBytes(String string, byte[] bytes) { + checkPropertyName(string); return getHeaders().setBytes(string, bytes); } public Object setBytes(String string, byte[] bytes, int start, int length) { + checkPropertyName(string); return getHeaders().setBytes(string, bytes, start, length); } @@ -392,6 +408,7 @@ public final class JMSHeaderAdapter public boolean itemExists(String string) throws JMSException { + checkPropertyName(string); return getHeaders().containsKey(string); } @@ -407,26 +424,31 @@ public final class JMSHeaderAdapter public boolean propertyExists(AMQShortString propertyName) { + checkPropertyName(propertyName); return getHeaders().propertyExists(propertyName); } public boolean propertyExists(String propertyName) { + checkPropertyName(propertyName); return getHeaders().propertyExists(propertyName); } public Object put(Object key, Object value) { + checkPropertyName(key.toString()); return getHeaders().setObject(key.toString(), value); } public Object remove(AMQShortString propertyName) { + checkPropertyName(propertyName); return getHeaders().remove(propertyName); } public Object remove(String propertyName) { + checkPropertyName(propertyName); return getHeaders().remove(propertyName); } diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java index fd997e3abd..90784b0772 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java @@ -23,6 +23,7 @@ package org.apache.qpid.test.unit.basic; import java.util.ArrayList; import java.util.Iterator; import java.util.List; +import java.math.BigDecimal; import javax.jms.Destination; import javax.jms.JMSException; @@ -41,6 +42,8 @@ import org.apache.qpid.client.AMQQueue; import org.apache.qpid.client.AMQSession; import org.apache.qpid.client.transport.TransportConnection; import org.apache.qpid.client.message.JMSTextMessage; +import org.apache.qpid.client.message.AMQMessage; +import org.apache.qpid.framing.AMQShortString; public class PropertyValueTest extends TestCase implements MessageListener { @@ -53,7 +56,7 @@ public class PropertyValueTest extends TestCase implements MessageListener private AMQSession _session; private final List received = new ArrayList(); private final List messages = new ArrayList(); - private int _count = 100; + private int _count = 1; public String _connectionString = "vm://:1"; protected void setUp() throws Exception @@ -118,7 +121,7 @@ public class PropertyValueTest extends TestCase implements MessageListener check(); _logger.info("Completed without failure"); - Thread.sleep(10); + Thread.sleep(10); _connection.close(); _logger.error("End Run Number:" + (run - 1)); @@ -180,6 +183,44 @@ public class PropertyValueTest extends TestCase implements MessageListener m.setShortProperty("Short", (short) Short.MAX_VALUE); m.setStringProperty("String", "Test"); + //AMQP Specific values + + // Timestamp + long nano = System.nanoTime(); + m.setStringProperty("time-str", String.valueOf(nano)); + ((AMQMessage) m).setTimestampProperty(new AMQShortString("time"), nano); + + //Decimal + BigDecimal bd = new BigDecimal(Integer.MAX_VALUE); + ((AMQMessage) m).setDecimalProperty(new AMQShortString("decimal"), bd.setScale(Byte.MAX_VALUE)); + + + bd = new BigDecimal((long) Integer.MAX_VALUE + 1L); + + try + { + ((AMQMessage) m).setDecimalProperty(new AMQShortString("decimal-bad-value"), bd.setScale(Byte.MAX_VALUE)); + fail("UnsupportedOperationException should be thrown as value can't be correctly transmitted"); + } + catch (UnsupportedOperationException uoe) + { + // normal path. + } + + + try + { + ((AMQMessage) m).setDecimalProperty(new AMQShortString("decimal-bad-scale"), bd.setScale(Byte.MAX_VALUE + 1)); + fail("UnsupportedOperationException should be thrown as scale can't be correctly transmitted"); + } + catch (UnsupportedOperationException uoe) + { + // normal path. + } + + //Void + ((AMQMessage) m).setVoidProperty(new AMQShortString("void")); + _logger.debug("Sending Msg:" + m); producer.send(m); } @@ -235,6 +276,25 @@ public class PropertyValueTest extends TestCase implements MessageListener (long) Long.MAX_VALUE, m.getLongProperty("Long")); Assert.assertEquals("Check String properties are correctly transported", "Test", m.getStringProperty("String")); + + // AMQP Tests Specific values + + Assert.assertEquals("Check Timestamp properties are correctly transported", + m.getStringProperty("time-str"), + ((AMQMessage) m).getTimestampProperty(new AMQShortString("time")).toString()); + + //Decimal + BigDecimal bd = new BigDecimal(Integer.MAX_VALUE); + + Assert.assertEquals("Check decimal properties are correctly transported", + bd.setScale(Byte.MAX_VALUE), + ((AMQMessage) m).getDecimalProperty(new AMQShortString("decimal"))); + + //Void + ((AMQMessage) m).setVoidProperty(new AMQShortString("void")); + + Assert.assertTrue("Check void properties are correctly transported", + ((AMQMessage) m).getPropertyHeaders().containsKey("void")); } received.clear(); diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java index ab0d26b0e0..588c82221e 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -47,12 +47,12 @@ public class ConnectionTest extends TestCase protected void setUp() throws Exception { super.setUp(); - TransportConnection.createVMBroker(1); +// TransportConnection.createVMBroker(1); } protected void tearDown() throws Exception { - TransportConnection.killVMBroker(1); +// TransportConnection.killVMBroker(1); } public void testSimpleConnection() diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQType.java b/java/common/src/main/java/org/apache/qpid/framing/AMQType.java index 80739e1aee..6dda91a488 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/AMQType.java +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQType.java @@ -22,10 +22,11 @@ package org.apache.qpid.framing; import org.apache.mina.common.ByteBuffer; +import java.math.BigDecimal; +import java.math.BigInteger; + public enum AMQType { - - //AMQP FieldTable Wire Types LONG_STRING('S') @@ -113,55 +114,75 @@ public enum AMQType public int getEncodingSize(Object value) { - // TODO : fixme - throw new UnsupportedOperationException(); + return EncodingUtils.encodedByteLength()+ EncodingUtils.encodedIntegerLength(); } public Object toNativeValue(Object value) { - // TODO : fixme - throw new UnsupportedOperationException(); + if(value instanceof BigDecimal) + { + return (BigDecimal) value; + } + else + { + throw new NumberFormatException("Cannot convert: " + value + "(" + + value.getClass().getName() + ") to BigDecimal."); + } } public void writeValueImpl(Object value, ByteBuffer buffer) { - // TODO : fixme - throw new UnsupportedOperationException(); + BigDecimal bd = (BigDecimal) value; + + byte places = new Integer(bd.scale()).byteValue(); + + int unscaled = bd.intValue(); + + EncodingUtils.writeByte(buffer, places); + + EncodingUtils.writeInteger(buffer, unscaled); } public Object readValueFromBuffer(ByteBuffer buffer) { - // TODO : fixme - throw new UnsupportedOperationException(); + byte places = EncodingUtils.readByte(buffer); + + int unscaled = EncodingUtils.readInteger(buffer); + + BigDecimal bd = new BigDecimal(unscaled); + return bd.setScale(places); } }, TIMESTAMP('T') { - public int getEncodingSize(Object value) { - // TODO : fixme - throw new UnsupportedOperationException(); + return EncodingUtils.encodedLongLength(); } - public Object toNativeValue(Object value) { - // TODO : fixme - throw new UnsupportedOperationException(); + if(value instanceof Long) + { + return (Long) value; + } + else + { + throw new NumberFormatException("Cannot convert: " + value + "(" + + value.getClass().getName() + ") to timestamp."); + } } public void writeValueImpl(Object value, ByteBuffer buffer) { - // TODO : fixme - throw new UnsupportedOperationException(); + EncodingUtils.writeLong(buffer, (Long) value); } + public Object readValueFromBuffer(ByteBuffer buffer) { - // TODO : fixme - throw new UnsupportedOperationException(); + return EncodingUtils.readLong(buffer); } }, @@ -173,7 +194,6 @@ public enum AMQType throw new UnsupportedOperationException(); } - public Object toNativeValue(Object value) { // TODO : fixme @@ -250,7 +270,7 @@ public enum AMQType public void writeValueImpl(Object value, ByteBuffer buffer) { - EncodingUtils.writeLongstr(buffer, (byte[]) value); + EncodingUtils.writeLongstr(buffer, (byte[]) value); } public Object readValueFromBuffer(ByteBuffer buffer) diff --git a/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java b/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java index a7544c5747..631a3ae149 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java +++ b/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java @@ -27,6 +27,7 @@ import java.util.LinkedHashMap; import java.util.LinkedHashSet; import java.util.Map; import java.util.Set; +import java.math.BigDecimal; import org.apache.log4j.Logger; import org.apache.mina.common.ByteBuffer; @@ -56,26 +57,28 @@ public class FieldTable * * @param buffer the buffer from which to read data. The length byte must be read already * @param length the length of the field table. Must be > 0. + * * @throws AMQFrameDecodingException if there is an error decoding the table */ public FieldTable(ByteBuffer buffer, long length) throws AMQFrameDecodingException { this(); _encodedForm = buffer.slice(); - _encodedForm.limit((int)length); + _encodedForm.limit((int) length); _encodedSize = length; - buffer.skip((int)length); + buffer.skip((int) length); } - private AMQTypedValue getProperty(AMQShortString string) { - synchronized(this) + checkPropertyName(string); + + synchronized (this) { - if(_properties == null) + if (_properties == null) { - if(_encodedForm == null) + if (_encodedForm == null) { return null; } @@ -86,7 +89,7 @@ public class FieldTable } } - if(_properties == null) + if (_properties == null) { return null; } @@ -112,17 +115,18 @@ public class FieldTable private AMQTypedValue setProperty(AMQShortString key, AMQTypedValue val) { + checkPropertyName(key); initMapIfNecessary(); - if(_properties.containsKey(key)) + if (_properties.containsKey(key)) { _encodedForm = null; - if(val == null) + if (val == null) { return removeKey(key); } } - else if(_encodedForm != null && val != null) + else if (_encodedForm != null && val != null) { EncodingUtils.writeShortStringBytes(_encodedForm, key); val.writeToBuffer(_encodedForm); @@ -134,9 +138,8 @@ public class FieldTable } - - AMQTypedValue oldVal = _properties.put(key,val); - if(oldVal != null) + AMQTypedValue oldVal = _properties.put(key, val); + if (oldVal != null) { _encodedSize -= oldVal.getEncodingSize(); } @@ -151,13 +154,13 @@ public class FieldTable private void initMapIfNecessary() { - synchronized(this) + synchronized (this) { - if(_properties == null) + if (_properties == null) { - if(_encodedForm == null || _encodedSize == 0) + if (_encodedForm == null || _encodedSize == 0) { - _properties = new LinkedHashMap(); + _properties = new LinkedHashMap(); } else { @@ -365,7 +368,7 @@ public class FieldTable public Object getObject(AMQShortString string) { AMQTypedValue value = getProperty(string); - if(value != null) + if (value != null) { return value.getValue(); } @@ -376,6 +379,33 @@ public class FieldTable } + public Long getTimestamp(AMQShortString name) + { + AMQTypedValue value = getProperty(name); + if ((value != null) && ((value.getType() == AMQType.TIMESTAMP))) + { + return (Long) value.getValue(); + } + else + { + return null; + } + } + + public BigDecimal getDecimal(AMQShortString propertyName) + { + AMQTypedValue value = getProperty(propertyName); + if ((value != null) && ((value.getType() == AMQType.DECIMAL))) + { + return (BigDecimal) value.getValue(); + } + else + { + return null; + } + } + + // ************ Setters public Object setBoolean(String string, boolean b) { @@ -384,18 +414,16 @@ public class FieldTable public Object setBoolean(AMQShortString string, boolean b) { - checkPropertyName(string); return setProperty(string, AMQType.BOOLEAN.asTypedValue(b)); } public Object setByte(String string, byte b) { return setByte(new AMQShortString(string), b); - } + } public Object setByte(AMQShortString string, byte b) { - checkPropertyName(string); return setProperty(string, AMQType.BYTE.asTypedValue(b)); } @@ -406,7 +434,6 @@ public class FieldTable public Object setShort(AMQShortString string, short i) { - checkPropertyName(string); return setProperty(string, AMQType.SHORT.asTypedValue(i)); } @@ -418,7 +445,6 @@ public class FieldTable public Object setInteger(AMQShortString string, int i) { - checkPropertyName(string); return setProperty(string, AMQType.INT.asTypedValue(i)); } @@ -430,11 +456,9 @@ public class FieldTable public Object setLong(AMQShortString string, long l) { - checkPropertyName(string); return setProperty(string, AMQType.LONG.asTypedValue(l)); } - public Object setFloat(String string, float f) { return setFloat(new AMQShortString(string), f); @@ -442,7 +466,6 @@ public class FieldTable public Object setFloat(AMQShortString string, float v) { - checkPropertyName(string); return setProperty(string, AMQType.FLOAT.asTypedValue(v)); } @@ -451,14 +474,11 @@ public class FieldTable return setDouble(new AMQShortString(string), d); } - public Object setDouble(AMQShortString string, double v) { - checkPropertyName(string); return setProperty(string, AMQType.DOUBLE.asTypedValue(v)); } - public Object setString(String string, String s) { return setString(new AMQShortString(string), s); @@ -466,7 +486,6 @@ public class FieldTable public Object setAsciiString(AMQShortString string, String value) { - checkPropertyName(string); if (value == null) { return setProperty(string, AMQType.VOID.asTypedValue(null)); @@ -479,7 +498,6 @@ public class FieldTable public Object setString(AMQShortString string, String value) { - checkPropertyName(string); if (value == null) { return setProperty(string, AMQType.VOID.asTypedValue(null)); @@ -490,20 +508,16 @@ public class FieldTable } } - public Object setChar(String string, char c) { return setChar(new AMQShortString(string), c); } - public Object setChar(AMQShortString string, char c) { - checkPropertyName(string); return setProperty(string, AMQType.ASCII_CHARACTER.asTypedValue(c)); } - public Object setBytes(String string, byte[] b) { return setBytes(new AMQShortString(string), b); @@ -511,20 +525,18 @@ public class FieldTable public Object setBytes(AMQShortString string, byte[] bytes) { - checkPropertyName(string); return setProperty(string, AMQType.BINARY.asTypedValue(bytes)); } public Object setBytes(String string, byte[] bytes, int start, int length) { - return setBytes(new AMQShortString(string), bytes,start,length); + return setBytes(new AMQShortString(string), bytes, start, length); } public Object setBytes(AMQShortString string, byte[] bytes, int start, int length) { - checkPropertyName(string); byte[] newBytes = new byte[length]; - System.arraycopy(bytes,start,newBytes,0,length); + System.arraycopy(bytes, start, newBytes, 0, length); return setBytes(string, bytes); } @@ -533,6 +545,31 @@ public class FieldTable return setObject(new AMQShortString(string), o); } + public Object setTimestamp(AMQShortString string, long datetime) + { + return setProperty(string, AMQType.TIMESTAMP.asTypedValue(datetime)); + } + + public Object setDecimal(AMQShortString string, BigDecimal decimal) + { + if (decimal.longValue() > Integer.MAX_VALUE) + { + throw new UnsupportedOperationException("AMQP doesnot support decimals larger than " + Integer.MAX_VALUE); + } + + if (decimal.scale() > Byte.MAX_VALUE) + { + throw new UnsupportedOperationException("AMQP doesnot support decimal scales larger than " + Byte.MAX_VALUE); + } + + return setProperty(string, AMQType.DECIMAL.asTypedValue(decimal)); + } + + public Object setVoid(AMQShortString string) + { + return setProperty(string, AMQType.VOID.asTypedValue(null)); + } + public Object setObject(AMQShortString string, Object object) { if (object instanceof Boolean) @@ -579,7 +616,6 @@ public class FieldTable throw new AMQPInvalidClassException("Only Primatives objects allowed Object is:" + object.getClass()); } - public boolean isNullStringValue(String name) { AMQTypedValue value = getProperty(new AMQShortString(name)); @@ -603,10 +639,11 @@ public class FieldTable return itemExists(propertyName); } - public boolean itemExists(AMQShortString string) + public boolean itemExists(AMQShortString propertyName) { + checkPropertyName(propertyName); initMapIfNecessary(); - return _properties.containsKey(string); + return _properties.containsKey(propertyName); } public boolean itemExists(String string) @@ -620,15 +657,13 @@ public class FieldTable return _properties.toString(); } - - private void checkPropertyName(AMQShortString propertyName) { if (propertyName == null) { throw new IllegalArgumentException("Property name must not be null"); } - else if (propertyName.length()==0) + else if (propertyName.length() == 0) { throw new IllegalArgumentException("Property name must not be the empty string"); } @@ -636,7 +671,6 @@ public class FieldTable checkIdentiferFormat(propertyName); } - protected static void checkIdentiferFormat(AMQShortString propertyName) { // AMQP Spec: 4.2.5.5 Field Tables @@ -649,7 +683,6 @@ public class FieldTable // 503 (syntax error). Conformance test: amq_wlp_table_01. // * A peer MUST handle duplicate fields by using only the first instance. - // AMQP length limit if (propertyName.length() > 128) { @@ -666,7 +699,6 @@ public class FieldTable } } - // ************************* Byte Buffer Processing public void writeToBuffer(ByteBuffer buffer) @@ -707,9 +739,9 @@ public class FieldTable { int encodedSize = 0; - if(_properties != null) + if (_properties != null) { - for(Map.Entry e : _properties.entrySet()) + for (Map.Entry e : _properties.entrySet()) { encodedSize += EncodingUtils.encodedShortStringLength(e.getKey()); encodedSize++; // the byte for the encoding Type @@ -732,18 +764,19 @@ public class FieldTable public static interface FieldTableElementProcessor { public boolean processElement(String propertyName, AMQTypedValue value); + public Object getResult(); } public Object processOverElements(FieldTableElementProcessor processor) { initMapIfNecessary(); - if(_properties != null) + if (_properties != null) { - for(Map.Entry e : _properties.entrySet()) + for (Map.Entry e : _properties.entrySet()) { boolean result = processor.processElement(e.getKey().toString(), e.getValue()); - if(!result) + if (!result) { break; } @@ -764,7 +797,7 @@ public class FieldTable public boolean isEmpty() { - return size() ==0; + return size() == 0; } public boolean containsKey(AMQShortString key) @@ -782,7 +815,7 @@ public class FieldTable { initMapIfNecessary(); Set keys = new LinkedHashSet(); - for(AMQShortString key : _properties.keySet()) + for (AMQShortString key : _properties.keySet()) { keys.add(key.toString()); } @@ -797,7 +830,6 @@ public class FieldTable } - public Object put(AMQShortString key, Object value) { return setObject(key, value); @@ -824,7 +856,7 @@ public class FieldTable initMapIfNecessary(); _encodedForm = null; AMQTypedValue value = _properties.remove(key); - if(value == null) + if (value == null) { return null; } @@ -839,11 +871,10 @@ public class FieldTable } - public void clear() { initMapIfNecessary(); - _encodedForm = null; + _encodedForm = null; _properties.clear(); _encodedSize = 0; } @@ -857,19 +888,19 @@ public class FieldTable private void putDataInBuffer(ByteBuffer buffer) { - if(_encodedForm != null) + if (_encodedForm != null) { - if(_encodedForm.position() != 0) + if (_encodedForm.position() != 0) { _encodedForm.flip(); } // _encodedForm.limit((int)getEncodedSize()); buffer.put(_encodedForm); } - else if(_properties != null) + else if (_properties != null) { - final Iterator> it = _properties.entrySet().iterator(); + final Iterator> it = _properties.entrySet().iterator(); //If there are values then write out the encoded Size... could check _encodedSize != 0 // write out the total length, which we have kept up to date as data is added @@ -877,7 +908,7 @@ public class FieldTable while (it.hasNext()) { - final Map.Entry me = it.next(); + final Map.Entry me = it.next(); try { if (_logger.isTraceEnabled()) @@ -889,8 +920,6 @@ public class FieldTable " Remaining:" + buffer.remaining()); } - - //Write the actual parameter name EncodingUtils.writeShortStringBytes(buffer, me.getKey()); me.getValue().writeToBuffer(buffer); @@ -917,12 +946,12 @@ public class FieldTable { final boolean trace = _logger.isTraceEnabled(); - if(length > 0) + if (length > 0) { - final int expectedRemaining = buffer.remaining()-(int)length; + final int expectedRemaining = buffer.remaining() - (int) length; - _properties = new LinkedHashMap(INITIAL_HASHMAP_CAPACITY); + _properties = new LinkedHashMap(INITIAL_HASHMAP_CAPACITY); do { @@ -936,11 +965,9 @@ public class FieldTable } - - _properties.put(key,value); + _properties.put(key, value); - } while (buffer.remaining() > expectedRemaining); @@ -962,15 +989,15 @@ public class FieldTable public boolean equals(Object o) { - if(o == this) + if (o == this) { return true; } - if(o == null) + if (o == null) { return false; } - if(!(o instanceof FieldTable)) + if (!(o instanceof FieldTable)) { return false; } diff --git a/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/AttributesTabControl.java b/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/AttributesTabControl.java index 1ffe34d9c5..3234503fb5 100644 --- a/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/AttributesTabControl.java +++ b/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/AttributesTabControl.java @@ -477,37 +477,32 @@ public class AttributesTabControl extends TabControl layout.marginHeight = 20; layout.marginWidth = 20; - Composite parent = new Composite(shell, SWT.NONE); + Composite parent = _toolkit.createComposite(shell, SWT.NONE); parent.setLayoutData(new GridData(SWT.FILL, SWT.FILL, true, true)); parent.setLayout(layout); // Name - Label label = new Label(parent, SWT.NONE); - label.setText(ATTRIBUTE_TABLE_TITLES[0]); + Label label = _toolkit.createLabel(parent, ATTRIBUTE_TABLE_TITLES[0], SWT.NONE); GridData layoutData = new GridData(SWT.TRAIL, SWT.TOP, false, false); label.setLayoutData(layoutData); - Text value = new Text(parent, SWT.BEGINNING | SWT.BORDER |SWT.READ_ONLY); - value.setText(ViewUtility.getDisplayText(attribute.getName())); + int textStyle = SWT.BEGINNING | SWT.BORDER |SWT.READ_ONLY; + Text value = _toolkit.createText(parent, ViewUtility.getDisplayText(attribute.getName()), textStyle); value.setLayoutData(new GridData(SWT.FILL, SWT.TOP, true, false)); // Description - label = new Label(parent, SWT.NONE); - label.setText(DESCRIPTION); + label = _toolkit.createLabel(parent, DESCRIPTION, SWT.NONE); label.setLayoutData(new GridData(SWT.TRAIL, SWT.TOP, false, false)); - value = new Text(parent, SWT.BEGINNING | SWT.BORDER | SWT.READ_ONLY); - value.setText(attribute.getDescription()); + value = _toolkit.createText(parent, attribute.getDescription(), textStyle); value.setLayoutData(new GridData(SWT.FILL, SWT.TOP, true, false)); // value - label = new Label(parent, SWT.NONE); - label.setText(ATTRIBUTE_TABLE_TITLES[1]); + label = _toolkit.createLabel(parent, ATTRIBUTE_TABLE_TITLES[1], SWT.NONE); label.setLayoutData(new GridData(SWT.TRAIL, SWT.TOP, false, false)); if (!attribute.isReadable()) { - value = new Text(parent, SWT.BEGINNING | SWT.BORDER | SWT.READ_ONLY); - value.setText(""); + value = _toolkit.createText(parent, "", textStyle); value.setLayoutData(new GridData(SWT.FILL, SWT.TOP, true, false)); } else @@ -521,11 +516,9 @@ public class AttributesTabControl extends TabControl } else { - int style = 0; if (attribute.isWritable()) { - style = SWT.BEGINNING | SWT.BORDER; - value = new Text(parent, style); + value = _toolkit.createText(parent, "", SWT.BEGINNING | SWT.BORDER); value.addVerifyListener(new NumberVerifyListener()); // set data to access in the listener @@ -533,8 +526,7 @@ public class AttributesTabControl extends TabControl } else { - style = SWT.BEGINNING | SWT.BORDER | SWT.READ_ONLY; - value = new Text(parent, style); + value = _toolkit.createText(parent, "", textStyle); } value.setText(attribute.getValue().toString()); diff --git a/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java b/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java index 5dec2125ee..642f3077fd 100644 --- a/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java +++ b/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java @@ -37,6 +37,7 @@ import org.apache.log4j.Logger; import org.apache.qpid.AMQException; import org.apache.qpid.client.*; import org.apache.qpid.client.message.TestMessageFactory; +import org.apache.qpid.client.message.AMQMessage; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.jms.MessageProducer; @@ -374,7 +375,7 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis * ping producers on the same JVM. */ private static Map perCorrelationIds = - Collections.synchronizedMap(new HashMap()); + Collections.synchronizedMap(new HashMap()); /** A convenient formatter to use when time stamping output. */ protected static final DateFormat timestampFormatter = new SimpleDateFormat("hh:mm:ss:SS"); @@ -549,13 +550,13 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis * Starts a ping-pong loop running from the command line. The bounce back client {@link PingPongBouncer} also needs * to be started to bounce the pings back again. * - * @param args The command line arguments. + * @param args The command line arguments. */ public static void main(String[] args) { try { - Properties options = CommandLineParser.processCommandLine(args, new CommandLineParser(new String[][] {})); + Properties options = CommandLineParser.processCommandLine(args, new CommandLineParser(new String[][]{})); // Create a ping producer overriding its defaults with all options passed on the command line. PingPongProducer pingProducer = new PingPongProducer(options); @@ -597,7 +598,8 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis Thread.sleep(sleepTime); } catch (InterruptedException ie) - { } + { + } } } @@ -648,11 +650,11 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis * @throws JMSException Any JMSExceptions are allowed to fall through. */ public void createPingDestinations(int noOfDestinations, String selector, String rootName, boolean unique, - boolean durable) throws JMSException, AMQException + boolean durable) throws JMSException, AMQException { log.debug("public void createPingDestinations(int noOfDestinations = " + noOfDestinations + ", String selector = " - + selector + ", String rootName = " + rootName + ", boolean unique = " + unique + ", boolean durable = " - + durable + "): called"); + + selector + ", String rootName = " + rootName + ", boolean unique = " + unique + ", boolean durable = " + + durable + "): called"); _pingDestinations = new ArrayList(); @@ -688,8 +690,8 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis else { destination = - AMQTopic.createDurableTopic(new AMQTopic(ExchangeDefaults.TOPIC_EXCHANGE_NAME, rootName + id), - _clientID, (AMQConnection) _connection); + AMQTopic.createDurableTopic(new AMQTopic(ExchangeDefaults.TOPIC_EXCHANGE_NAME, rootName + id), + _clientID, (AMQConnection) _connection); log.debug("Created durable topic " + destination); } } @@ -698,11 +700,11 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis { AMQShortString destinationName = new AMQShortString(rootName + id); destination = - new AMQQueue(ExchangeDefaults.DIRECT_EXCHANGE_NAME, destinationName, destinationName, false, false, - _isDurable); + new AMQQueue(ExchangeDefaults.DIRECT_EXCHANGE_NAME, destinationName, destinationName, false, false, + _isDurable); ((AMQSession) _producerSession).createQueue(destinationName, false, _isDurable, false); ((AMQSession) _producerSession).bindQueue(destinationName, destinationName, null, - ExchangeDefaults.DIRECT_EXCHANGE_NAME); + ExchangeDefaults.DIRECT_EXCHANGE_NAME); log.debug("Created queue " + destination); } @@ -715,15 +717,15 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis /** * Creates consumers for the specified destinations and registers this pinger to listen to their messages. * - * @param destinations The destinations to listen to. - * @param selector A selector to filter the messages with. + * @param destinations The destinations to listen to. + * @param selector A selector to filter the messages with. * * @throws javax.jms.JMSException Any JMSExceptions are allowed to fall through. */ public void createReplyConsumers(Collection destinations, String selector) throws JMSException { log.debug("public void createReplyConsumers(Collection destinations = " + destinations - + ", String selector = " + selector + "): called"); + + ", String selector = " + selector + "): called"); log.debug("Creating " + destinations.size() + " reply consumers."); @@ -731,8 +733,8 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis { // Create a consumer for the destination and set this pinger to listen to its messages. _consumer = - _consumerSession.createConsumer(destination, PREFETCH_DEFAULT, NO_LOCAL_DEFAULT, EXCLUSIVE_DEFAULT, - selector); + _consumerSession.createConsumer(destination, PREFETCH_DEFAULT, NO_LOCAL_DEFAULT, EXCLUSIVE_DEFAULT, + selector); _consumer.setMessageListener(this); log.debug("Set this to listen to replies sent to destination: " + destination); @@ -740,8 +742,9 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis } /** - * Stores the received message in the replies map, then resets the boolean latch that a thread waiting for a correlating - * reply may be waiting on. This is only done if the reply has a correlation id that is expected in the replies map. + * Stores the received message in the replies map, then resets the boolean latch that a thread waiting for a + * correlating reply may be waiting on. This is only done if the reply has a correlation id that is expected in the + * replies map. * * @param message The received message. */ @@ -830,26 +833,26 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis } /** - * Sends the specified number of ping message and then waits for all correlating replies. If the wait times out before a - * reply arrives, then a null reply is returned from this method. This method allows the caller to specify the - * correlation id. + * Sends the specified number of ping message and then waits for all correlating replies. If the wait times out + * before a reply arrives, then a null reply is returned from this method. This method allows the caller to specify + * the correlation id. * - * @param message The message to send. If this is null, one is generated. - * @param numPings The number of ping messages to send. - * @param timeout The timeout in milliseconds. - * @param messageCorrelationId The message correlation id. If this is null, one is generated. + * @param message The message to send. If this is null, one is generated. + * @param numPings The number of ping messages to send. + * @param timeout The timeout in milliseconds. + * @param messageCorrelationId The message correlation id. If this is null, one is generated. * - * @return The number of replies received. This may be less than the number sent if the timeout terminated the wait for - * all prematurely. + * @return The number of replies received. This may be less than the number sent if the timeout terminated the wait + * for all prematurely. * * @throws JMSException All underlying JMSExceptions are allowed to fall through. * @throws InterruptedException When interrupted by a timeout */ public int pingAndWaitForReply(Message message, int numPings, long timeout, String messageCorrelationId) - throws JMSException, InterruptedException + throws JMSException, InterruptedException { log.debug("public int pingAndWaitForReply(Message message, int numPings = " + numPings + ", long timeout = " - + timeout + ", String messageCorrelationId = " + messageCorrelationId + "): called"); + + timeout + ", String messageCorrelationId = " + messageCorrelationId + "): called"); // Generate a unique correlation id to put on the messages before sending them, if one was not specified. if (messageCorrelationId == null) @@ -929,16 +932,16 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis /** * Sends the specified number of ping messages and does not wait for correlating replies. * - * @param message The message to send. - * @param numPings The number of pings to send. - * @param messageCorrelationId A correlation id to place on all messages sent. + * @param message The message to send. + * @param numPings The number of pings to send. + * @param messageCorrelationId A correlation id to place on all messages sent. * * @throws JMSException All underlying JMSExceptions are allowed to fall through. */ public void pingNoWaitForReply(Message message, int numPings, String messageCorrelationId) throws JMSException { log.debug("public void pingNoWaitForReply(Message message, int numPings = " + numPings - + ", String messageCorrelationId = " + messageCorrelationId + "): called"); + + ", String messageCorrelationId = " + messageCorrelationId + "): called"); if (message == null) { @@ -1040,9 +1043,9 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis } /** - * Implements a single iteration of the ping loop. This sends the number of pings specified by the transaction - * batch size property, and waits for replies to all of them. Any errors cause the publish flag to be cleared, - * which will terminate the pinger. + * Implements a single iteration of the ping loop. This sends the number of pings specified by the transaction batch + * size property, and waits for replies to all of them. Any errors cause the publish flag to be cleared, which will + * terminate the pinger. */ public void pingLoop() { @@ -1050,7 +1053,7 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis { // Generate a sample message and time stamp it. Message msg = getTestMessage(_replyDestination, _messageSize, _persistent); - msg.setLongProperty(MESSAGE_TIMESTAMP_PROPNAME, System.nanoTime()); + setTimestamp(msg); // Send the message and wait for a reply. pingAndWaitForReply(msg, TX_BATCH_SIZE_DEFAULT, TIMEOUT_DEFAULT, null); @@ -1068,7 +1071,8 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis } /** - * Sets a chained message listener. The message listener on this pinger, chains all its messages to the one set here. + * Sets a chained message listener. The message listener on this pinger, chains all its messages to the one set + * here. * * @param messageListener The chained message listener. */ @@ -1077,9 +1081,7 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis _chainedMessageListener = messageListener; } - /** - * Removes any chained message listeners from this pinger. - */ + /** Removes any chained message listeners from this pinger. */ public void removeChainedMessageListener() { _chainedMessageListener = null; @@ -1088,9 +1090,9 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis /** * Generates a test message of the specified size, with the specified reply-to destination and persistence flag. * - * @param replyQueue The reply-to destination for the message. - * @param messageSize The desired size of the message in bytes. - * @param persistent true if the message should use persistent delivery, false otherwise. + * @param replyQueue The reply-to destination for the message. + * @param messageSize The desired size of the message in bytes. + * @param persistent true if the message should use persistent delivery, false otherwise. * * @return A freshly generated test message. * @@ -1101,23 +1103,50 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis ObjectMessage msg = TestMessageFactory.newObjectMessage(_producerSession, replyQueue, messageSize, persistent); // Timestamp the message in nanoseconds. - msg.setLongProperty(MESSAGE_TIMESTAMP_PROPNAME, System.nanoTime()); + + setTimestamp(msg); return msg; } + protected void setTimestamp(Message msg) throws JMSException + { + if (((AMQSession) _producerSession).isStrictAMQP()) + { + ((AMQMessage) msg).setTimestampProperty(new AMQShortString(MESSAGE_TIMESTAMP_PROPNAME), System.nanoTime()); + } + else + { + msg.setLongProperty(MESSAGE_TIMESTAMP_PROPNAME, System.nanoTime()); + } + } + + protected long getTimestamp(Message msg) throws JMSException + { + + if (((AMQSession) _producerSession).isStrictAMQP()) + { + Long value = ((AMQMessage) msg).getTimestampProperty(new AMQShortString(MESSAGE_TIMESTAMP_PROPNAME)); + + return value == null ? 0L : value; + } + else + { + return msg.getLongProperty(PingPongProducer.MESSAGE_TIMESTAMP_PROPNAME); + } + } + + /** - * Stops the ping loop by clearing the publish flag. The current loop will complete before it notices that this - * flag has been cleared. + * Stops the ping loop by clearing the publish flag. The current loop will complete before it notices that this flag + * has been cleared. */ public void stop() { _publish = false; } - /** - * Implements a ping loop that repeatedly pings until the publish flag becomes false. - */ + /** Implements a ping loop that repeatedly pings until the publish flag becomes false. */ public void run() { // Keep running until the publish flag is cleared. @@ -1128,8 +1157,8 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis } /** - * Callback method, implementing ExceptionListener. This should be registered to listen for exceptions on the connection, - * this clears the publish flag which in turn will halt the ping loop. + * Callback method, implementing ExceptionListener. This should be registered to listen for exceptions on the + * connection, this clears the publish flag which in turn will halt the ping loop. * * @param e The exception that triggered this callback method. */ @@ -1140,20 +1169,20 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis } /** - * Gets a shutdown hook that will cleanly shut this down when it is running the ping loop. This can be registered with - * the runtime system as a shutdown hook. + * Gets a shutdown hook that will cleanly shut this down when it is running the ping loop. This can be registered + * with the runtime system as a shutdown hook. * * @return A shutdown hook for the ping loop. */ public Thread getShutdownHook() { return new Thread(new Runnable() - { - public void run() - { - stop(); - } - }); + { + public void run() + { + stop(); + } + }); } /** @@ -1202,19 +1231,18 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis *

If the {@link #_failAfterSend} flag is set, this will prompt the user to kill the broker before the commit is * applied. This flag applies whether the pinger is transactional or not. * - *

If the {@link #_failBeforeCommit} flag is set, this will prompt the user to kill the broker before the commit is - * applied. If the {@link #_failAfterCommit} flag is set, this will prompt the user to kill the broker after the commit - * is applied. These flags will only apply if using a transactional pinger. - * - * @param session The session to commit + *

If the {@link #_failBeforeCommit} flag is set, this will prompt the user to kill the broker before the commit + * is applied. If the {@link #_failAfterCommit} flag is set, this will prompt the user to kill the broker after the + * commit is applied. These flags will only apply if using a transactional pinger. * - * @throws javax.jms.JMSException If the commit fails and then the rollback fails. + * @param session The session to commit * * @return true if the session was committed, false if it was not. * - * @todo Consider moving the fail after send logic into the send method. It is confusing to have it in this commit - * method, because commits only apply to transactional pingers, but fail after send applied to transactional and - * non-transactional alike. + * @throws javax.jms.JMSException If the commit fails and then the rollback fails. + * @todo Consider moving the fail after send logic into the send method. It is confusing to have it in this commit + * method, because commits only apply to transactional pingers, but fail after send applied to transactional and + * non-transactional alike. */ protected boolean commitTx(Session session) throws JMSException { @@ -1335,12 +1363,12 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis /** * Defines a chained message listener interface that can be attached to this pinger. Whenever this pinger's {@link * PingPongProducer#onMessage} method is called, the chained listener set through the {@link - * PingPongProducer#setChainedMessageListener} method is passed the message, and the remaining expected count of messages - * with that correlation id. + * PingPongProducer#setChainedMessageListener} method is passed the message, and the remaining expected count of + * messages with that correlation id. * - *

Provided only one pinger is producing messages with that correlation id, the chained listener will always be given - * unique message counts. It will always be called while the producer waiting for all messages to arrive is still - * blocked. + *

Provided only one pinger is producing messages with that correlation id, the chained listener will always be + * given unique message counts. It will always be called while the producer waiting for all messages to arrive is + * still blocked. */ public static interface ChainedMessageListener { @@ -1348,8 +1376,8 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis } /** - * Holds information on each correlation id. The countdown latch, the current timeout timer... More stuff to be added to - * this: read/write lock to make onMessage more concurrent as described in class header comment. + * Holds information on each correlation id. The countdown latch, the current timeout timer... More stuff to be + * added to this: read/write lock to make onMessage more concurrent as described in class header comment. */ protected static class PerCorrelationId { diff --git a/java/perftests/src/test/java/org/apache/qpid/ping/PingLatencyTestPerf.java b/java/perftests/src/test/java/org/apache/qpid/ping/PingLatencyTestPerf.java index c822964152..a6d12d7c42 100644 --- a/java/perftests/src/test/java/org/apache/qpid/ping/PingLatencyTestPerf.java +++ b/java/perftests/src/test/java/org/apache/qpid/ping/PingLatencyTestPerf.java @@ -35,6 +35,9 @@ import junit.framework.TestSuite; import org.apache.log4j.Logger; import org.apache.qpid.requestreply.PingPongProducer; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.client.message.AMQMessage; +import org.apache.qpid.framing.AMQShortString; import uk.co.thebadgerset.junit.extensions.TimingController; import uk.co.thebadgerset.junit.extensions.TimingControllerAware; @@ -48,18 +51,16 @@ import uk.co.thebadgerset.junit.extensions.util.ParsedProperties; * waiting until all expected replies are received. * *

This test does not output timings for every single ping message, as when running at high volume, writing the test - * log for a vast number of messages would slow the testing down. Instead samples ping latency occasionally. The frequency - * of ping sampling is set using the {@link #TEST_RESULTS_BATCH_SIZE_PROPNAME} property, to override the default of every - * {@link #DEFAULT_TEST_RESULTS_BATCH_SIZE}. + * log for a vast number of messages would slow the testing down. Instead samples ping latency occasionally. The + * frequency of ping sampling is set using the {@link #TEST_RESULTS_BATCH_SIZE_PROPNAME} property, to override the + * default of every {@link #DEFAULT_TEST_RESULTS_BATCH_SIZE}. * - *

The size parameter logged for each individual ping is set to the size of the batch of messages that the individual - * timed ping was taken from, rather than 1 for a single message. This is so that the total throughput (messages / time) - * can be calculated in order to examine the relationship between throughput and latency. + *

The size parameter logged for each individual ping is set to the size of the batch of messages that the + * individual timed ping was taken from, rather than 1 for a single message. This is so that the total throughput + * (messages / time) can be calculated in order to examine the relationship between throughput and latency. * - *

- *
CRC Card
Responsibilities Collaborations - *
Send many ping messages and output timings for sampled individual pings. - *
+ *

CRC Card
Responsibilities Collaborations
Send many ping + * messages and output timings for sampled individual pings.
*/ public class PingLatencyTestPerf extends PingTestPerf implements TimingControllerAware { @@ -77,9 +78,12 @@ public class PingLatencyTestPerf extends PingTestPerf implements TimingControlle /** Used to generate unique correlation ids for each test run. */ private AtomicLong corellationIdGenerator = new AtomicLong(); - /** Holds test specifics by correlation id. This consists of the expected number of messages and the timing controler. */ + /** + * Holds test specifics by correlation id. This consists of the expected number of messages and the timing + * controler. + */ private Map perCorrelationIds = - Collections.synchronizedMap(new HashMap()); + Collections.synchronizedMap(new HashMap()); /** Holds the batched results listener, that does logging on batch boundaries. */ private BatchedResultsListener batchedResultsListener = null; @@ -98,9 +102,7 @@ public class PingLatencyTestPerf extends PingTestPerf implements TimingControlle Integer.toString(DEFAULT_TEST_RESULTS_BATCH_SIZE)); } - /** - * Compile all the tests into a test suite. - */ + /** Compile all the tests into a test suite. */ public static Test suite() { // Build a new test suite @@ -133,8 +135,8 @@ public class PingLatencyTestPerf extends PingTestPerf implements TimingControlle } /** - * Sends the specified number of pings, asynchronously outputs timings on every batch boundary, and waits until - * all replies have been received or a time out occurs before exiting this method. + * Sends the specified number of pings, asynchronously outputs timings on every batch boundary, and waits until all + * replies have been received or a time out occurs before exiting this method. * * @param numPings The number of pings to send. */ @@ -169,9 +171,9 @@ public class PingLatencyTestPerf extends PingTestPerf implements TimingControlle // Generate a sample message of the specified size. Message msg = - pingClient.getTestMessage(perThreadSetup._pingClient.getReplyDestinations().get(0), - testParameters.getPropertyAsInteger(PingPongProducer.MESSAGE_SIZE_PROPNAME), - testParameters.getPropertyAsBoolean(PingPongProducer.PERSISTENT_MODE_PROPNAME)); + pingClient.getTestMessage(perThreadSetup._pingClient.getReplyDestinations().get(0), + testParameters.getPropertyAsInteger(PingPongProducer.MESSAGE_SIZE_PROPNAME), + testParameters.getPropertyAsBoolean(PingPongProducer.PERSISTENT_MODE_PROPNAME)); // Send the requested number of messages, and wait until they have all been received. long timeout = Long.parseLong(testParameters.getProperty(PingPongProducer.TIMEOUT_PROPNAME)); @@ -190,9 +192,7 @@ public class PingLatencyTestPerf extends PingTestPerf implements TimingControlle perCorrelationIds.remove(messageCorrelationId); } - /** - * Performs test fixture creation on a per thread basis. This will only be called once for each test thread. - */ + /** Performs test fixture creation on a per thread basis. This will only be called once for each test thread. */ public void threadSetUp() { _logger.debug("public void threadSetUp(): called"); @@ -228,14 +228,15 @@ public class PingLatencyTestPerf extends PingTestPerf implements TimingControlle /** * BatchedResultsListener is a {@link org.apache.qpid.requestreply.PingPongProducer.ChainedMessageListener} that can - * be attached to the pinger, in order to receive notifications about every message received and the number remaining - * to be received. Whenever the number remaining crosses a batch size boundary this results listener outputs a test - * timing for the actual number of messages received in the current batch. + * be attached to the pinger, in order to receive notifications about every message received and the number + * remaining to be received. Whenever the number remaining crosses a batch size boundary this results listener + * outputs a test timing for the actual number of messages received in the current batch. */ private class BatchedResultsListener implements PingPongProducer.ChainedMessageListener { /** The test results logging batch size. */ int _batchSize; + private boolean _strictAMQP; /** * Creates a results listener on the specified batch size. @@ -245,6 +246,7 @@ public class PingLatencyTestPerf extends PingTestPerf implements TimingControlle public BatchedResultsListener(int batchSize) { _batchSize = batchSize; + _strictAMQP = Boolean.parseBoolean(System.getProperties().getProperty(AMQSession.STRICT_AMQP, AMQSession.STRICT_AMQP_DEFAULT)); } /** @@ -278,7 +280,19 @@ public class PingLatencyTestPerf extends PingTestPerf implements TimingControlle // Extract the send time from the message and work out from the current time, what the ping latency was. // The ping producer time stamps messages in nanoseconds. - long startTime = message.getLongProperty(PingPongProducer.MESSAGE_TIMESTAMP_PROPNAME); + long startTime; + + if (_strictAMQP) + { + Long value = ((AMQMessage) message).getTimestampProperty(new AMQShortString(PingPongProducer.MESSAGE_TIMESTAMP_PROPNAME)); + + startTime = (value == null ? 0L : value); + } + else + { + startTime = message.getLongProperty(PingPongProducer.MESSAGE_TIMESTAMP_PROPNAME); + } + long now = System.nanoTime(); long pingTime = now - startTime; @@ -306,8 +320,8 @@ public class PingLatencyTestPerf extends PingTestPerf implements TimingControlle } /** - * Holds state specific to each correlation id, needed to output test results. This consists of the count of - * the total expected number of messages, and the timing controller for the thread sending those message ids. + * Holds state specific to each correlation id, needed to output test results. This consists of the count of the + * total expected number of messages, and the timing controller for the thread sending those message ids. */ private static class PerCorrelationId { -- cgit v1.2.1