summaryrefslogtreecommitdiff
path: root/java/systests
diff options
context:
space:
mode:
Diffstat (limited to 'java/systests')
-rw-r--r--java/systests/distribution/pom.xml111
-rw-r--r--java/systests/distribution/src/main/assembly/systests.xml91
-rw-r--r--java/systests/pom.xml53
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBeanTest.java (renamed from java/systests/src/test/java/org/apache/qpid/server/AMQBrokerManagerMBeanTest.java)38
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java (renamed from java/systests/src/test/java/org/apache/qpid/server/ack/TxAckTest.java)80
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java330
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/exchange/ExchangeMBeanTest.java (renamed from java/systests/src/test/java/org/apache/qpid/server/exchange/ExchangeMBeanTest.java)32
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java (renamed from java/systests/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java)23
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java (renamed from java/systests/src/test/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java)3
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java (renamed from java/systests/src/test/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java)55
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/protocol/MockIoSession.java (renamed from java/systests/src/test/java/org/apache/qpid/server/protocol/MockIoSession.java)0
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java (renamed from java/systests/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java)120
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java (renamed from java/systests/src/test/java/org/apache/qpid/server/queue/AckTest.java)185
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/queue/ConcurrencyTest.java (renamed from java/systests/src/test/java/org/apache/qpid/server/queue/ConcurrencyTest.java)18
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/queue/DeliveryManagerTest.java (renamed from java/systests/src/test/java/org/apache/qpid/server/queue/DeliveryManagerTest.java)22
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/queue/MessageTestHelper.java95
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java (renamed from java/systests/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java)186
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionManagerTest.java (renamed from java/systests/src/test/java/org/apache/qpid/server/queue/SubscriptionManagerTest.java)0
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionSetTest.java (renamed from java/systests/src/test/java/org/apache/qpid/server/queue/SubscriptionSetTest.java)0
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java (renamed from java/systests/src/test/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java)6
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/store/SkeletonMessageStore.java (renamed from java/systests/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java)78
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/store/TestReferenceCounting.java135
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java (renamed from java/systests/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java)21
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/txn/TxnBufferTest.java (renamed from java/systests/src/test/java/org/apache/qpid/server/txn/TxnBufferTest.java)74
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/util/AveragedRun.java (renamed from java/systests/src/test/java/org/apache/qpid/server/util/AveragedRun.java)0
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/util/RunStats.java (renamed from java/systests/src/test/java/org/apache/qpid/server/util/RunStats.java)0
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/util/TestApplicationRegistry.java (renamed from java/systests/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java)25
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/util/TimedRun.java (renamed from java/systests/src/test/java/org/apache/qpid/server/util/TimedRun.java)0
-rw-r--r--java/systests/src/main/java/org/apache/qpid/test/VMBrokerSetup.java (renamed from java/systests/src/test/java/org/apache/qpid/test/VMBrokerSetup.java)0
-rw-r--r--java/systests/src/main/java/systests.log4j (renamed from java/systests/src/test/java/log4j.properties)0
-rw-r--r--java/systests/src/old_test/java/org/apache/qpid/server/queue/SendPerfTest.java19
-rw-r--r--java/systests/src/old_test/java/org/apache/qpid/test/unit/ack/DisconnectAndRedeliverTest.java21
-rw-r--r--java/systests/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java247
-rw-r--r--java/systests/src/test/java/org/apache/qpid/server/queue/ConcurrentDeliveryManagerTest.java50
-rw-r--r--java/systests/src/test/java/org/apache/qpid/server/queue/MessageTestHelper.java85
-rw-r--r--java/systests/src/test/java/org/apache/qpid/server/queue/SynchronizedDeliveryManagerTest.java54
-rw-r--r--java/systests/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java67
37 files changed, 1384 insertions, 940 deletions
diff --git a/java/systests/distribution/pom.xml b/java/systests/distribution/pom.xml
new file mode 100644
index 0000000000..bff1e0d9e5
--- /dev/null
+++ b/java/systests/distribution/pom.xml
@@ -0,0 +1,111 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+ -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <groupId>org.apache.qpid</groupId>
+ <artifactId>qpid-systests-distribution</artifactId>
+ <packaging>jar</packaging>
+ <version>1.0-incubating-M2-SNAPSHOT</version>
+ <name>Qpid System Tests Distribution</name>
+ <url>http://cwiki.apache.org/confluence/display/qpid</url>
+
+ <parent>
+ <groupId>org.apache.qpid</groupId>
+ <artifactId>qpid</artifactId>
+ <version>1.0-incubating-M2-SNAPSHOT</version>
+ </parent>
+
+ <properties>
+ <topDirectoryLocation>..</topDirectoryLocation>
+ <java.source.version>1.5</java.source.version>
+ <qpid.version>${pom.version}</qpid.version>
+ <qpid.targetDir>${project.build.directory}</qpid.targetDir>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.qpid</groupId>
+ <artifactId>qpid-systests</artifactId>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <pluginManagement>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <configuration>
+ <source>${java.source.version}</source>
+ <target>${java.source.version}</target>
+ </configuration>
+ </plugin>
+
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <version>${assembly.version}</version>
+ <configuration>
+ <finalName>qpid-${pom.version}</finalName>
+ <outputDirectory>${qpid.targetDir}</outputDirectory>
+ <tarLongFileMode>gnu</tarLongFileMode>
+ </configuration>
+ </plugin>
+
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <configuration>
+ <finalName>qpid-systests</finalName>
+ <archive>
+ <manifest>
+ <addClasspath>true</addClasspath>
+ </manifest>
+ </archive>
+ </configuration>
+ </plugin>
+
+ </plugins>
+ </pluginManagement>
+
+ <plugins>
+ <plugin>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>distribution-package</id>
+ <phase>package</phase>
+ <goals>
+ <goal>single</goal>
+ </goals>
+ <configuration>
+ <descriptors>
+ <descriptor>src/main/assembly/systests.xml</descriptor>
+ </descriptors>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+
+ </build>
+
+</project>
diff --git a/java/systests/distribution/src/main/assembly/systests.xml b/java/systests/distribution/src/main/assembly/systests.xml
new file mode 100644
index 0000000000..2d6a6d8572
--- /dev/null
+++ b/java/systests/distribution/src/main/assembly/systests.xml
@@ -0,0 +1,91 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+-->
+<assembly>
+ <id>system-test-java</id>
+ <includeBaseDirectory>false</includeBaseDirectory>
+ <formats>
+ <format>tar.gz</format>
+ <format>zip</format>
+ </formats>
+
+<fileSets>
+ <!-- Apache Licensing Details-->
+ <fileSet>
+ <directory>../../resources</directory>
+ <outputDirectory>qpid-${qpid.version}</outputDirectory>
+ <includes>
+ <include>DISCLAIMER</include>
+ <include>LICENSE.txt</include>
+ <include>NOTICE.txt</include>
+ <include>README.txt</include>
+ </includes>
+ </fileSet>
+ <fileSet>
+ <directory>..</directory>
+ <outputDirectory>qpid-${qpid.version}</outputDirectory>
+ <includes>
+ <include>*.txt</include>
+ </includes>
+ </fileSet>
+ <fileSet>
+ <directory>../../release-docs</directory>
+ <outputDirectory>qpid-${qpid.version}/docs</outputDirectory>
+ <includes>
+ <include>RELEASE_NOTES.txt</include>
+ </includes>
+ </fileSet>
+
+ <!-- Scripts to run the system tests-->
+ <fileSet>
+ <directory>../bin</directory>
+ <outputDirectory>qpid-${qpid.version}/bin</outputDirectory>
+ <includes>
+ <include>*</include>
+ </includes>
+ </fileSet>
+
+ <!-- Include source files in easy access form -->
+ <fileSet>
+ <directory>../src/main</directory>
+ <outputDirectory>qpid-${qpid.version}/src</outputDirectory>
+ <includes>
+ <include>**/*.java</include>
+ <include>**/*.log4j</include>
+ </includes>
+ </fileSet>
+ <fileSet>
+ <directory>target</directory>
+ <outputDirectory>qpid-${qpid.version}/lib</outputDirectory>
+ <includes>
+ <include>qpid-systests.jar</include>
+ </includes>
+ </fileSet>
+ </fileSets>
+
+ <dependencySets>
+ <dependencySet>
+ <outputDirectory>qpid-${qpid.version}/lib</outputDirectory>
+ <unpack>false</unpack>
+ <excludes>
+ <exclude>org.apache.qpid:qpid-systests-distribution</exclude>
+ </excludes>
+ </dependencySet>
+ </dependencySets>
+</assembly>
diff --git a/java/systests/pom.xml b/java/systests/pom.xml
index c73e5f2c44..614166754c 100644
--- a/java/systests/pom.xml
+++ b/java/systests/pom.xml
@@ -39,23 +39,20 @@
<dependencies>
<dependency>
<groupId>org.apache.qpid</groupId>
- <artifactId>qpid-common</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.qpid</groupId>
<artifactId>qpid-client</artifactId>
+ <type>jar</type>
</dependency>
+
<dependency>
<groupId>org.apache.qpid</groupId>
<artifactId>qpid-broker</artifactId>
+ <type>jar</type>
</dependency>
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-simple</artifactId>
- </dependency>
+
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
+ <scope>compile</scope>
</dependency>
</dependencies>
@@ -65,26 +62,30 @@
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
- <systemProperties>
- <property>
- <name>amqj.noAutoCreateVMBroker</name>
- <value>true</value>
- </property>
- <property>
- <name>amqj.logging.level</name>
- <value>${amqj.logging.level}</value>
- </property>
- <property>
- <name>log4j.debug</name>
- <value>true</value>
- </property>
- <property>
- <name>log4j.configuration</name>
- <value>file:///${basedir}/src/test/java/log4j.properties</value>
- </property>
- </systemProperties>
+ <skip>true</skip>
</configuration>
</plugin>
+
</plugins>
+
+ <!-- Include source files in built jar -->
+ <resources>
+ <resource>
+ <targetPath>src/</targetPath>
+ <filtering>false</filtering>
+ <directory>src/main/java</directory>
+ <includes>
+ <include>**/*.java</include>
+ </includes>
+ </resource>
+ <resource>
+ <targetPath>src/</targetPath>
+ <filtering>false</filtering>
+ <directory>src/main/java</directory>
+ <includes>
+ <include>systests.log4j</include>
+ </includes>
+ </resource>
+ </resources>
</build>
</project>
diff --git a/java/systests/src/test/java/org/apache/qpid/server/AMQBrokerManagerMBeanTest.java b/java/systests/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBeanTest.java
index 21ad1b6a7f..62dc44e23f 100644
--- a/java/systests/src/test/java/org/apache/qpid/server/AMQBrokerManagerMBeanTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBeanTest.java
@@ -23,6 +23,8 @@ import org.apache.qpid.server.management.ManagedBroker;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.registry.IApplicationRegistry;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.framing.AMQShortString;
public class AMQBrokerManagerMBeanTest extends TestCase
{
@@ -35,40 +37,44 @@ public class AMQBrokerManagerMBeanTest extends TestCase
String exchange2 = "testExchange2_" + System.currentTimeMillis();
String exchange3 = "testExchange3_" + System.currentTimeMillis();
- assertTrue(_exchangeRegistry.getExchange(exchange1) == null);
- assertTrue(_exchangeRegistry.getExchange(exchange2) == null);
- assertTrue(_exchangeRegistry.getExchange(exchange3) == null);
+ assertTrue(_exchangeRegistry.getExchange(new AMQShortString(exchange1)) == null);
+ assertTrue(_exchangeRegistry.getExchange(new AMQShortString(exchange2)) == null);
+ assertTrue(_exchangeRegistry.getExchange(new AMQShortString(exchange3)) == null);
- ManagedBroker mbean = new AMQBrokerManagerMBean();
+ VirtualHost vHost = ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost("test");
+
+ ManagedBroker mbean = new AMQBrokerManagerMBean((VirtualHost.VirtualHostMBean)vHost.getManagedObject());
mbean.createNewExchange(exchange1,"direct",false, false);
mbean.createNewExchange(exchange2,"topic",false, false);
mbean.createNewExchange(exchange3,"headers",false, false);
- assertTrue(_exchangeRegistry.getExchange(exchange1) != null);
- assertTrue(_exchangeRegistry.getExchange(exchange2) != null);
- assertTrue(_exchangeRegistry.getExchange(exchange3) != null);
+ assertTrue(_exchangeRegistry.getExchange(new AMQShortString(exchange1)) != null);
+ assertTrue(_exchangeRegistry.getExchange(new AMQShortString(exchange2)) != null);
+ assertTrue(_exchangeRegistry.getExchange(new AMQShortString(exchange3)) != null);
mbean.unregisterExchange(exchange1);
mbean.unregisterExchange(exchange2);
mbean.unregisterExchange(exchange3);
- assertTrue(_exchangeRegistry.getExchange(exchange1) == null);
- assertTrue(_exchangeRegistry.getExchange(exchange2) == null);
- assertTrue(_exchangeRegistry.getExchange(exchange3) == null);
+ assertTrue(_exchangeRegistry.getExchange(new AMQShortString(exchange1)) == null);
+ assertTrue(_exchangeRegistry.getExchange(new AMQShortString(exchange2)) == null);
+ assertTrue(_exchangeRegistry.getExchange(new AMQShortString(exchange3)) == null);
}
public void testQueueOperations() throws Exception
{
String queueName = "testQueue_" + System.currentTimeMillis();
- ManagedBroker mbean = new AMQBrokerManagerMBean();
+ VirtualHost vHost = ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost("test");
+
+ ManagedBroker mbean = new AMQBrokerManagerMBean((VirtualHost.VirtualHostMBean)vHost.getManagedObject());
- assertTrue(_queueRegistry.getQueue(queueName) == null);
+ assertTrue(_queueRegistry.getQueue(new AMQShortString(queueName)) == null);
mbean.createNewQueue(queueName, false, "test", true);
- assertTrue(_queueRegistry.getQueue(queueName) != null);
+ assertTrue(_queueRegistry.getQueue(new AMQShortString(queueName)) != null);
mbean.deleteQueue(queueName);
- assertTrue(_queueRegistry.getQueue(queueName) == null);
+ assertTrue(_queueRegistry.getQueue(new AMQShortString(queueName)) == null);
}
@Override
@@ -76,7 +82,7 @@ public class AMQBrokerManagerMBeanTest extends TestCase
{
super.setUp();
IApplicationRegistry appRegistry = ApplicationRegistry.getInstance();
- _queueRegistry = appRegistry.getQueueRegistry();
- _exchangeRegistry = appRegistry.getExchangeRegistry();
+ _queueRegistry = appRegistry.getVirtualHostRegistry().getVirtualHost("test").getQueueRegistry();
+ _exchangeRegistry = appRegistry.getVirtualHostRegistry().getVirtualHost("test").getExchangeRegistry();
}
}
diff --git a/java/systests/src/test/java/org/apache/qpid/server/ack/TxAckTest.java b/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java
index 1a15ca7561..5451d64b45 100644
--- a/java/systests/src/test/java/org/apache/qpid/server/ack/TxAckTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -20,16 +20,18 @@
*/
package org.apache.qpid.server.ack;
+import junit.framework.TestCase;
import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.MessageTransferBody;
+import org.apache.qpid.server.RequiredDeliveryException;
import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.store.TestableMemoryMessageStore;
+import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.txn.NonTransactionalContext;
+import org.apache.qpid.server.txn.TransactionalContext;
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.List;
-
-import junit.framework.TestCase;
+import java.util.*;
public class TxAckTest extends TestCase
{
@@ -87,18 +89,54 @@ public class TxAckTest extends TestCase
private class Scenario
{
- private final LinkedHashMap<Long, UnacknowledgedMessage> _messages = new LinkedHashMap<Long, UnacknowledgedMessage>();
- private final UnacknowledgedMessageMap _map = new UnacknowledgedMessageMapImpl(_messages, _messages);
+ private final UnacknowledgedMessageMap _map = new UnacknowledgedMessageMapImpl(5000);
private final TxAck _op = new TxAck(_map);
private final List<Long> _acked;
private final List<Long> _unacked;
+ private StoreContext _storeContext = new StoreContext();
Scenario(int messageCount, List<Long> acked, List<Long> unacked)
{
+ MessageStore messageStore = new TestableMemoryMessageStore();
+ TransactionalContext txnContext = new NonTransactionalContext(messageStore,
+ _storeContext, null,
+ new LinkedList<RequiredDeliveryException>(),
+ new HashSet<Long>());
for(int i = 0; i < messageCount; i++)
{
long deliveryTag = i + 1;
- _messages.put(deliveryTag, new UnacknowledgedMessage(null, new TestMessage(deliveryTag), null, deliveryTag));
+ // TODO: fix hardcoded protocol version data
+ TestMessage message = new TestMessage(deliveryTag, messageStore,
+ new MessageTransferBody(
+ (byte)0,
+ (byte)9,
+ MessageTransferBody.getClazz((byte)0,(byte)9),
+ MessageTransferBody.getMethod((byte)0,(byte)9),
+ null, // AMQShortString appId
+ null, // FieldTable applicationHeaders
+ null, // Content body
+ null, // AMQShortString contentEncoding
+ null, // AMQShortString contentType
+ null, // AMQShortString correlationId
+ (short)0, // short deliveryMode
+ null, // AMQShortString destination
+ null, // AMQShortString exchange
+ 0L, // long expiration
+ false, // boolean immediate
+ false, // boolean mandatory
+ null, // AMQShortString messageId
+ (short)0, // short priority
+ false, // boolean redelivered
+ null, // AMQShortString replyTo
+ null, // AMQShortString routingKey
+ null, // byte[] securityToken
+ 0, // int ticket
+ 0L, // long timestamp
+ null, // AMQShortString transactionId
+ 0L, // long ttl
+ null), // AMQShortString userId
+ txnContext);
+ _map.add(deliveryTag, new UnacknowledgedMessage(null, message, null, deliveryTag));
}
_acked = acked;
_unacked = unacked;
@@ -113,7 +151,7 @@ public class TxAckTest extends TestCase
{
for(long tag : tags)
{
- UnacknowledgedMessage u = _messages.get(tag);
+ UnacknowledgedMessage u = _map.get(tag);
assertTrue("Message not found for tag " + tag, u != null);
((TestMessage) u.message).assertCountEquals(expected);
}
@@ -122,11 +160,11 @@ public class TxAckTest extends TestCase
void prepare() throws AMQException
{
_op.consolidate();
- _op.prepare();
+ _op.prepare(_storeContext);
assertCount(_acked, -1);
assertCount(_unacked, 0);
-
+
}
void undoPrepare()
{
@@ -140,16 +178,16 @@ public class TxAckTest extends TestCase
void commit()
{
_op.consolidate();
- _op.commit();
-
+ _op.commit(_storeContext);
+
//check acked messages are removed from map
- HashSet<Long> keys = new HashSet<Long>(_messages.keySet());
+ Set<Long> keys = new HashSet<Long>(_map.getDeliveryTags());
keys.retainAll(_acked);
assertTrue("Expected messages with following tags to have been removed from map: " + keys, keys.isEmpty());
//check unacked messages are still in map
keys = new HashSet<Long>(_unacked);
- keys.removeAll(_messages.keySet());
+ keys.removeAll(_map.getDeliveryTags());
assertTrue("Expected messages with following tags to still be in map: " + keys, keys.isEmpty());
}
}
@@ -159,9 +197,9 @@ public class TxAckTest extends TestCase
private final long _tag;
private int _count;
- TestMessage(long tag)
+ TestMessage(long tag, MessageStore store, MessageTransferBody transferBody, TransactionalContext txnContext)
{
- super(new TestableMemoryMessageStore(), null);
+ super(store, transferBody, txnContext);
_tag = tag;
}
@@ -170,7 +208,7 @@ public class TxAckTest extends TestCase
_count++;
}
- public void decrementReference()
+ public void decrementReference(StoreContext context)
{
_count--;
}
diff --git a/java/systests/src/main/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java b/java/systests/src/main/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
new file mode 100644
index 0000000000..a047afa978
--- /dev/null
+++ b/java/systests/src/main/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
@@ -0,0 +1,330 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.exchange;
+
+import junit.framework.TestCase;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.*;
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.MessageHandleFactory;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.SkeletonMessageStore;
+import org.apache.qpid.server.store.MemoryMessageStore;
+import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.server.txn.NonTransactionalContext;
+import org.apache.qpid.server.txn.TransactionalContext;
+import org.apache.qpid.server.RequiredDeliveryException;
+import org.apache.log4j.Logger;
+
+import java.util.*;
+
+public class AbstractHeadersExchangeTestBase extends TestCase
+{
+ private static TransactionalContext txnContext = null;
+
+ private static final Logger _log = Logger.getLogger(AbstractHeadersExchangeTestBase.class);
+
+ private final HeadersExchange exchange = new HeadersExchange();
+ protected final Set<TestQueue> queues = new HashSet<TestQueue>();
+
+ /**
+ * Not used in this test, just there to stub out the routing calls
+ */
+ private MessageStore _store = new MemoryMessageStore();
+
+ private StoreContext _storeContext = new StoreContext();
+
+ private MessageHandleFactory _handleFactory = new MessageHandleFactory();
+
+ private int count;
+
+ public void testDoNothing()
+ {
+ // this is here only to make junit under Eclipse happy
+ }
+
+ protected TestQueue bindDefault(String... bindings) throws AMQException
+ {
+ return bind("Queue" + (++count), bindings);
+ }
+
+ protected TestQueue bind(String queueName, String... bindings) throws AMQException
+ {
+ return bind(queueName, getHeaders(bindings));
+ }
+
+ protected TestQueue bind(String queue, FieldTable bindings) throws AMQException
+ {
+ return bind(new TestQueue(new AMQShortString(queue)), bindings);
+ }
+
+ protected TestQueue bind(TestQueue queue, String... bindings) throws AMQException
+ {
+ return bind(queue, getHeaders(bindings));
+ }
+
+ protected TestQueue bind(TestQueue queue, FieldTable bindings) throws AMQException
+ {
+ queues.add(queue);
+ exchange.registerQueue(null, queue, bindings);
+ return queue;
+ }
+
+
+ protected void route(Message m) throws AMQException
+ {
+ m.route(exchange);
+// m.routingComplete(_store, _storeContext, _handleFactory);
+ }
+
+ protected void routeAndTest(Message m, TestQueue... expected) throws AMQException
+ {
+ routeAndTest(m, false, Arrays.asList(expected));
+ }
+
+ protected void routeAndTest(Message m, boolean expectReturn, TestQueue... expected) throws AMQException
+ {
+ routeAndTest(m, expectReturn, Arrays.asList(expected));
+ }
+
+ protected void routeAndTest(Message m, List<TestQueue> expected) throws AMQException
+ {
+ routeAndTest(m, false, expected);
+ }
+
+ protected void routeAndTest(Message m, boolean expectReturn, List<TestQueue> expected) throws AMQException
+ {
+ try
+ {
+ route(m);
+ assertFalse("Expected "+m+" to be returned due to manadatory flag, and lack of routing",expectReturn);
+ for (TestQueue q : queues)
+ {
+ if (expected.contains(q))
+ {
+ assertTrue("Expected " + m + " to be delivered to " + q, m.isInQueue(q));
+ //assert m.isInQueue(q) : "Expected " + m + " to be delivered to " + q;
+ }
+ else
+ {
+ assertFalse("Did not expect " + m + " to be delivered to " + q, m.isInQueue(q));
+ //assert !m.isInQueue(q) : "Did not expect " + m + " to be delivered to " + q;
+ }
+ }
+ }
+
+ catch (NoRouteException ex)
+ {
+ assertTrue("Expected "+m+" not to be returned",expectReturn);
+ }
+
+ }
+
+ static FieldTable getHeaders(String... entries)
+ {
+ FieldTable headers = FieldTableFactory.newFieldTable();
+ for (String s : entries)
+ {
+ String[] parts = s.split("=", 2);
+ headers.setObject(parts[0], parts.length > 1 ? parts[1] : "");
+ }
+ return headers;
+ }
+
+ static MessageTransferBody getPublishRequest(String id)
+ {
+ // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
+ // TODO: Establish some way to determine the version for the test.
+ MessageTransferBody request = new MessageTransferBody(
+ (byte)0,
+ (byte)9,
+ MessageTransferBody.getClazz((byte)0,(byte)9),
+ MessageTransferBody.getMethod((byte)0,(byte)9),
+ new AMQShortString(id), // AMQShortString appId
+ null, // FieldTable applicationHeaders
+ null, // Content body
+ null, // AMQShortString contentEncoding
+ null, // AMQShortString contentType
+ null, // AMQShortString correlationId
+ (short)0, // short deliveryMode
+ null, // AMQShortString destination
+ null, // AMQShortString exchange
+ 0L, // long expiration
+ false, // boolean immediate
+ false, // boolean mandatory
+ null, // AMQShortString messageId
+ (short)0, // short priority
+ false, // boolean redelivered
+ null, // AMQShortString replyTo
+ null, // AMQShortString routingKey
+ null, // byte[] securityToken
+ 0, // int ticket
+ 0L, // long timestamp
+ null, // AMQShortString transactionId
+ 0L, // long ttl
+ null); // AMQShortString userId
+
+ return request;
+ }
+
+// static ContentHeaderBody getContentHeader(FieldTable headers)
+// {
+// ContentHeaderBody header = new ContentHeaderBody();
+// header.properties = getProperties(headers);
+// return header;
+// }
+//
+// static BasicContentHeaderProperties getProperties(FieldTable headers)
+// {
+// BasicContentHeaderProperties properties = new BasicContentHeaderProperties();
+// properties.setHeaders(headers);
+// return properties;
+// }
+
+ static class TestQueue extends AMQQueue
+ {
+ final List<HeadersExchangeTest.Message> messages = new ArrayList<HeadersExchangeTest.Message>();
+
+ public TestQueue(AMQShortString name) throws AMQException
+ {
+ super(name, false, new AMQShortString("test"), true, ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost("test"));
+ }
+
+ /**
+ * We override this method so that the default behaviour, which attempts to use a delivery manager, is
+ * not invoked. It is unnecessary since for this test we only care to know whether the message was
+ * sent to the queue; the queue processing logic is not being tested.
+ * @param msg
+ * @throws AMQException
+ */
+ public void process(StoreContext context, AMQMessage msg) throws AMQException
+ {
+ messages.add(new HeadersExchangeTest.Message(msg));
+ }
+ }
+
+ /**
+ * Just add some extra utility methods to AMQMessage to aid testing.
+ */
+ static class Message extends AMQMessage
+ {
+ private static MessageStore _messageStore = new SkeletonMessageStore();
+
+ private static StoreContext _storeContext = new StoreContext();
+
+ private static TransactionalContext _txnContext = new NonTransactionalContext(_messageStore, _storeContext,
+ null,
+ new LinkedList<RequiredDeliveryException>(),
+ new HashSet<Long>());
+
+ Message(String id, String... headers) throws AMQException
+ {
+ this(new MessageTransferBody(
+ (byte)0,
+ (byte)9,
+ MessageTransferBody.getClazz((byte)0,(byte)9),
+ MessageTransferBody.getMethod((byte)0,(byte)9),
+ null, // AMQShortString appId
+ getHeaders(headers), // FieldTable applicationHeaders
+ null, // Content body
+ null, // AMQShortString contentEncoding
+ null, // AMQShortString contentType
+ null, // AMQShortString correlationId
+ (short)0, // short deliveryMode
+ null, // AMQShortString destination
+ null, // AMQShortString exchange
+ 0L, // long expiration
+ false, // boolean immediate
+ false, // boolean mandatory
+ new AMQShortString(id), // AMQShortString messageId
+ (short)0, // short priority
+ false, // boolean redelivered
+ null, // AMQShortString replyTo
+ null, // AMQShortString routingKey
+ null, // byte[] securityToken
+ 0, // int ticket
+ 0L, // long timestamp
+ null, // AMQShortString transactionId
+ 0L, // long ttl
+ null)); // AMQShortString userId
+ }
+
+// Message(String id, FieldTable headers) throws AMQException
+// {
+// this(getPublishRequest(id), getContentHeader(headers), null);
+// }
+
+ private Message(MessageTransferBody publish) throws AMQException
+ {
+ super(_messageStore, publish, _txnContext);
+ }
+
+ private Message(AMQMessage msg) throws AMQException
+ {
+ super(msg);
+ }
+
+ void route(Exchange exchange) throws AMQException
+ {
+ exchange.route(this);
+ }
+
+ boolean isInQueue(TestQueue queue)
+ {
+ return queue.messages.contains(this);
+ }
+
+ public int hashCode()
+ {
+ return getKey().hashCode();
+ }
+
+ public boolean equals(Object o)
+ {
+ return o instanceof HeadersExchangeTest.Message && equals((HeadersExchangeTest.Message) o);
+ }
+
+ private boolean equals(HeadersExchangeTest.Message m)
+ {
+ return getKey().equals(m.getKey());
+ }
+
+ public String toString()
+ {
+ return getKey().toString();
+ }
+
+ private Object getKey()
+ {
+// try
+// {
+ return getTransferBody().getRoutingKey();
+// }
+// catch (AMQException e)
+// {
+// _log.error("Error getting routing key: " + e, e);
+// return null;
+// }
+ }
+ }
+}
diff --git a/java/systests/src/test/java/org/apache/qpid/server/exchange/ExchangeMBeanTest.java b/java/systests/src/main/java/org/apache/qpid/server/exchange/ExchangeMBeanTest.java
index bb88d2e8d0..9653155a51 100644
--- a/java/systests/src/test/java/org/apache/qpid/server/exchange/ExchangeMBeanTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/exchange/ExchangeMBeanTest.java
@@ -21,7 +21,11 @@ import junit.framework.TestCase;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.registry.IApplicationRegistry;
import org.apache.qpid.server.management.ManagedObject;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.qpid.framing.AMQShortString;
import javax.management.openmbean.CompositeData;
import javax.management.openmbean.TabularData;
@@ -34,6 +38,7 @@ public class ExchangeMBeanTest extends TestCase
{
private AMQQueue _queue;
private QueueRegistry _queueRegistry;
+ private VirtualHost _virtualHost;
/**
* Test for direct exchange mbean
@@ -43,12 +48,12 @@ public class ExchangeMBeanTest extends TestCase
public void testDirectExchangeMBean() throws Exception
{
DestNameExchange exchange = new DestNameExchange();
- exchange.initialise("amq.direct", false, 0, true);
+ exchange.initialise(_virtualHost, ExchangeDefaults.DIRECT_EXCHANGE_NAME, false, 0, true);
ManagedObject managedObj = exchange.getManagedObject();
ManagedExchange mbean = (ManagedExchange)managedObj;
- mbean.createNewBinding(_queue.getName(), "binding1");
- mbean.createNewBinding(_queue.getName(), "binding2");
+ mbean.createNewBinding(_queue.getName().toString(), "binding1");
+ mbean.createNewBinding(_queue.getName().toString(), "binding2");
TabularData data = mbean.bindings();
ArrayList<CompositeData> list = new ArrayList<CompositeData>(data.values());
@@ -70,12 +75,12 @@ public class ExchangeMBeanTest extends TestCase
public void testTopicExchangeMBean() throws Exception
{
DestWildExchange exchange = new DestWildExchange();
- exchange.initialise("amq.topic", false, 0, true);
+ exchange.initialise(_virtualHost,ExchangeDefaults.TOPIC_EXCHANGE_NAME, false, 0, true);
ManagedObject managedObj = exchange.getManagedObject();
ManagedExchange mbean = (ManagedExchange)managedObj;
- mbean.createNewBinding(_queue.getName(), "binding1");
- mbean.createNewBinding(_queue.getName(), "binding2");
+ mbean.createNewBinding(_queue.getName().toString(), "binding1");
+ mbean.createNewBinding(_queue.getName().toString(), "binding2");
TabularData data = mbean.bindings();
ArrayList<CompositeData> list = new ArrayList<CompositeData>(data.values());
@@ -97,19 +102,19 @@ public class ExchangeMBeanTest extends TestCase
public void testHeadersExchangeMBean() throws Exception
{
HeadersExchange exchange = new HeadersExchange();
- exchange.initialise("amq.headers", false, 0, true);
+ exchange.initialise(_virtualHost,ExchangeDefaults.HEADERS_EXCHANGE_NAME, false, 0, true);
ManagedObject managedObj = exchange.getManagedObject();
ManagedExchange mbean = (ManagedExchange)managedObj;
- mbean.createNewBinding(_queue.getName(), "key1=binding1,key2=binding2");
- mbean.createNewBinding(_queue.getName(), "key3=binding3");
+ mbean.createNewBinding(_queue.getName().toString(), "key1=binding1,key2=binding2");
+ mbean.createNewBinding(_queue.getName().toString(), "key3=binding3");
TabularData data = mbean.bindings();
ArrayList<CompositeData> list = new ArrayList<CompositeData>(data.values());
assertTrue(list.size() == 2);
// test general exchange properties
- assertEquals(mbean.getName(), "amq.headers");
+ assertEquals(mbean.getName(), "amq.match");
assertEquals(mbean.getExchangeType(), "headers");
assertTrue(mbean.getTicketNo() == 0);
assertTrue(!mbean.isDurable());
@@ -120,8 +125,11 @@ public class ExchangeMBeanTest extends TestCase
protected void setUp() throws Exception
{
super.setUp();
- _queueRegistry = ApplicationRegistry.getInstance().getQueueRegistry();
- _queue = new AMQQueue("testQueue", false, "ExchangeMBeanTest", false, _queueRegistry);
+
+ IApplicationRegistry applicationRegistry = ApplicationRegistry.getInstance();
+ _virtualHost = applicationRegistry.getVirtualHostRegistry().getVirtualHost("test");
+ _queueRegistry = _virtualHost.getQueueRegistry();
+ _queue = new AMQQueue(new AMQShortString("testQueue"), false, new AMQShortString("ExchangeMBeanTest"), false, _virtualHost);
_queueRegistry.registerQueue(_queue);
}
}
diff --git a/java/systests/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java b/java/systests/src/main/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java
index 00a645628f..7bfc2e5386 100644
--- a/java/systests/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java
@@ -21,16 +21,17 @@
package org.apache.qpid.server.exchange;
import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.MessageTransferBody;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.util.TestApplicationRegistry;
+import org.apache.qpid.server.util.NullApplicationRegistry;
+import org.apache.qpid.framing.MessageTransferBody;
public class HeadersExchangeTest extends AbstractHeadersExchangeTestBase
{
protected void setUp() throws Exception
{
super.setUp();
- ApplicationRegistry.initialise(new TestApplicationRegistry());
+ ApplicationRegistry.initialise(new NullApplicationRegistry());
}
public void testSimple() throws AMQException
@@ -54,14 +55,16 @@ public class HeadersExchangeTest extends AbstractHeadersExchangeTestBase
Message m7 = new Message("Message7", "XXXXX");
- MessageTransferBody tb7 = m7.getTransferBody();
- tb7.mandatory = true;
+ MessageTransferBody mtb7 = m7.getTransferBody();
+ mtb7.mandatory = true;
routeAndTest(m7,true);
Message m8 = new Message("Message8", "F0000");
- MessageTransferBody tb8 = m8.getTransferBody();
- tb8.mandatory = true;
+ MessageTransferBody mtb8 = m8.getTransferBody();
+ mtb8.mandatory = true;
routeAndTest(m8,false,q1);
+
+
}
public void testAny() throws AMQException
@@ -85,10 +88,10 @@ public class HeadersExchangeTest extends AbstractHeadersExchangeTestBase
bindDefault("F0000");
Message m1 = new Message("Message1", "XXXXX");
Message m2 = new Message("Message2", "F0000");
- MessageTransferBody tb1 = m1.getTransferBody();
- tb1.mandatory = true;
- MessageTransferBody tb2 = m2.getTransferBody();
- tb2.mandatory = true;
+ MessageTransferBody mtb1 = m1.getTransferBody();
+ mtb1.mandatory = true;
+ MessageTransferBody mtb2 = m2.getTransferBody();
+ mtb2.mandatory = true;
routeAndTest(m1,true);
}
diff --git a/java/systests/src/test/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java b/java/systests/src/main/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java
index 546c61eda0..c8271f1549 100644
--- a/java/systests/src/test/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java
@@ -4,6 +4,7 @@ import junit.framework.TestCase;
import org.apache.log4j.Logger;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.util.TestApplicationRegistry;
+import org.apache.qpid.server.util.NullApplicationRegistry;
import org.apache.qpid.client.*;
import org.apache.qpid.client.transport.TransportConnection;
import org.apache.qpid.url.AMQBindingURL;
@@ -38,7 +39,7 @@ public class ReturnUnroutableMandatoryMessageTest extends TestCase implements Ex
{
super.setUp();
TransportConnection.createVMBroker(1);
- ApplicationRegistry.initialise(new TestApplicationRegistry(), 1);
+ ApplicationRegistry.initialise(new NullApplicationRegistry(), 1);
}
protected void tearDown() throws Exception
diff --git a/java/systests/src/test/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java b/java/systests/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java
index f84f14f28d..3e23b9ae42 100644
--- a/java/systests/src/test/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java
@@ -21,17 +21,21 @@ import junit.framework.TestCase;
import org.apache.mina.common.IoSession;
import org.apache.qpid.codec.AMQCodecFactory;
import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.registry.IApplicationRegistry;
+import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.exchange.DefaultExchangeFactory;
import org.apache.qpid.server.exchange.DefaultExchangeRegistry;
import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.queue.DefaultQueueRegistry;
import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.SkeletonMessageStore;
import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
import javax.management.JMException;
-import javax.management.openmbean.OpenDataException;
/**
* Test class to test MBean operations for AMQMinaProtocolSession.
@@ -45,23 +49,28 @@ public class AMQProtocolSessionMBeanTest extends TestCase
private QueueRegistry _queueRegistry;
private ExchangeRegistry _exchangeRegistry;
private AMQProtocolSessionMBean _mbean;
+ private VirtualHost _virtualHost;
public void testChannels() throws Exception
{
// check the channel count is correct
int channelCount = _mbean.channels().size();
- assertTrue(channelCount == 2);
- _protocolSession.addChannel(new AMQChannel(2,_protocolSession, _messageStore, null,null));
+ assertTrue(channelCount == 1);
+ AMQQueue queue = new org.apache.qpid.server.queue.AMQQueue(new AMQShortString("testQueue_" + System.currentTimeMillis()),
+ false, new AMQShortString("test"), true, _virtualHost);
+ AMQChannel channel = new AMQChannel(2, _protocolSession, _virtualHost.getMessageStore(), _virtualHost.getExchangeRegistry(), null/*methodListener*/);
+ channel.setDefaultQueue(queue);
+ _protocolSession.addChannel(channel);
channelCount = _mbean.channels().size();
- assertTrue(channelCount == 3);
+ assertTrue(channelCount == 2);
// general properties test
_mbean.setMaximumNumberOfChannels(1000L);
assertTrue(_mbean.getMaximumNumberOfChannels() == 1000L);
// check APIs
- AMQChannel channel3 = new AMQChannel(3,_protocolSession, _messageStore, null,null);
- channel3.setTransactional(true);
+ AMQChannel channel3 = new AMQChannel(3, _protocolSession, _virtualHost.getMessageStore(), _virtualHost.getExchangeRegistry(), null/*methodListener*/);
+ channel3.setLocalTransactional();
_protocolSession.addChannel(channel3);
_mbean.rollbackTransactions(2);
_mbean.rollbackTransactions(3);
@@ -80,31 +89,33 @@ public class AMQProtocolSessionMBeanTest extends TestCase
}
// check if closing of session works
- _protocolSession.addChannel(new AMQChannel(5,_protocolSession, _messageStore, null,null));
+ _protocolSession.addChannel(new AMQChannel(5, _protocolSession, _virtualHost.getMessageStore(), _virtualHost.getExchangeRegistry(), null/*methodListener*/));
_mbean.closeConnection();
- channelCount = _mbean.channels().size();
- assertTrue(channelCount == 0);
- try
- {
+// try
+// {
+ channelCount = _mbean.channels().size();
+ assertTrue(channelCount == 0);
// session is now closed so adding another channel should throw an exception
- _protocolSession.addChannel(new AMQChannel(6,_protocolSession, _messageStore, null,null));
+ _protocolSession.addChannel(new AMQChannel(6, _protocolSession, _virtualHost.getMessageStore(), _virtualHost.getExchangeRegistry(), null/*methodListener*/));
fail();
- }
- catch(IllegalStateException ex)
- {
- System.out.println("expected exception is thrown :" + ex.getMessage());
- }
+// }
+// catch(AMQException ex)
+// {
+// System.out.println("expected exception is thrown :" + ex.getMessage());
+// }
}
@Override
protected void setUp() throws Exception
{
- super.setUp();
- _queueRegistry = new DefaultQueueRegistry();
- _exchangeRegistry = new DefaultExchangeRegistry(new DefaultExchangeFactory());
+ super.setUp();
+ _channel = new AMQChannel(1, _protocolSession, _virtualHost.getMessageStore(), _virtualHost.getExchangeRegistry(), null/*methodListener*/);
+ IApplicationRegistry appRegistry = ApplicationRegistry.getInstance();
+ _virtualHost = appRegistry.getVirtualHostRegistry().getVirtualHost("test");
+ _queueRegistry = _virtualHost.getQueueRegistry();
+ _exchangeRegistry = _virtualHost.getExchangeRegistry();
_mockIOSession = new MockIoSession();
- _protocolSession = new AMQMinaProtocolSession(_mockIOSession, _queueRegistry, _exchangeRegistry, new AMQCodecFactory(true));
- _channel = new AMQChannel(1,_protocolSession, _messageStore, null,null);
+ _protocolSession = new AMQMinaProtocolSession(_mockIOSession, appRegistry.getVirtualHostRegistry(), new AMQCodecFactory(true));
_protocolSession.addChannel(_channel);
_mbean = (AMQProtocolSessionMBean)_protocolSession.getManagedObject();
}
diff --git a/java/systests/src/test/java/org/apache/qpid/server/protocol/MockIoSession.java b/java/systests/src/main/java/org/apache/qpid/server/protocol/MockIoSession.java
index cf6366b513..cf6366b513 100644
--- a/java/systests/src/test/java/org/apache/qpid/server/protocol/MockIoSession.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/protocol/MockIoSession.java
diff --git a/java/systests/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java b/java/systests/src/main/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
index ae398961be..92ba9db728 100644
--- a/java/systests/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
@@ -17,20 +17,24 @@
*/
package org.apache.qpid.server.queue;
-import java.util.Collections;
-
-import javax.management.JMException;
-
import junit.framework.TestCase;
-
-import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.AMQException;
-import org.apache.qpid.client.message.MessageHeaders;
-import org.apache.qpid.framing.Content;
import org.apache.qpid.framing.MessageTransferBody;
+import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.RequiredDeliveryException;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.registry.IApplicationRegistry;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.txn.TransactionalContext;
+import org.apache.qpid.server.txn.NonTransactionalContext;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.SkeletonMessageStore;
+import org.apache.qpid.server.store.StoreContext;
+
+import javax.management.JMException;
+import java.util.LinkedList;
+import java.util.HashSet;
/**
* Test class to test AMQQueueMBean attribtues and operations
@@ -41,8 +45,14 @@ public class AMQQueueMBeanTest extends TestCase
private AMQQueueMBean _queueMBean;
private QueueRegistry _queueRegistry;
private MessageStore _messageStore = new SkeletonMessageStore();
+ private StoreContext _storeContext = new StoreContext();
+ private TransactionalContext _transactionalContext = new NonTransactionalContext(_messageStore, _storeContext,
+ null,
+ new LinkedList<RequiredDeliveryException>(),
+ new HashSet<Long>());
private MockProtocolSession _protocolSession;
private AMQChannel _channel;
+ private VirtualHost _virtualHost;
public void testMessageCount() throws Exception
{
@@ -50,8 +60,7 @@ public class AMQQueueMBeanTest extends TestCase
sendMessages(messageCount);
assertTrue(_queueMBean.getMessageCount() == messageCount);
assertTrue(_queueMBean.getReceivedMessageCount() == messageCount);
- // each message is 1K
- assertTrue(_queueMBean.getQueueDepth() == messageCount);
+ assertTrue(_queueMBean.getQueueDepth() == 10);
_queueMBean.deleteMessageFromTop();
assertTrue(_queueMBean.getMessageCount() == messageCount - 1);
@@ -67,12 +76,12 @@ public class AMQQueueMBeanTest extends TestCase
SubscriptionManager mgr = _queue.getSubscribers();
assertFalse(mgr.hasActiveSubscribers());
assertTrue(_queueMBean.getActiveConsumerCount() == 0);
-
- _protocolSession = new MockProtocolSession(_messageStore);
- _channel = new AMQChannel(1,_protocolSession, _messageStore, null,null);
+
+ _channel = new AMQChannel(1, _protocolSession, _virtualHost.getMessageStore(), _virtualHost.getExchangeRegistry(), null/*methodListener*/);
+ _protocolSession = new MockProtocolSession(_messageStore);
_protocolSession.addChannel(_channel);
-
- _queue.registerProtocolSession(_protocolSession, 1, "test", false, null, false, false);
+
+ _queue.registerProtocolSession(_protocolSession, 1, new AMQShortString("test"), false, null,false,false);
assertTrue(_queueMBean.getActiveConsumerCount() == 1);
SubscriptionSet _subscribers = (SubscriptionSet) mgr;
@@ -138,8 +147,9 @@ public class AMQQueueMBeanTest extends TestCase
AMQMessage msg = message(false);
long id = msg.getMessageId();
- _queue.clearQueue();
- _queue.deliver(msg);
+ _queue.clearQueue(_storeContext);
+ _queue.process(_storeContext, msg);
+// msg.routingComplete(_messageStore, _storeContext, new MessageHandleFactory());
_queueMBean.viewMessageContent(id);
try
{
@@ -156,46 +166,46 @@ public class AMQQueueMBeanTest extends TestCase
{
// AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
// TODO: Establish some way to determine the version for the test.
-
- MessageHeaders messageHeaders = new MessageHeaders();
-
- ByteBuffer buffer = ByteBuffer.wrap(new byte[1000]);
- Content body = new Content(Content.TypeEnum.INLINE_T, buffer);
- MessageTransferBody methodBody = MessageTransferBody.createMethodBody(
- (byte)0, (byte)9, // AMQP version (major, minor)
- messageHeaders.getAppId(), // String appId
- messageHeaders.getJMSHeaders(), // FieldTable applicationHeaders
- body, // Content body
- messageHeaders.getEncoding(), // String contentEncoding
- messageHeaders.getContentType(), // String contentType
- messageHeaders.getCorrelationId(), // String correlationId
- (short)1, // short deliveryMode
- "someExchange", // String destination
- "someExchange", // String exchange
- messageHeaders.getExpiration(), // long expiration
- immediate, // boolean immediate
- false, // boolean mandatory
- "", // String messageId
- (short)0, // short priority
- false, // boolean redelivered
- messageHeaders.getReplyTo(), // String replyTo
- "rk", // String routingKey
- new String("abc123").getBytes(), // byte[] securityToken
- 0, // int ticket
- messageHeaders.getTimestamp(), // long timestamp
- messageHeaders.getTransactionId(), // String transactionId
- 0, // long ttl
- messageHeaders.getUserId()); // String userId
-
- return new AMQMessage(_messageStore, methodBody, Collections.singletonList(buffer));
+ MessageTransferBody publish = new MessageTransferBody(
+ (byte)0,
+ (byte)9,
+ MessageTransferBody.getClazz((byte)0,(byte)9),
+ MessageTransferBody.getMethod((byte)0,(byte)9),
+ null, // AMQShortString appId
+ null, // FieldTable applicationHeaders
+ null, // Content body
+ null, // AMQShortString contentEncoding
+ null, // AMQShortString contentType
+ null, // AMQShortString correlationId
+ (short)0, // short deliveryMode
+ null, // AMQShortString destination
+ null, // AMQShortString exchange
+ 0L, // long expiration
+ immediate, // boolean immediate
+ false, // boolean mandatory
+ null, // AMQShortString messageId
+ (short)0, // short priority
+ false, // boolean redelivered
+ null, // AMQShortString replyTo
+ null, // AMQShortString routingKey
+ null, // byte[] securityToken
+ 0, // int ticket
+ 0L, // long timestamp
+ null, // AMQShortString transactionId
+ 0L, // long ttl
+ null); // AMQShortString userId
+
+ return new AMQMessage(_messageStore, publish, _transactionalContext);
}
@Override
protected void setUp() throws Exception
{
super.setUp();
- _queueRegistry = new DefaultQueueRegistry();
- _queue = new AMQQueue("testQueue", false, "AMQueueMBeanTest", false, _queueRegistry);
+ IApplicationRegistry applicationRegistry = ApplicationRegistry.getInstance();
+ _virtualHost = applicationRegistry.getVirtualHostRegistry().getVirtualHost("test");
+ _queueRegistry = _virtualHost.getQueueRegistry();
+ _queue = new AMQQueue(new AMQShortString("testQueue"), false, new AMQShortString("AMQueueMBeanTest"), false, _virtualHost);
_queueMBean = new AMQQueueMBean(_queue);
}
@@ -205,11 +215,15 @@ public class AMQQueueMBeanTest extends TestCase
for (int i = 0; i < messages.length; i++)
{
messages[i] = message(false);
- ;
}
for (int i = 0; i < messageCount; i++)
{
- _queue.deliver(messages[i]);
+ _queue.process(_storeContext, messages[i]);
+ }
+
+ for (int i = 0; i < messages.length; i++)
+ {
+// messages[i].routingComplete(_messageStore, _storeContext, new MessageHandleFactory());
}
}
}
diff --git a/java/systests/src/test/java/org/apache/qpid/server/queue/AckTest.java b/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java
index ca777ca535..a812257203 100644
--- a/java/systests/src/test/java/org/apache/qpid/server/queue/AckTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -20,28 +20,34 @@
*/
package org.apache.qpid.server.queue;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.Map;
-
import junit.framework.TestCase;
-
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
-import org.apache.qpid.client.message.MessageHeaders;
-import org.apache.qpid.framing.Content;
import org.apache.qpid.framing.MessageTransferBody;
+import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.RequiredDeliveryException;
import org.apache.qpid.server.ack.UnacknowledgedMessage;
+import org.apache.qpid.server.ack.UnacknowledgedMessageMap;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.store.TestableMemoryMessageStore;
+import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.server.txn.NonTransactionalContext;
+import org.apache.qpid.server.txn.TransactionalContext;
import org.apache.qpid.server.util.TestApplicationRegistry;
+import org.apache.qpid.server.util.NullApplicationRegistry;
+
+import java.util.LinkedList;
+import java.util.Set;
+import java.util.HashSet;
/**
* Tests that acknowledgements are handled correctly.
*/
public class AckTest extends TestCase
{
+ private TransactionalContext txnContext = null;
+
private static final Logger _log = Logger.getLogger(AckTest.class);
private SubscriptionImpl _subscription;
@@ -50,26 +56,30 @@ public class AckTest extends TestCase
private TestableMemoryMessageStore _messageStore;
+ private StoreContext _storeContext = new StoreContext();
+
private AMQChannel _channel;
private SubscriptionSet _subscriptionManager;
private AMQQueue _queue;
+ private static final AMQShortString DEFAULT_CONSUMER_TAG = new AMQShortString("conTag");
+
public AckTest() throws Exception
{
- ApplicationRegistry.initialise(new TestApplicationRegistry());
+ ApplicationRegistry.initialise(new NullApplicationRegistry());
}
protected void setUp() throws Exception
{
super.setUp();
- _messageStore = new TestableMemoryMessageStore();
+ _messageStore = new TestableMemoryMessageStore();
_protocolSession = new MockProtocolSession(_messageStore);
- _channel = new AMQChannel(5,_protocolSession, _messageStore, null/*dont need exchange registry*/,null);
+ _channel = new AMQChannel(5, _protocolSession, null/*MessageStore*/, null/*ExchangeRegistry*/, null/*methodListener*/);
_protocolSession.addChannel(_channel);
- _subscriptionManager = new SubscriptionSet();
- _queue = new AMQQueue("myQ", false, "guest", true, new DefaultQueueRegistry(), _subscriptionManager);
+ _subscriptionManager = new SubscriptionSet();
+ _queue = new AMQQueue(new AMQShortString("myQ"), false, new AMQShortString("guest"), true, ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost("test"), _subscriptionManager);
}
private void publishMessages(int count) throws AMQException
@@ -79,41 +89,54 @@ public class AckTest extends TestCase
private void publishMessages(int count, boolean persistent) throws AMQException
{
+ TransactionalContext txnContext = new NonTransactionalContext(_messageStore, _storeContext, null,
+ new LinkedList<RequiredDeliveryException>(),
+ new HashSet<Long>());
+ MessageHandleFactory factory = new MessageHandleFactory();
for (int i = 1; i <= count; i++)
{
// AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
// TODO: Establish some way to determine the version for the test.
- //AMQMessage(MessageStore store, MessageTransferBody transferBody, List<ByteBuffer> contents)
-
- MessageHeaders messageHeaders = new MessageHeaders();
-
- MessageTransferBody methodBody = MessageTransferBody.createMethodBody(
- (byte)0, (byte)9, // AMQP version (major, minor)
- messageHeaders.getAppId(), // String appId
- messageHeaders.getJMSHeaders(), // FieldTable applicationHeaders
- new Content(), // Content body
- messageHeaders.getEncoding(), // String contentEncoding
- messageHeaders.getContentType(), // String contentType
- messageHeaders.getCorrelationId(), // String correlationId
- persistent ? (short)2: (short)1, // short deliveryMode
- "someExchange", // String destination
- "someExchange", // String exchange
- messageHeaders.getExpiration(), // long expiration
- false, // boolean immediate
- false, // boolean mandatory
- "" + i, // String messageId
- (short)0, // short priority
- false, // boolean redelivered
- messageHeaders.getReplyTo(), // String replyTo
- "rk", // String routingKey
- new String("abc123").getBytes(), // byte[] securityToken
- 0, // int ticket
- messageHeaders.getTimestamp(), // long timestamp
- messageHeaders.getTransactionId(), // String transactionId
- 0, // long ttl
- messageHeaders.getUserId()); // String userId
-
- AMQMessage msg = new AMQMessage(_messageStore, methodBody, new ArrayList());
+ MessageTransferBody publishBody = new MessageTransferBody(
+ (byte)0,
+ (byte)9,
+ MessageTransferBody.getClazz((byte)0,(byte)9),
+ MessageTransferBody.getMethod((byte)0,(byte)9),
+ null, // AMQShortString appId
+ null, // FieldTable applicationHeaders
+ null, // Content body
+ null, // AMQShortString contentEncoding
+ null, // AMQShortString contentType
+ null, // AMQShortString correlationId
+ (short)0, // short deliveryMode
+ null, // AMQShortString destination
+ new AMQShortString("someExchange"), // AMQShortString exchange
+ 0L, // long expiration
+ false, // boolean immediate
+ false, // boolean mandatory
+ null, // AMQShortString messageId
+ (short)0, // short priority
+ false, // boolean redelivered
+ null, // AMQShortString replyTo
+ new AMQShortString("rk"), // AMQShortString routingKey
+ null, // byte[] securityToken
+ 0, // int ticket
+ 0L, // long timestamp
+ null, // AMQShortString transactionId
+ 0L, // long ttl
+ null); // AMQShortString userId
+ AMQMessage msg = new AMQMessage(_messageStore, publishBody, txnContext);
+ if (persistent)
+ {
+ //This is DeliveryMode.PERSISTENT
+ msg.setDeliveryMode((byte) 2);
+ }
+ // we increment the reference here since we are not delivering the messaging to any queues, which is where
+ // the reference is normally incremented. The test is easier to construct if we have direct access to the
+ // subscription
+ msg.incrementReference();
+// msg.routingComplete(_messageStore, _storeContext, factory);
+ // we manually send the message to the subscription
_subscription.send(msg, _queue);
}
}
@@ -124,25 +147,26 @@ public class AckTest extends TestCase
*/
public void testAckChannelAssociationTest() throws AMQException
{
- _subscription = new SubscriptionImpl(5, _protocolSession, "conTag", true);
+ _subscription = new SubscriptionImpl(5, _protocolSession, DEFAULT_CONSUMER_TAG, true);
final int msgCount = 10;
publishMessages(msgCount, true);
- Map<Long, UnacknowledgedMessage> map = _channel.getUnacknowledgedMessageMap();
+ UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap();
assertTrue(map.size() == msgCount);
- assertTrue(_messageStore.getMessageMap().size() == msgCount);
+ assertTrue(_messageStore.getMessageMetaDataMap().size() == msgCount);
- Iterator<Map.Entry<Long, UnacknowledgedMessage>> it = map.entrySet().iterator();
- for (int i = 1; i <= map.size(); i++)
+ Set<Long> deliveryTagSet = map.getDeliveryTags();
+ int i = 1;
+ for (long deliveryTag : deliveryTagSet)
{
- Map.Entry<Long, UnacknowledgedMessage> entry = it.next();
- assertTrue(entry.getKey() == i);
- UnacknowledgedMessage unackedMsg = entry.getValue();
+ assertTrue(deliveryTag == i);
+ i++;
+ UnacknowledgedMessage unackedMsg = map.get(deliveryTag);
assertTrue(unackedMsg.queue == _queue);
}
assertTrue(map.size() == msgCount);
- assertTrue(_messageStore.getMessageMap().size() == msgCount);
+ assertTrue(_messageStore.getMessageMetaDataMap().size() == msgCount);
}
/**
@@ -151,13 +175,13 @@ public class AckTest extends TestCase
public void testNoAckMode() throws AMQException
{
// false arg means no acks expected
- _subscription = new SubscriptionImpl(5, _protocolSession, "conTag", false);
+ _subscription = new SubscriptionImpl(5, _protocolSession, DEFAULT_CONSUMER_TAG, false);
final int msgCount = 10;
publishMessages(msgCount);
- Map<Long, UnacknowledgedMessage> map = _channel.getUnacknowledgedMessageMap();
+ UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap();
assertTrue(map.size() == 0);
- assertTrue(_messageStore.getMessageMap().size() == 0);
+ assertTrue(_messageStore.getMessageMetaDataMap().size() == 0);
}
/**
@@ -166,21 +190,20 @@ public class AckTest extends TestCase
*/
public void testSingleAckReceivedTest() throws AMQException
{
- _subscription = new SubscriptionImpl(5, _protocolSession, "conTag", true);
+ _subscription = new SubscriptionImpl(5, _protocolSession, DEFAULT_CONSUMER_TAG, true);
final int msgCount = 10;
publishMessages(msgCount);
_channel.acknowledgeMessage(5, false);
- Map<Long, UnacknowledgedMessage> map = _channel.getUnacknowledgedMessageMap();
+ UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap();
assertTrue(map.size() == msgCount - 1);
- Iterator<Map.Entry<Long, UnacknowledgedMessage>> it = map.entrySet().iterator();
+ Set<Long> deliveryTagSet = map.getDeliveryTags();
int i = 1;
- while (i <= map.size())
+ for (long deliveryTag : deliveryTagSet)
{
- Map.Entry<Long, UnacknowledgedMessage> entry = it.next();
- assertTrue(entry.getKey() == i);
- UnacknowledgedMessage unackedMsg = entry.getValue();
+ assertTrue(deliveryTag == i);
+ UnacknowledgedMessage unackedMsg = map.get(deliveryTag);
assertTrue(unackedMsg.queue == _queue);
// 5 is the delivery tag of the message that *should* be removed
if (++i == 5)
@@ -196,21 +219,20 @@ public class AckTest extends TestCase
*/
public void testMultiAckReceivedTest() throws AMQException
{
- _subscription = new SubscriptionImpl(5, _protocolSession, "conTag", true);
+ _subscription = new SubscriptionImpl(5, _protocolSession, DEFAULT_CONSUMER_TAG, true);
final int msgCount = 10;
publishMessages(msgCount);
_channel.acknowledgeMessage(5, true);
- Map<Long, UnacknowledgedMessage> map = _channel.getUnacknowledgedMessageMap();
+ UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap();
assertTrue(map.size() == 5);
- Iterator<Map.Entry<Long, UnacknowledgedMessage>> it = map.entrySet().iterator();
+ Set<Long> deliveryTagSet = map.getDeliveryTags();
int i = 1;
- while (i <= map.size())
+ for (long deliveryTag : deliveryTagSet)
{
- Map.Entry<Long, UnacknowledgedMessage> entry = it.next();
- assertTrue(entry.getKey() == i + 5);
- UnacknowledgedMessage unackedMsg = entry.getValue();
+ assertTrue(deliveryTag == i + 5);
+ UnacknowledgedMessage unackedMsg = map.get(deliveryTag);
assertTrue(unackedMsg.queue == _queue);
++i;
}
@@ -221,21 +243,20 @@ public class AckTest extends TestCase
*/
public void testMultiAckAllReceivedTest() throws AMQException
{
- _subscription = new SubscriptionImpl(5, _protocolSession, "conTag", true);
+ _subscription = new SubscriptionImpl(5, _protocolSession, DEFAULT_CONSUMER_TAG, true);
final int msgCount = 10;
publishMessages(msgCount);
_channel.acknowledgeMessage(0, true);
- Map<Long, UnacknowledgedMessage> map = _channel.getUnacknowledgedMessageMap();
+ UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap();
assertTrue(map.size() == 0);
- Iterator<Map.Entry<Long, UnacknowledgedMessage>> it = map.entrySet().iterator();
+ Set<Long> deliveryTagSet = map.getDeliveryTags();
int i = 1;
- while (i <= map.size())
+ for (long deliveryTag : deliveryTagSet)
{
- Map.Entry<Long, UnacknowledgedMessage> entry = it.next();
- assertTrue(entry.getKey() == i + 5);
- UnacknowledgedMessage unackedMsg = entry.getValue();
+ assertTrue(deliveryTag == i + 5);
+ UnacknowledgedMessage unackedMsg = map.get(deliveryTag);
assertTrue(unackedMsg.queue == _queue);
++i;
}
@@ -246,7 +267,7 @@ public class AckTest extends TestCase
int lowMark = 5;
int highMark = 10;
- _subscription = new SubscriptionImpl(5, _protocolSession, "conTag", true);
+ _subscription = new SubscriptionImpl(5, _protocolSession, DEFAULT_CONSUMER_TAG, true);
_channel.setPrefetchLowMarkCount(lowMark);
_channel.setPrefetchHighMarkCount(highMark);
@@ -259,7 +280,7 @@ public class AckTest extends TestCase
// which have not bee received so will be queued up in the channel
// which should be suspended
assertTrue(_subscription.isSuspended());
- Map<Long, UnacknowledgedMessage> map = _channel.getUnacknowledgedMessageMap();
+ UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap();
assertTrue(map.size() == highMark);
//acknowledge messages so we are just above lowMark
@@ -297,7 +318,7 @@ public class AckTest extends TestCase
public void testPrefetch() throws AMQException
{
- _subscription = new SubscriptionImpl(5, _protocolSession, "conTag", true);
+ _subscription = new SubscriptionImpl(5, _protocolSession, DEFAULT_CONSUMER_TAG, true);
_channel.setPrefetchCount(5);
assertTrue(_channel.getPrefetchCount() == 5);
@@ -308,7 +329,7 @@ public class AckTest extends TestCase
// at this point we should have sent out only 5 messages with a further 5 queued
// up in the channel which should now be suspended
assertTrue(_subscription.isSuspended());
- Map<Long, UnacknowledgedMessage> map = _channel.getUnacknowledgedMessageMap();
+ UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap();
assertTrue(map.size() == 5);
_channel.acknowledgeMessage(5, true);
assertTrue(!_subscription.isSuspended());
diff --git a/java/systests/src/test/java/org/apache/qpid/server/queue/ConcurrencyTest.java b/java/systests/src/main/java/org/apache/qpid/server/queue/ConcurrencyTest.java
index fe8960c872..6f3d42d090 100644
--- a/java/systests/src/test/java/org/apache/qpid/server/queue/ConcurrencyTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/queue/ConcurrencyTest.java
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -21,7 +21,11 @@
package org.apache.qpid.server.queue;
import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.server.handler.OnCurrentThreadExecutor;
+import org.apache.qpid.server.registry.IApplicationRegistry;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.virtualhost.VirtualHost;
import java.util.*;
import java.util.concurrent.Executor;
@@ -49,11 +53,15 @@ public class ConcurrencyTest extends MessageTestHelper
private boolean isComplete;
private boolean failed;
+ private VirtualHost _virtualHost;
public ConcurrencyTest() throws Exception
{
- _deliveryMgr = new ConcurrentDeliveryManager(_subscriptionMgr, new AMQQueue("myQ", false, "guest", false,
- new DefaultQueueRegistry()));
+
+ IApplicationRegistry applicationRegistry = ApplicationRegistry.getInstance();
+ _virtualHost = applicationRegistry.getVirtualHostRegistry().getVirtualHost("test");
+ _deliveryMgr = new ConcurrentSelectorDeliveryManager(_subscriptionMgr, new AMQQueue(new AMQShortString("myQ"), false, new AMQShortString("guest"), false,
+ _virtualHost));
}
public void testConcurrent1() throws InterruptedException, AMQException
@@ -186,7 +194,7 @@ public class ConcurrencyTest extends MessageTestHelper
AMQMessage msg = nextMessage();
if (msg != null)
{
- _deliveryMgr.deliver(toString(), msg);
+ _deliveryMgr.deliver(null, new AMQShortString(toString()), msg);
}
}
}
diff --git a/java/systests/src/test/java/org/apache/qpid/server/queue/DeliveryManagerTest.java b/java/systests/src/main/java/org/apache/qpid/server/queue/DeliveryManagerTest.java
index 3631264e5a..e1be640c8e 100644
--- a/java/systests/src/test/java/org/apache/qpid/server/queue/DeliveryManagerTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/queue/DeliveryManagerTest.java
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -21,7 +21,9 @@
package org.apache.qpid.server.queue;
import org.apache.qpid.server.handler.OnCurrentThreadExecutor;
+import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
import junit.framework.TestSuite;
@@ -29,6 +31,8 @@ abstract public class DeliveryManagerTest extends MessageTestHelper
{
protected final SubscriptionSet _subscriptions = new SubscriptionSet();
protected DeliveryManager _mgr;
+ protected StoreContext _storeContext = new StoreContext();
+ private static final AMQShortString DEFAULT_QUEUE_NAME = new AMQShortString("Me");
public DeliveryManagerTest() throws Exception
{
@@ -45,7 +49,7 @@ abstract public class DeliveryManagerTest extends MessageTestHelper
for (int i = 0; i < batch; i++)
{
- _mgr.deliver("Me", messages[i]);
+ _mgr.deliver(_storeContext, DEFAULT_QUEUE_NAME, messages[i]);
}
SubscriptionTestHelper s1 = new SubscriptionTestHelper("1");
@@ -55,7 +59,7 @@ abstract public class DeliveryManagerTest extends MessageTestHelper
for (int i = batch; i < messages.length; i++)
{
- _mgr.deliver("Me", messages[i]);
+ _mgr.deliver(_storeContext, DEFAULT_QUEUE_NAME, messages[i]);
}
assertTrue(s1.getMessages().isEmpty());
@@ -93,7 +97,7 @@ abstract public class DeliveryManagerTest extends MessageTestHelper
for (int i = 0; i < batch; i++)
{
- _mgr.deliver("Me", messages[i]);
+ _mgr.deliver(_storeContext, DEFAULT_QUEUE_NAME, messages[i]);
}
assertEquals(batch, s1.getMessages().size());
@@ -107,7 +111,7 @@ abstract public class DeliveryManagerTest extends MessageTestHelper
s1.setSuspended(true);
for (int i = batch; i < messages.length; i++)
{
- _mgr.deliver("Me", messages[i]);
+ _mgr.deliver(_storeContext, DEFAULT_QUEUE_NAME, messages[i]);
}
_mgr.processAsync(new OnCurrentThreadExecutor());
@@ -129,7 +133,7 @@ abstract public class DeliveryManagerTest extends MessageTestHelper
try
{
AMQMessage msg = message(true);
- _mgr.deliver("Me", msg);
+ _mgr.deliver(_storeContext, DEFAULT_QUEUE_NAME, msg);
msg.checkDeliveredToConsumer();
fail("expected exception did not occur");
}
@@ -151,7 +155,7 @@ abstract public class DeliveryManagerTest extends MessageTestHelper
_subscriptions.addSubscriber(s);
s.setSuspended(true);
AMQMessage msg = message(true);
- _mgr.deliver("Me", msg);
+ _mgr.deliver(_storeContext, DEFAULT_QUEUE_NAME, msg);
msg.checkDeliveredToConsumer();
fail("expected exception did not occur");
}
@@ -168,8 +172,6 @@ abstract public class DeliveryManagerTest extends MessageTestHelper
public static junit.framework.Test suite()
{
TestSuite suite = new TestSuite();
- suite.addTestSuite(ConcurrentDeliveryManagerTest.class);
- suite.addTestSuite(SynchronizedDeliveryManagerTest.class);
return suite;
}
}
diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/MessageTestHelper.java b/java/systests/src/main/java/org/apache/qpid/server/queue/MessageTestHelper.java
new file mode 100644
index 0000000000..78f8223733
--- /dev/null
+++ b/java/systests/src/main/java/org/apache/qpid/server/queue/MessageTestHelper.java
@@ -0,0 +1,95 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.framing.MessageTransferBody;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.SkeletonMessageStore;
+import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.util.TestApplicationRegistry;
+import org.apache.qpid.server.txn.TransactionalContext;
+import org.apache.qpid.server.txn.NonTransactionalContext;
+import org.apache.qpid.server.RequiredDeliveryException;
+import org.apache.qpid.AMQException;
+
+import junit.framework.TestCase;
+
+import java.util.LinkedList;
+import java.util.HashSet;
+
+class MessageTestHelper extends TestCase
+{
+ private final MessageStore _messageStore = new SkeletonMessageStore();
+
+ private final StoreContext _storeContext = new StoreContext();
+
+ private final TransactionalContext _txnContext = new NonTransactionalContext(_messageStore, _storeContext, null,
+ new LinkedList<RequiredDeliveryException>(),
+ new HashSet<Long>());
+
+ MessageTestHelper() throws Exception
+ {
+ ApplicationRegistry.initialise(new TestApplicationRegistry());
+ }
+
+ AMQMessage message() throws AMQException
+ {
+ return message(false);
+ }
+
+ AMQMessage message(boolean immediate) throws AMQException
+ {
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Establish some way to determine the version for the test.
+ MessageTransferBody publish = new MessageTransferBody(
+ (byte)0,
+ (byte)9,
+ MessageTransferBody.getClazz((byte)0,(byte)9),
+ MessageTransferBody.getMethod((byte)0,(byte)9),
+ null, // AMQShortString appId
+ null, // FieldTable applicationHeaders
+ null, // Content body
+ null, // AMQShortString contentEncoding
+ null, // AMQShortString contentType
+ null, // AMQShortString correlationId
+ (short)0, // short deliveryMode
+ null, // AMQShortString destination
+ null, // AMQShortString exchange
+ 0L, // long expiration
+ immediate, // boolean immediate
+ false, // boolean mandatory
+ null, // AMQShortString messageId
+ (short)0, // short priority
+ false, // boolean redelivered
+ null, // AMQShortString replyTo
+ null, // AMQShortString routingKey
+ null, // byte[] securityToken
+ 0, // int ticket
+ 0L, // long timestamp
+ null, // AMQShortString transactionId
+ 0L, // long ttl
+ null); // AMQShortString userId
+
+ return new AMQMessage(_messageStore, publish, _txnContext);
+ }
+
+}
diff --git a/java/systests/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java b/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java
index 400b21c5b1..ca357976b2 100644
--- a/java/systests/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java
@@ -24,10 +24,12 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQDataBlock;
import org.apache.qpid.framing.AMQMethodBody;
import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.VersionSpecificRegistry;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.protocol.AMQMethodListener;
import org.apache.qpid.server.AMQChannel;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.store.MessageStore;
@@ -35,7 +37,6 @@ import org.apache.qpid.server.store.MessageStore;
import javax.security.sasl.SaslServer;
import java.util.HashMap;
import java.util.Map;
-import java.util.concurrent.atomic.AtomicInteger;
/**
* A protocol session that can be used for testing purposes.
@@ -46,13 +47,8 @@ public class MockProtocolSession implements AMQProtocolSession
private Map<Integer, AMQChannel> _channelMap = new HashMap<Integer, AMQChannel>();
- // Keeps a tally of connections for logging and debugging
- private static AtomicInteger _ConnectionId;
- static { _ConnectionId = new AtomicInteger(0); }
-
public MockProtocolSession(MessageStore messageStore)
{
- _ConnectionId.incrementAndGet();
_messageStore = messageStore;
}
@@ -64,12 +60,12 @@ public class MockProtocolSession implements AMQProtocolSession
{
}
- public String getContextKey()
+ public AMQShortString getContextKey()
{
return null;
}
- public void setContextKey(String contextKey)
+ public void setContextKey(AMQShortString contextKey)
{
}
@@ -142,110 +138,116 @@ public class MockProtocolSession implements AMQProtocolSession
public void setClientProperties(FieldTable clientProperties)
{
}
-
public Object getClientIdentifier()
{
return null;
}
- public void closeChannelRequest(int channelId, int replyCode, String replyText) throws AMQException {
- // TODO Auto-generated method stub
-
- }
-
- public void closeChannelResponse(int channelId, long requestId) throws AMQException {
- // TODO Auto-generated method stub
-
- }
-
- public void closeSessionRequest(int replyCode, String replyText, int classId, int methodId) throws AMQException {
- // TODO Auto-generated method stub
-
- }
-
- public void closeSessionRequest(int replyCode, String replyText) throws AMQException {
- // TODO Auto-generated method stub
-
- }
-
- public void closeSessionResponse(long requestId) throws AMQException {
- // TODO Auto-generated method stub
-
- }
-
- public void setFrameMax(long size) {
- // TODO Auto-generated method stub
-
- }
-
- public long getFrameMax() {
- // TODO Auto-generated method stub
- return 0;
- }
-
- public QueueRegistry getQueueRegistry() {
- // TODO Auto-generated method stub
- return null;
- }
-
- public ExchangeRegistry getExchangeRegistry() {
- // TODO Auto-generated method stub
- return null;
- }
+ public VirtualHost getVirtualHost()
+ {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
- public AMQStateManager getStateManager() {
- // TODO Auto-generated method stub
- return null;
- }
+ public void setVirtualHost(VirtualHost virtualHost)
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
- public byte getMajor() {
- // TODO Auto-generated method stub
- return 0;
- }
+ public void addSessionCloseTask(Task task)
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
- public byte getMinor() {
- // TODO Auto-generated method stub
- return 0;
- }
+ public void removeSessionCloseTask(Task task)
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
- public boolean versionEquals(byte major, byte minor) {
- // TODO Auto-generated method stub
- return false;
- }
+ public byte getProtocolMajorVersion()
+ {
+ return 0; //To change body of implemented methods use File | Settings | File Templates.
+ }
- public void checkMethodBodyVersion(AMQMethodBody methodBody) {
- // TODO Auto-generated method stub
-
- }
+ public byte getProtocolMinorVersion()
+ {
+ return 9; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public boolean isProtocolVersionEqual(byte major, byte minor)
+ {
+ return major == 0 && minor == 9;
+ }
+
+ public void checkMethodBodyVersion(AMQMethodBody methodBody)
+ {
- public long writeRequest(int channelNum, AMQMethodBody methodBody, AMQMethodListener methodListener) {
- // TODO Auto-generated method stub
- return 0;
- }
+ }
- public void writeResponse(int channelNum, long requestId, AMQMethodBody methodBody) {
- // TODO Auto-generated method stub
-
- }
+ public VersionSpecificRegistry getRegistry()
+ {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public long getConnectionId()
+ {
+ return 0L;
+ }
+
+ public AMQStateManager getStateManager()
+ {
+ return null;
+ }
+
+ public long getFrameMax()
+ {
+ return 0L;
+ }
+
+ public void setFrameMax(long size)
+ {
+
+ }
+
+ public void closeChannelRequest(int channelId, int replyCode, AMQShortString replyText) throws AMQException
+ {
+
+ }
+
+ public void closeChannelResponse(int channelId, long requestId) throws AMQException
+ {
+
+ }
+
+ public void closeSessionRequest(int replyCode, AMQShortString replyText, int classId, int methodId) throws AMQException
+ {
+
+ }
- public void writeResponse(AMQMethodEvent evt, AMQMethodBody response) {
- // TODO Auto-generated method stub
-
- }
+ public void closeSessionRequest(int replyCode, AMQShortString replyText) throws AMQException
+ {
+
+ }
- public int getConnectionId()
+ public void closeSessionResponse(long requestId) throws AMQException
{
- return _ConnectionId.get();
+
+ }
+
+ public long writeRequest(int channelNum, AMQMethodBody methodBody,
+ AMQMethodListener methodListener)
+ {
+ return 0L;
}
- public void addSessionCloseTask(Task task)
+ public void writeResponse(int channelNum, long requestId, AMQMethodBody methodBody)
{
- //To change body of implemented methods use File | Settings | File Templates.
+
}
- public void removeSessionCloseTask(Task task)
+ public void writeResponse(AMQMethodEvent evt, AMQMethodBody response)
{
- //To change body of implemented methods use File | Settings | File Templates.
+
}
+
}
diff --git a/java/systests/src/test/java/org/apache/qpid/server/queue/SubscriptionManagerTest.java b/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionManagerTest.java
index d3ec3c11d4..d3ec3c11d4 100644
--- a/java/systests/src/test/java/org/apache/qpid/server/queue/SubscriptionManagerTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionManagerTest.java
diff --git a/java/systests/src/test/java/org/apache/qpid/server/queue/SubscriptionSetTest.java b/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionSetTest.java
index bcf54693d3..bcf54693d3 100644
--- a/java/systests/src/test/java/org/apache/qpid/server/queue/SubscriptionSetTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionSetTest.java
diff --git a/java/systests/src/test/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java b/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java
index fea3c93280..b3574ecba4 100644
--- a/java/systests/src/test/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java
@@ -67,6 +67,12 @@ public class SubscriptionTestHelper implements Subscription
return isSuspended;
}
+ public boolean wouldSuspend(AMQMessage msg)
+ {
+ return isSuspended;
+ }
+
+
public void queueDeleted(AMQQueue queue)
{
}
diff --git a/java/systests/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java b/java/systests/src/main/java/org/apache/qpid/server/store/SkeletonMessageStore.java
index bc0a8a7d64..4468bfe13b 100644
--- a/java/systests/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/store/SkeletonMessageStore.java
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -20,11 +20,12 @@
*/
package org.apache.qpid.server.store;
-import org.apache.qpid.server.queue.AMQMessage;
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.QueueRegistry;
-import org.apache.qpid.AMQException;
import org.apache.commons.configuration.Configuration;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.MessageMetaData;
+import org.apache.qpid.server.virtualhost.VirtualHost;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
@@ -40,63 +41,94 @@ public class SkeletonMessageStore implements MessageStore
public void configure(String base, Configuration config) throws Exception
{
}
-
- public void configure(QueueRegistry queueRegistry, String base, Configuration config) throws Exception
+
+ public void configure(VirtualHost virtualHost, String base, Configuration config) throws Exception
{
+ //To change body of implemented methods use File | Settings | File Templates.
}
public void close() throws Exception
{
}
- public void put(AMQMessage msg)
+ public void removeMessage(StoreContext s, Long messageId)
{
}
- public void removeMessage(long messageId)
+ public void createQueue(AMQQueue queue) throws AMQException
+ {
+ }
+
+ public void beginTran(StoreContext s) throws AMQException
{
}
- public void createQueue(AMQQueue queue) throws AMQException
+ public boolean inTran(StoreContext sc)
{
+ return false;
}
- public void removeQueue(String name) throws AMQException
+ public void commitTran(StoreContext storeContext) throws AMQException
{
}
- public void enqueueMessage(String name, long messageId) throws AMQException
+ public void abortTran(StoreContext storeContext) throws AMQException
{
}
- public void dequeueMessage(String name, long messageId) throws AMQException
+ public List<AMQQueue> createQueues() throws AMQException
{
+ return null;
}
- public void beginTran() throws AMQException
+ public Long getNewMessageId()
{
+ return _messageId.getAndIncrement();
}
- public boolean inTran()
+ public void storeContentChunk(StoreContext sc, Long messageId, int index, byte[] content, boolean lastContentBody) throws AMQException
{
- return false;
+
}
-
- public void commitTran() throws AMQException
+
+ public void storeMessageMetaData(StoreContext sc, Long messageId, MessageMetaData messageMetaData) throws AMQException
{
+
}
- public void abortTran() throws AMQException
+ public MessageMetaData getMessageMetaData(Long messageId) throws AMQException
{
+ return null;
}
- public List<AMQQueue> createQueues() throws AMQException
+ public byte[] getContentChunk(Long messageId, int index) throws AMQException
{
return null;
}
- public long getNewMessageId()
+ public void removeQueue(AMQShortString name) throws AMQException
{
- return _messageId.getAndIncrement();
+
+ }
+
+ public void enqueueMessage(StoreContext context, AMQShortString name, Long messageId) throws AMQException
+ {
+
+ }
+
+ public void dequeueMessage(StoreContext context, AMQShortString name, Long messageId) throws AMQException
+ {
+
+ }
+
+ public void storeContentBodyChunk(StoreContext context, Long messageId, int index, byte[] contentBody, boolean lastContentBody) throws AMQException
+ {
+
}
+
+ public byte[] getContentBodyChunk(Long messageId, int index) throws AMQException
+ {
+ return new byte[0];
+ }
+
}
diff --git a/java/systests/src/main/java/org/apache/qpid/server/store/TestReferenceCounting.java b/java/systests/src/main/java/org/apache/qpid/server/store/TestReferenceCounting.java
new file mode 100644
index 0000000000..5d60596c64
--- /dev/null
+++ b/java/systests/src/main/java/org/apache/qpid/server/store/TestReferenceCounting.java
@@ -0,0 +1,135 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store;
+
+import junit.framework.TestCase;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.MessageTransferBody;
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.MessageHandleFactory;
+import org.apache.qpid.server.txn.TransactionalContext;
+
+/**
+ * Tests that reference counting works correctly with AMQMessage and the message store
+ */
+public class TestReferenceCounting extends TestCase
+{
+ private TransactionalContext txnContext = null;
+
+ private TestableMemoryMessageStore _store;
+
+ private StoreContext _storeContext = new StoreContext();
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ _store = new TestableMemoryMessageStore();
+ }
+
+ /**
+ * Check that when the reference count is decremented the message removes itself from the store
+ */
+ public void testMessageGetsRemoved() throws AMQException
+ {
+ // TODO: fix hardcoded protocol version data
+ AMQMessage message = new AMQMessage(_store,
+ new MessageTransferBody((byte)0,
+ (byte)9,
+ MessageTransferBody.getClazz((byte)0,(byte)9),
+ MessageTransferBody.getMethod((byte)0,(byte)9),
+ null, // AMQShortString appId
+ null, // FieldTable applicationHeaders
+ null, // Content body
+ null, // AMQShortString contentEncoding
+ null, // AMQShortString contentType
+ null, // AMQShortString correlationId
+ (short)0, // short deliveryMode
+ null, // AMQShortString destination
+ null, // AMQShortString exchange
+ 0L, // long expiration
+ false, // boolean immediate
+ false, // boolean mandatory
+ null, // AMQShortString messageId
+ (short)0, // short priority
+ false, // boolean redelivered
+ null, // AMQShortString replyTo
+ null, // AMQShortString routingKey
+ null, // byte[] securityToken
+ 0, // int ticket
+ 0L, // long timestamp
+ null, // AMQShortString transactionId
+ 0L, // long ttl
+ null), // AMQShortString userId
+ txnContext);
+ message.incrementReference();
+ // we call routing complete to set up the handle
+// message.routingComplete(_store, _storeContext, new MessageHandleFactory());
+ assertTrue(_store.getMessageMetaDataMap().size() == 1);
+ message.decrementReference(_storeContext);
+ assertTrue(_store.getMessageMetaDataMap().size() == 0);
+ }
+
+ public void testMessageRemains() throws AMQException
+ {
+ // TODO: fix hardcoded protocol version data
+ AMQMessage message = new AMQMessage(_store,
+ new MessageTransferBody((byte)0,
+ (byte)9,
+ MessageTransferBody.getClazz((byte)0,(byte)9),
+ MessageTransferBody.getMethod((byte)0,(byte)9),
+ null, // AMQShortString appId
+ null, // FieldTable applicationHeaders
+ null, // Content body
+ null, // AMQShortString contentEncoding
+ null, // AMQShortString contentType
+ null, // AMQShortString correlationId
+ (short)0, // short deliveryMode
+ null, // AMQShortString destination
+ null, // AMQShortString exchange
+ 0L, // long expiration
+ false, // boolean immediate
+ false, // boolean mandatory
+ null, // AMQShortString messageId
+ (short)0, // short priority
+ false, // boolean redelivered
+ null, // AMQShortString replyTo
+ null, // AMQShortString routingKey
+ null, // byte[] securityToken
+ 0, // int ticket
+ 0L, // long timestamp
+ null, // AMQShortString transactionId
+ 0L, // long ttl
+ null), // AMQShortString userId
+ txnContext);
+ message.incrementReference();
+ // we call routing complete to set up the handle
+// message.routingComplete(_store, _storeContext, new MessageHandleFactory());
+ assertTrue(_store.getMessageMetaDataMap().size() == 1);
+ message.incrementReference();
+ message.decrementReference(_storeContext);
+ assertTrue(_store.getMessageMetaDataMap().size() == 1);
+ }
+
+ public static junit.framework.Test suite()
+ {
+ return new junit.framework.TestSuite(TestReferenceCounting.class);
+ }
+}
diff --git a/java/systests/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java b/java/systests/src/main/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java
index be7687a22c..fc775c904c 100644
--- a/java/systests/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -20,10 +20,11 @@
*/
package org.apache.qpid.server.store;
-import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.MessageMetaData;
-import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.List;
/**
* Adds some extra methods to the memory message store for testing purposes.
@@ -32,11 +33,17 @@ public class TestableMemoryMessageStore extends MemoryMessageStore
{
public TestableMemoryMessageStore()
{
- _messageMap = new ConcurrentHashMap<Long, AMQMessage>();
+ _metaDataMap = new ConcurrentHashMap<Long, MessageMetaData>();
+ _contentBodyMap = new ConcurrentHashMap<Long, List<byte[]>>();
+ }
+
+ public ConcurrentMap<Long, MessageMetaData> getMessageMetaDataMap()
+ {
+ return _metaDataMap;
}
- public ConcurrentMap<Long, AMQMessage> getMessageMap()
+ public ConcurrentMap<Long, List<byte[]>> getContentBodyMap()
{
- return _messageMap;
+ return _contentBodyMap;
}
}
diff --git a/java/systests/src/test/java/org/apache/qpid/server/txn/TxnBufferTest.java b/java/systests/src/main/java/org/apache/qpid/server/txn/TxnBufferTest.java
index ac5c60a931..1d9e30c24e 100644
--- a/java/systests/src/test/java/org/apache/qpid/server/txn/TxnBufferTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/txn/TxnBufferTest.java
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -20,23 +20,23 @@
*/
package org.apache.qpid.server.txn;
+import junit.framework.TestCase;
import org.apache.qpid.AMQException;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.TestableMemoryMessageStore;
+import org.apache.qpid.server.store.StoreContext;
import java.util.LinkedList;
-import junit.framework.TestCase;
-
public class TxnBufferTest extends TestCase
{
- private final LinkedList<MockOp> ops = new LinkedList<MockOp>();
+ private final LinkedList<MockOp> ops = new LinkedList<MockOp>();
public void testCommit() throws AMQException
{
MockStore store = new MockStore();
- TxnBuffer buffer = new TxnBuffer(store);
+ TxnBuffer buffer = new TxnBuffer();
buffer.enlist(new MockOp().expectPrepare().expectCommit());
//check relative ordering
MockOp op = new MockOp().expectPrepare().expectPrepare().expectCommit().expectCommit();
@@ -44,7 +44,7 @@ public class TxnBufferTest extends TestCase
buffer.enlist(op);
buffer.enlist(new MockOp().expectPrepare().expectCommit());
- buffer.commit();
+ buffer.commit(null);
validateOps();
store.validate();
@@ -54,12 +54,12 @@ public class TxnBufferTest extends TestCase
{
MockStore store = new MockStore();
- TxnBuffer buffer = new TxnBuffer(store);
+ TxnBuffer buffer = new TxnBuffer();
buffer.enlist(new MockOp().expectRollback());
buffer.enlist(new MockOp().expectRollback());
buffer.enlist(new MockOp().expectRollback());
- buffer.rollback();
+ buffer.rollback(null);
validateOps();
store.validate();
@@ -68,17 +68,17 @@ public class TxnBufferTest extends TestCase
public void testCommitWithFailureDuringPrepare() throws AMQException
{
MockStore store = new MockStore();
- store.expectBegin().expectAbort();
+ store.beginTran(null);
- TxnBuffer buffer = new TxnBuffer(store);
- buffer.containsPersistentChanges();
+ TxnBuffer buffer = new TxnBuffer();
+ buffer.enlist(new StoreMessageOperation(store));
buffer.enlist(new MockOp().expectPrepare().expectUndoPrepare());
buffer.enlist(new TxnTester(store));
buffer.enlist(new MockOp().expectPrepare().expectUndoPrepare());
buffer.enlist(new FailedPrepare());
buffer.enlist(new MockOp());
- buffer.commit();
+ buffer.commit(null);
validateOps();
store.validate();
}
@@ -86,16 +86,17 @@ public class TxnBufferTest extends TestCase
public void testCommitWithPersistance() throws AMQException
{
MockStore store = new MockStore();
- store.expectBegin().expectCommit();
+ store.beginTran(null);
+ store.expectCommit();
- TxnBuffer buffer = new TxnBuffer(store);
+ TxnBuffer buffer = new TxnBuffer();
buffer.enlist(new MockOp().expectPrepare().expectCommit());
buffer.enlist(new MockOp().expectPrepare().expectCommit());
buffer.enlist(new MockOp().expectPrepare().expectCommit());
+ buffer.enlist(new StoreMessageOperation(store));
buffer.enlist(new TxnTester(store));
- buffer.containsPersistentChanges();
- buffer.commit();
+ buffer.commit(null);
validateOps();
store.validate();
}
@@ -114,7 +115,7 @@ public class TxnBufferTest extends TestCase
}
class MockOp implements TxnOp
- {
+ {
final Object PREPARE = "PREPARE";
final Object COMMIT = "COMMIT";
final Object UNDO_PREPARE = "UNDO_PREPARE";
@@ -127,12 +128,12 @@ public class TxnBufferTest extends TestCase
ops.add(this);
}
- public void prepare()
+ public void prepare(StoreContext context)
{
assertEquals(expected.removeLast(), PREPARE);
}
- public void commit()
+ public void commit(StoreContext context)
{
assertEquals(expected.removeLast(), COMMIT);
}
@@ -142,7 +143,7 @@ public class TxnBufferTest extends TestCase
assertEquals(expected.removeLast(), UNDO_PREPARE);
}
- public void rollback()
+ public void rollback(StoreContext context)
{
assertEquals(expected.removeLast(), ROLLBACK);
}
@@ -193,25 +194,24 @@ public class TxnBufferTest extends TestCase
private final LinkedList expected = new LinkedList();
private boolean inTran;
- public void beginTran() throws AMQException
+ public void beginTran(StoreContext context) throws AMQException
{
- assertEquals(expected.removeLast(), BEGIN);
inTran = true;
}
-
- public void commitTran() throws AMQException
+
+ public void commitTran(StoreContext context) throws AMQException
{
assertEquals(expected.removeLast(), COMMIT);
inTran = false;
}
-
- public void abortTran() throws AMQException
+
+ public void abortTran(StoreContext context) throws AMQException
{
assertEquals(expected.removeLast(), ABORT);
inTran = false;
}
- public boolean inTran()
+ public boolean inTran(StoreContext context)
{
return inTran;
}
@@ -249,23 +249,23 @@ public class TxnBufferTest extends TestCase
}
class NullOp implements TxnOp
- {
- public void prepare() throws AMQException
+ {
+ public void prepare(StoreContext context) throws AMQException
{
}
- public void commit()
+ public void commit(StoreContext context)
{
}
public void undoPrepare()
{
}
- public void rollback()
+ public void rollback(StoreContext context)
{
}
}
class FailedPrepare extends NullOp
- {
+ {
public void prepare() throws AMQException
{
throw new AMQException("Fail!");
@@ -273,9 +273,11 @@ public class TxnBufferTest extends TestCase
}
class TxnTester extends NullOp
- {
+ {
private final MessageStore store;
+ private final StoreContext context = new StoreContext();
+
TxnTester(MessageStore store)
{
this.store = store;
@@ -283,12 +285,12 @@ public class TxnBufferTest extends TestCase
public void prepare() throws AMQException
{
- assertTrue("Expected prepare to be performed under txn", store.inTran());
+ assertTrue("Expected prepare to be performed under txn", store.inTran(context));
}
public void commit()
{
- assertTrue("Expected commit not to be performed under txn", !store.inTran());
+ assertTrue("Expected commit not to be performed under txn", !store.inTran(context));
}
}
diff --git a/java/systests/src/test/java/org/apache/qpid/server/util/AveragedRun.java b/java/systests/src/main/java/org/apache/qpid/server/util/AveragedRun.java
index 1d17985ab5..1d17985ab5 100644
--- a/java/systests/src/test/java/org/apache/qpid/server/util/AveragedRun.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/util/AveragedRun.java
diff --git a/java/systests/src/test/java/org/apache/qpid/server/util/RunStats.java b/java/systests/src/main/java/org/apache/qpid/server/util/RunStats.java
index ec67fc68b3..ec67fc68b3 100644
--- a/java/systests/src/test/java/org/apache/qpid/server/util/RunStats.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/util/RunStats.java
diff --git a/java/systests/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java b/java/systests/src/main/java/org/apache/qpid/server/util/TestApplicationRegistry.java
index f801daf27c..849285e6d6 100644
--- a/java/systests/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/util/TestApplicationRegistry.java
@@ -29,14 +29,18 @@ import org.apache.qpid.server.management.NoopManagedObjectRegistry;
import org.apache.qpid.server.queue.DefaultQueueRegistry;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.registry.IApplicationRegistry;
import org.apache.qpid.server.security.auth.AuthenticationManager;
import org.apache.qpid.server.security.auth.NullAuthenticationManager;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.TestableMemoryMessageStore;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.MapConfiguration;
import java.util.HashMap;
+import java.util.Collection;
public class TestApplicationRegistry extends ApplicationRegistry
{
@@ -51,6 +55,7 @@ public class TestApplicationRegistry extends ApplicationRegistry
private AuthenticationManager _authenticationManager;
private MessageStore _messageStore;
+ private VirtualHost _vHost;
public TestApplicationRegistry()
{
@@ -59,10 +64,12 @@ public class TestApplicationRegistry extends ApplicationRegistry
public void initialise() throws Exception
{
- _managedObjectRegistry = new NoopManagedObjectRegistry();
- _queueRegistry = new DefaultQueueRegistry();
- _exchangeFactory = new DefaultExchangeFactory();
- _exchangeRegistry = new DefaultExchangeRegistry(_exchangeFactory);
+ IApplicationRegistry appRegistry = ApplicationRegistry.getInstance();
+ _managedObjectRegistry = appRegistry.getManagedObjectRegistry();
+ _vHost = appRegistry.getVirtualHostRegistry().getVirtualHost("test");
+ _queueRegistry = _vHost.getQueueRegistry();
+ _exchangeFactory = _vHost.getExchangeFactory();
+ _exchangeRegistry = _vHost.getExchangeRegistry();
_authenticationManager = new NullAuthenticationManager();
_messageStore = new TestableMemoryMessageStore();
@@ -99,6 +106,16 @@ public class TestApplicationRegistry extends ApplicationRegistry
return _authenticationManager;
}
+ public Collection<String> getVirtualHostNames()
+ {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public VirtualHostRegistry getVirtualHostRegistry()
+ {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
public MessageStore getMessageStore()
{
return _messageStore;
diff --git a/java/systests/src/test/java/org/apache/qpid/server/util/TimedRun.java b/java/systests/src/main/java/org/apache/qpid/server/util/TimedRun.java
index 1291380311..1291380311 100644
--- a/java/systests/src/test/java/org/apache/qpid/server/util/TimedRun.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/util/TimedRun.java
diff --git a/java/systests/src/test/java/org/apache/qpid/test/VMBrokerSetup.java b/java/systests/src/main/java/org/apache/qpid/test/VMBrokerSetup.java
index e859fac4af..e859fac4af 100644
--- a/java/systests/src/test/java/org/apache/qpid/test/VMBrokerSetup.java
+++ b/java/systests/src/main/java/org/apache/qpid/test/VMBrokerSetup.java
diff --git a/java/systests/src/test/java/log4j.properties b/java/systests/src/main/java/systests.log4j
index 6d596d1d19..6d596d1d19 100644
--- a/java/systests/src/test/java/log4j.properties
+++ b/java/systests/src/main/java/systests.log4j
diff --git a/java/systests/src/old_test/java/org/apache/qpid/server/queue/SendPerfTest.java b/java/systests/src/old_test/java/org/apache/qpid/server/queue/SendPerfTest.java
index 6490b9f270..2c5712fd35 100644
--- a/java/systests/src/old_test/java/org/apache/qpid/server/queue/SendPerfTest.java
+++ b/java/systests/src/old_test/java/org/apache/qpid/server/queue/SendPerfTest.java
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -27,6 +27,9 @@ import org.apache.qpid.framing.ContentBody;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.RequiredDeliveryException;
+import org.apache.qpid.server.txn.TransactionalContext;
+import org.apache.qpid.server.txn.NonTransactionalContext;
import org.apache.qpid.server.exchange.AbstractExchange;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.handler.OnCurrentThreadExecutor;
@@ -43,6 +46,7 @@ import org.apache.qpid.server.util.TimedRun;
import java.util.ArrayList;
import java.util.List;
+import java.util.LinkedList;
public class SendPerfTest extends TimedRun
{
@@ -101,13 +105,16 @@ public class SendPerfTest extends TimedRun
ContentHeaderBody header = new ContentHeaderBody();
List<ContentBody> body = new ArrayList<ContentBody>();
MessageStore messageStore = new SkeletonMessageStore();
+ // channel can be null since it is only used in ack processing which does not apply to this test
+ TransactionalContext txContext = new NonTransactionalContext(messageStore, null,
+ new LinkedList<RequiredDeliveryException>());
body.add(new ContentBody());
+ MessageHandleFactory factory = new MessageHandleFactory();
for (int i = 0; i < count; i++)
{
- for (AMQQueue q : queues)
- {
- q.deliver(new AMQMessage(messageStore, i, publish, header, body));
- }
+ // this routes and delivers the message
+ AMQMessage msg = new AMQMessage(i, publish, txContext, header, queues, body, messageStore,
+ factory);
}
}
diff --git a/java/systests/src/old_test/java/org/apache/qpid/test/unit/ack/DisconnectAndRedeliverTest.java b/java/systests/src/old_test/java/org/apache/qpid/test/unit/ack/DisconnectAndRedeliverTest.java
index a3e555aac9..3e35e3c85b 100644
--- a/java/systests/src/old_test/java/org/apache/qpid/test/unit/ack/DisconnectAndRedeliverTest.java
+++ b/java/systests/src/old_test/java/org/apache/qpid/test/unit/ack/DisconnectAndRedeliverTest.java
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -20,22 +20,19 @@
*/
package org.apache.qpid.test.unit.ack;
+import junit.framework.TestCase;
import org.apache.log4j.Logger;
import org.apache.log4j.xml.DOMConfigurator;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQQueue;
import org.apache.qpid.client.AMQSession;
-import org.apache.qpid.client.vmbroker.AMQVMBrokerCreationException;
import org.apache.qpid.client.transport.TransportConnection;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.store.TestableMemoryMessageStore;
import org.apache.qpid.server.util.TestApplicationRegistry;
-import org.apache.qpid.test.VMBrokerSetup;
import javax.jms.*;
-import junit.framework.TestCase;
-
public class DisconnectAndRedeliverTest extends TestCase
{
private static final Logger _logger = Logger.getLogger(DisconnectAndRedeliverTest.class);
@@ -55,12 +52,14 @@ public class DisconnectAndRedeliverTest extends TestCase
protected void setUp() throws Exception
{
super.setUp();
+ TransportConnection.createVMBroker(1);
ApplicationRegistry.initialise(new TestApplicationRegistry(), 1);
}
protected void tearDown() throws Exception
{
super.tearDown();
+ TransportConnection.killAllVMBrokers();
}
/**
@@ -82,7 +81,7 @@ public class DisconnectAndRedeliverTest extends TestCase
((AMQSession) consumerSession).declareExchangeSynch("amq.direct", "direct");
Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "/test");
-
+
Session producerSession = con2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
MessageProducer producer = producerSession.createProducer(queue);
@@ -149,7 +148,7 @@ public class DisconnectAndRedeliverTest extends TestCase
_logger.info("No messages redelivered as is expected");
con.close();
- _logger.info("Actually:" + store.getMessageMap().size());
+ _logger.info("Actually:" + store.getMessageMetaDataMap().size());
// assertTrue(store.getMessageMap().size() == 0);
}
@@ -204,13 +203,13 @@ public class DisconnectAndRedeliverTest extends TestCase
assertNull(tm);
_logger.info("No messages redelivered as is expected");
- _logger.info("Actually:" + store.getMessageMap().size());
- assertTrue(store.getMessageMap().size() == 0);
+ _logger.info("Actually:" + store.getMessageMetaDataMap().size());
+ assertTrue(store.getMessageMetaDataMap().size() == 0);
con.close();
}
public static junit.framework.Test suite()
{
- return new VMBrokerSetup(new junit.framework.TestSuite(DisconnectAndRedeliverTest.class));
+ return new junit.framework.TestSuite(DisconnectAndRedeliverTest.class);
}
}
diff --git a/java/systests/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java b/java/systests/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
deleted file mode 100644
index 6320a2c1be..0000000000
--- a/java/systests/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
+++ /dev/null
@@ -1,247 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.exchange;
-
-import junit.framework.TestCase;
-
-import org.apache.mina.common.ByteBuffer;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.client.message.MessageHeaders;
-import org.apache.qpid.framing.*;
-import org.apache.qpid.server.queue.AMQMessage;
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.SkeletonMessageStore;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
-public class AbstractHeadersExchangeTestBase extends TestCase
-{
- private final HeadersExchange exchange = new HeadersExchange();
- protected final Set<TestQueue> queues = new HashSet<TestQueue>();
- private int count;
-
- public void testDoNothing()
- {
- // this is here only to make junit under Eclipse happy
- }
-
- protected TestQueue bindDefault(String... bindings) throws AMQException
- {
- return bind("Queue" + (++count), bindings);
- }
-
- protected TestQueue bind(String queueName, String... bindings) throws AMQException
- {
- return bind(queueName, getHeaders(bindings));
- }
-
- protected TestQueue bind(String queue, FieldTable bindings) throws AMQException
- {
- return bind(new TestQueue(queue), bindings);
- }
-
- protected TestQueue bind(TestQueue queue, String... bindings) throws AMQException
- {
- return bind(queue, getHeaders(bindings));
- }
-
- protected TestQueue bind(TestQueue queue, FieldTable bindings) throws AMQException
- {
- queues.add(queue);
- exchange.registerQueue(null, queue, bindings);
- return queue;
- }
-
-
- protected void route(Message m) throws AMQException
- {
- m.route(exchange);
- }
-
- protected void routeAndTest(Message m, TestQueue... expected) throws AMQException
- {
- routeAndTest(m, false, Arrays.asList(expected));
- }
-
- protected void routeAndTest(Message m, boolean expectReturn, TestQueue... expected) throws AMQException
- {
- routeAndTest(m, expectReturn, Arrays.asList(expected));
- }
-
- protected void routeAndTest(Message m, List<TestQueue> expected) throws AMQException
- {
- routeAndTest(m, false, expected);
- }
-
- protected void routeAndTest(Message m, boolean expectReturn, List<TestQueue> expected) throws AMQException
- {
- try
- {
- route(m);
- assertFalse("Expected "+m+" to be returned due to manadatory flag, and lack of routing",expectReturn);
- for (TestQueue q : queues)
- {
- if (expected.contains(q))
- {
- assertTrue("Expected " + m + " to be delivered to " + q, m.isInQueue(q));
- //assert m.isInQueue(q) : "Expected " + m + " to be delivered to " + q;
- }
- else
- {
- assertFalse("Did not expect " + m + " to be delivered to " + q, m.isInQueue(q));
- //assert !m.isInQueue(q) : "Did not expect " + m + " to be delivered to " + q;
- }
- }
- }
-
- catch (NoRouteException ex)
- {
- assertTrue("Expected "+m+" not to be returned",expectReturn);
- }
-
- }
-
- static FieldTable getHeaders(String... entries)
- {
- FieldTable headers = FieldTableFactory.newFieldTable();
- for (String s : entries)
- {
- String[] parts = s.split("=", 2);
- headers.put(parts[0], parts.length > 1 ? parts[1] : "");
- }
- return headers;
- }
-
- static class TestQueue extends AMQQueue
- {
- final List<HeadersExchangeTest.Message> messages = new ArrayList<HeadersExchangeTest.Message>();
-
- public TestQueue(String name) throws AMQException
- {
- super(name, false, "test", true, ApplicationRegistry.getInstance().getQueueRegistry());
- }
-
- public void deliver(AMQMessage msg) throws AMQException
- {
- messages.add(new HeadersExchangeTest.Message(msg));
- }
- }
-
- /**
- * Just add some extra utility methods to AMQMessage to aid testing.
- */
- static class Message extends AMQMessage
- {
- private static MessageStore _messageStore = new SkeletonMessageStore();
-
- Message(String id, String... headers) throws AMQException
- {
- this(id, getHeaders(headers));
- }
-
- Message(String id, FieldTable headers) throws AMQException
- {
- this(_messageStore, getMessageTransferBody(id,headers), new ArrayList());
- }
-
- private static MessageTransferBody getMessageTransferBody(String id,FieldTable headers){
- MessageHeaders messageHeaders = new MessageHeaders();
- MessageTransferBody methodBody = MessageTransferBody.createMethodBody(
- (byte)0, (byte)9, // AMQP version (major, minor)
- messageHeaders.getAppId(), // String appId
- headers, // FieldTable applicationHeaders
- new Content(), // Content body
- messageHeaders.getEncoding(), // String contentEncoding
- messageHeaders.getContentType(), // String contentType
- messageHeaders.getCorrelationId(), // String correlationId
- (short)1, // short deliveryMode
- "someExchange", // String destination
- "someExchange", // String exchange
- messageHeaders.getExpiration(), // long expiration
- false, // boolean immediate
- false, // boolean mandatory
- null, // String messageId
- (short)0, // short priority
- false, // boolean redelivered
- messageHeaders.getReplyTo(), // String replyTo
- id, // String routingKey
- new String("abc123").getBytes(), // byte[] securityToken
- 0, // int ticket
- messageHeaders.getTimestamp(), // long timestamp
- messageHeaders.getTransactionId(), // String transactionId
- 0, // long ttl
- messageHeaders.getUserId()); // String userId
-
- return methodBody;
- }
-
- Message(MessageStore store, MessageTransferBody transferBody, List<ByteBuffer> contents) throws AMQException
- {
- super(_messageStore, transferBody, contents);
- }
-
- private Message(AMQMessage msg) throws AMQException
- {
- super(msg);
- }
-
- void route(Exchange exchange) throws AMQException
- {
- exchange.route(this);
- }
-
- boolean isInQueue(TestQueue queue)
- {
- return queue.messages.contains(this);
- }
-
- public int hashCode()
- {
- return getKey().hashCode();
- }
-
- public boolean equals(Object o)
- {
- return o instanceof HeadersExchangeTest.Message && equals((HeadersExchangeTest.Message) o);
- }
-
- private boolean equals(HeadersExchangeTest.Message m)
- {
- return getKey().equals(m.getKey());
- }
-
- public String toString()
- {
- return getKey().toString();
- }
-
- private Object getKey()
- {
- return this.getTransferBody().getRoutingKey();
- }
- }
-}
diff --git a/java/systests/src/test/java/org/apache/qpid/server/queue/ConcurrentDeliveryManagerTest.java b/java/systests/src/test/java/org/apache/qpid/server/queue/ConcurrentDeliveryManagerTest.java
deleted file mode 100644
index 3072d44f48..0000000000
--- a/java/systests/src/test/java/org/apache/qpid/server/queue/ConcurrentDeliveryManagerTest.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.queue;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.server.queue.ConcurrentDeliveryManager;
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.DefaultQueueRegistry;
-import org.apache.qpid.server.queue.DeliveryManagerTest;
-
-public class ConcurrentDeliveryManagerTest extends DeliveryManagerTest
-{
- public ConcurrentDeliveryManagerTest() throws Exception
- {
- try
- {
- System.setProperty("concurrentdeliverymanager","true");
- _mgr = new ConcurrentDeliveryManager(_subscriptions, new AMQQueue("myQ", false, "guest", false,
- new DefaultQueueRegistry()));
- }
- catch (Throwable t)
- {
- t.printStackTrace();
- throw new AMQException("Could not initialise delivery manager", t);
- }
- }
-
- public static junit.framework.Test suite()
- {
- return new junit.framework.TestSuite(ConcurrentDeliveryManagerTest.class);
- }
-}
diff --git a/java/systests/src/test/java/org/apache/qpid/server/queue/MessageTestHelper.java b/java/systests/src/test/java/org/apache/qpid/server/queue/MessageTestHelper.java
deleted file mode 100644
index 3eba217903..0000000000
--- a/java/systests/src/test/java/org/apache/qpid/server/queue/MessageTestHelper.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.queue;
-
-import java.util.ArrayList;
-
-import junit.framework.TestCase;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.client.message.MessageHeaders;
-import org.apache.qpid.framing.Content;
-import org.apache.qpid.framing.MessageTransferBody;
-import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.SkeletonMessageStore;
-import org.apache.qpid.server.util.TestApplicationRegistry;
-
-class MessageTestHelper extends TestCase
-{
- private final MessageStore _messageStore = new SkeletonMessageStore();
-
- MessageTestHelper() throws Exception
- {
- ApplicationRegistry.initialise(new TestApplicationRegistry());
- }
-
- AMQMessage message() throws AMQException
- {
- return message(false);
- }
-
- AMQMessage message(boolean immediate) throws AMQException
- {
- // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
- // TODO: Establish some way to determine the version for the test.
- MessageHeaders messageHeaders = new MessageHeaders();
-
- MessageTransferBody methodBody = MessageTransferBody.createMethodBody(
- (byte)0, (byte)9, // AMQP version (major, minor)
- messageHeaders.getAppId(), // String appId
- messageHeaders.getJMSHeaders(), // FieldTable applicationHeaders
- new Content(), // Content body
- messageHeaders.getEncoding(), // String contentEncoding
- messageHeaders.getContentType(), // String contentType
- messageHeaders.getCorrelationId(), // String correlationId
- (short)1, // short deliveryMode
- "someExchange", // String destination
- "someExchange", // String exchange
- messageHeaders.getExpiration(), // long expiration
- immediate, // boolean immediate
- false, // boolean mandatory
- "", // String messageId
- (short)0, // short priority
- false, // boolean redelivered
- messageHeaders.getReplyTo(), // String replyTo
- "rk", // String routingKey
- new String("abc123").getBytes(), // byte[] securityToken
- 0, // int ticket
- messageHeaders.getTimestamp(), // long timestamp
- messageHeaders.getTransactionId(), // String transactionId
- 0, // long ttl
- messageHeaders.getUserId()); // String userId
-
- return new AMQMessage(_messageStore, methodBody, new ArrayList());
- }
-
-}
diff --git a/java/systests/src/test/java/org/apache/qpid/server/queue/SynchronizedDeliveryManagerTest.java b/java/systests/src/test/java/org/apache/qpid/server/queue/SynchronizedDeliveryManagerTest.java
deleted file mode 100644
index ebe8e192a0..0000000000
--- a/java/systests/src/test/java/org/apache/qpid/server/queue/SynchronizedDeliveryManagerTest.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.queue;
-
-import org.apache.qpid.server.queue.SynchronizedDeliveryManager;
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.DefaultQueueRegistry;
-import org.apache.qpid.server.queue.DeliveryManagerTest;
-import org.apache.qpid.AMQException;
-
-import junit.framework.TestSuite;
-
-public class SynchronizedDeliveryManagerTest extends DeliveryManagerTest
-{
- public SynchronizedDeliveryManagerTest() throws Exception
- {
- try
- {
- System.setProperty("concurrentdeliverymanager","false");
- _mgr = new SynchronizedDeliveryManager(_subscriptions, new AMQQueue("myQ", false, "guest", false,
- new DefaultQueueRegistry()));
- }
- catch (Throwable t)
- {
- t.printStackTrace();
- throw new AMQException("Could not initialise delivery manager", t);
- }
- }
-
- public static junit.framework.Test suite()
- {
- TestSuite suite = new TestSuite();
- suite.addTestSuite(SynchronizedDeliveryManagerTest.class);
- return suite;
- }
-}
diff --git a/java/systests/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java b/java/systests/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java
deleted file mode 100644
index f162506fed..0000000000
--- a/java/systests/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.store;
-
-import org.apache.qpid.server.queue.AMQMessage;
-import org.apache.qpid.AMQException;
-
-import junit.framework.TestCase;
-
-/**
- * Tests that reference counting works correctly with AMQMessage and the message store
- */
-public class TestReferenceCounting extends TestCase
-{
- private TestableMemoryMessageStore _store;
-
- protected void setUp() throws Exception
- {
- super.setUp();
- _store = new TestableMemoryMessageStore();
- }
-
- /**
- * Check that when the reference count is decremented the message removes itself from the store
- */
- public void testMessageGetsRemoved() throws AMQException
- {
- AMQMessage message = new AMQMessage(_store, null);
- _store.put(message);
- assertTrue(_store.getMessageMap().size() == 1);
- message.decrementReference();
- assertTrue(_store.getMessageMap().size() == 0);
- }
-
- public void testMessageRemains() throws AMQException
- {
- AMQMessage message = new AMQMessage(_store, null);
- _store.put(message);
- assertTrue(_store.getMessageMap().size() == 1);
- message.incrementReference();
- message.decrementReference();
- assertTrue(_store.getMessageMap().size() == 1);
- }
-
- public static junit.framework.Test suite()
- {
- return new junit.framework.TestSuite(TestReferenceCounting.class);
- }
-}