From 2e24b12c60f054db7c37287dbe50c92769923416 Mon Sep 17 00:00:00 2001 From: Robert Godfrey Date: Sat, 13 Jul 2013 18:43:28 +0000 Subject: QPID-4983 : [Java Broker] Move store implementations to broker plugins git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1502835 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/java/bdbstore/build.xml | 2 +- qpid/java/bdbstore/jmx/build.xml | 2 +- .../store/berkeleydb/BDBMessageStoreTest.java | 5 +- qpid/java/broker-plugins/derby-store/build.xml | 1 + ...g.apache.qpid.server.plugin.MessageStoreFactory | 19 + ...g.apache.qpid.server.plugin.MessageStoreFactory | 19 - qpid/java/broker-plugins/jdbc-store/build.xml | 1 + .../resources/virtualhost/store/pool/none/add.html | 0 ...g.apache.qpid.server.plugin.MessageStoreFactory | 19 + ...g.apache.qpid.server.plugin.MessageStoreFactory | 19 - .../management/virtualhost/store/memory/add.js | 56 -- .../resources/virtualhost/store/memory/add.html | 0 .../resources/virtualhost/store/pool/none/add.html | 0 qpid/java/broker-plugins/memory-store/build.xml | 32 + .../qpid/server/store/MemoryMessageStore.java | 34 + .../server/store/MemoryMessageStoreFactory.java | 53 ++ .../management/virtualhost/store/memory/add.js | 56 ++ .../resources/virtualhost/store/memory/add.html | 0 ...g.apache.qpid.server.plugin.MessageStoreFactory | 19 + .../configuration/VirtualHostConfiguration.java | 3 +- .../server/store/AbstractMemoryMessageStore.java | 141 +++ .../qpid/server/store/MemoryMessageStore.java | 150 ---- .../server/store/MemoryMessageStoreFactory.java | 53 -- .../qpid/server/store/MessageStoreCreator.java | 3 +- .../virtualhost/StandardVirtualHostFactory.java | 17 - ...g.apache.qpid.server.plugin.MessageStoreFactory | 19 - .../qpid/server/ExtractResendAndRequeueTest.java | 4 +- .../startup/VirtualHostRecovererTest.java | 2 +- .../qpid/server/exchange/TopicExchangeTest.java | 4 +- .../apache/qpid/server/model/VirtualHostTest.java | 7 +- .../apache/qpid/server/store/MessageStoreTest.java | 948 --------------------- .../qpid/server/store/TestMemoryMessageStore.java | 34 + .../store/TestMemoryMessageStoreFactory.java | 54 ++ .../server/store/TestableMemoryMessageStore.java | 2 +- .../virtualhost/StandardVirtualHostTest.java | 11 +- ...g.apache.qpid.server.plugin.MessageStoreFactory | 19 + qpid/java/build.deps | 1 + qpid/java/perftests/build.xml | 2 +- qpid/java/systests/build.xml | 2 +- .../apache/qpid/server/store/MessageStoreTest.java | 948 +++++++++++++++++++++ .../apache/qpid/server/store/SlowMessageStore.java | 4 +- .../apache/qpid/test/utils/QpidBrokerTestCase.java | 7 +- 42 files changed, 1461 insertions(+), 1311 deletions(-) create mode 100644 qpid/java/broker-plugins/derby-store/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.MessageStoreFactory delete mode 100644 qpid/java/broker-plugins/derby-store/src/main/resources/services/org.apache.qpid.server.plugin.MessageStoreFactory create mode 100644 qpid/java/broker-plugins/jdbc-store/src/main/java/resources/virtualhost/store/pool/none/add.html create mode 100644 qpid/java/broker-plugins/jdbc-store/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.MessageStoreFactory delete mode 100644 qpid/java/broker-plugins/jdbc-store/src/main/resources/services/org.apache.qpid.server.plugin.MessageStoreFactory delete mode 100644 qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/virtualhost/store/memory/add.js delete mode 100644 qpid/java/broker-plugins/management-http/src/main/java/resources/virtualhost/store/memory/add.html delete mode 100644 qpid/java/broker-plugins/management-http/src/main/java/resources/virtualhost/store/pool/none/add.html create mode 100644 qpid/java/broker-plugins/memory-store/build.xml create mode 100644 qpid/java/broker-plugins/memory-store/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java create mode 100644 qpid/java/broker-plugins/memory-store/src/main/java/org/apache/qpid/server/store/MemoryMessageStoreFactory.java create mode 100644 qpid/java/broker-plugins/memory-store/src/main/java/resources/js/qpid/management/virtualhost/store/memory/add.js create mode 100644 qpid/java/broker-plugins/memory-store/src/main/java/resources/virtualhost/store/memory/add.html create mode 100644 qpid/java/broker-plugins/memory-store/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.MessageStoreFactory create mode 100644 qpid/java/broker/src/main/java/org/apache/qpid/server/store/AbstractMemoryMessageStore.java delete mode 100644 qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java delete mode 100644 qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStoreFactory.java delete mode 100644 qpid/java/broker/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.MessageStoreFactory delete mode 100644 qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java create mode 100644 qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestMemoryMessageStore.java create mode 100644 qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestMemoryMessageStoreFactory.java create mode 100644 qpid/java/broker/src/test/resources/META-INF/services/org.apache.qpid.server.plugin.MessageStoreFactory create mode 100644 qpid/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java (limited to 'qpid/java') diff --git a/qpid/java/bdbstore/build.xml b/qpid/java/bdbstore/build.xml index 1a749fab39..67c30787d3 100644 --- a/qpid/java/bdbstore/build.xml +++ b/qpid/java/bdbstore/build.xml @@ -18,7 +18,7 @@ --> - + diff --git a/qpid/java/bdbstore/jmx/build.xml b/qpid/java/bdbstore/jmx/build.xml index d3e9f63b46..5f3654c6c5 100644 --- a/qpid/java/bdbstore/jmx/build.xml +++ b/qpid/java/bdbstore/jmx/build.xml @@ -18,7 +18,7 @@ --> - + diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java index e77119b140..fbfdf78bd2 100644 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java @@ -39,7 +39,7 @@ import org.apache.qpid.server.message.MessageMetaData_0_10; import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.model.UUIDGenerator; -import org.apache.qpid.server.model.VirtualHost; +import org.apache.qpid.server.store.MessageStoreTest; import org.apache.qpid.server.store.MessageMetaDataType; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.StorableMessageMetaData; @@ -57,13 +57,12 @@ import org.apache.qpid.transport.MessageTransfer; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; /** * Subclass of MessageStoreTest which runs the standard tests from the superclass against * the BDB Store as well as additional tests specific to the BDB store-implementation. */ -public class BDBMessageStoreTest extends org.apache.qpid.server.store.MessageStoreTest +public class BDBMessageStoreTest extends MessageStoreTest { private static byte[] CONTENT_BYTES = new byte[] {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}; diff --git a/qpid/java/broker-plugins/derby-store/build.xml b/qpid/java/broker-plugins/derby-store/build.xml index e93b81aad7..be3d72f059 100644 --- a/qpid/java/broker-plugins/derby-store/build.xml +++ b/qpid/java/broker-plugins/derby-store/build.xml @@ -22,6 +22,7 @@ + diff --git a/qpid/java/broker-plugins/derby-store/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.MessageStoreFactory b/qpid/java/broker-plugins/derby-store/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.MessageStoreFactory new file mode 100644 index 0000000000..88ca1fed5e --- /dev/null +++ b/qpid/java/broker-plugins/derby-store/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.MessageStoreFactory @@ -0,0 +1,19 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +org.apache.qpid.server.store.derby.DerbyMessageStoreFactory diff --git a/qpid/java/broker-plugins/derby-store/src/main/resources/services/org.apache.qpid.server.plugin.MessageStoreFactory b/qpid/java/broker-plugins/derby-store/src/main/resources/services/org.apache.qpid.server.plugin.MessageStoreFactory deleted file mode 100644 index 88ca1fed5e..0000000000 --- a/qpid/java/broker-plugins/derby-store/src/main/resources/services/org.apache.qpid.server.plugin.MessageStoreFactory +++ /dev/null @@ -1,19 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# -org.apache.qpid.server.store.derby.DerbyMessageStoreFactory diff --git a/qpid/java/broker-plugins/jdbc-store/build.xml b/qpid/java/broker-plugins/jdbc-store/build.xml index de6ec59845..9b6aeb32d6 100644 --- a/qpid/java/broker-plugins/jdbc-store/build.xml +++ b/qpid/java/broker-plugins/jdbc-store/build.xml @@ -22,6 +22,7 @@ + diff --git a/qpid/java/broker-plugins/jdbc-store/src/main/java/resources/virtualhost/store/pool/none/add.html b/qpid/java/broker-plugins/jdbc-store/src/main/java/resources/virtualhost/store/pool/none/add.html new file mode 100644 index 0000000000..e69de29bb2 diff --git a/qpid/java/broker-plugins/jdbc-store/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.MessageStoreFactory b/qpid/java/broker-plugins/jdbc-store/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.MessageStoreFactory new file mode 100644 index 0000000000..a77458f27d --- /dev/null +++ b/qpid/java/broker-plugins/jdbc-store/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.MessageStoreFactory @@ -0,0 +1,19 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +org.apache.qpid.server.store.jdbc.JDBCMessageStoreFactory diff --git a/qpid/java/broker-plugins/jdbc-store/src/main/resources/services/org.apache.qpid.server.plugin.MessageStoreFactory b/qpid/java/broker-plugins/jdbc-store/src/main/resources/services/org.apache.qpid.server.plugin.MessageStoreFactory deleted file mode 100644 index a77458f27d..0000000000 --- a/qpid/java/broker-plugins/jdbc-store/src/main/resources/services/org.apache.qpid.server.plugin.MessageStoreFactory +++ /dev/null @@ -1,19 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# -org.apache.qpid.server.store.jdbc.JDBCMessageStoreFactory diff --git a/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/virtualhost/store/memory/add.js b/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/virtualhost/store/memory/add.js deleted file mode 100644 index 3a9b23274d..0000000000 --- a/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/virtualhost/store/memory/add.js +++ /dev/null @@ -1,56 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -define(["dojo/_base/xhr", - "dojo/dom", - "dojo/dom-construct", - "dojo/_base/window", - "dijit/registry", - "dojo/parser", - "dojo/_base/array", - "dojo/_base/event", - "dojo/_base/json", - "dojo/string", - "dojo/store/Memory", - "dijit/form/FilteringSelect", - "dojo/domReady!"], - function (xhr, dom, construct, win, registry, parser, array, event, json, string, Memory, FilteringSelect) { - return { - show: function() { - var node = dom.byId("addVirtualHost.storeSpecificDiv"); - var that = this; - - array.forEach(registry.toArray(), - function(item) { - if(item.id.substr(0,33) == "formAddVirtualHost.specific.store") { - item.destroyRecursive(); - } - }); - - xhr.get({url: "virtualhost/store/memory/add.html", - sync: true, - load: function(data) { - node.innerHTML = data; - parser.parse(node); - - }}); - } - }; - }); diff --git a/qpid/java/broker-plugins/management-http/src/main/java/resources/virtualhost/store/memory/add.html b/qpid/java/broker-plugins/management-http/src/main/java/resources/virtualhost/store/memory/add.html deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/qpid/java/broker-plugins/management-http/src/main/java/resources/virtualhost/store/pool/none/add.html b/qpid/java/broker-plugins/management-http/src/main/java/resources/virtualhost/store/pool/none/add.html deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/qpid/java/broker-plugins/memory-store/build.xml b/qpid/java/broker-plugins/memory-store/build.xml new file mode 100644 index 0000000000..f265e68e94 --- /dev/null +++ b/qpid/java/broker-plugins/memory-store/build.xml @@ -0,0 +1,32 @@ + + + + + + + + + + + + + + + diff --git a/qpid/java/broker-plugins/memory-store/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java b/qpid/java/broker-plugins/memory-store/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java new file mode 100644 index 0000000000..61fef91e83 --- /dev/null +++ b/qpid/java/broker-plugins/memory-store/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java @@ -0,0 +1,34 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store; + + +/** A simple message store that stores the messages in a thread-safe structure in memory. */ +public class MemoryMessageStore extends AbstractMemoryMessageStore +{ + public static final String TYPE = "Memory"; + + @Override + public String getStoreType() + { + return TYPE; + } +} diff --git a/qpid/java/broker-plugins/memory-store/src/main/java/org/apache/qpid/server/store/MemoryMessageStoreFactory.java b/qpid/java/broker-plugins/memory-store/src/main/java/org/apache/qpid/server/store/MemoryMessageStoreFactory.java new file mode 100644 index 0000000000..49f823e7ee --- /dev/null +++ b/qpid/java/broker-plugins/memory-store/src/main/java/org/apache/qpid/server/store/MemoryMessageStoreFactory.java @@ -0,0 +1,53 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store; + +import java.util.Collections; +import java.util.Map; +import org.apache.commons.configuration.Configuration; +import org.apache.qpid.server.plugin.MessageStoreFactory; + +public class MemoryMessageStoreFactory implements MessageStoreFactory +{ + + @Override + public String getType() + { + return MemoryMessageStore.TYPE; + } + + @Override + public MessageStore createMessageStore() + { + return new MemoryMessageStore(); + } + + @Override + public Map convertStoreConfiguration(Configuration configuration) + { + return Collections.emptyMap(); + } + + @Override + public void validateAttributes(Map attributes) + { + } +} diff --git a/qpid/java/broker-plugins/memory-store/src/main/java/resources/js/qpid/management/virtualhost/store/memory/add.js b/qpid/java/broker-plugins/memory-store/src/main/java/resources/js/qpid/management/virtualhost/store/memory/add.js new file mode 100644 index 0000000000..3a9b23274d --- /dev/null +++ b/qpid/java/broker-plugins/memory-store/src/main/java/resources/js/qpid/management/virtualhost/store/memory/add.js @@ -0,0 +1,56 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +define(["dojo/_base/xhr", + "dojo/dom", + "dojo/dom-construct", + "dojo/_base/window", + "dijit/registry", + "dojo/parser", + "dojo/_base/array", + "dojo/_base/event", + "dojo/_base/json", + "dojo/string", + "dojo/store/Memory", + "dijit/form/FilteringSelect", + "dojo/domReady!"], + function (xhr, dom, construct, win, registry, parser, array, event, json, string, Memory, FilteringSelect) { + return { + show: function() { + var node = dom.byId("addVirtualHost.storeSpecificDiv"); + var that = this; + + array.forEach(registry.toArray(), + function(item) { + if(item.id.substr(0,33) == "formAddVirtualHost.specific.store") { + item.destroyRecursive(); + } + }); + + xhr.get({url: "virtualhost/store/memory/add.html", + sync: true, + load: function(data) { + node.innerHTML = data; + parser.parse(node); + + }}); + } + }; + }); diff --git a/qpid/java/broker-plugins/memory-store/src/main/java/resources/virtualhost/store/memory/add.html b/qpid/java/broker-plugins/memory-store/src/main/java/resources/virtualhost/store/memory/add.html new file mode 100644 index 0000000000..e69de29bb2 diff --git a/qpid/java/broker-plugins/memory-store/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.MessageStoreFactory b/qpid/java/broker-plugins/memory-store/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.MessageStoreFactory new file mode 100644 index 0000000000..02f22eb21a --- /dev/null +++ b/qpid/java/broker-plugins/memory-store/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.MessageStoreFactory @@ -0,0 +1,19 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +org.apache.qpid.server.store.MemoryMessageStoreFactory diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java index 041ccf1f50..189f5916e0 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java @@ -26,7 +26,6 @@ import org.apache.commons.configuration.ConfigurationException; import org.apache.qpid.server.configuration.plugins.AbstractConfiguration; import org.apache.qpid.server.model.Broker; -import org.apache.qpid.server.store.MemoryMessageStore; import java.io.File; import java.util.HashMap; @@ -129,7 +128,7 @@ public class VirtualHostConfiguration extends AbstractConfiguration public String getMessageStoreClass() { - return getStringValue("store.class", MemoryMessageStore.class.getName()); + return getStringValue("store.class", null); } public void setMessageStoreClass(String storeFactoryClass) diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/AbstractMemoryMessageStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/AbstractMemoryMessageStore.java new file mode 100644 index 0000000000..ac95d9fdb3 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/AbstractMemoryMessageStore.java @@ -0,0 +1,141 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store; + +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import org.apache.qpid.AMQStoreException; +import org.apache.qpid.server.message.EnqueableMessage; +import org.apache.qpid.server.model.VirtualHost; + +/** A simple message store that stores the messages in a thread-safe structure in memory. */ +abstract public class AbstractMemoryMessageStore extends NullMessageStore +{ + private final AtomicLong _messageId = new AtomicLong(1); + private final AtomicBoolean _closed = new AtomicBoolean(false); + + private static final Transaction IN_MEMORY_TRANSACTION = new Transaction() + { + @Override + public StoreFuture commitTranAsync() throws AMQStoreException + { + return StoreFuture.IMMEDIATE_FUTURE; + } + + @Override + public void enqueueMessage(TransactionLogResource queue, EnqueableMessage message) throws AMQStoreException + { + } + + @Override + public void dequeueMessage(TransactionLogResource queue, EnqueableMessage message) throws AMQStoreException + { + } + + @Override + public void commitTran() throws AMQStoreException + { + } + + @Override + public void abortTran() throws AMQStoreException + { + } + + @Override + public void removeXid(long format, byte[] globalId, byte[] branchId) + { + } + + @Override + public void recordXid(long format, byte[] globalId, byte[] branchId, Record[] enqueues, Record[] dequeues) + { + } + }; + + private final StateManager _stateManager; + private final EventManager _eventManager = new EventManager(); + + public AbstractMemoryMessageStore() + { + _stateManager = new StateManager(_eventManager); + } + + @Override + public void configureConfigStore(String name, + ConfigurationRecoveryHandler recoveryHandler, + VirtualHost virtualHost) throws Exception + { + _stateManager.attainState(State.INITIALISING); + } + + @Override + public void configureMessageStore(String name, + MessageStoreRecoveryHandler recoveryHandler, + TransactionLogRecoveryHandler tlogRecoveryHandler) throws Exception + { + _stateManager.attainState(State.INITIALISED); + } + + @Override + public void activate() throws Exception + { + _stateManager.attainState(State.ACTIVATING); + + _stateManager.attainState(State.ACTIVE); + } + + @Override + public StoredMessage addMessage(StorableMessageMetaData metaData) + { + final long id = _messageId.getAndIncrement(); + StoredMemoryMessage message = new StoredMemoryMessage(id, metaData); + + return message; + } + + @Override + public Transaction newTransaction() + { + return IN_MEMORY_TRANSACTION; + } + + @Override + public boolean isPersistent() + { + return false; + } + + @Override + public void close() throws Exception + { + _stateManager.attainState(State.CLOSING); + _closed.getAndSet(true); + _stateManager.attainState(State.CLOSED); + } + + @Override + public void addEventListener(EventListener eventListener, Event... events) + { + _eventManager.addEventListener(eventListener, events); + } + +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java deleted file mode 100644 index b7372828e1..0000000000 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java +++ /dev/null @@ -1,150 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.store; - -import java.util.Map; -import java.util.UUID; -import org.apache.qpid.AMQStoreException; -import org.apache.qpid.server.message.EnqueableMessage; - -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; -import org.apache.qpid.server.model.VirtualHost; - -/** A simple message store that stores the messages in a thread-safe structure in memory. */ -public class MemoryMessageStore extends NullMessageStore -{ - public static final String TYPE = "Memory"; - private final AtomicLong _messageId = new AtomicLong(1); - private final AtomicBoolean _closed = new AtomicBoolean(false); - - private static final Transaction IN_MEMORY_TRANSACTION = new Transaction() - { - @Override - public StoreFuture commitTranAsync() throws AMQStoreException - { - return StoreFuture.IMMEDIATE_FUTURE; - } - - @Override - public void enqueueMessage(TransactionLogResource queue, EnqueableMessage message) throws AMQStoreException - { - } - - @Override - public void dequeueMessage(TransactionLogResource queue, EnqueableMessage message) throws AMQStoreException - { - } - - @Override - public void commitTran() throws AMQStoreException - { - } - - @Override - public void abortTran() throws AMQStoreException - { - } - - @Override - public void removeXid(long format, byte[] globalId, byte[] branchId) - { - } - - @Override - public void recordXid(long format, byte[] globalId, byte[] branchId, Record[] enqueues, Record[] dequeues) - { - } - }; - - private final StateManager _stateManager; - private final EventManager _eventManager = new EventManager(); - - public MemoryMessageStore() - { - _stateManager = new StateManager(_eventManager); - } - - @Override - public void configureConfigStore(String name, - ConfigurationRecoveryHandler recoveryHandler, - VirtualHost virtualHost) throws Exception - { - _stateManager.attainState(State.INITIALISING); - } - - @Override - public void configureMessageStore(String name, - MessageStoreRecoveryHandler recoveryHandler, - TransactionLogRecoveryHandler tlogRecoveryHandler) throws Exception - { - _stateManager.attainState(State.INITIALISED); - } - - @Override - public void activate() throws Exception - { - _stateManager.attainState(State.ACTIVATING); - - _stateManager.attainState(State.ACTIVE); - } - - @Override - public StoredMessage addMessage(StorableMessageMetaData metaData) - { - final long id = _messageId.getAndIncrement(); - StoredMemoryMessage message = new StoredMemoryMessage(id, metaData); - - return message; - } - - @Override - public Transaction newTransaction() - { - return IN_MEMORY_TRANSACTION; - } - - @Override - public boolean isPersistent() - { - return false; - } - - @Override - public void close() throws Exception - { - _stateManager.attainState(State.CLOSING); - _closed.getAndSet(true); - _stateManager.attainState(State.CLOSED); - } - - @Override - public void addEventListener(EventListener eventListener, Event... events) - { - _eventManager.addEventListener(eventListener, events); - } - - @Override - public String getStoreType() - { - return TYPE; - } -} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStoreFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStoreFactory.java deleted file mode 100644 index 49f823e7ee..0000000000 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStoreFactory.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.store; - -import java.util.Collections; -import java.util.Map; -import org.apache.commons.configuration.Configuration; -import org.apache.qpid.server.plugin.MessageStoreFactory; - -public class MemoryMessageStoreFactory implements MessageStoreFactory -{ - - @Override - public String getType() - { - return MemoryMessageStore.TYPE; - } - - @Override - public MessageStore createMessageStore() - { - return new MemoryMessageStore(); - } - - @Override - public Map convertStoreConfiguration(Configuration configuration) - { - return Collections.emptyMap(); - } - - @Override - public void validateAttributes(Map attributes) - { - } -} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStoreCreator.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStoreCreator.java index fe7dd81e0c..a8013b8e9b 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStoreCreator.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStoreCreator.java @@ -61,7 +61,8 @@ public class MessageStoreCreator MessageStoreFactory factory = _factories.get(storeType.toLowerCase()); if (factory == null) { - throw new IllegalConfigurationException("Unknown store type: " + storeType); + throw new IllegalConfigurationException("Unknown store type: " + storeType + + ". Supported types: " + _factories.keySet()); } return factory.createMessageStore(); } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHostFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHostFactory.java index 2b4cc37814..08f35c08f9 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHostFactory.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHostFactory.java @@ -19,20 +19,14 @@ package org.apache.qpid.server.virtualhost;/* * */ -import java.util.Collections; -import java.util.HashMap; -import java.util.Iterator; import java.util.LinkedHashMap; -import java.util.List; import java.util.Map; import org.apache.commons.configuration.Configuration; -import org.apache.commons.configuration.ConfigurationException; import org.apache.qpid.server.configuration.VirtualHostConfiguration; import org.apache.qpid.server.model.adapter.VirtualHostAdapter; import org.apache.qpid.server.plugin.MessageStoreFactory; import org.apache.qpid.server.plugin.VirtualHostFactory; import org.apache.qpid.server.stats.StatisticsGatherer; -import org.apache.qpid.server.store.MemoryMessageStore; import org.apache.qpid.server.store.MessageStoreConstants; import org.apache.qpid.server.store.MessageStoreCreator; @@ -89,17 +83,6 @@ public class StandardVirtualHostFactory implements VirtualHostFactory factory.validateAttributes(attributes); } } - // TODO - each store type should validate its own attributes - if(!((String) storeType).equalsIgnoreCase(MemoryMessageStore.TYPE)) - { - /* Object storePath = attributes.get(STORE_PATH_ATTRIBUTE); - if(!(storePath instanceof String)) - { - throw new IllegalArgumentException("Attribute '"+ STORE_PATH_ATTRIBUTE - +"' is required and must be of type String."); - - }*/ - } } diff --git a/qpid/java/broker/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.MessageStoreFactory b/qpid/java/broker/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.MessageStoreFactory deleted file mode 100644 index 02f22eb21a..0000000000 --- a/qpid/java/broker/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.MessageStoreFactory +++ /dev/null @@ -1,19 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# -org.apache.qpid.server.store.MemoryMessageStoreFactory diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/ExtractResendAndRequeueTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/ExtractResendAndRequeueTest.java index 616ee74b2d..9756cdfd55 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/ExtractResendAndRequeueTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/ExtractResendAndRequeueTest.java @@ -31,8 +31,8 @@ import org.apache.qpid.server.queue.MockAMQQueue; import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.queue.QueueEntryIterator; import org.apache.qpid.server.queue.SimpleQueueEntryList; -import org.apache.qpid.server.store.MemoryMessageStore; import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.TestMemoryMessageStore; import org.apache.qpid.server.subscription.MockSubscription; import org.apache.qpid.server.subscription.Subscription; @@ -63,7 +63,7 @@ public class ExtractResendAndRequeueTest extends TestCase private UnacknowledgedMessageMapImpl _unacknowledgedMessageMap; private static final int INITIAL_MSG_COUNT = 10; private AMQQueue _queue = new MockAMQQueue(getName()); - private MessageStore _messageStore = new MemoryMessageStore(); + private MessageStore _messageStore = new TestMemoryMessageStore(); private LinkedList _referenceList = new LinkedList(); @Override diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/startup/VirtualHostRecovererTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/startup/VirtualHostRecovererTest.java index 042abca9c4..f00d12b77d 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/startup/VirtualHostRecovererTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/startup/VirtualHostRecovererTest.java @@ -78,7 +78,7 @@ public class VirtualHostRecovererTest extends TestCase attributes.put(VirtualHost.NAME, getName()); attributes.put(VirtualHost.TYPE, StandardVirtualHostFactory.TYPE); - attributes.put(VirtualHost.STORE_TYPE, "MEMORY"); + attributes.put(VirtualHost.STORE_TYPE, "TESTMEMORY"); when(entry.getAttributes()).thenReturn(attributes); VirtualHost host = recoverer.create(null, entry, parent); diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java index f1bf632235..3ee2345cee 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java @@ -35,8 +35,8 @@ import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.AMQQueueFactory; import org.apache.qpid.server.queue.BaseQueue; import org.apache.qpid.server.queue.IncomingMessage; -import org.apache.qpid.server.store.MemoryMessageStore; import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.TestMemoryMessageStore; import org.apache.qpid.server.util.BrokerTestHelper; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.test.utils.QpidTestCase; @@ -56,7 +56,7 @@ public class TopicExchangeTest extends QpidTestCase BrokerTestHelper.setUp(); _exchange = new TopicExchange(); _vhost = BrokerTestHelper.createVirtualHost(getName()); - _store = new MemoryMessageStore(); + _store = new TestMemoryMessageStore(); } @Override diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java index 05d5d75864..ce213ee582 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java @@ -38,9 +38,8 @@ import org.apache.qpid.server.logging.SystemOutMessageLogger; import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.logging.actors.TestLogActor; import org.apache.qpid.server.stats.StatisticsGatherer; -import org.apache.qpid.server.store.MemoryMessageStore; +import org.apache.qpid.server.store.TestMemoryMessageStore; import org.apache.qpid.server.util.BrokerTestHelper; -import org.apache.qpid.server.virtualhost.StandardVirtualHost; import org.apache.qpid.server.virtualhost.StandardVirtualHostFactory; public class VirtualHostTest extends TestCase @@ -92,7 +91,7 @@ public class VirtualHostTest extends TestCase Map attributes = new HashMap(); attributes.put(VirtualHost.NAME, getName()); attributes.put(VirtualHost.TYPE, StandardVirtualHostFactory.TYPE); - attributes.put(VirtualHost.STORE_TYPE, MemoryMessageStore.TYPE); + attributes.put(VirtualHost.STORE_TYPE, TestMemoryMessageStore.TYPE); attributes.put(VirtualHost.STATE, State.QUIESCED); VirtualHost host = createHost(attributes); @@ -131,7 +130,7 @@ public class VirtualHostTest extends TestCase Map attributes = new HashMap(); attributes.put(VirtualHost.NAME, getName()); attributes.put(VirtualHost.TYPE, StandardVirtualHostFactory.TYPE); - attributes.put(VirtualHost.STORE_TYPE, MemoryMessageStore.TYPE); + attributes.put(VirtualHost.STORE_TYPE, TestMemoryMessageStore.TYPE); VirtualHost host = createHost(attributes); return host; diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java deleted file mode 100644 index 8b678c4eb4..0000000000 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java +++ /dev/null @@ -1,948 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.store; - - -import java.util.ArrayList; -import java.util.Collection; -import org.apache.commons.configuration.PropertiesConfiguration; - -import org.apache.qpid.AMQException; -import org.apache.qpid.common.AMQPFilterTypes; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.BasicContentHeaderProperties; -import org.apache.qpid.framing.ContentHeaderBody; -import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.framing.abstraction.MessagePublishInfo; -import org.apache.qpid.framing.amqp_8_0.BasicConsumeBodyImpl; -import org.apache.qpid.server.binding.Binding; -import org.apache.qpid.server.configuration.VirtualHostConfiguration; -import org.apache.qpid.server.exchange.DirectExchange; -import org.apache.qpid.server.exchange.Exchange; -import org.apache.qpid.server.exchange.ExchangeRegistry; -import org.apache.qpid.server.exchange.TopicExchange; -import org.apache.qpid.server.message.AMQMessage; -import org.apache.qpid.server.message.MessageMetaData; -import org.apache.qpid.server.model.Broker; -import org.apache.qpid.server.model.UUIDGenerator; -import org.apache.qpid.server.plugin.ExchangeType; -import org.apache.qpid.server.queue.AMQPriorityQueue; -import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.queue.AMQQueueFactory; -import org.apache.qpid.server.queue.BaseQueue; -import org.apache.qpid.server.queue.ConflationQueue; -import org.apache.qpid.server.queue.IncomingMessage; -import org.apache.qpid.server.queue.QueueRegistry; -import org.apache.qpid.server.queue.SimpleAMQQueue; -import org.apache.qpid.server.txn.AutoCommitTransaction; -import org.apache.qpid.server.txn.ServerTransaction; -import org.apache.qpid.server.util.BrokerTestHelper; -import org.apache.qpid.server.virtualhost.VirtualHost; -import org.apache.qpid.test.utils.QpidTestCase; -import org.apache.qpid.util.FileUtils; - -import java.io.File; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -/** - * This tests the MessageStores by using the available interfaces. - * - * For persistent stores, it validates that Exchanges, Queues, Bindings and - * Messages are persisted and recovered correctly. - */ -public class MessageStoreTest extends QpidTestCase -{ - public static final int DEFAULT_PRIORTY_LEVEL = 5; - public static final String SELECTOR_VALUE = "Test = 'MST'"; - public static final String LVQ_KEY = "MST-LVQ-KEY"; - - private String nonDurableExchangeName = "MST-NonDurableDirectExchange"; - private String directExchangeName = "MST-DirectExchange"; - private String topicExchangeName = "MST-TopicExchange"; - - private AMQShortString durablePriorityTopicQueueName = new AMQShortString("MST-PriorityTopicQueue-Durable"); - private AMQShortString durableTopicQueueName = new AMQShortString("MST-TopicQueue-Durable"); - private AMQShortString priorityTopicQueueName = new AMQShortString("MST-PriorityTopicQueue"); - private AMQShortString topicQueueName = new AMQShortString("MST-TopicQueue"); - - private AMQShortString durableExclusiveQueueName = new AMQShortString("MST-Queue-Durable-Exclusive"); - private AMQShortString durablePriorityQueueName = new AMQShortString("MST-PriorityQueue-Durable"); - private AMQShortString durableLastValueQueueName = new AMQShortString("MST-LastValueQueue-Durable"); - private AMQShortString durableQueueName = new AMQShortString("MST-Queue-Durable"); - private AMQShortString priorityQueueName = new AMQShortString("MST-PriorityQueue"); - private AMQShortString queueName = new AMQShortString("MST-Queue"); - - private AMQShortString directRouting = new AMQShortString("MST-direct"); - private AMQShortString topicRouting = new AMQShortString("MST-topic"); - - private AMQShortString queueOwner = new AMQShortString("MST"); - - private PropertiesConfiguration _config; - - private VirtualHost _virtualHost; - private org.apache.qpid.server.model.VirtualHost _virtualHostModel; - private Broker _broker; - private String _storePath; - - public void setUp() throws Exception - { - super.setUp(); - BrokerTestHelper.setUp(); - - _storePath = System.getProperty("QPID_WORK") + File.separator + getName(); - - _config = new PropertiesConfiguration(); - _config.addProperty("store.class", getTestProfileMessageStoreClassName()); - _config.addProperty("store.environment-path", _storePath); - _virtualHostModel = mock(org.apache.qpid.server.model.VirtualHost.class); - when(_virtualHostModel.getAttribute(eq(org.apache.qpid.server.model.VirtualHost.STORE_PATH))).thenReturn(_storePath); - - - - cleanup(new File(_storePath)); - - _broker = BrokerTestHelper.createBrokerMock(); - - reloadVirtualHost(); - } - - protected String getStorePath() - { - return _storePath; - } - - protected org.apache.qpid.server.model.VirtualHost getVirtualHostModel() - { - return _virtualHostModel; - } - - @Override - public void tearDown() throws Exception - { - try - { - if (_virtualHost != null) - { - _virtualHost.close(); - } - } - finally - { - BrokerTestHelper.tearDown(); - super.tearDown(); - } - } - - public VirtualHost getVirtualHost() - { - return _virtualHost; - } - - public PropertiesConfiguration getConfig() - { - return _config; - } - - protected void reloadVirtualHost() - { - VirtualHost original = getVirtualHost(); - - if (getVirtualHost() != null) - { - try - { - getVirtualHost().close(); - } - catch (Exception e) - { - e.printStackTrace(); - fail(e.getMessage()); - } - } - - try - { - _virtualHost = BrokerTestHelper.createVirtualHost(new VirtualHostConfiguration(getClass().getName(), _config, _broker),null,getVirtualHostModel()); - } - catch (Exception e) - { - e.printStackTrace(); - fail(e.getMessage()); - } - - assertTrue("Virtualhost has not changed, reload was not successful", original != getVirtualHost()); - } - - /** - * Old MessageStoreTest segment which runs against both persistent and non-persistent stores - * creating queues, exchanges and bindings and then verifying message delivery to them. - */ - public void testQueueExchangeAndBindingCreation() throws Exception - { - assertEquals("Should not be any existing queues", 0, getVirtualHost().getQueueRegistry().getQueues().size()); - - createAllQueues(); - createAllTopicQueues(); - - //Register Non-Durable DirectExchange - Exchange nonDurableExchange = createExchange(DirectExchange.TYPE, nonDurableExchangeName, false); - bindAllQueuesToExchange(nonDurableExchange, directRouting); - - //Register DirectExchange - Exchange directExchange = createExchange(DirectExchange.TYPE, directExchangeName, true); - bindAllQueuesToExchange(directExchange, directRouting); - - //Register TopicExchange - Exchange topicExchange = createExchange(TopicExchange.TYPE, topicExchangeName, true); - bindAllTopicQueuesToExchange(topicExchange, topicRouting); - - //Send Message To NonDurable direct Exchange = persistent - sendMessageOnExchange(nonDurableExchange, directRouting, true); - // and non-persistent - sendMessageOnExchange(nonDurableExchange, directRouting, false); - - //Send Message To direct Exchange = persistent - sendMessageOnExchange(directExchange, directRouting, true); - // and non-persistent - sendMessageOnExchange(directExchange, directRouting, false); - - //Send Message To topic Exchange = persistent - sendMessageOnExchange(topicExchange, topicRouting, true); - // and non-persistent - sendMessageOnExchange(topicExchange, topicRouting, false); - - //Ensure all the Queues have four messages (one transient, one persistent) x 2 exchange routings - validateMessageOnQueues(4, true); - //Ensure all the topics have two messages (one transient, one persistent) - validateMessageOnTopics(2, true); - - assertEquals("Not all queues correctly registered", - 10, getVirtualHost().getQueueRegistry().getQueues().size()); - } - - /** - * Tests message persistence by running the testQueueExchangeAndBindingCreation() method above - * before reloading the virtual host and ensuring that the persistent messages were restored. - * - * More specific testing of message persistence is left to store-specific unit testing. - */ - public void testMessagePersistence() throws Exception - { - testQueueExchangeAndBindingCreation(); - - reloadVirtualHost(); - - //Validate durable queues and subscriptions still have the persistent messages - validateMessageOnQueues(2, false); - validateMessageOnTopics(1, false); - } - - /** - * Tests message removal by running the testMessagePersistence() method above before - * clearing the queues, reloading the virtual host, and ensuring that the persistent - * messages were removed from the queues. - */ - public void testMessageRemoval() throws Exception - { - testMessagePersistence(); - - QueueRegistry queueRegistry = getVirtualHost().getQueueRegistry(); - - assertEquals("Incorrect number of queues registered after recovery", - 6, queueRegistry.getQueues().size()); - - //clear the queue - queueRegistry.getQueue(durableQueueName).clearQueue(); - - //check the messages are gone - validateMessageOnQueue(durableQueueName, 0); - - //reload and verify messages arent restored - reloadVirtualHost(); - - validateMessageOnQueue(durableQueueName, 0); - } - - /** - * Tests queue persistence by creating a selection of queues with differing properties, both - * durable and non durable, and ensuring that following the recovery process the correct queues - * are present and any property manipulations (eg queue exclusivity) are correctly recovered. - */ - public void testQueuePersistence() throws Exception - { - assertEquals("Should not be any existing queues", - 0, getVirtualHost().getQueueRegistry().getQueues().size()); - - //create durable and non durable queues/topics - createAllQueues(); - createAllTopicQueues(); - - //reload the virtual host, prompting recovery of the queues/topics - reloadVirtualHost(); - - QueueRegistry queueRegistry = getVirtualHost().getQueueRegistry(); - - assertEquals("Incorrect number of queues registered after recovery", - 6, queueRegistry.getQueues().size()); - - //Validate the non-Durable Queues were not recovered. - assertNull("Non-Durable queue still registered:" + priorityQueueName, - queueRegistry.getQueue(priorityQueueName)); - assertNull("Non-Durable queue still registered:" + queueName, - queueRegistry.getQueue(queueName)); - assertNull("Non-Durable queue still registered:" + priorityTopicQueueName, - queueRegistry.getQueue(priorityTopicQueueName)); - assertNull("Non-Durable queue still registered:" + topicQueueName, - queueRegistry.getQueue(topicQueueName)); - - //Validate normally expected properties of Queues/Topics - validateDurableQueueProperties(); - - //Update the durable exclusive queue's exclusivity - setQueueExclusivity(false); - validateQueueExclusivityProperty(false); - } - - /** - * Tests queue removal by creating a durable queue, verifying it recovers, and - * then removing it from the store, and ensuring that following the second reload - * process it is not recovered. - */ - public void testDurableQueueRemoval() throws Exception - { - //Register Durable Queue - createQueue(durableQueueName, false, true, false, false); - - QueueRegistry queueRegistry = getVirtualHost().getQueueRegistry(); - assertEquals("Incorrect number of queues registered before recovery", - 1, queueRegistry.getQueues().size()); - - reloadVirtualHost(); - - queueRegistry = getVirtualHost().getQueueRegistry(); - assertEquals("Incorrect number of queues registered after first recovery", - 1, queueRegistry.getQueues().size()); - - //test that removing the queue means it is not recovered next time - final AMQQueue queue = queueRegistry.getQueue(durableQueueName); - DurableConfigurationStoreHelper.removeQueue(getVirtualHost().getDurableConfigurationStore(),queue); - - reloadVirtualHost(); - - queueRegistry = getVirtualHost().getQueueRegistry(); - assertEquals("Incorrect number of queues registered after second recovery", - 0, queueRegistry.getQueues().size()); - assertNull("Durable queue was not removed:" + durableQueueName, - queueRegistry.getQueue(durableQueueName)); - } - - /** - * Tests exchange persistence by creating a selection of exchanges, both durable - * and non durable, and ensuring that following the recovery process the correct - * durable exchanges are still present. - */ - public void testExchangePersistence() throws Exception - { - int origExchangeCount = getVirtualHost().getExchanges().size(); - - Map oldExchanges = createExchanges(); - - assertEquals("Incorrect number of exchanges registered before recovery", - origExchangeCount + 3, getVirtualHost().getExchanges().size()); - - reloadVirtualHost(); - - //verify the exchanges present after recovery - validateExchanges(origExchangeCount, oldExchanges); - } - - /** - * Tests exchange removal by creating a durable exchange, verifying it recovers, and - * then removing it from the store, and ensuring that following the second reload - * process it is not recovered. - */ - public void testDurableExchangeRemoval() throws Exception - { - int origExchangeCount = getVirtualHost().getExchanges().size(); - - createExchange(DirectExchange.TYPE, directExchangeName, true); - - assertEquals("Incorrect number of exchanges registered before recovery", - origExchangeCount + 1, getVirtualHost().getExchanges().size()); - - reloadVirtualHost(); - - assertEquals("Incorrect number of exchanges registered after first recovery", - origExchangeCount + 1, getVirtualHost().getExchanges().size()); - - //test that removing the exchange means it is not recovered next time - final Exchange exchange = getVirtualHost().getExchange(directExchangeName); - DurableConfigurationStoreHelper.removeExchange(getVirtualHost().getDurableConfigurationStore(), exchange); - - reloadVirtualHost(); - - assertEquals("Incorrect number of exchanges registered after second recovery", - origExchangeCount, getVirtualHost().getExchanges().size()); - assertNull("Durable exchange was not removed:" + directExchangeName, - getVirtualHost().getExchange(directExchangeName)); - } - - /** - * Tests binding persistence by creating a selection of queues and exchanges, both durable - * and non durable, then adding bindings with and without selectors before reloading the - * virtual host and verifying that following the recovery process the correct durable - * bindings (those for durable queues to durable exchanges) are still present. - */ - public void testBindingPersistence() throws Exception - { - int origExchangeCount = getVirtualHost().getExchanges().size(); - - createAllQueues(); - createAllTopicQueues(); - - Map exchanges = createExchanges(); - - Exchange nonDurableExchange = exchanges.get(nonDurableExchangeName); - Exchange directExchange = exchanges.get(directExchangeName); - Exchange topicExchange = exchanges.get(topicExchangeName); - - bindAllQueuesToExchange(nonDurableExchange, directRouting); - bindAllQueuesToExchange(directExchange, directRouting); - bindAllTopicQueuesToExchange(topicExchange, topicRouting); - - assertEquals("Incorrect number of exchanges registered before recovery", - origExchangeCount + 3, getVirtualHost().getExchanges().size()); - - reloadVirtualHost(); - - validateExchanges(origExchangeCount, exchanges); - - validateBindingProperties(); - } - - /** - * Tests binding removal by creating a durable exchange, and queue, binding them together, - * recovering to verify the persistence, then removing it from the store, and ensuring - * that following the second reload process it is not recovered. - */ - public void testDurableBindingRemoval() throws Exception - { - QueueRegistry queueRegistry = getVirtualHost().getQueueRegistry(); - - //create durable queue and exchange, bind them - Exchange exch = createExchange(DirectExchange.TYPE, directExchangeName, true); - createQueue(durableQueueName, false, true, false, false); - bindQueueToExchange(exch, directRouting, queueRegistry.getQueue(durableQueueName), false, null); - - assertEquals("Incorrect number of bindings registered before recovery", - 1, queueRegistry.getQueue(durableQueueName).getBindings().size()); - - //verify binding is actually normally recovered - reloadVirtualHost(); - - queueRegistry = getVirtualHost().getQueueRegistry(); - assertEquals("Incorrect number of bindings registered after first recovery", - 1, queueRegistry.getQueue(durableQueueName).getBindings().size()); - - exch = getVirtualHost().getExchange(directExchangeName); - assertNotNull("Exchange was not recovered", exch); - - //remove the binding and verify result after recovery - unbindQueueFromExchange(exch, directRouting, queueRegistry.getQueue(durableQueueName), false, null); - - reloadVirtualHost(); - - queueRegistry = getVirtualHost().getQueueRegistry(); - assertEquals("Incorrect number of bindings registered after second recovery", - 0, queueRegistry.getQueue(durableQueueName).getBindings().size()); - } - - /** - * Validates that the durable exchanges are still present, the non durable exchange is not, - * and that the new exchanges are not the same objects as the provided list (i.e. that the - * reload actually generated new exchange objects) - */ - private void validateExchanges(int originalNumExchanges, Map oldExchanges) - { - Collection exchanges = getVirtualHost().getExchanges(); - Collection exchangeNames = new ArrayList(exchanges.size()); - for(Exchange exchange : exchanges) - { - exchangeNames.add(exchange.getName()); - } - assertTrue(directExchangeName + " exchange NOT reloaded", - exchangeNames.contains(directExchangeName)); - assertTrue(topicExchangeName + " exchange NOT reloaded", - exchangeNames.contains(topicExchangeName)); - assertTrue(nonDurableExchangeName + " exchange reloaded", - !exchangeNames.contains(nonDurableExchangeName)); - - //check the old exchange objects are not the same as the new exchanges - assertTrue(directExchangeName + " exchange NOT reloaded", - getVirtualHost().getExchange(directExchangeName) != oldExchanges.get(directExchangeName)); - assertTrue(topicExchangeName + " exchange NOT reloaded", - getVirtualHost().getExchange(topicExchangeName) != oldExchanges.get(topicExchangeName)); - - // There should only be the original exchanges + our 2 recovered durable exchanges - assertEquals("Incorrect number of exchanges available", - originalNumExchanges + 2, getVirtualHost().getExchanges().size()); - } - - /** Validates the Durable queues and their properties are as expected following recovery */ - private void validateBindingProperties() - { - QueueRegistry queueRegistry = getVirtualHost().getQueueRegistry(); - - assertEquals("Incorrect number of (durable) queues following recovery", 6, queueRegistry.getQueues().size()); - - validateBindingProperties(queueRegistry.getQueue(durablePriorityQueueName).getBindings(), false); - validateBindingProperties(queueRegistry.getQueue(durablePriorityTopicQueueName).getBindings(), true); - validateBindingProperties(queueRegistry.getQueue(durableQueueName).getBindings(), false); - validateBindingProperties(queueRegistry.getQueue(durableTopicQueueName).getBindings(), true); - validateBindingProperties(queueRegistry.getQueue(durableExclusiveQueueName).getBindings(), false); - } - - /** - * Validate that each queue is bound only once following recovery (i.e. that bindings for non durable - * queues or to non durable exchanges are not recovered), and if a selector should be present - * that it is and contains the correct value - * - * @param bindings the set of bindings to validate - * @param useSelectors if set, check the binding has a JMS_SELECTOR argument and the correct value for it - */ - private void validateBindingProperties(List bindings, boolean useSelectors) - { - assertEquals("Each queue should only be bound once.", 1, bindings.size()); - - Binding binding = bindings.get(0); - - if (useSelectors) - { - assertTrue("Binding does not contain a Selector argument.", - binding.getArguments().containsKey(AMQPFilterTypes.JMS_SELECTOR.toString())); - assertEquals("The binding selector argument is incorrect", SELECTOR_VALUE, - binding.getArguments().get(AMQPFilterTypes.JMS_SELECTOR.toString()).toString()); - } - } - - private void setQueueExclusivity(boolean exclusive) throws AMQException - { - QueueRegistry queueRegistry = getVirtualHost().getQueueRegistry(); - - AMQQueue queue = queueRegistry.getQueue(durableExclusiveQueueName); - - queue.setExclusive(exclusive); - } - - private void validateQueueExclusivityProperty(boolean expected) - { - QueueRegistry queueRegistry = getVirtualHost().getQueueRegistry(); - - AMQQueue queue = queueRegistry.getQueue(durableExclusiveQueueName); - - assertEquals("Queue exclusivity was incorrect", queue.isExclusive(), expected); - } - - - private void validateDurableQueueProperties() - { - QueueRegistry queueRegistry = getVirtualHost().getQueueRegistry(); - - validateQueueProperties(queueRegistry.getQueue(durablePriorityQueueName), true, true, false, false); - validateQueueProperties(queueRegistry.getQueue(durablePriorityTopicQueueName), true, true, false, false); - validateQueueProperties(queueRegistry.getQueue(durableQueueName), false, true, false, false); - validateQueueProperties(queueRegistry.getQueue(durableTopicQueueName), false, true, false, false); - validateQueueProperties(queueRegistry.getQueue(durableExclusiveQueueName), false, true, true, false); - validateQueueProperties(queueRegistry.getQueue(durableLastValueQueueName), false, true, true, true); - } - - private void validateQueueProperties(AMQQueue queue, boolean usePriority, boolean durable, boolean exclusive, boolean lastValueQueue) - { - if(usePriority || lastValueQueue) - { - assertNotSame("Queues cant be both Priority and LastValue based", usePriority, lastValueQueue); - } - - if (usePriority) - { - assertEquals("Queue is no longer a Priority Queue", AMQPriorityQueue.class, queue.getClass()); - assertEquals("Priority Queue does not have set priorities", - DEFAULT_PRIORTY_LEVEL, ((AMQPriorityQueue) queue).getPriorities()); - } - else if (lastValueQueue) - { - assertEquals("Queue is no longer a LastValue Queue", ConflationQueue.class, queue.getClass()); - assertEquals("LastValue Queue Key has changed", LVQ_KEY, ((ConflationQueue) queue).getConflationKey()); - } - else - { - assertEquals("Queue is not 'simple'", SimpleAMQQueue.class, queue.getClass()); - } - - assertEquals("Queue owner is not as expected", queueOwner, queue.getOwner()); - assertEquals("Queue durability is not as expected", durable, queue.isDurable()); - assertEquals("Queue exclusivity is not as expected", exclusive, queue.isExclusive()); - } - - /** - * Delete the Store Environment path - * - * @param environmentPath The configuration that contains the store environment path. - */ - private void cleanup(File environmentPath) - { - if (environmentPath.exists()) - { - FileUtils.delete(environmentPath, true); - } - } - - private void sendMessageOnExchange(Exchange exchange, AMQShortString routingKey, boolean deliveryMode) - { - //Set MessagePersistence - BasicContentHeaderProperties properties = new BasicContentHeaderProperties(); - properties.setDeliveryMode(deliveryMode ? Integer.valueOf(2).byteValue() : Integer.valueOf(1).byteValue()); - FieldTable headers = properties.getHeaders(); - headers.setString("Test", "MST"); - properties.setHeaders(headers); - - MessagePublishInfo messageInfo = new TestMessagePublishInfo(exchange, false, false, routingKey); - - final IncomingMessage currentMessage; - - - currentMessage = new IncomingMessage(messageInfo); - - currentMessage.setExchange(exchange); - - ContentHeaderBody headerBody = new ContentHeaderBody(BasicConsumeBodyImpl.CLASS_ID,0,properties,0l); - - try - { - currentMessage.setContentHeaderBody(headerBody); - } - catch (AMQException e) - { - fail(e.getMessage()); - } - - currentMessage.setExpiration(); - - MessageMetaData mmd = currentMessage.headersReceived(System.currentTimeMillis()); - currentMessage.setStoredMessage(getVirtualHost().getMessageStore().addMessage(mmd)); - currentMessage.getStoredMessage().flushToStore(); - currentMessage.route(); - - - // check and deliver if header says body length is zero - if (currentMessage.allContentReceived()) - { - ServerTransaction trans = new AutoCommitTransaction(getVirtualHost().getMessageStore()); - final List destinationQueues = currentMessage.getDestinationQueues(); - trans.enqueue(currentMessage.getDestinationQueues(), currentMessage, new ServerTransaction.Action() { - public void postCommit() - { - try - { - AMQMessage message = new AMQMessage(currentMessage.getStoredMessage()); - - for(BaseQueue queue : destinationQueues) - { - queue.enqueue(message); - } - } - catch (AMQException e) - { - e.printStackTrace(); - } - } - - public void onRollback() - { - //To change body of implemented methods use File | Settings | File Templates. - } - }); - } - } - - private void createAllQueues() - { - //Register Durable Priority Queue - createQueue(durablePriorityQueueName, true, true, false, false); - - //Register Durable Simple Queue - createQueue(durableQueueName, false, true, false, false); - - //Register Durable Exclusive Simple Queue - createQueue(durableExclusiveQueueName, false, true, true, false); - - //Register Durable LastValue Queue - createQueue(durableLastValueQueueName, false, true, true, true); - - //Register NON-Durable Priority Queue - createQueue(priorityQueueName, true, false, false, false); - - //Register NON-Durable Simple Queue - createQueue(queueName, false, false, false, false); - } - - private void createAllTopicQueues() - { - //Register Durable Priority Queue - createQueue(durablePriorityTopicQueueName, true, true, false, false); - - //Register Durable Simple Queue - createQueue(durableTopicQueueName, false, true, false, false); - - //Register NON-Durable Priority Queue - createQueue(priorityTopicQueueName, true, false, false, false); - - //Register NON-Durable Simple Queue - createQueue(topicQueueName, false, false, false, false); - } - - private void createQueue(AMQShortString queueName, boolean usePriority, boolean durable, boolean exclusive, boolean lastValueQueue) - { - - FieldTable queueArguments = null; - - if(usePriority || lastValueQueue) - { - assertNotSame("Queues cant be both Priority and LastValue based", usePriority, lastValueQueue); - } - - if (usePriority) - { - queueArguments = new FieldTable(); - queueArguments.put(new AMQShortString(AMQQueueFactory.X_QPID_PRIORITIES), DEFAULT_PRIORTY_LEVEL); - } - - if (lastValueQueue) - { - queueArguments = new FieldTable(); - queueArguments.put(new AMQShortString(AMQQueueFactory.QPID_LAST_VALUE_QUEUE_KEY), LVQ_KEY); - } - - AMQQueue queue = null; - - //Ideally we would be able to use the QueueDeclareHandler here. - try - { - queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), queueName.asString(), durable, queueOwner.asString(), false, exclusive, - getVirtualHost(), FieldTable.convertToMap(queueArguments)); - - validateQueueProperties(queue, usePriority, durable, exclusive, lastValueQueue); - - if (queue.isDurable() && !queue.isAutoDelete()) - { - DurableConfigurationStoreHelper.createQueue(getVirtualHost().getDurableConfigurationStore(), - queue, - queueArguments); - } - } - catch (AMQException e) - { - fail(e.getMessage()); - } - - getVirtualHost().getQueueRegistry().registerQueue(queue); - - } - - private Map createExchanges() - { - Map exchanges = new HashMap(); - - //Register non-durable DirectExchange - exchanges.put(nonDurableExchangeName, createExchange(DirectExchange.TYPE, nonDurableExchangeName, false)); - - //Register durable DirectExchange and TopicExchange - exchanges.put(directExchangeName ,createExchange(DirectExchange.TYPE, directExchangeName, true)); - exchanges.put(topicExchangeName,createExchange(TopicExchange.TYPE, topicExchangeName, true)); - - return exchanges; - } - - private Exchange createExchange(ExchangeType type, String name, boolean durable) - { - Exchange exchange = null; - - try - { - exchange = getVirtualHost().createExchange(null, name, type.getName().toString(), durable, false, null); - } - catch (AMQException e) - { - fail(e.getMessage()); - } - - return exchange; - } - - private void bindAllQueuesToExchange(Exchange exchange, AMQShortString routingKey) - { - FieldTable queueArguments = new FieldTable(); - queueArguments.put(new AMQShortString(AMQQueueFactory.X_QPID_PRIORITIES), DEFAULT_PRIORTY_LEVEL); - - QueueRegistry queueRegistry = getVirtualHost().getQueueRegistry(); - - bindQueueToExchange(exchange, routingKey, queueRegistry.getQueue(durablePriorityQueueName), false, queueArguments); - bindQueueToExchange(exchange, routingKey, queueRegistry.getQueue(durableQueueName), false, null); - bindQueueToExchange(exchange, routingKey, queueRegistry.getQueue(priorityQueueName), false, queueArguments); - bindQueueToExchange(exchange, routingKey, queueRegistry.getQueue(queueName), false, null); - bindQueueToExchange(exchange, routingKey, queueRegistry.getQueue(durableExclusiveQueueName), false, null); - } - - private void bindAllTopicQueuesToExchange(Exchange exchange, AMQShortString routingKey) - { - FieldTable queueArguments = new FieldTable(); - queueArguments.put(new AMQShortString(AMQQueueFactory.X_QPID_PRIORITIES), DEFAULT_PRIORTY_LEVEL); - - QueueRegistry queueRegistry = getVirtualHost().getQueueRegistry(); - - bindQueueToExchange(exchange, routingKey, queueRegistry.getQueue(durablePriorityTopicQueueName), true, queueArguments); - bindQueueToExchange(exchange, routingKey, queueRegistry.getQueue(durableTopicQueueName), true, null); - bindQueueToExchange(exchange, routingKey, queueRegistry.getQueue(priorityTopicQueueName), true, queueArguments); - bindQueueToExchange(exchange, routingKey, queueRegistry.getQueue(topicQueueName), true, null); - } - - - protected void bindQueueToExchange(Exchange exchange, AMQShortString routingKey, AMQQueue queue, boolean useSelector, FieldTable queueArguments) - { - FieldTable bindArguments = null; - - if (useSelector) - { - bindArguments = new FieldTable(); - bindArguments.put(AMQPFilterTypes.JMS_SELECTOR.getValue(), SELECTOR_VALUE ); - } - - try - { - exchange.addBinding(String.valueOf(routingKey), queue, FieldTable.convertToMap(bindArguments)); - } - catch (Exception e) - { - fail(e.getMessage()); - } - } - - protected void unbindQueueFromExchange(Exchange exchange, AMQShortString routingKey, AMQQueue queue, boolean useSelector, FieldTable queueArguments) - { - FieldTable bindArguments = null; - - if (useSelector) - { - bindArguments = new FieldTable(); - bindArguments.put(AMQPFilterTypes.JMS_SELECTOR.getValue(), SELECTOR_VALUE ); - } - - try - { - exchange.removeBinding(String.valueOf(routingKey), queue, FieldTable.convertToMap(bindArguments)); - } - catch (Exception e) - { - fail(e.getMessage()); - } - } - - private void validateMessageOnTopics(long messageCount, boolean allQueues) - { - validateMessageOnQueue(durablePriorityTopicQueueName, messageCount); - validateMessageOnQueue(durableTopicQueueName, messageCount); - - if (allQueues) - { - validateMessageOnQueue(priorityTopicQueueName, messageCount); - validateMessageOnQueue(topicQueueName, messageCount); - } - } - - private void validateMessageOnQueues(long messageCount, boolean allQueues) - { - validateMessageOnQueue(durablePriorityQueueName, messageCount); - validateMessageOnQueue(durableQueueName, messageCount); - - if (allQueues) - { - validateMessageOnQueue(priorityQueueName, messageCount); - validateMessageOnQueue(queueName, messageCount); - } - } - - private void validateMessageOnQueue(AMQShortString queueName, long messageCount) - { - AMQQueue queue = getVirtualHost().getQueueRegistry().getQueue(queueName); - - assertNotNull("Queue(" + queueName + ") not correctly registered:", queue); - - assertEquals("Incorrect Message count on queue:" + queueName, messageCount, queue.getMessageCount()); - } - - private class TestMessagePublishInfo implements MessagePublishInfo - { - - Exchange _exchange; - boolean _immediate; - boolean _mandatory; - AMQShortString _routingKey; - - TestMessagePublishInfo(Exchange exchange, boolean immediate, boolean mandatory, AMQShortString routingKey) - { - _exchange = exchange; - _immediate = immediate; - _mandatory = mandatory; - _routingKey = routingKey; - } - - public AMQShortString getExchange() - { - return _exchange.getNameShortString(); - } - - public void setExchange(AMQShortString exchange) - { - //no-op - } - - public boolean isImmediate() - { - return _immediate; - } - - public boolean isMandatory() - { - return _mandatory; - } - - public AMQShortString getRoutingKey() - { - return _routingKey; - } - } -} diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestMemoryMessageStore.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestMemoryMessageStore.java new file mode 100644 index 0000000000..32df355c07 --- /dev/null +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestMemoryMessageStore.java @@ -0,0 +1,34 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store; + + +/** A simple message store that stores the messages in a thread-safe structure in memory. */ +public class TestMemoryMessageStore extends AbstractMemoryMessageStore +{ + public static final String TYPE = "TestMemory"; + + @Override + public String getStoreType() + { + return TYPE; + } +} diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestMemoryMessageStoreFactory.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestMemoryMessageStoreFactory.java new file mode 100644 index 0000000000..fd2d4215ab --- /dev/null +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestMemoryMessageStoreFactory.java @@ -0,0 +1,54 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.server.store; + +import java.util.Collections; +import java.util.Map; +import org.apache.commons.configuration.Configuration; +import org.apache.qpid.server.plugin.MessageStoreFactory; + +public class TestMemoryMessageStoreFactory implements MessageStoreFactory +{ + + @Override + public String getType() + { + return TestMemoryMessageStore.TYPE; + } + + @Override + public MessageStore createMessageStore() + { + return new TestMemoryMessageStore(); + } + + @Override + public Map convertStoreConfiguration(Configuration configuration) + { + return Collections.emptyMap(); + } + + @Override + public void validateAttributes(Map attributes) + { + } +} diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java index 210408f490..bb3c0cf535 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java @@ -32,7 +32,7 @@ import java.util.concurrent.atomic.AtomicInteger; /** * Adds some extra methods to the memory message store for testing purposes. */ -public class TestableMemoryMessageStore extends MemoryMessageStore +public class TestableMemoryMessageStore extends TestMemoryMessageStore { private final Map _messages = new HashMap(); private final AtomicInteger _messageCount = new AtomicInteger(0); diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/StandardVirtualHostTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/StandardVirtualHostTest.java index 6b8ea0e80b..e72196c383 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/StandardVirtualHostTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/StandardVirtualHostTest.java @@ -20,7 +20,9 @@ */ package org.apache.qpid.server.virtualhost; +import static org.mockito.Matchers.eq; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import org.apache.commons.configuration.Configuration; import org.apache.commons.configuration.ConfigurationException; @@ -34,7 +36,7 @@ import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.stats.StatisticsGatherer; -import org.apache.qpid.server.store.MemoryMessageStore; +import org.apache.qpid.server.store.TestMemoryMessageStore; import org.apache.qpid.server.util.BrokerTestHelper; import org.apache.qpid.test.utils.QpidTestCase; @@ -306,7 +308,7 @@ public class StandardVirtualHostTest extends QpidTestCase writer.write(" <" + vhostName + ">"); writer.write(" " + StandardVirtualHostFactory.TYPE + ""); writer.write(" "); - writer.write(" " + MemoryMessageStore.class.getName() + ""); + writer.write(" " + TestMemoryMessageStore.class.getName() + ""); writer.write(" "); if(exchangeName != null && !dontDeclare) { @@ -363,10 +365,11 @@ public class StandardVirtualHostTest extends QpidTestCase _virtualHostRegistry = broker.getVirtualHostRegistry(); Configuration config = new PropertiesConfiguration(); - config.setProperty("store.type", MemoryMessageStore.TYPE); VirtualHostConfiguration configuration = new VirtualHostConfiguration(virtualHostName, config, broker); + final org.apache.qpid.server.model.VirtualHost virtualHost = mock(org.apache.qpid.server.model.VirtualHost.class); + when(virtualHost.getAttribute(eq(org.apache.qpid.server.model.VirtualHost.STORE_TYPE))).thenReturn(TestMemoryMessageStore.TYPE); VirtualHost host = new StandardVirtualHostFactory().createVirtualHost(_virtualHostRegistry, mock(StatisticsGatherer.class), new SecurityManager(mock(Broker.class), false), configuration, - mock(org.apache.qpid.server.model.VirtualHost.class)); + virtualHost); _virtualHostRegistry.registerVirtualHost(host); return host; } diff --git a/qpid/java/broker/src/test/resources/META-INF/services/org.apache.qpid.server.plugin.MessageStoreFactory b/qpid/java/broker/src/test/resources/META-INF/services/org.apache.qpid.server.plugin.MessageStoreFactory new file mode 100644 index 0000000000..9512fb8117 --- /dev/null +++ b/qpid/java/broker/src/test/resources/META-INF/services/org.apache.qpid.server.plugin.MessageStoreFactory @@ -0,0 +1,19 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +org.apache.qpid.server.store.TestMemoryMessageStoreFactory diff --git a/qpid/java/build.deps b/qpid/java/build.deps index 630932638a..5614618ffb 100644 --- a/qpid/java/build.deps +++ b/qpid/java/build.deps @@ -98,6 +98,7 @@ broker-plugins-management-http.test.libs=${test.libs} broker-plugins-management-jmx.test.libs=${test.libs} broker-plugins-jdbc-store.test.libs=${test.libs} broker-plugins-derby-store.test.libs=${test.libs} +broker-plugins-memory-store.test.libs=${test.libs} management-common.test.libs=${test.libs} diff --git a/qpid/java/perftests/build.xml b/qpid/java/perftests/build.xml index d29649ad68..690f1fd375 100644 --- a/qpid/java/perftests/build.xml +++ b/qpid/java/perftests/build.xml @@ -33,7 +33,7 @@ - + diff --git a/qpid/java/systests/build.xml b/qpid/java/systests/build.xml index dee73b2e1e..d53ee4b0ae 100644 --- a/qpid/java/systests/build.xml +++ b/qpid/java/systests/build.xml @@ -34,7 +34,7 @@ nn - or more contributor license agreements. See the NOTICE file - + diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java new file mode 100644 index 0000000000..8b678c4eb4 --- /dev/null +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java @@ -0,0 +1,948 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store; + + +import java.util.ArrayList; +import java.util.Collection; +import org.apache.commons.configuration.PropertiesConfiguration; + +import org.apache.qpid.AMQException; +import org.apache.qpid.common.AMQPFilterTypes; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.BasicContentHeaderProperties; +import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.framing.abstraction.MessagePublishInfo; +import org.apache.qpid.framing.amqp_8_0.BasicConsumeBodyImpl; +import org.apache.qpid.server.binding.Binding; +import org.apache.qpid.server.configuration.VirtualHostConfiguration; +import org.apache.qpid.server.exchange.DirectExchange; +import org.apache.qpid.server.exchange.Exchange; +import org.apache.qpid.server.exchange.ExchangeRegistry; +import org.apache.qpid.server.exchange.TopicExchange; +import org.apache.qpid.server.message.AMQMessage; +import org.apache.qpid.server.message.MessageMetaData; +import org.apache.qpid.server.model.Broker; +import org.apache.qpid.server.model.UUIDGenerator; +import org.apache.qpid.server.plugin.ExchangeType; +import org.apache.qpid.server.queue.AMQPriorityQueue; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.AMQQueueFactory; +import org.apache.qpid.server.queue.BaseQueue; +import org.apache.qpid.server.queue.ConflationQueue; +import org.apache.qpid.server.queue.IncomingMessage; +import org.apache.qpid.server.queue.QueueRegistry; +import org.apache.qpid.server.queue.SimpleAMQQueue; +import org.apache.qpid.server.txn.AutoCommitTransaction; +import org.apache.qpid.server.txn.ServerTransaction; +import org.apache.qpid.server.util.BrokerTestHelper; +import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.test.utils.QpidTestCase; +import org.apache.qpid.util.FileUtils; + +import java.io.File; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * This tests the MessageStores by using the available interfaces. + * + * For persistent stores, it validates that Exchanges, Queues, Bindings and + * Messages are persisted and recovered correctly. + */ +public class MessageStoreTest extends QpidTestCase +{ + public static final int DEFAULT_PRIORTY_LEVEL = 5; + public static final String SELECTOR_VALUE = "Test = 'MST'"; + public static final String LVQ_KEY = "MST-LVQ-KEY"; + + private String nonDurableExchangeName = "MST-NonDurableDirectExchange"; + private String directExchangeName = "MST-DirectExchange"; + private String topicExchangeName = "MST-TopicExchange"; + + private AMQShortString durablePriorityTopicQueueName = new AMQShortString("MST-PriorityTopicQueue-Durable"); + private AMQShortString durableTopicQueueName = new AMQShortString("MST-TopicQueue-Durable"); + private AMQShortString priorityTopicQueueName = new AMQShortString("MST-PriorityTopicQueue"); + private AMQShortString topicQueueName = new AMQShortString("MST-TopicQueue"); + + private AMQShortString durableExclusiveQueueName = new AMQShortString("MST-Queue-Durable-Exclusive"); + private AMQShortString durablePriorityQueueName = new AMQShortString("MST-PriorityQueue-Durable"); + private AMQShortString durableLastValueQueueName = new AMQShortString("MST-LastValueQueue-Durable"); + private AMQShortString durableQueueName = new AMQShortString("MST-Queue-Durable"); + private AMQShortString priorityQueueName = new AMQShortString("MST-PriorityQueue"); + private AMQShortString queueName = new AMQShortString("MST-Queue"); + + private AMQShortString directRouting = new AMQShortString("MST-direct"); + private AMQShortString topicRouting = new AMQShortString("MST-topic"); + + private AMQShortString queueOwner = new AMQShortString("MST"); + + private PropertiesConfiguration _config; + + private VirtualHost _virtualHost; + private org.apache.qpid.server.model.VirtualHost _virtualHostModel; + private Broker _broker; + private String _storePath; + + public void setUp() throws Exception + { + super.setUp(); + BrokerTestHelper.setUp(); + + _storePath = System.getProperty("QPID_WORK") + File.separator + getName(); + + _config = new PropertiesConfiguration(); + _config.addProperty("store.class", getTestProfileMessageStoreClassName()); + _config.addProperty("store.environment-path", _storePath); + _virtualHostModel = mock(org.apache.qpid.server.model.VirtualHost.class); + when(_virtualHostModel.getAttribute(eq(org.apache.qpid.server.model.VirtualHost.STORE_PATH))).thenReturn(_storePath); + + + + cleanup(new File(_storePath)); + + _broker = BrokerTestHelper.createBrokerMock(); + + reloadVirtualHost(); + } + + protected String getStorePath() + { + return _storePath; + } + + protected org.apache.qpid.server.model.VirtualHost getVirtualHostModel() + { + return _virtualHostModel; + } + + @Override + public void tearDown() throws Exception + { + try + { + if (_virtualHost != null) + { + _virtualHost.close(); + } + } + finally + { + BrokerTestHelper.tearDown(); + super.tearDown(); + } + } + + public VirtualHost getVirtualHost() + { + return _virtualHost; + } + + public PropertiesConfiguration getConfig() + { + return _config; + } + + protected void reloadVirtualHost() + { + VirtualHost original = getVirtualHost(); + + if (getVirtualHost() != null) + { + try + { + getVirtualHost().close(); + } + catch (Exception e) + { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + try + { + _virtualHost = BrokerTestHelper.createVirtualHost(new VirtualHostConfiguration(getClass().getName(), _config, _broker),null,getVirtualHostModel()); + } + catch (Exception e) + { + e.printStackTrace(); + fail(e.getMessage()); + } + + assertTrue("Virtualhost has not changed, reload was not successful", original != getVirtualHost()); + } + + /** + * Old MessageStoreTest segment which runs against both persistent and non-persistent stores + * creating queues, exchanges and bindings and then verifying message delivery to them. + */ + public void testQueueExchangeAndBindingCreation() throws Exception + { + assertEquals("Should not be any existing queues", 0, getVirtualHost().getQueueRegistry().getQueues().size()); + + createAllQueues(); + createAllTopicQueues(); + + //Register Non-Durable DirectExchange + Exchange nonDurableExchange = createExchange(DirectExchange.TYPE, nonDurableExchangeName, false); + bindAllQueuesToExchange(nonDurableExchange, directRouting); + + //Register DirectExchange + Exchange directExchange = createExchange(DirectExchange.TYPE, directExchangeName, true); + bindAllQueuesToExchange(directExchange, directRouting); + + //Register TopicExchange + Exchange topicExchange = createExchange(TopicExchange.TYPE, topicExchangeName, true); + bindAllTopicQueuesToExchange(topicExchange, topicRouting); + + //Send Message To NonDurable direct Exchange = persistent + sendMessageOnExchange(nonDurableExchange, directRouting, true); + // and non-persistent + sendMessageOnExchange(nonDurableExchange, directRouting, false); + + //Send Message To direct Exchange = persistent + sendMessageOnExchange(directExchange, directRouting, true); + // and non-persistent + sendMessageOnExchange(directExchange, directRouting, false); + + //Send Message To topic Exchange = persistent + sendMessageOnExchange(topicExchange, topicRouting, true); + // and non-persistent + sendMessageOnExchange(topicExchange, topicRouting, false); + + //Ensure all the Queues have four messages (one transient, one persistent) x 2 exchange routings + validateMessageOnQueues(4, true); + //Ensure all the topics have two messages (one transient, one persistent) + validateMessageOnTopics(2, true); + + assertEquals("Not all queues correctly registered", + 10, getVirtualHost().getQueueRegistry().getQueues().size()); + } + + /** + * Tests message persistence by running the testQueueExchangeAndBindingCreation() method above + * before reloading the virtual host and ensuring that the persistent messages were restored. + * + * More specific testing of message persistence is left to store-specific unit testing. + */ + public void testMessagePersistence() throws Exception + { + testQueueExchangeAndBindingCreation(); + + reloadVirtualHost(); + + //Validate durable queues and subscriptions still have the persistent messages + validateMessageOnQueues(2, false); + validateMessageOnTopics(1, false); + } + + /** + * Tests message removal by running the testMessagePersistence() method above before + * clearing the queues, reloading the virtual host, and ensuring that the persistent + * messages were removed from the queues. + */ + public void testMessageRemoval() throws Exception + { + testMessagePersistence(); + + QueueRegistry queueRegistry = getVirtualHost().getQueueRegistry(); + + assertEquals("Incorrect number of queues registered after recovery", + 6, queueRegistry.getQueues().size()); + + //clear the queue + queueRegistry.getQueue(durableQueueName).clearQueue(); + + //check the messages are gone + validateMessageOnQueue(durableQueueName, 0); + + //reload and verify messages arent restored + reloadVirtualHost(); + + validateMessageOnQueue(durableQueueName, 0); + } + + /** + * Tests queue persistence by creating a selection of queues with differing properties, both + * durable and non durable, and ensuring that following the recovery process the correct queues + * are present and any property manipulations (eg queue exclusivity) are correctly recovered. + */ + public void testQueuePersistence() throws Exception + { + assertEquals("Should not be any existing queues", + 0, getVirtualHost().getQueueRegistry().getQueues().size()); + + //create durable and non durable queues/topics + createAllQueues(); + createAllTopicQueues(); + + //reload the virtual host, prompting recovery of the queues/topics + reloadVirtualHost(); + + QueueRegistry queueRegistry = getVirtualHost().getQueueRegistry(); + + assertEquals("Incorrect number of queues registered after recovery", + 6, queueRegistry.getQueues().size()); + + //Validate the non-Durable Queues were not recovered. + assertNull("Non-Durable queue still registered:" + priorityQueueName, + queueRegistry.getQueue(priorityQueueName)); + assertNull("Non-Durable queue still registered:" + queueName, + queueRegistry.getQueue(queueName)); + assertNull("Non-Durable queue still registered:" + priorityTopicQueueName, + queueRegistry.getQueue(priorityTopicQueueName)); + assertNull("Non-Durable queue still registered:" + topicQueueName, + queueRegistry.getQueue(topicQueueName)); + + //Validate normally expected properties of Queues/Topics + validateDurableQueueProperties(); + + //Update the durable exclusive queue's exclusivity + setQueueExclusivity(false); + validateQueueExclusivityProperty(false); + } + + /** + * Tests queue removal by creating a durable queue, verifying it recovers, and + * then removing it from the store, and ensuring that following the second reload + * process it is not recovered. + */ + public void testDurableQueueRemoval() throws Exception + { + //Register Durable Queue + createQueue(durableQueueName, false, true, false, false); + + QueueRegistry queueRegistry = getVirtualHost().getQueueRegistry(); + assertEquals("Incorrect number of queues registered before recovery", + 1, queueRegistry.getQueues().size()); + + reloadVirtualHost(); + + queueRegistry = getVirtualHost().getQueueRegistry(); + assertEquals("Incorrect number of queues registered after first recovery", + 1, queueRegistry.getQueues().size()); + + //test that removing the queue means it is not recovered next time + final AMQQueue queue = queueRegistry.getQueue(durableQueueName); + DurableConfigurationStoreHelper.removeQueue(getVirtualHost().getDurableConfigurationStore(),queue); + + reloadVirtualHost(); + + queueRegistry = getVirtualHost().getQueueRegistry(); + assertEquals("Incorrect number of queues registered after second recovery", + 0, queueRegistry.getQueues().size()); + assertNull("Durable queue was not removed:" + durableQueueName, + queueRegistry.getQueue(durableQueueName)); + } + + /** + * Tests exchange persistence by creating a selection of exchanges, both durable + * and non durable, and ensuring that following the recovery process the correct + * durable exchanges are still present. + */ + public void testExchangePersistence() throws Exception + { + int origExchangeCount = getVirtualHost().getExchanges().size(); + + Map oldExchanges = createExchanges(); + + assertEquals("Incorrect number of exchanges registered before recovery", + origExchangeCount + 3, getVirtualHost().getExchanges().size()); + + reloadVirtualHost(); + + //verify the exchanges present after recovery + validateExchanges(origExchangeCount, oldExchanges); + } + + /** + * Tests exchange removal by creating a durable exchange, verifying it recovers, and + * then removing it from the store, and ensuring that following the second reload + * process it is not recovered. + */ + public void testDurableExchangeRemoval() throws Exception + { + int origExchangeCount = getVirtualHost().getExchanges().size(); + + createExchange(DirectExchange.TYPE, directExchangeName, true); + + assertEquals("Incorrect number of exchanges registered before recovery", + origExchangeCount + 1, getVirtualHost().getExchanges().size()); + + reloadVirtualHost(); + + assertEquals("Incorrect number of exchanges registered after first recovery", + origExchangeCount + 1, getVirtualHost().getExchanges().size()); + + //test that removing the exchange means it is not recovered next time + final Exchange exchange = getVirtualHost().getExchange(directExchangeName); + DurableConfigurationStoreHelper.removeExchange(getVirtualHost().getDurableConfigurationStore(), exchange); + + reloadVirtualHost(); + + assertEquals("Incorrect number of exchanges registered after second recovery", + origExchangeCount, getVirtualHost().getExchanges().size()); + assertNull("Durable exchange was not removed:" + directExchangeName, + getVirtualHost().getExchange(directExchangeName)); + } + + /** + * Tests binding persistence by creating a selection of queues and exchanges, both durable + * and non durable, then adding bindings with and without selectors before reloading the + * virtual host and verifying that following the recovery process the correct durable + * bindings (those for durable queues to durable exchanges) are still present. + */ + public void testBindingPersistence() throws Exception + { + int origExchangeCount = getVirtualHost().getExchanges().size(); + + createAllQueues(); + createAllTopicQueues(); + + Map exchanges = createExchanges(); + + Exchange nonDurableExchange = exchanges.get(nonDurableExchangeName); + Exchange directExchange = exchanges.get(directExchangeName); + Exchange topicExchange = exchanges.get(topicExchangeName); + + bindAllQueuesToExchange(nonDurableExchange, directRouting); + bindAllQueuesToExchange(directExchange, directRouting); + bindAllTopicQueuesToExchange(topicExchange, topicRouting); + + assertEquals("Incorrect number of exchanges registered before recovery", + origExchangeCount + 3, getVirtualHost().getExchanges().size()); + + reloadVirtualHost(); + + validateExchanges(origExchangeCount, exchanges); + + validateBindingProperties(); + } + + /** + * Tests binding removal by creating a durable exchange, and queue, binding them together, + * recovering to verify the persistence, then removing it from the store, and ensuring + * that following the second reload process it is not recovered. + */ + public void testDurableBindingRemoval() throws Exception + { + QueueRegistry queueRegistry = getVirtualHost().getQueueRegistry(); + + //create durable queue and exchange, bind them + Exchange exch = createExchange(DirectExchange.TYPE, directExchangeName, true); + createQueue(durableQueueName, false, true, false, false); + bindQueueToExchange(exch, directRouting, queueRegistry.getQueue(durableQueueName), false, null); + + assertEquals("Incorrect number of bindings registered before recovery", + 1, queueRegistry.getQueue(durableQueueName).getBindings().size()); + + //verify binding is actually normally recovered + reloadVirtualHost(); + + queueRegistry = getVirtualHost().getQueueRegistry(); + assertEquals("Incorrect number of bindings registered after first recovery", + 1, queueRegistry.getQueue(durableQueueName).getBindings().size()); + + exch = getVirtualHost().getExchange(directExchangeName); + assertNotNull("Exchange was not recovered", exch); + + //remove the binding and verify result after recovery + unbindQueueFromExchange(exch, directRouting, queueRegistry.getQueue(durableQueueName), false, null); + + reloadVirtualHost(); + + queueRegistry = getVirtualHost().getQueueRegistry(); + assertEquals("Incorrect number of bindings registered after second recovery", + 0, queueRegistry.getQueue(durableQueueName).getBindings().size()); + } + + /** + * Validates that the durable exchanges are still present, the non durable exchange is not, + * and that the new exchanges are not the same objects as the provided list (i.e. that the + * reload actually generated new exchange objects) + */ + private void validateExchanges(int originalNumExchanges, Map oldExchanges) + { + Collection exchanges = getVirtualHost().getExchanges(); + Collection exchangeNames = new ArrayList(exchanges.size()); + for(Exchange exchange : exchanges) + { + exchangeNames.add(exchange.getName()); + } + assertTrue(directExchangeName + " exchange NOT reloaded", + exchangeNames.contains(directExchangeName)); + assertTrue(topicExchangeName + " exchange NOT reloaded", + exchangeNames.contains(topicExchangeName)); + assertTrue(nonDurableExchangeName + " exchange reloaded", + !exchangeNames.contains(nonDurableExchangeName)); + + //check the old exchange objects are not the same as the new exchanges + assertTrue(directExchangeName + " exchange NOT reloaded", + getVirtualHost().getExchange(directExchangeName) != oldExchanges.get(directExchangeName)); + assertTrue(topicExchangeName + " exchange NOT reloaded", + getVirtualHost().getExchange(topicExchangeName) != oldExchanges.get(topicExchangeName)); + + // There should only be the original exchanges + our 2 recovered durable exchanges + assertEquals("Incorrect number of exchanges available", + originalNumExchanges + 2, getVirtualHost().getExchanges().size()); + } + + /** Validates the Durable queues and their properties are as expected following recovery */ + private void validateBindingProperties() + { + QueueRegistry queueRegistry = getVirtualHost().getQueueRegistry(); + + assertEquals("Incorrect number of (durable) queues following recovery", 6, queueRegistry.getQueues().size()); + + validateBindingProperties(queueRegistry.getQueue(durablePriorityQueueName).getBindings(), false); + validateBindingProperties(queueRegistry.getQueue(durablePriorityTopicQueueName).getBindings(), true); + validateBindingProperties(queueRegistry.getQueue(durableQueueName).getBindings(), false); + validateBindingProperties(queueRegistry.getQueue(durableTopicQueueName).getBindings(), true); + validateBindingProperties(queueRegistry.getQueue(durableExclusiveQueueName).getBindings(), false); + } + + /** + * Validate that each queue is bound only once following recovery (i.e. that bindings for non durable + * queues or to non durable exchanges are not recovered), and if a selector should be present + * that it is and contains the correct value + * + * @param bindings the set of bindings to validate + * @param useSelectors if set, check the binding has a JMS_SELECTOR argument and the correct value for it + */ + private void validateBindingProperties(List bindings, boolean useSelectors) + { + assertEquals("Each queue should only be bound once.", 1, bindings.size()); + + Binding binding = bindings.get(0); + + if (useSelectors) + { + assertTrue("Binding does not contain a Selector argument.", + binding.getArguments().containsKey(AMQPFilterTypes.JMS_SELECTOR.toString())); + assertEquals("The binding selector argument is incorrect", SELECTOR_VALUE, + binding.getArguments().get(AMQPFilterTypes.JMS_SELECTOR.toString()).toString()); + } + } + + private void setQueueExclusivity(boolean exclusive) throws AMQException + { + QueueRegistry queueRegistry = getVirtualHost().getQueueRegistry(); + + AMQQueue queue = queueRegistry.getQueue(durableExclusiveQueueName); + + queue.setExclusive(exclusive); + } + + private void validateQueueExclusivityProperty(boolean expected) + { + QueueRegistry queueRegistry = getVirtualHost().getQueueRegistry(); + + AMQQueue queue = queueRegistry.getQueue(durableExclusiveQueueName); + + assertEquals("Queue exclusivity was incorrect", queue.isExclusive(), expected); + } + + + private void validateDurableQueueProperties() + { + QueueRegistry queueRegistry = getVirtualHost().getQueueRegistry(); + + validateQueueProperties(queueRegistry.getQueue(durablePriorityQueueName), true, true, false, false); + validateQueueProperties(queueRegistry.getQueue(durablePriorityTopicQueueName), true, true, false, false); + validateQueueProperties(queueRegistry.getQueue(durableQueueName), false, true, false, false); + validateQueueProperties(queueRegistry.getQueue(durableTopicQueueName), false, true, false, false); + validateQueueProperties(queueRegistry.getQueue(durableExclusiveQueueName), false, true, true, false); + validateQueueProperties(queueRegistry.getQueue(durableLastValueQueueName), false, true, true, true); + } + + private void validateQueueProperties(AMQQueue queue, boolean usePriority, boolean durable, boolean exclusive, boolean lastValueQueue) + { + if(usePriority || lastValueQueue) + { + assertNotSame("Queues cant be both Priority and LastValue based", usePriority, lastValueQueue); + } + + if (usePriority) + { + assertEquals("Queue is no longer a Priority Queue", AMQPriorityQueue.class, queue.getClass()); + assertEquals("Priority Queue does not have set priorities", + DEFAULT_PRIORTY_LEVEL, ((AMQPriorityQueue) queue).getPriorities()); + } + else if (lastValueQueue) + { + assertEquals("Queue is no longer a LastValue Queue", ConflationQueue.class, queue.getClass()); + assertEquals("LastValue Queue Key has changed", LVQ_KEY, ((ConflationQueue) queue).getConflationKey()); + } + else + { + assertEquals("Queue is not 'simple'", SimpleAMQQueue.class, queue.getClass()); + } + + assertEquals("Queue owner is not as expected", queueOwner, queue.getOwner()); + assertEquals("Queue durability is not as expected", durable, queue.isDurable()); + assertEquals("Queue exclusivity is not as expected", exclusive, queue.isExclusive()); + } + + /** + * Delete the Store Environment path + * + * @param environmentPath The configuration that contains the store environment path. + */ + private void cleanup(File environmentPath) + { + if (environmentPath.exists()) + { + FileUtils.delete(environmentPath, true); + } + } + + private void sendMessageOnExchange(Exchange exchange, AMQShortString routingKey, boolean deliveryMode) + { + //Set MessagePersistence + BasicContentHeaderProperties properties = new BasicContentHeaderProperties(); + properties.setDeliveryMode(deliveryMode ? Integer.valueOf(2).byteValue() : Integer.valueOf(1).byteValue()); + FieldTable headers = properties.getHeaders(); + headers.setString("Test", "MST"); + properties.setHeaders(headers); + + MessagePublishInfo messageInfo = new TestMessagePublishInfo(exchange, false, false, routingKey); + + final IncomingMessage currentMessage; + + + currentMessage = new IncomingMessage(messageInfo); + + currentMessage.setExchange(exchange); + + ContentHeaderBody headerBody = new ContentHeaderBody(BasicConsumeBodyImpl.CLASS_ID,0,properties,0l); + + try + { + currentMessage.setContentHeaderBody(headerBody); + } + catch (AMQException e) + { + fail(e.getMessage()); + } + + currentMessage.setExpiration(); + + MessageMetaData mmd = currentMessage.headersReceived(System.currentTimeMillis()); + currentMessage.setStoredMessage(getVirtualHost().getMessageStore().addMessage(mmd)); + currentMessage.getStoredMessage().flushToStore(); + currentMessage.route(); + + + // check and deliver if header says body length is zero + if (currentMessage.allContentReceived()) + { + ServerTransaction trans = new AutoCommitTransaction(getVirtualHost().getMessageStore()); + final List destinationQueues = currentMessage.getDestinationQueues(); + trans.enqueue(currentMessage.getDestinationQueues(), currentMessage, new ServerTransaction.Action() { + public void postCommit() + { + try + { + AMQMessage message = new AMQMessage(currentMessage.getStoredMessage()); + + for(BaseQueue queue : destinationQueues) + { + queue.enqueue(message); + } + } + catch (AMQException e) + { + e.printStackTrace(); + } + } + + public void onRollback() + { + //To change body of implemented methods use File | Settings | File Templates. + } + }); + } + } + + private void createAllQueues() + { + //Register Durable Priority Queue + createQueue(durablePriorityQueueName, true, true, false, false); + + //Register Durable Simple Queue + createQueue(durableQueueName, false, true, false, false); + + //Register Durable Exclusive Simple Queue + createQueue(durableExclusiveQueueName, false, true, true, false); + + //Register Durable LastValue Queue + createQueue(durableLastValueQueueName, false, true, true, true); + + //Register NON-Durable Priority Queue + createQueue(priorityQueueName, true, false, false, false); + + //Register NON-Durable Simple Queue + createQueue(queueName, false, false, false, false); + } + + private void createAllTopicQueues() + { + //Register Durable Priority Queue + createQueue(durablePriorityTopicQueueName, true, true, false, false); + + //Register Durable Simple Queue + createQueue(durableTopicQueueName, false, true, false, false); + + //Register NON-Durable Priority Queue + createQueue(priorityTopicQueueName, true, false, false, false); + + //Register NON-Durable Simple Queue + createQueue(topicQueueName, false, false, false, false); + } + + private void createQueue(AMQShortString queueName, boolean usePriority, boolean durable, boolean exclusive, boolean lastValueQueue) + { + + FieldTable queueArguments = null; + + if(usePriority || lastValueQueue) + { + assertNotSame("Queues cant be both Priority and LastValue based", usePriority, lastValueQueue); + } + + if (usePriority) + { + queueArguments = new FieldTable(); + queueArguments.put(new AMQShortString(AMQQueueFactory.X_QPID_PRIORITIES), DEFAULT_PRIORTY_LEVEL); + } + + if (lastValueQueue) + { + queueArguments = new FieldTable(); + queueArguments.put(new AMQShortString(AMQQueueFactory.QPID_LAST_VALUE_QUEUE_KEY), LVQ_KEY); + } + + AMQQueue queue = null; + + //Ideally we would be able to use the QueueDeclareHandler here. + try + { + queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), queueName.asString(), durable, queueOwner.asString(), false, exclusive, + getVirtualHost(), FieldTable.convertToMap(queueArguments)); + + validateQueueProperties(queue, usePriority, durable, exclusive, lastValueQueue); + + if (queue.isDurable() && !queue.isAutoDelete()) + { + DurableConfigurationStoreHelper.createQueue(getVirtualHost().getDurableConfigurationStore(), + queue, + queueArguments); + } + } + catch (AMQException e) + { + fail(e.getMessage()); + } + + getVirtualHost().getQueueRegistry().registerQueue(queue); + + } + + private Map createExchanges() + { + Map exchanges = new HashMap(); + + //Register non-durable DirectExchange + exchanges.put(nonDurableExchangeName, createExchange(DirectExchange.TYPE, nonDurableExchangeName, false)); + + //Register durable DirectExchange and TopicExchange + exchanges.put(directExchangeName ,createExchange(DirectExchange.TYPE, directExchangeName, true)); + exchanges.put(topicExchangeName,createExchange(TopicExchange.TYPE, topicExchangeName, true)); + + return exchanges; + } + + private Exchange createExchange(ExchangeType type, String name, boolean durable) + { + Exchange exchange = null; + + try + { + exchange = getVirtualHost().createExchange(null, name, type.getName().toString(), durable, false, null); + } + catch (AMQException e) + { + fail(e.getMessage()); + } + + return exchange; + } + + private void bindAllQueuesToExchange(Exchange exchange, AMQShortString routingKey) + { + FieldTable queueArguments = new FieldTable(); + queueArguments.put(new AMQShortString(AMQQueueFactory.X_QPID_PRIORITIES), DEFAULT_PRIORTY_LEVEL); + + QueueRegistry queueRegistry = getVirtualHost().getQueueRegistry(); + + bindQueueToExchange(exchange, routingKey, queueRegistry.getQueue(durablePriorityQueueName), false, queueArguments); + bindQueueToExchange(exchange, routingKey, queueRegistry.getQueue(durableQueueName), false, null); + bindQueueToExchange(exchange, routingKey, queueRegistry.getQueue(priorityQueueName), false, queueArguments); + bindQueueToExchange(exchange, routingKey, queueRegistry.getQueue(queueName), false, null); + bindQueueToExchange(exchange, routingKey, queueRegistry.getQueue(durableExclusiveQueueName), false, null); + } + + private void bindAllTopicQueuesToExchange(Exchange exchange, AMQShortString routingKey) + { + FieldTable queueArguments = new FieldTable(); + queueArguments.put(new AMQShortString(AMQQueueFactory.X_QPID_PRIORITIES), DEFAULT_PRIORTY_LEVEL); + + QueueRegistry queueRegistry = getVirtualHost().getQueueRegistry(); + + bindQueueToExchange(exchange, routingKey, queueRegistry.getQueue(durablePriorityTopicQueueName), true, queueArguments); + bindQueueToExchange(exchange, routingKey, queueRegistry.getQueue(durableTopicQueueName), true, null); + bindQueueToExchange(exchange, routingKey, queueRegistry.getQueue(priorityTopicQueueName), true, queueArguments); + bindQueueToExchange(exchange, routingKey, queueRegistry.getQueue(topicQueueName), true, null); + } + + + protected void bindQueueToExchange(Exchange exchange, AMQShortString routingKey, AMQQueue queue, boolean useSelector, FieldTable queueArguments) + { + FieldTable bindArguments = null; + + if (useSelector) + { + bindArguments = new FieldTable(); + bindArguments.put(AMQPFilterTypes.JMS_SELECTOR.getValue(), SELECTOR_VALUE ); + } + + try + { + exchange.addBinding(String.valueOf(routingKey), queue, FieldTable.convertToMap(bindArguments)); + } + catch (Exception e) + { + fail(e.getMessage()); + } + } + + protected void unbindQueueFromExchange(Exchange exchange, AMQShortString routingKey, AMQQueue queue, boolean useSelector, FieldTable queueArguments) + { + FieldTable bindArguments = null; + + if (useSelector) + { + bindArguments = new FieldTable(); + bindArguments.put(AMQPFilterTypes.JMS_SELECTOR.getValue(), SELECTOR_VALUE ); + } + + try + { + exchange.removeBinding(String.valueOf(routingKey), queue, FieldTable.convertToMap(bindArguments)); + } + catch (Exception e) + { + fail(e.getMessage()); + } + } + + private void validateMessageOnTopics(long messageCount, boolean allQueues) + { + validateMessageOnQueue(durablePriorityTopicQueueName, messageCount); + validateMessageOnQueue(durableTopicQueueName, messageCount); + + if (allQueues) + { + validateMessageOnQueue(priorityTopicQueueName, messageCount); + validateMessageOnQueue(topicQueueName, messageCount); + } + } + + private void validateMessageOnQueues(long messageCount, boolean allQueues) + { + validateMessageOnQueue(durablePriorityQueueName, messageCount); + validateMessageOnQueue(durableQueueName, messageCount); + + if (allQueues) + { + validateMessageOnQueue(priorityQueueName, messageCount); + validateMessageOnQueue(queueName, messageCount); + } + } + + private void validateMessageOnQueue(AMQShortString queueName, long messageCount) + { + AMQQueue queue = getVirtualHost().getQueueRegistry().getQueue(queueName); + + assertNotNull("Queue(" + queueName + ") not correctly registered:", queue); + + assertEquals("Incorrect Message count on queue:" + queueName, messageCount, queue.getMessageCount()); + } + + private class TestMessagePublishInfo implements MessagePublishInfo + { + + Exchange _exchange; + boolean _immediate; + boolean _mandatory; + AMQShortString _routingKey; + + TestMessagePublishInfo(Exchange exchange, boolean immediate, boolean mandatory, AMQShortString routingKey) + { + _exchange = exchange; + _immediate = immediate; + _mandatory = mandatory; + _routingKey = routingKey; + } + + public AMQShortString getExchange() + { + return _exchange.getNameShortString(); + } + + public void setExchange(AMQShortString exchange) + { + //no-op + } + + public boolean isImmediate() + { + return _immediate; + } + + public boolean isMandatory() + { + return _mandatory; + } + + public AMQShortString getRoutingKey() + { + return _routingKey; + } + } +} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java index 50f2ed655f..c44d4778d4 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java @@ -40,8 +40,8 @@ public class SlowMessageStore implements MessageStore, DurableConfigurationStore private HashMap _preDelays = new HashMap(); private HashMap _postDelays = new HashMap(); private long _defaultDelay = 0L; - private MessageStore _realStore = new MemoryMessageStore(); - private DurableConfigurationStore _durableConfigurationStore = (MemoryMessageStore) _realStore; + private MessageStore _realStore = new MessageStoreCreator().createMessageStore("Memory"); + private DurableConfigurationStore _durableConfigurationStore = (DurableConfigurationStore) _realStore; private static final String PRE = "pre"; private static final String POST = "post"; private String DEFAULT_DELAY = "default"; diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java index cf05cc0304..605296d55c 100755 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java @@ -66,7 +66,6 @@ import org.apache.qpid.server.model.Port; import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.plugin.MessageStoreFactory; import org.apache.qpid.server.protocol.AmqpProtocolVersion; -import org.apache.qpid.server.store.MemoryMessageStore; import org.apache.qpid.server.store.MessageStoreConstants; import org.apache.qpid.server.store.MessageStoreCreator; import org.apache.qpid.url.URLSyntaxException; @@ -1433,10 +1432,10 @@ public class QpidBrokerTestCase extends QpidTestCase public String getTestProfileMessageStoreType() { final String storeClass = getTestProfileMessageStoreClassName(); - if (storeClass == null) + /* if (storeClass == null) { - return MemoryMessageStore.TYPE; - } + return "Memory"; + }*/ return supportedStoresClassToTypeMapping.get(storeClass); } -- cgit v1.2.1