From 9b5723a1f2a3b72c81c143f78d7b8dc5ee6b7271 Mon Sep 17 00:00:00 2001 From: Robert Gemmell Date: Mon, 23 Sep 2013 23:45:59 +0000 Subject: QPID-5162: move the bdbstore-specific systests to their own module git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1525741 13f79535-47bb-0310-9956-ffa450edef68 --- .../store/berkeleydb/HAClusterManagementTest.java | 289 ---------- .../store/berkeleydb/HAClusterTwoNodeTest.java | 217 -------- .../server/store/berkeleydb/BDBBackupTest.java | 193 ------- .../store/berkeleydb/BDBMessageStoreTest.java | 612 --------------------- .../berkeleydb/BDBStoreUpgradeTestPreparer.java | 6 +- .../server/store/berkeleydb/BDBUpgradeTest.java | 501 ----------------- .../store/berkeleydb/HAClusterBlackboxTest.java | 169 ------ .../store/berkeleydb/HAClusterWhiteboxTest.java | 266 --------- .../store/berkeleydb/HATestClusterCreator.java | 429 --------------- qpid/java/bdbstore/systests/build.xml | 28 + .../server/store/berkeleydb/BDBBackupTest.java | 193 +++++++ .../store/berkeleydb/BDBMessageStoreTest.java | 612 +++++++++++++++++++++ .../server/store/berkeleydb/BDBUpgradeTest.java | 544 ++++++++++++++++++ .../store/berkeleydb/HAClusterBlackboxTest.java | 167 ++++++ .../store/berkeleydb/HAClusterManagementTest.java | 289 ++++++++++ .../store/berkeleydb/HAClusterTwoNodeTest.java | 217 ++++++++ .../store/berkeleydb/HAClusterWhiteboxTest.java | 266 +++++++++ .../store/berkeleydb/HATestClusterCreator.java | 429 +++++++++++++++ qpid/java/build.xml | 2 +- qpid/java/module.xml | 9 +- 20 files changed, 2757 insertions(+), 2681 deletions(-) delete mode 100644 qpid/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterManagementTest.java delete mode 100644 qpid/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterTwoNodeTest.java delete mode 100644 qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBBackupTest.java delete mode 100644 qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java delete mode 100644 qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java delete mode 100644 qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterBlackboxTest.java delete mode 100644 qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterWhiteboxTest.java delete mode 100644 qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java create mode 100644 qpid/java/bdbstore/systests/build.xml create mode 100644 qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBBackupTest.java create mode 100644 qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java create mode 100644 qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java create mode 100644 qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterBlackboxTest.java create mode 100644 qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterManagementTest.java create mode 100644 qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterTwoNodeTest.java create mode 100644 qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterWhiteboxTest.java create mode 100644 qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java (limited to 'qpid/java') diff --git a/qpid/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterManagementTest.java b/qpid/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterManagementTest.java deleted file mode 100644 index 0e25c4e17a..0000000000 --- a/qpid/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterManagementTest.java +++ /dev/null @@ -1,289 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.store.berkeleydb; - -import static com.sleepycat.je.rep.ReplicatedEnvironment.State.DETACHED; -import static com.sleepycat.je.rep.ReplicatedEnvironment.State.MASTER; -import static com.sleepycat.je.rep.ReplicatedEnvironment.State.REPLICA; -import static com.sleepycat.je.rep.ReplicatedEnvironment.State.UNKNOWN; - -import java.util.Arrays; -import java.util.HashSet; -import java.util.Iterator; -import java.util.Set; - -import javax.jms.Connection; -import javax.management.ObjectName; -import javax.management.openmbean.CompositeData; -import javax.management.openmbean.TabularData; - -import org.apache.log4j.Logger; -import org.apache.qpid.jms.ConnectionURL; -import org.apache.qpid.management.common.mbeans.ManagedBroker; -import org.apache.qpid.server.store.berkeleydb.jmx.ManagedBDBHAMessageStore; -import org.apache.qpid.test.utils.JMXTestUtils; -import org.apache.qpid.test.utils.QpidBrokerTestCase; - -import com.sleepycat.je.EnvironmentFailureException; - -/** - * System test verifying the ability to control a cluster via the Management API. - * - * @see HAClusterBlackboxTest - */ -public class HAClusterManagementTest extends QpidBrokerTestCase -{ - protected static final Logger LOGGER = Logger.getLogger(HAClusterManagementTest.class); - - private static final Set NON_MASTER_STATES = new HashSet(Arrays.asList(REPLICA.toString(), DETACHED.toString(), UNKNOWN.toString()));; - private static final String VIRTUAL_HOST = "test"; - - private static final String MANAGED_OBJECT_QUERY = "org.apache.qpid:type=BDBHAMessageStore,name=" + ObjectName.quote(VIRTUAL_HOST); - private static final int NUMBER_OF_NODES = 4; - - private final HATestClusterCreator _clusterCreator = new HATestClusterCreator(this, VIRTUAL_HOST, NUMBER_OF_NODES); - private final JMXTestUtils _jmxUtils = new JMXTestUtils(this); - - private ConnectionURL _brokerFailoverUrl; - - @Override - protected void setUp() throws Exception - { - _brokerType = BrokerType.SPAWNED; - - _clusterCreator.configureClusterNodes(); - _brokerFailoverUrl = _clusterCreator.getConnectionUrlForAllClusterNodes(); - _clusterCreator.startCluster(); - - super.setUp(); - } - - @Override - protected void tearDown() throws Exception - { - try - { - _jmxUtils.close(); - } - finally - { - super.tearDown(); - } - } - - @Override - public void startBroker() throws Exception - { - // Don't start default broker provided by QBTC. - } - - public void testReadonlyMBeanAttributes() throws Exception - { - final int brokerPortNumber = getBrokerPortNumbers().iterator().next(); - final int bdbPortNumber = _clusterCreator.getBdbPortForBrokerPort(brokerPortNumber); - - ManagedBDBHAMessageStore storeBean = getStoreBeanForNodeAtBrokerPort(brokerPortNumber); - assertEquals("Unexpected store group name", _clusterCreator.getGroupName(), storeBean.getGroupName()); - assertEquals("Unexpected store node name", _clusterCreator.getNodeNameForNodeAt(bdbPortNumber), storeBean.getNodeName()); - assertEquals("Unexpected store node host port",_clusterCreator.getNodeHostPortForNodeAt(bdbPortNumber), storeBean.getNodeHostPort()); - assertEquals("Unexpected store helper host port", _clusterCreator.getHelperHostPort(), storeBean.getHelperHostPort()); - // As we have chosen an arbitrary broker from the cluster, we cannot predict its state - assertNotNull("Store state must not be null", storeBean.getNodeState()); - } - - public void testStateOfActiveBrokerIsMaster() throws Exception - { - final Connection activeConnection = getConnection(_brokerFailoverUrl); - final int activeBrokerPortNumber = _clusterCreator.getBrokerPortNumberFromConnection(activeConnection); - - ManagedBDBHAMessageStore storeBean = getStoreBeanForNodeAtBrokerPort(activeBrokerPortNumber); - assertEquals("Unexpected store state", MASTER.toString(), storeBean.getNodeState()); - } - - public void testStateOfNonActiveBrokerIsNotMaster() throws Exception - { - final Connection activeConnection = getConnection(_brokerFailoverUrl); - final int inactiveBrokerPortNumber = _clusterCreator.getPortNumberOfAnInactiveBroker(activeConnection); - ManagedBDBHAMessageStore storeBean = getStoreBeanForNodeAtBrokerPort(inactiveBrokerPortNumber); - final String nodeState = storeBean.getNodeState(); - assertTrue("Unexpected store state : " + nodeState, NON_MASTER_STATES.contains(nodeState)); - } - - public void testGroupMembers() throws Exception - { - final int brokerPortNumber = getBrokerPortNumbers().iterator().next(); - - ManagedBDBHAMessageStore storeBean = getStoreBeanForNodeAtBrokerPort(brokerPortNumber); - awaitAllNodesJoiningGroup(storeBean, NUMBER_OF_NODES); - - final TabularData groupMembers = storeBean.getAllNodesInGroup(); - assertNotNull(groupMembers); - - for(int bdbPortNumber : _clusterCreator.getBdbPortNumbers()) - { - final String nodeName = _clusterCreator.getNodeNameForNodeAt(bdbPortNumber); - final String nodeHostPort = _clusterCreator.getNodeHostPortForNodeAt(bdbPortNumber); - - CompositeData row = groupMembers.get(new Object[] {nodeName}); - assertNotNull("Table does not contain row for node name " + nodeName, row); - assertEquals(nodeHostPort, row.get(BDBHAMessageStore.GRP_MEM_COL_NODE_HOST_PORT)); - } - } - - public void testRemoveNodeFromGroup() throws Exception - { - final Iterator brokerPortNumberIterator = getBrokerPortNumbers().iterator(); - final int brokerPortNumberToMakeObservation = brokerPortNumberIterator.next(); - final int brokerPortNumberToBeRemoved = brokerPortNumberIterator.next(); - final ManagedBDBHAMessageStore storeBean = getStoreBeanForNodeAtBrokerPort(brokerPortNumberToMakeObservation); - awaitAllNodesJoiningGroup(storeBean, NUMBER_OF_NODES); - - final String removedNodeName = _clusterCreator.getNodeNameForNodeAt(_clusterCreator.getBdbPortForBrokerPort(brokerPortNumberToBeRemoved)); - _clusterCreator.stopNode(brokerPortNumberToBeRemoved); - storeBean.removeNodeFromGroup(removedNodeName); - - final int numberOfDataRowsAfterRemoval = storeBean.getAllNodesInGroup().size(); - assertEquals("Unexpected number of data rows before test", NUMBER_OF_NODES - 1,numberOfDataRowsAfterRemoval); - } - - /** - * Updates the address of a node. - * - * If the broker (node) can subsequently start without error then the update was a success, hence no need for an explicit - * assert. - * - * @see #testRestartNodeWithNewPortNumberWithoutFirstCallingUpdateAddressThrowsAnException() for converse case - */ - public void testUpdateAddress() throws Exception - { - final Iterator brokerPortNumberIterator = getBrokerPortNumbers().iterator(); - final int brokerPortNumberToPerformUpdate = brokerPortNumberIterator.next(); - final int brokerPortNumberToBeMoved = brokerPortNumberIterator.next(); - final ManagedBDBHAMessageStore storeBean = getStoreBeanForNodeAtBrokerPort(brokerPortNumberToPerformUpdate); - - _clusterCreator.stopNode(brokerPortNumberToBeMoved); - - final int oldBdbPort = _clusterCreator.getBdbPortForBrokerPort(brokerPortNumberToBeMoved); - final int newBdbPort = getNextAvailable(oldBdbPort + 1); - - storeBean.updateAddress(_clusterCreator.getNodeNameForNodeAt(oldBdbPort), _clusterCreator.getIpAddressOfBrokerHost(), newBdbPort); - - _clusterCreator.modifyClusterNodeBdbAddress(brokerPortNumberToBeMoved, newBdbPort); - - _clusterCreator.startNode(brokerPortNumberToBeMoved); - } - - /** - * @see #testUpdateAddress() - */ - public void testRestartNodeWithNewPortNumberWithoutFirstCallingUpdateAddressThrowsAnException() throws Exception - { - final Iterator brokerPortNumberIterator = getBrokerPortNumbers().iterator(); - final int brokerPortNumberToBeMoved = brokerPortNumberIterator.next(); - - _clusterCreator.stopNode(brokerPortNumberToBeMoved); - - final int oldBdbPort = _clusterCreator.getBdbPortForBrokerPort(brokerPortNumberToBeMoved); - final int newBdbPort = getNextAvailable(oldBdbPort + 1); - - // now deliberately don't call updateAddress - - _clusterCreator.modifyClusterNodeBdbAddress(brokerPortNumberToBeMoved, newBdbPort); - - try - { - _clusterCreator.startNode(brokerPortNumberToBeMoved); - fail("Exception not thrown"); - } - catch(RuntimeException rte) - { - //check cause was BDBs EnvironmentFailureException - assertTrue(rte.getMessage().contains(EnvironmentFailureException.class.getName())); - // PASS - } - } - - public void testVirtualHostOperationsDeniedForNonMasterNode() throws Exception - { - final Connection activeConnection = getConnection(_brokerFailoverUrl); - final int inactiveBrokerPortNumber = _clusterCreator.getPortNumberOfAnInactiveBroker(activeConnection); - - ManagedBroker inactiveBroker = getManagedBrokerBeanForNodeAtBrokerPort(inactiveBrokerPortNumber); - - try - { - inactiveBroker.createNewQueue(getTestQueueName(), null, true); - fail("Exception not thrown"); - } - catch (Exception e) - { - String message = e.getMessage(); - assertEquals("The virtual hosts state of PASSIVE does not permit this operation.", message); - } - - try - { - inactiveBroker.createNewExchange(getName(), "direct", true); - fail("Exception not thrown"); - } - catch (Exception e) - { - String message = e.getMessage(); - assertEquals("The virtual hosts state of PASSIVE does not permit this operation.", message); - } - } - - private ManagedBDBHAMessageStore getStoreBeanForNodeAtBrokerPort(final int brokerPortNumber) throws Exception - { - _jmxUtils.open(brokerPortNumber); - - return _jmxUtils.getManagedObject(ManagedBDBHAMessageStore.class, MANAGED_OBJECT_QUERY); - } - - private ManagedBroker getManagedBrokerBeanForNodeAtBrokerPort(final int brokerPortNumber) throws Exception - { - _jmxUtils.open(brokerPortNumber); - - return _jmxUtils.getManagedBroker(VIRTUAL_HOST); - } - - private void awaitAllNodesJoiningGroup(ManagedBDBHAMessageStore storeBean, int expectedNumberOfNodes) throws Exception - { - long totalTimeWaited = 0l; - long waitInterval = 100l; - long maxWaitTime = 10000; - - int currentNumberOfNodes = storeBean.getAllNodesInGroup().size(); - while (expectedNumberOfNodes > currentNumberOfNodes || totalTimeWaited > maxWaitTime) - { - LOGGER.debug("Still awaiting nodes to join group; expecting " - + expectedNumberOfNodes + " node(s) but only have " + currentNumberOfNodes - + " after " + totalTimeWaited + " ms."); - - totalTimeWaited += waitInterval; - Thread.sleep(waitInterval); - - currentNumberOfNodes = storeBean.getAllNodesInGroup().size(); - } - - assertEquals("Unexpected number of nodes in group after " + totalTimeWaited + " ms", - expectedNumberOfNodes ,currentNumberOfNodes); - } -} diff --git a/qpid/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterTwoNodeTest.java b/qpid/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterTwoNodeTest.java deleted file mode 100644 index 95626f7fa5..0000000000 --- a/qpid/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterTwoNodeTest.java +++ /dev/null @@ -1,217 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.store.berkeleydb; - -import java.io.File; - -import javax.jms.Connection; -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.Session; -import javax.management.ObjectName; - -import org.apache.qpid.jms.ConnectionURL; -import org.apache.qpid.server.store.berkeleydb.jmx.ManagedBDBHAMessageStore; -import org.apache.qpid.test.utils.JMXTestUtils; -import org.apache.qpid.test.utils.QpidBrokerTestCase; - -import com.sleepycat.je.rep.ReplicationConfig; - -public class HAClusterTwoNodeTest extends QpidBrokerTestCase -{ - private static final long RECEIVE_TIMEOUT = 5000l; - - private static final String VIRTUAL_HOST = "test"; - - private static final String MANAGED_OBJECT_QUERY = "org.apache.qpid:type=BDBHAMessageStore,name=" + ObjectName.quote(VIRTUAL_HOST); - private static final int NUMBER_OF_NODES = 2; - - private final HATestClusterCreator _clusterCreator = new HATestClusterCreator(this, VIRTUAL_HOST, NUMBER_OF_NODES); - private final JMXTestUtils _jmxUtils = new JMXTestUtils(this); - - private ConnectionURL _brokerFailoverUrl; - - @Override - protected void setUp() throws Exception - { - _brokerType = BrokerType.SPAWNED; - - assertTrue(isJavaBroker()); - assertTrue(isBrokerStorePersistent()); - - super.setUp(); - } - - @Override - protected void tearDown() throws Exception - { - try - { - _jmxUtils.close(); - } - finally - { - super.tearDown(); - } - } - - @Override - public void startBroker() throws Exception - { - // Don't start default broker provided by QBTC. - } - - private void startCluster(boolean designedPrimary) throws Exception - { - setSystemProperty("java.util.logging.config.file", "etc" + File.separator + "log.properties"); - - String storeConfigKeyPrefix = _clusterCreator.getStoreConfigKeyPrefix(); - - setVirtualHostConfigurationProperty(storeConfigKeyPrefix + ".repConfig(0).name", ReplicationConfig.INSUFFICIENT_REPLICAS_TIMEOUT); - setVirtualHostConfigurationProperty(storeConfigKeyPrefix + ".repConfig(0).value", "2 s"); - - setVirtualHostConfigurationProperty(storeConfigKeyPrefix + ".repConfig(1).name", ReplicationConfig.ELECTIONS_PRIMARY_RETRIES); - setVirtualHostConfigurationProperty(storeConfigKeyPrefix + ".repConfig(1).value", "0"); - - _clusterCreator.configureClusterNodes(); - _clusterCreator.setDesignatedPrimaryOnFirstBroker(designedPrimary); - _brokerFailoverUrl = _clusterCreator.getConnectionUrlForAllClusterNodes(); - _clusterCreator.startCluster(); - } - - public void testMasterDesignatedPrimaryCanBeRestartedWithoutReplica() throws Exception - { - startCluster(true); - final Connection initialConnection = getConnection(_brokerFailoverUrl); - int masterPort = _clusterCreator.getBrokerPortNumberFromConnection(initialConnection); - assertProducingConsuming(initialConnection); - initialConnection.close(); - _clusterCreator.stopCluster(); - _clusterCreator.startNode(masterPort); - final Connection secondConnection = getConnection(_brokerFailoverUrl); - assertProducingConsuming(secondConnection); - secondConnection.close(); - } - - public void testClusterRestartWithoutDesignatedPrimary() throws Exception - { - startCluster(false); - final Connection initialConnection = getConnection(_brokerFailoverUrl); - assertProducingConsuming(initialConnection); - initialConnection.close(); - _clusterCreator.stopCluster(); - _clusterCreator.startClusterParallel(); - final Connection secondConnection = getConnection(_brokerFailoverUrl); - assertProducingConsuming(secondConnection); - secondConnection.close(); - } - - public void testDesignatedPrimaryContinuesAfterSecondaryStopped() throws Exception - { - startCluster(true); - _clusterCreator.stopNode(_clusterCreator.getBrokerPortNumberOfSecondaryNode()); - final Connection connection = getConnection(_brokerFailoverUrl); - assertNotNull("Expected to get a valid connection to primary", connection); - assertProducingConsuming(connection); - } - - public void testPersistentOperationsFailOnNonDesignatedPrimarysAfterSecondaryStopped() throws Exception - { - startCluster(false); - _clusterCreator.stopNode(_clusterCreator.getBrokerPortNumberOfSecondaryNode()); - final Connection connection = getConnection(_brokerFailoverUrl); - assertNotNull("Expected to get a valid connection to primary", connection); - try - { - assertProducingConsuming(connection); - fail("JMS peristent operations succeded on Master 'not designated primary' buy they should fail as replica is not available"); - } - catch(JMSException e) - { - // JMSException should be thrown on transaction start/commit - } - } - - public void testSecondaryDoesNotBecomePrimaryWhenDesignatedPrimaryStopped() throws Exception - { - startCluster(true); - _clusterCreator.stopNode(_clusterCreator.getBrokerPortNumberOfPrimary()); - - try - { - getConnection(_brokerFailoverUrl); - fail("Connection not expected"); - } - catch (JMSException e) - { - // PASS - } - } - - public void testInitialDesignatedPrimaryStateOfNodes() throws Exception - { - startCluster(true); - final ManagedBDBHAMessageStore primaryStoreBean = getStoreBeanForNodeAtBrokerPort(_clusterCreator.getBrokerPortNumberOfPrimary()); - assertTrue("Expected primary node to be set as designated primary", primaryStoreBean.getDesignatedPrimary()); - - final ManagedBDBHAMessageStore secondaryStoreBean = getStoreBeanForNodeAtBrokerPort(_clusterCreator.getBrokerPortNumberOfSecondaryNode()); - assertFalse("Expected secondary node to NOT be set as designated primary", secondaryStoreBean.getDesignatedPrimary()); - } - - public void testSecondaryDesignatedAsPrimaryAfterOrginalPrimaryStopped() throws Exception - { - startCluster(true); - _clusterCreator.stopNode(_clusterCreator.getBrokerPortNumberOfPrimary()); - final ManagedBDBHAMessageStore storeBean = getStoreBeanForNodeAtBrokerPort(_clusterCreator.getBrokerPortNumberOfSecondaryNode()); - - assertFalse("Expected node to NOT be set as designated primary", storeBean.getDesignatedPrimary()); - storeBean.setDesignatedPrimary(true); - assertTrue("Expected node to now be set as designated primary", storeBean.getDesignatedPrimary()); - - final Connection connection = getConnection(_brokerFailoverUrl); - assertNotNull("Expected to get a valid connection to new primary", connection); - assertProducingConsuming(connection); - } - - private ManagedBDBHAMessageStore getStoreBeanForNodeAtBrokerPort( - final int activeBrokerPortNumber) throws Exception - { - _jmxUtils.open(activeBrokerPortNumber); - - ManagedBDBHAMessageStore storeBean = _jmxUtils.getManagedObject(ManagedBDBHAMessageStore.class, MANAGED_OBJECT_QUERY); - return storeBean; - } - - private void assertProducingConsuming(final Connection connection) throws JMSException, Exception - { - Session session = connection.createSession(true, Session.SESSION_TRANSACTED); - Destination destination = session.createQueue(getTestQueueName()); - MessageConsumer consumer = session.createConsumer(destination); - sendMessage(session, destination, 1); - connection.start(); - Message m1 = consumer.receive(RECEIVE_TIMEOUT); - assertNotNull("Message 1 is not received", m1); - assertEquals("Unexpected first message received", 0, m1.getIntProperty(INDEX)); - session.commit(); - } - -} diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBBackupTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBBackupTest.java deleted file mode 100644 index 7c04d83e79..0000000000 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBBackupTest.java +++ /dev/null @@ -1,193 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.qpid.server.store.berkeleydb; - -import java.io.File; -import java.io.IOException; -import java.util.Map; -import java.util.concurrent.TimeUnit; - -import javax.jms.Connection; -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.Session; - -import org.apache.log4j.Logger; -import org.apache.qpid.test.utils.Piper; -import org.apache.qpid.test.utils.QpidBrokerTestCase; -import org.apache.qpid.util.FileUtils; - -/** - * Tests the BDB backup script can successfully perform a backup and that - * backup can be restored and used by the Broker. - */ -public class BDBBackupTest extends QpidBrokerTestCase -{ - protected static final Logger LOGGER = Logger.getLogger(BDBBackupTest.class); - - private static final String BACKUP_SCRIPT = "/bin/backup.sh"; - private static final String BACKUP_COMPLETE_MESSAGE = "Hot Backup Completed"; - - private static final String TEST_VHOST = "test"; - private static final String SYSTEM_TMP_DIR = System.getProperty("java.io.tmpdir"); - - private File _backupToDir; - private File _backupFromDir; - - @Override - protected void setUp() throws Exception - { - super.setUp(); - _backupToDir = new File(SYSTEM_TMP_DIR + File.separator + getTestName()); - _backupToDir.mkdirs(); - - final String qpidWork = getBroker(DEFAULT_PORT).getWorkingDirectory(); - - // It would be preferable to lookup the store path using #getConfigurationStringProperty("virtualhosts...") - // but the config as known to QBTC does not pull-in the virtualhost section from its separate source file - _backupFromDir = new File(qpidWork + File.separator + TEST_VHOST + "-store"); - boolean fromDirExistsAndIsDir = _backupFromDir.isDirectory(); - assertTrue("backupFromDir " + _backupFromDir + " should already exist", fromDirExistsAndIsDir); - } - - @Override - protected void tearDown() throws Exception - { - try - { - super.tearDown(); - } - finally - { - FileUtils.delete(_backupToDir, true); - } - } - - public void testBackupAndRestoreMaintainsMessages() throws Exception - { - sendNumberedMessages(0, 10); - invokeBdbBackup(_backupFromDir, _backupToDir); - sendNumberedMessages(10, 20); - confirmBrokerHasMessages(0, 20); - stopBroker(); - - deleteStore(_backupFromDir); - replaceStoreWithBackup(_backupToDir, _backupFromDir); - - startBroker(); - confirmBrokerHasMessages(0, 10); - } - - private void sendNumberedMessages(final int startIndex, final int endIndex) throws JMSException, Exception - { - Connection con = getConnection(); - Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); - Destination destination = session.createQueue(getTestQueueName()); - // Create queue by consumer side-effect - session.createConsumer(destination).close(); - - final int numOfMessages = endIndex - startIndex; - final int batchSize = 0; - sendMessage(session, destination, numOfMessages, startIndex, batchSize); - con.close(); - } - - private void confirmBrokerHasMessages(final int startIndex, final int endIndex) throws Exception - { - Connection con = getConnection(); - Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); - con.start(); - Destination destination = session.createQueue(getTestQueueName()); - MessageConsumer consumer = session.createConsumer(destination); - for (int i = startIndex; i < endIndex; i++) - { - Message msg = consumer.receive(RECEIVE_TIMEOUT); - assertNotNull("Message " + i + " not received", msg); - assertEquals("Did not receive the expected message", i, msg.getIntProperty(INDEX)); - } - - Message msg = consumer.receive(100); - if(msg != null) - { - fail("No more messages should be received, but received additional message with index: " + msg.getIntProperty(INDEX)); - } - con.close(); - } - - private void invokeBdbBackup(final File backupFromDir, final File backupToDir) throws Exception - { - if (String.valueOf(System.getProperty("os.name")).toLowerCase().contains("windows")) - { - BDBBackup.main(new String[]{"-todir", backupToDir.getAbsolutePath(), "-fromdir", backupFromDir.getAbsolutePath()}); - } - else - { - runBdbBackupScript(backupFromDir, backupToDir); - } - } - - private void runBdbBackupScript(final File backupFromDir, final File backupToDir) throws IOException, - InterruptedException - { - Process backupProcess = null; - try - { - String qpidHome = System.getProperty(QPID_HOME); - ProcessBuilder pb = new ProcessBuilder(qpidHome + BACKUP_SCRIPT, "-todir", backupToDir.getAbsolutePath(), "-fromdir", backupFromDir.getAbsolutePath()); - pb.redirectErrorStream(true); - Map env = pb.environment(); - env.put(QPID_HOME, qpidHome); - - LOGGER.debug("Backup command is " + pb.command()); - backupProcess = pb.start(); - Piper piper = new Piper(backupProcess.getInputStream(), _testcaseOutputStream, null, BACKUP_COMPLETE_MESSAGE); - piper.start(); - piper.await(2, TimeUnit.SECONDS); - backupProcess.waitFor(); - piper.join(); - - LOGGER.debug("Backup command completed " + backupProcess.exitValue()); - assertEquals("Unexpected exit value from backup script", 0, backupProcess.exitValue()); - } - finally - { - if (backupProcess != null) - { - backupProcess.getErrorStream().close(); - backupProcess.getInputStream().close(); - backupProcess.getOutputStream().close(); - } - } - } - - private void replaceStoreWithBackup(File source, File dst) throws Exception - { - LOGGER.debug("Copying store " + source + " to " + dst); - FileUtils.copyRecursive(source, dst); - } - - private void deleteStore(File storeDir) - { - LOGGER.debug("Deleting store " + storeDir); - FileUtils.delete(storeDir, true); - } - -} diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java deleted file mode 100644 index 76b990038d..0000000000 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java +++ /dev/null @@ -1,612 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.store.berkeleydb; - -import java.io.File; -import java.nio.ByteBuffer; -import java.util.Arrays; -import java.util.List; -import java.util.UUID; -import org.apache.qpid.AMQStoreException; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.BasicContentHeaderProperties; -import org.apache.qpid.framing.ContentHeaderBody; -import org.apache.qpid.framing.MethodRegistry; -import org.apache.qpid.framing.ProtocolVersion; -import org.apache.qpid.framing.abstraction.MessagePublishInfo; -import org.apache.qpid.server.message.AMQMessageHeader; -import org.apache.qpid.server.message.EnqueableMessage; -import org.apache.qpid.server.protocol.v0_10.MessageMetaDataType_0_10; -import org.apache.qpid.server.protocol.v0_8.MessageMetaData; -import org.apache.qpid.server.protocol.v0_10.MessageMetaData_0_10; -import org.apache.qpid.server.message.MessageReference; -import org.apache.qpid.server.message.ServerMessage; -import org.apache.qpid.server.model.UUIDGenerator; -import org.apache.qpid.server.protocol.v0_8.MessageMetaDataType_0_8; -import org.apache.qpid.server.store.MessageStoreTest; -import org.apache.qpid.server.store.MessageStore; -import org.apache.qpid.server.store.StorableMessageMetaData; -import org.apache.qpid.server.store.StoredMessage; -import org.apache.qpid.server.store.Transaction; -import org.apache.qpid.server.store.TransactionLogResource; -import org.apache.qpid.transport.DeliveryProperties; -import org.apache.qpid.transport.Header; -import org.apache.qpid.transport.MessageAcceptMode; -import org.apache.qpid.transport.MessageAcquireMode; -import org.apache.qpid.transport.MessageDeliveryMode; -import org.apache.qpid.transport.MessageDeliveryPriority; -import org.apache.qpid.transport.MessageProperties; -import org.apache.qpid.transport.MessageTransfer; - -/** - * Subclass of MessageStoreTest which runs the standard tests from the superclass against - * the BDB Store as well as additional tests specific to the BDB store-implementation. - */ -public class BDBMessageStoreTest extends MessageStoreTest -{ - private static byte[] CONTENT_BYTES = new byte[] {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}; - - /** - * Tests that message metadata and content are successfully read back from a - * store after it has been reloaded. Both 0-8 and 0-10 metadata is used to - * verify their ability to co-exist within the store and be successful retrieved. - */ - public void testBDBMessagePersistence() throws Exception - { - MessageStore store = getVirtualHost().getMessageStore(); - - AbstractBDBMessageStore bdbStore = assertBDBStore(store); - - // Create content ByteBuffers. - // Split the content into 2 chunks for the 0-8 message, as per broker behaviour. - // Use a single chunk for the 0-10 message as per broker behaviour. - String bodyText = "jfhdjsflsdhfjdshfjdslhfjdslhfsjlhfsjkhfdsjkhfdsjkfhdslkjf"; - - ByteBuffer firstContentBytes_0_8 = ByteBuffer.wrap(bodyText.substring(0, 10).getBytes()); - ByteBuffer secondContentBytes_0_8 = ByteBuffer.wrap(bodyText.substring(10).getBytes()); - - ByteBuffer completeContentBody_0_10 = ByteBuffer.wrap(bodyText.getBytes()); - int bodySize = completeContentBody_0_10.limit(); - - /* - * Create and insert a 0-8 message (metadata and multi-chunk content) - */ - MessagePublishInfo pubInfoBody_0_8 = createPublishInfoBody_0_8(); - BasicContentHeaderProperties props_0_8 = createContentHeaderProperties_0_8(); - - ContentHeaderBody chb_0_8 = createContentHeaderBody_0_8(props_0_8, bodySize); - - MessageMetaData messageMetaData_0_8 = new MessageMetaData(pubInfoBody_0_8, chb_0_8, 0); - StoredMessage storedMessage_0_8 = bdbStore.addMessage(messageMetaData_0_8); - - long origArrivalTime_0_8 = messageMetaData_0_8.getArrivalTime(); - long messageid_0_8 = storedMessage_0_8.getMessageNumber(); - - storedMessage_0_8.addContent(0, firstContentBytes_0_8); - storedMessage_0_8.addContent(firstContentBytes_0_8.limit(), secondContentBytes_0_8); - storedMessage_0_8.flushToStore(); - - /* - * Create and insert a 0-10 message (metadata and content) - */ - MessageProperties msgProps_0_10 = createMessageProperties_0_10(bodySize); - DeliveryProperties delProps_0_10 = createDeliveryProperties_0_10(); - Header header_0_10 = new Header(delProps_0_10, msgProps_0_10); - - MessageTransfer xfr_0_10 = new MessageTransfer("destination", MessageAcceptMode.EXPLICIT, - MessageAcquireMode.PRE_ACQUIRED, header_0_10, completeContentBody_0_10); - - MessageMetaData_0_10 messageMetaData_0_10 = new MessageMetaData_0_10(xfr_0_10); - StoredMessage storedMessage_0_10 = bdbStore.addMessage(messageMetaData_0_10); - - long origArrivalTime_0_10 = messageMetaData_0_10.getArrivalTime(); - long messageid_0_10 = storedMessage_0_10.getMessageNumber(); - - storedMessage_0_10.addContent(0, completeContentBody_0_10); - storedMessage_0_10.flushToStore(); - - /* - * reload the store only (read-only) - */ - AbstractBDBMessageStore readOnlyStore = reloadStore(bdbStore); - - /* - * Read back and validate the 0-8 message metadata and content - */ - StorableMessageMetaData storeableMMD_0_8 = readOnlyStore.getMessageMetaData(messageid_0_8); - - assertEquals("Unexpected message type", MessageMetaDataType_0_8.TYPE, storeableMMD_0_8.getType().ordinal()); - assertTrue("Unexpected instance type", storeableMMD_0_8 instanceof MessageMetaData); - MessageMetaData returnedMMD_0_8 = (MessageMetaData) storeableMMD_0_8; - - assertEquals("Message arrival time has changed", origArrivalTime_0_8, returnedMMD_0_8.getArrivalTime()); - - MessagePublishInfo returnedPubBody_0_8 = returnedMMD_0_8.getMessagePublishInfo(); - assertEquals("Message exchange has changed", pubInfoBody_0_8.getExchange(), returnedPubBody_0_8.getExchange()); - assertEquals("Immediate flag has changed", pubInfoBody_0_8.isImmediate(), returnedPubBody_0_8.isImmediate()); - assertEquals("Mandatory flag has changed", pubInfoBody_0_8.isMandatory(), returnedPubBody_0_8.isMandatory()); - assertEquals("Routing key has changed", pubInfoBody_0_8.getRoutingKey(), returnedPubBody_0_8.getRoutingKey()); - - ContentHeaderBody returnedHeaderBody_0_8 = returnedMMD_0_8.getContentHeaderBody(); - assertEquals("ContentHeader ClassID has changed", chb_0_8.getClassId(), returnedHeaderBody_0_8.getClassId()); - assertEquals("ContentHeader weight has changed", chb_0_8.getWeight(), returnedHeaderBody_0_8.getWeight()); - assertEquals("ContentHeader bodySize has changed", chb_0_8.getBodySize(), returnedHeaderBody_0_8.getBodySize()); - - BasicContentHeaderProperties returnedProperties_0_8 = (BasicContentHeaderProperties) returnedHeaderBody_0_8.getProperties(); - assertEquals("Property ContentType has changed", props_0_8.getContentTypeAsString(), returnedProperties_0_8.getContentTypeAsString()); - assertEquals("Property MessageID has changed", props_0_8.getMessageIdAsString(), returnedProperties_0_8.getMessageIdAsString()); - - ByteBuffer recoveredContent_0_8 = ByteBuffer.allocate((int) chb_0_8.getBodySize()) ; - long recoveredCount_0_8 = readOnlyStore.getContent(messageid_0_8, 0, recoveredContent_0_8); - assertEquals("Incorrect amount of payload data recovered", chb_0_8.getBodySize(), recoveredCount_0_8); - String returnedPayloadString_0_8 = new String(recoveredContent_0_8.array()); - assertEquals("Message Payload has changed", bodyText, returnedPayloadString_0_8); - - /* - * Read back and validate the 0-10 message metadata and content - */ - StorableMessageMetaData storeableMMD_0_10 = readOnlyStore.getMessageMetaData(messageid_0_10); - - assertEquals("Unexpected message type", MessageMetaDataType_0_10.TYPE, storeableMMD_0_10.getType().ordinal()); - assertTrue("Unexpected instance type", storeableMMD_0_10 instanceof MessageMetaData_0_10); - MessageMetaData_0_10 returnedMMD_0_10 = (MessageMetaData_0_10) storeableMMD_0_10; - - assertEquals("Message arrival time has changed", origArrivalTime_0_10, returnedMMD_0_10.getArrivalTime()); - - DeliveryProperties returnedDelProps_0_10 = returnedMMD_0_10.getHeader().getDeliveryProperties(); - assertNotNull("DeliveryProperties were not returned", returnedDelProps_0_10); - assertEquals("Immediate flag has changed", delProps_0_10.getImmediate(), returnedDelProps_0_10.getImmediate()); - assertEquals("Routing key has changed", delProps_0_10.getRoutingKey(), returnedDelProps_0_10.getRoutingKey()); - assertEquals("Message exchange has changed", delProps_0_10.getExchange(), returnedDelProps_0_10.getExchange()); - assertEquals("Message expiration has changed", delProps_0_10.getExpiration(), returnedDelProps_0_10.getExpiration()); - assertEquals("Message delivery priority has changed", delProps_0_10.getPriority(), returnedDelProps_0_10.getPriority()); - - MessageProperties returnedMsgProps = returnedMMD_0_10.getHeader().getMessageProperties(); - assertNotNull("MessageProperties were not returned", returnedMsgProps); - assertTrue("Message correlationID has changed", Arrays.equals(msgProps_0_10.getCorrelationId(), returnedMsgProps.getCorrelationId())); - assertEquals("Message content length has changed", msgProps_0_10.getContentLength(), returnedMsgProps.getContentLength()); - assertEquals("Message content type has changed", msgProps_0_10.getContentType(), returnedMsgProps.getContentType()); - - ByteBuffer recoveredContent = ByteBuffer.allocate((int) msgProps_0_10.getContentLength()) ; - long recoveredCount = readOnlyStore.getContent(messageid_0_10, 0, recoveredContent); - assertEquals("Incorrect amount of payload data recovered", msgProps_0_10.getContentLength(), recoveredCount); - - String returnedPayloadString_0_10 = new String(recoveredContent.array()); - assertEquals("Message Payload has changed", bodyText, returnedPayloadString_0_10); - - readOnlyStore.close(); - } - - private DeliveryProperties createDeliveryProperties_0_10() - { - DeliveryProperties delProps_0_10 = new DeliveryProperties(); - - delProps_0_10.setDeliveryMode(MessageDeliveryMode.PERSISTENT); - delProps_0_10.setImmediate(true); - delProps_0_10.setExchange("exchange12345"); - delProps_0_10.setRoutingKey("routingKey12345"); - delProps_0_10.setExpiration(5); - delProps_0_10.setPriority(MessageDeliveryPriority.ABOVE_AVERAGE); - - return delProps_0_10; - } - - private MessageProperties createMessageProperties_0_10(int bodySize) - { - MessageProperties msgProps_0_10 = new MessageProperties(); - msgProps_0_10.setContentLength(bodySize); - msgProps_0_10.setCorrelationId("qwerty".getBytes()); - msgProps_0_10.setContentType("text/html"); - - return msgProps_0_10; - } - - /** - * Close the provided store and create a new (read-only) store to read back the data. - * - * Use this method instead of reloading the virtual host like other tests in order - * to avoid the recovery handler deleting the message for not being on a queue. - */ - private AbstractBDBMessageStore reloadStore(AbstractBDBMessageStore messageStore) throws Exception - { - messageStore.close(); - - AbstractBDBMessageStore newStore = new BDBMessageStore(); - newStore.configure(getVirtualHostModel(),true); - - newStore.startWithNoRecover(); - - return newStore; - } - - private MessagePublishInfo createPublishInfoBody_0_8() - { - return new MessagePublishInfo() - { - public AMQShortString getExchange() - { - return new AMQShortString("exchange12345"); - } - - public void setExchange(AMQShortString exchange) - { - } - - public boolean isImmediate() - { - return false; - } - - public boolean isMandatory() - { - return true; - } - - public AMQShortString getRoutingKey() - { - return new AMQShortString("routingKey12345"); - } - }; - - } - - private ContentHeaderBody createContentHeaderBody_0_8(BasicContentHeaderProperties props, int length) - { - MethodRegistry methodRegistry = MethodRegistry.getMethodRegistry(ProtocolVersion.v0_9); - int classForBasic = methodRegistry.createBasicQosOkBody().getClazz(); - return new ContentHeaderBody(classForBasic, 1, props, length); - } - - private BasicContentHeaderProperties createContentHeaderProperties_0_8() - { - BasicContentHeaderProperties props = new BasicContentHeaderProperties(); - props.setDeliveryMode(Integer.valueOf(BasicContentHeaderProperties.PERSISTENT).byteValue()); - props.setContentType("text/html"); - props.getHeaders().setString("Test", "MST"); - return props; - } - - public void testGetContentWithOffset() throws Exception - { - MessageStore store = getVirtualHost().getMessageStore(); - AbstractBDBMessageStore bdbStore = assertBDBStore(store); - StoredMessage storedMessage_0_8 = createAndStoreSingleChunkMessage_0_8(store); - long messageid_0_8 = storedMessage_0_8.getMessageNumber(); - - // normal case: offset is 0 - ByteBuffer dst = ByteBuffer.allocate(10); - int length = bdbStore.getContent(messageid_0_8, 0, dst); - assertEquals("Unexpected length", CONTENT_BYTES.length, length); - byte[] array = dst.array(); - assertTrue("Unexpected content", Arrays.equals(CONTENT_BYTES, array)); - - // offset is in the middle - dst = ByteBuffer.allocate(10); - length = bdbStore.getContent(messageid_0_8, 5, dst); - assertEquals("Unexpected length", 5, length); - array = dst.array(); - byte[] expected = new byte[10]; - System.arraycopy(CONTENT_BYTES, 5, expected, 0, 5); - assertTrue("Unexpected content", Arrays.equals(expected, array)); - - // offset beyond the content length - dst = ByteBuffer.allocate(10); - try - { - bdbStore.getContent(messageid_0_8, 15, dst); - fail("Should fail for the offset greater than message size"); - } - catch (RuntimeException e) - { - assertEquals("Unexpected exception message", "Offset 15 is greater than message size 10 for message id " - + messageid_0_8 + "!", e.getMessage()); - } - - // buffer is smaller then message size - dst = ByteBuffer.allocate(5); - length = bdbStore.getContent(messageid_0_8, 0, dst); - assertEquals("Unexpected length", 5, length); - array = dst.array(); - expected = new byte[5]; - System.arraycopy(CONTENT_BYTES, 0, expected, 0, 5); - assertTrue("Unexpected content", Arrays.equals(expected, array)); - - // buffer is smaller then message size, offset is not 0 - dst = ByteBuffer.allocate(5); - length = bdbStore.getContent(messageid_0_8, 2, dst); - assertEquals("Unexpected length", 5, length); - array = dst.array(); - expected = new byte[5]; - System.arraycopy(CONTENT_BYTES, 2, expected, 0, 5); - assertTrue("Unexpected content", Arrays.equals(expected, array)); - } - /** - * Tests that messages which are added to the store and then removed using the - * public MessageStore interfaces are actually removed from the store by then - * interrogating the store with its own implementation methods and verifying - * expected exceptions are thrown to indicate the message is not present. - */ - public void testMessageCreationAndRemoval() throws Exception - { - MessageStore store = getVirtualHost().getMessageStore(); - AbstractBDBMessageStore bdbStore = assertBDBStore(store); - - StoredMessage storedMessage_0_8 = createAndStoreSingleChunkMessage_0_8(store); - long messageid_0_8 = storedMessage_0_8.getMessageNumber(); - - bdbStore.removeMessage(messageid_0_8, true); - - //verify the removal using the BDB store implementation methods directly - try - { - // the next line should throw since the message id should not be found - bdbStore.getMessageMetaData(messageid_0_8); - fail("No exception thrown when message id not found getting metadata"); - } - catch (AMQStoreException e) - { - // pass since exception expected - } - - //expecting no content, allocate a 1 byte - ByteBuffer dst = ByteBuffer.allocate(1); - - assertEquals("Retrieved content when none was expected", - 0, bdbStore.getContent(messageid_0_8, 0, dst)); - } - private AbstractBDBMessageStore assertBDBStore(MessageStore store) - { - - assertEquals("Test requires an instance of BDBMessageStore to proceed", BDBMessageStore.class, store.getClass()); - - return (AbstractBDBMessageStore) store; - } - - private StoredMessage createAndStoreSingleChunkMessage_0_8(MessageStore store) - { - ByteBuffer chunk1 = ByteBuffer.wrap(CONTENT_BYTES); - - int bodySize = CONTENT_BYTES.length; - - //create and store the message using the MessageStore interface - MessagePublishInfo pubInfoBody_0_8 = createPublishInfoBody_0_8(); - BasicContentHeaderProperties props_0_8 = createContentHeaderProperties_0_8(); - - ContentHeaderBody chb_0_8 = createContentHeaderBody_0_8(props_0_8, bodySize); - - MessageMetaData messageMetaData_0_8 = new MessageMetaData(pubInfoBody_0_8, chb_0_8, 0); - StoredMessage storedMessage_0_8 = store.addMessage(messageMetaData_0_8); - - storedMessage_0_8.addContent(0, chunk1); - storedMessage_0_8.flushToStore(); - - return storedMessage_0_8; - } - - /** - * Tests transaction commit by utilising the enqueue and dequeue methods available - * in the TransactionLog interface implemented by the store, and verifying the - * behaviour using BDB implementation methods. - */ - public void testTranCommit() throws Exception - { - MessageStore log = getVirtualHost().getMessageStore(); - - AbstractBDBMessageStore bdbStore = assertBDBStore(log); - - final UUID mockQueueId = UUIDGenerator.generateRandomUUID(); - TransactionLogResource mockQueue = new TransactionLogResource() - { - @Override - public UUID getId() - { - return mockQueueId; - } - }; - - Transaction txn = log.newTransaction(); - - txn.enqueueMessage(mockQueue, new MockMessage(1L)); - txn.enqueueMessage(mockQueue, new MockMessage(5L)); - txn.commitTran(); - - List enqueuedIds = bdbStore.getEnqueuedMessages(mockQueueId); - - assertEquals("Number of enqueued messages is incorrect", 2, enqueuedIds.size()); - Long val = enqueuedIds.get(0); - assertEquals("First Message is incorrect", 1L, val.longValue()); - val = enqueuedIds.get(1); - assertEquals("Second Message is incorrect", 5L, val.longValue()); - } - - - /** - * Tests transaction rollback before a commit has occurred by utilising the - * enqueue and dequeue methods available in the TransactionLog interface - * implemented by the store, and verifying the behaviour using BDB - * implementation methods. - */ - public void testTranRollbackBeforeCommit() throws Exception - { - MessageStore log = getVirtualHost().getMessageStore(); - - AbstractBDBMessageStore bdbStore = assertBDBStore(log); - - final UUID mockQueueId = UUIDGenerator.generateRandomUUID(); - TransactionLogResource mockQueue = new TransactionLogResource() - { - @Override - public UUID getId() - { - return mockQueueId; - } - }; - - Transaction txn = log.newTransaction(); - - txn.enqueueMessage(mockQueue, new MockMessage(21L)); - txn.abortTran(); - - txn = log.newTransaction(); - txn.enqueueMessage(mockQueue, new MockMessage(22L)); - txn.enqueueMessage(mockQueue, new MockMessage(23L)); - txn.commitTran(); - - List enqueuedIds = bdbStore.getEnqueuedMessages(mockQueueId); - - assertEquals("Number of enqueued messages is incorrect", 2, enqueuedIds.size()); - Long val = enqueuedIds.get(0); - assertEquals("First Message is incorrect", 22L, val.longValue()); - val = enqueuedIds.get(1); - assertEquals("Second Message is incorrect", 23L, val.longValue()); - } - - public void testOnDelete() throws Exception - { - MessageStore log = getVirtualHost().getMessageStore(); - AbstractBDBMessageStore bdbStore = assertBDBStore(log); - String storeLocation = bdbStore.getStoreLocation(); - - File location = new File(storeLocation); - assertTrue("Store does not exist at " + storeLocation, location.exists()); - - bdbStore.close(); - assertTrue("Store does not exist at " + storeLocation, location.exists()); - - bdbStore.onDelete(); - assertFalse("Store exists at " + storeLocation, location.exists()); - } - - /** - * Tests transaction rollback after a commit has occurred by utilising the - * enqueue and dequeue methods available in the TransactionLog interface - * implemented by the store, and verifying the behaviour using BDB - * implementation methods. - */ - public void testTranRollbackAfterCommit() throws Exception - { - MessageStore log = getVirtualHost().getMessageStore(); - - AbstractBDBMessageStore bdbStore = assertBDBStore(log); - - final UUID mockQueueId = UUIDGenerator.generateRandomUUID(); - TransactionLogResource mockQueue = new TransactionLogResource() - { - @Override - public UUID getId() - { - return mockQueueId; - } - }; - - Transaction txn = log.newTransaction(); - - txn.enqueueMessage(mockQueue, new MockMessage(30L)); - txn.commitTran(); - - txn = log.newTransaction(); - txn.enqueueMessage(mockQueue, new MockMessage(31L)); - txn.abortTran(); - - txn = log.newTransaction(); - txn.enqueueMessage(mockQueue, new MockMessage(32L)); - txn.commitTran(); - - List enqueuedIds = bdbStore.getEnqueuedMessages(mockQueueId); - - assertEquals("Number of enqueued messages is incorrect", 2, enqueuedIds.size()); - Long val = enqueuedIds.get(0); - assertEquals("First Message is incorrect", 30L, val.longValue()); - val = enqueuedIds.get(1); - assertEquals("Second Message is incorrect", 32L, val.longValue()); - } - - @SuppressWarnings("rawtypes") - private static class MockMessage implements ServerMessage, EnqueableMessage - { - private long _messageId; - - public MockMessage(long messageId) - { - _messageId = messageId; - } - - public String getRoutingKey() - { - return null; - } - - public AMQMessageHeader getMessageHeader() - { - return null; - } - - public StoredMessage getStoredMessage() - { - return null; - } - - public boolean isPersistent() - { - return true; - } - - public long getSize() - { - return 0; - } - - public boolean isImmediate() - { - return false; - } - - public long getExpiration() - { - return 0; - } - - public MessageReference newReference() - { - return null; - } - - public long getMessageNumber() - { - return _messageId; - } - - public long getArrivalTime() - { - return 0; - } - - public int getContent(ByteBuffer buf, int offset) - { - return 0; - } - - public ByteBuffer getContent(int offset, int length) - { - return null; - } - } -} diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgradeTestPreparer.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgradeTestPreparer.java index 63af8d3840..ed6024feb1 100644 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgradeTestPreparer.java +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgradeTestPreparer.java @@ -293,7 +293,7 @@ public class BDBStoreUpgradeTestPreparer connection.close(); } - public static void sendMessages(Session session, MessageProducer messageProducer, + private static void sendMessages(Session session, MessageProducer messageProducer, Destination dest, int deliveryMode, int length, int numMesages) throws JMSException { for (int i = 1; i <= numMesages; i++) @@ -304,7 +304,7 @@ public class BDBStoreUpgradeTestPreparer } } - public static void publishMessages(Session session, TopicPublisher publisher, + private static void publishMessages(Session session, TopicPublisher publisher, Destination dest, int deliveryMode, int length, int numMesages, String selectorProperty) throws JMSException { for (int i = 1; i <= numMesages; i++) @@ -322,7 +322,7 @@ public class BDBStoreUpgradeTestPreparer * @param length number of characters in the string * @return string sequence of the given length */ - public static String generateString(int length) + private static String generateString(int length) { char[] base_chars = new char[]{'0','1','2','3','4','5','6','7','8','9'}; char[] chars = new char[length]; diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java deleted file mode 100644 index e4837b212e..0000000000 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java +++ /dev/null @@ -1,501 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.store.berkeleydb; - - -import static org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.NON_DURABLE_QUEUE_NAME; -import static org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.PRIORITY_QUEUE_NAME; -import static org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.QUEUE_NAME; -import static org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.QUEUE_WITH_DLQ_NAME; -import static org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.SELECTOR_SUB_NAME; -import static org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.SELECTOR_TOPIC_NAME; -import static org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.SUB_NAME; -import static org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.TOPIC_NAME; - -import java.io.File; -import java.io.InputStream; - -import javax.jms.Connection; -import javax.jms.DeliveryMode; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.Queue; -import javax.jms.Session; -import javax.jms.TextMessage; -import javax.jms.Topic; -import javax.jms.TopicConnection; -import javax.jms.TopicPublisher; -import javax.jms.TopicSession; -import javax.jms.TopicSubscriber; -import javax.management.openmbean.CompositeData; -import javax.management.openmbean.TabularDataSupport; - -import org.apache.qpid.management.common.mbeans.ManagedExchange; -import org.apache.qpid.management.common.mbeans.ManagedQueue; -import org.apache.qpid.test.utils.JMXTestUtils; -import org.apache.qpid.test.utils.QpidBrokerTestCase; -import org.apache.qpid.util.FileUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Tests upgrading a BDB store on broker startup. - * The store will then be used to verify that the upgrade is completed - * properly and that once upgraded it functions as expected. - */ -public class BDBUpgradeTest extends QpidBrokerTestCase -{ - protected static final Logger _logger = LoggerFactory.getLogger(BDBUpgradeTest.class); - - private static final String STRING_1024 = BDBStoreUpgradeTestPreparer.generateString(1024); - private static final String STRING_1024_256 = BDBStoreUpgradeTestPreparer.generateString(1024*256); - private static final String QPID_WORK_ORIG = System.getProperty("QPID_WORK"); - - private String _storeLocation; - - @Override - public void setUp() throws Exception - { - assertNotNull("QPID_WORK must be set", QPID_WORK_ORIG); - _storeLocation = getWorkDirBaseDir() + File.separator + "test-store"; - - //Clear the two target directories if they exist. - File directory = new File(_storeLocation); - if (directory.exists() && directory.isDirectory()) - { - FileUtils.delete(directory, true); - } - directory.mkdirs(); - - // copy store files - InputStream src = getClass().getClassLoader().getResourceAsStream("upgrade/bdbstore-v4/test-store/00000000.jdb"); - FileUtils.copy(src, new File(_storeLocation, "00000000.jdb")); - - getBrokerConfiguration().addJmxManagementConfiguration(); - super.setUp(); - } - - private String getWorkDirBaseDir() - { - return QPID_WORK_ORIG + (isInternalBroker() ? "" : "/" + getPort()); - } - - /** - * Test that the selector applied to the DurableSubscription was successfully - * transfered to the new store, and functions as expected with continued use - * by monitoring message count while sending new messages to the topic and then - * consuming them. - */ - public void testSelectorDurability() throws Exception - { - JMXTestUtils jmxUtils = null; - try - { - jmxUtils = new JMXTestUtils(this, "guest", "guest"); - jmxUtils.open(); - } - catch (Exception e) - { - fail("Unable to establish JMX connection, test cannot proceed"); - } - - try - { - ManagedQueue dursubQueue = jmxUtils.getManagedQueue("clientid" + ":" + SELECTOR_SUB_NAME); - assertEquals("DurableSubscription backing queue should have 1 message on it initially", - new Integer(1), dursubQueue.getMessageCount()); - - // Create a connection and start it - TopicConnection connection = (TopicConnection) getConnection(); - connection.start(); - - // Send messages which don't match and do match the selector, checking message count - TopicSession pubSession = connection.createTopicSession(true, Session.SESSION_TRANSACTED); - Topic topic = pubSession.createTopic(SELECTOR_TOPIC_NAME); - TopicPublisher publisher = pubSession.createPublisher(topic); - - BDBStoreUpgradeTestPreparer.publishMessages(pubSession, publisher, topic, DeliveryMode.PERSISTENT, 1*1024, 1, "false"); - pubSession.commit(); - assertEquals("DurableSubscription backing queue should still have 1 message on it", - Integer.valueOf(1), dursubQueue.getMessageCount()); - - BDBStoreUpgradeTestPreparer.publishMessages(pubSession, publisher, topic, DeliveryMode.PERSISTENT, 1*1024, 1, "true"); - pubSession.commit(); - assertEquals("DurableSubscription backing queue should now have 2 messages on it", - Integer.valueOf(2), dursubQueue.getMessageCount()); - - TopicSubscriber durSub = pubSession.createDurableSubscriber(topic, SELECTOR_SUB_NAME,"testprop='true'", false); - Message m = durSub.receive(2000); - assertNotNull("Failed to receive an expected message", m); - m = durSub.receive(2000); - assertNotNull("Failed to receive an expected message", m); - pubSession.commit(); - - pubSession.close(); - } - finally - { - jmxUtils.close(); - } - } - - /** - * Test that the DurableSubscription without selector was successfully - * transfered to the new store, and functions as expected with continued use. - */ - public void testDurableSubscriptionWithoutSelector() throws Exception - { - JMXTestUtils jmxUtils = null; - try - { - jmxUtils = new JMXTestUtils(this, "guest", "guest"); - jmxUtils.open(); - } - catch (Exception e) - { - fail("Unable to establish JMX connection, test cannot proceed"); - } - - try - { - ManagedQueue dursubQueue = jmxUtils.getManagedQueue("clientid" + ":" + SUB_NAME); - assertEquals("DurableSubscription backing queue should have 1 message on it initially", - new Integer(1), dursubQueue.getMessageCount()); - - // Create a connection and start it - TopicConnection connection = (TopicConnection) getConnection(); - connection.start(); - - // Send new message matching the topic, checking message count - TopicSession session = connection.createTopicSession(true, Session.SESSION_TRANSACTED); - Topic topic = session.createTopic(TOPIC_NAME); - TopicPublisher publisher = session.createPublisher(topic); - - BDBStoreUpgradeTestPreparer.publishMessages(session, publisher, topic, DeliveryMode.PERSISTENT, 1*1024, 1, "indifferent"); - session.commit(); - assertEquals("DurableSubscription backing queue should now have 2 messages on it", - Integer.valueOf(2), dursubQueue.getMessageCount()); - - TopicSubscriber durSub = session.createDurableSubscriber(topic, SUB_NAME); - Message m = durSub.receive(2000); - assertNotNull("Failed to receive an expected message", m); - m = durSub.receive(2000); - assertNotNull("Failed to receive an expected message", m); - - session.commit(); - session.close(); - } - finally - { - jmxUtils.close(); - } - } - - /** - * Test that the backing queue for the durable subscription created was successfully - * detected and set as being exclusive during the upgrade process, and that the - * regular queue was not. - */ - public void testQueueExclusivity() throws Exception - { - JMXTestUtils jmxUtils = null; - try - { - jmxUtils = new JMXTestUtils(this, "guest", "guest"); - jmxUtils.open(); - } - catch (Exception e) - { - fail("Unable to establish JMX connection, test cannot proceed"); - } - - try - { - ManagedQueue queue = jmxUtils.getManagedQueue(QUEUE_NAME); - assertFalse("Queue should not have been marked as Exclusive during upgrade", queue.isExclusive()); - - ManagedQueue dursubQueue = jmxUtils.getManagedQueue("clientid" + ":" + SUB_NAME); - assertTrue("DurableSubscription backing queue should have been marked as Exclusive during upgrade", dursubQueue.isExclusive()); - } - finally - { - jmxUtils.close(); - } - } - - /** - * Test that the upgraded queue continues to function properly when used - * for persistent messaging and restarting the broker. - * - * Sends the new messages to the queue BEFORE consuming those which were - * sent before the upgrade. In doing so, this also serves to test that - * the queue bindings were successfully transitioned during the upgrade. - */ - public void testBindingAndMessageDurabability() throws Exception - { - // Create a connection and start it - TopicConnection connection = (TopicConnection) getConnection(); - connection.start(); - - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - Queue queue = session.createQueue(QUEUE_NAME); - MessageProducer messageProducer = session.createProducer(queue); - - // Send a new message - BDBStoreUpgradeTestPreparer.sendMessages(session, messageProducer, queue, DeliveryMode.PERSISTENT, 256*1024, 1); - - session.close(); - - // Restart the broker - restartBroker(); - - // Drain the queue of all messages - connection = (TopicConnection) getConnection(); - connection.start(); - consumeQueueMessages(connection, true); - } - - /** - * Test that all of the committed persistent messages previously sent to - * the broker are properly received following update of the MetaData and - * Content entries during the store upgrade process. - */ - public void testConsumptionOfUpgradedMessages() throws Exception - { - // Create a connection and start it - Connection connection = getConnection(); - connection.start(); - - consumeDurableSubscriptionMessages(connection, true); - consumeDurableSubscriptionMessages(connection, false); - consumeQueueMessages(connection, false); - } - - /** - * Tests store migration containing messages for non-existing queue. - * - * @throws Exception - */ - public void testMigrationOfMessagesForNonDurableQueues() throws Exception - { - // Create a connection and start it - Connection connection = getConnection(); - connection.start(); - - // consume a message for non-existing store - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - Queue queue = session.createQueue(NON_DURABLE_QUEUE_NAME); - MessageConsumer messageConsumer = session.createConsumer(queue); - - for (int i = 1; i <= 3; i++) - { - Message message = messageConsumer.receive(1000); - assertNotNull("Message was not migrated!", message); - assertTrue("Unexpected message received!", message instanceof TextMessage); - assertEquals("ID property did not match", i, message.getIntProperty("ID")); - } - } - - /** - * Tests store upgrade has maintained the priority queue configuration, - * such that sending messages with priorities out-of-order and then consuming - * them gets the messages back in priority order. - */ - public void testPriorityQueue() throws Exception - { - // Create a connection and start it - Connection connection = getConnection(); - connection.start(); - - // send some messages to the priority queue - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - Queue queue = session.createQueue(PRIORITY_QUEUE_NAME); - MessageProducer producer = session.createProducer(queue); - - producer.setPriority(4); - producer.send(createMessage(1, false, session, producer)); - producer.setPriority(1); - producer.send(createMessage(2, false, session, producer)); - producer.setPriority(9); - producer.send(createMessage(3, false, session, producer)); - session.close(); - - //consume the messages, expected order: msg 3, msg 1, msg 2. - session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageConsumer consumer = session.createConsumer(queue); - - Message msg = consumer.receive(1500); - assertNotNull("expected message was not received", msg); - assertEquals(3, msg.getIntProperty("msg")); - msg = consumer.receive(1500); - assertNotNull("expected message was not received", msg); - assertEquals(1, msg.getIntProperty("msg")); - msg = consumer.receive(1500); - assertNotNull("expected message was not received", msg); - assertEquals(2, msg.getIntProperty("msg")); - } - - /** - * Test that the queue configured to have a DLQ was recovered and has the alternate exchange - * and max delivery count, the DLE exists, the DLQ exists with no max delivery count, the - * DLQ is bound to the DLE, and that the DLQ does not itself have a DLQ. - * - * DLQs are NOT enabled at the virtualhost level, we are testing recovery of the arguments - * that turned it on for this specific queue. - */ - public void testRecoveryOfQueueWithDLQ() throws Exception - { - JMXTestUtils jmxUtils = null; - try - { - jmxUtils = new JMXTestUtils(this, "guest", "guest"); - jmxUtils.open(); - } - catch (Exception e) - { - fail("Unable to establish JMX connection, test cannot proceed"); - } - - try - { - //verify the DLE exchange exists, has the expected type, and a single binding for the DLQ - ManagedExchange exchange = jmxUtils.getManagedExchange(QUEUE_WITH_DLQ_NAME + "_DLE"); - assertEquals("Wrong exchange type", "fanout", exchange.getExchangeType()); - TabularDataSupport bindings = (TabularDataSupport) exchange.bindings(); - assertEquals(1, bindings.size()); - for(Object o : bindings.values()) - { - CompositeData binding = (CompositeData) o; - - String bindingKey = (String) binding.get(ManagedExchange.BINDING_KEY); - String[] queueNames = (String[]) binding.get(ManagedExchange.QUEUE_NAMES); - - //Because its a fanout exchange, we just return a single '*' key with all bound queues - assertEquals("unexpected binding key", "*", bindingKey); - assertEquals("unexpected number of queues bound", 1, queueNames.length); - assertEquals("unexpected queue name", QUEUE_WITH_DLQ_NAME + "_DLQ", queueNames[0]); - } - - //verify the queue exists, has the expected alternate exchange and max delivery count - ManagedQueue queue = jmxUtils.getManagedQueue(QUEUE_WITH_DLQ_NAME); - assertEquals("Queue does not have the expected AlternateExchange", QUEUE_WITH_DLQ_NAME + "_DLE", queue.getAlternateExchange()); - assertEquals("Unexpected maximum delivery count", Integer.valueOf(2), queue.getMaximumDeliveryCount()); - - ManagedQueue dlQqueue = jmxUtils.getManagedQueue(QUEUE_WITH_DLQ_NAME + "_DLQ"); - assertNull("Queue should not have an AlternateExchange", dlQqueue.getAlternateExchange()); - assertEquals("Unexpected maximum delivery count", Integer.valueOf(0), dlQqueue.getMaximumDeliveryCount()); - - String dlqDlqObjectNameString = jmxUtils.getQueueObjectNameString("test", QUEUE_WITH_DLQ_NAME + "_DLQ" + "_DLQ"); - assertFalse("a DLQ should not exist for the DLQ itself", jmxUtils.doesManagedObjectExist(dlqDlqObjectNameString)); - } - finally - { - jmxUtils.close(); - } - } - - private void consumeDurableSubscriptionMessages(Connection connection, boolean selector) throws Exception - { - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - Topic topic = null; - TopicSubscriber durSub = null; - - if(selector) - { - topic = session.createTopic(SELECTOR_TOPIC_NAME); - durSub = session.createDurableSubscriber(topic, SELECTOR_SUB_NAME,"testprop='true'", false); - } - else - { - topic = session.createTopic(TOPIC_NAME); - durSub = session.createDurableSubscriber(topic, SUB_NAME); - } - - - // Retrieve the matching message - Message m = durSub.receive(2000); - assertNotNull("Failed to receive an expected message", m); - if(selector) - { - assertEquals("Selector property did not match", "true", m.getStringProperty("testprop")); - } - assertEquals("ID property did not match", 1, m.getIntProperty("ID")); - assertEquals("Message content was not as expected",BDBStoreUpgradeTestPreparer.generateString(1024) , ((TextMessage)m).getText()); - - // Verify that no more messages are received - m = durSub.receive(1000); - assertNull("No more messages should have been recieved", m); - - durSub.close(); - session.close(); - } - - private void consumeQueueMessages(Connection connection, boolean extraMessage) throws Exception - { - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - Queue queue = session.createQueue(QUEUE_NAME); - - MessageConsumer consumer = session.createConsumer(queue); - Message m; - - // Retrieve the initial pre-upgrade messages - for (int i=1; i <= 5 ; i++) - { - m = consumer.receive(2000); - assertNotNull("Failed to receive an expected message", m); - assertEquals("ID property did not match", i, m.getIntProperty("ID")); - assertEquals("Message content was not as expected", STRING_1024_256, ((TextMessage)m).getText()); - } - for (int i=1; i <= 5 ; i++) - { - m = consumer.receive(2000); - assertNotNull("Failed to receive an expected message", m); - assertEquals("ID property did not match", i, m.getIntProperty("ID")); - assertEquals("Message content was not as expected", STRING_1024, ((TextMessage)m).getText()); - } - - if(extraMessage) - { - //verify that the extra message is received - m = consumer.receive(2000); - assertNotNull("Failed to receive an expected message", m); - assertEquals("ID property did not match", 1, m.getIntProperty("ID")); - assertEquals("Message content was not as expected", STRING_1024_256, ((TextMessage)m).getText()); - } - - // Verify that no more messages are received - m = consumer.receive(1000); - assertNull("No more messages should have been recieved", m); - - consumer.close(); - session.close(); - } - - private Message createMessage(int msgId, boolean first, Session producerSession, MessageProducer producer) throws JMSException - { - Message send = producerSession.createTextMessage("Message: " + msgId); - send.setIntProperty("msg", msgId); - - return send; - } -} diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterBlackboxTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterBlackboxTest.java deleted file mode 100644 index 0e1ef7b25d..0000000000 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterBlackboxTest.java +++ /dev/null @@ -1,169 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.store.berkeleydb; - -import java.io.File; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; - -import javax.jms.Connection; -import javax.jms.Session; - -import org.apache.log4j.Logger; -import org.apache.qpid.client.AMQConnection; -import org.apache.qpid.jms.ConnectionListener; -import org.apache.qpid.jms.ConnectionURL; -import org.apache.qpid.test.utils.QpidBrokerTestCase; -import org.apache.qpid.test.utils.TestUtils; - -import com.sleepycat.je.rep.ReplicationConfig; - -/** - * The HA black box tests test the BDB cluster as a opaque unit. Client connects to - * the cluster via a failover url - * - * @see HAClusterWhiteboxTest - */ -public class HAClusterBlackboxTest extends QpidBrokerTestCase -{ - protected static final Logger LOGGER = Logger.getLogger(HAClusterBlackboxTest.class); - - private static final String VIRTUAL_HOST = "test"; - private static final int NUMBER_OF_NODES = 3; - - private final HATestClusterCreator _clusterCreator = new HATestClusterCreator(this, VIRTUAL_HOST, NUMBER_OF_NODES); - - private FailoverAwaitingListener _failoverAwaitingListener; - private ConnectionURL _brokerFailoverUrl; - - @Override - protected void setUp() throws Exception - { - _brokerType = BrokerType.SPAWNED; - - assertTrue(isJavaBroker()); - assertTrue(isBrokerStorePersistent()); - - setSystemProperty("java.util.logging.config.file", "etc" + File.separator + "log.properties"); - - _clusterCreator.configureClusterNodes(); - - _brokerFailoverUrl = _clusterCreator.getConnectionUrlForAllClusterNodes(); - - _clusterCreator.startCluster(); - _failoverAwaitingListener = new FailoverAwaitingListener(); - - super.setUp(); - } - - @Override - public void startBroker() throws Exception - { - // Don't start default broker provided by QBTC. - } - - public void testLossOfMasterNodeCausesClientToFailover() throws Exception - { - final Connection connection = getConnection(_brokerFailoverUrl); - - ((AMQConnection)connection).setConnectionListener(_failoverAwaitingListener); - - final int activeBrokerPort = _clusterCreator.getBrokerPortNumberFromConnection(connection); - LOGGER.info("Active connection port " + activeBrokerPort); - - _clusterCreator.stopNode(activeBrokerPort); - LOGGER.info("Node is stopped"); - _failoverAwaitingListener.assertFailoverOccurs(20000); - LOGGER.info("Listener has finished"); - // any op to ensure connection remains - connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - } - - public void testLossOfReplicaNodeDoesNotCauseClientToFailover() throws Exception - { - LOGGER.info("Connecting to " + _brokerFailoverUrl); - final Connection connection = getConnection(_brokerFailoverUrl); - LOGGER.info("Got connection to cluster"); - - ((AMQConnection)connection).setConnectionListener(_failoverAwaitingListener); - final int activeBrokerPort = _clusterCreator.getBrokerPortNumberFromConnection(connection); - LOGGER.info("Active connection port " + activeBrokerPort); - final int inactiveBrokerPort = _clusterCreator.getPortNumberOfAnInactiveBroker(connection); - - LOGGER.info("Stopping inactive broker on port " + inactiveBrokerPort); - - _clusterCreator.stopNode(inactiveBrokerPort); - - _failoverAwaitingListener.assertFailoverDoesNotOccur(2000); - - // any op to ensure connection remains - connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - } - - private final class FailoverAwaitingListener implements ConnectionListener - { - private final CountDownLatch _failoverLatch = new CountDownLatch(1); - - @Override - public boolean preResubscribe() - { - return true; - } - - @Override - public boolean preFailover(boolean redirect) - { - return true; - } - - public void assertFailoverOccurs(long delay) throws InterruptedException - { - if (!_failoverLatch.await(delay, TimeUnit.MILLISECONDS)) - { - LOGGER.warn("Test thread dump:\n\n" + TestUtils.dumpThreads() + "\n"); - } - assertEquals("Failover did not occur", 0, _failoverLatch.getCount()); - } - - public void assertFailoverDoesNotOccur(long delay) throws InterruptedException - { - _failoverLatch.await(delay, TimeUnit.MILLISECONDS); - assertEquals("Failover occurred unexpectedly", 1L, _failoverLatch.getCount()); - } - - - @Override - public void failoverComplete() - { - _failoverLatch.countDown(); - } - - @Override - public void bytesSent(long count) - { - } - - @Override - public void bytesReceived(long count) - { - } - } - -} diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterWhiteboxTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterWhiteboxTest.java deleted file mode 100644 index 408643b98a..0000000000 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterWhiteboxTest.java +++ /dev/null @@ -1,266 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.store.berkeleydb; - -import java.io.File; -import java.util.Set; - -import javax.jms.Connection; -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.Queue; -import javax.jms.Session; - -import org.apache.log4j.Logger; -import org.apache.qpid.test.utils.QpidBrokerTestCase; -import org.apache.qpid.url.URLSyntaxException; - -/** - * The HA white box tests test the BDB cluster where the test retains the knowledge of the - * individual test nodes. It uses this knowledge to examine the nodes to ensure that they - * remain in the correct state throughout the test. - * - * @see HAClusterBlackboxTest - */ -public class HAClusterWhiteboxTest extends QpidBrokerTestCase -{ - protected static final Logger LOGGER = Logger.getLogger(HAClusterWhiteboxTest.class); - - private static final String VIRTUAL_HOST = "test"; - - private final int NUMBER_OF_NODES = 3; - private final HATestClusterCreator _clusterCreator = new HATestClusterCreator(this, VIRTUAL_HOST, NUMBER_OF_NODES); - - @Override - protected void setUp() throws Exception - { - _brokerType = BrokerType.SPAWNED; - - assertTrue(isJavaBroker()); - assertTrue(isBrokerStorePersistent()); - - setSystemProperty("java.util.logging.config.file", "etc" + File.separator + "log.properties"); - - _clusterCreator.configureClusterNodes(); - _clusterCreator.startCluster(); - - super.setUp(); - } - - @Override - public void startBroker() throws Exception - { - // Don't start default broker provided by QBTC. - } - - public void testClusterPermitsConnectionToOnlyOneNode() throws Exception - { - int connectionSuccesses = 0; - int connectionFails = 0; - - for (int brokerPortNumber : getBrokerPortNumbers()) - { - try - { - getConnection(_clusterCreator.getConnectionUrlForSingleNodeWithoutRetry(brokerPortNumber)); - connectionSuccesses++; - } - catch(JMSException e) - { - assertTrue(e.getMessage().contains("Virtual host '" + VIRTUAL_HOST + "' is not active")); - connectionFails++; - } - } - - assertEquals("Unexpected number of failed connections", NUMBER_OF_NODES - 1, connectionFails); - assertEquals("Unexpected number of successful connections", 1, connectionSuccesses); - } - - public void testClusterThatLosesNodeStillAllowsConnection() throws Exception - { - final Connection initialConnection = getConnectionToNodeInCluster(); - assertNotNull(initialConnection); - - closeConnectionAndKillBroker(initialConnection); - - final Connection subsequentConnection = getConnectionToNodeInCluster(); - assertNotNull(subsequentConnection); - - // verify that JMS persistence operations are working - assertProducingConsuming(subsequentConnection); - - closeConnection(initialConnection); - } - - public void testClusterThatLosesAllButOneNodeRefusesConnection() throws Exception - { - final Connection initialConnection = getConnectionToNodeInCluster(); - assertNotNull(initialConnection); - - closeConnectionAndKillBroker(initialConnection); - - final Connection subsequentConnection = getConnectionToNodeInCluster(); - assertNotNull(subsequentConnection); - final int subsequentPortNumber = _clusterCreator.getBrokerPortNumberFromConnection(subsequentConnection); - - killBroker(subsequentPortNumber); - - final Connection finalConnection = getConnectionToNodeInCluster(); - assertNull(finalConnection); - - closeConnection(initialConnection); - } - - public void testClusterWithRestartedNodeStillAllowsConnection() throws Exception - { - final Connection connection = getConnectionToNodeInCluster(); - assertNotNull(connection); - - final int brokerPortNumber = _clusterCreator.getBrokerPortNumberFromConnection(connection); - connection.close(); - - _clusterCreator.stopNode(brokerPortNumber); - _clusterCreator.startNode(brokerPortNumber); - - final Connection subsequentConnection = getConnectionToNodeInCluster(); - assertNotNull(subsequentConnection); - } - - public void testClusterLosingNodeRetainsData() throws Exception - { - final Connection initialConnection = getConnectionToNodeInCluster(); - - final String queueNamePrefix = getTestQueueName(); - final String inbuiltExchangeQueueUrl = "direct://amq.direct/" + queueNamePrefix + "1/" + queueNamePrefix + "1?durable='true'"; - final String customExchangeQueueUrl = "direct://my.exchange/" + queueNamePrefix + "2/" + queueNamePrefix + "2?durable='true'"; - - populateBrokerWithData(initialConnection, inbuiltExchangeQueueUrl, customExchangeQueueUrl); - - closeConnectionAndKillBroker(initialConnection); - - final Connection subsequentConnection = getConnectionToNodeInCluster(); - - assertNotNull("no valid connection obtained", subsequentConnection); - - checkBrokerData(subsequentConnection, inbuiltExchangeQueueUrl, customExchangeQueueUrl); - } - - public void xtestRecoveryOfOutOfDateNode() throws Exception - { - /* - * TODO: Implement - * - * Cant yet find a way to control cleaning in a deterministic way to allow provoking - * a node to become out of date. We do now know that even a new joiner to the group - * can throw the InsufficientLogException, so ensuring an existing cluster of nodes has - * done *any* cleaning and then adding a new node should be sufficient to cause this. - */ - } - - private void populateBrokerWithData(final Connection connection, final String... queueUrls) throws JMSException, Exception - { - populateBrokerWithData(connection, 1, queueUrls); - } - - private void populateBrokerWithData(final Connection connection, int noOfMessages, final String... queueUrls) throws JMSException, Exception - { - final Session session = connection.createSession(true, Session.SESSION_TRANSACTED); - for (final String queueUrl : queueUrls) - { - final Queue queue = session.createQueue(queueUrl); - session.createConsumer(queue).close(); - sendMessage(session, queue, noOfMessages); - } - } - - private void checkBrokerData(final Connection connection, final String... queueUrls) throws JMSException - { - connection.start(); - final Session session = connection.createSession(true, Session.SESSION_TRANSACTED); - for (final String queueUrl : queueUrls) - { - final Queue queue = session.createQueue(queueUrl); - final MessageConsumer consumer = session.createConsumer(queue); - final Message message = consumer.receive(1000); - session.commit(); - assertNotNull("Queue " + queue + " should have message", message); - assertEquals("Queue " + queue + " message has unexpected content", 0, message.getIntProperty(INDEX)); - } - } - - private Connection getConnectionToNodeInCluster() throws URLSyntaxException - { - Connection connection = null; - Set runningBrokerPorts = getBrokerPortNumbers(); - - for (int brokerPortNumber : runningBrokerPorts) - { - try - { - connection = getConnection(_clusterCreator.getConnectionUrlForSingleNodeWithRetry(brokerPortNumber)); - break; - } - catch(JMSException je) - { - assertTrue(je.getMessage().contains("Virtual host '" + VIRTUAL_HOST + "' is not active")); - } - } - return connection; - } - - private void closeConnectionAndKillBroker(final Connection initialConnection) throws Exception - { - final int initialPortNumber = _clusterCreator.getBrokerPortNumberFromConnection(initialConnection); - initialConnection.close(); - - killBroker(initialPortNumber); // kill awaits the death of the child - } - - private void assertProducingConsuming(final Connection connection) throws JMSException, Exception - { - Session session = connection.createSession(true, Session.SESSION_TRANSACTED); - Destination destination = session.createQueue(getTestQueueName()); - MessageConsumer consumer = session.createConsumer(destination); - sendMessage(session, destination, 2); - connection.start(); - Message m1 = consumer.receive(RECEIVE_TIMEOUT); - assertNotNull("Message 1 is not received", m1); - assertEquals("Unexpected first message received", 0, m1.getIntProperty(INDEX)); - Message m2 = consumer.receive(RECEIVE_TIMEOUT); - assertNotNull("Message 2 is not received", m2); - assertEquals("Unexpected second message received", 1, m2.getIntProperty(INDEX)); - session.commit(); - } - - private void closeConnection(final Connection initialConnection) - { - try - { - initialConnection.close(); - } - catch(Exception e) - { - // ignore. - // java.net.SocketException is seen sometimes on active connection - } - } -} diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java deleted file mode 100644 index 353c3a0ec5..0000000000 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java +++ /dev/null @@ -1,429 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.store.berkeleydb; - -import java.net.InetAddress; -import java.net.UnknownHostException; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Set; -import java.util.TreeMap; -import java.util.concurrent.Callable; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; - -import javax.jms.Connection; - -import org.apache.commons.configuration.XMLConfiguration; -import org.apache.commons.lang.StringUtils; -import org.apache.log4j.Logger; -import org.apache.qpid.client.AMQConnection; -import org.apache.qpid.client.AMQConnectionURL; -import org.apache.qpid.test.utils.TestBrokerConfiguration; -import org.apache.qpid.test.utils.QpidBrokerTestCase; -import org.apache.qpid.url.URLSyntaxException; - -public class HATestClusterCreator -{ - protected static final Logger LOGGER = Logger.getLogger(HATestClusterCreator.class); - - private static final String MANY_BROKER_URL_FORMAT = "amqp://guest:guest@/%s?brokerlist='%s'&failover='roundrobin?cyclecount='%d''"; - private static final String BROKER_PORTION_FORMAT = "tcp://localhost:%d?connectdelay='%d',retries='%d'"; - - private static final int FAILOVER_CYCLECOUNT = 10; - private static final int FAILOVER_RETRIES = 1; - private static final int FAILOVER_CONNECTDELAY = 1000; - - private static final String SINGLE_BROKER_URL_WITH_RETRY_FORMAT = "amqp://guest:guest@/%s?brokerlist='tcp://localhost:%d?connectdelay='%d',retries='%d''"; - private static final String SINGLE_BROKER_URL_WITHOUT_RETRY_FORMAT = "amqp://guest:guest@/%s?brokerlist='tcp://localhost:%d'"; - - private static final int RETRIES = 60; - private static final int CONNECTDELAY = 75; - - private final QpidBrokerTestCase _testcase; - private final Map _brokerPortToBdbPortMap = new HashMap(); - private final Map _brokerConfigurations = new TreeMap(); - private final String _virtualHostName; - private final String _vhostStoreConfigKeyPrefix; - - private final String _ipAddressOfBroker; - private final String _groupName ; - private final int _numberOfNodes; - private int _bdbHelperPort; - private int _primaryBrokerPort; - private String _vhostConfigKeyPrefix; - - public HATestClusterCreator(QpidBrokerTestCase testcase, String virtualHostName, int numberOfNodes) - { - _testcase = testcase; - _virtualHostName = virtualHostName; - _groupName = "group" + _testcase.getName(); - _ipAddressOfBroker = getIpAddressOfBrokerHost(); - _numberOfNodes = numberOfNodes; - _vhostConfigKeyPrefix = "virtualhosts.virtualhost." + _virtualHostName + "."; - _vhostStoreConfigKeyPrefix = _vhostConfigKeyPrefix + "store."; - _bdbHelperPort = 0; - } - - public void configureClusterNodes() throws Exception - { - int brokerPort = _testcase.findFreePort(); - - for (int i = 0; i < _numberOfNodes; i++) - { - int bdbPort = _testcase.getNextAvailable(brokerPort + 1); - _brokerPortToBdbPortMap.put(brokerPort, bdbPort); - - LOGGER.debug("Cluster broker port " + brokerPort + ", bdb replication port " + bdbPort); - if (_bdbHelperPort == 0) - { - _bdbHelperPort = bdbPort; - } - - configureClusterNode(brokerPort, bdbPort); - TestBrokerConfiguration brokerConfiguration = _testcase.getBrokerConfiguration(brokerPort); - brokerConfiguration.addJmxManagementConfiguration(); - collectConfig(brokerPort, brokerConfiguration, _testcase.getTestVirtualhosts()); - - brokerPort = _testcase.getNextAvailable(bdbPort + 1); - } - } - - public void setDesignatedPrimaryOnFirstBroker(boolean designatedPrimary) throws Exception - { - if (_numberOfNodes != 2) - { - throw new IllegalArgumentException("Only two nodes groups have the concept of primary"); - } - - final Entry brokerConfigEntry = _brokerConfigurations.entrySet().iterator().next(); - final String configKey = getConfigKey("highAvailability.designatedPrimary"); - brokerConfigEntry.getValue().getTestVirtualhosts().setProperty(configKey, Boolean.toString(designatedPrimary)); - _primaryBrokerPort = brokerConfigEntry.getKey(); - } - - /** - * @param configKeySuffix "highAvailability.designatedPrimary", for example - * @return "virtualhost.test.store.highAvailability.designatedPrimary", for example - */ - private String getConfigKey(String configKeySuffix) - { - final String configKey = StringUtils.substringAfter(_vhostStoreConfigKeyPrefix + configKeySuffix, "virtualhosts."); - return configKey; - } - - public void startNode(final int brokerPortNumber) throws Exception - { - final BrokerConfigHolder brokerConfigHolder = _brokerConfigurations.get(brokerPortNumber); - - _testcase.setTestVirtualhosts(brokerConfigHolder.getTestVirtualhosts()); - - _testcase.startBroker(brokerPortNumber); - } - - public void startCluster() throws Exception - { - for (final Integer brokerPortNumber : _brokerConfigurations.keySet()) - { - startNode(brokerPortNumber); - } - } - - public void startClusterParallel() throws Exception - { - final ExecutorService executor = Executors.newFixedThreadPool(_brokerConfigurations.size()); - try - { - List> brokers = new CopyOnWriteArrayList>(); - for (final Integer brokerPortNumber : _brokerConfigurations.keySet()) - { - final BrokerConfigHolder brokerConfigHolder = _brokerConfigurations.get(brokerPortNumber); - Future future = executor.submit(new Callable() - { - public Object call() - { - try - { - _testcase.startBroker(brokerPortNumber, brokerConfigHolder.getTestConfiguration(), - brokerConfigHolder.getTestVirtualhosts()); - return "OK"; - } - catch (Exception e) - { - return e; - } - } - }); - brokers.add(future); - } - for (Future future : brokers) - { - Object result = future.get(30, TimeUnit.SECONDS); - LOGGER.debug("Node startup result:" + result); - if (result instanceof Exception) - { - throw (Exception) result; - } - else if (!"OK".equals(result)) - { - throw new Exception("One of the cluster nodes is not started"); - } - } - } - catch (Exception e) - { - stopCluster(); - throw e; - } - finally - { - executor.shutdown(); - } - - } - - public void stopNode(final int brokerPortNumber) - { - _testcase.killBroker(brokerPortNumber); - } - - public void stopCluster() throws Exception - { - for (final Integer brokerPortNumber : _brokerConfigurations.keySet()) - { - try - { - stopNode(brokerPortNumber); - } - catch(Exception e) - { - LOGGER.warn("Failed to stop node on port:" + brokerPortNumber); - } - } - } - - public int getBrokerPortNumberFromConnection(Connection connection) - { - final AMQConnection amqConnection = (AMQConnection)connection; - return amqConnection.getActiveBrokerDetails().getPort(); - } - - public int getPortNumberOfAnInactiveBroker(final Connection activeConnection) - { - final Set allBrokerPorts = _testcase.getBrokerPortNumbers(); - LOGGER.debug("Broker ports:" + allBrokerPorts); - final int activeBrokerPort = getBrokerPortNumberFromConnection(activeConnection); - allBrokerPorts.remove(activeBrokerPort); - LOGGER.debug("Broker ports:" + allBrokerPorts); - final int inactiveBrokerPort = allBrokerPorts.iterator().next(); - return inactiveBrokerPort; - } - - public int getBdbPortForBrokerPort(final int brokerPortNumber) - { - return _brokerPortToBdbPortMap.get(brokerPortNumber); - } - - public Set getBdbPortNumbers() - { - return new HashSet(_brokerPortToBdbPortMap.values()); - } - - public AMQConnectionURL getConnectionUrlForAllClusterNodes() throws Exception - { - final StringBuilder brokerList = new StringBuilder(); - - for(Iterator itr = _brokerPortToBdbPortMap.keySet().iterator(); itr.hasNext(); ) - { - int brokerPortNumber = itr.next(); - - brokerList.append(String.format(BROKER_PORTION_FORMAT, brokerPortNumber, FAILOVER_CONNECTDELAY, FAILOVER_RETRIES)); - if (itr.hasNext()) - { - brokerList.append(";"); - } - } - - return new AMQConnectionURL(String.format(MANY_BROKER_URL_FORMAT, _virtualHostName, brokerList, FAILOVER_CYCLECOUNT)); - } - - public AMQConnectionURL getConnectionUrlForSingleNodeWithoutRetry(final int brokerPortNumber) throws URLSyntaxException - { - return getConnectionUrlForSingleNode(brokerPortNumber, false); - } - - public AMQConnectionURL getConnectionUrlForSingleNodeWithRetry(final int brokerPortNumber) throws URLSyntaxException - { - return getConnectionUrlForSingleNode(brokerPortNumber, true); - } - - private AMQConnectionURL getConnectionUrlForSingleNode(final int brokerPortNumber, boolean retryAllowed) throws URLSyntaxException - { - final String url; - if (retryAllowed) - { - url = String.format(SINGLE_BROKER_URL_WITH_RETRY_FORMAT, _virtualHostName, brokerPortNumber, CONNECTDELAY, RETRIES); - } - else - { - url = String.format(SINGLE_BROKER_URL_WITHOUT_RETRY_FORMAT, _virtualHostName, brokerPortNumber); - } - - return new AMQConnectionURL(url); - } - - public String getGroupName() - { - return _groupName; - } - - public String getNodeNameForNodeAt(final int bdbPort) - { - return "node" + _testcase.getName() + bdbPort; - } - - public String getNodeHostPortForNodeAt(final int bdbPort) - { - return _ipAddressOfBroker + ":" + bdbPort; - } - - public String getHelperHostPort() - { - if (_bdbHelperPort == 0) - { - throw new IllegalStateException("Helper port not yet assigned."); - } - - return _ipAddressOfBroker + ":" + _bdbHelperPort; - } - - public void setHelperHostPort(int bdbHelperPort) - { - _bdbHelperPort = bdbHelperPort; - } - - public int getBrokerPortNumberOfPrimary() - { - if (_numberOfNodes != 2) - { - throw new IllegalArgumentException("Only two nodes groups have the concept of primary"); - } - - return _primaryBrokerPort; - } - - public int getBrokerPortNumberOfSecondaryNode() - { - final Set portNumbers = getBrokerPortNumbersForNodes(); - portNumbers.remove(getBrokerPortNumberOfPrimary()); - return portNumbers.iterator().next(); - } - - public Set getBrokerPortNumbersForNodes() - { - return new HashSet(_brokerConfigurations.keySet()); - } - - private void configureClusterNode(final int brokerPort, final int bdbPort) throws Exception - { - final String nodeName = getNodeNameForNodeAt(bdbPort); - - - _testcase.setVirtualHostConfigurationProperty(_vhostConfigKeyPrefix + "type", BDBHAVirtualHostFactory.TYPE); - _testcase.setVirtualHostConfigurationProperty(_vhostStoreConfigKeyPrefix + "class", "org.apache.qpid.server.store.berkeleydb.BDBHAMessageStore"); - - _testcase.setVirtualHostConfigurationProperty(_vhostStoreConfigKeyPrefix + "highAvailability.groupName", _groupName); - _testcase.setVirtualHostConfigurationProperty(_vhostStoreConfigKeyPrefix + "highAvailability.nodeName", nodeName); - _testcase.setVirtualHostConfigurationProperty(_vhostStoreConfigKeyPrefix + "highAvailability.nodeHostPort", getNodeHostPortForNodeAt(bdbPort)); - _testcase.setVirtualHostConfigurationProperty(_vhostStoreConfigKeyPrefix + "highAvailability.helperHostPort", getHelperHostPort()); - } - - public String getIpAddressOfBrokerHost() - { - String brokerHost = _testcase.getBroker().getHost(); - try - { - return InetAddress.getByName(brokerHost).getHostAddress(); - } - catch (UnknownHostException e) - { - throw new RuntimeException("Could not determine IP address of host : " + brokerHost, e); - } - } - - private void collectConfig(final int brokerPortNumber, TestBrokerConfiguration testConfiguration, XMLConfiguration testVirtualhosts) - { - _brokerConfigurations.put(brokerPortNumber, new BrokerConfigHolder(testConfiguration, - (XMLConfiguration) testVirtualhosts.clone())); - } - - public class BrokerConfigHolder - { - private final TestBrokerConfiguration _testConfiguration; - private final XMLConfiguration _testVirtualhosts; - - public BrokerConfigHolder(TestBrokerConfiguration testConfiguration, XMLConfiguration testVirtualhosts) - { - _testConfiguration = testConfiguration; - _testVirtualhosts = testVirtualhosts; - } - - public TestBrokerConfiguration getTestConfiguration() - { - return _testConfiguration; - } - - public XMLConfiguration getTestVirtualhosts() - { - return _testVirtualhosts; - } - } - - public void modifyClusterNodeBdbAddress(int brokerPortNumberToBeMoved, int newBdbPort) - { - final BrokerConfigHolder brokerConfigHolder = _brokerConfigurations.get(brokerPortNumberToBeMoved); - final XMLConfiguration virtualHostConfig = brokerConfigHolder.getTestVirtualhosts(); - - final String configKey = getConfigKey("highAvailability.nodeHostPort"); - final String oldBdbHostPort = virtualHostConfig.getString(configKey); - - final String[] oldHostAndPort = StringUtils.split(oldBdbHostPort, ":"); - final String oldHost = oldHostAndPort[0]; - - final String newBdbHostPort = oldHost + ":" + newBdbPort; - - virtualHostConfig.setProperty(configKey, newBdbHostPort); - collectConfig(brokerPortNumberToBeMoved, brokerConfigHolder.getTestConfiguration(), virtualHostConfig); - } - - public String getStoreConfigKeyPrefix() - { - return _vhostStoreConfigKeyPrefix; - } - - -} diff --git a/qpid/java/bdbstore/systests/build.xml b/qpid/java/bdbstore/systests/build.xml new file mode 100644 index 0000000000..ca809ad0cb --- /dev/null +++ b/qpid/java/bdbstore/systests/build.xml @@ -0,0 +1,28 @@ + + + + + + + + + diff --git a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBBackupTest.java b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBBackupTest.java new file mode 100644 index 0000000000..7c04d83e79 --- /dev/null +++ b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBBackupTest.java @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.server.store.berkeleydb; + +import java.io.File; +import java.io.IOException; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.Session; + +import org.apache.log4j.Logger; +import org.apache.qpid.test.utils.Piper; +import org.apache.qpid.test.utils.QpidBrokerTestCase; +import org.apache.qpid.util.FileUtils; + +/** + * Tests the BDB backup script can successfully perform a backup and that + * backup can be restored and used by the Broker. + */ +public class BDBBackupTest extends QpidBrokerTestCase +{ + protected static final Logger LOGGER = Logger.getLogger(BDBBackupTest.class); + + private static final String BACKUP_SCRIPT = "/bin/backup.sh"; + private static final String BACKUP_COMPLETE_MESSAGE = "Hot Backup Completed"; + + private static final String TEST_VHOST = "test"; + private static final String SYSTEM_TMP_DIR = System.getProperty("java.io.tmpdir"); + + private File _backupToDir; + private File _backupFromDir; + + @Override + protected void setUp() throws Exception + { + super.setUp(); + _backupToDir = new File(SYSTEM_TMP_DIR + File.separator + getTestName()); + _backupToDir.mkdirs(); + + final String qpidWork = getBroker(DEFAULT_PORT).getWorkingDirectory(); + + // It would be preferable to lookup the store path using #getConfigurationStringProperty("virtualhosts...") + // but the config as known to QBTC does not pull-in the virtualhost section from its separate source file + _backupFromDir = new File(qpidWork + File.separator + TEST_VHOST + "-store"); + boolean fromDirExistsAndIsDir = _backupFromDir.isDirectory(); + assertTrue("backupFromDir " + _backupFromDir + " should already exist", fromDirExistsAndIsDir); + } + + @Override + protected void tearDown() throws Exception + { + try + { + super.tearDown(); + } + finally + { + FileUtils.delete(_backupToDir, true); + } + } + + public void testBackupAndRestoreMaintainsMessages() throws Exception + { + sendNumberedMessages(0, 10); + invokeBdbBackup(_backupFromDir, _backupToDir); + sendNumberedMessages(10, 20); + confirmBrokerHasMessages(0, 20); + stopBroker(); + + deleteStore(_backupFromDir); + replaceStoreWithBackup(_backupToDir, _backupFromDir); + + startBroker(); + confirmBrokerHasMessages(0, 10); + } + + private void sendNumberedMessages(final int startIndex, final int endIndex) throws JMSException, Exception + { + Connection con = getConnection(); + Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); + Destination destination = session.createQueue(getTestQueueName()); + // Create queue by consumer side-effect + session.createConsumer(destination).close(); + + final int numOfMessages = endIndex - startIndex; + final int batchSize = 0; + sendMessage(session, destination, numOfMessages, startIndex, batchSize); + con.close(); + } + + private void confirmBrokerHasMessages(final int startIndex, final int endIndex) throws Exception + { + Connection con = getConnection(); + Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); + con.start(); + Destination destination = session.createQueue(getTestQueueName()); + MessageConsumer consumer = session.createConsumer(destination); + for (int i = startIndex; i < endIndex; i++) + { + Message msg = consumer.receive(RECEIVE_TIMEOUT); + assertNotNull("Message " + i + " not received", msg); + assertEquals("Did not receive the expected message", i, msg.getIntProperty(INDEX)); + } + + Message msg = consumer.receive(100); + if(msg != null) + { + fail("No more messages should be received, but received additional message with index: " + msg.getIntProperty(INDEX)); + } + con.close(); + } + + private void invokeBdbBackup(final File backupFromDir, final File backupToDir) throws Exception + { + if (String.valueOf(System.getProperty("os.name")).toLowerCase().contains("windows")) + { + BDBBackup.main(new String[]{"-todir", backupToDir.getAbsolutePath(), "-fromdir", backupFromDir.getAbsolutePath()}); + } + else + { + runBdbBackupScript(backupFromDir, backupToDir); + } + } + + private void runBdbBackupScript(final File backupFromDir, final File backupToDir) throws IOException, + InterruptedException + { + Process backupProcess = null; + try + { + String qpidHome = System.getProperty(QPID_HOME); + ProcessBuilder pb = new ProcessBuilder(qpidHome + BACKUP_SCRIPT, "-todir", backupToDir.getAbsolutePath(), "-fromdir", backupFromDir.getAbsolutePath()); + pb.redirectErrorStream(true); + Map env = pb.environment(); + env.put(QPID_HOME, qpidHome); + + LOGGER.debug("Backup command is " + pb.command()); + backupProcess = pb.start(); + Piper piper = new Piper(backupProcess.getInputStream(), _testcaseOutputStream, null, BACKUP_COMPLETE_MESSAGE); + piper.start(); + piper.await(2, TimeUnit.SECONDS); + backupProcess.waitFor(); + piper.join(); + + LOGGER.debug("Backup command completed " + backupProcess.exitValue()); + assertEquals("Unexpected exit value from backup script", 0, backupProcess.exitValue()); + } + finally + { + if (backupProcess != null) + { + backupProcess.getErrorStream().close(); + backupProcess.getInputStream().close(); + backupProcess.getOutputStream().close(); + } + } + } + + private void replaceStoreWithBackup(File source, File dst) throws Exception + { + LOGGER.debug("Copying store " + source + " to " + dst); + FileUtils.copyRecursive(source, dst); + } + + private void deleteStore(File storeDir) + { + LOGGER.debug("Deleting store " + storeDir); + FileUtils.delete(storeDir, true); + } + +} diff --git a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java new file mode 100644 index 0000000000..76b990038d --- /dev/null +++ b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java @@ -0,0 +1,612 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.berkeleydb; + +import java.io.File; +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.List; +import java.util.UUID; +import org.apache.qpid.AMQStoreException; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.BasicContentHeaderProperties; +import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpid.framing.MethodRegistry; +import org.apache.qpid.framing.ProtocolVersion; +import org.apache.qpid.framing.abstraction.MessagePublishInfo; +import org.apache.qpid.server.message.AMQMessageHeader; +import org.apache.qpid.server.message.EnqueableMessage; +import org.apache.qpid.server.protocol.v0_10.MessageMetaDataType_0_10; +import org.apache.qpid.server.protocol.v0_8.MessageMetaData; +import org.apache.qpid.server.protocol.v0_10.MessageMetaData_0_10; +import org.apache.qpid.server.message.MessageReference; +import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.model.UUIDGenerator; +import org.apache.qpid.server.protocol.v0_8.MessageMetaDataType_0_8; +import org.apache.qpid.server.store.MessageStoreTest; +import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.StorableMessageMetaData; +import org.apache.qpid.server.store.StoredMessage; +import org.apache.qpid.server.store.Transaction; +import org.apache.qpid.server.store.TransactionLogResource; +import org.apache.qpid.transport.DeliveryProperties; +import org.apache.qpid.transport.Header; +import org.apache.qpid.transport.MessageAcceptMode; +import org.apache.qpid.transport.MessageAcquireMode; +import org.apache.qpid.transport.MessageDeliveryMode; +import org.apache.qpid.transport.MessageDeliveryPriority; +import org.apache.qpid.transport.MessageProperties; +import org.apache.qpid.transport.MessageTransfer; + +/** + * Subclass of MessageStoreTest which runs the standard tests from the superclass against + * the BDB Store as well as additional tests specific to the BDB store-implementation. + */ +public class BDBMessageStoreTest extends MessageStoreTest +{ + private static byte[] CONTENT_BYTES = new byte[] {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}; + + /** + * Tests that message metadata and content are successfully read back from a + * store after it has been reloaded. Both 0-8 and 0-10 metadata is used to + * verify their ability to co-exist within the store and be successful retrieved. + */ + public void testBDBMessagePersistence() throws Exception + { + MessageStore store = getVirtualHost().getMessageStore(); + + AbstractBDBMessageStore bdbStore = assertBDBStore(store); + + // Create content ByteBuffers. + // Split the content into 2 chunks for the 0-8 message, as per broker behaviour. + // Use a single chunk for the 0-10 message as per broker behaviour. + String bodyText = "jfhdjsflsdhfjdshfjdslhfjdslhfsjlhfsjkhfdsjkhfdsjkfhdslkjf"; + + ByteBuffer firstContentBytes_0_8 = ByteBuffer.wrap(bodyText.substring(0, 10).getBytes()); + ByteBuffer secondContentBytes_0_8 = ByteBuffer.wrap(bodyText.substring(10).getBytes()); + + ByteBuffer completeContentBody_0_10 = ByteBuffer.wrap(bodyText.getBytes()); + int bodySize = completeContentBody_0_10.limit(); + + /* + * Create and insert a 0-8 message (metadata and multi-chunk content) + */ + MessagePublishInfo pubInfoBody_0_8 = createPublishInfoBody_0_8(); + BasicContentHeaderProperties props_0_8 = createContentHeaderProperties_0_8(); + + ContentHeaderBody chb_0_8 = createContentHeaderBody_0_8(props_0_8, bodySize); + + MessageMetaData messageMetaData_0_8 = new MessageMetaData(pubInfoBody_0_8, chb_0_8, 0); + StoredMessage storedMessage_0_8 = bdbStore.addMessage(messageMetaData_0_8); + + long origArrivalTime_0_8 = messageMetaData_0_8.getArrivalTime(); + long messageid_0_8 = storedMessage_0_8.getMessageNumber(); + + storedMessage_0_8.addContent(0, firstContentBytes_0_8); + storedMessage_0_8.addContent(firstContentBytes_0_8.limit(), secondContentBytes_0_8); + storedMessage_0_8.flushToStore(); + + /* + * Create and insert a 0-10 message (metadata and content) + */ + MessageProperties msgProps_0_10 = createMessageProperties_0_10(bodySize); + DeliveryProperties delProps_0_10 = createDeliveryProperties_0_10(); + Header header_0_10 = new Header(delProps_0_10, msgProps_0_10); + + MessageTransfer xfr_0_10 = new MessageTransfer("destination", MessageAcceptMode.EXPLICIT, + MessageAcquireMode.PRE_ACQUIRED, header_0_10, completeContentBody_0_10); + + MessageMetaData_0_10 messageMetaData_0_10 = new MessageMetaData_0_10(xfr_0_10); + StoredMessage storedMessage_0_10 = bdbStore.addMessage(messageMetaData_0_10); + + long origArrivalTime_0_10 = messageMetaData_0_10.getArrivalTime(); + long messageid_0_10 = storedMessage_0_10.getMessageNumber(); + + storedMessage_0_10.addContent(0, completeContentBody_0_10); + storedMessage_0_10.flushToStore(); + + /* + * reload the store only (read-only) + */ + AbstractBDBMessageStore readOnlyStore = reloadStore(bdbStore); + + /* + * Read back and validate the 0-8 message metadata and content + */ + StorableMessageMetaData storeableMMD_0_8 = readOnlyStore.getMessageMetaData(messageid_0_8); + + assertEquals("Unexpected message type", MessageMetaDataType_0_8.TYPE, storeableMMD_0_8.getType().ordinal()); + assertTrue("Unexpected instance type", storeableMMD_0_8 instanceof MessageMetaData); + MessageMetaData returnedMMD_0_8 = (MessageMetaData) storeableMMD_0_8; + + assertEquals("Message arrival time has changed", origArrivalTime_0_8, returnedMMD_0_8.getArrivalTime()); + + MessagePublishInfo returnedPubBody_0_8 = returnedMMD_0_8.getMessagePublishInfo(); + assertEquals("Message exchange has changed", pubInfoBody_0_8.getExchange(), returnedPubBody_0_8.getExchange()); + assertEquals("Immediate flag has changed", pubInfoBody_0_8.isImmediate(), returnedPubBody_0_8.isImmediate()); + assertEquals("Mandatory flag has changed", pubInfoBody_0_8.isMandatory(), returnedPubBody_0_8.isMandatory()); + assertEquals("Routing key has changed", pubInfoBody_0_8.getRoutingKey(), returnedPubBody_0_8.getRoutingKey()); + + ContentHeaderBody returnedHeaderBody_0_8 = returnedMMD_0_8.getContentHeaderBody(); + assertEquals("ContentHeader ClassID has changed", chb_0_8.getClassId(), returnedHeaderBody_0_8.getClassId()); + assertEquals("ContentHeader weight has changed", chb_0_8.getWeight(), returnedHeaderBody_0_8.getWeight()); + assertEquals("ContentHeader bodySize has changed", chb_0_8.getBodySize(), returnedHeaderBody_0_8.getBodySize()); + + BasicContentHeaderProperties returnedProperties_0_8 = (BasicContentHeaderProperties) returnedHeaderBody_0_8.getProperties(); + assertEquals("Property ContentType has changed", props_0_8.getContentTypeAsString(), returnedProperties_0_8.getContentTypeAsString()); + assertEquals("Property MessageID has changed", props_0_8.getMessageIdAsString(), returnedProperties_0_8.getMessageIdAsString()); + + ByteBuffer recoveredContent_0_8 = ByteBuffer.allocate((int) chb_0_8.getBodySize()) ; + long recoveredCount_0_8 = readOnlyStore.getContent(messageid_0_8, 0, recoveredContent_0_8); + assertEquals("Incorrect amount of payload data recovered", chb_0_8.getBodySize(), recoveredCount_0_8); + String returnedPayloadString_0_8 = new String(recoveredContent_0_8.array()); + assertEquals("Message Payload has changed", bodyText, returnedPayloadString_0_8); + + /* + * Read back and validate the 0-10 message metadata and content + */ + StorableMessageMetaData storeableMMD_0_10 = readOnlyStore.getMessageMetaData(messageid_0_10); + + assertEquals("Unexpected message type", MessageMetaDataType_0_10.TYPE, storeableMMD_0_10.getType().ordinal()); + assertTrue("Unexpected instance type", storeableMMD_0_10 instanceof MessageMetaData_0_10); + MessageMetaData_0_10 returnedMMD_0_10 = (MessageMetaData_0_10) storeableMMD_0_10; + + assertEquals("Message arrival time has changed", origArrivalTime_0_10, returnedMMD_0_10.getArrivalTime()); + + DeliveryProperties returnedDelProps_0_10 = returnedMMD_0_10.getHeader().getDeliveryProperties(); + assertNotNull("DeliveryProperties were not returned", returnedDelProps_0_10); + assertEquals("Immediate flag has changed", delProps_0_10.getImmediate(), returnedDelProps_0_10.getImmediate()); + assertEquals("Routing key has changed", delProps_0_10.getRoutingKey(), returnedDelProps_0_10.getRoutingKey()); + assertEquals("Message exchange has changed", delProps_0_10.getExchange(), returnedDelProps_0_10.getExchange()); + assertEquals("Message expiration has changed", delProps_0_10.getExpiration(), returnedDelProps_0_10.getExpiration()); + assertEquals("Message delivery priority has changed", delProps_0_10.getPriority(), returnedDelProps_0_10.getPriority()); + + MessageProperties returnedMsgProps = returnedMMD_0_10.getHeader().getMessageProperties(); + assertNotNull("MessageProperties were not returned", returnedMsgProps); + assertTrue("Message correlationID has changed", Arrays.equals(msgProps_0_10.getCorrelationId(), returnedMsgProps.getCorrelationId())); + assertEquals("Message content length has changed", msgProps_0_10.getContentLength(), returnedMsgProps.getContentLength()); + assertEquals("Message content type has changed", msgProps_0_10.getContentType(), returnedMsgProps.getContentType()); + + ByteBuffer recoveredContent = ByteBuffer.allocate((int) msgProps_0_10.getContentLength()) ; + long recoveredCount = readOnlyStore.getContent(messageid_0_10, 0, recoveredContent); + assertEquals("Incorrect amount of payload data recovered", msgProps_0_10.getContentLength(), recoveredCount); + + String returnedPayloadString_0_10 = new String(recoveredContent.array()); + assertEquals("Message Payload has changed", bodyText, returnedPayloadString_0_10); + + readOnlyStore.close(); + } + + private DeliveryProperties createDeliveryProperties_0_10() + { + DeliveryProperties delProps_0_10 = new DeliveryProperties(); + + delProps_0_10.setDeliveryMode(MessageDeliveryMode.PERSISTENT); + delProps_0_10.setImmediate(true); + delProps_0_10.setExchange("exchange12345"); + delProps_0_10.setRoutingKey("routingKey12345"); + delProps_0_10.setExpiration(5); + delProps_0_10.setPriority(MessageDeliveryPriority.ABOVE_AVERAGE); + + return delProps_0_10; + } + + private MessageProperties createMessageProperties_0_10(int bodySize) + { + MessageProperties msgProps_0_10 = new MessageProperties(); + msgProps_0_10.setContentLength(bodySize); + msgProps_0_10.setCorrelationId("qwerty".getBytes()); + msgProps_0_10.setContentType("text/html"); + + return msgProps_0_10; + } + + /** + * Close the provided store and create a new (read-only) store to read back the data. + * + * Use this method instead of reloading the virtual host like other tests in order + * to avoid the recovery handler deleting the message for not being on a queue. + */ + private AbstractBDBMessageStore reloadStore(AbstractBDBMessageStore messageStore) throws Exception + { + messageStore.close(); + + AbstractBDBMessageStore newStore = new BDBMessageStore(); + newStore.configure(getVirtualHostModel(),true); + + newStore.startWithNoRecover(); + + return newStore; + } + + private MessagePublishInfo createPublishInfoBody_0_8() + { + return new MessagePublishInfo() + { + public AMQShortString getExchange() + { + return new AMQShortString("exchange12345"); + } + + public void setExchange(AMQShortString exchange) + { + } + + public boolean isImmediate() + { + return false; + } + + public boolean isMandatory() + { + return true; + } + + public AMQShortString getRoutingKey() + { + return new AMQShortString("routingKey12345"); + } + }; + + } + + private ContentHeaderBody createContentHeaderBody_0_8(BasicContentHeaderProperties props, int length) + { + MethodRegistry methodRegistry = MethodRegistry.getMethodRegistry(ProtocolVersion.v0_9); + int classForBasic = methodRegistry.createBasicQosOkBody().getClazz(); + return new ContentHeaderBody(classForBasic, 1, props, length); + } + + private BasicContentHeaderProperties createContentHeaderProperties_0_8() + { + BasicContentHeaderProperties props = new BasicContentHeaderProperties(); + props.setDeliveryMode(Integer.valueOf(BasicContentHeaderProperties.PERSISTENT).byteValue()); + props.setContentType("text/html"); + props.getHeaders().setString("Test", "MST"); + return props; + } + + public void testGetContentWithOffset() throws Exception + { + MessageStore store = getVirtualHost().getMessageStore(); + AbstractBDBMessageStore bdbStore = assertBDBStore(store); + StoredMessage storedMessage_0_8 = createAndStoreSingleChunkMessage_0_8(store); + long messageid_0_8 = storedMessage_0_8.getMessageNumber(); + + // normal case: offset is 0 + ByteBuffer dst = ByteBuffer.allocate(10); + int length = bdbStore.getContent(messageid_0_8, 0, dst); + assertEquals("Unexpected length", CONTENT_BYTES.length, length); + byte[] array = dst.array(); + assertTrue("Unexpected content", Arrays.equals(CONTENT_BYTES, array)); + + // offset is in the middle + dst = ByteBuffer.allocate(10); + length = bdbStore.getContent(messageid_0_8, 5, dst); + assertEquals("Unexpected length", 5, length); + array = dst.array(); + byte[] expected = new byte[10]; + System.arraycopy(CONTENT_BYTES, 5, expected, 0, 5); + assertTrue("Unexpected content", Arrays.equals(expected, array)); + + // offset beyond the content length + dst = ByteBuffer.allocate(10); + try + { + bdbStore.getContent(messageid_0_8, 15, dst); + fail("Should fail for the offset greater than message size"); + } + catch (RuntimeException e) + { + assertEquals("Unexpected exception message", "Offset 15 is greater than message size 10 for message id " + + messageid_0_8 + "!", e.getMessage()); + } + + // buffer is smaller then message size + dst = ByteBuffer.allocate(5); + length = bdbStore.getContent(messageid_0_8, 0, dst); + assertEquals("Unexpected length", 5, length); + array = dst.array(); + expected = new byte[5]; + System.arraycopy(CONTENT_BYTES, 0, expected, 0, 5); + assertTrue("Unexpected content", Arrays.equals(expected, array)); + + // buffer is smaller then message size, offset is not 0 + dst = ByteBuffer.allocate(5); + length = bdbStore.getContent(messageid_0_8, 2, dst); + assertEquals("Unexpected length", 5, length); + array = dst.array(); + expected = new byte[5]; + System.arraycopy(CONTENT_BYTES, 2, expected, 0, 5); + assertTrue("Unexpected content", Arrays.equals(expected, array)); + } + /** + * Tests that messages which are added to the store and then removed using the + * public MessageStore interfaces are actually removed from the store by then + * interrogating the store with its own implementation methods and verifying + * expected exceptions are thrown to indicate the message is not present. + */ + public void testMessageCreationAndRemoval() throws Exception + { + MessageStore store = getVirtualHost().getMessageStore(); + AbstractBDBMessageStore bdbStore = assertBDBStore(store); + + StoredMessage storedMessage_0_8 = createAndStoreSingleChunkMessage_0_8(store); + long messageid_0_8 = storedMessage_0_8.getMessageNumber(); + + bdbStore.removeMessage(messageid_0_8, true); + + //verify the removal using the BDB store implementation methods directly + try + { + // the next line should throw since the message id should not be found + bdbStore.getMessageMetaData(messageid_0_8); + fail("No exception thrown when message id not found getting metadata"); + } + catch (AMQStoreException e) + { + // pass since exception expected + } + + //expecting no content, allocate a 1 byte + ByteBuffer dst = ByteBuffer.allocate(1); + + assertEquals("Retrieved content when none was expected", + 0, bdbStore.getContent(messageid_0_8, 0, dst)); + } + private AbstractBDBMessageStore assertBDBStore(MessageStore store) + { + + assertEquals("Test requires an instance of BDBMessageStore to proceed", BDBMessageStore.class, store.getClass()); + + return (AbstractBDBMessageStore) store; + } + + private StoredMessage createAndStoreSingleChunkMessage_0_8(MessageStore store) + { + ByteBuffer chunk1 = ByteBuffer.wrap(CONTENT_BYTES); + + int bodySize = CONTENT_BYTES.length; + + //create and store the message using the MessageStore interface + MessagePublishInfo pubInfoBody_0_8 = createPublishInfoBody_0_8(); + BasicContentHeaderProperties props_0_8 = createContentHeaderProperties_0_8(); + + ContentHeaderBody chb_0_8 = createContentHeaderBody_0_8(props_0_8, bodySize); + + MessageMetaData messageMetaData_0_8 = new MessageMetaData(pubInfoBody_0_8, chb_0_8, 0); + StoredMessage storedMessage_0_8 = store.addMessage(messageMetaData_0_8); + + storedMessage_0_8.addContent(0, chunk1); + storedMessage_0_8.flushToStore(); + + return storedMessage_0_8; + } + + /** + * Tests transaction commit by utilising the enqueue and dequeue methods available + * in the TransactionLog interface implemented by the store, and verifying the + * behaviour using BDB implementation methods. + */ + public void testTranCommit() throws Exception + { + MessageStore log = getVirtualHost().getMessageStore(); + + AbstractBDBMessageStore bdbStore = assertBDBStore(log); + + final UUID mockQueueId = UUIDGenerator.generateRandomUUID(); + TransactionLogResource mockQueue = new TransactionLogResource() + { + @Override + public UUID getId() + { + return mockQueueId; + } + }; + + Transaction txn = log.newTransaction(); + + txn.enqueueMessage(mockQueue, new MockMessage(1L)); + txn.enqueueMessage(mockQueue, new MockMessage(5L)); + txn.commitTran(); + + List enqueuedIds = bdbStore.getEnqueuedMessages(mockQueueId); + + assertEquals("Number of enqueued messages is incorrect", 2, enqueuedIds.size()); + Long val = enqueuedIds.get(0); + assertEquals("First Message is incorrect", 1L, val.longValue()); + val = enqueuedIds.get(1); + assertEquals("Second Message is incorrect", 5L, val.longValue()); + } + + + /** + * Tests transaction rollback before a commit has occurred by utilising the + * enqueue and dequeue methods available in the TransactionLog interface + * implemented by the store, and verifying the behaviour using BDB + * implementation methods. + */ + public void testTranRollbackBeforeCommit() throws Exception + { + MessageStore log = getVirtualHost().getMessageStore(); + + AbstractBDBMessageStore bdbStore = assertBDBStore(log); + + final UUID mockQueueId = UUIDGenerator.generateRandomUUID(); + TransactionLogResource mockQueue = new TransactionLogResource() + { + @Override + public UUID getId() + { + return mockQueueId; + } + }; + + Transaction txn = log.newTransaction(); + + txn.enqueueMessage(mockQueue, new MockMessage(21L)); + txn.abortTran(); + + txn = log.newTransaction(); + txn.enqueueMessage(mockQueue, new MockMessage(22L)); + txn.enqueueMessage(mockQueue, new MockMessage(23L)); + txn.commitTran(); + + List enqueuedIds = bdbStore.getEnqueuedMessages(mockQueueId); + + assertEquals("Number of enqueued messages is incorrect", 2, enqueuedIds.size()); + Long val = enqueuedIds.get(0); + assertEquals("First Message is incorrect", 22L, val.longValue()); + val = enqueuedIds.get(1); + assertEquals("Second Message is incorrect", 23L, val.longValue()); + } + + public void testOnDelete() throws Exception + { + MessageStore log = getVirtualHost().getMessageStore(); + AbstractBDBMessageStore bdbStore = assertBDBStore(log); + String storeLocation = bdbStore.getStoreLocation(); + + File location = new File(storeLocation); + assertTrue("Store does not exist at " + storeLocation, location.exists()); + + bdbStore.close(); + assertTrue("Store does not exist at " + storeLocation, location.exists()); + + bdbStore.onDelete(); + assertFalse("Store exists at " + storeLocation, location.exists()); + } + + /** + * Tests transaction rollback after a commit has occurred by utilising the + * enqueue and dequeue methods available in the TransactionLog interface + * implemented by the store, and verifying the behaviour using BDB + * implementation methods. + */ + public void testTranRollbackAfterCommit() throws Exception + { + MessageStore log = getVirtualHost().getMessageStore(); + + AbstractBDBMessageStore bdbStore = assertBDBStore(log); + + final UUID mockQueueId = UUIDGenerator.generateRandomUUID(); + TransactionLogResource mockQueue = new TransactionLogResource() + { + @Override + public UUID getId() + { + return mockQueueId; + } + }; + + Transaction txn = log.newTransaction(); + + txn.enqueueMessage(mockQueue, new MockMessage(30L)); + txn.commitTran(); + + txn = log.newTransaction(); + txn.enqueueMessage(mockQueue, new MockMessage(31L)); + txn.abortTran(); + + txn = log.newTransaction(); + txn.enqueueMessage(mockQueue, new MockMessage(32L)); + txn.commitTran(); + + List enqueuedIds = bdbStore.getEnqueuedMessages(mockQueueId); + + assertEquals("Number of enqueued messages is incorrect", 2, enqueuedIds.size()); + Long val = enqueuedIds.get(0); + assertEquals("First Message is incorrect", 30L, val.longValue()); + val = enqueuedIds.get(1); + assertEquals("Second Message is incorrect", 32L, val.longValue()); + } + + @SuppressWarnings("rawtypes") + private static class MockMessage implements ServerMessage, EnqueableMessage + { + private long _messageId; + + public MockMessage(long messageId) + { + _messageId = messageId; + } + + public String getRoutingKey() + { + return null; + } + + public AMQMessageHeader getMessageHeader() + { + return null; + } + + public StoredMessage getStoredMessage() + { + return null; + } + + public boolean isPersistent() + { + return true; + } + + public long getSize() + { + return 0; + } + + public boolean isImmediate() + { + return false; + } + + public long getExpiration() + { + return 0; + } + + public MessageReference newReference() + { + return null; + } + + public long getMessageNumber() + { + return _messageId; + } + + public long getArrivalTime() + { + return 0; + } + + public int getContent(ByteBuffer buf, int offset) + { + return 0; + } + + public ByteBuffer getContent(int offset, int length) + { + return null; + } + } +} diff --git a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java new file mode 100644 index 0000000000..755168ca9c --- /dev/null +++ b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java @@ -0,0 +1,544 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.berkeleydb; + +import java.io.File; +import java.io.InputStream; + +import javax.jms.Connection; +import javax.jms.DeliveryMode; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.TextMessage; +import javax.jms.Topic; +import javax.jms.TopicConnection; +import javax.jms.TopicPublisher; +import javax.jms.TopicSession; +import javax.jms.TopicSubscriber; +import javax.management.openmbean.CompositeData; +import javax.management.openmbean.TabularDataSupport; + +import org.apache.qpid.management.common.mbeans.ManagedExchange; +import org.apache.qpid.management.common.mbeans.ManagedQueue; +import org.apache.qpid.test.utils.JMXTestUtils; +import org.apache.qpid.test.utils.QpidBrokerTestCase; +import org.apache.qpid.util.FileUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Tests upgrading a BDB store on broker startup. + * The store will then be used to verify that the upgrade is completed + * properly and that once upgraded it functions as expected. + * + * Store prepared using old client/broker with BDBStoreUpgradeTestPreparer. + */ +public class BDBUpgradeTest extends QpidBrokerTestCase +{ + protected static final Logger _logger = LoggerFactory.getLogger(BDBUpgradeTest.class); + + private static final String QPID_WORK_ORIG = System.getProperty("QPID_WORK"); + + private static final String STRING_1024 = generateString(1024); + private static final String STRING_1024_256 = generateString(1024*256); + + private static final String TOPIC_NAME="myUpgradeTopic"; + private static final String SUB_NAME="myDurSubName"; + private static final String SELECTOR_SUB_NAME="mySelectorDurSubName"; + private static final String SELECTOR_TOPIC_NAME="mySelectorUpgradeTopic"; + private static final String QUEUE_NAME="myUpgradeQueue"; + private static final String NON_DURABLE_QUEUE_NAME="queue-non-durable"; + private static final String PRIORITY_QUEUE_NAME="myPriorityQueue"; + private static final String QUEUE_WITH_DLQ_NAME="myQueueWithDLQ"; + + private String _storeLocation; + + @Override + public void setUp() throws Exception + { + assertNotNull("QPID_WORK must be set", QPID_WORK_ORIG); + _storeLocation = getWorkDirBaseDir() + File.separator + "test-store"; + + //Clear the two target directories if they exist. + File directory = new File(_storeLocation); + if (directory.exists() && directory.isDirectory()) + { + FileUtils.delete(directory, true); + } + directory.mkdirs(); + + // copy store files + InputStream src = getClass().getClassLoader().getResourceAsStream("upgrade/bdbstore-v4/test-store/00000000.jdb"); + FileUtils.copy(src, new File(_storeLocation, "00000000.jdb")); + + getBrokerConfiguration().addJmxManagementConfiguration(); + super.setUp(); + } + + private String getWorkDirBaseDir() + { + return QPID_WORK_ORIG + (isInternalBroker() ? "" : "/" + getPort()); + } + + /** + * Test that the selector applied to the DurableSubscription was successfully + * transfered to the new store, and functions as expected with continued use + * by monitoring message count while sending new messages to the topic and then + * consuming them. + */ + public void testSelectorDurability() throws Exception + { + JMXTestUtils jmxUtils = null; + try + { + jmxUtils = new JMXTestUtils(this, "guest", "guest"); + jmxUtils.open(); + } + catch (Exception e) + { + fail("Unable to establish JMX connection, test cannot proceed"); + } + + try + { + ManagedQueue dursubQueue = jmxUtils.getManagedQueue("clientid" + ":" + SELECTOR_SUB_NAME); + assertEquals("DurableSubscription backing queue should have 1 message on it initially", + new Integer(1), dursubQueue.getMessageCount()); + + // Create a connection and start it + TopicConnection connection = (TopicConnection) getConnection(); + connection.start(); + + // Send messages which don't match and do match the selector, checking message count + TopicSession pubSession = connection.createTopicSession(true, Session.SESSION_TRANSACTED); + Topic topic = pubSession.createTopic(SELECTOR_TOPIC_NAME); + TopicPublisher publisher = pubSession.createPublisher(topic); + + publishMessages(pubSession, publisher, topic, DeliveryMode.PERSISTENT, 1*1024, 1, "false"); + pubSession.commit(); + assertEquals("DurableSubscription backing queue should still have 1 message on it", + Integer.valueOf(1), dursubQueue.getMessageCount()); + + publishMessages(pubSession, publisher, topic, DeliveryMode.PERSISTENT, 1*1024, 1, "true"); + pubSession.commit(); + assertEquals("DurableSubscription backing queue should now have 2 messages on it", + Integer.valueOf(2), dursubQueue.getMessageCount()); + + TopicSubscriber durSub = pubSession.createDurableSubscriber(topic, SELECTOR_SUB_NAME,"testprop='true'", false); + Message m = durSub.receive(2000); + assertNotNull("Failed to receive an expected message", m); + m = durSub.receive(2000); + assertNotNull("Failed to receive an expected message", m); + pubSession.commit(); + + pubSession.close(); + } + finally + { + jmxUtils.close(); + } + } + + /** + * Test that the DurableSubscription without selector was successfully + * transfered to the new store, and functions as expected with continued use. + */ + public void testDurableSubscriptionWithoutSelector() throws Exception + { + JMXTestUtils jmxUtils = null; + try + { + jmxUtils = new JMXTestUtils(this, "guest", "guest"); + jmxUtils.open(); + } + catch (Exception e) + { + fail("Unable to establish JMX connection, test cannot proceed"); + } + + try + { + ManagedQueue dursubQueue = jmxUtils.getManagedQueue("clientid" + ":" + SUB_NAME); + assertEquals("DurableSubscription backing queue should have 1 message on it initially", + new Integer(1), dursubQueue.getMessageCount()); + + // Create a connection and start it + TopicConnection connection = (TopicConnection) getConnection(); + connection.start(); + + // Send new message matching the topic, checking message count + TopicSession session = connection.createTopicSession(true, Session.SESSION_TRANSACTED); + Topic topic = session.createTopic(TOPIC_NAME); + TopicPublisher publisher = session.createPublisher(topic); + + publishMessages(session, publisher, topic, DeliveryMode.PERSISTENT, 1*1024, 1, "indifferent"); + session.commit(); + assertEquals("DurableSubscription backing queue should now have 2 messages on it", + Integer.valueOf(2), dursubQueue.getMessageCount()); + + TopicSubscriber durSub = session.createDurableSubscriber(topic, SUB_NAME); + Message m = durSub.receive(2000); + assertNotNull("Failed to receive an expected message", m); + m = durSub.receive(2000); + assertNotNull("Failed to receive an expected message", m); + + session.commit(); + session.close(); + } + finally + { + jmxUtils.close(); + } + } + + /** + * Test that the backing queue for the durable subscription created was successfully + * detected and set as being exclusive during the upgrade process, and that the + * regular queue was not. + */ + public void testQueueExclusivity() throws Exception + { + JMXTestUtils jmxUtils = null; + try + { + jmxUtils = new JMXTestUtils(this, "guest", "guest"); + jmxUtils.open(); + } + catch (Exception e) + { + fail("Unable to establish JMX connection, test cannot proceed"); + } + + try + { + ManagedQueue queue = jmxUtils.getManagedQueue(QUEUE_NAME); + assertFalse("Queue should not have been marked as Exclusive during upgrade", queue.isExclusive()); + + ManagedQueue dursubQueue = jmxUtils.getManagedQueue("clientid" + ":" + SUB_NAME); + assertTrue("DurableSubscription backing queue should have been marked as Exclusive during upgrade", dursubQueue.isExclusive()); + } + finally + { + jmxUtils.close(); + } + } + + /** + * Test that the upgraded queue continues to function properly when used + * for persistent messaging and restarting the broker. + * + * Sends the new messages to the queue BEFORE consuming those which were + * sent before the upgrade. In doing so, this also serves to test that + * the queue bindings were successfully transitioned during the upgrade. + */ + public void testBindingAndMessageDurabability() throws Exception + { + // Create a connection and start it + TopicConnection connection = (TopicConnection) getConnection(); + connection.start(); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue(QUEUE_NAME); + MessageProducer messageProducer = session.createProducer(queue); + + // Send a new message + sendMessages(session, messageProducer, queue, DeliveryMode.PERSISTENT, 256*1024, 1); + + session.close(); + + // Restart the broker + restartBroker(); + + // Drain the queue of all messages + connection = (TopicConnection) getConnection(); + connection.start(); + consumeQueueMessages(connection, true); + } + + /** + * Test that all of the committed persistent messages previously sent to + * the broker are properly received following update of the MetaData and + * Content entries during the store upgrade process. + */ + public void testConsumptionOfUpgradedMessages() throws Exception + { + // Create a connection and start it + Connection connection = getConnection(); + connection.start(); + + consumeDurableSubscriptionMessages(connection, true); + consumeDurableSubscriptionMessages(connection, false); + consumeQueueMessages(connection, false); + } + + /** + * Tests store migration containing messages for non-existing queue. + * + * @throws Exception + */ + public void testMigrationOfMessagesForNonDurableQueues() throws Exception + { + // Create a connection and start it + Connection connection = getConnection(); + connection.start(); + + // consume a message for non-existing store + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue(NON_DURABLE_QUEUE_NAME); + MessageConsumer messageConsumer = session.createConsumer(queue); + + for (int i = 1; i <= 3; i++) + { + Message message = messageConsumer.receive(1000); + assertNotNull("Message was not migrated!", message); + assertTrue("Unexpected message received!", message instanceof TextMessage); + assertEquals("ID property did not match", i, message.getIntProperty("ID")); + } + } + + /** + * Tests store upgrade has maintained the priority queue configuration, + * such that sending messages with priorities out-of-order and then consuming + * them gets the messages back in priority order. + */ + public void testPriorityQueue() throws Exception + { + // Create a connection and start it + Connection connection = getConnection(); + connection.start(); + + // send some messages to the priority queue + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue(PRIORITY_QUEUE_NAME); + MessageProducer producer = session.createProducer(queue); + + producer.setPriority(4); + producer.send(createMessage(1, false, session, producer)); + producer.setPriority(1); + producer.send(createMessage(2, false, session, producer)); + producer.setPriority(9); + producer.send(createMessage(3, false, session, producer)); + session.close(); + + //consume the messages, expected order: msg 3, msg 1, msg 2. + session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer = session.createConsumer(queue); + + Message msg = consumer.receive(1500); + assertNotNull("expected message was not received", msg); + assertEquals(3, msg.getIntProperty("msg")); + msg = consumer.receive(1500); + assertNotNull("expected message was not received", msg); + assertEquals(1, msg.getIntProperty("msg")); + msg = consumer.receive(1500); + assertNotNull("expected message was not received", msg); + assertEquals(2, msg.getIntProperty("msg")); + } + + /** + * Test that the queue configured to have a DLQ was recovered and has the alternate exchange + * and max delivery count, the DLE exists, the DLQ exists with no max delivery count, the + * DLQ is bound to the DLE, and that the DLQ does not itself have a DLQ. + * + * DLQs are NOT enabled at the virtualhost level, we are testing recovery of the arguments + * that turned it on for this specific queue. + */ + public void testRecoveryOfQueueWithDLQ() throws Exception + { + JMXTestUtils jmxUtils = null; + try + { + jmxUtils = new JMXTestUtils(this, "guest", "guest"); + jmxUtils.open(); + } + catch (Exception e) + { + fail("Unable to establish JMX connection, test cannot proceed"); + } + + try + { + //verify the DLE exchange exists, has the expected type, and a single binding for the DLQ + ManagedExchange exchange = jmxUtils.getManagedExchange(QUEUE_WITH_DLQ_NAME + "_DLE"); + assertEquals("Wrong exchange type", "fanout", exchange.getExchangeType()); + TabularDataSupport bindings = (TabularDataSupport) exchange.bindings(); + assertEquals(1, bindings.size()); + for(Object o : bindings.values()) + { + CompositeData binding = (CompositeData) o; + + String bindingKey = (String) binding.get(ManagedExchange.BINDING_KEY); + String[] queueNames = (String[]) binding.get(ManagedExchange.QUEUE_NAMES); + + //Because its a fanout exchange, we just return a single '*' key with all bound queues + assertEquals("unexpected binding key", "*", bindingKey); + assertEquals("unexpected number of queues bound", 1, queueNames.length); + assertEquals("unexpected queue name", QUEUE_WITH_DLQ_NAME + "_DLQ", queueNames[0]); + } + + //verify the queue exists, has the expected alternate exchange and max delivery count + ManagedQueue queue = jmxUtils.getManagedQueue(QUEUE_WITH_DLQ_NAME); + assertEquals("Queue does not have the expected AlternateExchange", QUEUE_WITH_DLQ_NAME + "_DLE", queue.getAlternateExchange()); + assertEquals("Unexpected maximum delivery count", Integer.valueOf(2), queue.getMaximumDeliveryCount()); + + ManagedQueue dlQqueue = jmxUtils.getManagedQueue(QUEUE_WITH_DLQ_NAME + "_DLQ"); + assertNull("Queue should not have an AlternateExchange", dlQqueue.getAlternateExchange()); + assertEquals("Unexpected maximum delivery count", Integer.valueOf(0), dlQqueue.getMaximumDeliveryCount()); + + String dlqDlqObjectNameString = jmxUtils.getQueueObjectNameString("test", QUEUE_WITH_DLQ_NAME + "_DLQ" + "_DLQ"); + assertFalse("a DLQ should not exist for the DLQ itself", jmxUtils.doesManagedObjectExist(dlqDlqObjectNameString)); + } + finally + { + jmxUtils.close(); + } + } + + private void consumeDurableSubscriptionMessages(Connection connection, boolean selector) throws Exception + { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Topic topic = null; + TopicSubscriber durSub = null; + + if(selector) + { + topic = session.createTopic(SELECTOR_TOPIC_NAME); + durSub = session.createDurableSubscriber(topic, SELECTOR_SUB_NAME,"testprop='true'", false); + } + else + { + topic = session.createTopic(TOPIC_NAME); + durSub = session.createDurableSubscriber(topic, SUB_NAME); + } + + + // Retrieve the matching message + Message m = durSub.receive(2000); + assertNotNull("Failed to receive an expected message", m); + if(selector) + { + assertEquals("Selector property did not match", "true", m.getStringProperty("testprop")); + } + assertEquals("ID property did not match", 1, m.getIntProperty("ID")); + assertEquals("Message content was not as expected", generateString(1024) , ((TextMessage)m).getText()); + + // Verify that no more messages are received + m = durSub.receive(1000); + assertNull("No more messages should have been recieved", m); + + durSub.close(); + session.close(); + } + + private void consumeQueueMessages(Connection connection, boolean extraMessage) throws Exception + { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue(QUEUE_NAME); + + MessageConsumer consumer = session.createConsumer(queue); + Message m; + + // Retrieve the initial pre-upgrade messages + for (int i=1; i <= 5 ; i++) + { + m = consumer.receive(2000); + assertNotNull("Failed to receive an expected message", m); + assertEquals("ID property did not match", i, m.getIntProperty("ID")); + assertEquals("Message content was not as expected", STRING_1024_256, ((TextMessage)m).getText()); + } + for (int i=1; i <= 5 ; i++) + { + m = consumer.receive(2000); + assertNotNull("Failed to receive an expected message", m); + assertEquals("ID property did not match", i, m.getIntProperty("ID")); + assertEquals("Message content was not as expected", STRING_1024, ((TextMessage)m).getText()); + } + + if(extraMessage) + { + //verify that the extra message is received + m = consumer.receive(2000); + assertNotNull("Failed to receive an expected message", m); + assertEquals("ID property did not match", 1, m.getIntProperty("ID")); + assertEquals("Message content was not as expected", STRING_1024_256, ((TextMessage)m).getText()); + } + + // Verify that no more messages are received + m = consumer.receive(1000); + assertNull("No more messages should have been recieved", m); + + consumer.close(); + session.close(); + } + + private Message createMessage(int msgId, boolean first, Session producerSession, MessageProducer producer) throws JMSException + { + Message send = producerSession.createTextMessage("Message: " + msgId); + send.setIntProperty("msg", msgId); + + return send; + } + + /** + * Generates a string of a given length consisting of the sequence 0,1,2,..,9,0,1,2. + * + * @param length number of characters in the string + * @return string sequence of the given length + */ + private static String generateString(int length) + { + char[] base_chars = new char[]{'0','1','2','3','4','5','6','7','8','9'}; + char[] chars = new char[length]; + for (int i = 0; i < (length); i++) + { + chars[i] = base_chars[i % 10]; + } + return new String(chars); + } + + private static void sendMessages(Session session, MessageProducer messageProducer, + Destination dest, int deliveryMode, int length, int numMesages) throws JMSException + { + for (int i = 1; i <= numMesages; i++) + { + Message message = session.createTextMessage(generateString(length)); + message.setIntProperty("ID", i); + messageProducer.send(message, deliveryMode, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE); + } + } + + private static void publishMessages(Session session, TopicPublisher publisher, + Destination dest, int deliveryMode, int length, int numMesages, String selectorProperty) throws JMSException + { + for (int i = 1; i <= numMesages; i++) + { + Message message = session.createTextMessage(generateString(length)); + message.setIntProperty("ID", i); + message.setStringProperty("testprop", selectorProperty); + publisher.publish(message, deliveryMode, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE); + } + } +} diff --git a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterBlackboxTest.java b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterBlackboxTest.java new file mode 100644 index 0000000000..0464269efc --- /dev/null +++ b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterBlackboxTest.java @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.berkeleydb; + +import java.io.File; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import javax.jms.Connection; +import javax.jms.Session; + +import org.apache.log4j.Logger; +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.jms.ConnectionListener; +import org.apache.qpid.jms.ConnectionURL; +import org.apache.qpid.test.utils.QpidBrokerTestCase; +import org.apache.qpid.test.utils.TestUtils; + +/** + * The HA black box tests test the BDB cluster as a opaque unit. Client connects to + * the cluster via a failover url + * + * @see HAClusterWhiteboxTest + */ +public class HAClusterBlackboxTest extends QpidBrokerTestCase +{ + protected static final Logger LOGGER = Logger.getLogger(HAClusterBlackboxTest.class); + + private static final String VIRTUAL_HOST = "test"; + private static final int NUMBER_OF_NODES = 3; + + private final HATestClusterCreator _clusterCreator = new HATestClusterCreator(this, VIRTUAL_HOST, NUMBER_OF_NODES); + + private FailoverAwaitingListener _failoverAwaitingListener; + private ConnectionURL _brokerFailoverUrl; + + @Override + protected void setUp() throws Exception + { + _brokerType = BrokerType.SPAWNED; + + assertTrue(isJavaBroker()); + assertTrue(isBrokerStorePersistent()); + + setSystemProperty("java.util.logging.config.file", "etc" + File.separator + "log.properties"); + + _clusterCreator.configureClusterNodes(); + + _brokerFailoverUrl = _clusterCreator.getConnectionUrlForAllClusterNodes(); + + _clusterCreator.startCluster(); + _failoverAwaitingListener = new FailoverAwaitingListener(); + + super.setUp(); + } + + @Override + public void startBroker() throws Exception + { + // Don't start default broker provided by QBTC. + } + + public void testLossOfMasterNodeCausesClientToFailover() throws Exception + { + final Connection connection = getConnection(_brokerFailoverUrl); + + ((AMQConnection)connection).setConnectionListener(_failoverAwaitingListener); + + final int activeBrokerPort = _clusterCreator.getBrokerPortNumberFromConnection(connection); + LOGGER.info("Active connection port " + activeBrokerPort); + + _clusterCreator.stopNode(activeBrokerPort); + LOGGER.info("Node is stopped"); + _failoverAwaitingListener.assertFailoverOccurs(20000); + LOGGER.info("Listener has finished"); + // any op to ensure connection remains + connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + } + + public void testLossOfReplicaNodeDoesNotCauseClientToFailover() throws Exception + { + LOGGER.info("Connecting to " + _brokerFailoverUrl); + final Connection connection = getConnection(_brokerFailoverUrl); + LOGGER.info("Got connection to cluster"); + + ((AMQConnection)connection).setConnectionListener(_failoverAwaitingListener); + final int activeBrokerPort = _clusterCreator.getBrokerPortNumberFromConnection(connection); + LOGGER.info("Active connection port " + activeBrokerPort); + final int inactiveBrokerPort = _clusterCreator.getPortNumberOfAnInactiveBroker(connection); + + LOGGER.info("Stopping inactive broker on port " + inactiveBrokerPort); + + _clusterCreator.stopNode(inactiveBrokerPort); + + _failoverAwaitingListener.assertFailoverDoesNotOccur(2000); + + // any op to ensure connection remains + connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + } + + private final class FailoverAwaitingListener implements ConnectionListener + { + private final CountDownLatch _failoverLatch = new CountDownLatch(1); + + @Override + public boolean preResubscribe() + { + return true; + } + + @Override + public boolean preFailover(boolean redirect) + { + return true; + } + + public void assertFailoverOccurs(long delay) throws InterruptedException + { + if (!_failoverLatch.await(delay, TimeUnit.MILLISECONDS)) + { + LOGGER.warn("Test thread dump:\n\n" + TestUtils.dumpThreads() + "\n"); + } + assertEquals("Failover did not occur", 0, _failoverLatch.getCount()); + } + + public void assertFailoverDoesNotOccur(long delay) throws InterruptedException + { + _failoverLatch.await(delay, TimeUnit.MILLISECONDS); + assertEquals("Failover occurred unexpectedly", 1L, _failoverLatch.getCount()); + } + + + @Override + public void failoverComplete() + { + _failoverLatch.countDown(); + } + + @Override + public void bytesSent(long count) + { + } + + @Override + public void bytesReceived(long count) + { + } + } + +} diff --git a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterManagementTest.java b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterManagementTest.java new file mode 100644 index 0000000000..0e25c4e17a --- /dev/null +++ b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterManagementTest.java @@ -0,0 +1,289 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.berkeleydb; + +import static com.sleepycat.je.rep.ReplicatedEnvironment.State.DETACHED; +import static com.sleepycat.je.rep.ReplicatedEnvironment.State.MASTER; +import static com.sleepycat.je.rep.ReplicatedEnvironment.State.REPLICA; +import static com.sleepycat.je.rep.ReplicatedEnvironment.State.UNKNOWN; + +import java.util.Arrays; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Set; + +import javax.jms.Connection; +import javax.management.ObjectName; +import javax.management.openmbean.CompositeData; +import javax.management.openmbean.TabularData; + +import org.apache.log4j.Logger; +import org.apache.qpid.jms.ConnectionURL; +import org.apache.qpid.management.common.mbeans.ManagedBroker; +import org.apache.qpid.server.store.berkeleydb.jmx.ManagedBDBHAMessageStore; +import org.apache.qpid.test.utils.JMXTestUtils; +import org.apache.qpid.test.utils.QpidBrokerTestCase; + +import com.sleepycat.je.EnvironmentFailureException; + +/** + * System test verifying the ability to control a cluster via the Management API. + * + * @see HAClusterBlackboxTest + */ +public class HAClusterManagementTest extends QpidBrokerTestCase +{ + protected static final Logger LOGGER = Logger.getLogger(HAClusterManagementTest.class); + + private static final Set NON_MASTER_STATES = new HashSet(Arrays.asList(REPLICA.toString(), DETACHED.toString(), UNKNOWN.toString()));; + private static final String VIRTUAL_HOST = "test"; + + private static final String MANAGED_OBJECT_QUERY = "org.apache.qpid:type=BDBHAMessageStore,name=" + ObjectName.quote(VIRTUAL_HOST); + private static final int NUMBER_OF_NODES = 4; + + private final HATestClusterCreator _clusterCreator = new HATestClusterCreator(this, VIRTUAL_HOST, NUMBER_OF_NODES); + private final JMXTestUtils _jmxUtils = new JMXTestUtils(this); + + private ConnectionURL _brokerFailoverUrl; + + @Override + protected void setUp() throws Exception + { + _brokerType = BrokerType.SPAWNED; + + _clusterCreator.configureClusterNodes(); + _brokerFailoverUrl = _clusterCreator.getConnectionUrlForAllClusterNodes(); + _clusterCreator.startCluster(); + + super.setUp(); + } + + @Override + protected void tearDown() throws Exception + { + try + { + _jmxUtils.close(); + } + finally + { + super.tearDown(); + } + } + + @Override + public void startBroker() throws Exception + { + // Don't start default broker provided by QBTC. + } + + public void testReadonlyMBeanAttributes() throws Exception + { + final int brokerPortNumber = getBrokerPortNumbers().iterator().next(); + final int bdbPortNumber = _clusterCreator.getBdbPortForBrokerPort(brokerPortNumber); + + ManagedBDBHAMessageStore storeBean = getStoreBeanForNodeAtBrokerPort(brokerPortNumber); + assertEquals("Unexpected store group name", _clusterCreator.getGroupName(), storeBean.getGroupName()); + assertEquals("Unexpected store node name", _clusterCreator.getNodeNameForNodeAt(bdbPortNumber), storeBean.getNodeName()); + assertEquals("Unexpected store node host port",_clusterCreator.getNodeHostPortForNodeAt(bdbPortNumber), storeBean.getNodeHostPort()); + assertEquals("Unexpected store helper host port", _clusterCreator.getHelperHostPort(), storeBean.getHelperHostPort()); + // As we have chosen an arbitrary broker from the cluster, we cannot predict its state + assertNotNull("Store state must not be null", storeBean.getNodeState()); + } + + public void testStateOfActiveBrokerIsMaster() throws Exception + { + final Connection activeConnection = getConnection(_brokerFailoverUrl); + final int activeBrokerPortNumber = _clusterCreator.getBrokerPortNumberFromConnection(activeConnection); + + ManagedBDBHAMessageStore storeBean = getStoreBeanForNodeAtBrokerPort(activeBrokerPortNumber); + assertEquals("Unexpected store state", MASTER.toString(), storeBean.getNodeState()); + } + + public void testStateOfNonActiveBrokerIsNotMaster() throws Exception + { + final Connection activeConnection = getConnection(_brokerFailoverUrl); + final int inactiveBrokerPortNumber = _clusterCreator.getPortNumberOfAnInactiveBroker(activeConnection); + ManagedBDBHAMessageStore storeBean = getStoreBeanForNodeAtBrokerPort(inactiveBrokerPortNumber); + final String nodeState = storeBean.getNodeState(); + assertTrue("Unexpected store state : " + nodeState, NON_MASTER_STATES.contains(nodeState)); + } + + public void testGroupMembers() throws Exception + { + final int brokerPortNumber = getBrokerPortNumbers().iterator().next(); + + ManagedBDBHAMessageStore storeBean = getStoreBeanForNodeAtBrokerPort(brokerPortNumber); + awaitAllNodesJoiningGroup(storeBean, NUMBER_OF_NODES); + + final TabularData groupMembers = storeBean.getAllNodesInGroup(); + assertNotNull(groupMembers); + + for(int bdbPortNumber : _clusterCreator.getBdbPortNumbers()) + { + final String nodeName = _clusterCreator.getNodeNameForNodeAt(bdbPortNumber); + final String nodeHostPort = _clusterCreator.getNodeHostPortForNodeAt(bdbPortNumber); + + CompositeData row = groupMembers.get(new Object[] {nodeName}); + assertNotNull("Table does not contain row for node name " + nodeName, row); + assertEquals(nodeHostPort, row.get(BDBHAMessageStore.GRP_MEM_COL_NODE_HOST_PORT)); + } + } + + public void testRemoveNodeFromGroup() throws Exception + { + final Iterator brokerPortNumberIterator = getBrokerPortNumbers().iterator(); + final int brokerPortNumberToMakeObservation = brokerPortNumberIterator.next(); + final int brokerPortNumberToBeRemoved = brokerPortNumberIterator.next(); + final ManagedBDBHAMessageStore storeBean = getStoreBeanForNodeAtBrokerPort(brokerPortNumberToMakeObservation); + awaitAllNodesJoiningGroup(storeBean, NUMBER_OF_NODES); + + final String removedNodeName = _clusterCreator.getNodeNameForNodeAt(_clusterCreator.getBdbPortForBrokerPort(brokerPortNumberToBeRemoved)); + _clusterCreator.stopNode(brokerPortNumberToBeRemoved); + storeBean.removeNodeFromGroup(removedNodeName); + + final int numberOfDataRowsAfterRemoval = storeBean.getAllNodesInGroup().size(); + assertEquals("Unexpected number of data rows before test", NUMBER_OF_NODES - 1,numberOfDataRowsAfterRemoval); + } + + /** + * Updates the address of a node. + * + * If the broker (node) can subsequently start without error then the update was a success, hence no need for an explicit + * assert. + * + * @see #testRestartNodeWithNewPortNumberWithoutFirstCallingUpdateAddressThrowsAnException() for converse case + */ + public void testUpdateAddress() throws Exception + { + final Iterator brokerPortNumberIterator = getBrokerPortNumbers().iterator(); + final int brokerPortNumberToPerformUpdate = brokerPortNumberIterator.next(); + final int brokerPortNumberToBeMoved = brokerPortNumberIterator.next(); + final ManagedBDBHAMessageStore storeBean = getStoreBeanForNodeAtBrokerPort(brokerPortNumberToPerformUpdate); + + _clusterCreator.stopNode(brokerPortNumberToBeMoved); + + final int oldBdbPort = _clusterCreator.getBdbPortForBrokerPort(brokerPortNumberToBeMoved); + final int newBdbPort = getNextAvailable(oldBdbPort + 1); + + storeBean.updateAddress(_clusterCreator.getNodeNameForNodeAt(oldBdbPort), _clusterCreator.getIpAddressOfBrokerHost(), newBdbPort); + + _clusterCreator.modifyClusterNodeBdbAddress(brokerPortNumberToBeMoved, newBdbPort); + + _clusterCreator.startNode(brokerPortNumberToBeMoved); + } + + /** + * @see #testUpdateAddress() + */ + public void testRestartNodeWithNewPortNumberWithoutFirstCallingUpdateAddressThrowsAnException() throws Exception + { + final Iterator brokerPortNumberIterator = getBrokerPortNumbers().iterator(); + final int brokerPortNumberToBeMoved = brokerPortNumberIterator.next(); + + _clusterCreator.stopNode(brokerPortNumberToBeMoved); + + final int oldBdbPort = _clusterCreator.getBdbPortForBrokerPort(brokerPortNumberToBeMoved); + final int newBdbPort = getNextAvailable(oldBdbPort + 1); + + // now deliberately don't call updateAddress + + _clusterCreator.modifyClusterNodeBdbAddress(brokerPortNumberToBeMoved, newBdbPort); + + try + { + _clusterCreator.startNode(brokerPortNumberToBeMoved); + fail("Exception not thrown"); + } + catch(RuntimeException rte) + { + //check cause was BDBs EnvironmentFailureException + assertTrue(rte.getMessage().contains(EnvironmentFailureException.class.getName())); + // PASS + } + } + + public void testVirtualHostOperationsDeniedForNonMasterNode() throws Exception + { + final Connection activeConnection = getConnection(_brokerFailoverUrl); + final int inactiveBrokerPortNumber = _clusterCreator.getPortNumberOfAnInactiveBroker(activeConnection); + + ManagedBroker inactiveBroker = getManagedBrokerBeanForNodeAtBrokerPort(inactiveBrokerPortNumber); + + try + { + inactiveBroker.createNewQueue(getTestQueueName(), null, true); + fail("Exception not thrown"); + } + catch (Exception e) + { + String message = e.getMessage(); + assertEquals("The virtual hosts state of PASSIVE does not permit this operation.", message); + } + + try + { + inactiveBroker.createNewExchange(getName(), "direct", true); + fail("Exception not thrown"); + } + catch (Exception e) + { + String message = e.getMessage(); + assertEquals("The virtual hosts state of PASSIVE does not permit this operation.", message); + } + } + + private ManagedBDBHAMessageStore getStoreBeanForNodeAtBrokerPort(final int brokerPortNumber) throws Exception + { + _jmxUtils.open(brokerPortNumber); + + return _jmxUtils.getManagedObject(ManagedBDBHAMessageStore.class, MANAGED_OBJECT_QUERY); + } + + private ManagedBroker getManagedBrokerBeanForNodeAtBrokerPort(final int brokerPortNumber) throws Exception + { + _jmxUtils.open(brokerPortNumber); + + return _jmxUtils.getManagedBroker(VIRTUAL_HOST); + } + + private void awaitAllNodesJoiningGroup(ManagedBDBHAMessageStore storeBean, int expectedNumberOfNodes) throws Exception + { + long totalTimeWaited = 0l; + long waitInterval = 100l; + long maxWaitTime = 10000; + + int currentNumberOfNodes = storeBean.getAllNodesInGroup().size(); + while (expectedNumberOfNodes > currentNumberOfNodes || totalTimeWaited > maxWaitTime) + { + LOGGER.debug("Still awaiting nodes to join group; expecting " + + expectedNumberOfNodes + " node(s) but only have " + currentNumberOfNodes + + " after " + totalTimeWaited + " ms."); + + totalTimeWaited += waitInterval; + Thread.sleep(waitInterval); + + currentNumberOfNodes = storeBean.getAllNodesInGroup().size(); + } + + assertEquals("Unexpected number of nodes in group after " + totalTimeWaited + " ms", + expectedNumberOfNodes ,currentNumberOfNodes); + } +} diff --git a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterTwoNodeTest.java b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterTwoNodeTest.java new file mode 100644 index 0000000000..95626f7fa5 --- /dev/null +++ b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterTwoNodeTest.java @@ -0,0 +1,217 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.berkeleydb; + +import java.io.File; + +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.Session; +import javax.management.ObjectName; + +import org.apache.qpid.jms.ConnectionURL; +import org.apache.qpid.server.store.berkeleydb.jmx.ManagedBDBHAMessageStore; +import org.apache.qpid.test.utils.JMXTestUtils; +import org.apache.qpid.test.utils.QpidBrokerTestCase; + +import com.sleepycat.je.rep.ReplicationConfig; + +public class HAClusterTwoNodeTest extends QpidBrokerTestCase +{ + private static final long RECEIVE_TIMEOUT = 5000l; + + private static final String VIRTUAL_HOST = "test"; + + private static final String MANAGED_OBJECT_QUERY = "org.apache.qpid:type=BDBHAMessageStore,name=" + ObjectName.quote(VIRTUAL_HOST); + private static final int NUMBER_OF_NODES = 2; + + private final HATestClusterCreator _clusterCreator = new HATestClusterCreator(this, VIRTUAL_HOST, NUMBER_OF_NODES); + private final JMXTestUtils _jmxUtils = new JMXTestUtils(this); + + private ConnectionURL _brokerFailoverUrl; + + @Override + protected void setUp() throws Exception + { + _brokerType = BrokerType.SPAWNED; + + assertTrue(isJavaBroker()); + assertTrue(isBrokerStorePersistent()); + + super.setUp(); + } + + @Override + protected void tearDown() throws Exception + { + try + { + _jmxUtils.close(); + } + finally + { + super.tearDown(); + } + } + + @Override + public void startBroker() throws Exception + { + // Don't start default broker provided by QBTC. + } + + private void startCluster(boolean designedPrimary) throws Exception + { + setSystemProperty("java.util.logging.config.file", "etc" + File.separator + "log.properties"); + + String storeConfigKeyPrefix = _clusterCreator.getStoreConfigKeyPrefix(); + + setVirtualHostConfigurationProperty(storeConfigKeyPrefix + ".repConfig(0).name", ReplicationConfig.INSUFFICIENT_REPLICAS_TIMEOUT); + setVirtualHostConfigurationProperty(storeConfigKeyPrefix + ".repConfig(0).value", "2 s"); + + setVirtualHostConfigurationProperty(storeConfigKeyPrefix + ".repConfig(1).name", ReplicationConfig.ELECTIONS_PRIMARY_RETRIES); + setVirtualHostConfigurationProperty(storeConfigKeyPrefix + ".repConfig(1).value", "0"); + + _clusterCreator.configureClusterNodes(); + _clusterCreator.setDesignatedPrimaryOnFirstBroker(designedPrimary); + _brokerFailoverUrl = _clusterCreator.getConnectionUrlForAllClusterNodes(); + _clusterCreator.startCluster(); + } + + public void testMasterDesignatedPrimaryCanBeRestartedWithoutReplica() throws Exception + { + startCluster(true); + final Connection initialConnection = getConnection(_brokerFailoverUrl); + int masterPort = _clusterCreator.getBrokerPortNumberFromConnection(initialConnection); + assertProducingConsuming(initialConnection); + initialConnection.close(); + _clusterCreator.stopCluster(); + _clusterCreator.startNode(masterPort); + final Connection secondConnection = getConnection(_brokerFailoverUrl); + assertProducingConsuming(secondConnection); + secondConnection.close(); + } + + public void testClusterRestartWithoutDesignatedPrimary() throws Exception + { + startCluster(false); + final Connection initialConnection = getConnection(_brokerFailoverUrl); + assertProducingConsuming(initialConnection); + initialConnection.close(); + _clusterCreator.stopCluster(); + _clusterCreator.startClusterParallel(); + final Connection secondConnection = getConnection(_brokerFailoverUrl); + assertProducingConsuming(secondConnection); + secondConnection.close(); + } + + public void testDesignatedPrimaryContinuesAfterSecondaryStopped() throws Exception + { + startCluster(true); + _clusterCreator.stopNode(_clusterCreator.getBrokerPortNumberOfSecondaryNode()); + final Connection connection = getConnection(_brokerFailoverUrl); + assertNotNull("Expected to get a valid connection to primary", connection); + assertProducingConsuming(connection); + } + + public void testPersistentOperationsFailOnNonDesignatedPrimarysAfterSecondaryStopped() throws Exception + { + startCluster(false); + _clusterCreator.stopNode(_clusterCreator.getBrokerPortNumberOfSecondaryNode()); + final Connection connection = getConnection(_brokerFailoverUrl); + assertNotNull("Expected to get a valid connection to primary", connection); + try + { + assertProducingConsuming(connection); + fail("JMS peristent operations succeded on Master 'not designated primary' buy they should fail as replica is not available"); + } + catch(JMSException e) + { + // JMSException should be thrown on transaction start/commit + } + } + + public void testSecondaryDoesNotBecomePrimaryWhenDesignatedPrimaryStopped() throws Exception + { + startCluster(true); + _clusterCreator.stopNode(_clusterCreator.getBrokerPortNumberOfPrimary()); + + try + { + getConnection(_brokerFailoverUrl); + fail("Connection not expected"); + } + catch (JMSException e) + { + // PASS + } + } + + public void testInitialDesignatedPrimaryStateOfNodes() throws Exception + { + startCluster(true); + final ManagedBDBHAMessageStore primaryStoreBean = getStoreBeanForNodeAtBrokerPort(_clusterCreator.getBrokerPortNumberOfPrimary()); + assertTrue("Expected primary node to be set as designated primary", primaryStoreBean.getDesignatedPrimary()); + + final ManagedBDBHAMessageStore secondaryStoreBean = getStoreBeanForNodeAtBrokerPort(_clusterCreator.getBrokerPortNumberOfSecondaryNode()); + assertFalse("Expected secondary node to NOT be set as designated primary", secondaryStoreBean.getDesignatedPrimary()); + } + + public void testSecondaryDesignatedAsPrimaryAfterOrginalPrimaryStopped() throws Exception + { + startCluster(true); + _clusterCreator.stopNode(_clusterCreator.getBrokerPortNumberOfPrimary()); + final ManagedBDBHAMessageStore storeBean = getStoreBeanForNodeAtBrokerPort(_clusterCreator.getBrokerPortNumberOfSecondaryNode()); + + assertFalse("Expected node to NOT be set as designated primary", storeBean.getDesignatedPrimary()); + storeBean.setDesignatedPrimary(true); + assertTrue("Expected node to now be set as designated primary", storeBean.getDesignatedPrimary()); + + final Connection connection = getConnection(_brokerFailoverUrl); + assertNotNull("Expected to get a valid connection to new primary", connection); + assertProducingConsuming(connection); + } + + private ManagedBDBHAMessageStore getStoreBeanForNodeAtBrokerPort( + final int activeBrokerPortNumber) throws Exception + { + _jmxUtils.open(activeBrokerPortNumber); + + ManagedBDBHAMessageStore storeBean = _jmxUtils.getManagedObject(ManagedBDBHAMessageStore.class, MANAGED_OBJECT_QUERY); + return storeBean; + } + + private void assertProducingConsuming(final Connection connection) throws JMSException, Exception + { + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + Destination destination = session.createQueue(getTestQueueName()); + MessageConsumer consumer = session.createConsumer(destination); + sendMessage(session, destination, 1); + connection.start(); + Message m1 = consumer.receive(RECEIVE_TIMEOUT); + assertNotNull("Message 1 is not received", m1); + assertEquals("Unexpected first message received", 0, m1.getIntProperty(INDEX)); + session.commit(); + } + +} diff --git a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterWhiteboxTest.java b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterWhiteboxTest.java new file mode 100644 index 0000000000..408643b98a --- /dev/null +++ b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterWhiteboxTest.java @@ -0,0 +1,266 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.berkeleydb; + +import java.io.File; +import java.util.Set; + +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.Queue; +import javax.jms.Session; + +import org.apache.log4j.Logger; +import org.apache.qpid.test.utils.QpidBrokerTestCase; +import org.apache.qpid.url.URLSyntaxException; + +/** + * The HA white box tests test the BDB cluster where the test retains the knowledge of the + * individual test nodes. It uses this knowledge to examine the nodes to ensure that they + * remain in the correct state throughout the test. + * + * @see HAClusterBlackboxTest + */ +public class HAClusterWhiteboxTest extends QpidBrokerTestCase +{ + protected static final Logger LOGGER = Logger.getLogger(HAClusterWhiteboxTest.class); + + private static final String VIRTUAL_HOST = "test"; + + private final int NUMBER_OF_NODES = 3; + private final HATestClusterCreator _clusterCreator = new HATestClusterCreator(this, VIRTUAL_HOST, NUMBER_OF_NODES); + + @Override + protected void setUp() throws Exception + { + _brokerType = BrokerType.SPAWNED; + + assertTrue(isJavaBroker()); + assertTrue(isBrokerStorePersistent()); + + setSystemProperty("java.util.logging.config.file", "etc" + File.separator + "log.properties"); + + _clusterCreator.configureClusterNodes(); + _clusterCreator.startCluster(); + + super.setUp(); + } + + @Override + public void startBroker() throws Exception + { + // Don't start default broker provided by QBTC. + } + + public void testClusterPermitsConnectionToOnlyOneNode() throws Exception + { + int connectionSuccesses = 0; + int connectionFails = 0; + + for (int brokerPortNumber : getBrokerPortNumbers()) + { + try + { + getConnection(_clusterCreator.getConnectionUrlForSingleNodeWithoutRetry(brokerPortNumber)); + connectionSuccesses++; + } + catch(JMSException e) + { + assertTrue(e.getMessage().contains("Virtual host '" + VIRTUAL_HOST + "' is not active")); + connectionFails++; + } + } + + assertEquals("Unexpected number of failed connections", NUMBER_OF_NODES - 1, connectionFails); + assertEquals("Unexpected number of successful connections", 1, connectionSuccesses); + } + + public void testClusterThatLosesNodeStillAllowsConnection() throws Exception + { + final Connection initialConnection = getConnectionToNodeInCluster(); + assertNotNull(initialConnection); + + closeConnectionAndKillBroker(initialConnection); + + final Connection subsequentConnection = getConnectionToNodeInCluster(); + assertNotNull(subsequentConnection); + + // verify that JMS persistence operations are working + assertProducingConsuming(subsequentConnection); + + closeConnection(initialConnection); + } + + public void testClusterThatLosesAllButOneNodeRefusesConnection() throws Exception + { + final Connection initialConnection = getConnectionToNodeInCluster(); + assertNotNull(initialConnection); + + closeConnectionAndKillBroker(initialConnection); + + final Connection subsequentConnection = getConnectionToNodeInCluster(); + assertNotNull(subsequentConnection); + final int subsequentPortNumber = _clusterCreator.getBrokerPortNumberFromConnection(subsequentConnection); + + killBroker(subsequentPortNumber); + + final Connection finalConnection = getConnectionToNodeInCluster(); + assertNull(finalConnection); + + closeConnection(initialConnection); + } + + public void testClusterWithRestartedNodeStillAllowsConnection() throws Exception + { + final Connection connection = getConnectionToNodeInCluster(); + assertNotNull(connection); + + final int brokerPortNumber = _clusterCreator.getBrokerPortNumberFromConnection(connection); + connection.close(); + + _clusterCreator.stopNode(brokerPortNumber); + _clusterCreator.startNode(brokerPortNumber); + + final Connection subsequentConnection = getConnectionToNodeInCluster(); + assertNotNull(subsequentConnection); + } + + public void testClusterLosingNodeRetainsData() throws Exception + { + final Connection initialConnection = getConnectionToNodeInCluster(); + + final String queueNamePrefix = getTestQueueName(); + final String inbuiltExchangeQueueUrl = "direct://amq.direct/" + queueNamePrefix + "1/" + queueNamePrefix + "1?durable='true'"; + final String customExchangeQueueUrl = "direct://my.exchange/" + queueNamePrefix + "2/" + queueNamePrefix + "2?durable='true'"; + + populateBrokerWithData(initialConnection, inbuiltExchangeQueueUrl, customExchangeQueueUrl); + + closeConnectionAndKillBroker(initialConnection); + + final Connection subsequentConnection = getConnectionToNodeInCluster(); + + assertNotNull("no valid connection obtained", subsequentConnection); + + checkBrokerData(subsequentConnection, inbuiltExchangeQueueUrl, customExchangeQueueUrl); + } + + public void xtestRecoveryOfOutOfDateNode() throws Exception + { + /* + * TODO: Implement + * + * Cant yet find a way to control cleaning in a deterministic way to allow provoking + * a node to become out of date. We do now know that even a new joiner to the group + * can throw the InsufficientLogException, so ensuring an existing cluster of nodes has + * done *any* cleaning and then adding a new node should be sufficient to cause this. + */ + } + + private void populateBrokerWithData(final Connection connection, final String... queueUrls) throws JMSException, Exception + { + populateBrokerWithData(connection, 1, queueUrls); + } + + private void populateBrokerWithData(final Connection connection, int noOfMessages, final String... queueUrls) throws JMSException, Exception + { + final Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + for (final String queueUrl : queueUrls) + { + final Queue queue = session.createQueue(queueUrl); + session.createConsumer(queue).close(); + sendMessage(session, queue, noOfMessages); + } + } + + private void checkBrokerData(final Connection connection, final String... queueUrls) throws JMSException + { + connection.start(); + final Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + for (final String queueUrl : queueUrls) + { + final Queue queue = session.createQueue(queueUrl); + final MessageConsumer consumer = session.createConsumer(queue); + final Message message = consumer.receive(1000); + session.commit(); + assertNotNull("Queue " + queue + " should have message", message); + assertEquals("Queue " + queue + " message has unexpected content", 0, message.getIntProperty(INDEX)); + } + } + + private Connection getConnectionToNodeInCluster() throws URLSyntaxException + { + Connection connection = null; + Set runningBrokerPorts = getBrokerPortNumbers(); + + for (int brokerPortNumber : runningBrokerPorts) + { + try + { + connection = getConnection(_clusterCreator.getConnectionUrlForSingleNodeWithRetry(brokerPortNumber)); + break; + } + catch(JMSException je) + { + assertTrue(je.getMessage().contains("Virtual host '" + VIRTUAL_HOST + "' is not active")); + } + } + return connection; + } + + private void closeConnectionAndKillBroker(final Connection initialConnection) throws Exception + { + final int initialPortNumber = _clusterCreator.getBrokerPortNumberFromConnection(initialConnection); + initialConnection.close(); + + killBroker(initialPortNumber); // kill awaits the death of the child + } + + private void assertProducingConsuming(final Connection connection) throws JMSException, Exception + { + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + Destination destination = session.createQueue(getTestQueueName()); + MessageConsumer consumer = session.createConsumer(destination); + sendMessage(session, destination, 2); + connection.start(); + Message m1 = consumer.receive(RECEIVE_TIMEOUT); + assertNotNull("Message 1 is not received", m1); + assertEquals("Unexpected first message received", 0, m1.getIntProperty(INDEX)); + Message m2 = consumer.receive(RECEIVE_TIMEOUT); + assertNotNull("Message 2 is not received", m2); + assertEquals("Unexpected second message received", 1, m2.getIntProperty(INDEX)); + session.commit(); + } + + private void closeConnection(final Connection initialConnection) + { + try + { + initialConnection.close(); + } + catch(Exception e) + { + // ignore. + // java.net.SocketException is seen sometimes on active connection + } + } +} diff --git a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java new file mode 100644 index 0000000000..353c3a0ec5 --- /dev/null +++ b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java @@ -0,0 +1,429 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.berkeleydb; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.TreeMap; +import java.util.concurrent.Callable; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +import javax.jms.Connection; + +import org.apache.commons.configuration.XMLConfiguration; +import org.apache.commons.lang.StringUtils; +import org.apache.log4j.Logger; +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQConnectionURL; +import org.apache.qpid.test.utils.TestBrokerConfiguration; +import org.apache.qpid.test.utils.QpidBrokerTestCase; +import org.apache.qpid.url.URLSyntaxException; + +public class HATestClusterCreator +{ + protected static final Logger LOGGER = Logger.getLogger(HATestClusterCreator.class); + + private static final String MANY_BROKER_URL_FORMAT = "amqp://guest:guest@/%s?brokerlist='%s'&failover='roundrobin?cyclecount='%d''"; + private static final String BROKER_PORTION_FORMAT = "tcp://localhost:%d?connectdelay='%d',retries='%d'"; + + private static final int FAILOVER_CYCLECOUNT = 10; + private static final int FAILOVER_RETRIES = 1; + private static final int FAILOVER_CONNECTDELAY = 1000; + + private static final String SINGLE_BROKER_URL_WITH_RETRY_FORMAT = "amqp://guest:guest@/%s?brokerlist='tcp://localhost:%d?connectdelay='%d',retries='%d''"; + private static final String SINGLE_BROKER_URL_WITHOUT_RETRY_FORMAT = "amqp://guest:guest@/%s?brokerlist='tcp://localhost:%d'"; + + private static final int RETRIES = 60; + private static final int CONNECTDELAY = 75; + + private final QpidBrokerTestCase _testcase; + private final Map _brokerPortToBdbPortMap = new HashMap(); + private final Map _brokerConfigurations = new TreeMap(); + private final String _virtualHostName; + private final String _vhostStoreConfigKeyPrefix; + + private final String _ipAddressOfBroker; + private final String _groupName ; + private final int _numberOfNodes; + private int _bdbHelperPort; + private int _primaryBrokerPort; + private String _vhostConfigKeyPrefix; + + public HATestClusterCreator(QpidBrokerTestCase testcase, String virtualHostName, int numberOfNodes) + { + _testcase = testcase; + _virtualHostName = virtualHostName; + _groupName = "group" + _testcase.getName(); + _ipAddressOfBroker = getIpAddressOfBrokerHost(); + _numberOfNodes = numberOfNodes; + _vhostConfigKeyPrefix = "virtualhosts.virtualhost." + _virtualHostName + "."; + _vhostStoreConfigKeyPrefix = _vhostConfigKeyPrefix + "store."; + _bdbHelperPort = 0; + } + + public void configureClusterNodes() throws Exception + { + int brokerPort = _testcase.findFreePort(); + + for (int i = 0; i < _numberOfNodes; i++) + { + int bdbPort = _testcase.getNextAvailable(brokerPort + 1); + _brokerPortToBdbPortMap.put(brokerPort, bdbPort); + + LOGGER.debug("Cluster broker port " + brokerPort + ", bdb replication port " + bdbPort); + if (_bdbHelperPort == 0) + { + _bdbHelperPort = bdbPort; + } + + configureClusterNode(brokerPort, bdbPort); + TestBrokerConfiguration brokerConfiguration = _testcase.getBrokerConfiguration(brokerPort); + brokerConfiguration.addJmxManagementConfiguration(); + collectConfig(brokerPort, brokerConfiguration, _testcase.getTestVirtualhosts()); + + brokerPort = _testcase.getNextAvailable(bdbPort + 1); + } + } + + public void setDesignatedPrimaryOnFirstBroker(boolean designatedPrimary) throws Exception + { + if (_numberOfNodes != 2) + { + throw new IllegalArgumentException("Only two nodes groups have the concept of primary"); + } + + final Entry brokerConfigEntry = _brokerConfigurations.entrySet().iterator().next(); + final String configKey = getConfigKey("highAvailability.designatedPrimary"); + brokerConfigEntry.getValue().getTestVirtualhosts().setProperty(configKey, Boolean.toString(designatedPrimary)); + _primaryBrokerPort = brokerConfigEntry.getKey(); + } + + /** + * @param configKeySuffix "highAvailability.designatedPrimary", for example + * @return "virtualhost.test.store.highAvailability.designatedPrimary", for example + */ + private String getConfigKey(String configKeySuffix) + { + final String configKey = StringUtils.substringAfter(_vhostStoreConfigKeyPrefix + configKeySuffix, "virtualhosts."); + return configKey; + } + + public void startNode(final int brokerPortNumber) throws Exception + { + final BrokerConfigHolder brokerConfigHolder = _brokerConfigurations.get(brokerPortNumber); + + _testcase.setTestVirtualhosts(brokerConfigHolder.getTestVirtualhosts()); + + _testcase.startBroker(brokerPortNumber); + } + + public void startCluster() throws Exception + { + for (final Integer brokerPortNumber : _brokerConfigurations.keySet()) + { + startNode(brokerPortNumber); + } + } + + public void startClusterParallel() throws Exception + { + final ExecutorService executor = Executors.newFixedThreadPool(_brokerConfigurations.size()); + try + { + List> brokers = new CopyOnWriteArrayList>(); + for (final Integer brokerPortNumber : _brokerConfigurations.keySet()) + { + final BrokerConfigHolder brokerConfigHolder = _brokerConfigurations.get(brokerPortNumber); + Future future = executor.submit(new Callable() + { + public Object call() + { + try + { + _testcase.startBroker(brokerPortNumber, brokerConfigHolder.getTestConfiguration(), + brokerConfigHolder.getTestVirtualhosts()); + return "OK"; + } + catch (Exception e) + { + return e; + } + } + }); + brokers.add(future); + } + for (Future future : brokers) + { + Object result = future.get(30, TimeUnit.SECONDS); + LOGGER.debug("Node startup result:" + result); + if (result instanceof Exception) + { + throw (Exception) result; + } + else if (!"OK".equals(result)) + { + throw new Exception("One of the cluster nodes is not started"); + } + } + } + catch (Exception e) + { + stopCluster(); + throw e; + } + finally + { + executor.shutdown(); + } + + } + + public void stopNode(final int brokerPortNumber) + { + _testcase.killBroker(brokerPortNumber); + } + + public void stopCluster() throws Exception + { + for (final Integer brokerPortNumber : _brokerConfigurations.keySet()) + { + try + { + stopNode(brokerPortNumber); + } + catch(Exception e) + { + LOGGER.warn("Failed to stop node on port:" + brokerPortNumber); + } + } + } + + public int getBrokerPortNumberFromConnection(Connection connection) + { + final AMQConnection amqConnection = (AMQConnection)connection; + return amqConnection.getActiveBrokerDetails().getPort(); + } + + public int getPortNumberOfAnInactiveBroker(final Connection activeConnection) + { + final Set allBrokerPorts = _testcase.getBrokerPortNumbers(); + LOGGER.debug("Broker ports:" + allBrokerPorts); + final int activeBrokerPort = getBrokerPortNumberFromConnection(activeConnection); + allBrokerPorts.remove(activeBrokerPort); + LOGGER.debug("Broker ports:" + allBrokerPorts); + final int inactiveBrokerPort = allBrokerPorts.iterator().next(); + return inactiveBrokerPort; + } + + public int getBdbPortForBrokerPort(final int brokerPortNumber) + { + return _brokerPortToBdbPortMap.get(brokerPortNumber); + } + + public Set getBdbPortNumbers() + { + return new HashSet(_brokerPortToBdbPortMap.values()); + } + + public AMQConnectionURL getConnectionUrlForAllClusterNodes() throws Exception + { + final StringBuilder brokerList = new StringBuilder(); + + for(Iterator itr = _brokerPortToBdbPortMap.keySet().iterator(); itr.hasNext(); ) + { + int brokerPortNumber = itr.next(); + + brokerList.append(String.format(BROKER_PORTION_FORMAT, brokerPortNumber, FAILOVER_CONNECTDELAY, FAILOVER_RETRIES)); + if (itr.hasNext()) + { + brokerList.append(";"); + } + } + + return new AMQConnectionURL(String.format(MANY_BROKER_URL_FORMAT, _virtualHostName, brokerList, FAILOVER_CYCLECOUNT)); + } + + public AMQConnectionURL getConnectionUrlForSingleNodeWithoutRetry(final int brokerPortNumber) throws URLSyntaxException + { + return getConnectionUrlForSingleNode(brokerPortNumber, false); + } + + public AMQConnectionURL getConnectionUrlForSingleNodeWithRetry(final int brokerPortNumber) throws URLSyntaxException + { + return getConnectionUrlForSingleNode(brokerPortNumber, true); + } + + private AMQConnectionURL getConnectionUrlForSingleNode(final int brokerPortNumber, boolean retryAllowed) throws URLSyntaxException + { + final String url; + if (retryAllowed) + { + url = String.format(SINGLE_BROKER_URL_WITH_RETRY_FORMAT, _virtualHostName, brokerPortNumber, CONNECTDELAY, RETRIES); + } + else + { + url = String.format(SINGLE_BROKER_URL_WITHOUT_RETRY_FORMAT, _virtualHostName, brokerPortNumber); + } + + return new AMQConnectionURL(url); + } + + public String getGroupName() + { + return _groupName; + } + + public String getNodeNameForNodeAt(final int bdbPort) + { + return "node" + _testcase.getName() + bdbPort; + } + + public String getNodeHostPortForNodeAt(final int bdbPort) + { + return _ipAddressOfBroker + ":" + bdbPort; + } + + public String getHelperHostPort() + { + if (_bdbHelperPort == 0) + { + throw new IllegalStateException("Helper port not yet assigned."); + } + + return _ipAddressOfBroker + ":" + _bdbHelperPort; + } + + public void setHelperHostPort(int bdbHelperPort) + { + _bdbHelperPort = bdbHelperPort; + } + + public int getBrokerPortNumberOfPrimary() + { + if (_numberOfNodes != 2) + { + throw new IllegalArgumentException("Only two nodes groups have the concept of primary"); + } + + return _primaryBrokerPort; + } + + public int getBrokerPortNumberOfSecondaryNode() + { + final Set portNumbers = getBrokerPortNumbersForNodes(); + portNumbers.remove(getBrokerPortNumberOfPrimary()); + return portNumbers.iterator().next(); + } + + public Set getBrokerPortNumbersForNodes() + { + return new HashSet(_brokerConfigurations.keySet()); + } + + private void configureClusterNode(final int brokerPort, final int bdbPort) throws Exception + { + final String nodeName = getNodeNameForNodeAt(bdbPort); + + + _testcase.setVirtualHostConfigurationProperty(_vhostConfigKeyPrefix + "type", BDBHAVirtualHostFactory.TYPE); + _testcase.setVirtualHostConfigurationProperty(_vhostStoreConfigKeyPrefix + "class", "org.apache.qpid.server.store.berkeleydb.BDBHAMessageStore"); + + _testcase.setVirtualHostConfigurationProperty(_vhostStoreConfigKeyPrefix + "highAvailability.groupName", _groupName); + _testcase.setVirtualHostConfigurationProperty(_vhostStoreConfigKeyPrefix + "highAvailability.nodeName", nodeName); + _testcase.setVirtualHostConfigurationProperty(_vhostStoreConfigKeyPrefix + "highAvailability.nodeHostPort", getNodeHostPortForNodeAt(bdbPort)); + _testcase.setVirtualHostConfigurationProperty(_vhostStoreConfigKeyPrefix + "highAvailability.helperHostPort", getHelperHostPort()); + } + + public String getIpAddressOfBrokerHost() + { + String brokerHost = _testcase.getBroker().getHost(); + try + { + return InetAddress.getByName(brokerHost).getHostAddress(); + } + catch (UnknownHostException e) + { + throw new RuntimeException("Could not determine IP address of host : " + brokerHost, e); + } + } + + private void collectConfig(final int brokerPortNumber, TestBrokerConfiguration testConfiguration, XMLConfiguration testVirtualhosts) + { + _brokerConfigurations.put(brokerPortNumber, new BrokerConfigHolder(testConfiguration, + (XMLConfiguration) testVirtualhosts.clone())); + } + + public class BrokerConfigHolder + { + private final TestBrokerConfiguration _testConfiguration; + private final XMLConfiguration _testVirtualhosts; + + public BrokerConfigHolder(TestBrokerConfiguration testConfiguration, XMLConfiguration testVirtualhosts) + { + _testConfiguration = testConfiguration; + _testVirtualhosts = testVirtualhosts; + } + + public TestBrokerConfiguration getTestConfiguration() + { + return _testConfiguration; + } + + public XMLConfiguration getTestVirtualhosts() + { + return _testVirtualhosts; + } + } + + public void modifyClusterNodeBdbAddress(int brokerPortNumberToBeMoved, int newBdbPort) + { + final BrokerConfigHolder brokerConfigHolder = _brokerConfigurations.get(brokerPortNumberToBeMoved); + final XMLConfiguration virtualHostConfig = brokerConfigHolder.getTestVirtualhosts(); + + final String configKey = getConfigKey("highAvailability.nodeHostPort"); + final String oldBdbHostPort = virtualHostConfig.getString(configKey); + + final String[] oldHostAndPort = StringUtils.split(oldBdbHostPort, ":"); + final String oldHost = oldHostAndPort[0]; + + final String newBdbHostPort = oldHost + ":" + newBdbPort; + + virtualHostConfig.setProperty(configKey, newBdbHostPort); + collectConfig(brokerPortNumberToBeMoved, brokerConfigHolder.getTestConfiguration(), virtualHostConfig); + } + + public String getStoreConfigKeyPrefix() + { + return _vhostStoreConfigKeyPrefix; + } + + +} diff --git a/qpid/java/build.xml b/qpid/java/build.xml index 85117b739d..e761677f6d 100644 --- a/qpid/java/build.xml +++ b/qpid/java/build.xml @@ -23,7 +23,7 @@ - + diff --git a/qpid/java/module.xml b/qpid/java/module.xml index 7dc66f3929..3bd9f7381c 100644 --- a/qpid/java/module.xml +++ b/qpid/java/module.xml @@ -537,7 +537,14 @@ - + + + + + + + + -- cgit v1.2.1