From 521f76262032fd020dd804bc0088955df6c566ec Mon Sep 17 00:00:00 2001 From: Arnaud Simon Date: Fri, 24 Aug 2007 15:58:08 +0000 Subject: updated for using pure JMS git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@569429 13f79535-47bb-0310-9956-ffa450edef68 --- .../qpid/client/message/TestMessageFactory.java | 16 +- .../config/AMQConnectionFactoryInitialiser.java | 35 - .../org/apache/qpid/config/AbstractConfig.java | 69 -- .../qpid/config/ConnectionFactoryInitialiser.java | 29 - .../java/org/apache/qpid/config/Connector.java | 40 -- .../org/apache/qpid/config/ConnectorConfig.java | 28 - .../config/JBossConnectionFactoryInitialiser.java | 112 --- .../org/apache/qpid/ping/PingDurableClient.java | 1 - .../org/apache/qpid/ping/PingLatencyTestPerf.java | 8 +- .../org/apache/qpid/ping/PingSendOnlyClient.java | 1 - .../qpid/requestreply/InitialContextHelper.java | 48 ++ .../apache/qpid/requestreply/PingPongBouncer.java | 453 ------------ .../apache/qpid/requestreply/PingPongProducer.java | 774 +++++++++++---------- .../apache/qpid/requestreply/PingPongTestPerf.java | 6 +- .../main/java/org/apache/qpid/topic/Config.java | 326 --------- .../main/java/org/apache/qpid/topic/Listener.java | 303 -------- .../java/org/apache/qpid/topic/MessageFactory.java | 157 ----- .../main/java/org/apache/qpid/topic/Publisher.java | 186 ----- java/perftests/src/main/java/perftests.properties | 39 ++ 19 files changed, 518 insertions(+), 2113 deletions(-) delete mode 100644 java/perftests/src/main/java/org/apache/qpid/config/AMQConnectionFactoryInitialiser.java delete mode 100644 java/perftests/src/main/java/org/apache/qpid/config/AbstractConfig.java delete mode 100644 java/perftests/src/main/java/org/apache/qpid/config/ConnectionFactoryInitialiser.java delete mode 100644 java/perftests/src/main/java/org/apache/qpid/config/Connector.java delete mode 100644 java/perftests/src/main/java/org/apache/qpid/config/ConnectorConfig.java delete mode 100644 java/perftests/src/main/java/org/apache/qpid/config/JBossConnectionFactoryInitialiser.java create mode 100644 java/perftests/src/main/java/org/apache/qpid/requestreply/InitialContextHelper.java delete mode 100644 java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongBouncer.java delete mode 100644 java/perftests/src/main/java/org/apache/qpid/topic/Config.java delete mode 100644 java/perftests/src/main/java/org/apache/qpid/topic/Listener.java delete mode 100644 java/perftests/src/main/java/org/apache/qpid/topic/MessageFactory.java delete mode 100644 java/perftests/src/main/java/org/apache/qpid/topic/Publisher.java create mode 100644 java/perftests/src/main/java/perftests.properties (limited to 'java') diff --git a/java/perftests/src/main/java/org/apache/qpid/client/message/TestMessageFactory.java b/java/perftests/src/main/java/org/apache/qpid/client/message/TestMessageFactory.java index 64ccb719b6..4d038db0a8 100644 --- a/java/perftests/src/main/java/org/apache/qpid/client/message/TestMessageFactory.java +++ b/java/perftests/src/main/java/org/apache/qpid/client/message/TestMessageFactory.java @@ -20,16 +20,12 @@ */ package org.apache.qpid.client.message; -import org.apache.mina.common.ByteBuffer; -import org.apache.mina.common.SimpleByteBufferAllocator; - import javax.jms.JMSException; import javax.jms.Session; import javax.jms.ObjectMessage; import javax.jms.StreamMessage; import javax.jms.BytesMessage; import javax.jms.TextMessage; -import javax.jms.Queue; import javax.jms.DeliveryMode; import javax.jms.Destination; @@ -42,15 +38,6 @@ public class TestMessageFactory return session.createTextMessage(createMessagePayload(size)); } - public static JMSTextMessage newJMSTextMessage(int size, String encoding) throws JMSException - { - ByteBuffer byteBuffer = (new SimpleByteBufferAllocator()).allocate(size, true); - JMSTextMessage message = new JMSTextMessage(byteBuffer, encoding); - message.clearBody(); - message.setText(createMessagePayload(size)); - return message; - } - public static BytesMessage newBytesMessage(Session session, int size) throws JMSException { BytesMessage message = session.createBytesMessage(); @@ -78,7 +65,8 @@ public class TestMessageFactory } /** - * Creates an ObjectMessage with given size and sets the JMS properties (JMSReplyTo and DeliveryMode) + * Creates an ObjectMessage with given size and sets the JMS properties + * (JMSReplyTo and DeliveryMode) * @param session * @param replyDestination * @param size diff --git a/java/perftests/src/main/java/org/apache/qpid/config/AMQConnectionFactoryInitialiser.java b/java/perftests/src/main/java/org/apache/qpid/config/AMQConnectionFactoryInitialiser.java deleted file mode 100644 index cac0064785..0000000000 --- a/java/perftests/src/main/java/org/apache/qpid/config/AMQConnectionFactoryInitialiser.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.config; - -import org.apache.qpid.client.AMQConnectionFactory; -import org.apache.qpid.config.ConnectionFactoryInitialiser; -import org.apache.qpid.config.ConnectorConfig; - -import javax.jms.ConnectionFactory; - -class AMQConnectionFactoryInitialiser implements ConnectionFactoryInitialiser -{ - public ConnectionFactory getFactory(ConnectorConfig config) - { - return new AMQConnectionFactory(config.getHost(), config.getPort(), "/test_path"); - } -} diff --git a/java/perftests/src/main/java/org/apache/qpid/config/AbstractConfig.java b/java/perftests/src/main/java/org/apache/qpid/config/AbstractConfig.java deleted file mode 100644 index 14db74438f..0000000000 --- a/java/perftests/src/main/java/org/apache/qpid/config/AbstractConfig.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.config; - -public abstract class AbstractConfig -{ - public boolean setOptions(String[] argv) - { - try - { - for(int i = 0; i < argv.length - 1; i += 2) - { - String key = argv[i]; - String value = argv[i+1]; - setOption(key, value); - } - return true; - } - catch(Exception e) - { - System.out.println(e.getMessage()); - } - return false; - } - - protected int parseInt(String msg, String i) - { - try - { - return Integer.parseInt(i); - } - catch(NumberFormatException e) - { - throw new RuntimeException(msg + ": " + i, e); - } - } - - protected long parseLong(String msg, String i) - { - try - { - return Long.parseLong(i); - } - catch(NumberFormatException e) - { - throw new RuntimeException(msg + ": " + i, e); - } - } - - public abstract void setOption(String key, String value); -} diff --git a/java/perftests/src/main/java/org/apache/qpid/config/ConnectionFactoryInitialiser.java b/java/perftests/src/main/java/org/apache/qpid/config/ConnectionFactoryInitialiser.java deleted file mode 100644 index a9984eb09a..0000000000 --- a/java/perftests/src/main/java/org/apache/qpid/config/ConnectionFactoryInitialiser.java +++ /dev/null @@ -1,29 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.config; - -import javax.jms.ConnectionFactory; -import javax.jms.JMSException; - -public interface ConnectionFactoryInitialiser -{ - public ConnectionFactory getFactory(ConnectorConfig config) throws JMSException; -} diff --git a/java/perftests/src/main/java/org/apache/qpid/config/Connector.java b/java/perftests/src/main/java/org/apache/qpid/config/Connector.java deleted file mode 100644 index ff2377f087..0000000000 --- a/java/perftests/src/main/java/org/apache/qpid/config/Connector.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.config; - -import javax.jms.Connection; -import javax.jms.ConnectionFactory; - -public class Connector -{ - public Connection createConnection(ConnectorConfig config) throws Exception - { - return getConnectionFactory(config).createConnection(); - } - - ConnectionFactory getConnectionFactory(ConnectorConfig config) throws Exception - { - String factory = config.getFactory(); - if(factory == null) factory = AMQConnectionFactoryInitialiser.class.getName(); - System.out.println("Using " + factory); - return ((ConnectionFactoryInitialiser) Class.forName(factory).newInstance()).getFactory(config); - } -} diff --git a/java/perftests/src/main/java/org/apache/qpid/config/ConnectorConfig.java b/java/perftests/src/main/java/org/apache/qpid/config/ConnectorConfig.java deleted file mode 100644 index b120ed3f12..0000000000 --- a/java/perftests/src/main/java/org/apache/qpid/config/ConnectorConfig.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.config; - -public interface ConnectorConfig -{ - public String getHost(); - public int getPort(); - public String getFactory(); -} diff --git a/java/perftests/src/main/java/org/apache/qpid/config/JBossConnectionFactoryInitialiser.java b/java/perftests/src/main/java/org/apache/qpid/config/JBossConnectionFactoryInitialiser.java deleted file mode 100644 index a0248a8f79..0000000000 --- a/java/perftests/src/main/java/org/apache/qpid/config/JBossConnectionFactoryInitialiser.java +++ /dev/null @@ -1,112 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.config; - -import org.apache.qpid.config.ConnectionFactoryInitialiser; -import org.apache.qpid.config.ConnectorConfig; -import org.apache.qpid.client.JMSAMQException; - -import javax.jms.ConnectionFactory; -import javax.jms.JMSException; -import javax.management.MBeanServerConnection; -import javax.management.ObjectName; -import javax.management.MBeanException; -import javax.naming.InitialContext; -import javax.naming.NamingException; -import javax.naming.NameNotFoundException; -import java.util.Hashtable; - -public class JBossConnectionFactoryInitialiser implements ConnectionFactoryInitialiser -{ - public ConnectionFactory getFactory(ConnectorConfig config) throws JMSException - { - ConnectionFactory cf = null; - InitialContext ic = null; - Hashtable ht = new Hashtable(); - ht.put(InitialContext.INITIAL_CONTEXT_FACTORY, "org.jnp.interfaces.NamingContextFactory"); - String jbossHost = System.getProperty("jboss.host", "eqd-lxamq01"); - String jbossPort = System.getProperty("jboss.port", "1099"); - ht.put(InitialContext.PROVIDER_URL, "jnp://" + jbossHost + ":" + jbossPort); - ht.put(InitialContext.URL_PKG_PREFIXES, "org.jboss.naming:org.jnp.interfaces"); - - try - { - ic = new InitialContext(ht); - if (!doesDestinationExist("topictest.messages", ic)) - { - deployTopic("topictest.messages", ic); - } - if (!doesDestinationExist("topictest.control", ic)) - { - deployTopic("topictest.control", ic); - } - - cf = (ConnectionFactory) ic.lookup("/ConnectionFactory"); - return cf; - } - catch (NamingException e) - { - throw new JMSAMQException("Unable to lookup object: " + e, e); - } - catch (Exception e) - { - throw new JMSAMQException("Error creating topic: " + e, e); - } - } - - private boolean doesDestinationExist(String name, InitialContext ic) throws Exception - { - try - { - ic.lookup("/" + name); - } - catch (NameNotFoundException e) - { - return false; - } - return true; - } - - private void deployTopic(String name, InitialContext ic) throws Exception - { - MBeanServerConnection mBeanServer = lookupMBeanServerProxy(ic); - - ObjectName serverObjectName = new ObjectName("jboss.messaging:service=ServerPeer"); - - String jndiName = "/" + name; - try - { - mBeanServer.invoke(serverObjectName, "createTopic", - new Object[]{name, jndiName}, - new String[]{"java.lang.String", "java.lang.String"}); - } - catch (MBeanException e) - { - System.err.println("Error: " + e); - System.err.println("Cause: " + e.getCause()); - } - } - - private MBeanServerConnection lookupMBeanServerProxy(InitialContext ic) throws NamingException - { - return (MBeanServerConnection) ic.lookup("jmx/invoker/RMIAdaptor"); - } -} diff --git a/java/perftests/src/main/java/org/apache/qpid/ping/PingDurableClient.java b/java/perftests/src/main/java/org/apache/qpid/ping/PingDurableClient.java index db6f384914..2750790354 100644 --- a/java/perftests/src/main/java/org/apache/qpid/ping/PingDurableClient.java +++ b/java/perftests/src/main/java/org/apache/qpid/ping/PingDurableClient.java @@ -116,7 +116,6 @@ public class PingDurableClient extends PingPongProducer implements ExceptionList defaults.setProperty(PERSISTENT_MODE_PROPNAME, "true"); defaults.setProperty(TX_BATCH_SIZE_PROPNAME, "10"); defaults.setProperty(RATE_PROPNAME, "20"); - defaults.setProperty(DURABLE_DESTS_PROPNAME, "true"); defaults.setProperty(NUM_MESSAGES_TO_ACTION_PROPNAME, NUM_MESSAGES_TO_ACTION_DEFAULT); } diff --git a/java/perftests/src/main/java/org/apache/qpid/ping/PingLatencyTestPerf.java b/java/perftests/src/main/java/org/apache/qpid/ping/PingLatencyTestPerf.java index 55414664da..f05d13e4e2 100644 --- a/java/perftests/src/main/java/org/apache/qpid/ping/PingLatencyTestPerf.java +++ b/java/perftests/src/main/java/org/apache/qpid/ping/PingLatencyTestPerf.java @@ -25,9 +25,6 @@ import junit.framework.TestSuite; import org.apache.log4j.Logger; -import org.apache.qpid.client.AMQSession; -import org.apache.qpid.client.message.AMQMessage; -import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.requestreply.PingPongProducer; import uk.co.thebadgerset.junit.extensions.TimingController; @@ -36,7 +33,6 @@ import uk.co.thebadgerset.junit.extensions.util.ParsedProperties; import javax.jms.JMSException; import javax.jms.Message; -import javax.jms.ObjectMessage; import java.util.Collections; import java.util.HashMap; @@ -246,9 +242,7 @@ public class PingLatencyTestPerf extends PingTestPerf implements TimingControlle public BatchedResultsListener(int batchSize) { _batchSize = batchSize; - _strictAMQP = - Boolean.parseBoolean(System.getProperties().getProperty(AMQSession.STRICT_AMQP, - AMQSession.STRICT_AMQP_DEFAULT)); + _strictAMQP = false; } /** diff --git a/java/perftests/src/main/java/org/apache/qpid/ping/PingSendOnlyClient.java b/java/perftests/src/main/java/org/apache/qpid/ping/PingSendOnlyClient.java index 2879f0c322..150f7c0d52 100644 --- a/java/perftests/src/main/java/org/apache/qpid/ping/PingSendOnlyClient.java +++ b/java/perftests/src/main/java/org/apache/qpid/ping/PingSendOnlyClient.java @@ -26,7 +26,6 @@ import java.util.Properties; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; -import javax.jms.ObjectMessage; import org.apache.log4j.Logger; diff --git a/java/perftests/src/main/java/org/apache/qpid/requestreply/InitialContextHelper.java b/java/perftests/src/main/java/org/apache/qpid/requestreply/InitialContextHelper.java new file mode 100644 index 0000000000..067aad4095 --- /dev/null +++ b/java/perftests/src/main/java/org/apache/qpid/requestreply/InitialContextHelper.java @@ -0,0 +1,48 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.requestreply; + +import javax.naming.Context; +import javax.naming.InitialContext; +import javax.naming.NamingException; +import java.util.Properties; +import java.io.InputStream; +import java.io.IOException; + +/** + * + * + */ +public class InitialContextHelper +{ + + public static Context getInitialContext(String propertyFile) throws IOException, NamingException + { + if ((propertyFile == null) || (propertyFile.length() == 0)) + { + propertyFile = "/perftests.properties"; + } + + Properties fileProperties = new Properties(); + ClassLoader cl = Thread.currentThread().getContextClassLoader(); + // NB: Need to change path to reflect package if moving classes around ! + InputStream is = cl.getResourceAsStream(propertyFile); + fileProperties.load(is); + return new InitialContext(fileProperties); + } +} diff --git a/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongBouncer.java b/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongBouncer.java deleted file mode 100644 index 82b36bf233..0000000000 --- a/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongBouncer.java +++ /dev/null @@ -1,453 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.requestreply; - -import java.io.IOException; -import java.net.InetAddress; -import java.text.SimpleDateFormat; -import java.util.Date; - -import javax.jms.*; - -import org.apache.log4j.Logger; - -import org.apache.qpid.client.AMQConnection; -import org.apache.qpid.client.AMQQueue; -import org.apache.qpid.client.AMQTopic; -import org.apache.qpid.jms.ConnectionListener; -import org.apache.qpid.jms.Session; -import org.apache.qpid.topic.Config; -import org.apache.qpid.exchange.ExchangeDefaults; - -/** - * PingPongBouncer is a message listener the bounces back messages to their reply to destination. This is used to return - * ping messages generated by {@link org.apache.qpid.requestreply.PingPongProducer} but could be used for other purposes - * too. - * - *

The correlation id from the received message is extracted, and placed into the reply as the correlation id. Messages - * are bounced back to their reply-to destination. The original sender of the message has the option to use either a unique - * temporary queue or the correlation id to correlate the original message to the reply. - * - *

There is a verbose mode flag which causes information about each ping to be output to the console - * (info level logging, so usually console). This can be helpfull to check the bounce backs are happening but should - * be disabled for real timing tests as writing to the console will slow things down. - * - *

- *
CRC Card
Responsibilities Collaborations - *
Bounce back messages to their reply to destination. - *
Provide command line invocation to start the bounce back on a configurable broker url. - *
- * - * @todo Replace the command line parsing with a neater tool. - * - * @todo Make verbose accept a number of messages, only prints to console every X messages. - */ -public class PingPongBouncer implements MessageListener -{ - private static final Logger _logger = Logger.getLogger(PingPongBouncer.class); - - /** The default prefetch size for the message consumer. */ - private static final int PREFETCH = 1; - - /** The default no local flag for the message consumer. */ - private static final boolean NO_LOCAL = true; - - private static final String DEFAULT_DESTINATION_NAME = "ping"; - - /** The default exclusive flag for the message consumer. */ - private static final boolean EXCLUSIVE = false; - - /** A convenient formatter to use when time stamping output. */ - protected static final SimpleDateFormat timestampFormatter = new SimpleDateFormat("hh:mm:ss:SS"); - - /** Used to indicate that the reply generator should log timing info to the console (logger info level). */ - private boolean _verbose = false; - - /** Determines whether this bounce back client bounces back messages persistently. */ - private boolean _persistent = false; - - private Destination _consumerDestination; - - /** Keeps track of the response destination of the previous message for the last reply to producer cache. */ - private Destination _lastResponseDest; - - /** The producer for sending replies with. */ - private MessageProducer _replyProducer; - - /** The consumer controlSession. */ - private Session _consumerSession; - - /** The producer controlSession. */ - private Session _producerSession; - - /** Holds the connection to the broker. */ - private AMQConnection _connection; - - /** Flag used to indicate if this is a point to point or pub/sub ping client. */ - private boolean _isPubSub = false; - - /** - * This flag is used to indicate that the user should be prompted to kill a broker, in order to test - * failover, immediately before committing a transaction. - */ - protected boolean _failBeforeCommit = false; - - /** - * This flag is used to indicate that the user should be prompted to a kill a broker, in order to test - * failover, immediate after committing a transaction. - */ - protected boolean _failAfterCommit = false; - - /** - * Creates a PingPongBouncer on the specified producer and consumer sessions. - * - * @param brokerDetails The addresses of the brokers to connect to. - * @param username The broker username. - * @param password The broker password. - * @param virtualpath The virtual host name within the broker. - * @param destinationName The name of the queue to receive pings on - * (or root of the queue name where many queues are generated). - * @param persistent A flag to indicate that persistent message should be used. - * @param transacted A flag to indicate that pings should be sent within transactions. - * @param selector A message selector to filter received pings with. - * @param verbose A flag to indicate that message timings should be sent to the console. - * - * @throws Exception All underlying exceptions allowed to fall through. This is only test code... - */ - public PingPongBouncer(String brokerDetails, String username, String password, String virtualpath, - String destinationName, boolean persistent, boolean transacted, String selector, boolean verbose, - boolean pubsub) throws Exception - { - // Create a client id to uniquely identify this client. - InetAddress address = InetAddress.getLocalHost(); - String clientId = address.getHostName() + System.currentTimeMillis(); - _verbose = verbose; - _persistent = persistent; - setPubSub(pubsub); - // Connect to the broker. - setConnection(new AMQConnection(brokerDetails, username, password, clientId, virtualpath)); - _logger.info("Connected with URL:" + getConnection().toURL()); - - // Set up the failover notifier. - getConnection().setConnectionListener(new FailoverNotifier()); - - // Create a controlSession to listen for messages on and one to send replies on, transactional depending on the - // command line option. - _consumerSession = (Session) getConnection().createSession(transacted, Session.AUTO_ACKNOWLEDGE); - _producerSession = (Session) getConnection().createSession(transacted, Session.AUTO_ACKNOWLEDGE); - - // Create the queue to listen for message on. - createConsumerDestination(destinationName); - MessageConsumer consumer = - _consumerSession.createConsumer(_consumerDestination, PREFETCH, NO_LOCAL, EXCLUSIVE, selector); - - // Create a producer for the replies, without a default destination. - _replyProducer = _producerSession.createProducer(null); - _replyProducer.setDisableMessageTimestamp(true); - _replyProducer.setDeliveryMode(_persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT); - - // Set this up to listen for messages on the queue. - consumer.setMessageListener(this); - } - - /** - * Starts a stand alone ping-pong client running in verbose mode. - * - * @param args - */ - public static void main(String[] args) throws Exception - { - System.out.println("Starting..."); - - // Display help on the command line. - if (args.length == 0) - { - _logger.info("Running test with default values..."); - //usage(); - //System.exit(0); - } - - // Extract all command line parameters. - Config config = new Config(); - config.setOptions(args); - String brokerDetails = config.getHost() + ":" + config.getPort(); - String virtualpath = "test"; - String destinationName = config.getDestination(); - if (destinationName == null) - { - destinationName = DEFAULT_DESTINATION_NAME; - } - - String selector = config.getSelector(); - boolean transacted = config.isTransacted(); - boolean persistent = config.usePersistentMessages(); - boolean pubsub = config.isPubSub(); - boolean verbose = true; - - //String selector = null; - - // Instantiate the ping pong client with the command line options and start it running. - PingPongBouncer pingBouncer = - new PingPongBouncer(brokerDetails, "guest", "guest", virtualpath, destinationName, persistent, transacted, - selector, verbose, pubsub); - pingBouncer.getConnection().start(); - - System.out.println("Waiting..."); - } - - private static void usage() - { - System.err.println("Usage: PingPongBouncer \n" + "-host : broker host\n" + "-port : broker port\n" - + "-destinationname : queue/topic name\n" + "-transacted : (true/false). Default is false\n" - + "-persistent : (true/false). Default is false\n" - + "-pubsub : (true/false). Default is false\n" + "-selector : selector string\n"); - } - - /** - * This is a callback method that is notified of all messages for which this has been registered as a message - * listener on a message consumer. It sends a reply (pong) to all messages it receieves on the reply to - * destination of the message. - * - * @param message The message that triggered this callback. - */ - public void onMessage(Message message) - { - try - { - String messageCorrelationId = message.getJMSCorrelationID(); - if (_verbose) - { - _logger.info(timestampFormatter.format(new Date()) + ": Got ping with correlation id, " - + messageCorrelationId); - } - - // Get the reply to destination from the message and check it is set. - Destination responseDest = message.getJMSReplyTo(); - - if (responseDest == null) - { - _logger.debug("Cannot send reply because reply-to destination is null."); - - return; - } - - // Spew out some timing information if verbose mode is on. - if (_verbose) - { - Long timestamp = message.getLongProperty("timestamp"); - - if (timestamp != null) - { - long diff = System.currentTimeMillis() - timestamp; - _logger.info("Time to bounce point: " + diff); - } - } - - // Correlate the reply to the original. - message.setJMSCorrelationID(messageCorrelationId); - - // Send the receieved message as the pong reply. - _replyProducer.send(responseDest, message); - - if (_verbose) - { - _logger.info(timestampFormatter.format(new Date()) + ": Sent reply with correlation id, " - + messageCorrelationId); - } - - // Commit the transaction if running in transactional mode. - commitTx(_producerSession); - } - catch (JMSException e) - { - _logger.debug("There was a JMSException: " + e.getMessage(), e); - } - } - - /** - * Gets the underlying connection that this ping client is running on. - * - * @return The underlying connection that this ping client is running on. - */ - public AMQConnection getConnection() - { - return _connection; - } - - /** - * Sets the connection that this ping client is using. - * - * @param connection The ping connection. - */ - public void setConnection(AMQConnection connection) - { - this._connection = connection; - } - - /** - * Sets or clears the pub/sub flag to indiciate whether this client is pinging a queue or a topic. - * - * @param pubsub true if this client is pinging a topic, false if it is pinging a queue. - */ - public void setPubSub(boolean pubsub) - { - _isPubSub = pubsub; - } - - /** - * Checks whether this client is a p2p or pub/sub ping client. - * - * @return true if this client is pinging a topic, false if it is pinging a queue. - */ - public boolean isPubSub() - { - return _isPubSub; - } - - /** - * Convenience method to commit the transaction on the specified controlSession. If the controlSession to commit on is not - * a transactional controlSession, this method does nothing. - * - *

If the {@link #_failBeforeCommit} flag is set, this will prompt the user to kill the broker before the - * commit is applied. If the {@link #_failAfterCommit} flag is set, this will prompt the user to kill the broker - * after the commit is applied. - * - * @throws javax.jms.JMSException If the commit fails and then the rollback fails. - */ - protected void commitTx(Session session) throws JMSException - { - if (session.getTransacted()) - { - try - { - if (_failBeforeCommit) - { - _logger.trace("Failing Before Commit"); - doFailover(); - } - - session.commit(); - - if (_failAfterCommit) - { - _logger.trace("Failing After Commit"); - doFailover(); - } - - _logger.trace("Session Commited."); - } - catch (JMSException e) - { - _logger.trace("JMSException on commit:" + e.getMessage(), e); - - try - { - session.rollback(); - _logger.debug("Message rolled back."); - } - catch (JMSException jmse) - { - _logger.trace("JMSE on rollback:" + jmse.getMessage(), jmse); - - // Both commit and rollback failed. Throw the rollback exception. - throw jmse; - } - } - } - } - - /** - * Prompts the user to terminate the named broker, in order to test failover functionality. This method will block - * until the user supplied some input on the terminal. - * - * @param broker The name of the broker to terminate. - */ - protected void doFailover(String broker) - { - System.out.println("Kill Broker " + broker + " now."); - try - { - System.in.read(); - } - catch (IOException e) - { } - - System.out.println("Continuing."); - } - - /** - * Prompts the user to terminate the broker, in order to test failover functionality. This method will block - * until the user supplied some input on the terminal. - */ - protected void doFailover() - { - System.out.println("Kill Broker now."); - try - { - System.in.read(); - } - catch (IOException e) - { } - - System.out.println("Continuing."); - - } - - private void createConsumerDestination(String name) - { - if (isPubSub()) - { - _consumerDestination = new AMQTopic(ExchangeDefaults.TOPIC_EXCHANGE_NAME, name); - } - else - { - _consumerDestination = new AMQQueue(ExchangeDefaults.DIRECT_EXCHANGE_NAME, name); - } - } - - /** - * A connection listener that logs out any failover complete events. Could do more interesting things with this - * at some point... - */ - public static class FailoverNotifier implements ConnectionListener - { - public void bytesSent(long count) - { } - - public void bytesReceived(long count) - { } - - public boolean preFailover(boolean redirect) - { - return true; - } - - public boolean preResubscribe() - { - return true; - } - - public void failoverComplete() - { - _logger.info("App got failover complete callback."); - } - } -} diff --git a/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java b/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java index bd34fd8f20..638f5ae8c0 100644 --- a/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java +++ b/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java @@ -21,17 +21,8 @@ package org.apache.qpid.requestreply; import org.apache.log4j.Logger; -import org.apache.log4j.NDC; -import org.apache.qpid.AMQException; -import org.apache.qpid.client.*; -import org.apache.qpid.client.message.AMQMessage; import org.apache.qpid.client.message.TestMessageFactory; -import org.apache.qpid.exchange.ExchangeDefaults; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.jms.MessageProducer; -import org.apache.qpid.jms.Session; -import org.apache.qpid.url.URLSyntaxException; import org.apache.qpid.util.CommandLineParser; import uk.co.thebadgerset.junit.extensions.BatchedThrottle; @@ -39,6 +30,7 @@ import uk.co.thebadgerset.junit.extensions.Throttle; import uk.co.thebadgerset.junit.extensions.util.ParsedProperties; import javax.jms.*; +import javax.naming.Context; import java.io.IOException; import java.net.InetAddress; @@ -56,16 +48,16 @@ import java.util.concurrent.atomic.AtomicLong; * to its own messages and does not send replies (see {@link org.apache.qpid.ping.PingClient}). The intention of ping * pong producer is that it is a swiss-army knife test client that makes almost every aspect of its behaviour * configurable. - * + *

*

The pings are sent with a reply-to field set to a single temporary queue, which is the same for all pings. This * means that this class has to do some work to correlate pings with pongs; it expectes the original message correlation * id in the ping to be bounced back in the reply correlation id. - * + *

*

This ping tool accepts a vast number of configuration options, all of which are passed in to the constructor. It * can ping topics or queues; ping multiple destinations; do persistent pings; send messages of any size; do pings within * transactions; control the number of pings to send in each transaction; limit its sending rate; and perform failover * testing. A complete list of accepted parameters, default values and comments on their usage is provided here: - * + *

*

*
Parameters
Parameter Default Comments *
messageSize 0 Message size in bytes. Not including any headers. @@ -92,27 +84,27 @@ import java.util.concurrent.atomic.AtomicLong; *
uniqueDests true Whether each receivers only listens to one ping destination or all. *
durableDests false Whether or not durable destinations are used. *
ackMode AUTO_ACK The message acknowledgement mode. Possible values are: - * 0 - SESSION_TRANSACTED - * 1 - AUTO_ACKNOWLEDGE - * 2 - CLIENT_ACKNOWLEDGE - * 3 - DUPS_OK_ACKNOWLEDGE - * 257 - NO_ACKNOWLEDGE - * 258 - PRE_ACKNOWLEDGE + * 0 - SESSION_TRANSACTED + * 1 - AUTO_ACKNOWLEDGE + * 2 - CLIENT_ACKNOWLEDGE + * 3 - DUPS_OK_ACKNOWLEDGE + * 257 - NO_ACKNOWLEDGE + * 258 - PRE_ACKNOWLEDGE *
consTransacted false Whether or not consumers use transactions. Defaults to the same value - * as the 'transacted' option if not seperately defined. + * as the 'transacted' option if not seperately defined. *
consAckMode AUTO_ACK The message acknowledgement mode for consumers. Defaults to the same - * value as 'ackMode' if not seperately defined. + * value as 'ackMode' if not seperately defined. *
maxPending 0 The maximum size in bytes, of messages sent but not yet received. - * Limits the volume of messages currently buffered on the client - * or broker. Can help scale test clients by limiting amount of buffered - * data to avoid out of memory errors. + * Limits the volume of messages currently buffered on the client + * or broker. Can help scale test clients by limiting amount of buffered + * data to avoid out of memory errors. *
- * + *

*

This implements the Runnable interface with a run method that implements an infinite ping loop. The ping loop * does all its work through helper methods, so that code wishing to run a ping-pong cycle is not forced to do so by * starting a new thread. The command line invocation does take advantage of this ping loop. A shutdown hook is also * registered to terminate the ping-pong loop cleanly. - * + *

*

*
CRC Card
Responsibilities Collaborations *
Provide a ping and wait for all responses cycle. @@ -120,168 +112,249 @@ import java.util.concurrent.atomic.AtomicLong; *
* * @todo Make the message listener a static for all replies to be sent to? It won't be any more of a bottle neck than having - * one per PingPongProducer, as will synchronize on message correlation id, allowing threads to process messages - * concurrently for different ids. Needs to be static so that when using a chained message listener and shared - * destinations between multiple PPPs, it gets notified about all replies, not just those that happen to be picked up - * by the PPP that it is atteched to. - * + * one per PingPongProducer, as will synchronize on message correlation id, allowing threads to process messages + * concurrently for different ids. Needs to be static so that when using a chained message listener and shared + * destinations between multiple PPPs, it gets notified about all replies, not just those that happen to be picked up + * by the PPP that it is atteched to. * @todo Use read/write lock in the onmessage, not for reading writing but to make use of a shared and exlcusive lock pair. - * Obtain read lock on all messages, before decrementing the message count. At the end of the on message method add a - * block that obtains the write lock for the very last message, releases any waiting producer. Means that the last - * message waits until all other messages have been handled before releasing producers but allows messages to be - * processed concurrently, unlike the current synchronized block. - * + * Obtain read lock on all messages, before decrementing the message count. At the end of the on message method add a + * block that obtains the write lock for the very last message, releases any waiting producer. Means that the last + * message waits until all other messages have been handled before releasing producers but allows messages to be + * processed concurrently, unlike the current synchronized block. * @todo Get rid of pauses between batches, it will impact the timing statistics, and generate meanigless timings. - * Instead make mina use a bounded blocking buffer, or other form of back pressure, to stop data being written - * faster than it can be sent. + * Instead make mina use a bounded blocking buffer, or other form of back pressure, to stop data being written + * faster than it can be sent. */ public class PingPongProducer implements Runnable /*, MessageListener*/, ExceptionListener { - /** Used for debugging. */ - private static final Logger log = Logger.getLogger(PingPongProducer.class); + /** + * Used for debugging. + */ + private static final Logger _log = Logger.getLogger(PingPongProducer.class); - /** Holds the name of the property to get the test message size from. */ + /** + * Helds the factory name + */ + public static final String FACTORY_NAME_PROPNAME = "factoryName"; + public static final String FACTORY_NAME_DEAFULT = "local"; + + /** + * Helds the file properties name + */ + public static final String FILE_PROPERTIES_PROPNAME = "properties"; + public static final String FILE_PROPERTIES_DEAFULT = "/perftests.properties"; + + /** + * Holds the name of the property to get the test message size from. + */ public static final String MESSAGE_SIZE_PROPNAME = "messageSize"; - /** Used to set up a default message size. */ + /** + * Used to set up a default message size. + */ public static final int MESSAGE_SIZE_DEAFULT = 0; - /** Holds the name of the property to get the ping queue name from. */ + /** + * Holds the name of the property to get the ping queue name from. + */ public static final String PING_QUEUE_NAME_PROPNAME = "destinationName"; - /** Holds the name of the default destination to send pings on. */ + /** + * Holds the name of the default destination to send pings on. + */ public static final String PING_QUEUE_NAME_DEFAULT = "ping"; - /** Holds the name of the property to get the test delivery mode from. */ + /** + * Holds the name of the property to get the test delivery mode from. + */ public static final String PERSISTENT_MODE_PROPNAME = "persistent"; - /** Holds the message delivery mode to use for the test. */ + /** + * Holds the message delivery mode to use for the test. + */ public static final boolean PERSISTENT_MODE_DEFAULT = false; - /** Holds the name of the property to get the test transactional mode from. */ + /** + * Holds the name of the property to get the test transactional mode from. + */ public static final String TRANSACTED_PROPNAME = "transacted"; - /** Holds the transactional mode to use for the test. */ + /** + * Holds the transactional mode to use for the test. + */ public static final boolean TRANSACTED_DEFAULT = false; public static final String CONSUMER_TRANSACTED_PROPNAME = "consTransacted"; public static final boolean CONSUMER_TRANSACTED_DEFAULT = false; - /** Holds the name of the property to get the test broker url from. */ - public static final String BROKER_PROPNAME = "broker"; - - /** Holds the default broker url for the test. */ - public static final String BROKER_DEFAULT = "tcp://localhost:5672"; - - /** Holds the name of the property to get the test broker virtual path. */ - public static final String VIRTUAL_HOST_PROPNAME = "virtualHost"; - - /** Holds the default virtual path for the test. */ - public static final String VIRTUAL_HOST_DEFAULT = ""; - - /** Holds the name of the property to get the message rate from. */ + /** + * Holds the name of the property to get the message rate from. + */ public static final String RATE_PROPNAME = "rate"; - /** Defines the default rate (in pings per second) to send pings at. 0 means as fast as possible, no restriction. */ + /** + * Defines the default rate (in pings per second) to send pings at. 0 means as fast as possible, no restriction. + */ public static final int RATE_DEFAULT = 0; - /** Holds the name of the property to get the verbose mode proeprty from. */ + /** + * Holds the name of the property to get the verbose mode proeprty from. + */ public static final String VERBOSE_PROPNAME = "verbose"; - /** Holds the default verbose mode. */ + /** + * Holds the default verbose mode. + */ public static final boolean VERBOSE_DEFAULT = false; - /** Holds the name of the property to get the p2p or pub/sub messaging mode from. */ + /** + * Holds the name of the property to get the p2p or pub/sub messaging mode from. + */ public static final String PUBSUB_PROPNAME = "pubsub"; - /** Holds the pub/sub mode default, true means ping a topic, false means ping a queue. */ + /** + * Holds the pub/sub mode default, true means ping a topic, false means ping a queue. + */ public static final boolean PUBSUB_DEFAULT = false; - /** Holds the name of the property to get the fail after commit flag from. */ + /** + * Holds the name of the property to get the fail after commit flag from. + */ public static final String FAIL_AFTER_COMMIT_PROPNAME = "failAfterCommit"; - /** Holds the default failover after commit test flag. */ + /** + * Holds the default failover after commit test flag. + */ public static final boolean FAIL_AFTER_COMMIT_DEFAULT = false; - /** Holds the name of the proeprty to get the fail before commit flag from. */ + /** + * Holds the name of the proeprty to get the fail before commit flag from. + */ public static final String FAIL_BEFORE_COMMIT_PROPNAME = "failBeforeCommit"; - /** Holds the default failover before commit test flag. */ + /** + * Holds the default failover before commit test flag. + */ public static final boolean FAIL_BEFORE_COMMIT_DEFAULT = false; - /** Holds the name of the proeprty to get the fail after send flag from. */ + /** + * Holds the name of the proeprty to get the fail after send flag from. + */ public static final String FAIL_AFTER_SEND_PROPNAME = "failAfterSend"; - /** Holds the default failover after send test flag. */ + /** + * Holds the default failover after send test flag. + */ public static final boolean FAIL_AFTER_SEND_DEFAULT = false; - /** Holds the name of the property to get the fail before send flag from. */ + /** + * Holds the name of the property to get the fail before send flag from. + */ public static final String FAIL_BEFORE_SEND_PROPNAME = "failBeforeSend"; - /** Holds the default failover before send test flag. */ + /** + * Holds the default failover before send test flag. + */ public static final boolean FAIL_BEFORE_SEND_DEFAULT = false; - /** Holds the name of the property to get the fail once flag from. */ + /** + * Holds the name of the property to get the fail once flag from. + */ public static final String FAIL_ONCE_PROPNAME = "failOnce"; - /** The default failover once flag, true means only do one failover, false means failover on every commit cycle. */ + /** + * The default failover once flag, true means only do one failover, false means failover on every commit cycle. + */ public static final boolean FAIL_ONCE_DEFAULT = true; - /** Holds the name of the property to get the broker access username from. */ + /** + * Holds the name of the property to get the broker access username from. + */ public static final String USERNAME_PROPNAME = "username"; - /** Holds the default broker log on username. */ + /** + * Holds the default broker _log on username. + */ public static final String USERNAME_DEFAULT = "guest"; - /** Holds the name of the property to get the broker access password from. */ + /** + * Holds the name of the property to get the broker access password from. + */ public static final String PASSWORD_PROPNAME = "password"; - /** Holds the default broker log on password. */ + /** + * Holds the default broker _log on password. + */ public static final String PASSWORD_DEFAULT = "guest"; - /** Holds the name of the proeprty to get the. */ + /** + * Holds the name of the proeprty to get the. + */ public static final String SELECTOR_PROPNAME = "selector"; - /** Holds the default message selector. */ + /** + * Holds the default message selector. + */ public static final String SELECTOR_DEFAULT = ""; - /** Holds the name of the property to get the destination count from. */ + /** + * Holds the name of the property to get the destination count from. + */ public static final String DESTINATION_COUNT_PROPNAME = "destinationCount"; - /** Defines the default number of destinations to ping. */ + /** + * Defines the default number of destinations to ping. + */ public static final int DESTINATION_COUNT_DEFAULT = 1; - /** Holds the name of the property to get the number of consumers per destination from. */ + /** + * Holds the name of the property to get the number of consumers per destination from. + */ public static final String NUM_CONSUMERS_PROPNAME = "numConsumers"; - /** Defines the default number consumers per destination. */ + /** + * Defines the default number consumers per destination. + */ public static final int NUM_CONSUMERS_DEFAULT = 1; - /** Holds the name of the property to get the waiting timeout for response messages. */ + /** + * Holds the name of the property to get the waiting timeout for response messages. + */ public static final String TIMEOUT_PROPNAME = "timeout"; - /** Default time to wait before assuming that a ping has timed out. */ + /** + * Default time to wait before assuming that a ping has timed out. + */ public static final long TIMEOUT_DEFAULT = 30000; - /** Holds the name of the property to get the commit batch size from. */ + /** + * Holds the name of the property to get the commit batch size from. + */ public static final String TX_BATCH_SIZE_PROPNAME = "commitBatchSize"; - /** Defines the default number of pings to send in each transaction when running transactionally. */ + /** + * Defines the default number of pings to send in each transaction when running transactionally. + */ public static final int TX_BATCH_SIZE_DEFAULT = 1; - /** Holds the name of the property to get the unique destinations flag from. */ + /** + * Holds the name of the property to get the unique destinations flag from. + */ public static final String UNIQUE_DESTS_PROPNAME = "uniqueDests"; - /** Defines the default value for the unique destinations property. */ + /** + * Defines the default value for the unique destinations property. + */ public static final boolean UNIQUE_DESTS_DEFAULT = true; - public static final String DURABLE_DESTS_PROPNAME = "durableDests"; - public static final boolean DURABLE_DESTS_DEFAULT = false; - - /** Holds the name of the proeprty to get the message acknowledgement mode from. */ + /** + * Holds the name of the proeprty to get the message acknowledgement mode from. + */ public static final String ACK_MODE_PROPNAME = "ackMode"; - /** Defines the default message acknowledgement mode. */ + /** + * Defines the default message acknowledgement mode. + */ public static final int ACK_MODE_DEFAULT = Session.AUTO_ACKNOWLEDGE; public static final String CONSUMER_ACK_MODE_PROPNAME = "consAckMode"; @@ -290,27 +363,37 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti public static final String MAX_PENDING_PROPNAME = "maxPending"; public static final int MAX_PENDING_DEFAULT = 0; - /** Defines the default prefetch size to use when consuming messages. */ + /** + * Defines the default prefetch size to use when consuming messages. + */ public static final int PREFETCH_DEFAULT = 100; - /** Defines the default value of the no local flag to use when consuming messages. */ + /** + * Defines the default value of the no local flag to use when consuming messages. + */ public static final boolean NO_LOCAL_DEFAULT = false; - /** Defines the default value of the exclusive flag to use when consuming messages. */ + /** + * Defines the default value of the exclusive flag to use when consuming messages. + */ public static final boolean EXCLUSIVE_DEFAULT = false; - /** Holds the name of the property to store nanosecond timestamps in ping messages with. */ + /** + * Holds the name of the property to store nanosecond timestamps in ping messages with. + */ public static final String MESSAGE_TIMESTAMP_PROPNAME = "timestamp"; - /** Holds the default configuration properties. */ + /** + * Holds the default configuration properties. + */ public static ParsedProperties defaults = new ParsedProperties(); static { - defaults.setPropertyIfNull(BROKER_PROPNAME, BROKER_DEFAULT); + defaults.setPropertyIfNull(FILE_PROPERTIES_PROPNAME, FILE_PROPERTIES_DEAFULT); + defaults.setPropertyIfNull(FACTORY_NAME_PROPNAME, FACTORY_NAME_DEAFULT); defaults.setPropertyIfNull(USERNAME_PROPNAME, USERNAME_DEFAULT); defaults.setPropertyIfNull(PASSWORD_PROPNAME, PASSWORD_DEFAULT); - defaults.setPropertyIfNull(VIRTUAL_HOST_PROPNAME, VIRTUAL_HOST_DEFAULT); defaults.setPropertyIfNull(PING_QUEUE_NAME_PROPNAME, PING_QUEUE_NAME_DEFAULT); defaults.setPropertyIfNull(SELECTOR_PROPNAME, SELECTOR_DEFAULT); defaults.setPropertyIfNull(TRANSACTED_PROPNAME, TRANSACTED_DEFAULT); @@ -322,7 +405,6 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti defaults.setPropertyIfNull(VERBOSE_PROPNAME, VERBOSE_DEFAULT); defaults.setPropertyIfNull(PUBSUB_PROPNAME, PUBSUB_DEFAULT); defaults.setPropertyIfNull(UNIQUE_DESTS_PROPNAME, UNIQUE_DESTS_DEFAULT); - defaults.setPropertyIfNull(DURABLE_DESTS_PROPNAME, DURABLE_DESTS_DEFAULT); defaults.setPropertyIfNull(FAIL_BEFORE_COMMIT_PROPNAME, FAIL_BEFORE_COMMIT_DEFAULT); defaults.setPropertyIfNull(FAIL_AFTER_COMMIT_PROPNAME, FAIL_AFTER_COMMIT_DEFAULT); defaults.setPropertyIfNull(FAIL_BEFORE_SEND_PROPNAME, FAIL_BEFORE_SEND_DEFAULT); @@ -336,6 +418,8 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti defaults.setPropertyIfNull(MAX_PENDING_PROPNAME, MAX_PENDING_DEFAULT); } + protected String _factoryName; + protected String _fileProperties; protected String _brokerDetails; protected String _username; protected String _password; @@ -345,54 +429,81 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti protected boolean _transacted; protected boolean _consTransacted; - /** Determines whether this producer sends persistent messages. */ + /** + * Determines whether this producer sends persistent messages. + */ protected boolean _persistent; - /** Holds the acknowledgement mode used for sending and receiving messages. */ + /** + * Holds the acknowledgement mode used for sending and receiving messages. + */ protected int _ackMode; protected int _consAckMode; - /** Determines what size of messages this producer sends. */ + /** + * Determines what size of messages this producer sends. + */ protected int _messageSize; - /** Used to indicate that the ping loop should print out whenever it pings. */ + /** + * Used to indicate that the ping loop should print out whenever it pings. + */ protected boolean _verbose; - /** Flag used to indicate if this is a point to point or pub/sub ping client. */ + /** + * Flag used to indicate if this is a point to point or pub/sub ping client. + */ protected boolean _isPubSub; - /** Flag used to indicate if the destinations should be unique client. */ + /** + * Flag used to indicate if the destinations should be unique client. + */ protected boolean _isUnique; - /** Flag used to indicate that durable destination should be used. */ - protected boolean _isDurable; - - /** Flag used to indicate that the user should be prompted to terminate a broker, to test failover before a commit. */ + /** + * Flag used to indicate that the user should be prompted to terminate a broker, to test failover before a commit. + */ protected boolean _failBeforeCommit; - /** Flag used to indicate that the user should be prompted to terminate a broker, to test failover after a commit. */ + /** + * Flag used to indicate that the user should be prompted to terminate a broker, to test failover after a commit. + */ protected boolean _failAfterCommit; - /** Flag used to indicate that the user should be prompted to terminate a broker, to test failover before a send. */ + /** + * Flag used to indicate that the user should be prompted to terminate a broker, to test failover before a send. + */ protected boolean _failBeforeSend; - /** Flag used to indicate that the user should be prompted to terminate a broker, to test failover after a send. */ + /** + * Flag used to indicate that the user should be prompted to terminate a broker, to test failover after a send. + */ protected boolean _failAfterSend; - /** Flag used to indicate that failover prompting should only be done on the first commit, not on every commit. */ + /** + * Flag used to indicate that failover prompting should only be done on the first commit, not on every commit. + */ protected boolean _failOnce; - /** Holds the number of sends that should be performed in every transaction when using transactions. */ + /** + * Holds the number of sends that should be performed in every transaction when using transactions. + */ protected int _txBatchSize; - /** Holds the number of destinations to ping. */ + /** + * Holds the number of destinations to ping. + */ protected int _noOfDestinations; - /** Holds the number of consumers per destination. */ + /** + * Holds the number of consumers per destination. + */ protected int _noOfConsumers; - /** Holds the maximum send rate in herz. */ + /** + * Holds the maximum send rate in herz. + */ protected int _rate; /** @@ -407,10 +518,14 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti */ protected Object _sendPauseMonitor = new Object(); - /** Keeps a count of the number of message currently sent but not received. */ + /** + * Keeps a count of the number of message currently sent but not received. + */ protected AtomicInteger _unreceived = new AtomicInteger(0); - /** A source for providing sequential unique correlation ids. These will be unique within the same JVM. */ + /** + * A source for providing sequential unique correlation ids. These will be unique within the same JVM. + */ private static AtomicLong _correlationIdGenerator = new AtomicLong(0L); /** @@ -418,33 +533,51 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti * ping producers on the same JVM. */ private static Map perCorrelationIds = - Collections.synchronizedMap(new HashMap()); + Collections.synchronizedMap(new HashMap()); - /** A convenient formatter to use when time stamping output. */ + /** + * A convenient formatter to use when time stamping output. + */ protected static final DateFormat timestampFormatter = new SimpleDateFormat("hh:mm:ss:SS"); - /** Holds the connection to the broker. */ + /** + * Holds the connection to the broker. + */ protected Connection _connection; - /** Holds the consumer connections. */ + /** + * Holds the consumer connections. + */ protected Connection[] _consumerConnection; - /** Holds the controlSession on which ping replies are received. */ + /** + * Holds the controlSession on which ping replies are received. + */ protected Session[] _consumerSession; - /** Holds the producer controlSession, needed to create ping messages. */ - protected Session _producerSession; + /** + * Holds the producer controlSession, needed to create ping messages. + */ + protected Session _producerSession = (Session) _connection.createSession(_transacted, _ackMode); - /** Holds the destination where the response messages will arrive. */ + /** + * Holds the destination where the response messages will arrive. + */ protected Destination _replyDestination; - /** Holds the set of destinations that this ping producer pings. */ + /** + * Holds the set of destinations that this ping producer pings. + */ protected List _pingDestinations; - /** Used to restrict the sending rate to a specified limit. */ + /** + * Used to restrict the sending rate to a specified limit. + */ protected Throttle _rateLimiter; - /** Holds a message listener that this message listener chains all its messages to. */ + /** + * Holds a message listener that this message listener chains all its messages to. + */ protected ChainedMessageListener _chainedMessageListener = null; /** @@ -459,20 +592,29 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti */ protected AtomicInteger _queueSharedID = new AtomicInteger(); - /** Used to tell the ping loop when to terminate, it only runs while this is true. */ + /** + * Used to tell the ping loop when to terminate, it only runs while this is true. + */ protected boolean _publish = true; - /** Holds the message producer to send the pings through. */ + /** + * Holds the message producer to send the pings through. + */ protected MessageProducer _producer; - /** Holds the message consumer to receive the ping replies through. */ + /** + * Holds the message consumer to receive the ping replies through. + */ protected MessageConsumer[] _consumer; - /** The prompt to display when asking the user to kill the broker for failover testing. */ + /** + * The prompt to display when asking the user to kill the broker for failover testing. + */ private static final String KILL_BROKER_PROMPT = "Kill broker now, then press Return."; - private String _clientID; - /** Keeps count of the total messages sent purely for debugging purposes. */ + /** + * Keeps count of the total messages sent purely for debugging purposes. + */ private static AtomicInteger numSent = new AtomicInteger(); /** @@ -480,23 +622,20 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti * for details. This constructor creates a connection to the broker and creates producer and consumer sessions on * it, to send and recieve its pings and replies on. * - * @param overrides Properties containing any desired overrides to the defaults. - * + * @param overrides Properties containing any desired overrides to the defaults. * @throws Exception Any exceptions are allowed to fall through. */ public PingPongProducer(Properties overrides) throws Exception { - // log.debug("public PingPongProducer(Properties overrides = " + overrides + "): called"); - + // _log.debug("public PingPongProducer(Properties overrides = " + overrides + "): called"); // Create a set of parsed properties from the defaults overriden by the passed in values. ParsedProperties properties = new ParsedProperties(defaults); properties.putAll(overrides); - // Extract the configuration properties to set the pinger up with. - _brokerDetails = properties.getProperty(BROKER_PROPNAME); + _factoryName = properties.getProperty(FACTORY_NAME_PROPNAME); + _fileProperties = properties.getProperty(FILE_PROPERTIES_PROPNAME); _username = properties.getProperty(USERNAME_PROPNAME); _password = properties.getProperty(PASSWORD_PROPNAME); - _virtualpath = properties.getProperty(VIRTUAL_HOST_PROPNAME); _destinationName = properties.getProperty(PING_QUEUE_NAME_PROPNAME); _selector = properties.getProperty(SELECTOR_PROPNAME); _transacted = properties.getPropertyAsBoolean(TRANSACTED_PROPNAME); @@ -515,26 +654,20 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti _rate = properties.getPropertyAsInteger(RATE_PROPNAME); _isPubSub = properties.getPropertyAsBoolean(PUBSUB_PROPNAME); _isUnique = properties.getPropertyAsBoolean(UNIQUE_DESTS_PROPNAME); - _isDurable = properties.getPropertyAsBoolean(DURABLE_DESTS_PROPNAME); _ackMode = properties.getPropertyAsInteger(ACK_MODE_PROPNAME); _consAckMode = properties.getPropertyAsInteger(CONSUMER_ACK_MODE_PROPNAME); _maxPendingSize = properties.getPropertyAsInteger(MAX_PENDING_PROPNAME); - // Check that one or more destinations were specified. if (_noOfDestinations < 1) { throw new IllegalArgumentException("There must be at least one destination."); } - // Set up a throttle to control the send rate, if a rate > 0 is specified. if (_rate > 0) { _rateLimiter = new BatchedThrottle(); _rateLimiter.setRate(_rate); } - - // Create the connection and message producers/consumers. - // establishConnection(true, true); } /** @@ -543,33 +676,32 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti * * @param producer Flag to indicate whether or not the producer should be set up. * @param consumer Flag to indicate whether or not the consumers should be set up. - * * @throws Exception Any exceptions are allowed to fall through. */ public void establishConnection(boolean producer, boolean consumer) throws Exception { - // log.debug("public void establishConnection(): called"); + // _log.debug("public void establishConnection(): called"); // Generate a unique identifying name for this client, based on it ip address and the current time. InetAddress address = InetAddress.getLocalHost(); - _clientID = address.getHostName() + System.currentTimeMillis(); + String _clientID = address.getHostName() + System.currentTimeMillis(); // Create a connection to the broker. createConnection(_clientID); // Create transactional or non-transactional sessions, based on the command line arguments. - _producerSession = (Session) _connection.createSession(_transacted, _ackMode); + _producerSession = _connection.createSession(_transacted, _ackMode); _consumerSession = new Session[_noOfConsumers]; for (int i = 0; i < _noOfConsumers; i++) { - _consumerSession[i] = (Session) _consumerConnection[i].createSession(_consTransacted, _consAckMode); + _consumerSession[i] = _consumerConnection[i].createSession(_consTransacted, _consAckMode); } // Create the destinations to send pings to and receive replies from. _replyDestination = _consumerSession[0].createTemporaryQueue(); - createPingDestinations(_noOfDestinations, _selector, _destinationName, _isUnique, _isDurable); + createPingDestinations(_noOfDestinations, _selector, _destinationName); // Create the message producer only if instructed to. if (producer) @@ -589,25 +721,24 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti * created with. * * @param clientID The clients identifier. - * - * @throws AMQException Any underlying exceptions are allowed to fall through. - * @throws URLSyntaxException Any underlying exceptions are allowed to fall through. + * @throws Exception Any underlying exceptions are allowed to fall through. */ - protected void createConnection(String clientID) throws AMQException, URLSyntaxException + protected void createConnection(String clientID) throws Exception { - // log.debug("protected void createConnection(String clientID = " + clientID + "): called"); + // _log.debug("protected void createConnection(String clientID = " + clientID + "): called"); - // log.debug("Creating a connection for the message producer."); + // _log.debug("Creating a connection for the message producer."); + Context context = InitialContextHelper.getInitialContext(_fileProperties); + ConnectionFactory factory = (ConnectionFactory) context.lookup(_factoryName); + _connection = factory.createConnection(_username, _password); - _connection = new AMQConnection(_brokerDetails, _username, _password, clientID, _virtualpath); - - // log.debug("Creating " + _noOfConsumers + " connections for the consumers."); + // _log.debug("Creating " + _noOfConsumers + " connections for the consumers."); _consumerConnection = new Connection[_noOfConsumers]; for (int i = 0; i < _noOfConsumers; i++) { - _consumerConnection[i] = new AMQConnection(_brokerDetails, _username, _password, clientID, _virtualpath); + _consumerConnection[i] = factory.createConnection(_username, _password); } } @@ -621,8 +752,8 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti { try { - Properties options = - CommandLineParser.processCommandLine(args, new CommandLineParser(new String[][] {}), System.getProperties()); + Properties options = CommandLineParser + .processCommandLine(args, new CommandLineParser(new String[][]{}), System.getProperties()); // Create a ping producer overriding its defaults with all options passed on the command line. PingPongProducer pingProducer = new PingPongProducer(options); @@ -645,7 +776,7 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti catch (Exception e) { System.err.println(e.getMessage()); - log.error("Top level handler caught execption.", e); + _log.error("Top level handler caught execption.", e); System.exit(1); } } @@ -664,7 +795,9 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti Thread.sleep(sleepTime); } catch (InterruptedException ie) - { } + { + // do nothing + } } } @@ -676,12 +809,12 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti */ public List getReplyDestinations() { - // log.debug("public List getReplyDestinations(): called"); + // _log.debug("public List getReplyDestinations(): called"); List replyDestinations = new ArrayList(); replyDestinations.add(_replyDestination); - // log.debug("replyDestinations = " + replyDestinations); + // _log.debug("replyDestinations = " + replyDestinations); return replyDestinations; } @@ -694,84 +827,58 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti */ public void createProducer() throws JMSException { - // log.debug("public void createProducer(): called"); - - _producer = (MessageProducer) _producerSession.createProducer(null); + _producer = _producerSession.createProducer(null); _producer.setDeliveryMode(_persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT); - - // log.debug("Created producer for " + (_persistent ? "persistent" : "non-persistent") + " messages."); + if (_log.isDebugEnabled()) + { + _log.debug("Created producer for " + (_persistent ? "persistent" : "non-persistent") + " messages."); + } } /** * Creates consumers for the specified number of destinations. The destinations themselves are also created by this * method. * - * @param noOfDestinations The number of destinations to create consumers for. - * @param selector The message selector to filter the consumers with. - * @param rootName The root of the name, or actual name if only one is being created. - * @param unique true to make the destinations unique to this pinger, false to share the - * numbering with all pingers on the same JVM. - * + * @param noOfDestinations The number of destinations to create consumers for. + * @param selector The message selector to filter the consumers with. + * @param rootName The root of the name, or actual name if only one is being created. * @throws JMSException Any JMSExceptions are allowed to fall through. */ - public void createPingDestinations(int noOfDestinations, String selector, String rootName, boolean unique, - boolean durable) throws JMSException, AMQException + public void createPingDestinations(int noOfDestinations, String selector, String rootName) throws JMSException { - /*log.debug("public void createPingDestinations(int noOfDestinations = " + noOfDestinations + ", String selector = " - + selector + ", String rootName = " + rootName + ", boolean unique = " + unique + ", boolean durable = " - + durable + "): called");*/ - + if (_log.isDebugEnabled()) + { + _log.debug( + "public void createPingDestinations(int noOfDestinations = " + noOfDestinations + ", String selector = " + selector + ", String rootName = " + rootName); + } _pingDestinations = new ArrayList(); - // Create the desired number of ping destinations and consumers for them. - // log.debug("Creating " + noOfDestinations + " destinations to ping."); - + if (_log.isDebugEnabled()) + { + _log.debug("Creating " + noOfDestinations + " destinations to ping."); + } + String id; for (int i = 0; i < noOfDestinations; i++) { - AMQDestination destination; - - String id; - - // Generate an id, unique within this pinger or to the whole JVM depending on the unique flag. - if (unique) - { - // log.debug("Creating unique destinations."); - id = "_" + _queueJVMSequenceID.incrementAndGet() + "_" + _connection.getClientID(); - } - else - { - // log.debug("Creating shared destinations."); - id = "_" + _queueSharedID.incrementAndGet(); - } - + Destination destination; + id = "_" + _queueSharedID.incrementAndGet(); // Check if this is a pub/sub pinger, in which case create topics. if (_isPubSub) { - if (!durable) + destination = _producerSession.createTopic(rootName + id); + if (_log.isDebugEnabled()) { - destination = new AMQTopic(ExchangeDefaults.TOPIC_EXCHANGE_NAME, rootName + id); - // log.debug("Created non-durable topic " + destination); - } - else - { - destination = - AMQTopic.createDurableTopic(new AMQTopic(ExchangeDefaults.TOPIC_EXCHANGE_NAME, rootName + id), - _clientID, (AMQConnection) _connection); - // log.debug("Created durable topic " + destination); + _log.debug("Created topic " + rootName + id); } } // Otherwise this is a p2p pinger, in which case create queues. else { - AMQShortString destinationName = new AMQShortString(rootName + id); - destination = - new AMQQueue(ExchangeDefaults.DIRECT_EXCHANGE_NAME, destinationName, destinationName, false, false, - _isDurable); - ((AMQSession) _producerSession).createQueue(destinationName, false, _isDurable, false); - ((AMQSession) _producerSession).bindQueue(destinationName, destinationName, null, - ExchangeDefaults.DIRECT_EXCHANGE_NAME); - - // log.debug("Created queue " + destination); + destination = _producerSession.createQueue(rootName + id); + if (_log.isDebugEnabled()) + { + _log.debug("Created queue " + rootName + id); + } } // Keep the destination. @@ -784,17 +891,16 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti * * @param destinations The destinations to listen to. * @param selector A selector to filter the messages with. - * * @throws javax.jms.JMSException Any JMSExceptions are allowed to fall through. */ public void createReplyConsumers(Collection destinations, String selector) throws JMSException { - /*log.debug("public void createReplyConsumers(Collection destinations = " + destinations + /*_log.debug("public void createReplyConsumers(Collection destinations = " + destinations + ", String selector = " + selector + "): called");*/ - // log.debug("There are " + destinations.size() + " destinations."); - // log.debug("Creating " + _noOfConsumers + " consumers on each destination."); - // log.debug("Total number of consumers is: " + (destinations.size() * _noOfConsumers)); + // _log.debug("There are " + destinations.size() + " destinations."); + // _log.debug("Creating " + _noOfConsumers + " consumers on each destination."); + // _log.debug("Total number of consumers is: " + (destinations.size() * _noOfConsumers)); for (Destination destination : destinations) { @@ -803,21 +909,19 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti for (int i = 0; i < _noOfConsumers; i++) { // Create a consumer for the destination and set this pinger to listen to its messages. - _consumer[i] = - _consumerSession[i].createConsumer(destination, PREFETCH_DEFAULT, NO_LOCAL_DEFAULT, EXCLUSIVE_DEFAULT, - selector); + _consumer[i] = _consumerSession[i].createConsumer(destination, selector, NO_LOCAL_DEFAULT); final int consumerNo = i; _consumer[i].setMessageListener(new MessageListener() + { + public void onMessage(Message message) { - public void onMessage(Message message) - { - onMessageWithConsumerNo(message, consumerNo); - } - }); + onMessageWithConsumerNo(message, consumerNo); + } + }); - // log.debug("Set consumer " + i + " to listen to replies sent to destination: " + destination); + // _log.debug("Set consumer " + i + " to listen to replies sent to destination: " + destination); } } } @@ -831,7 +935,7 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti */ public void onMessageWithConsumerNo(Message message, int consumerNo) { - // log.debug("public void onMessageWithConsumerNo(Message message, int consumerNo = " + consumerNo + "): called"); + // _log.debug("public void onMessageWithConsumerNo(Message message, int consumerNo = " + consumerNo + "): called"); try { long now = System.nanoTime(); @@ -842,13 +946,13 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti // Extract the messages correlation id. String correlationID = message.getJMSCorrelationID(); - // log.debug("correlationID = " + correlationID); + // _log.debug("correlationID = " + correlationID); int num = message.getIntProperty("MSG_NUM"); - // log.info("Message " + num + " received."); + // _log.info("Message " + num + " received."); boolean isRedelivered = message.getJMSRedelivered(); - // log.debug("isRedelivered = " + isRedelivered); + // _log.debug("isRedelivered = " + isRedelivered); if (!isRedelivered) { @@ -862,7 +966,7 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti // Restart the timeout timer on every message. perCorrelationId.timeOutStart = System.nanoTime(); - // log.debug("Reply was expected, decrementing the latch for the id, " + correlationID); + // _log.debug("Reply was expected, decrementing the latch for the id, " + correlationID); // Decrement the countdown latch. Before this point, it is possible that two threads might enter this // method simultanesouly with the same correlation id. Decrementing the latch in a synchronized block @@ -887,7 +991,7 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti if ((_maxPendingSize > 0) && (unreceivedSize < _maxPendingSize)) // && (_sendPauseBarrier.getNumberWaiting() == 1)) { - // log.debug("unreceived size estimate under limit = " + unreceivedSize); + // _log.debug("unreceived size estimate under limit = " + unreceivedSize); // Wait on the send pause barrier for the limit to be re-established. /*try @@ -908,19 +1012,19 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti // NDC.push("/rem" + remainingCount); - // log.debug("remainingCount = " + remainingCount); - // log.debug("trueCount = " + trueCount); + // _log.debug("remainingCount = " + remainingCount); + // _log.debug("trueCount = " + trueCount); // Commit on transaction batch size boundaries. At this point in time the waiting producer remains // blocked, even on the last message. // Commit count is divided by noOfConsumers in p2p mode, so that each consumer only commits on // each batch boundary. For pub/sub each consumer gets every message so no division is done. long commitCount = _isPubSub ? remainingCount : (remainingCount / _noOfConsumers); - // log.debug("commitCount = " + commitCount); + // _log.debug("commitCount = " + commitCount); if ((commitCount % _txBatchSize) == 0) { - // log.debug("Trying commit for consumer " + consumerNo + "."); + // _log.debug("Trying commit for consumer " + consumerNo + "."); commitTx(_consumerSession[consumerNo]); } @@ -940,12 +1044,12 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti } else { - log.warn("Got unexpected message with correlationId: " + correlationID); + _log.warn("Got unexpected message with correlationId: " + correlationID); } } else { - log.warn("Got redelivered message, ignoring."); + _log.warn("Got redelivered message, ignoring."); } // Print out ping times for every message in verbose mode only. @@ -956,17 +1060,17 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti if (timestamp != null) { long diff = System.nanoTime() - timestamp; - //log.trace("Time for round trip (nanos): " + diff); + //_log.trace("Time for round trip (nanos): " + diff); } }*/ } catch (JMSException e) { - log.warn("There was a JMSException: " + e.getMessage(), e); + _log.warn("There was a JMSException: " + e.getMessage(), e); } finally { - // log.debug("public void onMessageWithConsumerNo(Message message, int consumerNo): ending"); + // _log.debug("public void onMessageWithConsumerNo(Message message, int consumerNo): ending"); // NDC.clear(); } } @@ -980,17 +1084,15 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti * @param numPings The number of ping messages to send. * @param timeout The timeout in milliseconds. * @param messageCorrelationId The message correlation id. If this is null, one is generated. - * * @return The number of replies received. This may be less than the number sent if the timeout terminated the wait * for all prematurely. - * * @throws JMSException All underlying JMSExceptions are allowed to fall through. * @throws InterruptedException When interrupted by a timeout */ public int pingAndWaitForReply(Message message, int numPings, long timeout, String messageCorrelationId) - throws JMSException, InterruptedException + throws JMSException, InterruptedException { - /*log.debug("public int pingAndWaitForReply(Message message, int numPings = " + numPings + ", long timeout = " + /*_log.debug("public int pingAndWaitForReply(Message message, int numPings = " + numPings + ", long timeout = " + timeout + ", String messageCorrelationId = " + messageCorrelationId + "): called");*/ // Generate a unique correlation id to put on the messages before sending them, if one was not specified. @@ -1034,31 +1136,31 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti allMessagesReceived = numReplies == getExpectedNumPings(numPings); - // log.debug("numReplies = " + numReplies); - // log.debug("allMessagesReceived = " + allMessagesReceived); + // _log.debug("numReplies = " + numReplies); + // _log.debug("allMessagesReceived = " + allMessagesReceived); // Recheck the timeout condition. long now = System.nanoTime(); long lastMessageReceievedAt = perCorrelationId.timeOutStart; timedOut = (now - lastMessageReceievedAt) > (timeout * 1000000); - // log.debug("now = " + now); - // log.debug("lastMessageReceievedAt = " + lastMessageReceievedAt); + // _log.debug("now = " + now); + // _log.debug("lastMessageReceievedAt = " + lastMessageReceievedAt); } while (!timedOut && !allMessagesReceived); if ((numReplies < getExpectedNumPings(numPings)) && _verbose) { - log.info("Timed out (" + timeout + " ms) before all replies received on id, " + messageCorrelationId); + _log.info("Timed out (" + timeout + " ms) before all replies received on id, " + messageCorrelationId); } else if (_verbose) { - log.info("Got all replies on id, " + messageCorrelationId); + _log.info("Got all replies on id, " + messageCorrelationId); } // commitTx(_consumerSession); - // log.debug("public int pingAndWaitForReply(Message message, int numPings, long timeout): ending"); + // _log.debug("public int pingAndWaitForReply(Message message, int numPings, long timeout): ending"); return numReplies; } @@ -1077,12 +1179,11 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti * @param message The message to send. * @param numPings The number of pings to send. * @param messageCorrelationId A correlation id to place on all messages sent. - * * @throws JMSException All underlying JMSExceptions are allowed to fall through. */ public void pingNoWaitForReply(Message message, int numPings, String messageCorrelationId) throws JMSException { - /*log.debug("public void pingNoWaitForReply(Message message, int numPings = " + numPings + /*_log.debug("public void pingNoWaitForReply(Message message, int numPings = " + numPings + ", String messageCorrelationId = " + messageCorrelationId + "): called");*/ if (message == null) @@ -1112,7 +1213,7 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti // Spew out per message timings on every message sonly in verbose mode. /*if (_verbose) { - log.info(timestampFormatter.format(new Date()) + ": Pinged at with correlation id, " + messageCorrelationId); + _log.info(timestampFormatter.format(new Date()) + ": Pinged at with correlation id, " + messageCorrelationId); }*/ } @@ -1131,15 +1232,13 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti * * @param i The count of messages sent so far in a loop of multiple calls to this send method. * @param message The message to send. - * * @return true if the messages were committed, false otherwise. - * * @throws JMSException All underlyiung JMSExceptions are allowed to fall through. */ protected boolean sendMessage(int i, Message message) throws JMSException { - // log.debug("protected boolean sendMessage(int i = " + i + ", Message message): called"); - // log.debug("_txBatchSize = " + _txBatchSize); + // _log.debug("protected boolean sendMessage(int i = " + i + ", Message message): called"); + // _log.debug("_txBatchSize = " + _txBatchSize); // Round robin the destinations as the messages are sent. Destination destination = _pingDestinations.get(i % _pingDestinations.size()); @@ -1152,7 +1251,7 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti _failBeforeSend = false; } - // log.trace("Failing Before Send"); + // _log.trace("Failing Before Send"); waitForUser(KILL_BROKER_PROMPT); } @@ -1167,7 +1266,7 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti if (unreceivedSize > _maxPendingSize) { - // log.debug("unreceived size estimate over limit = " + unreceivedSize); + // _log.debug("unreceived size estimate over limit = " + unreceivedSize); // Wait on the send pause barrier for the limit to be re-established. try @@ -1200,7 +1299,7 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti message.setIntProperty("MSG_NUM", num); setTimestamp(message); _producer.send(message); - // log.info("Message " + num + " sent."); + // _log.info("Message " + num + " sent."); } else { @@ -1208,7 +1307,7 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti message.setIntProperty("MSG_NUM", num); setTimestamp(message); _producer.send(destination, message); - // log.info("Message " + num + " sent."); + // _log.info("Message " + num + " sent."); } // Increase the unreceived size, this may actually happen aftern the message is recevied. @@ -1226,7 +1325,7 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti // Commit on every transaction batch size boundary. Here i + 1 is the count of actual messages sent. if (((i + 1) % _txBatchSize) == 0) { - // log.debug("Trying commit on producer session."); + // _log.debug("Trying commit on producer session."); committed = commitTx(_producerSession); } @@ -1252,12 +1351,12 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti catch (JMSException e) { _publish = false; - // log.debug("There was a JMSException: " + e.getMessage(), e); + // _log.debug("There was a JMSException: " + e.getMessage(), e); } catch (InterruptedException e) { _publish = false; - // log.debug("There was an interruption: " + e.getMessage(), e); + // _log.debug("There was an interruption: " + e.getMessage(), e); } } @@ -1272,7 +1371,9 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti _chainedMessageListener = messageListener; } - /** Removes any chained message listeners from this pinger. */ + /** + * Removes any chained message listeners from this pinger. + */ public void removeChainedMessageListener() { _chainedMessageListener = null; @@ -1284,9 +1385,7 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti * @param replyQueue The reply-to destination for the message. * @param messageSize The desired size of the message in bytes. * @param persistent true if the message should use persistent delivery, false otherwise. - * * @return A freshly generated test message. - * * @throws javax.jms.JMSException All underlying JMSException are allowed to fall through. */ public Message getTestMessage(Destination replyQueue, int messageSize, boolean persistent) throws JMSException @@ -1302,29 +1401,12 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti protected void setTimestamp(Message msg) throws JMSException { - if (((AMQSession) _producerSession).isStrictAMQP()) - { - ((AMQMessage) msg).setTimestampProperty(new AMQShortString(MESSAGE_TIMESTAMP_PROPNAME), System.nanoTime()); - } - else - { - msg.setLongProperty(MESSAGE_TIMESTAMP_PROPNAME, System.nanoTime()); - } + msg.setLongProperty(MESSAGE_TIMESTAMP_PROPNAME, System.nanoTime()); } protected long getTimestamp(Message msg) throws JMSException { - - if (((AMQSession) _producerSession).isStrictAMQP()) - { - Long value = ((AMQMessage) msg).getTimestampProperty(new AMQShortString(MESSAGE_TIMESTAMP_PROPNAME)); - - return (value == null) ? 0L : value; - } - else - { - return msg.getLongProperty(PingPongProducer.MESSAGE_TIMESTAMP_PROPNAME); - } + return msg.getLongProperty(PingPongProducer.MESSAGE_TIMESTAMP_PROPNAME); } /** @@ -1346,7 +1428,9 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti } } - /** Implements a ping loop that repeatedly pings until the publish flag becomes false. */ + /** + * Implements a ping loop that repeatedly pings until the publish flag becomes false. + */ public void run() { // Keep running until the publish flag is cleared. @@ -1364,7 +1448,7 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti */ public void onException(JMSException e) { - // log.debug("public void onException(JMSException e = " + e + "): called", e); + // _log.debug("public void onException(JMSException e = " + e + "): called", e); _publish = false; } @@ -1377,12 +1461,12 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti public Thread getShutdownHook() { return new Thread(new Runnable() - { - public void run() - { - stop(); - } - }); + { + public void run() + { + stop(); + } + }); } /** @@ -1392,14 +1476,14 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti */ public void close() throws JMSException { - // log.debug("public void close(): called"); + // _log.debug("public void close(): called"); try { if (_connection != null) { _connection.close(); - // log.debug("Close connection."); + // _log.debug("Close connection."); } for (int i = 0; i < _noOfConsumers; i++) @@ -1407,7 +1491,7 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti if (_consumerConnection[i] != null) { _consumerConnection[i].close(); - // log.debug("Closed consumer connection."); + // _log.debug("Closed consumer connection."); } } } @@ -1427,46 +1511,43 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti /** * Convenience method to commit the transaction on the specified controlSession. If the controlSession to commit on is not a * transactional controlSession, this method does nothing (unless the failover after send flag is set). - * + *

*

If the {@link #_failAfterSend} flag is set, this will prompt the user to kill the broker before the commit is * applied. This flag applies whether the pinger is transactional or not. - * + *

*

If the {@link #_failBeforeCommit} flag is set, this will prompt the user to kill the broker before the commit * is applied. If the {@link #_failAfterCommit} flag is set, this will prompt the user to kill the broker after the * commit is applied. These flags will only apply if using a transactional pinger. * * @param session The controlSession to commit - * * @return true if the controlSession was committed, false if it was not. - * * @throws javax.jms.JMSException If the commit fails and then the rollback fails. - * * @todo Consider moving the fail after send logic into the send method. It is confusing to have it in this commit * method, because commits only apply to transactional pingers, but fail after send applied to transactional and * non-transactional alike. */ protected boolean commitTx(Session session) throws JMSException { - // log.debug("protected void commitTx(Session session): called"); + // _log.debug("protected void commitTx(Session session): called"); boolean committed = false; - // log.trace("Batch time reached"); + // _log.trace("Batch time reached"); if (_failAfterSend) { - // log.trace("Batch size reached"); + // _log.trace("Batch size reached"); if (_failOnce) { _failAfterSend = false; } - // log.trace("Failing After Send"); + // _log.trace("Failing After Send"); waitForUser(KILL_BROKER_PROMPT); } if (session.getTransacted()) { - // log.debug("Session is transacted."); + // _log.debug("Session is transacted."); try { @@ -1477,14 +1558,14 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti _failBeforeCommit = false; } - // log.trace("Failing Before Commit"); + // _log.trace("Failing Before Commit"); waitForUser(KILL_BROKER_PROMPT); } long start = System.nanoTime(); session.commit(); committed = true; - // log.debug("Time taken to commit :" + ((System.nanoTime() - start) / 1000000f) + " ms"); + // _log.debug("Time taken to commit :" + ((System.nanoTime() - start) / 1000000f) + " ms"); if (_failAfterCommit) { @@ -1493,30 +1574,23 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti _failAfterCommit = false; } - // log.trace("Failing After Commit"); + // _log.trace("Failing After Commit"); waitForUser(KILL_BROKER_PROMPT); } - // log.debug("Session Commited."); + // _log.debug("Session Commited."); } catch (JMSException e) { - // log.debug("JMSException on commit:" + e.getMessage(), e); - - // Warn that the bounce back client is not available. - if (e.getLinkedException() instanceof AMQNoConsumersException) - { - // log.debug("No consumers on queue."); - } - + // _log.debug("JMSException on commit:" + e.getMessage(), e); try { session.rollback(); - // log.debug("Message rolled back."); + // _log.debug("Message rolled back."); } catch (JMSException jmse) { - // log.debug("JMSE on rollback:" + jmse.getMessage(), jmse); + // _log.debug("JMSE on rollback:" + jmse.getMessage(), jmse); // Both commit and rollback failed. Throw the rollback exception. throw jmse; @@ -1562,14 +1636,13 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti * Calculates how many pings are expected to be received for the given number sent. * * @param numpings The number of pings that will be sent. - * * @return The number that should be received, for the test to pass. */ public int getExpectedNumPings(int numpings) { - // log.debug("public int getExpectedNumPings(int numpings = " + numpings + "): called"); + // _log.debug("public int getExpectedNumPings(int numpings = " + numpings + "): called"); - // log.debug("Each ping will be received by " + (_isPubSub ? getConsumersPerDestination() : 1) + " consumers."); + // _log.debug("Each ping will be received by " + (_isPubSub ? getConsumersPerDestination() : 1) + " consumers."); return numpings * (_isPubSub ? getConsumersPerDestination() : 1); } @@ -1579,7 +1652,7 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti * PingPongProducer#onMessageWithConsumerNo} method is called, the chained listener set through the {@link * PingPongProducer#setChainedMessageListener} method is passed the message, and the remaining expected count of * messages with that correlation id. - * + *

*

Provided only one pinger is producing messages with that correlation id, the chained listener will always be * given unique message counts. It will always be called while the producer waiting for all messages to arrive is * still blocked. @@ -1593,7 +1666,6 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti * @param message The newly arrived message. * @param remainingCount The number of messages left to complete the test. * @param latency The nanosecond latency of the message. - * * @throws JMSException Any JMS exceptions is allowed to fall through. */ public void onMessage(Message message, int remainingCount, long latency) throws JMSException; @@ -1605,10 +1677,14 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti */ protected static class PerCorrelationId { - /** Holds a countdown on number of expected messages. */ + /** + * Holds a countdown on number of expected messages. + */ CountDownLatch trafficLight; - /** Holds the last timestamp that the timeout was reset to. */ + /** + * Holds the last timestamp that the timeout was reset to. + */ Long timeOutStart; } } diff --git a/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongTestPerf.java b/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongTestPerf.java index f289fe0db2..780589768f 100644 --- a/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongTestPerf.java +++ b/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongTestPerf.java @@ -172,10 +172,10 @@ public class PingPongTestPerf extends AsymptoticTestCase PerThreadSetup perThreadSetup = new PerThreadSetup(); // Extract the test set up paramaeters. - String brokerDetails = testParameters.getProperty(PingPongProducer.BROKER_PROPNAME); + String fileProperties = testParameters.getProperty(PingPongProducer.FILE_PROPERTIES_PROPNAME); + String factoryName = testParameters.getProperty(PingPongProducer.FACTORY_NAME_PROPNAME); String username = testParameters.getProperty(PingPongProducer.USERNAME_PROPNAME); String password = testParameters.getProperty(PingPongProducer.PASSWORD_PROPNAME); - String virtualPath = testParameters.getProperty(PingPongProducer.VIRTUAL_HOST_PROPNAME); String destinationName = testParameters.getProperty(PingPongProducer.PING_QUEUE_NAME_PROPNAME); boolean persistent = testParameters.getPropertyAsBoolean(PingPongProducer.PERSISTENT_MODE_PROPNAME); boolean transacted = testParameters.getPropertyAsBoolean(PingPongProducer.TRANSACTED_PROPNAME); @@ -187,7 +187,7 @@ public class PingPongTestPerf extends AsymptoticTestCase { // Establish a bounce back client on the ping queue to bounce back the pings. perThreadSetup._testPingBouncer = - new PingPongBouncer(brokerDetails, username, password, virtualPath, destinationName, persistent, + new PingPongBouncer(fileProperties, factoryName, username, password, destinationName, persistent, transacted, selector, verbose, pubsub); // Start the connections for client and producer running. diff --git a/java/perftests/src/main/java/org/apache/qpid/topic/Config.java b/java/perftests/src/main/java/org/apache/qpid/topic/Config.java deleted file mode 100644 index d5c0979399..0000000000 --- a/java/perftests/src/main/java/org/apache/qpid/topic/Config.java +++ /dev/null @@ -1,326 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.topic; - -import org.apache.qpid.client.AMQSession; -import org.apache.qpid.config.ConnectorConfig; -import org.apache.qpid.config.Connector; -import org.apache.qpid.config.AbstractConfig; - -import javax.jms.Connection; - -public class Config extends AbstractConfig implements ConnectorConfig -{ - - private String host = "localhost"; - private int port = 5672; - private String factory = null; - - private int payload = 256; - private int messages = 1000; - private int clients = 1; - private int batch = 1; - private long delay = 1; - private int warmup; - private int ackMode= AMQSession.NO_ACKNOWLEDGE; - private String clientId; - private String subscriptionId; - private String selector; - private String destinationName; - private boolean persistent; - private boolean transacted; - private int destinationsCount; - private int batchSize; - private int rate; - private boolean ispubsub; - private long timeout; - - public Config() - { - } - - public int getAckMode() - { - return ackMode; - } - - public void setPayload(int payload) - { - this.payload = payload; - } - - public int getPayload() - { - return payload; - } - - void setClients(int clients) - { - this.clients = clients; - } - - int getClients() - { - return clients; - } - - void setMessages(int messages) - { - this.messages = messages; - } - - public int getMessages() - { - return messages; - } - - public int getBatchSize() - { - return batchSize; - } - - public int getRate() - { - return rate; - } - - public int getDestinationsCount() - { - return destinationsCount; - } - - public String getHost() - { - return host; - } - - public void setHost(String host) - { - this.host = host; - } - - public int getPort() - { - return port; - } - - public String getFactory() - { - return factory; - } - - public void setPort(int port) - { - this.port = port; - } - - int getBatch() - { - return batch; - } - - void setBatch(int batch) - { - this.batch = batch; - } - - int getWarmup() - { - return warmup; - } - - void setWarmup(int warmup) - { - this.warmup = warmup; - } - - public long getDelay() - { - return delay; - } - - public void setDelay(long delay) - { - this.delay = delay; - } - - public long getTimeout() - { - return timeout; - } - - public void setTimeout(long time) - { - this.timeout = time; - } - - public String getClientId() - { - return clientId; - } - - public String getSubscriptionId() - { - return subscriptionId; - } - - public String getSelector() - { - return selector; - } - - public String getDestination() - { - return destinationName; - } - - public boolean usePersistentMessages() - { - return persistent; - } - - public boolean isTransacted() - { - return transacted; - } - - public boolean isPubSub() - { - return ispubsub; - } - - public void setOption(String key, String value) - { - if("-host".equalsIgnoreCase(key)) - { - setHost(value); - } - else if("-port".equalsIgnoreCase(key)) - { - try - { - setPort(Integer.parseInt(value)); - } - catch(NumberFormatException e) - { - throw new RuntimeException("Bad port number: " + value, e); - } - } - else if("-payload".equalsIgnoreCase(key)) - { - setPayload(parseInt("Bad payload size", value)); - } - else if("-messages".equalsIgnoreCase(key)) - { - setMessages(parseInt("Bad message count", value)); - } - else if("-clients".equalsIgnoreCase(key)) - { - setClients(parseInt("Bad client count", value)); - } - else if("-batch".equalsIgnoreCase(key)) - { - setBatch(parseInt("Bad batch count", value)); - } - else if("-delay".equalsIgnoreCase(key)) - { - setDelay(parseLong("Bad batch delay", value)); - } - else if("-warmup".equalsIgnoreCase(key)) - { - setWarmup(parseInt("Bad warmup count", value)); - } - else if("-ack".equalsIgnoreCase(key)) - { - ackMode = parseInt("Bad ack mode", value); - } - else if("-factory".equalsIgnoreCase(key)) - { - factory = value; - } - else if("-clientId".equalsIgnoreCase(key)) - { - clientId = value; - } - else if("-subscriptionId".equalsIgnoreCase(key)) - { - subscriptionId = value; - } - else if("-persistent".equalsIgnoreCase(key)) - { - persistent = "true".equalsIgnoreCase(value); - } - else if("-transacted".equalsIgnoreCase(key)) - { - transacted = "true".equalsIgnoreCase(value); - } - else if ("-destinationscount".equalsIgnoreCase(key)) - { - destinationsCount = parseInt("Bad destinations count", value); - } - else if ("-batchsize".equalsIgnoreCase(key)) - { - batchSize = parseInt("Bad batch size", value); - } - else if ("-rate".equalsIgnoreCase(key)) - { - rate = parseInt("MEssage rate", value); - } - else if("-pubsub".equalsIgnoreCase(key)) - { - ispubsub = "true".equalsIgnoreCase(value); - } - else if("-selector".equalsIgnoreCase(key)) - { - selector = value; - } - else if("-destinationname".equalsIgnoreCase(key)) - { - destinationName = value; - } - else if("-timeout".equalsIgnoreCase(key)) - { - setTimeout(parseLong("Bad timeout data", value)); - } - else - { - System.out.println("Ignoring unrecognised option: " + key); - } - } - - static String getAckModeDescription(int ackMode) - { - switch(ackMode) - { - case AMQSession.NO_ACKNOWLEDGE: return "NO_ACKNOWLEDGE"; - case AMQSession.AUTO_ACKNOWLEDGE: return "AUTO_ACKNOWLEDGE"; - case AMQSession.CLIENT_ACKNOWLEDGE: return "CLIENT_ACKNOWLEDGE"; - case AMQSession.DUPS_OK_ACKNOWLEDGE: return "DUPS_OK_ACKNOWELDGE"; - case AMQSession.PRE_ACKNOWLEDGE: return "PRE_ACKNOWLEDGE"; - } - return "AckMode=" + ackMode; - } - - public Connection createConnection() throws Exception - { - return new Connector().createConnection(this); - } -} diff --git a/java/perftests/src/main/java/org/apache/qpid/topic/Listener.java b/java/perftests/src/main/java/org/apache/qpid/topic/Listener.java deleted file mode 100644 index 6dcea42bfe..0000000000 --- a/java/perftests/src/main/java/org/apache/qpid/topic/Listener.java +++ /dev/null @@ -1,303 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.topic; - -import java.util.Random; - -import javax.jms.*; - -import org.apache.log4j.Logger; -import org.apache.log4j.NDC; - -import org.apache.qpid.client.AMQConnection; -import org.apache.qpid.client.AMQQueue; -import org.apache.qpid.client.AMQSession; -import org.apache.qpid.client.AMQTopic; -import org.apache.qpid.exchange.ExchangeDefaults; - -/** - * This class has not kept up to date with the topic_listener in the cpp tests. It should provide identical behaviour for - * cross testing the java and cpp clients. - * - *

How the cpp topic_publisher operates: - * It publishes text messages to the default topic exchange, on virtual host "/test", on the topic "topic_control", for - * the specified number of test messages to be sent. - * It publishes a report request message (on same topic), with the header text field "TYPE", value "REPORT_REQUEST", - * optionally within a transaction, and waits for the specified number of consumers to reply to this request. The - * listeners should reply to this message on a queue named "response", on virtual host "/test", with some sort of message - * about the number of messages received and how long it took, although the publisher never looks at the message content. - * The publisher then send a message (on the same topic), with the header text field "TYPE", value "TERMINATION_REQUEST", - * which the listener should close its connection and terminate upon receipt of. - * - * @todo I've added lots of field table types in the report message, just to check if the other end can decode them - * correctly. Not really the right place to test this, so remove them from - * {@link #createReportResponseMessage(String)} once a better test exists. - */ -public class Listener implements MessageListener -{ - private static Logger log = Logger.getLogger(Listener.class); - - public static final String CONTROL_TOPIC = "topic_control"; - public static final String RESPONSE_QUEUE = "response"; - - private final Topic _topic; - //private final Topic _control; - - private final Queue _response; - - /** Holds the connection to listen on. */ - private final Connection _connection; - - /** Holds the producer to send control messages on. */ - private final MessageProducer _controller; - - /** Holds the JMS session. */ - private final javax.jms.Session _session; - - /** Holds a flag to indicate that a timer has begun on the first message. Reset when report is sent. */ - private boolean init; - - /** Holds the count of messages received by this listener. */ - private int count; - - /** Used to hold the start time of the first message. */ - private long start; - private static String clientId; - - Listener(Connection connection, int ackMode, String name) throws Exception - { - log.debug("Listener(Connection connection = " + connection + ", int ackMode = " + ackMode + ", String name = " + name - + "): called"); - - _connection = connection; - _session = connection.createSession(false, ackMode); - - if (_session instanceof AMQSession) - { - _topic = new AMQTopic(ExchangeDefaults.TOPIC_EXCHANGE_NAME, CONTROL_TOPIC); - //_control = new AMQTopic(CONTROL_TOPIC); - _response = new AMQQueue(ExchangeDefaults.DIRECT_EXCHANGE_NAME, RESPONSE_QUEUE); - } - else - { - _topic = _session.createTopic(CONTROL_TOPIC); - //_control = _session.createTopic(CONTROL_TOPIC); - _response = _session.createQueue(RESPONSE_QUEUE); - } - - //register for events - if (name == null) - { - log.debug("Calling _factory.createTopicConsumer().setMessageListener(this)"); - createTopicConsumer().setMessageListener(this); - } - else - { - log.debug("Calling createDurableTopicConsumer(name).setMessageListener(this)"); - createDurableTopicConsumer(name).setMessageListener(this); - } - - _connection.start(); - - _controller = createControlPublisher(); - System.out.println("Waiting for messages " + Config.getAckModeDescription(ackMode) - + - ((name == null) - ? "" : (" (subscribed with name " + name + " and client id " + connection.getClientID() + ")")) - + "..."); - } - - public static void main(String[] argv) throws Exception - { - clientId = "Listener-" + System.currentTimeMillis(); - - NDC.push(clientId); - - Config config = new Config(); - config.setOptions(argv); - - //Connection con = config.createConnection(); - Connection con = - new AMQConnection("amqp://guest:guest@testid/test?brokerlist='" + config.getHost() + ":" + config.getPort() - + "'"); - - if (config.getClientId() != null) - { - con.setClientID(config.getClientId()); - } - - new Listener(con, config.getAckMode(), config.getSubscriptionId()); - - NDC.pop(); - NDC.remove(); - } - - /** - * Checks whether or not a text field on a message has the specified value. - * - * @param m The message to check. - * @param fieldName The name of the field to check. - * @param value The expected value of the field to compare with. - * - * @return trueIf the specified field has the specified value, fals otherwise. - * - * @throws JMSException Any JMSExceptions are allowed to fall through. - */ - private static boolean checkTextField(Message m, String fieldName, String value) throws JMSException - { - log.debug("private static boolean checkTextField(Message m = " + m + ", String fieldName = " + fieldName - + ", String value = " + value + "): called"); - - String comp = m.getStringProperty(fieldName); - log.debug("comp = " + comp); - - boolean result = (comp != null) && comp.equals(value); - log.debug("result = " + result); - - return result; - } - - public void onMessage(Message message) - { - NDC.push(clientId); - - log.debug("public void onMessage(Message message = " + message + "): called"); - - if (!init) - { - start = System.nanoTime() / 1000000; - count = 0; - init = true; - } - - try - { - if (isShutdown(message)) - { - log.debug("Got a shutdown message."); - shutdown(); - } - else if (isReport(message)) - { - log.debug("Got a report request message."); - - // Send the report. - report(); - init = false; - } - } - catch (JMSException e) - { - log.warn("There was a JMSException during onMessage.", e); - } - finally - { - NDC.pop(); - } - } - - Message createReportResponseMessage(String msg) throws JMSException - { - Message message = _session.createTextMessage(msg); - - // Shove some more field table type in the message just to see if the other end can handle it. - message.setBooleanProperty("BOOLEAN", true); - message.setByteProperty("BYTE", (byte) 5); - message.setDoubleProperty("DOUBLE", Math.PI); - message.setFloatProperty("FLOAT", 1.0f); - message.setIntProperty("INT", 1); - message.setShortProperty("SHORT", (short) 1); - message.setLongProperty("LONG", (long) 1827361278); - message.setStringProperty("STRING", "hello"); - - return message; - } - - boolean isShutdown(Message m) throws JMSException - { - boolean result = checkTextField(m, "TYPE", "TERMINATION_REQUEST"); - - //log.debug("isShutdown = " + result); - - return result; - } - - boolean isReport(Message m) throws JMSException - { - boolean result = checkTextField(m, "TYPE", "REPORT_REQUEST"); - - //log.debug("isReport = " + result); - - return result; - } - - MessageConsumer createTopicConsumer() throws Exception - { - return _session.createConsumer(_topic); - } - - MessageConsumer createDurableTopicConsumer(String name) throws Exception - { - return _session.createDurableSubscriber(_topic, name); - } - - MessageProducer createControlPublisher() throws Exception - { - return _session.createProducer(_response); - } - - private void shutdown() - { - try - { - _session.close(); - _connection.stop(); - _connection.close(); - } - catch (Exception e) - { - e.printStackTrace(System.out); - } - } - - private void report() - { - log.debug("private void report(): called"); - - try - { - String msg = getReport(); - _controller.send(createReportResponseMessage(msg)); - log.debug("Sent report: " + msg); - } - catch (Exception e) - { - e.printStackTrace(System.out); - } - } - - private String getReport() - { - long time = ((System.nanoTime() / 1000000) - start); - - return "Received " + count + " in " + time + "ms"; - } -} diff --git a/java/perftests/src/main/java/org/apache/qpid/topic/MessageFactory.java b/java/perftests/src/main/java/org/apache/qpid/topic/MessageFactory.java deleted file mode 100644 index 4efdc1cb56..0000000000 --- a/java/perftests/src/main/java/org/apache/qpid/topic/MessageFactory.java +++ /dev/null @@ -1,157 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.topic; - -import javax.jms.*; - -import org.apache.qpid.client.AMQSession; -import org.apache.qpid.client.AMQTopic; -import org.apache.qpid.exchange.ExchangeDefaults; - -/** - */ -class MessageFactory -{ - private static final char[] DATA = "abcdefghijklmnopqrstuvwxyz".toCharArray(); - - private final Session _session; - private final Topic _topic; - private final Topic _control; - private final byte[] _payload; - - MessageFactory(Session session) throws JMSException - { - this(session, 256); - } - - MessageFactory(Session session, int size) throws JMSException - { - _session = session; - if (session instanceof AMQSession) - { - _topic = new AMQTopic(ExchangeDefaults.TOPIC_EXCHANGE_NAME, "topic_control"); - _control = new AMQTopic(ExchangeDefaults.TOPIC_EXCHANGE_NAME, "topictest.control"); - } - else - { - _topic = session.createTopic("topic_control"); - _control = session.createTopic("topictest.control"); - } - - _payload = new byte[size]; - - for (int i = 0; i < size; i++) - { - _payload[i] = (byte) DATA[i % DATA.length]; - } - } - - private static boolean checkText(Message m, String s) - { - try - { - return (m instanceof TextMessage) && ((TextMessage) m).getText().equals(s); - } - catch (JMSException e) - { - e.printStackTrace(System.out); - - return false; - } - } - - Topic getTopic() - { - return _topic; - } - - Message createEventMessage() throws JMSException - { - BytesMessage msg = _session.createBytesMessage(); - msg.writeBytes(_payload); - - return msg; - } - - Message createShutdownMessage() throws JMSException - { - return _session.createTextMessage("SHUTDOWN"); - } - - Message createReportRequestMessage() throws JMSException - { - return _session.createTextMessage("REPORT"); - } - - Message createReportResponseMessage(String msg) throws JMSException - { - return _session.createTextMessage(msg); - } - - boolean isShutdown(Message m) - { - return checkText(m, "SHUTDOWN"); - } - - boolean isReport(Message m) - { - return checkText(m, "REPORT"); - } - - Object getReport(Message m) - { - try - { - return ((TextMessage) m).getText(); - } - catch (JMSException e) - { - e.printStackTrace(System.out); - - return e.toString(); - } - } - - MessageConsumer createTopicConsumer() throws Exception - { - return _session.createConsumer(_topic); - } - - MessageConsumer createDurableTopicConsumer(String name) throws Exception - { - return _session.createDurableSubscriber(_topic, name); - } - - MessageConsumer createControlConsumer() throws Exception - { - return _session.createConsumer(_control); - } - - MessageProducer createTopicPublisher() throws Exception - { - return _session.createProducer(_topic); - } - - MessageProducer createControlPublisher() throws Exception - { - return _session.createProducer(_control); - } -} diff --git a/java/perftests/src/main/java/org/apache/qpid/topic/Publisher.java b/java/perftests/src/main/java/org/apache/qpid/topic/Publisher.java deleted file mode 100644 index c3b19b558a..0000000000 --- a/java/perftests/src/main/java/org/apache/qpid/topic/Publisher.java +++ /dev/null @@ -1,186 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.topic; - -import javax.jms.*; - -public class Publisher implements MessageListener -{ - private final Object _lock = new Object(); - private final Connection _connection; - private final Session _session; - private final MessageFactory _factory; - private final MessageProducer _publisher; - private int _count; - - Publisher(Connection connection, int size, int ackMode, boolean persistent) throws Exception - { - _connection = connection; - _session = _connection.createSession(false, ackMode); - _factory = new MessageFactory(_session, size); - _publisher = _factory.createTopicPublisher(); - _publisher.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT); - System.out.println("Publishing " + (persistent ? "persistent" : "non-persistent") + " messages of " + size + " bytes, " + Config.getAckModeDescription(ackMode) + "."); - } - - private void test(Config config) throws Exception - { - test(config.getBatch(), config.getDelay(), config.getMessages(), config.getClients(), config.getWarmup()); - } - - private void test(int batches, long delay, int msgCount, int consumerCount, int warmup) throws Exception - { - _factory.createControlConsumer().setMessageListener(this); - _connection.start(); - - if (warmup > 0) - { - System.out.println("Runing warmup (" + warmup + " msgs)"); - long time = batch(warmup, consumerCount); - System.out.println("Warmup completed in " + time + "ms"); - } - - long[] times = new long[batches]; - for (int i = 0; i < batches; i++) - { - if (i > 0) - { - Thread.sleep(delay * 1000); - } - times[i] = batch(msgCount, consumerCount); - System.out.println("Batch " + (i + 1) + " of " + batches + " completed in " + times[i] + " ms."); - } - - long min = min(times); - long max = max(times); - System.out.println("min: " + min + ", max: " + max + " avg: " + avg(times, min, max)); - - //request shutdown - _publisher.send(_factory.createShutdownMessage()); - - _connection.stop(); - _connection.close(); - } - - private long batch(int msgCount, int consumerCount) throws Exception - { - _count = consumerCount; - long start = System.currentTimeMillis(); - publish(msgCount); - waitForCompletion(consumerCount); - return System.currentTimeMillis() - start; - } - - private void publish(int count) throws Exception - { - - //send events - for (int i = 0; i < count; i++) - { - _publisher.send(_factory.createEventMessage()); - if ((i + 1) % 100 == 0) - { - System.out.println("Sent " + (i + 1) + " messages"); - } - } - - //request report - _publisher.send(_factory.createReportRequestMessage()); - } - - private void waitForCompletion(int consumers) throws Exception - { - System.out.println("Waiting for completion..."); - synchronized (_lock) - { - while (_count > 0) - { - _lock.wait(); - } - } - } - - - public void onMessage(Message message) - { - System.out.println("Received report " + _factory.getReport(message) + " " + --_count + " remaining"); - if (_count == 0) - { - synchronized (_lock) - { - _lock.notify(); - } - } - } - - static long min(long[] times) - { - long min = times.length > 0 ? times[0] : 0; - for (int i = 0; i < times.length; i++) - { - min = Math.min(min, times[i]); - } - return min; - } - - static long max(long[] times) - { - long max = times.length > 0 ? times[0] : 0; - for (int i = 0; i < times.length; i++) - { - max = Math.max(max, times[i]); - } - return max; - } - - static long avg(long[] times, long min, long max) - { - long sum = 0; - for (int i = 0; i < times.length; i++) - { - sum += times[i]; - } - - int adjustment = 0; - - // Remove min and max if we have run enough batches. - if (times.length > 2) - { - sum -= min; - sum -= max; - adjustment = 2; - } - - return (sum / (times.length - adjustment)); - } - - public static void main(String[] argv) throws Exception - { - Config config = new Config(); - config.setOptions(argv); - - Connection con = config.createConnection(); - int size = config.getPayload(); - int ackMode = config.getAckMode(); - boolean persistent = config.usePersistentMessages(); - new Publisher(con, size, ackMode, persistent).test(config); - } -} diff --git a/java/perftests/src/main/java/perftests.properties b/java/perftests/src/main/java/perftests.properties new file mode 100644 index 0000000000..a60e3964ad --- /dev/null +++ b/java/perftests/src/main/java/perftests.properties @@ -0,0 +1,39 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + + +java.naming.factory.initial = org.apache.qpid.jndi.PropertiesFileInitialContextFactory + +# use the following property to configure the default connector +#java.naming.provider.url - ignored. + +# register some connection factories +# connectionfactory.[jndiname] = [ConnectionURL] +connectionfactory.local = amqp://guest:guest@clientid/test?brokerlist='tcp://localhost:5672' + +# register some queues in JNDI using the form +# queue.[jndiName] = [physicalName] +queue.MyQueue = example.MyQueue + +# register some topics in JNDI using the form +# topic.[jndiName] = [physicalName] +topic.ibmStocks = stocks.nyse.ibm + +# Register an AMQP destination in JNDI +# NOTE: Qpid currently only supports direct,topics and headers +# destination.[jniName] = [BindingURL] +destination.direct = direct://amq.direct//directQueue -- cgit v1.2.1