diff options
Diffstat (limited to 'java/systests')
37 files changed, 1384 insertions, 940 deletions
diff --git a/java/systests/distribution/pom.xml b/java/systests/distribution/pom.xml new file mode 100644 index 0000000000..bff1e0d9e5 --- /dev/null +++ b/java/systests/distribution/pom.xml @@ -0,0 +1,111 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + --> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + <modelVersion>4.0.0</modelVersion> + <groupId>org.apache.qpid</groupId> + <artifactId>qpid-systests-distribution</artifactId> + <packaging>jar</packaging> + <version>1.0-incubating-M2-SNAPSHOT</version> + <name>Qpid System Tests Distribution</name> + <url>http://cwiki.apache.org/confluence/display/qpid</url> + + <parent> + <groupId>org.apache.qpid</groupId> + <artifactId>qpid</artifactId> + <version>1.0-incubating-M2-SNAPSHOT</version> + </parent> + + <properties> + <topDirectoryLocation>..</topDirectoryLocation> + <java.source.version>1.5</java.source.version> + <qpid.version>${pom.version}</qpid.version> + <qpid.targetDir>${project.build.directory}</qpid.targetDir> + </properties> + + <dependencies> + <dependency> + <groupId>org.apache.qpid</groupId> + <artifactId>qpid-systests</artifactId> + </dependency> + </dependencies> + + <build> + <pluginManagement> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-compiler-plugin</artifactId> + <configuration> + <source>${java.source.version}</source> + <target>${java.source.version}</target> + </configuration> + </plugin> + + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-assembly-plugin</artifactId> + <version>${assembly.version}</version> + <configuration> + <finalName>qpid-${pom.version}</finalName> + <outputDirectory>${qpid.targetDir}</outputDirectory> + <tarLongFileMode>gnu</tarLongFileMode> + </configuration> + </plugin> + + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-jar-plugin</artifactId> + <configuration> + <finalName>qpid-systests</finalName> + <archive> + <manifest> + <addClasspath>true</addClasspath> + </manifest> + </archive> + </configuration> + </plugin> + + </plugins> + </pluginManagement> + + <plugins> + <plugin> + <artifactId>maven-assembly-plugin</artifactId> + <executions> + <execution> + <id>distribution-package</id> + <phase>package</phase> + <goals> + <goal>single</goal> + </goals> + <configuration> + <descriptors> + <descriptor>src/main/assembly/systests.xml</descriptor> + </descriptors> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + + </build> + +</project> diff --git a/java/systests/distribution/src/main/assembly/systests.xml b/java/systests/distribution/src/main/assembly/systests.xml new file mode 100644 index 0000000000..2d6a6d8572 --- /dev/null +++ b/java/systests/distribution/src/main/assembly/systests.xml @@ -0,0 +1,91 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. +--> +<assembly> + <id>system-test-java</id> + <includeBaseDirectory>false</includeBaseDirectory> + <formats> + <format>tar.gz</format> + <format>zip</format> + </formats> + +<fileSets> + <!-- Apache Licensing Details--> + <fileSet> + <directory>../../resources</directory> + <outputDirectory>qpid-${qpid.version}</outputDirectory> + <includes> + <include>DISCLAIMER</include> + <include>LICENSE.txt</include> + <include>NOTICE.txt</include> + <include>README.txt</include> + </includes> + </fileSet> + <fileSet> + <directory>..</directory> + <outputDirectory>qpid-${qpid.version}</outputDirectory> + <includes> + <include>*.txt</include> + </includes> + </fileSet> + <fileSet> + <directory>../../release-docs</directory> + <outputDirectory>qpid-${qpid.version}/docs</outputDirectory> + <includes> + <include>RELEASE_NOTES.txt</include> + </includes> + </fileSet> + + <!-- Scripts to run the system tests--> + <fileSet> + <directory>../bin</directory> + <outputDirectory>qpid-${qpid.version}/bin</outputDirectory> + <includes> + <include>*</include> + </includes> + </fileSet> + + <!-- Include source files in easy access form --> + <fileSet> + <directory>../src/main</directory> + <outputDirectory>qpid-${qpid.version}/src</outputDirectory> + <includes> + <include>**/*.java</include> + <include>**/*.log4j</include> + </includes> + </fileSet> + <fileSet> + <directory>target</directory> + <outputDirectory>qpid-${qpid.version}/lib</outputDirectory> + <includes> + <include>qpid-systests.jar</include> + </includes> + </fileSet> + </fileSets> + + <dependencySets> + <dependencySet> + <outputDirectory>qpid-${qpid.version}/lib</outputDirectory> + <unpack>false</unpack> + <excludes> + <exclude>org.apache.qpid:qpid-systests-distribution</exclude> + </excludes> + </dependencySet> + </dependencySets> +</assembly> diff --git a/java/systests/pom.xml b/java/systests/pom.xml index c73e5f2c44..614166754c 100644 --- a/java/systests/pom.xml +++ b/java/systests/pom.xml @@ -39,23 +39,20 @@ <dependencies> <dependency> <groupId>org.apache.qpid</groupId> - <artifactId>qpid-common</artifactId> - </dependency> - <dependency> - <groupId>org.apache.qpid</groupId> <artifactId>qpid-client</artifactId> + <type>jar</type> </dependency> + <dependency> <groupId>org.apache.qpid</groupId> <artifactId>qpid-broker</artifactId> + <type>jar</type> </dependency> - <dependency> - <groupId>org.slf4j</groupId> - <artifactId>slf4j-simple</artifactId> - </dependency> + <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> + <scope>compile</scope> </dependency> </dependencies> @@ -65,26 +62,30 @@ <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-surefire-plugin</artifactId> <configuration> - <systemProperties> - <property> - <name>amqj.noAutoCreateVMBroker</name> - <value>true</value> - </property> - <property> - <name>amqj.logging.level</name> - <value>${amqj.logging.level}</value> - </property> - <property> - <name>log4j.debug</name> - <value>true</value> - </property> - <property> - <name>log4j.configuration</name> - <value>file:///${basedir}/src/test/java/log4j.properties</value> - </property> - </systemProperties> + <skip>true</skip> </configuration> </plugin> + </plugins> + + <!-- Include source files in built jar --> + <resources> + <resource> + <targetPath>src/</targetPath> + <filtering>false</filtering> + <directory>src/main/java</directory> + <includes> + <include>**/*.java</include> + </includes> + </resource> + <resource> + <targetPath>src/</targetPath> + <filtering>false</filtering> + <directory>src/main/java</directory> + <includes> + <include>systests.log4j</include> + </includes> + </resource> + </resources> </build> </project> diff --git a/java/systests/src/test/java/org/apache/qpid/server/AMQBrokerManagerMBeanTest.java b/java/systests/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBeanTest.java index 21ad1b6a7f..62dc44e23f 100644 --- a/java/systests/src/test/java/org/apache/qpid/server/AMQBrokerManagerMBeanTest.java +++ b/java/systests/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBeanTest.java @@ -23,6 +23,8 @@ import org.apache.qpid.server.management.ManagedBroker; import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.registry.IApplicationRegistry; +import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.framing.AMQShortString; public class AMQBrokerManagerMBeanTest extends TestCase { @@ -35,40 +37,44 @@ public class AMQBrokerManagerMBeanTest extends TestCase String exchange2 = "testExchange2_" + System.currentTimeMillis(); String exchange3 = "testExchange3_" + System.currentTimeMillis(); - assertTrue(_exchangeRegistry.getExchange(exchange1) == null); - assertTrue(_exchangeRegistry.getExchange(exchange2) == null); - assertTrue(_exchangeRegistry.getExchange(exchange3) == null); + assertTrue(_exchangeRegistry.getExchange(new AMQShortString(exchange1)) == null); + assertTrue(_exchangeRegistry.getExchange(new AMQShortString(exchange2)) == null); + assertTrue(_exchangeRegistry.getExchange(new AMQShortString(exchange3)) == null); - ManagedBroker mbean = new AMQBrokerManagerMBean(); + VirtualHost vHost = ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost("test"); + + ManagedBroker mbean = new AMQBrokerManagerMBean((VirtualHost.VirtualHostMBean)vHost.getManagedObject()); mbean.createNewExchange(exchange1,"direct",false, false); mbean.createNewExchange(exchange2,"topic",false, false); mbean.createNewExchange(exchange3,"headers",false, false); - assertTrue(_exchangeRegistry.getExchange(exchange1) != null); - assertTrue(_exchangeRegistry.getExchange(exchange2) != null); - assertTrue(_exchangeRegistry.getExchange(exchange3) != null); + assertTrue(_exchangeRegistry.getExchange(new AMQShortString(exchange1)) != null); + assertTrue(_exchangeRegistry.getExchange(new AMQShortString(exchange2)) != null); + assertTrue(_exchangeRegistry.getExchange(new AMQShortString(exchange3)) != null); mbean.unregisterExchange(exchange1); mbean.unregisterExchange(exchange2); mbean.unregisterExchange(exchange3); - assertTrue(_exchangeRegistry.getExchange(exchange1) == null); - assertTrue(_exchangeRegistry.getExchange(exchange2) == null); - assertTrue(_exchangeRegistry.getExchange(exchange3) == null); + assertTrue(_exchangeRegistry.getExchange(new AMQShortString(exchange1)) == null); + assertTrue(_exchangeRegistry.getExchange(new AMQShortString(exchange2)) == null); + assertTrue(_exchangeRegistry.getExchange(new AMQShortString(exchange3)) == null); } public void testQueueOperations() throws Exception { String queueName = "testQueue_" + System.currentTimeMillis(); - ManagedBroker mbean = new AMQBrokerManagerMBean(); + VirtualHost vHost = ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost("test"); + + ManagedBroker mbean = new AMQBrokerManagerMBean((VirtualHost.VirtualHostMBean)vHost.getManagedObject()); - assertTrue(_queueRegistry.getQueue(queueName) == null); + assertTrue(_queueRegistry.getQueue(new AMQShortString(queueName)) == null); mbean.createNewQueue(queueName, false, "test", true); - assertTrue(_queueRegistry.getQueue(queueName) != null); + assertTrue(_queueRegistry.getQueue(new AMQShortString(queueName)) != null); mbean.deleteQueue(queueName); - assertTrue(_queueRegistry.getQueue(queueName) == null); + assertTrue(_queueRegistry.getQueue(new AMQShortString(queueName)) == null); } @Override @@ -76,7 +82,7 @@ public class AMQBrokerManagerMBeanTest extends TestCase { super.setUp(); IApplicationRegistry appRegistry = ApplicationRegistry.getInstance(); - _queueRegistry = appRegistry.getQueueRegistry(); - _exchangeRegistry = appRegistry.getExchangeRegistry(); + _queueRegistry = appRegistry.getVirtualHostRegistry().getVirtualHost("test").getQueueRegistry(); + _exchangeRegistry = appRegistry.getVirtualHostRegistry().getVirtualHost("test").getExchangeRegistry(); } } diff --git a/java/systests/src/test/java/org/apache/qpid/server/ack/TxAckTest.java b/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java index 1a15ca7561..5451d64b45 100644 --- a/java/systests/src/test/java/org/apache/qpid/server/ack/TxAckTest.java +++ b/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -20,16 +20,18 @@ */ package org.apache.qpid.server.ack; +import junit.framework.TestCase; import org.apache.qpid.AMQException; +import org.apache.qpid.framing.MessageTransferBody; +import org.apache.qpid.server.RequiredDeliveryException; import org.apache.qpid.server.queue.AMQMessage; import org.apache.qpid.server.store.TestableMemoryMessageStore; +import org.apache.qpid.server.store.StoreContext; +import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.txn.NonTransactionalContext; +import org.apache.qpid.server.txn.TransactionalContext; -import java.util.Arrays; -import java.util.HashSet; -import java.util.LinkedHashMap; -import java.util.List; - -import junit.framework.TestCase; +import java.util.*; public class TxAckTest extends TestCase { @@ -87,18 +89,54 @@ public class TxAckTest extends TestCase private class Scenario { - private final LinkedHashMap<Long, UnacknowledgedMessage> _messages = new LinkedHashMap<Long, UnacknowledgedMessage>(); - private final UnacknowledgedMessageMap _map = new UnacknowledgedMessageMapImpl(_messages, _messages); + private final UnacknowledgedMessageMap _map = new UnacknowledgedMessageMapImpl(5000); private final TxAck _op = new TxAck(_map); private final List<Long> _acked; private final List<Long> _unacked; + private StoreContext _storeContext = new StoreContext(); Scenario(int messageCount, List<Long> acked, List<Long> unacked) { + MessageStore messageStore = new TestableMemoryMessageStore(); + TransactionalContext txnContext = new NonTransactionalContext(messageStore, + _storeContext, null, + new LinkedList<RequiredDeliveryException>(), + new HashSet<Long>()); for(int i = 0; i < messageCount; i++) { long deliveryTag = i + 1; - _messages.put(deliveryTag, new UnacknowledgedMessage(null, new TestMessage(deliveryTag), null, deliveryTag)); + // TODO: fix hardcoded protocol version data + TestMessage message = new TestMessage(deliveryTag, messageStore, + new MessageTransferBody( + (byte)0, + (byte)9, + MessageTransferBody.getClazz((byte)0,(byte)9), + MessageTransferBody.getMethod((byte)0,(byte)9), + null, // AMQShortString appId + null, // FieldTable applicationHeaders + null, // Content body + null, // AMQShortString contentEncoding + null, // AMQShortString contentType + null, // AMQShortString correlationId + (short)0, // short deliveryMode + null, // AMQShortString destination + null, // AMQShortString exchange + 0L, // long expiration + false, // boolean immediate + false, // boolean mandatory + null, // AMQShortString messageId + (short)0, // short priority + false, // boolean redelivered + null, // AMQShortString replyTo + null, // AMQShortString routingKey + null, // byte[] securityToken + 0, // int ticket + 0L, // long timestamp + null, // AMQShortString transactionId + 0L, // long ttl + null), // AMQShortString userId + txnContext); + _map.add(deliveryTag, new UnacknowledgedMessage(null, message, null, deliveryTag)); } _acked = acked; _unacked = unacked; @@ -113,7 +151,7 @@ public class TxAckTest extends TestCase { for(long tag : tags) { - UnacknowledgedMessage u = _messages.get(tag); + UnacknowledgedMessage u = _map.get(tag); assertTrue("Message not found for tag " + tag, u != null); ((TestMessage) u.message).assertCountEquals(expected); } @@ -122,11 +160,11 @@ public class TxAckTest extends TestCase void prepare() throws AMQException { _op.consolidate(); - _op.prepare(); + _op.prepare(_storeContext); assertCount(_acked, -1); assertCount(_unacked, 0); - + } void undoPrepare() { @@ -140,16 +178,16 @@ public class TxAckTest extends TestCase void commit() { _op.consolidate(); - _op.commit(); - + _op.commit(_storeContext); + //check acked messages are removed from map - HashSet<Long> keys = new HashSet<Long>(_messages.keySet()); + Set<Long> keys = new HashSet<Long>(_map.getDeliveryTags()); keys.retainAll(_acked); assertTrue("Expected messages with following tags to have been removed from map: " + keys, keys.isEmpty()); //check unacked messages are still in map keys = new HashSet<Long>(_unacked); - keys.removeAll(_messages.keySet()); + keys.removeAll(_map.getDeliveryTags()); assertTrue("Expected messages with following tags to still be in map: " + keys, keys.isEmpty()); } } @@ -159,9 +197,9 @@ public class TxAckTest extends TestCase private final long _tag; private int _count; - TestMessage(long tag) + TestMessage(long tag, MessageStore store, MessageTransferBody transferBody, TransactionalContext txnContext) { - super(new TestableMemoryMessageStore(), null); + super(store, transferBody, txnContext); _tag = tag; } @@ -170,7 +208,7 @@ public class TxAckTest extends TestCase _count++; } - public void decrementReference() + public void decrementReference(StoreContext context) { _count--; } diff --git a/java/systests/src/main/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java b/java/systests/src/main/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java new file mode 100644 index 0000000000..a047afa978 --- /dev/null +++ b/java/systests/src/main/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java @@ -0,0 +1,330 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.exchange; + +import junit.framework.TestCase; +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.*; +import org.apache.qpid.server.queue.AMQMessage; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.MessageHandleFactory; +import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.SkeletonMessageStore; +import org.apache.qpid.server.store.MemoryMessageStore; +import org.apache.qpid.server.store.StoreContext; +import org.apache.qpid.server.txn.NonTransactionalContext; +import org.apache.qpid.server.txn.TransactionalContext; +import org.apache.qpid.server.RequiredDeliveryException; +import org.apache.log4j.Logger; + +import java.util.*; + +public class AbstractHeadersExchangeTestBase extends TestCase +{ + private static TransactionalContext txnContext = null; + + private static final Logger _log = Logger.getLogger(AbstractHeadersExchangeTestBase.class); + + private final HeadersExchange exchange = new HeadersExchange(); + protected final Set<TestQueue> queues = new HashSet<TestQueue>(); + + /** + * Not used in this test, just there to stub out the routing calls + */ + private MessageStore _store = new MemoryMessageStore(); + + private StoreContext _storeContext = new StoreContext(); + + private MessageHandleFactory _handleFactory = new MessageHandleFactory(); + + private int count; + + public void testDoNothing() + { + // this is here only to make junit under Eclipse happy + } + + protected TestQueue bindDefault(String... bindings) throws AMQException + { + return bind("Queue" + (++count), bindings); + } + + protected TestQueue bind(String queueName, String... bindings) throws AMQException + { + return bind(queueName, getHeaders(bindings)); + } + + protected TestQueue bind(String queue, FieldTable bindings) throws AMQException + { + return bind(new TestQueue(new AMQShortString(queue)), bindings); + } + + protected TestQueue bind(TestQueue queue, String... bindings) throws AMQException + { + return bind(queue, getHeaders(bindings)); + } + + protected TestQueue bind(TestQueue queue, FieldTable bindings) throws AMQException + { + queues.add(queue); + exchange.registerQueue(null, queue, bindings); + return queue; + } + + + protected void route(Message m) throws AMQException + { + m.route(exchange); +// m.routingComplete(_store, _storeContext, _handleFactory); + } + + protected void routeAndTest(Message m, TestQueue... expected) throws AMQException + { + routeAndTest(m, false, Arrays.asList(expected)); + } + + protected void routeAndTest(Message m, boolean expectReturn, TestQueue... expected) throws AMQException + { + routeAndTest(m, expectReturn, Arrays.asList(expected)); + } + + protected void routeAndTest(Message m, List<TestQueue> expected) throws AMQException + { + routeAndTest(m, false, expected); + } + + protected void routeAndTest(Message m, boolean expectReturn, List<TestQueue> expected) throws AMQException + { + try + { + route(m); + assertFalse("Expected "+m+" to be returned due to manadatory flag, and lack of routing",expectReturn); + for (TestQueue q : queues) + { + if (expected.contains(q)) + { + assertTrue("Expected " + m + " to be delivered to " + q, m.isInQueue(q)); + //assert m.isInQueue(q) : "Expected " + m + " to be delivered to " + q; + } + else + { + assertFalse("Did not expect " + m + " to be delivered to " + q, m.isInQueue(q)); + //assert !m.isInQueue(q) : "Did not expect " + m + " to be delivered to " + q; + } + } + } + + catch (NoRouteException ex) + { + assertTrue("Expected "+m+" not to be returned",expectReturn); + } + + } + + static FieldTable getHeaders(String... entries) + { + FieldTable headers = FieldTableFactory.newFieldTable(); + for (String s : entries) + { + String[] parts = s.split("=", 2); + headers.setObject(parts[0], parts.length > 1 ? parts[1] : ""); + } + return headers; + } + + static MessageTransferBody getPublishRequest(String id) + { + // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9) + // TODO: Establish some way to determine the version for the test. + MessageTransferBody request = new MessageTransferBody( + (byte)0, + (byte)9, + MessageTransferBody.getClazz((byte)0,(byte)9), + MessageTransferBody.getMethod((byte)0,(byte)9), + new AMQShortString(id), // AMQShortString appId + null, // FieldTable applicationHeaders + null, // Content body + null, // AMQShortString contentEncoding + null, // AMQShortString contentType + null, // AMQShortString correlationId + (short)0, // short deliveryMode + null, // AMQShortString destination + null, // AMQShortString exchange + 0L, // long expiration + false, // boolean immediate + false, // boolean mandatory + null, // AMQShortString messageId + (short)0, // short priority + false, // boolean redelivered + null, // AMQShortString replyTo + null, // AMQShortString routingKey + null, // byte[] securityToken + 0, // int ticket + 0L, // long timestamp + null, // AMQShortString transactionId + 0L, // long ttl + null); // AMQShortString userId + + return request; + } + +// static ContentHeaderBody getContentHeader(FieldTable headers) +// { +// ContentHeaderBody header = new ContentHeaderBody(); +// header.properties = getProperties(headers); +// return header; +// } +// +// static BasicContentHeaderProperties getProperties(FieldTable headers) +// { +// BasicContentHeaderProperties properties = new BasicContentHeaderProperties(); +// properties.setHeaders(headers); +// return properties; +// } + + static class TestQueue extends AMQQueue + { + final List<HeadersExchangeTest.Message> messages = new ArrayList<HeadersExchangeTest.Message>(); + + public TestQueue(AMQShortString name) throws AMQException + { + super(name, false, new AMQShortString("test"), true, ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost("test")); + } + + /** + * We override this method so that the default behaviour, which attempts to use a delivery manager, is + * not invoked. It is unnecessary since for this test we only care to know whether the message was + * sent to the queue; the queue processing logic is not being tested. + * @param msg + * @throws AMQException + */ + public void process(StoreContext context, AMQMessage msg) throws AMQException + { + messages.add(new HeadersExchangeTest.Message(msg)); + } + } + + /** + * Just add some extra utility methods to AMQMessage to aid testing. + */ + static class Message extends AMQMessage + { + private static MessageStore _messageStore = new SkeletonMessageStore(); + + private static StoreContext _storeContext = new StoreContext(); + + private static TransactionalContext _txnContext = new NonTransactionalContext(_messageStore, _storeContext, + null, + new LinkedList<RequiredDeliveryException>(), + new HashSet<Long>()); + + Message(String id, String... headers) throws AMQException + { + this(new MessageTransferBody( + (byte)0, + (byte)9, + MessageTransferBody.getClazz((byte)0,(byte)9), + MessageTransferBody.getMethod((byte)0,(byte)9), + null, // AMQShortString appId + getHeaders(headers), // FieldTable applicationHeaders + null, // Content body + null, // AMQShortString contentEncoding + null, // AMQShortString contentType + null, // AMQShortString correlationId + (short)0, // short deliveryMode + null, // AMQShortString destination + null, // AMQShortString exchange + 0L, // long expiration + false, // boolean immediate + false, // boolean mandatory + new AMQShortString(id), // AMQShortString messageId + (short)0, // short priority + false, // boolean redelivered + null, // AMQShortString replyTo + null, // AMQShortString routingKey + null, // byte[] securityToken + 0, // int ticket + 0L, // long timestamp + null, // AMQShortString transactionId + 0L, // long ttl + null)); // AMQShortString userId + } + +// Message(String id, FieldTable headers) throws AMQException +// { +// this(getPublishRequest(id), getContentHeader(headers), null); +// } + + private Message(MessageTransferBody publish) throws AMQException + { + super(_messageStore, publish, _txnContext); + } + + private Message(AMQMessage msg) throws AMQException + { + super(msg); + } + + void route(Exchange exchange) throws AMQException + { + exchange.route(this); + } + + boolean isInQueue(TestQueue queue) + { + return queue.messages.contains(this); + } + + public int hashCode() + { + return getKey().hashCode(); + } + + public boolean equals(Object o) + { + return o instanceof HeadersExchangeTest.Message && equals((HeadersExchangeTest.Message) o); + } + + private boolean equals(HeadersExchangeTest.Message m) + { + return getKey().equals(m.getKey()); + } + + public String toString() + { + return getKey().toString(); + } + + private Object getKey() + { +// try +// { + return getTransferBody().getRoutingKey(); +// } +// catch (AMQException e) +// { +// _log.error("Error getting routing key: " + e, e); +// return null; +// } + } + } +} diff --git a/java/systests/src/test/java/org/apache/qpid/server/exchange/ExchangeMBeanTest.java b/java/systests/src/main/java/org/apache/qpid/server/exchange/ExchangeMBeanTest.java index bb88d2e8d0..9653155a51 100644 --- a/java/systests/src/test/java/org/apache/qpid/server/exchange/ExchangeMBeanTest.java +++ b/java/systests/src/main/java/org/apache/qpid/server/exchange/ExchangeMBeanTest.java @@ -21,7 +21,11 @@ import junit.framework.TestCase; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.server.registry.IApplicationRegistry; import org.apache.qpid.server.management.ManagedObject; +import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.exchange.ExchangeDefaults; +import org.apache.qpid.framing.AMQShortString; import javax.management.openmbean.CompositeData; import javax.management.openmbean.TabularData; @@ -34,6 +38,7 @@ public class ExchangeMBeanTest extends TestCase { private AMQQueue _queue; private QueueRegistry _queueRegistry; + private VirtualHost _virtualHost; /** * Test for direct exchange mbean @@ -43,12 +48,12 @@ public class ExchangeMBeanTest extends TestCase public void testDirectExchangeMBean() throws Exception { DestNameExchange exchange = new DestNameExchange(); - exchange.initialise("amq.direct", false, 0, true); + exchange.initialise(_virtualHost, ExchangeDefaults.DIRECT_EXCHANGE_NAME, false, 0, true); ManagedObject managedObj = exchange.getManagedObject(); ManagedExchange mbean = (ManagedExchange)managedObj; - mbean.createNewBinding(_queue.getName(), "binding1"); - mbean.createNewBinding(_queue.getName(), "binding2"); + mbean.createNewBinding(_queue.getName().toString(), "binding1"); + mbean.createNewBinding(_queue.getName().toString(), "binding2"); TabularData data = mbean.bindings(); ArrayList<CompositeData> list = new ArrayList<CompositeData>(data.values()); @@ -70,12 +75,12 @@ public class ExchangeMBeanTest extends TestCase public void testTopicExchangeMBean() throws Exception { DestWildExchange exchange = new DestWildExchange(); - exchange.initialise("amq.topic", false, 0, true); + exchange.initialise(_virtualHost,ExchangeDefaults.TOPIC_EXCHANGE_NAME, false, 0, true); ManagedObject managedObj = exchange.getManagedObject(); ManagedExchange mbean = (ManagedExchange)managedObj; - mbean.createNewBinding(_queue.getName(), "binding1"); - mbean.createNewBinding(_queue.getName(), "binding2"); + mbean.createNewBinding(_queue.getName().toString(), "binding1"); + mbean.createNewBinding(_queue.getName().toString(), "binding2"); TabularData data = mbean.bindings(); ArrayList<CompositeData> list = new ArrayList<CompositeData>(data.values()); @@ -97,19 +102,19 @@ public class ExchangeMBeanTest extends TestCase public void testHeadersExchangeMBean() throws Exception { HeadersExchange exchange = new HeadersExchange(); - exchange.initialise("amq.headers", false, 0, true); + exchange.initialise(_virtualHost,ExchangeDefaults.HEADERS_EXCHANGE_NAME, false, 0, true); ManagedObject managedObj = exchange.getManagedObject(); ManagedExchange mbean = (ManagedExchange)managedObj; - mbean.createNewBinding(_queue.getName(), "key1=binding1,key2=binding2"); - mbean.createNewBinding(_queue.getName(), "key3=binding3"); + mbean.createNewBinding(_queue.getName().toString(), "key1=binding1,key2=binding2"); + mbean.createNewBinding(_queue.getName().toString(), "key3=binding3"); TabularData data = mbean.bindings(); ArrayList<CompositeData> list = new ArrayList<CompositeData>(data.values()); assertTrue(list.size() == 2); // test general exchange properties - assertEquals(mbean.getName(), "amq.headers"); + assertEquals(mbean.getName(), "amq.match"); assertEquals(mbean.getExchangeType(), "headers"); assertTrue(mbean.getTicketNo() == 0); assertTrue(!mbean.isDurable()); @@ -120,8 +125,11 @@ public class ExchangeMBeanTest extends TestCase protected void setUp() throws Exception { super.setUp(); - _queueRegistry = ApplicationRegistry.getInstance().getQueueRegistry(); - _queue = new AMQQueue("testQueue", false, "ExchangeMBeanTest", false, _queueRegistry); + + IApplicationRegistry applicationRegistry = ApplicationRegistry.getInstance(); + _virtualHost = applicationRegistry.getVirtualHostRegistry().getVirtualHost("test"); + _queueRegistry = _virtualHost.getQueueRegistry(); + _queue = new AMQQueue(new AMQShortString("testQueue"), false, new AMQShortString("ExchangeMBeanTest"), false, _virtualHost); _queueRegistry.registerQueue(_queue); } } diff --git a/java/systests/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java b/java/systests/src/main/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java index 00a645628f..7bfc2e5386 100644 --- a/java/systests/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java +++ b/java/systests/src/main/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java @@ -21,16 +21,17 @@ package org.apache.qpid.server.exchange; import org.apache.qpid.AMQException; -import org.apache.qpid.framing.MessageTransferBody; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.util.TestApplicationRegistry; +import org.apache.qpid.server.util.NullApplicationRegistry; +import org.apache.qpid.framing.MessageTransferBody; public class HeadersExchangeTest extends AbstractHeadersExchangeTestBase { protected void setUp() throws Exception { super.setUp(); - ApplicationRegistry.initialise(new TestApplicationRegistry()); + ApplicationRegistry.initialise(new NullApplicationRegistry()); } public void testSimple() throws AMQException @@ -54,14 +55,16 @@ public class HeadersExchangeTest extends AbstractHeadersExchangeTestBase Message m7 = new Message("Message7", "XXXXX"); - MessageTransferBody tb7 = m7.getTransferBody(); - tb7.mandatory = true; + MessageTransferBody mtb7 = m7.getTransferBody(); + mtb7.mandatory = true; routeAndTest(m7,true); Message m8 = new Message("Message8", "F0000"); - MessageTransferBody tb8 = m8.getTransferBody(); - tb8.mandatory = true; + MessageTransferBody mtb8 = m8.getTransferBody(); + mtb8.mandatory = true; routeAndTest(m8,false,q1); + + } public void testAny() throws AMQException @@ -85,10 +88,10 @@ public class HeadersExchangeTest extends AbstractHeadersExchangeTestBase bindDefault("F0000"); Message m1 = new Message("Message1", "XXXXX"); Message m2 = new Message("Message2", "F0000"); - MessageTransferBody tb1 = m1.getTransferBody(); - tb1.mandatory = true; - MessageTransferBody tb2 = m2.getTransferBody(); - tb2.mandatory = true; + MessageTransferBody mtb1 = m1.getTransferBody(); + mtb1.mandatory = true; + MessageTransferBody mtb2 = m2.getTransferBody(); + mtb2.mandatory = true; routeAndTest(m1,true); } diff --git a/java/systests/src/test/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java b/java/systests/src/main/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java index 546c61eda0..c8271f1549 100644 --- a/java/systests/src/test/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java +++ b/java/systests/src/main/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java @@ -4,6 +4,7 @@ import junit.framework.TestCase; import org.apache.log4j.Logger;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.util.TestApplicationRegistry;
+import org.apache.qpid.server.util.NullApplicationRegistry;
import org.apache.qpid.client.*;
import org.apache.qpid.client.transport.TransportConnection;
import org.apache.qpid.url.AMQBindingURL;
@@ -38,7 +39,7 @@ public class ReturnUnroutableMandatoryMessageTest extends TestCase implements Ex {
super.setUp();
TransportConnection.createVMBroker(1);
- ApplicationRegistry.initialise(new TestApplicationRegistry(), 1);
+ ApplicationRegistry.initialise(new NullApplicationRegistry(), 1);
}
protected void tearDown() throws Exception
diff --git a/java/systests/src/test/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java b/java/systests/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java index f84f14f28d..3e23b9ae42 100644 --- a/java/systests/src/test/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java +++ b/java/systests/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java @@ -21,17 +21,21 @@ import junit.framework.TestCase; import org.apache.mina.common.IoSession; import org.apache.qpid.codec.AMQCodecFactory; import org.apache.qpid.server.AMQChannel; +import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.server.registry.IApplicationRegistry; +import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.exchange.DefaultExchangeFactory; import org.apache.qpid.server.exchange.DefaultExchangeRegistry; import org.apache.qpid.server.exchange.ExchangeRegistry; import org.apache.qpid.server.queue.DefaultQueueRegistry; import org.apache.qpid.server.queue.QueueRegistry; +import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.SkeletonMessageStore; import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQShortString; import javax.management.JMException; -import javax.management.openmbean.OpenDataException; /** * Test class to test MBean operations for AMQMinaProtocolSession. @@ -45,23 +49,28 @@ public class AMQProtocolSessionMBeanTest extends TestCase private QueueRegistry _queueRegistry; private ExchangeRegistry _exchangeRegistry; private AMQProtocolSessionMBean _mbean; + private VirtualHost _virtualHost; public void testChannels() throws Exception { // check the channel count is correct int channelCount = _mbean.channels().size(); - assertTrue(channelCount == 2); - _protocolSession.addChannel(new AMQChannel(2,_protocolSession, _messageStore, null,null)); + assertTrue(channelCount == 1); + AMQQueue queue = new org.apache.qpid.server.queue.AMQQueue(new AMQShortString("testQueue_" + System.currentTimeMillis()), + false, new AMQShortString("test"), true, _virtualHost); + AMQChannel channel = new AMQChannel(2, _protocolSession, _virtualHost.getMessageStore(), _virtualHost.getExchangeRegistry(), null/*methodListener*/); + channel.setDefaultQueue(queue); + _protocolSession.addChannel(channel); channelCount = _mbean.channels().size(); - assertTrue(channelCount == 3); + assertTrue(channelCount == 2); // general properties test _mbean.setMaximumNumberOfChannels(1000L); assertTrue(_mbean.getMaximumNumberOfChannels() == 1000L); // check APIs - AMQChannel channel3 = new AMQChannel(3,_protocolSession, _messageStore, null,null); - channel3.setTransactional(true); + AMQChannel channel3 = new AMQChannel(3, _protocolSession, _virtualHost.getMessageStore(), _virtualHost.getExchangeRegistry(), null/*methodListener*/); + channel3.setLocalTransactional(); _protocolSession.addChannel(channel3); _mbean.rollbackTransactions(2); _mbean.rollbackTransactions(3); @@ -80,31 +89,33 @@ public class AMQProtocolSessionMBeanTest extends TestCase } // check if closing of session works - _protocolSession.addChannel(new AMQChannel(5,_protocolSession, _messageStore, null,null)); + _protocolSession.addChannel(new AMQChannel(5, _protocolSession, _virtualHost.getMessageStore(), _virtualHost.getExchangeRegistry(), null/*methodListener*/)); _mbean.closeConnection(); - channelCount = _mbean.channels().size(); - assertTrue(channelCount == 0); - try - { +// try +// { + channelCount = _mbean.channels().size(); + assertTrue(channelCount == 0); // session is now closed so adding another channel should throw an exception - _protocolSession.addChannel(new AMQChannel(6,_protocolSession, _messageStore, null,null)); + _protocolSession.addChannel(new AMQChannel(6, _protocolSession, _virtualHost.getMessageStore(), _virtualHost.getExchangeRegistry(), null/*methodListener*/)); fail(); - } - catch(IllegalStateException ex) - { - System.out.println("expected exception is thrown :" + ex.getMessage()); - } +// } +// catch(AMQException ex) +// { +// System.out.println("expected exception is thrown :" + ex.getMessage()); +// } } @Override protected void setUp() throws Exception { - super.setUp(); - _queueRegistry = new DefaultQueueRegistry(); - _exchangeRegistry = new DefaultExchangeRegistry(new DefaultExchangeFactory()); + super.setUp(); + _channel = new AMQChannel(1, _protocolSession, _virtualHost.getMessageStore(), _virtualHost.getExchangeRegistry(), null/*methodListener*/); + IApplicationRegistry appRegistry = ApplicationRegistry.getInstance(); + _virtualHost = appRegistry.getVirtualHostRegistry().getVirtualHost("test"); + _queueRegistry = _virtualHost.getQueueRegistry(); + _exchangeRegistry = _virtualHost.getExchangeRegistry(); _mockIOSession = new MockIoSession(); - _protocolSession = new AMQMinaProtocolSession(_mockIOSession, _queueRegistry, _exchangeRegistry, new AMQCodecFactory(true)); - _channel = new AMQChannel(1,_protocolSession, _messageStore, null,null); + _protocolSession = new AMQMinaProtocolSession(_mockIOSession, appRegistry.getVirtualHostRegistry(), new AMQCodecFactory(true)); _protocolSession.addChannel(_channel); _mbean = (AMQProtocolSessionMBean)_protocolSession.getManagedObject(); } diff --git a/java/systests/src/test/java/org/apache/qpid/server/protocol/MockIoSession.java b/java/systests/src/main/java/org/apache/qpid/server/protocol/MockIoSession.java index cf6366b513..cf6366b513 100644 --- a/java/systests/src/test/java/org/apache/qpid/server/protocol/MockIoSession.java +++ b/java/systests/src/main/java/org/apache/qpid/server/protocol/MockIoSession.java diff --git a/java/systests/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java b/java/systests/src/main/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java index ae398961be..92ba9db728 100644 --- a/java/systests/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java +++ b/java/systests/src/main/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java @@ -17,20 +17,24 @@ */ package org.apache.qpid.server.queue; -import java.util.Collections; - -import javax.management.JMException; - import junit.framework.TestCase; - -import org.apache.mina.common.ByteBuffer; import org.apache.qpid.AMQException; -import org.apache.qpid.client.message.MessageHeaders; -import org.apache.qpid.framing.Content; import org.apache.qpid.framing.MessageTransferBody; +import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.server.AMQChannel; +import org.apache.qpid.server.RequiredDeliveryException; +import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.server.registry.IApplicationRegistry; +import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.server.txn.TransactionalContext; +import org.apache.qpid.server.txn.NonTransactionalContext; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.SkeletonMessageStore; +import org.apache.qpid.server.store.StoreContext; + +import javax.management.JMException; +import java.util.LinkedList; +import java.util.HashSet; /** * Test class to test AMQQueueMBean attribtues and operations @@ -41,8 +45,14 @@ public class AMQQueueMBeanTest extends TestCase private AMQQueueMBean _queueMBean; private QueueRegistry _queueRegistry; private MessageStore _messageStore = new SkeletonMessageStore(); + private StoreContext _storeContext = new StoreContext(); + private TransactionalContext _transactionalContext = new NonTransactionalContext(_messageStore, _storeContext, + null, + new LinkedList<RequiredDeliveryException>(), + new HashSet<Long>()); private MockProtocolSession _protocolSession; private AMQChannel _channel; + private VirtualHost _virtualHost; public void testMessageCount() throws Exception { @@ -50,8 +60,7 @@ public class AMQQueueMBeanTest extends TestCase sendMessages(messageCount); assertTrue(_queueMBean.getMessageCount() == messageCount); assertTrue(_queueMBean.getReceivedMessageCount() == messageCount); - // each message is 1K - assertTrue(_queueMBean.getQueueDepth() == messageCount); + assertTrue(_queueMBean.getQueueDepth() == 10); _queueMBean.deleteMessageFromTop(); assertTrue(_queueMBean.getMessageCount() == messageCount - 1); @@ -67,12 +76,12 @@ public class AMQQueueMBeanTest extends TestCase SubscriptionManager mgr = _queue.getSubscribers(); assertFalse(mgr.hasActiveSubscribers()); assertTrue(_queueMBean.getActiveConsumerCount() == 0); - - _protocolSession = new MockProtocolSession(_messageStore); - _channel = new AMQChannel(1,_protocolSession, _messageStore, null,null); + + _channel = new AMQChannel(1, _protocolSession, _virtualHost.getMessageStore(), _virtualHost.getExchangeRegistry(), null/*methodListener*/); + _protocolSession = new MockProtocolSession(_messageStore); _protocolSession.addChannel(_channel); - - _queue.registerProtocolSession(_protocolSession, 1, "test", false, null, false, false); + + _queue.registerProtocolSession(_protocolSession, 1, new AMQShortString("test"), false, null,false,false); assertTrue(_queueMBean.getActiveConsumerCount() == 1); SubscriptionSet _subscribers = (SubscriptionSet) mgr; @@ -138,8 +147,9 @@ public class AMQQueueMBeanTest extends TestCase AMQMessage msg = message(false); long id = msg.getMessageId(); - _queue.clearQueue(); - _queue.deliver(msg); + _queue.clearQueue(_storeContext); + _queue.process(_storeContext, msg); +// msg.routingComplete(_messageStore, _storeContext, new MessageHandleFactory()); _queueMBean.viewMessageContent(id); try { @@ -156,46 +166,46 @@ public class AMQQueueMBeanTest extends TestCase { // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9) // TODO: Establish some way to determine the version for the test. - - MessageHeaders messageHeaders = new MessageHeaders(); - - ByteBuffer buffer = ByteBuffer.wrap(new byte[1000]); - Content body = new Content(Content.TypeEnum.INLINE_T, buffer); - MessageTransferBody methodBody = MessageTransferBody.createMethodBody( - (byte)0, (byte)9, // AMQP version (major, minor) - messageHeaders.getAppId(), // String appId - messageHeaders.getJMSHeaders(), // FieldTable applicationHeaders - body, // Content body - messageHeaders.getEncoding(), // String contentEncoding - messageHeaders.getContentType(), // String contentType - messageHeaders.getCorrelationId(), // String correlationId - (short)1, // short deliveryMode - "someExchange", // String destination - "someExchange", // String exchange - messageHeaders.getExpiration(), // long expiration - immediate, // boolean immediate - false, // boolean mandatory - "", // String messageId - (short)0, // short priority - false, // boolean redelivered - messageHeaders.getReplyTo(), // String replyTo - "rk", // String routingKey - new String("abc123").getBytes(), // byte[] securityToken - 0, // int ticket - messageHeaders.getTimestamp(), // long timestamp - messageHeaders.getTransactionId(), // String transactionId - 0, // long ttl - messageHeaders.getUserId()); // String userId - - return new AMQMessage(_messageStore, methodBody, Collections.singletonList(buffer)); + MessageTransferBody publish = new MessageTransferBody( + (byte)0, + (byte)9, + MessageTransferBody.getClazz((byte)0,(byte)9), + MessageTransferBody.getMethod((byte)0,(byte)9), + null, // AMQShortString appId + null, // FieldTable applicationHeaders + null, // Content body + null, // AMQShortString contentEncoding + null, // AMQShortString contentType + null, // AMQShortString correlationId + (short)0, // short deliveryMode + null, // AMQShortString destination + null, // AMQShortString exchange + 0L, // long expiration + immediate, // boolean immediate + false, // boolean mandatory + null, // AMQShortString messageId + (short)0, // short priority + false, // boolean redelivered + null, // AMQShortString replyTo + null, // AMQShortString routingKey + null, // byte[] securityToken + 0, // int ticket + 0L, // long timestamp + null, // AMQShortString transactionId + 0L, // long ttl + null); // AMQShortString userId + + return new AMQMessage(_messageStore, publish, _transactionalContext); } @Override protected void setUp() throws Exception { super.setUp(); - _queueRegistry = new DefaultQueueRegistry(); - _queue = new AMQQueue("testQueue", false, "AMQueueMBeanTest", false, _queueRegistry); + IApplicationRegistry applicationRegistry = ApplicationRegistry.getInstance(); + _virtualHost = applicationRegistry.getVirtualHostRegistry().getVirtualHost("test"); + _queueRegistry = _virtualHost.getQueueRegistry(); + _queue = new AMQQueue(new AMQShortString("testQueue"), false, new AMQShortString("AMQueueMBeanTest"), false, _virtualHost); _queueMBean = new AMQQueueMBean(_queue); } @@ -205,11 +215,15 @@ public class AMQQueueMBeanTest extends TestCase for (int i = 0; i < messages.length; i++) { messages[i] = message(false); - ; } for (int i = 0; i < messageCount; i++) { - _queue.deliver(messages[i]); + _queue.process(_storeContext, messages[i]); + } + + for (int i = 0; i < messages.length; i++) + { +// messages[i].routingComplete(_messageStore, _storeContext, new MessageHandleFactory()); } } } diff --git a/java/systests/src/test/java/org/apache/qpid/server/queue/AckTest.java b/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java index ca777ca535..a812257203 100644 --- a/java/systests/src/test/java/org/apache/qpid/server/queue/AckTest.java +++ b/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -20,28 +20,34 @@ */ package org.apache.qpid.server.queue; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.Map; - import junit.framework.TestCase; - import org.apache.log4j.Logger; import org.apache.qpid.AMQException; -import org.apache.qpid.client.message.MessageHeaders; -import org.apache.qpid.framing.Content; import org.apache.qpid.framing.MessageTransferBody; +import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.server.AMQChannel; +import org.apache.qpid.server.RequiredDeliveryException; import org.apache.qpid.server.ack.UnacknowledgedMessage; +import org.apache.qpid.server.ack.UnacknowledgedMessageMap; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.store.TestableMemoryMessageStore; +import org.apache.qpid.server.store.StoreContext; +import org.apache.qpid.server.txn.NonTransactionalContext; +import org.apache.qpid.server.txn.TransactionalContext; import org.apache.qpid.server.util.TestApplicationRegistry; +import org.apache.qpid.server.util.NullApplicationRegistry; + +import java.util.LinkedList; +import java.util.Set; +import java.util.HashSet; /** * Tests that acknowledgements are handled correctly. */ public class AckTest extends TestCase { + private TransactionalContext txnContext = null; + private static final Logger _log = Logger.getLogger(AckTest.class); private SubscriptionImpl _subscription; @@ -50,26 +56,30 @@ public class AckTest extends TestCase private TestableMemoryMessageStore _messageStore; + private StoreContext _storeContext = new StoreContext(); + private AMQChannel _channel; private SubscriptionSet _subscriptionManager; private AMQQueue _queue; + private static final AMQShortString DEFAULT_CONSUMER_TAG = new AMQShortString("conTag"); + public AckTest() throws Exception { - ApplicationRegistry.initialise(new TestApplicationRegistry()); + ApplicationRegistry.initialise(new NullApplicationRegistry()); } protected void setUp() throws Exception { super.setUp(); - _messageStore = new TestableMemoryMessageStore(); + _messageStore = new TestableMemoryMessageStore(); _protocolSession = new MockProtocolSession(_messageStore); - _channel = new AMQChannel(5,_protocolSession, _messageStore, null/*dont need exchange registry*/,null); + _channel = new AMQChannel(5, _protocolSession, null/*MessageStore*/, null/*ExchangeRegistry*/, null/*methodListener*/); _protocolSession.addChannel(_channel); - _subscriptionManager = new SubscriptionSet(); - _queue = new AMQQueue("myQ", false, "guest", true, new DefaultQueueRegistry(), _subscriptionManager); + _subscriptionManager = new SubscriptionSet(); + _queue = new AMQQueue(new AMQShortString("myQ"), false, new AMQShortString("guest"), true, ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost("test"), _subscriptionManager); } private void publishMessages(int count) throws AMQException @@ -79,41 +89,54 @@ public class AckTest extends TestCase private void publishMessages(int count, boolean persistent) throws AMQException { + TransactionalContext txnContext = new NonTransactionalContext(_messageStore, _storeContext, null, + new LinkedList<RequiredDeliveryException>(), + new HashSet<Long>()); + MessageHandleFactory factory = new MessageHandleFactory(); for (int i = 1; i <= count; i++) { // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9) // TODO: Establish some way to determine the version for the test. - //AMQMessage(MessageStore store, MessageTransferBody transferBody, List<ByteBuffer> contents) - - MessageHeaders messageHeaders = new MessageHeaders(); - - MessageTransferBody methodBody = MessageTransferBody.createMethodBody( - (byte)0, (byte)9, // AMQP version (major, minor) - messageHeaders.getAppId(), // String appId - messageHeaders.getJMSHeaders(), // FieldTable applicationHeaders - new Content(), // Content body - messageHeaders.getEncoding(), // String contentEncoding - messageHeaders.getContentType(), // String contentType - messageHeaders.getCorrelationId(), // String correlationId - persistent ? (short)2: (short)1, // short deliveryMode - "someExchange", // String destination - "someExchange", // String exchange - messageHeaders.getExpiration(), // long expiration - false, // boolean immediate - false, // boolean mandatory - "" + i, // String messageId - (short)0, // short priority - false, // boolean redelivered - messageHeaders.getReplyTo(), // String replyTo - "rk", // String routingKey - new String("abc123").getBytes(), // byte[] securityToken - 0, // int ticket - messageHeaders.getTimestamp(), // long timestamp - messageHeaders.getTransactionId(), // String transactionId - 0, // long ttl - messageHeaders.getUserId()); // String userId - - AMQMessage msg = new AMQMessage(_messageStore, methodBody, new ArrayList()); + MessageTransferBody publishBody = new MessageTransferBody( + (byte)0, + (byte)9, + MessageTransferBody.getClazz((byte)0,(byte)9), + MessageTransferBody.getMethod((byte)0,(byte)9), + null, // AMQShortString appId + null, // FieldTable applicationHeaders + null, // Content body + null, // AMQShortString contentEncoding + null, // AMQShortString contentType + null, // AMQShortString correlationId + (short)0, // short deliveryMode + null, // AMQShortString destination + new AMQShortString("someExchange"), // AMQShortString exchange + 0L, // long expiration + false, // boolean immediate + false, // boolean mandatory + null, // AMQShortString messageId + (short)0, // short priority + false, // boolean redelivered + null, // AMQShortString replyTo + new AMQShortString("rk"), // AMQShortString routingKey + null, // byte[] securityToken + 0, // int ticket + 0L, // long timestamp + null, // AMQShortString transactionId + 0L, // long ttl + null); // AMQShortString userId + AMQMessage msg = new AMQMessage(_messageStore, publishBody, txnContext); + if (persistent) + { + //This is DeliveryMode.PERSISTENT + msg.setDeliveryMode((byte) 2); + } + // we increment the reference here since we are not delivering the messaging to any queues, which is where + // the reference is normally incremented. The test is easier to construct if we have direct access to the + // subscription + msg.incrementReference(); +// msg.routingComplete(_messageStore, _storeContext, factory); + // we manually send the message to the subscription _subscription.send(msg, _queue); } } @@ -124,25 +147,26 @@ public class AckTest extends TestCase */ public void testAckChannelAssociationTest() throws AMQException { - _subscription = new SubscriptionImpl(5, _protocolSession, "conTag", true); + _subscription = new SubscriptionImpl(5, _protocolSession, DEFAULT_CONSUMER_TAG, true); final int msgCount = 10; publishMessages(msgCount, true); - Map<Long, UnacknowledgedMessage> map = _channel.getUnacknowledgedMessageMap(); + UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap(); assertTrue(map.size() == msgCount); - assertTrue(_messageStore.getMessageMap().size() == msgCount); + assertTrue(_messageStore.getMessageMetaDataMap().size() == msgCount); - Iterator<Map.Entry<Long, UnacknowledgedMessage>> it = map.entrySet().iterator(); - for (int i = 1; i <= map.size(); i++) + Set<Long> deliveryTagSet = map.getDeliveryTags(); + int i = 1; + for (long deliveryTag : deliveryTagSet) { - Map.Entry<Long, UnacknowledgedMessage> entry = it.next(); - assertTrue(entry.getKey() == i); - UnacknowledgedMessage unackedMsg = entry.getValue(); + assertTrue(deliveryTag == i); + i++; + UnacknowledgedMessage unackedMsg = map.get(deliveryTag); assertTrue(unackedMsg.queue == _queue); } assertTrue(map.size() == msgCount); - assertTrue(_messageStore.getMessageMap().size() == msgCount); + assertTrue(_messageStore.getMessageMetaDataMap().size() == msgCount); } /** @@ -151,13 +175,13 @@ public class AckTest extends TestCase public void testNoAckMode() throws AMQException { // false arg means no acks expected - _subscription = new SubscriptionImpl(5, _protocolSession, "conTag", false); + _subscription = new SubscriptionImpl(5, _protocolSession, DEFAULT_CONSUMER_TAG, false); final int msgCount = 10; publishMessages(msgCount); - Map<Long, UnacknowledgedMessage> map = _channel.getUnacknowledgedMessageMap(); + UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap(); assertTrue(map.size() == 0); - assertTrue(_messageStore.getMessageMap().size() == 0); + assertTrue(_messageStore.getMessageMetaDataMap().size() == 0); } /** @@ -166,21 +190,20 @@ public class AckTest extends TestCase */ public void testSingleAckReceivedTest() throws AMQException { - _subscription = new SubscriptionImpl(5, _protocolSession, "conTag", true); + _subscription = new SubscriptionImpl(5, _protocolSession, DEFAULT_CONSUMER_TAG, true); final int msgCount = 10; publishMessages(msgCount); _channel.acknowledgeMessage(5, false); - Map<Long, UnacknowledgedMessage> map = _channel.getUnacknowledgedMessageMap(); + UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap(); assertTrue(map.size() == msgCount - 1); - Iterator<Map.Entry<Long, UnacknowledgedMessage>> it = map.entrySet().iterator(); + Set<Long> deliveryTagSet = map.getDeliveryTags(); int i = 1; - while (i <= map.size()) + for (long deliveryTag : deliveryTagSet) { - Map.Entry<Long, UnacknowledgedMessage> entry = it.next(); - assertTrue(entry.getKey() == i); - UnacknowledgedMessage unackedMsg = entry.getValue(); + assertTrue(deliveryTag == i); + UnacknowledgedMessage unackedMsg = map.get(deliveryTag); assertTrue(unackedMsg.queue == _queue); // 5 is the delivery tag of the message that *should* be removed if (++i == 5) @@ -196,21 +219,20 @@ public class AckTest extends TestCase */ public void testMultiAckReceivedTest() throws AMQException { - _subscription = new SubscriptionImpl(5, _protocolSession, "conTag", true); + _subscription = new SubscriptionImpl(5, _protocolSession, DEFAULT_CONSUMER_TAG, true); final int msgCount = 10; publishMessages(msgCount); _channel.acknowledgeMessage(5, true); - Map<Long, UnacknowledgedMessage> map = _channel.getUnacknowledgedMessageMap(); + UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap(); assertTrue(map.size() == 5); - Iterator<Map.Entry<Long, UnacknowledgedMessage>> it = map.entrySet().iterator(); + Set<Long> deliveryTagSet = map.getDeliveryTags(); int i = 1; - while (i <= map.size()) + for (long deliveryTag : deliveryTagSet) { - Map.Entry<Long, UnacknowledgedMessage> entry = it.next(); - assertTrue(entry.getKey() == i + 5); - UnacknowledgedMessage unackedMsg = entry.getValue(); + assertTrue(deliveryTag == i + 5); + UnacknowledgedMessage unackedMsg = map.get(deliveryTag); assertTrue(unackedMsg.queue == _queue); ++i; } @@ -221,21 +243,20 @@ public class AckTest extends TestCase */ public void testMultiAckAllReceivedTest() throws AMQException { - _subscription = new SubscriptionImpl(5, _protocolSession, "conTag", true); + _subscription = new SubscriptionImpl(5, _protocolSession, DEFAULT_CONSUMER_TAG, true); final int msgCount = 10; publishMessages(msgCount); _channel.acknowledgeMessage(0, true); - Map<Long, UnacknowledgedMessage> map = _channel.getUnacknowledgedMessageMap(); + UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap(); assertTrue(map.size() == 0); - Iterator<Map.Entry<Long, UnacknowledgedMessage>> it = map.entrySet().iterator(); + Set<Long> deliveryTagSet = map.getDeliveryTags(); int i = 1; - while (i <= map.size()) + for (long deliveryTag : deliveryTagSet) { - Map.Entry<Long, UnacknowledgedMessage> entry = it.next(); - assertTrue(entry.getKey() == i + 5); - UnacknowledgedMessage unackedMsg = entry.getValue(); + assertTrue(deliveryTag == i + 5); + UnacknowledgedMessage unackedMsg = map.get(deliveryTag); assertTrue(unackedMsg.queue == _queue); ++i; } @@ -246,7 +267,7 @@ public class AckTest extends TestCase int lowMark = 5; int highMark = 10; - _subscription = new SubscriptionImpl(5, _protocolSession, "conTag", true); + _subscription = new SubscriptionImpl(5, _protocolSession, DEFAULT_CONSUMER_TAG, true); _channel.setPrefetchLowMarkCount(lowMark); _channel.setPrefetchHighMarkCount(highMark); @@ -259,7 +280,7 @@ public class AckTest extends TestCase // which have not bee received so will be queued up in the channel // which should be suspended assertTrue(_subscription.isSuspended()); - Map<Long, UnacknowledgedMessage> map = _channel.getUnacknowledgedMessageMap(); + UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap(); assertTrue(map.size() == highMark); //acknowledge messages so we are just above lowMark @@ -297,7 +318,7 @@ public class AckTest extends TestCase public void testPrefetch() throws AMQException { - _subscription = new SubscriptionImpl(5, _protocolSession, "conTag", true); + _subscription = new SubscriptionImpl(5, _protocolSession, DEFAULT_CONSUMER_TAG, true); _channel.setPrefetchCount(5); assertTrue(_channel.getPrefetchCount() == 5); @@ -308,7 +329,7 @@ public class AckTest extends TestCase // at this point we should have sent out only 5 messages with a further 5 queued // up in the channel which should now be suspended assertTrue(_subscription.isSuspended()); - Map<Long, UnacknowledgedMessage> map = _channel.getUnacknowledgedMessageMap(); + UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap(); assertTrue(map.size() == 5); _channel.acknowledgeMessage(5, true); assertTrue(!_subscription.isSuspended()); diff --git a/java/systests/src/test/java/org/apache/qpid/server/queue/ConcurrencyTest.java b/java/systests/src/main/java/org/apache/qpid/server/queue/ConcurrencyTest.java index fe8960c872..6f3d42d090 100644 --- a/java/systests/src/test/java/org/apache/qpid/server/queue/ConcurrencyTest.java +++ b/java/systests/src/main/java/org/apache/qpid/server/queue/ConcurrencyTest.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -21,7 +21,11 @@ package org.apache.qpid.server.queue; import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.server.handler.OnCurrentThreadExecutor; +import org.apache.qpid.server.registry.IApplicationRegistry; +import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.server.virtualhost.VirtualHost; import java.util.*; import java.util.concurrent.Executor; @@ -49,11 +53,15 @@ public class ConcurrencyTest extends MessageTestHelper private boolean isComplete; private boolean failed; + private VirtualHost _virtualHost; public ConcurrencyTest() throws Exception { - _deliveryMgr = new ConcurrentDeliveryManager(_subscriptionMgr, new AMQQueue("myQ", false, "guest", false, - new DefaultQueueRegistry())); + + IApplicationRegistry applicationRegistry = ApplicationRegistry.getInstance(); + _virtualHost = applicationRegistry.getVirtualHostRegistry().getVirtualHost("test"); + _deliveryMgr = new ConcurrentSelectorDeliveryManager(_subscriptionMgr, new AMQQueue(new AMQShortString("myQ"), false, new AMQShortString("guest"), false, + _virtualHost)); } public void testConcurrent1() throws InterruptedException, AMQException @@ -186,7 +194,7 @@ public class ConcurrencyTest extends MessageTestHelper AMQMessage msg = nextMessage(); if (msg != null) { - _deliveryMgr.deliver(toString(), msg); + _deliveryMgr.deliver(null, new AMQShortString(toString()), msg); } } } diff --git a/java/systests/src/test/java/org/apache/qpid/server/queue/DeliveryManagerTest.java b/java/systests/src/main/java/org/apache/qpid/server/queue/DeliveryManagerTest.java index 3631264e5a..e1be640c8e 100644 --- a/java/systests/src/test/java/org/apache/qpid/server/queue/DeliveryManagerTest.java +++ b/java/systests/src/main/java/org/apache/qpid/server/queue/DeliveryManagerTest.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -21,7 +21,9 @@ package org.apache.qpid.server.queue; import org.apache.qpid.server.handler.OnCurrentThreadExecutor; +import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQShortString; import junit.framework.TestSuite; @@ -29,6 +31,8 @@ abstract public class DeliveryManagerTest extends MessageTestHelper { protected final SubscriptionSet _subscriptions = new SubscriptionSet(); protected DeliveryManager _mgr; + protected StoreContext _storeContext = new StoreContext(); + private static final AMQShortString DEFAULT_QUEUE_NAME = new AMQShortString("Me"); public DeliveryManagerTest() throws Exception { @@ -45,7 +49,7 @@ abstract public class DeliveryManagerTest extends MessageTestHelper for (int i = 0; i < batch; i++) { - _mgr.deliver("Me", messages[i]); + _mgr.deliver(_storeContext, DEFAULT_QUEUE_NAME, messages[i]); } SubscriptionTestHelper s1 = new SubscriptionTestHelper("1"); @@ -55,7 +59,7 @@ abstract public class DeliveryManagerTest extends MessageTestHelper for (int i = batch; i < messages.length; i++) { - _mgr.deliver("Me", messages[i]); + _mgr.deliver(_storeContext, DEFAULT_QUEUE_NAME, messages[i]); } assertTrue(s1.getMessages().isEmpty()); @@ -93,7 +97,7 @@ abstract public class DeliveryManagerTest extends MessageTestHelper for (int i = 0; i < batch; i++) { - _mgr.deliver("Me", messages[i]); + _mgr.deliver(_storeContext, DEFAULT_QUEUE_NAME, messages[i]); } assertEquals(batch, s1.getMessages().size()); @@ -107,7 +111,7 @@ abstract public class DeliveryManagerTest extends MessageTestHelper s1.setSuspended(true); for (int i = batch; i < messages.length; i++) { - _mgr.deliver("Me", messages[i]); + _mgr.deliver(_storeContext, DEFAULT_QUEUE_NAME, messages[i]); } _mgr.processAsync(new OnCurrentThreadExecutor()); @@ -129,7 +133,7 @@ abstract public class DeliveryManagerTest extends MessageTestHelper try { AMQMessage msg = message(true); - _mgr.deliver("Me", msg); + _mgr.deliver(_storeContext, DEFAULT_QUEUE_NAME, msg); msg.checkDeliveredToConsumer(); fail("expected exception did not occur"); } @@ -151,7 +155,7 @@ abstract public class DeliveryManagerTest extends MessageTestHelper _subscriptions.addSubscriber(s); s.setSuspended(true); AMQMessage msg = message(true); - _mgr.deliver("Me", msg); + _mgr.deliver(_storeContext, DEFAULT_QUEUE_NAME, msg); msg.checkDeliveredToConsumer(); fail("expected exception did not occur"); } @@ -168,8 +172,6 @@ abstract public class DeliveryManagerTest extends MessageTestHelper public static junit.framework.Test suite() { TestSuite suite = new TestSuite(); - suite.addTestSuite(ConcurrentDeliveryManagerTest.class); - suite.addTestSuite(SynchronizedDeliveryManagerTest.class); return suite; } } diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/MessageTestHelper.java b/java/systests/src/main/java/org/apache/qpid/server/queue/MessageTestHelper.java new file mode 100644 index 0000000000..78f8223733 --- /dev/null +++ b/java/systests/src/main/java/org/apache/qpid/server/queue/MessageTestHelper.java @@ -0,0 +1,95 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.queue; + +import org.apache.qpid.framing.MessageTransferBody; +import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.SkeletonMessageStore; +import org.apache.qpid.server.store.StoreContext; +import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.server.util.TestApplicationRegistry; +import org.apache.qpid.server.txn.TransactionalContext; +import org.apache.qpid.server.txn.NonTransactionalContext; +import org.apache.qpid.server.RequiredDeliveryException; +import org.apache.qpid.AMQException; + +import junit.framework.TestCase; + +import java.util.LinkedList; +import java.util.HashSet; + +class MessageTestHelper extends TestCase +{ + private final MessageStore _messageStore = new SkeletonMessageStore(); + + private final StoreContext _storeContext = new StoreContext(); + + private final TransactionalContext _txnContext = new NonTransactionalContext(_messageStore, _storeContext, null, + new LinkedList<RequiredDeliveryException>(), + new HashSet<Long>()); + + MessageTestHelper() throws Exception + { + ApplicationRegistry.initialise(new TestApplicationRegistry()); + } + + AMQMessage message() throws AMQException + { + return message(false); + } + + AMQMessage message(boolean immediate) throws AMQException + { + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Establish some way to determine the version for the test. + MessageTransferBody publish = new MessageTransferBody( + (byte)0, + (byte)9, + MessageTransferBody.getClazz((byte)0,(byte)9), + MessageTransferBody.getMethod((byte)0,(byte)9), + null, // AMQShortString appId + null, // FieldTable applicationHeaders + null, // Content body + null, // AMQShortString contentEncoding + null, // AMQShortString contentType + null, // AMQShortString correlationId + (short)0, // short deliveryMode + null, // AMQShortString destination + null, // AMQShortString exchange + 0L, // long expiration + immediate, // boolean immediate + false, // boolean mandatory + null, // AMQShortString messageId + (short)0, // short priority + false, // boolean redelivered + null, // AMQShortString replyTo + null, // AMQShortString routingKey + null, // byte[] securityToken + 0, // int ticket + 0L, // long timestamp + null, // AMQShortString transactionId + 0L, // long ttl + null); // AMQShortString userId + + return new AMQMessage(_messageStore, publish, _txnContext); + } + +} diff --git a/java/systests/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java b/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java index 400b21c5b1..ca357976b2 100644 --- a/java/systests/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java +++ b/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java @@ -24,10 +24,12 @@ import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQDataBlock; import org.apache.qpid.framing.AMQMethodBody; import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.VersionSpecificRegistry; import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.protocol.AMQMethodListener; import org.apache.qpid.server.AMQChannel; -import org.apache.qpid.server.exchange.ExchangeRegistry; +import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.state.AMQStateManager; import org.apache.qpid.server.store.MessageStore; @@ -35,7 +37,6 @@ import org.apache.qpid.server.store.MessageStore; import javax.security.sasl.SaslServer; import java.util.HashMap; import java.util.Map; -import java.util.concurrent.atomic.AtomicInteger; /** * A protocol session that can be used for testing purposes. @@ -46,13 +47,8 @@ public class MockProtocolSession implements AMQProtocolSession private Map<Integer, AMQChannel> _channelMap = new HashMap<Integer, AMQChannel>(); - // Keeps a tally of connections for logging and debugging - private static AtomicInteger _ConnectionId; - static { _ConnectionId = new AtomicInteger(0); } - public MockProtocolSession(MessageStore messageStore) { - _ConnectionId.incrementAndGet(); _messageStore = messageStore; } @@ -64,12 +60,12 @@ public class MockProtocolSession implements AMQProtocolSession { } - public String getContextKey() + public AMQShortString getContextKey() { return null; } - public void setContextKey(String contextKey) + public void setContextKey(AMQShortString contextKey) { } @@ -142,110 +138,116 @@ public class MockProtocolSession implements AMQProtocolSession public void setClientProperties(FieldTable clientProperties) { } - public Object getClientIdentifier() { return null; } - public void closeChannelRequest(int channelId, int replyCode, String replyText) throws AMQException { - // TODO Auto-generated method stub - - } - - public void closeChannelResponse(int channelId, long requestId) throws AMQException { - // TODO Auto-generated method stub - - } - - public void closeSessionRequest(int replyCode, String replyText, int classId, int methodId) throws AMQException { - // TODO Auto-generated method stub - - } - - public void closeSessionRequest(int replyCode, String replyText) throws AMQException { - // TODO Auto-generated method stub - - } - - public void closeSessionResponse(long requestId) throws AMQException { - // TODO Auto-generated method stub - - } - - public void setFrameMax(long size) { - // TODO Auto-generated method stub - - } - - public long getFrameMax() { - // TODO Auto-generated method stub - return 0; - } - - public QueueRegistry getQueueRegistry() { - // TODO Auto-generated method stub - return null; - } - - public ExchangeRegistry getExchangeRegistry() { - // TODO Auto-generated method stub - return null; - } + public VirtualHost getVirtualHost() + { + return null; //To change body of implemented methods use File | Settings | File Templates. + } - public AMQStateManager getStateManager() { - // TODO Auto-generated method stub - return null; - } + public void setVirtualHost(VirtualHost virtualHost) + { + //To change body of implemented methods use File | Settings | File Templates. + } - public byte getMajor() { - // TODO Auto-generated method stub - return 0; - } + public void addSessionCloseTask(Task task) + { + //To change body of implemented methods use File | Settings | File Templates. + } - public byte getMinor() { - // TODO Auto-generated method stub - return 0; - } + public void removeSessionCloseTask(Task task) + { + //To change body of implemented methods use File | Settings | File Templates. + } - public boolean versionEquals(byte major, byte minor) { - // TODO Auto-generated method stub - return false; - } + public byte getProtocolMajorVersion() + { + return 0; //To change body of implemented methods use File | Settings | File Templates. + } - public void checkMethodBodyVersion(AMQMethodBody methodBody) { - // TODO Auto-generated method stub - - } + public byte getProtocolMinorVersion() + { + return 9; //To change body of implemented methods use File | Settings | File Templates. + } + + public boolean isProtocolVersionEqual(byte major, byte minor) + { + return major == 0 && minor == 9; + } + + public void checkMethodBodyVersion(AMQMethodBody methodBody) + { - public long writeRequest(int channelNum, AMQMethodBody methodBody, AMQMethodListener methodListener) { - // TODO Auto-generated method stub - return 0; - } + } - public void writeResponse(int channelNum, long requestId, AMQMethodBody methodBody) { - // TODO Auto-generated method stub - - } + public VersionSpecificRegistry getRegistry() + { + return null; //To change body of implemented methods use File | Settings | File Templates. + } + + public long getConnectionId() + { + return 0L; + } + + public AMQStateManager getStateManager() + { + return null; + } + + public long getFrameMax() + { + return 0L; + } + + public void setFrameMax(long size) + { + + } + + public void closeChannelRequest(int channelId, int replyCode, AMQShortString replyText) throws AMQException + { + + } + + public void closeChannelResponse(int channelId, long requestId) throws AMQException + { + + } + + public void closeSessionRequest(int replyCode, AMQShortString replyText, int classId, int methodId) throws AMQException + { + + } - public void writeResponse(AMQMethodEvent evt, AMQMethodBody response) { - // TODO Auto-generated method stub - - } + public void closeSessionRequest(int replyCode, AMQShortString replyText) throws AMQException + { + + } - public int getConnectionId() + public void closeSessionResponse(long requestId) throws AMQException { - return _ConnectionId.get(); + + } + + public long writeRequest(int channelNum, AMQMethodBody methodBody, + AMQMethodListener methodListener) + { + return 0L; } - public void addSessionCloseTask(Task task) + public void writeResponse(int channelNum, long requestId, AMQMethodBody methodBody) { - //To change body of implemented methods use File | Settings | File Templates. + } - public void removeSessionCloseTask(Task task) + public void writeResponse(AMQMethodEvent evt, AMQMethodBody response) { - //To change body of implemented methods use File | Settings | File Templates. + } + } diff --git a/java/systests/src/test/java/org/apache/qpid/server/queue/SubscriptionManagerTest.java b/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionManagerTest.java index d3ec3c11d4..d3ec3c11d4 100644 --- a/java/systests/src/test/java/org/apache/qpid/server/queue/SubscriptionManagerTest.java +++ b/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionManagerTest.java diff --git a/java/systests/src/test/java/org/apache/qpid/server/queue/SubscriptionSetTest.java b/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionSetTest.java index bcf54693d3..bcf54693d3 100644 --- a/java/systests/src/test/java/org/apache/qpid/server/queue/SubscriptionSetTest.java +++ b/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionSetTest.java diff --git a/java/systests/src/test/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java b/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java index fea3c93280..b3574ecba4 100644 --- a/java/systests/src/test/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java +++ b/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java @@ -67,6 +67,12 @@ public class SubscriptionTestHelper implements Subscription return isSuspended; } + public boolean wouldSuspend(AMQMessage msg) + { + return isSuspended; + } + + public void queueDeleted(AMQQueue queue) { } diff --git a/java/systests/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java b/java/systests/src/main/java/org/apache/qpid/server/store/SkeletonMessageStore.java index bc0a8a7d64..4468bfe13b 100644 --- a/java/systests/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java +++ b/java/systests/src/main/java/org/apache/qpid/server/store/SkeletonMessageStore.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -20,11 +20,12 @@ */ package org.apache.qpid.server.store; -import org.apache.qpid.server.queue.AMQMessage; -import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.queue.QueueRegistry; -import org.apache.qpid.AMQException; import org.apache.commons.configuration.Configuration; +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.MessageMetaData; +import org.apache.qpid.server.virtualhost.VirtualHost; import java.util.List; import java.util.concurrent.atomic.AtomicLong; @@ -40,63 +41,94 @@ public class SkeletonMessageStore implements MessageStore public void configure(String base, Configuration config) throws Exception { } - - public void configure(QueueRegistry queueRegistry, String base, Configuration config) throws Exception + + public void configure(VirtualHost virtualHost, String base, Configuration config) throws Exception { + //To change body of implemented methods use File | Settings | File Templates. } public void close() throws Exception { } - public void put(AMQMessage msg) + public void removeMessage(StoreContext s, Long messageId) { } - public void removeMessage(long messageId) + public void createQueue(AMQQueue queue) throws AMQException + { + } + + public void beginTran(StoreContext s) throws AMQException { } - public void createQueue(AMQQueue queue) throws AMQException + public boolean inTran(StoreContext sc) { + return false; } - public void removeQueue(String name) throws AMQException + public void commitTran(StoreContext storeContext) throws AMQException { } - public void enqueueMessage(String name, long messageId) throws AMQException + public void abortTran(StoreContext storeContext) throws AMQException { } - public void dequeueMessage(String name, long messageId) throws AMQException + public List<AMQQueue> createQueues() throws AMQException { + return null; } - public void beginTran() throws AMQException + public Long getNewMessageId() { + return _messageId.getAndIncrement(); } - public boolean inTran() + public void storeContentChunk(StoreContext sc, Long messageId, int index, byte[] content, boolean lastContentBody) throws AMQException { - return false; + } - - public void commitTran() throws AMQException + + public void storeMessageMetaData(StoreContext sc, Long messageId, MessageMetaData messageMetaData) throws AMQException { + } - public void abortTran() throws AMQException + public MessageMetaData getMessageMetaData(Long messageId) throws AMQException { + return null; } - public List<AMQQueue> createQueues() throws AMQException + public byte[] getContentChunk(Long messageId, int index) throws AMQException { return null; } - public long getNewMessageId() + public void removeQueue(AMQShortString name) throws AMQException { - return _messageId.getAndIncrement(); + + } + + public void enqueueMessage(StoreContext context, AMQShortString name, Long messageId) throws AMQException + { + + } + + public void dequeueMessage(StoreContext context, AMQShortString name, Long messageId) throws AMQException + { + + } + + public void storeContentBodyChunk(StoreContext context, Long messageId, int index, byte[] contentBody, boolean lastContentBody) throws AMQException + { + } + + public byte[] getContentBodyChunk(Long messageId, int index) throws AMQException + { + return new byte[0]; + } + } diff --git a/java/systests/src/main/java/org/apache/qpid/server/store/TestReferenceCounting.java b/java/systests/src/main/java/org/apache/qpid/server/store/TestReferenceCounting.java new file mode 100644 index 0000000000..5d60596c64 --- /dev/null +++ b/java/systests/src/main/java/org/apache/qpid/server/store/TestReferenceCounting.java @@ -0,0 +1,135 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store; + +import junit.framework.TestCase; +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.MessageTransferBody; +import org.apache.qpid.server.queue.AMQMessage; +import org.apache.qpid.server.queue.MessageHandleFactory; +import org.apache.qpid.server.txn.TransactionalContext; + +/** + * Tests that reference counting works correctly with AMQMessage and the message store + */ +public class TestReferenceCounting extends TestCase +{ + private TransactionalContext txnContext = null; + + private TestableMemoryMessageStore _store; + + private StoreContext _storeContext = new StoreContext(); + + protected void setUp() throws Exception + { + super.setUp(); + _store = new TestableMemoryMessageStore(); + } + + /** + * Check that when the reference count is decremented the message removes itself from the store + */ + public void testMessageGetsRemoved() throws AMQException + { + // TODO: fix hardcoded protocol version data + AMQMessage message = new AMQMessage(_store, + new MessageTransferBody((byte)0, + (byte)9, + MessageTransferBody.getClazz((byte)0,(byte)9), + MessageTransferBody.getMethod((byte)0,(byte)9), + null, // AMQShortString appId + null, // FieldTable applicationHeaders + null, // Content body + null, // AMQShortString contentEncoding + null, // AMQShortString contentType + null, // AMQShortString correlationId + (short)0, // short deliveryMode + null, // AMQShortString destination + null, // AMQShortString exchange + 0L, // long expiration + false, // boolean immediate + false, // boolean mandatory + null, // AMQShortString messageId + (short)0, // short priority + false, // boolean redelivered + null, // AMQShortString replyTo + null, // AMQShortString routingKey + null, // byte[] securityToken + 0, // int ticket + 0L, // long timestamp + null, // AMQShortString transactionId + 0L, // long ttl + null), // AMQShortString userId + txnContext); + message.incrementReference(); + // we call routing complete to set up the handle +// message.routingComplete(_store, _storeContext, new MessageHandleFactory()); + assertTrue(_store.getMessageMetaDataMap().size() == 1); + message.decrementReference(_storeContext); + assertTrue(_store.getMessageMetaDataMap().size() == 0); + } + + public void testMessageRemains() throws AMQException + { + // TODO: fix hardcoded protocol version data + AMQMessage message = new AMQMessage(_store, + new MessageTransferBody((byte)0, + (byte)9, + MessageTransferBody.getClazz((byte)0,(byte)9), + MessageTransferBody.getMethod((byte)0,(byte)9), + null, // AMQShortString appId + null, // FieldTable applicationHeaders + null, // Content body + null, // AMQShortString contentEncoding + null, // AMQShortString contentType + null, // AMQShortString correlationId + (short)0, // short deliveryMode + null, // AMQShortString destination + null, // AMQShortString exchange + 0L, // long expiration + false, // boolean immediate + false, // boolean mandatory + null, // AMQShortString messageId + (short)0, // short priority + false, // boolean redelivered + null, // AMQShortString replyTo + null, // AMQShortString routingKey + null, // byte[] securityToken + 0, // int ticket + 0L, // long timestamp + null, // AMQShortString transactionId + 0L, // long ttl + null), // AMQShortString userId + txnContext); + message.incrementReference(); + // we call routing complete to set up the handle +// message.routingComplete(_store, _storeContext, new MessageHandleFactory()); + assertTrue(_store.getMessageMetaDataMap().size() == 1); + message.incrementReference(); + message.decrementReference(_storeContext); + assertTrue(_store.getMessageMetaDataMap().size() == 1); + } + + public static junit.framework.Test suite() + { + return new junit.framework.TestSuite(TestReferenceCounting.class); + } +} diff --git a/java/systests/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java b/java/systests/src/main/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java index be7687a22c..fc775c904c 100644 --- a/java/systests/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java +++ b/java/systests/src/main/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -20,10 +20,11 @@ */ package org.apache.qpid.server.store; -import org.apache.qpid.server.queue.AMQMessage; +import org.apache.qpid.server.queue.MessageMetaData; -import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.List; /** * Adds some extra methods to the memory message store for testing purposes. @@ -32,11 +33,17 @@ public class TestableMemoryMessageStore extends MemoryMessageStore { public TestableMemoryMessageStore() { - _messageMap = new ConcurrentHashMap<Long, AMQMessage>(); + _metaDataMap = new ConcurrentHashMap<Long, MessageMetaData>(); + _contentBodyMap = new ConcurrentHashMap<Long, List<byte[]>>(); + } + + public ConcurrentMap<Long, MessageMetaData> getMessageMetaDataMap() + { + return _metaDataMap; } - public ConcurrentMap<Long, AMQMessage> getMessageMap() + public ConcurrentMap<Long, List<byte[]>> getContentBodyMap() { - return _messageMap; + return _contentBodyMap; } } diff --git a/java/systests/src/test/java/org/apache/qpid/server/txn/TxnBufferTest.java b/java/systests/src/main/java/org/apache/qpid/server/txn/TxnBufferTest.java index ac5c60a931..1d9e30c24e 100644 --- a/java/systests/src/test/java/org/apache/qpid/server/txn/TxnBufferTest.java +++ b/java/systests/src/main/java/org/apache/qpid/server/txn/TxnBufferTest.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -20,23 +20,23 @@ */ package org.apache.qpid.server.txn; +import junit.framework.TestCase; import org.apache.qpid.AMQException; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.TestableMemoryMessageStore; +import org.apache.qpid.server.store.StoreContext; import java.util.LinkedList; -import junit.framework.TestCase; - public class TxnBufferTest extends TestCase { - private final LinkedList<MockOp> ops = new LinkedList<MockOp>(); + private final LinkedList<MockOp> ops = new LinkedList<MockOp>(); public void testCommit() throws AMQException { MockStore store = new MockStore(); - TxnBuffer buffer = new TxnBuffer(store); + TxnBuffer buffer = new TxnBuffer(); buffer.enlist(new MockOp().expectPrepare().expectCommit()); //check relative ordering MockOp op = new MockOp().expectPrepare().expectPrepare().expectCommit().expectCommit(); @@ -44,7 +44,7 @@ public class TxnBufferTest extends TestCase buffer.enlist(op); buffer.enlist(new MockOp().expectPrepare().expectCommit()); - buffer.commit(); + buffer.commit(null); validateOps(); store.validate(); @@ -54,12 +54,12 @@ public class TxnBufferTest extends TestCase { MockStore store = new MockStore(); - TxnBuffer buffer = new TxnBuffer(store); + TxnBuffer buffer = new TxnBuffer(); buffer.enlist(new MockOp().expectRollback()); buffer.enlist(new MockOp().expectRollback()); buffer.enlist(new MockOp().expectRollback()); - buffer.rollback(); + buffer.rollback(null); validateOps(); store.validate(); @@ -68,17 +68,17 @@ public class TxnBufferTest extends TestCase public void testCommitWithFailureDuringPrepare() throws AMQException { MockStore store = new MockStore(); - store.expectBegin().expectAbort(); + store.beginTran(null); - TxnBuffer buffer = new TxnBuffer(store); - buffer.containsPersistentChanges(); + TxnBuffer buffer = new TxnBuffer(); + buffer.enlist(new StoreMessageOperation(store)); buffer.enlist(new MockOp().expectPrepare().expectUndoPrepare()); buffer.enlist(new TxnTester(store)); buffer.enlist(new MockOp().expectPrepare().expectUndoPrepare()); buffer.enlist(new FailedPrepare()); buffer.enlist(new MockOp()); - buffer.commit(); + buffer.commit(null); validateOps(); store.validate(); } @@ -86,16 +86,17 @@ public class TxnBufferTest extends TestCase public void testCommitWithPersistance() throws AMQException { MockStore store = new MockStore(); - store.expectBegin().expectCommit(); + store.beginTran(null); + store.expectCommit(); - TxnBuffer buffer = new TxnBuffer(store); + TxnBuffer buffer = new TxnBuffer(); buffer.enlist(new MockOp().expectPrepare().expectCommit()); buffer.enlist(new MockOp().expectPrepare().expectCommit()); buffer.enlist(new MockOp().expectPrepare().expectCommit()); + buffer.enlist(new StoreMessageOperation(store)); buffer.enlist(new TxnTester(store)); - buffer.containsPersistentChanges(); - buffer.commit(); + buffer.commit(null); validateOps(); store.validate(); } @@ -114,7 +115,7 @@ public class TxnBufferTest extends TestCase } class MockOp implements TxnOp - { + { final Object PREPARE = "PREPARE"; final Object COMMIT = "COMMIT"; final Object UNDO_PREPARE = "UNDO_PREPARE"; @@ -127,12 +128,12 @@ public class TxnBufferTest extends TestCase ops.add(this); } - public void prepare() + public void prepare(StoreContext context) { assertEquals(expected.removeLast(), PREPARE); } - public void commit() + public void commit(StoreContext context) { assertEquals(expected.removeLast(), COMMIT); } @@ -142,7 +143,7 @@ public class TxnBufferTest extends TestCase assertEquals(expected.removeLast(), UNDO_PREPARE); } - public void rollback() + public void rollback(StoreContext context) { assertEquals(expected.removeLast(), ROLLBACK); } @@ -193,25 +194,24 @@ public class TxnBufferTest extends TestCase private final LinkedList expected = new LinkedList(); private boolean inTran; - public void beginTran() throws AMQException + public void beginTran(StoreContext context) throws AMQException { - assertEquals(expected.removeLast(), BEGIN); inTran = true; } - - public void commitTran() throws AMQException + + public void commitTran(StoreContext context) throws AMQException { assertEquals(expected.removeLast(), COMMIT); inTran = false; } - - public void abortTran() throws AMQException + + public void abortTran(StoreContext context) throws AMQException { assertEquals(expected.removeLast(), ABORT); inTran = false; } - public boolean inTran() + public boolean inTran(StoreContext context) { return inTran; } @@ -249,23 +249,23 @@ public class TxnBufferTest extends TestCase } class NullOp implements TxnOp - { - public void prepare() throws AMQException + { + public void prepare(StoreContext context) throws AMQException { } - public void commit() + public void commit(StoreContext context) { } public void undoPrepare() { } - public void rollback() + public void rollback(StoreContext context) { } } class FailedPrepare extends NullOp - { + { public void prepare() throws AMQException { throw new AMQException("Fail!"); @@ -273,9 +273,11 @@ public class TxnBufferTest extends TestCase } class TxnTester extends NullOp - { + { private final MessageStore store; + private final StoreContext context = new StoreContext(); + TxnTester(MessageStore store) { this.store = store; @@ -283,12 +285,12 @@ public class TxnBufferTest extends TestCase public void prepare() throws AMQException { - assertTrue("Expected prepare to be performed under txn", store.inTran()); + assertTrue("Expected prepare to be performed under txn", store.inTran(context)); } public void commit() { - assertTrue("Expected commit not to be performed under txn", !store.inTran()); + assertTrue("Expected commit not to be performed under txn", !store.inTran(context)); } } diff --git a/java/systests/src/test/java/org/apache/qpid/server/util/AveragedRun.java b/java/systests/src/main/java/org/apache/qpid/server/util/AveragedRun.java index 1d17985ab5..1d17985ab5 100644 --- a/java/systests/src/test/java/org/apache/qpid/server/util/AveragedRun.java +++ b/java/systests/src/main/java/org/apache/qpid/server/util/AveragedRun.java diff --git a/java/systests/src/test/java/org/apache/qpid/server/util/RunStats.java b/java/systests/src/main/java/org/apache/qpid/server/util/RunStats.java index ec67fc68b3..ec67fc68b3 100644 --- a/java/systests/src/test/java/org/apache/qpid/server/util/RunStats.java +++ b/java/systests/src/main/java/org/apache/qpid/server/util/RunStats.java diff --git a/java/systests/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java b/java/systests/src/main/java/org/apache/qpid/server/util/TestApplicationRegistry.java index f801daf27c..849285e6d6 100644 --- a/java/systests/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java +++ b/java/systests/src/main/java/org/apache/qpid/server/util/TestApplicationRegistry.java @@ -29,14 +29,18 @@ import org.apache.qpid.server.management.NoopManagedObjectRegistry; import org.apache.qpid.server.queue.DefaultQueueRegistry; import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.server.registry.IApplicationRegistry; import org.apache.qpid.server.security.auth.AuthenticationManager; import org.apache.qpid.server.security.auth.NullAuthenticationManager; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.TestableMemoryMessageStore; +import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.server.virtualhost.VirtualHostRegistry; import org.apache.commons.configuration.Configuration; import org.apache.commons.configuration.MapConfiguration; import java.util.HashMap; +import java.util.Collection; public class TestApplicationRegistry extends ApplicationRegistry { @@ -51,6 +55,7 @@ public class TestApplicationRegistry extends ApplicationRegistry private AuthenticationManager _authenticationManager; private MessageStore _messageStore; + private VirtualHost _vHost; public TestApplicationRegistry() { @@ -59,10 +64,12 @@ public class TestApplicationRegistry extends ApplicationRegistry public void initialise() throws Exception { - _managedObjectRegistry = new NoopManagedObjectRegistry(); - _queueRegistry = new DefaultQueueRegistry(); - _exchangeFactory = new DefaultExchangeFactory(); - _exchangeRegistry = new DefaultExchangeRegistry(_exchangeFactory); + IApplicationRegistry appRegistry = ApplicationRegistry.getInstance(); + _managedObjectRegistry = appRegistry.getManagedObjectRegistry(); + _vHost = appRegistry.getVirtualHostRegistry().getVirtualHost("test"); + _queueRegistry = _vHost.getQueueRegistry(); + _exchangeFactory = _vHost.getExchangeFactory(); + _exchangeRegistry = _vHost.getExchangeRegistry(); _authenticationManager = new NullAuthenticationManager(); _messageStore = new TestableMemoryMessageStore(); @@ -99,6 +106,16 @@ public class TestApplicationRegistry extends ApplicationRegistry return _authenticationManager; } + public Collection<String> getVirtualHostNames() + { + return null; //To change body of implemented methods use File | Settings | File Templates. + } + + public VirtualHostRegistry getVirtualHostRegistry() + { + return null; //To change body of implemented methods use File | Settings | File Templates. + } + public MessageStore getMessageStore() { return _messageStore; diff --git a/java/systests/src/test/java/org/apache/qpid/server/util/TimedRun.java b/java/systests/src/main/java/org/apache/qpid/server/util/TimedRun.java index 1291380311..1291380311 100644 --- a/java/systests/src/test/java/org/apache/qpid/server/util/TimedRun.java +++ b/java/systests/src/main/java/org/apache/qpid/server/util/TimedRun.java diff --git a/java/systests/src/test/java/org/apache/qpid/test/VMBrokerSetup.java b/java/systests/src/main/java/org/apache/qpid/test/VMBrokerSetup.java index e859fac4af..e859fac4af 100644 --- a/java/systests/src/test/java/org/apache/qpid/test/VMBrokerSetup.java +++ b/java/systests/src/main/java/org/apache/qpid/test/VMBrokerSetup.java diff --git a/java/systests/src/test/java/log4j.properties b/java/systests/src/main/java/systests.log4j index 6d596d1d19..6d596d1d19 100644 --- a/java/systests/src/test/java/log4j.properties +++ b/java/systests/src/main/java/systests.log4j diff --git a/java/systests/src/old_test/java/org/apache/qpid/server/queue/SendPerfTest.java b/java/systests/src/old_test/java/org/apache/qpid/server/queue/SendPerfTest.java index 6490b9f270..2c5712fd35 100644 --- a/java/systests/src/old_test/java/org/apache/qpid/server/queue/SendPerfTest.java +++ b/java/systests/src/old_test/java/org/apache/qpid/server/queue/SendPerfTest.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -27,6 +27,9 @@ import org.apache.qpid.framing.ContentBody; import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.AMQChannel; +import org.apache.qpid.server.RequiredDeliveryException; +import org.apache.qpid.server.txn.TransactionalContext; +import org.apache.qpid.server.txn.NonTransactionalContext; import org.apache.qpid.server.exchange.AbstractExchange; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.handler.OnCurrentThreadExecutor; @@ -43,6 +46,7 @@ import org.apache.qpid.server.util.TimedRun; import java.util.ArrayList; import java.util.List; +import java.util.LinkedList; public class SendPerfTest extends TimedRun { @@ -101,13 +105,16 @@ public class SendPerfTest extends TimedRun ContentHeaderBody header = new ContentHeaderBody(); List<ContentBody> body = new ArrayList<ContentBody>(); MessageStore messageStore = new SkeletonMessageStore(); + // channel can be null since it is only used in ack processing which does not apply to this test + TransactionalContext txContext = new NonTransactionalContext(messageStore, null, + new LinkedList<RequiredDeliveryException>()); body.add(new ContentBody()); + MessageHandleFactory factory = new MessageHandleFactory(); for (int i = 0; i < count; i++) { - for (AMQQueue q : queues) - { - q.deliver(new AMQMessage(messageStore, i, publish, header, body)); - } + // this routes and delivers the message + AMQMessage msg = new AMQMessage(i, publish, txContext, header, queues, body, messageStore, + factory); } } diff --git a/java/systests/src/old_test/java/org/apache/qpid/test/unit/ack/DisconnectAndRedeliverTest.java b/java/systests/src/old_test/java/org/apache/qpid/test/unit/ack/DisconnectAndRedeliverTest.java index a3e555aac9..3e35e3c85b 100644 --- a/java/systests/src/old_test/java/org/apache/qpid/test/unit/ack/DisconnectAndRedeliverTest.java +++ b/java/systests/src/old_test/java/org/apache/qpid/test/unit/ack/DisconnectAndRedeliverTest.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -20,22 +20,19 @@ */ package org.apache.qpid.test.unit.ack; +import junit.framework.TestCase; import org.apache.log4j.Logger; import org.apache.log4j.xml.DOMConfigurator; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQQueue; import org.apache.qpid.client.AMQSession; -import org.apache.qpid.client.vmbroker.AMQVMBrokerCreationException; import org.apache.qpid.client.transport.TransportConnection; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.store.TestableMemoryMessageStore; import org.apache.qpid.server.util.TestApplicationRegistry; -import org.apache.qpid.test.VMBrokerSetup; import javax.jms.*; -import junit.framework.TestCase; - public class DisconnectAndRedeliverTest extends TestCase { private static final Logger _logger = Logger.getLogger(DisconnectAndRedeliverTest.class); @@ -55,12 +52,14 @@ public class DisconnectAndRedeliverTest extends TestCase protected void setUp() throws Exception { super.setUp(); + TransportConnection.createVMBroker(1); ApplicationRegistry.initialise(new TestApplicationRegistry(), 1); } protected void tearDown() throws Exception { super.tearDown(); + TransportConnection.killAllVMBrokers(); } /** @@ -82,7 +81,7 @@ public class DisconnectAndRedeliverTest extends TestCase ((AMQSession) consumerSession).declareExchangeSynch("amq.direct", "direct"); Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "/test"); - + Session producerSession = con2.createSession(false, Session.CLIENT_ACKNOWLEDGE); MessageProducer producer = producerSession.createProducer(queue); @@ -149,7 +148,7 @@ public class DisconnectAndRedeliverTest extends TestCase _logger.info("No messages redelivered as is expected"); con.close(); - _logger.info("Actually:" + store.getMessageMap().size()); + _logger.info("Actually:" + store.getMessageMetaDataMap().size()); // assertTrue(store.getMessageMap().size() == 0); } @@ -204,13 +203,13 @@ public class DisconnectAndRedeliverTest extends TestCase assertNull(tm); _logger.info("No messages redelivered as is expected"); - _logger.info("Actually:" + store.getMessageMap().size()); - assertTrue(store.getMessageMap().size() == 0); + _logger.info("Actually:" + store.getMessageMetaDataMap().size()); + assertTrue(store.getMessageMetaDataMap().size() == 0); con.close(); } public static junit.framework.Test suite() { - return new VMBrokerSetup(new junit.framework.TestSuite(DisconnectAndRedeliverTest.class)); + return new junit.framework.TestSuite(DisconnectAndRedeliverTest.class); } } diff --git a/java/systests/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java b/java/systests/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java deleted file mode 100644 index 6320a2c1be..0000000000 --- a/java/systests/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java +++ /dev/null @@ -1,247 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.exchange; - -import junit.framework.TestCase; - -import org.apache.mina.common.ByteBuffer; -import org.apache.qpid.AMQException; -import org.apache.qpid.client.message.MessageHeaders; -import org.apache.qpid.framing.*; -import org.apache.qpid.server.queue.AMQMessage; -import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.registry.ApplicationRegistry; -import org.apache.qpid.server.store.MessageStore; -import org.apache.qpid.server.store.SkeletonMessageStore; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashSet; -import java.util.List; -import java.util.Set; - -public class AbstractHeadersExchangeTestBase extends TestCase -{ - private final HeadersExchange exchange = new HeadersExchange(); - protected final Set<TestQueue> queues = new HashSet<TestQueue>(); - private int count; - - public void testDoNothing() - { - // this is here only to make junit under Eclipse happy - } - - protected TestQueue bindDefault(String... bindings) throws AMQException - { - return bind("Queue" + (++count), bindings); - } - - protected TestQueue bind(String queueName, String... bindings) throws AMQException - { - return bind(queueName, getHeaders(bindings)); - } - - protected TestQueue bind(String queue, FieldTable bindings) throws AMQException - { - return bind(new TestQueue(queue), bindings); - } - - protected TestQueue bind(TestQueue queue, String... bindings) throws AMQException - { - return bind(queue, getHeaders(bindings)); - } - - protected TestQueue bind(TestQueue queue, FieldTable bindings) throws AMQException - { - queues.add(queue); - exchange.registerQueue(null, queue, bindings); - return queue; - } - - - protected void route(Message m) throws AMQException - { - m.route(exchange); - } - - protected void routeAndTest(Message m, TestQueue... expected) throws AMQException - { - routeAndTest(m, false, Arrays.asList(expected)); - } - - protected void routeAndTest(Message m, boolean expectReturn, TestQueue... expected) throws AMQException - { - routeAndTest(m, expectReturn, Arrays.asList(expected)); - } - - protected void routeAndTest(Message m, List<TestQueue> expected) throws AMQException - { - routeAndTest(m, false, expected); - } - - protected void routeAndTest(Message m, boolean expectReturn, List<TestQueue> expected) throws AMQException - { - try - { - route(m); - assertFalse("Expected "+m+" to be returned due to manadatory flag, and lack of routing",expectReturn); - for (TestQueue q : queues) - { - if (expected.contains(q)) - { - assertTrue("Expected " + m + " to be delivered to " + q, m.isInQueue(q)); - //assert m.isInQueue(q) : "Expected " + m + " to be delivered to " + q; - } - else - { - assertFalse("Did not expect " + m + " to be delivered to " + q, m.isInQueue(q)); - //assert !m.isInQueue(q) : "Did not expect " + m + " to be delivered to " + q; - } - } - } - - catch (NoRouteException ex) - { - assertTrue("Expected "+m+" not to be returned",expectReturn); - } - - } - - static FieldTable getHeaders(String... entries) - { - FieldTable headers = FieldTableFactory.newFieldTable(); - for (String s : entries) - { - String[] parts = s.split("=", 2); - headers.put(parts[0], parts.length > 1 ? parts[1] : ""); - } - return headers; - } - - static class TestQueue extends AMQQueue - { - final List<HeadersExchangeTest.Message> messages = new ArrayList<HeadersExchangeTest.Message>(); - - public TestQueue(String name) throws AMQException - { - super(name, false, "test", true, ApplicationRegistry.getInstance().getQueueRegistry()); - } - - public void deliver(AMQMessage msg) throws AMQException - { - messages.add(new HeadersExchangeTest.Message(msg)); - } - } - - /** - * Just add some extra utility methods to AMQMessage to aid testing. - */ - static class Message extends AMQMessage - { - private static MessageStore _messageStore = new SkeletonMessageStore(); - - Message(String id, String... headers) throws AMQException - { - this(id, getHeaders(headers)); - } - - Message(String id, FieldTable headers) throws AMQException - { - this(_messageStore, getMessageTransferBody(id,headers), new ArrayList()); - } - - private static MessageTransferBody getMessageTransferBody(String id,FieldTable headers){ - MessageHeaders messageHeaders = new MessageHeaders(); - MessageTransferBody methodBody = MessageTransferBody.createMethodBody( - (byte)0, (byte)9, // AMQP version (major, minor) - messageHeaders.getAppId(), // String appId - headers, // FieldTable applicationHeaders - new Content(), // Content body - messageHeaders.getEncoding(), // String contentEncoding - messageHeaders.getContentType(), // String contentType - messageHeaders.getCorrelationId(), // String correlationId - (short)1, // short deliveryMode - "someExchange", // String destination - "someExchange", // String exchange - messageHeaders.getExpiration(), // long expiration - false, // boolean immediate - false, // boolean mandatory - null, // String messageId - (short)0, // short priority - false, // boolean redelivered - messageHeaders.getReplyTo(), // String replyTo - id, // String routingKey - new String("abc123").getBytes(), // byte[] securityToken - 0, // int ticket - messageHeaders.getTimestamp(), // long timestamp - messageHeaders.getTransactionId(), // String transactionId - 0, // long ttl - messageHeaders.getUserId()); // String userId - - return methodBody; - } - - Message(MessageStore store, MessageTransferBody transferBody, List<ByteBuffer> contents) throws AMQException - { - super(_messageStore, transferBody, contents); - } - - private Message(AMQMessage msg) throws AMQException - { - super(msg); - } - - void route(Exchange exchange) throws AMQException - { - exchange.route(this); - } - - boolean isInQueue(TestQueue queue) - { - return queue.messages.contains(this); - } - - public int hashCode() - { - return getKey().hashCode(); - } - - public boolean equals(Object o) - { - return o instanceof HeadersExchangeTest.Message && equals((HeadersExchangeTest.Message) o); - } - - private boolean equals(HeadersExchangeTest.Message m) - { - return getKey().equals(m.getKey()); - } - - public String toString() - { - return getKey().toString(); - } - - private Object getKey() - { - return this.getTransferBody().getRoutingKey(); - } - } -} diff --git a/java/systests/src/test/java/org/apache/qpid/server/queue/ConcurrentDeliveryManagerTest.java b/java/systests/src/test/java/org/apache/qpid/server/queue/ConcurrentDeliveryManagerTest.java deleted file mode 100644 index 3072d44f48..0000000000 --- a/java/systests/src/test/java/org/apache/qpid/server/queue/ConcurrentDeliveryManagerTest.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.queue; - -import org.apache.qpid.AMQException; -import org.apache.qpid.server.queue.ConcurrentDeliveryManager; -import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.queue.DefaultQueueRegistry; -import org.apache.qpid.server.queue.DeliveryManagerTest; - -public class ConcurrentDeliveryManagerTest extends DeliveryManagerTest -{ - public ConcurrentDeliveryManagerTest() throws Exception - { - try - { - System.setProperty("concurrentdeliverymanager","true"); - _mgr = new ConcurrentDeliveryManager(_subscriptions, new AMQQueue("myQ", false, "guest", false, - new DefaultQueueRegistry())); - } - catch (Throwable t) - { - t.printStackTrace(); - throw new AMQException("Could not initialise delivery manager", t); - } - } - - public static junit.framework.Test suite() - { - return new junit.framework.TestSuite(ConcurrentDeliveryManagerTest.class); - } -} diff --git a/java/systests/src/test/java/org/apache/qpid/server/queue/MessageTestHelper.java b/java/systests/src/test/java/org/apache/qpid/server/queue/MessageTestHelper.java deleted file mode 100644 index 3eba217903..0000000000 --- a/java/systests/src/test/java/org/apache/qpid/server/queue/MessageTestHelper.java +++ /dev/null @@ -1,85 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.queue; - -import java.util.ArrayList; - -import junit.framework.TestCase; - -import org.apache.qpid.AMQException; -import org.apache.qpid.client.message.MessageHeaders; -import org.apache.qpid.framing.Content; -import org.apache.qpid.framing.MessageTransferBody; -import org.apache.qpid.server.registry.ApplicationRegistry; -import org.apache.qpid.server.store.MessageStore; -import org.apache.qpid.server.store.SkeletonMessageStore; -import org.apache.qpid.server.util.TestApplicationRegistry; - -class MessageTestHelper extends TestCase -{ - private final MessageStore _messageStore = new SkeletonMessageStore(); - - MessageTestHelper() throws Exception - { - ApplicationRegistry.initialise(new TestApplicationRegistry()); - } - - AMQMessage message() throws AMQException - { - return message(false); - } - - AMQMessage message(boolean immediate) throws AMQException - { - // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9) - // TODO: Establish some way to determine the version for the test. - MessageHeaders messageHeaders = new MessageHeaders(); - - MessageTransferBody methodBody = MessageTransferBody.createMethodBody( - (byte)0, (byte)9, // AMQP version (major, minor) - messageHeaders.getAppId(), // String appId - messageHeaders.getJMSHeaders(), // FieldTable applicationHeaders - new Content(), // Content body - messageHeaders.getEncoding(), // String contentEncoding - messageHeaders.getContentType(), // String contentType - messageHeaders.getCorrelationId(), // String correlationId - (short)1, // short deliveryMode - "someExchange", // String destination - "someExchange", // String exchange - messageHeaders.getExpiration(), // long expiration - immediate, // boolean immediate - false, // boolean mandatory - "", // String messageId - (short)0, // short priority - false, // boolean redelivered - messageHeaders.getReplyTo(), // String replyTo - "rk", // String routingKey - new String("abc123").getBytes(), // byte[] securityToken - 0, // int ticket - messageHeaders.getTimestamp(), // long timestamp - messageHeaders.getTransactionId(), // String transactionId - 0, // long ttl - messageHeaders.getUserId()); // String userId - - return new AMQMessage(_messageStore, methodBody, new ArrayList()); - } - -} diff --git a/java/systests/src/test/java/org/apache/qpid/server/queue/SynchronizedDeliveryManagerTest.java b/java/systests/src/test/java/org/apache/qpid/server/queue/SynchronizedDeliveryManagerTest.java deleted file mode 100644 index ebe8e192a0..0000000000 --- a/java/systests/src/test/java/org/apache/qpid/server/queue/SynchronizedDeliveryManagerTest.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.queue; - -import org.apache.qpid.server.queue.SynchronizedDeliveryManager; -import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.queue.DefaultQueueRegistry; -import org.apache.qpid.server.queue.DeliveryManagerTest; -import org.apache.qpid.AMQException; - -import junit.framework.TestSuite; - -public class SynchronizedDeliveryManagerTest extends DeliveryManagerTest -{ - public SynchronizedDeliveryManagerTest() throws Exception - { - try - { - System.setProperty("concurrentdeliverymanager","false"); - _mgr = new SynchronizedDeliveryManager(_subscriptions, new AMQQueue("myQ", false, "guest", false, - new DefaultQueueRegistry())); - } - catch (Throwable t) - { - t.printStackTrace(); - throw new AMQException("Could not initialise delivery manager", t); - } - } - - public static junit.framework.Test suite() - { - TestSuite suite = new TestSuite(); - suite.addTestSuite(SynchronizedDeliveryManagerTest.class); - return suite; - } -} diff --git a/java/systests/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java b/java/systests/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java deleted file mode 100644 index f162506fed..0000000000 --- a/java/systests/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java +++ /dev/null @@ -1,67 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.store; - -import org.apache.qpid.server.queue.AMQMessage; -import org.apache.qpid.AMQException; - -import junit.framework.TestCase; - -/** - * Tests that reference counting works correctly with AMQMessage and the message store - */ -public class TestReferenceCounting extends TestCase -{ - private TestableMemoryMessageStore _store; - - protected void setUp() throws Exception - { - super.setUp(); - _store = new TestableMemoryMessageStore(); - } - - /** - * Check that when the reference count is decremented the message removes itself from the store - */ - public void testMessageGetsRemoved() throws AMQException - { - AMQMessage message = new AMQMessage(_store, null); - _store.put(message); - assertTrue(_store.getMessageMap().size() == 1); - message.decrementReference(); - assertTrue(_store.getMessageMap().size() == 0); - } - - public void testMessageRemains() throws AMQException - { - AMQMessage message = new AMQMessage(_store, null); - _store.put(message); - assertTrue(_store.getMessageMap().size() == 1); - message.incrementReference(); - message.decrementReference(); - assertTrue(_store.getMessageMap().size() == 1); - } - - public static junit.framework.Test suite() - { - return new junit.framework.TestSuite(TestReferenceCounting.class); - } -} |
