From ac3a76192ec0782a2110a67c3bab8bfebf7ec0e0 Mon Sep 17 00:00:00 2001 From: Martin Ritchie Date: Mon, 23 Apr 2007 15:58:04 +0000 Subject: Update to system test so that the run as part of the build process as they were not running. Change to AMQMessage to ensure that the TxAckTest passes. Was failing as the reference count was being changed out of the increment/decrementReference methods git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2@531515 13f79535-47bb-0310-9956-ffa450edef68 --- .../org/apache/qpid/server/queue/AMQMessage.java | 2 +- java/systests/pom.xml | 4 +- .../java/org/apache/qpid/server/ack/TxAckTest.java | 22 +- .../apache/qpid/server/queue/ConcurrencyTest.java | 265 -------------------- .../qpid/server/queue/ConcurrencyTestDisabled.java | 265 ++++++++++++++++++++ .../qpid/server/queue/MessageTestHelper.java | 3 +- .../apache/qpid/server/queue/PersistentTest.java | 276 --------------------- .../qpid/server/queue/PersistentTestManual.java | 276 +++++++++++++++++++++ .../main/java/org/apache/qpid/test/VMTestCase.java | 5 + .../apache/qpid/test/client/QueueBrowserTest.java | 6 +- 10 files changed, 575 insertions(+), 549 deletions(-) delete mode 100644 java/systests/src/main/java/org/apache/qpid/server/queue/ConcurrencyTest.java create mode 100644 java/systests/src/main/java/org/apache/qpid/server/queue/ConcurrencyTestDisabled.java delete mode 100644 java/systests/src/main/java/org/apache/qpid/server/queue/PersistentTest.java create mode 100644 java/systests/src/main/java/org/apache/qpid/server/queue/PersistentTestManual.java (limited to 'java') diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java index e19038504f..955aaa6acb 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java @@ -366,7 +366,7 @@ public class AMQMessage */ public AMQMessage takeReference() { - _referenceCount.incrementAndGet(); + incrementReference();// _referenceCount.incrementAndGet(); return this; } diff --git a/java/systests/pom.xml b/java/systests/pom.xml index 614166754c..d9d07ed6f9 100644 --- a/java/systests/pom.xml +++ b/java/systests/pom.xml @@ -62,7 +62,8 @@ org.apache.maven.plugins maven-surefire-plugin - true + ${basedir}/src/main + target/classes @@ -89,3 +90,4 @@ + diff --git a/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java b/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java index 9fcd88b1a8..3ee8277eba 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java +++ b/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java @@ -24,6 +24,8 @@ import junit.framework.TestCase; import org.apache.qpid.AMQException; import org.apache.qpid.framing.BasicPublishBody; import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpid.framing.AMQFrameDecodingException; import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.server.RequiredDeliveryException; import org.apache.qpid.server.queue.AMQMessage; @@ -102,7 +104,7 @@ public class TxAckTest extends TestCase _storeContext, null, new LinkedList(), new HashSet()); - for(int i = 0; i < messageCount; i++) + for (int i = 0; i < messageCount; i++) { long deliveryTag = i + 1; @@ -144,7 +146,7 @@ public class TxAckTest extends TestCase private void assertCount(List tags, int expected) { - for(long tag : tags) + for (long tag : tags) { UnacknowledgedMessage u = _map.get(tag); assertTrue("Message not found for tag " + tag, u != null); @@ -161,6 +163,7 @@ public class TxAckTest extends TestCase assertCount(_unacked, 0); } + void undoPrepare() { _op.consolidate(); @@ -175,7 +178,6 @@ public class TxAckTest extends TestCase _op.consolidate(); _op.commit(_storeContext); - //check acked messages are removed from map Set keys = new HashSet(_map.getDeliveryTags()); keys.retainAll(_acked); @@ -195,6 +197,20 @@ public class TxAckTest extends TestCase TestMessage(long tag, long messageId, MessagePublishInfo publishBody, TransactionalContext txnContext) { super(messageId, publishBody, txnContext); + try + { + setContentHeaderBody(new ContentHeaderBody() + { + public int getSize() + { + return 1; + } + }); + } + catch (AMQException e) + { + // won't happen + } _tag = tag; } diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/ConcurrencyTest.java b/java/systests/src/main/java/org/apache/qpid/server/queue/ConcurrencyTest.java deleted file mode 100644 index 4971db2d28..0000000000 --- a/java/systests/src/main/java/org/apache/qpid/server/queue/ConcurrencyTest.java +++ /dev/null @@ -1,265 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.queue; - -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.server.handler.OnCurrentThreadExecutor; -import org.apache.qpid.server.registry.IApplicationRegistry; -import org.apache.qpid.server.registry.ApplicationRegistry; -import org.apache.qpid.server.virtualhost.VirtualHost; - -import java.util.*; -import java.util.concurrent.Executor; - -/** - * Tests delivery in the face of concurrent incoming _messages, subscription alterations - * and attempts to asynchronously process queued _messages. - */ -public class ConcurrencyTest extends MessageTestHelper -{ - private final Random random = new Random(); - - private final int numMessages = 1000; - - private final List _subscribers = new ArrayList(); - private final Set _active = new HashSet(); - private final List _messages = new ArrayList(); - private int next = 0;//index to next message to send - private final List _received = Collections.synchronizedList(new ArrayList()); - private final Executor _executor = new OnCurrentThreadExecutor(); - private final List _threads = new ArrayList(); - - private final SubscriptionSet _subscriptionMgr = new SubscriptionSet(); - private final DeliveryManager _deliveryMgr; - - private boolean isComplete; - private boolean failed; - private VirtualHost _virtualHost; - - public ConcurrencyTest() throws Exception - { - - IApplicationRegistry applicationRegistry = ApplicationRegistry.getInstance(); - _virtualHost = applicationRegistry.getVirtualHostRegistry().getVirtualHost("test"); - _deliveryMgr = new ConcurrentSelectorDeliveryManager(_subscriptionMgr, new AMQQueue(new AMQShortString("myQ"), false, new AMQShortString("guest"), false, - _virtualHost)); - } - - public void testConcurrent1() throws InterruptedException, AMQException - { - initSubscriptions(10); - initMessages(numMessages); - initThreads(1, 4, 4, 4); - doRun(); - check(); - } - - public void testConcurrent2() throws InterruptedException, AMQException - { - initSubscriptions(10); - initMessages(numMessages); - initThreads(4, 2, 2, 2); - doRun(); - check(); - } - - void check() - { - assertFalse("Failed", failed); - - _deliveryMgr.processAsync(_executor); - - assertEquals("Did not recieve the correct number of messages", _messages.size(), _received.size()); - for(int i = 0; i < _messages.size(); i++) - { - assertEquals("Wrong message at " + i, _messages.get(i), _received.get(i)); - } - } - - void initSubscriptions(int subscriptions) - { - for(int i = 0; i < subscriptions; i++) - { - _subscribers.add(new SubscriptionTestHelper("Subscriber" + i, _received)); - } - } - - void initMessages(int messages) throws AMQException - { - for(int i = 0; i < messages; i++) - { - _messages.add(message()); - } - } - - void initThreads(int senders, int subscribers, int suspenders, int processors) - { - addThreads(senders, senders == 1 ? new Sender() : new OrderedSender()); - addThreads(subscribers, new Subscriber()); - addThreads(suspenders, new Suspender()); - addThreads(processors, new Processor()); - } - - void addThreads(int count, Runnable runner) - { - for(int i = 0; i < count; i++) - { - _threads.add(new Thread(runner, runner.toString())); - } - } - - void doRun() throws InterruptedException - { - for(Thread t : _threads) - { - t.start(); - } - - for(Thread t : _threads) - { - t.join(); - } - } - - private void toggle(Subscription s) - { - synchronized (_active) - { - if (_active.contains(s)) - { - _active.remove(s); - Subscription result = _subscriptionMgr.removeSubscriber(s); - assertTrue("Removed subscription " + result + " but trying to remove subscription " + s, - result != null && result.equals(s)); - } - else - { - _active.add(s); - _subscriptionMgr.addSubscriber(s); - } - } - } - - private AMQMessage nextMessage() - { - synchronized (_messages) - { - if (next < _messages.size()) - { - return _messages.get(next++); - } - else - { - if (!_deliveryMgr.hasQueuedMessages()) { - isComplete = true; - } - return null; - } - } - } - - private boolean randomBoolean() - { - return random.nextBoolean(); - } - - private SubscriptionTestHelper randomSubscriber() - { - return _subscribers.get(random.nextInt(_subscribers.size())); - } - - private class Sender extends Runner - { - void doRun() throws Throwable - { - AMQMessage msg = nextMessage(); - if (msg != null) - { - _deliveryMgr.deliver(null, new AMQShortString(toString()), msg, false); - } - } - } - - private class OrderedSender extends Sender - { - synchronized void doRun() throws Throwable - { - super.doRun(); - } - } - - private class Suspender extends Runner - { - void doRun() throws Throwable - { - randomSubscriber().setSuspended(randomBoolean()); - } - } - - private class Subscriber extends Runner - { - void doRun() throws Throwable - { - toggle(randomSubscriber()); - } - } - - private class Processor extends Runner - { - void doRun() throws Throwable - { - _deliveryMgr.processAsync(_executor); - } - } - - private abstract class Runner implements Runnable - { - public void run() - { - try - { - while (!stop()) - { - doRun(); - } - } - catch (Throwable t) - { - failed = true; - t.printStackTrace(); - } - } - - abstract void doRun() throws Throwable; - - boolean stop() - { - return isComplete || failed; - } - } - - public static junit.framework.Test suite() - { - return new junit.framework.TestSuite(ConcurrencyTest.class); - } - -} diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/ConcurrencyTestDisabled.java b/java/systests/src/main/java/org/apache/qpid/server/queue/ConcurrencyTestDisabled.java new file mode 100644 index 0000000000..068f37574d --- /dev/null +++ b/java/systests/src/main/java/org/apache/qpid/server/queue/ConcurrencyTestDisabled.java @@ -0,0 +1,265 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.queue; + +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.server.handler.OnCurrentThreadExecutor; +import org.apache.qpid.server.registry.IApplicationRegistry; +import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.server.virtualhost.VirtualHost; + +import java.util.*; +import java.util.concurrent.Executor; + +/** + * Tests delivery in the face of concurrent incoming _messages, subscription alterations + * and attempts to asynchronously process queued _messages. + */ +public class ConcurrencyTestDisabled extends MessageTestHelper +{ + private final Random random = new Random(); + + private final int numMessages = 1000; + + private final List _subscribers = new ArrayList(); + private final Set _active = new HashSet(); + private final List _messages = new ArrayList(); + private int next = 0;//index to next message to send + private final List _received = Collections.synchronizedList(new ArrayList()); + private final Executor _executor = new OnCurrentThreadExecutor(); + private final List _threads = new ArrayList(); + + private final SubscriptionSet _subscriptionMgr = new SubscriptionSet(); + private final DeliveryManager _deliveryMgr; + + private boolean isComplete; + private boolean failed; + private VirtualHost _virtualHost; + + public ConcurrencyTestDisabled() throws Exception + { + + IApplicationRegistry applicationRegistry = ApplicationRegistry.getInstance(); + _virtualHost = applicationRegistry.getVirtualHostRegistry().getVirtualHost("test"); + _deliveryMgr = new ConcurrentSelectorDeliveryManager(_subscriptionMgr, new AMQQueue(new AMQShortString("myQ"), false, new AMQShortString("guest"), false, + _virtualHost)); + } + + public void testConcurrent1() throws InterruptedException, AMQException + { + initSubscriptions(10); + initMessages(numMessages); + initThreads(1, 4, 4, 4); + doRun(); + check(); + } + + public void testConcurrent2() throws InterruptedException, AMQException + { + initSubscriptions(10); + initMessages(numMessages); + initThreads(4, 2, 2, 2); + doRun(); + check(); + } + + void check() + { + assertFalse("Failed", failed); + + _deliveryMgr.processAsync(_executor); + + assertEquals("Did not recieve the correct number of messages", _messages.size(), _received.size()); + for(int i = 0; i < _messages.size(); i++) + { + assertEquals("Wrong message at " + i, _messages.get(i), _received.get(i)); + } + } + + void initSubscriptions(int subscriptions) + { + for(int i = 0; i < subscriptions; i++) + { + _subscribers.add(new SubscriptionTestHelper("Subscriber" + i, _received)); + } + } + + void initMessages(int messages) throws AMQException + { + for(int i = 0; i < messages; i++) + { + _messages.add(message()); + } + } + + void initThreads(int senders, int subscribers, int suspenders, int processors) + { + addThreads(senders, senders == 1 ? new Sender() : new OrderedSender()); + addThreads(subscribers, new Subscriber()); + addThreads(suspenders, new Suspender()); + addThreads(processors, new Processor()); + } + + void addThreads(int count, Runnable runner) + { + for(int i = 0; i < count; i++) + { + _threads.add(new Thread(runner, runner.toString())); + } + } + + void doRun() throws InterruptedException + { + for(Thread t : _threads) + { + t.start(); + } + + for(Thread t : _threads) + { + t.join(); + } + } + + private void toggle(Subscription s) + { + synchronized (_active) + { + if (_active.contains(s)) + { + _active.remove(s); + Subscription result = _subscriptionMgr.removeSubscriber(s); + assertTrue("Removed subscription " + result + " but trying to remove subscription " + s, + result != null && result.equals(s)); + } + else + { + _active.add(s); + _subscriptionMgr.addSubscriber(s); + } + } + } + + private AMQMessage nextMessage() + { + synchronized (_messages) + { + if (next < _messages.size()) + { + return _messages.get(next++); + } + else + { + if (!_deliveryMgr.hasQueuedMessages()) { + isComplete = true; + } + return null; + } + } + } + + private boolean randomBoolean() + { + return random.nextBoolean(); + } + + private SubscriptionTestHelper randomSubscriber() + { + return _subscribers.get(random.nextInt(_subscribers.size())); + } + + private class Sender extends Runner + { + void doRun() throws Throwable + { + AMQMessage msg = nextMessage(); + if (msg != null) + { + _deliveryMgr.deliver(null, new AMQShortString(toString()), msg, false); + } + } + } + + private class OrderedSender extends Sender + { + synchronized void doRun() throws Throwable + { + super.doRun(); + } + } + + private class Suspender extends Runner + { + void doRun() throws Throwable + { + randomSubscriber().setSuspended(randomBoolean()); + } + } + + private class Subscriber extends Runner + { + void doRun() throws Throwable + { + toggle(randomSubscriber()); + } + } + + private class Processor extends Runner + { + void doRun() throws Throwable + { + _deliveryMgr.processAsync(_executor); + } + } + + private abstract class Runner implements Runnable + { + public void run() + { + try + { + while (!stop()) + { + doRun(); + } + } + catch (Throwable t) + { + failed = true; + t.printStackTrace(); + } + } + + abstract void doRun() throws Throwable; + + boolean stop() + { + return isComplete || failed; + } + } + + public static junit.framework.Test suite() + { + return new junit.framework.TestSuite(ConcurrencyTestDisabled.class); + } + +} diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/MessageTestHelper.java b/java/systests/src/main/java/org/apache/qpid/server/queue/MessageTestHelper.java index 03a56df487..88272023e8 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/queue/MessageTestHelper.java +++ b/java/systests/src/main/java/org/apache/qpid/server/queue/MessageTestHelper.java @@ -29,6 +29,7 @@ import org.apache.qpid.server.store.SkeletonMessageStore; import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.util.TestApplicationRegistry; +import org.apache.qpid.server.util.NullApplicationRegistry; import org.apache.qpid.server.txn.TransactionalContext; import org.apache.qpid.server.txn.NonTransactionalContext; import org.apache.qpid.server.RequiredDeliveryException; @@ -51,7 +52,7 @@ class MessageTestHelper extends TestCase MessageTestHelper() throws Exception { - ApplicationRegistry.initialise(new TestApplicationRegistry()); + ApplicationRegistry.initialise(new NullApplicationRegistry()); } AMQMessage message() throws AMQException diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/PersistentTest.java b/java/systests/src/main/java/org/apache/qpid/server/queue/PersistentTest.java deleted file mode 100644 index 4ad10b68ff..0000000000 --- a/java/systests/src/main/java/org/apache/qpid/server/queue/PersistentTest.java +++ /dev/null @@ -1,276 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - * - */ -package org.apache.qpid.server.queue; - -import org.apache.qpid.client.AMQConnection; -import org.apache.qpid.client.AMQSession; -import org.apache.qpid.AMQException; -import org.apache.qpid.AMQChannelClosedException; -import org.apache.qpid.AMQConnectionClosedException; -import org.apache.qpid.util.CommandLineParser; -import org.apache.qpid.url.URLSyntaxException; -import org.apache.log4j.Logger; - -import javax.jms.Session; -import javax.jms.JMSException; -import javax.jms.Queue; -import javax.jms.MessageProducer; -import javax.jms.TextMessage; -import java.io.IOException; -import java.util.Properties; - -public class PersistentTest -{ - private static final Logger _logger = Logger.getLogger(PersistentTest.class); - - - private static final String QUEUE = "direct://amq.direct//PersistentTest-Queue2?durable='true',exclusive='true'"; - - protected AMQConnection _connection; - - protected Session _session; - - protected Queue _queue; - private Properties properties; - - private String _brokerDetails; - private String _username; - private String _password; - private String _virtualpath; - - public PersistentTest(Properties overrides) - { - properties = new Properties(defaults); - properties.putAll(overrides); - - _brokerDetails = properties.getProperty(BROKER_PROPNAME); - _username = properties.getProperty(USERNAME_PROPNAME); - _password = properties.getProperty(PASSWORD_PROPNAME); - _virtualpath = properties.getProperty(VIRTUAL_HOST_PROPNAME); - - createConnection(); - } - - protected void createConnection() - { - try - { - _connection = new AMQConnection(_brokerDetails, _username, _password, "PersistentTest", _virtualpath); - - _session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - - _connection.start(); - } - catch (Exception e) - { - _logger.error("Unable to create test class due to:" + e.getMessage(), e); - System.exit(0); - } - } - - public void test() throws AMQException, URLSyntaxException - { - - //Create the Durable Queue - try - { - _session.createConsumer(_session.createQueue(QUEUE)).close(); - } - catch (JMSException e) - { - _logger.error("Unable to create Queue due to:" + e.getMessage(), e); - System.exit(0); - } - - try - { - if (testQueue()) - { - // close connection - _connection.close(); - // wait - System.out.println("Restart Broker Now"); - try - { - System.in.read(); - } - catch (IOException e) - { - // - } - finally - { - System.out.println("Continuing...."); - } - - //Test queue is still there. - AMQConnection connection = new AMQConnection(_brokerDetails, _username, _password, "DifferentClientID", _virtualpath); - - AMQSession session = (AMQSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - - try - { - session.createConsumer(session.createQueue(QUEUE)); - _logger.error("Create consumer succeeded." + - " This shouldn't be allowed as this means the queue didn't exist when it should"); - - connection.close(); - - exit(); - } - catch (JMSException e) - { - try - { - connection.close(); - } - catch (JMSException cce) - { - if (cce.getLinkedException() instanceof AMQConnectionClosedException) - { - _logger.error("Channel Close Bug still present QPID-432, should see an 'Error closing session'"); - } - else - { - exit(cce); - } - } - - if (e.getLinkedException() instanceof AMQChannelClosedException) - { - _logger.info("AMQChannelClosedException received as expected"); - } - else - { - exit(e); - } - } - } - } - catch (JMSException e) - { - _logger.error("Unable to test Queue due to:" + e.getMessage(), e); - System.exit(0); - } - } - - private void exit(JMSException e) - { - _logger.error("JMSException received:" + e.getMessage()); - e.printStackTrace(); - exit(); - } - - private void exit() - { - try - { - _connection.close(); - } - catch (JMSException e) - { - // - } - System.exit(0); - } - - private boolean testQueue() throws JMSException - { - String TEST_TEXT = "init"; - - //Create a new session to send producer - Session session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - - Queue q = session.createQueue(QUEUE); - MessageProducer producer = session.createProducer(q); - - producer.send(session.createTextMessage(TEST_TEXT)); - - //create a new consumer on the original session - TextMessage m = (TextMessage) _session.createConsumer(q).receive(); - - - if ((m != null) && m.getText().equals(TEST_TEXT)) - { - return true; - } - else - { - _logger.error("Incorrect values returned from Queue Test:" + m); - System.exit(0); - return false; - } - } - - /** Holds the name of the property to get the test broker url from. */ - public static final String BROKER_PROPNAME = "broker"; - - /** Holds the default broker url for the test. */ - public static final String BROKER_DEFAULT = "tcp://localhost:5672"; - - /** Holds the name of the property to get the test broker virtual path. */ - public static final String VIRTUAL_HOST_PROPNAME = "virtualHost"; - - /** Holds the default virtual path for the test. */ - public static final String VIRTUAL_HOST_DEFAULT = ""; - - /** Holds the name of the property to get the broker access username from. */ - public static final String USERNAME_PROPNAME = "username"; - - /** Holds the default broker log on username. */ - public static final String USERNAME_DEFAULT = "guest"; - - /** Holds the name of the property to get the broker access password from. */ - public static final String PASSWORD_PROPNAME = "password"; - - /** Holds the default broker log on password. */ - public static final String PASSWORD_DEFAULT = "guest"; - - /** Holds the default configuration properties. */ - public static Properties defaults = new Properties(); - - static - { - defaults.setProperty(BROKER_PROPNAME, BROKER_DEFAULT); - defaults.setProperty(USERNAME_PROPNAME, USERNAME_DEFAULT); - defaults.setProperty(PASSWORD_PROPNAME, PASSWORD_DEFAULT); - defaults.setProperty(VIRTUAL_HOST_PROPNAME, VIRTUAL_HOST_DEFAULT); - } - - public static void main(String[] args) - { - PersistentTest test; - - Properties options = CommandLineParser.processCommandLine(args, new CommandLineParser(new String[][]{})); - - - test = new PersistentTest(options); - try - { - test.test(); - System.out.println("Test was successfull."); - } - catch (Exception e) - { - _logger.error("Unable to test due to:" + e.getMessage(), e); - } - } -} diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/PersistentTestManual.java b/java/systests/src/main/java/org/apache/qpid/server/queue/PersistentTestManual.java new file mode 100644 index 0000000000..5abbbd2aae --- /dev/null +++ b/java/systests/src/main/java/org/apache/qpid/server/queue/PersistentTestManual.java @@ -0,0 +1,276 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.server.queue; + +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.AMQException; +import org.apache.qpid.AMQChannelClosedException; +import org.apache.qpid.AMQConnectionClosedException; +import org.apache.qpid.util.CommandLineParser; +import org.apache.qpid.url.URLSyntaxException; +import org.apache.log4j.Logger; + +import javax.jms.Session; +import javax.jms.JMSException; +import javax.jms.Queue; +import javax.jms.MessageProducer; +import javax.jms.TextMessage; +import java.io.IOException; +import java.util.Properties; + +public class PersistentTestManual +{ + private static final Logger _logger = Logger.getLogger(PersistentTestManual.class); + + + private static final String QUEUE = "direct://amq.direct//PersistentTest-Queue2?durable='true',exclusive='true'"; + + protected AMQConnection _connection; + + protected Session _session; + + protected Queue _queue; + private Properties properties; + + private String _brokerDetails; + private String _username; + private String _password; + private String _virtualpath; + + public PersistentTestManual(Properties overrides) + { + properties = new Properties(defaults); + properties.putAll(overrides); + + _brokerDetails = properties.getProperty(BROKER_PROPNAME); + _username = properties.getProperty(USERNAME_PROPNAME); + _password = properties.getProperty(PASSWORD_PROPNAME); + _virtualpath = properties.getProperty(VIRTUAL_HOST_PROPNAME); + + createConnection(); + } + + protected void createConnection() + { + try + { + _connection = new AMQConnection(_brokerDetails, _username, _password, "PersistentTest", _virtualpath); + + _session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + _connection.start(); + } + catch (Exception e) + { + _logger.error("Unable to create test class due to:" + e.getMessage(), e); + System.exit(0); + } + } + + public void test() throws AMQException, URLSyntaxException + { + + //Create the Durable Queue + try + { + _session.createConsumer(_session.createQueue(QUEUE)).close(); + } + catch (JMSException e) + { + _logger.error("Unable to create Queue due to:" + e.getMessage(), e); + System.exit(0); + } + + try + { + if (testQueue()) + { + // close connection + _connection.close(); + // wait + System.out.println("Restart Broker Now"); + try + { + System.in.read(); + } + catch (IOException e) + { + // + } + finally + { + System.out.println("Continuing...."); + } + + //Test queue is still there. + AMQConnection connection = new AMQConnection(_brokerDetails, _username, _password, "DifferentClientID", _virtualpath); + + AMQSession session = (AMQSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + try + { + session.createConsumer(session.createQueue(QUEUE)); + _logger.error("Create consumer succeeded." + + " This shouldn't be allowed as this means the queue didn't exist when it should"); + + connection.close(); + + exit(); + } + catch (JMSException e) + { + try + { + connection.close(); + } + catch (JMSException cce) + { + if (cce.getLinkedException() instanceof AMQConnectionClosedException) + { + _logger.error("Channel Close Bug still present QPID-432, should see an 'Error closing session'"); + } + else + { + exit(cce); + } + } + + if (e.getLinkedException() instanceof AMQChannelClosedException) + { + _logger.info("AMQChannelClosedException received as expected"); + } + else + { + exit(e); + } + } + } + } + catch (JMSException e) + { + _logger.error("Unable to test Queue due to:" + e.getMessage(), e); + System.exit(0); + } + } + + private void exit(JMSException e) + { + _logger.error("JMSException received:" + e.getMessage()); + e.printStackTrace(); + exit(); + } + + private void exit() + { + try + { + _connection.close(); + } + catch (JMSException e) + { + // + } + System.exit(0); + } + + private boolean testQueue() throws JMSException + { + String TEST_TEXT = "init"; + + //Create a new session to send producer + Session session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + Queue q = session.createQueue(QUEUE); + MessageProducer producer = session.createProducer(q); + + producer.send(session.createTextMessage(TEST_TEXT)); + + //create a new consumer on the original session + TextMessage m = (TextMessage) _session.createConsumer(q).receive(); + + + if ((m != null) && m.getText().equals(TEST_TEXT)) + { + return true; + } + else + { + _logger.error("Incorrect values returned from Queue Test:" + m); + System.exit(0); + return false; + } + } + + /** Holds the name of the property to get the test broker url from. */ + public static final String BROKER_PROPNAME = "broker"; + + /** Holds the default broker url for the test. */ + public static final String BROKER_DEFAULT = "tcp://localhost:5672"; + + /** Holds the name of the property to get the test broker virtual path. */ + public static final String VIRTUAL_HOST_PROPNAME = "virtualHost"; + + /** Holds the default virtual path for the test. */ + public static final String VIRTUAL_HOST_DEFAULT = ""; + + /** Holds the name of the property to get the broker access username from. */ + public static final String USERNAME_PROPNAME = "username"; + + /** Holds the default broker log on username. */ + public static final String USERNAME_DEFAULT = "guest"; + + /** Holds the name of the property to get the broker access password from. */ + public static final String PASSWORD_PROPNAME = "password"; + + /** Holds the default broker log on password. */ + public static final String PASSWORD_DEFAULT = "guest"; + + /** Holds the default configuration properties. */ + public static Properties defaults = new Properties(); + + static + { + defaults.setProperty(BROKER_PROPNAME, BROKER_DEFAULT); + defaults.setProperty(USERNAME_PROPNAME, USERNAME_DEFAULT); + defaults.setProperty(PASSWORD_PROPNAME, PASSWORD_DEFAULT); + defaults.setProperty(VIRTUAL_HOST_PROPNAME, VIRTUAL_HOST_DEFAULT); + } + + public static void main(String[] args) + { + PersistentTestManual test; + + Properties options = CommandLineParser.processCommandLine(args, new CommandLineParser(new String[][]{})); + + + test = new PersistentTestManual(options); + try + { + test.test(); + System.out.println("Test was successfull."); + } + catch (Exception e) + { + _logger.error("Unable to test due to:" + e.getMessage(), e); + } + } +} diff --git a/java/systests/src/main/java/org/apache/qpid/test/VMTestCase.java b/java/systests/src/main/java/org/apache/qpid/test/VMTestCase.java index 31fd77691d..540c91ddaf 100644 --- a/java/systests/src/main/java/org/apache/qpid/test/VMTestCase.java +++ b/java/systests/src/main/java/org/apache/qpid/test/VMTestCase.java @@ -114,4 +114,9 @@ public class VMTestCase extends TestCase TransportConnection.killVMBroker(1); super.tearDown(); } + + public void testDummyinVMTestCase() + { + // keep maven happy + } } diff --git a/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserTest.java b/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserTest.java index ac65eec979..ec9df8f1b3 100644 --- a/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserTest.java +++ b/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserTest.java @@ -20,8 +20,8 @@ */ package org.apache.qpid.test.client; -import org.apache.qpid.test.VMTestCase; import org.apache.log4j.Logger; +import org.apache.qpid.test.VMTestCase; import javax.jms.Queue; import javax.jms.ConnectionFactory; @@ -36,6 +36,8 @@ import javax.jms.QueueReceiver; import javax.jms.Message; import java.util.Enumeration; +import junit.framework.TestCase; + public class QueueBrowserTest extends VMTestCase { private static final Logger _logger = Logger.getLogger(QueueBrowserTest.class); @@ -87,7 +89,7 @@ public class QueueBrowserTest extends VMTestCase * */ - public void queueBrowserMsgsRemainOnQueueTest() throws JMSException + public void testQueueBrowserMsgsRemainOnQueue() throws JMSException { // create QueueBrowser -- cgit v1.2.1