diff options
Diffstat (limited to 'java/bdbstore')
31 files changed, 601 insertions, 342 deletions
diff --git a/java/bdbstore/bin/backup.sh b/java/bdbstore/bin/backup.sh index 61311cd2ef..ba51758d3c 100755 --- a/java/bdbstore/bin/backup.sh +++ b/java/bdbstore/bin/backup.sh @@ -34,11 +34,8 @@ if [ -z "${QPID_HOME}" ]; then export QPID_HOME=`cd ${WHEREAMI}/../ && pwd` fi -VERSION=0.19 - # BDB's je JAR expected to be found in lib/opt -LIBS="${QPID_HOME}/lib/opt/*:${QPID_HOME}/lib/qpid-bdbstore-${VERSION}.jar:${QPID_HOME}/lib/qpid-all.jar" - +LIBS="${QPID_HOME}/lib/opt/*:${QPID_HOME}/lib/qpid-all.jar" echo "Starting Hot Backup Script" java -Dlog4j.configuration=backup-log4j.xml ${JAVA_OPTS} -cp "${LIBS}" org.apache.qpid.server.store.berkeleydb.BDBBackup "${ARGS[@]}" diff --git a/java/bdbstore/build.xml b/java/bdbstore/build.xml index 7c305c7c2f..46809f6a90 100644 --- a/java/bdbstore/build.xml +++ b/java/bdbstore/build.xml @@ -18,7 +18,7 @@ --> <project name="bdbstore" xmlns:ivy="antlib:org.apache.ivy.ant" default="build"> <property name="module.depends" value="common broker" /> - <property name="module.test.depends" value="test client common/test broker/test management/common systests" /> + <property name="module.test.depends" value="client common/tests broker/tests management/common systests broker-plugins/management-jmx" /> <property name="module.genpom" value="true"/> <import file="../module.xml" /> @@ -78,19 +78,4 @@ http://www.oracle.com/technetwork/database/berkeleydb/downloads/jeoslicense-0868 <target name="build" depends="check-request-props, bdb-jar-required, module.build" /> - <target name="postbuild" depends="copy-store-to-upgrade" /> - - <target name="copy-store-to-upgrade" description="copy the upgrade tool resource folder contents into the build tree"> - <copy todir="${qpid.home}" failonerror="true"> - <fileset dir="src/test/resources/upgrade"/> - </copy> - </target> - - <target name="precompile-tests"> - <mkdir dir="${module.test.resources}"/> - <copy todir="${module.test.resources}"> - <fileset dir="src/test/resources"/> - </copy> - </target> - </project> diff --git a/java/bdbstore/jmx/MANIFEST.MF b/java/bdbstore/jmx/MANIFEST.MF deleted file mode 100644 index ee59bc3ad8..0000000000 --- a/java/bdbstore/jmx/MANIFEST.MF +++ /dev/null @@ -1,20 +0,0 @@ -Manifest-Version: 1.0 -Bundle-ManifestVersion: 2 -Bundle-Name: Qpid Bdbstore-Plugins JMX -Bundle-SymbolicName: bdbstore-plugins-jmx -Bundle-Description: Bdbstore Management plugin for Qpid. -Bundle-License: http://www.apache.org/licenses/LICENSE-2.0.txt -Bundle-DocURL: http://www.apache.org/ -Bundle-Version: 1.0.0 -Bundle-RequiredExecutionEnvironment: JavaSE-1.6 -Bundle-ClassPath: . -Fragment-Host: broker-plugins-management-jmx -Import-Package: org.apache.qpid, - org.apache.qpid.management.common.mbeans.annotations, - org.apache.qpid.server.model, - org.apache.qpid.server.virtualhost, - org.apache.qpid.server.store.berkeleydb, - org.apache.log4j;version=1.2.16, - javax.management, - javax.management.openmbean -Export-Package: org.apache.qpid.server.store.berkeleydb.jmx diff --git a/java/bdbstore/jmx/build.xml b/java/bdbstore/jmx/build.xml index 229631555d..d3e9f63b46 100644 --- a/java/bdbstore/jmx/build.xml +++ b/java/bdbstore/jmx/build.xml @@ -18,13 +18,13 @@ --> <project name="bdbstore-jmx" default="build"> <property name="module.depends" value="common broker broker-plugins/management-jmx management/common bdbstore" /> - <property name="module.test.depends" value="test broker/test common/test management/common client systests bdbstore/test" /> + <property name="module.test.depends" value="broker/tests common/tests management/common client systests bdbstore/tests" /> - <property name="module.manifest" value="MANIFEST.MF" /> - <property name="module.plugin" value="true" /> <property name="module.genpom" value="true"/> <property name="module.genpom.args" value="-Sqpid-common=provided -Sqpid-broker=provided -Sqpid-broker-plugins-management-jmx=provided -Sqpid-management-common=provided -Sqpid-bdbstore=provided -Sje=provided"/> + <property name="broker.plugin" value="true"/> + <import file="../../module.xml" /> <target name="bundle" depends="bundle-tasks" /> diff --git a/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBean.java b/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBean.java index cfcea602b4..28528ec83c 100644 --- a/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBean.java +++ b/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBean.java @@ -24,6 +24,7 @@ import java.util.List; import java.util.Map; import javax.management.JMException; +import javax.management.ObjectName; import javax.management.openmbean.CompositeData; import javax.management.openmbean.CompositeDataSupport; import javax.management.openmbean.CompositeType; @@ -91,7 +92,7 @@ public class BDBHAMessageStoreManagerMBean extends AMQManagedObject implements M @Override public String getObjectInstanceName() { - return _store.getName(); + return ObjectName.quote(_store.getName()); } @Override diff --git a/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanProvider.java b/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanProvider.java index 837da1eef3..14cdec1669 100644 --- a/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanProvider.java +++ b/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanProvider.java @@ -28,13 +28,11 @@ import org.apache.qpid.server.jmx.MBeanProvider; import org.apache.qpid.server.jmx.ManagedObject; import org.apache.qpid.server.model.ConfiguredObject; import org.apache.qpid.server.model.VirtualHost; -import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.store.berkeleydb.BDBHAMessageStore; -import org.apache.qpid.server.virtualhost.VirtualHostRegistry; /** * This provide will create a {@link BDBHAMessageStoreManagerMBean} if the child is a virtual - * host and of type {@link BDBHAMessageStore#BDB_HA_STORE_TYPE}. + * host and of type {@link BDBHAMessageStore#TYPE}. * */ public class BDBHAMessageStoreManagerMBeanProvider implements MBeanProvider @@ -50,7 +48,7 @@ public class BDBHAMessageStoreManagerMBeanProvider implements MBeanProvider public boolean isChildManageableByMBean(ConfiguredObject child) { return (child instanceof VirtualHost - && BDBHAMessageStore.BDB_HA_STORE_TYPE.equals(child.getAttribute(VirtualHost.STORE_TYPE))); + && BDBHAMessageStore.TYPE.equals(child.getAttribute(VirtualHost.STORE_TYPE))); } @Override @@ -58,10 +56,7 @@ public class BDBHAMessageStoreManagerMBeanProvider implements MBeanProvider { VirtualHost virtualHostChild = (VirtualHost) child; - VirtualHostRegistry virtualHostRegistry = ApplicationRegistry.getInstance().getVirtualHostRegistry(); - org.apache.qpid.server.virtualhost.VirtualHost vhost = virtualHostRegistry.getVirtualHost(virtualHostChild.getName()); - - BDBHAMessageStore messageStore = (BDBHAMessageStore) vhost.getMessageStore(); + BDBHAMessageStore messageStore = (BDBHAMessageStore) virtualHostChild.getMessageStore(); if (LOGGER.isDebugEnabled()) { diff --git a/java/bdbstore/jmx/src/main/resources/META-INF/services/org.apache.qpid.server.jmx.MBeanProvider b/java/bdbstore/jmx/src/main/resources/META-INF/services/org.apache.qpid.server.jmx.MBeanProvider index b5bc947612..8ece9627b0 100644 --- a/java/bdbstore/jmx/src/main/resources/META-INF/services/org.apache.qpid.server.jmx.MBeanProvider +++ b/java/bdbstore/jmx/src/main/resources/META-INF/services/org.apache.qpid.server.jmx.MBeanProvider @@ -1 +1,19 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# org.apache.qpid.server.store.berkeleydb.jmx.BDBHAMessageStoreManagerMBeanProvider diff --git a/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterManagementTest.java b/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterManagementTest.java index 606983cdae..ff47ed958d 100644 --- a/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterManagementTest.java +++ b/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterManagementTest.java @@ -30,6 +30,7 @@ import java.util.Iterator; import java.util.Set; import javax.jms.Connection; +import javax.management.ObjectName; import javax.management.openmbean.CompositeData; import javax.management.openmbean.TabularData; @@ -37,7 +38,6 @@ import org.apache.log4j.Logger; import org.apache.qpid.jms.ConnectionURL; import org.apache.qpid.management.common.mbeans.ManagedBroker; import org.apache.qpid.server.store.berkeleydb.jmx.ManagedBDBHAMessageStore; -import org.apache.qpid.server.virtualhost.ManagedVirtualHost; import org.apache.qpid.test.utils.JMXTestUtils; import org.apache.qpid.test.utils.QpidBrokerTestCase; @@ -55,7 +55,7 @@ public class HAClusterManagementTest extends QpidBrokerTestCase private static final Set<String> NON_MASTER_STATES = new HashSet<String>(Arrays.asList(REPLICA.toString(), DETACHED.toString(), UNKNOWN.toString()));; private static final String VIRTUAL_HOST = "test"; - private static final String MANAGED_OBJECT_QUERY = "org.apache.qpid:type=BDBHAMessageStore,name=" + VIRTUAL_HOST; + private static final String MANAGED_OBJECT_QUERY = "org.apache.qpid:type=BDBHAMessageStore,name=" + ObjectName.quote(VIRTUAL_HOST); private static final int NUMBER_OF_NODES = 4; private final HATestClusterCreator _clusterCreator = new HATestClusterCreator(this, VIRTUAL_HOST, NUMBER_OF_NODES); @@ -67,7 +67,6 @@ public class HAClusterManagementTest extends QpidBrokerTestCase protected void setUp() throws Exception { _brokerType = BrokerType.SPAWNED; - _jmxUtils.setUp(); _clusterCreator.configureClusterNodes(); _brokerFailoverUrl = _clusterCreator.getConnectionUrlForAllClusterNodes(); @@ -132,12 +131,11 @@ public class HAClusterManagementTest extends QpidBrokerTestCase final int brokerPortNumber = getBrokerPortNumbers().iterator().next(); ManagedBDBHAMessageStore storeBean = getStoreBeanForNodeAtBrokerPort(brokerPortNumber); + awaitAllNodesJoiningGroup(storeBean, NUMBER_OF_NODES); + final TabularData groupMembers = storeBean.getAllNodesInGroup(); assertNotNull(groupMembers); - final int numberOfDataRows = groupMembers.size(); - assertEquals("Unexpected number of data rows", NUMBER_OF_NODES ,numberOfDataRows); - for(int bdbPortNumber : _clusterCreator.getBdbPortNumbers()) { final String nodeName = _clusterCreator.getNodeNameForNodeAt(bdbPortNumber); @@ -155,8 +153,7 @@ public class HAClusterManagementTest extends QpidBrokerTestCase final int brokerPortNumberToMakeObservation = brokerPortNumberIterator.next(); final int brokerPortNumberToBeRemoved = brokerPortNumberIterator.next(); final ManagedBDBHAMessageStore storeBean = getStoreBeanForNodeAtBrokerPort(brokerPortNumberToMakeObservation); - final int numberOfDataRows = storeBean.getAllNodesInGroup().size(); - assertEquals("Unexpected number of data rows before test", NUMBER_OF_NODES ,numberOfDataRows); + awaitAllNodesJoiningGroup(storeBean, NUMBER_OF_NODES); final String removedNodeName = _clusterCreator.getNodeNameForNodeAt(_clusterCreator.getBdbPortForBrokerPort(brokerPortNumberToBeRemoved)); _clusterCreator.stopNode(brokerPortNumberToBeRemoved); @@ -266,4 +263,27 @@ public class HAClusterManagementTest extends QpidBrokerTestCase return _jmxUtils.getManagedBroker(VIRTUAL_HOST); } + + private void awaitAllNodesJoiningGroup(ManagedBDBHAMessageStore storeBean, int expectedNumberOfNodes) throws Exception + { + long totalTimeWaited = 0l; + long waitInterval = 100l; + long maxWaitTime = 10000; + + int currentNumberOfNodes = storeBean.getAllNodesInGroup().size(); + while (expectedNumberOfNodes > currentNumberOfNodes || totalTimeWaited > maxWaitTime) + { + LOGGER.debug("Still awaiting nodes to join group; expecting " + + expectedNumberOfNodes + " node(s) but only have " + currentNumberOfNodes + + " after " + totalTimeWaited + " ms."); + + totalTimeWaited += waitInterval; + Thread.sleep(waitInterval); + + currentNumberOfNodes = storeBean.getAllNodesInGroup().size(); + } + + assertEquals("Unexpected number of nodes in group after " + totalTimeWaited + " ms", + expectedNumberOfNodes ,currentNumberOfNodes); + } } diff --git a/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterTwoNodeTest.java b/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterTwoNodeTest.java index 22877ec36c..95626f7fa5 100644 --- a/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterTwoNodeTest.java +++ b/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterTwoNodeTest.java @@ -27,6 +27,7 @@ import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.Session; +import javax.management.ObjectName; import org.apache.qpid.jms.ConnectionURL; import org.apache.qpid.server.store.berkeleydb.jmx.ManagedBDBHAMessageStore; @@ -41,7 +42,7 @@ public class HAClusterTwoNodeTest extends QpidBrokerTestCase private static final String VIRTUAL_HOST = "test"; - private static final String MANAGED_OBJECT_QUERY = "org.apache.qpid:type=BDBHAMessageStore,name=" + VIRTUAL_HOST; + private static final String MANAGED_OBJECT_QUERY = "org.apache.qpid:type=BDBHAMessageStore,name=" + ObjectName.quote(VIRTUAL_HOST); private static final int NUMBER_OF_NODES = 2; private final HATestClusterCreator _clusterCreator = new HATestClusterCreator(this, VIRTUAL_HOST, NUMBER_OF_NODES); @@ -56,7 +57,6 @@ public class HAClusterTwoNodeTest extends QpidBrokerTestCase assertTrue(isJavaBroker()); assertTrue(isBrokerStorePersistent()); - _jmxUtils.setUp(); super.setUp(); } @@ -86,11 +86,11 @@ public class HAClusterTwoNodeTest extends QpidBrokerTestCase String storeConfigKeyPrefix = _clusterCreator.getStoreConfigKeyPrefix(); - setConfigurationProperty(storeConfigKeyPrefix + ".repConfig(0).name", ReplicationConfig.INSUFFICIENT_REPLICAS_TIMEOUT); - setConfigurationProperty(storeConfigKeyPrefix + ".repConfig(0).value", "2 s"); + setVirtualHostConfigurationProperty(storeConfigKeyPrefix + ".repConfig(0).name", ReplicationConfig.INSUFFICIENT_REPLICAS_TIMEOUT); + setVirtualHostConfigurationProperty(storeConfigKeyPrefix + ".repConfig(0).value", "2 s"); - setConfigurationProperty(storeConfigKeyPrefix + ".repConfig(1).name", ReplicationConfig.ELECTIONS_PRIMARY_RETRIES); - setConfigurationProperty(storeConfigKeyPrefix + ".repConfig(1).value", "0"); + setVirtualHostConfigurationProperty(storeConfigKeyPrefix + ".repConfig(1).name", ReplicationConfig.ELECTIONS_PRIMARY_RETRIES); + setVirtualHostConfigurationProperty(storeConfigKeyPrefix + ".repConfig(1).value", "0"); _clusterCreator.configureClusterNodes(); _clusterCreator.setDesignatedPrimaryOnFirstBroker(designedPrimary); diff --git a/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanTest.java b/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanTest.java index 49b3ddd3dc..298d5bc045 100644 --- a/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanTest.java +++ b/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanTest.java @@ -30,6 +30,7 @@ import java.util.List; import java.util.Map; import javax.management.JMException; +import javax.management.ObjectName; import javax.management.openmbean.CompositeData; import javax.management.openmbean.CompositeType; import javax.management.openmbean.TabularData; @@ -84,7 +85,7 @@ public class BDBHAMessageStoreManagerMBeanTest extends TestCase { when(_store.getName()).thenReturn(TEST_STORE_NAME); - String expectedObjectName = "org.apache.qpid:type=BDBHAMessageStore,name=" + TEST_STORE_NAME; + String expectedObjectName = "org.apache.qpid:type=BDBHAMessageStore,name=" + ObjectName.quote(TEST_STORE_NAME); assertEquals(expectedObjectName, _mBean.getObjectName().toString()); } diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java index 9323111fdd..6e64ea5597 100644 --- a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java +++ b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java @@ -54,13 +54,10 @@ import org.apache.qpid.AMQStoreException; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.binding.Binding; import org.apache.qpid.server.exchange.Exchange; -import org.apache.qpid.server.federation.Bridge; -import org.apache.qpid.server.federation.BrokerLink; import org.apache.qpid.server.message.EnqueableMessage; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.store.*; import org.apache.qpid.server.store.ConfigurationRecoveryHandler.BindingRecoveryHandler; -import org.apache.qpid.server.store.ConfigurationRecoveryHandler.BrokerLinkRecoveryHandler; import org.apache.qpid.server.store.ConfigurationRecoveryHandler.ExchangeRecoveryHandler; import org.apache.qpid.server.store.ConfigurationRecoveryHandler.QueueRecoveryHandler; import org.apache.qpid.server.store.MessageStoreRecoveryHandler.StoredMessageRecoveryHandler; @@ -73,7 +70,6 @@ import org.apache.qpid.server.store.berkeleydb.tuple.ContentBinding; import org.apache.qpid.server.store.berkeleydb.tuple.MessageMetaDataBinding; import org.apache.qpid.server.store.berkeleydb.tuple.PreparedTransactionBinding; import org.apache.qpid.server.store.berkeleydb.tuple.QueueEntryBinding; -import org.apache.qpid.server.store.berkeleydb.tuple.StringMapBinding; import org.apache.qpid.server.store.berkeleydb.tuple.UUIDTupleBinding; import org.apache.qpid.server.store.berkeleydb.tuple.XidBinding; import org.apache.qpid.server.store.berkeleydb.upgrade.Upgrader; @@ -423,8 +419,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore BindingRecoveryHandler brh = qrh.completeQueueRecovery(); _configuredObjectHelper.recoverBindings(brh, configuredObjects); - BrokerLinkRecoveryHandler lrh = brh.completeBindingRecovery(); - recoverBrokerLinks(lrh); + brh.completeBindingRecovery(); } catch (DatabaseException e) { @@ -466,66 +461,6 @@ public abstract class AbstractBDBMessageStore implements MessageStore } } - private void recoverBrokerLinks(final ConfigurationRecoveryHandler.BrokerLinkRecoveryHandler lrh) - { - Cursor cursor = null; - - try - { - cursor = _linkDb.openCursor(null, null); - DatabaseEntry key = new DatabaseEntry(); - DatabaseEntry value = new DatabaseEntry(); - - while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS) - { - UUID id = UUIDTupleBinding.getInstance().entryToObject(key); - long createTime = LongBinding.entryToLong(value); - Map<String,String> arguments = StringMapBinding.getInstance().entryToObject(value); - - ConfigurationRecoveryHandler.BridgeRecoveryHandler brh = lrh.brokerLink(id, createTime, arguments); - - recoverBridges(brh, id); - } - } - finally - { - closeCursorSafely(cursor); - } - - } - - private void recoverBridges(final ConfigurationRecoveryHandler.BridgeRecoveryHandler brh, final UUID linkId) - { - Cursor cursor = null; - - try - { - cursor = _bridgeDb.openCursor(null, null); - DatabaseEntry key = new DatabaseEntry(); - DatabaseEntry value = new DatabaseEntry(); - - while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS) - { - UUID id = UUIDTupleBinding.getInstance().entryToObject(key); - - UUID parentId = UUIDTupleBinding.getInstance().entryToObject(value); - if(parentId.equals(linkId)) - { - - long createTime = LongBinding.entryToLong(value); - Map<String,String> arguments = StringMapBinding.getInstance().entryToObject(value); - brh.bridge(id,createTime,arguments); - } - } - brh.completeBridgeRecoveryForLink(); - } - finally - { - closeCursorSafely(cursor); - } - - } - private void recoverMessages(MessageStoreRecoveryHandler msrh) throws DatabaseException { @@ -940,89 +875,6 @@ public abstract class AbstractBDBMessageStore implements MessageStore } } - public void createBrokerLink(final BrokerLink link) throws AMQStoreException - { - if (_stateManager.isInState(State.ACTIVE)) - { - DatabaseEntry key = new DatabaseEntry(); - UUIDTupleBinding.getInstance().objectToEntry(link.getQMFId(), key); - - DatabaseEntry value = new DatabaseEntry(); - LongBinding.longToEntry(link.getCreateTime(), value); - StringMapBinding.getInstance().objectToEntry(link.getArguments(), value); - - try - { - _linkDb.put(null, key, value); - } - catch (DatabaseException e) - { - throw new AMQStoreException("Error writing Link " + link - + " to database: " + e.getMessage(), e); - } - } - } - - public void deleteBrokerLink(final BrokerLink link) throws AMQStoreException - { - DatabaseEntry key = new DatabaseEntry(); - UUIDTupleBinding.getInstance().objectToEntry(link.getQMFId(), key); - try - { - OperationStatus status = _linkDb.delete(null, key); - if (status == OperationStatus.NOTFOUND) - { - throw new AMQStoreException("Link " + link + " not found"); - } - } - catch (DatabaseException e) - { - throw new AMQStoreException("Error deleting the Link " + link + " from database: " + e.getMessage(), e); - } - } - - public void createBridge(final Bridge bridge) throws AMQStoreException - { - if (_stateManager.isInState(State.ACTIVE)) - { - DatabaseEntry key = new DatabaseEntry(); - UUIDTupleBinding.getInstance().objectToEntry(bridge.getQMFId(), key); - - DatabaseEntry value = new DatabaseEntry(); - UUIDTupleBinding.getInstance().objectToEntry(bridge.getLink().getQMFId(),value); - LongBinding.longToEntry(bridge.getCreateTime(),value); - StringMapBinding.getInstance().objectToEntry(bridge.getArguments(), value); - - try - { - _bridgeDb.put(null, key, value); - } - catch (DatabaseException e) - { - throw new AMQStoreException("Error writing Bridge " + bridge - + " to database: " + e.getMessage(), e); - } - - } - } - - public void deleteBridge(final Bridge bridge) throws AMQStoreException - { - DatabaseEntry key = new DatabaseEntry(); - UUIDTupleBinding.getInstance().objectToEntry(bridge.getQMFId(), key); - try - { - OperationStatus status = _bridgeDb.delete(null, key); - if (status == OperationStatus.NOTFOUND) - { - throw new AMQStoreException("Bridge " + bridge + " not found"); - } - } - catch (DatabaseException e) - { - throw new AMQStoreException("Error deleting the Bridge " + bridge + " from database: " + e.getMessage(), e); - } - } /** * Places a message onto a specified queue, in a given transaction. @@ -1050,7 +902,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore { LOGGER.debug("Enqueuing message " + messageId + " on queue " + (queue instanceof AMQQueue ? ((AMQQueue) queue).getName() + " with id " : "") + queue.getId() - + " [Transaction" + tx + "]"); + + " in transaction " + tx); } _deliveryDb.put(tx, key, value); } @@ -1204,7 +1056,8 @@ public abstract class AbstractBDBMessageStore implements MessageStore if (LOGGER.isDebugEnabled()) { - LOGGER.debug("commitTranImpl completed for [Transaction:" + tx + "]"); + String transactionType = syncCommit ? "synchronous" : "asynchronous"; + LOGGER.debug("commitTranImpl completed " + transactionType + " transaction " + tx); } } catch (DatabaseException e) @@ -1226,7 +1079,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore { if (LOGGER.isDebugEnabled()) { - LOGGER.debug("abortTran called for [Transaction:" + tx + "]"); + LOGGER.debug("abortTran called for transaction " + tx); } try @@ -1338,7 +1191,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Storing content for message " + messageId + "[Transaction" + tx + "]"); + LOGGER.debug("Storing content for message " + messageId + " in transaction " + tx); } } @@ -1363,8 +1216,9 @@ public abstract class AbstractBDBMessageStore implements MessageStore { if (LOGGER.isDebugEnabled()) { - LOGGER.debug("public void storeMetaData(Txn tx = " + tx + ", Long messageId = " - + messageId + ", MessageMetaData messageMetaData = " + messageMetaData + "): called"); + LOGGER.debug("storeMetaData called for transaction " + tx + + ", messageId " + messageId + + ", messageMetaData " + messageMetaData); } DatabaseEntry key = new DatabaseEntry(); @@ -1378,7 +1232,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore _messageMetaDataDb.put(tx, key, value); if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Storing message metadata for message id " + messageId + "[Transaction" + tx + "]"); + LOGGER.debug("Storing message metadata for message id " + messageId + " in transaction " + tx); } } catch (DatabaseException e) @@ -1680,7 +1534,8 @@ public abstract class AbstractBDBMessageStore implements MessageStore else { ByteBuffer buf = ByteBuffer.allocate(size); - getContent(offsetInMessage, buf); + int length = getContent(offsetInMessage, buf); + buf.limit(length); buf.position(0); return buf; } diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java index c40f24dbc3..ba111e8091 100644 --- a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java +++ b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java @@ -105,7 +105,7 @@ public class BDBHAMessageStore extends AbstractBDBMessageStore implements HAMess put(ReplicationConfig.LOG_FLUSH_TASK_INTERVAL, "1 min"); }}); - public static final String BDB_HA_STORE_TYPE = "BDB-HA"; + public static final String TYPE = "BDB-HA"; private String _groupName; private String _nodeName; @@ -602,6 +602,6 @@ public class BDBHAMessageStore extends AbstractBDBMessageStore implements HAMess @Override public String getStoreType() { - return BDB_HA_STORE_TYPE; + return TYPE; } } diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreFactory.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreFactory.java new file mode 100644 index 0000000000..20dce2628d --- /dev/null +++ b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreFactory.java @@ -0,0 +1,41 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.berkeleydb; + +import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.MessageStoreFactory; + +public class BDBHAMessageStoreFactory implements MessageStoreFactory +{ + + @Override + public String getType() + { + return BDBHAMessageStore.TYPE; + } + + @Override + public MessageStore createMessageStore() + { + return new BDBHAMessageStore(); + } + +} diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java index 82bc3d8564..4028de4b80 100644 --- a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java +++ b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java @@ -42,7 +42,7 @@ import com.sleepycat.je.EnvironmentConfig; public class BDBMessageStore extends AbstractBDBMessageStore { private static final Logger LOGGER = Logger.getLogger(BDBMessageStore.class); - private static final String BDB_STORE_TYPE = "BDB"; + public static final String TYPE = "BDB"; private CommitThreadWrapper _commitThreadWrapper; @Override @@ -108,7 +108,7 @@ public class BDBMessageStore extends AbstractBDBMessageStore @Override public String getStoreType() { - return BDB_STORE_TYPE; + return TYPE; } } diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreFactory.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreFactory.java new file mode 100644 index 0000000000..126bf1928a --- /dev/null +++ b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreFactory.java @@ -0,0 +1,41 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.berkeleydb; + +import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.MessageStoreFactory; + +public class BDBMessageStoreFactory implements MessageStoreFactory +{ + + @Override + public String getType() + { + return BDBMessageStore.TYPE; + } + + @Override + public MessageStore createMessageStore() + { + return new BDBMessageStore(); + } + +} diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CommitThreadWrapper.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CommitThreadWrapper.java index fe1556b5a6..598d20146c 100644 --- a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CommitThreadWrapper.java +++ b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CommitThreadWrapper.java @@ -80,7 +80,7 @@ public class CommitThreadWrapper { if (LOGGER.isDebugEnabled()) { - LOGGER.debug("public synchronized void complete(): called (Transaction = " + _tx + ")"); + LOGGER.debug("complete() called for transaction " + _tx); } _complete = true; @@ -101,7 +101,10 @@ public class CommitThreadWrapper if(!_syncCommit) { - LOGGER.debug("CommitAsync was requested, returning immediately."); + if(LOGGER.isDebugEnabled()) + { + LOGGER.debug("CommitAsync was requested, returning immediately."); + } return; } @@ -121,6 +124,12 @@ public class CommitThreadWrapper public synchronized void waitForCompletion() { + long startTime = 0; + if(LOGGER.isDebugEnabled()) + { + startTime = System.currentTimeMillis(); + } + while (!isComplete()) { _commitThread.explicitNotify(); @@ -133,6 +142,12 @@ public class CommitThreadWrapper throw new RuntimeException(e); } } + + if(LOGGER.isDebugEnabled()) + { + long duration = System.currentTimeMillis() - startTime; + LOGGER.debug("waitForCompletion returning after " + duration + " ms for transaction " + _tx); + } } } @@ -198,8 +213,20 @@ public class CommitThreadWrapper try { + long startTime = 0; + if(LOGGER.isDebugEnabled()) + { + startTime = System.currentTimeMillis(); + } + _environment.flushLog(true); + if(LOGGER.isDebugEnabled()) + { + long duration = System.currentTimeMillis() - startTime; + LOGGER.debug("flushLog completed in " + duration + " ms"); + } + for(int i = 0; i < size; i++) { BDBCommitFuture commit = _jobQueue.poll(); diff --git a/java/bdbstore/src/main/resources/META-INF/services/org.apache.qpid.server.store.MessageStoreFactory b/java/bdbstore/src/main/resources/META-INF/services/org.apache.qpid.server.store.MessageStoreFactory new file mode 100644 index 0000000000..0be7035e2e --- /dev/null +++ b/java/bdbstore/src/main/resources/META-INF/services/org.apache.qpid.server.store.MessageStoreFactory @@ -0,0 +1,20 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +org.apache.qpid.server.store.berkeleydb.BDBMessageStoreFactory +org.apache.qpid.server.store.berkeleydb.BDBHAMessageStoreFactory
\ No newline at end of file diff --git a/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBBackupTest.java b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBBackupTest.java index 342c185b99..7c04d83e79 100644 --- a/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBBackupTest.java +++ b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBBackupTest.java @@ -63,7 +63,7 @@ public class BDBBackupTest extends QpidBrokerTestCase // It would be preferable to lookup the store path using #getConfigurationStringProperty("virtualhosts...") // but the config as known to QBTC does not pull-in the virtualhost section from its separate source file - _backupFromDir = new File(qpidWork + "/bdbstore/" + TEST_VHOST + "-store"); + _backupFromDir = new File(qpidWork + File.separator + TEST_VHOST + "-store"); boolean fromDirExistsAndIsDir = _backupFromDir.isDirectory(); assertTrue("backupFromDir " + _backupFromDir + " should already exist", fromDirExistsAndIsDir); } diff --git a/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreTest.java b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreTest.java index a04fb20680..8e32a1d113 100644 --- a/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreTest.java +++ b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreTest.java @@ -24,12 +24,8 @@ import java.io.File; import java.net.InetAddress; import org.apache.commons.configuration.XMLConfiguration; -import org.apache.qpid.server.configuration.ServerConfiguration; -import org.apache.qpid.server.logging.SystemOutMessageLogger; -import org.apache.qpid.server.logging.actors.CurrentActor; -import org.apache.qpid.server.logging.actors.TestLogActor; -import org.apache.qpid.server.registry.ApplicationRegistry; -import org.apache.qpid.server.util.TestApplicationRegistry; +import org.apache.qpid.server.configuration.VirtualHostConfiguration; +import org.apache.qpid.server.util.BrokerTestHelper; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.test.utils.QpidTestCase; import org.apache.qpid.util.FileUtils; @@ -51,6 +47,7 @@ public class BDBHAMessageStoreTest extends QpidTestCase private int _masterPort; private String _host; private XMLConfiguration _configXml; + private VirtualHost _virtualHost; public void setUp() throws Exception { @@ -63,41 +60,48 @@ public class BDBHAMessageStoreTest extends QpidTestCase FileUtils.delete(new File(_workDir), true); _configXml = new XMLConfiguration(); - } - public void tearDown() throws Exception - { - FileUtils.delete(new File(_workDir), true); - super.tearDown(); - } + BrokerTestHelper.setUp(); + } - public void testSetSystemConfiguration() throws Exception + public void tearDown() throws Exception { - // create virtual host configuration, registry and host instance - addVirtualHostConfiguration(); - TestApplicationRegistry registry = initialize(); try { - VirtualHost virtualhost = registry.getVirtualHostRegistry().getVirtualHost("test" + _masterPort); - BDBHAMessageStore store = (BDBHAMessageStore) virtualhost.getMessageStore(); - - // test whether JVM system settings were applied - Environment env = store.getEnvironment(); - assertEquals("Unexpected number of cleaner threads", TEST_NUMBER_OF_THREADS, env.getConfig().getConfigParam(EnvironmentConfig.CLEANER_THREADS)); - assertEquals("Unexpected log file max", TEST_LOG_FILE_MAX, env.getConfig().getConfigParam(EnvironmentConfig.LOG_FILE_MAX)); - - ReplicatedEnvironment repEnv = store.getReplicatedEnvironment(); - assertEquals("Unexpected number of elections primary retries", TEST_ELECTION_RETRIES, - repEnv.getConfig().getConfigParam(ReplicationConfig.ELECTIONS_PRIMARY_RETRIES)); - assertEquals("Unexpected number of elections primary retries", TEST_ENV_CONSISTENCY_TIMEOUT, - repEnv.getConfig().getConfigParam(ReplicationConfig.ENV_CONSISTENCY_TIMEOUT)); + FileUtils.delete(new File(_workDir), true); + if (_virtualHost != null) + { + _virtualHost.close(); + } } finally { - ApplicationRegistry.remove(); + BrokerTestHelper.tearDown(); + super.tearDown(); } } + public void testSetSystemConfiguration() throws Exception + { + // create virtual host configuration, registry and host instance + addVirtualHostConfiguration(); + String vhostName = "test" + _masterPort; + VirtualHostConfiguration configuration = new VirtualHostConfiguration(vhostName, _configXml.subset("virtualhosts.virtualhost." + vhostName), BrokerTestHelper.createBrokerMock()); + _virtualHost = BrokerTestHelper.createVirtualHost(configuration); + BDBHAMessageStore store = (BDBHAMessageStore) _virtualHost.getMessageStore(); + + // test whether JVM system settings were applied + Environment env = store.getEnvironment(); + assertEquals("Unexpected number of cleaner threads", TEST_NUMBER_OF_THREADS, env.getConfig().getConfigParam(EnvironmentConfig.CLEANER_THREADS)); + assertEquals("Unexpected log file max", TEST_LOG_FILE_MAX, env.getConfig().getConfigParam(EnvironmentConfig.LOG_FILE_MAX)); + + ReplicatedEnvironment repEnv = store.getReplicatedEnvironment(); + assertEquals("Unexpected number of elections primary retries", TEST_ELECTION_RETRIES, + repEnv.getConfig().getConfigParam(ReplicationConfig.ELECTIONS_PRIMARY_RETRIES)); + assertEquals("Unexpected number of elections primary retries", TEST_ENV_CONSISTENCY_TIMEOUT, + repEnv.getConfig().getConfigParam(ReplicationConfig.ENV_CONSISTENCY_TIMEOUT)); + } + private void addVirtualHostConfiguration() throws Exception { int port = findFreePort(); @@ -152,14 +156,4 @@ public class BDBHAMessageStoreTest extends QpidTestCase } return _host + ":" + _masterPort; } - - private TestApplicationRegistry initialize() throws Exception - { - CurrentActor.set(new TestLogActor(new SystemOutMessageLogger())); - ServerConfiguration configuration = new ServerConfiguration(_configXml); - TestApplicationRegistry registry = new TestApplicationRegistry(configuration); - ApplicationRegistry.initialise(registry); - registry.getVirtualHostRegistry().setDefaultVirtualHostName("test" + _masterPort); - return registry; - } } diff --git a/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java index eef9f7eab4..d18c850ecf 100644 --- a/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java +++ b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java @@ -225,7 +225,7 @@ public class BDBMessageStoreTest extends org.apache.qpid.server.store.MessageSto messageStore.close(); AbstractBDBMessageStore newStore = new BDBMessageStore(); - newStore.configure("", _config.subset("store")); + newStore.configure("", getConfig().subset("store")); newStore.startWithNoRecover(); diff --git a/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgradeTestPreparer.java b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgradeTestPreparer.java index 122f846a2d..390d667db0 100644 --- a/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgradeTestPreparer.java +++ b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgradeTestPreparer.java @@ -20,6 +20,9 @@ */ package org.apache.qpid.server.store.berkeleydb; +import java.util.HashMap; +import java.util.Map; + import javax.jms.Connection; import javax.jms.DeliveryMode; import javax.jms.Destination; @@ -35,6 +38,11 @@ import javax.jms.TopicConnection; import javax.jms.TopicPublisher; import javax.jms.TopicSession; import javax.jms.TopicSubscriber; +import javax.management.MBeanServerConnection; +import javax.management.ObjectName; +import javax.management.remote.JMXConnector; +import javax.management.remote.JMXConnectorFactory; +import javax.management.remote.JMXServiceURL; import org.apache.qpid.client.AMQConnectionFactory; import org.apache.qpid.client.AMQDestination; @@ -62,6 +70,11 @@ public class BDBStoreUpgradeTestPreparer public static final String QUEUE_NAME="myUpgradeQueue"; public static final String NON_DURABLE_QUEUE_NAME="queue-non-durable"; + public static final String PRIORITY_QUEUE_NAME="myPriorityQueue"; + public static final String QUEUE_WITH_DLQ_NAME="myQueueWithDLQ"; + public static final String NONEXCLUSIVE_WITH_ERRONEOUS_OWNER = "nonexclusive-with-erroneous-owner"; + public static final String MISUSED_OWNER = "misused-owner-as-description"; + private static AMQConnectionFactory _connFac; private static final String CONN_URL = "amqp://guest:guest@clientid/test?brokerlist='tcp://localhost:5672'"; @@ -86,10 +99,10 @@ public class BDBStoreUpgradeTestPreparer { Connection connection = _connFac.createConnection(); AMQSession<?, ?> session = (AMQSession<?,?>)connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - AMQShortString queueName = AMQShortString.valueOf(NON_DURABLE_QUEUE_NAME); + AMQShortString queueName = new AMQShortString(NON_DURABLE_QUEUE_NAME); AMQDestination destination = (AMQDestination) session.createQueue(NON_DURABLE_QUEUE_NAME); session.sendCreateQueue(queueName, false, false, false, null); - session.bindQueue(queueName, queueName, null, AMQShortString.valueOf("amq.direct"), destination); + session.bindQueue(queueName, queueName, null, new AMQShortString("amq.direct"), destination); MessageProducer messageProducer = session.createProducer(destination); sendMessages(session, messageProducer, destination, DeliveryMode.PERSISTENT, 1024, 3); connection.close(); @@ -140,11 +153,56 @@ public class BDBStoreUpgradeTestPreparer // Publish 5 persistent messages which will NOT be committed and so should be 'lost' sendMessages(session, messageProducer, queue, DeliveryMode.PERSISTENT, 1*1024, 5); + messageProducer.close(); + session.close(); + + session = connection.createSession(true, Session.SESSION_TRANSACTED); + // Create a priority queue on broker + final Map<String,Object> priorityQueueArguments = new HashMap<String, Object>(); + priorityQueueArguments.put("x-qpid-priorities",10); + createAndBindQueueOnBroker(session, PRIORITY_QUEUE_NAME, priorityQueueArguments); + + // Create a queue that has a DLQ + final Map<String,Object> queueWithDLQArguments = new HashMap<String, Object>(); + queueWithDLQArguments.put("x-qpid-dlq-enabled", true); + queueWithDLQArguments.put("x-qpid-maximum-delivery-count", 2); + createAndBindQueueOnBroker(session, QUEUE_WITH_DLQ_NAME, queueWithDLQArguments); + + // Send message to the DLQ + Queue dlq = session.createQueue("fanout://" + QUEUE_WITH_DLQ_NAME + "_DLE//does-not-matter"); + MessageProducer dlqMessageProducer = session.createProducer(dlq); + sendMessages(session, dlqMessageProducer, dlq, DeliveryMode.PERSISTENT, 1*1024, 1); + session.commit(); + // Create a queue with JMX specifying an owner, so it can later be moved into description + createAndBindQueueOnBrokerWithJMX(NONEXCLUSIVE_WITH_ERRONEOUS_OWNER, MISUSED_OWNER, priorityQueueArguments); session.close(); connection.close(); } + private void createAndBindQueueOnBroker(Session session, String queueName, final Map<String, Object> arguments) throws Exception + { + ((AMQSession<?,?>) session).createQueue(new AMQShortString(queueName), false, true, false, arguments); + Queue queue = (Queue) session.createQueue("direct://amq.direct/"+queueName+"/"+queueName+"?durable='true'"); + ((AMQSession<?,?>) session).declareAndBind((AMQDestination)queue); + } + + private void createAndBindQueueOnBrokerWithJMX(String queueName, String owner, final Map<String, Object> arguments) throws Exception + { + Map<String, Object> environment = new HashMap<String, Object>(); + environment.put(JMXConnector.CREDENTIALS, new String[] {"admin","admin"}); + JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost:8999/jmxrmi"); + JMXConnector jmxConnector = JMXConnectorFactory.connect(url, environment); + MBeanServerConnection mbsc = jmxConnector.getMBeanServerConnection(); + ObjectName virtualHost = new ObjectName("org.apache.qpid:type=VirtualHost.VirtualHostManager,VirtualHost=\"test\""); + + Object[] params = new Object[] {queueName, owner, true, arguments}; + String[] signature = new String[] {String.class.getName(), String.class.getName(), boolean.class.getName(), Map.class.getName()}; + mbsc.invoke(virtualHost, "createNewQueue", params, signature); + + ObjectName directExchange = new ObjectName("org.apache.qpid:type=VirtualHost.Exchange,VirtualHost=\"test\",name=\"amq.direct\",ExchangeType=direct"); + mbsc.invoke(directExchange, "createNewBinding", new Object[] {queueName, queueName}, new String[] {String.class.getName(), String.class.getName()}); + } /** * Prepare a DurableSubscription backing queue for use in testing selector * recovery and queue exclusivity marking during the upgrade process. diff --git a/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java index 4e201d5473..e4837b212e 100644 --- a/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java +++ b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java @@ -22,16 +22,20 @@ package org.apache.qpid.server.store.berkeleydb; import static org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.NON_DURABLE_QUEUE_NAME; +import static org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.PRIORITY_QUEUE_NAME; import static org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.QUEUE_NAME; +import static org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.QUEUE_WITH_DLQ_NAME; import static org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.SELECTOR_SUB_NAME; import static org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.SELECTOR_TOPIC_NAME; import static org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.SUB_NAME; import static org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.TOPIC_NAME; import java.io.File; +import java.io.InputStream; import javax.jms.Connection; import javax.jms.DeliveryMode; +import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; @@ -43,7 +47,10 @@ import javax.jms.TopicConnection; import javax.jms.TopicPublisher; import javax.jms.TopicSession; import javax.jms.TopicSubscriber; +import javax.management.openmbean.CompositeData; +import javax.management.openmbean.TabularDataSupport; +import org.apache.qpid.management.common.mbeans.ManagedExchange; import org.apache.qpid.management.common.mbeans.ManagedQueue; import org.apache.qpid.test.utils.JMXTestUtils; import org.apache.qpid.test.utils.QpidBrokerTestCase; @@ -70,7 +77,7 @@ public class BDBUpgradeTest extends QpidBrokerTestCase public void setUp() throws Exception { assertNotNull("QPID_WORK must be set", QPID_WORK_ORIG); - _storeLocation = getWorkDirBaseDir() + "/bdbstore/test-store"; + _storeLocation = getWorkDirBaseDir() + File.separator + "test-store"; //Clear the two target directories if they exist. File directory = new File(_storeLocation); @@ -78,15 +85,13 @@ public class BDBUpgradeTest extends QpidBrokerTestCase { FileUtils.delete(directory, true); } + directory.mkdirs(); // copy store files - String src = getClass().getClassLoader().getResource("upgrade/bdbstore-v4/test-store").toURI().getPath(); - FileUtils.copyRecursive(new File(src), new File(_storeLocation)); - - //override the broker config used and then start the broker with the updated store - _configFile = new File("build/etc/config-systests-bdb.xml"); - setConfigurationProperty("management.enabled", "true"); + InputStream src = getClass().getClassLoader().getResourceAsStream("upgrade/bdbstore-v4/test-store/00000000.jdb"); + FileUtils.copy(src, new File(_storeLocation, "00000000.jdb")); + getBrokerConfiguration().addJmxManagementConfiguration(); super.setUp(); } @@ -302,11 +307,110 @@ public class BDBUpgradeTest extends QpidBrokerTestCase Queue queue = session.createQueue(NON_DURABLE_QUEUE_NAME); MessageConsumer messageConsumer = session.createConsumer(queue); - for (int i = 0; i < 3; i++) + for (int i = 1; i <= 3; i++) { Message message = messageConsumer.receive(1000); assertNotNull("Message was not migrated!", message); assertTrue("Unexpected message received!", message instanceof TextMessage); + assertEquals("ID property did not match", i, message.getIntProperty("ID")); + } + } + + /** + * Tests store upgrade has maintained the priority queue configuration, + * such that sending messages with priorities out-of-order and then consuming + * them gets the messages back in priority order. + */ + public void testPriorityQueue() throws Exception + { + // Create a connection and start it + Connection connection = getConnection(); + connection.start(); + + // send some messages to the priority queue + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue(PRIORITY_QUEUE_NAME); + MessageProducer producer = session.createProducer(queue); + + producer.setPriority(4); + producer.send(createMessage(1, false, session, producer)); + producer.setPriority(1); + producer.send(createMessage(2, false, session, producer)); + producer.setPriority(9); + producer.send(createMessage(3, false, session, producer)); + session.close(); + + //consume the messages, expected order: msg 3, msg 1, msg 2. + session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer = session.createConsumer(queue); + + Message msg = consumer.receive(1500); + assertNotNull("expected message was not received", msg); + assertEquals(3, msg.getIntProperty("msg")); + msg = consumer.receive(1500); + assertNotNull("expected message was not received", msg); + assertEquals(1, msg.getIntProperty("msg")); + msg = consumer.receive(1500); + assertNotNull("expected message was not received", msg); + assertEquals(2, msg.getIntProperty("msg")); + } + + /** + * Test that the queue configured to have a DLQ was recovered and has the alternate exchange + * and max delivery count, the DLE exists, the DLQ exists with no max delivery count, the + * DLQ is bound to the DLE, and that the DLQ does not itself have a DLQ. + * + * DLQs are NOT enabled at the virtualhost level, we are testing recovery of the arguments + * that turned it on for this specific queue. + */ + public void testRecoveryOfQueueWithDLQ() throws Exception + { + JMXTestUtils jmxUtils = null; + try + { + jmxUtils = new JMXTestUtils(this, "guest", "guest"); + jmxUtils.open(); + } + catch (Exception e) + { + fail("Unable to establish JMX connection, test cannot proceed"); + } + + try + { + //verify the DLE exchange exists, has the expected type, and a single binding for the DLQ + ManagedExchange exchange = jmxUtils.getManagedExchange(QUEUE_WITH_DLQ_NAME + "_DLE"); + assertEquals("Wrong exchange type", "fanout", exchange.getExchangeType()); + TabularDataSupport bindings = (TabularDataSupport) exchange.bindings(); + assertEquals(1, bindings.size()); + for(Object o : bindings.values()) + { + CompositeData binding = (CompositeData) o; + + String bindingKey = (String) binding.get(ManagedExchange.BINDING_KEY); + String[] queueNames = (String[]) binding.get(ManagedExchange.QUEUE_NAMES); + + //Because its a fanout exchange, we just return a single '*' key with all bound queues + assertEquals("unexpected binding key", "*", bindingKey); + assertEquals("unexpected number of queues bound", 1, queueNames.length); + assertEquals("unexpected queue name", QUEUE_WITH_DLQ_NAME + "_DLQ", queueNames[0]); + } + + //verify the queue exists, has the expected alternate exchange and max delivery count + ManagedQueue queue = jmxUtils.getManagedQueue(QUEUE_WITH_DLQ_NAME); + assertEquals("Queue does not have the expected AlternateExchange", QUEUE_WITH_DLQ_NAME + "_DLE", queue.getAlternateExchange()); + assertEquals("Unexpected maximum delivery count", Integer.valueOf(2), queue.getMaximumDeliveryCount()); + + ManagedQueue dlQqueue = jmxUtils.getManagedQueue(QUEUE_WITH_DLQ_NAME + "_DLQ"); + assertNull("Queue should not have an AlternateExchange", dlQqueue.getAlternateExchange()); + assertEquals("Unexpected maximum delivery count", Integer.valueOf(0), dlQqueue.getMaximumDeliveryCount()); + + String dlqDlqObjectNameString = jmxUtils.getQueueObjectNameString("test", QUEUE_WITH_DLQ_NAME + "_DLQ" + "_DLQ"); + assertFalse("a DLQ should not exist for the DLQ itself", jmxUtils.doesManagedObjectExist(dlqDlqObjectNameString)); + } + finally + { + jmxUtils.close(); } } @@ -387,4 +491,11 @@ public class BDBUpgradeTest extends QpidBrokerTestCase session.close(); } + private Message createMessage(int msgId, boolean first, Session producerSession, MessageProducer producer) throws JMSException + { + Message send = producerSession.createTextMessage("Message: " + msgId); + send.setIntProperty("msg", msgId); + + return send; + } } diff --git a/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterBlackboxTest.java b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterBlackboxTest.java index c6a9ba8f8b..0e1ef7b25d 100644 --- a/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterBlackboxTest.java +++ b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterBlackboxTest.java @@ -31,6 +31,7 @@ import org.apache.qpid.client.AMQConnection; import org.apache.qpid.jms.ConnectionListener; import org.apache.qpid.jms.ConnectionURL; import org.apache.qpid.test.utils.QpidBrokerTestCase; +import org.apache.qpid.test.utils.TestUtils; import com.sleepycat.je.rep.ReplicationConfig; @@ -134,7 +135,10 @@ public class HAClusterBlackboxTest extends QpidBrokerTestCase public void assertFailoverOccurs(long delay) throws InterruptedException { - _failoverLatch.await(delay, TimeUnit.MILLISECONDS); + if (!_failoverLatch.await(delay, TimeUnit.MILLISECONDS)) + { + LOGGER.warn("Test thread dump:\n\n" + TestUtils.dumpThreads() + "\n"); + } assertEquals("Failover did not occur", 0, _failoverLatch.getCount()); } diff --git a/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java index abe13edc32..4c2fa910f5 100644 --- a/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java +++ b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java @@ -43,6 +43,7 @@ import org.apache.commons.lang.StringUtils; import org.apache.log4j.Logger; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQConnectionURL; +import org.apache.qpid.test.utils.TestBrokerConfiguration; import org.apache.qpid.test.utils.QpidBrokerTestCase; import org.apache.qpid.url.URLSyntaxException; @@ -67,7 +68,7 @@ public class HATestClusterCreator private final Map<Integer, Integer> _brokerPortToBdbPortMap = new HashMap<Integer, Integer>(); private final Map<Integer, BrokerConfigHolder> _brokerConfigurations = new TreeMap<Integer, BrokerConfigHolder>(); private final String _virtualHostName; - private final String _storeConfigKeyPrefix; + private final String _vhostStoreConfigKeyPrefix; private final String _ipAddressOfBroker; private final String _groupName ; @@ -82,7 +83,7 @@ public class HATestClusterCreator _groupName = "group" + _testcase.getName(); _ipAddressOfBroker = getIpAddressOfBrokerHost(); _numberOfNodes = numberOfNodes; - _storeConfigKeyPrefix = "virtualhosts.virtualhost." + _virtualHostName + ".store."; + _vhostStoreConfigKeyPrefix = "virtualhosts.virtualhost." + _virtualHostName + ".store."; _bdbHelperPort = 0; } @@ -102,7 +103,9 @@ public class HATestClusterCreator } configureClusterNode(brokerPort, bdbPort); - collectConfig(brokerPort, _testcase.getTestConfiguration(), _testcase.getTestVirtualhosts()); + TestBrokerConfiguration brokerConfiguration = _testcase.getBrokerConfiguration(brokerPort); + brokerConfiguration.addJmxManagementConfiguration(); + collectConfig(brokerPort, brokerConfiguration, _testcase.getTestVirtualhosts()); brokerPort = _testcase.getNextAvailable(bdbPort + 1); } @@ -127,7 +130,7 @@ public class HATestClusterCreator */ private String getConfigKey(String configKeySuffix) { - final String configKey = StringUtils.substringAfter(_storeConfigKeyPrefix + configKeySuffix, "virtualhosts."); + final String configKey = StringUtils.substringAfter(_vhostStoreConfigKeyPrefix + configKeySuffix, "virtualhosts."); return configKey; } @@ -135,7 +138,6 @@ public class HATestClusterCreator { final BrokerConfigHolder brokerConfigHolder = _brokerConfigurations.get(brokerPortNumber); - _testcase.setTestConfiguration(brokerConfigHolder.getTestConfiguration()); _testcase.setTestVirtualhosts(brokerConfigHolder.getTestVirtualhosts()); _testcase.startBroker(brokerPortNumber); @@ -204,7 +206,7 @@ public class HATestClusterCreator public void stopNode(final int brokerPortNumber) { - _testcase.stopBroker(brokerPortNumber); + _testcase.killBroker(brokerPortNumber); } public void stopCluster() throws Exception @@ -348,12 +350,12 @@ public class HATestClusterCreator { final String nodeName = getNodeNameForNodeAt(bdbPort); - _testcase.setConfigurationProperty(_storeConfigKeyPrefix + "class", "org.apache.qpid.server.store.berkeleydb.BDBHAMessageStore"); + _testcase.setVirtualHostConfigurationProperty(_vhostStoreConfigKeyPrefix + "class", "org.apache.qpid.server.store.berkeleydb.BDBHAMessageStore"); - _testcase.setConfigurationProperty(_storeConfigKeyPrefix + "highAvailability.groupName", _groupName); - _testcase.setConfigurationProperty(_storeConfigKeyPrefix + "highAvailability.nodeName", nodeName); - _testcase.setConfigurationProperty(_storeConfigKeyPrefix + "highAvailability.nodeHostPort", getNodeHostPortForNodeAt(bdbPort)); - _testcase.setConfigurationProperty(_storeConfigKeyPrefix + "highAvailability.helperHostPort", getHelperHostPort()); + _testcase.setVirtualHostConfigurationProperty(_vhostStoreConfigKeyPrefix + "highAvailability.groupName", _groupName); + _testcase.setVirtualHostConfigurationProperty(_vhostStoreConfigKeyPrefix + "highAvailability.nodeName", nodeName); + _testcase.setVirtualHostConfigurationProperty(_vhostStoreConfigKeyPrefix + "highAvailability.nodeHostPort", getNodeHostPortForNodeAt(bdbPort)); + _testcase.setVirtualHostConfigurationProperty(_vhostStoreConfigKeyPrefix + "highAvailability.helperHostPort", getHelperHostPort()); } public String getIpAddressOfBrokerHost() @@ -369,24 +371,24 @@ public class HATestClusterCreator } } - private void collectConfig(final int brokerPortNumber, XMLConfiguration testConfiguration, XMLConfiguration testVirtualhosts) + private void collectConfig(final int brokerPortNumber, TestBrokerConfiguration testConfiguration, XMLConfiguration testVirtualhosts) { - _brokerConfigurations.put(brokerPortNumber, new BrokerConfigHolder((XMLConfiguration) testConfiguration.clone(), + _brokerConfigurations.put(brokerPortNumber, new BrokerConfigHolder(testConfiguration, (XMLConfiguration) testVirtualhosts.clone())); } public class BrokerConfigHolder { - private final XMLConfiguration _testConfiguration; + private final TestBrokerConfiguration _testConfiguration; private final XMLConfiguration _testVirtualhosts; - public BrokerConfigHolder(XMLConfiguration testConfiguration, XMLConfiguration testVirtualhosts) + public BrokerConfigHolder(TestBrokerConfiguration testConfiguration, XMLConfiguration testVirtualhosts) { _testConfiguration = testConfiguration; _testVirtualhosts = testVirtualhosts; } - public XMLConfiguration getTestConfiguration() + public TestBrokerConfiguration getTestConfiguration() { return _testConfiguration; } @@ -416,7 +418,7 @@ public class HATestClusterCreator public String getStoreConfigKeyPrefix() { - return _storeConfigKeyPrefix; + return _vhostStoreConfigKeyPrefix; } diff --git a/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/MessageStoreCreatorTest.java b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/MessageStoreCreatorTest.java new file mode 100644 index 0000000000..d33eb868c2 --- /dev/null +++ b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/MessageStoreCreatorTest.java @@ -0,0 +1,44 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.berkeleydb; + +import org.apache.qpid.server.store.MemoryMessageStore; +import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.MessageStoreCreator; +import org.apache.qpid.server.store.berkeleydb.BDBHAMessageStore; +import org.apache.qpid.server.store.berkeleydb.BDBMessageStore; +import org.apache.qpid.server.store.derby.DerbyMessageStore; +import org.apache.qpid.test.utils.QpidTestCase; + +public class MessageStoreCreatorTest extends QpidTestCase +{ + private static final String[] STORE_TYPES = {MemoryMessageStore.TYPE, DerbyMessageStore.TYPE, BDBMessageStore.TYPE, BDBHAMessageStore.TYPE}; + + public void testMessageStoreCreator() + { + MessageStoreCreator messageStoreCreator = new MessageStoreCreator(); + for (String type : STORE_TYPES) + { + MessageStore store = messageStoreCreator.createMessageStore(type); + assertNotNull("Store of type " + type + " is not created", store); + } + } +} diff --git a/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/AbstractUpgradeTestCase.java b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/AbstractUpgradeTestCase.java index cd2654f79f..b2b28b3c2d 100644 --- a/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/AbstractUpgradeTestCase.java +++ b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/AbstractUpgradeTestCase.java @@ -20,7 +20,14 @@ */ package org.apache.qpid.server.store.berkeleydb.upgrade; +import static org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.NONEXCLUSIVE_WITH_ERRONEOUS_OWNER; +import static org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.NON_DURABLE_QUEUE_NAME; +import static org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.PRIORITY_QUEUE_NAME; +import static org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.QUEUE_NAME; +import static org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.QUEUE_WITH_DLQ_NAME; + import java.io.File; +import java.io.InputStream; import org.apache.qpid.server.logging.LogSubject; import org.apache.qpid.server.logging.subjects.TestBlankSubject; @@ -51,15 +58,15 @@ public abstract class AbstractUpgradeTestCase extends QpidTestCase } } - public static final String[] QUEUE_NAMES = { "clientid:myDurSubName", "clientid:mySelectorDurSubName", "myUpgradeQueue", - "queue-non-durable", "nonexclusive-with-erroneous-owner" }; - public static int[] QUEUE_SIZES = { 1, 1, 10, 3, 0}; - public static int TOTAL_MESSAGE_NUMBER = 15; + public static final String[] QUEUE_NAMES = { "clientid:myDurSubName", "clientid:mySelectorDurSubName", QUEUE_NAME, NON_DURABLE_QUEUE_NAME, + NONEXCLUSIVE_WITH_ERRONEOUS_OWNER, PRIORITY_QUEUE_NAME, QUEUE_WITH_DLQ_NAME, QUEUE_WITH_DLQ_NAME + "_DLQ" }; + public static int[] QUEUE_SIZES = { 1, 1, 10, 3, 0, 0, 0, 1}; + public static int TOTAL_MESSAGE_NUMBER = 16; protected static final LogSubject LOG_SUBJECT = new TestBlankSubject(); - // one binding per exchange - protected static final int TOTAL_BINDINGS = QUEUE_NAMES.length * 2; - protected static final int TOTAL_EXCHANGES = 5; + // myQueueWithDLQ_DLQ is not bound to the default exchange + protected static final int TOTAL_BINDINGS = QUEUE_NAMES.length * 2 - 1; + protected static final int TOTAL_EXCHANGES = 6; private File _storeLocation; protected Environment _environment; @@ -105,10 +112,24 @@ public abstract class AbstractUpgradeTestCase extends QpidTestCase private File copyStore(String storeDirectoryName) throws Exception { - String src = getClass().getClassLoader().getResource("upgrade/" + storeDirectoryName).toURI().getPath(); File storeLocation = new File(new File(TMP_FOLDER), "test-store"); deleteDirectoryIfExists(storeLocation); - FileUtils.copyRecursive(new File(src), new File(TMP_FOLDER)); + storeLocation.mkdirs(); + int index = 0; + String prefix = "0000000"; + String extension = ".jdb"; + InputStream is = null; + do + { + String fileName = prefix + index + extension; + is = getClass().getClassLoader().getResourceAsStream("upgrade/" + storeDirectoryName + "/test-store/" + fileName); + if (is != null) + { + FileUtils.copy(is, new File(storeLocation, fileName)); + } + index++; + } + while (is != null); return storeLocation; } diff --git a/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom4to5Test.java b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom4to5Test.java index 65a8bb03fb..500fb0a919 100644 --- a/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom4to5Test.java +++ b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom4to5Test.java @@ -20,6 +20,13 @@ */ package org.apache.qpid.server.store.berkeleydb.upgrade; +import static org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.NONEXCLUSIVE_WITH_ERRONEOUS_OWNER; +import static org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.NON_DURABLE_QUEUE_NAME; +import static org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.QUEUE_NAME; +import static org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.QUEUE_WITH_DLQ_NAME; +import static org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.SELECTOR_TOPIC_NAME; +import static org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.TOPIC_NAME; + import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; @@ -34,7 +41,6 @@ import java.util.concurrent.atomic.AtomicReference; import org.apache.qpid.common.AMQPFilterTypes; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer; import org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom4To5.BindingRecord; import org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom4To5.BindingTuple; import org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom4To5.MessageContentKey; @@ -50,9 +56,6 @@ import com.sleepycat.je.Transaction; public class UpgradeFrom4to5Test extends AbstractUpgradeTestCase { - private static final String NON_DURABLE_QUEUE = BDBStoreUpgradeTestPreparer.NON_DURABLE_QUEUE_NAME; - private static final String DURABLE_QUEUE = BDBStoreUpgradeTestPreparer.QUEUE_NAME; - private static final String NON_EXCLUSIVE_WITH_ERRONEOUS_OWNER = "nonexclusive-with-erroneous-owner"; private static final String DURABLE_SUBSCRIPTION_QUEUE_WITH_SELECTOR = "clientid:mySelectorDurSubName"; private static final String DURABLE_SUBSCRIPTION_QUEUE = "clientid:myDurSubName"; private static final String EXCHANGE_DB_NAME = "exchangeDb_v5"; @@ -85,15 +88,14 @@ public class UpgradeFrom4to5Test extends AbstractUpgradeTestCase final List<BindingRecord> queueBindings = loadBindings(); - assertEquals("Unxpected list size", TOTAL_BINDINGS, queueBindings.size()); - assertBindingRecord(queueBindings, DURABLE_SUBSCRIPTION_QUEUE, "amq.topic", BDBStoreUpgradeTestPreparer.TOPIC_NAME, ""); - assertBindingRecord(queueBindings, DURABLE_SUBSCRIPTION_QUEUE_WITH_SELECTOR, "amq.topic", - BDBStoreUpgradeTestPreparer.SELECTOR_TOPIC_NAME, "testprop='true'"); - assertBindingRecord(queueBindings, DURABLE_QUEUE, "amq.direct", DURABLE_QUEUE, null); - assertBindingRecord(queueBindings, NON_DURABLE_QUEUE, "amq.direct", NON_DURABLE_QUEUE, null); - assertBindingRecord(queueBindings, NON_EXCLUSIVE_WITH_ERRONEOUS_OWNER, "amq.direct", NON_EXCLUSIVE_WITH_ERRONEOUS_OWNER, null); + assertEquals("Unxpected bindings size", TOTAL_BINDINGS, queueBindings.size()); + assertBindingRecord(queueBindings, DURABLE_SUBSCRIPTION_QUEUE, "amq.topic", TOPIC_NAME, ""); + assertBindingRecord(queueBindings, DURABLE_SUBSCRIPTION_QUEUE_WITH_SELECTOR, "amq.topic", SELECTOR_TOPIC_NAME, "testprop='true'"); + assertBindingRecord(queueBindings, QUEUE_NAME, "amq.direct", QUEUE_NAME, null); + assertBindingRecord(queueBindings, NON_DURABLE_QUEUE_NAME, "amq.direct", NON_DURABLE_QUEUE_NAME, null); + assertBindingRecord(queueBindings, NONEXCLUSIVE_WITH_ERRONEOUS_OWNER, "amq.direct", NONEXCLUSIVE_WITH_ERRONEOUS_OWNER, null); - assertQueueHasOwner(NON_EXCLUSIVE_WITH_ERRONEOUS_OWNER, "misused-owner-as-description"); + assertQueueHasOwner(NONEXCLUSIVE_WITH_ERRONEOUS_OWNER, "misused-owner-as-description"); assertContent(); } @@ -102,26 +104,29 @@ public class UpgradeFrom4to5Test extends AbstractUpgradeTestCase { UpgradeFrom4To5 upgrade = new UpgradeFrom4To5(); upgrade.performUpgrade(_environment, new StaticAnswerHandler(UpgradeInteractionResponse.NO), getVirtualHostName()); - assertQueues(new HashSet<String>(Arrays.asList(DURABLE_SUBSCRIPTION_QUEUE, DURABLE_SUBSCRIPTION_QUEUE_WITH_SELECTOR, DURABLE_QUEUE, NON_EXCLUSIVE_WITH_ERRONEOUS_OWNER))); + HashSet<String> queues = new HashSet<String>(Arrays.asList(QUEUE_NAMES)); + assertTrue(NON_DURABLE_QUEUE_NAME + " should be in the list of queues" , queues.remove(NON_DURABLE_QUEUE_NAME)); + + assertQueues(queues); - assertDatabaseRecordCount(DELIVERY_DB_NAME, 12); - assertDatabaseRecordCount(MESSAGE_META_DATA_DB_NAME, 12); + assertDatabaseRecordCount(DELIVERY_DB_NAME, 13); + assertDatabaseRecordCount(MESSAGE_META_DATA_DB_NAME, 13); assertDatabaseRecordCount(EXCHANGE_DB_NAME, TOTAL_EXCHANGES); assertQueueMessages(DURABLE_SUBSCRIPTION_QUEUE, 1); assertQueueMessages(DURABLE_SUBSCRIPTION_QUEUE_WITH_SELECTOR, 1); - assertQueueMessages(DURABLE_QUEUE, 10); + assertQueueMessages(QUEUE_NAME, 10); + assertQueueMessages(QUEUE_WITH_DLQ_NAME + "_DLQ", 1); final List<BindingRecord> queueBindings = loadBindings(); assertEquals("Unxpected list size", TOTAL_BINDINGS - 2, queueBindings.size()); - assertBindingRecord(queueBindings, DURABLE_SUBSCRIPTION_QUEUE, "amq.topic", BDBStoreUpgradeTestPreparer.TOPIC_NAME, - ""); + assertBindingRecord(queueBindings, DURABLE_SUBSCRIPTION_QUEUE, "amq.topic", TOPIC_NAME, ""); assertBindingRecord(queueBindings, DURABLE_SUBSCRIPTION_QUEUE_WITH_SELECTOR, "amq.topic", - BDBStoreUpgradeTestPreparer.SELECTOR_TOPIC_NAME, "testprop='true'"); - assertBindingRecord(queueBindings, DURABLE_QUEUE, "amq.direct", DURABLE_QUEUE, null); + SELECTOR_TOPIC_NAME, "testprop='true'"); + assertBindingRecord(queueBindings, QUEUE_NAME, "amq.direct", QUEUE_NAME, null); - assertQueueHasOwner(NON_EXCLUSIVE_WITH_ERRONEOUS_OWNER, "misused-owner-as-description"); + assertQueueHasOwner(NONEXCLUSIVE_WITH_ERRONEOUS_OWNER, "misused-owner-as-description"); assertContent(); } diff --git a/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6Test.java b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6Test.java index 2d2a6b20a2..c33d427868 100644 --- a/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6Test.java +++ b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6Test.java @@ -20,6 +20,8 @@ */ package org.apache.qpid.server.store.berkeleydb.upgrade; +import static org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.PRIORITY_QUEUE_NAME; +import static org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.QUEUE_WITH_DLQ_NAME; import static org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom5To6.CONFIGURED_OBJECTS_DB_NAME; import static org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom5To6.NEW_CONTENT_DB_NAME; import static org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom5To6.NEW_DELIVERY_DB_NAME; @@ -42,6 +44,7 @@ import java.util.UUID; import org.apache.log4j.Logger; import org.apache.qpid.server.model.Binding; import org.apache.qpid.server.model.Exchange; +import org.apache.qpid.server.model.LifetimePolicy; import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.queue.AMQQueueFactory; @@ -50,7 +53,6 @@ import org.apache.qpid.server.store.berkeleydb.tuple.XidBinding; import org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom5To6.CompoundKey; import org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom5To6.CompoundKeyBinding; import org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom5To6.ConfiguredObjectBinding; -import org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom5To6.UpgradeConfiguredObjectRecord; import org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom5To6.NewDataBinding; import org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom5To6.NewPreparedTransaction; import org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom5To6.NewPreparedTransactionBinding; @@ -60,6 +62,7 @@ import org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom5To6.NewRecord import org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom5To6.OldPreparedTransaction; import org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom5To6.OldPreparedTransactionBinding; import org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom5To6.OldRecordImpl; +import org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom5To6.UpgradeConfiguredObjectRecord; import org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom5To6.UpgradeUUIDBinding; import org.apache.qpid.server.util.MapJsonSerializer; @@ -115,8 +118,8 @@ public class UpgradeFrom5To6Test extends AbstractUpgradeTestCase upgrade.performUpgrade(_environment, discardMessageInteractionHandler, getVirtualHostName()); - assertDatabaseRecordCount(NEW_METADATA_DB_NAME, 11); - assertDatabaseRecordCount(NEW_CONTENT_DB_NAME, 11); + assertDatabaseRecordCount(NEW_METADATA_DB_NAME, 12); + assertDatabaseRecordCount(NEW_CONTENT_DB_NAME, 12); assertConfiguredObjects(); assertQueueEntries(); @@ -264,17 +267,17 @@ public class UpgradeFrom5To6Test extends AbstractUpgradeTestCase private void assertDatabaseRecordCounts() { - assertDatabaseRecordCount(CONFIGURED_OBJECTS_DB_NAME, 12); - assertDatabaseRecordCount(NEW_DELIVERY_DB_NAME, 12); + assertDatabaseRecordCount(CONFIGURED_OBJECTS_DB_NAME, 21); + assertDatabaseRecordCount(NEW_DELIVERY_DB_NAME, 13); - assertDatabaseRecordCount(NEW_METADATA_DB_NAME, 12); - assertDatabaseRecordCount(NEW_CONTENT_DB_NAME, 12); + assertDatabaseRecordCount(NEW_METADATA_DB_NAME, 13); + assertDatabaseRecordCount(NEW_CONTENT_DB_NAME, 13); } private void assertConfiguredObjects() { Map<UUID, UpgradeConfiguredObjectRecord> configuredObjects = loadConfiguredObjects(); - assertEquals("Unexpected number of configured objects", 12, configuredObjects.size()); + assertEquals("Unexpected number of configured objects", 21, configuredObjects.size()); Set<Map<String, Object>> expected = new HashSet<Map<String, Object>>(12); List<UUID> expectedBindingIDs = new ArrayList<UUID>(); @@ -282,8 +285,26 @@ public class UpgradeFrom5To6Test extends AbstractUpgradeTestCase expected.add(createExpectedQueueMap("myUpgradeQueue", Boolean.FALSE, null, null)); expected.add(createExpectedQueueMap("clientid:mySelectorDurSubName", Boolean.TRUE, "clientid", null)); expected.add(createExpectedQueueMap("clientid:myDurSubName", Boolean.TRUE, "clientid", null)); - expected.add(createExpectedQueueMap("nonexclusive-with-erroneous-owner", Boolean.FALSE, null, - Collections.singletonMap(AMQQueueFactory.X_QPID_DESCRIPTION, "misused-owner-as-description"))); + + final Map<String, Object> queueWithOwnerArguments = new HashMap<String, Object>(); + queueWithOwnerArguments.put("x-qpid-priorities", 10); + queueWithOwnerArguments.put(AMQQueueFactory.X_QPID_DESCRIPTION, "misused-owner-as-description"); + expected.add(createExpectedQueueMap("nonexclusive-with-erroneous-owner", Boolean.FALSE, null,queueWithOwnerArguments)); + + final Map<String, Object> priorityQueueArguments = new HashMap<String, Object>(); + priorityQueueArguments.put("x-qpid-priorities", 10); + expected.add(createExpectedQueueMap(PRIORITY_QUEUE_NAME, Boolean.FALSE, null, priorityQueueArguments)); + + final Map<String, Object> queueWithDLQArguments = new HashMap<String, Object>(); + queueWithDLQArguments.put("x-qpid-dlq-enabled", true); + queueWithDLQArguments.put("x-qpid-maximum-delivery-count", 2); + expected.add(createExpectedQueueMap(QUEUE_WITH_DLQ_NAME, Boolean.FALSE, null, queueWithDLQArguments)); + + final Map<String, Object> dlqArguments = new HashMap<String, Object>(); + dlqArguments.put("x-qpid-dlq-enabled", false); + dlqArguments.put("x-qpid-maximum-delivery-count", 0); + expected.add(createExpectedQueueMap(QUEUE_WITH_DLQ_NAME + "_DLQ", Boolean.FALSE, null, dlqArguments)); + expected.add(createExpectedExchangeMap(QUEUE_WITH_DLQ_NAME + "_DLE", "fanout")); expected.add(createExpectedQueueBindingMapAndID("myUpgradeQueue","myUpgradeQueue", "<<default>>", null, expectedBindingIDs)); expected.add(createExpectedQueueBindingMapAndID("myUpgradeQueue", "myUpgradeQueue", "amq.direct", null, expectedBindingIDs)); @@ -296,6 +317,13 @@ public class UpgradeFrom5To6Test extends AbstractUpgradeTestCase expected.add(createExpectedQueueBindingMapAndID("nonexclusive-with-erroneous-owner", "nonexclusive-with-erroneous-owner", "amq.direct", null, expectedBindingIDs)); expected.add(createExpectedQueueBindingMapAndID("nonexclusive-with-erroneous-owner","nonexclusive-with-erroneous-owner", "<<default>>", null, expectedBindingIDs)); + expected.add(createExpectedQueueBindingMapAndID(PRIORITY_QUEUE_NAME, PRIORITY_QUEUE_NAME, "<<default>>", null, expectedBindingIDs)); + expected.add(createExpectedQueueBindingMapAndID(PRIORITY_QUEUE_NAME, PRIORITY_QUEUE_NAME, "amq.direct", null, expectedBindingIDs)); + + expected.add(createExpectedQueueBindingMapAndID(QUEUE_WITH_DLQ_NAME, QUEUE_WITH_DLQ_NAME, "<<default>>", null, expectedBindingIDs)); + expected.add(createExpectedQueueBindingMapAndID(QUEUE_WITH_DLQ_NAME, QUEUE_WITH_DLQ_NAME, "amq.direct", null, expectedBindingIDs)); + expected.add(createExpectedQueueBindingMapAndID(QUEUE_WITH_DLQ_NAME + "_DLQ", "dlq", QUEUE_WITH_DLQ_NAME + "_DLE", null, expectedBindingIDs)); + Set<String> expectedTypes = new HashSet<String>(); expectedTypes.add(Queue.class.getName()); expectedTypes.add(Exchange.class.getName()); @@ -305,7 +333,9 @@ public class UpgradeFrom5To6Test extends AbstractUpgradeTestCase { UpgradeConfiguredObjectRecord object = entry.getValue(); Map<String, Object> deserialized = jsonSerializer.deserialize(object.getAttributes()); - assertTrue("Unexpected entry:" + object.getAttributes(), expected.remove(deserialized)); + + assertTrue("Unexpected entry in a store - json [" + object.getAttributes() + "], map [" + deserialized + "]", + expected.remove(deserialized)); String type = object.getType(); assertTrue("Unexpected type:" + type, expectedTypes.contains(type)); UUID key = entry.getKey(); @@ -350,7 +380,7 @@ public class UpgradeFrom5To6Test extends AbstractUpgradeTestCase return expectedQueueBinding; } - private Map<String, Object> createExpectedQueueMap(String name, boolean exclusiveFlag, String owner, Map<String, String> argumentMap) + private Map<String, Object> createExpectedQueueMap(String name, boolean exclusiveFlag, String owner, Map<String, Object> argumentMap) { Map<String, Object> expectedQueueEntry = new HashMap<String, Object>(); expectedQueueEntry.put(Queue.NAME, name); @@ -363,6 +393,15 @@ public class UpgradeFrom5To6Test extends AbstractUpgradeTestCase return expectedQueueEntry; } + private Map<String, Object> createExpectedExchangeMap(String name, String type) + { + Map<String, Object> expectedExchnageEntry = new HashMap<String, Object>(); + expectedExchnageEntry.put(Exchange.NAME, name); + expectedExchnageEntry.put(Exchange.TYPE, type); + expectedExchnageEntry.put(Exchange.LIFETIME_POLICY, LifetimePolicy.PERMANENT.name()); + return expectedExchnageEntry; + } + private Map<UUID, UpgradeConfiguredObjectRecord> loadConfiguredObjects() { final Map<UUID, UpgradeConfiguredObjectRecord> configuredObjectsRecords = new HashMap<UUID, UpgradeConfiguredObjectRecord>(); diff --git a/java/bdbstore/src/test/resources/upgrade/bdbstore-v4/test-store/00000000.jdb b/java/bdbstore/src/test/resources/upgrade/bdbstore-v4/test-store/00000000.jdb Binary files differindex f5ed9aa5a2..cfc1f05d28 100644 --- a/java/bdbstore/src/test/resources/upgrade/bdbstore-v4/test-store/00000000.jdb +++ b/java/bdbstore/src/test/resources/upgrade/bdbstore-v4/test-store/00000000.jdb diff --git a/java/bdbstore/src/test/resources/upgrade/bdbstore-v5/test-store/00000000.jdb b/java/bdbstore/src/test/resources/upgrade/bdbstore-v5/test-store/00000000.jdb Binary files differindex f5ed9aa5a2..cfc1f05d28 100644 --- a/java/bdbstore/src/test/resources/upgrade/bdbstore-v5/test-store/00000000.jdb +++ b/java/bdbstore/src/test/resources/upgrade/bdbstore-v5/test-store/00000000.jdb diff --git a/java/bdbstore/src/test/resources/upgrade/bdbstore-v5/test-store/00000001.jdb b/java/bdbstore/src/test/resources/upgrade/bdbstore-v5/test-store/00000001.jdb Binary files differindex d5ae8c1096..4b45ff61e6 100644 --- a/java/bdbstore/src/test/resources/upgrade/bdbstore-v5/test-store/00000001.jdb +++ b/java/bdbstore/src/test/resources/upgrade/bdbstore-v5/test-store/00000001.jdb |