summaryrefslogtreecommitdiff
path: root/java/systests/src/test
diff options
context:
space:
mode:
authorMartin Ritchie <ritchiem@apache.org>2007-01-12 01:03:21 +0000
committerMartin Ritchie <ritchiem@apache.org>2007-01-12 01:03:21 +0000
commit7d02021e1405afa249b3f9c6427b3a6ea7095d15 (patch)
tree1e06e7776c7a4bb2838ad2495579ad24bc4a503f /java/systests/src/test
parent9eef1e1f7ddc4c4ace42c3edbc3fa9db1e509666 (diff)
downloadqpid-python-7d02021e1405afa249b3f9c6427b3a6ea7095d15.tar.gz
QPID-146 QPID-112 QPID-278
Summary Reworked a lot of the distribution work done by the build system. This ended up with me creating a reduced client distribution (hope that is ok Steve) Each module now has has a distribution directory (except common it may need a tests build later) This will build the individual components in to a distribution binary only, binary with tests and source. To build the binary with tests in the distribution directory use profile tests so $mvn -Ptests In all cases the dependencies have been reduced and correctly assigned to the correct scope. There were a couple of cases where a runtime dependency of one of our dependencies didn't make it in to the distributions so they were added explicitly. This should be looked at again. Specifics Broker: Three new assembly files are located in the distribution/src directory (broker-bin taking heavily from distribution - bin) these generate the three distributions. SimpleFilterManager.java removed slf4j reference broker/test directory removed as it was left over from the ant system Client: Added intelij files to ignore list. client/dist deleted as it was left over from the ant system client/distribution as for the broker three assemblies matching the three distributions Renamed log4j.properties to client.log4j to prevent issues when it is packaged into the jar. Removed old_test ping and requestreply1 as they have been moved to perftests Moved broker back to a test dependency. This required modifying AMQSession.java to remove reference to ExchangeBoundHandler.java Common: Added more common dependencies from broker and client here. Distribution: Reduced the assemblies to only build the full project binary, binary with tests and source. Perftests: Added building of perftests distribution so this can be bundled separately. Resources: Moved Resources from distribution project to root level this allows them to be easily incorporated in all projects. Systests: as with perftests now builds a separate distribution that can be used on an existing installation. renamed log4j.properties to systests.log4j to prevent logging problems. As systests is a module having the code under the test folder isn't accurate as it is the main code. Test code here should be testing the tests :D !! git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@495455 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/systests/src/test')
-rw-r--r--java/systests/src/test/java/log4j.properties28
-rw-r--r--java/systests/src/test/java/org/apache/qpid/server/AMQBrokerManagerMBeanTest.java83
-rw-r--r--java/systests/src/test/java/org/apache/qpid/server/ack/TxAckTest.java196
-rw-r--r--java/systests/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java274
-rw-r--r--java/systests/src/test/java/org/apache/qpid/server/exchange/ExchangeMBeanTest.java129
-rw-r--r--java/systests/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java101
-rw-r--r--java/systests/src/test/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java149
-rw-r--r--java/systests/src/test/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java110
-rw-r--r--java/systests/src/test/java/org/apache/qpid/server/protocol/MockIoSession.java297
-rw-r--r--java/systests/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java205
-rw-r--r--java/systests/src/test/java/org/apache/qpid/server/queue/AckTest.java337
-rw-r--r--java/systests/src/test/java/org/apache/qpid/server/queue/ConcurrencyTest.java258
-rw-r--r--java/systests/src/test/java/org/apache/qpid/server/queue/ConcurrentDeliveryManagerTest.java51
-rw-r--r--java/systests/src/test/java/org/apache/qpid/server/queue/DeliveryManagerTest.java179
-rw-r--r--java/systests/src/test/java/org/apache/qpid/server/queue/MessageTestHelper.java76
-rw-r--r--java/systests/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java135
-rw-r--r--java/systests/src/test/java/org/apache/qpid/server/queue/SubscriptionManagerTest.java102
-rw-r--r--java/systests/src/test/java/org/apache/qpid/server/queue/SubscriptionSetTest.java144
-rw-r--r--java/systests/src/test/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java123
-rw-r--r--java/systests/src/test/java/org/apache/qpid/server/queue/SynchronizedDeliveryManagerTest.java55
-rw-r--r--java/systests/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java123
-rw-r--r--java/systests/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java105
-rw-r--r--java/systests/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java50
-rw-r--r--java/systests/src/test/java/org/apache/qpid/server/txn/TxnBufferTest.java297
-rw-r--r--java/systests/src/test/java/org/apache/qpid/server/util/AveragedRun.java66
-rw-r--r--java/systests/src/test/java/org/apache/qpid/server/util/RunStats.java57
-rw-r--r--java/systests/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java107
-rw-r--r--java/systests/src/test/java/org/apache/qpid/server/util/TimedRun.java52
-rw-r--r--java/systests/src/test/java/org/apache/qpid/test/VMBrokerSetup.java52
29 files changed, 0 insertions, 3941 deletions
diff --git a/java/systests/src/test/java/log4j.properties b/java/systests/src/test/java/log4j.properties
deleted file mode 100644
index 6d596d1d19..0000000000
--- a/java/systests/src/test/java/log4j.properties
+++ /dev/null
@@ -1,28 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-log4j.rootLogger=${root.logging.level}
-
-
-log4j.logger.org.apache.qpid=${amqj.logging.level}, console
-log4j.additivity.org.apache.qpid=false
-
-log4j.appender.console=org.apache.log4j.ConsoleAppender
-log4j.appender.console.Threshold=all
-log4j.appender.console.layout=org.apache.log4j.PatternLayout
-log4j.appender.console.layout.ConversionPattern=%t %d %p [%c{4}] %m%n
diff --git a/java/systests/src/test/java/org/apache/qpid/server/AMQBrokerManagerMBeanTest.java b/java/systests/src/test/java/org/apache/qpid/server/AMQBrokerManagerMBeanTest.java
deleted file mode 100644
index ec27b8a191..0000000000
--- a/java/systests/src/test/java/org/apache/qpid/server/AMQBrokerManagerMBeanTest.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.qpid.server;
-
-import junit.framework.TestCase;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
-import org.apache.qpid.server.management.ManagedBroker;
-import org.apache.qpid.server.queue.QueueRegistry;
-import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.registry.IApplicationRegistry;
-import org.apache.qpid.framing.AMQShortString;
-
-public class AMQBrokerManagerMBeanTest extends TestCase
-{
- private QueueRegistry _queueRegistry;
- private ExchangeRegistry _exchangeRegistry;
-
- public void testExchangeOperations() throws Exception
- {
- String exchange1 = "testExchange1_" + System.currentTimeMillis();
- String exchange2 = "testExchange2_" + System.currentTimeMillis();
- String exchange3 = "testExchange3_" + System.currentTimeMillis();
-
- assertTrue(_exchangeRegistry.getExchange(new AMQShortString(exchange1)) == null);
- assertTrue(_exchangeRegistry.getExchange(new AMQShortString(exchange2)) == null);
- assertTrue(_exchangeRegistry.getExchange(new AMQShortString(exchange3)) == null);
-
- ManagedBroker mbean = new AMQBrokerManagerMBean();
- mbean.createNewExchange(exchange1,"direct",false, false);
- mbean.createNewExchange(exchange2,"topic",false, false);
- mbean.createNewExchange(exchange3,"headers",false, false);
-
- assertTrue(_exchangeRegistry.getExchange(new AMQShortString(exchange1)) != null);
- assertTrue(_exchangeRegistry.getExchange(new AMQShortString(exchange2)) != null);
- assertTrue(_exchangeRegistry.getExchange(new AMQShortString(exchange3)) != null);
-
- mbean.unregisterExchange(exchange1);
- mbean.unregisterExchange(exchange2);
- mbean.unregisterExchange(exchange3);
-
- assertTrue(_exchangeRegistry.getExchange(new AMQShortString(exchange1)) == null);
- assertTrue(_exchangeRegistry.getExchange(new AMQShortString(exchange2)) == null);
- assertTrue(_exchangeRegistry.getExchange(new AMQShortString(exchange3)) == null);
- }
-
- public void testQueueOperations() throws Exception
- {
- String queueName = "testQueue_" + System.currentTimeMillis();
- ManagedBroker mbean = new AMQBrokerManagerMBean();
-
- assertTrue(_queueRegistry.getQueue(new AMQShortString(queueName)) == null);
-
- mbean.createNewQueue(queueName, false, "test", true);
- assertTrue(_queueRegistry.getQueue(new AMQShortString(queueName)) != null);
-
- mbean.deleteQueue(queueName);
- assertTrue(_queueRegistry.getQueue(new AMQShortString(queueName)) == null);
- }
-
- @Override
- protected void setUp() throws Exception
- {
- super.setUp();
- IApplicationRegistry appRegistry = ApplicationRegistry.getInstance();
- _queueRegistry = appRegistry.getQueueRegistry();
- _exchangeRegistry = appRegistry.getExchangeRegistry();
- }
-}
diff --git a/java/systests/src/test/java/org/apache/qpid/server/ack/TxAckTest.java b/java/systests/src/test/java/org/apache/qpid/server/ack/TxAckTest.java
deleted file mode 100644
index 9d3c588fc8..0000000000
--- a/java/systests/src/test/java/org/apache/qpid/server/ack/TxAckTest.java
+++ /dev/null
@@ -1,196 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.ack;
-
-import junit.framework.TestCase;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.BasicPublishBody;
-import org.apache.qpid.server.RequiredDeliveryException;
-import org.apache.qpid.server.queue.AMQMessage;
-import org.apache.qpid.server.store.TestableMemoryMessageStore;
-import org.apache.qpid.server.store.StoreContext;
-import org.apache.qpid.server.txn.NonTransactionalContext;
-import org.apache.qpid.server.txn.TransactionalContext;
-
-import java.util.*;
-
-public class TxAckTest extends TestCase
-{
- private Scenario individual;
- private Scenario multiple;
- private Scenario combined;
-
- protected void setUp() throws Exception
- {
- super.setUp();
-
- //ack only 5th msg
- individual = new Scenario(10, Arrays.asList(5l), Arrays.asList(1l, 2l, 3l, 4l, 6l, 7l, 8l, 9l, 10l));
- individual.update(5, false);
-
- //ack all up to and including 5th msg
- multiple = new Scenario(10, Arrays.asList(1l, 2l, 3l, 4l, 5l), Arrays.asList(6l, 7l, 8l, 9l, 10l));
- multiple.update(5, true);
-
- //leave only 8th and 9th unacked
- combined = new Scenario(10, Arrays.asList(1l, 2l, 3l, 4l, 5l, 6l, 7l, 10l), Arrays.asList(8l, 9l));
- combined.update(3, false);
- combined.update(5, true);
- combined.update(7, true);
- combined.update(2, true);//should be ignored
- combined.update(1, false);//should be ignored
- combined.update(10, false);
- }
-
- public void testPrepare() throws AMQException
- {
- individual.prepare();
- multiple.prepare();
- combined.prepare();
- }
-
- public void testUndoPrepare() throws AMQException
- {
- individual.undoPrepare();
- multiple.undoPrepare();
- combined.undoPrepare();
- }
-
- public void testCommit() throws AMQException
- {
- individual.commit();
- multiple.commit();
- combined.commit();
- }
-
- public static junit.framework.Test suite()
- {
- return new junit.framework.TestSuite(TxAckTest.class);
- }
-
- private class Scenario
- {
- private final UnacknowledgedMessageMap _map = new UnacknowledgedMessageMapImpl(5000);
- private final TxAck _op = new TxAck(_map);
- private final List<Long> _acked;
- private final List<Long> _unacked;
- private StoreContext _storeContext = new StoreContext();
-
- Scenario(int messageCount, List<Long> acked, List<Long> unacked)
- {
- TransactionalContext txnContext = new NonTransactionalContext(new TestableMemoryMessageStore(),
- _storeContext, null,
- new LinkedList<RequiredDeliveryException>(),
- new HashSet<Long>());
- for(int i = 0; i < messageCount; i++)
- {
- long deliveryTag = i + 1;
- // TODO: fix hardcoded protocol version data
- TestMessage message = new TestMessage(deliveryTag, i, new BasicPublishBody((byte)8,
- (byte)0,
- null,
- false,
- false,
- null,
- 0), txnContext);
- _map.add(deliveryTag, new UnacknowledgedMessage(null, message, null, deliveryTag));
- }
- _acked = acked;
- _unacked = unacked;
- }
-
- void update(long deliverytag, boolean multiple)
- {
- _op.update(deliverytag, multiple);
- }
-
- private void assertCount(List<Long> tags, int expected)
- {
- for(long tag : tags)
- {
- UnacknowledgedMessage u = _map.get(tag);
- assertTrue("Message not found for tag " + tag, u != null);
- ((TestMessage) u.message).assertCountEquals(expected);
- }
- }
-
- void prepare() throws AMQException
- {
- _op.consolidate();
- _op.prepare(_storeContext);
-
- assertCount(_acked, -1);
- assertCount(_unacked, 0);
-
- }
- void undoPrepare()
- {
- _op.consolidate();
- _op.undoPrepare();
-
- assertCount(_acked, 1);
- assertCount(_unacked, 0);
- }
-
- void commit()
- {
- _op.consolidate();
- _op.commit(_storeContext);
-
-
- //check acked messages are removed from map
- Set<Long> keys = new HashSet<Long>(_map.getDeliveryTags());
- keys.retainAll(_acked);
- assertTrue("Expected messages with following tags to have been removed from map: " + keys, keys.isEmpty());
- //check unacked messages are still in map
- keys = new HashSet<Long>(_unacked);
- keys.removeAll(_map.getDeliveryTags());
- assertTrue("Expected messages with following tags to still be in map: " + keys, keys.isEmpty());
- }
- }
-
- private class TestMessage extends AMQMessage
- {
- private final long _tag;
- private int _count;
-
- TestMessage(long tag, long messageId, BasicPublishBody publishBody, TransactionalContext txnContext)
- {
- super(messageId, publishBody, txnContext);
- _tag = tag;
- }
-
- public void incrementReference()
- {
- _count++;
- }
-
- public void decrementReference(StoreContext context)
- {
- _count--;
- }
-
- void assertCountEquals(int expected)
- {
- assertEquals("Wrong count for message with tag " + _tag, expected, _count);
- }
- }
-}
diff --git a/java/systests/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java b/java/systests/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
deleted file mode 100644
index ea576a5661..0000000000
--- a/java/systests/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
+++ /dev/null
@@ -1,274 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.exchange;
-
-import junit.framework.TestCase;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.*;
-import org.apache.qpid.server.queue.AMQMessage;
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.MessageHandleFactory;
-import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.SkeletonMessageStore;
-import org.apache.qpid.server.store.MemoryMessageStore;
-import org.apache.qpid.server.store.StoreContext;
-import org.apache.qpid.server.txn.NonTransactionalContext;
-import org.apache.qpid.server.txn.TransactionalContext;
-import org.apache.qpid.server.RequiredDeliveryException;
-import org.apache.log4j.Logger;
-
-import java.util.*;
-
-public class AbstractHeadersExchangeTestBase extends TestCase
-{
- private static final Logger _log = Logger.getLogger(AbstractHeadersExchangeTestBase.class);
-
- private final HeadersExchange exchange = new HeadersExchange();
- protected final Set<TestQueue> queues = new HashSet<TestQueue>();
-
- /**
- * Not used in this test, just there to stub out the routing calls
- */
- private MessageStore _store = new MemoryMessageStore();
-
- private StoreContext _storeContext = new StoreContext();
-
- private MessageHandleFactory _handleFactory = new MessageHandleFactory();
-
- private int count;
-
- public void testDoNothing()
- {
- // this is here only to make junit under Eclipse happy
- }
-
- protected TestQueue bindDefault(String... bindings) throws AMQException
- {
- return bind("Queue" + (++count), bindings);
- }
-
- protected TestQueue bind(String queueName, String... bindings) throws AMQException
- {
- return bind(queueName, getHeaders(bindings));
- }
-
- protected TestQueue bind(String queue, FieldTable bindings) throws AMQException
- {
- return bind(new TestQueue(new AMQShortString(queue)), bindings);
- }
-
- protected TestQueue bind(TestQueue queue, String... bindings) throws AMQException
- {
- return bind(queue, getHeaders(bindings));
- }
-
- protected TestQueue bind(TestQueue queue, FieldTable bindings) throws AMQException
- {
- queues.add(queue);
- exchange.registerQueue(null, queue, bindings);
- return queue;
- }
-
-
- protected void route(Message m) throws AMQException
- {
- m.route(exchange);
- m.routingComplete(_store, _storeContext, _handleFactory);
- }
-
- protected void routeAndTest(Message m, TestQueue... expected) throws AMQException
- {
- routeAndTest(m, false, Arrays.asList(expected));
- }
-
- protected void routeAndTest(Message m, boolean expectReturn, TestQueue... expected) throws AMQException
- {
- routeAndTest(m, expectReturn, Arrays.asList(expected));
- }
-
- protected void routeAndTest(Message m, List<TestQueue> expected) throws AMQException
- {
- routeAndTest(m, false, expected);
- }
-
- protected void routeAndTest(Message m, boolean expectReturn, List<TestQueue> expected) throws AMQException
- {
- try
- {
- route(m);
- assertFalse("Expected "+m+" to be returned due to manadatory flag, and lack of routing",expectReturn);
- for (TestQueue q : queues)
- {
- if (expected.contains(q))
- {
- assertTrue("Expected " + m + " to be delivered to " + q, m.isInQueue(q));
- //assert m.isInQueue(q) : "Expected " + m + " to be delivered to " + q;
- }
- else
- {
- assertFalse("Did not expect " + m + " to be delivered to " + q, m.isInQueue(q));
- //assert !m.isInQueue(q) : "Did not expect " + m + " to be delivered to " + q;
- }
- }
- }
-
- catch (NoRouteException ex)
- {
- assertTrue("Expected "+m+" not to be returned",expectReturn);
- }
-
- }
-
- static FieldTable getHeaders(String... entries)
- {
- FieldTable headers = FieldTableFactory.newFieldTable();
- for (String s : entries)
- {
- String[] parts = s.split("=", 2);
- headers.setObject(parts[0], parts.length > 1 ? parts[1] : "");
- }
- return headers;
- }
-
- static BasicPublishBody getPublishRequest(String id)
- {
- // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
- // TODO: Establish some way to determine the version for the test.
- BasicPublishBody request = new BasicPublishBody((byte)8, (byte)0,null,false,false,new AMQShortString(id),0);
-
- return request;
- }
-
- static ContentHeaderBody getContentHeader(FieldTable headers)
- {
- ContentHeaderBody header = new ContentHeaderBody();
- header.properties = getProperties(headers);
- return header;
- }
-
- static BasicContentHeaderProperties getProperties(FieldTable headers)
- {
- BasicContentHeaderProperties properties = new BasicContentHeaderProperties();
- properties.setHeaders(headers);
- return properties;
- }
-
- static class TestQueue extends AMQQueue
- {
- final List<HeadersExchangeTest.Message> messages = new ArrayList<HeadersExchangeTest.Message>();
-
- public TestQueue(AMQShortString name) throws AMQException
- {
- super(name, false, new AMQShortString("test"), true, ApplicationRegistry.getInstance().getQueueRegistry());
- }
-
- /**
- * We override this method so that the default behaviour, which attempts to use a delivery manager, is
- * not invoked. It is unnecessary since for this test we only care to know whether the message was
- * sent to the queue; the queue processing logic is not being tested.
- * @param msg
- * @throws AMQException
- */
- public void process(StoreContext context, AMQMessage msg) throws AMQException
- {
- messages.add(new HeadersExchangeTest.Message(msg));
- }
- }
-
- /**
- * Just add some extra utility methods to AMQMessage to aid testing.
- */
- static class Message extends AMQMessage
- {
- private static MessageStore _messageStore = new SkeletonMessageStore();
-
- private static StoreContext _storeContext = new StoreContext();
-
- private static TransactionalContext _txnContext = new NonTransactionalContext(_messageStore, _storeContext,
- null,
- new LinkedList<RequiredDeliveryException>(),
- new HashSet<Long>());
-
- Message(String id, String... headers) throws AMQException
- {
- this(id, getHeaders(headers));
- }
-
- Message(String id, FieldTable headers) throws AMQException
- {
- this(getPublishRequest(id), getContentHeader(headers), null);
- }
-
- private Message(BasicPublishBody publish, ContentHeaderBody header, List<ContentBody> bodies) throws AMQException
- {
- super(_messageStore.getNewMessageId(), publish, _txnContext, header);
- }
-
- private Message(AMQMessage msg) throws AMQException
- {
- super(msg);
- }
-
- void route(Exchange exchange) throws AMQException
- {
- exchange.route(this);
- }
-
- boolean isInQueue(TestQueue queue)
- {
- return queue.messages.contains(this);
- }
-
- public int hashCode()
- {
- return getKey().hashCode();
- }
-
- public boolean equals(Object o)
- {
- return o instanceof HeadersExchangeTest.Message && equals((HeadersExchangeTest.Message) o);
- }
-
- private boolean equals(HeadersExchangeTest.Message m)
- {
- return getKey().equals(m.getKey());
- }
-
- public String toString()
- {
- return getKey().toString();
- }
-
- private Object getKey()
- {
- try
- {
- return getPublishBody().routingKey;
- }
- catch (AMQException e)
- {
- _log.error("Error getting routing key: " + e, e);
- return null;
- }
- }
- }
-}
diff --git a/java/systests/src/test/java/org/apache/qpid/server/exchange/ExchangeMBeanTest.java b/java/systests/src/test/java/org/apache/qpid/server/exchange/ExchangeMBeanTest.java
deleted file mode 100644
index 39c47118da..0000000000
--- a/java/systests/src/test/java/org/apache/qpid/server/exchange/ExchangeMBeanTest.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.qpid.server.exchange;
-
-import junit.framework.TestCase;
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.QueueRegistry;
-import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.management.ManagedObject;
-import org.apache.qpid.exchange.ExchangeDefaults;
-import org.apache.qpid.framing.AMQShortString;
-
-import javax.management.openmbean.CompositeData;
-import javax.management.openmbean.TabularData;
-import java.util.ArrayList;
-
-/**
- * Unit test class for testing different Exchange MBean operations
- */
-public class ExchangeMBeanTest extends TestCase
-{
- private AMQQueue _queue;
- private QueueRegistry _queueRegistry;
-
- /**
- * Test for direct exchange mbean
- * @throws Exception
- */
-
- public void testDirectExchangeMBean() throws Exception
- {
- DestNameExchange exchange = new DestNameExchange();
- exchange.initialise(ExchangeDefaults.DIRECT_EXCHANGE_NAME, false, 0, true);
- ManagedObject managedObj = exchange.getManagedObject();
- ManagedExchange mbean = (ManagedExchange)managedObj;
-
- mbean.createNewBinding(_queue.getName().toString(), "binding1");
- mbean.createNewBinding(_queue.getName().toString(), "binding2");
-
- TabularData data = mbean.bindings();
- ArrayList<CompositeData> list = new ArrayList<CompositeData>(data.values());
- assertTrue(list.size() == 2);
-
- // test general exchange properties
- assertEquals(mbean.getName(), "amq.direct");
- assertEquals(mbean.getExchangeType(), "direct");
- assertTrue(mbean.getTicketNo() == 0);
- assertTrue(!mbean.isDurable());
- assertTrue(mbean.isAutoDelete());
- }
-
- /**
- * Test for "topic" exchange mbean
- * @throws Exception
- */
-
- public void testTopicExchangeMBean() throws Exception
- {
- DestWildExchange exchange = new DestWildExchange();
- exchange.initialise(ExchangeDefaults.TOPIC_EXCHANGE_NAME, false, 0, true);
- ManagedObject managedObj = exchange.getManagedObject();
- ManagedExchange mbean = (ManagedExchange)managedObj;
-
- mbean.createNewBinding(_queue.getName().toString(), "binding1");
- mbean.createNewBinding(_queue.getName().toString(), "binding2");
-
- TabularData data = mbean.bindings();
- ArrayList<CompositeData> list = new ArrayList<CompositeData>(data.values());
- assertTrue(list.size() == 2);
-
- // test general exchange properties
- assertEquals(mbean.getName(), "amq.topic");
- assertEquals(mbean.getExchangeType(), "topic");
- assertTrue(mbean.getTicketNo() == 0);
- assertTrue(!mbean.isDurable());
- assertTrue(mbean.isAutoDelete());
- }
-
- /**
- * Test for "Headers" exchange mbean
- * @throws Exception
- */
-
- public void testHeadersExchangeMBean() throws Exception
- {
- HeadersExchange exchange = new HeadersExchange();
- exchange.initialise(ExchangeDefaults.HEADERS_EXCHANGE_NAME, false, 0, true);
- ManagedObject managedObj = exchange.getManagedObject();
- ManagedExchange mbean = (ManagedExchange)managedObj;
-
- mbean.createNewBinding(_queue.getName().toString(), "key1=binding1,key2=binding2");
- mbean.createNewBinding(_queue.getName().toString(), "key3=binding3");
-
- TabularData data = mbean.bindings();
- ArrayList<CompositeData> list = new ArrayList<CompositeData>(data.values());
- assertTrue(list.size() == 2);
-
- // test general exchange properties
- assertEquals(mbean.getName(), "amq.match");
- assertEquals(mbean.getExchangeType(), "headers");
- assertTrue(mbean.getTicketNo() == 0);
- assertTrue(!mbean.isDurable());
- assertTrue(mbean.isAutoDelete());
- }
-
- @Override
- protected void setUp() throws Exception
- {
- super.setUp();
- _queueRegistry = ApplicationRegistry.getInstance().getQueueRegistry();
- _queue = new AMQQueue(new AMQShortString("testQueue"), false, new AMQShortString("ExchangeMBeanTest"), false, _queueRegistry);
- _queueRegistry.registerQueue(_queue);
- }
-}
diff --git a/java/systests/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java b/java/systests/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java
deleted file mode 100644
index c01241d11d..0000000000
--- a/java/systests/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.exchange;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.util.TestApplicationRegistry;
-import org.apache.qpid.framing.BasicPublishBody;
-
-public class HeadersExchangeTest extends AbstractHeadersExchangeTestBase
-{
- protected void setUp() throws Exception
- {
- super.setUp();
- ApplicationRegistry.initialise(new TestApplicationRegistry());
- }
-
- public void testSimple() throws AMQException
- {
- TestQueue q1 = bindDefault("F0000");
- TestQueue q2 = bindDefault("F0000=Aardvark");
- TestQueue q3 = bindDefault("F0001");
- TestQueue q4 = bindDefault("F0001=Bear");
- TestQueue q5 = bindDefault("F0000", "F0001");
- TestQueue q6 = bindDefault("F0000=Aardvark", "F0001=Bear");
- TestQueue q7 = bindDefault("F0000", "F0001=Bear");
- TestQueue q8 = bindDefault("F0000=Aardvark", "F0001");
-
- routeAndTest(new Message("Message1", "F0000"), q1);
- routeAndTest(new Message("Message2", "F0000=Aardvark"), q1, q2);
- routeAndTest(new Message("Message3", "F0000=Aardvark", "F0001"), q1, q2, q3, q5, q8);
- routeAndTest(new Message("Message4", "F0000", "F0001=Bear"), q1, q3, q4, q5, q7);
- routeAndTest(new Message("Message5", "F0000=Aardvark", "F0001=Bear"),
- q1, q2, q3, q4, q5, q6, q7, q8);
- routeAndTest(new Message("Message6", "F0002"));
-
- Message m7 = new Message("Message7", "XXXXX");
-
- BasicPublishBody pb7 = m7.getPublishBody();
- pb7.mandatory = true;
- routeAndTest(m7,true);
-
- Message m8 = new Message("Message8", "F0000");
- BasicPublishBody pb8 = m8.getPublishBody();
- pb8.mandatory = true;
- routeAndTest(m8,false,q1);
-
-
- }
-
- public void testAny() throws AMQException
- {
- TestQueue q1 = bindDefault("F0000", "F0001", "X-match=any");
- TestQueue q2 = bindDefault("F0000=Aardvark", "F0001=Bear", "X-match=any");
- TestQueue q3 = bindDefault("F0000", "F0001=Bear", "X-match=any");
- TestQueue q4 = bindDefault("F0000=Aardvark", "F0001", "X-match=any");
- TestQueue q6 = bindDefault("F0000=Apple", "F0001", "X-match=any");
-
- routeAndTest(new Message("Message1", "F0000"), q1, q3);
- routeAndTest(new Message("Message2", "F0000=Aardvark"), q1, q2, q3, q4);
- routeAndTest(new Message("Message3", "F0000=Aardvark", "F0001"), q1, q2, q3, q4, q6);
- routeAndTest(new Message("Message4", "F0000", "F0001=Bear"), q1, q2, q3, q4, q6);
- routeAndTest(new Message("Message5", "F0000=Aardvark", "F0001=Bear"), q1, q2, q3, q4, q6);
- routeAndTest(new Message("Message6", "F0002"));
- }
-
- public void testMandatory() throws AMQException
- {
- bindDefault("F0000");
- Message m1 = new Message("Message1", "XXXXX");
- Message m2 = new Message("Message2", "F0000");
- BasicPublishBody pb1 = m1.getPublishBody();
- pb1.mandatory = true;
- BasicPublishBody pb2 = m2.getPublishBody();
- pb2.mandatory = true;
- routeAndTest(m1,true);
- }
-
- public static junit.framework.Test suite()
- {
- return new junit.framework.TestSuite(HeadersExchangeTest.class);
- }
-}
diff --git a/java/systests/src/test/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java b/java/systests/src/test/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java
deleted file mode 100644
index 546c61eda0..0000000000
--- a/java/systests/src/test/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java
+++ /dev/null
@@ -1,149 +0,0 @@
-package org.apache.qpid.server.exchange;
-
-import junit.framework.TestCase;
-import org.apache.log4j.Logger;
-import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.util.TestApplicationRegistry;
-import org.apache.qpid.client.*;
-import org.apache.qpid.client.transport.TransportConnection;
-import org.apache.qpid.url.AMQBindingURL;
-import org.apache.qpid.url.BindingURL;
-import org.apache.qpid.exchange.ExchangeDefaults;
-import org.apache.qpid.framing.FieldTable;
-
-import javax.jms.*;
-import java.util.List;
-import java.util.Collections;
-import java.util.ArrayList;
-
-public class ReturnUnroutableMandatoryMessageTest extends TestCase implements ExceptionListener
-{
- private static final Logger _logger = Logger.getLogger(ReturnUnroutableMandatoryMessageTest.class);
-
- private final List<Message> _bouncedMessageList = Collections.synchronizedList(new ArrayList<Message>());
-
- static
- {
- String workdir = System.getProperty("QPID_WORK");
- if (workdir == null || workdir.equals(""))
- {
- String tempdir = System.getProperty("java.io.tmpdir");
- System.out.println("QPID_WORK not set using tmp directory: " + tempdir);
- System.setProperty("QPID_WORK", tempdir);
- }
-// DOMConfigurator.configure("../broker/etc/log4j.xml");
- }
-
- protected void setUp() throws Exception
- {
- super.setUp();
- TransportConnection.createVMBroker(1);
- ApplicationRegistry.initialise(new TestApplicationRegistry(), 1);
- }
-
- protected void tearDown() throws Exception
- {
- super.tearDown();
- TransportConnection.killAllVMBrokers();
- }
-
- /**
- * Tests that mandatory message which are not routable are returned to the producer
- *
- * @throws Exception
- */
- public void testReturnUnroutableMandatoryMessage() throws Exception
- {
- _bouncedMessageList.clear();
- Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test");
-
-
- AMQSession consumerSession = (AMQSession) con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
-
- AMQHeadersExchange queue = new AMQHeadersExchange(new AMQBindingURL(ExchangeDefaults.HEADERS_EXCHANGE_CLASS + "://" + ExchangeDefaults.HEADERS_EXCHANGE_NAME + "/test/queue1?" + BindingURL.OPTION_ROUTING_KEY + "='F0000=1'"));
- FieldTable ft = new FieldTable();
- ft.setString("F1000", "1");
- MessageConsumer consumer = consumerSession.createConsumer(queue, AMQSession.DEFAULT_PREFETCH_LOW_MARK, AMQSession.DEFAULT_PREFETCH_HIGH_MARK, false, false, (String) null, ft);
-
- //force synch to ensure the consumer has resulted in a bound queue
- ((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.HEADERS_EXCHANGE_NAME, ExchangeDefaults.HEADERS_EXCHANGE_CLASS);
-
- Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "/test");
-
- con2.setExceptionListener(this);
- AMQSession producerSession = (AMQSession) con2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
-
- // Need to start the "producer" connection in order to receive bounced messages
- _logger.info("Starting producer connection");
- con2.start();
-
-
- MessageProducer nonMandatoryProducer = producerSession.createProducer(queue, false, false);
- MessageProducer mandatoryProducer = producerSession.createProducer(queue);
-
- // First test - should neither be bounced nor routed
- _logger.info("Sending non-routable non-mandatory message");
- TextMessage msg1 = producerSession.createTextMessage("msg1");
- nonMandatoryProducer.send(msg1);
-
- // Second test - should be bounced
- _logger.info("Sending non-routable mandatory message");
- TextMessage msg2 = producerSession.createTextMessage("msg2");
- mandatoryProducer.send(msg2);
-
- // Third test - should be routed
- _logger.info("Sending routable message");
- TextMessage msg3 = producerSession.createTextMessage("msg3");
- msg3.setStringProperty("F1000", "1");
- mandatoryProducer.send(msg3);
-
-
- _logger.info("Starting consumer connection");
- con.start();
- TextMessage tm = (TextMessage) consumer.receive(1000L);
-
- assertTrue("No message routed to receiver", tm != null);
- assertTrue("Wrong message routed to receiver: " + tm.getText(), "msg3".equals(tm.getText()));
-
- try
- {
- Thread.sleep(1000L);
- }
- catch (InterruptedException e)
- {
- ;
- }
-
- assertTrue("Wrong number of messages bounced (expect 1): " + _bouncedMessageList.size(), _bouncedMessageList.size() == 1);
- Message m = _bouncedMessageList.get(0);
- assertTrue("Wrong message bounced: " + m.toString(), m.toString().contains("msg2"));
-
-
- con.close();
- con2.close();
-
-
- }
-
- public static junit.framework.Test suite()
- {
- return new junit.framework.TestSuite(ReturnUnroutableMandatoryMessageTest.class);
- }
-
- public void onException(JMSException jmsException)
- {
-
- Exception linkedException = jmsException.getLinkedException();
- if (linkedException instanceof AMQNoRouteException)
- {
- AMQNoRouteException noRoute = (AMQNoRouteException) linkedException;
- Message bounced = (Message) noRoute.getUndeliveredMessage();
- _bouncedMessageList.add(bounced);
- _logger.info("Caught expected NoRouteException");
- }
- else
- {
- _logger.warn("Caught exception on producer: ", jmsException);
- }
- }
-}
diff --git a/java/systests/src/test/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java b/java/systests/src/test/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java
deleted file mode 100644
index c2ac099855..0000000000
--- a/java/systests/src/test/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.qpid.server.protocol;
-
-import junit.framework.TestCase;
-import org.apache.mina.common.IoSession;
-import org.apache.qpid.codec.AMQCodecFactory;
-import org.apache.qpid.server.AMQChannel;
-import org.apache.qpid.server.exchange.DefaultExchangeFactory;
-import org.apache.qpid.server.exchange.DefaultExchangeRegistry;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
-import org.apache.qpid.server.queue.DefaultQueueRegistry;
-import org.apache.qpid.server.queue.QueueRegistry;
-import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.SkeletonMessageStore;
-import org.apache.qpid.AMQException;
-
-import javax.management.JMException;
-
-/**
- * Test class to test MBean operations for AMQMinaProtocolSession.
- */
-public class AMQProtocolSessionMBeanTest extends TestCase
-{
- private IoSession _mockIOSession;
- private MessageStore _messageStore = new SkeletonMessageStore();
- private AMQMinaProtocolSession _protocolSession;
- private AMQChannel _channel;
- private QueueRegistry _queueRegistry;
- private ExchangeRegistry _exchangeRegistry;
- private AMQProtocolSessionMBean _mbean;
-
- public void testChannels() throws Exception
- {
- // check the channel count is correct
- int channelCount = _mbean.channels().size();
- assertTrue(channelCount == 1);
- _protocolSession.addChannel(new AMQChannel(2, _messageStore, null));
- channelCount = _mbean.channels().size();
- assertTrue(channelCount == 2);
-
- // general properties test
- _mbean.setMaximumNumberOfChannels(1000L);
- assertTrue(_mbean.getMaximumNumberOfChannels() == 1000L);
-
- // check APIs
- AMQChannel channel3 = new AMQChannel(3, _messageStore, null);
- channel3.setLocalTransactional();
- _protocolSession.addChannel(channel3);
- _mbean.rollbackTransactions(2);
- _mbean.rollbackTransactions(3);
- _mbean.commitTransactions(2);
- _mbean.commitTransactions(3);
-
- // This should throw exception, because the channel does't exist
- try
- {
- _mbean.commitTransactions(4);
- fail();
- }
- catch (JMException ex)
- {
- System.out.println("expected exception is thrown :" + ex.getMessage());
- }
-
- // check if closing of session works
- _protocolSession.addChannel(new AMQChannel(5, _messageStore, null));
- _mbean.closeConnection();
- try
- {
- channelCount = _mbean.channels().size();
- assertTrue(channelCount == 0);
- // session is now closed so adding another channel should throw an exception
- _protocolSession.addChannel(new AMQChannel(6, _messageStore, null));
- fail();
- }
- catch(AMQException ex)
- {
- System.out.println("expected exception is thrown :" + ex.getMessage());
- }
- }
-
- @Override
- protected void setUp() throws Exception
- {
- super.setUp();
- _channel = new AMQChannel(1, _messageStore, null);
- _queueRegistry = new DefaultQueueRegistry();
- _exchangeRegistry = new DefaultExchangeRegistry(new DefaultExchangeFactory());
- _mockIOSession = new MockIoSession();
- _protocolSession = new AMQMinaProtocolSession(_mockIOSession, _queueRegistry, _exchangeRegistry, new AMQCodecFactory(true));
- _protocolSession.addChannel(_channel);
- _mbean = (AMQProtocolSessionMBean)_protocolSession.getManagedObject();
- }
-}
diff --git a/java/systests/src/test/java/org/apache/qpid/server/protocol/MockIoSession.java b/java/systests/src/test/java/org/apache/qpid/server/protocol/MockIoSession.java
deleted file mode 100644
index cf6366b513..0000000000
--- a/java/systests/src/test/java/org/apache/qpid/server/protocol/MockIoSession.java
+++ /dev/null
@@ -1,297 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.protocol;
-
-import org.apache.mina.common.*;
-import org.apache.mina.common.support.DefaultCloseFuture;
-import org.apache.mina.common.support.DefaultWriteFuture;
-
-import java.net.SocketAddress;
-import java.net.InetSocketAddress;
-import java.util.Set;
-
-public class MockIoSession implements IoSession
-{
- private AMQProtocolSession _protocolSession;
-
- /**
- * Stores the last response written
- */
- private Object _lastWrittenObject;
-
- private boolean _closing;
-
- public MockIoSession()
- {
- }
-
- public Object getLastWrittenObject()
- {
- return _lastWrittenObject;
- }
-
- public IoService getService()
- {
- return null; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public IoServiceConfig getServiceConfig()
- {
- return null;
- }
-
- public IoHandler getHandler()
- {
- return null; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public IoSessionConfig getConfig()
- {
- return null; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public IoFilterChain getFilterChain()
- {
- return null; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public WriteFuture write(Object message)
- {
- WriteFuture wf = new DefaultWriteFuture(null);
- _lastWrittenObject = message;
- return wf;
- }
-
- public CloseFuture close()
- {
- _closing = true;
- CloseFuture cf = new DefaultCloseFuture(null);
- cf.setClosed();
- return cf;
- }
-
- public Object getAttachment()
- {
- return _protocolSession;
- }
-
- public Object setAttachment(Object attachment)
- {
- Object current = _protocolSession;
- _protocolSession = (AMQProtocolSession) attachment;
- return current;
- }
-
- public Object getAttribute(String key)
- {
- return null; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public Object setAttribute(String key, Object value)
- {
- return null; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public Object setAttribute(String key)
- {
- return null; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public Object removeAttribute(String key)
- {
- return null; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public boolean containsAttribute(String key)
- {
- return false; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public Set getAttributeKeys()
- {
- return null; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public TransportType getTransportType()
- {
- return null; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public boolean isConnected()
- {
- return false; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public boolean isClosing()
- {
- return _closing;
- }
-
- public CloseFuture getCloseFuture()
- {
- return null; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public SocketAddress getRemoteAddress()
- {
- return new InetSocketAddress("127.0.0.1", 1234); //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public SocketAddress getLocalAddress()
- {
- return null; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public SocketAddress getServiceAddress()
- {
- return null; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public int getIdleTime(IdleStatus status)
- {
- return 0; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public long getIdleTimeInMillis(IdleStatus status)
- {
- return 0; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public void setIdleTime(IdleStatus status, int idleTime)
- {
- //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public int getWriteTimeout()
- {
- return 0; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public long getWriteTimeoutInMillis()
- {
- return 0; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public void setWriteTimeout(int writeTimeout)
- {
- //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public TrafficMask getTrafficMask()
- {
- return null; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public void setTrafficMask(TrafficMask trafficMask)
- {
- //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public void suspendRead()
- {
- //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public void suspendWrite()
- {
- //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public void resumeRead()
- {
- //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public void resumeWrite()
- {
- //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public long getReadBytes()
- {
- return 0; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public long getWrittenBytes()
- {
- return 0; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public long getReadMessages()
- {
- return 0L;
- }
-
- public long getWrittenMessages()
- {
- return 0L;
- }
-
- public long getWrittenWriteRequests()
- {
- return 0; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public int getScheduledWriteRequests()
- {
- return 0; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public int getScheduledWriteBytes()
- {
- return 0; //TODO
- }
-
- public long getCreationTime()
- {
- return 0; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public long getLastIoTime()
- {
- return 0; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public long getLastReadTime()
- {
- return 0; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public long getLastWriteTime()
- {
- return 0; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public boolean isIdle(IdleStatus status)
- {
- return false; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public int getIdleCount(IdleStatus status)
- {
- return 0; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public long getLastIdleTime(IdleStatus status)
- {
- return 0; //To change body of implemented methods use File | Settings | File Templates.
- }
-}
diff --git a/java/systests/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java b/java/systests/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
deleted file mode 100644
index 91a26632a1..0000000000
--- a/java/systests/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
+++ /dev/null
@@ -1,205 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.qpid.server.queue;
-
-import junit.framework.TestCase;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.BasicPublishBody;
-import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.server.AMQChannel;
-import org.apache.qpid.server.RequiredDeliveryException;
-import org.apache.qpid.server.txn.TransactionalContext;
-import org.apache.qpid.server.txn.NonTransactionalContext;
-import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.SkeletonMessageStore;
-import org.apache.qpid.server.store.StoreContext;
-
-import javax.management.JMException;
-import java.util.LinkedList;
-import java.util.HashSet;
-
-/**
- * Test class to test AMQQueueMBean attribtues and operations
- */
-public class AMQQueueMBeanTest extends TestCase
-{
- private AMQQueue _queue;
- private AMQQueueMBean _queueMBean;
- private QueueRegistry _queueRegistry;
- private MessageStore _messageStore = new SkeletonMessageStore();
- private StoreContext _storeContext = new StoreContext();
- private TransactionalContext _transactionalContext = new NonTransactionalContext(_messageStore, _storeContext,
- null,
- new LinkedList<RequiredDeliveryException>(),
- new HashSet<Long>());
- private MockProtocolSession _protocolSession;
- private AMQChannel _channel;
-
- public void testMessageCount() throws Exception
- {
- int messageCount = 10;
- sendMessages(messageCount);
- assertTrue(_queueMBean.getMessageCount() == messageCount);
- assertTrue(_queueMBean.getReceivedMessageCount() == messageCount);
- assertTrue(_queueMBean.getQueueDepth() == 10);
-
- _queueMBean.deleteMessageFromTop();
- assertTrue(_queueMBean.getMessageCount() == messageCount - 1);
- assertTrue(_queueMBean.getReceivedMessageCount() == messageCount);
-
- _queueMBean.clearQueue();
- assertTrue(_queueMBean.getMessageCount() == 0);
- assertTrue(_queueMBean.getReceivedMessageCount() == messageCount);
- }
-
- public void testConsumerCount() throws Exception
- {
- SubscriptionManager mgr = _queue.getSubscribers();
- assertFalse(mgr.hasActiveSubscribers());
- assertTrue(_queueMBean.getActiveConsumerCount() == 0);
-
- _channel = new AMQChannel(1, _messageStore, null);
- _protocolSession = new MockProtocolSession(_messageStore);
- _protocolSession.addChannel(_channel);
-
- _queue.registerProtocolSession(_protocolSession, 1, new AMQShortString("test"), false, null);
- assertTrue(_queueMBean.getActiveConsumerCount() == 1);
-
- SubscriptionSet _subscribers = (SubscriptionSet) mgr;
- SubscriptionTestHelper s1 = new SubscriptionTestHelper("S1");
- SubscriptionTestHelper s2 = new SubscriptionTestHelper("S2");
- _subscribers.addSubscriber(s1);
- _subscribers.addSubscriber(s2);
- assertTrue(_queueMBean.getActiveConsumerCount() == 3);
- assertTrue(_queueMBean.getConsumerCount() == 3);
-
- s1.setSuspended(true);
- assertTrue(_queueMBean.getActiveConsumerCount() == 2);
- assertTrue(_queueMBean.getConsumerCount() == 3);
- }
-
- public void testGeneralProperties()
- {
- _queueMBean.setMaximumMessageCount(50000);
- _queueMBean.setMaximumMessageSize(2000l);
- _queueMBean.setMaximumQueueDepth(1000l);
-
- assertTrue(_queueMBean.getMaximumMessageCount() == 50000);
- assertTrue(_queueMBean.getMaximumMessageSize() == 2000);
- assertTrue(_queueMBean.getMaximumQueueDepth() == 1000);
-
- assertTrue(_queueMBean.getName().equals("testQueue"));
- assertTrue(_queueMBean.getOwner().equals("AMQueueMBeanTest"));
- assertFalse(_queueMBean.isAutoDelete());
- assertFalse(_queueMBean.isDurable());
- }
-
- public void testExceptions() throws Exception
- {
- try
- {
- _queueMBean.viewMessages(0, 3);
- fail();
- }
- catch (JMException ex)
- {
-
- }
-
- try
- {
- _queueMBean.viewMessages(2, 1);
- fail();
- }
- catch (JMException ex)
- {
-
- }
-
- try
- {
- _queueMBean.viewMessages(-1, 1);
- fail();
- }
- catch (JMException ex)
- {
-
- }
-
- AMQMessage msg = message(false);
- long id = msg.getMessageId();
- _queue.clearQueue(_storeContext);
- _queue.process(_storeContext, msg);
- msg.routingComplete(_messageStore, _storeContext, new MessageHandleFactory());
- _queueMBean.viewMessageContent(id);
- try
- {
- _queueMBean.viewMessageContent(id + 1);
- fail();
- }
- catch (JMException ex)
- {
-
- }
- }
-
- private AMQMessage message(boolean immediate) throws AMQException
- {
- // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
- // TODO: Establish some way to determine the version for the test.
- BasicPublishBody publish = new BasicPublishBody((byte)8,
- (byte)0,
- null,
- immediate,
- false,
- null,
- 0);
-
- ContentHeaderBody contentHeaderBody = new ContentHeaderBody();
- contentHeaderBody.bodySize = 1000; // in bytes
- return new AMQMessage(_messageStore.getNewMessageId(), publish, _transactionalContext, contentHeaderBody);
- }
-
- @Override
- protected void setUp() throws Exception
- {
- super.setUp();
- _queueRegistry = new DefaultQueueRegistry();
- _queue = new AMQQueue(new AMQShortString("testQueue"), false, new AMQShortString("AMQueueMBeanTest"), false, _queueRegistry);
- _queueMBean = new AMQQueueMBean(_queue);
- }
-
- private void sendMessages(int messageCount) throws AMQException
- {
- AMQMessage[] messages = new AMQMessage[messageCount];
- for (int i = 0; i < messages.length; i++)
- {
- messages[i] = message(false);
- }
- for (int i = 0; i < messageCount; i++)
- {
- _queue.process(_storeContext, messages[i]);
- }
-
- for (int i = 0; i < messages.length; i++)
- {
- messages[i].routingComplete(_messageStore, _storeContext, new MessageHandleFactory());
- }
- }
-}
diff --git a/java/systests/src/test/java/org/apache/qpid/server/queue/AckTest.java b/java/systests/src/test/java/org/apache/qpid/server/queue/AckTest.java
deleted file mode 100644
index d10d5acdd0..0000000000
--- a/java/systests/src/test/java/org/apache/qpid/server/queue/AckTest.java
+++ /dev/null
@@ -1,337 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.queue;
-
-import junit.framework.TestCase;
-import org.apache.log4j.Logger;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.BasicContentHeaderProperties;
-import org.apache.qpid.framing.BasicPublishBody;
-import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.server.AMQChannel;
-import org.apache.qpid.server.RequiredDeliveryException;
-import org.apache.qpid.server.ack.UnacknowledgedMessage;
-import org.apache.qpid.server.ack.UnacknowledgedMessageMap;
-import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.store.TestableMemoryMessageStore;
-import org.apache.qpid.server.store.StoreContext;
-import org.apache.qpid.server.txn.NonTransactionalContext;
-import org.apache.qpid.server.txn.TransactionalContext;
-import org.apache.qpid.server.util.TestApplicationRegistry;
-
-import java.util.LinkedList;
-import java.util.Set;
-import java.util.HashSet;
-
-/**
- * Tests that acknowledgements are handled correctly.
- */
-public class AckTest extends TestCase
-{
- private static final Logger _log = Logger.getLogger(AckTest.class);
-
- private SubscriptionImpl _subscription;
-
- private MockProtocolSession _protocolSession;
-
- private TestableMemoryMessageStore _messageStore;
-
- private StoreContext _storeContext = new StoreContext();
-
- private AMQChannel _channel;
-
- private SubscriptionSet _subscriptionManager;
-
- private AMQQueue _queue;
-
- private static final AMQShortString DEFAULT_CONSUMER_TAG = new AMQShortString("conTag");
-
- public AckTest() throws Exception
- {
- ApplicationRegistry.initialise(new TestApplicationRegistry());
- }
-
- protected void setUp() throws Exception
- {
- super.setUp();
- _messageStore = new TestableMemoryMessageStore();
- _channel = new AMQChannel(5, _messageStore, null/*dont need exchange registry*/);
- _protocolSession = new MockProtocolSession(_messageStore);
- _protocolSession.addChannel(_channel);
- _subscriptionManager = new SubscriptionSet();
- _queue = new AMQQueue(new AMQShortString("myQ"), false, new AMQShortString("guest"), true, new DefaultQueueRegistry(), _subscriptionManager);
- }
-
- private void publishMessages(int count) throws AMQException
- {
- publishMessages(count, false);
- }
-
- private void publishMessages(int count, boolean persistent) throws AMQException
- {
- TransactionalContext txnContext = new NonTransactionalContext(_messageStore, _storeContext, null,
- new LinkedList<RequiredDeliveryException>(),
- new HashSet<Long>());
- MessageHandleFactory factory = new MessageHandleFactory();
- for (int i = 1; i <= count; i++)
- {
- // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
- // TODO: Establish some way to determine the version for the test.
- BasicPublishBody publishBody = new BasicPublishBody((byte)8,
- (byte)0,
- new AMQShortString("someExchange"),
- false,
- false,
- new AMQShortString("rk"),
- 0);
- AMQMessage msg = new AMQMessage(_messageStore.getNewMessageId(), publishBody, txnContext);
- if (persistent)
- {
- BasicContentHeaderProperties b = new BasicContentHeaderProperties();
- //This is DeliveryMode.PERSISTENT
- b.setDeliveryMode((byte) 2);
- ContentHeaderBody cb = new ContentHeaderBody();
- cb.properties = b;
- msg.setContentHeaderBody(cb);
- }
- else
- {
- msg.setContentHeaderBody(new ContentHeaderBody());
- }
- // we increment the reference here since we are not delivering the messaging to any queues, which is where
- // the reference is normally incremented. The test is easier to construct if we have direct access to the
- // subscription
- msg.incrementReference();
- msg.routingComplete(_messageStore, _storeContext, factory);
- // we manually send the message to the subscription
- _subscription.send(msg, _queue);
- }
- }
-
- /**
- * Tests that the acknowledgements are correctly associated with a channel and
- * order is preserved when acks are enabled
- */
- public void testAckChannelAssociationTest() throws AMQException
- {
- _subscription = new SubscriptionImpl(5, _protocolSession, DEFAULT_CONSUMER_TAG, true);
- final int msgCount = 10;
- publishMessages(msgCount, true);
-
- UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap();
- assertTrue(map.size() == msgCount);
- assertTrue(_messageStore.getMessageMetaDataMap().size() == msgCount);
-
- Set<Long> deliveryTagSet = map.getDeliveryTags();
- int i = 1;
- for (long deliveryTag : deliveryTagSet)
- {
- assertTrue(deliveryTag == i);
- i++;
- UnacknowledgedMessage unackedMsg = map.get(deliveryTag);
- assertTrue(unackedMsg.queue == _queue);
- }
-
- assertTrue(map.size() == msgCount);
- assertTrue(_messageStore.getMessageMetaDataMap().size() == msgCount);
- }
-
- /**
- * Tests that in no-ack mode no messages are retained
- */
- public void testNoAckMode() throws AMQException
- {
- // false arg means no acks expected
- _subscription = new SubscriptionImpl(5, _protocolSession, DEFAULT_CONSUMER_TAG, false);
- final int msgCount = 10;
- publishMessages(msgCount);
-
- UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap();
- assertTrue(map.size() == 0);
- assertTrue(_messageStore.getMessageMetaDataMap().size() == 0);
- }
-
- /**
- * Tests that a single acknowledgement is handled correctly (i.e multiple flag not
- * set case)
- */
- public void testSingleAckReceivedTest() throws AMQException
- {
- _subscription = new SubscriptionImpl(5, _protocolSession, DEFAULT_CONSUMER_TAG, true);
- final int msgCount = 10;
- publishMessages(msgCount);
-
- _channel.acknowledgeMessage(5, false);
- UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap();
- assertTrue(map.size() == msgCount - 1);
-
- Set<Long> deliveryTagSet = map.getDeliveryTags();
- int i = 1;
- for (long deliveryTag : deliveryTagSet)
- {
- assertTrue(deliveryTag == i);
- UnacknowledgedMessage unackedMsg = map.get(deliveryTag);
- assertTrue(unackedMsg.queue == _queue);
- // 5 is the delivery tag of the message that *should* be removed
- if (++i == 5)
- {
- ++i;
- }
- }
- }
-
- /**
- * Tests that a single acknowledgement is handled correctly (i.e multiple flag not
- * set case)
- */
- public void testMultiAckReceivedTest() throws AMQException
- {
- _subscription = new SubscriptionImpl(5, _protocolSession, DEFAULT_CONSUMER_TAG, true);
- final int msgCount = 10;
- publishMessages(msgCount);
-
- _channel.acknowledgeMessage(5, true);
- UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap();
- assertTrue(map.size() == 5);
-
- Set<Long> deliveryTagSet = map.getDeliveryTags();
- int i = 1;
- for (long deliveryTag : deliveryTagSet)
- {
- assertTrue(deliveryTag == i + 5);
- UnacknowledgedMessage unackedMsg = map.get(deliveryTag);
- assertTrue(unackedMsg.queue == _queue);
- ++i;
- }
- }
-
- /**
- * Tests that a multiple acknowledgement is handled correctly. When ack'ing all pending msgs.
- */
- public void testMultiAckAllReceivedTest() throws AMQException
- {
- _subscription = new SubscriptionImpl(5, _protocolSession, DEFAULT_CONSUMER_TAG, true);
- final int msgCount = 10;
- publishMessages(msgCount);
-
- _channel.acknowledgeMessage(0, true);
- UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap();
- assertTrue(map.size() == 0);
-
- Set<Long> deliveryTagSet = map.getDeliveryTags();
- int i = 1;
- for (long deliveryTag : deliveryTagSet)
- {
- assertTrue(deliveryTag == i + 5);
- UnacknowledgedMessage unackedMsg = map.get(deliveryTag);
- assertTrue(unackedMsg.queue == _queue);
- ++i;
- }
- }
-
- public void testPrefetchHighLow() throws AMQException
- {
- int lowMark = 5;
- int highMark = 10;
-
- _subscription = new SubscriptionImpl(5, _protocolSession, DEFAULT_CONSUMER_TAG, true);
- _channel.setPrefetchLowMarkCount(lowMark);
- _channel.setPrefetchHighMarkCount(highMark);
-
- assertTrue(_channel.getPrefetchLowMarkCount() == lowMark);
- assertTrue(_channel.getPrefetchHighMarkCount() == highMark);
-
- publishMessages(highMark);
-
- // at this point we should have sent out only highMark messages
- // which have not bee received so will be queued up in the channel
- // which should be suspended
- assertTrue(_subscription.isSuspended());
- UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap();
- assertTrue(map.size() == highMark);
-
- //acknowledge messages so we are just above lowMark
- _channel.acknowledgeMessage(lowMark - 1, true);
-
- //we should still be suspended
- assertTrue(_subscription.isSuspended());
- assertTrue(map.size() == lowMark + 1);
-
- //acknowledge one more message
- _channel.acknowledgeMessage(lowMark, true);
-
- //and suspension should be lifted
- assertTrue(!_subscription.isSuspended());
-
- //pubilsh more msgs so we are just below the limit
- publishMessages(lowMark - 1);
-
- //we should not be suspended
- assertTrue(!_subscription.isSuspended());
-
- //acknowledge all messages
- _channel.acknowledgeMessage(0, true);
- try
- {
- Thread.sleep(3000);
- }
- catch (InterruptedException e)
- {
- _log.error("Error: " + e, e);
- }
- //map will be empty
- assertTrue(map.size() == 0);
- }
-
- public void testPrefetch() throws AMQException
- {
- _subscription = new SubscriptionImpl(5, _protocolSession, DEFAULT_CONSUMER_TAG, true);
- _channel.setPrefetchCount(5);
-
- assertTrue(_channel.getPrefetchCount() == 5);
-
- final int msgCount = 5;
- publishMessages(msgCount);
-
- // at this point we should have sent out only 5 messages with a further 5 queued
- // up in the channel which should now be suspended
- assertTrue(_subscription.isSuspended());
- UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap();
- assertTrue(map.size() == 5);
- _channel.acknowledgeMessage(5, true);
- assertTrue(!_subscription.isSuspended());
- try
- {
- Thread.sleep(3000);
- }
- catch (InterruptedException e)
- {
- _log.error("Error: " + e, e);
- }
- assertTrue(map.size() == 0);
- }
-
- public static junit.framework.Test suite()
- {
- return new junit.framework.TestSuite(AckTest.class);
- }
-}
diff --git a/java/systests/src/test/java/org/apache/qpid/server/queue/ConcurrencyTest.java b/java/systests/src/test/java/org/apache/qpid/server/queue/ConcurrencyTest.java
deleted file mode 100644
index e428b9ef60..0000000000
--- a/java/systests/src/test/java/org/apache/qpid/server/queue/ConcurrencyTest.java
+++ /dev/null
@@ -1,258 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.queue;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.server.handler.OnCurrentThreadExecutor;
-
-import java.util.*;
-import java.util.concurrent.Executor;
-
-/**
- * Tests delivery in the face of concurrent incoming _messages, subscription alterations
- * and attempts to asynchronously process queued _messages.
- */
-public class ConcurrencyTest extends MessageTestHelper
-{
- private final Random random = new Random();
-
- private final int numMessages = 1000;
-
- private final List<SubscriptionTestHelper> _subscribers = new ArrayList<SubscriptionTestHelper>();
- private final Set<Subscription> _active = new HashSet<Subscription>();
- private final List<AMQMessage> _messages = new ArrayList<AMQMessage>();
- private int next = 0;//index to next message to send
- private final List<AMQMessage> _received = Collections.synchronizedList(new ArrayList<AMQMessage>());
- private final Executor _executor = new OnCurrentThreadExecutor();
- private final List<Thread> _threads = new ArrayList<Thread>();
-
- private final SubscriptionSet _subscriptionMgr = new SubscriptionSet();
- private final DeliveryManager _deliveryMgr;
-
- private boolean isComplete;
- private boolean failed;
-
- public ConcurrencyTest() throws Exception
- {
- _deliveryMgr = new ConcurrentDeliveryManager(_subscriptionMgr, new AMQQueue(new AMQShortString("myQ"), false, new AMQShortString("guest"), false,
- new DefaultQueueRegistry()));
- }
-
- public void testConcurrent1() throws InterruptedException, AMQException
- {
- initSubscriptions(10);
- initMessages(numMessages);
- initThreads(1, 4, 4, 4);
- doRun();
- check();
- }
-
- public void testConcurrent2() throws InterruptedException, AMQException
- {
- initSubscriptions(10);
- initMessages(numMessages);
- initThreads(4, 2, 2, 2);
- doRun();
- check();
- }
-
- void check()
- {
- assertFalse("Failed", failed);
-
- _deliveryMgr.processAsync(_executor);
-
- assertEquals("Did not recieve the correct number of messages", _messages.size(), _received.size());
- for(int i = 0; i < _messages.size(); i++)
- {
- assertEquals("Wrong message at " + i, _messages.get(i), _received.get(i));
- }
- }
-
- void initSubscriptions(int subscriptions)
- {
- for(int i = 0; i < subscriptions; i++)
- {
- _subscribers.add(new SubscriptionTestHelper("Subscriber" + i, _received));
- }
- }
-
- void initMessages(int messages) throws AMQException
- {
- for(int i = 0; i < messages; i++)
- {
- _messages.add(message());
- }
- }
-
- void initThreads(int senders, int subscribers, int suspenders, int processors)
- {
- addThreads(senders, senders == 1 ? new Sender() : new OrderedSender());
- addThreads(subscribers, new Subscriber());
- addThreads(suspenders, new Suspender());
- addThreads(processors, new Processor());
- }
-
- void addThreads(int count, Runnable runner)
- {
- for(int i = 0; i < count; i++)
- {
- _threads.add(new Thread(runner, runner.toString()));
- }
- }
-
- void doRun() throws InterruptedException
- {
- for(Thread t : _threads)
- {
- t.start();
- }
-
- for(Thread t : _threads)
- {
- t.join();
- }
- }
-
- private void toggle(Subscription s)
- {
- synchronized (_active)
- {
- if (_active.contains(s))
- {
- _active.remove(s);
- Subscription result = _subscriptionMgr.removeSubscriber(s);
- assertTrue("Removed subscription " + result + " but trying to remove subscription " + s,
- result != null && result.equals(s));
- }
- else
- {
- _active.add(s);
- _subscriptionMgr.addSubscriber(s);
- }
- }
- }
-
- private AMQMessage nextMessage()
- {
- synchronized (_messages)
- {
- if (next < _messages.size())
- {
- return _messages.get(next++);
- }
- else
- {
- if (!_deliveryMgr.hasQueuedMessages()) {
- isComplete = true;
- }
- return null;
- }
- }
- }
-
- private boolean randomBoolean()
- {
- return random.nextBoolean();
- }
-
- private SubscriptionTestHelper randomSubscriber()
- {
- return _subscribers.get(random.nextInt(_subscribers.size()));
- }
-
- private class Sender extends Runner
- {
- void doRun() throws Throwable
- {
- AMQMessage msg = nextMessage();
- if (msg != null)
- {
- _deliveryMgr.deliver(null, new AMQShortString(toString()), msg);
- }
- }
- }
-
- private class OrderedSender extends Sender
- {
- synchronized void doRun() throws Throwable
- {
- super.doRun();
- }
- }
-
- private class Suspender extends Runner
- {
- void doRun() throws Throwable
- {
- randomSubscriber().setSuspended(randomBoolean());
- }
- }
-
- private class Subscriber extends Runner
- {
- void doRun() throws Throwable
- {
- toggle(randomSubscriber());
- }
- }
-
- private class Processor extends Runner
- {
- void doRun() throws Throwable
- {
- _deliveryMgr.processAsync(_executor);
- }
- }
-
- private abstract class Runner implements Runnable
- {
- public void run()
- {
- try
- {
- while (!stop())
- {
- doRun();
- }
- }
- catch (Throwable t)
- {
- failed = true;
- t.printStackTrace();
- }
- }
-
- abstract void doRun() throws Throwable;
-
- boolean stop()
- {
- return isComplete || failed;
- }
- }
-
- public static junit.framework.Test suite()
- {
- return new junit.framework.TestSuite(ConcurrencyTest.class);
- }
-
-}
diff --git a/java/systests/src/test/java/org/apache/qpid/server/queue/ConcurrentDeliveryManagerTest.java b/java/systests/src/test/java/org/apache/qpid/server/queue/ConcurrentDeliveryManagerTest.java
deleted file mode 100644
index 1943532a51..0000000000
--- a/java/systests/src/test/java/org/apache/qpid/server/queue/ConcurrentDeliveryManagerTest.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.queue;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.server.queue.ConcurrentDeliveryManager;
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.DefaultQueueRegistry;
-import org.apache.qpid.server.queue.DeliveryManagerTest;
-
-public class ConcurrentDeliveryManagerTest extends DeliveryManagerTest
-{
- public ConcurrentDeliveryManagerTest() throws Exception
- {
- try
- {
- System.setProperty("concurrentdeliverymanager","true");
- _mgr = new ConcurrentDeliveryManager(_subscriptions, new AMQQueue(new AMQShortString("myQ"), false, new AMQShortString("guest"), false,
- new DefaultQueueRegistry()));
- }
- catch (Throwable t)
- {
- t.printStackTrace();
- throw new AMQException("Could not initialise delivery manager", t);
- }
- }
-
- public static junit.framework.Test suite()
- {
- return new junit.framework.TestSuite(ConcurrentDeliveryManagerTest.class);
- }
-}
diff --git a/java/systests/src/test/java/org/apache/qpid/server/queue/DeliveryManagerTest.java b/java/systests/src/test/java/org/apache/qpid/server/queue/DeliveryManagerTest.java
deleted file mode 100644
index d88614298f..0000000000
--- a/java/systests/src/test/java/org/apache/qpid/server/queue/DeliveryManagerTest.java
+++ /dev/null
@@ -1,179 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.queue;
-
-import org.apache.qpid.server.handler.OnCurrentThreadExecutor;
-import org.apache.qpid.server.store.StoreContext;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQShortString;
-
-import junit.framework.TestSuite;
-
-abstract public class DeliveryManagerTest extends MessageTestHelper
-{
- protected final SubscriptionSet _subscriptions = new SubscriptionSet();
- protected DeliveryManager _mgr;
- protected StoreContext _storeContext = new StoreContext();
- private static final AMQShortString DEFAULT_QUEUE_NAME = new AMQShortString("Me");
-
- public DeliveryManagerTest() throws Exception
- {
- }
-
- public void testStartInQueueingMode() throws AMQException
- {
- AMQMessage[] messages = new AMQMessage[10];
- for (int i = 0; i < messages.length; i++)
- {
- messages[i] = message();
- }
- int batch = messages.length / 2;
-
- for (int i = 0; i < batch; i++)
- {
- _mgr.deliver(_storeContext, DEFAULT_QUEUE_NAME, messages[i]);
- }
-
- SubscriptionTestHelper s1 = new SubscriptionTestHelper("1");
- SubscriptionTestHelper s2 = new SubscriptionTestHelper("2");
- _subscriptions.addSubscriber(s1);
- _subscriptions.addSubscriber(s2);
-
- for (int i = batch; i < messages.length; i++)
- {
- _mgr.deliver(_storeContext, DEFAULT_QUEUE_NAME, messages[i]);
- }
-
- assertTrue(s1.getMessages().isEmpty());
- assertTrue(s2.getMessages().isEmpty());
-
- _mgr.processAsync(new OnCurrentThreadExecutor());
-
- assertEquals(messages.length / 2, s1.getMessages().size());
- assertEquals(messages.length / 2, s2.getMessages().size());
-
- for (int i = 0; i < messages.length; i++)
- {
- if (i % 2 == 0)
- {
- assertTrue(s1.getMessages().get(i / 2) == messages[i]);
- }
- else
- {
- assertTrue(s2.getMessages().get(i / 2) == messages[i]);
- }
- }
- }
-
- public void testStartInDirectMode() throws AMQException
- {
- AMQMessage[] messages = new AMQMessage[10];
- for (int i = 0; i < messages.length; i++)
- {
- messages[i] = message();
- }
- int batch = messages.length / 2;
-
- SubscriptionTestHelper s1 = new SubscriptionTestHelper("1");
- _subscriptions.addSubscriber(s1);
-
- for (int i = 0; i < batch; i++)
- {
- _mgr.deliver(_storeContext, DEFAULT_QUEUE_NAME, messages[i]);
- }
-
- assertEquals(batch, s1.getMessages().size());
- for (int i = 0; i < batch; i++)
- {
- assertTrue(messages[i] == s1.getMessages().get(i));
- }
- s1.getMessages().clear();
- assertEquals(0, s1.getMessages().size());
-
- s1.setSuspended(true);
- for (int i = batch; i < messages.length; i++)
- {
- _mgr.deliver(_storeContext, DEFAULT_QUEUE_NAME, messages[i]);
- }
-
- _mgr.processAsync(new OnCurrentThreadExecutor());
- assertEquals(0, s1.getMessages().size());
- s1.setSuspended(false);
-
- _mgr.processAsync(new OnCurrentThreadExecutor());
- assertEquals(messages.length - batch, s1.getMessages().size());
-
- for (int i = batch; i < messages.length; i++)
- {
- assertTrue(messages[i] == s1.getMessages().get(i - batch));
- }
-
- }
-
- public void testNoConsumers() throws AMQException
- {
- try
- {
- AMQMessage msg = message(true);
- _mgr.deliver(_storeContext, DEFAULT_QUEUE_NAME, msg);
- msg.checkDeliveredToConsumer();
- fail("expected exception did not occur");
- }
- catch (NoConsumersException m)
- {
- // ok
- }
- catch (Exception e)
- {
- fail("expected NoConsumersException, got " + e);
- }
- }
-
- public void testNoActiveConsumers() throws AMQException
- {
- try
- {
- SubscriptionTestHelper s = new SubscriptionTestHelper("A");
- _subscriptions.addSubscriber(s);
- s.setSuspended(true);
- AMQMessage msg = message(true);
- _mgr.deliver(_storeContext, DEFAULT_QUEUE_NAME, msg);
- msg.checkDeliveredToConsumer();
- fail("expected exception did not occur");
- }
- catch (NoConsumersException m)
- {
- // ok
- }
- catch (Exception e)
- {
- fail("expected NoConsumersException, got " + e);
- }
- }
-
- public static junit.framework.Test suite()
- {
- TestSuite suite = new TestSuite();
- suite.addTestSuite(ConcurrentDeliveryManagerTest.class);
- suite.addTestSuite(SynchronizedDeliveryManagerTest.class);
- return suite;
- }
-}
diff --git a/java/systests/src/test/java/org/apache/qpid/server/queue/MessageTestHelper.java b/java/systests/src/test/java/org/apache/qpid/server/queue/MessageTestHelper.java
deleted file mode 100644
index 6c48bb2bf4..0000000000
--- a/java/systests/src/test/java/org/apache/qpid/server/queue/MessageTestHelper.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.queue;
-
-import org.apache.qpid.framing.BasicPublishBody;
-import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.SkeletonMessageStore;
-import org.apache.qpid.server.store.StoreContext;
-import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.util.TestApplicationRegistry;
-import org.apache.qpid.server.txn.TransactionalContext;
-import org.apache.qpid.server.txn.NonTransactionalContext;
-import org.apache.qpid.server.RequiredDeliveryException;
-import org.apache.qpid.AMQException;
-
-import junit.framework.TestCase;
-
-import java.util.LinkedList;
-import java.util.HashSet;
-
-class MessageTestHelper extends TestCase
-{
- private final MessageStore _messageStore = new SkeletonMessageStore();
-
- private final StoreContext _storeContext = new StoreContext();
-
- private final TransactionalContext _txnContext = new NonTransactionalContext(_messageStore, _storeContext, null,
- new LinkedList<RequiredDeliveryException>(),
- new HashSet<Long>());
-
- MessageTestHelper() throws Exception
- {
- ApplicationRegistry.initialise(new TestApplicationRegistry());
- }
-
- AMQMessage message() throws AMQException
- {
- return message(false);
- }
-
- AMQMessage message(boolean immediate) throws AMQException
- {
- // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
- // TODO: Establish some way to determine the version for the test.
- BasicPublishBody publish = new BasicPublishBody((byte)8,
- (byte)0,
- null,
- immediate,
- false,
- null,
- 0);
-
- return new AMQMessage(_messageStore.getNewMessageId(), publish, _txnContext,
- new ContentHeaderBody());
- }
-
-}
diff --git a/java/systests/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java b/java/systests/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java
deleted file mode 100644
index 3586749f53..0000000000
--- a/java/systests/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.queue;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQDataBlock;
-import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.server.AMQChannel;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.store.MessageStore;
-
-import javax.security.sasl.SaslServer;
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * A protocol session that can be used for testing purposes.
- */
-public class MockProtocolSession implements AMQProtocolSession
-{
- private MessageStore _messageStore;
-
- private Map<Integer, AMQChannel> _channelMap = new HashMap<Integer, AMQChannel>();
-
- public MockProtocolSession(MessageStore messageStore)
- {
- _messageStore = messageStore;
- }
-
- public void dataBlockReceived(AMQDataBlock message) throws Exception
- {
- }
-
- public void writeFrame(AMQDataBlock frame)
- {
- }
-
- public AMQShortString getContextKey()
- {
- return null;
- }
-
- public void setContextKey(AMQShortString contextKey)
- {
- }
-
- public AMQChannel getChannel(int channelId)
- {
- AMQChannel channel = _channelMap.get(channelId);
- if (channel == null)
- {
- throw new IllegalArgumentException("Invalid channel id: " + channelId);
- }
- else
- {
- return channel;
- }
- }
-
- public void addChannel(AMQChannel channel)
- {
- if (channel == null)
- {
- throw new IllegalArgumentException("Channel must not be null");
- }
- else
- {
- _channelMap.put(channel.getChannelId(), channel);
- }
- }
-
- public void closeChannel(int channelId) throws AMQException
- {
- }
-
- public void removeChannel(int channelId)
- {
- _channelMap.remove(channelId);
- }
-
- public void initHeartbeats(int delay)
- {
- }
-
- public void closeSession() throws AMQException
- {
- }
-
- public Object getKey()
- {
- return null;
- }
-
- public String getLocalFQDN()
- {
- return null;
- }
-
- public SaslServer getSaslServer()
- {
- return null;
- }
-
- public void setSaslServer(SaslServer saslServer)
- {
- }
-
- public FieldTable getClientProperties()
- {
- return null;
- }
-
- public void setClientProperties(FieldTable clientProperties)
- {
- }
-}
diff --git a/java/systests/src/test/java/org/apache/qpid/server/queue/SubscriptionManagerTest.java b/java/systests/src/test/java/org/apache/qpid/server/queue/SubscriptionManagerTest.java
deleted file mode 100644
index d3ec3c11d4..0000000000
--- a/java/systests/src/test/java/org/apache/qpid/server/queue/SubscriptionManagerTest.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.queue;
-
-import junit.framework.TestCase;
-
-public class SubscriptionManagerTest extends TestCase
-{
- private final SubscriptionSet mgr = new SubscriptionSet();
-
- public void testBasicSubscriptionManagement()
- {
- assertTrue(mgr.isEmpty());
- assertFalse(mgr.hasActiveSubscribers());
- SubscriptionTestHelper s1 = new SubscriptionTestHelper("S1");
- mgr.addSubscriber(s1);
- assertFalse(mgr.isEmpty());
- assertTrue(mgr.hasActiveSubscribers());
-
- SubscriptionTestHelper s2 = new SubscriptionTestHelper("S2");
- mgr.addSubscriber(s2);
-
- s2.setSuspended(true);
- assertFalse(mgr.isEmpty());
- assertTrue(mgr.hasActiveSubscribers());
- assertTrue(s2.isSuspended());
- assertFalse(s1.isSuspended());
-
- s1.setSuspended(true);
- assertFalse(mgr.hasActiveSubscribers());
-
- mgr.removeSubscriber(new SubscriptionTestHelper("S1"));
- assertFalse(mgr.isEmpty());
- mgr.removeSubscriber(new SubscriptionTestHelper("S2"));
- assertTrue(mgr.isEmpty());
- }
-
- public void testRoundRobin()
- {
- SubscriptionTestHelper a = new SubscriptionTestHelper("A");
- SubscriptionTestHelper b = new SubscriptionTestHelper("B");
- SubscriptionTestHelper c = new SubscriptionTestHelper("C");
- SubscriptionTestHelper d = new SubscriptionTestHelper("D");
- mgr.addSubscriber(a);
- mgr.addSubscriber(b);
- mgr.addSubscriber(c);
- mgr.addSubscriber(d);
-
- for (int i = 0; i < 3; i++)
- {
- assertEquals(a, mgr.nextSubscriber(null));
- assertEquals(b, mgr.nextSubscriber(null));
- assertEquals(c, mgr.nextSubscriber(null));
- assertEquals(d, mgr.nextSubscriber(null));
- }
-
- c.setSuspended(true);
-
- for (int i = 0; i < 3; i++)
- {
- assertEquals(a, mgr.nextSubscriber(null));
- assertEquals(b, mgr.nextSubscriber(null));
- assertEquals(d, mgr.nextSubscriber(null));
- }
-
- mgr.removeSubscriber(a);
- d.setSuspended(true);
- c.setSuspended(false);
- Subscription e = new SubscriptionTestHelper("D");
- mgr.addSubscriber(e);
-
- for (int i = 0; i < 3; i++)
- {
- assertEquals(b, mgr.nextSubscriber(null));
- assertEquals(c, mgr.nextSubscriber(null));
- assertEquals(e, mgr.nextSubscriber(null));
- }
- }
-
- public static junit.framework.Test suite()
- {
- return new junit.framework.TestSuite(SubscriptionManagerTest.class);
- }
-}
diff --git a/java/systests/src/test/java/org/apache/qpid/server/queue/SubscriptionSetTest.java b/java/systests/src/test/java/org/apache/qpid/server/queue/SubscriptionSetTest.java
deleted file mode 100644
index bcf54693d3..0000000000
--- a/java/systests/src/test/java/org/apache/qpid/server/queue/SubscriptionSetTest.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.queue;
-
-import junit.framework.TestCase;
-
-public class SubscriptionSetTest extends TestCase
-{
- /**
- * A SubscriptionSet that counts the number of items scanned.
- */
- static class TestSubscriptionSet extends SubscriptionSet
- {
- private int scanned = 0;
-
- void resetScanned()
- {
- scanned = 0;
- }
-
- protected void subscriberScanned()
- {
- ++scanned;
- }
-
- int getScanned()
- {
- return scanned;
- }
- }
-
- final SubscriptionTestHelper sub1 = new SubscriptionTestHelper("1");
- final SubscriptionTestHelper sub2 = new SubscriptionTestHelper("2");
- final SubscriptionTestHelper sub3 = new SubscriptionTestHelper("3");
-
- final SubscriptionTestHelper suspendedSub1 = new SubscriptionTestHelper("sus1", true);
- final SubscriptionTestHelper suspendedSub2 = new SubscriptionTestHelper("sus2", true);
- final SubscriptionTestHelper suspendedSub3 = new SubscriptionTestHelper("sus3", true);
-
- public void testNextMessage()
- {
- SubscriptionSet ss = new SubscriptionSet();
- assertNull(ss.nextSubscriber(null));
- assertEquals(0, ss.getCurrentSubscriber());
-
- ss.addSubscriber(sub1);
- assertEquals(sub1, ss.nextSubscriber(null));
- assertEquals(1, ss.getCurrentSubscriber());
- assertEquals(sub1, ss.nextSubscriber(null));
- assertEquals(1, ss.getCurrentSubscriber());
-
- ss.addSubscriber(sub2);
- ss.addSubscriber(sub3);
-
- assertEquals(sub2, ss.nextSubscriber(null));
- assertEquals(2, ss.getCurrentSubscriber());
-
- assertEquals(sub3, ss.nextSubscriber(null));
- assertEquals(3, ss.getCurrentSubscriber());
- }
-
- public void testNextMessageWhenAllSuspended()
- {
- SubscriptionSet ss = createAllSuspendedSubscriptionSet();
- assertNull(ss.nextSubscriber(null));
- assertEquals(3, ss.getCurrentSubscriber());
-
- assertNull(ss.nextSubscriber(null));
- assertEquals(3, ss.getCurrentSubscriber());
- }
-
- private TestSubscriptionSet createAllSuspendedSubscriptionSet()
- {
- TestSubscriptionSet ss = new TestSubscriptionSet();
- ss.addSubscriber(suspendedSub1);
- ss.addSubscriber(suspendedSub2);
- ss.addSubscriber(suspendedSub3);
- return ss;
- }
-
- public void testNextMessageAfterRemove()
- {
- SubscriptionSet ss = new SubscriptionSet();
- ss.addSubscriber(suspendedSub1);
- ss.addSubscriber(suspendedSub2);
- ss.addSubscriber(sub3);
- assertEquals(sub3, ss.nextSubscriber(null));
- assertEquals(3, ss.getCurrentSubscriber());
-
- assertEquals(suspendedSub1, ss.removeSubscriber(suspendedSub1));
-
- assertEquals(sub3, ss.nextSubscriber(null)); // Current implementation handles OutOfBoundsException here.
- assertEquals(2, ss.getCurrentSubscriber());
- }
-
- public void testNextMessageOverScanning()
- {
- TestSubscriptionSet ss = new TestSubscriptionSet();
- SubscriptionTestHelper sub = new SubscriptionTestHelper("test");
- ss.addSubscriber(suspendedSub1);
- ss.addSubscriber(sub);
- ss.addSubscriber(suspendedSub3);
- assertEquals(sub, ss.nextSubscriber(null));
- assertEquals(2, ss.getCurrentSubscriber());
- assertEquals(2, ss.getScanned());
-
- ss.resetScanned();
- sub.setSuspended(true);
- assertNull(ss.nextSubscriber(null));
- assertEquals(3, ss.getCurrentSubscriber());
- // Current implementation overscans by one item here.
- assertEquals(ss.size() + 1, ss.getScanned());
- }
-
- public void testNextMessageOverscanWorstCase() {
- TestSubscriptionSet ss = createAllSuspendedSubscriptionSet();
- ss.nextSubscriber(null);
- // Scans the subscriptions twice.
- assertEquals(ss.size() * 2, ss.getScanned());
- }
-
- public static junit.framework.Test suite()
- {
- return new junit.framework.TestSuite(SubscriptionSetTest.class);
- }
-}
diff --git a/java/systests/src/test/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java b/java/systests/src/test/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java
deleted file mode 100644
index fea3c93280..0000000000
--- a/java/systests/src/test/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java
+++ /dev/null
@@ -1,123 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.queue;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Queue;
-
-public class SubscriptionTestHelper implements Subscription
-{
- private final List<AMQMessage> messages;
- private final Object key;
- private boolean isSuspended;
-
- public SubscriptionTestHelper(Object key)
- {
- this(key, new ArrayList<AMQMessage>());
- }
-
- public SubscriptionTestHelper(final Object key, final boolean isSuspended)
- {
- this(key);
- setSuspended(isSuspended);
- }
-
- SubscriptionTestHelper(Object key, List<AMQMessage> messages)
- {
- this.key = key;
- this.messages = messages;
- }
-
- List<AMQMessage> getMessages()
- {
- return messages;
- }
-
- public void send(AMQMessage msg, AMQQueue queue)
- {
- messages.add(msg);
- }
-
- public void setSuspended(boolean suspended)
- {
- isSuspended = suspended;
- }
-
- public boolean isSuspended()
- {
- return isSuspended;
- }
-
- public void queueDeleted(AMQQueue queue)
- {
- }
-
- public boolean hasFilters()
- {
- return false;
- }
-
- public boolean hasInterest(AMQMessage msg)
- {
- return true;
- }
-
- public Queue<AMQMessage> getPreDeliveryQueue()
- {
- return null;
- }
-
- public void enqueueForPreDelivery(AMQMessage msg)
- {
- //no-op
- }
-
- public boolean isAutoClose()
- {
- return false;
- }
-
- public void close()
- {
- //no-op
- }
-
- public boolean isBrowser()
- {
- return false; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public int hashCode()
- {
- return key.hashCode();
- }
-
- public boolean equals(Object o)
- {
- return o instanceof SubscriptionTestHelper && ((SubscriptionTestHelper) o).key.equals(key);
- }
-
- public String toString()
- {
- return key.toString();
- }
-}
diff --git a/java/systests/src/test/java/org/apache/qpid/server/queue/SynchronizedDeliveryManagerTest.java b/java/systests/src/test/java/org/apache/qpid/server/queue/SynchronizedDeliveryManagerTest.java
deleted file mode 100644
index 3c5aab0911..0000000000
--- a/java/systests/src/test/java/org/apache/qpid/server/queue/SynchronizedDeliveryManagerTest.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.queue;
-
-import org.apache.qpid.server.queue.SynchronizedDeliveryManager;
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.DefaultQueueRegistry;
-import org.apache.qpid.server.queue.DeliveryManagerTest;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQShortString;
-
-import junit.framework.TestSuite;
-
-public class SynchronizedDeliveryManagerTest extends DeliveryManagerTest
-{
- public SynchronizedDeliveryManagerTest() throws Exception
- {
- try
- {
- System.setProperty("concurrentdeliverymanager","false");
- _mgr = new SynchronizedDeliveryManager(_subscriptions, new AMQQueue(new AMQShortString("myQ"), false, new AMQShortString("guest"), false,
- new DefaultQueueRegistry()));
- }
- catch (Throwable t)
- {
- t.printStackTrace();
- throw new AMQException("Could not initialise delivery manager", t);
- }
- }
-
- public static junit.framework.Test suite()
- {
- TestSuite suite = new TestSuite();
- suite.addTestSuite(SynchronizedDeliveryManagerTest.class);
- return suite;
- }
-}
diff --git a/java/systests/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java b/java/systests/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java
deleted file mode 100644
index 34f70bd2db..0000000000
--- a/java/systests/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java
+++ /dev/null
@@ -1,123 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.store;
-
-import org.apache.commons.configuration.Configuration;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.ContentBody;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.MessageMetaData;
-import org.apache.qpid.server.queue.QueueRegistry;
-
-import java.util.List;
-import java.util.concurrent.atomic.AtomicLong;
-
-/**
- * A message store that does nothing. Designed to be used in tests that do not want to use any message store
- * functionality.
- */
-public class SkeletonMessageStore implements MessageStore
-{
- private final AtomicLong _messageId = new AtomicLong(1);
-
- public void configure(String base, Configuration config) throws Exception
- {
- }
-
- public void configure(QueueRegistry queueRegistry, String base, Configuration config) throws Exception
- {
- }
-
- public void close() throws Exception
- {
- }
-
- public void removeMessage(StoreContext s, long messageId)
- {
- }
-
- public void createQueue(AMQQueue queue) throws AMQException
- {
- }
-
- public void beginTran(StoreContext s) throws AMQException
- {
- }
-
- public boolean inTran(StoreContext sc)
- {
- return false;
- }
-
- public void commitTran(StoreContext storeContext) throws AMQException
- {
- }
-
- public void abortTran(StoreContext storeContext) throws AMQException
- {
- }
-
- public List<AMQQueue> createQueues() throws AMQException
- {
- return null;
- }
-
- public long getNewMessageId()
- {
- return _messageId.getAndIncrement();
- }
-
- public void storeContentBodyChunk(StoreContext sc, long messageId, int index, ContentBody contentBody) throws AMQException
- {
-
- }
-
- public void storeMessageMetaData(StoreContext sc, long messageId, MessageMetaData messageMetaData) throws AMQException
- {
-
- }
-
- public MessageMetaData getMessageMetaData(long messageId) throws AMQException
- {
- return null;
- }
-
- public ContentBody getContentBodyChunk(long messageId, int index) throws AMQException
- {
- return null;
- }
-
- public void removeQueue(AMQShortString name) throws AMQException
- {
-
- }
-
- public void enqueueMessage(StoreContext context, AMQShortString name, long messageId) throws AMQException
- {
-
- }
-
- public void dequeueMessage(StoreContext context, AMQShortString name, long messageId) throws AMQException
- {
-
- }
-}
diff --git a/java/systests/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java b/java/systests/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java
deleted file mode 100644
index e2500d9865..0000000000
--- a/java/systests/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.store;
-
-import junit.framework.TestCase;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.BasicContentHeaderProperties;
-import org.apache.qpid.framing.BasicPublishBody;
-import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.server.queue.AMQMessage;
-import org.apache.qpid.server.queue.MessageHandleFactory;
-import org.apache.qpid.server.txn.NonTransactionalContext;
-
-/**
- * Tests that reference counting works correctly with AMQMessage and the message store
- */
-public class TestReferenceCounting extends TestCase
-{
- private TestableMemoryMessageStore _store;
-
- private StoreContext _storeContext = new StoreContext();
-
- protected void setUp() throws Exception
- {
- super.setUp();
- _store = new TestableMemoryMessageStore();
- }
-
- /**
- * Check that when the reference count is decremented the message removes itself from the store
- */
- public void testMessageGetsRemoved() throws AMQException
- {
- createPersistentContentHeader();
- // TODO: fix hardcoded protocol version data
- AMQMessage message = new AMQMessage(_store.getNewMessageId(), new BasicPublishBody((byte)8,
- (byte)0,
- null,
- false,
- false,
- null,
- 0),
- new NonTransactionalContext(_store, _storeContext, null, null, null),
- createPersistentContentHeader());
- message.incrementReference();
- // we call routing complete to set up the handle
- message.routingComplete(_store, _storeContext, new MessageHandleFactory());
- assertTrue(_store.getMessageMetaDataMap().size() == 1);
- message.decrementReference(_storeContext);
- assertTrue(_store.getMessageMetaDataMap().size() == 0);
- }
-
- private ContentHeaderBody createPersistentContentHeader()
- {
- ContentHeaderBody chb = new ContentHeaderBody();
- BasicContentHeaderProperties bchp = new BasicContentHeaderProperties();
- bchp.setDeliveryMode((byte)2);
- chb.properties = bchp;
- return chb;
- }
-
- public void testMessageRemains() throws AMQException
- {
- // TODO: fix hardcoded protocol version data
- AMQMessage message = new AMQMessage(_store.getNewMessageId(), new BasicPublishBody((byte)8,
- (byte)0,
- null,
- false,
- false,
- null,
- 0),
- new NonTransactionalContext(_store, _storeContext, null, null, null),
- createPersistentContentHeader());
- message.incrementReference();
- // we call routing complete to set up the handle
- message.routingComplete(_store, _storeContext, new MessageHandleFactory());
- assertTrue(_store.getMessageMetaDataMap().size() == 1);
- message.incrementReference();
- message.decrementReference(_storeContext);
- assertTrue(_store.getMessageMetaDataMap().size() == 1);
- }
-
- public static junit.framework.Test suite()
- {
- return new junit.framework.TestSuite(TestReferenceCounting.class);
- }
-}
diff --git a/java/systests/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java b/java/systests/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java
deleted file mode 100644
index 9a649421dd..0000000000
--- a/java/systests/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.store;
-
-import org.apache.qpid.server.queue.MessageMetaData;
-import org.apache.qpid.framing.ContentBody;
-
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.List;
-
-/**
- * Adds some extra methods to the memory message store for testing purposes.
- */
-public class TestableMemoryMessageStore extends MemoryMessageStore
-{
- public TestableMemoryMessageStore()
- {
- _metaDataMap = new ConcurrentHashMap<Long, MessageMetaData>();
- _contentBodyMap = new ConcurrentHashMap<Long, List<ContentBody>>();
- }
-
- public ConcurrentMap<Long, MessageMetaData> getMessageMetaDataMap()
- {
- return _metaDataMap;
- }
-
- public ConcurrentMap<Long, List<ContentBody>> getContentBodyMap()
- {
- return _contentBodyMap;
- }
-}
diff --git a/java/systests/src/test/java/org/apache/qpid/server/txn/TxnBufferTest.java b/java/systests/src/test/java/org/apache/qpid/server/txn/TxnBufferTest.java
deleted file mode 100644
index 1d9e30c24e..0000000000
--- a/java/systests/src/test/java/org/apache/qpid/server/txn/TxnBufferTest.java
+++ /dev/null
@@ -1,297 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.txn;
-
-import junit.framework.TestCase;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.TestableMemoryMessageStore;
-import org.apache.qpid.server.store.StoreContext;
-
-import java.util.LinkedList;
-
-public class TxnBufferTest extends TestCase
-{
- private final LinkedList<MockOp> ops = new LinkedList<MockOp>();
-
- public void testCommit() throws AMQException
- {
- MockStore store = new MockStore();
-
- TxnBuffer buffer = new TxnBuffer();
- buffer.enlist(new MockOp().expectPrepare().expectCommit());
- //check relative ordering
- MockOp op = new MockOp().expectPrepare().expectPrepare().expectCommit().expectCommit();
- buffer.enlist(op);
- buffer.enlist(op);
- buffer.enlist(new MockOp().expectPrepare().expectCommit());
-
- buffer.commit(null);
-
- validateOps();
- store.validate();
- }
-
- public void testRollback() throws AMQException
- {
- MockStore store = new MockStore();
-
- TxnBuffer buffer = new TxnBuffer();
- buffer.enlist(new MockOp().expectRollback());
- buffer.enlist(new MockOp().expectRollback());
- buffer.enlist(new MockOp().expectRollback());
-
- buffer.rollback(null);
-
- validateOps();
- store.validate();
- }
-
- public void testCommitWithFailureDuringPrepare() throws AMQException
- {
- MockStore store = new MockStore();
- store.beginTran(null);
-
- TxnBuffer buffer = new TxnBuffer();
- buffer.enlist(new StoreMessageOperation(store));
- buffer.enlist(new MockOp().expectPrepare().expectUndoPrepare());
- buffer.enlist(new TxnTester(store));
- buffer.enlist(new MockOp().expectPrepare().expectUndoPrepare());
- buffer.enlist(new FailedPrepare());
- buffer.enlist(new MockOp());
-
- buffer.commit(null);
- validateOps();
- store.validate();
- }
-
- public void testCommitWithPersistance() throws AMQException
- {
- MockStore store = new MockStore();
- store.beginTran(null);
- store.expectCommit();
-
- TxnBuffer buffer = new TxnBuffer();
- buffer.enlist(new MockOp().expectPrepare().expectCommit());
- buffer.enlist(new MockOp().expectPrepare().expectCommit());
- buffer.enlist(new MockOp().expectPrepare().expectCommit());
- buffer.enlist(new StoreMessageOperation(store));
- buffer.enlist(new TxnTester(store));
-
- buffer.commit(null);
- validateOps();
- store.validate();
- }
-
- private void validateOps()
- {
- for (MockOp op : ops)
- {
- op.validate();
- }
- }
-
- public static junit.framework.Test suite()
- {
- return new junit.framework.TestSuite(TxnBufferTest.class);
- }
-
- class MockOp implements TxnOp
- {
- final Object PREPARE = "PREPARE";
- final Object COMMIT = "COMMIT";
- final Object UNDO_PREPARE = "UNDO_PREPARE";
- final Object ROLLBACK = "ROLLBACK";
-
- private final LinkedList expected = new LinkedList();
-
- MockOp()
- {
- ops.add(this);
- }
-
- public void prepare(StoreContext context)
- {
- assertEquals(expected.removeLast(), PREPARE);
- }
-
- public void commit(StoreContext context)
- {
- assertEquals(expected.removeLast(), COMMIT);
- }
-
- public void undoPrepare()
- {
- assertEquals(expected.removeLast(), UNDO_PREPARE);
- }
-
- public void rollback(StoreContext context)
- {
- assertEquals(expected.removeLast(), ROLLBACK);
- }
-
- private MockOp expect(Object optype)
- {
- expected.addFirst(optype);
- return this;
- }
-
- MockOp expectPrepare()
- {
- return expect(PREPARE);
- }
-
- MockOp expectCommit()
- {
- return expect(COMMIT);
- }
-
- MockOp expectUndoPrepare()
- {
- return expect(UNDO_PREPARE);
- }
-
- MockOp expectRollback()
- {
- return expect(ROLLBACK);
- }
-
- void validate()
- {
- assertEquals("Expected ops were not all invoked", new LinkedList(), expected);
- }
-
- void clear()
- {
- expected.clear();
- }
- }
-
- class MockStore extends TestableMemoryMessageStore
- {
- final Object BEGIN = "BEGIN";
- final Object ABORT = "ABORT";
- final Object COMMIT = "COMMIT";
-
- private final LinkedList expected = new LinkedList();
- private boolean inTran;
-
- public void beginTran(StoreContext context) throws AMQException
- {
- inTran = true;
- }
-
- public void commitTran(StoreContext context) throws AMQException
- {
- assertEquals(expected.removeLast(), COMMIT);
- inTran = false;
- }
-
- public void abortTran(StoreContext context) throws AMQException
- {
- assertEquals(expected.removeLast(), ABORT);
- inTran = false;
- }
-
- public boolean inTran(StoreContext context)
- {
- return inTran;
- }
-
- private MockStore expect(Object optype)
- {
- expected.addFirst(optype);
- return this;
- }
-
- MockStore expectBegin()
- {
- return expect(BEGIN);
- }
-
- MockStore expectCommit()
- {
- return expect(COMMIT);
- }
-
- MockStore expectAbort()
- {
- return expect(ABORT);
- }
-
- void clear()
- {
- expected.clear();
- }
-
- void validate()
- {
- assertEquals("Expected ops were not all invoked", new LinkedList(), expected);
- }
- }
-
- class NullOp implements TxnOp
- {
- public void prepare(StoreContext context) throws AMQException
- {
- }
- public void commit(StoreContext context)
- {
- }
- public void undoPrepare()
- {
- }
- public void rollback(StoreContext context)
- {
- }
- }
-
- class FailedPrepare extends NullOp
- {
- public void prepare() throws AMQException
- {
- throw new AMQException("Fail!");
- }
- }
-
- class TxnTester extends NullOp
- {
- private final MessageStore store;
-
- private final StoreContext context = new StoreContext();
-
- TxnTester(MessageStore store)
- {
- this.store = store;
- }
-
- public void prepare() throws AMQException
- {
- assertTrue("Expected prepare to be performed under txn", store.inTran(context));
- }
-
- public void commit()
- {
- assertTrue("Expected commit not to be performed under txn", !store.inTran(context));
- }
- }
-
-}
diff --git a/java/systests/src/test/java/org/apache/qpid/server/util/AveragedRun.java b/java/systests/src/test/java/org/apache/qpid/server/util/AveragedRun.java
deleted file mode 100644
index 1d17985ab5..0000000000
--- a/java/systests/src/test/java/org/apache/qpid/server/util/AveragedRun.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.util;
-
-import org.apache.qpid.server.util.TimedRun;
-
-import java.util.concurrent.Callable;
-import java.util.Collection;
-
-public class AveragedRun implements Callable<RunStats>
-{
- private final RunStats stats = new RunStats();
- private final TimedRun test;
- private final int iterations;
-
- public AveragedRun(TimedRun test, int iterations)
- {
- this.test = test;
- this.iterations = iterations;
- }
-
- public RunStats call() throws Exception
- {
- for (int i = 0; i < iterations; i++)
- {
- stats.record(test.call());
- }
- return stats;
- }
-
- public void run() throws Exception
- {
- System.out.println(test + ": " + call());
- }
-
- public String toString()
- {
- return test.toString();
- }
-
- static void run(Collection<AveragedRun> tests) throws Exception
- {
- for(AveragedRun test : tests)
- {
- test.run();
- }
- }
-}
diff --git a/java/systests/src/test/java/org/apache/qpid/server/util/RunStats.java b/java/systests/src/test/java/org/apache/qpid/server/util/RunStats.java
deleted file mode 100644
index ec67fc68b3..0000000000
--- a/java/systests/src/test/java/org/apache/qpid/server/util/RunStats.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.util;
-
-public class RunStats
-{
- private long min = Long.MAX_VALUE;
- private long max;
- private long total;
- private int count;
-
- public void record(long time)
- {
- max = Math.max(time, max);
- min = Math.min(time, min);
- total += time;
- count++;
- }
-
- public long getMin()
- {
- return min;
- }
-
- public long getMax()
- {
- return max;
- }
-
- public long getAverage()
- {
- return total / count;
- }
-
- public String toString()
- {
- return "avg=" + getAverage() + ", min=" + min + ", max=" + max;
- }
-}
diff --git a/java/systests/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java b/java/systests/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java
deleted file mode 100644
index f801daf27c..0000000000
--- a/java/systests/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.util;
-
-import org.apache.qpid.server.exchange.DefaultExchangeFactory;
-import org.apache.qpid.server.exchange.DefaultExchangeRegistry;
-import org.apache.qpid.server.exchange.ExchangeFactory;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
-import org.apache.qpid.server.management.ManagedObjectRegistry;
-import org.apache.qpid.server.management.NoopManagedObjectRegistry;
-import org.apache.qpid.server.queue.DefaultQueueRegistry;
-import org.apache.qpid.server.queue.QueueRegistry;
-import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.security.auth.AuthenticationManager;
-import org.apache.qpid.server.security.auth.NullAuthenticationManager;
-import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.TestableMemoryMessageStore;
-import org.apache.commons.configuration.Configuration;
-import org.apache.commons.configuration.MapConfiguration;
-
-import java.util.HashMap;
-
-public class TestApplicationRegistry extends ApplicationRegistry
-{
- private QueueRegistry _queueRegistry;
-
- private ExchangeRegistry _exchangeRegistry;
-
- private ExchangeFactory _exchangeFactory;
-
- private ManagedObjectRegistry _managedObjectRegistry;
-
- private AuthenticationManager _authenticationManager;
-
- private MessageStore _messageStore;
-
- public TestApplicationRegistry()
- {
- super(new MapConfiguration(new HashMap()));
- }
-
- public void initialise() throws Exception
- {
- _managedObjectRegistry = new NoopManagedObjectRegistry();
- _queueRegistry = new DefaultQueueRegistry();
- _exchangeFactory = new DefaultExchangeFactory();
- _exchangeRegistry = new DefaultExchangeRegistry(_exchangeFactory);
- _authenticationManager = new NullAuthenticationManager();
- _messageStore = new TestableMemoryMessageStore();
-
- _configuration.addProperty("heartbeat.delay", 10 * 60); // 10 minutes
- }
-
- public Configuration getConfiguration()
- {
- return _configuration;
- }
-
- public QueueRegistry getQueueRegistry()
- {
- return _queueRegistry;
- }
-
- public ExchangeRegistry getExchangeRegistry()
- {
- return _exchangeRegistry;
- }
-
- public ExchangeFactory getExchangeFactory()
- {
- return _exchangeFactory;
- }
-
- public ManagedObjectRegistry getManagedObjectRegistry()
- {
- return _managedObjectRegistry;
- }
-
- public AuthenticationManager getAuthenticationManager()
- {
- return _authenticationManager;
- }
-
- public MessageStore getMessageStore()
- {
- return _messageStore;
- }
-}
-
diff --git a/java/systests/src/test/java/org/apache/qpid/server/util/TimedRun.java b/java/systests/src/test/java/org/apache/qpid/server/util/TimedRun.java
deleted file mode 100644
index 1291380311..0000000000
--- a/java/systests/src/test/java/org/apache/qpid/server/util/TimedRun.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.util;
-
-import java.util.concurrent.Callable;
-
-public abstract class TimedRun implements Callable<Long>
-{
- private final String description;
-
- public TimedRun(String description)
- {
- this.description = description;
- }
-
- public Long call() throws Exception
- {
- setup();
- long start = System.currentTimeMillis();
- run();
- long stop = System.currentTimeMillis();
- teardown();
- return stop - start;
- }
-
- public String toString()
- {
- return description;
- }
-
- protected void setup() throws Exception{}
- protected void teardown() throws Exception{}
- protected abstract void run() throws Exception;
-}
diff --git a/java/systests/src/test/java/org/apache/qpid/test/VMBrokerSetup.java b/java/systests/src/test/java/org/apache/qpid/test/VMBrokerSetup.java
deleted file mode 100644
index e859fac4af..0000000000
--- a/java/systests/src/test/java/org/apache/qpid/test/VMBrokerSetup.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.test;
-
-import junit.extensions.TestSetup;
-import junit.framework.Test;
-
-import org.apache.qpid.client.transport.TransportConnection;
-
-public class VMBrokerSetup extends TestSetup
-{
- public VMBrokerSetup(Test t)
- {
- super(t);
- }
-
- protected void setUp() throws Exception
- {
- super.setUp();
- try
- {
- TransportConnection.createVMBroker(1);
- }
- catch (Exception e)
- {
- fail("Unable to create broker: " + e);
- }
- }
-
- protected void tearDown() throws Exception
- {
- TransportConnection.killVMBroker(1);
- super.tearDown();
- }
-}