summaryrefslogtreecommitdiff
path: root/java/systests/src/test
diff options
context:
space:
mode:
Diffstat (limited to 'java/systests/src/test')
-rw-r--r--java/systests/src/test/java/log4j.properties28
-rw-r--r--java/systests/src/test/java/org/apache/qpid/server/AMQBrokerManagerMBeanTest.java82
-rw-r--r--java/systests/src/test/java/org/apache/qpid/server/ack/TxAckTest.java183
-rw-r--r--java/systests/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java247
-rw-r--r--java/systests/src/test/java/org/apache/qpid/server/exchange/ExchangeMBeanTest.java127
-rw-r--r--java/systests/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java99
-rw-r--r--java/systests/src/test/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java149
-rw-r--r--java/systests/src/test/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java111
-rw-r--r--java/systests/src/test/java/org/apache/qpid/server/protocol/MockIoSession.java297
-rw-r--r--java/systests/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java215
-rw-r--r--java/systests/src/test/java/org/apache/qpid/server/queue/AckTest.java330
-rw-r--r--java/systests/src/test/java/org/apache/qpid/server/queue/ConcurrencyTest.java257
-rw-r--r--java/systests/src/test/java/org/apache/qpid/server/queue/ConcurrentDeliveryManagerTest.java50
-rw-r--r--java/systests/src/test/java/org/apache/qpid/server/queue/DeliveryManagerTest.java175
-rw-r--r--java/systests/src/test/java/org/apache/qpid/server/queue/MessageTestHelper.java85
-rw-r--r--java/systests/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java251
-rw-r--r--java/systests/src/test/java/org/apache/qpid/server/queue/SubscriptionManagerTest.java102
-rw-r--r--java/systests/src/test/java/org/apache/qpid/server/queue/SubscriptionSetTest.java144
-rw-r--r--java/systests/src/test/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java123
-rw-r--r--java/systests/src/test/java/org/apache/qpid/server/queue/SynchronizedDeliveryManagerTest.java54
-rw-r--r--java/systests/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java102
-rw-r--r--java/systests/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java67
-rw-r--r--java/systests/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java42
-rw-r--r--java/systests/src/test/java/org/apache/qpid/server/txn/TxnBufferTest.java295
-rw-r--r--java/systests/src/test/java/org/apache/qpid/server/util/AveragedRun.java66
-rw-r--r--java/systests/src/test/java/org/apache/qpid/server/util/RunStats.java57
-rw-r--r--java/systests/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java107
-rw-r--r--java/systests/src/test/java/org/apache/qpid/server/util/TimedRun.java52
-rw-r--r--java/systests/src/test/java/org/apache/qpid/test/VMBrokerSetup.java52
29 files changed, 0 insertions, 3949 deletions
diff --git a/java/systests/src/test/java/log4j.properties b/java/systests/src/test/java/log4j.properties
deleted file mode 100644
index 6d596d1d19..0000000000
--- a/java/systests/src/test/java/log4j.properties
+++ /dev/null
@@ -1,28 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-log4j.rootLogger=${root.logging.level}
-
-
-log4j.logger.org.apache.qpid=${amqj.logging.level}, console
-log4j.additivity.org.apache.qpid=false
-
-log4j.appender.console=org.apache.log4j.ConsoleAppender
-log4j.appender.console.Threshold=all
-log4j.appender.console.layout=org.apache.log4j.PatternLayout
-log4j.appender.console.layout.ConversionPattern=%t %d %p [%c{4}] %m%n
diff --git a/java/systests/src/test/java/org/apache/qpid/server/AMQBrokerManagerMBeanTest.java b/java/systests/src/test/java/org/apache/qpid/server/AMQBrokerManagerMBeanTest.java
deleted file mode 100644
index 21ad1b6a7f..0000000000
--- a/java/systests/src/test/java/org/apache/qpid/server/AMQBrokerManagerMBeanTest.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.qpid.server;
-
-import junit.framework.TestCase;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
-import org.apache.qpid.server.management.ManagedBroker;
-import org.apache.qpid.server.queue.QueueRegistry;
-import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.registry.IApplicationRegistry;
-
-public class AMQBrokerManagerMBeanTest extends TestCase
-{
- private QueueRegistry _queueRegistry;
- private ExchangeRegistry _exchangeRegistry;
-
- public void testExchangeOperations() throws Exception
- {
- String exchange1 = "testExchange1_" + System.currentTimeMillis();
- String exchange2 = "testExchange2_" + System.currentTimeMillis();
- String exchange3 = "testExchange3_" + System.currentTimeMillis();
-
- assertTrue(_exchangeRegistry.getExchange(exchange1) == null);
- assertTrue(_exchangeRegistry.getExchange(exchange2) == null);
- assertTrue(_exchangeRegistry.getExchange(exchange3) == null);
-
- ManagedBroker mbean = new AMQBrokerManagerMBean();
- mbean.createNewExchange(exchange1,"direct",false, false);
- mbean.createNewExchange(exchange2,"topic",false, false);
- mbean.createNewExchange(exchange3,"headers",false, false);
-
- assertTrue(_exchangeRegistry.getExchange(exchange1) != null);
- assertTrue(_exchangeRegistry.getExchange(exchange2) != null);
- assertTrue(_exchangeRegistry.getExchange(exchange3) != null);
-
- mbean.unregisterExchange(exchange1);
- mbean.unregisterExchange(exchange2);
- mbean.unregisterExchange(exchange3);
-
- assertTrue(_exchangeRegistry.getExchange(exchange1) == null);
- assertTrue(_exchangeRegistry.getExchange(exchange2) == null);
- assertTrue(_exchangeRegistry.getExchange(exchange3) == null);
- }
-
- public void testQueueOperations() throws Exception
- {
- String queueName = "testQueue_" + System.currentTimeMillis();
- ManagedBroker mbean = new AMQBrokerManagerMBean();
-
- assertTrue(_queueRegistry.getQueue(queueName) == null);
-
- mbean.createNewQueue(queueName, false, "test", true);
- assertTrue(_queueRegistry.getQueue(queueName) != null);
-
- mbean.deleteQueue(queueName);
- assertTrue(_queueRegistry.getQueue(queueName) == null);
- }
-
- @Override
- protected void setUp() throws Exception
- {
- super.setUp();
- IApplicationRegistry appRegistry = ApplicationRegistry.getInstance();
- _queueRegistry = appRegistry.getQueueRegistry();
- _exchangeRegistry = appRegistry.getExchangeRegistry();
- }
-}
diff --git a/java/systests/src/test/java/org/apache/qpid/server/ack/TxAckTest.java b/java/systests/src/test/java/org/apache/qpid/server/ack/TxAckTest.java
deleted file mode 100644
index 1a15ca7561..0000000000
--- a/java/systests/src/test/java/org/apache/qpid/server/ack/TxAckTest.java
+++ /dev/null
@@ -1,183 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.ack;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.server.queue.AMQMessage;
-import org.apache.qpid.server.store.TestableMemoryMessageStore;
-
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.List;
-
-import junit.framework.TestCase;
-
-public class TxAckTest extends TestCase
-{
- private Scenario individual;
- private Scenario multiple;
- private Scenario combined;
-
- protected void setUp() throws Exception
- {
- super.setUp();
-
- //ack only 5th msg
- individual = new Scenario(10, Arrays.asList(5l), Arrays.asList(1l, 2l, 3l, 4l, 6l, 7l, 8l, 9l, 10l));
- individual.update(5, false);
-
- //ack all up to and including 5th msg
- multiple = new Scenario(10, Arrays.asList(1l, 2l, 3l, 4l, 5l), Arrays.asList(6l, 7l, 8l, 9l, 10l));
- multiple.update(5, true);
-
- //leave only 8th and 9th unacked
- combined = new Scenario(10, Arrays.asList(1l, 2l, 3l, 4l, 5l, 6l, 7l, 10l), Arrays.asList(8l, 9l));
- combined.update(3, false);
- combined.update(5, true);
- combined.update(7, true);
- combined.update(2, true);//should be ignored
- combined.update(1, false);//should be ignored
- combined.update(10, false);
- }
-
- public void testPrepare() throws AMQException
- {
- individual.prepare();
- multiple.prepare();
- combined.prepare();
- }
-
- public void testUndoPrepare() throws AMQException
- {
- individual.undoPrepare();
- multiple.undoPrepare();
- combined.undoPrepare();
- }
-
- public void testCommit() throws AMQException
- {
- individual.commit();
- multiple.commit();
- combined.commit();
- }
-
- public static junit.framework.Test suite()
- {
- return new junit.framework.TestSuite(TxAckTest.class);
- }
-
- private class Scenario
- {
- private final LinkedHashMap<Long, UnacknowledgedMessage> _messages = new LinkedHashMap<Long, UnacknowledgedMessage>();
- private final UnacknowledgedMessageMap _map = new UnacknowledgedMessageMapImpl(_messages, _messages);
- private final TxAck _op = new TxAck(_map);
- private final List<Long> _acked;
- private final List<Long> _unacked;
-
- Scenario(int messageCount, List<Long> acked, List<Long> unacked)
- {
- for(int i = 0; i < messageCount; i++)
- {
- long deliveryTag = i + 1;
- _messages.put(deliveryTag, new UnacknowledgedMessage(null, new TestMessage(deliveryTag), null, deliveryTag));
- }
- _acked = acked;
- _unacked = unacked;
- }
-
- void update(long deliverytag, boolean multiple)
- {
- _op.update(deliverytag, multiple);
- }
-
- private void assertCount(List<Long> tags, int expected)
- {
- for(long tag : tags)
- {
- UnacknowledgedMessage u = _messages.get(tag);
- assertTrue("Message not found for tag " + tag, u != null);
- ((TestMessage) u.message).assertCountEquals(expected);
- }
- }
-
- void prepare() throws AMQException
- {
- _op.consolidate();
- _op.prepare();
-
- assertCount(_acked, -1);
- assertCount(_unacked, 0);
-
- }
- void undoPrepare()
- {
- _op.consolidate();
- _op.undoPrepare();
-
- assertCount(_acked, 1);
- assertCount(_unacked, 0);
- }
-
- void commit()
- {
- _op.consolidate();
- _op.commit();
-
-
- //check acked messages are removed from map
- HashSet<Long> keys = new HashSet<Long>(_messages.keySet());
- keys.retainAll(_acked);
- assertTrue("Expected messages with following tags to have been removed from map: " + keys, keys.isEmpty());
- //check unacked messages are still in map
- keys = new HashSet<Long>(_unacked);
- keys.removeAll(_messages.keySet());
- assertTrue("Expected messages with following tags to still be in map: " + keys, keys.isEmpty());
- }
- }
-
- private class TestMessage extends AMQMessage
- {
- private final long _tag;
- private int _count;
-
- TestMessage(long tag)
- {
- super(new TestableMemoryMessageStore(), null);
- _tag = tag;
- }
-
- public void incrementReference()
- {
- _count++;
- }
-
- public void decrementReference()
- {
- _count--;
- }
-
- void assertCountEquals(int expected)
- {
- assertEquals("Wrong count for message with tag " + _tag, expected, _count);
- }
- }
-}
diff --git a/java/systests/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java b/java/systests/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
deleted file mode 100644
index 6320a2c1be..0000000000
--- a/java/systests/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
+++ /dev/null
@@ -1,247 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.exchange;
-
-import junit.framework.TestCase;
-
-import org.apache.mina.common.ByteBuffer;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.client.message.MessageHeaders;
-import org.apache.qpid.framing.*;
-import org.apache.qpid.server.queue.AMQMessage;
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.SkeletonMessageStore;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
-public class AbstractHeadersExchangeTestBase extends TestCase
-{
- private final HeadersExchange exchange = new HeadersExchange();
- protected final Set<TestQueue> queues = new HashSet<TestQueue>();
- private int count;
-
- public void testDoNothing()
- {
- // this is here only to make junit under Eclipse happy
- }
-
- protected TestQueue bindDefault(String... bindings) throws AMQException
- {
- return bind("Queue" + (++count), bindings);
- }
-
- protected TestQueue bind(String queueName, String... bindings) throws AMQException
- {
- return bind(queueName, getHeaders(bindings));
- }
-
- protected TestQueue bind(String queue, FieldTable bindings) throws AMQException
- {
- return bind(new TestQueue(queue), bindings);
- }
-
- protected TestQueue bind(TestQueue queue, String... bindings) throws AMQException
- {
- return bind(queue, getHeaders(bindings));
- }
-
- protected TestQueue bind(TestQueue queue, FieldTable bindings) throws AMQException
- {
- queues.add(queue);
- exchange.registerQueue(null, queue, bindings);
- return queue;
- }
-
-
- protected void route(Message m) throws AMQException
- {
- m.route(exchange);
- }
-
- protected void routeAndTest(Message m, TestQueue... expected) throws AMQException
- {
- routeAndTest(m, false, Arrays.asList(expected));
- }
-
- protected void routeAndTest(Message m, boolean expectReturn, TestQueue... expected) throws AMQException
- {
- routeAndTest(m, expectReturn, Arrays.asList(expected));
- }
-
- protected void routeAndTest(Message m, List<TestQueue> expected) throws AMQException
- {
- routeAndTest(m, false, expected);
- }
-
- protected void routeAndTest(Message m, boolean expectReturn, List<TestQueue> expected) throws AMQException
- {
- try
- {
- route(m);
- assertFalse("Expected "+m+" to be returned due to manadatory flag, and lack of routing",expectReturn);
- for (TestQueue q : queues)
- {
- if (expected.contains(q))
- {
- assertTrue("Expected " + m + " to be delivered to " + q, m.isInQueue(q));
- //assert m.isInQueue(q) : "Expected " + m + " to be delivered to " + q;
- }
- else
- {
- assertFalse("Did not expect " + m + " to be delivered to " + q, m.isInQueue(q));
- //assert !m.isInQueue(q) : "Did not expect " + m + " to be delivered to " + q;
- }
- }
- }
-
- catch (NoRouteException ex)
- {
- assertTrue("Expected "+m+" not to be returned",expectReturn);
- }
-
- }
-
- static FieldTable getHeaders(String... entries)
- {
- FieldTable headers = FieldTableFactory.newFieldTable();
- for (String s : entries)
- {
- String[] parts = s.split("=", 2);
- headers.put(parts[0], parts.length > 1 ? parts[1] : "");
- }
- return headers;
- }
-
- static class TestQueue extends AMQQueue
- {
- final List<HeadersExchangeTest.Message> messages = new ArrayList<HeadersExchangeTest.Message>();
-
- public TestQueue(String name) throws AMQException
- {
- super(name, false, "test", true, ApplicationRegistry.getInstance().getQueueRegistry());
- }
-
- public void deliver(AMQMessage msg) throws AMQException
- {
- messages.add(new HeadersExchangeTest.Message(msg));
- }
- }
-
- /**
- * Just add some extra utility methods to AMQMessage to aid testing.
- */
- static class Message extends AMQMessage
- {
- private static MessageStore _messageStore = new SkeletonMessageStore();
-
- Message(String id, String... headers) throws AMQException
- {
- this(id, getHeaders(headers));
- }
-
- Message(String id, FieldTable headers) throws AMQException
- {
- this(_messageStore, getMessageTransferBody(id,headers), new ArrayList());
- }
-
- private static MessageTransferBody getMessageTransferBody(String id,FieldTable headers){
- MessageHeaders messageHeaders = new MessageHeaders();
- MessageTransferBody methodBody = MessageTransferBody.createMethodBody(
- (byte)0, (byte)9, // AMQP version (major, minor)
- messageHeaders.getAppId(), // String appId
- headers, // FieldTable applicationHeaders
- new Content(), // Content body
- messageHeaders.getEncoding(), // String contentEncoding
- messageHeaders.getContentType(), // String contentType
- messageHeaders.getCorrelationId(), // String correlationId
- (short)1, // short deliveryMode
- "someExchange", // String destination
- "someExchange", // String exchange
- messageHeaders.getExpiration(), // long expiration
- false, // boolean immediate
- false, // boolean mandatory
- null, // String messageId
- (short)0, // short priority
- false, // boolean redelivered
- messageHeaders.getReplyTo(), // String replyTo
- id, // String routingKey
- new String("abc123").getBytes(), // byte[] securityToken
- 0, // int ticket
- messageHeaders.getTimestamp(), // long timestamp
- messageHeaders.getTransactionId(), // String transactionId
- 0, // long ttl
- messageHeaders.getUserId()); // String userId
-
- return methodBody;
- }
-
- Message(MessageStore store, MessageTransferBody transferBody, List<ByteBuffer> contents) throws AMQException
- {
- super(_messageStore, transferBody, contents);
- }
-
- private Message(AMQMessage msg) throws AMQException
- {
- super(msg);
- }
-
- void route(Exchange exchange) throws AMQException
- {
- exchange.route(this);
- }
-
- boolean isInQueue(TestQueue queue)
- {
- return queue.messages.contains(this);
- }
-
- public int hashCode()
- {
- return getKey().hashCode();
- }
-
- public boolean equals(Object o)
- {
- return o instanceof HeadersExchangeTest.Message && equals((HeadersExchangeTest.Message) o);
- }
-
- private boolean equals(HeadersExchangeTest.Message m)
- {
- return getKey().equals(m.getKey());
- }
-
- public String toString()
- {
- return getKey().toString();
- }
-
- private Object getKey()
- {
- return this.getTransferBody().getRoutingKey();
- }
- }
-}
diff --git a/java/systests/src/test/java/org/apache/qpid/server/exchange/ExchangeMBeanTest.java b/java/systests/src/test/java/org/apache/qpid/server/exchange/ExchangeMBeanTest.java
deleted file mode 100644
index bb88d2e8d0..0000000000
--- a/java/systests/src/test/java/org/apache/qpid/server/exchange/ExchangeMBeanTest.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.qpid.server.exchange;
-
-import junit.framework.TestCase;
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.QueueRegistry;
-import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.management.ManagedObject;
-
-import javax.management.openmbean.CompositeData;
-import javax.management.openmbean.TabularData;
-import java.util.ArrayList;
-
-/**
- * Unit test class for testing different Exchange MBean operations
- */
-public class ExchangeMBeanTest extends TestCase
-{
- private AMQQueue _queue;
- private QueueRegistry _queueRegistry;
-
- /**
- * Test for direct exchange mbean
- * @throws Exception
- */
-
- public void testDirectExchangeMBean() throws Exception
- {
- DestNameExchange exchange = new DestNameExchange();
- exchange.initialise("amq.direct", false, 0, true);
- ManagedObject managedObj = exchange.getManagedObject();
- ManagedExchange mbean = (ManagedExchange)managedObj;
-
- mbean.createNewBinding(_queue.getName(), "binding1");
- mbean.createNewBinding(_queue.getName(), "binding2");
-
- TabularData data = mbean.bindings();
- ArrayList<CompositeData> list = new ArrayList<CompositeData>(data.values());
- assertTrue(list.size() == 2);
-
- // test general exchange properties
- assertEquals(mbean.getName(), "amq.direct");
- assertEquals(mbean.getExchangeType(), "direct");
- assertTrue(mbean.getTicketNo() == 0);
- assertTrue(!mbean.isDurable());
- assertTrue(mbean.isAutoDelete());
- }
-
- /**
- * Test for "topic" exchange mbean
- * @throws Exception
- */
-
- public void testTopicExchangeMBean() throws Exception
- {
- DestWildExchange exchange = new DestWildExchange();
- exchange.initialise("amq.topic", false, 0, true);
- ManagedObject managedObj = exchange.getManagedObject();
- ManagedExchange mbean = (ManagedExchange)managedObj;
-
- mbean.createNewBinding(_queue.getName(), "binding1");
- mbean.createNewBinding(_queue.getName(), "binding2");
-
- TabularData data = mbean.bindings();
- ArrayList<CompositeData> list = new ArrayList<CompositeData>(data.values());
- assertTrue(list.size() == 2);
-
- // test general exchange properties
- assertEquals(mbean.getName(), "amq.topic");
- assertEquals(mbean.getExchangeType(), "topic");
- assertTrue(mbean.getTicketNo() == 0);
- assertTrue(!mbean.isDurable());
- assertTrue(mbean.isAutoDelete());
- }
-
- /**
- * Test for "Headers" exchange mbean
- * @throws Exception
- */
-
- public void testHeadersExchangeMBean() throws Exception
- {
- HeadersExchange exchange = new HeadersExchange();
- exchange.initialise("amq.headers", false, 0, true);
- ManagedObject managedObj = exchange.getManagedObject();
- ManagedExchange mbean = (ManagedExchange)managedObj;
-
- mbean.createNewBinding(_queue.getName(), "key1=binding1,key2=binding2");
- mbean.createNewBinding(_queue.getName(), "key3=binding3");
-
- TabularData data = mbean.bindings();
- ArrayList<CompositeData> list = new ArrayList<CompositeData>(data.values());
- assertTrue(list.size() == 2);
-
- // test general exchange properties
- assertEquals(mbean.getName(), "amq.headers");
- assertEquals(mbean.getExchangeType(), "headers");
- assertTrue(mbean.getTicketNo() == 0);
- assertTrue(!mbean.isDurable());
- assertTrue(mbean.isAutoDelete());
- }
-
- @Override
- protected void setUp() throws Exception
- {
- super.setUp();
- _queueRegistry = ApplicationRegistry.getInstance().getQueueRegistry();
- _queue = new AMQQueue("testQueue", false, "ExchangeMBeanTest", false, _queueRegistry);
- _queueRegistry.registerQueue(_queue);
- }
-}
diff --git a/java/systests/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java b/java/systests/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java
deleted file mode 100644
index 00a645628f..0000000000
--- a/java/systests/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.exchange;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.MessageTransferBody;
-import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.util.TestApplicationRegistry;
-
-public class HeadersExchangeTest extends AbstractHeadersExchangeTestBase
-{
- protected void setUp() throws Exception
- {
- super.setUp();
- ApplicationRegistry.initialise(new TestApplicationRegistry());
- }
-
- public void testSimple() throws AMQException
- {
- TestQueue q1 = bindDefault("F0000");
- TestQueue q2 = bindDefault("F0000=Aardvark");
- TestQueue q3 = bindDefault("F0001");
- TestQueue q4 = bindDefault("F0001=Bear");
- TestQueue q5 = bindDefault("F0000", "F0001");
- TestQueue q6 = bindDefault("F0000=Aardvark", "F0001=Bear");
- TestQueue q7 = bindDefault("F0000", "F0001=Bear");
- TestQueue q8 = bindDefault("F0000=Aardvark", "F0001");
-
- routeAndTest(new Message("Message1", "F0000"), q1);
- routeAndTest(new Message("Message2", "F0000=Aardvark"), q1, q2);
- routeAndTest(new Message("Message3", "F0000=Aardvark", "F0001"), q1, q2, q3, q5, q8);
- routeAndTest(new Message("Message4", "F0000", "F0001=Bear"), q1, q3, q4, q5, q7);
- routeAndTest(new Message("Message5", "F0000=Aardvark", "F0001=Bear"),
- q1, q2, q3, q4, q5, q6, q7, q8);
- routeAndTest(new Message("Message6", "F0002"));
-
- Message m7 = new Message("Message7", "XXXXX");
-
- MessageTransferBody tb7 = m7.getTransferBody();
- tb7.mandatory = true;
- routeAndTest(m7,true);
-
- Message m8 = new Message("Message8", "F0000");
- MessageTransferBody tb8 = m8.getTransferBody();
- tb8.mandatory = true;
- routeAndTest(m8,false,q1);
- }
-
- public void testAny() throws AMQException
- {
- TestQueue q1 = bindDefault("F0000", "F0001", "X-match=any");
- TestQueue q2 = bindDefault("F0000=Aardvark", "F0001=Bear", "X-match=any");
- TestQueue q3 = bindDefault("F0000", "F0001=Bear", "X-match=any");
- TestQueue q4 = bindDefault("F0000=Aardvark", "F0001", "X-match=any");
- TestQueue q6 = bindDefault("F0000=Apple", "F0001", "X-match=any");
-
- routeAndTest(new Message("Message1", "F0000"), q1, q3);
- routeAndTest(new Message("Message2", "F0000=Aardvark"), q1, q2, q3, q4);
- routeAndTest(new Message("Message3", "F0000=Aardvark", "F0001"), q1, q2, q3, q4, q6);
- routeAndTest(new Message("Message4", "F0000", "F0001=Bear"), q1, q2, q3, q4, q6);
- routeAndTest(new Message("Message5", "F0000=Aardvark", "F0001=Bear"), q1, q2, q3, q4, q6);
- routeAndTest(new Message("Message6", "F0002"));
- }
-
- public void testMandatory() throws AMQException
- {
- bindDefault("F0000");
- Message m1 = new Message("Message1", "XXXXX");
- Message m2 = new Message("Message2", "F0000");
- MessageTransferBody tb1 = m1.getTransferBody();
- tb1.mandatory = true;
- MessageTransferBody tb2 = m2.getTransferBody();
- tb2.mandatory = true;
- routeAndTest(m1,true);
- }
-
- public static junit.framework.Test suite()
- {
- return new junit.framework.TestSuite(HeadersExchangeTest.class);
- }
-}
diff --git a/java/systests/src/test/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java b/java/systests/src/test/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java
deleted file mode 100644
index 546c61eda0..0000000000
--- a/java/systests/src/test/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java
+++ /dev/null
@@ -1,149 +0,0 @@
-package org.apache.qpid.server.exchange;
-
-import junit.framework.TestCase;
-import org.apache.log4j.Logger;
-import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.util.TestApplicationRegistry;
-import org.apache.qpid.client.*;
-import org.apache.qpid.client.transport.TransportConnection;
-import org.apache.qpid.url.AMQBindingURL;
-import org.apache.qpid.url.BindingURL;
-import org.apache.qpid.exchange.ExchangeDefaults;
-import org.apache.qpid.framing.FieldTable;
-
-import javax.jms.*;
-import java.util.List;
-import java.util.Collections;
-import java.util.ArrayList;
-
-public class ReturnUnroutableMandatoryMessageTest extends TestCase implements ExceptionListener
-{
- private static final Logger _logger = Logger.getLogger(ReturnUnroutableMandatoryMessageTest.class);
-
- private final List<Message> _bouncedMessageList = Collections.synchronizedList(new ArrayList<Message>());
-
- static
- {
- String workdir = System.getProperty("QPID_WORK");
- if (workdir == null || workdir.equals(""))
- {
- String tempdir = System.getProperty("java.io.tmpdir");
- System.out.println("QPID_WORK not set using tmp directory: " + tempdir);
- System.setProperty("QPID_WORK", tempdir);
- }
-// DOMConfigurator.configure("../broker/etc/log4j.xml");
- }
-
- protected void setUp() throws Exception
- {
- super.setUp();
- TransportConnection.createVMBroker(1);
- ApplicationRegistry.initialise(new TestApplicationRegistry(), 1);
- }
-
- protected void tearDown() throws Exception
- {
- super.tearDown();
- TransportConnection.killAllVMBrokers();
- }
-
- /**
- * Tests that mandatory message which are not routable are returned to the producer
- *
- * @throws Exception
- */
- public void testReturnUnroutableMandatoryMessage() throws Exception
- {
- _bouncedMessageList.clear();
- Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test");
-
-
- AMQSession consumerSession = (AMQSession) con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
-
- AMQHeadersExchange queue = new AMQHeadersExchange(new AMQBindingURL(ExchangeDefaults.HEADERS_EXCHANGE_CLASS + "://" + ExchangeDefaults.HEADERS_EXCHANGE_NAME + "/test/queue1?" + BindingURL.OPTION_ROUTING_KEY + "='F0000=1'"));
- FieldTable ft = new FieldTable();
- ft.setString("F1000", "1");
- MessageConsumer consumer = consumerSession.createConsumer(queue, AMQSession.DEFAULT_PREFETCH_LOW_MARK, AMQSession.DEFAULT_PREFETCH_HIGH_MARK, false, false, (String) null, ft);
-
- //force synch to ensure the consumer has resulted in a bound queue
- ((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.HEADERS_EXCHANGE_NAME, ExchangeDefaults.HEADERS_EXCHANGE_CLASS);
-
- Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "/test");
-
- con2.setExceptionListener(this);
- AMQSession producerSession = (AMQSession) con2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
-
- // Need to start the "producer" connection in order to receive bounced messages
- _logger.info("Starting producer connection");
- con2.start();
-
-
- MessageProducer nonMandatoryProducer = producerSession.createProducer(queue, false, false);
- MessageProducer mandatoryProducer = producerSession.createProducer(queue);
-
- // First test - should neither be bounced nor routed
- _logger.info("Sending non-routable non-mandatory message");
- TextMessage msg1 = producerSession.createTextMessage("msg1");
- nonMandatoryProducer.send(msg1);
-
- // Second test - should be bounced
- _logger.info("Sending non-routable mandatory message");
- TextMessage msg2 = producerSession.createTextMessage("msg2");
- mandatoryProducer.send(msg2);
-
- // Third test - should be routed
- _logger.info("Sending routable message");
- TextMessage msg3 = producerSession.createTextMessage("msg3");
- msg3.setStringProperty("F1000", "1");
- mandatoryProducer.send(msg3);
-
-
- _logger.info("Starting consumer connection");
- con.start();
- TextMessage tm = (TextMessage) consumer.receive(1000L);
-
- assertTrue("No message routed to receiver", tm != null);
- assertTrue("Wrong message routed to receiver: " + tm.getText(), "msg3".equals(tm.getText()));
-
- try
- {
- Thread.sleep(1000L);
- }
- catch (InterruptedException e)
- {
- ;
- }
-
- assertTrue("Wrong number of messages bounced (expect 1): " + _bouncedMessageList.size(), _bouncedMessageList.size() == 1);
- Message m = _bouncedMessageList.get(0);
- assertTrue("Wrong message bounced: " + m.toString(), m.toString().contains("msg2"));
-
-
- con.close();
- con2.close();
-
-
- }
-
- public static junit.framework.Test suite()
- {
- return new junit.framework.TestSuite(ReturnUnroutableMandatoryMessageTest.class);
- }
-
- public void onException(JMSException jmsException)
- {
-
- Exception linkedException = jmsException.getLinkedException();
- if (linkedException instanceof AMQNoRouteException)
- {
- AMQNoRouteException noRoute = (AMQNoRouteException) linkedException;
- Message bounced = (Message) noRoute.getUndeliveredMessage();
- _bouncedMessageList.add(bounced);
- _logger.info("Caught expected NoRouteException");
- }
- else
- {
- _logger.warn("Caught exception on producer: ", jmsException);
- }
- }
-}
diff --git a/java/systests/src/test/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java b/java/systests/src/test/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java
deleted file mode 100644
index f84f14f28d..0000000000
--- a/java/systests/src/test/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.qpid.server.protocol;
-
-import junit.framework.TestCase;
-import org.apache.mina.common.IoSession;
-import org.apache.qpid.codec.AMQCodecFactory;
-import org.apache.qpid.server.AMQChannel;
-import org.apache.qpid.server.exchange.DefaultExchangeFactory;
-import org.apache.qpid.server.exchange.DefaultExchangeRegistry;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
-import org.apache.qpid.server.queue.DefaultQueueRegistry;
-import org.apache.qpid.server.queue.QueueRegistry;
-import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.SkeletonMessageStore;
-import org.apache.qpid.AMQException;
-
-import javax.management.JMException;
-import javax.management.openmbean.OpenDataException;
-
-/**
- * Test class to test MBean operations for AMQMinaProtocolSession.
- */
-public class AMQProtocolSessionMBeanTest extends TestCase
-{
- private IoSession _mockIOSession;
- private MessageStore _messageStore = new SkeletonMessageStore();
- private AMQMinaProtocolSession _protocolSession;
- private AMQChannel _channel;
- private QueueRegistry _queueRegistry;
- private ExchangeRegistry _exchangeRegistry;
- private AMQProtocolSessionMBean _mbean;
-
- public void testChannels() throws Exception
- {
- // check the channel count is correct
- int channelCount = _mbean.channels().size();
- assertTrue(channelCount == 2);
- _protocolSession.addChannel(new AMQChannel(2,_protocolSession, _messageStore, null,null));
- channelCount = _mbean.channels().size();
- assertTrue(channelCount == 3);
-
- // general properties test
- _mbean.setMaximumNumberOfChannels(1000L);
- assertTrue(_mbean.getMaximumNumberOfChannels() == 1000L);
-
- // check APIs
- AMQChannel channel3 = new AMQChannel(3,_protocolSession, _messageStore, null,null);
- channel3.setTransactional(true);
- _protocolSession.addChannel(channel3);
- _mbean.rollbackTransactions(2);
- _mbean.rollbackTransactions(3);
- _mbean.commitTransactions(2);
- _mbean.commitTransactions(3);
-
- // This should throw exception, because the channel does't exist
- try
- {
- _mbean.commitTransactions(4);
- fail();
- }
- catch (JMException ex)
- {
- System.out.println("expected exception is thrown :" + ex.getMessage());
- }
-
- // check if closing of session works
- _protocolSession.addChannel(new AMQChannel(5,_protocolSession, _messageStore, null,null));
- _mbean.closeConnection();
- channelCount = _mbean.channels().size();
- assertTrue(channelCount == 0);
- try
- {
- // session is now closed so adding another channel should throw an exception
- _protocolSession.addChannel(new AMQChannel(6,_protocolSession, _messageStore, null,null));
- fail();
- }
- catch(IllegalStateException ex)
- {
- System.out.println("expected exception is thrown :" + ex.getMessage());
- }
- }
-
- @Override
- protected void setUp() throws Exception
- {
- super.setUp();
- _queueRegistry = new DefaultQueueRegistry();
- _exchangeRegistry = new DefaultExchangeRegistry(new DefaultExchangeFactory());
- _mockIOSession = new MockIoSession();
- _protocolSession = new AMQMinaProtocolSession(_mockIOSession, _queueRegistry, _exchangeRegistry, new AMQCodecFactory(true));
- _channel = new AMQChannel(1,_protocolSession, _messageStore, null,null);
- _protocolSession.addChannel(_channel);
- _mbean = (AMQProtocolSessionMBean)_protocolSession.getManagedObject();
- }
-}
diff --git a/java/systests/src/test/java/org/apache/qpid/server/protocol/MockIoSession.java b/java/systests/src/test/java/org/apache/qpid/server/protocol/MockIoSession.java
deleted file mode 100644
index cf6366b513..0000000000
--- a/java/systests/src/test/java/org/apache/qpid/server/protocol/MockIoSession.java
+++ /dev/null
@@ -1,297 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.protocol;
-
-import org.apache.mina.common.*;
-import org.apache.mina.common.support.DefaultCloseFuture;
-import org.apache.mina.common.support.DefaultWriteFuture;
-
-import java.net.SocketAddress;
-import java.net.InetSocketAddress;
-import java.util.Set;
-
-public class MockIoSession implements IoSession
-{
- private AMQProtocolSession _protocolSession;
-
- /**
- * Stores the last response written
- */
- private Object _lastWrittenObject;
-
- private boolean _closing;
-
- public MockIoSession()
- {
- }
-
- public Object getLastWrittenObject()
- {
- return _lastWrittenObject;
- }
-
- public IoService getService()
- {
- return null; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public IoServiceConfig getServiceConfig()
- {
- return null;
- }
-
- public IoHandler getHandler()
- {
- return null; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public IoSessionConfig getConfig()
- {
- return null; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public IoFilterChain getFilterChain()
- {
- return null; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public WriteFuture write(Object message)
- {
- WriteFuture wf = new DefaultWriteFuture(null);
- _lastWrittenObject = message;
- return wf;
- }
-
- public CloseFuture close()
- {
- _closing = true;
- CloseFuture cf = new DefaultCloseFuture(null);
- cf.setClosed();
- return cf;
- }
-
- public Object getAttachment()
- {
- return _protocolSession;
- }
-
- public Object setAttachment(Object attachment)
- {
- Object current = _protocolSession;
- _protocolSession = (AMQProtocolSession) attachment;
- return current;
- }
-
- public Object getAttribute(String key)
- {
- return null; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public Object setAttribute(String key, Object value)
- {
- return null; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public Object setAttribute(String key)
- {
- return null; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public Object removeAttribute(String key)
- {
- return null; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public boolean containsAttribute(String key)
- {
- return false; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public Set getAttributeKeys()
- {
- return null; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public TransportType getTransportType()
- {
- return null; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public boolean isConnected()
- {
- return false; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public boolean isClosing()
- {
- return _closing;
- }
-
- public CloseFuture getCloseFuture()
- {
- return null; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public SocketAddress getRemoteAddress()
- {
- return new InetSocketAddress("127.0.0.1", 1234); //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public SocketAddress getLocalAddress()
- {
- return null; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public SocketAddress getServiceAddress()
- {
- return null; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public int getIdleTime(IdleStatus status)
- {
- return 0; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public long getIdleTimeInMillis(IdleStatus status)
- {
- return 0; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public void setIdleTime(IdleStatus status, int idleTime)
- {
- //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public int getWriteTimeout()
- {
- return 0; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public long getWriteTimeoutInMillis()
- {
- return 0; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public void setWriteTimeout(int writeTimeout)
- {
- //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public TrafficMask getTrafficMask()
- {
- return null; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public void setTrafficMask(TrafficMask trafficMask)
- {
- //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public void suspendRead()
- {
- //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public void suspendWrite()
- {
- //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public void resumeRead()
- {
- //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public void resumeWrite()
- {
- //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public long getReadBytes()
- {
- return 0; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public long getWrittenBytes()
- {
- return 0; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public long getReadMessages()
- {
- return 0L;
- }
-
- public long getWrittenMessages()
- {
- return 0L;
- }
-
- public long getWrittenWriteRequests()
- {
- return 0; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public int getScheduledWriteRequests()
- {
- return 0; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public int getScheduledWriteBytes()
- {
- return 0; //TODO
- }
-
- public long getCreationTime()
- {
- return 0; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public long getLastIoTime()
- {
- return 0; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public long getLastReadTime()
- {
- return 0; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public long getLastWriteTime()
- {
- return 0; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public boolean isIdle(IdleStatus status)
- {
- return false; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public int getIdleCount(IdleStatus status)
- {
- return 0; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public long getLastIdleTime(IdleStatus status)
- {
- return 0; //To change body of implemented methods use File | Settings | File Templates.
- }
-}
diff --git a/java/systests/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java b/java/systests/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
deleted file mode 100644
index ae398961be..0000000000
--- a/java/systests/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
+++ /dev/null
@@ -1,215 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.qpid.server.queue;
-
-import java.util.Collections;
-
-import javax.management.JMException;
-
-import junit.framework.TestCase;
-
-import org.apache.mina.common.ByteBuffer;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.client.message.MessageHeaders;
-import org.apache.qpid.framing.Content;
-import org.apache.qpid.framing.MessageTransferBody;
-import org.apache.qpid.server.AMQChannel;
-import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.SkeletonMessageStore;
-
-/**
- * Test class to test AMQQueueMBean attribtues and operations
- */
-public class AMQQueueMBeanTest extends TestCase
-{
- private AMQQueue _queue;
- private AMQQueueMBean _queueMBean;
- private QueueRegistry _queueRegistry;
- private MessageStore _messageStore = new SkeletonMessageStore();
- private MockProtocolSession _protocolSession;
- private AMQChannel _channel;
-
- public void testMessageCount() throws Exception
- {
- int messageCount = 10;
- sendMessages(messageCount);
- assertTrue(_queueMBean.getMessageCount() == messageCount);
- assertTrue(_queueMBean.getReceivedMessageCount() == messageCount);
- // each message is 1K
- assertTrue(_queueMBean.getQueueDepth() == messageCount);
-
- _queueMBean.deleteMessageFromTop();
- assertTrue(_queueMBean.getMessageCount() == messageCount - 1);
- assertTrue(_queueMBean.getReceivedMessageCount() == messageCount);
-
- _queueMBean.clearQueue();
- assertTrue(_queueMBean.getMessageCount() == 0);
- assertTrue(_queueMBean.getReceivedMessageCount() == messageCount);
- }
-
- public void testConsumerCount() throws Exception
- {
- SubscriptionManager mgr = _queue.getSubscribers();
- assertFalse(mgr.hasActiveSubscribers());
- assertTrue(_queueMBean.getActiveConsumerCount() == 0);
-
- _protocolSession = new MockProtocolSession(_messageStore);
- _channel = new AMQChannel(1,_protocolSession, _messageStore, null,null);
- _protocolSession.addChannel(_channel);
-
- _queue.registerProtocolSession(_protocolSession, 1, "test", false, null, false, false);
- assertTrue(_queueMBean.getActiveConsumerCount() == 1);
-
- SubscriptionSet _subscribers = (SubscriptionSet) mgr;
- SubscriptionTestHelper s1 = new SubscriptionTestHelper("S1");
- SubscriptionTestHelper s2 = new SubscriptionTestHelper("S2");
- _subscribers.addSubscriber(s1);
- _subscribers.addSubscriber(s2);
- assertTrue(_queueMBean.getActiveConsumerCount() == 3);
- assertTrue(_queueMBean.getConsumerCount() == 3);
-
- s1.setSuspended(true);
- assertTrue(_queueMBean.getActiveConsumerCount() == 2);
- assertTrue(_queueMBean.getConsumerCount() == 3);
- }
-
- public void testGeneralProperties()
- {
- _queueMBean.setMaximumMessageCount(50000);
- _queueMBean.setMaximumMessageSize(2000l);
- _queueMBean.setMaximumQueueDepth(1000l);
-
- assertTrue(_queueMBean.getMaximumMessageCount() == 50000);
- assertTrue(_queueMBean.getMaximumMessageSize() == 2000);
- assertTrue(_queueMBean.getMaximumQueueDepth() == 1000);
-
- assertTrue(_queueMBean.getName().equals("testQueue"));
- assertTrue(_queueMBean.getOwner().equals("AMQueueMBeanTest"));
- assertFalse(_queueMBean.isAutoDelete());
- assertFalse(_queueMBean.isDurable());
- }
-
- public void testExceptions() throws Exception
- {
- try
- {
- _queueMBean.viewMessages(0, 3);
- fail();
- }
- catch (JMException ex)
- {
-
- }
-
- try
- {
- _queueMBean.viewMessages(2, 1);
- fail();
- }
- catch (JMException ex)
- {
-
- }
-
- try
- {
- _queueMBean.viewMessages(-1, 1);
- fail();
- }
- catch (JMException ex)
- {
-
- }
-
- AMQMessage msg = message(false);
- long id = msg.getMessageId();
- _queue.clearQueue();
- _queue.deliver(msg);
- _queueMBean.viewMessageContent(id);
- try
- {
- _queueMBean.viewMessageContent(id + 1);
- fail();
- }
- catch (JMException ex)
- {
-
- }
- }
-
- private AMQMessage message(boolean immediate) throws AMQException
- {
- // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
- // TODO: Establish some way to determine the version for the test.
-
- MessageHeaders messageHeaders = new MessageHeaders();
-
- ByteBuffer buffer = ByteBuffer.wrap(new byte[1000]);
- Content body = new Content(Content.TypeEnum.INLINE_T, buffer);
- MessageTransferBody methodBody = MessageTransferBody.createMethodBody(
- (byte)0, (byte)9, // AMQP version (major, minor)
- messageHeaders.getAppId(), // String appId
- messageHeaders.getJMSHeaders(), // FieldTable applicationHeaders
- body, // Content body
- messageHeaders.getEncoding(), // String contentEncoding
- messageHeaders.getContentType(), // String contentType
- messageHeaders.getCorrelationId(), // String correlationId
- (short)1, // short deliveryMode
- "someExchange", // String destination
- "someExchange", // String exchange
- messageHeaders.getExpiration(), // long expiration
- immediate, // boolean immediate
- false, // boolean mandatory
- "", // String messageId
- (short)0, // short priority
- false, // boolean redelivered
- messageHeaders.getReplyTo(), // String replyTo
- "rk", // String routingKey
- new String("abc123").getBytes(), // byte[] securityToken
- 0, // int ticket
- messageHeaders.getTimestamp(), // long timestamp
- messageHeaders.getTransactionId(), // String transactionId
- 0, // long ttl
- messageHeaders.getUserId()); // String userId
-
- return new AMQMessage(_messageStore, methodBody, Collections.singletonList(buffer));
- }
-
- @Override
- protected void setUp() throws Exception
- {
- super.setUp();
- _queueRegistry = new DefaultQueueRegistry();
- _queue = new AMQQueue("testQueue", false, "AMQueueMBeanTest", false, _queueRegistry);
- _queueMBean = new AMQQueueMBean(_queue);
- }
-
- private void sendMessages(int messageCount) throws AMQException
- {
- AMQMessage[] messages = new AMQMessage[messageCount];
- for (int i = 0; i < messages.length; i++)
- {
- messages[i] = message(false);
- ;
- }
- for (int i = 0; i < messageCount; i++)
- {
- _queue.deliver(messages[i]);
- }
- }
-}
diff --git a/java/systests/src/test/java/org/apache/qpid/server/queue/AckTest.java b/java/systests/src/test/java/org/apache/qpid/server/queue/AckTest.java
deleted file mode 100644
index ca777ca535..0000000000
--- a/java/systests/src/test/java/org/apache/qpid/server/queue/AckTest.java
+++ /dev/null
@@ -1,330 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.queue;
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.Map;
-
-import junit.framework.TestCase;
-
-import org.apache.log4j.Logger;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.client.message.MessageHeaders;
-import org.apache.qpid.framing.Content;
-import org.apache.qpid.framing.MessageTransferBody;
-import org.apache.qpid.server.AMQChannel;
-import org.apache.qpid.server.ack.UnacknowledgedMessage;
-import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.store.TestableMemoryMessageStore;
-import org.apache.qpid.server.util.TestApplicationRegistry;
-
-/**
- * Tests that acknowledgements are handled correctly.
- */
-public class AckTest extends TestCase
-{
- private static final Logger _log = Logger.getLogger(AckTest.class);
-
- private SubscriptionImpl _subscription;
-
- private MockProtocolSession _protocolSession;
-
- private TestableMemoryMessageStore _messageStore;
-
- private AMQChannel _channel;
-
- private SubscriptionSet _subscriptionManager;
-
- private AMQQueue _queue;
-
- public AckTest() throws Exception
- {
- ApplicationRegistry.initialise(new TestApplicationRegistry());
- }
-
- protected void setUp() throws Exception
- {
- super.setUp();
- _messageStore = new TestableMemoryMessageStore();
- _protocolSession = new MockProtocolSession(_messageStore);
- _channel = new AMQChannel(5,_protocolSession, _messageStore, null/*dont need exchange registry*/,null);
- _protocolSession.addChannel(_channel);
- _subscriptionManager = new SubscriptionSet();
- _queue = new AMQQueue("myQ", false, "guest", true, new DefaultQueueRegistry(), _subscriptionManager);
- }
-
- private void publishMessages(int count) throws AMQException
- {
- publishMessages(count, false);
- }
-
- private void publishMessages(int count, boolean persistent) throws AMQException
- {
- for (int i = 1; i <= count; i++)
- {
- // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
- // TODO: Establish some way to determine the version for the test.
- //AMQMessage(MessageStore store, MessageTransferBody transferBody, List<ByteBuffer> contents)
-
- MessageHeaders messageHeaders = new MessageHeaders();
-
- MessageTransferBody methodBody = MessageTransferBody.createMethodBody(
- (byte)0, (byte)9, // AMQP version (major, minor)
- messageHeaders.getAppId(), // String appId
- messageHeaders.getJMSHeaders(), // FieldTable applicationHeaders
- new Content(), // Content body
- messageHeaders.getEncoding(), // String contentEncoding
- messageHeaders.getContentType(), // String contentType
- messageHeaders.getCorrelationId(), // String correlationId
- persistent ? (short)2: (short)1, // short deliveryMode
- "someExchange", // String destination
- "someExchange", // String exchange
- messageHeaders.getExpiration(), // long expiration
- false, // boolean immediate
- false, // boolean mandatory
- "" + i, // String messageId
- (short)0, // short priority
- false, // boolean redelivered
- messageHeaders.getReplyTo(), // String replyTo
- "rk", // String routingKey
- new String("abc123").getBytes(), // byte[] securityToken
- 0, // int ticket
- messageHeaders.getTimestamp(), // long timestamp
- messageHeaders.getTransactionId(), // String transactionId
- 0, // long ttl
- messageHeaders.getUserId()); // String userId
-
- AMQMessage msg = new AMQMessage(_messageStore, methodBody, new ArrayList());
- _subscription.send(msg, _queue);
- }
- }
-
- /**
- * Tests that the acknowledgements are correctly associated with a channel and
- * order is preserved when acks are enabled
- */
- public void testAckChannelAssociationTest() throws AMQException
- {
- _subscription = new SubscriptionImpl(5, _protocolSession, "conTag", true);
- final int msgCount = 10;
- publishMessages(msgCount, true);
-
- Map<Long, UnacknowledgedMessage> map = _channel.getUnacknowledgedMessageMap();
- assertTrue(map.size() == msgCount);
- assertTrue(_messageStore.getMessageMap().size() == msgCount);
-
- Iterator<Map.Entry<Long, UnacknowledgedMessage>> it = map.entrySet().iterator();
- for (int i = 1; i <= map.size(); i++)
- {
- Map.Entry<Long, UnacknowledgedMessage> entry = it.next();
- assertTrue(entry.getKey() == i);
- UnacknowledgedMessage unackedMsg = entry.getValue();
- assertTrue(unackedMsg.queue == _queue);
- }
-
- assertTrue(map.size() == msgCount);
- assertTrue(_messageStore.getMessageMap().size() == msgCount);
- }
-
- /**
- * Tests that in no-ack mode no messages are retained
- */
- public void testNoAckMode() throws AMQException
- {
- // false arg means no acks expected
- _subscription = new SubscriptionImpl(5, _protocolSession, "conTag", false);
- final int msgCount = 10;
- publishMessages(msgCount);
-
- Map<Long, UnacknowledgedMessage> map = _channel.getUnacknowledgedMessageMap();
- assertTrue(map.size() == 0);
- assertTrue(_messageStore.getMessageMap().size() == 0);
- }
-
- /**
- * Tests that a single acknowledgement is handled correctly (i.e multiple flag not
- * set case)
- */
- public void testSingleAckReceivedTest() throws AMQException
- {
- _subscription = new SubscriptionImpl(5, _protocolSession, "conTag", true);
- final int msgCount = 10;
- publishMessages(msgCount);
-
- _channel.acknowledgeMessage(5, false);
- Map<Long, UnacknowledgedMessage> map = _channel.getUnacknowledgedMessageMap();
- assertTrue(map.size() == msgCount - 1);
-
- Iterator<Map.Entry<Long, UnacknowledgedMessage>> it = map.entrySet().iterator();
- int i = 1;
- while (i <= map.size())
- {
- Map.Entry<Long, UnacknowledgedMessage> entry = it.next();
- assertTrue(entry.getKey() == i);
- UnacknowledgedMessage unackedMsg = entry.getValue();
- assertTrue(unackedMsg.queue == _queue);
- // 5 is the delivery tag of the message that *should* be removed
- if (++i == 5)
- {
- ++i;
- }
- }
- }
-
- /**
- * Tests that a single acknowledgement is handled correctly (i.e multiple flag not
- * set case)
- */
- public void testMultiAckReceivedTest() throws AMQException
- {
- _subscription = new SubscriptionImpl(5, _protocolSession, "conTag", true);
- final int msgCount = 10;
- publishMessages(msgCount);
-
- _channel.acknowledgeMessage(5, true);
- Map<Long, UnacknowledgedMessage> map = _channel.getUnacknowledgedMessageMap();
- assertTrue(map.size() == 5);
-
- Iterator<Map.Entry<Long, UnacknowledgedMessage>> it = map.entrySet().iterator();
- int i = 1;
- while (i <= map.size())
- {
- Map.Entry<Long, UnacknowledgedMessage> entry = it.next();
- assertTrue(entry.getKey() == i + 5);
- UnacknowledgedMessage unackedMsg = entry.getValue();
- assertTrue(unackedMsg.queue == _queue);
- ++i;
- }
- }
-
- /**
- * Tests that a multiple acknowledgement is handled correctly. When ack'ing all pending msgs.
- */
- public void testMultiAckAllReceivedTest() throws AMQException
- {
- _subscription = new SubscriptionImpl(5, _protocolSession, "conTag", true);
- final int msgCount = 10;
- publishMessages(msgCount);
-
- _channel.acknowledgeMessage(0, true);
- Map<Long, UnacknowledgedMessage> map = _channel.getUnacknowledgedMessageMap();
- assertTrue(map.size() == 0);
-
- Iterator<Map.Entry<Long, UnacknowledgedMessage>> it = map.entrySet().iterator();
- int i = 1;
- while (i <= map.size())
- {
- Map.Entry<Long, UnacknowledgedMessage> entry = it.next();
- assertTrue(entry.getKey() == i + 5);
- UnacknowledgedMessage unackedMsg = entry.getValue();
- assertTrue(unackedMsg.queue == _queue);
- ++i;
- }
- }
-
- public void testPrefetchHighLow() throws AMQException
- {
- int lowMark = 5;
- int highMark = 10;
-
- _subscription = new SubscriptionImpl(5, _protocolSession, "conTag", true);
- _channel.setPrefetchLowMarkCount(lowMark);
- _channel.setPrefetchHighMarkCount(highMark);
-
- assertTrue(_channel.getPrefetchLowMarkCount() == lowMark);
- assertTrue(_channel.getPrefetchHighMarkCount() == highMark);
-
- publishMessages(highMark);
-
- // at this point we should have sent out only highMark messages
- // which have not bee received so will be queued up in the channel
- // which should be suspended
- assertTrue(_subscription.isSuspended());
- Map<Long, UnacknowledgedMessage> map = _channel.getUnacknowledgedMessageMap();
- assertTrue(map.size() == highMark);
-
- //acknowledge messages so we are just above lowMark
- _channel.acknowledgeMessage(lowMark - 1, true);
-
- //we should still be suspended
- assertTrue(_subscription.isSuspended());
- assertTrue(map.size() == lowMark + 1);
-
- //acknowledge one more message
- _channel.acknowledgeMessage(lowMark, true);
-
- //and suspension should be lifted
- assertTrue(!_subscription.isSuspended());
-
- //pubilsh more msgs so we are just below the limit
- publishMessages(lowMark - 1);
-
- //we should not be suspended
- assertTrue(!_subscription.isSuspended());
-
- //acknowledge all messages
- _channel.acknowledgeMessage(0, true);
- try
- {
- Thread.sleep(3000);
- }
- catch (InterruptedException e)
- {
- _log.error("Error: " + e, e);
- }
- //map will be empty
- assertTrue(map.size() == 0);
- }
-
- public void testPrefetch() throws AMQException
- {
- _subscription = new SubscriptionImpl(5, _protocolSession, "conTag", true);
- _channel.setPrefetchCount(5);
-
- assertTrue(_channel.getPrefetchCount() == 5);
-
- final int msgCount = 5;
- publishMessages(msgCount);
-
- // at this point we should have sent out only 5 messages with a further 5 queued
- // up in the channel which should now be suspended
- assertTrue(_subscription.isSuspended());
- Map<Long, UnacknowledgedMessage> map = _channel.getUnacknowledgedMessageMap();
- assertTrue(map.size() == 5);
- _channel.acknowledgeMessage(5, true);
- assertTrue(!_subscription.isSuspended());
- try
- {
- Thread.sleep(3000);
- }
- catch (InterruptedException e)
- {
- _log.error("Error: " + e, e);
- }
- assertTrue(map.size() == 0);
- }
-
- public static junit.framework.Test suite()
- {
- return new junit.framework.TestSuite(AckTest.class);
- }
-}
diff --git a/java/systests/src/test/java/org/apache/qpid/server/queue/ConcurrencyTest.java b/java/systests/src/test/java/org/apache/qpid/server/queue/ConcurrencyTest.java
deleted file mode 100644
index fe8960c872..0000000000
--- a/java/systests/src/test/java/org/apache/qpid/server/queue/ConcurrencyTest.java
+++ /dev/null
@@ -1,257 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.queue;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.server.handler.OnCurrentThreadExecutor;
-
-import java.util.*;
-import java.util.concurrent.Executor;
-
-/**
- * Tests delivery in the face of concurrent incoming _messages, subscription alterations
- * and attempts to asynchronously process queued _messages.
- */
-public class ConcurrencyTest extends MessageTestHelper
-{
- private final Random random = new Random();
-
- private final int numMessages = 1000;
-
- private final List<SubscriptionTestHelper> _subscribers = new ArrayList<SubscriptionTestHelper>();
- private final Set<Subscription> _active = new HashSet<Subscription>();
- private final List<AMQMessage> _messages = new ArrayList<AMQMessage>();
- private int next = 0;//index to next message to send
- private final List<AMQMessage> _received = Collections.synchronizedList(new ArrayList<AMQMessage>());
- private final Executor _executor = new OnCurrentThreadExecutor();
- private final List<Thread> _threads = new ArrayList<Thread>();
-
- private final SubscriptionSet _subscriptionMgr = new SubscriptionSet();
- private final DeliveryManager _deliveryMgr;
-
- private boolean isComplete;
- private boolean failed;
-
- public ConcurrencyTest() throws Exception
- {
- _deliveryMgr = new ConcurrentDeliveryManager(_subscriptionMgr, new AMQQueue("myQ", false, "guest", false,
- new DefaultQueueRegistry()));
- }
-
- public void testConcurrent1() throws InterruptedException, AMQException
- {
- initSubscriptions(10);
- initMessages(numMessages);
- initThreads(1, 4, 4, 4);
- doRun();
- check();
- }
-
- public void testConcurrent2() throws InterruptedException, AMQException
- {
- initSubscriptions(10);
- initMessages(numMessages);
- initThreads(4, 2, 2, 2);
- doRun();
- check();
- }
-
- void check()
- {
- assertFalse("Failed", failed);
-
- _deliveryMgr.processAsync(_executor);
-
- assertEquals("Did not recieve the correct number of messages", _messages.size(), _received.size());
- for(int i = 0; i < _messages.size(); i++)
- {
- assertEquals("Wrong message at " + i, _messages.get(i), _received.get(i));
- }
- }
-
- void initSubscriptions(int subscriptions)
- {
- for(int i = 0; i < subscriptions; i++)
- {
- _subscribers.add(new SubscriptionTestHelper("Subscriber" + i, _received));
- }
- }
-
- void initMessages(int messages) throws AMQException
- {
- for(int i = 0; i < messages; i++)
- {
- _messages.add(message());
- }
- }
-
- void initThreads(int senders, int subscribers, int suspenders, int processors)
- {
- addThreads(senders, senders == 1 ? new Sender() : new OrderedSender());
- addThreads(subscribers, new Subscriber());
- addThreads(suspenders, new Suspender());
- addThreads(processors, new Processor());
- }
-
- void addThreads(int count, Runnable runner)
- {
- for(int i = 0; i < count; i++)
- {
- _threads.add(new Thread(runner, runner.toString()));
- }
- }
-
- void doRun() throws InterruptedException
- {
- for(Thread t : _threads)
- {
- t.start();
- }
-
- for(Thread t : _threads)
- {
- t.join();
- }
- }
-
- private void toggle(Subscription s)
- {
- synchronized (_active)
- {
- if (_active.contains(s))
- {
- _active.remove(s);
- Subscription result = _subscriptionMgr.removeSubscriber(s);
- assertTrue("Removed subscription " + result + " but trying to remove subscription " + s,
- result != null && result.equals(s));
- }
- else
- {
- _active.add(s);
- _subscriptionMgr.addSubscriber(s);
- }
- }
- }
-
- private AMQMessage nextMessage()
- {
- synchronized (_messages)
- {
- if (next < _messages.size())
- {
- return _messages.get(next++);
- }
- else
- {
- if (!_deliveryMgr.hasQueuedMessages()) {
- isComplete = true;
- }
- return null;
- }
- }
- }
-
- private boolean randomBoolean()
- {
- return random.nextBoolean();
- }
-
- private SubscriptionTestHelper randomSubscriber()
- {
- return _subscribers.get(random.nextInt(_subscribers.size()));
- }
-
- private class Sender extends Runner
- {
- void doRun() throws Throwable
- {
- AMQMessage msg = nextMessage();
- if (msg != null)
- {
- _deliveryMgr.deliver(toString(), msg);
- }
- }
- }
-
- private class OrderedSender extends Sender
- {
- synchronized void doRun() throws Throwable
- {
- super.doRun();
- }
- }
-
- private class Suspender extends Runner
- {
- void doRun() throws Throwable
- {
- randomSubscriber().setSuspended(randomBoolean());
- }
- }
-
- private class Subscriber extends Runner
- {
- void doRun() throws Throwable
- {
- toggle(randomSubscriber());
- }
- }
-
- private class Processor extends Runner
- {
- void doRun() throws Throwable
- {
- _deliveryMgr.processAsync(_executor);
- }
- }
-
- private abstract class Runner implements Runnable
- {
- public void run()
- {
- try
- {
- while (!stop())
- {
- doRun();
- }
- }
- catch (Throwable t)
- {
- failed = true;
- t.printStackTrace();
- }
- }
-
- abstract void doRun() throws Throwable;
-
- boolean stop()
- {
- return isComplete || failed;
- }
- }
-
- public static junit.framework.Test suite()
- {
- return new junit.framework.TestSuite(ConcurrencyTest.class);
- }
-
-}
diff --git a/java/systests/src/test/java/org/apache/qpid/server/queue/ConcurrentDeliveryManagerTest.java b/java/systests/src/test/java/org/apache/qpid/server/queue/ConcurrentDeliveryManagerTest.java
deleted file mode 100644
index 3072d44f48..0000000000
--- a/java/systests/src/test/java/org/apache/qpid/server/queue/ConcurrentDeliveryManagerTest.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.queue;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.server.queue.ConcurrentDeliveryManager;
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.DefaultQueueRegistry;
-import org.apache.qpid.server.queue.DeliveryManagerTest;
-
-public class ConcurrentDeliveryManagerTest extends DeliveryManagerTest
-{
- public ConcurrentDeliveryManagerTest() throws Exception
- {
- try
- {
- System.setProperty("concurrentdeliverymanager","true");
- _mgr = new ConcurrentDeliveryManager(_subscriptions, new AMQQueue("myQ", false, "guest", false,
- new DefaultQueueRegistry()));
- }
- catch (Throwable t)
- {
- t.printStackTrace();
- throw new AMQException("Could not initialise delivery manager", t);
- }
- }
-
- public static junit.framework.Test suite()
- {
- return new junit.framework.TestSuite(ConcurrentDeliveryManagerTest.class);
- }
-}
diff --git a/java/systests/src/test/java/org/apache/qpid/server/queue/DeliveryManagerTest.java b/java/systests/src/test/java/org/apache/qpid/server/queue/DeliveryManagerTest.java
deleted file mode 100644
index 3631264e5a..0000000000
--- a/java/systests/src/test/java/org/apache/qpid/server/queue/DeliveryManagerTest.java
+++ /dev/null
@@ -1,175 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.queue;
-
-import org.apache.qpid.server.handler.OnCurrentThreadExecutor;
-import org.apache.qpid.AMQException;
-
-import junit.framework.TestSuite;
-
-abstract public class DeliveryManagerTest extends MessageTestHelper
-{
- protected final SubscriptionSet _subscriptions = new SubscriptionSet();
- protected DeliveryManager _mgr;
-
- public DeliveryManagerTest() throws Exception
- {
- }
-
- public void testStartInQueueingMode() throws AMQException
- {
- AMQMessage[] messages = new AMQMessage[10];
- for (int i = 0; i < messages.length; i++)
- {
- messages[i] = message();
- }
- int batch = messages.length / 2;
-
- for (int i = 0; i < batch; i++)
- {
- _mgr.deliver("Me", messages[i]);
- }
-
- SubscriptionTestHelper s1 = new SubscriptionTestHelper("1");
- SubscriptionTestHelper s2 = new SubscriptionTestHelper("2");
- _subscriptions.addSubscriber(s1);
- _subscriptions.addSubscriber(s2);
-
- for (int i = batch; i < messages.length; i++)
- {
- _mgr.deliver("Me", messages[i]);
- }
-
- assertTrue(s1.getMessages().isEmpty());
- assertTrue(s2.getMessages().isEmpty());
-
- _mgr.processAsync(new OnCurrentThreadExecutor());
-
- assertEquals(messages.length / 2, s1.getMessages().size());
- assertEquals(messages.length / 2, s2.getMessages().size());
-
- for (int i = 0; i < messages.length; i++)
- {
- if (i % 2 == 0)
- {
- assertTrue(s1.getMessages().get(i / 2) == messages[i]);
- }
- else
- {
- assertTrue(s2.getMessages().get(i / 2) == messages[i]);
- }
- }
- }
-
- public void testStartInDirectMode() throws AMQException
- {
- AMQMessage[] messages = new AMQMessage[10];
- for (int i = 0; i < messages.length; i++)
- {
- messages[i] = message();
- }
- int batch = messages.length / 2;
-
- SubscriptionTestHelper s1 = new SubscriptionTestHelper("1");
- _subscriptions.addSubscriber(s1);
-
- for (int i = 0; i < batch; i++)
- {
- _mgr.deliver("Me", messages[i]);
- }
-
- assertEquals(batch, s1.getMessages().size());
- for (int i = 0; i < batch; i++)
- {
- assertTrue(messages[i] == s1.getMessages().get(i));
- }
- s1.getMessages().clear();
- assertEquals(0, s1.getMessages().size());
-
- s1.setSuspended(true);
- for (int i = batch; i < messages.length; i++)
- {
- _mgr.deliver("Me", messages[i]);
- }
-
- _mgr.processAsync(new OnCurrentThreadExecutor());
- assertEquals(0, s1.getMessages().size());
- s1.setSuspended(false);
-
- _mgr.processAsync(new OnCurrentThreadExecutor());
- assertEquals(messages.length - batch, s1.getMessages().size());
-
- for (int i = batch; i < messages.length; i++)
- {
- assertTrue(messages[i] == s1.getMessages().get(i - batch));
- }
-
- }
-
- public void testNoConsumers() throws AMQException
- {
- try
- {
- AMQMessage msg = message(true);
- _mgr.deliver("Me", msg);
- msg.checkDeliveredToConsumer();
- fail("expected exception did not occur");
- }
- catch (NoConsumersException m)
- {
- // ok
- }
- catch (Exception e)
- {
- fail("expected NoConsumersException, got " + e);
- }
- }
-
- public void testNoActiveConsumers() throws AMQException
- {
- try
- {
- SubscriptionTestHelper s = new SubscriptionTestHelper("A");
- _subscriptions.addSubscriber(s);
- s.setSuspended(true);
- AMQMessage msg = message(true);
- _mgr.deliver("Me", msg);
- msg.checkDeliveredToConsumer();
- fail("expected exception did not occur");
- }
- catch (NoConsumersException m)
- {
- // ok
- }
- catch (Exception e)
- {
- fail("expected NoConsumersException, got " + e);
- }
- }
-
- public static junit.framework.Test suite()
- {
- TestSuite suite = new TestSuite();
- suite.addTestSuite(ConcurrentDeliveryManagerTest.class);
- suite.addTestSuite(SynchronizedDeliveryManagerTest.class);
- return suite;
- }
-}
diff --git a/java/systests/src/test/java/org/apache/qpid/server/queue/MessageTestHelper.java b/java/systests/src/test/java/org/apache/qpid/server/queue/MessageTestHelper.java
deleted file mode 100644
index 3eba217903..0000000000
--- a/java/systests/src/test/java/org/apache/qpid/server/queue/MessageTestHelper.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.queue;
-
-import java.util.ArrayList;
-
-import junit.framework.TestCase;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.client.message.MessageHeaders;
-import org.apache.qpid.framing.Content;
-import org.apache.qpid.framing.MessageTransferBody;
-import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.SkeletonMessageStore;
-import org.apache.qpid.server.util.TestApplicationRegistry;
-
-class MessageTestHelper extends TestCase
-{
- private final MessageStore _messageStore = new SkeletonMessageStore();
-
- MessageTestHelper() throws Exception
- {
- ApplicationRegistry.initialise(new TestApplicationRegistry());
- }
-
- AMQMessage message() throws AMQException
- {
- return message(false);
- }
-
- AMQMessage message(boolean immediate) throws AMQException
- {
- // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
- // TODO: Establish some way to determine the version for the test.
- MessageHeaders messageHeaders = new MessageHeaders();
-
- MessageTransferBody methodBody = MessageTransferBody.createMethodBody(
- (byte)0, (byte)9, // AMQP version (major, minor)
- messageHeaders.getAppId(), // String appId
- messageHeaders.getJMSHeaders(), // FieldTable applicationHeaders
- new Content(), // Content body
- messageHeaders.getEncoding(), // String contentEncoding
- messageHeaders.getContentType(), // String contentType
- messageHeaders.getCorrelationId(), // String correlationId
- (short)1, // short deliveryMode
- "someExchange", // String destination
- "someExchange", // String exchange
- messageHeaders.getExpiration(), // long expiration
- immediate, // boolean immediate
- false, // boolean mandatory
- "", // String messageId
- (short)0, // short priority
- false, // boolean redelivered
- messageHeaders.getReplyTo(), // String replyTo
- "rk", // String routingKey
- new String("abc123").getBytes(), // byte[] securityToken
- 0, // int ticket
- messageHeaders.getTimestamp(), // long timestamp
- messageHeaders.getTransactionId(), // String transactionId
- 0, // long ttl
- messageHeaders.getUserId()); // String userId
-
- return new AMQMessage(_messageStore, methodBody, new ArrayList());
- }
-
-}
diff --git a/java/systests/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java b/java/systests/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java
deleted file mode 100644
index 400b21c5b1..0000000000
--- a/java/systests/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java
+++ /dev/null
@@ -1,251 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.queue;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQDataBlock;
-import org.apache.qpid.framing.AMQMethodBody;
-import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.protocol.AMQMethodEvent;
-import org.apache.qpid.protocol.AMQMethodListener;
-import org.apache.qpid.server.AMQChannel;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.state.AMQStateManager;
-import org.apache.qpid.server.store.MessageStore;
-
-import javax.security.sasl.SaslServer;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicInteger;
-
-/**
- * A protocol session that can be used for testing purposes.
- */
-public class MockProtocolSession implements AMQProtocolSession
-{
- private MessageStore _messageStore;
-
- private Map<Integer, AMQChannel> _channelMap = new HashMap<Integer, AMQChannel>();
-
- // Keeps a tally of connections for logging and debugging
- private static AtomicInteger _ConnectionId;
- static { _ConnectionId = new AtomicInteger(0); }
-
- public MockProtocolSession(MessageStore messageStore)
- {
- _ConnectionId.incrementAndGet();
- _messageStore = messageStore;
- }
-
- public void dataBlockReceived(AMQDataBlock message) throws Exception
- {
- }
-
- public void writeFrame(AMQDataBlock frame)
- {
- }
-
- public String getContextKey()
- {
- return null;
- }
-
- public void setContextKey(String contextKey)
- {
- }
-
- public AMQChannel getChannel(int channelId)
- {
- AMQChannel channel = _channelMap.get(channelId);
- if (channel == null)
- {
- throw new IllegalArgumentException("Invalid channel id: " + channelId);
- }
- else
- {
- return channel;
- }
- }
-
- public void addChannel(AMQChannel channel)
- {
- if (channel == null)
- {
- throw new IllegalArgumentException("Channel must not be null");
- }
- else
- {
- _channelMap.put(channel.getChannelId(), channel);
- }
- }
-
- public void closeChannel(int channelId) throws AMQException
- {
- }
-
- public void removeChannel(int channelId)
- {
- _channelMap.remove(channelId);
- }
-
- public void initHeartbeats(int delay)
- {
- }
-
- public void closeSession() throws AMQException
- {
- }
-
- public Object getKey()
- {
- return null;
- }
-
- public String getLocalFQDN()
- {
- return null;
- }
-
- public SaslServer getSaslServer()
- {
- return null;
- }
-
- public void setSaslServer(SaslServer saslServer)
- {
- }
-
- public FieldTable getClientProperties()
- {
- return null;
- }
-
- public void setClientProperties(FieldTable clientProperties)
- {
- }
-
-
- public Object getClientIdentifier()
- {
- return null;
- }
-
- public void closeChannelRequest(int channelId, int replyCode, String replyText) throws AMQException {
- // TODO Auto-generated method stub
-
- }
-
- public void closeChannelResponse(int channelId, long requestId) throws AMQException {
- // TODO Auto-generated method stub
-
- }
-
- public void closeSessionRequest(int replyCode, String replyText, int classId, int methodId) throws AMQException {
- // TODO Auto-generated method stub
-
- }
-
- public void closeSessionRequest(int replyCode, String replyText) throws AMQException {
- // TODO Auto-generated method stub
-
- }
-
- public void closeSessionResponse(long requestId) throws AMQException {
- // TODO Auto-generated method stub
-
- }
-
- public void setFrameMax(long size) {
- // TODO Auto-generated method stub
-
- }
-
- public long getFrameMax() {
- // TODO Auto-generated method stub
- return 0;
- }
-
- public QueueRegistry getQueueRegistry() {
- // TODO Auto-generated method stub
- return null;
- }
-
- public ExchangeRegistry getExchangeRegistry() {
- // TODO Auto-generated method stub
- return null;
- }
-
- public AMQStateManager getStateManager() {
- // TODO Auto-generated method stub
- return null;
- }
-
- public byte getMajor() {
- // TODO Auto-generated method stub
- return 0;
- }
-
- public byte getMinor() {
- // TODO Auto-generated method stub
- return 0;
- }
-
- public boolean versionEquals(byte major, byte minor) {
- // TODO Auto-generated method stub
- return false;
- }
-
- public void checkMethodBodyVersion(AMQMethodBody methodBody) {
- // TODO Auto-generated method stub
-
- }
-
- public long writeRequest(int channelNum, AMQMethodBody methodBody, AMQMethodListener methodListener) {
- // TODO Auto-generated method stub
- return 0;
- }
-
- public void writeResponse(int channelNum, long requestId, AMQMethodBody methodBody) {
- // TODO Auto-generated method stub
-
- }
-
- public void writeResponse(AMQMethodEvent evt, AMQMethodBody response) {
- // TODO Auto-generated method stub
-
- }
-
- public int getConnectionId()
- {
- return _ConnectionId.get();
- }
-
- public void addSessionCloseTask(Task task)
- {
- //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public void removeSessionCloseTask(Task task)
- {
- //To change body of implemented methods use File | Settings | File Templates.
- }
-}
diff --git a/java/systests/src/test/java/org/apache/qpid/server/queue/SubscriptionManagerTest.java b/java/systests/src/test/java/org/apache/qpid/server/queue/SubscriptionManagerTest.java
deleted file mode 100644
index d3ec3c11d4..0000000000
--- a/java/systests/src/test/java/org/apache/qpid/server/queue/SubscriptionManagerTest.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.queue;
-
-import junit.framework.TestCase;
-
-public class SubscriptionManagerTest extends TestCase
-{
- private final SubscriptionSet mgr = new SubscriptionSet();
-
- public void testBasicSubscriptionManagement()
- {
- assertTrue(mgr.isEmpty());
- assertFalse(mgr.hasActiveSubscribers());
- SubscriptionTestHelper s1 = new SubscriptionTestHelper("S1");
- mgr.addSubscriber(s1);
- assertFalse(mgr.isEmpty());
- assertTrue(mgr.hasActiveSubscribers());
-
- SubscriptionTestHelper s2 = new SubscriptionTestHelper("S2");
- mgr.addSubscriber(s2);
-
- s2.setSuspended(true);
- assertFalse(mgr.isEmpty());
- assertTrue(mgr.hasActiveSubscribers());
- assertTrue(s2.isSuspended());
- assertFalse(s1.isSuspended());
-
- s1.setSuspended(true);
- assertFalse(mgr.hasActiveSubscribers());
-
- mgr.removeSubscriber(new SubscriptionTestHelper("S1"));
- assertFalse(mgr.isEmpty());
- mgr.removeSubscriber(new SubscriptionTestHelper("S2"));
- assertTrue(mgr.isEmpty());
- }
-
- public void testRoundRobin()
- {
- SubscriptionTestHelper a = new SubscriptionTestHelper("A");
- SubscriptionTestHelper b = new SubscriptionTestHelper("B");
- SubscriptionTestHelper c = new SubscriptionTestHelper("C");
- SubscriptionTestHelper d = new SubscriptionTestHelper("D");
- mgr.addSubscriber(a);
- mgr.addSubscriber(b);
- mgr.addSubscriber(c);
- mgr.addSubscriber(d);
-
- for (int i = 0; i < 3; i++)
- {
- assertEquals(a, mgr.nextSubscriber(null));
- assertEquals(b, mgr.nextSubscriber(null));
- assertEquals(c, mgr.nextSubscriber(null));
- assertEquals(d, mgr.nextSubscriber(null));
- }
-
- c.setSuspended(true);
-
- for (int i = 0; i < 3; i++)
- {
- assertEquals(a, mgr.nextSubscriber(null));
- assertEquals(b, mgr.nextSubscriber(null));
- assertEquals(d, mgr.nextSubscriber(null));
- }
-
- mgr.removeSubscriber(a);
- d.setSuspended(true);
- c.setSuspended(false);
- Subscription e = new SubscriptionTestHelper("D");
- mgr.addSubscriber(e);
-
- for (int i = 0; i < 3; i++)
- {
- assertEquals(b, mgr.nextSubscriber(null));
- assertEquals(c, mgr.nextSubscriber(null));
- assertEquals(e, mgr.nextSubscriber(null));
- }
- }
-
- public static junit.framework.Test suite()
- {
- return new junit.framework.TestSuite(SubscriptionManagerTest.class);
- }
-}
diff --git a/java/systests/src/test/java/org/apache/qpid/server/queue/SubscriptionSetTest.java b/java/systests/src/test/java/org/apache/qpid/server/queue/SubscriptionSetTest.java
deleted file mode 100644
index bcf54693d3..0000000000
--- a/java/systests/src/test/java/org/apache/qpid/server/queue/SubscriptionSetTest.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.queue;
-
-import junit.framework.TestCase;
-
-public class SubscriptionSetTest extends TestCase
-{
- /**
- * A SubscriptionSet that counts the number of items scanned.
- */
- static class TestSubscriptionSet extends SubscriptionSet
- {
- private int scanned = 0;
-
- void resetScanned()
- {
- scanned = 0;
- }
-
- protected void subscriberScanned()
- {
- ++scanned;
- }
-
- int getScanned()
- {
- return scanned;
- }
- }
-
- final SubscriptionTestHelper sub1 = new SubscriptionTestHelper("1");
- final SubscriptionTestHelper sub2 = new SubscriptionTestHelper("2");
- final SubscriptionTestHelper sub3 = new SubscriptionTestHelper("3");
-
- final SubscriptionTestHelper suspendedSub1 = new SubscriptionTestHelper("sus1", true);
- final SubscriptionTestHelper suspendedSub2 = new SubscriptionTestHelper("sus2", true);
- final SubscriptionTestHelper suspendedSub3 = new SubscriptionTestHelper("sus3", true);
-
- public void testNextMessage()
- {
- SubscriptionSet ss = new SubscriptionSet();
- assertNull(ss.nextSubscriber(null));
- assertEquals(0, ss.getCurrentSubscriber());
-
- ss.addSubscriber(sub1);
- assertEquals(sub1, ss.nextSubscriber(null));
- assertEquals(1, ss.getCurrentSubscriber());
- assertEquals(sub1, ss.nextSubscriber(null));
- assertEquals(1, ss.getCurrentSubscriber());
-
- ss.addSubscriber(sub2);
- ss.addSubscriber(sub3);
-
- assertEquals(sub2, ss.nextSubscriber(null));
- assertEquals(2, ss.getCurrentSubscriber());
-
- assertEquals(sub3, ss.nextSubscriber(null));
- assertEquals(3, ss.getCurrentSubscriber());
- }
-
- public void testNextMessageWhenAllSuspended()
- {
- SubscriptionSet ss = createAllSuspendedSubscriptionSet();
- assertNull(ss.nextSubscriber(null));
- assertEquals(3, ss.getCurrentSubscriber());
-
- assertNull(ss.nextSubscriber(null));
- assertEquals(3, ss.getCurrentSubscriber());
- }
-
- private TestSubscriptionSet createAllSuspendedSubscriptionSet()
- {
- TestSubscriptionSet ss = new TestSubscriptionSet();
- ss.addSubscriber(suspendedSub1);
- ss.addSubscriber(suspendedSub2);
- ss.addSubscriber(suspendedSub3);
- return ss;
- }
-
- public void testNextMessageAfterRemove()
- {
- SubscriptionSet ss = new SubscriptionSet();
- ss.addSubscriber(suspendedSub1);
- ss.addSubscriber(suspendedSub2);
- ss.addSubscriber(sub3);
- assertEquals(sub3, ss.nextSubscriber(null));
- assertEquals(3, ss.getCurrentSubscriber());
-
- assertEquals(suspendedSub1, ss.removeSubscriber(suspendedSub1));
-
- assertEquals(sub3, ss.nextSubscriber(null)); // Current implementation handles OutOfBoundsException here.
- assertEquals(2, ss.getCurrentSubscriber());
- }
-
- public void testNextMessageOverScanning()
- {
- TestSubscriptionSet ss = new TestSubscriptionSet();
- SubscriptionTestHelper sub = new SubscriptionTestHelper("test");
- ss.addSubscriber(suspendedSub1);
- ss.addSubscriber(sub);
- ss.addSubscriber(suspendedSub3);
- assertEquals(sub, ss.nextSubscriber(null));
- assertEquals(2, ss.getCurrentSubscriber());
- assertEquals(2, ss.getScanned());
-
- ss.resetScanned();
- sub.setSuspended(true);
- assertNull(ss.nextSubscriber(null));
- assertEquals(3, ss.getCurrentSubscriber());
- // Current implementation overscans by one item here.
- assertEquals(ss.size() + 1, ss.getScanned());
- }
-
- public void testNextMessageOverscanWorstCase() {
- TestSubscriptionSet ss = createAllSuspendedSubscriptionSet();
- ss.nextSubscriber(null);
- // Scans the subscriptions twice.
- assertEquals(ss.size() * 2, ss.getScanned());
- }
-
- public static junit.framework.Test suite()
- {
- return new junit.framework.TestSuite(SubscriptionSetTest.class);
- }
-}
diff --git a/java/systests/src/test/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java b/java/systests/src/test/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java
deleted file mode 100644
index fea3c93280..0000000000
--- a/java/systests/src/test/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java
+++ /dev/null
@@ -1,123 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.queue;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Queue;
-
-public class SubscriptionTestHelper implements Subscription
-{
- private final List<AMQMessage> messages;
- private final Object key;
- private boolean isSuspended;
-
- public SubscriptionTestHelper(Object key)
- {
- this(key, new ArrayList<AMQMessage>());
- }
-
- public SubscriptionTestHelper(final Object key, final boolean isSuspended)
- {
- this(key);
- setSuspended(isSuspended);
- }
-
- SubscriptionTestHelper(Object key, List<AMQMessage> messages)
- {
- this.key = key;
- this.messages = messages;
- }
-
- List<AMQMessage> getMessages()
- {
- return messages;
- }
-
- public void send(AMQMessage msg, AMQQueue queue)
- {
- messages.add(msg);
- }
-
- public void setSuspended(boolean suspended)
- {
- isSuspended = suspended;
- }
-
- public boolean isSuspended()
- {
- return isSuspended;
- }
-
- public void queueDeleted(AMQQueue queue)
- {
- }
-
- public boolean hasFilters()
- {
- return false;
- }
-
- public boolean hasInterest(AMQMessage msg)
- {
- return true;
- }
-
- public Queue<AMQMessage> getPreDeliveryQueue()
- {
- return null;
- }
-
- public void enqueueForPreDelivery(AMQMessage msg)
- {
- //no-op
- }
-
- public boolean isAutoClose()
- {
- return false;
- }
-
- public void close()
- {
- //no-op
- }
-
- public boolean isBrowser()
- {
- return false; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public int hashCode()
- {
- return key.hashCode();
- }
-
- public boolean equals(Object o)
- {
- return o instanceof SubscriptionTestHelper && ((SubscriptionTestHelper) o).key.equals(key);
- }
-
- public String toString()
- {
- return key.toString();
- }
-}
diff --git a/java/systests/src/test/java/org/apache/qpid/server/queue/SynchronizedDeliveryManagerTest.java b/java/systests/src/test/java/org/apache/qpid/server/queue/SynchronizedDeliveryManagerTest.java
deleted file mode 100644
index ebe8e192a0..0000000000
--- a/java/systests/src/test/java/org/apache/qpid/server/queue/SynchronizedDeliveryManagerTest.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.queue;
-
-import org.apache.qpid.server.queue.SynchronizedDeliveryManager;
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.DefaultQueueRegistry;
-import org.apache.qpid.server.queue.DeliveryManagerTest;
-import org.apache.qpid.AMQException;
-
-import junit.framework.TestSuite;
-
-public class SynchronizedDeliveryManagerTest extends DeliveryManagerTest
-{
- public SynchronizedDeliveryManagerTest() throws Exception
- {
- try
- {
- System.setProperty("concurrentdeliverymanager","false");
- _mgr = new SynchronizedDeliveryManager(_subscriptions, new AMQQueue("myQ", false, "guest", false,
- new DefaultQueueRegistry()));
- }
- catch (Throwable t)
- {
- t.printStackTrace();
- throw new AMQException("Could not initialise delivery manager", t);
- }
- }
-
- public static junit.framework.Test suite()
- {
- TestSuite suite = new TestSuite();
- suite.addTestSuite(SynchronizedDeliveryManagerTest.class);
- return suite;
- }
-}
diff --git a/java/systests/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java b/java/systests/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java
deleted file mode 100644
index bc0a8a7d64..0000000000
--- a/java/systests/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.store;
-
-import org.apache.qpid.server.queue.AMQMessage;
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.QueueRegistry;
-import org.apache.qpid.AMQException;
-import org.apache.commons.configuration.Configuration;
-
-import java.util.List;
-import java.util.concurrent.atomic.AtomicLong;
-
-/**
- * A message store that does nothing. Designed to be used in tests that do not want to use any message store
- * functionality.
- */
-public class SkeletonMessageStore implements MessageStore
-{
- private final AtomicLong _messageId = new AtomicLong(1);
-
- public void configure(String base, Configuration config) throws Exception
- {
- }
-
- public void configure(QueueRegistry queueRegistry, String base, Configuration config) throws Exception
- {
- }
-
- public void close() throws Exception
- {
- }
-
- public void put(AMQMessage msg)
- {
- }
-
- public void removeMessage(long messageId)
- {
- }
-
- public void createQueue(AMQQueue queue) throws AMQException
- {
- }
-
- public void removeQueue(String name) throws AMQException
- {
- }
-
- public void enqueueMessage(String name, long messageId) throws AMQException
- {
- }
-
- public void dequeueMessage(String name, long messageId) throws AMQException
- {
- }
-
- public void beginTran() throws AMQException
- {
- }
-
- public boolean inTran()
- {
- return false;
- }
-
- public void commitTran() throws AMQException
- {
- }
-
- public void abortTran() throws AMQException
- {
- }
-
- public List<AMQQueue> createQueues() throws AMQException
- {
- return null;
- }
-
- public long getNewMessageId()
- {
- return _messageId.getAndIncrement();
- }
-}
diff --git a/java/systests/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java b/java/systests/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java
deleted file mode 100644
index f162506fed..0000000000
--- a/java/systests/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.store;
-
-import org.apache.qpid.server.queue.AMQMessage;
-import org.apache.qpid.AMQException;
-
-import junit.framework.TestCase;
-
-/**
- * Tests that reference counting works correctly with AMQMessage and the message store
- */
-public class TestReferenceCounting extends TestCase
-{
- private TestableMemoryMessageStore _store;
-
- protected void setUp() throws Exception
- {
- super.setUp();
- _store = new TestableMemoryMessageStore();
- }
-
- /**
- * Check that when the reference count is decremented the message removes itself from the store
- */
- public void testMessageGetsRemoved() throws AMQException
- {
- AMQMessage message = new AMQMessage(_store, null);
- _store.put(message);
- assertTrue(_store.getMessageMap().size() == 1);
- message.decrementReference();
- assertTrue(_store.getMessageMap().size() == 0);
- }
-
- public void testMessageRemains() throws AMQException
- {
- AMQMessage message = new AMQMessage(_store, null);
- _store.put(message);
- assertTrue(_store.getMessageMap().size() == 1);
- message.incrementReference();
- message.decrementReference();
- assertTrue(_store.getMessageMap().size() == 1);
- }
-
- public static junit.framework.Test suite()
- {
- return new junit.framework.TestSuite(TestReferenceCounting.class);
- }
-}
diff --git a/java/systests/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java b/java/systests/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java
deleted file mode 100644
index be7687a22c..0000000000
--- a/java/systests/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.store;
-
-import org.apache.qpid.server.queue.AMQMessage;
-
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ConcurrentHashMap;
-
-/**
- * Adds some extra methods to the memory message store for testing purposes.
- */
-public class TestableMemoryMessageStore extends MemoryMessageStore
-{
- public TestableMemoryMessageStore()
- {
- _messageMap = new ConcurrentHashMap<Long, AMQMessage>();
- }
-
- public ConcurrentMap<Long, AMQMessage> getMessageMap()
- {
- return _messageMap;
- }
-}
diff --git a/java/systests/src/test/java/org/apache/qpid/server/txn/TxnBufferTest.java b/java/systests/src/test/java/org/apache/qpid/server/txn/TxnBufferTest.java
deleted file mode 100644
index ac5c60a931..0000000000
--- a/java/systests/src/test/java/org/apache/qpid/server/txn/TxnBufferTest.java
+++ /dev/null
@@ -1,295 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.txn;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.TestableMemoryMessageStore;
-
-import java.util.LinkedList;
-
-import junit.framework.TestCase;
-
-public class TxnBufferTest extends TestCase
-{
- private final LinkedList<MockOp> ops = new LinkedList<MockOp>();
-
- public void testCommit() throws AMQException
- {
- MockStore store = new MockStore();
-
- TxnBuffer buffer = new TxnBuffer(store);
- buffer.enlist(new MockOp().expectPrepare().expectCommit());
- //check relative ordering
- MockOp op = new MockOp().expectPrepare().expectPrepare().expectCommit().expectCommit();
- buffer.enlist(op);
- buffer.enlist(op);
- buffer.enlist(new MockOp().expectPrepare().expectCommit());
-
- buffer.commit();
-
- validateOps();
- store.validate();
- }
-
- public void testRollback() throws AMQException
- {
- MockStore store = new MockStore();
-
- TxnBuffer buffer = new TxnBuffer(store);
- buffer.enlist(new MockOp().expectRollback());
- buffer.enlist(new MockOp().expectRollback());
- buffer.enlist(new MockOp().expectRollback());
-
- buffer.rollback();
-
- validateOps();
- store.validate();
- }
-
- public void testCommitWithFailureDuringPrepare() throws AMQException
- {
- MockStore store = new MockStore();
- store.expectBegin().expectAbort();
-
- TxnBuffer buffer = new TxnBuffer(store);
- buffer.containsPersistentChanges();
- buffer.enlist(new MockOp().expectPrepare().expectUndoPrepare());
- buffer.enlist(new TxnTester(store));
- buffer.enlist(new MockOp().expectPrepare().expectUndoPrepare());
- buffer.enlist(new FailedPrepare());
- buffer.enlist(new MockOp());
-
- buffer.commit();
- validateOps();
- store.validate();
- }
-
- public void testCommitWithPersistance() throws AMQException
- {
- MockStore store = new MockStore();
- store.expectBegin().expectCommit();
-
- TxnBuffer buffer = new TxnBuffer(store);
- buffer.enlist(new MockOp().expectPrepare().expectCommit());
- buffer.enlist(new MockOp().expectPrepare().expectCommit());
- buffer.enlist(new MockOp().expectPrepare().expectCommit());
- buffer.enlist(new TxnTester(store));
- buffer.containsPersistentChanges();
-
- buffer.commit();
- validateOps();
- store.validate();
- }
-
- private void validateOps()
- {
- for (MockOp op : ops)
- {
- op.validate();
- }
- }
-
- public static junit.framework.Test suite()
- {
- return new junit.framework.TestSuite(TxnBufferTest.class);
- }
-
- class MockOp implements TxnOp
- {
- final Object PREPARE = "PREPARE";
- final Object COMMIT = "COMMIT";
- final Object UNDO_PREPARE = "UNDO_PREPARE";
- final Object ROLLBACK = "ROLLBACK";
-
- private final LinkedList expected = new LinkedList();
-
- MockOp()
- {
- ops.add(this);
- }
-
- public void prepare()
- {
- assertEquals(expected.removeLast(), PREPARE);
- }
-
- public void commit()
- {
- assertEquals(expected.removeLast(), COMMIT);
- }
-
- public void undoPrepare()
- {
- assertEquals(expected.removeLast(), UNDO_PREPARE);
- }
-
- public void rollback()
- {
- assertEquals(expected.removeLast(), ROLLBACK);
- }
-
- private MockOp expect(Object optype)
- {
- expected.addFirst(optype);
- return this;
- }
-
- MockOp expectPrepare()
- {
- return expect(PREPARE);
- }
-
- MockOp expectCommit()
- {
- return expect(COMMIT);
- }
-
- MockOp expectUndoPrepare()
- {
- return expect(UNDO_PREPARE);
- }
-
- MockOp expectRollback()
- {
- return expect(ROLLBACK);
- }
-
- void validate()
- {
- assertEquals("Expected ops were not all invoked", new LinkedList(), expected);
- }
-
- void clear()
- {
- expected.clear();
- }
- }
-
- class MockStore extends TestableMemoryMessageStore
- {
- final Object BEGIN = "BEGIN";
- final Object ABORT = "ABORT";
- final Object COMMIT = "COMMIT";
-
- private final LinkedList expected = new LinkedList();
- private boolean inTran;
-
- public void beginTran() throws AMQException
- {
- assertEquals(expected.removeLast(), BEGIN);
- inTran = true;
- }
-
- public void commitTran() throws AMQException
- {
- assertEquals(expected.removeLast(), COMMIT);
- inTran = false;
- }
-
- public void abortTran() throws AMQException
- {
- assertEquals(expected.removeLast(), ABORT);
- inTran = false;
- }
-
- public boolean inTran()
- {
- return inTran;
- }
-
- private MockStore expect(Object optype)
- {
- expected.addFirst(optype);
- return this;
- }
-
- MockStore expectBegin()
- {
- return expect(BEGIN);
- }
-
- MockStore expectCommit()
- {
- return expect(COMMIT);
- }
-
- MockStore expectAbort()
- {
- return expect(ABORT);
- }
-
- void clear()
- {
- expected.clear();
- }
-
- void validate()
- {
- assertEquals("Expected ops were not all invoked", new LinkedList(), expected);
- }
- }
-
- class NullOp implements TxnOp
- {
- public void prepare() throws AMQException
- {
- }
- public void commit()
- {
- }
- public void undoPrepare()
- {
- }
- public void rollback()
- {
- }
- }
-
- class FailedPrepare extends NullOp
- {
- public void prepare() throws AMQException
- {
- throw new AMQException("Fail!");
- }
- }
-
- class TxnTester extends NullOp
- {
- private final MessageStore store;
-
- TxnTester(MessageStore store)
- {
- this.store = store;
- }
-
- public void prepare() throws AMQException
- {
- assertTrue("Expected prepare to be performed under txn", store.inTran());
- }
-
- public void commit()
- {
- assertTrue("Expected commit not to be performed under txn", !store.inTran());
- }
- }
-
-}
diff --git a/java/systests/src/test/java/org/apache/qpid/server/util/AveragedRun.java b/java/systests/src/test/java/org/apache/qpid/server/util/AveragedRun.java
deleted file mode 100644
index 1d17985ab5..0000000000
--- a/java/systests/src/test/java/org/apache/qpid/server/util/AveragedRun.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.util;
-
-import org.apache.qpid.server.util.TimedRun;
-
-import java.util.concurrent.Callable;
-import java.util.Collection;
-
-public class AveragedRun implements Callable<RunStats>
-{
- private final RunStats stats = new RunStats();
- private final TimedRun test;
- private final int iterations;
-
- public AveragedRun(TimedRun test, int iterations)
- {
- this.test = test;
- this.iterations = iterations;
- }
-
- public RunStats call() throws Exception
- {
- for (int i = 0; i < iterations; i++)
- {
- stats.record(test.call());
- }
- return stats;
- }
-
- public void run() throws Exception
- {
- System.out.println(test + ": " + call());
- }
-
- public String toString()
- {
- return test.toString();
- }
-
- static void run(Collection<AveragedRun> tests) throws Exception
- {
- for(AveragedRun test : tests)
- {
- test.run();
- }
- }
-}
diff --git a/java/systests/src/test/java/org/apache/qpid/server/util/RunStats.java b/java/systests/src/test/java/org/apache/qpid/server/util/RunStats.java
deleted file mode 100644
index ec67fc68b3..0000000000
--- a/java/systests/src/test/java/org/apache/qpid/server/util/RunStats.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.util;
-
-public class RunStats
-{
- private long min = Long.MAX_VALUE;
- private long max;
- private long total;
- private int count;
-
- public void record(long time)
- {
- max = Math.max(time, max);
- min = Math.min(time, min);
- total += time;
- count++;
- }
-
- public long getMin()
- {
- return min;
- }
-
- public long getMax()
- {
- return max;
- }
-
- public long getAverage()
- {
- return total / count;
- }
-
- public String toString()
- {
- return "avg=" + getAverage() + ", min=" + min + ", max=" + max;
- }
-}
diff --git a/java/systests/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java b/java/systests/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java
deleted file mode 100644
index f801daf27c..0000000000
--- a/java/systests/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.util;
-
-import org.apache.qpid.server.exchange.DefaultExchangeFactory;
-import org.apache.qpid.server.exchange.DefaultExchangeRegistry;
-import org.apache.qpid.server.exchange.ExchangeFactory;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
-import org.apache.qpid.server.management.ManagedObjectRegistry;
-import org.apache.qpid.server.management.NoopManagedObjectRegistry;
-import org.apache.qpid.server.queue.DefaultQueueRegistry;
-import org.apache.qpid.server.queue.QueueRegistry;
-import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.security.auth.AuthenticationManager;
-import org.apache.qpid.server.security.auth.NullAuthenticationManager;
-import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.TestableMemoryMessageStore;
-import org.apache.commons.configuration.Configuration;
-import org.apache.commons.configuration.MapConfiguration;
-
-import java.util.HashMap;
-
-public class TestApplicationRegistry extends ApplicationRegistry
-{
- private QueueRegistry _queueRegistry;
-
- private ExchangeRegistry _exchangeRegistry;
-
- private ExchangeFactory _exchangeFactory;
-
- private ManagedObjectRegistry _managedObjectRegistry;
-
- private AuthenticationManager _authenticationManager;
-
- private MessageStore _messageStore;
-
- public TestApplicationRegistry()
- {
- super(new MapConfiguration(new HashMap()));
- }
-
- public void initialise() throws Exception
- {
- _managedObjectRegistry = new NoopManagedObjectRegistry();
- _queueRegistry = new DefaultQueueRegistry();
- _exchangeFactory = new DefaultExchangeFactory();
- _exchangeRegistry = new DefaultExchangeRegistry(_exchangeFactory);
- _authenticationManager = new NullAuthenticationManager();
- _messageStore = new TestableMemoryMessageStore();
-
- _configuration.addProperty("heartbeat.delay", 10 * 60); // 10 minutes
- }
-
- public Configuration getConfiguration()
- {
- return _configuration;
- }
-
- public QueueRegistry getQueueRegistry()
- {
- return _queueRegistry;
- }
-
- public ExchangeRegistry getExchangeRegistry()
- {
- return _exchangeRegistry;
- }
-
- public ExchangeFactory getExchangeFactory()
- {
- return _exchangeFactory;
- }
-
- public ManagedObjectRegistry getManagedObjectRegistry()
- {
- return _managedObjectRegistry;
- }
-
- public AuthenticationManager getAuthenticationManager()
- {
- return _authenticationManager;
- }
-
- public MessageStore getMessageStore()
- {
- return _messageStore;
- }
-}
-
diff --git a/java/systests/src/test/java/org/apache/qpid/server/util/TimedRun.java b/java/systests/src/test/java/org/apache/qpid/server/util/TimedRun.java
deleted file mode 100644
index 1291380311..0000000000
--- a/java/systests/src/test/java/org/apache/qpid/server/util/TimedRun.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.util;
-
-import java.util.concurrent.Callable;
-
-public abstract class TimedRun implements Callable<Long>
-{
- private final String description;
-
- public TimedRun(String description)
- {
- this.description = description;
- }
-
- public Long call() throws Exception
- {
- setup();
- long start = System.currentTimeMillis();
- run();
- long stop = System.currentTimeMillis();
- teardown();
- return stop - start;
- }
-
- public String toString()
- {
- return description;
- }
-
- protected void setup() throws Exception{}
- protected void teardown() throws Exception{}
- protected abstract void run() throws Exception;
-}
diff --git a/java/systests/src/test/java/org/apache/qpid/test/VMBrokerSetup.java b/java/systests/src/test/java/org/apache/qpid/test/VMBrokerSetup.java
deleted file mode 100644
index e859fac4af..0000000000
--- a/java/systests/src/test/java/org/apache/qpid/test/VMBrokerSetup.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.test;
-
-import junit.extensions.TestSetup;
-import junit.framework.Test;
-
-import org.apache.qpid.client.transport.TransportConnection;
-
-public class VMBrokerSetup extends TestSetup
-{
- public VMBrokerSetup(Test t)
- {
- super(t);
- }
-
- protected void setUp() throws Exception
- {
- super.setUp();
- try
- {
- TransportConnection.createVMBroker(1);
- }
- catch (Exception e)
- {
- fail("Unable to create broker: " + e);
- }
- }
-
- protected void tearDown() throws Exception
- {
- TransportConnection.killVMBroker(1);
- super.tearDown();
- }
-}