summaryrefslogtreecommitdiff
path: root/qpid/java/bdbstore/src/test
diff options
context:
space:
mode:
authorRobert Gemmell <robbie@apache.org>2012-05-17 17:26:04 +0000
committerRobert Gemmell <robbie@apache.org>2012-05-17 17:26:04 +0000
commitf5d67044a9797c397764a7ac1aa1a1ed4aa893a3 (patch)
treecf8a9cf6a5f741e31417ca4d32a6b708bb3b9fdd /qpid/java/bdbstore/src/test
parentf523b9e510fc90ce3f7f7d7c2960f3bfee3d42df (diff)
downloadqpid-python-f5d67044a9797c397764a7ac1aa1a1ed4aa893a3.tar.gz
QPID-4006: add support for using BDB HA to form an active-passive cluster for persistent messaging
- Includes support for setting BDB configuration parameters via the store configuration, both for the existing store and the new HA variant. - Removes the MessageStoreFactory and reverts store configuration to historical values. Applied patch from Keith Wall, Andrew MacBean <andymacbean@gmail.com>, Oleksandr Rudyy <orudyy@gmail.com>, Philip Harvey <phil@philharveyonline.com>, and myself. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1339728 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/bdbstore/src/test')
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreManagerMBeanTest.java218
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreTest.java144
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java20
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterBlackboxTest.java163
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterManagementTest.java233
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterTwoNodeTest.java234
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterWhiteboxTest.java288
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAMessageStoreSmokeTest.java43
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java479
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderTest.java6
10 files changed, 1815 insertions, 13 deletions
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreManagerMBeanTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreManagerMBeanTest.java
new file mode 100644
index 0000000000..00f99b7097
--- /dev/null
+++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreManagerMBeanTest.java
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb;
+
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import javax.management.JMException;
+import javax.management.openmbean.CompositeData;
+import javax.management.openmbean.CompositeType;
+import javax.management.openmbean.TabularData;
+
+import junit.framework.TestCase;
+
+import org.apache.qpid.AMQStoreException;
+import org.apache.qpid.server.logging.SystemOutMessageLogger;
+import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.logging.actors.TestLogActor;
+
+public class BDBHAMessageStoreManagerMBeanTest extends TestCase
+{
+ private static final String TEST_GROUP_NAME = "testGroupName";
+ private static final String TEST_NODE_NAME = "testNodeName";
+ private static final String TEST_NODE_HOST_PORT = "host:1234";
+ private static final String TEST_HELPER_HOST_PORT = "host:5678";
+ private static final String TEST_REPLICATION_POLICY = "sync,sync,all";
+ private static final String TEST_NODE_STATE = "MASTER";
+ private static final String TEST_STORE_NAME = "testStoreName";
+ private static final boolean TEST_DESIGNATED_PRIMARY_FLAG = false;
+
+ private BDBHAMessageStore _store;
+ private BDBHAMessageStoreManagerMBean _mBean;
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ CurrentActor.set(new TestLogActor(new SystemOutMessageLogger()));
+ _store = mock(BDBHAMessageStore.class);
+ _mBean = new BDBHAMessageStoreManagerMBean(_store);
+ }
+
+ @Override
+ protected void tearDown() throws Exception
+ {
+ super.tearDown();
+ CurrentActor.remove();
+ }
+
+ public void testObjectName() throws Exception
+ {
+ when(_store.getName()).thenReturn(TEST_STORE_NAME);
+
+ String expectedObjectName = "org.apache.qpid:type=BDBHAMessageStore,name=" + TEST_STORE_NAME;
+ assertEquals(expectedObjectName, _mBean.getObjectName().toString());
+ }
+
+ public void testGroupName() throws Exception
+ {
+ when(_store.getGroupName()).thenReturn(TEST_GROUP_NAME);
+
+ assertEquals(TEST_GROUP_NAME, _mBean.getAttribute(ManagedBDBHAMessageStore.ATTR_GROUP_NAME));
+ }
+
+ public void testNodeName() throws Exception
+ {
+ when(_store.getNodeName()).thenReturn(TEST_NODE_NAME);
+
+ assertEquals(TEST_NODE_NAME, _mBean.getAttribute(ManagedBDBHAMessageStore.ATTR_NODE_NAME));
+ }
+
+ public void testNodeHostPort() throws Exception
+ {
+ when(_store.getNodeHostPort()).thenReturn(TEST_NODE_HOST_PORT);
+
+ assertEquals(TEST_NODE_HOST_PORT, _mBean.getAttribute(ManagedBDBHAMessageStore.ATTR_NODE_HOST_PORT));
+ }
+
+ public void testHelperHostPort() throws Exception
+ {
+ when(_store.getHelperHostPort()).thenReturn(TEST_HELPER_HOST_PORT);
+
+ assertEquals(TEST_HELPER_HOST_PORT, _mBean.getAttribute(ManagedBDBHAMessageStore.ATTR_HELPER_HOST_PORT));
+ }
+
+ public void testReplicationPolicy() throws Exception
+ {
+ when(_store.getReplicationPolicy()).thenReturn(TEST_REPLICATION_POLICY);
+
+ assertEquals(TEST_REPLICATION_POLICY, _mBean.getAttribute(ManagedBDBHAMessageStore.ATTR_REPLICATION_POLICY));
+ }
+
+ public void testNodeState() throws Exception
+ {
+ when(_store.getNodeState()).thenReturn(TEST_NODE_STATE);
+
+ assertEquals(TEST_NODE_STATE, _mBean.getAttribute(ManagedBDBHAMessageStore.ATTR_NODE_STATE));
+ }
+
+ public void testDesignatedPrimaryFlag() throws Exception
+ {
+ when(_store.isDesignatedPrimary()).thenReturn(TEST_DESIGNATED_PRIMARY_FLAG);
+
+ assertEquals(TEST_DESIGNATED_PRIMARY_FLAG, _mBean.getAttribute(ManagedBDBHAMessageStore.ATTR_DESIGNATED_PRIMARY));
+ }
+
+ public void testGroupMembersForGroupWithOneNode() throws Exception
+ {
+ List<Map<String, String>> members = Collections.singletonList(createTestNodeResult());
+ when(_store.getGroupMembers()).thenReturn(members);
+
+ final TabularData resultsTable = _mBean.getAllNodesInGroup();
+
+ assertTableHasHeadingsNamed(resultsTable, BDBHAMessageStore.GRP_MEM_COL_NODE_NAME, BDBHAMessageStore.GRP_MEM_COL_NODE_HOST_PORT);
+
+ final int numberOfDataRows = resultsTable.size();
+ assertEquals("Unexpected number of data rows", 1 ,numberOfDataRows);
+ final CompositeData row = (CompositeData) resultsTable.values().iterator().next();
+ assertEquals(TEST_NODE_NAME, row.get(BDBHAMessageStore.GRP_MEM_COL_NODE_NAME));
+ assertEquals(TEST_NODE_HOST_PORT, row.get(BDBHAMessageStore.GRP_MEM_COL_NODE_HOST_PORT));
+ }
+
+ public void testRemoveNodeFromReplicationGroup() throws Exception
+ {
+ _mBean.removeNodeFromGroup(TEST_NODE_NAME);
+
+ verify(_store).removeNodeFromGroup(TEST_NODE_NAME);
+ }
+
+ public void testRemoveNodeFromReplicationGroupWithError() throws Exception
+ {
+ doThrow(new AMQStoreException("mocked exception")).when(_store).removeNodeFromGroup(TEST_NODE_NAME);
+
+ try
+ {
+ _mBean.removeNodeFromGroup(TEST_NODE_NAME);
+ fail("Exception not thrown");
+ }
+ catch (JMException je)
+ {
+ // PASS
+ }
+ }
+
+ public void testSetAsDesignatedPrimary() throws Exception
+ {
+ _mBean.setDesignatedPrimary(true);
+
+ verify(_store).setDesignatedPrimary(true);
+ }
+
+ public void testSetAsDesignatedPrimaryWithError() throws Exception
+ {
+ doThrow(new AMQStoreException("mocked exception")).when(_store).setDesignatedPrimary(true);
+
+ try
+ {
+ _mBean.setDesignatedPrimary(true);
+ fail("Exception not thrown");
+ }
+ catch (JMException je)
+ {
+ // PASS
+ }
+ }
+
+ public void testUpdateAddress() throws Exception
+ {
+ String newHostName = "newHostName";
+ int newPort = 1967;
+
+ _mBean.updateAddress(TEST_NODE_NAME, newHostName, newPort);
+
+ verify(_store).updateAddress(TEST_NODE_NAME, newHostName, newPort);
+ }
+
+ private void assertTableHasHeadingsNamed(final TabularData resultsTable, String... headingNames)
+ {
+ CompositeType headingsRow = resultsTable.getTabularType().getRowType();
+ for (final String headingName : headingNames)
+ {
+ assertTrue("Table should have column with heading " + headingName, headingsRow.containsKey(headingName));
+ }
+ }
+
+ private Map<String, String> createTestNodeResult()
+ {
+ Map<String, String> items = new HashMap<String, String>();
+ items.put(BDBHAMessageStore.GRP_MEM_COL_NODE_NAME, TEST_NODE_NAME);
+ items.put(BDBHAMessageStore.GRP_MEM_COL_NODE_HOST_PORT, TEST_NODE_HOST_PORT);
+ return items;
+ }
+}
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreTest.java
new file mode 100644
index 0000000000..6f851bd94e
--- /dev/null
+++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreTest.java
@@ -0,0 +1,144 @@
+package org.apache.qpid.server.store.berkeleydb;
+
+import java.io.File;
+import java.net.InetAddress;
+
+import org.apache.commons.configuration.XMLConfiguration;
+import org.apache.qpid.server.configuration.ServerConfiguration;
+import org.apache.qpid.server.logging.SystemOutMessageLogger;
+import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.logging.actors.TestLogActor;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.util.TestApplicationRegistry;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.test.utils.QpidTestCase;
+import org.apache.qpid.util.FileUtils;
+
+import com.sleepycat.je.Environment;
+import com.sleepycat.je.EnvironmentConfig;
+import com.sleepycat.je.rep.ReplicatedEnvironment;
+import com.sleepycat.je.rep.ReplicationConfig;
+
+public class BDBHAMessageStoreTest extends QpidTestCase
+{
+ private static final String TEST_LOG_FILE_MAX = "1000000";
+ private static final String TEST_ELECTION_RETRIES = "1000";
+ private static final String TEST_NUMBER_OF_THREADS = "10";
+ private static final String TEST_ENV_CONSISTENCY_TIMEOUT = "9999999";
+ private String _groupName;
+ private String _workDir;
+ private int _masterPort;
+ private String _host;
+ private XMLConfiguration _configXml;
+
+ public void setUp() throws Exception
+ {
+ super.setUp();
+
+ _workDir = TMP_FOLDER + File.separator + getName();
+ _host = InetAddress.getByName("localhost").getHostAddress();
+ _groupName = "group" + getName();
+ _masterPort = -1;
+
+ FileUtils.delete(new File(_workDir), true);
+ _configXml = new XMLConfiguration();
+ }
+
+ public void tearDown() throws Exception
+ {
+ FileUtils.delete(new File(_workDir), true);
+ super.tearDown();
+ }
+
+ public void testSetSystemConfiguration() throws Exception
+ {
+ // create virtual host configuration, registry and host instance
+ addVirtualHostConfiguration();
+ TestApplicationRegistry registry = initialize();
+ try
+ {
+ VirtualHost virtualhost = registry.getVirtualHostRegistry().getVirtualHost("test" + _masterPort);
+ BDBHAMessageStore store = (BDBHAMessageStore) virtualhost.getMessageStore();
+
+ // test whether JVM system settings were applied
+ Environment env = store.getEnvironment();
+ assertEquals("Unexpected number of cleaner threads", TEST_NUMBER_OF_THREADS, env.getConfig().getConfigParam(EnvironmentConfig.CLEANER_THREADS));
+ assertEquals("Unexpected log file max", TEST_LOG_FILE_MAX, env.getConfig().getConfigParam(EnvironmentConfig.LOG_FILE_MAX));
+
+ ReplicatedEnvironment repEnv = store.getReplicatedEnvironment();
+ assertEquals("Unexpected number of elections primary retries", TEST_ELECTION_RETRIES,
+ repEnv.getConfig().getConfigParam(ReplicationConfig.ELECTIONS_PRIMARY_RETRIES));
+ assertEquals("Unexpected number of elections primary retries", TEST_ENV_CONSISTENCY_TIMEOUT,
+ repEnv.getConfig().getConfigParam(ReplicationConfig.ENV_CONSISTENCY_TIMEOUT));
+ }
+ finally
+ {
+ ApplicationRegistry.remove();
+ }
+ }
+
+ private void addVirtualHostConfiguration() throws Exception
+ {
+ int port = findFreePort();
+ if (_masterPort == -1)
+ {
+ _masterPort = port;
+ }
+ String nodeName = getNodeNameForNodeAt(port);
+
+ String vhostName = "test" + port;
+ String vhostPrefix = "virtualhosts.virtualhost." + vhostName;
+
+ _configXml.addProperty("virtualhosts.virtualhost.name", vhostName);
+ _configXml.addProperty(vhostPrefix + ".store.class", BDBHAMessageStore.class.getName());
+ _configXml.addProperty(vhostPrefix + ".store.environment-path", _workDir + File.separator
+ + port);
+ _configXml.addProperty(vhostPrefix + ".store.highAvailability.groupName", _groupName);
+ _configXml.addProperty(vhostPrefix + ".store.highAvailability.nodeName", nodeName);
+ _configXml.addProperty(vhostPrefix + ".store.highAvailability.nodeHostPort",
+ getNodeHostPortForNodeAt(port));
+ _configXml.addProperty(vhostPrefix + ".store.highAvailability.helperHostPort",
+ getHelperHostPort());
+
+ _configXml.addProperty(vhostPrefix + ".store.envConfig(-1).name", EnvironmentConfig.CLEANER_THREADS);
+ _configXml.addProperty(vhostPrefix + ".store.envConfig.value", TEST_NUMBER_OF_THREADS);
+
+ _configXml.addProperty(vhostPrefix + ".store.envConfig(-1).name", EnvironmentConfig.LOG_FILE_MAX);
+ _configXml.addProperty(vhostPrefix + ".store.envConfig.value", TEST_LOG_FILE_MAX);
+
+ _configXml.addProperty(vhostPrefix + ".store.repConfig(-1).name", ReplicationConfig.ELECTIONS_PRIMARY_RETRIES);
+ _configXml.addProperty(vhostPrefix + ".store.repConfig.value", TEST_ELECTION_RETRIES);
+
+ _configXml.addProperty(vhostPrefix + ".store.repConfig(-1).name", ReplicationConfig.ENV_CONSISTENCY_TIMEOUT);
+ _configXml.addProperty(vhostPrefix + ".store.repConfig.value", TEST_ENV_CONSISTENCY_TIMEOUT);
+ }
+
+ private String getNodeNameForNodeAt(final int bdbPort)
+ {
+ return "node" + getName() + bdbPort;
+ }
+
+ private String getNodeHostPortForNodeAt(final int bdbPort)
+ {
+ return _host + ":" + bdbPort;
+ }
+
+ private String getHelperHostPort()
+ {
+ if (_masterPort == -1)
+ {
+ throw new IllegalStateException("Helper port not yet assigned.");
+ }
+ return _host + ":" + _masterPort;
+ }
+
+ private TestApplicationRegistry initialize() throws Exception
+ {
+ CurrentActor.set(new TestLogActor(new SystemOutMessageLogger()));
+ ServerConfiguration configuration = new ServerConfiguration(_configXml);
+ TestApplicationRegistry registry = new TestApplicationRegistry(configuration);
+ ApplicationRegistry.initialise(registry);
+ registry.getVirtualHostRegistry().setDefaultVirtualHostName("test" + _masterPort);
+ return registry;
+ }
+}
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
index a318187f13..591bc27d1e 100644
--- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
+++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
@@ -70,7 +70,7 @@ public class BDBMessageStoreTest extends org.apache.qpid.server.store.MessageSto
{
MessageStore store = getVirtualHost().getMessageStore();
- BDBMessageStore bdbStore = assertBDBStore(store);
+ AbstractBDBMessageStore bdbStore = assertBDBStore(store);
// Create content ByteBuffers.
// Split the content into 2 chunks for the 0-8 message, as per broker behaviour.
@@ -220,11 +220,11 @@ public class BDBMessageStoreTest extends org.apache.qpid.server.store.MessageSto
* Use this method instead of reloading the virtual host like other tests in order
* to avoid the recovery handler deleting the message for not being on a queue.
*/
- private BDBMessageStore reloadStore(BDBMessageStore messageStore) throws Exception
+ private AbstractBDBMessageStore reloadStore(AbstractBDBMessageStore messageStore) throws Exception
{
messageStore.close();
- BDBMessageStore newStore = new BDBMessageStore();
+ AbstractBDBMessageStore newStore = new BDBMessageStore();
newStore.configure("", _config.subset("store"));
newStore.startWithNoRecover();
@@ -282,7 +282,7 @@ public class BDBMessageStoreTest extends org.apache.qpid.server.store.MessageSto
public void testGetContentWithOffset() throws Exception
{
MessageStore store = getVirtualHost().getMessageStore();
- BDBMessageStore bdbStore = assertBDBStore(store);
+ AbstractBDBMessageStore bdbStore = assertBDBStore(store);
StoredMessage<MessageMetaData> storedMessage_0_8 = createAndStoreSingleChunkMessage_0_8(store);
long messageid_0_8 = storedMessage_0_8.getMessageNumber();
@@ -342,7 +342,7 @@ public class BDBMessageStoreTest extends org.apache.qpid.server.store.MessageSto
public void testMessageCreationAndRemoval() throws Exception
{
MessageStore store = getVirtualHost().getMessageStore();
- BDBMessageStore bdbStore = assertBDBStore(store);
+ AbstractBDBMessageStore bdbStore = assertBDBStore(store);
StoredMessage<MessageMetaData> storedMessage_0_8 = createAndStoreSingleChunkMessage_0_8(store);
long messageid_0_8 = storedMessage_0_8.getMessageNumber();
@@ -367,12 +367,12 @@ public class BDBMessageStoreTest extends org.apache.qpid.server.store.MessageSto
assertEquals("Retrieved content when none was expected",
0, bdbStore.getContent(messageid_0_8, 0, dst));
}
- private BDBMessageStore assertBDBStore(MessageStore store)
+ private AbstractBDBMessageStore assertBDBStore(MessageStore store)
{
assertEquals("Test requires an instance of BDBMessageStore to proceed", BDBMessageStore.class, store.getClass());
- return (BDBMessageStore) store;
+ return (AbstractBDBMessageStore) store;
}
private StoredMessage<MessageMetaData> createAndStoreSingleChunkMessage_0_8(MessageStore store)
@@ -405,7 +405,7 @@ public class BDBMessageStoreTest extends org.apache.qpid.server.store.MessageSto
{
MessageStore log = getVirtualHost().getMessageStore();
- BDBMessageStore bdbStore = assertBDBStore(log);
+ AbstractBDBMessageStore bdbStore = assertBDBStore(log);
final UUID mockQueueId = UUIDGenerator.generateUUID();
TransactionLogResource mockQueue = new TransactionLogResource()
@@ -443,7 +443,7 @@ public class BDBMessageStoreTest extends org.apache.qpid.server.store.MessageSto
{
MessageStore log = getVirtualHost().getMessageStore();
- BDBMessageStore bdbStore = assertBDBStore(log);
+ AbstractBDBMessageStore bdbStore = assertBDBStore(log);
final UUID mockQueueId = UUIDGenerator.generateUUID();
TransactionLogResource mockQueue = new TransactionLogResource()
@@ -484,7 +484,7 @@ public class BDBMessageStoreTest extends org.apache.qpid.server.store.MessageSto
{
MessageStore log = getVirtualHost().getMessageStore();
- BDBMessageStore bdbStore = assertBDBStore(log);
+ AbstractBDBMessageStore bdbStore = assertBDBStore(log);
final UUID mockQueueId = UUIDGenerator.generateUUID();
TransactionLogResource mockQueue = new TransactionLogResource()
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterBlackboxTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterBlackboxTest.java
new file mode 100644
index 0000000000..afe0435901
--- /dev/null
+++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterBlackboxTest.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb;
+
+import java.io.File;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.Connection;
+import javax.jms.Session;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.jms.ConnectionListener;
+import org.apache.qpid.jms.ConnectionURL;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+
+/**
+ * The HA black box tests test the BDB cluster as a opaque unit. Client connects to
+ * the cluster via a failover url
+ *
+ * @see HAClusterWhiteboxTest
+ */
+public class HAClusterBlackboxTest extends QpidBrokerTestCase
+{
+ protected static final Logger LOGGER = Logger.getLogger(HAClusterBlackboxTest.class);
+
+ private static final String VIRTUAL_HOST = "test";
+ private static final int NUMBER_OF_NODES = 3;
+
+ private final HATestClusterCreator _clusterCreator = new HATestClusterCreator(this, VIRTUAL_HOST, NUMBER_OF_NODES);
+
+ private FailoverAwaitingListener _failoverAwaitingListener;
+ private ConnectionURL _brokerFailoverUrl;
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ _brokerType = BrokerType.SPAWNED;
+
+ assertTrue(isJavaBroker());
+ assertTrue(isBrokerStorePersistent());
+
+ setBrokerEnvironment("QPID_OPTS", "-Djava.util.logging.config.file=" + System.getProperty(QPID_HOME)
+ + File.separator + "etc" + File.separator + "log.properties");
+
+ _clusterCreator.configureClusterNodes();
+
+ _brokerFailoverUrl = _clusterCreator.getConnectionUrlForAllClusterNodes();
+
+ _clusterCreator.startCluster();
+ _failoverAwaitingListener = new FailoverAwaitingListener();
+
+ super.setUp();
+ }
+
+ @Override
+ public void startBroker() throws Exception
+ {
+ // Don't start default broker provided by QBTC.
+ }
+
+ public void testLossOfActiveNodeCausesClientToFailover() throws Exception
+ {
+ final Connection connection = getConnection(_brokerFailoverUrl);
+
+ ((AMQConnection)connection).setConnectionListener(_failoverAwaitingListener);
+
+ final int activeBrokerPort = _clusterCreator.getBrokerPortNumberFromConnection(connection);
+ LOGGER.info("Active connection port " + activeBrokerPort);
+
+ _clusterCreator.stopNode(activeBrokerPort);
+ LOGGER.info("Node is stopped");
+ _failoverAwaitingListener.assertFailoverOccurs(20000);
+ LOGGER.info("Listener has finished");
+ // any op to ensure connection remains
+ connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ }
+
+ public void testLossOfInactiveNodeDoesNotCauseClientToFailover() throws Exception
+ {
+ LOGGER.info("Connecting to " + _brokerFailoverUrl);
+ final Connection connection = getConnection(_brokerFailoverUrl);
+
+ ((AMQConnection)connection).setConnectionListener(_failoverAwaitingListener);
+ final int activeBrokerPort = _clusterCreator.getBrokerPortNumberFromConnection(connection);
+ LOGGER.info("Active connection port " + activeBrokerPort);
+ final int inactiveBrokerPort = _clusterCreator.getPortNumberOfAnInactiveBroker(connection);
+
+ LOGGER.info("Stopping inactive broker on port " + inactiveBrokerPort);
+
+ _clusterCreator.stopNode(inactiveBrokerPort);
+
+ _failoverAwaitingListener.assertFailoverDoesNotOccur(2000);
+
+ // any op to ensure connection remains
+ connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ }
+
+ private final class FailoverAwaitingListener implements ConnectionListener
+ {
+ private final CountDownLatch _failoverLatch = new CountDownLatch(1);
+
+ @Override
+ public boolean preResubscribe()
+ {
+ return true;
+ }
+
+ @Override
+ public boolean preFailover(boolean redirect)
+ {
+ return true;
+ }
+
+ public void assertFailoverOccurs(long delay) throws InterruptedException
+ {
+ _failoverLatch.await(delay, TimeUnit.MILLISECONDS);
+ assertEquals("Failover did not occur", 0, _failoverLatch.getCount());
+ }
+
+ public void assertFailoverDoesNotOccur(long delay) throws InterruptedException
+ {
+ _failoverLatch.await(delay, TimeUnit.MILLISECONDS);
+ assertEquals("Failover occurred unexpectedly", 1L, _failoverLatch.getCount());
+ }
+
+
+ @Override
+ public void failoverComplete()
+ {
+ _failoverLatch.countDown();
+ }
+
+ @Override
+ public void bytesSent(long count)
+ {
+ }
+
+ @Override
+ public void bytesReceived(long count)
+ {
+ }
+ }
+
+}
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterManagementTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterManagementTest.java
new file mode 100644
index 0000000000..1afa45fd5a
--- /dev/null
+++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterManagementTest.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb;
+
+import static com.sleepycat.je.rep.ReplicatedEnvironment.State.DETACHED;
+import static com.sleepycat.je.rep.ReplicatedEnvironment.State.MASTER;
+import static com.sleepycat.je.rep.ReplicatedEnvironment.State.REPLICA;
+import static com.sleepycat.je.rep.ReplicatedEnvironment.State.UNKNOWN;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
+
+import javax.jms.Connection;
+import javax.management.openmbean.CompositeData;
+import javax.management.openmbean.TabularData;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.jms.ConnectionURL;
+import org.apache.qpid.test.utils.JMXTestUtils;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+
+import com.sleepycat.je.EnvironmentFailureException;
+
+/**
+ * System test verifying the ability to control a cluster via the Management API.
+ *
+ * @see HAClusterBlackboxTest
+ */
+public class HAClusterManagementTest extends QpidBrokerTestCase
+{
+ protected static final Logger LOGGER = Logger.getLogger(HAClusterManagementTest.class);
+
+ private static final Set<String> NON_MASTER_STATES = new HashSet<String>(Arrays.asList(REPLICA.toString(), DETACHED.toString(), UNKNOWN.toString()));;
+ private static final String VIRTUAL_HOST = "test";
+
+ private static final String MANAGED_OBJECT_QUERY = "org.apache.qpid:type=BDBHAMessageStore,name=" + VIRTUAL_HOST;
+ private static final int NUMBER_OF_NODES = 4;
+
+ private final HATestClusterCreator _clusterCreator = new HATestClusterCreator(this, VIRTUAL_HOST, NUMBER_OF_NODES);
+ private final JMXTestUtils _jmxUtils = new JMXTestUtils(this);
+
+ private ConnectionURL _brokerFailoverUrl;
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ _brokerType = BrokerType.SPAWNED;
+
+ assertTrue(isJavaBroker());
+ assertTrue(isBrokerStorePersistent());
+ _jmxUtils.setUp();
+
+ _clusterCreator.configureClusterNodes();
+ _brokerFailoverUrl = _clusterCreator.getConnectionUrlForAllClusterNodes();
+ _clusterCreator.startCluster();
+
+ super.setUp();
+ }
+
+ @Override
+ protected void tearDown() throws Exception
+ {
+ try
+ {
+ _jmxUtils.close();
+ }
+ finally
+ {
+ super.tearDown();
+ }
+ }
+
+ @Override
+ public void startBroker() throws Exception
+ {
+ // Don't start default broker provided by QBTC.
+ }
+
+ public void testReadonlyMBeanAttributes() throws Exception
+ {
+ final int brokerPortNumber = getBrokerPortNumbers().iterator().next();
+ final int bdbPortNumber = _clusterCreator.getBdbPortForBrokerPort(brokerPortNumber);
+
+ ManagedBDBHAMessageStore storeBean = getStoreBeanForNodeAtBrokerPort(brokerPortNumber);
+ assertEquals("Unexpected store group name", _clusterCreator.getGroupName(), storeBean.getGroupName());
+ assertEquals("Unexpected store node name", _clusterCreator.getNodeNameForNodeAt(bdbPortNumber), storeBean.getNodeName());
+ assertEquals("Unexpected store node host port",_clusterCreator.getNodeHostPortForNodeAt(bdbPortNumber), storeBean.getNodeHostPort());
+ assertEquals("Unexpected store helper host port", _clusterCreator.getHelperHostPort(), storeBean.getHelperHostPort());
+ // As we have chosen an arbitrary broker from the cluster, we cannot predict its state
+ assertNotNull("Store state must not be null", storeBean.getNodeState());
+ }
+
+ public void testStateOfActiveBrokerIsMaster() throws Exception
+ {
+ final Connection activeConnection = getConnection(_brokerFailoverUrl);
+ final int activeBrokerPortNumber = _clusterCreator.getBrokerPortNumberFromConnection(activeConnection);
+
+ ManagedBDBHAMessageStore storeBean = getStoreBeanForNodeAtBrokerPort(activeBrokerPortNumber);
+ assertEquals("Unexpected store state", MASTER.toString(), storeBean.getNodeState());
+ }
+
+ public void testStateOfNonActiveBrokerIsNotMaster() throws Exception
+ {
+ final Connection activeConnection = getConnection(_brokerFailoverUrl);
+ final int inactiveBrokerPortNumber = _clusterCreator.getPortNumberOfAnInactiveBroker(activeConnection);
+ ManagedBDBHAMessageStore storeBean = getStoreBeanForNodeAtBrokerPort(inactiveBrokerPortNumber);
+ final String nodeState = storeBean.getNodeState();
+ assertTrue("Unexpected store state : " + nodeState, NON_MASTER_STATES.contains(nodeState));
+ }
+
+ public void testGroupMembers() throws Exception
+ {
+ final int brokerPortNumber = getBrokerPortNumbers().iterator().next();
+
+ ManagedBDBHAMessageStore storeBean = getStoreBeanForNodeAtBrokerPort(brokerPortNumber);
+ final TabularData groupMembers = storeBean.getAllNodesInGroup();
+ assertNotNull(groupMembers);
+
+ final int numberOfDataRows = groupMembers.size();
+ assertEquals("Unexpected number of data rows", NUMBER_OF_NODES ,numberOfDataRows);
+
+ for(int bdbPortNumber : _clusterCreator.getBdbPortNumbers())
+ {
+ final String nodeName = _clusterCreator.getNodeNameForNodeAt(bdbPortNumber);
+ final String nodeHostPort = _clusterCreator.getNodeHostPortForNodeAt(bdbPortNumber);
+
+ CompositeData row = groupMembers.get(new Object[] {nodeName});
+ assertNotNull("Table does not contain row for node name " + nodeName, row);
+ assertEquals(nodeHostPort, row.get(BDBHAMessageStore.GRP_MEM_COL_NODE_HOST_PORT));
+ }
+ }
+
+ public void testRemoveNodeFromGroup() throws Exception
+ {
+ final Iterator<Integer> brokerPortNumberIterator = getBrokerPortNumbers().iterator();
+ final int brokerPortNumberToMakeObservation = brokerPortNumberIterator.next();
+ final int brokerPortNumberToBeRemoved = brokerPortNumberIterator.next();
+ final ManagedBDBHAMessageStore storeBean = getStoreBeanForNodeAtBrokerPort(brokerPortNumberToMakeObservation);
+ final int numberOfDataRows = storeBean.getAllNodesInGroup().size();
+ assertEquals("Unexpected number of data rows before test", NUMBER_OF_NODES ,numberOfDataRows);
+
+ final String removedNodeName = _clusterCreator.getNodeNameForNodeAt(_clusterCreator.getBdbPortForBrokerPort(brokerPortNumberToBeRemoved));
+ _clusterCreator.stopNode(brokerPortNumberToBeRemoved);
+ storeBean.removeNodeFromGroup(removedNodeName);
+
+ final int numberOfDataRowsAfterRemoval = storeBean.getAllNodesInGroup().size();
+ assertEquals("Unexpected number of data rows before test", NUMBER_OF_NODES - 1,numberOfDataRowsAfterRemoval);
+ }
+
+ /**
+ * Updates the address of a node.
+ *
+ * If the broker (node) can subsequently start without error then the update was a success, hence no need for an explicit
+ * assert.
+ *
+ * @see #testRestartNodeWithNewPortNumberWithoutFirstCallingUpdateAddressThrowsAnException() for converse case
+ */
+ public void testUpdateAddress() throws Exception
+ {
+ final Iterator<Integer> brokerPortNumberIterator = getBrokerPortNumbers().iterator();
+ final int brokerPortNumberToPerformUpdate = brokerPortNumberIterator.next();
+ final int brokerPortNumberToBeMoved = brokerPortNumberIterator.next();
+ final ManagedBDBHAMessageStore storeBean = getStoreBeanForNodeAtBrokerPort(brokerPortNumberToPerformUpdate);
+
+ _clusterCreator.stopNode(brokerPortNumberToBeMoved);
+
+ final int oldBdbPort = _clusterCreator.getBdbPortForBrokerPort(brokerPortNumberToBeMoved);
+ final int newBdbPort = getNextAvailable(oldBdbPort + 1);
+
+ storeBean.updateAddress(_clusterCreator.getNodeNameForNodeAt(oldBdbPort), _clusterCreator.getIpAddressOfBrokerHost(), newBdbPort);
+
+ _clusterCreator.modifyClusterNodeBdbAddress(brokerPortNumberToBeMoved, newBdbPort);
+
+ _clusterCreator.startNode(brokerPortNumberToBeMoved);
+ }
+
+ /**
+ * @see #testUpdateAddress()
+ */
+ public void testRestartNodeWithNewPortNumberWithoutFirstCallingUpdateAddressThrowsAnException() throws Exception
+ {
+ final Iterator<Integer> brokerPortNumberIterator = getBrokerPortNumbers().iterator();
+ final int brokerPortNumberToBeMoved = brokerPortNumberIterator.next();
+
+ _clusterCreator.stopNode(brokerPortNumberToBeMoved);
+
+ final int oldBdbPort = _clusterCreator.getBdbPortForBrokerPort(brokerPortNumberToBeMoved);
+ final int newBdbPort = getNextAvailable(oldBdbPort + 1);
+
+ // now deliberately don't call updateAddress
+
+ _clusterCreator.modifyClusterNodeBdbAddress(brokerPortNumberToBeMoved, newBdbPort);
+
+ try
+ {
+ _clusterCreator.startNode(brokerPortNumberToBeMoved);
+ fail("Exception not thrown");
+ }
+ catch(RuntimeException rte)
+ {
+ //check cause was BDBs EnvironmentFailureException
+ assertTrue(rte.getMessage().contains(EnvironmentFailureException.class.getName()));
+ // PASS
+ }
+ }
+
+ private ManagedBDBHAMessageStore getStoreBeanForNodeAtBrokerPort(
+ final int activeBrokerPortNumber) throws Exception
+ {
+ _jmxUtils.open(activeBrokerPortNumber);
+
+ return _jmxUtils.getManagedObject(ManagedBDBHAMessageStore.class, MANAGED_OBJECT_QUERY);
+ }
+}
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterTwoNodeTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterTwoNodeTest.java
new file mode 100644
index 0000000000..5f995ae25d
--- /dev/null
+++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterTwoNodeTest.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb;
+
+import java.io.File;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+
+import org.apache.qpid.jms.ConnectionURL;
+import org.apache.qpid.test.utils.JMXTestUtils;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+
+import com.sleepycat.je.rep.ReplicationConfig;
+
+public class HAClusterTwoNodeTest extends QpidBrokerTestCase
+{
+ private static final long RECEIVE_TIMEOUT = 5000l;
+
+ private static final String VIRTUAL_HOST = "test";
+
+ private static final String MANAGED_OBJECT_QUERY = "org.apache.qpid:type=BDBHAMessageStore,name=" + VIRTUAL_HOST;
+ private static final int NUMBER_OF_NODES = 2;
+
+ private final HATestClusterCreator _clusterCreator = new HATestClusterCreator(this, VIRTUAL_HOST, NUMBER_OF_NODES);
+ private final JMXTestUtils _jmxUtils = new JMXTestUtils(this);
+
+ private ConnectionURL _brokerFailoverUrl;
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ _brokerType = BrokerType.SPAWNED;
+
+ assertTrue(isJavaBroker());
+ assertTrue(isBrokerStorePersistent());
+ _jmxUtils.setUp();
+
+ super.setUp();
+ }
+
+ @Override
+ protected void tearDown() throws Exception
+ {
+ try
+ {
+ _jmxUtils.close();
+ }
+ finally
+ {
+ super.tearDown();
+ }
+ }
+
+ @Override
+ public void startBroker() throws Exception
+ {
+ // Don't start default broker provided by QBTC.
+ }
+
+ private void startCluster(boolean autoDesignedPrimary) throws Exception
+ {
+ setSystemProperty("java.util.logging.config.file",
+ System.getProperty(QPID_HOME) + File.separator + "etc" + File.separator + "log.properties");
+
+ String vhostPrefix = "virtualhosts.virtualhost." + VIRTUAL_HOST;
+
+ setConfigurationProperty(vhostPrefix + ".store.repConfig(0).name", ReplicationConfig.INSUFFICIENT_REPLICAS_TIMEOUT);
+ setConfigurationProperty(vhostPrefix + ".store.repConfig(0).value", "2 s");
+
+ setConfigurationProperty(vhostPrefix + ".store.repConfig(1).name", ReplicationConfig.ELECTIONS_PRIMARY_RETRIES);
+ setConfigurationProperty(vhostPrefix + ".store.repConfig(1).value", "0");
+
+ _clusterCreator.configureClusterNodes();
+ _clusterCreator.setAutoDesignatedPrimary(autoDesignedPrimary);
+ _brokerFailoverUrl = _clusterCreator.getConnectionUrlForAllClusterNodes();
+ _clusterCreator.startCluster();
+ }
+
+ /**
+ * Tests that a two node cluster, in which the master CAN automatically designate itself primary
+ * (after becoming master) continues to operate after being shut down and restarted.
+ *
+ * The test does not concern itself with which broker becomes master at any given point
+ * (which is likely to swap during the test).
+ */
+ public void testClusterRestartWithAutoDesignatedPrimary() throws Exception
+ {
+ testClusterRestartImpl(true);
+ }
+
+ /**
+ * Tests that a two node cluster, in which the master can NOT automatically designate itself
+ * primary (after becoming master) continues to operate after being shut down and restarted.
+ *
+ * The test does not concern itself with which broker becomes master at any given point
+ * (which is likely to swap during the test).
+ */
+ public void testClusterRestartWithoutAutoDesignatedPrimary() throws Exception
+ {
+ testClusterRestartImpl(false);
+ }
+
+ private void testClusterRestartImpl(boolean autoDesignatedPrimary) throws Exception
+ {
+ startCluster(autoDesignatedPrimary);
+ final Connection initialConnection = getConnection(_brokerFailoverUrl);
+ assertProducingConsuming(initialConnection);
+ initialConnection.close();
+ _clusterCreator.stopCluster();
+ _clusterCreator.startClusterParallel();
+ final Connection secondConnection = getConnection(_brokerFailoverUrl);
+ assertProducingConsuming(secondConnection);
+ secondConnection.close();
+ }
+
+ /**
+ * This test make sure than JMS operations are still working after stopping replica
+ * when master is designated primary (which is by default).
+ * <p>
+ * When master is not designated primary this test should fail.
+ */
+ public void testAutoDesignatedPrimaryContinuesAfterSecondaryStopped() throws Exception
+ {
+ startCluster(true);
+ _clusterCreator.stopNode(_clusterCreator.getBrokerPortNumberOfSecondaryNode());
+ final Connection connection = getConnection(_brokerFailoverUrl);
+ assertNotNull("Expected to get a valid connection to primary", connection);
+ assertProducingConsuming(connection);
+ }
+
+ public void testPersistentOperationsFailOnNonAutoDesignatedPrimarysAfterSecondaryStopped() throws Exception
+ {
+
+ startCluster(false);
+ _clusterCreator.stopNode(_clusterCreator.getBrokerPortNumberOfSecondaryNode());
+ final Connection connection = getConnection(_brokerFailoverUrl);
+ assertNotNull("Expected to get a valid connection to primary", connection);
+ try
+ {
+ assertProducingConsuming(connection);
+ fail("JMS peristent operations succeded on Master 'not designated primary' buy they should fail as replica is not available");
+ }
+ catch(JMSException e)
+ {
+ // JMSException should be thrown on transaction start/commit
+ }
+ }
+
+ public void testSecondaryDoesNotBecomePrimaryWhenAutoDesignatedPrimaryStopped() throws Exception
+ {
+ startCluster(true);
+ _clusterCreator.stopNode(_clusterCreator.getBrokerPortNumberOfPrimary());
+
+ try
+ {
+ getConnection(_brokerFailoverUrl);
+ fail("Connection not expected");
+ }
+ catch (JMSException e)
+ {
+ // PASS
+ }
+ }
+
+ public void testInitialDesignatedPrimaryStateOfNodes() throws Exception
+ {
+ startCluster(true);
+ final ManagedBDBHAMessageStore primaryStoreBean = getStoreBeanForNodeAtBrokerPort(_clusterCreator.getBrokerPortNumberOfPrimary());
+ assertTrue("Expected primary node to be set as designated primary", primaryStoreBean.getDesignatedPrimary());
+
+ final ManagedBDBHAMessageStore secondaryStoreBean = getStoreBeanForNodeAtBrokerPort(_clusterCreator.getBrokerPortNumberOfSecondaryNode());
+ assertFalse("Expected secondary node to NOT be set as designated primary", secondaryStoreBean.getDesignatedPrimary());
+ }
+
+ public void testSecondaryDesignatedAsPrimaryAfterOrginalPrimaryStopped() throws Exception
+ {
+ startCluster(true);
+ _clusterCreator.stopNode(_clusterCreator.getBrokerPortNumberOfPrimary());
+ final ManagedBDBHAMessageStore storeBean = getStoreBeanForNodeAtBrokerPort(_clusterCreator.getBrokerPortNumberOfSecondaryNode());
+
+ assertFalse("Expected node to NOT be set as designated primary", storeBean.getDesignatedPrimary());
+ storeBean.setDesignatedPrimary(true);
+ assertTrue("Expected node to now be set as designated primary", storeBean.getDesignatedPrimary());
+
+ final Connection connection = getConnection(_brokerFailoverUrl);
+ assertNotNull("Expected to get a valid connection to new primary", connection);
+ assertProducingConsuming(connection);
+ }
+
+ private ManagedBDBHAMessageStore getStoreBeanForNodeAtBrokerPort(
+ final int activeBrokerPortNumber) throws Exception
+ {
+ _jmxUtils.open(activeBrokerPortNumber);
+
+ ManagedBDBHAMessageStore storeBean = _jmxUtils.getManagedObject(ManagedBDBHAMessageStore.class, MANAGED_OBJECT_QUERY);
+ return storeBean;
+ }
+
+ private void assertProducingConsuming(final Connection connection) throws JMSException, Exception
+ {
+ Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+ Destination destination = session.createQueue(getTestQueueName());
+ MessageConsumer consumer = session.createConsumer(destination);
+ sendMessage(session, destination, 1);
+ connection.start();
+ Message m1 = consumer.receive(RECEIVE_TIMEOUT);
+ assertNotNull("Message 1 is not received", m1);
+ assertEquals("Unexpected first message received", 0, m1.getIntProperty(INDEX));
+ session.commit();
+ }
+
+}
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterWhiteboxTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterWhiteboxTest.java
new file mode 100644
index 0000000000..4b64466ff2
--- /dev/null
+++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterWhiteboxTest.java
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.Queue;
+import javax.jms.Session;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+import org.apache.qpid.url.URLSyntaxException;
+
+import com.sleepycat.je.rep.InsufficientLogException;
+
+/**
+ * The HA white box tests test the BDB cluster where the test retains the knowledge of the
+ * individual test nodes. It uses this knowledge to examine the nodes to ensure that they
+ * remain in the correct state throughout the test.
+ *
+ * @see HAClusterBlackboxTest
+ */
+public class HAClusterWhiteboxTest extends QpidBrokerTestCase
+{
+ protected static final Logger LOGGER = Logger.getLogger(HAClusterWhiteboxTest.class);
+
+ private static final String VIRTUAL_HOST = "test";
+
+ private final int NUMBER_OF_NODES = 3;
+ private final HATestClusterCreator _clusterCreator = new HATestClusterCreator(this, VIRTUAL_HOST, NUMBER_OF_NODES);
+
+ // TODO Test for node falling behind
+ // TODO Factory refactoring?? // MessageStore construction??
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ _brokerType = BrokerType.SPAWNED;
+
+ assertTrue(isJavaBroker());
+ assertTrue(isBrokerStorePersistent());
+
+ setBrokerEnvironment("QPID_OPTS", "-Djava.util.logging.config.file=" + System.getProperty(QPID_HOME)
+ + File.separator + "etc" + File.separator + "log.properties");
+
+ _clusterCreator.configureClusterNodes();
+ _clusterCreator.startCluster();
+
+ super.setUp();
+ }
+
+ @Override
+ public void startBroker() throws Exception
+ {
+ // Don't start default broker provided by QBTC.
+ }
+
+ public void testClusterPermitsConnectionToOnlyOneNode() throws Exception
+ {
+ int connectionSuccesses = 0;
+ int connectionFails = 0;
+
+ for (int brokerPortNumber : getBrokerPortNumbers())
+ {
+ try
+ {
+ getConnection(_clusterCreator.getConnectionUrlForSingleNode(brokerPortNumber));
+ connectionSuccesses++;
+ }
+ catch(JMSException e)
+ {
+ assertTrue(e.getMessage().contains("Virtual host '" + VIRTUAL_HOST + "' is not active"));
+ connectionFails++;
+ }
+ }
+
+ assertEquals("Unexpected number of failed connections", NUMBER_OF_NODES - 1, connectionFails);
+ assertEquals("Unexpected number of successful connections", 1, connectionSuccesses);
+ }
+
+ public void testClusterThatLosesNodeStillAllowsConnection() throws Exception
+ {
+ final Connection initialConnection = getConnectionToNodeInCluster();
+ assertNotNull(initialConnection);
+
+ killConnectionBrokerAndWaitForNewMasterElection(initialConnection);
+
+ final Connection subsequentConnection = getConnectionToNodeInCluster();
+ assertNotNull(subsequentConnection);
+
+ // verify that JMS persistence operations are working
+ assertProducingConsuming(subsequentConnection);
+
+ closeConnection(initialConnection);
+ }
+
+ public void testClusterThatLosesAllButOneNodeRefusesConnection() throws Exception
+ {
+ final Connection initialConnection = getConnectionToNodeInCluster();
+ assertNotNull(initialConnection);
+
+ killConnectionBrokerAndWaitForNewMasterElection(initialConnection);
+
+ final Connection subsequentConnection = getConnectionToNodeInCluster();
+ assertNotNull(subsequentConnection);
+ final int subsequentPortNumber = _clusterCreator.getBrokerPortNumberFromConnection(subsequentConnection);
+
+ killBroker(subsequentPortNumber);
+
+ final Connection finalConnection = getConnectionToNodeInCluster();
+ assertNull(finalConnection);
+
+ closeConnection(initialConnection);
+ }
+
+ public void testClusterWithRestartedNodeStillAllowsConnection() throws Exception
+ {
+ final Connection connection = getConnectionToNodeInCluster();
+ assertNotNull(connection);
+
+ final int brokerPortNumber = _clusterCreator.getBrokerPortNumberFromConnection(connection);
+
+ _clusterCreator.stopNode(brokerPortNumber);
+ _clusterCreator.startNode(brokerPortNumber);
+
+ final Connection subsequentConnection = getConnectionToNodeInCluster();
+ assertNotNull(subsequentConnection);
+ }
+
+ public void testClusterLosingNodeRetainsData() throws Exception
+ {
+ final Connection initialConnection = getConnectionToNodeInCluster();
+
+ final String queueNamePrefix = getTestQueueName();
+ final String inbuiltExchangeQueueUrl = "direct://amq.direct/" + queueNamePrefix + "1/" + queueNamePrefix + "1?durable='true'";
+ final String customExchangeQueueUrl = "direct://my.exchange/" + queueNamePrefix + "2/" + queueNamePrefix + "2?durable='true'";
+
+ populateBrokerWithData(initialConnection, inbuiltExchangeQueueUrl, customExchangeQueueUrl);
+
+ killConnectionBrokerAndWaitForNewMasterElection(initialConnection);
+
+ final Connection subsequentConnection = getConnectionToNodeInCluster();
+
+ assertNotNull("no valid connection obtained", subsequentConnection);
+
+ checkBrokerData(subsequentConnection, inbuiltExchangeQueueUrl, customExchangeQueueUrl);
+ }
+
+ public void testRecoveryOfOutOfDateNode() throws Exception
+ {
+ /*
+ * TODO: Implement
+ *
+ * Cant yet find a way to control cleaning in a deterministic way to allow provoking
+ * a node to become out of date. We do now know that even a new joiner to the group
+ * can throw the InsufficientLogException, so ensuring an existing cluster of nodes has
+ * done *any* cleaning and then adding a new node should be sufficient to cause this.
+ */
+ }
+
+ private void populateBrokerWithData(final Connection connection, final String... queueUrls) throws JMSException, Exception
+ {
+ populateBrokerWithData(connection, 1, queueUrls);
+ }
+
+ private void populateBrokerWithData(final Connection connection, int noOfMessages, final String... queueUrls) throws JMSException, Exception
+ {
+ final Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+ for (final String queueUrl : queueUrls)
+ {
+ final Queue queue = session.createQueue(queueUrl);
+ session.createConsumer(queue).close();
+ sendMessage(session, queue, noOfMessages);
+ }
+ }
+
+ private void checkBrokerData(final Connection connection, final String... queueUrls) throws JMSException
+ {
+ connection.start();
+ final Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+ for (final String queueUrl : queueUrls)
+ {
+ final Queue queue = session.createQueue(queueUrl);
+ final MessageConsumer consumer = session.createConsumer(queue);
+ final Message message = consumer.receive(1000);
+ session.commit();
+ assertNotNull("Queue " + queue + " should have message", message);
+ assertEquals("Queue " + queue + " message has unexpected content", 0, message.getIntProperty(INDEX));
+ }
+ }
+
+ private Connection getConnectionToNodeInCluster() throws URLSyntaxException
+ {
+ Connection connection = null;
+ Set<Integer> runningBrokerPorts = getBrokerPortNumbers();
+
+ for (int brokerPortNumber : runningBrokerPorts)
+ {
+ try
+ {
+ connection = getConnection(_clusterCreator.getConnectionUrlForSingleNode(brokerPortNumber));
+ break;
+ }
+ catch(JMSException je)
+ {
+ assertTrue(je.getMessage().contains("Virtual host '" + VIRTUAL_HOST + "' is not active"));
+ }
+ }
+ return connection;
+ }
+
+ private void killConnectionBrokerAndWaitForNewMasterElection(final Connection initialConnection) throws IOException,
+ InterruptedException
+ {
+ try
+ {
+ // NewMasterEvent is received twice: first for the existing master,
+ // second for a new master
+ CountDownLatch newMasterLatch = new CountDownLatch(2);
+ _clusterCreator.startMonitorNode();
+ _clusterCreator.statListeningForNewMasterEvent(newMasterLatch);
+
+ final int initialPortNumber = _clusterCreator.getBrokerPortNumberFromConnection(initialConnection);
+ killBroker(initialPortNumber);
+
+ assertTrue("New master was not elected", newMasterLatch.await(30, TimeUnit.SECONDS));
+ }
+ finally
+ {
+ _clusterCreator.shutdownMonitor();
+ }
+ }
+
+ private void assertProducingConsuming(final Connection connection) throws JMSException, Exception
+ {
+ Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+ Destination destination = session.createQueue(getTestQueueName());
+ MessageConsumer consumer = session.createConsumer(destination);
+ sendMessage(session, destination, 2);
+ connection.start();
+ Message m1 = consumer.receive(RECEIVE_TIMEOUT);
+ assertNotNull("Message 1 is not received", m1);
+ assertEquals("Unexpected first message received", 0, m1.getIntProperty(INDEX));
+ Message m2 = consumer.receive(RECEIVE_TIMEOUT);
+ assertNotNull("Message 2 is not received", m2);
+ assertEquals("Unexpected second message received", 1, m2.getIntProperty(INDEX));
+ session.commit();
+ }
+
+ private void closeConnection(final Connection initialConnection)
+ {
+ try
+ {
+ initialConnection.close();
+ }
+ catch(Exception e)
+ {
+ // ignore.
+ // java.net.SocketException is seen sometimes on active connection
+ }
+ }
+}
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAMessageStoreSmokeTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAMessageStoreSmokeTest.java
new file mode 100644
index 0000000000..eaa3c3eba4
--- /dev/null
+++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAMessageStoreSmokeTest.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb;
+
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.commons.configuration.XMLConfiguration;
+import org.apache.qpid.test.utils.QpidTestCase;
+
+public class HAMessageStoreSmokeTest extends QpidTestCase
+{
+ private final BDBHAMessageStore _store = new BDBHAMessageStore();
+ private final XMLConfiguration _config = new XMLConfiguration();
+
+ public void testMissingHAConfigThrowsException() throws Exception
+ {
+ try
+ {
+ _store.configure("test", _config);
+ fail("Expected an exception to be thrown");
+ }
+ catch (ConfigurationException ce)
+ {
+ assertTrue(ce.getMessage().contains("BDB HA configuration key not found"));
+ }
+ }
+} \ No newline at end of file
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java
new file mode 100644
index 0000000000..43cfa5f4d5
--- /dev/null
+++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java
@@ -0,0 +1,479 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.Connection;
+
+import org.apache.commons.configuration.XMLConfiguration;
+import org.apache.commons.lang.StringUtils;
+import org.apache.log4j.Logger;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQConnectionURL;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+import org.apache.qpid.url.URLSyntaxException;
+
+import com.sleepycat.je.rep.ReplicationNode;
+import com.sleepycat.je.rep.monitor.GroupChangeEvent;
+import com.sleepycat.je.rep.monitor.JoinGroupEvent;
+import com.sleepycat.je.rep.monitor.LeaveGroupEvent;
+import com.sleepycat.je.rep.monitor.Monitor;
+import com.sleepycat.je.rep.monitor.MonitorChangeListener;
+import com.sleepycat.je.rep.monitor.MonitorConfig;
+import com.sleepycat.je.rep.monitor.NewMasterEvent;
+
+public class HATestClusterCreator
+{
+ protected static final Logger LOGGER = Logger.getLogger(HATestClusterCreator.class);
+
+ private static final String MANY_BROKER_URL_FORMAT = "amqp://guest:guest@/%s?brokerlist='%s'&failover='roundrobin?cyclecount='%d''";
+ private static final String BROKER_PORTION_FORMAT = "tcp://localhost:%d?connectdelay='%d',retries='%d'";
+ private static final String SINGLE_BROKER_URL_FORMAT = "amqp://guest:guest@/%s?brokerlist='tcp://localhost:%d?connectdelay='%d',retries='%d''";
+
+ private static final int CYCLECOUNT = 2;
+ private static final int RETRIES = 2;
+ private static final int CONNECTDELAY = 1000;
+
+ private final QpidBrokerTestCase _testcase;
+ private final Map<Integer, Integer> _brokerPortToBdbPortMap = new HashMap<Integer, Integer>();
+ private final Map<Integer, BrokerConfigHolder> _brokerConfigurations = new TreeMap<Integer, BrokerConfigHolder>();
+ private final String _virtualHostName;
+ private final String _configKeyPrefix;
+
+ private final String _ipAddressOfBroker;
+ private final String _groupName ;
+ private final int _numberOfNodes;
+ private int _bdbHelperPort;
+ private int _primaryBrokerPort;
+ private Monitor _monitor;
+
+ public HATestClusterCreator(QpidBrokerTestCase testcase, String virtualHostName, int numberOfNodes)
+ {
+ _testcase = testcase;
+ _virtualHostName = virtualHostName;
+ _groupName = "group" + _testcase.getName();
+ _ipAddressOfBroker = getIpAddressOfBrokerHost();
+ _numberOfNodes = numberOfNodes;
+ _configKeyPrefix = "virtualhosts.virtualhost." + _virtualHostName + ".store.";
+ _bdbHelperPort = 0;
+ }
+
+ public void configureClusterNodes() throws Exception
+ {
+ int brokerPort = _testcase.findFreePort();
+
+ for (int i = 0; i < _numberOfNodes; i++)
+ {
+ int bdbPort = _testcase.getNextAvailable(brokerPort + 1);
+ _brokerPortToBdbPortMap.put(brokerPort, bdbPort);
+
+ LOGGER.debug("Cluster broker port " + brokerPort + ", bdb replication port " + bdbPort);
+ if (_bdbHelperPort == 0)
+ {
+ _bdbHelperPort = bdbPort;
+ }
+
+ configureClusterNode(brokerPort, bdbPort);
+ collectConfig(brokerPort, _testcase.getTestConfiguration(), _testcase.getTestVirtualhosts());
+
+ brokerPort = _testcase.getNextAvailable(bdbPort + 1);
+ }
+ }
+
+ public void setAutoDesignatedPrimary(boolean autoDesignatedPrimary) throws Exception
+ {
+ if (_numberOfNodes != 2)
+ {
+ throw new IllegalArgumentException("Only two nodes groups have the concept of primary");
+ }
+
+ final Entry<Integer, BrokerConfigHolder> brokerConfigEntry = _brokerConfigurations.entrySet().iterator().next();
+ final String configKey = getConfigKey("highAvailability.autoDesignatedPrimary");
+ brokerConfigEntry.getValue().getTestVirtualhosts().setProperty(configKey, Boolean.toString(autoDesignatedPrimary));
+ _primaryBrokerPort = brokerConfigEntry.getKey();
+ }
+
+ /**
+ * @param configKeySuffix "highAvailability.designatedPrimary", for example
+ * @return "virtualhost.test.store.highAvailability.designatedPrimary", for example
+ */
+ private String getConfigKey(String configKeySuffix)
+ {
+ final String configKey = StringUtils.substringAfter(_configKeyPrefix + configKeySuffix, "virtualhosts.");
+ return configKey;
+ }
+
+ public void startNode(final int brokerPortNumber) throws Exception
+ {
+ final BrokerConfigHolder brokerConfigHolder = _brokerConfigurations.get(brokerPortNumber);
+
+ _testcase.setTestConfiguration(brokerConfigHolder.getTestConfiguration());
+ _testcase.setTestVirtualhosts(brokerConfigHolder.getTestVirtualhosts());
+
+ _testcase.startBroker(brokerPortNumber);
+ }
+
+ public void startCluster() throws Exception
+ {
+ for (final Integer brokerPortNumber : _brokerConfigurations.keySet())
+ {
+ startNode(brokerPortNumber);
+ }
+ }
+
+ public void startClusterParallel() throws Exception
+ {
+ final ExecutorService executor = Executors.newFixedThreadPool(_brokerConfigurations.size());
+ try
+ {
+ List<Future<Object>> brokers = new CopyOnWriteArrayList<Future<Object>>();
+ for (final Integer brokerPortNumber : _brokerConfigurations.keySet())
+ {
+ final BrokerConfigHolder brokerConfigHolder = _brokerConfigurations.get(brokerPortNumber);
+ Future<Object> future = executor.submit(new Callable<Object>()
+ {
+ public Object call()
+ {
+ try
+ {
+ _testcase.startBroker(brokerPortNumber, brokerConfigHolder.getTestConfiguration(),
+ brokerConfigHolder.getTestVirtualhosts());
+ return "OK";
+ }
+ catch (Exception e)
+ {
+ return e;
+ }
+ }
+ });
+ brokers.add(future);
+ }
+ for (Future<Object> future : brokers)
+ {
+ Object result = future.get(30, TimeUnit.SECONDS);
+ LOGGER.debug("Node startup result:" + result);
+ if (result instanceof Exception)
+ {
+ throw (Exception) result;
+ }
+ else if (!"OK".equals(result))
+ {
+ throw new Exception("One of the cluster nodes is not started");
+ }
+ }
+ }
+ catch (Exception e)
+ {
+ stopCluster();
+ throw e;
+ }
+ finally
+ {
+ executor.shutdown();
+ }
+
+ }
+
+ public void stopNode(final int brokerPortNumber)
+ {
+ _testcase.stopBroker(brokerPortNumber);
+ }
+
+ public void stopCluster() throws Exception
+ {
+ shutdownMonitor();
+ for (final Integer brokerPortNumber : _brokerConfigurations.keySet())
+ {
+ try
+ {
+ stopNode(brokerPortNumber);
+ }
+ catch(Exception e)
+ {
+ LOGGER.warn("Failed to stop node on port:" + brokerPortNumber);
+ }
+ }
+ }
+
+ public int getBrokerPortNumberFromConnection(Connection connection)
+ {
+ final AMQConnection amqConnection = (AMQConnection)connection;
+ return amqConnection.getActiveBrokerDetails().getPort();
+ }
+
+ public int getPortNumberOfAnInactiveBroker(final Connection activeConnection)
+ {
+ final Set<Integer> allBrokerPorts = _testcase.getBrokerPortNumbers();
+ LOGGER.debug("Broker ports:" + allBrokerPorts);
+ final int activeBrokerPort = getBrokerPortNumberFromConnection(activeConnection);
+ allBrokerPorts.remove(activeBrokerPort);
+ LOGGER.debug("Broker ports:" + allBrokerPorts);
+ final int inactiveBrokerPort = allBrokerPorts.iterator().next();
+ return inactiveBrokerPort;
+ }
+
+ public int getBdbPortForBrokerPort(final int brokerPortNumber)
+ {
+ return _brokerPortToBdbPortMap.get(brokerPortNumber);
+ }
+
+ public Set<Integer> getBdbPortNumbers()
+ {
+ return new HashSet<Integer>(_brokerPortToBdbPortMap.values());
+ }
+
+ public AMQConnectionURL getConnectionUrlForAllClusterNodes() throws Exception
+ {
+ final StringBuilder brokerList = new StringBuilder();
+
+ for(Iterator<Integer> itr = _brokerPortToBdbPortMap.keySet().iterator(); itr.hasNext(); )
+ {
+ int brokerPortNumber = itr.next();
+
+ brokerList.append(String.format(BROKER_PORTION_FORMAT, brokerPortNumber, CONNECTDELAY, RETRIES));
+ if (itr.hasNext())
+ {
+ brokerList.append(";");
+ }
+ }
+
+ return new AMQConnectionURL(String.format(MANY_BROKER_URL_FORMAT, _virtualHostName, brokerList, CYCLECOUNT));
+ }
+
+ public AMQConnectionURL getConnectionUrlForSingleNode(final int brokerPortNumber) throws URLSyntaxException
+ {
+ String url = String.format(SINGLE_BROKER_URL_FORMAT, _virtualHostName, brokerPortNumber, CONNECTDELAY, RETRIES);
+ return new AMQConnectionURL(url);
+ }
+
+ public String getGroupName()
+ {
+ return _groupName;
+ }
+
+ public String getNodeNameForNodeAt(final int bdbPort)
+ {
+ return "node" + _testcase.getName() + bdbPort;
+ }
+
+ public String getNodeHostPortForNodeAt(final int bdbPort)
+ {
+ return _ipAddressOfBroker + ":" + bdbPort;
+ }
+
+ public String getHelperHostPort()
+ {
+ if (_bdbHelperPort == 0)
+ {
+ throw new IllegalStateException("Helper port not yet assigned.");
+ }
+
+ return _ipAddressOfBroker + ":" + _bdbHelperPort;
+ }
+
+ public void setHelperHostPort(int bdbHelperPort)
+ {
+ _bdbHelperPort = bdbHelperPort;
+ }
+
+ public int getBrokerPortNumberOfPrimary()
+ {
+ if (_numberOfNodes != 2)
+ {
+ throw new IllegalArgumentException("Only two nodes groups have the concept of primary");
+ }
+
+ return _primaryBrokerPort;
+ }
+
+ public int getBrokerPortNumberOfSecondaryNode()
+ {
+ final Set<Integer> portNumbers = getBrokerPortNumbersForNodes();
+ portNumbers.remove(getBrokerPortNumberOfPrimary());
+ return portNumbers.iterator().next();
+ }
+
+ public Set<Integer> getBrokerPortNumbersForNodes()
+ {
+ return new HashSet<Integer>(_brokerConfigurations.keySet());
+ }
+
+ private void configureClusterNode(final int brokerPort, final int bdbPort) throws Exception
+ {
+ final String nodeName = getNodeNameForNodeAt(bdbPort);
+
+ _testcase.setConfigurationProperty(_configKeyPrefix + "class", "org.apache.qpid.server.store.berkeleydb.BDBHAMessageStore");
+
+ _testcase.setConfigurationProperty(_configKeyPrefix + "highAvailability.groupName", _groupName);
+ _testcase.setConfigurationProperty(_configKeyPrefix + "highAvailability.nodeName", nodeName);
+ _testcase.setConfigurationProperty(_configKeyPrefix + "highAvailability.nodeHostPort", getNodeHostPortForNodeAt(bdbPort));
+ _testcase.setConfigurationProperty(_configKeyPrefix + "highAvailability.helperHostPort", getHelperHostPort());
+ // TODO replication policy
+ }
+
+ public String getIpAddressOfBrokerHost()
+ {
+ String brokerHost = _testcase.getBroker().getHost();
+ try
+ {
+ return InetAddress.getByName(brokerHost).getHostAddress();
+ }
+ catch (UnknownHostException e)
+ {
+ throw new RuntimeException("Could not determine IP address of host : " + brokerHost, e);
+ }
+ }
+
+ private void collectConfig(final int brokerPortNumber, XMLConfiguration testConfiguration, XMLConfiguration testVirtualhosts)
+ {
+ _brokerConfigurations.put(brokerPortNumber, new BrokerConfigHolder((XMLConfiguration) testConfiguration.clone(),
+ (XMLConfiguration) testVirtualhosts.clone()));
+ }
+
+ public class BrokerConfigHolder
+ {
+ private final XMLConfiguration _testConfiguration;
+ private final XMLConfiguration _testVirtualhosts;
+
+ public BrokerConfigHolder(XMLConfiguration testConfiguration, XMLConfiguration testVirtualhosts)
+ {
+ _testConfiguration = testConfiguration;
+ _testVirtualhosts = testVirtualhosts;
+ }
+
+ public XMLConfiguration getTestConfiguration()
+ {
+ return _testConfiguration;
+ }
+
+ public XMLConfiguration getTestVirtualhosts()
+ {
+ return _testVirtualhosts;
+ }
+ }
+
+ public void modifyClusterNodeBdbAddress(int brokerPortNumberToBeMoved, int newBdbPort)
+ {
+ final BrokerConfigHolder brokerConfigHolder = _brokerConfigurations.get(brokerPortNumberToBeMoved);
+ final XMLConfiguration virtualHostConfig = brokerConfigHolder.getTestVirtualhosts();
+
+ final String configKey = getConfigKey("highAvailability.nodeHostPort");
+ final String oldBdbHostPort = virtualHostConfig.getString(configKey);
+
+ final String[] oldHostAndPort = StringUtils.split(oldBdbHostPort, ":");
+ final String oldHost = oldHostAndPort[0];
+
+ final String newBdbHostPort = oldHost + ":" + newBdbPort;
+
+ virtualHostConfig.setProperty(configKey, newBdbHostPort);
+ collectConfig(brokerPortNumberToBeMoved, brokerConfigHolder.getTestConfiguration(), virtualHostConfig);
+ }
+
+ public void startMonitorNode()
+ {
+ shutdownMonitor();
+
+ MonitorConfig config = new MonitorConfig();
+ config.setGroupName(_groupName);
+ int monitorPort = _testcase.findFreePort();
+ config.setNodeName(getNodeNameForNodeAt(monitorPort));
+ config.setNodeHostPort("" + monitorPort);
+ config.setHelperHosts(getHelperHostPort());
+
+ _monitor = new Monitor(config);
+
+ ReplicationNode currentMaster = _monitor.register();
+ LOGGER.info("Current master " + currentMaster.getName());
+ }
+
+ public void startListening(MonitorChangeListener listener) throws IOException
+ {
+ _monitor.startListener(listener);
+ }
+
+ public void statListeningForNewMasterEvent(final CountDownLatch latch) throws IOException
+ {
+ startListening(new MonitorChangeListenerSupport(){
+ @Override
+ public void notify(NewMasterEvent newMasterEvent)
+ {
+ LOGGER.debug("New master is elected " + newMasterEvent.getMasterName());
+ latch.countDown();
+ }
+ });
+ }
+
+ public void shutdownMonitor()
+ {
+ if (_monitor != null)
+ {
+ try
+ {
+ _monitor.shutdown();
+ }
+ catch (Exception e)
+ {
+ LOGGER.warn("Monitor shutdown error:", e);
+ }
+ }
+ }
+
+ public static class MonitorChangeListenerSupport implements MonitorChangeListener
+ {
+
+ @Override
+ public void notify(NewMasterEvent newMasterEvent)
+ {
+ }
+
+ @Override
+ public void notify(GroupChangeEvent groupChangeEvent)
+ {
+ }
+
+ @Override
+ public void notify(JoinGroupEvent joinGroupEvent)
+ {
+ }
+
+ @Override
+ public void notify(LeaveGroupEvent leaveGroupEvent)
+ {
+ }
+ }
+}
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderTest.java
index ba5ca842bf..23fd9bc24f 100644
--- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderTest.java
+++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderTest.java
@@ -24,7 +24,7 @@ import java.io.File;
import java.util.ArrayList;
import java.util.List;
-import org.apache.qpid.server.store.berkeleydb.BDBMessageStore;
+import org.apache.qpid.server.store.berkeleydb.AbstractBDBMessageStore;
import org.apache.qpid.server.store.berkeleydb.tuple.ContentBinding;
import com.sleepycat.bind.tuple.IntegerBinding;
@@ -94,7 +94,7 @@ public class UpgraderTest extends AbstractUpgradeTestCase
{
assertEquals("Unexpected store version", -1, getStoreVersion());
_upgrader.upgradeIfNecessary();
- assertEquals("Unexpected store version", BDBMessageStore.VERSION, getStoreVersion());
+ assertEquals("Unexpected store version", AbstractBDBMessageStore.VERSION, getStoreVersion());
assertContent();
}
@@ -112,7 +112,7 @@ public class UpgraderTest extends AbstractUpgradeTestCase
List<String> expectedDatabases = new ArrayList<String>();
expectedDatabases.add(Upgrader.VERSION_DB_NAME);
assertEquals("Expectedonly VERSION table in initially empty store after upgrade: ", expectedDatabases, databaseNames);
- assertEquals("Unexpected store version", BDBMessageStore.VERSION, getStoreVersion());
+ assertEquals("Unexpected store version", AbstractBDBMessageStore.VERSION, getStoreVersion());
nonExistentStoreLocation.delete();
}