diff options
| author | Robert Gemmell <robbie@apache.org> | 2012-05-17 17:26:04 +0000 |
|---|---|---|
| committer | Robert Gemmell <robbie@apache.org> | 2012-05-17 17:26:04 +0000 |
| commit | f5d67044a9797c397764a7ac1aa1a1ed4aa893a3 (patch) | |
| tree | cf8a9cf6a5f741e31417ca4d32a6b708bb3b9fdd /qpid/java/bdbstore/src/test | |
| parent | f523b9e510fc90ce3f7f7d7c2960f3bfee3d42df (diff) | |
| download | qpid-python-f5d67044a9797c397764a7ac1aa1a1ed4aa893a3.tar.gz | |
QPID-4006: add support for using BDB HA to form an active-passive cluster for persistent messaging
- Includes support for setting BDB configuration parameters via the store configuration, both for the existing store and the new HA variant.
- Removes the MessageStoreFactory and reverts store configuration to historical values.
Applied patch from Keith Wall, Andrew MacBean <andymacbean@gmail.com>, Oleksandr Rudyy <orudyy@gmail.com>, Philip Harvey <phil@philharveyonline.com>, and myself.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1339728 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/bdbstore/src/test')
10 files changed, 1815 insertions, 13 deletions
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreManagerMBeanTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreManagerMBeanTest.java new file mode 100644 index 0000000000..00f99b7097 --- /dev/null +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreManagerMBeanTest.java @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.berkeleydb; + +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import javax.management.JMException; +import javax.management.openmbean.CompositeData; +import javax.management.openmbean.CompositeType; +import javax.management.openmbean.TabularData; + +import junit.framework.TestCase; + +import org.apache.qpid.AMQStoreException; +import org.apache.qpid.server.logging.SystemOutMessageLogger; +import org.apache.qpid.server.logging.actors.CurrentActor; +import org.apache.qpid.server.logging.actors.TestLogActor; + +public class BDBHAMessageStoreManagerMBeanTest extends TestCase +{ + private static final String TEST_GROUP_NAME = "testGroupName"; + private static final String TEST_NODE_NAME = "testNodeName"; + private static final String TEST_NODE_HOST_PORT = "host:1234"; + private static final String TEST_HELPER_HOST_PORT = "host:5678"; + private static final String TEST_REPLICATION_POLICY = "sync,sync,all"; + private static final String TEST_NODE_STATE = "MASTER"; + private static final String TEST_STORE_NAME = "testStoreName"; + private static final boolean TEST_DESIGNATED_PRIMARY_FLAG = false; + + private BDBHAMessageStore _store; + private BDBHAMessageStoreManagerMBean _mBean; + + @Override + protected void setUp() throws Exception + { + super.setUp(); + + CurrentActor.set(new TestLogActor(new SystemOutMessageLogger())); + _store = mock(BDBHAMessageStore.class); + _mBean = new BDBHAMessageStoreManagerMBean(_store); + } + + @Override + protected void tearDown() throws Exception + { + super.tearDown(); + CurrentActor.remove(); + } + + public void testObjectName() throws Exception + { + when(_store.getName()).thenReturn(TEST_STORE_NAME); + + String expectedObjectName = "org.apache.qpid:type=BDBHAMessageStore,name=" + TEST_STORE_NAME; + assertEquals(expectedObjectName, _mBean.getObjectName().toString()); + } + + public void testGroupName() throws Exception + { + when(_store.getGroupName()).thenReturn(TEST_GROUP_NAME); + + assertEquals(TEST_GROUP_NAME, _mBean.getAttribute(ManagedBDBHAMessageStore.ATTR_GROUP_NAME)); + } + + public void testNodeName() throws Exception + { + when(_store.getNodeName()).thenReturn(TEST_NODE_NAME); + + assertEquals(TEST_NODE_NAME, _mBean.getAttribute(ManagedBDBHAMessageStore.ATTR_NODE_NAME)); + } + + public void testNodeHostPort() throws Exception + { + when(_store.getNodeHostPort()).thenReturn(TEST_NODE_HOST_PORT); + + assertEquals(TEST_NODE_HOST_PORT, _mBean.getAttribute(ManagedBDBHAMessageStore.ATTR_NODE_HOST_PORT)); + } + + public void testHelperHostPort() throws Exception + { + when(_store.getHelperHostPort()).thenReturn(TEST_HELPER_HOST_PORT); + + assertEquals(TEST_HELPER_HOST_PORT, _mBean.getAttribute(ManagedBDBHAMessageStore.ATTR_HELPER_HOST_PORT)); + } + + public void testReplicationPolicy() throws Exception + { + when(_store.getReplicationPolicy()).thenReturn(TEST_REPLICATION_POLICY); + + assertEquals(TEST_REPLICATION_POLICY, _mBean.getAttribute(ManagedBDBHAMessageStore.ATTR_REPLICATION_POLICY)); + } + + public void testNodeState() throws Exception + { + when(_store.getNodeState()).thenReturn(TEST_NODE_STATE); + + assertEquals(TEST_NODE_STATE, _mBean.getAttribute(ManagedBDBHAMessageStore.ATTR_NODE_STATE)); + } + + public void testDesignatedPrimaryFlag() throws Exception + { + when(_store.isDesignatedPrimary()).thenReturn(TEST_DESIGNATED_PRIMARY_FLAG); + + assertEquals(TEST_DESIGNATED_PRIMARY_FLAG, _mBean.getAttribute(ManagedBDBHAMessageStore.ATTR_DESIGNATED_PRIMARY)); + } + + public void testGroupMembersForGroupWithOneNode() throws Exception + { + List<Map<String, String>> members = Collections.singletonList(createTestNodeResult()); + when(_store.getGroupMembers()).thenReturn(members); + + final TabularData resultsTable = _mBean.getAllNodesInGroup(); + + assertTableHasHeadingsNamed(resultsTable, BDBHAMessageStore.GRP_MEM_COL_NODE_NAME, BDBHAMessageStore.GRP_MEM_COL_NODE_HOST_PORT); + + final int numberOfDataRows = resultsTable.size(); + assertEquals("Unexpected number of data rows", 1 ,numberOfDataRows); + final CompositeData row = (CompositeData) resultsTable.values().iterator().next(); + assertEquals(TEST_NODE_NAME, row.get(BDBHAMessageStore.GRP_MEM_COL_NODE_NAME)); + assertEquals(TEST_NODE_HOST_PORT, row.get(BDBHAMessageStore.GRP_MEM_COL_NODE_HOST_PORT)); + } + + public void testRemoveNodeFromReplicationGroup() throws Exception + { + _mBean.removeNodeFromGroup(TEST_NODE_NAME); + + verify(_store).removeNodeFromGroup(TEST_NODE_NAME); + } + + public void testRemoveNodeFromReplicationGroupWithError() throws Exception + { + doThrow(new AMQStoreException("mocked exception")).when(_store).removeNodeFromGroup(TEST_NODE_NAME); + + try + { + _mBean.removeNodeFromGroup(TEST_NODE_NAME); + fail("Exception not thrown"); + } + catch (JMException je) + { + // PASS + } + } + + public void testSetAsDesignatedPrimary() throws Exception + { + _mBean.setDesignatedPrimary(true); + + verify(_store).setDesignatedPrimary(true); + } + + public void testSetAsDesignatedPrimaryWithError() throws Exception + { + doThrow(new AMQStoreException("mocked exception")).when(_store).setDesignatedPrimary(true); + + try + { + _mBean.setDesignatedPrimary(true); + fail("Exception not thrown"); + } + catch (JMException je) + { + // PASS + } + } + + public void testUpdateAddress() throws Exception + { + String newHostName = "newHostName"; + int newPort = 1967; + + _mBean.updateAddress(TEST_NODE_NAME, newHostName, newPort); + + verify(_store).updateAddress(TEST_NODE_NAME, newHostName, newPort); + } + + private void assertTableHasHeadingsNamed(final TabularData resultsTable, String... headingNames) + { + CompositeType headingsRow = resultsTable.getTabularType().getRowType(); + for (final String headingName : headingNames) + { + assertTrue("Table should have column with heading " + headingName, headingsRow.containsKey(headingName)); + } + } + + private Map<String, String> createTestNodeResult() + { + Map<String, String> items = new HashMap<String, String>(); + items.put(BDBHAMessageStore.GRP_MEM_COL_NODE_NAME, TEST_NODE_NAME); + items.put(BDBHAMessageStore.GRP_MEM_COL_NODE_HOST_PORT, TEST_NODE_HOST_PORT); + return items; + } +} diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreTest.java new file mode 100644 index 0000000000..6f851bd94e --- /dev/null +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreTest.java @@ -0,0 +1,144 @@ +package org.apache.qpid.server.store.berkeleydb; + +import java.io.File; +import java.net.InetAddress; + +import org.apache.commons.configuration.XMLConfiguration; +import org.apache.qpid.server.configuration.ServerConfiguration; +import org.apache.qpid.server.logging.SystemOutMessageLogger; +import org.apache.qpid.server.logging.actors.CurrentActor; +import org.apache.qpid.server.logging.actors.TestLogActor; +import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.server.util.TestApplicationRegistry; +import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.test.utils.QpidTestCase; +import org.apache.qpid.util.FileUtils; + +import com.sleepycat.je.Environment; +import com.sleepycat.je.EnvironmentConfig; +import com.sleepycat.je.rep.ReplicatedEnvironment; +import com.sleepycat.je.rep.ReplicationConfig; + +public class BDBHAMessageStoreTest extends QpidTestCase +{ + private static final String TEST_LOG_FILE_MAX = "1000000"; + private static final String TEST_ELECTION_RETRIES = "1000"; + private static final String TEST_NUMBER_OF_THREADS = "10"; + private static final String TEST_ENV_CONSISTENCY_TIMEOUT = "9999999"; + private String _groupName; + private String _workDir; + private int _masterPort; + private String _host; + private XMLConfiguration _configXml; + + public void setUp() throws Exception + { + super.setUp(); + + _workDir = TMP_FOLDER + File.separator + getName(); + _host = InetAddress.getByName("localhost").getHostAddress(); + _groupName = "group" + getName(); + _masterPort = -1; + + FileUtils.delete(new File(_workDir), true); + _configXml = new XMLConfiguration(); + } + + public void tearDown() throws Exception + { + FileUtils.delete(new File(_workDir), true); + super.tearDown(); + } + + public void testSetSystemConfiguration() throws Exception + { + // create virtual host configuration, registry and host instance + addVirtualHostConfiguration(); + TestApplicationRegistry registry = initialize(); + try + { + VirtualHost virtualhost = registry.getVirtualHostRegistry().getVirtualHost("test" + _masterPort); + BDBHAMessageStore store = (BDBHAMessageStore) virtualhost.getMessageStore(); + + // test whether JVM system settings were applied + Environment env = store.getEnvironment(); + assertEquals("Unexpected number of cleaner threads", TEST_NUMBER_OF_THREADS, env.getConfig().getConfigParam(EnvironmentConfig.CLEANER_THREADS)); + assertEquals("Unexpected log file max", TEST_LOG_FILE_MAX, env.getConfig().getConfigParam(EnvironmentConfig.LOG_FILE_MAX)); + + ReplicatedEnvironment repEnv = store.getReplicatedEnvironment(); + assertEquals("Unexpected number of elections primary retries", TEST_ELECTION_RETRIES, + repEnv.getConfig().getConfigParam(ReplicationConfig.ELECTIONS_PRIMARY_RETRIES)); + assertEquals("Unexpected number of elections primary retries", TEST_ENV_CONSISTENCY_TIMEOUT, + repEnv.getConfig().getConfigParam(ReplicationConfig.ENV_CONSISTENCY_TIMEOUT)); + } + finally + { + ApplicationRegistry.remove(); + } + } + + private void addVirtualHostConfiguration() throws Exception + { + int port = findFreePort(); + if (_masterPort == -1) + { + _masterPort = port; + } + String nodeName = getNodeNameForNodeAt(port); + + String vhostName = "test" + port; + String vhostPrefix = "virtualhosts.virtualhost." + vhostName; + + _configXml.addProperty("virtualhosts.virtualhost.name", vhostName); + _configXml.addProperty(vhostPrefix + ".store.class", BDBHAMessageStore.class.getName()); + _configXml.addProperty(vhostPrefix + ".store.environment-path", _workDir + File.separator + + port); + _configXml.addProperty(vhostPrefix + ".store.highAvailability.groupName", _groupName); + _configXml.addProperty(vhostPrefix + ".store.highAvailability.nodeName", nodeName); + _configXml.addProperty(vhostPrefix + ".store.highAvailability.nodeHostPort", + getNodeHostPortForNodeAt(port)); + _configXml.addProperty(vhostPrefix + ".store.highAvailability.helperHostPort", + getHelperHostPort()); + + _configXml.addProperty(vhostPrefix + ".store.envConfig(-1).name", EnvironmentConfig.CLEANER_THREADS); + _configXml.addProperty(vhostPrefix + ".store.envConfig.value", TEST_NUMBER_OF_THREADS); + + _configXml.addProperty(vhostPrefix + ".store.envConfig(-1).name", EnvironmentConfig.LOG_FILE_MAX); + _configXml.addProperty(vhostPrefix + ".store.envConfig.value", TEST_LOG_FILE_MAX); + + _configXml.addProperty(vhostPrefix + ".store.repConfig(-1).name", ReplicationConfig.ELECTIONS_PRIMARY_RETRIES); + _configXml.addProperty(vhostPrefix + ".store.repConfig.value", TEST_ELECTION_RETRIES); + + _configXml.addProperty(vhostPrefix + ".store.repConfig(-1).name", ReplicationConfig.ENV_CONSISTENCY_TIMEOUT); + _configXml.addProperty(vhostPrefix + ".store.repConfig.value", TEST_ENV_CONSISTENCY_TIMEOUT); + } + + private String getNodeNameForNodeAt(final int bdbPort) + { + return "node" + getName() + bdbPort; + } + + private String getNodeHostPortForNodeAt(final int bdbPort) + { + return _host + ":" + bdbPort; + } + + private String getHelperHostPort() + { + if (_masterPort == -1) + { + throw new IllegalStateException("Helper port not yet assigned."); + } + return _host + ":" + _masterPort; + } + + private TestApplicationRegistry initialize() throws Exception + { + CurrentActor.set(new TestLogActor(new SystemOutMessageLogger())); + ServerConfiguration configuration = new ServerConfiguration(_configXml); + TestApplicationRegistry registry = new TestApplicationRegistry(configuration); + ApplicationRegistry.initialise(registry); + registry.getVirtualHostRegistry().setDefaultVirtualHostName("test" + _masterPort); + return registry; + } +} diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java index a318187f13..591bc27d1e 100644 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java @@ -70,7 +70,7 @@ public class BDBMessageStoreTest extends org.apache.qpid.server.store.MessageSto { MessageStore store = getVirtualHost().getMessageStore(); - BDBMessageStore bdbStore = assertBDBStore(store); + AbstractBDBMessageStore bdbStore = assertBDBStore(store); // Create content ByteBuffers. // Split the content into 2 chunks for the 0-8 message, as per broker behaviour. @@ -220,11 +220,11 @@ public class BDBMessageStoreTest extends org.apache.qpid.server.store.MessageSto * Use this method instead of reloading the virtual host like other tests in order * to avoid the recovery handler deleting the message for not being on a queue. */ - private BDBMessageStore reloadStore(BDBMessageStore messageStore) throws Exception + private AbstractBDBMessageStore reloadStore(AbstractBDBMessageStore messageStore) throws Exception { messageStore.close(); - BDBMessageStore newStore = new BDBMessageStore(); + AbstractBDBMessageStore newStore = new BDBMessageStore(); newStore.configure("", _config.subset("store")); newStore.startWithNoRecover(); @@ -282,7 +282,7 @@ public class BDBMessageStoreTest extends org.apache.qpid.server.store.MessageSto public void testGetContentWithOffset() throws Exception { MessageStore store = getVirtualHost().getMessageStore(); - BDBMessageStore bdbStore = assertBDBStore(store); + AbstractBDBMessageStore bdbStore = assertBDBStore(store); StoredMessage<MessageMetaData> storedMessage_0_8 = createAndStoreSingleChunkMessage_0_8(store); long messageid_0_8 = storedMessage_0_8.getMessageNumber(); @@ -342,7 +342,7 @@ public class BDBMessageStoreTest extends org.apache.qpid.server.store.MessageSto public void testMessageCreationAndRemoval() throws Exception { MessageStore store = getVirtualHost().getMessageStore(); - BDBMessageStore bdbStore = assertBDBStore(store); + AbstractBDBMessageStore bdbStore = assertBDBStore(store); StoredMessage<MessageMetaData> storedMessage_0_8 = createAndStoreSingleChunkMessage_0_8(store); long messageid_0_8 = storedMessage_0_8.getMessageNumber(); @@ -367,12 +367,12 @@ public class BDBMessageStoreTest extends org.apache.qpid.server.store.MessageSto assertEquals("Retrieved content when none was expected", 0, bdbStore.getContent(messageid_0_8, 0, dst)); } - private BDBMessageStore assertBDBStore(MessageStore store) + private AbstractBDBMessageStore assertBDBStore(MessageStore store) { assertEquals("Test requires an instance of BDBMessageStore to proceed", BDBMessageStore.class, store.getClass()); - return (BDBMessageStore) store; + return (AbstractBDBMessageStore) store; } private StoredMessage<MessageMetaData> createAndStoreSingleChunkMessage_0_8(MessageStore store) @@ -405,7 +405,7 @@ public class BDBMessageStoreTest extends org.apache.qpid.server.store.MessageSto { MessageStore log = getVirtualHost().getMessageStore(); - BDBMessageStore bdbStore = assertBDBStore(log); + AbstractBDBMessageStore bdbStore = assertBDBStore(log); final UUID mockQueueId = UUIDGenerator.generateUUID(); TransactionLogResource mockQueue = new TransactionLogResource() @@ -443,7 +443,7 @@ public class BDBMessageStoreTest extends org.apache.qpid.server.store.MessageSto { MessageStore log = getVirtualHost().getMessageStore(); - BDBMessageStore bdbStore = assertBDBStore(log); + AbstractBDBMessageStore bdbStore = assertBDBStore(log); final UUID mockQueueId = UUIDGenerator.generateUUID(); TransactionLogResource mockQueue = new TransactionLogResource() @@ -484,7 +484,7 @@ public class BDBMessageStoreTest extends org.apache.qpid.server.store.MessageSto { MessageStore log = getVirtualHost().getMessageStore(); - BDBMessageStore bdbStore = assertBDBStore(log); + AbstractBDBMessageStore bdbStore = assertBDBStore(log); final UUID mockQueueId = UUIDGenerator.generateUUID(); TransactionLogResource mockQueue = new TransactionLogResource() diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterBlackboxTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterBlackboxTest.java new file mode 100644 index 0000000000..afe0435901 --- /dev/null +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterBlackboxTest.java @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.berkeleydb; + +import java.io.File; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import javax.jms.Connection; +import javax.jms.Session; + +import org.apache.log4j.Logger; +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.jms.ConnectionListener; +import org.apache.qpid.jms.ConnectionURL; +import org.apache.qpid.test.utils.QpidBrokerTestCase; + +/** + * The HA black box tests test the BDB cluster as a opaque unit. Client connects to + * the cluster via a failover url + * + * @see HAClusterWhiteboxTest + */ +public class HAClusterBlackboxTest extends QpidBrokerTestCase +{ + protected static final Logger LOGGER = Logger.getLogger(HAClusterBlackboxTest.class); + + private static final String VIRTUAL_HOST = "test"; + private static final int NUMBER_OF_NODES = 3; + + private final HATestClusterCreator _clusterCreator = new HATestClusterCreator(this, VIRTUAL_HOST, NUMBER_OF_NODES); + + private FailoverAwaitingListener _failoverAwaitingListener; + private ConnectionURL _brokerFailoverUrl; + + @Override + protected void setUp() throws Exception + { + _brokerType = BrokerType.SPAWNED; + + assertTrue(isJavaBroker()); + assertTrue(isBrokerStorePersistent()); + + setBrokerEnvironment("QPID_OPTS", "-Djava.util.logging.config.file=" + System.getProperty(QPID_HOME) + + File.separator + "etc" + File.separator + "log.properties"); + + _clusterCreator.configureClusterNodes(); + + _brokerFailoverUrl = _clusterCreator.getConnectionUrlForAllClusterNodes(); + + _clusterCreator.startCluster(); + _failoverAwaitingListener = new FailoverAwaitingListener(); + + super.setUp(); + } + + @Override + public void startBroker() throws Exception + { + // Don't start default broker provided by QBTC. + } + + public void testLossOfActiveNodeCausesClientToFailover() throws Exception + { + final Connection connection = getConnection(_brokerFailoverUrl); + + ((AMQConnection)connection).setConnectionListener(_failoverAwaitingListener); + + final int activeBrokerPort = _clusterCreator.getBrokerPortNumberFromConnection(connection); + LOGGER.info("Active connection port " + activeBrokerPort); + + _clusterCreator.stopNode(activeBrokerPort); + LOGGER.info("Node is stopped"); + _failoverAwaitingListener.assertFailoverOccurs(20000); + LOGGER.info("Listener has finished"); + // any op to ensure connection remains + connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + } + + public void testLossOfInactiveNodeDoesNotCauseClientToFailover() throws Exception + { + LOGGER.info("Connecting to " + _brokerFailoverUrl); + final Connection connection = getConnection(_brokerFailoverUrl); + + ((AMQConnection)connection).setConnectionListener(_failoverAwaitingListener); + final int activeBrokerPort = _clusterCreator.getBrokerPortNumberFromConnection(connection); + LOGGER.info("Active connection port " + activeBrokerPort); + final int inactiveBrokerPort = _clusterCreator.getPortNumberOfAnInactiveBroker(connection); + + LOGGER.info("Stopping inactive broker on port " + inactiveBrokerPort); + + _clusterCreator.stopNode(inactiveBrokerPort); + + _failoverAwaitingListener.assertFailoverDoesNotOccur(2000); + + // any op to ensure connection remains + connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + } + + private final class FailoverAwaitingListener implements ConnectionListener + { + private final CountDownLatch _failoverLatch = new CountDownLatch(1); + + @Override + public boolean preResubscribe() + { + return true; + } + + @Override + public boolean preFailover(boolean redirect) + { + return true; + } + + public void assertFailoverOccurs(long delay) throws InterruptedException + { + _failoverLatch.await(delay, TimeUnit.MILLISECONDS); + assertEquals("Failover did not occur", 0, _failoverLatch.getCount()); + } + + public void assertFailoverDoesNotOccur(long delay) throws InterruptedException + { + _failoverLatch.await(delay, TimeUnit.MILLISECONDS); + assertEquals("Failover occurred unexpectedly", 1L, _failoverLatch.getCount()); + } + + + @Override + public void failoverComplete() + { + _failoverLatch.countDown(); + } + + @Override + public void bytesSent(long count) + { + } + + @Override + public void bytesReceived(long count) + { + } + } + +} diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterManagementTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterManagementTest.java new file mode 100644 index 0000000000..1afa45fd5a --- /dev/null +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterManagementTest.java @@ -0,0 +1,233 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.berkeleydb; + +import static com.sleepycat.je.rep.ReplicatedEnvironment.State.DETACHED; +import static com.sleepycat.je.rep.ReplicatedEnvironment.State.MASTER; +import static com.sleepycat.je.rep.ReplicatedEnvironment.State.REPLICA; +import static com.sleepycat.je.rep.ReplicatedEnvironment.State.UNKNOWN; + +import java.util.Arrays; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Set; + +import javax.jms.Connection; +import javax.management.openmbean.CompositeData; +import javax.management.openmbean.TabularData; + +import org.apache.log4j.Logger; +import org.apache.qpid.jms.ConnectionURL; +import org.apache.qpid.test.utils.JMXTestUtils; +import org.apache.qpid.test.utils.QpidBrokerTestCase; + +import com.sleepycat.je.EnvironmentFailureException; + +/** + * System test verifying the ability to control a cluster via the Management API. + * + * @see HAClusterBlackboxTest + */ +public class HAClusterManagementTest extends QpidBrokerTestCase +{ + protected static final Logger LOGGER = Logger.getLogger(HAClusterManagementTest.class); + + private static final Set<String> NON_MASTER_STATES = new HashSet<String>(Arrays.asList(REPLICA.toString(), DETACHED.toString(), UNKNOWN.toString()));; + private static final String VIRTUAL_HOST = "test"; + + private static final String MANAGED_OBJECT_QUERY = "org.apache.qpid:type=BDBHAMessageStore,name=" + VIRTUAL_HOST; + private static final int NUMBER_OF_NODES = 4; + + private final HATestClusterCreator _clusterCreator = new HATestClusterCreator(this, VIRTUAL_HOST, NUMBER_OF_NODES); + private final JMXTestUtils _jmxUtils = new JMXTestUtils(this); + + private ConnectionURL _brokerFailoverUrl; + + @Override + protected void setUp() throws Exception + { + _brokerType = BrokerType.SPAWNED; + + assertTrue(isJavaBroker()); + assertTrue(isBrokerStorePersistent()); + _jmxUtils.setUp(); + + _clusterCreator.configureClusterNodes(); + _brokerFailoverUrl = _clusterCreator.getConnectionUrlForAllClusterNodes(); + _clusterCreator.startCluster(); + + super.setUp(); + } + + @Override + protected void tearDown() throws Exception + { + try + { + _jmxUtils.close(); + } + finally + { + super.tearDown(); + } + } + + @Override + public void startBroker() throws Exception + { + // Don't start default broker provided by QBTC. + } + + public void testReadonlyMBeanAttributes() throws Exception + { + final int brokerPortNumber = getBrokerPortNumbers().iterator().next(); + final int bdbPortNumber = _clusterCreator.getBdbPortForBrokerPort(brokerPortNumber); + + ManagedBDBHAMessageStore storeBean = getStoreBeanForNodeAtBrokerPort(brokerPortNumber); + assertEquals("Unexpected store group name", _clusterCreator.getGroupName(), storeBean.getGroupName()); + assertEquals("Unexpected store node name", _clusterCreator.getNodeNameForNodeAt(bdbPortNumber), storeBean.getNodeName()); + assertEquals("Unexpected store node host port",_clusterCreator.getNodeHostPortForNodeAt(bdbPortNumber), storeBean.getNodeHostPort()); + assertEquals("Unexpected store helper host port", _clusterCreator.getHelperHostPort(), storeBean.getHelperHostPort()); + // As we have chosen an arbitrary broker from the cluster, we cannot predict its state + assertNotNull("Store state must not be null", storeBean.getNodeState()); + } + + public void testStateOfActiveBrokerIsMaster() throws Exception + { + final Connection activeConnection = getConnection(_brokerFailoverUrl); + final int activeBrokerPortNumber = _clusterCreator.getBrokerPortNumberFromConnection(activeConnection); + + ManagedBDBHAMessageStore storeBean = getStoreBeanForNodeAtBrokerPort(activeBrokerPortNumber); + assertEquals("Unexpected store state", MASTER.toString(), storeBean.getNodeState()); + } + + public void testStateOfNonActiveBrokerIsNotMaster() throws Exception + { + final Connection activeConnection = getConnection(_brokerFailoverUrl); + final int inactiveBrokerPortNumber = _clusterCreator.getPortNumberOfAnInactiveBroker(activeConnection); + ManagedBDBHAMessageStore storeBean = getStoreBeanForNodeAtBrokerPort(inactiveBrokerPortNumber); + final String nodeState = storeBean.getNodeState(); + assertTrue("Unexpected store state : " + nodeState, NON_MASTER_STATES.contains(nodeState)); + } + + public void testGroupMembers() throws Exception + { + final int brokerPortNumber = getBrokerPortNumbers().iterator().next(); + + ManagedBDBHAMessageStore storeBean = getStoreBeanForNodeAtBrokerPort(brokerPortNumber); + final TabularData groupMembers = storeBean.getAllNodesInGroup(); + assertNotNull(groupMembers); + + final int numberOfDataRows = groupMembers.size(); + assertEquals("Unexpected number of data rows", NUMBER_OF_NODES ,numberOfDataRows); + + for(int bdbPortNumber : _clusterCreator.getBdbPortNumbers()) + { + final String nodeName = _clusterCreator.getNodeNameForNodeAt(bdbPortNumber); + final String nodeHostPort = _clusterCreator.getNodeHostPortForNodeAt(bdbPortNumber); + + CompositeData row = groupMembers.get(new Object[] {nodeName}); + assertNotNull("Table does not contain row for node name " + nodeName, row); + assertEquals(nodeHostPort, row.get(BDBHAMessageStore.GRP_MEM_COL_NODE_HOST_PORT)); + } + } + + public void testRemoveNodeFromGroup() throws Exception + { + final Iterator<Integer> brokerPortNumberIterator = getBrokerPortNumbers().iterator(); + final int brokerPortNumberToMakeObservation = brokerPortNumberIterator.next(); + final int brokerPortNumberToBeRemoved = brokerPortNumberIterator.next(); + final ManagedBDBHAMessageStore storeBean = getStoreBeanForNodeAtBrokerPort(brokerPortNumberToMakeObservation); + final int numberOfDataRows = storeBean.getAllNodesInGroup().size(); + assertEquals("Unexpected number of data rows before test", NUMBER_OF_NODES ,numberOfDataRows); + + final String removedNodeName = _clusterCreator.getNodeNameForNodeAt(_clusterCreator.getBdbPortForBrokerPort(brokerPortNumberToBeRemoved)); + _clusterCreator.stopNode(brokerPortNumberToBeRemoved); + storeBean.removeNodeFromGroup(removedNodeName); + + final int numberOfDataRowsAfterRemoval = storeBean.getAllNodesInGroup().size(); + assertEquals("Unexpected number of data rows before test", NUMBER_OF_NODES - 1,numberOfDataRowsAfterRemoval); + } + + /** + * Updates the address of a node. + * + * If the broker (node) can subsequently start without error then the update was a success, hence no need for an explicit + * assert. + * + * @see #testRestartNodeWithNewPortNumberWithoutFirstCallingUpdateAddressThrowsAnException() for converse case + */ + public void testUpdateAddress() throws Exception + { + final Iterator<Integer> brokerPortNumberIterator = getBrokerPortNumbers().iterator(); + final int brokerPortNumberToPerformUpdate = brokerPortNumberIterator.next(); + final int brokerPortNumberToBeMoved = brokerPortNumberIterator.next(); + final ManagedBDBHAMessageStore storeBean = getStoreBeanForNodeAtBrokerPort(brokerPortNumberToPerformUpdate); + + _clusterCreator.stopNode(brokerPortNumberToBeMoved); + + final int oldBdbPort = _clusterCreator.getBdbPortForBrokerPort(brokerPortNumberToBeMoved); + final int newBdbPort = getNextAvailable(oldBdbPort + 1); + + storeBean.updateAddress(_clusterCreator.getNodeNameForNodeAt(oldBdbPort), _clusterCreator.getIpAddressOfBrokerHost(), newBdbPort); + + _clusterCreator.modifyClusterNodeBdbAddress(brokerPortNumberToBeMoved, newBdbPort); + + _clusterCreator.startNode(brokerPortNumberToBeMoved); + } + + /** + * @see #testUpdateAddress() + */ + public void testRestartNodeWithNewPortNumberWithoutFirstCallingUpdateAddressThrowsAnException() throws Exception + { + final Iterator<Integer> brokerPortNumberIterator = getBrokerPortNumbers().iterator(); + final int brokerPortNumberToBeMoved = brokerPortNumberIterator.next(); + + _clusterCreator.stopNode(brokerPortNumberToBeMoved); + + final int oldBdbPort = _clusterCreator.getBdbPortForBrokerPort(brokerPortNumberToBeMoved); + final int newBdbPort = getNextAvailable(oldBdbPort + 1); + + // now deliberately don't call updateAddress + + _clusterCreator.modifyClusterNodeBdbAddress(brokerPortNumberToBeMoved, newBdbPort); + + try + { + _clusterCreator.startNode(brokerPortNumberToBeMoved); + fail("Exception not thrown"); + } + catch(RuntimeException rte) + { + //check cause was BDBs EnvironmentFailureException + assertTrue(rte.getMessage().contains(EnvironmentFailureException.class.getName())); + // PASS + } + } + + private ManagedBDBHAMessageStore getStoreBeanForNodeAtBrokerPort( + final int activeBrokerPortNumber) throws Exception + { + _jmxUtils.open(activeBrokerPortNumber); + + return _jmxUtils.getManagedObject(ManagedBDBHAMessageStore.class, MANAGED_OBJECT_QUERY); + } +} diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterTwoNodeTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterTwoNodeTest.java new file mode 100644 index 0000000000..5f995ae25d --- /dev/null +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterTwoNodeTest.java @@ -0,0 +1,234 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.berkeleydb; + +import java.io.File; + +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.Session; + +import org.apache.qpid.jms.ConnectionURL; +import org.apache.qpid.test.utils.JMXTestUtils; +import org.apache.qpid.test.utils.QpidBrokerTestCase; + +import com.sleepycat.je.rep.ReplicationConfig; + +public class HAClusterTwoNodeTest extends QpidBrokerTestCase +{ + private static final long RECEIVE_TIMEOUT = 5000l; + + private static final String VIRTUAL_HOST = "test"; + + private static final String MANAGED_OBJECT_QUERY = "org.apache.qpid:type=BDBHAMessageStore,name=" + VIRTUAL_HOST; + private static final int NUMBER_OF_NODES = 2; + + private final HATestClusterCreator _clusterCreator = new HATestClusterCreator(this, VIRTUAL_HOST, NUMBER_OF_NODES); + private final JMXTestUtils _jmxUtils = new JMXTestUtils(this); + + private ConnectionURL _brokerFailoverUrl; + + @Override + protected void setUp() throws Exception + { + _brokerType = BrokerType.SPAWNED; + + assertTrue(isJavaBroker()); + assertTrue(isBrokerStorePersistent()); + _jmxUtils.setUp(); + + super.setUp(); + } + + @Override + protected void tearDown() throws Exception + { + try + { + _jmxUtils.close(); + } + finally + { + super.tearDown(); + } + } + + @Override + public void startBroker() throws Exception + { + // Don't start default broker provided by QBTC. + } + + private void startCluster(boolean autoDesignedPrimary) throws Exception + { + setSystemProperty("java.util.logging.config.file", + System.getProperty(QPID_HOME) + File.separator + "etc" + File.separator + "log.properties"); + + String vhostPrefix = "virtualhosts.virtualhost." + VIRTUAL_HOST; + + setConfigurationProperty(vhostPrefix + ".store.repConfig(0).name", ReplicationConfig.INSUFFICIENT_REPLICAS_TIMEOUT); + setConfigurationProperty(vhostPrefix + ".store.repConfig(0).value", "2 s"); + + setConfigurationProperty(vhostPrefix + ".store.repConfig(1).name", ReplicationConfig.ELECTIONS_PRIMARY_RETRIES); + setConfigurationProperty(vhostPrefix + ".store.repConfig(1).value", "0"); + + _clusterCreator.configureClusterNodes(); + _clusterCreator.setAutoDesignatedPrimary(autoDesignedPrimary); + _brokerFailoverUrl = _clusterCreator.getConnectionUrlForAllClusterNodes(); + _clusterCreator.startCluster(); + } + + /** + * Tests that a two node cluster, in which the master CAN automatically designate itself primary + * (after becoming master) continues to operate after being shut down and restarted. + * + * The test does not concern itself with which broker becomes master at any given point + * (which is likely to swap during the test). + */ + public void testClusterRestartWithAutoDesignatedPrimary() throws Exception + { + testClusterRestartImpl(true); + } + + /** + * Tests that a two node cluster, in which the master can NOT automatically designate itself + * primary (after becoming master) continues to operate after being shut down and restarted. + * + * The test does not concern itself with which broker becomes master at any given point + * (which is likely to swap during the test). + */ + public void testClusterRestartWithoutAutoDesignatedPrimary() throws Exception + { + testClusterRestartImpl(false); + } + + private void testClusterRestartImpl(boolean autoDesignatedPrimary) throws Exception + { + startCluster(autoDesignatedPrimary); + final Connection initialConnection = getConnection(_brokerFailoverUrl); + assertProducingConsuming(initialConnection); + initialConnection.close(); + _clusterCreator.stopCluster(); + _clusterCreator.startClusterParallel(); + final Connection secondConnection = getConnection(_brokerFailoverUrl); + assertProducingConsuming(secondConnection); + secondConnection.close(); + } + + /** + * This test make sure than JMS operations are still working after stopping replica + * when master is designated primary (which is by default). + * <p> + * When master is not designated primary this test should fail. + */ + public void testAutoDesignatedPrimaryContinuesAfterSecondaryStopped() throws Exception + { + startCluster(true); + _clusterCreator.stopNode(_clusterCreator.getBrokerPortNumberOfSecondaryNode()); + final Connection connection = getConnection(_brokerFailoverUrl); + assertNotNull("Expected to get a valid connection to primary", connection); + assertProducingConsuming(connection); + } + + public void testPersistentOperationsFailOnNonAutoDesignatedPrimarysAfterSecondaryStopped() throws Exception + { + + startCluster(false); + _clusterCreator.stopNode(_clusterCreator.getBrokerPortNumberOfSecondaryNode()); + final Connection connection = getConnection(_brokerFailoverUrl); + assertNotNull("Expected to get a valid connection to primary", connection); + try + { + assertProducingConsuming(connection); + fail("JMS peristent operations succeded on Master 'not designated primary' buy they should fail as replica is not available"); + } + catch(JMSException e) + { + // JMSException should be thrown on transaction start/commit + } + } + + public void testSecondaryDoesNotBecomePrimaryWhenAutoDesignatedPrimaryStopped() throws Exception + { + startCluster(true); + _clusterCreator.stopNode(_clusterCreator.getBrokerPortNumberOfPrimary()); + + try + { + getConnection(_brokerFailoverUrl); + fail("Connection not expected"); + } + catch (JMSException e) + { + // PASS + } + } + + public void testInitialDesignatedPrimaryStateOfNodes() throws Exception + { + startCluster(true); + final ManagedBDBHAMessageStore primaryStoreBean = getStoreBeanForNodeAtBrokerPort(_clusterCreator.getBrokerPortNumberOfPrimary()); + assertTrue("Expected primary node to be set as designated primary", primaryStoreBean.getDesignatedPrimary()); + + final ManagedBDBHAMessageStore secondaryStoreBean = getStoreBeanForNodeAtBrokerPort(_clusterCreator.getBrokerPortNumberOfSecondaryNode()); + assertFalse("Expected secondary node to NOT be set as designated primary", secondaryStoreBean.getDesignatedPrimary()); + } + + public void testSecondaryDesignatedAsPrimaryAfterOrginalPrimaryStopped() throws Exception + { + startCluster(true); + _clusterCreator.stopNode(_clusterCreator.getBrokerPortNumberOfPrimary()); + final ManagedBDBHAMessageStore storeBean = getStoreBeanForNodeAtBrokerPort(_clusterCreator.getBrokerPortNumberOfSecondaryNode()); + + assertFalse("Expected node to NOT be set as designated primary", storeBean.getDesignatedPrimary()); + storeBean.setDesignatedPrimary(true); + assertTrue("Expected node to now be set as designated primary", storeBean.getDesignatedPrimary()); + + final Connection connection = getConnection(_brokerFailoverUrl); + assertNotNull("Expected to get a valid connection to new primary", connection); + assertProducingConsuming(connection); + } + + private ManagedBDBHAMessageStore getStoreBeanForNodeAtBrokerPort( + final int activeBrokerPortNumber) throws Exception + { + _jmxUtils.open(activeBrokerPortNumber); + + ManagedBDBHAMessageStore storeBean = _jmxUtils.getManagedObject(ManagedBDBHAMessageStore.class, MANAGED_OBJECT_QUERY); + return storeBean; + } + + private void assertProducingConsuming(final Connection connection) throws JMSException, Exception + { + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + Destination destination = session.createQueue(getTestQueueName()); + MessageConsumer consumer = session.createConsumer(destination); + sendMessage(session, destination, 1); + connection.start(); + Message m1 = consumer.receive(RECEIVE_TIMEOUT); + assertNotNull("Message 1 is not received", m1); + assertEquals("Unexpected first message received", 0, m1.getIntProperty(INDEX)); + session.commit(); + } + +} diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterWhiteboxTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterWhiteboxTest.java new file mode 100644 index 0000000000..4b64466ff2 --- /dev/null +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterWhiteboxTest.java @@ -0,0 +1,288 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.berkeleydb; + +import java.io.File; +import java.io.IOException; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.Queue; +import javax.jms.Session; + +import org.apache.log4j.Logger; +import org.apache.qpid.test.utils.QpidBrokerTestCase; +import org.apache.qpid.url.URLSyntaxException; + +import com.sleepycat.je.rep.InsufficientLogException; + +/** + * The HA white box tests test the BDB cluster where the test retains the knowledge of the + * individual test nodes. It uses this knowledge to examine the nodes to ensure that they + * remain in the correct state throughout the test. + * + * @see HAClusterBlackboxTest + */ +public class HAClusterWhiteboxTest extends QpidBrokerTestCase +{ + protected static final Logger LOGGER = Logger.getLogger(HAClusterWhiteboxTest.class); + + private static final String VIRTUAL_HOST = "test"; + + private final int NUMBER_OF_NODES = 3; + private final HATestClusterCreator _clusterCreator = new HATestClusterCreator(this, VIRTUAL_HOST, NUMBER_OF_NODES); + + // TODO Test for node falling behind + // TODO Factory refactoring?? // MessageStore construction?? + + @Override + protected void setUp() throws Exception + { + _brokerType = BrokerType.SPAWNED; + + assertTrue(isJavaBroker()); + assertTrue(isBrokerStorePersistent()); + + setBrokerEnvironment("QPID_OPTS", "-Djava.util.logging.config.file=" + System.getProperty(QPID_HOME) + + File.separator + "etc" + File.separator + "log.properties"); + + _clusterCreator.configureClusterNodes(); + _clusterCreator.startCluster(); + + super.setUp(); + } + + @Override + public void startBroker() throws Exception + { + // Don't start default broker provided by QBTC. + } + + public void testClusterPermitsConnectionToOnlyOneNode() throws Exception + { + int connectionSuccesses = 0; + int connectionFails = 0; + + for (int brokerPortNumber : getBrokerPortNumbers()) + { + try + { + getConnection(_clusterCreator.getConnectionUrlForSingleNode(brokerPortNumber)); + connectionSuccesses++; + } + catch(JMSException e) + { + assertTrue(e.getMessage().contains("Virtual host '" + VIRTUAL_HOST + "' is not active")); + connectionFails++; + } + } + + assertEquals("Unexpected number of failed connections", NUMBER_OF_NODES - 1, connectionFails); + assertEquals("Unexpected number of successful connections", 1, connectionSuccesses); + } + + public void testClusterThatLosesNodeStillAllowsConnection() throws Exception + { + final Connection initialConnection = getConnectionToNodeInCluster(); + assertNotNull(initialConnection); + + killConnectionBrokerAndWaitForNewMasterElection(initialConnection); + + final Connection subsequentConnection = getConnectionToNodeInCluster(); + assertNotNull(subsequentConnection); + + // verify that JMS persistence operations are working + assertProducingConsuming(subsequentConnection); + + closeConnection(initialConnection); + } + + public void testClusterThatLosesAllButOneNodeRefusesConnection() throws Exception + { + final Connection initialConnection = getConnectionToNodeInCluster(); + assertNotNull(initialConnection); + + killConnectionBrokerAndWaitForNewMasterElection(initialConnection); + + final Connection subsequentConnection = getConnectionToNodeInCluster(); + assertNotNull(subsequentConnection); + final int subsequentPortNumber = _clusterCreator.getBrokerPortNumberFromConnection(subsequentConnection); + + killBroker(subsequentPortNumber); + + final Connection finalConnection = getConnectionToNodeInCluster(); + assertNull(finalConnection); + + closeConnection(initialConnection); + } + + public void testClusterWithRestartedNodeStillAllowsConnection() throws Exception + { + final Connection connection = getConnectionToNodeInCluster(); + assertNotNull(connection); + + final int brokerPortNumber = _clusterCreator.getBrokerPortNumberFromConnection(connection); + + _clusterCreator.stopNode(brokerPortNumber); + _clusterCreator.startNode(brokerPortNumber); + + final Connection subsequentConnection = getConnectionToNodeInCluster(); + assertNotNull(subsequentConnection); + } + + public void testClusterLosingNodeRetainsData() throws Exception + { + final Connection initialConnection = getConnectionToNodeInCluster(); + + final String queueNamePrefix = getTestQueueName(); + final String inbuiltExchangeQueueUrl = "direct://amq.direct/" + queueNamePrefix + "1/" + queueNamePrefix + "1?durable='true'"; + final String customExchangeQueueUrl = "direct://my.exchange/" + queueNamePrefix + "2/" + queueNamePrefix + "2?durable='true'"; + + populateBrokerWithData(initialConnection, inbuiltExchangeQueueUrl, customExchangeQueueUrl); + + killConnectionBrokerAndWaitForNewMasterElection(initialConnection); + + final Connection subsequentConnection = getConnectionToNodeInCluster(); + + assertNotNull("no valid connection obtained", subsequentConnection); + + checkBrokerData(subsequentConnection, inbuiltExchangeQueueUrl, customExchangeQueueUrl); + } + + public void testRecoveryOfOutOfDateNode() throws Exception + { + /* + * TODO: Implement + * + * Cant yet find a way to control cleaning in a deterministic way to allow provoking + * a node to become out of date. We do now know that even a new joiner to the group + * can throw the InsufficientLogException, so ensuring an existing cluster of nodes has + * done *any* cleaning and then adding a new node should be sufficient to cause this. + */ + } + + private void populateBrokerWithData(final Connection connection, final String... queueUrls) throws JMSException, Exception + { + populateBrokerWithData(connection, 1, queueUrls); + } + + private void populateBrokerWithData(final Connection connection, int noOfMessages, final String... queueUrls) throws JMSException, Exception + { + final Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + for (final String queueUrl : queueUrls) + { + final Queue queue = session.createQueue(queueUrl); + session.createConsumer(queue).close(); + sendMessage(session, queue, noOfMessages); + } + } + + private void checkBrokerData(final Connection connection, final String... queueUrls) throws JMSException + { + connection.start(); + final Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + for (final String queueUrl : queueUrls) + { + final Queue queue = session.createQueue(queueUrl); + final MessageConsumer consumer = session.createConsumer(queue); + final Message message = consumer.receive(1000); + session.commit(); + assertNotNull("Queue " + queue + " should have message", message); + assertEquals("Queue " + queue + " message has unexpected content", 0, message.getIntProperty(INDEX)); + } + } + + private Connection getConnectionToNodeInCluster() throws URLSyntaxException + { + Connection connection = null; + Set<Integer> runningBrokerPorts = getBrokerPortNumbers(); + + for (int brokerPortNumber : runningBrokerPorts) + { + try + { + connection = getConnection(_clusterCreator.getConnectionUrlForSingleNode(brokerPortNumber)); + break; + } + catch(JMSException je) + { + assertTrue(je.getMessage().contains("Virtual host '" + VIRTUAL_HOST + "' is not active")); + } + } + return connection; + } + + private void killConnectionBrokerAndWaitForNewMasterElection(final Connection initialConnection) throws IOException, + InterruptedException + { + try + { + // NewMasterEvent is received twice: first for the existing master, + // second for a new master + CountDownLatch newMasterLatch = new CountDownLatch(2); + _clusterCreator.startMonitorNode(); + _clusterCreator.statListeningForNewMasterEvent(newMasterLatch); + + final int initialPortNumber = _clusterCreator.getBrokerPortNumberFromConnection(initialConnection); + killBroker(initialPortNumber); + + assertTrue("New master was not elected", newMasterLatch.await(30, TimeUnit.SECONDS)); + } + finally + { + _clusterCreator.shutdownMonitor(); + } + } + + private void assertProducingConsuming(final Connection connection) throws JMSException, Exception + { + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + Destination destination = session.createQueue(getTestQueueName()); + MessageConsumer consumer = session.createConsumer(destination); + sendMessage(session, destination, 2); + connection.start(); + Message m1 = consumer.receive(RECEIVE_TIMEOUT); + assertNotNull("Message 1 is not received", m1); + assertEquals("Unexpected first message received", 0, m1.getIntProperty(INDEX)); + Message m2 = consumer.receive(RECEIVE_TIMEOUT); + assertNotNull("Message 2 is not received", m2); + assertEquals("Unexpected second message received", 1, m2.getIntProperty(INDEX)); + session.commit(); + } + + private void closeConnection(final Connection initialConnection) + { + try + { + initialConnection.close(); + } + catch(Exception e) + { + // ignore. + // java.net.SocketException is seen sometimes on active connection + } + } +} diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAMessageStoreSmokeTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAMessageStoreSmokeTest.java new file mode 100644 index 0000000000..eaa3c3eba4 --- /dev/null +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAMessageStoreSmokeTest.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.berkeleydb; + +import org.apache.commons.configuration.ConfigurationException; +import org.apache.commons.configuration.XMLConfiguration; +import org.apache.qpid.test.utils.QpidTestCase; + +public class HAMessageStoreSmokeTest extends QpidTestCase +{ + private final BDBHAMessageStore _store = new BDBHAMessageStore(); + private final XMLConfiguration _config = new XMLConfiguration(); + + public void testMissingHAConfigThrowsException() throws Exception + { + try + { + _store.configure("test", _config); + fail("Expected an exception to be thrown"); + } + catch (ConfigurationException ce) + { + assertTrue(ce.getMessage().contains("BDB HA configuration key not found")); + } + } +}
\ No newline at end of file diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java new file mode 100644 index 0000000000..43cfa5f4d5 --- /dev/null +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java @@ -0,0 +1,479 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.berkeleydb; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.TreeMap; +import java.util.concurrent.Callable; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +import javax.jms.Connection; + +import org.apache.commons.configuration.XMLConfiguration; +import org.apache.commons.lang.StringUtils; +import org.apache.log4j.Logger; +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQConnectionURL; +import org.apache.qpid.test.utils.QpidBrokerTestCase; +import org.apache.qpid.url.URLSyntaxException; + +import com.sleepycat.je.rep.ReplicationNode; +import com.sleepycat.je.rep.monitor.GroupChangeEvent; +import com.sleepycat.je.rep.monitor.JoinGroupEvent; +import com.sleepycat.je.rep.monitor.LeaveGroupEvent; +import com.sleepycat.je.rep.monitor.Monitor; +import com.sleepycat.je.rep.monitor.MonitorChangeListener; +import com.sleepycat.je.rep.monitor.MonitorConfig; +import com.sleepycat.je.rep.monitor.NewMasterEvent; + +public class HATestClusterCreator +{ + protected static final Logger LOGGER = Logger.getLogger(HATestClusterCreator.class); + + private static final String MANY_BROKER_URL_FORMAT = "amqp://guest:guest@/%s?brokerlist='%s'&failover='roundrobin?cyclecount='%d''"; + private static final String BROKER_PORTION_FORMAT = "tcp://localhost:%d?connectdelay='%d',retries='%d'"; + private static final String SINGLE_BROKER_URL_FORMAT = "amqp://guest:guest@/%s?brokerlist='tcp://localhost:%d?connectdelay='%d',retries='%d''"; + + private static final int CYCLECOUNT = 2; + private static final int RETRIES = 2; + private static final int CONNECTDELAY = 1000; + + private final QpidBrokerTestCase _testcase; + private final Map<Integer, Integer> _brokerPortToBdbPortMap = new HashMap<Integer, Integer>(); + private final Map<Integer, BrokerConfigHolder> _brokerConfigurations = new TreeMap<Integer, BrokerConfigHolder>(); + private final String _virtualHostName; + private final String _configKeyPrefix; + + private final String _ipAddressOfBroker; + private final String _groupName ; + private final int _numberOfNodes; + private int _bdbHelperPort; + private int _primaryBrokerPort; + private Monitor _monitor; + + public HATestClusterCreator(QpidBrokerTestCase testcase, String virtualHostName, int numberOfNodes) + { + _testcase = testcase; + _virtualHostName = virtualHostName; + _groupName = "group" + _testcase.getName(); + _ipAddressOfBroker = getIpAddressOfBrokerHost(); + _numberOfNodes = numberOfNodes; + _configKeyPrefix = "virtualhosts.virtualhost." + _virtualHostName + ".store."; + _bdbHelperPort = 0; + } + + public void configureClusterNodes() throws Exception + { + int brokerPort = _testcase.findFreePort(); + + for (int i = 0; i < _numberOfNodes; i++) + { + int bdbPort = _testcase.getNextAvailable(brokerPort + 1); + _brokerPortToBdbPortMap.put(brokerPort, bdbPort); + + LOGGER.debug("Cluster broker port " + brokerPort + ", bdb replication port " + bdbPort); + if (_bdbHelperPort == 0) + { + _bdbHelperPort = bdbPort; + } + + configureClusterNode(brokerPort, bdbPort); + collectConfig(brokerPort, _testcase.getTestConfiguration(), _testcase.getTestVirtualhosts()); + + brokerPort = _testcase.getNextAvailable(bdbPort + 1); + } + } + + public void setAutoDesignatedPrimary(boolean autoDesignatedPrimary) throws Exception + { + if (_numberOfNodes != 2) + { + throw new IllegalArgumentException("Only two nodes groups have the concept of primary"); + } + + final Entry<Integer, BrokerConfigHolder> brokerConfigEntry = _brokerConfigurations.entrySet().iterator().next(); + final String configKey = getConfigKey("highAvailability.autoDesignatedPrimary"); + brokerConfigEntry.getValue().getTestVirtualhosts().setProperty(configKey, Boolean.toString(autoDesignatedPrimary)); + _primaryBrokerPort = brokerConfigEntry.getKey(); + } + + /** + * @param configKeySuffix "highAvailability.designatedPrimary", for example + * @return "virtualhost.test.store.highAvailability.designatedPrimary", for example + */ + private String getConfigKey(String configKeySuffix) + { + final String configKey = StringUtils.substringAfter(_configKeyPrefix + configKeySuffix, "virtualhosts."); + return configKey; + } + + public void startNode(final int brokerPortNumber) throws Exception + { + final BrokerConfigHolder brokerConfigHolder = _brokerConfigurations.get(brokerPortNumber); + + _testcase.setTestConfiguration(brokerConfigHolder.getTestConfiguration()); + _testcase.setTestVirtualhosts(brokerConfigHolder.getTestVirtualhosts()); + + _testcase.startBroker(brokerPortNumber); + } + + public void startCluster() throws Exception + { + for (final Integer brokerPortNumber : _brokerConfigurations.keySet()) + { + startNode(brokerPortNumber); + } + } + + public void startClusterParallel() throws Exception + { + final ExecutorService executor = Executors.newFixedThreadPool(_brokerConfigurations.size()); + try + { + List<Future<Object>> brokers = new CopyOnWriteArrayList<Future<Object>>(); + for (final Integer brokerPortNumber : _brokerConfigurations.keySet()) + { + final BrokerConfigHolder brokerConfigHolder = _brokerConfigurations.get(brokerPortNumber); + Future<Object> future = executor.submit(new Callable<Object>() + { + public Object call() + { + try + { + _testcase.startBroker(brokerPortNumber, brokerConfigHolder.getTestConfiguration(), + brokerConfigHolder.getTestVirtualhosts()); + return "OK"; + } + catch (Exception e) + { + return e; + } + } + }); + brokers.add(future); + } + for (Future<Object> future : brokers) + { + Object result = future.get(30, TimeUnit.SECONDS); + LOGGER.debug("Node startup result:" + result); + if (result instanceof Exception) + { + throw (Exception) result; + } + else if (!"OK".equals(result)) + { + throw new Exception("One of the cluster nodes is not started"); + } + } + } + catch (Exception e) + { + stopCluster(); + throw e; + } + finally + { + executor.shutdown(); + } + + } + + public void stopNode(final int brokerPortNumber) + { + _testcase.stopBroker(brokerPortNumber); + } + + public void stopCluster() throws Exception + { + shutdownMonitor(); + for (final Integer brokerPortNumber : _brokerConfigurations.keySet()) + { + try + { + stopNode(brokerPortNumber); + } + catch(Exception e) + { + LOGGER.warn("Failed to stop node on port:" + brokerPortNumber); + } + } + } + + public int getBrokerPortNumberFromConnection(Connection connection) + { + final AMQConnection amqConnection = (AMQConnection)connection; + return amqConnection.getActiveBrokerDetails().getPort(); + } + + public int getPortNumberOfAnInactiveBroker(final Connection activeConnection) + { + final Set<Integer> allBrokerPorts = _testcase.getBrokerPortNumbers(); + LOGGER.debug("Broker ports:" + allBrokerPorts); + final int activeBrokerPort = getBrokerPortNumberFromConnection(activeConnection); + allBrokerPorts.remove(activeBrokerPort); + LOGGER.debug("Broker ports:" + allBrokerPorts); + final int inactiveBrokerPort = allBrokerPorts.iterator().next(); + return inactiveBrokerPort; + } + + public int getBdbPortForBrokerPort(final int brokerPortNumber) + { + return _brokerPortToBdbPortMap.get(brokerPortNumber); + } + + public Set<Integer> getBdbPortNumbers() + { + return new HashSet<Integer>(_brokerPortToBdbPortMap.values()); + } + + public AMQConnectionURL getConnectionUrlForAllClusterNodes() throws Exception + { + final StringBuilder brokerList = new StringBuilder(); + + for(Iterator<Integer> itr = _brokerPortToBdbPortMap.keySet().iterator(); itr.hasNext(); ) + { + int brokerPortNumber = itr.next(); + + brokerList.append(String.format(BROKER_PORTION_FORMAT, brokerPortNumber, CONNECTDELAY, RETRIES)); + if (itr.hasNext()) + { + brokerList.append(";"); + } + } + + return new AMQConnectionURL(String.format(MANY_BROKER_URL_FORMAT, _virtualHostName, brokerList, CYCLECOUNT)); + } + + public AMQConnectionURL getConnectionUrlForSingleNode(final int brokerPortNumber) throws URLSyntaxException + { + String url = String.format(SINGLE_BROKER_URL_FORMAT, _virtualHostName, brokerPortNumber, CONNECTDELAY, RETRIES); + return new AMQConnectionURL(url); + } + + public String getGroupName() + { + return _groupName; + } + + public String getNodeNameForNodeAt(final int bdbPort) + { + return "node" + _testcase.getName() + bdbPort; + } + + public String getNodeHostPortForNodeAt(final int bdbPort) + { + return _ipAddressOfBroker + ":" + bdbPort; + } + + public String getHelperHostPort() + { + if (_bdbHelperPort == 0) + { + throw new IllegalStateException("Helper port not yet assigned."); + } + + return _ipAddressOfBroker + ":" + _bdbHelperPort; + } + + public void setHelperHostPort(int bdbHelperPort) + { + _bdbHelperPort = bdbHelperPort; + } + + public int getBrokerPortNumberOfPrimary() + { + if (_numberOfNodes != 2) + { + throw new IllegalArgumentException("Only two nodes groups have the concept of primary"); + } + + return _primaryBrokerPort; + } + + public int getBrokerPortNumberOfSecondaryNode() + { + final Set<Integer> portNumbers = getBrokerPortNumbersForNodes(); + portNumbers.remove(getBrokerPortNumberOfPrimary()); + return portNumbers.iterator().next(); + } + + public Set<Integer> getBrokerPortNumbersForNodes() + { + return new HashSet<Integer>(_brokerConfigurations.keySet()); + } + + private void configureClusterNode(final int brokerPort, final int bdbPort) throws Exception + { + final String nodeName = getNodeNameForNodeAt(bdbPort); + + _testcase.setConfigurationProperty(_configKeyPrefix + "class", "org.apache.qpid.server.store.berkeleydb.BDBHAMessageStore"); + + _testcase.setConfigurationProperty(_configKeyPrefix + "highAvailability.groupName", _groupName); + _testcase.setConfigurationProperty(_configKeyPrefix + "highAvailability.nodeName", nodeName); + _testcase.setConfigurationProperty(_configKeyPrefix + "highAvailability.nodeHostPort", getNodeHostPortForNodeAt(bdbPort)); + _testcase.setConfigurationProperty(_configKeyPrefix + "highAvailability.helperHostPort", getHelperHostPort()); + // TODO replication policy + } + + public String getIpAddressOfBrokerHost() + { + String brokerHost = _testcase.getBroker().getHost(); + try + { + return InetAddress.getByName(brokerHost).getHostAddress(); + } + catch (UnknownHostException e) + { + throw new RuntimeException("Could not determine IP address of host : " + brokerHost, e); + } + } + + private void collectConfig(final int brokerPortNumber, XMLConfiguration testConfiguration, XMLConfiguration testVirtualhosts) + { + _brokerConfigurations.put(brokerPortNumber, new BrokerConfigHolder((XMLConfiguration) testConfiguration.clone(), + (XMLConfiguration) testVirtualhosts.clone())); + } + + public class BrokerConfigHolder + { + private final XMLConfiguration _testConfiguration; + private final XMLConfiguration _testVirtualhosts; + + public BrokerConfigHolder(XMLConfiguration testConfiguration, XMLConfiguration testVirtualhosts) + { + _testConfiguration = testConfiguration; + _testVirtualhosts = testVirtualhosts; + } + + public XMLConfiguration getTestConfiguration() + { + return _testConfiguration; + } + + public XMLConfiguration getTestVirtualhosts() + { + return _testVirtualhosts; + } + } + + public void modifyClusterNodeBdbAddress(int brokerPortNumberToBeMoved, int newBdbPort) + { + final BrokerConfigHolder brokerConfigHolder = _brokerConfigurations.get(brokerPortNumberToBeMoved); + final XMLConfiguration virtualHostConfig = brokerConfigHolder.getTestVirtualhosts(); + + final String configKey = getConfigKey("highAvailability.nodeHostPort"); + final String oldBdbHostPort = virtualHostConfig.getString(configKey); + + final String[] oldHostAndPort = StringUtils.split(oldBdbHostPort, ":"); + final String oldHost = oldHostAndPort[0]; + + final String newBdbHostPort = oldHost + ":" + newBdbPort; + + virtualHostConfig.setProperty(configKey, newBdbHostPort); + collectConfig(brokerPortNumberToBeMoved, brokerConfigHolder.getTestConfiguration(), virtualHostConfig); + } + + public void startMonitorNode() + { + shutdownMonitor(); + + MonitorConfig config = new MonitorConfig(); + config.setGroupName(_groupName); + int monitorPort = _testcase.findFreePort(); + config.setNodeName(getNodeNameForNodeAt(monitorPort)); + config.setNodeHostPort("" + monitorPort); + config.setHelperHosts(getHelperHostPort()); + + _monitor = new Monitor(config); + + ReplicationNode currentMaster = _monitor.register(); + LOGGER.info("Current master " + currentMaster.getName()); + } + + public void startListening(MonitorChangeListener listener) throws IOException + { + _monitor.startListener(listener); + } + + public void statListeningForNewMasterEvent(final CountDownLatch latch) throws IOException + { + startListening(new MonitorChangeListenerSupport(){ + @Override + public void notify(NewMasterEvent newMasterEvent) + { + LOGGER.debug("New master is elected " + newMasterEvent.getMasterName()); + latch.countDown(); + } + }); + } + + public void shutdownMonitor() + { + if (_monitor != null) + { + try + { + _monitor.shutdown(); + } + catch (Exception e) + { + LOGGER.warn("Monitor shutdown error:", e); + } + } + } + + public static class MonitorChangeListenerSupport implements MonitorChangeListener + { + + @Override + public void notify(NewMasterEvent newMasterEvent) + { + } + + @Override + public void notify(GroupChangeEvent groupChangeEvent) + { + } + + @Override + public void notify(JoinGroupEvent joinGroupEvent) + { + } + + @Override + public void notify(LeaveGroupEvent leaveGroupEvent) + { + } + } +} diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderTest.java index ba5ca842bf..23fd9bc24f 100644 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderTest.java +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderTest.java @@ -24,7 +24,7 @@ import java.io.File; import java.util.ArrayList; import java.util.List; -import org.apache.qpid.server.store.berkeleydb.BDBMessageStore; +import org.apache.qpid.server.store.berkeleydb.AbstractBDBMessageStore; import org.apache.qpid.server.store.berkeleydb.tuple.ContentBinding; import com.sleepycat.bind.tuple.IntegerBinding; @@ -94,7 +94,7 @@ public class UpgraderTest extends AbstractUpgradeTestCase { assertEquals("Unexpected store version", -1, getStoreVersion()); _upgrader.upgradeIfNecessary(); - assertEquals("Unexpected store version", BDBMessageStore.VERSION, getStoreVersion()); + assertEquals("Unexpected store version", AbstractBDBMessageStore.VERSION, getStoreVersion()); assertContent(); } @@ -112,7 +112,7 @@ public class UpgraderTest extends AbstractUpgradeTestCase List<String> expectedDatabases = new ArrayList<String>(); expectedDatabases.add(Upgrader.VERSION_DB_NAME); assertEquals("Expectedonly VERSION table in initially empty store after upgrade: ", expectedDatabases, databaseNames); - assertEquals("Unexpected store version", BDBMessageStore.VERSION, getStoreVersion()); + assertEquals("Unexpected store version", AbstractBDBMessageStore.VERSION, getStoreVersion()); nonExistentStoreLocation.delete(); } |
