From 810d80bbafb2b4826710a3e9388df89adfa220af Mon Sep 17 00:00:00 2001 From: Martin Ritchie Date: Wed, 27 Jun 2007 15:34:57 +0000 Subject: Merged revisions 549530-550509 via svnmerge from https://svn.apache.org/repos/asf/incubator/qpid/branches/M2 ........ r549530 | rupertlssmith | 2007-06-21 17:14:03 +0100 (Thu, 21 Jun 2007) | 1 line Added minimal checkstyle to project reports. Fixed some problems with site generation. ........ r549849 | rupertlssmith | 2007-06-22 16:39:27 +0100 (Fri, 22 Jun 2007) | 1 line Added Immediate and Mandatory message tests. ........ r550509 | ritchiem | 2007-06-25 15:16:30 +0100 (Mon, 25 Jun 2007) | 1 line Update to provide a SustainedTestCase, this sends batches of messages to the broker. The rate of publication is regulated by the average consume rate advertised by all connected clients. ........ git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@551199 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/java/broker/pom.xml | 1 + .../qpid/server/txn/CleanupMessageOperation.java | 51 +- .../qpid/server/txn/LocalTransactionalContext.java | 82 ++- qpid/java/client-java14/pom.xml | 1 + qpid/java/client/example/pom.xml | 1 + qpid/java/common/pom.xml | 1 + qpid/java/distribution/pom.xml | 1 + qpid/java/etc/coding_standards.xml | 117 ++++ qpid/java/etc/license_header.txt | 20 + qpid/java/integrationtests/pom.xml | 7 + .../interop/coordinator/CoordinatingTestCase.java | 9 +- .../interop/testclient/InteropClientTestCase.java | 9 +- .../apache/qpid/interop/testclient/TestClient.java | 62 +- .../testclient/testcases/TestCase1DummyRun.java | 5 + .../testclient/testcases/TestCase2BasicP2P.java | 5 + .../testclient/testcases/TestCase3BasicPubSub.java | 5 + .../apache/qpid/sustained/SustainedTestClient.java | 582 ++++++++++++----- .../qpid/sustained/SustainedTestCoordinator.java | 9 +- .../org/apache/qpid/util/ConversationFactory.java | 10 +- qpid/java/management/eclipse-plugin/pom.xml | 46 +- qpid/java/perftests/pom.xml | 1 + .../java/org/apache/qpid/ping/PingTestPerf.java | 37 +- qpid/java/pom.xml | 67 +- qpid/java/systests/distribution/pom.xml | 1 + qpid/java/systests/pom.xml | 6 + .../qpid/server/exchange/ImmediateMessageTest.java | 686 +++++++++++++++++++++ .../qpid/server/exchange/MandatoryMessageTest.java | 135 ++++ .../exchange/MessagingTestConfigProperties.java | 282 +++++++++ 28 files changed, 1929 insertions(+), 310 deletions(-) create mode 100644 qpid/java/etc/coding_standards.xml create mode 100644 qpid/java/etc/license_header.txt create mode 100644 qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/ImmediateMessageTest.java create mode 100644 qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/MandatoryMessageTest.java create mode 100644 qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/MessagingTestConfigProperties.java (limited to 'qpid/java') diff --git a/qpid/java/broker/pom.xml b/qpid/java/broker/pom.xml index bad0d8a52d..81c5d22b22 100644 --- a/qpid/java/broker/pom.xml +++ b/qpid/java/broker/pom.xml @@ -30,6 +30,7 @@ org.apache.qpid qpid 1.0-incubating-M2-SNAPSHOT + ../pom.xml diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/CleanupMessageOperation.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/CleanupMessageOperation.java index 609a85c22f..988f589339 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/CleanupMessageOperation.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/CleanupMessageOperation.java @@ -1,31 +1,35 @@ /* * - * Copyright (c) 2006 The Apache Software Foundation + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. * */ package org.apache.qpid.server.txn; -import java.util.List; - import org.apache.log4j.Logger; + import org.apache.qpid.AMQException; import org.apache.qpid.server.RequiredDeliveryException; import org.apache.qpid.server.queue.AMQMessage; import org.apache.qpid.server.queue.NoConsumersException; import org.apache.qpid.server.store.StoreContext; +import java.util.List; + /** * @author Apache Software Foundation */ @@ -44,33 +48,26 @@ public class CleanupMessageOperation implements TxnOp } public void prepare(StoreContext context) throws AMQException - { - } + { } public void undoPrepare() { - //don't need to do anything here, if the store's txn failed - //when processing prepare then the message was not stored - //or enqueued on any queues and can be discarded + // don't need to do anything here, if the store's txn failed + // when processing prepare then the message was not stored + // or enqueued on any queues and can be discarded } public void commit(StoreContext context) { - - try + // No-op can't be done here has this is before the message has been attempted to be delivered. + /*try { _msg.checkDeliveredToConsumer(); } catch (NoConsumersException e) { - //TODO: store this for delivery after the commit-ok _returns.add(e); - } - catch (AMQException e) - { - _log.error("On commiting transaction, unable to determine whether delivered to a consumer immediately: " + - e, e); - } + }*/ } public void rollback(StoreContext context) diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java index 93459beb45..4e684098d0 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java @@ -1,26 +1,27 @@ /* * - * Copyright (c) 2006 The Apache Software Foundation + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. * */ package org.apache.qpid.server.txn; -import java.util.LinkedList; -import java.util.List; - import org.apache.log4j.Logger; + import org.apache.qpid.AMQException; import org.apache.qpid.server.RequiredDeliveryException; import org.apache.qpid.server.ack.TxAck; @@ -28,9 +29,13 @@ import org.apache.qpid.server.ack.UnacknowledgedMessageMap; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.queue.AMQMessage; import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.NoConsumersException; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.StoreContext; +import java.util.LinkedList; +import java.util.List; + /** A transactional context that only supports local transactions. */ public class LocalTransactionalContext implements TransactionalContext { @@ -54,6 +59,7 @@ public class LocalTransactionalContext implements TransactionalContext private boolean _inTran = false; + /** Are there messages to deliver. NOT Has the message been delivered */ private boolean _messageDelivered = false; private static class DeliveryDetails @@ -62,7 +68,6 @@ public class LocalTransactionalContext implements TransactionalContext public AMQQueue queue; private boolean deliverFirst; - public DeliveryDetails(AMQMessage message, AMQQueue queue, boolean deliverFirst) { this.message = message; @@ -72,15 +77,14 @@ public class LocalTransactionalContext implements TransactionalContext } public LocalTransactionalContext(MessageStore messageStore, StoreContext storeContext, - List returnMessages) + List returnMessages) { _messageStore = messageStore; _storeContext = storeContext; _returnMessages = returnMessages; - //_txnBuffer.enlist(new StoreMessageOperation(messageStore)); + // _txnBuffer.enlist(new StoreMessageOperation(messageStore)); } - public StoreContext getStoreContext() { return _storeContext; @@ -90,11 +94,12 @@ public class LocalTransactionalContext implements TransactionalContext { _txnBuffer.rollback(_storeContext); // Hack to deal with uncommitted non-transactional writes - if(_messageStore.inTran(_storeContext)) + if (_messageStore.inTran(_storeContext)) { _messageStore.abortTran(_storeContext); _inTran = false; } + _postCommitDeliveryList.clear(); } @@ -106,7 +111,7 @@ public class LocalTransactionalContext implements TransactionalContext // be added for every queue onto which the message is // enqueued. Finally a cleanup op will be added to decrement // the reference associated with the routing. -// message.incrementReference(); + // message.incrementReference(); _postCommitDeliveryList.add(new DeliveryDetails(message, queue, deliverFirst)); _messageDelivered = true; _txnBuffer.enlist(new CleanupMessageOperation(message, _returnMessages)); @@ -119,7 +124,7 @@ public class LocalTransactionalContext implements TransactionalContext message.incrementReference(); _messageDelivered = true; - */ + */ } private void checkAck(long deliveryTag, UnacknowledgedMessageMap unacknowledgedMessageMap) throws AMQException @@ -131,16 +136,16 @@ public class LocalTransactionalContext implements TransactionalContext } public void acknowledgeMessage(long deliveryTag, long lastDeliveryTag, boolean multiple, - UnacknowledgedMessageMap unacknowledgedMessageMap) throws AMQException + UnacknowledgedMessageMap unacknowledgedMessageMap) throws AMQException { - //check that the tag exists to give early failure - if (!multiple || deliveryTag > 0) + // check that the tag exists to give early failure + if (!multiple || (deliveryTag > 0)) { checkAck(deliveryTag, unacknowledgedMessageMap); } - //we use a single txn op for all acks and update this op - //as new acks come in. If this is the first ack in the txn - //we will need to create and enlist the op. + // we use a single txn op for all acks and update this op + // as new acks come in. If this is the first ack in the txn + // we will need to create and enlist the op. if (_ackOp == null) { beginTranIfNecessary(); @@ -148,7 +153,7 @@ public class LocalTransactionalContext implements TransactionalContext _txnBuffer.enlist(_ackOp); } // update the op to include this ack request - if (multiple && deliveryTag == 0) + if (multiple && (deliveryTag == 0)) { // if have signalled to ack all, that refers only // to all at this time @@ -178,6 +183,7 @@ public class LocalTransactionalContext implements TransactionalContext { _log.debug("Starting transaction on message store: " + this); } + _messageStore.beginTran(_storeContext); _inTran = true; } @@ -189,12 +195,13 @@ public class LocalTransactionalContext implements TransactionalContext { _log.debug("Committing transactional context: " + this); } + if (_ackOp != null) { _messageDelivered = true; _ackOp.consolidate(); - //already enlisted, after commit will reset regardless of outcome + // already enlisted, after commit will reset regardless of outcome _ackOp = null; } @@ -202,7 +209,7 @@ public class LocalTransactionalContext implements TransactionalContext { _txnBuffer.enlist(new StoreMessageOperation(_messageStore)); } - //fixme fail commit here ... QPID-440 + // fixme fail commit here ... QPID-440 try { _txnBuffer.commit(_storeContext); @@ -215,7 +222,7 @@ public class LocalTransactionalContext implements TransactionalContext try { - postCommitDelivery(); + postCommitDelivery(_returnMessages); } catch (AMQException e) { @@ -224,23 +231,32 @@ public class LocalTransactionalContext implements TransactionalContext } } - private void postCommitDelivery() throws AMQException + private void postCommitDelivery(List returnMessages) throws AMQException { if (_log.isDebugEnabled()) { _log.debug("Performing post commit delivery"); } + try { for (DeliveryDetails dd : _postCommitDeliveryList) { dd.queue.process(_storeContext, dd.message, dd.deliverFirst); + + try + { + dd.message.checkDeliveredToConsumer(); + } + catch (NoConsumersException nce) + { + returnMessages.add(nce); + } } } finally { _postCommitDeliveryList.clear(); } - } } diff --git a/qpid/java/client-java14/pom.xml b/qpid/java/client-java14/pom.xml index 90a49e7d6c..db1644c5b5 100644 --- a/qpid/java/client-java14/pom.xml +++ b/qpid/java/client-java14/pom.xml @@ -32,6 +32,7 @@ org.apache.qpid qpid 1.0-incubating-M2-SNAPSHOT + ../pom.xml diff --git a/qpid/java/client/example/pom.xml b/qpid/java/client/example/pom.xml index 50680666e1..1d48b3afbe 100644 --- a/qpid/java/client/example/pom.xml +++ b/qpid/java/client/example/pom.xml @@ -31,6 +31,7 @@ org.apache.qpid qpid 1.0-incubating-M2-SNAPSHOT + ../pom.xml diff --git a/qpid/java/common/pom.xml b/qpid/java/common/pom.xml index aaa9a556e8..a16573e066 100644 --- a/qpid/java/common/pom.xml +++ b/qpid/java/common/pom.xml @@ -31,6 +31,7 @@ org.apache.qpid qpid 1.0-incubating-M2-SNAPSHOT + ../pom.xml diff --git a/qpid/java/distribution/pom.xml b/qpid/java/distribution/pom.xml index 366b478687..8774b04c18 100644 --- a/qpid/java/distribution/pom.xml +++ b/qpid/java/distribution/pom.xml @@ -31,6 +31,7 @@ org.apache.qpid qpid 1.0-incubating-M2-SNAPSHOT + ../pom.xml diff --git a/qpid/java/etc/coding_standards.xml b/qpid/java/etc/coding_standards.xml new file mode 100644 index 0000000000..00b1a9516a --- /dev/null +++ b/qpid/java/etc/coding_standards.xml @@ -0,0 +1,117 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/qpid/java/etc/license_header.txt b/qpid/java/etc/license_header.txt new file mode 100644 index 0000000000..02ee6e8f98 --- /dev/null +++ b/qpid/java/etc/license_header.txt @@ -0,0 +1,20 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ \ No newline at end of file diff --git a/qpid/java/integrationtests/pom.xml b/qpid/java/integrationtests/pom.xml index 3afdf48204..9ccd153f54 100644 --- a/qpid/java/integrationtests/pom.xml +++ b/qpid/java/integrationtests/pom.xml @@ -31,6 +31,7 @@ org.apache.qpid qpid 1.0-incubating-M2-SNAPSHOT + ../pom.xml @@ -45,6 +46,12 @@ qpid-client + + org.slf4j + slf4j-log4j12 + 1.4.0 + + uk.co.thebadgerset junit-toolkit diff --git a/qpid/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/CoordinatingTestCase.java b/qpid/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/CoordinatingTestCase.java index 31de84e630..d2042be741 100644 --- a/qpid/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/CoordinatingTestCase.java +++ b/qpid/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/CoordinatingTestCase.java @@ -1,4 +1,3 @@ -/* Copyright Rupert Smith, 2005 to 2006, all rights reserved. */ /* * * Licensed to the Apache Software Foundation (ASF) under one @@ -21,16 +20,16 @@ */ package org.apache.qpid.interop.coordinator; -import java.util.Map; - -import javax.jms.*; - import junit.framework.TestCase; import org.apache.log4j.Logger; import org.apache.qpid.util.ConversationFactory; +import javax.jms.*; + +import java.util.Map; + /** * A CoordinatingTestCase is a JUnit test case extension that knows how to coordinate test clients that take part in a * test case as defined in the interop testing specification diff --git a/qpid/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/InteropClientTestCase.java b/qpid/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/InteropClientTestCase.java index 9f769822de..37952d08c8 100644 --- a/qpid/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/InteropClientTestCase.java +++ b/qpid/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/InteropClientTestCase.java @@ -79,11 +79,18 @@ public interface InteropClientTestCase extends MessageListener /** * Performs the test case actions. - * + * return from here when you have finished the test.. this will signal the controller that the test has ended. * @throws JMSException Any JMSException resulting from reading the message are allowed to fall through. */ public void start() throws JMSException; + /** + * Gives notice of termination of the test case actions. + * + * @throws JMSException Any JMSException resulting from allowed to fall through. + */ + public void terminate() throws JMSException, InterruptedException; + /** * Gets a report on the actions performed by the test case in its assigned role. * diff --git a/qpid/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/TestClient.java b/qpid/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/TestClient.java index 6cca23446f..a82b05e20f 100644 --- a/qpid/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/TestClient.java +++ b/qpid/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/TestClient.java @@ -20,23 +20,31 @@ */ package org.apache.qpid.interop.testclient; -import java.io.IOException; -import java.util.*; - -import javax.jms.*; -import javax.naming.Context; -import javax.naming.InitialContext; -import javax.naming.NamingException; - import org.apache.log4j.Logger; - import org.apache.qpid.interop.testclient.testcases.TestCase1DummyRun; import org.apache.qpid.interop.testclient.testcases.TestCase2BasicP2P; -import org.apache.qpid.interop.testclient.testcases.TestCase3BasicPubSub; -import org.apache.qpid.util.ClasspathScanner; import org.apache.qpid.util.CommandLineParser; import org.apache.qpid.util.PropertiesUtils; +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.naming.Context; +import javax.naming.InitialContext; +import javax.naming.NamingException; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; + /** * Implements a test client as described in the interop testing spec * (http://cwiki.apache.org/confluence/display/qpid/Interop+Testing+Specification). A test client is an agent that @@ -201,7 +209,7 @@ public class TestClient implements MessageListener } // Open a connection to communicate with the coordinator on. - _connection = createConnection(DEFAULT_CONNECTION_PROPS_RESOURCE, brokerUrl, virtualHost); + _connection = createConnection(DEFAULT_CONNECTION_PROPS_RESOURCE, clientName, brokerUrl, virtualHost); session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE); @@ -219,17 +227,21 @@ public class TestClient implements MessageListener _connection.start(); } + + public static Connection createConnection(String connectionPropsResource, String brokerUrl, String virtualHost) + { + return createConnection(connectionPropsResource, "clientID", brokerUrl, virtualHost); + } + /** * Establishes a JMS connection using a properties file and qpids built in JNDI implementation. This is a simple - * convenience method for code that does anticipate handling connection failures. All exceptions that indicate - * that the connection has failed, are wrapped as rutime exceptions, preumably handled by a top level failure - * handler. - * - * @todo Make username/password configurable. Allow multiple urls for fail over. Once it feels right, move it - * to a Utils library class. + * convenience method for code that does anticipate handling connection failures. All exceptions that indicate that + * the connection has failed, are wrapped as rutime exceptions, preumably handled by a top level failure handler. * * @param connectionPropsResource The name of the connection properties file. - * @param brokerUrl The broker url to connect to, null to use the default from the properties. + * @param clientID + * @param brokerUrl The broker url to connect to, null to use the default from the + * properties. * @param virtualHost The virtual host to connectio to, null to use the default. * * @return A JMS conneciton. @@ -237,7 +249,7 @@ public class TestClient implements MessageListener * @todo Make username/password configurable. Allow multiple urls for fail over. Once it feels right, move it to a * Utils library class. */ - public static Connection createConnection(String connectionPropsResource, String brokerUrl, String virtualHost) + public static Connection createConnection(String connectionPropsResource, String clientID, String brokerUrl, String virtualHost) { log.debug("public static Connection createConnection(String connectionPropsResource = " + connectionPropsResource + ", String brokerUrl = " + brokerUrl + ", String virtualHost = " + virtualHost + "): called"); @@ -251,7 +263,7 @@ public class TestClient implements MessageListener if (brokerUrl != null) { String connectionString = - "amqp://guest:guest/" + ((virtualHost != null) ? virtualHost : "") + "?brokerlist='" + brokerUrl + "'"; + "amqp://guest:guest@" + clientID + "/" + ((virtualHost != null) ? virtualHost : "") + "?brokerlist='" + brokerUrl + "'"; connectionProps.setProperty(CONNECTION_PROPERTY, connectionString); } @@ -381,6 +393,14 @@ public class TestClient implements MessageListener { log.info("Received termination instruction from coordinator."); +// try +// { +// currentTestCase.terminate(); +// } +// catch (InterruptedException e) +// { +// // +// } // Is a cleaner shutdown needed? _connection.close(); System.exit(0); diff --git a/qpid/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/testcases/TestCase1DummyRun.java b/qpid/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/testcases/TestCase1DummyRun.java index 85b89172bb..5f257c0b36 100644 --- a/qpid/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/testcases/TestCase1DummyRun.java +++ b/qpid/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/testcases/TestCase1DummyRun.java @@ -74,6 +74,11 @@ public class TestCase1DummyRun implements InteropClientTestCase // Do nothing. } + public void terminate() throws JMSException + { + //todo + } + public Message getReport(Session session) throws JMSException { log.debug("public Message getReport(Session session): called"); diff --git a/qpid/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/testcases/TestCase2BasicP2P.java b/qpid/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/testcases/TestCase2BasicP2P.java index ea62b46451..ff56ee9b93 100644 --- a/qpid/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/testcases/TestCase2BasicP2P.java +++ b/qpid/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/testcases/TestCase2BasicP2P.java @@ -170,6 +170,11 @@ public class TestCase2BasicP2P implements InteropClientTestCase } } + public void terminate() throws JMSException + { + //todo + } + /** * Gets a report on the actions performed by the test case in its assigned role. * diff --git a/qpid/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/testcases/TestCase3BasicPubSub.java b/qpid/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/testcases/TestCase3BasicPubSub.java index 223c4916bf..7b35142c82 100644 --- a/qpid/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/testcases/TestCase3BasicPubSub.java +++ b/qpid/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/testcases/TestCase3BasicPubSub.java @@ -202,6 +202,11 @@ public class TestCase3BasicPubSub implements InteropClientTestCase } } + public void terminate() throws JMSException, InterruptedException + { + //todo + } + /** * Gets a report on the actions performed by the test case in its assigned role. * diff --git a/qpid/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestClient.java b/qpid/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestClient.java index cabe73e331..1597da6dba 100644 --- a/qpid/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestClient.java +++ b/qpid/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestClient.java @@ -38,6 +38,7 @@ import javax.jms.Session; import javax.jms.TextMessage; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.CountDownLatch; /** * Implements test case 3, basic pub/sub. Sends/received a specified number of messages to a specified route on the @@ -52,7 +53,10 @@ import java.util.Map; public class SustainedTestClient extends TestCase3BasicPubSub implements ExceptionListener { /** Used for debugging. */ - private static final Logger log = Logger.getLogger(SustainedTestClient.class); + private static final Logger debugLog = Logger.getLogger(SustainedTestClient.class); + + private static final Logger log = Logger.getLogger("SustainedTest"); + /** The role to be played by the test. */ private Roles role; @@ -83,9 +87,11 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti SustainedRateAdapter _rateAdapter; /** */ - int updateInterval; + int _batchSize; + - private boolean _running = true; + private static final long TEN_MILLI_SEC = 10000000; + private static final long FIVE_MILLI_SEC = 5000000; /** * Should provide the name of the test case that this class implements. The exact names are defined in the interop @@ -95,7 +101,7 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti */ public String getName() { - log.debug("public String getName(): called"); + debugLog.debug("public String getName(): called"); return "Perf_SustainedPubSub"; } @@ -111,31 +117,34 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti */ public void assignRole(Roles role, Message assignRoleMessage) throws JMSException { - log.debug("public void assignRole(Roles role = " + role + ", Message assignRoleMessage = " + assignRoleMessage - + "): called"); + debugLog.debug("public void assignRole(Roles role = " + role + ", Message assignRoleMessage = " + assignRoleMessage + + "): called"); // Take note of the role to be played. this.role = role; // Extract and retain the test parameters. numReceivers = assignRoleMessage.getIntProperty("SUSTAINED_NUM_RECEIVERS"); - updateInterval = assignRoleMessage.getIntProperty("SUSTAINED_UPDATE_INTERVAL"); + _batchSize = assignRoleMessage.getIntProperty("SUSTAINED_UPDATE_INTERVAL"); String sendKey = assignRoleMessage.getStringProperty("SUSTAINED_KEY"); String sendUpdateKey = assignRoleMessage.getStringProperty("SUSTAINED_UPDATE_KEY"); int ackMode = assignRoleMessage.getIntProperty("ACKNOWLEDGE_MODE"); - log.debug("numReceivers = " + numReceivers); - log.debug("updateInterval = " + updateInterval); - log.debug("ackMode = " + ackMode); - log.debug("sendKey = " + sendKey); - log.debug("sendUpdateKey = " + sendUpdateKey); - log.debug("role = " + role); + if (debugLog.isDebugEnabled()) + { + debugLog.debug("numReceivers = " + numReceivers); + debugLog.debug("_batchSize = " + _batchSize); + debugLog.debug("ackMode = " + ackMode); + debugLog.debug("sendKey = " + sendKey); + debugLog.debug("sendUpdateKey = " + sendUpdateKey); + debugLog.debug("role = " + role); + } switch (role) { // Check if the sender role is being assigned, and set up a single message producer if so. case SENDER: - log.info("*********** Creating SENDER"); + log.info("Creating Sender"); // Create a new connection to pass the test messages on. connection = new Connection[1]; session = new Session[1]; @@ -164,7 +173,7 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti // Otherwise the receiver role is being assigned, so set this up to listen for messages on the required number // of receiver connections. case RECEIVER: - log.info("*********** Creating RECEIVER"); + log.info("Creating Receiver"); // Create the required number of receiver connections. connection = new Connection[numReceivers]; session = new Session[numReceivers]; @@ -183,7 +192,7 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti MessageConsumer consumer = session[i].createConsumer(sendDestination); - consumer.setMessageListener(new SustainedListener(TestClient.CLIENT_NAME + "-" + i, updateInterval, session[i], sendUpdateDestination)); + consumer.setMessageListener(new SustainedListener(TestClient.CLIENT_NAME + "-" + i, _batchSize, session[i], sendUpdateDestination)); } break; @@ -196,29 +205,32 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti } } + /** Performs the test case actions. */ public void start() throws JMSException { - log.debug("public void start(): called"); + debugLog.debug("public void start(): called"); // Check that the sender role is being performed. switch (role) { // Check if the sender role is being assigned, and set up a single message producer if so. case SENDER: - Message testMessage = session[0].createTextMessage("test"); - -// for (int i = 0; i < numMessages; i++) - while (_running) - { - producer.send(testMessage); - - _rateAdapter.sentMessage(); - } + _rateAdapter.run(); break; case RECEIVER: } + + //return from here when you have finished the test.. this will signal the controller and + } + + public void terminate() throws JMSException, InterruptedException + { + if (_rateAdapter != null) + { + _rateAdapter.stop(); + } } /** @@ -232,7 +244,7 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti */ public Message getReport(Session session) throws JMSException { - log.debug("public Message getReport(Session session): called"); + debugLog.debug("public Message getReport(Session session): called"); // Close the test connections. for (int i = 0; i < connection.length; i++) @@ -252,89 +264,100 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti if (linked != null) { - if (linked instanceof AMQNoRouteException) + if (debugLog.isDebugEnabled()) { - log.warn("No route ."); + debugLog.debug("Linked Exception:" + linked); } - else if (linked instanceof AMQNoConsumersException) - { - log.warn("No clients currently available for message:" + ((AMQNoConsumersException) linked).getUndeliveredMessage()); - } - else + if ((linked instanceof AMQNoRouteException) + || (linked instanceof AMQNoConsumersException)) { + if (debugLog.isDebugEnabled()) + { + if (linked instanceof AMQNoConsumersException) + { + debugLog.warn("No clients currently available for message:" + ((AMQNoConsumersException) linked).getUndeliveredMessage()); + } + else + { + debugLog.warn("No route for message"); + } + } - log.warn("LinkedException:" + linked); + // Tell the rate adapter that there are no clients ready yet + _rateAdapter.NO_CLIENTS = true; } - - _rateAdapter.NO_CLIENTS = true; } else { - log.warn("Exception:" + linked); + debugLog.warn("Exception:" + linked); } } + /** + * Inner class that listens for messages and sends a report for the time taken between receiving the 'start' and + * 'end' messages. + */ class SustainedListener implements MessageListener { - private int _received = 0; - private int _updateInterval = 0; - private Long _time; + /** Number of messages received */ + private long _received = 0; + /** The number of messages in the batch */ + private int _batchSize = 0; + /** Record of the when the 'start' messagse was sen */ + private Long _startTime; + /** Message producer to use to send reports */ MessageProducer _updater; + /** Session to create the report message on */ Session _session; + /** Record of the client ID used for this SustainedListnener */ String _client; - public SustainedListener(String clientname, int updateInterval, Session session, Destination sendDestination) throws JMSException + /** + * Main Constructor + * + * @param clientname The _client id used to identify this connection. + * @param batchSize The number of messages that are to be sent per batch. Note: This is not used to + * control the interval between sending reports. + * @param session The session used for communication. + * @param sendDestination The destination that update reports should be sent to. + * + * @throws JMSException My occur if creatingthe Producer fails + */ + public SustainedListener(String clientname, int batchSize, Session session, Destination sendDestination) throws JMSException { - _updateInterval = updateInterval; + _batchSize = batchSize; _client = clientname; _session = session; _updater = session.createProducer(sendDestination); } - public void setReportInterval(int reportInterval) - { - _updateInterval = reportInterval; - _received = 0; - } - public void onMessage(Message message) { - if (log.isDebugEnabled()) + if (debugLog.isTraceEnabled()) { - log.debug("Message " + _received + "received in listener"); + debugLog.trace("Message " + _received + "received in listener"); } + if (message instanceof TextMessage) { - try { - if (((TextMessage) message).getText().equals("test")) + _received++; + if (((TextMessage) message).getText().equals("start")) { - if (_received == 0) - { - _time = System.nanoTime(); - sendStatus(0, _received); - } - - _received++; - - if (_received % _updateInterval == 0) + debugLog.info("Starting Batch"); + _startTime = System.nanoTime(); + } + else if (((TextMessage) message).getText().equals("end")) + { + if (_startTime != null) { - Long currentTime = System.nanoTime(); - - try - { - sendStatus(currentTime - _time, _received); - _time = currentTime; - } - catch (JMSException e) - { - log.error("Unable to send update."); - } + long currentTime = System.nanoTime(); + sendStatus(currentTime - _startTime, _received); + debugLog.info("End Batch"); } - } } catch (JMSException e) @@ -342,37 +365,68 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti //ignore error } } + } - private void sendStatus(long time, int received) throws JMSException + /** + * sendStatus creates and sends the report back to the publisher + * + * @param time taken for the the last batch + * @param received Total number of messages received. + * + * @throws JMSException if an error occurs during the send + */ + private void sendStatus(long time, long received) throws JMSException { Message updateMessage = _session.createTextMessage("update"); - updateMessage.setStringProperty("CLIENT_ID", _client); + updateMessage.setStringProperty("CLIENT_ID", ":" + _client); updateMessage.setStringProperty("CONTROL_TYPE", "UPDATE"); updateMessage.setLongProperty("RECEIVED", received); updateMessage.setLongProperty("DURATION", time); - log.info("**** SENDING **** CLIENT_ID:" + _client + " RECEIVED:" + received + " DURATION:" + time); + if (debugLog.isInfoEnabled()) + { + debugLog.info("**** SENDING [" + received / _batchSize + "]**** " + + "CLIENT_ID:" + _client + " RECEIVED:" + received + " DURATION:" + time); + } + + // Output on the main log.info the details of this batch + if (received / _batchSize % 10 == 0) + { + log.info("Sending Report [" + received / _batchSize + "] " + + "CLIENT_ID:" + _client + " RECEIVED:" + received + " DURATION:" + time); + } _updater.send(updateMessage); } - } - class SustainedRateAdapter implements MessageListener + + /** + * This class is used here to adjust the _delay value which in turn is used to control the number of messages/second + * that are sent through the test system. + * + * By keeping a record of the messages recevied and the average time taken to process the batch size can be + * calculated and so the delay can be adjusted to maintain that rate. + * + * Given that delays of < 10ms can be rounded up the delay is only used between messages if the _delay > 10ms * no + * messages in the batch. Otherwise the delay is used at the end of the batch. + */ + class SustainedRateAdapter implements MessageListener, Runnable { private SustainedTestClient _client; - private long _variance = 250; //no. messages to allow drifting + private long _messageVariance = 500; //no. messages to allow drifting + private long _timeVariance = TEN_MILLI_SEC * 5; // no. nanos between send and report delay (10ms) private volatile long _delay; //in nanos private long _sent; private Map _slowClients = new HashMap(); - private static final long PAUSE_SLEEP = 10; // 10 ms - private static final long NO_CLIENT_SLEEP = 1000; // 1s - private static final long MAX_MESSAGE_DRIFT = 1000; // no messages drifted from producer + private static final long PAUSE_SLEEP = TEN_MILLI_SEC / 1000; // 10 ms + private static final long NO_CLIENT_SLEEP = 1000; // 1s private volatile boolean NO_CLIENTS = true; private int _delayShifting; - private static final int REPORTS_WITHOUT_CHANGE = 10; - private static final double MAXIMUM_DELAY_SHIFT = .02; //2% + private static final int REPORTS_WITHOUT_CHANGE = 5; + private boolean _warmedup = false; + private static final long EXPECTED_TIME_PER_BATCH = 100000L; SustainedRateAdapter(SustainedTestClient client) { @@ -381,9 +435,9 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti public void onMessage(Message message) { - if (log.isDebugEnabled()) + if (debugLog.isDebugEnabled()) { - log.debug("SustainedRateAdapter onMessage(Message message = " + message + "): called"); + debugLog.debug("SustainedRateAdapter onMessage(Message message = " + message + "): called"); } try @@ -395,15 +449,25 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti { NO_CLIENTS = false; long duration = message.getLongProperty("DURATION"); - long received = message.getLongProperty("RECEIVED"); + long totalReceived = message.getLongProperty("RECEIVED"); String client = message.getStringProperty("CLIENT_ID"); - log.info("**** SENDING **** CLIENT_ID:" + client + " RECEIVED:" + received + " DURATION:" + duration); + if (debugLog.isInfoEnabled()) + { + debugLog.info("Update Report: CLIENT_ID:" + client + " RECEIVED:" + totalReceived + " DURATION:" + duration); + } + + recordSlow(client, totalReceived); + adjustDelay(client, totalReceived, duration); - recordSlow(client, received); - adjustDelay(client, received, duration); + if (!_warmedup && _totalReceived / _batchSize / delays.size() == _warmUpBatches / 2) + { + _warmedup = true; + _warmup.countDown(); + + } } } catch (JMSException e) @@ -412,72 +476,220 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti } } - class Pair + CountDownLatch _warmup = new CountDownLatch(1); + + int _warmUpBatches = 20; + + int _numBatches = 10000; + + // long[] _timings = new long[_numBatches]; + private boolean _running = true; + + + public void run() + { + log.info("Warming up"); + + doBatch(_warmUpBatches); + + try + { + //wait for warmup to complete. + _warmup.await(); + + //set delay to the average length of the batches + _delay = _totalDuration / _warmUpBatches / delays.size(); + + log.info("Warmup complete delay set : " + _delay + + " based on _totalDuration: " + _totalDuration + + " over no. batches: " + _warmUpBatches + + " with client count: " + delays.size()); + + _totalDuration = 0L; + _totalReceived = 0L; + _sent = 0L; + } + catch (InterruptedException e) + { + // + } + + + doBatch(_numBatches); + + } + + private void doBatch(int batchSize) // long[] timings, { - X item1; - Y item2; + TextMessage testMessage = null; + try + { + testMessage = _client.session[0].createTextMessage("start"); + - Pair(X i1, Y i2) + for (int batch = 0; batch < batchSize; batch++) +// while (_running) + { + long start = System.nanoTime(); + + testMessage.setText("start"); + _client.producer.send(testMessage); + _rateAdapter.sentMessage(); + + testMessage.setText("test"); + //start at 2 so start and end count as part of batch + for (int m = 2; m < _batchSize; m++) + { + _client.producer.send(testMessage); + _rateAdapter.sentMessage(); + } + + testMessage.setText("end"); + _client.producer.send(testMessage); + _rateAdapter.sentMessage(); + + long end = System.nanoTime(); + + long sendtime = end - start; + + debugLog.info("Sent batch[" + batch + "](" + _batchSize + ") in " + sendtime);//timings[batch]); + + if (batch % 10 == 0) + { + log.info("Sent Batch[" + batch + "](" + _batchSize + ")" + status()); + } + + _rateAdapter.sleepBatch(); + + } + } + catch (JMSException e) { - item1 = i1; - item2 = i2; + log.error("Runner ended"); } + } - X getItem1() + private String status() + { + return " TotalDuration: " + _totalDuration + " for " + delays.size() + " consumers" + + " Delay is " + _delay + " resulting in " + + ((_delay > TEN_MILLI_SEC * _batchSize) ? (_delay / _batchSize) + "/msg" : _delay + "/batch"); + } + + private void sleepBatch() + { + if (checkForSlowClients()) + {//if there werwe slow clients we have already slept so don't sleep anymore again. + return; + } + + //Slow down if gap between send and received is too large + if (_sent - _totalReceived / delays.size() > _messageVariance) { - return item1; + //pause between batches. + debugLog.info("Sleeping to keep sent in check with received"); + log.debug("Increaseing _delay as sending more than receiving"); + _delay += TEN_MILLI_SEC; } - Y getItem2() + //per batch sleep.. if sleep is to small to spread over the batch. + if (_delay <= TEN_MILLI_SEC * _batchSize) { - return item2; + sleepLong(_delay); + } + else + { + debugLog.info("Not sleeping _delay > ten*batch is:" + _delay); } } - Map> delays = new HashMap>(); - Long totalReceived = 0L; - Long totalDuration = 0L; + public void stop() + { + _running = false; + } - private void adjustDelay(String client, long received, long duration) + Map delays = new HashMap(); + Long _totalReceived = 0L; + Long _totalDuration = 0L; + int _skipUpdate = 0; + + /** + * Adjust the delay for sending messages based on this update from the client + * + * @param client The client that send this update + * @param totalReceived The number of messages that this client has received. + * @param duration The time taken for the last batch of messagse + */ + private void adjustDelay(String client, long totalReceived, long duration) { - Pair current = delays.get(client); + //Retrieve the current total time taken for this client. + Long currentTime = delays.get(client); - if (current == null) + // Add the new duration time to this client + if (currentTime == null) { - delays.put(client, new Pair(received, duration)); + currentTime = duration; } else { - //reduce totals - totalReceived -= current.getItem1(); - totalDuration -= current.getItem2(); + currentTime += duration; } - totalReceived += received; - totalDuration += duration; + delays.put(client, currentTime); + + + _totalReceived += _batchSize; + _totalDuration += duration; - long averageDuration = totalDuration / delays.size(); + // Calculate the number of messages in the batch. + long batchCount = (_totalReceived / _batchSize); - long diff = Math.abs(_delay - averageDuration); + //calculate average duration accross clients per batch + long averageDuration = _totalDuration / delays.size() / batchCount; + + //calculate the difference between current send delay and average report delay + long diff = (duration) - averageDuration; + + if (debugLog.isInfoEnabled()) + { + debugLog.info("TotalDuration:" + _totalDuration + " for " + delays.size() + " consumers" + + " on batch: " + batchCount + + " Batch Duration: " + duration + + " Average: " + averageDuration + + " so diff: " + diff + " for : " + client + + " Delay is " + _delay + " resulting in " + + ((_delay > TEN_MILLI_SEC * _batchSize) + ? (_delay / _batchSize) + "/msg" : _delay + "/batch")); + } //if the averageDuration differs from the current by more than the specified variane then adjust delay. - if (diff > _variance) + if (Math.abs(diff) > _timeVariance) { - if (averageDuration > _delay) + + // if the the _delay is larger than the required duration to send report + // speed up + if (diff > TEN_MILLI_SEC) { - // we can go faster - _delay -= diff; + _delay -= TEN_MILLI_SEC; + if (_delay < 0) { _delay = 0; + debugLog.info("Reset _delay to 0"); + delayStable(); + } + else + { + delayChanged(); } + } - else + else if (diff < 0) // diff < 0 diff cannot be 0 as it is > _timeVariance { - // we need to slow down - _delay += diff; + // the report took longer + _delay += TEN_MILLI_SEC; + delayChanged(); } - delayChanged(); } else { @@ -486,11 +698,16 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti } + /** Reset the number of iterations before we say the delay has stabilised. */ private void delayChanged() { _delayShifting = REPORTS_WITHOUT_CHANGE; } + /** + * Record the fact that delay has stabilised If delay has stablised for REPORTS_WITHOUT_CHANGE then it will + * output Delay stabilised + */ private void delayStable() { _delayShifting--; @@ -498,14 +715,20 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti if (_delayShifting < 0) { _delayShifting = 0; - log.info("Delay stabilised:" + _delay); + log.debug("Delay stabilised:" + _delay); } } - // Record Slow clients + /** + * Checks that the client has received enough messages. If the client has fallen behind then they are put in the + * _slowClients lists which will increase the delay. + * + * @param client The client identifier to check + * @param received the number of messages received by that client + */ private void recordSlow(String client, long received) { - if (received < (_sent - _variance)) + if (received < (_sent - _messageVariance)) { _slowClients.put(client, received); } @@ -515,20 +738,49 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti } } + /** Incrment the number of sent messages and then sleep, if required. */ public void sentMessage() { - if (_sent % updateInterval == 0) + + _sent++; + + if (_delay > TEN_MILLI_SEC * _batchSize) { + long batchDelay = _delay / _batchSize; + // less than 10ms sleep doesn't always work. + // _delay is in nano seconds +// if (batchDelay < (TEN_MILLI_SEC)) +// { +// sleep(0, (int) batchDelay); +// } +// else + { +// if (batchDelay < 30000000000L) + { + sleepLong(batchDelay); + } + } + } + } + + /** + * Check at the end of each batch and pause sending messages to allow slow clients to catch up. + * + * @return true if there were slow clients that caught up. + */ + private boolean checkForSlowClients() + { + if (_sent % _batchSize == 0) + { // Cause test to pause when we have slow if (!_slowClients.isEmpty() || NO_CLIENTS) { - log.info("Pausing for slow clients"); - - //_delay <<= 1; + debugLog.info("Pausing for slow clients:" + _slowClients.entrySet().toArray()); while (!_slowClients.isEmpty()) { + debugLog.info(_slowClients.size() + " slow clients."); sleep(PAUSE_SLEEP); } @@ -537,45 +789,67 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti sleep(NO_CLIENT_SLEEP); } - log.debug("Continuing"); - return; + debugLog.debug("Continuing"); + return true; } else { - log.info("Delay:" + _delay); + debugLog.info("Delay:" + _delay); } + } - _sent++; + return false; + } - if (_delay > 0) - { - // less than 10ms sleep doesn't work. - // _delay is in nano seconds - if (_delay < 1000000) - { - sleep(0, (int) _delay); - } - else - { - if (_delay < 30000000000L) - { - sleep(_delay / 1000000, (int) (_delay % 1000000)); - } - } - } + /** + * Sleep normally takes micro-seconds this allows the use of a nano-second value. + * + * @param delay nanoseconds to sleep for. + */ + private void sleepLong(long delay) + { + sleep(delay / 1000000, (int) (delay % 1000000)); } + /** + * Sleep for the specified micro-seconds. + * @param sleep microseconds to sleep for. + */ private void sleep(long sleep) { sleep(sleep, 0); } + /** + * Perform the sleep , swallowing any InteruptException. + * + * NOTE: If a sleep request is > 10s then reset only sleep for 5s + * + * @param milli to sleep for + * @param nano sub miliseconds to sleep for + */ private void sleep(long milli, int nano) { try { - log.debug("Sleep:" + milli + ":" + nano); + debugLog.debug("Sleep:" + milli + ":" + nano); + if (milli > 10000) + { + + if (_delay == milli) + { + _totalDuration = _totalReceived / _batchSize * EXPECTED_TIME_PER_BATCH; + debugLog.error("Sleeping for more than 10 seconds adjusted to 5s!:" + milli / 1000 + "s. Reset _totalDuration:" + _totalDuration); + } + else + { + debugLog.error("Sleeping for more than 10 seconds adjusted to 5s!:" + milli / 1000 + "s"); + } + + milli = 5000; + } + Thread.sleep(milli, nano); } catch (InterruptedException e) @@ -583,6 +857,12 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti // } } + + public void setClient(SustainedTestClient client) + { + _client = client; + } } } + diff --git a/qpid/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestCoordinator.java b/qpid/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestCoordinator.java index 4081d87192..b437e165b4 100644 --- a/qpid/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestCoordinator.java +++ b/qpid/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestCoordinator.java @@ -78,11 +78,12 @@ public class SustainedTestCoordinator extends CoordinatingTestCase3BasicPubSub i Map testConfig = new HashMap(); testConfig.put("TEST_NAME", "Perf_SustainedPubSub"); testConfig.put("SUSTAINED_KEY", SUSTAINED_KEY); - //testConfig.put("SUSTAINED_MSG_RATE", 10); - testConfig.put("SUSTAINED_NUM_RECEIVERS", 2); - testConfig.put("SUSTAINED_UPDATE_INTERVAL", 25); + testConfig.put("SUSTAINED_NUM_RECEIVERS", Integer.getInteger("numReceives", 2)); + testConfig.put("SUSTAINED_UPDATE_INTERVAL", Integer.getInteger("batchSize", 1000)); testConfig.put("SUSTAINED_UPDATE_KEY", SUSTAINED_KEY + ".UPDATE"); - testConfig.put("ACKNOWLEDGE_MODE", AMQSession.NO_ACKNOWLEDGE); + testConfig.put("ACKNOWLEDGE_MODE", Integer.getInteger("ackMode", AMQSession.AUTO_ACKNOWLEDGE)); + + log.info("Created Config: " + testConfig.entrySet().toArray()); sequenceTest(testConfig); } diff --git a/qpid/java/integrationtests/src/main/java/org/apache/qpid/util/ConversationFactory.java b/qpid/java/integrationtests/src/main/java/org/apache/qpid/util/ConversationFactory.java index 4ca2fe8ff5..0090bec3d0 100644 --- a/qpid/java/integrationtests/src/main/java/org/apache/qpid/util/ConversationFactory.java +++ b/qpid/java/integrationtests/src/main/java/org/apache/qpid/util/ConversationFactory.java @@ -20,16 +20,16 @@ */ package org.apache.qpid.util; +import org.apache.log4j.Logger; + +import javax.jms.*; + import java.util.*; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; -import javax.jms.*; - -import org.apache.log4j.Logger; - /** * A conversation helper, uses a message correlation id pattern to match up sent and received messages as a conversation * over JMS messaging. Incoming message traffic is divided up by correlation id. Each id has a queue (behaviour dependant @@ -153,7 +153,7 @@ public class ConversationFactory * queue. * @param queueClass The queue implementation class. * - * @throws JMSException All undelying JMSExceptions are allowed to fall through. + * @throws JMSException All underlying JMSExceptions are allowed to fall through. */ public ConversationFactory(Connection connection, Destination receiveDestination, Class queueClass) throws JMSException diff --git a/qpid/java/management/eclipse-plugin/pom.xml b/qpid/java/management/eclipse-plugin/pom.xml index 4fbc8a0e3f..6637460822 100644 --- a/qpid/java/management/eclipse-plugin/pom.xml +++ b/qpid/java/management/eclipse-plugin/pom.xml @@ -15,39 +15,40 @@ KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. - --> +--> + 4.0.0 org.apache.qpid.management org.apache.qpid.management.ui jar 1.0-incubating-M2-SNAPSHOT Qpid Management - http://cwiki.apache.org/confluence/display/qpid org.apache.qpid qpid 1.0-incubating-M2-SNAPSHOT + ../../pom.xml - ../../ + ../.. - + repo1.maven.org Maven eclipse Repository http://repo1.maven.org/eclipse - - + + apache.snapshots Apache SNAPSHOT Repository http://people.apache.org/repo/m2-snapshot-repository - true + true @@ -196,40 +197,42 @@ icons/ icons/ - ** + ** icons/ / - splash.bmp + splash.bmp - ${basedir} - / - - plugin.xml - plugin.properties - + ${basedir} + / + + plugin.xml + plugin.properties + - + + diff --git a/qpid/java/perftests/pom.xml b/qpid/java/perftests/pom.xml index d934fee7ec..77d70b7020 100644 --- a/qpid/java/perftests/pom.xml +++ b/qpid/java/perftests/pom.xml @@ -32,6 +32,7 @@ org.apache.qpid qpid 1.0-incubating-M2-SNAPSHOT + ../pom.xml diff --git a/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java b/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java index 248034af9b..46333db844 100644 --- a/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java +++ b/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java @@ -20,8 +20,6 @@ */ package org.apache.qpid.ping; -import javax.jms.*; - import junit.framework.Assert; import junit.framework.Test; import junit.framework.TestSuite; @@ -35,6 +33,8 @@ import uk.co.thebadgerset.junit.extensions.TestThreadAware; import uk.co.thebadgerset.junit.extensions.util.ParsedProperties; import uk.co.thebadgerset.junit.extensions.util.TestContextProperties; +import javax.jms.*; + /** * PingTestPerf is a ping test, that has been written with the intention of being scaled up to run many times * simultaneously to simluate many clients/producers/connections. @@ -72,39 +72,6 @@ public class PingTestPerf extends AsymptoticTestCase implements TestThreadAware super(name); _logger.debug("testParameters = " + testParameters); - - // Sets up the test parameters with defaults. - /*testParameters.setPropertyIfNull(PingPongProducer.TX_BATCH_SIZE_PROPNAME, PingPongProducer.TX_BATCH_SIZE_DEFAULT); - testParameters.setPropertyIfNull(PingPongProducer.MESSAGE_SIZE_PROPNAME, PingPongProducer.MESSAGE_SIZE_DEAFULT); - testParameters.setPropertyIfNull(PingPongProducer.PING_QUEUE_NAME_PROPNAME, - PingPongProducer.PING_QUEUE_NAME_DEFAULT); - testParameters.setPropertyIfNull(PingPongProducer.PERSISTENT_MODE_PROPNAME, - PingPongProducer.PERSISTENT_MODE_DEFAULT); - testParameters.setPropertyIfNull(PingPongProducer.TRANSACTED_PROPNAME, PingPongProducer.TRANSACTED_DEFAULT); - testParameters.setPropertyIfNull(PingPongProducer.BROKER_PROPNAME, PingPongProducer.BROKER_DEFAULT); - testParameters.setPropertyIfNull(PingPongProducer.USERNAME_PROPNAME, PingPongProducer.USERNAME_DEFAULT); - testParameters.setPropertyIfNull(PingPongProducer.PASSWORD_PROPNAME, PingPongProducer.PASSWORD_DEFAULT); - testParameters.setPropertyIfNull(PingPongProducer.VIRTUAL_HOST_PROPNAME, PingPongProducer.VIRTUAL_HOST_DEFAULT); - testParameters.setPropertyIfNull(PingPongProducer.VERBOSE_PROPNAME, PingPongProducer.VERBOSE_DEFAULT); - testParameters.setPropertyIfNull(PingPongProducer.RATE_PROPNAME, PingPongProducer.RATE_DEFAULT); - testParameters.setPropertyIfNull(PingPongProducer.PUBSUB_PROPNAME, PingPongProducer.PUBSUB_DEFAULT); - testParameters.setPropertyIfNull(PingPongProducer.TX_BATCH_SIZE_PROPNAME, PingPongProducer.TX_BATCH_SIZE_DEFAULT); - testParameters.setPropertyIfNull(PingPongProducer.TIMEOUT_PROPNAME, PingPongProducer.TIMEOUT_DEFAULT); - testParameters.setPropertyIfNull(PingPongProducer.DESTINATION_COUNT_PROPNAME, - PingPongProducer.DESTINATION_COUNT_DEFAULT); - testParameters.setPropertyIfNull(PingPongProducer.FAIL_AFTER_COMMIT_PROPNAME, - PingPongProducer.FAIL_AFTER_COMMIT_DEFAULT); - testParameters.setPropertyIfNull(PingPongProducer.FAIL_BEFORE_COMMIT_PROPNAME, - PingPongProducer.FAIL_BEFORE_COMMIT_DEFAULT); - testParameters.setPropertyIfNull(PingPongProducer.FAIL_AFTER_SEND_PROPNAME, - PingPongProducer.FAIL_AFTER_SEND_DEFAULT); - testParameters.setPropertyIfNull(PingPongProducer.FAIL_BEFORE_SEND_PROPNAME, - PingPongProducer.FAIL_BEFORE_SEND_DEFAULT); - testParameters.setPropertyIfNull(PingPongProducer.FAIL_ONCE_PROPNAME, PingPongProducer.FAIL_ONCE_DEFAULT); - testParameters.setPropertyIfNull(PingPongProducer.UNIQUE_DESTS_PROPNAME, PingPongProducer.UNIQUE_DESTS_DEFAULT); - testParameters.setPropertyIfNull(PingPongProducer.ACK_MODE_PROPNAME, PingPongProducer.ACK_MODE_DEFAULT); - testParameters.setPropertyIfNull(PingPongProducer.PAUSE_AFTER_BATCH_PROPNAME, - PingPongProducer.PAUSE_AFTER_BATCH_DEFAULT);*/ } /** diff --git a/qpid/java/pom.xml b/qpid/java/pom.xml index c86206d90e..2e1a792c49 100644 --- a/qpid/java/pom.xml +++ b/qpid/java/pom.xml @@ -31,9 +31,9 @@ under the License. pom - scm:svn:http://svn.apache.org/repos/asf/incubator/qpid/trunk - scm:svn:http://svn.apache.org/repos/asf/incubator/qpid/trunk - http://svn.apache.org/viewvc/incubator/qpid/trunk/ + scm:svn:http://svn.apache.org/repos/asf/incubator/qpid/branches/M2/java + scm:svn:http://svn.apache.org/repos/asf/incubator/qpid/branches/M2/java + http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java @@ -103,7 +103,7 @@ under the License. never brief - false + true false 1.5 -Xlint:fallthrough,finally @@ -554,30 +554,83 @@ under the License. + + + + + + + + + org.apache.maven.plugins maven-project-info-reports-plugin ${mprojectinfo.version} + + org.apache.maven.plugins maven-surefire-report-plugin ${surefire-report.version} + + + + org.codehaus.mojo + taglist-maven-plugin + + + org.apache.maven.plugins - maven-javadoc-plugin - ${javadoc.version} + maven-jxr-plugin + + + + + org.apache.maven.plugins + maven-checkstyle-plugin + + ${basedir}/${topDirectoryLocation}/etc/coding_standards.xml + ${basedir}/${topDirectoryLocation}/etc/license_header.txt + + + + + diff --git a/qpid/java/systests/distribution/pom.xml b/qpid/java/systests/distribution/pom.xml index bff1e0d9e5..70a6a18cce 100644 --- a/qpid/java/systests/distribution/pom.xml +++ b/qpid/java/systests/distribution/pom.xml @@ -31,6 +31,7 @@ org.apache.qpid qpid 1.0-incubating-M2-SNAPSHOT + ../../pom.xml diff --git a/qpid/java/systests/pom.xml b/qpid/java/systests/pom.xml index 8a245b73a9..f845b9fb44 100644 --- a/qpid/java/systests/pom.xml +++ b/qpid/java/systests/pom.xml @@ -30,6 +30,7 @@ org.apache.qpid qpid 1.0-incubating-M2-SNAPSHOT + ../pom.xml @@ -55,6 +56,11 @@ compile + + uk.co.thebadgerset + junit-toolkit + + org.slf4j diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/ImmediateMessageTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/ImmediateMessageTest.java new file mode 100644 index 0000000000..05fbceca20 --- /dev/null +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/ImmediateMessageTest.java @@ -0,0 +1,686 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.exchange; + +import junit.framework.TestCase; + +import org.apache.log4j.NDC; + +import org.apache.qpid.client.AMQNoConsumersException; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.client.transport.TransportConnection; +import org.apache.qpid.server.registry.ApplicationRegistry; +import static org.apache.qpid.server.exchange.MessagingTestConfigProperties.*; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import uk.co.thebadgerset.junit.extensions.util.ParsedProperties; +import uk.co.thebadgerset.junit.extensions.util.TestContextProperties; + +import javax.jms.*; +import javax.naming.Context; +import javax.naming.InitialContext; +import javax.naming.NamingException; + +import java.util.ArrayList; +import java.util.List; + +/** + * ImmediateMessageTest tests for the desired behaviour of immediate messages. Immediate messages are a non-JMS + * feature. A message may be marked with an immediate delivery flag, which means that a consumer must be connected + * to receive the message, through a valid route, when it is sent, or when its transaction is committed in the case + * of transactional messaging. If this is not the case, the broker should return the message with a NO_CONSUMERS code. + * + *

+ *
CRC Card
Responsibilities Collaborations + *
Check that an immediate message is sent succesfully not using transactions when a consumer is connected. + *
Check that an immediate message is committed succesfully in a transaction when a consumer is connected. + *
Check that an immediate message results in no consumers code, not using transactions, when no consumer is + * connected. + *
Check that an immediate message results in no consumers code, upon transaction commit, when a consumer is + * connected. + *
+ * + * @todo Write a test decorator, the sole function of which is to populate test context properties, from sys properties, + * from trailing prop=value pairs on the command line, from test properties files or other sources. This should + * run through stanard JUnit without the JUnit toolkit extensions, and through Maven surefire, and also through + * the JUnit toolkit extended test runners. + * + * @todo Veto test topologies using bounce back. Or else the bounce back client will act as an immediate consumer. + */ +public class ImmediateMessageTest extends TestCase +{ + /** Used for debugging. */ + private static final Logger log = LoggerFactory.getLogger(ImmediateMessageTest.class); + + /** Used to read the tests configurable properties through. */ + ParsedProperties testProps = TestContextProperties.getInstance(MessagingTestConfigProperties.defaults); + + /** All these tests should have the immediate flag on. */ + private boolean immediateFlag = testProps.setProperty(IMMEDIATE_PROPNAME, true); + + /** Check that an immediate message is sent succesfully not using transactions when a consumer is connected. */ + public void test_QPID_517_ImmediateOkNoTx() throws Exception + { + // Ensure transactional sessions are off. + testProps.setProperty(TRANSACTED_PROPNAME, false); + + // Send one message with no errors. + PublisherReceiverImpl.testNoExceptions(testProps); + } + + /** Check that an immediate message is committed succesfully in a transaction when a consumer is connected. */ + public void test_QPID_517_ImmediateOkTx() throws Exception + { + // Ensure transactional sessions are off. + testProps.setProperty(TRANSACTED_PROPNAME, true); + + // Send one message with no errors. + PublisherReceiverImpl.testNoExceptions(testProps); + } + + /** Check that an immediate message results in no consumers code, not using transactions, when no consumer is connected. */ + public void test_QPID_517_ImmediateFailsNoConsumerNoTx() throws Exception + { + // Ensure transactional sessions are off. + testProps.setProperty(TRANSACTED_PROPNAME, false); + + // Set up the messaging topology so that only the publishers producer is bound (do not set up the receiver to + // collect its messages). + testProps.setProperty(RECEIVER_CONSUMER_BIND_PROPNAME, false); + + // Send one message and get a linked no consumers exception. + PublisherReceiverImpl.testWithAssertions(testProps, AMQNoConsumersException.class); + } + + /** Check that an immediate message results in no consumers code, upon transaction commit, when a consumer is connected. */ + public void test_QPID_517_ImmediateFailsNoConsumerTx() throws Exception + { + // Ensure transactional sessions are on. + testProps.setProperty(TRANSACTED_PROPNAME, true); + + // Set up the messaging topology so that only the publishers producer is bound (do not set up the receiver to + // collect its messages). + testProps.setProperty(RECEIVER_CONSUMER_BIND_PROPNAME, false); + + // Send one message and get a linked no consumers exception. + PublisherReceiverImpl.testWithAssertions(testProps, AMQNoConsumersException.class); + } + + protected void setUp() throws Exception + { + NDC.push(getName()); + + // Ensure that the in-vm broker is created. + TransportConnection.createVMBroker(1); + } + + protected void tearDown() throws Exception + { + try + { + // Ensure that the in-vm broker is cleaned up so that the next test starts afresh. + TransportConnection.killVMBroker(1); + ApplicationRegistry.remove(1); + } + finally + { + NDC.pop(); + } + } + + /* + * Stuff below: + * + * This will get tidied into some sort on JMS convenience framework, through which practically any usefull test + * topology can be created. This will become a replacement for PingPongProducer. + * + * Base everything on standard connection properties defined in PingPongProducer. Split JMS and AMQP-only properties. + * + * Integrate with ConversationFactory, so that it will work with prod/con pairs. + * + * Support pub/rec pairs. + * Support m*n pub/rec setups. All pubs/recs on one machine. + * + * Support bounce back clients, with configurable bounce back behavior. All, one in X, round robin one in m, etc. + * + * Support pairing of m*n pub/rec setups with bounce back clients. JVM running a test, can simulate m publishers, + * will receive (a known subset of) all messages sent, bounced back to n receivers. Co-location of pub/rec will be + * the normal model to allow accurate timings to be taken. + * + * Support creation of pub or rec only. + * Support clock synching of pub/rec on different JVMs, by calculating clock offsets. Must also provide an accuracy + * estimate to +- the results. + * + * Augment the interop Coordinator, to become a full distributed test coordinator. Capable of querying available + * tests machines, looking at test parameters and farming out tests onto the test machines, passing all test + * parameters, standard naming of pub/rec config parameters used to set up m*n test topologies, run test cases, + * report results, tear down m*n topologies. Need to split the re-usable general purpose distributed test coordinator + * from the Qpid specific test framework for creating test-topoloigies and passing Qpid specific parameters. + * + * Write all tests against pub/rec pairs, without coding to the fact that the topology may be anything from 1:1 in + * JVM to m*n with bounce back clients accross many machines. That is, make the test topology orthogonal to the test + * case. + */ + + private static class ExceptionMonitor implements ExceptionListener + { + List exceptions = new ArrayList(); + + public void onException(JMSException e) + { + log.debug("ExceptionMonitor got JMSException: ", e); + + exceptions.add(e); + } + + public boolean assertNoExceptions() + { + return exceptions.isEmpty(); + } + + public boolean assertOneJMSException() + { + return exceptions.size() == 1; + } + + public boolean assertOneJMSExceptionWithLinkedCause(Class aClass) + { + if (exceptions.size() == 1) + { + JMSException e = exceptions.get(0); + + Exception linkedCause = e.getLinkedException(); + + if ((linkedCause != null) && aClass.isInstance(linkedCause)) + { + return true; + } + } + + return false; + } + + public void reset() + { + exceptions = new ArrayList(); + } + } + + /** + * Establishes a JMS connection using a properties file and qpids built in JNDI implementation. This is a simple + * convenience method for code that does anticipate handling connection failures. All exceptions that indicate + * that the connection has failed, are wrapped as rutime exceptions, preumably handled by a top level failure + * handler. + * + * @param messagingProps Any additional connection properties. + * + * @return A JMS conneciton. + * + * @todo Move this to a Utils library class or base test class. Also move the copy in interop.TestClient too. + * + * @todo Make in VM broker creation step optional on whether one is to be used or not. + */ + public static Connection createConnection(ParsedProperties messagingProps) + { + log.debug("public static Connection createConnection(Properties messagingProps = " + messagingProps + "): called"); + + try + { + // Extract the configured connection properties from the test configuration. + String conUsername = messagingProps.getProperty(USERNAME_PROPNAME); + String conPassword = messagingProps.getProperty(PASSWORD_PROPNAME); + String virtualHost = messagingProps.getProperty(VIRTUAL_HOST_PROPNAME); + String brokerUrl = messagingProps.getProperty(BROKER_PROPNAME); + + // Set up the broker connection url. + String connectionString = + "amqp://" + conUsername + ":" + conPassword + "/" + ((virtualHost != null) ? virtualHost : "") + + "?brokerlist='" + brokerUrl + "'"; + + // messagingProps.setProperty(CONNECTION_PROPNAME, connectionString); + + Context ctx = new InitialContext(messagingProps); + + ConnectionFactory cf = (ConnectionFactory) ctx.lookup(CONNECTION_NAME); + Connection connection = cf.createConnection(); + + return connection; + } + catch (NamingException e) + { + log.debug("Got NamingException: ", e); + throw new RuntimeException("Got JNDI NamingException whilst looking up the connection factory.", e); + } + catch (JMSException e) + { + log.debug("Got JMSException: ", e); + throw new RuntimeException("Could not establish connection due to JMSException.", e); + } + } + + /** + * Creates a publisher and a receiver on the same connection, configured according the to specified standard + * properties. + * + * @param messagingProps The connection properties. + * + * @return A publisher/receiver client pair. + */ + public static PublisherReceiver createPublisherReceiverPairSharedConnection(ParsedProperties messagingProps) + { + try + { + int ackMode = messagingProps.getPropertyAsInteger(ACK_MODE_PROPNAME); + boolean useTopics = messagingProps.getPropertyAsBoolean(PUBSUB_PROPNAME); + String destinationSendRoot = messagingProps.getProperty(SEND_DESTINATION_NAME_ROOT_PROPNAME); + String destinationReceiveRoot = messagingProps.getProperty(RECEIVE_DESTINATION_NAME_ROOT_PROPNAME); + boolean createPublisherProducer = messagingProps.getPropertyAsBoolean(PUBLISHER_PRODUCER_BIND_PROPNAME); + boolean createPublisherConsumer = messagingProps.getPropertyAsBoolean(PUBLISHER_CONSUMER_BIND_PROPNAME); + boolean createReceiverProducer = messagingProps.getPropertyAsBoolean(RECEIVER_PRODUCER_BIND_PROPNAME); + boolean createReceiverConsumer = messagingProps.getPropertyAsBoolean(RECEIVER_CONSUMER_BIND_PROPNAME); + boolean transactional = messagingProps.getPropertyAsBoolean(TRANSACTED_PROPNAME); + + // Check if any Qpid/AMQP specific flags or options need to be set. + boolean immediate = messagingProps.getPropertyAsBoolean(IMMEDIATE_PROPNAME); + boolean mandatory = messagingProps.getPropertyAsBoolean(MANDATORY_PROPNAME); + boolean needsQpidOptions = immediate | mandatory; + + log.debug("ackMode = " + ackMode); + log.debug("useTopics = " + useTopics); + log.debug("destinationSendRoot = " + destinationSendRoot); + log.debug("destinationReceiveRoot = " + destinationReceiveRoot); + log.debug("createPublisherProducer = " + createPublisherProducer); + log.debug("createPublisherConsumer = " + createPublisherConsumer); + log.debug("createReceiverProducer = " + createReceiverProducer); + log.debug("createReceiverConsumer = " + createReceiverConsumer); + log.debug("transactional = " + transactional); + log.debug("immediate = " + immediate); + log.debug("mandatory = " + mandatory); + log.debug("needsQpidOptions = " + needsQpidOptions); + + // Create connection, sessions and producer/consumer pairs on each session. + Connection connection = createConnection(messagingProps); + + // Add the connection exception listener to assert on exception conditions with. + ExceptionMonitor exceptionMonitor = new ExceptionMonitor(); + connection.setExceptionListener(exceptionMonitor); + + Session publisherSession = connection.createSession(transactional, ackMode); + Session receiverSession = connection.createSession(transactional, ackMode); + + Destination publisherProducerDestination = + useTopics ? publisherSession.createTopic(destinationSendRoot) + : publisherSession.createQueue(destinationSendRoot); + + MessageProducer publisherProducer = + createPublisherProducer + ? (needsQpidOptions + ? ((AMQSession) publisherSession).createProducer(publisherProducerDestination, mandatory, immediate) + : publisherSession.createProducer(publisherProducerDestination)) : null; + + MessageConsumer publisherConsumer = + createPublisherConsumer + ? publisherSession.createConsumer(publisherSession.createQueue(destinationReceiveRoot)) : null; + + MessageProducer receiverProducer = + createReceiverProducer ? receiverSession.createProducer(receiverSession.createQueue(destinationReceiveRoot)) + : null; + + MessageConsumer receiverConsumer = + createReceiverConsumer ? receiverSession.createConsumer(receiverSession.createQueue(destinationSendRoot)) + : null; + + // Start listening for incoming messages. + connection.start(); + + // Package everything up. + ProducerConsumerPair publisher = + new ProducerConsumerPairImpl(publisherProducer, publisherConsumer, publisherSession); + ProducerConsumerPair receiver = + new ProducerConsumerPairImpl(receiverProducer, receiverConsumer, receiverSession); + + PublisherReceiver result = new PublisherReceiverImpl(publisher, receiver, connection, exceptionMonitor); + + return result; + } + catch (JMSException e) + { + log.debug("Got JMSException: ", e); + throw new RuntimeException("Could not create publisher/receiver pair due to a JMSException.", e); + } + } + + public static Message createTestMessage(ProducerConsumerPair client, ParsedProperties testProps) throws JMSException + { + return client.getSession().createMessage(); + } + + /** + * A ProducerConsumerPair is a pair consisting of one message producer and one message consumer. It is a standard + * unit of connectivity allowing a full-duplex conversation to be held, provided both the consumer and producer + * are instantiated and configured. + * + * In some situations a test, or piece of application code will be written with differing numbers of publishers + * and receivers in different roles, where one role produces only and one consumes only. This messaging topology + * can still make use of producer/consumer pairs as standard building blocks, combined into publisher/receiver + * units to fulfill the different messaging roles, with the publishers consumer uninstantiated and the receivers + * producer uninstantiated. Use a {@link PublisherReceiver} for this. + * + *

+ *
CRC Card
Responsibilities + *
Provide a message producer for sending messages. + *
Provide a message consumer for receiving messages. + *
+ * + * @todo Update the {@link org.apache.qpid.util.ConversationFactory} so that it accepts these as the basic + * conversation connection units. + */ + public static interface ProducerConsumerPair + { + public MessageProducer getProducer(); + + public MessageConsumer getConsumer(); + + public void send(Message message) throws JMSException; + + public Session getSession(); + + public void close() throws JMSException; + } + + /** + * A single producer and consumer. + */ + public static class ProducerConsumerPairImpl implements ProducerConsumerPair + { + MessageProducer producer; + + MessageConsumer consumer; + + Session session; + + public ProducerConsumerPairImpl(MessageProducer producer, MessageConsumer consumer, Session session) + { + this.producer = producer; + this.consumer = consumer; + this.session = session; + } + + public MessageProducer getProducer() + { + return null; + } + + public MessageConsumer getConsumer() + { + return null; + } + + public void send(Message message) throws JMSException + { + producer.send(message); + } + + public Session getSession() + { + return session; + } + + public void close() throws JMSException + { + if (producer != null) + { + producer.close(); + } + + if (consumer != null) + { + consumer.close(); + } + } + } + + /** + * Multiple producers and consumers made to look like a single producer and consumer. All methods repeated accross + * all producers and consumers. + */ + public static class MultiProducerConsumerPairImpl implements ProducerConsumerPair + { + public MessageProducer getProducer() + { + throw new RuntimeException("Not implemented."); + } + + public MessageConsumer getConsumer() + { + throw new RuntimeException("Not implemented."); + } + + public void send(Message message) throws JMSException + { + throw new RuntimeException("Not implemented."); + } + + public Session getSession() + { + throw new RuntimeException("Not implemented."); + } + + public void close() + { + throw new RuntimeException("Not implemented."); + } + } + + /** + * A PublisherReceiver consists of two sets of producer/consumer pairs, one for an 'instigating' publisher + * role, and one for a more 'passive' receiver role. + * + *

A set of publishers and receivers forms a typical test configuration where both roles are to be controlled + * from within a single JVM. This is not a particularly usefull arrangement for applications which want to place + * these roles on physically seperate machines and pass messages between them. It is a faily normal arrangement for + * test code though, either to publish and receive messages through an in-VM message broker in order to test its + * expected behaviour, or to publish and receive (possibly bounced back) messages through a seperate broker instance + * in order to take performance timings. In the case of performance timings, the co-location of the publisher and + * receiver means that the timings are taken on the same machine for accurate timing without the need for clock + * synchronization. + * + *

+ *
CRC Card
Responsibilities + *
Manage an m*n array of publisher and recievers. + *
+ */ + public static interface PublisherReceiver + { + public ProducerConsumerPair getPublisher(); + + public ProducerConsumerPair getReceiver(); + + public void start(); + + public void send(ParsedProperties testProps, int numMessages); + + public ExceptionMonitor getConnectionExceptionMonitor(); + + public ExceptionMonitor getExceptionMonitor(); + + public void close(); + } + + public static class PublisherReceiverImpl implements PublisherReceiver + { + private ProducerConsumerPair publisher; + private ProducerConsumerPair receiver; + private Connection connection; + private ExceptionMonitor connectionExceptionMonitor; + private ExceptionMonitor exceptionMonitor; + + public PublisherReceiverImpl(ProducerConsumerPair publisher, ProducerConsumerPair receiver, Connection connection, + ExceptionMonitor connectionExceptionMonitor) + { + this.publisher = publisher; + this.receiver = receiver; + this.connection = connection; + this.connectionExceptionMonitor = connectionExceptionMonitor; + this.exceptionMonitor = new ExceptionMonitor(); + } + + public ProducerConsumerPair getPublisher() + { + return publisher; + } + + public ProducerConsumerPair getReceiver() + { + return receiver; + } + + public void start() + { } + + public void close() + { + try + { + publisher.close(); + receiver.close(); + connection.close(); + } + catch (JMSException e) + { + throw new RuntimeException("Got JMSException during close.", e); + } + } + + public ExceptionMonitor getConnectionExceptionMonitor() + { + return connectionExceptionMonitor; + } + + public ExceptionMonitor getExceptionMonitor() + { + return exceptionMonitor; + } + + public void send(ParsedProperties testProps, int numMessages) + { + boolean transactional = testProps.getPropertyAsBoolean(TRANSACTED_PROPNAME); + + // Send an immediate message through the publisher and ensure that it results in a JMSException. + try + { + getPublisher().send(createTestMessage(getPublisher(), testProps)); + + if (transactional) + { + getPublisher().getSession().commit(); + } + } + catch (JMSException e) + { + log.debug("Got JMSException: ", e); + exceptionMonitor.onException(e); + } + } + + public static void testWithAssertions(ParsedProperties testProps, Class aClass /*, assertions */) + { + PublisherReceiver testClients; + + // Create a standard publisher/receiver test client pair on a shared connection, individual sessions. + testClients = createPublisherReceiverPairSharedConnection(testProps); + testClients.start(); + + testClients.send(testProps, 1); + + pause(1000L); + + String errors = ""; + + if (!testClients.getConnectionExceptionMonitor().assertOneJMSExceptionWithLinkedCause(aClass)) + { + errors += "Was expecting linked exception type " + aClass.getName() + ".\n"; + } + + // Clean up the publisher/receiver client pair. + testClients.close(); + + assertEquals(errors, "", errors); + } + + /** + */ + public static void testNoExceptions(ParsedProperties testProps) + { + PublisherReceiver testClients; + + // Create a standard publisher/receiver test client pair on a shared connection, individual sessions. + testClients = createPublisherReceiverPairSharedConnection(testProps); + testClients.start(); + + testClients.send(testProps, 1); + + pause(1000L); + + String errors = ""; + + if (!testClients.getConnectionExceptionMonitor().assertNoExceptions()) + { + errors += "There were connection exceptions.\n"; + } + + if (!testClients.getExceptionMonitor().assertNoExceptions()) + { + errors += "There were exceptions on producer.\n"; + } + + // Clean up the publisher/receiver client pair. + testClients.close(); + + assertEquals(errors, "", errors); + } + } + + /** + * Pauses for the specified length of time. In the event of failing to pause for at least that length of time + * due to interuption of the thread, a RutimeException is raised to indicate the failure. The interupted status + * of the thread is restores in that case. This method should only be used when it is expected that the pause + * will be succesfull, for example in test code that relies on inejecting a pause. + * + * @param t The minimum time to pause for in milliseconds. + */ + public static void pause(long t) + { + try + { + Thread.sleep(t); + } + catch (InterruptedException e) + { + // Restore the interrupted status + Thread.currentThread().interrupt(); + + throw new RuntimeException("Failed to generate the requested pause length.", e); + } + } +} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/MandatoryMessageTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/MandatoryMessageTest.java new file mode 100644 index 0000000000..f41acca11b --- /dev/null +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/MandatoryMessageTest.java @@ -0,0 +1,135 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.exchange; + +import junit.framework.TestCase; + +import org.apache.log4j.NDC; + +import org.apache.qpid.client.AMQNoRouteException; +import org.apache.qpid.client.transport.TransportConnection; +import static org.apache.qpid.server.exchange.MessagingTestConfigProperties.*; +import org.apache.qpid.server.registry.ApplicationRegistry; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import uk.co.thebadgerset.junit.extensions.util.ParsedProperties; +import uk.co.thebadgerset.junit.extensions.util.TestContextProperties; + +/** + * MandatoryMessageTest tests for the desired behaviour of mandatory messages. Mandatory messages are a non-JMS + * feature. A message may be marked with a mandatory delivery flag, which means that a valid route for the message + * must exist, when it is sent, or when its transaction is committed in the case of transactional messaging. If this + * is not the case, the broker should return the message with a NO_CONSUMERS code. + * + *

+ *
CRC Card
Responsibilities Collaborations + *
Check that a mandatory message is sent succesfully not using transactions when a consumer is connected. + *
Check that a mandatory message is committed succesfully in a transaction when a consumer is connected. + *
Check that a mandatory message results in no route code, not using transactions, when no consumer is + * connected. + *
Check that a mandatory message results in no route code, upon transaction commit, when a consumer is + * connected. + *
+ */ +public class MandatoryMessageTest extends TestCase +{ + /** Used for debugging. */ + private static final Logger log = LoggerFactory.getLogger(MandatoryMessageTest.class); + + /** Used to read the tests configurable properties through. */ + ParsedProperties testProps = TestContextProperties.getInstance(MessagingTestConfigProperties.defaults); + + /** All these tests should have the mandatory flag on. */ + // private boolean mandatoryFlag = testProps.setProperty(IMMEDIATE_PROPNAME, true); + private boolean mandatoryFlag = testProps.setProperty(MANDATORY_PROPNAME, true); + + /** Check that an mandatory message is sent succesfully not using transactions when a consumer is connected. */ + public void test_QPID_508_MandatoryOkNoTx() throws Exception + { + // Ensure transactional sessions are off. + testProps.setProperty(TRANSACTED_PROPNAME, false); + + // Send one message with no errors. + ImmediateMessageTest.PublisherReceiverImpl.testNoExceptions(testProps); + } + + /** Check that an mandatory message is committed succesfully in a transaction when a consumer is connected. */ + public void test_QPID_508_MandatoryOkTx() throws Exception + { + // Ensure transactional sessions are off. + testProps.setProperty(TRANSACTED_PROPNAME, true); + + // Send one message with no errors. + ImmediateMessageTest.PublisherReceiverImpl.testNoExceptions(testProps); + } + + /** Check that an mandatory message results in no route code, not using transactions, when no consumer is connected. */ + public void test_QPID_508_MandatoryFailsNoRouteNoTx() throws Exception + { + // Ensure transactional sessions are off. + testProps.setProperty(TRANSACTED_PROPNAME, false); + + // Set up the messaging topology so that only the publishers producer is bound (do not set up the receiver to + // collect its messages). + testProps.setProperty(RECEIVER_CONSUMER_BIND_PROPNAME, false); + + // Send one message and get a linked no consumers exception. + ImmediateMessageTest.PublisherReceiverImpl.testWithAssertions(testProps, AMQNoRouteException.class); + } + + /** Check that an mandatory message results in no route code, upon transaction commit, when a consumer is connected. */ + public void test_QPID_508_MandatoryFailsNoRouteTx() throws Exception + { + // Ensure transactional sessions are on. + testProps.setProperty(TRANSACTED_PROPNAME, true); + + // Set up the messaging topology so that only the publishers producer is bound (do not set up the receiver to + // collect its messages). + testProps.setProperty(RECEIVER_CONSUMER_BIND_PROPNAME, false); + + // Send one message and get a linked no consumers exception. + ImmediateMessageTest.PublisherReceiverImpl.testWithAssertions(testProps, AMQNoRouteException.class); + } + + protected void setUp() throws Exception + { + NDC.push(getName()); + + // Ensure that the in-vm broker is created. + TransportConnection.createVMBroker(1); + } + + protected void tearDown() throws Exception + { + try + { + // Ensure that the in-vm broker is cleaned up so that the next test starts afresh. + TransportConnection.killVMBroker(1); + ApplicationRegistry.remove(1); + } + finally + { + NDC.pop(); + } + } +} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/MessagingTestConfigProperties.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/MessagingTestConfigProperties.java new file mode 100644 index 0000000000..9c8cefc492 --- /dev/null +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/MessagingTestConfigProperties.java @@ -0,0 +1,282 @@ +package org.apache.qpid.server.exchange; + +import org.apache.qpid.jms.Session; + +import uk.co.thebadgerset.junit.extensions.util.ParsedProperties; + +/** + * MessagingTestConfigProperties defines a set of property names and default values for specifying a messaging topology, + * and test parameters for running a messaging test over that topology. A Properties object holding some of these + * properties, superimposed onto the defaults, is used to establish test topologies and control test behaviour. + * + *

A complete list of the parameters, default values and comments on their usage is provided here: + * + *

+ *
Parameters
Parameter Default Comments + *
messageSize 0 Message size in bytes. Not including any headers. + *
destinationName ping The root name to use to generate destination names to ping. + *
persistent false Determines whether peristent delivery is used. + *
transacted false Determines whether messages are sent/received in transactions. + *
broker tcp://localhost:5672 Determines the broker to connect to. + *
virtualHost test Determines the virtual host to send all ping over. + *
rate 0 The maximum rate (in hertz) to send messages at. 0 means no limit. + *
verbose false The verbose flag for debugging. Prints to console on every message. + *
pubsub false Whether to ping topics or queues. Uses p2p by default. + *
username guest The username to access the broker with. + *
password guest The password to access the broker with. + *
selector null Not used. Defines a message selector to filter pings with. + *
destinationCount 1 The number of receivers listening to the pings. + *
timeout 30000 In milliseconds. The timeout to stop waiting for replies. + *
commitBatchSize 1 The number of messages per transaction in transactional mode. + *
uniqueDests true Whether each receiver only listens to one ping destination or all. + *
durableDests false Whether or not durable destinations are used. + *
ackMode AUTO_ACK The message acknowledgement mode. Possible values are: + * 0 - SESSION_TRANSACTED + * 1 - AUTO_ACKNOWLEDGE + * 2 - CLIENT_ACKNOWLEDGE + * 3 - DUPS_OK_ACKNOWLEDGE + * 257 - NO_ACKNOWLEDGE + * 258 - PRE_ACKNOWLEDGE + *
maxPending 0 The maximum size in bytes, of messages sent but not yet received. + * Limits the volume of messages currently buffered on the client + * or broker. Can help scale test clients by limiting amount of buffered + * data to avoid out of memory errors. + *
+ * + *

+ *
CRC Card
Responsibilities Collaborations + *
Provide the names and defaults of all test parameters. + *
+ */ +public class MessagingTestConfigProperties +{ + // ====================== Connection Properties ================================== + + /** Holds the name of the default connection configuration. */ + public static final String CONNECTION_NAME = "broker"; + + /** Holds the name of the property to get the initial context factory name from. */ + public static final String INITIAL_CONTEXT_FACTORY_PROPNAME = "java.naming.factory.initial"; + + /** Defines the class to use as the initial context factory by default. */ + public static final String INITIAL_CONTEXT_FACTORY_DEFAULT = "org.apache.qpid.jndi.PropertiesFileInitialContextFactory"; + + /** Holds the name of the default connection factory configuration property. */ + public static final String CONNECTION_PROPNAME = "connectionfactory.broker"; + + /** Defeins the default connection configuration. */ + public static final String CONNECTION_DEFAULT = "amqp://guest:guest@clientid/?brokerlist='vm://:1'"; + + /** Holds the name of the property to get the test broker url from. */ + public static final String BROKER_PROPNAME = "qpid.test.broker"; + + /** Holds the default broker url for the test. */ + public static final String BROKER_DEFAULT = "vm://:1"; + + /** Holds the name of the property to get the test broker virtual path. */ + public static final String VIRTUAL_HOST_PROPNAME = "virtualHost"; + + /** Holds the default virtual path for the test. */ + public static final String VIRTUAL_HOST_DEFAULT = ""; + + /** Holds the name of the property to get the broker access username from. */ + public static final String USERNAME_PROPNAME = "username"; + + /** Holds the default broker log on username. */ + public static final String USERNAME_DEFAULT = "guest"; + + /** Holds the name of the property to get the broker access password from. */ + public static final String PASSWORD_PROPNAME = "password"; + + /** Holds the default broker log on password. */ + public static final String PASSWORD_DEFAULT = "guest"; + + // ====================== Messaging Topology Properties ========================== + + /** Holds the name of the property to get the bind publisher procuder flag from. */ + public static final String PUBLISHER_PRODUCER_BIND_PROPNAME = "publisherProducerBind"; + + /** Holds the default value of the publisher producer flag. */ + public static final boolean PUBLISHER_PRODUCER_BIND_DEFAULT = true; + + /** Holds the name of the property to get the bind publisher procuder flag from. */ + public static final String PUBLISHER_CONSUMER_BIND_PROPNAME = "publisherConsumerBind"; + + /** Holds the default value of the publisher consumer flag. */ + public static final boolean PUBLISHER_CONSUMER_BIND_DEFAULT = false; + + /** Holds the name of the property to get the bind receiver procuder flag from. */ + public static final String RECEIVER_PRODUCER_BIND_PROPNAME = "receiverProducerBind"; + + /** Holds the default value of the receiver producer flag. */ + public static final boolean RECEIVER_PRODUCER_BIND_DEFAULT = false; + + /** Holds the name of the property to get the bind receiver procuder flag from. */ + public static final String RECEIVER_CONSUMER_BIND_PROPNAME = "receiverConsumerBind"; + + /** Holds the default value of the receiver consumer flag. */ + public static final boolean RECEIVER_CONSUMER_BIND_DEFAULT = true; + + /** Holds the name of the property to get the destination name root from. */ + public static final String SEND_DESTINATION_NAME_ROOT_PROPNAME = "sendDestinationRoot"; + + /** Holds the root of the name of the default destination to send to. */ + public static final String SEND_DESTINATION_NAME_ROOT_DEFAULT = "sendTo"; + + /** Holds the name of the property to get the destination name root from. */ + public static final String RECEIVE_DESTINATION_NAME_ROOT_PROPNAME = "receiveDestinationRoot"; + + /** Holds the root of the name of the default destination to send to. */ + public static final String RECEIVE_DESTINATION_NAME_ROOT_DEFAULT = "receiveFrom"; + + /** Holds the name of the proeprty to get the destination count from. */ + public static final String DESTINATION_COUNT_PROPNAME = "destinationCount"; + + /** Defines the default number of destinations to ping. */ + public static final int DESTINATION_COUNT_DEFAULT = 1; + + /** Holds the name of the property to get the p2p or pub/sub messaging mode from. */ + public static final String PUBSUB_PROPNAME = "pubsub"; + + /** Holds the pub/sub mode default, true means ping a topic, false means ping a queue. */ + public static final boolean PUBSUB_DEFAULT = false; + + // ====================== JMS Options and Flags ================================= + + /** Holds the name of the property to get the test delivery mode from. */ + public static final String PERSISTENT_MODE_PROPNAME = "persistent"; + + /** Holds the message delivery mode to use for the test. */ + public static final boolean PERSISTENT_MODE_DEFAULT = false; + + /** Holds the name of the property to get the test transactional mode from. */ + public static final String TRANSACTED_PROPNAME = "transacted"; + + /** Holds the transactional mode to use for the test. */ + public static final boolean TRANSACTED_DEFAULT = false; + + /** Holds the name of the property to set the no local flag from. */ + public static final String NO_LOCAL_PROPNAME = "noLocal"; + + /** Defines the default value of the no local flag to use when consuming messages. */ + public static final boolean NO_LOCAL_DEFAULT = false; + + /** Holds the name of the property to get the message acknowledgement mode from. */ + public static final String ACK_MODE_PROPNAME = "ackMode"; + + /** Defines the default message acknowledgement mode. */ + public static final int ACK_MODE_DEFAULT = Session.AUTO_ACKNOWLEDGE; + + // ====================== Qpid Options and Flags ================================ + + /** Holds the name of the property to set the exclusive flag from. */ + public static final String EXCLUSIVE_PROPNAME = "exclusive"; + + /** Defines the default value of the exclusive flag to use when consuming messages. */ + public static final boolean EXCLUSIVE_DEFAULT = false; + + /** Holds the name of the property to set the immediate flag from. */ + public static final String IMMEDIATE_PROPNAME = "immediate"; + + /** Defines the default value of the immediate flag to use when sending messages. */ + public static final boolean IMMEDIATE_DEFAULT = false; + + /** Holds the name of the property to set the mandatory flag from. */ + public static final String MANDATORY_PROPNAME = "mandatory"; + + /** Defines the default value of the mandatory flag to use when sending messages. */ + public static final boolean MANDATORY_DEFAULT = false; + + /** Holds the name of the property to get the durable destinations flag from. */ + public static final String DURABLE_DESTS_PROPNAME = "durableDests"; + + /** Default value for the durable destinations flag. */ + public static final boolean DURABLE_DESTS_DEFAULT = false; + + /** Holds the name of the proeprty to set the prefetch size from. */ + public static final String PREFECTH_PROPNAME = "prefetch"; + + /** Defines the default prefetch size to use when consuming messages. */ + public static final int PREFETCH_DEFAULT = 100; + + // ====================== Common Test Parameters ================================ + + /** Holds the name of the property to get the test message size from. */ + public static final String MESSAGE_SIZE_PROPNAME = "messageSize"; + + /** Used to set up a default message size. */ + public static final int MESSAGE_SIZE_DEAFULT = 0; + + /** Holds the name of the property to get the message rate from. */ + public static final String RATE_PROPNAME = "rate"; + + /** Defines the default rate (in pings per second) to send pings at. 0 means as fast as possible, no restriction. */ + public static final int RATE_DEFAULT = 0; + + /** Holds the name of the proeprty to get the. */ + public static final String SELECTOR_PROPNAME = "selector"; + + /** Holds the default message selector. */ + public static final String SELECTOR_DEFAULT = ""; + + /** Holds the name of the property to get the waiting timeout for response messages. */ + public static final String TIMEOUT_PROPNAME = "timeout"; + + /** Default time to wait before assuming that a ping has timed out. */ + public static final long TIMEOUT_DEFAULT = 30000; + + /** Holds the name of the property to get the commit batch size from. */ + public static final String TX_BATCH_SIZE_PROPNAME = "commitBatchSize"; + + /** Defines the default number of pings to send in each transaction when running transactionally. */ + public static final int TX_BATCH_SIZE_DEFAULT = 1; + + /** Holds the name of the property to set the maximum amount of pending message data for a producer to hold. */ + public static final String MAX_PENDING_PROPNAME = "maxPending"; + + /** Defines the default maximum quantity of pending message data to allow producers to hold. */ + public static final int MAX_PENDING_DEFAULT = 0; + + /** Holds the name of the property to get the verbose mode proeprty from. */ + public static final String VERBOSE_PROPNAME = "verbose"; + + /** Holds the default verbose mode. */ + public static final boolean VERBOSE_DEFAULT = false; + + /** Holds the default configuration properties. */ + public static ParsedProperties defaults = new ParsedProperties(); + + static + { + defaults.setPropertyIfNull(INITIAL_CONTEXT_FACTORY_PROPNAME, INITIAL_CONTEXT_FACTORY_DEFAULT); + defaults.setPropertyIfNull(CONNECTION_PROPNAME, CONNECTION_DEFAULT); + defaults.setPropertyIfNull(MESSAGE_SIZE_PROPNAME, MESSAGE_SIZE_DEAFULT); + defaults.setPropertyIfNull(PUBLISHER_PRODUCER_BIND_PROPNAME, PUBLISHER_PRODUCER_BIND_DEFAULT); + defaults.setPropertyIfNull(PUBLISHER_CONSUMER_BIND_PROPNAME, PUBLISHER_CONSUMER_BIND_DEFAULT); + defaults.setPropertyIfNull(RECEIVER_PRODUCER_BIND_PROPNAME, RECEIVER_PRODUCER_BIND_DEFAULT); + defaults.setPropertyIfNull(RECEIVER_CONSUMER_BIND_PROPNAME, RECEIVER_CONSUMER_BIND_DEFAULT); + defaults.setPropertyIfNull(SEND_DESTINATION_NAME_ROOT_PROPNAME, SEND_DESTINATION_NAME_ROOT_DEFAULT); + defaults.setPropertyIfNull(RECEIVE_DESTINATION_NAME_ROOT_PROPNAME, RECEIVE_DESTINATION_NAME_ROOT_DEFAULT); + defaults.setPropertyIfNull(PERSISTENT_MODE_PROPNAME, PERSISTENT_MODE_DEFAULT); + defaults.setPropertyIfNull(TRANSACTED_PROPNAME, TRANSACTED_DEFAULT); + defaults.setPropertyIfNull(BROKER_PROPNAME, BROKER_DEFAULT); + defaults.setPropertyIfNull(VIRTUAL_HOST_PROPNAME, VIRTUAL_HOST_DEFAULT); + defaults.setPropertyIfNull(RATE_PROPNAME, RATE_DEFAULT); + defaults.setPropertyIfNull(VERBOSE_PROPNAME, VERBOSE_DEFAULT); + defaults.setPropertyIfNull(PUBSUB_PROPNAME, PUBSUB_DEFAULT); + defaults.setPropertyIfNull(USERNAME_PROPNAME, USERNAME_DEFAULT); + defaults.setPropertyIfNull(PASSWORD_PROPNAME, PASSWORD_DEFAULT); + defaults.setPropertyIfNull(SELECTOR_PROPNAME, SELECTOR_DEFAULT); + defaults.setPropertyIfNull(DESTINATION_COUNT_PROPNAME, DESTINATION_COUNT_DEFAULT); + defaults.setPropertyIfNull(TIMEOUT_PROPNAME, TIMEOUT_DEFAULT); + defaults.setPropertyIfNull(TX_BATCH_SIZE_PROPNAME, TX_BATCH_SIZE_DEFAULT); + defaults.setPropertyIfNull(DURABLE_DESTS_PROPNAME, DURABLE_DESTS_DEFAULT); + defaults.setPropertyIfNull(ACK_MODE_PROPNAME, ACK_MODE_DEFAULT); + defaults.setPropertyIfNull(MAX_PENDING_PROPNAME, MAX_PENDING_DEFAULT); + defaults.setPropertyIfNull(PREFECTH_PROPNAME, PREFETCH_DEFAULT); + defaults.setPropertyIfNull(NO_LOCAL_PROPNAME, NO_LOCAL_DEFAULT); + defaults.setPropertyIfNull(EXCLUSIVE_PROPNAME, EXCLUSIVE_DEFAULT); + defaults.setPropertyIfNull(IMMEDIATE_PROPNAME, IMMEDIATE_DEFAULT); + defaults.setPropertyIfNull(MANDATORY_PROPNAME, MANDATORY_DEFAULT); + } +} -- cgit v1.2.1